fix: backend SSE uses fetch streaming (EventSource not available in Bun), bulk lookup fallback

This commit is contained in:
nate 2026-03-19 00:29:42 +04:00
parent 0854914411
commit 2dbf85652b
2 changed files with 101 additions and 37 deletions

View File

@ -15,7 +15,8 @@ export async function getAddressInfo(address: string): Promise<any> {
return res.json();
}
/** Bulk address lookup — POST /address with { terms: [...] } */
/** Bulk address lookup POST /address with { terms: [...] }
* Normalizes response to { address: info } map regardless of API format. */
export async function getAddressInfoBulk(addresses: string[]): Promise<Record<string, any>> {
if (addresses.length === 0) return {};
const res = await fetch(`${API}/address`, {
@ -23,7 +24,32 @@ export async function getAddressInfoBulk(addresses: string[]): Promise<Record<st
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ terms: addresses }),
});
return res.json();
const data = await res.json();
// Normalize: if response is already keyed by address, return as-is.
// If it's an array, index by address field.
if (Array.isArray(data)) {
const map: Record<string, any> = {};
for (const item of data) {
if (item?.address) map[item.address] = item;
}
return map;
}
// Check if it's keyed by address — verify first value has address-like fields
const firstKey = Object.keys(data)[0];
if (firstKey && data[firstKey]?.address) return data;
// If keyed by index (0, 1, 2...), map back to addresses
if (firstKey === "0" || firstKey === "1") {
const map: Record<string, any> = {};
for (let i = 0; i < addresses.length; i++) {
if (data[i]) map[addresses[i]] = data[i];
}
return map;
}
return data;
}
export function getQrUrl(text: string): string {

View File

@ -1,7 +1,7 @@
/// Payment monitor: uses Freedom.st SSE for instant tx detection and
/// block-based confirmation, with bulk polling as fallback.
import sql from "./db";
import { getAddressInfoBulk } from "./freedom";
import { getAddressInfo, getAddressInfoBulk } from "./freedom";
import { COINS } from "./plans";
const SOCK_API = process.env.FREEDOM_SOCK ?? "https://sock-v1.freedom.st";
@ -56,33 +56,67 @@ async function refreshMaps() {
}
// ── SSE streams per chain ───────────────────────────────────────────
const activeStreams = new Map<string, EventSource>();
const activeStreams = new Map<string, AbortController>();
/** Start a raw SSE stream for a chain — receives ALL txs and blocks. */
function startChainStream(chain: string) {
if (activeStreams.has(chain)) return;
const ac = new AbortController();
activeStreams.set(chain, ac);
const query = { crypto: chain };
const q = Buffer.from(JSON.stringify(query)).toString("base64");
const url = `${SOCK_API}/sse?q=${q}`;
const es = new EventSource(url);
activeStreams.set(chain, es);
connectSSE(chain, url, ac.signal);
}
es.onmessage = (e) => {
async function connectSSE(chain: string, url: string, signal: AbortSignal) {
while (!signal.aborted) {
try {
const event = JSON.parse(e.data);
if (event.type === "block") {
handleBlockEvent(chain, event).catch(() => {});
} else {
handleTxEvent(chain, event).catch(() => {});
const res = await fetch(url, { signal });
if (!res.ok || !res.body) {
console.error(`SSE ${chain}: HTTP ${res.status}`);
await new Promise(r => setTimeout(r, 5000));
continue;
}
} catch {}
};
es.onerror = () => {
console.error(`SSE ${chain}: connection error (will auto-reconnect)`);
};
console.log(`SSE ${chain}: connected`);
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (!signal.aborted) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data:")) continue;
try {
const json = line.startsWith("data: ") ? line.slice(6) : line.slice(5);
const event = JSON.parse(json);
if (event.type === "block") {
handleBlockEvent(chain, event).catch(() => {});
} else {
handleTxEvent(chain, event).catch(() => {});
}
} catch {}
}
}
} catch (e: any) {
if (signal.aborted) return;
console.error(`SSE ${chain} error:`, e.message);
}
if (!signal.aborted) {
console.log(`SSE ${chain}: reconnecting in 3s...`);
await new Promise(r => setTimeout(r, 3000));
}
}
}
/** Handle a tx event — compare all outputs against our active addresses. */
@ -108,12 +142,10 @@ async function handleTxEvent(chain: string, event: any) {
console.log(`SSE: tx ${txHash} for payment ${payment.id} (${chain})`);
if (coin.confirmations === 0) {
// 0-conf coin: check if total received meets threshold via bulk poll
// (user may have sent partial amounts across multiple txs)
// 0-conf coin: check if total received meets threshold
try {
const bulk = await getAddressInfoBulk([payment.address]);
const info = bulk[payment.address];
if (!info) continue;
const info = await getAddressInfo(payment.address);
if (!info || info.error) continue;
const received = Number(info.received ?? 0);
const threshold = parseFloat(payment.amount_crypto) * 0.995;
if (received >= threshold) {
@ -172,17 +204,18 @@ async function handleBlockEvent(chain: string, event: any) {
if (addressesToCheck.length === 0) return;
// Bulk check confirmed amounts
let bulk: Record<string, any>;
// Bulk check confirmed amounts, fall back to individual
let bulk: Record<string, any> = {};
try {
bulk = await getAddressInfoBulk(addressesToCheck);
} catch {
return;
}
} catch {}
for (const [addr, { entry, paymentId }] of paymentsByAddress) {
const info = bulk[addr];
if (!info) continue;
let info = bulk[addr];
if (!info) {
try { info = await getAddressInfo(addr); } catch { continue; }
}
if (!info || info.error) continue;
const receivedConfirmed = Number(info.received_confirmed ?? 0);
const threshold = parseFloat(entry.payment.amount_crypto) * 0.995;
@ -208,9 +241,9 @@ async function syncStreams() {
if (COINS[chain]) startChainStream(chain);
}
for (const [chain, es] of activeStreams) {
for (const [chain, ac] of activeStreams) {
if (!chainsNeeded.has(chain)) {
es.close();
ac.abort();
activeStreams.delete(chain);
}
}
@ -240,20 +273,25 @@ export async function checkPayments() {
if (allPayments.length === 0) return;
// Bulk lookup all addresses at once
// Bulk lookup all addresses at once, fall back to individual lookups
const addresses = allPayments.map((p: any) => p.address);
let bulk: Record<string, any>;
let bulk: Record<string, any> = {};
try {
bulk = await getAddressInfoBulk(addresses);
} catch (e) {
console.error("Bulk address lookup failed:", e);
return;
console.error("Bulk address lookup failed, falling back to individual:", e);
}
for (const payment of allPayments) {
try {
const info = bulk[payment.address];
if (!info) continue;
let info = bulk[payment.address];
// Fall back to individual lookup if bulk didn't return data for this address
if (!info) {
try {
info = await getAddressInfo(payment.address);
} catch { continue; }
}
if (!info || info.error) continue;
const coin = COINS[payment.coin];
if (!coin) continue;