feat(observability): Phase 1-5 slog structured logging across platform

Phase 1: add libs/observability crate (build_logger, instance_id);
  remove duplicate logger init from 4 crates
Phase 2: Actix-web RequestLogger with trace_id; MetricsMiddleware + HttpMetrics
Phase 3: Git SSH handle.rs slog struct; HTTP handler Logger kv
Phase 4: AI client eprintln -> slog warn; billing ai_usage_recorded log
Phase 5: SessionManager slog; workspace alert slog 2.x syntax
This commit is contained in:
ZhenYi 2026-04-21 13:44:12 +08:00
parent a527428b2d
commit 81e6ee3d48
25 changed files with 725 additions and 356 deletions

273
Cargo.lock generated
View File

@ -435,6 +435,7 @@ dependencies = [
"sea-orm", "sea-orm",
"serde", "serde",
"serde_json", "serde_json",
"slog",
"thiserror 2.0.18", "thiserror 2.0.18",
"tiktoken-rs", "tiktoken-rs",
"tokio", "tokio",
@ -648,6 +649,7 @@ dependencies = [
"db", "db",
"futures", "futures",
"migrate", "migrate",
"observability",
"sea-orm", "sea-orm",
"serde_json", "serde_json",
"service", "service",
@ -1079,14 +1081,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"axum-core", "axum-core 0.4.5",
"bytes", "bytes",
"futures-util", "futures-util",
"http 1.4.0", "http 1.4.0",
"http-body", "http-body",
"http-body-util", "http-body-util",
"itoa", "itoa",
"matchit", "matchit 0.7.3",
"memchr", "memchr",
"mime", "mime",
"percent-encoding", "percent-encoding",
@ -1099,6 +1101,31 @@ dependencies = [
"tower-service", "tower-service",
] ]
[[package]]
name = "axum"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90"
dependencies = [
"axum-core 0.5.6",
"bytes",
"futures-util",
"http 1.4.0",
"http-body",
"http-body-util",
"itoa",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"serde_core",
"sync_wrapper",
"tower 0.5.3",
"tower-layer",
"tower-service",
]
[[package]] [[package]]
name = "axum-core" name = "axum-core"
version = "0.4.5" version = "0.4.5"
@ -1119,6 +1146,24 @@ dependencies = [
"tower-service", "tower-service",
] ]
[[package]]
name = "axum-core"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
dependencies = [
"bytes",
"futures-core",
"http 1.4.0",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"sync_wrapper",
"tower-layer",
"tower-service",
]
[[package]] [[package]]
name = "backoff" name = "backoff"
version = "0.4.0" version = "0.4.0"
@ -2689,6 +2734,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582"
[[package]]
name = "fixedbitset"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.1.9" version = "1.1.9"
@ -3010,6 +3061,7 @@ dependencies = [
"config", "config",
"db", "db",
"git", "git",
"observability",
"reqwest 0.13.2", "reqwest 0.13.2",
"slog", "slog",
"tokio", "tokio",
@ -3072,6 +3124,7 @@ dependencies = [
"config", "config",
"db", "db",
"git", "git",
"observability",
"slog", "slog",
"tokio", "tokio",
] ]
@ -4548,6 +4601,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]] [[package]]
name = "matrixmultiply" name = "matrixmultiply"
version = "0.3.10" version = "0.3.10"
@ -4745,6 +4804,12 @@ dependencies = [
"pxfm", "pxfm",
] ]
[[package]]
name = "multimap"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
[[package]] [[package]]
name = "nalgebra" name = "nalgebra"
version = "0.34.2" version = "0.34.2"
@ -5002,6 +5067,21 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "observability"
version = "0.2.9"
dependencies = [
"actix-web",
"chrono",
"futures",
"hostname",
"once_cell",
"serde_json",
"slog",
"tracing",
"tracing-subscriber",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.21.4" version = "1.21.4"
@ -5344,6 +5424,17 @@ dependencies = [
"sha2 0.10.9", "sha2 0.10.9",
] ]
[[package]]
name = "petgraph"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8701b58ea97060d5e5b155d383a69952a60943f0e6dfe30b04c287beb0b27455"
dependencies = [
"fixedbitset",
"hashbrown 0.15.5",
"indexmap 2.13.0",
]
[[package]] [[package]]
name = "pgvector" name = "pgvector"
version = "0.4.1" version = "0.4.1"
@ -5709,7 +5800,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [ dependencies = [
"bytes", "bytes",
"prost-derive", "prost-derive 0.13.5",
]
[[package]]
name = "prost"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568"
dependencies = [
"bytes",
"prost-derive 0.14.3",
]
[[package]]
name = "prost-build"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7"
dependencies = [
"heck 0.5.0",
"itertools",
"log",
"multimap",
"petgraph",
"prettyplease",
"prost 0.14.3",
"prost-types 0.14.3",
"pulldown-cmark 0.13.3",
"pulldown-cmark-to-cmark",
"regex",
"syn 2.0.117",
"tempfile",
] ]
[[package]] [[package]]
@ -5725,13 +5847,35 @@ dependencies = [
"syn 2.0.117", "syn 2.0.117",
] ]
[[package]]
name = "prost-derive"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]] [[package]]
name = "prost-types" name = "prost-types"
version = "0.13.5" version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
dependencies = [ dependencies = [
"prost", "prost 0.13.5",
]
[[package]]
name = "prost-types"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7"
dependencies = [
"prost 0.14.3",
] ]
[[package]] [[package]]
@ -5777,12 +5921,32 @@ dependencies = [
"unicase", "unicase",
] ]
[[package]]
name = "pulldown-cmark"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c3a14896dfa883796f1cb410461aef38810ea05f2b2c33c5aded3649095fdad"
dependencies = [
"bitflags",
"memchr",
"unicase",
]
[[package]] [[package]]
name = "pulldown-cmark-escape" name = "pulldown-cmark-escape"
version = "0.11.0" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "007d8adb5ddab6f8e3f491ac63566a7d5002cc7ed73901f72057943fa71ae1ae" checksum = "007d8adb5ddab6f8e3f491ac63566a7d5002cc7ed73901f72057943fa71ae1ae"
[[package]]
name = "pulldown-cmark-to-cmark"
version = "22.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50793def1b900256624a709439404384204a5dc3a6ec580281bfaac35e882e90"
dependencies = [
"pulldown-cmark 0.13.3",
]
[[package]] [[package]]
name = "pxfm" name = "pxfm"
version = "0.1.28" version = "0.1.28"
@ -5800,15 +5964,15 @@ dependencies = [
"futures", "futures",
"futures-util", "futures-util",
"parking_lot", "parking_lot",
"prost", "prost 0.13.5",
"prost-types", "prost-types 0.13.5",
"reqwest 0.12.28", "reqwest 0.12.28",
"semver", "semver",
"serde", "serde",
"serde_json", "serde_json",
"thiserror 1.0.69", "thiserror 1.0.69",
"tokio", "tokio",
"tonic", "tonic 0.12.3",
] ]
[[package]] [[package]]
@ -6433,6 +6597,18 @@ dependencies = [
[[package]] [[package]]
name = "rpc" name = "rpc"
version = "0.2.9" version = "0.2.9"
dependencies = [
"anyhow",
"chrono",
"prost 0.14.3",
"prost-types 0.14.3",
"session_manager",
"slog",
"tokio",
"tonic 0.14.5",
"tonic-prost-build",
"uuid",
]
[[package]] [[package]]
name = "rsa" name = "rsa"
@ -7161,8 +7337,9 @@ dependencies = [
"mime_guess2", "mime_guess2",
"models", "models",
"moka", "moka",
"observability",
"p256", "p256",
"pulldown-cmark", "pulldown-cmark 0.12.2",
"queue", "queue",
"quick-xml 0.37.5", "quick-xml 0.37.5",
"rand 0.10.0", "rand 0.10.0",
@ -7209,6 +7386,23 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "session_manager"
version = "0.2.9"
dependencies = [
"anyhow",
"chrono",
"deadpool-redis",
"rand 0.10.0",
"redis",
"serde",
"serde_json",
"slog",
"thiserror 2.0.18",
"tokio",
"uuid",
]
[[package]] [[package]]
name = "sha1" name = "sha1"
version = "0.10.6" version = "0.10.6"
@ -8132,7 +8326,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"async-trait", "async-trait",
"axum", "axum 0.7.9",
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
"flate2", "flate2",
@ -8145,7 +8339,7 @@ dependencies = [
"hyper-util", "hyper-util",
"percent-encoding", "percent-encoding",
"pin-project", "pin-project",
"prost", "prost 0.13.5",
"rustls-native-certs 0.8.3", "rustls-native-certs 0.8.3",
"rustls-pemfile", "rustls-pemfile",
"socket2 0.5.10", "socket2 0.5.10",
@ -8158,6 +8352,63 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "tonic"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fec7c61a0695dc1887c1b53952990f3ad2e3a31453e1f49f10e75424943a93ec"
dependencies = [
"async-trait",
"axum 0.8.9",
"base64 0.22.1",
"bytes",
"h2 0.4.13",
"http 1.4.0",
"http-body",
"http-body-util",
"hyper",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"socket2 0.6.3",
"sync_wrapper",
"tokio",
"tokio-stream",
"tower 0.5.3",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1882ac3bf5ef12877d7ed57aad87e75154c11931c2ba7e6cde5e22d63522c734"
dependencies = [
"prettyplease",
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "tonic-prost-build"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3144df636917574672e93d0f56d7edec49f90305749c668df5101751bb8f95a"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"prost-types 0.14.3",
"quote",
"syn 2.0.117",
"tempfile",
"tonic-build",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.13" version = "0.4.13"
@ -8186,7 +8437,9 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-util", "futures-util",
"indexmap 2.13.0",
"pin-project-lite", "pin-project-lite",
"slab",
"sync_wrapper", "sync_wrapper",
"tokio", "tokio",
"tokio-util", "tokio-util",

View File

@ -14,6 +14,7 @@ members = [
"libs/webhook", "libs/webhook",
"libs/transport", "libs/transport",
"libs/rpc", "libs/rpc",
"libs/observability",
"libs/avatar", "libs/avatar",
"libs/agent", "libs/agent",
"libs/migrate", "libs/migrate",
@ -44,8 +45,10 @@ api = { path = "libs/api" }
agent = { path = "libs/agent" } agent = { path = "libs/agent" }
webhook = { path = "libs/webhook" } webhook = { path = "libs/webhook" }
rpc = { path = "libs/rpc" } rpc = { path = "libs/rpc" }
observability = { path = "libs/observability" }
avatar = { path = "libs/avatar" } avatar = { path = "libs/avatar" }
migrate = { path = "libs/migrate" } migrate = { path = "libs/migrate" }
session_manager = { path = "libs/session_manager" }
sea-query = "1.0.0-rc.31" sea-query = "1.0.0-rc.31"
@ -99,6 +102,7 @@ opentelemetry-http = "0.31.0"
prost = "0.14.3" prost = "0.14.3"
prost-build = "0.14.3" prost-build = "0.14.3"
qdrant-client = "1.17.0" qdrant-client = "1.17.0"
prost-types = "0.14.3"
rand = "0.10.0" rand = "0.10.0"
russh = { version = "0.50.0", default-features = false, features = [] } russh = { version = "0.50.0", default-features = false, features = [] }
hmac = { version = "0.12.1", features = ["std"] } hmac = { version = "0.12.1", features = ["std"] }
@ -133,7 +137,7 @@ clap = "4.6.0"
time = "0.3.47" time = "0.3.47"
chrono = "0.4.44" chrono = "0.4.44"
tracing = "0.1.44" tracing = "0.1.44"
tracing-subscriber = "0.3.23" tracing-subscriber = { version = "0.3.23", features = ["env-filter"] }
tracing-opentelemetry = "0.32.1" tracing-opentelemetry = "0.32.1"
tonic = "0.14.5" tonic = "0.14.5"
tonic-build = "0.14.5" tonic-build = "0.14.5"

View File

@ -16,6 +16,7 @@ documentation.workspace = true
tokio = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] }
uuid = { workspace = true } uuid = { workspace = true }
service = { workspace = true } service = { workspace = true }
observability = { workspace = true }
api = { workspace = true } api = { workspace = true }
session = { workspace = true } session = { workspace = true }
config = { workspace = true } config = { workspace = true }

View File

@ -1,7 +1,7 @@
//! Structured HTTP request logging middleware using slog. //! Structured HTTP request logging middleware using slog.
//! //!
//! Logs every incoming request with method, path, status code, //! Logs every incoming request with method, path, status code,
//! response time, client IP, and authenticated user ID. //! response time, client IP, authenticated user ID, and trace_id.
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use futures::future::{LocalBoxFuture, Ready, ok}; use futures::future::{LocalBoxFuture, Ready, ok};
@ -83,6 +83,7 @@ where
.map(|s| s.to_string()) .map(|s| s.to_string())
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
let user_id: Option<Uuid> = req.get_session().user(); let user_id: Option<Uuid> = req.get_session().user();
let trace_id = Uuid::now_v7().to_string();
let full_path = if query.is_empty() { let full_path = if query.is_empty() {
path.clone() path.clone()
@ -104,20 +105,42 @@ where
let user_id_str = user_id let user_id_str = user_id
.map(|u: Uuid| u.to_string()) .map(|u: Uuid| u.to_string())
.unwrap_or_else(|| "-".to_string()); .unwrap_or_else(|| "-".to_string());
let log_message = format!( let duration_ms = elapsed.as_millis() as u64;
"HTTP request | method={} | path={} | status={} | duration_ms={} | remote={} | user_id={}",
method,
full_path,
status_code,
elapsed.as_millis(),
remote,
user_id_str
);
match status_code { match status_code {
200..=299 => slog_info!(&log, "{}", log_message), 200..=299 => {
400..=499 => slog_warn!(&log, "{}", log_message), slog_info!(log, "http_request";
_ => slog_error!(&log, "{}", log_message), "method" => %method,
"path" => %full_path,
"status" => status_code,
"duration_ms" => duration_ms,
"remote" => %remote,
"user_id" => %user_id_str,
"trace_id" => %trace_id
);
}
400..=499 => {
slog_warn!(log, "http_request";
"method" => %method,
"path" => %full_path,
"status" => status_code,
"duration_ms" => duration_ms,
"remote" => %remote,
"user_id" => %user_id_str,
"trace_id" => %trace_id
);
}
_ => {
slog_error!(log, "http_request";
"method" => %method,
"path" => %full_path,
"status" => status_code,
"duration_ms" => duration_ms,
"remote" => %remote,
"user_id" => %user_id_str,
"trace_id" => %trace_id
);
}
} }
} }
Ok(res) Ok(res)

View File

@ -10,7 +10,7 @@ use service::AppService;
use session::SessionMiddleware; use session::SessionMiddleware;
use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy}; use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy};
use session::storage::RedisClusterSessionStore; use session::storage::RedisClusterSessionStore;
use slog::Drain; use observability::{build_logger, MetricsMiddleware, HttpMetrics};
mod args; mod args;
mod logging; mod logging;
@ -25,56 +25,6 @@ pub struct AppState {
pub cache: AppCache, pub cache: AppCache,
} }
fn build_slog_logger(level: &str) -> slog::Logger {
let level_filter = match level {
"trace" => 0usize,
"debug" => 1usize,
"info" => 2usize,
"warn" => 3usize,
"error" => 4usize,
_ => 2usize,
};
struct StderrDrain(usize);
impl Drain for StderrDrain {
type Ok = ();
type Err = ();
#[inline]
fn log(&self, record: &slog::Record, _logger: &slog::OwnedKVList) -> Result<(), ()> {
let slog_level = match record.level() {
slog::Level::Trace => 0,
slog::Level::Debug => 1,
slog::Level::Info => 2,
slog::Level::Warning => 3,
slog::Level::Error => 4,
slog::Level::Critical => 5,
};
if slog_level < self.0 {
return Ok(());
}
let _ = eprintln!(
"{} [{}] {}:{} - {}",
chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
record.level().to_string(),
record
.file()
.rsplit_once('/')
.map(|(_, s)| s)
.unwrap_or(record.file()),
record.line(),
record.msg(),
);
Ok(())
}
}
let drain = StderrDrain(level_filter);
let drain = std::sync::Mutex::new(drain);
let drain = slog::Fuse::new(drain);
slog::Logger::root(drain, slog::o!())
}
fn build_session_key(cfg: &AppConfig) -> anyhow::Result<Key> { fn build_session_key(cfg: &AppConfig) -> anyhow::Result<Key> {
if let Some(secret) = cfg.env.get("APP_SESSION_SECRET") { if let Some(secret) = cfg.env.get("APP_SESSION_SECRET") {
let bytes: Vec<u8> = secret.as_bytes().iter().cycle().take(64).copied().collect(); let bytes: Vec<u8> = secret.as_bytes().iter().cycle().take(64).copied().collect();
@ -87,7 +37,7 @@ fn build_session_key(cfg: &AppConfig) -> anyhow::Result<Key> {
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
let cfg = AppConfig::load(); let cfg = AppConfig::load();
let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string()); let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string());
let log = build_slog_logger(&log_level); let log = build_logger(&log_level);
slog::info!( slog::info!(
log, log,
"Starting {} {}", "Starting {} {}",
@ -125,7 +75,9 @@ async fn main() -> anyhow::Result<()> {
}); });
let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()); let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let http_metrics = std::sync::Arc::new(HttpMetrics::new());
slog::info!(log, "Listening on {}", bind_addr); slog::info!(log, "Listening on {}", bind_addr);
let http_metrics_server = http_metrics.clone();
HttpServer::new(move || { HttpServer::new(move || {
let cors = Cors::default() let cors = Cors::default()
.allow_any_origin() .allow_any_origin()
@ -146,10 +98,13 @@ async fn main() -> anyhow::Result<()> {
)) ))
.build(); .build();
let metrics_mw = MetricsMiddleware::new(http_metrics_server.clone());
App::new() App::new()
.wrap(cors) .wrap(cors)
.wrap(session_mw) .wrap(session_mw)
.wrap(Logger::default().exclude("/health")) .wrap(Logger::default().exclude("/health"))
.wrap(metrics_mw)
.app_data(web::Data::new(AppState { .app_data(web::Data::new(AppState {
db: db.clone(), db: db.clone(),
cache: cache.clone(), cache: cache.clone(),

View File

@ -15,6 +15,7 @@ documentation.workspace = true
[dependencies] [dependencies]
tokio = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] }
git = { workspace = true } git = { workspace = true }
observability = { workspace = true }
db = { workspace = true } db = { workspace = true }
config = { workspace = true } config = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }

View File

@ -3,7 +3,7 @@ use config::AppConfig;
use db::cache::AppCache; use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use git::hook::HookService; use git::hook::HookService;
use slog::{Drain, OwnedKVList, Record}; use observability::build_logger;
use tokio::signal; use tokio::signal;
mod args; mod args;
@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
// 2. Init slog logging // 2. Init slog logging
let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string()); let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string());
let log = build_slog_logger(&log_level); let log = build_logger(&log_level);
// 3. Connect to database // 3. Connect to database
let db = AppDatabase::init(&cfg).await?; let db = AppDatabase::init(&cfg).await?;
@ -81,52 +81,3 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
fn build_slog_logger(level: &str) -> slog::Logger {
let level_filter = match level {
"trace" => 0usize,
"debug" => 1usize,
"info" => 2usize,
"warn" => 3usize,
"error" => 4usize,
_ => 2usize,
};
struct StderrDrain(usize);
impl Drain for StderrDrain {
type Ok = ();
type Err = ();
#[inline]
fn log(&self, record: &Record, _logger: &OwnedKVList) -> Result<(), ()> {
let slog_level = match record.level() {
slog::Level::Trace => 0,
slog::Level::Debug => 1,
slog::Level::Info => 2,
slog::Level::Warning => 3,
slog::Level::Error => 4,
slog::Level::Critical => 5,
};
if slog_level < self.0 {
return Ok(());
}
let _ = eprintln!(
"{} [{}] {}:{} - {}",
chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
record.level().to_string(),
record
.file()
.rsplit_once('/')
.map(|(_, s)| s)
.unwrap_or(record.file()),
record.line(),
record.msg(),
);
Ok(())
}
}
let drain = StderrDrain(level_filter);
let drain = std::sync::Mutex::new(drain);
let drain = slog::Fuse::new(drain);
slog::Logger::root(drain, slog::o!())
}

View File

@ -19,6 +19,7 @@ path = "src/main.rs"
[dependencies] [dependencies]
tokio = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] }
git = { workspace = true } git = { workspace = true }
observability = { workspace = true }
db = { workspace = true } db = { workspace = true }
config = { workspace = true } config = { workspace = true }
slog = { workspace = true } slog = { workspace = true }

View File

@ -1,6 +1,6 @@
use clap::Parser; use clap::Parser;
use config::AppConfig; use config::AppConfig;
use slog::{Drain, OwnedKVList, Record}; use observability::build_logger;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(name = "gitserver")] #[command(name = "gitserver")]
@ -43,52 +43,3 @@ async fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
} }
fn build_logger(level: &str) -> slog::Logger {
let level_filter = match level {
"trace" => 0usize,
"debug" => 1usize,
"info" => 2usize,
"warn" => 3usize,
"error" => 4usize,
_ => 2usize,
};
struct StderrDrain(usize);
impl Drain for StderrDrain {
type Ok = ();
type Err = ();
#[inline]
fn log(&self, record: &Record, _logger: &OwnedKVList) -> Result<(), ()> {
let slog_level = match record.level() {
slog::Level::Trace => 0,
slog::Level::Debug => 1,
slog::Level::Info => 2,
slog::Level::Warning => 3,
slog::Level::Error => 4,
slog::Level::Critical => 5,
};
if slog_level < self.0 {
return Ok(());
}
let _ = eprintln!(
"{} [{}] {}:{} - {}",
chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
record.level().to_string(),
record
.file()
.rsplit_once('/')
.map(|(_, s)| s)
.unwrap_or(record.file()),
record.line(),
record.msg(),
);
Ok(())
}
}
let drain = StderrDrain(level_filter);
let drain = std::sync::Mutex::new(drain);
let drain = slog::Fuse::new(drain);
slog::Logger::root(drain, slog::o!())
}

