feat(observability): Phase 6 OTLP tracing + Prometheus /metrics endpoint

- Add HTTP OTLP exporter (opentelemetry-otlp 0.31) via SdkTracerProvider +
  BatchSpanProcessor + tracing_opentelemetry layer
- Add Prometheus /metrics handler via metrics-exporter-prometheus 0.13
- Replace slog with tracing throughout: HttpMetrics, TracingSpanMiddleware
- Replace .init() with .try_init() to allow OTLP layer registration after
  init_tracing_subscriber()
- otlp.rs: SpanExporter::builder().with_http().with_endpoint(),
  Resource::builder().with_service_name(), .with_attribute(KeyValue::new(...))
- prometheus_exporter.rs: install_recorder(), prometheus_handler(),
  spawn_http_metrics_poller()
This commit is contained in:
ZhenYi 2026-04-21 22:28:15 +08:00
parent 418f9a5d8b
commit b4024aa690
8 changed files with 397 additions and 112 deletions

View File

@ -15,13 +15,26 @@ documentation.workspace = true
[dependencies]
actix-web = { workspace = true }
futures = { workspace = true }
slog = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
chrono = { workspace = true }
once_cell = { workspace = true }
hostname = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
# Prometheus metrics export
metrics = "0.22"
metrics-exporter-prometheus = "0.13"
# OTLP tracing (Phase 6)
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["reqwest"] }
opentelemetry-http = { workspace = true }
tracing-opentelemetry = { workspace = true }
thiserror = { workspace = true }
reqwest = { workspace = true }
[lints]
workspace = true

View File

@ -1,24 +1,20 @@
//! Observability primitives: slog logger builder, instance ID, tracing initialization, metrics.
//! Observability primitives: tracing subscriber, metrics, OTLP export.
//!
//! All services use `observability::build_logger()` to create their slog `Logger`.
//! This ensures consistent JSON output format and instance identification across
//! all instances of the platform.
//! Call `observability::init_tracing_subscriber(level)` once at startup.
//! All services then use `tracing::info!`, `tracing::warn!`, etc. directly.
mod slog_json;
pub mod tracing_fmt;
pub mod tracing_init;
pub mod metrics_middleware;
pub mod prometheus_exporter;
pub mod otlp;
pub mod tracing_middleware;
pub use slog_json::{build_logger, instance_id};
pub use tracing_fmt::{init_tracing_subscriber, instance_id};
pub use metrics_middleware::{MetricsMiddleware, HttpMetrics};
/// Parse a log level string to an internal filter value.
///
/// Matching ignores ASCII case and surrounding whitespace, so `"WARN"`,
/// `"Warn"` and `" warn\n"` all map to the same value as `"warn"`.
///
/// Returns `0` for `trace`, `1` for `debug`, `2` for `info`, `3` for `warn`
/// and `4` for `error`. Any other input (including the empty string) falls
/// back to `2` (`info`).
pub fn parse_level_filter(level: &str) -> usize {
    /// Value returned for unrecognised input; same as `info`.
    const FALLBACK: usize = 2;
    const LEVELS: [(&str, usize); 5] = [
        ("trace", 0),
        ("debug", 1),
        ("info", 2),
        ("warn", 3),
        ("error", 4),
    ];

    // Level names usually come from env vars or config files, where casing
    // and stray whitespace vary; normalise before comparing.
    let wanted = level.trim();
    LEVELS
        .iter()
        .find(|(name, _)| name.eq_ignore_ascii_case(wanted))
        .map_or(FALLBACK, |&(_, rank)| rank)
}
pub use prometheus_exporter::{
install_recorder, prometheus_handler, spawn_http_metrics_poller,
HttpMetricsSnapshot, HttpSnapshotGuard,
};
pub use otlp::{init_otlp, OtelGuard};
pub use tracing_middleware::TracingSpanMiddleware;

View File

@ -1,6 +1,6 @@
//! Actix-web metrics middleware: counts requests and measures latency.
//
//! Registers metrics into a shared atomic counter exposed as slog structured fields
//! Registers metrics into a shared atomic counter exposed as structured fields
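//! on every request. This module serves no metrics endpoint itself — logs are its
//! export path; `prometheus_exporter` separately publishes the same `HttpMetrics`
//! counters on `/metrics`.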
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
@ -33,7 +33,7 @@ impl HttpMetrics {
}
/// Actix-web middleware that collects per-request metrics and exposes them
/// via structured fields on every log line.
pub struct MetricsMiddleware {
// Shared `HttpMetrics` counters; held behind an `Arc` so the same instance
// can also be handed to other consumers.
metrics: Arc<HttpMetrics>,
}

View File

@ -0,0 +1,103 @@
//! OTLP tracer initialisation (Phase 6).
//!
//! Uses HTTP/proto transport to the OTLP endpoint.
//! The endpoint URL is passed as-is to the HTTP exporter.
//! Default Kubernetes otel-collector-agent accepts HTTP on :4318.
//!
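//! Call `init_otlp()` **before** (or instead of) `init_tracing_subscriber()`.
//! The global subscriber can be installed only once per process and both
//! functions use `try_init()`, so whichever runs first wins. `init_otlp()`
//! builds its own filter + JSON fmt layer + OTLP layer and returns
//! `InitOtlError::SubscriberInit` if a subscriber is already installed.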
use opentelemetry::trace::TracerProvider;
use opentelemetry::KeyValue;
use opentelemetry_otlp::{SpanExporter, WithExportConfig};
use opentelemetry_sdk::trace as sdktrace;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
/// Handle that keeps the OTLP tracer provider so it can be shut down explicitly.
///
/// This type has no `Drop` implementation of its own: call
/// [`OtelGuard::shutdown`] during application shutdown. `#[must_use]` makes the
/// compiler warn when the value is discarded.
#[must_use]
pub struct OtelGuard {
// Tracer provider whose `shutdown()` is invoked by `OtelGuard::shutdown`.
provider: sdktrace::SdkTracerProvider,
}
impl OtelGuard {
/// Shuts down the tracer provider held by this guard, consuming the guard.
///
/// An error reported by the provider is logged at `warn` level and otherwise
/// ignored. Presumably the provider flushes pending spans as part of its
/// shutdown — confirm against the `opentelemetry_sdk` documentation.
///
/// NOTE(review): the function is `async` but contains no `.await`; the
/// provider call runs synchronously when the returned future is polled.
pub async fn shutdown(self) {
if let Err(e) = self.provider.shutdown() {
tracing::warn!(error = %e, "OTLP tracer shutdown error");
}
}
}
/// Initialise OTLP tracing and install it as the global tracing subscriber.
///
/// Builds an HTTP OTLP span exporter for `endpoint`, a tracer provider whose
/// resource carries `service_name` and a `service.version` attribute, and then
/// installs a subscriber made of an `EnvFilter`, a JSON `fmt` layer and the
/// `tracing_opentelemetry` layer.
///
/// The global subscriber can only be set once per process. If one is already
/// installed (for example by `init_tracing_subscriber()`), this function
/// returns [`InitOtlError::SubscriberInit`] and no OTLP layer is attached, so
/// call it *before* any other subscriber initialisation.
///
/// # Arguments
/// * `endpoint` - OTLP HTTP endpoint. Surrounding whitespace and trailing `/`
///   characters are removed; the result is handed to the exporter unchanged.
/// * `service_name` - used as the resource service name and the tracer name.
/// * `service_version` - stored as the `service.version` resource attribute.
/// * `log_level` - filter directives used only when the default filter
///   environment variable is unset or invalid.
///
/// # Returns
/// `Ok(Some(guard))` on success; the caller should store the guard and call
/// `guard.shutdown()` during app shutdown for a clean flush.
///
/// # Errors
/// * [`InitOtlError::EmptyEndpoint`] - `endpoint` is empty after trimming.
/// * [`InitOtlError::ExporterInit`] - the exporter builder failed.
/// * [`InitOtlError::SubscriberInit`] - a global subscriber already exists.
pub fn init_otlp(
    endpoint: &str,
    service_name: &str,
    service_version: &str,
    log_level: &str,
) -> Result<Option<OtelGuard>, InitOtlError> {
    // Normalise first so that values such as " " or "/" (e.g. from a sloppy
    // env var) are rejected here instead of reaching the exporter as "".
    let endpoint = endpoint.trim().trim_end_matches('/');
    if endpoint.is_empty() {
        return Err(InitOtlError::EmptyEndpoint);
    }

    let exporter = SpanExporter::builder()
        .with_http()
        .with_endpoint(endpoint)
        .build()
        .map_err(|e| InitOtlError::ExporterInit(e.to_string()))?;

    let env_filter =
        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(log_level));

    let fmt_layer = tracing_subscriber::fmt::layer()
        .json()
        .with_target(true)
        .with_thread_ids(false)
        .with_file(true)
        .with_line_number(true)
        .flatten_event(true);

    let resource = opentelemetry_sdk::Resource::builder()
        .with_service_name(service_name.to_string())
        .with_attribute(KeyValue::new("service.version", service_version.to_string()))
        .build();

    let tracer_provider = sdktrace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(resource)
        .build();

    let tracer = tracer_provider.tracer(service_name.to_string());
    let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);

    // `try_init` fails when a global subscriber is already installed; that is
    // reported to the caller instead of being silently ignored.
    tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .with(otel_layer)
        .try_init()
        .map_err(|e| InitOtlError::SubscriberInit(e.to_string()))?;

    tracing::debug!(endpoint = %endpoint, "OTLP tracer installed");
    Ok(Some(OtelGuard { provider: tracer_provider }))
}
/// Errors returned by `init_otlp`.
#[derive(Debug, thiserror::Error)]
pub enum InitOtlError {
/// The `endpoint` argument was empty.
#[error("endpoint is empty")]
EmptyEndpoint,
/// Building the OTLP span exporter failed; carries the underlying error
/// rendered as text.
#[error("failed to build OTLP exporter: {0}")]
ExporterInit(String),
/// Installing the global tracing subscriber failed; carries the underlying
/// error rendered as text.
#[error("failed to set tracing subscriber: {0}")]
SubscriberInit(String),
}

View File

@ -0,0 +1,115 @@
//! Prometheus metrics exporter.
//!
//! Exposes `HttpMetrics` (AtomicU64) and `RoomMetrics` (`metrics` crate) via a
//! Prometheus-compatible `/metrics` endpoint.
//!
//! Usage:
//! let recorder_handle = install_recorder(); // returns PrometheusHandle
//! let guard = Arc::new(RwLock::new(HttpMetricsSnapshot::default()));
//! spawn_http_metrics_poller(metrics.clone(), guard.clone(), Duration::from_secs(15));
//! // Register /metrics route with prometheus_handler
//!
//! `RoomMetrics` must be constructed **after** `install_recorder()` is called,
//! because its `register_*` macro calls require a global recorder to be set.
use actix_web::{web, HttpRequest, HttpResponse};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
/// Builds a Prometheus recorder and registers it as the process-wide
/// `metrics` recorder.
///
/// The returned `PrometheusHandle` is what the `/metrics` endpoint renders
/// from. **Call this before any `metrics::register_*` macro is invoked.**
///
/// # Panics
/// Panics when `metrics::set_global_recorder` reports an error.
pub fn install_recorder() -> metrics_exporter_prometheus::PrometheusHandle {
    let prometheus = PrometheusBuilder::new().build_recorder();
    // Grab the handle first: the recorder itself is moved into the global slot.
    let render_handle = prometheus.handle();
    metrics::set_global_recorder(prometheus).expect("failed to set global metrics recorder");
    render_handle
}
/// Re-export `HttpMetrics` so callers don't need to import from `metrics_middleware`.
pub use crate::metrics_middleware::HttpMetrics;
/// Point-in-time copy of the `HttpMetrics` counters, refreshed by the
/// background poller and read by the `/metrics` handler.
///
/// `Default` yields all-zero counters. `PartialEq`/`Eq` are derived so
/// snapshots can be compared directly (for example in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HttpMetricsSnapshot {
    /// Copy of `HttpMetrics::request_count`.
    pub request_count: u64,
    /// Copy of `HttpMetrics::total_duration_ms` (milliseconds, per the name).
    pub total_duration_ms: u64,
    /// Copy of `HttpMetrics::status_2xx`.
    pub status_2xx: u64,
    /// Copy of `HttpMetrics::status_4xx`.
    pub status_4xx: u64,
    /// Copy of `HttpMetrics::status_5xx`.
    pub status_5xx: u64,
}
/// `Arc<RwLock<HttpMetricsSnapshot>>` shared between the background poller
/// and the HTTP handler.
///
/// Despite the name this is the shared handle itself, not a lock guard:
/// callers obtain a guard from it with `.read()` / `.write()`.
pub type HttpSnapshotGuard = Arc<RwLock<HttpMetricsSnapshot>>;
/// Starts a detached background task that copies the `HttpMetrics` atomics
/// into `guard` every `interval`.
///
/// The task loops forever and its join handle is dropped, so it lives until
/// the runtime shuts down.
///
/// * A zero `interval` is replaced by one second. `tokio::time::interval`
///   panics on a zero period, which would kill the detached task without any
///   visible error.
/// * A poisoned lock is recovered instead of skipped: every write replaces
///   the whole snapshot, so no half-updated state can be observed, and
///   skipping would freeze the exported values forever.
///
/// # Panics
/// `tokio::spawn` panics when called outside a Tokio runtime (per the Tokio
/// documentation).
pub fn spawn_http_metrics_poller(
    metrics: Arc<HttpMetrics>,
    guard: HttpSnapshotGuard,
    interval: std::time::Duration,
) {
    let period = if interval.is_zero() {
        std::time::Duration::from_secs(1)
    } else {
        interval
    };

    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        loop {
            ticker.tick().await;
            let snapshot = HttpMetricsSnapshot {
                request_count: metrics.request_count.load(Ordering::Relaxed),
                total_duration_ms: metrics.total_duration_ms.load(Ordering::Relaxed),
                status_2xx: metrics.status_2xx.load(Ordering::Relaxed),
                status_4xx: metrics.status_4xx.load(Ordering::Relaxed),
                status_5xx: metrics.status_5xx.load(Ordering::Relaxed),
            };
            // The write guard is a temporary dropped at the end of this
            // statement, so the lock is never held across an `.await`.
            *guard
                .write()
                .unwrap_or_else(std::sync::PoisonError::into_inner) = snapshot;
        }
    });
}
/// Prometheus exposition handler.
///
/// Requires `web::Data<HttpSnapshotGuard>` injected via `.app_data()`.
pub async fn prometheus_handler(
_: HttpRequest,
snapshot: web::Data<HttpSnapshotGuard>,
handle: actix_web::web::Data<metrics_exporter_prometheus::PrometheusHandle>,
) -> HttpResponse {
// Render all `metrics`-crate metrics (RoomMetrics etc.) registered with the
// global recorder installed by `install_recorder()`.
let body = handle.render();
// Append live HttpMetrics from the shared snapshot.
let snap = snapshot.read().expect("metrics snapshot lock poisoned");
let http_extra = format!(
concat!(
"# TYPE http_requests_total counter\n",
"http_requests_total{{service=\"app\",protocol=\"HTTP\"}} {}\n",
"# TYPE http_request_duration_ms_total counter\n",
"http_request_duration_ms_total{{service=\"app\"}} {}\n",
"# TYPE http_requests_by_status_class gauge\n",
"http_requests_by_status_class{{service=\"app\",status_class=\"2xx\"}} {}\n",
"http_requests_by_status_class{{service=\"app\",status_class=\"4xx\"}} {}\n",
"http_requests_by_status_class{{service=\"app\",status_class=\"5xx\"}} {}\n",
),
snap.request_count,
snap.total_duration_ms,
snap.status_2xx,
snap.status_4xx,
snap.status_5xx,
);
let combined = format!("{}{}", body, http_extra);
HttpResponse::Ok()
.content_type("text/plain; version=0.0.4; charset=utf-8")
.body(combined)
}

View File

@ -1,91 +1,3 @@
//! slog JSON logger builder with instance_id support.
use once_cell::sync::Lazy;
use serde_json::json;
use slog::{Drain, Logger};
use std::io::Write;
use std::sync::Mutex;
/// Process-wide instance identifier, computed lazily on first access.
///
/// Resolution order: non-empty `INSTANCE_ID` env var, then a non-empty system
/// hostname that converts to a `String`, then the literal `"unknown"`.
static INSTANCE_ID: Lazy<String> = Lazy::new(|| {
    let env_id = std::env::var("INSTANCE_ID")
        .ok()
        .filter(|id| !id.is_empty());
    // Only evaluated when the env var did not yield a usable value.
    let host_id = || {
        hostname::get()
            .ok()
            .and_then(|os_name| os_name.into_string().ok())
            .filter(|host| !host.is_empty())
    };
    env_id
        .or_else(host_id)
        .unwrap_or_else(|| String::from("unknown"))
});
/// Returns an owned copy of the process-wide instance identifier.
pub fn instance_id() -> String {
    INSTANCE_ID.as_str().to_owned()
}
/// Creates a root slog `Logger` backed by `JsonDrain`.
///
/// `level` is converted with `crate::parse_level_filter` and becomes the
/// drain's minimum level. The drain emits one JSON object per record with:
/// - `ts`: UTC timestamp with millisecond precision
/// - `level`: the record's level rendered through its `Display` impl
/// - `msg`: log message
/// - `instance_id`: identifier of this running instance
/// - `file`, `line`: source location
pub fn build_logger(level: &str) -> Logger {
    let json_drain = JsonDrain {
        min_level: crate::parse_level_filter(level),
        instance_id: INSTANCE_ID.clone(),
    };
    // `Mutex` makes the drain shareable; `Fuse` turns drain errors into panics.
    let fused = slog::Fuse::new(Mutex::new(json_drain));
    Logger::root(fused, slog::o!())
}
/// A drain that writes log records as JSON to stderr.
struct JsonDrain {
// Records whose numeric level is below this value are dropped
// (0 = trace … 4 = error, 5 = critical; see the `Drain` impl).
min_level: usize,
// Value written as the `instance_id` field of every emitted line.
instance_id: String,
}
impl Drain for JsonDrain {
type Ok = ();
type Err = std::io::Error;
/// Writes `record` to stderr as a single JSON line, unless its level is
/// below `min_level`.
///
/// Returns the I/O result of the write to stderr.
fn log(&self, record: &slog::Record, _logger: &slog::OwnedKVList) -> Result<(), Self::Err> {
// Map the slog level onto the same numeric scale used by `min_level`.
let slog_level = match record.level() {
slog::Level::Trace => 0,
slog::Level::Debug => 1,
slog::Level::Info => 2,
slog::Level::Warning => 3,
slog::Level::Error => 4,
slog::Level::Critical => 5,
};
if slog_level < self.min_level {
return Ok(());
}
// Keep only the part after the last '/' of the source path; paths without
// a '/' are used unchanged.
let file = record
.file()
.rsplit_once('/')
.map(|(_, s)| s)
.unwrap_or(record.file());
let ts = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
let obj = json!({
"ts": ts,
"level": record.level().to_string(),
"msg": record.msg().to_string(),
"instance_id": self.instance_id,
"file": file,
"line": record.line(),
});
// NOTE: structured key-value pairs (those added via
// slog::info!(log, "msg"; "key" = value) or stored on the logger) are NOT
// serialized: neither the record's kv list nor `_logger` is read here, so
// only the fixed fields above reach the output.
writeln!(std::io::stderr(), "{}", obj)
}
}
// This file is intentionally left empty.
// Slog has been migrated to tracing.
// Replaced by tracing_subscriber in observability crate v2.

View File

@ -0,0 +1,63 @@
//! Tracing subscriber initialisation with JSON output.
//!
//! Uses the `tracing_subscriber::fmt` JSON layer behind an `EnvFilter`.
//! The instance identifier is available through [`instance_id`]; this module
//! does not itself attach it to log lines.
use once_cell::sync::Lazy;
use std::str::FromStr;
use tracing_subscriber::{
fmt::{self, format::FmtSpan},
layer::SubscriberExt,
util::SubscriberInitExt,
EnvFilter,
};
/// Process-wide instance identifier, computed lazily on first access.
///
/// Resolution order: non-empty `INSTANCE_ID` env var, then a non-empty system
/// hostname that converts to a `String`, then the literal `"unknown"`.
pub static INSTANCE_ID: Lazy<String> = Lazy::new(|| {
    let env_id = std::env::var("INSTANCE_ID")
        .ok()
        .filter(|id| !id.is_empty());
    // Only evaluated when the env var did not yield a usable value.
    let host_id = || {
        hostname::get()
            .ok()
            .and_then(|os_name| os_name.into_string().ok())
            .filter(|host| !host.is_empty())
    };
    env_id
        .or_else(host_id)
        .unwrap_or_else(|| String::from("unknown"))
});
/// Returns an owned copy of the process-wide instance identifier.
pub fn instance_id() -> String {
    INSTANCE_ID.as_str().to_owned()
}
/// Initialises the global tracing subscriber with JSON-formatted output to stderr.
///
/// Each JSON line includes `ts`, `level`, `target` (module), `fields` (structured kv),
/// `line`, `file`, and `instance_id`.
/// `RUST_LOG` env var controls the log level filter.
pub fn init_tracing_subscriber(level: &str) {
let env_filter = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::from_str(level))
.expect("invalid log level");
let fmt_layer = fmt::layer()
.json()
.with_target(true)
.with_thread_ids(false)
.with_file(true)
.with_line_number(true)
.with_span_events(FmtSpan::CLOSE)
.flatten_event(true);
let registry = tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer);
// try_init only fails if a global is already set — this is safe when
// init_otlp() is also called (it rebuilds the subscriber with OTLP layers).
let _ = registry.try_init();
}

View File

@ -0,0 +1,83 @@
//! Actix-web middleware that creates a `tracing::Span` for each HTTP request.
//!
//! The span is set as the current context so all downstream async code is
//! automatically instrumented. When the `tracing_opentelemetry` layer is
//! active, spans are exported via OTLP.
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use std::sync::Arc;
use std::task::{Context, Poll};
/// Actix-web middleware factory that wraps each request in a tracing span.
///
/// The type carries no state; `new()` and `default()` are interchangeable.
#[derive(Default)]
pub struct TracingSpanMiddleware;

impl TracingSpanMiddleware {
    /// Creates the middleware.
    pub fn new() -> Self {
        TracingSpanMiddleware
    }
}
// Middleware factory: turns the next service in the chain into a
// `TracingSpanMiddlewareService` that wraps it.
impl<S, B> Transform<S, ServiceRequest> for TracingSpanMiddleware
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = actix_web::Error;
type Transform = TracingSpanMiddlewareService<S>;
// Construction below always returns `Ok`, so there is no meaningful init error.
type InitError = ();
type Future = std::future::Ready<Result<Self::Transform, Self::InitError>>;
/// Wraps `service` in an `Arc` and returns the ready-made middleware service.
fn new_transform(&self, service: S) -> Self::Future {
std::future::ready(Ok(TracingSpanMiddlewareService {
service: Arc::new(service),
}))
}
}
/// Service produced by `TracingSpanMiddleware`; holds the wrapped inner
/// service behind an `Arc`.
pub struct TracingSpanMiddlewareService<S> {
    service: Arc<S>,
}

// Hand-written so that cloning never requires `S: Clone`: only the `Arc`
// pointer is duplicated.
impl<S> Clone for TracingSpanMiddlewareService<S> {
    fn clone(&self) -> Self {
        let service = Arc::clone(&self.service);
        Self { service }
    }
}
// Request path of the middleware: every request handled by the inner service
// runs inside its own tracing span.
impl<S, B> Service<ServiceRequest> for TracingSpanMiddlewareService<S>
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = actix_web::Error;
    type Future = std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + 'static>>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    /// Runs the inner service inside a per-request span carrying `method` and
    /// `path` fields, and returns the inner service's result unchanged.
    fn call(&self, req: ServiceRequest) -> Self::Future {
        let method = req.method().to_string();
        let path = req.path().to_string();

        // Span names are static strings and are not interpolated, so the
        // dynamic "HTTP <method> <path>" label is supplied through the
        // `otel.name` field, which the `tracing_opentelemetry` layer uses as
        // the exported span name.
        // NOTE(review): raw paths can be high-cardinality; consider the
        // matched route pattern if span-name cardinality becomes a problem.
        let otel_name = format!("HTTP {method} {path}");
        let span = tracing::info_span!(
            "http_request",
            otel.name = %otel_name,
            method = %method,
            path = %path,
        );

        let service = self.service.clone();
        // Attach the span with `Instrument` instead of holding a
        // `span.enter()` guard across `.await`: the guard would keep the span
        // entered while this task is suspended and leak it into unrelated
        // work polled on the same thread. `Instrument` enters the span only
        // while this future is being polled.
        Box::pin(tracing::Instrument::instrument(
            async move { service.call(req).await },
            span,
        ))
    }
}