gitdataai/src/lib/room-ws-client.ts
ZhenYi b70d91866c fix(room-ws): try reconnect with existing token before requesting new
On auto-reconnect (scheduleReconnect), attempt connection with the stored
wsToken first. If the WS closes immediately (server rejected the token),
fall back to fetching a fresh token and retrying. Only requests a new
token when the existing one fails or when connect() is called manually
with forceNewToken=true.
2026-04-17 21:40:07 +08:00

975 lines
34 KiB
TypeScript

import { v7 as uuidv7 } from 'uuid';
import type {
WsRequest,
WsResponse,
WsEvent,
WsInMessage,
WsAction,
WsRequestParams,
WsResponseData,
RoomMessagePayload,
AiStreamChunkPayload,
ProjectEventPayload,
RoomResponse,
RoomCategoryResponse,
RoomMessageResponse,
RoomMessageListResponse,
RoomMemberResponse,
PinResponseData,
RoomThreadResponse,
ReactionListData,
ReactionItem,
SearchResultData,
EditHistoryEntry,
AiConfigData,
NotificationListData,
MentionListData,
MessageEditHistoryResponse,
SubscribeData,
UserInfo,
RoomReactionUpdatedPayload,
} from './ws-protocol';
export type {
RoomMessagePayload,
AiStreamChunkPayload,
ProjectEventPayload,
RoomResponse,
RoomCategoryResponse,
RoomMessageResponse,
RoomMessageListResponse,
RoomMemberResponse,
PinResponseData,
RoomThreadResponse,
ReactionListData,
ReactionItem,
SearchResultData,
EditHistoryEntry,
AiConfigData,
NotificationListData,
MentionListData,
MessageEditHistoryResponse,
UserInfo,
RoomReactionUpdatedPayload,
};
/**
 * Shape of a WebSocket token grant.
 *
 * NOTE(review): this interface is not referenced by the visible code.
 * `RoomWsClient.connect` reads the token from `data.token` of the JSON body
 * returned by `POST {baseUrl}/api/ws/token`; presumably this interface
 * describes that `data` object — confirm against the backend.
 */
export interface WsTokenResponse {
/** The token string. */
token: string;
/** Presumably the token lifetime in seconds (inferred from the name) — confirm. */
expires_in_seconds: number;
}
/**
 * Connection status reported by `RoomWsClient.getStatus()` and `onStatusChange`.
 *
 * - `idle`: initial value before `connect()` is called.
 * - `connecting`: set at the start of `connect()`.
 * - `open`: set when the socket's `open` event fires.
 * - `closing`: set by `disconnect()` just before closing an existing socket.
 * - `closed`: set by `disconnect()` and by the socket's `close` event.
 * - `error`: set on token fetch failure, connection timeout, or socket `error` event.
 */
export type RoomWsStatus = 'idle' | 'connecting' | 'open' | 'closing' | 'closed' | 'error';
/**
 * Bookkeeping for one in-flight WebSocket request, stored in
 * `RoomWsClient.pendingRequests` under its `request_id`.
 */
interface PendingRequest<T = WsResponseData> {
/** Settles the caller's promise with the response data. */
resolve: (value: T) => void;
/** Rejects the caller's promise (server error, timeout, or socket close). */
reject: (error: Error) => void;
/** Timer that rejects the request when no response arrives in time. */
timeout: ReturnType<typeof setTimeout>;
}
/**
 * Optional listeners a consumer passes to `RoomWsClient`.
 * Every member is optional; absent listeners are simply skipped.
 */
export interface RoomWsCallbacks {
/** Called for push events named `room.message` / `room_message`. */
onRoomMessage?: (payload: RoomMessagePayload) => void;
/** Called for project events whose `event_type` is not one of the four `message_*` types handled below. */
onProjectEvent?: (payload: ProjectEventPayload) => void;
/** Called for push events named `ai.stream_chunk` / `ai_stream_chunk`. */
onAiStreamChunk?: (chunk: AiStreamChunkPayload) => void;
/** Called for push events named `room.reaction_updated` / `room_reaction_updated`. */
onRoomReactionUpdated?: (payload: import('./ws-protocol').RoomReactionUpdatedPayload) => void;
/** Called for project events with `event_type === 'message_edited'`. */
onMessageEdited?: (payload: import('./ws-protocol').MessageEditedPayload) => void;
/** Called for project events with `event_type === 'message_revoked'`. */
onMessageRevoked?: (payload: import('./ws-protocol').MessageRevokedPayload) => void;
/** Called for project events with `event_type === 'message_pinned'`. */
onMessagePinned?: (payload: import('./ws-protocol').MessagePinnedPayload) => void;
/** Called for project events with `event_type === 'message_unpinned'`. */
onMessageUnpinned?: (payload: import('./ws-protocol').MessageUnpinnedPayload) => void;
/** Called every time the client assigns a connection status. */
onStatusChange?: (status: RoomWsStatus) => void;
/** Called when the token fetch fails or the socket raises an `error` event. */
onError?: (error: Error) => void;
}
/**
 * Client for the room service.
 *
 * API calls are sent over the WebSocket while it is open and fall back to the
 * matching REST endpoint (see `actionToEndpoint`) otherwise. Subscriptions are
 * WebSocket-only; they are remembered and replayed every time the socket opens.
 * Unexpected closes trigger reconnection with exponential backoff and jitter
 * until `disconnect()` is called.
 */
export class RoomWsClient {
  /** The socket currently owned by this client, or null when there is none. */
  private ws: WebSocket | null = null;
  private status: RoomWsStatus = 'idle';
  private baseUrl: string;
  private callbacks: RoomWsCallbacks;
  /** In-flight WebSocket requests keyed by `request_id`. */
  private pendingRequests = new Map<string, PendingRequest>();
  /** Number of reconnects scheduled since the last successful open. */
  private reconnectAttempt = 0;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** False after `disconnect()`; true again after `connect()`. */
  private shouldReconnect = true;
  private subscribedRooms = new Set<string>();
  private subscribedProjects = new Set<string>();
  private readonly reconnectBaseDelay: number;
  private readonly reconnectMaxDelay: number;
  private readonly requestTimeout: number;
  private wsToken: string | null = null;

  /**
   * @param baseUrl   HTTP(S) origin of the backend, e.g. `https://host`.
   * @param callbacks Optional event listeners.
   * @param options   `reconnectBaseDelay` (ms, default 1000), `reconnectMaxDelay`
   *                  (ms, default 15000), `requestTimeout` (ms, default 30000) and
   *                  an optional pre-obtained `wsToken`.
   */
  constructor(
    baseUrl: string,
    callbacks: RoomWsCallbacks = {},
    options: {
      reconnectBaseDelay?: number;
      reconnectMaxDelay?: number;
      requestTimeout?: number;
      wsToken?: string;
    } = {},
  ) {
    this.baseUrl = baseUrl;
    this.callbacks = callbacks;
    this.reconnectBaseDelay = options.reconnectBaseDelay ?? 1000;
    this.reconnectMaxDelay = options.reconnectMaxDelay ?? 15000;
    this.requestTimeout = options.requestTimeout ?? 30_000;
    this.wsToken = options.wsToken ?? null;
  }

  /** Replaces the stored WebSocket token (null clears it). */
  setWsToken(token: string | null): void {
    this.wsToken = token;
  }

  /** Returns the stored WebSocket token, if any. */
  getWsToken(): string | null {
    return this.wsToken;
  }

  /** Returns the current connection status. */
  getStatus(): RoomWsStatus {
    return this.status;
  }

  /** Returns a copy of the set of room ids that should be subscribed. */
  getSubscribedRooms(): Set<string> {
    return new Set(this.subscribedRooms);
  }

  /** Returns a copy of the set of project names that should be subscribed. */
  getSubscribedProjects(): Set<string> {
    return new Set(this.subscribedProjects);
  }

  /**
   * Opens the WebSocket.
   *
   * A stored token is reused unless `forceNewToken` is true or no token is
   * stored, in which case one is fetched from `POST {baseUrl}/api/ws/token`.
   * If a connection made with a reused token closes before it ever opened, the
   * token is dropped so the next (auto-)reconnect fetches a fresh one.
   *
   * @param forceNewToken Always fetch a new token before connecting.
   * @returns Resolves once the socket is open (immediately if already open).
   * @throws Error when the token fetch fails, `disconnect()` is called while the
   *         token is being fetched, the socket errors or closes before opening,
   *         or it does not open within 10 seconds.
   */
  async connect(forceNewToken = false): Promise<void> {
    if (this.ws && this.status === 'open') {
      return;
    }
    // This attempt supersedes any auto-reconnect that is still waiting to fire.
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    this.shouldReconnect = true;
    this.setStatus('connecting');

    const reusingToken = !forceNewToken && Boolean(this.wsToken);
    if (!reusingToken) {
      try {
        await this.fetchWsToken();
      } catch (err) {
        console.error('[RoomWs] Failed to fetch WS token:', err);
        this.setStatus('error');
        this.callbacks.onError?.(err instanceof Error ? err : new Error(String(err)));
        throw err;
      }
      // State may have changed while the fetch was in flight.
      if (!this.shouldReconnect) {
        throw new Error('Connection cancelled by disconnect()');
      }
      if (this.ws && this.status === 'open') {
        return; // a concurrent connect() already succeeded
      }
    }

    const tokenUsed = this.wsToken;
    const wsUrl = this.buildWsUrl();
    // The query string carries the token; keep it out of the logs.
    const safeUrl = wsUrl.replace(/\?.*$/, '');
    console.debug('[RoomWs] Connecting to:', safeUrl);

    // A previous socket that is not open (still connecting, or errored) is
    // replaced; its handlers see that it is no longer current and do nothing.
    const superseded = this.ws;
    if (superseded) {
      this.ws = null;
      superseded.close();
      this.rejectAllPending('WebSocket closed: superseded by a new connection');
    }

    const socket = new WebSocket(wsUrl);
    this.ws = socket;

    // Standard implementations start in CONNECTING; this only guards against
    // implementations that report a closed socket synchronously.
    if (socket.readyState === WebSocket.CLOSED || socket.readyState === WebSocket.CLOSING) {
      console.warn('[RoomWs] WebSocket closed immediately');
      this.ws = null;
      if (reusingToken) {
        console.debug('[RoomWs] Existing token rejected — fetching new token and retrying');
        this.wsToken = null;
        return this.connect(true);
      }
      this.setStatus('error');
      throw new Error('WebSocket closed immediately');
    }

    return new Promise<void>((resolve, reject) => {
      let opened = false;
      const isCurrent = (): boolean => this.ws === socket;

      // Safety timeout: if not open within 10s, give up.
      const timeoutId = setTimeout(() => {
        if (opened) return;
        if (isCurrent()) {
          console.error(`[RoomWs] Connection timeout after 10s — closing`);
          socket.close();
          this.setStatus('error');
        }
        reject(new Error('Connection timeout'));
      }, 10_000);

      socket.onopen = () => {
        clearTimeout(timeoutId);
        if (!isCurrent()) {
          socket.close();
          return;
        }
        opened = true;
        console.debug('[RoomWs] Connected');
        this.reconnectAttempt = 0;
        this.setStatus('open');
        this.resubscribeAll().catch(() => {});
        resolve();
      };

      socket.onmessage = (ev: MessageEvent) => {
        if (!isCurrent()) return;
        let message: WsInMessage;
        try {
          message = JSON.parse(ev.data);
        } catch (e) {
          console.warn('[RoomWs] parse error:', e);
          return;
        }
        try {
          this.handleMessage(message);
        } catch (e) {
          // A throwing listener must not break the socket's message pump.
          console.warn('[RoomWs] message handler error:', e);
        }
      };

      socket.onclose = (ev: CloseEvent) => {
        clearTimeout(timeoutId);
        console.debug(`[RoomWs] onclose code=${ev.code} reason=${ev.reason || 'none'} wasClean=${ev.wasClean}`);
        if (!opened) {
          // Settle the connect() promise; a no-op if onerror already rejected it.
          reject(new Error(`WebSocket closed before opening (code=${ev.code})`));
        }
        if (!isCurrent()) return;
        this.ws = null;
        if (!opened && reusingToken && this.wsToken === tokenUsed) {
          // The browser does not reveal why the handshake failed, so treat a
          // reused token as suspect and fetch a new one on the next attempt.
          console.debug('[RoomWs] Existing token rejected — will fetch a new token on reconnect');
          this.wsToken = null;
        }
        this.setStatus('closed');
        this.rejectAllPending(`WebSocket closed: ${ev.reason || 'unknown'}`);
        if (this.shouldReconnect) {
          this.scheduleReconnect();
        }
      };

      socket.onerror = () => {
        clearTimeout(timeoutId);
        const rs = socket.readyState;
        const error = new Error(`WebSocket error (readyState=${rs})`);
        if (isCurrent()) {
          console.error(`[RoomWs] onerror readyState=${rs} url=${safeUrl}`);
          this.setStatus('error');
          this.callbacks.onError?.(error);
        }
        reject(error);
      };
    });
  }

