From f2a2ae5d7fa58187d4a55133d039e92f774f9eee Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Fri, 17 Apr 2026 21:18:56 +0800 Subject: [PATCH] fix(room): use WS for message fetching, eliminate duplicate WS connection - Fix initial room load being skipped: `setup()` called `loadMoreRef.current` which was null on first mount (ref assigned in later effect). Call `loadMore` directly so the initial fetch always fires. WS message.list used when connected, HTTP fallback otherwise. - Rewrite useRoomWs to use shared RoomWsClient instead of creating its own raw WebSocket, eliminating duplicate WS connection per room. - Remove dead loadMoreRef now that setup calls loadMore directly. --- src/contexts/room-context.tsx | 10 +- src/hooks/useRoomWs.ts | 497 ++++++++++------------------------ 2 files changed, 139 insertions(+), 368 deletions(-) diff --git a/src/contexts/room-context.tsx b/src/contexts/room-context.tsx index 3452bab..a318c77 100644 --- a/src/contexts/room-context.tsx +++ b/src/contexts/room-context.tsx @@ -238,8 +238,6 @@ export function RoomProvider({ } }, [activeRoomId]); - const loadMoreRef = useRef<((cursor?: number | null) => Promise) | null>(null); - useEffect(() => { const client = wsClientRef.current; if (!activeRoomId || !client) return; @@ -253,7 +251,9 @@ export function RoomProvider({ const roomId = activeRoomIdRef.current; if (!roomId) return; await client.subscribeRoom(roomId); - loadMoreRef.current?.(null); + // loadMoreRef.current is null on first mount (set later in render order). + // Call loadMore directly to ensure initial message fetch always runs. + loadMore(null); }; setup(); @@ -376,10 +376,6 @@ export function RoomProvider({ [activeRoomId], ); - useEffect(() => { - loadMoreRef.current = loadMore; - }, [loadMore]); - const [members, setMembers] = useState([]); const [membersLoading, setMembersLoading] = useState(false); diff --git a/src/hooks/useRoomWs.ts b/src/hooks/useRoomWs.ts index 32161cf..694eacf 100644 --- a/src/hooks/useRoomWs.ts +++ b/src/hooks/useRoomWs.ts @@ -1,95 +1,31 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import { toast } from 'sonner'; -import { - type AiStreamChunkPayload, - type RoomMessagePayload, - type RoomWsStatus, - type WsOutEvent, -} from '@/lib/room'; -import { requestWsToken, buildWsUrlWithToken } from '@/lib/ws-token'; -import { client } from '@/client/client.gen'; -import type { AxiosResponse } from 'axios'; +import type { + AiStreamChunkPayload, + RoomMessagePayload, + RoomWsStatus, + RoomWsClient, +} from '@/lib/room-ws-client'; import type { RoomMemberResponse } from '@/client'; -const RECONNECT_BASE_DELAY = 1_000; -const RECONNECT_MAX_DELAY = 15_000; - -/** A message as held in the UI state */ -export type UiMessage = RoomMessagePayload & { - /** Set while the server is streaming an AI reply into this message */ - is_streaming?: boolean; - /** Accumulated streaming content; flushed to content on `done: true` */ - streaming_content?: string; - /** Display name resolved from sender_id; undefined if not yet resolved */ - display_name?: string; - /** Avatar URL resolved from members list */ - avatar_url?: string; - /** For optimistic UI: message failed to send */ - isOptimisticError?: boolean; - /** Reply to message ID */ - in_reply_to?: string | null; - /** Edited timestamp */ - edited_at?: string | null; - /** Revoked timestamp */ - revoked?: string | null; - /** Revoked by user ID */ - revoked_by?: string | null; -}; - type RoomMessageCacheEntry = { messages: UiMessage[]; isHistoryLoaded: boolean; - /** seq of the latest message, used as cursor for pagination */ nextCursor: number | null; }; -interface MessageListResponse { - code: number; - message: string; - data: { messages: RestMessage[]; total: number }; -} - -/** REST message shape (matches RoomMessageResponse from the SDK) */ -interface RestMessage { - id: string; - seq: number; - room: string; - sender_type: string; - sender_id?: string | null; - display_name?: string | null; - thread?: string | null; +/** A message as held in the UI state */ +export type UiMessage = RoomMessagePayload & { + is_streaming?: boolean; + streaming_content?: string; + display_name?: string; + avatar_url?: string; + isOptimisticError?: boolean; in_reply_to?: string | null; - content: string; - content_type: string; edited_at?: string | null; - send_at: string; revoked?: string | null; revoked_by?: string | null; -} - -/** Display name and avatar URL resolved from a message's sender */ -interface SenderInfo { - displayName: string; - avatarUrl: string | undefined; -} - -/** Resolve displayName and avatar URL for a message sender. - * - AI messages: use sender_id (the model UUID) - * - User messages: look up user_info from members list, fall back to sender_id */ -function resolveSender(payload: RoomMessagePayload, members: RoomMemberResponse[]): SenderInfo { - if (payload.sender_type === 'ai') { - return { displayName: payload.sender_id ?? 'AI', avatarUrl: undefined }; - } - if (payload.sender_id) { - const member = members.find((m) => m.user === payload.sender_id); - if (member) { - const username = member.user_info?.username ?? member.user; - return { displayName: username, avatarUrl: member.user_info?.avatar_url ?? undefined }; - } - } - if (payload.sender_type === 'system') return { displayName: 'System', avatarUrl: undefined }; - return { displayName: payload.sender_type, avatarUrl: undefined }; -} +}; function compareMessages(a: UiMessage, b: UiMessage): number { const timeDiff = new Date(a.send_at).getTime() - new Date(b.send_at).getTime(); @@ -110,8 +46,8 @@ function insertSorted(arr: UiMessage[], msg: UiMessage): UiMessage[] { } export interface UseRoomWsOptions { - /** VITE_API_BASE_URL value (without /ws suffix) */ - baseUrl: string; + /** Shared RoomWsClient instance (from useRoom context) */ + wsClient: RoomWsClient | null; /** Currently open room ID */ roomId: string | null; /** Limit for initial history load */ @@ -129,21 +65,18 @@ export interface UseRoomWsReturn { isHistoryLoaded: boolean; isLoadingMore: boolean; nextCursor: number | null; - /** Load older messages (called when user scrolls to top) */ loadMore: (cursor?: number | null) => void; } /** - * Manages a WebSocket connection for a single room. + * Manages room message state using the shared RoomWsClient. * - * Features: - * - Auto-reconnect with exponential back-off - * - Per-room message cache so switching rooms preserves scroll position - * - AI streaming chunk accumulation via `streaming_content` - * - `loadMore` for cursor-based history pagination + * Uses WS request/response for message history (initial load + loadMore), + * and WS push events (room_message, ai_stream_chunk) for real-time updates. + * Falls back to HTTP automatically when WS is not connected. */ export function useRoomWs({ - baseUrl, + wsClient, roomId, historyLimit = 50, members = [], @@ -156,31 +89,19 @@ export function useRoomWs({ const [isLoadingMore, setIsLoadingMore] = useState(false); const [nextCursor, setNextCursor] = useState(null); - const wsRef = useRef(null); - const activeRoomIdRef = useRef(null); - const shouldReconnectRef = useRef(true); - const reconnectAttemptRef = useRef(0); - const reconnectTimeoutRef = useRef | null>(null); - const wsTokenRef = useRef(null); - const roomCacheRef = useRef>(new Map()); - /** Ref to current messages for use inside event handlers */ const messagesRef = useRef([]); messagesRef.current = messages; - /** Ref to current nextCursor */ const nextCursorRef = useRef(null); nextCursorRef.current = nextCursor; - /** Ref to members, used for display_name resolution */ const membersRef = useRef([]); membersRef.current = members; - /** Ref for AI streaming RAF batch */ const streamingBatchRef = useRef>(new Map()); const streamingRafRef = useRef(null); - /** Flush streaming batch to state */ const flushStreamingBatch = useCallback(() => { const batch = streamingBatchRef.current; if (batch.size === 0) return; @@ -228,299 +149,155 @@ export function useRoomWs({ streamingRafRef.current = null; }, []); + // Sync WS status to local state useEffect(() => { - const room = activeRoomIdRef.current; + if (!wsClient) { + setStatus('idle'); + return; + } + const handleStatusChange = (newStatus: RoomWsStatus) => { + setStatus(newStatus); + }; + wsClient.on('statusChange', handleStatusChange); + // Sync initial status + setStatus(wsClient.getStatus()); + return () => { + wsClient.off('statusChange', handleStatusChange); + }; + }, [wsClient]); + + // Register push event handlers on wsClient + useEffect(() => { + if (!wsClient) return; + + const handleRoomMessage = (payload: RoomMessagePayload) => { + const currentMembers = membersRef.current; + const existingIds = new Set(messagesRef.current.map((m) => m.id)); + if (existingIds.has(payload.id)) return; + + const member = currentMembers.find((m) => m.user === payload.sender_id); + const display_name = + payload.display_name ?? + (member ? member.user_info?.username ?? member.user : undefined); + const avatar_url = member?.user_info?.avatar_url; + + setMessages((prev) => + insertSorted(prev, { ...payload, display_name, avatar_url }), + ); + }; + + const handleAiStreamChunk = (chunk: AiStreamChunkPayload) => { + onAiStreamChunk?.(chunk); + + streamingBatchRef.current.set(chunk.message_id, { + content: chunk.content, + done: chunk.done, + room_id: chunk.room_id, + }); + + if (streamingRafRef.current == null) { + streamingRafRef.current = requestAnimationFrame(() => { + flushStreamingBatch(); + }); + } + }; + + wsClient.on('roomMessage', handleRoomMessage); + wsClient.on('aiStreamChunk', handleAiStreamChunk); + + return () => { + wsClient.off('roomMessage', handleRoomMessage); + wsClient.off('aiStreamChunk', handleAiStreamChunk); + }; + }, [wsClient, onAiStreamChunk, flushStreamingBatch]); + + // Cache messages when they change + useEffect(() => { + const room = roomId; if (!room) return; roomCacheRef.current.set(room, { messages, isHistoryLoaded, nextCursor, }); - }, [messages, isHistoryLoaded, nextCursor]); - - const connectWs = useCallback( - async (roomUid: string) => { - if (!shouldReconnectRef.current || activeRoomIdRef.current !== roomUid) return; - - // Build URL with token if available - const url = buildWsUrlWithToken(baseUrl, `/ws/rooms/${roomUid}`, wsTokenRef.current); - console.debug('[useRoomWs] connecting to', url, { baseUrl, roomUid }); - - if (reconnectTimeoutRef.current) { - clearTimeout(reconnectTimeoutRef.current); - reconnectTimeoutRef.current = null; - } - - setStatus('connecting'); - setErrorMessage(null); - - const ws = new WebSocket(url); - wsRef.current = ws; - - ws.onopen = () => { - if (activeRoomIdRef.current !== roomUid) return; - reconnectAttemptRef.current = 0; - setStatus('open'); - setErrorMessage(null); - console.debug('[useRoomWs] ws opened for room', roomUid); - }; - - ws.onmessage = (ev: MessageEvent) => { - if (activeRoomIdRef.current !== roomUid) return; - - let event: WsOutEvent; - try { - event = JSON.parse(ev.data) as WsOutEvent; - } catch { - console.warn('[useRoomWs] parse error, data:', ev.data); - setErrorMessage('Invalid WebSocket message'); - return; - } - - if ('error' in event && event.error) { - console.warn('[useRoomWs] error event:', event.error); - setErrorMessage(event.error); - return; - } - - if (!('event' in event) || !event.event) { - console.warn('[useRoomWs] no event field, raw:', event); - return; - } - - console.debug('[useRoomWs] received event type:', event.event.type, event.event); - - switch (event.event.type) { - case 'room_message': { - // Backend sends payload flat on event.event (no data wrapper); also support nested data - const raw = (event.event as any); - const incoming: RoomMessagePayload = raw.data ?? raw; - console.debug('[useRoomWs] room_message:', incoming.id, incoming.content); - - // Use Set for O(1) duplicate check instead of O(n) Array.some - const existingIds = new Set(messagesRef.current.map((m) => m.id)); - if (existingIds.has(incoming.id)) { - console.debug('[useRoomWs] duplicate message, skipping'); - return; - } - - const sender = resolveSender(incoming, membersRef.current); - const display_name = incoming.display_name ?? sender.displayName; - const avatar_url = sender.avatarUrl; - - setMessages((prev) => - insertSorted(prev, { ...incoming, display_name, avatar_url }), - ); - break; - } - - case 'ai_stream_chunk': { - const raw = event.event as any; - const chunk: AiStreamChunkPayload = raw.data ?? raw; - onAiStreamChunk?.(chunk); - - // Batch streaming chunks using RAF to reduce re-render frequency - streamingBatchRef.current.set(chunk.message_id, { - content: chunk.content, - done: chunk.done, - room_id: chunk.room_id, - }); - - if (streamingRafRef.current == null) { - streamingRafRef.current = requestAnimationFrame(() => { - flushStreamingBatch(); - }); - } - break; - } - - default: - break; - } - }; - - ws.onclose = (ev: CloseEvent) => { - console.debug('[useRoomWs] WebSocket closed', { code: ev.code, reason: ev.reason, wasClean: ev.wasClean }); - const activeSocket = wsRef.current; - if (activeSocket !== ws) return; - wsRef.current = null; - if (activeRoomIdRef.current !== roomUid) return; - - setStatus('closed'); - if (shouldReconnectRef.current) { - const attempt = ++reconnectAttemptRef.current; - const delay = Math.min(RECONNECT_BASE_DELAY * 2 ** (attempt - 1), RECONNECT_MAX_DELAY); - reconnectTimeoutRef.current = setTimeout(() => connectWs(roomUid), delay); - } - }; - - ws.onerror = (ev: Event) => { - console.error('[useRoomWs] WebSocket error', ev); - if (activeRoomIdRef.current !== roomUid) return; - setErrorMessage('WebSocket error'); - }; - }, - [baseUrl, onAiStreamChunk], - ); + }, [messages, isHistoryLoaded, nextCursor, roomId]); + // Subscribe to room and load messages useEffect(() => { - const prevRoom = activeRoomIdRef.current; + if (!wsClient || !roomId) return; - if (!roomId) { - // Disconnect - activeRoomIdRef.current = null; - shouldReconnectRef.current = false; - if (reconnectTimeoutRef.current) { - clearTimeout(reconnectTimeoutRef.current); - reconnectTimeoutRef.current = null; - } - if (wsRef.current) { - wsRef.current.close(); - wsRef.current = null; - } + // Restore from cache or load fresh + const cached = roomCacheRef.current.get(roomId); + if (cached) { + setMessages(cached.messages); + setIsHistoryLoaded(cached.isHistoryLoaded); + setNextCursor(cached.nextCursor); + } else { setMessages([]); - setStatus('idle'); - setErrorMessage(null); setIsHistoryLoaded(false); setNextCursor(null); - return; + + // Use WS (with HTTP fallback) to load initial history + wsClient + .messageList(roomId, { limit: historyLimit }) + .then((resp) => { + const msgs = (resp.messages ?? []).map((m) => { + const member = membersRef.current.find((mb) => mb.user === m.sender_id); + return { + ...m, + room_id: m.room, + thread_id: m.thread ?? null, + display_name: m.display_name ?? member?.user_info?.username ?? member?.user, + avatar_url: member?.user_info?.avatar_url, + } as UiMessage; + }); + setMessages(msgs); + setNextCursor(msgs.length > 0 ? msgs[msgs.length - 1].seq : null); + setIsHistoryLoaded(msgs.length < historyLimit); + }) + .catch(() => { + toast.error('Failed to load message history'); + setIsHistoryLoaded(true); + }); } - // Save previous room's state - if (prevRoom && prevRoom !== roomId) { - roomCacheRef.current.set(prevRoom, { - messages: messagesRef.current, - isHistoryLoaded, - nextCursor: nextCursorRef.current, - }); - } - - activeRoomIdRef.current = roomId; - shouldReconnectRef.current = true; - reconnectAttemptRef.current = 0; - - // Fetch WS token before connecting (skip if we have a recent token) - const connectWithToken = async () => { - // Only fetch a new token if we don't have one or it's older than 4 minutes - // (tokens have 5-min TTL) - const shouldFetchToken = !wsTokenRef.current; - if (shouldFetchToken) { - try { - const token = await requestWsToken(); - wsTokenRef.current = token; - } catch (error) { - console.warn('[useRoomWs] Failed to fetch WS token, falling back to cookie auth:', error); - wsTokenRef.current = null; - } - } - - // Restore from cache or start fresh - const cached = roomCacheRef.current.get(roomId); - if (cached) { - setMessages(cached.messages); - setIsHistoryLoaded(cached.isHistoryLoaded); - setNextCursor(cached.nextCursor); - } else { - setMessages([]); - setIsHistoryLoaded(false); - setNextCursor(null); - // Load initial history via REST (WS is push-only, can't request history) - if (roomId) { - client - .get({ url: `/api/rooms/${roomId}/messages`, params: { limit: historyLimit } }) - .then((resp) => { - const r = resp as AxiosResponse; - if (activeRoomIdRef.current !== roomId) return; - const msgs = (r.data?.data?.messages ?? []).map((m) => { - const sender = resolveSender({ ...m, room_id: m.room } as RoomMessagePayload, members); - const display_name = m.display_name ?? sender.displayName; - const avatar_url = sender.avatarUrl; - return { - ...m, - room_id: m.room, - thread_id: m.thread ?? null, - display_name, - avatar_url, - }; - }); - setMessages(msgs); - setNextCursor(msgs.length > 0 ? msgs[msgs.length - 1].seq : null); - setIsHistoryLoaded(msgs.length < historyLimit); - }) - .catch(() => { - if (activeRoomIdRef.current !== roomId) return; - toast.error('Failed to load message history'); - setIsHistoryLoaded(true); - }); - } - } - - // Close other connections (shouldn't be any in practice) - if (wsRef.current) { - wsRef.current.close(); - wsRef.current = null; - } - - connectWs(roomId); - }; - - connectWithToken(); + // Subscribe to room push events + wsClient.subscribeRoom(roomId).catch(() => {}); return () => { - // Save state before unmounting - const room = activeRoomIdRef.current; - if (room) { - roomCacheRef.current.set(room, { - messages: messagesRef.current, - isHistoryLoaded: isHistoryLoaded, - nextCursor: nextCursorRef.current, - }); - } - shouldReconnectRef.current = false; - if (reconnectTimeoutRef.current) { - clearTimeout(reconnectTimeoutRef.current); - reconnectTimeoutRef.current = null; - } - if (wsRef.current) { - wsRef.current.close(); - wsRef.current = null; - } - if (streamingRafRef.current != null) { - cancelAnimationFrame(streamingRafRef.current); - streamingRafRef.current = null; - } - // Don't clear activeRoomIdRef or wsTokenRef — preserving them - // prevents unnecessary re-token-fetch and reconnect on re-mount + wsClient.unsubscribeRoom(roomId).catch(() => {}); }; - }, [roomId, connectWs]); + }, [wsClient, roomId, historyLimit]); const loadMore = useCallback( async (cursor?: number | null) => { - if (!roomId || isLoadingMore) return; - // Use REST API for history pagination — WS is push-only + if (!wsClient || !roomId || isLoadingMore) return; const effectiveCursor = cursor ?? nextCursorRef.current; if (effectiveCursor == null) return; setIsLoadingMore(true); try { - const resp = await client.get({ url: `/api/rooms/${roomId}/messages`, params: { before_seq: effectiveCursor, limit: historyLimit } }) as AxiosResponse; - const older = (resp.data?.data?.messages ?? []).map((m) => { - const sender = resolveSender({ ...m, room_id: m.room } as RoomMessagePayload, membersRef.current); - const display_name = m.display_name ?? sender.displayName; - const avatar_url = sender.avatarUrl; + // Use WS (with HTTP fallback) for paginated history + const resp = await wsClient.messageList(roomId, { + beforeSeq: effectiveCursor, + limit: historyLimit, + }); + const older = (resp.messages ?? []).map((m) => { + const member = membersRef.current.find((mb) => mb.user === m.sender_id); return { ...m, room_id: m.room, thread_id: m.thread ?? null, - display_name, - avatar_url, - }; + display_name: m.display_name ?? member?.user_info?.username ?? member?.user, + avatar_url: member?.user_info?.avatar_url, + } as UiMessage; }); + if (older.length === 0) { setIsHistoryLoaded(true); return; } - // Prepend older messages (they arrive in ascending seq order) + setMessages((prev) => { const existingIds = new Set(prev.map((m) => m.id)); const newOnes = older.filter((m) => !existingIds.has(m.id)); @@ -528,20 +305,18 @@ export function useRoomWs({ setIsHistoryLoaded(true); return prev; } - // New cursor = smallest seq among loaded messages const newCursor = newOnes[newOnes.length - 1].seq; setNextCursor(newCursor > 0 ? newCursor : null); return [...newOnes, ...prev]; }); } catch { - // Non-critical — show toast so user knows the load failed toast.error('Failed to load more messages'); setIsHistoryLoaded(true); } finally { setIsLoadingMore(false); } }, - [roomId, historyLimit, isLoadingMore], + [wsClient, roomId, historyLimit, isLoadingMore], ); return useMemo(