From 2dbf85652bca45c606c1e7a881df54ec871e2999 Mon Sep 17 00:00:00 2001 From: nate Date: Thu, 19 Mar 2026 00:29:42 +0400 Subject: [PATCH] fix: backend SSE uses fetch streaming (EventSource not available in Bun), bulk lookup fallback --- apps/pay/src/freedom.ts | 30 ++++++++++- apps/pay/src/monitor.ts | 108 +++++++++++++++++++++++++++------------- 2 files changed, 101 insertions(+), 37 deletions(-) diff --git a/apps/pay/src/freedom.ts b/apps/pay/src/freedom.ts index ea6c024..dfe984f 100644 --- a/apps/pay/src/freedom.ts +++ b/apps/pay/src/freedom.ts @@ -15,7 +15,8 @@ export async function getAddressInfo(address: string): Promise { return res.json(); } -/** Bulk address lookup — POST /address with { terms: [...] } */ +/** Bulk address lookup — POST /address with { terms: [...] } + * Normalizes response to { address: info } map regardless of API format. */ export async function getAddressInfoBulk(addresses: string[]): Promise> { if (addresses.length === 0) return {}; const res = await fetch(`${API}/address`, { @@ -23,7 +24,32 @@ export async function getAddressInfoBulk(addresses: string[]): Promise = {}; + for (const item of data) { + if (item?.address) map[item.address] = item; + } + return map; + } + + // Check if it's keyed by address — verify first value has address-like fields + const firstKey = Object.keys(data)[0]; + if (firstKey && data[firstKey]?.address) return data; + + // If keyed by index (0, 1, 2...), map back to addresses + if (firstKey === "0" || firstKey === "1") { + const map: Record = {}; + for (let i = 0; i < addresses.length; i++) { + if (data[i]) map[addresses[i]] = data[i]; + } + return map; + } + + return data; } export function getQrUrl(text: string): string { diff --git a/apps/pay/src/monitor.ts b/apps/pay/src/monitor.ts index dc994cc..c3c55ee 100644 --- a/apps/pay/src/monitor.ts +++ b/apps/pay/src/monitor.ts @@ -1,7 +1,7 @@ /// Payment monitor: uses Freedom.st SSE for instant tx detection and /// block-based confirmation, with bulk polling as fallback. import sql from "./db"; -import { getAddressInfoBulk } from "./freedom"; +import { getAddressInfo, getAddressInfoBulk } from "./freedom"; import { COINS } from "./plans"; const SOCK_API = process.env.FREEDOM_SOCK ?? "https://sock-v1.freedom.st"; @@ -56,33 +56,67 @@ async function refreshMaps() { } // ── SSE streams per chain ─────────────────────────────────────────── -const activeStreams = new Map(); +const activeStreams = new Map(); /** Start a raw SSE stream for a chain — receives ALL txs and blocks. */ function startChainStream(chain: string) { if (activeStreams.has(chain)) return; + const ac = new AbortController(); + activeStreams.set(chain, ac); + const query = { crypto: chain }; const q = Buffer.from(JSON.stringify(query)).toString("base64"); const url = `${SOCK_API}/sse?q=${q}`; - const es = new EventSource(url); - activeStreams.set(chain, es); + connectSSE(chain, url, ac.signal); +} - es.onmessage = (e) => { +async function connectSSE(chain: string, url: string, signal: AbortSignal) { + while (!signal.aborted) { try { - const event = JSON.parse(e.data); - if (event.type === "block") { - handleBlockEvent(chain, event).catch(() => {}); - } else { - handleTxEvent(chain, event).catch(() => {}); + const res = await fetch(url, { signal }); + if (!res.ok || !res.body) { + console.error(`SSE ${chain}: HTTP ${res.status}`); + await new Promise(r => setTimeout(r, 5000)); + continue; } - } catch {} - }; - es.onerror = () => { - console.error(`SSE ${chain}: connection error (will auto-reconnect)`); - }; + console.log(`SSE ${chain}: connected`); + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (!signal.aborted) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + if (!line.startsWith("data:")) continue; + try { + const json = line.startsWith("data: ") ? line.slice(6) : line.slice(5); + const event = JSON.parse(json); + if (event.type === "block") { + handleBlockEvent(chain, event).catch(() => {}); + } else { + handleTxEvent(chain, event).catch(() => {}); + } + } catch {} + } + } + } catch (e: any) { + if (signal.aborted) return; + console.error(`SSE ${chain} error:`, e.message); + } + if (!signal.aborted) { + console.log(`SSE ${chain}: reconnecting in 3s...`); + await new Promise(r => setTimeout(r, 3000)); + } + } } /** Handle a tx event — compare all outputs against our active addresses. */ @@ -108,12 +142,10 @@ async function handleTxEvent(chain: string, event: any) { console.log(`SSE: tx ${txHash} for payment ${payment.id} (${chain})`); if (coin.confirmations === 0) { - // 0-conf coin: check if total received meets threshold via bulk poll - // (user may have sent partial amounts across multiple txs) + // 0-conf coin: check if total received meets threshold try { - const bulk = await getAddressInfoBulk([payment.address]); - const info = bulk[payment.address]; - if (!info) continue; + const info = await getAddressInfo(payment.address); + if (!info || info.error) continue; const received = Number(info.received ?? 0); const threshold = parseFloat(payment.amount_crypto) * 0.995; if (received >= threshold) { @@ -172,17 +204,18 @@ async function handleBlockEvent(chain: string, event: any) { if (addressesToCheck.length === 0) return; - // Bulk check confirmed amounts - let bulk: Record; + // Bulk check confirmed amounts, fall back to individual + let bulk: Record = {}; try { bulk = await getAddressInfoBulk(addressesToCheck); - } catch { - return; - } + } catch {} for (const [addr, { entry, paymentId }] of paymentsByAddress) { - const info = bulk[addr]; - if (!info) continue; + let info = bulk[addr]; + if (!info) { + try { info = await getAddressInfo(addr); } catch { continue; } + } + if (!info || info.error) continue; const receivedConfirmed = Number(info.received_confirmed ?? 0); const threshold = parseFloat(entry.payment.amount_crypto) * 0.995; @@ -208,9 +241,9 @@ async function syncStreams() { if (COINS[chain]) startChainStream(chain); } - for (const [chain, es] of activeStreams) { + for (const [chain, ac] of activeStreams) { if (!chainsNeeded.has(chain)) { - es.close(); + ac.abort(); activeStreams.delete(chain); } } @@ -240,20 +273,25 @@ export async function checkPayments() { if (allPayments.length === 0) return; - // Bulk lookup all addresses at once + // Bulk lookup all addresses at once, fall back to individual lookups const addresses = allPayments.map((p: any) => p.address); - let bulk: Record; + let bulk: Record = {}; try { bulk = await getAddressInfoBulk(addresses); } catch (e) { - console.error("Bulk address lookup failed:", e); - return; + console.error("Bulk address lookup failed, falling back to individual:", e); } for (const payment of allPayments) { try { - const info = bulk[payment.address]; - if (!info) continue; + let info = bulk[payment.address]; + // Fall back to individual lookup if bulk didn't return data for this address + if (!info) { + try { + info = await getAddressInfo(payment.address); + } catch { continue; } + } + if (!info || info.error) continue; const coin = COINS[payment.coin]; if (!coin) continue;