import { Elysia, t } from "elysia";
import sql from "../db";
import { resolveKey } from "./auth";
import { extractAuthKey, safeTokenCompare } from "../../../shared/auth";
import { dispatchForMonitor, type NotificationEvent } from "../notifications";

type SSEController = ReadableStreamDefaultController;

/** Live SSE subscribers, keyed by accountId. */
const bus = new Map<string, Set<SSEController>>();
const enc = new TextEncoder();

/** Per-account SSE connection cap used when MAX_SSE_PER_ACCOUNT is unset or not a number. */
const DEFAULT_MAX_SSE_PER_ACCOUNT = 10;

/**
 * Removes one controller from an account's subscriber set and drops the
 * account entry from `bus` once its set is empty. Safe to call repeatedly.
 */
function unsubscribe(accountId: string, ctrl: SSEController): void {
  const subs = bus.get(accountId);
  if (!subs) return;
  subs.delete(ctrl);
  if (subs.size === 0) bus.delete(accountId);
}

/**
 * Sends `data` as a single SSE `data:` frame to every subscriber of the account.
 * Controllers whose `enqueue` throws are dropped from the set.
 *
 * @param accountId Account whose subscribers receive the frame.
 * @param data Payload serialized with `JSON.stringify`.
 */
function publish(accountId: string, data: object): void {
  const subs = bus.get(accountId);
  if (!subs?.size) return;

  const msg = enc.encode(`data: ${JSON.stringify(data)}\n\n`);
  for (const ctrl of subs) {
    try {
      ctrl.enqueue(msg);
    } catch {
      // Deleting from a Set while iterating it is well-defined in JS.
      subs.delete(ctrl);
    }
  }

  // Mirror the cleanup done by the heartbeat/cancel paths so empty sets
  // do not accumulate in `bus`.
  if (subs.size === 0) bus.delete(accountId);
}

/**
 * Creates an SSE response for one client and registers its controller in `bus`.
 * A comment-only heartbeat frame is enqueued every 10 seconds; the
 * subscription is removed when the heartbeat enqueue throws or the stream is
 * cancelled.
 *
 * @param accountId Account the new stream subscribes to.
 * @returns A `text/event-stream` response backed by the new stream.
 */
function makeSSEStream(accountId: string): Response {
  let ctrl: SSEController;
  let heartbeat: ReturnType<typeof setInterval> | undefined;

  const stream = new ReadableStream({
    start(c) {
      ctrl = c;
      if (!bus.has(accountId)) bus.set(accountId, new Set());
      bus.get(accountId)!.add(ctrl);
      ctrl.enqueue(enc.encode(": connected\n\n"));

      heartbeat = setInterval(() => {
        try {
          ctrl.enqueue(enc.encode(": heartbeat\n\n"));
        } catch {
          clearInterval(heartbeat);
          unsubscribe(accountId, ctrl);
        }
      }, 10_000);
    },
    cancel() {
      clearInterval(heartbeat);
      unsubscribe(accountId, ctrl);
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
      "X-Accel-Buffering": "no",
    },
  });
}

/**
 * Parses an optional timestamp string.
 *
 * Returns `null` for missing/empty input and also for strings `Date` cannot
 * parse, so an Invalid Date never reaches the SQL layer. In the ingest handler
 * the region state is upserted before the ping row is inserted; a failure on
 * the ping insert would leave the state advanced with no ping recorded.
 */
function parseTimestamp(value: string | null | undefined): Date | null {
  if (!value) return null;
  const parsed = new Date(value);
  return Number.isNaN(parsed.getTime()) ? null : parsed;
}

/**
 * Reads the per-account SSE connection cap from MAX_SSE_PER_ACCOUNT.
 * A non-numeric value would produce NaN, and `size >= NaN` is always false,
 * which would silently disable the cap; fall back to the default instead.
 */
function maxSSEPerAccount(): number {
  const parsed = Number(process.env.MAX_SSE_PER_ACCOUNT ?? DEFAULT_MAX_SSE_PER_ACCOUNT);
  return Number.isNaN(parsed) ? DEFAULT_MAX_SSE_PER_ACCOUNT : parsed;
}

