use actix_web::{web, App as ActixApp, HttpResponse, HttpServer}; use anyhow::Context as _; use clap::Parser; use config::AppConfig; use deadpool_redis::{cluster, Runtime}; use rpc::admin::server::{serve, DEFAULT_GRPC_PORT}; use session_manager::{SessionManager, SessionStorage}; use std::net::SocketAddr; use uuid::Uuid; mod args; use args::Args; #[tokio::main] async fn main() -> anyhow::Result<()> { let cfg = AppConfig::load(); let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string()); let otel_enabled = cfg.otel_enabled().unwrap_or(false); observability::init_tracing_subscriber(&log_level, otel_enabled); let args = Args::parse(); let grpc_addr: SocketAddr = args .bind .map(|s| s.parse()) .unwrap_or_else(|| format!("0.0.0.0:{}", DEFAULT_GRPC_PORT).parse()) .context("invalid grpc bind address")?; let admin_port: u16 = args.http_port.unwrap_or(grpc_addr.port() + 1); let admin_addr: SocketAddr = format!("0.0.0.0:{}", admin_port).parse()?; tracing::info!( app_name = %cfg.app_name().unwrap_or_default(), grpc_addr = %grpc_addr, admin_addr = %admin_addr, "Starting admin RPC server" ); let _otel_guard = if otel_enabled { let endpoint = cfg .otel_endpoint() .unwrap_or_else(|_| "http://localhost:4317".to_string()); let service_name = cfg .otel_service_name() .unwrap_or_else(|_| "adminrpc".to_string()); let service_version = cfg .otel_service_version() .unwrap_or_else(|_| "0.1.0".to_string()); tracing::info!(endpoint = %endpoint, service = %service_name, "OTLP tracing enabled"); let guard = observability::init_otlp(&endpoint, &service_name, &service_version, &log_level) .map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))?; guard } else { None }; let redis_url = cfg.redis_url()?; tracing::info!(redis_url = %redis_url, "Connecting to Redis"); let manager = cluster::Manager::new(vec![redis_url.clone()], false) .map_err(|e| anyhow::anyhow!("failed to create redis cluster manager: {}", e))?; let pool: cluster::Pool = cluster::Pool::builder(manager) .max_size(16) .runtime(Runtime::Tokio1) .build() .map_err(|e| anyhow::anyhow!("failed to build redis pool: {}", e))?; let _conn = pool.get().await.context("redis pool connection failed")?; tracing::info!("Redis connected"); let storage = SessionStorage::new(pool.clone()); let session_manager = SessionManager::new(storage); let sm_for_grpc = session_manager.clone(); let grpc_handle = tokio::spawn(async move { if let Err(e) = serve(grpc_addr, sm_for_grpc).await { tracing::error!(error = %e, "Admin gRPC server error"); } }); let http_handle = tokio::spawn(async move { let sm_for_http = session_manager.clone(); let result = HttpServer::new(move || { ActixApp::new() .app_data(web::Data::new(sm_for_http.clone())) .route("/health", web::get().to(health)) .service( web::scope("/api/admin") .route( "/sessions/workspace/{workspace_id}", web::get().to(list_workspace_sessions), ) .route( "/sessions/user/{user_id}", web::get().to(list_user_sessions), ) .route( "/sessions/user/{user_id}/status", web::get().to(get_user_status), ) .route( "/sessions/user/{user_id}/info", web::get().to(get_user_info), ) .route( "/sessions/workspace/{workspace_id}/online-users", web::get().to(get_workspace_online_users), ) .route( "/sessions/user/{user_id}/online", web::get().to(is_user_online), ) .route("/sessions/kick", web::post().to(kick_user)) .route( "/sessions/kick-workspace", web::post().to(kick_user_from_workspace), ), ) }) .bind(admin_addr) .expect("failed to bind HTTP server") .workers(2) .run() .await; if let Err(e) = result { tracing::error!(error = %e, "Admin HTTP server error"); } }); tracing::info!(addr = %grpc_addr, "Admin gRPC server listening"); tracing::info!(addr = %admin_addr, "Admin HTTP server listening"); grpc_handle.await?; http_handle.abort(); tracing::info!("Admin RPC server stopped"); Ok(()) } // ─── HTTP Handlers ──────────────────────────────────────────────────────────── async fn health() -> HttpResponse { HttpResponse::Ok().json(serde_json::json!({ "ok": true })) } fn parse_uuid(s: &str) -> Option { Uuid::parse_str(s).ok() } async fn list_workspace_sessions( sm: web::Data, path: web::Path, ) -> HttpResponse { let workspace_id = match parse_uuid(&path) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid workspace_id" })); } }; match sm.get_workspace_sessions(&workspace_id).await { Ok(sessions) => HttpResponse::Ok().json(sessions), Err(e) => { HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) } } } async fn list_user_sessions( sm: web::Data, path: web::Path, ) -> HttpResponse { let user_id = match parse_uuid(&path) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid user_id" })); } }; match sm.get_user_sessions(&user_id).await { Ok(sessions) => HttpResponse::Ok().json(sessions), Err(e) => { HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) } } } async fn get_user_status(sm: web::Data, path: web::Path) -> HttpResponse { let user_id = match parse_uuid(&path) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid user_id" })); } }; match sm.get_user_status(&user_id).await { Ok(status) => { HttpResponse::Ok().json(serde_json::json!({ "status": format!("{:?}", status) })) } Err(e) => { HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) } } } async fn get_user_info(sm: web::Data, path: web::Path) -> HttpResponse { let user_id = match parse_uuid(&path) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid user_id" })); } }; match sm.get_user_info(&user_id).await { Ok(info) => HttpResponse::Ok().json(info), Err(e) => { HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) } } } async fn get_workspace_online_users( sm: web::Data, path: web::Path, ) -> HttpResponse { let workspace_id = match parse_uuid(&path) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid workspace_id" })); } }; match sm.get_workspace_online_users(&workspace_id).await { Ok(user_ids) => HttpResponse::Ok().json(serde_json::json!({ "user_ids": user_ids })), Err(e) => { HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) } } } async fn is_user_online(sm: web::Data, path: web::Path) -> HttpResponse { let user_id = match parse_uuid(&path) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid user_id" })); } }; match sm.is_user_online(&user_id).await { Ok(online) => HttpResponse::Ok().json(serde_json::json!({ "online": online })), Err(e) => { HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) } } } #[derive(serde::Deserialize)] struct KickUserPayload { user_id: String, } async fn kick_user( sm: web::Data, body: web::Json, ) -> HttpResponse { let user_id = match parse_uuid(&body.user_id) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid user_id" })); } }; match sm.kick_user(&user_id).await { Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })), Err(e) => { HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) } } } #[derive(serde::Deserialize)] struct KickWorkspacePayload { user_id: String, workspace_id: String, } async fn kick_user_from_workspace( sm: web::Data, body: web::Json, ) -> HttpResponse { let user_id = match parse_uuid(&body.user_id) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid user_id" })); } }; let workspace_id = match parse_uuid(&body.workspace_id) { Some(id) => id, None => { return HttpResponse::BadRequest() .json(serde_json::json!({ "error": "invalid workspace_id" })); } }; match sm.kick_user_from_workspace(&user_id, &workspace_id).await { Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })), Err(e) => { HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) } } }