chore: improve implementation of scheduled journeys (#689)

Chris Anderson 2025-07-25 09:14:43 -05:00 committed by GitHub
parent 36f3652dce
commit a86ec79444
GPG key ID: B5690EEEBB952194
5 changed files with 125 additions and 47 deletions

View file (app scheduler setup)

@@ -19,6 +19,7 @@ export default (app: App) => {
             JourneyDelayJob.enqueueActive(app)
             app.queue.enqueue(ProcessCampaignsJob.from())
             app.queue.enqueue(CampaignStateJob.from())
+            app.queue.enqueue(ScheduledEntranceOrchestratorJob.from())
         },
         lockLength: 120,
     })
@@ -30,7 +31,6 @@ export default (app: App) => {
             delta: subHours(new Date(), 1),
         }))
         app.queue.enqueue(UpdateJourneysJob.from())
-        app.queue.enqueue(ScheduledEntranceOrchestratorJob.from())
         },
     })
     scheduler.schedule({
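
This moves the orchestrator enqueue out of the apparently hourly block (note the subHours delta) and into the block guarded by lockLength: 120, so scheduled entrances are picked up more promptly. A minimal sketch of the locking idea that lockLength presumably implements; acquireLock and its signature are assumptions, not the project's actual API:

    // Sketch: a tick only does its work if it wins a distributed lock, so a
    // 120s lockLength means at most one enqueue per window even when several
    // app instances run the same schedule.
    async function tick(
        acquireLock: (key: string, ttlSeconds: number) => Promise<boolean>,
        enqueue: () => Promise<void>,
    ) {
        const won = await acquireLock('scheduler:minute', 120) // mirrors lockLength: 120
        if (!won) return // another instance already handled this window
        await enqueue() // e.g. ScheduledEntranceOrchestratorJob.from()
    }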

View file (ScheduledEntranceJob)

@@ -1,13 +1,15 @@
 import { Job } from '../queue'
-import { JourneyEntrance } from './JourneyStep'
+import { JourneyEntrance, JourneyStep } from './JourneyStep'
 import JourneyUserStep from './JourneyUserStep'
-import { chunk, Chunker, uuid } from '../utilities'
+import { uuid } from '../utilities'
 import App from '../app'
 import JourneyProcessJob from './JourneyProcessJob'
 import Journey from './Journey'
 import List from '../lists/List'
 import { getRuleQuery } from '../rules/RuleEngine'
-import { User } from '../users/User'
+import Project from '../projects/Project'
+import { logger } from '../config/logger'
+import { processUsers } from '../users/ProcessUsers'

 interface ScheduledEntranceTrigger {
     entranceId: number
@@ -18,13 +20,12 @@ export default class ScheduledEntranceJob extends Job {
     static $name = 'scheduled_entrance_job'

     static from(params: ScheduledEntranceTrigger) {
-        return new ScheduledEntranceJob(params)
+        return new ScheduledEntranceJob(params).deduplicationKey(`${this.$name}_${params.entranceId}`)
     }

     static async handler({ entranceId }: ScheduledEntranceTrigger) {
         const entrance = await JourneyEntrance.find(entranceId)
         if (!entrance || entrance.type !== JourneyEntrance.type || !entrance.list_id) {
             return
         }
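
from() now attaches a deduplication key scoped to the entrance id, matching the pattern ListStatsJob uses below. Presumably the queue drops an enqueue whose key collides with a job that is still pending, which makes it safe for the orchestrator to re-enqueue the same entrance on every tick. A sketch under that assumption:

    // Assumed deduplication semantics; illustrative only.
    const first = ScheduledEntranceJob.from({ entranceId: 42 }) // key: scheduled_entrance_job_42
    const second = ScheduledEntranceJob.from({ entranceId: 42 }) // same key
    await App.main.queue.enqueue(first)
    await App.main.queue.enqueue(second) // presumably dropped while `first` is pending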
@@ -33,42 +34,44 @@ export default class ScheduledEntranceJob extends Job {
             Journey.find(entrance.journey_id),
             List.find(entrance.list_id),
         ])
-        if (!list || list.project_id !== journey?.project_id) return
-
-        const ref = uuid()
-        const result = await User.clickhouse().query(
-            getRuleQuery(list.project_id, list.rule),
-        )
-
-        const chunker = new Chunker<Partial<JourneyUserStep>>(async items => {
-            await App.main.db.transaction(async (trx) => {
-                await JourneyUserStep.query(trx)
-                    .insert(items)
-            })
-        }, 500)
-
-        for await (const chunk of result.stream() as any) {
-            for (const result of chunk) {
-                const user = result.json()
-                chunker.add({
-                    user_id: user.id,
-                    type: 'completed',
-                    journey_id: entrance.journey_id,
-                    step_id: entrance.id,
-                    ref,
-                })
-            }
-        }
-        await chunker.flush()
-
-        const query = JourneyUserStep.query().select('id').where('ref', ref)
-        await chunk<{ id: number }>(query, App.main.queue.batchSize, async items => {
-            await App.main.queue.enqueueBatch(items.map(({ id }) => JourneyProcessJob.from({ entrance_id: id })))
-        })
+        if (!list || list.project_id !== journey?.project_id) {
+            return // bad list id or project mismatch
+        }
+
+        const project = await Project.find(journey.project_id)
+        const query = getRuleQuery(list.project_id, list.rule)
+        await processUsers({
+            query,
+            cacheKey: `journeys:${journey.id}:entrance:${entrance.id}:users`,
+            itemMap: (user) => ({
+                key: user.id,
+                value: `${user.id}`,
+            }),
+            callback: async (pairs) => {
+                try {
+                    const ref = uuid()
+                    const items = pairs.map(({ key }) => ({
+                        user_id: parseInt(key),
+                        type: 'completed',
+                        journey_id: entrance.journey_id,
+                        step_id: entrance.id,
+                        ref,
+                    }))
+                    await JourneyUserStep.insert(items)
+
+                    const steps = await JourneyUserStep.all(qb => qb.select('id')
+                        .where('ref', ref),
+                    )
+                    await App.main.queue.enqueueBatch(steps.map(({ id }) => JourneyProcessJob.from({ entrance_id: id })))
+                } catch (error) {
+                    logger.error({ error, journey: journey.id }, 'campaign:generate:progress:error')
+                }
+            },
+            afterCallback: async () => {
+                await JourneyStep.update(q => q.where('id', entrance.id), {
+                    next_scheduled_at: entrance.nextDate(project?.timezone ?? 'UTC'),
+                })
+            },
+        })
     }
 }
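
The handler now delegates streaming to processUsers (new file below): itemMap turns each matching ClickHouse row into a Redis hash pair, and callback receives those pairs back in batches to insert JourneyUserStep rows and fan out JourneyProcessJobs. A worked sketch of one batch, with made-up ids:

    // Two hypothetical users flowing through the callback above.
    const pairs = [
        { key: '101', value: '101' },
        { key: '102', value: '102' },
    ]
    const ref = 'a2f6...' // uuid() output, truncated for illustration
    const items = pairs.map(({ key }) => ({
        user_id: parseInt(key), // 101, 102
        type: 'completed',
        journey_id: 7, // entrance.journey_id (made up)
        step_id: 9, // entrance.id (made up)
        ref,
    }))
    // items are inserted, then read back by ref so each row becomes one JourneyProcessJob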

View file (ScheduledEntranceOrchestratorJob)

@@ -1,7 +1,6 @@
 import App from '../app'
-import { getProject } from '../projects/ProjectService'
 import { Job } from '../queue'
-import { JourneyEntrance, JourneyStep } from './JourneyStep'
+import { JourneyEntrance } from './JourneyStep'
 import ScheduledEntranceJob from './ScheduledEntranceJob'

 export default class ScheduledEntranceOrchestratorJob extends Job {
@@ -32,12 +31,6 @@ export default class ScheduledEntranceOrchestratorJob extends Job {
         const jobs: Job[] = []
         for (const entrance of entrances) {
-            const project = await getProject(entrance.project_id)
-
-            await JourneyStep.update(q => q.where('id', entrance.id), {
-                next_scheduled_at: entrance.nextDate(project?.timezone ?? 'UTC'),
-            })
-
             if (entrance.list_id) {
                 jobs.push(ScheduledEntranceJob.from({
                     entranceId: entrance.id,
@@ -49,5 +42,4 @@ export default class ScheduledEntranceOrchestratorJob extends Job {
             await App.main.queue.enqueueBatch(jobs)
         }
     }
 }
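
The orchestrator no longer advances next_scheduled_at itself; that write moved into ScheduledEntranceJob's afterCallback above, so the schedule presumably only moves forward once the entrance's user list has actually been processed. A sketch of the assumed ordering; the helper parameters are hypothetical stand-ins, not project APIs:

    // If streaming throws, next_scheduled_at is never advanced, so the
    // entrance stays due and is retried on a later orchestrator tick.
    async function runEntrance(
        entrance: { id: number, nextDate: (tz: string) => Date },
        timezone: string,
        streamUsers: (entranceId: number) => Promise<void>,
        setNextScheduledAt: (entranceId: number, at: Date) => Promise<void>,
    ) {
        await streamUsers(entrance.id) // may throw; schedule untouched so far
        await setNextScheduledAt(entrance.id, entrance.nextDate(timezone))
    }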

View file (ListStatsJob)

@@ -14,7 +14,7 @@ export default class ListStatsJob extends Job {
         listId: number,
         projectId: number,
     ): ListStatsJob {
-        return new this({ listId, projectId }).deduplicationKey(`${this.name}_${listId}`)
+        return new this({ listId, projectId }).deduplicationKey(`${this.$name}_${listId}`)
     }

     static async handler({ listId, projectId }: ListStatsParams) {
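
The one-character fix matters: inside a static method, this is the class constructor, so this.name is the class's built-in name property ('ListStatsJob'), while this.$name is the static field the job registers under. A self-contained illustration; the $name value here is assumed, following the convention visible in ScheduledEntranceJob:

    class ListStatsJob {
        static $name = 'list_stats_job' // assumed, per the convention above
        static key(listId: number) {
            // `${this.name}_${listId}` would yield 'ListStatsJob_1' (built-in .name)
            return `${this.$name}_${listId}` // 'list_stats_job_1'
        }
    }
    console.log(ListStatsJob.key(1)) // list_stats_job_1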

View file (users/ProcessUsers)

@@ -0,0 +1,83 @@
+import { Chunker } from '../utilities'
+import App from '../app'
+import { logger } from '../config/logger'
+import { cacheBatchHash, cacheBatchReadHashAndDelete, cacheDel, cacheGet, cacheHashExists, cacheSet, DataPair, HashScanCallback } from '../config/redis'
+import { User } from './User'
+
+type CachedQueryParams = {
+    query: string
+    cacheKey: string
+    itemMap: (data: any) => DataPair
+    callback: HashScanCallback
+    beforeCallback?: (count: number) => Promise<void>
+    afterCallback?: () => Promise<void>
+}
+
+export const processUsers = async ({
+    query,
+    cacheKey,
+    itemMap,
+    callback,
+    beforeCallback,
+    afterCallback,
+}: CachedQueryParams) => {
+
+    const redis = App.main.redis
+    const hashKey = cacheKey
+    const hashKeyReady = `${hashKey}:ready`
+    const hashExists = await cacheHashExists(redis, hashKey)
+    const isReady = await cacheGet(redis, hashKeyReady)
+
+    const cleanupQuery = async () => {
+        await afterCallback?.()
+        await cacheDel(redis, hashKeyReady)
+        await cacheDel(redis, hashKey)
+    }
+
+    logger.info({
+        source: hashExists ? 'cache' : 'clickhouse',
+        key: hashKey,
+    }, 'users:generate:started')
+
+    // If a previous run already staged users in the hash, drain it and stop
+    if (hashExists && isReady) {
+        await cacheBatchReadHashAndDelete(redis, hashKey, callback)
+        await cleanupQuery()
+        return
+    }
+
+    logger.info({
+        query,
+        key: hashKey,
+    }, 'users:generate:querying')
+
+    // Generate the initial send list from ClickHouse
+    const result = await User.clickhouse().query(query, {}, {
+        max_block_size: '16384',
+        send_progress_in_http_headers: 1,
+        http_headers_progress_interval_ms: '110000', // 110 seconds
+    })
+
+    // Load the results into a Redis hash for easy retrieval
+    let count = 0
+    const chunker = new Chunker<DataPair>(async pairs => {
+        count += pairs.length
+        await cacheBatchHash(redis, hashKey, pairs)
+    }, 2500)
+
+    // Stream the rows from ClickHouse and pass them to the Redis chunker
+    for await (const chunk of result.stream() as any) {
+        for (const result of chunk) {
+            const item = result.json()
+            await chunker.add(itemMap(item))
+        }
+    }
+    await chunker.flush()
+
+    // Run any preparation now that the count is known, then mark the hash ready
+    await beforeCallback?.(count)
+    await cacheSet(redis, hashKeyReady, 1, 86400)
+
+    // Now that we have results, pass them back to the callback
+    await cacheBatchReadHashAndDelete(redis, hashKey, callback)
+    await cleanupQuery()
+}
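
A usage sketch of the helper, grounded in the signature above; the SQL string and cache key are made up. processUsers stages the ClickHouse result in a Redis hash in 2,500-pair chunks, marks it ready for 24 hours, then drains the hash back through callback; if a prior run left a ready hash behind, it resumes from Redis instead of re-querying ClickHouse.

    import { processUsers } from './ProcessUsers'

    await processUsers({
        query: 'SELECT id FROM users WHERE project_id = 1', // illustrative SQL
        cacheKey: 'example:users', // illustrative key
        itemMap: (user) => ({ key: user.id, value: `${user.id}` }),
        beforeCallback: async (count) => { console.log(`matched ${count} users`) },
        callback: async (pairs) => {
            // handle one batch of { key, value } pairs read back from Redis
        },
        afterCallback: async () => { console.log('done') },
    })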