use crate::query::{self, Response}; use crate::types::{PingResult, Monitor}; use anyhow::Result; use serde_json::json; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; use tokio::sync::Mutex; use tracing::{debug, warn}; /// Fetch due monitors from coordinator, run them, post results back. pub async fn fetch_and_run( client: &reqwest::Client, coordinator_url: &str, token: &str, in_flight: &Arc>>, ) -> Result { // Fetch due monitors let monitors: Vec = client .get(format!("{coordinator_url}/internal/due")) .header("x-monitor-token", token) .send() .await? .json() .await?; let n = monitors.len(); if n == 0 { return Ok(0); } // Spawn all checks — fire and forget, skip if already in-flight let mut spawned = 0usize; for monitor in monitors { { let mut set = in_flight.lock().await; if set.contains(&monitor.id) { debug!("Skipping {} — check already in flight", monitor.id); continue; } set.insert(monitor.id.clone()); } spawned += 1; let client = client.clone(); let coordinator_url = coordinator_url.to_string(); let token = token.to_string(); let in_flight = in_flight.clone(); tokio::spawn(async move { let result = run_check(&client, &monitor, monitor.scheduled_at.clone()).await; // Remove from in-flight before posting so a fast next cycle can pick it up in_flight.lock().await.remove(&monitor.id); if let Err(e) = post_result(&client, &coordinator_url, &token, result).await { warn!("Failed to post result for {}: {e}", monitor.id); } }); } Ok(spawned) } async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option) -> PingResult { // Compute jitter: how late we actually started vs when we were scheduled let jitter_ms: Option = scheduled_at.as_deref().and_then(|s| { let scheduled = chrono::DateTime::parse_from_rfc3339(s).ok()?; let now = chrono::Utc::now(); Some((now - scheduled.with_timezone(&chrono::Utc)).num_milliseconds()) }); let start = Instant::now(); // Check cert expiry for HTTPS URLs let cert_expiry_days = if monitor.url.starts_with("https://") { check_cert_expiry(&monitor.url).await.ok().flatten() } else { None }; // Build request with method, headers, body, timeout let method = monitor.method.as_deref().unwrap_or("GET").to_uppercase(); let timeout = std::time::Duration::from_millis(monitor.timeout_ms.unwrap_or(30000)); let req_method = reqwest::Method::from_bytes(method.as_bytes()) .unwrap_or(reqwest::Method::GET); let mut req = client.request(req_method, &monitor.url).timeout(timeout); if let Some(headers) = &monitor.request_headers { for (k, v) in headers { if let (Ok(name), Ok(value)) = ( reqwest::header::HeaderName::from_bytes(k.as_bytes()), reqwest::header::HeaderValue::from_str(v), ) { req = req.header(name, value); } } } if let Some(body) = &monitor.request_body { req = req.body(body.clone()); } let result = req.send().await; let latency_ms = start.elapsed().as_millis() as u64; match result { Err(e) => PingResult { monitor_id: monitor.id.clone(), scheduled_at, jitter_ms, status_code: None, latency_ms: Some(latency_ms), up: false, error: Some(e.to_string()), cert_expiry_days, meta: None, }, Ok(resp) => { let status = resp.status().as_u16(); let headers: HashMap = resp.headers().iter() .filter_map(|(k, v)| Some((k.to_string(), v.to_str().ok()?.to_string()))) .collect(); // Limit response body to 10MB to prevent OOM from malicious targets const MAX_BODY_BYTES: usize = 10 * 1024 * 1024; let body = { let content_len = resp.content_length().unwrap_or(0) as usize; if content_len > MAX_BODY_BYTES { // Skip reading body entirely if Content-Length exceeds limit format!("[body truncated: Content-Length {} exceeds 10MB limit]", content_len) } else { let bytes = resp.bytes().await.unwrap_or_default(); let truncated = &bytes[..bytes.len().min(MAX_BODY_BYTES)]; String::from_utf8_lossy(truncated).into_owned() } }; // Evaluate query if present let (up, query_error) = if let Some(q) = &monitor.query { let response = Response { status, body: body.clone(), headers: headers.clone(), latency_ms: Some(latency_ms), cert_expiry_days, }; match query::evaluate(q, &response) { Ok(result) => (result, None), Err(e) => { warn!("Query error for {}: {e}", monitor.id); // Fall back to status-based up/down (status < 400, Some(e.to_string())) } } } else { // Default: up if 2xx/3xx (status < 400, None) }; let meta = json!({ "headers": headers, "body_preview": &body[..body.len().min(500)], }); debug!("{} → {status} {latency_ms}ms up={up}", monitor.url); PingResult { monitor_id: monitor.id.clone(), scheduled_at, jitter_ms, status_code: Some(status), latency_ms: Some(latency_ms), up, error: query_error, cert_expiry_days, meta: Some(meta), } } } } /// Check SSL certificate expiry for a given HTTPS URL. /// Returns the number of days until the certificate expires. async fn check_cert_expiry(url: &str) -> Result> { use rustls::ClientConfig; use rustls::pki_types::ServerName; use tokio::net::TcpStream; use tokio_rustls::TlsConnector; use x509_parser::prelude::*; // Parse host and port from URL let url_parsed = reqwest::Url::parse(url)?; let host = url_parsed.host_str().unwrap_or(""); let port = url_parsed.port().unwrap_or(443); // Build a rustls config that captures certificates let mut root_store = rustls::RootCertStore::empty(); root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); let config = ClientConfig::builder() .with_root_certificates(root_store) .with_no_client_auth(); let connector = TlsConnector::from(Arc::new(config)); let server_name = ServerName::try_from(host.to_string())?; let stream = TcpStream::connect(format!("{host}:{port}")).await?; let tls_stream = connector.connect(server_name, stream).await?; // Get peer certificates let (_, conn) = tls_stream.get_ref(); let certs = conn.peer_certificates().unwrap_or(&[]); if let Some(cert_der) = certs.first() { let (_, cert) = X509Certificate::from_der(cert_der.as_ref())?; let not_after = cert.validity().not_after.timestamp(); let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() as i64; let days = (not_after - now) / 86400; return Ok(Some(days)); } Ok(None) } async fn post_result( client: &reqwest::Client, coordinator_url: &str, token: &str, result: PingResult, ) -> Result<()> { client .post(format!("{coordinator_url}/internal/ingest")) .header("x-monitor-token", token) .json(&result) .send() .await?; Ok(()) }