chore(admin): remove all metrics/observability features

- Delete admin metrics dashboard page (admin/metrics/page.tsx)
- Delete LineChart component (used only by metrics)
- Remove "指标监控" nav link from Sidebar
- Remove getMetrics/exportMetricsCsv from admin-rpc.ts client
- Remove /api/admin/metrics and /api/admin/metrics/export HTTP routes
  from adminrpc (was also leaking metrics via HTTP)
- Remove metrics RPC methods (get_metrics, export_metrics_csv) from
  adminrpc gRPC server and their helper functions
- Remove spawn_redis_metrics_flusher from app/main.rs
- Remove redis_metrics module from observability crate
- Remove redis/deadpool-redis deps from observability Cargo.toml
This commit is contained in:
ZhenYi 2026-04-23 15:42:00 +08:00
parent 3773fdc780
commit ae601774df
11 changed files with 7 additions and 1012 deletions

2
Cargo.lock generated
View File

@ -5181,7 +5181,6 @@ dependencies = [
"actix-web",
"anyhow",
"chrono",
"deadpool-redis",
"futures",
"hostname",
"metrics",
@ -5191,7 +5190,6 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-otlp",
"opentelemetry_sdk",
"redis",
"reqwest 0.13.2",
"serde",
"serde_json",

View File

@ -1,199 +0,0 @@
"use client";
import { useEffect, useState } from "react";
import { format } from "date-fns";
import { getMetrics, exportMetricsCsv, type InstanceMetrics } from "@/lib/admin-rpc";
export default function MetricsPage() {
const [metrics, setMetrics] = useState<InstanceMetrics[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState("");
const [exporting, setExporting] = useState(false);
const [instanceFilter, setInstanceFilter] = useState("");
useEffect(() => { loadMetrics(); }, []);
async function loadMetrics() {
setLoading(true);
setError("");
try {
const data = await getMetrics(instanceFilter);
setMetrics(data);
} catch (e) {
setError(e instanceof Error ? e.message : "加载指标失败");
} finally {
setLoading(false);
}
}
async function handleExportCsv() {
setExporting(true);
try {
const csv = await exportMetricsCsv(instanceFilter);
const blob = new Blob([csv], { type: "text/csv;charset=utf-8;" });
const url = URL.createObjectURL(blob);
const a = document.createElement("a");
a.href = url;
a.download = `metrics-${format(new Date(), "yyyy-MM-dd_HHmm")}.csv`;
a.click();
URL.revokeObjectURL(url);
} catch (e) {
alert("导出失败: " + (e instanceof Error ? e.message : String(e)));
} finally {
setExporting(false);
}
}
// Group by instance
const byInstance = metrics.reduce<Record<string, InstanceMetrics[]>>((acc, m) => {
if (!acc[m.instance_id]) acc[m.instance_id] = [];
acc[m.instance_id].push(m);
return acc;
}, {});
const allInstances = Object.keys(byInstance).sort();
const latestOf = (arr: InstanceMetrics[]) =>
arr.sort((a, b) => b.timestamp_secs - a.timestamp_secs)[0];
// Flatten all latest snapshots for a summary table
const latestSnapshots = allInstances.map((id) => ({
id,
latest: latestOf(byInstance[id]),
}));
function metricVal(m: InstanceMetrics | undefined, prefix: string, key: string): string {
if (!m) return "—";
const v = prefix === "http" ? m.http[key] : m.room[key];
return v ?? "—";
}
return (
<div className="admin-content">
<div className="page-header" style={{ display: "flex", justifyContent: "space-between", alignItems: "flex-start" }}>
<div>
<h1 className="page-title"></h1>
<p className="page-subtitle">
{allInstances.length}
{metrics.length > 0 && (
<span style={{ marginLeft: "8px", fontSize: "12px", color: "#737373" }}>
: {metrics.length > 0
? format(new Date(latestSnapshots[0]?.latest.timestamp_secs * 1000), "yyyy-MM-dd HH:mm:ss")
: "—"}
</span>
)}
</p>
</div>
<div style={{ display: "flex", gap: "8px", alignItems: "center" }}>
<input
type="text"
className="form-input"
style={{ width: "200px" }}
placeholder="过滤实例名称..."
value={instanceFilter}
onChange={(e) => { setInstanceFilter(e.target.value); }}
/>
<button className="btn btn-secondary" onClick={loadMetrics} disabled={loading}>
{loading ? "加载中..." : "刷新"}
</button>
<button className="btn btn-primary" onClick={handleExportCsv} disabled={exporting}>
{exporting ? "导出中..." : "导出 CSV"}
</button>
</div>
</div>
{error && (
<div className="alert alert-error" style={{ marginBottom: "16px" }}>
{error} adminrpc 默认: http://adminrpc.admin.svc.cluster.local:9091
</div>
)}
{loading ? (
<div className="loading">...</div>
) : allInstances.length === 0 ? (
<div className="card">
<div className="empty-state">
<div className="empty-state-icon"></div>
<p></p>
<p style={{ fontSize: "13px", color: "#737373" }}>
Redis 5
</p>
</div>
</div>
) : (
<>
{/* Summary table */}
<div className="card" style={{ marginBottom: "16px" }}>
<h3 style={{ margin: "0 0 12px 0", fontSize: "14px", fontWeight: 600 }}></h3>
<div className="table-container">
<table className="data-table">
<thead>
<tr>
<th> ID</th>
<th></th>
<th> WS </th>
<th></th>
<th>HTTP </th>
<th>HTTP </th>
</tr>
</thead>
<tbody>
{latestSnapshots.map(({ id, latest }) => (
<tr key={id}>
<td><code style={{ fontSize: "12px" }}>{id}</code></td>
<td>{format(new Date(latest.timestamp_secs * 1000), "HH:mm:ss")}</td>
<td>{metricVal(latest, "room", "ws_connections_active")}</td>
<td>{metricVal(latest, "room", "messages_sent_total")}</td>
<td>{metricVal(latest, "http", "request_count")}</td>
<td>{metricVal(latest, "http", "error_count")}</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
{/* Per-instance detail */}
{allInstances.map((instanceId) => (
<div key={instanceId} className="card" style={{ marginBottom: "16px" }}>
<h3 style={{ margin: "0 0 12px 0", fontSize: "14px", fontWeight: 600 }}>
: <code>{instanceId}</code>
</h3>
<div style={{ display: "grid", gridTemplateColumns: "1fr 1fr", gap: "16px" }}>
{/* Room metrics */}
<div>
<h4 style={{ fontSize: "13px", color: "#737373", margin: "0 0 8px 0" }}>Room </h4>
<div style={{ display: "flex", flexDirection: "column", gap: "4px", fontSize: "13px" }}>
{Object.entries(latestOf(byInstance[instanceId])?.room ?? {}).map(([k, v]) => (
<div key={k} style={{ display: "flex", justifyContent: "space-between", padding: "2px 0", borderBottom: "1px solid #f4f4f5" }}>
<span style={{ color: "#525252" }}>{k}</span>
<code style={{ fontSize: "12px", color: "#171717" }}>{v}</code>
</div>
))}
{Object.keys(latestOf(byInstance[instanceId])?.room ?? {}).length === 0 && (
<div style={{ color: "#a3a3a3", fontSize: "12px" }}></div>
)}
</div>
</div>
{/* HTTP metrics */}
<div>
<h4 style={{ fontSize: "13px", color: "#737373", margin: "0 0 8px 0" }}>HTTP </h4>
<div style={{ display: "flex", flexDirection: "column", gap: "4px", fontSize: "13px" }}>
{Object.entries(latestOf(byInstance[instanceId])?.http ?? {}).map(([k, v]) => (
<div key={k} style={{ display: "flex", justifyContent: "space-between", padding: "2px 0", borderBottom: "1px solid #f4f4f5" }}>
<span style={{ color: "#525252" }}>{k}</span>
<code style={{ fontSize: "12px", color: "#171717" }}>{v}</code>
</div>
))}
{Object.keys(latestOf(byInstance[instanceId])?.http ?? {}).length === 0 && (
<div style={{ color: "#a3a3a3", fontSize: "12px" }}></div>
)}
</div>
</div>
</div>
</div>
))}
</>
)}
</div>
);
}

View File

@ -1,169 +0,0 @@
"use client";
import { format, parseISO } from "date-fns";
interface DataPoint {
date: string;
value: number;
}
interface LineChartProps {
data: DataPoint[];
width?: number;
height?: number;
color?: string;
label: string;
unit?: string;
}
export default function LineChart({
data,
width = 600,
height = 200,
color = "#6366f1",
label,
unit = "",
}: LineChartProps) {
if (data.length === 0) {
return (
<div style={{ color: "#737373", fontSize: "13px", textAlign: "center", padding: "32px" }}>
</div>
);
}
const padding = { top: 16, right: 16, bottom: 36, left: 44 };
const chartWidth = width - padding.left - padding.right;
const chartHeight = height - padding.top - padding.bottom;
const values = data.map((d) => d.value);
const maxValue = Math.max(...values, 1);
const minValue = 0;
function xScale(i: number) {
return padding.left + (i / (data.length - 1 || 1)) * chartWidth;
}
function yScale(v: number) {
return padding.top + chartHeight - ((v - minValue) / (maxValue - minValue || 1)) * chartHeight;
}
// Build SVG path
const pathD = data
.map((d, i) => `${i === 0 ? "M" : "L"} ${xScale(i).toFixed(1)} ${yScale(d.value).toFixed(1)}`)
.join(" ");
// Area path (fill under the line)
const areaD =
pathD +
` L ${xScale(data.length - 1).toFixed(1)} ${(padding.top + chartHeight).toFixed(1)}` +
` L ${padding.left} ${(padding.top + chartHeight).toFixed(1)} Z`;
// Y-axis ticks (5 ticks)
const yTicks = Array.from({ length: 5 }, (_, i) => {
const v = minValue + ((maxValue - minValue) * i) / 4;
return { v, y: yScale(v) };
});
// X-axis ticks (show every nth label based on data length)
const step = Math.max(1, Math.floor(data.length / 6));
const xTicks = data.filter((_, i) => i % step === 0 || i === data.length - 1);
const gradientId = `grad-${label.replace(/\s/g, "-")}`;
return (
<svg
width={width}
height={height}
style={{ display: "block", maxWidth: "100%" }}
aria-label={`${label} 趋势图`}
>
<defs>
<linearGradient id={gradientId} x1="0" y1="0" x2="0" y2="1">
<stop offset="0%" stopColor={color} stopOpacity={0.2} />
<stop offset="100%" stopColor={color} stopOpacity={0} />
</linearGradient>
</defs>
{/* Y-axis gridlines */}
{yTicks.map((t, i) => (
<line
key={i}
x1={padding.left}
y1={t.y}
x2={width - padding.right}
y2={t.y}
stroke="#e5e5e5"
strokeWidth={1}
/>
))}
{/* Y-axis labels */}
{yTicks.map((t, i) => (
<text
key={i}
x={padding.left - 6}
y={t.y + 4}
textAnchor="end"
fontSize={11}
fill="#737373"
>
{Math.round(t.v)}
</text>
))}
{/* X-axis labels */}
{xTicks.map((d, i) => {
const idx = data.findIndex((x) => x.date === d.date);
return (
<text
key={i}
x={xScale(idx)}
y={padding.top + chartHeight + 16}
textAnchor="middle"
fontSize={11}
fill="#737373"
>
{(() => {
try {
return format(parseISO(d.date), "MM-dd");
} catch {
return d.date.slice(5);
}
})()}
</text>
);
})}
{/* Area fill */}
<path d={areaD} fill={`url(#${gradientId})`} />
{/* Line */}
<path d={pathD} fill="none" stroke={color} strokeWidth={2} strokeLinejoin="round" />
{/* Data points (dots) */}
{data.map((d, i) => (
<circle
key={i}
cx={xScale(i)}
cy={yScale(d.value)}
r={3}
fill={color}
stroke="white"
strokeWidth={1.5}
style={{ cursor: "default" }}
>
<title>
{(() => {
try {
return `${format(parseISO(d.date), "yyyy-MM-dd")}: ${d.value}${unit}`;
} catch {
return `${d.date}: ${d.value}${unit}`;
}
})()}
</title>
</circle>
))}
</svg>
);
}

View File

@ -52,7 +52,6 @@ export default function Sidebar({ user, loading, onLogout }: SidebarProps) {
{ href: "/admin/sessions", label: "Admin 会话", icon: "◐" },
{ href: "/platform/sessions", label: "平台会话", icon: "☀" },
{ href: "/admin/api-tokens", label: "API Token", icon: "⚿" },
{ href: "/admin/metrics", label: "指标监控", icon: "◉" },
{ href: "/admin/daily-report", label: "每日报告", icon: "📧" },
],
},

View File

@ -2,7 +2,7 @@
* adminrpc HTTP REST client
*
* Calls the adminrpc HTTP server (default: http://gitdata-adminrpc.gitdataai.svc.cluster.local:9091)
* which exposes the same session management and metrics APIs as the gRPC service.
* which exposes session management and health APIs.
*
* Usage:
* import { listWorkspaceSessions, kickUser } from "@/lib/admin-rpc";
@ -99,31 +99,6 @@ export async function kickUserFromWorkspace(
});
}
// ─── Metrics API ─────────────────────────────────────────────────────────────
export interface InstanceMetrics {
instance_id: string;
timestamp_secs: number;
http: Record<string, string>;
room: Record<string, string>;
}
/** Get metrics across all app instances. */
export async function getMetrics(instanceFilter = ""): Promise<InstanceMetrics[]> {
const qs = instanceFilter ? `?instance_filter=${encodeURIComponent(instanceFilter)}` : "";
return rpc<InstanceMetrics[]>(`/api/admin/metrics${qs}`);
}
/** Export all metrics as CSV string. */
export async function exportMetricsCsv(instanceFilter = ""): Promise<string> {
const qs = instanceFilter ? `?instance_filter=${encodeURIComponent(instanceFilter)}` : "";
const res = await fetch(`${BASE_URL}/api/admin/metrics/export${qs}`);
if (!res.ok) {
throw new Error(`adminrpc GET /metrics/export failed (${res.status})`);
}
return res.text();
}
/** Get adminrpc health status. */
export async function adminRpcHealth(): Promise<{ ok: boolean }> {
return rpc<{ ok: boolean }>("/health");

View File

@ -78,15 +78,12 @@ async fn main() -> anyhow::Result<()> {
});
let http_handle = tokio::spawn(async move {
let pool_for_http = pool.clone();
let sm_for_http = session_manager.clone();
let result = HttpServer::new(move || {
ActixApp::new()
.app_data(web::Data::new(pool_for_http.clone()))
.app_data(web::Data::new(sm_for_http.clone()))
.route("/health", web::get().to(health))
.route("/admin/metrics/export", web::get().to(metrics_export))
.service(
web::scope("/api/admin")
.route(
@ -117,10 +114,7 @@ async fn main() -> anyhow::Result<()> {
.route(
"/sessions/kick-workspace",
web::post().to(kick_user_from_workspace),
)
// Metrics
.route("/metrics", web::get().to(get_metrics))
.route("/metrics/export", web::get().to(metrics_export)),
),
)
})
.bind(admin_addr)
@ -149,26 +143,6 @@ async fn health() -> HttpResponse {
HttpResponse::Ok().json(serde_json::json!({ "ok": true }))
}
async fn metrics_export(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::export_all_metrics_csv(pool.get_ref(), "").await {
Ok(csv) => HttpResponse::Ok()
.content_type("text/csv; charset=utf-8")
.body(csv),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_metrics(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::query_all_instance_metrics(pool.get_ref(), "", 100).await {
Ok(instances) => HttpResponse::Ok().json(instances),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
fn parse_uuid(s: &str) -> Option<Uuid> {
Uuid::parse_str(s).ok()
}

View File

@ -7,7 +7,6 @@ use db::cache::AppCache;
use db::database::AppDatabase;
use observability::{
init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller,
spawn_redis_metrics_flusher,
HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware, instance_id,
};
use sea_orm::ConnectionTrait;
@ -102,18 +101,6 @@ async fn main() -> anyhow::Result<()> {
);
let http_snapshot_data = web::Data::new(http_snapshot);
// ── Redis metrics flusher (every 5s → key: metrics:{instance}:{ts}) ──────
let http_for_flusher = http_metrics.clone();
let prometheus_for_flusher = prometheus_handle_arc.clone();
spawn_redis_metrics_flusher(
cache.redis_pool().clone(),
instance_id(),
prometheus_for_flusher,
http_for_flusher,
std::time::Duration::from_secs(5),
);
tracing::info!("Redis metrics flusher started (5s interval)");
let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string());
tracing::info!(bind_addr = %bind_addr, "Listening");
let http_metrics_server = http_metrics.clone();

View File

@ -25,10 +25,6 @@ serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["rt"] }
anyhow = { workspace = true }
# Redis (for metrics exporter)
redis = { workspace = true }
deadpool-redis = { workspace = true, features = ["cluster"] }
# Prometheus metrics export
metrics = "0.22"
metrics-exporter-prometheus = "0.13"

View File

@ -9,7 +9,6 @@ pub mod metrics_middleware;
pub mod prometheus_exporter;
pub mod otlp;
pub mod tracing_middleware;
pub mod redis_metrics;
pub use tracing_fmt::{init_tracing_subscriber, instance_id};
pub use metrics_middleware::{MetricsMiddleware, HttpMetrics};
@ -19,9 +18,3 @@ pub use prometheus_exporter::{
};
pub use otlp::{init_otlp, OtelGuard};
pub use tracing_middleware::TracingSpanMiddleware;
pub use redis_metrics::{
MetricsSnapshot,
spawn_redis_metrics_flusher,
query_instance_metrics, query_all_instance_metrics, export_all_metrics_csv,
FlatInstanceMetrics,
};

View File

@ -1,342 +0,0 @@
//! Redis-backed metrics exporter.
//!
//! Every `interval` seconds this module pushes metric fields to a Redis hash
//! bucketed by day:
//! Key: `metrics:{instance_id}:{YYYY-MM-DD}` (hash)
//! TTL: 30 days per day bucket
//!
//! Also maintains a sorted set `metrics:index:{instance_id}:{YYYY-MM-DD}` per
//! day for querying within a day window.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use deadpool_redis::cluster::Pool;
use serde::{Deserialize, Serialize};
const METRICS_TTL_SECS: i64 = 30 * 24 * 60 * 60; // 30 days
/// A snapshot of metric values at a point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
pub instance_id: String,
pub timestamp_secs: i64,
#[serde(flatten)]
pub http: HashMap<String, serde_json::Value>,
#[serde(flatten)]
pub room: HashMap<String, serde_json::Value>,
}
/// Starts a background task that snapshots and pushes all metrics to Redis
/// every `interval` seconds.
///
/// `prometheus_handle`: from `install_recorder()`, used to render RoomMetrics values
/// `http_metrics`: shared `HttpMetrics` from `MetricsMiddleware`
/// `instance_id`: used as the Redis key prefix
pub fn spawn_redis_metrics_flusher(
redis_pool: Pool,
instance_id: String,
prometheus_handle: Arc<metrics_exporter_prometheus::PrometheusHandle>,
http_metrics: Arc<crate::metrics_middleware::HttpMetrics>,
interval: Duration,
) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
loop {
ticker.tick().await;
if let Err(e) = flush_metrics(&redis_pool, &instance_id, &prometheus_handle, http_metrics.as_ref()).await {
tracing::warn!(error = %e, "failed to flush metrics to Redis");
}
}
});
}
async fn flush_metrics(
pool: &Pool,
instance_id: &str,
prometheus_handle: &Arc<metrics_exporter_prometheus::PrometheusHandle>,
http_metrics: &crate::metrics_middleware::HttpMetrics,
) -> anyhow::Result<()> {
let now = chrono::Utc::now();
let now_ts = now.timestamp();
let day_key = now.format("%Y-%m-%d").to_string();
// RoomMetrics from the metrics crate (rendered via PrometheusHandle)
let body = prometheus_handle.render();
let room = crate::prometheus_exporter::render_to_hashmap(&body);
// HTTP metrics from AtomicU64
let http = http_metrics.snapshot();
let hash_key = format!("metrics:{}:{}", instance_id, day_key);
let index_key = format!("metrics:index:{}:{}", instance_id, day_key);
let mut conn = pool.get().await?;
// HSET each metric field into the daily hash
let mut fields: Vec<(String, String)> = Vec::with_capacity(http.len() + room.len());
for (k, v) in &http {
fields.push((format!("http_{}", k), v.to_string()));
}
for (k, v) in &room {
fields.push((format!("room_{}", k), v.to_string()));
}
// HSET field field_value ...
let mut cmd = redis::cmd("HSET");
cmd.arg(&hash_key);
for (f, val) in &fields {
cmd.arg(f).arg(val);
}
let _: () = cmd.query_async(&mut conn).await?;
// Set TTL on the hash key (refresh every write)
let _: () = redis::cmd("EXPIRE")
.arg(&hash_key)
.arg(METRICS_TTL_SECS)
.query_async(&mut conn)
.await?;
// Update sorted set index for this instance+day
let _: () = redis::cmd("ZADD")
.arg(&index_key)
.arg(now_ts as f64)
.arg(now_ts.to_string())
.query_async(&mut conn)
.await?;
// Trim index to last 1000 entries per day
let _: () = redis::cmd("ZREMRANGEBYRANK")
.arg(&index_key)
.arg(0)
.arg(-1001)
.query_async(&mut conn)
.await?;
// Set TTL on the index key
let _: () = redis::cmd("EXPIRE")
.arg(&index_key)
.arg(METRICS_TTL_SECS)
.query_async(&mut conn)
.await?;
tracing::debug!(key = %hash_key, field_count = fields.len(), "metrics flushed to Redis (daily hash)");
Ok(())
}
/// Query metrics for an instance from Redis, reading the most recent `limit`
/// snapshots across all available daily hash buckets.
/// Returns a vector sorted by timestamp ascending.
pub async fn query_instance_metrics(
pool: &Pool,
instance_id: &str,
limit: usize,
) -> anyhow::Result<Vec<(i64, MetricsSnapshot)>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys for this instance
let index_pattern = format!("metrics:index:{}:*", instance_id);
let mut cursor = 0u64;
let mut day_keys: Vec<String> = Vec::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&index_pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
day_keys.extend(keys);
cursor = new_cursor;
if cursor == 0 {
break;
}
}
// Collect all timestamps from all daily indexes
let mut all_timestamps: Vec<i64> = Vec::new();
for day_key in &day_keys {
let timestamps: Vec<i64> = redis::cmd("ZREVRANGE")
.arg(day_key)
.arg(0)
.arg((limit as isize - 1).max(0))
.query_async(&mut conn)
.await?;
all_timestamps.extend(timestamps);
}
// Sort descending and take limit
all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts));
all_timestamps.truncate(limit);
// Read each snapshot from the daily hash
let mut results = Vec::with_capacity(all_timestamps.len());
for ts in &all_timestamps {
// Derive day bucket from timestamp
let dt = chrono::DateTime::from_timestamp(*ts, 0)
.unwrap_or_else(|| chrono::Utc::now());
let day_str = dt.format("%Y-%m-%d").to_string();
let hash_key = format!("metrics:{}:{}", instance_id, day_str);
let fields: Option<HashMap<String, String>> = redis::cmd("HGETALL")
.arg(&hash_key)
.query_async(&mut conn)
.await?;
if let Some(fields) = fields {
let mut http: HashMap<String, serde_json::Value> = HashMap::new();
let mut room: HashMap<String, serde_json::Value> = HashMap::new();
for (k, v) in &fields {
let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
if let Some(stripped) = k.strip_prefix("http_") {
http.insert(stripped.to_string(), json_val);
} else if let Some(stripped) = k.strip_prefix("room_") {
room.insert(stripped.to_string(), json_val);
}
}
results.push((*ts, MetricsSnapshot {
instance_id: instance_id.to_string(),
timestamp_secs: *ts,
http,
room,
}));
}
}
results.sort_by_key(|(ts, _)| *ts);
Ok(results)
}
/// Flat instance metrics suitable for JSON serialization.
#[derive(serde::Serialize)]
pub struct FlatInstanceMetrics {
pub instance_id: String,
pub timestamp_secs: i64,
pub http: std::collections::HashMap<String, String>,
pub room: std::collections::HashMap<String, String>,
}
/// Query all instances' metrics, optionally filtered by `instance_filter`.
pub async fn query_all_instance_metrics(
pool: &Pool,
instance_filter: &str,
limit: usize,
) -> anyhow::Result<Vec<FlatInstanceMetrics>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys to extract instance ids
let mut cursor = 0u64;
let mut all_instance_ids = Vec::new();
let mut seen = std::collections::HashSet::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("metrics:index:*")
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
for k in keys {
// Key format: metrics:index:{instance}:{YYYY-MM-DD}
if let Some(rest) = k.strip_prefix("metrics:index:") {
if let Some((instance_id, _)) = rest.rsplit_once(':') {
if seen.insert(instance_id.to_string()) {
all_instance_ids.push(instance_id.to_string());
}
}
}
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
let mut results = Vec::new();
for instance_id in all_instance_ids {
if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
continue;
}
let snapshots = query_instance_metrics(pool, &instance_id, limit).await?;
for (_, payload) in snapshots {
let mut http: std::collections::HashMap<String, String> = std::collections::HashMap::new();
let mut room: std::collections::HashMap<String, String> = std::collections::HashMap::new();
for (k, v) in &payload.http {
http.insert(k.clone(), v.to_string());
}
for (k, v) in &payload.room {
room.insert(k.clone(), v.to_string());
}
results.push(FlatInstanceMetrics {
instance_id: payload.instance_id,
timestamp_secs: payload.timestamp_secs,
http,
room,
});
}
}
Ok(results)
}
/// Export all metrics across all known instances as CSV.
pub async fn export_all_metrics_csv(
pool: &Pool,
instance_filter: &str,
) -> anyhow::Result<String> {
let mut conn = pool.get().await?;
// Scan for all daily index keys to extract instance ids
let mut cursor = 0u64;
let mut all_instance_ids = Vec::new();
let mut seen = std::collections::HashSet::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("metrics:index:*")
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
for k in keys {
// Key format: metrics:index:{instance}:{YYYY-MM-DD}
if let Some(rest) = k.strip_prefix("metrics:index:") {
if let Some(instance_id) = rest.rsplit_once(':') {
let id = instance_id.0;
if seen.insert(id.to_string()) {
all_instance_ids.push(id.to_string());
}
}
}
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
let mut rows: Vec<String> = vec![
"instance_id,timestamp,metric,value".to_string(),
];
for instance_id in all_instance_ids {
if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
continue;
}
let metrics = query_instance_metrics(pool, &instance_id, 1000).await?;
for (ts, payload) in metrics {
for (k, v) in &payload.http {
rows.push(format!("{},{},http_{},{}", instance_id, ts, k, v));
}
for (k, v) in &payload.room {
rows.push(format!("{},{},room_{},{}", instance_id, ts, k, v));
}
}
}
Ok(rows.join("\n"))
}

