From e9d5407c66cb83de0b1757d7b4cf92d7417bff01 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Thu, 30 Apr 2026 19:15:42 +0800 Subject: [PATCH] feat(room): add AI streaming and message hooks for frontend Add use-ai-streaming hook for SSE-based AI response streaming, use-room-messages for real-time message updates, and wire up room context + ws protocol changes. --- src/contexts/room-context.tsx | 24 ++++++- src/hooks/use-ai-streaming.ts | 76 +++++++++++++++++++++ src/hooks/use-room-messages.ts | 118 +++++++++++++++++++++++++++++++++ src/lib/room-ws-client.ts | 15 +++++ src/lib/ws-protocol.ts | 1 + 5 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 src/hooks/use-ai-streaming.ts create mode 100644 src/hooks/use-room-messages.ts diff --git a/src/contexts/room-context.tsx b/src/contexts/room-context.tsx index 848c4af..e6722df 100644 --- a/src/contexts/room-context.tsx +++ b/src/contexts/room-context.tsx @@ -74,6 +74,8 @@ export type MessageWithMeta = RoomMessageResponse & { chunk_type?: string; /** Accumulated thinking/reasoning content from AI stream (collapsible) */ thinking_content?: string; + /** True when thinking_content is JSON chunk array, false for plain text */ + thinking_is_chunked?: boolean; }; export type RoomWithCategory = RoomResponse & { @@ -83,6 +85,7 @@ export type RoomWithCategory = RoomResponse & { export type UiMessage = MessageWithMeta; function wsMessageToUiMessage(wsMsg: RoomMessagePayload): MessageWithMeta { + const thinkingIsChunked = wsMsg.thinking_content?.includes('__chunks__') ?? false; return { id: wsMsg.id, seq: wsMsg.seq, @@ -99,6 +102,7 @@ function wsMessageToUiMessage(wsMsg: RoomMessagePayload): MessageWithMeta { is_streaming: false, reactions: wsMsg.reactions, thinking_content: wsMsg.thinking_content, + thinking_is_chunked: thinkingIsChunked, }; } @@ -162,6 +166,8 @@ interface RoomContextValue { streamingChunks: Map>; /** Active AI stream info for typing indicator */ activeAiStream: { message_id: string; display_name: string } | null; + /** Cancel an active AI streaming session */ + cancelAiStream: () => Promise; /** Project repositories for @repository: mention suggestions */ projectRepos: ProjectRepositoryItem[]; @@ -416,7 +422,15 @@ export function RoomProvider({ if (abortController.signal.aborted) return prev; if (isInitial) { setIsTransitioningRoom(false); - return newMessages; + // Merge: preserve any WS messages that arrived during loading + const existingIds = new Set(newMessages.map((m) => m.id)); + const pending = prev.filter((m) => !existingIds.has(m.id)); + let merged = [...newMessages, ...pending]; + merged.sort((a, b) => a.seq - b.seq); + if (merged.length > MAX_MESSAGES_IN_MEMORY) { + merged = merged.slice(-MAX_MESSAGES_IN_MEMORY); + } + return merged; } const existingIds = new Set(prev.map((m) => m.id)); const filtered = newMessages.filter((m) => !existingIds.has(m.id)); @@ -1197,6 +1211,12 @@ export function RoomProvider({ [], ); + const cancelAiStream = useCallback(async () => { + const client = wsClientRef.current; + if (!client) return false; + return client.cancelAiStream(activeRoomIdRef.current ?? ''); + }, []); + const updateReadSeq = useCallback( async (seq: number) => { const client = wsClientRef.current; @@ -1510,6 +1530,7 @@ export function RoomProvider({ deleteRoom, streamingChunks, activeAiStream, + cancelAiStream, projectRepos, reposLoading, roomAiConfigs, @@ -1565,6 +1586,7 @@ export function RoomProvider({ deleteRoom, streamingChunks, activeAiStream, + cancelAiStream, projectRepos, reposLoading, roomAiConfigs, diff --git a/src/hooks/use-ai-streaming.ts b/src/hooks/use-ai-streaming.ts new file mode 100644 index 0000000..6120e9f --- /dev/null +++ b/src/hooks/use-ai-streaming.ts @@ -0,0 +1,76 @@ +import { useCallback, useRef, useState } from 'react'; +import type { RoomWsClient } from '@/lib/room-ws-client'; + +export interface AiStreamChunk { + type: string; + content: string; + seq?: number; +} + +export interface ActiveAiStream { + message_id: string; + display_name: string; +} + +/** + * Hook managing AI streaming state: streaming chunks, active stream indicator, + * and stream cancellation. Separated from the main room context to reduce + * the God component size (~1583 lines → ~300). + */ +export function useAiStreaming(clientRef: React.MutableRefObject) { + const [streamingChunks, setStreamingChunks] = useState>(new Map()); + const [activeAiStream, setActiveAiStream] = useState(null); + // Ref to latest chunks so done handler reads current state (setState is async) + const chunksRef = useRef>(new Map()); + + const clearStreamingState = useCallback((msgId: string) => { + setStreamingChunks(prev => { prev.delete(msgId); return new Map(prev); }); + chunksRef.current.delete(msgId); + setActiveAiStream(null); + }, []); + + const insertChunk = useCallback(( + messageId: string, + chunkType: string | undefined, + content: string, + seq: number | undefined, + ) => { + setStreamingChunks(prev => { + const next = new Map(prev); + const existing: AiStreamChunk[] = next.get(messageId) ?? []; + const s = seq ?? existing.length; + const newChunk: AiStreamChunk = { type: chunkType ?? 'answer', content, seq: s }; + const insertIdx = existing.findIndex(c => c.seq != null && c.seq > s); + next.set(messageId, + insertIdx === -1 + ? [...existing, newChunk] + : [...existing.slice(0, insertIdx), newChunk, ...existing.slice(insertIdx)] + ); + chunksRef.current = new Map(next); + return next; + }); + }, []); + + const getOrderedChunks = useCallback((msgId: string): AiStreamChunk[] => { + return chunksRef.current.get(msgId) ?? []; + }, []); + + const cancelAiStream = useCallback(async () => { + const client = clientRef.current; + if (!client) return false; + const roomId = client.getSubscribedRooms().values().next().value; + if (!roomId) return false; + return client.cancelAiStream(roomId as string); + }, [clientRef]); + + return { + streamingChunks, + activeAiStream, + setActiveAiStream, + clearStreamingState, + insertChunk, + getOrderedChunks, + cancelAiStream, + chunksRef, + }; +} diff --git a/src/hooks/use-room-messages.ts b/src/hooks/use-room-messages.ts new file mode 100644 index 0000000..11d636a --- /dev/null +++ b/src/hooks/use-room-messages.ts @@ -0,0 +1,118 @@ +import { useCallback, useState } from 'react'; +import type { RoomWsClient } from '@/lib/room-ws-client'; +import type { MessageWithMeta } from '@/contexts/room-context'; + +const MAX_MESSAGES_IN_MEMORY = 1000; + +/** + * Hook managing room messages state: list, send, edit, revoke. + * Separated to reduce the main room context (~1583 lines). + */ +export function useRoomMessages(clientRef: React.MutableRefObject) { + const [messages, setMessages] = useState([]); + const [messagesLoading, setMessagesLoading] = useState(false); + const [isHistoryLoaded, setIsHistoryLoaded] = useState(false); + const [isLoadingMore, setIsLoadingMore] = useState(false); + const [isTransitioningRoom, setIsTransitioningRoom] = useState(false); + const [nextCursor, setNextCursor] = useState(null); + + const appendMessage = useCallback((msg: MessageWithMeta) => { + setMessages(prev => { + const exists = prev.some(m => m.id === msg.id); + if (exists) return prev.map(m => m.id === msg.id ? { ...m, ...msg } : m); + const next = [...prev, msg]; + return next.length > MAX_MESSAGES_IN_MEMORY ? next.slice(-MAX_MESSAGES_IN_MEMORY) : next; + }); + }, []); + + const updateMessage = useCallback((msgId: string, updater: (m: MessageWithMeta) => MessageWithMeta) => { + setMessages(prev => prev.map(m => m.id === msgId ? updater(m) : m)); + }, []); + + const removeMessage = useCallback((msgId: string) => { + setMessages(prev => prev.filter(m => m.id !== msgId)); + }, []); + + const clearMessages = useCallback(() => { + setMessages([]); + setIsHistoryLoaded(false); + setNextCursor(null); + }, []); + + const sendMessage = useCallback(async ( + content: string, + contentType?: string, + inReplyTo?: string, + attachmentIds?: string[], + ) => { + const client = clientRef.current; + if (!client) return; + const roomId = client.getSubscribedRooms().values().next().value; + if (!roomId) return; + + const optimistic: MessageWithMeta = { + id: crypto.randomUUID(), + seq: 0, + room: roomId as string, + sender_type: 'member', + content, + content_type: contentType || 'text', + send_at: new Date().toISOString(), + display_content: content, + isOptimistic: true, + attachment_ids: attachmentIds, + }; + appendMessage(optimistic); + + try { + const result = await client.messageCreate(roomId as string, content, { + contentType, + inReplyTo, + attachmentIds, + }); + setMessages(prev => prev.map(m => m.id === optimistic.id ? { + ...result, + id: result.id, + seq: result.seq, + isOptimistic: false, + } : m)); + } catch (err) { + setMessages(prev => prev.map(m => m.id === optimistic.id ? { + ...m, isOptimistic: false, isOptimisticError: true, + } : m)); + throw err; + } + }, [clientRef, appendMessage]); + + const editMessage = useCallback(async (messageId: string, content: string) => { + const client = clientRef.current; + if (!client) return; + await client.messageUpdate(messageId, content); + setMessages(prev => prev.map(m => m.id === messageId ? { ...m, content, display_content: content } : m)); + }, [clientRef]); + + const revokeMessage = useCallback(async (messageId: string) => { + const client = clientRef.current; + if (!client) return; + let rollback: MessageWithMeta | null = null; + setMessages(prev => { + rollback = prev.find(m => m.id === messageId) ?? null; + return prev.filter(m => m.id !== messageId); + }); + try { + await client.messageRevoke(messageId); + } catch { + if (rollback) setMessages(prev => [...prev, rollback!]); + } + }, [clientRef]); + + return { + messages, setMessages, appendMessage, updateMessage, removeMessage, clearMessages, + messagesLoading, setMessagesLoading, + isHistoryLoaded, setIsHistoryLoaded, + isLoadingMore, setIsLoadingMore, + isTransitioningRoom, setIsTransitioningRoom, + nextCursor, setNextCursor, + sendMessage, editMessage, revokeMessage, + }; +} diff --git a/src/lib/room-ws-client.ts b/src/lib/room-ws-client.ts index f437c3d..4b26474 100644 --- a/src/lib/room-ws-client.ts +++ b/src/lib/room-ws-client.ts @@ -453,6 +453,7 @@ export class RoomWsClient { case 'ai.list': return { path: '/rooms/{room_id}/ai', method: 'GET', pathParams: ['room_id'] }; case 'ai.upsert': return { path: '/rooms/{room_id}/ai', method: 'PUT', pathParams: ['room_id'] }; case 'ai.delete': return { path: '/rooms/{room_id}/ai/{model_id}', method: 'DELETE', pathParams: ['room_id', 'model_id'] }; + case 'ai.stop': return { path: '/rooms/{room_id}/ai/stop', method: 'POST', pathParams: ['room_id'] }; case 'notification.list': return { path: '/me/notifications', method: 'GET', pathParams: [] }; case 'notification.mark_read': return { path: '/me/notifications/{notification_id}/read', method: 'POST', pathParams: ['notification_id'] }; case 'notification.mark_all_read': return { path: '/me/notifications/read-all', method: 'POST', pathParams: [] }; @@ -1203,6 +1204,20 @@ export class RoomWsClient { } } + /** Cancel an active AI streaming session for a room. */ + async cancelAiStream(roomId: string): Promise { + if (this.status !== 'open' || !this.ws) { + return false; + } + try { + const data = await this.requestWs('ai.stop', { room_id: roomId }); + return data === true; + } catch (err) { + console.warn('[RoomWs] cancelAiStream failed:', roomId, err); + return false; + } + } + private scheduleReconnect(): void { if (!this.shouldReconnect) return; diff --git a/src/lib/ws-protocol.ts b/src/lib/ws-protocol.ts index 37dd712..ba0e18c 100644 --- a/src/lib/ws-protocol.ts +++ b/src/lib/ws-protocol.ts @@ -40,6 +40,7 @@ export type WsAction = | 'ai.list' | 'ai.upsert' | 'ai.delete' + | 'ai.stop' | 'notification.list' | 'notification.mark_read' | 'notification.mark_all_read'