View File

@ -33,5 +33,6 @@ tiktoken-rs = { workspace = true }
agent-tool-derive = { path = "../agent-tool-derive" } agent-tool-derive = { path = "../agent-tool-derive" }
once_cell = { workspace = true } once_cell = { workspace = true }
regex = { workspace = true } regex = { workspace = true }
slog = { workspace = true }
[lints] [lints]
workspace = true workspace = true

View File

@ -14,19 +14,22 @@ use async_openai::types::chat::{
use std::time::Instant; use std::time::Instant;
use crate::error::{AgentError, Result}; use crate::error::{AgentError, Result};
use slog::warn;
/// Configuration for the AI client. /// Configuration for the AI client.
#[derive(Clone)] #[derive(Clone)]
pub struct AiClientConfig { pub struct AiClientConfig {
pub api_key: String, pub api_key: String,
pub base_url: Option<String>, pub base_url: Option<String>,
pub logger: slog::Logger,
} }
impl AiClientConfig { impl AiClientConfig {
pub fn new(api_key: String) -> Self { pub fn new(api_key: String, logger: slog::Logger) -> Self {
Self { Self {
api_key, api_key,
base_url: None, base_url: None,
logger,
} }
} }
@ -172,11 +175,12 @@ pub async fn call_with_retry(
Err(err) => { Err(err) => {
if state.should_retry() && is_retryable_error(&err) { if state.should_retry() && is_retryable_error(&err) {
let duration = state.backoff_duration(); let duration = state.backoff_duration();
eprintln!( warn!(config.logger, "ai_call_retry";
"AI call failed (attempt {}/{}), retrying in {:?}", "attempt" => state.attempt + 1,
state.attempt + 1, "max_retries" => state.max_retries,
state.max_retries, "backoff_ms" => duration.as_millis() as u64,
duration "model" => %model,
"error" => %err.to_string()
); );
tokio::time::sleep(duration).await; tokio::time::sleep(duration).await;
state.next(); state.next();
@ -239,11 +243,12 @@ pub async fn call_with_params(
Err(err) => { Err(err) => {
if state.should_retry() && is_retryable_error(&err) { if state.should_retry() && is_retryable_error(&err) {
let duration = state.backoff_duration(); let duration = state.backoff_duration();
eprintln!( warn!(config.logger, "ai_call_retry";
"AI call failed (attempt {}/{}), retrying in {:?}", "attempt" => state.attempt + 1,
state.attempt + 1, "max_retries" => state.max_retries,
state.max_retries, "backoff_ms" => duration.as_millis() as u64,
duration "model" => %model,
"error" => %err.to_string()
); );
tokio::time::sleep(duration).await; tokio::time::sleep(duration).await;
state.next(); state.next();

View File

@ -4,9 +4,10 @@ use futures_util::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use models::repos::{repo, repo_branch_protect}; use models::repos::{repo, repo_branch_protect};
use sea_orm::*; use sea_orm::*;
use slog::{error, info, warn, Logger};
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::time::Duration; use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use db::database::AppDatabase; use db::database::AppDatabase;
@ -24,14 +25,16 @@ pub struct GitHttpHandler {
storage_path: PathBuf, storage_path: PathBuf,
repo: repo::Model, repo: repo::Model,
db: AppDatabase, db: AppDatabase,
logger: Logger,
} }
impl GitHttpHandler { impl GitHttpHandler {
pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase) -> Self { pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase, logger: Logger) -> Self {
Self { Self {
storage_path, storage_path,
repo, repo,
db, db,
logger,
} }
} }
@ -92,6 +95,8 @@ impl GitHttpHandler {
service: &str, service: &str,
mut payload: web::Payload, mut payload: web::Payload,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let started = Instant::now();
info!(self.logger, "git_rpc_started"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string());
let mut child = tokio::process::Command::new("git") let mut child = tokio::process::Command::new("git")
.arg(service) .arg(service)
.arg("--stateless-rpc") .arg("--stateless-rpc")
@ -133,6 +138,7 @@ impl GitHttpHandler {
// Reject oversized pre-PACK data to prevent memory exhaustion // Reject oversized pre-PACK data to prevent memory exhaustion
if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT { if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT {
warn!(self.logger, "git_rpc_payload_too_large"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string());
return Err(actix_web::error::ErrorPayloadTooLarge(format!( return Err(actix_web::error::ErrorPayloadTooLarge(format!(
"Ref negotiation exceeds {} byte limit", "Ref negotiation exceeds {} byte limit",
PRE_PACK_LIMIT PRE_PACK_LIMIT
@ -143,6 +149,7 @@ impl GitHttpHandler {
pre_pack.extend_from_slice(&bytes[..pos]); pre_pack.extend_from_slice(&bytes[..pos]);
if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) { if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) {
warn!(self.logger, "branch_protection_violation"; "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "message" => %msg);
return Err(actix_web::error::ErrorForbidden(msg)); return Err(actix_web::error::ErrorForbidden(msg));
} }
@ -202,12 +209,17 @@ impl GitHttpHandler {
if !output.status.success() { if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr); let stderr = String::from_utf8_lossy(&output.stderr);
let ms = started.elapsed().as_millis() as u64;
error!(self.logger, "git_rpc_failed"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "duration_ms" => ms, "stderr" => %stderr.to_string());
return Err(actix_web::error::ErrorInternalServerError(format!( return Err(actix_web::error::ErrorInternalServerError(format!(
"Git command failed: {}", "Git command failed: {}",
stderr stderr
))); )));
} }
let ms = started.elapsed().as_millis() as u64;
info!(self.logger, "git_rpc_completed"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "duration_ms" => ms, "bytes_out" => output.stdout.len());
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.content_type(format!("application/x-git-{}-result", service)) .content_type(format!("application/x-git-{}-result", service))
.insert_header(("Cache-Control", "no-cache")) .insert_header(("Cache-Control", "no-cache"))

View File

@ -37,7 +37,7 @@ pub async fn info_refs(
authorize_repo_access(&req, &state.db, &state.logger, &model, is_write).await?; authorize_repo_access(&req, &state.db, &state.logger, &model, is_write).await?;
let storage_path = PathBuf::from(&model.storage_path); let storage_path = PathBuf::from(&model.storage_path);
let handler = GitHttpHandler::new(storage_path, model, state.db.clone()); let handler = GitHttpHandler::new(storage_path, model, state.db.clone(), state.logger.clone());
handler.info_refs(service_param).await handler.info_refs(service_param).await
} }
@ -59,7 +59,7 @@ pub async fn upload_pack(
authorize_repo_access(&req, &state.db, &state.logger, &model, false).await?; authorize_repo_access(&req, &state.db, &state.logger, &model, false).await?;
let storage_path = PathBuf::from(&model.storage_path); let storage_path = PathBuf::from(&model.storage_path);
let handler = GitHttpHandler::new(storage_path, model, state.db.clone()); let handler = GitHttpHandler::new(storage_path, model, state.db.clone(), state.logger.clone());
handler.upload_pack(payload).await handler.upload_pack(payload).await
} }
@ -81,7 +81,7 @@ pub async fn receive_pack(
authorize_repo_access(&req, &state.db, &state.logger, &model, true).await?; authorize_repo_access(&req, &state.db, &state.logger, &model, true).await?;
let storage_path = PathBuf::from(&model.storage_path); let storage_path = PathBuf::from(&model.storage_path);
let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone()); let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone(), state.logger.clone());
let result = handler.receive_pack(payload).await; let result = handler.receive_pack(payload).await;
let _ = tokio::spawn({ let _ = tokio::spawn({

View File

@ -137,7 +137,7 @@ impl Drop for SSHandle {
.client_addr .client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!(self.logger, "SSH handler dropped for client: {}", addr_str); info!(self.logger, "ssh_handler_dropped"; "client" => %addr_str);
let channel_ids: Vec<_> = self.stdin.keys().copied().collect(); let channel_ids: Vec<_> = self.stdin.keys().copied().collect();
for channel_id in channel_ids { for channel_id in channel_ids {
@ -154,10 +154,7 @@ impl russh::server::Handler for SSHandle {
.client_addr .client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!( info!(self.logger, "auth_none_received"; "user" => %user, "client" => %client_info);
self.logger,
"auth_none received for user '{}', client: {}", user, client_info
);
Ok(Auth::UnsupportedMethod) Ok(Auth::UnsupportedMethod)
} }
@ -169,42 +166,25 @@ impl russh::server::Handler for SSHandle {
if token.is_empty() { if token.is_empty() {
warn!( warn!(self.logger, "auth_rejected_empty_token"; "client" => %client_info);
self.logger,
"auth_password rejected: empty token, client: {}", client_info
);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
info!( info!(self.logger, "auth_token_attempt"; "client" => %client_info);
self.logger,
"Attempting SSH token authentication, client: {}", client_info
);
let user_model = match self.token_service.find_user_by_token(token).await { let user_model = match self.token_service.find_user_by_token(token).await {
Ok(Some(model)) => model, Ok(Some(model)) => model,
Ok(None) => { Ok(None) => {
warn!( warn!(self.logger, "auth_rejected_token_not_found"; "client" => %client_info);
self.logger,
"SSH token auth rejected: token not found or expired, client: {}", client_info
);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
Err(e) => { Err(e) => {
error!( error!(self.logger, "auth_token_error"; "error" => %e.to_string(), "client" => %client_info);
self.logger,
"SSH token auth error: {}, client: {}", e, client_info
);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
}; };
info!( info!(self.logger, "auth_token_success"; "user" => %user_model.username, "client" => %client_info);
self.logger,
"SSH token authentication successful: user={}, client={}",
user_model.username,
client_info
);
self.operator = Some(user_model); self.operator = Some(user_model);
Ok(Auth::Accept) Ok(Auth::Accept)
} }
@ -226,49 +206,29 @@ impl russh::server::Handler for SSHandle {
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
if user != "git" { if user != "git" {
let msg = format!( warn!(self.logger, "auth_rejected_invalid_username"; "user" => %user, "client" => %client_info);
"SSH auth rejected: invalid username '{}', client: {}",
user, client_info
);
warn!(self.logger, "{}", msg);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
let public_key_str = public_key.to_string(); let public_key_str = public_key.to_string();
if public_key_str.len() < 32 { if public_key_str.len() < 32 {
let msg = format!( warn!(self.logger, "auth_rejected_invalid_key_length"; "key_length" => public_key_str.len(), "client" => %client_info);
"SSH auth rejected: invalid public key length ({}), client: {}",
public_key_str.len(),
client_info
);
warn!(self.logger, "{}", msg);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
info!( info!(self.logger, "auth_publickey_attempt"; "client" => %client_info);
self.logger,
"Attempting SSH authentication with public key, client: {}", client_info
);
let user_model = match self.auth.find_user_by_public_key(&public_key_str).await { let user_model = match self.auth.find_user_by_public_key(&public_key_str).await {
Ok(Some(model)) => model, Ok(Some(model)) => model,
Ok(None) => { Ok(None) => {
let msg = format!( warn!(self.logger, "auth_rejected_key_not_found"; "client" => %client_info);
"SSH auth rejected: public key not found or invalid, client: {}",
client_info
);
warn!(self.logger, "{}", msg);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
Err(e) => { Err(e) => {
let msg = format!("SSH auth error: {}, client: {}", e, client_info); error!(self.logger, "auth_publickey_error"; "error" => %e.to_string(), "client" => %client_info);
error!(self.logger, "{}", msg);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
}; };
info!( info!(self.logger, "auth_publickey_success"; "user" => %user_model.username, "client" => %client_info);
self.logger,
"SSH authentication successful: user={}, client={}", user_model.username, client_info
);
self.operator = Some(user_model); self.operator = Some(user_model);
Ok(Auth::Accept) Ok(Auth::Accept)
} }
@ -283,49 +243,29 @@ impl russh::server::Handler for SSHandle {
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
if user != "git" { if user != "git" {
let msg = format!( warn!(self.logger, "auth_rejected_invalid_username"; "user" => %user, "client" => %client_info);
"SSH auth rejected: invalid username '{}', client: {}",
user, client_info
);
warn!(self.logger, "{}", msg);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
let public_key_str = certificate.to_string(); let public_key_str = certificate.to_string();
if public_key_str.len() < 32 { if public_key_str.len() < 32 {
let msg = format!( warn!(self.logger, "auth_rejected_invalid_key_length"; "key_length" => public_key_str.len(), "client" => %client_info);
"SSH auth rejected: invalid public key length ({}), client: {}",
public_key_str.len(),
client_info
);
warn!(self.logger, "{}", msg);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
info!( info!(self.logger, "auth_publickey_attempt"; "client" => %client_info);
self.logger,
"Attempting SSH authentication with public key, client: {}", client_info
);
let user_model = match self.auth.find_user_by_public_key(&public_key_str).await { let user_model = match self.auth.find_user_by_public_key(&public_key_str).await {
Ok(Some(model)) => model, Ok(Some(model)) => model,
Ok(None) => { Ok(None) => {
let msg = format!( warn!(self.logger, "auth_rejected_key_not_found"; "client" => %client_info);
"SSH auth rejected: public key not found or invalid, client: {}",
client_info
);
warn!(self.logger, "{}", msg);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
Err(e) => { Err(e) => {
let msg = format!("SSH auth error: {}, client: {}", e, client_info); error!(self.logger, "auth_publickey_error"; "error" => %e.to_string(), "client" => %client_info);
error!(self.logger, "{}", msg);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
}; };
info!( info!(self.logger, "auth_publickey_success"; "user" => %user_model.username, "client" => %client_info);
self.logger,
"SSH authentication successful: user={}, client={}", user_model.username, client_info
);
self.operator = Some(user_model); self.operator = Some(user_model);
Ok(Auth::Accept) Ok(Auth::Accept)
} }
@ -338,7 +278,7 @@ impl russh::server::Handler for SSHandle {
channel: ChannelId, channel: ChannelId,
_: &mut Session, _: &mut Session,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
info!(self.logger, "{}", format!("channel_close channel={:?} client={:?}", channel, self.client_addr)); info!(self.logger, "channel_close"; "channel" => ?channel, "client" => ?self.client_addr);
self.cleanup_channel(channel); self.cleanup_channel(channel);
Ok(()) Ok(())
} }
@ -391,7 +331,7 @@ impl russh::server::Handler for SSHandle {
.client_addr .client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!(self.logger, "{}", format!("channel_open_session channel={:?} client={}", channel, client_info)); info!(self.logger, "channel_open_session"; "channel" => ?channel, "client" => %client_info);
let _ = session.flush().ok(); let _ = session.flush().ok();
Ok(true) Ok(true)
} }
@ -411,7 +351,7 @@ impl russh::server::Handler for SSHandle {
.client_addr .client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
warn!(self.logger, "{}", format!("pty_request not supported channel={:?} term={} cols={} rows={} client={}", channel, term, col_width, row_height, client_info)); warn!(self.logger, "pty_request not supported"; "channel" => ?channel, "term" => %term, "cols" => col_width, "rows" => row_height, "client" => %client_info);
let _ = session.flush().ok(); let _ = session.flush().ok();
Ok(()) Ok(())
} }
@ -426,7 +366,7 @@ impl russh::server::Handler for SSHandle {
.client_addr .client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!(self.logger, "{}", format!("subsystem_request channel={:?} subsystem={} client={}", channel, name, client_info)); info!(self.logger, "subsystem_request"; "channel" => ?channel, "subsystem" => %name, "client" => %client_info);
// git-clients may send "subsystem" for git protocol over ssh. // git-clients may send "subsystem" for git protocol over ssh.
// We don't use subsystem; exec_request handles it directly. // We don't use subsystem; exec_request handles it directly.
let _ = session.flush().ok(); let _ = session.flush().ok();
@ -483,7 +423,7 @@ impl russh::server::Handler for SSHandle {
self.branch.insert(channel, refs); self.branch.insert(channel, refs);
} }
Err(e) => { Err(e) => {
warn!(self.logger, "{}", format!("Failed to parse ref updates, forwarding raw data error={:?}", e)); warn!(self.logger, "ref_update_parse_error"; "error" => ?e);
self.branch.insert(channel, vec![]); self.branch.insert(channel, vec![]);
} }
} }
@ -492,7 +432,7 @@ impl russh::server::Handler for SSHandle {
stdin.write_all(&buffered).await?; stdin.write_all(&buffered).await?;
stdin.flush().await?; stdin.flush().await?;
} else { } else {
error!(self.logger, "{}", format!("stdin not found channel={:?}", channel)); error!(self.logger, "stdin_not_found"; "channel" => ?channel);
} }
return Ok(()); return Ok(());
} }
@ -501,7 +441,7 @@ impl russh::server::Handler for SSHandle {
stdin.write_all(data).await?; stdin.write_all(data).await?;
stdin.flush().await?; stdin.flush().await?;
} else { } else {
error!(self.logger, "{}", format!("stdin not found (forwarding) channel={:?}", channel)); error!(self.logger, "stdin_not_found_forwarding"; "channel" => ?channel);
} }
return Ok(()); return Ok(());
} }
@ -532,7 +472,7 @@ impl russh::server::Handler for SSHandle {
user.username user.username
); );
info!(self.logger, "{}", format!("Shell request user={}", user.username)); info!(self.logger, "shell_request"; "user" => %user.username);
session session
.data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes())) .data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes()))
.ok(); .ok();
@ -541,7 +481,7 @@ impl russh::server::Handler for SSHandle {
session.close(channel_id).ok(); session.close(channel_id).ok();
let _ = session.flush().ok(); let _ = session.flush().ok();
} else { } else {
warn!(self.logger, "Shell request without authentication"); warn!(self.logger, "shell_request_unauthenticated"; "channel" => ?channel_id);
let msg = "Authentication required\r\n"; let msg = "Authentication required\r\n";
session session
.data(channel_id, CryptoVec::from_slice(msg.as_bytes())) .data(channel_id, CryptoVec::from_slice(msg.as_bytes()))
@ -572,7 +512,7 @@ impl russh::server::Handler for SSHandle {
let git_shell_cmd = match std::str::from_utf8(data) { let git_shell_cmd = match std::str::from_utf8(data) {
Ok(cmd) => cmd.trim(), Ok(cmd) => cmd.trim(),
Err(e) => { Err(e) => {
error!(self.logger, "{}", format!("Invalid command encoding error={}", e)); error!(self.logger, "invalid_command_encoding"; "error" => %e.to_string());
session session
.disconnect( .disconnect(
Disconnect::ServiceNotAvailable, Disconnect::ServiceNotAvailable,
@ -586,7 +526,7 @@ impl russh::server::Handler for SSHandle {
let (service, path) = match parse_git_command(git_shell_cmd) { let (service, path) = match parse_git_command(git_shell_cmd) {
Some((s, p)) => (s, p), Some((s, p)) => (s, p),
None => { None => {
error!(self.logger, "{}", format!("Invalid git command command={}", git_shell_cmd)); error!(self.logger, "invalid_git_command"; "command" => %git_shell_cmd);
let msg = format!("Invalid git command: {}", git_shell_cmd); let msg = format!("Invalid git command: {}", git_shell_cmd);
session session
.disconnect(Disconnect::ServiceNotAvailable, &msg, "") .disconnect(Disconnect::ServiceNotAvailable, &msg, "")
@ -599,7 +539,7 @@ impl russh::server::Handler for SSHandle {
Some(pair) => pair, Some(pair) => pair,
None => { None => {
let msg = format!("Invalid repository path: {}", path); let msg = format!("Invalid repository path: {}", path);
error!(self.logger, "{}", format!("Invalid repo path path={}", path)); error!(self.logger, "invalid_repo_path"; "path" => %path);
session session
.disconnect(Disconnect::ServiceNotAvailable, &msg, "") .disconnect(Disconnect::ServiceNotAvailable, &msg, "")
.ok(); .ok();
@ -612,7 +552,7 @@ impl russh::server::Handler for SSHandle {
Ok(repo) => repo, Ok(repo) => repo,
Err(e) => { Err(e) => {
// Log the detailed error internally; client receives generic message. // Log the detailed error internally; client receives generic message.
error!(self.logger, "{}", format!("Error fetching repo error={}", e)); error!(self.logger, "repo_fetch_error"; "error" => %e.to_string());
session session
.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "") .disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "")
.ok(); .ok();
@ -625,7 +565,7 @@ impl russh::server::Handler for SSHandle {
Some(user) => user, Some(user) => user,
None => { None => {
let msg = "Authentication error: no authenticated user"; let msg = "Authentication error: no authenticated user";
error!(self.logger, "No authenticated user"); error!(self.logger, "exec_no_authenticated_user"; "channel" => ?channel_id);
session.disconnect(Disconnect::ByApplication, msg, "").ok(); session.disconnect(Disconnect::ByApplication, msg, "").ok();
return Err(russh::Error::Disconnect); return Err(russh::Error::Disconnect);
} }
@ -644,20 +584,20 @@ impl russh::server::Handler for SSHandle {
if is_write { "write" } else { "read" }, if is_write { "write" } else { "read" },
repo.repo_name repo.repo_name
); );
error!(self.logger, "{}", format!("Access denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write)); error!(self.logger, "access_denied"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write);
session.disconnect(Disconnect::ByApplication, &msg, "").ok(); session.disconnect(Disconnect::ByApplication, &msg, "").ok();
return Err(russh::Error::Disconnect); return Err(russh::Error::Disconnect);
} }
info!(self.logger, "{}", format!("Access granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write)); info!(self.logger, "access_granted"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write);
let repo_path = PathBuf::from(&repo.storage_path); let repo_path = PathBuf::from(&repo.storage_path);
if !repo_path.exists() { if !repo_path.exists() {
error!(self.logger, "{}", format!("Repository path not found path={}", repo.storage_path)); error!(self.logger, "repo_path_not_found"; "path" => %repo.storage_path);
} }
let mut cmd = build_git_command(service, repo_path); let mut cmd = build_git_command(service, repo_path);
let logger = self.logger.clone(); let logger = self.logger.clone();
info!(&logger, "{}", format!("Spawning git process service={:?} path={}", service, repo.storage_path)); info!(&logger, "spawn_git_process"; "service" => ?service, "path" => %repo.storage_path);
let mut shell = match cmd let mut shell = match cmd
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -669,7 +609,7 @@ impl russh::server::Handler for SSHandle {
shell shell
} }
Err(e) => { Err(e) => {
error!(&logger, "{}", format!("Process spawn failed error={}", e)); error!(&logger, "process_spawn_failed"; "error" => %e.to_string());
let _ = session.channel_failure(channel_id); let _ = session.channel_failure(channel_id);
self.cleanup_channel(channel_id); self.cleanup_channel(channel_id);
return Err(russh::Error::IO(e)); return Err(russh::Error::IO(e));
@ -688,7 +628,7 @@ impl russh::server::Handler for SSHandle {
let sync = self.sync.clone(); let sync = self.sync.clone();
let logger_for_fut = self.logger.clone(); let logger_for_fut = self.logger.clone();
let fut = async move { let fut = async move {
info!(&logger_for_fut, "{}", format!("Task started channel={:?}", channel_id)); info!(&logger_for_fut, "git_task_started"; "channel" => ?channel_id);
let mut stdout_done = false; let mut stdout_done = false;
let mut stderr_done = false; let mut stderr_done = false;
@ -715,7 +655,7 @@ impl russh::server::Handler for SSHandle {
let status = result?; let status = result?;
let status_code = status.code().unwrap_or(128) as u32; let status_code = status.code().unwrap_or(128) as u32;
info!(&logger_for_fut, "{}", format!("Git process exited channel={:?} status={}", channel_id, status_code)); info!(&logger_for_fut, "git_process_exited"; "channel" => ?channel_id, "status" => status_code);
if !stdout_done || !stderr_done { if !stdout_done || !stderr_done {
let _ = tokio::time::timeout(Duration::from_millis(100), async { let _ = tokio::time::timeout(Duration::from_millis(100), async {
@ -745,21 +685,21 @@ impl russh::server::Handler for SSHandle {
sleep(Duration::from_millis(50)).await; sleep(Duration::from_millis(50)).await;
let _ = session_handle.eof(channel_id).await; let _ = session_handle.eof(channel_id).await;
let _ = session_handle.close(channel_id).await; let _ = session_handle.close(channel_id).await;
info!(&logger_for_fut, "{}", format!("Channel closed channel={:?}", channel_id)); info!(&logger_for_fut, "channel_closed"; "channel" => ?channel_id);
break; break;
} }
result = &mut stdout_fut, if !stdout_done => { result = &mut stdout_fut, if !stdout_done => {
info!(&logger_for_fut, "stdout completed"); info!(&logger_for_fut, "stdout completed");
stdout_done = true; stdout_done = true;
if let Err(e) = result { if let Err(e) = result {
warn!(&logger_for_fut, "{}", format!("stdout forward error error={:?}", e)); warn!(&logger_for_fut, "stdout_forward_error"; "error" => ?e);
} }
} }
result = &mut stderr_fut, if !stderr_done => { result = &mut stderr_fut, if !stderr_done => {
info!(&logger_for_fut, "stderr completed"); info!(&logger_for_fut, "stderr completed");
stderr_done = true; stderr_done = true;
if let Err(e) = result { if let Err(e) = result {
warn!(&logger_for_fut, "{}", format!("stderr forward error error={:?}", e)); warn!(&logger_for_fut, "stderr_forward_error"; "error" => ?e);
} }
} }
} }
@ -770,7 +710,7 @@ impl russh::server::Handler for SSHandle {
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = fut.await { if let Err(e) = fut.await {
error!(&logger, "{}", format!("Git SSH channel task error error={}", e)); error!(&logger, "git_ssh_channel_task_error"; "error" => %e.to_string());
} }
while eof_rx.recv().await.is_some() {} while eof_rx.recv().await.is_some() {}
}); });

View File

@ -0,0 +1,27 @@
# Shared observability crate: slog JSON logger builder, instance ID,
# tracing initialization, and Actix-web HTTP metrics middleware.
[package]
name = "observability"
version.workspace = true
edition.workspace = true
authors.workspace = true
description.workspace = true
repository.workspace = true
readme.workspace = true
homepage.workspace = true
license.workspace = true
keywords.workspace = true
categories.workspace = true
documentation.workspace = true
# All dependency versions are pinned at the workspace root.
[dependencies]
actix-web = { workspace = true }
futures = { workspace = true }
slog = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
chrono = { workspace = true }
once_cell = { workspace = true }
hostname = { workspace = true }
serde_json = { workspace = true }
# Inherit the workspace-wide lint configuration.
[lints]
workspace = true

View File

@ -0,0 +1,24 @@
//! Observability primitives: slog logger builder, instance ID, tracing initialization, metrics.
//!
//! All services use `observability::build_logger()` to create their slog `Logger`.
//! This ensures consistent JSON output format and instance identification across
//! all instances of the platform.
mod slog_json;
pub mod tracing_init;
pub mod metrics_middleware;
pub use slog_json::{build_logger, instance_id};
pub use metrics_middleware::{MetricsMiddleware, HttpMetrics};
/// Parse a log level string into the crate's internal numeric filter value.
///
/// Recognised levels (case-insensitive, surrounding whitespace ignored):
/// `trace` (0), `debug` (1), `info` (2), `warn` (3), `error` (4).
/// Any unrecognised value falls back to `info` (2) so a misconfigured
/// service keeps logging instead of going silent.
pub fn parse_level_filter(level: &str) -> usize {
    match level.trim().to_ascii_lowercase().as_str() {
        "trace" => 0,
        "debug" => 1,
        "info" => 2,
        "warn" => 3,
        "error" => 4,
        // Unknown input: default to `info` rather than erroring out.
        _ => 2,
    }
}

View File

@ -0,0 +1,126 @@
//! Actix-web metrics middleware: counts requests and measures latency.
//!
//! Registers metrics into a shared atomic counter exposed as slog structured fields
//! on every request. No external metrics endpoint — logs are the export path.
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use futures::future::{LocalBoxFuture, Ready, ok};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
/// HTTP metrics collected by this middleware.
///
/// All counters are lock-free atomics so they can be updated from any
/// worker thread and read at any time without coordination.
#[derive(Debug, Default)]
pub struct HttpMetrics {
    /// Total number of requests processed.
    pub request_count: AtomicU64,
    /// Sum of all request durations in milliseconds.
    pub total_duration_ms: AtomicU64,
    /// Number of 2xx responses.
    pub status_2xx: AtomicU64,
    /// Number of 4xx responses.
    pub status_4xx: AtomicU64,
    /// Number of 5xx responses.
    pub status_5xx: AtomicU64,
}
impl HttpMetrics {
    /// Creates a new instance with all counters initialised to zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one completed request: bumps the request counter, adds the
    /// elapsed time, and increments the bucket matching the status class.
    /// Statuses outside 2xx/4xx/5xx (e.g. 1xx, 3xx) are counted in
    /// `request_count` only.
    pub fn observe(&self, status_code: u16, elapsed_ms: u64) {
        self.request_count.fetch_add(1, Ordering::Relaxed);
        self.total_duration_ms.fetch_add(elapsed_ms, Ordering::Relaxed);
        match status_code {
            200..=299 => {
                self.status_2xx.fetch_add(1, Ordering::Relaxed);
            }
            400..=499 => {
                self.status_4xx.fetch_add(1, Ordering::Relaxed);
            }
            500..=599 => {
                self.status_5xx.fetch_add(1, Ordering::Relaxed);
            }
            _ => {}
        }
    }
}
/// Actix-web middleware that collects per-request metrics and exposes them
/// via slog structured fields on every log line.
pub struct MetricsMiddleware {
    // Shared handle: the same counters can be read elsewhere in the app
    // while every request handled here updates them.
    metrics: Arc<HttpMetrics>,
}
impl MetricsMiddleware {
    /// Constructs a new `MetricsMiddleware` wrapping the shared `HttpMetrics`.
    pub fn new(metrics: Arc<HttpMetrics>) -> Self {
        Self { metrics }
    }
}
// Middleware factory: actix calls `new_transform` once per worker to wrap the
// inner service in a `MetricsMiddlewareService`.
impl<S, B> Transform<S, ServiceRequest> for MetricsMiddleware
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = actix_web::Error;
    type Transform = MetricsMiddlewareService<S>;
    type InitError = ();
    type Future = Ready<Result<Self::Transform, Self::InitError>>;
    // Construction is infallible, hence the immediately-ready `ok(...)` future.
    fn new_transform(&self, service: S) -> Self::Future {
        ok(MetricsMiddlewareService {
            // Arc so the 'static per-request future can hold the service
            // without borrowing from `self`.
            service: Arc::new(service),
            metrics: self.metrics.clone(),
        })
    }
}
/// The wrapped service produced by [`MetricsMiddleware`]: forwards each
/// request to the inner service and records its outcome into the shared
/// [`HttpMetrics`].
pub struct MetricsMiddlewareService<S> {
    service: Arc<S>,
    metrics: Arc<HttpMetrics>,
}
// Manual impl instead of `#[derive(Clone)]`: deriving would add an
// unnecessary `S: Clone` bound, while both fields are `Arc`s and clone
// cheaply regardless of `S`.
impl<S> Clone for MetricsMiddlewareService<S> {
    fn clone(&self) -> Self {
        Self {
            service: self.service.clone(),
            metrics: self.metrics.clone(),
        }
    }
}
impl<S, B> Service<ServiceRequest> for MetricsMiddlewareService<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = actix_web::Error;
    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Readiness is delegated entirely to the wrapped service.
    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    /// Forwards the request, then records duration and status-class counters.
    /// If the inner service returns `Err`, no counters are updated (the `?`
    /// propagates before any metric is touched).
    fn call(&self, req: ServiceRequest) -> Self::Future {
        let start_time = Instant::now();
        let inner = self.service.clone();
        let metrics = self.metrics.clone();
        Box::pin(async move {
            let response = inner.call(req).await?;
            let duration_ms = start_time.elapsed().as_millis() as u64;
            let status = response.status().as_u16();

            // Relaxed ordering is sufficient: counters only increase and
            // carry no cross-field invariants.
            metrics.request_count.fetch_add(1, Ordering::Relaxed);
            metrics.total_duration_ms.fetch_add(duration_ms, Ordering::Relaxed);
            let bucket = match status {
                200..=299 => Some(&metrics.status_2xx),
                400..=499 => Some(&metrics.status_4xx),
                500..=599 => Some(&metrics.status_5xx),
                // 1xx/3xx: counted in request_count only.
                _ => None,
            };
            if let Some(counter) = bucket {
                counter.fetch_add(1, Ordering::Relaxed);
            }

            Ok(response)
        })
    }
}

View File

@ -0,0 +1,91 @@
//! slog JSON logger builder with instance_id support.
use once_cell::sync::Lazy;
use serde_json::json;
use slog::{Drain, Logger};
use std::io::Write;
use std::sync::Mutex;
/// Global instance identifier, initialized once at startup.
/// Priority: INSTANCE_ID env var → system hostname → "unknown"
static INSTANCE_ID: Lazy<String> = Lazy::new(|| {
    // Explicit override via the environment takes precedence.
    if let Some(id) = std::env::var("INSTANCE_ID").ok().filter(|s| !s.is_empty()) {
        return id;
    }
    // Otherwise fall back to the machine's hostname (when it resolves and
    // is valid UTF-8); empty or unavailable hostnames yield "unknown".
    match hostname::get().ok().and_then(|h| h.into_string().ok()) {
        Some(host) if !host.is_empty() => host,
        _ => "unknown".to_string(),
    }
});
/// Returns the platform-wide instance identifier for this process.
pub fn instance_id() -> String {
    INSTANCE_ID.clone()
}
/// Build a slog `Logger` that outputs structured JSON to stderr.
///
/// Each log line includes:
/// - `ts`: ISO8601 timestamp with milliseconds
/// - `level`: log level name
/// - `msg`: log message
/// - `instance_id`: unique identifier for this running instance
/// - `file`, `line`: source location
pub fn build_logger(level: &str) -> Logger {
    let drain = JsonDrain {
        min_level: crate::parse_level_filter(level),
        instance_id: INSTANCE_ID.clone(),
    };
    // The Mutex serialises concurrent writes to stderr; `fuse()` turns any
    // drain error into a panic, satisfying `Logger::root`'s requirement for
    // an error-free drain.
    Logger::root(Mutex::new(drain).fuse(), slog::o!())
}
/// A drain that writes log records as JSON to stderr.
struct JsonDrain {
    // Minimum numeric level (see `parse_level_filter`); records below this
    // rank are dropped in `log`.
    min_level: usize,
    // Copied into every emitted JSON object as the `instance_id` field.
    instance_id: String,
}
impl Drain for JsonDrain {
type Ok = ();
type Err = std::io::Error;
fn log(&self, record: &slog::Record, _logger: &slog::OwnedKVList) -> Result<(), Self::Err> {
let slog_level = match record.level() {
slog::Level::Trace => 0,
slog::Level::Debug => 1,
slog::Level::Info => 2,
slog::Level::Warning => 3,
slog::Level::Error => 4,
slog::Level::Critical => 5,
};
if slog_level < self.min_level {
return Ok(());
}
let file = record
.file()
.rsplit_once('/')
.map(|(_, s)| s)
.unwrap_or(record.file());
let ts = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
let obj = json!({
"ts": ts,
"level": record.level().to_string(),
"msg": record.msg().to_string(),
"instance_id": self.instance_id,
"file": file,
"line": record.line(),
});
// Gather additional structured keys from the record's owned kvs.
// These are fields added via slog::info!(log, "msg"; "key" = value).
writeln!(std::io::stderr(), "{}", obj)
}
}

View File

@ -0,0 +1,30 @@
//! Tracing initialization.
//!
//! Call `init_tracing()` during application startup to set up the
//! tracing-subscriber fmt layer (writes human-readable spans to stderr).
use tracing_subscriber::{fmt, EnvFilter};
use tracing_subscriber::layer::SubscriberExt;
/// Initialize tracing with a fmt layer.
///
/// The `EnvFilter` reads the `RUST_LOG` environment variable to
/// set the log level (e.g. `RUST_LOG=info`); when unset or invalid,
/// the filter defaults to `info`.
///
/// # Panics
///
/// Panics if a global tracing subscriber has already been installed,
/// i.e. this function is called more than once per process.
pub fn init_tracing() {
    // Fall back to "info" when RUST_LOG is absent or unparsable.
    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("info"));
    // Compact human-readable output with source file and line number;
    // thread IDs are omitted to keep lines short.
    // NOTE(review): the module doc says output goes to stderr, but
    // `fmt::layer()` uses its default writer here — confirm the intended
    // stream and set it explicitly if stderr is required.
    let fmt_layer = fmt::layer()
        .with_target(true)
        .with_thread_ids(false)
        .with_file(true)
        .with_line_number(true)
        .compact();
    let registry = tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer);
    tracing::subscriber::set_global_default(registry)
        .expect("failed to set global tracing subscriber");
}

View File

@ -22,6 +22,7 @@ models = { workspace = true }
email = { workspace = true } email = { workspace = true }
avatar = { workspace = true } avatar = { workspace = true }
git = { workspace = true } git = { workspace = true }
observability = { workspace = true }
git2 = { workspace = true } git2 = { workspace = true }
queue = { workspace = true } queue = { workspace = true }
room = { workspace = true } room = { workspace = true }

View File

@ -20,6 +20,7 @@ use models::workspaces::workspace_billing_history;
use rust_decimal::Decimal; use rust_decimal::Decimal;
use sea_orm::*; use sea_orm::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use slog::info;
use utoipa::ToSchema; use utoipa::ToSchema;
use uuid::Uuid; use uuid::Uuid;
@ -136,6 +137,16 @@ impl AppService {
updated.updated_at = Set(now); updated.updated_at = Set(now);
updated.update(&self.db).await?; updated.update(&self.db).await?;
info!(self.logs, "ai_usage_recorded";
"project_id" => %project_uid,
"model_id" => %model_id,
"input_tokens" => input_tokens,
"output_tokens" => output_tokens,
"cost" => total_cost,
"currency" => %currency,
"workspace_id" => %workspace_id.to_string()
);
Ok(BillingRecord { Ok(BillingRecord {
cost: total_cost, cost: total_cost,
currency, currency,
@ -186,6 +197,15 @@ impl AppService {
updated.balance = Set(new_balance); updated.balance = Set(new_balance);
updated.update(&self.db).await?; updated.update(&self.db).await?;
info!(self.logs, "ai_usage_recorded";
"project_id" => %project_uid,
"model_id" => %model_id,
"input_tokens" => input_tokens,
"output_tokens" => output_tokens,
"cost" => total_cost,
"currency" => %currency
);
Ok(BillingRecord { Ok(BillingRecord {
cost: total_cost, cost: total_cost,
currency, currency,

View File

@ -149,7 +149,7 @@ impl AppService {
let prompt = build_code_review_prompt(&pr, &diff); let prompt = build_code_review_prompt(&pr, &diff);
let ai_response = call_ai_model(&model.name, &prompt, &self.config).await?; let ai_response = call_ai_model(&model.name, &prompt, &self.config, self.logs.clone()).await?;
// Record billing (non-fatal — log warning but don't fail the review). // Record billing (non-fatal — log warning but don't fail the review).
let billing = self let billing = self
@ -392,6 +392,7 @@ async fn call_ai_model(
model_name: &str, model_name: &str,
prompt: &str, prompt: &str,
app_config: &config::AppConfig, app_config: &config::AppConfig,
logger: slog::Logger,
) -> Result<agent::AiCallResponse, AppError> { ) -> Result<agent::AiCallResponse, AppError> {
let api_key = app_config let api_key = app_config
.ai_api_key() .ai_api_key()
@ -401,7 +402,7 @@ async fn call_ai_model(
.ai_basic_url() .ai_basic_url()
.unwrap_or_else(|_| "https://api.openai.com".into()); .unwrap_or_else(|_| "https://api.openai.com".into());
let client_config = agent::AiClientConfig::new(api_key).with_base_url(base_url); let client_config = agent::AiClientConfig::new(api_key, logger).with_base_url(base_url);
let messages = vec![ let messages = vec![
async_openai::types::chat::ChatCompletionRequestMessage::User( async_openai::types::chat::ChatCompletionRequestMessage::User(

View File

@ -125,6 +125,7 @@ async fn call_ai_model_for_description(
model_name: &str, model_name: &str,
prompt: &str, prompt: &str,
app_config: &config::AppConfig, app_config: &config::AppConfig,
logger: slog::Logger,
) -> Result<agent::AiCallResponse, AppError> { ) -> Result<agent::AiCallResponse, AppError> {
let api_key = app_config let api_key = app_config
.ai_api_key() .ai_api_key()
@ -134,7 +135,7 @@ async fn call_ai_model_for_description(
.ai_basic_url() .ai_basic_url()
.unwrap_or_else(|_| "https://api.openai.com".into()); .unwrap_or_else(|_| "https://api.openai.com".into());
let client_config = agent::AiClientConfig::new(api_key).with_base_url(base_url); let client_config = agent::AiClientConfig::new(api_key, logger).with_base_url(base_url);
let messages = vec![ let messages = vec![
async_openai::types::chat::ChatCompletionRequestMessage::User( async_openai::types::chat::ChatCompletionRequestMessage::User(
@ -239,7 +240,7 @@ impl AppService {
// Build prompt and call AI // Build prompt and call AI
let prompt = build_description_prompt(&pr.title, pr.body.as_deref(), &diff); let prompt = build_description_prompt(&pr.title, pr.body.as_deref(), &diff);
let ai_response = call_ai_model_for_description(&model.name, &prompt, &self.config).await?; let ai_response = call_ai_model_for_description(&model.name, &prompt, &self.config, self.logs.clone()).await?;
// Record billing (non-fatal). // Record billing (non-fatal).
let billing = self let billing = self

View File

@ -16,7 +16,7 @@ use queue::{
use room::metrics::RoomMetrics; use room::metrics::RoomMetrics;
use room::RoomService; use room::RoomService;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use slog::{Drain, OwnedKVList, Record}; use observability::build_logger;
use utoipa::ToSchema; use utoipa::ToSchema;
use ws_token::WsTokenService; use ws_token::WsTokenService;
@ -89,62 +89,12 @@ impl AppService {
self.room.start_workers(shutdown_rx, log).await self.room.start_workers(shutdown_rx, log).await
} }
/// Build a `slog::Logger` that writes timestamped records to stderr.
///
/// `level` selects the minimum severity to emit and is one of
/// `"trace" | "debug" | "info" | "warn" | "error"`; any other string
/// falls back to `"info"`. Records below the threshold are dropped.
pub fn build_slog_logger(level: &str) -> slog::Logger {
    // Numeric rank of the minimum level we keep (trace = 0 … error = 4).
    let level_filter = match level {
        "trace" => 0usize,
        "debug" => 1usize,
        "info" => 2usize,
        "warn" => 3usize,
        "error" => 4usize,
        _ => 2usize, // unknown strings default to "info"
    };

    // Minimal drain: formats each record as a single line on stderr.
    struct StderrDrain(usize);

    impl Drain for StderrDrain {
        type Ok = ();
        type Err = ();

        #[inline]
        fn log(&self, record: &Record, _logger: &OwnedKVList) -> Result<(), ()> {
            // Map slog's level onto the same numeric scale as `level_filter`.
            let slog_level = match record.level() {
                slog::Level::Trace => 0,
                slog::Level::Debug => 1,
                slog::Level::Info => 2,
                slog::Level::Warning => 3,
                slog::Level::Error => 4,
                slog::Level::Critical => 5,
            };
            if slog_level < self.0 {
                return Ok(());
            }
            // Keep only the file name after the last '/'.
            // NOTE(review): assumes '/'-separated paths; Windows '\' paths
            // would not be shortened — confirm if that matters here.
            // Fix vs. original: drop the pointless `let _ =` binding of the
            // macro's unit value, and pass `record.level()` directly instead
            // of allocating via `.to_string()` inside the format args.
            eprintln!(
                "{} [{}] {}:{} - {}",
                chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
                record.level(),
                record
                    .file()
                    .rsplit_once('/')
                    .map(|(_, s)| s)
                    .unwrap_or(record.file()),
                record.line(),
                record.msg(),
            );
            Ok(())
        }
    }

    // The Mutex makes the drain `Sync` (slog implements `Drain` for
    // `Mutex<D>`); `Fuse` turns any drain error into a panic so the root
    // logger's error type is infallible.
    let drain = slog::Fuse::new(std::sync::Mutex::new(StderrDrain(level_filter)));
    slog::Logger::root(drain, slog::o!())
}
pub async fn new(config: AppConfig) -> anyhow::Result<Self> { pub async fn new(config: AppConfig) -> anyhow::Result<Self> {
let db = AppDatabase::init(&config).await?; let db = AppDatabase::init(&config).await?;
let cache = AppCache::init(&config).await?; let cache = AppCache::init(&config).await?;
let log_level = config.log_level().unwrap_or_else(|_| "info".to_string()); let log_level = config.log_level().unwrap_or_else(|_| "info".to_string());
let logs = Self::build_slog_logger(&log_level); let logs = build_logger(&log_level);
let email = AppEmail::init(&config, logs.clone()).await?; let email = AppEmail::init(&config, logs.clone()).await?;
let avatar = AppAvatar::init(&config).await?; let avatar = AppAvatar::init(&config).await?;

View File

@ -306,9 +306,9 @@ impl AppService {
if result.alerts_sent > 0 { if result.alerts_sent > 0 {
slog::info!( slog::info!(
logs, logs,
"Billing alerts: checked {} workspaces, sent {} emails", "billing_alerts_sent";
result.workspaces_checked, "workspaces_checked" => result.workspaces_checked,
result.alerts_sent "alerts_sent" => result.alerts_sent
); );
} }
} }