From 1f01a00ad66091eea11cd605a4c6707edb0c9482 Mon Sep 17 00:00:00 2001 From: nate Date: Wed, 8 Apr 2026 08:58:44 +0400 Subject: [PATCH] feat: refactor stage 1 --- apps/api/src/routes/internal.ts | 1 + apps/api/src/routes/monitors.ts | 11 +++- apps/api/src/routes/pings.ts | 36 +++++++++-- apps/monitor/src/runner.rs | 62 +++++++++++++++---- apps/monitor/src/types.rs | 4 ++ apps/shared/db.ts | 7 +++ apps/web/src/routes/dashboard.ts | 6 ++ .../src/views/partials/monitor-form-js.ejs | 3 + apps/web/src/views/partials/monitor-form.ejs | 30 +++++++++ 9 files changed, 143 insertions(+), 17 deletions(-) diff --git a/apps/api/src/routes/internal.ts b/apps/api/src/routes/internal.ts index dce4218..17d55c4 100644 --- a/apps/api/src/routes/internal.ts +++ b/apps/api/src/routes/internal.ts @@ -34,6 +34,7 @@ export const internal = new Elysia({ prefix: "/internal", detail: { hide: true } const monitors = await sql` SELECT m.id, m.url, m.method, m.request_headers, m.request_body, m.timeout_ms, m.interval_s, m.query, m.regions, + m.max_retries, m.retry_interval_s, EXTRACT(EPOCH FROM ( CASE WHEN last.checked_at IS NULL THEN now() diff --git a/apps/api/src/routes/monitors.ts b/apps/api/src/routes/monitors.ts index 243b303..e8c19ac 100644 --- a/apps/api/src/routes/monitors.ts +++ b/apps/api/src/routes/monitors.ts @@ -12,6 +12,9 @@ const MonitorBody = t.Object({ request_body: t.Optional(t.Nullable(t.String({ maxLength: 65536, description: "Request body for POST/PUT/PATCH (max 64KB)" }))), timeout_ms: t.Optional(t.Number({ minimum: 1000, maximum: 60000, default: 10000, description: "Request timeout in ms" })), interval_s: t.Optional(t.Number({ minimum: 2, default: 30, description: "Check interval in seconds (minimum 2)" })), + max_retries: t.Optional(t.Number({ minimum: 0, maximum: 10, default: 0, description: "Retry a failing check up to N times before declaring DOWN" })), + retry_interval_s: t.Optional(t.Number({ minimum: 1, maximum: 600, default: 30, description: "Seconds between retries" })), + resend_interval: t.Optional(t.Number({ minimum: 0, maximum: 1000, default: 0, description: "Re-alert every Nth consecutive down beat. 0 = never resend." })), query: t.Optional(t.Any({ description: "PingQL query — filter conditions for up/down" })), regions: t.Optional(t.Array(t.String(), { description: "Regions to run checks from. Empty array = all regions." })), }); @@ -47,7 +50,7 @@ export const monitors = new Elysia({ prefix: "/monitors" }) const ssrfError = await validateMonitorUrl(body.url); if (ssrfError) { set.status = 400; return { error: ssrfError }; } const [monitor] = await sql` - INSERT INTO monitors (account_id, name, url, method, request_headers, request_body, timeout_ms, interval_s, query, regions) + INSERT INTO monitors (account_id, name, url, method, request_headers, request_body, timeout_ms, interval_s, max_retries, retry_interval_s, resend_interval, query, regions) VALUES ( ${accountId}, ${body.name}, ${body.url}, ${(body.method ?? 'GET').toUpperCase()}, @@ -55,6 +58,9 @@ export const monitors = new Elysia({ prefix: "/monitors" }) ${body.request_body ?? null}, ${body.timeout_ms ?? 10000}, ${interval}, + ${body.max_retries ?? 0}, + ${body.retry_interval_s ?? 30}, + ${body.resend_interval ?? 0}, ${body.query ? sql.json(body.query) : null}, ${sql.array(regions)} ) @@ -103,6 +109,9 @@ export const monitors = new Elysia({ prefix: "/monitors" }) request_body = COALESCE(${body.request_body ?? null}, request_body), timeout_ms = COALESCE(${body.timeout_ms ?? null}, timeout_ms), interval_s = COALESCE(${body.interval_s ?? null}, interval_s), + max_retries = COALESCE(${body.max_retries ?? null}, max_retries), + retry_interval_s = COALESCE(${body.retry_interval_s ?? null}, retry_interval_s), + resend_interval = COALESCE(${body.resend_interval ?? null}, resend_interval), query = COALESCE(${body.query ? sql.json(body.query) : null}, query), regions = COALESCE(${body.regions ? sql.array(body.regions) : null}, regions) WHERE id = ${params.id} AND account_id = ${accountId} diff --git a/apps/api/src/routes/pings.ts b/apps/api/src/routes/pings.ts index 0a3c12d..50cfe76 100644 --- a/apps/api/src/routes/pings.ts +++ b/apps/api/src/routes/pings.ts @@ -55,7 +55,10 @@ export const ingest = new Elysia() const token = headers["x-monitor-token"]; if (!safeTokenCompare(token, process.env.MONITOR_TOKEN)) { set.status = 401; return { error: "Unauthorized" }; } - const [monitor_check] = await sql`SELECT id FROM monitors WHERE id = ${body.monitor_id}`; + const [monitor_check] = await sql` + SELECT id, account_id, last_state, consecutive_down, resend_interval + FROM monitors WHERE id = ${body.monitor_id} + `; if (!monitor_check) { set.status = 404; return { error: "Monitor not found" }; } const meta = body.meta ? { ...body.meta } : {}; @@ -68,8 +71,33 @@ export const ingest = new Elysia() const scheduledAt = body.scheduled_at ? new Date(body.scheduled_at) : null; const jitterMs = body.jitter_ms ?? null; + // Important-beat + resend bookkeeping. Important = transition or resend tick. + const newState = body.up ? 'up' : 'down'; + const prevState: string | null = monitor_check.last_state; + let consecutiveDown: number = monitor_check.consecutive_down ?? 0; + const resendInterval: number = monitor_check.resend_interval ?? 0; + let important = false; + + if (prevState !== newState) { + important = true; + consecutiveDown = body.up ? 0 : 1; + } else if (!body.up) { + consecutiveDown += 1; + if (resendInterval > 0 && consecutiveDown > 1 && (consecutiveDown - 1) % resendInterval === 0) { + important = true; + } + } else { + consecutiveDown = 0; + } + + await sql` + UPDATE monitors + SET last_state = ${newState}, consecutive_down = ${consecutiveDown} + WHERE id = ${body.monitor_id} + `; + const [ping] = await sql` - INSERT INTO pings (monitor_id, checked_at, scheduled_at, jitter_ms, status_code, latency_ms, up, error, meta, region, run_id) + INSERT INTO pings (monitor_id, checked_at, scheduled_at, jitter_ms, status_code, latency_ms, up, important, error, meta, region, run_id) VALUES ( ${body.monitor_id}, ${checkedAt ?? sql`now()`}, @@ -78,6 +106,7 @@ export const ingest = new Elysia() ${body.status_code ?? null}, ${body.latency_ms ?? null}, ${body.up}, + ${important}, ${body.error ?? null}, ${Object.keys(meta).length > 0 ? sql.json(meta) : null}, ${body.region ?? null}, @@ -90,8 +119,7 @@ export const ingest = new Elysia() await sql`INSERT INTO ping_bodies (ping_id, body) VALUES (${ping.id}, ${responseBody})`; } - const [monitor] = await sql`SELECT account_id FROM monitors WHERE id = ${body.monitor_id}`; - if (monitor) publish(monitor.account_id, ping); + publish(monitor_check.account_id, ping); return { ok: true }; }, { diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index fa96843..24c9c72 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -85,8 +85,12 @@ pub async fn fetch_and_run( } } let timeout_ms = monitor.timeout_ms.unwrap_or(30000); - let deadline = std::time::Duration::from_millis(timeout_ms + 5000); - let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, scheduled_at_iso.clone(), ®ion_owned, &run_id_owned)).await { + let attempts = monitor.max_retries.saturating_add(1) as u64; + let retry_gap_s = monitor.retry_interval_s; + let deadline = std::time::Duration::from_millis( + attempts * (timeout_ms + 5000) + attempts.saturating_sub(1) * retry_gap_s * 1000, + ); + let result = match tokio::time::timeout(deadline, run_check_with_retries(&client, &monitor, scheduled_at_iso.clone(), ®ion_owned, &run_id_owned)).await { Ok(r) => r, Err(_) => PingResult { monitor_id: monitor.id.clone(), @@ -113,6 +117,35 @@ pub async fn fetch_and_run( Ok(spawned) } +async fn run_check_with_retries(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { + let attempts = monitor.max_retries.saturating_add(1); + let retry_gap = std::time::Duration::from_secs(monitor.retry_interval_s); + let mut last: Option = None; + for attempt in 0..attempts { + let mut result = run_check(client, monitor, scheduled_at.clone(), region, run_id).await; + if result.up { + if attempt > 0 { + if let Some(meta) = result.meta.as_mut().and_then(|m| m.as_object_mut()) { + meta.insert("retries".into(), json!(attempt)); + } + } + return result; + } + last = Some(result); + if attempt + 1 < attempts { + tokio::time::sleep(retry_gap).await; + } + } + let mut result = last.expect("at least one attempt"); + if attempts > 1 { + let meta = result.meta.get_or_insert_with(|| json!({})); + if let Some(obj) = meta.as_object_mut() { + obj.insert("retries".into(), json!(attempts - 1)); + } + } + result +} + async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { let checked_at = chrono::Utc::now().to_rfc3339(); let jitter_ms: Option = scheduled_at.as_deref().and_then(|s| { @@ -203,7 +236,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op } } } else { - (status < 400, None) + ((200..300).contains(&status), None) }; let cert_expiry_days = match cert_handle { @@ -304,15 +337,20 @@ fn run_check_blocking( resp_headers.insert(name.as_str().to_lowercase(), value.to_str().unwrap_or("").to_string()); } - const MAX_BODY: usize = 10 * 1024 * 1024; - let body_str = match resp.body_mut().read_to_string() { - Ok(s) => s, - Err(e) => format!("[failed to read body: {}]", e), - }; - let body_out = if body_str.len() > MAX_BODY { - format!("[body truncated: {} bytes]", body_str.len()) - } else { - body_str + const MAX_BODY: usize = 2 * 1024 * 1024; + let body_out = match resp.body_mut().with_config().limit((MAX_BODY + 1) as u64).read_to_vec() { + Ok(mut buf) => { + if buf.len() > MAX_BODY { buf.truncate(MAX_BODY); } + String::from_utf8_lossy(&buf).into_owned() + } + Err(e) => { + let msg = e.to_string(); + if msg.contains("limit") || msg.contains("Limit") { + format!("[body exceeded {}-byte cap]", MAX_BODY) + } else { + format!("[failed to read body: {}]", e) + } + } }; Ok((status, resp_headers, body_out)) diff --git a/apps/monitor/src/types.rs b/apps/monitor/src/types.rs index c59b244..113cb8e 100644 --- a/apps/monitor/src/types.rs +++ b/apps/monitor/src/types.rs @@ -23,6 +23,10 @@ pub struct Monitor { pub request_body: Option, pub timeout_ms: Option, pub interval_s: i64, + #[serde(default)] + pub max_retries: u32, + #[serde(default)] + pub retry_interval_s: u64, pub query: Option, pub scheduled_at: Option, // ISO string for backward compat in PingResult #[serde(deserialize_with = "deserialize_ms")] diff --git a/apps/shared/db.ts b/apps/shared/db.ts index a7651c3..f505422 100644 --- a/apps/shared/db.ts +++ b/apps/shared/db.ts @@ -69,6 +69,13 @@ export async function migrate(sql: any) { await sql`ALTER TABLE accounts ADD COLUMN IF NOT EXISTS plan_expires_at TIMESTAMPTZ`; await sql`ALTER TABLE accounts ADD COLUMN IF NOT EXISTS plan_stack JSONB NOT NULL DEFAULT '[]'`; + await sql`ALTER TABLE monitors ADD COLUMN IF NOT EXISTS max_retries INTEGER NOT NULL DEFAULT 0`; + await sql`ALTER TABLE monitors ADD COLUMN IF NOT EXISTS retry_interval_s INTEGER NOT NULL DEFAULT 30`; + await sql`ALTER TABLE monitors ADD COLUMN IF NOT EXISTS resend_interval INTEGER NOT NULL DEFAULT 0`; + await sql`ALTER TABLE monitors ADD COLUMN IF NOT EXISTS consecutive_down INTEGER NOT NULL DEFAULT 0`; + await sql`ALTER TABLE monitors ADD COLUMN IF NOT EXISTS last_state TEXT`; + await sql`ALTER TABLE pings ADD COLUMN IF NOT EXISTS important BOOLEAN NOT NULL DEFAULT false`; + await sql`CREATE INDEX IF NOT EXISTS idx_pings_monitor ON pings(monitor_id, checked_at DESC)`; await sql`CREATE INDEX IF NOT EXISTS idx_pings_checked_at ON pings(checked_at)`; diff --git a/apps/web/src/routes/dashboard.ts b/apps/web/src/routes/dashboard.ts index e403394..ffbd2ff 100644 --- a/apps/web/src/routes/dashboard.ts +++ b/apps/web/src/routes/dashboard.ts @@ -442,6 +442,9 @@ export const dashboard = new Elysia() method: b.method || "GET", interval_s: Number(b.interval_s) || 30, timeout_ms: Number(b.timeout_ms) || 10000, + max_retries: Number(b.max_retries) || 0, + retry_interval_s: Number(b.retry_interval_s) || 30, + resend_interval: Number(b.resend_interval) || 0, regions, request_headers: Object.keys(requestHeaders).length ? requestHeaders : null, request_body: b.request_body || null, @@ -479,6 +482,9 @@ export const dashboard = new Elysia() method: b.method || "GET", interval_s: Number(b.interval_s) || 30, timeout_ms: Number(b.timeout_ms) || 10000, + max_retries: Number(b.max_retries) || 0, + retry_interval_s: Number(b.retry_interval_s) || 30, + resend_interval: Number(b.resend_interval) || 0, regions, request_headers: Object.keys(requestHeaders).length ? requestHeaders : null, request_body: b.request_body || null, diff --git a/apps/web/src/views/partials/monitor-form-js.ejs b/apps/web/src/views/partials/monitor-form-js.ejs index 2084c99..9602c01 100644 --- a/apps/web/src/views/partials/monitor-form-js.ejs +++ b/apps/web/src/views/partials/monitor-form-js.ejs @@ -43,6 +43,9 @@ method: document.getElementById(prefix + 'method').value, interval_s: Number(document.getElementById(prefix + 'interval').value), timeout_ms: Number(document.getElementById(prefix + 'timeout').value), + max_retries: Number(document.getElementById(prefix + 'max-retries').value), + retry_interval_s: Number(document.getElementById(prefix + 'retry-interval').value), + resend_interval: Number(document.getElementById(prefix + 'resend-interval').value), }; if (Object.keys(headers).length) body.request_headers = headers; else body.request_headers = null; diff --git a/apps/web/src/views/partials/monitor-form.ejs b/apps/web/src/views/partials/monitor-form.ejs index a53c653..9385fe0 100644 --- a/apps/web/src/views/partials/monitor-form.ejs +++ b/apps/web/src/views/partials/monitor-form.ejs @@ -84,6 +84,36 @@ +
+
+ + +
+
+ + +
+
+ + +
+
+ <% // Default to all regions if none selected const selectedRegions = (monitor.regions && monitor.regions.length) ? monitor.regions : regions.map(r => r[0]);