95 lines
3.1 KiB
Rust
95 lines
3.1 KiB
Rust
use std::sync::Arc;
|
|
|
|
use track::{CounterVec, Gauge};
|
|
|
|
#[derive(Clone)]
|
|
pub struct ChannelMetrics {
|
|
pub messages_sent: Arc<std::sync::atomic::AtomicU64>,
|
|
pub messages_received: Arc<std::sync::atomic::AtomicU64>,
|
|
pub messages_failed: Arc<std::sync::atomic::AtomicU64>,
|
|
pub active_connections: Arc<std::sync::atomic::AtomicI64>,
|
|
events_total: Option<CounterVec>,
|
|
active_connections_gauge: Option<Gauge>,
|
|
}
|
|
|
|
impl ChannelMetrics {
|
|
pub fn new(registry: Option<track::MetricsRegistry>) -> Self {
|
|
let events_total = registry.as_ref().and_then(|registry| {
|
|
registry
|
|
.register_counter_vec(
|
|
"channel_events_total",
|
|
"Total channel socket and message events",
|
|
&["event"],
|
|
)
|
|
.map_err(|error| {
|
|
tracing::warn!(%error, "failed to register channel_events_total");
|
|
error
|
|
})
|
|
.ok()
|
|
});
|
|
let active_connections_gauge = registry.as_ref().and_then(|registry| {
|
|
registry
|
|
.register_gauge(
|
|
"channel_active_connections",
|
|
"Current active channel socket connections",
|
|
)
|
|
.map_err(|error| {
|
|
tracing::warn!(%error, "failed to register channel_active_connections");
|
|
error
|
|
})
|
|
.ok()
|
|
});
|
|
|
|
Self {
|
|
messages_sent: Arc::new(std::sync::atomic::AtomicU64::new(0)),
|
|
messages_received: Arc::new(std::sync::atomic::AtomicU64::new(0)),
|
|
messages_failed: Arc::new(std::sync::atomic::AtomicU64::new(0)),
|
|
active_connections: Arc::new(std::sync::atomic::AtomicI64::new(0)),
|
|
events_total,
|
|
active_connections_gauge,
|
|
}
|
|
}
|
|
|
|
pub fn increment_sent(&self) {
|
|
self.messages_sent
|
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
self.record_event("sent");
|
|
}
|
|
|
|
pub fn increment_received(&self) {
|
|
self.messages_received
|
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
self.record_event("received");
|
|
}
|
|
|
|
pub fn increment_failed(&self) {
|
|
self.messages_failed
|
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
self.record_event("failed");
|
|
}
|
|
|
|
pub fn increment_connections(&self) {
|
|
self.active_connections
|
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
if let Some(gauge) = &self.active_connections_gauge {
|
|
gauge.inc();
|
|
}
|
|
self.record_event("connected");
|
|
}
|
|
|
|
pub fn decrement_connections(&self) {
|
|
self.active_connections
|
|
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
|
|
if let Some(gauge) = &self.active_connections_gauge {
|
|
gauge.dec();
|
|
}
|
|
self.record_event("disconnected");
|
|
}
|
|
|
|
fn record_event(&self, event: &str) {
|
|
if let Some(counter) = &self.events_total {
|
|
counter.with_label_values(&[event]).inc();
|
|
}
|
|
}
|
|
}
|