fix(room): add HTTP batch reactions endpoint and clean up dead code

Backend:
- Add reaction_batch handler: GET /api/rooms/{room_id}/messages/reactions/batch?message_ids=...
- Register route in libs/api/room/mod.rs
- Backend already had message_reactions_batch service method, just needed HTTP exposure

Frontend:
- Add ReactionListData import to room-context.tsx
- Fix thisLoadReactions client type (was using broken NonNullable<ReturnType<>>)
- Remove unused oldRoomId variable
- Delete unused useRoomWs.ts hook (RoomWsClient has no on/off methods)
- Remove unused EmojiPicker function and old manual overlay picker from RoomMessageBubble
- Remove unused savedToken variable in room-ws-client
This commit is contained in:
ZhenYi 2026-04-17 22:12:10 +08:00
parent 2f1ed69b31
commit a1ddb5d5bc
6 changed files with 46 additions and 357 deletions

View File

@ -118,6 +118,11 @@ pub fn init_room_routes(cfg: &mut web::ServiceConfig) {
"/rooms/{room_id}/messages/{message_id}/reactions",
web::get().to(reaction::reaction_get),
)
// batch reactions
.route(
"/rooms/{room_id}/messages/reactions/batch",
web::get().to(reaction::reaction_batch),
)
// message search
.route(
"/rooms/{room_id}/messages/search",

View File

@ -18,6 +18,11 @@ pub struct MessageSearchQuery {
pub offset: Option<u64>,
}
#[derive(Debug, serde::Deserialize, IntoParams)]
pub struct ReactionBatchQuery {
pub message_ids: Vec<Uuid>,
}
#[utoipa::path(
post,
path = "/api/rooms/{room_id}/messages/{message_id}/reactions",
@ -119,6 +124,38 @@ pub async fn reaction_get(
Ok(ApiResponse::ok(resp).to_response())
}
#[utoipa::path(
get,
path = "/api/rooms/{room_id}/messages/reactions/batch",
params(
("room_id" = Uuid, Path),
("message_ids" = Vec<Uuid>, Query, description = "List of message IDs to fetch reactions for"),
),
responses(
(status = 200, description = "Batch get reactions", body = ApiResponse<Vec<room::MessageReactionsResponse>>),
(status = 401, description = "Unauthorized"),
),
tag = "Room"
)]
pub async fn reaction_batch(
service: web::Data<AppService>,
session: Session,
path: web::Path<Uuid>,
query: web::Query<ReactionBatchQuery>,
) -> Result<HttpResponse, ApiError> {
let room_id = path.into_inner();
let user_id = session
.user()
.ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?;
let ctx = WsUserContext::new(user_id);
let resp = service
.room
.message_reactions_batch(room_id, query.into_inner().message_ids, &ctx)
.await
.map_err(ApiError::from)?;
Ok(ApiResponse::ok(resp).to_response())
}
#[utoipa::path(
get,
path = "/api/rooms/{room_id}/messages/search",

View File

@ -503,21 +503,3 @@ export const RoomMessageBubble = memo(function RoomMessageBubble({
</div>
);
});
function EmojiPicker({ onEmojiSelect }: { onEmojiSelect: (emoji: string) => void }) {
return (
<div className="grid grid-cols-8 gap-1">
{COMMON_EMOJIS.map((emoji) => (
<Button
key={emoji}
variant="ghost"
size="sm"
onClick={() => onEmojiSelect(emoji)}
className="size-7 p-0 text-base hover:bg-accent"
>
{emoji}
</Button>
))}
</div>
);
}

View File

