From 821b0e998dbdd61b55e5f9a5a8ed008396102a0b Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Sat, 18 Apr 2026 19:05:21 +0800 Subject: [PATCH] refactor(room): Discord layout and room WebSocket client refactor - Refactor room-context.tsx with improved WebSocket state management - Enhance room-ws-client.ts with reconnect logic and message handling - Update Discord layout components with message editor improvements - Add WebSocket universal endpoint support in ws_universal.rs --- libs/agent/chat/service.rs | 12 +- libs/api/room/ws_universal.rs | 9 + src/components/room/DiscordChatPanel.tsx | 11 +- src/components/room/message/MessageInput.tsx | 1 + .../room/message/editor/IMEditor.tsx | 6 + src/contexts/room-context.tsx | 276 ++++++------------ src/lib/room-ws-client.ts | 157 +++++++++- 7 files changed, 264 insertions(+), 208 deletions(-) diff --git a/libs/agent/chat/service.rs b/libs/agent/chat/service.rs index 7dd2a15..56db151 100644 --- a/libs/agent/chat/service.rs +++ b/libs/agent/chat/service.rs @@ -98,11 +98,7 @@ impl ChatService { frequency_penalty: Some(frequency_penalty as f32), presence_penalty: Some(presence_penalty as f32), stream: Some(false), - reasoning_effort: Some(if think { - ReasoningEffort::High - } else { - ReasoningEffort::None - }), + reasoning_effort: if think { Some(ReasoningEffort::High) } else { None }, tools: if tools_enabled { Some( tools @@ -232,11 +228,7 @@ impl ChatService { frequency_penalty: Some(frequency_penalty as f32), presence_penalty: Some(presence_penalty as f32), stream: Some(true), - reasoning_effort: Some(if think { - ReasoningEffort::High - } else { - ReasoningEffort::None - }), + reasoning_effort: if think { Some(ReasoningEffort::High) } else { None }, tools: if tools_enabled { Some( tools diff --git a/libs/api/room/ws_universal.rs b/libs/api/room/ws_universal.rs index d4694eb..011c0e7 100644 --- a/libs/api/room/ws_universal.rs +++ b/libs/api/room/ws_universal.rs @@ -289,6 +289,15 @@ pub async fn ws_universal( continue; } + // Handle JSON-level ping (application heartbeat). + // Client sends {"type":"ping"} and we reply with {"type":"pong"}. + if text.trim() == r#"{"type":"ping"}"# { + if session.text(r#"{"type":"pong"}"#).await.is_err() { break; } + last_activity = Instant::now(); + last_heartbeat = Instant::now(); + continue; + } + match serde_json::from_str::(&text) { Ok(request) => { let action_str = request.action.to_string(); diff --git a/src/components/room/DiscordChatPanel.tsx b/src/components/room/DiscordChatPanel.tsx index 8a13043..478a925 100644 --- a/src/components/room/DiscordChatPanel.tsx +++ b/src/components/room/DiscordChatPanel.tsx @@ -277,6 +277,10 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete }: DiscordCh onRevoke={handleRevoke} onReply={setReplyingTo} onMention={undefined} + onOpenUserCard={({ userId, username }) => { + messageInputRef.current?.insertMention('user', userId, username); + messageInputRef.current?.focus(); + }} onOpenThread={handleOpenThread} onCreateThread={handleCreateThread} /> @@ -294,7 +298,12 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete }: DiscordCh {}} + onMemberClick={({ user, user_info, role }) => { + const label = user_info?.username ?? user; + const type = role === 'ai' ? 'ai' : 'user'; + messageInputRef.current?.insertMention(type, user, label); + messageInputRef.current?.focus(); + }} aiConfigs={roomAiConfigs} /> )} diff --git a/src/components/room/message/MessageInput.tsx b/src/components/room/message/MessageInput.tsx index 2eca377..491c63f 100644 --- a/src/components/room/message/MessageInput.tsx +++ b/src/components/room/message/MessageInput.tsx @@ -21,6 +21,7 @@ export interface MessageInputHandle { focus: () => void; clearContent: () => void; getContent: () => string; + insertMention: (type: string, id: string, label: string) => void; } export const MessageInput = forwardRef(function MessageInput( diff --git a/src/components/room/message/editor/IMEditor.tsx b/src/components/room/message/editor/IMEditor.tsx index 86f26a3..197665a 100644 --- a/src/components/room/message/editor/IMEditor.tsx +++ b/src/components/room/message/editor/IMEditor.tsx @@ -33,6 +33,7 @@ export interface IMEditorHandle { focus: () => void; clearContent: () => void; getContent: () => string; + insertMention: (type: string, id: string, label: string) => void; } // ─── Color System (Google AI Studio / Linear palette, no Discord) ──────────── @@ -368,6 +369,11 @@ export const IMEditor = forwardRef(function IMEdi focus: () => editor?.commands.focus(), clearContent: () => editor?.commands.clearContent(), getContent: () => editor?.getText() ?? '', + insertMention: (type: string, id: string, label: string) => { + if (!editor) return; + const mentionStr = `@[${type}:${id}:${label}] `; + editor.chain().focus().insertContent(mentionStr).run(); + }, })); const hasContent = !!editor && !editor.isEmpty; diff --git a/src/contexts/room-context.tsx b/src/contexts/room-context.tsx index 030ca44..a62c551 100644 --- a/src/contexts/room-context.tsx +++ b/src/contexts/room-context.tsx @@ -30,13 +30,6 @@ import { } from '@/lib/room-ws-client'; import { requestWsToken } from '@/lib/ws-token'; import { useUser } from '@/contexts'; -import { - saveMessage, - saveMessages, - loadMessages as loadMessagesFromIdb, - loadOlderMessagesFromIdb, - deleteMessage as deleteMessageFromIdb, -} from '@/lib/storage/indexed-db'; export type { RoomWsStatus, RoomWsClient } from '@/lib/room-ws-client'; @@ -249,23 +242,17 @@ export function RoomProvider({ } }, [activeRoomId]); + // ── Subscribe to room (WS must already be connected) ─────────────────────── useEffect(() => { const client = wsClientRef.current; if (!activeRoomId || !client) return; - const setup = async () => { - // IDB load does NOT need WS — show cached messages immediately. - // loadMore checks IDB first, then falls back to API (WS-first + HTTP). - loadMore(null); + // Load messages via WS (with HTTP fallback) + loadMore(null); - // Connect WS in parallel for real-time push + reactions batch-fetch. - // connect() is idempotent — no-op if already connecting/open. - // subscribeRoom uses WS-first request() with HTTP fallback. - await client.connect(); - if (activeRoomIdRef.current !== activeRoomId) return; - client.subscribeRoom(activeRoomId).catch(() => {}); - }; - setup().catch(() => {}); + // Subscribe to room events. connect() is already called at the provider + // level — subscribe/unsubscribe only manage per-room event routing. + client.subscribeRoom(activeRoomId).catch(() => {}); return () => { client.unsubscribeRoom(activeRoomId).catch(() => {}); @@ -284,26 +271,32 @@ export function RoomProvider({ ) => { const msgIds = msgs.map((m) => m.id); if (msgIds.length === 0) return; - client - .reactionListBatch(roomId, msgIds) - .then((reactionResults: ReactionListData[]) => { - const reactionMap = new Map(); - for (const result of reactionResults) { - if (result.reactions.length > 0) { - reactionMap.set(result.message_id, result.reactions); - } + const doLoad = async () => { + let reactionResults: ReactionListData[]; + if (client.getStatus() === 'open') { + try { + reactionResults = await client.reactionListBatchWs(roomId, msgIds); + } catch { + reactionResults = await client.reactionListBatch(roomId, msgIds); } - if (reactionMap.size > 0) { - setMessages((prev) => - prev.map((m) => - reactionMap.has(m.id) ? { ...m, reactions: reactionMap.get(m.id) } : m, - ), - ); + } else { + reactionResults = await client.reactionListBatch(roomId, msgIds); + } + const reactionMap = new Map(); + for (const result of reactionResults) { + if (result.reactions.length > 0) { + reactionMap.set(result.message_id, result.reactions); } - }) - .catch(() => { - // Non-fatal: WS push will keep reactions up to date - }); + } + if (reactionMap.size > 0) { + setMessages((prev) => + prev.map((m) => + reactionMap.has(m.id) ? { ...m, reactions: reactionMap.get(m.id) } : m, + ), + ); + } + }; + doLoad().catch(() => {}); }; const loadMore = useCallback( @@ -321,56 +314,28 @@ export function RoomProvider({ const isInitial = cursor === null || cursor === undefined; const limit = isInitial ? 200 : 50; - // --- Initial load: try IndexedDB first for instant render --- - if (isInitial) { - const cached = await loadMessagesFromIdb(activeRoomId); - if (cached.length > 0) { - setMessages(cached); - setIsTransitioningRoom(false); - const minSeq = cached[0].seq; - setNextCursor(minSeq > 0 ? minSeq - 1 : null); - setIsLoadingMore(false); - // No API call needed — WS will push any new messages that arrived while away. - // Fetch reactions via WS (with HTTP fallback) so reactions appear without extra latency. - thisLoadReactions(activeRoomId, client, cached); - return; - } - } - - // --- Load older history: try IDB first, then fall back to API --- - if (!isInitial && cursor != null) { - const idbMessages = await loadOlderMessagesFromIdb(activeRoomId, cursor, limit); - if (idbMessages.length > 0) { - setMessages((prev) => { - if (abortController.signal.aborted) return prev; - const existingIds = new Set(prev.map((m) => m.id)); - const filtered = idbMessages.filter((m) => !existingIds.has(m.id)); - let merged = [...filtered, ...prev]; - merged.sort((a, b) => a.seq - b.seq); - if (merged.length > MAX_MESSAGES_IN_MEMORY) { - merged = merged.slice(-MAX_MESSAGES_IN_MEMORY); - } - return merged; + // Try WebSocket first; fall back to HTTP on failure + let resp: import('@/lib/room-ws-client').RoomMessageListResponse; + if (client.getStatus() === 'open') { + try { + resp = await client.messageListWs(activeRoomId, { + beforeSeq: cursor ?? undefined, + limit, + }); + } catch { + // WS failed — fall back to HTTP + resp = await client.messageList(activeRoomId, { + beforeSeq: cursor ?? undefined, + limit, }); - const oldest = idbMessages[0]; - setNextCursor(oldest.seq > 0 ? oldest.seq - 1 : null); - if (idbMessages.length < limit) { - setIsHistoryLoaded(true); - } - setIsLoadingMore(false); - // Also fetch reactions for the IDB-loaded history messages. - thisLoadReactions(activeRoomId, client, idbMessages); - return; } - // IDB empty for this range — fall through to API + } else { + resp = await client.messageList(activeRoomId, { + beforeSeq: cursor ?? undefined, + limit, + }); } - // --- API fetch --- - const resp = await client.messageList(activeRoomId, { - beforeSeq: cursor ?? undefined, - limit, - }); - if (abortController.signal.aborted) return; const newMessages = resp.messages.map((m) => ({ @@ -396,16 +361,12 @@ export function RoomProvider({ return merged; }); - if (newMessages.length > 0) { - saveMessages(activeRoomId, newMessages).catch(() => {}); - } - if (resp.messages.length < limit) { setIsHistoryLoaded(true); } setNextCursor(resp.messages.length > 0 ? resp.messages[resp.messages.length - 1].seq : null); - // Fetch reactions for all loaded messages (WS-first with HTTP fallback) + // Fetch reactions for all loaded messages thisLoadReactions(activeRoomId, client, newMessages); } catch (error) { if (abortController.signal.aborted) return; @@ -434,8 +395,6 @@ export function RoomProvider({ // Room AI configs for @ai: mention suggestions const [roomAiConfigs, setRoomAiConfigs] = useState([]); const [aiConfigsLoading, setAiConfigsLoading] = useState(false); - // Available models (for looking up AI model names) — TODO: wire up once model sync API is available - const [_availableModels, _setAvailableModels] = useState<{ id: string; name: string }[]>([]); useEffect(() => { const baseUrl = import.meta.env.VITE_API_BASE_URL ?? window.location.origin; @@ -446,37 +405,28 @@ export function RoomProvider({ // Use ref to get current activeRoomId to avoid stale closure if (payload.room_id === activeRoomIdRef.current) { setMessages((prev) => { - // Check if this is a reaction-update event (same ID, different reactions). - // publish_reaction_event sends RoomMessageEvent with reactions field set. const existingIdx = prev.findIndex((m) => m.id === payload.id); if (existingIdx !== -1) { - // Message already exists — update reactions if provided. - // Reaction events have empty content/sender_type. + // Message already exists — update reactions if provided if (payload.reactions !== undefined) { const updated = [...prev]; updated[existingIdx] = { ...updated[existingIdx], reactions: payload.reactions }; - const msg = updated[existingIdx]; - saveMessage(msg).catch(() => {}); return updated; } // Duplicate of a real message — ignore return prev; } - // Also check if there's an optimistic message with the same seq that should be replaced + // Replace optimistic message with server-confirmed one const optimisticIdx = prev.findIndex( (m) => m.isOptimistic && m.seq === payload.seq && m.seq !== 0, ); if (optimisticIdx !== -1) { - // Replace optimistic message with confirmed one const confirmed: MessageWithMeta = { ...wsMessageToUiMessage(payload), reactions: prev[optimisticIdx].reactions, }; const next = [...prev]; next[optimisticIdx] = confirmed; - // Remove optimistic from IDB, save confirmed - deleteMessageFromIdb(prev[optimisticIdx].id).catch(() => {}); - saveMessage(confirmed).catch(() => {}); return next; } const newMsg = wsMessageToUiMessage(payload); @@ -487,39 +437,29 @@ export function RoomProvider({ } return updated; }); - // Persist to IndexedDB - const msg = wsMessageToUiMessage(payload); - saveMessage(msg).catch(() => {}); } }, onAiStreamChunk: (chunk) => { if (chunk.done) { - // When streaming is done, update the message content and remove from streaming setStreamingContent((prev) => { const next = new Map(prev); next.delete(chunk.message_id); return next; }); - setMessages((prev) => { - const updated = prev.map((m) => + setMessages((prev) => + prev.map((m) => m.id === chunk.message_id ? { ...m, content: chunk.content, is_streaming: false } : m, - ); - // Persist final content to IndexedDB - const msg = updated.find((m) => m.id === chunk.message_id); - if (msg) saveMessage(msg).catch(() => {}); - return updated; - }); + ), + ); } else { - // Accumulate streaming content setStreamingContent((prev) => { const next = new Map(prev); const existing = next.get(chunk.message_id) ?? ''; next.set(chunk.message_id, existing + chunk.content); return next; }); - // Create streaming message placeholder if it doesn't exist setMessages((prev) => { if (prev.some((m) => m.id === chunk.message_id)) { return prev; @@ -539,48 +479,34 @@ export function RoomProvider({ } }, onRoomReactionUpdated: (payload: RoomReactionUpdatedPayload) => { - // Guard: ignore events for rooms that are no longer active. - // Without this, a WS event arriving after room switch could update - // the wrong room's message list (same message ID, different room). if (!activeRoomIdRef.current) return; - setMessages((prev) => { const existingIdx = prev.findIndex((m) => m.id === payload.message_id); if (existingIdx === -1) return prev; const updated = [...prev]; updated[existingIdx] = { ...updated[existingIdx], reactions: payload.reactions }; - // Persist reaction update to IndexedDB - saveMessage(updated[existingIdx]).catch(() => {}); return updated; }); }, onMessageEdited: async (payload) => { - // The event only contains message_id and edited_at. - // Optimistically update edited_at, then fetch the full message from the API. if (payload.room_id !== activeRoomIdRef.current) return; - const client = wsClientRef.current; if (!client) return; - // Capture original edited_at for rollback if fetch fails let rollbackEditedAt: string | null = null; setMessages((prev) => { const msg = prev.find((m) => m.id === payload.message_id); rollbackEditedAt = msg?.edited_at ?? null; - const updated = prev.map((m) => + return prev.map((m) => m.id === payload.message_id ? { ...m, edited_at: payload.edited_at } : m, ); - const saved = updated.find((m) => m.id === payload.message_id); - if (saved) saveMessage(saved).catch(() => {}); - return updated; }); - // Fetch full updated message from API try { const updatedMsg = await client.messageGet(payload.message_id); if (!updatedMsg) return; - setMessages((prev) => { - const merged = prev.map((m) => + setMessages((prev) => + prev.map((m) => m.id === payload.message_id ? { ...m, @@ -589,14 +515,9 @@ export function RoomProvider({ edited_at: payload.edited_at, } : m, - ); - // Persist to IndexedDB - const saved = merged.find((m) => m.id === payload.message_id); - if (saved) saveMessage(saved).catch(() => {}); - return merged; - }); + ), + ); } catch { - // Revert edited_at if the fetch failed if (rollbackEditedAt !== null) { setMessages((prev) => prev.map((m) => @@ -608,8 +529,8 @@ export function RoomProvider({ }, onMessageRevoked: async (payload) => { if (payload.room_id !== activeRoomIdRef.current) return; - setMessages((prev) => { - const updated = prev.map((m) => + setMessages((prev) => + prev.map((m) => m.id === payload.message_id ? { ...m, @@ -619,12 +540,8 @@ export function RoomProvider({ display_content: '', } : m, - ); - // Persist to IndexedDB - const msg = updated.find((m) => m.id === payload.message_id); - if (msg) saveMessage(msg).catch(() => {}); - return updated; - }); + ), + ); }, onMessagePinned: async (payload) => { if (payload.room_id !== activeRoomIdRef.current) return; @@ -670,18 +587,14 @@ export function RoomProvider({ }; }, [wsToken]); + // ── Connect WS whenever a new client is created ───────────────────────────── + // Intentionally depends on wsClient (not wsClientRef) so a new client triggers connect(). + // connect() is idempotent — no-op if already connecting/open. useEffect(() => { - // NOTE: intentionally omitted [wsClient] from deps. - // In React StrictMode the component mounts twice — if wsClient were a dep, - // the first mount's effect would connect client-1, then StrictMode cleanup - // would disconnect it, then the second mount's effect would connect client-2, - // then immediately the first mount's *second* cleanup would fire and - // disconnect client-2 — leaving WS unconnected. Using a ref for the initial - // connect avoids this. The client is always ready by the time this runs. wsClientRef.current?.connect().catch((e) => { console.error('[RoomContext] WS connect error:', e); }); - }, []); + }, [wsClient]); const connectWs = useCallback(async () => { const client = wsClientRef.current; @@ -899,8 +812,6 @@ export function RoomProvider({ }; setMessages((prev) => [...prev, optimisticMsg]); - // Persist optimistic message to IndexedDB so it's not lost on refresh - saveMessage(optimisticMsg).catch(() => {}); try { const confirmedMsg = await client.messageCreate(activeRoomId, content, { @@ -918,10 +829,6 @@ export function RoomProvider({ isOptimistic: false, reactions: [], }; - // Remove optimistic from IDB - deleteMessageFromIdb(optimisticId).catch(() => {}); - // Save confirmed to IDB - saveMessage(confirmed).catch(() => {}); return [...without, confirmed]; }); } catch (err) { @@ -944,7 +851,6 @@ export function RoomProvider({ const client = wsClientRef.current; if (!client) return; - // Capture original content for rollback on server rejection let rollbackContent: string | null = null; setMessages((prev) => { const msg = prev.find((m) => m.id === messageId); @@ -955,15 +861,16 @@ export function RoomProvider({ }); try { - await client.messageUpdate(messageId, content); - // Persist updated content to IndexedDB - setMessages((prev) => { - const msg = prev.find((m) => m.id === messageId); - if (msg) saveMessage(msg).catch(() => {}); - return prev; - }); + if (client.getStatus() === 'open') { + try { + await client.messageUpdateWs(messageId, content); + } catch { + await client.messageUpdate(messageId, content); + } + } else { + await client.messageUpdate(messageId, content); + } } catch (err) { - // Rollback optimistic update on server rejection if (rollbackContent !== null) { setMessages((prev) => prev.map((m) => @@ -982,7 +889,6 @@ export function RoomProvider({ const client = wsClientRef.current; if (!client) return; - // Optimistic removal: hide message immediately let rollbackMsg: MessageWithMeta | null = null; setMessages((prev) => { rollbackMsg = prev.find((m) => m.id === messageId) ?? null; @@ -990,13 +896,18 @@ export function RoomProvider({ }); try { - await client.messageRevoke(messageId); - deleteMessageFromIdb(messageId).catch(() => {}); + if (client.getStatus() === 'open') { + try { + await client.messageRevokeWs(messageId); + } catch { + await client.messageRevoke(messageId); + } + } else { + await client.messageRevoke(messageId); + } } catch (err) { - // Rollback: restore message on server rejection if (rollbackMsg) { setMessages((prev) => [...prev, rollbackMsg!]); - saveMessage(rollbackMsg!).catch(() => {}); } handleRoomError('Delete message', err); } @@ -1160,25 +1071,10 @@ export function RoomProvider({ } }, [activeRoomId]); - // Fetch available models (for AI model name lookup) - const fetchAvailableModels = useCallback(async () => { - try { - const resp = await (await import('@/client')).modelList({}); - const inner = (resp.data as { data?: { data?: { id: string; name: string }[] } } | undefined); - _setAvailableModels(inner?.data?.data ?? []); - } catch { - // Non-fatal - } - }, []); - useEffect(() => { fetchProjectRepos(); }, [fetchProjectRepos]); - useEffect(() => { - fetchAvailableModels(); - }, [fetchAvailableModels]); - useEffect(() => { fetchRoomAiConfigs(); }, [fetchRoomAiConfigs]); diff --git a/src/lib/room-ws-client.ts b/src/lib/room-ws-client.ts index cbf91fe..74724a6 100644 --- a/src/lib/room-ws-client.ts +++ b/src/lib/room-ws-client.ts @@ -77,6 +77,8 @@ export interface RoomWsCallbacks { onMessageUnpinned?: (payload: import('./ws-protocol').MessageUnpinnedPayload) => void; onStatusChange?: (status: RoomWsStatus) => void; onError?: (error: Error) => void; + /** Called each time the client sends a heartbeat ping */ + onHeartbeat?: () => void; } export class RoomWsClient { @@ -93,7 +95,11 @@ export class RoomWsClient { private readonly reconnectBaseDelay: number; private readonly reconnectMaxDelay: number; private readonly requestTimeout: number; + private readonly heartbeatInterval: number; + private readonly heartbeatTimeout: number; private wsToken: string | null = null; + private heartbeatTimer: ReturnType | null = null; + private lastHeartbeat: number = 0; constructor( baseUrl: string, @@ -102,6 +108,8 @@ export class RoomWsClient { reconnectBaseDelay?: number; reconnectMaxDelay?: number; requestTimeout?: number; + heartbeatInterval?: number; + heartbeatTimeout?: number; wsToken?: string; } = {}, ) { @@ -110,6 +118,9 @@ export class RoomWsClient { this.reconnectBaseDelay = options.reconnectBaseDelay ?? 1000; this.reconnectMaxDelay = options.reconnectMaxDelay ?? 15000; this.requestTimeout = options.requestTimeout ?? 30_000; + // Heartbeat: send a ping every 25s, timeout after 55s of inactivity + this.heartbeatInterval = options.heartbeatInterval ?? 25_000; + this.heartbeatTimeout = options.heartbeatTimeout ?? 55_000; this.wsToken = options.wsToken ?? null; } @@ -200,22 +211,21 @@ export class RoomWsClient { console.debug('[RoomWs] Connected'); this.reconnectAttempt = 0; this.setStatus('open'); + this.startHeartbeat(); this.resubscribeAll().catch(() => {}); resolve(); }; this.ws!.onmessage = (ev: MessageEvent) => { - try { - const message: WsInMessage = JSON.parse(ev.data); - this.handleMessage(message); - } catch (e) { - console.warn('[RoomWs] parse error:', e); - } + const text = ev.data; + if (typeof text !== 'string') return; + this.handleMessage(text); }; this.ws!.onclose = (ev: CloseEvent) => { clearTimeout(timeoutId); console.debug(`[RoomWs] onclose code=${ev.code} reason=${ev.reason || 'none'} wasClean=${ev.wasClean}`); + this.stopHeartbeat(); this.ws = null; this.setStatus('closed'); for (const [, req] of this.pendingRequests) { @@ -241,6 +251,7 @@ export class RoomWsClient { disconnect(): void { this.shouldReconnect = false; + this.stopHeartbeat(); if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; @@ -253,6 +264,39 @@ export class RoomWsClient { this.setStatus('closed'); } + private startHeartbeat(): void { + this.stopHeartbeat(); + this.lastHeartbeat = Date.now(); + this.heartbeatTimer = setInterval(() => { + if (this.status !== 'open' || !this.ws) { + this.stopHeartbeat(); + return; + } + // Detect heartbeat timeout (server died or network dropped) + if (Date.now() - this.lastHeartbeat > this.heartbeatTimeout) { + console.warn('[RoomWs] Heartbeat timeout — closing connection'); + this.callbacks.onError?.(new Error('Heartbeat timeout')); + this.stopHeartbeat(); + this.ws.close(); + return; + } + // Send application-level ping + try { + this.ws.send(JSON.stringify({ type: 'ping' })); + this.callbacks.onHeartbeat?.(); + } catch { + this.stopHeartbeat(); + } + }, this.heartbeatInterval); + } + + private stopHeartbeat(): void { + if (this.heartbeatTimer !== null) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } + private async request(action: WsAction, params?: WsRequestParams): Promise { if (this.ws && this.status === 'open') { return await this.requestWs(action, params); @@ -486,6 +530,87 @@ export class RoomWsClient { return data || { messages: [], total: 0 }; } + /** + * Load messages via WebSocket only. Throws if WS is not connected. + * Use this for WS-first loading; falls back to `messageList()` on failure. + */ + async messageListWs( + roomId: string, + options?: { + beforeSeq?: number; + afterSeq?: number; + limit?: number; + }, + ): Promise { + const data = await this.requestWs('message.list', { + room_id: roomId, + before_seq: options?.beforeSeq, + after_seq: options?.afterSeq, + limit: options?.limit, + }); + return data || { messages: [], total: 0 }; + } + + async messageGetWs(messageId: string): Promise { + const data = await this.requestWs('message.get', { + message_id: messageId, + }); + return data || null; + } + + async messageUpdateWs(messageId: string, content: string): Promise { + return this.requestWs('message.update', { + message_id: messageId, + content, + }); + } + + async messageRevokeWs(messageId: string): Promise { + return this.requestWs('message.revoke', { + message_id: messageId, + }); + } + + async messageSearchWs( + roomId: string, + query: string, + options?: { limit?: number; offset?: number }, + ): Promise { + return this.requestWs('message.search', { + room_id: roomId, + query, + limit: options?.limit, + offset: options?.offset, + }); + } + + async messageEditHistoryWs(messageId: string): Promise { + return this.requestWs('message.edit_history', { + message_id: messageId, + }); + } + + async threadMessagesWs( + threadId: string, + options?: { beforeSeq?: number; afterSeq?: number; limit?: number }, + ): Promise { + const data = await this.requestWs('thread.messages', { + thread_id: threadId, + before_seq: options?.beforeSeq, + after_seq: options?.afterSeq, + limit: options?.limit, + }); + return data || { messages: [], total: 0 }; + } + + async reactionListBatchWs(roomId: string, messageIds: string[]): Promise { + const data = await this.requestWs('reaction.list_batch', { + room_id: roomId, + message_ids: messageIds, + }); + return Array.isArray(data) ? data : []; + } + async messageCreate( roomId: string, content: string, @@ -827,7 +952,23 @@ export class RoomWsClient { return url; } - private handleMessage(message: WsInMessage): void { + private handleMessage(rawText: string): void { + // Handle raw JSON pong before full parsing — resets heartbeat + if (rawText.trim() === '{"type":"pong"}') { + this.lastHeartbeat = Date.now(); + return; + } + + // Reset heartbeat on any other message (server is alive) + this.lastHeartbeat = Date.now(); + + let message: WsInMessage; + try { + message = JSON.parse(rawText); + } catch { + return; + } + if ('type' in message && message.type === 'error') { return; } @@ -966,6 +1107,8 @@ export function createRoomWsClient( reconnectBaseDelay?: number; reconnectMaxDelay?: number; requestTimeout?: number; + heartbeatInterval?: number; + heartbeatTimeout?: number; wsToken?: string; }, ): RoomWsClient {