Adds batching capability to all job queues (#101)

This commit is contained in:
Chris Anderson 2023-03-27 14:24:05 -05:00 committed by GitHub
parent bd472b9244
commit b920a27f9c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 81 additions and 7 deletions

View file

@ -7,6 +7,7 @@ import { aliasUser } from '../users/UserRepository'
import { ProjectState } from '../auth/AuthMiddleware'
import { projectMiddleware } from '../projects/ProjectController'
import UserPatchJob from '../users/UserPatchJob'
import { Job } from '../queue'
const router = new Router<ProjectState>()
router.use(projectMiddleware)
@ -63,6 +64,8 @@ const segmentEventsRequest: JSONSchemaType<SegmentPostEventsRequest> = {
router.post('/segment', async ctx => {
const events = validate(segmentEventsRequest, ctx.request.body)
let chunks: Job[] = []
for (const event of events) {
const identity = {
anonymous_id: event.anonymousId,
@ -72,7 +75,7 @@ router.post('/segment', async ctx => {
await aliasUser(ctx.state.project.id, identity)
} else if (event.type === 'identify') {
await App.main.queue.enqueue(UserPatchJob.from({
chunks.push(UserPatchJob.from({
project_id: ctx.state.project.id,
user: {
...identity,
@ -83,7 +86,7 @@ router.post('/segment', async ctx => {
}))
} else if (event.type === 'track') {
await App.main.queue.enqueue(EventPostJob.from({
chunks.push(EventPostJob.from({
project_id: ctx.state.project.id,
event: {
...identity,
@ -92,6 +95,18 @@ router.post('/segment', async ctx => {
},
}))
}
// Based on queue max batch size, process in largest chunks
// possible
if (chunks.length > App.main.queue.batchSize) {
await App.main.queue.enqueueBatch(chunks)
chunks = []
}
}
// Insert any remaining items
if (chunks.length > 0) {
await App.main.queue.enqueueBatch(chunks)
}
ctx.status = 204

View file

@ -38,7 +38,6 @@ const connect = (config: DatabaseConfig, withDB = true) => {
}
const migrate = async (config: DatabaseConfig, db: Database, fresh = false) => {
console.warn(fresh, config)
if (fresh) await db.raw(`CREATE DATABASE ${config.connection.database}`)
return db.migrate.latest({
directory: './db/migrations',

View file

@ -11,6 +11,7 @@ export default class MemoryQueueProvider implements QueueProvider {
queue: Queue
backlog: Job[] = []
loop: NodeJS.Timeout | undefined
batchSize = 1000 as const
constructor(queue: Queue) {
this.queue = queue
@ -21,6 +22,10 @@ export default class MemoryQueueProvider implements QueueProvider {
this.backlog.push(job)
}
async enqueueBatch(jobs: Job[]): Promise<void> {
this.backlog.push(...jobs)
}
start(): void {
if (process.env.NODE_ENV === 'test') return
if (this.loop) return

View file

@ -40,6 +40,14 @@ export default class Queue {
return await this.provider.enqueue(job)
}
// Enqueue many jobs in one call by delegating to the configured provider.
// Providers re-chunk internally as needed (see their `batchSize`).
async enqueueBatch(jobs: Job[]): Promise<void> {
return await this.provider.enqueueBatch(jobs)
}
// Maximum number of jobs the active provider accepts per batch
// (e.g. memory: 1000, redis: 50, sqs: 10). Callers use this to size chunks.
get batchSize() {
return this.provider.batchSize
}
register(job: typeof Job) {
this.jobs[job.$name] = job.handler
}

View file

@ -5,6 +5,8 @@ export type QueueProviderName = 'sqs' | 'redis' | 'memory' | 'logger'
// Contract every queue backend (sqs, redis, memory, logger) must implement.
export default interface QueueProvider {
// Back-reference to the owning Queue facade.
queue: Queue
// Largest number of jobs this backend accepts in a single batch call.
batchSize: number
// Enqueue a single job.
enqueue(job: Job): Promise<void>
// Enqueue many jobs at once; implementations re-chunk by `batchSize`.
enqueueBatch(jobs: Job[]): Promise<void>
// Release any resources held by the backend (consumers, timers, connections).
close(): void
}

View file

@ -1,5 +1,6 @@
import { Queue as BullQueue, Worker } from 'bullmq'
import { logger } from '../config/logger'
import { batch } from '../utilities'
import Job from './Job'
import Queue, { QueueTypeConfig } from './Queue'
import QueueProvider from './QueueProvider'
@ -16,6 +17,7 @@ export default class RedisQueueProvider implements QueueProvider {
queue: Queue
bull: BullQueue
worker: Worker
batchSize = 50 as const
constructor(config: RedisConfig, queue: Queue) {
this.queue = queue
@ -31,13 +33,28 @@ export default class RedisQueueProvider implements QueueProvider {
// Enqueue a single job onto the underlying BullMQ queue.
// Failures are logged rather than rethrown so a transient Redis error
// does not interrupt the caller.
async enqueue(job: Job): Promise<void> {
    try {
        const { name, data, opts } = this.adaptJob(job)
        await this.bull.add(name, data, opts)
    } catch (error) {
        // Fixed log tag: this is the redis provider, not sqs (copy-paste bug).
        logger.error(error, 'redis:error:enqueue')
    }
}
// Enqueue many jobs at once, re-chunked to this provider's batch limit.
// Each chunk is guarded by try/catch (consistent with `enqueue` and the
// SQS provider) so one failed bulk add is logged and the rest still run.
async enqueueBatch(jobs: Job[]): Promise<void> {
    for (const part of batch(jobs, this.batchSize)) {
        try {
            await this.bull.addBulk(part.map(item => this.adaptJob(item)))
        } catch (error) {
            logger.error(error, 'redis:error:enqueue')
        }
    }
}
// Convert an internal Job into the { name, data, opts } triple that
// BullMQ's add/addBulk expect. The whole job is stored as the payload.
private adaptJob(job: Job) {
    return {
        name: job.name,
        data: job,
        opts: {
            // Keep only the 50 most recent completed/failed entries in Redis.
            removeOnComplete: 50,
            removeOnFail: 50,
            // NOTE(review): assumes job.options.delay uses BullMQ's unit
            // (milliseconds) — confirm against Job.options.
            delay: job.options.delay,
        },
    }
}

View file

@ -2,6 +2,7 @@ import * as AWS from '@aws-sdk/client-sqs'
import { Consumer } from '@rxfork/sqs-consumer'
import { logger } from '../config/logger'
import { AWSConfig } from '../core/aws'
import { batch, uuid } from '../utilities'
import Job, { EncodedJob } from './Job'
import Queue, { QueueTypeConfig } from './Queue'
import QueueProvider from './QueueProvider'
@ -17,6 +18,7 @@ export default class SQSQueueProvider implements QueueProvider {
queue: Queue
app: Consumer
sqs: AWS.SQS
batchSize = 10 as const
constructor(config: SQSConfig, queue: Queue) {
this.queue = queue
@ -61,6 +63,24 @@ export default class SQSQueueProvider implements QueueProvider {
}
}
async enqueueBatch(jobs: Job[]): Promise<void> {
for (const part of batch(jobs, this.batchSize)) {
try {
const params = {
QueueUrl: this.config.queueUrl,
Entries: part.map(item => ({
Id: uuid(),
MessageBody: JSON.stringify(item),
DelaySeconds: item.options.delay,
})),
}
await this.sqs.sendMessageBatch(params)
} catch (error) {
logger.error(error, 'sqs:error:enqueue')
}
}
}
// Shut down this provider by stopping the sqs-consumer poller
// (presumably halts further message polling — verify Consumer.stop docs).
close(): void {
this.app.stop()
}

View file

@ -76,6 +76,14 @@ export const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve
export const removeKey = <T, O extends keyof T>(propKey: O, { [propKey]: propValue, ...rest }: T): Omit<T, O> => rest
/**
 * Split an array into consecutive chunks of at most `size` elements.
 * The last chunk may be shorter; an empty input yields an empty result.
 *
 * @param arr - items to chunk (not mutated; chunks are shallow slices)
 * @param size - maximum chunk length, must be >= 1
 * @returns array of chunks preserving the original order
 * @throws RangeError if `size` < 1 (the previous version looped forever
 *         because `i += size` never advanced)
 */
export const batch = <T>(arr: T[], size: number): T[][] => {
    if (size < 1) {
        throw new RangeError(`batch size must be >= 1, got ${size}`)
    }
    const result: T[][] = []
    for (let i = 0, len = arr.length; i < len; i += size) {
        result.push(arr.slice(i, i + size))
    }
    return result
}
export function extractQueryParams<T extends Record<string, any>>(search: URLSearchParams | Record<string, undefined | string | string[]>, schema: JSONSchemaType<T>) {
return validate(schema, Object.entries<JSONSchemaType<any>>(schema.properties).reduce((a, [name, def]) => {
let values: string[]