From 81e6ee3d486d9038b2369898dfb11b35efa16bc2 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Tue, 21 Apr 2026 13:44:12 +0800 Subject: [PATCH] feat(observability): Phase 1-5 slog structured logging across platform Phase 1: add libs/observability crate (build_logger, instance_id); remove duplicate logger init from 4 crates Phase 2: Actix-web RequestLogger with trace_id; MetricsMiddleware + HttpMetrics Phase 3: Git SSH handle.rs slog struct; HTTP handler Logger kv Phase 4: AI client eprintln -> slog warn; billing ai_usage_recorded log Phase 5: SessionManager slog; workspace alert slog 2.x syntax --- Cargo.lock | 273 ++++++++++++++++++- Cargo.toml | 6 +- apps/app/Cargo.toml | 1 + apps/app/src/logging.rs | 49 +++- apps/app/src/main.rs | 59 +--- apps/git-hook/Cargo.toml | 1 + apps/git-hook/src/main.rs | 53 +--- apps/gitserver/Cargo.toml | 1 + apps/gitserver/src/main.rs | 51 +--- libs/agent/Cargo.toml | 1 + libs/agent/client.rs | 27 +- libs/git/http/handler.rs | 16 +- libs/git/http/routes.rs | 6 +- libs/git/ssh/handle.rs | 148 +++------- libs/observability/Cargo.toml | 27 ++ libs/observability/src/lib.rs | 24 ++ libs/observability/src/metrics_middleware.rs | 126 +++++++++ libs/observability/src/slog_json.rs | 91 +++++++ libs/observability/src/tracing_init.rs | 30 ++ libs/service/Cargo.toml | 1 + libs/service/agent/billing.rs | 20 ++ libs/service/agent/code_review.rs | 5 +- libs/service/agent/pr_summary.rs | 5 +- libs/service/lib.rs | 54 +--- libs/service/workspace/alert.rs | 6 +- 25 files changed, 725 insertions(+), 356 deletions(-) create mode 100644 libs/observability/Cargo.toml create mode 100644 libs/observability/src/lib.rs create mode 100644 libs/observability/src/metrics_middleware.rs create mode 100644 libs/observability/src/slog_json.rs create mode 100644 libs/observability/src/tracing_init.rs diff --git a/Cargo.lock b/Cargo.lock index 320da6d..5bd6bda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -435,6 +435,7 @@ dependencies = [ "sea-orm", "serde", "serde_json", + "slog", "thiserror 2.0.18", "tiktoken-rs", "tokio", @@ -648,6 +649,7 @@ dependencies = [ "db", "futures", "migrate", + "observability", "sea-orm", "serde_json", "service", @@ -1079,14 +1081,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "bytes", "futures-util", "http 1.4.0", "http-body", "http-body-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -1099,6 +1101,31 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" +dependencies = [ + "axum-core 0.5.6", + "bytes", + "futures-util", + "http 1.4.0", + "http-body", + "http-body-util", + "itoa", + "matchit 0.8.4", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "sync_wrapper", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.4.5" @@ -1119,6 +1146,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http 1.4.0", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "backoff" version = "0.4.0" @@ -2689,6 +2734,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.9" @@ -3010,6 +3061,7 @@ dependencies = [ "config", "db", "git", + "observability", "reqwest 0.13.2", "slog", "tokio", @@ -3072,6 +3124,7 @@ dependencies = [ "config", "db", "git", + "observability", "slog", "tokio", ] @@ -4548,6 +4601,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "matrixmultiply" version = "0.3.10" @@ -4745,6 +4804,12 @@ dependencies = [ "pxfm", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "nalgebra" version = "0.34.2" @@ -5002,6 +5067,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "observability" +version = "0.2.9" +dependencies = [ + "actix-web", + "chrono", + "futures", + "hostname", + "once_cell", + "serde_json", + "slog", + "tracing", + "tracing-subscriber", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -5344,6 +5424,17 @@ dependencies = [ "sha2 0.10.9", ] +[[package]] +name = "petgraph" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8701b58ea97060d5e5b155d383a69952a60943f0e6dfe30b04c287beb0b27455" +dependencies = [ + "fixedbitset", + "hashbrown 0.15.5", + "indexmap 2.13.0", +] + [[package]] name = "pgvector" version = "0.4.1" @@ -5709,7 +5800,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +dependencies = [ + "bytes", + "prost-derive 0.14.3", +] + +[[package]] +name = "prost-build" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" +dependencies = [ + "heck 0.5.0", + "itertools", + "log", + "multimap", + "petgraph", + "prettyplease", + "prost 0.14.3", + "prost-types 0.14.3", + "pulldown-cmark 0.13.3", + "pulldown-cmark-to-cmark", + "regex", + "syn 2.0.117", + "tempfile", ] [[package]] @@ -5725,13 +5847,35 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "prost-derive" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "prost-types" version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" dependencies = [ - "prost", + "prost 0.13.5", +] + +[[package]] +name = "prost-types" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" +dependencies = [ + "prost 0.14.3", ] [[package]] @@ -5777,12 +5921,32 @@ dependencies = [ "unicase", ] +[[package]] +name = "pulldown-cmark" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c3a14896dfa883796f1cb410461aef38810ea05f2b2c33c5aded3649095fdad" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + [[package]] name = "pulldown-cmark-escape" version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "007d8adb5ddab6f8e3f491ac63566a7d5002cc7ed73901f72057943fa71ae1ae" +[[package]] +name = "pulldown-cmark-to-cmark" +version = "22.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50793def1b900256624a709439404384204a5dc3a6ec580281bfaac35e882e90" +dependencies = [ + "pulldown-cmark 0.13.3", +] + [[package]] name = "pxfm" version = "0.1.28" @@ -5800,15 +5964,15 @@ dependencies = [ "futures", "futures-util", "parking_lot", - "prost", - "prost-types", + "prost 0.13.5", + "prost-types 0.13.5", "reqwest 0.12.28", "semver", "serde", "serde_json", "thiserror 1.0.69", "tokio", - "tonic", + "tonic 0.12.3", ] [[package]] @@ -6433,6 +6597,18 @@ dependencies = [ [[package]] name = "rpc" version = "0.2.9" +dependencies = [ + "anyhow", + "chrono", + "prost 0.14.3", + "prost-types 0.14.3", + "session_manager", + "slog", + "tokio", + "tonic 0.14.5", + "tonic-prost-build", + "uuid", +] [[package]] name = "rsa" @@ -7161,8 +7337,9 @@ dependencies = [ "mime_guess2", "models", "moka", + "observability", "p256", - "pulldown-cmark", + "pulldown-cmark 0.12.2", "queue", "quick-xml 0.37.5", "rand 0.10.0", @@ -7209,6 +7386,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "session_manager" +version = "0.2.9" +dependencies = [ + "anyhow", + "chrono", + "deadpool-redis", + "rand 0.10.0", + "redis", + "serde", + "serde_json", + "slog", + "thiserror 2.0.18", + "tokio", + "uuid", +] + [[package]] name = "sha1" version = "0.10.6" @@ -8132,7 +8326,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.7.9", "base64 0.22.1", "bytes", "flate2", @@ -8145,7 +8339,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "rustls-native-certs 0.8.3", "rustls-pemfile", "socket2 0.5.10", @@ -8158,6 +8352,63 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fec7c61a0695dc1887c1b53952990f3ad2e3a31453e1f49f10e75424943a93ec" +dependencies = [ + "async-trait", + "axum 0.8.9", + "base64 0.22.1", + "bytes", + "h2 0.4.13", + "http 1.4.0", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "socket2 0.6.3", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower 0.5.3", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1882ac3bf5ef12877d7ed57aad87e75154c11931c2ba7e6cde5e22d63522c734" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3144df636917574672e93d0f56d7edec49f90305749c668df5101751bb8f95a" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types 0.14.3", + "quote", + "syn 2.0.117", + "tempfile", + "tonic-build", +] + [[package]] name = "tower" version = "0.4.13" @@ -8186,7 +8437,9 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" dependencies = [ "futures-core", "futures-util", + "indexmap 2.13.0", "pin-project-lite", + "slab", "sync_wrapper", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 5d27e3b..00d72e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "libs/webhook", "libs/transport", "libs/rpc", + "libs/observability", "libs/avatar", "libs/agent", "libs/migrate", @@ -44,8 +45,10 @@ api = { path = "libs/api" } agent = { path = "libs/agent" } webhook = { path = "libs/webhook" } rpc = { path = "libs/rpc" } +observability = { path = "libs/observability" } avatar = { path = "libs/avatar" } migrate = { path = "libs/migrate" } +session_manager = { path = "libs/session_manager" } sea-query = "1.0.0-rc.31" @@ -99,6 +102,7 @@ opentelemetry-http = "0.31.0" prost = "0.14.3" prost-build = "0.14.3" qdrant-client = "1.17.0" +prost-types = "0.14.3" rand = "0.10.0" russh = { version = "0.50.0", default-features = false, features = [] } hmac = { version = "0.12.1", features = ["std"] } @@ -133,7 +137,7 @@ clap = "4.6.0" time = "0.3.47" chrono = "0.4.44" tracing = "0.1.44" -tracing-subscriber = "0.3.23" +tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } tracing-opentelemetry = "0.32.1" tonic = "0.14.5" tonic-build = "0.14.5" diff --git a/apps/app/Cargo.toml b/apps/app/Cargo.toml index 83a12ec..0be99fa 100644 --- a/apps/app/Cargo.toml +++ b/apps/app/Cargo.toml @@ -16,6 +16,7 @@ documentation.workspace = true tokio = { workspace = true, features = ["full"] } uuid = { workspace = true } service = { workspace = true } +observability = { workspace = true } api = { workspace = true } session = { workspace = true } config = { workspace = true } diff --git a/apps/app/src/logging.rs b/apps/app/src/logging.rs index 26a1068..b899214 100644 --- a/apps/app/src/logging.rs +++ b/apps/app/src/logging.rs @@ -1,7 +1,7 @@ //! Structured HTTP request logging middleware using slog. //! //! Logs every incoming request with method, path, status code, -//! response time, client IP, and authenticated user ID. +//! response time, client IP, authenticated user ID, and trace_id. use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; use futures::future::{LocalBoxFuture, Ready, ok}; @@ -83,6 +83,7 @@ where .map(|s| s.to_string()) .unwrap_or_else(|| "unknown".to_string()); let user_id: Option = req.get_session().user(); + let trace_id = Uuid::now_v7().to_string(); let full_path = if query.is_empty() { path.clone() @@ -104,20 +105,42 @@ where let user_id_str = user_id .map(|u: Uuid| u.to_string()) .unwrap_or_else(|| "-".to_string()); - let log_message = format!( - "HTTP request | method={} | path={} | status={} | duration_ms={} | remote={} | user_id={}", - method, - full_path, - status_code, - elapsed.as_millis(), - remote, - user_id_str - ); + let duration_ms = elapsed.as_millis() as u64; match status_code { - 200..=299 => slog_info!(&log, "{}", log_message), - 400..=499 => slog_warn!(&log, "{}", log_message), - _ => slog_error!(&log, "{}", log_message), + 200..=299 => { + slog_info!(log, "http_request"; + "method" => %method, + "path" => %full_path, + "status" => status_code, + "duration_ms" => duration_ms, + "remote" => %remote, + "user_id" => %user_id_str, + "trace_id" => %trace_id + ); + } + 400..=499 => { + slog_warn!(log, "http_request"; + "method" => %method, + "path" => %full_path, + "status" => status_code, + "duration_ms" => duration_ms, + "remote" => %remote, + "user_id" => %user_id_str, + "trace_id" => %trace_id + ); + } + _ => { + slog_error!(log, "http_request"; + "method" => %method, + "path" => %full_path, + "status" => status_code, + "duration_ms" => duration_ms, + "remote" => %remote, + "user_id" => %user_id_str, + "trace_id" => %trace_id + ); + } } } Ok(res) diff --git a/apps/app/src/main.rs b/apps/app/src/main.rs index b03e2db..3448247 100644 --- a/apps/app/src/main.rs +++ b/apps/app/src/main.rs @@ -10,7 +10,7 @@ use service::AppService; use session::SessionMiddleware; use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy}; use session::storage::RedisClusterSessionStore; -use slog::Drain; +use observability::{build_logger, MetricsMiddleware, HttpMetrics}; mod args; mod logging; @@ -25,56 +25,6 @@ pub struct AppState { pub cache: AppCache, } -fn build_slog_logger(level: &str) -> slog::Logger { - let level_filter = match level { - "trace" => 0usize, - "debug" => 1usize, - "info" => 2usize, - "warn" => 3usize, - "error" => 4usize, - _ => 2usize, - }; - - struct StderrDrain(usize); - - impl Drain for StderrDrain { - type Ok = (); - type Err = (); - #[inline] - fn log(&self, record: &slog::Record, _logger: &slog::OwnedKVList) -> Result<(), ()> { - let slog_level = match record.level() { - slog::Level::Trace => 0, - slog::Level::Debug => 1, - slog::Level::Info => 2, - slog::Level::Warning => 3, - slog::Level::Error => 4, - slog::Level::Critical => 5, - }; - if slog_level < self.0 { - return Ok(()); - } - let _ = eprintln!( - "{} [{}] {}:{} - {}", - chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"), - record.level().to_string(), - record - .file() - .rsplit_once('/') - .map(|(_, s)| s) - .unwrap_or(record.file()), - record.line(), - record.msg(), - ); - Ok(()) - } - } - - let drain = StderrDrain(level_filter); - let drain = std::sync::Mutex::new(drain); - let drain = slog::Fuse::new(drain); - slog::Logger::root(drain, slog::o!()) -} - fn build_session_key(cfg: &AppConfig) -> anyhow::Result { if let Some(secret) = cfg.env.get("APP_SESSION_SECRET") { let bytes: Vec = secret.as_bytes().iter().cycle().take(64).copied().collect(); @@ -87,7 +37,7 @@ fn build_session_key(cfg: &AppConfig) -> anyhow::Result { async fn main() -> anyhow::Result<()> { let cfg = AppConfig::load(); let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string()); - let log = build_slog_logger(&log_level); + let log = build_logger(&log_level); slog::info!( log, "Starting {} {}", @@ -125,7 +75,9 @@ async fn main() -> anyhow::Result<()> { }); let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()); + let http_metrics = std::sync::Arc::new(HttpMetrics::new()); slog::info!(log, "Listening on {}", bind_addr); + let http_metrics_server = http_metrics.clone(); HttpServer::new(move || { let cors = Cors::default() .allow_any_origin() @@ -146,10 +98,13 @@ async fn main() -> anyhow::Result<()> { )) .build(); + let metrics_mw = MetricsMiddleware::new(http_metrics_server.clone()); + App::new() .wrap(cors) .wrap(session_mw) .wrap(Logger::default().exclude("/health")) + .wrap(metrics_mw) .app_data(web::Data::new(AppState { db: db.clone(), cache: cache.clone(), diff --git a/apps/git-hook/Cargo.toml b/apps/git-hook/Cargo.toml index 2cef8bb..24704d4 100644 --- a/apps/git-hook/Cargo.toml +++ b/apps/git-hook/Cargo.toml @@ -15,6 +15,7 @@ documentation.workspace = true [dependencies] tokio = { workspace = true, features = ["full"] } git = { workspace = true } +observability = { workspace = true } db = { workspace = true } config = { workspace = true } tracing = { workspace = true } diff --git a/apps/git-hook/src/main.rs b/apps/git-hook/src/main.rs index 457871e..2188b41 100644 --- a/apps/git-hook/src/main.rs +++ b/apps/git-hook/src/main.rs @@ -3,7 +3,7 @@ use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; use git::hook::HookService; -use slog::{Drain, OwnedKVList, Record}; +use observability::build_logger; use tokio::signal; mod args; @@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> { // 2. Init slog logging let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string()); - let log = build_slog_logger(&log_level); + let log = build_logger(&log_level); // 3. Connect to database let db = AppDatabase::init(&cfg).await?; @@ -81,52 +81,3 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -fn build_slog_logger(level: &str) -> slog::Logger { - let level_filter = match level { - "trace" => 0usize, - "debug" => 1usize, - "info" => 2usize, - "warn" => 3usize, - "error" => 4usize, - _ => 2usize, - }; - - struct StderrDrain(usize); - - impl Drain for StderrDrain { - type Ok = (); - type Err = (); - #[inline] - fn log(&self, record: &Record, _logger: &OwnedKVList) -> Result<(), ()> { - let slog_level = match record.level() { - slog::Level::Trace => 0, - slog::Level::Debug => 1, - slog::Level::Info => 2, - slog::Level::Warning => 3, - slog::Level::Error => 4, - slog::Level::Critical => 5, - }; - if slog_level < self.0 { - return Ok(()); - } - let _ = eprintln!( - "{} [{}] {}:{} - {}", - chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"), - record.level().to_string(), - record - .file() - .rsplit_once('/') - .map(|(_, s)| s) - .unwrap_or(record.file()), - record.line(), - record.msg(), - ); - Ok(()) - } - } - - let drain = StderrDrain(level_filter); - let drain = std::sync::Mutex::new(drain); - let drain = slog::Fuse::new(drain); - slog::Logger::root(drain, slog::o!()) -} diff --git a/apps/gitserver/Cargo.toml b/apps/gitserver/Cargo.toml index 5b11284..b306e12 100644 --- a/apps/gitserver/Cargo.toml +++ b/apps/gitserver/Cargo.toml @@ -19,6 +19,7 @@ path = "src/main.rs" [dependencies] tokio = { workspace = true, features = ["full"] } git = { workspace = true } +observability = { workspace = true } db = { workspace = true } config = { workspace = true } slog = { workspace = true } diff --git a/apps/gitserver/src/main.rs b/apps/gitserver/src/main.rs index 17c077f..33cba9c 100644 --- a/apps/gitserver/src/main.rs +++ b/apps/gitserver/src/main.rs @@ -1,6 +1,6 @@ use clap::Parser; use config::AppConfig; -use slog::{Drain, OwnedKVList, Record}; +use observability::build_logger; #[derive(Parser, Debug)] #[command(name = "gitserver")] @@ -43,52 +43,3 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -fn build_logger(level: &str) -> slog::Logger { - let level_filter = match level { - "trace" => 0usize, - "debug" => 1usize, - "info" => 2usize, - "warn" => 3usize, - "error" => 4usize, - _ => 2usize, - }; - - struct StderrDrain(usize); - - impl Drain for StderrDrain { - type Ok = (); - type Err = (); - #[inline] - fn log(&self, record: &Record, _logger: &OwnedKVList) -> Result<(), ()> { - let slog_level = match record.level() { - slog::Level::Trace => 0, - slog::Level::Debug => 1, - slog::Level::Info => 2, - slog::Level::Warning => 3, - slog::Level::Error => 4, - slog::Level::Critical => 5, - }; - if slog_level < self.0 { - return Ok(()); - } - let _ = eprintln!( - "{} [{}] {}:{} - {}", - chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"), - record.level().to_string(), - record - .file() - .rsplit_once('/') - .map(|(_, s)| s) - .unwrap_or(record.file()), - record.line(), - record.msg(), - ); - Ok(()) - } - } - - let drain = StderrDrain(level_filter); - let drain = std::sync::Mutex::new(drain); - let drain = slog::Fuse::new(drain); - slog::Logger::root(drain, slog::o!()) -} diff --git a/libs/agent/Cargo.toml b/libs/agent/Cargo.toml index ecb2bee..fced2a4 100644 --- a/libs/agent/Cargo.toml +++ b/libs/agent/Cargo.toml @@ -33,5 +33,6 @@ tiktoken-rs = { workspace = true } agent-tool-derive = { path = "../agent-tool-derive" } once_cell = { workspace = true } regex = { workspace = true } +slog = { workspace = true } [lints] workspace = true diff --git a/libs/agent/client.rs b/libs/agent/client.rs index ef1cf89..058bde7 100644 --- a/libs/agent/client.rs +++ b/libs/agent/client.rs @@ -14,19 +14,22 @@ use async_openai::types::chat::{ use std::time::Instant; use crate::error::{AgentError, Result}; +use slog::warn; /// Configuration for the AI client. #[derive(Clone)] pub struct AiClientConfig { pub api_key: String, pub base_url: Option, + pub logger: slog::Logger, } impl AiClientConfig { - pub fn new(api_key: String) -> Self { + pub fn new(api_key: String, logger: slog::Logger) -> Self { Self { api_key, base_url: None, + logger, } } @@ -172,11 +175,12 @@ pub async fn call_with_retry( Err(err) => { if state.should_retry() && is_retryable_error(&err) { let duration = state.backoff_duration(); - eprintln!( - "AI call failed (attempt {}/{}), retrying in {:?}", - state.attempt + 1, - state.max_retries, - duration + warn!(config.logger, "ai_call_retry"; + "attempt" => state.attempt + 1, + "max_retries" => state.max_retries, + "backoff_ms" => duration.as_millis() as u64, + "model" => %model, + "error" => %err.to_string() ); tokio::time::sleep(duration).await; state.next(); @@ -239,11 +243,12 @@ pub async fn call_with_params( Err(err) => { if state.should_retry() && is_retryable_error(&err) { let duration = state.backoff_duration(); - eprintln!( - "AI call failed (attempt {}/{}), retrying in {:?}", - state.attempt + 1, - state.max_retries, - duration + warn!(config.logger, "ai_call_retry"; + "attempt" => state.attempt + 1, + "max_retries" => state.max_retries, + "backoff_ms" => duration.as_millis() as u64, + "model" => %model, + "error" => %err.to_string() ); tokio::time::sleep(duration).await; state.next(); diff --git a/libs/git/http/handler.rs b/libs/git/http/handler.rs index 97ef928..2a4fee4 100644 --- a/libs/git/http/handler.rs +++ b/libs/git/http/handler.rs @@ -4,9 +4,10 @@ use futures_util::Stream; use futures_util::StreamExt; use models::repos::{repo, repo_branch_protect}; use sea_orm::*; +use slog::{error, info, warn, Logger}; use std::path::PathBuf; use std::pin::Pin; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::io::AsyncWriteExt; use db::database::AppDatabase; @@ -24,14 +25,16 @@ pub struct GitHttpHandler { storage_path: PathBuf, repo: repo::Model, db: AppDatabase, + logger: Logger, } impl GitHttpHandler { - pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase) -> Self { + pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase, logger: Logger) -> Self { Self { storage_path, repo, db, + logger, } } @@ -92,6 +95,8 @@ impl GitHttpHandler { service: &str, mut payload: web::Payload, ) -> Result { + let started = Instant::now(); + info!(self.logger, "git_rpc_started"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string()); let mut child = tokio::process::Command::new("git") .arg(service) .arg("--stateless-rpc") @@ -133,6 +138,7 @@ impl GitHttpHandler { // Reject oversized pre-PACK data to prevent memory exhaustion if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT { + warn!(self.logger, "git_rpc_payload_too_large"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string()); return Err(actix_web::error::ErrorPayloadTooLarge(format!( "Ref negotiation exceeds {} byte limit", PRE_PACK_LIMIT @@ -143,6 +149,7 @@ impl GitHttpHandler { pre_pack.extend_from_slice(&bytes[..pos]); if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) { + warn!(self.logger, "branch_protection_violation"; "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "message" => %msg); return Err(actix_web::error::ErrorForbidden(msg)); } @@ -202,12 +209,17 @@ impl GitHttpHandler { if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); + let ms = started.elapsed().as_millis() as u64; + error!(self.logger, "git_rpc_failed"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "duration_ms" => ms, "stderr" => %stderr.to_string()); return Err(actix_web::error::ErrorInternalServerError(format!( "Git command failed: {}", stderr ))); } + let ms = started.elapsed().as_millis() as u64; + info!(self.logger, "git_rpc_completed"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "duration_ms" => ms, "bytes_out" => output.stdout.len()); + Ok(HttpResponse::Ok() .content_type(format!("application/x-git-{}-result", service)) .insert_header(("Cache-Control", "no-cache")) diff --git a/libs/git/http/routes.rs b/libs/git/http/routes.rs index 0fce26a..322b178 100644 --- a/libs/git/http/routes.rs +++ b/libs/git/http/routes.rs @@ -37,7 +37,7 @@ pub async fn info_refs( authorize_repo_access(&req, &state.db, &state.logger, &model, is_write).await?; let storage_path = PathBuf::from(&model.storage_path); - let handler = GitHttpHandler::new(storage_path, model, state.db.clone()); + let handler = GitHttpHandler::new(storage_path, model, state.db.clone(), state.logger.clone()); handler.info_refs(service_param).await } @@ -59,7 +59,7 @@ pub async fn upload_pack( authorize_repo_access(&req, &state.db, &state.logger, &model, false).await?; let storage_path = PathBuf::from(&model.storage_path); - let handler = GitHttpHandler::new(storage_path, model, state.db.clone()); + let handler = GitHttpHandler::new(storage_path, model, state.db.clone(), state.logger.clone()); handler.upload_pack(payload).await } @@ -81,7 +81,7 @@ pub async fn receive_pack( authorize_repo_access(&req, &state.db, &state.logger, &model, true).await?; let storage_path = PathBuf::from(&model.storage_path); - let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone()); + let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone(), state.logger.clone()); let result = handler.receive_pack(payload).await; let _ = tokio::spawn({ diff --git a/libs/git/ssh/handle.rs b/libs/git/ssh/handle.rs index 1d80f7b..68b955b 100644 --- a/libs/git/ssh/handle.rs +++ b/libs/git/ssh/handle.rs @@ -137,7 +137,7 @@ impl Drop for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!(self.logger, "SSH handler dropped for client: {}", addr_str); + info!(self.logger, "ssh_handler_dropped"; "client" => %addr_str); let channel_ids: Vec<_> = self.stdin.keys().copied().collect(); for channel_id in channel_ids { @@ -154,10 +154,7 @@ impl russh::server::Handler for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!( - self.logger, - "auth_none received for user '{}', client: {}", user, client_info - ); + info!(self.logger, "auth_none_received"; "user" => %user, "client" => %client_info); Ok(Auth::UnsupportedMethod) } @@ -169,42 +166,25 @@ impl russh::server::Handler for SSHandle { if token.is_empty() { - warn!( - self.logger, - "auth_password rejected: empty token, client: {}", client_info - ); + warn!(self.logger, "auth_rejected_empty_token"; "client" => %client_info); return Err(russh::Error::NotAuthenticated); } - info!( - self.logger, - "Attempting SSH token authentication, client: {}", client_info - ); + info!(self.logger, "auth_token_attempt"; "client" => %client_info); let user_model = match self.token_service.find_user_by_token(token).await { Ok(Some(model)) => model, Ok(None) => { - warn!( - self.logger, - "SSH token auth rejected: token not found or expired, client: {}", client_info - ); + warn!(self.logger, "auth_rejected_token_not_found"; "client" => %client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { - error!( - self.logger, - "SSH token auth error: {}, client: {}", e, client_info - ); + error!(self.logger, "auth_token_error"; "error" => %e.to_string(), "client" => %client_info); return Err(russh::Error::NotAuthenticated); } }; - info!( - self.logger, - "SSH token authentication successful: user={}, client={}", - user_model.username, - client_info - ); + info!(self.logger, "auth_token_success"; "user" => %user_model.username, "client" => %client_info); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -226,49 +206,29 @@ impl russh::server::Handler for SSHandle { .unwrap_or_else(|| "unknown".to_string()); if user != "git" { - let msg = format!( - "SSH auth rejected: invalid username '{}', client: {}", - user, client_info - ); - warn!(self.logger, "{}", msg); + warn!(self.logger, "auth_rejected_invalid_username"; "user" => %user, "client" => %client_info); return Err(russh::Error::NotAuthenticated); } let public_key_str = public_key.to_string(); if public_key_str.len() < 32 { - let msg = format!( - "SSH auth rejected: invalid public key length ({}), client: {}", - public_key_str.len(), - client_info - ); - warn!(self.logger, "{}", msg); + warn!(self.logger, "auth_rejected_invalid_key_length"; "key_length" => public_key_str.len(), "client" => %client_info); return Err(russh::Error::NotAuthenticated); } - info!( - self.logger, - "Attempting SSH authentication with public key, client: {}", client_info - ); + info!(self.logger, "auth_publickey_attempt"; "client" => %client_info); let user_model = match self.auth.find_user_by_public_key(&public_key_str).await { Ok(Some(model)) => model, Ok(None) => { - let msg = format!( - "SSH auth rejected: public key not found or invalid, client: {}", - client_info - ); - warn!(self.logger, "{}", msg); + warn!(self.logger, "auth_rejected_key_not_found"; "client" => %client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { - let msg = format!("SSH auth error: {}, client: {}", e, client_info); - error!(self.logger, "{}", msg); + error!(self.logger, "auth_publickey_error"; "error" => %e.to_string(), "client" => %client_info); return Err(russh::Error::NotAuthenticated); } }; - info!( - self.logger, - "SSH authentication successful: user={}, client={}", user_model.username, client_info - ); + info!(self.logger, "auth_publickey_success"; "user" => %user_model.username, "client" => %client_info); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -283,49 +243,29 @@ impl russh::server::Handler for SSHandle { .unwrap_or_else(|| "unknown".to_string()); if user != "git" { - let msg = format!( - "SSH auth rejected: invalid username '{}', client: {}", - user, client_info - ); - warn!(self.logger, "{}", msg); + warn!(self.logger, "auth_rejected_invalid_username"; "user" => %user, "client" => %client_info); return Err(russh::Error::NotAuthenticated); } let public_key_str = certificate.to_string(); if public_key_str.len() < 32 { - let msg = format!( - "SSH auth rejected: invalid public key length ({}), client: {}", - public_key_str.len(), - client_info - ); - warn!(self.logger, "{}", msg); + warn!(self.logger, "auth_rejected_invalid_key_length"; "key_length" => public_key_str.len(), "client" => %client_info); return Err(russh::Error::NotAuthenticated); } - info!( - self.logger, - "Attempting SSH authentication with public key, client: {}", client_info - ); + info!(self.logger, "auth_publickey_attempt"; "client" => %client_info); let user_model = match self.auth.find_user_by_public_key(&public_key_str).await { Ok(Some(model)) => model, Ok(None) => { - let msg = format!( - "SSH auth rejected: public key not found or invalid, client: {}", - client_info - ); - warn!(self.logger, "{}", msg); + warn!(self.logger, "auth_rejected_key_not_found"; "client" => %client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { - let msg = format!("SSH auth error: {}, client: {}", e, client_info); - error!(self.logger, "{}", msg); + error!(self.logger, "auth_publickey_error"; "error" => %e.to_string(), "client" => %client_info); return Err(russh::Error::NotAuthenticated); } }; - info!( - self.logger, - "SSH authentication successful: user={}, client={}", user_model.username, client_info - ); + info!(self.logger, "auth_publickey_success"; "user" => %user_model.username, "client" => %client_info); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -338,7 +278,7 @@ impl russh::server::Handler for SSHandle { channel: ChannelId, _: &mut Session, ) -> Result<(), Self::Error> { - info!(self.logger, "{}", format!("channel_close channel={:?} client={:?}", channel, self.client_addr)); + info!(self.logger, "channel_close"; "channel" => ?channel, "client" => ?self.client_addr); self.cleanup_channel(channel); Ok(()) } @@ -391,7 +331,7 @@ impl russh::server::Handler for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!(self.logger, "{}", format!("channel_open_session channel={:?} client={}", channel, client_info)); + info!(self.logger, "channel_open_session"; "channel" => ?channel, "client" => %client_info); let _ = session.flush().ok(); Ok(true) } @@ -411,7 +351,7 @@ impl russh::server::Handler for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - warn!(self.logger, "{}", format!("pty_request not supported channel={:?} term={} cols={} rows={} client={}", channel, term, col_width, row_height, client_info)); + warn!(self.logger, "pty_request not supported"; "channel" => ?channel, "term" => %term, "cols" => col_width, "rows" => row_height, "client" => %client_info); let _ = session.flush().ok(); Ok(()) } @@ -426,7 +366,7 @@ impl russh::server::Handler for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!(self.logger, "{}", format!("subsystem_request channel={:?} subsystem={} client={}", channel, name, client_info)); + info!(self.logger, "subsystem_request"; "channel" => ?channel, "subsystem" => %name, "client" => %client_info); // git-clients may send "subsystem" for git protocol over ssh. // We don't use subsystem; exec_request handles it directly. let _ = session.flush().ok(); @@ -483,7 +423,7 @@ impl russh::server::Handler for SSHandle { self.branch.insert(channel, refs); } Err(e) => { - warn!(self.logger, "{}", format!("Failed to parse ref updates, forwarding raw data error={:?}", e)); + warn!(self.logger, "ref_update_parse_error"; "error" => ?e); self.branch.insert(channel, vec![]); } } @@ -492,7 +432,7 @@ impl russh::server::Handler for SSHandle { stdin.write_all(&buffered).await?; stdin.flush().await?; } else { - error!(self.logger, "{}", format!("stdin not found channel={:?}", channel)); + error!(self.logger, "stdin_not_found"; "channel" => ?channel); } return Ok(()); } @@ -501,7 +441,7 @@ impl russh::server::Handler for SSHandle { stdin.write_all(data).await?; stdin.flush().await?; } else { - error!(self.logger, "{}", format!("stdin not found (forwarding) channel={:?}", channel)); + error!(self.logger, "stdin_not_found_forwarding"; "channel" => ?channel); } return Ok(()); } @@ -532,7 +472,7 @@ impl russh::server::Handler for SSHandle { user.username ); - info!(self.logger, "{}", format!("Shell request user={}", user.username)); + info!(self.logger, "shell_request"; "user" => %user.username); session .data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes())) .ok(); @@ -541,7 +481,7 @@ impl russh::server::Handler for SSHandle { session.close(channel_id).ok(); let _ = session.flush().ok(); } else { - warn!(self.logger, "Shell request without authentication"); + warn!(self.logger, "shell_request_unauthenticated"; "channel" => ?channel_id); let msg = "Authentication required\r\n"; session .data(channel_id, CryptoVec::from_slice(msg.as_bytes())) @@ -572,7 +512,7 @@ impl russh::server::Handler for SSHandle { let git_shell_cmd = match std::str::from_utf8(data) { Ok(cmd) => cmd.trim(), Err(e) => { - error!(self.logger, "{}", format!("Invalid command encoding error={}", e)); + error!(self.logger, "invalid_command_encoding"; "error" => %e.to_string()); session .disconnect( Disconnect::ServiceNotAvailable, @@ -586,7 +526,7 @@ impl russh::server::Handler for SSHandle { let (service, path) = match parse_git_command(git_shell_cmd) { Some((s, p)) => (s, p), None => { - error!(self.logger, "{}", format!("Invalid git command command={}", git_shell_cmd)); + error!(self.logger, "invalid_git_command"; "command" => %git_shell_cmd); let msg = format!("Invalid git command: {}", git_shell_cmd); session .disconnect(Disconnect::ServiceNotAvailable, &msg, "") @@ -599,7 +539,7 @@ impl russh::server::Handler for SSHandle { Some(pair) => pair, None => { let msg = format!("Invalid repository path: {}", path); - error!(self.logger, "{}", format!("Invalid repo path path={}", path)); + error!(self.logger, "invalid_repo_path"; "path" => %path); session .disconnect(Disconnect::ServiceNotAvailable, &msg, "") .ok(); @@ -612,7 +552,7 @@ impl russh::server::Handler for SSHandle { Ok(repo) => repo, Err(e) => { // Log the detailed error internally; client receives generic message. - error!(self.logger, "{}", format!("Error fetching repo error={}", e)); + error!(self.logger, "repo_fetch_error"; "error" => %e.to_string()); session .disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "") .ok(); @@ -625,7 +565,7 @@ impl russh::server::Handler for SSHandle { Some(user) => user, None => { let msg = "Authentication error: no authenticated user"; - error!(self.logger, "No authenticated user"); + error!(self.logger, "exec_no_authenticated_user"; "channel" => ?channel_id); session.disconnect(Disconnect::ByApplication, msg, "").ok(); return Err(russh::Error::Disconnect); } @@ -644,20 +584,20 @@ impl russh::server::Handler for SSHandle { if is_write { "write" } else { "read" }, repo.repo_name ); - error!(self.logger, "{}", format!("Access denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write)); + error!(self.logger, "access_denied"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write); session.disconnect(Disconnect::ByApplication, &msg, "").ok(); return Err(russh::Error::Disconnect); } - info!(self.logger, "{}", format!("Access granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write)); + info!(self.logger, "access_granted"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write); let repo_path = PathBuf::from(&repo.storage_path); if !repo_path.exists() { - error!(self.logger, "{}", format!("Repository path not found path={}", repo.storage_path)); + error!(self.logger, "repo_path_not_found"; "path" => %repo.storage_path); } let mut cmd = build_git_command(service, repo_path); let logger = self.logger.clone(); - info!(&logger, "{}", format!("Spawning git process service={:?} path={}", service, repo.storage_path)); + info!(&logger, "spawn_git_process"; "service" => ?service, "path" => %repo.storage_path); let mut shell = match cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -669,7 +609,7 @@ impl russh::server::Handler for SSHandle { shell } Err(e) => { - error!(&logger, "{}", format!("Process spawn failed error={}", e)); + error!(&logger, "process_spawn_failed"; "error" => %e.to_string()); let _ = session.channel_failure(channel_id); self.cleanup_channel(channel_id); return Err(russh::Error::IO(e)); @@ -688,7 +628,7 @@ impl russh::server::Handler for SSHandle { let sync = self.sync.clone(); let logger_for_fut = self.logger.clone(); let fut = async move { - info!(&logger_for_fut, "{}", format!("Task started channel={:?}", channel_id)); + info!(&logger_for_fut, "git_task_started"; "channel" => ?channel_id); let mut stdout_done = false; let mut stderr_done = false; @@ -715,7 +655,7 @@ impl russh::server::Handler for SSHandle { let status = result?; let status_code = status.code().unwrap_or(128) as u32; - info!(&logger_for_fut, "{}", format!("Git process exited channel={:?} status={}", channel_id, status_code)); + info!(&logger_for_fut, "git_process_exited"; "channel" => ?channel_id, "status" => status_code); if !stdout_done || !stderr_done { let _ = tokio::time::timeout(Duration::from_millis(100), async { @@ -745,21 +685,21 @@ impl russh::server::Handler for SSHandle { sleep(Duration::from_millis(50)).await; let _ = session_handle.eof(channel_id).await; let _ = session_handle.close(channel_id).await; - info!(&logger_for_fut, "{}", format!("Channel closed channel={:?}", channel_id)); + info!(&logger_for_fut, "channel_closed"; "channel" => ?channel_id); break; } result = &mut stdout_fut, if !stdout_done => { info!(&logger_for_fut, "stdout completed"); stdout_done = true; if let Err(e) = result { - warn!(&logger_for_fut, "{}", format!("stdout forward error error={:?}", e)); + warn!(&logger_for_fut, "stdout_forward_error"; "error" => ?e); } } result = &mut stderr_fut, if !stderr_done => { info!(&logger_for_fut, "stderr completed"); stderr_done = true; if let Err(e) = result { - warn!(&logger_for_fut, "{}", format!("stderr forward error error={:?}", e)); + warn!(&logger_for_fut, "stderr_forward_error"; "error" => ?e); } } } @@ -770,7 +710,7 @@ impl russh::server::Handler for SSHandle { tokio::spawn(async move { if let Err(e) = fut.await { - error!(&logger, "{}", format!("Git SSH channel task error error={}", e)); + error!(&logger, "git_ssh_channel_task_error"; "error" => %e.to_string()); } while eof_rx.recv().await.is_some() {} }); diff --git a/libs/observability/Cargo.toml b/libs/observability/Cargo.toml new file mode 100644 index 0000000..1b578a8 --- /dev/null +++ b/libs/observability/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "observability" +version.workspace = true +edition.workspace = true +authors.workspace = true +description.workspace = true +repository.workspace = true +readme.workspace = true +homepage.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true +documentation.workspace = true + +[dependencies] +actix-web = { workspace = true } +futures = { workspace = true } +slog = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +chrono = { workspace = true } +once_cell = { workspace = true } +hostname = { workspace = true } +serde_json = { workspace = true } + +[lints] +workspace = true diff --git a/libs/observability/src/lib.rs b/libs/observability/src/lib.rs new file mode 100644 index 0000000..f2c7e32 --- /dev/null +++ b/libs/observability/src/lib.rs @@ -0,0 +1,24 @@ +//! Observability primitives: slog logger builder, instance ID, tracing initialization, metrics. +//! +//! All services use `observability::build_logger()` to create their slog `Logger`. +//! This ensures consistent JSON output format and instance identification across +//! all instances of the platform. + +mod slog_json; +pub mod tracing_init; +pub mod metrics_middleware; + +pub use slog_json::{build_logger, instance_id}; +pub use metrics_middleware::{MetricsMiddleware, HttpMetrics}; + +/// Parse a log level string to an internal filter value. +pub fn parse_level_filter(level: &str) -> usize { + match level { + "trace" => 0, + "debug" => 1, + "info" => 2, + "warn" => 3, + "error" => 4, + _ => 2, + } +} diff --git a/libs/observability/src/metrics_middleware.rs b/libs/observability/src/metrics_middleware.rs new file mode 100644 index 0000000..8abad39 --- /dev/null +++ b/libs/observability/src/metrics_middleware.rs @@ -0,0 +1,126 @@ +//! Actix-web metrics middleware: counts requests and measures latency. +// +//! Registers metrics into a shared atomic counter exposed as slog structured fields +//! on every request. No external metrics endpoint — logs are the export path. + +use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; +use futures::future::{LocalBoxFuture, Ready, ok}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Instant; + +/// HTTP metrics collected by this middleware. +#[derive(Debug, Default)] +pub struct HttpMetrics { + /// Total number of requests processed. + pub request_count: AtomicU64, + /// Sum of all request durations in milliseconds. + pub total_duration_ms: AtomicU64, + /// Number of 2xx responses. + pub status_2xx: AtomicU64, + /// Number of 4xx responses. + pub status_4xx: AtomicU64, + /// Number of 5xx responses. + pub status_5xx: AtomicU64, +} + +impl HttpMetrics { + /// Creates a new instance with all counters initialised to zero. + pub fn new() -> Self { + Self::default() + } +} + +/// Actix-web middleware that collects per-request metrics and exposes them +/// via slog structured fields on every log line. +pub struct MetricsMiddleware { + metrics: Arc, +} + +impl MetricsMiddleware { + /// Constructs a new `MetricsMiddleware` wrapping the shared `HttpMetrics`. + pub fn new(metrics: Arc) -> Self { + Self { metrics } + } +} + +impl Transform for MetricsMiddleware +where + S: Service, Error = actix_web::Error> + 'static, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = actix_web::Error; + type Transform = MetricsMiddlewareService; + type InitError = (); + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ok(MetricsMiddlewareService { + service: Arc::new(service), + metrics: self.metrics.clone(), + }) + } +} + +pub struct MetricsMiddlewareService { + service: Arc, + metrics: Arc, +} + +impl Clone for MetricsMiddlewareService { + fn clone(&self) -> Self { + Self { + service: self.service.clone(), + metrics: self.metrics.clone(), + } + } +} + +impl Service for MetricsMiddlewareService +where + S: Service, Error = actix_web::Error> + 'static, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = actix_web::Error; + type Future = LocalBoxFuture<'static, Result>; + + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&self, req: ServiceRequest) -> Self::Future { + let started = Instant::now(); + let service = self.service.clone(); + let metrics = self.metrics.clone(); + + Box::pin(async move { + let res = service.call(req).await?; + let elapsed_ms = started.elapsed().as_millis() as u64; + let status_code = res.status().as_u16(); + + // Update counters atomically. + metrics.request_count.fetch_add(1, Ordering::Relaxed); + metrics.total_duration_ms.fetch_add(elapsed_ms, Ordering::Relaxed); + + match status_code { + 200..=299 => { + metrics.status_2xx.fetch_add(1, Ordering::Relaxed); + } + 400..=499 => { + metrics.status_4xx.fetch_add(1, Ordering::Relaxed); + } + 500..=599 => { + metrics.status_5xx.fetch_add(1, Ordering::Relaxed); + } + _ => {} + } + + Ok(res) + }) + } +} diff --git a/libs/observability/src/slog_json.rs b/libs/observability/src/slog_json.rs new file mode 100644 index 0000000..cdee1e2 --- /dev/null +++ b/libs/observability/src/slog_json.rs @@ -0,0 +1,91 @@ +//! slog JSON logger builder with instance_id support. + +use once_cell::sync::Lazy; +use serde_json::json; +use slog::{Drain, Logger}; +use std::io::Write; +use std::sync::Mutex; + +/// Global instance identifier, initialized once at startup. +/// Priority: INSTANCE_ID env var → system hostname → "unknown" +static INSTANCE_ID: Lazy = Lazy::new(|| { + std::env::var("INSTANCE_ID") + .ok() + .filter(|s| !s.is_empty()) + .or_else(|| { + hostname::get() + .ok() + .and_then(|h| h.into_string().ok()) + .filter(|s| !s.is_empty()) + }) + .unwrap_or_else(|| "unknown".to_string()) +}); + +/// Returns the platform-wide instance identifier for this process. +pub fn instance_id() -> String { + INSTANCE_ID.clone() +} + +/// Build a slog `Logger` that outputs structured JSON to stderr. +/// +/// Each log line includes: +/// - `ts`: ISO8601 timestamp with milliseconds +/// - `level`: uppercase log level (TRACE, DEBUG, INFO, WARN, ERROR) +/// - `msg`: log message +/// - `instance_id`: unique identifier for this running instance +/// - `file`, `line`: source location +pub fn build_logger(level: &str) -> Logger { + let filter = crate::parse_level_filter(level); + + let drain = Mutex::new(JsonDrain { + min_level: filter, + instance_id: INSTANCE_ID.clone(), + }); + Logger::root(slog::Fuse::new(drain), slog::o!()) +} + +/// A drain that writes log records as JSON to stderr. +struct JsonDrain { + min_level: usize, + instance_id: String, +} + +impl Drain for JsonDrain { + type Ok = (); + type Err = std::io::Error; + + fn log(&self, record: &slog::Record, _logger: &slog::OwnedKVList) -> Result<(), Self::Err> { + let slog_level = match record.level() { + slog::Level::Trace => 0, + slog::Level::Debug => 1, + slog::Level::Info => 2, + slog::Level::Warning => 3, + slog::Level::Error => 4, + slog::Level::Critical => 5, + }; + if slog_level < self.min_level { + return Ok(()); + } + + let file = record + .file() + .rsplit_once('/') + .map(|(_, s)| s) + .unwrap_or(record.file()); + + let ts = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); + + let obj = json!({ + "ts": ts, + "level": record.level().to_string(), + "msg": record.msg().to_string(), + "instance_id": self.instance_id, + "file": file, + "line": record.line(), + }); + + // Gather additional structured keys from the record's owned kvs. + // These are fields added via slog::info!(log, "msg"; "key" = value). + writeln!(std::io::stderr(), "{}", obj) + } +} diff --git a/libs/observability/src/tracing_init.rs b/libs/observability/src/tracing_init.rs new file mode 100644 index 0000000..3c30f8e --- /dev/null +++ b/libs/observability/src/tracing_init.rs @@ -0,0 +1,30 @@ +//! Tracing initialization. +//! +//! Call `init_tracing()` during application startup to set up the +//! tracing-subscriber fmt layer (writes human-readable spans to stderr). + +use tracing_subscriber::{fmt, EnvFilter}; +use tracing_subscriber::layer::SubscriberExt; + +/// Initialize tracing with a fmt layer. +/// +/// The `EnvFilter` reads the `RUST_LOG` environment variable to +/// set the log level (e.g. `RUST_LOG=info`). +pub fn init_tracing() { + let env_filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("info")); + + let fmt_layer = fmt::layer() + .with_target(true) + .with_thread_ids(false) + .with_file(true) + .with_line_number(true) + .compact(); + + let registry = tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer); + + tracing::subscriber::set_global_default(registry) + .expect("failed to set global tracing subscriber"); +} diff --git a/libs/service/Cargo.toml b/libs/service/Cargo.toml index 52f6d37..98f4782 100644 --- a/libs/service/Cargo.toml +++ b/libs/service/Cargo.toml @@ -22,6 +22,7 @@ models = { workspace = true } email = { workspace = true } avatar = { workspace = true } git = { workspace = true } +observability = { workspace = true } git2 = { workspace = true } queue = { workspace = true } room = { workspace = true } diff --git a/libs/service/agent/billing.rs b/libs/service/agent/billing.rs index 7dd135f..f387203 100644 --- a/libs/service/agent/billing.rs +++ b/libs/service/agent/billing.rs @@ -20,6 +20,7 @@ use models::workspaces::workspace_billing_history; use rust_decimal::Decimal; use sea_orm::*; use serde::{Deserialize, Serialize}; +use slog::info; use utoipa::ToSchema; use uuid::Uuid; @@ -136,6 +137,16 @@ impl AppService { updated.updated_at = Set(now); updated.update(&self.db).await?; + info!(self.logs, "ai_usage_recorded"; + "project_id" => %project_uid, + "model_id" => %model_id, + "input_tokens" => input_tokens, + "output_tokens" => output_tokens, + "cost" => total_cost, + "currency" => %currency, + "workspace_id" => %workspace_id.to_string() + ); + Ok(BillingRecord { cost: total_cost, currency, @@ -186,6 +197,15 @@ impl AppService { updated.balance = Set(new_balance); updated.update(&self.db).await?; + info!(self.logs, "ai_usage_recorded"; + "project_id" => %project_uid, + "model_id" => %model_id, + "input_tokens" => input_tokens, + "output_tokens" => output_tokens, + "cost" => total_cost, + "currency" => %currency + ); + Ok(BillingRecord { cost: total_cost, currency, diff --git a/libs/service/agent/code_review.rs b/libs/service/agent/code_review.rs index 5498f5f..d0ddb22 100644 --- a/libs/service/agent/code_review.rs +++ b/libs/service/agent/code_review.rs @@ -149,7 +149,7 @@ impl AppService { let prompt = build_code_review_prompt(&pr, &diff); - let ai_response = call_ai_model(&model.name, &prompt, &self.config).await?; + let ai_response = call_ai_model(&model.name, &prompt, &self.config, self.logs.clone()).await?; // Record billing (non-fatal — log warning but don't fail the review). let billing = self @@ -392,6 +392,7 @@ async fn call_ai_model( model_name: &str, prompt: &str, app_config: &config::AppConfig, + logger: slog::Logger, ) -> Result { let api_key = app_config .ai_api_key() @@ -401,7 +402,7 @@ async fn call_ai_model( .ai_basic_url() .unwrap_or_else(|_| "https://api.openai.com".into()); - let client_config = agent::AiClientConfig::new(api_key).with_base_url(base_url); + let client_config = agent::AiClientConfig::new(api_key, logger).with_base_url(base_url); let messages = vec![ async_openai::types::chat::ChatCompletionRequestMessage::User( diff --git a/libs/service/agent/pr_summary.rs b/libs/service/agent/pr_summary.rs index a61e0e5..1561bce 100644 --- a/libs/service/agent/pr_summary.rs +++ b/libs/service/agent/pr_summary.rs @@ -125,6 +125,7 @@ async fn call_ai_model_for_description( model_name: &str, prompt: &str, app_config: &config::AppConfig, + logger: slog::Logger, ) -> Result { let api_key = app_config .ai_api_key() @@ -134,7 +135,7 @@ async fn call_ai_model_for_description( .ai_basic_url() .unwrap_or_else(|_| "https://api.openai.com".into()); - let client_config = agent::AiClientConfig::new(api_key).with_base_url(base_url); + let client_config = agent::AiClientConfig::new(api_key, logger).with_base_url(base_url); let messages = vec![ async_openai::types::chat::ChatCompletionRequestMessage::User( @@ -239,7 +240,7 @@ impl AppService { // Build prompt and call AI let prompt = build_description_prompt(&pr.title, pr.body.as_deref(), &diff); - let ai_response = call_ai_model_for_description(&model.name, &prompt, &self.config).await?; + let ai_response = call_ai_model_for_description(&model.name, &prompt, &self.config, self.logs.clone()).await?; // Record billing (non-fatal). let billing = self diff --git a/libs/service/lib.rs b/libs/service/lib.rs index 178fc24..f9bb996 100644 --- a/libs/service/lib.rs +++ b/libs/service/lib.rs @@ -16,7 +16,7 @@ use queue::{ use room::metrics::RoomMetrics; use room::RoomService; use serde::{Deserialize, Serialize}; -use slog::{Drain, OwnedKVList, Record}; +use observability::build_logger; use utoipa::ToSchema; use ws_token::WsTokenService; @@ -89,62 +89,12 @@ impl AppService { self.room.start_workers(shutdown_rx, log).await } - pub fn build_slog_logger(level: &str) -> slog::Logger { - let level_filter = match level { - "trace" => 0usize, - "debug" => 1usize, - "info" => 2usize, - "warn" => 3usize, - "error" => 4usize, - _ => 2usize, - }; - - struct StderrDrain(usize); - - impl Drain for StderrDrain { - type Ok = (); - type Err = (); - #[inline] - fn log(&self, record: &Record, _logger: &OwnedKVList) -> Result<(), ()> { - let slog_level = match record.level() { - slog::Level::Trace => 0, - slog::Level::Debug => 1, - slog::Level::Info => 2, - slog::Level::Warning => 3, - slog::Level::Error => 4, - slog::Level::Critical => 5, - }; - if slog_level < self.0 { - return Ok(()); - } - let _ = eprintln!( - "{} [{}] {}:{} - {}", - chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"), - record.level().to_string(), - record - .file() - .rsplit_once('/') - .map(|(_, s)| s) - .unwrap_or(record.file()), - record.line(), - record.msg(), - ); - Ok(()) - } - } - - let drain = StderrDrain(level_filter); - let drain = std::sync::Mutex::new(drain); - let drain = slog::Fuse::new(drain); - slog::Logger::root(drain, slog::o!()) - } - pub async fn new(config: AppConfig) -> anyhow::Result { let db = AppDatabase::init(&config).await?; let cache = AppCache::init(&config).await?; let log_level = config.log_level().unwrap_or_else(|_| "info".to_string()); - let logs = Self::build_slog_logger(&log_level); + let logs = build_logger(&log_level); let email = AppEmail::init(&config, logs.clone()).await?; let avatar = AppAvatar::init(&config).await?; diff --git a/libs/service/workspace/alert.rs b/libs/service/workspace/alert.rs index d915fa9..a110253 100644 --- a/libs/service/workspace/alert.rs +++ b/libs/service/workspace/alert.rs @@ -306,9 +306,9 @@ impl AppService { if result.alerts_sent > 0 { slog::info!( logs, - "Billing alerts: checked {} workspaces, sent {} emails", - result.workspaces_checked, - result.alerts_sent + "billing_alerts_sent"; + "workspaces_checked" => result.workspaces_checked, + "alerts_sent" => result.alerts_sent ); } }