gitdataai/libs/rpc/admin/server.rs
ZhenYi 418f9a5d8b
Some checks are pending
CI / Rust Lint & Check (push) Waiting to run
CI / Rust Tests (push) Waiting to run
CI / Frontend Lint & Type Check (push) Waiting to run
CI / Frontend Build (push) Blocked by required conditions
feat(rpc): migrate admin from Redis Pub/Sub JSON-RPC to Tonic gRPC
- libs/rpc/proto/: admin.proto with 8 RPC methods
- libs/rpc/admin/: tonic server impl (SessionAdminService), client
  wrapper (AdminGrpcClient), types, generated/ tonic-prost build output
- libs/rpc/build.rs: tonic-prost-build two-step (proto -> message types
  + manual service defs)
- libs/rpc/lib.rs: module re-exports
- libs/session_manager/: session manager types used by admin service
2026-04-21 13:44:25 +08:00

216 lines
7.0 KiB
Rust

//! Tonic gRPC server implementation for the `SessionAdmin` service.
use session_manager::SessionManager;
use slog::Logger;
use std::net::SocketAddr;
use tokio::sync::broadcast;
use tonic::{transport::Server, Request, Response, Status};
use super::generated::admin::{
GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse,
GetWorkspaceOnlineUsersRequest, GetWorkspaceOnlineUsersResponse, IsUserOnlineRequest,
IsUserOnlineResponse, KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse,
KickUserRequest, KickUserResponse, ListUserSessionsRequest, ListUserSessionsResponse,
ListWorkspaceSessionsRequest, ListWorkspaceSessionsResponse,
};
use super::generated::admin_session_admin::session_admin_server::{
SessionAdmin, SessionAdminServer,
};
use super::types::{parse_uuid, to_proto_info, to_proto_session, to_proto_status};
/// Service object that implements the `SessionAdmin` gRPC trait.
///
/// It wraps a single `SessionManager`; every RPC handler delegates to it.
/// Deriving `Clone` clones the wrapped `SessionManager`.
#[derive(Clone)]
pub struct SessionAdminService {
/// Session manager that the RPC handlers query and command.
session_manager: SessionManager,
}
impl SessionAdminService {
pub fn new(session_manager: SessionManager) -> Self {
Self { session_manager }
}
}
/// gRPC handlers for the `SessionAdmin` service.
///
/// Every handler follows the same shape:
/// 1. parse the UUID string(s) carried by the request with `parse_uuid`, answering
///    `INVALID_ARGUMENT` when parsing yields `None`;
/// 2. delegate to the matching `SessionManager` method;
/// 3. surface a manager error as `INTERNAL`, using the error's text as the message;
/// 4. convert the result into the protobuf response message.
#[tonic::async_trait]
impl SessionAdmin for SessionAdminService {
    /// List the sessions of the workspace identified by `workspace_id`.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` if `workspace_id` does not parse; `INTERNAL` if the
    /// session manager fails.
    async fn list_workspace_sessions(
        &self,
        req: Request<ListWorkspaceSessionsRequest>,
    ) -> Result<Response<ListWorkspaceSessionsResponse>, Status> {
        let workspace_id = parse_uuid(&req.get_ref().workspace_id)
            .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
        let sessions = self
            .session_manager
            .get_workspace_sessions(&workspace_id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        let sessions = sessions.iter().map(to_proto_session).collect();
        Ok(Response::new(ListWorkspaceSessionsResponse { sessions }))
    }

    /// List the sessions of the user identified by `user_id`.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` if `user_id` does not parse; `INTERNAL` if the session
    /// manager fails.
    async fn list_user_sessions(
        &self,
        req: Request<ListUserSessionsRequest>,
    ) -> Result<Response<ListUserSessionsResponse>, Status> {
        let user_id = parse_uuid(&req.get_ref().user_id)
            .ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
        let sessions = self
            .session_manager
            .get_user_sessions(&user_id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        let sessions = sessions.iter().map(to_proto_session).collect();
        Ok(Response::new(ListUserSessionsResponse { sessions }))
    }

    /// Kick a user out of one workspace and report the count returned by the
    /// session manager.
    ///
    /// `user_id` is validated before `workspace_id`, so a request with both fields
    /// malformed reports `invalid user_id`.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` if either id does not parse; `INTERNAL` if the session
    /// manager fails.
    async fn kick_user_from_workspace(
        &self,
        req: Request<KickUserFromWorkspaceRequest>,
    ) -> Result<Response<KickUserFromWorkspaceResponse>, Status> {
        let r = req.get_ref();
        let user_id =
            parse_uuid(&r.user_id).ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
        let workspace_id = parse_uuid(&r.workspace_id)
            .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
        let kicked_count = self
            .session_manager
            .kick_user_from_workspace(&user_id, &workspace_id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(KickUserFromWorkspaceResponse {
            // The wire field is u32. A bare `as u32` would silently wrap a count that
            // does not fit, so saturate at u32::MAX instead of under-reporting.
            kicked_count: u32::try_from(kicked_count).unwrap_or(u32::MAX),
        }))
    }

    /// Kick a user everywhere and report the count returned by the session manager.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` if `user_id` does not parse; `INTERNAL` if the session
    /// manager fails.
    async fn kick_user(
        &self,
        req: Request<KickUserRequest>,
    ) -> Result<Response<KickUserResponse>, Status> {
        let user_id = parse_uuid(&req.get_ref().user_id)
            .ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
        let kicked_count = self
            .session_manager
            .kick_user(&user_id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(KickUserResponse {
            // Saturating conversion to the u32 wire field; see the note in
            // `kick_user_from_workspace` for why `as u32` is avoided.
            kicked_count: u32::try_from(kicked_count).unwrap_or(u32::MAX),
        }))
    }

    /// Fetch the status of a user, encoded as the protobuf enum's `i32` value.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` if `user_id` does not parse; `INTERNAL` if the session
    /// manager fails.
    async fn get_user_status(
        &self,
        req: Request<GetUserStatusRequest>,
    ) -> Result<Response<GetUserStatusResponse>, Status> {
        let user_id = parse_uuid(&req.get_ref().user_id)
            .ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
        let status = self
            .session_manager
            .get_user_status(&user_id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(GetUserStatusResponse {
            // The response field is a plain i32, so the converted status is cast to it.
            status: to_proto_status(status) as i32,
        }))
    }

    /// Fetch the info record of a user.
    ///
    /// The response's `info` field is `None` when the session manager returns no
    /// record for the user.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` if `user_id` does not parse; `INTERNAL` if the session
    /// manager fails.
    async fn get_user_info(
        &self,
        req: Request<GetUserInfoRequest>,
    ) -> Result<Response<GetUserInfoResponse>, Status> {
        let user_id = parse_uuid(&req.get_ref().user_id)
            .ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
        let info = self
            .session_manager
            .get_user_info(&user_id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(GetUserInfoResponse {
            info: info.as_ref().map(to_proto_info),
        }))
    }

    /// List the ids of the users currently online in a workspace, as strings.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` if `workspace_id` does not parse; `INTERNAL` if the
    /// session manager fails.
    async fn get_workspace_online_users(
        &self,
        req: Request<GetWorkspaceOnlineUsersRequest>,
    ) -> Result<Response<GetWorkspaceOnlineUsersResponse>, Status> {
        let workspace_id = parse_uuid(&req.get_ref().workspace_id)
            .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
        let user_ids = self
            .session_manager
            .get_workspace_online_users(&workspace_id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        let user_ids = user_ids.iter().map(|u| u.to_string()).collect();
        Ok(Response::new(GetWorkspaceOnlineUsersResponse { user_ids }))
    }

    /// Report whether a user is currently online.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` if `user_id` does not parse; `INTERNAL` if the session
    /// manager fails.
    async fn is_user_online(
        &self,
        req: Request<IsUserOnlineRequest>,
    ) -> Result<Response<IsUserOnlineResponse>, Status> {
        let user_id = parse_uuid(&req.get_ref().user_id)
            .ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
        let online = self
            .session_manager
            .is_user_online(&user_id)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(IsUserOnlineResponse { online }))
    }
}
/// Default port for the admin gRPC server.
///
/// NOTE(review): not referenced in this part of the file; presumably callers use it
/// to build the `SocketAddr` handed to `serve` / `spawn` — confirm against call sites.
pub const DEFAULT_GRPC_PORT: u16 = 9090;
/// Bind `addr` and run the admin gRPC server on it.
///
/// The returned future resolves only once the server has stopped.
///
/// # Errors
/// Returns an error if the listener cannot be bound to `addr`, or if the tonic
/// transport reports a failure while serving.
pub async fn serve(
    addr: SocketAddr,
    session_manager: SessionManager,
    log: Logger,
) -> anyhow::Result<()> {
    let admin = SessionAdminService::new(session_manager);

    // Bind up front so a bad address is reported before the "listening" line is logged.
    let listener = match tonic::transport::server::TcpIncoming::bind(addr) {
        Ok(listener) => listener,
        Err(e) => return Err(anyhow::anyhow!("failed to bind TcpIncoming: {}", e)),
    };
    slog::info!(log, "Admin gRPC server listening on {}", addr);

    let router = Server::builder().add_service(SessionAdminServer::new(admin));
    router.serve_with_incoming(listener).await?;
    Ok(())
}
/// Spawn the gRPC server as a background task.
pub fn spawn(
addr: SocketAddr,
session_manager: SessionManager,
log: Logger,
mut shutdown_rx: broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let result = serve(addr, session_manager, log).await;
if let Err(e) = result {
eprintln!("Admin gRPC server error: {}", e);
}
let _ = shutdown_rx.recv().await;
})
}