From 7782f840bdf92672c6a9b967ccf6c6b9ab456543 Mon Sep 17 00:00:00 2001 From: productdevbook Date: Mon, 9 Mar 2026 09:19:53 +0300 Subject: [PATCH] Scale notification delivery pipeline --- app/.graphql/nitro-graphql-client.d.ts | 7 + app/.graphql/nitro-graphql-server.d.ts | 10 + app/.graphql/schema.graphql | 7 + app/server/api/track/click/[id].get.ts | 53 +--- app/server/api/track/open/[id].get.ts | 55 +--- app/server/database/connection.ts | 3 +- .../migration.sql | 4 + .../migration.sql | 3 + app/server/database/schema/enums.ts | 12 +- app/server/database/schema/notification.ts | 5 +- .../getNotificationAnalytics.resolver.ts | 86 ++---- .../mutation/cancelNotification.resolver.ts | 2 +- .../mutation/scheduleNotification.resolver.ts | 74 ++--- .../mutation/sendNotification.resolver.ts | 227 +++++---------- .../notifications/notification.graphql | 8 +- app/server/graphql/share/shared.graphql | 3 +- .../stats/query/dashboardStats.resolver.ts | 20 +- app/server/plugins/scheduler.ts | 121 +++----- app/server/plugins/worker.ts | 65 +++-- app/server/queues/notification.queue.ts | 47 ++- app/server/queues/tracking.queue.ts | 16 ++ app/server/queues/webhook.queue.ts | 18 ++ app/server/utils/apiKeyCache.ts | 54 ++++ app/server/utils/auth.ts | 42 ++- app/server/utils/batching.ts | 12 + app/server/utils/bullmq.ts | 6 +- app/server/utils/notificationJobId.ts | 20 ++ app/server/utils/notificationStatus.ts | 33 +++ app/server/utils/notificationTargeting.ts | 214 ++++++++++++++ app/server/utils/webhookDispatcher.ts | 17 +- app/server/workers/notification.worker.ts | 272 ++++++++++++++++-- app/server/workers/tracking.worker.ts | 96 +++++++ app/server/workers/webhook.worker.ts | 12 + app/tests/api-key-cache.test.ts | 25 ++ app/tests/batching.test.ts | 12 + app/tests/notification-queue.test.ts | 34 +++ app/tests/notification-status.test.ts | 20 ++ 37 files changed, 1205 insertions(+), 510 deletions(-) create mode 100644 app/server/database/migrations/20260308000000_add_notification_targeting_fields/migration.sql create mode 100644 app/server/database/migrations/20260309000000_expand_notification_status_enum/migration.sql create mode 100644 app/server/queues/tracking.queue.ts create mode 100644 app/server/queues/webhook.queue.ts create mode 100644 app/server/utils/apiKeyCache.ts create mode 100644 app/server/utils/batching.ts create mode 100644 app/server/utils/notificationJobId.ts create mode 100644 app/server/utils/notificationStatus.ts create mode 100644 app/server/utils/notificationTargeting.ts create mode 100644 app/server/workers/tracking.worker.ts create mode 100644 app/server/workers/webhook.worker.ts create mode 100644 app/tests/api-key-cache.test.ts create mode 100644 app/tests/batching.test.ts create mode 100644 app/tests/notification-queue.test.ts create mode 100644 app/tests/notification-status.test.ts diff --git a/app/.graphql/nitro-graphql-client.d.ts b/app/.graphql/nitro-graphql-client.d.ts index ba424df..1ca426b 100644 --- a/app/.graphql/nitro-graphql-client.d.ts +++ b/app/.graphql/nitro-graphql-client.d.ts @@ -294,6 +294,7 @@ export type HookEvent = | 'NOTIFICATION_SENT' | 'NOTIFICATION_DELIVERED' | 'NOTIFICATION_FAILED' + | 'NOTIFICATION_OPENED' | 'NOTIFICATION_CLICKED' | 'WORKFLOW_COMPLETED' | 'WORKFLOW_FAILED'; @@ -550,6 +551,9 @@ export type Notification = { sound?: Maybe; badge?: Maybe; status: NotificationStatus; + channelType?: Maybe; + channelId?: Maybe; + contactIds?: Maybe; targetDevices?: Maybe; platforms?: Maybe; scheduledAt?: Maybe; @@ -580,9 +584,12 @@ export type NotificationAnalytics = { export type NotificationStatus = | 'PENDING' + | 'QUEUED' + | 'PROCESSING' | 'SENT' | 'DELIVERED' | 'FAILED' + | 'PARTIAL' | 'SCHEDULED'; export type PageInfo = { diff --git a/app/.graphql/nitro-graphql-server.d.ts b/app/.graphql/nitro-graphql-server.d.ts index 0bfa5b7..e2aecca 100644 --- a/app/.graphql/nitro-graphql-server.d.ts +++ b/app/.graphql/nitro-graphql-server.d.ts @@ -355,6 +355,7 @@ export type HookEvent = | 'NOTIFICATION_SENT' | 'NOTIFICATION_DELIVERED' | 'NOTIFICATION_FAILED' + | 'NOTIFICATION_OPENED' | 'NOTIFICATION_CLICKED' | 'WORKFLOW_COMPLETED' | 'WORKFLOW_FAILED'; @@ -611,6 +612,9 @@ export interface Notification { sound?: Maybe; badge?: Maybe; status: NotificationStatus; + channelType?: Maybe; + channelId?: Maybe; + contactIds?: Maybe; targetDevices?: Maybe; platforms?: Maybe; scheduledAt?: Maybe; @@ -641,9 +645,12 @@ export interface NotificationAnalytics { export type NotificationStatus = | 'PENDING' + | 'QUEUED' + | 'PROCESSING' | 'SENT' | 'DELIVERED' | 'FAILED' + | 'PARTIAL' | 'SCHEDULED'; export interface PageInfo { @@ -1520,6 +1527,9 @@ export type NotificationResolvers, ParentType, ContextType>; badge?: Resolver, ParentType, ContextType>; status?: Resolver; + channelType?: Resolver, ParentType, ContextType>; + channelId?: Resolver, ParentType, ContextType>; + contactIds?: Resolver, ParentType, ContextType>; targetDevices?: Resolver, ParentType, ContextType>; platforms?: Resolver, ParentType, ContextType>; scheduledAt?: Resolver, ParentType, ContextType>; diff --git a/app/.graphql/schema.graphql b/app/.graphql/schema.graphql index 312d85e..37800f2 100644 --- a/app/.graphql/schema.graphql +++ b/app/.graphql/schema.graphql @@ -273,6 +273,7 @@ enum HookEvent { NOTIFICATION_SENT NOTIFICATION_DELIVERED NOTIFICATION_FAILED + NOTIFICATION_OPENED NOTIFICATION_CLICKED WORKFLOW_COMPLETED WORKFLOW_FAILED @@ -344,6 +345,9 @@ type Notification { sound: String badge: Int status: NotificationStatus! + channelType: ChannelType + channelId: ID + contactIds: JSON targetDevices: JSON platforms: JSON scheduledAt: Timestamp @@ -373,9 +377,12 @@ type NotificationAnalytics { enum NotificationStatus { PENDING + QUEUED + PROCESSING SENT DELIVERED FAILED + PARTIAL SCHEDULED } diff --git a/app/server/api/track/click/[id].get.ts b/app/server/api/track/click/[id].get.ts index 7770a9b..0844e91 100644 --- a/app/server/api/track/click/[id].get.ts +++ b/app/server/api/track/click/[id].get.ts @@ -1,8 +1,5 @@ import { Buffer } from 'node:buffer' -import { getDatabase } from '#server/database/connection' -import { deliveryLog, notification } from '#server/database/schema' -import { dispatchHooks } from '#server/utils/webhookDispatcher' -import { eq, sql } from 'drizzle-orm' +import { addTrackNotificationEventJob } from '#server/queues/tracking.queue' import { defineEventHandler, getQuery, getRouterParam, sendRedirect } from 'nitro/h3' export default defineEventHandler(async (event) => { @@ -27,49 +24,11 @@ export default defineEventHandler(async (event) => { if (id) { try { - const db = getDatabase() - - const rows = await db - .select({ - id: deliveryLog.id, - notificationId: deliveryLog.notificationId, - clickedAt: deliveryLog.clickedAt, - }) - .from(deliveryLog) - .where(eq(deliveryLog.id, id)) - .limit(1) - - const log = rows[0] - if (log && !log.clickedAt) { - const now = new Date().toISOString() - - await db - .update(deliveryLog) - .set({ clickedAt: now, updatedAt: now }) - .where(eq(deliveryLog.id, id)) - - const notifRows = await db - .select({ appId: notification.appId }) - .from(notification) - .where(eq(notification.id, log.notificationId)) - .limit(1) - - if (notifRows[0]) { - const { appId } = notifRows[0] - - await db - .update(notification) - .set({ totalClicked: sql`"totalClicked" + 1`, updatedAt: now }) - .where(eq(notification.id, log.notificationId)) - - await dispatchHooks(appId, 'NOTIFICATION_CLICKED', { - notificationId: log.notificationId, - deliveryLogId: id, - clickedAt: now, - destination, - }) - } - } + await addTrackNotificationEventJob({ + type: 'click', + deliveryLogId: id, + destination, + }) } catch (err) { // Never block the redirect on tracking errors diff --git a/app/server/api/track/open/[id].get.ts b/app/server/api/track/open/[id].get.ts index 777d084..0eac057 100644 --- a/app/server/api/track/open/[id].get.ts +++ b/app/server/api/track/open/[id].get.ts @@ -1,8 +1,5 @@ import { Buffer } from 'node:buffer' -import { getDatabase } from '#server/database/connection' -import { deliveryLog, notification } from '#server/database/schema' -import { dispatchHooks } from '#server/utils/webhookDispatcher' -import { eq, sql } from 'drizzle-orm' +import { addTrackNotificationEventJob } from '#server/queues/tracking.queue' import { defineEventHandler, getRouterParam } from 'nitro/h3' // 1×1 transparent GIF @@ -21,52 +18,10 @@ export default defineEventHandler(async (event) => { } try { - const db = getDatabase() - - // Fetch delivery log — only update on first open - const rows = await db - .select({ - id: deliveryLog.id, - notificationId: deliveryLog.notificationId, - openedAt: deliveryLog.openedAt, - }) - .from(deliveryLog) - .where(eq(deliveryLog.id, id)) - .limit(1) - - const log = rows[0] - if (!log || log.openedAt) { - // Unknown ID or already tracked — return pixel silently - return PIXEL - } - - const now = new Date().toISOString() - - await db - .update(deliveryLog) - .set({ openedAt: now, updatedAt: now }) - .where(eq(deliveryLog.id, id)) - - const notifRows = await db - .select({ appId: notification.appId }) - .from(notification) - .where(eq(notification.id, log.notificationId)) - .limit(1) - - if (notifRows[0]) { - const { appId } = notifRows[0] - - await db - .update(notification) - .set({ totalOpened: sql`"totalOpened" + 1`, updatedAt: now }) - .where(eq(notification.id, log.notificationId)) - - await dispatchHooks(appId, 'NOTIFICATION_OPENED', { - notificationId: log.notificationId, - deliveryLogId: id, - openedAt: now, - }) - } + await addTrackNotificationEventJob({ + type: 'open', + deliveryLogId: id, + }) } catch (err) { // Never break pixel delivery on tracking errors diff --git a/app/server/database/connection.ts b/app/server/database/connection.ts index 8fd3d57..ef88849 100644 --- a/app/server/database/connection.ts +++ b/app/server/database/connection.ts @@ -8,7 +8,8 @@ let db: ReturnType> | undefined export function getDatabase() { if (!db) { const connectionString = process.env.DATABASE_URL || 'postgresql://localhost:5432/nitroping' - pgClient = postgres(connectionString, { max: 3, idle_timeout: 30 }) + const maxConnections = Number.parseInt(process.env.DATABASE_POOL_MAX || '20') + pgClient = postgres(connectionString, { max: maxConnections, idle_timeout: 30 }) db = drizzle({ client: pgClient, schema }) } return db diff --git a/app/server/database/migrations/20260308000000_add_notification_targeting_fields/migration.sql b/app/server/database/migrations/20260308000000_add_notification_targeting_fields/migration.sql new file mode 100644 index 0000000..4368301 --- /dev/null +++ b/app/server/database/migrations/20260308000000_add_notification_targeting_fields/migration.sql @@ -0,0 +1,4 @@ +ALTER TABLE "notification" + ADD COLUMN IF NOT EXISTS "channelType" "channelType", + ADD COLUMN IF NOT EXISTS "channelId" uuid, + ADD COLUMN IF NOT EXISTS "contactIds" jsonb; diff --git a/app/server/database/migrations/20260309000000_expand_notification_status_enum/migration.sql b/app/server/database/migrations/20260309000000_expand_notification_status_enum/migration.sql new file mode 100644 index 0000000..a569894 --- /dev/null +++ b/app/server/database/migrations/20260309000000_expand_notification_status_enum/migration.sql @@ -0,0 +1,3 @@ +ALTER TYPE "notification_status" ADD VALUE IF NOT EXISTS 'QUEUED'; +ALTER TYPE "notification_status" ADD VALUE IF NOT EXISTS 'PROCESSING'; +ALTER TYPE "notification_status" ADD VALUE IF NOT EXISTS 'PARTIAL'; diff --git a/app/server/database/schema/enums.ts b/app/server/database/schema/enums.ts index a8b937a..8e81a9f 100644 --- a/app/server/database/schema/enums.ts +++ b/app/server/database/schema/enums.ts @@ -10,7 +10,16 @@ export const categoryEnum = pgEnum('category', [ export const platformEnum = pgEnum('platform', ['IOS', 'ANDROID', 'WEB']) -export const notificationStatusEnum = pgEnum('notification_status', ['PENDING', 'SENT', 'DELIVERED', 'FAILED', 'SCHEDULED']) +export const notificationStatusEnum = pgEnum('notification_status', [ + 'PENDING', + 'QUEUED', + 'PROCESSING', + 'SENT', + 'DELIVERED', + 'FAILED', + 'PARTIAL', + 'SCHEDULED', +]) export const deviceStatusEnum = pgEnum('device_status', ['ACTIVE', 'INACTIVE', 'EXPIRED']) @@ -33,6 +42,7 @@ export const hookEventEnum = pgEnum('hookEvent', [ 'NOTIFICATION_SENT', 'NOTIFICATION_DELIVERED', 'NOTIFICATION_FAILED', + 'NOTIFICATION_OPENED', 'NOTIFICATION_CLICKED', 'WORKFLOW_COMPLETED', 'WORKFLOW_FAILED', diff --git a/app/server/database/schema/notification.ts b/app/server/database/schema/notification.ts index 0118ff0..e72f21c 100644 --- a/app/server/database/schema/notification.ts +++ b/app/server/database/schema/notification.ts @@ -2,7 +2,7 @@ import { index, integer, pgTable, text, uuid } from 'drizzle-orm/pg-core' import { createInsertSchema, createSelectSchema } from 'drizzle-zod' import { customJsonb, customTimestamp, uuidv7Generator } from '../shared' import { app } from './app' -import { notificationStatusEnum } from './enums' +import { channelTypeEnum, notificationStatusEnum } from './enums' export const notification = pgTable('notification', { id: uuid().primaryKey().$defaultFn(uuidv7Generator), @@ -16,6 +16,9 @@ export const notification = pgTable('notification', { icon: text(), image: text(), imageUrl: text(), + channelType: channelTypeEnum(), + channelId: uuid(), + contactIds: customJsonb(), targetDevices: customJsonb(), platforms: customJsonb(), scheduledAt: customTimestamp(), diff --git a/app/server/graphql/analytics/query/getNotificationAnalytics.resolver.ts b/app/server/graphql/analytics/query/getNotificationAnalytics.resolver.ts index c3495a9..6a5f739 100644 --- a/app/server/graphql/analytics/query/getNotificationAnalytics.resolver.ts +++ b/app/server/graphql/analytics/query/getNotificationAnalytics.resolver.ts @@ -1,6 +1,7 @@ import * as tables from '#server/database/schema' +import { aggregateNotificationMetrics } from '#server/utils/notificationTargeting' import { useDatabase } from '#server/utils/useDatabase' -import { eq } from 'drizzle-orm' +import { eq, sql } from 'drizzle-orm' import { defineQuery } from 'nitro-graphql/define' import { HTTPError } from 'nitro/h3' @@ -26,68 +27,35 @@ export const getNotificationAnalyticsQuery = defineQuery({ const notif = notification[0]! - // Get delivery logs for this notification - const deliveryLogs = await db - .select() + const metrics = await aggregateNotificationMetrics(db, notificationId) + + const platformBreakdownRows = await db + .select({ + platform: sql`coalesce(${tables.deliveryLog.platform}, 'unknown')`, + sent: sql`count(*)`, + delivered: sql`count(*) filter (where ${tables.deliveryLog.deliveredAt} is not null)`, + opened: sql`count(*) filter (where ${tables.deliveryLog.openedAt} is not null)`, + clicked: sql`count(*) filter (where ${tables.deliveryLog.clickedAt} is not null)`, + avgDeliveryTime: sql`avg(extract(epoch from (${tables.deliveryLog.deliveredAt} - ${tables.deliveryLog.sentAt})))`, + avgOpenTime: sql`avg(extract(epoch from (${tables.deliveryLog.openedAt} - ${tables.deliveryLog.deliveredAt})))`, + }) .from(tables.deliveryLog) .where(eq(tables.deliveryLog.notificationId, notificationId)) + .groupBy(sql`coalesce(${tables.deliveryLog.platform}, 'unknown')`) - const sentCount = notif.totalSent - const deliveredCount = deliveryLogs.filter((log: any) => log.deliveredAt !== null).length - const openedCount = deliveryLogs.filter((log: any) => log.openedAt !== null).length - const clickedCount = deliveryLogs.filter((log: any) => log.clickedAt !== null).length - - // Calculate platform breakdown - const platformStats = new Map() - - deliveryLogs.forEach((log: any) => { - const platform = log.platform || 'unknown' - if (!platformStats.has(platform)) { - platformStats.set(platform, { - platform, - sent: 0, - delivered: 0, - opened: 0, - clicked: 0, - deliveryTimes: [], - openTimes: [], - }) - } - - const stats = platformStats.get(platform) - stats.sent++ - - if (log.deliveredAt) { - stats.delivered++ - if (log.sentAt) { - stats.deliveryTimes.push(new Date(log.deliveredAt).getTime() - new Date(log.sentAt).getTime()) - } - } - - if (log.openedAt) { - stats.opened++ - if (log.deliveredAt) { - stats.openTimes.push(new Date(log.openedAt).getTime() - new Date(log.deliveredAt).getTime()) - } - } - - if (log.clickedAt) { - stats.clicked++ - } - }) + const sentCount = notif.totalSent ?? metrics.sentCount + const deliveredCount = metrics.deliveredCount + const openedCount = metrics.openedCount + const clickedCount = metrics.clickedCount - const platformBreakdown = Array.from(platformStats.values()).map(stats => ({ - platform: stats.platform, - sent: stats.sent, - delivered: stats.delivered, - opened: stats.opened, - clicked: stats.clicked, - avgDeliveryTime: stats.deliveryTimes.length > 0 - ? stats.deliveryTimes.reduce((a: number, b: number) => a + b, 0) / stats.deliveryTimes.length / 1000 - : null, - avgOpenTime: stats.openTimes.length > 0 - ? stats.openTimes.reduce((a: number, b: number) => a + b, 0) / stats.openTimes.length / 1000 - : null, + const platformBreakdown = platformBreakdownRows.map(row => ({ + platform: row.platform, + sent: Number(row.sent ?? 0), + delivered: Number(row.delivered ?? 0), + opened: Number(row.opened ?? 0), + clicked: Number(row.clicked ?? 0), + avgDeliveryTime: row.avgDeliveryTime === null ? null : Number(row.avgDeliveryTime), + avgOpenTime: row.avgOpenTime === null ? null : Number(row.avgOpenTime), })) return { diff --git a/app/server/graphql/notifications/mutation/cancelNotification.resolver.ts b/app/server/graphql/notifications/mutation/cancelNotification.resolver.ts index 876ae95..43ec31d 100644 --- a/app/server/graphql/notifications/mutation/cancelNotification.resolver.ts +++ b/app/server/graphql/notifications/mutation/cancelNotification.resolver.ts @@ -24,7 +24,7 @@ export const cancelNotificationMutation = defineMutation({ } // Only scheduled or pending notifications can be cancelled - if (!['scheduled', 'pending'].includes(notification[0].status)) { + if (!['SCHEDULED', 'PENDING'].includes(notification[0].status)) { throw new HTTPError({ status: 400, message: 'Only scheduled or pending notifications can be cancelled', diff --git a/app/server/graphql/notifications/mutation/scheduleNotification.resolver.ts b/app/server/graphql/notifications/mutation/scheduleNotification.resolver.ts index 737da91..6c312b3 100644 --- a/app/server/graphql/notifications/mutation/scheduleNotification.resolver.ts +++ b/app/server/graphql/notifications/mutation/scheduleNotification.resolver.ts @@ -1,6 +1,8 @@ +import type { ChannelType } from '#server/database/schema/enums' import * as tables from '#server/database/schema' +import { countChannelTargets, countPushTargets } from '#server/utils/notificationTargeting' import { useDatabase } from '#server/utils/useDatabase' -import { and, count, eq, inArray } from 'drizzle-orm' +import { eq } from 'drizzle-orm' import { defineMutation } from 'nitro-graphql/define' import { HTTPError } from 'nitro/h3' @@ -8,6 +10,7 @@ export const scheduleNotificationMutation = defineMutation({ scheduleNotification: { resolve: async (_parent, { input }, _ctx) => { const db = useDatabase() + const channelType = ((input.channelType as ChannelType | undefined) || 'PUSH') as ChannelType // Ensure scheduledAt is provided for scheduled notifications if (!input.scheduledAt) { @@ -29,6 +32,11 @@ export const scheduleNotificationMutation = defineMutation({ clickAction: input.clickAction, sound: input.sound, badge: input.badge, + channelType, + channelId: input.channelId as string | undefined, + contactIds: input.contactIds as string[] | undefined, + targetDevices: input.targetDevices as string[] | undefined, + platforms: input.platforms as string[] | undefined, status: 'SCHEDULED', scheduledAt: new Date(input.scheduledAt as string).toISOString(), totalTargets: 0, // Will be calculated @@ -41,42 +49,44 @@ export const scheduleNotificationMutation = defineMutation({ const insertedNotification = newNotification[0]! - // Get target devices count for statistics - let targetDevicesCount = 0 + try { + const targetDevicesCount = channelType !== 'PUSH' + ? await countChannelTargets(db, { + appId: input.appId, + channelType: channelType as 'EMAIL' | 'SMS' | 'IN_APP' | 'DISCORD' | 'TELEGRAM', + contactIds: input.contactIds as string[] | undefined, + }) + : await countPushTargets(db, { + appId: input.appId, + targetDevices: input.targetDevices as string[] | undefined, + platforms: input.platforms as string[] | undefined, + }) - if (input.targetDevices && input.targetDevices.length > 0) { - // Specific devices - const devices = await db - .select({ count: count() }) - .from(tables.device) - .where(inArray(tables.device.token, input.targetDevices)) - targetDevicesCount = devices.length - } - else { - // All devices for app (optionally filtered by platform) - const whereConditions = [eq(tables.device.appId, input.appId)] + await db + .update(tables.notification) + .set({ totalTargets: targetDevicesCount, updatedAt: new Date().toISOString() }) + .where(eq(tables.notification.id, insertedNotification.id)) - if (input.platforms && input.platforms.length > 0) { - whereConditions.push(inArray(tables.device.platform, input.platforms.map((p: string) => p.toLowerCase()))) + return { + ...insertedNotification, + channelType, + channelId: input.channelId as string | undefined, + contactIds: input.contactIds as string[] | undefined, + targetDevices: input.targetDevices as string[] | undefined, + platforms: input.platforms as string[] | undefined, + totalTargets: targetDevicesCount, } - - const devices = await db - .select({ count: count() }) - .from(tables.device) - .where(and(...whereConditions)) - - targetDevicesCount = devices.length } + catch (error) { + await db + .update(tables.notification) + .set({ + status: 'FAILED', + updatedAt: new Date().toISOString(), + }) + .where(eq(tables.notification.id, insertedNotification.id)) - // Update notification with target count - await db - .update(tables.notification) - .set({ totalTargets: targetDevicesCount }) - .where(eq(tables.notification.id, insertedNotification.id)) - - return { - ...insertedNotification, - totalTargets: targetDevicesCount, + throw error } }, }, diff --git a/app/server/graphql/notifications/mutation/sendNotification.resolver.ts b/app/server/graphql/notifications/mutation/sendNotification.resolver.ts index 1eda5ee..d78e736 100644 --- a/app/server/graphql/notifications/mutation/sendNotification.resolver.ts +++ b/app/server/graphql/notifications/mutation/sendNotification.resolver.ts @@ -1,13 +1,16 @@ +import type { ChannelType } from '#server/database/schema/enums' import * as tables from '#server/database/schema' -import { addSendNotificationJob } from '#server/queues/notification.queue' +import { addFanoutNotificationJob } from '#server/queues/notification.queue' +import { countChannelTargets, countPushTargets } from '#server/utils/notificationTargeting' import { useDatabase } from '#server/utils/useDatabase' -import { and, eq, inArray } from 'drizzle-orm' +import { eq } from 'drizzle-orm' import { defineMutation } from 'nitro-graphql/define' export const notificationMutations = defineMutation({ sendNotification: { resolve: async (_parent, { input }, _ctx) => { const db = useDatabase() + const channelType = ((input.channelType as ChannelType | undefined) || 'PUSH') as ChannelType // Create notification record immediately so the caller has an ID to track const [insertedNotification] = await db @@ -21,6 +24,11 @@ export const notificationMutations = defineMutation({ clickAction: input.clickAction as string | undefined, sound: input.sound as string | undefined, badge: input.badge as number | undefined, + channelType, + channelId: input.channelId as string | undefined, + contactIds: input.contactIds as string[] | undefined, + targetDevices: input.targetDevices as string[] | undefined, + platforms: input.platforms as string[] | undefined, status: (input.scheduledAt ? 'SCHEDULED' : 'PENDING') as 'SCHEDULED' | 'PENDING', scheduledAt: input.scheduledAt as string | undefined, totalTargets: 0, @@ -35,179 +43,74 @@ export const notificationMutations = defineMutation({ throw new Error('Failed to create notification record') } - // Scheduled notifications are handled by the scheduler plugin - if (input.scheduledAt) { - return insertedNotification - } - - const channelType = input.channelType as string | undefined - - // ── Channel-based delivery (EMAIL, SMS, DISCORD, IN_APP) ────────────── - if (channelType && channelType !== 'PUSH') { - const appId = input.appId as string - const channelId = input.channelId as string | undefined - const contactIds = input.contactIds as string[] | undefined - - const payload = { - title: input.title as string, - body: input.body as string, - data: input.data as Record | undefined, - } - - if (channelType === 'DISCORD' || channelType === 'TELEGRAM') { - // Discord/Telegram — single destination, no contact required - const resolvedChannelId = channelId ?? await resolveChannelId(db, appId, channelType) - - await addSendNotificationJob({ - deliveryMode: 'channel', - notificationId: insertedNotification.id, - appId, - channelId: resolvedChannelId, - to: '', - channelType: channelType as 'DISCORD' | 'TELEGRAM', - payload, - }) - - await db - .update(tables.notification) - .set({ totalTargets: 1, status: 'SENT', sentAt: new Date().toISOString() }) - .where(eq(tables.notification.id, insertedNotification.id)) - - return { ...insertedNotification, totalTargets: 1, status: 'SENT' } - } - - // For EMAIL, SMS, IN_APP — resolve contacts - let contacts: Array - - if (contactIds && contactIds.length > 0) { - contacts = await db - .select() - .from(tables.contact) - .where(inArray(tables.contact.id, contactIds)) + try { + // Scheduled notifications are handled by the scheduler plugin + if (input.scheduledAt) { + return insertedNotification } - else { - contacts = await db - .select() - .from(tables.contact) - .where(eq(tables.contact.appId, appId)) - } - - const resolvedChannelId = channelId ?? await resolveChannelId(db, appId, channelType as 'EMAIL' | 'SMS' | 'IN_APP') - - const jobs = contacts - .map((c) => { - const to = channelType === 'EMAIL' - ? c.email - : channelType === 'SMS' - ? c.phone - : c.id // IN_APP uses contactId - if (!to) - return null - - return addSendNotificationJob({ - deliveryMode: 'channel', - notificationId: insertedNotification.id, - appId, - channelId: resolvedChannelId, - to, - channelType: channelType as 'EMAIL' | 'SMS' | 'IN_APP', - payload, + const totalTargets = channelType !== 'PUSH' + ? await countChannelTargets(db, { + appId: input.appId as string, + channelType: channelType as 'EMAIL' | 'SMS' | 'IN_APP' | 'DISCORD' | 'TELEGRAM', + contactIds: input.contactIds as string[] | undefined, + }) + : await countPushTargets(db, { + appId: input.appId as string, + targetDevices: input.targetDevices as string[] | undefined, + platforms: input.platforms as string[] | undefined, }) - }) - .filter(Boolean) - - await Promise.all(jobs) await db .update(tables.notification) - .set({ totalTargets: jobs.length, status: 'SENT', sentAt: new Date().toISOString() }) + .set({ + totalTargets, + status: 'QUEUED', + updatedAt: new Date().toISOString(), + }) .where(eq(tables.notification.id, insertedNotification.id)) - return { ...insertedNotification, totalTargets: jobs.length, status: 'SENT' } - } - - // ── Device-based push delivery ───────────────────────────────────────── - let targetDevices: Array<{ id: string, appId: string, token: string, platform: string, webPushP256dh: string | null, webPushAuth: string | null }> - - if (input.targetDevices && (input.targetDevices as string[]).length > 0) { - targetDevices = await db - .select() - .from(tables.device) - .where(inArray(tables.device.token, input.targetDevices as string[])) - } - else { - const conditions = [eq(tables.device.appId, input.appId as string)] + await addFanoutNotificationJob({ + notificationId: insertedNotification.id, + appId: input.appId as string, + channelType, + channelId: input.channelId as string | undefined, + contactIds: input.contactIds as string[] | undefined, + targetDevices: input.targetDevices as string[] | undefined, + platforms: input.platforms as string[] | undefined, + payload: { + title: input.title as string, + body: input.body as string, + data: input.data as Record | undefined, + badge: input.badge as number | undefined, + sound: input.sound as string | undefined, + clickAction: input.clickAction as string | undefined, + imageUrl: input.imageUrl as string | undefined, + }, + }) - if (input.platforms && (input.platforms as string[]).length > 0) { - conditions.push(inArray(tables.device.platform, input.platforms as any)) + return { + ...insertedNotification, + channelType, + channelId: input.channelId as string | undefined, + contactIds: input.contactIds as string[] | undefined, + targetDevices: input.targetDevices as string[] | undefined, + platforms: input.platforms as string[] | undefined, + status: 'QUEUED', + totalTargets, } - - targetDevices = await db - .select() - .from(tables.device) - .where(and(...conditions)) } + catch (error) { + await db + .update(tables.notification) + .set({ + status: 'FAILED', + updatedAt: new Date().toISOString(), + }) + .where(eq(tables.notification.id, insertedNotification.id)) - // Update target count before queuing - await db - .update(tables.notification) - .set({ totalTargets: targetDevices.length }) - .where(eq(tables.notification.id, insertedNotification.id)) - - // Enqueue one job per device - await Promise.all( - targetDevices.map(device => - addSendNotificationJob({ - deliveryMode: 'device', - notificationId: insertedNotification.id, - deviceId: device.id, - appId: input.appId as string, - platform: device.platform.toLowerCase() as 'ios' | 'android' | 'web', - token: device.token, - webPushP256dh: device.webPushP256dh ?? undefined, - webPushAuth: device.webPushAuth ?? undefined, - payload: { - title: input.title as string, - body: input.body as string, - data: input.data as Record | undefined, - badge: input.badge as number | undefined, - sound: input.sound as string | undefined, - clickAction: input.clickAction as string | undefined, - imageUrl: input.imageUrl as string | undefined, - }, - }), - ), - ) - - return { - ...insertedNotification, - totalTargets: targetDevices.length, + throw error } }, }, }) - -async function resolveChannelId( - db: ReturnType, - appId: string, - type: string, -): Promise { - const rows = await db - .select() - .from(tables.channel) - .where( - and( - eq(tables.channel.appId, appId), - eq(tables.channel.type, type as any), - eq(tables.channel.isActive, true), - ), - ) - .limit(1) - - if (!rows[0]) { - throw new Error(`No active ${type} channel configured for app ${appId}`) - } - - return rows[0].id -} diff --git a/app/server/graphql/notifications/notification.graphql b/app/server/graphql/notifications/notification.graphql index cb6bd08..f5a1e33 100644 --- a/app/server/graphql/notifications/notification.graphql +++ b/app/server/graphql/notifications/notification.graphql @@ -1,8 +1,11 @@ enum NotificationStatus { PENDING + QUEUED + PROCESSING SENT DELIVERED FAILED + PARTIAL SCHEDULED } @@ -26,6 +29,9 @@ type Notification { sound: String badge: Int status: NotificationStatus! + channelType: ChannelType + channelId: ID + contactIds: JSON # Targeting targetDevices: JSON # Array of device IDs @@ -98,4 +104,4 @@ extend type Mutation { sendNotification(input: SendNotificationInput!): Notification! scheduleNotification(input: SendNotificationInput!): Notification! cancelNotification(id: ID!): Boolean! -} \ No newline at end of file +} diff --git a/app/server/graphql/share/shared.graphql b/app/server/graphql/share/shared.graphql index dcbb369..722b4a1 100644 --- a/app/server/graphql/share/shared.graphql +++ b/app/server/graphql/share/shared.graphql @@ -54,7 +54,8 @@ enum HookEvent { NOTIFICATION_SENT NOTIFICATION_DELIVERED NOTIFICATION_FAILED + NOTIFICATION_OPENED NOTIFICATION_CLICKED WORKFLOW_COMPLETED WORKFLOW_FAILED -} \ No newline at end of file +} diff --git a/app/server/graphql/stats/query/dashboardStats.resolver.ts b/app/server/graphql/stats/query/dashboardStats.resolver.ts index 9cbfdfc..fcc31ff 100644 --- a/app/server/graphql/stats/query/dashboardStats.resolver.ts +++ b/app/server/graphql/stats/query/dashboardStats.resolver.ts @@ -1,6 +1,6 @@ import * as tables from '#server/database/schema' import { useDatabase } from '#server/utils/useDatabase' -import { count, eq, gte, sql } from 'drizzle-orm' +import { sql } from 'drizzle-orm' import { defineQuery } from 'nitro-graphql/define' export const statsQuery = defineQuery({ @@ -12,31 +12,27 @@ export const statsQuery = defineQuery({ yesterday.setDate(yesterday.getDate() - 1) const yesterdayIso = yesterday.toISOString() - // Total apps const totalAppsResult = await db - .select({ count: count() }) + .select({ count: sql`count(*)` }) .from(tables.app) - // Active devices const activeDevicesResult = await db - .select({ count: count() }) + .select({ count: sql`count(*)` }) .from(tables.device) - .where(eq(tables.device.status, 'ACTIVE')) + .where(sql`${tables.device.status} = 'ACTIVE'`) - // Notifications sent in last 24h const notificationsSentResult = await db - .select({ count: count() }) + .select({ count: sql`count(*)` }) .from(tables.notification) - .where(gte(tables.notification.createdAt, yesterdayIso)) + .where(sql`${tables.notification.createdAt} >= ${yesterdayIso}`) - // Calculate delivery rate const deliveryRateResult = await db .select({ delivered: sql`count(case when ${tables.deliveryLog.status} = 'DELIVERED' then 1 end)`, - total: count(), + total: sql`count(*)`, }) .from(tables.deliveryLog) - .where(gte(tables.deliveryLog.createdAt, yesterdayIso)) + .where(sql`${tables.deliveryLog.createdAt} >= ${yesterdayIso}`) const deliveryRate = (deliveryRateResult[0]?.total ?? 0) > 0 ? (deliveryRateResult[0]!.delivered / deliveryRateResult[0]!.total) * 100 diff --git a/app/server/plugins/scheduler.ts b/app/server/plugins/scheduler.ts index 9a26798..5505be5 100644 --- a/app/server/plugins/scheduler.ts +++ b/app/server/plugins/scheduler.ts @@ -1,15 +1,21 @@ -import { and, eq, inArray, lte } from 'drizzle-orm' +import { and, eq, lte } from 'drizzle-orm' import { definePlugin } from 'nitro' import { getDatabase } from '../database/connection' -import { device, notification } from '../database/schema' -import { addSendNotificationJob } from '../queues/notification.queue' +import { notification } from '../database/schema' +import { addFanoutNotificationJob } from '../queues/notification.queue' -const SCHEDULER_INTERVAL = 60000 // 1 minute -const BATCH_SIZE = 100 +const SCHEDULER_INTERVAL = Number.parseInt(process.env.SCHEDULER_INTERVAL_MS || '5000') +const BATCH_SIZE = Number.parseInt(process.env.SCHEDULER_BATCH_SIZE || '500') let schedulerInterval: ReturnType | null = null +let isSchedulerRunning = false async function processScheduledNotifications() { + if (isSchedulerRunning) { + return + } + + isSchedulerRunning = true const db = getDatabase() try { @@ -23,6 +29,7 @@ async function processScheduledNotifications() { lte(notification.scheduledAt, new Date().toISOString()), ), ) + .orderBy(notification.scheduledAt, notification.id) .limit(BATCH_SIZE) if (dueNotifications.length === 0) { @@ -33,98 +40,51 @@ async function processScheduledNotifications() { for (const notif of dueNotifications) { try { - // Mark as PENDING to prevent duplicate processing - await db - .update(notification) - .set({ status: 'PENDING', updatedAt: new Date().toISOString() }) - .where(eq(notification.id, notif.id)) - - // Get target devices - let targetDevices = [] - - if (notif.targetDevices && (notif.targetDevices as string[]).length > 0) { - // Specific devices - targetDevices = await db - .select() - .from(device) - .where(inArray(device.token, notif.targetDevices as string[])) - } - else { - // All devices for app (optionally filtered by platform) - const whereConditions = [eq(device.appId, notif.appId)] - - if (notif.platforms && (notif.platforms as string[]).length > 0) { - whereConditions.push(inArray(device.platform, notif.platforms as any)) - } - - targetDevices = await db - .select() - .from(device) - .where(and(...whereConditions)) - } - - // Update target count - await db + const now = new Date().toISOString() + const claimedRows = await db .update(notification) - .set({ totalTargets: targetDevices.length }) - .where(eq(notification.id, notif.id)) + .set({ status: 'PENDING', updatedAt: now }) + .where(and(eq(notification.id, notif.id), eq(notification.status, 'SCHEDULED'))) + .returning({ id: notification.id }) - if (targetDevices.length === 0) { - // No devices to send to, mark as sent - await db - .update(notification) - .set({ - status: 'SENT', - sentAt: new Date().toISOString(), - updatedAt: new Date().toISOString(), - }) - .where(eq(notification.id, notif.id)) + if (!claimedRows[0]) { continue } - // Queue jobs for each device - for (const dev of targetDevices) { - await addSendNotificationJob({ - deliveryMode: 'device', - notificationId: notif.id, - deviceId: dev.id, - appId: notif.appId, - platform: dev.platform.toLowerCase() as 'ios' | 'android' | 'web', - token: dev.token, - webPushP256dh: dev.webPushP256dh || undefined, - webPushAuth: dev.webPushAuth || undefined, - payload: { - title: notif.title, - body: notif.body, - data: notif.data as Record | undefined, - badge: notif.badge || undefined, - sound: notif.sound || undefined, - clickAction: notif.clickAction || undefined, - imageUrl: notif.imageUrl || undefined, - }, - }) - } + await addFanoutNotificationJob({ + notificationId: notif.id, + appId: notif.appId, + channelType: notif.channelType || undefined, + channelId: notif.channelId || undefined, + contactIds: (notif.contactIds as string[] | null) || undefined, + targetDevices: (notif.targetDevices as string[] | null) || undefined, + platforms: (notif.platforms as string[] | null) || undefined, + payload: { + title: notif.title, + body: notif.body, + data: notif.data as Record | undefined, + badge: notif.badge || undefined, + sound: notif.sound || undefined, + clickAction: notif.clickAction || undefined, + imageUrl: notif.imageUrl || undefined, + }, + }) - // Update status to SENT await db .update(notification) .set({ - status: 'SENT', - sentAt: new Date().toISOString(), - updatedAt: new Date().toISOString(), + status: 'QUEUED', + updatedAt: now, }) .where(eq(notification.id, notif.id)) - - console.log(`[Scheduler] Queued ${targetDevices.length} jobs for notification ${notif.id}`) } catch (error) { console.error(`[Scheduler] Error processing notification ${notif.id}:`, error) - // Mark as failed await db .update(notification) .set({ - status: 'FAILED', + status: 'SCHEDULED', updatedAt: new Date().toISOString(), }) .where(eq(notification.id, notif.id)) @@ -134,6 +94,9 @@ async function processScheduledNotifications() { catch (error) { console.error('[Scheduler] Error in processScheduledNotifications:', error) } + finally { + isSchedulerRunning = false + } } function startScheduler() { diff --git a/app/server/plugins/worker.ts b/app/server/plugins/worker.ts index 36b12ca..1e4d1d4 100644 --- a/app/server/plugins/worker.ts +++ b/app/server/plugins/worker.ts @@ -1,12 +1,22 @@ -import type { SendNotificationJobData } from '#server/queues/notification.queue' +import type { FanoutNotificationJobData, SendNotificationJobData } from '#server/queues/notification.queue' +import type { TrackNotificationEventJobData } from '#server/queues/tracking.queue' +import type { DispatchWebhookJobData } from '#server/queues/webhook.queue' import type { ExecuteWorkflowStepJobData, TriggerWorkflowJobData } from '#server/queues/workflow.queue' import { getNotificationQueue } from '#server/queues/notification.queue' +import { getTrackingQueue } from '#server/queues/tracking.queue' +import { getWebhookQueue } from '#server/queues/webhook.queue' import { getWorkflowQueue } from '#server/queues/workflow.queue' import { useWorker } from '#server/utils/bullmq' import { processNotificationJob } from '#server/workers/notification.worker' +import { processTrackingJob } from '#server/workers/tracking.worker' +import { processWebhookJob } from '#server/workers/webhook.worker' import { processWorkflowJob } from '#server/workers/workflow.worker' import { definePlugin } from 'nitro' +function isWorkerEnabled(envName: string) { + return process.env[envName] !== 'false' +} + export default definePlugin((_nitroApp) => { console.log('[WorkerPlugin] Initializing...') if (import.meta.prerender) { @@ -14,22 +24,39 @@ export default definePlugin((_nitroApp) => { return } - // Eagerly initialize queues so BullMQ metadata exists in Redis before workers start - getNotificationQueue() - getWorkflowQueue() - - // Notification worker - useWorker('notifications', async (job) => { - console.log(`[WorkerPlugin] Processor called for job ${job.id} (${job.name})`) - if (job.name === 'send-notification') { - return processNotificationJob(job) - } - console.warn(`[NotificationWorker] Unknown job: ${job.name}`) - return undefined - }) - - // Workflow worker - useWorker('workflows', async (job) => { - return processWorkflowJob(job) - }, { concurrency: 5 }) + const notificationWorkerEnabled = isWorkerEnabled('NOTIFICATION_WORKER_ENABLED') + const trackingWorkerEnabled = isWorkerEnabled('TRACKING_WORKER_ENABLED') + const webhookWorkerEnabled = isWorkerEnabled('WEBHOOK_WORKER_ENABLED') + const workflowWorkerEnabled = isWorkerEnabled('WORKFLOW_WORKER_ENABLED') + + if (notificationWorkerEnabled) + getNotificationQueue() + if (trackingWorkerEnabled) + getTrackingQueue() + if (webhookWorkerEnabled) + getWebhookQueue() + if (workflowWorkerEnabled) + getWorkflowQueue() + + if (notificationWorkerEnabled) { + useWorker('notifications', processNotificationJob) + } + + if (trackingWorkerEnabled) { + useWorker('tracking', processTrackingJob, { + concurrency: 50, + }) + } + + if (webhookWorkerEnabled) { + useWorker('webhooks', processWebhookJob, { + concurrency: 20, + }) + } + + if (workflowWorkerEnabled) { + useWorker('workflows', async (job) => { + return processWorkflowJob(job) + }, { concurrency: 5 }) + } }) diff --git a/app/server/queues/notification.queue.ts b/app/server/queues/notification.queue.ts index 2a39913..ed8b236 100644 --- a/app/server/queues/notification.queue.ts +++ b/app/server/queues/notification.queue.ts @@ -1,5 +1,6 @@ import type { ChannelType } from '#server/database/schema/enums' import { useQueue } from '#server/utils/bullmq' +import { getSendNotificationJobId } from '#server/utils/notificationJobId' interface BaseJobData { notificationId: string @@ -31,18 +32,54 @@ interface ChannelJobData extends BaseJobData { channelType: ChannelType } +export interface FanoutNotificationJobData extends BaseJobData { + channelType?: ChannelType + channelId?: string + contactIds?: string[] + targetDevices?: string[] + platforms?: string[] +} + export type SendNotificationJobData = DeviceJobData | ChannelJobData export function getNotificationQueue() { - return useQueue('notifications') + return useQueue('notifications') } export async function addSendNotificationJob(data: SendNotificationJobData) { const queue = getNotificationQueue() - const job = await queue.add('send-notification', data) - const counts = await queue.getJobCounts('waiting', 'active', 'failed', 'delayed') - console.log(`[NotificationQueue] Job ${job.id} added (${data.deliveryMode}) — counts:`, counts) - return job + return queue.add('send-notification', data, { + jobId: getSendNotificationJobId(data), + }) +} + +export async function addSendNotificationJobs(data: SendNotificationJobData[]) { + if (data.length === 0) { + return [] + } + + const queue = getNotificationQueue() + return queue.addBulk( + data.map(job => ({ + name: 'send-notification', + data: job, + opts: { + jobId: getSendNotificationJobId(job), + }, + })), + ) +} + +export async function addFanoutNotificationJob(data: FanoutNotificationJobData) { + const queue = getNotificationQueue() + return queue.add('fanout-notification', data, { + jobId: `fanout:${data.notificationId}`, + attempts: 5, + backoff: { + type: 'exponential', + delay: 10_000, + }, + }) } export async function closeNotificationQueue() { diff --git a/app/server/queues/tracking.queue.ts b/app/server/queues/tracking.queue.ts new file mode 100644 index 0000000..6c105bd --- /dev/null +++ b/app/server/queues/tracking.queue.ts @@ -0,0 +1,16 @@ +import { useQueue } from '#server/utils/bullmq' + +export interface TrackNotificationEventJobData { + type: 'open' | 'click' + deliveryLogId: string + destination?: string +} + +export function getTrackingQueue() { + return useQueue('tracking') +} + +export async function addTrackNotificationEventJob(data: TrackNotificationEventJobData) { + const queue = getTrackingQueue() + return queue.add('track-notification-event', data) +} diff --git a/app/server/queues/webhook.queue.ts b/app/server/queues/webhook.queue.ts new file mode 100644 index 0000000..13d557e --- /dev/null +++ b/app/server/queues/webhook.queue.ts @@ -0,0 +1,18 @@ +import type { HookEvent } from '#server/utils/webhookDispatcher' +import { useQueue } from '#server/utils/bullmq' + +export interface DispatchWebhookJobData { + appId: string + event: HookEvent + payload: Record + occurredAt?: string +} + +export function getWebhookQueue() { + return useQueue('webhooks') +} + +export async function addDispatchWebhookJob(data: DispatchWebhookJobData) { + const queue = getWebhookQueue() + return queue.add('dispatch-webhook', data) +} diff --git a/app/server/utils/apiKeyCache.ts b/app/server/utils/apiKeyCache.ts new file mode 100644 index 0000000..efba7ef --- /dev/null +++ b/app/server/utils/apiKeyCache.ts @@ -0,0 +1,54 @@ +export interface CachedApiKey { + id: string + appId: string + permissions: string[] + appName: string +} + +export class ApiKeyCache { + private readonly ttlMs: number + private readonly writeIntervalMs: number + private readonly values = new Map() + private readonly lastWriteAt = new Map() + + constructor(ttlMs: number, writeIntervalMs: number) { + this.ttlMs = ttlMs + this.writeIntervalMs = writeIntervalMs + } + + get(key: string, now: number = Date.now()): CachedApiKey | null { + const entry = this.values.get(key) + if (!entry) { + return null + } + + if (entry.expiresAt <= now) { + this.values.delete(key) + return null + } + + return entry.value + } + + set(key: string, value: CachedApiKey, now: number = Date.now()) { + this.values.set(key, { + value, + expiresAt: now + this.ttlMs, + }) + } + + shouldWriteUsage(id: string, now: number = Date.now()) { + const lastWrite = this.lastWriteAt.get(id) + if (lastWrite !== undefined && now - lastWrite < this.writeIntervalMs) { + return false + } + + this.lastWriteAt.set(id, now) + return true + } + + clear() { + this.values.clear() + this.lastWriteAt.clear() + } +} diff --git a/app/server/utils/auth.ts b/app/server/utils/auth.ts index c95b947..1c4c6ab 100644 --- a/app/server/utils/auth.ts +++ b/app/server/utils/auth.ts @@ -5,6 +5,7 @@ import jwt from 'jsonwebtoken' import { HTTPError } from 'nitro/h3' import { getDatabase } from '../database/connection' import { apiKey, app } from '../database/schema' +import { ApiKeyCache } from './apiKeyCache' const JWT_SECRET = process.env.JWT_SECRET @@ -17,6 +18,9 @@ if (!JWT_SECRET) { } const EFFECTIVE_JWT_SECRET = JWT_SECRET ?? 'dev-only-insecure-jwt-secret-do-not-use-in-production' +const API_KEY_CACHE_TTL_MS = Number.parseInt(process.env.API_KEY_CACHE_TTL_MS || '30000') +const API_KEY_USAGE_WRITE_INTERVAL_MS = Number.parseInt(process.env.API_KEY_USAGE_WRITE_INTERVAL_MS || '300000') +const apiKeyCache = new ApiKeyCache(API_KEY_CACHE_TTL_MS, API_KEY_USAGE_WRITE_INTERVAL_MS) export interface JWTPayload { appId: string @@ -44,6 +48,12 @@ export function generateApiKey(): string { export async function validateApiKey(apiKeyValue: string) { const db = getDatabase() + const cached = apiKeyCache.get(apiKeyValue) + + if (cached) { + scheduleApiKeyUsageUpdate(cached.id) + return cached + } const result = await db .select({ @@ -77,18 +87,17 @@ export async function validateApiKey(apiKeyValue: string) { return null } - // Update last used timestamp - await db - .update(apiKey) - .set({ lastUsedAt: new Date().toISOString() }) - .where(eq(apiKey.id, keyData.id)) - - return { + const value = { id: keyData.id, appId: keyData.appId, permissions: keyData.permissions as string[] || [], appName: keyData.appName, } + + apiKeyCache.set(apiKeyValue, value) + scheduleApiKeyUsageUpdate(keyData.id) + + return value } export async function verifyApiKey(apiKeyValue: string) { @@ -162,3 +171,22 @@ export async function extractAuthFromEvent(event: H3Event) { }) } } + +function scheduleApiKeyUsageUpdate(apiKeyId: string) { + if (!apiKeyCache.shouldWriteUsage(apiKeyId)) { + return + } + + const db = getDatabase() + void db + .update(apiKey) + .set({ lastUsedAt: new Date().toISOString() }) + .where(eq(apiKey.id, apiKeyId)) + .catch((error) => { + console.error('[Auth] Failed to update API key lastUsedAt:', error) + }) +} + +export function clearApiKeyCacheForTests() { + apiKeyCache.clear() +} diff --git a/app/server/utils/batching.ts b/app/server/utils/batching.ts new file mode 100644 index 0000000..315e7be --- /dev/null +++ b/app/server/utils/batching.ts @@ -0,0 +1,12 @@ +export function chunkArray(items: readonly T[], size: number): T[][] { + if (size <= 0) { + throw new Error('Chunk size must be greater than 0') + } + + const chunks: T[][] = [] + for (let index = 0; index < items.length; index += size) { + chunks.push(items.slice(index, index + size)) + } + + return chunks +} diff --git a/app/server/utils/bullmq.ts b/app/server/utils/bullmq.ts index 87cf22d..e61cd40 100644 --- a/app/server/utils/bullmq.ts +++ b/app/server/utils/bullmq.ts @@ -34,7 +34,6 @@ export function useQueue( }, }) queueMap.set(name, queue) - console.log(`[Queue:${name}] Created`) } return queueMap.get(name) as Queue } @@ -51,15 +50,12 @@ export function useWorker( connection: createRedisConnection(), }) - worker.on('ready', () => console.log(`[Worker:${name}] Ready — listening for jobs`)) - worker.on('active', job => console.log(`[Worker:${name}] Job ${job.id} active`)) - worker.on('completed', job => console.log(`[Worker:${name}] Job ${job.id} completed`)) + worker.on('ready', () => console.log(`[Worker:${name}] Ready`)) worker.on('failed', (job, err) => console.error(`[Worker:${name}] Job ${job?.id} failed:`, err.message)) worker.on('stalled', jobId => console.warn(`[Worker:${name}] Job ${jobId} stalled — was active when process died`)) worker.on('error', err => console.error(`[Worker:${name}] Error:`, err.message)) workerMap.set(name, worker) - console.log(`[Worker:${name}] Started`) } return workerMap.get(name) as Worker } diff --git a/app/server/utils/notificationJobId.ts b/app/server/utils/notificationJobId.ts new file mode 100644 index 0000000..c4b4a2d --- /dev/null +++ b/app/server/utils/notificationJobId.ts @@ -0,0 +1,20 @@ +interface DeviceJobShape { + deliveryMode: 'device' + notificationId: string + deviceId: string +} + +interface ChannelJobShape { + deliveryMode: 'channel' + notificationId: string + channelId: string + to: string +} + +export function getSendNotificationJobId(data: DeviceJobShape | ChannelJobShape) { + if (data.deliveryMode === 'device') { + return `notification:${data.notificationId}:device:${data.deviceId}` + } + + return `notification:${data.notificationId}:channel:${data.channelId}:${data.to}` +} diff --git a/app/server/utils/notificationStatus.ts b/app/server/utils/notificationStatus.ts new file mode 100644 index 0000000..431affc --- /dev/null +++ b/app/server/utils/notificationStatus.ts @@ -0,0 +1,33 @@ +export type NotificationLifecycleStatus + = | 'PENDING' + | 'QUEUED' + | 'PROCESSING' + | 'SENT' + | 'DELIVERED' + | 'FAILED' + | 'PARTIAL' + | 'SCHEDULED' + +export function getNotificationTerminalStatus( + totalTargets: number, + totalSent: number, + totalFailed: number, +): NotificationLifecycleStatus | null { + if (totalTargets <= 0) { + return 'SENT' + } + + if (totalSent + totalFailed < totalTargets) { + return null + } + + if (totalFailed === 0) { + return 'SENT' + } + + if (totalSent === 0) { + return 'FAILED' + } + + return 'PARTIAL' +} diff --git a/app/server/utils/notificationTargeting.ts b/app/server/utils/notificationTargeting.ts new file mode 100644 index 0000000..a2c0d16 --- /dev/null +++ b/app/server/utils/notificationTargeting.ts @@ -0,0 +1,214 @@ +import type { ChannelType } from '#server/database/schema/enums' +import type { InferSelectModel } from 'drizzle-orm' +import * as tables from '#server/database/schema' +import { and, asc, count, eq, gt, inArray, isNotNull, or, sql } from 'drizzle-orm' + +type Database = ReturnType + +const DEFAULT_PAGE_SIZE = Number.parseInt(process.env.NOTIFICATION_TARGET_BATCH_SIZE || '1000') + +type DeviceRow = Pick< + InferSelectModel, + 'id' | 'appId' | 'token' | 'platform' | 'webPushP256dh' | 'webPushAuth' +> + +type ContactRow = Pick, 'id' | 'email' | 'phone'> + +interface PushTargeting { + appId: string + targetDevices?: string[] | null + platforms?: string[] | null +} + +interface ChannelTargeting { + appId: string + channelType: Exclude + contactIds?: string[] | null +} + +function toCount(value: unknown): number { + if (typeof value === 'number') { + return value + } + + if (typeof value === 'string') { + return Number.parseInt(value, 10) + } + + return 0 +} + +export async function resolveActiveChannelId( + db: Database, + appId: string, + type: Exclude, +): Promise { + const rows = await db + .select({ id: tables.channel.id }) + .from(tables.channel) + .where( + and( + eq(tables.channel.appId, appId), + eq(tables.channel.type, type), + eq(tables.channel.isActive, true), + ), + ) + .limit(1) + + if (!rows[0]) { + throw new Error(`No active ${type} channel configured for app ${appId}`) + } + + return rows[0].id +} + +export async function countPushTargets(db: Database, targeting: PushTargeting): Promise { + const targetDevices = targeting.targetDevices?.filter(Boolean) ?? [] + const platforms = targeting.platforms?.filter(Boolean) ?? [] + + if (targetDevices.length > 0) { + const rows = await db + .select({ count: count() }) + .from(tables.device) + .where(or( + inArray(tables.device.token, targetDevices), + inArray(tables.device.id, targetDevices), + )) + + return toCount(rows[0]?.count) + } + + const conditions = [eq(tables.device.appId, targeting.appId)] + if (platforms.length > 0) { + conditions.push(inArray(tables.device.platform, platforms as any)) + } + + const rows = await db + .select({ count: count() }) + .from(tables.device) + .where(and(...conditions)) + + return toCount(rows[0]?.count) +} + +export async function countChannelTargets(db: Database, targeting: ChannelTargeting): Promise { + if (targeting.channelType === 'DISCORD' || targeting.channelType === 'TELEGRAM') { + return 1 + } + + const conditions = [eq(tables.contact.appId, targeting.appId)] + const contactIds = targeting.contactIds?.filter(Boolean) ?? [] + + if (contactIds.length > 0) { + conditions.push(inArray(tables.contact.id, contactIds)) + } + + if (targeting.channelType === 'EMAIL') { + conditions.push(isNotNull(tables.contact.email)) + } + else if (targeting.channelType === 'SMS') { + conditions.push(isNotNull(tables.contact.phone)) + } + + const rows = await db + .select({ count: count() }) + .from(tables.contact) + .where(and(...conditions)) + + return toCount(rows[0]?.count) +} + +export async function loadPushTargetBatch( + db: Database, + targeting: PushTargeting, + afterId?: string, + pageSize: number = DEFAULT_PAGE_SIZE, +): Promise { + const targetDevices = targeting.targetDevices?.filter(Boolean) ?? [] + const platforms = targeting.platforms?.filter(Boolean) ?? [] + const conditions = [] + + if (targetDevices.length > 0) { + conditions.push(or( + inArray(tables.device.token, targetDevices), + inArray(tables.device.id, targetDevices), + )!) + } + else { + conditions.push(eq(tables.device.appId, targeting.appId)) + } + if (platforms.length > 0 && targetDevices.length === 0) { + conditions.push(inArray(tables.device.platform, platforms as any)) + } + if (afterId) { + conditions.push(gt(tables.device.id, afterId)) + } + + return await db + .select({ + id: tables.device.id, + appId: tables.device.appId, + token: tables.device.token, + platform: tables.device.platform, + webPushP256dh: tables.device.webPushP256dh, + webPushAuth: tables.device.webPushAuth, + }) + .from(tables.device) + .where(and(...conditions)) + .orderBy(asc(tables.device.id)) + .limit(pageSize) +} + +export async function loadChannelTargetBatch( + db: Database, + targeting: ChannelTargeting, + afterId?: string, + pageSize: number = DEFAULT_PAGE_SIZE, +): Promise { + const contactIds = targeting.contactIds?.filter(Boolean) ?? [] + + const conditions = [eq(tables.contact.appId, targeting.appId)] + if (contactIds.length > 0) { + conditions.push(inArray(tables.contact.id, contactIds)) + } + + if (targeting.channelType === 'EMAIL') { + conditions.push(isNotNull(tables.contact.email)) + } + else if (targeting.channelType === 'SMS') { + conditions.push(isNotNull(tables.contact.phone)) + } + if (afterId) { + conditions.push(gt(tables.contact.id, afterId)) + } + + return await db + .select({ + id: tables.contact.id, + email: tables.contact.email, + phone: tables.contact.phone, + }) + .from(tables.contact) + .where(and(...conditions)) + .orderBy(asc(tables.contact.id)) + .limit(pageSize) +} + +export async function aggregateNotificationMetrics(db: Database, notificationId: string) { + const rows = await db + .select({ + sentCount: count(), + deliveredCount: sql`count(*) filter (where ${tables.deliveryLog.deliveredAt} is not null)`, + openedCount: sql`count(*) filter (where ${tables.deliveryLog.openedAt} is not null)`, + clickedCount: sql`count(*) filter (where ${tables.deliveryLog.clickedAt} is not null)`, + }) + .from(tables.deliveryLog) + .where(eq(tables.deliveryLog.notificationId, notificationId)) + + return { + sentCount: toCount(rows[0]?.sentCount), + deliveredCount: toCount(rows[0]?.deliveredCount), + openedCount: toCount(rows[0]?.openedCount), + clickedCount: toCount(rows[0]?.clickedCount), + } +} diff --git a/app/server/utils/webhookDispatcher.ts b/app/server/utils/webhookDispatcher.ts index 1597a42..24ed5d2 100644 --- a/app/server/utils/webhookDispatcher.ts +++ b/app/server/utils/webhookDispatcher.ts @@ -1,6 +1,7 @@ import { createHmac } from 'node:crypto' import { getDatabase } from '#server/database/connection' import * as tables from '#server/database/schema' +import { addDispatchWebhookJob } from '#server/queues/webhook.queue' import { decryptSensitiveData, isDataEncrypted } from '#server/utils/crypto' import { and, eq } from 'drizzle-orm' @@ -21,6 +22,20 @@ export async function dispatchHooks( appId: string, event: HookEvent, payload: Record, +): Promise { + await addDispatchWebhookJob({ + appId, + event, + payload, + occurredAt: new Date().toISOString(), + }) +} + +export async function deliverHooksForEvent( + appId: string, + event: HookEvent, + payload: Record, + occurredAt: string = new Date().toISOString(), ): Promise { const db = getDatabase() @@ -38,7 +53,7 @@ export async function dispatchHooks( return } - const body = JSON.stringify({ event, appId, timestamp: new Date().toISOString(), payload }) + const body = JSON.stringify({ event, appId, timestamp: occurredAt, payload }) await Promise.allSettled( matchingHooks.map(h => deliverHook(h.url, h.secret, body)), diff --git a/app/server/workers/notification.worker.ts b/app/server/workers/notification.worker.ts index e309d81..9a83be5 100644 --- a/app/server/workers/notification.worker.ts +++ b/app/server/workers/notification.worker.ts @@ -1,14 +1,240 @@ import type { Job } from 'bullmq' -import type { SendNotificationJobData } from '../queues/notification.queue' +import type { FanoutNotificationJobData, SendNotificationJobData } from '../queues/notification.queue' import { eq, sql } from 'drizzle-orm' import { v7 as uuidv7 } from 'uuid' import { getChannelById } from '../channels' import { getDatabase } from '../database/connection' import { deliveryLog, notification } from '../database/schema' import { getProviderForApp } from '../providers' +import { addSendNotificationJobs } from '../queues/notification.queue' +import { chunkArray } from '../utils/batching' +import { getNotificationTerminalStatus } from '../utils/notificationStatus' +import { + countChannelTargets, + countPushTargets, + loadChannelTargetBatch, + loadPushTargetBatch, + resolveActiveChannelId, +} from '../utils/notificationTargeting' import { dispatchHooks } from '../utils/webhookDispatcher' const TRACKING_BASE_URL = process.env.APP_URL || 'http://localhost:3412' +const ENQUEUE_CHUNK_SIZE = Number.parseInt(process.env.NOTIFICATION_ENQUEUE_CHUNK_SIZE || '500') + +async function processFanoutJob(job: Job) { + const db = getDatabase() + const now = new Date().toISOString() + const channelType = job.data.channelType + await db + .update(notification) + .set({ + status: 'PROCESSING', + updatedAt: now, + }) + .where(eq(notification.id, job.data.notificationId)) + + try { + if (channelType && channelType !== 'PUSH') { + const resolvedChannelId = job.data.channelId ?? await resolveActiveChannelId(db, job.data.appId, channelType) + const payload = { + title: job.data.payload.title, + body: job.data.payload.body, + data: job.data.payload.data, + } + + if (channelType === 'DISCORD' || channelType === 'TELEGRAM') { + await addSendNotificationJobs([{ + deliveryMode: 'channel', + notificationId: job.data.notificationId, + appId: job.data.appId, + channelId: resolvedChannelId, + to: '', + channelType, + payload, + }]) + + await db + .update(notification) + .set({ + totalTargets: 1, + status: 'PROCESSING', + updatedAt: now, + }) + .where(eq(notification.id, job.data.notificationId)) + + return + } + + const totalTargets = await countChannelTargets(db, { + appId: job.data.appId, + channelType, + contactIds: job.data.contactIds, + }) + + let queued = 0 + let cursor: string | undefined + while (queued < totalTargets) { + const contacts = await loadChannelTargetBatch(db, { + appId: job.data.appId, + channelType, + contactIds: job.data.contactIds, + }, cursor) + + if (contacts.length === 0) { + break + } + + const jobs = contacts.flatMap((contact) => { + const to = channelType === 'EMAIL' + ? contact.email + : channelType === 'SMS' + ? contact.phone + : contact.id + + if (!to) { + return [] + } + + return [{ + deliveryMode: 'channel' as const, + notificationId: job.data.notificationId, + appId: job.data.appId, + channelId: resolvedChannelId, + to, + channelType, + payload, + }] + }) + + for (const batch of chunkArray(jobs, ENQUEUE_CHUNK_SIZE)) { + await addSendNotificationJobs(batch) + } + + queued += jobs.length + cursor = contacts.at(-1)?.id + } + + await db + .update(notification) + .set({ + totalTargets: queued, + status: queued === 0 ? 'SENT' : 'PROCESSING', + sentAt: queued === 0 ? now : undefined, + updatedAt: now, + }) + .where(eq(notification.id, job.data.notificationId)) + + return + } + + const totalTargets = await countPushTargets(db, { + appId: job.data.appId, + targetDevices: job.data.targetDevices, + platforms: job.data.platforms, + }) + + let queued = 0 + let cursor: string | undefined + while (queued < totalTargets) { + const devices = await loadPushTargetBatch(db, { + appId: job.data.appId, + targetDevices: job.data.targetDevices, + platforms: job.data.platforms, + }, cursor) + + if (devices.length === 0) { + break + } + + const jobs = devices.map(device => ({ + deliveryMode: 'device' as const, + notificationId: job.data.notificationId, + deviceId: device.id, + appId: job.data.appId, + platform: device.platform.toLowerCase() as 'ios' | 'android' | 'web', + token: device.token, + webPushP256dh: device.webPushP256dh ?? undefined, + webPushAuth: device.webPushAuth ?? undefined, + payload: job.data.payload, + })) + + for (const batch of chunkArray(jobs, ENQUEUE_CHUNK_SIZE)) { + await addSendNotificationJobs(batch) + } + + queued += jobs.length + cursor = devices.at(-1)?.id + } + + await db + .update(notification) + .set({ + totalTargets: queued, + status: queued === 0 ? 'SENT' : 'PROCESSING', + sentAt: queued === 0 ? now : undefined, + updatedAt: now, + }) + .where(eq(notification.id, job.data.notificationId)) + } + catch (error) { + await db + .update(notification) + .set({ + status: 'FAILED', + updatedAt: new Date().toISOString(), + }) + .where(eq(notification.id, job.data.notificationId)) + + throw error + } +} + +async function updateNotificationDeliveryProgress( + notificationId: string, + sentDelta: number, + failedDelta: number, +) { + const db = getDatabase() + const now = new Date().toISOString() + const [updated] = await db + .update(notification) + .set({ + totalSent: sentDelta > 0 ? sql`"totalSent" + ${sentDelta}` : sql`"totalSent"`, + totalFailed: failedDelta > 0 ? sql`"totalFailed" + ${failedDelta}` : sql`"totalFailed"`, + status: 'PROCESSING', + sentAt: sentDelta > 0 ? sql`coalesce("sentAt", ${now})` : undefined, + updatedAt: now, + }) + .where(eq(notification.id, notificationId)) + .returning({ + totalTargets: notification.totalTargets, + totalSent: notification.totalSent, + totalFailed: notification.totalFailed, + }) + + if (!updated) { + return + } + + const terminalStatus = getNotificationTerminalStatus( + updated.totalTargets, + updated.totalSent, + updated.totalFailed, + ) + + if (!terminalStatus || terminalStatus === 'PROCESSING') { + return + } + + await db + .update(notification) + .set({ + status: terminalStatus, + sentAt: sql`coalesce("sentAt", ${now})`, + updatedAt: now, + }) + .where(eq(notification.id, notificationId)) +} async function processChannelDelivery(job: Job) { if (job.data.deliveryMode !== 'channel') @@ -50,10 +276,7 @@ async function processChannelDelivery(job: Job) { }) if (result.success) { - await db - .update(notification) - .set({ totalSent: sql`"totalSent" + 1`, updatedAt: new Date().toISOString() }) - .where(eq(notification.id, notificationId)) + await updateNotificationDeliveryProgress(notificationId, 1, 0) await dispatchHooks(appId, 'NOTIFICATION_SENT', { notificationId, @@ -62,10 +285,7 @@ async function processChannelDelivery(job: Job) { }) } else { - await db - .update(notification) - .set({ totalFailed: sql`"totalFailed" + 1`, updatedAt: new Date().toISOString() }) - .where(eq(notification.id, notificationId)) + await updateNotificationDeliveryProgress(notificationId, 0, 1) await dispatchHooks(appId, 'NOTIFICATION_FAILED', { notificationId, @@ -135,10 +355,7 @@ async function processDeviceDelivery(job: Job) { }) if (result.success) { - await db - .update(notification) - .set({ totalSent: sql`"totalSent" + 1`, updatedAt: new Date().toISOString() }) - .where(eq(notification.id, notificationId)) + await updateNotificationDeliveryProgress(notificationId, 1, 0) await dispatchHooks(appId, 'NOTIFICATION_SENT', { notificationId, @@ -147,10 +364,7 @@ async function processDeviceDelivery(job: Job) { }) } else { - await db - .update(notification) - .set({ totalFailed: sql`"totalFailed" + 1`, updatedAt: new Date().toISOString() }) - .where(eq(notification.id, notificationId)) + await updateNotificationDeliveryProgress(notificationId, 0, 1) await dispatchHooks(appId, 'NOTIFICATION_FAILED', { notificationId, @@ -179,10 +393,7 @@ async function processDeviceDelivery(job: Job) { errorMessage: error instanceof Error ? error.message : 'Unknown error', sentAt: null, }) - await db - .update(notification) - .set({ totalFailed: sql`"totalFailed" + 1`, updatedAt: new Date().toISOString() }) - .where(eq(notification.id, notificationId)) + await updateNotificationDeliveryProgress(notificationId, 0, 1) } catch (dbError) { console.error('[NotificationWorker] Failed to write delivery log:', dbError) @@ -193,10 +404,19 @@ async function processDeviceDelivery(job: Job) { } } -export async function processNotificationJob(job: Job) { - console.log(`[NotificationWorker] Processing job ${job.id} (${job.name})`) - if (job.data.deliveryMode === 'channel') { - return processChannelDelivery(job) +export async function processNotificationJob(job: Job) { + if (job.name === 'fanout-notification') { + return processFanoutJob(job as Job) + } + + if (job.name !== 'send-notification') { + return + } + + const sendJob = job as Job + + if (sendJob.data.deliveryMode === 'channel') { + return processChannelDelivery(sendJob) } - return processDeviceDelivery(job) + return processDeviceDelivery(sendJob) } diff --git a/app/server/workers/tracking.worker.ts b/app/server/workers/tracking.worker.ts new file mode 100644 index 0000000..08934a5 --- /dev/null +++ b/app/server/workers/tracking.worker.ts @@ -0,0 +1,96 @@ +import type { TrackNotificationEventJobData } from '#server/queues/tracking.queue' +import type { Job } from 'bullmq' +import { getDatabase } from '#server/database/connection' +import { deliveryLog, notification } from '#server/database/schema' +import { dispatchHooks } from '#server/utils/webhookDispatcher' +import { and, eq, isNull, sql } from 'drizzle-orm' + +async function processOpenEvent(deliveryLogId: string) { + const db = getDatabase() + const now = new Date().toISOString() + + const updatedRows = await db + .update(deliveryLog) + .set({ openedAt: now, updatedAt: now }) + .where(and(eq(deliveryLog.id, deliveryLogId), isNull(deliveryLog.openedAt))) + .returning({ + id: deliveryLog.id, + notificationId: deliveryLog.notificationId, + }) + + const updated = updatedRows[0] + if (!updated) { + return + } + + const notificationRows = await db + .update(notification) + .set({ totalOpened: sql`"totalOpened" + 1`, updatedAt: now }) + .where(eq(notification.id, updated.notificationId)) + .returning({ + appId: notification.appId, + }) + + const notificationRow = notificationRows[0] + if (!notificationRow) { + return + } + + await dispatchHooks(notificationRow.appId, 'NOTIFICATION_OPENED', { + notificationId: updated.notificationId, + deliveryLogId, + openedAt: now, + }) +} + +async function processClickEvent(deliveryLogId: string, destination: string) { + const db = getDatabase() + const now = new Date().toISOString() + + const updatedRows = await db + .update(deliveryLog) + .set({ clickedAt: now, updatedAt: now }) + .where(and(eq(deliveryLog.id, deliveryLogId), isNull(deliveryLog.clickedAt))) + .returning({ + id: deliveryLog.id, + notificationId: deliveryLog.notificationId, + }) + + const updated = updatedRows[0] + if (!updated) { + return + } + + const notificationRows = await db + .update(notification) + .set({ totalClicked: sql`"totalClicked" + 1`, updatedAt: now }) + .where(eq(notification.id, updated.notificationId)) + .returning({ + appId: notification.appId, + }) + + const notificationRow = notificationRows[0] + if (!notificationRow) { + return + } + + await dispatchHooks(notificationRow.appId, 'NOTIFICATION_CLICKED', { + notificationId: updated.notificationId, + deliveryLogId, + clickedAt: now, + destination, + }) +} + +export async function processTrackingJob(job: Job) { + if (job.name !== 'track-notification-event') { + return + } + + if (job.data.type === 'open') { + await processOpenEvent(job.data.deliveryLogId) + return + } + + await processClickEvent(job.data.deliveryLogId, job.data.destination || '/') +} diff --git a/app/server/workers/webhook.worker.ts b/app/server/workers/webhook.worker.ts new file mode 100644 index 0000000..f9a2caf --- /dev/null +++ b/app/server/workers/webhook.worker.ts @@ -0,0 +1,12 @@ +import type { DispatchWebhookJobData } from '#server/queues/webhook.queue' +import type { Job } from 'bullmq' +import { deliverHooksForEvent } from '#server/utils/webhookDispatcher' + +export async function processWebhookJob(job: Job) { + if (job.name !== 'dispatch-webhook') { + return + } + + const { appId, event, payload, occurredAt } = job.data + await deliverHooksForEvent(appId, event, payload, occurredAt) +} diff --git a/app/tests/api-key-cache.test.ts b/app/tests/api-key-cache.test.ts new file mode 100644 index 0000000..b806925 --- /dev/null +++ b/app/tests/api-key-cache.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from 'vitest' +import { ApiKeyCache } from '../server/utils/apiKeyCache' + +describe('ApiKeyCache', () => { + it('returns cached values until ttl expires', () => { + const cache = new ApiKeyCache(100, 1_000) + cache.set('key', { + id: 'k1', + appId: 'a1', + permissions: ['send'], + appName: 'NitroPing', + }, 1_000) + + expect(cache.get('key', 1_050)?.id).toBe('k1') + expect(cache.get('key', 1_101)).toBeNull() + }) + + it('throttles usage writes per key', () => { + const cache = new ApiKeyCache(1_000, 5_000) + + expect(cache.shouldWriteUsage('k1', 1_000)).toBe(true) + expect(cache.shouldWriteUsage('k1', 2_000)).toBe(false) + expect(cache.shouldWriteUsage('k1', 6_001)).toBe(true) + }) +}) diff --git a/app/tests/batching.test.ts b/app/tests/batching.test.ts new file mode 100644 index 0000000..71f9f30 --- /dev/null +++ b/app/tests/batching.test.ts @@ -0,0 +1,12 @@ +import { describe, expect, it } from 'vitest' +import { chunkArray } from '../server/utils/batching' + +describe('chunkArray', () => { + it('splits arrays into fixed-size chunks', () => { + expect(chunkArray([1, 2, 3, 4, 5], 2)).toEqual([[1, 2], [3, 4], [5]]) + }) + + it('rejects non-positive chunk sizes', () => { + expect(() => chunkArray([1, 2, 3], 0)).toThrow('Chunk size must be greater than 0') + }) +}) diff --git a/app/tests/notification-queue.test.ts b/app/tests/notification-queue.test.ts new file mode 100644 index 0000000..75cd86a --- /dev/null +++ b/app/tests/notification-queue.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, it } from 'vitest' +import { getSendNotificationJobId } from '../server/utils/notificationJobId' + +describe('getSendNotificationJobId', () => { + it('builds deterministic ids for device jobs', () => { + expect(getSendNotificationJobId({ + deliveryMode: 'device', + notificationId: 'n1', + deviceId: 'd1', + appId: 'a1', + platform: 'ios', + token: 'token-1', + payload: { + title: 'Hello', + body: 'World', + }, + })).toBe('notification:n1:device:d1') + }) + + it('builds deterministic ids for channel jobs', () => { + expect(getSendNotificationJobId({ + deliveryMode: 'channel', + notificationId: 'n1', + appId: 'a1', + channelId: 'c1', + to: 'foo@example.com', + channelType: 'EMAIL', + payload: { + title: 'Hello', + body: 'World', + }, + })).toBe('notification:n1:channel:c1:foo@example.com') + }) +}) diff --git a/app/tests/notification-status.test.ts b/app/tests/notification-status.test.ts new file mode 100644 index 0000000..fe41bff --- /dev/null +++ b/app/tests/notification-status.test.ts @@ -0,0 +1,20 @@ +import { describe, expect, it } from 'vitest' +import { getNotificationTerminalStatus } from '../server/utils/notificationStatus' + +describe('getNotificationTerminalStatus', () => { + it('returns null while deliveries are still in progress', () => { + expect(getNotificationTerminalStatus(10, 4, 1)).toBeNull() + }) + + it('returns SENT when all targets succeeded', () => { + expect(getNotificationTerminalStatus(5, 5, 0)).toBe('SENT') + }) + + it('returns FAILED when all targets failed', () => { + expect(getNotificationTerminalStatus(3, 0, 3)).toBe('FAILED') + }) + + it('returns PARTIAL when both success and failure exist', () => { + expect(getNotificationTerminalStatus(4, 3, 1)).toBe('PARTIAL') + }) +})