pingql/apps/pay/src/monitor.ts

344 lines
12 KiB
TypeScript

import sql from "./db";
import { getAddressInfo, getAddressInfoBulk } from "./freedom";
import { COINS, planTier } from "../../shared/plans";
import { generateReceipt } from "./receipt";
const SOCK_API = process.env.FREEDOM_SOCK ?? "https://sock.freedom.st";
const THRESHOLD = 0.95;
let addressMap = new Map<string, any>(); // address → payment
let txidToPayment = new Map<string, number>(); // txid → payment.id
const seenTxids = new Set<string>();
async function refreshMaps() {
const active = await sql`
SELECT * FROM payments
WHERE status IN ('pending', 'underpaid', 'confirming')
AND expires_at >= now()
`;
const newAddr = new Map<string, any>();
const newTxid = new Map<string, number>();
for (const p of active) newAddr.set(p.address, p);
if (active.length > 0) {
const ids = active.map((p: any) => p.id);
const txs = await sql`SELECT payment_id, txid FROM payment_txs WHERE payment_id = ANY(${ids})`;
for (const tx of txs) newTxid.set(tx.txid, tx.payment_id);
}
addressMap = newAddr;
txidToPayment = newTxid;
for (const t of seenTxids) { if (!newTxid.has(t)) seenTxids.delete(t); }
}
async function recordTx(paymentId: number, address: string, txid: string, amount: number, confirmed: boolean) {
// Verify the payment exists and the address matches - prevents stale in-memory state
// from attributing transactions to the wrong payment
const [payment] = await sql`
SELECT id FROM payments WHERE id = ${paymentId} AND address = ${address}
AND status IN ('pending', 'underpaid', 'confirming')
`;
if (!payment) return;
const [ins] = await sql`
INSERT INTO payment_txs (payment_id, txid, amount, confirmed)
VALUES (${paymentId}, ${txid}, ${amount.toFixed(8)}, ${confirmed})
ON CONFLICT (payment_id, txid) DO UPDATE SET confirmed = EXCLUDED.confirmed OR payment_txs.confirmed
RETURNING (xmax = 0) as is_new
`;
if (ins?.is_new) {
const exp = new Date(Date.now() + 24 * 60 * 60 * 1000).toISOString();
await sql`UPDATE payments SET expires_at = ${exp} WHERE id = ${paymentId} AND expires_at < ${exp}`;
}
}
async function evaluatePayment(paymentId: number) {
const [payment] = await sql`
SELECT * FROM payments WHERE id = ${paymentId} AND status IN ('pending', 'underpaid', 'confirming')
`;
if (!payment) return;
const coin = COINS[payment.coin];
if (!coin) return;
const [{ total, unconfirmed }] = await sql`
SELECT COALESCE(SUM(amount::numeric), 0) as total,
COUNT(*) FILTER (WHERE NOT confirmed)::int as unconfirmed
FROM payment_txs WHERE payment_id = ${paymentId}
`;
const received = Number(total);
const expected = parseFloat(payment.amount_crypto);
const threshold = expected * THRESHOLD;
let newStatus = payment.status;
if (received >= threshold && (coin.confirmations === 0 || Number(unconfirmed) === 0)) {
newStatus = "paid";
} else if (received >= threshold) {
newStatus = "confirming";
} else if (received > 0) {
newStatus = "underpaid";
}
if (newStatus === payment.status && received <= parseFloat(payment.amount_received || "0")) return;
if (newStatus === "paid") {
await sql`UPDATE payments SET status = 'paid', amount_received = ${received.toFixed(8)}, paid_at = now() WHERE id = ${paymentId} AND status != 'paid'`;
await applyPlan(payment);
await generateReceipt(paymentId).catch(e => console.error(`Receipt generation failed for ${paymentId}:`, e));
addressMap.delete(payment.address);
console.log(`Payment ${paymentId} paid`);
} else {
await sql`UPDATE payments SET status = ${newStatus}, amount_received = ${received.toFixed(8)} WHERE id = ${paymentId}`;
}
}
async function handleTxEvent(event: any) {
const txHash = event.data?.hash;
if (!txHash || seenTxids.has(txHash)) return;
seenTxids.add(txHash);
const outputs = event.data?.to ?? [];
for (const out of outputs) {
const addr = out?.address;
const payment = addr && addressMap.get(addr);
if (!payment) continue;
const txValue = outputs
.filter((o: any) => o?.address === addr)
.reduce((s: number, o: any) => s + Number(o.amount ?? 0), 0);
if (txValue <= 0) continue;
console.log(`SSE: tx ${txHash} for payment ${payment.id}: +${txValue} ${payment.coin}`);
await recordTx(payment.id, payment.address, txHash, txValue, false);
txidToPayment.set(txHash, payment.id);
await evaluatePayment(payment.id);
return;
}
}
async function handleBlockEvent(event: any) {
const blockTxs: string[] = (event.data?.transactions ?? []).map((t: any) => typeof t === 'string' ? t : t.hash);
const matched = blockTxs.filter(t => txidToPayment.has(t));
if (matched.length === 0) return;
await sql`UPDATE payment_txs SET confirmed = true WHERE txid = ANY(${matched})`;
const pids = new Set(matched.map(t => txidToPayment.get(t)!));
for (const pid of pids) {
console.log(`SSE: block confirmed tx for payment ${pid}`);
await evaluatePayment(pid);
}
}
function startSSE() {
connectSSE(`${SOCK_API}/sse`);
}
async function connectSSE(url: string) {
while (true) {
try {
const res = await fetch(url);
if (!res.ok || !res.body) { await sleep(5000); continue; }
console.log("SSE: connected");
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
const lines = buf.split("\n");
buf = lines.pop() ?? "";
for (const line of lines) {
if (!line.startsWith("data:")) continue;
try {
const event = JSON.parse(line.slice(line.indexOf("{")));
if (event.type === "block") handleBlockEvent(event).catch(() => {});
else if (event.type === "tx") handleTxEvent(event).catch(() => {});
} catch {}
}
}
} catch (e: any) {
console.error("SSE error:", e.message);
}
console.log("SSE: reconnecting in 3s...");
await sleep(3000);
}
}
export async function checkPayments() {
await sql`
UPDATE payments SET status = 'expired'
WHERE status IN ('pending', 'underpaid', 'confirming') AND expires_at < now()
`;
await refreshMaps();
const payments = await sql`
SELECT * FROM payments WHERE status IN ('pending', 'underpaid', 'confirming') AND expires_at >= now()
`;
if (payments.length === 0) return;
let bulk: Record<string, any> = {};
try { bulk = await getAddressInfoBulk(payments.map((p: any) => p.address)); } catch {}
for (const payment of payments) {
try {
let info = bulk[payment.address];
if (!info) try { info = await getAddressInfo(payment.address); } catch { continue; }
if (!info || info.error) continue;
const inbound = (info.transactions ?? []).filter((tx: any) => tx.amount > 0);
for (const tx of inbound) {
if (!tx.hash) continue;
await recordTx(payment.id, payment.address, tx.hash, Number(tx.amount ?? 0), tx.block != null);
}
await evaluatePayment(payment.id);
} catch (e) {
console.error(`Error checking payment ${payment.id}:`, e);
}
}
}
export function watchPayment(payment: any) {
addressMap.set(payment.address, payment);
}
interface StackEntry { plan: string; remaining_days: number | null }
interface AccountState { plan: string; plan_expires_at: Date | null; plan_stack: StackEntry[] }
interface AccountUpdate { plan: string; plan_expires_at: Date | null; plan_stack: StackEntry[] }
export function insertIntoStack(stack: StackEntry[], entry: StackEntry): StackEntry[] {
const result = stack.slice();
const existing = result.findIndex(e => e.plan === entry.plan);
if (existing !== -1) {
const old = result[existing];
if (old.remaining_days === null || entry.remaining_days === null) {
result[existing] = { plan: entry.plan, remaining_days: null };
} else {
result[existing] = { plan: entry.plan, remaining_days: old.remaining_days + entry.remaining_days };
}
result.sort((a, b) => planTier(b.plan) - planTier(a.plan));
return result;
}
const tier = planTier(entry.plan);
let i = 0;
while (i < result.length && planTier(result[i].plan) >= tier) i++;
result.splice(i, 0, entry);
return result;
}
export function computeApplyPlan(
acc: AccountState,
payment: { plan: string; months: number | null },
now: Date
): AccountUpdate {
const stack = (acc.plan_stack || []).slice();
const newPlan = payment.plan;
const newDays = payment.plan === "lifetime" ? null : (payment.months ?? 1) * 30;
const currentExpiry = acc.plan_expires_at ? new Date(acc.plan_expires_at) : null;
const currentIsActive = acc.plan === "lifetime"
|| (acc.plan !== "free" && currentExpiry && currentExpiry > now);
if (!currentIsActive || acc.plan === "free") {
const expiresAt = newDays != null ? new Date(now.getTime() + newDays * 86400000) : null;
return { plan: newPlan, plan_expires_at: expiresAt, plan_stack: stack };
}
if (newPlan === acc.plan && newDays != null && currentExpiry) {
const extended = new Date(currentExpiry.getTime() + newDays * 86400000);
return { plan: acc.plan, plan_expires_at: extended, plan_stack: stack };
}
if (planTier(newPlan) > planTier(acc.plan)) {
const remainingDays = acc.plan === "lifetime"
? null
: Math.ceil((currentExpiry!.getTime() - now.getTime()) / 86400000);
const newStack = insertIntoStack(stack, { plan: acc.plan, remaining_days: remainingDays });
const expiresAt = newDays != null ? new Date(now.getTime() + newDays * 86400000) : null;
return { plan: newPlan, plan_expires_at: expiresAt, plan_stack: newStack };
}
const newStack = insertIntoStack(stack, { plan: newPlan, remaining_days: newDays });
return { plan: acc.plan, plan_expires_at: currentExpiry, plan_stack: newStack };
}
export function computeExpiry(acc: AccountState, now: Date): AccountUpdate | null {
if (!["pro", "pro2x", "pro4x"].includes(acc.plan)) return null;
if (!acc.plan_expires_at || new Date(acc.plan_expires_at) >= now) return null;
const stack = (acc.plan_stack || []).slice();
while (stack.length > 0) {
const next = stack.shift()!;
if (next.remaining_days === null) {
return { plan: next.plan, plan_expires_at: null, plan_stack: stack };
}
if (next.remaining_days > 0) {
const expiresAt = new Date(now.getTime() + next.remaining_days * 86400000);
return { plan: next.plan, plan_expires_at: expiresAt, plan_stack: stack };
}
}
return { plan: "free", plan_expires_at: null, plan_stack: [] };
}
async function applyPlan(payment: any) {
try {
await sql.begin(async (tx) => {
const [acc] = await tx`
SELECT plan, plan_expires_at, plan_stack FROM accounts WHERE id = ${payment.account_id} FOR UPDATE
`;
const stack = typeof acc.plan_stack === "string" ? JSON.parse(acc.plan_stack) : (acc.plan_stack || []);
const update = computeApplyPlan(
{ plan: acc.plan, plan_expires_at: acc.plan_expires_at, plan_stack: stack },
{ plan: payment.plan, months: payment.months },
new Date()
);
await tx`
UPDATE accounts SET plan = ${update.plan},
plan_expires_at = ${update.plan_expires_at?.toISOString() ?? null},
plan_stack = ${sql.json(update.plan_stack)}
WHERE id = ${payment.account_id}
`;
});
console.log(`Payment ${payment.id} activated: ${payment.plan} for account ${payment.account_id}`);
} catch (e) {
console.error(`applyPlan failed for payment ${payment.id}:`, e);
}
}
export async function expireProPlans() {
const expired = await sql`
SELECT id, plan, plan_expires_at, plan_stack FROM accounts
WHERE plan IN ('pro', 'pro2x', 'pro4x') AND plan_expires_at IS NOT NULL AND plan_expires_at < now()
`;
if (expired.length === 0) return;
const now = new Date();
for (const acc of expired) {
const stack = typeof acc.plan_stack === "string" ? JSON.parse(acc.plan_stack) : (acc.plan_stack || []);
const update = computeExpiry(
{ plan: acc.plan, plan_expires_at: acc.plan_expires_at, plan_stack: stack },
now
);
if (!update) continue;
await sql`
UPDATE accounts SET plan = ${update.plan},
plan_expires_at = ${update.plan_expires_at?.toISOString() ?? null},
plan_stack = ${sql.json(update.plan_stack)}
WHERE id = ${acc.id}
`;
console.log(`Account ${acc.id}: ${acc.plan} expired → ${update.plan}`);
}
}
function sleep(ms: number) { return new Promise(r => setTimeout(r, ms)); }
startSSE();