feat: adds proper provider based limiting and improves journey processing

This commit is contained in:
Chris Anderson 2025-08-11 08:21:52 -05:00
parent cca0277e5f
commit 5b85b581a7
23 changed files with 177 additions and 231 deletions

View file

@ -147,8 +147,8 @@ export default class App {
await this.forceClose('uncaughtException', 'uncaught error', error)
})
process.on('unhandledRejection', async (error, promise) => {
await this.forceClose('unhandledRejection', `uncaught error: ${promise}, reason: ${error}`)
process.on('unhandledRejection', async (error: Error, promise) => {
await this.forceClose('unhandledRejection', `uncaught error: ${promise}, reason: ${error}, stack: ${error?.stack}`)
})
}
}

View file

@ -4,6 +4,9 @@ import { chunk } from '../utilities'
import App from '../app'
import { getProvider } from '../providers/ProviderRepository'
import { ChannelType } from '../config/channels'
import { cacheIncr } from '../config/redis'
import { sendsAvailable } from '../providers/ProviderService'
import Provider from '../providers/Provider'
type CampaignEnqueueSendsJobParams = {
provider_id: number
@ -26,39 +29,11 @@ export default class CampaignEnqueueSendsJob extends Job {
static async handler({ provider_id }: CampaignEnqueueSendsJobParams) {
// Only enqueue the maximum that can be sent for the interval
// this job runs (every minute)
// this job runs (every 15 seconds)
const provider = await getProvider(provider_id)
if (!provider) return
const ratePerMinute = provider?.ratePer('minute')
const ratePerSecond = provider?.ratePer('second')
// Get the number of sends that have been sent this period
// How often should we run this? Every ten seconds?
// backlog = number of ones to still process, can be up to 2x rate limit
// points = how many do we have left for the period
// 10 left this second
// 100 queued
// time remaining? Does it matter?
// If we add 600 to the queue and the rate limit is 10/sec
// ten seconds later how many can we add?
// the window has shifted by 10 seconds so we can add 100 more
// we then need to set how many have been added to the queue
// in the given window so that we can shift it appropriately
const key = 'campaigns:provider:' + provider.id + ':rate_limit'
const response = await App.main.redis
.multi()
.get(key)
.ttl(key)
.exec()
const consumed = parseInt(response?.[0][1] as string) || 0
const ttl = parseInt(response?.[1][1] as string) || 0
const expired = (60 - ttl) * ratePerSecond
const available = ratePerMinute - expired
const available = sendsAvailable(provider)
// Anything that is ready to be sent, enqueue for sending
const query = providerSendReadyQuery(provider_id, available)
@ -72,6 +47,6 @@ export default class CampaignEnqueueSendsJob extends Job {
await App.main.queue.enqueueBatch(jobs)
count += items.length
})
await App.main.redis.set(key, consumed + count, 'EX', 60)
await cacheIncr(Provider.cacheKey.consumed(provider.id), count, 15)
}
}

View file