/** Builds a JSON error response of the form `{ error }` with the given status. */
function jsonError(status: number, error: string): Response {
  return new Response(JSON.stringify({ error }), {
    status,
    headers: { "Content-Type": "application/json" },
  });
}

/**
 * Ingest and streaming routes:
 *
 * - `POST /internal/ingest`  — accepts one check result, authenticated by the
 *   `x-monitor-token` header compared against `MONITOR_TOKEN`. Updates the
 *   per-region state row, inserts the ping (and its body preview, if any),
 *   publishes the ping to the account's SSE subscribers and dispatches
 *   notifications.
 * - `GET /pings/:id/body`    — returns the stored body for a ping that belongs
 *   to the caller's account.
 * - `GET /account/stream`    — opens an SSE stream for the caller's account,
 *   subject to a per-account connection cap.
 */
export const ingest = new Elysia()
  .post("/internal/ingest", async ({ body, headers, set }) => {
    const token = headers["x-monitor-token"];
    if (!safeTokenCompare(token, process.env.MONITOR_TOKEN)) {
      set.status = 401;
      return { error: "Unauthorized" };
    }

    const [monitor_check] = await sql`
      SELECT id, account_id, name, url, resend_interval, cert_alert_days
      FROM monitors
      WHERE id = ${body.monitor_id}
    `;
    if (!monitor_check) {
      set.status = 404;
      return { error: "Monitor not found" };
    }

    // Copy meta so the request body is not mutated; body_preview is split out
    // below and stored in ping_bodies rather than in pings.meta.
    const meta = body.meta ? { ...body.meta } : {};
    if (body.cert_expiry_days != null) meta.cert_expiry_days = body.cert_expiry_days;
    const responseBody: string | null = meta.body_preview ?? null;
    delete meta.body_preview;

    const checkedAt = parseTimestamp(body.checked_at);
    const scheduledAt = parseTimestamp(body.scheduled_at);
    const jitterMs = body.jitter_ms ?? null;

    // Per-region transition state. Empty string = unspecified/single-region.
    const region = body.region ?? '';
    const [stateRow] = await sql`
      SELECT last_state, consecutive_down, cert_alert_sent
      FROM monitor_region_state
      WHERE monitor_id = ${body.monitor_id} AND region = ${region}
    `;

    const newState = body.up ? 'up' : 'down';
    const prevState: string | null = stateRow?.last_state ?? null;
    let consecutiveDown: number = stateRow?.consecutive_down ?? 0;
    let certAlertSent: boolean = stateRow?.cert_alert_sent ?? false;
    const resendInterval: number = monitor_check.resend_interval ?? 0;
    const certAlertDays: number = monitor_check.cert_alert_days ?? 0;

    // A ping is "important" on any state transition (including the first ping
    // for a region), and again every `resendInterval` consecutive down pings
    // after the first one.
    let important = false;
    if (prevState !== newState) {
      important = true;
      consecutiveDown = body.up ? 0 : 1;
    } else if (!body.up) {
      consecutiveDown += 1;
      if (resendInterval > 0 && consecutiveDown > 1 && (consecutiveDown - 1) % resendInterval === 0) {
        important = true;
      }
    } else {
      consecutiveDown = 0;
    }

    // Cert dedupe flag: flip on threshold cross, clear on renewal. Per-region.
    let certEvent = false;
    const days = body.cert_expiry_days;
    if (days != null && certAlertDays > 0) {
      if (days <= certAlertDays && !certAlertSent) {
        certEvent = true;
        certAlertSent = true;
      } else if (days > certAlertDays && certAlertSent) {
        certAlertSent = false;
      }
    }

    // NOTE(review): this upsert and the ping insert below are separate
    // statements, not one transaction — confirm that is intended.
    await sql`
      INSERT INTO monitor_region_state
        (monitor_id, region, last_state, consecutive_down, cert_alert_sent, updated_at)
      VALUES
        (${body.monitor_id}, ${region}, ${newState}, ${consecutiveDown}, ${certAlertSent}, now())
      ON CONFLICT (monitor_id, region) DO UPDATE SET
        last_state = EXCLUDED.last_state,
        consecutive_down = EXCLUDED.consecutive_down,
        cert_alert_sent = EXCLUDED.cert_alert_sent,
        updated_at = now()
    `;

    const [ping] = await sql`
      INSERT INTO pings
        (monitor_id, checked_at, scheduled_at, jitter_ms, status_code, latency_ms, up, important, error, meta, region, run_id)
      VALUES (
        ${body.monitor_id},
        ${checkedAt ?? sql`now()`},
        ${scheduledAt},
        ${jitterMs},
        ${body.status_code ?? null},
        ${body.latency_ms ?? null},
        ${body.up},
        ${important},
        ${body.error ?? null},
        ${Object.keys(meta).length > 0 ? sql.json(meta) : null},
        ${body.region ?? null},
        ${body.run_id ?? null}
      )
      RETURNING *
    `;

    if (responseBody != null && ping) {
      await sql`INSERT INTO ping_bodies (ping_id, body) VALUES (${ping.id}, ${responseBody})`;
    }

    publish(monitor_check.account_id, ping);

    // Fire-and-forget notifications. Failures log only and never block ingest.
    const monitorCtx = {
      id: monitor_check.id,
      name: monitor_check.name,
      url: monitor_check.url,
      region,
    };
    const pingCtx = {
      status_code: body.status_code ?? null,
      latency_ms: body.latency_ms ?? null,
      error: body.error ?? null,
      checked_at: ping?.checked_at?.toISOString?.() ?? new Date().toISOString(),
    };

    if (important) {
      const event: NotificationEvent = body.up
        ? { kind: "up", monitor: monitorCtx, ping: pingCtx }
        : { kind: "down", monitor: monitorCtx, ping: pingCtx };
      // Suppress the synthetic "up" on a brand-new monitor (no prior state).
      if (!(body.up && prevState === null)) {
        void dispatchForMonitor(monitor_check.id, event);
      }
    }

    if (certEvent && days != null) {
      void dispatchForMonitor(monitor_check.id, { kind: "cert", monitor: monitorCtx, days });
    }

    return { ok: true };
  }, {
    body: t.Object({
      monitor_id: t.String(),
      checked_at: t.Optional(t.Nullable(t.String())),
      scheduled_at: t.Optional(t.Nullable(t.String())),
      jitter_ms: t.Optional(t.Nullable(t.Number())),
      status_code: t.Optional(t.Nullable(t.Number())),
      latency_ms: t.Optional(t.Nullable(t.Number())),
      up: t.Boolean(),
      error: t.Optional(t.Nullable(t.String())),
      cert_expiry_days: t.Optional(t.Nullable(t.Number())),
      meta: t.Optional(t.Any()),
      region: t.Optional(t.Nullable(t.String())),
      run_id: t.Optional(t.Nullable(t.String())),
    }),
    detail: { hide: true },
  })
  .get("/pings/:id/body", async ({ params, headers, cookie, set }) => {
    const key = extractAuthKey(headers, cookie);
    if (!key) {
      set.status = 401;
      return { error: "Unauthorized" };
    }
    const resolved = await resolveKey(key);
    if (!resolved) {
      set.status = 401;
      return { error: "Unauthorized" };
    }

    // Ownership check: the ping must belong to a monitor of the caller's account.
    const [ping] = await sql`
      SELECT p.id
      FROM pings p
      JOIN monitors m ON m.id = p.monitor_id
      WHERE p.id = ${params.id} AND m.account_id = ${resolved.accountId}
    `;
    if (!ping) {
      set.status = 404;
      return { error: "Not found" };
    }

    const [row] = await sql`SELECT body FROM ping_bodies WHERE ping_id = ${params.id}`;
    return { body: row?.body ?? null };
  }, { detail: { hide: true } })
  .get("/account/stream", async ({ headers, cookie }) => {
    const key = extractAuthKey(headers, cookie);
    if (!key) return jsonError(401, "Unauthorized");

    const resolved = await resolveKey(key);
    if (!resolved) return jsonError(401, "Unauthorized");

    // Refuse new streams once the account already holds `limit` open ones.
    const limit = maxSSEPerAccount();
    if ((bus.get(resolved.accountId)?.size ?? 0) >= limit) {
      return jsonError(429, "Too many connections");
    }

    return makeSSEStream(resolved.accountId);
  }, { detail: { hide: true } });