From 81f1e1585eaaabecb318353bf72bf250086e2f2d Mon Sep 17 00:00:00 2001 From: nate Date: Thu, 19 Mar 2026 00:04:46 +0400 Subject: [PATCH] fix: use raw SSE with local matching, bulk polling, block-based confirmations, and multi-tx support --- .env.example | 2 +- apps/pay/src/freedom.ts | 11 ++ apps/pay/src/monitor.ts | 312 ++++++++++++++++++++------------ apps/web/src/views/checkout.ejs | 58 ++++-- 4 files changed, 251 insertions(+), 132 deletions(-) diff --git a/.env.example b/.env.example index 060a438..df385f4 100644 --- a/.env.example +++ b/.env.example @@ -13,7 +13,7 @@ RUST_LOG=info # Pay app — crypto payments FREEDOM_API=https://api-v1.freedom.st XPUB_BTC=xpub... -XPUB_LTC=Ltub... +XPUB_LTC=zpub6oKss9XPK8K3GFp8FmvPqnd8gcKZeXG7GZ2jwed9M5nuD3VovacCX5XTTC22idzwHxFggbx13qRFZEWzrwHeAZs32LNWuqGKKvdiZ37Lafy XPUB_DOGE=dgub... XPUB_DASH=drkp... XPUB_BCH=xpub... diff --git a/apps/pay/src/freedom.ts b/apps/pay/src/freedom.ts index af8cf5f..ea6c024 100644 --- a/apps/pay/src/freedom.ts +++ b/apps/pay/src/freedom.ts @@ -15,6 +15,17 @@ export async function getAddressInfo(address: string): Promise { return res.json(); } +/** Bulk address lookup — POST /address with { terms: [...] } */ +export async function getAddressInfoBulk(addresses: string[]): Promise> { + if (addresses.length === 0) return {}; + const res = await fetch(`${API}/address`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ terms: addresses }), + }); + return res.json(); +} + export function getQrUrl(text: string): string { return `${API}/invoice/qr/${encodeURIComponent(text)}`; } diff --git a/apps/pay/src/monitor.ts b/apps/pay/src/monitor.ts index ef73af8..429573f 100644 --- a/apps/pay/src/monitor.ts +++ b/apps/pay/src/monitor.ts @@ -1,38 +1,70 @@ -/// Payment monitor: uses Freedom.st SSE for instant tx detection, -/// with periodic polling as fallback for confirmations and expiry. +/// Payment monitor: uses Freedom.st SSE for instant tx detection and +/// block-based confirmation, with bulk polling as fallback. import sql from "./db"; -import { getAddressInfo } from "./freedom"; +import { getAddressInfoBulk } from "./freedom"; import { COINS } from "./plans"; const SOCK_API = process.env.FREEDOM_SOCK ?? "https://sock-v1.freedom.st"; -// ── In-memory address lookup for SSE matching ───────────────────────── -// Maps address → payment row for all pending/confirming payments. +// ── In-memory maps for SSE matching ───────────────────────────────── +// address → payment row for all active (pending/confirming) payments. +// NOT removed on first tx — user may send multiple txs to same address. let addressMap = new Map(); -/** Refresh the address map from DB. Called periodically. */ -async function refreshAddressMap() { - const pending = await sql` +// Confirming payments: paymentId → { payment, txids[] } +// Tracks all txids seen for each confirming payment. +let confirmingMap = new Map }>(); +// Reverse lookup: txid → paymentId for fast block matching +let txidLookup = new Map(); +// All txids we've already processed from SSE — prevents double-counting on duplicate events +const seenTxids = new Set(); + +/** Refresh maps from DB. Called periodically. */ +async function refreshMaps() { + const active = await sql` SELECT * FROM payments WHERE status IN ('pending', 'confirming') AND expires_at >= now() `; - const map = new Map(); - for (const p of pending) map.set(p.address, p); - addressMap = map; + + const newAddr = new Map(); + const newConfirming = new Map }>(); + const newTxidLookup = new Map(); + + for (const p of active) { + // All active payments stay in addressMap (more txs may arrive) + newAddr.set(p.address, p); + + if (p.status === "confirming" && p.txid) { + // Restore txids from DB (single stored txid) + merge any we already track + const existing = confirmingMap.get(p.id); + const txids = existing?.txids ?? new Set(); + txids.add(p.txid); + newConfirming.set(p.id, { payment: p, txids }); + for (const t of txids) newTxidLookup.set(t, p.id); + } + } + + addressMap = newAddr; + confirmingMap = newConfirming; + txidLookup = newTxidLookup; + + // Prune seenTxids — only keep txids that belong to active payments + for (const txid of seenTxids) { + if (!newTxidLookup.has(txid)) seenTxids.delete(txid); + } } -// ── SSE listeners per chain ─────────────────────────────────────────── +// ── SSE streams per chain ─────────────────────────────────────────── const activeStreams = new Map(); -/** Start an SSE stream for a chain, watching for txs to any of our addresses. */ +/** Start a raw SSE stream for a chain — receives ALL txs and blocks. */ function startChainStream(chain: string) { if (activeStreams.has(chain)) return; const ac = new AbortController(); activeStreams.set(chain, ac); - // Watch all txs for this coin and match server-side against our address map. const query = { crypto: chain }; const q = Buffer.from(JSON.stringify(query)).toString("base64"); const url = `${SOCK_API}/sse?q=${q}`; @@ -66,7 +98,11 @@ async function connectSSE(chain: string, url: string, signal: AbortSignal) { if (!line.startsWith("data: ")) continue; try { const event = JSON.parse(line.slice(6)); - await handleTxEvent(chain, event); + if (event.type === "block") { + await handleBlockEvent(chain, event); + } else { + await handleTxEvent(chain, event); + } } catch {} } } @@ -74,61 +110,133 @@ async function connectSSE(chain: string, url: string, signal: AbortSignal) { if (signal.aborted) return; console.error(`SSE ${chain} error:`, e.message); } - // Reconnect after brief pause if (!signal.aborted) await sleep(3000); } } -/** Handle a transaction event from SSE. Check if any output matches a pending payment address. */ +/** Handle a tx event — compare all outputs against our active addresses. */ async function handleTxEvent(chain: string, event: any) { - const tx = event.data || event; - const outputs = tx.out || tx.outputs || tx.vout || []; - const txid = tx.txid || tx.hash || null; + const outputs = event.data?.out ?? []; + const txHash = event.data?.tx?.hash ?? null; + if (!txHash) return; + // SSE can send the same tx twice — skip duplicates + if (seenTxids.has(txHash)) return; + seenTxids.add(txHash); for (const out of outputs) { - // Freedom.st tx format: out[].script.address - const addr = out?.script?.address || out?.address || out?.scriptPubKey?.address; + const addr = out?.script?.address; if (!addr) continue; const payment = addressMap.get(addr); if (!payment) continue; if (payment.coin !== chain) continue; - console.log(`SSE: tx ${txid} detected for payment ${payment.id} (${chain})`); - const coin = COINS[chain]; if (!coin) continue; + console.log(`SSE: tx ${txHash} for payment ${payment.id} (${chain})`); + if (coin.confirmations === 0) { - // 0-conf: activate immediately - await activatePayment(payment, txid); - addressMap.delete(addr); + // 0-conf coin: check if total received meets threshold via bulk poll + // (user may have sent partial amounts across multiple txs) + try { + const bulk = await getAddressInfoBulk([payment.address]); + const info = bulk[payment.address]; + if (!info) continue; + const received = Number(info.received ?? 0); + const threshold = parseFloat(payment.amount_crypto) * 0.995; + if (received >= threshold) { + await activatePayment(payment, txHash); + addressMap.delete(addr); + } + } catch {} } else { - // 1+ conf: mark as confirming, polling will handle confirmation + // 1+ conf: track txid, mark confirming if (payment.status === "pending") { - await sql`UPDATE payments SET status = 'confirming', txid = ${txid} WHERE id = ${payment.id}`; + await sql`UPDATE payments SET status = 'confirming', txid = ${txHash} WHERE id = ${payment.id}`; payment.status = "confirming"; - payment.txid = txid; - console.log(`Payment ${payment.id} now confirming (waiting for block)`); + payment.txid = txHash; + console.log(`Payment ${payment.id} now confirming`); } + + // Add txid to confirming map (handles multiple txs) + let entry = confirmingMap.get(payment.id); + if (!entry) { + entry = { payment, txids: new Set() }; + confirmingMap.set(payment.id, entry); + } + entry.txids.add(txHash); + txidLookup.set(txHash, payment.id); } } } -/** Start SSE streams for all chains that have pending payments. */ -async function syncStreams() { - // Determine which chains have pending payments - const chainsNeeded = new Set(); - for (const p of addressMap.values()) { - chainsNeeded.add(p.coin); +/** Handle a block event — check if any confirming txid is in this block. + * When found, verify confirmed amount meets threshold before activating. */ +async function handleBlockEvent(chain: string, event: any) { + const blockTxs: string[] = event.data?.tx ?? []; + if (blockTxs.length === 0) return; + + const blockTxSet = new Set(blockTxs); + + // Find which confirming payments have a txid in this block + const toCheck = new Set(); + for (const txid of blockTxs) { + const paymentId = txidLookup.get(txid); + if (paymentId != null) toCheck.add(paymentId); } - // Start streams for chains we need + if (toCheck.size === 0) return; + + // Collect addresses to bulk-check + const addressesToCheck: string[] = []; + const paymentsByAddress = new Map }; paymentId: number }>(); + + for (const paymentId of toCheck) { + const entry = confirmingMap.get(paymentId); + if (!entry || entry.payment.coin !== chain) continue; + addressesToCheck.push(entry.payment.address); + paymentsByAddress.set(entry.payment.address, { entry, paymentId }); + } + + if (addressesToCheck.length === 0) return; + + // Bulk check confirmed amounts + let bulk: Record; + try { + bulk = await getAddressInfoBulk(addressesToCheck); + } catch { + return; + } + + for (const [addr, { entry, paymentId }] of paymentsByAddress) { + const info = bulk[addr]; + if (!info) continue; + + const receivedConfirmed = Number(info.received_confirmed ?? 0); + const threshold = parseFloat(entry.payment.amount_crypto) * 0.995; + + if (receivedConfirmed >= threshold) { + console.log(`SSE: block confirmed payment ${paymentId} (${chain})`); + const txid = entry.payment.txid || [...entry.txids][0] || null; + await activatePayment(entry.payment, txid); + // Clean up maps + for (const t of entry.txids) txidLookup.delete(t); + confirmingMap.delete(paymentId); + addressMap.delete(addr); + } + } +} + +/** Start/stop SSE streams based on which chains have active payments. */ +async function syncStreams() { + const chainsNeeded = new Set(); + for (const p of addressMap.values()) chainsNeeded.add(p.coin); + for (const chain of chainsNeeded) { if (COINS[chain]) startChainStream(chain); } - // Stop streams for chains with no pending payments for (const [chain, ac] of activeStreams) { if (!chainsNeeded.has(chain)) { ac.abort(); @@ -137,9 +245,9 @@ async function syncStreams() { } } -// ── Polling fallback (for confirmations and missed txs) ─────────────── +// ── Bulk polling fallback ─────────────────────────────────────────── -/** Poll all pending/confirming payments. Handles confirmations and expiry. */ +/** Poll all active payments using bulk address endpoint. */ export async function checkPayments() { // Expire stale payments await sql` @@ -148,103 +256,71 @@ export async function checkPayments() { AND expires_at < now() `; - // Refresh address map and sync SSE streams - await refreshAddressMap(); + // Refresh maps and sync SSE streams + await refreshMaps(); await syncStreams(); - // Poll confirming payments for block confirmation - const confirming = await sql` + // Collect all addresses that need checking + const allPayments = await sql` SELECT * FROM payments - WHERE status = 'confirming' + WHERE status IN ('pending', 'confirming') AND expires_at >= now() `; - for (const payment of confirming) { - try { - await checkConfirmation(payment); - } catch (e) { - console.error(`Error checking confirmation for payment ${payment.id}:`, e); - } + if (allPayments.length === 0) return; + + // Bulk lookup all addresses at once + const addresses = allPayments.map((p: any) => p.address); + let bulk: Record; + try { + bulk = await getAddressInfoBulk(addresses); + } catch (e) { + console.error("Bulk address lookup failed:", e); + return; } - // Also poll any pending payments as fallback (SSE might have missed them) - const pending = await sql` - SELECT * FROM payments - WHERE status = 'pending' - AND expires_at >= now() - `; - - for (const payment of pending) { + for (const payment of allPayments) { try { - await checkPending(payment); + const info = bulk[payment.address]; + if (!info) continue; + + const coin = COINS[payment.coin]; + if (!coin) continue; + + const received = Number(info.received ?? 0); + const receivedConfirmed = Number(info.received_confirmed ?? 0); + const expectedCrypto = parseFloat(payment.amount_crypto); + const threshold = expectedCrypto * 0.995; + + if (payment.status === "pending") { + if (coin.confirmations === 0 && received >= threshold) { + await activatePayment(payment, findTxid(info)); + } else if (coin.confirmations > 0 && receivedConfirmed >= threshold) { + await activatePayment(payment, findTxid(info)); + } else if (received >= threshold) { + const txid = findTxid(info); + await sql`UPDATE payments SET status = 'confirming', txid = ${txid} WHERE id = ${payment.id}`; + console.log(`Poll: payment ${payment.id} now confirming`); + } + } else if (payment.status === "confirming") { + if (receivedConfirmed >= threshold) { + const txid = payment.txid || findTxid(info); + await activatePayment(payment, txid); + } + } } catch (e) { console.error(`Error checking payment ${payment.id}:`, e); } } } -async function checkConfirmation(payment: any) { - const info = await getAddressInfo(payment.address); - if (info.error) return; - - const expectedSats = cryptoToSats(payment.coin, payment.amount_crypto); - const threshold = expectedSats * 0.995; - const confirmed = sumReceived(info, true); - - if (confirmed >= threshold) { - const txid = payment.txid || findTxid(info); - await activatePayment(payment, txid); - addressMap.delete(payment.address); - } -} - -async function checkPending(payment: any) { - const info = await getAddressInfo(payment.address); - if (info.error) return; - - const coin = COINS[payment.coin]; - if (!coin) return; - - const multiplier = payment.coin === "xec" ? 100 : 1e8; - const expectedCrypto = parseFloat(payment.amount_crypto); - const threshold = expectedCrypto * 0.995; - - const confirmed = sumReceived(info, true); - const total = sumReceived(info, false); - - if (coin.confirmations === 0 && total >= threshold) { - await activatePayment(payment, findTxid(info)); - addressMap.delete(payment.address); - } else if (coin.confirmations > 0 && confirmed >= threshold) { - await activatePayment(payment, findTxid(info)); - addressMap.delete(payment.address); - } else if (total >= threshold && payment.status === "pending") { - await sql`UPDATE payments SET status = 'confirming' WHERE id = ${payment.id}`; - } -} - // ── Helpers ─────────────────────────────────────────────────────────── -function sumReceived(info: any, confirmedOnly: boolean): number { - if (confirmedOnly) { - return Number(info.balance?.confirmed ?? info.confirmed ?? info.received ?? 0); - } - const confirmed = Number(info.balance?.confirmed ?? info.confirmed ?? info.received ?? 0); - const unconfirmed = Number(info.balance?.unconfirmed ?? info.unconfirmed ?? 0); - return confirmed + unconfirmed; -} - function findTxid(info: any): string | null { - if (info.txs?.length) return info.txs[0].txid || info.txs[0].hash || null; - if (info.transactions?.length) return info.transactions[0].txid || info.transactions[0] || null; + if (info.in?.length) return info.in[0].txid ?? null; return null; } -function cryptoToSats(coin: string, amount: string): number { - const multiplier = coin === "xec" ? 100 : 1e8; - return Math.round(parseFloat(amount) * multiplier); -} - async function activatePayment(payment: any, txid: string | null) { const [updated] = await sql` UPDATE payments @@ -252,7 +328,7 @@ async function activatePayment(payment: any, txid: string | null) { WHERE id = ${payment.id} AND status != 'paid' RETURNING id `; - if (!updated) return; // Already activated by another path + if (!updated) return; if (payment.plan === "lifetime") { await sql` diff --git a/apps/web/src/views/checkout.ejs b/apps/web/src/views/checkout.ejs index e09ab1a..0d5e2c0 100644 --- a/apps/web/src/views/checkout.ejs +++ b/apps/web/src/views/checkout.ejs @@ -245,12 +245,20 @@ pollInterval = setInterval(() => pollStatus(), 10000); } - /** Subscribe to Freedom.st SSE filtered by our payment address for instant detection. */ + let watchedAddress = null; + let watchedTxids = []; + + /** Listen to raw SSE for this coin, match tx outputs against our address locally. + * Tracks multiple txids (user may send across several transactions). + * On block, checks if any of our txids got confirmed. */ function watchAddress(coin, address) { if (sseAbort) sseAbort.abort(); sseAbort = new AbortController(); + watchedAddress = address; + watchedTxids = []; - const query = { crypto: coin, "data.out.script.address": address }; + // Raw stream for this coin — all txs and blocks, no query filter + const query = { crypto: coin }; const q = btoa(JSON.stringify(query)); const url = `${SOCK_API}/sse?q=${q}`; @@ -275,7 +283,11 @@ if (!line.startsWith('data: ')) continue; try { const event = JSON.parse(line.slice(6)); - onTxDetected(event); + if (event.type === 'block') { + onBlock(event); + } else { + onTx(event); + } } catch {} } } @@ -287,16 +299,34 @@ })(); } - function onTxDetected(event) { - // Transaction matched our address filter — payment detected instantly - console.log('TX detected via SSE:', event); - const statusEl = document.getElementById('pay-status'); - statusEl.innerHTML = ` - - Transaction detected, waiting for confirmation... - `; - // Immediately poll the pay API to get authoritative status update - pollStatus(); + /** Check if any tx output matches our payment address. */ + function onTx(event) { + if (!watchedAddress) return; + const outputs = event.data?.out ?? []; + for (const out of outputs) { + if (out?.script?.address !== watchedAddress) continue; + const txHash = event.data?.tx?.hash ?? null; + if (!txHash || watchedTxids.includes(txHash)) return; + watchedTxids.push(txHash); + console.log('SSE: tx detected for our address:', txHash, `(${watchedTxids.length} total)`); + document.getElementById('pay-status').innerHTML = ` + + Transaction detected, waiting for confirmation... + `; + pollStatus(); + return; + } + } + + /** Check if a new block includes any of our confirming txids. */ + function onBlock(event) { + if (watchedTxids.length === 0) return; + const blockTxs = event.data?.tx ?? []; + const found = watchedTxids.some(t => blockTxs.includes(t)); + if (found) { + console.log('SSE: block contains one of our txids'); + pollStatus(); + } } function updateCountdown(expiresAt) { @@ -369,6 +399,8 @@ if (sseAbort) { sseAbort.abort(); sseAbort = null; } if (pollInterval) { clearInterval(pollInterval); pollInterval = null; } if (countdownInterval) { clearInterval(countdownInterval); countdownInterval = null; } + watchedAddress = null; + watchedTxids = []; document.getElementById('step-select').classList.remove('hidden'); document.getElementById('step-pay').classList.add('hidden'); document.getElementById('pay-success').classList.add('hidden');