diff --git a/Cargo.lock b/Cargo.lock index 505ee9b..c157261 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5181,7 +5181,6 @@ dependencies = [ "actix-web", "anyhow", "chrono", - "deadpool-redis", "futures", "hostname", "metrics", @@ -5191,7 +5190,6 @@ dependencies = [ "opentelemetry-http", "opentelemetry-otlp", "opentelemetry_sdk", - "redis", "reqwest 0.13.2", "serde", "serde_json", diff --git a/admin/src/app/admin/metrics/page.tsx b/admin/src/app/admin/metrics/page.tsx deleted file mode 100644 index 51b7f7e..0000000 --- a/admin/src/app/admin/metrics/page.tsx +++ /dev/null @@ -1,199 +0,0 @@ -"use client"; - -import { useEffect, useState } from "react"; -import { format } from "date-fns"; -import { getMetrics, exportMetricsCsv, type InstanceMetrics } from "@/lib/admin-rpc"; - -export default function MetricsPage() { - const [metrics, setMetrics] = useState([]); - const [loading, setLoading] = useState(true); - const [error, setError] = useState(""); - const [exporting, setExporting] = useState(false); - const [instanceFilter, setInstanceFilter] = useState(""); - - useEffect(() => { loadMetrics(); }, []); - - async function loadMetrics() { - setLoading(true); - setError(""); - try { - const data = await getMetrics(instanceFilter); - setMetrics(data); - } catch (e) { - setError(e instanceof Error ? e.message : "加载指标失败"); - } finally { - setLoading(false); - } - } - - async function handleExportCsv() { - setExporting(true); - try { - const csv = await exportMetricsCsv(instanceFilter); - const blob = new Blob([csv], { type: "text/csv;charset=utf-8;" }); - const url = URL.createObjectURL(blob); - const a = document.createElement("a"); - a.href = url; - a.download = `metrics-${format(new Date(), "yyyy-MM-dd_HHmm")}.csv`; - a.click(); - URL.revokeObjectURL(url); - } catch (e) { - alert("导出失败: " + (e instanceof Error ? e.message : String(e))); - } finally { - setExporting(false); - } - } - - // Group by instance - const byInstance = metrics.reduce>((acc, m) => { - if (!acc[m.instance_id]) acc[m.instance_id] = []; - acc[m.instance_id].push(m); - return acc; - }, {}); - - const allInstances = Object.keys(byInstance).sort(); - const latestOf = (arr: InstanceMetrics[]) => - arr.sort((a, b) => b.timestamp_secs - a.timestamp_secs)[0]; - - // Flatten all latest snapshots for a summary table - const latestSnapshots = allInstances.map((id) => ({ - id, - latest: latestOf(byInstance[id]), - })); - - function metricVal(m: InstanceMetrics | undefined, prefix: string, key: string): string { - if (!m) return "—"; - const v = prefix === "http" ? m.http[key] : m.room[key]; - return v ?? "—"; - } - - return ( -
-
-
-

应用指标监控

-

- 共 {allInstances.length} 个实例 - {metrics.length > 0 && ( - - 最新数据: {metrics.length > 0 - ? format(new Date(latestSnapshots[0]?.latest.timestamp_secs * 1000), "yyyy-MM-dd HH:mm:ss") - : "—"} - - )} -

-
-
- { setInstanceFilter(e.target.value); }} - /> - - -
-
- - {error && ( -
- {error} — 确认 adminrpc 服务正在运行(默认: http://adminrpc.admin.svc.cluster.local:9091) -
- )} - - {loading ? ( -
加载中...
- ) : allInstances.length === 0 ? ( -
-
-
-

暂无指标数据

-

- 确认应用已将指标刷新到 Redis(每 5 秒一次) -

-
-
- ) : ( - <> - {/* Summary table */} -
-

实例概览

-
- - - - - - - - - - - - - {latestSnapshots.map(({ id, latest }) => ( - - - - - - - - - ))} - -
实例 ID最后更新活跃 WS 连接发送消息数HTTP 请求数HTTP 错误数
{id}{format(new Date(latest.timestamp_secs * 1000), "HH:mm:ss")}{metricVal(latest, "room", "ws_connections_active")}{metricVal(latest, "room", "messages_sent_total")}{metricVal(latest, "http", "request_count")}{metricVal(latest, "http", "error_count")}
-
-
- - {/* Per-instance detail */} - {allInstances.map((instanceId) => ( -
-

- 实例: {instanceId} -

-
- {/* Room metrics */} -
-

Room 指标

-
- {Object.entries(latestOf(byInstance[instanceId])?.room ?? {}).map(([k, v]) => ( -
- {k} - {v} -
- ))} - {Object.keys(latestOf(byInstance[instanceId])?.room ?? {}).length === 0 && ( -
无数据
- )} -
-
- {/* HTTP metrics */} -
-

HTTP 指标

-
- {Object.entries(latestOf(byInstance[instanceId])?.http ?? {}).map(([k, v]) => ( -
- {k} - {v} -
- ))} - {Object.keys(latestOf(byInstance[instanceId])?.http ?? {}).length === 0 && ( -
无数据
- )} -
-
-
-
- ))} - - )} -
- ); -} diff --git a/admin/src/components/admin/LineChart.tsx b/admin/src/components/admin/LineChart.tsx deleted file mode 100644 index f50da74..0000000 --- a/admin/src/components/admin/LineChart.tsx +++ /dev/null @@ -1,169 +0,0 @@ -"use client"; - -import { format, parseISO } from "date-fns"; - -interface DataPoint { - date: string; - value: number; -} - -interface LineChartProps { - data: DataPoint[]; - width?: number; - height?: number; - color?: string; - label: string; - unit?: string; -} - -export default function LineChart({ - data, - width = 600, - height = 200, - color = "#6366f1", - label, - unit = "", -}: LineChartProps) { - if (data.length === 0) { - return ( -
- 暂无数据 -
- ); - } - - const padding = { top: 16, right: 16, bottom: 36, left: 44 }; - const chartWidth = width - padding.left - padding.right; - const chartHeight = height - padding.top - padding.bottom; - - const values = data.map((d) => d.value); - const maxValue = Math.max(...values, 1); - const minValue = 0; - - function xScale(i: number) { - return padding.left + (i / (data.length - 1 || 1)) * chartWidth; - } - - function yScale(v: number) { - return padding.top + chartHeight - ((v - minValue) / (maxValue - minValue || 1)) * chartHeight; - } - - // Build SVG path - const pathD = data - .map((d, i) => `${i === 0 ? "M" : "L"} ${xScale(i).toFixed(1)} ${yScale(d.value).toFixed(1)}`) - .join(" "); - - // Area path (fill under the line) - const areaD = - pathD + - ` L ${xScale(data.length - 1).toFixed(1)} ${(padding.top + chartHeight).toFixed(1)}` + - ` L ${padding.left} ${(padding.top + chartHeight).toFixed(1)} Z`; - - // Y-axis ticks (5 ticks) - const yTicks = Array.from({ length: 5 }, (_, i) => { - const v = minValue + ((maxValue - minValue) * i) / 4; - return { v, y: yScale(v) }; - }); - - // X-axis ticks (show every nth label based on data length) - const step = Math.max(1, Math.floor(data.length / 6)); - const xTicks = data.filter((_, i) => i % step === 0 || i === data.length - 1); - - const gradientId = `grad-${label.replace(/\s/g, "-")}`; - - return ( - - - - - - - - - {/* Y-axis gridlines */} - {yTicks.map((t, i) => ( - - ))} - - {/* Y-axis labels */} - {yTicks.map((t, i) => ( - - {Math.round(t.v)} - - ))} - - {/* X-axis labels */} - {xTicks.map((d, i) => { - const idx = data.findIndex((x) => x.date === d.date); - return ( - - {(() => { - try { - return format(parseISO(d.date), "MM-dd"); - } catch { - return d.date.slice(5); - } - })()} - - ); - })} - - {/* Area fill */} - - - {/* Line */} - - - {/* Data points (dots) */} - {data.map((d, i) => ( - - - {(() => { - try { - return `${format(parseISO(d.date), "yyyy-MM-dd")}: ${d.value}${unit}`; - } catch { - return `${d.date}: ${d.value}${unit}`; - } - })()} - - - ))} - - ); -} diff --git a/admin/src/components/admin/Sidebar.tsx b/admin/src/components/admin/Sidebar.tsx index 3d2a71d..ba8dfd0 100644 --- a/admin/src/components/admin/Sidebar.tsx +++ b/admin/src/components/admin/Sidebar.tsx @@ -52,7 +52,6 @@ export default function Sidebar({ user, loading, onLogout }: SidebarProps) { { href: "/admin/sessions", label: "Admin 会话", icon: "◐" }, { href: "/platform/sessions", label: "平台会话", icon: "☀" }, { href: "/admin/api-tokens", label: "API Token", icon: "⚿" }, - { href: "/admin/metrics", label: "指标监控", icon: "◉" }, { href: "/admin/daily-report", label: "每日报告", icon: "📧" }, ], }, diff --git a/admin/src/lib/admin-rpc.ts b/admin/src/lib/admin-rpc.ts index e8a547f..16e43b6 100644 --- a/admin/src/lib/admin-rpc.ts +++ b/admin/src/lib/admin-rpc.ts @@ -2,7 +2,7 @@ * adminrpc HTTP REST client * * Calls the adminrpc HTTP server (default: http://gitdata-adminrpc.gitdataai.svc.cluster.local:9091) - * which exposes the same session management and metrics APIs as the gRPC service. + * which exposes session management and health APIs. * * Usage: * import { listWorkspaceSessions, kickUser } from "@/lib/admin-rpc"; @@ -99,31 +99,6 @@ export async function kickUserFromWorkspace( }); } -// ─── Metrics API ───────────────────────────────────────────────────────────── - -export interface InstanceMetrics { - instance_id: string; - timestamp_secs: number; - http: Record; - room: Record; -} - -/** Get metrics across all app instances. */ -export async function getMetrics(instanceFilter = ""): Promise { - const qs = instanceFilter ? `?instance_filter=${encodeURIComponent(instanceFilter)}` : ""; - return rpc(`/api/admin/metrics${qs}`); -} - -/** Export all metrics as CSV string. */ -export async function exportMetricsCsv(instanceFilter = ""): Promise { - const qs = instanceFilter ? `?instance_filter=${encodeURIComponent(instanceFilter)}` : ""; - const res = await fetch(`${BASE_URL}/api/admin/metrics/export${qs}`); - if (!res.ok) { - throw new Error(`adminrpc GET /metrics/export failed (${res.status})`); - } - return res.text(); -} - /** Get adminrpc health status. */ export async function adminRpcHealth(): Promise<{ ok: boolean }> { return rpc<{ ok: boolean }>("/health"); diff --git a/apps/adminrpc/src/main.rs b/apps/adminrpc/src/main.rs index 7258d28..a02a829 100644 --- a/apps/adminrpc/src/main.rs +++ b/apps/adminrpc/src/main.rs @@ -78,15 +78,12 @@ async fn main() -> anyhow::Result<()> { }); let http_handle = tokio::spawn(async move { - let pool_for_http = pool.clone(); let sm_for_http = session_manager.clone(); let result = HttpServer::new(move || { ActixApp::new() - .app_data(web::Data::new(pool_for_http.clone())) .app_data(web::Data::new(sm_for_http.clone())) .route("/health", web::get().to(health)) - .route("/admin/metrics/export", web::get().to(metrics_export)) .service( web::scope("/api/admin") .route( @@ -117,10 +114,7 @@ async fn main() -> anyhow::Result<()> { .route( "/sessions/kick-workspace", web::post().to(kick_user_from_workspace), - ) - // Metrics - .route("/metrics", web::get().to(get_metrics)) - .route("/metrics/export", web::get().to(metrics_export)), + ), ) }) .bind(admin_addr) @@ -149,26 +143,6 @@ async fn health() -> HttpResponse { HttpResponse::Ok().json(serde_json::json!({ "ok": true })) } -async fn metrics_export(pool: web::Data) -> HttpResponse { - match observability::export_all_metrics_csv(pool.get_ref(), "").await { - Ok(csv) => HttpResponse::Ok() - .content_type("text/csv; charset=utf-8") - .body(csv), - Err(e) => { - HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) - } - } -} - -async fn get_metrics(pool: web::Data) -> HttpResponse { - match observability::query_all_instance_metrics(pool.get_ref(), "", 100).await { - Ok(instances) => HttpResponse::Ok().json(instances), - Err(e) => { - HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() })) - } - } -} - fn parse_uuid(s: &str) -> Option { Uuid::parse_str(s).ok() } diff --git a/apps/app/src/main.rs b/apps/app/src/main.rs index d7b9bd9..b434445 100644 --- a/apps/app/src/main.rs +++ b/apps/app/src/main.rs @@ -7,7 +7,6 @@ use db::cache::AppCache; use db::database::AppDatabase; use observability::{ init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller, - spawn_redis_metrics_flusher, HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware, instance_id, }; use sea_orm::ConnectionTrait; @@ -102,18 +101,6 @@ async fn main() -> anyhow::Result<()> { ); let http_snapshot_data = web::Data::new(http_snapshot); - // ── Redis metrics flusher (every 5s → key: metrics:{instance}:{ts}) ────── - let http_for_flusher = http_metrics.clone(); - let prometheus_for_flusher = prometheus_handle_arc.clone(); - spawn_redis_metrics_flusher( - cache.redis_pool().clone(), - instance_id(), - prometheus_for_flusher, - http_for_flusher, - std::time::Duration::from_secs(5), - ); - tracing::info!("Redis metrics flusher started (5s interval)"); - let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()); tracing::info!(bind_addr = %bind_addr, "Listening"); let http_metrics_server = http_metrics.clone(); diff --git a/libs/observability/Cargo.toml b/libs/observability/Cargo.toml index c886c7d..b70f3cf 100644 --- a/libs/observability/Cargo.toml +++ b/libs/observability/Cargo.toml @@ -25,10 +25,6 @@ serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["rt"] } anyhow = { workspace = true } -# Redis (for metrics exporter) -redis = { workspace = true } -deadpool-redis = { workspace = true, features = ["cluster"] } - # Prometheus metrics export metrics = "0.22" metrics-exporter-prometheus = "0.13" diff --git a/libs/observability/src/lib.rs b/libs/observability/src/lib.rs index aee8bdd..879bed9 100644 --- a/libs/observability/src/lib.rs +++ b/libs/observability/src/lib.rs @@ -9,7 +9,6 @@ pub mod metrics_middleware; pub mod prometheus_exporter; pub mod otlp; pub mod tracing_middleware; -pub mod redis_metrics; pub use tracing_fmt::{init_tracing_subscriber, instance_id}; pub use metrics_middleware::{MetricsMiddleware, HttpMetrics}; @@ -19,9 +18,3 @@ pub use prometheus_exporter::{ }; pub use otlp::{init_otlp, OtelGuard}; pub use tracing_middleware::TracingSpanMiddleware; -pub use redis_metrics::{ - MetricsSnapshot, - spawn_redis_metrics_flusher, - query_instance_metrics, query_all_instance_metrics, export_all_metrics_csv, - FlatInstanceMetrics, -}; diff --git a/libs/observability/src/redis_metrics.rs b/libs/observability/src/redis_metrics.rs deleted file mode 100644 index fa702f0..0000000 --- a/libs/observability/src/redis_metrics.rs +++ /dev/null @@ -1,342 +0,0 @@ -//! Redis-backed metrics exporter. -//! -//! Every `interval` seconds this module pushes metric fields to a Redis hash -//! bucketed by day: -//! Key: `metrics:{instance_id}:{YYYY-MM-DD}` (hash) -//! TTL: 30 days per day bucket -//! -//! Also maintains a sorted set `metrics:index:{instance_id}:{YYYY-MM-DD}` per -//! day for querying within a day window. - -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; - -use deadpool_redis::cluster::Pool; -use serde::{Deserialize, Serialize}; - -const METRICS_TTL_SECS: i64 = 30 * 24 * 60 * 60; // 30 days - -/// A snapshot of metric values at a point in time. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct MetricsSnapshot { - pub instance_id: String, - pub timestamp_secs: i64, - #[serde(flatten)] - pub http: HashMap, - #[serde(flatten)] - pub room: HashMap, -} - -/// Starts a background task that snapshots and pushes all metrics to Redis -/// every `interval` seconds. -/// -/// `prometheus_handle`: from `install_recorder()`, used to render RoomMetrics values -/// `http_metrics`: shared `HttpMetrics` from `MetricsMiddleware` -/// `instance_id`: used as the Redis key prefix -pub fn spawn_redis_metrics_flusher( - redis_pool: Pool, - instance_id: String, - prometheus_handle: Arc, - http_metrics: Arc, - interval: Duration, -) { - tokio::spawn(async move { - let mut ticker = tokio::time::interval(interval); - loop { - ticker.tick().await; - if let Err(e) = flush_metrics(&redis_pool, &instance_id, &prometheus_handle, http_metrics.as_ref()).await { - tracing::warn!(error = %e, "failed to flush metrics to Redis"); - } - } - }); -} - -async fn flush_metrics( - pool: &Pool, - instance_id: &str, - prometheus_handle: &Arc, - http_metrics: &crate::metrics_middleware::HttpMetrics, -) -> anyhow::Result<()> { - let now = chrono::Utc::now(); - let now_ts = now.timestamp(); - let day_key = now.format("%Y-%m-%d").to_string(); - - // RoomMetrics from the metrics crate (rendered via PrometheusHandle) - let body = prometheus_handle.render(); - let room = crate::prometheus_exporter::render_to_hashmap(&body); - - // HTTP metrics from AtomicU64 - let http = http_metrics.snapshot(); - - let hash_key = format!("metrics:{}:{}", instance_id, day_key); - let index_key = format!("metrics:index:{}:{}", instance_id, day_key); - - let mut conn = pool.get().await?; - - // HSET each metric field into the daily hash - let mut fields: Vec<(String, String)> = Vec::with_capacity(http.len() + room.len()); - for (k, v) in &http { - fields.push((format!("http_{}", k), v.to_string())); - } - for (k, v) in &room { - fields.push((format!("room_{}", k), v.to_string())); - } - - // HSET field field_value ... - let mut cmd = redis::cmd("HSET"); - cmd.arg(&hash_key); - for (f, val) in &fields { - cmd.arg(f).arg(val); - } - let _: () = cmd.query_async(&mut conn).await?; - - // Set TTL on the hash key (refresh every write) - let _: () = redis::cmd("EXPIRE") - .arg(&hash_key) - .arg(METRICS_TTL_SECS) - .query_async(&mut conn) - .await?; - - // Update sorted set index for this instance+day - let _: () = redis::cmd("ZADD") - .arg(&index_key) - .arg(now_ts as f64) - .arg(now_ts.to_string()) - .query_async(&mut conn) - .await?; - - // Trim index to last 1000 entries per day - let _: () = redis::cmd("ZREMRANGEBYRANK") - .arg(&index_key) - .arg(0) - .arg(-1001) - .query_async(&mut conn) - .await?; - - // Set TTL on the index key - let _: () = redis::cmd("EXPIRE") - .arg(&index_key) - .arg(METRICS_TTL_SECS) - .query_async(&mut conn) - .await?; - - tracing::debug!(key = %hash_key, field_count = fields.len(), "metrics flushed to Redis (daily hash)"); - Ok(()) -} - -/// Query metrics for an instance from Redis, reading the most recent `limit` -/// snapshots across all available daily hash buckets. -/// Returns a vector sorted by timestamp ascending. -pub async fn query_instance_metrics( - pool: &Pool, - instance_id: &str, - limit: usize, -) -> anyhow::Result> { - let mut conn = pool.get().await?; - - // Scan for all daily index keys for this instance - let index_pattern = format!("metrics:index:{}:*", instance_id); - let mut cursor = 0u64; - let mut day_keys: Vec = Vec::new(); - loop { - let (new_cursor, keys): (u64, Vec) = redis::cmd("SCAN") - .arg(cursor) - .arg("MATCH") - .arg(&index_pattern) - .arg("COUNT") - .arg(100) - .query_async(&mut conn) - .await?; - day_keys.extend(keys); - cursor = new_cursor; - if cursor == 0 { - break; - } - } - - // Collect all timestamps from all daily indexes - let mut all_timestamps: Vec = Vec::new(); - for day_key in &day_keys { - let timestamps: Vec = redis::cmd("ZREVRANGE") - .arg(day_key) - .arg(0) - .arg((limit as isize - 1).max(0)) - .query_async(&mut conn) - .await?; - all_timestamps.extend(timestamps); - } - - // Sort descending and take limit - all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts)); - all_timestamps.truncate(limit); - - // Read each snapshot from the daily hash - let mut results = Vec::with_capacity(all_timestamps.len()); - for ts in &all_timestamps { - // Derive day bucket from timestamp - let dt = chrono::DateTime::from_timestamp(*ts, 0) - .unwrap_or_else(|| chrono::Utc::now()); - let day_str = dt.format("%Y-%m-%d").to_string(); - let hash_key = format!("metrics:{}:{}", instance_id, day_str); - - let fields: Option> = redis::cmd("HGETALL") - .arg(&hash_key) - .query_async(&mut conn) - .await?; - - if let Some(fields) = fields { - let mut http: HashMap = HashMap::new(); - let mut room: HashMap = HashMap::new(); - for (k, v) in &fields { - let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone())); - if let Some(stripped) = k.strip_prefix("http_") { - http.insert(stripped.to_string(), json_val); - } else if let Some(stripped) = k.strip_prefix("room_") { - room.insert(stripped.to_string(), json_val); - } - } - results.push((*ts, MetricsSnapshot { - instance_id: instance_id.to_string(), - timestamp_secs: *ts, - http, - room, - })); - } - } - - results.sort_by_key(|(ts, _)| *ts); - Ok(results) -} - -/// Flat instance metrics suitable for JSON serialization. -#[derive(serde::Serialize)] -pub struct FlatInstanceMetrics { - pub instance_id: String, - pub timestamp_secs: i64, - pub http: std::collections::HashMap, - pub room: std::collections::HashMap, -} - -/// Query all instances' metrics, optionally filtered by `instance_filter`. -pub async fn query_all_instance_metrics( - pool: &Pool, - instance_filter: &str, - limit: usize, -) -> anyhow::Result> { - let mut conn = pool.get().await?; - - // Scan for all daily index keys to extract instance ids - let mut cursor = 0u64; - let mut all_instance_ids = Vec::new(); - let mut seen = std::collections::HashSet::new(); - loop { - let (new_cursor, keys): (u64, Vec) = redis::cmd("SCAN") - .arg(cursor) - .arg("MATCH") - .arg("metrics:index:*") - .arg("COUNT") - .arg(100) - .query_async(&mut conn) - .await?; - for k in keys { - // Key format: metrics:index:{instance}:{YYYY-MM-DD} - if let Some(rest) = k.strip_prefix("metrics:index:") { - if let Some((instance_id, _)) = rest.rsplit_once(':') { - if seen.insert(instance_id.to_string()) { - all_instance_ids.push(instance_id.to_string()); - } - } - } - } - cursor = new_cursor; - if cursor == 0 { - break; - } - } - - let mut results = Vec::new(); - for instance_id in all_instance_ids { - if !instance_filter.is_empty() && !instance_id.contains(instance_filter) { - continue; - } - let snapshots = query_instance_metrics(pool, &instance_id, limit).await?; - for (_, payload) in snapshots { - let mut http: std::collections::HashMap = std::collections::HashMap::new(); - let mut room: std::collections::HashMap = std::collections::HashMap::new(); - for (k, v) in &payload.http { - http.insert(k.clone(), v.to_string()); - } - for (k, v) in &payload.room { - room.insert(k.clone(), v.to_string()); - } - results.push(FlatInstanceMetrics { - instance_id: payload.instance_id, - timestamp_secs: payload.timestamp_secs, - http, - room, - }); - } - } - - Ok(results) -} - -/// Export all metrics across all known instances as CSV. -pub async fn export_all_metrics_csv( - pool: &Pool, - instance_filter: &str, -) -> anyhow::Result { - let mut conn = pool.get().await?; - - // Scan for all daily index keys to extract instance ids - let mut cursor = 0u64; - let mut all_instance_ids = Vec::new(); - let mut seen = std::collections::HashSet::new(); - loop { - let (new_cursor, keys): (u64, Vec) = redis::cmd("SCAN") - .arg(cursor) - .arg("MATCH") - .arg("metrics:index:*") - .arg("COUNT") - .arg(100) - .query_async(&mut conn) - .await?; - for k in keys { - // Key format: metrics:index:{instance}:{YYYY-MM-DD} - if let Some(rest) = k.strip_prefix("metrics:index:") { - if let Some(instance_id) = rest.rsplit_once(':') { - let id = instance_id.0; - if seen.insert(id.to_string()) { - all_instance_ids.push(id.to_string()); - } - } - } - } - cursor = new_cursor; - if cursor == 0 { - break; - } - } - - let mut rows: Vec = vec![ - "instance_id,timestamp,metric,value".to_string(), - ]; - - for instance_id in all_instance_ids { - if !instance_filter.is_empty() && !instance_id.contains(instance_filter) { - continue; - } - let metrics = query_instance_metrics(pool, &instance_id, 1000).await?; - for (ts, payload) in metrics { - for (k, v) in &payload.http { - rows.push(format!("{},{},http_{},{}", instance_id, ts, k, v)); - } - for (k, v) in &payload.room { - rows.push(format!("{},{},room_{},{}", instance_id, ts, k, v)); - } - } - } - - Ok(rows.join("\n")) -} diff --git a/libs/rpc/admin/server.rs b/libs/rpc/admin/server.rs index 6688753..7f70e0b 100644 --- a/libs/rpc/admin/server.rs +++ b/libs/rpc/admin/server.rs @@ -7,12 +7,11 @@ use tonic::{transport::Server, Request, Response, Status}; use tracing::{info_span, Instrument}; use super::generated::admin::{ - GetMetricsRequest, GetMetricsResponse, GetUserInfoRequest, GetUserInfoResponse, - GetUserStatusRequest, GetUserStatusResponse, GetWorkspaceOnlineUsersRequest, - GetWorkspaceOnlineUsersResponse, InstanceMetrics, IsUserOnlineRequest, IsUserOnlineResponse, - KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, KickUserRequest, KickUserResponse, - ListUserSessionsRequest, ListUserSessionsResponse, ListWorkspaceSessionsRequest, - ListWorkspaceSessionsResponse, ExportMetricsCsvRequest, ExportMetricsCsvResponse, + GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse, + GetWorkspaceOnlineUsersRequest, GetWorkspaceOnlineUsersResponse, IsUserOnlineRequest, + IsUserOnlineResponse, KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, + KickUserRequest, KickUserResponse, ListUserSessionsRequest, ListUserSessionsResponse, + ListWorkspaceSessionsRequest, ListWorkspaceSessionsResponse, }; use super::generated::admin_session_admin::session_admin_server::{ SessionAdmin, SessionAdminServer, @@ -205,40 +204,6 @@ impl SessionAdmin for SessionAdminService { .instrument(info_span!("is_user_online", user_id = %user_id)) .await } - - async fn get_metrics( - &self, - req: Request, - ) -> Result, Status> { - let r = req.get_ref(); - let instance_filter = r.instance_filter.as_str(); - let limit = if r.limit > 0 { r.limit as usize } else { 100 }; - - async { - let pool = self.session_manager.pool(); - let instances = query_all_instance_metrics(pool, instance_filter, limit).await - .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new(GetMetricsResponse { instances })) - } - .instrument(info_span!("get_metrics")) - .await - } - - async fn export_metrics_csv( - &self, - req: Request, - ) -> Result, Status> { - let instance_filter = req.get_ref().instance_filter.as_str(); - - async { - let pool = self.session_manager.pool(); - let csv = export_all_metrics_csv(pool, instance_filter).await - .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new(ExportMetricsCsvResponse { csv })) - } - .instrument(info_span!("export_metrics_csv")) - .await - } } /// Default gRPC admin port. @@ -278,185 +243,3 @@ pub fn spawn( let _ = shutdown_rx.recv().await; }) } - -// --------------------------------------------------------------------------- -// Metrics helpers — mirror of observability::redis_metrics (daily hash format) -// --------------------------------------------------------------------------- - -use deadpool_redis::cluster::Pool; -use serde::{Deserialize, Serialize}; - -/// Snapshot stored in Redis under `metrics:{instance}:{YYYY-MM-DD}` (hash). -#[derive(Debug, Clone, Serialize, Deserialize)] -struct MetricsSnapshot { - instance_id: String, - timestamp_secs: i64, - #[serde(flatten)] - http: std::collections::HashMap, - #[serde(flatten)] - room: std::collections::HashMap, -} - -/// Query metrics for all known instances, optionally filtered by `instance_filter`. -async fn query_all_instance_metrics( - pool: &Pool, - instance_filter: &str, - limit: usize, -) -> anyhow::Result> { - let mut conn = pool.get().await?; - - // Scan for all daily index keys to extract instance ids - let mut cursor = 0u64; - let mut all_instance_ids = Vec::new(); - let mut seen = std::collections::HashSet::new(); - loop { - let (new_cursor, keys): (u64, Vec) = redis::cmd("SCAN") - .arg(cursor) - .arg("MATCH") - .arg("metrics:index:*") - .arg("COUNT") - .arg(100) - .query_async(&mut conn) - .await?; - for k in keys { - // Key format: metrics:index:{instance}:{YYYY-MM-DD} - if let Some(rest) = k.strip_prefix("metrics:index:") { - if let Some((instance_id, _)) = rest.rsplit_once(':') { - if seen.insert(instance_id.to_string()) { - all_instance_ids.push(instance_id.to_string()); - } - } - } - } - cursor = new_cursor; - if cursor == 0 { - break; - } - } - - let mut results = Vec::new(); - for instance_id in all_instance_ids { - if !instance_filter.is_empty() && !instance_id.contains(instance_filter) { - continue; - } - let snapshots = query_instance_metrics(pool, &instance_id, limit).await?; - for (_, payload) in snapshots { - results.push(InstanceMetrics { - instance_id: payload.instance_id, - timestamp_secs: payload.timestamp_secs, - http: payload - .http - .into_iter() - .map(|(k, v)| (k, v.to_string())) - .collect(), - room: payload - .room - .into_iter() - .map(|(k, v)| (k, v.to_string())) - .collect(), - }); - } - } - - Ok(results) -} - -/// Query metrics for a single instance from Redis (daily hash buckets). -async fn query_instance_metrics( - pool: &Pool, - instance_id: &str, - limit: usize, -) -> anyhow::Result> { - let mut conn = pool.get().await?; - - // Scan for all daily index keys for this instance - let index_pattern = format!("metrics:index:{}:*", instance_id); - let mut day_cursor = 0u64; - let mut day_keys: Vec = Vec::new(); - loop { - let (new_cursor, keys): (u64, Vec) = redis::cmd("SCAN") - .arg(day_cursor) - .arg("MATCH") - .arg(&index_pattern) - .arg("COUNT") - .arg(100) - .query_async(&mut conn) - .await?; - day_keys.extend(keys); - day_cursor = new_cursor; - if day_cursor == 0 { - break; - } - } - - // Collect all timestamps from all daily indexes - let mut all_timestamps: Vec = Vec::new(); - for day_key in &day_keys { - let timestamps: Vec = redis::cmd("ZREVRANGE") - .arg(day_key) - .arg(0) - .arg((limit as isize - 1).max(0)) - .query_async(&mut conn) - .await?; - all_timestamps.extend(timestamps); - } - - all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts)); - all_timestamps.truncate(limit); - - let mut results = Vec::with_capacity(all_timestamps.len()); - for ts in &all_timestamps { - let dt = chrono::DateTime::from_timestamp(*ts, 0) - .unwrap_or_else(chrono::Utc::now); - let day_str = dt.format("%Y-%m-%d").to_string(); - let hash_key = format!("metrics:{}:{}", instance_id, day_str); - - let fields: Option> = redis::cmd("HGETALL") - .arg(&hash_key) - .query_async(&mut conn) - .await?; - - if let Some(fields) = fields { - let mut http = std::collections::HashMap::new(); - let mut room = std::collections::HashMap::new(); - for (k, v) in &fields { - let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone())); - if let Some(stripped) = k.strip_prefix("http_") { - http.insert(stripped.to_string(), json_val); - } else if let Some(stripped) = k.strip_prefix("room_") { - room.insert(stripped.to_string(), json_val); - } - } - results.push((*ts, MetricsSnapshot { - instance_id: instance_id.to_string(), - timestamp_secs: *ts, - http, - room, - })); - } - } - - results.sort_by_key(|(ts, _)| *ts); - Ok(results) -} - -/// Export all metrics as CSV. -async fn export_all_metrics_csv(pool: &Pool, instance_filter: &str) -> anyhow::Result { - let instances = query_all_instance_metrics(pool, instance_filter, 1000).await?; - let mut rows = Vec::new(); - for inst in instances { - let ts = inst.timestamp_secs; - let id = inst.instance_id; - for (k, v) in inst.http { - rows.push(format!("{},{},http_{},{}", id, ts, k, v)); - } - for (k, v) in inst.room { - rows.push(format!("{},{},room_{},{}", id, ts, k, v)); - } - } - let header = "instance_id,timestamp,metric,value"; - if rows.is_empty() { - return Ok(header.to_string()); - } - Ok(format!("{}\n{}", header, rows.join("\n"))) -}