use crate::query::{self, Response}; use crate::types::{PingResult, Monitor}; use anyhow::Result; use serde_json::json; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; use tokio::sync::Mutex; use tokio::net::TcpStream; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, warn}; use rustls::pki_types::ServerName; use tokio_rustls::TlsConnector; use x509_parser::prelude::*; static ROOT_STORE: std::sync::LazyLock> = std::sync::LazyLock::new(|| { let mut store = rustls::RootCertStore::empty(); store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); Arc::new(store) }); struct CheckResult { status: u16, headers: HashMap, body: String, dns_ms: u64, tcp_ms: u64, tls_ms: u64, http_ms: u64, cert_expiry_days: Option, cert_issuer: Option, } pub async fn fetch_and_run( client: &reqwest::Client, coordinator_url: &str, token: &str, region: &str, in_flight: &Arc>>, ) -> Result { let url = format!("{coordinator_url}/internal/due?region={region}&lookahead_ms=2000"); let resp = client .get(&url) .header("x-monitor-token", token) .send() .await?; let status = resp.status(); let body = resp.text().await?; let monitors: Vec = match serde_json::from_str(&body) { Ok(m) => m, Err(e) => { let preview: String = body.chars().take(500).collect(); anyhow::bail!("/internal/due returned status {status}, body could not be parsed: {e}. Body preview: {preview}"); } }; let n = monitors.len(); if n == 0 { return Ok(0); } let coordinator_url: Arc = Arc::from(coordinator_url); let token: Arc = Arc::from(token); let region: Arc = Arc::from(region); let mut spawned = 0usize; for monitor in monitors { { let mut set = in_flight.lock().await; if set.contains(&monitor.id) { debug!("Skipping {} - check already in flight", monitor.id); continue; } set.insert(monitor.id.clone()); } spawned += 1; let client = client.clone(); let coordinator_url = Arc::clone(&coordinator_url); let token = Arc::clone(&token); let region_owned = Arc::clone(®ion); let run_id_owned = { use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; let bucket = monitor.scheduled_at_ms.unwrap_or_else(|| chrono::Utc::now().timestamp_millis()) / (monitor.interval_s * 1000); let mut h = DefaultHasher::new(); monitor.id.hash(&mut h); bucket.hash(&mut h); format!("{:016x}", h.finish()) }; let scheduled_at_iso = monitor.scheduled_at_ms.map(|ms| { chrono::DateTime::::from_timestamp_millis(ms) .map(|dt| dt.to_rfc3339()) .unwrap_or_default() }); let in_flight = in_flight.clone(); tokio::spawn(async move { if let Some(ms) = monitor.scheduled_at_ms { let now_ms = chrono::Utc::now().timestamp_millis(); if ms > now_ms { let wait = std::time::Duration::from_millis((ms - now_ms) as u64); tokio::time::sleep(wait).await; } } let timeout_ms = monitor.timeout_ms.unwrap_or(30000); let attempts = monitor.max_retries.saturating_add(1) as u64; let retry_gap_s = monitor.retry_interval_s; let deadline = std::time::Duration::from_millis( attempts * (timeout_ms + 5000) + attempts.saturating_sub(1) * retry_gap_s * 1000, ); let result = match tokio::time::timeout(deadline, run_check_with_retries(&monitor, scheduled_at_iso.clone(), ®ion_owned, &run_id_owned)).await { Ok(r) => r, Err(_) => PingResult { monitor_id: monitor.id.clone(), checked_at: Some(chrono::Utc::now().to_rfc3339()), scheduled_at: scheduled_at_iso.clone(), jitter_ms: None, status_code: None, latency_ms: Some(timeout_ms as u64), up: false, error: Some(format!("timed out after {}ms", timeout_ms)), cert_expiry_days: None, cert_issuer: None, response_size: None, meta: None, region: Some(region_owned.to_string()), run_id: Some(run_id_owned.clone()), }, }; if let Err(e) = post_result(&client, &coordinator_url, &token, result).await { warn!("Failed to post result for {}: {e}", monitor.id); } in_flight.lock().await.remove(&monitor.id); }); } Ok(spawned) } async fn run_check_with_retries(monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { let attempts = monitor.max_retries.saturating_add(1); let retry_gap = std::time::Duration::from_secs(monitor.retry_interval_s); let mut last: Option = None; for attempt in 0..attempts { let mut result = run_check(monitor, scheduled_at.clone(), region, run_id).await; if result.up { if attempt > 0 { if let Some(meta) = result.meta.as_mut().and_then(|m| m.as_object_mut()) { meta.insert("retries".into(), json!(attempt)); } } return result; } last = Some(result); if attempt + 1 < attempts { tokio::time::sleep(retry_gap).await; } } let mut result = last.expect("at least one attempt"); if attempts > 1 { let meta = result.meta.get_or_insert_with(|| json!({})); if let Some(obj) = meta.as_object_mut() { obj.insert("retries".into(), json!(attempts - 1)); } } result } async fn run_check(monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { let checked_at = chrono::Utc::now().to_rfc3339(); let jitter_ms: Option = scheduled_at.as_deref().and_then(|s| { let scheduled = chrono::DateTime::parse_from_rfc3339(s).ok()?; let now = chrono::Utc::now(); Some((now - scheduled.with_timezone(&chrono::Utc)).num_milliseconds()) }); let timeout = std::time::Duration::from_millis(monitor.timeout_ms.unwrap_or(30000)); let start = Instant::now(); let result = tokio::time::timeout(timeout, run_check_async( &monitor.url, monitor.method.as_deref().unwrap_or("GET"), monitor.request_headers.as_ref(), monitor.request_body.as_deref(), monitor.max_redirects, )).await; let cr = match result { Err(_) => { return PingResult { monitor_id: monitor.id.clone(), checked_at: Some(checked_at), scheduled_at, jitter_ms, status_code: None, latency_ms: Some(start.elapsed().as_millis() as u64), up: false, error: Some(format!("timed out after {}ms", timeout.as_millis())), cert_expiry_days: None, cert_issuer: None, response_size: None, meta: None, region: Some(region.to_string()), run_id: Some(run_id.to_string()), }; } Ok(Err(e)) => { return PingResult { monitor_id: monitor.id.clone(), checked_at: Some(checked_at), scheduled_at, jitter_ms, status_code: None, latency_ms: Some(start.elapsed().as_millis() as u64), up: false, error: Some(e.to_string()), cert_expiry_days: None, cert_issuer: None, response_size: None, meta: None, region: Some(region.to_string()), run_id: Some(run_id.to_string()), }; } Ok(Ok(cr)) => cr, }; let latency_ms = cr.http_ms; let query = &monitor.query; let (up, query_error) = if let Some(q) = query { let response = Response { status: cr.status, body: cr.body.clone(), headers: cr.headers.clone(), latency_ms: Some(latency_ms), cert_expiry_days: cr.cert_expiry_days, cert_issuer: cr.cert_issuer.clone(), }; match query::evaluate(q, &response) { Ok(result) => (result, None), Err(e) => { warn!("Query error for {}: {e}", monitor.id); (cr.status < 400, Some(e.to_string())) } } } else { ((200..300).contains(&(cr.status as u16)), None) }; let meta = json!({ "headers": cr.headers, "body_preview": &cr.body[..cr.body.len().min(25_000)], }); debug!("{} -> {status} {latency_ms}ms (dns={dns} tcp={tcp} tls={tls} http={http}) up={up}", monitor.url, status=cr.status, dns=cr.dns_ms, tcp=cr.tcp_ms, tls=cr.tls_ms, http=cr.http_ms); PingResult { monitor_id: monitor.id.clone(), checked_at: Some(checked_at), scheduled_at, jitter_ms, status_code: Some(cr.status), latency_ms: Some(latency_ms), up, error: query_error, cert_expiry_days: cr.cert_expiry_days, cert_issuer: cr.cert_issuer, response_size: Some(cr.body.len()), meta: Some(meta), region: Some(region.to_string()), run_id: Some(run_id.to_string()), } } async fn run_check_async( url: &str, method: &str, req_headers: Option<&HashMap>, req_body: Option<&str>, max_redirects: u32, ) -> Result { let mut current_url = url.to_string(); let mut redirects_left = max_redirects; let mut final_result: Option = None; loop { let parsed = reqwest::Url::parse(¤t_url)?; let is_https = parsed.scheme() == "https"; let host = parsed.host_str().ok_or_else(|| anyhow::anyhow!("no host"))?.to_string(); let port = parsed.port().unwrap_or(if is_https { 443 } else { 80 }); let path = if parsed.query().is_some() { format!("{}?{}", parsed.path(), parsed.query().unwrap()) } else { parsed.path().to_string() }; // 1. DNS resolve let dns_start = Instant::now(); let addr = tokio::net::lookup_host(format!("{host}:{port}")) .await? .next() .ok_or_else(|| anyhow::anyhow!("DNS lookup returned no addresses"))?; let dns_ms = dns_start.elapsed().as_millis() as u64; // 2. TCP connect let tcp_start = Instant::now(); let tcp_stream = TcpStream::connect(addr).await?; let tcp_ms = tcp_start.elapsed().as_millis() as u64; // 3. TLS (if HTTPS) let mut tls_ms = 0u64; let mut cert_expiry_days: Option = None; let mut cert_issuer: Option = None; let (status, resp_headers, body, http_ms) = if is_https { let config = rustls::ClientConfig::builder() .with_root_certificates(ROOT_STORE.clone()) .with_no_client_auth(); let connector = TlsConnector::from(Arc::new(config)); let server_name = ServerName::try_from(host.clone())?; let tls_start = Instant::now(); let tls_stream = connector.connect(server_name, tcp_stream).await?; tls_ms = tls_start.elapsed().as_millis() as u64; // Extract cert info from this connection let (_, conn) = tls_stream.get_ref(); if let Some(certs) = conn.peer_certificates() { if let Some(cert_der) = certs.first() { if let Ok((_, cert)) = X509Certificate::from_der(cert_der.as_ref()) { let not_after = cert.validity().not_after.timestamp(); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() as i64; cert_expiry_days = Some((not_after - now) / 86400); cert_issuer = Some(cert.issuer().to_string()); } } } // HTTP over TLS do_http_request(tls_stream, &host, &path, method, req_headers, req_body).await? } else { // HTTP over plain TCP do_http_request(tcp_stream, &host, &path, method, req_headers, req_body).await? }; // Check for redirect if (301..=308).contains(&status) && redirects_left > 0 { if let Some(location) = resp_headers.get("location") { let next = if location.starts_with("http://") || location.starts_with("https://") { location.clone() } else { // Relative redirect let base = reqwest::Url::parse(¤t_url)?; base.join(location)?.to_string() }; current_url = next; redirects_left -= 1; continue; } } final_result = Some(CheckResult { status, headers: resp_headers, body, dns_ms, tcp_ms, tls_ms, http_ms, cert_expiry_days, cert_issuer, }); break; } final_result.ok_or_else(|| anyhow::anyhow!("no result")) } async fn do_http_request( stream: S, host: &str, path: &str, method: &str, req_headers: Option<&HashMap>, req_body: Option<&str>, ) -> Result<(u16, HashMap, String, u64)> where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, { use hyper_util::rt::TokioIo; let io = TokioIo::new(stream); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; // Spawn the connection driver tokio::spawn(async move { if let Err(e) = conn.await { debug!("connection error: {e}"); } }); // Build request let body_bytes = req_body.unwrap_or("").to_string(); let has_body = req_body.is_some() && !body_bytes.is_empty(); let mut builder = http::Request::builder() .method(method) .uri(path) .header("host", host) .header("user-agent", "Mozilla/5.0 (compatible; PingQL/1.0; +https://pingql.com)"); if let Some(hdrs) = req_headers { let mut has_content_type = false; for (k, v) in hdrs { if k.eq_ignore_ascii_case("content-type") { has_content_type = true; } builder = builder.header(k.as_str(), v.as_str()); } if has_body && !has_content_type { builder = builder.header("content-type", "application/json"); } } else if has_body { builder = builder.header("content-type", "application/json"); } let req = if has_body { builder.body(http_body_util::Full::new(hyper::body::Bytes::from(body_bytes)))? } else { builder.body(http_body_util::Full::new(hyper::body::Bytes::new()))? }; // Time just the HTTP request/response let http_start = Instant::now(); let resp = sender.send_request(req).await?; let status = resp.status().as_u16(); let mut resp_headers = HashMap::new(); for (name, value) in resp.headers() { resp_headers.insert( name.as_str().to_lowercase(), value.to_str().unwrap_or("").to_string(), ); } // Read body with size limit const MAX_BODY: usize = 2 * 1024 * 1024; let body_bytes = http_body_util::BodyExt::collect(resp.into_body()).await?; let mut body_vec = body_bytes.to_bytes().to_vec(); if body_vec.len() > MAX_BODY { body_vec.truncate(MAX_BODY); } let body_str = String::from_utf8_lossy(&body_vec).into_owned(); let http_ms = http_start.elapsed().as_millis() as u64; Ok((status, resp_headers, body_str, http_ms)) } async fn post_result( client: &reqwest::Client, coordinator_url: &str, token: &str, result: PingResult, ) -> Result<()> { let resp = tokio::time::timeout( std::time::Duration::from_secs(10), client .post(format!("{coordinator_url}/internal/ingest")) .header("x-monitor-token", token) .json(&result) .send() ).await .map_err(|_| anyhow::anyhow!("post_result timed out"))? .map_err(|e| anyhow::anyhow!("{e}"))?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); return Err(anyhow::anyhow!("ingest returned {status}: {body}")); } Ok(()) }