use crate::query::{self, Response}; use crate::types::{PingResult, Monitor}; use anyhow::Result; use serde_json::json; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; use tokio::sync::Mutex; use tracing::{debug, warn}; thread_local! { static ROOT_CERTS: Arc>> = Arc::new( rustls_native_certs::load_native_certs() .certs .into_iter() .map(|c| ureq::tls::Certificate::from_der(c.as_ref()).to_owned()) .collect() ); } pub async fn fetch_and_run( client: &reqwest::Client, coordinator_url: &str, token: &str, region: &str, in_flight: &Arc>>, ) -> Result { let url = format!("{coordinator_url}/internal/due?region={region}&lookahead_ms=2000"); let resp = client .get(&url) .header("x-monitor-token", token) .send() .await?; let status = resp.status(); let body = resp.text().await?; let monitors: Vec = match serde_json::from_str(&body) { Ok(m) => m, Err(e) => { let preview: String = body.chars().take(500).collect(); anyhow::bail!("/internal/due returned status {status}, body could not be parsed: {e}. Body preview: {preview}"); } }; let n = monitors.len(); if n == 0 { return Ok(0); } let coordinator_url: Arc = Arc::from(coordinator_url); let token: Arc = Arc::from(token); let region: Arc = Arc::from(region); let mut spawned = 0usize; for monitor in monitors { { let mut set = in_flight.lock().await; if set.contains(&monitor.id) { debug!("Skipping {} — check already in flight", monitor.id); continue; } set.insert(monitor.id.clone()); } spawned += 1; let client = client.clone(); let coordinator_url = Arc::clone(&coordinator_url); let token = Arc::clone(&token); let region_owned = Arc::clone(®ion); let run_id_owned = { use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; let bucket = monitor.scheduled_at_ms.unwrap_or_else(|| chrono::Utc::now().timestamp_millis()) / (monitor.interval_s * 1000); let mut h = DefaultHasher::new(); monitor.id.hash(&mut h); bucket.hash(&mut h); format!("{:016x}", h.finish()) }; let scheduled_at_iso = monitor.scheduled_at_ms.map(|ms| { chrono::DateTime::::from_timestamp_millis(ms) .map(|dt| dt.to_rfc3339()) .unwrap_or_default() }); let in_flight = in_flight.clone(); tokio::spawn(async move { if let Some(ms) = monitor.scheduled_at_ms { let now_ms = chrono::Utc::now().timestamp_millis(); if ms > now_ms { let wait = std::time::Duration::from_millis((ms - now_ms) as u64); tokio::time::sleep(wait).await; } } let timeout_ms = monitor.timeout_ms.unwrap_or(30000); let attempts = monitor.max_retries.saturating_add(1) as u64; let retry_gap_s = monitor.retry_interval_s; let deadline = std::time::Duration::from_millis( attempts * (timeout_ms + 5000) + attempts.saturating_sub(1) * retry_gap_s * 1000, ); let result = match tokio::time::timeout(deadline, run_check_with_retries(&client, &monitor, scheduled_at_iso.clone(), ®ion_owned, &run_id_owned)).await { Ok(r) => r, Err(_) => PingResult { monitor_id: monitor.id.clone(), checked_at: Some(chrono::Utc::now().to_rfc3339()), scheduled_at: scheduled_at_iso.clone(), jitter_ms: None, status_code: None, latency_ms: Some(timeout_ms as u64), up: false, error: Some(format!("timed out after {}ms", timeout_ms)), cert_expiry_days: None, cert_issuer: None, response_size: None, meta: None, region: Some(region_owned.to_string()), run_id: Some(run_id_owned.clone()), }, }; if let Err(e) = post_result(&client, &coordinator_url, &token, result).await { warn!("Failed to post result for {}: {e}", monitor.id); } in_flight.lock().await.remove(&monitor.id); }); } Ok(spawned) } async fn run_check_with_retries(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { let attempts = monitor.max_retries.saturating_add(1); let retry_gap = std::time::Duration::from_secs(monitor.retry_interval_s); let mut last: Option = None; for attempt in 0..attempts { let mut result = run_check(client, monitor, scheduled_at.clone(), region, run_id).await; if result.up { if attempt > 0 { if let Some(meta) = result.meta.as_mut().and_then(|m| m.as_object_mut()) { meta.insert("retries".into(), json!(attempt)); } } return result; } last = Some(result); if attempt + 1 < attempts { tokio::time::sleep(retry_gap).await; } } let mut result = last.expect("at least one attempt"); if attempts > 1 { let meta = result.meta.get_or_insert_with(|| json!({})); if let Some(obj) = meta.as_object_mut() { obj.insert("retries".into(), json!(attempts - 1)); } } result } async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option, region: &str, run_id: &str) -> PingResult { let checked_at = chrono::Utc::now().to_rfc3339(); let jitter_ms: Option = scheduled_at.as_deref().and_then(|s| { let scheduled = chrono::DateTime::parse_from_rfc3339(s).ok()?; let now = chrono::Utc::now(); Some((now - scheduled.with_timezone(&chrono::Utc)).num_milliseconds()) }); let start = Instant::now(); let method = monitor.method.as_deref().unwrap_or("GET").to_uppercase(); let timeout = std::time::Duration::from_millis(monitor.timeout_ms.unwrap_or(30000)); let is_https = monitor.url.starts_with("https://"); let url = monitor.url.clone(); let req_headers = monitor.request_headers.clone(); let req_body = monitor.request_body.clone(); let max_redirects = monitor.max_redirects; let (tx, rx) = tokio::sync::oneshot::channel::, String), String>>(); std::thread::spawn(move || { let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { run_check_blocking(&url, &method, req_headers.as_ref(), req_body.as_deref(), timeout, max_redirects) })); let _ = tx.send(match result { Ok(r) => r, Err(_) => Err("check panicked".to_string()), }); }); let result = tokio::time::timeout(timeout + std::time::Duration::from_secs(2), rx) .await .map_err(|_| format!("timed out after {}ms", timeout.as_millis())) .and_then(|r| r.map_err(|_| "check thread dropped".to_string())) .unwrap_or_else(|e| Err(e)); let latency_ms = start.elapsed().as_millis() as u64; match result { Err(ref e) => { debug!("{} check error: {e}", monitor.url); PingResult { monitor_id: monitor.id.clone(), checked_at: Some(checked_at.clone()), scheduled_at, jitter_ms, status_code: None, latency_ms: Some(latency_ms), up: false, error: Some(e.clone()), cert_expiry_days: None, cert_issuer: None, response_size: None, meta: None, region: Some(region.to_string()), run_id: Some(run_id.to_string()), } }, Ok((status, headers, body)) => { let cert_handle = if is_https { let cert_url = monitor.url.clone(); Some(tokio::spawn(async move { match tokio::time::timeout( std::time::Duration::from_secs(5), check_cert(&cert_url), ).await { Ok(Ok(info)) => info, _ => None, } })) } else { None }; let query = &monitor.query; let (up, query_error) = if let Some(q) = query { let response = Response { status, body: body.clone(), headers: headers.clone(), latency_ms: Some(latency_ms), cert_expiry_days: None, cert_issuer: None, }; match query::evaluate(q, &response) { Ok(result) => (result, None), Err(e) => { warn!("Query error for {}: {e}", monitor.id); (status < 400, Some(e.to_string())) } } } else { ((200..300).contains(&status), None) }; let cert_info = match cert_handle { Some(h) => h.await.unwrap_or(None), None => None, }; let cert_expiry_days = cert_info.as_ref().map(|c| c.expiry_days); let cert_issuer = cert_info.map(|c| c.issuer); let meta = json!({ "headers": headers, "body_preview": &body[..body.len().min(25_000)], }); debug!("{} → {status} {latency_ms}ms up={up}", monitor.url); PingResult { monitor_id: monitor.id.clone(), checked_at: Some(checked_at), scheduled_at, jitter_ms, status_code: Some(status), latency_ms: Some(latency_ms), up, error: query_error, cert_expiry_days, cert_issuer, response_size: Some(body.len()), meta: Some(meta), region: Some(region.to_string()), run_id: Some(run_id.to_string()), } } } } fn run_check_blocking( url: &str, method: &str, headers: Option<&HashMap>, body: Option<&str>, timeout: std::time::Duration, max_redirects: u32, ) -> Result<(u16, HashMap, String), String> { let root_certs = ROOT_CERTS.with(|c| Arc::clone(c)); let tls = ureq::tls::TlsConfig::builder() .root_certs(ureq::tls::RootCerts::Specific(root_certs)) .build(); let agent = ureq::Agent::config_builder() .timeout_global(Some(timeout)) .timeout_connect(Some(timeout)) .http_status_as_error(false) .max_redirects(max_redirects) .user_agent("Mozilla/5.0 (compatible; PingQL/1.0; +https://pingql.com)") .tls_config(tls) .build() .new_agent(); let mut builder = ureq::http::Request::builder() .method(method) .uri(url); let mut has_content_type = false; if let Some(hdrs) = headers { for (k, v) in hdrs { if k.eq_ignore_ascii_case("content-type") { has_content_type = true; } builder = builder.header(k.as_str(), v.as_str()); } } let result = match body { Some(b) => { if !has_content_type { builder = builder.header("Content-Type", "application/json"); } let req = builder.body(b.as_bytes()).map_err(|e| e.to_string())?; agent.run(req) } None => { let req = builder.body(()).map_err(|e| e.to_string())?; agent.run(req) } }; let mut resp = match result { Err(e) => { let msg = e.to_string(); let friendly = if msg.contains("timed out") || msg.contains("timeout") || msg.contains("TimedOut") { format!("timed out after {}ms", timeout.as_millis()) } else { msg }; return Err(friendly); } Ok(r) => r, }; let status = resp.status().as_u16(); let mut resp_headers = HashMap::new(); for (name, value) in resp.headers() { resp_headers.insert(name.as_str().to_lowercase(), value.to_str().unwrap_or("").to_string()); } const MAX_BODY: usize = 2 * 1024 * 1024; let body_out = match resp.body_mut().with_config().limit((MAX_BODY + 1) as u64).read_to_vec() { Ok(mut buf) => { if buf.len() > MAX_BODY { buf.truncate(MAX_BODY); } String::from_utf8_lossy(&buf).into_owned() } Err(e) => { let msg = e.to_string(); if msg.contains("limit") || msg.contains("Limit") { format!("[body exceeded {}-byte cap]", MAX_BODY) } else { format!("[failed to read body: {}]", e) } } }; Ok((status, resp_headers, body_out)) } struct CertInfo { expiry_days: i64, issuer: String, } async fn check_cert(url: &str) -> Result> { use rustls::ClientConfig; use rustls::pki_types::ServerName; use tokio::net::TcpStream; use tokio_rustls::TlsConnector; use x509_parser::prelude::*; let url_parsed = reqwest::Url::parse(url)?; let host = url_parsed.host_str().unwrap_or(""); let port = url_parsed.port().unwrap_or(443); let mut root_store = rustls::RootCertStore::empty(); root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); let config = ClientConfig::builder() .with_root_certificates(root_store) .with_no_client_auth(); let connector = TlsConnector::from(Arc::new(config)); let server_name = ServerName::try_from(host.to_string())?; let stream = TcpStream::connect(format!("{host}:{port}")).await?; let tls_stream = connector.connect(server_name, stream).await?; let (_, conn) = tls_stream.get_ref(); let certs = conn.peer_certificates().unwrap_or(&[]); if let Some(cert_der) = certs.first() { let (_, cert) = X509Certificate::from_der(cert_der.as_ref())?; let not_after = cert.validity().not_after.timestamp(); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() as i64; let days = (not_after - now) / 86400; let issuer = cert.issuer().to_string(); return Ok(Some(CertInfo { expiry_days: days, issuer })); } Ok(None) } async fn post_result( client: &reqwest::Client, coordinator_url: &str, token: &str, result: PingResult, ) -> Result<()> { let resp = tokio::time::timeout( std::time::Duration::from_secs(10), client .post(format!("{coordinator_url}/internal/ingest")) .header("x-monitor-token", token) .json(&result) .send() ).await .map_err(|_| anyhow::anyhow!("post_result timed out"))? .map_err(|e| anyhow::anyhow!("{e}"))?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); return Err(anyhow::anyhow!("ingest returned {status}: {body}")); } Ok(()) }