diff --git a/apps/pay/src/monitor.ts b/apps/pay/src/monitor.ts index 429573f..dc994cc 100644 --- a/apps/pay/src/monitor.ts +++ b/apps/pay/src/monitor.ts @@ -56,62 +56,33 @@ async function refreshMaps() { } // ── SSE streams per chain ─────────────────────────────────────────── -const activeStreams = new Map(); +const activeStreams = new Map(); /** Start a raw SSE stream for a chain — receives ALL txs and blocks. */ function startChainStream(chain: string) { if (activeStreams.has(chain)) return; - const ac = new AbortController(); - activeStreams.set(chain, ac); - const query = { crypto: chain }; const q = Buffer.from(JSON.stringify(query)).toString("base64"); const url = `${SOCK_API}/sse?q=${q}`; - connectSSE(chain, url, ac.signal); -} + const es = new EventSource(url); + activeStreams.set(chain, es); -async function connectSSE(chain: string, url: string, signal: AbortSignal) { - while (!signal.aborted) { + es.onmessage = (e) => { try { - const res = await fetch(url, { signal }); - if (!res.ok || !res.body) { - console.error(`SSE ${chain}: HTTP ${res.status}`); - await sleep(5000); - continue; + const event = JSON.parse(e.data); + if (event.type === "block") { + handleBlockEvent(chain, event).catch(() => {}); + } else { + handleTxEvent(chain, event).catch(() => {}); } + } catch {} + }; - const reader = res.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ""; - - while (!signal.aborted) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split("\n"); - buffer = lines.pop() ?? ""; - - for (const line of lines) { - if (!line.startsWith("data: ")) continue; - try { - const event = JSON.parse(line.slice(6)); - if (event.type === "block") { - await handleBlockEvent(chain, event); - } else { - await handleTxEvent(chain, event); - } - } catch {} - } - } - } catch (e: any) { - if (signal.aborted) return; - console.error(`SSE ${chain} error:`, e.message); - } - if (!signal.aborted) await sleep(3000); - } + es.onerror = () => { + console.error(`SSE ${chain}: connection error (will auto-reconnect)`); + }; } /** Handle a tx event — compare all outputs against our active addresses. */ @@ -237,9 +208,9 @@ async function syncStreams() { if (COINS[chain]) startChainStream(chain); } - for (const [chain, ac] of activeStreams) { + for (const [chain, es] of activeStreams) { if (!chainsNeeded.has(chain)) { - ac.abort(); + es.close(); activeStreams.delete(chain); } } @@ -365,8 +336,4 @@ export async function expireProPlans() { if (result.count > 0) { console.log(`Downgraded ${result.count} expired pro accounts to free`); } -} - -function sleep(ms: number) { - return new Promise(resolve => setTimeout(resolve, ms)); -} +} \ No newline at end of file diff --git a/apps/web/src/views/checkout.ejs b/apps/web/src/views/checkout.ejs index 0d5e2c0..e239a5a 100644 --- a/apps/web/src/views/checkout.ejs +++ b/apps/web/src/views/checkout.ejs @@ -136,7 +136,6 @@ let paymentId = null; let pollInterval = null; let countdownInterval = null; - let sseAbort = null; // Fetch available coins on load (async () => { @@ -248,55 +247,36 @@ let watchedAddress = null; let watchedTxids = []; - /** Listen to raw SSE for this coin, match tx outputs against our address locally. + /** Listen to raw SSE via EventSource for this coin, match tx outputs locally. * Tracks multiple txids (user may send across several transactions). * On block, checks if any of our txids got confirmed. */ + let eventSource = null; + function watchAddress(coin, address) { - if (sseAbort) sseAbort.abort(); - sseAbort = new AbortController(); + if (eventSource) { eventSource.close(); eventSource = null; } watchedAddress = address; watchedTxids = []; - // Raw stream for this coin — all txs and blocks, no query filter const query = { crypto: coin }; const q = btoa(JSON.stringify(query)); const url = `${SOCK_API}/sse?q=${q}`; - (async function connect() { - while (!sseAbort.signal.aborted) { - try { - const res = await fetch(url, { signal: sseAbort.signal }); - if (!res.ok || !res.body) { await new Promise(r => setTimeout(r, 3000)); continue; } + eventSource = new EventSource(url); - const reader = res.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - while (!sseAbort.signal.aborted) { - const { done, value } = await reader.read(); - if (done) break; - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() ?? ''; - - for (const line of lines) { - if (!line.startsWith('data: ')) continue; - try { - const event = JSON.parse(line.slice(6)); - if (event.type === 'block') { - onBlock(event); - } else { - onTx(event); - } - } catch {} - } - } - } catch (e) { - if (sseAbort.signal.aborted) return; + eventSource.onmessage = (e) => { + try { + const event = JSON.parse(e.data); + if (event.type === 'block') { + onBlock(event); + } else { + onTx(event); } - if (!sseAbort.signal.aborted) await new Promise(r => setTimeout(r, 3000)); - } - })(); + } catch {} + }; + + eventSource.onerror = () => { + // EventSource auto-reconnects + }; } /** Check if any tx output matches our payment address. */ @@ -352,14 +332,14 @@ } else if (data.status === 'paid') { clearInterval(pollInterval); clearInterval(countdownInterval); - if (sseAbort) { sseAbort.abort(); sseAbort = null; } + if (eventSource) { eventSource.close(); eventSource = null; } document.getElementById('pay-status-section').classList.add('hidden'); document.getElementById('pay-success').classList.remove('hidden'); setTimeout(() => { window.location.href = '/dashboard/settings'; }, 3000); } else if (data.status === 'expired') { clearInterval(pollInterval); clearInterval(countdownInterval); - if (sseAbort) { sseAbort.abort(); sseAbort = null; } + if (eventSource) { eventSource.close(); eventSource = null; } document.getElementById('pay-status-section').classList.add('hidden'); document.getElementById('pay-expired').classList.remove('hidden'); } @@ -396,7 +376,7 @@ } function resetCheckout() { - if (sseAbort) { sseAbort.abort(); sseAbort = null; } + if (eventSource) { eventSource.close(); eventSource = null; } if (pollInterval) { clearInterval(pollInterval); pollInterval = null; } if (countdownInterval) { clearInterval(countdownInterval); countdownInterval = null; } watchedAddress = null;