From 66b368453db166abcd95210bff31fbb06668311f Mon Sep 17 00:00:00 2001 From: M1 Date: Tue, 17 Mar 2026 07:06:09 +0400 Subject: [PATCH] refactor: single account-level SSE stream instead of per-monitor connections --- apps/web/src/dashboard/app.js | 10 ++++----- apps/web/src/routes/pings.ts | 39 ++++++++++++++++------------------- apps/web/src/views/detail.ejs | 5 +++-- apps/web/src/views/home.ejs | 13 ++++++------ 4 files changed, 32 insertions(+), 35 deletions(-) diff --git a/apps/web/src/dashboard/app.js b/apps/web/src/dashboard/app.js index 28e50b9..3f89537 100644 --- a/apps/web/src/dashboard/app.js +++ b/apps/web/src/dashboard/app.js @@ -58,14 +58,15 @@ function escapeHtml(str) { return div.innerHTML; } -// Subscribe to live ping updates for a monitor via SSE (fetch-based for auth header support) -// Returns an AbortController — call .abort() to close -function watchMonitor(monitorId, onPing) { +// Subscribe to live ping updates for the whole account via a single SSE stream. +// onPing receives each ping object (includes monitor_id). +// Returns an AbortController — call .abort() to close. +function watchAccount(onPing) { const ac = new AbortController(); async function connect() { try { - const res = await fetch(`/monitors/${monitorId}/stream`, { + const res = await fetch(`/account/stream`, { credentials: 'same-origin', signal: ac.signal, }); @@ -87,7 +88,6 @@ function watchMonitor(monitorId, onPing) { } } catch (e) { if (e.name === 'AbortError') return; - // Reconnect after a short delay on unexpected disconnect setTimeout(connect, 3000); } } diff --git a/apps/web/src/routes/pings.ts b/apps/web/src/routes/pings.ts index de8fbec..c1d619b 100644 --- a/apps/web/src/routes/pings.ts +++ b/apps/web/src/routes/pings.ts @@ -4,11 +4,11 @@ import { resolveKey } from "./auth"; // ── SSE bus ─────────────────────────────────────────────────────────────────── type SSEController = ReadableStreamDefaultController; -const bus = new Map>(); +const bus = new Map>(); // keyed by accountId const enc = new TextEncoder(); -function publish(monitorId: string, data: object) { - const subs = bus.get(monitorId); +function publish(accountId: string, data: object) { + const subs = bus.get(accountId); if (!subs?.size) return; const msg = enc.encode(`data: ${JSON.stringify(data)}\n\n`); for (const ctrl of subs) { @@ -16,24 +16,23 @@ function publish(monitorId: string, data: object) { } } -function makeSSEStream(monitorId: string): Response { +function makeSSEStream(accountId: string): Response { let ctrl: SSEController; let heartbeat: Timer; const stream = new ReadableStream({ start(c) { ctrl = c; - if (!bus.has(monitorId)) bus.set(monitorId, new Set()); - bus.get(monitorId)!.add(ctrl); + if (!bus.has(accountId)) bus.set(accountId, new Set()); + bus.get(accountId)!.add(ctrl); ctrl.enqueue(enc.encode(": connected\n\n")); - // Keepalive — prevents proxies/Cloudflare from closing idle connections heartbeat = setInterval(() => { try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch { clearInterval(heartbeat); } }, 10_000); }, cancel() { clearInterval(heartbeat); - bus.get(monitorId)?.delete(ctrl); - if (bus.get(monitorId)?.size === 0) bus.delete(monitorId); + bus.get(accountId)?.delete(ctrl); + if (bus.get(accountId)?.size === 0) bus.delete(accountId); }, }); return new Response(stream, { @@ -70,7 +69,10 @@ export const ingest = new Elysia() RETURNING * `; - publish(body.monitor_id, ping); + // Look up account and publish to account-level bus + const [monitor] = await sql`SELECT account_id FROM monitors WHERE id = ${body.monitor_id}`; + if (monitor) publish(monitor.account_id, ping); + return { ok: true }; }, { body: t.Object({ @@ -85,8 +87,8 @@ export const ingest = new Elysia() detail: { hide: true }, }) - // SSE: stream live pings — auth via Bearer header or cookie - .get("/monitors/:id/stream", async ({ params, headers, cookie }) => { + // SSE: single stream for all of the account's monitors + .get("/account/stream", async ({ headers, cookie }) => { const authHeader = headers["authorization"] ?? ""; const bearer = authHeader.match(/^bearer\s+(.+)$/i)?.[1]?.trim(); const key = bearer ?? cookie?.pingql_key?.value; @@ -96,15 +98,10 @@ export const ingest = new Elysia() const resolved = await resolveKey(key); if (!resolved) return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 }); - const [monitor] = await sql` - SELECT id FROM monitors WHERE id = ${params.id} AND account_id = ${resolved.accountId} - `; - if (!monitor) return new Response(JSON.stringify({ error: "Not found" }), { status: 404 }); - - const limit = Number(process.env.MAX_SSE_PER_MONITOR ?? 10); - if ((bus.get(params.id)?.size ?? 0) >= limit) { - return new Response(JSON.stringify({ error: "Too many connections for this monitor" }), { status: 429 }); + const limit = Number(process.env.MAX_SSE_PER_ACCOUNT ?? 10); + if ((bus.get(resolved.accountId)?.size ?? 0) >= limit) { + return new Response(JSON.stringify({ error: "Too many connections" }), { status: 429 }); } - return makeSSEStream(params.id); + return makeSSEStream(resolved.accountId); }, { detail: { hide: true } }); diff --git a/apps/web/src/views/detail.ejs b/apps/web/src/views/detail.ejs index f3e3341..1943b8c 100644 --- a/apps/web/src/views/detail.ejs +++ b/apps/web/src/views/detail.ejs @@ -265,9 +265,10 @@ } }); - // SSE: on each ping, refresh the chart (debounced — at most once per 5s) + // SSE: account stream filtered to this monitor — refresh chart on ping let _chartRefreshTimer = null; - watchMonitor(monitorId, () => { + watchAccount((ping) => { + if (ping.monitor_id !== monitorId) return; if (_chartRefreshTimer) return; _chartRefreshTimer = setTimeout(async () => { _chartRefreshTimer = null; diff --git a/apps/web/src/views/home.ejs b/apps/web/src/views/home.ejs index 4297f58..9f3e1a9 100644 --- a/apps/web/src/views/home.ejs +++ b/apps/web/src/views/home.ejs @@ -88,13 +88,12 @@ }, 5000); } - // SSE: subscribe to all monitors, refresh sparkline on each ping - document.querySelectorAll('[data-monitor-id]').forEach(card => { - const mid = card.dataset.monitorId; - watchMonitor(mid, () => { - const sparkEl = card.querySelector('.stat-sparkline'); - if (sparkEl) scheduleSparklineRefresh(mid, sparkEl); - }); + // SSE: single account stream — refresh sparkline for the relevant card on each ping + watchAccount((ping) => { + const card = document.querySelector(`[data-monitor-id="${ping.monitor_id}"]`); + if (!card) return; + const sparkEl = card.querySelector('.stat-sparkline'); + if (sparkEl) scheduleSparklineRefresh(ping.monitor_id, sparkEl); });