pingql/apps/api/src/routes/pings.ts

175 lines
6.2 KiB
TypeScript

import { Elysia, t } from "elysia";
import sql from "../db";
import { resolveKey } from "./auth";
import { extractAuthKey, safeTokenCompare } from "../../../shared/auth";
type SSEController = ReadableStreamDefaultController<Uint8Array>;
const bus = new Map<string, Set<SSEController>>(); // keyed by accountId
const enc = new TextEncoder();
function publish(accountId: string, data: object) {
const subs = bus.get(accountId);
if (!subs?.size) return;
const msg = enc.encode(`data: ${JSON.stringify(data)}\n\n`);
for (const ctrl of subs) {
try { ctrl.enqueue(msg); } catch { subs.delete(ctrl); }
}
}
function makeSSEStream(accountId: string): Response {
let ctrl: SSEController;
let heartbeat: Timer;
const stream = new ReadableStream<Uint8Array>({
start(c) {
ctrl = c;
if (!bus.has(accountId)) bus.set(accountId, new Set());
bus.get(accountId)!.add(ctrl);
ctrl.enqueue(enc.encode(": connected\n\n"));
heartbeat = setInterval(() => {
try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch {
clearInterval(heartbeat);
bus.get(accountId)?.delete(ctrl);
if (bus.get(accountId)?.size === 0) bus.delete(accountId);
}
}, 10_000);
},
cancel() {
clearInterval(heartbeat);
bus.get(accountId)?.delete(ctrl);
if (bus.get(accountId)?.size === 0) bus.delete(accountId);
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
});
}
export const ingest = new Elysia()
.post("/internal/ingest", async ({ body, headers, set }) => {
const token = headers["x-monitor-token"];
if (!safeTokenCompare(token, process.env.MONITOR_TOKEN)) { set.status = 401; return { error: "Unauthorized" }; }
const [monitor_check] = await sql`
SELECT id, account_id, last_state, consecutive_down, resend_interval
FROM monitors WHERE id = ${body.monitor_id}
`;
if (!monitor_check) { set.status = 404; return { error: "Monitor not found" }; }
const meta = body.meta ? { ...body.meta } : {};
if (body.cert_expiry_days != null) meta.cert_expiry_days = body.cert_expiry_days;
const responseBody: string | null = meta.body_preview ?? null;
delete meta.body_preview;
const checkedAt = body.checked_at ? new Date(body.checked_at) : null;
const scheduledAt = body.scheduled_at ? new Date(body.scheduled_at) : null;
const jitterMs = body.jitter_ms ?? null;
// Important-beat + resend bookkeeping. Important = transition or resend tick.
const newState = body.up ? 'up' : 'down';
const prevState: string | null = monitor_check.last_state;
let consecutiveDown: number = monitor_check.consecutive_down ?? 0;
const resendInterval: number = monitor_check.resend_interval ?? 0;
let important = false;
if (prevState !== newState) {
important = true;
consecutiveDown = body.up ? 0 : 1;
} else if (!body.up) {
consecutiveDown += 1;
if (resendInterval > 0 && consecutiveDown > 1 && (consecutiveDown - 1) % resendInterval === 0) {
important = true;
}
} else {
consecutiveDown = 0;
}
await sql`
UPDATE monitors
SET last_state = ${newState}, consecutive_down = ${consecutiveDown}
WHERE id = ${body.monitor_id}
`;
const [ping] = await sql`
INSERT INTO pings (monitor_id, checked_at, scheduled_at, jitter_ms, status_code, latency_ms, up, important, error, meta, region, run_id)
VALUES (
${body.monitor_id},
${checkedAt ?? sql`now()`},
${scheduledAt},
${jitterMs},
${body.status_code ?? null},
${body.latency_ms ?? null},
${body.up},
${important},
${body.error ?? null},
${Object.keys(meta).length > 0 ? sql.json(meta) : null},
${body.region ?? null},
${body.run_id ?? null}
)
RETURNING *
`;
if (responseBody != null && ping) {
await sql`INSERT INTO ping_bodies (ping_id, body) VALUES (${ping.id}, ${responseBody})`;
}
publish(monitor_check.account_id, ping);
return { ok: true };
}, {
body: t.Object({
monitor_id: t.String(),
checked_at: t.Optional(t.Nullable(t.String())),
scheduled_at: t.Optional(t.Nullable(t.String())),
jitter_ms: t.Optional(t.Nullable(t.Number())),
status_code: t.Optional(t.Nullable(t.Number())),
latency_ms: t.Optional(t.Nullable(t.Number())),
up: t.Boolean(),
error: t.Optional(t.Nullable(t.String())),
cert_expiry_days: t.Optional(t.Nullable(t.Number())),
meta: t.Optional(t.Any()),
region: t.Optional(t.Nullable(t.String())),
run_id: t.Optional(t.Nullable(t.String())),
}),
detail: { hide: true },
})
.get("/pings/:id/body", async ({ params, headers, cookie, set }) => {
const key = extractAuthKey(headers, cookie);
if (!key) { set.status = 401; return { error: "Unauthorized" }; }
const resolved = await resolveKey(key);
if (!resolved) { set.status = 401; return { error: "Unauthorized" }; }
const [ping] = await sql`
SELECT p.id FROM pings p
JOIN monitors m ON m.id = p.monitor_id
WHERE p.id = ${params.id} AND m.account_id = ${resolved.accountId}
`;
if (!ping) { set.status = 404; return { error: "Not found" }; }
const [row] = await sql`SELECT body FROM ping_bodies WHERE ping_id = ${params.id}`;
return { body: row?.body ?? null };
}, { detail: { hide: true } })
.get("/account/stream", async ({ headers, cookie }) => {
const key = extractAuthKey(headers, cookie);
if (!key) return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
const resolved = await resolveKey(key);
if (!resolved) return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
const limit = Number(process.env.MAX_SSE_PER_ACCOUNT ?? 10);
if ((bus.get(resolved.accountId)?.size ?? 0) >= limit) {
return new Response(JSON.stringify({ error: "Too many connections" }), { status: 429 });
}
return makeSSEStream(resolved.accountId);
}, { detail: { hide: true } });