gitdataai/src/ws/bridge.ts

146 lines
5.6 KiB
TypeScript

/**
* Socket.IO bridge server — thin Node.js process that connects to the
* Rust backend via raw WebSocket and exposes events via Socket.IO.
*
* Usage: `node --import tsx src/ws/bridge.ts`
* Or: integrate into Vite dev server via custom plugin.
*
* Protocol mapping:
* - Rust → raw WS tagged JSON → bridge parses `type` field → emits Socket.IO event
* - Socket.IO event → bridge wraps with `type` field → sends raw WS tagged JSON
*/
import { createServer } from 'http';
import { Server as IoServer } from 'socket.io';
import type { WsOutEvent, WsOutEventName } from './types/outbound';
import type { WsInMessage, WsInEventName } from './types/inbound';
import { DEFAULT_WS_PATH } from './constants';
/**
 * Base HTTP(S) URL of the Rust backend. Read from the `RUST_BACKEND_URL`
 * environment variable; falls back to `http://localhost:8080` when that
 * variable is unset or empty.
 */
const RUST_BACKEND_URL = process.env.RUST_BACKEND_URL
  ? process.env.RUST_BACKEND_URL
  : 'http://localhost:8080';

/**
 * TCP port the bridge listens on. Read from `WS_BRIDGE_PORT` and parsed as a
 * base-10 integer; falls back to 3001 when that variable is unset or empty.
 */
const BRIDGE_PORT = Number.parseInt(
  process.env.WS_BRIDGE_PORT ? process.env.WS_BRIDGE_PORT : '3001',
  10,
);
// ── HTTP server + Socket.IO ─────────────────────────────
// Plain Node HTTP server created without a request handler; Socket.IO is
// attached to it below.
const httpServer = createServer();

// Socket.IO server bound to the HTTP server above. Only the `websocket`
// transport is enabled, and the CORS policy allows GET/POST from any origin.
// NOTE(review): `origin: '*'` accepts every origin — confirm this is intended
// outside local development.
const ioServer = new IoServer(httpServer, {
  transports: ['websocket'],
  cors: { methods: ['GET', 'POST'], origin: '*' },
});
// ── Auth: validate Socket.IO connection via backend WS token ──
/**
 * Socket.IO handshake middleware.
 *
 * Reads `auth.token` from the client handshake and checks it against the
 * Rust backend by POSTing to `${RUST_BACKEND_URL}/api/ws/token` with the
 * token as a Bearer credential. Any `ok` (2xx) response accepts the
 * connection; the user id is read from `data.user_id` (or top-level
 * `user_id`) of the JSON body and stored on `socket.data.userId`.
 *
 * Rejections are reported to the client through `next(new Error(...))`:
 * - `no token provided`       — token missing, empty, or not a string
 * - `token validation failed` — backend answered with a non-2xx status
 * - `backend unreachable`     — the request or the JSON decoding threw
 *
 * NOTE(review): a 2xx response without a user id is still accepted and leaves
 * `socket.data.userId` undefined — confirm the backend always returns one.
 */
