-
Notifications
You must be signed in to change notification settings - Fork 349
fix: improve healthcheck #349
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,65 +1,74 @@ | ||
| import { tryCatch } from '@openpanel/common'; | ||
| import { chQuery, db } from '@openpanel/db'; | ||
| import { getRedisCache } from '@openpanel/redis'; | ||
| import type { FastifyReply, FastifyRequest } from 'fastify'; | ||
| import { isShuttingDown } from '@/utils/graceful-shutdown'; | ||
|
|
||
| // For docker compose healthcheck | ||
| export async function healthcheck( | ||
| request: FastifyRequest, | ||
| reply: FastifyReply | ||
| ) { | ||
| try { | ||
| const redisRes = await getRedisCache().ping(); | ||
| const dbRes = await db.$executeRaw`SELECT 1`; | ||
| const chRes = await chQuery('SELECT 1'); | ||
| const status = redisRes && dbRes && chRes ? 200 : 503; | ||
|
|
||
| reply.status(status).send({ | ||
| ready: status === 200, | ||
| redis: redisRes === 'PONG', | ||
| db: !!dbRes, | ||
| ch: chRes && chRes.length > 0, | ||
| const [redisResult, dbResult, chResult] = await Promise.all([ | ||
| tryCatch(async () => (await getRedisCache().ping()) === 'PONG'), | ||
| tryCatch(async () => !!(await db.$executeRaw`SELECT 1`)), | ||
| tryCatch(async () => (await chQuery('SELECT 1')).length > 0), | ||
| ]); | ||
|
|
||
| const dependencies = { | ||
| redis: redisResult.ok && redisResult.data, | ||
| db: dbResult.ok && dbResult.data, | ||
| ch: chResult.ok && chResult.data, | ||
| }; | ||
| const dependencyErrors = { | ||
| redis: redisResult.error?.message, | ||
| db: dbResult.error?.message, | ||
| ch: chResult.error?.message, | ||
| }; | ||
|
|
||
| const failedDependencies = Object.entries(dependencies) | ||
| .filter(([, ok]) => !ok) | ||
| .map(([name]) => name); | ||
| const workingDependencies = Object.entries(dependencies) | ||
| .filter(([, ok]) => ok) | ||
| .map(([name]) => name); | ||
|
|
||
| const status = failedDependencies.length === 0 ? 200 : 503; | ||
|
|
||
| if (status === 200) { | ||
| request.log.debug('healthcheck passed', { | ||
| workingDependencies, | ||
| failedDependencies, | ||
| dependencies, | ||
| }); | ||
| } catch (error) { | ||
| request.log.warn('healthcheck failed', { error }); | ||
| return reply.status(503).send({ | ||
| ready: false, | ||
| reason: 'dependencies not ready', | ||
| } else { | ||
| request.log.warn('healthcheck failed', { | ||
| workingDependencies, | ||
| failedDependencies, | ||
| dependencies, | ||
| dependencyErrors, | ||
| }); | ||
| } | ||
|
|
||
| return reply.status(status).send({ | ||
| ready: status === 200, | ||
| ...dependencies, | ||
| failedDependencies, | ||
| workingDependencies, | ||
| }); | ||
| } | ||
|
|
||
| // Kubernetes - Liveness probe - returns 200 if process is alive | ||
| export async function liveness(request: FastifyRequest, reply: FastifyReply) { | ||
| // Kubernetes liveness — shallow, event loop only. | ||
| export async function liveness(_request: FastifyRequest, reply: FastifyReply) { | ||
| return reply.status(200).send({ live: true }); | ||
| } | ||
|
|
||
| // Kubernetes - Readiness probe - returns 200 only when accepting requests, 503 during shutdown | ||
| export async function readiness(request: FastifyRequest, reply: FastifyReply) { | ||
| // Kubernetes readiness — shallow + shutdown-aware. Dependency health lives on | ||
| // /healthcheck so a downstream blip cannot trigger mass pod restarts. | ||
| export async function readiness(_request: FastifyRequest, reply: FastifyReply) { | ||
| if (isShuttingDown()) { | ||
| return reply.status(503).send({ ready: false, reason: 'shutting down' }); | ||
| } | ||
|
|
||
| // Perform lightweight dependency checks for readiness | ||
| const redisRes = await getRedisCache().ping(); | ||
| const dbRes = await db.$executeRaw`SELECT 1`; | ||
| const chRes = await chQuery('SELECT 1'); | ||
|
|
||
| const isReady = redisRes; | ||
|
|
||
| if (!isReady) { | ||
| const res = { | ||
| redis: redisRes === 'PONG', | ||
| db: !!dbRes, | ||
| ch: chRes && chRes.length > 0, | ||
| }; | ||
| request.log.warn('dependencies not ready', res); | ||
| return reply.status(503).send({ | ||
| ready: false, | ||
| reason: 'dependencies not ready', | ||
| ...res, | ||
| }); | ||
| } | ||
|
|
||
| return reply.status(200).send({ ready: true }); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,8 @@ | ||
| import { createBullBoard } from '@bull-board/api'; | ||
| import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; | ||
| import { ExpressAdapter } from '@bull-board/express'; | ||
| import { createInitialSalts } from '@openpanel/db'; | ||
| import { tryCatch } from '@openpanel/common'; | ||
| import { chQuery, createInitialSalts, db } from '@openpanel/db'; | ||
| import { | ||
| cohortComputeQueue, | ||
| cronQueue, | ||
|
|
@@ -13,14 +14,19 @@ import { | |
| notificationQueue, | ||
| sessionsQueue, | ||
| } from '@openpanel/queue'; | ||
| import { getRedisCache } from '@openpanel/redis'; | ||
| import express from 'express'; | ||
| import { BullBoardGroupMQAdapter } from 'groupmq'; | ||
| import client from 'prom-client'; | ||
| import sourceMapSupport from 'source-map-support'; | ||
| import { bootCron } from './boot-cron'; | ||
| import { bootWorkers } from './boot-workers'; | ||
| import { register } from './metrics'; | ||
| import { isShuttingDown } from './utils/graceful-shutdown'; | ||
| import { logger } from './utils/logger'; | ||
| import { getEventsHeartbeat } from './utils/worker-heartbeat'; | ||
|
|
||
| const EVENTS_HEARTBEAT_STALE_MS = 60_000; | ||
|
|
||
| sourceMapSupport.install(); | ||
|
|
||
|
|
@@ -69,8 +75,80 @@ async function start() { | |
| }); | ||
| }); | ||
|
|
||
| app.get('/healthcheck', (req, res) => { | ||
| res.json({ status: 'ok' }); | ||
| app.get('/healthcheck', async (req, res) => { | ||
| const [redisResult, dbResult, chResult] = await Promise.all([ | ||
| tryCatch(async () => (await getRedisCache().ping()) === 'PONG'), | ||
| tryCatch(async () => !!(await db.$executeRaw`SELECT 1`)), | ||
| tryCatch(async () => (await chQuery('SELECT 1')).length > 0), | ||
| ]); | ||
|
|
||
| const dependencies = { | ||
| redis: redisResult.ok && redisResult.data, | ||
| db: dbResult.ok && dbResult.data, | ||
| ch: chResult.ok && chResult.data, | ||
| }; | ||
| const dependencyErrors = { | ||
| redis: redisResult.error?.message, | ||
| db: dbResult.error?.message, | ||
| ch: chResult.error?.message, | ||
| }; | ||
|
|
||
| const failedDependencies = Object.entries(dependencies) | ||
| .filter(([, ok]) => !ok) | ||
| .map(([name]) => name); | ||
| const workingDependencies = Object.entries(dependencies) | ||
| .filter(([, ok]) => ok) | ||
| .map(([name]) => name); | ||
|
|
||
| const status = failedDependencies.length === 0 ? 200 : 503; | ||
|
|
||
| if (status !== 200) { | ||
| logger.warn('healthcheck failed', { | ||
| workingDependencies, | ||
| failedDependencies, | ||
| dependencies, | ||
| dependencyErrors, | ||
| }); | ||
| } | ||
|
|
||
| res.status(status).json({ | ||
| ready: status === 200, | ||
| ...dependencies, | ||
| failedDependencies, | ||
| workingDependencies, | ||
| }); | ||
| }); | ||
|
|
||
| // Kubernetes liveness — shallow, event loop only. | ||
| app.get('/healthz/live', (req, res) => { | ||
| res.status(200).json({ live: true }); | ||
| }); | ||
|
|
||
| // Kubernetes readiness — shallow + shutdown-aware. When events workers run | ||
| // on this instance, also require the events consumer-loop heartbeat to be | ||
| // fresh (refreshed on each `completed`/`drained` event). If events are not | ||
| // enabled here, the heartbeat check is skipped. | ||
| app.get('/healthz/ready', (req, res) => { | ||
| if (isShuttingDown()) { | ||
| res.status(503).json({ ready: false, reason: 'shutting down' }); | ||
| return; | ||
| } | ||
|
|
||
| const { enabled, lastActivityAt } = getEventsHeartbeat(); | ||
| if (enabled) { | ||
| const idleMs = Date.now() - lastActivityAt; | ||
| if (idleMs > EVENTS_HEARTBEAT_STALE_MS) { | ||
| res.status(503).json({ | ||
| ready: false, | ||
| reason: 'events consumer heartbeat stale', | ||
| idleMs, | ||
| thresholdMs: EVENTS_HEARTBEAT_STALE_MS, | ||
| }); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| res.status(200).json({ ready: true }); | ||
| }); | ||
|
Comment on lines
+131
to
152
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Startup race:
Consider flipping an explicit "booted" flag after 🤖 Prompt for AI Agents |
||
|
|
||
| app.listen(PORT, () => { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| let shuttingDown = false; | ||
|
|
||
| export function setShuttingDown(value: boolean) { | ||
| shuttingDown = value; | ||
| } | ||
|
|
||
| export function isShuttingDown() { | ||
| return shuttingDown; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| // Consumer-loop heartbeat, scoped to the events queue. | ||
| // | ||
| // Enabled only on instances that run events workers. Refreshed on each events | ||
| // worker's `completed` (job processed) or `drained` (poll returned empty), so | ||
| // a healthy consumer loop refreshes the timestamp every ~blockingTimeoutSec | ||
| // regardless of traffic. If enabled and the timestamp goes stale past the | ||
| // readiness threshold, the events consumer is wedged. | ||
|
|
||
| let enabled = false; | ||
| let lastActivityAt = Date.now(); | ||
|
|
||
| export function enableEventsHeartbeat() { | ||
| enabled = true; | ||
| lastActivityAt = Date.now(); | ||
| } | ||
|
|
||
| export function markEventsActivity() { | ||
| lastActivityAt = Date.now(); | ||
| } | ||
|
|
||
| export function getEventsHeartbeat() { | ||
| return { enabled, lastActivityAt }; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| export type TryCatchResult<T, E = Error> = | ||
| | { ok: true; data: T; error: null } | ||
| | { ok: false; data: null; error: E }; | ||
|
|
||
| export async function tryCatch<T, E = Error>( | ||
| input: (() => Promise<T>) | Promise<T>, | ||
| ): Promise<TryCatchResult<T, E>> { | ||
| try { | ||
| const promise = typeof input === 'function' ? input() : input; | ||
| const data = await promise; | ||
| return { ok: true, data, error: null }; | ||
| } catch (error) { | ||
| return { ok: false, data: null, error: error as E }; | ||
| } | ||
| } |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 49
🌐 Web query:
groupmq 2.0.0-next.4 Worker events completed drained💡 Result:
For “groupmq 2.0.0-next.4 Worker events completed drained” there isn’t enough evidence in the sources I found to answer what GroupMQ emits for “completed” and “drained” worker events in version 2.0.0-next.4 specifically. What I could verify: 1) GroupMQ is a Redis-backed per-group FIFO queue for Node/TypeScript, with a BullMQ-compatible API (site + README). It provides a Worker class and worker options (including concurrency), but the sources I found do not describe any “worker events” named “completed” or “drained”, nor do they document event payload semantics. 2) The only “completed/drained” event behavior I found is for BullMQ (different library). In BullMQ, there is a “drained” event and it has been discussed/changed regarding when it emits (e.g., only once when the worker is idle; and older versions emitting more often). This does not establish GroupMQ’s 2.0.0-next.4 behavior. 3) I located GroupMQ releases up to 2.0.0-next.1 (2026-03-12). I did not find a 2.0.0-next.4 release page/changelog entry in the searched results, so I couldn’t corroborate “2.0.0-next.4” content. Conclusion: I cannot confidently state what “completed” and “drained” worker events are in groupmq 2.0.0-next.4, or whether “drained” exists / how it fires. If you want, paste the exact code snippet where you attach the listeners (e.g., worker.on('completed'...) and worker.on('drained'...)) or share the groupmq 2.0.0-next.4 changelog / worker source file path, and I can re-run a targeted search against the exact implementation and produce a definitive, version-specific answer.
Citations:
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 408
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 1416
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 49
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 94
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 49
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 49
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 49
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 2333
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 1319
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 1407
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 385
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 49
🌐 Web query:
Openpanel-dev groupmq 2.0.0-next.4 Worker class events completed drained emit💡 Result:
For Openpanel-dev/groupmq, the “drained” and “completed” worker/job lifecycle events you mention are not found in the official GroupMQ docs/pages returned by the research. The only worker events explicitly documented in the Openpanel-dev/groupmq README/docs are: ready, completed, failed, error, closed, and graceful-timeout. Additionally, GroupMQ’s documented job completion flow includes moving a successfully processed job into a :completed retention sorted set and unlocking the group lock, but this is internal state management—not an exposed “drained” event. So, based on the sources located for this query, there is no evidence of a “Worker class events … drained … completed … emit” sequence in groupmq 2.0.0-next.4. What is evidenced: - Worker emits documented events including completed (and other events), and drained is not listed. - Job completion flow marks the job as completed in the queue’s :completed sorted set and unlocks the group. If you can share the exact file/snippet from groupmq 2.0.0-next.4 you’re looking at (or the expected event names), I can re-check precisely whether drained exists under a different name, is part of QueueEvents (not Worker), or was removed/changed in the 2.0.0-next.x line.
Citations:
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 49
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 193
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 49
🌐 Web query:
site:github.com/Openpanel-dev/groupmq Worker class emit events implementation💡 Result:
In libraries like GroupMQ (a BullMQ-inspired Redis job queue for Node.js/TypeScript), the Worker class emits events such as 'ready', 'completed', 'failed', 'error', 'closed', and 'graceful-timeout'. These are listened to using standard Node.js EventEmitter syntax: worker.on('event', handler). Example implementation from GroupMQ documentation: worker.on('ready', => { console.log('Worker is ready'); }); worker.on('completed', (job) => { console.log('Job completed:', job.id); }); worker.on('failed', (job) => { console.log('Job failed:', job.id, job.failedReason); }); worker.on('error', (error) => { console.error('Worker error:', error); }); worker.on('closed', => { console.log('Worker closed'); }); worker.on('graceful-timeout', (job) => { console.log('Job exceeded graceful timeout:', job.id); }); // Remove listeners worker.off('completed', handler); worker.removeAllListeners; The Worker class extends Node.js EventEmitter, allowing it to emit these lifecycle and job status events for monitoring and handling in queue processing systems.
Citations:
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 2182
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 2485
🌐 Web query:
site:github.com/Openpanel-dev/groupmq "drained" event Worker emit💡 Result:
No evidence found of a "drained" event emitted by a Worker in any job queue library, including BullMQ, groupmq, Bee-Queue, or others. Searches across official documentation and code repositories (primarily groupmq README and BullMQ-inspired libraries) show Worker events like 'ready', 'completed', 'failed', 'error', 'closed', and 'graceful-timeout', but no 'drained' event. The query may refer to a custom implementation, a different library, or a misremembered event name. In Bee-Queue, 'drained' exists but as a queue event when no jobs remain, not emitted by Worker.
Citations:
🏁 Script executed:
Repository: Openpanel-dev/openpanel
Length of output: 90
🌐 Web query:
site:github.com/Openpanel-dev/groupmq Worker.emit all events 2.0.0-next💡 Result:
In the groupmq library (a Redis-backed per-group FIFO queue for Node.js/TypeScript), Worker instances extend TypedEventEmitter and emit the following events: - 'ready': Emitted when the worker is ready. - 'completed': Emitted when a job completes, with the Job object. - 'failed': Emitted when a job fails, with the Job object and failedReason. - 'error': Emitted on worker errors, with the Error. - 'closed': Emitted when the worker closes. - 'graceful-timeout': Emitted when a job exceeds graceful timeout, with the Job object. No version 2.0.0-next or Cloudflare Workers context found matching the query. Latest groupmq version is v1.1.0 (Nov 2025), with next tags like 1.1.1-next.1. Use worker.on(event, handler) to listen and worker.off or worker.removeAllListeners to remove.
Citations:
Remove the `worker.on('drained')` listener — the event does not exist in groupmq 2.0.0-next.4.

The groupmq Worker class emits only: `'ready'`, `'completed'`, `'failed'`, `'error'`, `'closed'`, and `'graceful-timeout'`. There is no `'drained'` event. This means the heartbeat will only refresh when jobs complete; during idle periods (no jobs in queue), `markEventsActivity()` never fires, and the readiness probe will report 503 after 60 seconds.

Either implement a separate idle heartbeat mechanism (e.g., a periodic timer that calls `markEventsActivity()` at shorter intervals) or accept that the readiness probe will cycle to unhealthy during sustained idle periods.

🤖 Prompt for AI Agents