gitdataai/libs/gingress-proxy/src/observability.rs

83 lines
2.7 KiB
Rust

//! Observability integration for the GIngress proxy.
//!
//! Reuses the workspace `observability` lib for tracing initialization,
//! and adds GIngress-specific Prometheus metrics.
use std::sync::Arc;
/// GIngress-specific HTTP metrics.
///
/// Extends the workspace `observability::HttpMetrics` with ingress-specific counters.
pub struct IngressMetrics {
/// Total requests per host
pub requests_total: Arc<dashmap::DashMap<String, u64>>,
/// Active WebSocket connections
pub ws_connections_active: Arc<std::sync::atomic::AtomicU64>,
/// Upstream health status (0 = unhealthy, 1 = healthy)
pub upstream_health: Arc<dashmap::DashMap<String, u8>>,
/// TLS certificate expiry timestamps
pub tls_cert_expiry: Arc<dashmap::DashMap<String, i64>>,
}
impl IngressMetrics {
pub fn new() -> Self {
Self {
requests_total: Arc::new(dashmap::DashMap::new()),
ws_connections_active: Arc::new(std::sync::atomic::AtomicU64::new(0)),
upstream_health: Arc::new(dashmap::DashMap::new()),
tls_cert_expiry: Arc::new(dashmap::DashMap::new()),
}
}
/// Record a request for a given host and status code.
pub fn record_request(&self, host: &str, _status: u16) {
self.requests_total
.entry(host.to_string())
.and_modify(|c| *c += 1)
.or_insert(1);
}
/// Record WebSocket connection opened.
pub fn ws_open(&self) {
self.ws_connections_active
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
/// Record WebSocket connection closed.
pub fn ws_close(&self) {
self.ws_connections_active
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
}
/// Update upstream health status.
pub fn set_upstream_health(&self, upstream: &str, healthy: bool) {
self.upstream_health
.insert(upstream.to_string(), if healthy { 1 } else { 0 });
}
/// Record TLS cert expiry time.
pub fn set_cert_expiry(&self, host: &str, expiry_unix: i64) {
self.tls_cert_expiry.insert(host.to_string(), expiry_unix);
}
}
impl Default for IngressMetrics {
fn default() -> Self {
Self::new()
}
}
/// Initialize tracing via the workspace observability lib.
pub fn init_tracing(level: &str, otel_enabled: bool) {
observability::init_tracing_subscriber(level, otel_enabled);
}
/// Initialize OTLP export for distributed tracing.
pub fn init_otlp(
endpoint: &str,
service_name: &str,
) -> anyhow::Result<Option<observability::OtelGuard>> {
observability::init_otlp(endpoint, service_name, "0.1.0", "info")
.map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))
}