gitdataai/libs/observability/src/prometheus_exporter.rs
ZhenYi 962bf0312d feat(observability): Phase 6 OTLP tracing + Prometheus metrics endpoint
OTLP tracing:
- libs/observability/otlp.rs: SdkTracerProvider via HTTP/proto OTLP exporter
- libs/observability/tracing_middleware.rs: Actix-web span with trace_id propagation
- libs/observability/tracing_fmt.rs: JSON fmt + registry.try_init for layered init
- libs/rpc: gRPC method spans via info_span
- libs/agent, libs/room, libs/service, libs/api: structured tracing throughout

Prometheus metrics:
- libs/observability/prometheus_exporter.rs: /metrics HTTP handler + metrics crate
- libs/observability/metrics_middleware.rs: HttpMetrics middleware + AtomicU64
- libs/observability/redis_metrics.rs: Redis counter poller via RedisMetrics
- libs/room/metrics.rs: RoomMetrics (connections, messages, presence counters)

Config env vars: APP_OTEL_ENABLED, APP_OTEL_ENDPOINT, APP_OTEL_SERVICE_NAME
2026-04-22 10:27:54 +08:00

157 lines
6.2 KiB
Rust

//! Prometheus metrics exporter.
//!
//! Exposes `HttpMetrics` (AtomicU64) and `RoomMetrics` (`metrics` crate) via a
//! Prometheus-compatible `/metrics` endpoint.
//!
//! Usage (sketch; `metrics` is an `Arc<HttpMetrics>`):
//!
//! ```ignore
//! let recorder_handle = install_recorder(); // returns PrometheusHandle
//! let guard = Arc::new(RwLock::new(HttpMetricsSnapshot::default()));
//! spawn_http_metrics_poller(metrics.clone(), guard.clone(), Duration::from_secs(15));
//! // Register both `recorder_handle` and `guard` as `web::Data` app data, then
//! // route `/metrics` to `prometheus_handler`.
//! ```
//!
//! `RoomMetrics` must be constructed **after** `install_recorder()` is called,
//! because its `register_*` macro calls require a global recorder to be set.
use actix_web::{web, HttpRequest, HttpResponse};
use metrics::{describe_counter, Unit};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
/// Installs the global `metrics` crate recorder as a Prometheus exporter.
///
/// Returns a `PrometheusHandle` for rendering the `/metrics` endpoint.
/// **Must be called before any metric is registered or described through the
/// `metrics` macros**, otherwise those calls are handed to the no-op recorder.
///
/// # Panics
///
/// Panics with `"failed to set global metrics recorder"` if a global recorder
/// has already been installed (for example when this function is called twice).
pub fn install_recorder() -> metrics_exporter_prometheus::PrometheusHandle {
    let recorder = PrometheusBuilder::new().build_recorder();
    let handle = recorder.handle();
    metrics::set_global_recorder(recorder).expect("failed to set global metrics recorder");

    // Describe the AI counters only *after* the recorder is global: the
    // `describe_*` macros forward to whichever recorder is installed at call
    // time, so issuing them earlier would silently discard the descriptions.
    describe_counter!("ai_calls_total", Unit::Count, "Total AI chat completion calls");
    describe_counter!("ai_calls_success", Unit::Count, "Successful AI calls");
    describe_counter!("ai_calls_failure", Unit::Count, "Failed AI calls");
    describe_counter!("ai_input_tokens_total", Unit::Count, "Total input tokens consumed");
    describe_counter!("ai_output_tokens_total", Unit::Count, "Total output tokens generated");
    describe_counter!("ai_function_calls_total", Unit::Count, "Total AI function/tool calls");

    handle
}
/// Parses Prometheus text exposition format into a flat map of metric name → value.
///
/// Labels are discarded (only the last value for each name is kept). Comment
/// lines (`# HELP`, `# TYPE`, …), blank lines and lines whose value does not
/// parse as `f64` are skipped. An optional trailing timestamp after the value
/// is ignored.
///
/// Non-finite samples (`NaN`, `±Inf`) parse as `f64`, but `serde_json` cannot
/// represent them as numbers, so they are stored as JSON `null`.
pub fn render_to_hashmap(body: &str) -> HashMap<String, serde_json::Value> {
    let mut out = HashMap::new();
    for line in body.lines() {
        if let Some((name, value)) = parse_sample_line(line) {
            // Later samples of the same metric overwrite earlier ones.
            out.insert(name.to_string(), serde_json::json!(value));
        }
    }
    out
}

