From a86ec794445dd02c4a9e57ec630d8e7597699367 Mon Sep 17 00:00:00 2001 From: Chris Anderson Date: Fri, 25 Jul 2025 09:14:43 -0500 Subject: [PATCH] chore: improve implementation of scheduled journeys (#689) --- apps/platform/src/config/scheduler.ts | 2 +- .../src/journey/ScheduledEntranceJob.ts | 75 +++++++++-------- .../ScheduledEntranceOrchestratorJob.ts | 10 +-- apps/platform/src/lists/ListStatsJob.ts | 2 +- apps/platform/src/users/ProcessUsers.ts | 83 +++++++++++++++++++ 5 files changed, 125 insertions(+), 47 deletions(-) create mode 100644 apps/platform/src/users/ProcessUsers.ts diff --git a/apps/platform/src/config/scheduler.ts b/apps/platform/src/config/scheduler.ts index 27880bdf..bae44c7c 100644 --- a/apps/platform/src/config/scheduler.ts +++ b/apps/platform/src/config/scheduler.ts @@ -19,6 +19,7 @@ export default (app: App) => { JourneyDelayJob.enqueueActive(app) app.queue.enqueue(ProcessCampaignsJob.from()) app.queue.enqueue(CampaignStateJob.from()) + app.queue.enqueue(ScheduledEntranceOrchestratorJob.from()) }, lockLength: 120, }) @@ -30,7 +31,6 @@ export default (app: App) => { delta: subHours(new Date(), 1), })) app.queue.enqueue(UpdateJourneysJob.from()) - app.queue.enqueue(ScheduledEntranceOrchestratorJob.from()) }, }) scheduler.schedule({ diff --git a/apps/platform/src/journey/ScheduledEntranceJob.ts b/apps/platform/src/journey/ScheduledEntranceJob.ts index 70cecdbf..f11e3116 100644 --- a/apps/platform/src/journey/ScheduledEntranceJob.ts +++ b/apps/platform/src/journey/ScheduledEntranceJob.ts @@ -1,13 +1,15 @@ import { Job } from '../queue' -import { JourneyEntrance } from './JourneyStep' +import { JourneyEntrance, JourneyStep } from './JourneyStep' import JourneyUserStep from './JourneyUserStep' -import { chunk, Chunker, uuid } from '../utilities' +import { uuid } from '../utilities' import App from '../app' import JourneyProcessJob from './JourneyProcessJob' import Journey from './Journey' import List from '../lists/List' import { getRuleQuery } 
from '../rules/RuleEngine' -import { User } from '../users/User' +import Project from '../projects/Project' +import { logger } from '../config/logger' +import { processUsers } from '../users/ProcessUsers' interface ScheduledEntranceTrigger { entranceId: number } export default class ScheduledEntranceJob extends Job { static $name = 'scheduled_entrance_job' static from(params: ScheduledEntranceTrigger) { - return new ScheduledEntranceJob(params) + return new ScheduledEntranceJob(params).deduplicationKey(`${this.$name}_${params.entranceId}`) } static async handler({ entranceId }: ScheduledEntranceTrigger) { const entrance = await JourneyEntrance.find(entranceId) - if (!entrance || entrance.type !== JourneyEntrance.type || !entrance.list_id) { return } @@ -33,42 +34,44 @@ export default class ScheduledEntranceJob extends Job { Journey.find(entrance.journey_id), List.find(entrance.list_id), ]) + if (!list || list.project_id !== journey?.project_id) return - if (!list || list.project_id !== journey?.project_id) { - return // bad list id or project mismatch - } + const project = await Project.find(journey.project_id) - const ref = uuid() - const result = await User.clickhouse().query( - getRuleQuery(list.project_id, list.rule), - ) + const query = getRuleQuery(list.project_id, list.rule) + await processUsers({ + query, + cacheKey: `journeys:${journey.id}:entrance:${entrance.id}:users`, + itemMap: (user) => ({ + key: user.id, + value: `${user.id}`, + }), + callback: async (pairs) => { + try { + const ref = uuid() + const items = pairs.map(({ key }) => ({ + user_id: parseInt(key), + type: 'completed', + journey_id: entrance.journey_id, + step_id: entrance.id, + ref, + })) + await JourneyUserStep.insert(items) - const chunker = new Chunker>(async items => { - await App.main.db.transaction(async (trx) => { - await JourneyUserStep.query(trx) - .insert(items) - }) - }, 500) + const steps = await JourneyUserStep.all(qb => qb.select('id') + .where('ref', ref), + ) 
- for await (const chunk of result.stream() as any) { - for (const result of chunk) { - const user = result.json() - chunker.add({ - user_id: user.id, - type: 'completed', - journey_id: entrance.journey_id, - step_id: entrance.id, - ref, + await App.main.queue.enqueueBatch(steps.map(({ id }) => JourneyProcessJob.from({ entrance_id: id }))) + } catch (error) { + logger.error({ error, journey: journey.id }, 'campaign:generate:progress:error') + } + }, + afterCallback: async () => { + await JourneyStep.update(q => q.where('id', entrance.id), { + next_scheduled_at: entrance.nextDate(project?.timezone ?? 'UTC'), }) - } - } - - await chunker.flush() - - const query = JourneyUserStep.query().select('id').where('ref', ref) - - await chunk<{ id: number }>(query, App.main.queue.batchSize, async items => { - await App.main.queue.enqueueBatch(items.map(({ id }) => JourneyProcessJob.from({ entrance_id: id }))) + }, }) } } diff --git a/apps/platform/src/journey/ScheduledEntranceOrchestratorJob.ts b/apps/platform/src/journey/ScheduledEntranceOrchestratorJob.ts index 72fbbb6a..aeded937 100644 --- a/apps/platform/src/journey/ScheduledEntranceOrchestratorJob.ts +++ b/apps/platform/src/journey/ScheduledEntranceOrchestratorJob.ts @@ -1,7 +1,6 @@ import App from '../app' -import { getProject } from '../projects/ProjectService' import { Job } from '../queue' -import { JourneyEntrance, JourneyStep } from './JourneyStep' +import { JourneyEntrance } from './JourneyStep' import ScheduledEntranceJob from './ScheduledEntranceJob' export default class ScheduledEntranceOrchestratorJob extends Job { @@ -32,12 +31,6 @@ export default class ScheduledEntranceOrchestratorJob extends Job { const jobs: Job[] = [] for (const entrance of entrances) { - - const project = await getProject(entrance.project_id) - await JourneyStep.update(q => q.where('id', entrance.id), { - next_scheduled_at: entrance.nextDate(project?.timezone ?? 
'UTC'), - }) - if (entrance.list_id) { jobs.push(ScheduledEntranceJob.from({ entranceId: entrance.id, @@ -49,5 +42,4 @@ export default class ScheduledEntranceOrchestratorJob extends Job { await App.main.queue.enqueueBatch(jobs) } } - } diff --git a/apps/platform/src/lists/ListStatsJob.ts b/apps/platform/src/lists/ListStatsJob.ts index 17e4b7b2..f433e7ce 100644 --- a/apps/platform/src/lists/ListStatsJob.ts +++ b/apps/platform/src/lists/ListStatsJob.ts @@ -14,7 +14,7 @@ export default class ListStatsJob extends Job { listId: number, projectId: number, ): ListStatsJob { - return new this({ listId, projectId }).deduplicationKey(`${this.name}_${listId}`) + return new this({ listId, projectId }).deduplicationKey(`${this.$name}_${listId}`) } static async handler({ listId, projectId }: ListStatsParams) { diff --git a/apps/platform/src/users/ProcessUsers.ts b/apps/platform/src/users/ProcessUsers.ts new file mode 100644 index 00000000..45dc8521 --- /dev/null +++ b/apps/platform/src/users/ProcessUsers.ts @@ -0,0 +1,83 @@ +import { Chunker } from '../utilities' +import App from '../app' +import { logger } from '../config/logger' +import { cacheBatchHash, cacheBatchReadHashAndDelete, cacheDel, cacheGet, cacheHashExists, cacheSet, DataPair, HashScanCallback } from '../config/redis' +import { User } from './User' + +type CachedQueryParams = { + query: string + cacheKey: string, + itemMap: (data: any) => DataPair + callback: HashScanCallback + beforeCallback?: (count: number) => Promise + afterCallback?: () => Promise +} + +export const processUsers = async ({ + query, + cacheKey, + itemMap, + callback, + beforeCallback, + afterCallback, +}: CachedQueryParams) => { + + const redis = App.main.redis + const hashKey = cacheKey + const hashKeyReady = `${hashKey}:ready` + const hashExists = await cacheHashExists(redis, hashKey) + const isReady = await cacheGet(redis, hashKeyReady) + + const cleanupQuery = async () => { + await afterCallback?.() + await cacheDel(redis, hashKeyReady) + 
await cacheDel(redis, hashKey) + } + + logger.info({ + source: hashExists ? 'cache' : 'clickhouse', + key: hashKey, + }, 'users:generate:started') + + // Return users from the hash if they exist + if (hashExists && isReady) { + await cacheBatchReadHashAndDelete(redis, hashKey, callback) + return await cleanupQuery() + } + + logger.info({ + query, + key: hashKey, + }, 'users:generate:querying') + + // Generate the initial send list from ClickHouse + const result = await User.clickhouse().query(query, {}, { + max_block_size: '16384', + send_progress_in_http_headers: 1, + http_headers_progress_interval_ms: '110000', // 110 seconds + }) + + // Load the results into a Redis hash for easy retrieval + let count = 0 + const chunker = new Chunker(async pairs => { + count += pairs.length + await cacheBatchHash(redis, hashKey, pairs) + }, 2500) + + // Stream the data from ClickHouse and pass it to the Redis chunker + for await (const chunk of result.stream() as any) { + for (const result of chunk) { + const item = result.json() + await chunker.add(itemMap(item)) + } + } + await chunker.flush() + + // Prepare anything before running, otherwise just set the ready flag + await beforeCallback?.(count) + await cacheSet(redis, hashKeyReady, 1, 86400) + + // Now that we have results, pass them back to the callback + await cacheBatchReadHashAndDelete(redis, hashKey, callback) + await cleanupQuery() +}