diff --git a/apps/api/src/routes/internal.ts b/apps/api/src/routes/internal.ts index b95388c..d0174a8 100644 --- a/apps/api/src/routes/internal.ts +++ b/apps/api/src/routes/internal.ts @@ -31,18 +31,22 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } return {}; }) - // Returns monitors that are due for a check. - // When a region is provided, due-ness and scheduled_at are based on - // the last ping from that specific region — so each region has its own - // independent schedule and they don't drift against each other. + // Returns monitors due within the next `lookahead_ms` milliseconds (default 2000). + // Nodes receive scheduled_at as an exact unix ms timestamp and sleep until that + // moment before firing — all regions coordinate to the same scheduled slot. .get("/due", async ({ request }) => { - const region = new URL(request.url).searchParams.get('region') || undefined; + const params = new URL(request.url).searchParams; + const region = params.get('region') || undefined; + const lookaheadMs = Math.min(Number(params.get('lookahead_ms') || 2000), 10000); + const monitors = await sql` SELECT m.id, m.url, m.method, m.request_headers, m.request_body, m.timeout_ms, m.interval_s, m.query, m.regions, - CASE - WHEN last.checked_at IS NULL THEN now() - ELSE last.checked_at + (m.interval_s || ' seconds')::interval - END AS scheduled_at + EXTRACT(EPOCH FROM ( + CASE + WHEN last.checked_at IS NULL THEN now() + ELSE last.checked_at + (m.interval_s || ' seconds')::interval + END + ))::bigint * 1000 AS scheduled_at_ms FROM monitors m LEFT JOIN LATERAL ( SELECT checked_at FROM pings @@ -52,7 +56,7 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } ) last ON true WHERE m.enabled = true AND (last.checked_at IS NULL - OR last.checked_at < now() - (m.interval_s || ' seconds')::interval) + OR last.checked_at < now() - (m.interval_s || ' seconds')::interval + (${lookaheadMs} || ' milliseconds')::interval) AND ( 
array_length(m.regions, 1) IS NULL OR m.regions = '{}' diff --git a/apps/monitor/src/main.rs b/apps/monitor/src/main.rs index bc8fefd..6343050 100644 --- a/apps/monitor/src/main.rs +++ b/apps/monitor/src/main.rs @@ -42,6 +42,6 @@ async fn main() -> Result<()> { Ok(n) => { if n > 0 { info!("Spawned {n} checks"); } }, Err(e) => error!("Check cycle failed: {e}"), } - sleep(Duration::from_millis(50)).await; + sleep(Duration::from_millis(1000)).await; } } diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index 7b71837..a9b7335 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -16,11 +16,12 @@ pub async fn fetch_and_run( region: &str, in_flight: &Arc<Mutex<HashSet<String>>>, ) -> Result<usize> { - // Fetch due monitors for this region + // Fetch monitors due within the next 2s — nodes receive exact scheduled_at_ms + // and sleep until that moment, so all regions fire in tight coordination. let url = if region.is_empty() { - format!("{coordinator_url}/internal/due") + format!("{coordinator_url}/internal/due?lookahead_ms=2000") } else { - format!("{coordinator_url}/internal/due?region={}", region) + format!("{coordinator_url}/internal/due?region={}&lookahead_ms=2000", region) }; let monitors: Vec<Monitor> = client .get(&url) @@ -53,31 +54,41 @@ pub async fn fetch_and_run( let coordinator_url = coordinator_url.to_string(); let token = token.to_string(); let region_owned = region.to_string(); - // Derive run_id by hashing (monitor_id, interval_bucket) so every region - // checking within the same scheduled window gets the same short ID. 
+ // run_id: hash(monitor_id, interval_bucket) — same across all regions for this window let run_id_owned = { use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; - let epoch = monitor.scheduled_at.as_deref() - .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) - .map(|dt| dt.timestamp()) - .unwrap_or_else(|| chrono::Utc::now().timestamp()); - let bucket = epoch / monitor.interval_s; + let bucket = monitor.scheduled_at_ms.unwrap_or_else(|| chrono::Utc::now().timestamp_millis()) + / (monitor.interval_s * 1000); let mut h = DefaultHasher::new(); monitor.id.hash(&mut h); bucket.hash(&mut h); format!("{:016x}", h.finish()) }; + // Convert scheduled_at_ms to an ISO string for storage in the ping + let scheduled_at_iso = monitor.scheduled_at_ms.map(|ms| { + chrono::DateTime::<chrono::Utc>::from_timestamp_millis(ms) + .map(|dt| dt.to_rfc3339()) + .unwrap_or_default() + }); let in_flight = in_flight.clone(); tokio::spawn(async move { + // Sleep until the exact scheduled moment — tight multi-region coordination + if let Some(ms) = monitor.scheduled_at_ms { + let now_ms = chrono::Utc::now().timestamp_millis(); + if ms > now_ms { + let wait = std::time::Duration::from_millis((ms - now_ms) as u64); + tokio::time::sleep(wait).await; + } + } let timeout_ms = monitor.timeout_ms.unwrap_or(30000); // Hard deadline: timeout + 5s buffer, so hung checks always resolve let deadline = std::time::Duration::from_millis(timeout_ms + 5000); - let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, monitor.scheduled_at.clone(), &region_owned, &run_id_owned)).await { + let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, scheduled_at_iso.clone(), &region_owned, &run_id_owned)).await { Ok(r) => r, Err(_) => PingResult { monitor_id: monitor.id.clone(), - scheduled_at: monitor.scheduled_at.clone(), + scheduled_at: scheduled_at_iso.clone(), jitter_ms: None, status_code: None, latency_ms: Some(timeout_ms as u64), diff --git 
a/apps/monitor/src/types.rs b/apps/monitor/src/types.rs index 6c38c05..74e7dd8 100644 --- a/apps/monitor/src/types.rs +++ b/apps/monitor/src/types.rs @@ -12,7 +12,8 @@ pub struct Monitor { pub timeout_ms: Option<u64>, pub interval_s: i64, pub query: Option<String>, - pub scheduled_at: Option<String>, + pub scheduled_at: Option<String>, // ISO string for backward compat in PingResult + pub scheduled_at_ms: Option<i64>, // unix ms from API for precise scheduling pub regions: Option<Vec<String>>, }