diff --git a/apps/api/src/db.ts b/apps/api/src/db.ts index 7bd2823..40cd549 100644 --- a/apps/api/src/db.ts +++ b/apps/api/src/db.ts @@ -1,6 +1,10 @@ import postgres from "postgres"; -const sql = postgres(process.env.DATABASE_URL ?? "postgres://pingql:pingql@localhost:5432/pingql"); +const sql = postgres(process.env.DATABASE_URL ?? "postgres://pingql:pingql@localhost:5432/pingql", { + max: 20, + idle_timeout: 30, + connect_timeout: 10, +}); export default sql; diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index a02dc73..136018b 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -5,7 +5,6 @@ import { monitors } from "./routes/monitors"; import { account } from "./routes/auth"; import { internal } from "./routes/internal"; import { migrate } from "./db"; - await migrate(); const CORS_ORIGIN = process.env.CORS_ORIGINS?.split(",") ?? ["https://pingql.com"]; @@ -16,29 +15,6 @@ const CORS_HEADERS = { "access-control-allow-headers": "Content-Type, Authorization", }; -// ── Rate limiter ────────────────────────────────────────────────────── -const rateLimitMap = new Map(); -const RATE_LIMIT_WINDOW = 60_000; // 1 minute - -function rateLimit(ip: string, maxRequests: number): boolean { - const now = Date.now(); - const entry = rateLimitMap.get(ip); - if (!entry || now > entry.resetAt) { - rateLimitMap.set(ip, { count: 1, resetAt: now + RATE_LIMIT_WINDOW }); - return true; - } - entry.count++; - return entry.count <= maxRequests; -} - -// Cleanup stale entries every 5 minutes -setInterval(() => { - const now = Date.now(); - for (const [key, entry] of rateLimitMap) { - if (now > entry.resetAt) rateLimitMap.delete(key); - } -}, 5 * 60_000); - const SECURITY_HEADERS = { "X-Content-Type-Options": "nosniff", "X-Frame-Options": "DENY", diff --git a/apps/api/src/query/index.ts b/apps/api/src/query/index.ts index 670c0dd..216525f 100644 --- a/apps/api/src/query/index.ts +++ b/apps/api/src/query/index.ts @@ -317,6 +317,10 @@ export function validateQuery(query: unknown, path = ""): ValidationError[] { if (typeof value !== "string") { errors.push({ path: keyPath, message: `${key} expects a string` }); } + } else if (key === "$consider") { + if (value !== "up" && value !== "down") { + errors.push({ path: keyPath, message: '$consider must be "up" or "down"' }); + } } else if (key.startsWith("$")) { // It's an operator inside a field condition — skip validation here } else { diff --git a/apps/api/src/routes/auth.ts b/apps/api/src/routes/auth.ts index fd54c83..bf6aa24 100644 --- a/apps/api/src/routes/auth.ts +++ b/apps/api/src/routes/auth.ts @@ -1,27 +1,10 @@ import { Elysia, t } from "elysia"; import { createHmac, randomBytes } from "crypto"; import sql from "../db"; +import { createRateLimiter } from "../utils/rate-limit"; // ── Per-IP rate limiting for auth endpoints ─────────────────────────── -const authRateMap = new Map(); - -function checkAuthRateLimit(ip: string, maxPerMinute: number): boolean { - const now = Date.now(); - const entry = authRateMap.get(ip); - if (!entry || now > entry.resetAt) { - authRateMap.set(ip, { count: 1, resetAt: now + 60_000 }); - return true; - } - entry.count++; - return entry.count <= maxPerMinute; -} - -setInterval(() => { - const now = Date.now(); - for (const [key, entry] of authRateMap) { - if (now > entry.resetAt) authRateMap.delete(key); - } -}, 5 * 60_000); +const checkAuthRateLimit = createRateLimiter(); const EMAIL_HMAC_KEY = process.env.EMAIL_HMAC_KEY || "pingql-default-hmac-key"; diff --git a/apps/api/src/routes/internal.ts b/apps/api/src/routes/internal.ts index d0174a8..38187fd 100644 --- a/apps/api/src/routes/internal.ts +++ b/apps/api/src/routes/internal.ts @@ -2,16 +2,8 @@ /// Protected by MONITOR_TOKEN — not exposed to users. import { Elysia } from "elysia"; -import { timingSafeEqual } from "crypto"; import sql from "../db"; - -function safeTokenCompare(a: string | undefined, b: string | undefined): boolean { - if (!a || !b) return false; - const bufA = Buffer.from(a); - const bufB = Buffer.from(b); - if (bufA.length !== bufB.length) return false; - return timingSafeEqual(bufA, bufB); -} +import { safeTokenCompare } from "../utils/token"; export async function pruneOldPings(retentionDays = 90) { const result = await sql`DELETE FROM pings WHERE checked_at < now() - ${retentionDays + ' days'}::interval`; diff --git a/apps/api/src/routes/monitors.ts b/apps/api/src/routes/monitors.ts index 48249cc..21534d9 100644 --- a/apps/api/src/routes/monitors.ts +++ b/apps/api/src/routes/monitors.ts @@ -113,7 +113,7 @@ export const monitors = new Elysia({ prefix: "/monitors" }) SELECT id FROM monitors WHERE id = ${params.id} AND account_id = ${accountId} `; if (!monitor) return error(404, { error: "Not found" }); - const limit = Math.min(Number(query.limit ?? 100), 1000); + const limit = Math.min(Number(query.limit) || 100, 1000); return sql` SELECT * FROM pings WHERE monitor_id = ${params.id} diff --git a/apps/api/src/routes/pings.ts b/apps/api/src/routes/pings.ts index a5e4106..bc1f21f 100644 --- a/apps/api/src/routes/pings.ts +++ b/apps/api/src/routes/pings.ts @@ -1,15 +1,7 @@ import { Elysia, t } from "elysia"; -import { timingSafeEqual } from "crypto"; import sql from "../db"; import { resolveKey } from "./auth"; - -function safeTokenCompare(a: string | undefined, b: string | undefined): boolean { - if (!a || !b) return false; - const bufA = Buffer.from(a); - const bufB = Buffer.from(b); - if (bufA.length !== bufB.length) return false; - return timingSafeEqual(bufA, bufB); -} +import { safeTokenCompare } from "../utils/token"; // ── SSE bus ─────────────────────────────────────────────────────────────────── type SSEController = ReadableStreamDefaultController; @@ -35,7 +27,11 @@ function makeSSEStream(accountId: string): Response { bus.get(accountId)!.add(ctrl); ctrl.enqueue(enc.encode(": connected\n\n")); heartbeat = setInterval(() => { - try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch { clearInterval(heartbeat); } + try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch { + clearInterval(heartbeat); + bus.get(accountId)?.delete(ctrl); + if (bus.get(accountId)?.size === 0) bus.delete(accountId); + } }, 10_000); }, cancel() { diff --git a/apps/api/src/utils/rate-limit.ts b/apps/api/src/utils/rate-limit.ts new file mode 100644 index 0000000..7d89a39 --- /dev/null +++ b/apps/api/src/utils/rate-limit.ts @@ -0,0 +1,21 @@ +export function createRateLimiter(windowMs = 60_000, cleanupIntervalMs = 5 * 60_000) { + const map = new Map(); + + setInterval(() => { + const now = Date.now(); + for (const [key, entry] of map) { + if (now > entry.resetAt) map.delete(key); + } + }, cleanupIntervalMs); + + return function check(key: string, max: number): boolean { + const now = Date.now(); + const entry = map.get(key); + if (!entry || now > entry.resetAt) { + map.set(key, { count: 1, resetAt: now + windowMs }); + return true; + } + entry.count++; + return entry.count <= max; + }; +} diff --git a/apps/api/src/utils/token.ts b/apps/api/src/utils/token.ts new file mode 100644 index 0000000..01c58f6 --- /dev/null +++ b/apps/api/src/utils/token.ts @@ -0,0 +1,9 @@ +import { timingSafeEqual } from "crypto"; + +export function safeTokenCompare(a: string | undefined, b: string | undefined): boolean { + if (!a || !b) return false; + const bufA = Buffer.from(a); + const bufB = Buffer.from(b); + if (bufA.length !== bufB.length) return false; + return timingSafeEqual(bufA, bufB); +} diff --git a/apps/monitor/src/main.rs b/apps/monitor/src/main.rs index 6343050..34468a9 100644 --- a/apps/monitor/src/main.rs +++ b/apps/monitor/src/main.rs @@ -37,11 +37,28 @@ async fn main() -> Result<()> { let in_flight: Arc>> = Arc::new(Mutex::new(HashSet::new())); + let shutdown = tokio::signal::ctrl_c(); + tokio::pin!(shutdown); + loop { - match runner::fetch_and_run(&client, &coordinator_url, &monitor_token, ®ion, &in_flight).await { - Ok(n) => { if n > 0 { info!("Spawned {n} checks"); } }, - Err(e) => error!("Check cycle failed: {e}"), + tokio::select! { + _ = &mut shutdown => { + info!("Shutdown signal received, waiting for in-flight checks..."); + let deadline = tokio::time::Instant::now() + Duration::from_secs(35); + while !in_flight.lock().await.is_empty() && tokio::time::Instant::now() < deadline { + sleep(Duration::from_millis(500)).await; + } + info!("Shutdown complete"); + break; + } + _ = sleep(Duration::from_millis(1000)) => { + match runner::fetch_and_run(&client, &coordinator_url, &monitor_token, ®ion, &in_flight).await { + Ok(n) => { if n > 0 { info!("Spawned {n} checks"); } }, + Err(e) => error!("Check cycle failed: {e}"), + } + } } - sleep(Duration::from_millis(1000)).await; } + + Ok(()) } diff --git a/apps/monitor/src/query.rs b/apps/monitor/src/query.rs index 342d899..c9c76d4 100644 --- a/apps/monitor/src/query.rs +++ b/apps/monitor/src/query.rs @@ -105,9 +105,13 @@ pub fn evaluate(query: &Value, response: &Response) -> Result { Value::Object(m) => m, _ => bail!("$select expects an object {{ selector: condition }}"), }; + // Parse HTML once for all selectors + let doc = Html::parse_document(&response.body); for (selector, condition) in sel_obj { - let selected = css_select(&response.body, selector) - .map(Value::String) + let sel = Selector::parse(selector) + .map_err(|_| anyhow::anyhow!("Invalid CSS selector: {selector}"))?; + let selected = doc.select(&sel).next() + .map(|el| Value::String(el.text().collect::().trim().to_string())) .unwrap_or(Value::Null); if !eval_condition(condition, &selected, response)? { return Ok(false); } } @@ -244,7 +248,10 @@ fn eval_op(op: &str, field_val: &Value, val: &Value, response: &Response) -> Res // If no comparison operator follows, just check existence selected.is_some() } - _ => true, // unknown op — skip + _ => { + tracing::warn!("Unknown query operator: {op}"); + false + } }; Ok(ok) } diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index a9b7335..304ffb2 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -8,6 +8,17 @@ use std::time::Instant; use tokio::sync::Mutex; use tracing::{debug, warn}; +// Cache native root certs per OS thread to avoid reloading from disk on every check. +thread_local! { + static ROOT_CERTS: Arc>> = Arc::new( + rustls_native_certs::load_native_certs() + .certs + .into_iter() + .map(|c| ureq::tls::Certificate::from_der(c.as_ref()).to_owned()) + .collect() + ); +} + /// Fetch due monitors from coordinator, run them, post results back. pub async fn fetch_and_run( client: &reqwest::Client, @@ -34,9 +45,10 @@ pub async fn fetch_and_run( let n = monitors.len(); if n == 0 { return Ok(0); } - // run_id is computed deterministically per monitor+interval bucket so all regions - // checking within the same scheduled window share the same ID. - // Format: first 8 chars of monitor_id + ':' + floor(scheduled_at_epoch / interval_s) + // Shared read-only strings — clone the Arc instead of allocating per monitor + let coordinator_url: Arc = Arc::from(coordinator_url); + let token: Arc = Arc::from(token); + let region: Arc = Arc::from(region); // Spawn all checks — fire and forget, skip if already in-flight let mut spawned = 0usize; @@ -51,9 +63,9 @@ pub async fn fetch_and_run( } spawned += 1; let client = client.clone(); - let coordinator_url = coordinator_url.to_string(); - let token = token.to_string(); - let region_owned = region.to_string(); + let coordinator_url = Arc::clone(&coordinator_url); + let token = Arc::clone(&token); + let region_owned = Arc::clone(®ion); // run_id: hash(monitor_id, interval_bucket) — same across all regions for this window let run_id_owned = { use std::collections::hash_map::DefaultHasher; @@ -96,7 +108,7 @@ pub async fn fetch_and_run( error: Some(format!("timed out after {}ms", timeout_ms)), cert_expiry_days: None, meta: None, - region: if region_owned.is_empty() { None } else { Some(region_owned.clone()) }, + region: if region_owned.is_empty() { None } else { Some(region_owned.to_string()) }, run_id: Some(run_id_owned.clone()), }, }; @@ -139,7 +151,13 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op let (tx, rx) = tokio::sync::oneshot::channel::, String), String>>(); std::thread::spawn(move || { - let _ = tx.send(run_check_blocking(&url, &method_clone, req_headers.as_ref(), req_body.as_deref(), timeout)); + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + run_check_blocking(&url, &method_clone, req_headers.as_ref(), req_body.as_deref(), timeout) + })); + let _ = tx.send(match result { + Ok(r) => r, + Err(_) => Err("check panicked".to_string()), + }); }); let curl_result = tokio::time::timeout(timeout + std::time::Duration::from_secs(2), rx) @@ -244,17 +262,11 @@ fn run_check_blocking( body: Option<&str>, timeout: std::time::Duration, ) -> Result<(u16, HashMap, String), String> { - // Load system CA certs so we can verify chains from Cloudflare and other - // CAs not included in the bundled webpki-roots. - let root_certs: Vec> = - rustls_native_certs::load_native_certs() - .certs - .into_iter() - .map(|c| ureq::tls::Certificate::from_der(c.as_ref()).to_owned()) - .collect(); + // Reuse cached root certs (loaded once per OS thread via thread_local) + let root_certs = ROOT_CERTS.with(|c| Arc::clone(c)); let tls = ureq::tls::TlsConfig::builder() - .root_certs(ureq::tls::RootCerts::Specific(Arc::new(root_certs))) + .root_certs(ureq::tls::RootCerts::Specific(root_certs)) .build(); let agent = ureq::Agent::config_builder() diff --git a/apps/web/src/db.ts b/apps/web/src/db.ts index efe07e9..58de9e0 100644 --- a/apps/web/src/db.ts +++ b/apps/web/src/db.ts @@ -1,6 +1,10 @@ import postgres from "postgres"; -const sql = postgres(process.env.DATABASE_URL ?? "postgres://pingql:pingql@localhost:5432/pingql"); +const sql = postgres(process.env.DATABASE_URL ?? "postgres://pingql:pingql@localhost:5432/pingql", { + max: 20, + idle_timeout: 30, + connect_timeout: 10, +}); export default sql; diff --git a/apps/web/src/query/index.ts b/apps/web/src/query/index.ts index 670c0dd..216525f 100644 --- a/apps/web/src/query/index.ts +++ b/apps/web/src/query/index.ts @@ -317,6 +317,10 @@ export function validateQuery(query: unknown, path = ""): ValidationError[] { if (typeof value !== "string") { errors.push({ path: keyPath, message: `${key} expects a string` }); } + } else if (key === "$consider") { + if (value !== "up" && value !== "down") { + errors.push({ path: keyPath, message: '$consider must be "up" or "down"' }); + } } else if (key.startsWith("$")) { // It's an operator inside a field condition — skip validation here } else { diff --git a/apps/web/src/routes/auth.ts b/apps/web/src/routes/auth.ts index cc89f40..7a54819 100644 --- a/apps/web/src/routes/auth.ts +++ b/apps/web/src/routes/auth.ts @@ -1,29 +1,12 @@ import { Elysia, t } from "elysia"; import { createHmac, randomBytes } from "crypto"; import sql from "../db"; +import { createRateLimiter } from "../utils/rate-limit"; const EMAIL_HMAC_KEY = process.env.EMAIL_HMAC_KEY || "pingql-default-hmac-key"; // ── Per-IP rate limiting for auth endpoints ─────────────────────────── -const authRateMap = new Map(); - -function checkAuthRateLimit(ip: string, maxPerMinute: number): boolean { - const now = Date.now(); - const entry = authRateMap.get(ip); - if (!entry || now > entry.resetAt) { - authRateMap.set(ip, { count: 1, resetAt: now + 60_000 }); - return true; - } - entry.count++; - return entry.count <= maxPerMinute; -} - -setInterval(() => { - const now = Date.now(); - for (const [key, entry] of authRateMap) { - if (now > entry.resetAt) authRateMap.delete(key); - } -}, 5 * 60_000); +const checkAuthRateLimit = createRateLimiter(); function generateKey(): string { return randomBytes(32).toString("base64url"); diff --git a/apps/web/src/utils/rate-limit.ts b/apps/web/src/utils/rate-limit.ts new file mode 100644 index 0000000..7d89a39 --- /dev/null +++ b/apps/web/src/utils/rate-limit.ts @@ -0,0 +1,21 @@ +export function createRateLimiter(windowMs = 60_000, cleanupIntervalMs = 5 * 60_000) { + const map = new Map(); + + setInterval(() => { + const now = Date.now(); + for (const [key, entry] of map) { + if (now > entry.resetAt) map.delete(key); + } + }, cleanupIntervalMs); + + return function check(key: string, max: number): boolean { + const now = Date.now(); + const entry = map.get(key); + if (!entry || now > entry.resetAt) { + map.set(key, { count: 1, resetAt: now + windowMs }); + return true; + } + entry.count++; + return entry.count <= max; + }; +}