From 7d1a350f864a7977ebbab8276995749495e3c154 Mon Sep 17 00:00:00 2001 From: nate Date: Wed, 18 Mar 2026 23:06:57 +0400 Subject: [PATCH] feat: instant payment detection via Freedom.st SSE on backend and frontend --- apps/pay/src/monitor.ts | 233 +++++++++++++++++++++++++++----- apps/web/src/views/checkout.ejs | 68 +++++++++- 2 files changed, 264 insertions(+), 37 deletions(-) diff --git a/apps/pay/src/monitor.ts b/apps/pay/src/monitor.ts index bcd71ed..da78c3c 100644 --- a/apps/pay/src/monitor.ts +++ b/apps/pay/src/monitor.ts @@ -1,9 +1,147 @@ -/// Background job: poll pending payments and activate plans on confirmation. +/// Payment monitor: uses Freedom.st SSE for instant tx detection, +/// with periodic polling as fallback for confirmations and expiry. import sql from "./db"; import { getAddressInfo } from "./freedom"; import { COINS } from "./plans"; -/** Check all pending/confirming payments against the blockchain. */ +const SOCK_API = process.env.FREEDOM_SOCK ?? "https://sock-v1.freedom.st"; + +// ── In-memory address lookup for SSE matching ───────────────────────── +// Maps address → payment row for all pending/confirming payments. +let addressMap = new Map(); + +/** Refresh the address map from DB. Called periodically. */ +async function refreshAddressMap() { + const pending = await sql` + SELECT * FROM payments + WHERE status IN ('pending', 'confirming') + AND expires_at >= now() + `; + const map = new Map(); + for (const p of pending) map.set(p.address, p); + addressMap = map; +} + +// ── SSE listeners per chain ─────────────────────────────────────────── +const activeStreams = new Map(); + +/** Start an SSE stream for a chain, watching for txs to any of our addresses. */ +function startChainStream(chain: string) { + if (activeStreams.has(chain)) return; + + const ac = new AbortController(); + activeStreams.set(chain, ac); + + // No address filter — we watch ALL txs on this chain and match server-side + // against our address map. This is necessary because addresses change as + // new payments are created. + const query = { chain }; + const q = Buffer.from(JSON.stringify(query)).toString("base64"); + const url = `${SOCK_API}/sse?q=${q}`; + + connectSSE(chain, url, ac.signal); +} + +async function connectSSE(chain: string, url: string, signal: AbortSignal) { + while (!signal.aborted) { + try { + const res = await fetch(url, { signal }); + if (!res.ok || !res.body) { + console.error(`SSE ${chain}: HTTP ${res.status}`); + await sleep(5000); + continue; + } + + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (!signal.aborted) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; + + for (const line of lines) { + if (!line.startsWith("data: ")) continue; + try { + const event = JSON.parse(line.slice(6)); + await handleTxEvent(chain, event); + } catch {} + } + } + } catch (e: any) { + if (signal.aborted) return; + console.error(`SSE ${chain} error:`, e.message); + } + // Reconnect after brief pause + if (!signal.aborted) await sleep(3000); + } +} + +/** Handle a transaction event from SSE. Check if any output matches a pending payment address. */ +async function handleTxEvent(chain: string, event: any) { + const tx = event.data || event; + const outputs = tx.out || tx.outputs || tx.vout || []; + const txid = tx.txid || tx.hash || null; + + for (const out of outputs) { + // Freedom.st tx format: out[].script.address + const addr = out?.script?.address || out?.address || out?.scriptPubKey?.address; + if (!addr) continue; + + const payment = addressMap.get(addr); + if (!payment) continue; + if (payment.coin !== chain) continue; + + console.log(`SSE: tx ${txid} detected for payment ${payment.id} (${chain})`); + + const coin = COINS[chain]; + if (!coin) continue; + + if (coin.confirmations === 0) { + // 0-conf: activate immediately + await activatePayment(payment, txid); + addressMap.delete(addr); + } else { + // 1+ conf: mark as confirming, polling will handle confirmation + if (payment.status === "pending") { + await sql`UPDATE payments SET status = 'confirming', txid = ${txid} WHERE id = ${payment.id}`; + payment.status = "confirming"; + payment.txid = txid; + console.log(`Payment ${payment.id} now confirming (waiting for block)`); + } + } + } +} + +/** Start SSE streams for all chains that have pending payments. */ +async function syncStreams() { + // Determine which chains have pending payments + const chainsNeeded = new Set(); + for (const p of addressMap.values()) { + chainsNeeded.add(p.coin); + } + + // Start streams for chains we need + for (const chain of chainsNeeded) { + if (COINS[chain]) startChainStream(chain); + } + + // Stop streams for chains with no pending payments + for (const [chain, ac] of activeStreams) { + if (!chainsNeeded.has(chain)) { + ac.abort(); + activeStreams.delete(chain); + } + } +} + +// ── Polling fallback (for confirmations and missed txs) ─────────────── + +/** Poll all pending/confirming payments. Handles confirmations and expiry. */ export async function checkPayments() { // Expire stale payments await sql` @@ -12,22 +150,57 @@ export async function checkPayments() { AND expires_at < now() `; + // Refresh address map and sync SSE streams + await refreshAddressMap(); + await syncStreams(); + + // Poll confirming payments for block confirmation + const confirming = await sql` + SELECT * FROM payments + WHERE status = 'confirming' + AND expires_at >= now() + `; + + for (const payment of confirming) { + try { + await checkConfirmation(payment); + } catch (e) { + console.error(`Error checking confirmation for payment ${payment.id}:`, e); + } + } + + // Also poll any pending payments as fallback (SSE might have missed them) const pending = await sql` SELECT * FROM payments - WHERE status IN ('pending', 'confirming') + WHERE status = 'pending' AND expires_at >= now() `; for (const payment of pending) { try { - await checkPayment(payment); + await checkPending(payment); } catch (e) { console.error(`Error checking payment ${payment.id}:`, e); } } } -async function checkPayment(payment: any) { +async function checkConfirmation(payment: any) { + const info = await getAddressInfo(payment.address); + if (info.error) return; + + const expectedSats = cryptoToSats(payment.coin, payment.amount_crypto); + const threshold = expectedSats * 0.995; + const confirmed = sumReceived(info, true); + + if (confirmed >= threshold) { + const txid = payment.txid || findTxid(info); + await activatePayment(payment, txid); + addressMap.delete(payment.address); + } +} + +async function checkPending(payment: any) { const info = await getAddressInfo(payment.address); if (info.error) return; @@ -35,65 +208,52 @@ async function checkPayment(payment: any) { if (!coin) return; const expectedSats = cryptoToSats(payment.coin, payment.amount_crypto); - - // Sum confirmed received - const confirmedReceived = sumReceived(info, true); - // Sum all received (including unconfirmed) - const totalReceived = sumReceived(info, false); - - // Allow 0.5% tolerance for rounding const threshold = expectedSats * 0.995; - if (coin.confirmations === 0) { - // 0-conf coins (BCH, XEC): accept as soon as seen in mempool - if (totalReceived >= threshold) { - const txid = findTxid(info); - await activatePayment(payment, txid); - } - } else { - // 1-conf coins: need confirmed balance - if (confirmedReceived >= threshold) { - const txid = findTxid(info); - await activatePayment(payment, txid); - } else if (totalReceived >= threshold && payment.status === "pending") { - // Seen in mempool but not yet confirmed - await sql`UPDATE payments SET status = 'confirming' WHERE id = ${payment.id}`; - } + const confirmed = sumReceived(info, true); + const total = sumReceived(info, false); + + if (coin.confirmations === 0 && total >= threshold) { + await activatePayment(payment, findTxid(info)); + addressMap.delete(payment.address); + } else if (coin.confirmations > 0 && confirmed >= threshold) { + await activatePayment(payment, findTxid(info)); + addressMap.delete(payment.address); + } else if (total >= threshold && payment.status === "pending") { + await sql`UPDATE payments SET status = 'confirming' WHERE id = ${payment.id}`; } } +// ── Helpers ─────────────────────────────────────────────────────────── + function sumReceived(info: any, confirmedOnly: boolean): number { - // Freedom.st /address response has balance fields - // The response includes received/balance in the coin's base unit (satoshis) if (confirmedOnly) { return Number(info.balance?.confirmed ?? info.confirmed ?? info.received ?? 0); } - // Total = confirmed + unconfirmed const confirmed = Number(info.balance?.confirmed ?? info.confirmed ?? info.received ?? 0); const unconfirmed = Number(info.balance?.unconfirmed ?? info.unconfirmed ?? 0); return confirmed + unconfirmed; } function findTxid(info: any): string | null { - // Try to get the first txid from transaction history if (info.txs?.length) return info.txs[0].txid || info.txs[0].hash || null; if (info.transactions?.length) return info.transactions[0].txid || info.transactions[0] || null; return null; } -/** Convert a decimal crypto amount string to satoshis/base units. */ function cryptoToSats(coin: string, amount: string): number { - // XEC uses 100 sats per coin, everything else uses 1e8 const multiplier = coin === "xec" ? 100 : 1e8; return Math.round(parseFloat(amount) * multiplier); } async function activatePayment(payment: any, txid: string | null) { - await sql` + const [updated] = await sql` UPDATE payments SET status = 'paid', paid_at = now(), txid = ${txid} WHERE id = ${payment.id} AND status != 'paid' + RETURNING id `; + if (!updated) return; // Already activated by another path if (payment.plan === "lifetime") { await sql` @@ -101,7 +261,6 @@ async function activatePayment(payment: any, txid: string | null) { WHERE id = ${payment.account_id} `; } else { - // Pro: extend from current expiry or now const [account] = await sql` SELECT plan, plan_expires_at FROM accounts WHERE id = ${payment.account_id} `; @@ -132,3 +291,7 @@ export async function expireProPlans() { console.log(`Downgraded ${result.count} expired pro accounts to free`); } } + +function sleep(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/apps/web/src/views/checkout.ejs b/apps/web/src/views/checkout.ejs index 15bb794..39cf28b 100644 --- a/apps/web/src/views/checkout.ejs +++ b/apps/web/src/views/checkout.ejs @@ -127,6 +127,7 @@