diff --git a/apps/web/src/dashboard/app.js b/apps/web/src/dashboard/app.js index d944cfc..b7e00f9 100644 --- a/apps/web/src/dashboard/app.js +++ b/apps/web/src/dashboard/app.js @@ -93,3 +93,21 @@ function escapeHtml(str) { div.textContent = str; return div.innerHTML; } + +// Subscribe to live ping updates for a monitor via SSE +// onPing(ping) called with each new ping object +function watchMonitor(monitorId, onPing) { + const key = localStorage.getItem('pingql_key'); + if (!key) return null; + + const url = `/monitors/${monitorId}/stream`; + const es = new EventSource(url + `?auth=${encodeURIComponent(key)}`); + + es.onmessage = (e) => { + try { onPing(JSON.parse(e.data)); } catch {} + }; + es.onerror = () => { + // Reconnect is automatic with EventSource + }; + return es; +} diff --git a/apps/web/src/routes/pings.ts b/apps/web/src/routes/pings.ts index 9d411b8..b9f8df5 100644 --- a/apps/web/src/routes/pings.ts +++ b/apps/web/src/routes/pings.ts @@ -1,8 +1,48 @@ import { Elysia, t } from "elysia"; import sql from "../db"; -// Internal-only: called by the Rust monitor runner +// ── SSE bus ─────────────────────────────────────────────────────────────────── +type SSEController = ReadableStreamDefaultController; +const bus = new Map>(); +const enc = new TextEncoder(); + +function publish(monitorId: string, data: object) { + const subs = bus.get(monitorId); + if (!subs?.size) return; + const msg = enc.encode(`data: ${JSON.stringify(data)}\n\n`); + for (const ctrl of subs) { + try { ctrl.enqueue(msg); } catch { subs.delete(ctrl); } + } +} + +function makeSSEStream(monitorId: string): Response { + let ctrl: SSEController; + const stream = new ReadableStream({ + start(c) { + ctrl = c; + if (!bus.has(monitorId)) bus.set(monitorId, new Set()); + bus.get(monitorId)!.add(ctrl); + ctrl.enqueue(enc.encode(": connected\n\n")); + }, + cancel() { + bus.get(monitorId)?.delete(ctrl); + if (bus.get(monitorId)?.size === 0) bus.delete(monitorId); + }, + }); + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + }); +} + +// ── Routes ──────────────────────────────────────────────────────────────────── export const ingest = new Elysia() + + // Internal: called by Rust monitor runner .post("/internal/ingest", async ({ body, headers, error }) => { const token = headers["x-monitor-token"]; if (token !== process.env.MONITOR_TOKEN) return error(401, { error: "Unauthorized" }); @@ -10,7 +50,7 @@ export const ingest = new Elysia() const meta = body.meta ? { ...body.meta } : {}; if (body.cert_expiry_days != null) meta.cert_expiry_days = body.cert_expiry_days; - await sql` + const [ping] = await sql` INSERT INTO pings (monitor_id, status_code, latency_ms, up, error, meta) VALUES ( ${body.monitor_id}, @@ -20,7 +60,10 @@ export const ingest = new Elysia() ${body.error ?? null}, ${Object.keys(meta).length > 0 ? sql.json(meta) : null} ) + RETURNING * `; + + publish(body.monitor_id, ping); return { ok: true }; }, { body: t.Object({ @@ -33,4 +76,28 @@ export const ingest = new Elysia() meta: t.Optional(t.Any()), }), detail: { hide: true }, - }); + }) + + // SSE: stream live pings — auth via Bearer header OR ?auth= query param + // (EventSource doesn't support custom headers, hence the query param fallback) + .get("/monitors/:id/stream", async ({ params, headers, query, error }) => { + const key = headers["authorization"]?.replace("Bearer ", "").trim() + ?? (query.auth as string | undefined); + + if (!key) return error(401, { error: "Unauthorized" }); + + // Resolve account from primary key or sub-key + const [acc] = await sql`SELECT id FROM accounts WHERE id = ${key}`; + const accountId = acc?.id ?? ( + await sql`SELECT account_id FROM api_keys WHERE id = ${key}`.then(r => r[0]?.account_id) + ); + if (!accountId) return error(401, { error: "Unauthorized" }); + + // Verify ownership + const [monitor] = await sql` + SELECT id FROM monitors WHERE id = ${params.id} AND account_id = ${accountId} + `; + if (!monitor) return error(404, { error: "Not found" }); + + return makeSSEStream(params.id); + }, { detail: { hide: true } }); diff --git a/apps/web/src/views/detail.ejs b/apps/web/src/views/detail.ejs index d5d58bf..f6e6883 100644 --- a/apps/web/src/views/detail.ejs +++ b/apps/web/src/views/detail.ejs @@ -267,7 +267,35 @@ }); load(); - setInterval(load, 30000); + setInterval(load, 60000); // fallback poll, less frequent now that SSE handles updates + + // SSE: live ping updates + const id = window.location.pathname.split('/').pop(); + watchMonitor(id, (ping) => { + // Update stat bar + document.getElementById('stat-last').innerHTML = timeAgo(ping.checked_at); + if (ping.latency_ms) document.getElementById('stat-latency').textContent = ping.latency_ms + 'ms'; + + // Prepend new row to ping table + const tbody = document.getElementById('pings-table'); + if (tbody) { + const upBadge = ping.up + ? 'Up' + : 'Down'; + const tr = document.createElement('tr'); + tr.className = 'border-b border-gray-800 fade-in-row'; + tr.innerHTML = ` + ${upBadge} + ${ping.status_code ?? '—'} + ${ping.latency_ms ? ping.latency_ms + 'ms' : '—'} + ${timeAgo(ping.checked_at)} + ${ping.error ?? ''} + `; + tbody.prepend(tr); + // Trim to keep table manageable + while (tbody.children.length > 100) tbody.removeChild(tbody.lastChild); + } + }); <%~ include('./partials/foot') %> diff --git a/apps/web/src/views/home.ejs b/apps/web/src/views/home.ejs index d9dc8da..cdbbfa9 100644 --- a/apps/web/src/views/home.ejs +++ b/apps/web/src/views/home.ejs @@ -59,10 +59,10 @@ const avgLatency = latencies.length ? Math.round(latencies.reduce((a, b) => a + b, 0) / latencies.length) : null; return ` - +
- ${statusBadge(lastPing?.up)} +
${escapeHtml(m.name)}
${escapeHtml(m.url)}
@@ -71,8 +71,8 @@
-
${avgLatency != null ? avgLatency + 'ms' : '—'}
-
${lastPing ? timeAgo(lastPing.checked_at) : 'no pings'}
+
${avgLatency != null ? avgLatency + 'ms' : '—'}
+
${lastPing ? timeAgo(lastPing.checked_at) : 'no pings'}
${m.enabled ? m.interval_s + 's' : 'paused'}
@@ -87,6 +87,31 @@ load(); setInterval(load, 30000); + + // SSE: subscribe to all monitors after load so cards update in real time + const sseConnections = []; + async function subscribeAll() { + sseConnections.forEach(es => es.close()); + sseConnections.length = 0; + const monitors = await api('/monitors/'); + monitors.forEach(m => { + const es = watchMonitor(m.id, (ping) => { + // Update the card's last ping info without full reload + const card = document.querySelector(`[data-monitor-id="${m.id}"]`); + if (!card) return; + const statusDot = card.querySelector('.status-dot'); + const latencyEl = card.querySelector('.stat-latency'); + const lastEl = card.querySelector('.stat-last'); + if (statusDot) { + statusDot.className = `status-dot w-2.5 h-2.5 rounded-full ${ping.up ? 'bg-green-500' : 'bg-red-500'}`; + } + if (latencyEl && ping.latency_ms) latencyEl.textContent = `${ping.latency_ms}ms`; + if (lastEl) lastEl.innerHTML = timeAgo(ping.checked_at); + }); + if (es) sseConnections.push(es); + }); + } + subscribeAll(); <%~ include('./partials/foot') %>