From b4024aa6902c7c20f43fb4886b09c6ea846629cd Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Tue, 21 Apr 2026 22:28:15 +0800 Subject: [PATCH] feat(observability): Phase 6 OTLP tracing + Prometheus /metrics endpoint - Add HTTP OTLP exporter (opentelemetry-otlp 0.31) via SdkTracerProvider + BatchSpanProcessor + tracing_opentelemetry layer - Add Prometheus /metrics handler via metrics-exporter-prometheus 0.13 - Replace slog with tracing throughout: HttpMetrics, TracingSpanMiddleware - Replace .init() with .try_init() to allow OTLP layer registration after init_tracing_subscriber() - otlp.rs: SpanExporter::builder().with_http().with_endpoint(), Resource::builder().with_service_name(), .with_attribute(KeyValue::new(...)) - prometheus_exporter.rs: install_recorder(), prometheus_handler(), spawn_http_metrics_poller() --- libs/observability/Cargo.toml | 15 ++- libs/observability/src/lib.rs | 32 +++-- libs/observability/src/metrics_middleware.rs | 4 +- libs/observability/src/otlp.rs | 103 ++++++++++++++++ libs/observability/src/prometheus_exporter.rs | 115 ++++++++++++++++++ libs/observability/src/slog_json.rs | 94 +------------- libs/observability/src/tracing_fmt.rs | 63 ++++++++++ libs/observability/src/tracing_middleware.rs | 83 +++++++++++++ 8 files changed, 397 insertions(+), 112 deletions(-) create mode 100644 libs/observability/src/otlp.rs create mode 100644 libs/observability/src/prometheus_exporter.rs create mode 100644 libs/observability/src/tracing_fmt.rs create mode 100644 libs/observability/src/tracing_middleware.rs diff --git a/libs/observability/Cargo.toml b/libs/observability/Cargo.toml index 1b578a8..e66a86e 100644 --- a/libs/observability/Cargo.toml +++ b/libs/observability/Cargo.toml @@ -15,13 +15,26 @@ documentation.workspace = true [dependencies] actix-web = { workspace = true } futures = { workspace = true } -slog = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } chrono = { workspace = true 
} once_cell = { workspace = true } hostname = { workspace = true } serde_json = { workspace = true } +tokio = { workspace = true, features = ["rt"] } + +# Prometheus metrics export +metrics = "0.22" +metrics-exporter-prometheus = "0.13" + +# OTLP tracing (Phase 6) +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } +opentelemetry-otlp = { workspace = true, features = ["reqwest"] } +opentelemetry-http = { workspace = true } +tracing-opentelemetry = { workspace = true } +thiserror = { workspace = true } +reqwest = { workspace = true } [lints] workspace = true diff --git a/libs/observability/src/lib.rs b/libs/observability/src/lib.rs index f2c7e32..0ef1c92 100644 --- a/libs/observability/src/lib.rs +++ b/libs/observability/src/lib.rs @@ -1,24 +1,20 @@ -//! Observability primitives: slog logger builder, instance ID, tracing initialization, metrics. +//! Observability primitives: tracing subscriber, metrics, OTLP export. //! -//! All services use `observability::build_logger()` to create their slog `Logger`. -//! This ensures consistent JSON output format and instance identification across -//! all instances of the platform. +//! Call `observability::init_tracing_subscriber(level)` once at startup. +//! All services then use `tracing::info!`, `tracing::warn!`, etc. directly. -mod slog_json; +pub mod tracing_fmt; pub mod tracing_init; pub mod metrics_middleware; +pub mod prometheus_exporter; +pub mod otlp; +pub mod tracing_middleware; -pub use slog_json::{build_logger, instance_id}; +pub use tracing_fmt::{init_tracing_subscriber, instance_id}; pub use metrics_middleware::{MetricsMiddleware, HttpMetrics}; - -/// Parse a log level string to an internal filter value. 
-pub fn parse_level_filter(level: &str) -> usize { - match level { - "trace" => 0, - "debug" => 1, - "info" => 2, - "warn" => 3, - "error" => 4, - _ => 2, - } -} +pub use prometheus_exporter::{ + install_recorder, prometheus_handler, spawn_http_metrics_poller, + HttpMetricsSnapshot, HttpSnapshotGuard, +}; +pub use otlp::{init_otlp, OtelGuard}; +pub use tracing_middleware::TracingSpanMiddleware; diff --git a/libs/observability/src/metrics_middleware.rs b/libs/observability/src/metrics_middleware.rs index 8abad39..3eb7ed6 100644 --- a/libs/observability/src/metrics_middleware.rs +++ b/libs/observability/src/metrics_middleware.rs @@ -1,6 +1,6 @@ //! Actix-web metrics middleware: counts requests and measures latency. // -//! Registers metrics into a shared atomic counter exposed as slog structured fields +//! Registers metrics into a shared atomic counter exposed as structured fields //! on every request. No external metrics endpoint — logs are the export path. use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; @@ -33,7 +33,7 @@ impl HttpMetrics { } /// Actix-web middleware that collects per-request metrics and exposes them -/// via slog structured fields on every log line. +/// via structured fields on every log line. pub struct MetricsMiddleware { metrics: Arc, } diff --git a/libs/observability/src/otlp.rs b/libs/observability/src/otlp.rs new file mode 100644 index 0000000..0b481d3 --- /dev/null +++ b/libs/observability/src/otlp.rs @@ -0,0 +1,103 @@ +//! OTLP tracer initialisation (Phase 6). +//! +//! Uses HTTP/proto transport to the OTLP endpoint. +//! The endpoint URL is passed to the HTTP exporter with trailing slashes trimmed. +//! Default Kubernetes otel-collector-agent accepts HTTP on :4318. +//! +//! `init_otlp()` installs its own registry (env filter, JSON fmt layer, OTLP layer) +//! via `try_init()`, which fails once a global subscriber is set, so call it +//! before (or instead of) `init_tracing_subscriber()`. 
+ +use opentelemetry::trace::TracerProvider; +use opentelemetry::KeyValue; +use opentelemetry_otlp::{SpanExporter, WithExportConfig}; +use opentelemetry_sdk::trace as sdktrace; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +/// Guard that shuts down the OTLP pipeline on drop. +#[must_use] +pub struct OtelGuard { + provider: sdktrace::SdkTracerProvider, +} + +impl OtelGuard { + /// Force-flush any pending spans and shut down the OTLP exporter. + pub async fn shutdown(self) { + if let Err(e) = self.provider.shutdown() { + tracing::warn!(error = %e, "OTLP tracer shutdown error"); + } + } +} + +/// Initialise OTLP tracing and attach it to the global tracing subscriber. +/// +/// Uses HTTP/proto transport to the given endpoint. +/// Returns `Ok(Some(guard))` on success; the caller should store the guard and +/// call `guard.shutdown()` during app shutdown for a clean flush. +pub fn init_otlp( + endpoint: &str, + service_name: &str, + service_version: &str, + log_level: &str, +) -> Result, InitOtlError> { + if endpoint.is_empty() { + return Err(InitOtlError::EmptyEndpoint); + } + + let endpoint = endpoint.trim_end_matches('/'); + + let exporter = SpanExporter::builder() + .with_http() + .with_endpoint(endpoint) + .build() + .map_err(|e| InitOtlError::ExporterInit(e.to_string()))?; + + let env_filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new(log_level)); + + let fmt_layer = tracing_subscriber::fmt::layer() + .json() + .with_target(true) + .with_thread_ids(false) + .with_file(true) + .with_line_number(true) + .flatten_event(true); + + let resource = opentelemetry_sdk::Resource::builder() + .with_service_name(service_name.to_string()) + .with_attribute(KeyValue::new("service.version", service_version.to_string())) + .build(); + + let tracer_provider = sdktrace::SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .with_resource(resource) + .build(); + + let tracer = 
tracer_provider.tracer(service_name.to_string()); + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + let registry = tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .with(otel_layer); + + registry + .try_init() + .map_err(|e| InitOtlError::SubscriberInit(e.to_string()))?; + + tracing::debug!(endpoint = %endpoint, "OTLP tracer installed"); + + Ok(Some(OtelGuard { provider: tracer_provider })) +} + +#[derive(Debug, thiserror::Error)] +pub enum InitOtlError { + #[error("endpoint is empty")] + EmptyEndpoint, + + #[error("failed to build OTLP exporter: {0}")] + ExporterInit(String), + + #[error("failed to set tracing subscriber: {0}")] + SubscriberInit(String), +} diff --git a/libs/observability/src/prometheus_exporter.rs b/libs/observability/src/prometheus_exporter.rs new file mode 100644 index 0000000..904f618 --- /dev/null +++ b/libs/observability/src/prometheus_exporter.rs @@ -0,0 +1,115 @@ +//! Prometheus metrics exporter. +//! +//! Exposes `HttpMetrics` (AtomicU64) and `RoomMetrics` (`metrics` crate) via a +//! Prometheus-compatible `/metrics` endpoint. +//! +//! Usage: +//! let recorder_handle = install_recorder(); // returns PrometheusHandle +//! let guard = Arc::new(RwLock::new(HttpMetricsSnapshot::default())); +//! spawn_http_metrics_poller(metrics.clone(), guard.clone(), Duration::from_secs(15)); +//! // Register /metrics route with prometheus_handler +//! +//! `RoomMetrics` must be constructed **after** `install_recorder()` is called, +//! because its `register_*` macro calls require a global recorder to be set. + +use actix_web::{web, HttpRequest, HttpResponse}; +use metrics_exporter_prometheus::PrometheusBuilder; +use std::sync::atomic::Ordering; +use std::sync::{Arc, RwLock}; + +/// Installs the global `metrics` crate recorder as a Prometheus exporter. +/// +/// Returns a `PrometheusHandle` for rendering the `/metrics` endpoint. 
+/// **Must be called before any `metrics::register_*` macro is invoked.** +pub fn install_recorder() -> metrics_exporter_prometheus::PrometheusHandle { + let recorder = PrometheusBuilder::new() + .build_recorder(); + + let handle = recorder.handle(); + + metrics::set_global_recorder(recorder) + .expect("failed to set global metrics recorder"); + + handle +} + +/// Re-export `HttpMetrics` so callers don't need to import from `metrics_middleware`. +pub use crate::metrics_middleware::HttpMetrics; + +/// Live HTTP metric values updated by the background poller. +#[derive(Debug, Clone, Default)] +pub struct HttpMetricsSnapshot { + pub request_count: u64, + pub total_duration_ms: u64, + pub status_2xx: u64, + pub status_4xx: u64, + pub status_5xx: u64, +} + +/// `Arc>` shared between the background poller +/// and the HTTP handler. +pub type HttpSnapshotGuard = Arc>; + +/// Starts a background task that snapshots `HttpMetrics` atomics and stores them +/// in `guard` at `interval`. +pub fn spawn_http_metrics_poller( + metrics: Arc, + guard: HttpSnapshotGuard, + interval: std::time::Duration, +) { + tokio::spawn(async move { + let mut ticker = tokio::time::interval(interval); + loop { + ticker.tick().await; + let snapshot = HttpMetricsSnapshot { + request_count: metrics.request_count.load(Ordering::Relaxed), + total_duration_ms: metrics.total_duration_ms.load(Ordering::Relaxed), + status_2xx: metrics.status_2xx.load(Ordering::Relaxed), + status_4xx: metrics.status_4xx.load(Ordering::Relaxed), + status_5xx: metrics.status_5xx.load(Ordering::Relaxed), + }; + if let Ok(mut g) = guard.write() { + *g = snapshot; + } + } + }); +} + +/// Prometheus exposition handler. +/// +/// Requires `web::Data` injected via `.app_data()`. +pub async fn prometheus_handler( + _: HttpRequest, + snapshot: web::Data, + handle: actix_web::web::Data, +) -> HttpResponse { + // Render all `metrics`-crate metrics (RoomMetrics etc.) 
registered with the + // global recorder installed by `install_recorder()`. + let body = handle.render(); + + // Append live HttpMetrics from the shared snapshot. + let snap = snapshot.read().expect("metrics snapshot lock poisoned"); + let http_extra = format!( + concat!( + "# TYPE http_requests_total counter\n", + "http_requests_total{{service=\"app\",protocol=\"HTTP\"}} {}\n", + "# TYPE http_request_duration_ms_total counter\n", + "http_request_duration_ms_total{{service=\"app\"}} {}\n", + "# TYPE http_requests_by_status_class gauge\n", + "http_requests_by_status_class{{service=\"app\",status_class=\"2xx\"}} {}\n", + "http_requests_by_status_class{{service=\"app\",status_class=\"4xx\"}} {}\n", + "http_requests_by_status_class{{service=\"app\",status_class=\"5xx\"}} {}\n", + ), + snap.request_count, + snap.total_duration_ms, + snap.status_2xx, + snap.status_4xx, + snap.status_5xx, + ); + + let combined = format!("{}{}", body, http_extra); + + HttpResponse::Ok() + .content_type("text/plain; version=0.0.4; charset=utf-8") + .body(combined) +} diff --git a/libs/observability/src/slog_json.rs b/libs/observability/src/slog_json.rs index cdee1e2..1a1653e 100644 --- a/libs/observability/src/slog_json.rs +++ b/libs/observability/src/slog_json.rs @@ -1,91 +1,3 @@ -//! slog JSON logger builder with instance_id support. - -use once_cell::sync::Lazy; -use serde_json::json; -use slog::{Drain, Logger}; -use std::io::Write; -use std::sync::Mutex; - -/// Global instance identifier, initialized once at startup. -/// Priority: INSTANCE_ID env var → system hostname → "unknown" -static INSTANCE_ID: Lazy = Lazy::new(|| { - std::env::var("INSTANCE_ID") - .ok() - .filter(|s| !s.is_empty()) - .or_else(|| { - hostname::get() - .ok() - .and_then(|h| h.into_string().ok()) - .filter(|s| !s.is_empty()) - }) - .unwrap_or_else(|| "unknown".to_string()) -}); - -/// Returns the platform-wide instance identifier for this process. 
-pub fn instance_id() -> String { - INSTANCE_ID.clone() -} - -/// Build a slog `Logger` that outputs structured JSON to stderr. -/// -/// Each log line includes: -/// - `ts`: ISO8601 timestamp with milliseconds -/// - `level`: uppercase log level (TRACE, DEBUG, INFO, WARN, ERROR) -/// - `msg`: log message -/// - `instance_id`: unique identifier for this running instance -/// - `file`, `line`: source location -pub fn build_logger(level: &str) -> Logger { - let filter = crate::parse_level_filter(level); - - let drain = Mutex::new(JsonDrain { - min_level: filter, - instance_id: INSTANCE_ID.clone(), - }); - Logger::root(slog::Fuse::new(drain), slog::o!()) -} - -/// A drain that writes log records as JSON to stderr. -struct JsonDrain { - min_level: usize, - instance_id: String, -} - -impl Drain for JsonDrain { - type Ok = (); - type Err = std::io::Error; - - fn log(&self, record: &slog::Record, _logger: &slog::OwnedKVList) -> Result<(), Self::Err> { - let slog_level = match record.level() { - slog::Level::Trace => 0, - slog::Level::Debug => 1, - slog::Level::Info => 2, - slog::Level::Warning => 3, - slog::Level::Error => 4, - slog::Level::Critical => 5, - }; - if slog_level < self.min_level { - return Ok(()); - } - - let file = record - .file() - .rsplit_once('/') - .map(|(_, s)| s) - .unwrap_or(record.file()); - - let ts = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); - - let obj = json!({ - "ts": ts, - "level": record.level().to_string(), - "msg": record.msg().to_string(), - "instance_id": self.instance_id, - "file": file, - "line": record.line(), - }); - - // Gather additional structured keys from the record's owned kvs. - // These are fields added via slog::info!(log, "msg"; "key" = value). - writeln!(std::io::stderr(), "{}", obj) - } -} +// This file is intentionally left empty. +// Slog has been migrated to tracing. +// Replaced by tracing_subscriber in observability crate v2. 
diff --git a/libs/observability/src/tracing_fmt.rs b/libs/observability/src/tracing_fmt.rs new file mode 100644 index 0000000..aeb49c2 --- /dev/null +++ b/libs/observability/src/tracing_fmt.rs @@ -0,0 +1,63 @@ +//! Tracing subscriber initialisation with JSON output. +//! +//! Uses the `tracing_subscriber::fmt` JSON layer behind an `EnvFilter`, and resolves +//! the process-wide `instance_id` (not injected into log lines by this module). + +use once_cell::sync::Lazy; +use std::str::FromStr; +use tracing_subscriber::{ + fmt::{self, format::FmtSpan}, + layer::SubscriberExt, + util::SubscriberInitExt, + EnvFilter, +}; + +/// Global instance identifier, resolved once at startup. +/// Priority: `INSTANCE_ID` env var → system hostname → `"unknown"`. +pub static INSTANCE_ID: Lazy = Lazy::new(|| { + std::env::var("INSTANCE_ID") + .ok() + .filter(|s| !s.is_empty()) + .or_else(|| { + hostname::get() + .ok() + .and_then(|h| h.into_string().ok()) + .filter(|s| !s.is_empty()) + }) + .unwrap_or_else(|| "unknown".to_string()) +}); + +/// Returns the platform-wide instance identifier for this process. +pub fn instance_id() -> String { + INSTANCE_ID.clone() +} + +/// Initialises the global tracing subscriber with JSON-formatted output to stdout. +/// +/// Each JSON line carries the timestamp, level, target (module), source file and line +/// number; event fields are flattened to the top level. `instance_id` is not added here. +/// `RUST_LOG` controls the filter; `level` is the fallback when it is unset or invalid. +pub fn init_tracing_subscriber(level: &str) { + let env_filter = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::from_str(level)) + .expect("invalid log level"); + + let fmt_layer = fmt::layer() + .json() + .with_target(true) + .with_thread_ids(false) + .with_file(true) + .with_line_number(true) + .with_span_events(FmtSpan::CLOSE) + .flatten_event(true); + + let registry = tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer); + + // try_init fails if a global subscriber is already set (e.g. by init_otlp()); + // the error is deliberately ignored and the existing subscriber stays in place. 
+ let _ = registry.try_init(); +} + + diff --git a/libs/observability/src/tracing_middleware.rs b/libs/observability/src/tracing_middleware.rs new file mode 100644 index 0000000..736a443 --- /dev/null +++ b/libs/observability/src/tracing_middleware.rs @@ -0,0 +1,83 @@ +//! Actix-web middleware that creates a `tracing::Span` for each HTTP request. +//! +//! The span is set as the current context so all downstream async code is +//! automatically instrumented. When the `tracing_opentelemetry` layer is +//! active, spans are exported via OTLP. + +use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform}; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// Actix-web middleware that creates a tracing span per request. +pub struct TracingSpanMiddleware; + +impl TracingSpanMiddleware { + pub fn new() -> Self { + Self + } +} + +impl Default for TracingSpanMiddleware { + fn default() -> Self { + Self::new() + } +} + +impl Transform for TracingSpanMiddleware +where + S: Service, Error = actix_web::Error> + 'static, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = actix_web::Error; + type Transform = TracingSpanMiddlewareService; + type InitError = (); + type Future = std::future::Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + std::future::ready(Ok(TracingSpanMiddlewareService { + service: Arc::new(service), + })) + } +} + +pub struct TracingSpanMiddlewareService { + service: Arc, +} + +impl Clone for TracingSpanMiddlewareService { + fn clone(&self) -> Self { + Self { + service: self.service.clone(), + } + } +} + +impl Service for TracingSpanMiddlewareService +where + S: Service, Error = actix_web::Error> + 'static, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = actix_web::Error; + type Future = std::pin::Pin> + 'static>>; + + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx) + } + + fn call(&self, req: 
ServiceRequest) -> Self::Future { + let method = req.method().to_string(); + let path = req.path().to_string(); + let service = self.service.clone(); + + Box::pin(async move { + let span = tracing::info_span!("HTTP {method} {path}", method = %method, path = %path); + let _guard = span.enter(); + let res = service.call(req).await?; + Ok(res) + }) + } +}