From 4955767a13885643ee05a983d36d287fecaa4940 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Thu, 23 Apr 2026 22:45:53 +0200 Subject: [PATCH] fix: improve healthcheck --- .../src/controllers/healthcheck.controller.ts | 91 ++++++++++--------- apps/worker/src/boot-workers.ts | 17 ++++ apps/worker/src/index.ts | 84 ++++++++++++++++- apps/worker/src/utils/graceful-shutdown.ts | 9 ++ apps/worker/src/utils/worker-heartbeat.ts | 23 +++++ packages/common/index.ts | 1 + packages/common/src/try-catch.ts | 15 +++ pnpm-lock.yaml | 16 ++-- pnpm-workspace.yaml | 2 +- 9 files changed, 205 insertions(+), 53 deletions(-) create mode 100644 apps/worker/src/utils/graceful-shutdown.ts create mode 100644 apps/worker/src/utils/worker-heartbeat.ts create mode 100644 packages/common/src/try-catch.ts diff --git a/apps/api/src/controllers/healthcheck.controller.ts b/apps/api/src/controllers/healthcheck.controller.ts index 2fc70cbd2..47fef627f 100644 --- a/apps/api/src/controllers/healthcheck.controller.ts +++ b/apps/api/src/controllers/healthcheck.controller.ts @@ -1,65 +1,74 @@ +import { tryCatch } from '@openpanel/common'; import { chQuery, db } from '@openpanel/db'; import { getRedisCache } from '@openpanel/redis'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { isShuttingDown } from '@/utils/graceful-shutdown'; -// For docker compose healthcheck export async function healthcheck( request: FastifyRequest, reply: FastifyReply ) { - try { - const redisRes = await getRedisCache().ping(); - const dbRes = await db.$executeRaw`SELECT 1`; - const chRes = await chQuery('SELECT 1'); - const status = redisRes && dbRes && chRes ? 
200 : 503; - reply.status(status).send({ - ready: status === 200, - redis: redisRes === 'PONG', - db: !!dbRes, - ch: chRes && chRes.length > 0, + const [redisResult, dbResult, chResult] = await Promise.all([ + tryCatch(async () => (await getRedisCache().ping()) === 'PONG'), + tryCatch(async () => !!(await db.$executeRaw`SELECT 1`)), + tryCatch(async () => (await chQuery('SELECT 1')).length > 0), + ]); + + const dependencies = { + redis: redisResult.ok && redisResult.data, + db: dbResult.ok && dbResult.data, + ch: chResult.ok && chResult.data, + }; + const dependencyErrors = { + redis: redisResult.error?.message, + db: dbResult.error?.message, + ch: chResult.error?.message, + }; + + const failedDependencies = Object.entries(dependencies) + .filter(([, ok]) => !ok) + .map(([name]) => name); + const workingDependencies = Object.entries(dependencies) + .filter(([, ok]) => ok) + .map(([name]) => name); + + const status = failedDependencies.length === 0 ? 200 : 503; + + if (status === 200) { + request.log.debug('healthcheck passed', { + workingDependencies, + failedDependencies, + dependencies, }); - } catch (error) { - request.log.warn('healthcheck failed', { error }); - return reply.status(503).send({ - ready: false, - reason: 'dependencies not ready', + } else { + request.log.warn('healthcheck failed', { + workingDependencies, + failedDependencies, + dependencies, + dependencyErrors, }); } + + return reply.status(status).send({ + ready: status === 200, + ...dependencies, + failedDependencies, + workingDependencies, + }); } -// Kubernetes - Liveness probe - returns 200 if process is alive -export async function liveness(request: FastifyRequest, reply: FastifyReply) { +// Kubernetes liveness — shallow, event loop only. 
+export async function liveness(_request: FastifyRequest, reply: FastifyReply) { return reply.status(200).send({ live: true }); } -// Kubernetes - Readiness probe - returns 200 only when accepting requests, 503 during shutdown -export async function readiness(request: FastifyRequest, reply: FastifyReply) { +// Kubernetes readiness — shallow + shutdown-aware. Dependency health lives on +// /healthcheck so a downstream blip cannot trigger mass pod restarts. +export async function readiness(_request: FastifyRequest, reply: FastifyReply) { if (isShuttingDown()) { return reply.status(503).send({ ready: false, reason: 'shutting down' }); } - // Perform lightweight dependency checks for readiness - const redisRes = await getRedisCache().ping(); - const dbRes = await db.$executeRaw`SELECT 1`; - const chRes = await chQuery('SELECT 1'); - - const isReady = redisRes; - - if (!isReady) { - const res = { - redis: redisRes === 'PONG', - db: !!dbRes, - ch: chRes && chRes.length > 0, - }; - request.log.warn('dependencies not ready', res); - return reply.status(503).send({ - ready: false, - reason: 'dependencies not ready', - ...res, - }); - } - return reply.status(200).send({ ready: true }); } diff --git a/apps/worker/src/boot-workers.ts b/apps/worker/src/boot-workers.ts index f58d4881f..70e4c3e9d 100644 --- a/apps/worker/src/boot-workers.ts +++ b/apps/worker/src/boot-workers.ts @@ -28,6 +28,11 @@ import { miscJob } from './jobs/misc'; import { notificationJob } from './jobs/notification'; import { sessionsJob } from './jobs/sessions'; import { eventsGroupJobDuration } from './metrics'; +import { setShuttingDown } from './utils/graceful-shutdown'; +import { + enableEventsHeartbeat, + markEventsActivity, +} from './utils/worker-heartbeat'; import { logger } from './utils/logger'; const workerOptions: WorkerOptions = { @@ -117,6 +122,10 @@ export function bootWorkers() { } } + if (eventQueuesToStart.length > 0) { + enableEventsHeartbeat(); + } + for (const index of 
eventQueuesToStart) { const queue = eventsGroupQueues[index]; if (!queue) { @@ -141,6 +150,13 @@ export function bootWorkers() { }, }); + // Consumer-loop heartbeat for the readiness probe. `completed` fires after + // each processed job; `drained` fires on each poll cycle that finds the + // queue empty. Together they refresh the timestamp every poll cycle while + // the consumer is alive — busy or idle. + worker.on('completed', markEventsActivity); + worker.on('drained', markEventsActivity); + worker.run(); workers.push(worker); logger.info(`Started worker for ${queueName}`, { concurrency }); @@ -337,6 +353,7 @@ export function bootWorkers() { ['uncaughtException', 'unhandledRejection', 'SIGTERM', 'SIGINT'].forEach( (evt) => { process.on(evt, (code) => { + setShuttingDown(true); exitHandler(evt, code); }); } diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index ff9c98270..10e206254 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -1,7 +1,8 @@ import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; import { ExpressAdapter } from '@bull-board/express'; -import { createInitialSalts } from '@openpanel/db'; +import { tryCatch } from '@openpanel/common'; +import { chQuery, createInitialSalts, db } from '@openpanel/db'; import { cohortComputeQueue, cronQueue, @@ -13,6 +14,7 @@ import { notificationQueue, sessionsQueue, } from '@openpanel/queue'; +import { getRedisCache } from '@openpanel/redis'; import express from 'express'; import { BullBoardGroupMQAdapter } from 'groupmq'; import client from 'prom-client'; @@ -20,7 +22,11 @@ import sourceMapSupport from 'source-map-support'; import { bootCron } from './boot-cron'; import { bootWorkers } from './boot-workers'; import { register } from './metrics'; +import { isShuttingDown } from './utils/graceful-shutdown'; import { logger } from './utils/logger'; +import { getEventsHeartbeat } from './utils/worker-heartbeat'; + +const 
EVENTS_HEARTBEAT_STALE_MS = 60_000; sourceMapSupport.install(); @@ -69,8 +75,80 @@ async function start() { }); }); - app.get('/healthcheck', (req, res) => { - res.json({ status: 'ok' }); + app.get('/healthcheck', async (req, res) => { + const [redisResult, dbResult, chResult] = await Promise.all([ + tryCatch(async () => (await getRedisCache().ping()) === 'PONG'), + tryCatch(async () => !!(await db.$executeRaw`SELECT 1`)), + tryCatch(async () => (await chQuery('SELECT 1')).length > 0), + ]); + + const dependencies = { + redis: redisResult.ok && redisResult.data, + db: dbResult.ok && dbResult.data, + ch: chResult.ok && chResult.data, + }; + const dependencyErrors = { + redis: redisResult.error?.message, + db: dbResult.error?.message, + ch: chResult.error?.message, + }; + + const failedDependencies = Object.entries(dependencies) + .filter(([, ok]) => !ok) + .map(([name]) => name); + const workingDependencies = Object.entries(dependencies) + .filter(([, ok]) => ok) + .map(([name]) => name); + + const status = failedDependencies.length === 0 ? 200 : 503; + + if (status !== 200) { + logger.warn('healthcheck failed', { + workingDependencies, + failedDependencies, + dependencies, + dependencyErrors, + }); + } + + res.status(status).json({ + ready: status === 200, + ...dependencies, + failedDependencies, + workingDependencies, + }); + }); + + // Kubernetes liveness — shallow, event loop only. + app.get('/healthz/live', (req, res) => { + res.status(200).json({ live: true }); + }); + + // Kubernetes readiness — shallow + shutdown-aware. When events workers run + // on this instance, also require the events consumer-loop heartbeat to be + // fresh (refreshed on each `completed`/`drained` event). If events are not + // enabled here, the heartbeat check is skipped. 
+ app.get('/healthz/ready', (req, res) => { + if (isShuttingDown()) { + res.status(503).json({ ready: false, reason: 'shutting down' }); + return; + } + + const { enabled, lastActivityAt } = getEventsHeartbeat(); + if (enabled) { + const idleMs = Date.now() - lastActivityAt; + if (idleMs > EVENTS_HEARTBEAT_STALE_MS) { + res.status(503).json({ + ready: false, + reason: 'events consumer heartbeat stale', + idleMs, + thresholdMs: EVENTS_HEARTBEAT_STALE_MS, + }); + return; + } + } + + res.status(200).json({ ready: true }); }); app.listen(PORT, () => { diff --git a/apps/worker/src/utils/graceful-shutdown.ts b/apps/worker/src/utils/graceful-shutdown.ts new file mode 100644 index 000000000..535568c53 --- /dev/null +++ b/apps/worker/src/utils/graceful-shutdown.ts @@ -0,0 +1,9 @@ +let shuttingDown = false; + +export function setShuttingDown(value: boolean) { + shuttingDown = value; +} + +export function isShuttingDown() { + return shuttingDown; +} diff --git a/apps/worker/src/utils/worker-heartbeat.ts b/apps/worker/src/utils/worker-heartbeat.ts new file mode 100644 index 000000000..4b6834353 --- /dev/null +++ b/apps/worker/src/utils/worker-heartbeat.ts @@ -0,0 +1,23 @@ +// Consumer-loop heartbeat, scoped to the events queue. +// +// Enabled only on instances that run events workers. Refreshed on each events +// worker's `completed` (job processed) or `drained` (poll returned empty), so +// a healthy consumer loop refreshes the timestamp every ~blockingTimeoutSec +// regardless of traffic. If enabled and the timestamp goes stale past the +// readiness threshold, the events consumer is wedged. 
+ +let enabled = false; +let lastActivityAt = Date.now(); + +export function enableEventsHeartbeat() { + enabled = true; + lastActivityAt = Date.now(); +} + +export function markEventsActivity() { + lastActivityAt = Date.now(); +} + +export function getEventsHeartbeat() { + return { enabled, lastActivityAt }; +} diff --git a/packages/common/index.ts b/packages/common/index.ts index 0f24c118f..24dcb4095 100644 --- a/packages/common/index.ts +++ b/packages/common/index.ts @@ -9,4 +9,5 @@ export * from './src/url'; export * from './src/id'; export * from './src/get-previous-metric'; export * from './src/group-by-labels'; +export * from './src/try-catch'; export * from './server/get-client-ip'; diff --git a/packages/common/src/try-catch.ts b/packages/common/src/try-catch.ts new file mode 100644 index 000000000..f437bbd63 --- /dev/null +++ b/packages/common/src/try-catch.ts @@ -0,0 +1,15 @@ +export type TryCatchResult<T, E = Error> = + | { ok: true; data: T; error: null } + | { ok: false; data: null; error: E }; + +export async function tryCatch<T, E = Error>( + input: (() => Promise<T>) | Promise<T>, +): Promise<TryCatchResult<T, E>> { + try { + const promise = typeof input === 'function' ? 
input() : input; + const data = await promise; + return { ok: true, data, error: null }; + } catch (error) { + return { ok: false, data: null, error: error as E }; + } +} \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index aa12e8b74..b573c8880 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -16,8 +16,8 @@ catalogs: specifier: ^19.2.3 version: 19.2.3 groupmq: - specifier: 2.0.0-next.3 - version: 2.0.0-next.3 + specifier: 2.0.0-next.4 + version: 2.0.0-next.4 react: specifier: ^19.2.3 version: 19.2.3 @@ -210,7 +210,7 @@ importers: version: 5.6.1(@fastify/swagger-ui@5.2.5)(@fastify/swagger@9.7.0)(fastify@5.6.1)(zod@4.3.6) groupmq: specifier: 'catalog:' - version: 2.0.0-next.3(ioredis@5.8.2) + version: 2.0.0-next.4(ioredis@5.8.2) jsonwebtoken: specifier: ^9.0.2 version: 9.0.2 @@ -988,7 +988,7 @@ importers: version: 4.18.2 groupmq: specifier: 'catalog:' - version: 2.0.0-next.3(ioredis@5.8.2) + version: 2.0.0-next.4(ioredis@5.8.2) prom-client: specifier: ^15.1.3 version: 15.1.3 @@ -1508,7 +1508,7 @@ importers: version: 5.63.0 groupmq: specifier: 'catalog:' - version: 2.0.0-next.3(ioredis@5.8.2) + version: 2.0.0-next.4(ioredis@5.8.2) devDependencies: '@openpanel/tsconfig': specifier: workspace:* @@ -13176,8 +13176,8 @@ packages: resolution: {integrity: sha512-5gghUc24tP9HRznNpV2+FIoq3xKkj5dTQqf4v0CpdPbFVwFkWoxOM+o+2OC9ZSvjEMTjfmG9QT+gcvggTwW1zw==} engines: {node: '>= 10.x'} - groupmq@2.0.0-next.3: - resolution: {integrity: sha512-aVza75AKavLUxDKhGGljJDJbYW5PjM5pbBErbGLLm6F8ybqWXEbKHi+P69DvRwI+ChF6Z6DbJwTBz9v9/EW81Q==} + groupmq@2.0.0-next.4: + resolution: {integrity: sha512-VvSKuDdEWr7PrtpGo+sWaMsH4q61NMBTiThFXXZVwgcAMJNeOWHhVVMiEvxHiU/R6P6xVqy0HaJu75Pbo6iD1A==} engines: {node: '>=18'} peerDependencies: ioredis: '>=5' @@ -33993,7 +33993,7 @@ snapshots: graphql@15.8.0: {} - groupmq@2.0.0-next.3(ioredis@5.8.2): + groupmq@2.0.0-next.4(ioredis@5.8.2): dependencies: cron-parser: 4.9.0 ioredis: 5.8.2 diff --git a/pnpm-workspace.yaml 
b/pnpm-workspace.yaml index 9c96df004..972e0601b 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -13,4 +13,4 @@ catalog: "@types/react-dom": ^19.2.3 "@types/node": ^24.7.1 typescript: ^5.9.3 - groupmq: 2.0.0-next.3 + groupmq: 2.0.0-next.4