//! pingql/apps/monitor/src/runner.rs — fetches due monitors from the
//! coordinator, runs HTTP checks, and posts results back for ingestion.

use crate::query::{self, Response};
use crate::types::{PingResult, Monitor};
use anyhow::Result;
use serde_json::json;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex;
use tracing::{debug, warn};
/// Fetch due monitors from coordinator, run them, post results back.
///
/// Returns the number of checks actually spawned; monitors whose check is
/// still in flight are skipped. Each check runs as a detached tokio task.
/// The in-flight marker is cleared only *after* the result is posted, so
/// the next poll cannot pick the same monitor up before its ping is
/// persisted.
pub async fn fetch_and_run(
    client: &reqwest::Client,
    coordinator_url: &str,
    token: &str,
    region: &str,
    in_flight: &Arc<Mutex<HashSet<String>>>,
) -> Result<usize> {
    // Fetch due monitors for this region (region filter omitted when empty).
    let url = if region.is_empty() {
        format!("{coordinator_url}/internal/due")
    } else {
        format!("{coordinator_url}/internal/due?region={}", region)
    };
    let monitors: Vec<Monitor> = client
        .get(&url)
        .header("x-monitor-token", token)
        .send()
        .await?
        .json()
        .await?;
    if monitors.is_empty() {
        return Ok(0);
    }
    // run_id is computed deterministically per monitor+interval bucket so all regions
    // checking within the same scheduled window share the same ID.
    // Format: first 8 chars of monitor_id + ':' + floor(scheduled_at_epoch / interval_s)
    // Spawn all checks — fire and forget, skip if already in-flight.
    let mut spawned = 0usize;
    for monitor in monitors {
        {
            let mut set = in_flight.lock().await;
            if set.contains(&monitor.id) {
                debug!("Skipping {} — check already in flight", monitor.id);
                continue;
            }
            set.insert(monitor.id.clone());
        }
        spawned += 1;
        let client = client.clone();
        let coordinator_url = coordinator_url.to_string();
        let token = token.to_string();
        let region_owned = region.to_string();
        // Derive run_id from the monitor's scheduled_at bucket, falling back
        // to "now" if scheduled_at is absent or unparsable.
        let run_id_owned = {
            let epoch = monitor.scheduled_at.as_deref()
                .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
                .map(|dt| dt.timestamp())
                .unwrap_or_else(|| chrono::Utc::now().timestamp());
            // .max(1) guards against interval_s == 0 (bad coordinator data),
            // which would otherwise panic with a divide-by-zero.
            let bucket = epoch / monitor.interval_s.max(1);
            // `get(..)` instead of a direct byte slice: slicing would panic if
            // the 8-byte cut landed inside a multi-byte UTF-8 character.
            // Falls back to the full id in that (unlikely) case.
            let prefix = monitor.id.get(..8.min(monitor.id.len())).unwrap_or(&monitor.id);
            format!("{}:{}", prefix, bucket)
        };
        let in_flight = in_flight.clone();
        tokio::spawn(async move {
            let timeout_ms = monitor.timeout_ms.unwrap_or(30000);
            // Hard deadline: timeout + 5s buffer, so hung checks always resolve.
            let deadline = std::time::Duration::from_millis(timeout_ms + 5000);
            let result = match tokio::time::timeout(deadline, run_check(&client, &monitor, monitor.scheduled_at.clone(), &region_owned, &run_id_owned)).await {
                Ok(r) => r,
                // Deadline hit: synthesize a "down" result so the coordinator
                // still receives a ping for this scheduled window.
                Err(_) => PingResult {
                    monitor_id: monitor.id.clone(),
                    scheduled_at: monitor.scheduled_at.clone(),
                    jitter_ms: None,
                    status_code: None,
                    latency_ms: Some(timeout_ms as u64),
                    up: false,
                    error: Some(format!("timed out after {}ms", timeout_ms)),
                    cert_expiry_days: None,
                    meta: None,
                    region: if region_owned.is_empty() { None } else { Some(region_owned.clone()) },
                    run_id: Some(run_id_owned.clone()),
                },
            };
            // Post result first, then clear in-flight — this prevents the next
            // poll from picking up the monitor again before the ping is persisted.
            if let Err(e) = post_result(&client, &coordinator_url, &token, result).await {
                warn!("Failed to post result for {}: {e}", monitor.id);
            }
            in_flight.lock().await.remove(&monitor.id);
        });
    }
    Ok(spawned)
}
/// Execute one monitor check and build the `PingResult` to be ingested.
///
/// The HTTP request itself runs on a dedicated OS thread via `ureq`
/// (see `run_check_blocking`); this async wrapper adds jitter measurement,
/// latency timing, optional certificate-expiry lookup, and PingQL query
/// evaluation. Never fails: every error path is folded into a "down"
/// `PingResult`.
///
/// NOTE: `_client` is unused here (the check deliberately bypasses reqwest,
/// see the comment below); the parameter is kept for call-site compatibility.
async fn run_check(_client: &reqwest::Client, monitor: &Monitor, scheduled_at: Option<String>, region: &str, run_id: &str) -> PingResult {
    // Compute jitter: how late we actually started vs when we were scheduled.
    let jitter_ms: Option<i64> = scheduled_at.as_deref().and_then(|s| {
        let scheduled = chrono::DateTime::parse_from_rfc3339(s).ok()?;
        let now = chrono::Utc::now();
        Some((now - scheduled.with_timezone(&chrono::Utc)).num_milliseconds())
    });
    let start = Instant::now();
    // Build request parameters: method, headers, body, timeout.
    let method = monitor.method.as_deref().unwrap_or("GET").to_uppercase();
    let timeout = std::time::Duration::from_millis(monitor.timeout_ms.unwrap_or(30000));
    let is_https = monitor.url.starts_with("https://");
    let url_for_cert = monitor.url.clone();
    // Run the check in a real OS thread using ureq (blocking, synchronous HTTP).
    // ureq sets SO_RCVTIMEO/SO_SNDTIMEO at the socket level, which reliably
    // interrupts even a hanging TLS handshake — unlike async reqwest which
    // cannot cancel syscall-level blocks via future cancellation.
    let url = monitor.url.clone();
    let req_headers = monitor.request_headers.clone();
    let req_body = monitor.request_body.clone();
    let method_clone = method.clone();
    let (tx, rx) = tokio::sync::oneshot::channel::<Result<(u16, HashMap<String, String>, String), String>>();
    std::thread::spawn(move || {
        // Receiver may already be gone if the outer timeout fired; ignore send errors.
        let _ = tx.send(run_check_blocking(&url, &method_clone, req_headers.as_ref(), req_body.as_deref(), timeout));
    });
    // Belt-and-braces timeout on the channel itself (+2s over the socket
    // timeout) in case the check thread wedges entirely.
    let curl_result = tokio::time::timeout(timeout + std::time::Duration::from_secs(2), rx)
        .await
        .map_err(|_| format!("timed out after {}ms", timeout.as_millis()))
        .and_then(|r| r.map_err(|_| "check thread dropped".to_string()))
        .unwrap_or_else(|e| Err(e));
    let latency_ms = start.elapsed().as_millis() as u64;
    match curl_result {
        Err(ref e) => {
            debug!("{} check error: {e}", monitor.url);
            PingResult {
                monitor_id: monitor.id.clone(),
                scheduled_at,
                jitter_ms,
                status_code: None,
                latency_ms: Some(latency_ms),
                up: false,
                error: Some(e.clone()),
                cert_expiry_days: None,
                meta: None,
                region: if region.is_empty() { None } else { Some(region.to_string()) },
                run_id: Some(run_id.to_string()),
            }
        },
        Ok((status_code, headers, body)) => {
            let status = status_code;
            // Only attempt cert check after a successful response; capped at
            // 5s so a slow handshake can't stall result delivery.
            let cert_expiry_days = if is_https {
                match tokio::time::timeout(
                    std::time::Duration::from_secs(5),
                    check_cert_expiry(&url_for_cert),
                ).await {
                    Ok(Ok(days)) => days,
                    _ => None,
                }
            } else {
                None
            };
            // Evaluate the monitor's PingQL query if present; otherwise fall
            // back to status-based up/down.
            let (up, query_error) = if let Some(q) = &monitor.query {
                let response = Response {
                    status,
                    body: body.clone(),
                    headers: headers.clone(),
                    latency_ms: Some(latency_ms),
                    cert_expiry_days,
                };
                match query::evaluate(q, &response) {
                    Ok(result) => (result, None),
                    Err(e) => {
                        warn!("Query error for {}: {e}", monitor.id);
                        // Fall back to status-based up/down, but surface the
                        // query error alongside the result.
                        (status < 400, Some(e.to_string()))
                    }
                }
            } else {
                // Default: up if 2xx/3xx.
                (status < 400, None)
            };
            // Clamp the preview cut to a UTF-8 char boundary: a raw
            // `&body[..500]` panics if byte 500 falls inside a multi-byte
            // character (e.g. non-ASCII response bodies).
            let mut preview_end = body.len().min(500);
            while !body.is_char_boundary(preview_end) {
                preview_end -= 1;
            }
            let meta = json!({
                "headers": headers,
                "body_preview": &body[..preview_end],
            });
            debug!("{} → {status} {latency_ms}ms up={up}", monitor.url);
            PingResult {
                monitor_id: monitor.id.clone(),
                scheduled_at,
                jitter_ms,
                status_code: Some(status),
                latency_ms: Some(latency_ms),
                up,
                error: query_error,
                cert_expiry_days,
                meta: Some(meta),
                region: if region.is_empty() { None } else { Some(region.to_string()) },
                run_id: Some(run_id.to_string()),
            }
        }
    }
}
/// Run an HTTP check synchronously using ureq.
/// ureq applies timeouts at the socket/IO level (not just future cancellation),
/// which reliably interrupts hanging TLS handshakes.
/// Must be called from a std::thread (not async context).
///
/// Returns `(status_code, lowercased response headers, body text)` on any
/// completed HTTP exchange (non-2xx statuses included — see
/// `http_status_as_error(false)` below), or a human-readable error string.
fn run_check_blocking(
    url: &str,
    method: &str,
    headers: Option<&HashMap<String, String>>,
    body: Option<&str>,
    timeout: std::time::Duration,
) -> Result<(u16, HashMap<String, String>, String), String> {
    // Load system CA certs so we can verify chains from Cloudflare and other
    // CAs not included in the bundled webpki-roots.
    // NOTE(review): load errors are silently dropped here — a host with a
    // broken cert store would fail every HTTPS check; verify that's acceptable.
    let root_certs: Vec<ureq::tls::Certificate<'static>> =
        rustls_native_certs::load_native_certs()
            .certs
            .into_iter()
            .map(|c| ureq::tls::Certificate::from_der(c.as_ref()).to_owned())
            .collect();
    let tls = ureq::tls::TlsConfig::builder()
        .root_certs(ureq::tls::RootCerts::Specific(Arc::new(root_certs)))
        .build();
    // Fresh agent per check: both the overall and connect timeouts are the
    // caller-supplied socket timeout, and HTTP error statuses (4xx/5xx) are
    // returned as responses rather than Err so the query layer can inspect them.
    let agent = ureq::Agent::config_builder()
        .timeout_global(Some(timeout))
        .timeout_connect(Some(timeout))
        .http_status_as_error(false)
        .user_agent("Mozilla/5.0 (compatible; PingQL/1.0; +https://pingql.com)")
        .tls_config(tls)
        .build()
        .new_agent();
    // Build the request: caller-supplied method/URL plus any custom headers.
    let mut builder = ureq::http::Request::builder()
        .method(method)
        .uri(url);
    if let Some(hdrs) = headers {
        for (k, v) in hdrs {
            builder = builder.header(k.as_str(), v.as_str());
        }
    }
    // Attach the body if present; an invalid method/URI surfaces here as Err.
    let result = match body {
        Some(b) => {
            let req = builder.body(b.as_bytes()).map_err(|e| e.to_string())?;
            agent.run(req)
        }
        None => {
            let req = builder.body(()).map_err(|e| e.to_string())?;
            agent.run(req)
        }
    };
    let mut resp = match result {
        Err(e) => {
            // Normalize the assorted timeout phrasings ureq/io can produce
            // into one consistent, user-facing message.
            let msg = e.to_string();
            let friendly = if msg.contains("timed out") || msg.contains("timeout") || msg.contains("TimedOut") {
                format!("timed out after {}ms", timeout.as_millis())
            } else {
                msg
            };
            return Err(friendly);
        }
        Ok(r) => r,
    };
    let status = resp.status().as_u16();
    // Lowercase header names so downstream query lookups are case-insensitive.
    let mut resp_headers = HashMap::new();
    for (name, value) in resp.headers() {
        resp_headers.insert(name.as_str().to_lowercase(), value.to_str().unwrap_or("").to_string());
    }
    const MAX_BODY: usize = 10 * 1024 * 1024;
    // Non-UTF-8 or unreadable bodies become "" rather than an error.
    // NOTE(review): the full body is read into memory before the size check,
    // and oversized bodies are *replaced* by a placeholder, not truncated —
    // confirm both behaviors are intended.
    let body_str = resp.body_mut().read_to_string().unwrap_or_default();
    let body_out = if body_str.len() > MAX_BODY {
        format!("[body truncated: {} bytes]", body_str.len())
    } else {
        body_str
    };
    Ok((status, resp_headers, body_out))
}
/// Check SSL certificate expiry for a given HTTPS URL.
/// Returns the number of days until the certificate expires.
async fn check_cert_expiry(url: &str) -> Result<Option<i64>> {
use rustls::ClientConfig;
use rustls::pki_types::ServerName;
use tokio::net::TcpStream;
use tokio_rustls::TlsConnector;
use x509_parser::prelude::*;
// Parse host and port from URL
let url_parsed = reqwest::Url::parse(url)?;
let host = url_parsed.host_str().unwrap_or("");
let port = url_parsed.port().unwrap_or(443);
// Build a rustls config that captures certificates
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let config = ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let connector = TlsConnector::from(Arc::new(config));
let server_name = ServerName::try_from(host.to_string())?;
let stream = TcpStream::connect(format!("{host}:{port}")).await?;
let tls_stream = connector.connect(server_name, stream).await?;
// Get peer certificates
let (_, conn) = tls_stream.get_ref();
let certs = conn.peer_certificates().unwrap_or(&[]);
if let Some(cert_der) = certs.first() {
let (_, cert) = X509Certificate::from_der(cert_der.as_ref())?;
let not_after = cert.validity().not_after.timestamp();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
let days = (not_after - now) / 86400;
return Ok(Some(days));
}
Ok(None)
}
/// POST a finished check result to the coordinator's ingest endpoint.
/// Bounded by a 10-second overall timeout; any non-success HTTP status is
/// surfaced as an error carrying the response body.
async fn post_result(
    client: &reqwest::Client,
    coordinator_url: &str,
    token: &str,
    result: PingResult,
) -> Result<()> {
    let request = client
        .post(format!("{coordinator_url}/internal/ingest"))
        .header("x-monitor-token", token)
        .json(&result)
        .send();
    // Outer timeout covers connect + send + first response bytes.
    let response = match tokio::time::timeout(std::time::Duration::from_secs(10), request).await {
        Err(_) => return Err(anyhow::anyhow!("post_result timed out")),
        Ok(sent) => sent.map_err(|e| anyhow::anyhow!("{e}"))?,
    };
    if response.status().is_success() {
        Ok(())
    } else {
        let status = response.status();
        let body = response.text().await.unwrap_or_default();
        Err(anyhow::anyhow!("ingest returned {status}: {body}"))
    }
}