fix: skip in-flight monitors to prevent stacked slow requests

This commit is contained in:
M1 2026-03-16 15:41:34 +04:00
parent 9b970a90e0
commit 6c539d9066
2 changed files with 25 additions and 6 deletions

View File

@ -3,8 +3,11 @@ mod runner;
mod types; mod types;
use anyhow::Result; use anyhow::Result;
use std::collections::HashSet;
use std::env; use std::env;
use std::sync::Arc;
use rustls; use rustls;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use tracing::{error, info}; use tracing::{error, info};
@ -27,13 +30,14 @@ async fn main() -> Result<()> {
info!("PingQL monitor starting, coordinator: {coordinator_url}"); info!("PingQL monitor starting, coordinator: {coordinator_url}");
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.user_agent("PingQL-Monitor/0.1") .user_agent("PingQL-Monitor/0.1")
.build()?; .build()?;
let in_flight: Arc<Mutex<HashSet<String>>> = Arc::new(Mutex::new(HashSet::new()));
loop { loop {
match runner::fetch_and_run(&client, &coordinator_url, &monitor_token).await { match runner::fetch_and_run(&client, &coordinator_url, &monitor_token, &in_flight).await {
Ok(n) => info!("Ran {n} checks"), Ok(n) => { if n > 0 { info!("Spawned {n} checks"); } },
Err(e) => error!("Check cycle failed: {e}"), Err(e) => error!("Check cycle failed: {e}"),
} }
sleep(Duration::from_secs(10)).await; sleep(Duration::from_secs(10)).await;

View File

@ -2,9 +2,10 @@ use crate::query::{self, Response};
use crate::types::{PingResult, Monitor}; use crate::types::{PingResult, Monitor};
use anyhow::Result; use anyhow::Result;
use serde_json::json; use serde_json::json;
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio::sync::Mutex;
use tracing::{debug, warn}; use tracing::{debug, warn};
/// Fetch due monitors from coordinator, run them, post results back. /// Fetch due monitors from coordinator, run them, post results back.
@ -12,6 +13,7 @@ pub async fn fetch_and_run(
client: &reqwest::Client, client: &reqwest::Client,
coordinator_url: &str, coordinator_url: &str,
token: &str, token: &str,
in_flight: &Arc<Mutex<HashSet<String>>>,
) -> Result<usize> { ) -> Result<usize> {
// Fetch due monitors // Fetch due monitors
let monitors: Vec<Monitor> = client let monitors: Vec<Monitor> = client
@ -25,20 +27,33 @@ pub async fn fetch_and_run(
let n = monitors.len(); let n = monitors.len();
if n == 0 { return Ok(0); } if n == 0 { return Ok(0); }
// Spawn all checks — fire and forget, don't block the cycle // Spawn all checks — fire and forget, skip if already in-flight
let mut spawned = 0usize;
for monitor in monitors { for monitor in monitors {
{
let mut set = in_flight.lock().await;
if set.contains(&monitor.id) {
debug!("Skipping {} — check already in flight", monitor.id);
continue;
}
set.insert(monitor.id.clone());
}
spawned += 1;
let client = client.clone(); let client = client.clone();
let coordinator_url = coordinator_url.to_string(); let coordinator_url = coordinator_url.to_string();
let token = token.to_string(); let token = token.to_string();
let in_flight = in_flight.clone();
tokio::spawn(async move { tokio::spawn(async move {
let result = run_check(&client, &monitor).await; let result = run_check(&client, &monitor).await;
// Remove from in-flight before posting so a fast next cycle can pick it up
in_flight.lock().await.remove(&monitor.id);
if let Err(e) = post_result(&client, &coordinator_url, &token, result).await { if let Err(e) = post_result(&client, &coordinator_url, &token, result).await {
warn!("Failed to post result for {}: {e}", monitor.id); warn!("Failed to post result for {}: {e}", monitor.id);
} }
}); });
} }
Ok(n) Ok(spawned)
} }
async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult { async fn run_check(client: &reqwest::Client, monitor: &Monitor) -> PingResult {