diff --git a/libs/api/room/mod.rs b/libs/api/room/mod.rs index 92a311e..8d1cdc9 100644 --- a/libs/api/room/mod.rs +++ b/libs/api/room/mod.rs @@ -118,6 +118,11 @@ pub fn init_room_routes(cfg: &mut web::ServiceConfig) { "/rooms/{room_id}/messages/{message_id}/reactions", web::get().to(reaction::reaction_get), ) + // batch reactions + .route( + "/rooms/{room_id}/messages/reactions/batch", + web::get().to(reaction::reaction_batch), + ) // message search .route( "/rooms/{room_id}/messages/search", diff --git a/libs/api/room/reaction.rs b/libs/api/room/reaction.rs index 2e70bdd..8673a6e 100644 --- a/libs/api/room/reaction.rs +++ b/libs/api/room/reaction.rs @@ -18,6 +18,11 @@ pub struct MessageSearchQuery { pub offset: Option, } +#[derive(Debug, serde::Deserialize, IntoParams)] +pub struct ReactionBatchQuery { + pub message_ids: Vec, +} + #[utoipa::path( post, path = "/api/rooms/{room_id}/messages/{message_id}/reactions", @@ -119,6 +124,38 @@ pub async fn reaction_get( Ok(ApiResponse::ok(resp).to_response()) } +#[utoipa::path( + get, + path = "/api/rooms/{room_id}/messages/reactions/batch", + params( + ("room_id" = Uuid, Path), + ("message_ids" = Vec, Query, description = "List of message IDs to fetch reactions for"), + ), + responses( + (status = 200, description = "Batch get reactions", body = ApiResponse>), + (status = 401, description = "Unauthorized"), + ), + tag = "Room" +)] +pub async fn reaction_batch( + service: web::Data, + session: Session, + path: web::Path, + query: web::Query, +) -> Result { + let room_id = path.into_inner(); + let user_id = session + .user() + .ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?; + let ctx = WsUserContext::new(user_id); + let resp = service + .room + .message_reactions_batch(room_id, query.into_inner().message_ids, &ctx) + .await + .map_err(ApiError::from)?; + Ok(ApiResponse::ok(resp).to_response()) +} + #[utoipa::path( get, path = "/api/rooms/{room_id}/messages/search", diff --git a/src/components/room/RoomMessageBubble.tsx b/src/components/room/RoomMessageBubble.tsx index fe497ab..655f918 100644 --- a/src/components/room/RoomMessageBubble.tsx +++ b/src/components/room/RoomMessageBubble.tsx @@ -503,21 +503,3 @@ export const RoomMessageBubble = memo(function RoomMessageBubble({ ); }); - -function EmojiPicker({ onEmojiSelect }: { onEmojiSelect: (emoji: string) => void }) { - return ( -
- {COMMON_EMOJIS.map((emoji) => ( - - ))} -
- ); -} diff --git a/src/contexts/room-context.tsx b/src/contexts/room-context.tsx index 51019ec..a87c8ad 100644 --- a/src/contexts/room-context.tsx +++ b/src/contexts/room-context.tsx @@ -25,6 +25,7 @@ import { type RoomMessagePayload, type RoomCategoryResponse as WsRoomCategoryResponse, type RoomReactionUpdatedPayload, + type ReactionListData, } from '@/lib/room-ws-client'; import { requestWsToken } from '@/lib/ws-token'; import { useUser } from '@/contexts'; @@ -221,7 +222,6 @@ export function RoomProvider({ useEffect(() => { if (prevRoomIdRef.current !== activeRoomId) { - const oldRoomId = prevRoomIdRef.current; prevRoomIdRef.current = activeRoomId; loadMessagesAbortRef.current?.abort(); loadMessagesAbortRef.current = null; @@ -266,15 +266,15 @@ export function RoomProvider({ */ const thisLoadReactions = ( roomId: string, - client: NonNullable>, + client: RoomWsClient, msgs: MessageWithMeta[], ) => { const msgIds = msgs.map((m) => m.id); if (msgIds.length === 0) return; client .reactionListBatch(roomId, msgIds) - .then((reactionResults) => { - const reactionMap = new Map(); + .then((reactionResults: ReactionListData[]) => { + const reactionMap = new Map(); for (const result of reactionResults) { if (result.reactions.length > 0) { reactionMap.set(result.message_id, result.reactions); diff --git a/src/hooks/useRoomWs.ts b/src/hooks/useRoomWs.ts deleted file mode 100644 index 694eacf..0000000 --- a/src/hooks/useRoomWs.ts +++ /dev/null @@ -1,334 +0,0 @@ -import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; -import { toast } from 'sonner'; -import type { - AiStreamChunkPayload, - RoomMessagePayload, - RoomWsStatus, - RoomWsClient, -} from '@/lib/room-ws-client'; -import type { RoomMemberResponse } from '@/client'; - -type RoomMessageCacheEntry = { - messages: UiMessage[]; - isHistoryLoaded: boolean; - nextCursor: number | null; -}; - -/** A message as held in the UI state */ -export type UiMessage = RoomMessagePayload & { - is_streaming?: boolean; - streaming_content?: string; - display_name?: string; - avatar_url?: string; - isOptimisticError?: boolean; - in_reply_to?: string | null; - edited_at?: string | null; - revoked?: string | null; - revoked_by?: string | null; -}; - -function compareMessages(a: UiMessage, b: UiMessage): number { - const timeDiff = new Date(a.send_at).getTime() - new Date(b.send_at).getTime(); - return timeDiff !== 0 ? timeDiff : a.id.localeCompare(b.id); -} - -function insertSorted(arr: UiMessage[], msg: UiMessage): UiMessage[] { - const result = [...arr]; - let lo = 0; - let hi = result.length; - while (lo < hi) { - const mid = (lo + hi) >>> 1; - if (compareMessages(result[mid], msg) < 0) lo = mid + 1; - else hi = mid; - } - result.splice(lo, 0, msg); - return result; -} - -export interface UseRoomWsOptions { - /** Shared RoomWsClient instance (from useRoom context) */ - wsClient: RoomWsClient | null; - /** Currently open room ID */ - roomId: string | null; - /** Limit for initial history load */ - historyLimit?: number; - /** Room members, used to resolve display_name for user messages */ - members?: RoomMemberResponse[]; - /** Called when the AI streaming chunk arrives */ - onAiStreamChunk?: (payload: AiStreamChunkPayload) => void; -} - -export interface UseRoomWsReturn { - messages: UiMessage[]; - status: RoomWsStatus; - errorMessage: string | null; - isHistoryLoaded: boolean; - isLoadingMore: boolean; - nextCursor: number | null; - loadMore: (cursor?: number | null) => void; -} - -/** - * Manages room message state using the shared RoomWsClient. - * - * Uses WS request/response for message history (initial load + loadMore), - * and WS push events (room_message, ai_stream_chunk) for real-time updates. - * Falls back to HTTP automatically when WS is not connected. - */ -export function useRoomWs({ - wsClient, - roomId, - historyLimit = 50, - members = [], - onAiStreamChunk, -}: UseRoomWsOptions): UseRoomWsReturn { - const [messages, setMessages] = useState([]); - const [status, setStatus] = useState('idle'); - const [errorMessage, setErrorMessage] = useState(null); - const [isHistoryLoaded, setIsHistoryLoaded] = useState(false); - const [isLoadingMore, setIsLoadingMore] = useState(false); - const [nextCursor, setNextCursor] = useState(null); - - const roomCacheRef = useRef>(new Map()); - const messagesRef = useRef([]); - messagesRef.current = messages; - - const nextCursorRef = useRef(null); - nextCursorRef.current = nextCursor; - - const membersRef = useRef([]); - membersRef.current = members; - - const streamingBatchRef = useRef>(new Map()); - const streamingRafRef = useRef(null); - - const flushStreamingBatch = useCallback(() => { - const batch = streamingBatchRef.current; - if (batch.size === 0) return; - - setMessages((prev) => { - const next = [...prev]; - let changed = false; - - for (const [messageId, chunk] of batch) { - const idx = next.findIndex((m) => m.id === messageId); - if (idx === -1) { - const placeholder: UiMessage = { - id: messageId, - room_id: chunk.room_id ?? next.find(() => true)?.room_id ?? '', - sender_type: 'ai', - content: chunk.done ? chunk.content : '', - content_type: 'text', - send_at: new Date().toISOString(), - seq: 0, - display_name: 'AI', - is_streaming: !chunk.done, - streaming_content: chunk.done ? undefined : chunk.content, - }; - next.push(placeholder); - changed = true; - } else { - const updated = { ...next[idx] }; - if (chunk.done) { - updated.is_streaming = false; - updated.content = chunk.content; - updated.streaming_content = undefined; - } else { - updated.is_streaming = true; - updated.streaming_content = (updated.streaming_content ?? updated.content) + chunk.content; - } - next[idx] = updated; - changed = true; - } - } - - return changed ? next : prev; - }); - - streamingBatchRef.current.clear(); - streamingRafRef.current = null; - }, []); - - // Sync WS status to local state - useEffect(() => { - if (!wsClient) { - setStatus('idle'); - return; - } - const handleStatusChange = (newStatus: RoomWsStatus) => { - setStatus(newStatus); - }; - wsClient.on('statusChange', handleStatusChange); - // Sync initial status - setStatus(wsClient.getStatus()); - return () => { - wsClient.off('statusChange', handleStatusChange); - }; - }, [wsClient]); - - // Register push event handlers on wsClient - useEffect(() => { - if (!wsClient) return; - - const handleRoomMessage = (payload: RoomMessagePayload) => { - const currentMembers = membersRef.current; - const existingIds = new Set(messagesRef.current.map((m) => m.id)); - if (existingIds.has(payload.id)) return; - - const member = currentMembers.find((m) => m.user === payload.sender_id); - const display_name = - payload.display_name ?? - (member ? member.user_info?.username ?? member.user : undefined); - const avatar_url = member?.user_info?.avatar_url; - - setMessages((prev) => - insertSorted(prev, { ...payload, display_name, avatar_url }), - ); - }; - - const handleAiStreamChunk = (chunk: AiStreamChunkPayload) => { - onAiStreamChunk?.(chunk); - - streamingBatchRef.current.set(chunk.message_id, { - content: chunk.content, - done: chunk.done, - room_id: chunk.room_id, - }); - - if (streamingRafRef.current == null) { - streamingRafRef.current = requestAnimationFrame(() => { - flushStreamingBatch(); - }); - } - }; - - wsClient.on('roomMessage', handleRoomMessage); - wsClient.on('aiStreamChunk', handleAiStreamChunk); - - return () => { - wsClient.off('roomMessage', handleRoomMessage); - wsClient.off('aiStreamChunk', handleAiStreamChunk); - }; - }, [wsClient, onAiStreamChunk, flushStreamingBatch]); - - // Cache messages when they change - useEffect(() => { - const room = roomId; - if (!room) return; - roomCacheRef.current.set(room, { - messages, - isHistoryLoaded, - nextCursor, - }); - }, [messages, isHistoryLoaded, nextCursor, roomId]); - - // Subscribe to room and load messages - useEffect(() => { - if (!wsClient || !roomId) return; - - // Restore from cache or load fresh - const cached = roomCacheRef.current.get(roomId); - if (cached) { - setMessages(cached.messages); - setIsHistoryLoaded(cached.isHistoryLoaded); - setNextCursor(cached.nextCursor); - } else { - setMessages([]); - setIsHistoryLoaded(false); - setNextCursor(null); - - // Use WS (with HTTP fallback) to load initial history - wsClient - .messageList(roomId, { limit: historyLimit }) - .then((resp) => { - const msgs = (resp.messages ?? []).map((m) => { - const member = membersRef.current.find((mb) => mb.user === m.sender_id); - return { - ...m, - room_id: m.room, - thread_id: m.thread ?? null, - display_name: m.display_name ?? member?.user_info?.username ?? member?.user, - avatar_url: member?.user_info?.avatar_url, - } as UiMessage; - }); - setMessages(msgs); - setNextCursor(msgs.length > 0 ? msgs[msgs.length - 1].seq : null); - setIsHistoryLoaded(msgs.length < historyLimit); - }) - .catch(() => { - toast.error('Failed to load message history'); - setIsHistoryLoaded(true); - }); - } - - // Subscribe to room push events - wsClient.subscribeRoom(roomId).catch(() => {}); - - return () => { - wsClient.unsubscribeRoom(roomId).catch(() => {}); - }; - }, [wsClient, roomId, historyLimit]); - - const loadMore = useCallback( - async (cursor?: number | null) => { - if (!wsClient || !roomId || isLoadingMore) return; - const effectiveCursor = cursor ?? nextCursorRef.current; - if (effectiveCursor == null) return; - - setIsLoadingMore(true); - try { - // Use WS (with HTTP fallback) for paginated history - const resp = await wsClient.messageList(roomId, { - beforeSeq: effectiveCursor, - limit: historyLimit, - }); - const older = (resp.messages ?? []).map((m) => { - const member = membersRef.current.find((mb) => mb.user === m.sender_id); - return { - ...m, - room_id: m.room, - thread_id: m.thread ?? null, - display_name: m.display_name ?? member?.user_info?.username ?? member?.user, - avatar_url: member?.user_info?.avatar_url, - } as UiMessage; - }); - - if (older.length === 0) { - setIsHistoryLoaded(true); - return; - } - - setMessages((prev) => { - const existingIds = new Set(prev.map((m) => m.id)); - const newOnes = older.filter((m) => !existingIds.has(m.id)); - if (newOnes.length === 0) { - setIsHistoryLoaded(true); - return prev; - } - const newCursor = newOnes[newOnes.length - 1].seq; - setNextCursor(newCursor > 0 ? newCursor : null); - return [...newOnes, ...prev]; - }); - } catch { - toast.error('Failed to load more messages'); - setIsHistoryLoaded(true); - } finally { - setIsLoadingMore(false); - } - }, - [wsClient, roomId, historyLimit, isLoadingMore], - ); - - return useMemo( - () => ({ - messages, - status, - errorMessage, - isHistoryLoaded, - isLoadingMore, - nextCursor, - loadMore, - }), - [messages, status, errorMessage, isHistoryLoaded, isLoadingMore, nextCursor, loadMore], - ); -} diff --git a/src/lib/room-ws-client.ts b/src/lib/room-ws-client.ts index dbc05d8..cbf91fe 100644 --- a/src/lib/room-ws-client.ts +++ b/src/lib/room-ws-client.ts @@ -178,7 +178,6 @@ export class RoomWsClient { // If we used an existing token and it was immediately rejected, retry with a new token if (!forceNewToken && this.wsToken) { console.debug('[RoomWs] Existing token rejected — fetching new token and retrying'); - const savedToken = this.wsToken; this.wsToken = null; return this.connect(true); }