feat: per-region due scheduling + run_id to group pings across regions

This commit is contained in:
M1 2026-03-18 16:36:35 +04:00
parent e057a65535
commit f7ab3b96b2
6 changed files with 20 additions and 5 deletions

View File

@ -51,6 +51,7 @@ export async function migrate() {
await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS jitter_ms INTEGER`; await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS jitter_ms INTEGER`;
await sql`ALTER TABLE monitors ADD COLUMN IF NOT EXISTS regions TEXT[] NOT NULL DEFAULT '{}'`; await sql`ALTER TABLE monitors ADD COLUMN IF NOT EXISTS regions TEXT[] NOT NULL DEFAULT '{}'`;
await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS region TEXT`; await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS region TEXT`;
await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS run_id TEXT`;
await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`;
await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`;

View File

@ -32,7 +32,9 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true }
}) })
// Returns monitors that are due for a check. // Returns monitors that are due for a check.
// scheduled_at = last_checked_at + interval_s (ideal fire time), so jitter = actual_start - scheduled_at // When a region is provided, due-ness and scheduled_at are based on
// the last ping from that specific region — so each region has its own
// independent schedule and they don't drift against each other.
.get("/due", async ({ request }) => { .get("/due", async ({ request }) => {
const region = new URL(request.url).searchParams.get('region') || undefined; const region = new URL(request.url).searchParams.get('region') || undefined;
const monitors = await sql` const monitors = await sql`
@ -45,6 +47,7 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true }
LEFT JOIN LATERAL ( LEFT JOIN LATERAL (
SELECT checked_at FROM pings SELECT checked_at FROM pings
WHERE monitor_id = m.id WHERE monitor_id = m.id
AND (${region ? sql`region = ${region}` : sql`true`})
ORDER BY checked_at DESC LIMIT 1 ORDER BY checked_at DESC LIMIT 1
) last ON true ) last ON true
WHERE m.enabled = true WHERE m.enabled = true

View File

@ -73,7 +73,7 @@ export const ingest = new Elysia()
const jitterMs = body.jitter_ms ?? null; const jitterMs = body.jitter_ms ?? null;
const [ping] = await sql` const [ping] = await sql`
INSERT INTO pings (monitor_id, scheduled_at, jitter_ms, status_code, latency_ms, up, error, meta, region) INSERT INTO pings (monitor_id, scheduled_at, jitter_ms, status_code, latency_ms, up, error, meta, region, run_id)
VALUES ( VALUES (
${body.monitor_id}, ${body.monitor_id},
${scheduledAt}, ${scheduledAt},
@ -83,7 +83,8 @@ export const ingest = new Elysia()
${body.up}, ${body.up},
${body.error ?? null}, ${body.error ?? null},
${Object.keys(meta).length > 0 ? sql.json(meta) : null}, ${Object.keys(meta).length > 0 ? sql.json(meta) : null},
${body.region ?? null} ${body.region ?? null},
${body.run_id ?? null}
) )
RETURNING * RETURNING *
`; `;
@ -105,6 +106,7 @@ export const ingest = new Elysia()
cert_expiry_days: t.Optional(t.Nullable(t.Number())), cert_expiry_days: t.Optional(t.Nullable(t.Number())),
meta: t.Optional(t.Any()), meta: t.Optional(t.Any()),
region: t.Optional(t.Nullable(t.String())), region: t.Optional(t.Nullable(t.String())),
run_id: t.Optional(t.Nullable(t.String())),
}), }),
detail: { hide: true }, detail: { hide: true },
}) })

View File

@ -20,3 +20,4 @@ webpki-roots = "0.26"
x509-parser = "0.16" x509-parser = "0.16"
tokio-rustls = "0.26" tokio-rustls = "0.26"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1", features = ["v4"] }

View File

@ -33,6 +33,9 @@ pub async fn fetch_and_run(
let n = monitors.len(); let n = monitors.len();
if n == 0 { return Ok(0); } if n == 0 { return Ok(0); }
// One run_id per fetch batch — groups all checks from this cycle across regions
let run_id = uuid::Uuid::new_v4().to_string();
// Spawn all checks — fire and forget, skip if already in-flight // Spawn all checks — fire and forget, skip if already in-flight
let mut spawned = 0usize; let mut spawned = 0usize;
for monitor in monitors { for monitor in monitors {
@ -49,12 +52,13 @@ pub async fn fetch_and_run(
let coordinator_url = coordinator_url.to_string(); let coordinator_url = coordinator_url.to_string();
let token = token.to_string(); let token = token.to_string();
let region_owned = region.to_string(); let region_owned = region.to_string();
let run_id_owned = run_id.clone();
let in_flight = in_flight.clone(); let in_flight = in_flight.clone();
tokio::spawn(async move { tokio::spawn(async move {
let timeout_ms = monitor.timeout_ms.unwrap_or(30000); let timeout_ms = monitor.timeout_ms.unwrap_or(30000);
// Hard deadline: timeout + 5s buffer, so hung checks always resolve // Hard deadline: timeout + 5s buffer, so hung checks always resolve
let deadline = std::time::Duration::from_millis(timeout_ms + 5000); let deadline = std::time::Duration::from_millis(timeout_ms + 5000);
let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, monitor.scheduled_at.clone(), &region_owned)).await { let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, monitor.scheduled_at.clone(), &region_owned, &run_id_owned)).await {
Ok(r) => r, Ok(r) => r,
Err(_) => PingResult { Err(_) => PingResult {
monitor_id: monitor.id.clone(), monitor_id: monitor.id.clone(),
@ -67,6 +71,7 @@ pub async fn fetch_and_run(
cert_expiry_days: None, cert_expiry_days: None,
meta: None, meta: None,
region: if region_owned.is_empty() { None } else { Some(region_owned.clone()) }, region: if region_owned.is_empty() { None } else { Some(region_owned.clone()) },
run_id: Some(run_id_owned.clone()),
}, },
}; };
// Post result first, then clear in-flight — this prevents the next // Post result first, then clear in-flight — this prevents the next
@ -81,7 +86,7 @@ pub async fn fetch_and_run(
Ok(spawned) Ok(spawned)
} }
async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option<String>, region: &str) -> PingResult { async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option<String>, region: &str, run_id: &str) -> PingResult {
// Compute jitter: how late we actually started vs when we were scheduled // Compute jitter: how late we actually started vs when we were scheduled
let jitter_ms: Option<i64> = scheduled_at.as_deref().and_then(|s| { let jitter_ms: Option<i64> = scheduled_at.as_deref().and_then(|s| {
let scheduled = chrono::DateTime::parse_from_rfc3339(s).ok()?; let scheduled = chrono::DateTime::parse_from_rfc3339(s).ok()?;
@ -135,6 +140,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
cert_expiry_days: None, cert_expiry_days: None,
meta: None, meta: None,
region: if region.is_empty() { None } else { Some(region.to_string()) }, region: if region.is_empty() { None } else { Some(region.to_string()) },
run_id: Some(run_id.to_string()),
} }
}, },
Ok((status_code, headers, body)) => { Ok((status_code, headers, body)) => {
@ -195,6 +201,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
cert_expiry_days, cert_expiry_days,
meta: Some(meta), meta: Some(meta),
region: if region.is_empty() { None } else { Some(region.to_string()) }, region: if region.is_empty() { None } else { Some(region.to_string()) },
run_id: Some(run_id.to_string()),
} }
} }
} }

View File

@ -28,4 +28,5 @@ pub struct PingResult {
pub cert_expiry_days: Option<i64>, pub cert_expiry_days: Option<i64>,
pub meta: Option<Value>, pub meta: Option<Value>,
pub region: Option<String>, pub region: Option<String>,
pub run_id: Option<String>,
} }