fix(room): use WS for message fetching, eliminate duplicate WS connection

- Fix initial room load being skipped: `setup()` called `loadMoreRef.current`
  which was null on first mount (ref assigned in later effect). Call `loadMore`
  directly so the initial fetch always fires. WS message.list used when
  connected, HTTP fallback otherwise.
- Rewrite useRoomWs to use shared RoomWsClient instead of creating its own
  raw WebSocket, eliminating duplicate WS connection per room.
- Remove dead loadMoreRef now that setup calls loadMore directly.
This commit is contained in:
ZhenYi 2026-04-17 21:18:56 +08:00
parent cf5c728286
commit f2a2ae5d7f
2 changed files with 139 additions and 368 deletions

View File

@ -238,8 +238,6 @@ export function RoomProvider({
}
}, [activeRoomId]);
const loadMoreRef = useRef<((cursor?: number | null) => Promise<void>) | null>(null);
useEffect(() => {
const client = wsClientRef.current;
if (!activeRoomId || !client) return;
@ -253,7 +251,9 @@ export function RoomProvider({
const roomId = activeRoomIdRef.current;
if (!roomId) return;
await client.subscribeRoom(roomId);
loadMoreRef.current?.(null);
// loadMoreRef.current is null on first mount (set later in render order).
// Call loadMore directly to ensure initial message fetch always runs.
loadMore(null);
};
setup();
@ -376,10 +376,6 @@ export function RoomProvider({
[activeRoomId],
);
useEffect(() => {
loadMoreRef.current = loadMore;
}, [loadMore]);
const [members, setMembers] = useState<RoomMemberResponse[]>([]);
const [membersLoading, setMembersLoading] = useState(false);

View File

@ -1,95 +1,31 @@
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import { toast } from 'sonner';
import {
type AiStreamChunkPayload,
type RoomMessagePayload,
type RoomWsStatus,
type WsOutEvent,
} from '@/lib/room';
import { requestWsToken, buildWsUrlWithToken } from '@/lib/ws-token';
import { client } from '@/client/client.gen';
import type { AxiosResponse } from 'axios';
import type {
AiStreamChunkPayload,
RoomMessagePayload,
RoomWsStatus,
RoomWsClient,
} from '@/lib/room-ws-client';
import type { RoomMemberResponse } from '@/client';
const RECONNECT_BASE_DELAY = 1_000;
const RECONNECT_MAX_DELAY = 15_000;
/** A message as held in the UI state */
export type UiMessage = RoomMessagePayload & {
/** Set while the server is streaming an AI reply into this message */
is_streaming?: boolean;
/** Accumulated streaming content; flushed to content on `done: true` */
streaming_content?: string;
/** Display name resolved from sender_id; undefined if not yet resolved */
display_name?: string;
/** Avatar URL resolved from members list */
avatar_url?: string;
/** For optimistic UI: message failed to send */
isOptimisticError?: boolean;
/** Reply to message ID */
in_reply_to?: string | null;
/** Edited timestamp */
edited_at?: string | null;
/** Revoked timestamp */
revoked?: string | null;
/** Revoked by user ID */
revoked_by?: string | null;
};
type RoomMessageCacheEntry = {
messages: UiMessage[];
isHistoryLoaded: boolean;
/** seq of the latest message, used as cursor for pagination */
nextCursor: number | null;
};
interface MessageListResponse {
code: number;
message: string;
data: { messages: RestMessage[]; total: number };
}
/** REST message shape (matches RoomMessageResponse from the SDK) */
interface RestMessage {
id: string;
seq: number;
room: string;
sender_type: string;
sender_id?: string | null;
display_name?: string | null;
thread?: string | null;
/** A message as held in the UI state */
export type UiMessage = RoomMessagePayload & {
is_streaming?: boolean;
streaming_content?: string;
display_name?: string;
avatar_url?: string;
isOptimisticError?: boolean;
in_reply_to?: string | null;
content: string;
content_type: string;
edited_at?: string | null;
send_at: string;
revoked?: string | null;
revoked_by?: string | null;
}
/** Display name and avatar URL resolved from a message's sender */
interface SenderInfo {
displayName: string;
avatarUrl: string | undefined;
}
/** Resolve displayName and avatar URL for a message sender.
* - AI messages: use sender_id (the model UUID)
* - User messages: look up user_info from members list, fall back to sender_id */
function resolveSender(payload: RoomMessagePayload, members: RoomMemberResponse[]): SenderInfo {
if (payload.sender_type === 'ai') {
return { displayName: payload.sender_id ?? 'AI', avatarUrl: undefined };
}
if (payload.sender_id) {
const member = members.find((m) => m.user === payload.sender_id);
if (member) {
const username = member.user_info?.username ?? member.user;
return { displayName: username, avatarUrl: member.user_info?.avatar_url ?? undefined };
}
}
if (payload.sender_type === 'system') return { displayName: 'System', avatarUrl: undefined };
return { displayName: payload.sender_type, avatarUrl: undefined };
}
};
function compareMessages(a: UiMessage, b: UiMessage): number {
const timeDiff = new Date(a.send_at).getTime() - new Date(b.send_at).getTime();
@ -110,8 +46,8 @@ function insertSorted(arr: UiMessage[], msg: UiMessage): UiMessage[] {
}
export interface UseRoomWsOptions {
/** VITE_API_BASE_URL value (without /ws suffix) */
baseUrl: string;
/** Shared RoomWsClient instance (from useRoom context) */
wsClient: RoomWsClient | null;
/** Currently open room ID */
roomId: string | null;
/** Limit for initial history load */
@ -129,21 +65,18 @@ export interface UseRoomWsReturn {
isHistoryLoaded: boolean;
isLoadingMore: boolean;
nextCursor: number | null;
/** Load older messages (called when user scrolls to top) */
loadMore: (cursor?: number | null) => void;
}
/**
* Manages a WebSocket connection for a single room.
* Manages room message state using the shared RoomWsClient.
*
* Features:
* - Auto-reconnect with exponential back-off
* - Per-room message cache so switching rooms preserves scroll position
* - AI streaming chunk accumulation via `streaming_content`
* - `loadMore` for cursor-based history pagination
* Uses WS request/response for message history (initial load + loadMore),
* and WS push events (room_message, ai_stream_chunk) for real-time updates.
* Falls back to HTTP automatically when WS is not connected.
*/
export function useRoomWs({
baseUrl,
wsClient,
roomId,
historyLimit = 50,
members = [],
@ -156,31 +89,19 @@ export function useRoomWs({
const [isLoadingMore, setIsLoadingMore] = useState(false);
const [nextCursor, setNextCursor] = useState<number | null>(null);
const wsRef = useRef<WebSocket | null>(null);
const activeRoomIdRef = useRef<string | null>(null);
const shouldReconnectRef = useRef(true);
const reconnectAttemptRef = useRef(0);
const reconnectTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const wsTokenRef = useRef<string | null>(null);
const roomCacheRef = useRef<Map<string, RoomMessageCacheEntry>>(new Map());
/** Ref to current messages for use inside event handlers */
const messagesRef = useRef<UiMessage[]>([]);
messagesRef.current = messages;
/** Ref to current nextCursor */
const nextCursorRef = useRef<number | null>(null);
nextCursorRef.current = nextCursor;
/** Ref to members, used for display_name resolution */
const membersRef = useRef<RoomMemberResponse[]>([]);
membersRef.current = members;
/** Ref for AI streaming RAF batch */
const streamingBatchRef = useRef<Map<string, { content: string; done: boolean; room_id: string }>>(new Map());
const streamingRafRef = useRef<number | null>(null);
/** Flush streaming batch to state */
const flushStreamingBatch = useCallback(() => {
const batch = streamingBatchRef.current;
if (batch.size === 0) return;
@ -228,98 +149,46 @@ export function useRoomWs({
streamingRafRef.current = null;
}, []);
// Sync WS status to local state
useEffect(() => {
const room = activeRoomIdRef.current;
if (!room) return;
roomCacheRef.current.set(room, {
messages,
isHistoryLoaded,
nextCursor,
});
}, [messages, isHistoryLoaded, nextCursor]);
const connectWs = useCallback(
async (roomUid: string) => {
if (!shouldReconnectRef.current || activeRoomIdRef.current !== roomUid) return;
// Build URL with token if available
const url = buildWsUrlWithToken(baseUrl, `/ws/rooms/${roomUid}`, wsTokenRef.current);
console.debug('[useRoomWs] connecting to', url, { baseUrl, roomUid });
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
if (!wsClient) {
setStatus('idle');
return;
}
setStatus('connecting');
setErrorMessage(null);
const ws = new WebSocket(url);
wsRef.current = ws;
ws.onopen = () => {
if (activeRoomIdRef.current !== roomUid) return;
reconnectAttemptRef.current = 0;
setStatus('open');
setErrorMessage(null);
console.debug('[useRoomWs] ws opened for room', roomUid);
const handleStatusChange = (newStatus: RoomWsStatus) => {
setStatus(newStatus);
};
wsClient.on('statusChange', handleStatusChange);
// Sync initial status
setStatus(wsClient.getStatus());
return () => {
wsClient.off('statusChange', handleStatusChange);
};
}, [wsClient]);
ws.onmessage = (ev: MessageEvent<string>) => {
if (activeRoomIdRef.current !== roomUid) return;
// Register push event handlers on wsClient
useEffect(() => {
if (!wsClient) return;
let event: WsOutEvent;
try {
event = JSON.parse(ev.data) as WsOutEvent;
} catch {
console.warn('[useRoomWs] parse error, data:', ev.data);
setErrorMessage('Invalid WebSocket message');
return;
}
if ('error' in event && event.error) {
console.warn('[useRoomWs] error event:', event.error);
setErrorMessage(event.error);
return;
}
if (!('event' in event) || !event.event) {
console.warn('[useRoomWs] no event field, raw:', event);
return;
}
console.debug('[useRoomWs] received event type:', event.event.type, event.event);
switch (event.event.type) {
case 'room_message': {
// Backend sends payload flat on event.event (no data wrapper); also support nested data
const raw = (event.event as any);
const incoming: RoomMessagePayload = raw.data ?? raw;
console.debug('[useRoomWs] room_message:', incoming.id, incoming.content);
// Use Set for O(1) duplicate check instead of O(n) Array.some
const handleRoomMessage = (payload: RoomMessagePayload) => {
const currentMembers = membersRef.current;
const existingIds = new Set(messagesRef.current.map((m) => m.id));
if (existingIds.has(incoming.id)) {
console.debug('[useRoomWs] duplicate message, skipping');
return;
}
if (existingIds.has(payload.id)) return;
const sender = resolveSender(incoming, membersRef.current);
const display_name = incoming.display_name ?? sender.displayName;
const avatar_url = sender.avatarUrl;
const member = currentMembers.find((m) => m.user === payload.sender_id);
const display_name =
payload.display_name ??
(member ? member.user_info?.username ?? member.user : undefined);
const avatar_url = member?.user_info?.avatar_url;
setMessages((prev) =>
insertSorted(prev, { ...incoming, display_name, avatar_url }),
insertSorted(prev, { ...payload, display_name, avatar_url }),
);
break;
}
};
case 'ai_stream_chunk': {
const raw = event.event as any;
const chunk: AiStreamChunkPayload = raw.data ?? raw;
const handleAiStreamChunk = (chunk: AiStreamChunkPayload) => {
onAiStreamChunk?.(chunk);
// Batch streaming chunks using RAF to reduce re-render frequency
streamingBatchRef.current.set(chunk.message_id, {
content: chunk.content,
done: chunk.done,
@ -331,90 +200,33 @@ export function useRoomWs({
flushStreamingBatch();
});
}
break;
}
default:
break;
}
};
ws.onclose = (ev: CloseEvent) => {
console.debug('[useRoomWs] WebSocket closed', { code: ev.code, reason: ev.reason, wasClean: ev.wasClean });
const activeSocket = wsRef.current;
if (activeSocket !== ws) return;
wsRef.current = null;
if (activeRoomIdRef.current !== roomUid) return;
wsClient.on('roomMessage', handleRoomMessage);
wsClient.on('aiStreamChunk', handleAiStreamChunk);
setStatus('closed');
if (shouldReconnectRef.current) {
const attempt = ++reconnectAttemptRef.current;
const delay = Math.min(RECONNECT_BASE_DELAY * 2 ** (attempt - 1), RECONNECT_MAX_DELAY);
reconnectTimeoutRef.current = setTimeout(() => connectWs(roomUid), delay);
}
return () => {
wsClient.off('roomMessage', handleRoomMessage);
wsClient.off('aiStreamChunk', handleAiStreamChunk);
};
}, [wsClient, onAiStreamChunk, flushStreamingBatch]);
ws.onerror = (ev: Event) => {
console.error('[useRoomWs] WebSocket error', ev);
if (activeRoomIdRef.current !== roomUid) return;
setErrorMessage('WebSocket error');
};
},
[baseUrl, onAiStreamChunk],
);
// Cache messages when they change
useEffect(() => {
const prevRoom = activeRoomIdRef.current;
if (!roomId) {
// Disconnect
activeRoomIdRef.current = null;
shouldReconnectRef.current = false;
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
if (wsRef.current) {
wsRef.current.close();
wsRef.current = null;
}
setMessages([]);
setStatus('idle');
setErrorMessage(null);
setIsHistoryLoaded(false);
setNextCursor(null);
return;
}
// Save previous room's state
if (prevRoom && prevRoom !== roomId) {
roomCacheRef.current.set(prevRoom, {
messages: messagesRef.current,
const room = roomId;
if (!room) return;
roomCacheRef.current.set(room, {
messages,
isHistoryLoaded,
nextCursor: nextCursorRef.current,
nextCursor,
});
}
}, [messages, isHistoryLoaded, nextCursor, roomId]);
activeRoomIdRef.current = roomId;
shouldReconnectRef.current = true;
reconnectAttemptRef.current = 0;
// Subscribe to room and load messages
useEffect(() => {
if (!wsClient || !roomId) return;
// Fetch WS token before connecting (skip if we have a recent token)
const connectWithToken = async () => {
// Only fetch a new token if we don't have one or it's older than 4 minutes
// (tokens have 5-min TTL)
const shouldFetchToken = !wsTokenRef.current;
if (shouldFetchToken) {
try {
const token = await requestWsToken();
wsTokenRef.current = token;
} catch (error) {
console.warn('[useRoomWs] Failed to fetch WS token, falling back to cookie auth:', error);
wsTokenRef.current = null;
}
}
// Restore from cache or start fresh
// Restore from cache or load fresh
const cached = roomCacheRef.current.get(roomId);
if (cached) {
setMessages(cached.messages);
@ -424,103 +236,68 @@ export function useRoomWs({
setMessages([]);
setIsHistoryLoaded(false);
setNextCursor(null);
// Load initial history via REST (WS is push-only, can't request history)
if (roomId) {
client
.get({ url: `/api/rooms/${roomId}/messages`, params: { limit: historyLimit } })
// Use WS (with HTTP fallback) to load initial history
wsClient
.messageList(roomId, { limit: historyLimit })
.then((resp) => {
const r = resp as AxiosResponse<MessageListResponse>;
if (activeRoomIdRef.current !== roomId) return;
const msgs = (r.data?.data?.messages ?? []).map((m) => {
const sender = resolveSender({ ...m, room_id: m.room } as RoomMessagePayload, members);
const display_name = m.display_name ?? sender.displayName;
const avatar_url = sender.avatarUrl;
const msgs = (resp.messages ?? []).map((m) => {
const member = membersRef.current.find((mb) => mb.user === m.sender_id);
return {
...m,
room_id: m.room,
thread_id: m.thread ?? null,
display_name,
avatar_url,
};
display_name: m.display_name ?? member?.user_info?.username ?? member?.user,
avatar_url: member?.user_info?.avatar_url,
} as UiMessage;
});
setMessages(msgs);
setNextCursor(msgs.length > 0 ? msgs[msgs.length - 1].seq : null);
setIsHistoryLoaded(msgs.length < historyLimit);
})
.catch(() => {
if (activeRoomIdRef.current !== roomId) return;
toast.error('Failed to load message history');
setIsHistoryLoaded(true);
});
}
}
// Close other connections (shouldn't be any in practice)
if (wsRef.current) {
wsRef.current.close();
wsRef.current = null;
}
connectWs(roomId);
};
connectWithToken();
// Subscribe to room push events
wsClient.subscribeRoom(roomId).catch(() => {});
return () => {
// Save state before unmounting
const room = activeRoomIdRef.current;
if (room) {
roomCacheRef.current.set(room, {
messages: messagesRef.current,
isHistoryLoaded: isHistoryLoaded,
nextCursor: nextCursorRef.current,
});
}
shouldReconnectRef.current = false;
if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current);
reconnectTimeoutRef.current = null;
}
if (wsRef.current) {
wsRef.current.close();
wsRef.current = null;
}
if (streamingRafRef.current != null) {
cancelAnimationFrame(streamingRafRef.current);
streamingRafRef.current = null;
}
// Don't clear activeRoomIdRef or wsTokenRef — preserving them
// prevents unnecessary re-token-fetch and reconnect on re-mount
wsClient.unsubscribeRoom(roomId).catch(() => {});
};
}, [roomId, connectWs]);
}, [wsClient, roomId, historyLimit]);
const loadMore = useCallback(
async (cursor?: number | null) => {
if (!roomId || isLoadingMore) return;
// Use REST API for history pagination — WS is push-only
if (!wsClient || !roomId || isLoadingMore) return;
const effectiveCursor = cursor ?? nextCursorRef.current;
if (effectiveCursor == null) return;
setIsLoadingMore(true);
try {
const resp = await client.get({ url: `/api/rooms/${roomId}/messages`, params: { before_seq: effectiveCursor, limit: historyLimit } }) as AxiosResponse<MessageListResponse>;
const older = (resp.data?.data?.messages ?? []).map((m) => {
const sender = resolveSender({ ...m, room_id: m.room } as RoomMessagePayload, membersRef.current);
const display_name = m.display_name ?? sender.displayName;
const avatar_url = sender.avatarUrl;
// Use WS (with HTTP fallback) for paginated history
const resp = await wsClient.messageList(roomId, {
beforeSeq: effectiveCursor,
limit: historyLimit,
});
const older = (resp.messages ?? []).map((m) => {
const member = membersRef.current.find((mb) => mb.user === m.sender_id);
return {
...m,
room_id: m.room,
thread_id: m.thread ?? null,
display_name,
avatar_url,
};
display_name: m.display_name ?? member?.user_info?.username ?? member?.user,
avatar_url: member?.user_info?.avatar_url,
} as UiMessage;
});
if (older.length === 0) {
setIsHistoryLoaded(true);
return;
}
// Prepend older messages (they arrive in ascending seq order)
setMessages((prev) => {
const existingIds = new Set(prev.map((m) => m.id));
const newOnes = older.filter((m) => !existingIds.has(m.id));
@ -528,20 +305,18 @@ export function useRoomWs({
setIsHistoryLoaded(true);
return prev;
}
// New cursor = smallest seq among loaded messages
const newCursor = newOnes[newOnes.length - 1].seq;
setNextCursor(newCursor > 0 ? newCursor : null);
return [...newOnes, ...prev];
});
} catch {
// Non-critical — show toast so user knows the load failed
toast.error('Failed to load more messages');
setIsHistoryLoaded(true);
} finally {
setIsLoadingMore(false);
}
},
[roomId, historyLimit, isLoadingMore],
[wsClient, roomId, historyLimit, isLoadingMore],
);
return useMemo(