OTLP tracing:
- libs/observability/otlp.rs: SdkTracerProvider via HTTP/proto OTLP exporter
- libs/observability/tracing_middleware.rs: Actix-web span with trace_id propagation
- libs/observability/tracing_fmt.rs: JSON fmt + registry.try_init for layered init
- libs/rpc: gRPC method spans via info_span
- libs/agent, libs/room, libs/service, libs/api: structured tracing throughout

Prometheus metrics:
- libs/observability/prometheus_exporter.rs: /metrics HTTP handler + metrics crate
- libs/observability/metrics_middleware.rs: HttpMetrics middleware + AtomicU64
- libs/observability/redis_metrics.rs: Redis counter poller via RedisMetrics
- libs/room/metrics.rs: RoomMetrics (connections, messages, presence counters)

Config env vars: APP_OTEL_ENABLED, APP_OTEL_ENDPOINT, APP_OTEL_SERVICE_NAME
463 lines
16 KiB
Rust
463 lines
16 KiB
Rust
//! Tonic gRPC server implementation for SessionAdmin service.
|
|
|
|
use session_manager::SessionManager;
|
|
use std::net::SocketAddr;
|
|
use tokio::sync::broadcast;
|
|
use tonic::{transport::Server, Request, Response, Status};
|
|
use tracing::{info_span, Instrument};
|
|
|
|
use super::generated::admin::{
|
|
GetMetricsRequest, GetMetricsResponse, GetUserInfoRequest, GetUserInfoResponse,
|
|
GetUserStatusRequest, GetUserStatusResponse, GetWorkspaceOnlineUsersRequest,
|
|
GetWorkspaceOnlineUsersResponse, InstanceMetrics, IsUserOnlineRequest, IsUserOnlineResponse,
|
|
KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, KickUserRequest, KickUserResponse,
|
|
ListUserSessionsRequest, ListUserSessionsResponse, ListWorkspaceSessionsRequest,
|
|
ListWorkspaceSessionsResponse, ExportMetricsCsvRequest, ExportMetricsCsvResponse,
|
|
};
|
|
use super::generated::admin_session_admin::session_admin_server::{
|
|
SessionAdmin, SessionAdminServer,
|
|
};
|
|
use super::types::{parse_uuid, to_proto_info, to_proto_session, to_proto_status};
|
|
|
|
#[derive(Clone)]
|
|
pub struct SessionAdminService {
|
|
session_manager: SessionManager,
|
|
}
|
|
|
|
impl SessionAdminService {
|
|
pub fn new(session_manager: SessionManager) -> Self {
|
|
Self { session_manager }
|
|
}
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl SessionAdmin for SessionAdminService {
|
|
async fn list_workspace_sessions(
|
|
&self,
|
|
req: Request<ListWorkspaceSessionsRequest>,
|
|
) -> Result<Response<ListWorkspaceSessionsResponse>, Status> {
|
|
let workspace_id = parse_uuid(&req.get_ref().workspace_id)
|
|
.ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
|
|
|
|
async {
|
|
let sessions = self
|
|
.session_manager
|
|
.get_workspace_sessions(&workspace_id)
|
|
.await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
|
|
let sessions = sessions.iter().map(to_proto_session).collect();
|
|
Ok(Response::new(ListWorkspaceSessionsResponse { sessions }))
|
|
}
|
|
.instrument(info_span!("list_workspace_sessions", workspace_id = %workspace_id))
|
|
.await
|
|
}
|
|
|
|
async fn list_user_sessions(
|
|
&self,
|
|
req: Request<ListUserSessionsRequest>,
|
|
) -> Result<Response<ListUserSessionsResponse>, Status> {
|
|
let user_id = parse_uuid(&req.get_ref().user_id)
|
|
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
|
|
|
async {
|
|
let sessions = self
|
|
.session_manager
|
|
.get_user_sessions(&user_id)
|
|
.await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
|
|
let sessions = sessions.iter().map(to_proto_session).collect();
|
|
Ok(Response::new(ListUserSessionsResponse { sessions }))
|
|
}
|
|
.instrument(info_span!("list_user_sessions", user_id = %user_id))
|
|
.await
|
|
}
|
|
|
|
async fn kick_user_from_workspace(
|
|
&self,
|
|
req: Request<KickUserFromWorkspaceRequest>,
|
|
) -> Result<Response<KickUserFromWorkspaceResponse>, Status> {
|
|
let r = req.get_ref();
|
|
let user_id =
|
|
parse_uuid(&r.user_id).ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
|
let workspace_id = parse_uuid(&r.workspace_id)
|
|
.ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
|
|
|
|
async {
|
|
let kicked_count = self
|
|
.session_manager
|
|
.kick_user_from_workspace(&user_id, &workspace_id)
|
|
.await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
|
|
Ok(Response::new(KickUserFromWorkspaceResponse {
|
|
kicked_count: kicked_count as u32,
|
|
}))
|
|
}
|
|
.instrument(info_span!("kick_user_from_workspace", user_id = %user_id, workspace_id = %workspace_id))
|
|
.await
|
|
}
|
|
|
|
async fn kick_user(
|
|
&self,
|
|
req: Request<KickUserRequest>,
|
|
) -> Result<Response<KickUserResponse>, Status> {
|
|
let user_id = parse_uuid(&req.get_ref().user_id)
|
|
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
|
|
|
async {
|
|
let kicked_count = self
|
|
.session_manager
|
|
.kick_user(&user_id)
|
|
.await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
|
|
Ok(Response::new(KickUserResponse {
|
|
kicked_count: kicked_count as u32,
|
|
}))
|
|
}
|
|
.instrument(info_span!("kick_user", user_id = %user_id))
|
|
.await
|
|
}
|
|
|
|
async fn get_user_status(
|
|
&self,
|
|
req: Request<GetUserStatusRequest>,
|
|
) -> Result<Response<GetUserStatusResponse>, Status> {
|
|
let user_id = parse_uuid(&req.get_ref().user_id)
|
|
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
|
|
|
async {
|
|
let status = self
|
|
.session_manager
|
|
.get_user_status(&user_id)
|
|
.await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
|
|
Ok(Response::new(GetUserStatusResponse {
|
|
status: to_proto_status(status) as i32,
|
|
}))
|
|
}
|
|
.instrument(info_span!("get_user_status", user_id = %user_id))
|
|
.await
|
|
}
|
|
|
|
async fn get_user_info(
|
|
&self,
|
|
req: Request<GetUserInfoRequest>,
|
|
) -> Result<Response<GetUserInfoResponse>, Status> {
|
|
let user_id = parse_uuid(&req.get_ref().user_id)
|
|
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
|
|
|
async {
|
|
let info = self
|
|
.session_manager
|
|
.get_user_info(&user_id)
|
|
.await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
|
|
Ok(Response::new(GetUserInfoResponse {
|
|
info: info.as_ref().map(to_proto_info),
|
|
}))
|
|
}
|
|
.instrument(info_span!("get_user_info", user_id = %user_id))
|
|
.await
|
|
}
|
|
|
|
async fn get_workspace_online_users(
|
|
&self,
|
|
req: Request<GetWorkspaceOnlineUsersRequest>,
|
|
) -> Result<Response<GetWorkspaceOnlineUsersResponse>, Status> {
|
|
let workspace_id = parse_uuid(&req.get_ref().workspace_id)
|
|
.ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
|
|
|
|
async {
|
|
let user_ids = self
|
|
.session_manager
|
|
.get_workspace_online_users(&workspace_id)
|
|
.await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
|
|
let user_ids = user_ids.iter().map(|u| u.to_string()).collect();
|
|
Ok(Response::new(GetWorkspaceOnlineUsersResponse { user_ids }))
|
|
}
|
|
.instrument(info_span!("get_workspace_online_users", workspace_id = %workspace_id))
|
|
.await
|
|
}
|
|
|
|
async fn is_user_online(
|
|
&self,
|
|
req: Request<IsUserOnlineRequest>,
|
|
) -> Result<Response<IsUserOnlineResponse>, Status> {
|
|
let user_id = parse_uuid(&req.get_ref().user_id)
|
|
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
|
|
|
async {
|
|
let online = self
|
|
.session_manager
|
|
.is_user_online(&user_id)
|
|
.await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
|
|
Ok(Response::new(IsUserOnlineResponse { online }))
|
|
}
|
|
.instrument(info_span!("is_user_online", user_id = %user_id))
|
|
.await
|
|
}
|
|
|
|
async fn get_metrics(
|
|
&self,
|
|
req: Request<GetMetricsRequest>,
|
|
) -> Result<Response<GetMetricsResponse>, Status> {
|
|
let r = req.get_ref();
|
|
let instance_filter = r.instance_filter.as_str();
|
|
let limit = if r.limit > 0 { r.limit as usize } else { 100 };
|
|
|
|
async {
|
|
let pool = self.session_manager.pool();
|
|
let instances = query_all_instance_metrics(pool, instance_filter, limit).await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
Ok(Response::new(GetMetricsResponse { instances }))
|
|
}
|
|
.instrument(info_span!("get_metrics"))
|
|
.await
|
|
}
|
|
|
|
async fn export_metrics_csv(
|
|
&self,
|
|
req: Request<ExportMetricsCsvRequest>,
|
|
) -> Result<Response<ExportMetricsCsvResponse>, Status> {
|
|
let instance_filter = req.get_ref().instance_filter.as_str();
|
|
|
|
async {
|
|
let pool = self.session_manager.pool();
|
|
let csv = export_all_metrics_csv(pool, instance_filter).await
|
|
.map_err(|e| Status::internal(e.to_string()))?;
|
|
Ok(Response::new(ExportMetricsCsvResponse { csv }))
|
|
}
|
|
.instrument(info_span!("export_metrics_csv"))
|
|
.await
|
|
}
|
|
}
|
|
|
|
/// Port the admin gRPC server uses by default.
pub const DEFAULT_GRPC_PORT: u16 = 9090;
|
|
|
|
/// Start the Tonic gRPC server on the given address.
|
|
pub async fn serve(
|
|
addr: SocketAddr,
|
|
session_manager: SessionManager,
|
|
) -> anyhow::Result<()> {
|
|
let service = SessionAdminService::new(session_manager);
|
|
let incoming = tonic::transport::server::TcpIncoming::bind(addr)
|
|
.map_err(|e| anyhow::anyhow!("failed to bind TcpIncoming: {}", e))?;
|
|
|
|
tracing::info!(addr = %addr, "Admin gRPC server listening");
|
|
|
|
Server::builder()
|
|
.add_service(SessionAdminServer::new(service))
|
|
.serve_with_incoming(incoming)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Spawn the gRPC server as a background task.
|
|
pub fn spawn(
|
|
addr: SocketAddr,
|
|
session_manager: SessionManager,
|
|
mut shutdown_rx: broadcast::Receiver<()>,
|
|
) -> tokio::task::JoinHandle<()> {
|
|
tokio::spawn(async move {
|
|
let result = serve(addr, session_manager).await;
|
|
if let Err(e) = result {
|
|
tracing::error!(error = %e, "Admin gRPC server error");
|
|
}
|
|
|
|
let _ = shutdown_rx.recv().await;
|
|
})
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Metrics helpers — mirror of observability::redis_metrics (daily hash format)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
use deadpool_redis::cluster::Pool;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
/// Snapshot stored in Redis under `metrics:{instance}:{YYYY-MM-DD}` (hash).
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
struct MetricsSnapshot {
|
|
instance_id: String,
|
|
timestamp_secs: i64,
|
|
#[serde(flatten)]
|
|
http: std::collections::HashMap<String, serde_json::Value>,
|
|
#[serde(flatten)]
|
|
room: std::collections::HashMap<String, serde_json::Value>,
|
|
}
|
|
|
|
/// Query metrics for all known instances, optionally filtered by `instance_filter`.
|
|
async fn query_all_instance_metrics(
|
|
pool: &Pool,
|
|
instance_filter: &str,
|
|
limit: usize,
|
|
) -> anyhow::Result<Vec<InstanceMetrics>> {
|
|
let mut conn = pool.get().await?;
|
|
|
|
// Scan for all daily index keys to extract instance ids
|
|
let mut cursor = 0u64;
|
|
let mut all_instance_ids = Vec::new();
|
|
let mut seen = std::collections::HashSet::new();
|
|
loop {
|
|
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
|
|
.arg(cursor)
|
|
.arg("MATCH")
|
|
.arg("metrics:index:*")
|
|
.arg("COUNT")
|
|
.arg(100)
|
|
.query_async(&mut conn)
|
|
.await?;
|
|
for k in keys {
|
|
// Key format: metrics:index:{instance}:{YYYY-MM-DD}
|
|
if let Some(rest) = k.strip_prefix("metrics:index:") {
|
|
if let Some((instance_id, _)) = rest.rsplit_once(':') {
|
|
if seen.insert(instance_id.to_string()) {
|
|
all_instance_ids.push(instance_id.to_string());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
cursor = new_cursor;
|
|
if cursor == 0 {
|
|
break;
|
|
}
|
|
}
|
|
|
|
let mut results = Vec::new();
|
|
for instance_id in all_instance_ids {
|
|
if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
|
|
continue;
|
|
}
|
|
let snapshots = query_instance_metrics(pool, &instance_id, limit).await?;
|
|
for (_, payload) in snapshots {
|
|
results.push(InstanceMetrics {
|
|
instance_id: payload.instance_id,
|
|
timestamp_secs: payload.timestamp_secs,
|
|
http: payload
|
|
.http
|
|
.into_iter()
|
|
.map(|(k, v)| (k, v.to_string()))
|
|
.collect(),
|
|
room: payload
|
|
.room
|
|
.into_iter()
|
|
.map(|(k, v)| (k, v.to_string()))
|
|
.collect(),
|
|
});
|
|
}
|
|
}
|
|
|
|
Ok(results)
|
|
}
|
|
|
|
/// Query metrics for a single instance from Redis (daily hash buckets).
|
|
async fn query_instance_metrics(
|
|
pool: &Pool,
|
|
instance_id: &str,
|
|
limit: usize,
|
|
) -> anyhow::Result<Vec<(i64, MetricsSnapshot)>> {
|
|
let mut conn = pool.get().await?;
|
|
|
|
// Scan for all daily index keys for this instance
|
|
let index_pattern = format!("metrics:index:{}:*", instance_id);
|
|
let mut day_cursor = 0u64;
|
|
let mut day_keys: Vec<String> = Vec::new();
|
|
loop {
|
|
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
|
|
.arg(day_cursor)
|
|
.arg("MATCH")
|
|
.arg(&index_pattern)
|
|
.arg("COUNT")
|
|
.arg(100)
|
|
.query_async(&mut conn)
|
|
.await?;
|
|
day_keys.extend(keys);
|
|
day_cursor = new_cursor;
|
|
if day_cursor == 0 {
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Collect all timestamps from all daily indexes
|
|
let mut all_timestamps: Vec<i64> = Vec::new();
|
|
for day_key in &day_keys {
|
|
let timestamps: Vec<i64> = redis::cmd("ZREVRANGE")
|
|
.arg(day_key)
|
|
.arg(0)
|
|
.arg((limit as isize - 1).max(0))
|
|
.query_async(&mut conn)
|
|
.await?;
|
|
all_timestamps.extend(timestamps);
|
|
}
|
|
|
|
all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts));
|
|
all_timestamps.truncate(limit);
|
|
|
|
let mut results = Vec::with_capacity(all_timestamps.len());
|
|
for ts in &all_timestamps {
|
|
let dt = chrono::DateTime::from_timestamp(*ts, 0)
|
|
.unwrap_or_else(chrono::Utc::now);
|
|
let day_str = dt.format("%Y-%m-%d").to_string();
|
|
let hash_key = format!("metrics:{}:{}", instance_id, day_str);
|
|
|
|
let fields: Option<std::collections::HashMap<String, String>> = redis::cmd("HGETALL")
|
|
.arg(&hash_key)
|
|
.query_async(&mut conn)
|
|
.await?;
|
|
|
|
if let Some(fields) = fields {
|
|
let mut http = std::collections::HashMap::new();
|
|
let mut room = std::collections::HashMap::new();
|
|
for (k, v) in &fields {
|
|
let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
|
|
if let Some(stripped) = k.strip_prefix("http_") {
|
|
http.insert(stripped.to_string(), json_val);
|
|
} else if let Some(stripped) = k.strip_prefix("room_") {
|
|
room.insert(stripped.to_string(), json_val);
|
|
}
|
|
}
|
|
results.push((*ts, MetricsSnapshot {
|
|
instance_id: instance_id.to_string(),
|
|
timestamp_secs: *ts,
|
|
http,
|
|
room,
|
|
}));
|
|
}
|
|
}
|
|
|
|
results.sort_by_key(|(ts, _)| *ts);
|
|
Ok(results)
|
|
}
|
|
|
|
/// Export all metrics as CSV.
|
|
async fn export_all_metrics_csv(pool: &Pool, instance_filter: &str) -> anyhow::Result<String> {
|
|
let instances = query_all_instance_metrics(pool, instance_filter, 1000).await?;
|
|
let mut rows = Vec::new();
|
|
for inst in instances {
|
|
let ts = inst.timestamp_secs;
|
|
let id = inst.instance_id;
|
|
for (k, v) in inst.http {
|
|
rows.push(format!("{},{},http_{},{}", id, ts, k, v));
|
|
}
|
|
for (k, v) in inst.room {
|
|
rows.push(format!("{},{},room_{},{}", id, ts, k, v));
|
|
}
|
|
}
|
|
let header = "instance_id,timestamp,metric,value";
|
|
if rows.is_empty() {
|
|
return Ok(header.to_string());
|
|
}
|
|
Ok(format!("{}\n{}", header, rows.join("\n")))
|
|
}
|