fix: SSE via fetch for auth headers, remove query param auth, add heartbeat every 10s
This commit is contained in:
parent
6d48a83560
commit
31d1fa7b04
|
|
@ -94,20 +94,43 @@ function escapeHtml(str) {
|
||||||
return div.innerHTML;
|
return div.innerHTML;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe to live ping updates for a monitor via SSE
|
// Subscribe to live ping updates for a monitor via SSE (fetch-based for auth header support)
|
||||||
// onPing(ping) called with each new ping object
|
// Returns an AbortController — call .abort() to close
|
||||||
function watchMonitor(monitorId, onPing) {
|
function watchMonitor(monitorId, onPing) {
|
||||||
const key = localStorage.getItem('pingql_key');
|
const key = localStorage.getItem('pingql_key');
|
||||||
if (!key) return null;
|
if (!key) return null;
|
||||||
|
|
||||||
const url = `/monitors/${monitorId}/stream`;
|
const ac = new AbortController();
|
||||||
const es = new EventSource(url + `?auth=${encodeURIComponent(key)}`);
|
|
||||||
|
|
||||||
es.onmessage = (e) => {
|
async function connect() {
|
||||||
try { onPing(JSON.parse(e.data)); } catch {}
|
try {
|
||||||
};
|
const res = await fetch(`/monitors/${monitorId}/stream`, {
|
||||||
es.onerror = () => {
|
headers: { Authorization: `Bearer ${key}` },
|
||||||
// Reconnect is automatic with EventSource
|
signal: ac.signal,
|
||||||
};
|
});
|
||||||
return es;
|
if (!res.ok || !res.body) return;
|
||||||
|
const reader = res.body.getReader();
|
||||||
|
const dec = new TextDecoder();
|
||||||
|
let buf = '';
|
||||||
|
while (true) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
buf += dec.decode(value, { stream: true });
|
||||||
|
const lines = buf.split('\n');
|
||||||
|
buf = lines.pop() ?? '';
|
||||||
|
for (const line of lines) {
|
||||||
|
if (line.startsWith('data: ')) {
|
||||||
|
try { onPing(JSON.parse(line.slice(6))); } catch {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
if (e.name === 'AbortError') return;
|
||||||
|
// Reconnect after a short delay on unexpected disconnect
|
||||||
|
setTimeout(connect, 3000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
connect();
|
||||||
|
return ac;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,14 +17,20 @@ function publish(monitorId: string, data: object) {
|
||||||
|
|
||||||
function makeSSEStream(monitorId: string): Response {
|
function makeSSEStream(monitorId: string): Response {
|
||||||
let ctrl: SSEController;
|
let ctrl: SSEController;
|
||||||
|
let heartbeat: Timer;
|
||||||
const stream = new ReadableStream<Uint8Array>({
|
const stream = new ReadableStream<Uint8Array>({
|
||||||
start(c) {
|
start(c) {
|
||||||
ctrl = c;
|
ctrl = c;
|
||||||
if (!bus.has(monitorId)) bus.set(monitorId, new Set());
|
if (!bus.has(monitorId)) bus.set(monitorId, new Set());
|
||||||
bus.get(monitorId)!.add(ctrl);
|
bus.get(monitorId)!.add(ctrl);
|
||||||
ctrl.enqueue(enc.encode(": connected\n\n"));
|
ctrl.enqueue(enc.encode(": connected\n\n"));
|
||||||
|
// Keepalive — prevents proxies/Cloudflare from closing idle connections
|
||||||
|
heartbeat = setInterval(() => {
|
||||||
|
try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch { clearInterval(heartbeat); }
|
||||||
|
}, 10_000);
|
||||||
},
|
},
|
||||||
cancel() {
|
cancel() {
|
||||||
|
clearInterval(heartbeat);
|
||||||
bus.get(monitorId)?.delete(ctrl);
|
bus.get(monitorId)?.delete(ctrl);
|
||||||
if (bus.get(monitorId)?.size === 0) bus.delete(monitorId);
|
if (bus.get(monitorId)?.size === 0) bus.delete(monitorId);
|
||||||
},
|
},
|
||||||
|
|
@ -78,11 +84,9 @@ export const ingest = new Elysia()
|
||||||
detail: { hide: true },
|
detail: { hide: true },
|
||||||
})
|
})
|
||||||
|
|
||||||
// SSE: stream live pings — auth via Bearer header OR ?auth= query param
|
// SSE: stream live pings — auth via Bearer header
|
||||||
// (EventSource doesn't support custom headers, hence the query param fallback)
|
.get("/monitors/:id/stream", async ({ params, headers, error }) => {
|
||||||
.get("/monitors/:id/stream", async ({ params, headers, query, error }) => {
|
const key = headers["authorization"]?.replace("Bearer ", "").trim();
|
||||||
const key = headers["authorization"]?.replace("Bearer ", "").trim()
|
|
||||||
?? (query.auth as string | undefined);
|
|
||||||
|
|
||||||
if (!key) return error(401, { error: "Unauthorized" });
|
if (!key) return error(401, { error: "Unauthorized" });
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -91,7 +91,7 @@
|
||||||
// SSE: subscribe to all monitors after load so cards update in real time
|
// SSE: subscribe to all monitors after load so cards update in real time
|
||||||
const sseConnections = [];
|
const sseConnections = [];
|
||||||
async function subscribeAll() {
|
async function subscribeAll() {
|
||||||
sseConnections.forEach(es => es.close());
|
sseConnections.forEach(es => es.abort());
|
||||||
sseConnections.length = 0;
|
sseConnections.length = 0;
|
||||||
const monitors = await api('/monitors/');
|
const monitors = await api('/monitors/');
|
||||||
monitors.forEach(m => {
|
monitors.forEach(m => {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue