From 282f865e3f23ebcbc6bbca3658e0a353b21b9e2f Mon Sep 17 00:00:00 2001 From: Mouad Kommir Date: Fri, 13 Mar 2026 01:13:43 +0000 Subject: [PATCH 1/3] Fix self-hosted checker and workflow routing --- .env.docker-lightweight.example | 9 + .env.docker.example | 26 +- apps/checker/checker/update.go | 37 ++- apps/server/src/libs/checker/utils.ts | 15 ++ apps/server/src/routes/v1/check/http/post.ts | 27 ++- apps/workflows/src/cron/checker.ts | 237 +++++++++++++++++++ apps/workflows/src/cron/monitor.ts | 108 ++++++--- apps/workflows/src/env.ts | 4 + docker-compose.github-packages.yaml | 72 +++++- docker-compose.yaml | 94 ++++++++ packages/api/src/router/checker.ts | 47 +++- 11 files changed, 611 insertions(+), 65 deletions(-) diff --git a/.env.docker-lightweight.example b/.env.docker-lightweight.example index 532a76d51c..d3effacbc9 100644 --- a/.env.docker-lightweight.example +++ b/.env.docker-lightweight.example @@ -81,6 +81,15 @@ NODE_ENV=production # [REQUIRED] Public URL for the application NEXT_PUBLIC_URL=http://localhost:3002 +# Self-hosted checker and workflows routing. +UPSTASH_REDIS_REST_URL=http://redis-http:80 +UPSTASH_REDIS_REST_TOKEN=replace-with-a-long-random-secret +CHECKER_BASE_URL=http://checker:8080 +CHECKER_REGION=ams +WORKFLOWS_BASE_URL=http://workflows:3000 +OPENSTATUS_WORKFLOWS_URL=http://workflows:3000 +OPENSTATUS_INGEST_URL=http://server:3000 + # DEVELOPMENT & TESTING diff --git a/.env.docker.example b/.env.docker.example index 9dc891d854..2e1140ff18 100644 --- a/.env.docker.example +++ b/.env.docker.example @@ -16,9 +16,9 @@ DATABASE_AUTH_TOKEN= # REDIS & QUEUE # ============================================================================ -# Redis (optional) - for caching -UPSTASH_REDIS_REST_URL=http://localhost:6379 -UPSTASH_REDIS_REST_TOKEN=placeholder +# Self-hosted Redis REST shim used by workflows/dashboard/server. +UPSTASH_REDIS_REST_URL=http://redis-http:80 +UPSTASH_REDIS_REST_TOKEN=replace-with-a-long-random-secret # QStash (optional - for background jobs) QSTASH_CURRENT_SIGNING_KEY= @@ -62,15 +62,7 @@ AUTH_GITHUB_SECRET= AUTH_GOOGLE_ID= AUTH_GOOGLE_SECRET= -# GOOGLE CLOUD -# ============================================================================ -# Google Cloud Platform (optional - for scheduled tasks) -GCP_PROJECT_ID=your-value -GCP_LOCATION=your-value -GCP_CLIENT_EMAIL=your-value -GCP_PRIVATE_KEY=your-value - -# Cron secret for scheduled jobs +# Cron secret for scheduled jobs and checker callbacks. CRON_SECRET=your-random-cron-secret # API KEYS @@ -133,11 +125,17 @@ NODE_ENV=production # [REQUIRED] Public URL for the application NEXT_PUBLIC_URL=http://localhost:3002 +# Self-hosted checker and workflows routing. +CHECKER_BASE_URL=http://checker:8080 +CHECKER_REGION=ams +WORKFLOWS_BASE_URL=http://workflows:3000 + # Screenshot service (optional) SCREENSHOT_SERVICE_URL= -# External services -OPENSTATUS_INGEST_URL=https://openstatus-private-location.fly.dev +# Private-location and checker callbacks stay on the internal Docker network. +OPENSTATUS_WORKFLOWS_URL=http://workflows:3000 +OPENSTATUS_INGEST_URL=http://server:3000 # DEVELOPMENT & TESTING # ============================================================================ diff --git a/apps/checker/checker/update.go b/apps/checker/checker/update.go index f3adbe19bd..86af351070 100644 --- a/apps/checker/checker/update.go +++ b/apps/checker/checker/update.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "os" "strings" @@ -27,10 +28,42 @@ type UpdateData struct { } func UpdateStatus(ctx context.Context, updateData UpdateData) error { - - url := "https://openstatus-workflows.fly.dev/updateStatus" + url := os.Getenv("OPENSTATUS_WORKFLOWS_URL") + if url == "" { + url = "https://openstatus-workflows.fly.dev" + } + url = strings.TrimRight(url, "/") + "/updateStatus" basic := "Basic " + os.Getenv("CRON_SECRET") payloadBuf := new(bytes.Buffer) + + if os.Getenv("SELF_HOST") == "true" || os.Getenv("OPENSTATUS_WORKFLOWS_URL") != "" { + if err := json.NewEncoder(payloadBuf).Encode(updateData); err != nil { + log.Ctx(ctx).Error().Err(err).Msg("error while encoding update payload") + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, payloadBuf) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msg("error while creating update request") + return err + } + req.Header.Set("Authorization", basic) + req.Header.Set("Content-Type", "application/json") + + res, err := http.DefaultClient.Do(req) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msg("error while posting update status directly") + return err + } + defer res.Body.Close() + + if res.StatusCode < 200 || res.StatusCode >= 300 { + return fmt.Errorf("direct updateStatus failed with status %d", res.StatusCode) + } + + return nil + } + c := os.Getenv("GCP_PRIVATE_KEY") c = strings.ReplaceAll(c, "\\n", "\n") opts := &auth.Options2LO{ diff --git a/apps/server/src/libs/checker/utils.ts b/apps/server/src/libs/checker/utils.ts index ecff4b3a16..441a710925 100644 --- a/apps/server/src/libs/checker/utils.ts +++ b/apps/server/src/libs/checker/utils.ts @@ -7,6 +7,17 @@ import { transformHeaders, } from "@openstatus/utils"; +function isSelfHost() { + return process.env.SELF_HOST === "true"; +} + +function getCheckerBaseUrl() { + return (process.env.CHECKER_BASE_URL || "http://checker:8080").replace( + /\/$/, + "", + ); +} + export function getCheckerPayload( monitor: z.infer, status: z.infer["status"], @@ -72,6 +83,10 @@ export function getCheckerUrl( data: false, }, ): string { + if (isSelfHost()) { + return `${getCheckerBaseUrl()}/checker/${monitor.jobType}?monitor_id=${monitor.id}&trigger=${opts.trigger}&data=${opts.data}`; + } + switch (monitor.jobType) { case "http": return `https://openstatus-checker.fly.dev/checker/http?monitor_id=${monitor.id}&trigger=${opts.trigger}&data=${opts.data}`; diff --git a/apps/server/src/routes/v1/check/http/post.ts b/apps/server/src/routes/v1/check/http/post.ts index 2d6e0b685a..d383f8b779 100644 --- a/apps/server/src/routes/v1/check/http/post.ts +++ b/apps/server/src/routes/v1/check/http/post.ts @@ -17,6 +17,25 @@ import { ResponseSchema, } from "./schema"; +function isSelfHost() { + return process.env.SELF_HOST === "true"; +} + +function getCheckerBaseUrl() { + return (process.env.CHECKER_BASE_URL || "http://checker:8080").replace( + /\/$/, + "", + ); +} + +function getCheckerRegion(region: string) { + if (!isSelfHost()) { + return region; + } + + return process.env.CHECKER_REGION || "ams"; +} + const postRoute = createRoute({ method: "post", tags: ["check"], @@ -69,11 +88,15 @@ export function registerHTTPPostCheck(api: typeof checkApi) { for (let count = 0; count < input.runCount; count++) { const currentFetch = []; for (const region of input.regions) { - const r = fetch(`https://openstatus-checker.fly.dev/ping/${region}`, { + const targetRegion = getCheckerRegion(region); + const targetUrl = isSelfHost() + ? `${getCheckerBaseUrl()}/ping/${targetRegion}` + : `https://openstatus-checker.fly.dev/ping/${targetRegion}`; + const r = fetch(targetUrl, { headers: { Authorization: `Basic ${env.CRON_SECRET}`, "Content-Type": "application/json", - "fly-prefer-region": region, + ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), }, method: "POST", body: JSON.stringify({ diff --git a/apps/workflows/src/cron/checker.ts b/apps/workflows/src/cron/checker.ts index 3b253d642a..102d6149f1 100644 --- a/apps/workflows/src/cron/checker.ts +++ b/apps/workflows/src/cron/checker.ts @@ -37,6 +37,23 @@ export const isAuthorizedDomain = (url: string) => { const logger = getLogger("workflow"); +function isSelfHost() { + return env().SELF_HOST === "true"; +} + +function hasCloudTaskConfig() { + return Boolean( + env().GCP_PROJECT_ID.trim() && + env().GCP_CLIENT_EMAIL.trim() && + env().GCP_PRIVATE_KEY.trim() && + env().GCP_LOCATION.trim(), + ); +} + +function getCheckerBaseUrl() { + return env().CHECKER_BASE_URL.replace(/\/$/, ""); +} + const channelOptions = { // Conservative 5-minute keepalive (gRPC best practice) "grpc.keepalive_time_ms": 300000, @@ -50,6 +67,11 @@ export async function sendCheckerTasks( periodicity: z.infer, c: Context, ) { + if (isSelfHost() || !hasCloudTaskConfig()) { + await sendCheckerTasksDirect(periodicity, c); + return; + } + const client = new CloudTasksClient({ fallback: "rest", channelOptions, @@ -194,6 +216,221 @@ export async function sendCheckerTasks( ); } } + +async function sendCheckerTasksDirect( + periodicity: z.infer, + c: Context, +) { + const timestamp = Date.now(); + const selfHostRegion = env().CHECKER_REGION as Region; + + const currentMaintenance = db + .select({ id: maintenance.id }) + .from(maintenance) + .where( + and(lte(maintenance.from, new Date()), gte(maintenance.to, new Date())), + ) + .as("currentMaintenance"); + + const currentMaintenanceMonitors = db + .select({ id: pageComponent.monitorId }) + .from(maintenancesToPageComponents) + .innerJoin( + currentMaintenance, + eq(maintenancesToPageComponents.maintenanceId, currentMaintenance.id), + ) + .innerJoin( + pageComponent, + eq(maintenancesToPageComponents.pageComponentId, pageComponent.id), + ) + .where(isNotNull(pageComponent.monitorId)); + + const result = await db + .select() + .from(monitor) + .where( + and( + eq(monitor.periodicity, periodicity), + eq(monitor.active, true), + notInArray(monitor.id, currentMaintenanceMonitors), + ), + ) + .all(); + + logger.info("Starting direct self-host checker run", { + periodicity, + monitor_count: result.length, + }); + + const monitors = z.array(selectMonitorSchema).safeParse(result); + const allResult = []; + if (!monitors.success) { + logger.error(`Error while fetching the monitors ${monitors.error}`); + throw new Error("Error while fetching the monitors"); + } + + for (const row of monitors.data) { + const result = await db + .select() + .from(monitorStatusTable) + .where(eq(monitorStatusTable.monitorId, row.id)) + .all(); + const monitorStatus = z.array(selectMonitorStatusSchema).safeParse(result); + if (!monitorStatus.success) { + logger.error("Failed to parse monitor status", { + monitor_id: row.id, + error_message: monitorStatus.error.message, + }); + continue; + } + + for (const region of [selfHostRegion]) { + const status = + monitorStatus.data.find((m) => region === m.region)?.status || "active"; + allResult.push( + dispatchCheckerTaskDirect({ + row, + timestamp, + status, + region, + }), + ); + } + } + + if (periodicity === "30s") { + logger.warn( + "Self-host direct checker mode does not schedule the delayed second 30s task. Use 1m+ periodicities for reliable self-host operation.", + ); + } + + const allRequests = await Promise.allSettled(allResult); + + const success = allRequests.filter((r) => r.status === "fulfilled").length; + const failed = allRequests.filter((r) => r.status === "rejected").length; + + logger.info("Completed direct self-host checker run", { + periodicity, + total_tasks: allResult.length, + success_count: success, + failed_count: failed, + }); + if (failed > 0) { + getSentry(c).captureMessage( + `direct sendCheckerTasks for ${periodicity} ended with ${failed} failed tasks`, + "error", + ); + } +} + +async function dispatchCheckerTaskDirect({ + row, + timestamp, + status, + region, +}: { + row: z.infer; + timestamp: number; + status: MonitorStatus; + region: Region; +}) { + let payload: + | z.infer + | z.infer + | z.infer + | null = null; + + if (row.jobType === "http") { + payload = { + workspaceId: String(row.workspaceId), + monitorId: String(row.id), + url: row.url, + method: row.method || "GET", + cronTimestamp: timestamp, + body: row.body, + headers: row.headers, + status: status, + assertions: row.assertions ? JSON.parse(row.assertions) : null, + degradedAfter: row.degradedAfter, + timeout: row.timeout, + trigger: "cron", + otelConfig: row.otelEndpoint + ? { + endpoint: row.otelEndpoint, + headers: transformHeaders(row.otelHeaders), + } + : undefined, + retry: row.retry || 3, + followRedirects: + row.followRedirects === null ? true : row.followRedirects, + }; + } + if (row.jobType === "tcp") { + payload = { + workspaceId: String(row.workspaceId), + monitorId: String(row.id), + uri: row.url, + status: status, + assertions: row.assertions ? JSON.parse(row.assertions) : null, + cronTimestamp: timestamp, + degradedAfter: row.degradedAfter, + timeout: row.timeout, + trigger: "cron", + retry: row.retry || 3, + otelConfig: row.otelEndpoint + ? { + endpoint: row.otelEndpoint, + headers: transformHeaders(row.otelHeaders), + } + : undefined, + }; + } + if (row.jobType === "dns") { + payload = { + workspaceId: String(row.workspaceId), + monitorId: String(row.id), + uri: row.url, + cronTimestamp: timestamp, + status: status, + assertions: row.assertions ? JSON.parse(row.assertions) : null, + degradedAfter: row.degradedAfter, + timeout: row.timeout, + trigger: "cron", + otelConfig: row.otelEndpoint + ? { + endpoint: row.otelEndpoint, + headers: transformHeaders(row.otelHeaders), + } + : undefined, + retry: row.retry || 3, + }; + } + + if (!payload) { + throw new Error("Invalid jobType"); + } + + const response = await fetch( + `${getCheckerBaseUrl()}/checker/${row.jobType}?monitor_id=${row.id}`, + { + method: "POST", + headers: { + Authorization: `Basic ${env().CRON_SECRET}`, + "Content-Type": "application/json", + }, + body: JSON.stringify(payload), + }, + ); + + if (!response.ok) { + const body = await response.text(); + throw new Error( + `direct checker request failed for monitor ${row.id} (${row.jobType}) with status ${response.status}: ${body}`, + ); + } + + return response; +} // timestamp needs to be in ms const createCronTask = async ({ row, diff --git a/apps/workflows/src/cron/monitor.ts b/apps/workflows/src/cron/monitor.ts index 74964a225c..f8f2e3bd6c 100644 --- a/apps/workflows/src/cron/monitor.ts +++ b/apps/workflows/src/cron/monitor.ts @@ -24,24 +24,50 @@ import { env } from "../env"; const redis = Redis.fromEnv(); -const client = new CloudTasksClient({ - projectId: env().GCP_PROJECT_ID, - fallback: "rest", - credentials: { - client_email: env().GCP_CLIENT_EMAIL, - private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"), - }, -}); - -const parent = client.queuePath( - env().GCP_PROJECT_ID, - env().GCP_LOCATION, - "workflow", -); - const limiter = new RateLimiter({ tokensPerInterval: 15, interval: "second" }); +type CloudTaskContext = { + client: CloudTasksClient; + parent: string; +}; + +function getCloudTaskContext(queue: string): CloudTaskContext | null { + const currentEnv = env(); + const hasCloudTaskConfig = Boolean( + currentEnv.GCP_PROJECT_ID.trim() && + currentEnv.GCP_CLIENT_EMAIL.trim() && + currentEnv.GCP_PRIVATE_KEY.trim() && + currentEnv.GCP_LOCATION.trim(), + ); + + if (currentEnv.SELF_HOST === "true" || !hasCloudTaskConfig) { + return null; + } + + const client = new CloudTasksClient({ + projectId: currentEnv.GCP_PROJECT_ID, + fallback: "rest", + credentials: { + client_email: currentEnv.GCP_CLIENT_EMAIL, + private_key: currentEnv.GCP_PRIVATE_KEY.replaceAll("\\n", "\n"), + }, + }); + + return { + client, + parent: client.queuePath(currentEnv.GCP_PROJECT_ID, currentEnv.GCP_LOCATION, queue), + }; +} + export async function LaunchMonitorWorkflow() { + const cloudTaskContext = getCloudTaskContext("workflow"); + if (!cloudTaskContext) { + console.log( + "Skipping monitor lifecycle workflow: Cloud Tasks are unavailable in self-host mode.", + ); + return; + } + // Expires is one month after last connection, so if we want to reach people who connected 3 months ago we need to check for people with expires 2 months ago const twoMonthAgo = new Date().setMonth(new Date().getMonth() - 2); @@ -130,7 +156,7 @@ export async function LaunchMonitorWorkflow() { for (const user of users) { await limiter.removeTokens(1); - const workflow = workflowInit({ user }); + const workflow = workflowInit({ user, cloudTaskContext }); allResult.push(workflow); } @@ -146,12 +172,14 @@ export async function LaunchMonitorWorkflow() { async function workflowInit({ user, + cloudTaskContext, }: { user: { userId: number; email: string | null; workspaceId: number; }; + cloudTaskContext: CloudTaskContext; }) { console.log(`Starting workflow for ${user.userId}`); // Let's check if the user is in the workflow @@ -174,8 +202,7 @@ async function workflowInit({ return; } await CreateTask({ - parent, - client: client, + cloudTaskContext, step: "14days", userId: user.userId, initialRun: new Date().getTime(), @@ -187,6 +214,7 @@ async function workflowInit({ } export async function Step14Days(userId: number, workFlowRunTimestamp: number) { + const cloudTaskContext = getCloudTaskContext("workflow"); const user = await getUser(userId); // Send email saying we are going to pause the monitors @@ -209,17 +237,19 @@ export async function Step14Days(userId: number, workFlowRunTimestamp: number) { }, ]); - await CreateTask({ - parent, - client: client, - step: "3days", - userId: user.id, - initialRun: workFlowRunTimestamp, - }); + if (cloudTaskContext) { + await CreateTask({ + cloudTaskContext, + step: "3days", + userId: user.id, + initialRun: workFlowRunTimestamp, + }); + } } } export async function Step3Days(userId: number, workFlowRunTimestamp: number) { + const cloudTaskContext = getCloudTaskContext("workflow"); // check if user has connected const hasConnected = await hasUserLoggedIn({ userId, @@ -253,13 +283,14 @@ export async function Step3Days(userId: number, workFlowRunTimestamp: number) { // Send second email //TODO: Send email // Let's schedule the next task - await CreateTask({ - client, - parent, - step: "paused", - userId, - initialRun: workFlowRunTimestamp, - }); + if (cloudTaskContext) { + await CreateTask({ + cloudTaskContext, + step: "paused", + userId, + initialRun: workFlowRunTimestamp, + }); + } } export async function StepPaused(userId: number, workFlowRunTimestamp: number) { @@ -347,19 +378,18 @@ async function hasUserLoggedIn({ } function CreateTask({ - parent, - client, + cloudTaskContext, step, userId, initialRun, }: { - parent: string; - client: CloudTasksClient; + cloudTaskContext: CloudTaskContext; step: z.infer; userId: number; initialRun: number; }) { - const url = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`; + const workflowsBaseUrl = env().WORKFLOWS_BASE_URL.replace(/\/$/, ""); + const url = `${workflowsBaseUrl}/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`; const timestamp = getScheduledTime(step); const newTask: google.cloud.tasks.v2beta3.ITask = { httpRequest: { @@ -375,8 +405,8 @@ function CreateTask({ }, }; - const request = { parent: parent, task: newTask }; - return client.createTask(request); + const request = { parent: cloudTaskContext.parent, task: newTask }; + return cloudTaskContext.client.createTask(request); } function getScheduledTime(step: z.infer) { diff --git a/apps/workflows/src/env.ts b/apps/workflows/src/env.ts index fe6fd7120d..67e131979f 100644 --- a/apps/workflows/src/env.ts +++ b/apps/workflows/src/env.ts @@ -5,12 +5,16 @@ export const env = () => .object({ NODE_ENV: z.string().prefault("development"), PORT: z.coerce.number().prefault(3000), + SELF_HOST: z.enum(["true", "false"]).prefault("false"), GCP_PROJECT_ID: z.string().prefault(""), GCP_CLIENT_EMAIL: z.string().prefault(""), GCP_PRIVATE_KEY: z.string().prefault(""), GCP_LOCATION: z.string().prefault("europe-west1"), CRON_SECRET: z.string().prefault(""), SITE_URL: z.string().prefault("http://localhost:3000"), + WORKFLOWS_BASE_URL: z.string().prefault("http://workflows:3000"), + CHECKER_BASE_URL: z.string().prefault("http://checker:8080"), + CHECKER_REGION: z.string().prefault("ams"), DATABASE_URL: z.string().prefault("http://localhost:8080"), DATABASE_AUTH_TOKEN: z.string().prefault(""), RESEND_API_KEY: z.string().prefault(""), diff --git a/docker-compose.github-packages.yaml b/docker-compose.github-packages.yaml index a1c8654992..9c59d95239 100644 --- a/docker-compose.github-packages.yaml +++ b/docker-compose.github-packages.yaml @@ -6,6 +6,8 @@ networks: volumes: libsql-data: name: openstatus-libsql-data + redis-data: + name: openstatus-redis-data workflows-data: name: openstatus-workflows-data @@ -53,6 +55,39 @@ services: start_period: 20s restart: unless-stopped + redis: + container_name: openstatus-redis + image: redis/redis-stack-server:6.2.6-v6 + networks: + - openstatus + ports: + - "6379:6379" + volumes: + - redis-data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + + redis-http: + container_name: openstatus-redis-http + image: hiett/serverless-redis-http:latest + networks: + - openstatus + ports: + - "8079:80" + environment: + - SRH_MODE=env + - SRH_TOKEN=${UPSTASH_REDIS_REST_TOKEN:-replace-with-a-long-random-secret} + - SRH_CONNECTION_STRING=redis://redis:6379 + depends_on: + redis: + condition: service_healthy + restart: unless-stopped + # Internal Services - Using GitHub Packages workflows: container_name: openstatus-workflows @@ -68,9 +103,18 @@ services: environment: - DATABASE_URL=http://libsql:8080 - PORT=3000 + - SELF_HOST=true + - SITE_URL=${NEXT_PUBLIC_URL:-http://localhost:3002} + - CHECKER_BASE_URL=${CHECKER_BASE_URL:-http://checker:8080} + - CHECKER_REGION=${CHECKER_REGION:-ams} + - WORKFLOWS_BASE_URL=${WORKFLOWS_BASE_URL:-http://workflows:3000} + - UPSTASH_REDIS_REST_URL=${UPSTASH_REDIS_REST_URL:-http://redis-http:80} + - UPSTASH_REDIS_REST_TOKEN=${UPSTASH_REDIS_REST_TOKEN:-replace-with-a-long-random-secret} depends_on: libsql: condition: service_healthy + redis-http: + condition: service_started healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:3000/ping || exit 1"] interval: 15s @@ -91,11 +135,17 @@ services: environment: - DATABASE_URL=http://libsql:8080 - PORT=3000 + - SELF_HOST=true + - CHECKER_BASE_URL=${CHECKER_BASE_URL:-http://checker:8080} + - CHECKER_REGION=${CHECKER_REGION:-ams} + - CRON_SECRET=${CRON_SECRET} depends_on: workflows: condition: service_healthy libsql: condition: service_healthy + checker: + condition: service_healthy healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:3000/ping || exit 1"] interval: 15s @@ -116,6 +166,7 @@ services: environment: - DB_URL=http://libsql:8080 - TINYBIRD_URL=http://tinybird-local:7181 + - OPENSTATUS_INGEST_URL=${OPENSTATUS_INGEST_URL:-http://server:3000} - GIN_MODE=release - PORT=8080 depends_on: @@ -146,10 +197,17 @@ services: env_file: - .env.docker environment: - - DATABASE_URL=http://libsql:8080 - PORT=8080 + - SELF_HOST=true + - CRON_SECRET=${CRON_SECRET} + - OPENSTATUS_WORKFLOWS_URL=${OPENSTATUS_WORKFLOWS_URL:-http://workflows:3000} + - TINYBIRD_TOKEN=${TINY_BIRD_API_KEY} + - CLOUD_PROVIDER=fly + - FLY_REGION=${CHECKER_REGION:-ams} depends_on: - server: + workflows: + condition: service_healthy + tinybird-local: condition: service_healthy healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"] @@ -173,6 +231,11 @@ services: - PORT=3000 - HOSTNAME=0.0.0.0 - AUTH_TRUST_HOST=true + - SELF_HOST=true + - CHECKER_BASE_URL=${CHECKER_BASE_URL:-http://checker:8080} + - CHECKER_REGION=${CHECKER_REGION:-ams} + - UPSTASH_REDIS_REST_URL=${UPSTASH_REDIS_REST_URL:-http://redis-http:80} + - UPSTASH_REDIS_REST_TOKEN=${UPSTASH_REDIS_REST_TOKEN:-replace-with-a-long-random-secret} depends_on: workflows: condition: service_healthy @@ -180,6 +243,8 @@ services: condition: service_healthy server: condition: service_healthy + checker: + condition: service_healthy healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:3000/ || exit 1"] interval: 15s @@ -202,6 +267,9 @@ services: - PORT=3000 - HOSTNAME=0.0.0.0 - AUTH_TRUST_HOST=true + - SELF_HOST=true + - UPSTASH_REDIS_REST_URL=${UPSTASH_REDIS_REST_URL:-http://redis-http:80} + - UPSTASH_REDIS_REST_TOKEN=${UPSTASH_REDIS_REST_TOKEN:-replace-with-a-long-random-secret} depends_on: workflows: condition: service_healthy diff --git a/docker-compose.yaml b/docker-compose.yaml index d409daae5f..b3eb2e1d81 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -6,6 +6,8 @@ networks: volumes: libsql-data: name: openstatus-libsql-data + redis-data: + name: openstatus-redis-data workflows-data: name: openstatus-workflows-data @@ -53,6 +55,39 @@ services: start_period: 20s restart: unless-stopped + redis: + container_name: openstatus-redis + image: redis/redis-stack-server:6.2.6-v6 + networks: + - openstatus + ports: + - "6379:6379" + volumes: + - redis-data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 10s + restart: unless-stopped + + redis-http: + container_name: openstatus-redis-http + image: hiett/serverless-redis-http:latest + networks: + - openstatus + ports: + - "8079:80" + environment: + - SRH_MODE=env + - SRH_TOKEN=${UPSTASH_REDIS_REST_TOKEN:-replace-with-a-long-random-secret} + - SRH_CONNECTION_STRING=redis://redis:6379 + depends_on: + redis: + condition: service_healthy + restart: unless-stopped + # Internal Services workflows: container_name: openstatus-workflows @@ -71,9 +106,18 @@ services: environment: - DATABASE_URL=http://libsql:8080 - PORT=3000 + - SELF_HOST=true + - SITE_URL=${NEXT_PUBLIC_URL:-http://localhost:3002} + - CHECKER_BASE_URL=${CHECKER_BASE_URL:-http://checker:8080} + - CHECKER_REGION=${CHECKER_REGION:-ams} + - WORKFLOWS_BASE_URL=${WORKFLOWS_BASE_URL:-http://workflows:3000} + - UPSTASH_REDIS_REST_URL=${UPSTASH_REDIS_REST_URL:-http://redis-http:80} + - UPSTASH_REDIS_REST_TOKEN=${UPSTASH_REDIS_REST_TOKEN:-replace-with-a-long-random-secret} depends_on: libsql: condition: service_healthy + redis-http: + condition: service_started healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:3000/ping || exit 1"] interval: 15s @@ -97,11 +141,17 @@ services: environment: - DATABASE_URL=http://libsql:8080 - PORT=3000 + - SELF_HOST=true + - CHECKER_BASE_URL=${CHECKER_BASE_URL:-http://checker:8080} + - CHECKER_REGION=${CHECKER_REGION:-ams} + - CRON_SECRET=${CRON_SECRET} depends_on: workflows: condition: service_healthy libsql: condition: service_healthy + checker: + condition: service_healthy healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:3000/ping || exit 1"] interval: 15s @@ -125,6 +175,7 @@ services: environment: - DB_URL=http://libsql:8080 - TINYBIRD_URL=http://tinybird-local:7181 + - OPENSTATUS_INGEST_URL=${OPENSTATUS_INGEST_URL:-http://server:3000} - GIN_MODE=release - PORT=8080 depends_on: @@ -145,6 +196,39 @@ services: start_period: 30s restart: unless-stopped + checker: + container_name: openstatus-checker + build: + context: apps/checker + dockerfile: Dockerfile + image: openstatus/checker:latest + networks: + - openstatus + ports: + - "8082:8080" + env_file: + - .env.docker + environment: + - PORT=8080 + - SELF_HOST=true + - CRON_SECRET=${CRON_SECRET} + - OPENSTATUS_WORKFLOWS_URL=${OPENSTATUS_WORKFLOWS_URL:-http://workflows:3000} + - TINYBIRD_TOKEN=${TINY_BIRD_API_KEY} + - CLOUD_PROVIDER=fly + - FLY_REGION=${CHECKER_REGION:-ams} + depends_on: + workflows: + condition: service_healthy + tinybird-local: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"] + interval: 15s + timeout: 10s + retries: 3 + start_period: 30s + restart: unless-stopped + dashboard: container_name: openstatus-dashboard build: @@ -162,6 +246,11 @@ services: - PORT=3000 - HOSTNAME=0.0.0.0 - AUTH_TRUST_HOST=true + - SELF_HOST=true + - CHECKER_BASE_URL=${CHECKER_BASE_URL:-http://checker:8080} + - CHECKER_REGION=${CHECKER_REGION:-ams} + - UPSTASH_REDIS_REST_URL=${UPSTASH_REDIS_REST_URL:-http://redis-http:80} + - UPSTASH_REDIS_REST_TOKEN=${UPSTASH_REDIS_REST_TOKEN:-replace-with-a-long-random-secret} depends_on: workflows: condition: service_healthy @@ -169,6 +258,8 @@ services: condition: service_healthy server: condition: service_healthy + checker: + condition: service_healthy healthcheck: test: ["CMD-SHELL", "curl -f http://localhost:3000/ || exit 1"] interval: 15s @@ -194,6 +285,9 @@ services: - PORT=3000 - HOSTNAME=0.0.0.0 - AUTH_TRUST_HOST=true + - SELF_HOST=true + - UPSTASH_REDIS_REST_URL=${UPSTASH_REDIS_REST_URL:-http://redis-http:80} + - UPSTASH_REDIS_REST_TOKEN=${UPSTASH_REDIS_REST_TOKEN:-replace-with-a-long-random-secret} depends_on: workflows: condition: service_healthy diff --git a/packages/api/src/router/checker.ts b/packages/api/src/router/checker.ts index 1f20ba22cb..691dda5a3b 100644 --- a/packages/api/src/router/checker.ts +++ b/packages/api/src/router/checker.ts @@ -24,6 +24,25 @@ import { createTRPCRouter, protectedProcedure } from "../trpc"; const ABORT_TIMEOUT = 10000; +function isSelfHost() { + return process.env.SELF_HOST === "true"; +} + +function getCheckerBaseUrl() { + return (process.env.CHECKER_BASE_URL || "http://checker:8080").replace( + /\/$/, + "", + ); +} + +function getCheckerRegion(region: string) { + if (!isSelfHost()) { + return region; + } + + return process.env.CHECKER_REGION || "ams"; +} + // Input schemas const httpTestInput = z.object({ url: z.url(), @@ -158,14 +177,18 @@ export async function testHttp(input: z.infer) { } try { + const targetRegion = getCheckerRegion(input.region); + const targetUrl = isSelfHost() + ? `${getCheckerBaseUrl()}/ping/${targetRegion}` + : `https://openstatus-checker.fly.dev/ping/${targetRegion}`; const res = await fetch( - `https://openstatus-checker.fly.dev/ping/${input.region}`, + targetUrl, { method: "POST", headers: { Authorization: `Basic ${env.CRON_SECRET}`, "Content-Type": "application/json", - "fly-prefer-region": input.region, + ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), }, body: JSON.stringify({ url: input.url, @@ -250,14 +273,18 @@ export async function testHttp(input: z.infer) { export async function testTcp(input: z.infer) { try { + const targetRegion = getCheckerRegion(input.region); + const targetUrl = isSelfHost() + ? `${getCheckerBaseUrl()}/tcp/${targetRegion}` + : `https://openstatus-checker.fly.dev/tcp/${targetRegion}`; const res = await fetch( - `https://openstatus-checker.fly.dev/tcp/${input.region}`, + targetUrl, { method: "POST", headers: { Authorization: `Basic ${env.CRON_SECRET}`, "Content-Type": "application/json", - "fly-prefer-region": input.region, + ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), }, body: JSON.stringify({ uri: input.url }), signal: AbortSignal.timeout(ABORT_TIMEOUT), @@ -301,14 +328,18 @@ export async function testTcp(input: z.infer) { export async function testDns(input: z.infer) { try { + const targetRegion = getCheckerRegion(input.region); + const targetUrl = isSelfHost() + ? `${getCheckerBaseUrl()}/dns/${targetRegion}` + : `https://openstatus-checker.fly.dev/dns/${targetRegion}`; const res = await fetch( - `https://openstatus-checker.fly.dev/dns/${input.region}`, + targetUrl, { method: "POST", headers: { Authorization: `Basic ${env.CRON_SECRET}`, "Content-Type": "application/json", - "fly-prefer-region": input.region, + ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), }, body: JSON.stringify({ uri: input.url, @@ -469,6 +500,10 @@ export async function triggerChecker( } function generateUrl({ row }: { row: z.infer }) { + if (isSelfHost()) { + return `${getCheckerBaseUrl()}/checker/${row.jobType}?monitor_id=${row.id}`; + } + switch (row.jobType) { case "http": return `https://openstatus-checker.fly.dev/checker/http?monitor_id=${row.id}`; From b63362c3551c7d7017626449eb7ca17dbeaca921 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 13 Mar 2026 06:59:25 +0000 Subject: [PATCH 2/3] ci: apply automated fixes --- apps/workflows/src/cron/monitor.ts | 6 +- packages/api/src/router/checker.ts | 89 ++++++++++++++---------------- 2 files changed, 45 insertions(+), 50 deletions(-) diff --git a/apps/workflows/src/cron/monitor.ts b/apps/workflows/src/cron/monitor.ts index f8f2e3bd6c..ac784d4e22 100644 --- a/apps/workflows/src/cron/monitor.ts +++ b/apps/workflows/src/cron/monitor.ts @@ -55,7 +55,11 @@ function getCloudTaskContext(queue: string): CloudTaskContext | null { return { client, - parent: client.queuePath(currentEnv.GCP_PROJECT_ID, currentEnv.GCP_LOCATION, queue), + parent: client.queuePath( + currentEnv.GCP_PROJECT_ID, + currentEnv.GCP_LOCATION, + queue, + ), }; } diff --git a/packages/api/src/router/checker.ts b/packages/api/src/router/checker.ts index 691dda5a3b..8e1d7ced93 100644 --- a/packages/api/src/router/checker.ts +++ b/packages/api/src/router/checker.ts @@ -181,30 +181,27 @@ export async function testHttp(input: z.infer) { const targetUrl = isSelfHost() ? `${getCheckerBaseUrl()}/ping/${targetRegion}` : `https://openstatus-checker.fly.dev/ping/${targetRegion}`; - const res = await fetch( - targetUrl, - { - method: "POST", - headers: { - Authorization: `Basic ${env.CRON_SECRET}`, - "Content-Type": "application/json", - ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), - }, - body: JSON.stringify({ - url: input.url, - method: input.method, - headers: input.headers?.reduce( - (acc, { key, value }) => { - if (!key) return acc; - return { ...acc, [key]: value }; - }, - {} as Record, - ), - body: input.body, - }), - signal: AbortSignal.timeout(ABORT_TIMEOUT), + const res = await fetch(targetUrl, { + method: "POST", + headers: { + Authorization: `Basic ${env.CRON_SECRET}`, + "Content-Type": "application/json", + ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), }, - ); + body: JSON.stringify({ + url: input.url, + method: input.method, + headers: input.headers?.reduce( + (acc, { key, value }) => { + if (!key) return acc; + return { ...acc, [key]: value }; + }, + {} as Record, + ), + body: input.body, + }), + signal: AbortSignal.timeout(ABORT_TIMEOUT), + }); const json = await res.json(); const result = httpOutput.safeParse(json); @@ -277,19 +274,16 @@ export async function testTcp(input: z.infer) { const targetUrl = isSelfHost() ? `${getCheckerBaseUrl()}/tcp/${targetRegion}` : `https://openstatus-checker.fly.dev/tcp/${targetRegion}`; - const res = await fetch( - targetUrl, - { - method: "POST", - headers: { - Authorization: `Basic ${env.CRON_SECRET}`, - "Content-Type": "application/json", - ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), - }, - body: JSON.stringify({ uri: input.url }), - signal: AbortSignal.timeout(ABORT_TIMEOUT), + const res = await fetch(targetUrl, { + method: "POST", + headers: { + Authorization: `Basic ${env.CRON_SECRET}`, + "Content-Type": "application/json", + ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), }, - ); + body: JSON.stringify({ uri: input.url }), + signal: AbortSignal.timeout(ABORT_TIMEOUT), + }); const json = await res.json(); const result = tcpOutput.safeParse(json); @@ -332,21 +326,18 @@ export async function testDns(input: z.infer) { const targetUrl = isSelfHost() ? `${getCheckerBaseUrl()}/dns/${targetRegion}` : `https://openstatus-checker.fly.dev/dns/${targetRegion}`; - const res = await fetch( - targetUrl, - { - method: "POST", - headers: { - Authorization: `Basic ${env.CRON_SECRET}`, - "Content-Type": "application/json", - ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), - }, - body: JSON.stringify({ - uri: input.url, - }), - signal: AbortSignal.timeout(ABORT_TIMEOUT), + const res = await fetch(targetUrl, { + method: "POST", + headers: { + Authorization: `Basic ${env.CRON_SECRET}`, + "Content-Type": "application/json", + ...(isSelfHost() ? {} : { "fly-prefer-region": targetRegion }), }, - ); + body: JSON.stringify({ + uri: input.url, + }), + signal: AbortSignal.timeout(ABORT_TIMEOUT), + }); const json = await res.json(); const result = dnsOutput.safeParse(json); From 52ee9dde158b97bbe22a745c70ead4b82ab2cae6 Mon Sep 17 00:00:00 2001 From: Mouad Kommir Date: Fri, 13 Mar 2026 13:46:47 +0000 Subject: [PATCH 3/3] Add a concurrency limit to checker requests to keep throughput predictable. Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- apps/workflows/src/cron/checker.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/apps/workflows/src/cron/checker.ts b/apps/workflows/src/cron/checker.ts index 102d6149f1..d8d41ac088 100644 --- a/apps/workflows/src/cron/checker.ts +++ b/apps/workflows/src/cron/checker.ts @@ -1,6 +1,7 @@ import { CloudTasksClient } from "@google-cloud/tasks"; import type { google } from "@google-cloud/tasks/build/protos/protos"; import { z } from "zod"; +import pLimit from "p-limit"; import { and, eq, gte, isNotNull, lte, notInArray } from "@openstatus/db"; import { @@ -264,6 +265,7 @@ async function sendCheckerTasksDirect( const monitors = z.array(selectMonitorSchema).safeParse(result); const allResult = []; + const limit = pLimit(10); if (!monitors.success) { logger.error(`Error while fetching the monitors ${monitors.error}`); throw new Error("Error while fetching the monitors"); @@ -288,12 +290,14 @@ async function sendCheckerTasksDirect( const status = monitorStatus.data.find((m) => region === m.region)?.status || "active"; allResult.push( - dispatchCheckerTaskDirect({ - row, - timestamp, - status, - region, - }), + limit(() => + dispatchCheckerTaskDirect({ + row, + timestamp, + status, + region, + }), + ), ); } }