feat: jitter_ms tracking — scheduled_at stamped at dispatch, jitter computed on ingest

This commit is contained in:
M1 2026-03-17 10:44:35 +04:00
parent 5c91cbc522
commit f71388a51a
5 changed files with 32 additions and 14 deletions

View File

@ -44,7 +44,7 @@ pub async fn fetch_and_run(
let token = token.to_string(); let token = token.to_string();
let in_flight = in_flight.clone(); let in_flight = in_flight.clone();
tokio::spawn(async move { tokio::spawn(async move {
let result = run_check(&client, &monitor).await; let result = run_check(&client, &monitor, monitor.scheduled_at.clone()).await;
// Remove from in-flight before posting so a fast next cycle can pick it up // Remove from in-flight before posting so a fast next cycle can pick it up
in_flight.lock().await.remove(&monitor.id); in_flight.lock().await.remove(&monitor.id);
if let Err(e) = post_result(&client, &coordinator_url, &token, result).await { if let Err(e) = post_result(&client, &coordinator_url, &token, result).await {
@ -56,7 +56,7 @@ pub async fn fetch_and_run(
Ok(spawned) Ok(spawned)
} }
async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult { async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option<String>) -> PingResult {
let start = Instant::now(); let start = Instant::now();
// Check cert expiry for HTTPS URLs // Check cert expiry for HTTPS URLs
@ -96,6 +96,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult {
match result { match result {
Err(e) => PingResult { Err(e) => PingResult {
monitor_id: monitor.id.clone(), monitor_id: monitor.id.clone(),
scheduled_at,
status_code: None, status_code: None,
latency_ms: Some(latency_ms), latency_ms: Some(latency_ms),
up: false, up: false,
@ -142,6 +143,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult {
PingResult { PingResult {
monitor_id: monitor.id.clone(), monitor_id: monitor.id.clone(),
scheduled_at,
status_code: Some(status), status_code: Some(status),
latency_ms: Some(latency_ms), latency_ms: Some(latency_ms),
up, up,

View File

@ -12,11 +12,13 @@ pub struct Monitor {
pub timeout_ms: Option<u64>, pub timeout_ms: Option<u64>,
pub interval_s: i64, pub interval_s: i64,
pub query: Option<Value>, pub query: Option<Value>,
pub scheduled_at: Option<String>,
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
pub struct PingResult { pub struct PingResult {
pub monitor_id: String, pub monitor_id: String,
pub scheduled_at: Option<String>,
pub status_code: Option<u16>, pub status_code: Option<u16>,
pub latency_ms: Option<u64>, pub latency_ms: Option<u64>,
pub up: bool, pub up: bool,

View File

@ -33,17 +33,23 @@ export async function migrate() {
await sql` await sql`
CREATE TABLE IF NOT EXISTS pings ( CREATE TABLE IF NOT EXISTS pings (
id BIGSERIAL PRIMARY KEY, id BIGSERIAL PRIMARY KEY,
monitor_id TEXT NOT NULL REFERENCES monitors(id) ON DELETE CASCADE, monitor_id TEXT NOT NULL REFERENCES monitors(id) ON DELETE CASCADE,
checked_at TIMESTAMPTZ NOT NULL DEFAULT now(), checked_at TIMESTAMPTZ NOT NULL DEFAULT now(),
status_code INTEGER, scheduled_at TIMESTAMPTZ,
latency_ms INTEGER, jitter_ms INTEGER,
up BOOLEAN NOT NULL, status_code INTEGER,
error TEXT, latency_ms INTEGER,
meta JSONB up BOOLEAN NOT NULL,
error TEXT,
meta JSONB
) )
`; `;
// Migrations for existing deployments
await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS scheduled_at TIMESTAMPTZ`;
await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS jitter_ms INTEGER`;
await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`;
await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`;

View File

@ -22,10 +22,10 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true }
return {}; return {};
}) })
// Returns monitors that are due for a check // Returns monitors that are due for a check, with scheduled_at = now()
.get("/due", async () => { .get("/due", async () => {
const scheduled_at = new Date().toISOString();
return sql` const monitors = await sql`
SELECT m.id, m.url, m.method, m.request_headers, m.request_body, m.timeout_ms, m.interval_s, m.query SELECT m.id, m.url, m.method, m.request_headers, m.request_body, m.timeout_ms, m.interval_s, m.query
FROM monitors m FROM monitors m
LEFT JOIN LATERAL ( LEFT JOIN LATERAL (
@ -37,6 +37,8 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true }
AND (last.checked_at IS NULL AND (last.checked_at IS NULL
OR last.checked_at < now() - (m.interval_s || ' seconds')::interval) OR last.checked_at < now() - (m.interval_s || ' seconds')::interval)
`; `;
// Attach scheduled_at to each monitor so the runner can report jitter
return monitors.map((m: any) => ({ ...m, scheduled_at }));
}) })
// Manual retention cleanup trigger // Manual retention cleanup trigger

View File

@ -56,10 +56,15 @@ export const ingest = new Elysia()
const meta = body.meta ? { ...body.meta } : {}; const meta = body.meta ? { ...body.meta } : {};
if (body.cert_expiry_days != null) meta.cert_expiry_days = body.cert_expiry_days; if (body.cert_expiry_days != null) meta.cert_expiry_days = body.cert_expiry_days;
const scheduledAt = body.scheduled_at ? new Date(body.scheduled_at) : null;
const jitterMs = scheduledAt ? Math.max(0, Date.now() - scheduledAt.getTime()) : null;
const [ping] = await sql` const [ping] = await sql`
INSERT INTO pings (monitor_id, status_code, latency_ms, up, error, meta) INSERT INTO pings (monitor_id, scheduled_at, jitter_ms, status_code, latency_ms, up, error, meta)
VALUES ( VALUES (
${body.monitor_id}, ${body.monitor_id},
${scheduledAt},
${jitterMs},
${body.status_code ?? null}, ${body.status_code ?? null},
${body.latency_ms ?? null}, ${body.latency_ms ?? null},
${body.up}, ${body.up},
@ -77,6 +82,7 @@ export const ingest = new Elysia()
}, { }, {
body: t.Object({ body: t.Object({
monitor_id: t.String(), monitor_id: t.String(),
scheduled_at: t.Optional(t.Nullable(t.String())),
status_code: t.Optional(t.Number()), status_code: t.Optional(t.Number()),
latency_ms: t.Optional(t.Number()), latency_ms: t.Optional(t.Number()),
up: t.Boolean(), up: t.Boolean(),