diff --git a/apps/monitor/Cargo.toml b/apps/monitor/Cargo.toml index af4a9f0..843568a 100644 --- a/apps/monitor/Cargo.toml +++ b/apps/monitor/Cargo.toml @@ -5,7 +5,8 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["full"] } -reqwest = { version = "0.12", features = ["json", "native-tls", "blocking"], default-features = false } +reqwest = { version = "0.12", features = ["json", "native-tls"], default-features = false } +ureq = { version = "3", features = ["native-tls"] } serde = { version = "1", features = ["derive"] } serde_json = "1" scraper = "0.21" # CSS selector / HTML parsing diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index 2c67724..7bb96fe 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -88,16 +88,25 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op let is_https = monitor.url.starts_with("https://"); let url_for_cert = monitor.url.clone(); - // Use curl subprocess — the only reliable way to enforce a hard timeout - // including TLS handshake on hosts that accept TCP but stall at TLS. - let timeout_secs = (timeout.as_millis() as f64 / 1000.0).max(1.0); - let curl_result = run_curl( - &monitor.url, - &method, - monitor.request_headers.as_ref(), - monitor.request_body.as_deref(), - timeout_secs, - ).await; + // Run the check in a real OS thread using ureq (blocking, synchronous HTTP). + // ureq sets SO_RCVTIMEO/SO_SNDTIMEO at the socket level, which reliably + // interrupts even a hanging TLS handshake — unlike async reqwest which + // cannot cancel syscall-level blocks via future cancellation. 
+ let url = monitor.url.clone(); + let req_headers = monitor.request_headers.clone(); + let req_body = monitor.request_body.clone(); + let method_clone = method.clone(); + + let (tx, rx) = tokio::sync::oneshot::channel::<Result<(u16, HashMap<String, String>, String), String>>(); + std::thread::spawn(move || { + let _ = tx.send(run_check_blocking(&url, &method_clone, req_headers.as_ref(), req_body.as_deref(), timeout)); + }); + + let curl_result = tokio::time::timeout(timeout + std::time::Duration::from_secs(2), rx) + .await + .map_err(|_| format!("timed out after {}ms", timeout.as_millis())) + .and_then(|r| r.map_err(|_| "check thread dropped".to_string())) + .unwrap_or_else(|e| Err(e)); let latency_ms = start.elapsed().as_millis() as u64; @@ -180,109 +189,75 @@ } } -/// Run an HTTP check via curl subprocess writing output to a temp file. -/// Using a temp file instead of a pipe avoids async read_to_end hanging -/// when curl exits without closing its inherited stdout properly. -async fn run_curl( +/// Run an HTTP check synchronously using ureq. +/// ureq applies timeouts at the socket/IO level (not just future cancellation), +/// which reliably interrupts hanging TLS handshakes. +/// Must be called from a std::thread (not async context).
+fn run_check_blocking( url: &str, method: &str, headers: Option<&HashMap<String, String>>, body: Option<&str>, - timeout_secs: f64, + timeout: std::time::Duration, ) -> Result<(u16, HashMap<String, String>, String), String> { - use std::process::Stdio; + let agent = ureq::Agent::config_builder() + .timeout_global(Some(timeout)) + .timeout_connect(Some(timeout)) + .http_status_as_error(false) // handle 4xx/5xx as Ok so we can evaluate queries + .user_agent("PingQL-Monitor/0.1") + .build() + .new_agent(); - // Write output to a temp file so we just wait for the process to exit - let tmp = format!("/tmp/pingql-curl-{}-{}.txt", std::process::id(), - std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().subsec_nanos()); - - let mut args: Vec<String> = vec![ - "--silent".into(), - "--show-error".into(), - "--include".into(), - "--max-time".into(), format!("{:.1}", timeout_secs), - "--connect-timeout".into(), format!("{:.1}", timeout_secs), - "-X".into(), method.to_string(), - "--user-agent".into(), "PingQL-Monitor/0.1".into(), - "--location".into(), - "--output".into(), tmp.clone(), - ]; + let mut builder = ureq::http::Request::builder() + .method(method) + .uri(url); if let Some(hdrs) = headers { for (k, v) in hdrs { - args.push("-H".into()); - args.push(format!("{k}: {v}")); + builder = builder.header(k.as_str(), v.as_str()); } } - if let Some(b) = body { - args.push("--data-raw".into()); - args.push(b.to_string()); - } - args.push(url.to_string()); - // Run curl in a real OS thread, signal completion via tokio oneshot. - // std::thread detaches completely from tokio's runtime, so a hung curl - // process can't block other monitors. The tokio::time::timeout on the - // oneshot receiver gives us a hard async deadline.
- let args_owned = args.clone(); - let tmp_owned = tmp.clone(); + let result = match body { + Some(b) => { + let req = builder.body(b.as_bytes()).map_err(|e| e.to_string())?; + agent.run(req) + } + None => { + let req = builder.body(()).map_err(|e| e.to_string())?; + agent.run(req) + } + }; - let (tx, rx) = tokio::sync::oneshot::channel::<(std::io::Result<std::process::ExitStatus>, Vec<u8>)>(); - std::thread::spawn(move || { - let status = std::process::Command::new("curl") - .args(&args_owned) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .status(); - let output = std::fs::read(&tmp_owned).unwrap_or_default(); - let _ = std::fs::remove_file(&tmp_owned); - let _ = tx.send((status, output)); - }); + let mut resp = match result { + Err(e) => { + let msg = e.to_string(); + let friendly = if msg.contains("timed out") || msg.contains("timeout") || msg.contains("TimedOut") { + format!("timed out after {}ms", timeout.as_millis()) + } else { + msg + }; + return Err(friendly); + } + Ok(r) => r, + }; - let (status_result, raw_bytes) = tokio::time::timeout( - std::time::Duration::from_secs_f64(timeout_secs + 2.0), - rx - ).await - .map_err(|_| format!("timed out after {:.0}s", timeout_secs))?
- .map_err(|_| "curl thread dropped".to_string())?; - let exit_code = status_result.ok().and_then(|s| s.code()).unwrap_or(-1); - - if exit_code != 0 { - return Err(match exit_code { - 28 => format!("timed out after {:.0}s", timeout_secs), - 6 => "DNS lookup failed".to_string(), - 7 => "connection refused".to_string(), - _ => format!("curl error (exit {})", exit_code), - }); - } - - let raw = String::from_utf8_lossy(&raw_bytes).to_string(); - let mut parts = raw.splitn(2, "\r\n\r\n"); - let header_block = parts.next().unwrap_or(""); - let body_str = parts.next().unwrap_or("").to_string(); - - let mut lines = header_block.lines(); - let status_line = lines.next().unwrap_or("HTTP/1.1 0"); - let status_code: u16 = status_line.split_whitespace().nth(1) - .and_then(|s| s.parse().ok()) - .unwrap_or(0); + let status = resp.status().as_u16(); let mut resp_headers = HashMap::new(); - for line in lines { - if let Some((k, v)) = line.split_once(':') { - resp_headers.insert(k.trim().to_lowercase(), v.trim().to_string()); - } + for (name, value) in resp.headers() { + resp_headers.insert(name.as_str().to_lowercase(), value.to_str().unwrap_or("").to_string()); } const MAX_BODY: usize = 10 * 1024 * 1024; + let body_str = resp.body_mut().read_to_string().unwrap_or_default(); let body_out = if body_str.len() > MAX_BODY { format!("[body truncated: {} bytes]", body_str.len()) } else { body_str }; - Ok((status_code, resp_headers, body_out)) + Ok((status, resp_headers, body_out)) } /// Check SSL certificate expiry for a given HTTPS URL.