gitdataai/libs/room/src/metrics.rs
ZhenYi 962bf0312d feat(observability): Phase 6 OTLP tracing + Prometheus metrics endpoint
OTLP tracing:
- libs/observability/otlp.rs: SdkTracerProvider via HTTP/proto OTLP exporter
- libs/observability/tracing_middleware.rs: Actix-web span with trace_id propagation
- libs/observability/tracing_fmt.rs: JSON fmt + registry.try_init for layered init
- libs/rpc: gRPC method spans via info_span
- libs/agent, libs/room, libs/service, libs/api: structured tracing throughout

Prometheus metrics:
- libs/observability/prometheus_exporter.rs: /metrics HTTP handler + metrics crate
- libs/observability/metrics_middleware.rs: HttpMetrics middleware + AtomicU64
- libs/observability/redis_metrics.rs: Redis counter poller via RedisMetrics
- libs/room/metrics.rs: RoomMetrics (connections, messages, presence counters)

Config env vars: APP_OTEL_ENABLED, APP_OTEL_ENDPOINT, APP_OTEL_SERVICE_NAME
2026-04-22 10:27:54 +08:00

234 lines
10 KiB
Rust

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram, Unit};
use uuid::Uuid;
/// Metric handles for the room service, together with the atomic cells that
/// [`RoomMetrics::snapshot`] reads.
///
/// Each handle field is created by `Default::default` from the metric name quoted in its
/// doc comment. The `_*_val` fields are separate `AtomicU64` cells; they are independent of
/// the handles and are only read (never written) inside this file.
pub struct RoomMetrics {
    /// `room_online_rooms`: number of rooms with active workers.
    pub rooms_online: Gauge,
    /// `room_online_users`: total number of online WebSocket users.
    pub users_online: Gauge,
    /// `room_ws_connections_active`: current number of active WebSocket connections.
    pub ws_connections_active: Gauge,
    /// `room_ws_connections_total`: total WebSocket connections established.
    pub ws_connections_total: Counter,
    /// `room_ws_disconnections_total`: total WebSocket disconnections.
    pub ws_disconnections_total: Counter,
    /// `room_messages_sent_total`: total messages sent to rooms.
    pub messages_sent: Counter,
    /// `room_messages_persisted_total`: total messages persisted to the database.
    pub messages_persisted: Counter,
    /// `room_messages_persist_failed_total`: total message persistence failures.
    pub messages_persist_failed: Counter,
    /// `room_broadcasts_sent_total`: total WebSocket broadcasts sent.
    pub broadcasts_sent: Counter,
    /// `room_broadcasts_dropped_total`: total broadcasts dropped due to a full channel.
    pub broadcasts_dropped: Counter,
    /// `room_duplicates_skipped_total`: total duplicate messages skipped (idempotency).
    pub duplicates_skipped: Counter,
    /// `room_redis_publish_failed_total`: total Redis publish failures.
    pub redis_publish_failed: Counter,
    /// `room_message_latency_ms`: message processing latency in milliseconds.
    pub message_latency_ms: Histogram,
    /// `room_ws_rate_limit_hits_total`: total WebSocket rate limit rejections.
    pub ws_rate_limit_hits: Counter,
    /// `room_ws_auth_failures_total`: total WebSocket authentication/authorization failures.
    pub ws_auth_failures: Counter,
    /// `room_ws_heartbeat_sent_total`: total heartbeat pings sent by the server.
    pub ws_heartbeat_sent_total: Counter,
    /// `room_ws_heartbeat_timeout_total`: total connections closed due to heartbeat timeout.
    pub ws_heartbeat_timeout_total: Counter,
    /// `room_ws_idle_timeout_total`: total connections closed due to idle timeout.
    pub ws_idle_timeout_total: Counter,

