refactor: single account-level SSE stream instead of per-monitor connections

This commit is contained in:
M1 2026-03-17 07:06:09 +04:00
parent 55f9f6d8ed
commit 66b368453d
4 changed files with 32 additions and 35 deletions

View File

@ -58,14 +58,15 @@ function escapeHtml(str) {
return div.innerHTML;
}
// Subscribe to live ping updates for a monitor via SSE (fetch-based for auth header support)
// Returns an AbortController — call .abort() to close
function watchMonitor(monitorId, onPing) {
// Subscribe to live ping updates for the whole account via a single SSE stream.
// onPing receives each ping object (includes monitor_id).
// Returns an AbortController — call .abort() to close.
function watchAccount(onPing) {
const ac = new AbortController();
async function connect() {
try {
const res = await fetch(`/monitors/${monitorId}/stream`, {
const res = await fetch(`/account/stream`, {
credentials: 'same-origin',
signal: ac.signal,
});
@ -87,7 +88,6 @@ function watchMonitor(monitorId, onPing) {
}
} catch (e) {
if (e.name === 'AbortError') return;
// Reconnect after a short delay on unexpected disconnect
setTimeout(connect, 3000);
}
}

View File

@ -4,11 +4,11 @@ import { resolveKey } from "./auth";
// ── SSE bus ───────────────────────────────────────────────────────────────────
type SSEController = ReadableStreamDefaultController<Uint8Array>;
const bus = new Map<string, Set<SSEController>>();
const bus = new Map<string, Set<SSEController>>(); // keyed by accountId
const enc = new TextEncoder();
function publish(monitorId: string, data: object) {
const subs = bus.get(monitorId);
function publish(accountId: string, data: object) {
const subs = bus.get(accountId);
if (!subs?.size) return;
const msg = enc.encode(`data: ${JSON.stringify(data)}\n\n`);
for (const ctrl of subs) {
@ -16,24 +16,23 @@ function publish(monitorId: string, data: object) {
}
}
function makeSSEStream(monitorId: string): Response {
function makeSSEStream(accountId: string): Response {
let ctrl: SSEController;
let heartbeat: Timer;
const stream = new ReadableStream<Uint8Array>({
start(c) {
ctrl = c;
if (!bus.has(monitorId)) bus.set(monitorId, new Set());
bus.get(monitorId)!.add(ctrl);
if (!bus.has(accountId)) bus.set(accountId, new Set());
bus.get(accountId)!.add(ctrl);
ctrl.enqueue(enc.encode(": connected\n\n"));
// Keepalive — prevents proxies/Cloudflare from closing idle connections
heartbeat = setInterval(() => {
try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch { clearInterval(heartbeat); }
}, 10_000);
},
cancel() {
clearInterval(heartbeat);
bus.get(monitorId)?.delete(ctrl);
if (bus.get(monitorId)?.size === 0) bus.delete(monitorId);
bus.get(accountId)?.delete(ctrl);
if (bus.get(accountId)?.size === 0) bus.delete(accountId);
},
});
return new Response(stream, {
@ -70,7 +69,10 @@ export const ingest = new Elysia()
RETURNING *
`;
publish(body.monitor_id, ping);
// Look up account and publish to account-level bus
const [monitor] = await sql`SELECT account_id FROM monitors WHERE id = ${body.monitor_id}`;
if (monitor) publish(monitor.account_id, ping);
return { ok: true };
}, {
body: t.Object({
@ -85,8 +87,8 @@ export const ingest = new Elysia()
detail: { hide: true },
})
// SSE: stream live pings — auth via Bearer header or cookie
.get("/monitors/:id/stream", async ({ params, headers, cookie }) => {
// SSE: single stream for all of the account's monitors
.get("/account/stream", async ({ headers, cookie }) => {
const authHeader = headers["authorization"] ?? "";
const bearer = authHeader.match(/^bearer\s+(.+)$/i)?.[1]?.trim();
const key = bearer ?? cookie?.pingql_key?.value;
@ -96,15 +98,10 @@ export const ingest = new Elysia()
const resolved = await resolveKey(key);
if (!resolved) return new Response(JSON.stringify({ error: "Unauthorized" }), { status: 401 });
const [monitor] = await sql`
SELECT id FROM monitors WHERE id = ${params.id} AND account_id = ${resolved.accountId}
`;
if (!monitor) return new Response(JSON.stringify({ error: "Not found" }), { status: 404 });
const limit = Number(process.env.MAX_SSE_PER_MONITOR ?? 10);
if ((bus.get(params.id)?.size ?? 0) >= limit) {
return new Response(JSON.stringify({ error: "Too many connections for this monitor" }), { status: 429 });
const limit = Number(process.env.MAX_SSE_PER_ACCOUNT ?? 10);
if ((bus.get(resolved.accountId)?.size ?? 0) >= limit) {
return new Response(JSON.stringify({ error: "Too many connections" }), { status: 429 });
}
return makeSSEStream(params.id);
return makeSSEStream(resolved.accountId);
}, { detail: { hide: true } });

View File

@ -265,9 +265,10 @@
}
});
// SSE: on each ping, refresh the chart (debounced — at most once per 5s)
// SSE: account stream filtered to this monitor — refresh chart on ping
let _chartRefreshTimer = null;
watchMonitor(monitorId, () => {
watchAccount((ping) => {
if (ping.monitor_id !== monitorId) return;
if (_chartRefreshTimer) return;
_chartRefreshTimer = setTimeout(async () => {
_chartRefreshTimer = null;

View File

@ -88,13 +88,12 @@
}, 5000);
}
// SSE: subscribe to all monitors, refresh sparkline on each ping
document.querySelectorAll('[data-monitor-id]').forEach(card => {
const mid = card.dataset.monitorId;
watchMonitor(mid, () => {
// SSE: single account stream — refresh sparkline for the relevant card on each ping
watchAccount((ping) => {
const card = document.querySelector(`[data-monitor-id="${ping.monitor_id}"]`);
if (!card) return;
const sparkEl = card.querySelector('.stat-sparkline');
if (sparkEl) scheduleSparklineRefresh(mid, sparkEl);
});
if (sparkEl) scheduleSparklineRefresh(ping.monitor_id, sparkEl);
});
</script>