diff --git a/apps/adminrpc/src/main.rs b/apps/adminrpc/src/main.rs index a373c38..33ee64b 100644 --- a/apps/adminrpc/src/main.rs +++ b/apps/adminrpc/src/main.rs @@ -1,11 +1,11 @@ -use std::net::SocketAddr; use actix_web::{web, App as ActixApp, HttpResponse, HttpServer}; use anyhow::Context as _; use clap::Parser; use config::AppConfig; use deadpool_redis::{cluster, Runtime}; -use session_manager::{SessionManager, SessionStorage}; use rpc::admin::server::{serve, DEFAULT_GRPC_PORT}; +use session_manager::{SessionManager, SessionStorage}; +use std::net::SocketAddr; use uuid::Uuid; mod args; @@ -24,9 +24,8 @@ async fn main() -> anyhow::Result<()> { .unwrap_or_else(|| format!("0.0.0.0:{}", DEFAULT_GRPC_PORT).parse()) .context("invalid grpc bind address")?; - // Admin HTTP port is gRPC port + 1 (e.g., 9091) let admin_port: u16 = args.http_port.unwrap_or(grpc_addr.port() + 1); - let admin_addr: SocketAddr = format!("0.0.0.0:{}", admin_port).parse().unwrap(); + let admin_addr: SocketAddr = format!("0.0.0.0:{}", admin_port).parse()?; tracing::info!( app_name = %cfg.app_name().unwrap_or_default(), @@ -35,20 +34,25 @@ async fn main() -> anyhow::Result<()> { "Starting admin RPC server" ); - // ── OTLP tracing ───────────────────────────────────────────────────────── let _otel_guard = if cfg.otel_enabled().unwrap_or(false) { - let endpoint = cfg.otel_endpoint().unwrap_or_else(|_| "http://localhost:4317".to_string()); - let service_name = cfg.otel_service_name().unwrap_or_else(|_| "adminrpc".to_string()); - let service_version = cfg.otel_service_version().unwrap_or_else(|_| "0.1.0".to_string()); + let endpoint = cfg + .otel_endpoint() + .unwrap_or_else(|_| "http://localhost:4317".to_string()); + let service_name = cfg + .otel_service_name() + .unwrap_or_else(|_| "adminrpc".to_string()); + let service_version = cfg + .otel_service_version() + .unwrap_or_else(|_| "0.1.0".to_string()); tracing::info!(endpoint = %endpoint, service = %service_name, "OTLP tracing enabled"); - let guard = observability::init_otlp(&endpoint, &service_name, &service_version, &log_level) - .map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))?; + let guard = + observability::init_otlp(&endpoint, &service_name, &service_version, &log_level) + .map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))?; guard } else { None }; - // Redis connection pool let redis_url = cfg.redis_url()?; tracing::info!(redis_url = %redis_url, "Connecting to Redis"); let manager = cluster::Manager::new(vec![redis_url.clone()], false) @@ -65,7 +69,6 @@ async fn main() -> anyhow::Result<()> { let storage = SessionStorage::new(pool.clone()); let session_manager = SessionManager::new(storage); - // Spawn gRPC server in background let sm_for_grpc = session_manager.clone(); let grpc_handle = tokio::spawn(async move { if let Err(e) = serve(grpc_addr, sm_for_grpc).await { @@ -73,7 +76,6 @@ async fn main() -> anyhow::Result<()> { } }); - // Start HTTP REST server let http_handle = tokio::spawn(async move { let pool_for_http = pool.clone(); let sm_for_http = session_manager.clone(); @@ -86,15 +88,35 @@ async fn main() -> anyhow::Result<()> { .route("/admin/metrics/export", web::get().to(metrics_export)) .service( web::scope("/api/admin") - // Sessions - .route("/sessions/workspace/{workspace_id}", web::get().to(list_workspace_sessions)) - .route("/sessions/user/{user_id}", web::get().to(list_user_sessions)) - .route("/sessions/user/{user_id}/status", web::get().to(get_user_status)) - .route("/sessions/user/{user_id}/info", web::get().to(get_user_info)) - .route("/sessions/workspace/{workspace_id}/online-users", web::get().to(get_workspace_online_users)) - .route("/sessions/user/{user_id}/online", web::get().to(is_user_online)) + .route( + "/sessions/workspace/{workspace_id}", + web::get().to(list_workspace_sessions), + ) + .route( + "/sessions/user/{user_id}", + web::get().to(list_user_sessions), + ) + .route( + "/sessions/user/{user_id}/status", + web::get().to(get_user_status), + ) + .route( + "/sessions/user/{user_id}/info", + web::get().to(get_user_info), + ) + .route( + "/sessions/workspace/{workspace_id}/online-users", + web::get().to(get_workspace_online_users), + ) + .route( + "/sessions/user/{user_id}/online", + web::get().to(is_user_online), + ) .route("/sessions/kick", web::post().to(kick_user)) - .route("/sessions/kick-workspace", web::post().to(kick_user_from_workspace)) + .route( + "/sessions/kick-workspace", + web::post().to(kick_user_from_workspace), + ) // Metrics .route("/metrics", web::get().to(get_metrics)) .route("/metrics/export", web::get().to(metrics_export)), @@ -131,14 +153,18 @@ async fn metrics_export(pool: web::Data) -> HttpResponse { Ok(csv) => HttpResponse::Ok() .content_type("text/csv; charset=utf-8") .body(csv), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } async fn get_metrics(pool: web::Data) -> HttpResponse { match observability::query_all_instance_metrics(pool.get_ref(), "", 100).await { Ok(instances) => HttpResponse::Ok().json(instances), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } @@ -152,11 +178,16 @@ async fn list_workspace_sessions( ) -> HttpResponse { let workspace_id = match parse_uuid(&path) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid workspace_id" })); + } }; match sm.get_workspace_sessions(&workspace_id).await { Ok(sessions) => HttpResponse::Ok().json(sessions), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } @@ -166,39 +197,50 @@ async fn list_user_sessions( ) -> HttpResponse { let user_id = match parse_uuid(&path) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid user_id" })); + } }; match sm.get_user_sessions(&user_id).await { Ok(sessions) => HttpResponse::Ok().json(sessions), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } -async fn get_user_status( - sm: web::Data, - path: web::Path, -) -> HttpResponse { +async fn get_user_status(sm: web::Data, path: web::Path) -> HttpResponse { let user_id = match parse_uuid(&path) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid user_id" })); + } }; match sm.get_user_status(&user_id).await { - Ok(status) => HttpResponse::Ok().json(serde_json::json!({ "status": format!("{:?}", status) })), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Ok(status) => { + HttpResponse::Ok().json(serde_json::json!({ "status": format!("{:?}", status) })) + } + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } -async fn get_user_info( - sm: web::Data, - path: web::Path, -) -> HttpResponse { +async fn get_user_info(sm: web::Data, path: web::Path) -> HttpResponse { let user_id = match parse_uuid(&path) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid user_id" })); + } }; match sm.get_user_info(&user_id).await { Ok(info) => HttpResponse::Ok().json(info), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } @@ -208,25 +250,32 @@ async fn get_workspace_online_users( ) -> HttpResponse { let workspace_id = match parse_uuid(&path) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid workspace_id" })); + } }; match sm.get_workspace_online_users(&workspace_id).await { Ok(user_ids) => HttpResponse::Ok().json(serde_json::json!({ "user_ids": user_ids })), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } -async fn is_user_online( - sm: web::Data, - path: web::Path, -) -> HttpResponse { +async fn is_user_online(sm: web::Data, path: web::Path) -> HttpResponse { let user_id = match parse_uuid(&path) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid user_id" })); + } }; match sm.is_user_online(&user_id).await { Ok(online) => HttpResponse::Ok().json(serde_json::json!({ "online": online })), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } @@ -241,11 +290,16 @@ async fn kick_user( ) -> HttpResponse { let user_id = match parse_uuid(&body.user_id) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid user_id" })); + } }; match sm.kick_user(&user_id).await { Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } @@ -261,14 +315,22 @@ async fn kick_user_from_workspace( ) -> HttpResponse { let user_id = match parse_uuid(&body.user_id) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid user_id" })); + } }; let workspace_id = match parse_uuid(&body.workspace_id) { Some(id) => id, - None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })), + None => { + return HttpResponse::BadRequest() + .json(serde_json::json!({ "error": "invalid workspace_id" })); + } }; match sm.kick_user_from_workspace(&user_id, &workspace_id).await { Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })), - Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })), + Err(e) => { + HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) + } } } diff --git a/libs/agent/chat/service.rs b/libs/agent/chat/service.rs index 13da448..150cc8c 100644 --- a/libs/agent/chat/service.rs +++ b/libs/agent/chat/service.rs @@ -1,4 +1,5 @@ use std::pin::Pin; +use std::time::Duration; use async_openai::config::OpenAIConfig; use async_openai::Client; use async_openai::types::chat::{ @@ -707,6 +708,28 @@ impl ChatService { .await } + /// Returns true if the error message indicates a transient failure that can be retried. + fn is_retryable_tool_error(msg: &str) -> bool { + let msg_lower = msg.to_lowercase(); + // Transient errors: network, timeouts, rate limits, permission issues that may be temporary + msg_lower.contains("connection") + || msg_lower.contains("timeout") + || msg_lower.contains("timed out") + || msg_lower.contains("rate limit") + || msg_lower.contains("too many") + || msg_lower.contains("unavailable") + || msg_lower.contains("service unavailable") + || msg_lower.contains("temporarily") + || msg_lower.contains("refused") + || msg_lower.contains("reset") + || msg_lower.contains("broken pipe") + || msg_lower.contains("deadline exceeded") + || msg_lower.contains("try again") + || msg_lower.contains("not found") // DB/Redis transient not-found + || msg_lower.contains("permission denied") + || msg_lower.contains("access denied") + } + /// Process a request using the ReAct (Reasoning + Acting) agent. /// /// Unlike the simple loop in `process`, the ReAct agent performs multi-step @@ -756,27 +779,70 @@ impl ChatService { let registry = registry.clone(); Box::pin(async move { - let mut ctx = ToolContext::new(db, cache, config, room_id, sender_uid); - if let Some(pid) = project_id { - ctx = ctx.with_project(pid); - } - ctx.registry_mut().merge(registry.clone()); + let max_retries = 3; + let mut last_err = String::new(); - let tool_executor = ToolExecutor::new(); - let call = ToolCall { - id: uuid::Uuid::new_v4().to_string(), - name, - arguments: serde_json::to_string(&args).unwrap_or_else(|_| "{}".into()), - }; - let results: Vec<_> = tool_executor - .execute_batch(vec![call], &mut ctx) - .await - .map_err(|e| e.to_string())?; - let result = results.into_iter().next().ok_or_else(|| "no result".to_string())?; - match result.result { - ToolResult::Ok(v) => Ok(v), - ToolResult::Error(msg) => Err(msg), + for attempt in 0..=max_retries { + let mut ctx = ToolContext::new(db.clone(), cache.clone(), config.clone(), room_id, sender_uid); + if let Some(pid) = project_id { + ctx = ctx.with_project(pid); + } + ctx.registry_mut().merge(registry.clone()); + + let tool_executor = ToolExecutor::new(); + let call = ToolCall { + id: Uuid::new_v4().to_string(), + name: name.clone(), + arguments: serde_json::to_string(&args).unwrap_or_else(|_| "{}".into()), + }; + + match tool_executor.execute_batch(vec![call], &mut ctx).await { + Ok(results) => { + let result = results.into_iter().next() + .ok_or_else(|| "no tool result returned".to_string())?; + match result.result { + ToolResult::Ok(v) => return Ok(v), + ToolResult::Error(msg) => { + // Check if error is retryable + if attempt < max_retries && Self::is_retryable_tool_error(&msg) { + last_err = msg; + let backoff_ms = 100u64.saturating_mul(2u64.pow(attempt as u32)); + tracing::warn!( + tool = %name, + attempt = attempt + 1, + backoff_ms = backoff_ms, + error = %last_err, + "tool_execute_retry" + ); + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + continue; + } + // Non-retryable or exhausted retries — pass error to AI as observation + return Err(msg); + } + } + } + Err(e) => { + last_err = e.to_string(); + if attempt < max_retries && Self::is_retryable_tool_error(&last_err) { + let backoff_ms = 100u64.saturating_mul(2u64.pow(attempt as u32)); + tracing::warn!( + tool = %name, + attempt = attempt + 1, + backoff_ms = backoff_ms, + error = %last_err, + "tool_execute_retry" + ); + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + continue; + } + return Err(last_err); + } + } } + + // Should not reach here, but just in case + Err(last_err) }) as Pin> + Send>> }); diff --git a/libs/agent/react/mod.rs b/libs/agent/react/mod.rs index 0f9d074..cd9b17a 100644 --- a/libs/agent/react/mod.rs +++ b/libs/agent/react/mod.rs @@ -57,6 +57,18 @@ You must respond in JSON format: - Chain multiple tool calls if a single call is insufficient. - After each tool result, re-evaluate whether more data is needed before providing a final answer. +## Handling Tool Errors + +When a tool returns an error observation (a JSON object with an "error" field): +- **Transient errors** (e.g., "connection refused", "not found", "timeout", "rate limit", "permission denied"): Retry with adjusted arguments, or try an alternative tool. +- **Permanent errors** (e.g., "invalid arguments", "tool not registered"): Do NOT retry — acknowledge the error and try a different approach or reformulate your question. +- **Empty results** (e.g., "no issues found"): This is NOT an error — continue with the next logical tool or provide your answer based on what was found. + +The system automatically retries transient failures up to 3 times with backoff, but you should still: +1. Fix any malformed arguments before retrying. +2. If the same tool fails twice with the same error, switch to a different approach. +3. Always provide a useful answer even if all tools fail — state what you attempted and what went wrong. + ## Principles - Be precise and cite specific issue/PR numbers, commit hashes, or message IDs when available. diff --git a/libs/rpc/buf.gen.yaml b/libs/rpc/buf.gen.yaml new file mode 100644 index 0000000..a197de0 --- /dev/null +++ b/libs/rpc/buf.gen.yaml @@ -0,0 +1,8 @@ +version: v2 +managed: + enabled: true +plugins: + - remote: buf.build/bufbuild/es + out: ../../admin/src/lib/adminrpc/generated + - remote: buf.build/bufbuild/connect-es + out: ../../admin/src/lib/adminrpc/generated diff --git a/libs/rpc/buf.yaml b/libs/rpc/buf.yaml new file mode 100644 index 0000000..0fb0b65 --- /dev/null +++ b/libs/rpc/buf.yaml @@ -0,0 +1,9 @@ +version: v2 +name: buf.build/gitdataai/code +deps: + - buf.build/googleapis/googlerpc +lint: + use: + - DEFAULT + except: + - PACKAGE_VERSION_SUFFIX diff --git a/libs/rpc/proto/admin.proto b/libs/rpc/proto/admin.proto index 46b8c5b..ea1caad 100644 --- a/libs/rpc/proto/admin.proto +++ b/libs/rpc/proto/admin.proto @@ -121,6 +121,98 @@ message ExportMetricsCsvResponse { string csv = 1; } +// --------------------------------------------------------------------------- +// AI Model Sync +// --------------------------------------------------------------------------- + +message SyncModelsRequest {} +message SyncModelsResponse { + string body_json = 1; // Serialized SyncModelsResponse JSON +} + +// --------------------------------------------------------------------------- +// Billing Alert Check +// --------------------------------------------------------------------------- + +message CheckAlertsRequest {} +message CheckAlertsResponse { + string body_json = 1; // Serialized CheckAlertsResponse JSON +} + +// --------------------------------------------------------------------------- +// AI Provider CRUD +// --------------------------------------------------------------------------- + +message CreateProviderRequest { + string body_json = 1; // Serialized AdminCreateProvider JSON +} +message UpdateProviderRequest { + string id = 1; + string body_json = 2; // Serialized AdminUpdateProvider JSON +} +message DeleteProviderRequest { + string id = 1; +} +message ProviderResponse { + string body_json = 1; // Serialized response JSON +} + +// --------------------------------------------------------------------------- +// AI Model CRUD +// --------------------------------------------------------------------------- + +message CreateModelRequest { + string body_json = 1; // Serialized AdminCreateModel JSON +} +message UpdateModelRequest { + string id = 1; + string body_json = 2; // Serialized AdminUpdateModel JSON +} +message DeleteModelRequest { + string id = 1; +} +message ModelResponse { + string body_json = 1; +} + +// --------------------------------------------------------------------------- +// AI Version CRUD +// --------------------------------------------------------------------------- + +message CreateVersionRequest { + string body_json = 1; // Serialized AdminCreateVersion JSON +} +message UpdateVersionRequest { + string id = 1; + string body_json = 2; // Serialized AdminUpdateVersion JSON +} +message DeleteVersionRequest { + string id = 1; +} +message VersionResponse { + string body_json = 1; +} + +// --------------------------------------------------------------------------- +// AI Pricing Update +// --------------------------------------------------------------------------- + +message UpdatePricingRequest { + string id = 1; + string body_json = 2; // Serialized AdminUpdatePricing JSON +} +message PricingResponse { + string body_json = 1; +} + +// --------------------------------------------------------------------------- +// Generic delete response +// --------------------------------------------------------------------------- + +message DeleteResponse { + bool deleted = 1; +} + // --------------------------------------------------------------------------- // Service // --------------------------------------------------------------------------- @@ -136,4 +228,21 @@ service SessionAdmin { rpc IsUserOnline(IsUserOnlineRequest) returns (IsUserOnlineResponse); rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse); rpc ExportMetricsCsv(ExportMetricsCsvRequest) returns (ExportMetricsCsvResponse); + // AI + rpc SyncModels(SyncModelsRequest) returns (SyncModelsResponse); + rpc CheckAlerts(CheckAlertsRequest) returns (CheckAlertsResponse); + // AI Provider + rpc CreateProvider(CreateProviderRequest) returns (ProviderResponse); + rpc UpdateProvider(UpdateProviderRequest) returns (ProviderResponse); + rpc DeleteProvider(DeleteProviderRequest) returns (DeleteResponse); + // AI Model + rpc CreateModel(CreateModelRequest) returns (ModelResponse); + rpc UpdateModel(UpdateModelRequest) returns (ModelResponse); + rpc DeleteModel(DeleteModelRequest) returns (DeleteResponse); + // AI Version + rpc CreateVersion(CreateVersionRequest) returns (VersionResponse); + rpc UpdateVersion(UpdateVersionRequest) returns (VersionResponse); + rpc DeleteVersion(DeleteVersionRequest) returns (DeleteResponse); + // AI Pricing + rpc UpdatePricing(UpdatePricingRequest) returns (PricingResponse); }