diff --git a/apps/pay/src/index.ts b/apps/pay/src/index.ts index 14ca2af..b12e258 100644 --- a/apps/pay/src/index.ts +++ b/apps/pay/src/index.ts @@ -47,7 +47,8 @@ const app = new Elysia() console.log(`PingQL Pay running at http://localhost:${app.server?.port}`); -// Check pending payments every 30 seconds +// Run immediately on startup, then every 30 seconds +checkPayments().catch((err) => console.error("Payment check failed:", err)); setInterval(() => { checkPayments().catch((err) => console.error("Payment check failed:", err)); }, 30_000); diff --git a/apps/pay/src/monitor.ts b/apps/pay/src/monitor.ts index c3c55ee..56d62a4 100644 --- a/apps/pay/src/monitor.ts +++ b/apps/pay/src/monitor.ts @@ -1,25 +1,17 @@ -/// Payment monitor: uses Freedom.st SSE for instant tx detection and -/// block-based confirmation, with bulk polling as fallback. +/// Payment monitor: raw SSE stream for instant tx/block detection, +/// with bulk polling as fallback. import sql from "./db"; import { getAddressInfo, getAddressInfoBulk } from "./freedom"; import { COINS } from "./plans"; const SOCK_API = process.env.FREEDOM_SOCK ?? "https://sock-v1.freedom.st"; -// ── In-memory maps for SSE matching ───────────────────────────────── -// address → payment row for all active (pending/confirming) payments. -// NOT removed on first tx — user may send multiple txs to same address. +// ── In-memory maps ────────────────────────────────────────────────── let addressMap = new Map(); - -// Confirming payments: paymentId → { payment, txids[] } -// Tracks all txids seen for each confirming payment. let confirmingMap = new Map }>(); -// Reverse lookup: txid → paymentId for fast block matching let txidLookup = new Map(); -// All txids we've already processed from SSE — prevents double-counting on duplicate events const seenTxids = new Set(); -/** Refresh maps from DB. Called periodically. */ async function refreshMaps() { const active = await sql` SELECT * FROM payments @@ -32,11 +24,8 @@ async function refreshMaps() { const newTxidLookup = new Map(); for (const p of active) { - // All active payments stay in addressMap (more txs may arrive) newAddr.set(p.address, p); - if (p.status === "confirming" && p.txid) { - // Restore txids from DB (single stored txid) + merge any we already track const existing = confirmingMap.get(p.id); const txids = existing?.txids ?? new Set(); txids.add(p.txid); @@ -49,45 +38,34 @@ async function refreshMaps() { confirmingMap = newConfirming; txidLookup = newTxidLookup; - // Prune seenTxids — only keep txids that belong to active payments for (const txid of seenTxids) { if (!newTxidLookup.has(txid)) seenTxids.delete(txid); } } -// ── SSE streams per chain ─────────────────────────────────────────── -const activeStreams = new Map(); +// ── Single raw SSE connection — no query, all chains ──────────────── -/** Start a raw SSE stream for a chain — receives ALL txs and blocks. */ -function startChainStream(chain: string) { - if (activeStreams.has(chain)) return; - - const ac = new AbortController(); - activeStreams.set(chain, ac); - - const query = { crypto: chain }; - const q = Buffer.from(JSON.stringify(query)).toString("base64"); - const url = `${SOCK_API}/sse?q=${q}`; - - connectSSE(chain, url, ac.signal); +function startSSE() { + const url = `${SOCK_API}/sse`; + connectSSE(url); } -async function connectSSE(chain: string, url: string, signal: AbortSignal) { - while (!signal.aborted) { +async function connectSSE(url: string) { + while (true) { try { - const res = await fetch(url, { signal }); + const res = await fetch(url); if (!res.ok || !res.body) { - console.error(`SSE ${chain}: HTTP ${res.status}`); - await new Promise(r => setTimeout(r, 5000)); + console.error(`SSE: HTTP ${res.status}`); + await sleep(5000); continue; } - console.log(`SSE ${chain}: connected`); + console.log("SSE: connected (raw, all chains)"); const reader = res.body.getReader(); const decoder = new TextDecoder(); let buffer = ""; - while (!signal.aborted) { + while (true) { const { done, value } = await reader.read(); if (done) break; @@ -101,30 +79,25 @@ async function connectSSE(chain: string, url: string, signal: AbortSignal) { const json = line.startsWith("data: ") ? line.slice(6) : line.slice(5); const event = JSON.parse(json); if (event.type === "block") { - handleBlockEvent(chain, event).catch(() => {}); - } else { - handleTxEvent(chain, event).catch(() => {}); + handleBlockEvent(event).catch(() => {}); + } else if (event.type === "tx") { + handleTxEvent(event).catch(() => {}); } } catch {} } } } catch (e: any) { - if (signal.aborted) return; - console.error(`SSE ${chain} error:`, e.message); - } - if (!signal.aborted) { - console.log(`SSE ${chain}: reconnecting in 3s...`); - await new Promise(r => setTimeout(r, 3000)); + console.error("SSE error:", e.message); } + console.log("SSE: reconnecting in 3s..."); + await sleep(3000); } } -/** Handle a tx event — compare all outputs against our active addresses. */ -async function handleTxEvent(chain: string, event: any) { +async function handleTxEvent(event: any) { const outputs = event.data?.out ?? []; const txHash = event.data?.tx?.hash ?? null; if (!txHash) return; - // SSE can send the same tx twice — skip duplicates if (seenTxids.has(txHash)) return; seenTxids.add(txHash); @@ -134,15 +107,13 @@ async function handleTxEvent(chain: string, event: any) { const payment = addressMap.get(addr); if (!payment) continue; - if (payment.coin !== chain) continue; - const coin = COINS[chain]; + const coin = COINS[payment.coin]; if (!coin) continue; - console.log(`SSE: tx ${txHash} for payment ${payment.id} (${chain})`); + console.log(`SSE: tx ${txHash} for payment ${payment.id} (${payment.coin})`); if (coin.confirmations === 0) { - // 0-conf coin: check if total received meets threshold try { const info = await getAddressInfo(payment.address); if (!info || info.error) continue; @@ -154,7 +125,6 @@ async function handleTxEvent(chain: string, event: any) { } } catch {} } else { - // 1+ conf: track txid, mark confirming if (payment.status === "pending") { await sql`UPDATE payments SET status = 'confirming', txid = ${txHash} WHERE id = ${payment.id}`; payment.status = "confirming"; @@ -162,7 +132,6 @@ async function handleTxEvent(chain: string, event: any) { console.log(`Payment ${payment.id} now confirming`); } - // Add txid to confirming map (handles multiple txs) let entry = confirmingMap.get(payment.id); if (!entry) { entry = { payment, txids: new Set() }; @@ -174,15 +143,10 @@ async function handleTxEvent(chain: string, event: any) { } } -/** Handle a block event — check if any confirming txid is in this block. - * When found, verify confirmed amount meets threshold before activating. */ -async function handleBlockEvent(chain: string, event: any) { +async function handleBlockEvent(event: any) { const blockTxs: string[] = event.data?.tx ?? []; if (blockTxs.length === 0) return; - const blockTxSet = new Set(blockTxs); - - // Find which confirming payments have a txid in this block const toCheck = new Set(); for (const txid of blockTxs) { const paymentId = txidLookup.get(txid); @@ -191,24 +155,20 @@ async function handleBlockEvent(chain: string, event: any) { if (toCheck.size === 0) return; - // Collect addresses to bulk-check const addressesToCheck: string[] = []; const paymentsByAddress = new Map }; paymentId: number }>(); for (const paymentId of toCheck) { const entry = confirmingMap.get(paymentId); - if (!entry || entry.payment.coin !== chain) continue; + if (!entry) continue; addressesToCheck.push(entry.payment.address); paymentsByAddress.set(entry.payment.address, { entry, paymentId }); } if (addressesToCheck.length === 0) return; - // Bulk check confirmed amounts, fall back to individual let bulk: Record = {}; - try { - bulk = await getAddressInfoBulk(addressesToCheck); - } catch {} + try { bulk = await getAddressInfoBulk(addressesToCheck); } catch {} for (const [addr, { entry, paymentId }] of paymentsByAddress) { let info = bulk[addr]; @@ -221,10 +181,9 @@ async function handleBlockEvent(chain: string, event: any) { const threshold = parseFloat(entry.payment.amount_crypto) * 0.995; if (receivedConfirmed >= threshold) { - console.log(`SSE: block confirmed payment ${paymentId} (${chain})`); + console.log(`SSE: block confirmed payment ${paymentId}`); const txid = entry.payment.txid || [...entry.txids][0] || null; await activatePayment(entry.payment, txid); - // Clean up maps for (const t of entry.txids) txidLookup.delete(t); confirmingMap.delete(paymentId); addressMap.delete(addr); @@ -232,39 +191,17 @@ async function handleBlockEvent(chain: string, event: any) { } } -/** Start/stop SSE streams based on which chains have active payments. */ -async function syncStreams() { - const chainsNeeded = new Set(); - for (const p of addressMap.values()) chainsNeeded.add(p.coin); - - for (const chain of chainsNeeded) { - if (COINS[chain]) startChainStream(chain); - } - - for (const [chain, ac] of activeStreams) { - if (!chainsNeeded.has(chain)) { - ac.abort(); - activeStreams.delete(chain); - } - } -} - // ── Bulk polling fallback ─────────────────────────────────────────── -/** Poll all active payments using bulk address endpoint. */ export async function checkPayments() { - // Expire stale payments await sql` UPDATE payments SET status = 'expired' WHERE status IN ('pending', 'confirming') AND expires_at < now() `; - // Refresh maps and sync SSE streams await refreshMaps(); - await syncStreams(); - // Collect all addresses that need checking const allPayments = await sql` SELECT * FROM payments WHERE status IN ('pending', 'confirming') @@ -273,7 +210,6 @@ export async function checkPayments() { if (allPayments.length === 0) return; - // Bulk lookup all addresses at once, fall back to individual lookups const addresses = allPayments.map((p: any) => p.address); let bulk: Record = {}; try { @@ -285,11 +221,8 @@ export async function checkPayments() { for (const payment of allPayments) { try { let info = bulk[payment.address]; - // Fall back to individual lookup if bulk didn't return data for this address if (!info) { - try { - info = await getAddressInfo(payment.address); - } catch { continue; } + try { info = await getAddressInfo(payment.address); } catch { continue; } } if (!info || info.error) continue; @@ -325,6 +258,11 @@ export async function checkPayments() { // ── Helpers ─────────────────────────────────────────────────────────── +/** Add a payment to the address map immediately (called from routes on checkout creation). */ +export function watchPayment(payment: any) { + addressMap.set(payment.address, payment); +} + function findTxid(info: any): string | null { if (info.in?.length) return info.in[0].txid ?? null; return null; @@ -363,7 +301,6 @@ async function activatePayment(payment: any, txid: string | null) { console.log(`Payment ${payment.id} activated: ${payment.plan} for account ${payment.account_id}`); } -/** Downgrade expired pro accounts back to free. */ export async function expireProPlans() { const result = await sql` UPDATE accounts SET plan = 'free', plan_expires_at = NULL @@ -374,4 +311,11 @@ export async function expireProPlans() { if (result.count > 0) { console.log(`Downgraded ${result.count} expired pro accounts to free`); } -} \ No newline at end of file +} + +function sleep(ms: number) { + return new Promise(r => setTimeout(r, ms)); +} + +// Start the single SSE connection immediately on import +startSSE(); diff --git a/apps/pay/src/routes.ts b/apps/pay/src/routes.ts index 26172fc..9ee4d87 100644 --- a/apps/pay/src/routes.ts +++ b/apps/pay/src/routes.ts @@ -3,6 +3,7 @@ import sql from "./db"; import { derive } from "./address"; import { getExchangeRates, getAvailableCoins, getQrUrl } from "./freedom"; import { PLANS, COINS } from "./plans"; +import { watchPayment } from "./monitor"; // Resolve account from key (same logic as API/web apps) async function resolveKey(key: string): Promise<{ accountId: string; keyId: string | null; plan: string } | null> { @@ -100,6 +101,9 @@ export const routes = new Elysia() RETURNING * `; + // Start watching this address immediately via SSE + watchPayment(payment); + // Build payment URI for QR code const coinInfo = COINS[coin]; const uri = `${coinInfo.uri}:${address}?amount=${amountCrypto}`;