Removes top level transaction around campaigns (#648)

This commit is contained in:
Chris Anderson 2025-03-08 23:29:12 -06:00 committed by GitHub
parent f3f9192da8
commit 4c62f60cf8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -294,25 +294,26 @@ export const generateSendList = async (campaign: SentCampaign) => {
logger.debug({ campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:started')
let nextId: number | null = 0
while (nextId !== null) {
logger.debug({ nextId, campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress')
nextId = await recipientPartialQuery({
let lastId = 0
let isExhausted = false
while (!isExhausted) {
({ isExhausted, lastId } = await recipientPartialQuery({
campaign,
project,
sinceId: nextId,
sinceId: lastId,
callback: async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.onConflict()
.ignore()
await cacheIncr(App.main.redis, cacheKey, items.length, 300)
},
limit: 10_000,
})
}))
logger.debug({ campaign: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress')
}
logger.debug({ nextId, campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
logger.debug({ lastId, campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' })
}
@ -559,59 +560,60 @@ export const recipientPartialQuery = async ({ campaign, project, sinceId, callba
const lists = campaign.list_ids ?? []
const exclusionLists = campaign.exclusion_list_ids ?? []
return await App.main.db.transaction(async (trx) => {
const table = `tmp_inclusive_${sinceId}`
await trx.raw(`drop temporary table if exists ${table}`)
await trx.raw(`
create temporary table ${table}
select
users.id,
users.timezone,
users.phone,
users.email,
users.project_id,
user_list.id AS user_list_id
from users
inner join user_list
on users.id = user_list.user_id
and user_list.list_id IN (${lists.join(',')})
where user_list.id > ${sinceId}
order by user_list.id
limit ${limit}
;`)
const table = `tmp_inclusive_${campaign.id}`
await raw(`drop temporary table if exists ${table}`)
await raw(`
create temporary table ${table}
select
users.id,
users.timezone,
users.phone,
users.email,
users.project_id,
user_list.id AS user_list_id
from users
inner join user_list
on users.id = user_list.user_id
and user_list.list_id IN (${lists.join(',')})
where user_list.id > ${sinceId}
order by user_list.id
limit ${limit}
`)
const query = Model.query(trx)
.select('users.id AS user_id', 'users.timezone', 'users.user_list_id')
.from(`${table} AS users`)
.where('users.project_id', campaign.project_id)
.where(qb => {
if (campaign.channel === 'email') {
qb.whereNotNull('users.email')
} else if (campaign.channel === 'text') {
qb.whereNotNull('users.phone')
} else if (campaign.channel === 'push') {
qb.whereNotNull('users.devices')
}
})
.whereNotExists(
UserList.query()
.whereIn('list_id', exclusionLists)
.where('user_id', ref('users.id')),
)
.whereNotExists(
UserSubscription.query()
.where('subscription_id', campaign.subscription_id)
.where('user_id', ref('users.id'))
.where('state', SubscriptionState.unsubscribed),
)
const rawMaxResult = await raw(`select max(user_list_id) as last_id from ${table}`)
const lastId: number | null = rawMaxResult[0]?.[0]?.last_id ?? null
let nextId = null
await chunk<CampaignSendParams>(query, 500, async (items) => {
await callback(items)
}, ({ user_id, timezone, user_list_id }: { user_id: number, timezone: string, user_list_id: number }) => {
nextId = user_list_id
return CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone }))
const query = Model.query()
.select('users.id AS user_id', 'users.timezone')
.from(`${table} AS users`)
.where('users.project_id', campaign.project_id)
.where(qb => {
if (campaign.channel === 'email') {
qb.whereNotNull('users.email')
} else if (campaign.channel === 'text') {
qb.whereNotNull('users.phone')
} else if (campaign.channel === 'push') {
qb.whereNotNull('users.devices')
}
})
return nextId
.whereNotExists(
UserList.query()
.whereIn('list_id', exclusionLists)
.where('user_id', ref('users.id')),
)
.whereNotExists(
UserSubscription.query()
.where('subscription_id', campaign.subscription_id)
.where('user_id', ref('users.id'))
.where('state', SubscriptionState.unsubscribed),
)
await chunk<CampaignSendParams>(query, 250, async (items) => {
await callback(items)
}, ({ user_id, timezone }: { user_id: number, timezone: string }) => {
return CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone }))
})
await raw(`drop temporary table if exists ${table}`)
return { isExhausted: lastId == null, lastId: lastId ?? 0 }
}