List regeneration performance improvements (#625)

This commit is contained in:
Chris Anderson 2025-02-09 16:42:05 -06:00 committed by GitHub
parent be400ec40c
commit a9791ff1e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 35 additions and 36 deletions

View file

@ -291,7 +291,7 @@ export const generateSendList = async (campaign: SentCampaign) => {
}
const query = recipientQuery(campaign)
await chunk<CampaignSendParams>(query, 100, async (items) => {
await chunk<CampaignSendParams>(query, 25, async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])

View file

@ -18,6 +18,7 @@ import { RequestError } from '../core/errors'
import RuleError from '../rules/RuleError'
import ListEvaluateUserJob from './ListEvaluateUserJob'
import ListStatsJob from './ListStatsJob'
import { PassThrough } from 'stream'
export const CacheKeys = {
memberCount: (list: List) => `list:${list.id}:${list.version}:count`,
@ -250,15 +251,15 @@ interface UserListEventEvaluation {
interface UserListEvaluation {
list: List
scroll: AsyncGenerator<User[], any, any>
stream: PassThrough & AsyncIterable<unknown>
since?: Date | null
handleRule: (evaluation: UserListEventEvaluation) => Promise<void>
handleEntry: (user: User, result: boolean) => Promise<void>
}
const scrollUserListForEvaluation = async ({
const streamUserListForEvaluation = async ({
list,
scroll,
stream,
since,
handleRule,
handleEntry,
@ -267,33 +268,31 @@ const scrollUserListForEvaluation = async ({
const rule = await fetchAndCompileRule(list.rule_id) as RuleTree
const { eventRules, userRules } = splitRuleTree(rule)
for await (const users of scroll) {
// For each user, evaluate parts and batch enqueue
for await (const rawUser of stream) {
const user = User.fromJson(rawUser)
// For each user, evaluate parts and batch enqueue
for (const user of users) {
const parts: RuleWithEvaluationResult[] = []
const events = await getUserEventsForRules([user.id], eventRules, since)
const parts: RuleWithEvaluationResult[] = []
const events = await getUserEventsForRules([user.id], eventRules, since)
for (const rule of eventRules) {
const result = check({
user: user.flatten(),
events: events.map(e => e.flatten()),
}, rule)
await handleRule({
rule_id: rule.id!,
user_id: user.id,
result,
})
parts.push({
...rule,
result,
})
}
const result = checkRules(user, rule, [...parts, ...userRules])
await handleEntry(user, result)
for (const rule of eventRules) {
const result = check({
user: user.flatten(),
events: events.map(e => e.flatten()),
}, rule)
await handleRule({
rule_id: rule.id!,
user_id: user.id,
result,
})
parts.push({
...rule,
result,
})
}
const result = checkRules(user, rule, [...parts, ...userRules])
await handleEntry(user, result)
}
}
@ -406,12 +405,12 @@ export const refreshList = async (list: List, types: DateRuleTypes) => {
const { id } = list
await updateListState(id, { state: 'loading' })
const scroll = User.scroll(q =>
q.leftJoin('user_list', 'user_list.user_id', 'users.id')
.where('project_id', list.project_id)
.where('user_list.list_id', list.id)
.select('users.*'),
)
const stream = UserList.query()
.leftJoin('users', 'user_list.user_id', 'users.id')
.where('project_id', list.project_id)
.where('list_id', list.id)
.select('users.*')
.stream()
const userChunker = new Chunker<number>(async userIds => {
await UserList.delete(qb => qb.whereIn('user_id', userIds)
@ -419,9 +418,9 @@ export const refreshList = async (list: List, types: DateRuleTypes) => {
await cacheDecr(App.main.redis, CacheKeys.memberCount(list), userIds.length)
}, 50)
await scrollUserListForEvaluation({
await streamUserListForEvaluation({
list,
scroll,
stream,
since: types.value,
handleRule: async ({ rule_id, user_id, result }) => {
if (!result) {