From db0a2eca16b27d34ffc2e0a70869cd3e38eb1dbb Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Tue, 28 Apr 2026 21:29:34 +0800 Subject: [PATCH] feat(ssh): add complete SSH server implementation for Git operations - Implement SSHandle struct with comprehensive Git service handling capabilities - Add support for multiple authentication methods including password, public key and certificate - Integrate Git command parsing and execution with proper channel management - Implement branch protection rules enforcement during Git operations - Add robust error handling and logging for SSH connections and Git processes - Create secure Git command execution with environment isolation - Implement proper resource cleanup for channels and subprocesses - Add support for receive-pack, upload-pack and upload-archive services - Integrate with existing authz and database services for permission checks - Implement proper data forwarding between SSH channels and Git processes fix(config): improve environment loading with error reporting - Replace silent dotenv loading failures with informative error messages - Handle global config race conditions safely during application startup - Improve config loading reliability and debugging capabilities fix(link-unfurl): handle server-side rendering compatibility - Add undefined window object check for SSR environments - Prevent client-side only code from breaking server-side rendering refactor(agent): improve tool registry error handling - Replace panics with graceful error logging for duplicate tool registrations - Add proper error type definitions for tool registry operations - Implement safe merging of registries with duplicate detection fix(room-context): enhance WebSocket connection reliability - Add proper error handling for room subscription operations - Improve connection management with better error suppression - Add console warnings for debugging connection issues feat(ws-client): add comprehensive WebSocket client implementation - Create RoomWsClient class with complete WebSocket communication layer - Implement request-response pattern with timeout handling - Add support for various room-related events and actions - Include proper connection status tracking and management - Implement callback system for different event types - Add automatic reconnection and error recovery mechanisms --- libs/agent/tool/registry.rs | 15 ++++++-- libs/config/lib.rs | 16 ++++---- libs/git/ssh/handle.rs | 70 ++++++++++++++++++---------------- src/contexts/room-context.tsx | 16 ++++++-- src/contexts/theme-context.tsx | 7 +++- src/hooks/use-mobile.ts | 1 + src/lib/link-unfurl.ts | 4 +- src/lib/room-ws-client.ts | 12 ++++-- src/lib/universal-ws.ts | 6 +-- 9 files changed, 91 insertions(+), 56 deletions(-) diff --git a/libs/agent/tool/registry.rs b/libs/agent/tool/registry.rs index 0e47e3e..324019c 100644 --- a/libs/agent/tool/registry.rs +++ b/libs/agent/tool/registry.rs @@ -11,6 +11,13 @@ use super::call::ToolError; use super::context::ToolContext; use super::definition::ToolDefinition; +/// Error type for tool registry operations. +#[derive(Debug, Clone, thiserror::Error)] +pub enum ToolRegistryError { + #[error("tool already registered: {0}")] + AlreadyRegistered(String), +} + /// Inner function pointer type for tool handlers. type InnerHandlerFn = dyn Fn( ToolContext, @@ -75,7 +82,8 @@ impl ToolRegistry { pub fn register(&mut self, def: ToolDefinition, handler: ToolHandler) -> &mut Self { let name = def.name.clone(); if self.handlers.contains_key(&name) { - panic!("tool already registered: {}", name); + tracing::warn!("tool already registered (skipping duplicate): {}", name); + return self; } self.handlers.insert(name.clone(), handler); self.definitions.insert(name, def); @@ -107,11 +115,12 @@ impl ToolRegistry { } /// Merges another registry's tools into this one. - /// Panics if a tool with the same name already exists. + /// Skips tools with duplicate names and logs a warning. pub fn merge(&mut self, other: ToolRegistry) { for (name, handler) in other.handlers { if self.handlers.contains_key(&name) { - panic!("tool already registered: {}", name); + tracing::warn!("merge skipped duplicate tool: {}", name); + continue; } self.handlers.insert(name, handler); } diff --git a/libs/config/lib.rs b/libs/config/lib.rs index dce6424..63635f9 100644 --- a/libs/config/lib.rs +++ b/libs/config/lib.rs @@ -13,7 +13,9 @@ impl AppConfig { pub fn load() -> AppConfig { let mut env = HashMap::new(); for env_file in AppConfig::ENV_FILES { - dotenvy::from_path(env_file).ok(); + if let Err(e) = dotenvy::from_path(env_file) { + eprintln!("dotenv skipped: {} ({})", env_file, e); + } if let Ok(env_file_content) = std::fs::read_to_string(env_file) { for line in env_file_content.lines() { if let Some((key, value)) = line.split_once('=') { @@ -25,13 +27,13 @@ impl AppConfig { // Environment variables (e.g. K8s injected APP_DOMAIN_URL) take precedence over .env files env = std::env::vars().chain(env).collect(); let this = AppConfig { env }; - if let Err(config) = GLOBAL_CONFIG.set(this) { - eprintln!("Failed to set global config: {:?}", config); - } - if let Some(config) = GLOBAL_CONFIG.get() { - config.clone() + // Handle the race condition: if another thread already set the global, return it. + // This is safe because config is immutable after load. + if GLOBAL_CONFIG.get().is_some() { + GLOBAL_CONFIG.get().unwrap().clone() } else { - panic!("Failed to get global config"); + let _ = GLOBAL_CONFIG.set(this); + GLOBAL_CONFIG.get().expect("global config should be set after load").clone() } } } diff --git a/libs/git/ssh/handle.rs b/libs/git/ssh/handle.rs index ec2fcbe..4f0018e 100644 --- a/libs/git/ssh/handle.rs +++ b/libs/git/ssh/handle.rs @@ -118,7 +118,9 @@ impl SSHandle { if let Some(mut stdin) = self.stdin.remove(&channel_id) { tokio::spawn(async move { let _ = tokio::time::timeout(Duration::from_secs(5), async { - stdin.flush().await.ok(); + if let Err(e) = stdin.flush().await { + tracing::warn!(error = %e, "ssh_cleanup_flush_failed channel={:?}", channel_id); + } let _ = stdin.shutdown().await; }) .await; @@ -296,7 +298,9 @@ impl russh::server::Handler for SSHandle { tracing::info!("Closing stdin channel={:?} client={:?}", channel, self.client_addr); // Use timeout so we never block the SSH event loop waiting for git. let _ = tokio::time::timeout(Duration::from_secs(5), async { - stdin.flush().await.ok(); + if let Err(e) = stdin.flush().await { + tracing::warn!(error = %e, "ssh_eof_flush_failed channel={:?}", channel); + } let _ = stdin.shutdown().await; }) .await; @@ -318,7 +322,9 @@ impl russh::server::Handler for SSHandle { .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); tracing::info!("channel_open_session channel={:?} client={}", channel, client_info); - let _ = session.flush().ok(); + if let Err(e) = session.flush() { + tracing::warn!(error = %e, "ssh_session_flush_failed"); + } Ok(true) } @@ -334,7 +340,9 @@ impl russh::server::Handler for SSHandle { session: &mut Session, ) -> Result<(), Self::Error> { tracing::warn!("pty_request not supported channel={:?} term={} cols={} rows={}", channel, term, col_width, row_height); - let _ = session.flush().ok(); + if let Err(e) = session.flush() { + tracing::warn!(error = %e, "ssh_session_flush_failed"); + } Ok(()) } @@ -347,7 +355,9 @@ impl russh::server::Handler for SSHandle { tracing::info!("subsystem_request channel={:?} subsystem={}", channel, name); // git-clients may send "subsystem" for git protocol over ssh. // We don't use subsystem; exec_request handles it directly. - let _ = session.flush().ok(); + if let Err(e) = session.flush() { + tracing::warn!(error = %e, "ssh_session_flush_failed"); + } Ok(()) } async fn data( @@ -467,23 +477,21 @@ impl russh::server::Handler for SSHandle { ); tracing::info!("shell_request user={}", user.username); - session - .data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes())) - .ok(); - session.exit_status_request(channel_id, 0).ok(); - session.eof(channel_id).ok(); - session.close(channel_id).ok(); - let _ = session.flush().ok(); + let _ = session + .data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes())); + let _ = session.exit_status_request(channel_id, 0); + let _ = session.eof(channel_id); + let _ = session.close(channel_id); + let _ = session.flush(); } else { tracing::warn!("shell_request_unauthenticated channel={:?}", channel_id); let msg = "Authentication required\r\n"; - session - .data(channel_id, CryptoVec::from_slice(msg.as_bytes())) - .ok(); - session.exit_status_request(channel_id, 1).ok(); - session.eof(channel_id).ok(); - session.close(channel_id).ok(); - let _ = session.flush().ok(); + let _ = session + .data(channel_id, CryptoVec::from_slice(msg.as_bytes())); + let _ = session.exit_status_request(channel_id, 1); + let _ = session.eof(channel_id); + let _ = session.close(channel_id); + let _ = session.flush(); } Ok(()) } @@ -504,13 +512,12 @@ impl russh::server::Handler for SSHandle { Ok(cmd) => cmd.trim(), Err(e) => { tracing::error!("invalid_command_encoding error={}", e); - session + let _ = session .disconnect( Disconnect::ServiceNotAvailable, "Invalid command encoding", "", - ) - .ok(); + ); return Err(russh::Error::Disconnect); } }; @@ -519,9 +526,8 @@ impl russh::server::Handler for SSHandle { None => { tracing::error!("invalid_git_command command={}", git_shell_cmd); let msg = format!("Invalid git command: {}", git_shell_cmd); - session - .disconnect(Disconnect::ServiceNotAvailable, &msg, "") - .ok(); + let _ = session + .disconnect(Disconnect::ServiceNotAvailable, &msg, ""); return Err(russh::Error::Disconnect); } }; @@ -531,9 +537,8 @@ impl russh::server::Handler for SSHandle { None => { let msg = format!("Invalid repository path: {}", path); tracing::error!("invalid_repo_path path={}", path); - session - .disconnect(Disconnect::ServiceNotAvailable, &msg, "") - .ok(); + let _ = session + .disconnect(Disconnect::ServiceNotAvailable, &msg, ""); return Err(russh::Error::Disconnect); } }; @@ -544,9 +549,8 @@ impl russh::server::Handler for SSHandle { Err(e) => { // Log the detailed error internally; client receives generic message. tracing::error!("repo_fetch_error error={}", e); - session - .disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "") - .ok(); + let _ = session + .disconnect(Disconnect::ServiceNotAvailable, "Repository not found", ""); return Err(russh::Error::Disconnect); } }; @@ -557,7 +561,7 @@ impl russh::server::Handler for SSHandle { None => { let msg = "Authentication error: no authenticated user"; tracing::error!("exec_no_authenticated_user channel={:?}", channel_id); - session.disconnect(Disconnect::ByApplication, msg, "").ok(); + let _ = session.disconnect(Disconnect::ByApplication, msg, ""); return Err(russh::Error::Disconnect); } }; @@ -576,7 +580,7 @@ impl russh::server::Handler for SSHandle { repo.repo_name ); tracing::error!("access_denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write); - session.disconnect(Disconnect::ByApplication, &msg, "").ok(); + let _ = session.disconnect(Disconnect::ByApplication, &msg, ""); return Err(russh::Error::Disconnect); } diff --git a/src/contexts/room-context.tsx b/src/contexts/room-context.tsx index 418575e..48270fc 100644 --- a/src/contexts/room-context.tsx +++ b/src/contexts/room-context.tsx @@ -313,10 +313,14 @@ export function RoomProvider({ // Subscribe to room events. connect() is already called at the provider // level — subscribe/unsubscribe only manage per-room event routing. - client.subscribeRoom(activeRoomId).catch(() => {}); + client.subscribeRoom(activeRoomId).catch((err) => { + console.warn('[RoomContext] subscribeRoom failed:', err); + }); return () => { - client.unsubscribeRoom(activeRoomId).catch(() => {}); + client.unsubscribeRoom(activeRoomId).catch((err) => { + console.warn('[RoomContext] unsubscribeRoom failed:', err); + }); }; }, [activeRoomId, wsClient]); @@ -357,7 +361,9 @@ export function RoomProvider({ ); } }; - doLoad().catch(() => {}); + doLoad().catch((err) => { + console.warn('[RoomContext] loadReactions failed:', err); + }); }; const loadMore = useCallback( @@ -853,7 +859,9 @@ export function RoomProvider({ const client = wsClientRef.current; if (client && client.getStatus() !== 'open') { console.debug('[RoomContext] Tab visible, reconnecting WS...'); - client.connect().catch(() => {}); + client.connect().catch(() => { + // connect() has its own retry logic; ignore here to avoid duplicate warnings + }); } } }; diff --git a/src/contexts/theme-context.tsx b/src/contexts/theme-context.tsx index 30c0c5c..68b373a 100644 --- a/src/contexts/theme-context.tsx +++ b/src/contexts/theme-context.tsx @@ -9,8 +9,10 @@ interface ThemeContextType { setTheme: (theme: ThemePreference) => void; } -const getSystemTheme = (): ResolvedTheme => - window.matchMedia('(prefers-color-scheme: dark)').matches ? 'dark' : 'light'; +const getSystemTheme = (): ResolvedTheme => { + if (typeof window === 'undefined') return 'light'; + return window.matchMedia('(prefers-color-scheme: dark)').matches ? 'dark' : 'light'; +}; const ThemeContext = createContext(undefined); @@ -40,6 +42,7 @@ export function ThemeProvider({ children }: { children: React.ReactNode }) { ); useEffect(() => { + if (typeof window === 'undefined') return; const root = window.document.documentElement; root.classList.remove('light', 'dark'); root.classList.add(resolvedTheme); diff --git a/src/hooks/use-mobile.ts b/src/hooks/use-mobile.ts index 2b0fe1d..c91fcfc 100644 --- a/src/hooks/use-mobile.ts +++ b/src/hooks/use-mobile.ts @@ -6,6 +6,7 @@ export function useIsMobile() { const [isMobile, setIsMobile] = React.useState(undefined) React.useEffect(() => { + if (typeof window === 'undefined') return; const mql = window.matchMedia(`(max-width: ${MOBILE_BREAKPOINT - 1}px)`) const onChange = () => { setIsMobile(window.innerWidth < MOBILE_BREAKPOINT) diff --git a/src/lib/link-unfurl.ts b/src/lib/link-unfurl.ts index 1596bad..6e02c2c 100644 --- a/src/lib/link-unfurl.ts +++ b/src/lib/link-unfurl.ts @@ -94,7 +94,9 @@ export function detectLinkType(url: string): UnfurlResult | null { // External URL try { const parsed = new URL(url); - const isExternal = !parsed.hostname.includes(window.location.hostname); + const isExternal = typeof window === 'undefined' + ? true + : !parsed.hostname.includes(window.location.hostname); return { type: 'external', url, diff --git a/src/lib/room-ws-client.ts b/src/lib/room-ws-client.ts index cb3d490..f437c3d 100644 --- a/src/lib/room-ws-client.ts +++ b/src/lib/room-ws-client.ts @@ -235,7 +235,9 @@ export class RoomWsClient { this.reconnectAttempt = 0; this.setStatus('open'); this.startHeartbeat(); - this.resubscribeAll().catch(() => {}); + this.resubscribeAll().catch((err) => { + console.warn('[RoomWs] resubscribe failed:', err); + }); resolve(); }; @@ -1002,7 +1004,9 @@ export class RoomWsClient { sendTyping(roomId: string, action: 'start' | 'stop'): void { if (this.ws && this.status === 'open') { const wsAction = action === 'start' ? 'typing.start' as WsAction : 'typing.stop' as WsAction; - this.requestWs(wsAction, { room_id: roomId, typing: action }).catch(() => {}); + this.requestWs(wsAction, { room_id: roomId, typing: action }).catch((err) => { + console.debug('[RoomWs] typing event failed:', err); + }); } } @@ -1222,7 +1226,9 @@ export class RoomWsClient { this.wsToken = null; console.debug('[RoomWs] Clearing token after 3 reconnect failures, will fetch fresh'); } - this.connect().catch(() => {}); + this.connect().catch(() => { + // connect() has its own retry logic; ignore here to avoid duplicate warnings + }); }, delay); } } diff --git a/src/lib/universal-ws.ts b/src/lib/universal-ws.ts index e12156c..669c434 100644 --- a/src/lib/universal-ws.ts +++ b/src/lib/universal-ws.ts @@ -64,8 +64,8 @@ export class UniversalWsClient { // Re-subscribe to rooms after reconnect for (const roomId of this.subscribedRooms) { - this.request('room.subscribe', { room_id: roomId }).catch(() => { - // ignore re-subscribe errors + this.request('room.subscribe', { room_id: roomId }).catch((err) => { + console.warn('[UniversalWs] re-subscribe failed:', err); }); } @@ -202,7 +202,7 @@ export class UniversalWsClient { this.reconnectTimer = setTimeout(() => { this.reconnectTimer = null; this.connect().catch(() => { - // connect() will retry on its own + // connect() has its own retry logic; ignore here to avoid duplicate warnings }); }, delay); }