perf: optimize monitor runner, fix SSE leak, deduplicate shared utils

This commit is contained in:
nate 2026-03-18 18:44:08 +04:00
parent 980261632e
commit 425bfbfc39
16 changed files with 141 additions and 108 deletions

View File

@ -1,6 +1,10 @@
import postgres from "postgres";
const sql = postgres(process.env.DATABASE_URL ?? "postgres://pingql:pingql@localhost:5432/pingql");
const sql = postgres(process.env.DATABASE_URL ?? "postgres://pingql:pingql@localhost:5432/pingql", {
max: 20,
idle_timeout: 30,
connect_timeout: 10,
});
export default sql;

View File

@ -5,7 +5,6 @@ import { monitors } from "./routes/monitors";
import { account } from "./routes/auth";
import { internal } from "./routes/internal";
import { migrate } from "./db";
await migrate();
const CORS_ORIGIN = process.env.CORS_ORIGINS?.split(",") ?? ["https://pingql.com"];
@ -16,29 +15,6 @@ const CORS_HEADERS = {
"access-control-allow-headers": "Content-Type, Authorization",
};
// ── Rate limiter ──────────────────────────────────────────────────────
const rateLimitMap = new Map<string, { count: number; resetAt: number }>();
const RATE_LIMIT_WINDOW = 60_000; // 1 minute
function rateLimit(ip: string, maxRequests: number): boolean {
const now = Date.now();
const entry = rateLimitMap.get(ip);
if (!entry || now > entry.resetAt) {
rateLimitMap.set(ip, { count: 1, resetAt: now + RATE_LIMIT_WINDOW });
return true;
}
entry.count++;
return entry.count <= maxRequests;
}
// Cleanup stale entries every 5 minutes
setInterval(() => {
const now = Date.now();
for (const [key, entry] of rateLimitMap) {
if (now > entry.resetAt) rateLimitMap.delete(key);
}
}, 5 * 60_000);
const SECURITY_HEADERS = {
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": "DENY",

View File

@ -317,6 +317,10 @@ export function validateQuery(query: unknown, path = ""): ValidationError[] {
if (typeof value !== "string") {
errors.push({ path: keyPath, message: `${key} expects a string` });
}
} else if (key === "$consider") {
if (value !== "up" && value !== "down") {
errors.push({ path: keyPath, message: '$consider must be "up" or "down"' });
}
} else if (key.startsWith("$")) {
// It's an operator inside a field condition — skip validation here
} else {

View File

@ -1,27 +1,10 @@
import { Elysia, t } from "elysia";
import { createHmac, randomBytes } from "crypto";
import sql from "../db";
import { createRateLimiter } from "../utils/rate-limit";
// ── Per-IP rate limiting for auth endpoints ───────────────────────────
const authRateMap = new Map<string, { count: number; resetAt: number }>();
function checkAuthRateLimit(ip: string, maxPerMinute: number): boolean {
const now = Date.now();
const entry = authRateMap.get(ip);
if (!entry || now > entry.resetAt) {
authRateMap.set(ip, { count: 1, resetAt: now + 60_000 });
return true;
}
entry.count++;
return entry.count <= maxPerMinute;
}
setInterval(() => {
const now = Date.now();
for (const [key, entry] of authRateMap) {
if (now > entry.resetAt) authRateMap.delete(key);
}
}, 5 * 60_000);
const checkAuthRateLimit = createRateLimiter();
const EMAIL_HMAC_KEY = process.env.EMAIL_HMAC_KEY || "pingql-default-hmac-key";

View File

@ -2,16 +2,8 @@
/// Protected by MONITOR_TOKEN — not exposed to users.
import { Elysia } from "elysia";
import { timingSafeEqual } from "crypto";
import sql from "../db";
function safeTokenCompare(a: string | undefined, b: string | undefined): boolean {
if (!a || !b) return false;
const bufA = Buffer.from(a);
const bufB = Buffer.from(b);
if (bufA.length !== bufB.length) return false;
return timingSafeEqual(bufA, bufB);
}
import { safeTokenCompare } from "../utils/token";
export async function pruneOldPings(retentionDays = 90) {
const result = await sql`DELETE FROM pings WHERE checked_at < now() - ${retentionDays + ' days'}::interval`;

View File

@ -113,7 +113,7 @@ export const monitors = new Elysia({ prefix: "/monitors" })
SELECT id FROM monitors WHERE id = ${params.id} AND account_id = ${accountId}
`;
if (!monitor) return error(404, { error: "Not found" });
const limit = Math.min(Number(query.limit ?? 100), 1000);
const limit = Math.min(Number(query.limit) || 100, 1000);
return sql`
SELECT * FROM pings
WHERE monitor_id = ${params.id}

View File

@ -1,15 +1,7 @@
import { Elysia, t } from "elysia";
import { timingSafeEqual } from "crypto";
import sql from "../db";
import { resolveKey } from "./auth";
function safeTokenCompare(a: string | undefined, b: string | undefined): boolean {
if (!a || !b) return false;
const bufA = Buffer.from(a);
const bufB = Buffer.from(b);
if (bufA.length !== bufB.length) return false;
return timingSafeEqual(bufA, bufB);
}
import { safeTokenCompare } from "../utils/token";
// ── SSE bus ───────────────────────────────────────────────────────────────────
type SSEController = ReadableStreamDefaultController<Uint8Array>;
@ -35,7 +27,11 @@ function makeSSEStream(accountId: string): Response {
bus.get(accountId)!.add(ctrl);
ctrl.enqueue(enc.encode(": connected\n\n"));
heartbeat = setInterval(() => {
try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch { clearInterval(heartbeat); }
try { ctrl.enqueue(enc.encode(": heartbeat\n\n")); } catch {
clearInterval(heartbeat);
bus.get(accountId)?.delete(ctrl);
if (bus.get(accountId)?.size === 0) bus.delete(accountId);
}
}, 10_000);
},
cancel() {

View File

@ -0,0 +1,21 @@
/**
 * Create a fixed-window request counter keyed by an arbitrary string
 * (callers pass the client IP).
 *
 * The returned `check` function returns `true` while the key is within its
 * per-window budget and `false` once more than `max` calls have been made
 * inside the current window. The first call for a new (or expired) key
 * starts a fresh window with that call already counted.
 *
 * @param windowMs          length of the counting window in milliseconds
 * @param cleanupIntervalMs how often expired entries are evicted from the map
 */
export function createRateLimiter(windowMs = 60_000, cleanupIntervalMs = 5 * 60_000) {
  const map = new Map<string, { count: number; resetAt: number }>();
  // Periodically drop expired entries so the map cannot grow without bound
  // from one-off clients.
  const cleanup = setInterval(() => {
    const now = Date.now();
    for (const [key, entry] of map) {
      if (now > entry.resetAt) map.delete(key);
    }
  }, cleanupIntervalMs);
  // Fix: the original discarded the timer handle, so the cleanup interval kept
  // the event loop alive forever (process never exits cleanly). Node/Bun timer
  // objects expose unref(); DOM typings say `number`, hence the guarded cast.
  (cleanup as unknown as { unref?: () => void }).unref?.();
  return function check(key: string, max: number): boolean {
    const now = Date.now();
    const entry = map.get(key);
    if (!entry || now > entry.resetAt) {
      map.set(key, { count: 1, resetAt: now + windowMs });
      return true;
    }
    entry.count++;
    return entry.count <= max;
  };
}

View File

@ -0,0 +1,9 @@
import { timingSafeEqual } from "crypto";
/**
 * Constant-time comparison of two secret tokens.
 *
 * Returns false when either token is missing or empty; otherwise compares
 * byte-for-byte via crypto.timingSafeEqual so a mismatch position cannot be
 * discovered by timing the comparison. timingSafeEqual requires equal-length
 * inputs, so differing lengths short-circuit to false.
 */
export function safeTokenCompare(a: string | undefined, b: string | undefined): boolean {
  if (!a || !b) return false;
  const expected = Buffer.from(a);
  const provided = Buffer.from(b);
  return expected.length === provided.length && timingSafeEqual(expected, provided);
}

View File

@ -37,11 +37,28 @@ async fn main() -> Result<()> {
let in_flight: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));
let shutdown = tokio::signal::ctrl_c();
tokio::pin!(shutdown);
loop {
tokio::select! {
_ = &mut shutdown => {
info!("Shutdown signal received, waiting for in-flight checks...");
let deadline = tokio::time::Instant::now() + Duration::from_secs(35);
while !in_flight.lock().await.is_empty() && tokio::time::Instant::now() < deadline {
sleep(Duration::from_millis(500)).await;
}
info!("Shutdown complete");
break;
}
_ = sleep(Duration::from_millis(1000)) => {
match runner::fetch_and_run(&client, &coordinator_url, &monitor_token, &region, &in_flight).await {
Ok(n) => { if n > 0 { info!("Spawned {n} checks"); } },
Err(e) => error!("Check cycle failed: {e}"),
}
sleep(Duration::from_millis(1000)).await;
}
}
}
Ok(())
}

View File

@ -105,9 +105,13 @@ pub fn evaluate(query: &Value, response: &Response) -> Result<bool> {
Value::Object(m) => m,
_ => bail!("$select expects an object {{ selector: condition }}"),
};
// Parse HTML once for all selectors
let doc = Html::parse_document(&response.body);
for (selector, condition) in sel_obj {
let selected = css_select(&response.body, selector)
.map(Value::String)
let sel = Selector::parse(selector)
.map_err(|_| anyhow::anyhow!("Invalid CSS selector: {selector}"))?;
let selected = doc.select(&sel).next()
.map(|el| Value::String(el.text().collect::<String>().trim().to_string()))
.unwrap_or(Value::Null);
if !eval_condition(condition, &selected, response)? { return Ok(false); }
}
@ -244,7 +248,10 @@ fn eval_op(op: &str, field_val: &Value, val: &Value, response: &Response) -> Res
// If no comparison operator follows, just check existence
selected.is_some()
}
_ => true, // unknown op — skip
_ => {
tracing::warn!("Unknown query operator: {op}");
false
}
};
Ok(ok)
}