@ -25,6 +25,7 @@ import {
type RoomMessagePayload,
type RoomCategoryResponse as WsRoomCategoryResponse,
type RoomReactionUpdatedPayload,
type ReactionListData,
} from '@/lib/room-ws-client';
import { requestWsToken } from '@/lib/ws-token';
import { useUser } from '@/contexts';
@ -221,7 +222,6 @@ export function RoomProvider({
useEffect(() => {
if (prevRoomIdRef.current !== activeRoomId) {
const oldRoomId = prevRoomIdRef.current;
prevRoomIdRef.current = activeRoomId;
loadMessagesAbortRef.current?.abort();
loadMessagesAbortRef.current = null;
@ -266,15 +266,15 @@ export function RoomProvider({
*/
const thisLoadReactions = (
roomId: string,
client: NonNullable<ReturnType<typeof wsClientRef.current>>,
client: RoomWsClient,
msgs: MessageWithMeta[],
) => {
const msgIds = msgs.map((m) => m.id);
if (msgIds.length === 0) return;
client
.reactionListBatch(roomId, msgIds)
.then((reactionResults) => {
const reactionMap = new Map<string, import('@/lib/room-ws-client').ReactionItem[]>();
.then((reactionResults: ReactionListData[]) => {
const reactionMap = new Map<string, ReactionListData['reactions']>();
for (const result of reactionResults) {
if (result.reactions.length > 0) {
reactionMap.set(result.message_id, result.reactions);

View File

@ -1,334 +0,0 @@
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import { toast } from 'sonner';
import type {
AiStreamChunkPayload,
RoomMessagePayload,
RoomWsStatus,
RoomWsClient,
} from '@/lib/room-ws-client';
import type { RoomMemberResponse } from '@/client';
type RoomMessageCacheEntry = {
messages: UiMessage[];
isHistoryLoaded: boolean;
nextCursor: number | null;
};
/** A message as held in the UI state */
export type UiMessage = RoomMessagePayload & {
is_streaming?: boolean;
streaming_content?: string;
display_name?: string;
avatar_url?: string;
isOptimisticError?: boolean;
in_reply_to?: string | null;
edited_at?: string | null;
revoked?: string | null;
revoked_by?: string | null;
};
function compareMessages(a: UiMessage, b: UiMessage): number {
const timeDiff = new Date(a.send_at).getTime() - new Date(b.send_at).getTime();
return timeDiff !== 0 ? timeDiff : a.id.localeCompare(b.id);
}
function insertSorted(arr: UiMessage[], msg: UiMessage): UiMessage[] {
const result = [...arr];
let lo = 0;
let hi = result.length;
while (lo < hi) {
const mid = (lo + hi) >>> 1;
if (compareMessages(result[mid], msg) < 0) lo = mid + 1;
else hi = mid;
}
result.splice(lo, 0, msg);
return result;
}
export interface UseRoomWsOptions {
/** Shared RoomWsClient instance (from useRoom context) */
wsClient: RoomWsClient | null;
/** Currently open room ID */
roomId: string | null;
/** Limit for initial history load */
historyLimit?: number;
/** Room members, used to resolve display_name for user messages */
members?: RoomMemberResponse[];
/** Called when the AI streaming chunk arrives */
onAiStreamChunk?: (payload: AiStreamChunkPayload) => void;
}
export interface UseRoomWsReturn {
messages: UiMessage[];
status: RoomWsStatus;
errorMessage: string | null;
isHistoryLoaded: boolean;
isLoadingMore: boolean;
nextCursor: number | null;
loadMore: (cursor?: number | null) => void;
}
/**
* Manages room message state using the shared RoomWsClient.
*
* Uses WS request/response for message history (initial load + loadMore),
* and WS push events (room_message, ai_stream_chunk) for real-time updates.
* Falls back to HTTP automatically when WS is not connected.
*/
export function useRoomWs({
wsClient,
roomId,
historyLimit = 50,
members = [],
onAiStreamChunk,
}: UseRoomWsOptions): UseRoomWsReturn {
const [messages, setMessages] = useState<UiMessage[]>([]);
const [status, setStatus] = useState<RoomWsStatus>('idle');
const [errorMessage, setErrorMessage] = useState<string | null>(null);
const [isHistoryLoaded, setIsHistoryLoaded] = useState(false);
const [isLoadingMore, setIsLoadingMore] = useState(false);
const [nextCursor, setNextCursor] = useState<number | null>(null);
const roomCacheRef = useRef<Map<string, RoomMessageCacheEntry>>(new Map());
const messagesRef = useRef<UiMessage[]>([]);
messagesRef.current = messages;
const nextCursorRef = useRef<number | null>(null);
nextCursorRef.current = nextCursor;
const membersRef = useRef<RoomMemberResponse[]>([]);
membersRef.current = members;
const streamingBatchRef = useRef<Map<string, { content: string; done: boolean; room_id: string }>>(new Map());
const streamingRafRef = useRef<number | null>(null);
const flushStreamingBatch = useCallback(() => {
const batch = streamingBatchRef.current;
if (batch.size === 0) return;
setMessages((prev) => {
const next = [...prev];
let changed = false;
for (const [messageId, chunk] of batch) {
const idx = next.findIndex((m) => m.id === messageId);
if (idx === -1) {
const placeholder: UiMessage = {
id: messageId,
room_id: chunk.room_id ?? next.find(() => true)?.room_id ?? '',
sender_type: 'ai',
content: chunk.done ? chunk.content : '',
content_type: 'text',
send_at: new Date().toISOString(),
seq: 0,
display_name: 'AI',
is_streaming: !chunk.done,
streaming_content: chunk.done ? undefined : chunk.content,
};
next.push(placeholder);
changed = true;
} else {
const updated = { ...next[idx] };
if (chunk.done) {
updated.is_streaming = false;
updated.content = chunk.content;
updated.streaming_content = undefined;
} else {
updated.is_streaming = true;
updated.streaming_content = (updated.streaming_content ?? updated.content) + chunk.content;
}
next[idx] = updated;
changed = true;
}
}
return changed ? next : prev;
});
streamingBatchRef.current.clear();
streamingRafRef.current = null;
}, []);
// Sync WS status to local state
useEffect(() => {
if (!wsClient) {
setStatus('idle');
return;
}
const handleStatusChange = (newStatus: RoomWsStatus) => {
setStatus(newStatus);
};
wsClient.on('statusChange', handleStatusChange);
// Sync initial status
setStatus(wsClient.getStatus());
return () => {
wsClient.off('statusChange', handleStatusChange);
};
}, [wsClient]);
// Register push event handlers on wsClient
useEffect(() => {
if (!wsClient) return;
const handleRoomMessage = (payload: RoomMessagePayload) => {
const currentMembers = membersRef.current;
const existingIds = new Set(messagesRef.current.map((m) => m.id));
if (existingIds.has(payload.id)) return;
const member = currentMembers.find((m) => m.user === payload.sender_id);
const display_name =
payload.display_name ??
(member ? member.user_info?.username ?? member.user : undefined);
const avatar_url = member?.user_info?.avatar_url;
setMessages((prev) =>
insertSorted(prev, { ...payload, display_name, avatar_url }),
);
};
const handleAiStreamChunk = (chunk: AiStreamChunkPayload) => {
onAiStreamChunk?.(chunk);
streamingBatchRef.current.set(chunk.message_id, {
content: chunk.content,
done: chunk.done,
room_id: chunk.room_id,
});
if (streamingRafRef.current == null) {
streamingRafRef.current = requestAnimationFrame(() => {
flushStreamingBatch();
});
}
};
wsClient.on('roomMessage', handleRoomMessage);
wsClient.on('aiStreamChunk', handleAiStreamChunk);
return () => {
wsClient.off('roomMessage', handleRoomMessage);
wsClient.off('aiStreamChunk', handleAiStreamChunk);
};
}, [wsClient, onAiStreamChunk, flushStreamingBatch]);
// Cache messages when they change
useEffect(() => {
const room = roomId;
if (!room) return;
roomCacheRef.current.set(room, {
messages,
isHistoryLoaded,
nextCursor,
});
}, [messages, isHistoryLoaded, nextCursor, roomId]);
// Subscribe to room and load messages
useEffect(() => {
if (!wsClient || !roomId) return;
// Restore from cache or load fresh
const cached = roomCacheRef.current.get(roomId);
if (cached) {
setMessages(cached.messages);
setIsHistoryLoaded(cached.isHistoryLoaded);
setNextCursor(cached.nextCursor);
} else {
setMessages([]);
setIsHistoryLoaded(false);
setNextCursor(null);
// Use WS (with HTTP fallback) to load initial history
wsClient
.messageList(roomId, { limit: historyLimit })
.then((resp) => {
const msgs = (resp.messages ?? []).map((m) => {
const member = membersRef.current.find((mb) => mb.user === m.sender_id);
return {
...m,
room_id: m.room,
thread_id: m.thread ?? null,
display_name: m.display_name ?? member?.user_info?.username ?? member?.user,
avatar_url: member?.user_info?.avatar_url,
} as UiMessage;
});
setMessages(msgs);
setNextCursor(msgs.length > 0 ? msgs[msgs.length - 1].seq : null);
setIsHistoryLoaded(msgs.length < historyLimit);
})
.catch(() => {
toast.error('Failed to load message history');
setIsHistoryLoaded(true);
});
}
// Subscribe to room push events
wsClient.subscribeRoom(roomId).catch(() => {});
return () => {
wsClient.unsubscribeRoom(roomId).catch(() => {});
};
}, [wsClient, roomId, historyLimit]);
const loadMore = useCallback(
async (cursor?: number | null) => {
if (!wsClient || !roomId || isLoadingMore) return;
const effectiveCursor = cursor ?? nextCursorRef.current;
if (effectiveCursor == null) return;
setIsLoadingMore(true);
try {
// Use WS (with HTTP fallback) for paginated history
const resp = await wsClient.messageList(roomId, {
beforeSeq: effectiveCursor,
limit: historyLimit,
});
const older = (resp.messages ?? []).map((m) => {
const member = membersRef.current.find((mb) => mb.user === m.sender_id);
return {
...m,
room_id: m.room,
thread_id: m.thread ?? null,
display_name: m.display_name ?? member?.user_info?.username ?? member?.user,
avatar_url: member?.user_info?.avatar_url,
} as UiMessage;
});
if (older.length === 0) {
setIsHistoryLoaded(true);
return;
}
setMessages((prev) => {
const existingIds = new Set(prev.map((m) => m.id));
const newOnes = older.filter((m) => !existingIds.has(m.id));
if (newOnes.length === 0) {
setIsHistoryLoaded(true);
return prev;
}
const newCursor = newOnes[newOnes.length - 1].seq;
setNextCursor(newCursor > 0 ? newCursor : null);
return [...newOnes, ...prev];
});
} catch {
toast.error('Failed to load more messages');
setIsHistoryLoaded(true);
} finally {
setIsLoadingMore(false);
}
},
[wsClient, roomId, historyLimit, isLoadingMore],
);
return useMemo(
() => ({
messages,
status,
errorMessage,
isHistoryLoaded,
isLoadingMore,
nextCursor,
loadMore,
}),
[messages, status, errorMessage, isHistoryLoaded, isLoadingMore, nextCursor, loadMore],
);
}

View File

@ -178,7 +178,6 @@ export class RoomWsClient {
// If we used an existing token and it was immediately rejected, retry with a new token
if (!forceNewToken && this.wsToken) {
console.debug('[RoomWs] Existing token rejected — fetching new token and retrying');
const savedToken = this.wsToken;
this.wsToken = null;
return this.connect(true);
}