use crate::ssh::ReceiveSyncService; use crate::ssh::RepoReceiveSyncTask; use crate::ssh::SshTokenService; use crate::ssh::authz::SshAuthService; use crate::ssh::branch_protect::check_branch_protection; use crate::ssh::forward::forward; use crate::ssh::git_service::{GitService, build_git_command, parse_git_command, parse_repo_path}; use crate::ssh::ref_update::RefUpdate; use db::cache::AppCache; use db::database::AppDatabase; use models::repos::{repo, repo_branch_protect}; use models::users::user; use russh::keys::{Certificate, PublicKey}; use russh::server::{Auth, Msg, Session}; use russh::{Channel, ChannelId, Disconnect}; use sea_orm::ColumnTrait; use sea_orm::EntityTrait; use sea_orm::QueryFilter; use std::collections::{HashMap, HashSet}; use std::io; use std::net::SocketAddr; use std::path::PathBuf; use std::process::Stdio; use std::time::Duration; use tokio_util::bytes::Bytes; const PRE_PACK_LIMIT: usize = 1_048_576; use tokio::io::AsyncWriteExt; use tokio::process::ChildStdin; use tokio::sync::mpsc::Sender; use tokio::time::sleep; pub struct SSHandle { pub repo: Option, pub model: Option, pub stdin: HashMap, pub eof: HashMap>, pub operator: Option, pub db: AppDatabase, pub auth: SshAuthService, pub buffer: HashMap>, pub branch: HashMap>, pub service: Option, pub cache: AppCache, pub sync: ReceiveSyncService, pub upload_pack_eof_sent: HashSet, pub token_service: SshTokenService, pub client_addr: Option, } impl SSHandle { pub fn new( db: AppDatabase, cache: AppCache, sync: ReceiveSyncService, token_service: SshTokenService, client_addr: Option, ) -> Self { let auth = SshAuthService::new(db.clone()); let addr_str = client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); tracing::info!("SSH handler created client={}", addr_str); Self { repo: None, model: None, stdin: HashMap::new(), eof: HashMap::new(), operator: None, db, auth, buffer: HashMap::new(), branch: HashMap::new(), service: None, cache, sync, upload_pack_eof_sent: HashSet::new(), token_service, client_addr, } } fn cleanup_channel(&mut self, channel_id: ChannelId) { if let Some(stdin) = self.stdin.remove(&channel_id) { let channel_id_for_task = channel_id; tokio::spawn(async move { let _ = tokio::time::timeout(Duration::from_secs(5), async { let mut stdin = stdin; if let Err(e) = stdin.flush().await { tracing::warn!(error = %e, "ssh_cleanup_flush_failed channel={:?}", channel_id_for_task); } let _ = stdin.shutdown().await; }) .await; }); } self.eof.remove(&channel_id); self.upload_pack_eof_sent.remove(&channel_id); } } impl Drop for SSHandle { fn drop(&mut self) { let addr_str = self .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); tracing::info!("ssh_handler_dropped client={}", addr_str); let channel_ids: Vec<_> = self.stdin.keys().copied().collect(); for channel_id in channel_ids { self.cleanup_channel(channel_id); } } } impl russh::server::Handler for SSHandle { type Error = russh::Error; async fn auth_none(&mut self, user: &str) -> Result { let client_info = self .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); tracing::info!("auth_none_received user={} client={}", user, client_info); Ok(Auth::UnsupportedMethod) } async fn auth_password(&mut self, _user: &str, token: &str) -> Result { let client_info = self .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); if token.is_empty() { tracing::warn!("auth_rejected_empty_token client={}", client_info); return Err(russh::Error::NotAuthenticated); } tracing::info!("auth_token_attempt client={}", client_info); let user_model = match self.token_service.find_user_by_token(token).await { Ok(Some(model)) => model, Ok(None) => { tracing::warn!("auth_rejected_token_not_found client={}", client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { tracing::error!("auth_token_error error={}", e); return Err(russh::Error::NotAuthenticated); } }; tracing::info!( "auth_token_success user={} client={}", user_model.username, client_info ); self.operator = Some(user_model); Ok(Auth::Accept) } async fn auth_publickey_offered( &mut self, user: &str, public_key: &PublicKey, ) -> Result { let client_info = self .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); if user != "git" { tracing::warn!( "auth_publickey_offer_rejected_invalid_username user={} client={}", user, client_info ); return Err(russh::Error::NotAuthenticated); } let public_key_str = public_key.to_string(); if public_key_str.len() < 32 { tracing::warn!( "auth_publickey_offer_rejected_invalid_key_length key_length={}", public_key_str.len() ); return Err(russh::Error::NotAuthenticated); } tracing::info!("auth_publickey_offer client={}", client_info); match self.auth.find_user_by_public_key(&public_key_str).await { Ok(Some(key_user)) => { tracing::info!( "auth_publickey_offer_accepted user={} key={} client={}", key_user.user.username, key_user.key_title, client_info ); Ok(Auth::Accept) } Ok(None) => { tracing::warn!( "auth_publickey_offer_rejected_key_not_found client={}", client_info ); Err(russh::Error::NotAuthenticated) } Err(e) => { tracing::error!("auth_publickey_offer_error error={}", e); Err(russh::Error::NotAuthenticated) } } } async fn auth_publickey( &mut self, user: &str, public_key: &PublicKey, ) -> Result { let client_info = self .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); if user != "git" { tracing::warn!( "auth_rejected_invalid_username user={} client={}", user, client_info ); return Err(russh::Error::NotAuthenticated); } let public_key_str = public_key.to_string(); if public_key_str.len() < 32 { tracing::warn!( "auth_rejected_invalid_key_length key_length={}", public_key_str.len() ); return Err(russh::Error::NotAuthenticated); } tracing::info!("auth_publickey_attempt client={}", client_info); let key_user = match self.auth.find_user_by_public_key(&public_key_str).await { Ok(Some(key_user)) => key_user, Ok(None) => { tracing::warn!("auth_rejected_key_not_found client={}", client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { tracing::error!("auth_publickey_error error={}", e); return Err(russh::Error::NotAuthenticated); } }; tracing::info!( "auth_publickey_success user={} client={}", key_user.user.username, client_info ); self.auth.update_key_last_used_async(key_user.key_id); self.operator = Some(key_user.user); Ok(Auth::Accept) } async fn auth_openssh_certificate( &mut self, user: &str, certificate: &Certificate, ) -> Result { let client_info = self .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); if user != "git" { tracing::warn!( "auth_rejected_invalid_username user={} client={}", user, client_info ); return Err(russh::Error::NotAuthenticated); } let public_key_str = certificate.to_string(); if public_key_str.len() < 32 { tracing::warn!( "auth_rejected_invalid_key_length key_length={}", public_key_str.len() ); return Err(russh::Error::NotAuthenticated); } tracing::info!("auth_publickey_attempt client={}", client_info); let key_user = match self.auth.find_user_by_public_key(&public_key_str).await { Ok(Some(key_user)) => key_user, Ok(None) => { tracing::warn!("auth_rejected_key_not_found client={}", client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { tracing::error!("auth_publickey_error error={}", e); return Err(russh::Error::NotAuthenticated); } }; tracing::info!( "auth_publickey_success user={} client={}", key_user.user.username, client_info ); self.auth.update_key_last_used_async(key_user.key_id); self.operator = Some(key_user.user); Ok(Auth::Accept) } async fn authentication_banner(&mut self) -> Result, Self::Error> { Ok(None) } async fn channel_close( &mut self, channel: ChannelId, _: &mut Session, ) -> Result<(), Self::Error> { tracing::info!( "channel_close channel={:?} client={:?}", channel, self.client_addr ); self.cleanup_channel(channel); Ok(()) } async fn channel_eof( &mut self, channel: ChannelId, _: &mut Session, ) -> Result<(), Self::Error> { tracing::info!( "channel_eof channel={:?} client={:?}", channel, self.client_addr ); if let Some(eof) = self.eof.get(&channel) { let _ = eof.send(true).await; } if let Some(mut stdin) = self.stdin.remove(&channel) { tracing::info!( "Closing stdin channel={:?} client={:?}", channel, self.client_addr ); // Use timeout so we never block the SSH event loop waiting for git. let _ = tokio::time::timeout(Duration::from_secs(5), async { if let Err(e) = stdin.flush().await { tracing::warn!(error = %e, "ssh_eof_flush_failed channel={:?}", channel); } let _ = stdin.shutdown().await; }) .await; tracing::info!( "stdin closed channel={:?} client={:?}", channel, self.client_addr ); } else { tracing::warn!( "stdin already removed channel={:?} client={:?}", channel, self.client_addr ); } Ok(()) } async fn channel_open_session( &mut self, channel: Channel, session: &mut Session, ) -> Result { let client_info = self .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); tracing::info!( "channel_open_session channel={:?} client={}", channel, client_info ); if let Err(e) = session.flush() { tracing::warn!(error = %e, "ssh_session_flush_failed"); } Ok(true) } async fn pty_request( &mut self, channel: ChannelId, term: &str, col_width: u32, row_height: u32, _pix_width: u32, _pix_height: u32, _modes: &[(russh::Pty, u32)], session: &mut Session, ) -> Result<(), Self::Error> { tracing::warn!( "pty_request not supported channel={:?} term={} cols={} rows={}", channel, term, col_width, row_height ); if let Err(e) = session.flush() { tracing::warn!(error = %e, "ssh_session_flush_failed"); } Ok(()) } async fn subsystem_request( &mut self, channel: ChannelId, name: &str, session: &mut Session, ) -> Result<(), Self::Error> { tracing::info!("subsystem_request channel={:?} subsystem={}", channel, name); // git-clients may send "subsystem" for git protocol over ssh. // We don't use subsystem; exec_request handles it directly. if let Err(e) = session.flush() { tracing::warn!(error = %e, "ssh_session_flush_failed"); } Ok(()) } async fn data( &mut self, channel: ChannelId, data: &[u8], session: &mut Session, ) -> Result<(), Self::Error> { if matches!(self.service, Some(GitService::ReceivePack)) { if !self.branch.contains_key(&channel) { let bf = self.buffer.entry(channel).or_default(); // Reject oversized pre-PACK data to prevent memory exhaustion if bf.len() + data.len() > PRE_PACK_LIMIT { tracing::warn!("ssh_pre_pack_too_large channel={:?}", channel); let msg = "remote: Ref negotiation exceeds size limit\r\n"; let _ = session.extended_data(channel, 1, Bytes::copy_from_slice(msg.as_bytes())); let _ = session.exit_status_request(channel, 1); let _ = session.eof(channel); let _ = session.close(channel); self.cleanup_channel(channel); return Ok(()); } bf.extend_from_slice(data); if !bf.windows(4).any(|w| w == b"0000") { return Ok(()); } let buffered = self.buffer.remove(&channel).unwrap_or_default(); match RefUpdate::parse_ref_updates(&buffered) { Ok(refs) => { if let Some(model) = &self.model { let branch_protect_roles = repo_branch_protect::Entity::find() .filter(repo_branch_protect::Column::Repo.eq(model.id)) .all(self.db.reader()) .await .map_err(|e| { russh::Error::IO(io::Error::new(io::ErrorKind::Other, e)) })?; for r#ref in &refs { if let Some(msg) = check_branch_protection(&branch_protect_roles, r#ref) { let full_msg = format!("remote: {}\r\n", msg); let _ = session.extended_data( channel, 1, Bytes::copy_from_slice(full_msg.as_bytes()), ); let _ = session.exit_status_request(channel, 1); let _ = session.eof(channel); let _ = session.close(channel); self.cleanup_channel(channel); return Ok(()); } } } self.branch.insert(channel, refs); } Err(e) => { tracing::warn!("ref_update_parse_error error={:?}", e); self.branch.insert(channel, vec![]); } } if let Some(stdin) = self.stdin.get_mut(&channel) { stdin.write_all(&buffered).await?; stdin.flush().await?; } else { tracing::error!("stdin_not_found channel={:?}", channel); } return Ok(()); } if let Some(stdin) = self.stdin.get_mut(&channel) { stdin.write_all(data).await?; stdin.flush().await?; } else { tracing::error!("stdin_not_found_forwarding channel={:?}", channel); } return Ok(()); } if let Some(stdin) = self.stdin.get_mut(&channel) { stdin.write_all(data).await?; if matches!(self.service, Some(GitService::UploadPack)) && !self.upload_pack_eof_sent.contains(&channel) { let has_flush_pkt = data.windows(4).any(|w| w == b"0000"); if has_flush_pkt { stdin.flush().await?; let _ = stdin.shutdown().await; self.upload_pack_eof_sent.insert(channel); } } } Ok(()) } async fn shell_request( &mut self, channel_id: ChannelId, session: &mut Session, ) -> Result<(), Self::Error> { if let Some(user) = &self.operator { let welcome_msg = format!( "Hi {}! You've successfully authenticated, but GitData does not provide shell access.\r\n", user.username ); tracing::info!("shell_request user={}", user.username); let _ = session.data(channel_id, Bytes::copy_from_slice(welcome_msg.as_bytes())); let _ = session.exit_status_request(channel_id, 0); let _ = session.eof(channel_id); let _ = session.close(channel_id); let _ = session.flush(); } else { tracing::warn!("shell_request_unauthenticated channel={:?}", channel_id); let msg = "Authentication required\r\n"; let _ = session.data(channel_id, Bytes::copy_from_slice(msg.as_bytes())); let _ = session.exit_status_request(channel_id, 1); let _ = session.eof(channel_id); let _ = session.close(channel_id); let _ = session.flush(); } Ok(()) } async fn exec_request( &mut self, channel_id: ChannelId, data: &[u8], session: &mut Session, ) -> Result<(), Self::Error> { let client_info = self .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); tracing::info!( "exec_request received channel={:?} client={}", channel_id, client_info ); let git_shell_cmd = match std::str::from_utf8(data) { Ok(cmd) => cmd.trim(), Err(e) => { tracing::error!("invalid_command_encoding error={}", e); let _ = session.disconnect( Disconnect::ServiceNotAvailable, "Invalid command encoding", "", ); return Err(russh::Error::Disconnect); } }; let (service, path) = match parse_git_command(git_shell_cmd) { Some((s, p)) => (s, p), None => { tracing::error!("invalid_git_command command={}", git_shell_cmd); let msg = format!("Invalid git command: {}", git_shell_cmd); let _ = session.disconnect(Disconnect::ServiceNotAvailable, &msg, ""); return Err(russh::Error::Disconnect); } }; self.service = Some(service); let (owner, repo) = match parse_repo_path(path) { Some(pair) => pair, None => { let msg = format!("Invalid repository path: {}", path); tracing::error!("invalid_repo_path path={}", path); let _ = session.disconnect(Disconnect::ServiceNotAvailable, &msg, ""); return Err(russh::Error::Disconnect); } }; let repo = repo.strip_suffix(".git").unwrap_or(repo).to_string(); let repo = match self.auth.find_repo(owner, &repo).await { Ok(repo) => repo, Err(e) => { // Log the detailed error internally; client receives generic message. tracing::error!("repo_fetch_error error={}", e); let _ = session.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", ""); return Err(russh::Error::Disconnect); } }; self.model = Some(repo.clone()); let operator = match &self.operator { Some(user) => user, None => { let msg = "Authentication error: no authenticated user"; tracing::error!("exec_no_authenticated_user channel={:?}", channel_id); let _ = session.disconnect(Disconnect::ByApplication, msg, ""); return Err(russh::Error::Disconnect); } }; let is_write = service == GitService::ReceivePack; let has_permission = self .auth .check_repo_permission(operator, &repo, is_write) .await; if !has_permission { let msg = format!( "Access denied: user '{}' does not have {} permission for repository {}", operator.username, if is_write { "write" } else { "read" }, repo.repo_name ); tracing::error!( "access_denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write ); let _ = session.disconnect(Disconnect::ByApplication, &msg, ""); return Err(russh::Error::Disconnect); } tracing::info!( "access_granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write ); let repo_path = PathBuf::from(&repo.storage_path); if !repo_path.exists() { tracing::error!("repo_path_not_found path={}", repo.storage_path); } let mut cmd = build_git_command(service, repo_path); tracing::info!( "spawn_git_process service={:?} path={}", service, repo.storage_path ); let mut shell = match cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(shell) => { let _ = session.channel_success(channel_id); shell } Err(e) => { tracing::error!("process_spawn_failed error={}", e); let _ = session.channel_failure(channel_id); self.cleanup_channel(channel_id); return Err(russh::Error::IO(e)); } }; let session_handle = session.handle(); let stdin = match shell.stdin.take() { Some(s) => s, None => { tracing::error!("stdin pipe unavailable for channel={:?}", channel_id); let _ = session_handle.channel_failure(channel_id).await; return Err(russh::Error::IO(io::Error::new( io::ErrorKind::Other, "stdin unavailable", ))); } }; self.stdin.insert(channel_id, stdin); let mut shell_stdout = match shell.stdout.take() { Some(s) => s, None => { tracing::error!("stdout pipe unavailable for channel={:?}", channel_id); return Err(russh::Error::IO(io::Error::new( io::ErrorKind::Other, "stdout unavailable", ))); } }; let mut shell_stderr = match shell.stderr.take() { Some(s) => s, None => { tracing::error!("stderr pipe unavailable for channel={:?}", channel_id); return Err(russh::Error::IO(io::Error::new( io::ErrorKind::Other, "stderr unavailable", ))); } }; let (eof_tx, mut eof_rx) = tokio::sync::mpsc::channel::(10); self.eof.insert(channel_id, eof_tx); let repo_uid = repo.id; let should_sync = service == GitService::ReceivePack; let sync = self.sync.clone(); let fut = async move { tracing::info!(channel = ?channel_id, "git_task_started"); let mut stdout_done = false; let mut stderr_done = false; let stdout_fut = forward( &session_handle, channel_id, &mut shell_stdout, |handle, chan, data| async move { handle.data(chan, data).await }, ); tokio::pin!(stdout_fut); let stderr_fut = forward( &session_handle, channel_id, &mut shell_stderr, |handle, chan, data| async move { handle.extended_data(chan, 1, data).await }, ); tokio::pin!(stderr_fut); loop { tokio::select! { result = shell.wait() => { let status = result?; let status_code = status.code().unwrap_or(128) as u32; tracing::info!("git_process_exited channel={:?} status={}", channel_id, status_code); if !stdout_done || !stderr_done { let _ = tokio::time::timeout(Duration::from_millis(100), async { tokio::join!( async { if !stdout_done { let _ = (&mut stdout_fut).await; } }, async { if !stderr_done { let _ = (&mut stderr_fut).await; } } ); }).await; } if should_sync { let sync = sync.clone(); tokio::spawn(async move { sync.send(RepoReceiveSyncTask { repo_uid }).await }); } let _ = session_handle.exit_status_request(channel_id, status_code).await; sleep(Duration::from_millis(50)).await; let _ = session_handle.eof(channel_id).await; let _ = session_handle.close(channel_id).await; tracing::info!(channel = ?channel_id, "channel_closed"); break; } result = &mut stdout_fut, if !stdout_done => { tracing::info!("stdout completed"); stdout_done = true; if let Err(e) = result { tracing::warn!(error = ?e, "stdout_forward_error"); } } result = &mut stderr_fut, if !stderr_done => { tracing::info!("stderr completed"); stderr_done = true; if let Err(e) = result { tracing::warn!(error = ?e, "stderr_forward_error"); } } } } Ok::<(), russh::Error>(()) }; tokio::spawn(async move { if let Err(e) = fut.await { tracing::error!("git_ssh_channel_task_error error={}", e); } while eof_rx.recv().await.is_some() {} }); Ok(()) } }