  /**
   * Closes the socket, cancels any scheduled reconnect, rejects in-flight
   * WebSocket requests and disables auto-reconnect until `connect()` is called.
   */
  disconnect(): void {
    this.shouldReconnect = false;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    const socket = this.ws;
    if (socket) {
      this.setStatus('closing');
      // Detach first so the socket's own close handler treats it as stale.
      this.ws = null;
      socket.close();
    }
    this.rejectAllPending('WebSocket closed: client disconnect');
    this.setStatus('closed');
  }

  /** Fetches a token from `POST {baseUrl}/api/ws/token` and stores it; throws on failure. */
  private async fetchWsToken(): Promise<void> {
    const tokenResp = await fetch(`${this.baseUrl}/api/ws/token`, {
      method: 'POST',
      credentials: 'include',
    });
    if (!tokenResp.ok) {
      const text = await tokenResp.text().catch(() => '');
      console.error(`[RoomWs] Token fetch failed: ${tokenResp.status} ${tokenResp.statusText} ${text}`);
      throw new Error(`Token fetch failed: ${tokenResp.status}`);
    }
    const tokenData = await tokenResp.json();
    const token: unknown = tokenData?.data?.token;
    this.wsToken = typeof token === 'string' && token !== '' ? token : null;
    if (!this.wsToken) {
      console.error('[RoomWs] Token is empty — not logged in?');
      throw new Error('No WS token received');
    }
  }

  /** Rejects every in-flight WebSocket request with `message` and clears their timers. */
  private rejectAllPending(message: string): void {
    const pending = [...this.pendingRequests.values()];
    this.pendingRequests.clear();
    for (const req of pending) {
      clearTimeout(req.timeout);
      req.reject(new Error(message));
    }
  }

  /** Sends `action` over the WebSocket when it is open, otherwise over HTTP. */
  private async request<T = WsResponseData>(action: WsAction, params?: WsRequestParams): Promise<T> {
    if (this.ws && this.status === 'open') {
      return await this.requestWs<T>(action, params);
    }
    return this.requestHttp<T>(action, params);
  }

