Revert to original campaign generation implementation (#650)

This commit is contained in:
Chris Anderson 2025-03-09 12:30:49 -05:00 committed by GitHub
parent 7b7a73f912
commit 610c820943
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 19 additions and 19 deletions

View file

@ -26,7 +26,7 @@ export default class CampaignGenerateListJob extends Job {
const estimatedSize = await estimatedSendSize(campaign)
// Increase lock duration based on estimated send size
const lockTime = Math.ceil(Math.max(estimatedSize / 1000, 900))
const lockTime = Math.ceil(Math.max(estimatedSize / 500, 900))
logger.info({ campaignId: id, estimatedSize, lockTime }, 'campaign:generate:estimated_size')
const acquired = await acquireLock({ key, timeout: lockTime })

View file

@ -25,6 +25,7 @@ import { differenceInDays, subDays } from 'date-fns'
import Model, { raw, ref } from '../core/Model'
import { cacheGet, cacheIncr } from '../config/redis'
import App from '../app'
import { releaseLock } from '../core/Lock'
export const CacheKeys = {
pendingStats: 'campaigns:pending_stats',
@ -286,30 +287,27 @@ export const generateSendList = async (campaign: SentCampaign) => {
throw new RequestError('Unable to send to a campaign that does not have an associated list', 404)
}
// Clear any aborted sends
await clearCampaign(campaign)
const now = Date.now()
const cacheKey = CacheKeys.populationProgress(campaign)
logger.info({ campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:started')
let lastId = 0
let isExhausted = false
while (!isExhausted) {
({ isExhausted, lastId } = await recipientPartialQuery({
campaign,
project,
sinceId: lastId,
callback: async (items) => {
await cacheIncr(App.main.redis, cacheKey, items.length, 300)
},
limit: 10_000,
}))
// Generate the initial send list
const query = recipientQuery(campaign)
await chunk<CampaignSendParams>(query, 500, async (items) => {
await App.main.db.transaction(async (trx) => {
await CampaignSend.query(trx)
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.merge(['state', 'send_at'])
})
await cacheIncr(App.main.redis, cacheKey, items.length, 300)
logger.info({ campaign: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress')
}
}, ({ user_id, timezone }: { user_id: number, timezone: string }) =>
CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone })),
)
logger.info({ lastId, campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
logger.info({ campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' })
}
@ -404,6 +402,7 @@ export const abortCampaign = async (campaign: Campaign) => {
.where('campaign_id', campaign.id)
.where('state', 'pending')
.update({ state: 'aborted' })
await releaseLock(`campaign_generate_${campaign.id}`)
}
export const clearCampaign = async (campaign: Campaign) => {
@ -448,6 +447,7 @@ export const campaignDeliveryProgress = async (campaignId: number): Promise<Camp
export const updateCampaignProgress = async (campaign: Campaign): Promise<void> => {
const currentState = (pending: number, delivery: CampaignDelivery) => {
if (campaign.type === 'trigger') return 'running'
if (campaign.state === 'loading') return 'loading'
if (pending <= 0) return 'finished'
if (delivery.sent === 0) return 'scheduled'
return 'running'

View file

@ -13,7 +13,7 @@ export default class CampaignStateJob extends Job {
// within the last two days or has activity since last run
const openedCampaignIds = await App.main.redis.smembers(CacheKeys.pendingStats).then(ids => ids.map(parseInt).filter(x => x))
const campaigns = await Campaign.query()
.whereIn('state', ['scheduled', 'running'])
.whereIn('state', ['loading', 'scheduled', 'running'])
.orWhere(function(qb) {
qb.where('state', 'finished')
.where('send_at', '>', subDays(Date.now(), 2))