mirror of
https://fast.feibisi.com/https://github.com/parcelvoy/platform.git
synced 2025-09-12 14:08:50 +08:00
Performance and locking improvements (#212)
This commit is contained in:
parent
3558009bdf
commit
331f30ba1e
16 changed files with 192 additions and 210 deletions
|
@ -12,6 +12,7 @@ import { uuid } from './utilities'
|
|||
import Api from './api'
|
||||
import Worker from './worker'
|
||||
import ErrorHandler from './error/ErrorHandler'
|
||||
import { DefaultRedis, Redis } from './config/redis'
|
||||
|
||||
export default class App {
|
||||
private static $main: App
|
||||
|
@ -62,6 +63,7 @@ export default class App {
|
|||
api?: Api
|
||||
worker?: Worker
|
||||
rateLimiter: RateLimiter
|
||||
redis: Redis
|
||||
#registered: { [key: string | number]: unknown }
|
||||
|
||||
constructor(
|
||||
|
@ -74,6 +76,7 @@ export default class App {
|
|||
) {
|
||||
this.#registered = {}
|
||||
this.rateLimiter = loadRateLimit(env.redis)
|
||||
this.redis = DefaultRedis(env.redis)
|
||||
this.unhandledErrorListener()
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ export type CampaignParams = Omit<Campaign, ModelParams | 'delivery' | 'eventNam
|
|||
export type CampaignCreateParams = Omit<CampaignParams, 'state'>
|
||||
export type CampaignUpdateParams = Omit<CampaignParams, 'channel' | 'type'>
|
||||
|
||||
export type CampaignSendState = 'pending' | 'sent' | 'throttled' | 'failed' | 'bounced' | 'aborted' | 'locked'
|
||||
export type CampaignSendState = 'pending' | 'sent' | 'throttled' | 'failed' | 'bounced' | 'aborted'
|
||||
export class CampaignSend extends Model {
|
||||
campaign_id!: number
|
||||
user_id!: number
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import { Job } from '../queue'
|
||||
import { campaignSendReadyQuery, getCampaign, sendCampaign } from './CampaignService'
|
||||
import { campaignSendReadyQuery, getCampaign, sendCampaignJob } from './CampaignService'
|
||||
import { CampaignJobParams } from './Campaign'
|
||||
import { chunk } from '../utilities'
|
||||
import App from '../app'
|
||||
|
||||
export default class CampaignSendJob extends Job {
|
||||
static $name = 'campaign_send_job'
|
||||
|
@ -13,11 +15,10 @@ export default class CampaignSendJob extends Job {
|
|||
const campaign = await getCampaign(id, project_id)
|
||||
if (!campaign) return
|
||||
|
||||
await campaignSendReadyQuery(campaign.id)
|
||||
.stream(async function(stream) {
|
||||
for await (const { user_id, send_id } of stream) {
|
||||
await sendCampaign({ campaign, user: user_id, send_id })
|
||||
}
|
||||
})
|
||||
const query = campaignSendReadyQuery(campaign.id)
|
||||
await chunk<{ user_id: number, send_id: number }>(query, 100, async (items) => {
|
||||
const jobs = items.map(({ user_id, send_id }) => sendCampaignJob({ campaign, user: user_id, send_id }))
|
||||
await App.main.queue.enqueueBatch(jobs)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,12 +8,11 @@ import Campaign, { CampaignCreateParams, CampaignDelivery, CampaignParams, Campa
|
|||
import { UserList } from '../lists/List'
|
||||
import Subscription from '../subscriptions/Subscription'
|
||||
import { RequestError } from '../core/errors'
|
||||
import App from '../app'
|
||||
import { PageParams } from '../core/searchParams'
|
||||
import { allLists } from '../lists/ListService'
|
||||
import { allTemplates, duplicateTemplate, screenshotHtml, templateInUserLocale, validateTemplates } from '../render/TemplateService'
|
||||
import { getSubscription } from '../subscriptions/SubscriptionService'
|
||||
import { crossTimezoneCopy, pick } from '../utilities'
|
||||
import { chunk, crossTimezoneCopy, pick } from '../utilities'
|
||||
import { getProvider } from '../providers/ProviderRepository'
|
||||
import { createTagSubquery, getTags, setTags } from '../tags/TagService'
|
||||
import { getProject } from '../projects/ProjectService'
|
||||
|
@ -181,7 +180,7 @@ interface SendCampaign {
|
|||
send_id?: number
|
||||
}
|
||||
|
||||
export const sendCampaign = async ({ campaign, user, event, send_id }: SendCampaign): Promise<void> => {
|
||||
export const sendCampaignJob = ({ campaign, user, event, send_id }: SendCampaign): EmailJob | TextJob | PushJob | WebhookJob => {
|
||||
|
||||
// TODO: Might also need to check for unsubscribe in here since we can
|
||||
// do individual sends
|
||||
|
@ -192,12 +191,6 @@ export const sendCampaign = async ({ campaign, user, event, send_id }: SendCampa
|
|||
send_id,
|
||||
}
|
||||
|
||||
// TODO: Should filter out anyone who has already been through this
|
||||
// campaign before
|
||||
|
||||
// TODO: Create a `campaign_send` record for users coming through
|
||||
// this path
|
||||
|
||||
const channels = {
|
||||
email: EmailJob.from(body),
|
||||
text: TextJob.from(body),
|
||||
|
@ -205,7 +198,11 @@ export const sendCampaign = async ({ campaign, user, event, send_id }: SendCampa
|
|||
webhook: WebhookJob.from(body),
|
||||
}
|
||||
|
||||
await App.main.queue.enqueue(channels[campaign.channel])
|
||||
return channels[campaign.channel]
|
||||
}
|
||||
export const sendCampaign = async (data: SendCampaign): Promise<void> => {
|
||||
|
||||
await sendCampaignJob(data).queue()
|
||||
}
|
||||
|
||||
export const updateSendState = async (campaign: Campaign | number, user: User | number, state: CampaignSendState = 'sent') => {
|
||||
|
@ -221,7 +218,7 @@ export const updateSendState = async (campaign: Campaign | number, user: User |
|
|||
|
||||
// If no records were updated then try and create missing record
|
||||
if (records <= 0) {
|
||||
await CampaignSend.query()
|
||||
const records = await CampaignSend.query()
|
||||
.insert({
|
||||
user_id: userId,
|
||||
campaign_id: campaignId,
|
||||
|
@ -229,7 +226,10 @@ export const updateSendState = async (campaign: Campaign | number, user: User |
|
|||
})
|
||||
.onConflict(['user_id', 'list_id'])
|
||||
.merge(['state'])
|
||||
return Array.isArray(records) ? records[0] : records
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
export const generateSendList = async (campaign: SentCampaign) => {
|
||||
|
@ -239,50 +239,27 @@ export const generateSendList = async (campaign: SentCampaign) => {
|
|||
throw new RequestError('Unable to send to a campaign that does not have an associated list', 404)
|
||||
}
|
||||
|
||||
const insertChunk = async (chunk: CampaignSendParams[]) => {
|
||||
const query = recipientQuery(campaign)
|
||||
await chunk<CampaignSendParams>(query, 100, async (items) => {
|
||||
if (chunk.length <= 0) return
|
||||
return await CampaignSend.query()
|
||||
.insert(chunk)
|
||||
await CampaignSend.query()
|
||||
.insert(items)
|
||||
.onConflict(['user_id', 'list_id'])
|
||||
.merge(['state', 'send_at'])
|
||||
}
|
||||
}, ({ user_id, timezone }: { user_id: number, timezone: string }) => ({
|
||||
user_id,
|
||||
campaign_id: campaign.id,
|
||||
state: 'pending',
|
||||
send_at: campaign.send_in_user_timezone
|
||||
? crossTimezoneCopy(
|
||||
campaign.send_at,
|
||||
project.timezone,
|
||||
timezone ?? project.timezone,
|
||||
)
|
||||
: campaign.send_at,
|
||||
}))
|
||||
|
||||
// Stream results so that we aren't overwhelmed by millions
|
||||
// of potential entries
|
||||
let chunk: CampaignSendParams[] = []
|
||||
await recipientQuery(campaign)
|
||||
.stream(async function(stream) {
|
||||
|
||||
// Create records of the send in a pending state
|
||||
// Once sent, each record will be updated accordingly
|
||||
const chunkSize = 100
|
||||
let i = 0
|
||||
for await (const { user_id, timezone } of stream) {
|
||||
chunk.push({
|
||||
user_id,
|
||||
campaign_id: campaign.id,
|
||||
state: 'pending',
|
||||
send_at: campaign.send_in_user_timezone
|
||||
? crossTimezoneCopy(
|
||||
campaign.send_at,
|
||||
project.timezone,
|
||||
timezone ?? project.timezone,
|
||||
)
|
||||
: campaign.send_at,
|
||||
})
|
||||
i++
|
||||
if (i % chunkSize === 0) {
|
||||
await insertChunk(chunk)
|
||||
chunk = []
|
||||
}
|
||||
}
|
||||
})
|
||||
.then(async function() {
|
||||
// Insert remaining items
|
||||
await insertChunk(chunk)
|
||||
|
||||
await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' })
|
||||
})
|
||||
await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' })
|
||||
}
|
||||
|
||||
export const campaignSendReadyQuery = (campaignId: number) => {
|
||||
|
|
|
@ -173,6 +173,7 @@ describe('CampaignService', () => {
|
|||
const list = await createList(params.project_id, {
|
||||
name: uuid(),
|
||||
type: 'static',
|
||||
is_visible: true,
|
||||
})
|
||||
const campaign = await createTestCampaign(params, {
|
||||
list_ids: [list.id],
|
||||
|
@ -201,10 +202,12 @@ describe('CampaignService', () => {
|
|||
const list = await createList(params.project_id, {
|
||||
name: uuid(),
|
||||
type: 'static',
|
||||
is_visible: true,
|
||||
})
|
||||
const list2 = await createList(params.project_id, {
|
||||
name: uuid(),
|
||||
type: 'static',
|
||||
is_visible: true,
|
||||
})
|
||||
const campaign = await createTestCampaign(params, {
|
||||
list_ids: [list.id],
|
||||
|
|
|
@ -1,25 +1,27 @@
|
|||
import { DefaultRedis, Redis, RedisConfig } from './redis'
|
||||
|
||||
// eslint-disable-next-line quotes
|
||||
const incrTtlLuaScript = `redis.call('set', KEYS[1], 0, 'EX', ARGV[2], 'NX') \
|
||||
local consumed = redis.call('incrby', KEYS[1], ARGV[1]) \
|
||||
local ttl = redis.call('pttl', KEYS[1]) \
|
||||
if ttl == -1 then \
|
||||
redis.call('expire', KEYS[1], ARGV[2]) \
|
||||
ttl = 1000 * ARGV[2] \
|
||||
end \
|
||||
return {consumed, ttl} \
|
||||
const slidingRateLimiterLuaScript = `
|
||||
local current_time = redis.call('TIME')
|
||||
local trim_time = tonumber(current_time[1]) - ARGV[1]
|
||||
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, trim_time)
|
||||
local request_count = redis.call('ZCARD', KEYS[1])
|
||||
|
||||
if request_count < tonumber(ARGV[2]) then
|
||||
redis.call('ZADD', KEYS[1], current_time[1], current_time[1] .. current_time[2])
|
||||
redis.call('EXPIRE', KEYS[1], ARGV[1])
|
||||
return {request_count, 0}
|
||||
end
|
||||
return {request_count, 1}
|
||||
`
|
||||
|
||||
interface RateLimitRedis extends Redis {
|
||||
rlflxIncr(key: string, points: number, secDuration: number): Promise<[ consumed: number, ttl: number ]>
|
||||
slidingRateLimiter(key: string, window: number, points: number): Promise<[consumed: number, exceeded: number]>
|
||||
}
|
||||
|
||||
export interface RateLimitResponse {
|
||||
exceeded: boolean
|
||||
pointsRemaining: number
|
||||
pointsUsed: number
|
||||
msRemaining: number
|
||||
}
|
||||
|
||||
export default (config: RedisConfig) => {
|
||||
|
@ -30,20 +32,20 @@ export class RateLimiter {
|
|||
client: RateLimitRedis
|
||||
constructor(config: RedisConfig) {
|
||||
this.client = DefaultRedis(config) as RateLimitRedis
|
||||
this.client.defineCommand('rlflxIncr', {
|
||||
this.client.defineCommand('slidingRateLimiter', {
|
||||
numberOfKeys: 1,
|
||||
lua: incrTtlLuaScript,
|
||||
lua: slidingRateLimiterLuaScript,
|
||||
})
|
||||
}
|
||||
|
||||
async consume(key: string, limit: number, msDuration = 1000): Promise<RateLimitResponse> {
|
||||
const secDuration = Math.floor(msDuration / 1000)
|
||||
const response = await this.client.rlflxIncr(key, 1, secDuration)
|
||||
const window = Math.floor(msDuration / 1000)
|
||||
const [consumed, exceeded] = await this.client.slidingRateLimiter(key, window, limit)
|
||||
|
||||
return {
|
||||
exceeded: response[0] > limit,
|
||||
pointsRemaining: Math.max(0, limit - response[0]),
|
||||
pointsUsed: response[0],
|
||||
msRemaining: response[1],
|
||||
exceeded: exceeded === 1,
|
||||
pointsRemaining: Math.max(0, limit - consumed),
|
||||
pointsUsed: consumed,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,14 +1,13 @@
|
|||
import { cleanupExpiredRevokedTokens } from '../auth/TokenRepository'
|
||||
import { addSeconds, subDays, subHours } from 'date-fns'
|
||||
import { subDays, subHours } from 'date-fns'
|
||||
import nodeScheduler from 'node-schedule'
|
||||
import App from '../app'
|
||||
import CampaignTriggerJob from '../campaigns/CampaignTriggerJob'
|
||||
import JourneyDelayJob from '../journey/JourneyDelayJob'
|
||||
import ProcessListsJob from '../lists/ProcessListsJob'
|
||||
import Model from '../core/Model'
|
||||
import { sleep, randomInt } from '../utilities'
|
||||
import CampaignStateJob from '../campaigns/CampaignStateJob'
|
||||
import UserSchemaSyncJob from '../schema/UserSchemaSyncJob'
|
||||
import { uuid } from '../utilities'
|
||||
|
||||
export default (app: App) => {
|
||||
const scheduler = new Scheduler(app)
|
||||
|
@ -55,10 +54,10 @@ export class Scheduler {
|
|||
|
||||
async schedule({ rule, name, callback, lockLength = 3600 }: Schedule) {
|
||||
nodeScheduler.scheduleJob(rule, async () => {
|
||||
const lock = await SchedulerLock.acquire({
|
||||
const lock = await acquireLock({
|
||||
key: name ?? rule,
|
||||
owner: this.app.uuid,
|
||||
expiration: addSeconds(Date.now(), lockLength),
|
||||
timeout: lockLength,
|
||||
})
|
||||
if (lock) {
|
||||
callback()
|
||||
|
@ -71,82 +70,44 @@ export class Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
class JobLock extends Model {
|
||||
key!: string
|
||||
owner!: string
|
||||
expiration!: Date
|
||||
interface LockParams {
|
||||
key: string
|
||||
owner?: string
|
||||
timeout?: number
|
||||
}
|
||||
|
||||
type LockParams = Pick<JobLock, 'key' | 'owner' | 'expiration'>
|
||||
export const acquireLock = async ({
|
||||
key,
|
||||
owner,
|
||||
timeout = 60,
|
||||
}: LockParams) => {
|
||||
try {
|
||||
const result = await App.main.redis.set(
|
||||
`lock:${key}`,
|
||||
owner ?? uuid(),
|
||||
'EX',
|
||||
timeout,
|
||||
'NX',
|
||||
)
|
||||
|
||||
class SchedulerLock {
|
||||
// Because of the NX condition, value will only be set
|
||||
// if it hasn't been set already (original owner)
|
||||
if (result === null) {
|
||||
|
||||
/**
|
||||
* Attempt to get a lock for a given job so that it is not
|
||||
* executed multiple times.
|
||||
* @param job Partial<JobLock>
|
||||
* @returns Promise<boolean>
|
||||
*/
|
||||
static async acquire({ key, owner, expiration }: LockParams) {
|
||||
let acquired = false
|
||||
try {
|
||||
|
||||
// First try inserting a new lock for the given job
|
||||
await JobLock.insert({
|
||||
key,
|
||||
owner,
|
||||
expiration,
|
||||
})
|
||||
acquired = true
|
||||
} catch (error) {
|
||||
|
||||
// Because of the unique index, duplicate locks for a job
|
||||
// will fail. In which case lets next check if the lock
|
||||
// has expired or if current owner, extend the lock
|
||||
acquired = await this.extendLock({ key, owner, expiration })
|
||||
}
|
||||
|
||||
// Clean up any oddball pending jobs that are missed
|
||||
// Randomly run this job to reduce chance of deadlocks
|
||||
if (randomInt() < 10) {
|
||||
await sleep(randomInt(5, 20))
|
||||
const locks = await JobLock.all(
|
||||
qb => qb.where('expiration', '<=', new Date())
|
||||
.orderBy('id'),
|
||||
)
|
||||
await JobLock.delete(qb => qb.whereIn('id', locks.map(item => item.id)))
|
||||
}
|
||||
|
||||
return acquired
|
||||
}
|
||||
|
||||
static async extendLock({ key, owner, expiration }: LockParams, retry = 3): Promise<boolean> {
|
||||
|
||||
// If out of retries, fail
|
||||
if (retry <= 0) return false
|
||||
|
||||
// Update job can deadlock. In case of deadlock, retry operation
|
||||
// up to three times total before failing.
|
||||
try {
|
||||
const updatedCount = await JobLock.update(
|
||||
qb =>
|
||||
qb.where('key', key)
|
||||
.where((subQb) => {
|
||||
subQb.where('owner', owner)
|
||||
.orWhere('expiration', '<=', new Date())
|
||||
})
|
||||
.orderBy('id'),
|
||||
{
|
||||
owner,
|
||||
expiration,
|
||||
},
|
||||
)
|
||||
return updatedCount > 0
|
||||
} catch {
|
||||
|
||||
// Introduce jitter before trying again
|
||||
await sleep(randomInt(5, 20))
|
||||
return this.extendLock({ key, owner, expiration }, --retry)
|
||||
// Since we know there already is a lock, lets see if
|
||||
// it is this instance that owns it
|
||||
if (owner) {
|
||||
const value = await App.main.redis.get(`lock:${key}`)
|
||||
return value === owner
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
export const releaseLock = async (key: string) => {
|
||||
await App.main.redis.del(`lock:${key}`)
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import ListPopulateJob from './ListPopulateJob'
|
|||
import { importUsers } from '../users/UserImport'
|
||||
import { FileStream } from '../storage/FileStream'
|
||||
import { createTagSubquery, getTags, setTags } from '../tags/TagService'
|
||||
import { chunk } from '../utilities'
|
||||
|
||||
export const pagedLists = async (params: PageParams, projectId: number) => {
|
||||
const result = await List.search(
|
||||
|
@ -169,34 +170,15 @@ export const populateList = async (list: List, rule: Rule) => {
|
|||
const version = oldVersion + 1
|
||||
await updateList(id, { state: 'loading', version })
|
||||
|
||||
type UserListChunk = { user_id: number, list_id: number, version: number }[]
|
||||
const insertChunk = async (chunk: UserListChunk) => {
|
||||
return await UserList.query()
|
||||
.insert(chunk)
|
||||
type UserListChunk = { user_id: number, list_id: number, version: number }
|
||||
const query = ruleQuery(rule)
|
||||
await chunk<UserListChunk>(query, 100, async (items) => {
|
||||
if (items.length <= 0) return
|
||||
await UserList.query()
|
||||
.insert(items)
|
||||
.onConflict(['user_id', 'list_id'])
|
||||
.merge(['version'])
|
||||
}
|
||||
|
||||
await ruleQuery(rule)
|
||||
.where('users.project_id', list.project_id)
|
||||
.stream(async function(stream) {
|
||||
|
||||
// Stream results and insert in chunks of 100
|
||||
const chunkSize = 100
|
||||
let chunk: UserListChunk = []
|
||||
let i = 0
|
||||
for await (const { id: user_id } of stream) {
|
||||
chunk.push({ user_id, list_id: id, version })
|
||||
i++
|
||||
if (i % chunkSize === 0) {
|
||||
await insertChunk(chunk)
|
||||
chunk = []
|
||||
}
|
||||
}
|
||||
|
||||
// Insert remaining items
|
||||
await insertChunk(chunk)
|
||||
})
|
||||
}, ({ id: user_id }: { id: number }) => ({ user_id, list_id: id, version }))
|
||||
|
||||
// Once list is regenerated, drop any users from previous version
|
||||
await UserList.delete(qb => qb
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
import App from '../app'
|
||||
import Campaign, { CampaignSend } from '../campaigns/Campaign'
|
||||
import { updateSendState } from '../campaigns/CampaignService'
|
||||
import { Channel } from '../config/channels'
|
||||
import { RateLimitResponse } from '../config/rateLimit'
|
||||
import { acquireLock } from '../config/scheduler'
|
||||
import Project from '../projects/Project'
|
||||
import { EncodedJob } from '../queue'
|
||||
import { RenderContext } from '../render'
|
||||
|
@ -9,9 +11,8 @@ import Template, { TemplateType } from '../render/Template'
|
|||
import { templateInUserLocale } from '../render/TemplateService'
|
||||
import { User } from '../users/User'
|
||||
import { UserEvent } from '../users/UserEvent'
|
||||
import EmailChannel from './email/EmailChannel'
|
||||
import { randomInt } from '../utilities'
|
||||
import { MessageTrigger } from './MessageTrigger'
|
||||
import TextChannel from './text/TextChannel'
|
||||
|
||||
interface MessageTriggerHydrated<T> {
|
||||
user: User
|
||||
|
@ -34,7 +35,7 @@ export async function loadSendJob<T extends TemplateType>({ campaign_id, user_id
|
|||
|
||||
// If there is a send and it's in an aborted state or has already
|
||||
// sent, abort this job to prevent duplicate sends
|
||||
if (send && (send.state === 'aborted' || send.state === 'sent' || send.state === 'locked')) return
|
||||
if (send && (send.state === 'aborted' || send.state === 'sent')) return
|
||||
|
||||
// Fetch campaign and templates
|
||||
const campaign = await Campaign.find(campaign_id)
|
||||
|
@ -67,8 +68,10 @@ export async function loadSendJob<T extends TemplateType>({ campaign_id, user_id
|
|||
return { campaign, template: template.map() as T, user, project, event, context }
|
||||
}
|
||||
|
||||
export const messageLock = (campaign: Campaign, user: User) => `parcelvoy:send:${campaign.id}:${user.id}`
|
||||
|
||||
export const prepareSend = async <T>(
|
||||
channel: EmailChannel | TextChannel,
|
||||
channel: Channel,
|
||||
message: MessageTriggerHydrated<T>,
|
||||
raw: EncodedJob,
|
||||
): Promise<boolean | undefined> => {
|
||||
|
@ -83,17 +86,20 @@ export const prepareSend = async <T>(
|
|||
// to the queue
|
||||
await updateSendState(campaign, user, 'throttled')
|
||||
|
||||
// Schedule the resend for after the throttle finishes
|
||||
await requeueSend(raw, rateCheck.msRemaining)
|
||||
// Schedule the resend for a jittered number of seconds later
|
||||
const delay = 1000 + randomInt(0, 5000)
|
||||
await requeueSend(raw, delay)
|
||||
return false
|
||||
}
|
||||
|
||||
await updateSendState(campaign, user, 'locked')
|
||||
// Create a lock for this process to make sure it doesn't run twice
|
||||
const acquired = await acquireLock({ key: messageLock(campaign, user) })
|
||||
if (!acquired) return false
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
export const throttleSend = async (channel: EmailChannel | TextChannel): Promise<RateLimitResponse | undefined> => {
|
||||
export const throttleSend = async (channel: Channel): Promise<RateLimitResponse | undefined> => {
|
||||
const provider = channel.provider
|
||||
|
||||
// If no rate limit, just break
|
||||
|
@ -108,5 +114,5 @@ export const throttleSend = async (channel: EmailChannel | TextChannel): Promise
|
|||
|
||||
export const requeueSend = async (job: EncodedJob, delay: number): Promise<void> => {
|
||||
job.options.delay = delay
|
||||
return App.main.queue.enqueue(job)
|
||||
return await App.main.queue.enqueue(job)
|
||||
}
|
||||
|
|
|
@ -2,11 +2,12 @@ import Job from '../../queue/Job'
|
|||
import { createEvent } from '../../users/UserEventRepository'
|
||||
import { MessageTrigger } from '../MessageTrigger'
|
||||
import { updateSendState } from '../../campaigns/CampaignService'
|
||||
import { loadEmailChannel } from '.'
|
||||
import { loadSendJob, prepareSend } from '../MessageTriggerService'
|
||||
import { loadEmailChannel } from './index'
|
||||
import { loadSendJob, messageLock, prepareSend } from '../MessageTriggerService'
|
||||
import { EmailTemplate } from '../../render/Template'
|
||||
import { EncodedJob } from '../../queue'
|
||||
import App from '../../app'
|
||||
import { releaseLock } from '../../config/scheduler'
|
||||
|
||||
export default class EmailJob extends Job {
|
||||
static $name = 'email'
|
||||
|
@ -24,12 +25,13 @@ export default class EmailJob extends Job {
|
|||
// Load email channel so its ready to send
|
||||
const channel = await loadEmailChannel(campaign.provider_id, project.id)
|
||||
if (!channel) {
|
||||
await updateSendState(campaign, user, 'failed')
|
||||
await updateSendState(campaign, user, 'aborted')
|
||||
App.main.error.notify(new Error('Unabled to send when there is no channel available.'))
|
||||
return
|
||||
}
|
||||
|
||||
// Check current send rate and if the send is locked
|
||||
const isReady = prepareSend(channel, data, raw)
|
||||
const isReady = await prepareSend(channel, data, raw)
|
||||
if (!isReady) return
|
||||
|
||||
try {
|
||||
|
@ -39,6 +41,7 @@ export default class EmailJob extends Job {
|
|||
// On error, mark as failed and notify just in case
|
||||
await updateSendState(campaign, user, 'failed')
|
||||
App.main.error.notify(error)
|
||||
return
|
||||
}
|
||||
|
||||
// Update send record
|
||||
|
@ -49,5 +52,7 @@ export default class EmailJob extends Job {
|
|||
name: campaign.eventName('sent'),
|
||||
data: context,
|
||||
})
|
||||
|
||||
await releaseLock(messageLock(campaign, user))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
import { Job } from '../../queue'
|
||||
import { EncodedJob, Job } from '../../queue'
|
||||
import { PushTemplate } from '../../render/Template'
|
||||
import { createEvent } from '../../users/UserEventRepository'
|
||||
import { MessageTrigger } from '../MessageTrigger'
|
||||
import PushError from './PushError'
|
||||
import { disableNotifications } from '../../users/UserRepository'
|
||||
import { updateSendState } from '../../campaigns/CampaignService'
|
||||
import { loadSendJob } from '../MessageTriggerService'
|
||||
import { loadSendJob, messageLock, prepareSend } from '../MessageTriggerService'
|
||||
import { loadPushChannel } from '.'
|
||||
import App from '../../app'
|
||||
import { releaseLock } from '../../config/scheduler'
|
||||
|
||||
export default class PushJob extends Job {
|
||||
static $name = 'push'
|
||||
|
@ -16,7 +17,7 @@ export default class PushJob extends Job {
|
|||
return new this(data)
|
||||
}
|
||||
|
||||
static async handler(trigger: MessageTrigger) {
|
||||
static async handler(trigger: MessageTrigger, raw: EncodedJob) {
|
||||
const data = await loadSendJob<PushTemplate>(trigger)
|
||||
if (!data) return
|
||||
|
||||
|
@ -26,12 +27,13 @@ export default class PushJob extends Job {
|
|||
// Load email channel so its ready to send
|
||||
const channel = await loadPushChannel(campaign.provider_id, project.id)
|
||||
if (!channel) {
|
||||
await updateSendState(campaign, user, 'failed')
|
||||
await updateSendState(campaign, user, 'aborted')
|
||||
return
|
||||
}
|
||||
|
||||
// Lock the send record to prevent duplicate sends
|
||||
await updateSendState(campaign, user, 'locked')
|
||||
// Check current send rate and if the send is locked
|
||||
const isReady = await prepareSend(channel, data, raw)
|
||||
if (!isReady) return
|
||||
|
||||
// Send the push and update the send record
|
||||
await channel.send(template, { user, event, context })
|
||||
|
@ -43,6 +45,8 @@ export default class PushJob extends Job {
|
|||
data: context,
|
||||
})
|
||||
|
||||
await releaseLock(messageLock(campaign, user))
|
||||
|
||||
} catch (error: any) {
|
||||
if (error instanceof PushError) {
|
||||
|
||||
|
|
|
@ -3,8 +3,10 @@ import { TextTemplate } from '../../render/Template'
|
|||
import { createEvent } from '../../users/UserEventRepository'
|
||||
import { MessageTrigger } from '../MessageTrigger'
|
||||
import { updateSendState } from '../../campaigns/CampaignService'
|
||||
import { loadSendJob, prepareSend } from '../MessageTriggerService'
|
||||
import { loadSendJob, messageLock, prepareSend } from '../MessageTriggerService'
|
||||
import { loadTextChannel } from '.'
|
||||
import { releaseLock } from '../../config/scheduler'
|
||||
import App from '../../app'
|
||||
|
||||
export default class TextJob extends Job {
|
||||
static $name = 'text'
|
||||
|
@ -23,12 +25,13 @@ export default class TextJob extends Job {
|
|||
// Send and render text
|
||||
const channel = await loadTextChannel(campaign.provider_id, project.id)
|
||||
if (!channel) {
|
||||
await updateSendState(campaign, user, 'failed')
|
||||
await updateSendState(campaign, user, 'aborted')
|
||||
App.main.error.notify(new Error('Unabled to send when there is no channel available.'))
|
||||
return
|
||||
}
|
||||
|
||||
// Check current send rate and if the send is locked
|
||||
const isReady = prepareSend(channel, data, raw)
|
||||
const isReady = await prepareSend(channel, data, raw)
|
||||
if (!isReady) return
|
||||
|
||||
await channel.send(template, { user, event, context })
|
||||
|
@ -41,5 +44,7 @@ export default class TextJob extends Job {
|
|||
name: campaign.eventName('sent'),
|
||||
data: context,
|
||||
})
|
||||
|
||||
await releaseLock(messageLock(campaign, user))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
import { Job } from '../../queue'
|
||||
import { EncodedJob, Job } from '../../queue'
|
||||
import { MessageTrigger } from '../MessageTrigger'
|
||||
import { WebhookTemplate } from '../../render/Template'
|
||||
import { createEvent } from '../../users/UserEventRepository'
|
||||
import { updateSendState } from '../../campaigns/CampaignService'
|
||||
import { loadSendJob } from '../MessageTriggerService'
|
||||
import { loadSendJob, messageLock, prepareSend } from '../MessageTriggerService'
|
||||
import { loadWebhookChannel } from '.'
|
||||
import { releaseLock } from '../../config/scheduler'
|
||||
|
||||
export default class WebhookJob extends Job {
|
||||
static $name = 'webhook'
|
||||
|
@ -13,7 +14,7 @@ export default class WebhookJob extends Job {
|
|||
return new this(data)
|
||||
}
|
||||
|
||||
static async handler(trigger: MessageTrigger) {
|
||||
static async handler(trigger: MessageTrigger, raw: EncodedJob) {
|
||||
const data = await loadSendJob<WebhookTemplate>(trigger)
|
||||
if (!data) return
|
||||
|
||||
|
@ -22,18 +23,25 @@ export default class WebhookJob extends Job {
|
|||
// Send and render webhook
|
||||
const channel = await loadWebhookChannel(campaign.provider_id, project.id)
|
||||
if (!channel) {
|
||||
await updateSendState(campaign, user, 'failed')
|
||||
await updateSendState(campaign, user, 'aborted')
|
||||
return
|
||||
}
|
||||
|
||||
// Check current send rate and if the send is locked
|
||||
const isReady = await prepareSend(channel, data, raw)
|
||||
if (!isReady) return
|
||||
|
||||
await channel.send(template, { user, event, context })
|
||||
|
||||
// Update send record
|
||||
await updateSendState(campaign, user)
|
||||
|
||||
// Create an event on the user about the email
|
||||
createEvent(user, {
|
||||
await createEvent(user, {
|
||||
name: campaign.eventName('sent'),
|
||||
data: context,
|
||||
})
|
||||
|
||||
await releaseLock(messageLock(campaign, user))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import crypto from 'crypto'
|
|||
import Hashids from 'hashids'
|
||||
import { differenceInSeconds } from 'date-fns'
|
||||
import { utcToZonedTime } from 'date-fns-tz'
|
||||
import { Database } from '../config/database'
|
||||
|
||||
export const pluralize = (noun: string, count = 2, suffix = 's') => `${noun}${count !== 1 ? suffix : ''}`
|
||||
|
||||
|
@ -173,3 +174,28 @@ export function shallowEqual(object1: any, object2: any) {
|
|||
if (keys1.length !== keys2.length) return false
|
||||
return keys1.every(key => object1[key] === object2[key])
|
||||
}
|
||||
|
||||
type ChunkCallback<T> = (chunk: T[]) => Promise<void>
|
||||
|
||||
export const chunk = async <T>(
|
||||
query: Database.QueryBuilder,
|
||||
size = 100,
|
||||
callback: ChunkCallback<T>,
|
||||
modifier: (result: any) => T = (result) => result,
|
||||
) => {
|
||||
let chunk: T[] = []
|
||||
await query.stream(async function(stream) {
|
||||
let i = 0
|
||||
for await (const result of stream) {
|
||||
chunk.push(modifier(result))
|
||||
i++
|
||||
if (i % size === 0) {
|
||||
await callback(chunk)
|
||||
chunk = []
|
||||
}
|
||||
}
|
||||
})
|
||||
.then(async function() {
|
||||
await callback(chunk)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -326,7 +326,7 @@ export interface Campaign {
|
|||
updated_at: string
|
||||
}
|
||||
|
||||
export type CampaignSendState = 'pending' | 'throttled' | 'bounced' | 'sent' | 'failed' | 'locked'
|
||||
export type CampaignSendState = 'pending' | 'throttled' | 'bounced' | 'sent' | 'failed'
|
||||
|
||||
export type CampaignUpdateParams = Partial<Pick<Campaign, 'name' | 'state' | 'list_ids' | 'exclusion_list_ids' | 'subscription_id' | 'tags'>>
|
||||
export type CampaignCreateParams = Pick<Campaign, 'name' | 'type' | 'list_ids' | 'exclusion_list_ids' | 'channel' | 'subscription_id' | 'provider_id' | 'tags'>
|
||||
|
|
|
@ -18,7 +18,6 @@ export const CampaignSendTag = ({ state }: { state: CampaignSendState }) => {
|
|||
bounced: 'error',
|
||||
sent: 'success',
|
||||
failed: 'error',
|
||||
locked: undefined,
|
||||
}
|
||||
|
||||
return <Tag variant={variant[state]}>
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue