gitdataai/src/lib/universal-ws.ts
2026-04-15 09:08:09 +08:00

204 lines
5.7 KiB
TypeScript

import { v7 as uuidv7 } from 'uuid';
import type { WsRequest, WsEvent, WsInMessage, WsAction, WsResponse } from './ws-protocol';
import { buildWsUrlWithToken } from './ws-token';
/**
 * Optional callbacks and reconnect tuning accepted by `UniversalWsClient`.
 * Every member may be omitted.
 */
interface UniversalWsClientOptions {
  /**
   * First reconnect delay in milliseconds; the delay doubles with each
   * consecutive attempt. The client falls back to 1000 when this is unset.
   */
  reconnectBaseDelay?: number;
  /**
   * Upper bound, in milliseconds, applied to the doubled reconnect delay.
   * The client falls back to 15000 when this is unset.
   */
  reconnectMaxDelay?: number;
  /** Called with the new status each time the client updates its status. */
  onStatusChange?: (status: WsStatus) => void;
  /**
   * Called for each inbound message that is not a `response` and carries an
   * `event` property.
   */
  onEvent?: (event: WsEvent) => void;
}
/**
 * Connection states a `UniversalWsClient` can report through `getStatus()`
 * and the `onStatusChange` callback. A client starts out as `'idle'`.
 */
export type WsStatus =
  | 'idle'
  | 'connecting'
  | 'open'
  | 'closing'
  | 'closed'
  | 'error';
/**
 * Bookkeeping for one request that is still waiting for its response:
 * the settle callbacks of the caller's promise plus the timer that fails
 * the request if no response arrives.
 */
interface PendingRequest<T = unknown> {
  /** Timer handle to clear once the request settles by any other path. */
  timeout: ReturnType<typeof setTimeout>;
  /** Fails the caller's promise. */
  reject: (error: Error) => void;
  /** Fulfils the caller's promise with the response payload. */
  resolve: (value: T) => void;
}
/**
 * Reconnecting WebSocket client with request/response correlation and room
 * subscriptions.
 *
 * - `request()` tags each outgoing frame with a fresh id and settles the
 *   returned promise when a `response` frame with the same id arrives, the
 *   30 s timeout elapses, or the connection goes away.
 * - Inbound frames that carry an `event` property are forwarded to
 *   `options.onEvent`.
 * - After an unexpected close the client reconnects with exponential
 *   backoff and re-subscribes to every room still in its subscription set.
 *
 * Handlers of a socket that is no longer the client's current socket are
 * ignored, so a late `close`/`error` from an old connection cannot disturb
 * a newer one.
 */
export class UniversalWsClient {
  private ws: WebSocket | null = null;
  private status: WsStatus = 'idle';
  private baseUrl: string;
  private options: UniversalWsClientOptions;
  /** Requests awaiting a response, keyed by request id. */
  private pendingRequests = new Map<string, PendingRequest>();
  /** Number of reconnects scheduled since the last successful open. */
  private reconnectAttempt = 0;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** Cleared by `disconnect()` so a deliberate close does not trigger a retry. */
  private shouldReconnect = true;
  /** Rooms to (re-)subscribe to whenever a connection opens. */
  private subscribedRooms = new Set<string>();
  private wsToken: string | null = null;
  /** In-flight connection attempt, shared by concurrent `connect()` callers. */
  private connectPromise: Promise<void> | null = null;

  /**
   * @param baseUrl Base URL handed to `buildWsUrlWithToken` together with the `/ws` path.
   * @param options Callbacks, reconnect tuning and an optional `wsToken`.
   */
  constructor(
    baseUrl: string,
    options: UniversalWsClientOptions & { wsToken?: string } = {},
  ) {
    this.baseUrl = baseUrl;
    this.options = options;
    this.wsToken = options.wsToken ?? null;
  }

  /** Returns the most recently recorded connection status. */
  getStatus(): WsStatus {
    return this.status;
  }

  /**
   * Opens the connection and re-enables automatic reconnection.
   *
   * Resolves immediately when already open. While an attempt is in flight,
   * every caller receives the same promise instead of opening another socket.
   *
   * @returns A promise that resolves once the socket is open and rejects if
   *   the socket cannot be created, errors, or closes before opening.
   */
  connect(): Promise<void> {
    if (this.ws && this.status === 'open') {
      return Promise.resolve();
    }
    if (this.connectPromise) {
      return this.connectPromise;
    }

    this.shouldReconnect = true;
    // A manual connect supersedes any scheduled retry.
    this.clearReconnectTimer();
    this.setStatus('connecting');

    let ws: WebSocket;
    try {
      ws = new WebSocket(this.buildWsUrl());
    } catch (err) {
      // The constructor throws synchronously, e.g. for a malformed URL.
      this.setStatus('error');
      return Promise.reject(err instanceof Error ? err : new Error(String(err)));
    }
    this.ws = ws;

    const attempt = new Promise<void>((resolve, reject) => {
      let settled = false;
      // Settles this attempt's promise at most once. The shared in-flight
      // slot is only released while `ws` is still the current socket, so a
      // stale socket can never clear a newer attempt.
      const settle = (error?: Error): void => {
        if (settled) return;
        settled = true;
        if (this.ws === ws) {
          this.connectPromise = null;
        }
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      };

      ws.onopen = () => {
        if (this.ws !== ws) return;
        this.reconnectAttempt = 0;
        this.setStatus('open');
        // Re-subscribe to rooms after (re)connect.
        for (const roomId of this.subscribedRooms) {
          this.request('room.subscribe', { room_id: roomId }).catch(() => {
            // Best effort: the room stays in the set and is retried on the
            // next reconnect.
          });
        }
        settle();
      };

      ws.onmessage = (ev: MessageEvent) => {
        if (this.ws !== ws) return;
        let parsed: unknown;
        try {
          parsed = JSON.parse(ev.data);
        } catch {
          console.warn('[UniversalWs] parse error:', ev.data);
          return;
        }
        if (typeof parsed !== 'object' || parsed === null) {
          // Valid JSON but not a message object (e.g. `null` or a number).
          console.warn('[UniversalWs] parse error:', ev.data);
          return;
        }
        try {
          this.handleMessage(parsed as WsInMessage);
        } catch (err) {
          // Keep a throwing onEvent callback from being reported as a parse
          // error or escaping into the socket's event dispatch.
          console.error('[UniversalWs] message handler error:', err);
        }
      };

      ws.onclose = (ev: CloseEvent) => {
        // Covers a close before open that was not preceded by an error event;
        // a no-op once the attempt has already settled. Must run before
        // `this.ws` is cleared so the in-flight slot is released.
        settle(new Error('WebSocket connection failed'));
        if (this.ws !== ws) return;
        this.ws = null;
        this.setStatus('closed');
        this.rejectAllPending(`WebSocket closed: ${ev.reason || 'unknown'}`);
        if (this.shouldReconnect) {
          this.scheduleReconnect();
        }
      };

      ws.onerror = (ev: Event) => {
        if (this.ws === ws) {
          console.error('[UniversalWs] error:', ev);
          this.setStatus('error');
        }
        settle(new Error('WebSocket connection failed'));
      };
    });

    this.connectPromise = attempt;
    return attempt;
  }

  /**
   * Closes the connection, cancels any scheduled reconnect and rejects every
   * request that is still waiting for a response.
   */
  disconnect(): void {
    this.shouldReconnect = false;
    this.clearReconnectTimer();
    this.connectPromise = null;

    const ws = this.ws;
    if (ws) {
      this.setStatus('closing');
      // Detach first so the socket's remaining events are treated as stale.
      this.ws = null;
      ws.close();
    }
    this.rejectAllPending('WebSocket closed: client disconnected');
    this.setStatus('closed');
  }

  /**
   * Sends a request frame and waits for the matching response.
   *
   * @param action Action name placed in the frame.
   * @param params Optional parameters placed in the frame.
   * @returns A promise for the response's `data`. It rejects when the client
   *   is not connected, the frame cannot be serialised or sent, the response
   *   carries an error, the connection goes away, or 30 s pass with no response.
   */
  request<T = unknown>(action: WsAction, params?: Record<string, unknown>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const ws = this.ws;
      if (!ws || this.status !== 'open') {
        reject(new Error('WebSocket not connected'));
        return;
      }

      const requestId = uuidv7();
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(requestId);
        reject(new Error(`Request timeout: ${action}`));
      }, 30_000);
      this.pendingRequests.set(requestId, {
        resolve: (value: unknown) => resolve(value as T),
        reject,
        timeout,
      });

      const request: WsRequest = {
        type: 'request',
        request_id: requestId,
        action,
        params,
      };
      try {
        ws.send(JSON.stringify(request));
      } catch (err) {
        // Nothing was sent, so drop the bookkeeping now instead of letting
        // the entry and its timer linger until the timeout fires.
        clearTimeout(timeout);
        this.pendingRequests.delete(requestId);
        reject(err instanceof Error ? err : new Error(String(err)));
      }
    });
  }

  /**
   * Remembers `roomId` for re-subscription on every future open and, when
   * currently connected, subscribes right away.
   */
  async subscribe(roomId: string): Promise<void> {
    this.subscribedRooms.add(roomId);
    if (this.status === 'open') {
      await this.request('room.subscribe', { room_id: roomId });
    }
  }

  /**
   * Forgets `roomId` and, when currently connected, asks the server to
   * unsubscribe.
   */
  async unsubscribe(roomId: string): Promise<void> {
    this.subscribedRooms.delete(roomId);
    if (this.status === 'open') {
      await this.request('room.unsubscribe', { room_id: roomId });
    }
  }

  /** Builds the socket URL from the base URL, the `/ws` path and the token. */
  private buildWsUrl(): string {
    return buildWsUrlWithToken(this.baseUrl, '/ws', this.wsToken);
  }

  /**
   * Routes one inbound message: responses settle their pending request,
   * messages with an `event` property go to `options.onEvent`, anything
   * else is dropped.
   */
  private handleMessage(message: WsInMessage): void {
    if ('type' in message && message.type === 'response') {
      const msg = message as WsResponse;
      const req = this.pendingRequests.get(msg.request_id);
      if (!req) {
        // Unknown id, or the request already timed out or was rejected.
        return;
      }
      clearTimeout(req.timeout);
      this.pendingRequests.delete(msg.request_id);
      if (msg.error) {
        req.reject(new Error(UniversalWsClient.describeError(msg.error)));
      } else {
        req.resolve(msg.data);
      }
    } else if ('event' in message) {
      this.options.onEvent?.(message as WsEvent);
    }
  }

  /**
   * Turns a response's error payload into a readable message: a string
   * payload as-is, otherwise its non-empty string `error` field, otherwise
   * its JSON form (rather than `[object Object]`).
   */
  private static describeError(payload: unknown): string {
    if (typeof payload === 'string') {
      return payload;
    }
    if (typeof payload === 'object' && payload !== null) {
      const inner = (payload as { error?: unknown }).error;
      if (typeof inner === 'string' && inner !== '') {
        return inner;
      }
      try {
        return JSON.stringify(payload);
      } catch {
        return String(payload);
      }
    }
    return String(payload);
  }

  /** Records the new status and notifies `options.onStatusChange`. */
  private setStatus(status: WsStatus): void {
    this.status = status;
    this.options.onStatusChange?.(status);
  }

  /** Rejects every pending request with `message` and clears its timer. */
  private rejectAllPending(message: string): void {
    const pending = [...this.pendingRequests.values()];
    this.pendingRequests.clear();
    for (const req of pending) {
      clearTimeout(req.timeout);
      req.reject(new Error(message));
    }
  }

  /** Cancels the scheduled reconnect, if there is one. */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Schedules the next connection attempt after
   * `min(base * 2^attempt, max)` milliseconds, replacing any retry that is
   * already scheduled.
   */
  private scheduleReconnect(): void {
    if (!this.shouldReconnect) return;
    this.clearReconnectTimer();
    const baseDelay = this.options.reconnectBaseDelay ?? 1000;
    const maxDelay = this.options.reconnectMaxDelay ?? 15000;
    const delay = Math.min(baseDelay * 2 ** this.reconnectAttempt, maxDelay);
    this.reconnectAttempt++;
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect().catch(() => {
        // Nothing to do here: when the attempt fails, the socket's close
        // handler schedules the following retry.
      });
    }, delay);
  }
}