//! Unified observability aggregator for in-cluster deployment. //! //! Collects metrics from all app pods via Prometheus scrape, forwards traces //! to OTLP endpoint, and streams logs from all pods to Loki-compatible backend. //! //! Usage: //! METRICS_AGGREGATOR_PORT=9090 \ //! OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 \ //! LOKI_URL=http://loki:3100/loki/api/v1/push \ //! SCRAPE_INTERVAL_SECS=15 \ //! SCRAPE_TARGETS_FILE=/etc/metrics/targets.json \ //! metrics-aggregator mod args; mod hotreload; mod k8s_discovery; mod loki; mod metrics; mod otel; mod scrape; mod stats_store; mod target; use serde::Deserialize; use std::collections::HashMap; use std::fmt::Write as _; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use actix_web::{HttpResponse, HttpServer, web}; use clap::Parser; use loki::{LokiEntry, LokiForwarder}; use metrics::AggMetrics; use observability::{init_tracing_subscriber, install_recorder, instance_id}; use otel::OtelGuard; use scrape::{HttpClient, ScrapeResult}; use stats_store::StatsStore; use target::ScrapeTarget; use tokio::io::AsyncBufReadExt; use tokio::sync::{RwLock, broadcast}; use tokio::time::interval; type MetricsStore = Arc>>>; // StatsStore is defined in stats_store.rs — per-app aggregated data. #[actix_web::main] async fn main() -> std::io::Result<()> { let args = args::Args::parse(); init_tracing_subscriber(&args.log_level, false); let instance = instance_id(); tracing::info!( instance = %instance, port = args.port, scrape_interval = args.scrape_interval_secs, "metrics-aggregator starting" ); let prometheus_handle = install_recorder(); metrics::init(); let metrics = AggMetrics::new(); let store: MetricsStore = Arc::new(RwLock::new(HashMap::new())); let stats_store: StatsStore = Arc::new(RwLock::new(HashMap::new())); let targets: Arc>> = Arc::new(RwLock::new(Vec::new())); let http = HttpClient::new(10); let otel_guard = init_otel_from_args(&args); let loki = init_loki_from_args(&args); let (shutdown_tx, _) = broadcast::channel::<()>(4); // Background task: evict push entries older than 5 minutes. let stats_store_for_evict = stats_store.clone(); let mut evict_shutdown = shutdown_tx.subscribe(); tokio::spawn(async move { let mut ticker = interval(Duration::from_secs(30)); loop { tokio::select! { _ = evict_shutdown.recv() => break, _ = ticker.tick() => { let cutoff = chrono::Utc::now().timestamp() - 300; let mut guard = stats_store_for_evict.write().await; guard.retain(|_, entry| entry.last_seen >= cutoff); } } } }); if let Some(path) = &args.targets_file { match target::load_targets_from_file(path).await { Ok(initial_targets) => { let mut guard = targets.write().await; *guard = initial_targets; tracing::info!(count = guard.len(), "loaded initial targets from file"); } Err(e) => { tracing::warn!(error = %e, "failed to load targets file"); } } let tw = hotreload::watch_targets_file(path.clone(), targets.clone(), shutdown_tx.subscribe()); tokio::spawn(tw); } else if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() { if let Some(k8s_targets) = k8s_discovery::k8s_pod_discovery().await { let mut guard = targets.write().await; *guard = k8s_targets.clone(); tracing::info!(count = guard.len(), "discovered K8s pods as targets"); } } let scrape_filter = args .scrape_apps .as_ref() .map(|s| s.split(',').map(|p| p.trim().to_string()).collect()); let scrape_targets = targets.clone(); let scrape_store = store.clone(); let scrape_metrics = metrics.clone(); let scrape_http = http.clone(); let loki_clone = loki.clone(); let shutdown_tx_clone = shutdown_tx.clone(); let scrape_interval = args.scrape_interval_secs; let scrape_filter_clone = scrape_filter.clone(); tokio::task::spawn_local(async move { scrape_loop( scrape_targets, scrape_store, scrape_metrics, scrape_http, scrape_interval, scrape_filter_clone, loki_clone, shutdown_tx_clone.subscribe(), ) .await; }); let log_shutdown = shutdown_tx.subscribe(); let log_loki = loki.clone(); tokio::task::spawn_local(async move { log_collector(log_loki, log_shutdown).await; }); let bind_addr: SocketAddr = ([0, 0, 0, 0], args.port).into(); tracing::info!(addr = %bind_addr, "HTTP server starting"); let app_targets = targets.clone(); let app_store = store.clone(); let app_handle = prometheus_handle.clone(); let loki_for_push: Option> = loki.map(Arc::new); let app_stats = stats_store.clone(); let server = HttpServer::new(move || { let targets = app_targets.clone(); let store = app_store.clone(); let handle = app_handle.clone(); let stats_store = app_stats.clone(); let loki_for_push: Option> = loki_for_push.clone(); actix_web::App::new() .app_data(web::Data::new(targets)) .app_data(web::Data::new(store)) .app_data(web::Data::new(handle)) .app_data(web::Data::new(stats_store)) .app_data(web::Data::new(loki_for_push)) .route("/metrics", web::get().to(handle_metrics)) .route("/api/v1/metrics", web::get().to(handle_metrics)) .route("/api/v1/push", web::post().to(handle_push)) .route("/api/v1/dashboard", web::get().to(handle_dashboard)) .route("/api/v1/stats", web::get().to(handle_stats)) .route("/health", web::get().to(handle_health)) .route("/api/v1/health", web::get().to(handle_health)) .route("/api/v1/targets", web::get().to(handle_targets)) }) .bind(&bind_addr)? .run(); let server_handle = server.handle(); tokio::spawn(server); tokio::signal::ctrl_c().await.ok(); tracing::info!("received Ctrl+C, shutting down"); let _ = shutdown_tx.send(()); server_handle.stop(true).await; if let Some(guard) = otel_guard { guard.shutdown().await; } tracing::info!("metrics-aggregator stopped"); Ok(()) } fn init_otel_from_args(args: &args::Args) -> Option { if args.no_otel { return None; } let endpoint = args .otel_endpoint .clone() .or_else(|| std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok())?; match otel::init_otel(&endpoint, "metrics-aggregator") { Ok(guard) => { tracing::info!(endpoint = %endpoint, "OTLP tracing enabled"); Some(guard) } Err(e) => { tracing::warn!(error = %e, "OTLP init failed, continuing without traces"); None } } } fn init_loki_from_args(args: &args::Args) -> Option { if args.no_loki { return None; } let url = args .loki_url .clone() .or_else(|| std::env::var("LOKI_URL").ok())?; tracing::info!("Loki log forwarding enabled"); Some(LokiForwarder::new(url)) } async fn handle_metrics( store: web::Data, stats_store: web::Data, handle: web::Data, ) -> HttpResponse { let extra = vec![("aggregator_instance".to_string(), "default".to_string())]; let scraped = render_aggregated_metrics(store, extra.clone()).await; let pushed = render_pushed_metrics(stats_store).await; let combined = format!("{}{}{}", handle.render(), scraped, pushed); HttpResponse::Ok() .content_type("text/plain; version=0.0.4; charset=utf-8") .body(combined) } async fn handle_health() -> HttpResponse { HttpResponse::Ok() .content_type("application/json") .body(r#"{"status":"ok"}"#) } async fn handle_targets(targets: web::Data>>>) -> HttpResponse { let guard = targets.read().await; let json = serde_json::to_string(&*guard).unwrap_or_default(); HttpResponse::Ok() .content_type("application/json") .body(json) } // ── Push endpoint payload ──────────────────────────────────────────────────── #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct PushPayload { app: String, #[serde(default)] instance: String, timestamp: i64, #[serde(default)] http: Option, #[serde(default)] system: Option, #[serde(default)] business: HashMap, #[serde(default)] token_usage: Option, #[serde(default)] tasks: Option, #[serde(default)] latency: HashMap, #[serde(default)] logs: Vec, } async fn handle_push( stats_store: web::Data, loki: web::Data>>, payload: web::Json, ) -> HttpResponse { let app = payload.app.clone(); stats_store::merge_push_payload( &stats_store, &app, &payload.instance, payload.timestamp, payload.http.as_ref(), payload.system.as_ref(), &payload.business, payload.token_usage.as_ref(), payload.tasks.as_ref(), &payload.latency, &payload.logs, ) .await; // Forward logs to Loki if configured if !payload.logs.is_empty() { if let Some(loki_fwd) = loki.as_ref() { let entries: Vec = payload .logs .iter() .map(|l| LokiEntry { timestamp: chrono::DateTime::from_timestamp(l.timestamp, 0) .unwrap_or_else(chrono::Utc::now), line: format!("[{}] {}", l.level.to_lowercase(), l.message), }) .collect(); if let Err(e) = loki_fwd.push(entries).await { tracing::warn!(error = %e, "loki push on /push failed"); } } } HttpResponse::Ok().body("ok") } async fn scrape_loop( targets: Arc>>, store: MetricsStore, metrics: AggMetrics, http: HttpClient, interval_secs: u64, scrape_apps_filter: Option>, _loki: Option, mut shutdown: broadcast::Receiver<()>, ) { let mut ticker = interval(Duration::from_secs(interval_secs)); loop { tokio::select! { _ = shutdown.recv() => break, _ = ticker.tick() => { let targets_snapshot = targets.read().await.clone(); let count = targets_snapshot.len() as u64; metrics.targets_total.set(count as f64); let mut healthy_count = 0u64; for target in &targets_snapshot { if let Some(ref filter) = scrape_apps_filter { if !filter.contains(&target.name) { continue; } } metrics.scrape_total.increment(1); match http.scrape(target).await { ScrapeResult::Success(body, duration_ms) => { metrics.scrape_success.increment(1); metrics.scrape_duration.record(duration_ms); let parsed = scrape::parse_prometheus(&body); update_store(store.clone(), &target.name, parsed).await; healthy_count += 1; } ScrapeResult::Timeout => { metrics.scrape_failures.increment(1); metrics.scrape_errors_timeout.increment(1); tracing::warn!(target = %target.name, "scrape timeout"); } ScrapeResult::ConnectionError(e) => { metrics.scrape_failures.increment(1); metrics.scrape_errors_connection.increment(1); tracing::warn!(target = %target.name, error = %e, "scrape connection error"); } ScrapeResult::HttpError(status) => { metrics.scrape_failures.increment(1); tracing::warn!(target = %target.name, status = status, "scrape HTTP error"); } } } metrics.targets_healthy.set(healthy_count as f64); } } } } async fn update_store(store: MetricsStore, target_name: &str, metrics: Vec) { let mut guard = store.write().await; guard.insert(target_name.to_string(), metrics); } async fn render_aggregated_metrics( store: web::Data, extra_group_labels: Vec<(String, String)>, ) -> String { let guard = store.read().await; let mut output = String::new(); for (target_name, metrics) in guard.iter() { for metric in metrics { let mut labels = metric.labels.clone(); labels.insert( "aggregated_by".to_string(), "metrics-aggregator".to_string(), ); labels.insert("source_target".to_string(), target_name.clone()); for (k, v) in &extra_group_labels { labels.insert(k.clone(), v.clone()); } let label_str = if labels.is_empty() { String::new() } else { let pairs: Vec = labels .iter() .map(|(k, v)| { format!( r#"{}="{}""#, k, v.replace('\\', "\\\\").replace('"', "\\\"") ) }) .collect(); format!("{{{}}}", pairs.join(",")) }; let _ = writeln!(&mut output, "{}{} {}", metric.name, label_str, metric.value); } } output } async fn render_pushed_metrics(stats_store: web::Data) -> String { let guard = stats_store.read().await; let mut output = String::new(); for (app_name, entry) in guard.iter() { let labels = [ format!(r#"app="{}""#, app_name), "aggregated_by".to_string(), "metrics-aggregator".to_string(), "push_source=true".to_string(), ]; let label_str = format!("{{{}}}", labels.join(",")); let h = &entry; let _ = writeln!( &mut output, "push_http_requests_total{} {}", label_str, h.requests_total ); let _ = writeln!( &mut output, "push_http_request_duration_ms_total{} {}", label_str, h.request_duration_ms_total ); let _ = writeln!( &mut output, "push_http_requests_2xx{} {}", label_str, h.requests_2xx ); let _ = writeln!( &mut output, "push_http_requests_4xx{} {}", label_str, h.requests_4xx ); let _ = writeln!( &mut output, "push_http_requests_5xx{} {}", label_str, h.requests_5xx ); for (endpoint, &count) in &h.endpoints { let sanitized = endpoint.replace([' ', '/'], "_").to_lowercase(); let ep_labels = format!( r#"app="{}",endpoint="{}",aggregated_by="metrics-aggregator",push_source="true""#, app_name, sanitized ); let _ = writeln!( &mut output, "push_http_endpoint_requests_total{{{}}} {}", ep_labels, count ); } // System metrics in Prometheus format let sys_labels = format!(r#"app="{}",aggregated_by="metrics-aggregator""#, app_name); let _ = writeln!( &mut output, "system_cpu_usage_percent{{{}}} {}", sys_labels, h.cpu_usage_percent ); let _ = writeln!( &mut output, "system_memory_used_mb{{{}}} {}", sys_labels, h.memory_used_mb ); let _ = writeln!( &mut output, "system_memory_total_mb{{{}}} {}", sys_labels, h.memory_total_mb ); let _ = writeln!( &mut output, "system_uptime_secs{{{}}} {}", sys_labels, h.uptime_secs ); // Business counters for (counter_name, value) in &h.business { let biz_labels = format!(r#"app="{}",aggregated_by="metrics-aggregator""#, app_name); let _ = writeln!(&mut output, "{}{{{}}} {}", counter_name, biz_labels, value); } // Token usage let ai_labels = format!(r#"app="{}",aggregated_by="metrics-aggregator""#, app_name); let _ = writeln!( &mut output, "ai_input_tokens_total{{{}}} {}", ai_labels, h.ai_input_tokens_total ); let _ = writeln!( &mut output, "ai_output_tokens_total{{{}}} {}", ai_labels, h.ai_output_tokens_total ); let _ = writeln!( &mut output, "ai_calls_total{{{}}} {}", ai_labels, h.ai_calls_total ); // Latency per endpoint for (endpoint, lat) in &h.latency { let lat_labels = format!( r#"app="{}",endpoint="{}",aggregated_by="metrics-aggregator""#, app_name, endpoint ); let _ = writeln!( &mut output, "latency_p99_ms{{{}}} {}", lat_labels, lat.p99_ms ); let _ = writeln!( &mut output, "latency_p90_ms{{{}}} {}", lat_labels, lat.p90_ms ); let _ = writeln!( &mut output, "latency_p50_ms{{{}}} {}", lat_labels, lat.p50_ms ); let _ = writeln!( &mut output, "latency_max_ms{{{}}} {}", lat_labels, lat.max_ms ); } } output } // ── JSON API handlers ──────────────────────────────────────────────────────── async fn handle_dashboard(stats_store: web::Data) -> HttpResponse { let dashboard = stats_store::build_dashboard(&stats_store).await; let json = serde_json::to_string(&dashboard).unwrap_or_default(); HttpResponse::Ok() .content_type("application/json") .body(json) } async fn handle_stats(stats_store: web::Data) -> HttpResponse { // Returns per-app stats as JSON let guard = stats_store.read().await; let json = serde_json::to_string(&*guard).unwrap_or_default(); HttpResponse::Ok() .content_type("application/json") .body(json) } async fn log_collector(loki: Option, mut shutdown: broadcast::Receiver<()>) { let stdin = tokio::io::stdin(); let mut reader = tokio::io::BufReader::new(stdin); let mut interval_tick = interval(Duration::from_secs(1)); let mut batch: Vec = Vec::with_capacity(100); let mut line_buf = String::new(); loop { tokio::select! { _ = shutdown.recv() => break, _ = interval_tick.tick() => { if !batch.is_empty() { if let Some(ref loki) = loki { if let Err(e) = loki.push(std::mem::take(&mut batch)).await { tracing::warn!(error = %e, "Loki push failed"); } } } } _ = async { line_buf.clear(); reader.read_line(&mut line_buf).await.ok() } => { if !line_buf.is_empty() { let line = line_buf.trim_end().to_string(); if !line.is_empty() { batch.push(LokiEntry { timestamp: chrono::Utc::now(), line, }); if batch.len() >= 100 { if let Some(ref loki) = loki { if let Err(e) = loki.push(std::mem::take(&mut batch)).await { tracing::warn!(error = %e, "Loki push failed"); } } } } } } } } }