fix: replace curl subprocess with ureq blocking client in std::thread

This commit is contained in:
M1 2026-03-18 13:40:28 +04:00
parent 6b8e1fc9d9
commit 3fa624eff8
2 changed files with 65 additions and 89 deletions

View File

@@ -5,7 +5,8 @@ edition = "2021"
[dependencies] [dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.12", features = ["json", "native-tls", "blocking"], default-features = false } reqwest = { version = "0.12", features = ["json", "native-tls"], default-features = false }
ureq = { version = "3", features = ["native-tls"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
scraper = "0.21" # CSS selector / HTML parsing scraper = "0.21" # CSS selector / HTML parsing

View File

@@ -88,16 +88,25 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
let is_https = monitor.url.starts_with("https://"); let is_https = monitor.url.starts_with("https://");
let url_for_cert = monitor.url.clone(); let url_for_cert = monitor.url.clone();
// Use curl subprocess — the only reliable way to enforce a hard timeout // Run the check in a real OS thread using ureq (blocking, synchronous HTTP).
// including TLS handshake on hosts that accept TCP but stall at TLS. // ureq sets SO_RCVTIMEO/SO_SNDTIMEO at the socket level, which reliably
let timeout_secs = (timeout.as_millis() as f64 / 1000.0).max(1.0); // interrupts even a hanging TLS handshake — unlike async reqwest which
let curl_result = run_curl( // cannot cancel syscall-level blocks via future cancellation.
&monitor.url, let url = monitor.url.clone();
&method, let req_headers = monitor.request_headers.clone();
monitor.request_headers.as_ref(), let req_body = monitor.request_body.clone();
monitor.request_body.as_deref(), let method_clone = method.clone();
timeout_secs,
).await; let (tx, rx) = tokio::sync::oneshot::channel::<Result<(u16, HashMap<String, String>, String), String>>();
std::thread::spawn(move || {
let _ = tx.send(run_check_blocking(&url, &method_clone, req_headers.as_ref(), req_body.as_deref(), timeout));
});
let curl_result = tokio::time::timeout(timeout + std::time::Duration::from_secs(2), rx)
.await
.map_err(|_| format!("timed out after {}ms", timeout.as_millis()))
.and_then(|r| r.map_err(|_| "check thread dropped".to_string()))
.unwrap_or_else(|e| Err(e));
let latency_ms = start.elapsed().as_millis() as u64; let latency_ms = start.elapsed().as_millis() as u64;
@@ -180,109 +189,75 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
} }
} }
/// Run an HTTP check via curl subprocess writing output to a temp file. /// Run an HTTP check synchronously using ureq.
/// Using a temp file instead of a pipe avoids async read_to_end hanging /// ureq applies timeouts at the socket/IO level (not just future cancellation),
/// when curl exits without closing its inherited stdout properly. /// which reliably interrupts hanging TLS handshakes.
async fn run_curl( /// Must be called from a std::thread (not async context).
fn run_check_blocking(
url: &str, url: &str,
method: &str, method: &str,
headers: Option<&HashMap<String, String>>, headers: Option<&HashMap<String, String>>,
body: Option<&str>, body: Option<&str>,
timeout_secs: f64, timeout: std::time::Duration,
) -> Result<(u16, HashMap<String, String>, String), String> { ) -> Result<(u16, HashMap<String, String>, String), String> {
use std::process::Stdio; let agent = ureq::Agent::config_builder()
.timeout_global(Some(timeout))
.timeout_connect(Some(timeout))
.http_status_as_error(false) // handle 4xx/5xx as Ok so we can evaluate queries
.user_agent("PingQL-Monitor/0.1")
.build()
.new_agent();
// Write output to a temp file so we just wait for the process to exit let mut builder = ureq::http::Request::builder()
let tmp = format!("/tmp/pingql-curl-{}-{}.txt", std::process::id(), .method(method)
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap_or_default().subsec_nanos()); .uri(url);
let mut args: Vec<String> = vec![
"--silent".into(),
"--show-error".into(),
"--include".into(),
"--max-time".into(), format!("{:.1}", timeout_secs),
"--connect-timeout".into(), format!("{:.1}", timeout_secs),
"-X".into(), method.to_string(),
"--user-agent".into(), "PingQL-Monitor/0.1".into(),
"--location".into(),
"--output".into(), tmp.clone(),
];
if let Some(hdrs) = headers { if let Some(hdrs) = headers {
for (k, v) in hdrs { for (k, v) in hdrs {
args.push("-H".into()); builder = builder.header(k.as_str(), v.as_str());
args.push(format!("{k}: {v}"));
} }
} }
if let Some(b) = body {
args.push("--data-raw".into()); let result = match body {
args.push(b.to_string()); Some(b) => {
let req = builder.body(b.as_bytes()).map_err(|e| e.to_string())?;
agent.run(req)
} }
args.push(url.to_string()); None => {
let req = builder.body(()).map_err(|e| e.to_string())?;
// Run curl in a real OS thread, signal completion via tokio oneshot. agent.run(req)
// std::thread detaches completely from tokio's runtime, so a hung curl
// process can't block other monitors. The tokio::time::timeout on the
// oneshot receiver gives us a hard async deadline.
let args_owned = args.clone();
let tmp_owned = tmp.clone();
let (tx, rx) = tokio::sync::oneshot::channel::<(std::io::Result<std::process::ExitStatus>, Vec<u8>)>();
std::thread::spawn(move || {
let status = std::process::Command::new("curl")
.args(&args_owned)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.status();
let output = std::fs::read(&tmp_owned).unwrap_or_default();
let _ = std::fs::remove_file(&tmp_owned);
let _ = tx.send((status, output));
});
let (status_result, raw_bytes) = tokio::time::timeout(
std::time::Duration::from_secs_f64(timeout_secs + 2.0),
rx
).await
.map_err(|_| format!("timed out after {:.0}s", timeout_secs))?
.map_err(|_| "curl thread dropped".to_string())?;
let exit_code = status_result.ok().and_then(|s| s.code()).unwrap_or(-1);
if exit_code != 0 {
return Err(match exit_code {
28 => format!("timed out after {:.0}s", timeout_secs),
6 => "DNS lookup failed".to_string(),
7 => "connection refused".to_string(),
_ => format!("curl error (exit {})", exit_code),
});
} }
};
let raw = String::from_utf8_lossy(&raw_bytes).to_string(); let mut resp = match result {
let mut parts = raw.splitn(2, "\r\n\r\n"); Err(e) => {
let header_block = parts.next().unwrap_or(""); let msg = e.to_string();
let body_str = parts.next().unwrap_or("").to_string(); let friendly = if msg.contains("timed out") || msg.contains("timeout") || msg.contains("TimedOut") {
format!("timed out after {}ms", timeout.as_millis())
} else {
msg
};
return Err(friendly);
}
Ok(r) => r,
};
let mut lines = header_block.lines(); let status = resp.status().as_u16();
let status_line = lines.next().unwrap_or("HTTP/1.1 0");
let status_code: u16 = status_line.split_whitespace().nth(1)
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let mut resp_headers = HashMap::new(); let mut resp_headers = HashMap::new();
for line in lines { for (name, value) in resp.headers() {
if let Some((k, v)) = line.split_once(':') { resp_headers.insert(name.as_str().to_lowercase(), value.to_str().unwrap_or("").to_string());
resp_headers.insert(k.trim().to_lowercase(), v.trim().to_string());
}
} }
const MAX_BODY: usize = 10 * 1024 * 1024; const MAX_BODY: usize = 10 * 1024 * 1024;
let body_str = resp.body_mut().read_to_string().unwrap_or_default();
let body_out = if body_str.len() > MAX_BODY { let body_out = if body_str.len() > MAX_BODY {
format!("[body truncated: {} bytes]", body_str.len()) format!("[body truncated: {} bytes]", body_str.len())
} else { } else {
body_str body_str
}; };
Ok((status_code, resp_headers, body_out)) Ok((status, resp_headers, body_out))
} }
/// Check SSL certificate expiry for a given HTTPS URL. /// Check SSL certificate expiry for a given HTTPS URL.