Initial work on event idempotency

This commit is contained in:
Chris Anderson 2025-01-19 10:25:10 -06:00
parent 729397907d
commit 1034b706e4
12 changed files with 60 additions and 16 deletions

View file

@ -0,0 +1,17 @@
exports.up = async function(knex) {
await knex.schema.alterTable('user_events', function(table) {
table.string('distinct_id')
})
// Splitting into second operation to allow for instant add
// and less table locking
await knex.schema.alterTable('user_events', function(table) {
table.unique('distinct_id')
})
}
exports.down = async function(knex) {
await knex.schema.alterTable('user_events', function(table) {
table.dropColumn('distinct_id')
})
}

View file

@ -27,11 +27,12 @@ export default class CampaignTriggerSendJob extends Job {
const campaign = await getCampaign(campaign_id, project_id)
if (!campaign) return
const { user: { id: userId }, event: { id: eventId } } = await EventPostJob.from({
const response = await EventPostJob.from({
project_id,
event: {
name: 'campaign_trigger',
external_id: user.external_id,
distinct_id: uuid(),
data: {
...event,
campaign: { id: campaign_id, name: campaign.name },
@ -39,6 +40,9 @@ export default class CampaignTriggerSendJob extends Job {
user: { external_id, email, phone, data, locale, timezone },
},
}).handle<{ user: User, event: UserEvent }>()
if (!response.event) return
const { user: { id: userId }, event: { id: eventId } } = response
if (device_token) {
await UserDeviceJob.from({

View file

@ -27,6 +27,7 @@ export type ClientDeleteUsersRequest = string[]
export type ClientPostEvent = {
name: string
distinct_id?: string
data?: Record<string, unknown>
user?: ClientIdentifyParams
created_at?: Date
@ -53,6 +54,7 @@ export interface SegmentContext {
// https://segment.com/docs/connections/spec/common/
export type SegmentPostEvent = {
event: string
messageId: string
anonymousId: string
userId: string
previousId?: string

View file

@ -185,6 +185,10 @@ const postEventsRequest: JSONSchemaType<ClientPostEventsRequest> = {
type: 'string',
nullable: true,
},
distinct_id: {
type: 'string',
nullable: true,
},
data: {
type: 'object',
nullable: true,

View file

@ -7,6 +7,7 @@ import { matchingRulesForEvent } from '../rules/RuleService'
import { enterJourneysFromEvent } from '../journey/JourneyService'
import { UserPatchJob } from '../jobs'
import { User } from '../users/User'
import { uuid } from '../utilities'
interface EventPostTrigger {
project_id: number
@ -45,16 +46,21 @@ export default class EventPostJob extends Job {
// Create event for given user
const dbEvent = await createAndFetchEvent(user, {
name: event.name,
distinct_id: event.distinct_id ?? uuid(),
data: event.data || {},
}, forward)
const results = await matchingRulesForEvent(user, dbEvent)
if (dbEvent) {
const results = await matchingRulesForEvent(user, dbEvent)
// Check to see if a user has any lists
await updateUsersLists(user, results, dbEvent)
// Check to see if a user has any lists
await updateUsersLists(user, results, dbEvent)
// Enter any journey entrances associated with this event
await enterJourneysFromEvent(dbEvent, user)
// Enter any journey entrances associated with this event
await enterJourneysFromEvent(dbEvent, user)
return { user }
}
return { user, event: dbEvent }
}

View file

@ -103,6 +103,7 @@ router.post('/segment', async ctx => {
event: {
...identity,
name: event.event,
distinct_id: event.messageId,
data: { ...event.properties, ...event.context },
created_at: new Date(event.timestamp),
},

View file

@ -245,15 +245,16 @@ export default class Model {
}
}
static async insert<T extends typeof Model>(this: T, data: Partial<InstanceType<T>>, db?: Database): Promise<number>
static async insert<T extends typeof Model>(this: T, data: Partial<InstanceType<T>>[], db?: Database): Promise<number[]>
static async insert<T extends typeof Model>(this: T, data: Partial<InstanceType<T>>, db?: Database, query?: Query): Promise<number>
static async insert<T extends typeof Model>(this: T, data: Partial<InstanceType<T>>[], db?: Database, query?: Query): Promise<number[]>
static async insert<T extends typeof Model>(
this: T,
data: Partial<InstanceType<T>> | Partial<InstanceType<T>>[] = {},
db: Database = App.main.db,
query: Query = qb => qb,
): Promise<number | number[]> {
const formattedData = Array.isArray(data) ? data.map(o => this.formatJson(o)) : this.formatJson(data)
const value = await this.table(db).insert(formattedData)
const value = await query(this.table(db)).insert(formattedData)
if (Array.isArray(data)) return value
return value[0]
}

View file

@ -130,6 +130,9 @@ export const triggerEntrance = async (journey: Journey, payload: JourneyEntrance
},
}).handle<{ user: User, event: UserEvent }>()
// If no event because of idempotency, return
if (!event) return
// create new entrance
const entrance_id = await JourneyUserStep.insert({
journey_id: journey.id,

View file

@ -51,6 +51,7 @@ export default class SMTPEmailProvider extends EmailProvider {
port: this.port,
secure: this.secure,
auth: this.auth,
pool: true,
})
}

View file

@ -7,6 +7,7 @@ export interface TemplateEvent extends Record<string, any> {
export class UserEvent extends Model {
project_id!: number
user_id!: number
distinct_id?: string
name!: string
data!: Record<string, unknown>
@ -21,4 +22,4 @@ export class UserEvent extends Model {
}
}
export type UserEventParams = Pick<UserEvent, 'name' | 'data'>
export type UserEventParams = Pick<UserEvent, 'name' | 'distinct_id' | 'data'>

View file

@ -1,20 +1,23 @@
import App from '../app'
import { PageParams } from '../core/searchParams'
import { loadAnalytics } from '../providers/analytics'
import { User } from '../users/User'
import { uuid } from '../utilities'
import { UserEvent, UserEventParams } from './UserEvent'
export const createEvent = async (
user: User,
{ name, data }: UserEventParams,
{ name, distinct_id, data }: UserEventParams,
forward = true,
filter = (data: Record<string, unknown>) => data,
): Promise<number> => {
): Promise<number | undefined> => {
const id = await UserEvent.insert({
name,
data,
project_id: user.project_id,
user_id: user.id,
})
distinct_id: distinct_id ?? uuid(),
}, App.main.db, qb => qb.onConflict(['distinct_id']).ignore())
if (forward) {
const analytics = await loadAnalytics(user.project_id)
@ -28,10 +31,10 @@ export const createEvent = async (
return id
}
export const createAndFetchEvent = async (user: User, event: UserEventParams, forward = false): Promise<UserEvent> => {
export const createAndFetchEvent = async (user: User, event: UserEventParams, forward = false): Promise<UserEvent | undefined> => {
const id = await createEvent(user, event, forward)
const userEvent = await UserEvent.find(id)
return userEvent!
if (!id) return
return await UserEvent.find(id)
}
export const getUserEvents = async (id: number, params: PageParams, projectId: number) => {

View file

@ -147,6 +147,7 @@ An array containing at least one object with the following parameters:
- **name** string (optional) - The name of the event
- **anonymous_id** string
- **external_id** string
- **distinct_id** string (optional) - An unique identifier for the event to ensure idempotency
- **data** object (optional)
Either an anonymous or external ID is required in order to post an event.