From 27d478c41a9b8637edb79d1ce23dec3a94cc27c2 Mon Sep 17 00:00:00 2001 From: nate Date: Fri, 10 Apr 2026 07:29:46 +0400 Subject: [PATCH] fix: improve monitor timings --- apps/monitor/Cargo.toml | 12 +- apps/monitor/src/runner.rs | 493 +++++++++++++++++++------------------ 2 files changed, 264 insertions(+), 241 deletions(-) diff --git a/apps/monitor/Cargo.toml b/apps/monitor/Cargo.toml index f40935c..0fdd2b5 100644 --- a/apps/monitor/Cargo.toml +++ b/apps/monitor/Cargo.toml @@ -6,18 +6,20 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["full"] } reqwest = { version = "0.12", features = ["json", "native-tls"], default-features = false } -ureq = { version = "3", features = ["rustls", "_ring"] } +hyper = { version = "1", features = ["client", "http1", "http2"] } +hyper-util = { version = "0.1", features = ["client-legacy", "tokio", "http1", "http2"] } +http-body-util = "0.1" serde = { version = "1", features = ["derive"] } serde_json = "1" -scraper = "0.21" # CSS selector / HTML parsing +scraper = "0.26" regex = "1" anyhow = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } rustls = { version = "0.23", features = ["ring"] } rustls-native-certs = "0.8" -webpki-roots = "0.26" -x509-parser = "0.16" +webpki-roots = "1" +x509-parser = "0.18" tokio-rustls = "0.26" chrono = { version = "0.4", features = ["serde"] } - +http = "1" diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index 5063db4..b36e6ed 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -6,16 +6,30 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; use tokio::sync::Mutex; +use tokio::net::TcpStream; +use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, warn}; +use rustls::pki_types::ServerName; +use tokio_rustls::TlsConnector; +use x509_parser::prelude::*; -thread_local! { - static ROOT_CERTS: Arc>> = Arc::new( - rustls_native_certs::load_native_certs() - .certs - .into_iter() - .map(|c| ureq::tls::Certificate::from_der(c.as_ref()).to_owned()) - .collect() - ); + +static ROOT_STORE: std::sync::LazyLock> = std::sync::LazyLock::new(|| { + let mut store = rustls::RootCertStore::empty(); + store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + Arc::new(store) +}); + +struct CheckResult { + status: u16, + headers: HashMap, + body: String, + dns_ms: u64, + tcp_ms: u64, + tls_ms: u64, + http_ms: u64, + cert_expiry_days: Option, + cert_issuer: Option, } pub async fn fetch_and_run( @@ -53,7 +67,7 @@ pub async fn fetch_and_run( { let mut set = in_flight.lock().await; if set.contains(&monitor.id) { - debug!("Skipping {} — check already in flight", monitor.id); + debug!("Skipping {} - check already in flight", monitor.id); continue; } set.insert(monitor.id.clone()); @@ -93,7 +107,7 @@ pub async fn fetch_and_run( let deadline = std::time::Duration::from_millis( attempts * (timeout_ms + 5000) + attempts.saturating_sub(1) * retry_gap_s * 1000, ); - let result = match tokio::time::timeout(deadline, run_check_with_retries(&client, &monitor, scheduled_at_iso.clone(), ®ion_owned, &run_id_owned)).await { + let result = match tokio::time::timeout(deadline, run_check_with_retries(&monitor, scheduled_at_iso.clone(), ®ion_owned, &run_id_owned)).await { Ok(r) => r, Err(_) => PingResult { monitor_id: monitor.id.clone(), @@ -122,12 +136,12 @@ pub async fn fetch_and_run( Ok(spawned) } -async fn run_check_with_retries(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { +async fn run_check_with_retries(monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { let attempts = monitor.max_retries.saturating_add(1); let retry_gap = std::time::Duration::from_secs(monitor.retry_interval_s); let mut last: Option = None; for attempt in 0..attempts { - let mut result = run_check(client, monitor, scheduled_at.clone(), region, run_id).await; + let mut result = run_check(monitor, scheduled_at.clone(), region, run_id).await; if result.up { if attempt > 0 { if let Some(meta) = result.meta.as_mut().and_then(|m| m.as_object_mut()) { @@ -151,7 +165,7 @@ async fn run_check_with_retries(client: &reqwest::Client, monitor: &Monitor, sch result } -async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { +async fn run_check(monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { let checked_at = chrono::Utc::now().to_rfc3339(); let jitter_ms: Option = scheduled_at.as_deref().and_then(|s| { let scheduled = chrono::DateTime::parse_from_rfc3339(s).ok()?; @@ -159,283 +173,290 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op Some((now - scheduled.with_timezone(&chrono::Utc)).num_milliseconds()) }); - let start = Instant::now(); - let method = monitor.method.as_deref().unwrap_or("GET").to_uppercase(); let timeout = std::time::Duration::from_millis(monitor.timeout_ms.unwrap_or(30000)); - let is_https = monitor.url.starts_with("https://"); + let start = Instant::now(); - let url = monitor.url.clone(); - let req_headers = monitor.request_headers.clone(); - let req_body = monitor.request_body.clone(); - let max_redirects = monitor.max_redirects; + let result = tokio::time::timeout(timeout, run_check_async( + &monitor.url, + monitor.method.as_deref().unwrap_or("GET"), + monitor.request_headers.as_ref(), + monitor.request_body.as_deref(), + monitor.max_redirects, + )).await; - let (tx, rx) = tokio::sync::oneshot::channel::, String, u64, u64), String>>(); - std::thread::spawn(move || { - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - run_check_blocking(&url, &method, req_headers.as_ref(), req_body.as_deref(), timeout, max_redirects) - })); - let _ = tx.send(match result { - Ok(r) => r, - Err(_) => Err("check panicked".to_string()), - }); - }); - - let result = tokio::time::timeout(timeout + std::time::Duration::from_secs(2), rx) - .await - .map_err(|_| format!("timed out after {}ms", timeout.as_millis())) - .and_then(|r| r.map_err(|_| "check thread dropped".to_string())) - .unwrap_or_else(|e| Err(e)); - - match result { - Err(ref e) => { - let latency_ms = start.elapsed().as_millis() as u64; - debug!("{} check error: {e}", monitor.url); - PingResult { + let cr = match result { + Err(_) => { + return PingResult { monitor_id: monitor.id.clone(), - checked_at: Some(checked_at.clone()), + checked_at: Some(checked_at), scheduled_at, jitter_ms, status_code: None, - latency_ms: Some(latency_ms), + latency_ms: Some(start.elapsed().as_millis() as u64), up: false, - error: Some(e.clone()), + error: Some(format!("timed out after {}ms", timeout.as_millis())), cert_expiry_days: None, cert_issuer: None, response_size: None, meta: None, region: Some(region.to_string()), run_id: Some(run_id.to_string()), - } - }, - Ok((status, headers, body, total_ms, dns_ms)) => { - - let cert_handle = if is_https { - let cert_url = monitor.url.clone(); - Some(tokio::spawn(async move { - match tokio::time::timeout( - std::time::Duration::from_secs(5), - check_cert(&cert_url), - ).await { - Ok(Ok(info)) => info, - _ => None, - } - })) - } else { - None }; - - let query = &monitor.query; - - let (up, query_error) = if let Some(q) = query { - let response = Response { - status, - body: body.clone(), - headers: headers.clone(), - latency_ms: Some(total_ms.saturating_sub(dns_ms)), - cert_expiry_days: None, - cert_issuer: None, - }; - match query::evaluate(q, &response) { - Ok(result) => (result, None), - Err(e) => { - warn!("Query error for {}: {e}", monitor.id); - (status < 400, Some(e.to_string())) - } - } - } else { - ((200..300).contains(&status), None) - }; - - let cert_info = match cert_handle { - Some(h) => h.await.unwrap_or(None), - None => None, - }; - let cert_expiry_days = cert_info.as_ref().map(|c| c.expiry_days); - let tls_ms = cert_info.as_ref().map(|c| c.tls_ms).unwrap_or(0); - let cert_issuer = cert_info.map(|c| c.issuer); - - // Subtract DNS and TLS time from total to get server response time only - let latency_ms = total_ms.saturating_sub(dns_ms).saturating_sub(tls_ms); - - let meta = json!({ - "headers": headers, - "body_preview": &body[..body.len().min(25_000)], - }); - - debug!("{} → {status} {latency_ms}ms (total={total_ms} dns={dns_ms} tls={tls_ms}) up={up}", monitor.url); - - PingResult { + } + Ok(Err(e)) => { + return PingResult { monitor_id: monitor.id.clone(), checked_at: Some(checked_at), scheduled_at, jitter_ms, - status_code: Some(status), - latency_ms: Some(latency_ms), - up, - error: query_error, - cert_expiry_days, - cert_issuer, - response_size: Some(body.len()), - meta: Some(meta), + status_code: None, + latency_ms: Some(start.elapsed().as_millis() as u64), + up: false, + error: Some(e.to_string()), + cert_expiry_days: None, + cert_issuer: None, + response_size: None, + meta: None, region: Some(region.to_string()), run_id: Some(run_id.to_string()), + }; + } + Ok(Ok(cr)) => cr, + }; + + let latency_ms = cr.http_ms; + + let query = &monitor.query; + let (up, query_error) = if let Some(q) = query { + let response = Response { + status: cr.status, + body: cr.body.clone(), + headers: cr.headers.clone(), + latency_ms: Some(latency_ms), + cert_expiry_days: cr.cert_expiry_days, + cert_issuer: cr.cert_issuer.clone(), + }; + match query::evaluate(q, &response) { + Ok(result) => (result, None), + Err(e) => { + warn!("Query error for {}: {e}", monitor.id); + (cr.status < 400, Some(e.to_string())) } } + } else { + ((200..300).contains(&(cr.status as u16)), None) + }; + + let meta = json!({ + "headers": cr.headers, + "body_preview": &cr.body[..cr.body.len().min(25_000)], + }); + + debug!("{} -> {status} {latency_ms}ms (dns={dns} tcp={tcp} tls={tls} http={http}) up={up}", + monitor.url, status=cr.status, dns=cr.dns_ms, tcp=cr.tcp_ms, tls=cr.tls_ms, http=cr.http_ms); + + PingResult { + monitor_id: monitor.id.clone(), + checked_at: Some(checked_at), + scheduled_at, + jitter_ms, + status_code: Some(cr.status), + latency_ms: Some(latency_ms), + up, + error: query_error, + cert_expiry_days: cr.cert_expiry_days, + cert_issuer: cr.cert_issuer, + response_size: Some(cr.body.len()), + meta: Some(meta), + region: Some(region.to_string()), + run_id: Some(run_id.to_string()), } } -// Returns (status, headers, body, total_ms, dns_ms) -fn run_check_blocking( +async fn run_check_async( url: &str, method: &str, - headers: Option<&HashMap>, - body: Option<&str>, - timeout: std::time::Duration, + req_headers: Option<&HashMap>, + req_body: Option<&str>, max_redirects: u32, -) -> Result<(u16, HashMap, String, u64, u64), String> { - // Measure DNS resolution time separately (lookup only, no connection) - let dns_ms = { - let parsed = reqwest::Url::parse(url).map_err(|e| e.to_string())?; - let host = parsed.host_str().unwrap_or(""); - let port = parsed.port().unwrap_or(if parsed.scheme() == "https" { 443 } else { 80 }); - let addr = format!("{host}:{port}"); +) -> Result { + let mut current_url = url.to_string(); + let mut redirects_left = max_redirects; + let mut final_result: Option = None; + + loop { + let parsed = reqwest::Url::parse(¤t_url)?; + let is_https = parsed.scheme() == "https"; + let host = parsed.host_str().ok_or_else(|| anyhow::anyhow!("no host"))?.to_string(); + let port = parsed.port().unwrap_or(if is_https { 443 } else { 80 }); + let path = if parsed.query().is_some() { + format!("{}?{}", parsed.path(), parsed.query().unwrap()) + } else { + parsed.path().to_string() + }; + + // 1. DNS resolve let dns_start = Instant::now(); - let _ = std::net::ToSocketAddrs::to_socket_addrs(&addr as &str); - dns_start.elapsed().as_millis() as u64 - }; + let addr = tokio::net::lookup_host(format!("{host}:{port}")) + .await? + .next() + .ok_or_else(|| anyhow::anyhow!("DNS lookup returned no addresses"))?; + let dns_ms = dns_start.elapsed().as_millis() as u64; - let root_certs = ROOT_CERTS.with(|c| Arc::clone(c)); + // 2. TCP connect + let tcp_start = Instant::now(); + let tcp_stream = TcpStream::connect(addr).await?; + let tcp_ms = tcp_start.elapsed().as_millis() as u64; - let tls = ureq::tls::TlsConfig::builder() - .root_certs(ureq::tls::RootCerts::Specific(root_certs)) - .build(); + // 3. TLS (if HTTPS) + let mut tls_ms = 0u64; + let mut cert_expiry_days: Option = None; + let mut cert_issuer: Option = None; - let agent = ureq::Agent::config_builder() - .timeout_global(Some(timeout)) - .timeout_connect(Some(timeout)) - .http_status_as_error(false) - .max_redirects(max_redirects) - .user_agent("Mozilla/5.0 (compatible; PingQL/1.0; +https://pingql.com)") - .tls_config(tls) - .build() - .new_agent(); + let (status, resp_headers, body, http_ms) = if is_https { + let config = rustls::ClientConfig::builder() + .with_root_certificates(ROOT_STORE.clone()) + .with_no_client_auth(); + let connector = TlsConnector::from(Arc::new(config)); + let server_name = ServerName::try_from(host.clone())?; - let request_start = Instant::now(); + let tls_start = Instant::now(); + let tls_stream = connector.connect(server_name, tcp_stream).await?; + tls_ms = tls_start.elapsed().as_millis() as u64; - let mut builder = ureq::http::Request::builder() + // Extract cert info from this connection + let (_, conn) = tls_stream.get_ref(); + if let Some(certs) = conn.peer_certificates() { + if let Some(cert_der) = certs.first() { + if let Ok((_, cert)) = X509Certificate::from_der(cert_der.as_ref()) { + let not_after = cert.validity().not_after.timestamp(); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + cert_expiry_days = Some((not_after - now) / 86400); + cert_issuer = Some(cert.issuer().to_string()); + } + } + } + + // HTTP over TLS + do_http_request(tls_stream, &host, &path, method, req_headers, req_body).await? + } else { + // HTTP over plain TCP + do_http_request(tcp_stream, &host, &path, method, req_headers, req_body).await? + }; + + // Check for redirect + if (301..=308).contains(&status) && redirects_left > 0 { + if let Some(location) = resp_headers.get("location") { + let next = if location.starts_with("http://") || location.starts_with("https://") { + location.clone() + } else { + // Relative redirect + let base = reqwest::Url::parse(¤t_url)?; + base.join(location)?.to_string() + }; + current_url = next; + redirects_left -= 1; + continue; + } + } + + final_result = Some(CheckResult { + status, + headers: resp_headers, + body, + dns_ms, + tcp_ms, + tls_ms, + http_ms, + cert_expiry_days, + cert_issuer, + }); + break; + } + + final_result.ok_or_else(|| anyhow::anyhow!("no result")) +} + +async fn do_http_request( + stream: S, + host: &str, + path: &str, + method: &str, + req_headers: Option<&HashMap>, + req_body: Option<&str>, +) -> Result<(u16, HashMap, String, u64)> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + use hyper_util::rt::TokioIo; + + let io = TokioIo::new(stream); + let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; + + // Spawn the connection driver + tokio::spawn(async move { + if let Err(e) = conn.await { + debug!("connection error: {e}"); + } + }); + + // Build request + let body_bytes = req_body.unwrap_or("").to_string(); + let has_body = req_body.is_some() && !body_bytes.is_empty(); + + let mut builder = http::Request::builder() .method(method) - .uri(url); + .uri(path) + .header("host", host) + .header("user-agent", "Mozilla/5.0 (compatible; PingQL/1.0; +https://pingql.com)"); - let mut has_content_type = false; - if let Some(hdrs) = headers { + if let Some(hdrs) = req_headers { + let mut has_content_type = false; for (k, v) in hdrs { if k.eq_ignore_ascii_case("content-type") { has_content_type = true; } builder = builder.header(k.as_str(), v.as_str()); } + if has_body && !has_content_type { + builder = builder.header("content-type", "application/json"); + } + } else if has_body { + builder = builder.header("content-type", "application/json"); } - let result = match body { - Some(b) => { - if !has_content_type { - builder = builder.header("Content-Type", "application/json"); - } - let req = builder.body(b.as_bytes()).map_err(|e| e.to_string())?; - agent.run(req) - } - None => { - let req = builder.body(()).map_err(|e| e.to_string())?; - agent.run(req) - } - }; - - let mut resp = match result { - Err(e) => { - let msg = e.to_string(); - let friendly = if msg.contains("timed out") || msg.contains("timeout") || msg.contains("TimedOut") { - format!("timed out after {}ms", timeout.as_millis()) - } else { - msg - }; - return Err(friendly); - } - Ok(r) => r, + let req = if has_body { + builder.body(http_body_util::Full::new(hyper::body::Bytes::from(body_bytes)))? + } else { + builder.body(http_body_util::Full::new(hyper::body::Bytes::new()))? }; + // Time just the HTTP request/response + let http_start = Instant::now(); + let resp = sender.send_request(req).await?; let status = resp.status().as_u16(); let mut resp_headers = HashMap::new(); for (name, value) in resp.headers() { - resp_headers.insert(name.as_str().to_lowercase(), value.to_str().unwrap_or("").to_string()); + resp_headers.insert( + name.as_str().to_lowercase(), + value.to_str().unwrap_or("").to_string(), + ); } + // Read body with size limit const MAX_BODY: usize = 2 * 1024 * 1024; - let body_out = match resp.body_mut().with_config().limit((MAX_BODY + 1) as u64).read_to_vec() { - Ok(mut buf) => { - if buf.len() > MAX_BODY { buf.truncate(MAX_BODY); } - String::from_utf8_lossy(&buf).into_owned() - } - Err(e) => { - let msg = e.to_string(); - if msg.contains("limit") || msg.contains("Limit") { - format!("[body exceeded {}-byte cap]", MAX_BODY) - } else { - format!("[failed to read body: {}]", e) - } - } - }; - - let total_ms = request_start.elapsed().as_millis() as u64; - Ok((status, resp_headers, body_out, total_ms, dns_ms)) -} - -struct CertInfo { - expiry_days: i64, - issuer: String, - tls_ms: u64, -} - -async fn check_cert(url: &str) -> Result> { - use rustls::ClientConfig; - use rustls::pki_types::ServerName; - use tokio::net::TcpStream; - use tokio_rustls::TlsConnector; - use x509_parser::prelude::*; - - let url_parsed = reqwest::Url::parse(url)?; - let host = url_parsed.host_str().unwrap_or(""); - let port = url_parsed.port().unwrap_or(443); - - let mut root_store = rustls::RootCertStore::empty(); - root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); - - let config = ClientConfig::builder() - .with_root_certificates(root_store) - .with_no_client_auth(); - - let connector = TlsConnector::from(Arc::new(config)); - let server_name = ServerName::try_from(host.to_string())?; - - let stream = TcpStream::connect(format!("{host}:{port}")).await?; - let tls_start = Instant::now(); - let tls_stream = connector.connect(server_name, stream).await?; - let tls_ms = tls_start.elapsed().as_millis() as u64; - - let (_, conn) = tls_stream.get_ref(); - let certs = conn.peer_certificates().unwrap_or(&[]); - - if let Some(cert_der) = certs.first() { - let (_, cert) = X509Certificate::from_der(cert_der.as_ref())?; - let not_after = cert.validity().not_after.timestamp(); - let now = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs() as i64; - let days = (not_after - now) / 86400; - let issuer = cert.issuer().to_string(); - return Ok(Some(CertInfo { expiry_days: days, issuer, tls_ms })); + let body_bytes = http_body_util::BodyExt::collect(resp.into_body()).await?; + let mut body_vec = body_bytes.to_bytes().to_vec(); + if body_vec.len() > MAX_BODY { + body_vec.truncate(MAX_BODY); } + let body_str = String::from_utf8_lossy(&body_vec).into_owned(); - Ok(None) + let http_ms = http_start.elapsed().as_millis() as u64; + + Ok((status, resp_headers, body_str, http_ms)) } async fn post_result(