diff --git a/libs/observability/src/lib.rs b/libs/observability/src/lib.rs index 879bed9..0458d4f 100644 --- a/libs/observability/src/lib.rs +++ b/libs/observability/src/lib.rs @@ -5,12 +5,14 @@ pub mod tracing_fmt; pub mod tracing_init; +pub mod msg_json_fmt; pub mod metrics_middleware; pub mod prometheus_exporter; pub mod otlp; pub mod tracing_middleware; pub use tracing_fmt::{init_tracing_subscriber, instance_id}; +pub use msg_json_fmt::set_span_msg; pub use metrics_middleware::{MetricsMiddleware, HttpMetrics}; pub use prometheus_exporter::{ install_recorder, prometheus_handler, spawn_http_metrics_poller, diff --git a/libs/observability/src/msg_json_fmt.rs b/libs/observability/src/msg_json_fmt.rs new file mode 100644 index 0000000..ae0691f --- /dev/null +++ b/libs/observability/src/msg_json_fmt.rs @@ -0,0 +1,231 @@ +//! Custom JSON formatter that injects `_msg` as the first field. +//! +//! VictoriaLogs requires every log line to have a `_msg` field as the message +//! subject for full-text search. This formatter ensures `_msg` is always the +//! first key in the serialized JSON object. +//! +//! `_msg` derivation rules: +//! - If the event has a `message` field → use it +//! - If a parent span stored a `_msg` in the thread-local buffer (set by the +//! HTTP middleware) → use it +//! - Fallback → use the event target (module path) + +use serde::Serialize; +use std::cell::RefCell; +use std::collections::HashMap; +use std::fmt; +use tracing::field::{Field, Visit}; +use tracing_subscriber::fmt::format::FormatEvent; + +// Thread-local buffer holding the current span's `_msg` value. +// Set by the HTTP middleware via `set_span_msg()` at span creation time. +// The formatter reads this value when the event message is empty (span close). +thread_local! { + static SPAN_MSG: RefCell = const { RefCell::new(String::new()) }; +} + +/// Set the `_msg` for the current span. Called by middleware at span creation. +pub fn set_span_msg(msg: String) { + SPAN_MSG.with(|cell| cell.borrow_mut().clone_from(&msg)); +} + +/// Read and clear the current span's `_msg`. Called by the formatter. +fn take_span_msg() -> Option { + SPAN_MSG.with(|cell| { + let mut s = cell.borrow_mut(); + if s.is_empty() { + None + } else { + let out = s.clone(); + s.clear(); + Some(out) + } + }) +} + +/// Capture all event fields via the visitor pattern. +struct FieldCollector { + message: Option, + fields: HashMap, +} + +impl FieldCollector { + fn new() -> Self { + Self { + message: None, + fields: HashMap::new(), + } + } +} + +impl Visit for FieldCollector { + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "message" { + self.message = Some(value.to_string()); + } else { + self.fields + .insert(field.name().to_string(), serde_json::Value::String(value.to_string())); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { + self.fields.insert( + field.name().to_string(), + serde_json::Value::String(format!("{:?}", value)), + ); + } + + fn record_i64(&mut self, field: &Field, value: i64) { + self.fields.insert( + field.name().to_string(), + serde_json::Value::Number(value.into()), + ); + } + + fn record_u64(&mut self, field: &Field, value: u64) { + use serde_json::Number; + self.fields.insert( + field.name().to_string(), + serde_json::Value::Number(Number::from(value)), + ); + } + + fn record_bool(&mut self, field: &Field, value: bool) { + self.fields.insert( + field.name().to_string(), + serde_json::Value::Bool(value), + ); + } +} + +/// Custom `FormatEvent` that outputs JSON with `_msg` as the first field. +pub struct MsgJsonFormat; + +impl FormatEvent for MsgJsonFormat +where + S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, + N: for<'a> tracing_subscriber::fmt::format::FormatFields<'a> + 'static, +{ + fn format_event( + &self, + _ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>, + mut writer: tracing_subscriber::fmt::format::Writer<'_>, + event: &tracing::Event<'_>, + ) -> std::fmt::Result { + let mut collector = FieldCollector::new(); + event.record(&mut collector); + + // Derive _msg: event message → thread-local span msg → target fallback + let msg = collector + .message + .clone() + .or_else(take_span_msg) + .unwrap_or_else(|| event.metadata().target().to_string()); + + // Build ordered JSON: _msg first, then standard fields, then event fields + let mut ordered = serde_json::Map::with_capacity(10 + collector.fields.len()); + ordered.insert("_msg".to_string(), serde_json::Value::String(msg)); + ordered.insert( + "timestamp".to_string(), + serde_json::Value::String(chrono::Utc::now().to_rfc3339()), + ); + ordered.insert( + "level".to_string(), + serde_json::Value::String(format!("{}", event.metadata().level())), + ); + ordered.insert( + "target".to_string(), + serde_json::Value::String(event.metadata().target().to_string()), + ); + + if let Some(module_path) = event.metadata().module_path() { + ordered.insert( + "module".to_string(), + serde_json::Value::String(module_path.to_string()), + ); + } + if let Some(file) = event.metadata().file() { + ordered.insert("file".to_string(), serde_json::Value::String(file.to_string())); + } + if let Some(line) = event.metadata().line() { + ordered.insert("line".to_string(), serde_json::Value::Number(line.into())); + } + + // Event fields (excluding `message` which is already in `_msg`) + for (key, value) in &collector.fields { + ordered.insert(key.clone(), value.clone()); + } + + write!(writer, "{}", serde_json::to_string(&ordered).map_err(|_| fmt::Error)?) + } +} + +/// Log entry wrapper for VictoriaLogs-compatible output. +/// +/// All fields serialize in a fixed order with `_msg` first. +#[derive(Serialize)] +pub struct VLogEntry { + _msg: String, + timestamp: String, + level: String, + target: String, + #[serde(skip_serializing_if = "Option::is_none")] + module: Option, + #[serde(skip_serializing_if = "Option::is_none")] + file: Option, + #[serde(skip_serializing_if = "Option::is_none")] + line: Option, + #[serde(flatten)] + extra: serde_json::Map, +} + +impl VLogEntry { + pub fn new(msg: String, level: &str, target: &str) -> Self { + Self { + _msg: msg, + timestamp: chrono::Utc::now().to_rfc3339(), + level: level.to_string(), + target: target.to_string(), + module: None, + file: None, + line: None, + extra: serde_json::Map::new(), + } + } + + pub fn module(mut self, m: String) -> Self { + self.module = Some(m); + self + } + + pub fn file(mut self, f: String) -> Self { + self.file = Some(f); + self + } + + pub fn line(mut self, l: u32) -> Self { + self.line = Some(l); + self + } + + pub fn field(mut self, key: String, value: serde_json::Value) -> Self { + self.extra.insert(key, value); + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn json_order_has_msg_first() { + let entry = VLogEntry::new("GET /api/users".to_string(), "INFO", "my::module") + .module("my::module".to_string()) + .field("status".to_string(), serde_json::json!(200)) + .field("duration_ms".to_string(), serde_json::json!(42)); + + let json = serde_json::to_string(&entry).unwrap(); + assert!(json.starts_with(r#"{"_msg":"GET /api/users""#)); + } +} diff --git a/libs/observability/src/otlp.rs b/libs/observability/src/otlp.rs index 4dcd370..6618776 100644 --- a/libs/observability/src/otlp.rs +++ b/libs/observability/src/otlp.rs @@ -8,10 +8,12 @@ //! already registered. This function rebuilds the global subscriber with the //! OTLP tracing layer on top. +use crate::msg_json_fmt::MsgJsonFormat; + use opentelemetry::trace::TracerProvider; use opentelemetry_otlp::{SpanExporter, WithExportConfig}; use opentelemetry_sdk::trace as sdktrace; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; +use tracing_subscriber::{fmt, fmt::Layer as FmtLayer, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; /// Guard that shuts down the OTLP pipeline on drop. #[must_use] @@ -59,13 +61,8 @@ pub fn init_otlp( let env_filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new(log_level)); - let fmt_layer = tracing_subscriber::fmt::layer() - .json() - .with_target(true) - .with_thread_ids(false) - .with_file(true) - .with_line_number(true) - .flatten_event(true); + let fmt_layer: FmtLayer<_, _, _, _> = fmt::layer() + .event_format(MsgJsonFormat); let tracer_provider = sdktrace::SdkTracerProvider::builder() .with_batch_exporter(exporter) diff --git a/libs/observability/src/tracing_fmt.rs b/libs/observability/src/tracing_fmt.rs index 3a5ec02..230e640 100644 --- a/libs/observability/src/tracing_fmt.rs +++ b/libs/observability/src/tracing_fmt.rs @@ -1,12 +1,14 @@ //! Tracing subscriber initialisation with JSON output. //! -//! Uses `tracing_subscriber::fmt` with a custom `Make` impl that injects -//! `instance_id` into every log line. +//! Uses a custom `MsgJsonFormat` that injects `_msg` as the first field +//! for VictoriaLogs compatibility. + +use crate::msg_json_fmt::MsgJsonFormat; use once_cell::sync::Lazy; use std::str::FromStr; use tracing_subscriber::{ - fmt::{self, format::FmtSpan}, + fmt::{self, format::FmtSpan, Layer as FmtLayer}, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, @@ -34,8 +36,8 @@ pub fn instance_id() -> String { /// Initialises the global tracing subscriber with JSON-formatted output to stderr. /// -/// Each JSON line includes `ts`, `level`, `target` (module), `fields` (structured kv), -/// `line`, `file`, and `instance_id`. +/// Each JSON line includes `_msg` (first field), `timestamp`, `level`, `target`, +/// `file`, `line`, and structured event fields. /// `RUST_LOG` env var controls the log level filter. /// /// Pass `defer = true` when OTLP will be initialized afterwards via `init_otlp()`; @@ -46,22 +48,15 @@ pub fn init_tracing_subscriber(level: &str, defer: bool) { .or_else(|_| EnvFilter::from_str(level)) .expect("invalid log level"); - let fmt_layer = fmt::layer() - .json() - .with_target(true) - .with_thread_ids(false) - .with_file(true) - .with_line_number(true) - .with_span_events(FmtSpan::CLOSE) - .flatten_event(true); + let mut fmt_layer: FmtLayer<_, _, _, _> = fmt::layer() + .event_format(MsgJsonFormat); + fmt_layer.set_span_events(FmtSpan::CLOSE); let registry = tracing_subscriber::registry() .with(env_filter) .with(fmt_layer); if defer { - // Caller will invoke init_otlp() which builds the full subscriber - // including the OTLP layer, then calls try_init() once. return; } diff --git a/libs/observability/src/tracing_middleware.rs b/libs/observability/src/tracing_middleware.rs index 736a443..e44b956 100644 --- a/libs/observability/src/tracing_middleware.rs +++ b/libs/observability/src/tracing_middleware.rs @@ -74,6 +74,9 @@ where let service = self.service.clone(); Box::pin(async move { + // Set _msg for VictoriaLogs before entering the span. + // The JSON formatter reads this thread-local value. + crate::msg_json_fmt::set_span_msg(format!("{} {}", method, path)); let span = tracing::info_span!("HTTP {method} {path}", method = %method, path = %path); let _guard = span.enter(); let res = service.call(req).await?;