From 4c4c33f9704f4c2ee28ccaa25bb142d8b3860e83 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Thu, 14 May 2026 10:02:00 +0800 Subject: [PATCH] refactor(git,gingress-proxy): apply rustfmt formatting --- libs/gingress-proxy/src/server.rs | 38 ++++--- libs/gingress-proxy/src/tls.rs | 9 +- libs/git/archive/ops.rs | 28 ++++- libs/git/branch/ops.rs | 2 +- libs/git/branch/query.rs | 16 ++- libs/git/commit/traverse.rs | 4 +- libs/git/diff/ops.rs | 46 ++++++-- libs/git/hook/embed.rs | 5 +- libs/git/hook/mod.rs | 9 +- libs/git/hook/pool/redis.rs | 43 +++++++- libs/git/hook/pool/worker.rs | 42 ++++--- libs/git/hook/sync/commit.rs | 14 ++- libs/git/hook/sync/mod.rs | 29 +++-- libs/git/hook/sync/tag.rs | 12 +- libs/git/hook/webhook_dispatch.rs | 46 ++++++-- libs/git/http/handler.rs | 39 ++++++- libs/git/http/lfs.rs | 28 +++-- libs/git/http/lfs_routes.rs | 34 +++--- libs/git/http/mod.rs | 80 +++++++------- libs/git/http/rate_limit.rs | 16 ++- libs/git/http/utils.rs | 2 +- libs/git/lib.rs | 4 +- libs/git/ssh/authz.rs | 53 +++++++-- libs/git/ssh/branch_protect.rs | 3 +- libs/git/ssh/forward.rs | 2 +- libs/git/ssh/git_service.rs | 12 +- libs/git/ssh/handle.rs | 177 +++++++++++++++++++++--------- libs/git/ssh/mod.rs | 48 +++++--- 28 files changed, 579 insertions(+), 262 deletions(-) diff --git a/libs/gingress-proxy/src/server.rs b/libs/gingress-proxy/src/server.rs index ea90f60..612ca83 100644 --- a/libs/gingress-proxy/src/server.rs +++ b/libs/gingress-proxy/src/server.rs @@ -44,19 +44,21 @@ impl GIngressProxy { /// Match a request to a route rule based on host and path. fn match_route(cfg: &crate::config::ProxyConfig, host: &str, path: &str) -> Option { - cfg.routes.get(host).and_then(|rules| { - rules.iter().find(|r| match r.path_type { - crate::config::PathType::Prefix | crate::config::PathType::ImplementationSpecific => { - path.starts_with(&r.path) - } - crate::config::PathType::Exact => path == r.path, + cfg.routes + .get(host) + .and_then(|rules| { + rules.iter().find(|r| match r.path_type { + crate::config::PathType::Prefix + | crate::config::PathType::ImplementationSpecific => path.starts_with(&r.path), + crate::config::PathType::Exact => path == r.path, + }) + }) + .map(|r| { + format!( + "upstream:{}/{}:{}", + r.backend.namespace, r.backend.name, r.backend.port + ) }) - }).map(|r| { - format!( - "upstream:{}/{}:{}", - r.backend.namespace, r.backend.name, r.backend.port - ) - }) } } @@ -128,7 +130,11 @@ impl ProxyHttp for GIngressProxy { } None => pingora::Error::e_explain( pingora::ErrorType::InternalError, - format!("no upstream found for host '{}' path '{}'", host, session.req_header().uri.path()), + format!( + "no upstream found for host '{}' path '{}'", + host, + session.req_header().uri.path() + ), ), } } @@ -143,11 +149,7 @@ impl ProxyHttp for GIngressProxy { .unwrap() .run_pre(session, ctx) .map_err(|e| { - pingora::Error::because( - pingora::ErrorType::InternalError, - "pre-filter failed", - e, - ) + pingora::Error::because(pingora::ErrorType::InternalError, "pre-filter failed", e) })?; Ok(false) } diff --git a/libs/gingress-proxy/src/tls.rs b/libs/gingress-proxy/src/tls.rs index 2e7f01a..9512cbc 100644 --- a/libs/gingress-proxy/src/tls.rs +++ b/libs/gingress-proxy/src/tls.rs @@ -5,9 +5,9 @@ use crate::config::ConfigStore; use anyhow::Context; +use rustls::ServerConfig; use rustls::server::ResolvesServerCert; use rustls::sign::CertifiedKey; -use rustls::ServerConfig; use std::collections::HashMap; use std::fmt; use std::sync::Arc; @@ -43,12 +43,7 @@ impl SniResolver { } /// Add a certificate for a specific hostname. - pub fn add_cert( - &mut self, - host: &str, - cert_pem: &str, - key_pem: &str, - ) -> anyhow::Result<()> { + pub fn add_cert(&mut self, host: &str, cert_pem: &str, key_pem: &str) -> anyhow::Result<()> { let cert_chain = rustls_pemfile::certs(&mut cert_pem.as_bytes()) .collect::, _>>() .context("Failed to parse certificate PEM")?; diff --git a/libs/git/archive/ops.rs b/libs/git/archive/ops.rs index 6fb63f7..7dc69f3 100644 --- a/libs/git/archive/ops.rs +++ b/libs/git/archive/ops.rs @@ -267,7 +267,12 @@ impl GitDomain { let obj = match self.repo().find_object(oid, None) { Ok(o) => o, Err(e) => { - tracing::warn!("archive_skip_missing_object oid={} path={} error={}", oid, full_path, e); + tracing::warn!( + "archive_skip_missing_object oid={} path={} error={}", + oid, + full_path, + e + ); continue; } }; @@ -385,7 +390,12 @@ impl GitDomain { let obj = match self.repo().find_object(oid, None) { Ok(o) => o, Err(e) => { - tracing::warn!("archive_skip_missing_object oid={} path={} error={}", oid, full_path, e); + tracing::warn!( + "archive_skip_missing_object oid={} path={} error={}", + oid, + full_path, + e + ); continue; } }; @@ -464,7 +474,12 @@ impl GitDomain { let obj = match self.repo().find_object(oid, None) { Ok(o) => o, Err(e) => { - tracing::warn!("archive_skip_missing_object oid={} path={} error={}", oid, full_path, e); + tracing::warn!( + "archive_skip_missing_object oid={} path={} error={}", + oid, + full_path, + e + ); continue; } }; @@ -528,7 +543,12 @@ impl GitDomain { let obj = match self.repo().find_object(oid, None) { Ok(o) => o, Err(e) => { - tracing::warn!("archive_list_skip_missing_object oid={} path={} error={}", oid, full_path, e); + tracing::warn!( + "archive_list_skip_missing_object oid={} path={} error={}", + oid, + full_path, + e + ); continue; } }; diff --git a/libs/git/branch/ops.rs b/libs/git/branch/ops.rs index cbb03d9..6d9f900 100644 --- a/libs/git/branch/ops.rs +++ b/libs/git/branch/ops.rs @@ -71,7 +71,7 @@ impl GitDomain { pub fn branch_delete_remote(&self, name: &str) -> GitResult<()> { let full_name = format!("refs/remotes/{}", name); -let mut branch = self + let mut branch = self .repo() .find_branch(&full_name, BranchType::Local) .map_err(|_e| GitError::RefNotFound(full_name.clone()))?; diff --git a/libs/git/branch/query.rs b/libs/git/branch/query.rs index 6b06df6..7912fc1 100644 --- a/libs/git/branch/query.rs +++ b/libs/git/branch/query.rs @@ -16,7 +16,11 @@ impl GitDomain { let mut branches = Vec::with_capacity(16); // Keep head_name as full ref for comparison with branch names - let head_name = self.repo.head().ok().and_then(|r| r.name().map(String::from)); + let head_name = self + .repo + .head() + .ok() + .and_then(|r| r.name().map(String::from)); for branch_result in self .repo() @@ -94,7 +98,11 @@ impl GitDomain { self.repo .find_branch(full_name, git2::BranchType::Local) .ok() - .or_else(|| self.repo.find_branch(full_name, git2::BranchType::Remote).ok()) + .or_else(|| { + self.repo + .find_branch(full_name, git2::BranchType::Remote) + .ok() + }) }) .ok_or_else(|| GitError::RefNotFound(name.to_string()))?; @@ -139,9 +147,7 @@ impl GitDomain { }; candidates.iter().any(|full_name| { - self.repo - .find_branch(full_name, BranchType::Local) - .is_ok() + self.repo.find_branch(full_name, BranchType::Local).is_ok() || self .repo() .find_branch(full_name, BranchType::Remote) diff --git a/libs/git/commit/traverse.rs b/libs/git/commit/traverse.rs index 6161554..b46c3fd 100644 --- a/libs/git/commit/traverse.rs +++ b/libs/git/commit/traverse.rs @@ -206,7 +206,9 @@ impl GitDomain { return Ok(CommitOid::from_git2(target)); } } - return Err(GitError::InvalidOid("cannot resolve: HEAD (detached or empty)".into())); + return Err(GitError::InvalidOid( + "cannot resolve: HEAD (detached or empty)".into(), + )); } if let Ok(reference) = self.repo.find_reference(rev) { diff --git a/libs/git/diff/ops.rs b/libs/git/diff/ops.rs index d7a33e1..d423b98 100644 --- a/libs/git/diff/ops.rs +++ b/libs/git/diff/ops.rs @@ -21,7 +21,8 @@ impl GitDomain { let o = oid .to_oid() .map_err(|_| GitError::InvalidOid(oid.to_string()))?; - let obj = self.repo() + let obj = self + .repo() .find_object(o, None) .map_err(|e| GitError::Internal(e.to_string()))?; Some( @@ -36,7 +37,8 @@ impl GitDomain { let o = oid .to_oid() .map_err(|_| GitError::InvalidOid(oid.to_string()))?; - let obj = self.repo() + let obj = self + .repo() .find_object(o, None) .map_err(|e| GitError::Internal(e.to_string()))?; Some( @@ -196,11 +198,21 @@ impl GitDomain { .to_oid() .map_err(|_| GitError::InvalidOid(new_tree.to_string()))?; - let old_obj = self.repo().find_object(old_oid, None).map_err(|e| GitError::Internal(e.to_string()))?; - let new_obj = self.repo().find_object(new_oid, None).map_err(|e| GitError::Internal(e.to_string()))?; - - let old_tree = old_obj.peel_to_tree().map_err(|e| GitError::Internal(e.to_string()))?; - let new_tree = new_obj.peel_to_tree().map_err(|e| GitError::Internal(e.to_string()))?; + let old_obj = self + .repo() + .find_object(old_oid, None) + .map_err(|e| GitError::Internal(e.to_string()))?; + let new_obj = self + .repo() + .find_object(new_oid, None) + .map_err(|e| GitError::Internal(e.to_string()))?; + + let old_tree = old_obj + .peel_to_tree() + .map_err(|e| GitError::Internal(e.to_string()))?; + let new_tree = new_obj + .peel_to_tree() + .map_err(|e| GitError::Internal(e.to_string()))?; let diff = self .repo() @@ -222,11 +234,21 @@ impl GitDomain { .to_oid() .map_err(|_| GitError::InvalidOid(new_tree.to_string()))?; - let old_obj = self.repo().find_object(old_oid, None).map_err(|e| GitError::Internal(e.to_string()))?; - let new_obj = self.repo().find_object(new_oid, None).map_err(|e| GitError::Internal(e.to_string()))?; - - let old_tree = old_obj.peel_to_tree().map_err(|e| GitError::Internal(e.to_string()))?; - let new_tree = new_obj.peel_to_tree().map_err(|e| GitError::Internal(e.to_string()))?; + let old_obj = self + .repo() + .find_object(old_oid, None) + .map_err(|e| GitError::Internal(e.to_string()))?; + let new_obj = self + .repo() + .find_object(new_oid, None) + .map_err(|e| GitError::Internal(e.to_string()))?; + + let old_tree = old_obj + .peel_to_tree() + .map_err(|e| GitError::Internal(e.to_string()))?; + let new_tree = new_obj + .peel_to_tree() + .map_err(|e| GitError::Internal(e.to_string()))?; let diff = self .repo() diff --git a/libs/git/hook/embed.rs b/libs/git/hook/embed.rs index b76f7d8..c3c2abf 100644 --- a/libs/git/hook/embed.rs +++ b/libs/git/hook/embed.rs @@ -4,5 +4,8 @@ use models::TagEmbedInput; /// Defined here to avoid git → agent dependency. #[async_trait::async_trait] pub trait TagEmbedder: Send + Sync { - async fn embed_tags_batch(&self, tags: Vec) -> Result<(), Box>; + async fn embed_tags_batch( + &self, + tags: Vec, + ) -> Result<(), Box>; } diff --git a/libs/git/hook/mod.rs b/libs/git/hook/mod.rs index 89d4d2d..a5d0770 100644 --- a/libs/git/hook/mod.rs +++ b/libs/git/hook/mod.rs @@ -12,8 +12,8 @@ pub mod sync; pub mod webhook_dispatch; pub use embed::TagEmbedder; -pub use pool::{HookWorker, PoolConfig, RedisConsumer}; pub use pool::types::{HookTask, TaskType}; +pub use pool::{HookWorker, PoolConfig, RedisConsumer}; /// Hook service that manages the Redis-backed task queue worker. /// Multiple gitserver pods can run concurrently — the worker acquires a @@ -28,12 +28,7 @@ pub struct HookService { } impl HookService { - pub fn new( - db: AppDatabase, - cache: AppCache, - redis_pool: RedisPool, - config: AppConfig, - ) -> Self { + pub fn new(db: AppDatabase, cache: AppCache, redis_pool: RedisPool, config: AppConfig) -> Self { Self { db, cache, diff --git a/libs/git/hook/pool/redis.rs b/libs/git/hook/pool/redis.rs index 8d912fd..ed0241a 100644 --- a/libs/git/hook/pool/redis.rs +++ b/libs/git/hook/pool/redis.rs @@ -8,8 +8,25 @@ use std::time::Duration; /// NATS consumer function type: returns (task, ack_fn) pairs. pub type NatsHookConsumeFn = Arc< - dyn Fn(String, usize) -> Pin, Box Pin> + Send>> + Send>)>>> + Send>> - + Send + dyn Fn( + String, + usize, + ) -> Pin< + Box< + dyn Future< + Output = anyhow::Result< + Vec<( + Vec, + Box< + dyn Fn() -> Pin< + Box> + Send>, + > + Send, + >, + )>, + >, + > + Send, + >, + > + Send + Sync, >; @@ -101,7 +118,11 @@ impl RedisConsumer { match serde_json::from_slice::(&data) { Ok(task) => { let task_json = String::from_utf8_lossy(&data).to_string(); - tracing::debug!("task dequeued from NATS task_id={} task_type={}", task.id, task.task_type); + tracing::debug!( + "task dequeued from NATS task_id={} task_type={}", + task.id, + task.task_type + ); // Store ack_fn for later use - we'll need to refactor to support async ack // For now, we'll ack immediately after processing in the worker @@ -141,12 +162,21 @@ impl RedisConsumer { Some(json) => { match serde_json::from_str::(&json) { Ok(task) => { - tracing::debug!("task dequeued task_id={} task_type={} queue={}", task.id, task.task_type, queue_key); + tracing::debug!( + "task dequeued task_id={} task_type={} queue={}", + task.id, + task.task_type, + queue_key + ); Ok(Some((task, json))) } Err(e) => { // Malformed task — remove from work queue and discard - tracing::warn!("malformed task JSON, discarding error={} queue={}", e, work_key); + tracing::warn!( + "malformed task JSON, discarding error={} queue={}", + e, + work_key + ); let _ = self.ack_raw(&work_key, &json).await; Ok(None) } @@ -190,7 +220,8 @@ impl RedisConsumer { queue_key: &str, task_json: &str, ) -> Result<(), GitError> { - self.nak_with_retry(work_key, queue_key, task_json, task_json).await + self.nak_with_retry(work_key, queue_key, task_json, task_json) + .await } /// Negative acknowledge with a different (updated) task JSON — used to diff --git a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs index 199067a..088eea3 100644 --- a/libs/git/hook/pool/worker.rs +++ b/libs/git/hook/pool/worker.rs @@ -6,8 +6,8 @@ use crate::hook::sync::HookMetaDataSync; use db::cache::AppCache; use db::database::AppDatabase; use metrics::counter; -use models::repos::repo_tag; use models::EntityTrait; +use models::repos::repo_tag; use sea_orm::{ColumnTrait, QueryFilter}; use std::sync::Arc; use std::time::Duration; @@ -100,8 +100,12 @@ impl HookWorker { work_key: &str, queue_key: &str, ) { - tracing::info!("task started task_id={} task_type={} repo_id={}", - task.id, task.task_type, task.repo_id); + tracing::info!( + "task started task_id={} task_type={} repo_id={}", + task.id, + task.task_type, + task.repo_id + ); counter!("hook_tasks_total", "task_type" => task.task_type.to_string()).increment(1); @@ -113,7 +117,8 @@ impl HookWorker { match result { Ok(()) => { - counter!("hook_tasks_success_total", "task_type" => task.task_type.to_string()).increment(1); + counter!("hook_tasks_success_total", "task_type" => task.task_type.to_string()) + .increment(1); if let Err(e) = self.consumer.ack(work_key, task_json).await { tracing::warn!("failed to ack task: {}", e); } @@ -125,19 +130,31 @@ impl HookWorker { if is_locked { counter!("hook_tasks_locked_total").increment(1); // Another worker holds the lock — requeue without counting as retry. - tracing::info!("repo locked by another worker, requeueing task_id={}", task.id); + tracing::info!( + "repo locked by another worker, requeueing task_id={}", + task.id + ); if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await { tracing::warn!("failed to requeue locked task: {}", nak_err); } } else { - counter!("hook_tasks_failed_total", "task_type" => task.task_type.to_string()).increment(1); - tracing::warn!("task failed task_id={} task_type={} repo_id={} error={}", - task.id, task.task_type, task.repo_id, e); + counter!("hook_tasks_failed_total", "task_type" => task.task_type.to_string()) + .increment(1); + tracing::warn!( + "task failed task_id={} task_type={} repo_id={} error={}", + task.id, + task.task_type, + task.repo_id, + e + ); if task.retry_count >= self.max_retries { counter!("hook_tasks_exhausted_total").increment(1); - tracing::warn!("task exhausted retries, discarding task_id={} retry_count={}", - task.id, task.retry_count); + tracing::warn!( + "task exhausted retries, discarding task_id={} retry_count={}", + task.id, + task.retry_count + ); let _ = self.consumer.ack(work_key, task_json).await; } else { counter!("hook_tasks_retried_total").increment(1); @@ -197,9 +214,8 @@ impl HookWorker { // Run full sync (internally acquires/releases per-repo lock) let sync_clone = sync.clone(); tokio::task::spawn_blocking(move || { - let result = tokio::runtime::Handle::current().block_on(async { - sync_clone.sync().await - }); + let result = + tokio::runtime::Handle::current().block_on(async { sync_clone.sync().await }); match result { Ok(()) => Ok::<(), GitError>(()), Err(e) => Err(GitError::Internal(e.to_string())), diff --git a/libs/git/hook/sync/commit.rs b/libs/git/hook/sync/commit.rs index 985027a..8df02b6 100644 --- a/libs/git/hook/sync/commit.rs +++ b/libs/git/hook/sync/commit.rs @@ -91,7 +91,11 @@ impl HookMetaDataSync { } else if is_branch && !is_remote { // Try to get upstream branch name from the reference's upstream target let upstream: Option = if reference.target().is_some() { - if let Ok(branch) = self.domain.repo().find_branch(&name, git2::BranchType::Local) { + if let Ok(branch) = self + .domain + .repo() + .find_branch(&name, git2::BranchType::Local) + { if let Ok(upstream_ref) = branch.upstream() { if let Some(upstream_name) = upstream_ref.name().ok().flatten() { Some(upstream_name.to_string()) @@ -132,8 +136,7 @@ impl HookMetaDataSync { .all(txn) .await .map_err(|e| GitError::IoError(format!("failed to query branches: {}", e)))?; - let mut existing_names: HashSet = - existing.iter().map(|r| r.name.clone()).collect(); + let mut existing_names: HashSet = existing.iter().map(|r| r.name.clone()).collect(); let (branches, _) = self.collect_git_refs()?; @@ -155,7 +158,10 @@ impl HookMetaDataSync { if current_default.is_none() { // Prefer known branch names over first-come for preferred in PREFERRED_BRANCHES { - if branches.iter().any(|b| b.shorthand == *preferred && b.is_branch && !b.is_remote) { + if branches + .iter() + .any(|b| b.shorthand == *preferred && b.is_branch && !b.is_remote) + { auto_detected_branch = Some(ToString::to_string(preferred)); break; } diff --git a/libs/git/hook/sync/mod.rs b/libs/git/hook/sync/mod.rs index ae1c1d4..1c2df49 100644 --- a/libs/git/hook/sync/mod.rs +++ b/libs/git/hook/sync/mod.rs @@ -8,11 +8,11 @@ pub mod tag; use db::cache::AppCache; use db::database::AppDatabase; +use models::ActiveModelTrait; +use models::RepoId; use models::projects::project_skill::ActiveModel as SkillActiveModel; use models::projects::project_skill::{Column as SkillCol, Entity as SkillEntity}; use models::repos::repo::Model as RepoModel; -use models::ActiveModelTrait; -use models::RepoId; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set}; use std::collections::HashMap; use std::path::Path; @@ -160,7 +160,9 @@ fn scan_skills_from_tree( match entry.kind() { Some(git2::ObjectType::Tree) => { if !name.starts_with('.') { - if let Ok(subtree) = entry.to_object(git_repo).and_then(|o| o.peel_to_tree()) { + if let Ok(subtree) = + entry.to_object(git_repo).and_then(|o| o.peel_to_tree()) + { stack.push((subtree, entry_path)); } } @@ -406,16 +408,19 @@ impl HookMetaDataSync { }; // Deduplicate by {repo_id}+{blob_hash}, keep latest by commit_sha - let mut deduped: std::collections::HashMap = std::collections::HashMap::new(); + let mut deduped: std::collections::HashMap = + std::collections::HashMap::new(); for skill in discovered { let key = if let Some(ref hash) = skill.blob_hash { - format!("{}:{}", self.repo.id, hash) - } else { - format!("{}:{}:slug", self.repo.id, skill.slug) - }; + format!("{}:{}", self.repo.id, hash) + } else { + format!("{}:{}:slug", self.repo.id, skill.slug) + }; match deduped.get(&key) { Some(existing) => { - if skill.commit_sha.as_ref().unwrap_or(&String::new()) > existing.commit_sha.as_ref().unwrap_or(&String::new()) { + if skill.commit_sha.as_ref().unwrap_or(&String::new()) + > existing.commit_sha.as_ref().unwrap_or(&String::new()) + { deduped.insert(key, skill); } } @@ -428,7 +433,11 @@ impl HookMetaDataSync { let existing_by_hash: HashMap<_, _> = existing .into_iter() .map(|s| { - let key = format!("{}:{}", s.repo_id.unwrap_or_default(), s.blob_hash.clone().unwrap_or_default()); + let key = format!( + "{}:{}", + s.repo_id.unwrap_or_default(), + s.blob_hash.clone().unwrap_or_default() + ); (key, s) }) .collect(); diff --git a/libs/git/hook/sync/tag.rs b/libs/git/hook/sync/tag.rs index e96accb..d7b9bf4 100644 --- a/libs/git/hook/sync/tag.rs +++ b/libs/git/hook/sync/tag.rs @@ -1,6 +1,6 @@ -use crate::hook::sync::commit::TagTip; use crate::GitError; use crate::hook::sync::HookMetaDataSync; +use crate::hook::sync::commit::TagTip; use db::database::AppTransaction; use models::repos::repo_tag; use sea_orm::prelude::Expr; @@ -79,9 +79,15 @@ impl HookMetaDataSync { .filter(repo_tag::Column::Repo.eq(repo_id)) .filter(repo_tag::Column::Name.eq(&tag.name)) .col_expr(repo_tag::Column::Oid, Expr::value(&tag.target_oid)) - .col_expr(repo_tag::Column::Description, Expr::value(tag.description.clone())) + .col_expr( + repo_tag::Column::Description, + Expr::value(tag.description.clone()), + ) .col_expr(repo_tag::Column::TaggerName, Expr::value(&tag.tagger_name)) - .col_expr(repo_tag::Column::TaggerEmail, Expr::value(&tag.tagger_email)) + .col_expr( + repo_tag::Column::TaggerEmail, + Expr::value(&tag.tagger_email), + ) .exec(txn) .await .map_err(|e| GitError::IoError(format!("failed to update tag: {}", e)))?; diff --git a/libs/git/hook/webhook_dispatch.rs b/libs/git/hook/webhook_dispatch.rs index c502adb..fa083dc 100644 --- a/libs/git/hook/webhook_dispatch.rs +++ b/libs/git/hook/webhook_dispatch.rs @@ -309,15 +309,28 @@ pub async fn dispatch_repo_webhooks( .await { Ok(Ok(())) => { - tracing::info!("push webhook delivered webhook_id={} url={}", webhook_id, url); + tracing::info!( + "push webhook delivered webhook_id={} url={}", + webhook_id, + url + ); let _ = touch_webhook(db, webhook_id, true).await; } Ok(Err(e)) => { - tracing::warn!("push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); + tracing::warn!( + "push webhook delivery failed webhook_id={} url={} error={}", + webhook_id, + url, + e + ); let _ = touch_webhook(db, webhook_id, false).await; } Err(_) => { - tracing::warn!("push webhook timed out webhook_id={} url={}", webhook_id, url); + tracing::warn!( + "push webhook timed out webhook_id={} url={}", + webhook_id, + url + ); let _ = touch_webhook(db, webhook_id, false).await; } } @@ -363,15 +376,28 @@ pub async fn dispatch_repo_webhooks( .await { Ok(Ok(())) => { - tracing::info!("tag webhook delivered webhook_id={} url={}", webhook_id, url); + tracing::info!( + "tag webhook delivered webhook_id={} url={}", + webhook_id, + url + ); let _ = touch_webhook(db, webhook_id, true).await; } Ok(Err(e)) => { - tracing::warn!("tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); + tracing::warn!( + "tag webhook delivery failed webhook_id={} url={} error={}", + webhook_id, + url, + e + ); let _ = touch_webhook(db, webhook_id, false).await; } Err(_) => { - tracing::warn!("tag webhook timed out webhook_id={} url={}", webhook_id, url); + tracing::warn!( + "tag webhook timed out webhook_id={} url={}", + webhook_id, + url + ); let _ = touch_webhook(db, webhook_id, false).await; } } @@ -380,10 +406,14 @@ pub async fn dispatch_repo_webhooks( } } -async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool) -> Result<(), sea_orm::DbErr> { +async fn touch_webhook( + db: &AppDatabase, + webhook_id: i64, + success: bool, +) -> Result<(), sea_orm::DbErr> { use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity}; use models::{ColumnTrait, EntityTrait, QueryFilter}; - use sea_orm::{sea_query::Expr, ExprTrait}; + use sea_orm::{ExprTrait, sea_query::Expr}; let result: Result = if success { RepoWebhookEntity::update_many() diff --git a/libs/git/http/handler.rs b/libs/git/http/handler.rs index 4b18a6c..7a27098 100644 --- a/libs/git/http/handler.rs +++ b/libs/git/http/handler.rs @@ -98,7 +98,12 @@ impl GitHttpHandler { mut payload: web::Payload, ) -> Result { let started = Instant::now(); - tracing::info!("git_rpc_started service={} repo={} repo_id={}", service, self.repo.repo_name, self.repo.id.to_string()); + tracing::info!( + "git_rpc_started service={} repo={} repo_id={}", + service, + self.repo.repo_name, + self.repo.id.to_string() + ); let mut child = tokio::process::Command::new("git") .arg(service) .arg("--stateless-rpc") @@ -140,7 +145,12 @@ impl GitHttpHandler { // Reject oversized pre-PACK data to prevent memory exhaustion if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT { - tracing::warn!("git_rpc_payload_too_large service={} repo={} repo_id={}", service, self.repo.repo_name, self.repo.id.to_string()); + tracing::warn!( + "git_rpc_payload_too_large service={} repo={} repo_id={}", + service, + self.repo.repo_name, + self.repo.id.to_string() + ); return Err(actix_web::error::ErrorPayloadTooLarge(format!( "Ref negotiation exceeds {} byte limit", PRE_PACK_LIMIT @@ -151,7 +161,12 @@ impl GitHttpHandler { pre_pack.extend_from_slice(&bytes[..pos]); if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) { - tracing::warn!("branch_protection_violation repo={} repo_id={} message={}", self.repo.repo_name, self.repo.id.to_string(), msg); + tracing::warn!( + "branch_protection_violation repo={} repo_id={} message={}", + self.repo.repo_name, + self.repo.id.to_string(), + msg + ); return Err(actix_web::error::ErrorForbidden(msg)); } @@ -212,7 +227,14 @@ impl GitHttpHandler { if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); let ms = started.elapsed().as_millis() as u64; - tracing::error!("git_rpc_failed service={} repo={} repo_id={} duration_ms={} stderr={}", service, self.repo.repo_name, self.repo.id.to_string(), ms, stderr.to_string()); + tracing::error!( + "git_rpc_failed service={} repo={} repo_id={} duration_ms={} stderr={}", + service, + self.repo.repo_name, + self.repo.id.to_string(), + ms, + stderr.to_string() + ); return Err(actix_web::error::ErrorInternalServerError(format!( "Git command failed: {}", stderr @@ -220,7 +242,14 @@ impl GitHttpHandler { } let ms = started.elapsed().as_millis() as u64; - tracing::info!("git_rpc_completed service={} repo={} repo_id={} duration_ms={} bytes_out={}", service, self.repo.repo_name, self.repo.id.to_string(), ms, output.stdout.len()); + tracing::info!( + "git_rpc_completed service={} repo={} repo_id={} duration_ms={} bytes_out={}", + service, + self.repo.repo_name, + self.repo.id.to_string(), + ms, + output.stdout.len() + ); Ok(HttpResponse::Ok() .content_type(format!("application/x-git-{}-result", service)) diff --git a/libs/git/http/lfs.rs b/libs/git/http/lfs.rs index 18f3586..9e9748b 100644 --- a/libs/git/http/lfs.rs +++ b/libs/git/http/lfs.rs @@ -294,8 +294,13 @@ impl LfsHandler { let token = uuid::Uuid::now_v7().to_string(); crate::http::lfs_routes::store_lfs_token( - cache, &token, self.model.id, user_uid, "upload", - ).await; + cache, + &token, + self.model.id, + user_uid, + "upload", + ) + .await; let mut headers = HashMap::new(); headers.insert("authorization".to_string(), format!("Bearer {}", token)); @@ -320,8 +325,13 @@ impl LfsHandler { let token = uuid::Uuid::now_v7().to_string(); crate::http::lfs_routes::store_lfs_token( - cache, &token, self.model.id, user_uid, "download", - ).await; + cache, + &token, + self.model.id, + user_uid, + "download", + ) + .await; let mut headers = HashMap::new(); headers.insert("authorization".to_string(), format!("Bearer {}", token)); @@ -461,10 +471,7 @@ impl LfsHandler { Ok(HttpResponse::Ok().finish()) } - pub async fn download_object( - &self, - oid: &str, - ) -> Result { + pub async fn download_object(&self, oid: &str) -> Result { if !is_valid_lfs_oid(oid) { return Err(GitError::InvalidOid(format!("Invalid OID format: {}", oid))); } @@ -481,7 +488,10 @@ impl LfsHandler { let expected_base = self.get_lfs_storage_path(); let obj_path = PathBuf::from(&obj.storage_path); if !obj_path.starts_with(&expected_base) { - tracing::error!("LFS object path outside storage directory: {}", obj.storage_path); + tracing::error!( + "LFS object path outside storage directory: {}", + obj.storage_path + ); return Err(GitError::AuthFailed("Invalid object path".to_string())); } diff --git a/libs/git/http/lfs_routes.rs b/libs/git/http/lfs_routes.rs index b35e1d1..cceb3f0 100644 --- a/libs/git/http/lfs_routes.rs +++ b/libs/git/http/lfs_routes.rs @@ -36,10 +36,7 @@ fn hash_token(token: &str) -> Result { } /// Derive the acting user from the authenticated bearer token. -async fn user_uid( - req: &HttpRequest, - db: &db::database::AppDatabase, -) -> Result { +async fn user_uid(req: &HttpRequest, db: &db::database::AppDatabase) -> Result { let auth_header = req .headers() .get("authorization") @@ -61,8 +58,8 @@ async fn user_uid( .await .map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?; - let token_model = token_model - .ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token"))?; + let token_model = + token_model.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token"))?; if let Some(expires_at) = token_model.expires_at { if expires_at < chrono::Utc::now() { @@ -124,7 +121,9 @@ async fn validate_lfs_token( let operation = parts[2]; if repo_id != expected_repo_id { - return Err(actix_web::error::ErrorUnauthorized("Token not valid for this repo")); + return Err(actix_web::error::ErrorUnauthorized( + "Token not valid for this repo", + )); } if operation != expected_operation { return Err(actix_web::error::ErrorUnauthorized( @@ -133,7 +132,8 @@ async fn validate_lfs_token( } // Consume the token (one-time use) - let _: Result<(), redis::RedisError> = conn.del(format!("lfs:token:{}", token)).await; + let _: Result<(), redis::RedisError> = + conn.del(format!("lfs:token:{}", token)).await; return Ok(user_uid); } @@ -252,7 +252,9 @@ pub async fn lfs_download( Ok(response) => Ok(response), Err(GitError::NotFound(_)) => Err(actix_web::error::ErrorNotFound("Object not found")), Err(GitError::AuthFailed(_)) => Err(actix_web::error::ErrorUnauthorized("Unauthorized")), - Err(_e) => Err(actix_web::error::ErrorInternalServerError("Download failed")), + Err(_e) => Err(actix_web::error::ErrorInternalServerError( + "Download failed", + )), } } @@ -295,7 +297,9 @@ pub async fn lfs_lock_list( match handler.list_locks(maybe_oid).await { Ok(list) => Ok(HttpResponse::Ok().json(list)), - Err(_e) => Err(actix_web::error::ErrorInternalServerError("Lock list failed")), + Err(_e) => Err(actix_web::error::ErrorInternalServerError( + "Lock list failed", + )), } } @@ -317,7 +321,9 @@ pub async fn lfs_lock_get( match handler.get_lock(&lock_path).await { Ok(lock) => Ok(HttpResponse::Ok().json(lock)), Err(GitError::NotFound(_)) => Err(actix_web::error::ErrorNotFound("Lock not found")), - Err(_e) => Err(actix_web::error::ErrorInternalServerError("Lock get failed")), + Err(_e) => Err(actix_web::error::ErrorInternalServerError( + "Lock get failed", + )), } } @@ -337,6 +343,8 @@ pub async fn lfs_lock_delete( Ok(()) => Ok(HttpResponse::NoContent().finish()), Err(GitError::PermissionDenied(_)) => Err(actix_web::error::ErrorForbidden("Not allowed")), Err(GitError::NotFound(_)) => Err(actix_web::error::ErrorNotFound("Lock not found")), - Err(_e) => Err(actix_web::error::ErrorInternalServerError("Lock delete failed")), + Err(_e) => Err(actix_web::error::ErrorInternalServerError( + "Lock delete failed", + )), } -} \ No newline at end of file +} diff --git a/libs/git/http/mod.rs b/libs/git/http/mod.rs index 6c02a60..b1db305 100644 --- a/libs/git/http/mod.rs +++ b/libs/git/http/mod.rs @@ -1,5 +1,5 @@ use crate::hook::HookService; -use actix_web::{App, HttpServer, HttpResponse, web}; +use actix_web::{App, HttpResponse, HttpServer, web}; use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; @@ -71,45 +71,45 @@ pub fn git_http_cfg(cfg: &mut web::ServiceConfig) { cfg.route("/robots.txt", web::get().to(robots)) .route("/health", web::get().to(health)) .route( - "/{namespace}/{repo_name}.git/info/refs", - web::get().to(routes::info_refs), - ) - .route( - "/{namespace}/{repo_name}.git/git-upload-pack", - web::post().to(routes::upload_pack), - ) - .route( - "/{namespace}/{repo_name}.git/git-receive-pack", - web::post().to(routes::receive_pack), - ) - .route( - "/{namespace}/{repo_name}.git/info/lfs/objects/batch", - web::post().to(lfs_routes::lfs_batch), - ) - .route( - "/{namespace}/{repo_name}.git/info/lfs/objects/{oid}", - web::put().to(lfs_routes::lfs_upload), - ) - .route( - "/{namespace}/{repo_name}.git/info/lfs/objects/{oid}", - web::get().to(lfs_routes::lfs_download), - ) - .route( - "/{namespace}/{repo_name}.git/info/lfs/locks", - web::post().to(lfs_routes::lfs_lock_create), - ) - .route( - "/{namespace}/{repo_name}.git/info/lfs/locks", - web::get().to(lfs_routes::lfs_lock_list), - ) - .route( - "/{namespace}/{repo_name}.git/info/lfs/locks/{id}", - web::get().to(lfs_routes::lfs_lock_get), - ) - .route( - "/{namespace}/{repo_name}.git/info/lfs/locks/{id}", - web::delete().to(lfs_routes::lfs_lock_delete), - ); + "/{namespace}/{repo_name}.git/info/refs", + web::get().to(routes::info_refs), + ) + .route( + "/{namespace}/{repo_name}.git/git-upload-pack", + web::post().to(routes::upload_pack), + ) + .route( + "/{namespace}/{repo_name}.git/git-receive-pack", + web::post().to(routes::receive_pack), + ) + .route( + "/{namespace}/{repo_name}.git/info/lfs/objects/batch", + web::post().to(lfs_routes::lfs_batch), + ) + .route( + "/{namespace}/{repo_name}.git/info/lfs/objects/{oid}", + web::put().to(lfs_routes::lfs_upload), + ) + .route( + "/{namespace}/{repo_name}.git/info/lfs/objects/{oid}", + web::get().to(lfs_routes::lfs_download), + ) + .route( + "/{namespace}/{repo_name}.git/info/lfs/locks", + web::post().to(lfs_routes::lfs_lock_create), + ) + .route( + "/{namespace}/{repo_name}.git/info/lfs/locks", + web::get().to(lfs_routes::lfs_lock_list), + ) + .route( + "/{namespace}/{repo_name}.git/info/lfs/locks/{id}", + web::get().to(lfs_routes::lfs_lock_get), + ) + .route( + "/{namespace}/{repo_name}.git/info/lfs/locks/{id}", + web::delete().to(lfs_routes::lfs_lock_delete), + ); } pub async fn run_http(config: AppConfig) -> anyhow::Result<()> { diff --git a/libs/git/http/rate_limit.rs b/libs/git/http/rate_limit.rs index e56a759..fe9521f 100644 --- a/libs/git/http/rate_limit.rs +++ b/libs/git/http/rate_limit.rs @@ -58,13 +58,21 @@ impl RateLimiter { } pub async fn is_read_allowed(&self) -> bool { - self.is_allowed("global:read", BucketOp::Read, self.config.read_requests_per_window) - .await + self.is_allowed( + "global:read", + BucketOp::Read, + self.config.read_requests_per_window, + ) + .await } pub async fn is_write_allowed(&self) -> bool { - self.is_allowed("global:write", BucketOp::Write, self.config.write_requests_per_window) - .await + self.is_allowed( + "global:write", + BucketOp::Write, + self.config.write_requests_per_window, + ) + .await } pub async fn is_repo_write_allowed(&self, repo_path: &str) -> bool { diff --git a/libs/git/http/utils.rs b/libs/git/http/utils.rs index c9eb6ce..5fb429b 100644 --- a/libs/git/http/utils.rs +++ b/libs/git/http/utils.rs @@ -1,6 +1,6 @@ use actix_web::{Error, HttpRequest}; -use argon2::password_hash::{SaltString, PasswordHasher}; use argon2::Argon2; +use argon2::password_hash::{PasswordHasher, SaltString}; use base64::Engine; use base64::engine::general_purpose::STANDARD; use db::database::AppDatabase; diff --git a/libs/git/lib.rs b/libs/git/lib.rs index ddefdb2..e85cc1b 100644 --- a/libs/git/lib.rs +++ b/libs/git/lib.rs @@ -36,9 +36,9 @@ pub use diff::types::{ }; pub use domain::GitDomain; pub use error::{GitError, GitResult}; -pub use hook::pool::types::{HookTask, TaskType}; -pub use hook::pool::PoolConfig; pub use hook::pool::HookWorker; +pub use hook::pool::PoolConfig; +pub use hook::pool::types::{HookTask, TaskType}; pub use hook::sync::HookMetaDataSync; pub use lfs::types::{LfsConfig, LfsEntry, LfsOid, LfsPointer}; pub use merge::types::{MergeAnalysisResult, MergeOptions, MergePreferenceResult, MergeheadInfo}; diff --git a/libs/git/ssh/authz.rs b/libs/git/ssh/authz.rs index 3b888dd..c137ea6 100644 --- a/libs/git/ssh/authz.rs +++ b/libs/git/ssh/authz.rs @@ -111,7 +111,10 @@ impl SshAuthService { } else { fingerprint.clone() }; - tracing::info!("looking up user with SSH key fingerprint={}", fingerprint_preview); + tracing::info!( + "looking up user with SSH key fingerprint={}", + fingerprint_preview + ); let ssh_key = user_ssh_key::Entity::find() .filter(user_ssh_key::Column::Fingerprint.eq(&fingerprint)) @@ -128,7 +131,11 @@ impl SshAuthService { }; if self.is_key_expired(&ssh_key) { - tracing::warn!("SSH key expired key_id={} expires_at={:?}", ssh_key.id, ssh_key.expires_at); + tracing::warn!( + "SSH key expired key_id={} expires_at={:?}", + ssh_key.id, + ssh_key.expires_at + ); return Ok(None); } @@ -138,7 +145,11 @@ impl SshAuthService { .await?; if let Some(ref user) = user_model { - tracing::info!("user authenticated via SSH key user={} key={}", user.username, ssh_key.title); + tracing::info!( + "user authenticated via SSH key user={} key={}", + user.username, + ssh_key.title + ); self.update_key_last_used_async(ssh_key.id); } @@ -158,15 +169,16 @@ impl SshAuthService { let db_clone = self.db.clone(); tokio::spawn(async move { if let Err(e) = Self::update_key_last_used_sync(db_clone, key_id).await { - tracing::warn!("failed to update key last_used key_id={} error={}", key_id, e); + tracing::warn!( + "failed to update key last_used key_id={} error={}", + key_id, + e + ); } }); } - async fn update_key_last_used_sync( - db: AppDatabase, - key_id: i64, - ) -> Result<(), DbErr> { + async fn update_key_last_used_sync(db: AppDatabase, key_id: i64) -> Result<(), DbErr> { let key = user_ssh_key::Entity::find_by_id(key_id) .one(db.reader()) .await?; @@ -191,7 +203,11 @@ impl SshAuthService { is_write: bool, ) -> bool { if repo.created_by == user.uid { - tracing::info!("user is repo owner user={} repo={}", user.username, repo.repo_name); + tracing::info!( + "user is repo owner user={} repo={}", + user.username, + repo.repo_name + ); return true; } @@ -205,7 +221,11 @@ impl SshAuthService { .await .unwrap_or(false) { - tracing::info!("user has collaborator access user={} repo={}", user.username, repo.repo_name); + tracing::info!( + "user has collaborator access user={} repo={}", + user.username, + repo.repo_name + ); return true; } @@ -215,11 +235,20 @@ impl SshAuthService { .await .unwrap_or(false) { - tracing::info!("user has project member access user={} repo={}", user.username, repo.repo_name); + tracing::info!( + "user has project member access user={} repo={}", + user.username, + repo.repo_name + ); return true; } - tracing::warn!("access denied user={} repo={} is_write={}", user.username, repo.repo_name, is_write); + tracing::warn!( + "access denied user={} repo={} is_write={}", + user.username, + repo.repo_name, + is_write + ); false } diff --git a/libs/git/ssh/branch_protect.rs b/libs/git/ssh/branch_protect.rs index 173f0f6..6591dcd 100644 --- a/libs/git/ssh/branch_protect.rs +++ b/libs/git/ssh/branch_protect.rs @@ -5,8 +5,7 @@ use models::repos::repo_branch_protect; /// (e.g. "refs/heads/main" matches "refs/heads/main" and "refs/heads/main/*" /// but NOT "refs/heads/main-v2"). fn ref_matches_protection(ref_name: &str, protection_branch: &str) -> bool { - ref_name == protection_branch - || ref_name.starts_with(&format!("{}/", protection_branch)) + ref_name == protection_branch || ref_name.starts_with(&format!("{}/", protection_branch)) } /// Granular branch protection check (same logic as HTTP handler). diff --git a/libs/git/ssh/forward.rs b/libs/git/ssh/forward.rs index 7bfb4f5..2ed144c 100644 --- a/libs/git/ssh/forward.rs +++ b/libs/git/ssh/forward.rs @@ -1,5 +1,5 @@ -use russh::server::Handle; use russh::ChannelId; +use russh::server::Handle; use std::future::Future; use std::time::Duration; use tokio::io::{AsyncRead, AsyncReadExt}; diff --git a/libs/git/ssh/git_service.rs b/libs/git/ssh/git_service.rs index 50869c4..b0e21b8 100644 --- a/libs/git/ssh/git_service.rs +++ b/libs/git/ssh/git_service.rs @@ -53,9 +53,15 @@ pub fn build_git_command(service: GitService, path: PathBuf) -> tokio::process:: cmd.current_dir(cwd); match service { - GitService::UploadPack => { cmd.arg("upload-pack"); } - GitService::ReceivePack => { cmd.arg("receive-pack"); } - GitService::UploadArchive => { cmd.arg("upload-archive"); } + GitService::UploadPack => { + cmd.arg("upload-pack"); + } + GitService::ReceivePack => { + cmd.arg("receive-pack"); + } + GitService::UploadArchive => { + cmd.arg("upload-archive"); + } } cmd.arg(".") diff --git a/libs/git/ssh/handle.rs b/libs/git/ssh/handle.rs index 651be41..7b87fda 100644 --- a/libs/git/ssh/handle.rs +++ b/libs/git/ssh/handle.rs @@ -2,6 +2,10 @@ use crate::ssh::ReceiveSyncService; use crate::ssh::RepoReceiveSyncTask; use crate::ssh::SshTokenService; use crate::ssh::authz::SshAuthService; +use crate::ssh::branch_protect::check_branch_protection; +use crate::ssh::forward::forward; +use crate::ssh::git_service::{GitService, build_git_command, parse_git_command, parse_repo_path}; +use crate::ssh::ref_update::RefUpdate; use db::cache::AppCache; use db::database::AppDatabase; use models::repos::{repo, repo_branch_protect}; @@ -9,11 +13,6 @@ use models::users::user; use russh::keys::{Certificate, PublicKey}; use russh::server::{Auth, Msg, Session}; use russh::{Channel, ChannelId, Disconnect}; -use crate::ssh::ref_update::RefUpdate; -use crate::ssh::git_service::{GitService, parse_git_command, parse_repo_path, build_git_command}; -use crate::ssh::branch_protect::check_branch_protection; -use crate::ssh::forward::forward; -use tokio_util::bytes::Bytes; use sea_orm::ColumnTrait; use sea_orm::EntityTrait; use sea_orm::QueryFilter; @@ -23,6 +22,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::process::Stdio; use std::time::Duration; +use tokio_util::bytes::Bytes; const PRE_PACK_LIMIT: usize = 1_048_576; use tokio::io::AsyncWriteExt; @@ -131,7 +131,6 @@ impl russh::server::Handler for SSHandle { .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - if token.is_empty() { tracing::warn!("auth_rejected_empty_token client={}", client_info); return Err(russh::Error::NotAuthenticated); @@ -151,7 +150,11 @@ impl russh::server::Handler for SSHandle { } }; - tracing::info!("auth_token_success user={} client={}", user_model.username, client_info); + tracing::info!( + "auth_token_success user={} client={}", + user_model.username, + client_info + ); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -173,12 +176,19 @@ impl russh::server::Handler for SSHandle { .unwrap_or_else(|| "unknown".to_string()); if user != "git" { - tracing::warn!("auth_rejected_invalid_username user={} client={}", user, client_info); + tracing::warn!( + "auth_rejected_invalid_username user={} client={}", + user, + client_info + ); return Err(russh::Error::NotAuthenticated); } let public_key_str = public_key.to_string(); if public_key_str.len() < 32 { - tracing::warn!("auth_rejected_invalid_key_length key_length={}", public_key_str.len()); + tracing::warn!( + "auth_rejected_invalid_key_length key_length={}", + public_key_str.len() + ); return Err(russh::Error::NotAuthenticated); } @@ -195,7 +205,11 @@ impl russh::server::Handler for SSHandle { } }; - tracing::info!("auth_publickey_success user={} client={}", user_model.username, client_info); + tracing::info!( + "auth_publickey_success user={} client={}", + user_model.username, + client_info + ); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -210,12 +224,19 @@ impl russh::server::Handler for SSHandle { .unwrap_or_else(|| "unknown".to_string()); if user != "git" { - tracing::warn!("auth_rejected_invalid_username user={} client={}", user, client_info); + tracing::warn!( + "auth_rejected_invalid_username user={} client={}", + user, + client_info + ); return Err(russh::Error::NotAuthenticated); } let public_key_str = certificate.to_string(); if public_key_str.len() < 32 { - tracing::warn!("auth_rejected_invalid_key_length key_length={}", public_key_str.len()); + tracing::warn!( + "auth_rejected_invalid_key_length key_length={}", + public_key_str.len() + ); return Err(russh::Error::NotAuthenticated); } @@ -232,7 +253,11 @@ impl russh::server::Handler for SSHandle { } }; - tracing::info!("auth_publickey_success user={} client={}", user_model.username, client_info); + tracing::info!( + "auth_publickey_success user={} client={}", + user_model.username, + client_info + ); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -245,7 +270,11 @@ impl russh::server::Handler for SSHandle { channel: ChannelId, _: &mut Session, ) -> Result<(), Self::Error> { - tracing::info!("channel_close channel={:?} client={:?}", channel, self.client_addr); + tracing::info!( + "channel_close channel={:?} client={:?}", + channel, + self.client_addr + ); self.cleanup_channel(channel); Ok(()) } @@ -255,14 +284,22 @@ impl russh::server::Handler for SSHandle { channel: ChannelId, _: &mut Session, ) -> Result<(), Self::Error> { - tracing::info!("channel_eof channel={:?} client={:?}", channel, self.client_addr); + tracing::info!( + "channel_eof channel={:?} client={:?}", + channel, + self.client_addr + ); if let Some(eof) = self.eof.get(&channel) { let _ = eof.send(true).await; } if let Some(mut stdin) = self.stdin.remove(&channel) { - tracing::info!("Closing stdin channel={:?} client={:?}", channel, self.client_addr); + tracing::info!( + "Closing stdin channel={:?} client={:?}", + channel, + self.client_addr + ); // Use timeout so we never block the SSH event loop waiting for git. let _ = tokio::time::timeout(Duration::from_secs(5), async { if let Err(e) = stdin.flush().await { @@ -271,9 +308,17 @@ impl russh::server::Handler for SSHandle { let _ = stdin.shutdown().await; }) .await; - tracing::info!("stdin closed channel={:?} client={:?}", channel, self.client_addr); + tracing::info!( + "stdin closed channel={:?} client={:?}", + channel, + self.client_addr + ); } else { - tracing::warn!("stdin already removed channel={:?} client={:?}", channel, self.client_addr); + tracing::warn!( + "stdin already removed channel={:?} client={:?}", + channel, + self.client_addr + ); } Ok(()) @@ -288,7 +333,11 @@ impl russh::server::Handler for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - tracing::info!("channel_open_session channel={:?} client={}", channel, client_info); + tracing::info!( + "channel_open_session channel={:?} client={}", + channel, + client_info + ); if let Err(e) = session.flush() { tracing::warn!(error = %e, "ssh_session_flush_failed"); } @@ -306,7 +355,13 @@ impl russh::server::Handler for SSHandle { _modes: &[(russh::Pty, u32)], session: &mut Session, ) -> Result<(), Self::Error> { - tracing::warn!("pty_request not supported channel={:?} term={} cols={} rows={}", channel, term, col_width, row_height); + tracing::warn!( + "pty_request not supported channel={:?} term={} cols={} rows={}", + channel, + term, + col_width, + row_height + ); if let Err(e) = session.flush() { tracing::warn!(error = %e, "ssh_session_flush_failed"); } @@ -341,11 +396,8 @@ impl russh::server::Handler for SSHandle { if bf.len() + data.len() > PRE_PACK_LIMIT { tracing::warn!("ssh_pre_pack_too_large channel={:?}", channel); let msg = "remote: Ref negotiation exceeds size limit\r\n"; - let _ = session.extended_data( - channel, - 1, - Bytes::copy_from_slice(msg.as_bytes()), - ); + let _ = + session.extended_data(channel, 1, Bytes::copy_from_slice(msg.as_bytes())); let _ = session.exit_status_request(channel, 1); let _ = session.eof(channel); let _ = session.close(channel); @@ -376,8 +428,7 @@ impl russh::server::Handler for SSHandle { if let Some(msg) = check_branch_protection(&branch_protect_roles, r#ref) { - let full_msg = - format!("remote: {}\r\n", msg); + let full_msg = format!("remote: {}\r\n", msg); let _ = session.extended_data( channel, 1, @@ -444,8 +495,7 @@ impl russh::server::Handler for SSHandle { ); tracing::info!("shell_request user={}", user.username); - let _ = session - .data(channel_id, Bytes::copy_from_slice(welcome_msg.as_bytes())); + let _ = session.data(channel_id, Bytes::copy_from_slice(welcome_msg.as_bytes())); let _ = session.exit_status_request(channel_id, 0); let _ = session.eof(channel_id); let _ = session.close(channel_id); @@ -453,8 +503,7 @@ impl russh::server::Handler for SSHandle { } else { tracing::warn!("shell_request_unauthenticated channel={:?}", channel_id); let msg = "Authentication required\r\n"; - let _ = session - .data(channel_id, Bytes::copy_from_slice(msg.as_bytes())); + let _ = session.data(channel_id, Bytes::copy_from_slice(msg.as_bytes())); let _ = session.exit_status_request(channel_id, 1); let _ = session.eof(channel_id); let _ = session.close(channel_id); @@ -473,18 +522,21 @@ impl russh::server::Handler for SSHandle { .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - tracing::info!("exec_request received channel={:?} client={}", channel_id, client_info); + tracing::info!( + "exec_request received channel={:?} client={}", + channel_id, + client_info + ); let git_shell_cmd = match std::str::from_utf8(data) { Ok(cmd) => cmd.trim(), Err(e) => { tracing::error!("invalid_command_encoding error={}", e); - let _ = session - .disconnect( - Disconnect::ServiceNotAvailable, - "Invalid command encoding", - "", - ); + let _ = session.disconnect( + Disconnect::ServiceNotAvailable, + "Invalid command encoding", + "", + ); return Err(russh::Error::Disconnect); } }; @@ -493,8 +545,7 @@ impl russh::server::Handler for SSHandle { None => { tracing::error!("invalid_git_command command={}", git_shell_cmd); let msg = format!("Invalid git command: {}", git_shell_cmd); - let _ = session - .disconnect(Disconnect::ServiceNotAvailable, &msg, ""); + let _ = session.disconnect(Disconnect::ServiceNotAvailable, &msg, ""); return Err(russh::Error::Disconnect); } }; @@ -504,8 +555,7 @@ impl russh::server::Handler for SSHandle { None => { let msg = format!("Invalid repository path: {}", path); tracing::error!("invalid_repo_path path={}", path); - let _ = session - .disconnect(Disconnect::ServiceNotAvailable, &msg, ""); + let _ = session.disconnect(Disconnect::ServiceNotAvailable, &msg, ""); return Err(russh::Error::Disconnect); } }; @@ -516,8 +566,8 @@ impl russh::server::Handler for SSHandle { Err(e) => { // Log the detailed error internally; client receives generic message. tracing::error!("repo_fetch_error error={}", e); - let _ = session - .disconnect(Disconnect::ServiceNotAvailable, "Repository not found", ""); + let _ = + session.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", ""); return Err(russh::Error::Disconnect); } }; @@ -546,20 +596,34 @@ impl russh::server::Handler for SSHandle { if is_write { "write" } else { "read" }, repo.repo_name ); - tracing::error!("access_denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write); + tracing::error!( + "access_denied user={} repo={} is_write={}", + operator.username, + repo.repo_name, + is_write + ); let _ = session.disconnect(Disconnect::ByApplication, &msg, ""); return Err(russh::Error::Disconnect); } - tracing::info!("access_granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write); + tracing::info!( + "access_granted user={} repo={} is_write={}", + operator.username, + repo.repo_name, + is_write + ); let repo_path = PathBuf::from(&repo.storage_path); if !repo_path.exists() { tracing::error!("repo_path_not_found path={}", repo.storage_path); } let mut cmd = build_git_command(service, repo_path); - - tracing::info!("spawn_git_process service={:?} path={}", service, repo.storage_path); + + tracing::info!( + "spawn_git_process service={:?} path={}", + service, + repo.storage_path + ); let mut shell = match cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -583,7 +647,10 @@ impl russh::server::Handler for SSHandle { None => { tracing::error!("stdin pipe unavailable for channel={:?}", channel_id); let _ = session_handle.channel_failure(channel_id).await; - return Err(russh::Error::IO(io::Error::new(io::ErrorKind::Other, "stdin unavailable"))); + return Err(russh::Error::IO(io::Error::new( + io::ErrorKind::Other, + "stdin unavailable", + ))); } }; self.stdin.insert(channel_id, stdin); @@ -591,14 +658,20 @@ impl russh::server::Handler for SSHandle { Some(s) => s, None => { tracing::error!("stdout pipe unavailable for channel={:?}", channel_id); - return Err(russh::Error::IO(io::Error::new(io::ErrorKind::Other, "stdout unavailable"))); + return Err(russh::Error::IO(io::Error::new( + io::ErrorKind::Other, + "stdout unavailable", + ))); } }; let mut shell_stderr = match shell.stderr.take() { Some(s) => s, None => { tracing::error!("stderr pipe unavailable for channel={:?}", channel_id); - return Err(russh::Error::IO(io::Error::new(io::ErrorKind::Other, "stderr unavailable"))); + return Err(russh::Error::IO(io::Error::new( + io::ErrorKind::Other, + "stderr unavailable", + ))); } }; @@ -607,7 +680,7 @@ impl russh::server::Handler for SSHandle { let repo_uid = repo.id; let should_sync = service == GitService::ReceivePack; let sync = self.sync.clone(); - + let fut = async move { tracing::info!(channel = ?channel_id, "git_task_started"); diff --git a/libs/git/ssh/mod.rs b/libs/git/ssh/mod.rs index 20e544f..095cb75 100644 --- a/libs/git/ssh/mod.rs +++ b/libs/git/ssh/mod.rs @@ -40,12 +40,7 @@ impl SSHHandle { } }); } - pub fn new( - db: AppDatabase, - app: AppConfig, - cache: AppCache, - redis_pool: RedisPool, - ) -> Self { + pub fn new(db: AppDatabase, app: AppConfig, cache: AppCache, redis_pool: RedisPool) -> Self { SSHHandle { db, app, @@ -72,10 +67,7 @@ impl SSHHandle { ) })?; - tracing::info!( - "Hex decoded to {} bytes", - private_key_bytes.len() - ); + tracing::info!("Hex decoded to {} bytes", private_key_bytes.len()); let private_key_pem = std::str::from_utf8(&private_key_bytes) .with_context(|| "Decoded SSH private key is not valid UTF-8")?; @@ -98,7 +90,8 @@ impl SSHHandle { } Err(e) => { tracing::info!( - "ssh-key from_openssh failed: {}, trying direct russh parse", e + "ssh-key from_openssh failed: {}, trying direct russh parse", + e ); PrivateKey::from_str(private_key_pem).with_context(|| { format!("Failed to parse SSH private key with both methods") @@ -119,9 +112,7 @@ impl SSHHandle { config.keepalive_interval = Some(Duration::from_secs(60)); config.keepalive_max = 3; - tracing::info!( - "SSH server configured with methods: {:?}", config.methods - ); + tracing::info!("SSH server configured with methods: {:?}", config.methods); let token_service = SshTokenService::new(self.db.clone()); let mut server = server::SSHServer::new( self.db.clone(), @@ -161,7 +152,17 @@ pub struct ReceiveSyncService { pool: deadpool_redis::cluster::Pool, redis_prefix: String, /// Optional NATS publish function: (subject, payload) -> Result - nats_publish: Option) -> std::pin::Pin> + Send>> + Send + Sync>>, + nats_publish: Option< + Arc< + dyn Fn( + String, + Vec, + ) -> std::pin::Pin< + Box> + Send>, + > + Send + + Sync, + >, + >, } impl ReceiveSyncService { @@ -175,7 +176,15 @@ impl ReceiveSyncService { pub fn with_nats( pool: deadpool_redis::cluster::Pool, - nats_publish: Arc) -> std::pin::Pin> + Send>> + Send + Sync>, + nats_publish: Arc< + dyn Fn( + String, + Vec, + ) -> std::pin::Pin< + Box> + Send>, + > + Send + + Sync, + >, ) -> Self { Self { pool, @@ -243,8 +252,11 @@ impl ReceiveSyncService { .query_async::<()>(&mut conn) .await { - tracing::error!("failed to enqueue sync task repo_id={} error={}", - task.repo_uid, e); + tracing::error!( + "failed to enqueue sync task repo_id={} error={}", + task.repo_uid, + e + ); } else { tracing::info!(repo_id = %task.repo_uid, "hook task queued to Redis"); metrics::counter!("hook_task_queued_total", "backend" => "redis").increment(1);