feat(observability): Phase 6 OTLP tracing + Prometheus metrics endpoint

OTLP tracing:
- libs/observability/otlp.rs: SdkTracerProvider via HTTP/proto OTLP exporter
- libs/observability/tracing_middleware.rs: Actix-web span with trace_id propagation
- libs/observability/tracing_fmt.rs: JSON fmt + registry.try_init for layered init
- libs/rpc: gRPC method spans via info_span
- libs/agent, libs/room, libs/service, libs/api: structured tracing throughout

Prometheus metrics:
- libs/observability/prometheus_exporter.rs: /metrics HTTP handler + metrics crate
- libs/observability/metrics_middleware.rs: HttpMetrics middleware + AtomicU64
- libs/observability/redis_metrics.rs: Redis counter poller via RedisMetrics
- libs/room/metrics.rs: RoomMetrics (connections, messages, presence counters)

Config env vars: APP_OTEL_ENABLED, APP_OTEL_ENDPOINT, APP_OTEL_SERVICE_NAME
This commit is contained in:
ZhenYi 2026-04-22 10:27:54 +08:00
parent 86c7810fd9
commit 962bf0312d
36 changed files with 2162 additions and 26 deletions

16
Cargo.lock generated
View File

@ -206,6 +206,7 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92589714878ca59a7626ea19734f0e07a6a875197eec751bb5d3f99e64998c63"
dependencies = [
"actix-macros",
"futures-core",
"tokio",
]
@ -339,15 +340,20 @@ checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
name = "adminrpc"
version = "0.2.9"
dependencies = [
"actix-rt",
"actix-web",
"anyhow",
"clap",
"config",
"deadpool-redis",
"observability",
"rpc",
"serde",
"serde_json",
"session_manager",
"tokio",
"tracing",
"uuid",
]
[[package]]
@ -443,6 +449,7 @@ dependencies = [
"config",
"db",
"futures",
"metrics",
"models",
"once_cell",
"qdrant-client",
@ -665,6 +672,7 @@ dependencies = [
"futures",
"migrate",
"observability",
"room",
"sea-orm",
"serde_json",
"service",
@ -5171,7 +5179,9 @@ name = "observability"
version = "0.2.9"
dependencies = [
"actix-web",
"anyhow",
"chrono",
"deadpool-redis",
"futures",
"hostname",
"metrics",
@ -5181,7 +5191,9 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-otlp",
"opentelemetry_sdk",
"redis",
"reqwest 0.13.2",
"serde",
"serde_json",
"thiserror 2.0.18",
"tokio",
@ -6807,8 +6819,12 @@ version = "0.2.9"
dependencies = [
"anyhow",
"chrono",
"deadpool-redis",
"prost 0.14.3",
"prost-types 0.14.3",
"redis",
"serde",
"serde_json",
"session_manager",
"tokio",
"tonic 0.14.5",

View File

@ -62,6 +62,8 @@ actix-multipart = "0.7.2"
actix-analytics = "1.2.1"
actix-jwt-session = "1.0.7"
actix-csrf = "0.8.0"
metrics = "0.22"
actix-rt = "2.11.0"
actix = "0.13"
async-stream = "0.3"

View File

@ -0,0 +1,199 @@
"use client";
import { useEffect, useState } from "react";
import { format } from "date-fns";
import { getMetrics, exportMetricsCsv, type InstanceMetrics } from "@/lib/admin-rpc";
/**
 * Admin page that renders the metric snapshots returned by `getMetrics`.
 *
 * Snapshots are grouped by `instance_id`. For every instance only the newest
 * snapshot (highest `timestamp_secs`) is shown: first as one row of a summary
 * table, then as a card listing every key/value pair of its `room` and `http`
 * groups. The toolbar holds an instance filter (sent with the next refresh or
 * export), a manual refresh button and a CSV export button.
 */
export default function MetricsPage() {
  const [metrics, setMetrics] = useState<InstanceMetrics[]>([]);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState("");
  const [exporting, setExporting] = useState(false);
  const [instanceFilter, setInstanceFilter] = useState("");

  // Load once on mount; later reloads are triggered by the refresh button.
  useEffect(() => { loadMetrics(); }, []);

  /** Fetches snapshots for the current filter and stores them, or stores the error message. */
  async function loadMetrics() {
    setLoading(true);
    setError("");
    try {
      const data = await getMetrics(instanceFilter);
      setMetrics(data);
    } catch (e) {
      setError(e instanceof Error ? e.message : "加载指标失败");
    } finally {
      setLoading(false);
    }
  }

  /** Downloads the CSV export for the current filter through a temporary object URL. */
  async function handleExportCsv() {
    setExporting(true);
    try {
      const csv = await exportMetricsCsv(instanceFilter);
      const blob = new Blob([csv], { type: "text/csv;charset=utf-8;" });
      const url = URL.createObjectURL(blob);
      const a = document.createElement("a");
      a.href = url;
      a.download = `metrics-${format(new Date(), "yyyy-MM-dd_HHmm")}.csv`;
      a.click();
      URL.revokeObjectURL(url);
    } catch (e) {
      alert("导出失败: " + (e instanceof Error ? e.message : String(e)));
    } finally {
      setExporting(false);
    }
  }

  // Group by instance
  const byInstance = metrics.reduce<Record<string, InstanceMetrics[]>>((acc, m) => {
    (acc[m.instance_id] ??= []).push(m);
    return acc;
  }, {});
  const allInstances = Object.keys(byInstance).sort();

  /**
   * Returns the snapshot with the highest `timestamp_secs`.
   * Single linear pass that does not reorder `arr`; on equal timestamps the
   * earliest element wins. `arr` must be non-empty (every group built above is).
   */
  const latestOf = (arr: InstanceMetrics[]): InstanceMetrics =>
    arr.reduce((best, cur) => (cur.timestamp_secs > best.timestamp_secs ? cur : best));

  // Newest snapshot per instance, computed once and shared by the summary
  // table and the per-instance cards.
  const latestSnapshots = allInstances.map((id) => ({
    id,
    latest: latestOf(byInstance[id]),
  }));

  // Newest timestamp across all instances, shown in the page header.
  const lastUpdatedSecs =
    latestSnapshots.length > 0
      ? Math.max(...latestSnapshots.map((s) => s.latest.timestamp_secs))
      : undefined;

  /**
   * Looks up one value in the `http` group (when `prefix` is "http") or the
   * `room` group (any other prefix). Returns "—" when the snapshot, the group
   * or the key is missing.
   */
  function metricVal(m: InstanceMetrics | undefined, prefix: string, key: string): string {
    if (!m) return "—";
    const group = prefix === "http" ? m.http : m.room;
    return group?.[key] ?? "—";
  }

  return (
    <div className="admin-content">
      <div className="page-header" style={{ display: "flex", justifyContent: "space-between", alignItems: "flex-start" }}>
        <div>
          <h1 className="page-title"></h1>
          <p className="page-subtitle">
            {allInstances.length}
            {lastUpdatedSecs !== undefined && (
              <span style={{ marginLeft: "8px", fontSize: "12px", color: "#737373" }}>
                : {format(new Date(lastUpdatedSecs * 1000), "yyyy-MM-dd HH:mm:ss")}
              </span>
            )}
          </p>
        </div>
        <div style={{ display: "flex", gap: "8px", alignItems: "center" }}>
          <input
            type="text"
            className="form-input"
            style={{ width: "200px" }}
            placeholder="过滤实例名称..."
            value={instanceFilter}
            onChange={(e) => { setInstanceFilter(e.target.value); }}
          />
          <button className="btn btn-secondary" onClick={loadMetrics} disabled={loading}>
            {loading ? "加载中..." : "刷新"}
          </button>
          <button className="btn btn-primary" onClick={handleExportCsv} disabled={exporting}>
            {exporting ? "导出中..." : "导出 CSV"}
          </button>
        </div>
      </div>
      {error && (
        <div className="alert alert-error" style={{ marginBottom: "16px" }}>
          {error} adminrpc 默认: http://adminrpc.admin.svc.cluster.local:9091
        </div>
      )}
      {loading ? (
        <div className="loading">...</div>
      ) : allInstances.length === 0 ? (
        <div className="card">
          <div className="empty-state">
            <div className="empty-state-icon"></div>
            <p></p>
            <p style={{ fontSize: "13px", color: "#737373" }}>
              Redis 5
            </p>
          </div>
        </div>
      ) : (
        <>
          {/* Summary table */}
          <div className="card" style={{ marginBottom: "16px" }}>
            <h3 style={{ margin: "0 0 12px 0", fontSize: "14px", fontWeight: 600 }}></h3>
            <div className="table-container">
              <table className="data-table">
                <thead>
                  <tr>
                    <th> ID</th>
                    <th></th>
                    <th> WS </th>
                    <th></th>
                    <th>HTTP </th>
                    <th>HTTP </th>
                  </tr>
                </thead>
                <tbody>
                  {latestSnapshots.map(({ id, latest }) => (
                    <tr key={id}>
                      <td><code style={{ fontSize: "12px" }}>{id}</code></td>
                      <td>{format(new Date(latest.timestamp_secs * 1000), "HH:mm:ss")}</td>
                      <td>{metricVal(latest, "room", "ws_connections_active")}</td>
                      <td>{metricVal(latest, "room", "messages_sent_total")}</td>
                      <td>{metricVal(latest, "http", "request_count")}</td>
                      <td>{metricVal(latest, "http", "error_count")}</td>
                    </tr>
                  ))}
                </tbody>
              </table>
            </div>
          </div>
          {/* Per-instance detail */}
          {latestSnapshots.map(({ id: instanceId, latest }) => {
            // A missing group is rendered as an empty list instead of crashing.
            const roomEntries = Object.entries(latest.room ?? {});
            const httpEntries = Object.entries(latest.http ?? {});
            return (
              <div key={instanceId} className="card" style={{ marginBottom: "16px" }}>
                <h3 style={{ margin: "0 0 12px 0", fontSize: "14px", fontWeight: 600 }}>
                  : <code>{instanceId}</code>
                </h3>
                <div style={{ display: "grid", gridTemplateColumns: "1fr 1fr", gap: "16px" }}>
                  {/* Room metrics */}
                  <div>
                    <h4 style={{ fontSize: "13px", color: "#737373", margin: "0 0 8px 0" }}>Room </h4>
                    <div style={{ display: "flex", flexDirection: "column", gap: "4px", fontSize: "13px" }}>
                      {roomEntries.map(([k, v]) => (
                        <div key={k} style={{ display: "flex", justifyContent: "space-between", padding: "2px 0", borderBottom: "1px solid #f4f4f5" }}>
                          <span style={{ color: "#525252" }}>{k}</span>
                          <code style={{ fontSize: "12px", color: "#171717" }}>{v}</code>
                        </div>
                      ))}
                      {roomEntries.length === 0 && (
                        <div style={{ color: "#a3a3a3", fontSize: "12px" }}></div>
                      )}
                    </div>
                  </div>
                  {/* HTTP metrics */}
                  <div>
                    <h4 style={{ fontSize: "13px", color: "#737373", margin: "0 0 8px 0" }}>HTTP </h4>
                    <div style={{ display: "flex", flexDirection: "column", gap: "4px", fontSize: "13px" }}>
                      {httpEntries.map(([k, v]) => (
                        <div key={k} style={{ display: "flex", justifyContent: "space-between", padding: "2px 0", borderBottom: "1px solid #f4f4f5" }}>
                          <span style={{ color: "#525252" }}>{k}</span>
                          <code style={{ fontSize: "12px", color: "#171717" }}>{v}</code>
                        </div>
                      ))}
                      {httpEntries.length === 0 && (
                        <div style={{ color: "#a3a3a3", fontSize: "12px" }}></div>
                      )}
                    </div>
                  </div>
                </div>
              </div>
            );
          })}
        </>
      )}
    </div>
  );
}

View File

@ -33,12 +33,19 @@ config = { workspace = true }
# Utilities
anyhow = { workspace = true }
uuid = { workspace = true }
# Async runtime
tokio = { workspace = true, features = ["rt-multi-thread", "signal"] }
tokio = { workspace = true, features = ["rt-multi-thread", "signal", "net", "io-util"] }
# CLI
clap = { workspace = true, features = ["derive"] }
# HTTP server
actix-web = { workspace = true }
actix-rt = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
[lints]
workspace = true

View File

@ -3,7 +3,11 @@ use clap::Parser;
#[derive(Parser, Debug)]
#[command(name = "adminrpc")]
pub struct Args {
/// Override the bind address (default: 0.0.0.0:9090)
/// Override the gRPC bind address (default: 0.0.0.0:9090)
#[arg(short, long)]
pub bind: Option<String>,
/// HTTP REST server port (default: gRPC port + 1)
#[arg(long)]
pub http_port: Option<u16>,
}

View File

@ -1,10 +1,12 @@
use std::net::SocketAddr;
use actix_web::{web, App as ActixApp, HttpResponse, HttpServer};
use anyhow::Context as _;
use clap::Parser;
use config::AppConfig;
use deadpool_redis::{cluster, Runtime};
use session_manager::{SessionManager, SessionStorage};
use rpc::admin::server::{serve, DEFAULT_GRPC_PORT};
use uuid::Uuid;
mod args;
use args::Args;
@ -16,19 +18,24 @@ async fn main() -> anyhow::Result<()> {
observability::init_tracing_subscriber(&log_level);
let args = Args::parse();
let bind_addr: SocketAddr = args
let grpc_addr: SocketAddr = args
.bind
.map(|s| s.parse())
.unwrap_or_else(|| format!("0.0.0.0:{}", DEFAULT_GRPC_PORT).parse())
.context("invalid bind address")?;
.context("invalid grpc bind address")?;
// Admin HTTP port defaults to gRPC port + 1 (e.g., 9091). `checked_add` turns the
// port-65535 edge case into a startup error instead of an integer overflow
// (panic in debug builds, silent wrap to port 0 in release builds).
let admin_port: u16 = match args.http_port {
    Some(port) => port,
    None => grpc_addr
        .port()
        .checked_add(1)
        .context("gRPC port 65535 leaves no room for the default HTTP port; pass --http-port")?,
};
// Built directly from its parts, so there is no string parsing that could fail.
let admin_addr: SocketAddr = SocketAddr::from(([0, 0, 0, 0], admin_port));
tracing::info!(
app_name = %cfg.app_name().unwrap_or_default(),
bind_addr = %bind_addr,
grpc_addr = %grpc_addr,
admin_addr = %admin_addr,
"Starting admin RPC server"
);
// ── Phase 6: OTLP tracing ──────────────────────────────────────────────
// ── OTLP tracing ─────────────────────────────────────────────────────────
let _otel_guard = if cfg.otel_enabled().unwrap_or(false) {
let endpoint = cfg.otel_endpoint().unwrap_or_else(|_| "http://localhost:4317".to_string());
let service_name = cfg.otel_service_name().unwrap_or_else(|_| "adminrpc".to_string());
@ -52,17 +59,216 @@ async fn main() -> anyhow::Result<()> {
.build()
.map_err(|e| anyhow::anyhow!("failed to build redis pool: {}", e))?;
// Test connection
let _conn = pool.get().await
.context("redis pool connection failed")?;
let _conn = pool.get().await.context("redis pool connection failed")?;
tracing::info!("Redis connected");
let storage = SessionStorage::new(pool);
let storage = SessionStorage::new(pool.clone());
let session_manager = SessionManager::new(storage);
tracing::info!(addr = %bind_addr, "Admin gRPC server listening");
serve(bind_addr, session_manager).await?;
// Spawn gRPC server in background
let sm_for_grpc = session_manager.clone();
let grpc_handle = tokio::spawn(async move {
if let Err(e) = serve(grpc_addr, sm_for_grpc).await {
tracing::error!(error = %e, "Admin gRPC server error");
}
});
// Start HTTP REST server
let http_handle = tokio::spawn(async move {
let pool_for_http = pool.clone();
let sm_for_http = session_manager.clone();
let result = HttpServer::new(move || {
ActixApp::new()
.app_data(web::Data::new(pool_for_http.clone()))
.app_data(web::Data::new(sm_for_http.clone()))
.route("/health", web::get().to(health))
.route("/admin/metrics/export", web::get().to(metrics_export))
.service(
web::scope("/api/admin")
// Sessions
.route("/sessions/workspace/{workspace_id}", web::get().to(list_workspace_sessions))
.route("/sessions/user/{user_id}", web::get().to(list_user_sessions))
.route("/sessions/user/{user_id}/status", web::get().to(get_user_status))
.route("/sessions/user/{user_id}/info", web::get().to(get_user_info))
.route("/sessions/workspace/{workspace_id}/online-users", web::get().to(get_workspace_online_users))
.route("/sessions/user/{user_id}/online", web::get().to(is_user_online))
.route("/sessions/kick", web::post().to(kick_user))
.route("/sessions/kick-workspace", web::post().to(kick_user_from_workspace))
// Metrics
.route("/metrics", web::get().to(get_metrics))
.route("/metrics/export", web::get().to(metrics_export)),
)
})
.bind(admin_addr)
.expect("failed to bind HTTP server")
.workers(2)
.run()
.await;
if let Err(e) = result {
tracing::error!(error = %e, "Admin HTTP server error");
}
});
tracing::info!(addr = %grpc_addr, "Admin gRPC server listening");
tracing::info!(addr = %admin_addr, "Admin HTTP server listening");
grpc_handle.await?;
http_handle.abort();
tracing::info!("Admin RPC server stopped");
Ok(())
}
// ─── HTTP Handlers ────────────────────────────────────────────────────────────
/// Liveness probe: always answers `200 OK` with the JSON body `{"ok": true}`.
async fn health() -> HttpResponse {
    let body = serde_json::json!({ "ok": true });
    HttpResponse::Ok().json(body)
}
async fn metrics_export(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::export_all_metrics_csv(pool.get_ref(), "").await {
Ok(csv) => HttpResponse::Ok()
.content_type("text/csv; charset=utf-8")
.body(csv),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
async fn get_metrics(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::query_all_instance_metrics(pool.get_ref(), "", 100).await {
Ok(instances) => HttpResponse::Ok().json(instances),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
/// Parses `s` as a UUID; any parse failure is mapped to `None`.
fn parse_uuid(s: &str) -> Option<Uuid> {
    match Uuid::parse_str(s) {
        Ok(id) => Some(id),
        Err(_) => None,
    }
}
async fn list_workspace_sessions(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let workspace_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })),
};
match sm.get_workspace_sessions(&workspace_id).await {
Ok(sessions) => HttpResponse::Ok().json(sessions),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
async fn list_user_sessions(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
};
match sm.get_user_sessions(&user_id).await {
Ok(sessions) => HttpResponse::Ok().json(sessions),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
async fn get_user_status(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
};
match sm.get_user_status(&user_id).await {
Ok(status) => HttpResponse::Ok().json(serde_json::json!({ "status": format!("{:?}", status) })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
async fn get_user_info(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
};
match sm.get_user_info(&user_id).await {
Ok(info) => HttpResponse::Ok().json(info),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
async fn get_workspace_online_users(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let workspace_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })),
};
match sm.get_workspace_online_users(&workspace_id).await {
Ok(user_ids) => HttpResponse::Ok().json(serde_json::json!({ "user_ids": user_ids })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
async fn is_user_online(
sm: web::Data<SessionManager>,
path: web::Path<String>,
) -> HttpResponse {
let user_id = match parse_uuid(&path) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
};
match sm.is_user_online(&user_id).await {
Ok(online) => HttpResponse::Ok().json(serde_json::json!({ "online": online })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
#[derive(serde::Deserialize)]
struct KickUserPayload {
user_id: String,
}
async fn kick_user(
sm: web::Data<SessionManager>,
body: web::Json<KickUserPayload>,
) -> HttpResponse {
let user_id = match parse_uuid(&body.user_id) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
};
match sm.kick_user(&user_id).await {
Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}
#[derive(serde::Deserialize)]
struct KickWorkspacePayload {
user_id: String,
workspace_id: String,
}
async fn kick_user_from_workspace(
sm: web::Data<SessionManager>,
body: web::Json<KickWorkspacePayload>,
) -> HttpResponse {
let user_id = match parse_uuid(&body.user_id) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid user_id" })),
};
let workspace_id = match parse_uuid(&body.workspace_id) {
Some(id) => id,
None => return HttpResponse::BadRequest().json(serde_json::json!({ "error": "invalid workspace_id" })),
};
match sm.kick_user_from_workspace(&user_id, &workspace_id).await {
Ok(count) => HttpResponse::Ok().json(serde_json::json!({ "kicked_count": count })),
Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })),
}
}

View File

@ -17,6 +17,7 @@ tokio = { workspace = true, features = ["full"] }
uuid = { workspace = true }
service = { workspace = true }
observability = { workspace = true }
room = { workspace = true }
api = { workspace = true }
session = { workspace = true }
config = { workspace = true }

View File

@ -7,7 +7,8 @@ use db::cache::AppCache;
use db::database::AppDatabase;
use observability::{
init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller,
HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware,
spawn_redis_metrics_flusher,
HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware, instance_id,
};
use sea_orm::ConnectionTrait;
use service::AppService;
@ -85,7 +86,8 @@ async fn main() -> anyhow::Result<()> {
};
let prometheus_handle = install_recorder();
let prometheus_handle_data = web::Data::new(prometheus_handle);
let prometheus_handle_arc = std::sync::Arc::new(prometheus_handle);
let prometheus_handle_data = web::Data::new(prometheus_handle_arc.clone());
let http_metrics = std::sync::Arc::new(HttpMetrics::new());
let http_snapshot: HttpSnapshotGuard = std::sync::Arc::new(std::sync::RwLock::new(
@ -99,6 +101,18 @@ async fn main() -> anyhow::Result<()> {
);
let http_snapshot_data = web::Data::new(http_snapshot);
// ── Redis metrics flusher (every 5s → key: metrics:{instance}:{ts}) ──────
let http_for_flusher = http_metrics.clone();
let prometheus_for_flusher = prometheus_handle_arc.clone();
spawn_redis_metrics_flusher(
cache.redis_pool().clone(),
instance_id(),
prometheus_for_flusher,
http_for_flusher,
std::time::Duration::from_secs(5),
);
tracing::info!("Redis metrics flusher started (5s interval)");
let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string());
tracing::info!(bind_addr = %bind_addr, "Listening");
let http_metrics_server = http_metrics.clone();

View File

@ -34,5 +34,6 @@ agent-tool-derive = { path = "../agent-tool-derive" }
once_cell = { workspace = true }
regex = { workspace = true }
tracing = { workspace = true }
metrics = { workspace = true }
[lints]
workspace = true

View File

@ -1,5 +1,6 @@
use async_openai::Client;
use std::pin::Pin;
use async_openai::config::OpenAIConfig;
use async_openai::Client;
use async_openai::types::chat::{
ChatCompletionMessageToolCalls, ChatCompletionRequestAssistantMessage,
ChatCompletionRequestAssistantMessageContent, ChatCompletionRequestMessage,
@ -15,15 +16,18 @@ use uuid::Uuid;
use super::context::RoomMessageContext;
use super::{AiRequest, AiStreamChunk, Mention, StreamCallback};
use crate::client::AiClientConfig;
use crate::compact::{CompactConfig, CompactService};
use crate::embed::EmbedService;
use crate::error::{AgentError, Result};
use crate::perception::{PerceptionService, SkillEntry, ToolCallEvent};
use crate::tool::{ToolCall, ToolContext, ToolExecutor, registry::ToolRegistry};
use crate::react::{ReactAgent, ReactConfig, DEFAULT_SYSTEM_PROMPT};
use crate::tool::{ToolCall, ToolContext, ToolExecutor, ToolResult, registry::ToolRegistry};
/// Service for handling AI chat requests in rooms.
pub struct ChatService {
openai_client: Client<OpenAIConfig>,
ai_client_config: Option<AiClientConfig>,
compact_service: Option<CompactService>,
embed_service: Option<EmbedService>,
perception_service: PerceptionService,
@ -34,6 +38,7 @@ impl ChatService {
pub fn new(openai_client: Client<OpenAIConfig>) -> Self {
Self {
openai_client,
ai_client_config: None,
compact_service: None,
embed_service: None,
perception_service: PerceptionService::default(),
@ -41,6 +46,12 @@ impl ChatService {
}
}
/// Builder step that stores the AI client config on the service.
///
/// `process_react` returns an error when this has not been called.
pub fn with_ai_client_config(mut self, config: AiClientConfig) -> Self {
    // Any previously stored config is simply discarded.
    let _previous = self.ai_client_config.replace(config);
    self
}
pub fn with_compact_service(mut self, compact_service: CompactService) -> Self {
self.compact_service = Some(compact_service);
self
@ -695,6 +706,95 @@ impl ChatService {
.detect(embed_service, &request.input, &request.room.id.to_string())
.await
}
/// Runs `request` through a [`ReactAgent`] instead of the plain chat loop.
///
/// A tool-executor closure is built from the request's database, cache, config,
/// room, project and sender. Each invocation creates a fresh `ToolContext`,
/// merges the service's tool registry into it and executes exactly one tool
/// call through `ToolExecutor::execute_batch`. The agent is created with
/// `DEFAULT_SYSTEM_PROMPT`, the tools from `self.tools()` and a limit of 20
/// steps, receives `request.input` as the user message and is then run against
/// `request.model.name`.
///
/// `on_chunk` is invoked with every `ReactStep` the agent reports.
///
/// # Errors
/// Returns `AgentError::Internal` when no `AiClientConfig` was set (see
/// `with_ai_client_config`) or when no tool registry is registered; otherwise
/// returns whatever `ReactAgent::run` returns.
pub async fn process_react<C>(
    &self,
    request: &AiRequest,
    mut on_chunk: C,
) -> Result<String>
where
    C: FnMut(crate::react::ReactStep) + Send,
{
    /// Boxed future produced by one tool invocation.
    type ToolFuture = Pin<
        Box<dyn std::future::Future<Output = std::result::Result<serde_json::Value, String>> + Send>,
    >;

    let Some(client_config) = self.ai_client_config.as_ref() else {
        return Err(AgentError::Internal(
            "AiClientConfig not set — call with_ai_client_config first".into(),
        ));
    };
    let registry = match &self.tool_registry {
        Some(registry) => registry.clone(),
        None => return Err(AgentError::Internal("no tool registry registered".into())),
    };

    // Owned copies of everything the executor needs, so the closure does not
    // borrow from `request` or `self`.
    let db = request.db.clone();
    let cache = request.cache.clone();
    let config = request.config.clone();
    let room_id = request.room.id;
    let project_id = Some(request.project.id);
    let sender_uid = Some(request.sender.uid);

    let executor: std::sync::Arc<dyn Fn(String, serde_json::Value) -> ToolFuture + Send + Sync> =
        std::sync::Arc::new(move |name: String, args: serde_json::Value| -> ToolFuture {
            // Clone per call: the closure may be invoked many times.
            let db = db.clone();
            let cache = cache.clone();
            let config = config.clone();
            let registry = registry.clone();
            Box::pin(async move {
                let mut ctx = ToolContext::new(db, cache, config, room_id, sender_uid);
                if let Some(pid) = project_id {
                    ctx = ctx.with_project(pid);
                }
                ctx.registry_mut().merge(registry.clone());

                let call = ToolCall {
                    id: uuid::Uuid::new_v4().to_string(),
                    name,
                    // Arguments that cannot be serialized degrade to an empty object.
                    arguments: serde_json::to_string(&args).unwrap_or_else(|_| "{}".into()),
                };
                let results: Vec<_> = ToolExecutor::new()
                    .execute_batch(vec![call], &mut ctx)
                    .await
                    .map_err(|e| e.to_string())?;
                let first = results
                    .into_iter()
                    .next()
                    .ok_or_else(|| "no result".to_string())?;
                match first.result {
                    ToolResult::Ok(v) => Ok(v),
                    ToolResult::Error(msg) => Err(msg),
                }
            })
        });

    let tools = self.tools();
    let react_config = ReactConfig {
        max_steps: 20,
        stop_sequences: Vec::new(),
        tool_executor: Some(executor),
    };
    let mut agent = ReactAgent::new(DEFAULT_SYSTEM_PROMPT, tools, react_config);
    agent.add_user_message(&request.input);
    agent
        .run(&request.model.name, client_config, |step| on_chunk(step))
        .await
}
}
#[derive(Clone, Debug, Default)]
struct ToolCallChunkAccum {

View File

@ -15,6 +15,39 @@ use std::time::Instant;
use crate::error::{AgentError, Result};
/// Stateless recorder that bumps `metrics`-crate counters for AI calls.
///
/// Counters touched: `ai_calls_total`, `ai_calls_success`, `ai_calls_failure`,
/// `ai_input_tokens_total`, `ai_output_tokens_total`, `ai_function_calls_total`.
/// NOTE(review): the original comment states these are registered in
/// `observability::install_recorder()` and exported through the Prometheus
/// `/metrics` endpoint and the Redis metrics flusher — not verifiable from this
/// file, confirm against that crate.
#[derive(Debug, Clone, Default)]
pub struct AiMetrics;

impl AiMetrics {
    /// Creates the (zero-sized) recorder.
    pub fn new() -> Self {
        Self
    }

    /// Records one successful AI call.
    ///
    /// Token counts are added only when strictly positive; zero or negative
    /// values are ignored. `has_function_call` additionally bumps
    /// `ai_function_calls_total`.
    pub fn record_success(&self, input_tokens: i64, output_tokens: i64, has_function_call: bool) {
        metrics::counter!("ai_calls_total").increment(1);
        metrics::counter!("ai_calls_success").increment(1);
        // `try_from` rejects negative counts; the guard also skips zero.
        match u64::try_from(input_tokens) {
            Ok(tokens) if tokens > 0 => metrics::counter!("ai_input_tokens_total").increment(tokens),
            _ => {}
        }
        match u64::try_from(output_tokens) {
            Ok(tokens) if tokens > 0 => metrics::counter!("ai_output_tokens_total").increment(tokens),
            _ => {}
        }
        if has_function_call {
            metrics::counter!("ai_function_calls_total").increment(1);
        }
    }

    /// Records one failed AI call.
    pub fn record_failure(&self) {
        metrics::counter!("ai_calls_total").increment(1);
        metrics::counter!("ai_calls_failure").increment(1);
    }
}
/// Configuration for the AI client.
#[derive(Clone)]
pub struct AiClientConfig {
@ -136,6 +169,13 @@ fn is_retryable_error(err: &async_openai::error::OpenAIError) -> bool {
}
}
/// Process-wide `AiMetrics` instance, created lazily on first access.
static AI_METRICS: std::sync::OnceLock<AiMetrics> = std::sync::OnceLock::new();

/// Returns the shared `AiMetrics`, initializing it on the first call.
fn ai_metrics() -> &'static AiMetrics {
    AI_METRICS.get_or_init(AiMetrics::default)
}
/// Call the AI model with automatic retry.
pub async fn call_with_retry(
messages: &[ChatCompletionRequestMessage],
@ -162,6 +202,14 @@ pub async fn call_with_retry(
let latency_ms = start.elapsed().as_millis() as i64;
let (input_tokens, output_tokens) = extract_usage(&response);
// Check if response contains a tool call
let has_function_call = response
.choices
.first()
.and_then(|c| c.finish_reason.as_ref())
.map_or(false, |r| *r == async_openai::types::chat::FinishReason::ToolCalls);
ai_metrics().record_success(input_tokens, output_tokens, has_function_call);
return Ok(AiCallResponse {
content: extract_content(&response),
input_tokens,
@ -184,6 +232,7 @@ pub async fn call_with_retry(
state.next();
continue;
}
ai_metrics().record_failure();
return Err(AgentError::OpenAi(err.to_string()));
}
}
@ -231,6 +280,14 @@ pub async fn call_with_params(
let latency_ms = start.elapsed().as_millis() as i64;
let (input_tokens, output_tokens) = extract_usage(&response);
// Check if response contains a tool call
let has_function_call = response
.choices
.first()
.and_then(|c| c.finish_reason.as_ref())
.map_or(false, |r| *r == async_openai::types::chat::FinishReason::ToolCalls);
ai_metrics().record_success(input_tokens, output_tokens, has_function_call);
return Ok(AiCallResponse {
content: extract_content(&response),
input_tokens,
@ -253,6 +310,7 @@ pub async fn call_with_params(
state.next();
continue;
}
ai_metrics().record_failure();
return Err(AgentError::OpenAi(err.to_string()));
}
}

View File

@ -24,6 +24,7 @@ pub use embed::{EmbedClient, EmbedService, QdrantClient, SearchResult};
pub use error::{AgentError, Result};
pub use react::{
Hook, HookAction, NoopHook, ReactAgent, ReactConfig, ReactStep, ToolCallAction, TracingHook,
DEFAULT_SYSTEM_PROMPT,
};
pub use tool::{
ToolCall, ToolCallResult, ToolContext, ToolDefinition, ToolError, ToolExecutor, ToolHandler, ToolParam,

View File

@ -11,3 +11,56 @@ pub mod types;
pub use hooks::{Hook, HookAction, NoopHook, ToolCallAction, TracingHook};
pub use loop_core::ReactAgent;
pub use types::{ReactConfig, ReactStep};
/// Default system prompt for the ReAct agent.
///
/// The prompt instructs the model to consult local platform data (issues,
/// pull requests, repositories, documentation, etc.) before considering
/// external sources, and to reply in JSON with either a `thought` + `action`
/// (tool call) object or a `thought` + `answer` object.
///
/// The whole value is a raw string sent to the model verbatim, so any edit to
/// its wording changes runtime behavior.
///
/// NOTE(review): the tool names listed under "Tool Use Guidelines" are
/// hard-coded in this text — confirm they match the tools actually registered.
pub const DEFAULT_SYSTEM_PROMPT: &str = r#"You are a helpful AI assistant embedded in a self-hosted development platform that combines GitHub and Slack features.
## Priority Rule: Search Local Repository Data First
BEFORE answering any user question, you MUST attempt to answer it using local repository data.
Local data includes: issues, pull requests, repositories, code reviews, chat messages, documentation, workspace members, sessions, and any other data stored in the platform's database.
**Never** make up an answer or immediately defer to external sources (e.g., general web search, external documentation). If local data does not contain the answer, clearly state that before considering external information.
## Response Format
You must respond in JSON format:
1. **If the question can be answered from local data:**
```json
{
"thought": "Explain your reasoning about what local data to look up.",
"action": { "name": "tool_name", "arguments": { ... } }
}
```
2. **If you have enough information to answer:**
```json
{
"thought": "Explain how you arrived at the answer.",
"answer": "Your final answer text."
}
```
## Tool Use Guidelines
- Use `search_issues` or `search_prs` to find relevant issues/PRs in the repository.
- Use `search_repositories` to find relevant repositories.
- Use `get_workspace_info` to retrieve workspace configuration and member data.
- Use `get_user_info` to look up user profiles and activity.
- Use `get_code_review` to retrieve code review details.
- Use `get_chat_history` to find relevant conversations.
- Chain multiple tool calls if a single call is insufficient.
- After each tool result, re-evaluate whether more data is needed before providing a final answer.
## Principles
- Be precise and cite specific issue/PR numbers, commit hashes, or message IDs when available.
- If local data is ambiguous or incomplete, say so explicitly.
- Prefer facts over speculation. If you are uncertain, say so.
- If a tool returns no results, try a different approach or search term rather than assuming the information does not exist.
"#;

View File

@ -637,6 +637,7 @@ impl WsRequestHandler {
think: params.think,
stream: params.stream,
min_score: params.min_score,
agent_type: None,
},
&ctx,
)

View File

@ -1,6 +1,7 @@
pub use sea_orm_migration::prelude::*;
mod m20260420_000003_add_model_id_to_room_message;
pub mod m20260421_000001_add_agent_type_to_room_ai;
pub async fn execute_sql(manager: &SchemaManager<'_>, sql: &str) -> Result<(), DbErr> {
for stmt in split_sql_statements(sql) {
@ -87,6 +88,8 @@ impl MigratorTrait for Migrator {
Box::new(m20260420_000001_create_room_attachment::Migration),
Box::new(m20260420_000002_add_push_subscription::Migration),
Box::new(m20260420_000003_add_model_id_to_room_message::Migration),
// Every migration is listed exactly once: a second entry for the same
// version would be run and recorded twice by the migrator.
Box::new(m20260421_000001_add_agent_type_to_room_ai::Migration),
// Repo tables
Box::new(m20250628_000028_create_repo::Migration),
Box::new(m20250628_000029_create_repo_branch::Migration),

View File

@ -0,0 +1,23 @@
//! SeaORM migration: add agent_type column to room_ai
use sea_orm_migration::prelude::*;
/// Migration unit that adds the `agent_type` column to the `room_ai` table.
pub struct Migration;
impl MigrationName for Migration {
/// Returns the identifier string of this migration.
fn name(&self) -> &str {
"m20260421_000001_add_agent_type_to_room_ai"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Applies the bundled SQL script that adds `room_ai.agent_type`.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        super::execute_sql(
            manager,
            include_str!("sql/m20260421_000001_add_agent_type_to_room_ai.sql"),
        )
        .await
    }

    /// Reverts the migration by dropping the `agent_type` column if it exists.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let rollback_sql = "ALTER TABLE room_ai DROP COLUMN IF EXISTS agent_type;";
        super::execute_sql(manager, rollback_sql).await
    }
}

View File

@ -0,0 +1,2 @@
ALTER TABLE room_ai ADD COLUMN IF NOT EXISTS agent_type VARCHAR(50);
CREATE INDEX IF NOT EXISTS idx_room_ai_agent_type ON room_ai (agent_type) WHERE agent_type IS NOT NULL;

View File

@ -20,6 +20,8 @@ pub struct Model {
pub think: bool,
pub stream: bool,
pub min_score: Option<f32>,
/// Agent type: "chat" (default) or "react" for ReAct reasoning agent.
pub agent_type: Option<String>,
pub created_at: DateTimeUtc,
pub updated_at: DateTimeUtc,
}

View File

@ -21,7 +21,13 @@ chrono = { workspace = true }
once_cell = { workspace = true }
hostname = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["rt"] }
anyhow = { workspace = true }
# Redis (for metrics exporter)
redis = { workspace = true }
deadpool-redis = { workspace = true, features = ["cluster"] }
# Prometheus metrics export
metrics = "0.22"

View File

@ -9,12 +9,19 @@ pub mod metrics_middleware;
pub mod prometheus_exporter;
pub mod otlp;
pub mod tracing_middleware;
pub mod redis_metrics;
pub use tracing_fmt::{init_tracing_subscriber, instance_id};
pub use metrics_middleware::{MetricsMiddleware, HttpMetrics};
pub use prometheus_exporter::{
install_recorder, prometheus_handler, spawn_http_metrics_poller,
HttpMetricsSnapshot, HttpSnapshotGuard,
HttpMetricsSnapshot, HttpSnapshotGuard, render_to_hashmap,
};
pub use otlp::{init_otlp, OtelGuard};
pub use tracing_middleware::TracingSpanMiddleware;
pub use redis_metrics::{
MetricsSnapshot,
spawn_redis_metrics_flusher,
query_instance_metrics, query_all_instance_metrics, export_all_metrics_csv,
FlatInstanceMetrics,
};

View File

@ -1,12 +1,13 @@
//! Actix-web metrics middleware: counts requests and measures latency.
//
//!
//! Registers metrics into shared atomic counters exposed as structured fields
//! on every request. The counters can also be read programmatically through
//! `HttpMetrics::snapshot` (for example by a periodic exporter).
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use futures::future::{LocalBoxFuture, Ready, ok};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Instant;
@ -23,6 +24,8 @@ pub struct HttpMetrics {
pub status_4xx: AtomicU64,
/// Number of 5xx responses.
pub status_5xx: AtomicU64,
/// Per-endpoint request counters. Key format: "GET /api/room/{id}" or "POST /api/git/commit"
pub endpoint_counts: RwLock<HashMap<String, AtomicU64>>,
}
impl HttpMetrics {
@ -30,6 +33,35 @@ impl HttpMetrics {
pub fn new() -> Self {
Self::default()
}
/// Increment the counter for a specific HTTP endpoint (method + path).
///
/// The key is `"{method} {path}"`. Endpoints that were already seen are
/// incremented under the shared read lock, so concurrent requests do not
/// serialize on the map; the exclusive write lock is only taken the first
/// time a key appears.
///
/// A poisoned lock is recovered instead of panicking: the map only holds
/// atomics, so its contents stay valid even if another thread panicked
/// while holding the lock.
pub fn incr_endpoint(&self, method: &str, path: &str) {
    let key = format!("{} {}", method, path);

    // Fast path: the endpoint is already registered.
    {
        let known = self
            .endpoint_counts
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let Some(counter) = known.get(&key) {
            counter.fetch_add(1, Ordering::Relaxed);
            return;
        }
    }

    // Slow path: first hit for this endpoint. `entry` re-checks under the
    // write lock, so a racing insert by another thread is not lost.
    let mut map = self
        .endpoint_counts
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    map.entry(key)
        .or_insert_with(|| AtomicU64::new(0))
        .fetch_add(1, Ordering::Relaxed);
}
/// Builds a flat map of every counter's current value.
///
/// The five aggregate counters are reported under fixed names; each
/// per-endpoint counter is reported as `http_endpoint_<key>` where the key
/// has spaces and slashes replaced by underscores and is lower-cased.
pub fn snapshot(&self) -> HashMap<String, serde_json::Value> {
    let aggregates = [
        ("http_requests_total", &self.request_count),
        ("http_request_duration_ms_total", &self.total_duration_ms),
        ("http_requests_2xx", &self.status_2xx),
        ("http_requests_4xx", &self.status_4xx),
        ("http_requests_5xx", &self.status_5xx),
    ];

    let mut out: HashMap<String, serde_json::Value> = HashMap::new();
    for &(name, counter) in aggregates.iter() {
        out.insert(
            name.to_string(),
            serde_json::json!(counter.load(Ordering::Relaxed)),
        );
    }

    // Per-endpoint counters, with keys made metric-name friendly.
    let endpoints = self.endpoint_counts.read().unwrap();
    out.extend(endpoints.iter().map(|(endpoint, hits)| {
        let suffix = endpoint.replace([' ', '/'], "_").to_lowercase();
        (
            format!("http_endpoint_{}", suffix),
            serde_json::json!(hits.load(Ordering::Relaxed)),
        )
    }));
    out
}
}
/// Actix-web middleware that collects per-request metrics and exposes them
@ -97,6 +129,8 @@ where
let started = Instant::now();
let service = self.service.clone();
let metrics = self.metrics.clone();
let method = req.method().as_str().to_string();
let path = req.path().to_string();
Box::pin(async move {
let res = service.call(req).await?;
@ -106,6 +140,7 @@ where
// Update counters atomically.
metrics.request_count.fetch_add(1, Ordering::Relaxed);
metrics.total_duration_ms.fetch_add(elapsed_ms, Ordering::Relaxed);
metrics.incr_endpoint(&method, &path);
match status_code {
200..=299 => {

View File

@ -13,7 +13,9 @@
//! because its `register_*` macro calls require a global recorder to be set.
use actix_web::{web, HttpRequest, HttpResponse};
use metrics::{describe_counter, Unit};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
@ -22,6 +24,15 @@ use std::sync::{Arc, RwLock};
/// Returns a `PrometheusHandle` for rendering the `/metrics` endpoint.
/// **Must be called before any `metrics::register_*` macro is invoked.**
pub fn install_recorder() -> metrics_exporter_prometheus::PrometheusHandle {
// Register AI metrics descriptions so they appear in the /metrics output
// even before any calls have been made.
describe_counter!("ai_calls_total", Unit::Count, "Total AI chat completion calls");
describe_counter!("ai_calls_success", Unit::Count, "Successful AI calls");
describe_counter!("ai_calls_failure", Unit::Count, "Failed AI calls");
describe_counter!("ai_input_tokens_total", Unit::Count, "Total input tokens consumed");
describe_counter!("ai_output_tokens_total", Unit::Count, "Total output tokens generated");
describe_counter!("ai_function_calls_total", Unit::Count, "Total AI function/tool calls");
let recorder = PrometheusBuilder::new()
.build_recorder();
@ -33,6 +44,36 @@ pub fn install_recorder() -> metrics_exporter_prometheus::PrometheusHandle {
handle
}
/// Parses Prometheus text exposition format into a flat map of metric name → value.
///
/// Labels are discarded (only the last value for each name is kept).
/// Blank lines, `#` comment lines, and samples whose value does not parse
/// as `f64` are skipped.
///
/// Each sample line has the form `NAME[{labels}] VALUE [TIMESTAMP]`:
/// - the metric name ends at the first `{` or whitespace;
/// - the label set ends at the *last* `}` on the line, so label values that
///   contain spaces or braces do not confuse value extraction;
/// - the value is the first whitespace-separated token after the name/labels,
///   so an optional trailing timestamp is ignored.
pub fn render_to_hashmap(body: &str) -> HashMap<String, serde_json::Value> {
    let mut out = HashMap::new();
    for raw in body.lines() {
        let line = raw.trim();
        if line.is_empty() || line.starts_with('#') {
            continue;
        }

        let name_end = line
            .find(|c: char| c == '{' || c.is_whitespace())
            .unwrap_or(line.len());
        let name = &line[..name_end];
        if name.is_empty() {
            continue;
        }

        let rest = &line[name_end..];
        let after_labels = if rest.starts_with('{') {
            match rest.rfind('}') {
                Some(close) => &rest[close + 1..],
                // Unterminated label set: malformed line, skip it.
                None => continue,
            }
        } else {
            rest
        };

        if let Some(value_str) = after_labels.split_whitespace().next() {
            if let Ok(v) = value_str.parse::<f64>() {
                out.insert(name.to_string(), serde_json::json!(v));
            }
        }
    }
    out
}
/// Re-export `HttpMetrics` so callers don't need to import from `metrics_middleware`.
pub use crate::metrics_middleware::HttpMetrics;

View File

@ -0,0 +1,342 @@
//! Redis-backed metrics exporter.
//!
//! Every `interval` seconds this module pushes metric fields to a Redis hash
//! bucketed by day:
//! Key: `metrics:{instance_id}:{YYYY-MM-DD}` (hash)
//! TTL: 30 days per day bucket
//!
//! Also maintains a sorted set `metrics:index:{instance_id}:{YYYY-MM-DD}` per
//! day for querying within a day window.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use deadpool_redis::cluster::Pool;
use serde::{Deserialize, Serialize};
/// Expiry, in seconds, applied to both the daily metrics hash and its index key on every flush.
const METRICS_TTL_SECS: i64 = 30 * 24 * 60 * 60; // 30 days
/// A snapshot of metric values at a point in time.
///
/// NOTE(review): both maps are `#[serde(flatten)]`, so their entries are
/// serialized side by side at the top level of the object; confirm that
/// deserializing back into two separate maps behaves as intended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
/// Instance the snapshot belongs to.
pub instance_id: String,
/// Unix timestamp (seconds) associated with the snapshot.
pub timestamp_secs: i64,
/// Metrics whose stored field name carried the `http_` prefix (prefix removed).
#[serde(flatten)]
pub http: HashMap<String, serde_json::Value>,
/// Metrics whose stored field name carried the `room_` prefix (prefix removed).
#[serde(flatten)]
pub room: HashMap<String, serde_json::Value>,
}
/// Spawns a detached Tokio task that flushes all metrics to Redis on a fixed
/// period of `interval`.
///
/// - `redis_pool`: pool used to obtain a connection for each flush
/// - `instance_id`: used as the Redis key prefix
/// - `prometheus_handle`: from `install_recorder()`, rendered on each flush
/// - `http_metrics`: shared `HttpMetrics` from `MetricsMiddleware`
///
/// A failed flush is logged at `warn` level and the loop keeps running.
pub fn spawn_redis_metrics_flusher(
    redis_pool: Pool,
    instance_id: String,
    prometheus_handle: Arc<metrics_exporter_prometheus::PrometheusHandle>,
    http_metrics: Arc<crate::metrics_middleware::HttpMetrics>,
    interval: Duration,
) {
    let flush_loop = async move {
        let mut timer = tokio::time::interval(interval);
        loop {
            timer.tick().await;
            let outcome = flush_metrics(
                &redis_pool,
                &instance_id,
                &prometheus_handle,
                http_metrics.as_ref(),
            )
            .await;
            if let Err(e) = outcome {
                tracing::warn!(error = %e, "failed to flush metrics to Redis");
            }
        }
    };
    tokio::spawn(flush_loop);
}
/// Writes the current metric values into today's Redis hash and records the
/// flush time in the matching per-day sorted-set index.
///
/// Steps, in order: HSET all fields, EXPIRE the hash, ZADD the timestamp,
/// trim the index to its newest 1000 members, EXPIRE the index.
/// Any Redis or pool error aborts the flush and is returned to the caller.
async fn flush_metrics(
    pool: &Pool,
    instance_id: &str,
    prometheus_handle: &Arc<metrics_exporter_prometheus::PrometheusHandle>,
    http_metrics: &crate::metrics_middleware::HttpMetrics,
) -> anyhow::Result<()> {
    let flushed_at = chrono::Utc::now();
    let epoch_secs = flushed_at.timestamp();
    let day = flushed_at.format("%Y-%m-%d").to_string();

    // Values rendered by the Prometheus recorder, then the HTTP atomics.
    let rendered = prometheus_handle.render();
    let room = crate::prometheus_exporter::render_to_hashmap(&rendered);
    let http = http_metrics.snapshot();

    let hash_key = format!("metrics:{}:{}", instance_id, day);
    let index_key = format!("metrics:index:{}:{}", instance_id, day);
    let mut conn = pool.get().await?;

    // Field names are namespaced by their source map.
    let fields: Vec<(String, String)> = http
        .iter()
        .map(|(name, value)| (format!("http_{}", name), value.to_string()))
        .chain(
            room.iter()
                .map(|(name, value)| (format!("room_{}", name), value.to_string())),
        )
        .collect();

    let mut hset = redis::cmd("HSET");
    hset.arg(&hash_key);
    for (field, value) in &fields {
        hset.arg(field).arg(value);
    }
    let _: () = hset.query_async(&mut conn).await?;

    // Refresh the hash TTL on every write.
    let _: () = redis::cmd("EXPIRE")
        .arg(&hash_key)
        .arg(METRICS_TTL_SECS)
        .query_async(&mut conn)
        .await?;

    // Record this flush in the per-day index (score and member are the timestamp).
    let _: () = redis::cmd("ZADD")
        .arg(&index_key)
        .arg(epoch_secs as f64)
        .arg(epoch_secs.to_string())
        .query_async(&mut conn)
        .await?;

    // Keep only the newest 1000 index entries.
    let _: () = redis::cmd("ZREMRANGEBYRANK")
        .arg(&index_key)
        .arg(0)
        .arg(-1001)
        .query_async(&mut conn)
        .await?;

    let _: () = redis::cmd("EXPIRE")
        .arg(&index_key)
        .arg(METRICS_TTL_SECS)
        .query_async(&mut conn)
        .await?;

    tracing::debug!(key = %hash_key, field_count = fields.len(), "metrics flushed to Redis (daily hash)");
    Ok(())
}
/// Query metrics for an instance from Redis, reading the most recent `limit`
/// index timestamps across all available daily buckets.
///
/// Returns `(timestamp, snapshot)` pairs sorted by timestamp ascending.
///
/// Each snapshot is built from the daily hash `metrics:{instance_id}:{day}`
/// of the day its timestamp falls on. Because every timestamp of one day maps
/// to the same hash, the hash is fetched and parsed once per day and reused,
/// instead of issuing one `HGETALL` per timestamp.
///
/// Errors from the pool or from any Redis command are propagated.
pub async fn query_instance_metrics(
    pool: &Pool,
    instance_id: &str,
    limit: usize,
) -> anyhow::Result<Vec<(i64, MetricsSnapshot)>> {
    type ParsedDay = (
        HashMap<String, serde_json::Value>,
        HashMap<String, serde_json::Value>,
    );

    let mut conn = pool.get().await?;

    // Scan for all daily index keys for this instance
    let index_pattern = format!("metrics:index:{}:*", instance_id);
    let mut cursor = 0u64;
    let mut day_keys: Vec<String> = Vec::new();
    loop {
        let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
            .arg(cursor)
            .arg("MATCH")
            .arg(&index_pattern)
            .arg("COUNT")
            .arg(100)
            .query_async(&mut conn)
            .await?;
        day_keys.extend(keys);
        cursor = new_cursor;
        if cursor == 0 {
            break;
        }
    }

    // Collect the newest `limit` timestamps from every daily index
    let mut all_timestamps: Vec<i64> = Vec::new();
    for day_key in &day_keys {
        let timestamps: Vec<i64> = redis::cmd("ZREVRANGE")
            .arg(day_key)
            .arg(0)
            .arg((limit as isize - 1).max(0))
            .query_async(&mut conn)
            .await?;
        all_timestamps.extend(timestamps);
    }

    // Sort descending and keep the newest `limit` overall
    all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts));
    all_timestamps.truncate(limit);

    // Parsed daily hashes, keyed by day string. `None` means HGETALL returned nil.
    let mut per_day: HashMap<String, Option<ParsedDay>> = HashMap::new();
    let mut results = Vec::with_capacity(all_timestamps.len());
    for ts in &all_timestamps {
        // Derive day bucket from timestamp
        let dt = chrono::DateTime::from_timestamp(*ts, 0).unwrap_or_else(chrono::Utc::now);
        let day_str = dt.format("%Y-%m-%d").to_string();

        if !per_day.contains_key(&day_str) {
            let hash_key = format!("metrics:{}:{}", instance_id, day_str);
            let fields: Option<HashMap<String, String>> = redis::cmd("HGETALL")
                .arg(&hash_key)
                .query_async(&mut conn)
                .await?;
            let parsed = fields.map(|fields| {
                let mut http: HashMap<String, serde_json::Value> = HashMap::new();
                let mut room: HashMap<String, serde_json::Value> = HashMap::new();
                for (k, v) in &fields {
                    // Stored values are JSON text; fall back to a plain string.
                    let json_val: serde_json::Value = serde_json::from_str(v)
                        .unwrap_or_else(|_| serde_json::Value::String(v.clone()));
                    if let Some(stripped) = k.strip_prefix("http_") {
                        http.insert(stripped.to_string(), json_val);
                    } else if let Some(stripped) = k.strip_prefix("room_") {
                        room.insert(stripped.to_string(), json_val);
                    }
                }
                (http, room)
            });
            per_day.insert(day_str.clone(), parsed);
        }

        if let Some(Some((http, room))) = per_day.get(&day_str) {
            results.push((
                *ts,
                MetricsSnapshot {
                    instance_id: instance_id.to_string(),
                    timestamp_secs: *ts,
                    http: http.clone(),
                    room: room.clone(),
                },
            ));
        }
    }

    results.sort_by_key(|(ts, _)| *ts);
    Ok(results)
}
/// Flat instance metrics suitable for JSON serialization.
///
/// Unlike `MetricsSnapshot`, metric values are carried as strings.
#[derive(serde::Serialize)]
pub struct FlatInstanceMetrics {
/// Instance the metrics belong to.
pub instance_id: String,
/// Unix timestamp (seconds) of the snapshot this row was built from.
pub timestamp_secs: i64,
/// HTTP metrics, name → stringified value.
pub http: std::collections::HashMap<String, String>,
/// Room metrics, name → stringified value.
pub room: std::collections::HashMap<String, String>,
}
/// Scans Redis for `metrics:index:{instance}:{YYYY-MM-DD}` keys and returns
/// the distinct instance ids in first-seen order.
///
/// The pooled connection used for the scan is released when this function
/// returns, so callers never hold it while checking out another connection.
async fn scan_instance_ids(pool: &Pool) -> anyhow::Result<Vec<String>> {
    let mut conn = pool.get().await?;
    let mut cursor = 0u64;
    let mut ids: Vec<String> = Vec::new();
    let mut seen = std::collections::HashSet::new();
    loop {
        let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
            .arg(cursor)
            .arg("MATCH")
            .arg("metrics:index:*")
            .arg("COUNT")
            .arg(100)
            .query_async(&mut conn)
            .await?;
        for key in keys {
            // Key format: metrics:index:{instance}:{YYYY-MM-DD}
            let instance = key
                .strip_prefix("metrics:index:")
                .and_then(|rest| rest.rsplit_once(':'))
                .map(|(id, _day)| id);
            if let Some(id) = instance {
                if seen.insert(id.to_string()) {
                    ids.push(id.to_string());
                }
            }
        }
        cursor = new_cursor;
        if cursor == 0 {
            break;
        }
    }
    Ok(ids)
}

/// Quotes a CSV field (RFC 4180 style) when it contains a comma, a double
/// quote, or a line break; other fields are returned unchanged.
fn csv_escape(field: &str) -> std::borrow::Cow<'_, str> {
    if field.contains(&[',', '"', '\n', '\r'][..]) {
        std::borrow::Cow::Owned(format!("\"{}\"", field.replace('"', "\"\"")))
    } else {
        std::borrow::Cow::Borrowed(field)
    }
}

/// Query all instances' metrics, optionally filtered by `instance_filter`.
///
/// An empty `instance_filter` selects every instance; otherwise only
/// instance ids containing the filter as a substring are queried. For each
/// selected instance up to `limit` snapshots are returned, with values
/// converted to strings.
pub async fn query_all_instance_metrics(
    pool: &Pool,
    instance_filter: &str,
    limit: usize,
) -> anyhow::Result<Vec<FlatInstanceMetrics>> {
    let instance_ids = scan_instance_ids(pool).await?;

    let mut results = Vec::new();
    for instance_id in instance_ids {
        if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
            continue;
        }
        let snapshots = query_instance_metrics(pool, &instance_id, limit).await?;
        for (_, payload) in snapshots {
            let http: std::collections::HashMap<String, String> = payload
                .http
                .iter()
                .map(|(k, v)| (k.clone(), v.to_string()))
                .collect();
            let room: std::collections::HashMap<String, String> = payload
                .room
                .iter()
                .map(|(k, v)| (k.clone(), v.to_string()))
                .collect();
            results.push(FlatInstanceMetrics {
                instance_id: payload.instance_id,
                timestamp_secs: payload.timestamp_secs,
                http,
                room,
            });
        }
    }
    Ok(results)
}

/// Export all metrics across all known instances as CSV.
///
/// The output starts with the header `instance_id,timestamp,metric,value`
/// followed by one row per metric per snapshot (up to 1000 snapshots per
/// instance). Rows are joined with `\n` and there is no trailing newline.
/// Fields containing CSV special characters are quoted so that metric names
/// derived from request paths cannot break the column layout.
pub async fn export_all_metrics_csv(
    pool: &Pool,
    instance_filter: &str,
) -> anyhow::Result<String> {
    let instance_ids = scan_instance_ids(pool).await?;

    let mut rows: Vec<String> = vec!["instance_id,timestamp,metric,value".to_string()];
    for instance_id in instance_ids {
        if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
            continue;
        }
        let metrics = query_instance_metrics(pool, &instance_id, 1000).await?;
        for (ts, payload) in metrics {
            for (k, v) in &payload.http {
                rows.push(format!(
                    "{},{},{},{}",
                    csv_escape(&instance_id),
                    ts,
                    csv_escape(&format!("http_{}", k)),
                    csv_escape(&v.to_string())
                ));
            }
            for (k, v) in &payload.room {
                rows.push(format!(
                    "{},{},{},{}",
                    csv_escape(&instance_id),
                    ts,
                    csv_escape(&format!("room_{}", k)),
                    csv_escape(&v.to_string())
                ));
            }
        }
    }
    Ok(rows.join("\n"))
}

View File

@ -80,6 +80,9 @@ impl RoomService {
if request.min_score.is_some() {
active.min_score = Set(request.min_score);
}
if request.agent_type.is_some() {
active.agent_type = Set(request.agent_type);
}
active.updated_at = Set(now);
active.update(&self.db).await?
} else {
@ -97,6 +100,7 @@ impl RoomService {
think: Set(request.think.unwrap_or(false)),
stream: Set(request.stream.unwrap_or(false)),
min_score: Set(request.min_score),
agent_type: Set(request.agent_type),
created_at: Set(now),
updated_at: Set(now),
}

View File

@ -123,6 +123,7 @@ impl From<room_ai::Model> for super::RoomAiResponse {
think: value.think,
stream: value.stream,
min_score: value.min_score,
agent_type: value.agent_type,
created_at: value.created_at,
updated_at: value.updated_at,
}

View File

@ -1,3 +1,5 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram, Unit};
@ -22,6 +24,24 @@ pub struct RoomMetrics {
pub ws_heartbeat_sent_total: Counter,
pub ws_heartbeat_timeout_total: Counter,
pub ws_idle_timeout_total: Counter,
// Atomic backing for snapshot reads (all values stored as f64 for gauges, u64 for counters)
pub _rooms_online_val: AtomicU64,
pub _users_online_val: AtomicU64,
pub _ws_connections_active_val: AtomicU64,
pub _ws_connections_total_val: AtomicU64,
pub _ws_disconnections_total_val: AtomicU64,
pub _messages_sent_val: AtomicU64,
pub _messages_persisted_val: AtomicU64,
pub _messages_persist_failed_val: AtomicU64,
pub _broadcasts_sent_val: AtomicU64,
pub _broadcasts_dropped_val: AtomicU64,
pub _duplicates_skipped_val: AtomicU64,
pub _redis_publish_failed_val: AtomicU64,
pub _ws_rate_limit_hits_val: AtomicU64,
pub _ws_auth_failures_val: AtomicU64,
pub _ws_heartbeat_sent_total_val: AtomicU64,
pub _ws_heartbeat_timeout_total_val: AtomicU64,
pub _ws_idle_timeout_total_val: AtomicU64,
}
impl Default for RoomMetrics {
@ -130,6 +150,23 @@ impl Default for RoomMetrics {
ws_heartbeat_sent_total: metrics::counter!("room_ws_heartbeat_sent_total"),
ws_heartbeat_timeout_total: metrics::counter!("room_ws_heartbeat_timeout_total"),
ws_idle_timeout_total: metrics::counter!("room_ws_idle_timeout_total"),
_rooms_online_val: AtomicU64::new(0),
_users_online_val: AtomicU64::new(0),
_ws_connections_active_val: AtomicU64::new(0),
_ws_connections_total_val: AtomicU64::new(0),
_ws_disconnections_total_val: AtomicU64::new(0),
_messages_sent_val: AtomicU64::new(0),
_messages_persisted_val: AtomicU64::new(0),
_messages_persist_failed_val: AtomicU64::new(0),
_broadcasts_sent_val: AtomicU64::new(0),
_broadcasts_dropped_val: AtomicU64::new(0),
_duplicates_skipped_val: AtomicU64::new(0),
_redis_publish_failed_val: AtomicU64::new(0),
_ws_rate_limit_hits_val: AtomicU64::new(0),
_ws_auth_failures_val: AtomicU64::new(0),
_ws_heartbeat_sent_total_val: AtomicU64::new(0),
_ws_heartbeat_timeout_total_val: AtomicU64::new(0),
_ws_idle_timeout_total_val: AtomicU64::new(0),
}
}
}
@ -170,4 +207,27 @@ impl RoomMetrics {
pub fn into_arc(self) -> Arc<RoomMetrics> {
    // Move the metrics set behind a shared, reference-counted pointer.
    Arc::from(self)
}
/// Collects the atomic backing values of every gauge and counter into a flat
/// name → JSON value map.
///
/// Gauge values are emitted as floating-point numbers (`u64 as f64`),
/// counters as unsigned integers.
// NOTE(review): if gauges are stored as raw `f64` bit patterns, the numeric
// cast below would need to be `f64::from_bits` instead — confirm against writers.
pub fn snapshot(&self) -> HashMap<String, serde_json::Value> {
    let gauges = [
        ("room_online_rooms", &self._rooms_online_val),
        ("room_online_users", &self._users_online_val),
        ("room_ws_connections_active", &self._ws_connections_active_val),
    ];
    let counters = [
        ("room_ws_connections_total", &self._ws_connections_total_val),
        ("room_ws_disconnections_total", &self._ws_disconnections_total_val),
        ("room_messages_sent_total", &self._messages_sent_val),
        ("room_messages_persisted_total", &self._messages_persisted_val),
        ("room_messages_persist_failed_total", &self._messages_persist_failed_val),
        ("room_broadcasts_sent_total", &self._broadcasts_sent_val),
        ("room_broadcasts_dropped_total", &self._broadcasts_dropped_val),
        ("room_duplicates_skipped_total", &self._duplicates_skipped_val),
        ("room_redis_publish_failed_total", &self._redis_publish_failed_val),
        ("room_ws_rate_limit_hits_total", &self._ws_rate_limit_hits_val),
        ("room_ws_auth_failures_total", &self._ws_auth_failures_val),
        ("room_ws_heartbeat_sent_total", &self._ws_heartbeat_sent_total_val),
        ("room_ws_heartbeat_timeout_total", &self._ws_heartbeat_timeout_total_val),
        ("room_ws_idle_timeout_total", &self._ws_idle_timeout_total_val),
    ];

    let mut out: HashMap<String, serde_json::Value> =
        HashMap::with_capacity(gauges.len() + counters.len());
    for &(name, cell) in gauges.iter() {
        let value = cell.load(Ordering::Relaxed) as f64;
        out.insert(name.to_string(), serde_json::json!(value));
    }
    for &(name, cell) in counters.iter() {
        let value = cell.load(Ordering::Relaxed);
        out.insert(name.to_string(), serde_json::json!(value));
    }
    out
}
}

View File

@ -19,6 +19,7 @@ use crate::connection::{
};
use crate::error::RoomError;
use agent::chat::{AiRequest, ChatService, Mention};
use agent::react::ReactStep;
use agent::TaskService;
use models::agent_task::AgentType;
@ -898,8 +899,31 @@ impl RoomService {
};
let use_streaming = ai_config.stream;
let is_react = ai_config.agent_type.as_deref() == Some("react");
if use_streaming {
if is_react {
if use_streaming {
self.process_message_ai_react_streaming(
chat_service.clone(),
request,
room_id,
room.project,
ai_config.model,
lock_guard,
)
.await;
} else {
self.process_message_ai_react_nonstreaming(
chat_service.clone(),
request,
room_id,
room.project,
ai_config.model,
lock_guard,
)
.await;
}
} else if use_streaming {
self.process_message_ai_streaming(
chat_service.clone(),
request,
@ -1160,6 +1184,266 @@ impl RoomService {
});
}
/// ReAct agent — non-streaming: run the agent to completion in a background
/// task, publish the final answer as an AI message, then bump the room's AI
/// call statistics.
///
/// The room AI lock guard is moved into the task and held until it finishes.
/// Failures are logged; nothing is returned to the caller.
async fn process_message_ai_react_nonstreaming(
    &self,
    chat_service: Arc<ChatService>,
    request: AiRequest,
    room_id: Uuid,
    project_id: Uuid,
    model_id: Uuid,
    lock_guard: crate::room_ai_queue::RoomAiLockGuard,
) {
    let db = self.db.clone();
    let cache = self.cache.clone();
    let queue = self.queue.clone();
    let room_manager = self.room_manager.clone();

    tokio::spawn(async move {
        let _held_lock = lock_guard;
        let display_name = request.model.name.clone();

        // Intermediate ReAct steps are ignored here; only the final answer is used.
        let answer = match chat_service.process_react(&request, |_step| {}).await {
            Ok(answer) => answer,
            Err(e) => {
                tracing::error!(error = %e, "ReAct agent failed");
                return;
            }
        };

        let published = Self::create_and_publish_ai_message(
            &db,
            &cache,
            &queue,
            &room_manager,
            room_id,
            project_id,
            Uuid::now_v7(),
            answer,
            model_id,
            Some(display_name),
        )
        .await;
        if let Err(e) = published {
            tracing::error!(error = %e, "Failed to create ReAct AI message");
            return;
        }

        // Only count the call once the message has been published.
        let called_at = Utc::now();
        let stats_update = room_ai::Entity::update_many()
            .col_expr(
                room_ai::Column::CallCount,
                Expr::col(room_ai::Column::CallCount).add(1),
            )
            .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(called_at)))
            .filter(room_ai::Column::Room.eq(room_id))
            .filter(room_ai::Column::Model.eq(model_id))
            .exec(&db)
            .await;
        if let Err(e) = stats_update {
            tracing::warn!(error = %e, "Failed to update room_ai call stats");
        }
    });
}
/// ReAct agent — streaming: forward each ReactStep to WebSocket, then persist final answer.
async fn process_message_ai_react_streaming(
&self,
chat_service: Arc<ChatService>,
request: AiRequest,
room_id: Uuid,
project_id: Uuid,
_model_id: Uuid,
lock_guard: crate::room_ai_queue::RoomAiLockGuard,
) {
use queue::RoomMessageStreamChunkEvent;
let streaming_msg_id = Uuid::now_v7();
let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "Failed to get seq for ReAct streaming");
return;
}
};
let room_manager = self.room_manager.clone();
let db = self.db.clone();
let room_id_inner = room_id;
let project_id_inner = project_id;
let now = Utc::now();
let sender_type = "ai".to_string();
let queue = self.queue.clone();
let ai_display_name = request.model.name.clone();
let model_id_inner = request.model.id;
tokio::spawn(async move {
let _lock_guard = lock_guard;
// Buffer each ReactStep and forward as a stream chunk.
let answer_buffer: std::sync::Arc<std::sync::Mutex<String>> =
std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let on_step = {
let room_manager = room_manager.clone();
let streaming_msg_id = streaming_msg_id;
let room_id = room_id_inner;
let step_count = step_count.clone();
let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone());
let answer_buffer = answer_buffer.clone();
move |step: ReactStep| {
let room_manager = room_manager.clone();
let content = match &step {
ReactStep::Thought { step: _, thought } => {
format!("[Thinking] {}", thought)
}
ReactStep::Action { step: _, action } => {
format!(
"[Action] Calling `{}` with {:?}",
action.name, action.args
)
}
ReactStep::Observation { step: _, observation } => {
format!("[Observation] {}", observation)
}
ReactStep::Answer { step: _, answer } => {
answer.clone()
}
};
if let ReactStep::Answer { .. } = &step {
step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
let done = matches!(&step, ReactStep::Answer { .. });
let content_for_buffer = if done {
content.clone()
} else {
String::new()
};
let ai_name = ai_display_name_for_step.clone();
let answer_buf = answer_buffer.clone();
tokio::spawn(async move {
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id,
content,
done,
error: None,
display_name: Some((*ai_name).clone()),
};
room_manager.broadcast_stream_chunk(event).await;
if done {
answer_buf.lock().unwrap().push_str(&content_for_buffer);
}
});
}
};
let result = chat_service
.process_react(&request, on_step)
.await;
let final_content = answer_buffer.lock().unwrap().clone();
match result {
Ok(_) if !final_content.is_empty() => {
let envelope = RoomMessageEnvelope {
id: streaming_msg_id,
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
model_id: Some(model_id_inner),
thread_id: None,
content: final_content.clone(),
content_type: "text".to_string(),
send_at: now,
seq,
in_reply_to: None,
};
if let Err(e) = queue.publish(room_id_inner, envelope).await {
tracing::error!(error = %e, "Failed to publish ReAct streaming message");
} else {
let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_inner))
.filter(room_ai::Column::Model.eq(model_id_inner))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
}
let msg_event = queue::RoomMessageEvent {
id: streaming_msg_id,
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
thread_id: None,
content: final_content,
content_type: "text".to_string(),
send_at: now,
seq,
display_name: Some(ai_display_name.clone()),
in_reply_to: None,
reactions: None,
message_id: None,
};
room_manager.broadcast(room_id_inner, msg_event).await;
room_manager.metrics.messages_sent.increment(1);
let event = queue::ProjectRoomEvent {
event_type: super::RoomEventType::NewMessage.as_str().into(),
project_id: project_id_inner,
room_id: Some(room_id_inner),
category_id: None,
message_id: Some(streaming_msg_id),
seq: Some(seq),
timestamp: now,
};
queue
.publish_project_room_event(project_id_inner, event)
.await;
}
}
Ok(_) => {
tracing::warn!("ReAct agent returned empty answer");
}
Err(e) => {
tracing::error!(error = %e, "ReAct streaming failed");
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id: room_id_inner,
content: String::new(),
done: true,
error: Some(e.to_string()),
display_name: Some(ai_display_name.clone()),
};
room_manager.broadcast_stream_chunk(event).await;
}
}
room_manager.close_stream_channel(streaming_msg_id).await;
});
}
pub async fn create_and_publish_ai_message(
db: &AppDatabase,
cache: &AppCache,

View File

@ -279,6 +279,7 @@ pub struct RoomAiUpsertRequest {
pub think: Option<bool>,
pub stream: Option<bool>,
pub min_score: Option<f32>,
pub agent_type: Option<String>,
}
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
@ -297,6 +298,7 @@ pub struct RoomAiResponse {
pub think: bool,
pub stream: bool,
pub min_score: Option<f32>,
pub agent_type: Option<String>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}

View File

@ -23,6 +23,10 @@ tonic-prost = "0.14.5"
# Internal
session_manager = { workspace = true }
deadpool-redis = { workspace = true, features = ["cluster"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
redis = { workspace = true }
# Logging / Tracing
tracing = { workspace = true }

View File

@ -9,6 +9,8 @@ use super::generated::admin::{
KickUserFromWorkspaceRequest, KickUserRequest,
GetUserStatusRequest, GetUserInfoRequest,
GetWorkspaceOnlineUsersRequest, IsUserOnlineRequest,
GetMetricsRequest, ExportMetricsCsvRequest,
InstanceMetrics,
};
use super::generated::admin_session_admin::session_admin_client::SessionAdminClient;
use super::types::from_proto_status;
@ -133,6 +135,31 @@ impl AdminGrpcClient {
.map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?;
Ok(res.into_inner().online)
}
/// Query metrics across all app instances.
pub async fn get_metrics(
&mut self,
instance_filter: &str,
limit: u32,
) -> anyhow::Result<Vec<InstanceMetrics>> {
let req = tonic::Request::new(GetMetricsRequest {
instance_filter: instance_filter.to_string(),
limit,
});
let res = self.inner.get_metrics(req).await
.map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?;
Ok(res.into_inner().instances)
}
/// Export all metrics as CSV string.
pub async fn export_metrics_csv(&mut self, instance_filter: &str) -> anyhow::Result<String> {
let req = tonic::Request::new(ExportMetricsCsvRequest {
instance_filter: instance_filter.to_string(),
});
let res = self.inner.export_metrics_csv(req).await
.map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?;
Ok(res.into_inner().csv)
}
}
// ---------------------------------------------------------------------------

View File

@ -305,6 +305,58 @@ pub mod session_admin_client {
.insert(GrpcMethod::new("admin.SessionAdmin", "IsUserOnline"));
self.inner.unary(req, path, codec).await
}
/// Unary client call for the `/admin.SessionAdmin/GetMetrics` RPC.
///
/// Returns the server's `GetMetricsResponse`, or a `tonic::Status` if the
/// underlying service is not ready or the call itself fails.
pub async fn get_metrics(
&mut self,
request: impl tonic::IntoRequest<
crate::admin::generated::admin::GetMetricsRequest,
>,
) -> std::result::Result<
tonic::Response<crate::admin::generated::admin::GetMetricsResponse>,
tonic::Status,
> {
// Wait for the inner service to become ready; a readiness error is
// reported to the caller as `Status::unknown`.
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic_prost::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/admin.SessionAdmin/GetMetrics",
);
let mut req = request.into_request();
// Record the service/method pair as a request extension.
req.extensions_mut()
.insert(GrpcMethod::new("admin.SessionAdmin", "GetMetrics"));
self.inner.unary(req, path, codec).await
}
/// Unary client call for the `/admin.SessionAdmin/ExportMetricsCsv` RPC.
///
/// Returns the server's `ExportMetricsCsvResponse`, or a `tonic::Status` if
/// the underlying service is not ready or the call itself fails.
pub async fn export_metrics_csv(
&mut self,
request: impl tonic::IntoRequest<
crate::admin::generated::admin::ExportMetricsCsvRequest,
>,
) -> std::result::Result<
tonic::Response<crate::admin::generated::admin::ExportMetricsCsvResponse>,
tonic::Status,
> {
// Wait for the inner service to become ready; a readiness error is
// reported to the caller as `Status::unknown`.
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic_prost::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/admin.SessionAdmin/ExportMetricsCsv",
);
let mut req = request.into_request();
// Record the service/method pair as a request extension.
req.extensions_mut()
.insert(GrpcMethod::new("admin.SessionAdmin", "ExportMetricsCsv"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
@ -390,6 +442,22 @@ pub mod session_admin_server {
tonic::Response<crate::admin::generated::admin::IsUserOnlineResponse>,
tonic::Status,
>;
/// Server-side handler for the `/admin.SessionAdmin/GetMetrics` unary RPC.
///
/// Implementations return a `GetMetricsResponse` or a `tonic::Status` error.
async fn get_metrics(
&self,
request: tonic::Request<crate::admin::generated::admin::GetMetricsRequest>,
) -> std::result::Result<
tonic::Response<crate::admin::generated::admin::GetMetricsResponse>,
tonic::Status,
>;
/// Server-side handler for the `/admin.SessionAdmin/ExportMetricsCsv` unary RPC.
///
/// Implementations return an `ExportMetricsCsvResponse` or a `tonic::Status` error.
async fn export_metrics_csv(
&self,
request: tonic::Request<
crate::admin::generated::admin::ExportMetricsCsvRequest,
>,
) -> std::result::Result<
tonic::Response<crate::admin::generated::admin::ExportMetricsCsvResponse>,
tonic::Status,
>;
}
#[derive(Debug)]
pub struct SessionAdminServer<T> {
@ -864,6 +932,103 @@ pub mod session_admin_server {
};
Box::pin(fut)
}
"/admin.SessionAdmin/GetMetrics" => {
#[allow(non_camel_case_types)]
struct GetMetricsSvc<T: SessionAdmin>(pub Arc<T>);
impl<
T: SessionAdmin,
> tonic::server::UnaryService<
crate::admin::generated::admin::GetMetricsRequest,
> for GetMetricsSvc<T> {
type Response = crate::admin::generated::admin::GetMetricsResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<
crate::admin::generated::admin::GetMetricsRequest,
>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as SessionAdmin>::get_metrics(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = GetMetricsSvc(inner);
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/admin.SessionAdmin/ExportMetricsCsv" => {
#[allow(non_camel_case_types)]
struct ExportMetricsCsvSvc<T: SessionAdmin>(pub Arc<T>);
impl<
T: SessionAdmin,
> tonic::server::UnaryService<
crate::admin::generated::admin::ExportMetricsCsvRequest,
> for ExportMetricsCsvSvc<T> {
type Response = crate::admin::generated::admin::ExportMetricsCsvResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<
crate::admin::generated::admin::ExportMetricsCsvRequest,
>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as SessionAdmin>::export_metrics_csv(&inner, request)
.await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ExportMetricsCsvSvc(inner);
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
let mut response = http::Response::new(

View File

@ -109,6 +109,49 @@ pub struct IsUserOnlineResponse {
#[prost(bool, tag = "1")]
pub online: bool,
}
/// Request message for the `GetMetrics` RPC.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct GetMetricsRequest {
/// Filter by instance_id substring.
#[prost(string, tag = "1")]
pub instance_filter: ::prost::alloc::string::String,
/// Max snapshots per instance (default 100).
#[prost(uint32, tag = "2")]
pub limit: u32,
}
/// Response message for the `GetMetrics` RPC.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetMetricsResponse {
/// One entry per returned metrics snapshot.
#[prost(message, repeated, tag = "1")]
pub instances: ::prost::alloc::vec::Vec<InstanceMetrics>,
}
/// A single metrics snapshot of one app instance.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct InstanceMetrics {
/// Identifier of the instance the snapshot belongs to.
#[prost(string, tag = "1")]
pub instance_id: ::prost::alloc::string::String,
/// Snapshot timestamp in seconds.
// NOTE(review): presumably Unix epoch seconds (UTC) — confirm against the writer.
#[prost(int64, tag = "2")]
pub timestamp_secs: i64,
/// HTTP metrics, key = metric name, value = JSON value
#[prost(map = "string, string", tag = "3")]
pub http: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
/// Room metrics, key = metric name, value = JSON value
#[prost(map = "string, string", tag = "4")]
pub room: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
}
/// Request message for the `ExportMetricsCsv` RPC.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ExportMetricsCsvRequest {
/// Filter applied to instance ids.
#[prost(string, tag = "1")]
pub instance_filter: ::prost::alloc::string::String,
}
/// Response message for the `ExportMetricsCsv` RPC.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ExportMetricsCsvResponse {
/// The exported metrics as CSV text.
#[prost(string, tag = "1")]
pub csv: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum OnlineStatus {
@ -426,6 +469,54 @@ pub mod session_admin_client {
.insert(GrpcMethod::new("admin.SessionAdmin", "IsUserOnline"));
self.inner.unary(req, path, codec).await
}
/// Unary client call for the `/admin.SessionAdmin/GetMetrics` RPC.
///
/// Returns the server's `GetMetricsResponse`, or a `tonic::Status` if the
/// underlying service is not ready or the call itself fails.
pub async fn get_metrics(
&mut self,
request: impl tonic::IntoRequest<super::GetMetricsRequest>,
) -> std::result::Result<
tonic::Response<super::GetMetricsResponse>,
tonic::Status,
> {
// Wait for the inner service to become ready; a readiness error is
// reported to the caller as `Status::unknown`.
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic_prost::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/admin.SessionAdmin/GetMetrics",
);
let mut req = request.into_request();
// Record the service/method pair as a request extension.
req.extensions_mut()
.insert(GrpcMethod::new("admin.SessionAdmin", "GetMetrics"));
self.inner.unary(req, path, codec).await
}
/// Unary client call for the `/admin.SessionAdmin/ExportMetricsCsv` RPC.
///
/// Returns the server's `ExportMetricsCsvResponse`, or a `tonic::Status` if
/// the underlying service is not ready or the call itself fails.
pub async fn export_metrics_csv(
&mut self,
request: impl tonic::IntoRequest<super::ExportMetricsCsvRequest>,
) -> std::result::Result<
tonic::Response<super::ExportMetricsCsvResponse>,
tonic::Status,
> {
// Wait for the inner service to become ready; a readiness error is
// reported to the caller as `Status::unknown`.
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic_prost::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/admin.SessionAdmin/ExportMetricsCsv",
);
let mut req = request.into_request();
// Record the service/method pair as a request extension.
req.extensions_mut()
.insert(GrpcMethod::new("admin.SessionAdmin", "ExportMetricsCsv"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
@ -497,6 +588,20 @@ pub mod session_admin_server {
tonic::Response<super::IsUserOnlineResponse>,
tonic::Status,
>;
/// Server-side handler for the `/admin.SessionAdmin/GetMetrics` unary RPC.
///
/// Implementations return a `GetMetricsResponse` or a `tonic::Status` error.
async fn get_metrics(
&self,
request: tonic::Request<super::GetMetricsRequest>,
) -> std::result::Result<
tonic::Response<super::GetMetricsResponse>,
tonic::Status,
>;
/// Server-side handler for the `/admin.SessionAdmin/ExportMetricsCsv` unary RPC.
///
/// Implementations return an `ExportMetricsCsvResponse` or a `tonic::Status` error.
async fn export_metrics_csv(
&self,
request: tonic::Request<super::ExportMetricsCsvRequest>,
) -> std::result::Result<
tonic::Response<super::ExportMetricsCsvResponse>,
tonic::Status,
>;
}
#[derive(Debug)]
pub struct SessionAdminServer<T> {
@ -949,6 +1054,97 @@ pub mod session_admin_server {
};
Box::pin(fut)
}
"/admin.SessionAdmin/GetMetrics" => {
#[allow(non_camel_case_types)]
struct GetMetricsSvc<T: SessionAdmin>(pub Arc<T>);
impl<
T: SessionAdmin,
> tonic::server::UnaryService<super::GetMetricsRequest>
for GetMetricsSvc<T> {
type Response = super::GetMetricsResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetMetricsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as SessionAdmin>::get_metrics(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = GetMetricsSvc(inner);
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/admin.SessionAdmin/ExportMetricsCsv" => {
#[allow(non_camel_case_types)]
struct ExportMetricsCsvSvc<T: SessionAdmin>(pub Arc<T>);
impl<
T: SessionAdmin,
> tonic::server::UnaryService<super::ExportMetricsCsvRequest>
for ExportMetricsCsvSvc<T> {
type Response = super::ExportMetricsCsvResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ExportMetricsCsvRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as SessionAdmin>::export_metrics_csv(&inner, request)
.await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ExportMetricsCsvSvc(inner);
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
let mut response = http::Response::new(

View File

@ -7,11 +7,12 @@ use tonic::{transport::Server, Request, Response, Status};
use tracing::{info_span, Instrument};
use super::generated::admin::{
GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse,
GetWorkspaceOnlineUsersRequest, GetWorkspaceOnlineUsersResponse, IsUserOnlineRequest,
IsUserOnlineResponse, KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse,
KickUserRequest, KickUserResponse, ListUserSessionsRequest, ListUserSessionsResponse,
ListWorkspaceSessionsRequest, ListWorkspaceSessionsResponse,
GetMetricsRequest, GetMetricsResponse, GetUserInfoRequest, GetUserInfoResponse,
GetUserStatusRequest, GetUserStatusResponse, GetWorkspaceOnlineUsersRequest,
GetWorkspaceOnlineUsersResponse, InstanceMetrics, IsUserOnlineRequest, IsUserOnlineResponse,
KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, KickUserRequest, KickUserResponse,
ListUserSessionsRequest, ListUserSessionsResponse, ListWorkspaceSessionsRequest,
ListWorkspaceSessionsResponse, ExportMetricsCsvRequest, ExportMetricsCsvResponse,
};
use super::generated::admin_session_admin::session_admin_server::{
SessionAdmin, SessionAdminServer,
@ -204,6 +205,40 @@ impl SessionAdmin for SessionAdminService {
.instrument(info_span!("is_user_online", user_id = %user_id))
.await
}
/// Return recent metrics snapshots for every app instance matching the filter.
///
/// `instance_filter` is forwarded as-is to `query_all_instance_metrics`.
/// A `limit` of 0 (the protobuf default for an unset field) falls back to 100
/// snapshots per instance. Any lookup failure is reported as
/// `Status::internal` carrying the error text.
async fn get_metrics(
    &self,
    req: Request<GetMetricsRequest>,
) -> Result<Response<GetMetricsResponse>, Status> {
    let r = req.get_ref();
    let instance_filter = r.instance_filter.as_str();
    let limit = if r.limit > 0 { r.limit as usize } else { 100 };

    async {
        let pool = self.session_manager.pool();
        let instances = query_all_instance_metrics(pool, instance_filter, limit)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(GetMetricsResponse { instances }))
    }
    // Record the request parameters on the span, as the other handlers do
    // with their identifying fields.
    .instrument(info_span!(
        "get_metrics",
        instance_filter = %instance_filter,
        limit = limit
    ))
    .await
}
/// Export the metrics of every app instance matching the filter as CSV text.
///
/// `instance_filter` is forwarded as-is to `export_all_metrics_csv`. Any
/// failure is reported as `Status::internal` carrying the error text.
async fn export_metrics_csv(
    &self,
    req: Request<ExportMetricsCsvRequest>,
) -> Result<Response<ExportMetricsCsvResponse>, Status> {
    let instance_filter = req.get_ref().instance_filter.as_str();

    async {
        let pool = self.session_manager.pool();
        let csv = export_all_metrics_csv(pool, instance_filter)
            .await
            .map_err(|e| Status::internal(e.to_string()))?;
        Ok(Response::new(ExportMetricsCsvResponse { csv }))
    }
    // Record the request parameter on the span, as the other handlers do
    // with their identifying fields.
    .instrument(info_span!(
        "export_metrics_csv",
        instance_filter = %instance_filter
    ))
    .await
}
}
/// Default gRPC admin port.
@ -243,3 +278,185 @@ pub fn spawn(
let _ = shutdown_rx.recv().await;
})
}
// ---------------------------------------------------------------------------
// Metrics helpers — mirror of observability::redis_metrics (daily hash format)
// ---------------------------------------------------------------------------
use deadpool_redis::cluster::Pool;
use serde::{Deserialize, Serialize};
/// Snapshot stored in Redis under `metrics:{instance}:{YYYY-MM-DD}` (hash).
///
/// In this file it is only built by hand in `query_instance_metrics`, which
/// splits the hash fields into `http` and `room` by their key prefix.
// NOTE(review): both maps are `#[serde(flatten)]`. Two flattened maps in one
// struct are ambiguous for serde (each would presumably capture the same
// leftover keys when deserializing) — confirm the Serialize/Deserialize
// derives are actually used before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MetricsSnapshot {
/// Instance the snapshot belongs to.
instance_id: String,
/// Timestamp of the snapshot, in seconds.
timestamp_secs: i64,
/// Metrics whose hash field name started with `http_` (prefix removed).
#[serde(flatten)]
http: std::collections::HashMap<String, serde_json::Value>,
/// Metrics whose hash field name started with `room_` (prefix removed).
#[serde(flatten)]
room: std::collections::HashMap<String, serde_json::Value>,
}
/// Query metrics for all known instances, optionally filtered by `instance_filter`.
///
/// Instance ids are discovered by scanning for `metrics:index:{instance}:{YYYY-MM-DD}`
/// keys. An empty `instance_filter` matches every instance; otherwise only ids
/// containing the filter as a substring are kept. For each remaining instance up
/// to `limit` snapshots are loaded via `query_instance_metrics` and converted to
/// `InstanceMetrics`, with every JSON value rendered as its JSON text.
///
/// Returns an error if a connection cannot be obtained or a Redis command fails.
// NOTE(review): `Pool` is the cluster pool and SCAN walks the keyspace of a
// single node — confirm that this discovers keys stored on every node.
async fn query_all_instance_metrics(
    pool: &Pool,
    instance_filter: &str,
    limit: usize,
) -> anyhow::Result<Vec<InstanceMetrics>> {
    // Discover the instance ids inside their own scope so the pooled connection
    // is handed back before `query_instance_metrics` checks out another one.
    // Holding it across those calls can starve or deadlock a small pool.
    let instance_ids = {
        let mut conn = pool.get().await?;
        let mut cursor = 0u64;
        let mut ids: Vec<String> = Vec::new();
        let mut seen = std::collections::HashSet::new();
        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg("metrics:index:*")
                .arg("COUNT")
                .arg(100)
                .query_async(&mut conn)
                .await?;
            for key in keys {
                // Key format: metrics:index:{instance}:{YYYY-MM-DD}
                let instance_id = match key
                    .strip_prefix("metrics:index:")
                    .and_then(|rest| rest.rsplit_once(':'))
                {
                    Some((instance_id, _day)) => instance_id,
                    None => continue,
                };
                if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
                    continue;
                }
                // `seen` keeps first-seen order while dropping the repeats that
                // come from one instance having several daily index keys.
                if seen.insert(instance_id.to_string()) {
                    ids.push(instance_id.to_string());
                }
            }
            cursor = next_cursor;
            // SCAN signals the end of the iteration with a zero cursor.
            if cursor == 0 {
                break;
            }
        }
        ids
    };

    let mut results = Vec::new();
    for instance_id in instance_ids {
        let snapshots = query_instance_metrics(pool, &instance_id, limit).await?;
        results.extend(snapshots.into_iter().map(|(_, payload)| InstanceMetrics {
            instance_id: payload.instance_id,
            timestamp_secs: payload.timestamp_secs,
            http: payload
                .http
                .into_iter()
                .map(|(k, v)| (k, v.to_string()))
                .collect(),
            room: payload
                .room
                .into_iter()
                .map(|(k, v)| (k, v.to_string()))
                .collect(),
        }));
    }
    Ok(results)
}
/// Query metrics for a single instance from Redis (daily hash buckets).
///
/// Steps:
/// 1. SCAN for the instance's daily index keys (`metrics:index:{instance}:*`).
/// 2. Read the newest `limit` timestamps from each index (ZREVRANGE), merge them
///    and keep the newest `limit` overall.
/// 3. For every kept timestamp, load the daily hash `metrics:{instance}:{YYYY-MM-DD}`
///    and split its fields into `http_*` and `room_*` metrics (prefix stripped).
///    Values that are not valid JSON are kept as JSON strings; fields with any
///    other prefix are ignored.
///
/// Returns `(timestamp, snapshot)` pairs sorted by ascending timestamp, or an
/// error if a connection cannot be obtained or a Redis command fails.
// NOTE(review): every timestamp of the same day resolves to the same daily hash,
// so snapshots within one day carry identical metric values — confirm that this
// matches the writer's storage format.
async fn query_instance_metrics(
    pool: &Pool,
    instance_id: &str,
    limit: usize,
) -> anyhow::Result<Vec<(i64, MetricsSnapshot)>> {
    let mut conn = pool.get().await?;

    // Scan for all daily index keys for this instance.
    let index_pattern = format!("metrics:index:{}:*", instance_id);
    let mut day_cursor = 0u64;
    let mut day_keys: Vec<String> = Vec::new();
    loop {
        let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
            .arg(day_cursor)
            .arg("MATCH")
            .arg(&index_pattern)
            .arg("COUNT")
            .arg(100)
            .query_async(&mut conn)
            .await?;
        day_keys.extend(keys);
        day_cursor = new_cursor;
        if day_cursor == 0 {
            break;
        }
    }
    // SCAN may report the same key more than once during a full iteration;
    // without de-duplication its timestamps (and snapshots) would be repeated.
    day_keys.sort();
    day_keys.dedup();

    // Collect the newest timestamps from every daily index.
    let mut all_timestamps: Vec<i64> = Vec::new();
    for day_key in &day_keys {
        let timestamps: Vec<i64> = redis::cmd("ZREVRANGE")
            .arg(day_key)
            .arg(0)
            // Inclusive stop index; clamped so `limit == 0` does not become -1
            // (which would mean "through the last element").
            .arg((limit as isize - 1).max(0))
            .query_async(&mut conn)
            .await?;
        all_timestamps.extend(timestamps);
    }
    all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts));
    all_timestamps.truncate(limit);

    // Each daily hash is fetched once and reused for all timestamps of that day
    // instead of issuing one HGETALL per timestamp.
    let mut day_hashes: std::collections::HashMap<
        String,
        Option<std::collections::HashMap<String, String>>,
    > = std::collections::HashMap::new();

    let mut results = Vec::with_capacity(all_timestamps.len());
    for &ts in &all_timestamps {
        // A timestamp chrono cannot represent falls back to the current time.
        let dt = chrono::DateTime::from_timestamp(ts, 0).unwrap_or_else(chrono::Utc::now);
        let day_str = dt.format("%Y-%m-%d").to_string();
        let hash_key = format!("metrics:{}:{}", instance_id, day_str);

        if !day_hashes.contains_key(&hash_key) {
            let fetched: Option<std::collections::HashMap<String, String>> =
                redis::cmd("HGETALL")
                    .arg(&hash_key)
                    .query_async(&mut conn)
                    .await?;
            day_hashes.insert(hash_key.clone(), fetched);
        }

        if let Some(fields) = day_hashes.get(&hash_key).and_then(|cached| cached.as_ref()) {
            let mut http = std::collections::HashMap::new();
            let mut room = std::collections::HashMap::new();
            for (k, v) in fields {
                let json_val: serde_json::Value = serde_json::from_str(v)
                    .unwrap_or_else(|_| serde_json::Value::String(v.clone()));
                if let Some(stripped) = k.strip_prefix("http_") {
                    http.insert(stripped.to_string(), json_val);
                } else if let Some(stripped) = k.strip_prefix("room_") {
                    room.insert(stripped.to_string(), json_val);
                }
            }
            results.push((
                ts,
                MetricsSnapshot {
                    instance_id: instance_id.to_string(),
                    timestamp_secs: ts,
                    http,
                    room,
                },
            ));
        }
    }

    results.sort_by_key(|(ts, _)| *ts);
    Ok(results)
}
/// Export all metrics as CSV.
///
/// Loads up to 1000 snapshots per instance through `query_all_instance_metrics`
/// and emits one row per metric under the header
/// `instance_id,timestamp,metric,value`. Metric names are prefixed with `http_`
/// or `room_` according to the map they came from. When there are no rows only
/// the header is returned.
///
/// Fields are quoted per RFC 4180 when they contain a comma, a double quote or
/// a line break: the values are JSON text, so strings, arrays and objects would
/// otherwise corrupt the column layout.
async fn export_all_metrics_csv(pool: &Pool, instance_filter: &str) -> anyhow::Result<String> {
    /// Quote `raw` for CSV output if it contains a delimiter, quote or line break.
    fn csv_field(raw: &str) -> std::borrow::Cow<'_, str> {
        let needs_quoting = raw
            .chars()
            .any(|c| c == ',' || c == '"' || c == '\n' || c == '\r');
        if needs_quoting {
            // Embedded quotes are doubled and the whole field is wrapped in quotes.
            std::borrow::Cow::Owned(format!("\"{}\"", raw.replace('"', "\"\"")))
        } else {
            std::borrow::Cow::Borrowed(raw)
        }
    }

    let instances = query_all_instance_metrics(pool, instance_filter, 1000).await?;

    let mut rows = Vec::new();
    for inst in &instances {
        let ts = inst.timestamp_secs;
        let id = csv_field(&inst.instance_id);
        for (k, v) in &inst.http {
            let metric = format!("http_{}", k);
            rows.push(format!("{},{},{},{}", id, ts, csv_field(&metric), csv_field(v)));
        }
        for (k, v) in &inst.room {
            let metric = format!("room_{}", k);
            rows.push(format!("{},{},{},{}", id, ts, csv_field(&metric), csv_field(v)));
        }
    }

    let header = "instance_id,timestamp,metric,value";
    if rows.is_empty() {
        return Ok(header.to_string());
    }
    Ok(format!("{}\n{}", header, rows.join("\n")))
}

View File

@ -82,6 +82,24 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.codec_path("tonic_prost::ProstCodec")
.build(),
)
.method(
tonic_prost_build::manual::Method::builder()
.name("get_metrics")
.route_name("GetMetrics")
.input_type("crate::admin::generated::admin::GetMetricsRequest")
.output_type("crate::admin::generated::admin::GetMetricsResponse")
.codec_path("tonic_prost::ProstCodec")
.build(),
)
.method(
tonic_prost_build::manual::Method::builder()
.name("export_metrics_csv")
.route_name("ExportMetricsCsv")
.input_type("crate::admin::generated::admin::ExportMetricsCsvRequest")
.output_type("crate::admin::generated::admin::ExportMetricsCsvResponse")
.codec_path("tonic_prost::ProstCodec")
.build(),
)
.build();
tonic_prost_build::manual::Builder::new()

View File

@ -93,6 +93,34 @@ message IsUserOnlineResponse {
bool online = 1;
}
// ---------------------------------------------------------------------------
// Metrics
// ---------------------------------------------------------------------------
// Request for SessionAdmin.GetMetrics.
message GetMetricsRequest {
string instance_filter = 1; // filter by instance_id substring
uint32 limit = 2; // max snapshots per instance (default 100)
}
// Response for SessionAdmin.GetMetrics.
message GetMetricsResponse {
// One entry per returned metrics snapshot.
repeated InstanceMetrics instances = 1;
}
// A single metrics snapshot of one app instance.
message InstanceMetrics {
string instance_id = 1;
int64 timestamp_secs = 2;
// HTTP metrics, key = metric name, value = JSON value
map<string, string> http = 3;
// Room metrics, key = metric name, value = JSON value
map<string, string> room = 4;
}
// Request for SessionAdmin.ExportMetricsCsv.
message ExportMetricsCsvRequest {
// Filter applied to instance ids.
string instance_filter = 1;
}
// Response for SessionAdmin.ExportMetricsCsv.
message ExportMetricsCsvResponse {
// The exported metrics as CSV text.
string csv = 1;
}
// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------
@ -106,4 +134,6 @@ service SessionAdmin {
rpc GetUserInfo(GetUserInfoRequest) returns (GetUserInfoResponse);
rpc GetWorkspaceOnlineUsers(GetWorkspaceOnlineUsersRequest) returns (GetWorkspaceOnlineUsersResponse);
rpc IsUserOnline(IsUserOnlineRequest) returns (IsUserOnlineResponse);
rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse);
rpc ExportMetricsCsv(ExportMetricsCsvRequest) returns (ExportMetricsCsvResponse);
}

View File

@ -1,6 +1,7 @@
use std::sync::Arc;
use ::agent::chat::ChatService;
use ::agent::client::AiClientConfig;
use ::agent::task::service::TaskService;
use ::agent::tool::ToolRegistry;
use async_openai::config::OpenAIConfig;
@ -170,12 +171,15 @@ impl AppService {
.with_api_key(&api_key)
.with_api_base(&base_url);
let client = async_openai::Client::with_config(cfg);
let ai_client_config = AiClientConfig::new(api_key).with_base_url(&base_url);
let mut registry = ToolRegistry::new();
git_tools::register_all(&mut registry);
file_tools::register_all(&mut registry);
project_tools::register_all(&mut registry);
Some(Arc::new(
ChatService::new(client).with_tool_registry(registry),
ChatService::new(client)
.with_ai_client_config(ai_client_config)
.with_tool_registry(registry),
))
}
(Err(e), _) => {