chore: improve logging around campaign generation

This commit is contained in:
Chris Anderson 2025-08-02 15:35:39 -05:00
parent 2701aef923
commit b15e8d2be1
5 changed files with 28 additions and 13 deletions

View file

@ -4,7 +4,7 @@ import TextJob from '../providers/text/TextJob'
import EmailJob from '../providers/email/EmailJob'
import { logger } from '../config/logger'
import { User } from '../users/User'
import Campaign, { CampaignCreateParams, CampaignDelivery, CampaignParams, CampaignPopulationProgress, CampaignProgress, CampaignSend, CampaignSendReferenceType, CampaignSendState, SentCampaign } from './Campaign'
import Campaign, { CampaignCreateParams, CampaignDelivery, CampaignParams, CampaignPopulationProgress, CampaignProgress, CampaignSend, CampaignSendReferenceType, CampaignSendState, CampaignState, SentCampaign } from './Campaign'
import List from '../lists/List'
import Subscription, { SubscriptionState } from '../subscriptions/Subscription'
import { RequestError } from '../core/errors'
@ -334,10 +334,9 @@ export const updateSendState = async ({ campaign, user, state = 'sent', referenc
}
const cleanupSendListGeneration = async (campaign: Campaign) => {
const { pending, ...delivery } = await campaignDeliveryProgress(campaign.id)
// Update the state & count of the campaign
await Campaign.update(qb => qb.where('id', campaign.id).where('project_id', campaign.project_id), { state: 'scheduled', delivery })
await updateCampaignProgress(campaign, 'scheduled')
// Clear out all the keys related to the generation
await cleanupGenerationCacheKeys(campaign)
@ -529,7 +528,7 @@ export const campaignDeliveryProgress = async (campaignId: number): Promise<Camp
}
}
export const updateCampaignProgress = async (campaign: Campaign): Promise<void> => {
export const updateCampaignProgress = async (campaign: Campaign, stateOverride?: CampaignState): Promise<void> => {
const currentState = (pending: number, delivery: CampaignDelivery) => {
if (campaign.type === 'trigger') return 'running'
if (campaign.state === 'draft') return 'draft'
@ -540,7 +539,7 @@ export const updateCampaignProgress = async (campaign: Campaign): Promise<void>
}
const { pending, ...delivery } = await campaignDeliveryProgress(campaign.id)
const state = currentState(pending, delivery)
const state = stateOverride ?? currentState(pending, delivery)
// If nothing has changed, continue otherwise update
if (shallowEqual(campaign.delivery, delivery) && state === campaign.state) return

View file

@ -112,6 +112,10 @@ export const cacheBatchReadHashAndDelete = async (
await redis.del(hashKey)
}
export const cacheBatchLength = async (redis: Redis, hashKey: string): Promise<number> => {
return await redis.hlen(hashKey)
}
export const cacheHashExists = async (redis: Redis, hashKey: string): Promise<boolean> => {
const exists = await redis.exists(hashKey)
return exists !== 0

View file

@ -41,7 +41,7 @@ export default class ScheduledEntranceJob extends Job {
const query = getRuleQuery(list.project_id, list.rule)
await processUsers({
query,
cacheKey: `journeys:${journey}:entrance:${entrance.id}:users`,
cacheKey: `journeys:${journey.id}:entrance:${entrance.id}:users`,
itemMap: (user) => ({
key: user.id,
value: `${user.id}`,

View file

@ -28,6 +28,7 @@ export default class Queue {
}
async dequeue(job: EncodedJob): Promise<boolean> {
if (!job || !job.name) return false
const handler = this.jobs[job.name]
if (!handler) {
App.main.error.notify(new Error(`No handler found for job: ${job.name}`))

View file

@ -1,7 +1,7 @@
import { Chunker } from '../utilities'
import App from '../app'
import { logger } from '../config/logger'
import { cacheBatchHash, cacheBatchReadHashAndDelete, cacheDel, cacheGet, cacheHashExists, cacheSet, DataPair, HashScanCallback } from '../config/redis'
import { cacheBatchHash, cacheBatchLength, cacheBatchReadHashAndDelete, cacheDel, cacheGet, cacheHashExists, cacheSet, DataPair, HashScanCallback } from '../config/redis'
import { User } from './User'
type CachedQueryParams = {
@ -28,10 +28,18 @@ export const processUsers = async ({
const hashExists = await cacheHashExists(redis, hashKey)
const isReady = await cacheGet(redis, hashKeyReady)
const cleanupQuery = async () => {
const processFromCache = async () => {
logger.info({
key: hashKey,
count: await cacheBatchLength(redis, hashKey),
}, 'users:generate:loading:started')
await cacheBatchReadHashAndDelete(redis, hashKey, callback)
await afterCallback?.()
await cacheDel(redis, hashKeyReady)
await cacheDel(redis, hashKey)
logger.info({ key: hashKey }, 'users:generate:loading:finished')
}
logger.info({
@ -41,15 +49,14 @@ export const processUsers = async ({
// Return users from the hash if they exist
if (hashExists && isReady) {
await cacheBatchReadHashAndDelete(redis, hashKey, callback)
await cleanupQuery()
await processFromCache()
return
}
logger.info({
query,
key: hashKey,
}, 'users:generate:querying')
}, 'users:generate:querying:started')
// Generate the initial send list from ClickHouse
const result = await User.clickhouse().query(query, {}, {
@ -74,6 +81,11 @@ export const processUsers = async ({
}
await chunker.flush()
logger.info({
key: hashKey,
count,
}, 'users:generate:querying:finished')
// Prepare anything before running, otherwise just set the ready flag
const shouldContinue = await beforeCallback(count)
if (!shouldContinue) return
@ -81,6 +93,5 @@ export const processUsers = async ({
await cacheSet(redis, hashKeyReady, 1, 86400)
// Now that we have results, pass them back to the callback
await cacheBatchReadHashAndDelete(redis, hashKey, callback)
await cleanupQuery()
await processFromCache()
}