    // Atomic backing cells for `snapshot`. Every cell holds a plain `u64`; the three gauge
    // cells are converted with a numeric `as f64` cast when read, the counter cells are
    // reported as-is. There is no backing cell for the latency histogram.
    // NOTE(review): nothing in this file stores to these cells — presumably callers update
    // them next to the matching handle; confirm, otherwise `snapshot` only reports zeros.
    /// Snapshot cell for `room_online_rooms`.
    pub _rooms_online_val: AtomicU64,
    /// Snapshot cell for `room_online_users`.
    pub _users_online_val: AtomicU64,
    /// Snapshot cell for `room_ws_connections_active`.
    pub _ws_connections_active_val: AtomicU64,
    /// Snapshot cell for `room_ws_connections_total`.
    pub _ws_connections_total_val: AtomicU64,
    /// Snapshot cell for `room_ws_disconnections_total`.
    pub _ws_disconnections_total_val: AtomicU64,
    /// Snapshot cell for `room_messages_sent_total`.
    pub _messages_sent_val: AtomicU64,
    /// Snapshot cell for `room_messages_persisted_total`.
    pub _messages_persisted_val: AtomicU64,
    /// Snapshot cell for `room_messages_persist_failed_total`.
    pub _messages_persist_failed_val: AtomicU64,
    /// Snapshot cell for `room_broadcasts_sent_total`.
    pub _broadcasts_sent_val: AtomicU64,
    /// Snapshot cell for `room_broadcasts_dropped_total`.
    pub _broadcasts_dropped_val: AtomicU64,
    /// Snapshot cell for `room_duplicates_skipped_total`.
    pub _duplicates_skipped_val: AtomicU64,
    /// Snapshot cell for `room_redis_publish_failed_total`.
    pub _redis_publish_failed_val: AtomicU64,
    /// Snapshot cell for `room_ws_rate_limit_hits_total`.
    pub _ws_rate_limit_hits_val: AtomicU64,
    /// Snapshot cell for `room_ws_auth_failures_total`.
    pub _ws_auth_failures_val: AtomicU64,
    /// Snapshot cell for `room_ws_heartbeat_sent_total`.
    pub _ws_heartbeat_sent_total_val: AtomicU64,
    /// Snapshot cell for `room_ws_heartbeat_timeout_total`.
    pub _ws_heartbeat_timeout_total_val: AtomicU64,
    /// Snapshot cell for `room_ws_idle_timeout_total`.
    pub _ws_idle_timeout_total_val: AtomicU64,
}
impl Default for RoomMetrics {
    /// Describes every room metric, obtains a handle for each one, and zeroes all snapshot
    /// cells.
    ///
    /// This has side effects: the `describe_*!` and `gauge!`/`counter!`/`histogram!` macros
    /// talk to the `metrics` recorder.
    /// NOTE(review): per the `metrics` crate the handles bind to whichever recorder is
    /// installed when the macro runs — presumably this must be constructed after the
    /// exporter is installed; confirm against the startup order.
    fn default() -> Self {
        // Step 1: attach a description (and, for counters/histograms, a unit) to each name.
        describe_gauge!("room_online_rooms", "Number of rooms with active workers");
        describe_gauge!("room_online_users", "Total number of online WebSocket users");
        describe_gauge!(
            "room_ws_connections_active",
            "Current number of active WebSocket connections"
        );
        describe_counter!(
            "room_ws_connections_total",
            Unit::Count,
            "Total WebSocket connections established"
        );
        describe_counter!(
            "room_ws_disconnections_total",
            Unit::Count,
            "Total WebSocket disconnections"
        );
        describe_counter!("room_messages_sent_total", Unit::Count, "Total messages sent to rooms");
        describe_counter!(
            "room_messages_persisted_total",
            Unit::Count,
            "Total messages persisted to database"
        );
        describe_counter!(
            "room_messages_persist_failed_total",
            Unit::Count,
            "Total message persistence failures"
        );
        describe_counter!(
            "room_broadcasts_sent_total",
            Unit::Count,
            "Total WebSocket broadcasts sent"
        );
        describe_counter!(
            "room_duplicates_skipped_total",
            Unit::Count,
            "Total duplicate messages skipped (idempotency)"
        );
        describe_counter!(
            "room_redis_publish_failed_total",
            Unit::Count,
            "Total Redis publish failures"
        );
        describe_histogram!(
            "room_message_latency_ms",
            Unit::Milliseconds,
            "Message processing latency from publish to persist"
        );
        describe_counter!(
            "room_ws_rate_limit_hits_total",
            Unit::Count,
            "Total WebSocket rate limit rejections"
        );
        describe_counter!(
            "room_ws_auth_failures_total",
            Unit::Count,
            "Total WebSocket authentication/authorization failures"
        );
        describe_counter!(
            "room_ws_heartbeat_sent_total",
            Unit::Count,
            "Total WebSocket heartbeat pings sent by server"
        );
        describe_counter!(
            "room_ws_heartbeat_timeout_total",
            Unit::Count,
            "Total WebSocket connections closed due to heartbeat timeout"
        );
        describe_counter!(
            "room_ws_idle_timeout_total",
            Unit::Count,
            "Total WebSocket connections closed due to idle timeout"
        );
        describe_counter!(
            "room_broadcasts_dropped_total",
            Unit::Count,
            "Total broadcasts dropped due to channel full"
        );

        // Step 2: obtain one handle per metric, in struct-field order.
        let rooms_online = metrics::gauge!("room_online_rooms");
        let users_online = metrics::gauge!("room_online_users");
        let ws_connections_active = metrics::gauge!("room_ws_connections_active");
        let ws_connections_total = metrics::counter!("room_ws_connections_total");
        let ws_disconnections_total = metrics::counter!("room_ws_disconnections_total");
        let messages_sent = metrics::counter!("room_messages_sent_total");
        let messages_persisted = metrics::counter!("room_messages_persisted_total");
        let messages_persist_failed = metrics::counter!("room_messages_persist_failed_total");
        let broadcasts_sent = metrics::counter!("room_broadcasts_sent_total");
        let broadcasts_dropped = metrics::counter!("room_broadcasts_dropped_total");
        let duplicates_skipped = metrics::counter!("room_duplicates_skipped_total");
        let redis_publish_failed = metrics::counter!("room_redis_publish_failed_total");
        let message_latency_ms = metrics::histogram!("room_message_latency_ms");
        let ws_rate_limit_hits = metrics::counter!("room_ws_rate_limit_hits_total");
        let ws_auth_failures = metrics::counter!("room_ws_auth_failures_total");
        let ws_heartbeat_sent_total = metrics::counter!("room_ws_heartbeat_sent_total");
        let ws_heartbeat_timeout_total = metrics::counter!("room_ws_heartbeat_timeout_total");
        let ws_idle_timeout_total = metrics::counter!("room_ws_idle_timeout_total");

        // Step 3: assemble the struct; every snapshot cell starts at zero.
        Self {
            rooms_online,
            users_online,
            ws_connections_active,
            ws_connections_total,
            ws_disconnections_total,
            messages_sent,
            messages_persisted,
            messages_persist_failed,
            broadcasts_sent,
            broadcasts_dropped,
            duplicates_skipped,
            redis_publish_failed,
            message_latency_ms,
            ws_rate_limit_hits,
            ws_auth_failures,
            ws_heartbeat_sent_total,
            ws_heartbeat_timeout_total,
            ws_idle_timeout_total,
            _rooms_online_val: AtomicU64::default(),
            _users_online_val: AtomicU64::default(),
            _ws_connections_active_val: AtomicU64::default(),
            _ws_connections_total_val: AtomicU64::default(),
            _ws_disconnections_total_val: AtomicU64::default(),
            _messages_sent_val: AtomicU64::default(),
            _messages_persisted_val: AtomicU64::default(),
            _messages_persist_failed_val: AtomicU64::default(),
            _broadcasts_sent_val: AtomicU64::default(),
            _broadcasts_dropped_val: AtomicU64::default(),
            _duplicates_skipped_val: AtomicU64::default(),
            _redis_publish_failed_val: AtomicU64::default(),
            _ws_rate_limit_hits_val: AtomicU64::default(),
            _ws_auth_failures_val: AtomicU64::default(),
            _ws_heartbeat_sent_total_val: AtomicU64::default(),
            _ws_heartbeat_timeout_total_val: AtomicU64::default(),
            _ws_idle_timeout_total_val: AtomicU64::default(),
        }
    }
}
impl RoomMetrics {
pub fn new() -> Self {
Self::default()
}
pub fn record_message_latency(&self, ms: f64) {
self.message_latency_ms.record(ms);
}
pub fn incr_duplicates_skipped(&self) {
self.duplicates_skipped.increment(1);
}
pub async fn incr_room_connections(&self, room_id: Uuid) {
let name = format!("room_connections{{room_id=\"{}\"}}", room_id);
metrics::gauge!(name).increment(1.0);
}
pub async fn dec_room_connections(&self, room_id: Uuid) {
let name = format!("room_connections{{room_id=\"{}\"}}", room_id);
metrics::gauge!(name).decrement(1.0);
}
pub async fn incr_room_messages(&self, room_id: Uuid) {
let name = format!("room_messages_total{{room_id=\"{}\"}}", room_id);
metrics::counter!(name).increment(1);
}
#[allow(dead_code)]
pub async fn cleanup_stale_rooms(&self, _active_room_ids: &[Uuid]) {
// Per-room metrics are registered on-demand; no cleanup needed.
}
pub fn into_arc(self) -> Arc<RoomMetrics> {
Arc::new(self)
}
/// Returns a snapshot of all current gauge and counter values as a flat map.
pub fn snapshot(&self) -> HashMap<String, serde_json::Value> {
let mut m = HashMap::new();
m.insert("room_online_rooms".into(), serde_json::json!(self._rooms_online_val.load(Ordering::Relaxed) as f64));
m.insert("room_online_users".into(), serde_json::json!(self._users_online_val.load(Ordering::Relaxed) as f64));
m.insert("room_ws_connections_active".into(), serde_json::json!(self._ws_connections_active_val.load(Ordering::Relaxed) as f64));
m.insert("room_ws_connections_total".into(), serde_json::json!(self._ws_connections_total_val.load(Ordering::Relaxed)));
m.insert("room_ws_disconnections_total".into(), serde_json::json!(self._ws_disconnections_total_val.load(Ordering::Relaxed)));
m.insert("room_messages_sent_total".into(), serde_json::json!(self._messages_sent_val.load(Ordering::Relaxed)));
m.insert("room_messages_persisted_total".into(), serde_json::json!(self._messages_persisted_val.load(Ordering::Relaxed)));
m.insert("room_messages_persist_failed_total".into(), serde_json::json!(self._messages_persist_failed_val.load(Ordering::Relaxed)));
m.insert("room_broadcasts_sent_total".into(), serde_json::json!(self._broadcasts_sent_val.load(Ordering::Relaxed)));
m.insert("room_broadcasts_dropped_total".into(), serde_json::json!(self._broadcasts_dropped_val.load(Ordering::Relaxed)));
m.insert("room_duplicates_skipped_total".into(), serde_json::json!(self._duplicates_skipped_val.load(Ordering::Relaxed)));
m.insert("room_redis_publish_failed_total".into(), serde_json::json!(self._redis_publish_failed_val.load(Ordering::Relaxed)));
m.insert("room_ws_rate_limit_hits_total".into(), serde_json::json!(self._ws_rate_limit_hits_val.load(Ordering::Relaxed)));
m.insert("room_ws_auth_failures_total".into(), serde_json::json!(self._ws_auth_failures_val.load(Ordering::Relaxed)));
m.insert("room_ws_heartbeat_sent_total".into(), serde_json::json!(self._ws_heartbeat_sent_total_val.load(Ordering::Relaxed)));
m.insert("room_ws_heartbeat_timeout_total".into(), serde_json::json!(self._ws_heartbeat_timeout_total_val.load(Ordering::Relaxed)));
m.insert("room_ws_idle_timeout_total".into(), serde_json::json!(self._ws_idle_timeout_total_val.load(Ordering::Relaxed)));
m
}
}