fix: improve sql queries
This commit is contained in:
parent
89f0856a04
commit
91ca996e74
|
|
@ -0,0 +1,62 @@
|
||||||
|
// In-memory cache of the enabled monitor list, keyed by region. The /internal/due
|
||||||
|
// endpoint is polled by every runner roughly once a second per region; without
|
||||||
|
// this cache each poll re-runs a 500-row scan against `monitors` with an array
|
||||||
|
// predicate, which dominates the api's Postgres traffic at any real fleet size.
|
||||||
|
//
|
||||||
|
// The list almost never changes between polls — monitor create/edit/delete is at
|
||||||
|
// most a few times per hour. So we memoize per-region with a short TTL and bust
|
||||||
|
// the cache from the monitor mutation handlers so edits are visible instantly.
|
||||||
|
|
||||||
|
import sql from "../db";
|
||||||
|
|
||||||
|
const TTL_MS = 5000;
|
||||||
|
|
||||||
|
type MonitorRow = Record<string, any>;
|
||||||
|
type Entry = { rows: MonitorRow[]; expiresAt: number };
|
||||||
|
|
||||||
|
const cache = new Map<string, Entry>();
|
||||||
|
const inflight = new Map<string, Promise<MonitorRow[]>>();
|
||||||
|
|
||||||
|
/**
 * Runs the uncached monitor-list query for one region.
 *
 * Selects enabled monitors whose `regions` array is NULL/empty or contains
 * `region`, capped at 500 rows.
 *
 * @param region Region name matched against each monitor's `regions` array.
 * @returns The matching monitor rows as returned by the database driver.
 */
async function fetchForRegion(region: string): Promise<MonitorRow[]> {
  const monitorRows = await sql<MonitorRow[]>`
    SELECT id, url, method, request_headers, request_body, timeout_ms, interval_s, query, regions,
           max_retries, retry_interval_s, created_at
    FROM monitors
    WHERE enabled = true
      AND (
        array_length(regions, 1) IS NULL
        OR regions = '{}'
        OR ${region} = ANY(regions)
      )
    LIMIT 500
  `;
  return monitorRows;
}
|
||||||
|
|
||||||
|
// Incremented on every invalidation. A refresh remembers the generation it
// started under; if the value has moved by the time the query resolves, a
// monitor mutation happened in between and the rows may predate it, so they
// must not be written into the cache.
let cacheGeneration = 0;

/**
 * Returns the enabled monitors for `region`, served from the per-region cache
 * while the entry is younger than TTL_MS.
 *
 * Concurrent callers that miss the cache share a single in-flight query per
 * region, so a burst of runner polls against an expired entry results in one
 * SELECT rather than one per caller.
 *
 * @param region Region key used both for the cache and for the query.
 * @returns The cached or freshly fetched monitor rows. The array is shared
 *          between callers — treat it as read-only.
 */
export async function getMonitorsForRegion(region: string): Promise<MonitorRow[]> {
  const hit = cache.get(region);
  if (hit && hit.expiresAt > Date.now()) return hit.rows;

  const shared = inflight.get(region);
  if (shared) return shared;

  const generationAtStart = cacheGeneration;
  const pending: Promise<MonitorRow[]> = fetchForRegion(region)
    .then((rows) => {
      // Only publish the result if no invalidation happened while the query
      // was running; otherwise we would re-cache pre-mutation rows for a full
      // TTL right after the mutation handler asked for them to be dropped.
      if (generationAtStart === cacheGeneration) {
        cache.set(region, { rows, expiresAt: Date.now() + TTL_MS });
      }
      return rows;
    })
    .finally(() => {
      // An invalidation may already have replaced this entry with a newer
      // refresh for the same region; only remove our own promise.
      if (inflight.get(region) === pending) inflight.delete(region);
    });
  inflight.set(region, pending);
  return pending;
}

/**
 * Drops every cached monitor list. Called by the monitor
 * create/patch/delete/toggle handlers.
 *
 * The whole region map is wiped rather than a single region: entries are
 * small, a refresh is one query, and the handler does not know which regions
 * an edited monitor belonged to without reading it back.
 *
 * In-flight refreshes are detached as well, so the next poll issues a fresh
 * query instead of joining one that started before the mutation. Callers
 * already awaiting a detached refresh still receive its (possibly stale)
 * rows once, but that result is never cached.
 */
export function invalidateMonitorList(): void {
  cacheGeneration += 1;
  cache.clear();
  inflight.clear();
}
|
||||||
|
|
@ -2,7 +2,7 @@ import { Elysia } from "elysia";
|
||||||
import { ingest } from "./routes/pings";
|
import { ingest } from "./routes/pings";
|
||||||
import { monitors } from "./routes/monitors";
|
import { monitors } from "./routes/monitors";
|
||||||
import { account } from "./routes/auth";
|
import { account } from "./routes/auth";
|
||||||
import { internal } from "./routes/internal";
|
import { internal, startPruneJob } from "./routes/internal";
|
||||||
import { channels } from "./routes/channels";
|
import { channels } from "./routes/channels";
|
||||||
import { statusPages } from "./routes/status_pages";
|
import { statusPages } from "./routes/status_pages";
|
||||||
import { incidents } from "./routes/incidents";
|
import { incidents } from "./routes/incidents";
|
||||||
|
|
@ -21,6 +21,7 @@ process.on("uncaughtException", (err) => {
|
||||||
|
|
||||||
await migrate();
|
await migrate();
|
||||||
await startRollupJob();
|
await startRollupJob();
|
||||||
|
startPruneJob();
|
||||||
|
|
||||||
const elysia = new Elysia()
|
const elysia = new Elysia()
|
||||||
.get("/", () => ({
|
.get("/", () => ({
|
||||||
|
|
|
||||||
|
|
@ -4,8 +4,13 @@ import sql from "../db";
|
||||||
// widgets can compute uptime % over arbitrary windows without ever scanning the
|
// widgets can compute uptime % over arbitrary windows without ever scanning the
|
||||||
// pings table at read time. Two resolutions: hourly and daily.
|
// pings table at read time. Two resolutions: hourly and daily.
|
||||||
//
|
//
|
||||||
// Each pass aggregates the *current* bucket only. The query is bounded by the
|
// Watermark model: rollup_watermarks(bucket_type) records the most recent
|
||||||
// bucket size, not the table size, so it's cheap regardless of history depth.
|
// pings.checked_at that has been folded into this bucket_type's rollup rows.
|
||||||
|
// Each periodic pass scans ONLY pings newer than the watermark, then merges
|
||||||
|
// them into existing rollup rows additively (total = total + new_total, etc.)
|
||||||
|
// via ON CONFLICT … DO UPDATE. This makes per-pass work proportional to the
|
||||||
|
// delta of new pings, not the bucket size — critical once a single account has
|
||||||
|
// thousands of monitors.
|
||||||
|
|
||||||
type BucketType = "hourly" | "daily";
|
type BucketType = "hourly" | "daily";
|
||||||
|
|
||||||
|
|
@ -14,11 +19,49 @@ const BUCKET_TRUNC: Record<BucketType, string> = {
|
||||||
daily: "day",
|
daily: "day",
|
||||||
};
|
};
|
||||||
|
|
||||||
async function rollupCurrent(bucket: BucketType): Promise<number> {
|
// Pull the current watermark for a bucket type. First call after a fresh
|
||||||
|
// deploy returns epoch (1970-01-01) so the first pass folds the entire
|
||||||
|
// retention window in one go.
|
||||||
|
/**
 * Reads the stored rollup watermark for one bucket type.
 *
 * @param bucket Bucket type whose `rollup_watermarks` row is looked up.
 * @returns The stored `last_aggregated_at`, or the Unix epoch when there is
 *          no row (or the value is null/undefined).
 */
async function getWatermark(bucket: BucketType): Promise<Date> {
  const matches = await sql<{ last_aggregated_at: Date }[]>`
    SELECT last_aggregated_at FROM rollup_watermarks WHERE bucket_type = ${bucket}
  `;
  const [first] = matches;
  const stored = first?.last_aggregated_at;
  if (stored === null || stored === undefined) {
    return new Date(0);
  }
  return stored;
}
|
||||||
|
|
||||||
|
/**
 * Upserts the rollup watermark for one bucket type.
 *
 * The value is written unconditionally, so callers may move the watermark
 * backwards as well as forwards.
 *
 * @param bucket Bucket type whose `rollup_watermarks` row is written.
 * @param ts     New `last_aggregated_at` value.
 */
async function setWatermark(bucket: BucketType, ts: Date): Promise<void> {
  const upsert = sql`
    INSERT INTO rollup_watermarks (bucket_type, last_aggregated_at)
    VALUES (${bucket}, ${ts})
    ON CONFLICT (bucket_type) DO UPDATE SET last_aggregated_at = EXCLUDED.last_aggregated_at
  `;
  await upsert;
}
|
||||||
|
|
||||||
|
// Aggregate every ping newer than the watermark into the appropriate bucket
|
||||||
|
// row. Merges into existing rows additively so we never re-scan rows that
|
||||||
|
// were already folded in on a previous pass. The avg_latency merge is a
|
||||||
|
// weighted average over the running totals.
|
||||||
|
async function rollupSinceWatermark(bucket: BucketType): Promise<number> {
|
||||||
const trunc = BUCKET_TRUNC[bucket];
|
const trunc = BUCKET_TRUNC[bucket];
|
||||||
|
const watermark = await getWatermark(bucket);
|
||||||
|
|
||||||
|
// Capture the upper bound BEFORE the aggregation runs so we don't miss any
|
||||||
|
// pings that arrive between SELECT and the watermark write. Anything ingested
|
||||||
|
// after `boundary` will be picked up on the next pass.
|
||||||
|
const boundary = new Date();
|
||||||
|
|
||||||
// GROUP BY 1,2,4 (ordinals) instead of repeating the date_trunc expression —
|
// GROUP BY 1,2,4 (ordinals) instead of repeating the date_trunc expression —
|
||||||
// when the unit is a $-bound parameter, Postgres won't recognize the two
|
// when the unit is a $-bound parameter, Postgres won't recognize the two
|
||||||
// expressions as identical and will reject the column. Ordinals are safe.
|
// expressions as identical and will reject the column. Ordinals are safe.
|
||||||
|
//
|
||||||
|
// The ON CONFLICT merge formula:
|
||||||
|
// total = old + new
|
||||||
|
// up_count = old + new
|
||||||
|
// avg_latency = weighted average of old and new, weighted by their totals
|
||||||
|
// (NULLIF guards against the degenerate empty-bucket case)
|
||||||
|
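// The weighted-average formula is meant to match a one-shot
// SUM(latency)/SUM(total) recompute, but it is only an approximation of what
// a full rescan would produce:
//   - avg(latency_ms) skips NULL latencies, while the weights used here are
//     the rows' `total` columns; a pass whose pings all lack a latency is
//     folded in as COALESCE(NULL, 0) * total, pulling the average towards 0.
//   - avg_latency is stored as `real`, so every merge rounds again.
// NOTE(review): weight by a count of non-NULL latencies if the merged value
// must equal a full rescan exactly.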
|
||||||
const result = await sql`
|
const result = await sql`
|
||||||
INSERT INTO monitor_uptime_rollup (monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency)
|
INSERT INTO monitor_uptime_rollup (monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency)
|
||||||
SELECT
|
SELECT
|
||||||
|
|
@ -30,22 +73,31 @@ async function rollupCurrent(bucket: BucketType): Promise<number> {
|
||||||
count(*) FILTER (WHERE up)::int AS up_count,
|
count(*) FILTER (WHERE up)::int AS up_count,
|
||||||
avg(latency_ms)::real AS avg_latency
|
avg(latency_ms)::real AS avg_latency
|
||||||
FROM pings
|
FROM pings
|
||||||
WHERE checked_at >= date_trunc(${trunc}, now())
|
WHERE checked_at > ${watermark} AND checked_at <= ${boundary}
|
||||||
GROUP BY 1, 2, 3, 4
|
GROUP BY 1, 2, 3, 4
|
||||||
ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO UPDATE SET
|
ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO UPDATE SET
|
||||||
total = EXCLUDED.total,
|
total = monitor_uptime_rollup.total + EXCLUDED.total,
|
||||||
up_count = EXCLUDED.up_count,
|
up_count = monitor_uptime_rollup.up_count + EXCLUDED.up_count,
|
||||||
avg_latency = EXCLUDED.avg_latency
|
avg_latency = (
|
||||||
|
COALESCE(monitor_uptime_rollup.avg_latency, 0) * monitor_uptime_rollup.total
|
||||||
|
+ COALESCE(EXCLUDED.avg_latency, 0) * EXCLUDED.total
|
||||||
|
) / NULLIF(monitor_uptime_rollup.total + EXCLUDED.total, 0)
|
||||||
`;
|
`;
|
||||||
|
|
||||||
|
await setWatermark(bucket, boundary);
|
||||||
return result.count ?? 0;
|
return result.count ?? 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk back N units and aggregate any buckets that don't exist yet. Used at
|
// One-shot recompute over an arbitrary window, fully overwriting matched rows.
|
||||||
// startup so a freshly-deployed system has historical data immediately.
|
// Used for the startup backfill and the "still empty after backfill" force-run.
|
||||||
async function backfillRecent(bucket: BucketType, units: number): Promise<number> {
|
// Takes an explicit upper boundary so the caller can capture it BEFORE running
|
||||||
|
// the recompute and use the same value for the watermark write afterwards —
|
||||||
|
// any ping with checked_at > boundary is guaranteed to be outside this window
|
||||||
|
// and will be picked up by the first incremental pass instead. This closes
|
||||||
|
// the race where a ping ingested between boundary capture and the SELECT
|
||||||
|
// could otherwise be folded in by both recompute and incremental.
|
||||||
|
async function recomputeWindow(bucket: BucketType, units: number, boundary: Date): Promise<number> {
|
||||||
const trunc = BUCKET_TRUNC[bucket];
|
const trunc = BUCKET_TRUNC[bucket];
|
||||||
// Build the interval string entirely in JS so postgres.js binds a single text
|
|
||||||
// parameter. Avoids the int || text type-mismatch trap inside SQL.
|
|
||||||
const intervalLiteral = `${units} ${trunc}s`;
|
const intervalLiteral = `${units} ${trunc}s`;
|
||||||
const result = await sql`
|
const result = await sql`
|
||||||
INSERT INTO monitor_uptime_rollup (monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency)
|
INSERT INTO monitor_uptime_rollup (monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency)
|
||||||
|
|
@ -58,9 +110,13 @@ async function backfillRecent(bucket: BucketType, units: number): Promise<number
|
||||||
count(*) FILTER (WHERE up)::int AS up_count,
|
count(*) FILTER (WHERE up)::int AS up_count,
|
||||||
avg(latency_ms)::real AS avg_latency
|
avg(latency_ms)::real AS avg_latency
|
||||||
FROM pings
|
FROM pings
|
||||||
WHERE checked_at >= date_trunc(${trunc}, now()) - ${intervalLiteral}::interval
|
WHERE checked_at >= date_trunc(${trunc}, ${boundary}::timestamptz) - ${intervalLiteral}::interval
|
||||||
|
AND checked_at <= ${boundary}
|
||||||
GROUP BY 1, 2, 3, 4
|
GROUP BY 1, 2, 3, 4
|
||||||
ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO NOTHING
|
ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO UPDATE SET
|
||||||
|
total = EXCLUDED.total,
|
||||||
|
up_count = EXCLUDED.up_count,
|
||||||
|
avg_latency = EXCLUDED.avg_latency
|
||||||
`;
|
`;
|
||||||
return result.count ?? 0;
|
return result.count ?? 0;
|
||||||
}
|
}
|
||||||
|
|
@ -78,36 +134,45 @@ export async function startRollupJob() {
|
||||||
if (started) return;
|
if (started) return;
|
||||||
started = true;
|
started = true;
|
||||||
|
|
||||||
// Startup backfill. Errors are logged BUT NOT swallowed silently anymore —
|
// Startup backfill. Capture the boundary FIRST, then run the one-shot
|
||||||
// we throw so a broken rollup query trips the api process and shows in the
|
// recompute bounded by it. The watermark is then set to the same boundary,
|
||||||
// service logs immediately, instead of leaving the table mysteriously empty.
|
// so any ping with checked_at > boundary is guaranteed to be picked up by
|
||||||
|
// the first incremental pass — never folded in twice and never missed.
|
||||||
try {
|
try {
|
||||||
|
const boundary = new Date();
|
||||||
const [h, d] = await Promise.all([
|
const [h, d] = await Promise.all([
|
||||||
backfillRecent("hourly", 48),
|
recomputeWindow("hourly", 48, boundary),
|
||||||
backfillRecent("daily", 90),
|
recomputeWindow("daily", 90, boundary),
|
||||||
]);
|
]);
|
||||||
console.log(`[rollup] backfilled rows: hourly=${h} daily=${d}`);
|
console.log(`[rollup] backfilled rows: hourly=${h} daily=${d}`);
|
||||||
|
await Promise.all([
|
||||||
|
setWatermark("hourly", boundary),
|
||||||
|
setWatermark("daily", boundary),
|
||||||
|
]);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error("[rollup] backfill FAILED — rollup table will be empty until fixed:", e);
|
console.error("[rollup] backfill FAILED — rollup table will be empty until fixed:", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Force-run check: if any bucket type is still empty after the backfill,
|
// Force-run check: if any bucket type is still empty after the backfill,
|
||||||
// run rollupCurrent immediately so we have at least one row per type. This
|
// run an incremental pass immediately so we have at least one row per type.
|
||||||
// covers the "fresh deploy with very recent pings only" case.
|
// Covers the "fresh deploy with very recent pings only" case.
|
||||||
try {
|
try {
|
||||||
for (const b of ["hourly", "daily"] as BucketType[]) {
|
for (const b of ["hourly", "daily"] as BucketType[]) {
|
||||||
if (await rollupIsEmpty(b)) {
|
if (await rollupIsEmpty(b)) {
|
||||||
console.log(`[rollup] ${b} still empty — forcing current-bucket aggregation`);
|
console.log(`[rollup] ${b} still empty — forcing incremental aggregation`);
|
||||||
await rollupCurrent(b);
|
// Reset watermark so the pass picks up everything in retention.
|
||||||
|
await setWatermark(b, new Date(0));
|
||||||
|
await rollupSinceWatermark(b);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error("[rollup] force-run check failed:", e);
|
console.error("[rollup] force-run check failed:", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Periodic refreshes for the *current* bucket of each resolution. Each query
|
// Periodic incremental refreshes. Each pass scans only pings newer than the
|
||||||
// is bounded by the current bucket only (date_trunc(...)) so it stays cheap
|
// last watermark, so the work is proportional to the delta — not the bucket
|
||||||
// even at high cadence.
|
// size. Hourly runs frequently so the current-hour bar appears quickly for
|
||||||
setInterval(() => { rollupCurrent("hourly").catch((e) => console.warn("[rollup] hourly failed:", e)); }, 30 * 1000); // every 30s
|
// fresh monitors; daily can run less often.
|
||||||
setInterval(() => { rollupCurrent("daily").catch((e) => console.warn("[rollup] daily failed:", e)); }, 5 * 60 * 1000); // every 5min
|
setInterval(() => { rollupSinceWatermark("hourly").catch((e) => console.warn("[rollup] hourly failed:", e)); }, 30 * 1000); // every 30s
|
||||||
|
setInterval(() => { rollupSinceWatermark("daily").catch((e) => console.warn("[rollup] daily failed:", e)); }, 5 * 60 * 1000); // every 5min
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,91 @@
|
||||||
import { Elysia } from "elysia";
|
import { Elysia } from "elysia";
|
||||||
import sql from "../db";
|
import sql from "../db";
|
||||||
import { safeTokenCompare } from "../../../shared/auth";
|
import { safeTokenCompare } from "../../../shared/auth";
|
||||||
|
import { getMonitorsForRegion } from "../cache/monitor-list";
|
||||||
|
|
||||||
|
// Chunked retention prune. A single big DELETE on the pings table holds locks
|
||||||
|
// for the duration of the scan, blocks autovacuum from reclaiming dead tuples,
|
||||||
|
// and can stall replication. We loop in 10k-row batches so each commit
|
||||||
|
// releases its locks and lets autovacuum keep up. Total wall-clock is similar
|
||||||
|
// to a one-shot DELETE but the per-statement impact on the rest of the system
|
||||||
|
// is dramatically lower.
|
||||||
|
const PRUNE_BATCH_SIZE = 10_000;
|
||||||
export async function pruneOldPings(retentionDays = 90) {
|
export async function pruneOldPings(retentionDays = 90) {
|
||||||
const result = await sql`DELETE FROM pings WHERE checked_at < now() - ${retentionDays + ' days'}::interval`;
|
const interval = `${retentionDays} days`;
|
||||||
return result.count;
|
let total = 0;
|
||||||
|
while (true) {
|
||||||
|
const result = await sql`
|
||||||
|
WITH victims AS (
|
||||||
|
SELECT id FROM pings
|
||||||
|
WHERE checked_at < now() - ${interval}::interval
|
||||||
|
LIMIT ${PRUNE_BATCH_SIZE}
|
||||||
|
)
|
||||||
|
DELETE FROM pings WHERE id IN (SELECT id FROM victims)
|
||||||
|
`;
|
||||||
|
const batch = result.count ?? 0;
|
||||||
|
total += batch;
|
||||||
|
if (batch < PRUNE_BATCH_SIZE) break;
|
||||||
|
}
|
||||||
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
setInterval(() => {
|
// Periodic prune is gated behind a Postgres session-level advisory lock so
|
||||||
const days = Number(process.env.PING_RETENTION_DAYS ?? 90);
|
// horizontally scaled api replicas don't race each other. Without the lock,
|
||||||
pruneOldPings(days).catch((err) => console.error("Retention cleanup failed:", err));
|
// two replicas running the same chunked DELETE would interleave their
|
||||||
}, 60 * 60 * 1000);
|
// LIMIT 10000 batches and compete for row locks. The lock is acquired with
|
||||||
|
// pg_try_advisory_lock so a busy replica skips its tick instead of blocking.
|
||||||
|
//
|
||||||
|
// Session-level locks live on a specific Postgres backend connection, so we
|
||||||
|
// reserve a connection from the pool with sql.reserve() and run the lock,
|
||||||
|
// the chunked prune, and the unlock all on it. Releasing the reservation
|
||||||
|
// returns the connection to the pool. If the api process crashes mid-prune,
|
||||||
|
// the backend dies with it and Postgres releases the lock automatically.
|
||||||
|
//
|
||||||
|
// 134678338 is just an arbitrary lock id unique within this app. If we add
|
||||||
|
// more global jobs later, give each its own constant in this file.
|
||||||
|
const PRUNE_LOCK_ID = 134678338;
|
||||||
|
let pruneJobStarted = false;
|
||||||
|
|
||||||
|
/**
 * Runs one retention-prune tick on a reserved connection, guarded by the
 * PRUNE_LOCK_ID advisory lock.
 *
 * The lock attempt, the batched deletes and the unlock are all issued on the
 * same reserved connection. If the lock cannot be taken the tick is a no-op.
 *
 * @param retentionDays Pings with `checked_at` older than this many days are
 *                      deleted.
 */
async function runPruneTickWithLock(retentionDays: number): Promise<void> {
  const conn = await (sql as any).reserve();
  try {
    const lockRows = await conn`
      SELECT pg_try_advisory_lock(${PRUNE_LOCK_ID}) AS locked
    `;
    const [{ locked }] = lockRows;
    // Lock not granted: someone else holds it, so skip this tick.
    if (!locked) return;

    try {
      const cutoff = `${retentionDays} days`;
      let deletedSoFar = 0;
      let lastBatch: number;
      // Delete in PRUNE_BATCH_SIZE chunks; a short batch means nothing older
      // than the cutoff is left.
      do {
        const outcome = await conn`
          WITH victims AS (
            SELECT id FROM pings
            WHERE checked_at < now() - ${cutoff}::interval
            LIMIT ${PRUNE_BATCH_SIZE}
          )
          DELETE FROM pings WHERE id IN (SELECT id FROM victims)
        `;
        lastBatch = outcome.count ?? 0;
        deletedSoFar += lastBatch;
      } while (lastBatch >= PRUNE_BATCH_SIZE);

      if (deletedSoFar > 0) {
        console.log(`[prune] retention pruned ${deletedSoFar} pings (>${retentionDays}d)`);
      }
    } finally {
      // Always give the advisory lock back, even if a batch failed.
      await conn`SELECT pg_advisory_unlock(${PRUNE_LOCK_ID})`;
    }
  } finally {
    // Hand the reserved connection back to the pool.
    conn.release();
  }
}
|
||||||
|
|
||||||
|
// Retention used when PING_RETENTION_DAYS is not set at all.
const DEFAULT_RETENTION_DAYS = 90;

/**
 * Parses the PING_RETENTION_DAYS setting.
 *
 * @param raw Raw environment value, or undefined when the variable is unset.
 * @returns DEFAULT_RETENTION_DAYS when unset; the parsed value when it is a
 *          finite number greater than zero; otherwise null. An empty or
 *          whitespace-only string parses to 0 with Number(), and a retention
 *          of 0 (or a negative one) would match every row in `pings`, so
 *          those are rejected rather than passed on to the DELETE.
 */
function resolveRetentionDays(raw: string | undefined): number | null {
  if (raw === undefined) return DEFAULT_RETENTION_DAYS;
  const parsed = Number(raw);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : null;
}

/**
 * Starts the hourly retention-prune timer. Subsequent calls are no-ops.
 *
 * The retention setting is re-read on every tick. A tick with an invalid
 * setting is skipped with an error log instead of falling back to a default:
 * pruning is irreversible, so a typo must never shorten the retention window.
 */
export function startPruneJob() {
  if (pruneJobStarted) return;
  pruneJobStarted = true;
  setInterval(() => {
    const rawDays = process.env.PING_RETENTION_DAYS;
    const days = resolveRetentionDays(rawDays);
    if (days === null) {
      console.error(
        `[prune] invalid PING_RETENTION_DAYS=${JSON.stringify(rawDays)} — skipping retention cleanup`,
      );
      return;
    }
    runPruneTickWithLock(days).catch((err) => console.error("Retention cleanup failed:", err));
  }, 60 * 60 * 1000);
}
|
||||||
|
|
||||||
export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } })
|
export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } })
|
||||||
.derive(({ headers, set }) => {
|
.derive(({ headers, set }) => {
|
||||||
|
|
@ -37,18 +112,13 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true }
|
||||||
// creation is a tick. We pull all enabled monitors that match this
|
// creation is a tick. We pull all enabled monitors that match this
|
||||||
// region, compute the next tick in JS, and return the ones whose next
|
// region, compute the next tick in JS, and return the ones whose next
|
||||||
// tick falls within the lookahead window.
|
// tick falls within the lookahead window.
|
||||||
const monitors = await sql`
|
//
|
||||||
SELECT id, url, method, request_headers, request_body, timeout_ms, interval_s, query, regions,
|
// The monitor list itself is memoized in apps/api/src/cache/monitor-list.ts
|
||||||
max_retries, retry_interval_s, created_at
|
// with a 5s TTL — runners poll this endpoint roughly once a second per
|
||||||
FROM monitors
|
// region, but the underlying list almost never changes between polls. The
|
||||||
WHERE enabled = true
|
// cache is busted from monitor create/patch/delete/toggle so edits show up
|
||||||
AND (
|
// immediately.
|
||||||
array_length(regions, 1) IS NULL
|
const monitors = await getMonitorsForRegion(region);
|
||||||
OR regions = '{}'
|
|
||||||
OR ${region} = ANY(regions)
|
|
||||||
)
|
|
||||||
LIMIT 500
|
|
||||||
`;
|
|
||||||
|
|
||||||
const nowMs = Date.now();
|
const nowMs = Date.now();
|
||||||
const lookaheadEnd = nowMs + lookaheadMs;
|
const lookaheadEnd = nowMs + lookaheadMs;
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ import { requireAuth } from "./auth";
|
||||||
import sql from "../db";
|
import sql from "../db";
|
||||||
import { validateMonitorUrl } from "../utils/ssrf";
|
import { validateMonitorUrl } from "../utils/ssrf";
|
||||||
import { getPlanLimits } from "../../../shared/plans";
|
import { getPlanLimits } from "../../../shared/plans";
|
||||||
|
import { invalidateMonitorList } from "../cache/monitor-list";
|
||||||
|
|
||||||
const MonitorBody = t.Object({
|
const MonitorBody = t.Object({
|
||||||
name: t.String({ maxLength: 200, description: "Human-readable name" }),
|
name: t.String({ maxLength: 200, description: "Human-readable name" }),
|
||||||
|
|
@ -111,6 +112,7 @@ export const monitors = new Elysia({ prefix: "/monitors" })
|
||||||
`;
|
`;
|
||||||
if (body.channel_ids) await replaceMonitorChannels(monitor.id, accountId, body.channel_ids);
|
if (body.channel_ids) await replaceMonitorChannels(monitor.id, accountId, body.channel_ids);
|
||||||
if (body.tags) await replaceMonitorTags(monitor.id, body.tags);
|
if (body.tags) await replaceMonitorTags(monitor.id, body.tags);
|
||||||
|
invalidateMonitorList();
|
||||||
return monitor;
|
return monitor;
|
||||||
}, { body: MonitorBody, detail: { summary: "Create monitor", tags: ["monitors"] } })
|
}, { body: MonitorBody, detail: { summary: "Create monitor", tags: ["monitors"] } })
|
||||||
|
|
||||||
|
|
@ -177,6 +179,7 @@ export const monitors = new Elysia({ prefix: "/monitors" })
|
||||||
if (!monitor) { set.status = 404; return { error: "Not found" }; }
|
if (!monitor) { set.status = 404; return { error: "Not found" }; }
|
||||||
if (body.channel_ids) await replaceMonitorChannels(monitor.id, accountId, body.channel_ids);
|
if (body.channel_ids) await replaceMonitorChannels(monitor.id, accountId, body.channel_ids);
|
||||||
if (body.tags) await replaceMonitorTags(monitor.id, body.tags);
|
if (body.tags) await replaceMonitorTags(monitor.id, body.tags);
|
||||||
|
invalidateMonitorList();
|
||||||
return monitor;
|
return monitor;
|
||||||
}, { body: t.Partial(MonitorBody), detail: { summary: "Update monitor", tags: ["monitors"] } })
|
}, { body: t.Partial(MonitorBody), detail: { summary: "Update monitor", tags: ["monitors"] } })
|
||||||
|
|
||||||
|
|
@ -185,6 +188,7 @@ export const monitors = new Elysia({ prefix: "/monitors" })
|
||||||
DELETE FROM monitors WHERE id = ${params.id} AND account_id = ${accountId} RETURNING id
|
DELETE FROM monitors WHERE id = ${params.id} AND account_id = ${accountId} RETURNING id
|
||||||
`;
|
`;
|
||||||
if (!deleted) { set.status = 404; return { error: "Not found" }; }
|
if (!deleted) { set.status = 404; return { error: "Not found" }; }
|
||||||
|
invalidateMonitorList();
|
||||||
return { deleted: true };
|
return { deleted: true };
|
||||||
}, { detail: { summary: "Delete monitor", tags: ["monitors"] } })
|
}, { detail: { summary: "Delete monitor", tags: ["monitors"] } })
|
||||||
|
|
||||||
|
|
@ -195,6 +199,7 @@ export const monitors = new Elysia({ prefix: "/monitors" })
|
||||||
RETURNING id, enabled
|
RETURNING id, enabled
|
||||||
`;
|
`;
|
||||||
if (!monitor) { set.status = 404; return { error: "Not found" }; }
|
if (!monitor) { set.status = 404; return { error: "Not found" }; }
|
||||||
|
invalidateMonitorList();
|
||||||
return monitor;
|
return monitor;
|
||||||
}, { detail: { summary: "Toggle monitor on/off", tags: ["monitors"] } })
|
}, { detail: { summary: "Toggle monitor on/off", tags: ["monitors"] } })
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -56,10 +56,28 @@ export const ingest = new Elysia()
|
||||||
const token = headers["x-monitor-token"];
|
const token = headers["x-monitor-token"];
|
||||||
if (!safeTokenCompare(token, process.env.MONITOR_TOKEN)) { set.status = 401; return { error: "Unauthorized" }; }
|
if (!safeTokenCompare(token, process.env.MONITOR_TOKEN)) { set.status = 401; return { error: "Unauthorized" }; }
|
||||||
|
|
||||||
const [monitor_check] = await sql`
|
// Per-region transition state. Region is always populated by current runners;
|
||||||
SELECT id, account_id, name, url, resend_interval, cert_alert_days
|
// legacy null values from older pings collapse to "default" so state and
|
||||||
FROM monitors WHERE id = ${body.monitor_id}
|
// notifications never carry an empty label.
|
||||||
`;
|
const region = body.region && body.region.length > 0 ? body.region : 'default';
|
||||||
|
|
||||||
|
// The monitor lookup and the per-region state lookup are independent —
|
||||||
|
// the state row's primary key doesn't depend on anything from the monitor
|
||||||
|
// row. Fire them in parallel to halve the wall-clock cost on the hottest
|
||||||
|
// path in the system. (Combining them into a JOIN is a wash on a warm
|
||||||
|
// pool: both sides are PK lookups, and a JOIN just adds nested-loop
|
||||||
|
// planner overhead. Promise.all keeps each query's plan trivial.)
|
||||||
|
const [[monitor_check], [stateRow]] = await Promise.all([
|
||||||
|
sql`
|
||||||
|
SELECT id, account_id, name, url, resend_interval, cert_alert_days
|
||||||
|
FROM monitors WHERE id = ${body.monitor_id}
|
||||||
|
`,
|
||||||
|
sql`
|
||||||
|
SELECT last_state, consecutive_down, cert_alert_sent
|
||||||
|
FROM monitor_region_state
|
||||||
|
WHERE monitor_id = ${body.monitor_id} AND region = ${region}
|
||||||
|
`,
|
||||||
|
]);
|
||||||
if (!monitor_check) { set.status = 404; return { error: "Monitor not found" }; }
|
if (!monitor_check) { set.status = 404; return { error: "Monitor not found" }; }
|
||||||
|
|
||||||
const meta = body.meta ? { ...body.meta } : {};
|
const meta = body.meta ? { ...body.meta } : {};
|
||||||
|
|
@ -72,16 +90,6 @@ export const ingest = new Elysia()
|
||||||
const scheduledAt = body.scheduled_at ? new Date(body.scheduled_at) : null;
|
const scheduledAt = body.scheduled_at ? new Date(body.scheduled_at) : null;
|
||||||
const jitterMs = body.jitter_ms ?? null;
|
const jitterMs = body.jitter_ms ?? null;
|
||||||
|
|
||||||
// Per-region transition state. Region is always populated by current runners;
|
|
||||||
// legacy null values from older pings collapse to "default" so state and
|
|
||||||
// notifications never carry an empty label.
|
|
||||||
const region = body.region && body.region.length > 0 ? body.region : 'default';
|
|
||||||
const [stateRow] = await sql`
|
|
||||||
SELECT last_state, consecutive_down, cert_alert_sent
|
|
||||||
FROM monitor_region_state
|
|
||||||
WHERE monitor_id = ${body.monitor_id} AND region = ${region}
|
|
||||||
`;
|
|
||||||
|
|
||||||
const newState = body.up ? 'up' : 'down';
|
const newState = body.up ? 'up' : 'down';
|
||||||
const prevState: string | null = stateRow?.last_state ?? null;
|
const prevState: string | null = stateRow?.last_state ?? null;
|
||||||
let consecutiveDown: number = stateRow?.consecutive_down ?? 0;
|
let consecutiveDown: number = stateRow?.consecutive_down ?? 0;
|
||||||
|
|
|
||||||
|
|
@ -62,17 +62,22 @@ async function replaceGroupsAndMonitors(
|
||||||
if (groups !== undefined) {
|
if (groups !== undefined) {
|
||||||
await sql`DELETE FROM status_page_groups WHERE status_page_id = ${pageId}`;
|
await sql`DELETE FROM status_page_groups WHERE status_page_id = ${pageId}`;
|
||||||
}
|
}
|
||||||
|
// Single bulk INSERT instead of one round-trip per group. The RETURNING set
|
||||||
|
// comes back in INSERT order, which equals the array order — that lets us
|
||||||
|
// map index → id without a follow-up SELECT. Mirrors the bulk insert pattern
|
||||||
|
// used by the monitors block right below.
|
||||||
const groupIds: string[] = [];
|
const groupIds: string[] = [];
|
||||||
if (groups && groups.length > 0) {
|
if (groups && groups.length > 0) {
|
||||||
for (let i = 0; i < groups.length; i++) {
|
const rows = groups.map((g, i) => ({
|
||||||
const g = groups[i]!;
|
status_page_id: pageId,
|
||||||
const [row] = await sql<{ id: string }[]>`
|
name: g.name,
|
||||||
INSERT INTO status_page_groups (status_page_id, name, position)
|
position: g.position ?? i,
|
||||||
VALUES (${pageId}, ${g.name}, ${g.position ?? i})
|
}));
|
||||||
RETURNING id
|
const inserted = await sql<{ id: string }[]>`
|
||||||
`;
|
INSERT INTO status_page_groups ${sql(rows, "status_page_id", "name", "position")}
|
||||||
groupIds.push(row!.id);
|
RETURNING id
|
||||||
}
|
`;
|
||||||
|
for (const r of inserted) groupIds.push(r.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (monitorsList !== undefined) {
|
if (monitorsList !== undefined) {
|
||||||
|
|
|
||||||
|
|
@ -243,6 +243,17 @@ export async function migrate(sql: any) {
|
||||||
`;
|
`;
|
||||||
await sql`CREATE INDEX IF NOT EXISTS idx_uptime_rollup_lookup ON monitor_uptime_rollup(monitor_id, bucket_type, bucket_start DESC)`;
|
await sql`CREATE INDEX IF NOT EXISTS idx_uptime_rollup_lookup ON monitor_uptime_rollup(monitor_id, bucket_type, bucket_start DESC)`;
|
||||||
|
|
||||||
|
// Watermark table for the rollup job. Each row records the most recent
|
||||||
|
// pings.checked_at that has been folded into a given bucket_type's rollup
|
||||||
|
// rows. Lets the periodic rollup pass scan only NEW pings since the last
|
||||||
|
// pass instead of re-aggregating the entire current bucket on every tick.
|
||||||
|
await sql`
|
||||||
|
CREATE TABLE IF NOT EXISTS rollup_watermarks (
|
||||||
|
bucket_type TEXT PRIMARY KEY,
|
||||||
|
last_aggregated_at TIMESTAMPTZ NOT NULL DEFAULT to_timestamp(0)
|
||||||
|
)
|
||||||
|
`;
|
||||||
|
|
||||||
await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`;
|
await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`;
|
||||||
await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`;
|
await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -53,63 +53,11 @@ export interface MonitorRow {
|
||||||
latency_history: Array<{ region: string; latency_ms: number | null; ts: string }>;
|
latency_history: Array<{ region: string; latency_ms: number | null; ts: string }>;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Single SQL pass that produces all four uptime windows for a set of monitors.
|
// Multi-window uptime (24h / 7d / 30d / 90d) is now derived from the same
|
||||||
// Reads only the rollup table; falls back to a pings aggregate when the rollup
|
// rollup row set that loadMonitors pulls for the bar chart — see the in-JS
|
||||||
// has nothing for these monitors yet (same pattern as loadMonitors).
|
// aggregation pass below. This used to be a second SQL round-trip running
|
||||||
export async function loadMultiWindowUptime(monitorIds: string[]): Promise<Record<string, MultiWindowUptime>> {
|
// four FILTER aggregates that redid arithmetic the raw bucket rows already
|
||||||
const empty: Record<string, MultiWindowUptime> = {};
|
// contained.
|
||||||
if (monitorIds.length === 0) return empty;
|
|
||||||
for (const id of monitorIds) empty[id] = { d24: null, d7: null, d30: null, d90: null };
|
|
||||||
|
|
||||||
const ids = sql.array(monitorIds);
|
|
||||||
|
|
||||||
let rows = await sql<any[]>`
|
|
||||||
SELECT monitor_id,
|
|
||||||
(sum(up_count) FILTER (WHERE bucket_type='hourly' AND bucket_start > now() - interval '24 hours'))::float
|
|
||||||
/ NULLIF(sum(total) FILTER (WHERE bucket_type='hourly' AND bucket_start > now() - interval '24 hours'), 0) AS pct_24h,
|
|
||||||
(sum(up_count) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '7 days'))::float
|
|
||||||
/ NULLIF(sum(total) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '7 days'), 0) AS pct_7d,
|
|
||||||
(sum(up_count) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '30 days'))::float
|
|
||||||
/ NULLIF(sum(total) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '30 days'), 0) AS pct_30d,
|
|
||||||
(sum(up_count) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '90 days'))::float
|
|
||||||
/ NULLIF(sum(total) FILTER (WHERE bucket_type='daily' AND bucket_start > now() - interval '90 days'), 0) AS pct_90d
|
|
||||||
FROM monitor_uptime_rollup
|
|
||||||
WHERE monitor_id = ANY(${ids}::text[])
|
|
||||||
GROUP BY 1
|
|
||||||
`;
|
|
||||||
|
|
||||||
// Fallback when the rollup is empty: aggregate directly from pings. Bounded
|
|
||||||
// by the 90d window so it's still cheap.
|
|
||||||
if (rows.length === 0) {
|
|
||||||
rows = await sql<any[]>`
|
|
||||||
SELECT monitor_id,
|
|
||||||
(count(*) FILTER (WHERE up AND checked_at > now() - interval '24 hours'))::float
|
|
||||||
/ NULLIF(count(*) FILTER (WHERE checked_at > now() - interval '24 hours'), 0) AS pct_24h,
|
|
||||||
(count(*) FILTER (WHERE up AND checked_at > now() - interval '7 days'))::float
|
|
||||||
/ NULLIF(count(*) FILTER (WHERE checked_at > now() - interval '7 days'), 0) AS pct_7d,
|
|
||||||
(count(*) FILTER (WHERE up AND checked_at > now() - interval '30 days'))::float
|
|
||||||
/ NULLIF(count(*) FILTER (WHERE checked_at > now() - interval '30 days'), 0) AS pct_30d,
|
|
||||||
(count(*) FILTER (WHERE up AND checked_at > now() - interval '90 days'))::float
|
|
||||||
/ NULLIF(count(*) FILTER (WHERE checked_at > now() - interval '90 days'), 0) AS pct_90d
|
|
||||||
FROM pings
|
|
||||||
WHERE monitor_id = ANY(${ids}::text[])
|
|
||||||
AND checked_at > now() - interval '90 days'
|
|
||||||
GROUP BY 1
|
|
||||||
`;
|
|
||||||
}
|
|
||||||
|
|
||||||
const out = empty;
|
|
||||||
const toPct = (v: any): number | null => v == null ? null : +(Number(v) * 100).toFixed(2);
|
|
||||||
for (const r of rows) {
|
|
||||||
out[r.monitor_id] = {
|
|
||||||
d24: toPct(r.pct_24h),
|
|
||||||
d7: toPct(r.pct_7d),
|
|
||||||
d30: toPct(r.pct_30d),
|
|
||||||
d90: toPct(r.pct_90d),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface GroupRow {
|
export interface GroupRow {
|
||||||
id: string;
|
id: string;
|
||||||
|
|
@ -189,85 +137,186 @@ export async function loadMonitors(
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 3: uptime rollup buckets covering the requested window. The bar
|
// Step 3: ONE unified rollup query covering everything we need:
|
||||||
// frequency + count are admin-controlled per page (independent of the
|
// - bar chart: bucket_type = barFrequency, last barCount buckets
|
||||||
// multi-window uptime cells). We keep region in the result so JS can pick
|
// - multi-window uptime: hourly back 24h + daily back 90d
|
||||||
// the fastest region per monitor and emit per-bucket latency from just
|
// - latency sparkline: hourly back 30h
|
||||||
// that region (status pages are customer-facing — show the best line).
|
//
|
||||||
|
// Union of all of those is "hourly back N hours OR daily back N days" with
|
||||||
|
// N chosen to cover whichever consumer needs the widest window. The rows
|
||||||
|
// are then partitioned by purpose entirely in JS — no second round-trip,
|
||||||
|
// no duplicate FILTER aggregates inside Postgres.
|
||||||
const bucket: BucketType = barFrequency;
|
const bucket: BucketType = barFrequency;
|
||||||
const count = Math.max(1, Math.min(180, barCount));
|
const count = Math.max(1, Math.min(180, barCount));
|
||||||
const truncUnit = bucket === "hourly" ? "hour" : "day";
|
|
||||||
const intervalLiteral = `${count} ${truncUnit}s`;
|
// Hourly span has to cover the latency sparkline (30h) AND the bar chart if
|
||||||
|
// it's hourly (up to 180h). +2h slack so the truncated bucket boundary at
|
||||||
|
// the start of the window is included even if we cross an hour during the
|
||||||
|
// request.
|
||||||
|
const hourlyBackHours = Math.max(30, bucket === "hourly" ? count : 0) + 2;
|
||||||
|
// Daily span has to cover multi-window uptime (90d) AND the bar chart if
|
||||||
|
// it's daily (up to 180d). +1d slack for the same reason.
|
||||||
|
const dailyBackDays = Math.max(90, bucket === "daily" ? count : 0) + 1;
|
||||||
|
const hourlyInterval = `${hourlyBackHours} hours`;
|
||||||
|
const dailyInterval = `${dailyBackDays} days`;
|
||||||
|
|
||||||
let rollupRows = await sql<any[]>`
|
let rollupRows = await sql<any[]>`
|
||||||
SELECT monitor_id, region, bucket_start, total, up_count, avg_latency
|
SELECT monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency
|
||||||
FROM monitor_uptime_rollup
|
FROM monitor_uptime_rollup
|
||||||
WHERE monitor_id = ANY(${sql.array(ids)}::text[])
|
WHERE monitor_id = ANY(${sql.array(ids)}::text[])
|
||||||
AND bucket_type = ${bucket}
|
AND (
|
||||||
AND bucket_start > date_trunc(${truncUnit}, now()) - ${intervalLiteral}::interval
|
(bucket_type = 'hourly' AND bucket_start > now() - ${hourlyInterval}::interval)
|
||||||
ORDER BY monitor_id, region, bucket_start ASC
|
OR
|
||||||
|
(bucket_type = 'daily' AND bucket_start > now() - ${dailyInterval}::interval)
|
||||||
|
)
|
||||||
|
ORDER BY monitor_id, bucket_type, region, bucket_start ASC
|
||||||
`;
|
`;
|
||||||
|
|
||||||
// Fallback: if the rollup table has nothing for any of these monitors in
|
// Fallback: if the rollup table has nothing for any of these monitors in
|
||||||
// this window (e.g. the api hasn't backfilled yet, or the rollup job is
|
// either bucket type (cold deploy, broken job), aggregate directly from
|
||||||
// silently broken), aggregate directly from pings. Bounded by the window so
|
// pings. Produces both bucket types via UNION ALL so downstream JS doesn't
|
||||||
// it stays cheap. Once the rollup catches up this branch never fires.
|
// need to know which path it came from. Bounded by the wider of the two
|
||||||
|
// windows so it stays cheap. Once the rollup catches up this never fires.
|
||||||
if (rollupRows.length === 0) {
|
if (rollupRows.length === 0) {
|
||||||
rollupRows = await sql<any[]>`
|
rollupRows = await sql<any[]>`
|
||||||
SELECT
|
(
|
||||||
monitor_id,
|
SELECT
|
||||||
COALESCE(region, 'default') AS region,
|
monitor_id,
|
||||||
date_trunc(${truncUnit}, checked_at) AS bucket_start,
|
COALESCE(region, 'default') AS region,
|
||||||
count(*)::int AS total,
|
'hourly'::text AS bucket_type,
|
||||||
count(*) FILTER (WHERE up)::int AS up_count,
|
date_trunc('hour', checked_at) AS bucket_start,
|
||||||
avg(latency_ms)::real AS avg_latency
|
count(*)::int AS total,
|
||||||
FROM pings
|
count(*) FILTER (WHERE up)::int AS up_count,
|
||||||
WHERE monitor_id = ANY(${sql.array(ids)}::text[])
|
avg(latency_ms)::real AS avg_latency
|
||||||
AND checked_at > date_trunc(${truncUnit}, now()) - ${intervalLiteral}::interval
|
FROM pings
|
||||||
GROUP BY 1, 2, 3
|
WHERE monitor_id = ANY(${sql.array(ids)}::text[])
|
||||||
ORDER BY 1, 2, 3 ASC
|
AND checked_at > now() - ${hourlyInterval}::interval
|
||||||
|
GROUP BY 1, 2, 4
|
||||||
|
)
|
||||||
|
UNION ALL
|
||||||
|
(
|
||||||
|
SELECT
|
||||||
|
monitor_id,
|
||||||
|
COALESCE(region, 'default') AS region,
|
||||||
|
'daily'::text AS bucket_type,
|
||||||
|
date_trunc('day', checked_at) AS bucket_start,
|
||||||
|
count(*)::int AS total,
|
||||||
|
count(*) FILTER (WHERE up)::int AS up_count,
|
||||||
|
avg(latency_ms)::real AS avg_latency
|
||||||
|
FROM pings
|
||||||
|
WHERE monitor_id = ANY(${sql.array(ids)}::text[])
|
||||||
|
AND checked_at > now() - ${dailyInterval}::interval
|
||||||
|
GROUP BY 1, 2, 4
|
||||||
|
)
|
||||||
`;
|
`;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Single pass over rollup rows builds three indices:
|
// Single pass over the unified rows builds every index we need:
|
||||||
// indexed[mid][isoStart] → cross-region {total, up} for bar coloring
|
// barIndexed[mid][isoStart] → cross-region {total, up} for bar coloring (only rows of barFrequency)
|
||||||
// regionLat[mid][region] → cross-window weighted latency for picking fastest region
|
// barRegionLat[mid][region] → weighted latency over the bar window for picking fastest region
|
||||||
// regionBucketLat[mid][region][isoStart] → per-bucket latency for the fastest-region tooltip lookup
|
// barRegionBucketLat[mid][region][iso] → per-bucket latency in the fastest region (only rows of barFrequency)
|
||||||
const indexed: Record<string, Record<string, { total: number; up: number }>> = {};
|
// windowTotals[mid][windowKey] → {up, total} per uptime window (24h/7d/30d/90d)
|
||||||
const regionLat: Record<string, Record<string, { sum: number; n: number }>> = {};
|
// latByMonitor[mid][] → 30h hourly latency sparkline rows
|
||||||
const regionBucketLat: Record<string, Record<string, Record<string, number>>> = {};
|
const barIndexed: Record<string, Record<string, { total: number; up: number }>> = {};
|
||||||
|
const barRegionLat: Record<string, Record<string, { sum: number; n: number }>> = {};
|
||||||
|
const barRegionBucketLat: Record<string, Record<string, Record<string, number>>> = {};
|
||||||
|
|
||||||
|
type WindowKey = "d24" | "d7" | "d30" | "d90";
|
||||||
|
const windowTotals: Record<string, Record<WindowKey, { up: number; total: number }>> = {};
|
||||||
|
const initWindowTotals = (mid: string) => {
|
||||||
|
if (!windowTotals[mid]) {
|
||||||
|
windowTotals[mid] = {
|
||||||
|
d24: { up: 0, total: 0 },
|
||||||
|
d7: { up: 0, total: 0 },
|
||||||
|
d30: { up: 0, total: 0 },
|
||||||
|
d90: { up: 0, total: 0 },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return windowTotals[mid]!;
|
||||||
|
};
|
||||||
|
|
||||||
|
const latByMonitor: Record<string, MonitorRow["latency_history"]> = {};
|
||||||
|
|
||||||
|
const nowMs = Date.now();
|
||||||
|
const ms24h = 24 * 3600_000;
|
||||||
|
const ms7d = 7 * 86_400_000;
|
||||||
|
const ms30d = 30 * 86_400_000;
|
||||||
|
const ms90d = 90 * 86_400_000;
|
||||||
|
const ms30h = 30 * 3600_000;
|
||||||
|
|
||||||
for (const r of rollupRows) {
|
for (const r of rollupRows) {
|
||||||
const startIso = r.bucket_start instanceof Date ? r.bucket_start.toISOString() : String(r.bucket_start);
|
const startDate = r.bucket_start instanceof Date ? r.bucket_start : new Date(r.bucket_start);
|
||||||
|
const startIso = startDate.toISOString();
|
||||||
|
const startMs = startDate.getTime();
|
||||||
|
const total = Number(r.total);
|
||||||
|
const up = Number(r.up_count);
|
||||||
|
const avgLat = r.avg_latency == null ? null : Number(r.avg_latency);
|
||||||
|
const mid = r.monitor_id;
|
||||||
|
const bt: BucketType = r.bucket_type;
|
||||||
|
|
||||||
// Cross-region bucket totals (for bar coloring)
|
// Bar chart accumulators — only rows matching the configured bar frequency.
|
||||||
if (!indexed[r.monitor_id]) indexed[r.monitor_id] = {};
|
if (bt === bucket) {
|
||||||
const slot = indexed[r.monitor_id]![startIso] ?? { total: 0, up: 0 };
|
if (!barIndexed[mid]) barIndexed[mid] = {};
|
||||||
slot.total += Number(r.total);
|
const slot = barIndexed[mid]![startIso] ?? { total: 0, up: 0 };
|
||||||
slot.up += Number(r.up_count);
|
slot.total += total;
|
||||||
indexed[r.monitor_id]![startIso] = slot;
|
slot.up += up;
|
||||||
|
barIndexed[mid]![startIso] = slot;
|
||||||
|
|
||||||
// Per-region latency tracking
|
if (avgLat != null && total > 0) {
|
||||||
if (r.avg_latency != null && Number(r.total) > 0) {
|
if (!barRegionLat[mid]) barRegionLat[mid] = {};
|
||||||
if (!regionLat[r.monitor_id]) regionLat[r.monitor_id] = {};
|
const acc = barRegionLat[mid]![r.region] ?? { sum: 0, n: 0 };
|
||||||
const acc = regionLat[r.monitor_id]![r.region] ?? { sum: 0, n: 0 };
|
acc.sum += avgLat * total;
|
||||||
acc.sum += Number(r.avg_latency) * Number(r.total);
|
acc.n += total;
|
||||||
acc.n += Number(r.total);
|
barRegionLat[mid]![r.region] = acc;
|
||||||
regionLat[r.monitor_id]![r.region] = acc;
|
|
||||||
|
|
||||||
if (!regionBucketLat[r.monitor_id]) regionBucketLat[r.monitor_id] = {};
|
if (!barRegionBucketLat[mid]) barRegionBucketLat[mid] = {};
|
||||||
if (!regionBucketLat[r.monitor_id]![r.region]) regionBucketLat[r.monitor_id]![r.region] = {};
|
if (!barRegionBucketLat[mid]![r.region]) barRegionBucketLat[mid]![r.region] = {};
|
||||||
regionBucketLat[r.monitor_id]![r.region]![startIso] = Math.round(Number(r.avg_latency));
|
barRegionBucketLat[mid]![r.region]![startIso] = Math.round(avgLat);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multi-window uptime accumulators. 24h uses hourly buckets; 7d/30d/90d
|
||||||
|
// use daily buckets — same as the old loadMultiWindowUptime SQL did.
|
||||||
|
// Strict `<` to match the old SQL's `bucket_start > now() - interval`.
|
||||||
|
const wt = initWindowTotals(mid);
|
||||||
|
if (bt === "hourly" && nowMs - startMs < ms24h) {
|
||||||
|
wt.d24.up += up; wt.d24.total += total;
|
||||||
|
}
|
||||||
|
if (bt === "daily") {
|
||||||
|
const age = nowMs - startMs;
|
||||||
|
if (age < ms7d) { wt.d7.up += up; wt.d7.total += total; }
|
||||||
|
if (age < ms30d) { wt.d30.up += up; wt.d30.total += total; }
|
||||||
|
if (age < ms90d) { wt.d90.up += up; wt.d90.total += total; }
|
||||||
|
}
|
||||||
|
|
||||||
|
// 30h hourly latency sparkline.
|
||||||
|
if (bt === "hourly" && nowMs - startMs < ms30h) {
|
||||||
|
if (!latByMonitor[mid]) latByMonitor[mid] = [];
|
||||||
|
latByMonitor[mid]!.push({
|
||||||
|
region: r.region,
|
||||||
|
latency_ms: avgLat == null ? null : Math.round(avgLat),
|
||||||
|
ts: startIso,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pick the fastest region per monitor (lowest weighted average latency over
|
// Sort the latency sparkline rows by ts ASC per monitor (the unified query
|
||||||
// the whole window). All per-bucket latency display falls back to this
|
// sorts by bucket_type then region then bucket_start, so the per-monitor
|
||||||
// region's per-bucket numbers; the per-monitor avg_latency uses the same.
|
// hourly subset is already ordered within a region but interleaved across
|
||||||
|
// regions — this normalises it the same way the old separate query did).
|
||||||
|
for (const mid of Object.keys(latByMonitor)) {
|
||||||
|
latByMonitor[mid]!.sort((a, b) => a.ts.localeCompare(b.ts));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pick the fastest region per monitor over the bar window (lowest weighted
|
||||||
|
// average latency). Per-bucket latency display + the per-monitor avg_latency
|
||||||
|
// both come from the chosen region.
|
||||||
const fastestRegionByMonitor: Record<string, string | null> = {};
|
const fastestRegionByMonitor: Record<string, string | null> = {};
|
||||||
const fastestLatency: Record<string, number | null> = {};
|
const fastestLatency: Record<string, number | null> = {};
|
||||||
for (const id of ids) {
|
for (const id of ids) {
|
||||||
let bestRegion: string | null = null;
|
let bestRegion: string | null = null;
|
||||||
let bestAvg = Infinity;
|
let bestAvg = Infinity;
|
||||||
const regions = regionLat[id] ?? {};
|
const regions = barRegionLat[id] ?? {};
|
||||||
for (const [region, acc] of Object.entries(regions)) {
|
for (const [region, acc] of Object.entries(regions)) {
|
||||||
if (acc.n === 0) continue;
|
if (acc.n === 0) continue;
|
||||||
const avg = acc.sum / acc.n;
|
const avg = acc.sum / acc.n;
|
||||||
|
|
@ -278,8 +327,8 @@ export async function loadMonitors(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate the full sequence of expected bucket timestamps so empty bars
|
// Generate the full sequence of expected bucket timestamps so empty bars
|
||||||
// render as "no data" instead of disappearing entirely. Truncate `now()` to
|
// render as "no data" instead of disappearing entirely. Truncate `now()`
|
||||||
// the unit so the slot boundaries line up with what the rollup writes.
|
// to the unit so the slot boundaries line up with what the rollup writes.
|
||||||
const bucketMs = bucket === "hourly" ? 3600_000 : 86_400_000;
|
const bucketMs = bucket === "hourly" ? 3600_000 : 86_400_000;
|
||||||
const truncate = (d: Date): Date => {
|
const truncate = (d: Date): Date => {
|
||||||
const t = new Date(d);
|
const t = new Date(d);
|
||||||
|
|
@ -294,9 +343,9 @@ export async function loadMonitors(
|
||||||
}
|
}
|
||||||
const bucketsByMonitor: Record<string, MonitorRow["buckets"]> = {};
|
const bucketsByMonitor: Record<string, MonitorRow["buckets"]> = {};
|
||||||
for (const id of ids) {
|
for (const id of ids) {
|
||||||
const slotMap = indexed[id] ?? {};
|
const slotMap = barIndexed[id] ?? {};
|
||||||
const bestRegion = fastestRegionByMonitor[id];
|
const bestRegion = fastestRegionByMonitor[id];
|
||||||
const fastestBuckets = bestRegion ? regionBucketLat[id]?.[bestRegion] ?? {} : {};
|
const fastestBuckets = bestRegion ? barRegionBucketLat[id]?.[bestRegion] ?? {} : {};
|
||||||
bucketsByMonitor[id] = slotIsos.map((iso) => {
|
bucketsByMonitor[id] = slotIsos.map((iso) => {
|
||||||
const hit = slotMap[iso];
|
const hit = slotMap[iso];
|
||||||
const lat = fastestBuckets[iso] ?? null;
|
const lat = fastestBuckets[iso] ?? null;
|
||||||
|
|
@ -306,28 +355,19 @@ export async function loadMonitors(
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 4: multi-window uptime row (24h / 7d / 30d / 90d) per monitor.
|
// Multi-window uptime is a straight read from the windowTotals accumulator.
|
||||||
const multiWindow = await loadMultiWindowUptime(ids);
|
const multiWindow: Record<string, MultiWindowUptime> = {};
|
||||||
|
const toPct = (up: number, total: number): number | null =>
|
||||||
// Step 5: tiny recent latency history for the sparkline (last 30 hourly buckets).
|
total > 0 ? +(100 * up / total).toFixed(2) : null;
|
||||||
const latRows = await sql<any[]>`
|
for (const id of ids) {
|
||||||
SELECT monitor_id, region, bucket_start, avg_latency
|
const wt = windowTotals[id];
|
||||||
FROM monitor_uptime_rollup
|
multiWindow[id] = wt
|
||||||
WHERE monitor_id = ANY(${sql.array(ids)}::text[])
|
? { d24: toPct(wt.d24.up, wt.d24.total), d7: toPct(wt.d7.up, wt.d7.total), d30: toPct(wt.d30.up, wt.d30.total), d90: toPct(wt.d90.up, wt.d90.total) }
|
||||||
AND bucket_type = 'hourly'
|
: { d24: null, d7: null, d30: null, d90: null };
|
||||||
AND bucket_start > now() - interval '30 hours'
|
|
||||||
ORDER BY monitor_id, bucket_start ASC
|
|
||||||
`;
|
|
||||||
const latencyByMonitorList: Record<string, MonitorRow["latency_history"]> = {};
|
|
||||||
for (const r of latRows) {
|
|
||||||
if (!latencyByMonitorList[r.monitor_id]) latencyByMonitorList[r.monitor_id] = [];
|
|
||||||
latencyByMonitorList[r.monitor_id]!.push({
|
|
||||||
region: r.region,
|
|
||||||
latency_ms: r.avg_latency != null ? Math.round(r.avg_latency) : null,
|
|
||||||
ts: r.bucket_start instanceof Date ? r.bucket_start.toISOString() : String(r.bucket_start),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const latencyByMonitorList = latByMonitor;
|
||||||
|
|
||||||
return monitorRows.map((m) => {
|
return monitorRows.map((m) => {
|
||||||
const region_states = stateByMonitor[m.id] ?? [];
|
const region_states = stateByMonitor[m.id] ?? [];
|
||||||
let current_state: MonitorRow["current_state"] = "unknown";
|
let current_state: MonitorRow["current_state"] = "unknown";
|
||||||
|
|
|
||||||
|
|
@ -283,23 +283,32 @@ export const dashboard = new Elysia()
|
||||||
const keyId = resolved?.keyId ?? null;
|
const keyId = resolved?.keyId ?? null;
|
||||||
if (!accountId) return redirect("/dashboard");
|
if (!accountId) return redirect("/dashboard");
|
||||||
|
|
||||||
const [acc] = await sql`SELECT id, email_hash, plan, plan_expires_at, plan_stack, created_at FROM accounts WHERE id = ${accountId}`;
|
|
||||||
const isSubKey = !!keyId;
|
const isSubKey = !!keyId;
|
||||||
const apiKeys = isSubKey ? [] : await sql`SELECT id, key, label, created_at, last_used_at FROM api_keys WHERE account_id = ${accountId} ORDER BY created_at DESC`;
|
|
||||||
const loginKey = isSubKey ? null : (cookie?.pingql_key?.value ?? null);
|
const loginKey = isSubKey ? null : (cookie?.pingql_key?.value ?? null);
|
||||||
const [{ count: monitorCount }] = await sql`SELECT COUNT(*)::int as count FROM monitors WHERE account_id = ${accountId}`;
|
|
||||||
|
|
||||||
let invoices: any[] = [];
|
// All four reads are independent — fan them out in parallel instead of
|
||||||
try {
|
// serializing four round-trips. Each individual query is fast (PK seek or
|
||||||
invoices = await sql`
|
// small indexed scan); the win is cutting wall-clock to roughly the slowest query by not
|
||||||
SELECT id, plan, months, amount_usd, coin, amount_crypto, status, created_at, paid_at, expires_at, txid
|
// waiting on each one in turn.
|
||||||
FROM payments
|
const accountQ = sql`SELECT id, email_hash, plan, plan_expires_at, plan_stack, created_at FROM accounts WHERE id = ${accountId}`;
|
||||||
WHERE account_id = ${accountId}
|
const apiKeysQ = isSubKey
|
||||||
AND (status = 'paid' OR (status IN ('pending', 'underpaid', 'confirming') AND expires_at >= now()))
|
? Promise.resolve([] as any[])
|
||||||
ORDER BY created_at DESC
|
: sql`SELECT id, key, label, created_at, last_used_at FROM api_keys WHERE account_id = ${accountId} ORDER BY created_at DESC`;
|
||||||
LIMIT 20
|
const monitorCountQ = sql`SELECT COUNT(*)::int as count FROM monitors WHERE account_id = ${accountId}`;
|
||||||
`;
|
const invoicesQ = sql`
|
||||||
} catch {}
|
SELECT id, plan, months, amount_usd, coin, amount_crypto, status, created_at, paid_at, expires_at, txid
|
||||||
|
FROM payments
|
||||||
|
WHERE account_id = ${accountId}
|
||||||
|
AND (status = 'paid' OR (status IN ('pending', 'underpaid', 'confirming') AND expires_at >= now()))
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT 20
|
||||||
|
`.catch(() => [] as any[]);
|
||||||
|
|
||||||
|
const [accountRows, apiKeys, monitorCountRows, invoices] = await Promise.all([
|
||||||
|
accountQ, apiKeysQ, monitorCountQ, invoicesQ,
|
||||||
|
]);
|
||||||
|
const acc = accountRows[0];
|
||||||
|
const monitorCount = monitorCountRows[0].count;
|
||||||
|
|
||||||
return html("settings", { nav: "settings", account: acc, apiKeys, accountId, loginKey, isSubKey, monitorCount, invoices });
|
return html("settings", { nav: "settings", account: acc, apiKeys, accountId, loginKey, isSubKey, monitorCount, invoices });
|
||||||
})
|
})
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue