chore: move campaign generation to new method

This commit is contained in:
Chris Anderson 2025-07-25 21:54:20 -05:00
parent 55fc56e98d
commit 8670bc0bc4
3 changed files with 47 additions and 87 deletions

View file

@ -12,7 +12,7 @@ import { PageParams } from '../core/searchParams'
import { allLists } from '../lists/ListService'
import { allTemplates, duplicateTemplate, screenshotHtml, templateInUserLocale, validateTemplates } from '../render/TemplateService'
import { getSubscription, getUserSubscriptionState } from '../subscriptions/SubscriptionService'
import { chunk, Chunker, cleanString, pick, shallowEqual } from '../utilities'
import { chunk, cleanString, pick, shallowEqual } from '../utilities'
import { getProvider } from '../providers/ProviderRepository'
import { createTagSubquery, getTags, setTags } from '../tags/TagService'
import { getProject } from '../projects/ProjectService'
@ -21,13 +21,14 @@ import CampaignGenerateListJob from './CampaignGenerateListJob'
import Project from '../projects/Project'
import Template from '../render/Template'
import { differenceInDays, subDays } from 'date-fns'
import { cacheBatchHash, cacheBatchReadHashAndDelete, cacheDel, cacheGet, cacheHashExists, cacheIncr, cacheSet, DataPair, HashScanCallback } from '../config/redis'
import { cacheDel, cacheGet, cacheIncr, cacheSet, DataPair } from '../config/redis'
import App from '../app'
import CampaignAbortJob from './CampaignAbortJob'
import { getRuleQuery } from '../rules/RuleEngine'
import { getJourneysForCampaign } from '../journey/JourneyService'
import { createAuditLog } from '../auth/AuditService'
import { WithAdmin } from '../auth/Audit'
import { createAuditLog } from '../core/audit/AuditService'
import { WithAdmin } from '../core/audit/Audit'
import { processUsers } from '../users/ProcessUsers'
export const CacheKeys = {
pendingStats: 'campaigns:pending_stats',
@ -332,69 +333,6 @@ export const updateSendState = async ({ campaign, user, state = 'sent', referenc
return records
}
/**
 * Builds the recipient list for a campaign and passes it to `callback`.
 *
 * When the Redis hash for this campaign already exists AND its ready flag is
 * set, the hash is read back directly. Otherwise recipients are queried from
 * ClickHouse, staged in the Redis hash as `user id -> timezone` pairs, the
 * population counters are reset, the ready flag is set, and only then is the
 * hash read back through `cacheBatchReadHashAndDelete`.
 *
 * @param project Project supplying the fallback timezone for users whose own
 * timezone is empty after `cleanString`.
 * @param campaign Campaign whose recipients are being generated.
 * @param callback Receives the pairs read back from the Redis hash.
 * @returns Whatever `cacheBatchReadHashAndDelete` returns, or `undefined` when
 * the campaign is found to be aborted after the list has been staged.
 */
const generateSendList = async (project: Project, campaign: SentCampaign, callback: HashScanCallback) => {
    const redis = App.main.redis
    const oneDay = 86400 // 24 hours in seconds
    const hashKey = CacheKeys.generate(campaign)

    const hashExists = await cacheHashExists(redis, hashKey)
    const isReady = await cacheGet(redis, CacheKeys.generateReady(campaign))

    // The cached hash is only usable once the ready flag has been written;
    // a hash without the flag may be a partially loaded list.
    const useCache = Boolean(hashExists && isReady)

    logger.info({
        campaignId: campaign.id,
        // Report the source that is actually used, not merely whether a hash exists
        source: useCache ? 'cache' : 'clickhouse',
    }, 'campaign:generate:progress:started')

    // Return users from the hash if they exist
    if (useCache) {
        return await cacheBatchReadHashAndDelete(redis, hashKey, callback)
    }

    const query = await recipientClickhouseQuery(campaign)

    logger.info({
        campaignId: campaign.id,
        query,
    }, 'campaign:generate:progress:querying')

    // Generate the initial send list from ClickHouse
    const result = await User.clickhouse().query(query, {}, {
        max_block_size: '16384',
        send_progress_in_http_headers: 1,
        http_headers_progress_interval_ms: '110000', // 110 seconds
    })

    // Load the results into a Redis hash for easy retrieval
    let count = 0
    const chunker = new Chunker<DataPair>(async pairs => {
        count += pairs.length
        await cacheBatchHash(redis, hashKey, pairs)
    }, 2500)

    // Stream the data from ClickHouse and pass it to the Redis chunker.
    // Loop variables are named so they do not shadow the imported `chunk`
    // helper or the outer `result`.
    for await (const rows of result.stream() as any) {
        for (const row of rows) {
            const user = row.json()
            await chunker.add({
                key: user.id,
                value: cleanString(user.timezone) ?? project.timezone,
            })
        }
    }
    await chunker.flush()

    // Set the totals in preparation for ingesting to DB
    await cacheSet<number>(redis, CacheKeys.populationProgress(campaign), 0, oneDay)
    await cacheSet(redis, CacheKeys.populationTotal(campaign), count, oneDay)
    await cacheSet(redis, CacheKeys.generateReady(campaign), 1, oneDay)

    // Double check that the campaign hasn't been aborted
    const updatedCampaign = await getCampaign(campaign.id, campaign.project_id) as SentCampaign
    if (updatedCampaign.isAborted) return

    // Now that we have results, pass them back to the callback
    return await cacheBatchReadHashAndDelete(redis, hashKey, callback)
}
const cleanupSendListGeneration = async (campaign: Campaign) => {
const { pending, ...delivery } = await campaignDeliveryProgress(campaign.id)
@ -420,25 +358,45 @@ export const populateSendList = async (campaign: SentCampaign) => {
}
const now = Date.now()
const cacheKey = CacheKeys.populationProgress(campaign)
const redis = App.main.redis
const oneDay = 86400 // 24 hours in seconds
const progressCacheKey = CacheKeys.populationProgress(campaign)
const totalCacheKey = CacheKeys.populationTotal(campaign)
await generateSendList(project, campaign, async (pairs: DataPair[]) => {
const items = pairs.map(({ key, value }) => CampaignSend.create(campaign, project, { id: parseInt(key), timezone: value }))
try {
await App.main.db.transaction(async (trx) => {
await CampaignSend.query(trx)
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.merge(['state', 'send_at'])
})
} catch (error) {
logger.error({ error, campaignId: campaign.id }, 'campaign:generate:progress:error')
}
await cacheIncr(App.main.redis, cacheKey, items.length, 86400)
await processUsers({
query: await recipientClickhouseQuery(campaign),
cacheKey: CacheKeys.generate(campaign),
itemMap: (user) => ({
key: user.id,
value: cleanString(user.timezone) ?? project.timezone,
}),
beforeCallback: async (count: number) => {
await cacheSet<number>(redis, progressCacheKey, 0, oneDay)
await cacheSet(redis, totalCacheKey, count, oneDay)
// Double check that the campaign hasn't been aborted
const updatedCampaign = await getCampaign(campaign.id, campaign.project_id) as SentCampaign
return !updatedCampaign.isAborted
},
callback: async (pairs: DataPair[]) => {
const items = pairs.map(({ key, value }) => CampaignSend.create(campaign, project, { id: parseInt(key), timezone: value }))
try {
await App.main.db.transaction(async (trx) => {
await CampaignSend.query(trx)
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.merge(['state', 'send_at'])
})
} catch (error) {
logger.error({ error, campaignId: campaign.id }, 'campaign:generate:progress:error')
}
await cacheIncr(redis, progressCacheKey, items.length, oneDay)
},
afterCallback: async () => {
await cleanupSendListGeneration(campaign)
},
})
await cleanupSendListGeneration(campaign)
logger.info({ campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
}

View file

@ -143,7 +143,6 @@ export const getJourneyStepMapForUI = async (journey: Journey) => {
for (const key of Object.keys(map)) {
const step = map[key]
const type = journeyStepTypes[step.type]
console.log('hydrate!', step.type)
map[key] = await type.hydrate(step, journey)
}
return map

View file

@ -9,7 +9,7 @@ type CachedQueryParams = {
cacheKey: string,
itemMap: (data: any) => DataPair
callback: HashScanCallback
beforeCallback?: (count: number) => Promise<void>
beforeCallback?: (count: number) => Promise<boolean>
afterCallback?: () => Promise<void>
}
@ -18,7 +18,7 @@ export const processUsers = async ({
cacheKey,
itemMap,
callback,
beforeCallback,
beforeCallback = async () => true,
afterCallback,
}: CachedQueryParams) => {
@ -43,6 +43,7 @@ export const processUsers = async ({
if (hashExists && isReady) {
await cacheBatchReadHashAndDelete(redis, hashKey, callback)
await cleanupQuery()
return
}
logger.info({
@ -74,7 +75,9 @@ export const processUsers = async ({
await chunker.flush()
// Prepare anything before running, otherwise just set the ready flag
await beforeCallback?.(count)
const shouldContinue = await beforeCallback(count)
if (!shouldContinue) return
await cacheSet(redis, hashKeyReady, 1, 86400)
// Now that we have results, pass them back to the callback