feat: SSE live ping stream for monitors

This commit is contained in:
M1 2026-03-16 16:14:23 +04:00
parent 1e95149456
commit 6d48a83560
4 changed files with 146 additions and 8 deletions

View File

@ -93,3 +93,21 @@ function escapeHtml(str) {
div.textContent = str; div.textContent = str;
return div.innerHTML; return div.innerHTML;
} }
// Subscribe to live ping updates for a monitor via SSE
// onPing(ping) called with each new ping object
function watchMonitor(monitorId, onPing) {
const key = localStorage.getItem('pingql_key');
if (!key) return null;
const url = `/monitors/${monitorId}/stream`;
const es = new EventSource(url + `?auth=${encodeURIComponent(key)}`);
es.onmessage = (e) => {
try { onPing(JSON.parse(e.data)); } catch {}
};
es.onerror = () => {
// Reconnect is automatic with EventSource
};
return es;
}

View File

@ -1,8 +1,48 @@
import { Elysia, t } from "elysia"; import { Elysia, t } from "elysia";
import sql from "../db"; import sql from "../db";
// Internal-only: called by the Rust monitor runner // ── SSE bus ───────────────────────────────────────────────────────────────────
type SSEController = ReadableStreamDefaultController<Uint8Array>;
const bus = new Map<string, Set<SSEController>>();
const enc = new TextEncoder();
function publish(monitorId: string, data: object) {
const subs = bus.get(monitorId);
if (!subs?.size) return;
const msg = enc.encode(`data: ${JSON.stringify(data)}\n\n`);
for (const ctrl of subs) {
try { ctrl.enqueue(msg); } catch { subs.delete(ctrl); }
}
}
function makeSSEStream(monitorId: string): Response {
let ctrl: SSEController;
const stream = new ReadableStream<Uint8Array>({
start(c) {
ctrl = c;
if (!bus.has(monitorId)) bus.set(monitorId, new Set());
bus.get(monitorId)!.add(ctrl);
ctrl.enqueue(enc.encode(": connected\n\n"));
},
cancel() {
bus.get(monitorId)?.delete(ctrl);
if (bus.get(monitorId)?.size === 0) bus.delete(monitorId);
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
});
}
// ── Routes ────────────────────────────────────────────────────────────────────
export const ingest = new Elysia() export const ingest = new Elysia()
// Internal: called by Rust monitor runner
.post("/internal/ingest", async ({ body, headers, error }) => { .post("/internal/ingest", async ({ body, headers, error }) => {
const token = headers["x-monitor-token"]; const token = headers["x-monitor-token"];
if (token !== process.env.MONITOR_TOKEN) return error(401, { error: "Unauthorized" }); if (token !== process.env.MONITOR_TOKEN) return error(401, { error: "Unauthorized" });
@ -10,7 +50,7 @@ export const ingest = new Elysia()
const meta = body.meta ? { ...body.meta } : {}; const meta = body.meta ? { ...body.meta } : {};
if (body.cert_expiry_days != null) meta.cert_expiry_days = body.cert_expiry_days; if (body.cert_expiry_days != null) meta.cert_expiry_days = body.cert_expiry_days;
await sql` const [ping] = await sql`
INSERT INTO pings (monitor_id, status_code, latency_ms, up, error, meta) INSERT INTO pings (monitor_id, status_code, latency_ms, up, error, meta)
VALUES ( VALUES (
${body.monitor_id}, ${body.monitor_id},
@ -20,7 +60,10 @@ export const ingest = new Elysia()
${body.error ?? null}, ${body.error ?? null},
${Object.keys(meta).length > 0 ? sql.json(meta) : null} ${Object.keys(meta).length > 0 ? sql.json(meta) : null}
) )
RETURNING *
`; `;
publish(body.monitor_id, ping);
return { ok: true }; return { ok: true };
}, { }, {
body: t.Object({ body: t.Object({
@ -33,4 +76,28 @@ export const ingest = new Elysia()
meta: t.Optional(t.Any()), meta: t.Optional(t.Any()),
}), }),
detail: { hide: true }, detail: { hide: true },
}); })
// SSE: stream live pings — auth via Bearer header OR ?auth= query param
// (EventSource doesn't support custom headers, hence the query param fallback)
.get("/monitors/:id/stream", async ({ params, headers, query, error }) => {
const key = headers["authorization"]?.replace("Bearer ", "").trim()
?? (query.auth as string | undefined);
if (!key) return error(401, { error: "Unauthorized" });
// Resolve account from primary key or sub-key
const [acc] = await sql`SELECT id FROM accounts WHERE id = ${key}`;
const accountId = acc?.id ?? (
await sql`SELECT account_id FROM api_keys WHERE id = ${key}`.then(r => r[0]?.account_id)
);
if (!accountId) return error(401, { error: "Unauthorized" });
// Verify ownership
const [monitor] = await sql`
SELECT id FROM monitors WHERE id = ${params.id} AND account_id = ${accountId}
`;
if (!monitor) return error(404, { error: "Not found" });
return makeSSEStream(params.id);
}, { detail: { hide: true } });

View File

@ -267,7 +267,35 @@
}); });
load(); load();
setInterval(load, 30000); setInterval(load, 60000); // fallback poll, less frequent now that SSE handles updates
// SSE: live ping updates
const id = window.location.pathname.split('/').pop();
watchMonitor(id, (ping) => {
// Update stat bar
document.getElementById('stat-last').innerHTML = timeAgo(ping.checked_at);
if (ping.latency_ms) document.getElementById('stat-latency').textContent = ping.latency_ms + 'ms';
// Prepend new row to ping table
const tbody = document.getElementById('pings-table');
if (tbody) {
const upBadge = ping.up
? '<span class="text-green-400">Up</span>'
: '<span class="text-red-400">Down</span>';
const tr = document.createElement('tr');
tr.className = 'border-b border-gray-800 fade-in-row';
tr.innerHTML = `
<td class="px-4 py-2">${upBadge}</td>
<td class="px-4 py-2 text-gray-300">${ping.status_code ?? '—'}</td>
<td class="px-4 py-2 text-gray-300">${ping.latency_ms ? ping.latency_ms + 'ms' : '—'}</td>
<td class="px-4 py-2 text-gray-500">${timeAgo(ping.checked_at)}</td>
<td class="px-4 py-2 text-red-400 text-xs">${ping.error ?? ''}</td>
`;
tbody.prepend(tr);
// Trim to keep table manageable
while (tbody.children.length > 100) tbody.removeChild(tbody.lastChild);
}
});
</script> </script>
<%~ include('./partials/foot') %> <%~ include('./partials/foot') %>

