fix: use blocking reqwest in spawn_blocking for reliable OS-level timeout
This commit is contained in:
parent
4035a3b215
commit
c68700da46
|
|
@ -5,7 +5,7 @@ edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
reqwest = { version = "0.12", features = ["json", "native-tls"], default-features = false }
|
reqwest = { version = "0.12", features = ["json", "native-tls", "blocking"], default-features = false }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
scraper = "0.21" # CSS selector / HTML parsing
|
scraper = "0.21" # CSS selector / HTML parsing
|
||||||
|
|
|
||||||
|
|
@ -69,63 +69,30 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
|
||||||
// Build request with method, headers, body, timeout
|
// Build request with method, headers, body, timeout
|
||||||
let method = monitor.method.as_deref().unwrap_or("GET").to_uppercase();
|
let method = monitor.method.as_deref().unwrap_or("GET").to_uppercase();
|
||||||
let timeout = std::time::Duration::from_millis(monitor.timeout_ms.unwrap_or(30000));
|
let timeout = std::time::Duration::from_millis(monitor.timeout_ms.unwrap_or(30000));
|
||||||
|
let is_https = monitor.url.starts_with("https://");
|
||||||
|
let url_for_cert = monitor.url.clone();
|
||||||
|
|
||||||
// Pre-flight TCP connect check with a hard OS-level timeout.
|
// Use blocking reqwest in a thread so OS-level socket timeouts actually work.
|
||||||
// This catches hosts where the SYN packet hangs indefinitely —
|
// Async reqwest with rustls/native-tls does not reliably cancel on TLS hangs.
|
||||||
// reqwest/hyper with rustls cannot be cancelled via tokio future drop alone.
|
let url = monitor.url.clone();
|
||||||
let url_parsed = reqwest::Url::parse(&monitor.url).ok();
|
let method_str = method.clone();
|
||||||
if let Some(ref u) = url_parsed {
|
let req_headers = monitor.request_headers.clone();
|
||||||
let host = u.host_str().unwrap_or("");
|
let req_body = monitor.request_body.clone();
|
||||||
let port = u.port_or_known_default().unwrap_or(443);
|
let query_clone = monitor.query.clone();
|
||||||
let addr = format!("{host}:{port}");
|
|
||||||
// Resolve DNS first
|
|
||||||
let addrs: Vec<_> = match tokio::net::lookup_host(&addr).await {
|
|
||||||
Ok(a) => a.collect(),
|
|
||||||
Err(e) => {
|
|
||||||
return PingResult {
|
|
||||||
monitor_id: monitor.id.clone(),
|
|
||||||
scheduled_at,
|
|
||||||
jitter_ms,
|
|
||||||
status_code: None,
|
|
||||||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
|
||||||
up: false,
|
|
||||||
error: Some(format!("DNS error: {e}")),
|
|
||||||
cert_expiry_days: None,
|
|
||||||
meta: None,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// Try TCP connect with hard timeout
|
|
||||||
let tcp_result = tokio::time::timeout(
|
|
||||||
timeout,
|
|
||||||
tokio::net::TcpStream::connect(addrs.as_slice()),
|
|
||||||
).await;
|
|
||||||
if let Err(_) | Ok(Err(_)) = tcp_result {
|
|
||||||
let err = match tcp_result {
|
|
||||||
Err(_) => format!("timed out after {}ms", timeout.as_millis()),
|
|
||||||
Ok(Err(e)) => e.to_string(),
|
|
||||||
_ => unreachable!(),
|
|
||||||
};
|
|
||||||
return PingResult {
|
|
||||||
monitor_id: monitor.id.clone(),
|
|
||||||
scheduled_at,
|
|
||||||
jitter_ms,
|
|
||||||
status_code: None,
|
|
||||||
latency_ms: Some(start.elapsed().as_millis() as u64),
|
|
||||||
up: false,
|
|
||||||
error: Some(err),
|
|
||||||
cert_expiry_days: None,
|
|
||||||
meta: None,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let req_method = reqwest::Method::from_bytes(method.as_bytes())
|
let blocking_result = tokio::task::spawn_blocking(move || {
|
||||||
|
let block_client = reqwest::blocking::Client::builder()
|
||||||
|
.user_agent("PingQL-Monitor/0.1")
|
||||||
|
.connect_timeout(timeout)
|
||||||
|
.timeout(timeout)
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
let req_method = reqwest::Method::from_bytes(method_str.as_bytes())
|
||||||
.unwrap_or(reqwest::Method::GET);
|
.unwrap_or(reqwest::Method::GET);
|
||||||
|
|
||||||
let mut req = client.request(req_method, &monitor.url).timeout(timeout);
|
let mut req = block_client.request(req_method, &url);
|
||||||
|
|
||||||
if let Some(headers) = &monitor.request_headers {
|
if let Some(ref headers) = req_headers {
|
||||||
for (k, v) in headers {
|
for (k, v) in headers {
|
||||||
if let (Ok(name), Ok(value)) = (
|
if let (Ok(name), Ok(value)) = (
|
||||||
reqwest::header::HeaderName::from_bytes(k.as_bytes()),
|
reqwest::header::HeaderName::from_bytes(k.as_bytes()),
|
||||||
|
|
@ -136,15 +103,11 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(body) = &monitor.request_body {
|
if let Some(ref body) = req_body {
|
||||||
req = req.body(body.clone());
|
req = req.body(body.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let is_https = monitor.url.starts_with("https://");
|
let resp = req.send()?;
|
||||||
let url_for_cert = monitor.url.clone();
|
|
||||||
|
|
||||||
let result: Result<_, String> = async {
|
|
||||||
let resp = req.send().await.map_err(|e| e.to_string())?;
|
|
||||||
let status = resp.status();
|
let status = resp.status();
|
||||||
let headers: HashMap<String, String> = resp.headers().iter()
|
let headers: HashMap<String, String> = resp.headers().iter()
|
||||||
.filter_map(|(k, v)| Some((k.to_string(), v.to_str().ok()?.to_string())))
|
.filter_map(|(k, v)| Some((k.to_string(), v.to_str().ok()?.to_string())))
|
||||||
|
|
@ -156,16 +119,23 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
|
||||||
if content_len > MAX_BODY_BYTES {
|
if content_len > MAX_BODY_BYTES {
|
||||||
format!("[body truncated: Content-Length {} exceeds 10MB limit]", content_len)
|
format!("[body truncated: Content-Length {} exceeds 10MB limit]", content_len)
|
||||||
} else {
|
} else {
|
||||||
let bytes = resp.bytes().await.map_err(|e| e.to_string())?;
|
let bytes = resp.bytes()?;
|
||||||
let truncated = &bytes[..bytes.len().min(MAX_BODY_BYTES)];
|
let truncated = &bytes[..bytes.len().min(MAX_BODY_BYTES)];
|
||||||
String::from_utf8_lossy(truncated).into_owned()
|
String::from_utf8_lossy(truncated).into_owned()
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Ok((status, headers, body))
|
|
||||||
}.await;
|
Ok::<_, reqwest::Error>((status, headers, body, query_clone))
|
||||||
|
}).await;
|
||||||
|
|
||||||
let latency_ms = start.elapsed().as_millis() as u64;
|
let latency_ms = start.elapsed().as_millis() as u64;
|
||||||
|
|
||||||
|
let result = match blocking_result {
|
||||||
|
Err(e) => Err(e.to_string()), // spawn_blocking panic
|
||||||
|
Ok(Err(e)) => Err(e.to_string()), // reqwest error
|
||||||
|
Ok(Ok(v)) => Ok(v),
|
||||||
|
};
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Err(e) => PingResult {
|
Err(e) => PingResult {
|
||||||
monitor_id: monitor.id.clone(),
|
monitor_id: monitor.id.clone(),
|
||||||
|
|
@ -178,11 +148,10 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
|
||||||
cert_expiry_days: None,
|
cert_expiry_days: None,
|
||||||
meta: None,
|
meta: None,
|
||||||
},
|
},
|
||||||
Ok((status_raw, headers, body)) => {
|
Ok((status_raw, headers, body, query)) => {
|
||||||
let status = status_raw.as_u16();
|
let status = status_raw.as_u16();
|
||||||
|
|
||||||
// Only attempt cert check after a successful response — avoids opening
|
// Only attempt cert check after a successful response
|
||||||
// a second TCP connection to a host that's already timing out.
|
|
||||||
let cert_expiry_days = if is_https {
|
let cert_expiry_days = if is_https {
|
||||||
match tokio::time::timeout(
|
match tokio::time::timeout(
|
||||||
std::time::Duration::from_secs(5),
|
std::time::Duration::from_secs(5),
|
||||||
|
|
@ -196,7 +165,7 @@ async fn run_check(client: &reqwest::Client, monitor: &Monitor, scheduled_at: Op
|
||||||
};
|
};
|
||||||
|
|
||||||
// Evaluate query if present
|
// Evaluate query if present
|
||||||
let (up, query_error) = if let Some(q) = &monitor.query {
|
let (up, query_error) = if let Some(q) = &query {
|
||||||
let response = Response {
|
let response = Response {
|
||||||
status,
|
status,
|
||||||
body: body.clone(),
|
body: body.clone(),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue