Compare commits

...

13 Commits

Author SHA1 Message Date
ZhenYi
beee62832f fix(room): reasoning chain fallback, streaming error messages, borrow fixes
Some checks are pending
CI / Rust Tests (push) Waiting to run
CI / Frontend Lint & Type Check (push) Waiting to run
CI / Rust Lint & Check (push) Waiting to run
CI / Frontend Build (push) Blocked by required conditions
- ReAct streaming: collect all ReactStep chunks into reasoning_buffer;
  if no Answer step is emitted, persist the full reasoning chain instead
  of empty content
- All AI error paths (reasoning loop failure, non-streaming errors) now
  send user-visible [AI error: ...] messages instead of silently dropping
- Fix borrow checker: clone content before struct init, use should_log bool
  to avoid double-borrow on err_msg
2026-04-24 13:17:20 +08:00
ZhenYi
d89d02e81b fix(agent): surface FC/tool errors as observations to continue ReAct loop
- Streaming path: on tool_call execution error, emit an [Observation]
  chunk so the model sees the failure and can retry/adapt
- Non-streaming path: inject error as a user message so the loop
  continues with error context, not silently stop
2026-04-24 13:17:13 +08:00
ZhenYi
94825316dc fix(agent): extract JSON from model output even with leading text prefix
ReAct loop was terminating early when the model returned:
  [Agent ran through N steps...]
  {"thought": "...", "action": {...}}

The extract_json function only checked the string start or code fences.
Now scans for { or [ at non-word positions and uses depth-counting
to strip trailing text, allowing JSON buried anywhere in the response.
2026-04-24 13:17:06 +08:00
ZhenYi
261989fca3 feat(frontend): TipTap mention nodes with keyboard nav and sectioned dropdown
- MentionNode.tsx: custom TipTap atom node for @/#//mentions
- MentionView.tsx: colored inline labels by type (user=blue, ai=indigo, special=orange)
- IMEditor.tsx: register MentionNode, ↑↓/Enter/Tab/Esc keyboard nav,
  sectioned dropdown (@ groups Notify/AI/Members, # channels, / commands),
  serialize AST to @[type🆔label] on send
- MessageInput.tsx: wire roomAiConfigs into mentionItems, AST serialization
- MessageBubble.tsx: default expanded text (showFullText=true), AI messages never collapsed
2026-04-24 13:16:59 +08:00
ZhenYi
6aca08b8ab feat(room-ui): typing indicator, quick reactions, message grouping, @here/@channel, drag-drop categories, REST category loading
- DiscordChatPanel: typing indicator with animated dots and user names
- MessageActions: quick emoji bar (👍❤️😂🎉😮) on hover
- MessageList: group consecutive messages from same sender within 5min
- MessageInput/IMEditor: @here/@channel special mention suggestions
- DiscordChannelSidebar: useDroppable on category headers for drag-drop,
  empty categories now render, rooms/categories loaded via REST API
- room-context: typingUsers state, REST roomList/categoryList, category
  merge into rooms
2026-04-24 00:04:46 +08:00
ZhenYi
59640c6f44 feat(ws-client): add TypingStart/TypingStop protocol types and client handlers
ws-protocol.ts: TypingStartPayload/TypingStopPayload interfaces,
WsEventPayload union types.

room-ws-client.ts: onTypingStart/onTypingStop callbacks, sendTyping()
method, event dispatch for typing.start/typing_start.

editor/types.ts: special_here/special_channel MentionType + description
field on MentionItem.
2026-04-24 00:04:36 +08:00
ZhenYi
5776af18ca perf: sequence generation Redis-only + session MGET batch
service.rs: Replace per-message Lua+DB seq with simple INCR, only
reconcile DB every 1000 messages (99.9% queries eliminated).

storage.rs: Replace N+1 GET loop with single MGET for both
get_user_sessions and get_workspace_sessions (N+1 → 2 roundtrips).
2026-04-24 00:04:27 +08:00
ZhenYi
33ab7b058d fix(ws): replace unreachable_unchecked with safe fallback for TypingStart/TypingStop
TypingStart/TypingStop actions are intercepted in ws_universal.rs so
this match arm is never reached, but we need a safe fallback instead
of std::hint::unreachable_unchecked().
2026-04-24 00:04:18 +08:00
ZhenYi
fb28fdd056 feat(room): implement typing indicator broadcast with Redis 10s TTL
RoomConnectionManager now holds a cache field and typing_inner broadcast
map. broadcast_typing() persists start/stop to Redis (SETEX 10s / DEL)
and broadcasts via tokio channel. ws_universal.rs handles TypingStart/
TypingStop actions and streams typing events to WS clients.
2026-04-24 00:04:09 +08:00
ZhenYi
e83512382f feat(room): add TypingEvent type and TypingStart/TypingStop event variants
Add TypingEvent struct in queue::types for broadcast-based typing
indicators, and TypingStart/TypingStop variants in RoomEventType for
WebSocket event dispatch.
2026-04-24 00:04:01 +08:00
ZhenYi
22b5eab769 fix(admin): restore LineChart component
LineChart is used by the dashboard page to render DAU trend charts.
It was accidentally removed during metrics cleanup — restore it.
2026-04-23 15:44:10 +08:00
ZhenYi
ae601774df chore(admin): remove all metrics/observability features
- Delete admin metrics dashboard page (admin/metrics/page.tsx)
- Delete LineChart component (used only by metrics)
- Remove "指标监控" nav link from Sidebar
- Remove getMetrics/exportMetricsCsv from admin-rpc.ts client
- Remove /api/admin/metrics and /api/admin/metrics/export HTTP routes
  from adminrpc (was also leaking metrics via HTTP)
- Remove metrics RPC methods (get_metrics, export_metrics_csv) from
  adminrpc gRPC server and their helper functions
- Remove spawn_redis_metrics_flusher from app/main.rs
- Remove redis_metrics module from observability crate
- Remove redis/deadpool-redis deps from observability Cargo.toml
2026-04-23 15:42:00 +08:00
ZhenYi
3773fdc780 feat(admin): add structured error logger for all API routes
Replace bare console.error() calls with logError() utility across all
47 API route handlers. logError() prints timestamp + context + message
+ stack trace + extra request data to stderr, and redacts sensitive
fields (password, token, secret, key, etc.) from logged objects.
2026-04-23 09:55:35 +08:00
84 changed files with 1331 additions and 1215 deletions

2
Cargo.lock generated
View File

@ -5181,7 +5181,6 @@ dependencies = [
"actix-web",
"anyhow",
"chrono",
"deadpool-redis",
"futures",
"hostname",
"metrics",
@ -5191,7 +5190,6 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-otlp",
"opentelemetry_sdk",
"redis",
"reqwest 0.13.2",
"serde",
"serde_json",

View File

@ -1,199 +0,0 @@
"use client";
import { useEffect, useState } from "react";
import { format } from "date-fns";
import { getMetrics, exportMetricsCsv, type InstanceMetrics } from "@/lib/admin-rpc";
export default function MetricsPage() {
const [metrics, setMetrics] = useState<InstanceMetrics[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState("");
const [exporting, setExporting] = useState(false);
const [instanceFilter, setInstanceFilter] = useState("");
useEffect(() => { loadMetrics(); }, []);
async function loadMetrics() {
setLoading(true);
setError("");
try {
const data = await getMetrics(instanceFilter);
setMetrics(data);
} catch (e) {
setError(e instanceof Error ? e.message : "加载指标失败");
} finally {
setLoading(false);
}
}
async function handleExportCsv() {
setExporting(true);
try {
const csv = await exportMetricsCsv(instanceFilter);
const blob = new Blob([csv], { type: "text/csv;charset=utf-8;" });
const url = URL.createObjectURL(blob);
const a = document.createElement("a");
a.href = url;
a.download = `metrics-${format(new Date(), "yyyy-MM-dd_HHmm")}.csv`;
a.click();
URL.revokeObjectURL(url);
} catch (e) {
alert("导出失败: " + (e instanceof Error ? e.message : String(e)));
} finally {
setExporting(false);
}
}
// Group by instance
const byInstance = metrics.reduce<Record<string, InstanceMetrics[]>>((acc, m) => {
if (!acc[m.instance_id]) acc[m.instance_id] = [];
acc[m.instance_id].push(m);
return acc;
}, {});
const allInstances = Object.keys(byInstance).sort();
const latestOf = (arr: InstanceMetrics[]) =>
arr.sort((a, b) => b.timestamp_secs - a.timestamp_secs)[0];
// Flatten all latest snapshots for a summary table
const latestSnapshots = allInstances.map((id) => ({
id,
latest: latestOf(byInstance[id]),
}));
function metricVal(m: InstanceMetrics | undefined, prefix: string, key: string): string {
if (!m) return "—";
const v = prefix === "http" ? m.http[key] : m.room[key];
return v ?? "—";
}
return (
<div className="admin-content">
<div className="page-header" style={{ display: "flex", justifyContent: "space-between", alignItems: "flex-start" }}>
<div>
<h1 className="page-title"></h1>
<p className="page-subtitle">
{allInstances.length}
{metrics.length > 0 && (
<span style={{ marginLeft: "8px", fontSize: "12px", color: "#737373" }}>
: {metrics.length > 0
? format(new Date(latestSnapshots[0]?.latest.timestamp_secs * 1000), "yyyy-MM-dd HH:mm:ss")
: "—"}
</span>
)}
</p>
</div>
<div style={{ display: "flex", gap: "8px", alignItems: "center" }}>
<input
type="text"
className="form-input"
style={{ width: "200px" }}
placeholder="过滤实例名称..."
value={instanceFilter}
onChange={(e) => { setInstanceFilter(e.target.value); }}
/>
<button className="btn btn-secondary" onClick={loadMetrics} disabled={loading}>
{loading ? "加载中..." : "刷新"}
</button>
<button className="btn btn-primary" onClick={handleExportCsv} disabled={exporting}>
{exporting ? "导出中..." : "导出 CSV"}
</button>
</div>
</div>
{error && (
<div className="alert alert-error" style={{ marginBottom: "16px" }}>
{error} adminrpc 默认: http://adminrpc.admin.svc.cluster.local:9091
</div>
)}
{loading ? (
<div className="loading">...</div>
) : allInstances.length === 0 ? (
<div className="card">
<div className="empty-state">
<div className="empty-state-icon"></div>
<p></p>
<p style={{ fontSize: "13px", color: "#737373" }}>
Redis 5
</p>
</div>
</div>
) : (
<>
{/* Summary table */}
<div className="card" style={{ marginBottom: "16px" }}>
<h3 style={{ margin: "0 0 12px 0", fontSize: "14px", fontWeight: 600 }}></h3>
<div className="table-container">
<table className="data-table">
<thead>
<tr>
<th> ID</th>
<th></th>
<th> WS </th>
<th></th>
<th>HTTP </th>
<th>HTTP </th>
</tr>
</thead>
<tbody>
{latestSnapshots.map(({ id, latest }) => (
<tr key={id}>
<td><code style={{ fontSize: "12px" }}>{id}</code></td>
<td>{format(new Date(latest.timestamp_secs * 1000), "HH:mm:ss")}</td>
<td>{metricVal(latest, "room", "ws_connections_active")}</td>
<td>{metricVal(latest, "room", "messages_sent_total")}</td>
<td>{metricVal(latest, "http", "request_count")}</td>
<td>{metricVal(latest, "http", "error_count")}</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
{/* Per-instance detail */}
{allInstances.map((instanceId) => (
<div key={instanceId} className="card" style={{ marginBottom: "16px" }}>
<h3 style={{ margin: "0 0 12px 0", fontSize: "14px", fontWeight: 600 }}>
: <code>{instanceId}</code>
</h3>
<div style={{ display: "grid", gridTemplateColumns: "1fr 1fr", gap: "16px" }}>
{/* Room metrics */}
<div>
<h4 style={{ fontSize: "13px", color: "#737373", margin: "0 0 8px 0" }}>Room </h4>
<div style={{ display: "flex", flexDirection: "column", gap: "4px", fontSize: "13px" }}>
{Object.entries(latestOf(byInstance[instanceId])?.room ?? {}).map(([k, v]) => (
<div key={k} style={{ display: "flex", justifyContent: "space-between", padding: "2px 0", borderBottom: "1px solid #f4f4f5" }}>
<span style={{ color: "#525252" }}>{k}</span>
<code style={{ fontSize: "12px", color: "#171717" }}>{v}</code>
</div>
))}
{Object.keys(latestOf(byInstance[instanceId])?.room ?? {}).length === 0 && (
<div style={{ color: "#a3a3a3", fontSize: "12px" }}></div>
)}
</div>
</div>
{/* HTTP metrics */}
<div>
<h4 style={{ fontSize: "13px", color: "#737373", margin: "0 0 8px 0" }}>HTTP </h4>
<div style={{ display: "flex", flexDirection: "column", gap: "4px", fontSize: "13px" }}>
{Object.entries(latestOf(byInstance[instanceId])?.http ?? {}).map(([k, v]) => (
<div key={k} style={{ display: "flex", justifyContent: "space-between", padding: "2px 0", borderBottom: "1px solid #f4f4f5" }}>
<span style={{ color: "#525252" }}>{k}</span>
<code style={{ fontSize: "12px", color: "#171717" }}>{v}</code>
</div>
))}
{Object.keys(latestOf(byInstance[instanceId])?.http ?? {}).length === 0 && (
<div style={{ color: "#a3a3a3", fontSize: "12px" }}></div>
)}
</div>
</div>
</div>
</div>
))}
</>
)}
</div>
);
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { createModel, updateModel, deleteModel } from "@/lib/adminrpc/client";
@ -11,7 +12,7 @@ export async function POST(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Create model error:", msg);
logError("Create model error:", e);
return NextResponse.json({ error: `创建失败: ${msg}` }, { status: 500 });
}
}
@ -27,7 +28,7 @@ export async function PATCH(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Update model error:", msg);
logError("Update model error:", e);
return NextResponse.json({ error: `更新失败: ${msg}` }, { status: 500 });
}
}
@ -42,7 +43,7 @@ export async function DELETE(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Delete model error:", msg);
logError("Delete model error:", e);
return NextResponse.json({ error: `删除失败: ${msg}` }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { updatePricing } from "@/lib/adminrpc/client";
@ -15,7 +16,7 @@ export async function PATCH(
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Update pricing error:", msg);
logError("Update pricing error:", e);
return NextResponse.json({ error: `更新失败: ${msg}` }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { createProvider, updateProvider, deleteProvider } from "@/lib/adminrpc/client";
@ -11,7 +12,7 @@ export async function POST(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Create provider error:", msg);
logError("Create provider error:", e);
return NextResponse.json({ error: `创建失败: ${msg}` }, { status: 500 });
}
}
@ -27,7 +28,7 @@ export async function PATCH(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Update provider error:", msg);
logError("Update provider error:", e);
return NextResponse.json({ error: `更新失败: ${msg}` }, { status: 500 });
}
}
@ -42,7 +43,7 @@ export async function DELETE(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Delete provider error:", msg);
logError("Delete provider error:", e);
return NextResponse.json({ error: `删除失败: ${msg}` }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { createVersion, updateVersion, deleteVersion } from "@/lib/adminrpc/client";
@ -11,7 +12,7 @@ export async function POST(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Create version error:", msg);
logError("Create version error:", e);
return NextResponse.json({ error: `创建失败: ${msg}` }, { status: 500 });
}
}
@ -27,7 +28,7 @@ export async function PATCH(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Update version error:", msg);
logError("Update version error:", e);
return NextResponse.json({ error: `更新失败: ${msg}` }, { status: 500 });
}
}
@ -42,7 +43,7 @@ export async function DELETE(req: NextRequest) {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Delete version error:", msg);
logError("Delete version error:", e);
return NextResponse.json({ error: `删除失败: ${msg}` }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -45,7 +46,7 @@ export async function GET() {
return NextResponse.json({ config });
} catch (e) {
console.error("Get AI config error:", e);
logError("Get AI config error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -95,7 +96,7 @@ export async function PUT(req: NextRequest) {
return NextResponse.json({ success: true });
} catch (e) {
console.error("Update AI config error:", e);
logError("Update AI config error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -149,7 +150,7 @@ export async function POST(req: NextRequest) {
aiSummaryUsed: !!aiSummary,
});
} catch (e) {
console.error("[daily-report] Error:", e);
logError("[daily-report] Error:", e);
return NextResponse.json({ error: String(e) }, { status: 500 });
}
}
@ -484,7 +485,7 @@ async function sendEmail(opts: {
sentCount++;
console.log(`[daily-report] Sent to ${recipient}`);
} catch (e) {
console.error(`[daily-report] Failed to send to ${recipient}:`, e);
logError(`[daily-report] Failed to send to ${recipient}:`, e);
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -66,7 +67,7 @@ export async function PATCH(
if (err?.code === "23505") {
return NextResponse.json({ error: "该邮箱已存在" }, { status: 409 });
}
console.error("Update recipient error:", e);
logError("Update recipient error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -99,7 +100,7 @@ export async function DELETE(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Delete recipient error:", e);
logError("Delete recipient error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -14,7 +15,7 @@ export async function GET() {
);
return NextResponse.json({ recipients: result.rows });
} catch (e) {
console.error("[recipients] List error:", e);
logError("[recipients] List error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -53,7 +54,7 @@ export async function POST(req: NextRequest) {
if (err?.code === "23505") {
return NextResponse.json({ error: "该邮箱已存在" }, { status: 409 });
}
console.error("[recipients] Add error:", e);
logError("[recipients] Add error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -38,7 +39,7 @@ export async function GET() {
);
return NextResponse.json({ recipients: result.rows });
} catch (e) {
console.error("List recipients error:", e);
logError("List recipients error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -78,7 +79,7 @@ export async function POST(req: NextRequest) {
if (err?.code === "23505") {
return NextResponse.json({ error: "该邮箱已存在" }, { status: 409 });
}
console.error("Add recipient error:", e);
logError("Add recipient error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query, transaction } from "@/lib/db";
@ -41,7 +42,7 @@ export async function GET(
})),
});
} catch (e) {
console.error("Project billing error:", e);
logError("Project billing error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -82,7 +83,7 @@ export async function POST(
return NextResponse.json({ ok: true, amount });
} catch (e) {
console.error("Project add credit error:", e);
logError("Project add credit error:", e);
return NextResponse.json({ error: "充值失败" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -48,7 +49,7 @@ export async function PATCH(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Update project member error:", e);
logError("Update project member error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -90,7 +91,7 @@ export async function DELETE(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Delete project member error:", e);
logError("Delete project member error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -38,7 +39,7 @@ export async function GET(
return NextResponse.json({ members });
} catch (e) {
console.error("List project members error:", e);
logError("List project members error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -108,7 +109,7 @@ export async function POST(
return NextResponse.json({ success: true, id: result.rows[0]?.id }, { status: 201 });
} catch (e) {
console.error("Add project member error:", e);
logError("Add project member error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -56,7 +57,7 @@ export async function GET(
billingHistory: billingHistoryRows.rows,
});
} catch (e) {
console.error("Project detail error:", e);
logError("Project detail error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -64,7 +65,7 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ projects: projects.rows, total, page, pageSize });
} catch (e) {
console.error("List projects error:", e);
logError("List projects error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -108,7 +109,7 @@ export async function GET(
return NextResponse.json({ repo, branches, commits });
} catch (e) {
console.error("Repo detail error:", e);
logError("Repo detail error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { deleteApiToken } from "@/lib/api-token";
import { createAuditLog } from "@/lib/log";
@ -29,7 +30,7 @@ export async function DELETE(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Delete token error:", e);
logError("Delete token error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import {
listApiTokens,
@ -11,7 +12,7 @@ export async function GET(req: NextRequest) {
const tokens = await listApiTokens();
return NextResponse.json({ tokens });
} catch (e) {
console.error("List tokens error:", e);
logError("List tokens error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -58,7 +59,7 @@ export async function POST(req: NextRequest) {
return NextResponse.json({ id: result.id, token, name: name.trim(), expiresAt }, { status: 201 });
} catch (e) {
console.error("Create token error:", e);
logError("Create token error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { login, buildSetCookieHeader } from "@/lib/auth";
import { createAuditLog } from "@/lib/log";
@ -70,7 +71,7 @@ export async function POST(req: NextRequest) {
return response;
} catch (e) {
console.error("Login error:", e);
logError("Login error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { logout, parseSessionCookie, loadAdminSession, buildClearCookieHeader } from "@/lib/auth";
import { createAuditLog } from "@/lib/log";
@ -31,7 +32,7 @@ export async function POST(req: NextRequest) {
response.headers.set("Set-Cookie", buildClearCookieHeader());
return response;
} catch (e) {
console.error("Logout error:", e);
logError("Logout error:", e);
const response = NextResponse.json({ success: false });
response.headers.set("Set-Cookie", buildClearCookieHeader());
return response;

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { parseSessionCookie, loadAdminSession, touchSession } from "@/lib/auth";
@ -29,7 +30,7 @@ export async function GET(req: NextRequest) {
},
});
} catch (e) {
console.error("Session check error:", e);
logError("Session check error:", e);
return NextResponse.json({ user: null });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -132,7 +133,7 @@ export async function GET() {
await ensureTables();
return NextResponse.json({ status: "ok" }, { status: 200 });
} catch (e) {
console.error("[Health] DB check failed:", e);
logError("[Health] DB check failed:", e);
return NextResponse.json({ status: "error", reason: String(e) }, { status: 503 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { listAuditLogs } from "@/lib/log";
@ -87,7 +88,7 @@ export async function GET(req: NextRequest) {
return NextResponse.json(result);
} catch (e) {
console.error("List audit logs error:", e);
logError("List audit logs error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import {
listPermissions,
@ -19,7 +20,7 @@ export async function GET() {
const permissions = await listPermissions();
return NextResponse.json({ permissions });
} catch (e) {
console.error("List permissions error:", e);
logError("List permissions error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -63,7 +64,7 @@ export async function POST(req: NextRequest) {
if ((e as { code?: string }).code === "23505") {
return NextResponse.json({ error: "权限代码已存在" }, { status: 409 });
}
console.error("Create permission error:", e);
logError("Create permission error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -97,7 +98,7 @@ export async function PUT(req: NextRequest) {
return NextResponse.json(permission);
} catch (e) {
console.error("Update permission error:", e);
logError("Update permission error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -125,7 +126,7 @@ export async function DELETE(req: NextRequest) {
return NextResponse.json({ success: true });
} catch (e) {
console.error("Delete permission error:", e);
logError("Delete permission error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -76,7 +77,7 @@ export async function GET() {
last24h: parseInt(last24h.rows[0]?.count || "0", 10),
});
} catch (e) {
console.error("Activity stats error:", e);
logError("Activity stats error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -97,7 +98,7 @@ export async function GET(req: NextRequest) {
versions: versionsList,
});
} catch (e) {
console.error("AI data error:", e);
logError("AI data error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextResponse } from "next/server";
import { syncModels } from "@/lib/adminrpc/client";
@ -12,7 +13,7 @@ export async function POST() {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("AI sync error:", msg);
logError("AI sync error:", e);
return NextResponse.json({ error: `同步失败: ${msg}` }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextResponse } from "next/server";
import { checkAlerts } from "@/lib/adminrpc/client";
@ -12,7 +13,7 @@ export async function POST() {
return NextResponse.json(data);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
console.error("Alert check error:", msg);
logError("Alert check error:", e);
return NextResponse.json({ error: `检查失败: ${msg}` }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -99,7 +100,7 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ logs: combined.slice(0, pageSize), total, page, pageSize });
} catch (e) {
console.error("Platform audit logs error:", e);
logError("Platform audit logs error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -90,7 +91,7 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ repos, total, page, pageSize });
} catch (e) {
console.error("Repos error:", e);
logError("Repos error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { getAdminUserId } from "@/lib/auth";
@ -30,7 +31,7 @@ export async function DELETE(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Revoke message error:", e);
logError("Revoke message error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -72,7 +73,7 @@ export async function GET(
return NextResponse.json({ messages, total, page, pageSize });
} catch (e) {
console.error("Room messages error:", e);
logError("Room messages error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -90,7 +91,7 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ rooms, total, page, pageSize });
} catch (e) {
console.error("Rooms error:", e);
logError("Rooms error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import {
getOnlinePlatformSessions,
@ -22,7 +23,7 @@ export async function GET() {
}
return NextResponse.json({ sessions });
} catch (e) {
console.error("Get platform sessions error:", e);
logError("Get platform sessions error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -51,7 +52,7 @@ export async function DELETE(req: NextRequest) {
return NextResponse.json({ success: ok });
} catch (e) {
console.error("Delete platform session error:", e);
logError("Delete platform session error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -73,7 +74,7 @@ export async function GET() {
planDistribution,
});
} catch (e) {
console.error("Stats error:", e);
logError("Stats error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -45,7 +46,7 @@ export async function GET(
},
});
} catch (e) {
console.error("Get user error:", e);
logError("Get user error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -145,7 +146,7 @@ export async function PATCH(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Update user error:", e);
logError("Update user error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -55,7 +56,7 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ users, total, page, pageSize });
} catch (e) {
console.error("Platform users error:", e);
logError("Platform users error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -110,7 +111,7 @@ export async function PATCH(req: NextRequest) {
return NextResponse.json({ success: true, updated: uids.length });
} catch (e) {
console.error("Batch update user status error:", e);
logError("Batch update user status error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query, getClient } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -94,7 +95,7 @@ export async function POST(
return NextResponse.json({ success: true, amount, currency });
} catch (e) {
console.error("Add workspace credit error:", e);
logError("Add workspace credit error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query, getClient } from "@/lib/db";
import { getAdminUserId } from "@/lib/auth";
@ -35,7 +36,7 @@ export async function GET(
);
return NextResponse.json({ configs: result.rows });
} catch (e) {
console.error("Alert config error:", e);
logError("Alert config error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -87,7 +88,7 @@ export async function PUT(
client.release();
}
} catch (e) {
console.error("Alert config update error:", e);
logError("Alert config update error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -52,7 +53,7 @@ export async function PATCH(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Update workspace member error:", e);
logError("Update workspace member error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -94,7 +95,7 @@ export async function DELETE(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Delete workspace member error:", e);
logError("Delete workspace member error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -43,7 +44,7 @@ export async function GET(
return NextResponse.json({ members });
} catch (e) {
console.error("List workspace members error:", e);
logError("List workspace members error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -113,7 +114,7 @@ export async function POST(
return NextResponse.json({ success: true, id: result.rows[0]?.id }, { status: 201 });
} catch (e) {
console.error("Add workspace member error:", e);
logError("Add workspace member error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
@ -59,7 +60,7 @@ export async function GET(
billingHistory: billing.rows,
});
} catch (e) {
console.error("Workspace detail error:", e);
logError("Workspace detail error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { query } from "@/lib/db";
import { createAuditLog } from "@/lib/log";
@ -61,7 +62,7 @@ export async function GET(req: NextRequest) {
pageSize,
});
} catch (e) {
console.error("List workspaces error:", e);
logError("List workspaces error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -108,7 +109,7 @@ export async function PATCH(req: NextRequest) {
return NextResponse.json({ success: true, updated: ids.length });
} catch (e) {
console.error("Batch update workspace plan error:", e);
logError("Batch update workspace plan error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import {
getRoleById,
@ -27,7 +28,7 @@ export async function GET(
const permissions = await getRolePermissions(roleId);
return NextResponse.json({ ...role, permissions });
} catch (e) {
console.error("Get role error:", e);
logError("Get role error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -66,7 +67,7 @@ export async function PUT(
return NextResponse.json(role);
} catch (e) {
console.error("Update role error:", e);
logError("Update role error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -94,7 +95,7 @@ export async function DELETE(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Delete role error:", e);
logError("Delete role error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import {
listRoles,
@ -19,7 +20,7 @@ export async function GET() {
const roles = await listRoles();
return NextResponse.json({ roles });
} catch (e) {
console.error("List roles error:", e);
logError("List roles error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -60,7 +61,7 @@ export async function POST(req: NextRequest) {
if ((e as { code?: string }).code === "23505") {
return NextResponse.json({ error: "角色名已存在" }, { status: 409 });
}
console.error("Create role error:", e);
logError("Create role error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import { getOnlineSessions } from "@/lib/redis";
import { createAuditLog } from "@/lib/log";
@ -14,7 +15,7 @@ export async function GET() {
const sessions = await getOnlineSessions();
return NextResponse.json({ sessions });
} catch (e) {
console.error("Get sessions error:", e);
logError("Get sessions error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -44,7 +45,7 @@ export async function DELETE(req: NextRequest) {
return NextResponse.json({ success: ok });
} catch (e) {
console.error("Delete session error:", e);
logError("Delete session error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import {
getUserById,
@ -32,7 +33,7 @@ export async function GET(
return NextResponse.json({ ...safeUser, roles });
} catch (e) {
console.error("Get user error:", e);
logError("Get user error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -80,7 +81,7 @@ export async function PUT(
const { password_hash: _, ...safeUser } = user;
return NextResponse.json(safeUser);
} catch (e) {
console.error("Update user error:", e);
logError("Update user error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -113,7 +114,7 @@ export async function DELETE(
return NextResponse.json({ success: true });
} catch (e) {
console.error("Delete user error:", e);
logError("Delete user error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -1,3 +1,4 @@
import { logError } from "@/lib/logger";
import { NextRequest, NextResponse } from "next/server";
import {
listUsers,
@ -24,7 +25,7 @@ export async function GET(req: NextRequest) {
const result = await listUsers({ page, pageSize, search });
return NextResponse.json(result);
} catch (e) {
console.error("List users error:", e);
logError("List users error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}
@ -70,7 +71,7 @@ export async function POST(req: NextRequest) {
if ((e as { code?: string }).code === "23505") {
return NextResponse.json({ error: "用户名已存在" }, { status: 409 });
}
console.error("Create user error:", e);
logError("Create user error:", e);
return NextResponse.json({ error: "服务器错误" }, { status: 500 });
}
}

View File

@ -52,7 +52,6 @@ export default function Sidebar({ user, loading, onLogout }: SidebarProps) {
{ href: "/admin/sessions", label: "Admin 会话", icon: "◐" },
{ href: "/platform/sessions", label: "平台会话", icon: "☀" },
{ href: "/admin/api-tokens", label: "API Token", icon: "⚿" },
{ href: "/admin/metrics", label: "指标监控", icon: "◉" },
{ href: "/admin/daily-report", label: "每日报告", icon: "📧" },
],
},

View File

@ -2,7 +2,7 @@
* adminrpc HTTP REST client
*
* Calls the adminrpc HTTP server (default: http://gitdata-adminrpc.gitdataai.svc.cluster.local:9091)
* which exposes the same session management and metrics APIs as the gRPC service.
* which exposes session management and health APIs.
*
* Usage:
* import { listWorkspaceSessions, kickUser } from "@/lib/admin-rpc";
@ -99,31 +99,6 @@ export async function kickUserFromWorkspace(
});
}
// ─── Metrics API ─────────────────────────────────────────────────────────────
export interface InstanceMetrics {
instance_id: string;
timestamp_secs: number;
http: Record<string, string>;
room: Record<string, string>;
}
/** Get metrics across all app instances. */
export async function getMetrics(instanceFilter = ""): Promise<InstanceMetrics[]> {
const qs = instanceFilter ? `?instance_filter=${encodeURIComponent(instanceFilter)}` : "";
return rpc<InstanceMetrics[]>(`/api/admin/metrics${qs}`);
}
/** Export all metrics as CSV string. */
export async function exportMetricsCsv(instanceFilter = ""): Promise<string> {
const qs = instanceFilter ? `?instance_filter=${encodeURIComponent(instanceFilter)}` : "";
const res = await fetch(`${BASE_URL}/api/admin/metrics/export${qs}`);
if (!res.ok) {
throw new Error(`adminrpc GET /metrics/export failed (${res.status})`);
}
return res.text();
}
/** Get adminrpc health status. */
export async function adminRpcHealth(): Promise<{ ok: boolean }> {
return rpc<{ ok: boolean }>("/health");

70
admin/src/lib/logger.ts Normal file
View File

@ -0,0 +1,70 @@
/**
* Structured error logger for the admin module.
*
* All API route errors should use this instead of bare console.error.
* Prints: [timestamp] [ERROR] context | message | stack | extra
*/
export interface LogExtra {
method?: string;
url?: string;
params?: Record<string, unknown>;
body?: unknown;
[key: string]: unknown;
}
/**
* Format and print a detailed error to stderr.
* @param context Short label describing where the error occurred (e.g. "GET /api/users")
* @param error The caught error (any type)
* @param extra Optional additional context (request method, url, params, etc.)
*/
export function logError(context: string, error: unknown, extra?: LogExtra): void {
const timestamp = new Date().toISOString();
// Extract best-effort message
let message = "unknown error";
if (error instanceof Error) {
message = error.message;
} else if (typeof error === "string") {
message = error;
} else if (error && typeof error === "object" && "message" in error) {
message = String((error as { message: unknown }).message);
}
// Extract stack trace
let stack = "";
if (error instanceof Error && error.stack) {
stack = error.stack;
}
const extraStr = extra
? ` | extra: ${JSON.stringify(redact(extra))}`
: "";
// Multi-line format for easy grepping
console.error(
`[${timestamp}] [ERROR] ${context}` +
`\n message: ${message}` +
(stack ? `\n stack: ${stack}` : "") +
extraStr
);
}
/** Recursively remove sensitive fields before logging */
function redact(obj: unknown, depth = 0): unknown {
if (depth > 4) return "[max-depth]";
if (obj === null || obj === undefined) return obj;
if (typeof obj !== "object") return obj;
if (Array.isArray(obj)) return obj.map((v) => redact(v, depth + 1));
const sensitive = ["password", "token", "secret", "key", "authorization", "cookie", "api_key"];
const result: Record<string, unknown> = {};
for (const [k, v] of Object.entries(obj as Record<string, unknown>)) {
if (sensitive.some((s) => k.toLowerCase().includes(s))) {
result[k] = "[REDACTED]";
} else {
result[k] = redact(v, depth + 1);
}
}
return result;
}

View File

@ -78,15 +78,12 @@ async fn main() -> anyhow::Result<()> {
});
let http_handle = tokio::spawn(async move {
let pool_for_http = pool.clone();
let sm_for_http = session_manager.clone();
let result = HttpServer::new(move || {
ActixApp::new()
.app_data(web::Data::new(pool_for_http.clone()))
.app_data(web::Data::new(sm_for_http.clone()))
.route("/health", web::get().to(health))
.route("/admin/metrics/export", web::get().to(metrics_export))
.service(
web::scope("/api/admin")
.route(
@ -117,10 +114,7 @@ async fn main() -> anyhow::Result<()> {
.route(
"/sessions/kick-workspace",
web::post().to(kick_user_from_workspace),
)
// Metrics
.route("/metrics", web::get().to(get_metrics))
.route("/metrics/export", web::get().to(metrics_export)),
),
)
})
.bind(admin_addr)
@ -149,26 +143,6 @@ async fn health() -> HttpResponse {
HttpResponse::Ok().json(serde_json::json!({ "ok": true }))
}
async fn metrics_export(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::export_all_metrics_csv(pool.get_ref(), "").await {
Ok(csv) => HttpResponse::Ok()
.content_type("text/csv; charset=utf-8")
.body(csv),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
async fn get_metrics(pool: web::Data<cluster::Pool>) -> HttpResponse {
match observability::query_all_instance_metrics(pool.get_ref(), "", 100).await {
Ok(instances) => HttpResponse::Ok().json(instances),
Err(e) => {
HttpResponse::InternalServerError().json(serde_json::json!({ "error": e.to_string() }))
}
}
}
fn parse_uuid(s: &str) -> Option<Uuid> {
Uuid::parse_str(s).ok()
}

View File

@ -7,7 +7,6 @@ use db::cache::AppCache;
use db::database::AppDatabase;
use observability::{
init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller,
spawn_redis_metrics_flusher,
HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware, instance_id,
};
use sea_orm::ConnectionTrait;
@ -102,18 +101,6 @@ async fn main() -> anyhow::Result<()> {
);
let http_snapshot_data = web::Data::new(http_snapshot);
// ── Redis metrics flusher (every 5s → key: metrics:{instance}:{ts}) ──────
let http_for_flusher = http_metrics.clone();
let prometheus_for_flusher = prometheus_handle_arc.clone();
spawn_redis_metrics_flusher(
cache.redis_pool().clone(),
instance_id(),
prometheus_for_flusher,
http_for_flusher,
std::time::Duration::from_secs(5),
);
tracing::info!("Redis metrics flusher started (5s interval)");
let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string());
tracing::info!(bind_addr = %bind_addr, "Listening");
let http_metrics_server = http_metrics.clone();

View File

@ -5,7 +5,9 @@ use async_openai::Client;
use async_openai::types::chat::{
ChatCompletionMessageToolCalls, ChatCompletionRequestAssistantMessage,
ChatCompletionRequestAssistantMessageContent, ChatCompletionRequestMessage,
ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage, ChatCompletionTool,
ChatCompletionRequestSystemMessage, ChatCompletionRequestToolMessage,
ChatCompletionRequestToolMessageContent, ChatCompletionRequestUserMessage,
ChatCompletionRequestUserMessageContent, ChatCompletionTool,
ChatCompletionTools, CreateChatCompletionRequest, CreateChatCompletionResponse,
CreateChatCompletionStreamResponse, FinishReason, ReasoningEffort, ToolChoiceOptions,
};
@ -193,7 +195,24 @@ impl ChatService {
.collect();
if !calls.is_empty() {
let tool_messages = self.execute_tool_calls(calls, &request).await?;
let tool_messages = match self.execute_tool_calls(calls, &request).await {
Ok(msgs) => msgs,
Err(e) => {
// Surface the error as a tool result so the model can continue
let err_text = format!("[Tool call failed: {}]", e);
messages.push(ChatCompletionRequestMessage::User(
ChatCompletionRequestUserMessage {
content: ChatCompletionRequestUserMessageContent::Text(err_text.clone()),
name: None,
},
));
tool_depth += 1;
if tool_depth >= max_tool_depth {
return Ok(err_text);
}
continue;
}
};
messages.extend(tool_messages);
tool_depth += 1;
@ -387,7 +406,25 @@ impl ChatService {
},
));
let tool_messages = self.execute_tool_calls(tool_calls, &request).await?;
let tool_messages = match self.execute_tool_calls(tool_calls, &request).await {
Ok(msgs) => msgs,
Err(e) => {
// Stream the FC error as an observation so the user sees it
let err_text = format!("[Tool call failed: {}]", e);
on_chunk(AiStreamChunk {
content: err_text.clone(),
done: true,
})
.await;
// Return an empty tool result so the loop can continue
vec![ChatCompletionRequestMessage::Tool(
ChatCompletionRequestToolMessage {
tool_call_id: String::new(),
content: ChatCompletionRequestToolMessageContent::Text(err_text),
},
)]
}
};
messages.extend(tool_messages);
tool_depth += 1;

View File

@ -378,15 +378,21 @@ fn parse_react_response(content: &str) -> ParsedReActResponse {
}
}
/// Extract the first JSON object or array from a string, handling markdown fences.
/// Extract a JSON object or array from a string, even when wrapped in non-JSON text.
/// Handles: raw JSON at start, JSON in code fences, JSON buried in text (e.g. after
/// a prefix line like "[Agent ran through N steps]").
fn extract_json(s: &str) -> Option<String> {
let trimmed = s.trim();
// Direct match — starts with { or [
if trimmed.starts_with('{') || trimmed.starts_with('[') {
return Some(trimmed.to_string());
}
// Code fence handling
for line in trimmed.lines() {
let line = line.trim();
if line.starts_with("```json") || line.starts_with("```") {
if line.starts_with("```json") || line == "```" {
let mut buf = String::new();
let mut found_start = false;
for l in trimmed.lines() {
@ -409,6 +415,47 @@ fn extract_json(s: &str) -> Option<String> {
}
}
}
// Scan for JSON object/array buried in text (common with prefix lines).
// Find the first '{' or '[' that is NOT preceded by a word character,
// then try to parse from there (stripping trailing non-JSON text).
let chars: Vec<char> = trimmed.chars().collect();
for i in 0..chars.len() {
let c = chars[i];
if (c == '{' || c == '[') && i > 0 {
// Skip if preceded by a word character (would be part of a string value)
let prev = chars[i - 1];
if prev.is_alphanumeric() || prev == '_' || prev == '"' || prev == '\'' {
continue;
}
let candidate: String = chars[i..].iter().collect();
// Try full candidate first
if serde_json::from_str::<serde_json::Value>(&candidate).is_ok() {
return Some(candidate.trim_end().to_string());
}
// Try stripping trailing text (text after the JSON closing brace/bracket)
let mut depth = 0isize;
let mut in_string = false;
let mut escaped = false;
for (j, c) in candidate.char_indices() {
if escaped { escaped = false; continue; }
if c == '\\' { escaped = true; continue; }
if c == '"' { in_string = !in_string; continue; }
if in_string { continue; }
if c == '{' || c == '[' { depth += 1; }
if c == '}' || c == ']' { depth -= 1; }
if depth == 0 {
// Found the end of the JSON value
let json_end = j + c.len_utf8();
let trimmed_candidate = &candidate[..json_end];
if serde_json::from_str::<serde_json::Value>(trimmed_candidate).is_ok() {
return Some(trimmed_candidate.to_string());
}
}
}
}
}
None
}

View File

@ -728,6 +728,9 @@ impl WsRequestHandler {
WsAction::UnsubscribeRoom => Ok(WsResponseData::bool(true)),
WsAction::SubscribeProject => Ok(WsResponseData::subscribed(None, None)),
WsAction::UnsubscribeProject => Ok(WsResponseData::bool(true)),
// TypingStart/TypingStop are handled directly in ws_universal.rs
// (interception point sends response there, so this arm is never reached)
WsAction::TypingStart | WsAction::TypingStop => Ok(WsResponseData::bool(true)),
}
}
}

View File

@ -114,6 +114,10 @@ pub enum WsAction {
SubscribeProject,
#[serde(rename = "project.unsubscribe")]
UnsubscribeProject,
#[serde(rename = "typing.start")]
TypingStart,
#[serde(rename = "typing.stop")]
TypingStop,
}
impl std::fmt::Display for WsAction {
@ -164,6 +168,8 @@ impl std::fmt::Display for WsAction {
WsAction::UnsubscribeRoom => write!(f, "room.unsubscribe"),
WsAction::SubscribeProject => write!(f, "project.subscribe"),
WsAction::UnsubscribeProject => write!(f, "project.unsubscribe"),
WsAction::TypingStart => write!(f, "typing.start"),
WsAction::TypingStop => write!(f, "typing.stop"),
}
}
}
@ -211,6 +217,8 @@ pub struct WsRequestParams {
pub min_score: Option<f32>,
pub query: Option<String>,
pub attachment_ids: Option<Vec<Uuid>>,
/// Typing event: "start" or "stop"
pub typing: Option<String>,
}
#[derive(Debug, Clone, Serialize)]

View File

@ -9,7 +9,7 @@ use tokio_stream::wrappers::BroadcastStream;
use uuid::Uuid;
use crate::error::ApiError;
use queue::{ReactionGroup, RoomMessageEvent, RoomMessageStreamChunkEvent};
use queue::{ReactionGroup, RoomMessageEvent, RoomMessageStreamChunkEvent, TypingEvent};
use room::connection::RoomConnectionManager;
use service::AppService;
@ -40,6 +40,10 @@ pub enum WsPushEvent {
room_id: Uuid,
chunk: Arc<RoomMessageStreamChunkEvent>,
},
TypingIndicator {
room_id: Uuid,
event: Arc<TypingEvent>,
},
}
/// Maps room_id -> (room_message_broadcast_stream, stream_chunk_broadcast_stream)
@ -48,6 +52,7 @@ type PushStreams = HashMap<
(
BroadcastStream<Arc<RoomMessageEvent>>,
BroadcastStream<Arc<RoomMessageStreamChunkEvent>>,
BroadcastStream<Arc<TypingEvent>>,
),
>;
@ -245,6 +250,22 @@ pub async fn ws_universal(
break;
}
}
Some(WsPushEvent::TypingIndicator { room_id, event }) => {
let payload = serde_json::json!({
"type": "event",
"event": "room.typing",
"room_id": room_id,
"data": {
"user_id": event.user_id,
"username": event.username,
"avatar_url": event.avatar_url,
"action": event.action,
},
});
if session.text(payload.to_string()).await.is_err() {
break;
}
}
None => {
}
}
@ -307,9 +328,11 @@ pub async fn ws_universal(
match manager.subscribe(room_id, user_id).await {
Ok(rx) => {
let stream_rx = manager.subscribe_room_stream(room_id).await;
let typing_rx = manager.subscribe_typing(room_id).await;
push_streams.insert(room_id, (
BroadcastStream::new(rx),
BroadcastStream::new(stream_rx),
BroadcastStream::new(typing_rx),
));
let _ = session.text(serde_json::to_string(&WsResponse::success(
request.request_id, &action_str,
@ -338,6 +361,24 @@ pub async fn ws_universal(
request.request_id, &action_str, WsResponseData::bool(true)
)).unwrap_or_default()).await;
}
WsAction::TypingStart | WsAction::TypingStop => {
if let (Some(room_id), Some(action)) =
(request.params().room_id, request.params().typing.as_deref())
{
let names = handler.service().room.get_user_names(&[user_id]).await;
let typing_event = TypingEvent {
room_id,
user_id,
username: names.into_values().next().unwrap_or_else(|| "unknown".to_string()),
avatar_url: None,
action: action.to_string(),
};
manager.broadcast_typing(room_id, typing_event).await;
}
let _ = session.text(serde_json::to_string(&WsResponse::success(
request.request_id, &action_str, WsResponseData::bool(true)
)).unwrap_or_default()).await;
}
_ => {
let resp = handler.handle(request).await;
let _ = session.text(serde_json::to_string(&resp).unwrap_or_default()).await;
@ -383,7 +424,7 @@ async fn poll_push_streams(
let mut dead_rooms: Vec<Uuid> = Vec::new();
for room_id in room_ids {
if let Some((msg_stream, chunk_stream)) = streams.get_mut(&room_id) {
if let Some((msg_stream, chunk_stream, typing_stream)) = streams.get_mut(&room_id) {
tokio::select! {
result = msg_stream.next() => {
match result {
@ -412,6 +453,16 @@ async fn poll_push_streams(
}
}
}
result = typing_stream.next() => {
match result {
Some(Ok(event)) => {
return Some(WsPushEvent::TypingIndicator { room_id, event });
}
Some(Err(_)) | None => {
// Typing channel going dead is non-fatal — typing is ephemeral
}
}
}
}
}
}
@ -424,9 +475,11 @@ async fn poll_push_streams(
if service.room.check_room_access(room_id, user_id).await.is_ok() {
if let Ok(rx) = manager.subscribe(room_id, user_id).await {
let stream_rx = manager.subscribe_room_stream(room_id).await;
let typing_rx = manager.subscribe_typing(room_id).await;
streams.insert(room_id, (
BroadcastStream::new(rx),
BroadcastStream::new(stream_rx),
BroadcastStream::new(typing_rx),
));
}
}

View File

@ -25,10 +25,6 @@ serde = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = ["rt"] }
anyhow = { workspace = true }
# Redis (for metrics exporter)
redis = { workspace = true }
deadpool-redis = { workspace = true, features = ["cluster"] }
# Prometheus metrics export
metrics = "0.22"
metrics-exporter-prometheus = "0.13"

View File

@ -9,7 +9,6 @@ pub mod metrics_middleware;
pub mod prometheus_exporter;
pub mod otlp;
pub mod tracing_middleware;
pub mod redis_metrics;
pub use tracing_fmt::{init_tracing_subscriber, instance_id};
pub use metrics_middleware::{MetricsMiddleware, HttpMetrics};
@ -19,9 +18,3 @@ pub use prometheus_exporter::{
};
pub use otlp::{init_otlp, OtelGuard};
pub use tracing_middleware::TracingSpanMiddleware;
pub use redis_metrics::{
MetricsSnapshot,
spawn_redis_metrics_flusher,
query_instance_metrics, query_all_instance_metrics, export_all_metrics_csv,
FlatInstanceMetrics,
};

View File

@ -1,342 +0,0 @@
//! Redis-backed metrics exporter.
//!
//! Every `interval` seconds this module pushes metric fields to a Redis hash
//! bucketed by day:
//! Key: `metrics:{instance_id}:{YYYY-MM-DD}` (hash)
//! TTL: 30 days per day bucket
//!
//! Also maintains a sorted set `metrics:index:{instance_id}:{YYYY-MM-DD}` per
//! day for querying within a day window.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use deadpool_redis::cluster::Pool;
use serde::{Deserialize, Serialize};
const METRICS_TTL_SECS: i64 = 30 * 24 * 60 * 60; // 30 days
/// A snapshot of metric values at a point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
pub instance_id: String,
pub timestamp_secs: i64,
#[serde(flatten)]
pub http: HashMap<String, serde_json::Value>,
#[serde(flatten)]
pub room: HashMap<String, serde_json::Value>,
}
/// Starts a background task that snapshots and pushes all metrics to Redis
/// every `interval` seconds.
///
/// `prometheus_handle`: from `install_recorder()`, used to render RoomMetrics values
/// `http_metrics`: shared `HttpMetrics` from `MetricsMiddleware`
/// `instance_id`: used as the Redis key prefix
pub fn spawn_redis_metrics_flusher(
redis_pool: Pool,
instance_id: String,
prometheus_handle: Arc<metrics_exporter_prometheus::PrometheusHandle>,
http_metrics: Arc<crate::metrics_middleware::HttpMetrics>,
interval: Duration,
) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
loop {
ticker.tick().await;
if let Err(e) = flush_metrics(&redis_pool, &instance_id, &prometheus_handle, http_metrics.as_ref()).await {
tracing::warn!(error = %e, "failed to flush metrics to Redis");
}
}
});
}
async fn flush_metrics(
pool: &Pool,
instance_id: &str,
prometheus_handle: &Arc<metrics_exporter_prometheus::PrometheusHandle>,
http_metrics: &crate::metrics_middleware::HttpMetrics,
) -> anyhow::Result<()> {
let now = chrono::Utc::now();
let now_ts = now.timestamp();
let day_key = now.format("%Y-%m-%d").to_string();
// RoomMetrics from the metrics crate (rendered via PrometheusHandle)
let body = prometheus_handle.render();
let room = crate::prometheus_exporter::render_to_hashmap(&body);
// HTTP metrics from AtomicU64
let http = http_metrics.snapshot();
let hash_key = format!("metrics:{}:{}", instance_id, day_key);
let index_key = format!("metrics:index:{}:{}", instance_id, day_key);
let mut conn = pool.get().await?;
// HSET each metric field into the daily hash
let mut fields: Vec<(String, String)> = Vec::with_capacity(http.len() + room.len());
for (k, v) in &http {
fields.push((format!("http_{}", k), v.to_string()));
}
for (k, v) in &room {
fields.push((format!("room_{}", k), v.to_string()));
}
// HSET field field_value ...
let mut cmd = redis::cmd("HSET");
cmd.arg(&hash_key);
for (f, val) in &fields {
cmd.arg(f).arg(val);
}
let _: () = cmd.query_async(&mut conn).await?;
// Set TTL on the hash key (refresh every write)
let _: () = redis::cmd("EXPIRE")
.arg(&hash_key)
.arg(METRICS_TTL_SECS)
.query_async(&mut conn)
.await?;
// Update sorted set index for this instance+day
let _: () = redis::cmd("ZADD")
.arg(&index_key)
.arg(now_ts as f64)
.arg(now_ts.to_string())
.query_async(&mut conn)
.await?;
// Trim index to last 1000 entries per day
let _: () = redis::cmd("ZREMRANGEBYRANK")
.arg(&index_key)
.arg(0)
.arg(-1001)
.query_async(&mut conn)
.await?;
// Set TTL on the index key
let _: () = redis::cmd("EXPIRE")
.arg(&index_key)
.arg(METRICS_TTL_SECS)
.query_async(&mut conn)
.await?;
tracing::debug!(key = %hash_key, field_count = fields.len(), "metrics flushed to Redis (daily hash)");
Ok(())
}
/// Query metrics for an instance from Redis, reading the most recent `limit`
/// snapshots across all available daily hash buckets.
/// Returns a vector sorted by timestamp ascending.
pub async fn query_instance_metrics(
pool: &Pool,
instance_id: &str,
limit: usize,
) -> anyhow::Result<Vec<(i64, MetricsSnapshot)>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys for this instance
let index_pattern = format!("metrics:index:{}:*", instance_id);
let mut cursor = 0u64;
let mut day_keys: Vec<String> = Vec::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&index_pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
day_keys.extend(keys);
cursor = new_cursor;
if cursor == 0 {
break;
}
}
// Collect all timestamps from all daily indexes
let mut all_timestamps: Vec<i64> = Vec::new();
for day_key in &day_keys {
let timestamps: Vec<i64> = redis::cmd("ZREVRANGE")
.arg(day_key)
.arg(0)
.arg((limit as isize - 1).max(0))
.query_async(&mut conn)
.await?;
all_timestamps.extend(timestamps);
}
// Sort descending and take limit
all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts));
all_timestamps.truncate(limit);
// Read each snapshot from the daily hash
let mut results = Vec::with_capacity(all_timestamps.len());
for ts in &all_timestamps {
// Derive day bucket from timestamp
let dt = chrono::DateTime::from_timestamp(*ts, 0)
.unwrap_or_else(|| chrono::Utc::now());
let day_str = dt.format("%Y-%m-%d").to_string();
let hash_key = format!("metrics:{}:{}", instance_id, day_str);
let fields: Option<HashMap<String, String>> = redis::cmd("HGETALL")
.arg(&hash_key)
.query_async(&mut conn)
.await?;
if let Some(fields) = fields {
let mut http: HashMap<String, serde_json::Value> = HashMap::new();
let mut room: HashMap<String, serde_json::Value> = HashMap::new();
for (k, v) in &fields {
let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
if let Some(stripped) = k.strip_prefix("http_") {
http.insert(stripped.to_string(), json_val);
} else if let Some(stripped) = k.strip_prefix("room_") {
room.insert(stripped.to_string(), json_val);
}
}
results.push((*ts, MetricsSnapshot {
instance_id: instance_id.to_string(),
timestamp_secs: *ts,
http,
room,
}));
}
}
results.sort_by_key(|(ts, _)| *ts);
Ok(results)
}
/// Flat instance metrics suitable for JSON serialization.
#[derive(serde::Serialize)]
pub struct FlatInstanceMetrics {
pub instance_id: String,
pub timestamp_secs: i64,
pub http: std::collections::HashMap<String, String>,
pub room: std::collections::HashMap<String, String>,
}
/// Query all instances' metrics, optionally filtered by `instance_filter`.
pub async fn query_all_instance_metrics(
pool: &Pool,
instance_filter: &str,
limit: usize,
) -> anyhow::Result<Vec<FlatInstanceMetrics>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys to extract instance ids
let mut cursor = 0u64;
let mut all_instance_ids = Vec::new();
let mut seen = std::collections::HashSet::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("metrics:index:*")
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
for k in keys {
// Key format: metrics:index:{instance}:{YYYY-MM-DD}
if let Some(rest) = k.strip_prefix("metrics:index:") {
if let Some((instance_id, _)) = rest.rsplit_once(':') {
if seen.insert(instance_id.to_string()) {
all_instance_ids.push(instance_id.to_string());
}
}
}
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
let mut results = Vec::new();
for instance_id in all_instance_ids {
if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
continue;
}
let snapshots = query_instance_metrics(pool, &instance_id, limit).await?;
for (_, payload) in snapshots {
let mut http: std::collections::HashMap<String, String> = std::collections::HashMap::new();
let mut room: std::collections::HashMap<String, String> = std::collections::HashMap::new();
for (k, v) in &payload.http {
http.insert(k.clone(), v.to_string());
}
for (k, v) in &payload.room {
room.insert(k.clone(), v.to_string());
}
results.push(FlatInstanceMetrics {
instance_id: payload.instance_id,
timestamp_secs: payload.timestamp_secs,
http,
room,
});
}
}
Ok(results)
}
/// Export all metrics across all known instances as CSV.
pub async fn export_all_metrics_csv(
pool: &Pool,
instance_filter: &str,
) -> anyhow::Result<String> {
let mut conn = pool.get().await?;
// Scan for all daily index keys to extract instance ids
let mut cursor = 0u64;
let mut all_instance_ids = Vec::new();
let mut seen = std::collections::HashSet::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("metrics:index:*")
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
for k in keys {
// Key format: metrics:index:{instance}:{YYYY-MM-DD}
if let Some(rest) = k.strip_prefix("metrics:index:") {
if let Some(instance_id) = rest.rsplit_once(':') {
let id = instance_id.0;
if seen.insert(id.to_string()) {
all_instance_ids.push(id.to_string());
}
}
}
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
let mut rows: Vec<String> = vec![
"instance_id,timestamp,metric,value".to_string(),
];
for instance_id in all_instance_ids {
if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
continue;
}
let metrics = query_instance_metrics(pool, &instance_id, 1000).await?;
for (ts, payload) in metrics {
for (k, v) in &payload.http {
rows.push(format!("{},{},http_{},{}", instance_id, ts, k, v));
}
for (k, v) in &payload.room {
rows.push(format!("{},{},room_{},{}", instance_id, ts, k, v));
}
}
}
Ok(rows.join("\n"))
}

View File

@ -7,7 +7,7 @@ pub mod worker;
pub use producer::{MessageProducer, RedisPubSub};
pub use types::{
AgentTaskEvent, EmailEnvelope, ProjectRoomEvent, ReactionGroup, RoomMessageEnvelope,
RoomMessageEvent, RoomMessageStreamChunkEvent,
RoomMessageEvent, RoomMessageStreamChunkEvent, TypingEvent,
};
pub use worker::{
room_worker_task, start as start_worker, start_email_worker, EmailSendFn, EmailSendFut, GetRedis,

View File

@ -42,6 +42,17 @@ pub struct RoomMessageEvent {
pub message_id: Option<Uuid>,
}
/// Typing indicator event — broadcast to all room members.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypingEvent {
pub room_id: Uuid,
pub user_id: Uuid,
pub username: String,
pub avatar_url: Option<String>,
/// "start" or "stop"
pub action: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReactionGroup {
pub emoji: String,

View File

@ -7,8 +7,10 @@ use std::time::{Duration, Instant};
use tokio::sync::{RwLock, broadcast};
use uuid::Uuid;
use db::cache::AppCache;
use db::database::AppDatabase;
use models::rooms::{MessageContentType, MessageSenderType, room_message};
use queue::types::TypingEvent;
use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent};
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set};
@ -33,6 +35,7 @@ pub struct RoomConnectionManager {
/// Broadcast channel for agent task events per project.
task_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<AgentTaskEvent>>>>,
pub metrics: Arc<RoomMetrics>,
cache: AppCache,
connection_rate: RwLock<HashMap<(Uuid, Uuid), Instant>>,
shutdown_tx: broadcast::Sender<()>,
room_shutdown_txs: RwLock<HashMap<Uuid, broadcast::Sender<()>>>,
@ -40,6 +43,7 @@ pub struct RoomConnectionManager {
user_shutdown_txs: RwLock<HashMap<Uuid, broadcast::Sender<()>>>,
stream_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>>>,
room_stream_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>>>,
typing_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>>,
room_last_activity: RwLock<HashMap<Uuid, Instant>>,
room_subscriber_count: RwLock<HashMap<Uuid, usize>>,
project_subscriber_count: RwLock<HashMap<Uuid, usize>>,
@ -47,7 +51,7 @@ pub struct RoomConnectionManager {
}
impl RoomConnectionManager {
pub fn new(metrics: Arc<RoomMetrics>) -> Self {
pub fn new(metrics: Arc<RoomMetrics>, cache: AppCache) -> Self {
let (shutdown_tx, _) = broadcast::channel(SHUTDOWN_CHANNEL_CAPACITY);
Self {
#[allow(clippy::default_constructed_unit_structs)]
@ -61,6 +65,7 @@ impl RoomConnectionManager {
#[allow(clippy::default_constructed_unit_structs)]
task_inner: RwLock::new(HashMap::new()),
metrics,
cache,
#[allow(clippy::default_constructed_unit_structs)]
connection_rate: RwLock::new(HashMap::new()),
shutdown_tx,
@ -75,6 +80,8 @@ impl RoomConnectionManager {
#[allow(clippy::default_constructed_unit_structs)]
room_stream_inner: RwLock::new(HashMap::new()),
#[allow(clippy::default_constructed_unit_structs)]
typing_inner: RwLock::new(HashMap::new()),
#[allow(clippy::default_constructed_unit_structs)]
room_last_activity: RwLock::new(HashMap::new()),
#[allow(clippy::default_constructed_unit_structs)]
room_subscriber_count: RwLock::new(HashMap::new()),
@ -621,6 +628,57 @@ impl RoomConnectionManager {
let mut map = self.stream_inner.write().await;
map.remove(&message_id);
}
pub async fn subscribe_typing(
&self,
room_id: Uuid,
) -> broadcast::Receiver<Arc<TypingEvent>> {
let mut map: tokio::sync::RwLockWriteGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.write().await;
if let Some(tx) = map.get(&room_id) {
return tx.subscribe();
}
let (tx, rx) = broadcast::channel(BROADCAST_CAPACITY);
map.insert(room_id, tx);
rx
}
/// Broadcast a typing event and persist it to Redis with 10s TTL.
/// - "start": writes key with 10s expiry, broadcasts start event
/// - "stop": deletes key, broadcasts stop event
pub async fn broadcast_typing(&self, room_id: Uuid, event: TypingEvent) {
let user_key = format!("typing:{}:{}", room_id, event.user_id);
let action = event.action.clone();
let username = event.username.clone();
let avatar_url = event.avatar_url.clone();
// Write/delete Redis key for 10s expiry (non-blocking)
if let Ok(mut conn) = self.cache.conn().await {
let key = user_key;
tokio::spawn(async move {
if action == "start" {
let value = serde_json::json!({
"username": username,
"avatar_url": avatar_url,
})
.to_string();
let _: Result<(), _> = redis::cmd("SETEX")
.arg(&key)
.arg(10i64)
.arg(&value)
.query_async(&mut conn)
.await;
} else {
let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await;
}
});
}
let map: tokio::sync::RwLockReadGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.read().await;
if let Some(tx) = map.get(&room_id) {
let event = Arc::new(event);
let _ = tx.send(event);
}
}
}
fn parse_sender_type(s: &str) -> MessageSenderType {
@ -738,15 +796,24 @@ pub fn make_persist_fn(
.exec(&db)
.await?;
// Update content_tsv for inserted messages
for env in chunk.iter() {
let update_sql = format!(
"UPDATE room_message SET content_tsv = to_tsvector('simple', content) WHERE id = '{}'",
env.id
// Batch update content_tsv using a single UPDATE with subquery
// instead of N individual UPDATE statements (N=chunk size, up to 100)
let ids: Vec<String> = chunk
.iter()
.filter(|e| !existing_ids.contains(&e.id))
.map(|e| format!("'{}'", e.id))
.collect();
if !ids.is_empty() {
let batch_sql = format!(
"UPDATE room_message AS t \
SET content_tsv = to_tsvector('simple', content) \
WHERE t.id IN ({})",
ids.join(",")
);
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DbBackend::Postgres,
&update_sql,
&batch_sql,
vec![],
);
let _ = db.execute_raw(stmt).await;

View File

@ -284,7 +284,7 @@ impl RoomService {
.await;
}
let should_respond = match self.should_ai_respond(room_id).await {
let should_respond = match self.should_ai_respond(room_id, &content).await {
Ok(v) => v,
Err(e) => {
tracing::warn!(room_id = %room_id, error = %e, "should_ai_respond failed");

View File

@ -756,7 +756,16 @@ impl RoomService {
}
}
pub async fn should_ai_respond(&self, room_id: Uuid) -> Result<bool, RoomError> {
/// Determine whether AI should respond to a message in this room.
/// - No room_ai config → AI not configured, never respond.
/// - use_exact = false → respond to every text message.
/// - use_exact = true → only respond when the message contains an @[ai:...] or
/// <mention type="ai">... tag that mentions this room's configured AI model.
pub async fn should_ai_respond(
&self,
room_id: Uuid,
content: &str,
) -> Result<bool, RoomError> {
use models::rooms::room_ai;
let ai_config = room_ai::Entity::find()
@ -764,7 +773,37 @@ impl RoomService {
.one(&self.db)
.await?;
Ok(ai_config.is_some())
let config = match ai_config {
Some(c) => c,
None => return Ok(false),
};
if !config.use_exact {
return Ok(true);
}
// use_exact mode: only respond when AI is explicitly mentioned
let model_id_str = config.model.to_string();
// Check @[ai:model_id:label] format
for cap in MENTION_BRACKET_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str {
return Ok(true);
}
}
}
// Check <mention type="ai" id="model_id">label</mention> format
for cap in MENTION_TAG_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str {
return Ok(true);
}
}
}
Ok(false)
}
pub async fn get_room_ai_config(
@ -1179,6 +1218,20 @@ impl RoomService {
}
Err(e) => {
tracing::error!(error = %e, "AI processing failed");
// Send an error message so the user knows something went wrong
let _ = Self::create_and_publish_ai_message(
&db,
&cache,
&queue,
&room_manager,
room_id_for_ai,
project_id_for_ai,
Uuid::now_v7(),
format!("[AI error: {}]", e),
model_id_inner,
Some(model_display_name),
)
.await;
}
}
});
@ -1249,6 +1302,19 @@ impl RoomService {
}
Err(e) => {
tracing::error!(error = %e, "ReAct agent failed");
let _ = Self::create_and_publish_ai_message(
&db,
&cache,
&queue,
&room_manager,
room_id_for_ai,
project_id_for_ai,
Uuid::now_v7(),
format!("[AI error: {}]", e),
model_id_inner,
Some(model_display_name),
)
.await;
}
}
});
@ -1288,7 +1354,9 @@ impl RoomService {
tokio::spawn(async move {
let _lock_guard = lock_guard;
// Buffer each ReactStep and forward as a stream chunk.
// Buffer all reasoning steps + the final answer separately.
let reasoning_buffer: std::sync::Arc<std::sync::Mutex<String>> =
std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let answer_buffer: std::sync::Arc<std::sync::Mutex<String>> =
std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
@ -1299,6 +1367,7 @@ impl RoomService {
let room_id = room_id_inner;
let step_count = step_count.clone();
let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone());
let reasoning_buffer = reasoning_buffer.clone();
let answer_buffer = answer_buffer.clone();
move |step: ReactStep| {
let room_manager = room_manager.clone();
@ -1320,31 +1389,35 @@ impl RoomService {
}
};
if let ReactStep::Answer { .. } = &step {
let is_answer = matches!(&step, ReactStep::Answer { .. });
if is_answer {
step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
let done = matches!(&step, ReactStep::Answer { .. });
let content_for_buffer = if done {
content.clone()
} else {
String::new()
};
let done = is_answer;
let ai_name = ai_display_name_for_step.clone();
let reasoning_buf = reasoning_buffer.clone();
let answer_buf = answer_buffer.clone();
tokio::spawn(async move {
// Always broadcast every step as a stream chunk
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id,
content,
content: content.clone(),
done,
error: None,
display_name: Some((*ai_name).clone()),
};
room_manager.broadcast_stream_chunk(event).await;
if done {
answer_buf.lock().unwrap().push_str(&content_for_buffer);
// Collect all steps into reasoning_buffer; Answer goes to answer_buffer
let mut rb = reasoning_buf.lock().unwrap();
rb.push_str(&content);
rb.push('\n');
drop(rb);
if is_answer {
answer_buf.lock().unwrap().push_str(&content);
}
});
}
@ -1355,89 +1428,110 @@ impl RoomService {
.await;
let final_content = answer_buffer.lock().unwrap().clone();
let reasoning_chain = reasoning_buffer.lock().unwrap().clone();
match result {
Ok(_) if !final_content.is_empty() => {
let envelope = RoomMessageEnvelope {
id: streaming_msg_id,
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
model_id: Some(model_id_inner),
thread_id: None,
content: final_content.clone(),
content_type: "text".to_string(),
send_at: now,
seq,
in_reply_to: None,
};
// Determine what to persist: prefer the answer, fall back to the reasoning chain
let content_to_persist = if !final_content.is_empty() {
final_content
} else if !reasoning_chain.trim().is_empty() {
// No Answer step, but the reasoning chain was streamed — still send it
format!(
"[Agent ran through {} reasoning steps but did not produce a final answer.]\n{}",
step_count.load(std::sync::atomic::Ordering::Relaxed),
reasoning_chain.trim_end()
)
} else {
// Nothing produced — this should not happen in practice
String::from("[No output from reasoning agent]")
};
if let Err(e) = queue.publish(room_id_inner, envelope).await {
tracing::error!(error = %e, "Failed to publish ReAct streaming message");
} else {
let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_inner))
.filter(room_ai::Column::Model.eq(model_id_inner))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
}
let (err_msg, should_log) = match &result {
Err(e) => (Some(format!("[Agent error: {}]", e)), true),
_ => (None, false),
};
let msg_event = queue::RoomMessageEvent {
id: streaming_msg_id,
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
thread_id: None,
content: final_content,
content_type: "text".to_string(),
send_at: now,
seq,
display_name: Some(ai_display_name.clone()),
in_reply_to: None,
reactions: None,
message_id: None,
};
room_manager.broadcast(room_id_inner, msg_event).await;
room_manager.metrics.messages_sent.increment(1);
let content_to_persist = if let Some(msg) = &err_msg {
format!(
"{}\n[Error during reasoning: {}]",
content_to_persist.trim_end(),
msg.trim_start_matches("[Agent error: ").trim_end_matches("]")
)
} else {
content_to_persist
};
let event = queue::ProjectRoomEvent {
event_type: super::RoomEventType::NewMessage.as_str().into(),
project_id: project_id_inner,
room_id: Some(room_id_inner),
category_id: None,
message_id: Some(streaming_msg_id),
seq: Some(seq),
timestamp: now,
};
queue
.publish_project_room_event(project_id_inner, event)
.await;
}
}
Ok(_) => {
tracing::warn!("ReAct agent returned empty answer");
}
Err(e) => {
tracing::error!(error = %e, "ReAct streaming failed");
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id: room_id_inner,
content: String::new(),
done: true,
error: Some(e.to_string()),
display_name: Some(ai_display_name.clone()),
};
room_manager.broadcast_stream_chunk(event).await;
if should_log {
tracing::error!(error = %result.as_ref().unwrap_err(), "ReAct streaming failed");
}
let persist_content = content_to_persist.trim().to_string();
if persist_content.is_empty() {
return;
}
let envelope = RoomMessageEnvelope {
id: streaming_msg_id,
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
model_id: Some(model_id_inner),
thread_id: None,
content: persist_content.clone(),
content_type: "text".to_string(),
send_at: now,
seq,
in_reply_to: None,
};
if let Err(e) = queue.publish(room_id_inner, envelope).await {
tracing::error!(error = %e, "Failed to publish ReAct streaming message");
} else {
let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_inner))
.filter(room_ai::Column::Model.eq(model_id_inner))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
}
let msg_event = queue::RoomMessageEvent {
id: streaming_msg_id,
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
thread_id: None,
content: persist_content,
content_type: "text".to_string(),
send_at: now,
seq,
display_name: Some(ai_display_name.clone()),
in_reply_to: None,
reactions: None,
message_id: None,
};
room_manager.broadcast(room_id_inner, msg_event).await;
room_manager.metrics.messages_sent.increment(1);
let event = queue::ProjectRoomEvent {
event_type: super::RoomEventType::NewMessage.as_str().into(),
project_id: project_id_inner,
room_id: Some(room_id_inner),
category_id: None,
message_id: Some(streaming_msg_id),
seq: Some(seq),
timestamp: now,
};
queue
.publish_project_room_event(project_id_inner, event)
.await;
}
room_manager.close_stream_channel(streaming_msg_id).await;
@ -1540,52 +1634,39 @@ impl RoomService {
let mut conn = cache.conn().await.map_err(|e| {
RoomError::Internal(format!("failed to get redis connection for seq: {}", e))
})?;
// Atomically increment and check via Lua: INCR first, then if Redis was
// externally set to a higher value, jump to max+1. This prevents concurrent
// requests from getting duplicate seqs — the Lua script runs as one atomic unit.
let seq: i64 = redis::cmd("EVAL")
.arg(
r#"
local current = redis.call('INCR', KEYS[1])
local stored = redis.call('GET', KEYS[1])
if stored and tonumber(stored) > current then
local next = tonumber(stored) + 1
redis.call('SET', KEYS[1], next)
return next
end
return current
"#,
)
.arg(1)
// Normal path: Redis INCR is atomic and sufficient for sequence generation.
// Lua script removed — it was executing on every single message (costly).
let seq: i64 = redis::cmd("INCR")
.arg(&seq_key)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("seq Lua script: {}", e)))?;
.map_err(|e| RoomError::Internal(format!("seq INCR: {}", e)))?;
// Reconciliation check: if DB is ahead of Redis (e.g. server restart wiped
// Redis), bump Redis to stay in sync. This query is only hit on the rare
// cross-server handoff case, not on every request.
use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage};
use sea_orm::EntityTrait;
let db_seq: Option<Option<Option<i64>>> = RoomMessage::find()
.filter(RmCol::Room.eq(room_id))
.select_only()
.column_as(RmCol::Seq.max(), "max_seq")
.into_tuple::<Option<Option<i64>>>()
.one(db)
.await?
.map(|r| r);
let db_seq = db_seq.flatten().flatten().unwrap_or(0);
// DB reconciliation: only check every 1000 messages, not on every request.
// This handles the rare cross-server handoff case (Redis restart wipe).
if seq % 1000 == 0 {
use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage};
use sea_orm::EntityTrait;
let db_seq: Option<Option<Option<i64>>> = RoomMessage::find()
.filter(RmCol::Room.eq(room_id))
.select_only()
.column_as(RmCol::Seq.max(), "max_seq")
.into_tuple::<Option<Option<i64>>>()
.one(db)
.await?
.map(|r| r);
let db_seq = db_seq.flatten().flatten().unwrap_or(0);
if db_seq >= seq {
// Another server handled this room while we were idle — catch up.
let _: String = redis::cmd("SET")
.arg(&seq_key)
.arg(db_seq + 1)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("SET seq: {}", e)))?;
return Ok(db_seq + 1);
if db_seq >= seq {
let _: String = redis::cmd("SET")
.arg(&seq_key)
.arg(db_seq + 1)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("seq SET: {}", e)))?;
return Ok(db_seq + 1);
}
}
Ok(seq)

View File

@ -33,6 +33,8 @@ pub enum RoomEventType {
ReadReceipt,
ReactionAdded,
ReactionRemoved,
TypingStart,
TypingStop,
}
impl RoomEventType {
@ -55,6 +57,8 @@ impl RoomEventType {
RoomEventType::ReadReceipt => "read_receipt",
RoomEventType::ReactionAdded => "reaction_added",
RoomEventType::ReactionRemoved => "reaction_removed",
RoomEventType::TypingStart => "typing_start",
RoomEventType::TypingStop => "typing_stop",
}
}
@ -77,6 +81,8 @@ impl RoomEventType {
"read_receipt" => Some(RoomEventType::ReadReceipt),
"reaction_added" => Some(RoomEventType::ReactionAdded),
"reaction_removed" => Some(RoomEventType::ReactionRemoved),
"typing_start" => Some(RoomEventType::TypingStart),
"typing_stop" => Some(RoomEventType::TypingStop),
_ => None,
}
}

View File

@ -7,12 +7,11 @@ use tonic::{transport::Server, Request, Response, Status};
use tracing::{info_span, Instrument};
use super::generated::admin::{
GetMetricsRequest, GetMetricsResponse, GetUserInfoRequest, GetUserInfoResponse,
GetUserStatusRequest, GetUserStatusResponse, GetWorkspaceOnlineUsersRequest,
GetWorkspaceOnlineUsersResponse, InstanceMetrics, IsUserOnlineRequest, IsUserOnlineResponse,
KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, KickUserRequest, KickUserResponse,
ListUserSessionsRequest, ListUserSessionsResponse, ListWorkspaceSessionsRequest,
ListWorkspaceSessionsResponse, ExportMetricsCsvRequest, ExportMetricsCsvResponse,
GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse,
GetWorkspaceOnlineUsersRequest, GetWorkspaceOnlineUsersResponse, IsUserOnlineRequest,
IsUserOnlineResponse, KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse,
KickUserRequest, KickUserResponse, ListUserSessionsRequest, ListUserSessionsResponse,
ListWorkspaceSessionsRequest, ListWorkspaceSessionsResponse,
};
use super::generated::admin_session_admin::session_admin_server::{
SessionAdmin, SessionAdminServer,
@ -205,40 +204,6 @@ impl SessionAdmin for SessionAdminService {
.instrument(info_span!("is_user_online", user_id = %user_id))
.await
}
async fn get_metrics(
&self,
req: Request<GetMetricsRequest>,
) -> Result<Response<GetMetricsResponse>, Status> {
let r = req.get_ref();
let instance_filter = r.instance_filter.as_str();
let limit = if r.limit > 0 { r.limit as usize } else { 100 };
async {
let pool = self.session_manager.pool();
let instances = query_all_instance_metrics(pool, instance_filter, limit).await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(GetMetricsResponse { instances }))
}
.instrument(info_span!("get_metrics"))
.await
}
async fn export_metrics_csv(
&self,
req: Request<ExportMetricsCsvRequest>,
) -> Result<Response<ExportMetricsCsvResponse>, Status> {
let instance_filter = req.get_ref().instance_filter.as_str();
async {
let pool = self.session_manager.pool();
let csv = export_all_metrics_csv(pool, instance_filter).await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(ExportMetricsCsvResponse { csv }))
}
.instrument(info_span!("export_metrics_csv"))
.await
}
}
/// Default gRPC admin port.
@ -278,185 +243,3 @@ pub fn spawn(
let _ = shutdown_rx.recv().await;
})
}
// ---------------------------------------------------------------------------
// Metrics helpers — mirror of observability::redis_metrics (daily hash format)
// ---------------------------------------------------------------------------
use deadpool_redis::cluster::Pool;
use serde::{Deserialize, Serialize};
/// Snapshot stored in Redis under `metrics:{instance}:{YYYY-MM-DD}` (hash).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MetricsSnapshot {
instance_id: String,
timestamp_secs: i64,
#[serde(flatten)]
http: std::collections::HashMap<String, serde_json::Value>,
#[serde(flatten)]
room: std::collections::HashMap<String, serde_json::Value>,
}
/// Query metrics for all known instances, optionally filtered by `instance_filter`.
async fn query_all_instance_metrics(
pool: &Pool,
instance_filter: &str,
limit: usize,
) -> anyhow::Result<Vec<InstanceMetrics>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys to extract instance ids
let mut cursor = 0u64;
let mut all_instance_ids = Vec::new();
let mut seen = std::collections::HashSet::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("metrics:index:*")
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
for k in keys {
// Key format: metrics:index:{instance}:{YYYY-MM-DD}
if let Some(rest) = k.strip_prefix("metrics:index:") {
if let Some((instance_id, _)) = rest.rsplit_once(':') {
if seen.insert(instance_id.to_string()) {
all_instance_ids.push(instance_id.to_string());
}
}
}
}
cursor = new_cursor;
if cursor == 0 {
break;
}
}
let mut results = Vec::new();
for instance_id in all_instance_ids {
if !instance_filter.is_empty() && !instance_id.contains(instance_filter) {
continue;
}
let snapshots = query_instance_metrics(pool, &instance_id, limit).await?;
for (_, payload) in snapshots {
results.push(InstanceMetrics {
instance_id: payload.instance_id,
timestamp_secs: payload.timestamp_secs,
http: payload
.http
.into_iter()
.map(|(k, v)| (k, v.to_string()))
.collect(),
room: payload
.room
.into_iter()
.map(|(k, v)| (k, v.to_string()))
.collect(),
});
}
}
Ok(results)
}
/// Query metrics for a single instance from Redis (daily hash buckets).
async fn query_instance_metrics(
pool: &Pool,
instance_id: &str,
limit: usize,
) -> anyhow::Result<Vec<(i64, MetricsSnapshot)>> {
let mut conn = pool.get().await?;
// Scan for all daily index keys for this instance
let index_pattern = format!("metrics:index:{}:*", instance_id);
let mut day_cursor = 0u64;
let mut day_keys: Vec<String> = Vec::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(day_cursor)
.arg("MATCH")
.arg(&index_pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await?;
day_keys.extend(keys);
day_cursor = new_cursor;
if day_cursor == 0 {
break;
}
}
// Collect all timestamps from all daily indexes
let mut all_timestamps: Vec<i64> = Vec::new();
for day_key in &day_keys {
let timestamps: Vec<i64> = redis::cmd("ZREVRANGE")
.arg(day_key)
.arg(0)
.arg((limit as isize - 1).max(0))
.query_async(&mut conn)
.await?;
all_timestamps.extend(timestamps);
}
all_timestamps.sort_by_key(|&ts| std::cmp::Reverse(ts));
all_timestamps.truncate(limit);
let mut results = Vec::with_capacity(all_timestamps.len());
for ts in &all_timestamps {
let dt = chrono::DateTime::from_timestamp(*ts, 0)
.unwrap_or_else(chrono::Utc::now);
let day_str = dt.format("%Y-%m-%d").to_string();
let hash_key = format!("metrics:{}:{}", instance_id, day_str);
let fields: Option<std::collections::HashMap<String, String>> = redis::cmd("HGETALL")
.arg(&hash_key)
.query_async(&mut conn)
.await?;
if let Some(fields) = fields {
let mut http = std::collections::HashMap::new();
let mut room = std::collections::HashMap::new();
for (k, v) in &fields {
let json_val: serde_json::Value = serde_json::from_str(v).unwrap_or_else(|_| serde_json::Value::String(v.clone()));
if let Some(stripped) = k.strip_prefix("http_") {
http.insert(stripped.to_string(), json_val);
} else if let Some(stripped) = k.strip_prefix("room_") {
room.insert(stripped.to_string(), json_val);
}
}
results.push((*ts, MetricsSnapshot {
instance_id: instance_id.to_string(),
timestamp_secs: *ts,
http,
room,
}));
}
}
results.sort_by_key(|(ts, _)| *ts);
Ok(results)
}
/// Export all metrics as CSV.
async fn export_all_metrics_csv(pool: &Pool, instance_filter: &str) -> anyhow::Result<String> {
let instances = query_all_instance_metrics(pool, instance_filter, 1000).await?;
let mut rows = Vec::new();
for inst in instances {
let ts = inst.timestamp_secs;
let id = inst.instance_id;
for (k, v) in inst.http {
rows.push(format!("{},{},http_{},{}", id, ts, k, v));
}
for (k, v) in inst.room {
rows.push(format!("{},{},room_{},{}", id, ts, k, v));
}
}
let header = "instance_id,timestamp,metric,value";
if rows.is_empty() {
return Ok(header.to_string());
}
Ok(format!("{}\n{}", header, rows.join("\n")))
}

View File

@ -154,6 +154,7 @@ impl AppService {
let room_metrics = Arc::new(RoomMetrics::default());
let room_manager = Arc::new(room::connection::RoomConnectionManager::new(
room_metrics.clone(),
cache.clone(),
));
let redis_url = config

View File

@ -236,15 +236,28 @@ impl SessionStorage {
.await
.map_err(Self::to_err)?;
let mut sessions = Vec::new();
for id_str in &session_ids {
if let Ok(sid) = Uuid::parse_str(id_str) {
if let Ok(Some(session)) = self.get_session(&sid).await {
sessions.push(session);
}
}
if session_ids.is_empty() {
return Ok(Vec::new());
}
// Batch fetch all sessions in a single MGET instead of N individual GET calls.
let keys: Vec<String> = session_ids
.iter()
.map(|id| format!("{}{}", KEY_CONN, id))
.collect();
let values: Vec<Option<String>> = redis::cmd("MGET")
.arg(&keys)
.query_async(&mut conn)
.await
.map_err(Self::to_err)?;
let sessions: Vec<UserSession> = values
.into_iter()
.flatten()
.filter_map(|v| serde_json::from_str(&v).ok())
.collect();
Ok(sessions)
}
@ -261,15 +274,28 @@ impl SessionStorage {
.await
.map_err(Self::to_err)?;
let mut sessions = Vec::new();
for id_str in &session_ids {
if let Ok(sid) = Uuid::parse_str(id_str) {
if let Ok(Some(session)) = self.get_session(&sid).await {
sessions.push(session);
}
}
if session_ids.is_empty() {
return Ok(Vec::new());
}
// Batch fetch all sessions in a single MGET instead of N individual GET calls.
let keys: Vec<String> = session_ids
.iter()
.map(|id| format!("{}{}", KEY_CONN, id))
.collect();
let values: Vec<Option<String>> = redis::cmd("MGET")
.arg(&keys)
.query_async(&mut conn)
.await
.map_err(Self::to_err)?;
let sessions: Vec<UserSession> = values
.into_iter()
.flatten()
.filter_map(|v| serde_json::from_str(&v).ok())
.collect();
Ok(sessions)
}

View File

@ -13,6 +13,7 @@ import {
PointerSensor,
useSensor,
useSensors,
useDroppable,
type DragEndEvent,
type UniqueIdentifier,
} from '@dnd-kit/core';
@ -141,6 +142,9 @@ const ChannelGroup = memo(function ChannelGroup({
}) {
const ids: UniqueIdentifier[] = rooms.map((r) => `${DRAG_PREFIX}${r.id}`);
// Make the category header a droppable zone so rooms can be dragged onto it
const { setNodeRef: setHeaderRef, isOver: isOverHeader } = useDroppable({ id: categoryName });
return (
<div
className="discord-channel-category"
@ -148,7 +152,8 @@ const ChannelGroup = memo(function ChannelGroup({
onDrop={canReceiveDrops ? () => undefined /* handled by DnD */ : undefined}
>
<button
className={cn('discord-channel-category-header w-full', isCollapsed && 'collapsed')}
ref={setHeaderRef}
className={cn('discord-channel-category-header w-full', isCollapsed && 'collapsed', isOverHeader && 'ring-1 ring-accent')}
onClick={onToggle}
title={isCollapsed ? 'Expand' : 'Collapse'}
>
@ -309,25 +314,30 @@ export const DiscordChannelSidebar = memo(function DiscordChannelSidebar({
[onMoveRoomToCategory],
);
// Group rooms by category
// Group rooms by category — empty categories still show as collapsible groups
const uncategorized = useMemo(
() => rooms.filter((r) => !r.category_info?.name),
() => rooms.filter((r) => !r.category),
[rooms],
);
const categorized = useMemo(
() => rooms.filter((r) => r.category_info?.name),
() => rooms.filter((r) => r.category),
[rooms],
);
const categoryMap = useMemo(() => {
// Start with ALL categories (including empty ones), then merge in rooms
const map = new Map<CatName, RoomWithCategory[]>();
for (const cat of categories) {
if (!map.has(cat.name)) map.set(cat.name, []);
}
for (const room of categorized) {
const name = room.category_info!.name;
if (!map.has(name)) map.set(name, []);
map.get(name)!.push(room);
const catName = room.category_info?.name;
if (catName && map.has(catName)) {
map.get(catName)!.push(room);
}
}
return map;
}, [categorized]);
}, [categorized, categories]);
return (
<div className="discord-channel-sidebar flex flex-col h-full">

View File

@ -59,6 +59,7 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete, onToggleCha
refreshThreads,
roomAiConfigs,
presence,
typingUsers,
} = useRoom();
const messagesEndRef = useRef<HTMLDivElement>(null);
@ -352,6 +353,35 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete, onToggleCha
onCreateThread={handleCreateThread}
/>
{/* Typing indicator — show who is typing */}
{(() => {
const roomTyping = typingUsers?.[room.id] ?? {};
const typingList = Object.entries(roomTyping);
if (typingList.length === 0) return null;
const names = typingList.map(([, v]) => v.username);
const label = names.length === 1
? `${names[0]} is typing...`
: names.length === 2
? `${names[0]} and ${names[1]} are typing...`
: `${names[0]} and ${names.length - 1} others are typing...`;
return (
<div className="px-4 py-1 text-xs text-muted-foreground animate-pulse flex items-center gap-1.5">
<span className="flex gap-0.5">
{[0, 1, 2].map((i) => (
<span
key={i}
className="w-1.5 h-1.5 rounded-full bg-muted-foreground"
style={{
animation: `typing-bounce 1.2s infinite ${i * 0.2}s`,
}}
/>
))}
</span>
{label}
</div>
);
})()}
<MessageInput
ref={messageInputRef}
roomName={room.room_name ?? 'room'}

View File

@ -13,6 +13,8 @@ import type { MessageWithMeta } from '@/contexts';
import { useCallback, useState } from 'react';
import { toast } from 'sonner';
const QUICK_EMOJIS = ['👍', '❤️', '😂', '🎉', '😮'];
interface MessageActionsProps {
message: MessageWithMeta;
isOwner: boolean;
@ -52,7 +54,26 @@ export function MessageActions({
return (
<div className="flex items-start gap-0.5 opacity-0 transition-opacity group-hover:opacity-100">
{/* Add reaction */}
{/* Quick reaction bar — Slack-style hover reveal */}
{QUICK_EMOJIS.map((emoji) => {
const reacted = message.reactions?.find((r) => r.emoji === emoji)?.reacted_by_me;
return (
<Button
key={emoji}
variant="ghost"
size="sm"
onClick={() => handleReaction(emoji)}
className={`size-6 p-0 text-sm hover:bg-accent transition-transform hover:scale-125 ${
reacted ? 'bg-accent' : ''
}`}
title={emoji}
>
{emoji}
</Button>
);
})}
{/* Add more reaction — opens full picker */}
<Popover open={reactionPickerOpen} onOpenChange={setReactionPickerOpen}>
<PopoverTrigger
render={

View File

@ -69,7 +69,7 @@ export const MessageBubble = memo(function MessageBubble({
onOpenUserCard,
onOpenThread,
}: MessageBubbleProps) {
const [showFullText, setShowFullText] = useState(false);
const [showFullText, setShowFullText] = useState(true); // default expanded
const [isEditing, setIsEditing] = useState(false);
const [editContent, setEditContent] = useState(message.content);
const [isSavingEdit, setIsSavingEdit] = useState(false);

View File

@ -8,7 +8,7 @@
import { forwardRef, useImperativeHandle, useMemo, useRef } from 'react';
import { IMEditor } from './editor/IMEditor';
import { useRoom } from '@/contexts';
import type { MessageAST } from './editor/types';
import type { MessageAST, EditorNode } from './editor/types';
import type { IMEditorHandle } from './editor/IMEditor';
export interface MessageInputProps {
@ -26,11 +26,52 @@ export interface MessageInputHandle {
getAttachmentIds: () => string[];
}
// Slash commands available in the editor
const SLASH_COMMANDS = [
{ id: 'ai', label: '/ai', description: 'Ask AI a question', type: 'command' as const },
{ id: 'remind', label: '/remind', description: 'Set a reminder (e.g. /remind 10m Check CI)', type: 'command' as const },
{ id: 'poll', label: '/poll', description: 'Create a poll (e.g. /poll "Question?" A B C)', type: 'command' as const },
{ id: 'code-review', label: '/code-review', description: 'Request AI code review', type: 'command' as const },
];
// Special mention items — @here (online), @channel (all members)
const SPECIAL_MENTIONS = [
{
id: '__here__',
label: 'here',
description: 'Notify online members',
type: 'special_here' as const,
},
{
id: '__channel__',
label: 'channel',
description: 'Notify all members',
type: 'special_channel' as const,
},
];
/** Serialize tiptap AST to backend-parseable string format. */
function serializeMessageAst(ast: MessageAST): string {
return ast.content.map(serializeNode).join('\n');
}
function serializeNode(node: EditorNode): string {
if (node.type === 'text') return node.text;
if (node.type === 'mention') return `@[${node.attrs.type}:${node.attrs.id}:${node.attrs.label}]`;
if (node.type === 'hardBreak') return '\n';
if (node.type === 'file') return ''; // files are sent separately via attachmentIds
if (node.type === 'emoji') return `[emoji:${node.attrs.name}]`;
// Recurse into container nodes (paragraph, bulletList, etc.)
const children = (node as any).content as EditorNode[] | undefined;
if (children) return children.map(serializeNode).join('');
return '';
}
export const MessageInput = forwardRef<MessageInputHandle, MessageInputProps>(function MessageInput(
{ roomName, onSend, replyingTo, onCancelReply },
ref,
) {
const { members, activeRoomId } = useRoom();
const { members, activeRoomId, roomAiConfigs } = useRoom();
// Ref passed to the inner IMEditor
const innerEditorRef = useRef<IMEditorHandle | null>(null);
@ -45,14 +86,6 @@ export const MessageInput = forwardRef<MessageInputHandle, MessageInputProps>(fu
getAttachmentIds: () => innerEditorRef.current?.getAttachmentIds() ?? [],
}), []);
// Slash commands available in the editor
const SLASH_COMMANDS = [
{ id: 'ai', label: '/ai', description: 'Ask AI a question', type: 'command' as const },
{ id: 'remind', label: '/remind', description: 'Set a reminder (e.g. /remind 10m Check CI)', type: 'command' as const },
{ id: 'poll', label: '/poll', description: 'Create a poll (e.g. /poll "Question?" A B C)', type: 'command' as const },
{ id: 'code-review', label: '/code-review', description: 'Request AI code review', type: 'command' as const },
];
// Transform room data into MentionItems — memoized to prevent IMEditor re-creation
const mentionItems = useMemo(() => ({
users: members.map((m) => ({
@ -62,9 +95,14 @@ export const MessageInput = forwardRef<MessageInputHandle, MessageInputProps>(fu
avatar: m.user_info?.avatar_url ?? undefined,
})),
channels: [] as { id: string; label: string; type: 'channel'; avatar?: string }[],
ai: [] as { id: string; label: string; type: 'ai'; avatar?: string }[],
ai: roomAiConfigs.map((cfg) => ({
id: cfg.model,
label: cfg.modelName ?? cfg.model,
type: 'ai' as const,
})),
commands: SLASH_COMMANDS,
}), [members]);
specialMentions: SPECIAL_MENTIONS,
}), [members, roomAiConfigs]);
// File upload handler — POST to /rooms/{room_id}/upload
const handleUploadFile = async (file: File): Promise<{ id: string; url: string }> => {
@ -77,9 +115,10 @@ export const MessageInput = forwardRef<MessageInputHandle, MessageInputProps>(fu
return res.json();
};
// onSend: extract plain text from MessageAST for sending
const handleSend = (text: string, _ast: MessageAST) => {
onSend(text);
// onSend: serialize AST to backend-parseable format
const handleSend = (_text: string, ast: MessageAST) => {
const serialized = serializeMessageAst(ast);
onSend(serialized);
};
return (

View File

@ -108,10 +108,13 @@ export const MessageList = memo(function MessageList({
const result: MessageRow[] = [];
let lastDateKey: string | null = null;
let lastSenderKey: string | null = null;
let lastMessageTime: number | null = null;
const GROUP_GAP_MS = 5 * 60 * 1000; // 5 minutes
for (const message of messages) {
const dateKey = getDateKey(message.send_at);
const senderKey = getSenderKey(message);
const msgTime = new Date(message.send_at).getTime();
if (dateKey !== lastDateKey) {
result.push({
@ -121,9 +124,14 @@ export const MessageList = memo(function MessageList({
});
lastDateKey = dateKey;
lastSenderKey = null;
lastMessageTime = null;
}
const grouped = senderKey === lastSenderKey;
// Group if: same sender AND within 5-minute gap (Discord-style)
const sameSender = senderKey === lastSenderKey;
const withinTimeGap = lastMessageTime !== null && (msgTime - lastMessageTime) < GROUP_GAP_MS;
const grouped = sameSender && withinTimeGap;
result.push({
type: 'message',
message,
@ -132,6 +140,7 @@ export const MessageList = memo(function MessageList({
key: message.id,
});
lastSenderKey = senderKey;
lastMessageTime = msgTime;
}
return result;
}, [messages, replyMap]);

View File

@ -5,12 +5,13 @@
* Colors: Clean modern palette, no Discord reference
*/
import {forwardRef, useCallback, useEffect, useImperativeHandle, useRef, useState} from 'react';
import {forwardRef, useCallback, useEffect, useImperativeHandle, useMemo, useRef, useState} from 'react';
import {EditorContent, Extension, useEditor} from '@tiptap/react';
import StarterKit from '@tiptap/starter-kit';
import Placeholder from '@tiptap/extension-placeholder';
import {CustomEmojiNode} from './EmojiNode';
import type {MentionItem, MentionType, MessageAST} from './types';
import {MentionNodeType} from './MentionNode';
import type {EditorNode, MentionItem, MentionType, MessageAST} from './types';
import {Paperclip, Send, Smile, X} from 'lucide-react';
import {cn} from '@/lib/utils';
import {COMMON_EMOJIS} from '../../shared';
@ -26,6 +27,7 @@ export interface IMEditorProps {
channels: MentionItem[];
ai: MentionItem[];
commands: MentionItem[];
specialMentions?: MentionItem[];
};
onUploadFile?: (file: File) => Promise<{ id: string; url: string }>;
placeholder?: string;
@ -137,30 +139,6 @@ function EmojiPicker({onClose, onSelect, p}: { onClose: () => void; onSelect: (e
);
}
// ─── Keyboard Extension ───────────────────────────────────────────────────────
const KeyboardSend = Extension.create({
name: 'keyboardSend',
addKeyboardShortcuts() {
return {
Enter: ({editor}) => {
if (editor.isEmpty) return true;
const text = editor.getText().trim();
if (!text) return true;
(editor.storage as any).keyboardSend?.onSend?.(text, editor.getJSON() as MessageAST);
return true;
},
'Shift-Enter': ({editor}) => {
editor.chain().focus().setHardBreak().run();
return true;
},
};
},
addStorage() {
return {onSend: null as ((t: string, a: MessageAST) => void) | null};
},
});
// ─── Helpers ─────────────────────────────────────────────────────────────────
function filterMentionItems(all: MentionItem[], q: string): MentionItem[] {
@ -174,20 +152,77 @@ function getBadge(type: MentionType): { label: string; cls: string } | null {
return null;
}
// ─── Mention Dropdown ────────────────────────────────────────────────────────
/** Serialize tiptap AST to backend-parseable string. */
function serializeAstForSend(ast: MessageAST): string {
return ast.content.map(serializeAstNode).join('\n');
}
function serializeAstNode(node: EditorNode): string {
if (node.type === 'text') return node.text;
if (node.type === 'mention') return `@[${node.attrs.type}:${node.attrs.id}:${node.attrs.label}]`;
if (node.type === 'hardBreak') return '\n';
if (node.type === 'emoji') return `[emoji:${node.attrs.name}]`;
if (node.type === 'file') return '';
// Recurse into container nodes (paragraph, bulletList, etc.)
const children = (node as any).content as EditorNode[] | undefined;
if (children) return children.map(serializeAstNode).join('');
return '';
}
// ─── Mention Dropdown (sectioned by type) ────────────────────────────────────
const SECTION_ORDER = ['special_here', 'special_channel', 'ai', 'user', 'channel', 'command'] as const;
const SECTION_LABELS: Record<string, string> = {
special_here: 'Notify',
special_channel: 'Notify',
ai: 'AI',
user: 'Members',
channel: 'Channels',
command: 'Commands',
};
const SPECIAL_TYPES = ['special_here', 'special_channel'];
function MentionDropdown({
items, selectedIndex, onSelect, p, query,
}: {
items, selectedIndex, onSelect, p, query,
}: {
items: MentionItem[];
selectedIndex: number;
onSelect: (item: MentionItem) => void;
p: Palette;
query: string;
}) {
const scrollRef = useRef<HTMLDivElement>(null);
// Auto-scroll selected item into view
useEffect(() => {
if (!scrollRef.current) return;
const selectedEl = scrollRef.current.querySelector(`[data-mention-idx="${selectedIndex}"]`);
if (selectedEl) {
selectedEl.scrollIntoView({ block: 'nearest' });
}
}, [selectedIndex]);
// Group items by section
const sections: Map<string, MentionItem[]> = new Map();
for (const sectionType of SECTION_ORDER) {
const sectionItems = items.filter(
(item) => sectionType === 'special_here' || sectionType === 'special_channel'
? item.type === sectionType
: SPECIAL_TYPES.includes(sectionType)
? false
: item.type === sectionType,
);
if (sectionItems.length > 0) {
sections.set(sectionType, sectionItems);
}
}
// Build flat index map: item → its position in the overall items array
const flatIndexMap = new Map<MentionItem, number>();
items.forEach((item, i) => flatIndexMap.set(item, i));
return (
<div
className="absolute left-0 z-50 overflow-hidden"
className="absolute bottom-full left-0 mb-1 z-50 overflow-hidden"
style={{
background: p.popupBg,
border: `1px solid ${p.popupBorder}`,
@ -202,38 +237,81 @@ function MentionDropdown({
No results for &ldquo;{query}&rdquo;
</div>
) : (
<div className="py-1 max-h-60 overflow-y-auto">
{items.map((item, i) => {
const badge = getBadge(item.type);
return (
<button
key={item.id}
onClick={() => onSelect(item)}
className="w-full flex items-center gap-3 px-3 py-2.5 transition-colors text-left cursor-pointer"
style={{background: i === selectedIndex ? p.popupSelected : 'transparent'}}
>
{item.avatar ? (
<img src={item.avatar} alt={item.label} className="w-7 h-7 rounded-full shrink-0"/>
) : (
<span
className="w-7 h-7 rounded-full shrink-0 flex items-center justify-center text-xs font-semibold"
style={{background: p === DARK ? '#2a2a30' : '#eeeef0', color: p.text}}
<div className="py-1 max-h-60 overflow-y-auto" ref={scrollRef}>
{Array.from(sections.entries()).map(([sectionType, sectionItems], si) => (
<div key={sectionType}>
{/* Section header — only show if there are other sections after or before */}
{sections.size > 1 && (
<div className="px-3 py-1 text-[10px] font-semibold uppercase tracking-wide" style={{color: p.textSubtle}}>
{SECTION_LABELS[sectionType]}
</div>
)}
{sectionItems.map((item) => {
const realIndex = flatIndexMap.get(item) ?? 0;
const isSpecial = SPECIAL_TYPES.includes(item.type);
const icon = item.type === 'special_here' ? '📍' : item.type === 'special_channel' ? '📢' : undefined;
if (isSpecial) {
return (
<button
key={item.id}
data-mention-idx={realIndex}
onClick={() => onSelect(item)}
className="w-full flex items-center gap-3 px-3 py-2.5 transition-colors text-left cursor-pointer"
style={{background: realIndex === selectedIndex ? p.popupSelected : 'transparent'}}
>
<span className="w-7 h-7 rounded-full shrink-0 flex items-center justify-center text-base">
{icon}
</span>
<span className="flex-1 truncate text-sm font-medium" style={{color: p.text}}>
@{item.label}
</span>
{item.description && (
<span className="text-[10px] text-muted-foreground mr-1">
{item.description}
</span>
)}
</button>
);
}
const badge = getBadge(item.type);
return (
<button
key={item.id}
data-mention-idx={realIndex}
onClick={() => onSelect(item)}
className="w-full flex items-center gap-3 px-3 py-2.5 transition-colors text-left cursor-pointer"
style={{background: realIndex === selectedIndex ? p.popupSelected : 'transparent'}}
>
{item.label.charAt(0).toUpperCase()}
</span>
)}
<span className="flex-1 truncate text-sm font-medium" style={{color: p.text}}>
{item.label}
</span>
{badge && (
<span
className={cn('shrink-0 text-[10px] font-bold px-1.5 py-0.5 rounded-full', badge.cls)}>
{badge.label}
</span>
)}
</button>
);
})}
{item.avatar ? (
<img src={item.avatar} alt={item.label} className="w-7 h-7 rounded-full shrink-0"/>
) : (
<span
className="w-7 h-7 rounded-full shrink-0 flex items-center justify-center text-xs font-semibold"
style={{background: p === DARK ? '#2a2a30' : '#eeeef0', color: p.text}}
>
{item.label.charAt(0).toUpperCase()}
</span>
)}
<span className="flex-1 truncate text-sm font-medium" style={{color: p.text}}>
{item.label}
</span>
{badge && (
<span
className={cn('shrink-0 text-[10px] font-bold px-1.5 py-0.5 rounded-full', badge.cls)}>
{badge.label}
</span>
)}
</button>
);
})}
{/* Divider between sections */}
{si < sections.size - 1 && (
<div className="mx-3 my-1 border-t border-border" />
)}
</div>
))}
</div>
)}
</div>
@ -258,25 +336,77 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
const [, setMentionPos] = useState({top: 0, left: 0});
const [focused, setFocused] = useState(false);
// Refs for keyboard shortcut closures (tiptap can't read React state directly)
const mentionOpenRef = useRef(false);
const mentionIdxRef = useRef(0);
const mentionItemsRef = useRef<MentionItem[]>([]);
const editorRef = useRef<ReturnType<typeof useEditor>>(null);
// Sync refs with state
useEffect(() => { mentionOpenRef.current = mentionOpen; }, [mentionOpen]);
useEffect(() => { mentionIdxRef.current = mentionIdx; }, [mentionIdx]);
useEffect(() => { mentionItemsRef.current = mentionItems2; }, [mentionItems2]);
const wrapRef = useRef<HTMLDivElement>(null);
const allItems = [
...mentionItems.users,
...mentionItems.channels,
// Candidate pools by trigger character
const atPool = useMemo(() => [
...(mentionItems.specialMentions ?? []),
...mentionItems.ai,
...mentionItems.commands,
];
...mentionItems.users,
], [mentionItems.specialMentions, mentionItems.ai, mentionItems.users]);
const hashPool = useMemo(() => [...mentionItems.channels], [mentionItems.channels]);
const slashPool = useMemo(() => [...mentionItems.commands], [mentionItems.commands]);
const selectMention = useCallback((item: MentionItem) => {
const editor = editorRef.current;
if (!editor) return;
if (item.type === 'command') {
// Replace the / prefix with the full command label
editor.chain().focus().insertContent(item.label + ' ').run();
} else {
// Use backend-parseable format: @[type:id:label]
const mentionStr = `@[${item.type}:${item.id}:${item.label}] `;
editor.chain().focus().insertContent(mentionStr).run();
// Delete the trigger + query text first, then insert the mention node
const text = editor.getText();
const {from} = editor.state.selection;
let triggerStart = from;
for (let i = from - 1; i >= 1; i--) {
const c = text[i - 1];
if (c === '@' || c === '#' || c === '/') {
triggerStart = i;
break;
}
if (/\s/.test(c)) break;
}
// Delete from triggerStart-1 to from (the @query / #query / /query text)
if (triggerStart < from) {
editor.chain().focus().deleteRange({from: triggerStart - 1, to: from}).run();
}
// Insert mention node
editor.chain().focus().insertContent({
type: 'mention',
attrs: { id: item.id, label: item.label, type: item.type },
}).insertContent(' ').run();
setMentionOpen(false);
}, []);
const moveMentionIdx = useCallback((delta: number) => {
const len = mentionItemsRef.current.length;
if (len === 0) return;
const next = (mentionIdxRef.current + delta + len) % len;
setMentionIdx(next);
}, []);
const selectCurrentMention = useCallback(() => {
const items = mentionItemsRef.current;
const idx = mentionIdxRef.current;
if (items[idx]) {
selectMention(items[idx]);
}
}, [selectMention]);
const closeMention = useCallback(() => {
setMentionOpen(false);
}, []);
@ -285,7 +415,62 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
StarterKit.configure({undoRedo: {depth: 100}}),
Placeholder.configure({placeholder}),
CustomEmojiNode,
KeyboardSend,
MentionNodeType,
Extension.create({
name: 'mentionKeyboard',
addKeyboardShortcuts() {
return {
Enter: () => {
if (mentionOpenRef.current) {
selectCurrentMention();
return true;
}
const ed = editorRef.current;
if (!ed) return true;
const ast = ed.getJSON() as MessageAST;
const serialized = serializeAstForSend(ast);
if (!serialized.trim()) return true;
(ed.storage as any).mentionKeyboard?.onSend?.(serialized, ast);
return true;
},
'Shift-Enter': ({editor: ed}) => {
ed.chain().focus().setHardBreak().run();
return true;
},
ArrowUp: () => {
if (mentionOpenRef.current) {
moveMentionIdx(-1);
return true;
}
return false;
},
ArrowDown: () => {
if (mentionOpenRef.current) {
moveMentionIdx(1);
return true;
}
return false;
},
Escape: () => {
if (mentionOpenRef.current) {
closeMention();
return true;
}
return false;
},
Tab: () => {
if (mentionOpenRef.current) {
selectCurrentMention();
return true;
}
return false;
},
};
},
addStorage() {
return {onSend: null as ((t: string, a: MessageAST) => void) | null};
},
}),
],
editorProps: {
handlePaste: (_v, e) => {
@ -311,23 +496,27 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
const text = ed.getText();
const {from} = ed.state.selection;
// Backward scan from cursor to find trigger character
let ts = from;
let trigger: string | null = null;
for (let i = from - 1; i >= 1; i--) {
const c = text[i - 1];
if (c === '@' || c === '/') {
if (c === '@' || c === '#' || c === '/') {
ts = i;
trigger = c;
break;
}
if (/\s/.test(c)) break;
}
const q = text.slice(ts - 1, from);
if (q.startsWith('@') && q.length > 1) {
const results = filterMentionItems(allItems, q.slice(1));
if (trigger === '@' && q.length >= 1) {
const results = filterMentionItems(atPool, q.slice(1));
setMentionQuery(q.slice(1));
setMentionItems2(results);
setMentionIdx(0);
setMentionOpen(true);
setMentionOpen(results.length > 0);
if (wrapRef.current) {
const sel = window.getSelection();
@ -337,13 +526,27 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
setMentionPos({top: r.bottom - cr.top + 6, left: Math.max(0, r.left - cr.left)});
}
}
} else if (q.startsWith('/') && q.length > 1) {
// Filter commands by query (e.g. "/ai" matches "ai")
const results = filterMentionItems(mentionItems.commands, q.slice(1));
} else if (trigger === '#' && q.length >= 1) {
const results = filterMentionItems(hashPool, q.slice(1));
setMentionQuery(q.slice(1));
setMentionItems2(results);
setMentionIdx(0);
setMentionOpen(true);
setMentionOpen(results.length > 0);
if (wrapRef.current) {
const sel = window.getSelection();
if (sel?.rangeCount) {
const r = sel.getRangeAt(0).getBoundingClientRect();
const cr = wrapRef.current.getBoundingClientRect();
setMentionPos({top: r.bottom - cr.top + 6, left: Math.max(0, r.left - cr.left)});
}
}
} else if (trigger === '/' && q.length >= 1) {
const results = filterMentionItems(slashPool, q.slice(1));
setMentionQuery(q.slice(1));
setMentionItems2(results);
setMentionIdx(0);
setMentionOpen(results.length > 0);
if (wrapRef.current) {
const sel = window.getSelection();
@ -361,8 +564,13 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
onBlur: () => setFocused(false),
});
// Store editor ref
useEffect(() => {
if (editor) (editor.storage as any).keyboardSend = {onSend};
editorRef.current = editor;
}, [editor]);
useEffect(() => {
if (editor) (editor.storage as any).mentionKeyboard = {onSend};
}, [editor, onSend]);
const doUpload = async (file: File) => {
@ -392,21 +600,26 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
};
const send = () => {
if (!editor || editor.isEmpty) return;
const text = editor.getText().trim();
if (!text) return;
onSend(text, editor.getJSON() as MessageAST);
if (!editor) return;
const ast = editor.getJSON() as MessageAST;
const serialized = serializeAstForSend(ast);
if (!serialized.trim()) return;
onSend(serialized, ast);
editor.commands.clearContent();
};
const hasContent = !!editor && editor.state.doc.content.size > 2;
useImperativeHandle(ref, () => ({
focus: () => editor?.commands.focus(),
clearContent: () => editor?.commands.clearContent(),
getContent: () => editor?.getText() ?? '',
insertMention: (type: string, id: string, label: string) => {
if (!editor) return;
const mentionStr = `@[${type}:${id}:${label}] `;
editor.chain().focus().insertContent(mentionStr).run();
editor.chain().focus().insertContent({
type: 'mention',
attrs: { id, label, type },
}).insertContent(' ').run();
},
getAttachmentIds: () => {
if (!editor) return [];
@ -425,7 +638,6 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
},
}));
const hasContent = !!editor && !editor.isEmpty;
// Dynamic styles
const borderColor = focused ? p.borderFocus : p.border;
@ -464,6 +676,7 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
{/* Input area */}
<div
className="relative"
onClick={() => editor?.commands.focus()}
style={{
background: p.bg,

View File

@ -0,0 +1,36 @@
/**
* TipTap Mention Node inline atom for @mentions, #channels, /commands.
* Renders via MentionView React NodeView with type-specific coloring.
*/
import { Node, mergeAttributes } from '@tiptap/core';
import { ReactNodeViewRenderer } from '@tiptap/react';
import MentionView from './MentionView';
export const MentionNodeType = Node.create({
name: 'mention',
group: 'inline',
inline: true,
selectable: true,
atom: true,
addAttributes() {
return {
id: { default: null },
label: { default: null },
type: { default: 'user' },
};
},
parseHTML() {
return [{ tag: 'span[data-mention]' }];
},
renderHTML({ HTMLAttributes }) {
return ['span', mergeAttributes({ 'data-mention': '' }, HTMLAttributes)];
},
addNodeView() {
return ReactNodeViewRenderer(MentionView);
},
});

View File

@ -0,0 +1,37 @@
/**
* React NodeView for TipTap mention nodes.
* Renders colored inline labels by mention type.
*/
import type { ReactNodeViewProps } from '@tiptap/react';
import { NodeViewWrapper } from '@tiptap/react';
const TYPE_STYLE: Record<string, { bg: string; text: string; prefix: string }> = {
user: { bg: 'bg-blue-100 dark:bg-blue-900/40', text: 'text-blue-700 dark:text-blue-300', prefix: '@' },
ai: { bg: 'bg-indigo-100 dark:bg-indigo-900/40', text: 'text-indigo-700 dark:text-indigo-300', prefix: '@' },
channel: { bg: 'bg-gray-100 dark:bg-gray-800', text: 'text-gray-600 dark:text-gray-400', prefix: '#' },
special_here: { bg: 'bg-orange-100 dark:bg-orange-900/40', text: 'text-orange-700 dark:text-orange-300', prefix: '@' },
special_channel: { bg: 'bg-orange-100 dark:bg-orange-900/40', text: 'text-orange-700 dark:text-orange-300', prefix: '@' },
command: { bg: 'bg-amber-100 dark:bg-amber-900/30', text: 'text-amber-700 dark:text-amber-300', prefix: '/' },
};
export default function MentionView(props: ReactNodeViewProps) {
const attrs = props.node.attrs as Record<string, string>;
const label = attrs.label ?? '';
const type = attrs.type ?? 'user';
const style = TYPE_STYLE[type] ?? TYPE_STYLE.user;
return (
<NodeViewWrapper className="inline" as="span">
<span
className={`${style.bg} ${style.text} rounded px-1.5 py-0.5 text-sm font-medium select-none cursor-default inline-flex items-center leading-tight`}
data-mention
data-id={attrs.id}
data-label={label}
data-type={type}
>
{style.prefix}{label}
</span>
</NodeViewWrapper>
);
}

View File

@ -2,13 +2,14 @@
* Core types for the IM editor (mentions, files, emojis).
*/
export type MentionType = 'user' | 'channel' | 'ai' | 'command';
export type MentionType = 'user' | 'channel' | 'ai' | 'command' | 'special_here' | 'special_channel';
export interface MentionItem {
id: string;
label: string;
type: MentionType;
avatar?: string;
description?: string; // shown under label in suggestion dropdown
}
export interface FileData {

View File

@ -18,6 +18,8 @@ import {
type RoomPinResponse,
type RoomResponse,
type RoomThreadResponse,
categoryList as restCategoryList,
roomList as restRoomList,
} from '@/client';
import {
createRoomWsClient,
@ -159,6 +161,9 @@ interface RoomContextValue {
/** Room AI configs for @ai: mention suggestions */
roomAiConfigs: RoomAiConfig[];
aiConfigsLoading?: boolean;
/** Typing users in the active room: roomId -> userId -> { username, avatar_url } */
typingUsers: Record<string, Record<string, { username: string; avatar_url?: string; timeoutId?: ReturnType<typeof setTimeout> }>>;
}
const RoomContext = createContext<RoomContextValue | null>(null);
@ -224,6 +229,15 @@ export function RoomProvider({
const [categories, setCategories] = useState<RoomCategoryResponse[]>([]);
const [categoriesLoading, setCategoriesLoading] = useState(false);
// Merge category_info into rooms whenever either changes
const roomsWithCategory = useMemo<RoomWithCategory[]>(() => {
const catMap = new Map(categories.map((c) => [c.id, c]));
return rooms.map((r) => ({
...r,
category_info: r.category ? (catMap.get(r.category) ?? null) : null,
}));
}, [rooms, categories]);
const [activeRoom, setActiveRoomState] = useState<RoomResponse | null>(null);
const [messages, setMessages] = useState<MessageWithMeta[]>([]);
@ -413,6 +427,9 @@ export function RoomProvider({
// User presence map: user_id -> status
const [presence, setPresence] = useState<PresenceMap>({});
// Typing users map: roomId -> Map<userId, { username, avatar_url, timeoutId }>
const [typingUsers, setTypingUsers] = useState<Record<string, Record<string, { username: string; avatar_url?: string; timeoutId?: ReturnType<typeof setTimeout> }>>>({});
const [streamingContent, setStreamingContent] = useState<Map<string, string>>(new Map());
// Project repos for @repository: mention suggestions
@ -638,6 +655,41 @@ export function RoomProvider({
if (payload.room_id !== activeRoomIdRef.current) return;
setPresence((prev) => ({ ...prev, [payload.user_id]: payload.status }));
},
onTypingStart: (payload) => {
if (payload.room_id !== activeRoomIdRef.current) return;
if (payload.user_id === user?.uid) return; // Don't show self
setTypingUsers((prev) => {
const roomMap = prev[payload.room_id] ?? {};
// Clear existing timeout for this user
const existing = roomMap[payload.user_id];
if (existing?.timeoutId) clearTimeout(existing.timeoutId);
const timeoutId = setTimeout(() => {
setTypingUsers((p) => {
const rm = { ...p[payload.room_id] };
delete rm[payload.user_id];
return { ...p, [payload.room_id]: rm };
});
}, 4000);
return {
...prev,
[payload.room_id]: {
...roomMap,
[payload.user_id]: { username: payload.username, avatar_url: payload.avatar_url, timeoutId },
},
};
});
},
onTypingStop: (payload) => {
if (payload.room_id !== activeRoomIdRef.current) return;
setTypingUsers((prev) => {
const roomMap = prev[payload.room_id] ?? {};
const existing = roomMap[payload.user_id];
if (existing?.timeoutId) clearTimeout(existing.timeoutId);
const newRoomMap = { ...roomMap };
delete newRoomMap[payload.user_id];
return { ...prev, [payload.room_id]: newRoomMap };
});
},
onStatusChange: (status) => {
setWsStatus(status);
if (status === 'closed' || status === 'error') {
@ -682,16 +734,20 @@ export function RoomProvider({
}, []);
const fetchRooms = useCallback(async () => {
const client = wsClientRef.current;
if (!projectName || !client) {
if (!projectName) {
setRooms([]);
return;
}
setRoomsLoading(true);
setRoomsError(null);
try {
const resp = await client.roomList(projectName);
setRooms(resp.map((r) => ({ ...r, category_info: null })));
const resp = await restRoomList({ path: { project_name: projectName } });
const data = resp.data?.data;
if (Array.isArray(data)) {
setRooms(data.map((r) => ({ ...r, category_info: null })));
} else {
setRooms([]);
}
} catch (err) {
setRoomsError(err instanceof Error ? err : new Error('Failed to load rooms'));
} finally {
@ -704,12 +760,12 @@ export function RoomProvider({
}, [fetchRooms]);
const fetchCategories = useCallback(async () => {
const client = wsClientRef.current;
if (!projectName || !client) return;
if (!projectName) return;
setCategoriesLoading(true);
try {
const resp = await client.categoryList(projectName);
setCategories(resp);
const resp = await restCategoryList({ path: { project_name: projectName } });
const data = resp.data?.data;
setCategories(Array.isArray(data) ? data : []);
} catch (error) {
handleRoomError('Load categories', error);
} finally {
@ -1236,7 +1292,7 @@ export function RoomProvider({
wsClient: wsClientRef.current,
connectWs,
disconnectWs,
rooms,
rooms: roomsWithCategory,
roomsLoading,
roomsError,
refreshRooms: fetchRooms,
@ -1283,6 +1339,7 @@ export function RoomProvider({
reposLoading,
roomAiConfigs,
aiConfigsLoading,
typingUsers,
}),
[
wsStatus,
@ -1290,7 +1347,7 @@ export function RoomProvider({
connectWs,
disconnectWs,
wsClientRef.current,
rooms,
roomsWithCategory,
roomsLoading,
roomsError,
fetchRooms,
@ -1336,6 +1393,7 @@ export function RoomProvider({
reposLoading,
roomAiConfigs,
aiConfigsLoading,
typingUsers,
],
);

View File

@ -78,6 +78,8 @@ export interface RoomWsCallbacks {
onMessagePinned?: (payload: import('./ws-protocol').MessagePinnedPayload) => void;
onMessageUnpinned?: (payload: import('./ws-protocol').MessageUnpinnedPayload) => void;
onUserPresence?: (payload: UserPresencePayload) => void;
onTypingStart?: (payload: import('./ws-protocol').TypingStartPayload) => void;
onTypingStop?: (payload: import('./ws-protocol').TypingStopPayload) => void;
onStatusChange?: (status: RoomWsStatus) => void;
onError?: (error: Error) => void;
/** Called each time the client sends a heartbeat ping */
@ -961,6 +963,14 @@ export class RoomWsClient {
return url;
}
/** Send a typing_start / typing_stop event directly via WebSocket push (no response needed). */
sendTyping(roomId: string, action: 'start' | 'stop'): void {
if (this.ws && this.status === 'open') {
const event = { type: 'event', event: `typing_${action}`, room_id: roomId };
this.ws.send(JSON.stringify(event));
}
}
private handleMessage(rawText: string): void {
// Handle raw JSON pong before full parsing — resets heartbeat
if (rawText.trim() === '{"type":"pong"}') {
@ -1033,6 +1043,22 @@ export class RoomWsClient {
status: ((event.data as { status?: string })?.status ?? 'offline') as 'online' | 'away' | 'dnd' | 'offline',
});
break;
case 'typing.start':
case 'typing_start':
this.callbacks.onTypingStart?.({
room_id: event.room_id ?? '',
user_id: (event.data as { user_id?: string })?.user_id ?? '',
username: (event.data as { username?: string })?.username ?? '',
avatar_url: (event.data as { avatar_url?: string })?.avatar_url,
});
break;
case 'typing.stop':
case 'typing_stop':
this.callbacks.onTypingStop?.({
room_id: event.room_id ?? '',
user_id: (event.data as { user_id?: string })?.user_id ?? '',
});
break;
default:
// Unknown event type - ignore silently
break;

View File

@ -133,7 +133,20 @@ export type WsResponseData =
| NotificationListData
| MentionListData
| SubscribeData
| UserInfo[];
| UserInfo[]
| null;
export interface TypingStartPayload {
room_id: string;
user_id: string;
username: string;
avatar_url?: string;
}
export interface TypingStopPayload {
room_id: string;
user_id: string;
}
export interface WsEvent {
type: 'event';
@ -155,6 +168,8 @@ export type WsEventPayload =
| { type: 'message_pinned'; data: MessagePinnedPayload }
| { type: 'message_unpinned'; data: MessageUnpinnedPayload }
| { type: 'user_presence'; data: UserPresencePayload }
| { type: 'typing_start'; data: TypingStartPayload }
| { type: 'typing_stop'; data: TypingStopPayload }
| { type: string; data: unknown }; // catch-all for unknown events
export interface RoomMessagePayload {