View File

@ -59,10 +59,10 @@
const avgLatency = latencies.length ? Math.round(latencies.reduce((a, b) => a + b, 0) / latencies.length) : null; const avgLatency = latencies.length ? Math.round(latencies.reduce((a, b) => a + b, 0) / latencies.length) : null;
return ` return `
<a href="/dashboard/monitors/${m.id}" class="block bg-gray-900 hover:bg-gray-800/80 border border-gray-800 rounded-xl p-4 transition-colors group"> <a href="/dashboard/monitors/${m.id}" data-monitor-id="${m.id}" class="block bg-gray-900 hover:bg-gray-800/80 border border-gray-800 rounded-xl p-4 transition-colors group">
<div class="flex items-center justify-between"> <div class="flex items-center justify-between">
<div class="flex items-center gap-3 min-w-0"> <div class="flex items-center gap-3 min-w-0">
${statusBadge(lastPing?.up)} <span class="status-dot w-2.5 h-2.5 rounded-full ${lastPing == null ? 'bg-gray-600' : lastPing.up ? 'bg-green-500' : 'bg-red-500'}"></span>
<div class="min-w-0"> <div class="min-w-0">
<div class="font-medium text-gray-100 group-hover:text-white truncate">${escapeHtml(m.name)}</div> <div class="font-medium text-gray-100 group-hover:text-white truncate">${escapeHtml(m.name)}</div>
<div class="text-xs text-gray-500 truncate">${escapeHtml(m.url)}</div> <div class="text-xs text-gray-500 truncate">${escapeHtml(m.url)}</div>
@ -71,8 +71,8 @@
<div class="flex items-center gap-6 shrink-0 ml-4"> <div class="flex items-center gap-6 shrink-0 ml-4">
<div class="hidden sm:block">${sparkline(latencies)}</div> <div class="hidden sm:block">${sparkline(latencies)}</div>
<div class="text-right"> <div class="text-right">
<div class="text-sm text-gray-300">${avgLatency != null ? avgLatency + 'ms' : '—'}</div> <div class="text-sm text-gray-300 stat-latency">${avgLatency != null ? avgLatency + 'ms' : '—'}</div>
<div class="text-xs text-gray-500">${lastPing ? timeAgo(lastPing.checked_at) : 'no pings'}</div> <div class="text-xs text-gray-500 stat-last">${lastPing ? timeAgo(lastPing.checked_at) : 'no pings'}</div>
</div> </div>
<div class="text-xs px-2 py-1 rounded ${m.enabled ? 'bg-gray-800 text-gray-400' : 'bg-yellow-900/30 text-yellow-500'}">${m.enabled ? m.interval_s + 's' : 'paused'}</div> <div class="text-xs px-2 py-1 rounded ${m.enabled ? 'bg-gray-800 text-gray-400' : 'bg-yellow-900/30 text-yellow-500'}">${m.enabled ? m.interval_s + 's' : 'paused'}</div>
</div> </div>
@ -87,6 +87,31 @@
load(); load();
setInterval(load, 30000); setInterval(load, 30000);
// SSE: subscribe to all monitors after load so cards update in real time
const sseConnections = [];
async function subscribeAll() {
sseConnections.forEach(es => es.close());
sseConnections.length = 0;
const monitors = await api('/monitors/');
monitors.forEach(m => {
const es = watchMonitor(m.id, (ping) => {
// Update the card's last ping info without full reload
const card = document.querySelector(`[data-monitor-id="${m.id}"]`);
if (!card) return;
const statusDot = card.querySelector('.status-dot');
const latencyEl = card.querySelector('.stat-latency');
const lastEl = card.querySelector('.stat-last');
if (statusDot) {
statusDot.className = `status-dot w-2.5 h-2.5 rounded-full ${ping.up ? 'bg-green-500' : 'bg-red-500'}`;
}
if (latencyEl && ping.latency_ms) latencyEl.textContent = `${ping.latency_ms}ms`;
if (lastEl) lastEl.innerHTML = timeAgo(ping.checked_at);
});
if (es) sseConnections.push(es);
});
}
subscribeAll();
</script> </script>
<%~ include('./partials/foot') %> <%~ include('./partials/foot') %>