refactor(room): Discord layout and room WebSocket client refactor

- Refactor room-context.tsx with improved WebSocket state management
- Enhance room-ws-client.ts with reconnect logic and message handling
- Update Discord layout components with message editor improvements
- Add WebSocket universal endpoint support in ws_universal.rs
This commit is contained in:
ZhenYi 2026-04-18 19:05:21 +08:00
parent 0cccec33b2
commit 821b0e998d
7 changed files with 264 additions and 208 deletions

View File

@ -98,11 +98,7 @@ impl ChatService {
frequency_penalty: Some(frequency_penalty as f32),
presence_penalty: Some(presence_penalty as f32),
stream: Some(false),
reasoning_effort: Some(if think {
ReasoningEffort::High
} else {
ReasoningEffort::None
}),
reasoning_effort: if think { Some(ReasoningEffort::High) } else { None },
tools: if tools_enabled {
Some(
tools
@ -232,11 +228,7 @@ impl ChatService {
frequency_penalty: Some(frequency_penalty as f32),
presence_penalty: Some(presence_penalty as f32),
stream: Some(true),
reasoning_effort: Some(if think {
ReasoningEffort::High
} else {
ReasoningEffort::None
}),
reasoning_effort: if think { Some(ReasoningEffort::High) } else { None },
tools: if tools_enabled {
Some(
tools

View File

@ -289,6 +289,15 @@ pub async fn ws_universal(
continue;
}
// Handle JSON-level ping (application heartbeat).
// Client sends {"type":"ping"} and we reply with {"type":"pong"}.
if text.trim() == r#"{"type":"ping"}"# {
if session.text(r#"{"type":"pong"}"#).await.is_err() { break; }
last_activity = Instant::now();
last_heartbeat = Instant::now();
continue;
}
match serde_json::from_str::<WsRequest>(&text) {
Ok(request) => {
let action_str = request.action.to_string();

View File

@ -277,6 +277,10 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete }: DiscordCh
onRevoke={handleRevoke}
onReply={setReplyingTo}
onMention={undefined}
onOpenUserCard={({ userId, username }) => {
messageInputRef.current?.insertMention('user', userId, username);
messageInputRef.current?.focus();
}}
onOpenThread={handleOpenThread}
onCreateThread={handleCreateThread}
/>
@ -294,7 +298,12 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete }: DiscordCh
<DiscordMemberList
members={members}
membersLoading={membersLoading}
onMemberClick={() => {}}
onMemberClick={({ user, user_info, role }) => {
const label = user_info?.username ?? user;
const type = role === 'ai' ? 'ai' : 'user';
messageInputRef.current?.insertMention(type, user, label);
messageInputRef.current?.focus();
}}
aiConfigs={roomAiConfigs}
/>
)}

View File

@ -21,6 +21,7 @@ export interface MessageInputHandle {
focus: () => void;
clearContent: () => void;
getContent: () => string;
insertMention: (type: string, id: string, label: string) => void;
}
export const MessageInput = forwardRef<MessageInputHandle, MessageInputProps>(function MessageInput(

View File

@ -33,6 +33,7 @@ export interface IMEditorHandle {
focus: () => void;
clearContent: () => void;
getContent: () => string;
insertMention: (type: string, id: string, label: string) => void;
}
// ─── Color System (Google AI Studio / Linear palette, no Discord) ────────────
@ -368,6 +369,11 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
focus: () => editor?.commands.focus(),
clearContent: () => editor?.commands.clearContent(),
getContent: () => editor?.getText() ?? '',
insertMention: (type: string, id: string, label: string) => {
if (!editor) return;
const mentionStr = `@[${type}:${id}:${label}] `;
editor.chain().focus().insertContent(mentionStr).run();
},
}));
const hasContent = !!editor && !editor.isEmpty;

View File

@ -30,13 +30,6 @@ import {
} from '@/lib/room-ws-client';
import { requestWsToken } from '@/lib/ws-token';
import { useUser } from '@/contexts';
import {
saveMessage,
saveMessages,
loadMessages as loadMessagesFromIdb,
loadOlderMessagesFromIdb,
deleteMessage as deleteMessageFromIdb,
} from '@/lib/storage/indexed-db';
export type { RoomWsStatus, RoomWsClient } from '@/lib/room-ws-client';
@ -249,23 +242,17 @@ export function RoomProvider({
}
}, [activeRoomId]);
// ── Subscribe to room (WS must already be connected) ───────────────────────
useEffect(() => {
const client = wsClientRef.current;
if (!activeRoomId || !client) return;
const setup = async () => {
// IDB load does NOT need WS — show cached messages immediately.
// loadMore checks IDB first, then falls back to API (WS-first + HTTP).
// Load messages via WS (with HTTP fallback)
loadMore(null);
// Connect WS in parallel for real-time push + reactions batch-fetch.
// connect() is idempotent — no-op if already connecting/open.
// subscribeRoom uses WS-first request() with HTTP fallback.
await client.connect();
if (activeRoomIdRef.current !== activeRoomId) return;
// Subscribe to room events. connect() is already called at the provider
// level — subscribe/unsubscribe only manage per-room event routing.
client.subscribeRoom(activeRoomId).catch(() => {});
};
setup().catch(() => {});
return () => {
client.unsubscribeRoom(activeRoomId).catch(() => {});
@ -284,9 +271,17 @@ export function RoomProvider({
) => {
const msgIds = msgs.map((m) => m.id);
if (msgIds.length === 0) return;
client
.reactionListBatch(roomId, msgIds)
.then((reactionResults: ReactionListData[]) => {
const doLoad = async () => {
let reactionResults: ReactionListData[];
if (client.getStatus() === 'open') {
try {
reactionResults = await client.reactionListBatchWs(roomId, msgIds);
} catch {
reactionResults = await client.reactionListBatch(roomId, msgIds);
}
} else {
reactionResults = await client.reactionListBatch(roomId, msgIds);
}
const reactionMap = new Map<string, ReactionListData['reactions']>();
for (const result of reactionResults) {
if (result.reactions.length > 0) {
@ -300,10 +295,8 @@ export function RoomProvider({
),
);
}
})
.catch(() => {
// Non-fatal: WS push will keep reactions up to date
});
};
doLoad().catch(() => {});
};
const loadMore = useCallback(
@ -321,55 +314,27 @@ export function RoomProvider({
const isInitial = cursor === null || cursor === undefined;
const limit = isInitial ? 200 : 50;
// --- Initial load: try IndexedDB first for instant render ---
if (isInitial) {
const cached = await loadMessagesFromIdb(activeRoomId);
if (cached.length > 0) {
setMessages(cached);
setIsTransitioningRoom(false);
const minSeq = cached[0].seq;
setNextCursor(minSeq > 0 ? minSeq - 1 : null);
setIsLoadingMore(false);
// No API call needed — WS will push any new messages that arrived while away.
// Fetch reactions via WS (with HTTP fallback) so reactions appear without extra latency.
thisLoadReactions(activeRoomId, client, cached);
return;
}
}
// --- Load older history: try IDB first, then fall back to API ---
if (!isInitial && cursor != null) {
const idbMessages = await loadOlderMessagesFromIdb(activeRoomId, cursor, limit);
if (idbMessages.length > 0) {
setMessages((prev) => {
if (abortController.signal.aborted) return prev;
const existingIds = new Set(prev.map((m) => m.id));
const filtered = idbMessages.filter((m) => !existingIds.has(m.id));
let merged = [...filtered, ...prev];
merged.sort((a, b) => a.seq - b.seq);
if (merged.length > MAX_MESSAGES_IN_MEMORY) {
merged = merged.slice(-MAX_MESSAGES_IN_MEMORY);
}
return merged;
});
const oldest = idbMessages[0];
setNextCursor(oldest.seq > 0 ? oldest.seq - 1 : null);
if (idbMessages.length < limit) {
setIsHistoryLoaded(true);
}
setIsLoadingMore(false);
// Also fetch reactions for the IDB-loaded history messages.
thisLoadReactions(activeRoomId, client, idbMessages);
return;
}
// IDB empty for this range — fall through to API
}
// --- API fetch ---
const resp = await client.messageList(activeRoomId, {
// Try WebSocket first; fall back to HTTP on failure
let resp: import('@/lib/room-ws-client').RoomMessageListResponse;
if (client.getStatus() === 'open') {
try {
resp = await client.messageListWs(activeRoomId, {
beforeSeq: cursor ?? undefined,
limit,
});
} catch {
// WS failed — fall back to HTTP
resp = await client.messageList(activeRoomId, {
beforeSeq: cursor ?? undefined,
limit,
});
}
} else {
resp = await client.messageList(activeRoomId, {
beforeSeq: cursor ?? undefined,
limit,
});
}
if (abortController.signal.aborted) return;
@ -396,16 +361,12 @@ export function RoomProvider({
return merged;
});
if (newMessages.length > 0) {
saveMessages(activeRoomId, newMessages).catch(() => {});
}
if (resp.messages.length < limit) {
setIsHistoryLoaded(true);
}
setNextCursor(resp.messages.length > 0 ? resp.messages[resp.messages.length - 1].seq : null);
// Fetch reactions for all loaded messages (WS-first with HTTP fallback)
// Fetch reactions for all loaded messages
thisLoadReactions(activeRoomId, client, newMessages);
} catch (error) {
if (abortController.signal.aborted) return;
@ -434,8 +395,6 @@ export function RoomProvider({
// Room AI configs for @ai: mention suggestions
const [roomAiConfigs, setRoomAiConfigs] = useState<RoomAiConfig[]>([]);
const [aiConfigsLoading, setAiConfigsLoading] = useState(false);
// Available models (for looking up AI model names) — TODO: wire up once model sync API is available
const [_availableModels, _setAvailableModels] = useState<{ id: string; name: string }[]>([]);
useEffect(() => {
const baseUrl = import.meta.env.VITE_API_BASE_URL ?? window.location.origin;
@ -446,37 +405,28 @@ export function RoomProvider({
// Use ref to get current activeRoomId to avoid stale closure
if (payload.room_id === activeRoomIdRef.current) {
setMessages((prev) => {
// Check if this is a reaction-update event (same ID, different reactions).
// publish_reaction_event sends RoomMessageEvent with reactions field set.
const existingIdx = prev.findIndex((m) => m.id === payload.id);
if (existingIdx !== -1) {
// Message already exists — update reactions if provided.
// Reaction events have empty content/sender_type.
// Message already exists — update reactions if provided
if (payload.reactions !== undefined) {
const updated = [...prev];
updated[existingIdx] = { ...updated[existingIdx], reactions: payload.reactions };
const msg = updated[existingIdx];
saveMessage(msg).catch(() => {});
return updated;
}
// Duplicate of a real message — ignore
return prev;
}
// Also check if there's an optimistic message with the same seq that should be replaced
// Replace optimistic message with server-confirmed one
const optimisticIdx = prev.findIndex(
(m) => m.isOptimistic && m.seq === payload.seq && m.seq !== 0,
);
if (optimisticIdx !== -1) {
// Replace optimistic message with confirmed one
const confirmed: MessageWithMeta = {
...wsMessageToUiMessage(payload),
reactions: prev[optimisticIdx].reactions,
};
const next = [...prev];
next[optimisticIdx] = confirmed;
// Remove optimistic from IDB, save confirmed
deleteMessageFromIdb(prev[optimisticIdx].id).catch(() => {});
saveMessage(confirmed).catch(() => {});
return next;
}
const newMsg = wsMessageToUiMessage(payload);
@ -487,39 +437,29 @@ export function RoomProvider({
}
return updated;
});
// Persist to IndexedDB
const msg = wsMessageToUiMessage(payload);
saveMessage(msg).catch(() => {});
}
},
onAiStreamChunk: (chunk) => {
if (chunk.done) {
// When streaming is done, update the message content and remove from streaming
setStreamingContent((prev) => {
const next = new Map(prev);
next.delete(chunk.message_id);
return next;
});
setMessages((prev) => {
const updated = prev.map((m) =>
setMessages((prev) =>
prev.map((m) =>
m.id === chunk.message_id
? { ...m, content: chunk.content, is_streaming: false }
: m,
),
);
// Persist final content to IndexedDB
const msg = updated.find((m) => m.id === chunk.message_id);
if (msg) saveMessage(msg).catch(() => {});
return updated;
});
} else {
// Accumulate streaming content
setStreamingContent((prev) => {
const next = new Map(prev);
const existing = next.get(chunk.message_id) ?? '';
next.set(chunk.message_id, existing + chunk.content);
return next;
});
// Create streaming message placeholder if it doesn't exist
setMessages((prev) => {
if (prev.some((m) => m.id === chunk.message_id)) {
return prev;
@ -539,48 +479,34 @@ export function RoomProvider({
}
},
onRoomReactionUpdated: (payload: RoomReactionUpdatedPayload) => {
// Guard: ignore events for rooms that are no longer active.
// Without this, a WS event arriving after room switch could update
// the wrong room's message list (same message ID, different room).
if (!activeRoomIdRef.current) return;
setMessages((prev) => {
const existingIdx = prev.findIndex((m) => m.id === payload.message_id);
if (existingIdx === -1) return prev;
const updated = [...prev];
updated[existingIdx] = { ...updated[existingIdx], reactions: payload.reactions };
// Persist reaction update to IndexedDB
saveMessage(updated[existingIdx]).catch(() => {});
return updated;
});
},
onMessageEdited: async (payload) => {
// The event only contains message_id and edited_at.
// Optimistically update edited_at, then fetch the full message from the API.
if (payload.room_id !== activeRoomIdRef.current) return;
const client = wsClientRef.current;
if (!client) return;
// Capture original edited_at for rollback if fetch fails
let rollbackEditedAt: string | null = null;
setMessages((prev) => {
const msg = prev.find((m) => m.id === payload.message_id);
rollbackEditedAt = msg?.edited_at ?? null;
const updated = prev.map((m) =>
return prev.map((m) =>
m.id === payload.message_id ? { ...m, edited_at: payload.edited_at } : m,
);
const saved = updated.find((m) => m.id === payload.message_id);
if (saved) saveMessage(saved).catch(() => {});
return updated;
});
// Fetch full updated message from API
try {
const updatedMsg = await client.messageGet(payload.message_id);
if (!updatedMsg) return;
setMessages((prev) => {
const merged = prev.map((m) =>
setMessages((prev) =>
prev.map((m) =>
m.id === payload.message_id
? {
...m,
@ -589,14 +515,9 @@ export function RoomProvider({
edited_at: payload.edited_at,
}
: m,
),
);
// Persist to IndexedDB
const saved = merged.find((m) => m.id === payload.message_id);
if (saved) saveMessage(saved).catch(() => {});
return merged;
});
} catch {
// Revert edited_at if the fetch failed
if (rollbackEditedAt !== null) {
setMessages((prev) =>
prev.map((m) =>
@ -608,8 +529,8 @@ export function RoomProvider({
},
onMessageRevoked: async (payload) => {
if (payload.room_id !== activeRoomIdRef.current) return;
setMessages((prev) => {
const updated = prev.map((m) =>
setMessages((prev) =>
prev.map((m) =>
m.id === payload.message_id
? {
...m,
@ -619,12 +540,8 @@ export function RoomProvider({
display_content: '',
}
: m,
),
);
// Persist to IndexedDB
const msg = updated.find((m) => m.id === payload.message_id);
if (msg) saveMessage(msg).catch(() => {});
return updated;
});
},
onMessagePinned: async (payload) => {
if (payload.room_id !== activeRoomIdRef.current) return;
@ -670,18 +587,14 @@ export function RoomProvider({
};
}, [wsToken]);
// ── Connect WS whenever a new client is created ─────────────────────────────
// Intentionally depends on wsClient (not wsClientRef) so a new client triggers connect().
// connect() is idempotent — no-op if already connecting/open.
useEffect(() => {
// NOTE: intentionally omitted [wsClient] from deps.
// In React StrictMode the component mounts twice — if wsClient were a dep,
// the first mount's effect would connect client-1, then StrictMode cleanup
// would disconnect it, then the second mount's effect would connect client-2,
// then immediately the first mount's *second* cleanup would fire and
// disconnect client-2 — leaving WS unconnected. Using a ref for the initial
// connect avoids this. The client is always ready by the time this runs.
wsClientRef.current?.connect().catch((e) => {
console.error('[RoomContext] WS connect error:', e);
});
}, []);
}, [wsClient]);
const connectWs = useCallback(async () => {
const client = wsClientRef.current;
@ -899,8 +812,6 @@ export function RoomProvider({
};
setMessages((prev) => [...prev, optimisticMsg]);
// Persist optimistic message to IndexedDB so it's not lost on refresh
saveMessage(optimisticMsg).catch(() => {});
try {
const confirmedMsg = await client.messageCreate(activeRoomId, content, {
@ -918,10 +829,6 @@ export function RoomProvider({
isOptimistic: false,
reactions: [],
};
// Remove optimistic from IDB
deleteMessageFromIdb(optimisticId).catch(() => {});
// Save confirmed to IDB
saveMessage(confirmed).catch(() => {});
return [...without, confirmed];
});
} catch (err) {
@ -944,7 +851,6 @@ export function RoomProvider({
const client = wsClientRef.current;
if (!client) return;
// Capture original content for rollback on server rejection
let rollbackContent: string | null = null;
setMessages((prev) => {
const msg = prev.find((m) => m.id === messageId);
@ -955,15 +861,16 @@ export function RoomProvider({
});
try {
if (client.getStatus() === 'open') {
try {
await client.messageUpdateWs(messageId, content);
} catch {
await client.messageUpdate(messageId, content);
// Persist updated content to IndexedDB
setMessages((prev) => {
const msg = prev.find((m) => m.id === messageId);
if (msg) saveMessage(msg).catch(() => {});
return prev;
});
}
} else {
await client.messageUpdate(messageId, content);
}
} catch (err) {
// Rollback optimistic update on server rejection
if (rollbackContent !== null) {
setMessages((prev) =>
prev.map((m) =>
@ -982,7 +889,6 @@ export function RoomProvider({
const client = wsClientRef.current;
if (!client) return;
// Optimistic removal: hide message immediately
let rollbackMsg: MessageWithMeta | null = null;
setMessages((prev) => {
rollbackMsg = prev.find((m) => m.id === messageId) ?? null;
@ -990,13 +896,18 @@ export function RoomProvider({
});
try {
if (client.getStatus() === 'open') {
try {
await client.messageRevokeWs(messageId);
} catch {
await client.messageRevoke(messageId);
deleteMessageFromIdb(messageId).catch(() => {});
}
} else {
await client.messageRevoke(messageId);
}
} catch (err) {
// Rollback: restore message on server rejection
if (rollbackMsg) {
setMessages((prev) => [...prev, rollbackMsg!]);
saveMessage(rollbackMsg!).catch(() => {});
}
handleRoomError('Delete message', err);
}
@ -1160,25 +1071,10 @@ export function RoomProvider({
}
}, [activeRoomId]);
// Fetch available models (for AI model name lookup)
const fetchAvailableModels = useCallback(async () => {
try {
const resp = await (await import('@/client')).modelList({});
const inner = (resp.data as { data?: { data?: { id: string; name: string }[] } } | undefined);
_setAvailableModels(inner?.data?.data ?? []);
} catch {
// Non-fatal
}
}, []);
useEffect(() => {
fetchProjectRepos();
}, [fetchProjectRepos]);
useEffect(() => {
fetchAvailableModels();
}, [fetchAvailableModels]);
useEffect(() => {
fetchRoomAiConfigs();
}, [fetchRoomAiConfigs]);

View File

@ -77,6 +77,8 @@ export interface RoomWsCallbacks {
onMessageUnpinned?: (payload: import('./ws-protocol').MessageUnpinnedPayload) => void;
onStatusChange?: (status: RoomWsStatus) => void;
onError?: (error: Error) => void;
/** Called each time the client sends a heartbeat ping */
onHeartbeat?: () => void;
}
export class RoomWsClient {
@ -93,7 +95,11 @@ export class RoomWsClient {
private readonly reconnectBaseDelay: number;
private readonly reconnectMaxDelay: number;
private readonly requestTimeout: number;
private readonly heartbeatInterval: number;
private readonly heartbeatTimeout: number;
private wsToken: string | null = null;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private lastHeartbeat: number = 0;
constructor(
baseUrl: string,
@ -102,6 +108,8 @@ export class RoomWsClient {
reconnectBaseDelay?: number;
reconnectMaxDelay?: number;
requestTimeout?: number;
heartbeatInterval?: number;
heartbeatTimeout?: number;
wsToken?: string;
} = {},
) {
@ -110,6 +118,9 @@ export class RoomWsClient {
this.reconnectBaseDelay = options.reconnectBaseDelay ?? 1000;
this.reconnectMaxDelay = options.reconnectMaxDelay ?? 15000;
this.requestTimeout = options.requestTimeout ?? 30_000;
// Heartbeat: send a ping every 25s, timeout after 55s of inactivity
this.heartbeatInterval = options.heartbeatInterval ?? 25_000;
this.heartbeatTimeout = options.heartbeatTimeout ?? 55_000;
this.wsToken = options.wsToken ?? null;
}
@ -200,22 +211,21 @@ export class RoomWsClient {
console.debug('[RoomWs] Connected');
this.reconnectAttempt = 0;
this.setStatus('open');
this.startHeartbeat();
this.resubscribeAll().catch(() => {});
resolve();
};
this.ws!.onmessage = (ev: MessageEvent) => {
try {
const message: WsInMessage = JSON.parse(ev.data);
this.handleMessage(message);
} catch (e) {
console.warn('[RoomWs] parse error:', e);
}
const text = ev.data;
if (typeof text !== 'string') return;
this.handleMessage(text);
};
this.ws!.onclose = (ev: CloseEvent) => {
clearTimeout(timeoutId);
console.debug(`[RoomWs] onclose code=${ev.code} reason=${ev.reason || 'none'} wasClean=${ev.wasClean}`);
this.stopHeartbeat();
this.ws = null;
this.setStatus('closed');
for (const [, req] of this.pendingRequests) {
@ -241,6 +251,7 @@ export class RoomWsClient {
disconnect(): void {
this.shouldReconnect = false;
this.stopHeartbeat();
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
@ -253,6 +264,39 @@ export class RoomWsClient {
this.setStatus('closed');
}
private startHeartbeat(): void {
this.stopHeartbeat();
this.lastHeartbeat = Date.now();
this.heartbeatTimer = setInterval(() => {
if (this.status !== 'open' || !this.ws) {
this.stopHeartbeat();
return;
}
// Detect heartbeat timeout (server died or network dropped)
if (Date.now() - this.lastHeartbeat > this.heartbeatTimeout) {
console.warn('[RoomWs] Heartbeat timeout — closing connection');
this.callbacks.onError?.(new Error('Heartbeat timeout'));
this.stopHeartbeat();
this.ws.close();
return;
}
// Send application-level ping
try {
this.ws.send(JSON.stringify({ type: 'ping' }));
this.callbacks.onHeartbeat?.();
} catch {
this.stopHeartbeat();
}
}, this.heartbeatInterval);
}
private stopHeartbeat(): void {
if (this.heartbeatTimer !== null) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
private async request<T = WsResponseData>(action: WsAction, params?: WsRequestParams): Promise<T> {
if (this.ws && this.status === 'open') {
return await this.requestWs<T>(action, params);
@ -486,6 +530,87 @@ export class RoomWsClient {
return data || { messages: [], total: 0 };
}
/**
* Load messages via WebSocket only. Throws if WS is not connected.
* Use this for WS-first loading; falls back to `messageList()` on failure.
*/
async messageListWs(
roomId: string,
options?: {
beforeSeq?: number;
afterSeq?: number;
limit?: number;
},
): Promise<RoomMessageListResponse> {
const data = await this.requestWs<RoomMessageListResponse>('message.list', {
room_id: roomId,
before_seq: options?.beforeSeq,
after_seq: options?.afterSeq,
limit: options?.limit,
});
return data || { messages: [], total: 0 };
}
async messageGetWs(messageId: string): Promise<RoomMessageResponse | null> {
const data = await this.requestWs<RoomMessageResponse>('message.get', {
message_id: messageId,
});
return data || null;
}
async messageUpdateWs(messageId: string, content: string): Promise<RoomMessageResponse> {
return this.requestWs<RoomMessageResponse>('message.update', {
message_id: messageId,
content,
});
}
async messageRevokeWs(messageId: string): Promise<RoomMessageResponse> {
return this.requestWs<RoomMessageResponse>('message.revoke', {
message_id: messageId,
});
}
async messageSearchWs(
roomId: string,
query: string,
options?: { limit?: number; offset?: number },
): Promise<SearchResultData> {
return this.requestWs<SearchResultData>('message.search', {
room_id: roomId,
query,
limit: options?.limit,
offset: options?.offset,
});
}
async messageEditHistoryWs(messageId: string): Promise<MessageEditHistoryResponse> {
return this.requestWs<MessageEditHistoryResponse>('message.edit_history', {
message_id: messageId,
});
}
async threadMessagesWs(
threadId: string,
options?: { beforeSeq?: number; afterSeq?: number; limit?: number },
): Promise<RoomMessageListResponse> {
const data = await this.requestWs<RoomMessageListResponse>('thread.messages', {
thread_id: threadId,
before_seq: options?.beforeSeq,
after_seq: options?.afterSeq,
limit: options?.limit,
});
return data || { messages: [], total: 0 };
}
async reactionListBatchWs(roomId: string, messageIds: string[]): Promise<ReactionListData[]> {
const data = await this.requestWs<ReactionListData[]>('reaction.list_batch', {
room_id: roomId,
message_ids: messageIds,
});
return Array.isArray(data) ? data : [];
}
async messageCreate(
roomId: string,
content: string,
@ -827,7 +952,23 @@ export class RoomWsClient {
return url;
}
private handleMessage(message: WsInMessage): void {
private handleMessage(rawText: string): void {
// Handle raw JSON pong before full parsing — resets heartbeat
if (rawText.trim() === '{"type":"pong"}') {
this.lastHeartbeat = Date.now();
return;
}
// Reset heartbeat on any other message (server is alive)
this.lastHeartbeat = Date.now();
let message: WsInMessage;
try {
message = JSON.parse(rawText);
} catch {
return;
}
if ('type' in message && message.type === 'error') {
return;
}
@ -966,6 +1107,8 @@ export function createRoomWsClient(
reconnectBaseDelay?: number;
reconnectMaxDelay?: number;
requestTimeout?: number;
heartbeatInterval?: number;
heartbeatTimeout?: number;
wsToken?: string;
},
): RoomWsClient {