feat: lookahead scheduling — API returns scheduled_at_ms, nodes sleep-until for tight coordination

This commit is contained in:
M1 2026-03-18 17:14:28 +04:00
parent 7b98ae78e5
commit c5eb514990
4 changed files with 40 additions and 24 deletions

View File

@ -31,18 +31,22 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true }
return {}; return {};
}) })
// Returns monitors that are due for a check. // Returns monitors due within the next `lookahead_ms` milliseconds (default 2000).
// When a region is provided, due-ness and scheduled_at are based on // Nodes receive scheduled_at as an exact unix ms timestamp and sleep until that
// the last ping from that specific region — so each region has its own // moment before firing — all regions coordinate to the same scheduled slot.
// independent schedule and they don't drift against each other.
.get("/due", async ({ request }) => { .get("/due", async ({ request }) => {
const region = new URL(request.url).searchParams.get('region') || undefined; const params = new URL(request.url).searchParams;
const region = params.get('region') || undefined;
const lookaheadMs = Math.min(Number(params.get('lookahead_ms') || 2000), 10000);
const monitors = await sql` const monitors = await sql`
SELECT m.id, m.url, m.method, m.request_headers, m.request_body, m.timeout_ms, m.interval_s, m.query, m.regions, SELECT m.id, m.url, m.method, m.request_headers, m.request_body, m.timeout_ms, m.interval_s, m.query, m.regions,
EXTRACT(EPOCH FROM (
CASE CASE
WHEN last.checked_at IS NULL THEN now() WHEN last.checked_at IS NULL THEN now()
ELSE last.checked_at + (m.interval_s || ' seconds')::interval ELSE last.checked_at + (m.interval_s || ' seconds')::interval
END AS scheduled_at END
))::bigint * 1000 AS scheduled_at_ms
FROM monitors m FROM monitors m
LEFT JOIN LATERAL ( LEFT JOIN LATERAL (
SELECT checked_at FROM pings SELECT checked_at FROM pings
@ -52,7 +56,7 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true }
) last ON true ) last ON true
WHERE m.enabled = true WHERE m.enabled = true
AND (last.checked_at IS NULL AND (last.checked_at IS NULL
OR last.checked_at < now() - (m.interval_s || ' seconds')::interval) OR last.checked_at < now() - (m.interval_s || ' seconds')::interval + (${lookaheadMs} || ' milliseconds')::interval)
AND ( AND (
array_length(m.regions, 1) IS NULL array_length(m.regions, 1) IS NULL
OR m.regions = '{}' OR m.regions = '{}'

View File

@ -42,6 +42,6 @@ async fn main() -> Result<()> {
Ok(n) => { if n > 0 { info!("Spawned {n} checks"); } }, Ok(n) => { if n > 0 { info!("Spawned {n} checks"); } },
Err(e) => error!("Check cycle failed: {e}"), Err(e) => error!("Check cycle failed: {e}"),
} }
sleep(Duration::from_millis(50)).await; sleep(Duration::from_millis(1000)).await;
} }
} }

View File

@ -16,11 +16,12 @@ pub async fn fetch_and_run(
region: &str, region: &str,
in_flight: &Arc<Mutex<HashSet<String>>>, in_flight: &Arc<Mutex<HashSet<String>>>,
) -> Result<usize> { ) -> Result<usize> {
// Fetch due monitors for this region // Fetch monitors due within the next 2s — nodes receive exact scheduled_at_ms
// and sleep until that moment, so all regions fire in tight coordination.
let url = if region.is_empty() { let url = if region.is_empty() {
format!("{coordinator_url}/internal/due") format!("{coordinator_url}/internal/due?lookahead_ms=2000")
} else { } else {
format!("{coordinator_url}/internal/due?region={}", region) format!("{coordinator_url}/internal/due?region={}&lookahead_ms=2000", region)
}; };
let monitors: Vec<Monitor> = client let monitors: Vec<Monitor> = client
.get(&url) .get(&url)
@ -53,31 +54,41 @@ pub async fn fetch_and_run(
let coordinator_url = coordinator_url.to_string(); let coordinator_url = coordinator_url.to_string();
let token = token.to_string(); let token = token.to_string();
let region_owned = region.to_string(); let region_owned = region.to_string();
// Derive run_id by hashing (monitor_id, interval_bucket) so every region // run_id: hash(monitor_id, interval_bucket) — same across all regions for this window
// checking within the same scheduled window gets the same short ID.
let run_id_owned = { let run_id_owned = {
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
let epoch = monitor.scheduled_at.as_deref() let bucket = monitor.scheduled_at_ms.unwrap_or_else(|| chrono::Utc::now().timestamp_millis())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) / (monitor.interval_s * 1000);
.map(|dt| dt.timestamp())
.unwrap_or_else(|| chrono::Utc::now().timestamp());
let bucket = epoch / monitor.interval_s;
let mut h = DefaultHasher::new(); let mut h = DefaultHasher::new();
monitor.id.hash(&mut h); monitor.id.hash(&mut h);
bucket.hash(&mut h); bucket.hash(&mut h);
format!("{:016x}", h.finish()) format!("{:016x}", h.finish())
}; };
// Convert scheduled_at_ms to an ISO string for storage in the ping
let scheduled_at_iso = monitor.scheduled_at_ms.map(|ms| {
chrono::DateTime::<chrono::Utc>::from_timestamp_millis(ms)
.map(|dt| dt.to_rfc3339())
.unwrap_or_default()
});
let in_flight = in_flight.clone(); let in_flight = in_flight.clone();
tokio::spawn(async move { tokio::spawn(async move {
// Sleep until the exact scheduled moment — tight multi-region coordination
if let Some(ms) = monitor.scheduled_at_ms {
let now_ms = chrono::Utc::now().timestamp_millis();
if ms > now_ms {
let wait = std::time::Duration::from_millis((ms - now_ms) as u64);
tokio::time::sleep(wait).await;
}
}
let timeout_ms = monitor.timeout_ms.unwrap_or(30000); let timeout_ms = monitor.timeout_ms.unwrap_or(30000);
// Hard deadline: timeout + 5s buffer, so hung checks always resolve // Hard deadline: timeout + 5s buffer, so hung checks always resolve
let deadline = std::time::Duration::from_millis(timeout_ms + 5000); let deadline = std::time::Duration::from_millis(timeout_ms + 5000);
let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, monitor.scheduled_at.clone(), &region_owned, &run_id_owned)).await { let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, scheduled_at_iso.clone(), &region_owned, &run_id_owned)).await {
Ok(r) => r, Ok(r) => r,
Err(_) => PingResult { Err(_) => PingResult {
monitor_id: monitor.id.clone(), monitor_id: monitor.id.clone(),
scheduled_at: monitor.scheduled_at.clone(), scheduled_at: scheduled_at_iso.clone(),
jitter_ms: None, jitter_ms: None,
status_code: None, status_code: None,
latency_ms: Some(timeout_ms as u64), latency_ms: Some(timeout_ms as u64),

View File

@ -12,7 +12,8 @@ pub struct Monitor {
pub timeout_ms: Option<u64>, pub timeout_ms: Option<u64>,
pub interval_s: i64, pub interval_s: i64,
pub query: Option<Value>, pub query: Option<Value>,
pub scheduled_at: Option<String>, pub scheduled_at: Option<String>, // ISO string for backward compat in PingResult
pub scheduled_at_ms: Option<i64>, // unix ms from API for precise scheduling
pub regions: Option<Vec<String>>, pub regions: Option<Vec<String>>,
} }