From 31d1fa7b04ac68e72fb8120fb8892819b1eac102 Mon Sep 17 00:00:00 2001 From: M1 Date: Mon, 16 Mar 2026 16:17:33 +0400 Subject: [PATCH] fix: SSE via fetch for auth headers, remove query param auth, add heartbeat every 10s --- apps/web/src/dashboard/app.js | 45 ++++++++++++++++++++++++++--------- apps/web/src/routes/pings.ts | 14 +++++++---- apps/web/src/views/home.ejs | 2 +- 3 files changed, 44 insertions(+), 17 deletions(-) diff --git a/apps/web/src/dashboard/app.js b/apps/web/src/dashboard/app.js index b7e00f9..29428ad 100644 --- a/apps/web/src/dashboard/app.js +++ b/apps/web/src/dashboard/app.js @@ -94,20 +94,43 @@ function escapeHtml(str) { return div.innerHTML; } -// Subscribe to live ping updates for a monitor via SSE -// onPing(ping) called with each new ping object +// Subscribe to live ping updates for a monitor via SSE (fetch-based for auth header support) +// Returns an AbortController — call .abort() to close function watchMonitor(monitorId, onPing) { const key = localStorage.getItem('pingql_key'); if (!key) return null; - const url = `/monitors/${monitorId}/stream`; - const es = new EventSource(url + `?auth=${encodeURIComponent(key)}`); + const ac = new AbortController(); - es.onmessage = (e) => { - try { onPing(JSON.parse(e.data)); } catch {} - }; - es.onerror = () => { - // Reconnect is automatic with EventSource - }; - return es; + async function connect() { + try { + const res = await fetch(`/monitors/${monitorId}/stream`, { + headers: { Authorization: `Bearer ${key}` }, + signal: ac.signal, + }); + if (!res.ok || !res.body) return; + const reader = res.body.getReader(); + const dec = new TextDecoder(); + let buf = ''; + while (true) { + const { done, value } = await reader.read(); + if (done) break; + buf += dec.decode(value, { stream: true }); + const lines = buf.split('\n'); + buf = lines.pop() ?? ''; + for (const line of lines) { + if (line.startsWith('data: ')) { + try { onPing(JSON.parse(line.slice(6))); } catch {} + } + } + } + } catch (e) { + if (e.name === 'AbortError') return; + // Reconnect after a short delay on unexpected disconnect + setTimeout(connect, 3000); + } + } + + connect(); + return ac; } diff --git a/apps/web/src/routes/pings.ts b/apps/web/src/routes/pings.ts index b9f8df5..fccc331 100644 --- a/apps/web/src/routes/pings.ts +++ b/apps/web/src/routes/pings.ts @@ -17,14 +17,20 @@ function publish(monitorId: string, data: object) { function makeSSEStream(monitorId: string): Response { let ctrl: SSEController; + let heartbeat: Timer; const stream = new ReadableStream({ start(c) { ctrl = c; if (!bus.has(monitorId)) bus.set(monitorId, new Set()); bus.get(monitorId)!.add(ctrl); ctrl.enqueue(enc.encode(": connected\n\n")); + // Keepalive — prevents proxies/Cloudflare from closing idle connections + heartbeat = setInterval(() => { + try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch { clearInterval(heartbeat); } + }, 10_000); }, cancel() { + clearInterval(heartbeat); bus.get(monitorId)?.delete(ctrl); if (bus.get(monitorId)?.size === 0) bus.delete(monitorId); }, @@ -78,11 +84,9 @@ export const ingest = new Elysia() detail: { hide: true }, }) - // SSE: stream live pings — auth via Bearer header OR ?auth= query param - // (EventSource doesn't support custom headers, hence the query param fallback) - .get("/monitors/:id/stream", async ({ params, headers, query, error }) => { - const key = headers["authorization"]?.replace("Bearer ", "").trim() - ?? (query.auth as string | undefined); + // SSE: stream live pings — auth via Bearer header + .get("/monitors/:id/stream", async ({ params, headers, error }) => { + const key = headers["authorization"]?.replace("Bearer ", "").trim(); if (!key) return error(401, { error: "Unauthorized" }); diff --git a/apps/web/src/views/home.ejs b/apps/web/src/views/home.ejs index cdbbfa9..bd933ff 100644 --- a/apps/web/src/views/home.ejs +++ b/apps/web/src/views/home.ejs @@ -91,7 +91,7 @@ // SSE: subscribe to all monitors after load so cards update in real time const sseConnections = []; async function subscribeAll() { - sseConnections.forEach(es => es.close()); + sseConnections.forEach(es => es.abort()); sseConnections.length = 0; const monitors = await api('/monitors/'); monitors.forEach(m => {