From f71388a51af9fea1ce058f1ca8ca179b4660697b Mon Sep 17 00:00:00 2001 From: M1 Date: Tue, 17 Mar 2026 10:44:35 +0400 Subject: [PATCH] =?UTF-8?q?feat:=20jitter=5Fms=20tracking=20=E2=80=94=20sc?= =?UTF-8?q?heduled=5Fat=20stamped=20at=20dispatch,=20jitter=20computed=20o?= =?UTF-8?q?n=20ingest?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/monitor/src/runner.rs | 6 ++++-- apps/monitor/src/types.rs | 2 ++ apps/web/src/db.ts | 22 ++++++++++++++-------- apps/web/src/routes/internal.ts | 8 +++++--- apps/web/src/routes/pings.ts | 8 +++++++- 5 files changed, 32 insertions(+), 14 deletions(-) diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index 4befb41..85d8aab 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -44,7 +44,7 @@ pub async fn fetch_and_run( let token = token.to_string(); let in_flight = in_flight.clone(); tokio::spawn(async move { - let result = run_check(&client, &monitor).await; + let result = run_check(&client, &monitor, monitor.scheduled_at.clone()).await; // Remove from in-flight before posting so a fast next cycle can pick it up in_flight.lock().await.remove(&monitor.id); if let Err(e) = post_result(&client, &coordinator_url, &token, result).await { @@ -56,7 +56,7 @@ pub async fn fetch_and_run( Ok(spawned) } -async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult { +async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option) -> PingResult { let start = Instant::now(); // Check cert expiry for HTTPS URLs @@ -96,6 +96,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult { match result { Err(e) => PingResult { monitor_id: monitor.id.clone(), + scheduled_at, status_code: None, latency_ms: Some(latency_ms), up: false, @@ -142,6 +143,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult { PingResult { monitor_id: monitor.id.clone(), + scheduled_at, status_code: Some(status), latency_ms: Some(latency_ms), up, diff --git a/apps/monitor/src/types.rs b/apps/monitor/src/types.rs index 16d9ca5..22c4bcb 100644 --- a/apps/monitor/src/types.rs +++ b/apps/monitor/src/types.rs @@ -12,11 +12,13 @@ pub struct Monitor { pub timeout_ms: Option, pub interval_s: i64, pub query: Option, + pub scheduled_at: Option, } #[derive(Debug, Serialize)] pub struct PingResult { pub monitor_id: String, + pub scheduled_at: Option, pub status_code: Option, pub latency_ms: Option, pub up: bool, diff --git a/apps/web/src/db.ts b/apps/web/src/db.ts index a1d8342..efe07e9 100644 --- a/apps/web/src/db.ts +++ b/apps/web/src/db.ts @@ -33,17 +33,23 @@ export async function migrate() { await sql` CREATE TABLE IF NOT EXISTS pings ( - id BIGSERIAL PRIMARY KEY, - monitor_id TEXT NOT NULL REFERENCES monitors(id) ON DELETE CASCADE, - checked_at TIMESTAMPTZ NOT NULL DEFAULT now(), - status_code INTEGER, - latency_ms INTEGER, - up BOOLEAN NOT NULL, - error TEXT, - meta JSONB + id BIGSERIAL PRIMARY KEY, + monitor_id TEXT NOT NULL REFERENCES monitors(id) ON DELETE CASCADE, + checked_at TIMESTAMPTZ NOT NULL DEFAULT now(), + scheduled_at TIMESTAMPTZ, + jitter_ms INTEGER, + status_code INTEGER, + latency_ms INTEGER, + up BOOLEAN NOT NULL, + error TEXT, + meta JSONB ) `; + // Migrations for existing deployments + await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS scheduled_at TIMESTAMPTZ`; + await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS jitter_ms INTEGER`; + await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`; diff --git a/apps/web/src/routes/internal.ts b/apps/web/src/routes/internal.ts index 5f7995c..2834e31 100644 --- a/apps/web/src/routes/internal.ts +++ b/apps/web/src/routes/internal.ts @@ -22,10 +22,10 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } return {}; }) - // Returns monitors that are due for a check + // Returns monitors that are due for a check, with scheduled_at = now() .get("/due", async () => { - - return sql` + const scheduled_at = new Date().toISOString(); + const monitors = await sql` SELECT m.id, m.url, m.method, m.request_headers, m.request_body, m.timeout_ms, m.interval_s, m.query FROM monitors m LEFT JOIN LATERAL ( @@ -37,6 +37,8 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } AND (last.checked_at IS NULL OR last.checked_at < now() - (m.interval_s || ' seconds')::interval) `; + // Attach scheduled_at to each monitor so the runner can report jitter + return monitors.map((m: any) => ({ ...m, scheduled_at })); }) // Manual retention cleanup trigger diff --git a/apps/web/src/routes/pings.ts b/apps/web/src/routes/pings.ts index c1d619b..dc48114 100644 --- a/apps/web/src/routes/pings.ts +++ b/apps/web/src/routes/pings.ts @@ -56,10 +56,15 @@ export const ingest = new Elysia() const meta = body.meta ? { ...body.meta } : {}; if (body.cert_expiry_days != null) meta.cert_expiry_days = body.cert_expiry_days; + const scheduledAt = body.scheduled_at ? new Date(body.scheduled_at) : null; + const jitterMs = scheduledAt ? Math.max(0, Date.now() - scheduledAt.getTime()) : null; + const [ping] = await sql` - INSERT INTO pings (monitor_id, status_code, latency_ms, up, error, meta) + INSERT INTO pings (monitor_id, scheduled_at, jitter_ms, status_code, latency_ms, up, error, meta) VALUES ( ${body.monitor_id}, + ${scheduledAt}, + ${jitterMs}, ${body.status_code ?? null}, ${body.latency_ms ?? null}, ${body.up}, @@ -77,6 +82,7 @@ export const ingest = new Elysia() }, { body: t.Object({ monitor_id: t.String(), + scheduled_at: t.Optional(t.Nullable(t.String())), status_code: t.Optional(t.Number()), latency_ms: t.Optional(t.Number()), up: t.Boolean(),