View File

@ -8,6 +8,17 @@ use std::time::Instant;
use tokio::sync::Mutex;
use tracing::{debug, warn};
// Cache native root certs per OS thread to avoid reloading from disk on every check.
thread_local! {
static ROOT_CERTS: Arc<Vec<ureq::tls::Certificate<'static>>> = Arc::new(
rustls_native_certs::load_native_certs()
.certs
.into_iter()
.map(|c| ureq::tls::Certificate::from_der(c.as_ref()).to_owned())
.collect()
);
}
/// Fetch due monitors from coordinator, run them, post results back.
pub async fn fetch_and_run(
client: &reqwest::Client,
@ -34,9 +45,10 @@ pub async fn fetch_and_run(
let n = monitors.len();
if n == 0 { return Ok(0); }
// run_id is computed deterministically per monitor+interval bucket so all regions
// checking within the same scheduled window share the same ID.
// Format: first 8 chars of monitor_id + ':' + floor(scheduled_at_epoch / interval_s)
// Shared read-only strings — clone the Arc instead of allocating per monitor
let coordinator_url: Arc<str> = Arc::from(coordinator_url);
let token: Arc<str> = Arc::from(token);
let region: Arc<str> = Arc::from(region);
// Spawn all checks — fire and forget, skip if already in-flight
let mut spawned = 0usize;
@ -51,9 +63,9 @@ pub async fn fetch_and_run(
}
spawned += 1;
let client = client.clone();
let coordinator_url = coordinator_url.to_string();
let token = token.to_string();
let region_owned = region.to_string();
let coordinator_url = Arc::clone(&coordinator_url);
let token = Arc::clone(&token);
let region_owned = Arc::clone(&region);
// run_id: hash(monitor_id, interval_bucket) — same across all regions for this window
let run_id_owned = {
use std::collections::hash_map::DefaultHasher;
@ -96,7 +108,7 @@ pub async fn fetch_and_run(
error: Some(format!("timed out after {}ms", timeout_ms)),
cert_expiry_days: None,
meta: None,
region: if region_owned.is_empty() { None } else { Some(region_owned.clone()) },
region: if region_owned.is_empty() { None } else { Some(region_owned.to_string()) },
run_id: Some(run_id_owned.clone()),
},
};
@ -139,7 +151,13 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
let (tx, rx) = tokio::sync::oneshot::channel::<Result<(u16, HashMap<String, String>, String), String>>();
std::thread::spawn(move || {
let _ = tx.send(run_check_blocking(&url, &method_clone, req_headers.as_ref(), req_body.as_deref(), timeout));
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
run_check_blocking(&url, &method_clone, req_headers.as_ref(), req_body.as_deref(), timeout)
}));
let _ = tx.send(match result {
Ok(r) => r,
Err(_) => Err("check panicked".to_string()),
});
});
let curl_result = tokio::time::timeout(timeout + std::time::Duration::from_secs(2), rx)
@ -244,17 +262,11 @@ fn run_check_blocking(
body: Option<&str>,
timeout: std::time::Duration,
) -> Result<(u16, HashMap<String, String>, String), String> {
// Load system CA certs so we can verify chains from Cloudflare and other
// CAs not included in the bundled webpki-roots.
let root_certs: Vec<ureq::tls::Certificate<'static>> =
rustls_native_certs::load_native_certs()
.certs
.into_iter()
.map(|c| ureq::tls::Certificate::from_der(c.as_ref()).to_owned())
.collect();
// Reuse cached root certs (loaded once per OS thread via thread_local)
let root_certs = ROOT_CERTS.with(|c| Arc::clone(c));
let tls = ureq::tls::TlsConfig::builder()
.root_certs(ureq::tls::RootCerts::Specific(Arc::new(root_certs)))
.root_certs(ureq::tls::RootCerts::Specific(root_certs))
.build();
let agent = ureq::Agent::config_builder()

View File

@ -1,6 +1,10 @@
import postgres from "postgres";
const sql = postgres(process.env.DATABASE_URL ?? "postgres://pingql:pingql@localhost:5432/pingql");
const sql = postgres(process.env.DATABASE_URL ?? "postgres://pingql:pingql@localhost:5432/pingql", {
max: 20,
idle_timeout: 30,
connect_timeout: 10,
});
export default sql;

View File

@ -317,6 +317,10 @@ export function validateQuery(query: unknown, path = ""): ValidationError[] {
if (typeof value !== "string") {
errors.push({ path: keyPath, message: `${key} expects a string` });
}
} else if (key === "$consider") {
if (value !== "up" && value !== "down") {
errors.push({ path: keyPath, message: '$consider must be "up" or "down"' });
}
} else if (key.startsWith("$")) {
// It's an operator inside a field condition — skip validation here
} else {

View File

@ -1,29 +1,12 @@
import { Elysia, t } from "elysia";
import { createHmac, randomBytes } from "crypto";
import sql from "../db";
import { createRateLimiter } from "../utils/rate-limit";
const EMAIL_HMAC_KEY = process.env.EMAIL_HMAC_KEY || "pingql-default-hmac-key";
// ── Per-IP rate limiting for auth endpoints ───────────────────────────
const authRateMap = new Map<string, { count: number; resetAt: number }>();
function checkAuthRateLimit(ip: string, maxPerMinute: number): boolean {
const now = Date.now();
const entry = authRateMap.get(ip);
if (!entry || now > entry.resetAt) {
authRateMap.set(ip, { count: 1, resetAt: now + 60_000 });
return true;
}
entry.count++;
return entry.count <= maxPerMinute;
}
setInterval(() => {
const now = Date.now();
for (const [key, entry] of authRateMap) {
if (now > entry.resetAt) authRateMap.delete(key);
}
}, 5 * 60_000);
const checkAuthRateLimit = createRateLimiter();
function generateKey(): string {
return randomBytes(32).toString("base64url");

View File

@ -0,0 +1,21 @@
/**
 * Create a fixed-window request counter keyed by an arbitrary string
 * (callers pass the client IP).
 *
 * The returned `check` function returns `true` while the key is within its
 * per-window budget and `false` once more than `max` calls have been made
 * inside the current window. The first call for a new (or expired) key
 * starts a fresh window with that call already counted.
 *
 * @param windowMs          length of the counting window in milliseconds
 * @param cleanupIntervalMs how often expired entries are evicted from the map
 */
export function createRateLimiter(windowMs = 60_000, cleanupIntervalMs = 5 * 60_000) {
  const map = new Map<string, { count: number; resetAt: number }>();
  // Periodically drop expired entries so the map cannot grow without bound
  // from one-off clients.
  const cleanup = setInterval(() => {
    const now = Date.now();
    for (const [key, entry] of map) {
      if (now > entry.resetAt) map.delete(key);
    }
  }, cleanupIntervalMs);
  // Fix: the original discarded the timer handle, so the cleanup interval kept
  // the event loop alive forever (process never exits cleanly). Node/Bun timer
  // objects expose unref(); DOM typings say `number`, hence the guarded cast.
  (cleanup as unknown as { unref?: () => void }).unref?.();
  return function check(key: string, max: number): boolean {
    const now = Date.now();
    const entry = map.get(key);
    if (!entry || now > entry.resetAt) {
      map.set(key, { count: 1, resetAt: now + windowMs });
      return true;
    }
    entry.count++;
    return entry.count <= max;
  };
}