feat: add asynchronous chunker for faster ingest

This commit is contained in:
Chris Anderson 2025-07-07 22:35:34 -07:00
parent d0577b1636
commit a976e9bd3a
2 changed files with 52 additions and 3 deletions

View file

@ -12,7 +12,7 @@ import { PageParams } from '../core/searchParams'
import { allLists } from '../lists/ListService'
import { allTemplates, duplicateTemplate, screenshotHtml, templateInUserLocale, validateTemplates } from '../render/TemplateService'
import { getSubscription, getUserSubscriptionState } from '../subscriptions/SubscriptionService'
import { chunk, Chunker, pick, shallowEqual } from '../utilities'
import { AsyncChunker, chunk, pick, shallowEqual } from '../utilities'
import { getProvider } from '../providers/ProviderRepository'
import { createTagSubquery, getTags, setTags } from '../tags/TagService'
import { getProject } from '../projects/ProjectService'
@ -308,7 +308,7 @@ export const generateSendList = async (campaign: SentCampaign) => {
http_headers_progress_interval_ms: '110000', // 110 seconds
})
const chunker = new Chunker<CampaignSendParams>(async items => {
const chunker = new AsyncChunker<CampaignSendParams>(async items => {
await App.main.db.transaction(async (trx) => {
await CampaignSend.query(trx)
.insert(items)
@ -331,6 +331,8 @@ export const generateSendList = async (campaign: SentCampaign) => {
}
}
// Most of the work will happen here since ClickHouse should return
// records fairly quickly
await chunker.flush()
logger.info({ campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')

View file

@ -279,7 +279,7 @@ export class Chunker<T> {
#items: T[] = []
constructor(
private callback: (batch: T[]) => Promise<void>,
private callback: ChunkCallback<T>,
private size: number,
) {}
@ -300,6 +300,53 @@ export class Chunker<T> {
}
}
/**
 * Buffers items and hands them to `callback` in batches of at most
 * `batchSize`, processing full batches in the background while the producer
 * keeps calling `add()`.
 *
 * Guarantees:
 * - At most one `callback` invocation is running at any time, and batches are
 *   delivered in insertion order.
 * - `callback` is never invoked with an empty batch.
 * - A failure in a background batch is not lost as an unhandled rejection;
 *   it is stored and rethrown by the next `flush()` call. Items belonging to
 *   the failed batch are not retried.
 * - `flush()` resolves only after every buffered item has been handed to
 *   `callback` and that call has settled.
 */
export class AsyncChunker<T> {
    private buffer: T[] = []

    /** Promise of the drain currently running, or `undefined` when idle. */
    private draining: Promise<void> | undefined

    /**
     * Error captured from a background batch. Wrapped in an object so that a
     * rejection with a falsy value is still distinguishable from "no failure".
     */
    private failure: { readonly error: unknown } | undefined

    /**
     * @param callback Receives each batch; the next batch starts only after
     * the returned promise settles.
     * @param batchSize Maximum number of items per batch; must be at least 1.
     * @throws RangeError when `batchSize` is not a number >= 1 (such a value
     * would otherwise make `flush()` loop forever on a non-empty buffer).
     */
    constructor(
        private readonly callback: ChunkCallback<T>,
        private readonly batchSize: number,
    ) {
        if (!(batchSize >= 1)) {
            throw new RangeError(`AsyncChunker batchSize must be >= 1, received ${String(batchSize)}`)
        }
    }

    /**
     * Queues an item and, once a full batch is available, schedules
     * background processing. Never blocks and never throws on callback errors.
     */
    add(item: T): void {
        this.buffer.push(item)
        if (this.buffer.length >= this.batchSize) {
            this.triggerProcessing()
        }
    }

    /**
     * Starts a background drain of full batches unless one is already running
     * or an earlier failure is still waiting to be reported by `flush()`.
     */
    private triggerProcessing(): void {
        if (this.draining !== undefined || this.failure !== undefined) return
        this.draining = this.drain(this.batchSize)
    }

    /**
     * Processes batches sequentially while at least `minimum` items are
     * buffered. Never rejects: errors are recorded in `failure` instead.
     */
    private async drain(minimum: number): Promise<void> {
        try {
            // Yield to the timer queue first so the producer is not
            // interrupted mid-loop, and so `draining` has been assigned by
            // the caller before the `finally` below can clear it.
            await new Promise<void>(resolve => setTimeout(resolve, 0))
            while (this.buffer.length >= minimum) {
                await this.callback(this.buffer.splice(0, this.batchSize))
            }
        } catch (error: unknown) {
            this.failure = { error }
        } finally {
            this.draining = undefined
        }
    }

    /**
     * Waits for any in-flight batch, then processes everything still
     * buffered, including a final partial batch.
     *
     * @throws Whatever `callback` rejected with, either during this call or
     * in an earlier background batch. The stored failure is cleared once
     * thrown, so a later `flush()` can process the remaining items.
     */
    async flush(): Promise<void> {
        for (;;) {
            if (this.draining !== undefined) {
                // Never run alongside a drain that is already in progress.
                await this.draining
                continue
            }
            if (this.failure !== undefined) {
                const { error } = this.failure
                this.failure = undefined
                throw error
            }
            if (this.buffer.length === 0) return
            // A threshold of 1 also drains the trailing partial batch.
            this.draining = this.drain(1)
        }
    }
}
export function visit<T>(item: T, children: (item: T) => undefined | T[], callback: (item: T) => void) {
callback(item)
const items = children(item)