diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml deleted file mode 100644 index eb32a85b..00000000 --- a/.github/workflows/publish.yml +++ /dev/null @@ -1,33 +0,0 @@ -name: Publish Packages - -on: - push: - tags: - - "v*.*.*" - -jobs: - build-package: - name: "Publish to GitHub Packages" - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - if: github.repository_owner == 'parcelvoy' - steps: - - uses: actions/checkout@v3 - - name: Use Node.js 20.x - uses: actions/setup-node@v4 - with: - node-version: 20.x - registry-url: https://registry.npmjs.org - - name: Cache NPM - uses: actions/cache@v4 - with: - path: ~/.npm - key: ${{ runner.os }}-node-${{ hashFiles('**/package-lock.json') }} - restore-keys: | - ${{ runner.os }}-node- - - run: npm ci - - run: npm run package:publish --tag=$(echo ${GITHUB_REF_NAME:1}) - env: - NODE_AUTH_TOKEN: ${{secrets.NPM_TOKEN}} \ No newline at end of file diff --git a/apps/platform/package.json b/apps/platform/package.json index 019abfc9..01071c2c 100644 --- a/apps/platform/package.json +++ b/apps/platform/package.json @@ -65,6 +65,7 @@ "docker:build": "docker buildx build -f ./Dockerfile -t ghcr.io/parcelvoy/api:latest -t ghcr.io/parcelvoy/api:$npm_config_tag ../../", "docker:build:push": "npm run docker:build -- --push", "migration:create": "node ./scripts/create-migration.mjs", + "migration:output": "node ./scripts/output-migration.mjs", "package:publish": "npm run build && npm version $npm_config_tag --no-git-tag-version && npm pack && npm publish --access public" }, "devDependencies": { diff --git a/apps/platform/scripts/output-migration.mjs b/apps/platform/scripts/output-migration.mjs new file mode 100644 index 00000000..1ccc94c0 --- /dev/null +++ b/apps/platform/scripts/output-migration.mjs @@ -0,0 +1,49 @@ +import knex from 'knex' + +const connection = knex({ + client: process.env.DB_CLIENT ?? 'mysql2', + connection: { + host: process.env.DB_HOST, + user: process.env.DB_USERNAME, + password: process.env.DB_PASSWORD, + port: process.env.DB_PORT, + database: process.env.DB_DATABASE, + }, +}) + +const name = process.argv[2] +if (!name) { + console.log('migration: please include a migration to output') + process.exit(9) +} + +const logRaw = (sql) => { + const end = sql.charAt(sql.length - 1) + console.log(end !== ';' ? sql + ';' : sql) +} + +const migration = await import(`../db/migrations/${name}`) +const method = (type) => { + return (name, query) => { + const schema = connection.schema + const result = schema[type](name, query) + logRaw(result.toString()) + return options.schema + } +} + +const options = { + schema: { + table: method('table'), + createTable: method('createTable'), + alterTable: method('alterTable'), + dropTable: method('dropTable'), + }, + raw: (query) => logRaw(query), +} + +console.log('up') +migration.up(options).then(() => { + console.log('down') + migration.down(options) +}) diff --git a/apps/platform/src/campaigns/Campaign.ts b/apps/platform/src/campaigns/Campaign.ts index dfdc69fb..ee9005f4 100644 --- a/apps/platform/src/campaigns/Campaign.ts +++ b/apps/platform/src/campaigns/Campaign.ts @@ -8,7 +8,7 @@ import { crossTimezoneCopy } from '../utilities' import Project from '../projects/Project' import { User } from '../users/User' -export type CampaignState = 'draft' | 'scheduled' | 'pending' | 'running' | 'finished' | 'aborted' +export type CampaignState = 'draft' | 'scheduled' | 'loading' | 'running' | 'finished' | 'aborted' export interface CampaignDelivery { sent: number total: number @@ -37,6 +37,7 @@ export default class Campaign extends Model { state!: CampaignState delivery!: CampaignDelivery tags?: string[] + progress?: CampaignPopulationProgress send_in_user_timezone?: boolean send_at?: string | Date | null @@ -50,9 +51,14 @@ export default class Campaign extends Model { } } +export type CampaignPopulationProgress = { + complete: number + total: number +} + export type SentCampaign = Campaign & { send_at: Date } -export type CampaignParams = Omit +export type CampaignParams = Omit export type CampaignCreateParams = Omit export type CampaignUpdateParams = Omit diff --git a/apps/platform/src/campaigns/CampaignGenerateListJob.ts b/apps/platform/src/campaigns/CampaignGenerateListJob.ts index 947f062e..02034ec0 100644 --- a/apps/platform/src/campaigns/CampaignGenerateListJob.ts +++ b/apps/platform/src/campaigns/CampaignGenerateListJob.ts @@ -1,9 +1,11 @@ +import App from '../app' import { logger } from '../config/logger' +import { cacheSet } from '../config/redis' import { acquireLock, releaseLock } from '../core/Lock' import { Job } from '../queue' import { CampaignJobParams, SentCampaign } from './Campaign' import CampaignEnqueueSendsJob from './CampaignEnqueueSendsJob' -import { estimatedSendSize, generateSendList, getCampaign } from './CampaignService' +import { CacheKeys, estimatedSendSize, generateSendList, getCampaign } from './CampaignService' export default class CampaignGenerateListJob extends Job { static $name = 'campaign_generate_list_job' @@ -20,8 +22,14 @@ export default class CampaignGenerateListJob extends Job { if (!campaign) return if (campaign.state === 'aborted' || campaign.state === 'draft') return - // Increase lock duration based on estimated send size + // Approximate the size of the send list const estimatedSize = await estimatedSendSize(campaign) + + // Use approximate size for progress + await cacheSet(App.main.redis, CacheKeys.populationTotal(campaign), estimatedSize, 86400) + await cacheSet(App.main.redis, CacheKeys.populationProgress(campaign), 0, 86400) + + // Increase lock duration based on estimated send size const lockTime = Math.ceil(Math.max(estimatedSize / 1000, 900)) logger.info({ id, estimatedSize, lockTime }, 'campaign:generate:estimated_size') diff --git a/apps/platform/src/campaigns/CampaignService.ts b/apps/platform/src/campaigns/CampaignService.ts index c65c1ab3..75573817 100644 --- a/apps/platform/src/campaigns/CampaignService.ts +++ b/apps/platform/src/campaigns/CampaignService.ts @@ -4,7 +4,7 @@ import TextJob from '../providers/text/TextJob' import EmailJob from '../providers/email/EmailJob' import { User } from '../users/User' import { UserEvent } from '../users/UserEvent' -import Campaign, { CampaignCreateParams, CampaignDelivery, CampaignParams, CampaignProgress, CampaignSend, CampaignSendParams, CampaignSendReferenceType, CampaignSendState, SentCampaign } from './Campaign' +import Campaign, { CampaignCreateParams, CampaignDelivery, CampaignParams, CampaignPopulationProgress, CampaignProgress, CampaignSend, CampaignSendParams, CampaignSendReferenceType, CampaignSendState, SentCampaign } from './Campaign' import List, { UserList } from '../lists/List' import Subscription, { SubscriptionState, UserSubscription } from '../subscriptions/Subscription' import { RequestError } from '../core/errors' @@ -21,10 +21,14 @@ import CampaignGenerateListJob from './CampaignGenerateListJob' import Project from '../projects/Project' import Template from '../render/Template' import { differenceInDays, subDays } from 'date-fns' -import { raw } from '../core/Model' +import { raw, ref } from '../core/Model' +import { cacheGet, cacheIncr } from '../config/redis' +import App from '../app' export const CacheKeys = { pendingStats: 'campaigns:pending_stats', + populationProgress: (campaign: Campaign) => `campaigns:${campaign.id}:progress`, + populationTotal: (campaign: Campaign) => `campaigns:${campaign.id}:total`, } export const pagedCampaigns = async (params: PageParams, projectId: number) => { @@ -67,6 +71,9 @@ export const getCampaign = async (id: number, projectId: number): Promise m.get(campaign.id)) + if (campaign.state === 'loading') { + campaign.progress = await campaignPopulationProgress(campaign) + } } return campaign @@ -87,14 +94,6 @@ export const createCampaign = async (projectId: number, { tags, ...params }: Cam project_id: projectId, }) - // Calculate initial users count - await Campaign.update(qb => qb.where('id', campaign.id), { - delivery: { - ...campaign.delivery, - total: await initialUsersCount(campaign), - }, - }) - if (tags?.length) { await setTags({ project_id: projectId, @@ -128,7 +127,7 @@ export const updateCampaign = async (id: number, projectId: number, { tags, ...p if (send_at && campaign.send_at && send_at !== campaign.send_at) { - data.state = 'pending' + data.state = 'loading' await abortCampaign(campaign) } @@ -136,8 +135,8 @@ export const updateCampaign = async (id: number, projectId: number, { tags, ...p if (data.state === 'scheduled') { await validateTemplates(projectId, id) - // Set to pending if success so scheduling starts - data.state = 'pending' + // Set to loading if success so scheduling starts + data.state = 'loading' } // If this is a trigger campaign, should always be running @@ -159,7 +158,7 @@ export const updateCampaign = async (id: number, projectId: number, { tags, ...p }) } - if (data.state === 'pending' && campaign.type === 'blast') { + if (data.state === 'loading' && campaign.type === 'blast') { await CampaignGenerateListJob.from(campaign).queue() } @@ -289,16 +288,35 @@ export const generateSendList = async (campaign: SentCampaign) => { // Clear any aborted sends await clearCampaign(campaign) - // Generate the initial send list - const query = recipientQuery(campaign) - await chunk(query, 250, async (items) => { - await CampaignSend.query() - .insert(items) - .onConflict(['campaign_id', 'user_id', 'reference_id']) - .ignore() - }, ({ user_id, timezone }: { user_id: number, timezone: string }) => - CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone })), - ) + const stream = UserList.query() + .select('users.id AS id') + .leftJoin('users', 'user_list.user_id', 'users.id') + .whereIn('user_list.list_id', campaign.list_ids ?? []) + .stream() + + const ingest = async (lastId: number, limit: number) => { + const query = recipientPartialQuery(campaign, lastId, limit) + const cacheKey = CacheKeys.populationProgress(campaign) + await chunk(query, 300, async (items) => { + await CampaignSend.query() + .insert(items) + .onConflict(['campaign_id', 'user_id', 'reference_id']) + .ignore() + await cacheIncr(App.main.redis, cacheKey, items.length, 300) + }, ({ user_id, timezone }: { user_id: number, timezone: string }) => + CampaignSend.create(campaign, project, User.fromJson({ id: user_id, timezone })), + ) + } + + let count = 0 + let lastId = 0 + const limit = 10_000 + + for await (const user of stream) { + if (count % limit === 0) await ingest(lastId, limit) + lastId = user.id + count++ + } await Campaign.update(qb => qb.where('id', campaign.id), { state: 'scheduled' }) } @@ -413,15 +431,14 @@ export const duplicateCampaign = async (campaign: Campaign) => { return await getCampaign(cloneId, campaign.project_id) } -const initialUsersCount = async (campaign: Campaign): Promise => { - const response = await recipientQuery(campaign) - .clear('select') - .select(UserList.raw('COUNT(DISTINCT(users.id)) as count')) - const { count } = response[0] - return Math.max(0, count) +export const campaignPopulationProgress = async (campaign: Campaign): Promise => { + return { + complete: await cacheGet(App.main.redis, CacheKeys.populationProgress(campaign)) ?? 0, + total: await cacheGet(App.main.redis, CacheKeys.populationTotal(campaign)) ?? 0, + } } -export const campaignProgress = async (campaignId: number): Promise => { +export const campaignDeliveryProgress = async (campaignId: number): Promise => { const progress = await CampaignSend.query() .where('campaign_id', campaignId) .select(CampaignSend.raw("SUM(IF(state = 'sent', 1, 0)) AS sent, SUM(IF(state IN('pending', 'throttled'), 1, 0)) AS pending, COUNT(*) AS total, SUM(IF(opened_at IS NOT NULL, 1, 0)) AS opens, SUM(IF(clicks > 0, 1, 0)) AS clicks")) @@ -443,7 +460,7 @@ export const updateCampaignProgress = async (campaign: Campaign): Promise return 'running' } - const { pending, ...delivery } = await campaignProgress(campaign.id) + const { pending, ...delivery } = await campaignDeliveryProgress(campaign.id) const state = currentState(pending, delivery) // If nothing has changed, continue otherwise update @@ -533,3 +550,42 @@ export const updateCampaignSendEnrollment = async (user: User) => { .merge(['state', 'send_at']) } } + +const recipientPartialQuery = (campaign: Campaign, sinceId: number, limit = 10000) => { + return User.query() + .select('users.id AS user_id', 'users.timezone') + .innerJoin('user_list', sbq => + sbq.on('users.id', 'user_list.user_id') + .onIn('user_list.list_id', campaign.list_ids ?? []), + ) + .where('users.project_id', campaign.project_id) + .where(qb => { + if (campaign.channel === 'email') { + qb.whereNotNull('users.email') + } else if (campaign.channel === 'text') { + qb.whereNotNull('users.phone') + } else if (campaign.channel === 'push') { + qb.whereNotNull('users.devices') + } + }) + .whereNotExists( + UserList.query() + .whereIn('list_id', campaign.exclusion_list_ids ?? []) + .where('user_id', ref('users.id')), + ) + .whereNotExists( + CampaignSend.query() + .where('campaign_id', campaign.id) + .where('user_id', ref('users.id')) + .where('state', 'sent'), + ) + .whereNotExists( + UserSubscription.query() + .where('subscription_id', campaign.subscription_id) + .where('user_id', ref('users.id')) + .where('state', SubscriptionState.unsubscribed), + ) + .where('users.id', '>', sinceId) + .orderBy('user_list.id') + .limit(limit) +} diff --git a/apps/platform/src/campaigns/ProcessCampaignsJob.ts b/apps/platform/src/campaigns/ProcessCampaignsJob.ts index b891f79d..2feec761 100644 --- a/apps/platform/src/campaigns/ProcessCampaignsJob.ts +++ b/apps/platform/src/campaigns/ProcessCampaignsJob.ts @@ -9,14 +9,14 @@ export default class ProcessCampaignsJob extends Job { static async handler() { const campaigns = await Campaign.query() - .whereIn('state', ['pending', 'scheduled', 'running']) + .whereIn('state', ['loading', 'scheduled', 'running']) .whereNotNull('send_at') .whereNull('deleted_at') - .where('type', 'blast') + .where('type', 'blast') as Campaign[] for (const campaign of campaigns) { // When pending we need to regenerate send list - if (campaign.state === 'pending') { + if (campaign.state === 'loading') { await CampaignGenerateListJob.from(campaign).queue() // Otherwise lets look through messages that are ready to send diff --git a/apps/platform/src/core/Model.ts b/apps/platform/src/core/Model.ts index b68badfc..c5129913 100644 --- a/apps/platform/src/core/Model.ts +++ b/apps/platform/src/core/Model.ts @@ -8,6 +8,10 @@ export const raw = (raw: Database.Value, db: Database = App.main.db) => { return db.raw(raw) } +export const ref = (ref: string, db: Database = App.main.db) => { + return db.ref(ref) +} + export interface SearchResult { results: T[] nextCursor?: string diff --git a/apps/ui/src/types.ts b/apps/ui/src/types.ts index 625a11e5..cfd3c80a 100644 --- a/apps/ui/src/types.ts +++ b/apps/ui/src/types.ts @@ -349,7 +349,7 @@ export interface JourneyEntranceDetail { userSteps: JourneyUserStep[] } -export type CampaignState = 'draft' | 'pending' | 'scheduled' | 'running' | 'finished' | 'aborted' +export type CampaignState = 'draft' | 'loading' | 'scheduled' | 'running' | 'finished' | 'aborted' export interface CampaignDelivery { sent: number @@ -381,6 +381,10 @@ export interface Campaign { send_in_user_timezone: boolean send_at: string screenshot_url: string + progress?: { + complete: number + total: number + } created_at: string updated_at: string } diff --git a/apps/ui/src/views/campaign/CampaignDelivery.tsx b/apps/ui/src/views/campaign/CampaignDelivery.tsx index 6b7e16ed..98a50ede 100644 --- a/apps/ui/src/views/campaign/CampaignDelivery.tsx +++ b/apps/ui/src/views/campaign/CampaignDelivery.tsx @@ -1,4 +1,4 @@ -import { useCallback, useContext } from 'react' +import { useCallback, useContext, useEffect } from 'react' import api from '../../api' import { CampaignContext, ProjectContext } from '../../contexts' import { CampaignDelivery as Delivery, CampaignSendState } from '../../types' @@ -51,10 +51,30 @@ export default function CampaignDelivery() { const [project] = useContext(ProjectContext) const { t } = useTranslation() const [preferences] = useContext(PreferencesContext) - const [{ id, state, send_at, delivery }] = useContext(CampaignContext) + const [campaign, setCampaign] = useContext(CampaignContext) + const { id, state, send_at, delivery, progress } = campaign const searchState = useSearchTableState(useCallback(async params => await api.campaigns.users(project.id, id, params), [id, project])) const route = useRoute() + useEffect(() => { + const refresh = () => { + api.campaigns.get(project.id, campaign.id) + .then(setCampaign) + .then(() => searchState.reload) + .catch(() => {}) + } + + if (state !== 'loading') return + const complete = progress?.complete ?? 0 + const total = progress?.total ?? 0 + const percent = total > 0 ? complete / total * 100 : 0 + const refreshRate = percent < 5 ? 1000 : 5000 + const interval = setInterval(refresh, refreshRate) + refresh() + + return () => clearInterval(interval) + }, [state]) + return ( <> diff --git a/apps/ui/src/views/campaign/CampaignDetail.tsx b/apps/ui/src/views/campaign/CampaignDetail.tsx index 3c355860..23a3dbd3 100644 --- a/apps/ui/src/views/campaign/CampaignDetail.tsx +++ b/apps/ui/src/views/campaign/CampaignDetail.tsx @@ -57,16 +57,19 @@ export default function CampaignDetail() { const [project] = useContext(ProjectContext) const { t } = useTranslation() const [campaign, setCampaign] = useContext(CampaignContext) - const { name, templates, state } = campaign + const { name, templates, state, progress } = campaign const [locale, setLocale] = useState(localeState(templates ?? [])) useEffect(() => { setLocale(localeState(templates ?? [])) }, [campaign.id]) const [isLaunchOpen, setIsLaunchOpen] = useState(false) + const [isLoading, setIsLoading] = useState(false) const handleAbort = async () => { + setIsLoading(true) const value = await api.campaigns.update(project.id, campaign.id, { state: 'aborted' }) setCampaign(value) + setIsLoading(false) } const tabs = [ @@ -105,7 +108,7 @@ export default function CampaignDetail() { onClick={() => setIsLaunchOpen(true)} >{t('restart_campaign')} ), - pending: <>, + loading: <>, scheduled: ( <> @@ -121,6 +125,7 @@ export default function CampaignDetail() { running: ( ), @@ -130,7 +135,7 @@ export default function CampaignDetail() { return ( } + desc={state !== 'draft' && } actions={campaign.type !== 'trigger' && action[state]} fullscreen={true}> diff --git a/apps/ui/src/views/campaign/Campaigns.tsx b/apps/ui/src/views/campaign/Campaigns.tsx index 43bd2c02..b2757f18 100644 --- a/apps/ui/src/views/campaign/Campaigns.tsx +++ b/apps/ui/src/views/campaign/Campaigns.tsx @@ -18,17 +18,23 @@ import { ProjectContext } from '../../contexts' import { PreferencesContext } from '../../ui/PreferencesContext' import { Translation, useTranslation } from 'react-i18next' -export const CampaignTag = ({ state }: { state: CampaignState }) => { +export const CampaignTag = ({ state, progress }: Pick) => { const variant: Record = { draft: 'plain', aborted: 'error', - pending: 'info', + loading: 'info', scheduled: 'info', running: 'info', finished: 'success', } + + const complete = progress?.complete ?? 0 + const total = progress?.total ?? 0 + const percent = total > 0 ? complete / total : 0 + const percentStr = percent.toLocaleString(undefined, { style: 'percent', minimumFractionDigits: 0 }) return { (t) => t(state) } + {progress && ` (${percentStr})`} }