gitdataai/libs/rpc/admin/server.rs
ZhenYi 962bf0312d feat(observability): Phase 6 OTLP tracing + Prometheus metrics endpoint
OTLP tracing:
- libs/observability/otlp.rs: SdkTracerProvider via HTTP/proto OTLP exporter
- libs/observability/tracing_middleware.rs: Actix-web span with trace_id propagation
- libs/observability/tracing_fmt.rs: JSON fmt + registry.try_init for layered init
- libs/rpc: gRPC method spans via info_span
- libs/agent, libs/room, libs/service, libs/api: structured tracing throughout

Prometheus metrics:
- libs/observability/prometheus_exporter.rs: /metrics HTTP handler + metrics crate
- libs/observability/metrics_middleware.rs: HttpMetrics middleware + AtomicU64
- libs/observability/redis_metrics.rs: Redis counter poller via RedisMetrics
- libs/room/metrics.rs: RoomMetrics (connections, messages, presence counters)

Config env vars: APP_OTEL_ENABLED, APP_OTEL_ENDPOINT, APP_OTEL_SERVICE_NAME
2026-04-22 10:27:54 +08:00

463 lines
16 KiB
Rust

//! Tonic gRPC server implementation for SessionAdmin service.
use session_manager::SessionManager;
use std::net::SocketAddr;
use tokio::sync::broadcast;
use tonic::{transport::Server, Request, Response, Status};
use tracing::{info_span, Instrument};
use super::generated::admin::{
GetMetricsRequest, GetMetricsResponse, GetUserInfoRequest, GetUserInfoResponse,
GetUserStatusRequest, GetUserStatusResponse, GetWorkspaceOnlineUsersRequest,
GetWorkspaceOnlineUsersResponse, InstanceMetrics, IsUserOnlineRequest, IsUserOnlineResponse,
KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, KickUserRequest, KickUserResponse,
ListUserSessionsRequest, ListUserSessionsResponse, ListWorkspaceSessionsRequest,
ListWorkspaceSessionsResponse, ExportMetricsCsvRequest, ExportMetricsCsvResponse,
};
use super::generated::admin_session_admin::session_admin_server::{
SessionAdmin, SessionAdminServer,
};
use super::types::{parse_uuid, to_proto_info, to_proto_session, to_proto_status};
/// gRPC service object that implements the `SessionAdmin` RPCs.
///
/// Every handler delegates to the wrapped [`SessionManager`]; the metrics
/// handlers additionally borrow its Redis pool through `pool()`.
#[derive(Clone)]
pub struct SessionAdminService {
// Manager that all RPC handlers call into.
session_manager: SessionManager,
}
impl SessionAdminService {
pub fn new(session_manager: SessionManager) -> Self {
Self { session_manager }
}
}
#[tonic::async_trait]
impl SessionAdmin for SessionAdminService {
/// Lists the sessions of the workspace named in the request.
///
/// Fails with `invalid_argument` when `workspace_id` does not parse as a
/// UUID, and with `internal` (carrying the error text) when the session
/// manager call fails.
async fn list_workspace_sessions(
    &self,
    req: Request<ListWorkspaceSessionsRequest>,
) -> Result<Response<ListWorkspaceSessionsResponse>, Status> {
    let workspace_id = match parse_uuid(&req.get_ref().workspace_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid workspace_id")),
    };
    // Validation happens outside the span; only the manager call is traced.
    let span = info_span!("list_workspace_sessions", workspace_id = %workspace_id);
    async {
        let found = self
            .session_manager
            .get_workspace_sessions(&workspace_id)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
        Ok(Response::new(ListWorkspaceSessionsResponse {
            sessions: found.iter().map(to_proto_session).collect(),
        }))
    }
    .instrument(span)
    .await
}
/// Lists the sessions of the user named in the request.
///
/// Fails with `invalid_argument` when `user_id` does not parse as a UUID,
/// and with `internal` when the session manager call fails.
async fn list_user_sessions(
    &self,
    req: Request<ListUserSessionsRequest>,
) -> Result<Response<ListUserSessionsResponse>, Status> {
    let user_id = match parse_uuid(&req.get_ref().user_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid user_id")),
    };
    let span = info_span!("list_user_sessions", user_id = %user_id);
    async {
        let found = self
            .session_manager
            .get_user_sessions(&user_id)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
        Ok(Response::new(ListUserSessionsResponse {
            sessions: found.iter().map(to_proto_session).collect(),
        }))
    }
    .instrument(span)
    .await
}
/// Kicks one user out of one workspace and reports how many sessions were hit.
///
/// Both ids must parse as UUIDs (`invalid_argument` otherwise); a failing
/// session manager call is reported as `internal`.
async fn kick_user_from_workspace(
    &self,
    req: Request<KickUserFromWorkspaceRequest>,
) -> Result<Response<KickUserFromWorkspaceResponse>, Status> {
    let body = req.get_ref();
    // `user_id` is validated first, so its error wins when both ids are bad.
    let user_id = match parse_uuid(&body.user_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid user_id")),
    };
    let workspace_id = match parse_uuid(&body.workspace_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid workspace_id")),
    };
    let span = info_span!(
        "kick_user_from_workspace",
        user_id = %user_id,
        workspace_id = %workspace_id
    );
    async {
        let removed = self
            .session_manager
            .kick_user_from_workspace(&user_id, &workspace_id)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
        // NOTE(review): `as u32` truncates if the manager's count type is wider — confirm.
        let kicked_count = removed as u32;
        Ok(Response::new(KickUserFromWorkspaceResponse { kicked_count }))
    }
    .instrument(span)
    .await
}
/// Kicks a user and reports how many sessions were affected.
///
/// Fails with `invalid_argument` when `user_id` does not parse as a UUID,
/// and with `internal` when the session manager call fails.
async fn kick_user(
    &self,
    req: Request<KickUserRequest>,
) -> Result<Response<KickUserResponse>, Status> {
    let user_id = match parse_uuid(&req.get_ref().user_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid user_id")),
    };
    let span = info_span!("kick_user", user_id = %user_id);
    async {
        let removed = self
            .session_manager
            .kick_user(&user_id)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
        // NOTE(review): `as u32` truncates if the manager's count type is wider — confirm.
        let kicked_count = removed as u32;
        Ok(Response::new(KickUserResponse { kicked_count }))
    }
    .instrument(span)
    .await
}
/// Looks up a user's status and returns it as the proto enum's `i32` value.
///
/// Fails with `invalid_argument` when `user_id` does not parse as a UUID,
/// and with `internal` when the session manager call fails.
async fn get_user_status(
    &self,
    req: Request<GetUserStatusRequest>,
) -> Result<Response<GetUserStatusResponse>, Status> {
    let user_id = match parse_uuid(&req.get_ref().user_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid user_id")),
    };
    let span = info_span!("get_user_status", user_id = %user_id);
    async {
        let current = self
            .session_manager
            .get_user_status(&user_id)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
        let status = to_proto_status(current) as i32;
        Ok(Response::new(GetUserStatusResponse { status }))
    }
    .instrument(span)
    .await
}
/// Fetches the info record for a user; `info` is left unset when the
/// session manager returns none.
///
/// Fails with `invalid_argument` when `user_id` does not parse as a UUID,
/// and with `internal` when the session manager call fails.
async fn get_user_info(
    &self,
    req: Request<GetUserInfoRequest>,
) -> Result<Response<GetUserInfoResponse>, Status> {
    let user_id = match parse_uuid(&req.get_ref().user_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid user_id")),
    };
    let span = info_span!("get_user_info", user_id = %user_id);
    async {
        let record = self
            .session_manager
            .get_user_info(&user_id)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
        let info = record.as_ref().map(to_proto_info);
        Ok(Response::new(GetUserInfoResponse { info }))
    }
    .instrument(span)
    .await
}
/// Returns the ids of the users currently online in a workspace, rendered
/// as strings.
///
/// Fails with `invalid_argument` when `workspace_id` does not parse as a
/// UUID, and with `internal` when the session manager call fails.
async fn get_workspace_online_users(
    &self,
    req: Request<GetWorkspaceOnlineUsersRequest>,
) -> Result<Response<GetWorkspaceOnlineUsersResponse>, Status> {
    let workspace_id = match parse_uuid(&req.get_ref().workspace_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid workspace_id")),
    };
    let span = info_span!("get_workspace_online_users", workspace_id = %workspace_id);
    async {
        let online = self
            .session_manager
            .get_workspace_online_users(&workspace_id)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
        Ok(Response::new(GetWorkspaceOnlineUsersResponse {
            user_ids: online.iter().map(|id| id.to_string()).collect(),
        }))
    }
    .instrument(span)
    .await
}
/// Reports whether the session manager considers the user online.
///
/// Fails with `invalid_argument` when `user_id` does not parse as a UUID,
/// and with `internal` when the session manager call fails.
async fn is_user_online(
    &self,
    req: Request<IsUserOnlineRequest>,
) -> Result<Response<IsUserOnlineResponse>, Status> {
    let user_id = match parse_uuid(&req.get_ref().user_id) {
        Some(id) => id,
        None => return Err(Status::invalid_argument("invalid user_id")),
    };
    let span = info_span!("is_user_online", user_id = %user_id);
    async {
        let online = match self.session_manager.is_user_online(&user_id).await {
            Ok(flag) => flag,
            Err(err) => return Err(Status::internal(err.to_string())),
        };
        Ok(Response::new(IsUserOnlineResponse { online }))
    }
    .instrument(span)
    .await
}
/// Returns stored metric snapshots for every instance whose id contains
/// `instance_filter` (an empty filter matches all instances).
///
/// `limit` bounds the number of snapshots fetched *per instance*; a
/// non-positive value selects the default of 100. Any failure while reading
/// from Redis is reported as `internal` with the error text.
async fn get_metrics(
    &self,
    req: Request<GetMetricsRequest>,
) -> Result<Response<GetMetricsResponse>, Status> {
    // Per-instance snapshot count used when the request does not set a positive limit.
    const DEFAULT_LIMIT: usize = 100;

    let r = req.get_ref();
    let instance_filter = r.instance_filter.as_str();
    let limit = if r.limit > 0 {
        r.limit as usize
    } else {
        DEFAULT_LIMIT
    };
    // Record the effective query parameters on the span, the same way the
    // id-based handlers record the ids they operate on.
    let span = info_span!("get_metrics", instance_filter = %instance_filter, limit = limit);
    async {
        let pool = self.session_manager.pool();
        let instances = query_all_instance_metrics(pool, instance_filter, limit)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(GetMetricsResponse { instances }))
    }
    .instrument(span)
    .await
}
/// Exports the stored metrics of every instance whose id contains
/// `instance_filter` (an empty filter matches all) as one CSV document.
///
/// Any failure while reading from Redis is reported as `internal` with the
/// error text.
async fn export_metrics_csv(
    &self,
    req: Request<ExportMetricsCsvRequest>,
) -> Result<Response<ExportMetricsCsvResponse>, Status> {
    let instance_filter = req.get_ref().instance_filter.as_str();
    // Record the filter on the span so an export can be told apart in traces,
    // the same way the id-based handlers record their ids.
    let span = info_span!("export_metrics_csv", instance_filter = %instance_filter);
    async {
        let pool = self.session_manager.pool();
        let csv = export_all_metrics_csv(pool, instance_filter)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(ExportMetricsCsvResponse { csv }))
    }
    .instrument(span)
    .await
}
}
/// Default gRPC admin port.
///
/// Not referenced by the code visible in this file; presumably callers use it
/// to build the `addr` handed to `serve` / `spawn` — TODO confirm.
pub const DEFAULT_GRPC_PORT: u16 = 9090;
/// Start the Tonic gRPC server on the given address.
pub async fn serve(
addr: SocketAddr,
session_manager: SessionManager,
) -> anyhow::Result<()> {
let service = SessionAdminService::new(session_manager);
let incoming = tonic::transport::server::TcpIncoming::bind(addr)
.map_err(|e| anyhow::anyhow!("failed to bind TcpIncoming: {}", e))?;
tracing::info!(addr = %addr, "Admin gRPC server listening");
Server::builder()
.add_service(SessionAdminServer::new(service))
.serve_with_incoming(incoming)
.await?;
Ok(())
}
/// Spawn the gRPC server as a background task.
pub fn spawn(
addr: SocketAddr,
session_manager: SessionManager,
mut shutdown_rx: broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let result = serve(addr, session_manager).await;
if let Err(e) = result {
tracing::error!(error = %e, "Admin gRPC server error");
}
let _ = shutdown_rx.recv().await;
})
}
// ---------------------------------------------------------------------------
// Metrics helpers — mirror of observability::redis_metrics (daily hash format)
// ---------------------------------------------------------------------------
use deadpool_redis::cluster::Pool;
use serde::{Deserialize, Serialize};
/// Snapshot stored in Redis under `metrics:{instance}:{YYYY-MM-DD}` (hash).
///
/// In this file the struct is only built by hand in `query_instance_metrics`
/// and unpacked in `query_all_instance_metrics`; the derived serde impls are
/// not exercised by the visible code.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MetricsSnapshot {
// Instance the snapshot belongs to.
instance_id: String,
// Index timestamp the snapshot was looked up for; built with
// `DateTime::from_timestamp(ts, 0)`, i.e. seconds since the Unix epoch.
timestamp_secs: i64,
// Hash fields that started with `http_`, keyed by the name without the prefix.
// NOTE(review): two `#[serde(flatten)]` maps in one struct share a single
// key namespace when (de)serialized, so `http` and `room` entries likely
// cannot be told apart after a serde round trip — confirm before relying on it.
#[serde(flatten)]
http: std::collections::HashMap<String, serde_json::Value>,
// Hash fields that started with `room_`, keyed by the name without the prefix.
#[serde(flatten)]
room: std::collections::HashMap<String, serde_json::Value>,
}
/// Query metrics for all known instances, optionally filtered by `instance_filter`.
///
/// Instance ids are discovered by scanning `metrics:index:*` keys, whose
/// format is `metrics:index:{instance}:{YYYY-MM-DD}`. An instance is included
/// when `instance_filter` is empty or is a substring of its id. `limit` is
/// applied per instance, so the result may hold up to `limit` snapshots for
/// each matching instance. Instances appear in SCAN discovery order.
///
/// Metric values are rendered with `serde_json::Value::to_string`, so string
/// values keep their JSON quotes.
///
/// # Errors
/// Propagates pool checkout and Redis command errors.
async fn query_all_instance_metrics(
    pool: &Pool,
    instance_filter: &str,
    limit: usize,
) -> anyhow::Result<Vec<InstanceMetrics>> {
    let mut all_instance_ids = Vec::new();
    {
        // The discovery connection lives only inside this scope. Each
        // per-instance query below checks out its own connection, and holding
        // this one across those calls would pin two connections per request
        // and could exhaust the pool under concurrency.
        let mut conn = pool.get().await?;
        let mut seen = std::collections::HashSet::new();
        let mut cursor = 0u64;
        // NOTE(review): `Pool` is the cluster pool; confirm that SCAN over a
        // cluster connection visits every node.
        loop {
            let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg("metrics:index:*")
                .arg("COUNT")
                .arg(100)
                .query_async(&mut conn)
                .await?;
            for k in keys {
                // Key format: metrics:index:{instance}:{YYYY-MM-DD}
                if let Some(rest) = k.strip_prefix("metrics:index:") {
                    // Split at the last ':' so instance ids may contain colons.
                    if let Some((instance_id, _)) = rest.rsplit_once(':') {
                        if seen.insert(instance_id.to_string()) {
                            all_instance_ids.push(instance_id.to_string());
                        }
                    }
                }
            }
            cursor = new_cursor;
            // SCAN reports completion with a zero cursor.
            if cursor == 0 {
                break;
            }
        }
    }

    let mut results = Vec::new();
    for instance_id in all_instance_ids {
        if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
            continue;
        }
        let snapshots = query_instance_metrics(pool, &instance_id, limit).await?;
        for (_, payload) in snapshots {
            results.push(InstanceMetrics {
                instance_id: payload.instance_id,
                timestamp_secs: payload.timestamp_secs,
                http: payload
                    .http
                    .into_iter()
                    .map(|(k, v)| (k, v.to_string()))
                    .collect(),
                room: payload
                    .room
                    .into_iter()
                    .map(|(k, v)| (k, v.to_string()))
                    .collect(),
            });
        }
    }
    Ok(results)
}
/// Query metrics for a single instance from Redis (daily hash buckets).
async fn query_instance_metrics(
pool: &Pool,
instance_id: &str,
limit: usize,
) -> anyhow::Result<Vec<(i64, MetricsSnapshot)>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys for this instance
let index_pattern = format!("metrics:index:{}:*", instance_id);
let mut day_cursor = 0u64;
let mut day_keys: Vec<String> = Vec::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(day_cursor)
.arg("MATCH")
.arg(&index_pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
day_keys.extend(keys);
day_cursor = new_cursor;
if day_cursor == 0 {
break;
}
}
// Collect all timestamps from all daily indexes
let mut all_timestamps: Vec<i64> = Vec::new();
for day_key in &day_keys {
let timestamps: Vec<i64> = redis::cmd("ZREVRANGE")
.arg(day_key)
.arg(0)
.arg((limit as isize - 1).max(0))
.query_async(&mut conn)
.await?;
all_timestamps.extend(timestamps);
}
all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts));
all_timestamps.truncate(limit);
let mut results = Vec::with_capacity(all_timestamps.len());
for ts in &all_timestamps {
let dt = chrono::DateTime::from_timestamp(*ts, 0)
.unwrap_or_else(chrono::Utc::now);
let day_str = dt.format("%Y-%m-%d").to_string();
let hash_key = format!("metrics:{}:{}", instance_id, day_str);
let fields: Option<std::collections::HashMap<String, String>> = redis::cmd("HGETALL")
.arg(&hash_key)
.query_async(&mut conn)
.await?;
if let Some(fields) = fields {
let mut http = std::collections::HashMap::new();
let mut room = std::collections::HashMap::new();
for (k, v) in &fields {
let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
if let Some(stripped) = k.strip_prefix("http_") {
http.insert(stripped.to_string(), json_val);
} else if let Some(stripped) = k.strip_prefix("room_") {
room.insert(stripped.to_string(), json_val);
}
}
results.push((*ts, MetricsSnapshot {
instance_id: instance_id.to_string(),
timestamp_secs: *ts,
http,
room,
}));
}
}
results.sort_by_key(|(ts, _)| *ts);
Ok(results)
}
/// Renders the metrics of all matching instances as CSV text.
///
/// The document starts with the header `instance_id,timestamp,metric,value`
/// and has one row per metric, with `http_` / `room_` prepended to the metric
/// name. At most 1000 snapshots are requested per instance. Lines are joined
/// with `\n` and there is no trailing newline. Fields are written verbatim,
/// without CSV quoting or escaping.
async fn export_all_metrics_csv(pool: &Pool, instance_filter: &str) -> anyhow::Result<String> {
    let snapshots = query_all_instance_metrics(pool, instance_filter, 1000).await?;
    let mut csv = String::from("instance_id,timestamp,metric,value");
    for snapshot in snapshots {
        let ts = snapshot.timestamp_secs;
        let id = snapshot.instance_id;
        // HTTP metrics first, then room metrics, each tagged with its prefix.
        let http_rows = snapshot.http.into_iter().map(|(k, v)| ("http", k, v));
        let room_rows = snapshot.room.into_iter().map(|(k, v)| ("room", k, v));
        for (group, metric, value) in http_rows.chain(room_rows) {
            csv.push('\n');
            csv.push_str(&format!("{},{},{}_{},{}", id, ts, group, metric, value));
        }
    }
    Ok(csv)
}