gitdataai/apps/adminrpc/src/main.rs
ZhenYi f67c788cbe feat(gRPC): migrate admin RPC from Redis Pub/Sub to Tonic gRPC
- libs/rpc/admin: tonic-prost generated server + client wrappers
- apps/adminrpc: standalone binary with all 8 admin RPC methods
- Redis Pub/Sub JSON-RPC code removed from admin module
- libs/agent: add React agent loop for ReAct pattern
- proto/admin.proto: updated with list_workspace_sessions, is_user_online
2026-04-22 22:39:06 +08:00

337 lines
12 KiB
Rust

use actix_web::{web, App as ActixApp, HttpResponse, HttpServer};
use anyhow::Context as _;
use clap::Parser;
use config::AppConfig;
use deadpool_redis::{cluster, Runtime};
use rpc::admin::server::{serve, DEFAULT_GRPC_PORT};
use session_manager::{SessionManager, SessionStorage};
use std::net::SocketAddr;
use uuid::Uuid;
mod args;
use args::Args;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cfg = AppConfig::load();
let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string());
observability::init_tracing_subscriber(&log_level);
let args = Args::parse();
let grpc_addr: SocketAddr = args
.bind
.map(|s| s.parse())
.unwrap_or_else(|| format!("0.0.0.0:{}", DEFAULT_GRPC_PORT).parse())
.context("invalid grpc bind address")?;
let admin_port: u16 = args.http_port.unwrap_or(grpc_addr.port() + 1);
let admin_addr: SocketAddr = format!("0.0.0.0:{}", admin_port).parse()?;
tracing::info!(
app_name = %cfg.app_name().unwrap_or_default(),
grpc_addr = %grpc_addr,
admin_addr = %admin_addr,
"Starting admin RPC server"
);
let _otel_guard = if cfg.otel_enabled().unwrap_or(false) {
let endpoint = cfg
.otel_endpoint()
.unwrap_or_else(|_| "http://localhost:4317".to_string());
let service_name = cfg
.otel_service_name()
.unwrap_or_else(|_| "adminrpc".to_string());
let service_version = cfg
.otel_service_version()
.unwrap_or_else(|_| "0.1.0".to_string());
tracing::info!(endpoint = %endpoint, service = %service_name, "OTLP tracing enabled");
let guard =
observability::init_otlp(&endpoint, &service_name, &service_version, &log_level)
.map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))?;
guard
} else {
None
};
let redis_url = cfg.redis_url()?;
tracing::info!(redis_url = %redis_url, "Connecting to Redis");
let manager = cluster::Manager::new(vec![redis_url.clone()], false)
.map_err(|e| anyhow::anyhow!("failed to create redis cluster manager: {}", e))?;
let pool: cluster::Pool = cluster::Pool::builder(manager)
.max_size(16)
.runtime(Runtime::Tokio1)
.build()
.map_err(|e| anyhow::anyhow!("failed to build redis pool: {}", e))?;
let _conn = pool.get().await.context("redis pool connection failed")?;
tracing::info!("Redis connected");
let storage = SessionStorage::new(pool.clone());
let session_manager = SessionManager::new(storage);
let sm_for_grpc = session_manager.clone();
let grpc_handle = tokio::spawn(async move {
if let Err(e) = serve(grpc_addr, sm_for_grpc).await {
tracing::error!(error = %e, "Admin gRPC server error");
}
});
let http_handle = tokio::spawn(async move {
let pool_for_http = pool.clone();
let sm_for_http = session_manager.clone();
let result = HttpServer::new(move || {
ActixApp::new()
.app_data(web::Data::new(pool_for_http.clone()))
.app_data(web::Data::new(sm_for_http.clone()))
.route("/health", web::get().to(health))
.route("/admin/metrics/export", web::get().to(metrics_export))
.service(
web::scope("/api/admin")
.route(
"/sessions/workspace/{workspace_id}",
web::get().to(list_workspace_sessions),
)
.route(
"/sessions/user/{user_id}",
web::get().to(list_user_sessions),
)
.route(
"/sessions/user/{user_id}/status",
web::get().to(get_user_status),
)
.route(
"/sessions/user/{user_id}/info",
web::get().to(get_user_info),
)
.route(
"/sessions/workspace/{workspace_id}/online-users",
web::get().to(get_workspace_online_users),
)
.route(
"/sessions/user/{user_id}/online",
web::get().to(is_user_online),
)
.route("/sessions/kick", web::post().to(kick_user))
.route(
"/sessions/kick-workspace",
web::post().to(kick_user_from_workspace),
)
// Metrics
.route("/metrics", web::get().to(get_metrics))
.route("/metrics/export", web::get().to(metrics_export)),
)
})
.bind(admin_addr)
.expect("failed to bind HTTP server")
.workers(2)
.run()
.await;
if let Err(e) = result {
tracing::error!(error = %e, "Admin HTTP server error");
}
});
tracing::info!(addr = %grpc_addr, "Admin gRPC server listening");
tracing::info!(addr = %admin_addr, "Admin HTTP server listening");
grpc_handle.await?;
http_handle.abort();
tracing::info!("Admin RPC server stopped");
Ok(())
}
// ─── HTTP Handlers ────────────────────────────────────────────────────────────
/// Liveness probe handler: always answers `200 OK` with the JSON body `{"ok": true}`.
async fn health() -> HttpResponse {
    let body = serde_json::json!({ "ok": true });
    HttpResponse::Ok().json(body)
}
async fn metrics_export(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::export_all_metrics_csv(pool.get_ref(), "").await {
Ok(csv) => HttpResponse::Ok()
.content_type("text/csv; charset=utf-8")
.body(csv),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_metrics(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::query_all_instance_metrics(pool.get_ref(), "", 100).await {
Ok(instances) => HttpResponse::Ok().json(instances),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
fn parse_uuid(s: &str) -> Option<Uuid> {
Uuid::parse_str(s).ok()
}
async fn list_workspace_sessions(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let workspace_id = match parse_uuid(&path) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid workspace_id" }));
}
};
match sm.get_workspace_sessions(&workspace_id).await {
Ok(sessions) => HttpResponse::Ok().json(sessions),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn list_user_sessions(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.get_user_sessions(&user_id).await {
Ok(sessions) => HttpResponse::Ok().json(sessions),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_user_status(sm: web::Data<SessionManager>, path: web::Path<String>) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.get_user_status(&user_id).await {
Ok(status) => {
HttpResponse::Ok().json(serde_json::json!({ "status": format!("{:?}", status) }))
}
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_user_info(sm: web::Data<SessionManager>, path: web::Path<String>) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.get_user_info(&user_id).await {
Ok(info) => HttpResponse::Ok().json(info),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_workspace_online_users(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let workspace_id = match parse_uuid(&path) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid workspace_id" }));
}
};
match sm.get_workspace_online_users(&workspace_id).await {
Ok(user_ids) => HttpResponse::Ok().json(serde_json::json!({ "user_ids": user_ids })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn is_user_online(sm: web::Data<SessionManager>, path: web::Path<String>) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.is_user_online(&user_id).await {
Ok(online) => HttpResponse::Ok().json(serde_json::json!({ "online": online })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
/// JSON request body accepted by the `kick_user` handler.
#[derive(serde::Deserialize)]
struct KickUserPayload {
    /// Target user identifier as text; the handler rejects it unless it parses as a UUID.
    user_id: String,
}
async fn kick_user(
sm: web::Data<SessionManager>,
body: web::Json<KickUserPayload>,
) -> HttpResponse {
let user_id = match parse_uuid(&body.user_id) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.kick_user(&user_id).await {
Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
/// JSON request body accepted by the `kick_user_from_workspace` handler.
#[derive(serde::Deserialize)]
struct KickWorkspacePayload {
    /// Target user identifier as text; the handler rejects it unless it parses as a UUID.
    user_id: String,
    /// Workspace identifier as text; the handler rejects it unless it parses as a UUID.
    workspace_id: String,
}
async fn kick_user_from_workspace(
sm: web::Data<SessionManager>,
body: web::Json<KickWorkspacePayload>,
) -> HttpResponse {
let user_id = match parse_uuid(&body.user_id) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
let workspace_id = match parse_uuid(&body.workspace_id) {
Some(id) => id,
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid workspace_id" }));
}
};
match sm.kick_user_from_workspace(&user_id, &workspace_id).await {
Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}