ioServer.use(async (socket, next) => {
  const token: unknown = socket.handshake.auth.token;
  // Only a non-empty string can be used as a Bearer credential; anything else
  // (missing, empty, object, number) is treated as "no token". The bridge does
  // not attempt to obtain a token on the client's behalf.
  if (typeof token !== 'string' || token.length === 0) {
    return next(new Error('no token provided'));
  }

  // Validate token against Rust backend
  try {
    const res = await fetch(`${RUST_BACKEND_URL}/api/ws/token`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${token}` },
    });
    if (!res.ok) {
      return next(new Error('token validation failed'));
    }
    const body = (await res.json()) as {
      data?: { user_id?: string };
      user_id?: string;
    };
    socket.data.userId = body.data?.user_id || body.user_id;
    return next();
  } catch (err: unknown) {
    // Keep the client-facing message stable, but record the real cause
    // (network failure, malformed JSON, ...) for operators.
    console.error('[bridge] Token validation request failed:', err);
    return next(new Error('backend unreachable'));
  }
});
// ── Per-connection raw WS to Rust backend ───────────────

/**
 * Socket.IO event names the bridge accepts from clients and relays to the
 * backend. Events that are not listed get no listener and are never relayed.
 */
const INBOUND_EVENTS: readonly WsInEventName[] = [
  'subscribe', 'unsubscribe',
  'typing_start', 'typing_stop', 'read_receipt',
  'message_list', 'message_create', 'message_update', 'message_revoke',
  'room_get', 'room_create', 'room_update', 'room_delete',
  'category_create', 'category_update', 'category_delete',
  'access_grant', 'access_revoke',
  'state_set_read_seq', 'state_update_dnd',
  'reaction_add', 'reaction_remove',
  'thread_create', 'thread_resolve', 'thread_archive',
  'pin_add', 'pin_remove',
  'draft_save', 'draft_clear', 'search',
  'notification_mark_read', 'notification_mark_all_read', 'notification_archive',
  'presence_update', 'custom_status_update',
  'invite_create', 'invite_accept', 'invite_revoke',
  'ban_create', 'ban_remove',
  'voice_join', 'voice_leave', 'voice_mute', 'voice_deaf', 'screen_share',
  'ai_list', 'ai_upsert', 'ai_delete', 'ai_stop',
];

/** Maximum number of frames buffered per client while its backend socket is still connecting. */
const MAX_PENDING_FRAMES = 100;

/** True for non-null, non-array objects — the only payloads whose fields are merged into a frame. */
const isRecord = (value: unknown): value is Record<string, unknown> =>
  typeof value === 'object' && value !== null && !Array.isArray(value);

/** True when `value` is an object carrying a string `type` tag. */
const isTaggedFrame = (value: unknown): boolean =>
  isRecord(value) && typeof value.type === 'string';

/**
 * For every authenticated Socket.IO client, open one raw WebSocket to the
 * Rust backend and relay traffic in both directions:
 *
 * - backend → client: each tagged JSON frame is emitted as a Socket.IO event
 *   named after its `type`, with `type` removed from the payload;
 * - client → backend: each allow-listed Socket.IO event is sent as a JSON
 *   frame whose `type` is the event name.
 *
 * The two sockets share a lifetime: closing either one closes the other.
 */
ioServer.on('connection', (ioSocket) => {
  const userId = ioSocket.data.userId as string;
  console.log(`[bridge] Socket.IO client connected: ${userId}`);

  // Connect to Rust backend via raw WebSocket. Only the leading scheme is
  // rewritten (http → ws, https → wss), so an "http" substring elsewhere in
  // the URL is left alone.
  const wsBase = RUST_BACKEND_URL.replace(/^http/i, 'ws');
  const wsUrl = `${wsBase}${DEFAULT_WS_PATH}?token=${encodeURIComponent(ioSocket.handshake.auth.token)}`;
  let rawWs: WebSocket;
  try {
    rawWs = new WebSocket(wsUrl);
  } catch {
    // The constructor throws synchronously on an unusable URL. Drop this
    // client instead of letting the exception escape the event handler.
    // The full URL is not logged because it carries the token.
    console.error(`[bridge] Could not open raw WS to ${RUST_BACKEND_URL} for user: ${userId}`);
    ioSocket.disconnect(true);
    return;
  }

  // Frames the client emitted before the backend handshake finished.
  const pendingFrames: string[] = [];

  /** Send a frame now if the backend socket is open, buffer it while connecting, otherwise drop it. */
  const sendToBackend = (frame: string): void => {
    if (rawWs.readyState === WebSocket.OPEN) {
      rawWs.send(frame);
    } else if (
      rawWs.readyState === WebSocket.CONNECTING &&
      pendingFrames.length < MAX_PENDING_FRAMES
    ) {
      pendingFrames.push(frame);
    } else {
      console.warn(`[bridge] Dropping frame for user: ${userId} (raw WS not open)`);
    }
  };

  rawWs.onopen = () => {
    // Flush, in order, everything the client sent while we were connecting.
    for (const frame of pendingFrames.splice(0)) {
      rawWs.send(frame);
    }
  };

  // ── Rust → Socket.IO ────────────────────────────────
  rawWs.onmessage = (ev) => {
    try {
      // The protocol is tagged JSON text; ignore non-text frames.
      if (typeof ev.data !== 'string') {
        return;
      }
      const parsed: unknown = JSON.parse(ev.data);
      if (!isTaggedFrame(parsed)) {
        console.warn(`[bridge] Ignoring untagged backend frame for user: ${userId}`);
        return;
      }
      const event = parsed as WsOutEvent;
      const eventName: WsOutEventName = event.type;
      // Forward to Socket.IO client.
      // Remove the `type` field from payload (Socket.IO event name replaces it).
      // A backend `{"type":"pong"}` frame therefore arrives as a `pong` event
      // with an empty payload.
      const payload = { ...event };
      delete (payload as Record<string, unknown>).type;
      ioSocket.emit(eventName, payload);
    } catch (err: unknown) {
      // Covers malformed JSON and event names Socket.IO refuses to emit.
      console.error(`[bridge] Failed to relay backend frame for user: ${userId}`, err);
    }
  };
  rawWs.onclose = () => {
    console.log(`[bridge] Raw WS closed for user: ${userId}`);
    pendingFrames.length = 0;
    ioSocket.disconnect(true);
  };
  rawWs.onerror = () => {
    console.error(`[bridge] Raw WS error for user: ${userId}`);
  };

  // ── Socket.IO → Rust ────────────────────────────────
  // Listen for all inbound event names and forward as tagged JSON
  for (const eventName of INBOUND_EVENTS) {
    ioSocket.on(eventName, (data: unknown) => {
      // `type` is written last so a client-supplied `type` field cannot
      // override the allow-listed event name.
      const msg = { ...(isRecord(data) ? data : {}), type: eventName } as WsInMessage;
      sendToBackend(JSON.stringify(msg));
    });
  }

  // Special: Socket.IO ping → Rust WS ping. Pings are never buffered; one
  // sent before the backend socket is open is simply skipped.
  ioSocket.on('ping', () => {
    if (rawWs.readyState === WebSocket.OPEN) {
      rawWs.send(JSON.stringify({ type: 'ping' }));
    }
  });

  // ── Cleanup ──────────────────────────────────────────
  ioSocket.on('disconnect', () => {
    console.log(`[bridge] Socket.IO client disconnected: ${userId}`);
    pendingFrames.length = 0;
    // Also close a socket that is still connecting; otherwise it would finish
    // its handshake and stay open with no client attached.
    if (
      rawWs.readyState === WebSocket.OPEN ||
      rawWs.readyState === WebSocket.CONNECTING
    ) {
      rawWs.close();
    }
  });
});
// ── Start ───────────────────────────────────────────────
// Bind the HTTP server to BRIDGE_PORT and, once it is listening, log the port
// and the configured backend URL. No backend connection is made here.
httpServer.listen(BRIDGE_PORT, () => {
  const startupBanner = [
    `[bridge] Socket.IO bridge server running on port ${BRIDGE_PORT}`,
    `[bridge] Connecting to Rust backend at ${RUST_BACKEND_URL}`,
  ];
  startupBanner.forEach((line) => {
    console.log(line);
  });
});