Removes outer user fetch query from campaign generation (#647)

This commit is contained in:
Chris Anderson 2025-03-08 21:56:19 -06:00 committed by GitHub
parent d368613818
commit f3f9192da8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 33 additions and 39 deletions

View file

@ -17,7 +17,7 @@ export default class CampaignGenerateListJob extends Job {
static async handler({ id, project_id }: CampaignJobParams) {
const key = `campaign_generate_${id}`
logger.info({ id }, 'campaign:generate:loading')
logger.info({ campaign_id: id }, 'campaign:generate:loading')
const campaign = await getCampaign(id, project_id) as SentCampaign
if (!campaign) return
if (campaign.state === 'aborted' || campaign.state === 'draft') return
@ -27,20 +27,20 @@ export default class CampaignGenerateListJob extends Job {
// Increase lock duration based on estimated send size
const lockTime = Math.ceil(Math.max(estimatedSize / 1000, 900))
logger.info({ id, estimatedSize, lockTime }, 'campaign:generate:estimated_size')
logger.info({ campaignId: id, estimatedSize, lockTime }, 'campaign:generate:estimated_size')
const acquired = await acquireLock({ key, timeout: lockTime })
logger.info({ id, acquired }, 'campaign:generate:lock')
logger.info({ campaignId: id, acquired }, 'campaign:generate:lock')
if (!acquired) return
// Use approximate size for progress
await cacheSet<number>(App.main.redis, CacheKeys.populationTotal(campaign), estimatedSize, 86400)
await cacheSet<number>(App.main.redis, CacheKeys.populationProgress(campaign), 0, 86400)
logger.info({ id }, 'campaign:generate:querying')
logger.info({ campaignId: id }, 'campaign:generate:querying')
await generateSendList(campaign)
logger.info({ id }, 'campaign:generate:sending')
logger.info({ campaignId: id }, 'campaign:generate:sending')
await CampaignEnqueueSendsJob.from({
id: campaign.id,
project_id: campaign.project_id,

View file

@ -292,37 +292,27 @@ export const generateSendList = async (campaign: SentCampaign) => {
const now = Date.now()
const cacheKey = CacheKeys.populationProgress(campaign)
const stream = UserList.query()
.select('id')
.whereIn('list_id', campaign.list_ids ?? [])
.stream()
logger.debug({ campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:started')
logger.debug({ campaign: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:started')
let count = 0
const limit = 10_000
for await (const user of stream) {
if (count % limit === 0) {
logger.debug({ count, campaign: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress')
await recipientPartialQuery({
campaign,
project,
sinceId: user.id,
callback: async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.ignore()
await cacheIncr(App.main.redis, cacheKey, items.length, 300)
},
limit,
})
}
count++
let nextId: number | null = 0
while (nextId !== null) {
logger.debug({ nextId, campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress')
nextId = await recipientPartialQuery({
campaign,
project,
sinceId: nextId,
callback: async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.ignore()
await cacheIncr(App.main.redis, cacheKey, items.length, 300)
},
limit: 10_000,
})
}
logger.debug({ count, campaign: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
logger.debug({ nextId, campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' })
}
@ -579,18 +569,19 @@ export const recipientPartialQuery = async ({ campaign, project, sinceId, callba
users.timezone,
users.phone,
users.email,
users.project_id
users.project_id,
user_list.id AS user_list_id
from users
inner join user_list
on users.id = user_list.user_id
and user_list.list_id IN (${lists.join(',')})
where user_list.id >= ${sinceId}
where user_list.id > ${sinceId}
order by user_list.id
limit ${limit}
;`)
const query = Model.query(trx)
.select('users.id AS user_id', 'users.timezone')
.select('users.id AS user_id', 'users.timezone', 'users.user_list_id')
.from(`${table} AS users`)
.where('users.project_id', campaign.project_id)
.where(qb => {
@ -614,10 +605,13 @@ export const recipientPartialQuery = async ({ campaign, project, sinceId, callba
.where('state', SubscriptionState.unsubscribed),
)
let nextId = null
await chunk<CampaignSendParams>(query, 500, async (items) => {
await callback(items)
}, ({ user_id, timezone }: { user_id: number, timezone: string }) =>
CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone })),
)
}, ({ user_id, timezone, user_list_id }: { user_id: number, timezone: string, user_list_id: number }) => {
nextId = user_list_id
return CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone }))
})
return nextId
})
}