Campaign list generation performance improvements (#641)

This commit is contained in:
Chris Anderson 2025-03-07 21:39:15 -06:00 committed by GitHub
parent 30bde64257
commit d1c0cf1500
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 143 additions and 85 deletions

View file

@ -0,0 +1,32 @@
exports.up = async function(knex) {
await knex.schema.alterTable('campaign_sends', table => {
table.integer('id').alter()
})
await knex.raw('alter table campaign_sends drop primary key')
await knex.schema.alterTable('campaign_sends', table => {
table.dropColumn('id')
})
await knex.raw('alter table campaign_sends add primary key (campaign_id, user_id, reference_id)')
await knex.schema.alterTable('campaign_sends', table => {
table.index('user_id', 'campaign_sends_user_id_foreign')
table.dropUnique(['user_id', 'campaign_id', 'reference_id'])
})
}
exports.down = async function(knex) {
await knex.schema.alterTable('campaign_sends', table => {
table.unique(['user_id', 'campaign_id', 'reference_id'])
})
await knex.raw('alter table campaign_sends drop primary key')
await knex.schema.alterTable('campaign_sends', table => {
table.increments('id')
})
await knex.raw('alter table campaign_sends add primary key (id)')
}

View file

@ -65,7 +65,7 @@
"docker:build": "docker buildx build -f ./Dockerfile -t ghcr.io/parcelvoy/api:latest -t ghcr.io/parcelvoy/api:$npm_config_tag ../../",
"docker:build:push": "npm run docker:build -- --push",
"migration:create": "node ./scripts/create-migration.mjs",
"package:publish": "npm run build && npm version $npm_config_tag --no-git-tag-version && npm pack && npm publish --tag=latest --access public"
"package:publish": "npm run build && npm version $npm_config_tag --no-git-tag-version && npm pack && npm publish --access public"
},
"devDependencies": {
"@types/busboy": "^1.5.0",

View file

@ -1,6 +1,6 @@
import Provider from '../providers/Provider'
import { ChannelType } from '../config/channels'
import Model, { ModelParams } from '../core/Model'
import Model, { BaseModel, ModelParams } from '../core/Model'
import List from '../lists/List'
import Template from '../render/Template'
import Subscription from '../subscriptions/Subscription'
@ -58,7 +58,7 @@ export type CampaignUpdateParams = Omit<CampaignParams, 'channel' | 'type'>
export type CampaignSendState = 'pending' | 'sent' | 'throttled' | 'failed' | 'bounced' | 'aborted'
export type CampaignSendReferenceType = 'journey' | 'trigger'
export class CampaignSend extends Model {
export class CampaignSend extends BaseModel {
campaign_id!: number
user_id!: number
state!: CampaignSendState

View file

@ -33,8 +33,8 @@ export default class CampaignEnqueueSendsJob extends Job {
// Anything that is ready to be sent, enqueue for sending
const query = campaignSendReadyQuery(campaign.id, includeThrottled, ratePerMinute)
await chunk<{ user_id: number, send_id: number }>(query, 100, async (items) => {
const jobs = items.map(({ user_id, send_id }) => sendCampaignJob({ campaign, user: user_id, send_id }))
await chunk<{ user_id: number, reference_id?: string }>(query, 100, async (items) => {
const jobs = items.map(({ user_id, reference_id }) => sendCampaignJob({ campaign, user: user_id, reference_id }))
await App.main.queue.enqueueBatch(jobs)
})

View file

@ -25,7 +25,7 @@ export default class CampaignInteractJob extends Job {
if (!send) return
if (type === 'opened' && !send.opened_at) {
await updateCampaignSend(send.id, { opened_at: new Date() })
await updateCampaignSend(campaign_id, user_id, reference_id, { opened_at: new Date() })
await App.main.redis.sadd(CacheKeys.pendingStats, campaign_id)
}
@ -34,11 +34,11 @@ export default class CampaignInteractJob extends Job {
if (!send.opened_at) {
updates.opened_at = new Date()
}
await updateCampaignSend(send.id, updates)
await updateCampaignSend(campaign_id, user_id, reference_id, updates)
}
if (type === 'complained' || type === 'bounced') {
await updateCampaignSend(send.id, { state: 'bounced' })
await updateCampaignSend(campaign_id, user_id, reference_id, { state: 'bounced' })
}
if (subscription_id && action === 'unsubscribe') {

View file

@ -189,12 +189,12 @@ interface SendCampaign {
campaign: Campaign
user: User | number
event?: UserEvent | number
send_id?: number
exists?: boolean
reference_type?: CampaignSendReferenceType
reference_id?: string
}
export const triggerCampaignSend = async ({ campaign, user, event, send_id, reference_type, reference_id }: SendCampaign) => {
export const triggerCampaignSend = async ({ campaign, user, event, exists, reference_type, reference_id }: SendCampaign) => {
const userId = user instanceof User ? user.id : user
const eventId = event instanceof UserEvent ? event?.id : event
@ -202,8 +202,8 @@ export const triggerCampaignSend = async ({ campaign, user, event, send_id, refe
if (subscriptionState === SubscriptionState.unsubscribed) return
const reference = { reference_id, reference_type }
if (!send_id) {
send_id = await CampaignSend.insert({
if (!exists) {
await CampaignSend.insert({
campaign_id: campaign.id,
user_id: userId,
state: 'pending',
@ -216,18 +216,16 @@ export const triggerCampaignSend = async ({ campaign, user, event, send_id, refe
campaign,
user: userId,
event: eventId,
send_id,
...reference,
})
}
export const sendCampaignJob = ({ campaign, user, event, send_id, reference_type, reference_id }: SendCampaign): EmailJob | TextJob | PushJob | WebhookJob => {
export const sendCampaignJob = ({ campaign, user, event, reference_type, reference_id }: SendCampaign): EmailJob | TextJob | PushJob | WebhookJob => {
const body = {
campaign_id: campaign.id,
user_id: user instanceof User ? user.id : user,
event_id: event instanceof UserEvent ? event?.id : event,
send_id,
reference_type,
reference_id,
}
@ -239,9 +237,7 @@ export const sendCampaignJob = ({ campaign, user, event, send_id, reference_type
webhook: WebhookJob.from(body),
}
const job = channels[campaign.channel]
if (send_id) {
job.jobId(`sid${send_id}`)
}
job.jobId(`sid_${campaign.id}_${body.user_id}_${body.reference_id}`)
return job
}
@ -290,12 +286,16 @@ export const generateSendList = async (campaign: SentCampaign) => {
throw new RequestError('Unable to send to a campaign that does not have an associated list', 404)
}
// Clear any aborted sends
await clearCampaign(campaign)
// Generate the initial send list
const query = recipientQuery(campaign)
await chunk<CampaignSendParams>(query, 25, async (items) => {
await chunk<CampaignSendParams>(query, 250, async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
.merge(['state', 'send_at'])
.ignore()
}, ({ user_id, timezone }: { user_id: number, timezone: string }) =>
CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone })),
)
@ -312,7 +312,7 @@ export const campaignSendReadyQuery = (
.where('campaign_sends.send_at', '<=', CampaignSend.raw('NOW()'))
.whereIn('campaign_sends.state', includeThrottled ? ['pending', 'throttled'] : ['pending'])
.where('campaign_id', campaignId)
.select('user_id', 'campaign_sends.id AS send_id')
.select('user_id', 'reference_id')
if (limit) query.limit(limit)
return query
}
@ -335,12 +335,12 @@ export const failStalledSends = async (campaign: Campaign) => {
.where('campaign_sends.send_at', '<', subDays(Date.now(), stalledDays))
.where('campaign_sends.state', 'throttled')
.where('campaign_id', campaign.id)
.select('id')
.select('user_id', 'campaign_id')
await chunk(query, 25, async (items) => {
await CampaignSend.query()
.update({ state: 'failed' })
.whereIn('id', items)
}, ({ id }: Pick<CampaignSend, 'id'>) => id)
.whereIn(['user_id', 'campaign_id'], items)
}, ({ user_id, campaign_id }: CampaignSend) => ([user_id, campaign_id]))
}
export const recipientQuery = (campaign: Campaign) => {
@ -395,6 +395,13 @@ export const abortCampaign = async (campaign: Campaign) => {
.update({ state: 'aborted' })
}
export const clearCampaign = async (campaign: Campaign) => {
await CampaignSend.query()
.where('campaign_id', campaign.id)
.whereIn('state', ['pending', 'throttled', 'aborted'])
.delete()
}
export const duplicateCampaign = async (campaign: Campaign) => {
const params: Partial<Campaign> = pick(campaign, ['project_id', 'list_ids', 'exclusion_list_ids', 'provider_id', 'subscription_id', 'channel', 'name', 'type'])
params.name = `Copy of ${params.name}`
@ -444,7 +451,7 @@ export const updateCampaignProgress = async (campaign: Campaign): Promise<void>
await Campaign.update(qb => qb.where('id', campaign.id).where('project_id', campaign.project_id), { state, delivery })
}
export const getCampaignSend = async (campaignId: number, userId: number, referenceId: string) => {
export const getCampaignSend = async (campaignId: number, userId: number, referenceId = '0') => {
return CampaignSend.first(qb => qb
.where('campaign_id', campaignId)
.where('user_id', userId)
@ -452,9 +459,12 @@ export const getCampaignSend = async (campaignId: number, userId: number, refere
)
}
export const updateCampaignSend = async (id: number, update: Partial<CampaignSend>) => {
export const updateCampaignSend = async (campaignId: number, userId: number, referenceId: string, update: Partial<CampaignSend>) => {
await CampaignSend.update(
qb => qb.where('id', id),
qb => qb
.where('campaign_id', campaignId)
.where('user_id', userId)
.where('reference_id', referenceId),
update,
)
}
@ -491,7 +501,7 @@ export const updateCampaignSendEnrollment = async (user: User) => {
.leftJoin('projects', 'projects.id', 'campaigns.project_id')
.where('campaigns.project_id', user.project_id)
.where('campaigns.state', 'scheduled')
.select('campaigns.*', 'campaign_sends.id AS send_id', 'campaign_sends.state AS send_state', 'projects.timezone') as Array<SentCampaign & { send_id: number, send_state: CampaignSendState, timezone: string }>
.select('campaigns.*', 'campaign_sends.user_id', 'campaign_sends.state AS send_state', 'projects.timezone') as Array<SentCampaign & { user_id: number, send_state: CampaignSendState, timezone: string }>
const join = []
const leave = []
@ -502,19 +512,19 @@ export const updateCampaignSendEnrollment = async (user: User) => {
// If user matches recipient query and they aren't already in the
// list, add to send list
if (match && !campaign.send_id) {
if (match && !campaign.user_id) {
join.push(CampaignSend.create(campaign, Project.fromJson({ timezone: campaign.timezone }), user))
}
// If user is not in recipient list but we have a record, remove from
// send list
if (!match && campaign.send_id && campaign.send_state === 'pending') {
leave.push(campaign.send_id)
if (!match && campaign.send_state === 'pending') {
leave.push([campaign.id, campaign.user_id])
}
}
if (leave.length) {
await CampaignSend.query().whereIn('id', leave).delete()
await CampaignSend.query().whereIn(['campaign_id', 'user_id'], leave).delete()
}
if (join.length) {
await CampaignSend.query()

View file

@ -9,21 +9,23 @@ export default class CampaignStateJob extends Job {
static async handler() {
// Fetch anything that is currently running or has finished
// within the last two days or has activity
const openedCampaignIds = await App.main.redis.smembers(CacheKeys.pendingStats)
// Fetch anything that is currently running, has finished
// within the last two days or has activity since last run
const openedCampaignIds = await App.main.redis.smembers(CacheKeys.pendingStats).then(ids => ids.map(parseInt))
const campaigns = await Campaign.query()
.whereIn('state', ['scheduled', 'running'])
.orWhere(function(qb) {
qb.where('state', 'finished')
.where('send_at', '>', subDays(Date.now(), 2))
})
.orWhereIn('id', openedCampaignIds.map(id => parseInt(id, 10))) as Campaign[]
.orWhereIn('id', openedCampaignIds) as Campaign[]
for (const campaign of campaigns) {
await updateCampaignProgress(campaign)
}
await App.main.redis.srem(CacheKeys.pendingStats, ...campaigns.map(c => c.id))
if (campaigns.length) {
await App.main.redis.srem(CacheKeys.pendingStats, ...campaigns.map(c => c.id))
}
}
}

View file

@ -15,16 +15,15 @@ export interface SearchResult<T> {
limit: number
}
export default class Model {
export class BaseModel {
id!: number
created_at: Date = new Date()
updated_at: Date = new Date()
static jsonAttributes: string[] = []
static virtualAttributes: string[] = []
static fromJson<T extends typeof Model>(this: T, json: Partial<InstanceType<T>>): InstanceType<T> {
static fromJson<T extends typeof BaseModel>(this: T, json: Partial<InstanceType<T>>): InstanceType<T> {
const model = new this()
// Remove any value that could conflict with a virtual key
@ -45,7 +44,7 @@ export default class Model {
return (this.constructor as any).toJson(this)
}
static toJson<T extends typeof Model>(this: T, model: any) {
static toJson<T extends typeof BaseModel>(this: T, model: any) {
const json: any = {}
const keys = [...Object.keys(model), ...this.virtualAttributes]
for (const key of keys) {
@ -73,11 +72,11 @@ export default class Model {
return json
}
static query<T extends typeof Model>(this: T, db: Database = App.main.db): Database.QueryBuilder<InstanceType<T>> {
static query<T extends typeof BaseModel>(this: T, db: Database = App.main.db): Database.QueryBuilder<InstanceType<T>> {
return this.table(db)
}
static async first<T extends typeof Model>(
static async first<T extends typeof BaseModel>(
this: T,
query: Query = (qb) => qb,
db: Database = App.main.db,
@ -87,7 +86,7 @@ export default class Model {
return this.fromJson(record)
}
static async find<T extends typeof Model>(
static async find<T extends typeof BaseModel>(
this: T,
id: number | string | undefined,
query: Query = (qb) => qb,
@ -105,21 +104,7 @@ export default class Model {
return this.fromJson(record)
}
static async findMap<T extends typeof Model>(
this: T,
ids: number[],
db: Database = App.main.db,
) {
const m = new Map<number, InstanceType<T>>()
if (!ids.length) return m
const records = await this.all(q => q.whereIn('id', ids), db)
for (const record of records) {
m.set(record.id, record)
}
return m
}
static async all<T extends typeof Model>(
static async all<T extends typeof BaseModel>(
this: T,
query: Query = qb => qb,
db: Database = App.main.db,
@ -128,28 +113,29 @@ export default class Model {
return records.map((item: any) => this.fromJson(item))
}
static async count<T extends typeof Model>(
static async count<T extends typeof BaseModel>(
this: T,
query: Query = qb => qb,
column?: string,
db: Database = App.main.db,
): Promise<number> {
return await query(this.table(db))
.clone()
.clearSelect()
.count(`${this.tableName}.id as C`)
.count(column ? `${column} AS C` : `${this.tableName}.id as C`)
.then(r => r[0].C || 0)
}
static async exists<T extends typeof Model>(
static async exists<T extends typeof BaseModel>(
this: T,
query: Query = qb => qb,
db: Database = App.main.db,
): Promise<boolean> {
const count = await this.count(qb => query(qb).limit(1), db)
const count = await this.count(qb => query(qb).limit(1), '*', db)
return count > 0
}
static async search<T extends typeof Model>(
static async search<T extends typeof BaseModel>(
this: T,
params: PageQueryParams<T>,
query: Query = qb => qb,
@ -245,9 +231,9 @@ export default class Model {
}
}
static async insert<T extends typeof Model>(this: T, data: Partial<InstanceType<T>>, db?: Database): Promise<number>
static async insert<T extends typeof Model>(this: T, data: Partial<InstanceType<T>>[], db?: Database): Promise<number[]>
static async insert<T extends typeof Model>(
static async insert<T extends typeof BaseModel>(this: T, data: Partial<InstanceType<T>>, db?: Database): Promise<number>
static async insert<T extends typeof BaseModel>(this: T, data: Partial<InstanceType<T>>[], db?: Database): Promise<number[]>
static async insert<T extends typeof BaseModel>(
this: T,
data: Partial<InstanceType<T>> | Partial<InstanceType<T>>[] = {},
db: Database = App.main.db,
@ -258,7 +244,7 @@ export default class Model {
return value[0]
}
static async insertAndFetch<T extends typeof Model>(
static async insertAndFetch<T extends typeof BaseModel>(
this: T,
data: Partial<InstanceType<T>> = {},
db: Database = App.main.db,
@ -269,7 +255,7 @@ export default class Model {
return model
}
static async update<T extends typeof Model>(
static async update<T extends typeof BaseModel>(
this: T,
query: Query,
data: Partial<InstanceType<T>> = {},
@ -279,7 +265,7 @@ export default class Model {
return await query(this.table(db)).update(formattedData)
}
static async updateAndFetch<T extends typeof Model>(
static async updateAndFetch<T extends typeof BaseModel>(
this: T,
id: number,
data: Partial<InstanceType<T>> = {},
@ -292,7 +278,7 @@ export default class Model {
return model
}
static async archive<T extends typeof Model>(
static async archive<T extends typeof BaseModel>(
this: T,
id: number,
query: Query = qb => qb,
@ -307,7 +293,7 @@ export default class Model {
return model
}
static async delete<T extends typeof Model>(
static async delete<T extends typeof BaseModel>(
this: T,
query: Query,
db: Database = App.main.db,
@ -315,7 +301,7 @@ export default class Model {
return await query(this.table(db)).delete()
}
static async deleteById<T extends typeof Model>(
static async deleteById<T extends typeof BaseModel>(
this: T,
id: number,
query: Query = qb => qb,
@ -329,7 +315,7 @@ export default class Model {
return count > 0
}
static scroll = async function * <T extends typeof Model>(
static scroll = async function * <T extends typeof BaseModel>(
this: T,
query: Query = qb => qb,
batchSize = 100,
@ -363,7 +349,7 @@ export default class Model {
static raw = raw
static build<T extends typeof Model>(
static build<T extends typeof BaseModel>(
query: Query,
db: Database = App.main.db,
): Database.QueryBuilder<InstanceType<T>> {
@ -380,4 +366,22 @@ export default class Model {
}
}
export default class Model extends BaseModel {
id!: number
static async findMap<T extends typeof Model>(
this: T,
ids: number[],
db: Database = App.main.db,
) {
const m = new Map<number, InstanceType<T>>()
if (!ids.length) return m
const records = await this.all(q => q.whereIn('id', ids), db)
for (const record of records) {
m.set(record.id, record)
}
return m
}
}
export type ModelParams = 'id' | 'created_at' | 'updated_at' | 'parseJson' | 'project_id' | 'toJSON'

View file

@ -1,5 +1,5 @@
import { JSONSchemaType } from 'ajv'
import Model from './Model'
import { BaseModel } from './Model'
export interface PageParams {
limit: number
@ -13,7 +13,7 @@ export interface PageParams {
id?: number[]
}
export interface PageQueryParams<T extends typeof Model> extends PageParams {
export interface PageQueryParams<T extends typeof BaseModel> extends PageParams {
fields?: Array<keyof InstanceType<T> | string>
mode?: 'exact' | 'partial'
}

View file

@ -307,12 +307,12 @@ export class JourneyAction extends JourneyStep {
// defer job construction so that we have the journey_user_step.id value
state.job(async () => {
const send_id = await getCampaignSend(campaign.id, state.user.id, `${userStep.id}`).then(s => s?.id)
const campaignSend = await getCampaignSend(campaign.id, state.user.id, `${userStep.id}`)
const send = triggerCampaignSend({
campaign,
user: state.user,
send_id,
exists: !!campaignSend,
reference_id: `${userStep.id}`,
reference_type: 'journey',
})

View file

@ -1,5 +1,4 @@
export interface MessageTrigger {
send_id?: number
campaign_id: number
user_id: number
event_id?: number

View file

@ -1,7 +1,7 @@
import { JourneyUserStep } from '../journey/JourneyStep'
import App from '../app'
import Campaign, { CampaignSend } from '../campaigns/Campaign'
import { updateSendState } from '../campaigns/CampaignService'
import Campaign from '../campaigns/Campaign'
import { getCampaignSend, updateSendState } from '../campaigns/CampaignService'
import { Channel } from '../config/channels'
import { RateLimitResponse } from '../config/rateLimit'
import { acquireLock } from '../core/Lock'
@ -30,12 +30,12 @@ interface MessageTriggerHydrated<T> {
context: RenderContext
}
export async function loadSendJob<T extends TemplateType>({ campaign_id, user_id, event_id, send_id, reference_type, reference_id }: MessageTrigger): Promise<MessageTriggerHydrated<T> | undefined> {
export async function loadSendJob<T extends TemplateType>({ campaign_id, user_id, event_id, reference_type, reference_id }: MessageTrigger): Promise<MessageTriggerHydrated<T> | undefined> {
const user = await User.find(user_id)
const event = await UserEvent.find(event_id)
const project = await Project.find(user?.project_id)
const send = await CampaignSend.find(send_id)
const send = await getCampaignSend(campaign_id, user_id, reference_id)
// If user or project is deleted, abort and discard job
if (!user || !project) return

View file

@ -1,5 +1,5 @@
import { logger } from '../../config/logger'
import { randomInt, sleep } from '../../utilities'
import { randomInt, sleep, uuid } from '../../utilities'
import { ExternalProviderParams, ProviderControllers, ProviderSchema } from '../Provider'
import { createController } from '../ProviderService'
import { Email } from './Email'
@ -24,6 +24,17 @@ export default class LoggerEmailProvider extends EmailProvider {
if (this.addLatency) await sleep(randomInt())
logger.info(message, 'provider:email:logger')
return {
messageId: uuid(),
messageSize: 0,
messageTime: Date.now(),
envelope: {},
accepted: [message.to],
rejected: [],
pending: [],
response: 'Message sent to logger',
}
}
async verify(): Promise<boolean> {