chore: switch from database to redis locks (#688)

This commit is contained in:
Chris Anderson 2025-07-23 08:42:35 -04:00 committed by GitHub
parent a8f42ddd58
commit 51b38fd8ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1943 additions and 1448 deletions

View file

@ -5,56 +5,55 @@
"main": "build/index.js",
"types": "build/index.d.ts",
"dependencies": {
"@aws-sdk/client-s3": "^3.171.0",
"@aws-sdk/client-ses": "^3.121.0",
"@aws-sdk/client-sqs": "^3.171.0",
"@aws-sdk/lib-storage": "^3.171.0",
"@bugsnag/js": "^8.2.0",
"@bugsnag/plugin-koa": "^8.2.0",
"@clickhouse/client": "~1.11.1",
"@aws-sdk/client-s3": "^3.850.0",
"@aws-sdk/client-ses": "^3.848.0",
"@aws-sdk/client-sqs": "^3.848.0",
"@aws-sdk/lib-storage": "^3.850.0",
"@bugsnag/js": "^8.4.0",
"@bugsnag/plugin-koa": "^8.4.0",
"@clickhouse/client": "~1.11.2",
"@koa/cors": "^5.0.0",
"@koa/router": "^11.0.1",
"@koa/router": "^11.0.2",
"@ladjs/country-language": "^1.0.3",
"@node-saml/node-saml": "^4.0.5",
"@segment/analytics-node": "^1.0.0-beta.23",
"@sentry/node": "^7.46.0",
"@sentry/utils": "^7.46.0",
"ajv": "^8.11.0",
"@segment/analytics-node": "^1.3.0",
"@sentry/node": "^7.120.3",
"@sentry/utils": "^7.120.3",
"ajv": "^8.17.1",
"ajv-errors": "^3.0.0",
"ajv-formats": "^2.1.1",
"bullmq": "^5.12.14",
"bullmq": "^5.56.5",
"busboy": "^1.6.0",
"csv-parse": "^5.3.3",
"date-fns": "^2.29.2",
"date-fns-tz": "^1.3.7",
"dotenv": "^16.0.1",
"eslint-plugin-import": "^2.29.0",
"csv-parse": "^5.6.0",
"date-fns": "^2.30.0",
"date-fns-tz": "^1.3.8",
"dotenv": "^16.6.1",
"eslint-plugin-import": "^2.32.0",
"eventemitter2": "^6.4.9",
"handlebars": "^4.7.7",
"handlebars": "^4.7.8",
"handlebars-utils": "^1.0.6",
"hashids": "^2.2.10",
"html-to-text": "^9.0.4",
"ioredis": "^5.3.1",
"hashids": "^2.3.0",
"html-to-text": "^9.0.5",
"ioredis": "^5.6.1",
"jsonpath": "^1.1.1",
"jsonwebtoken": "^9.0.0",
"knex": "^2.3.0",
"koa": "^2.13.4",
"jsonwebtoken": "^9.0.2",
"knex": "^2.5.1",
"koa": "^2.16.1",
"koa-body": "5.0.0",
"koa-send": "^5.0.1",
"koa-static": "^5.0.0",
"libphonenumber-js": "^1.10.24",
"mysql2": "^3.11.0",
"node-pushnotifications": "^3.0.0",
"node-schedule": "^2.1.0",
"nodemailer": "^6.9.5",
"libphonenumber-js": "^1.12.10",
"mysql2": "^3.14.2",
"node-pushnotifications": "^3.1.1",
"node-schedule": "^2.1.1",
"nodemailer": "^6.10.1",
"nodemailer-mailgun-transport": "^2.1.5",
"nodemailer-sendgrid": "^1.0.3",
"openid-client": "^5.2.1",
"pino": "^8.1.0",
"openid-client": "^5.7.1",
"pino": "^8.21.0",
"pino-pretty": "^8.1.0",
"posthog-node": "^3.1.2",
"rrule": "2.7.2",
"sqs-consumer": "^7.0.0"
"posthog-node": "^3.6.3",
"rrule": "2.7.2"
},
"scripts": {
"start": "nodemon",
@ -69,34 +68,34 @@
"migration:output": "node ./scripts/output-migration.mjs"
},
"devDependencies": {
"@types/busboy": "^1.5.0",
"@types/html-to-text": "^9.0.0",
"@types/jest": "^28.1.6",
"@types/jsonpath": "^0.2.0",
"@types/jsonwebtoken": "^9.0.5",
"@types/busboy": "^1.5.4",
"@types/html-to-text": "^9.0.4",
"@types/jest": "^28.1.8",
"@types/jsonpath": "^0.2.4",
"@types/jsonwebtoken": "^9.0.10",
"@types/koa": "^2.15.0",
"@types/koa-send": "^4.1.6",
"@types/koa-static": "^4.0.2",
"@types/koa__cors": "^3.3.0",
"@types/koa-static": "^4.0.4",
"@types/koa__cors": "^3.3.1",
"@types/koa__router": "^8.0.11",
"@types/node": "^18.15.11",
"@types/node-pushnotifications": "^1.0.4",
"@types/node-schedule": "^2.1.0",
"@types/nodemailer": "^6.4.4",
"@types/nodemailer-mailgun-transport": "^1.4.3",
"@types/nodemailer-sendgrid": "^1.0.1",
"@types/supertest": "^2.0.12",
"@typescript-eslint/eslint-plugin": "^5.30.5",
"@typescript-eslint/parser": "^5.30.5",
"eslint": "^8.19.0",
"eslint-config-standard": "^17.0.0",
"ioredis-mock": "^8.8.3",
"@types/node": "^18.19.120",
"@types/node-pushnotifications": "^1.0.8",
"@types/node-schedule": "^2.1.8",
"@types/nodemailer": "^6.4.17",
"@types/nodemailer-mailgun-transport": "^1.4.6",
"@types/nodemailer-sendgrid": "^1.0.3",
"@types/supertest": "^2.0.16",
"@typescript-eslint/eslint-plugin": "^5.62.0",
"@typescript-eslint/parser": "^5.62.0",
"eslint": "^8.57.1",
"eslint-config-standard": "^17.1.0",
"ioredis-mock": "^8.9.0",
"jest": "^28.1.3",
"nodemon": "^2.0.19",
"supertest": "^6.3.3",
"ts-jest": "^28.0.7",
"ts-node": "^10.8.2",
"typescript": "^4.9.3"
"nodemon": "^2.0.22",
"supertest": "^6.3.4",
"ts-jest": "^28.0.8",
"ts-node": "^10.9.2",
"typescript": "^4.9.5"
},
"packageManager": "pnpm@10.8.0+sha512.0e82714d1b5b43c74610193cb20734897c1d00de89d0e18420aebc5977fa13d780a9cb05734624e81ebd81cc876cd464794850641c48b9544326b5622ca29971"
}

