//! Tonic gRPC server implementation for SessionAdmin service. use session_manager::SessionManager; use std::net::SocketAddr; use tokio::sync::broadcast; use tonic::{transport::Server, Request, Response, Status}; use tracing::{info_span, Instrument}; use super::generated::admin::{ GetMetricsRequest, GetMetricsResponse, GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse, GetWorkspaceOnlineUsersRequest, GetWorkspaceOnlineUsersResponse, InstanceMetrics, IsUserOnlineRequest, IsUserOnlineResponse, KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, KickUserRequest, KickUserResponse, ListUserSessionsRequest, ListUserSessionsResponse, ListWorkspaceSessionsRequest, ListWorkspaceSessionsResponse, ExportMetricsCsvRequest, ExportMetricsCsvResponse, }; use super::generated::admin_session_admin::session_admin_server::{ SessionAdmin, SessionAdminServer, }; use super::types::{parse_uuid, to_proto_info, to_proto_session, to_proto_status}; #[derive(Clone)] pub struct SessionAdminService { session_manager: SessionManager, } impl SessionAdminService { pub fn new(session_manager: SessionManager) -> Self { Self { session_manager } } } #[tonic::async_trait] impl SessionAdmin for SessionAdminService { async fn list_workspace_sessions( &self, req: Request, ) -> Result, Status> { let workspace_id = parse_uuid(&req.get_ref().workspace_id) .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; async { let sessions = self .session_manager .get_workspace_sessions(&workspace_id) .await .map_err(|e| Status::internal(e.to_string()))?; let sessions = sessions.iter().map(to_proto_session).collect(); Ok(Response::new(ListWorkspaceSessionsResponse { sessions })) } .instrument(info_span!("list_workspace_sessions", workspace_id = %workspace_id)) .await } async fn list_user_sessions( &self, req: Request, ) -> Result, Status> { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; async { let sessions = self .session_manager .get_user_sessions(&user_id) .await .map_err(|e| Status::internal(e.to_string()))?; let sessions = sessions.iter().map(to_proto_session).collect(); Ok(Response::new(ListUserSessionsResponse { sessions })) } .instrument(info_span!("list_user_sessions", user_id = %user_id)) .await } async fn kick_user_from_workspace( &self, req: Request, ) -> Result, Status> { let r = req.get_ref(); let user_id = parse_uuid(&r.user_id).ok_or_else(|| Status::invalid_argument("invalid user_id"))?; let workspace_id = parse_uuid(&r.workspace_id) .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; async { let kicked_count = self .session_manager .kick_user_from_workspace(&user_id, &workspace_id) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(KickUserFromWorkspaceResponse { kicked_count: kicked_count as u32, })) } .instrument(info_span!("kick_user_from_workspace", user_id = %user_id, workspace_id = %workspace_id)) .await } async fn kick_user( &self, req: Request, ) -> Result, Status> { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; async { let kicked_count = self .session_manager .kick_user(&user_id) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(KickUserResponse { kicked_count: kicked_count as u32, })) } .instrument(info_span!("kick_user", user_id = %user_id)) .await } async fn get_user_status( &self, req: Request, ) -> Result, Status> { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; async { let status = self .session_manager .get_user_status(&user_id) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(GetUserStatusResponse { status: to_proto_status(status) as i32, })) } .instrument(info_span!("get_user_status", user_id = %user_id)) .await } async fn get_user_info( &self, req: Request, ) -> Result, Status> { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; async { let info = self .session_manager .get_user_info(&user_id) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(GetUserInfoResponse { info: info.as_ref().map(to_proto_info), })) } .instrument(info_span!("get_user_info", user_id = %user_id)) .await } async fn get_workspace_online_users( &self, req: Request, ) -> Result, Status> { let workspace_id = parse_uuid(&req.get_ref().workspace_id) .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; async { let user_ids = self .session_manager .get_workspace_online_users(&workspace_id) .await .map_err(|e| Status::internal(e.to_string()))?; let user_ids = user_ids.iter().map(|u| u.to_string()).collect(); Ok(Response::new(GetWorkspaceOnlineUsersResponse { user_ids })) } .instrument(info_span!("get_workspace_online_users", workspace_id = %workspace_id)) .await } async fn is_user_online( &self, req: Request, ) -> Result, Status> { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; async { let online = self .session_manager .is_user_online(&user_id) .await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(IsUserOnlineResponse { online })) } .instrument(info_span!("is_user_online", user_id = %user_id)) .await } async fn get_metrics( &self, req: Request, ) -> Result, Status> { let r = req.get_ref(); let instance_filter = r.instance_filter.as_str(); let limit = if r.limit > 0 { r.limit as usize } else { 100 }; async { let pool = self.session_manager.pool(); let instances = query_all_instance_metrics(pool, instance_filter, limit).await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(GetMetricsResponse { instances })) } .instrument(info_span!("get_metrics")) .await } async fn export_metrics_csv( &self, req: Request, ) -> Result, Status> { let instance_filter = req.get_ref().instance_filter.as_str(); async { let pool = self.session_manager.pool(); let csv = export_all_metrics_csv(pool, instance_filter).await .map_err(|e| Status::internal(e.to_string()))?; Ok(Response::new(ExportMetricsCsvResponse { csv })) } .instrument(info_span!("export_metrics_csv")) .await } } /// Default gRPC admin port. pub const DEFAULT_GRPC_PORT: u16 = 9090; /// Start the Tonic gRPC server on the given address. pub async fn serve( addr: SocketAddr, session_manager: SessionManager, ) -> anyhow::Result<()> { let service = SessionAdminService::new(session_manager); let incoming = tonic::transport::server::TcpIncoming::bind(addr) .map_err(|e| anyhow::anyhow!("failed to bind TcpIncoming: {}", e))?; tracing::info!(addr = %addr, "Admin gRPC server listening"); Server::builder() .add_service(SessionAdminServer::new(service)) .serve_with_incoming(incoming) .await?; Ok(()) } /// Spawn the gRPC server as a background task. pub fn spawn( addr: SocketAddr, session_manager: SessionManager, mut shutdown_rx: broadcast::Receiver<()>, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let result = serve(addr, session_manager).await; if let Err(e) = result { tracing::error!(error = %e, "Admin gRPC server error"); } let _ = shutdown_rx.recv().await; }) } // --------------------------------------------------------------------------- // Metrics helpers — mirror of observability::redis_metrics (daily hash format) // --------------------------------------------------------------------------- use deadpool_redis::cluster::Pool; use serde::{Deserialize, Serialize}; /// Snapshot stored in Redis under `metrics:{instance}:{YYYY-MM-DD}` (hash). #[derive(Debug, Clone, Serialize, Deserialize)] struct MetricsSnapshot { instance_id: String, timestamp_secs: i64, #[serde(flatten)] http: std::collections::HashMap, #[serde(flatten)] room: std::collections::HashMap, } /// Query metrics for all known instances, optionally filtered by `instance_filter`. async fn query_all_instance_metrics( pool: &Pool, instance_filter: &str, limit: usize, ) -> anyhow::Result> { let mut conn = pool.get().await?; // Scan for all daily index keys to extract instance ids let mut cursor = 0u64; let mut all_instance_ids = Vec::new(); let mut seen = std::collections::HashSet::new(); loop { let (new_cursor, keys): (u64, Vec) = redis::cmd("SCAN") .arg(cursor) .arg("MATCH") .arg("metrics:index:*") .arg("COUNT") .arg(100) .query_async(&mut conn) .await?; for k in keys { // Key format: metrics:index:{instance}:{YYYY-MM-DD} if let Some(rest) = k.strip_prefix("metrics:index:") { if let Some((instance_id, _)) = rest.rsplit_once(':') { if seen.insert(instance_id.to_string()) { all_instance_ids.push(instance_id.to_string()); } } } } cursor = new_cursor; if cursor == 0 { break; } } let mut results = Vec::new(); for instance_id in all_instance_ids { if !instance_filter.is_empty() && !instance_id.contains(instance_filter) { continue; } let snapshots = query_instance_metrics(pool, &instance_id, limit).await?; for (_, payload) in snapshots { results.push(InstanceMetrics { instance_id: payload.instance_id, timestamp_secs: payload.timestamp_secs, http: payload .http .into_iter() .map(|(k, v)| (k, v.to_string())) .collect(), room: payload .room .into_iter() .map(|(k, v)| (k, v.to_string())) .collect(), }); } } Ok(results) } /// Query metrics for a single instance from Redis (daily hash buckets). async fn query_instance_metrics( pool: &Pool, instance_id: &str, limit: usize, ) -> anyhow::Result> { let mut conn = pool.get().await?; // Scan for all daily index keys for this instance let index_pattern = format!("metrics:index:{}:*", instance_id); let mut day_cursor = 0u64; let mut day_keys: Vec = Vec::new(); loop { let (new_cursor, keys): (u64, Vec) = redis::cmd("SCAN") .arg(day_cursor) .arg("MATCH") .arg(&index_pattern) .arg("COUNT") .arg(100) .query_async(&mut conn) .await?; day_keys.extend(keys); day_cursor = new_cursor; if day_cursor == 0 { break; } } // Collect all timestamps from all daily indexes let mut all_timestamps: Vec = Vec::new(); for day_key in &day_keys { let timestamps: Vec = redis::cmd("ZREVRANGE") .arg(day_key) .arg(0) .arg((limit as isize - 1).max(0)) .query_async(&mut conn) .await?; all_timestamps.extend(timestamps); } all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts)); all_timestamps.truncate(limit); let mut results = Vec::with_capacity(all_timestamps.len()); for ts in &all_timestamps { let dt = chrono::DateTime::from_timestamp(*ts, 0) .unwrap_or_else(chrono::Utc::now); let day_str = dt.format("%Y-%m-%d").to_string(); let hash_key = format!("metrics:{}:{}", instance_id, day_str); let fields: Option> = redis::cmd("HGETALL") .arg(&hash_key) .query_async(&mut conn) .await?; if let Some(fields) = fields { let mut http = std::collections::HashMap::new(); let mut room = std::collections::HashMap::new(); for (k, v) in &fields { let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone())); if let Some(stripped) = k.strip_prefix("http_") { http.insert(stripped.to_string(), json_val); } else if let Some(stripped) = k.strip_prefix("room_") { room.insert(stripped.to_string(), json_val); } } results.push((*ts, MetricsSnapshot { instance_id: instance_id.to_string(), timestamp_secs: *ts, http, room, })); } } results.sort_by_key(|(ts, _)| *ts); Ok(results) } /// Export all metrics as CSV. async fn export_all_metrics_csv(pool: &Pool, instance_filter: &str) -> anyhow::Result { let instances = query_all_instance_metrics(pool, instance_filter, 1000).await?; let mut rows = Vec::new(); for inst in instances { let ts = inst.timestamp_secs; let id = inst.instance_id; for (k, v) in inst.http { rows.push(format!("{},{},http_{},{}", id, ts, k, v)); } for (k, v) in inst.room { rows.push(format!("{},{},room_{},{}", id, ts, k, v)); } } let header = "instance_id,timestamp,metric,value"; if rows.is_empty() { return Ok(header.to_string()); } Ok(format!("{}\n{}", header, rows.join("\n"))) }