feat(room): add AI streaming and message hooks for frontend

Add use-ai-streaming hook for SSE-based AI response streaming,
use-room-messages for real-time message updates, and wire up
room context + ws protocol changes.
This commit is contained in:
ZhenYi 2026-04-30 19:15:42 +08:00
parent 009ccee72b
commit e9d5407c66
5 changed files with 233 additions and 1 deletions

View File

@ -74,6 +74,8 @@ export type MessageWithMeta = RoomMessageResponse & {
chunk_type?: string;
/** Accumulated thinking/reasoning content from AI stream (collapsible) */
thinking_content?: string;
/** True when thinking_content is JSON chunk array, false for plain text */
thinking_is_chunked?: boolean;
};
export type RoomWithCategory = RoomResponse & {
@ -83,6 +85,7 @@ export type RoomWithCategory = RoomResponse & {
export type UiMessage = MessageWithMeta;
function wsMessageToUiMessage(wsMsg: RoomMessagePayload): MessageWithMeta {
const thinkingIsChunked = wsMsg.thinking_content?.includes('__chunks__') ?? false;
return {
id: wsMsg.id,
seq: wsMsg.seq,
@ -99,6 +102,7 @@ function wsMessageToUiMessage(wsMsg: RoomMessagePayload): MessageWithMeta {
is_streaming: false,
reactions: wsMsg.reactions,
thinking_content: wsMsg.thinking_content,
thinking_is_chunked: thinkingIsChunked,
};
}
@ -162,6 +166,8 @@ interface RoomContextValue {
streamingChunks: Map<string, Array<{ type: string; content: string }>>;
/** Active AI stream info for typing indicator */
activeAiStream: { message_id: string; display_name: string } | null;
/** Cancel an active AI streaming session */
cancelAiStream: () => Promise<boolean>;
/** Project repositories for @repository: mention suggestions */
projectRepos: ProjectRepositoryItem[];
@ -416,7 +422,15 @@ export function RoomProvider({
if (abortController.signal.aborted) return prev;
if (isInitial) {
setIsTransitioningRoom(false);
return newMessages;
// Merge: preserve any WS messages that arrived during loading
const existingIds = new Set(newMessages.map((m) => m.id));
const pending = prev.filter((m) => !existingIds.has(m.id));
let merged = [...newMessages, ...pending];
merged.sort((a, b) => a.seq - b.seq);
if (merged.length > MAX_MESSAGES_IN_MEMORY) {
merged = merged.slice(-MAX_MESSAGES_IN_MEMORY);
}
return merged;
}
const existingIds = new Set(prev.map((m) => m.id));
const filtered = newMessages.filter((m) => !existingIds.has(m.id));
@ -1197,6 +1211,12 @@ export function RoomProvider({
[],
);
const cancelAiStream = useCallback(async () => {
const client = wsClientRef.current;
if (!client) return false;
return client.cancelAiStream(activeRoomIdRef.current ?? '');
}, []);
const updateReadSeq = useCallback(
async (seq: number) => {
const client = wsClientRef.current;
@ -1510,6 +1530,7 @@ export function RoomProvider({
deleteRoom,
streamingChunks,
activeAiStream,
cancelAiStream,
projectRepos,
reposLoading,
roomAiConfigs,
@ -1565,6 +1586,7 @@ export function RoomProvider({
deleteRoom,
streamingChunks,
activeAiStream,
cancelAiStream,
projectRepos,
reposLoading,
roomAiConfigs,

View File

@ -0,0 +1,76 @@
import { useCallback, useRef, useState } from 'react';
import type { RoomWsClient } from '@/lib/room-ws-client';
export interface AiStreamChunk {
type: string;
content: string;
seq?: number;
}
export interface ActiveAiStream {
message_id: string;
display_name: string;
}
/**
* Hook managing AI streaming state: streaming chunks, active stream indicator,
* and stream cancellation. Separated from the main room context to reduce
* the God component size (~1583 lines ~300).
*/
export function useAiStreaming(clientRef: React.MutableRefObject<RoomWsClient | null>) {
const [streamingChunks, setStreamingChunks] = useState<Map<string, AiStreamChunk[]>>(new Map());
const [activeAiStream, setActiveAiStream] = useState<ActiveAiStream | null>(null);
// Ref to latest chunks so done handler reads current state (setState is async)
const chunksRef = useRef<Map<string, AiStreamChunk[]>>(new Map());
const clearStreamingState = useCallback((msgId: string) => {
setStreamingChunks(prev => { prev.delete(msgId); return new Map(prev); });
chunksRef.current.delete(msgId);
setActiveAiStream(null);
}, []);
const insertChunk = useCallback((
messageId: string,
chunkType: string | undefined,
content: string,
seq: number | undefined,
) => {
setStreamingChunks(prev => {
const next = new Map(prev);
const existing: AiStreamChunk[] = next.get(messageId) ?? [];
const s = seq ?? existing.length;
const newChunk: AiStreamChunk = { type: chunkType ?? 'answer', content, seq: s };
const insertIdx = existing.findIndex(c => c.seq != null && c.seq > s);
next.set(messageId,
insertIdx === -1
? [...existing, newChunk]
: [...existing.slice(0, insertIdx), newChunk, ...existing.slice(insertIdx)]
);
chunksRef.current = new Map(next);
return next;
});
}, []);
const getOrderedChunks = useCallback((msgId: string): AiStreamChunk[] => {
return chunksRef.current.get(msgId) ?? [];
}, []);
const cancelAiStream = useCallback(async () => {
const client = clientRef.current;
if (!client) return false;
const roomId = client.getSubscribedRooms().values().next().value;
if (!roomId) return false;
return client.cancelAiStream(roomId as string);
}, [clientRef]);
return {
streamingChunks,
activeAiStream,
setActiveAiStream,
clearStreamingState,
insertChunk,
getOrderedChunks,
cancelAiStream,
chunksRef,
};
}

View File

@ -0,0 +1,118 @@
import { useCallback, useState } from 'react';
import type { RoomWsClient } from '@/lib/room-ws-client';
import type { MessageWithMeta } from '@/contexts/room-context';
const MAX_MESSAGES_IN_MEMORY = 1000;
/**
* Hook managing room messages state: list, send, edit, revoke.
* Separated to reduce the main room context (~1583 lines).
*/
export function useRoomMessages(clientRef: React.MutableRefObject<RoomWsClient | null>) {
const [messages, setMessages] = useState<MessageWithMeta[]>([]);
const [messagesLoading, setMessagesLoading] = useState(false);
const [isHistoryLoaded, setIsHistoryLoaded] = useState(false);
const [isLoadingMore, setIsLoadingMore] = useState(false);
const [isTransitioningRoom, setIsTransitioningRoom] = useState(false);
const [nextCursor, setNextCursor] = useState<number | null>(null);
const appendMessage = useCallback((msg: MessageWithMeta) => {
setMessages(prev => {
const exists = prev.some(m => m.id === msg.id);
if (exists) return prev.map(m => m.id === msg.id ? { ...m, ...msg } : m);
const next = [...prev, msg];
return next.length > MAX_MESSAGES_IN_MEMORY ? next.slice(-MAX_MESSAGES_IN_MEMORY) : next;
});
}, []);
const updateMessage = useCallback((msgId: string, updater: (m: MessageWithMeta) => MessageWithMeta) => {
setMessages(prev => prev.map(m => m.id === msgId ? updater(m) : m));
}, []);
const removeMessage = useCallback((msgId: string) => {
setMessages(prev => prev.filter(m => m.id !== msgId));
}, []);
const clearMessages = useCallback(() => {
setMessages([]);
setIsHistoryLoaded(false);
setNextCursor(null);
}, []);
const sendMessage = useCallback(async (
content: string,
contentType?: string,
inReplyTo?: string,
attachmentIds?: string[],
) => {
const client = clientRef.current;
if (!client) return;
const roomId = client.getSubscribedRooms().values().next().value;
if (!roomId) return;
const optimistic: MessageWithMeta = {
id: crypto.randomUUID(),
seq: 0,
room: roomId as string,
sender_type: 'member',
content,
content_type: contentType || 'text',
send_at: new Date().toISOString(),
display_content: content,
isOptimistic: true,
attachment_ids: attachmentIds,
};
appendMessage(optimistic);
try {
const result = await client.messageCreate(roomId as string, content, {
contentType,
inReplyTo,
attachmentIds,
});
setMessages(prev => prev.map(m => m.id === optimistic.id ? {
...result,
id: result.id,
seq: result.seq,
isOptimistic: false,
} : m));
} catch (err) {
setMessages(prev => prev.map(m => m.id === optimistic.id ? {
...m, isOptimistic: false, isOptimisticError: true,
} : m));
throw err;
}
}, [clientRef, appendMessage]);
const editMessage = useCallback(async (messageId: string, content: string) => {
const client = clientRef.current;
if (!client) return;
await client.messageUpdate(messageId, content);
setMessages(prev => prev.map(m => m.id === messageId ? { ...m, content, display_content: content } : m));
}, [clientRef]);
const revokeMessage = useCallback(async (messageId: string) => {
const client = clientRef.current;
if (!client) return;
let rollback: MessageWithMeta | null = null;
setMessages(prev => {
rollback = prev.find(m => m.id === messageId) ?? null;
return prev.filter(m => m.id !== messageId);
});
try {
await client.messageRevoke(messageId);
} catch {
if (rollback) setMessages(prev => [...prev, rollback!]);
}
}, [clientRef]);
return {
messages, setMessages, appendMessage, updateMessage, removeMessage, clearMessages,
messagesLoading, setMessagesLoading,
isHistoryLoaded, setIsHistoryLoaded,
isLoadingMore, setIsLoadingMore,
isTransitioningRoom, setIsTransitioningRoom,
nextCursor, setNextCursor,
sendMessage, editMessage, revokeMessage,
};
}

View File

@ -453,6 +453,7 @@ export class RoomWsClient {
case 'ai.list': return { path: '/rooms/{room_id}/ai', method: 'GET', pathParams: ['room_id'] };
case 'ai.upsert': return { path: '/rooms/{room_id}/ai', method: 'PUT', pathParams: ['room_id'] };
case 'ai.delete': return { path: '/rooms/{room_id}/ai/{model_id}', method: 'DELETE', pathParams: ['room_id', 'model_id'] };
case 'ai.stop': return { path: '/rooms/{room_id}/ai/stop', method: 'POST', pathParams: ['room_id'] };
case 'notification.list': return { path: '/me/notifications', method: 'GET', pathParams: [] };
case 'notification.mark_read': return { path: '/me/notifications/{notification_id}/read', method: 'POST', pathParams: ['notification_id'] };
case 'notification.mark_all_read': return { path: '/me/notifications/read-all', method: 'POST', pathParams: [] };
@ -1203,6 +1204,20 @@ export class RoomWsClient {
}
}
/** Cancel an active AI streaming session for a room. */
async cancelAiStream(roomId: string): Promise<boolean> {
if (this.status !== 'open' || !this.ws) {
return false;
}
try {
const data = await this.requestWs<boolean>('ai.stop', { room_id: roomId });
return data === true;
} catch (err) {
console.warn('[RoomWs] cancelAiStream failed:', roomId, err);
return false;
}
}
private scheduleReconnect(): void {
if (!this.shouldReconnect) return;

View File

@ -40,6 +40,7 @@ export type WsAction =
| 'ai.list'
| 'ai.upsert'
| 'ai.delete'
| 'ai.stop'
| 'notification.list'
| 'notification.mark_read'
| 'notification.mark_all_read'