View File

@ -7,12 +7,11 @@ use tonic::{transport::Server, Request, Response, Status};
use tracing::{info_span, Instrument};
use super::generated::admin::{
GetMetricsRequest, GetMetricsResponse, GetUserInfoRequest, GetUserInfoResponse,
GetUserStatusRequest, GetUserStatusResponse, GetWorkspaceOnlineUsersRequest,
GetWorkspaceOnlineUsersResponse, InstanceMetrics, IsUserOnlineRequest, IsUserOnlineResponse,
KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, KickUserRequest, KickUserResponse,
ListUserSessionsRequest, ListUserSessionsResponse, ListWorkspaceSessionsRequest,
ListWorkspaceSessionsResponse, ExportMetricsCsvRequest, ExportMetricsCsvResponse,
GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse,
GetWorkspaceOnlineUsersRequest, GetWorkspaceOnlineUsersResponse, IsUserOnlineRequest,
IsUserOnlineResponse, KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse,
KickUserRequest, KickUserResponse, ListUserSessionsRequest, ListUserSessionsResponse,
ListWorkspaceSessionsRequest, ListWorkspaceSessionsResponse,
};
use super::generated::admin_session_admin::session_admin_server::{
SessionAdmin, SessionAdminServer,
@ -205,40 +204,6 @@ impl SessionAdmin for SessionAdminService {
.instrument(info_span!("is_user_online", user_id = %user_id))
.await
}
async fn get_metrics(
&self,
req: Request<GetMetricsRequest>,
) -> Result<Response<GetMetricsResponse>, Status> {
let r = req.get_ref();
let instance_filter = r.instance_filter.as_str();
let limit = if r.limit > 0 { r.limit as usize } else { 100 };
async {
let pool = self.session_manager.pool();
let instances = query_all_instance_metrics(pool, instance_filter, limit).await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(GetMetricsResponse { instances }))
}
.instrument(info_span!("get_metrics"))
.await
}
async fn export_metrics_csv(
&self,
req: Request<ExportMetricsCsvRequest>,
) -> Result<Response<ExportMetricsCsvResponse>, Status> {
let instance_filter = req.get_ref().instance_filter.as_str();
async {
let pool = self.session_manager.pool();
let csv = export_all_metrics_csv(pool, instance_filter).await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(ExportMetricsCsvResponse { csv }))
}
.instrument(info_span!("export_metrics_csv"))
.await
}
}
/// Default gRPC admin port.
@ -278,185 +243,3 @@ pub fn spawn(
let _ = shutdown_rx.recv().await;
})
}
// ---------------------------------------------------------------------------
// Metrics helpers — mirror of observability::redis_metrics (daily hash format)
// ---------------------------------------------------------------------------
use deadpool_redis::cluster::Pool;
use serde::{Deserialize, Serialize};
/// Snapshot stored in Redis under `metrics:{instance}:{YYYY-MM-DD}` (hash).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MetricsSnapshot {
instance_id: String,
timestamp_secs: i64,
#[serde(flatten)]
http: std::collections::HashMap<String, serde_json::Value>,
#[serde(flatten)]
room: std::collections::HashMap<String, serde_json::Value>,
}
/// Query metrics for all known instances, optionally filtered by `instance_filter`.
async fn query_all_instance_metrics(
pool: &Pool,
instance_filter: &str,
limit: usize,
) -> anyhow::Result<Vec<InstanceMetrics>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys to extract instance ids
let mut cursor = 0u64;
let mut all_instance_ids = Vec::new();
let mut seen = std::collections::HashSet::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("metrics:index:*")
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
for k in keys {
// Key format: metrics:index:{instance}:{YYYY-MM-DD}
if let Some(rest) = k.strip_prefix("metrics:index:") {
if let Some((instance_id, _)) = rest.rsplit_once(':') {
if seen.insert(instance_id.to_string()) {
all_instance_ids.push(instance_id.to_string());
}
}
}
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
let mut results = Vec::new();
for instance_id in all_instance_ids {
if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
continue;
}
let snapshots = query_instance_metrics(pool, &instance_id, limit).await?;
for (_, payload) in snapshots {
results.push(InstanceMetrics {
instance_id: payload.instance_id,
timestamp_secs: payload.timestamp_secs,
http: payload
.http
.into_iter()
.map(|(k, v)| (k, v.to_string()))
.collect(),
room: payload
.room
.into_iter()
.map(|(k, v)| (k, v.to_string()))
.collect(),
});
}
}
Ok(results)
}
/// Query metrics for a single instance from Redis (daily hash buckets).
async fn query_instance_metrics(
pool: &Pool,
instance_id: &str,
limit: usize,
) -> anyhow::Result<Vec<(i64, MetricsSnapshot)>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys for this instance
let index_pattern = format!("metrics:index:{}:*", instance_id);
let mut day_cursor = 0u64;
let mut day_keys: Vec<String> = Vec::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(day_cursor)
.arg("MATCH")
.arg(&index_pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
day_keys.extend(keys);
day_cursor = new_cursor;
if day_cursor == 0 {
break;
}
}
// Collect all timestamps from all daily indexes
let mut all_timestamps: Vec<i64> = Vec::new();
for day_key in &day_keys {
let timestamps: Vec<i64> = redis::cmd("ZREVRANGE")
.arg(day_key)
.arg(0)
.arg((limit as isize - 1).max(0))
.query_async(&mut conn)
.await?;
all_timestamps.extend(timestamps);
}
all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts));
all_timestamps.truncate(limit);
let mut results = Vec::with_capacity(all_timestamps.len());
for ts in &all_timestamps {
let dt = chrono::DateTime::from_timestamp(*ts, 0)
.unwrap_or_else(chrono::Utc::now);
let day_str = dt.format("%Y-%m-%d").to_string();
let hash_key = format!("metrics:{}:{}", instance_id, day_str);
let fields: Option<std::collections::HashMap<String, String>> = redis::cmd("HGETALL")
.arg(&hash_key)
.query_async(&mut conn)
.await?;
if let Some(fields) = fields {
let mut http = std::collections::HashMap::new();
let mut room = std::collections::HashMap::new();
for (k, v) in &fields {
let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
if let Some(stripped) = k.strip_prefix("http_") {
http.insert(stripped.to_string(), json_val);
} else if let Some(stripped) = k.strip_prefix("room_") {
room.insert(stripped.to_string(), json_val);
}
}
results.push((*ts, MetricsSnapshot {
instance_id: instance_id.to_string(),
timestamp_secs: *ts,
http,
room,
}));
}
}
results.sort_by_key(|(ts, _)| *ts);
Ok(results)
}
/// Export all metrics as CSV.
async fn export_all_metrics_csv(pool: &Pool, instance_filter: &str) -> anyhow::Result<String> {
let instances = query_all_instance_metrics(pool, instance_filter, 1000).await?;
let mut rows = Vec::new();
for inst in instances {
let ts = inst.timestamp_secs;
let id = inst.instance_id;
for (k, v) in inst.http {
rows.push(format!("{},{},http_{},{}", id, ts, k, v));
}
for (k, v) in inst.room {
rows.push(format!("{},{},room_{},{}", id, ts, k, v));
}
}
let header = "instance_id,timestamp,metric,value";
if rows.is_empty() {
return Ok(header.to_string());
}
Ok(format!("{}\n{}", header, rows.join("\n")))
}