Performance improvements for list import (#637)

This commit is contained in:
Chris Anderson 2025-03-04 10:28:07 -06:00 committed by GitHub
parent 6e1ef5f76c
commit 4fe9a25231
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 63 additions and 24 deletions

View file

@ -27,6 +27,7 @@ import ScheduledEntranceJob from '../journey/ScheduledEntranceJob'
import ScheduledEntranceOrchestratorJob from '../journey/ScheduledEntranceOrchestratorJob'
import ListRefreshJob from '../lists/ListRefreshJob'
import ListEvaluateUserJob from '../lists/ListEvaluateUserJob'
import UserListMatchJob from '../lists/UserListMatchJob'
export const jobs = [
CampaignGenerateListJob,
@ -53,6 +54,7 @@ export const jobs = [
UserAliasJob,
UserDeleteJob,
UserDeviceJob,
UserListMatchJob,
UserPatchJob,
UserSchemaSyncJob,
WebhookJob,

View file

@ -39,3 +39,4 @@ export class UserList extends Model {
/**
 * Parameter shape for list updates: the `name` and `tags` fields of `List`,
 * plus an optional rule tree and an optional `published` flag.
 */
export type ListUpdateParams = Pick<List, 'name' | 'tags'> & { rule?: RuleTree, published?: boolean }
/**
 * Parameter shape for list creation: everything in `ListUpdateParams` plus
 * the `type` and `is_visible` fields of `List`.
 * NOTE(review): the trailing `{ rule?: RuleTree }` repeats a member that
 * `ListUpdateParams` already declares.
 */
export type ListCreateParams = ListUpdateParams & Pick<List, 'type' | 'is_visible'> & { rule?: RuleTree }
/**
 * Just the `id` and `version` fields of `List`, for code that needs to
 * identify a specific list version without the rest of the model.
 */
export type ListVersion = Pick<List, 'id' | 'version'>

View file

@ -1,7 +1,7 @@
import { UserEvent } from '../users/UserEvent'
import { User } from '../users/User'
import { check } from '../rules/RuleEngine'
import List, { DynamicList, ListCreateParams, ListProgress, ListUpdateParams, UserList } from './List'
import List, { DynamicList, ListCreateParams, ListProgress, ListUpdateParams, ListVersion, UserList } from './List'
import Rule, { RuleEvaluation, RuleTree } from '../rules/Rule'
import { PageParams } from '../core/searchParams'
import ListPopulateJob from './ListPopulateJob'
@ -21,9 +21,9 @@ import ListStatsJob from './ListStatsJob'
import { PassThrough } from 'stream'
/**
 * Builders for per-list cache key strings. Every key has the form
 * `list:<id>:<version>:<suffix>`, so a key is specific to one version of
 * one list.
 *
 * NOTE(review): each property is declared twice in this view — first typed
 * with `List` / `Pick<List, 'id' | 'version'>`, then with `ListVersion`.
 * Both declarations of a key build the same string. Presumably the first
 * three lines are the pre-change side of the diff and only the `ListVersion`
 * variants remain in the real file — confirm.
 */
export const CacheKeys = {
memberCount: (list: List) => `list:${list.id}:${list.version}:count`,
populationProgress: (list: List) => `list:${list.id}:${list.version}:progress`,
populationTotal: (list: Pick<List, 'id' | 'version'>) => `list:${list.id}:${list.version}:total`,
memberCount: (list: ListVersion) => `list:${list.id}:${list.version}:count`,
populationProgress: (list: ListVersion) => `list:${list.id}:${list.version}:progress`,
populationTotal: (list: ListVersion) => `list:${list.id}:${list.version}:total`,
}
export const pagedLists = async (params: PageParams, projectId: number) => {
@ -202,7 +202,7 @@ export const deleteList = async (id: number, projectId: number) => {
return await List.deleteById(id, qb => qb.where('project_id', projectId))
}
export const addUserToList = async (user: User | number, list: List, event?: UserEvent) => {
export const addUserToList = async (user: User | number, list: ListVersion, event?: UserEvent) => {
const userId = user instanceof User ? user.id : user
const resp = await UserList.query()
.insert({

View file

@ -0,0 +1,26 @@
import { Job } from '../queue'
import { matchingRulesForUser } from '../rules/RuleService'
import { getUser } from '../users/UserRepository'
import { updateUsersLists } from './ListService'
/** Payload carried by a `UserListMatchJob`. */
interface UserListMatchParams {
    /** Id of the user to look up and evaluate. */
    userId: number
    /** Id of the project passed to the user lookup. */
    projectId: number
}

/**
 * Job that loads a user, computes the rules matching that user and hands the
 * result to `updateUsersLists`.
 */
export default class UserListMatchJob extends Job {
    static $name = 'user_list_match_job'

    /**
     * Builds a job instance for the given user and project.
     *
     * @param userId - Id of the user to evaluate.
     * @param projectId - Id of the project used for the user lookup.
     * @returns A new job carrying both ids as its payload.
     */
    static from(userId: number, projectId: number): UserListMatchJob {
        const payload: UserListMatchParams = { userId, projectId }
        return new this(payload)
    }

    /**
     * Runs the job: looks the user up, and when one is found, evaluates the
     * matching rules and applies them via `updateUsersLists`. Does nothing
     * when the lookup returns no user.
     *
     * @param params - The ids identifying the user and project.
     */
    static async handler(params: UserListMatchParams) {
        const user = await getUser(params.userId, params.projectId)
        if (user) {
            const matches = await matchingRulesForUser(user)
            await updateUsersLists(user, matches)
        }
    }
}

View file

@ -19,7 +19,7 @@ export default class RedisQueueProvider implements QueueProvider {
bull: BullQueue
worker?: Worker
concurrency: number
batchSize = 60 as const
batchSize = 50 as const
queueName = 'parcelvoy' as const
constructor({ concurrency, ...config }: RedisQueueConfig, queue: Queue) {

View file

@ -5,15 +5,19 @@ import ListStatsJob from '../lists/ListStatsJob'
import { RequestError } from '../core/errors'
import App from '../app'
import { Chunker } from '../utilities'
import { getList } from '../lists/ListService'
/**
 * Arguments for `importUsers`: the project to import into, the uploaded file
 * stream to read, and the list involved in the import.
 *
 * NOTE(review): `list_id` is declared twice in this view, once optional and
 * once required. `importUsers` passes `list_id` straight to `getList`, so
 * the required form is presumably the current one and the optional line is
 * the pre-change side of the diff — confirm.
 */
export interface UserImport {
project_id: number
stream: FileStream
list_id?: number
list_id: number
}
export const importUsers = async ({ project_id, stream, list_id }: UserImport) => {
const list = await getList(list_id, project_id)
if (!list) return
const options: Options = {
columns: true,
cast: true,
@ -41,7 +45,10 @@ export const importUsers = async ({ project_id, stream, list_id }: UserImport) =
created_at,
},
options: {
join_list_id: list_id,
join_list: {
id: list.id,
version: list.version,
},
},
}))
}

View file

@ -1,15 +1,18 @@
import { User, UserInternalParams } from './User'
import { Job } from '../queue'
import { createUser, getUsersFromIdentity } from './UserRepository'
import { addUserToList, getList, updateUsersLists } from '../lists/ListService'
import { createUser, getUsersFromIdentity, isUserDirty } from './UserRepository'
import { addUserToList } from '../lists/ListService'
import { ClientIdentity } from '../client/Client'
import { matchingRulesForUser } from '../rules/RuleService'
import UserListMatchJob from '../lists/UserListMatchJob'
/**
 * Input describing a single user patch.
 * NOTE(review): presumably this is the payload type of `UserPatchJob` —
 * confirm against the job's `from`/`handler` signatures.
 */
interface UserPatchTrigger {
// Id of the project the patch applies to.
project_id: number
// User fields to apply; also inspected by `isUserDirty` to decide whether
// list matching needs to be queued.
user: UserInternalParams
options?: {
// NOTE(review): id-only form of the list to join. In this view it sits next
// to `join_list`, which looks like its replacement — confirm whether this
// field still exists in the real file.
join_list_id?: number
// List (id and version) the user is added to via `addUserToList` after the
// upsert.
join_list?: {
id: number
version: number
}
// When true, no `UserListMatchJob` is queued for the user. Treated as false
// when omitted.
skip_list_updating?: boolean
}
}
@ -30,9 +33,6 @@ export default class UserPatchJob extends Job {
const { anonymous, external } = await getUsersFromIdentity(project_id, identity)
const existing = external ?? anonymous
// TODO: Utilize phone and email as backup identifiers
// to decrease the likelihood of future duplicates
// If user, update otherwise insert
try {
return existing
@ -57,21 +57,17 @@ export default class UserPatchJob extends Job {
const user = await upsert(patch)
const {
join_list_id,
join_list,
skip_list_updating = false,
} = patch.options ?? {}
// Use updated user to check for list membership
if (!skip_list_updating) {
const results = await matchingRulesForUser(user)
await updateUsersLists(user, results)
// Use updated user to check for dynamic list membership
if (!skip_list_updating && isUserDirty(patch.user)) {
await UserListMatchJob.from(user.id, user.project_id).queue()
}
// If provided a list to join, add user to it
if (join_list_id) {
const list = await getList(join_list_id, patch.project_id)
if (list) await addUserToList(user, list)
}
if (join_list) await addUserToList(user, join_list)
return user
}

View file

@ -198,3 +198,10 @@ export const getUserEventsForRules = async (
return qb
})
}
/**
 * Reports whether a set of user params carries anything worth re-evaluating.
 *
 * The params count as dirty when `data` has at least one own key, or when any
 * of `email`, `phone`, `timezone`, `locale` or `created_at` is truthy.
 * Falsy values (for example an empty string or `null`) do not count.
 *
 * @param params - The user params to inspect.
 * @returns `true` when any of the checked fields is populated.
 */
export const isUserDirty = (params: UserInternalParams) => {
    const { data, email, phone, timezone, locale, created_at } = params

    // Custom data only counts when it has at least one key.
    if (data && Object.keys(data).length > 0) return true

    // Otherwise any populated reserved field is enough.
    return [email, phone, timezone, locale, created_at].some(Boolean)
}