feat(gRPC): migrate admin RPC from Redis Pub/Sub to Tonic gRPC

- libs/rpc/admin: tonic-prost generated server + client wrappers
- apps/adminrpc: standalone binary with all 8 admin RPC methods
- Redis Pub/Sub JSON-RPC code removed from admin module
- libs/agent: add React agent loop for ReAct pattern
- proto/admin.proto: updated with list_workspace_sessions, is_user_online
This commit is contained in:
ZhenYi 2026-04-22 22:39:06 +08:00
parent 850a5392ce
commit f67c788cbe
6 changed files with 338 additions and 72 deletions

View File

@ -1,11 +1,11 @@
use std::net::SocketAddr;
use actix_web::{web, App as ActixApp, HttpResponse, HttpServer};
use anyhow::Context as _;
use clap::Parser;
use config::AppConfig;
use deadpool_redis::{cluster, Runtime};
use session_manager::{SessionManager, SessionStorage};
use rpc::admin::server::{serve, DEFAULT_GRPC_PORT};
use session_manager::{SessionManager, SessionStorage};
use std::net::SocketAddr;
use uuid::Uuid;
mod args;
@ -24,9 +24,8 @@ async fn main() -> anyhow::Result<()> {
.unwrap_or_else(|| format!("0.0.0.0:{}", DEFAULT_GRPC_PORT).parse())
.context("invalid grpc bind address")?;
// Admin HTTP port is gRPC port + 1 (e.g., 9091)
let admin_port: u16 = args.http_port.unwrap_or(grpc_addr.port() + 1);
let admin_addr: SocketAddr = format!("0.0.0.0:{}", admin_port).parse().unwrap();
let admin_addr: SocketAddr = format!("0.0.0.0:{}", admin_port).parse()?;
tracing::info!(
app_name = %cfg.app_name().unwrap_or_default(),
@ -35,20 +34,25 @@ async fn main() -> anyhow::Result<()> {
"Starting admin RPC server"
);
// ── OTLP tracing ─────────────────────────────────────────────────────────
let _otel_guard = if cfg.otel_enabled().unwrap_or(false) {
let endpoint = cfg.otel_endpoint().unwrap_or_else(|_| "http://localhost:4317".to_string());
let service_name = cfg.otel_service_name().unwrap_or_else(|_| "adminrpc".to_string());
let service_version = cfg.otel_service_version().unwrap_or_else(|_| "0.1.0".to_string());
let endpoint = cfg
.otel_endpoint()
.unwrap_or_else(|_| "http://localhost:4317".to_string());
let service_name = cfg
.otel_service_name()
.unwrap_or_else(|_| "adminrpc".to_string());
let service_version = cfg
.otel_service_version()
.unwrap_or_else(|_| "0.1.0".to_string());
tracing::info!(endpoint = %endpoint, service = %service_name, "OTLP tracing enabled");
let guard = observability::init_otlp(&endpoint, &service_name, &service_version, &log_level)
let guard =
observability::init_otlp(&endpoint, &service_name, &service_version, &log_level)
.map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))?;
guard
} else {
None
};
// Redis connection pool
let redis_url = cfg.redis_url()?;
tracing::info!(redis_url = %redis_url, "Connecting to Redis");
let manager = cluster::Manager::new(vec![redis_url.clone()], false)
@ -65,7 +69,6 @@ async fn main() -> anyhow::Result<()> {
let storage = SessionStorage::new(pool.clone());
let session_manager = SessionManager::new(storage);
// Spawn gRPC server in background
let sm_for_grpc = session_manager.clone();
let grpc_handle = tokio::spawn(async move {
if let Err(e) = serve(grpc_addr, sm_for_grpc).await {
@ -73,7 +76,6 @@ async fn main() -> anyhow::Result<()> {
}
});
// Start HTTP REST server
let http_handle = tokio::spawn(async move {
let pool_for_http = pool.clone();
let sm_for_http = session_manager.clone();
@ -86,15 +88,35 @@ async fn main() -> anyhow::Result<()> {
.route("/admin/metrics/export", web::get().to(metrics_export))
.service(
web::scope("/api/admin")
// Sessions
.route("/sessions/workspace/{workspace_id}", web::get().to(list_workspace_sessions))
.route("/sessions/user/{user_id}", web::get().to(list_user_sessions))
.route("/sessions/user/{user_id}/status", web::get().to(get_user_status))
.route("/sessions/user/{user_id}/info", web::get().to(get_user_info))
.route("/sessions/workspace/{workspace_id}/online-users", web::get().to(get_workspace_online_users))
.route("/sessions/user/{user_id}/online", web::get().to(is_user_online))
.route(
"/sessions/workspace/{workspace_id}",
web::get().to(list_workspace_sessions),
)
.route(
"/sessions/user/{user_id}",
web::get().to(list_user_sessions),
)
.route(
"/sessions/user/{user_id}/status",
web::get().to(get_user_status),
)
.route(
"/sessions/user/{user_id}/info",
web::get().to(get_user_info),
)
.route(
"/sessions/workspace/{workspace_id}/online-users",
web::get().to(get_workspace_online_users),
)
.route(
"/sessions/user/{user_id}/online",
web::get().to(is_user_online),
)
.route("/sessions/kick", web::post().to(kick_user))
.route("/sessions/kick-workspace", web::post().to(kick_user_from_workspace))
.route(
"/sessions/kick-workspace",
web::post().to(kick_user_from_workspace),
)
// Metrics
.route("/metrics", web::get().to(get_metrics))
.route("/metrics/export", web::get().to(metrics_export)),
@ -131,14 +153,18 @@ async fn metrics_export(pool: web::Data<cluster::Pool>) -> HttpResponse {
Ok(csv) => HttpResponse::Ok()
.content_type("text/csv; charset=utf-8")
.body(csv),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_metrics(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::query_all_instance_metrics(pool.get_ref(), "", 100).await {
Ok(instances) => HttpResponse::Ok().json(instances),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
@ -152,11 +178,16 @@ async fn list_workspace_sessions(
) -> HttpResponse {
let workspace_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid workspace_id" }));
}
};
match sm.get_workspace_sessions(&workspace_id).await {
Ok(sessions) => HttpResponse::Ok().json(sessions),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
@ -166,39 +197,50 @@ async fn list_user_sessions(
) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.get_user_sessions(&user_id).await {
Ok(sessions) => HttpResponse::Ok().json(sessions),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_user_status(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
async fn get_user_status(sm: web::Data<SessionManager>, path: web::Path<String>) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.get_user_status(&user_id).await {
Ok(status) => HttpResponse::Ok().json(serde_json::json!({ "status": format!("{:?}", status) })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Ok(status) => {
HttpResponse::Ok().json(serde_json::json!({ "status": format!("{:?}", status) }))
}
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_user_info(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
async fn get_user_info(sm: web::Data<SessionManager>, path: web::Path<String>) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.get_user_info(&user_id).await {
Ok(info) => HttpResponse::Ok().json(info),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
@ -208,25 +250,32 @@ async fn get_workspace_online_users(
) -> HttpResponse {
let workspace_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid workspace_id" }));
}
};
match sm.get_workspace_online_users(&workspace_id).await {
Ok(user_ids) => HttpResponse::Ok().json(serde_json::json!({ "user_ids": user_ids })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn is_user_online(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
async fn is_user_online(sm: web::Data<SessionManager>, path: web::Path<String>) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.is_user_online(&user_id).await {
Ok(online) => HttpResponse::Ok().json(serde_json::json!({ "online": online })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
@ -241,11 +290,16 @@ async fn kick_user(
) -> HttpResponse {
let user_id = match parse_uuid(&body.user_id) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
match sm.kick_user(&user_id).await {
Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
@ -261,14 +315,22 @@ async fn kick_user_from_workspace(
) -> HttpResponse {
let user_id = match parse_uuid(&body.user_id) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid user_id" }));
}
};
let workspace_id = match parse_uuid(&body.workspace_id) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })),
None => {
return HttpResponse::BadRequest()
.json(serde_json::json!({ "error": "invalid workspace_id" }));
}
};
match sm.kick_user_from_workspace(&user_id, &workspace_id).await {
Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}

View File

@ -1,4 +1,5 @@
use std::pin::Pin;
use std::time::Duration;
use async_openai::config::OpenAIConfig;
use async_openai::Client;
use async_openai::types::chat::{
@ -707,6 +708,28 @@ impl ChatService {
.await
}
/// Returns true if the error message indicates a transient failure that can be retried.
fn is_retryable_tool_error(msg: &str) -> bool {
let msg_lower = msg.to_lowercase();
// Transient errors: network, timeouts, rate limits, permission issues that may be temporary
msg_lower.contains("connection")
|| msg_lower.contains("timeout")
|| msg_lower.contains("timed out")
|| msg_lower.contains("rate limit")
|| msg_lower.contains("too many")
|| msg_lower.contains("unavailable")
|| msg_lower.contains("service unavailable")
|| msg_lower.contains("temporarily")
|| msg_lower.contains("refused")
|| msg_lower.contains("reset")
|| msg_lower.contains("broken pipe")
|| msg_lower.contains("deadline exceeded")
|| msg_lower.contains("try again")
|| msg_lower.contains("not found") // DB/Redis transient not-found
|| msg_lower.contains("permission denied")
|| msg_lower.contains("access denied")
}
/// Process a request using the ReAct (Reasoning + Acting) agent.
///
/// Unlike the simple loop in `process`, the ReAct agent performs multi-step
@ -756,7 +779,11 @@ impl ChatService {
let registry = registry.clone();
Box::pin(async move {
let mut ctx = ToolContext::new(db, cache, config, room_id, sender_uid);
let max_retries = 3;
let mut last_err = String::new();
for attempt in 0..=max_retries {
let mut ctx = ToolContext::new(db.clone(), cache.clone(), config.clone(), room_id, sender_uid);
if let Some(pid) = project_id {
ctx = ctx.with_project(pid);
}
@ -764,19 +791,58 @@ impl ChatService {
let tool_executor = ToolExecutor::new();
let call = ToolCall {
id: uuid::Uuid::new_v4().to_string(),
name,
id: Uuid::new_v4().to_string(),
name: name.clone(),
arguments: serde_json::to_string(&args).unwrap_or_else(|_| "{}".into()),
};
let results: Vec<_> = tool_executor
.execute_batch(vec![call], &mut ctx)
.await
.map_err(|e| e.to_string())?;
let result = results.into_iter().next().ok_or_else(|| "no result".to_string())?;
match tool_executor.execute_batch(vec![call], &mut ctx).await {
Ok(results) => {
let result = results.into_iter().next()
.ok_or_else(|| "no tool result returned".to_string())?;
match result.result {
ToolResult::Ok(v) => Ok(v),
ToolResult::Error(msg) => Err(msg),
ToolResult::Ok(v) => return Ok(v),
ToolResult::Error(msg) => {
// Check if error is retryable
if attempt < max_retries && Self::is_retryable_tool_error(&msg) {
last_err = msg;
let backoff_ms = 100u64.saturating_mul(2u64.pow(attempt as u32));
tracing::warn!(
tool = %name,
attempt = attempt + 1,
backoff_ms = backoff_ms,
error = %last_err,
"tool_execute_retry"
);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
continue;
}
// Non-retryable or exhausted retries — pass error to AI as observation
return Err(msg);
}
}
}
Err(e) => {
last_err = e.to_string();
if attempt < max_retries && Self::is_retryable_tool_error(&last_err) {
let backoff_ms = 100u64.saturating_mul(2u64.pow(attempt as u32));
tracing::warn!(
tool = %name,
attempt = attempt + 1,
backoff_ms = backoff_ms,
error = %last_err,
"tool_execute_retry"
);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
continue;
}
return Err(last_err);
}
}
}
// Should not reach here, but just in case
Err(last_err)
}) as Pin<Box<dyn std::future::Future<Output = std::result::Result<serde_json::Value, String>> + Send>>
});

View File

@ -57,6 +57,18 @@ You must respond in JSON format:
- Chain multiple tool calls if a single call is insufficient.
- After each tool result, re-evaluate whether more data is needed before providing a final answer.
## Handling Tool Errors
When a tool returns an error observation (a JSON object with an "error" field):
- **Transient errors** (e.g., "connection refused", "not found", "timeout", "rate limit", "permission denied"): Retry with adjusted arguments, or try an alternative tool.
- **Permanent errors** (e.g., "invalid arguments", "tool not registered"): Do NOT retry acknowledge the error and try a different approach or reformulate your question.
- **Empty results** (e.g., "no issues found"): This is NOT an error continue with the next logical tool or provide your answer based on what was found.
The system automatically retries transient failures up to 3 times with backoff, but you should still:
1. Fix any malformed arguments before retrying.
2. If the same tool fails twice with the same error, switch to a different approach.
3. Always provide a useful answer even if all tools fail state what you attempted and what went wrong.
## Principles
- Be precise and cite specific issue/PR numbers, commit hashes, or message IDs when available.

8
libs/rpc/buf.gen.yaml Normal file
View File

@ -0,0 +1,8 @@
version: v2
managed:
enabled: true
plugins:
- remote: buf.build/bufbuild/es
out: ../../admin/src/lib/adminrpc/generated
- remote: buf.build/bufbuild/connect-es
out: ../../admin/src/lib/adminrpc/generated

9
libs/rpc/buf.yaml Normal file
View File

@ -0,0 +1,9 @@
version: v2
name: buf.build/gitdataai/code
deps:
- buf.build/googleapis/googlerpc
lint:
use:
- DEFAULT
except:
- PACKAGE_VERSION_SUFFIX

View File

@ -121,6 +121,98 @@ message ExportMetricsCsvResponse {
string csv = 1;
}
// ---------------------------------------------------------------------------
// AI Model Sync
// ---------------------------------------------------------------------------
message SyncModelsRequest {}
message SyncModelsResponse {
string body_json = 1; // Serialized SyncModelsResponse JSON
}
// ---------------------------------------------------------------------------
// Billing Alert Check
// ---------------------------------------------------------------------------
message CheckAlertsRequest {}
message CheckAlertsResponse {
string body_json = 1; // Serialized CheckAlertsResponse JSON
}
// ---------------------------------------------------------------------------
// AI Provider CRUD
// ---------------------------------------------------------------------------
message CreateProviderRequest {
string body_json = 1; // Serialized AdminCreateProvider JSON
}
message UpdateProviderRequest {
string id = 1;
string body_json = 2; // Serialized AdminUpdateProvider JSON
}
message DeleteProviderRequest {
string id = 1;
}
message ProviderResponse {
string body_json = 1; // Serialized response JSON
}
// ---------------------------------------------------------------------------
// AI Model CRUD
// ---------------------------------------------------------------------------
message CreateModelRequest {
string body_json = 1; // Serialized AdminCreateModel JSON
}
message UpdateModelRequest {
string id = 1;
string body_json = 2; // Serialized AdminUpdateModel JSON
}
message DeleteModelRequest {
string id = 1;
}
message ModelResponse {
string body_json = 1;
}
// ---------------------------------------------------------------------------
// AI Version CRUD
// ---------------------------------------------------------------------------
message CreateVersionRequest {
string body_json = 1; // Serialized AdminCreateVersion JSON
}
message UpdateVersionRequest {
string id = 1;
string body_json = 2; // Serialized AdminUpdateVersion JSON
}
message DeleteVersionRequest {
string id = 1;
}
message VersionResponse {
string body_json = 1;
}
// ---------------------------------------------------------------------------
// AI Pricing Update
// ---------------------------------------------------------------------------
message UpdatePricingRequest {
string id = 1;
string body_json = 2; // Serialized AdminUpdatePricing JSON
}
message PricingResponse {
string body_json = 1;
}
// ---------------------------------------------------------------------------
// Generic delete response
// ---------------------------------------------------------------------------
message DeleteResponse {
bool deleted = 1;
}
// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------
@ -136,4 +228,21 @@ service SessionAdmin {
rpc IsUserOnline(IsUserOnlineRequest) returns (IsUserOnlineResponse);
rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse);
rpc ExportMetricsCsv(ExportMetricsCsvRequest) returns (ExportMetricsCsvResponse);
// AI
rpc SyncModels(SyncModelsRequest) returns (SyncModelsResponse);
rpc CheckAlerts(CheckAlertsRequest) returns (CheckAlertsResponse);
// AI Provider
rpc CreateProvider(CreateProviderRequest) returns (ProviderResponse);
rpc UpdateProvider(UpdateProviderRequest) returns (ProviderResponse);
rpc DeleteProvider(DeleteProviderRequest) returns (DeleteResponse);
// AI Model
rpc CreateModel(CreateModelRequest) returns (ModelResponse);
rpc UpdateModel(UpdateModelRequest) returns (ModelResponse);
rpc DeleteModel(DeleteModelRequest) returns (DeleteResponse);
// AI Version
rpc CreateVersion(CreateVersionRequest) returns (VersionResponse);
rpc UpdateVersion(UpdateVersionRequest) returns (VersionResponse);
rpc DeleteVersion(DeleteVersionRequest) returns (DeleteResponse);
// AI Pricing
rpc UpdatePricing(UpdatePricingRequest) returns (PricingResponse);
}