View file

@ -65,7 +65,7 @@ export type CampaignPopulationProgress = {
export type SentCampaign = Campaign & { send_at: Date }
export type CampaignParams = Omit<Campaign, ModelParams | 'delivery' | 'eventName' | 'templates' | 'lists' | 'exclusion_lists' | 'subscription' | 'provider' | 'journeys' | 'deleted_at' | 'progress'>
export type CampaignParams = Omit<Campaign, ModelParams | 'delivery' | 'eventName' | 'templates' | 'lists' | 'exclusion_lists' | 'subscription' | 'provider' | 'journeys' | 'deleted_at' | 'progress' | 'isAborted' | 'isAbortedOrDraft'>
export type CampaignCreateParams = Omit<CampaignParams, 'state'>
export type CampaignUpdateParams = Omit<CampaignParams, 'channel' | 'type'>

View file

@ -24,7 +24,7 @@ export default (config: ClickhouseConfig) => {
wait_for_async_insert: 0,
async_insert_deduplicate: 1,
async_insert_busy_timeout_ms: 1000,
lightweight_deletes_sync: 0,
lightweight_deletes_sync: '0',
},
})
}

View file

@ -96,14 +96,6 @@ export default (type?: EnvType): Env => {
tls: process.env.REDIS_TLS === 'true',
},
queue: driver<QueueConfig>(process.env.QUEUE_DRIVER, {
sqs: () => ({
queueUrl: process.env.AWS_SQS_QUEUE_URL!,
region: process.env.AWS_REGION!,
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID!,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY!,
},
}),
redis: () => ({
host: process.env.REDIS_HOST!,
port: envInt(process.env.REDIS_PORT, 6379),

View file

@ -44,3 +44,5 @@ export const acquireLock = async ({
/**
 * Release a previously acquired distributed lock by deleting its Redis key.
 * Keys are namespaced under the `lock:` prefix.
 */
export async function releaseLock(key: string) {
    await App.main.redis.del(`lock:${key}`)
}
export class LockError extends Error {}

View file

@ -36,6 +36,10 @@ export default class MemoryQueueProvider implements QueueProvider {
await this.enqueue(job)
}
// Retry a job: the in-memory queue has no native retry mechanism,
// so retrying is simply enqueueing the job again.
async retry(job: Job): Promise<void> {
await this.enqueue(job)
}
start(): void {
if (process.env.NODE_ENV === 'test') return
if (this.loop) return

View file

@ -6,9 +6,8 @@ import Job, { EncodedJob, JobError } from './Job'
import MemoryQueueProvider, { MemoryConfig } from './MemoryQueueProvider'
import QueueProvider, { MetricPeriod, QueueMetric, QueueProviderName } from './QueueProvider'
import RedisQueueProvider, { RedisQueueConfig } from './RedisQueueProvider'
import SQSQueueProvider, { SQSConfig } from './SQSQueueProvider'
export type QueueConfig = SQSConfig | RedisQueueConfig | MemoryConfig | LoggerConfig
export type QueueConfig = RedisQueueConfig | MemoryConfig | LoggerConfig
export interface QueueTypeConfig extends DriverConfig {
driver: QueueProviderName
@ -19,9 +18,7 @@ export default class Queue {
jobs: Record<string, (data: any, raw?: EncodedJob) => Promise<any>> = {}
constructor(config?: QueueConfig) {
if (config?.driver === 'sqs') {
this.provider = new SQSQueueProvider(config, this)
} else if (config?.driver === 'redis') {
if (config?.driver === 'redis') {
this.provider = new RedisQueueProvider(config, this)
} else if (config?.driver === 'memory') {
this.provider = new MemoryQueueProvider(this)
@ -63,6 +60,10 @@ export default class Queue {
await this.provider.delay(job, milliseconds)
}
async retry(job: EncodedJob) {
await this.provider.retry(job)
}
get batchSize() {
return this.provider.batchSize
}

View file

@ -1,7 +1,7 @@
import Queue from './Queue'
import { EncodedJob } from './Job'
export type QueueProviderName = 'sqs' | 'redis' | 'memory' | 'logger'
export type QueueProviderName = 'redis' | 'memory' | 'logger'
export interface Metric {
date: Date
@ -27,6 +27,7 @@ export default interface QueueProvider {
enqueue(job: EncodedJob): Promise<void>
enqueueBatch(jobs: EncodedJob[]): Promise<void>
delay(job: EncodedJob, milliseconds: number): Promise<void>
retry(job: EncodedJob): Promise<void>
start(): void
pause(): Promise<void>
resume(): Promise<void>

View file

@ -1,4 +1,4 @@
import { MetricsTime, Queue as BullQueue, Worker, JobsOptions, DelayedError } from 'bullmq'
import { MetricsTime, Queue as BullQueue, Worker, JobsOptions, DelayedError, WaitingError } from 'bullmq'
import { subMinutes } from 'date-fns'
import { logger } from '../config/logger'
import { batch } from '../utilities'
@ -72,6 +72,19 @@ export default class RedisQueueProvider implements QueueProvider {
throw new DelayedError()
}
/**
 * Re-queue a job for another attempt.
 * Without a Bull job id and worker token we cannot move the existing
 * job, so fall back to enqueueing a fresh copy instead.
 */
async retry(job: EncodedJob): Promise<void> {
if (!job.options.jobId || !job.token) {
await this.enqueue(job)
return
}
const bullJob = await this.bull.getJob(job.options.jobId)
await bullJob?.moveToWait(job.token)
// Throwing WaitingError signals the BullMQ worker that the job was
// deliberately moved back to waiting rather than completed or failed.
throw new WaitingError()
}
private adaptJob(job: EncodedJob): { name: string, data: any, opts: JobsOptions | undefined } {
return {
name: job.name,

View file

@ -1,141 +0,0 @@
import { Consumer } from 'sqs-consumer'
import { logger } from '../config/logger'
import { AWSConfig } from '../core/aws'
import { batch, uuid } from '../utilities'
import Job, { EncodedJob } from './Job'
import Queue, { QueueTypeConfig } from './Queue'
import QueueProvider from './QueueProvider'
import { SQS, Message } from '@aws-sdk/client-sqs'
// Configuration for the AWS SQS queue driver.
export interface SQSConfig extends QueueTypeConfig, AWSConfig {
driver: 'sqs'
// Full URL of the SQS queue to publish to and consume from
queueUrl: string
}
// Queue driver backed by AWS SQS. Publishing uses the AWS SDK v3 SQS
// client directly; consumption is delegated to the `sqs-consumer` library.
export default class SQSQueueProvider implements QueueProvider {
config: SQSConfig
queue: Queue
// Long-poll consumer, created lazily by start()
app?: Consumer
sqs: SQS
// Max entries per SQS SendMessageBatch request (AWS hard limit is 10)
batchSize = 10 as const
constructor(config: SQSConfig, queue: Queue) {
this.queue = queue
this.config = config
this.sqs = new SQS({ region: this.config.region })
}
// Decode an SQS message body back into an EncodedJob
parse(message: Message): EncodedJob {
return JSON.parse(message.Body ?? '')
}
// Publish a single job. Send failures are logged and swallowed,
// not rethrown, so a transient SQS error does not crash the caller.
async enqueue(job: Job): Promise<void> {
try {
const params = {
DelaySeconds: job.options.delay,
MessageBody: JSON.stringify(job),
QueueUrl: this.config.queueUrl,
}
await this.sqs.sendMessage(params)
} catch (error) {
logger.error(error, 'sqs:error:enqueue')
}
}
async enqueueBatch(jobs: Job[]): Promise<void> {
// If the provided array of jobs is larger than the max batch size
// supported by AWS, split it into chunks and send each separately
for (const part of batch(jobs, this.batchSize)) {
try {
const params = {
QueueUrl: this.config.queueUrl,
Entries: part.map(item => ({
Id: uuid(),
MessageBody: JSON.stringify(item),
DelaySeconds: item.options.delay,
})),
}
await this.sqs.sendMessageBatch(params)
} catch (error) {
logger.error(error, 'sqs:error:enqueue')
}
}
}
async delay(job: Job, milliseconds: number): Promise<void> {
// SQS Delay is in seconds, so convert milliseconds to seconds;
// also, SQS caps the delay at 15 minutes (900 seconds)
const seconds = Math.min(Math.floor(milliseconds / 1000), 900)
job.options.delay = seconds
await this.enqueue(job)
}
// Begin long-polling the queue via sqs-consumer
start(): void {
const app = Consumer.create({
queueUrl: this.config.queueUrl,
handleMessage: async (message) => {
await this.queue.dequeue(this.parse(message))
return message
},
handleMessageBatch: async (messages) => {
// Map messages to job operations
const promises = messages.map(message =>
this.queue.dequeue(this.parse(message)),
)
// Execute each job without failing fast; collect per-job outcomes
const results = await Promise.allSettled(promises)
// Return only the messages that succeeded to ack them;
// the rest are left for SQS to redeliver
return messages.reduce((acc: Message[], curr, index) => {
const status = results[index].status === 'fulfilled'
if (status) {
acc.push(curr)
}
return acc
}, [])
},
batchSize: this.batchSize,
sqs: this.sqs,
})
// Catches errors related to the queue / connection
app.on('error', (error, message) => {
if (Array.isArray(message)) {
message.forEach(message => this.queue.errored(error, this.parse(message)))
} else if (message) {
this.queue.errored(error, this.parse(message))
} else {
this.queue.errored(error)
}
})
// Catches errors related to the job itself
app.on('processing_error', (error) => {
logger.error({ error }, 'sqs:error:processing')
})
app.start()
this.app = app
}
pause(): Promise<void> {
this.app?.stop()
return Promise.resolve()
}
resume(): Promise<void> {
this.app?.start()
return Promise.resolve()
}
isRunning(): Promise<boolean> {
return Promise.resolve(this.app?.isRunning ?? false)
}
close(): void {
this.app?.stop()
}
}

View file

@ -2,12 +2,13 @@ import { Job } from '../queue'
import { toggleSubscription } from './SubscriptionService'
import { SubscriptionState } from './Subscription'
import { getUserFromClientId } from '../users/UserRepository'
import { ClientIdentity } from '../client/Client'
type UserUnsubscribeParams = {
external_id: string
project_id: number
subscription_id: number
}
} & ClientIdentity
export default class UnsubscribeJob extends Job {
static $name = 'unsubscribe'

View file

@ -1,11 +1,12 @@
import App from '../app'
import { User, UserInternalParams } from './User'
import { Job } from '../queue'
import { createUser, getUsersFromIdentity, updateUser } from './UserRepository'
import { ClientIdentity } from '../client/Client'
import { ListVersion } from '../lists/List'
import { addUserToList } from '../lists/ListService'
import App from '../app'
import { Transaction } from '../core/Model'
import { EncodedJob } from '../queue/Job'
import { LockError } from '../core/Lock'
interface UserPatchTrigger {
project_id: number
@ -23,36 +24,43 @@ export default class UserPatchJob extends Job {
return new this(data)
}
static async handler(patch: UserPatchTrigger): Promise<User> {
static async handler(patch: UserPatchTrigger, raw: EncodedJob): Promise<User> {
const upsert = async (patch: UserPatchTrigger, tries = 3, trx: Transaction): Promise<User> => {
const app = App.main
const upsert = async (patch: UserPatchTrigger, tries = 3): Promise<User> => {
const { project_id, user: { external_id, anonymous_id, data, ...fields } } = patch
const identity = { external_id, anonymous_id } as ClientIdentity
// Check for existing user
const { anonymous, external } = await getUsersFromIdentity(project_id, identity, trx)
const { anonymous, external } = await getUsersFromIdentity(project_id, identity)
const existing = external ?? anonymous
// If user, update otherwise insert
try {
return existing
? await updateUser(existing, patch.user, anonymous, trx)
? await updateUser(existing, patch.user, anonymous)
: await createUser(project_id, {
...identity,
data,
...fields,
}, trx)
})
} catch (error: any) {
// If record is locked, re-queue the job
if (error instanceof LockError) {
await app.queue.retry(raw)
throw error
}
// If there is an error (such as constraints,
// retry up to three times)
// retry inline up to three times)
if (tries <= 0) throw error
return upsert(patch, --tries, trx)
return upsert(patch, --tries)
}
}
const user = await App.main.db.transaction(async (trx) => {
return await upsert(patch, 1, trx)
})
const user = await upsert(patch)
const {
join_list,

View file

@ -13,6 +13,12 @@ import App from '../app'
import { Device, DeviceParams } from './Device'
import { getDeviceFromIdOrToken, markDevicesAsPushDisabled, userHasPushDevice } from './DeviceRepository'
import Project from '../projects/Project'
import { acquireLock, LockError, releaseLock } from '../core/Lock'
// Builders for user-related Redis key names.
// NOTE(review): releaseLock() prepends "lock:" itself, so the userPatch key
// is stored as "lock:lock:user:<id>:patch" — confirm acquireLock matches.
const CacheKeys = {
    user(id: number) { return `user:${id}` },
    userPatch(id: number) { return `lock:user:${id}:patch` },
}
export const getUser = async (id: number, projectId?: number, trx?: Transaction): Promise<User | undefined> => {
return await User.find(id, qb => {
@ -149,18 +155,35 @@ export const createUser = async (projectId: number, { external_id, anonymous_id,
return user
}
export const updateUser = async (existing: User, params: Partial<User>, anonymous?: User, trx?: Transaction): Promise<User> => {
const { external_id, anonymous_id, data, ...fields } = params
const hasChanges = isUserDirty(existing, params)
if (hasChanges) {
const patchUser = async (fields: Partial<User>, existing: User, trx?: Transaction) => {
// Create a lock to prevent concurrent updates
const key = CacheKeys.userPatch(existing.id)
const acquired = await acquireLock({ key, timeout: 300 })
if (!acquired) throw new LockError()
try {
const after = await User.updateAndFetch(existing.id, {
data: data ? { ...existing.data, ...data } : undefined,
...fields,
...!anonymous ? { anonymous_id } : {},
version: Date.now(),
}, trx)
await User.clickhouse().upsert(after, existing)
return after
} finally {
await releaseLock(key)
}
}
export const updateUser = async (existing: User, params: Partial<User>, anonymous?: User, trx?: Transaction): Promise<User> => {
const { external_id, anonymous_id, data, ...fields } = params
const hasChanges = isUserDirty(existing, params)
if (hasChanges) {
return await patchUser({
data: data ? { ...existing.data, ...data } : undefined,
...fields,
...!anonymous ? { anonymous_id } : {},
version: Date.now(),
}, existing, trx)
}
return existing
}
@ -268,11 +291,9 @@ export const disableNotifications = async (user: User, tokens: string[]): Promis
}
const updateUserDeviceState = async (user: User, hasPushDevice: boolean, trx?: Transaction): Promise<void> => {
const after = await User.updateAndFetch(user.id, {
await patchUser({
has_push_device: hasPushDevice,
version: Date.now(),
}, trx)
await User.clickhouse().upsert(after, user)
}, user, trx)
}
export const getUserEventsForRules = async (