feat: add cleanup of journey user steps

This commit is contained in:
Chris Anderson 2025-07-07 06:01:02 -07:00
parent 25658f710f
commit af9ca324c7
7 changed files with 98 additions and 18 deletions

View file

@ -0,0 +1,12 @@
exports.up = async function(knex) {
await knex.schema.table('journey_user_step', function(table) {
table.enum('data_state', ['active', 'available', 'cleared']).defaultTo('active').notNullable()
table.index('data_state')
})
}
exports.down = async function(knex) {
await knex.schema.table('journey_user_step', function(table) {
table.dropColumn('data_state')
})
}

View file

@ -0,0 +1,27 @@
import { Job } from '../queue'
import JourneyUserStep from './JourneyUserStep'
interface JourneyCleanupJobParams {
journey_id: number
}
export default class JourneyCleanupJob extends Job {
static $name = 'journey_delay_job'
static from(journey_id: number) {
return new JourneyCleanupJob({ journey_id })
}
static async handler({ journey_id }: JourneyCleanupJobParams) {
if (!journey_id) return
// Clean up the data for old user steps
await JourneyUserStep.query()
.where('journey_id', journey_id)
.where('data_state', 'available')
.update({
data_state: 'cleared',
data: null,
})
}
}

View file

@ -5,7 +5,7 @@ import { searchParamsSchema } from '../core/searchParams'
import { JSONSchemaType, validate } from '../core/validate'
import { extractQueryParams } from '../utilities'
import Journey, { JourneyEntranceTriggerParams, JourneyParams } from './Journey'
import { createJourney, getJourneyStepMap, getJourney, pagedJourneys, setJourneyStepMap, updateJourney, pagedEntrancesByJourney, getEntranceLog, pagedUsersByStep, archiveJourney, deleteJourney, exitUserFromJourney, publishJourney } from './JourneyRepository'
import { createJourney, getJourneyStepMap, getJourney, pagedJourneys, setJourneyStepMap, updateJourney, pagedEntrancesByJourney, getEntranceLog, pagedUsersByStep, archiveJourney, deleteJourney, exitUserFromJourney, publishJourney, getEntrancesForUser } from './JourneyRepository'
import { JourneyStep, JourneyStepMapParams, journeyStepTypes, toJourneyStepMap } from './JourneyStep'
import JourneyUserStep from './JourneyUserStep'
import { User } from '../users/User'
@ -193,7 +193,11 @@ router.get('/:journeyId/entrances', async ctx => {
router.delete('/:journeyId/entrances/:entranceId/users/:userId', async ctx => {
const user = await getUserFromContext(ctx)
if (!user) return ctx.throw(404)
const results = await exitUserFromJourney(user.id, parseInt(ctx.params.entranceId), ctx.state.journey!.id)
const results = await exitUserFromJourney(
user.id,
parseInt(ctx.params.entranceId),
ctx.state.journey!.id,
)
ctx.body = { exits: results }
})
@ -210,13 +214,12 @@ router.get('/:journeyId/steps/:stepId/users', async ctx => {
router.delete('/:journeyId/users/:userId', async ctx => {
const user = await getUserFromContext(ctx)
if (!user) return ctx.throw(404)
const results = await JourneyUserStep.update(
q => q.where('user_id', user.id)
.whereNull('entrance_id')
.whereNull('ended_at')
.where('journey_id', ctx.state.journey!.id),
{ ended_at: new Date() },
)
const entrances = await getEntrancesForUser(user.id, ctx.state.journey!.id)
let results = 0
for (const entrance of entrances) {
results += await exitUserFromJourney(user.id, entrance.id, ctx.state.journey!.id)
}
ctx.body = { exits: results }
})

View file

@ -247,6 +247,14 @@ export const setJourneyStepMap = async (journey: Journey, stepMap: JourneyStepMa
})
}
export const getEntrancesForUser = async (userId: number, journeyId: number) => {
return await JourneyUserStep.all(q => q
.where('journey_id', journeyId)
.where('user_id', userId)
.whereNull('entrance_id'),
)
}
export const getEntranceSubsequentSteps = async (entranceId: number) => {
return JourneyUserStep.all(q => q
.where('entrance_id', entranceId)
@ -347,11 +355,28 @@ export const getJourneyUserStepByExternalId = async (journeyId: number, userId:
}
export const exitUserFromJourney = async (userId: number, entranceId: number, journeyId: number) => {
await JourneyUserStep.update(
// Exit the entrance itself
const results = await JourneyUserStep.update(
q => q.where('user_id', userId)
.where('id', entranceId)
.whereNull('ended_at')
.where('journey_id', journeyId),
{ ended_at: new Date() },
.where('journey_id', journeyId)
.whereNull('ended_at'),
{
ended_at: new Date(),
data_state: 'available',
},
)
// Exit all steps referencing the entrance
await JourneyUserStep.update(
q => q.where('user_id', userId)
.where('entrance_id', entranceId)
.where('journey_id', journeyId),
{
data_state: 'available',
},
)
return results
}

View file

@ -7,7 +7,7 @@ import { User } from '../users/User'
import { UserEvent } from '../users/UserEvent'
import { getUserEventsForRules } from '../users/UserRepository'
import { shallowEqual } from '../utilities'
import { getEntranceSubsequentSteps, getJourneyStepChildren, getJourneySteps } from './JourneyRepository'
import { exitUserFromJourney, getEntranceSubsequentSteps, getJourneyStepChildren, getJourneySteps } from './JourneyRepository'
import { JourneyGate, JourneyStep, JourneyStepChild, journeyStepTypes } from './JourneyStep'
import JourneyUserStep from './JourneyUserStep'
@ -182,9 +182,11 @@ export class JourneyState {
}
private async end() {
await JourneyUserStep.update(q => q.where('id', this.entrance.id), {
ended_at: new Date(),
})
await exitUserFromJourney(
this.user.id,
this.entrance.id,
this.entrance.journey_id,
)
}
public childrenOf(stepId: number) {

View file

@ -1,6 +1,8 @@
import Model from '../core/Model'
import { type JourneyStep } from './JourneyStep'
type JourneyUserStepDataState = 'active' | 'available' | 'cleared'
export default class JourneyUserStep extends Model {
user_id!: number
type!: string
@ -11,6 +13,7 @@ export default class JourneyUserStep extends Model {
ended_at?: Date
data?: Record<string, unknown> | null
ref?: string
data_state?: JourneyUserStepDataState
step?: JourneyStep

View file

@ -3,6 +3,7 @@ import { Job } from '../queue'
import Journey from './Journey'
import App from '../app'
import JourneyStatsJob from './JourneyStatsJob'
import JourneyCleanupJob from './JourneyCleanupJob'
export default class UpdateJourneysJob extends Job {
static $name = 'update_journeys_job'
@ -12,7 +13,14 @@ export default class UpdateJourneysJob extends Job {
const { db, queue } = App.main
await chunk<Journey>(Journey.query(db), queue.batchSize, async journeys => {
queue.enqueueBatch(journeys.map(({ id }) => JourneyStatsJob.from(id)))
const steps = []
for (const journey of journeys) {
steps.push(
JourneyCleanupJob.from(journey.id),
JourneyStatsJob.from(journey.id),
)
}
queue.enqueueBatch(steps)
})
}
}