@ -1,7 +1,6 @@
import { logger } from '../config/logger'
import { Job } from '../queue'
import { CampaignJobParams, SentCampaign } from './Campaign'
import CampaignEnqueueSendsJob from './CampaignEnqueueSendsJob'
import { getCampaign, populateSendList } from './CampaignService'
export default class CampaignGenerateListJob extends Job {
@ -22,11 +21,7 @@ export default class CampaignGenerateListJob extends Job {
logger.info({ campaignId: id }, 'campaign:generate:populating')
await populateSendList(campaign)
logger.info({ campaignId: id }, 'campaign:generate:sending')
await CampaignEnqueueSendsJob.from({
id: campaign.id,
project_id: campaign.project_id,
}).queue()
logger.info({ campaignId: id }, 'campaign:generate:finished')
} catch (error) {
logger.info({ campaignId: id, error }, 'campaign:generate:failed')
throw error

View file

@ -13,7 +13,8 @@ export default class CampaignProcessSendsJob extends Job {
// For each provider, enqueue a job to load up all sends that are
// possible within the given parameters of the provider
const providers = await Provider.all(qb => qb.select('id'))
const groups = ['email', 'push', 'text', 'webhook']
const providers = await Provider.all(qb => qb.whereIn('group', groups).select('id'))
for (const { id } of providers) {
await CampaignEnqueueSendsJob.from({ provider_id: id }).queue()
}

View file

@ -4,14 +4,14 @@ import TextJob from '../providers/text/TextJob'
import EmailJob from '../providers/email/EmailJob'
import { logger } from '../config/logger'
import { User } from '../users/User'
import Campaign, { CampaignCreateParams, CampaignDelivery, campaignEndedStates, CampaignParams, CampaignPopulationProgress, CampaignProgress, CampaignSend, CampaignSendReferenceType, CampaignSendState, CampaignState, SentCampaign } from './Campaign'
import Campaign, { CampaignCreateParams, CampaignDelivery, campaignEndedStates, CampaignParams, CampaignPopulationProgress, CampaignProgress, CampaignSend, CampaignSendParams, CampaignSendReferenceType, CampaignSendState, CampaignState, SentCampaign } from './Campaign'
import List from '../lists/List'
import Subscription, { SubscriptionState } from '../subscriptions/Subscription'
import Subscription from '../subscriptions/Subscription'
import { RequestError } from '../core/errors'
import { PageParams } from '../core/searchParams'
import { allLists } from '../lists/ListService'
import { allTemplates, duplicateTemplate, screenshotHtml, templateInUserLocale, validateTemplates } from '../render/TemplateService'
import { getSubscription, getUserSubscriptionState } from '../subscriptions/SubscriptionService'
import { getSubscription, isUserUnsubscribed } from '../subscriptions/SubscriptionService'
import { batch, chunk, cleanString, pick, shallowEqual } from '../utilities'
import { getProvider } from '../providers/ProviderRepository'
import { createTagSubquery, getTags, setTags } from '../tags/TagService'
@ -29,6 +29,8 @@ import { getJourneysForCampaign } from '../journey/JourneyService'
import { createAuditLog } from '../core/audit/AuditService'
import { WithAdmin } from '../core/audit/Audit'
import { processUsers } from '../users/ProcessUsers'
import { ChannelType } from '../config/channels'
import { canSendImmediately } from '../providers/ProviderService'
export const CacheKeys = {
pendingStats: 'campaigns:pending_stats',
@ -253,13 +255,14 @@ type TriggerCampaign = {
user: User
} & SendCampaign
export const triggerCampaignSend = async ({ campaign, user, exists, reference_type, reference_id }: TriggerCampaign) => {
class CampaignSendError extends Error {}
export const triggerCampaignSend = async ({ campaign, user, exists, reference_type, reference_id }: TriggerCampaign): Promise<EmailJob | TextJob | PushJob | WebhookJob | undefined> => {
// Check if the user can receive the campaign and has not unsubscribed
if (!canSendCampaignToUser(campaign, user)) return
const subscriptionState = await getUserSubscriptionState(user, campaign.subscription_id)
if (subscriptionState === SubscriptionState.unsubscribed) return
const isUnsubscribed = await isUserUnsubscribed(user, campaign.subscription_id)
const hasChannel = isChannelAvailable(campaign.channel, user)
if (!hasChannel || isUnsubscribed) throw new CampaignSendError()
// If the send doesn't already exist, lets create it ahead of scheduling
const reference = { reference_id, reference_type }
@ -273,6 +276,10 @@ export const triggerCampaignSend = async ({ campaign, user, exists, reference_ty
})
}
// Check if the provider queue is able to send right now
const canSend = await canSendImmediately(campaign.provider_id)
if (!canSend) return
return sendCampaignJob({
campaign,
user,
@ -348,10 +355,9 @@ const cleanupSendListGeneration = async (campaign: Campaign) => {
}
const cleanupGenerationCacheKeys = async (campaign: Campaign) => {
const redis = App.main.redis
await cacheDel(redis, CacheKeys.generate(campaign))
await cacheDel(redis, CacheKeys.populationTotal(campaign))
await cacheDel(redis, CacheKeys.populationProgress(campaign))
await cacheDel(CacheKeys.generate(campaign))
await cacheDel(CacheKeys.populationTotal(campaign))
await cacheDel(CacheKeys.populationProgress(campaign))
}
export const populateSendList = async (campaign: SentCampaign) => {
@ -362,7 +368,6 @@ export const populateSendList = async (campaign: SentCampaign) => {
}
const now = Date.now()
const redis = App.main.redis
const oneDay = 86400 // 24 hours in seconds
const progressCacheKey = CacheKeys.populationProgress(campaign)
const totalCacheKey = CacheKeys.populationTotal(campaign)
@ -410,8 +415,8 @@ export const populateSendList = async (campaign: SentCampaign) => {
value: cleanString(user.timezone) ?? project.timezone,
}),
beforeCallback: async (count: number) => {
await cacheSet<number>(redis, progressCacheKey, 0, oneDay)
await cacheSet(redis, totalCacheKey, count, oneDay)
await cacheSet<number>(progressCacheKey, 0, oneDay)
await cacheSet(totalCacheKey, count, oneDay)
// Double check that the campaign hasn't been aborted
const updatedCampaign = await getCampaign(campaign.id, campaign.project_id) as SentCampaign
@ -420,7 +425,7 @@ export const populateSendList = async (campaign: SentCampaign) => {
callback: async (pairs: DataPair[]) => {
const items = pairs.map(({ key, value }) => CampaignSend.create(campaign, project, { id: parseInt(key), timezone: value }))
await insertRows(items)
await cacheIncr(redis, progressCacheKey, items.length, oneDay)
await cacheIncr(progressCacheKey, items.length, oneDay)
},
afterCallback: async () => {
await cleanupSendListGeneration(campaign)
@ -446,7 +451,7 @@ export const campaignSendReadyQuery = (
export const providerSendReadyQuery = (
providerId: number,
limit: number,
limit: number | undefined,
) => {
return CampaignSend.query()
.leftJoin('campaigns', 'campaigns.id', 'campaign_sends.campaign_id')
@ -455,7 +460,7 @@ export const providerSendReadyQuery = (
.where('campaigns.provider_id', providerId)
.whereNotIn('campaigns.state', campaignEndedStates)
.select('campaign_id', 'user_id', 'reference_id', 'campaigns.channel')
.limit(limit)
.when(!!limit, qb => qb.limit(limit || 0))
}
export const failStalledSends = async () => {
@ -542,8 +547,8 @@ export const duplicateCampaign = async (campaign: Campaign, adminId?: number) =>
export const campaignPopulationProgress = async (campaign: Campaign): Promise<CampaignPopulationProgress> => {
return {
complete: await cacheGet<number>(App.main.redis, CacheKeys.populationProgress(campaign)) ?? 0,
total: await cacheGet<number>(App.main.redis, CacheKeys.populationTotal(campaign)) ?? 0,
complete: await cacheGet<number>(CacheKeys.populationProgress(campaign)) ?? 0,
total: await cacheGet<number>(CacheKeys.populationTotal(campaign)) ?? 0,
}
}
@ -624,9 +629,9 @@ export const estimatedSendSize = async (campaign: Campaign) => {
return lists.reduce((acc, list) => (list.users_count ?? 0) + acc, 0)
}
export const canSendCampaignToUser = (campaign: Campaign, user: Pick<User, 'email' | 'phone' | 'has_push_device' | 'devices'>) => {
if (campaign.channel === 'email' && !user.email) return false
if (campaign.channel === 'text' && !user.phone) return false
if (campaign.channel === 'push' && !(user.has_push_device || !!user.devices)) return false
export const isChannelAvailable = (channel: ChannelType, user: Pick<User, 'email' | 'phone' | 'has_push_device'>) => {
if (channel === 'email' && !user.email) return false
if (channel === 'text' && !user.phone) return false
if (channel === 'push' && !user.has_push_device) return false
return true
}

View file

@ -1,43 +0,0 @@
import { Job } from '../queue'
import Campaign from './Campaign'
import CampaignGenerateListJob from './CampaignGenerateListJob'
import CampaignEnqueueSendsJob from './CampaignEnqueueSendsJob'
import { failStalledSends } from './CampaignService'
import Provider from '../providers/Provider'
export default class ProcessCampaignsJob extends Job {
static $name = 'process_campaigns_job'
static from(): ProcessCampaignsJob {
return new this().deduplicationKey(this.$name)
}
static async handler() {
const campaigns = await Campaign.query()
.whereIn('state', ['loading', 'scheduled', 'running'])
.whereNotNull('send_at')
.whereNull('deleted_at')
.where('type', 'blast') as Campaign[]
for (const campaign of campaigns) {
// When in loading state we need to regenerate send list
if (campaign.state === 'loading') {
await CampaignGenerateListJob.from(campaign).queue()
}
// Start looking through messages that are ready to send
await CampaignEnqueueSendsJob.from(campaign).queue()
}
// For each provider, enqueue a job to load up all sends that are
// possible within the given parameters of the provider
const providers = await Provider.all(qb => qb.select('id'))
for (const { id } of providers) {
await CampaignEnqueueSendsJob.from({ provider_id: id }).queue()
}
// Look for items that have stalled out and mark them as failed
await failStalledSends()
}
}

View file

@ -10,7 +10,6 @@ import JourneyDelayJob from '../journey/JourneyDelayJob'
import JourneyProcessJob from '../journey/JourneyProcessJob'
import ListStatsJob from '../lists/ListStatsJob'
import ProcessListsJob from '../lists/ProcessListsJob'
import ProcessCampaignsJob from '../campaigns/ProcessCampaignsJob'
import CampaignEnqueueSendJob from '../campaigns/CampaignEnqueueSendsJob'
import CampaignStateJob from '../campaigns/CampaignStateJob'
import CampaignGenerateListJob from '../campaigns/CampaignGenerateListJob'
@ -26,12 +25,16 @@ import ScheduledEntranceOrchestratorJob from '../journey/ScheduledEntranceOrches
import CampaignAbortJob from '../campaigns/CampaignAbortJob'
import MigrateJob from '../organizations/MigrateJob'
import UnsubscribeJob from '../subscriptions/UnsubscribeJob'
import CampaignProcessSendsJob from '../campaigns/CampaignProcessSendsJob'
import CampaignProcessGenerationJob from '../campaigns/CampaignProcessGenerationJob'
export const jobs = [
CampaignAbortJob,
CampaignGenerateListJob,
CampaignEnqueueSendJob,
CampaignInteractJob,
CampaignProcessSendsJob,
CampaignProcessGenerationJob,
CampaignStateJob,
EmailJob,
EventPostJob,
@ -41,7 +44,6 @@ export const jobs = [
ListStatsJob,
MigrateJob,
ProcessListsJob,
ProcessCampaignsJob,
PushJob,
ScheduledEntranceJob,
ScheduledEntranceOrchestratorJob,

View file

@ -1,4 +1,5 @@
import IORedis, { Redis } from 'ioredis'
import App from '../app'
export interface RedisConfig {
host: string
@ -21,24 +22,24 @@ export const DefaultRedis = ({ port, host, username, password, tls }: RedisConfi
})
}
export const cacheGet = async <T>(redis: Redis, key: string): Promise<T | undefined> => {
export const cacheGet = async <T>(key: string, redis = App.main.redis): Promise<T | undefined> => {
const value = await redis.get(key)
if (!value) return undefined
return JSON.parse(value) as T
}
export const cacheSet = async <T>(redis: Redis, key: string, value: T, ttl?: number) => {
export const cacheSet = async <T>(key: string, value: T, ttl?: number, redis = App.main.redis) => {
await redis.set(key, JSON.stringify(value))
if (ttl) {
await redis.expire(key, ttl)
}
}
export const cacheDel = async (redis: Redis, key: string) => {
export const cacheDel = async (key: string, redis = App.main.redis) => {
return await redis.del(key)
}
export const cacheIncr = async (redis: Redis, key: string, incr = 1, ttl?: number) => {
export const cacheIncr = async (key: string, incr = 1, ttl?: number, redis = App.main.redis) => {
const val = await redis.incrby(key, incr)
if (ttl) {
await redis.expire(key, ttl)
@ -46,7 +47,7 @@ export const cacheIncr = async (redis: Redis, key: string, incr = 1, ttl?: numbe
return val
}
export const cacheDecr = async (redis: Redis, key: string, ttl?: number) => {
export const cacheDecr = async (key: string, ttl?: number, redis = App.main.redis) => {
const val = await redis.decr(key)
if (ttl) {
await redis.expire(key, ttl)
@ -59,9 +60,9 @@ export type DataPair = {
value: string
}
export const cacheBatchHash = async (
redis: Redis,
hashKey: string,
pairs: DataPair[],
redis = App.main.redis,
): Promise<void> => {
const pipeline = redis.pipeline()
@ -78,10 +79,10 @@ export const cacheBatchHash = async (
export type HashScanCallback = (pairs: DataPair[]) => Promise<void> | void
export const cacheBatchReadHashAndDelete = async (
redis: Redis,
hashKey: string,
callback: HashScanCallback,
scanCount = 2500,
redis = App.main.redis,
): Promise<void> => {
let cursor = '0'
@ -112,11 +113,11 @@ export const cacheBatchReadHashAndDelete = async (
await redis.del(hashKey)
}
export const cacheBatchLength = async (redis: Redis, hashKey: string): Promise<number> => {
export const cacheBatchLength = async (hashKey: string, redis = App.main.redis): Promise<number> => {
return await redis.hlen(hashKey)
}
export const cacheHashExists = async (redis: Redis, hashKey: string): Promise<boolean> => {
export const cacheHashExists = async (hashKey: string, redis = App.main.redis): Promise<boolean> => {
const exists = await redis.exists(hashKey)
return exists !== 0
}

View file

@ -2,7 +2,6 @@ import { cleanupExpiredRevokedTokens } from '../auth/TokenRepository'
import { subDays, subHours } from 'date-fns'
import nodeScheduler from 'node-schedule'
import App from '../app'
import ProcessCampaignsJob from '../campaigns/ProcessCampaignsJob'
import JourneyDelayJob from '../journey/JourneyDelayJob'
import ProcessListsJob from '../lists/ProcessListsJob'
import CampaignStateJob from '../campaigns/CampaignStateJob'
@ -12,6 +11,7 @@ import ScheduledEntranceOrchestratorJob from '../journey/ScheduledEntranceOrches
import { acquireLock } from '../core/Lock'
import Queue from '../queue'
import CampaignProcessSendsJob from '../campaigns/CampaignProcessSendsJob'
import CampaignProcessGenerationJob from '../campaigns/CampaignProcessGenerationJob'
export default (app: App, queue: Queue) => {
const scheduler = new Scheduler(app)
@ -19,7 +19,7 @@ export default (app: App, queue: Queue) => {
rule: '* * * * *',
callback: () => {
JourneyDelayJob.enqueueActive(app)
app.queue.enqueue(ProcessCampaignsJob.from())
app.queue.enqueue(CampaignProcessGenerationJob.from())
app.queue.enqueue(CampaignStateJob.from())
app.queue.enqueue(ScheduledEntranceOrchestratorJob.from())
},
@ -44,7 +44,7 @@ export default (app: App, queue: Queue) => {
queue.schedule(
CampaignProcessSendsJob,
'*/15 * * * *', // Every fifteen seconds
'*/15 * * * * *', // Every fifteen seconds
)
return scheduler

View file

@ -1,18 +1,13 @@
import App from '../app'
import { acquireLock, releaseLock } from '../core/Lock'
import { getProject } from '../projects/ProjectService'
import Job from '../queue/Job'
import { Rule } from '../rules/Rule'
import { User } from '../users/User'
import { UserEvent } from '../users/UserEvent'
import { getUserEventsForRules } from '../users/UserRepository'
import { shallowEqual } from '../utilities'
import { getEntranceSubsequentSteps, getJourneyStepChildren, getJourneySteps } from './JourneyRepository'
import { JourneyStep, JourneyStepChild, journeyStepTypes } from './JourneyStep'
import JourneyUserStep from './JourneyUserStep'
type JobOrJobFunc = Job | ((state: JourneyState) => Promise<Job | undefined>)
export class JourneyState {
/**
@ -21,46 +16,34 @@ export class JourneyState {
* @param user target user to run journey for
* @returns promise that resolves when processing ends
*/
public static async resume(entrance: number | JourneyUserStep, user?: User) {
public static async resume(entrance?: number | JourneyUserStep, user?: User) {
// find entrance
if (typeof entrance === 'number') {
entrance = (await JourneyUserStep.find(entrance))!
}
if (!entrance) {
return
}
// Find entrance
entrance = entrance instanceof JourneyUserStep
? entrance
: await JourneyUserStep.find(entrance)
if (!entrance) return
// If step isn't an entrance, find real entrance
if (entrance.entrance_id) {
entrance = (await JourneyUserStep.find(entrance.entrance_id))!
if (!entrance || entrance.entrance_id) {
return
}
entrance = await JourneyUserStep.find(entrance.entrance_id)
if (!entrance || entrance.entrance_id) return
}
// Entrance has already ended
if (entrance.ended_at) {
return
}
if (entrance.ended_at) return
// Find user
if (!user) {
user = await User.find(entrance.user_id)
}
if (!user) {
return
}
if (!user) user = await User.find(entrance.user_id)
if (!user) return
// User-entrance mismatch
if (entrance.user_id !== user.id) {
return
}
if (entrance.user_id !== user.id) return
// Acquire lock to prevent multiple simultaneous runs
const key = `journey:entrance:${entrance.id}`
const acquired = await acquireLock({ key })
if (!acquired) {
return
}
if (!acquired) return
// Load all journey dependencies
const [steps, children, userSteps] = await Promise.all([
@ -80,11 +63,7 @@ export class JourneyState {
}
// Load step dependencies once and cache in state
private _events?: UserEvent[]
private _timezone?: string
// Batch enqueue jobs after processing
private _jobs: JobOrJobFunc[] = []
#timezone?: string
constructor(
public readonly entrance: JourneyUserStep,
@ -103,7 +82,7 @@ export class JourneyState {
if (userStep.step_id !== step.id) {
// create a placeholder for new step
// Create a placeholder for new step
this.userSteps.push(userStep = JourneyUserStep.fromJson({
journey_id: this.entrance.journey_id,
entrance_id: this.entrance.id,
@ -113,7 +92,7 @@ export class JourneyState {
}))
}
// continue on if this step is completed
// Continue on if this step is completed
if (userStep.type === 'completed') {
step = await this.nextOrEnd(step)
continue
@ -121,14 +100,16 @@ export class JourneyState {
const copy = { ...userStep }
// delegate to step type
// Delegate to step type
let processData: Record<string, any> | undefined
try {
await step.process(this, userStep)
const data = await step.process(this, userStep)
if (data) processData = data
} catch (err) {
userStep.type = 'error'
}
// persist and update the user step
// Persist and update the user step
if (userStep.id) {
// only update the step is something has changed
if (!shallowEqual(copy, userStep)) {
@ -138,6 +119,9 @@ export class JourneyState {
userStep.parseJson(await JourneyUserStep.insertAndFetch(userStep))
}
// Process post actions
await step.postProcess(this, userStep, processData)
// Stop processing if latest isn't completed
if (userStep.type !== 'completed') {
// Exit journey completely if a catastrophic error
@ -148,19 +132,6 @@ export class JourneyState {
break
}
}
if (this._jobs.length) {
const jobs: Job[] = []
for (let j of this._jobs) {
if (typeof j === 'function') {
const i = await j(this)
if (!i) continue
j = i
}
jobs.push(j)
}
await App.main.queue.enqueueBatch(jobs)
}
}
private async nextOrEnd(step: JourneyStep) {
@ -170,7 +141,7 @@ export class JourneyState {
const step = this.steps.find(s => s.id === stepId)
if (step) {
if (this.userSteps.find(s => s.step_id === step.id)) {
// circular reference, this step has already visited
// Circular reference, this step has already visited
await this.end()
return
}
@ -191,23 +162,19 @@ export class JourneyState {
return this.children.filter(sc => sc.step_id === stepId)
}
public job(job: JobOrJobFunc) {
this._jobs.push(job)
}
public async events(rule: Rule) {
// TODO: Find a way to not have to pull in all events, better discern
return await getUserEventsForRules(this.user.id, rule)
}
public async timezone() {
if (!this._timezone) {
this._timezone = this.user.timezone
if (!this.#timezone) {
this.#timezone = this.user.timezone
}
if (!this._timezone) {
this._timezone = (await getProject(this.user.project_id))!.timezone
if (!this.#timezone) {
this.#timezone = (await getProject(this.user.project_id))!.timezone
}
return this._timezone!
return this.#timezone!
}
public stepData() {

View file

@ -16,6 +16,8 @@ import { exitUserFromJourney, getJourneyUserStepByExternalId } from './JourneyRe
import JourneyUserStep from './JourneyUserStep'
import Journey from './Journey'
type JourneyStepProcessData = Record<string, any>
export class JourneyStepChild extends Model {
step_id!: number
@ -58,10 +60,15 @@ export class JourneyStep extends Model {
return this.data_key ?? this.id.toString()
}
async process(state: JourneyState, userStep: JourneyUserStep): Promise<void> {
async process(state: JourneyState, userStep: JourneyUserStep): Promise<JourneyStepProcessData | void> {
userStep.type = 'completed'
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async postProcess(state: JourneyState, userStep: JourneyUserStep, processData: JourneyStepProcessData | undefined): Promise<void> {
return Promise.resolve()
}
async next(state: JourneyState): Promise<undefined | number> {
return state.childrenOf(this.id)[0]?.child_id
}
@ -302,7 +309,7 @@ export class JourneyAction extends JourneyStep {
this.campaign_id = json?.data?.campaign_id
}
async process(state: JourneyState, userStep: JourneyUserStep): Promise<void> {
async process(state: JourneyState, userStep: JourneyUserStep): Promise<JourneyStepProcessData | void> {
const campaign = await getCampaign(this.campaign_id, state.user.project_id)
@ -324,24 +331,27 @@ export class JourneyAction extends JourneyStep {
userStep.type = 'action'
// defer job construction so that we have the journey_user_step.id value
state.job(async () => {
const campaignSend = await getCampaignSend(campaign.id, state.user.id, `${userStep.id}`)
return { campaign }
}
const send = triggerCampaignSend({
async postProcess(state: JourneyState, userStep: JourneyUserStep, processData: JourneyStepProcessData | undefined): Promise<void> {
if (!processData || userStep.type === 'completed') return
const { campaign } = processData || {}
const campaignSend = await getCampaignSend(campaign.id, state.user.id, `${userStep.id}`)
try {
const job = await triggerCampaignSend({
campaign,
user: state.user,
exists: !!campaignSend,
reference_id: `${userStep.id}`,
reference_type: 'journey',
})
if (!send) {
userStep.type = 'error'
}
return send
})
if (job) await job.queue()
} catch {
userStep.type = 'error'
}
}
}

View file

@ -87,6 +87,9 @@ export default class Provider extends Model {
rateLimit(id: number, period: RateInterval = 'minute') {
return `providers:${id}:ratelimit:${period}`
},
consumed(id: number) {
return `providers:${id}:consumed`
},
}
}

View file

@ -5,6 +5,7 @@ import { JSONSchemaType, validate } from '../core/validate'
import Provider, { ProviderControllers, ProviderGroup, ProviderMeta, ProviderParams } from './Provider'
import { createProvider, getProvider, loadProvider, updateProvider } from './ProviderRepository'
import App from '../app'
import { cacheGet } from '../config/redis'
export const allProviders = async (projectId: number) => {
return await Provider.all(qb => qb.where('project_id', projectId).whereNull('deleted_at'))
@ -31,6 +32,32 @@ export const archiveProvider = async (id: number, projectId: number) => {
return getProvider(id, projectId)
}
export const sendsAvailable = (provider: Provider): number | undefined => {
// Number of seconds between bulk sends
const interval = 15
// If there is no rate limit, return undefined (unlimited)
if (!provider.rate_limit) return undefined
// Return number of sends available over interval
return provider.ratePer('second') * interval
}
export const canSendImmediately = async (providerId: number): Promise<boolean> => {
// Get the provider
const provider = await getProvider(providerId)
if (!provider) return false
// Check is there are sends available (undefined means unlimited)
const available = sendsAvailable(provider)
if (!available) return true
// Compare to the number of sends already consumed over time period
const consumed = await cacheGet<number>(Provider.cacheKey.consumed(provider.id)) || 0
return available > consumed
}
export const loadController = (routers: ProviderControllers, provider: typeof Provider): ProviderMeta => {
const { admin: adminRouter, public: publicRouter } = provider.controllers()
if (routers.admin && adminRouter) {

View file

@ -2,7 +2,6 @@ import { WebhookTemplate } from '../../render/Template'
import { Variables } from '../../render'
import { WebhookProvider } from './WebhookProvider'
import { WebhookResponse } from './Webhook'
import App from '../../app'
import { cacheGet, cacheSet } from '../../config/redis'
export default class WebhookChannel {
@ -18,16 +17,15 @@ export default class WebhookChannel {
async send(template: WebhookTemplate, variables: Variables): Promise<WebhookResponse> {
const message = template.compile(variables)
const redis = App.main.redis
// If we have a cache key, check cache first
if (message.cacheKey?.length) {
const key = `wh:${variables.context.campaign_id}:${message.cacheKey}`
const value = await cacheGet<WebhookResponse>(redis, key)
const value = await cacheGet<WebhookResponse>(key)
if (value) return value
const response = await this.provider.send(message)
await cacheSet(redis, key, response, 3600)
await cacheSet(key, response, 3600)
return response
}

View file

@ -1,4 +1,3 @@
import { Scheduler } from '../config/scheduler'
import { sleep, uuid } from '../utilities'
import Job from './Job'
import Queue, { QueueTypeConfig } from './Queue'
@ -32,7 +31,7 @@ export default class MemoryQueueProvider implements QueueProvider {
for (const job of jobs) this.enqueue(job)
}
async schedule(job: typeof Job, cron: string): Promise<void> {
async schedule(_job: typeof Job, _cron: string): Promise<void> {
throw new Error('MemoryQueueProvider does not support scheduling jobs.')
}

View file

@ -29,6 +29,7 @@ export default class Queue {
async dequeue(job: EncodedJob): Promise<boolean> {
if (!job || !job.name) return false
const handler = this.jobs[job.name]
if (!handler) {
App.main.error.notify(new Error(`No handler found for job: ${job.name}`))
@ -73,8 +74,8 @@ export default class Queue {
this.jobs[job.$name] = job.handler
}
schedule(job: typeof Job, cron: string) {
this.provider.schedule(job, cron)
async schedule(job: typeof Job, cron: string) {
await this.provider.schedule(job, cron)
}
async started(job: EncodedJob) {

View file

@ -57,7 +57,7 @@ export default class RedisQueueProvider implements QueueProvider {
async schedule(job: typeof Job, cron: string): Promise<void> {
await this.bull.upsertJobScheduler(
job.name,
job.$name,
{ pattern: cron },
{
name: job.$name,
@ -114,6 +114,7 @@ export default class RedisQueueProvider implements QueueProvider {
this.worker = new Worker('parcelvoy', async (job, token) => {
await this.queue.dequeue({
...job.data,
name: job.name ?? job.data.name,
options: {
...job.data.options,
jobId: job.id,

View file

@ -34,19 +34,21 @@ router.get('/.well-known/:file', async ctx => {
const organization = ctx.state.organization
const url = organization?.tracking_deeplink_mirror_url
const file = ctx.params.file
if (!url) {
const whitelist = ['apple-app-site-association', 'assetlinks.json']
if (!url || !whitelist.includes(file)) {
ctx.status = 404
return
}
const key = `well-known:${organization.id}:${file}`
const value = await cacheGet<any>(App.main.redis, key)
const value = await cacheGet<any>(key)
if (value) {
ctx.body = value
} else {
const response = await fetch(`${url}/.well-known/${file}`)
const value = await response.json()
await cacheSet(App.main.redis, key, value, 60 * 60 * 5)
await cacheSet(key, value, 60 * 60 * 5)
ctx.body = value
}
})

View file

@ -14,7 +14,7 @@ const CacheKeys = {
*/
export const fetchAndCompileRule = async (rootId: number): Promise<RuleTree> => {
const cache = await cacheGet<RuleTree>(App.main.redis, CacheKeys.ruleTree(rootId))
const cache = await cacheGet<RuleTree>(CacheKeys.ruleTree(rootId))
if (cache) return cache
const root = await App.main.db('rule').where('id', rootId).first()
@ -22,7 +22,7 @@ export const fetchAndCompileRule = async (rootId: number): Promise<RuleTree> =>
const rules = await App.main.db('rule').where('root_uuid', root!.uuid)
const compiled = compileRule(root, rules)
await cacheSet(App.main.redis, CacheKeys.ruleTree(rootId), compiled, 3600)
await cacheSet(CacheKeys.ruleTree(rootId), compiled, 3600)
return compiled
}

View file

@ -380,7 +380,6 @@ export const subscriptionUpdateSchema: JSONSchemaType<SubscriptionUpdateParams>
}
router.patch('/:subscriptionId', async ctx => {
const payload = validate(subscriptionUpdateSchema, ctx.request.body)
console.log(payload)
ctx.body = await updateSubscription(ctx.state.subscription!.id, payload)
})

View file

@ -41,6 +41,11 @@ export const getUserSubscriptionState = async (user: User | number, subscription
return fetchedUser?.subscriptionState(subscriptionId)
}
export const isUserUnsubscribed = async (user: User | number, subscriptionId: number): Promise<boolean> => {
const state = await getUserSubscriptionState(user, subscriptionId)
return state === SubscriptionState.unsubscribed
}
export const allSubscriptions = async (projectId: number, channels?: ChannelType[]) => {
return await Subscription.all(
qb => {

View file

@ -1,5 +1,4 @@
import { Chunker } from '../utilities'
import App from '../app'
import { logger } from '../config/logger'
import { cacheBatchHash, cacheBatchLength, cacheBatchReadHashAndDelete, cacheDel, cacheGet, cacheHashExists, cacheSet, DataPair, HashScanCallback } from '../config/redis'
import { User } from './User'
@ -22,22 +21,21 @@ export const processUsers = async ({
afterCallback,
}: CachedQueryParams) => {
const redis = App.main.redis
const hashKey = cacheKey
const hashKeyReady = `${hashKey}:ready`
const hashExists = await cacheHashExists(redis, hashKey)
const isReady = await cacheGet(redis, hashKeyReady)
const hashExists = await cacheHashExists(hashKey)
const isReady = await cacheGet(hashKeyReady)
const processFromCache = async () => {
logger.info({
key: hashKey,
count: await cacheBatchLength(redis, hashKey),
count: await cacheBatchLength(hashKey),
}, 'users:generate:loading:started')
await cacheBatchReadHashAndDelete(redis, hashKey, callback)
await cacheBatchReadHashAndDelete(hashKey, callback)
await afterCallback?.()
await cacheDel(redis, hashKeyReady)
await cacheDel(redis, hashKey)
await cacheDel(hashKeyReady)
await cacheDel(hashKey)
logger.info({ key: hashKey }, 'users:generate:loading:finished')
}
@ -69,7 +67,7 @@ export const processUsers = async ({
let count = 0
const chunker = new Chunker<DataPair>(async pairs => {
count += pairs.length
await cacheBatchHash(redis, hashKey, pairs)
await cacheBatchHash(hashKey, pairs)
}, 2500)
// Stream the data from ClickHouse and pass it to the Redis chunker
@ -90,7 +88,7 @@ export const processUsers = async ({
const shouldContinue = await beforeCallback(count)
if (!shouldContinue) return
await cacheSet(redis, hashKeyReady, 1, 86400)
await cacheSet(hashKeyReady, 1, 86400)
// Now that we have results, pass them back to the callback
await processFromCache()

View file

@ -13,13 +13,13 @@ import MigrateJob from '../organizations/MigrateJob'
export const migrateToClickhouse = async () => {
const jobs = []
const shouldMigrateUsers = await cacheGet<boolean>(App.main.redis, 'migration:users') ?? false
const shouldMigrateUsers = await cacheGet<boolean>('migration:users') ?? false
if (shouldMigrateUsers) jobs.push(MigrateJob.from({ type: 'users' }).jobId('migrate_users'))
const shouldMigrateEvents = await cacheGet<boolean>(App.main.redis, 'migration:events') ?? false
const shouldMigrateEvents = await cacheGet<boolean>('migration:events') ?? false
if (shouldMigrateEvents) jobs.push(MigrateJob.from({ type: 'events' }).jobId('migrate_events'))
const shouldMigrateLists = await cacheGet<boolean>(App.main.redis, 'migration:lists') ?? false
const shouldMigrateLists = await cacheGet<boolean>('migration:lists') ?? false
if (shouldMigrateLists) jobs.push(MigrateJob.from({ type: 'lists' }).jobId('migrate_lists'))
await App.main.queue.enqueueBatch(jobs)
}
@ -60,7 +60,7 @@ export const migrateUsers = async (since?: Date, id?: number) => {
await chunker.flush()
logger.info('parcelvoy:migration users finished')
await cacheDel(App.main.redis, 'migration:users')
await cacheDel('migration:users')
}
export const migrateEvents = async (since?: Date) => {
@ -85,7 +85,7 @@ export const migrateEvents = async (since?: Date) => {
await chunker.flush()
logger.info('parcelvoy:migration events finished')
await cacheDel(App.main.redis, 'migration:events')
await cacheDel('migration:events')
}
export const migrateStaticList = async ({ id, project_id }: List) => {
@ -134,5 +134,5 @@ export const migrateLists = async () => {
}
logger.info('parcelvoy:migration lists finished')
await cacheDel(App.main.redis, 'migration:lists')
await cacheDel('migration:lists')
}