/// Splits one exposition line into `(metric name, sample value)`, or `None`
/// when the line is blank, a comment, or not a well-formed sample.
fn parse_sample_line(line: &str) -> Option<(&str, f64)> {
    let line = line.trim();
    if line.is_empty() || line.starts_with('#') {
        return None;
    }

    // Sample lines look like one of:
    //   METRIC_NAME{labels} VALUE [TIMESTAMP]
    //   METRIC_NAME VALUE [TIMESTAMP]
    let (name, rest) = match line.find('{') {
        Some(open) => {
            // Label values may themselves contain spaces or braces, while the
            // value/timestamp tail never contains `}`; the *last* `}` is
            // therefore the end of the label set.
            let close = line.rfind('}').filter(|&close| close > open)?;
            (&line[..open], &line[close + 1..])
        }
        None => {
            let ws = line.find(char::is_whitespace)?;
            (&line[..ws], &line[ws..])
        }
    };

    let name = name.trim_end();
    if name.is_empty() {
        return None;
    }

    // The first token after the name/labels is the value; anything after it
    // (the optional timestamp) is ignored.
    let value = rest.split_whitespace().next()?.parse::<f64>().ok()?;
    Some((name, value))
}
/// Re-export `HttpMetrics` so callers don't need to import from `metrics_middleware`.
pub use crate::metrics_middleware::HttpMetrics;
/// Point-in-time copy of the `HttpMetrics` counters.
///
/// The background poller overwrites the shared instance on every tick and the
/// `/metrics` handler reads it when building its response. `Default` yields
/// all zeros.
#[derive(Clone, Debug, Default)]
pub struct HttpMetricsSnapshot {
    /// Copy of `HttpMetrics::request_count`; exported as `http_requests_total`.
    pub request_count: u64,
    /// Copy of `HttpMetrics::total_duration_ms`; exported as
    /// `http_request_duration_ms_total`.
    pub total_duration_ms: u64,
    /// Copy of `HttpMetrics::status_2xx`; exported with `status_class="2xx"`.
    pub status_2xx: u64,
    /// Copy of `HttpMetrics::status_4xx`; exported with `status_class="4xx"`.
    pub status_4xx: u64,
    /// Copy of `HttpMetrics::status_5xx`; exported with `status_class="5xx"`.
    pub status_5xx: u64,
}
/// Shared, lock-protected `HttpMetricsSnapshot`
/// (`Arc<RwLock<HttpMetricsSnapshot>>`).
///
/// Written by the task started in `spawn_http_metrics_poller` and read by
/// `prometheus_handler`; clone the `Arc` to hand the same snapshot to both.
pub type HttpSnapshotGuard = Arc<RwLock<HttpMetricsSnapshot>>;
/// Starts a background task that snapshots `HttpMetrics` atomics and stores them
/// in `guard` at `interval`.
///
/// The task is detached and loops forever. The first tick of
/// `tokio::time::interval` completes immediately, so `guard` is populated as
/// soon as the task first runs rather than after one full `interval`.
///
/// Each counter is read with a separate `Ordering::Relaxed` load, so the
/// fields of one snapshot are not guaranteed to be mutually consistent (e.g.
/// `request_count` may lag the status-class counts by in-flight requests).
///
/// # Panics
///
/// `tokio::spawn` panics if called outside a Tokio runtime. A zero `interval`
/// makes `tokio::time::interval` panic inside the spawned task, which ends
/// polling without affecting the caller.
pub fn spawn_http_metrics_poller(
    metrics: Arc<HttpMetrics>,
    guard: HttpSnapshotGuard,
    interval: std::time::Duration,
) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        loop {
            ticker.tick().await;

            // Build the snapshot before taking the lock so the write section
            // is a single assignment.
            let snapshot = HttpMetricsSnapshot {
                request_count: metrics.request_count.load(Ordering::Relaxed),
                total_duration_ms: metrics.total_duration_ms.load(Ordering::Relaxed),
                status_2xx: metrics.status_2xx.load(Ordering::Relaxed),
                status_4xx: metrics.status_4xx.load(Ordering::Relaxed),
                status_5xx: metrics.status_5xx.load(Ordering::Relaxed),
            };

            // The whole value is overwritten, so a poisoned lock cannot leave
            // a half-updated snapshot behind; recover the guard instead of
            // silently freezing the exported values forever.
            *guard
                .write()
                .unwrap_or_else(|poisoned| poisoned.into_inner()) = snapshot;
        }
    });
}
/// Prometheus exposition handler.
///
/// Responds with everything rendered by the `metrics`-crate recorder followed
/// by the HTTP counters from the shared snapshot, as
/// `text/plain; version=0.0.4; charset=utf-8`.
///
/// Requires both `web::Data<HttpSnapshotGuard>` and
/// `web::Data<PrometheusHandle>` to be injected via `.app_data()`.
///
/// The HTTP values are as fresh as the last write to the snapshot, i.e. they
/// may lag the live atomics by up to one poller interval.
pub async fn prometheus_handler(
    _: HttpRequest,
    snapshot: web::Data<HttpSnapshotGuard>,
    handle: web::Data<metrics_exporter_prometheus::PrometheusHandle>,
) -> HttpResponse {
    // Render all `metrics`-crate metrics (RoomMetrics etc.) registered with the
    // global recorder installed by `install_recorder()`.
    let mut body = handle.render();
    // Keep the exposition well-formed: the appended block must start on its
    // own line even if the rendered output lacks a trailing newline.
    if !body.is_empty() && !body.ends_with('\n') {
        body.push('\n');
    }

    // Copy the snapshot out so the lock is released before formatting. The
    // writer only ever replaces the whole value, so data behind a poisoned
    // lock is still usable; recover it rather than panicking in a handler.
    let snap = snapshot
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .clone();

    // NOTE(review): `http_requests_by_status_class` is declared as a gauge but
    // its values look like cumulative counts — confirm whether `counter` is the
    // intended type before changing the exported metadata.
    let http_extra = format!(
        concat!(
            "# TYPE http_requests_total counter\n",
            "http_requests_total{{service=\"app\",protocol=\"HTTP\"}} {}\n",
            "# TYPE http_request_duration_ms_total counter\n",
            "http_request_duration_ms_total{{service=\"app\"}} {}\n",
            "# TYPE http_requests_by_status_class gauge\n",
            "http_requests_by_status_class{{service=\"app\",status_class=\"2xx\"}} {}\n",
            "http_requests_by_status_class{{service=\"app\",status_class=\"4xx\"}} {}\n",
            "http_requests_by_status_class{{service=\"app\",status_class=\"5xx\"}} {}\n",
        ),
        snap.request_count,
        snap.total_duration_ms,
        snap.status_2xx,
        snap.status_4xx,
        snap.status_5xx,
    );
    body.push_str(&http_extra);

    HttpResponse::Ok()
        .content_type("text/plain; version=0.0.4; charset=utf-8")
        .body(body)
}