Switches to temp tables for campaign generation (#645)

This commit is contained in:
Chris Anderson 2025-03-08 19:23:59 -06:00 committed by GitHub
parent 0afe09da05
commit a7c855e8db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -21,7 +21,7 @@ import CampaignGenerateListJob from './CampaignGenerateListJob'
import Project from '../projects/Project'
import Template from '../render/Template'
import { differenceInDays, subDays } from 'date-fns'
import { raw, ref } from '../core/Model'
import Model, { raw, ref } from '../core/Model'
import { cacheGet, cacheIncr } from '../config/redis'
import App from '../app'
@ -288,33 +288,32 @@ export const generateSendList = async (campaign: SentCampaign) => {
// Clear any aborted sends
await clearCampaign(campaign)
const cacheKey = CacheKeys.populationProgress(campaign)
const stream = UserList.query()
.select('users.id AS id')
.leftJoin('users', 'user_list.user_id', 'users.id')
.whereIn('user_list.list_id', campaign.list_ids ?? [])
.select('id')
.whereIn('list_id', campaign.list_ids ?? [])
.stream()
const ingest = async (lastId: number, limit: number) => {
const query = recipientPartialQuery(campaign, lastId, limit)
const cacheKey = CacheKeys.populationProgress(campaign)
await chunk<CampaignSendParams>(query, 300, async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.ignore()
await cacheIncr(App.main.redis, cacheKey, items.length, 300)
}, ({ user_id, timezone }: { user_id: number, timezone: string }) =>
CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone })),
)
}
let count = 0
let lastId = 0
const limit = 10_000
for await (const user of stream) {
if (count % limit === 0) await ingest(lastId, limit)
lastId = user.id
if (count % limit === 0) {
await recipientPartialQuery({
campaign,
project,
sinceId: user.id,
callback: async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.ignore()
await cacheIncr(App.main.redis, cacheKey, items.length, 300)
},
limit,
})
}
count++
}
@ -551,41 +550,67 @@ export const updateCampaignSendEnrollment = async (user: User) => {
}
}
const recipientPartialQuery = (campaign: Campaign, sinceId: number, limit = 10000) => {
return User.query()
.select('users.id AS user_id', 'users.timezone')
.innerJoin('user_list', sbq =>
sbq.on('users.id', 'user_list.user_id')
.onIn('user_list.list_id', campaign.list_ids ?? []),
)
.where('users.project_id', campaign.project_id)
.where(qb => {
if (campaign.channel === 'email') {
qb.whereNotNull('users.email')
} else if (campaign.channel === 'text') {
qb.whereNotNull('users.phone')
} else if (campaign.channel === 'push') {
qb.whereNotNull('users.devices')
}
})
.whereNotExists(
UserList.query()
.whereIn('list_id', campaign.exclusion_list_ids ?? [])
.where('user_id', ref('users.id')),
)
.whereNotExists(
CampaignSend.query()
.where('campaign_id', campaign.id)
.where('user_id', ref('users.id'))
.where('state', 'sent'),
)
.whereNotExists(
UserSubscription.query()
.where('subscription_id', campaign.subscription_id)
.where('user_id', ref('users.id'))
.where('state', SubscriptionState.unsubscribed),
)
.where('users.id', '>', sinceId)
.orderBy('user_list.id')
.limit(limit)
/**
 * Arguments accepted by `recipientPartialQuery`.
 *
 * The first two members describe the window of `user_list` rows to scan,
 * the rest describe what to do with the users found in that window.
 */
interface RecipientQueryParams {
    /** Inclusive lower bound on `user_list.id`; also used to name the temporary table. */
    sinceId: number
    /** Maximum number of joined rows copied into the temporary table. */
    limit: number
    /** Campaign supplying the list ids, exclusion list ids, channel, subscription and project id used as filters. */
    campaign: SentCampaign
    /** Project forwarded to `CampaignSend.create` when send params are built. */
    project: Project
    /** Invoked (and awaited) with each array of send params handed over by the `chunk` helper. */
    callback: (chunk: CampaignSendParams[]) => Promise<void>
}
/**
 * Builds one window of campaign recipients and hands them to `callback`.
 *
 * Everything runs inside a single transaction:
 *  1. Up to `limit` users joined through `user_list` (restricted to the
 *     campaign's lists, starting at `user_list.id >= sinceId`) are copied
 *     into a temporary table.
 *  2. That table is filtered by project, by a non-null contact column for
 *     the campaign channel, by the exclusion lists and by unsubscribes.
 *  3. The filtered rows are passed through `chunk` (batch size 500), mapped
 *     to send params with `CampaignSend.create` and given to `callback`.
 *
 * @param params.campaign Campaign providing lists, exclusions, channel, subscription and project id.
 * @param params.project Project forwarded to `CampaignSend.create`.
 * @param params.sinceId Inclusive lower bound on `user_list.id`; also part of the temp table name.
 * @param params.callback Awaited with every batch of send params.
 * @param params.limit Maximum number of rows copied into the temporary table.
 * @returns Resolves once every batch has been handed to `callback`; resolves
 * immediately when the campaign has no lists.
 */
export const recipientPartialQuery = async ({ campaign, project, sinceId, callback, limit }: RecipientQueryParams) => {
    const lists = campaign.list_ids ?? []
    const exclusionLists = campaign.exclusion_list_ids ?? []

    // `IN ()` is a syntax error in MySQL, and no lists means no recipients
    if (lists.length === 0) return

    return await App.main.db.transaction(async (trx) => {
        const table = `tmp_inclusive_${sinceId}`

        // One positional placeholder per list id so values are bound, not interpolated
        const listPlaceholders = lists.map(() => '?').join(',')

        await trx.raw('drop temporary table if exists ??', [table])
        try {
            // `devices` must be copied as well: the push channel filter below reads it
            await trx.raw(`
                create temporary table ??
                select
                    users.id,
                    users.timezone,
                    users.phone,
                    users.email,
                    users.devices,
                    users.project_id
                from users
                inner join user_list
                    on users.id = user_list.user_id
                    and user_list.list_id IN (${listPlaceholders})
                where user_list.id >= ?
                order by user_list.id
                limit ?
            ;`, [table, ...lists, sinceId, limit])

            // The temp table is aliased as `users` so the filters read like a query on the real table
            const query = Model.query(trx)
                .select('users.id AS user_id', 'users.timezone')
                .from(`${table} AS users`)
                .where('users.project_id', campaign.project_id)
                .where(qb => {
                    if (campaign.channel === 'email') {
                        qb.whereNotNull('users.email')
                    } else if (campaign.channel === 'text') {
                        qb.whereNotNull('users.phone')
                    } else if (campaign.channel === 'push') {
                        qb.whereNotNull('users.devices')
                    }
                })
                .whereNotExists(
                    UserList.query()
                        .whereIn('list_id', exclusionLists)
                        .where('user_id', ref('users.id')),
                )
                .whereNotExists(
                    UserSubscription.query()
                        .where('subscription_id', campaign.subscription_id)
                        .where('user_id', ref('users.id'))
                        .where('state', SubscriptionState.unsubscribed),
                )

            await chunk<CampaignSendParams>(query, 500, async (items) => {
                await callback(items)
            }, ({ user_id, timezone }: { user_id: number, timezone: string }) =>
                CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone })),
            )
        } finally {
            // Temporary tables outlive the transaction and stay on the pooled
            // connection until it closes, so release this one explicitly
            await trx.raw('drop temporary table if exists ??', [table])
        }
    })
}