fix: single raw SSE connection with no query filter, always on from boot

This commit is contained in:
nate 2026-03-19 00:36:43 +04:00
parent 2dbf85652b
commit 2554321183
3 changed files with 47 additions and 98 deletions

View File

@ -47,7 +47,8 @@ const app = new Elysia()
console.log(`PingQL Pay running at http://localhost:${app.server?.port}`);
// Check pending payments every 30 seconds
// Run immediately on startup, then every 30 seconds
checkPayments().catch((err) => console.error("Payment check failed:", err));
setInterval(() => {
checkPayments().catch((err) => console.error("Payment check failed:", err));
}, 30_000);

View File

@ -1,25 +1,17 @@
/// Payment monitor: uses Freedom.st SSE for instant tx detection and
/// block-based confirmation, with bulk polling as fallback.
/// Payment monitor: raw SSE stream for instant tx/block detection,
/// with bulk polling as fallback.
import sql from "./db";
import { getAddressInfo, getAddressInfoBulk } from "./freedom";
import { COINS } from "./plans";
const SOCK_API = process.env.FREEDOM_SOCK ?? "https://sock-v1.freedom.st";
// ── In-memory maps for SSE matching ─────────────────────────────────
// address → payment row for all active (pending/confirming) payments.
// NOT removed on first tx — user may send multiple txs to same address.
// ── In-memory maps ──────────────────────────────────────────────────
let addressMap = new Map<string, any>();
// Confirming payments: paymentId → { payment, txids[] }
// Tracks all txids seen for each confirming payment.
let confirmingMap = new Map<number, { payment: any; txids: Set<string> }>();
// Reverse lookup: txid → paymentId for fast block matching
let txidLookup = new Map<string, number>();
// All txids we've already processed from SSE — prevents double-counting on duplicate events
const seenTxids = new Set<string>();
/** Refresh maps from DB. Called periodically. */
async function refreshMaps() {
const active = await sql`
SELECT * FROM payments
@ -32,11 +24,8 @@ async function refreshMaps() {
const newTxidLookup = new Map<string, number>();
for (const p of active) {
// All active payments stay in addressMap (more txs may arrive)
newAddr.set(p.address, p);
if (p.status === "confirming" && p.txid) {
// Restore txids from DB (single stored txid) + merge any we already track
const existing = confirmingMap.get(p.id);
const txids = existing?.txids ?? new Set<string>();
txids.add(p.txid);
@ -49,45 +38,34 @@ async function refreshMaps() {
confirmingMap = newConfirming;
txidLookup = newTxidLookup;
// Prune seenTxids — only keep txids that belong to active payments
for (const txid of seenTxids) {
if (!newTxidLookup.has(txid)) seenTxids.delete(txid);
}
}
// ── SSE streams per chain ───────────────────────────────────────────
const activeStreams = new Map<string, AbortController>();
// ── Single raw SSE connection — no query, all chains ────────────────
/** Start a raw SSE stream for a chain — receives ALL txs and blocks. */
function startChainStream(chain: string) {
if (activeStreams.has(chain)) return;
const ac = new AbortController();
activeStreams.set(chain, ac);
const query = { crypto: chain };
const q = Buffer.from(JSON.stringify(query)).toString("base64");
const url = `${SOCK_API}/sse?q=${q}`;
connectSSE(chain, url, ac.signal);
function startSSE() {
const url = `${SOCK_API}/sse`;
connectSSE(url);
}
async function connectSSE(chain: string, url: string, signal: AbortSignal) {
while (!signal.aborted) {
async function connectSSE(url: string) {
while (true) {
try {
const res = await fetch(url, { signal });
const res = await fetch(url);
if (!res.ok || !res.body) {
console.error(`SSE ${chain}: HTTP ${res.status}`);
await new Promise(r => setTimeout(r, 5000));
console.error(`SSE: HTTP ${res.status}`);
await sleep(5000);
continue;
}
console.log(`SSE ${chain}: connected`);
console.log("SSE: connected (raw, all chains)");
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (!signal.aborted) {
while (true) {
const { done, value } = await reader.read();
if (done) break;
@ -101,30 +79,25 @@ async function connectSSE(chain: string, url: string, signal: AbortSignal) {
const json = line.startsWith("data: ") ? line.slice(6) : line.slice(5);
const event = JSON.parse(json);
if (event.type === "block") {
handleBlockEvent(chain, event).catch(() => {});
} else {
handleTxEvent(chain, event).catch(() => {});
handleBlockEvent(event).catch(() => {});
} else if (event.type === "tx") {
handleTxEvent(event).catch(() => {});
}
} catch {}
}
}
} catch (e: any) {
if (signal.aborted) return;
console.error(`SSE ${chain} error:`, e.message);
}
if (!signal.aborted) {
console.log(`SSE ${chain}: reconnecting in 3s...`);
await new Promise(r => setTimeout(r, 3000));
console.error("SSE error:", e.message);
}
console.log("SSE: reconnecting in 3s...");
await sleep(3000);
}
}
/** Handle a tx event — compare all outputs against our active addresses. */
async function handleTxEvent(chain: string, event: any) {
async function handleTxEvent(event: any) {
const outputs = event.data?.out ?? [];
const txHash = event.data?.tx?.hash ?? null;
if (!txHash) return;
// SSE can send the same tx twice — skip duplicates
if (seenTxids.has(txHash)) return;
seenTxids.add(txHash);
@ -134,15 +107,13 @@ async function handleTxEvent(chain: string, event: any) {
const payment = addressMap.get(addr);
if (!payment) continue;
if (payment.coin !== chain) continue;
const coin = COINS[chain];
const coin = COINS[payment.coin];
if (!coin) continue;
console.log(`SSE: tx ${txHash} for payment ${payment.id} (${chain})`);
console.log(`SSE: tx ${txHash} for payment ${payment.id} (${payment.coin})`);
if (coin.confirmations === 0) {
// 0-conf coin: check if total received meets threshold
try {
const info = await getAddressInfo(payment.address);
if (!info || info.error) continue;
@ -154,7 +125,6 @@ async function handleTxEvent(chain: string, event: any) {
}
} catch {}
} else {
// 1+ conf: track txid, mark confirming
if (payment.status === "pending") {
await sql`UPDATE payments SET status = 'confirming', txid = ${txHash} WHERE id = ${payment.id}`;
payment.status = "confirming";
@ -162,7 +132,6 @@ async function handleTxEvent(chain: string, event: any) {
console.log(`Payment ${payment.id} now confirming`);
}
// Add txid to confirming map (handles multiple txs)
let entry = confirmingMap.get(payment.id);
if (!entry) {
entry = { payment, txids: new Set() };
@ -174,15 +143,10 @@ async function handleTxEvent(chain: string, event: any) {
}
}
/** Handle a block event check if any confirming txid is in this block.
* When found, verify confirmed amount meets threshold before activating. */
async function handleBlockEvent(chain: string, event: any) {
async function handleBlockEvent(event: any) {
const blockTxs: string[] = event.data?.tx ?? [];
if (blockTxs.length === 0) return;
const blockTxSet = new Set(blockTxs);
// Find which confirming payments have a txid in this block
const toCheck = new Set<number>();
for (const txid of blockTxs) {
const paymentId = txidLookup.get(txid);
@ -191,24 +155,20 @@ async function handleBlockEvent(chain: string, event: any) {
if (toCheck.size === 0) return;
// Collect addresses to bulk-check
const addressesToCheck: string[] = [];
const paymentsByAddress = new Map<string, { entry: { payment: any; txids: Set<string> }; paymentId: number }>();
for (const paymentId of toCheck) {
const entry = confirmingMap.get(paymentId);
if (!entry || entry.payment.coin !== chain) continue;
if (!entry) continue;
addressesToCheck.push(entry.payment.address);
paymentsByAddress.set(entry.payment.address, { entry, paymentId });
}
if (addressesToCheck.length === 0) return;
// Bulk check confirmed amounts, fall back to individual
let bulk: Record<string, any> = {};
try {
bulk = await getAddressInfoBulk(addressesToCheck);
} catch {}
try { bulk = await getAddressInfoBulk(addressesToCheck); } catch {}
for (const [addr, { entry, paymentId }] of paymentsByAddress) {
let info = bulk[addr];
@ -221,10 +181,9 @@ async function handleBlockEvent(chain: string, event: any) {
const threshold = parseFloat(entry.payment.amount_crypto) * 0.995;
if (receivedConfirmed >= threshold) {
console.log(`SSE: block confirmed payment ${paymentId} (${chain})`);
console.log(`SSE: block confirmed payment ${paymentId}`);
const txid = entry.payment.txid || [...entry.txids][0] || null;
await activatePayment(entry.payment, txid);
// Clean up maps
for (const t of entry.txids) txidLookup.delete(t);
confirmingMap.delete(paymentId);
addressMap.delete(addr);
@ -232,39 +191,17 @@ async function handleBlockEvent(chain: string, event: any) {
}
}
/** Start/stop SSE streams based on which chains have active payments. */
async function syncStreams() {
const chainsNeeded = new Set<string>();
for (const p of addressMap.values()) chainsNeeded.add(p.coin);
for (const chain of chainsNeeded) {
if (COINS[chain]) startChainStream(chain);
}
for (const [chain, ac] of activeStreams) {
if (!chainsNeeded.has(chain)) {
ac.abort();
activeStreams.delete(chain);
}
}
}
// ── Bulk polling fallback ───────────────────────────────────────────
/** Poll all active payments using bulk address endpoint. */
export async function checkPayments() {
// Expire stale payments
await sql`
UPDATE payments SET status = 'expired'
WHERE status IN ('pending', 'confirming')
AND expires_at < now()
`;
// Refresh maps and sync SSE streams
await refreshMaps();
await syncStreams();
// Collect all addresses that need checking
const allPayments = await sql`
SELECT * FROM payments
WHERE status IN ('pending', 'confirming')
@ -273,7 +210,6 @@ export async function checkPayments() {
if (allPayments.length === 0) return;
// Bulk lookup all addresses at once, fall back to individual lookups
const addresses = allPayments.map((p: any) => p.address);
let bulk: Record<string, any> = {};
try {
@ -285,11 +221,8 @@ export async function checkPayments() {
for (const payment of allPayments) {
try {
let info = bulk[payment.address];
// Fall back to individual lookup if bulk didn't return data for this address
if (!info) {
try {
info = await getAddressInfo(payment.address);
} catch { continue; }
try { info = await getAddressInfo(payment.address); } catch { continue; }
}
if (!info || info.error) continue;
@ -325,6 +258,11 @@ export async function checkPayments() {
// ── Helpers ───────────────────────────────────────────────────────────
/** Add a payment to the address map immediately (called from routes on checkout creation). */
export function watchPayment(payment: any) {
addressMap.set(payment.address, payment);
}
function findTxid(info: any): string | null {
if (info.in?.length) return info.in[0].txid ?? null;
return null;
@ -363,7 +301,6 @@ async function activatePayment(payment: any, txid: string | null) {
console.log(`Payment ${payment.id} activated: ${payment.plan} for account ${payment.account_id}`);
}
/** Downgrade expired pro accounts back to free. */
export async function expireProPlans() {
const result = await sql`
UPDATE accounts SET plan = 'free', plan_expires_at = NULL
@ -375,3 +312,10 @@ export async function expireProPlans() {
console.log(`Downgraded ${result.count} expired pro accounts to free`);
}
}
function sleep(ms: number) {
return new Promise(r => setTimeout(r, ms));
}
// Start the single SSE connection immediately on import
startSSE();

View File

@ -3,6 +3,7 @@ import sql from "./db";
import { derive } from "./address";
import { getExchangeRates, getAvailableCoins, getQrUrl } from "./freedom";
import { PLANS, COINS } from "./plans";
import { watchPayment } from "./monitor";
// Resolve account from key (same logic as API/web apps)
async function resolveKey(key: string): Promise<{ accountId: string; keyId: string | null; plan: string } | null> {
@ -100,6 +101,9 @@ export const routes = new Elysia()
RETURNING *
`;
// Start watching this address immediately via SSE
watchPayment(payment);
// Build payment URI for QR code
const coinInfo = COINS[coin];
const uri = `${coinInfo.uri}:${address}?amount=${amountCrypto}`;