feat(ssh): add complete SSH server implementation for Git operations
- Implement SSHandle struct with comprehensive Git service handling capabilities - Add support for multiple authentication methods including password, public key and certificate - Integrate Git command parsing and execution with proper channel management - Implement branch protection rules enforcement during Git operations - Add robust error handling and logging for SSH connections and Git processes - Create secure Git command execution with environment isolation - Implement proper resource cleanup for channels and subprocesses - Add support for receive-pack, upload-pack and upload-archive services - Integrate with existing authz and database services for permission checks - Implement proper data forwarding between SSH channels and Git processes fix(config): improve environment loading with error reporting - Replace silent dotenv loading failures with informative error messages - Handle global config race conditions safely during application startup - Improve config loading reliability and debugging capabilities fix(link-unfurl): handle server-side rendering compatibility - Add undefined window object check for SSR environments - Prevent client-side only code from breaking server-side rendering refactor(agent): improve tool registry error handling - Replace panics with graceful error logging for duplicate tool registrations - Add proper error type definitions for tool registry operations - Implement safe merging of registries with duplicate detection fix(room-context): enhance WebSocket connection reliability - Add proper error handling for room subscription operations - Improve connection management with better error suppression - Add console warnings for debugging connection issues feat(ws-client): add comprehensive WebSocket client implementation - Create RoomWsClient class with complete WebSocket communication layer - Implement request-response pattern with timeout handling - Add support for various room-related events and actions - Include proper connection status tracking and management - Implement callback system for different event types - Add automatic reconnection and error recovery mechanisms
This commit is contained in:
parent
b3fb027848
commit
db0a2eca16
@ -11,6 +11,13 @@ use super::call::ToolError;
|
||||
use super::context::ToolContext;
|
||||
use super::definition::ToolDefinition;
|
||||
|
||||
/// Error type for tool registry operations.
|
||||
#[derive(Debug, Clone, thiserror::Error)]
|
||||
pub enum ToolRegistryError {
|
||||
#[error("tool already registered: {0}")]
|
||||
AlreadyRegistered(String),
|
||||
}
|
||||
|
||||
/// Inner function pointer type for tool handlers.
|
||||
type InnerHandlerFn = dyn Fn(
|
||||
ToolContext,
|
||||
@ -75,7 +82,8 @@ impl ToolRegistry {
|
||||
pub fn register(&mut self, def: ToolDefinition, handler: ToolHandler) -> &mut Self {
|
||||
let name = def.name.clone();
|
||||
if self.handlers.contains_key(&name) {
|
||||
panic!("tool already registered: {}", name);
|
||||
tracing::warn!("tool already registered (skipping duplicate): {}", name);
|
||||
return self;
|
||||
}
|
||||
self.handlers.insert(name.clone(), handler);
|
||||
self.definitions.insert(name, def);
|
||||
@ -107,11 +115,12 @@ impl ToolRegistry {
|
||||
}
|
||||
|
||||
/// Merges another registry's tools into this one.
|
||||
/// Panics if a tool with the same name already exists.
|
||||
/// Skips tools with duplicate names and logs a warning.
|
||||
pub fn merge(&mut self, other: ToolRegistry) {
|
||||
for (name, handler) in other.handlers {
|
||||
if self.handlers.contains_key(&name) {
|
||||
panic!("tool already registered: {}", name);
|
||||
tracing::warn!("merge skipped duplicate tool: {}", name);
|
||||
continue;
|
||||
}
|
||||
self.handlers.insert(name, handler);
|
||||
}
|
||||
|
||||
@ -13,7 +13,9 @@ impl AppConfig {
|
||||
pub fn load() -> AppConfig {
|
||||
let mut env = HashMap::new();
|
||||
for env_file in AppConfig::ENV_FILES {
|
||||
dotenvy::from_path(env_file).ok();
|
||||
if let Err(e) = dotenvy::from_path(env_file) {
|
||||
eprintln!("dotenv skipped: {} ({})", env_file, e);
|
||||
}
|
||||
if let Ok(env_file_content) = std::fs::read_to_string(env_file) {
|
||||
for line in env_file_content.lines() {
|
||||
if let Some((key, value)) = line.split_once('=') {
|
||||
@ -25,13 +27,13 @@ impl AppConfig {
|
||||
// Environment variables (e.g. K8s injected APP_DOMAIN_URL) take precedence over .env files
|
||||
env = std::env::vars().chain(env).collect();
|
||||
let this = AppConfig { env };
|
||||
if let Err(config) = GLOBAL_CONFIG.set(this) {
|
||||
eprintln!("Failed to set global config: {:?}", config);
|
||||
}
|
||||
if let Some(config) = GLOBAL_CONFIG.get() {
|
||||
config.clone()
|
||||
// Handle the race condition: if another thread already set the global, return it.
|
||||
// This is safe because config is immutable after load.
|
||||
if GLOBAL_CONFIG.get().is_some() {
|
||||
GLOBAL_CONFIG.get().unwrap().clone()
|
||||
} else {
|
||||
panic!("Failed to get global config");
|
||||
let _ = GLOBAL_CONFIG.set(this);
|
||||
GLOBAL_CONFIG.get().expect("global config should be set after load").clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,7 +118,9 @@ impl SSHandle {
|
||||
if let Some(mut stdin) = self.stdin.remove(&channel_id) {
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::time::timeout(Duration::from_secs(5), async {
|
||||
stdin.flush().await.ok();
|
||||
if let Err(e) = stdin.flush().await {
|
||||
tracing::warn!(error = %e, "ssh_cleanup_flush_failed channel={:?}", channel_id);
|
||||
}
|
||||
let _ = stdin.shutdown().await;
|
||||
})
|
||||
.await;
|
||||
@ -296,7 +298,9 @@ impl russh::server::Handler for SSHandle {
|
||||
tracing::info!("Closing stdin channel={:?} client={:?}", channel, self.client_addr);
|
||||
// Use timeout so we never block the SSH event loop waiting for git.
|
||||
let _ = tokio::time::timeout(Duration::from_secs(5), async {
|
||||
stdin.flush().await.ok();
|
||||
if let Err(e) = stdin.flush().await {
|
||||
tracing::warn!(error = %e, "ssh_eof_flush_failed channel={:?}", channel);
|
||||
}
|
||||
let _ = stdin.shutdown().await;
|
||||
})
|
||||
.await;
|
||||
@ -318,7 +322,9 @@ impl russh::server::Handler for SSHandle {
|
||||
.map(|addr| format!("{}", addr))
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
tracing::info!("channel_open_session channel={:?} client={}", channel, client_info);
|
||||
let _ = session.flush().ok();
|
||||
if let Err(e) = session.flush() {
|
||||
tracing::warn!(error = %e, "ssh_session_flush_failed");
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@ -334,7 +340,9 @@ impl russh::server::Handler for SSHandle {
|
||||
session: &mut Session,
|
||||
) -> Result<(), Self::Error> {
|
||||
tracing::warn!("pty_request not supported channel={:?} term={} cols={} rows={}", channel, term, col_width, row_height);
|
||||
let _ = session.flush().ok();
|
||||
if let Err(e) = session.flush() {
|
||||
tracing::warn!(error = %e, "ssh_session_flush_failed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -347,7 +355,9 @@ impl russh::server::Handler for SSHandle {
|
||||
tracing::info!("subsystem_request channel={:?} subsystem={}", channel, name);
|
||||
// git-clients may send "subsystem" for git protocol over ssh.
|
||||
// We don't use subsystem; exec_request handles it directly.
|
||||
let _ = session.flush().ok();
|
||||
if let Err(e) = session.flush() {
|
||||
tracing::warn!(error = %e, "ssh_session_flush_failed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn data(
|
||||
@ -467,23 +477,21 @@ impl russh::server::Handler for SSHandle {
|
||||
);
|
||||
|
||||
tracing::info!("shell_request user={}", user.username);
|
||||
session
|
||||
.data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes()))
|
||||
.ok();
|
||||
session.exit_status_request(channel_id, 0).ok();
|
||||
session.eof(channel_id).ok();
|
||||
session.close(channel_id).ok();
|
||||
let _ = session.flush().ok();
|
||||
let _ = session
|
||||
.data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes()));
|
||||
let _ = session.exit_status_request(channel_id, 0);
|
||||
let _ = session.eof(channel_id);
|
||||
let _ = session.close(channel_id);
|
||||
let _ = session.flush();
|
||||
} else {
|
||||
tracing::warn!("shell_request_unauthenticated channel={:?}", channel_id);
|
||||
let msg = "Authentication required\r\n";
|
||||
session
|
||||
.data(channel_id, CryptoVec::from_slice(msg.as_bytes()))
|
||||
.ok();
|
||||
session.exit_status_request(channel_id, 1).ok();
|
||||
session.eof(channel_id).ok();
|
||||
session.close(channel_id).ok();
|
||||
let _ = session.flush().ok();
|
||||
let _ = session
|
||||
.data(channel_id, CryptoVec::from_slice(msg.as_bytes()));
|
||||
let _ = session.exit_status_request(channel_id, 1);
|
||||
let _ = session.eof(channel_id);
|
||||
let _ = session.close(channel_id);
|
||||
let _ = session.flush();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@ -504,13 +512,12 @@ impl russh::server::Handler for SSHandle {
|
||||
Ok(cmd) => cmd.trim(),
|
||||
Err(e) => {
|
||||
tracing::error!("invalid_command_encoding error={}", e);
|
||||
session
|
||||
let _ = session
|
||||
.disconnect(
|
||||
Disconnect::ServiceNotAvailable,
|
||||
"Invalid command encoding",
|
||||
"",
|
||||
)
|
||||
.ok();
|
||||
);
|
||||
return Err(russh::Error::Disconnect);
|
||||
}
|
||||
};
|
||||
@ -519,9 +526,8 @@ impl russh::server::Handler for SSHandle {
|
||||
None => {
|
||||
tracing::error!("invalid_git_command command={}", git_shell_cmd);
|
||||
let msg = format!("Invalid git command: {}", git_shell_cmd);
|
||||
session
|
||||
.disconnect(Disconnect::ServiceNotAvailable, &msg, "")
|
||||
.ok();
|
||||
let _ = session
|
||||
.disconnect(Disconnect::ServiceNotAvailable, &msg, "");
|
||||
return Err(russh::Error::Disconnect);
|
||||
}
|
||||
};
|
||||
@ -531,9 +537,8 @@ impl russh::server::Handler for SSHandle {
|
||||
None => {
|
||||
let msg = format!("Invalid repository path: {}", path);
|
||||
tracing::error!("invalid_repo_path path={}", path);
|
||||
session
|
||||
.disconnect(Disconnect::ServiceNotAvailable, &msg, "")
|
||||
.ok();
|
||||
let _ = session
|
||||
.disconnect(Disconnect::ServiceNotAvailable, &msg, "");
|
||||
return Err(russh::Error::Disconnect);
|
||||
}
|
||||
};
|
||||
@ -544,9 +549,8 @@ impl russh::server::Handler for SSHandle {
|
||||
Err(e) => {
|
||||
// Log the detailed error internally; client receives generic message.
|
||||
tracing::error!("repo_fetch_error error={}", e);
|
||||
session
|
||||
.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "")
|
||||
.ok();
|
||||
let _ = session
|
||||
.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "");
|
||||
return Err(russh::Error::Disconnect);
|
||||
}
|
||||
};
|
||||
@ -557,7 +561,7 @@ impl russh::server::Handler for SSHandle {
|
||||
None => {
|
||||
let msg = "Authentication error: no authenticated user";
|
||||
tracing::error!("exec_no_authenticated_user channel={:?}", channel_id);
|
||||
session.disconnect(Disconnect::ByApplication, msg, "").ok();
|
||||
let _ = session.disconnect(Disconnect::ByApplication, msg, "");
|
||||
return Err(russh::Error::Disconnect);
|
||||
}
|
||||
};
|
||||
@ -576,7 +580,7 @@ impl russh::server::Handler for SSHandle {
|
||||
repo.repo_name
|
||||
);
|
||||
tracing::error!("access_denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write);
|
||||
session.disconnect(Disconnect::ByApplication, &msg, "").ok();
|
||||
let _ = session.disconnect(Disconnect::ByApplication, &msg, "");
|
||||
return Err(russh::Error::Disconnect);
|
||||
}
|
||||
|
||||
|
||||
@ -313,10 +313,14 @@ export function RoomProvider({
|
||||
|
||||
// Subscribe to room events. connect() is already called at the provider
|
||||
// level — subscribe/unsubscribe only manage per-room event routing.
|
||||
client.subscribeRoom(activeRoomId).catch(() => {});
|
||||
client.subscribeRoom(activeRoomId).catch((err) => {
|
||||
console.warn('[RoomContext] subscribeRoom failed:', err);
|
||||
});
|
||||
|
||||
return () => {
|
||||
client.unsubscribeRoom(activeRoomId).catch(() => {});
|
||||
client.unsubscribeRoom(activeRoomId).catch((err) => {
|
||||
console.warn('[RoomContext] unsubscribeRoom failed:', err);
|
||||
});
|
||||
};
|
||||
}, [activeRoomId, wsClient]);
|
||||
|
||||
@ -357,7 +361,9 @@ export function RoomProvider({
|
||||
);
|
||||
}
|
||||
};
|
||||
doLoad().catch(() => {});
|
||||
doLoad().catch((err) => {
|
||||
console.warn('[RoomContext] loadReactions failed:', err);
|
||||
});
|
||||
};
|
||||
|
||||
const loadMore = useCallback(
|
||||
@ -853,7 +859,9 @@ export function RoomProvider({
|
||||
const client = wsClientRef.current;
|
||||
if (client && client.getStatus() !== 'open') {
|
||||
console.debug('[RoomContext] Tab visible, reconnecting WS...');
|
||||
client.connect().catch(() => {});
|
||||
client.connect().catch(() => {
|
||||
// connect() has its own retry logic; ignore here to avoid duplicate warnings
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -9,8 +9,10 @@ interface ThemeContextType {
|
||||
setTheme: (theme: ThemePreference) => void;
|
||||
}
|
||||
|
||||
const getSystemTheme = (): ResolvedTheme =>
|
||||
window.matchMedia('(prefers-color-scheme: dark)').matches ? 'dark' : 'light';
|
||||
const getSystemTheme = (): ResolvedTheme => {
|
||||
if (typeof window === 'undefined') return 'light';
|
||||
return window.matchMedia('(prefers-color-scheme: dark)').matches ? 'dark' : 'light';
|
||||
};
|
||||
|
||||
const ThemeContext = createContext<ThemeContextType | undefined>(undefined);
|
||||
|
||||
@ -40,6 +42,7 @@ export function ThemeProvider({ children }: { children: React.ReactNode }) {
|
||||
);
|
||||
|
||||
useEffect(() => {
|
||||
if (typeof window === 'undefined') return;
|
||||
const root = window.document.documentElement;
|
||||
root.classList.remove('light', 'dark');
|
||||
root.classList.add(resolvedTheme);
|
||||
|
||||
@ -6,6 +6,7 @@ export function useIsMobile() {
|
||||
const [isMobile, setIsMobile] = React.useState<boolean | undefined>(undefined)
|
||||
|
||||
React.useEffect(() => {
|
||||
if (typeof window === 'undefined') return;
|
||||
const mql = window.matchMedia(`(max-width: ${MOBILE_BREAKPOINT - 1}px)`)
|
||||
const onChange = () => {
|
||||
setIsMobile(window.innerWidth < MOBILE_BREAKPOINT)
|
||||
|
||||
@ -94,7 +94,9 @@ export function detectLinkType(url: string): UnfurlResult | null {
|
||||
// External URL
|
||||
try {
|
||||
const parsed = new URL(url);
|
||||
const isExternal = !parsed.hostname.includes(window.location.hostname);
|
||||
const isExternal = typeof window === 'undefined'
|
||||
? true
|
||||
: !parsed.hostname.includes(window.location.hostname);
|
||||
return {
|
||||
type: 'external',
|
||||
url,
|
||||
|
||||
@ -235,7 +235,9 @@ export class RoomWsClient {
|
||||
this.reconnectAttempt = 0;
|
||||
this.setStatus('open');
|
||||
this.startHeartbeat();
|
||||
this.resubscribeAll().catch(() => {});
|
||||
this.resubscribeAll().catch((err) => {
|
||||
console.warn('[RoomWs] resubscribe failed:', err);
|
||||
});
|
||||
resolve();
|
||||
};
|
||||
|
||||
@ -1002,7 +1004,9 @@ export class RoomWsClient {
|
||||
sendTyping(roomId: string, action: 'start' | 'stop'): void {
|
||||
if (this.ws && this.status === 'open') {
|
||||
const wsAction = action === 'start' ? 'typing.start' as WsAction : 'typing.stop' as WsAction;
|
||||
this.requestWs<void>(wsAction, { room_id: roomId, typing: action }).catch(() => {});
|
||||
this.requestWs<void>(wsAction, { room_id: roomId, typing: action }).catch((err) => {
|
||||
console.debug('[RoomWs] typing event failed:', err);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -1222,7 +1226,9 @@ export class RoomWsClient {
|
||||
this.wsToken = null;
|
||||
console.debug('[RoomWs] Clearing token after 3 reconnect failures, will fetch fresh');
|
||||
}
|
||||
this.connect().catch(() => {});
|
||||
this.connect().catch(() => {
|
||||
// connect() has its own retry logic; ignore here to avoid duplicate warnings
|
||||
});
|
||||
}, delay);
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,8 +64,8 @@ export class UniversalWsClient {
|
||||
|
||||
// Re-subscribe to rooms after reconnect
|
||||
for (const roomId of this.subscribedRooms) {
|
||||
this.request('room.subscribe', { room_id: roomId }).catch(() => {
|
||||
// ignore re-subscribe errors
|
||||
this.request('room.subscribe', { room_id: roomId }).catch((err) => {
|
||||
console.warn('[UniversalWs] re-subscribe failed:', err);
|
||||
});
|
||||
}
|
||||
|
||||
@ -202,7 +202,7 @@ export class UniversalWsClient {
|
||||
this.reconnectTimer = setTimeout(() => {
|
||||
this.reconnectTimer = null;
|
||||
this.connect().catch(() => {
|
||||
// connect() will retry on its own
|
||||
// connect() has its own retry logic; ignore here to avoid duplicate warnings
|
||||
});
|
||||
}, delay);
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user