diff --git a/apps/api/src/db.ts b/apps/api/src/db.ts index 5b87a65..7bd2823 100644 --- a/apps/api/src/db.ts +++ b/apps/api/src/db.ts @@ -51,6 +51,7 @@ export async function migrate() { await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS jitter_ms INTEGER`; await sql`ALTER TABLE monitors ADD COLUMN IF NOT EXISTS regions TEXT[] NOT NULL DEFAULT '{}'`; await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS region TEXT`; + await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS run_id TEXT`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`; diff --git a/apps/api/src/routes/internal.ts b/apps/api/src/routes/internal.ts index c9f5ebf..b95388c 100644 --- a/apps/api/src/routes/internal.ts +++ b/apps/api/src/routes/internal.ts @@ -32,7 +32,9 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } }) // Returns monitors that are due for a check. - // scheduled_at = last_checked_at + interval_s (ideal fire time), so jitter = actual_start - scheduled_at + // When a region is provided, due-ness and scheduled_at are based on + // the last ping from that specific region — so each region has its own + // independent schedule and they don't drift against each other. .get("/due", async ({ request }) => { const region = new URL(request.url).searchParams.get('region') || undefined; const monitors = await sql` @@ -45,6 +47,7 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } LEFT JOIN LATERAL ( SELECT checked_at FROM pings WHERE monitor_id = m.id + AND (${region ? sql`region = ${region}` : sql`true`}) ORDER BY checked_at DESC LIMIT 1 ) last ON true WHERE m.enabled = true diff --git a/apps/api/src/routes/pings.ts b/apps/api/src/routes/pings.ts index 0abb078..a5e4106 100644 --- a/apps/api/src/routes/pings.ts +++ b/apps/api/src/routes/pings.ts @@ -73,7 +73,7 @@ export const ingest = new Elysia() const jitterMs = body.jitter_ms ?? null; const [ping] = await sql` - INSERT INTO pings (monitor_id, scheduled_at, jitter_ms, status_code, latency_ms, up, error, meta, region) + INSERT INTO pings (monitor_id, scheduled_at, jitter_ms, status_code, latency_ms, up, error, meta, region, run_id) VALUES ( ${body.monitor_id}, ${scheduledAt}, @@ -83,7 +83,8 @@ export const ingest = new Elysia() ${body.up}, ${body.error ?? null}, ${Object.keys(meta).length > 0 ? sql.json(meta) : null}, - ${body.region ?? null} + ${body.region ?? null}, + ${body.run_id ?? null} ) RETURNING * `; @@ -105,6 +106,7 @@ export const ingest = new Elysia() cert_expiry_days: t.Optional(t.Nullable(t.Number())), meta: t.Optional(t.Any()), region: t.Optional(t.Nullable(t.String())), + run_id: t.Optional(t.Nullable(t.String())), }), detail: { hide: true }, }) diff --git a/apps/monitor/Cargo.toml b/apps/monitor/Cargo.toml index 2f82bfe..7fcc5cd 100644 --- a/apps/monitor/Cargo.toml +++ b/apps/monitor/Cargo.toml @@ -20,3 +20,4 @@ webpki-roots = "0.26" x509-parser = "0.16" tokio-rustls = "0.26" chrono = { version = "0.4", features = ["serde"] } +uuid = { version = "1", features = ["v4"] } diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index 03fd085..37a919a 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -33,6 +33,9 @@ pub async fn fetch_and_run( let n = monitors.len(); if n == 0 { return Ok(0); } + // One run_id per fetch batch — groups all checks from this cycle across regions + let run_id = uuid::Uuid::new_v4().to_string(); + // Spawn all checks — fire and forget, skip if already in-flight let mut spawned = 0usize; for monitor in monitors { @@ -49,12 +52,13 @@ pub async fn fetch_and_run( let coordinator_url = coordinator_url.to_string(); let token = token.to_string(); let region_owned = region.to_string(); + let run_id_owned = run_id.clone(); let in_flight = in_flight.clone(); tokio::spawn(async move { let timeout_ms = monitor.timeout_ms.unwrap_or(30000); // Hard deadline: timeout + 5s buffer, so hung checks always resolve let deadline = std::time::Duration::from_millis(timeout_ms + 5000); - let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, monitor.scheduled_at.clone(), ®ion_owned)).await { + let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, monitor.scheduled_at.clone(), ®ion_owned, &run_id_owned)).await { Ok(r) => r, Err(_) => PingResult { monitor_id: monitor.id.clone(), @@ -67,6 +71,7 @@ pub async fn fetch_and_run( cert_expiry_days: None, meta: None, region: if region_owned.is_empty() { None } else { Some(region_owned.clone()) }, + run_id: Some(run_id_owned.clone()), }, }; // Post result first, then clear in-flight — this prevents the next @@ -81,7 +86,7 @@ pub async fn fetch_and_run( Ok(spawned) } -async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option, region: &str) -> PingResult { +async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { // Compute jitter: how late we actually started vs when we were scheduled let jitter_ms: Option = scheduled_at.as_deref().and_then(|s| { let scheduled = chrono::DateTime::parse_from_rfc3339(s).ok()?; @@ -135,6 +140,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op cert_expiry_days: None, meta: None, region: if region.is_empty() { None } else { Some(region.to_string()) }, + run_id: Some(run_id.to_string()), } }, Ok((status_code, headers, body)) => { @@ -195,6 +201,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op cert_expiry_days, meta: Some(meta), region: if region.is_empty() { None } else { Some(region.to_string()) }, + run_id: Some(run_id.to_string()), } } } diff --git a/apps/monitor/src/types.rs b/apps/monitor/src/types.rs index 7bb98d4..6c38c05 100644 --- a/apps/monitor/src/types.rs +++ b/apps/monitor/src/types.rs @@ -28,4 +28,5 @@ pub struct PingResult { pub cert_expiry_days: Option, pub meta: Option, pub region: Option, + pub run_id: Option, }