diff --git a/apps/app/src/main.rs b/apps/app/src/main.rs index 7eb053c..469b6f1 100644 --- a/apps/app/src/main.rs +++ b/apps/app/src/main.rs @@ -1,24 +1,24 @@ use actix_cors::Cors; use actix_web::cookie::time::Duration; use actix_web::dev::{Service, ServiceRequest, ServiceResponse}; -use actix_web::{cookie::Key, web, App, HttpResponse, HttpServer}; +use actix_web::{App, HttpResponse, HttpServer, cookie::Key, web}; +use api::{robots, sidemap}; use clap::Parser; use db::cache::AppCache; use db::database::AppDatabase; use futures::future::LocalBoxFuture; use observability::{ - init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller, HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware, - push::MetricsPusher, + init_tracing_subscriber, install_recorder, prometheus_handler, push::MetricsPusher, + spawn_http_metrics_poller, }; use sea_orm::ConnectionTrait; use service::AppService; -use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy}; -use api::{robots, sidemap}; -use session::storage::RedisClusterSessionStore; use session::SessionMiddleware; -use std::task::{Context, Poll}; +use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy}; +use session::storage::RedisClusterSessionStore; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::Instant; mod args; @@ -63,7 +63,11 @@ struct RequestLoggerService { impl actix_web::dev::Service for RequestLoggerService where - S: actix_web::dev::Service, Error = actix_web::Error>, + S: actix_web::dev::Service< + ServiceRequest, + Response = ServiceResponse, + Error = actix_web::Error, + >, S::Future: 'static, B: 'static, { @@ -125,7 +129,9 @@ fn build_session_key(cfg: &AppConfig) -> anyhow::Result { .map_err(|e| anyhow::anyhow!("HKDF expand failed: {}", e))?; return Ok(Key::from(&okm)); } - tracing::warn!("APP_SESSION_SECRET not set, using generated key (sessions invalidated on restart)"); + tracing::warn!( + "APP_SESSION_SECRET not set, using generated key (sessions invalidated on restart)" + ); Ok(Key::generate()) } @@ -198,7 +204,11 @@ async fn main() -> anyhow::Result<()> { // Metrics pusher: periodically push all metrics to apps/metrics aggregator if let Some(push_url) = std::env::var("METRICS_PUSH_URL").ok() { let pusher = MetricsPusher::new(&push_url, "app"); - pusher.spawn(http_metrics.clone(), Arc::new(prometheus_handle.clone()), std::time::Duration::from_secs(15)); + pusher.spawn( + http_metrics.clone(), + Arc::new(prometheus_handle.clone()), + std::time::Duration::from_secs(15), + ); tracing::info!(push_url = %push_url, "Metrics pusher started (interval 15s)"); } @@ -208,7 +218,12 @@ async fn main() -> anyhow::Result<()> { let cors_origins: Vec = cfg .env .get("CORS_ORIGINS") - .map(|s| s.split(',').map(|s| s.trim().to_string()).filter(|s| !s.is_empty()).collect()) + .map(|s| { + s.split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() + }) .unwrap_or_else(|| vec!["http://localhost:5173".to_string()]); let cookie_secure = cfg .env @@ -223,7 +238,13 @@ async fn main() -> anyhow::Result<()> { } let cors = cors .allowed_methods(["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"]) - .allowed_headers(["Content-Type", "Authorization", "X-Requested-With", "Accept", "Origin"]) + .allowed_headers([ + "Content-Type", + "Authorization", + "X-Requested-With", + "Accept", + "Origin", + ]) .supports_credentials() .max_age(3600); @@ -319,16 +340,8 @@ async fn health_check(state: web::Data) -> HttpResponse { } async fn db_ping(db: &AppDatabase) -> bool { - let writer_ok = db - .writer() - .execute_unprepared("SELECT 1") - .await - .is_ok(); - let reader_ok = db - .reader() - .execute_unprepared("SELECT 1") - .await - .is_ok(); + let writer_ok = db.writer().execute_unprepared("SELECT 1").await.is_ok(); + let reader_ok = db.reader().execute_unprepared("SELECT 1").await.is_ok(); writer_ok && reader_ok } diff --git a/apps/email/src/main.rs b/apps/email/src/main.rs index 5f362b8..6859014 100644 --- a/apps/email/src/main.rs +++ b/apps/email/src/main.rs @@ -1,8 +1,8 @@ use clap::Parser; use config::AppConfig; -use metrics::{describe_counter, Unit}; +use metrics::{Unit, describe_counter}; use metrics_exporter_prometheus::PrometheusHandle; -use observability::{init_tracing_subscriber, install_recorder, HttpMetrics, push::MetricsPusher}; +use observability::{HttpMetrics, init_tracing_subscriber, install_recorder, push::MetricsPusher}; use sea_orm::ConnectionTrait; use service::AppService; use std::sync::Arc; @@ -23,16 +23,8 @@ async fn http_handler( ) -> Result, std::convert::Infallible> { match req.uri().path() { "/health" => { - let writer_ok = db - .writer() - .execute_unprepared("SELECT 1") - .await - .is_ok(); - let reader_ok = db - .reader() - .execute_unprepared("SELECT 1") - .await - .is_ok(); + let writer_ok = db.writer().execute_unprepared("SELECT 1").await.is_ok(); + let reader_ok = db.reader().execute_unprepared("SELECT 1").await.is_ok(); let db_ok = writer_ok && reader_ok; let cache_ok = cache.conn().await.is_ok(); @@ -45,10 +37,12 @@ async fn http_handler( let status = if db_ok && cache_ok { 200 } else { 503 }; let body_bytes = match serde_json::to_string(&body) { Ok(s) => hyper::Body::from(s), - Err(e) => return Ok(hyper::Response::builder() - .status(500) - .body(hyper::Body::from(format!("serialize error: {}", e))) - .expect("static response")), + Err(e) => { + return Ok(hyper::Response::builder() + .status(500) + .body(hyper::Body::from(format!("serialize error: {}", e))) + .expect("static response")); + } }; Ok(hyper::Response::builder() .status(status) @@ -78,14 +72,42 @@ async fn main() -> anyhow::Result<()> { init_tracing_subscriber(&args.log_level, false); // Pre-register all email/queue metrics so they appear in /metrics even before first event. - describe_counter!("email_queued_total", Unit::Count, "Emails written to Redis stream"); - describe_counter!("email_consumed_total", Unit::Count, "Emails consumed from queue"); - describe_counter!("email_batch_size", Unit::Count, "Email consumer batch sizes accumulated"); - describe_counter!("email_validation_skipped_total", Unit::Count, "Emails skipped due to invalid recipient"); - describe_counter!("email_build_errors_total", Unit::Count, "Email message build failures"); - describe_counter!("email_send_attempts_total", Unit::Count, "SMTP send attempts (including retries)"); + describe_counter!( + "email_queued_total", + Unit::Count, + "Emails written to Redis stream" + ); + describe_counter!( + "email_consumed_total", + Unit::Count, + "Emails consumed from queue" + ); + describe_counter!( + "email_batch_size", + Unit::Count, + "Email consumer batch sizes accumulated" + ); + describe_counter!( + "email_validation_skipped_total", + Unit::Count, + "Emails skipped due to invalid recipient" + ); + describe_counter!( + "email_build_errors_total", + Unit::Count, + "Email message build failures" + ); + describe_counter!( + "email_send_attempts_total", + Unit::Count, + "SMTP send attempts (including retries)" + ); describe_counter!("email_sent_total", Unit::Count, "Emails sent successfully"); - describe_counter!("email_send_failures_total", Unit::Count, "Emails that failed after all retries"); + describe_counter!( + "email_send_failures_total", + Unit::Count, + "Emails that failed after all retries" + ); let metrics_handle = Arc::new(install_recorder()); let http_metrics = Arc::new(HttpMetrics::new()); // Worker app — HTTP section will be empty @@ -93,7 +115,11 @@ async fn main() -> anyhow::Result<()> { // Metrics pusher: periodically push all metrics to apps/metrics aggregator if let Some(push_url) = std::env::var("METRICS_PUSH_URL").ok() { let pusher = MetricsPusher::new(&push_url, "email"); - pusher.spawn(http_metrics.clone(), metrics_handle.clone(), std::time::Duration::from_secs(15)); + pusher.spawn( + http_metrics.clone(), + metrics_handle.clone(), + std::time::Duration::from_secs(15), + ); tracing::info!(push_url = %push_url, "Metrics pusher started (interval 15s)"); } diff --git a/apps/gingress/src/bin/kubectl-gingress/main.rs b/apps/gingress/src/bin/kubectl-gingress/main.rs index 7e9bac3..b8cfd03 100644 --- a/apps/gingress/src/bin/kubectl-gingress/main.rs +++ b/apps/gingress/src/bin/kubectl-gingress/main.rs @@ -86,7 +86,11 @@ async fn main() -> Result<(), Box> { match cli.command { Command::List { namespace, json } => cmd_list(&client, namespace, json).await?, - Command::Routes { namespace, host, json } => cmd_routes(&client, namespace, host, json).await?, + Command::Routes { + namespace, + host, + json, + } => cmd_routes(&client, namespace, host, json).await?, Command::Backends { namespace, json } => cmd_backends(&client, namespace, json).await?, Command::Certs { namespace, json } => cmd_certs(&client, namespace, json).await?, Command::Validate { namespace } => cmd_validate(&client, namespace).await?, @@ -98,7 +102,11 @@ async fn main() -> Result<(), Box> { // ── list ────────────────────────────────────────────────────────── -async fn cmd_list(client: &Client, namespace: Option, json: bool) -> Result<(), Box> { +async fn cmd_list( + client: &Client, + namespace: Option, + json: bool, +) -> Result<(), Box> { let ingresses = list_ingresses(client, namespace.as_deref()).await?; if json { @@ -111,7 +119,10 @@ async fn cmd_list(client: &Client, namespace: Option, json: bool) -> Res return Ok(()); } - println!("{:<25} {:<20} {:<40} {:<50} {:<15}", "NAMESPACE", "NAME", "HOSTS", "PATHS", "TLS"); + println!( + "{:<25} {:<20} {:<40} {:<50} {:<15}", + "NAMESPACE", "NAME", "HOSTS", "PATHS", "TLS" + ); println!("{:-<150}", ""); for ing in &ingresses { @@ -126,7 +137,8 @@ async fn cmd_list(client: &Client, namespace: Option, json: bool) -> Res .join(", "); let tls = if ing.has_tls() { "Enabled" } else { "-" }; - println!("{:<25} {:<20} {:<40} {:<50} {:<15}", + println!( + "{:<25} {:<20} {:<40} {:<50} {:<15}", truncate(&ns, 25), truncate(&name, 20), truncate(&hosts, 40), @@ -151,10 +163,18 @@ async fn cmd_routes( let mut routes: Vec = Vec::new(); for ing in &ingresses { - for rule in ing.spec.as_ref().and_then(|s| s.rules.as_ref()).into_iter().flatten() { + for rule in ing + .spec + .as_ref() + .and_then(|s| s.rules.as_ref()) + .into_iter() + .flatten() + { let host = rule.host.as_deref().unwrap_or("*"); if let Some(ref hf) = host_filter { - if host != hf { continue; } + if host != hf { + continue; + } } if let Some(http) = &rule.http { for path_item in &http.paths { @@ -184,13 +204,16 @@ async fn cmd_routes( return Ok(()); } - println!("{:<20} {:<20} {:<30} {:<18} {:<15} {:<15} {:<15}", - "NAMESPACE", "INGRESS", "HOST", "PATH", "TYPE", "BACKEND", "PORT"); + println!( + "{:<20} {:<20} {:<30} {:<18} {:<15} {:<15} {:<15}", + "NAMESPACE", "INGRESS", "HOST", "PATH", "TYPE", "BACKEND", "PORT" + ); println!("{:-<133}", ""); for r in &routes { let port = extract_backend_port_str(r); - println!("{:<20} {:<20} {:<30} {:<18} {:<15} {:<15} {:<15}", + println!( + "{:<20} {:<20} {:<30} {:<18} {:<15} {:<15} {:<15}", truncate(&r.namespace, 20), truncate(&r.ingress, 20), truncate(&r.host, 30), @@ -219,14 +242,25 @@ async fn cmd_backends( let mut seen = std::collections::HashSet::new(); for ing in &ingresses { - for rule in ing.spec.as_ref().and_then(|s| s.rules.as_ref()).into_iter().flatten() { + for rule in ing + .spec + .as_ref() + .and_then(|s| s.rules.as_ref()) + .into_iter() + .flatten() + { if let Some(http) = &rule.http { for path_item in &http.paths { let svc = match path_item.backend.service.as_ref() { Some(s) => s, None => continue, }; - let key = format!("{}/{}:{}", ing.namespace(), svc.name, svc.port.as_ref().and_then(|p| p.number).unwrap_or(80)); + let key = format!( + "{}/{}:{}", + ing.namespace(), + svc.name, + svc.port.as_ref().and_then(|p| p.number).unwrap_or(80) + ); if seen.insert(key.clone()) { let ns = ing.namespace(); let ep_status = get_endpoint_status(client, &ns, &svc.name).await; @@ -254,14 +288,25 @@ async fn cmd_backends( return Ok(()); } - println!("{:<20} {:<20} {:<8} {:<8} {:<18} {:<20}", - "NAMESPACE", "SERVICE", "PORT", "HEALTH", "ENDPOINTS", "REFERENCED BY"); + println!( + "{:<20} {:<20} {:<8} {:<8} {:<18} {:<20}", + "NAMESPACE", "SERVICE", "PORT", "HEALTH", "ENDPOINTS", "REFERENCED BY" + ); println!("{:-<94}", ""); for b in &backends { - let health = if b.total_endpoints == 0 { "WARN" } else if b.ready_endpoints == 0 { "DOWN" } else if b.ready_endpoints < b.total_endpoints { "PARTIAL" } else { "OK" }; + let health = if b.total_endpoints == 0 { + "WARN" + } else if b.ready_endpoints == 0 { + "DOWN" + } else if b.ready_endpoints < b.total_endpoints { + "PARTIAL" + } else { + "OK" + }; let eps = format!("{}/{} ready", b.ready_endpoints, b.total_endpoints); - println!("{:<20} {:<20} {:<8} {:<8} {:<18} {:<20}", + println!( + "{:<20} {:<20} {:<8} {:<8} {:<18} {:<20}", truncate(&b.namespace, 20), truncate(&b.service, 20), b.port, @@ -322,12 +367,16 @@ async fn cmd_certs( return Ok(()); } - println!("{:<20} {:<30} {:<30} {:<10}", "NAMESPACE", "SECRET", "HOST", "STATUS"); + println!( + "{:<20} {:<30} {:<30} {:<10}", + "NAMESPACE", "SECRET", "HOST", "STATUS" + ); println!("{:-<90}", ""); for c in &certs { let status = if c.found { "OK" } else { "MISSING" }; - println!("{:<20} {:<30} {:<30} {:<10}", + println!( + "{:<20} {:<30} {:<30} {:<10}", truncate(&c.namespace, 20), truncate(&c.secret_name, 30), truncate(&c.host, 30), @@ -375,12 +424,18 @@ async fn cmd_validate( for tls in &tls_entries { let secret_name = tls.secret_name.as_deref().unwrap_or(""); if secret_name.is_empty() { - println!("[{}/{}] WARNING: TLS configured but no secretName specified", ns, name); + println!( + "[{}/{}] WARNING: TLS configured but no secretName specified", + ns, name + ); warnings += 1; } else { let found = check_secret_exists(client, &ns, secret_name).await; if !found { - println!("[{}/{}] ERROR: TLS secret '{}' not found in namespace '{}'", ns, name, secret_name, ns); + println!( + "[{}/{}] ERROR: TLS secret '{}' not found in namespace '{}'", + ns, name, secret_name, ns + ); errors += 1; } } @@ -396,7 +451,9 @@ async fn cmd_validate( if endpoints.total == 0 { println!( "[{}/{}] WARNING: Backend service '{}' has no endpoints (host: {})", - ns, name, svc.name, + ns, + name, + svc.name, rule.host.as_deref().unwrap_or("*") ); warnings += 1; @@ -409,10 +466,17 @@ async fn cmd_validate( } if errors == 0 && warnings == 0 { - println!("Validation passed — no issues found in {} Ingress(es).", ingresses.len()); + println!( + "Validation passed — no issues found in {} Ingress(es).", + ingresses.len() + ); } else { - println!("\nValidation complete: {} error(s), {} warning(s) across {} Ingress(es).", - errors, warnings, ingresses.len()); + println!( + "\nValidation complete: {} error(s), {} warning(s) across {} Ingress(es).", + errors, + warnings, + ingresses.len() + ); } Ok(()) @@ -472,7 +536,9 @@ async fn cmd_status(client: &Client, json: bool) -> Result<(), Box Result<(), Box Result<(), Box String { self.namespace.clone() } - fn name_any(&self) -> String { self.name.clone() } - fn hosts(&self) -> &[String] { &self.hosts } - fn paths_display(&self) -> &[PathSummary] { &self.paths_for_display } - fn has_tls(&self) -> bool { self.has_tls } + fn namespace(&self) -> String { + self.namespace.clone() + } + fn name_any(&self) -> String { + self.name.clone() + } + fn hosts(&self) -> &[String] { + &self.hosts + } + fn paths_display(&self) -> &[PathSummary] { + &self.paths_for_display + } + fn has_tls(&self) -> bool { + self.has_tls + } } fn ingress_to_summary(ing: &Ingress) -> IngressSummary { @@ -663,7 +746,10 @@ async fn find_gingress_pods(client: &Client) -> Vec { } } -async fn list_ingresses(client: &Client, namespace: Option<&str>) -> Result, Box> { +async fn list_ingresses( + client: &Client, + namespace: Option<&str>, +) -> Result, Box> { let params = ListParams { ..Default::default() }; @@ -702,7 +788,11 @@ struct EndpointStatus { total: usize, } -async fn get_endpoint_status(client: &Client, namespace: &str, service_name: &str) -> EndpointStatus { +async fn get_endpoint_status( + client: &Client, + namespace: &str, + service_name: &str, +) -> EndpointStatus { use k8s_openapi::api::core::v1::Endpoints; let api: Api = Api::namespaced(client.clone(), namespace); match api.get_opt(service_name).await { diff --git a/apps/gingress/src/controller/endpoint_watcher.rs b/apps/gingress/src/controller/endpoint_watcher.rs index f89006a..6aaeefc 100644 --- a/apps/gingress/src/controller/endpoint_watcher.rs +++ b/apps/gingress/src/controller/endpoint_watcher.rs @@ -3,8 +3,8 @@ //! Tracks Pod IPs for each Service. When endpoints change (scale up/down, //! rolling restart, health check failures), the upstream pool is updated. -use futures::pin_mut; use futures::StreamExt; +use futures::pin_mut; use gingress_proxy::config::{ConfigStore, Endpoint}; use k8s_openapi::api::core::v1::Endpoints as K8sEndpoints; use kube::ResourceExt; @@ -109,10 +109,7 @@ fn process_endpoints( // If no ports at all, write an empty entry for the base key so the reconciler // can detect that this service has no endpoints. if port_groups.is_empty() { - store.set::>( - &format!("upstream:{}/{}", namespace, name), - &vec![], - ); + store.set::>(&format!("upstream:{}/{}", namespace, name), &vec![]); } store.signal_reload(); diff --git a/apps/gingress/src/controller/ingress_watcher.rs b/apps/gingress/src/controller/ingress_watcher.rs index 3fe9cab..4d0963b 100644 --- a/apps/gingress/src/controller/ingress_watcher.rs +++ b/apps/gingress/src/controller/ingress_watcher.rs @@ -1,7 +1,7 @@ //! Watches Kubernetes Ingress resources and converts them to routing rules. -use futures::pin_mut; use futures::StreamExt; +use futures::pin_mut; use gingress_proxy::config::{ ConfigStore, HeaderOp, PathType, RateLimitPolicy, RouteRule, SessionAffinityConfig, }; @@ -25,7 +25,9 @@ pub async fn watch_ingresses( let api = kube::Api::::all(client.as_ref().clone()); let config = watcher::Config { - field_selector: namespace.as_ref().map(|ns| format!("metadata.namespace={}", ns)), + field_selector: namespace + .as_ref() + .map(|ns| format!("metadata.namespace={}", ns)), ..Default::default() }; @@ -138,7 +140,10 @@ fn process_ingress(ingress: &Ingress, store: &ConfigStore, _ingress_class: &str) /// Convert a Kubernetes Ingress path to an internal RouteRule. fn ingress_path_to_route(host: &str, path: &HTTPIngressPath, namespace: &str) -> RouteRule { - let service = path.backend.service.as_ref() + let service = path + .backend + .service + .as_ref() .expect("Ingress backend must reference a service"); RouteRule { @@ -342,7 +347,10 @@ fn parse_session_affinity(val: &str) -> (bool, String, u64) { /// Parse git-backend annotation value. /// /// Format: "namespace/name:port" or "name:port" (namespace defaults to Ingress namespace). -fn parse_git_backend(val: &str, default_namespace: &str) -> Option { +fn parse_git_backend( + val: &str, + default_namespace: &str, +) -> Option { let val = val.trim(); // Split off port: "namespace/name:port" → ("namespace/name", "port") let (ns_name, port_str) = val.rsplit_once(':').unwrap_or((val, "")); diff --git a/apps/gingress/src/controller/secret_watcher.rs b/apps/gingress/src/controller/secret_watcher.rs index 5492409..db8e4d1 100644 --- a/apps/gingress/src/controller/secret_watcher.rs +++ b/apps/gingress/src/controller/secret_watcher.rs @@ -9,8 +9,8 @@ //! - After reconciliation, the reconciler copies certs to `tls:` for //! direct SNI lookup by the proxy. -use futures::pin_mut; use futures::StreamExt; +use futures::pin_mut; use gingress_proxy::config::{ConfigStore, TlsCert}; use kube::ResourceExt; use kube::runtime::watcher::{self, Event}; diff --git a/apps/git-hook/src/main.rs b/apps/git-hook/src/main.rs index bf8a316..29fc5b6 100644 --- a/apps/git-hook/src/main.rs +++ b/apps/git-hook/src/main.rs @@ -4,9 +4,9 @@ use db::cache::AppCache; use db::database::AppDatabase; use git::hook::HookService; use git::hook::embed::TagEmbedder; -use metrics::{describe_counter, Unit}; +use metrics::{Unit, describe_counter}; use metrics_exporter_prometheus::PrometheusHandle; -use observability::{init_tracing_subscriber, install_recorder, HttpMetrics, push::MetricsPusher}; +use observability::{HttpMetrics, init_tracing_subscriber, install_recorder, push::MetricsPusher}; use sea_orm::ConnectionTrait; use std::sync::Arc; use tokio::signal; @@ -21,7 +21,9 @@ async fn init_embed_service( db: &AppDatabase, ) -> Result> { let client = agent::new_embed_client(cfg).await?; - let model_name = cfg.get_embed_model_name().unwrap_or_else(|_| "text-embedding-3-small".into()); + let model_name = cfg + .get_embed_model_name() + .unwrap_or_else(|_| "text-embedding-3-small".into()); let dimensions = cfg.get_embed_model_dimensions().unwrap_or(1536); let svc = agent::embed::EmbedService::new(client, db.writer().clone(), model_name, dimensions); let _ = svc.ensure_collections().await; @@ -34,16 +36,24 @@ struct EmbedServiceAdapter(agent::embed::EmbedService); #[async_trait::async_trait] impl TagEmbedder for EmbedServiceAdapter { - async fn embed_tags_batch(&self, tags: Vec) -> Result<(), Box> { + async fn embed_tags_batch( + &self, + tags: Vec, + ) -> Result<(), Box> { // Convert from models::TagEmbedInput to agent's TagEmbedInput (same struct, different path) - let agent_tags: Vec = tags.into_iter().map(|t| agent::embed::TagEmbedInput { - repo_id: t.repo_id, - repo_name: t.repo_name, - project_id: t.project_id, - name: t.name, - description: t.description, - }).collect(); - self.0.embed_tags_batch(agent_tags).await + let agent_tags: Vec = tags + .into_iter() + .map(|t| agent::embed::TagEmbedInput { + repo_id: t.repo_id, + repo_name: t.repo_name, + project_id: t.project_id, + name: t.name, + description: t.description, + }) + .collect(); + self.0 + .embed_tags_batch(agent_tags) + .await .map_err(|e| Box::new(e) as Box) } } @@ -56,16 +66,8 @@ async fn http_handler( ) -> Result, std::convert::Infallible> { match req.uri().path() { "/health" => { - let writer_ok = db - .writer() - .execute_unprepared("SELECT 1") - .await - .is_ok(); - let reader_ok = db - .reader() - .execute_unprepared("SELECT 1") - .await - .is_ok(); + let writer_ok = db.writer().execute_unprepared("SELECT 1").await.is_ok(); + let reader_ok = db.reader().execute_unprepared("SELECT 1").await.is_ok(); let db_ok = writer_ok && reader_ok; let cache_ok = cache.conn().await.is_ok(); @@ -78,10 +80,12 @@ async fn http_handler( let status = if db_ok && cache_ok { 200 } else { 503 }; let body_bytes = match serde_json::to_string(&body) { Ok(s) => hyper::Body::from(s), - Err(e) => return Ok(hyper::Response::builder() - .status(500) - .body(hyper::Body::from(format!("serialize error: {}", e))) - .expect("static response")), + Err(e) => { + return Ok(hyper::Response::builder() + .status(500) + .body(hyper::Body::from(format!("serialize error: {}", e))) + .expect("static response")); + } }; Ok(hyper::Response::builder() .status(status) @@ -106,7 +110,6 @@ async fn http_handler( #[tokio::main] async fn main() -> anyhow::Result<()> { - let cfg = AppConfig::load(); let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string()); @@ -114,13 +117,41 @@ async fn main() -> anyhow::Result<()> { // Pre-register all hook metrics so they appear in /metrics even before first increment. describe_counter!("hook_tasks_total", Unit::Count, "Total hook tasks dequeued"); - describe_counter!("hook_tasks_success_total", Unit::Count, "Hook tasks completed successfully"); - describe_counter!("hook_tasks_failed_total", Unit::Count, "Hook tasks that failed"); - describe_counter!("hook_tasks_locked_total", Unit::Count, "Hook tasks re-queued due to repo lock"); - describe_counter!("hook_tasks_retried_total", Unit::Count, "Hook tasks that entered retry"); - describe_counter!("hook_tasks_exhausted_total", Unit::Count, "Hook tasks that exhausted retries"); - describe_counter!("hook_sync_branches_changed_total", Unit::Count, "Branches changed during sync"); - describe_counter!("hook_sync_tags_changed_total", Unit::Count, "Tags changed during sync"); + describe_counter!( + "hook_tasks_success_total", + Unit::Count, + "Hook tasks completed successfully" + ); + describe_counter!( + "hook_tasks_failed_total", + Unit::Count, + "Hook tasks that failed" + ); + describe_counter!( + "hook_tasks_locked_total", + Unit::Count, + "Hook tasks re-queued due to repo lock" + ); + describe_counter!( + "hook_tasks_retried_total", + Unit::Count, + "Hook tasks that entered retry" + ); + describe_counter!( + "hook_tasks_exhausted_total", + Unit::Count, + "Hook tasks that exhausted retries" + ); + describe_counter!( + "hook_sync_branches_changed_total", + Unit::Count, + "Branches changed during sync" + ); + describe_counter!( + "hook_sync_tags_changed_total", + Unit::Count, + "Tags changed during sync" + ); let metrics_handle = Arc::new(install_recorder()); let http_metrics = Arc::new(HttpMetrics::new()); // Worker app — HTTP section will be empty @@ -128,7 +159,11 @@ async fn main() -> anyhow::Result<()> { // Metrics pusher: periodically push all metrics to apps/metrics aggregator if let Some(push_url) = std::env::var("METRICS_PUSH_URL").ok() { let pusher = MetricsPusher::new(&push_url, "git-hook"); - pusher.spawn(http_metrics.clone(), metrics_handle.clone(), std::time::Duration::from_secs(15)); + pusher.spawn( + http_metrics.clone(), + metrics_handle.clone(), + std::time::Duration::from_secs(15), + ); tracing::info!(push_url = %push_url, "Metrics pusher started (interval 15s)"); } @@ -165,8 +200,7 @@ async fn main() -> anyhow::Result<()> { let health_db = db.clone(); let health_cache = cache.clone(); let health_metrics = metrics_handle.clone(); - let health_addr: std::net::SocketAddr = - ([0, 0, 0, 0], 8083).into(); + let health_addr: std::net::SocketAddr = ([0, 0, 0, 0], 8083).into(); let health_service = hyper::service::make_service_fn(move |_| { let db = health_db.clone(); let cache = health_cache.clone(); diff --git a/apps/gitserver/src/main.rs b/apps/gitserver/src/main.rs index c6597db..89e75c6 100644 --- a/apps/gitserver/src/main.rs +++ b/apps/gitserver/src/main.rs @@ -1,6 +1,6 @@ use clap::Parser; use config::AppConfig; -use observability::{init_tracing_subscriber, install_recorder, HttpMetrics, push::MetricsPusher}; +use observability::{HttpMetrics, init_tracing_subscriber, install_recorder, push::MetricsPusher}; use std::sync::Arc; #[derive(Parser, Debug)] @@ -23,7 +23,11 @@ async fn main() -> anyhow::Result<()> { // Metrics pusher: periodically push all metrics to apps/metrics aggregator if let Some(push_url) = std::env::var("METRICS_PUSH_URL").ok() { let pusher = MetricsPusher::new(&push_url, "gitserver"); - pusher.spawn(http_metrics.clone(), prometheus_handle.clone(), std::time::Duration::from_secs(15)); + pusher.spawn( + http_metrics.clone(), + prometheus_handle.clone(), + std::time::Duration::from_secs(15), + ); tracing::info!(push_url = %push_url, "Metrics pusher started (interval 15s)"); } diff --git a/apps/metrics/src/args.rs b/apps/metrics/src/args.rs index a61d51b..4e8e4b3 100644 --- a/apps/metrics/src/args.rs +++ b/apps/metrics/src/args.rs @@ -32,4 +32,4 @@ pub struct Args { #[arg(long)] pub no_loki: bool, -} \ No newline at end of file +} diff --git a/apps/metrics/src/hotreload.rs b/apps/metrics/src/hotreload.rs index 5d84df8..b19b1ee 100644 --- a/apps/metrics/src/hotreload.rs +++ b/apps/metrics/src/hotreload.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use tokio::sync::RwLock; -use crate::target::{load_targets_from_file, ScrapeTarget}; +use crate::target::{ScrapeTarget, load_targets_from_file}; pub async fn watch_targets_file( path: String, @@ -37,4 +37,4 @@ pub async fn watch_targets_file( } } } -} \ No newline at end of file +} diff --git a/apps/metrics/src/k8s_discovery.rs b/apps/metrics/src/k8s_discovery.rs index b6f64cc..1e3f403 100644 --- a/apps/metrics/src/k8s_discovery.rs +++ b/apps/metrics/src/k8s_discovery.rs @@ -11,7 +11,10 @@ pub async fn k8s_pod_discovery() -> Option> { let client = Client::builder() .timeout(Duration::from_secs(5)) - .add_default_header((awc::http::header::AUTHORIZATION.as_str(), format!("Bearer {}", token))) + .add_default_header(( + awc::http::header::AUTHORIZATION.as_str(), + format!("Bearer {}", token), + )) .finish(); let api_url = format!( @@ -64,4 +67,4 @@ pub async fn k8s_pod_discovery() -> Option> { .collect(); Some(targets) -} \ No newline at end of file +} diff --git a/apps/metrics/src/loki.rs b/apps/metrics/src/loki.rs index 3e02bb4..c170cd1 100644 --- a/apps/metrics/src/loki.rs +++ b/apps/metrics/src/loki.rs @@ -1,7 +1,7 @@ -use std::collections::HashMap; use chrono::{DateTime, Utc}; -use serde::Serialize; use reqwest::Client; +use serde::Serialize; +use std::collections::HashMap; #[derive(Clone)] pub struct LokiForwarder { @@ -37,7 +37,8 @@ impl LokiForwarder { let payload = LokiPayload { streams }; - let resp = self.client + let resp = self + .client .post(&self.url) .header("Content-Type", "application/json") .json(&payload) @@ -66,4 +67,4 @@ struct LokiStream { pub struct LokiEntry { pub timestamp: DateTime, pub line: String, -} \ No newline at end of file +} diff --git a/apps/metrics/src/main.rs b/apps/metrics/src/main.rs index d897d7e..2b01cbc 100644 --- a/apps/metrics/src/main.rs +++ b/apps/metrics/src/main.rs @@ -28,7 +28,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use actix_web::{web, HttpResponse, HttpServer}; +use actix_web::{HttpResponse, HttpServer, web}; use clap::Parser; use loki::{LokiEntry, LokiForwarder}; use metrics::AggMetrics; @@ -38,14 +38,13 @@ use scrape::{HttpClient, ScrapeResult}; use stats_store::StatsStore; use target::ScrapeTarget; use tokio::io::AsyncBufReadExt; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::{RwLock, broadcast}; use tokio::time::interval; type MetricsStore = Arc>>>; // StatsStore is defined in stats_store.rs — per-app aggregated data. - #[actix_web::main] async fn main() -> std::io::Result<()> { let args = args::Args::parse(); @@ -307,7 +306,8 @@ async fn handle_push( payload.tasks.as_ref(), &payload.latency, &payload.logs, - ).await; + ) + .await; // Forward logs to Loki if configured if !payload.logs.is_empty() { @@ -457,31 +457,64 @@ async fn render_pushed_metrics(stats_store: web::Data) -> String { let _ = writeln!( &mut output, "push_http_requests_total{} {}", - label_str, - h.requests_total + label_str, h.requests_total ); let _ = writeln!( &mut output, "push_http_request_duration_ms_total{} {}", - label_str, - h.request_duration_ms_total + label_str, h.request_duration_ms_total + ); + let _ = writeln!( + &mut output, + "push_http_requests_2xx{} {}", + label_str, h.requests_2xx + ); + let _ = writeln!( + &mut output, + "push_http_requests_4xx{} {}", + label_str, h.requests_4xx + ); + let _ = writeln!( + &mut output, + "push_http_requests_5xx{} {}", + label_str, h.requests_5xx ); - let _ = writeln!(&mut output, "push_http_requests_2xx{} {}", label_str, h.requests_2xx); - let _ = writeln!(&mut output, "push_http_requests_4xx{} {}", label_str, h.requests_4xx); - let _ = writeln!(&mut output, "push_http_requests_5xx{} {}", label_str, h.requests_5xx); for (endpoint, &count) in &h.endpoints { let sanitized = endpoint.replace([' ', '/'], "_").to_lowercase(); - let ep_labels = format!(r#"app="{}",endpoint="{}",aggregated_by="metrics-aggregator",push_source="true""#, app_name, sanitized); - let _ = writeln!(&mut output, "push_http_endpoint_requests_total{{{}}} {}", ep_labels, count); + let ep_labels = format!( + r#"app="{}",endpoint="{}",aggregated_by="metrics-aggregator",push_source="true""#, + app_name, sanitized + ); + let _ = writeln!( + &mut output, + "push_http_endpoint_requests_total{{{}}} {}", + ep_labels, count + ); } // System metrics in Prometheus format let sys_labels = format!(r#"app="{}",aggregated_by="metrics-aggregator""#, app_name); - let _ = writeln!(&mut output, "system_cpu_usage_percent{{{}}} {}", sys_labels, h.cpu_usage_percent); - let _ = writeln!(&mut output, "system_memory_used_mb{{{}}} {}", sys_labels, h.memory_used_mb); - let _ = writeln!(&mut output, "system_memory_total_mb{{{}}} {}", sys_labels, h.memory_total_mb); - let _ = writeln!(&mut output, "system_uptime_secs{{{}}} {}", sys_labels, h.uptime_secs); + let _ = writeln!( + &mut output, + "system_cpu_usage_percent{{{}}} {}", + sys_labels, h.cpu_usage_percent + ); + let _ = writeln!( + &mut output, + "system_memory_used_mb{{{}}} {}", + sys_labels, h.memory_used_mb + ); + let _ = writeln!( + &mut output, + "system_memory_total_mb{{{}}} {}", + sys_labels, h.memory_total_mb + ); + let _ = writeln!( + &mut output, + "system_uptime_secs{{{}}} {}", + sys_labels, h.uptime_secs + ); // Business counters for (counter_name, value) in &h.business { @@ -491,17 +524,48 @@ async fn render_pushed_metrics(stats_store: web::Data) -> String { // Token usage let ai_labels = format!(r#"app="{}",aggregated_by="metrics-aggregator""#, app_name); - let _ = writeln!(&mut output, "ai_input_tokens_total{{{}}} {}", ai_labels, h.ai_input_tokens_total); - let _ = writeln!(&mut output, "ai_output_tokens_total{{{}}} {}", ai_labels, h.ai_output_tokens_total); - let _ = writeln!(&mut output, "ai_calls_total{{{}}} {}", ai_labels, h.ai_calls_total); + let _ = writeln!( + &mut output, + "ai_input_tokens_total{{{}}} {}", + ai_labels, h.ai_input_tokens_total + ); + let _ = writeln!( + &mut output, + "ai_output_tokens_total{{{}}} {}", + ai_labels, h.ai_output_tokens_total + ); + let _ = writeln!( + &mut output, + "ai_calls_total{{{}}} {}", + ai_labels, h.ai_calls_total + ); // Latency per endpoint for (endpoint, lat) in &h.latency { - let lat_labels = format!(r#"app="{}",endpoint="{}",aggregated_by="metrics-aggregator""#, app_name, endpoint); - let _ = writeln!(&mut output, "latency_p99_ms{{{}}} {}", lat_labels, lat.p99_ms); - let _ = writeln!(&mut output, "latency_p90_ms{{{}}} {}", lat_labels, lat.p90_ms); - let _ = writeln!(&mut output, "latency_p50_ms{{{}}} {}", lat_labels, lat.p50_ms); - let _ = writeln!(&mut output, "latency_max_ms{{{}}} {}", lat_labels, lat.max_ms); + let lat_labels = format!( + r#"app="{}",endpoint="{}",aggregated_by="metrics-aggregator""#, + app_name, endpoint + ); + let _ = writeln!( + &mut output, + "latency_p99_ms{{{}}} {}", + lat_labels, lat.p99_ms + ); + let _ = writeln!( + &mut output, + "latency_p90_ms{{{}}} {}", + lat_labels, lat.p90_ms + ); + let _ = writeln!( + &mut output, + "latency_p50_ms{{{}}} {}", + lat_labels, lat.p50_ms + ); + let _ = writeln!( + &mut output, + "latency_max_ms{{{}}} {}", + lat_labels, lat.max_ms + ); } } diff --git a/apps/metrics/src/metrics.rs b/apps/metrics/src/metrics.rs index 68a1adc..6991a67 100644 --- a/apps/metrics/src/metrics.rs +++ b/apps/metrics/src/metrics.rs @@ -1,4 +1,6 @@ -use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram, Unit}; +use metrics::{ + Counter, Gauge, Histogram, Unit, describe_counter, describe_gauge, describe_histogram, +}; pub fn init() { describe_gauge!( diff --git a/apps/metrics/src/otel.rs b/apps/metrics/src/otel.rs index 2d7325c..8c69e97 100644 --- a/apps/metrics/src/otel.rs +++ b/apps/metrics/src/otel.rs @@ -36,5 +36,7 @@ pub fn init_otel(endpoint: &str, service_name: &str) -> anyhow::Result= cutoff); @@ -196,7 +199,11 @@ pub async fn build_dashboard(store: &StatsStore) -> DashboardResponse { } } - let avg_p99_ms = if p99_count > 0 { avg_p99 / p99_count as f64 } else { 0.0 }; + let avg_p99_ms = if p99_count > 0 { + avg_p99 / p99_count as f64 + } else { + 0.0 + }; DashboardResponse { timestamp: chrono::Utc::now().timestamp(), @@ -207,4 +214,4 @@ pub async fn build_dashboard(store: &StatsStore) -> DashboardResponse { total_output_tokens: total_output, total_ai_calls: total_calls, } -} \ No newline at end of file +} diff --git a/apps/metrics/src/target.rs b/apps/metrics/src/target.rs index f927b01..dad1e49 100644 --- a/apps/metrics/src/target.rs +++ b/apps/metrics/src/target.rs @@ -1,6 +1,6 @@ -use std::collections::HashMap; use anyhow::Context; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ScrapeTarget { @@ -27,8 +27,10 @@ impl ScrapeTarget { } pub async fn load_targets_from_file(path: &str) -> anyhow::Result> { - let content = tokio::fs::read_to_string(path).await.context("read targets file")?; - let targets: Vec = serde_json::from_str(&content) - .with_context(|| format!("parse targets file {path}"))?; + let content = tokio::fs::read_to_string(path) + .await + .context("read targets file")?; + let targets: Vec = + serde_json::from_str(&content).with_context(|| format!("parse targets file {path}"))?; Ok(targets) } diff --git a/apps/static/src/main.rs b/apps/static/src/main.rs index 4b572ea..68129bc 100644 --- a/apps/static/src/main.rs +++ b/apps/static/src/main.rs @@ -1,14 +1,14 @@ use actix_cors::Cors; use actix_files::Files; use actix_web::dev::{Service, ServiceRequest, ServiceResponse}; -use actix_web::{http::header, web, App, HttpResponse, HttpServer}; +use actix_web::{App, HttpResponse, HttpServer, http::header, web}; use futures::future::LocalBoxFuture; use log::info; +use observability::{HttpMetrics, init_tracing_subscriber, install_recorder, push::MetricsPusher}; use std::path::PathBuf; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Instant; -use observability::{init_tracing_subscriber, install_recorder, HttpMetrics, push::MetricsPusher}; /// Static file server for avatar, blob, and other static files /// Serves files from /data/{type} directories @@ -128,7 +128,11 @@ async fn main() -> anyhow::Result<()> { // Metrics pusher: periodically push all metrics to apps/metrics aggregator if let Some(push_url) = std::env::var("METRICS_PUSH_URL").ok() { let pusher = MetricsPusher::new(&push_url, "static"); - pusher.spawn(http_metrics.clone(), prometheus_handle.clone(), std::time::Duration::from_secs(15)); + pusher.spawn( + http_metrics.clone(), + prometheus_handle.clone(), + std::time::Duration::from_secs(15), + ); info!("Metrics pusher started (interval 15s, url: {})", push_url); } @@ -138,7 +142,14 @@ async fn main() -> anyhow::Result<()> { println!("Static file server starting..."); println!(" Root: {:?}", cfg.root); println!(" Bind: {}", bind); - println!(" CORS: {}", if cfg.cors_enabled { "enabled" } else { "disabled" }); + println!( + " CORS: {}", + if cfg.cors_enabled { + "enabled" + } else { + "disabled" + } + ); // Ensure all directories exist for name in ["avatar", "blob", "media", "static"] {