Model sync: - Filter OpenRouter models by what the user's AI client can actually access, before upserting metadata (avoids bloating with inaccessible models). - Fall back to direct endpoint sync when no OpenRouter metadata matches (handles Bailian/MiniMax and other non-OpenRouter providers). Git stability fixes: - SSH: add 5s timeout on stdin flush/shutdown in channel_eof and cleanup_channel to prevent blocking the event loop on unresponsive git. - SSH: remove dbg!() calls from production code paths. - HTTP auth: pass proper Logger to SshAuthService instead of discarding all auth events to slog::Discard. Dependencies: - reqwest: add native-tls feature for HTTPS on Windows/Linux/macOS.
914 lines
32 KiB
Rust
914 lines
32 KiB
Rust
use crate::ssh::ReceiveSyncService;
|
|
use crate::ssh::RepoReceiveSyncTask;
|
|
use crate::ssh::SshTokenService;
|
|
use crate::ssh::authz::SshAuthService;
|
|
use db::cache::AppCache;
|
|
use db::database::AppDatabase;
|
|
use models::repos::{repo, repo_branch_protect};
|
|
use models::users::user;
|
|
use russh::keys::{Certificate, PublicKey};
|
|
use russh::server::{Auth, Handle, Msg, Session};
|
|
use russh::{Channel, ChannelId, CryptoVec, Disconnect};
|
|
use sea_orm::ColumnTrait;
|
|
use sea_orm::EntityTrait;
|
|
use sea_orm::QueryFilter;
|
|
use slog::{Logger, error, info, warn};
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::io;
|
|
use std::net::SocketAddr;
|
|
use std::path::PathBuf;
|
|
use std::process::Stdio;
|
|
use std::str::FromStr;
|
|
use std::time::Duration;
|
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
|
|
use tokio::process::ChildStdin;
|
|
use tokio::sync::mpsc::Sender;
|
|
use tokio::time::sleep;
|
|
|
|
/// One reference update requested by a pushing client.
///
/// The fields hold the raw whitespace-separated tokens of a command line;
/// the object ids are not validated here.
#[derive(Clone, Debug)]
pub struct RefUpdate {
    /// Reference name (third token of the command line).
    pub name: String,
    /// First token of the command line.
    pub old_oid: String,
    /// Second token of the command line.
    pub new_oid: String,
}

