This commit is contained in:
nate 2026-04-08 16:09:26 +04:00
parent 33896daf77
commit 60037edf21
2 changed files with 37 additions and 13 deletions

View File

@ -17,8 +17,9 @@ const BUCKET_TRUNC: Record<BucketType, string> = {
async function rollupCurrent(bucket: BucketType): Promise<number> { async function rollupCurrent(bucket: BucketType): Promise<number> {
const trunc = BUCKET_TRUNC[bucket]; const trunc = BUCKET_TRUNC[bucket];
// Aggregate the bucket containing now(). ON CONFLICT updates if the row exists, // GROUP BY 1,2,4 (ordinals) instead of repeating the date_trunc expression —
// so this is safe to run repeatedly during the bucket's lifetime. // when the unit is a $-bound parameter, Postgres won't recognize the two
// expressions as identical and will reject the column. Ordinals are safe.
const result = await sql` const result = await sql`
INSERT INTO monitor_uptime_rollup (monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency) INSERT INTO monitor_uptime_rollup (monitor_id, region, bucket_type, bucket_start, total, up_count, avg_latency)
SELECT SELECT
@ -31,7 +32,7 @@ async function rollupCurrent(bucket: BucketType): Promise<number> {
avg(latency_ms)::real AS avg_latency avg(latency_ms)::real AS avg_latency
FROM pings FROM pings
WHERE checked_at >= date_trunc(${trunc}, now()) WHERE checked_at >= date_trunc(${trunc}, now())
GROUP BY monitor_id, COALESCE(region, 'default'), date_trunc(${trunc}, checked_at) GROUP BY 1, 2, 3, 4
ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO UPDATE SET ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO UPDATE SET
total = EXCLUDED.total, total = EXCLUDED.total,
up_count = EXCLUDED.up_count, up_count = EXCLUDED.up_count,
@ -59,7 +60,7 @@ async function backfillRecent(bucket: BucketType, units: number): Promise<number
avg(latency_ms)::real AS avg_latency avg(latency_ms)::real AS avg_latency
FROM pings FROM pings
WHERE checked_at >= date_trunc(${trunc}, now()) - ${intervalLiteral}::interval WHERE checked_at >= date_trunc(${trunc}, now()) - ${intervalLiteral}::interval
GROUP BY monitor_id, COALESCE(region, 'default'), date_trunc(${trunc}, checked_at) GROUP BY 1, 2, 3, 4
ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO NOTHING ON CONFLICT (monitor_id, region, bucket_type, bucket_start) DO NOTHING
`; `;
return result.count ?? 0; return result.count ?? 0;
@ -67,22 +68,43 @@ async function backfillRecent(bucket: BucketType, units: number): Promise<number
let started = false; let started = false;
async function rollupIsEmpty(bucket: BucketType): Promise<boolean> {
const [row] = await sql<{ exists: boolean }[]>`
SELECT EXISTS (SELECT 1 FROM monitor_uptime_rollup WHERE bucket_type = ${bucket} LIMIT 1) AS exists
`;
return !row?.exists;
}
export async function startRollupJob() { export async function startRollupJob() {
if (started) return; if (started) return;
started = true; started = true;
// Startup backfill: gives existing accounts immediate history without waiting // Startup backfill. Errors are logged BUT NOT swallowed silently anymore —
// for the periodic timers to wander backwards. Cheap because pings is indexed // we throw so a broken rollup query trips the api process and shows in the
// on checked_at and the units are bounded. // service logs immediately, instead of leaving the table mysteriously empty.
try { try {
const [h, d, w] = await Promise.all([ const [h, d, w] = await Promise.all([
backfillRecent("hourly", 48), // 48h of hourly buckets backfillRecent("hourly", 48),
backfillRecent("daily", 90), // 90 days of daily buckets backfillRecent("daily", 90),
backfillRecent("weekly", 26), // 26 weeks of weekly buckets backfillRecent("weekly", 26),
]); ]);
console.log(`[rollup] backfilled rows: hourly=${h} daily=${d} weekly=${w}`); console.log(`[rollup] backfilled rows: hourly=${h} daily=${d} weekly=${w}`);
} catch (e) { } catch (e) {
console.warn("[rollup] backfill failed:", e); console.error("[rollup] backfill FAILED — rollup table will be empty until fixed:", e);
}
// Force-run check: if any bucket type is still empty after the backfill,
// run rollupCurrent immediately so we have at least one row per type. This
// covers the "fresh deploy with very recent pings only" case.
try {
for (const b of ["hourly", "daily", "weekly"] as BucketType[]) {
if (await rollupIsEmpty(b)) {
console.log(`[rollup] ${b} still empty — forcing current-bucket aggregation`);
await rollupCurrent(b);
}
}
} catch (e) {
console.error("[rollup] force-run check failed:", e);
} }
// Periodic refreshes for the *current* bucket of each resolution. // Periodic refreshes for the *current* bucket of each resolution.

View File

@ -131,6 +131,8 @@ export async function loadMonitors(pageId: string, window: Window): Promise<Moni
// silently broken), aggregate directly from pings. Bounded by the window so // silently broken), aggregate directly from pings. Bounded by the window so
// it stays cheap. Once the rollup catches up this branch never fires. // it stays cheap. Once the rollup catches up this branch never fires.
if (rollupRows.length === 0) { if (rollupRows.length === 0) {
// Group/order by ordinals — Postgres won't dedupe a $-parameterised
// date_trunc() between SELECT and GROUP BY otherwise.
rollupRows = await sql<any[]>` rollupRows = await sql<any[]>`
SELECT SELECT
monitor_id, monitor_id,
@ -141,8 +143,8 @@ export async function loadMonitors(pageId: string, window: Window): Promise<Moni
FROM pings FROM pings
WHERE monitor_id = ANY(${sql.array(ids)}::text[]) WHERE monitor_id = ANY(${sql.array(ids)}::text[])
AND checked_at > date_trunc(${truncUnit}, now()) - ${intervalLiteral}::interval AND checked_at > date_trunc(${truncUnit}, now()) - ${intervalLiteral}::interval
GROUP BY monitor_id, date_trunc(${truncUnit}, checked_at) GROUP BY 1, 2
ORDER BY monitor_id, bucket_start ASC ORDER BY 1, 2 ASC
`; `;
} }
// Index actual rollup data by (monitor_id, isoBucketStart) so we can fill in // Index actual rollup data by (monitor_id, isoBucketStart) so we can fill in