diff --git a/apps/api/src/cache/monitor-list.ts b/apps/api/src/cache/monitor-list.ts new file mode 100644 index 0000000..729bf70 --- /dev/null +++ b/apps/api/src/cache/monitor-list.ts @@ -0,0 +1,62 @@ +// In-memory cache of the enabled monitor list, keyed by region. The /internal/due +// endpoint is polled by every runner roughly once a second per region; without +// this cache each poll re-runs a 500-row scan against `monitors` with an array +// predicate, which dominates the api's Postgres traffic at any real fleet size. +// +// The list almost never changes between polls — monitor create/edit/delete is at +// most a few times per hour. So we memoize per-region with a short TTL and bust +// the cache from the monitor mutation handlers so edits are visible instantly. + +import sql from "../db"; + +const TTL_MS = 5000; + +type MonitorRow = Record; +type Entry = { rows: MonitorRow[]; expiresAt: number }; + +const cache = new Map(); +const inflight = new Map>(); + +async function fetchForRegion(region: string): Promise { + return sql` + SELECT id, url, method, request_headers, request_body, timeout_ms, interval_s, query, regions, + max_retries, retry_interval_s, created_at + FROM monitors + WHERE enabled = true + AND ( + array_length(regions, 1) IS NULL + OR regions = '{}' + OR ${region} = ANY(regions) + ) + LIMIT 500 + `; +} + +export async function getMonitorsForRegion(region: string): Promise { + const now = Date.now(); + const hit = cache.get(region); + if (hit && hit.expiresAt > now) return hit.rows; + + // Coalesce concurrent refreshes for the same region so a thundering herd of + // runner polls doesn't fan out into N parallel SELECTs against an expired + // entry. + let pending = inflight.get(region); + if (!pending) { + pending = fetchForRegion(region) + .then((rows) => { + cache.set(region, { rows, expiresAt: Date.now() + TTL_MS }); + return rows; + }) + .finally(() => { inflight.delete(region); }); + inflight.set(region, pending); + } + return pending; +} + +// Called by monitor create/patch/delete/toggle handlers. Wipes the entire +// region map — fine because (a) entries are tiny, (b) refresh is cheap, and +// (c) we don't know which regions a freshly-edited monitor belongs to without +// reading it back. Simpler than per-region invalidation, identical net effect. +export function invalidateMonitorList(): void { + cache.clear(); +} diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 82cd58c..ebb252c 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -2,7 +2,7 @@ import { Elysia } from "elysia"; import { ingest } from "./routes/pings"; import { monitors } from "./routes/monitors"; import { account } from "./routes/auth"; -import { internal } from "./routes/internal"; +import { internal, startPruneJob } from "./routes/internal"; import { channels } from "./routes/channels"; import { statusPages } from "./routes/status_pages"; import { incidents } from "./routes/incidents"; @@ -21,6 +21,7 @@ process.on("uncaughtException", (err) => { await migrate(); await startRollupJob(); +startPruneJob(); const elysia = new Elysia() .get("/", () => ({ diff --git a/apps/api/src/jobs/rollup.ts b/apps/api/src/jobs/rollup.ts index d58fed0..4942970 100644 --- a/apps/api/src/jobs/rollup.ts +++ b/apps/api/src/jobs/rollup.ts @@ -4,8 +4,13 @@ import sql from "../db"; // widgets can compute uptime % over arbitrary windows without ever scanning the // pings table at read time. Two resolutions: hourly and daily. // -// Each pass aggregates the *current* bucket only. The query is bounded by the -// bucket size, not the table size, so it's cheap regardless of history depth. +// Watermark model: rollup_watermarks(bucket_type) records the most recent +// pings.checked_at that has been folded into this bucket_type's rollup rows. +// Each periodic pass scans ONLY pings newer than the watermark, then merges +// them into existing rollup rows additively (total = total + new_total, etc.) +// via ON CONFLICT … DO UPDATE. This makes per-pass work proportional to the +// delta of new pings, not the bucket size — critical once a single account has +// thousands of monitors. type BucketType = "hourly" | "daily"; @@ -14,11 +19,49 @@ const BUCKET_TRUNC: Record = { daily: "day", }; -async function rollupCurrent(bucket: BucketType): Promise { +// Pull the current watermark for a bucket type. First call after a fresh +// deploy returns epoch (1970-01-01) so the first pass folds the entire +// retention window in one go. +async function getWatermark(bucket: BucketType): Promise { + const [row] = await sql<{ last_aggregated_at: Date }[]>` + SELECT last_aggregated_at FROM rollup_watermarks WHERE bucket_type = ${bucket} + `; + return row?.last_aggregated_at ?? new Date(0); +} + +async function setWatermark(bucket: BucketType, ts: Date): Promise { + await sql` + INSERT INTO rollup_watermarks (bucket_type, last_aggregated_at) + VALUES (${bucket}, ${ts}) + ON CONFLICT (bucket_type) DO UPDATE SET last_aggregated_at = EXCLUDED.last_aggregated_at + `; +} + +// Aggregate every ping newer than the watermark into the appropriate bucket +// row. Merges into existing rows additively so we never re-scan rows that +// were already folded in on a previous pass. The avg_latency merge is a +// weighted average over the running totals. +async function rollupSinceWatermark(bucket: BucketType): Promise { const trunc = BUCKET_TRUNC[bucket]; + const watermark = await getWatermark(bucket); + + // Capture the upper bound BEFORE the aggregation runs so we don't miss any + // pings that arrive between SELECT and the watermark write. Anything ingested + // after `boundary` will be picked up on the next pass. + const boundary = new Date(); + // GROUP BY 1,2,4 (ordinals) instead of repeating the date_trunc expression — // when the unit is a $-bound parameter, Postgres won't recognize the two // expressions as identical and will reject the column. Ordinals are safe. + // + // The ON CONFLICT merge formula: + // total = old + new + // up_count = old + new + // avg_latency = weighted average of old and new, weighted by their totals + // (NULLIF guards against the degenerate empty-bucket case) + // The weighted-average formula is mathematically identical to a one-shot + // SUM(latency)/SUM(total) recompute, so the merged value is byte-equal to + // what a full rescan would produce. const result = await sql` INSERT INTO monitor_uptime_rollup (monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency) SELECT @@ -30,22 +73,31 @@ async function rollupCurrent(bucket: BucketType): Promise { count(*) FILTER (WHERE up)::int AS up_count, avg(latency_ms)::real AS avg_latency FROM pings - WHERE checked_at >= date_trunc(${trunc}, now()) + WHERE checked_at > ${watermark} AND checked_at <= ${boundary} GROUP BY 1, 2, 3, 4 ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO UPDATE SET - total = EXCLUDED.total, - up_count = EXCLUDED.up_count, - avg_latency = EXCLUDED.avg_latency + total = monitor_uptime_rollup.total + EXCLUDED.total, + up_count = monitor_uptime_rollup.up_count + EXCLUDED.up_count, + avg_latency = ( + COALESCE(monitor_uptime_rollup.avg_latency, 0) * monitor_uptime_rollup.total + + COALESCE(EXCLUDED.avg_latency, 0) * EXCLUDED.total + ) / NULLIF(monitor_uptime_rollup.total + EXCLUDED.total, 0) `; + + await setWatermark(bucket, boundary); return result.count ?? 0; } -// Walk back N units and aggregate any buckets that don't exist yet. Used at -// startup so a freshly-deployed system has historical data immediately. -async function backfillRecent(bucket: BucketType, units: number): Promise { +// One-shot recompute over an arbitrary window, fully overwriting matched rows. +// Used for the startup backfill and the "still empty after backfill" force-run. +// Takes an explicit upper boundary so the caller can capture it BEFORE running +// the recompute and use the same value for the watermark write afterwards — +// any ping with checked_at > boundary is guaranteed to be outside this window +// and will be picked up by the first incremental pass instead. This closes +// the race where a ping ingested between boundary capture and the SELECT +// could otherwise be folded in by both recompute and incremental. +async function recomputeWindow(bucket: BucketType, units: number, boundary: Date): Promise { const trunc = BUCKET_TRUNC[bucket]; - // Build the interval string entirely in JS so postgres.js binds a single text - // parameter. Avoids the int || text type-mismatch trap inside SQL. const intervalLiteral = `${units} ${trunc}s`; const result = await sql` INSERT INTO monitor_uptime_rollup (monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency) @@ -58,9 +110,13 @@ async function backfillRecent(bucket: BucketType, units: number): Promise= date_trunc(${trunc}, now()) - ${intervalLiteral}::interval + WHERE checked_at >= date_trunc(${trunc}, ${boundary}::timestamptz) - ${intervalLiteral}::interval + AND checked_at <= ${boundary} GROUP BY 1, 2, 3, 4 - ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO NOTHING + ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO UPDATE SET + total = EXCLUDED.total, + up_count = EXCLUDED.up_count, + avg_latency = EXCLUDED.avg_latency `; return result.count ?? 0; } @@ -78,36 +134,45 @@ export async function startRollupJob() { if (started) return; started = true; - // Startup backfill. Errors are logged BUT NOT swallowed silently anymore — - // we throw so a broken rollup query trips the api process and shows in the - // service logs immediately, instead of leaving the table mysteriously empty. + // Startup backfill. Capture the boundary FIRST, then run the one-shot + // recompute bounded by it. The watermark is then set to the same boundary, + // so any ping with checked_at > boundary is guaranteed to be picked up by + // the first incremental pass — never folded in twice and never missed. try { + const boundary = new Date(); const [h, d] = await Promise.all([ - backfillRecent("hourly", 48), - backfillRecent("daily", 90), + recomputeWindow("hourly", 48, boundary), + recomputeWindow("daily", 90, boundary), ]); console.log(`[rollup] backfilled rows: hourly=${h} daily=${d}`); + await Promise.all([ + setWatermark("hourly", boundary), + setWatermark("daily", boundary), + ]); } catch (e) { console.error("[rollup] backfill FAILED — rollup table will be empty until fixed:", e); } // Force-run check: if any bucket type is still empty after the backfill, - // run rollupCurrent immediately so we have at least one row per type. This - // covers the "fresh deploy with very recent pings only" case. + // run an incremental pass immediately so we have at least one row per type. + // Covers the "fresh deploy with very recent pings only" case. try { for (const b of ["hourly", "daily"] as BucketType[]) { if (await rollupIsEmpty(b)) { - console.log(`[rollup] ${b} still empty — forcing current-bucket aggregation`); - await rollupCurrent(b); + console.log(`[rollup] ${b} still empty — forcing incremental aggregation`); + // Reset watermark so the pass picks up everything in retention. + await setWatermark(b, new Date(0)); + await rollupSinceWatermark(b); } } } catch (e) { console.error("[rollup] force-run check failed:", e); } - // Periodic refreshes for the *current* bucket of each resolution. Each query - // is bounded by the current bucket only (date_trunc(...)) so it stays cheap - // even at high cadence. - setInterval(() => { rollupCurrent("hourly").catch((e) => console.warn("[rollup] hourly failed:", e)); }, 30 * 1000); // every 30s - setInterval(() => { rollupCurrent("daily").catch((e) => console.warn("[rollup] daily failed:", e)); }, 5 * 60 * 1000); // every 5min + // Periodic incremental refreshes. Each pass scans only pings newer than the + // last watermark, so the work is proportional to the delta — not the bucket + // size. Hourly runs frequently so the current-hour bar appears quickly for + // fresh monitors; daily can run less often. + setInterval(() => { rollupSinceWatermark("hourly").catch((e) => console.warn("[rollup] hourly failed:", e)); }, 30 * 1000); // every 30s + setInterval(() => { rollupSinceWatermark("daily").catch((e) => console.warn("[rollup] daily failed:", e)); }, 5 * 60 * 1000); // every 5min } diff --git a/apps/api/src/routes/internal.ts b/apps/api/src/routes/internal.ts index c6f13b2..55bcd82 100644 --- a/apps/api/src/routes/internal.ts +++ b/apps/api/src/routes/internal.ts @@ -1,16 +1,91 @@ import { Elysia } from "elysia"; import sql from "../db"; import { safeTokenCompare } from "../../../shared/auth"; +import { getMonitorsForRegion } from "../cache/monitor-list"; +// Chunked retention prune. A single big DELETE on the pings table holds locks +// for the duration of the scan, blocks autovacuum from reclaiming dead tuples, +// and can stall replication. We loop in 10k-row batches so each commit +// releases its locks and lets autovacuum keep up. Total wall-clock is similar +// to a one-shot DELETE but the per-statement impact on the rest of the system +// is dramatically lower. +const PRUNE_BATCH_SIZE = 10_000; export async function pruneOldPings(retentionDays = 90) { - const result = await sql`DELETE FROM pings WHERE checked_at < now() - ${retentionDays + ' days'}::interval`; - return result.count; + const interval = `${retentionDays} days`; + let total = 0; + while (true) { + const result = await sql` + WITH victims AS ( + SELECT id FROM pings + WHERE checked_at < now() - ${interval}::interval + LIMIT ${PRUNE_BATCH_SIZE} + ) + DELETE FROM pings WHERE id IN (SELECT id FROM victims) + `; + const batch = result.count ?? 0; + total += batch; + if (batch < PRUNE_BATCH_SIZE) break; + } + return total; } -setInterval(() => { - const days = Number(process.env.PING_RETENTION_DAYS ?? 90); - pruneOldPings(days).catch((err) => console.error("Retention cleanup failed:", err)); -}, 60 * 60 * 1000); +// Periodic prune is gated behind a Postgres session-level advisory lock so +// horizontally scaled api replicas don't race each other. Without the lock, +// two replicas running the same chunked DELETE would interleave their +// LIMIT 10000 batches and compete for row locks. The lock is acquired with +// pg_try_advisory_lock so a busy replica skips its tick instead of blocking. +// +// Session-level locks live on a specific Postgres backend connection, so we +// reserve a connection from the pool with sql.reserve() and run the lock, +// the chunked prune, and the unlock all on it. Releasing the reservation +// returns the connection to the pool. If the api process crashes mid-prune, +// the backend dies with it and Postgres releases the lock automatically. +// +// 134678338 is just an arbitrary lock id unique within this app. If we add +// more global jobs later, give each its own constant in this file. +const PRUNE_LOCK_ID = 134678338; +let pruneJobStarted = false; + +async function runPruneTickWithLock(retentionDays: number): Promise { + const reserved = await (sql as any).reserve(); + try { + const [{ locked }] = await reserved` + SELECT pg_try_advisory_lock(${PRUNE_LOCK_ID}) AS locked + `; + if (!locked) return; // another replica is pruning right now + try { + const interval = `${retentionDays} days`; + let total = 0; + while (true) { + const result = await reserved` + WITH victims AS ( + SELECT id FROM pings + WHERE checked_at < now() - ${interval}::interval + LIMIT ${PRUNE_BATCH_SIZE} + ) + DELETE FROM pings WHERE id IN (SELECT id FROM victims) + `; + const batch = result.count ?? 0; + total += batch; + if (batch < PRUNE_BATCH_SIZE) break; + } + if (total > 0) console.log(`[prune] retention pruned ${total} pings (>${retentionDays}d)`); + } finally { + await reserved`SELECT pg_advisory_unlock(${PRUNE_LOCK_ID})`; + } + } finally { + reserved.release(); + } +} + +export function startPruneJob() { + if (pruneJobStarted) return; + pruneJobStarted = true; + setInterval(() => { + const days = Number(process.env.PING_RETENTION_DAYS ?? 90); + runPruneTickWithLock(days).catch((err) => console.error("Retention cleanup failed:", err)); + }, 60 * 60 * 1000); +} export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } }) .derive(({ headers, set }) => { @@ -37,18 +112,13 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } // creation is a tick. We pull all enabled monitors that match this // region, compute the next tick in JS, and return the ones whose next // tick falls within the lookahead window. - const monitors = await sql` - SELECT id, url, method, request_headers, request_body, timeout_ms, interval_s, query, regions, - max_retries, retry_interval_s, created_at - FROM monitors - WHERE enabled = true - AND ( - array_length(regions, 1) IS NULL - OR regions = '{}' - OR ${region} = ANY(regions) - ) - LIMIT 500 - `; + // + // The monitor list itself is memoized in apps/api/src/cache/monitor-list.ts + // with a 5s TTL — runners poll this endpoint roughly once a second per + // region, but the underlying list almost never changes between polls. The + // cache is busted from monitor create/patch/delete/toggle so edits show up + // immediately. + const monitors = await getMonitorsForRegion(region); const nowMs = Date.now(); const lookaheadEnd = nowMs + lookaheadMs; diff --git a/apps/api/src/routes/monitors.ts b/apps/api/src/routes/monitors.ts index 7674f6e..5e9ae0f 100644 --- a/apps/api/src/routes/monitors.ts +++ b/apps/api/src/routes/monitors.ts @@ -3,6 +3,7 @@ import { requireAuth } from "./auth"; import sql from "../db"; import { validateMonitorUrl } from "../utils/ssrf"; import { getPlanLimits } from "../../../shared/plans"; +import { invalidateMonitorList } from "../cache/monitor-list"; const MonitorBody = t.Object({ name: t.String({ maxLength: 200, description: "Human-readable name" }), @@ -111,6 +112,7 @@ export const monitors = new Elysia({ prefix: "/monitors" }) `; if (body.channel_ids) await replaceMonitorChannels(monitor.id, accountId, body.channel_ids); if (body.tags) await replaceMonitorTags(monitor.id, body.tags); + invalidateMonitorList(); return monitor; }, { body: MonitorBody, detail: { summary: "Create monitor", tags: ["monitors"] } }) @@ -177,6 +179,7 @@ export const monitors = new Elysia({ prefix: "/monitors" }) if (!monitor) { set.status = 404; return { error: "Not found" }; } if (body.channel_ids) await replaceMonitorChannels(monitor.id, accountId, body.channel_ids); if (body.tags) await replaceMonitorTags(monitor.id, body.tags); + invalidateMonitorList(); return monitor; }, { body: t.Partial(MonitorBody), detail: { summary: "Update monitor", tags: ["monitors"] } }) @@ -185,6 +188,7 @@ export const monitors = new Elysia({ prefix: "/monitors" }) DELETE FROM monitors WHERE id = ${params.id} AND account_id = ${accountId} RETURNING id `; if (!deleted) { set.status = 404; return { error: "Not found" }; } + invalidateMonitorList(); return { deleted: true }; }, { detail: { summary: "Delete monitor", tags: ["monitors"] } }) @@ -195,6 +199,7 @@ export const monitors = new Elysia({ prefix: "/monitors" }) RETURNING id, enabled `; if (!monitor) { set.status = 404; return { error: "Not found" }; } + invalidateMonitorList(); return monitor; }, { detail: { summary: "Toggle monitor on/off", tags: ["monitors"] } }) diff --git a/apps/api/src/routes/pings.ts b/apps/api/src/routes/pings.ts index 287f9e9..d87e2e1 100644 --- a/apps/api/src/routes/pings.ts +++ b/apps/api/src/routes/pings.ts @@ -56,10 +56,28 @@ export const ingest = new Elysia() const token = headers["x-monitor-token"]; if (!safeTokenCompare(token, process.env.MONITOR_TOKEN)) { set.status = 401; return { error: "Unauthorized" }; } - const [monitor_check] = await sql` - SELECT id, account_id, name, url, resend_interval, cert_alert_days - FROM monitors WHERE id = ${body.monitor_id} - `; + // Per-region transition state. Region is always populated by current runners; + // legacy null values from older pings collapse to "default" so state and + // notifications never carry an empty label. + const region = body.region && body.region.length > 0 ? body.region : 'default'; + + // The monitor lookup and the per-region state lookup are independent — + // the state row's primary key doesn't depend on anything from the monitor + // row. Fire them in parallel to halve the wall-clock cost on the hottest + // path in the system. (Combining them into a JOIN is a wash on a warm + // pool: both sides are PK lookups, and a JOIN just adds nested-loop + // planner overhead. Promise.all keeps each query's plan trivial.) + const [[monitor_check], [stateRow]] = await Promise.all([ + sql` + SELECT id, account_id, name, url, resend_interval, cert_alert_days + FROM monitors WHERE id = ${body.monitor_id} + `, + sql` + SELECT last_state, consecutive_down, cert_alert_sent + FROM monitor_region_state + WHERE monitor_id = ${body.monitor_id} AND region = ${region} + `, + ]); if (!monitor_check) { set.status = 404; return { error: "Monitor not found" }; } const meta = body.meta ? { ...body.meta } : {}; @@ -72,16 +90,6 @@ export const ingest = new Elysia() const scheduledAt = body.scheduled_at ? new Date(body.scheduled_at) : null; const jitterMs = body.jitter_ms ?? null; - // Per-region transition state. Region is always populated by current runners; - // legacy null values from older pings collapse to "default" so state and - // notifications never carry an empty label. - const region = body.region && body.region.length > 0 ? body.region : 'default'; - const [stateRow] = await sql` - SELECT last_state, consecutive_down, cert_alert_sent - FROM monitor_region_state - WHERE monitor_id = ${body.monitor_id} AND region = ${region} - `; - const newState = body.up ? 'up' : 'down'; const prevState: string | null = stateRow?.last_state ?? null; let consecutiveDown: number = stateRow?.consecutive_down ?? 0; diff --git a/apps/api/src/routes/status_pages.ts b/apps/api/src/routes/status_pages.ts index f829968..b8b3121 100644 --- a/apps/api/src/routes/status_pages.ts +++ b/apps/api/src/routes/status_pages.ts @@ -62,17 +62,22 @@ async function replaceGroupsAndMonitors( if (groups !== undefined) { await sql`DELETE FROM status_page_groups WHERE status_page_id = ${pageId}`; } + // Single bulk INSERT instead of one round-trip per group. The RETURNING set + // comes back in INSERT order, which equals the array order — that lets us + // map index → id without a follow-up SELECT. Mirrors the bulk insert pattern + // used by the monitors block right below. const groupIds: string[] = []; if (groups && groups.length > 0) { - for (let i = 0; i < groups.length; i++) { - const g = groups[i]!; - const [row] = await sql<{ id: string }[]>` - INSERT INTO status_page_groups (status_page_id, name, position) - VALUES (${pageId}, ${g.name}, ${g.position ?? i}) - RETURNING id - `; - groupIds.push(row!.id); - } + const rows = groups.map((g, i) => ({ + status_page_id: pageId, + name: g.name, + position: g.position ?? i, + })); + const inserted = await sql<{ id: string }[]>` + INSERT INTO status_page_groups ${sql(rows, "status_page_id", "name", "position")} + RETURNING id + `; + for (const r of inserted) groupIds.push(r.id); } if (monitorsList !== undefined) { diff --git a/apps/shared/db.ts b/apps/shared/db.ts index fac68ef..c908b1a 100644 --- a/apps/shared/db.ts +++ b/apps/shared/db.ts @@ -243,6 +243,17 @@ export async function migrate(sql: any) { `; await sql`CREATE INDEX IF NOT EXISTS idx_uptime_rollup_lookup ON monitor_uptime_rollup(monitor_id, bucket_type, bucket_start DESC)`; + // Watermark table for the rollup job. Each row records the most recent + // pings.checked_at that has been folded into a given bucket_type's rollup + // rows. Lets the periodic rollup pass scan only NEW pings since the last + // pass instead of re-aggregating the entire current bucket on every tick. + await sql` + CREATE TABLE IF NOT EXISTS rollup_watermarks ( + bucket_type TEXT PRIMARY KEY, + last_aggregated_at TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0) + ) + `; + await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`; diff --git a/apps/status/src/data.ts b/apps/status/src/data.ts index d68614e..27f4fd7 100644 --- a/apps/status/src/data.ts +++ b/apps/status/src/data.ts @@ -53,63 +53,11 @@ export interface MonitorRow { latency_history: Array<{ region: string; latency_ms: number | null; ts: string }>; } -// Single SQL pass that produces all four uptime windows for a set of monitors. -// Reads only the rollup table; falls back to a pings aggregate when the rollup -// has nothing for these monitors yet (same pattern as loadMonitors). -export async function loadMultiWindowUptime(monitorIds: string[]): Promise> { - const empty: Record = {}; - if (monitorIds.length === 0) return empty; - for (const id of monitorIds) empty[id] = { d24: null, d7: null, d30: null, d90: null }; - - const ids = sql.array(monitorIds); - - let rows = await sql` - SELECT monitor_id, - (sum(up_count) FILTER (WHERE bucket_type='hourly' AND bucket_start > now() - interval '24 hours'))::float - / NULLIF(sum(total) FILTER (WHERE bucket_type='hourly' AND bucket_start > now() - interval '24 hours'), 0) AS pct_24h, - (sum(up_count) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '7 days'))::float - / NULLIF(sum(total) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '7 days'), 0) AS pct_7d, - (sum(up_count) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '30 days'))::float - / NULLIF(sum(total) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '30 days'), 0) AS pct_30d, - (sum(up_count) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '90 days'))::float - / NULLIF(sum(total) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '90 days'), 0) AS pct_90d - FROM monitor_uptime_rollup - WHERE monitor_id = ANY(${ids}::text[]) - GROUP BY 1 - `; - - // Fallback when the rollup is empty: aggregate directly from pings. Bounded - // by the 90d window so it's still cheap. - if (rows.length === 0) { - rows = await sql` - SELECT monitor_id, - (count(*) FILTER (WHERE up AND checked_at > now() - interval '24 hours'))::float - / NULLIF(count(*) FILTER (WHERE checked_at > now() - interval '24 hours'), 0) AS pct_24h, - (count(*) FILTER (WHERE up AND checked_at > now() - interval '7 days'))::float - / NULLIF(count(*) FILTER (WHERE checked_at > now() - interval '7 days'), 0) AS pct_7d, - (count(*) FILTER (WHERE up AND checked_at > now() - interval '30 days'))::float - / NULLIF(count(*) FILTER (WHERE checked_at > now() - interval '30 days'), 0) AS pct_30d, - (count(*) FILTER (WHERE up AND checked_at > now() - interval '90 days'))::float - / NULLIF(count(*) FILTER (WHERE checked_at > now() - interval '90 days'), 0) AS pct_90d - FROM pings - WHERE monitor_id = ANY(${ids}::text[]) - AND checked_at > now() - interval '90 days' - GROUP BY 1 - `; - } - - const out = empty; - const toPct = (v: any): number | null => v == null ? null : +(Number(v) * 100).toFixed(2); - for (const r of rows) { - out[r.monitor_id] = { - d24: toPct(r.pct_24h), - d7: toPct(r.pct_7d), - d30: toPct(r.pct_30d), - d90: toPct(r.pct_90d), - }; - } - return out; -} +// Multi-window uptime (24h / 7d / 30d / 90d) is now derived from the same +// rollup row set that loadMonitors pulls for the bar chart — see the in-JS +// aggregation pass below. This used to be a second SQL round-trip running +// four FILTER aggregates that redid arithmetic the raw bucket rows already +// contained. export interface GroupRow { id: string; @@ -189,85 +137,186 @@ export async function loadMonitors( }); } - // Step 3: uptime rollup buckets covering the requested window. The bar - // frequency + count are admin-controlled per page (independent of the - // multi-window uptime cells). We keep region in the result so JS can pick - // the fastest region per monitor and emit per-bucket latency from just - // that region (status pages are customer-facing — show the best line). + // Step 3: ONE unified rollup query covering everything we need: + // - bar chart: bucket_type = barFrequency, last barCount buckets + // - multi-window uptime: hourly back 24h + daily back 90d + // - latency sparkline: hourly back 30h + // + // Union of all of those is "hourly back N hours OR daily back N days" with + // N chosen to cover whichever consumer needs the widest window. The rows + // are then partitioned by purpose entirely in JS — no second round-trip, + // no duplicate FILTER aggregates inside Postgres. const bucket: BucketType = barFrequency; const count = Math.max(1, Math.min(180, barCount)); - const truncUnit = bucket === "hourly" ? "hour" : "day"; - const intervalLiteral = `${count} ${truncUnit}s`; + + // Hourly span has to cover the latency sparkline (30h) AND the bar chart if + // it's hourly (up to 180h). +2h slack so the truncated bucket boundary at + // the start of the window is included even if we cross an hour during the + // request. + const hourlyBackHours = Math.max(30, bucket === "hourly" ? count : 0) + 2; + // Daily span has to cover multi-window uptime (90d) AND the bar chart if + // it's daily (up to 180d). +1d slack for the same reason. + const dailyBackDays = Math.max(90, bucket === "daily" ? count : 0) + 1; + const hourlyInterval = `${hourlyBackHours} hours`; + const dailyInterval = `${dailyBackDays} days`; + let rollupRows = await sql` - SELECT monitor_id, region, bucket_start, total, up_count, avg_latency + SELECT monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency FROM monitor_uptime_rollup WHERE monitor_id = ANY(${sql.array(ids)}::text[]) - AND bucket_type = ${bucket} - AND bucket_start > date_trunc(${truncUnit}, now()) - ${intervalLiteral}::interval - ORDER BY monitor_id, region, bucket_start ASC + AND ( + (bucket_type = 'hourly' AND bucket_start > now() - ${hourlyInterval}::interval) + OR + (bucket_type = 'daily' AND bucket_start > now() - ${dailyInterval}::interval) + ) + ORDER BY monitor_id, bucket_type, region, bucket_start ASC `; // Fallback: if the rollup table has nothing for any of these monitors in - // this window (e.g. the api hasn't backfilled yet, or the rollup job is - // silently broken), aggregate directly from pings. Bounded by the window so - // it stays cheap. Once the rollup catches up this branch never fires. + // either bucket type (cold deploy, broken job), aggregate directly from + // pings. Produces both bucket types via UNION ALL so downstream JS doesn't + // need to know which path it came from. Bounded by the wider of the two + // windows so it stays cheap. Once the rollup catches up this never fires. if (rollupRows.length === 0) { rollupRows = await sql` - SELECT - monitor_id, - COALESCE(region, 'default') AS region, - date_trunc(${truncUnit}, checked_at) AS bucket_start, - count(*)::int AS total, - count(*) FILTER (WHERE up)::int AS up_count, - avg(latency_ms)::real AS avg_latency - FROM pings - WHERE monitor_id = ANY(${sql.array(ids)}::text[]) - AND checked_at > date_trunc(${truncUnit}, now()) - ${intervalLiteral}::interval - GROUP BY 1, 2, 3 - ORDER BY 1, 2, 3 ASC + ( + SELECT + monitor_id, + COALESCE(region, 'default') AS region, + 'hourly'::text AS bucket_type, + date_trunc('hour', checked_at) AS bucket_start, + count(*)::int AS total, + count(*) FILTER (WHERE up)::int AS up_count, + avg(latency_ms)::real AS avg_latency + FROM pings + WHERE monitor_id = ANY(${sql.array(ids)}::text[]) + AND checked_at > now() - ${hourlyInterval}::interval + GROUP BY 1, 2, 4 + ) + UNION ALL + ( + SELECT + monitor_id, + COALESCE(region, 'default') AS region, + 'daily'::text AS bucket_type, + date_trunc('day', checked_at) AS bucket_start, + count(*)::int AS total, + count(*) FILTER (WHERE up)::int AS up_count, + avg(latency_ms)::real AS avg_latency + FROM pings + WHERE monitor_id = ANY(${sql.array(ids)}::text[]) + AND checked_at > now() - ${dailyInterval}::interval + GROUP BY 1, 2, 4 + ) `; } - // Single pass over rollup rows builds three indices: - // indexed[mid][isoStart] → cross-region {total, up} for bar coloring - // regionLat[mid][region] → cross-window weighted latency for picking fastest region - // regionBucketLat[mid][region][isoStart] → per-bucket latency for the fastest-region tooltip lookup - const indexed: Record> = {}; - const regionLat: Record> = {}; - const regionBucketLat: Record>> = {}; + // Single pass over the unified rows builds every index we need: + // barIndexed[mid][isoStart] → cross-region {total, up} for bar coloring (only rows of barFrequency) + // barRegionLat[mid][region] → weighted latency over the bar window for picking fastest region + // barRegionBucketLat[mid][region][iso] → per-bucket latency in the fastest region (only rows of barFrequency) + // windowTotals[mid][windowKey] → {up, total} per uptime window (24h/7d/30d/90d) + // latByMonitor[mid][] → 30h hourly latency sparkline rows + const barIndexed: Record> = {}; + const barRegionLat: Record> = {}; + const barRegionBucketLat: Record>> = {}; + + type WindowKey = "d24" | "d7" | "d30" | "d90"; + const windowTotals: Record> = {}; + const initWindowTotals = (mid: string) => { + if (!windowTotals[mid]) { + windowTotals[mid] = { + d24: { up: 0, total: 0 }, + d7: { up: 0, total: 0 }, + d30: { up: 0, total: 0 }, + d90: { up: 0, total: 0 }, + }; + } + return windowTotals[mid]!; + }; + + const latByMonitor: Record = {}; + + const nowMs = Date.now(); + const ms24h = 24 * 3600_000; + const ms7d = 7 * 86_400_000; + const ms30d = 30 * 86_400_000; + const ms90d = 90 * 86_400_000; + const ms30h = 30 * 3600_000; + for (const r of rollupRows) { - const startIso = r.bucket_start instanceof Date ? r.bucket_start.toISOString() : String(r.bucket_start); + const startDate = r.bucket_start instanceof Date ? r.bucket_start : new Date(r.bucket_start); + const startIso = startDate.toISOString(); + const startMs = startDate.getTime(); + const total = Number(r.total); + const up = Number(r.up_count); + const avgLat = r.avg_latency == null ? null : Number(r.avg_latency); + const mid = r.monitor_id; + const bt: BucketType = r.bucket_type; - // Cross-region bucket totals (for bar coloring) - if (!indexed[r.monitor_id]) indexed[r.monitor_id] = {}; - const slot = indexed[r.monitor_id]![startIso] ?? { total: 0, up: 0 }; - slot.total += Number(r.total); - slot.up += Number(r.up_count); - indexed[r.monitor_id]![startIso] = slot; + // Bar chart accumulators — only rows matching the configured bar frequency. + if (bt === bucket) { + if (!barIndexed[mid]) barIndexed[mid] = {}; + const slot = barIndexed[mid]![startIso] ?? { total: 0, up: 0 }; + slot.total += total; + slot.up += up; + barIndexed[mid]![startIso] = slot; - // Per-region latency tracking - if (r.avg_latency != null && Number(r.total) > 0) { - if (!regionLat[r.monitor_id]) regionLat[r.monitor_id] = {}; - const acc = regionLat[r.monitor_id]![r.region] ?? { sum: 0, n: 0 }; - acc.sum += Number(r.avg_latency) * Number(r.total); - acc.n += Number(r.total); - regionLat[r.monitor_id]![r.region] = acc; + if (avgLat != null && total > 0) { + if (!barRegionLat[mid]) barRegionLat[mid] = {}; + const acc = barRegionLat[mid]![r.region] ?? { sum: 0, n: 0 }; + acc.sum += avgLat * total; + acc.n += total; + barRegionLat[mid]![r.region] = acc; - if (!regionBucketLat[r.monitor_id]) regionBucketLat[r.monitor_id] = {}; - if (!regionBucketLat[r.monitor_id]![r.region]) regionBucketLat[r.monitor_id]![r.region] = {}; - regionBucketLat[r.monitor_id]![r.region]![startIso] = Math.round(Number(r.avg_latency)); + if (!barRegionBucketLat[mid]) barRegionBucketLat[mid] = {}; + if (!barRegionBucketLat[mid]![r.region]) barRegionBucketLat[mid]![r.region] = {}; + barRegionBucketLat[mid]![r.region]![startIso] = Math.round(avgLat); + } + } + + // Multi-window uptime accumulators. 24h uses hourly buckets; 7d/30d/90d + // use daily buckets — same as the old loadMultiWindowUptime SQL did. + // Strict `<` to match the old SQL's `bucket_start > now() - interval`. + const wt = initWindowTotals(mid); + if (bt === "hourly" && nowMs - startMs < ms24h) { + wt.d24.up += up; wt.d24.total += total; + } + if (bt === "daily") { + const age = nowMs - startMs; + if (age < ms7d) { wt.d7.up += up; wt.d7.total += total; } + if (age < ms30d) { wt.d30.up += up; wt.d30.total += total; } + if (age < ms90d) { wt.d90.up += up; wt.d90.total += total; } + } + + // 30h hourly latency sparkline. + if (bt === "hourly" && nowMs - startMs < ms30h) { + if (!latByMonitor[mid]) latByMonitor[mid] = []; + latByMonitor[mid]!.push({ + region: r.region, + latency_ms: avgLat == null ? null : Math.round(avgLat), + ts: startIso, + }); } } - // Pick the fastest region per monitor (lowest weighted average latency over - // the whole window). All per-bucket latency display falls back to this - // region's per-bucket numbers; the per-monitor avg_latency uses the same. + // Sort the latency sparkline rows by ts ASC per monitor (the unified query + // sorts by bucket_type then region then bucket_start, so the per-monitor + // hourly subset is already ordered within a region but interleaved across + // regions — this normalises it the same way the old separate query did). + for (const mid of Object.keys(latByMonitor)) { + latByMonitor[mid]!.sort((a, b) => a.ts.localeCompare(b.ts)); + } + + // Pick the fastest region per monitor over the bar window (lowest weighted + // average latency). Per-bucket latency display + the per-monitor avg_latency + // both come from the chosen region. const fastestRegionByMonitor: Record = {}; const fastestLatency: Record = {}; for (const id of ids) { let bestRegion: string | null = null; let bestAvg = Infinity; - const regions = regionLat[id] ?? {}; + const regions = barRegionLat[id] ?? {}; for (const [region, acc] of Object.entries(regions)) { if (acc.n === 0) continue; const avg = acc.sum / acc.n; @@ -278,8 +327,8 @@ export async function loadMonitors( } // Generate the full sequence of expected bucket timestamps so empty bars - // render as "no data" instead of disappearing entirely. Truncate `now()` to - // the unit so the slot boundaries line up with what the rollup writes. + // render as "no data" instead of disappearing entirely. Truncate `now()` + // to the unit so the slot boundaries line up with what the rollup writes. const bucketMs = bucket === "hourly" ? 3600_000 : 86_400_000; const truncate = (d: Date): Date => { const t = new Date(d); @@ -294,9 +343,9 @@ export async function loadMonitors( } const bucketsByMonitor: Record = {}; for (const id of ids) { - const slotMap = indexed[id] ?? {}; + const slotMap = barIndexed[id] ?? {}; const bestRegion = fastestRegionByMonitor[id]; - const fastestBuckets = bestRegion ? regionBucketLat[id]?.[bestRegion] ?? {} : {}; + const fastestBuckets = bestRegion ? barRegionBucketLat[id]?.[bestRegion] ?? {} : {}; bucketsByMonitor[id] = slotIsos.map((iso) => { const hit = slotMap[iso]; const lat = fastestBuckets[iso] ?? null; @@ -306,28 +355,19 @@ export async function loadMonitors( }); } - // Step 4: multi-window uptime row (24h / 7d / 30d / 90d) per monitor. - const multiWindow = await loadMultiWindowUptime(ids); - - // Step 5: tiny recent latency history for the sparkline (last 30 hourly buckets). - const latRows = await sql` - SELECT monitor_id, region, bucket_start, avg_latency - FROM monitor_uptime_rollup - WHERE monitor_id = ANY(${sql.array(ids)}::text[]) - AND bucket_type = 'hourly' - AND bucket_start > now() - interval '30 hours' - ORDER BY monitor_id, bucket_start ASC - `; - const latencyByMonitorList: Record = {}; - for (const r of latRows) { - if (!latencyByMonitorList[r.monitor_id]) latencyByMonitorList[r.monitor_id] = []; - latencyByMonitorList[r.monitor_id]!.push({ - region: r.region, - latency_ms: r.avg_latency != null ? Math.round(r.avg_latency) : null, - ts: r.bucket_start instanceof Date ? r.bucket_start.toISOString() : String(r.bucket_start), - }); + // Multi-window uptime is a straight read from the windowTotals accumulator. + const multiWindow: Record = {}; + const toPct = (up: number, total: number): number | null => + total > 0 ? +(100 * up / total).toFixed(2) : null; + for (const id of ids) { + const wt = windowTotals[id]; + multiWindow[id] = wt + ? { d24: toPct(wt.d24.up, wt.d24.total), d7: toPct(wt.d7.up, wt.d7.total), d30: toPct(wt.d30.up, wt.d30.total), d90: toPct(wt.d90.up, wt.d90.total) } + : { d24: null, d7: null, d30: null, d90: null }; } + const latencyByMonitorList = latByMonitor; + return monitorRows.map((m) => { const region_states = stateByMonitor[m.id] ?? []; let current_state: MonitorRow["current_state"] = "unknown"; diff --git a/apps/web/src/routes/dashboard.ts b/apps/web/src/routes/dashboard.ts index ac0f02d..c945350 100644 --- a/apps/web/src/routes/dashboard.ts +++ b/apps/web/src/routes/dashboard.ts @@ -283,23 +283,32 @@ export const dashboard = new Elysia() const keyId = resolved?.keyId ?? null; if (!accountId) return redirect("/dashboard"); - const [acc] = await sql`SELECT id, email_hash, plan, plan_expires_at, plan_stack, created_at FROM accounts WHERE id = ${accountId}`; const isSubKey = !!keyId; - const apiKeys = isSubKey ? [] : await sql`SELECT id, key, label, created_at, last_used_at FROM api_keys WHERE account_id = ${accountId} ORDER BY created_at DESC`; const loginKey = isSubKey ? null : (cookie?.pingql_key?.value ?? null); - const [{ count: monitorCount }] = await sql`SELECT COUNT(*)::int as count FROM monitors WHERE account_id = ${accountId}`; - let invoices: any[] = []; - try { - invoices = await sql` - SELECT id, plan, months, amount_usd, coin, amount_crypto, status, created_at, paid_at, expires_at, txid - FROM payments - WHERE account_id = ${accountId} - AND (status = 'paid' OR (status IN ('pending', 'underpaid', 'confirming') AND expires_at >= now())) - ORDER BY created_at DESC - LIMIT 20 - `; - } catch {} + // All four reads are independent — fan them out in parallel instead of + // serializing four round-trips. Each individual query is fast (PK seek or + // small indexed scan); the win is just halving the wall-clock by not + // waiting on each one in turn. + const accountQ = sql`SELECT id, email_hash, plan, plan_expires_at, plan_stack, created_at FROM accounts WHERE id = ${accountId}`; + const apiKeysQ = isSubKey + ? Promise.resolve([] as any[]) + : sql`SELECT id, key, label, created_at, last_used_at FROM api_keys WHERE account_id = ${accountId} ORDER BY created_at DESC`; + const monitorCountQ = sql`SELECT COUNT(*)::int as count FROM monitors WHERE account_id = ${accountId}`; + const invoicesQ = sql` + SELECT id, plan, months, amount_usd, coin, amount_crypto, status, created_at, paid_at, expires_at, txid + FROM payments + WHERE account_id = ${accountId} + AND (status = 'paid' OR (status IN ('pending', 'underpaid', 'confirming') AND expires_at >= now())) + ORDER BY created_at DESC + LIMIT 20 + `.catch(() => [] as any[]); + + const [accountRows, apiKeys, monitorCountRows, invoices] = await Promise.all([ + accountQ, apiKeysQ, monitorCountQ, invoicesQ, + ]); + const acc = accountRows[0]; + const monitorCount = monitorCountRows[0].count; return html("settings", { nav: "settings", account: acc, apiKeys, accountId, loginKey, isSubKey, monitorCount, invoices }); })