impl RefUpdate {
    /// Parse git reference update commands from SSH protocol text.
    ///
    /// Format: "<old-oid> <new-oid> <ref-name>\n"
    ///
    /// The input is decoded lossily as UTF-8 and processed line by line.
    /// Blank lines, lines starting with `#` and lines starting with `PACK`
    /// are skipped, as are lines without a third token.
    ///
    /// Git appends a capability list to the first command, separated from
    /// the ref name by a NUL byte (`<ref-name>\0<capabilities>`). Everything
    /// from the first NUL onwards is dropped so `name` contains only the
    /// reference name. Leading NUL bytes are still ignored as before.
    ///
    /// No pkt-line length prefix is removed: the first token is stored in
    /// `old_oid` exactly as it appears on the line.
    ///
    /// # Errors
    ///
    /// The current implementation never returns `Err`; the `Result` is kept
    /// for interface compatibility.
    pub fn parse_ref_updates(data: &[u8]) -> Result<Vec<Self>, String> {
        let text = String::from_utf8_lossy(data);

        let refs = text
            .lines()
            .map(str::trim)
            .filter(|line| {
                !(line.is_empty() || line.starts_with('#') || line.starts_with("PACK"))
            })
            .filter_map(|line| {
                let mut parts = line.split_whitespace();
                let old_oid = parts.next().unwrap_or_default().to_string();
                let new_oid = parts.next().unwrap_or_default().to_string();

                // Cut the token at the first interior NUL so the capability
                // list never leaks into the reference name.
                let name = parts
                    .next()
                    .unwrap_or_default()
                    .trim_start_matches('\0')
                    .split('\0')
                    .next()
                    .unwrap_or_default();

                (!name.is_empty()).then(|| RefUpdate {
                    old_oid,
                    new_oid,
                    name: name.to_string(),
                })
            })
            .collect();

        Ok(refs)
    }
}
|
|
|
|
pub struct SSHandle {
|
|
pub repo: Option<PathBuf>,
|
|
pub model: Option<repo::Model>,
|
|
pub stdin: HashMap<ChannelId, ChildStdin>,
|
|
pub eof: HashMap<ChannelId, Sender<bool>>,
|
|
pub operator: Option<user::Model>,
|
|
pub db: AppDatabase,
|
|
pub auth: SshAuthService,
|
|
pub buffer: HashMap<ChannelId, Vec<u8>>,
|
|
pub branch: HashMap<ChannelId, Vec<RefUpdate>>,
|
|
pub service: Option<GitService>,
|
|
pub cache: AppCache,
|
|
pub sync: ReceiveSyncService,
|
|
pub upload_pack_eof_sent: HashSet<ChannelId>,
|
|
pub logger: Logger,
|
|
pub token_service: SshTokenService,
|
|
pub client_addr: Option<SocketAddr>,
|
|
}
|
|
|
|
impl SSHandle {
|
|
pub fn new(
|
|
db: AppDatabase,
|
|
cache: AppCache,
|
|
sync: ReceiveSyncService,
|
|
logger: Logger,
|
|
token_service: SshTokenService,
|
|
client_addr: Option<SocketAddr>,
|
|
) -> Self {
|
|
let auth = SshAuthService::new(db.clone(), logger.clone());
|
|
let addr_str = client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
info!(logger, "SSH handler created for client: {}", addr_str);
|
|
Self {
|
|
repo: None,
|
|
model: None,
|
|
stdin: HashMap::new(),
|
|
eof: HashMap::new(),
|
|
operator: None,
|
|
db,
|
|
auth,
|
|
buffer: HashMap::new(),
|
|
branch: HashMap::new(),
|
|
service: None,
|
|
cache,
|
|
sync,
|
|
upload_pack_eof_sent: HashSet::new(),
|
|
logger,
|
|
token_service,
|
|
client_addr,
|
|
}
|
|
}
|
|
|
|
fn cleanup_channel(&mut self, channel_id: ChannelId) {
|
|
if let Some(mut stdin) = self.stdin.remove(&channel_id) {
|
|
tokio::spawn(async move {
|
|
let _ = tokio::time::timeout(Duration::from_secs(5), async {
|
|
stdin.flush().await.ok();
|
|
let _ = stdin.shutdown().await;
|
|
})
|
|
.await;
|
|
});
|
|
}
|
|
self.eof.remove(&channel_id);
|
|
self.upload_pack_eof_sent.remove(&channel_id);
|
|
}
|
|
}
|
|
|
|
impl Drop for SSHandle {
|
|
fn drop(&mut self) {
|
|
let addr_str = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
info!(self.logger, "SSH handler dropped for client: {}", addr_str);
|
|
|
|
let channel_ids: Vec<_> = self.stdin.keys().copied().collect();
|
|
for channel_id in channel_ids {
|
|
self.cleanup_channel(channel_id);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl russh::server::Handler for SSHandle {
|
|
type Error = russh::Error;
|
|
|
|
async fn auth_none(&mut self, user: &str) -> Result<Auth, Self::Error> {
|
|
let client_info = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
info!(
|
|
self.logger,
|
|
"auth_none received for user '{}', client: {}", user, client_info
|
|
);
|
|
Ok(Auth::UnsupportedMethod)
|
|
}
|
|
|
|
async fn auth_password(&mut self, _user: &str, token: &str) -> Result<Auth, Self::Error> {
|
|
let client_info = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
|
|
|
|
if token.is_empty() {
|
|
warn!(
|
|
self.logger,
|
|
"auth_password rejected: empty token, client: {}", client_info
|
|
);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
|
|
info!(
|
|
self.logger,
|
|
"Attempting SSH token authentication, client: {}", client_info
|
|
);
|
|
|
|
let user_model = match self.token_service.find_user_by_token(token).await {
|
|
Ok(Some(model)) => model,
|
|
Ok(None) => {
|
|
warn!(
|
|
self.logger,
|
|
"SSH token auth rejected: token not found or expired, client: {}", client_info
|
|
);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
Err(e) => {
|
|
error!(
|
|
self.logger,
|
|
"SSH token auth error: {}, client: {}", e, client_info
|
|
);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
};
|
|
|
|
info!(
|
|
self.logger,
|
|
"SSH token authentication successful: user={}, client={}",
|
|
user_model.username,
|
|
client_info
|
|
);
|
|
self.operator = Some(user_model);
|
|
Ok(Auth::Accept)
|
|
}
|
|
async fn auth_publickey_offered(
|
|
&mut self,
|
|
user: &str,
|
|
public_key: &PublicKey,
|
|
) -> Result<Auth, Self::Error> {
|
|
self.auth_publickey(user, public_key).await
|
|
}
|
|
async fn auth_publickey(
|
|
&mut self,
|
|
user: &str,
|
|
public_key: &PublicKey,
|
|
) -> Result<Auth, Self::Error> {
|
|
let client_info = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
|
|
if user != "git" {
|
|
let msg = format!(
|
|
"SSH auth rejected: invalid username '{}', client: {}",
|
|
user, client_info
|
|
);
|
|
warn!(self.logger, "{}", msg);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
let public_key_str = public_key.to_string();
|
|
if public_key_str.len() < 32 {
|
|
let msg = format!(
|
|
"SSH auth rejected: invalid public key length ({}), client: {}",
|
|
public_key_str.len(),
|
|
client_info
|
|
);
|
|
warn!(self.logger, "{}", msg);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
|
|
info!(
|
|
self.logger,
|
|
"Attempting SSH authentication with public key, client: {}", client_info
|
|
);
|
|
let user_model = match self.auth.find_user_by_public_key(&public_key_str).await {
|
|
Ok(Some(model)) => model,
|
|
Ok(None) => {
|
|
let msg = format!(
|
|
"SSH auth rejected: public key not found or invalid, client: {}",
|
|
client_info
|
|
);
|
|
warn!(self.logger, "{}", msg);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
Err(e) => {
|
|
let msg = format!("SSH auth error: {}, client: {}", e, client_info);
|
|
error!(self.logger, "{}", msg);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
};
|
|
|
|
info!(
|
|
self.logger,
|
|
"SSH authentication successful: user={}, client={}", user_model.username, client_info
|
|
);
|
|
self.operator = Some(user_model);
|
|
Ok(Auth::Accept)
|
|
}
|
|
async fn auth_openssh_certificate(
|
|
&mut self,
|
|
user: &str,
|
|
certificate: &Certificate,
|
|
) -> Result<Auth, Self::Error> {
|
|
let client_info = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
|
|
if user != "git" {
|
|
let msg = format!(
|
|
"SSH auth rejected: invalid username '{}', client: {}",
|
|
user, client_info
|
|
);
|
|
warn!(self.logger, "{}", msg);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
let public_key_str = certificate.to_string();
|
|
if public_key_str.len() < 32 {
|
|
let msg = format!(
|
|
"SSH auth rejected: invalid public key length ({}), client: {}",
|
|
public_key_str.len(),
|
|
client_info
|
|
);
|
|
warn!(self.logger, "{}", msg);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
|
|
info!(
|
|
self.logger,
|
|
"Attempting SSH authentication with public key, client: {}", client_info
|
|
);
|
|
let user_model = match self.auth.find_user_by_public_key(&public_key_str).await {
|
|
Ok(Some(model)) => model,
|
|
Ok(None) => {
|
|
let msg = format!(
|
|
"SSH auth rejected: public key not found or invalid, client: {}",
|
|
client_info
|
|
);
|
|
warn!(self.logger, "{}", msg);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
Err(e) => {
|
|
let msg = format!("SSH auth error: {}, client: {}", e, client_info);
|
|
error!(self.logger, "{}", msg);
|
|
return Err(russh::Error::NotAuthenticated);
|
|
}
|
|
};
|
|
|
|
info!(
|
|
self.logger,
|
|
"SSH authentication successful: user={}, client={}", user_model.username, client_info
|
|
);
|
|
self.operator = Some(user_model);
|
|
Ok(Auth::Accept)
|
|
}
|
|
/// No authentication banner is sent to clients.
async fn authentication_banner(&mut self) -> Result<Option<String>, Self::Error> {
    let banner: Option<String> = None;
    Ok(banner)
}
|
|
|
|
async fn channel_close(
|
|
&mut self,
|
|
channel: ChannelId,
|
|
_: &mut Session,
|
|
) -> Result<(), Self::Error> {
|
|
info!(self.logger, "{}", format!("channel_close channel={:?} client={:?}", channel, self.client_addr));
|
|
self.cleanup_channel(channel);
|
|
Ok(())
|
|
}
|
|
|
|
async fn channel_eof(
|
|
&mut self,
|
|
channel: ChannelId,
|
|
_: &mut Session,
|
|
) -> Result<(), Self::Error> {
|
|
info!(self.logger, "channel_eof";
|
|
"channel" => ?channel,
|
|
"client" => ?self.client_addr
|
|
);
|
|
|
|
if let Some(eof) = self.eof.get(&channel) {
|
|
let _ = eof.send(true).await;
|
|
}
|
|
|
|
if let Some(mut stdin) = self.stdin.remove(&channel) {
|
|
info!(self.logger, "Closing stdin";
|
|
"channel" => ?channel,
|
|
"client" => ?self.client_addr
|
|
);
|
|
// Use timeout so we never block the SSH event loop waiting for git.
|
|
let _ = tokio::time::timeout(Duration::from_secs(5), async {
|
|
stdin.flush().await.ok();
|
|
let _ = stdin.shutdown().await;
|
|
})
|
|
.await;
|
|
info!(self.logger, "stdin closed";
|
|
"channel" => ?channel,
|
|
"client" => ?self.client_addr
|
|
);
|
|
} else {
|
|
warn!(self.logger, "stdin already removed";
|
|
"channel" => ?channel,
|
|
"client" => ?self.client_addr
|
|
);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn channel_open_session(
|
|
&mut self,
|
|
channel: Channel<Msg>,
|
|
session: &mut Session,
|
|
) -> Result<bool, Self::Error> {
|
|
let client_info = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
info!(self.logger, "{}", format!("channel_open_session channel={:?} client={}", channel, client_info));
|
|
let _ = session.flush().ok();
|
|
Ok(true)
|
|
}
|
|
|
|
async fn pty_request(
|
|
&mut self,
|
|
channel: ChannelId,
|
|
term: &str,
|
|
col_width: u32,
|
|
row_height: u32,
|
|
_pix_width: u32,
|
|
_pix_height: u32,
|
|
_modes: &[(russh::Pty, u32)],
|
|
session: &mut Session,
|
|
) -> Result<(), Self::Error> {
|
|
let client_info = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
warn!(self.logger, "{}", format!("pty_request not supported channel={:?} term={} cols={} rows={} client={}", channel, term, col_width, row_height, client_info));
|
|
let _ = session.flush().ok();
|
|
Ok(())
|
|
}
|
|
|
|
async fn subsystem_request(
|
|
&mut self,
|
|
channel: ChannelId,
|
|
name: &str,
|
|
session: &mut Session,
|
|
) -> Result<(), Self::Error> {
|
|
let client_info = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
info!(self.logger, "{}", format!("subsystem_request channel={:?} subsystem={} client={}", channel, name, client_info));
|
|
// git-clients may send "subsystem" for git protocol over ssh.
|
|
// We don't use subsystem; exec_request handles it directly.
|
|
let _ = session.flush().ok();
|
|
Ok(())
|
|
}
|
|
async fn data(
|
|
&mut self,
|
|
channel: ChannelId,
|
|
data: &[u8],
|
|
session: &mut Session,
|
|
) -> Result<(), Self::Error> {
|
|
if matches!(self.service, Some(GitService::ReceivePack)) {
|
|
if !self.branch.contains_key(&channel) {
|
|
let bf = self.buffer.entry(channel).or_default();
|
|
bf.extend_from_slice(data);
|
|
|
|
if !bf.windows(4).any(|w| w == b"0000") {
|
|
return Ok(());
|
|
}
|
|
|
|
let buffered = self.buffer.remove(&channel).unwrap_or_default();
|
|
|
|
match RefUpdate::parse_ref_updates(&buffered) {
|
|
Ok(refs) => {
|
|
if let Some(model) = &self.model {
|
|
let branch_protect_roles = repo_branch_protect::Entity::find()
|
|
.filter(repo_branch_protect::Column::Repo.eq(model.id))
|
|
.all(self.db.reader())
|
|
.await
|
|
.map_err(|e| {
|
|
russh::Error::IO(io::Error::new(io::ErrorKind::Other, e))
|
|
})?;
|
|
|
|
for r#ref in &refs {
|
|
if branch_protect_roles
|
|
.iter()
|
|
.any(|x| r#ref.name.starts_with(&x.branch))
|
|
{
|
|
let msg =
|
|
format!("remote: Branch '{}' is protected\r\n", r#ref.name);
|
|
let _ = session.extended_data(
|
|
channel,
|
|
1,
|
|
CryptoVec::from_slice(msg.as_bytes()),
|
|
);
|
|
let _ = session.exit_status_request(channel, 1);
|
|
let _ = session.eof(channel);
|
|
let _ = session.close(channel);
|
|
self.cleanup_channel(channel);
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
self.branch.insert(channel, refs);
|
|
}
|
|
Err(e) => {
|
|
warn!(self.logger, "{}", format!("Failed to parse ref updates, forwarding raw data error={:?}", e));
|
|
self.branch.insert(channel, vec![]);
|
|
}
|
|
}
|
|
|
|
if let Some(stdin) = self.stdin.get_mut(&channel) {
|
|
stdin.write_all(&buffered).await?;
|
|
stdin.flush().await?;
|
|
} else {
|
|
error!(self.logger, "{}", format!("stdin not found channel={:?}", channel));
|
|
}
|
|
return Ok(());
|
|
}
|
|
|
|
if let Some(stdin) = self.stdin.get_mut(&channel) {
|
|
stdin.write_all(data).await?;
|
|
stdin.flush().await?;
|
|
} else {
|
|
error!(self.logger, "{}", format!("stdin not found (forwarding) channel={:?}", channel));
|
|
}
|
|
return Ok(());
|
|
}
|
|
|
|
if let Some(stdin) = self.stdin.get_mut(&channel) {
|
|
stdin.write_all(data).await?;
|
|
if matches!(self.service, Some(GitService::UploadPack))
|
|
&& !self.upload_pack_eof_sent.contains(&channel)
|
|
{
|
|
let has_flush_pkt = data.windows(4).any(|w| w == b"0000");
|
|
if has_flush_pkt {
|
|
stdin.flush().await?;
|
|
let _ = stdin.shutdown().await;
|
|
self.upload_pack_eof_sent.insert(channel);
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
async fn shell_request(
|
|
&mut self,
|
|
channel_id: ChannelId,
|
|
session: &mut Session,
|
|
) -> Result<(), Self::Error> {
|
|
if let Some(user) = &self.operator {
|
|
let welcome_msg = format!(
|
|
"Hi {}! You've successfully authenticated, but GitData does not provide shell access.\r\n",
|
|
user.username
|
|
);
|
|
|
|
info!(self.logger, "{}", format!("Shell request user={}", user.username));
|
|
session
|
|
.data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes()))
|
|
.ok();
|
|
session.exit_status_request(channel_id, 0).ok();
|
|
session.eof(channel_id).ok();
|
|
session.close(channel_id).ok();
|
|
let _ = session.flush().ok();
|
|
} else {
|
|
warn!(self.logger, "Shell request without authentication");
|
|
let msg = "Authentication required\r\n";
|
|
session
|
|
.data(channel_id, CryptoVec::from_slice(msg.as_bytes()))
|
|
.ok();
|
|
session.exit_status_request(channel_id, 1).ok();
|
|
session.eof(channel_id).ok();
|
|
session.close(channel_id).ok();
|
|
let _ = session.flush().ok();
|
|
}
|
|
Ok(())
|
|
}
|
|
async fn exec_request(
|
|
&mut self,
|
|
channel_id: ChannelId,
|
|
data: &[u8],
|
|
session: &mut Session,
|
|
) -> Result<(), Self::Error> {
|
|
let client_info = self
|
|
.client_addr
|
|
.map(|addr| format!("{}", addr))
|
|
.unwrap_or_else(|| "unknown".to_string());
|
|
|
|
info!(
|
|
self.logger,
|
|
"exec_request received, channel: {:?}, client: {}", channel_id, client_info
|
|
);
|
|
|
|
let git_shell_cmd = match std::str::from_utf8(data) {
|
|
Ok(cmd) => cmd.trim(),
|
|
Err(e) => {
|
|
error!(self.logger, "{}", format!("Invalid command encoding error={}", e));
|
|
session
|
|
.disconnect(
|
|
Disconnect::ServiceNotAvailable,
|
|
"Invalid command encoding",
|
|
"",
|
|
)
|
|
.ok();
|
|
return Err(russh::Error::Disconnect);
|
|
}
|
|
};
|
|
let (service, path) = match parse_git_command(git_shell_cmd) {
|
|
Some((s, p)) => (s, p),
|
|
None => {
|
|
error!(self.logger, "{}", format!("Invalid git command command={}", git_shell_cmd));
|
|
let msg = format!("Invalid git command: {}", git_shell_cmd);
|
|
session
|
|
.disconnect(Disconnect::ServiceNotAvailable, &msg, "")
|
|
.ok();
|
|
return Err(russh::Error::Disconnect);
|
|
}
|
|
};
|
|
self.service = Some(service);
|
|
let (owner, repo) = match parse_repo_path(path) {
|
|
Some(pair) => pair,
|
|
None => {
|
|
let msg = format!("Invalid repository path: {}", path);
|
|
error!(self.logger, "{}", format!("Invalid repo path path={}", path));
|
|
session
|
|
.disconnect(Disconnect::ServiceNotAvailable, &msg, "")
|
|
.ok();
|
|
return Err(russh::Error::Disconnect);
|
|
}
|
|
};
|
|
let repo = repo.strip_suffix(".git").unwrap_or(repo).to_string();
|
|
|
|
let repo = match self.auth.find_repo(owner, &repo).await {
|
|
Ok(repo) => repo,
|
|
Err(e) => {
|
|
// Log the detailed error internally; client receives generic message.
|
|
error!(self.logger, "{}", format!("Error fetching repo error={}", e));
|
|
session
|
|
.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "")
|
|
.ok();
|
|
return Err(russh::Error::Disconnect);
|
|
}
|
|
};
|
|
|
|
self.model = Some(repo.clone());
|
|
let operator = match &self.operator {
|
|
Some(user) => user,
|
|
None => {
|
|
let msg = "Authentication error: no authenticated user";
|
|
error!(self.logger, "No authenticated user");
|
|
session.disconnect(Disconnect::ByApplication, msg, "").ok();
|
|
return Err(russh::Error::Disconnect);
|
|
}
|
|
};
|
|
|
|
let is_write = service == GitService::ReceivePack;
|
|
let has_permission = self
|
|
.auth
|
|
.check_repo_permission(operator, &repo, is_write)
|
|
.await;
|
|
|
|
if !has_permission {
|
|
let msg = format!(
|
|
"Access denied: user '{}' does not have {} permission for repository {}",
|
|
operator.username,
|
|
if is_write { "write" } else { "read" },
|
|
repo.repo_name
|
|
);
|
|
error!(self.logger, "{}", format!("Access denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write));
|
|
session.disconnect(Disconnect::ByApplication, &msg, "").ok();
|
|
return Err(russh::Error::Disconnect);
|
|
}
|
|
|
|
info!(self.logger, "{}", format!("Access granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write));
|
|
|
|
let repo_path = PathBuf::from(&repo.storage_path);
|
|
if !repo_path.exists() {
|
|
error!(self.logger, "{}", format!("Repository path not found path={}", repo.storage_path));
|
|
}
|
|
let mut cmd = build_git_command(service, repo_path);
|
|
let logger = self.logger.clone();
|
|
info!(&logger, "{}", format!("Spawning git process service={:?} path={}", service, repo.storage_path));
|
|
let mut shell = match cmd
|
|
.stdin(Stdio::piped())
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped())
|
|
.spawn()
|
|
{
|
|
Ok(shell) => {
|
|
let _ = session.channel_success(channel_id);
|
|
shell
|
|
}
|
|
Err(e) => {
|
|
error!(&logger, "{}", format!("Process spawn failed error={}", e));
|
|
let _ = session.channel_failure(channel_id);
|
|
self.cleanup_channel(channel_id);
|
|
return Err(russh::Error::IO(e));
|
|
}
|
|
};
|
|
let session_handle = session.handle();
|
|
let stdin = shell.stdin.take().unwrap();
|
|
self.stdin.insert(channel_id, stdin);
|
|
let mut shell_stdout = shell.stdout.take().unwrap();
|
|
let mut shell_stderr = shell.stderr.take().unwrap();
|
|
|
|
let (eof_tx, mut eof_rx) = tokio::sync::mpsc::channel::<bool>(10);
|
|
self.eof.insert(channel_id, eof_tx);
|
|
let repo_uid = repo.id;
|
|
let should_sync = service == GitService::ReceivePack;
|
|
let sync = self.sync.clone();
|
|
let logger_for_fut = self.logger.clone();
|
|
let fut = async move {
|
|
info!(&logger_for_fut, "{}", format!("Task started channel={:?}", channel_id));
|
|
|
|
let mut stdout_done = false;
|
|
let mut stderr_done = false;
|
|
|
|
let stdout_fut = forward(
|
|
&session_handle,
|
|
channel_id,
|
|
&mut shell_stdout,
|
|
|handle, chan, data| async move { handle.data(chan, data).await },
|
|
);
|
|
tokio::pin!(stdout_fut);
|
|
|
|
let stderr_fut = forward(
|
|
&session_handle,
|
|
channel_id,
|
|
&mut shell_stderr,
|
|
|handle, chan, data| async move { handle.extended_data(chan, 1, data).await },
|
|
);
|
|
tokio::pin!(stderr_fut);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
result = shell.wait() => {
|
|
let status = result?;
|
|
let status_code = status.code().unwrap_or(128) as u32;
|
|
|
|
info!(&logger_for_fut, "{}", format!("Git process exited channel={:?} status={}", channel_id, status_code));
|
|
|
|
if !stdout_done || !stderr_done {
|
|
let _ = tokio::time::timeout(Duration::from_millis(100), async {
|
|
tokio::join!(
|
|
async {
|
|
if !stdout_done {
|
|
let _ = (&mut stdout_fut).await;
|
|
}
|
|
},
|
|
async {
|
|
if !stderr_done {
|
|
let _ = (&mut stderr_fut).await;
|
|
}
|
|
}
|
|
);
|
|
}).await;
|
|
}
|
|
|
|
if should_sync {
|
|
let sync = sync.clone();
|
|
tokio::spawn(async move {
|
|
sync.send(RepoReceiveSyncTask { repo_uid }).await
|
|
});
|
|
}
|
|
|
|
let _ = session_handle.exit_status_request(channel_id, status_code).await;
|
|
sleep(Duration::from_millis(50)).await;
|
|
let _ = session_handle.eof(channel_id).await;
|
|
let _ = session_handle.close(channel_id).await;
|
|
info!(&logger_for_fut, "{}", format!("Channel closed channel={:?}", channel_id));
|
|
break;
|
|
}
|
|
result = &mut stdout_fut, if !stdout_done => {
|
|
info!(&logger_for_fut, "stdout completed");
|
|
stdout_done = true;
|
|
if let Err(e) = result {
|
|
warn!(&logger_for_fut, "{}", format!("stdout forward error error={:?}", e));
|
|
}
|
|
}
|
|
result = &mut stderr_fut, if !stderr_done => {
|
|
info!(&logger_for_fut, "stderr completed");
|
|
stderr_done = true;
|
|
if let Err(e) = result {
|
|
warn!(&logger_for_fut, "{}", format!("stderr forward error error={:?}", e));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok::<(), russh::Error>(())
|
|
};
|
|
|
|
tokio::spawn(async move {
|
|
if let Err(e) = fut.await {
|
|
error!(&logger, "{}", format!("Git SSH channel task error error={}", e));
|
|
}
|
|
while eof_rx.recv().await.is_some() {}
|
|
});
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn parse_git_command(cmd: &str) -> Option<(GitService, &str)> {
|
|
let (svc, path) = match cmd.split_once(' ') {
|
|
Some(("git-receive-pack", path)) => (GitService::ReceivePack, path),
|
|
Some(("git-upload-pack", path)) => (GitService::UploadPack, path),
|
|
Some(("git-upload-archive", path)) => (GitService::UploadArchive, path),
|
|
_ => return None,
|
|
};
|
|
Some((svc, strip_apostrophes(path)))
|
|
}
|
|
|
|
/// Split `owner/repo` out of a repository path.
///
/// Leading and trailing `/` are ignored. The owner is everything before the
/// first remaining `/`, the repository everything after it (further `/`
/// included). Returns `None` when either part would be empty.
fn parse_repo_path(path: &str) -> Option<(&str, &str)> {
    path.trim_matches('/')
        .split_once('/')
        .filter(|(owner, repo)| !owner.is_empty() && !repo.is_empty())
}
|
|
|
|
fn build_git_command(service: GitService, path: PathBuf) -> tokio::process::Command {
|
|
let mut cmd = tokio::process::Command::new("git");
|
|
|
|
// Canonicalize only for validation; if it fails, fall back to the raw path.
|
|
// Using canonicalize for current_dir is safe since we validate repo existence
|
|
// before reaching this point.
|
|
let cwd = match path.canonicalize() {
|
|
Ok(p) => p,
|
|
Err(e) => {
|
|
// Log and continue with the raw path — the git process will fail
|
|
// with a clear "repository not found" message rather than panicking here.
|
|
let _ = e;
|
|
path.clone()
|
|
}
|
|
};
|
|
cmd.current_dir(cwd);
|
|
|
|
match service {
|
|
GitService::UploadPack => { cmd.arg("upload-pack"); }
|
|
GitService::ReceivePack => { cmd.arg("receive-pack"); }
|
|
GitService::UploadArchive => { cmd.arg("upload-archive"); }
|
|
}
|
|
|
|
cmd.arg(".")
|
|
.env("GIT_CONFIG_NOSYSTEM", "1")
|
|
.env("GIT_NO_REPLACE_OBJECTS", "1");
|
|
|
|
#[cfg(unix)]
|
|
{
|
|
cmd.env("GIT_CONFIG_GLOBAL", "/dev/null")
|
|
.env("GIT_CONFIG_SYSTEM", "/dev/null");
|
|
}
|
|
#[cfg(windows)]
|
|
{
|
|
// On Windows, /dev/null doesn't exist. Set invalid paths so git
|
|
// ignores them without crashing. GIT_CONFIG_NOSYSTEM already disables
|
|
// the system config.
|
|
let nul = "NUL";
|
|
cmd.env("GIT_CONFIG_GLOBAL", nul)
|
|
.env("GIT_CONFIG_SYSTEM", nul);
|
|
}
|
|
|
|
cmd
|
|
}
|
|
|
|
/// Remove every leading and trailing `'` from `s`; apostrophes in the
/// middle are kept.
fn strip_apostrophes(s: &str) -> &str {
    const APOSTROPHE: char = '\'';
    s.trim_matches(|c: char| c == APOSTROPHE)
}
|
|
|
|
/// Git services that can be requested over SSH.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum GitService {
    /// `upload-pack`
    UploadPack,
    /// `receive-pack`
    ReceivePack,
    /// `upload-archive`
    UploadArchive,
}

impl FromStr for GitService {
    type Err = ();

    /// Parse the bare service name (`upload-pack`, `receive-pack`,
    /// `upload-archive`). Any other input yields `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const SERVICES: [(&str, GitService); 3] = [
            ("upload-pack", GitService::UploadPack),
            ("receive-pack", GitService::ReceivePack),
            ("upload-archive", GitService::UploadArchive),
        ];

        SERVICES
            .iter()
            .find(|(name, _)| *name == s)
            .map(|&(_, service)| service)
            .ok_or(())
    }
}
|
|
|
|
async fn forward<'a, R, Fut, Fwd>(
|
|
session_handle: &'a Handle,
|
|
chan_id: ChannelId,
|
|
r: &mut R,
|
|
mut fwd: Fwd,
|
|
) -> Result<(), russh::Error>
|
|
where
|
|
R: AsyncRead + Send + Unpin,
|
|
Fut: Future<Output = Result<(), CryptoVec>> + 'a,
|
|
Fwd: FnMut(&'a Handle, ChannelId, CryptoVec) -> Fut,
|
|
{
|
|
const BUF_SIZE: usize = 1024 * 32;
|
|
const MAX_RETRIES: usize = 5;
|
|
const RETRY_DELAY: u64 = 10; // ms
|
|
|
|
let mut buf = [0u8; BUF_SIZE];
|
|
loop {
|
|
let read = r.read(&mut buf).await?;
|
|
|
|
if read == 0 {
|
|
break;
|
|
}
|
|
|
|
let mut chunk = CryptoVec::from_slice(&buf[..read]);
|
|
let mut retries = 0;
|
|
loop {
|
|
match fwd(session_handle, chan_id, chunk).await {
|
|
Ok(()) => break,
|
|
Err(unsent) => {
|
|
retries += 1;
|
|
if retries >= MAX_RETRIES {
|
|
// Give up — connection is likely broken. Returning Ok (not Err)
|
|
// so the outer task can clean up gracefully without logging
|
|
// a spurious error for a normal disconnection.
|
|
return Ok(());
|
|
}
|
|
chunk = unsent;
|
|
sleep(Duration::from_millis(RETRY_DELAY)).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|