fix: use native EventSource for SSE instead of fetch with manual parsing

This commit is contained in:
nate 2026-03-19 00:24:16 +04:00
parent c3103f06ce
commit 0854914411
2 changed files with 38 additions and 91 deletions

View File

@ -56,62 +56,33 @@ async function refreshMaps() {
}
// ── SSE streams per chain ───────────────────────────────────────────
const activeStreams = new Map<string, AbortController>();
const activeStreams = new Map<string, EventSource>();
/** Start a raw SSE stream for a chain — receives ALL txs and blocks. */
function startChainStream(chain: string) {
if (activeStreams.has(chain)) return;
const ac = new AbortController();
activeStreams.set(chain, ac);
const query = { crypto: chain };
const q = Buffer.from(JSON.stringify(query)).toString("base64");
const url = `${SOCK_API}/sse?q=${q}`;
connectSSE(chain, url, ac.signal);
}
const es = new EventSource(url);
activeStreams.set(chain, es);
async function connectSSE(chain: string, url: string, signal: AbortSignal) {
while (!signal.aborted) {
es.onmessage = (e) => {
try {
const res = await fetch(url, { signal });
if (!res.ok || !res.body) {
console.error(`SSE ${chain}: HTTP ${res.status}`);
await sleep(5000);
continue;
const event = JSON.parse(e.data);
if (event.type === "block") {
handleBlockEvent(chain, event).catch(() => {});
} else {
handleTxEvent(chain, event).catch(() => {});
}
} catch {}
};
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (!signal.aborted) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
try {
const event = JSON.parse(line.slice(6));
if (event.type === "block") {
await handleBlockEvent(chain, event);
} else {
await handleTxEvent(chain, event);
}
} catch {}
}
}
} catch (e: any) {
if (signal.aborted) return;
console.error(`SSE ${chain} error:`, e.message);
}
if (!signal.aborted) await sleep(3000);
}
es.onerror = () => {
console.error(`SSE ${chain}: connection error (will auto-reconnect)`);
};
}
/** Handle a tx event — compare all outputs against our active addresses. */
@ -237,9 +208,9 @@ async function syncStreams() {
if (COINS[chain]) startChainStream(chain);
}
for (const [chain, ac] of activeStreams) {
for (const [chain, es] of activeStreams) {
if (!chainsNeeded.has(chain)) {
ac.abort();
es.close();
activeStreams.delete(chain);
}
}
@ -366,7 +337,3 @@ export async function expireProPlans() {
console.log(`Downgraded ${result.count} expired pro accounts to free`);
}
}
function sleep(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}

View File

@ -136,7 +136,6 @@
let paymentId = null;
let pollInterval = null;
let countdownInterval = null;
let sseAbort = null;
// Fetch available coins on load
(async () => {
@ -248,55 +247,36 @@
let watchedAddress = null;
let watchedTxids = [];
/** Listen to raw SSE for this coin, match tx outputs against our address locally.
/** Listen to raw SSE via EventSource for this coin, match tx outputs locally.
* Tracks multiple txids (user may send across several transactions).
* On block, checks if any of our txids got confirmed. */
let eventSource = null;
function watchAddress(coin, address) {
if (sseAbort) sseAbort.abort();
sseAbort = new AbortController();
if (eventSource) { eventSource.close(); eventSource = null; }
watchedAddress = address;
watchedTxids = [];
// Raw stream for this coin — all txs and blocks, no query filter
const query = { crypto: coin };
const q = btoa(JSON.stringify(query));
const url = `${SOCK_API}/sse?q=${q}`;
(async function connect() {
while (!sseAbort.signal.aborted) {
try {
const res = await fetch(url, { signal: sseAbort.signal });
if (!res.ok || !res.body) { await new Promise(r => setTimeout(r, 3000)); continue; }
eventSource = new EventSource(url);
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (!sseAbort.signal.aborted) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
try {
const event = JSON.parse(line.slice(6));
if (event.type === 'block') {
onBlock(event);
} else {
onTx(event);
}
} catch {}
}
}
} catch (e) {
if (sseAbort.signal.aborted) return;
eventSource.onmessage = (e) => {
try {
const event = JSON.parse(e.data);
if (event.type === 'block') {
onBlock(event);
} else {
onTx(event);
}
if (!sseAbort.signal.aborted) await new Promise(r => setTimeout(r, 3000));
}
})();
} catch {}
};
eventSource.onerror = () => {
// EventSource auto-reconnects
};
}
/** Check if any tx output matches our payment address. */
@ -352,14 +332,14 @@
} else if (data.status === 'paid') {
clearInterval(pollInterval);
clearInterval(countdownInterval);
if (sseAbort) { sseAbort.abort(); sseAbort = null; }
if (eventSource) { eventSource.close(); eventSource = null; }
document.getElementById('pay-status-section').classList.add('hidden');
document.getElementById('pay-success').classList.remove('hidden');
setTimeout(() => { window.location.href = '/dashboard/settings'; }, 3000);
} else if (data.status === 'expired') {
clearInterval(pollInterval);
clearInterval(countdownInterval);
if (sseAbort) { sseAbort.abort(); sseAbort = null; }
if (eventSource) { eventSource.close(); eventSource = null; }
document.getElementById('pay-status-section').classList.add('hidden');
document.getElementById('pay-expired').classList.remove('hidden');
}
@ -396,7 +376,7 @@
}
function resetCheckout() {
if (sseAbort) { sseAbort.abort(); sseAbort = null; }
if (eventSource) { eventSource.close(); eventSource = null; }
if (pollInterval) { clearInterval(pollInterval); pollInterval = null; }
if (countdownInterval) { clearInterval(countdownInterval); countdownInterval = null; }
watchedAddress = null;