  /**
   * Sends a request frame and resolves with the matching response's data.
   * Rejects on server error, after `requestTimeout` ms, when the socket closes,
   * or when the frame cannot be sent.
   */
  private requestWs<T = WsResponseData>(
    action: WsAction,
    params?: WsRequestParams,
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const socket = this.ws;
      if (!socket) {
        reject(new Error(`WebSocket is not connected: ${action}`));
        return;
      }
      const requestId = uuidv7();
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(requestId);
        reject(new Error(`Request timeout: ${action}`));
      }, this.requestTimeout);
      this.pendingRequests.set(requestId, { resolve: resolve as (value: unknown) => void, reject, timeout });
      const request: WsRequest = {
        type: 'request',
        request_id: requestId,
        action,
        params,
      };
      try {
        socket.send(JSON.stringify(request));
      } catch (err) {
        // Do not leave a dead entry and a live timer behind when send() throws.
        clearTimeout(timeout);
        this.pendingRequests.delete(requestId);
        reject(err instanceof Error ? err : new Error(String(err)));
      }
    });
  }

  /**
   * Performs `action` against its REST endpoint and returns the `data` field of
   * the JSON response. Path parameters are taken from `params` and URL-encoded;
   * remaining parameters go to the query string (GET) or JSON body
   * (POST/PUT/PATCH).
   *
   * @throws Error `HTTP <status>: <body>` for non-2xx responses.
   */
  private async requestHttp<T>(action: WsAction, params?: WsRequestParams): Promise<T> {
    const endpoint = this.actionToEndpoint(action);
    let path = endpoint.path;
    for (const paramName of endpoint.pathParams) {
      const value = params?.[paramName as keyof WsRequestParams];
      if (value !== undefined && value !== null) {
        // Encoding keeps '/', '?', '#' in a value from altering the route.
        path = path.replace(`{${paramName}}`, encodeURIComponent(String(value)));
      } else {
        // The placeholder is left in the URL as before; make the gap visible.
        console.warn(`[RoomWs] Missing path parameter "${paramName}" for ${action}`);
      }
    }

    const method = endpoint.method;
    const headers: Record<string, string> = {
      'Content-Type': 'application/json',
    };
    if (this.wsToken) {
      headers['Authorization'] = `Bearer ${this.wsToken}`;
    }

    // Parameters that were not consumed by the path.
    const extraParams: Array<[string, unknown]> = [];
    if (params) {
      for (const [key, value] of Object.entries(params)) {
        if (value !== undefined && value !== null && !endpoint.pathParams.includes(key)) {
          extraParams.push([key, value]);
        }
      }
    }

    const fullUrl = new URL(`${this.baseUrl}/api${path}`);
    if (method === 'GET' && params) {
      for (const [key, value] of extraParams) {
        fullUrl.searchParams.append(key, String(value));
      }
    }

    const options: RequestInit = {
      method,
      headers,
      credentials: 'include',
    };
    if ((method === 'POST' || method === 'PUT' || method === 'PATCH') && params) {
      const bodyParams: Record<string, unknown> = {};
      for (const [key, value] of extraParams) {
        bodyParams[key] = value;
      }
      options.body = JSON.stringify(bodyParams);
    }

    const response = await fetch(fullUrl.toString(), options);
    if (!response.ok) {
      const errorText = await response.text();
      throw new Error(`HTTP ${response.status}: ${errorText}`);
    }
    const json = await response.json();
    return json.data as T;
  }

  /**
   * Maps a protocol action to its REST route (relative to `{baseUrl}/api`),
   * HTTP method and the names of the parameters embedded in the path.
   *
   * @throws Error for an action with no mapping.
   */
  private actionToEndpoint(action: WsAction): { path: string; method: string; pathParams: string[] } {
    switch (action) {
      case 'room.list': return { path: '/project_room/{project_name}/rooms', method: 'GET', pathParams: ['project_name'] };
      case 'room.get': return { path: '/rooms/{room_id}', method: 'GET', pathParams: ['room_id'] };
      case 'room.create': return { path: '/project_room/{project_name}/rooms', method: 'POST', pathParams: ['project_name'] };
      case 'room.update': return { path: '/rooms/{room_id}', method: 'PATCH', pathParams: ['room_id'] };
      case 'room.delete': return { path: '/rooms/{room_id}', method: 'DELETE', pathParams: ['room_id'] };
      case 'category.list': return { path: '/project_room/{project_name}/room-categories', method: 'GET', pathParams: ['project_name'] };
      case 'category.create': return { path: '/project_room/{project_name}/room-categories', method: 'POST', pathParams: ['project_name'] };
      case 'category.update': return { path: '/room-categories/{category_id}', method: 'PATCH', pathParams: ['category_id'] };
      case 'category.delete': return { path: '/room-categories/{category_id}', method: 'DELETE', pathParams: ['category_id'] };
      case 'message.list': return { path: '/rooms/{room_id}/messages', method: 'GET', pathParams: ['room_id'] };
      case 'message.create': return { path: '/rooms/{room_id}/messages', method: 'POST', pathParams: ['room_id'] };
      case 'message.update': return { path: '/rooms/{room_id}/messages/{message_id}', method: 'PATCH', pathParams: ['room_id', 'message_id'] };
      case 'message.revoke': return { path: '/rooms/{room_id}/messages/{message_id}/revoke', method: 'POST', pathParams: ['room_id', 'message_id'] };
      case 'message.get': return { path: '/rooms/{room_id}/messages/{message_id}', method: 'GET', pathParams: ['room_id', 'message_id'] };
      case 'message.edit_history': return { path: '/rooms/{room_id}/messages/{message_id}/edit-history', method: 'GET', pathParams: ['room_id', 'message_id'] };
      case 'message.search': return { path: '/rooms/{room_id}/messages/search', method: 'GET', pathParams: ['room_id'] };
      case 'member.list': return { path: '/rooms/{room_id}/members', method: 'GET', pathParams: ['room_id'] };
      case 'member.add': return { path: '/rooms/{room_id}/members', method: 'POST', pathParams: ['room_id'] };
      case 'member.remove': return { path: '/rooms/{room_id}/members/{user_id}', method: 'DELETE', pathParams: ['room_id', 'user_id'] };
      case 'member.leave': return { path: '/rooms/{room_id}/members/me', method: 'DELETE', pathParams: ['room_id'] };
      case 'member.set_read_seq': return { path: '/rooms/{room_id}/members/me/read-seq', method: 'PATCH', pathParams: ['room_id'] };
      case 'member.update_role': return { path: '/rooms/{room_id}/members/{user_id}/role', method: 'PATCH', pathParams: ['room_id', 'user_id'] };
      case 'pin.list': return { path: '/rooms/{room_id}/pins', method: 'GET', pathParams: ['room_id'] };
      case 'pin.add': return { path: '/rooms/{room_id}/messages/{message_id}/pin', method: 'POST', pathParams: ['room_id', 'message_id'] };
      case 'pin.remove': return { path: '/rooms/{room_id}/messages/{message_id}/pin', method: 'DELETE', pathParams: ['room_id', 'message_id'] };
      case 'thread.list': return { path: '/rooms/{room_id}/threads', method: 'GET', pathParams: ['room_id'] };
      case 'thread.create': return { path: '/rooms/{room_id}/threads', method: 'POST', pathParams: ['room_id'] };
      case 'thread.messages': return { path: '/rooms/{room_id}/threads/{thread_id}/messages', method: 'GET', pathParams: ['room_id', 'thread_id'] };
      case 'reaction.add': return { path: '/rooms/{room_id}/messages/{message_id}/reactions', method: 'POST', pathParams: ['room_id', 'message_id'] };
      case 'reaction.remove': return { path: '/rooms/{room_id}/messages/{message_id}/reactions/{emoji}', method: 'DELETE', pathParams: ['room_id', 'message_id', 'emoji'] };
      case 'reaction.get': return { path: '/rooms/{room_id}/messages/{message_id}/reactions', method: 'GET', pathParams: ['room_id', 'message_id'] };
      case 'reaction.list_batch': return { path: '/rooms/{room_id}/messages/reactions/batch', method: 'GET', pathParams: ['room_id'] };
      case 'ai.list': return { path: '/rooms/{room_id}/ai', method: 'GET', pathParams: ['room_id'] };
      case 'ai.upsert': return { path: '/rooms/{room_id}/ai', method: 'PUT', pathParams: ['room_id'] };
      case 'ai.delete': return { path: '/rooms/{room_id}/ai/{model_id}', method: 'DELETE', pathParams: ['room_id', 'model_id'] };
      case 'notification.list': return { path: '/me/notifications', method: 'GET', pathParams: [] };
      case 'notification.mark_read': return { path: '/me/notifications/{notification_id}/read', method: 'POST', pathParams: ['notification_id'] };
      case 'notification.mark_all_read': return { path: '/me/notifications/read-all', method: 'POST', pathParams: [] };
      case 'notification.archive': return { path: '/me/notifications/{notification_id}/archive', method: 'POST', pathParams: ['notification_id'] };
      case 'mention.list': return { path: '/me/mentions', method: 'GET', pathParams: [] };
      case 'mention.read_all': return { path: '/me/mentions/read-all', method: 'POST', pathParams: [] };
      case 'room.subscribe': return { path: '/ws', method: 'POST', pathParams: [] };
      case 'room.unsubscribe': return { path: '/ws', method: 'POST', pathParams: [] };
      case 'project.subscribe': return { path: '/ws', method: 'POST', pathParams: [] };
      case 'project.unsubscribe': return { path: '/ws', method: 'POST', pathParams: [] };
      default: throw new Error(`Unknown action: ${action}`);
    }
  }

  /** Lists the rooms of a project; returns `[]` when the response is not an array. */
  async roomList(projectName: string, onlyPublic?: boolean): Promise<RoomResponse[]> {
    const data = await this.request<RoomResponse[]>('room.list', {
      project_name: projectName,
      only_public: onlyPublic,
    });
    return Array.isArray(data) ? data : [];
  }

  /** Fetches one room; returns null when the response carries no data. */
  async roomGet(roomId: string): Promise<RoomResponse | null> {
    const data = await this.request<RoomResponse>('room.get', { room_id: roomId });
    return data || null;
  }

  /** Creates a room in a project. */
  async roomCreate(
    projectName: string,
    roomName: string,
    isPublic: boolean,
    categoryId?: string,
  ): Promise<RoomResponse> {
    return this.request<RoomResponse>('room.create', {
      project_name: projectName,
      room_name: roomName,
      room_public: isPublic,
      room_category: categoryId,
    });
  }

  /** Updates a room; only the fields present in `updates` are sent with a value. */
  async roomUpdate(
    roomId: string,
    updates: {
      roomName?: string;
      isPublic?: boolean;
      categoryId?: string;
    },
  ): Promise<RoomResponse> {
    return this.request<RoomResponse>('room.update', {
      room_id: roomId,
      room_name: updates.roomName,
      room_public: updates.isPublic,
      room_category: updates.categoryId,
    });
  }

  /** Deletes a room; true only when the response data is exactly `true`. */
  async roomDelete(roomId: string): Promise<boolean> {
    const data = await this.request<boolean>('room.delete', { room_id: roomId });
    return data === true;
  }

  /** Lists the room categories of a project; `[]` when the response is not an array. */
  async categoryList(projectName: string): Promise<RoomCategoryResponse[]> {
    const data = await this.request<RoomCategoryResponse[]>('category.list', {
      project_name: projectName,
    });
    return Array.isArray(data) ? data : [];
  }

  /** Creates a room category in a project. */
  async categoryCreate(projectName: string, name: string, position?: number): Promise<RoomCategoryResponse> {
    return this.request<RoomCategoryResponse>('category.create', {
      project_name: projectName,
      name,
      position,
    });
  }

  /** Updates a room category. */
  async categoryUpdate(
    categoryId: string,
    updates: { name?: string; position?: number },
  ): Promise<RoomCategoryResponse> {
    return this.request<RoomCategoryResponse>('category.update', {
      category_id: categoryId,
      name: updates.name,
      position: updates.position,
    });
  }

  /** Deletes a room category; true only when the response data is exactly `true`. */
  async categoryDelete(categoryId: string): Promise<boolean> {
    const data = await this.request<boolean>('category.delete', { category_id: categoryId });
    return data === true;
  }

  /** Lists messages of a room, optionally windowed by sequence number and limit. */
  async messageList(
    roomId: string,
    options?: {
      beforeSeq?: number;
      afterSeq?: number;
      limit?: number;
    },
  ): Promise<RoomMessageListResponse> {
    const data = await this.request<RoomMessageListResponse>('message.list', {
      room_id: roomId,
      before_seq: options?.beforeSeq,
      after_seq: options?.afterSeq,
      limit: options?.limit,
    });
    return data || { messages: [], total: 0 };
  }

  /** Posts a message to a room. */
  async messageCreate(
    roomId: string,
    content: string,
    options?: {
      contentType?: string;
      threadId?: string;
      inReplyTo?: string;
    },
  ): Promise<RoomMessageResponse> {
    return this.request<RoomMessageResponse>('message.create', {
      room_id: roomId,
      content,
      content_type: options?.contentType,
      thread_id: options?.threadId,
      in_reply_to: options?.inReplyTo,
    });
  }

  /**
   * Edits a message.
   * NOTE(review): the REST route for this action also needs `room_id`, which is
   * not supplied here, so the HTTP fallback sends an unresolved placeholder.
   */
  async messageUpdate(messageId: string, content: string): Promise<RoomMessageResponse> {
    return this.request<RoomMessageResponse>('message.update', {
      message_id: messageId,
      content,
    });
  }

  /**
   * Revokes a message.
   * NOTE(review): the REST route also needs `room_id`, which is not supplied here.
   */
  async messageRevoke(messageId: string): Promise<RoomMessageResponse> {
    return this.request<RoomMessageResponse>('message.revoke', { message_id: messageId });
  }

  /**
   * Fetches one message; returns null when the response carries no data.
   * NOTE(review): the REST route also needs `room_id`, which is not supplied here.
   */
  async messageGet(messageId: string): Promise<RoomMessageResponse | null> {
    const data = await this.request<RoomMessageResponse>('message.get', { message_id: messageId });
    return data || null;
  }

  /**
   * Fetches the edit history of a message; an empty history when no data is returned.
   * NOTE(review): the REST route also needs `room_id`, which is not supplied here.
   */
  async messageEditHistory(messageId: string): Promise<MessageEditHistoryResponse> {
    const data = await this.request<MessageEditHistoryResponse>('message.edit_history', {
      message_id: messageId,
    });
    return data || { message_id: messageId, history: [], total_edits: 0 };
  }

  /** Lists the members of a room; `[]` when the response is not an array. */
  async memberList(roomId: string): Promise<RoomMemberResponse[]> {
    const data = await this.request<RoomMemberResponse[]>('member.list', { room_id: roomId });
    return Array.isArray(data) ? data : [];
  }

  /** Adds a user to a room, optionally with a role. */
  async memberAdd(roomId: string, userId: string, role?: string): Promise<RoomMemberResponse> {
    return this.request<RoomMemberResponse>('member.add', {
      room_id: roomId,
      user_id: userId,
      role,
    });
  }

  /** Removes a user from a room; true only when the response data is exactly `true`. */
  async memberRemove(roomId: string, userId: string): Promise<boolean> {
    const data = await this.request<boolean>('member.remove', {
      room_id: roomId,
      user_id: userId,
    });
    return data === true;
  }

  /** Leaves a room as the current user; true only when the response data is exactly `true`. */
  async memberLeave(roomId: string): Promise<boolean> {
    const data = await this.request<boolean>('member.leave', { room_id: roomId });
    return data === true;
  }

  /** Records the current user's last-read sequence number for a room. */
  async memberSetReadSeq(roomId: string, lastReadSeq: number): Promise<RoomMemberResponse> {
    return this.request<RoomMemberResponse>('member.set_read_seq', {
      room_id: roomId,
      last_read_seq: lastReadSeq,
    });
  }

  /** Changes a member's role in a room. */
  async memberUpdateRole(roomId: string, userId: string, role: string): Promise<RoomMemberResponse> {
    return this.request<RoomMemberResponse>('member.update_role', {
      room_id: roomId,
      user_id: userId,
      role,
    });
  }

  /** Lists the pins of a room; `[]` when the response is not an array. */
  async pinList(roomId: string): Promise<PinResponseData[]> {
    const data = await this.request<PinResponseData[]>('pin.list', { room_id: roomId });
    return Array.isArray(data) ? data : [];
  }

  /** Pins a message in a room. */
  async pinAdd(roomId: string, messageId: string): Promise<PinResponseData> {
    return this.request<PinResponseData>('pin.add', { room_id: roomId, message_id: messageId });
  }

  /** Unpins a message; true only when the response data is exactly `true`. */
  async pinRemove(roomId: string, messageId: string): Promise<boolean> {
    const data = await this.request<boolean>('pin.remove', { room_id: roomId, message_id: messageId });
    return data === true;
  }

  /** Lists the threads of a room; `[]` when the response is not an array. */
  async threadList(roomId: string): Promise<RoomThreadResponse[]> {
    const data = await this.request<RoomThreadResponse[]>('thread.list', { room_id: roomId });
    return Array.isArray(data) ? data : [];
  }

  /** Creates a thread rooted at the message with sequence number `parentSeq`. */
  async threadCreate(roomId: string, parentSeq: number): Promise<RoomThreadResponse> {
    return this.request<RoomThreadResponse>('thread.create', {
      room_id: roomId,
      parent_seq: parentSeq,
    });
  }

  /**
   * Lists the messages of a thread.
   * NOTE(review): the REST route also needs `room_id`, which is not supplied here.
   */
  async threadMessages(
    threadId: string,
    options?: {
      beforeSeq?: number;
      afterSeq?: number;
      limit?: number;
    },
  ): Promise<RoomMessageListResponse> {
    const data = await this.request<RoomMessageListResponse>('thread.messages', {
      thread_id: threadId,
      before_seq: options?.beforeSeq,
      after_seq: options?.afterSeq,
      limit: options?.limit,
    });
    return data || { messages: [], total: 0 };
  }

  /** Adds a reaction to a message. */
  async reactionAdd(roomId: string, messageId: string, emoji: string): Promise<ReactionListData> {
    return this.request<ReactionListData>('reaction.add', {
      room_id: roomId,
      message_id: messageId,
      emoji,
    });
  }

  /** Removes a reaction from a message. */
  async reactionRemove(roomId: string, messageId: string, emoji: string): Promise<ReactionListData> {
    return this.request<ReactionListData>('reaction.remove', {
      room_id: roomId,
      message_id: messageId,
      emoji,
    });
  }

  /** Fetches the reactions of a message; an empty list when no data is returned. */
  async reactionGet(roomId: string, messageId: string): Promise<ReactionListData> {
    const data = await this.request<ReactionListData>('reaction.get', { room_id: roomId, message_id: messageId });
    return data || { message_id: messageId, reactions: [] };
  }

  /** Fetches reactions for several messages; skips the request when `messageIds` is empty. */
  async reactionListBatch(roomId: string, messageIds: string[]): Promise<ReactionListData[]> {
    if (messageIds.length === 0) return [];
    const data = await this.request<ReactionListData[]>('reaction.list_batch', {
      room_id: roomId,
      message_ids: messageIds,
    });
    return Array.isArray(data) ? data : [];
  }

  /** Searches the messages of a room; an empty result when no data is returned. */
  async messageSearch(
    roomId: string,
    query: string,
    options?: {
      limit?: number;
      offset?: number;
    },
  ): Promise<SearchResultData> {
    const data = await this.request<SearchResultData>('message.search', {
      room_id: roomId,
      query,
      limit: options?.limit,
      offset: options?.offset,
    });
    return data || { messages: [], total: 0 };
  }

  /** Lists the AI configurations of a room; `[]` when the response is not an array. */
  async aiList(roomId: string): Promise<AiConfigData[]> {
    const data = await this.request<AiConfigData[]>('ai.list', { room_id: roomId });
    return Array.isArray(data) ? data : [];
  }

  /** Creates or updates the AI configuration of `model` in a room. */
  async aiUpsert(
    roomId: string,
    model: string,
    options?: {
      version?: string;
      historyLimit?: number;
      systemPrompt?: string;
      temperature?: number;
      maxTokens?: number;
      useExact?: boolean;
      think?: boolean;
      stream?: boolean;
      minScore?: number;
    },
  ): Promise<AiConfigData> {
    return this.request<AiConfigData>('ai.upsert', {
      room_id: roomId,
      model,
      model_version: options?.version,
      history_limit: options?.historyLimit,
      system_prompt: options?.systemPrompt,
      temperature: options?.temperature,
      max_tokens: options?.maxTokens,
      use_exact: options?.useExact,
      think: options?.think,
      stream: options?.stream,
      min_score: options?.minScore,
    });
  }

  /** Deletes an AI configuration; true only when the response data is exactly `true`. */
  async aiDelete(roomId: string, modelId: string): Promise<boolean> {
    const data = await this.request<boolean>('ai.delete', {
      room_id: roomId,
      model_id: modelId,
    });
    return data === true;
  }

  /** Lists the current user's notifications; an empty list when no data is returned. */
  async notificationList(options?: {
    onlyUnread?: boolean;
    limit?: number;
  }): Promise<NotificationListData> {
    const data = await this.request<NotificationListData>('notification.list', {
      only_unread: options?.onlyUnread,
      limit: options?.limit,
    });
    return data || { notifications: [], total: 0, unread_count: 0 };
  }

  /** Marks one notification read; true only when the response data is exactly `true`. */
  async notificationMarkRead(notificationId: string): Promise<boolean> {
    const data = await this.request<boolean>('notification.mark_read', {
      notification_id: notificationId,
    });
    return data === true;
  }

  /** Marks all notifications read; returns the numeric response data, or 0 when it is not a number. */
  async notificationMarkAllRead(): Promise<number> {
    const data = await this.request<number>('notification.mark_all_read', {});
    return typeof data === 'number' ? data : 0;
  }

  /** Archives one notification; true only when the response data is exactly `true`. */
  async notificationArchive(notificationId: string): Promise<boolean> {
    const data = await this.request<boolean>('notification.archive', {
      notification_id: notificationId,
    });
    return data === true;
  }

  /** Lists the current user's mentions; an empty list when no data is returned. */
  async mentionList(limit?: number): Promise<MentionListData> {
    const data = await this.request<MentionListData>('mention.list', { limit });
    return data || { mentions: [] };
  }

  /** Marks all mentions read; true only when the response data is exactly `true`. */
  async mentionReadAll(): Promise<boolean> {
    const data = await this.request<boolean>('mention.read_all', {});
    return data === true;
  }

  /**
   * Remembers `roomId` as subscribed and, when the socket is open, subscribes
   * immediately (retrying in the background on failure). When the socket is not
   * open the subscription is sent on the next successful connect.
   */
  async subscribeRoom(roomId: string): Promise<void> {
    this.subscribedRooms.add(roomId);
    // Subscribing is WS-only; never fall back to HTTP.
    if (this.status === 'open' && this.ws) {
      try {
        await this.requestWs<SubscribeData>('room.subscribe', { room_id: roomId });
      } catch (err) {
        console.warn('[RoomWs] subscribeRoom failed, will retry:', roomId, err);
        void this.retrySubscribeRoom(roomId, 1);
      }
    }
  }

  /**
   * Retries a failed room subscription up to three times, waiting 1s, 2s and 4s.
   * Stops early when the room was unsubscribed or the socket is no longer open
   * (the subscription is then replayed on reconnect).
   */
  private async retrySubscribeRoom(roomId: string, attempt: number): Promise<void> {
    if (!this.subscribedRooms.has(roomId)) return; // User unsubscribed
    if (attempt > 3) {
      console.error('[RoomWs] subscribeRoom retry exhausted:', roomId);
      return;
    }
    const delay = 1000 * Math.pow(2, attempt - 1); // 1s, 2s, 4s
    console.debug(`[RoomWs] retrying subscribeRoom in ${delay}ms (attempt ${attempt}/3)`);
    await new Promise(resolve => setTimeout(resolve, delay));
    if (this.status === 'open' && this.ws) {
      try {
        await this.requestWs<SubscribeData>('room.subscribe', { room_id: roomId });
        console.debug('[RoomWs] subscribeRoom retry succeeded:', roomId);
      } catch (err) {
        console.warn('[RoomWs] subscribeRoom retry failed:', roomId, err);
        void this.retrySubscribeRoom(roomId, attempt + 1);
      }
    } else {
      // WS not open, will retry when connection is established via resubscribeAll
      console.debug('[RoomWs] WS not open, will retry on reconnect');
    }
  }

  /** Forgets `roomId` and, when the socket is open, tells the server (errors ignored). */
  async unsubscribeRoom(roomId: string): Promise<void> {
    this.subscribedRooms.delete(roomId);
    if (this.status === 'open' && this.ws) {
      try {
        await this.requestWs<boolean>('room.unsubscribe', { room_id: roomId });
      } catch {
        // Best effort: the local set is already updated.
      }
    }
  }

  /** Remembers `projectName` as subscribed and, when the socket is open, subscribes immediately. */
  async subscribeProject(projectName: string): Promise<void> {
    this.subscribedProjects.add(projectName);
    // Subscribing is WS-only; never fall back to HTTP.
    if (this.status === 'open' && this.ws) {
      try {
        await this.requestWs<SubscribeData>('project.subscribe', { project_name: projectName });
      } catch {
        console.warn('[RoomWs] subscribeProject failed:', projectName);
      }
    }
  }

  /** Forgets `projectName` and, when the socket is open, tells the server (errors ignored). */
  async unsubscribeProject(projectName: string): Promise<void> {
    this.subscribedProjects.delete(projectName);
    if (this.status === 'open' && this.ws) {
      try {
        await this.requestWs<boolean>('project.unsubscribe', { project_name: projectName });
      } catch {
        // Best effort: the local set is already updated.
      }
    }
  }

  /**
   * Builds the socket URL: `baseUrl` with `http`→`ws` (which also turns
   * `https` into `wss`), path `/ws`, and the URL-encoded token as `?token=`.
   */
  private buildWsUrl(): string {
    const wsBase = this.baseUrl.replace(/^http/, 'ws');
    const url = `${wsBase}/ws`;
    return this.wsToken ? `${url}?token=${encodeURIComponent(this.wsToken)}` : url;
  }

  /** Routes one parsed inbound frame: settles a pending request and/or dispatches a push event. */
  private handleMessage(message: WsInMessage): void {
    if (typeof message !== 'object' || message === null) {
      return; // valid JSON but not a frame (e.g. a bare number or string)
    }
    if ('type' in message && message.type === 'error') {
      // NOTE(review): error frames are not matched to a pending request here;
      // confirm whether the protocol can attach a request_id to them.
      console.warn('[RoomWs] server error message:', message);
      return;
    }
    if ('request_id' in message && 'action' in message) {
      const response = message as WsResponse;
      const req = this.pendingRequests.get(response.request_id);
      if (req) {
        clearTimeout(req.timeout);
        this.pendingRequests.delete(response.request_id);
        if (response.error) {
          req.reject(new Error(`${response.error.error}: ${response.error.message}`));
        } else {
          req.resolve(response.data as WsResponseData);
        }
      }
    }
    if ('type' in message && message.type === 'event') {
      this.handlePushEvent(message as WsEvent);
    }
  }

  /** Dispatches a server push event to the matching callback; unknown event names are ignored. */
  private handlePushEvent(event: WsEvent): void {
    if (!event.event) return;
    // Both dotted and underscored event names are accepted.
    switch (event.event) {
      case 'room.message':
      case 'room_message':
        this.callbacks.onRoomMessage?.(event.data as RoomMessagePayload);
        break;
      case 'project.event':
      case 'project_event':
        this.dispatchProjectEvent(event.data as ProjectEventPayload);
        break;
      case 'ai.stream_chunk':
      case 'ai_stream_chunk':
        this.callbacks.onAiStreamChunk?.(event.data as AiStreamChunkPayload);
        break;
      case 'room.reaction_updated':
      case 'room_reaction_updated':
        this.callbacks.onRoomReactionUpdated?.(event.data as RoomReactionUpdatedPayload);
        break;
      default:
        // Unknown event type - ignore silently
        break;
    }
  }

  /**
   * Translates message-related project events into their dedicated callbacks;
   * every other project event goes to `onProjectEvent` unchanged. Fields the
   * event does not carry (`revoked_by`, `pinned_by`) are filled with ''.
   */
  private dispatchProjectEvent(event: ProjectEventPayload): void {
    switch (event.event_type) {
      case 'message_edited':
        this.callbacks.onMessageEdited?.({
          message_id: event.message_id ?? '',
          room_id: event.room_id ?? '',
          edited_at: event.timestamp,
        } as import('./ws-protocol').MessageEditedPayload);
        break;
      case 'message_revoked':
        this.callbacks.onMessageRevoked?.({
          message_id: event.message_id ?? '',
          room_id: event.room_id ?? '',
          revoked_at: event.timestamp,
          revoked_by: '',
        } as import('./ws-protocol').MessageRevokedPayload);
        break;
      case 'message_pinned':
        this.callbacks.onMessagePinned?.({
          message_id: event.message_id ?? '',
          room_id: event.room_id ?? '',
          pinned_by: '',
          pinned_at: event.timestamp,
        } as import('./ws-protocol').MessagePinnedPayload);
        break;
      case 'message_unpinned':
        this.callbacks.onMessageUnpinned?.({
          message_id: event.message_id ?? '',
          room_id: event.room_id ?? '',
        } as import('./ws-protocol').MessageUnpinnedPayload);
        break;
      default:
        // Other project events (member_joined, room_created, etc.)
        this.callbacks.onProjectEvent?.(event);
        break;
    }
  }

  /** Stores the status and notifies `onStatusChange`. */
  private setStatus(status: RoomWsStatus): void {
    this.status = status;
    this.callbacks.onStatusChange?.(status);
  }

  /**
   * Replays every remembered room and project subscription over the socket that
   * just opened. Subscriptions are WS-only, so this stops as soon as that socket
   * is no longer the open one instead of falling back to HTTP.
   */
  private async resubscribeAll(): Promise<void> {
    const socket = this.ws;
    const stillOpen = (): boolean => socket !== null && this.ws === socket && this.status === 'open';

    for (const roomId of [...this.subscribedRooms]) {
      if (!stillOpen()) return;
      if (!this.subscribedRooms.has(roomId)) continue; // unsubscribed meanwhile
      try {
        await this.requestWs('room.subscribe', { room_id: roomId });
      } catch (err) {
        // Non-fatal: logged so patterns (e.g. auth expiry) remain observable.
        console.warn(`[RoomWs] resubscribe room failed (will retry on next reconnect): ${roomId}`, err);
      }
    }
    for (const projectName of [...this.subscribedProjects]) {
      if (!stillOpen()) return;
      if (!this.subscribedProjects.has(projectName)) continue; // unsubscribed meanwhile
      try {
        await this.requestWs('project.subscribe', { project_name: projectName });
      } catch (err) {
        console.warn(`[RoomWs] resubscribe project failed (will retry on next reconnect): ${projectName}`, err);
      }
    }
  }

  /**
   * Schedules one reconnect attempt after a random delay in
   * `[0, min(reconnectBaseDelay * 2^attempt, reconnectMaxDelay))`.
   * No-op when reconnecting is disabled or an attempt is already scheduled.
   */
  private scheduleReconnect(): void {
    if (!this.shouldReconnect) return;
    if (this.reconnectTimer) return; // never stack timers
    // Exponential backoff with full jitter (uniform random within the backoff window).
    // Without jitter, all disconnected clients reconnect at exactly the same time
    // (thundering herd) after a server restart, overwhelming it.
    const baseDelay = this.reconnectBaseDelay * Math.pow(2, this.reconnectAttempt);
    const cappedDelay = Math.min(baseDelay, this.reconnectMaxDelay);
    const delay = Math.floor(Math.random() * cappedDelay);
    this.reconnectAttempt++;
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect().catch(() => {
        // When the attempt failed before a socket existed (e.g. the token fetch
        // failed) no close event will fire, so keep the loop alive here. When a
        // socket exists, its close handler does the scheduling.
        if (this.shouldReconnect && !this.ws) {
          this.scheduleReconnect();
        }
      });
    }, delay);
  }
}
/**
 * Convenience factory: builds a `RoomWsClient` from the same arguments its
 * constructor takes and hands it back without connecting.
 *
 * @param baseUrl   Backend origin passed through to the client.
 * @param callbacks Optional event listeners (defaults to none).
 * @param options   Optional reconnect/timeout tuning and a pre-obtained token.
 * @returns The newly constructed client.
 */
export function createRoomWsClient(
  baseUrl: string,
  callbacks: RoomWsCallbacks = {},
  options?: {
    reconnectBaseDelay?: number;
    reconnectMaxDelay?: number;
    requestTimeout?: number;
    wsToken?: string;
  },
): RoomWsClient {
  const client = new RoomWsClient(baseUrl, callbacks, options);
  return client;
}