Improvements to row locking (#649)

This commit is contained in:
Chris Anderson 2025-03-09 09:04:04 -05:00 committed by GitHub
parent 4c62f60cf8
commit 7b7a73f912
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 14 additions and 13 deletions

View file

@ -292,7 +292,7 @@ export const generateSendList = async (campaign: SentCampaign) => {
const now = Date.now()
const cacheKey = CacheKeys.populationProgress(campaign)
logger.debug({ campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:started')
logger.info({ campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:started')
let lastId = 0
let isExhausted = false
@ -302,18 +302,14 @@ export const generateSendList = async (campaign: SentCampaign) => {
project,
sinceId: lastId,
callback: async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict()
.ignore()
await cacheIncr(App.main.redis, cacheKey, items.length, 300)
},
limit: 10_000,
}))
logger.debug({ campaign: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress')
logger.info({ campaign: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress')
}
logger.debug({ lastId, campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
logger.info({ lastId, campaignId: campaign.id, elapsed: Date.now() - now }, 'campaign:generate:progress:finished')
await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' })
}
@ -608,7 +604,13 @@ export const recipientPartialQuery = async ({ campaign, project, sinceId, callba
.where('state', SubscriptionState.unsubscribed),
)
await chunk<CampaignSendParams>(query, 250, async (items) => {
await chunk<CampaignSendParams>(query, 1000, async (items) => {
await App.main.db.transaction(async (trx) => {
await CampaignSend.query(trx)
.insert(items)
.onConflict()
.ignore()
}, { isolationLevel: 'read committed' })
await callback(items)
}, ({ user_id, timezone }: { user_id: number, timezone: string }) => {
return CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone }))

View file

@ -15,14 +15,13 @@ export default class ProcessCampaignsJob extends Job {
.where('type', 'blast') as Campaign[]
for (const campaign of campaigns) {
// When pending we need to regenerate send list
// When in loading state we need to regenerate send list
if (campaign.state === 'loading') {
await CampaignGenerateListJob.from(campaign).queue()
// Otherwise lets look through messages that are ready to send
} else {
await CampaignEnqueueSendsJob.from(campaign).queue()
}
// Start looking through messages that are ready to send
await CampaignEnqueueSendsJob.from(campaign).queue()
}
}
}