From c68700da4664aba0196a14270e374755c839a945 Mon Sep 17 00:00:00 2001 From: M1 Date: Wed, 18 Mar 2026 12:48:30 +0400 Subject: [PATCH] fix: use blocking reqwest in spawn_blocking for reliable OS-level timeout --- apps/monitor/Cargo.toml | 2 +- apps/monitor/src/runner.rs | 129 ++++++++++++++----------------------- 2 files changed, 50 insertions(+), 81 deletions(-) diff --git a/apps/monitor/Cargo.toml b/apps/monitor/Cargo.toml index ccf593b..af4a9f0 100644 --- a/apps/monitor/Cargo.toml +++ b/apps/monitor/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["full"] } -reqwest = { version = "0.12", features = ["json", "native-tls"], default-features = false } +reqwest = { version = "0.12", features = ["json", "native-tls", "blocking"], default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" scraper = "0.21" # CSS selector / HTML parsing diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index 8f8addb..4349279 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -69,82 +69,45 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op // Build request with method, headers, body, timeout let method = monitor.method.as_deref().unwrap_or("GET").to_uppercase(); let timeout = std::time::Duration::from_millis(monitor.timeout_ms.unwrap_or(30000)); - - // Pre-flight TCP connect check with a hard OS-level timeout. - // This catches hosts where the SYN packet hangs indefinitely — - // reqwest/hyper with rustls cannot be cancelled via tokio future drop alone. - let url_parsed = reqwest::Url::parse(&monitor.url).ok(); - if let Some(ref u) = url_parsed { - let host = u.host_str().unwrap_or(""); - let port = u.port_or_known_default().unwrap_or(443); - let addr = format!("{host}:{port}"); - // Resolve DNS first - let addrs: Vec<_> = match tokio::net::lookup_host(&addr).await { - Ok(a) => a.collect(), - Err(e) => { - return PingResult { - monitor_id: monitor.id.clone(), - scheduled_at, - jitter_ms, - status_code: None, - latency_ms: Some(start.elapsed().as_millis() as u64), - up: false, - error: Some(format!("DNS error: {e}")), - cert_expiry_days: None, - meta: None, - }; - } - }; - // Try TCP connect with hard timeout - let tcp_result = tokio::time::timeout( - timeout, - tokio::net::TcpStream::connect(addrs.as_slice()), - ).await; - if let Err(_) | Ok(Err(_)) = tcp_result { - let err = match tcp_result { - Err(_) => format!("timed out after {}ms", timeout.as_millis()), - Ok(Err(e)) => e.to_string(), - _ => unreachable!(), - }; - return PingResult { - monitor_id: monitor.id.clone(), - scheduled_at, - jitter_ms, - status_code: None, - latency_ms: Some(start.elapsed().as_millis() as u64), - up: false, - error: Some(err), - cert_expiry_days: None, - meta: None, - }; - } - } - - let req_method = reqwest::Method::from_bytes(method.as_bytes()) - .unwrap_or(reqwest::Method::GET); - - let mut req = client.request(req_method, &monitor.url).timeout(timeout); - - if let Some(headers) = &monitor.request_headers { - for (k, v) in headers { - if let (Ok(name), Ok(value)) = ( - reqwest::header::HeaderName::from_bytes(k.as_bytes()), - reqwest::header::HeaderValue::from_str(v), - ) { - req = req.header(name, value); - } - } - } - - if let Some(body) = &monitor.request_body { - req = req.body(body.clone()); - } - let is_https = monitor.url.starts_with("https://"); let url_for_cert = monitor.url.clone(); - let result: Result<_, String> = async { - let resp = req.send().await.map_err(|e| e.to_string())?; + // Use blocking reqwest in a thread so OS-level socket timeouts actually work. + // Async reqwest with rustls/native-tls does not reliably cancel on TLS hangs. + let url = monitor.url.clone(); + let method_str = method.clone(); + let req_headers = monitor.request_headers.clone(); + let req_body = monitor.request_body.clone(); + let query_clone = monitor.query.clone(); + + let blocking_result = tokio::task::spawn_blocking(move || { + let block_client = reqwest::blocking::Client::builder() + .user_agent("PingQL-Monitor/0.1") + .connect_timeout(timeout) + .timeout(timeout) + .build()?; + + let req_method = reqwest::Method::from_bytes(method_str.as_bytes()) + .unwrap_or(reqwest::Method::GET); + + let mut req = block_client.request(req_method, &url); + + if let Some(ref headers) = req_headers { + for (k, v) in headers { + if let (Ok(name), Ok(value)) = ( + reqwest::header::HeaderName::from_bytes(k.as_bytes()), + reqwest::header::HeaderValue::from_str(v), + ) { + req = req.header(name, value); + } + } + } + + if let Some(ref body) = req_body { + req = req.body(body.clone()); + } + + let resp = req.send()?; let status = resp.status(); let headers: HashMap = resp.headers().iter() .filter_map(|(k, v)| Some((k.to_string(), v.to_str().ok()?.to_string()))) @@ -156,16 +119,23 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op if content_len > MAX_BODY_BYTES { format!("[body truncated: Content-Length {} exceeds 10MB limit]", content_len) } else { - let bytes = resp.bytes().await.map_err(|e| e.to_string())?; + let bytes = resp.bytes()?; let truncated = &bytes[..bytes.len().min(MAX_BODY_BYTES)]; String::from_utf8_lossy(truncated).into_owned() } }; - Ok((status, headers, body)) - }.await; + + Ok::<_, reqwest::Error>((status, headers, body, query_clone)) + }).await; let latency_ms = start.elapsed().as_millis() as u64; + let result = match blocking_result { + Err(e) => Err(e.to_string()), // spawn_blocking panic + Ok(Err(e)) => Err(e.to_string()), // reqwest error + Ok(Ok(v)) => Ok(v), + }; + match result { Err(e) => PingResult { monitor_id: monitor.id.clone(), @@ -178,11 +148,10 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op cert_expiry_days: None, meta: None, }, - Ok((status_raw, headers, body)) => { + Ok((status_raw, headers, body, query)) => { let status = status_raw.as_u16(); - // Only attempt cert check after a successful response — avoids opening - // a second TCP connection to a host that's already timing out. + // Only attempt cert check after a successful response let cert_expiry_days = if is_https { match tokio::time::timeout( std::time::Duration::from_secs(5), @@ -196,7 +165,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op }; // Evaluate query if present - let (up, query_error) = if let Some(q) = &monitor.query { + let (up, query_error) = if let Some(q) = &query { let response = Response { status, body: body.clone(),