diff --git a/apps/monitor/src/main.rs b/apps/monitor/src/main.rs index 0f621fb..11e9198 100644 --- a/apps/monitor/src/main.rs +++ b/apps/monitor/src/main.rs @@ -3,8 +3,11 @@ mod runner; mod types; use anyhow::Result; +use std::collections::HashSet; use std::env; +use std::sync::Arc; use rustls; +use tokio::sync::Mutex; use tokio::time::{sleep, Duration}; use tracing::{error, info}; @@ -27,13 +30,14 @@ async fn main() -> Result<()> { info!("PingQL monitor starting, coordinator: {coordinator_url}"); let client = reqwest::Client::builder() - .timeout(Duration::from_secs(30)) .user_agent("PingQL-Monitor/0.1") .build()?; + let in_flight: Arc>> = Arc::new(Mutex::new(HashSet::new())); + loop { - match runner::fetch_and_run(&client, &coordinator_url, &monitor_token).await { - Ok(n) => info!("Ran {n} checks"), + match runner::fetch_and_run(&client, &coordinator_url, &monitor_token, &in_flight).await { + Ok(n) => { if n > 0 { info!("Spawned {n} checks"); } }, Err(e) => error!("Check cycle failed: {e}"), } sleep(Duration::from_secs(10)).await; diff --git a/apps/monitor/src/runner.rs b/apps/monitor/src/runner.rs index ab50826..4befb41 100644 --- a/apps/monitor/src/runner.rs +++ b/apps/monitor/src/runner.rs @@ -2,9 +2,10 @@ use crate::query::{self, Response}; use crate::types::{PingResult, Monitor}; use anyhow::Result; use serde_json::json; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Instant; +use tokio::sync::Mutex; use tracing::{debug, warn}; /// Fetch due monitors from coordinator, run them, post results back. @@ -12,6 +13,7 @@ pub async fn fetch_and_run( client: &reqwest::Client, coordinator_url: &str, token: &str, + in_flight: &Arc>>, ) -> Result { // Fetch due monitors let monitors: Vec = client @@ -25,20 +27,33 @@ pub async fn fetch_and_run( let n = monitors.len(); if n == 0 { return Ok(0); } - // Spawn all checks — fire and forget, don't block the cycle + // Spawn all checks — fire and forget, skip if already in-flight + let mut spawned = 0usize; for monitor in monitors { + { + let mut set = in_flight.lock().await; + if set.contains(&monitor.id) { + debug!("Skipping {} — check already in flight", monitor.id); + continue; + } + set.insert(monitor.id.clone()); + } + spawned += 1; let client = client.clone(); let coordinator_url = coordinator_url.to_string(); let token = token.to_string(); + let in_flight = in_flight.clone(); tokio::spawn(async move { let result = run_check(&client, &monitor).await; + // Remove from in-flight before posting so a fast next cycle can pick it up + in_flight.lock().await.remove(&monitor.id); if let Err(e) = post_result(&client, &coordinator_url, &token, result).await { warn!("Failed to post result for {}: {e}", monitor.id); } }); } - Ok(n) + Ok(spawned) } async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult {