From 0c1a9ddf98fa383ec4f31185924d6f572c337feb Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Tue, 21 Apr 2026 22:29:26 +0800 Subject: [PATCH] refactor(git): migrate libs/git from slog to tracing - Remove all use slog::* imports and log: slog::Logger fields - ssh/handle.rs: replace slog macro chains with tracing::{info!, warn!, error!, debug!}; remove log field from GitSshHandle - ssh/authz.rs, ssh/mod.rs, ssh/server.rs: remove slog Logger fields - http/: auth.rs, handler.rs, mod.rs, routes.rs: remove slog usage - hook/: pool worker, sync modules, webhook_dispatch.rs: remove slog --- libs/git/Cargo.toml | 2 +- libs/git/hook/mod.rs | 5 -- libs/git/hook/pool/mod.rs | 5 +- libs/git/hook/pool/redis.rs | 18 +--- libs/git/hook/pool/worker.rs | 46 ++++------ libs/git/hook/sync/commit.rs | 2 +- libs/git/hook/sync/fsck.rs | 16 ++-- libs/git/hook/sync/gc.rs | 3 +- libs/git/hook/sync/lfs.rs | 6 +- libs/git/hook/sync/mod.rs | 18 ++-- libs/git/hook/webhook_dispatch.rs | 35 ++++---- libs/git/http/auth.rs | 4 +- libs/git/http/handler.rs | 15 ++-- libs/git/http/mod.rs | 17 ++-- libs/git/http/routes.rs | 12 +-- libs/git/ssh/authz.rs | 36 ++++---- libs/git/ssh/handle.rs | 137 ++++++++++++------------------ libs/git/ssh/mod.rs | 54 +++++------- libs/git/ssh/server.rs | 35 +++----- 19 files changed, 173 insertions(+), 293 deletions(-) diff --git a/libs/git/Cargo.toml b/libs/git/Cargo.toml index 81127ee..7392543 100644 --- a/libs/git/Cargo.toml +++ b/libs/git/Cargo.toml @@ -28,8 +28,8 @@ models = { workspace = true } db = { workspace = true } deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] } config = { workspace = true } -slog = { workspace = true } tokio = { workspace = true, features = ["sync", "rt", "process"] } +tracing = { workspace = true } tokio-util = { workspace = true } qdrant-client = { workspace = true } redis = { workspace = true } diff --git a/libs/git/hook/mod.rs b/libs/git/hook/mod.rs index 3f3cd43..6a63bb6 100644 --- a/libs/git/hook/mod.rs +++ b/libs/git/hook/mod.rs @@ -2,7 +2,6 @@ use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; use deadpool_redis::cluster::Pool as RedisPool; -use slog::Logger; use tokio_util::sync::CancellationToken; pub mod pool; @@ -20,7 +19,6 @@ pub struct HookService { pub(crate) db: AppDatabase, pub(crate) cache: AppCache, pub(crate) redis_pool: RedisPool, - pub(crate) logger: Logger, pub(crate) config: AppConfig, } @@ -29,14 +27,12 @@ impl HookService { db: AppDatabase, cache: AppCache, redis_pool: RedisPool, - logger: Logger, config: AppConfig, ) -> Self { Self { db, cache, redis_pool, - logger, config, } } @@ -48,7 +44,6 @@ impl HookService { self.db.clone(), self.cache.clone(), self.redis_pool.clone(), - self.logger.clone(), pool_config, ) } diff --git a/libs/git/hook/pool/mod.rs b/libs/git/hook/pool/mod.rs index 2248978..5c0948e 100644 --- a/libs/git/hook/pool/mod.rs +++ b/libs/git/hook/pool/mod.rs @@ -9,7 +9,6 @@ pub use worker::HookWorker; use db::cache::AppCache; use db::database::AppDatabase; use deadpool_redis::cluster::Pool as RedisPool; -use slog::Logger; use std::sync::Arc; use tokio_util::sync::CancellationToken; @@ -19,19 +18,17 @@ pub fn start_worker( db: AppDatabase, cache: AppCache, redis_pool: RedisPool, - logger: Logger, config: PoolConfig, ) -> CancellationToken { let consumer = RedisConsumer::new( redis_pool.clone(), config.redis_list_prefix.clone(), config.redis_block_timeout_secs, - logger.clone(), ); let http_client = Arc::new(reqwest::Client::new()); let max_retries = config.redis_max_retries as u32; - let worker = HookWorker::new(db, cache, logger, consumer, http_client, max_retries); + let worker = HookWorker::new(db, cache, consumer, http_client, max_retries); let cancel = CancellationToken::new(); let cancel_clone = cancel.clone(); diff --git a/libs/git/hook/pool/redis.rs b/libs/git/hook/pool/redis.rs index 796ffc0..66bb066 100644 --- a/libs/git/hook/pool/redis.rs +++ b/libs/git/hook/pool/redis.rs @@ -1,7 +1,6 @@ use crate::error::GitError; use crate::hook::pool::types::HookTask; use deadpool_redis::cluster::Connection as RedisConn; -use slog::Logger; use std::time::Duration; /// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern. @@ -10,7 +9,6 @@ pub struct RedisConsumer { /// Hash-tag-prefixed key prefix, e.g. "{hook}". prefix: String, block_timeout_secs: u64, - logger: Logger, } const POOL_GET_TIMEOUT: Duration = Duration::from_secs(5); @@ -20,13 +18,11 @@ impl RedisConsumer { pool: deadpool_redis::cluster::Pool, prefix: String, block_timeout_secs: u64, - logger: Logger, ) -> Self { Self { pool, prefix, block_timeout_secs, - logger, } } @@ -61,19 +57,12 @@ impl RedisConsumer { Some(json) => { match serde_json::from_str::(&json) { Ok(task) => { - slog::debug!(self.logger, "task dequeued"; - "task_id" => %task.id, - "task_type" => %task.task_type, - "queue" => %queue_key - ); + tracing::debug!("task dequeued task_id={} task_type={} queue={}", task.id, task.task_type, queue_key); Ok(Some((task, json))) } Err(e) => { // Malformed task — remove from work queue and discard - slog::warn!(self.logger, "malformed task JSON, discarding"; - "error" => %e, - "queue" => %work_key - ); + tracing::warn!("malformed task JSON, discarding error={} queue={}", e, work_key); let _ = self.ack_raw(&work_key, &json).await; Ok(None) } @@ -154,7 +143,7 @@ impl RedisConsumer { .await .map_err(|e| GitError::Internal(format!("nak script failed: {}", e)))?; - slog::warn!(self.logger, "task nack'd and requeued queue={}", queue_key); + tracing::warn!("task nack'd and requeued queue={}", queue_key); Ok(()) } @@ -170,7 +159,6 @@ impl Clone for RedisConsumer { pool: self.pool.clone(), prefix: self.prefix.clone(), block_timeout_secs: self.block_timeout_secs, - logger: self.logger.clone(), } } } diff --git a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs index 853c2e4..ffe1c6d 100644 --- a/libs/git/hook/pool/worker.rs +++ b/libs/git/hook/pool/worker.rs @@ -5,7 +5,6 @@ use crate::hook::sync::HookMetaDataSync; use db::cache::AppCache; use db::database::AppDatabase; use models::EntityTrait; -use slog::Logger; use std::sync::Arc; use std::time::Duration; use tokio_util::sync::CancellationToken; @@ -17,7 +16,6 @@ use tokio_util::sync::CancellationToken; pub struct HookWorker { db: AppDatabase, cache: AppCache, - logger: Logger, consumer: RedisConsumer, http_client: Arc, max_retries: u32, @@ -27,7 +25,6 @@ impl HookWorker { pub fn new( db: AppDatabase, cache: AppCache, - logger: Logger, consumer: RedisConsumer, http_client: Arc, max_retries: u32, @@ -35,7 +32,6 @@ impl HookWorker { Self { db, cache, - logger, consumer, http_client, max_retries, @@ -44,7 +40,7 @@ impl HookWorker { /// Run the worker loop. Blocks until cancelled. pub async fn run(&self, cancel: CancellationToken) { - slog::info!(self.logger, "hook worker started"); + tracing::info!("hook worker started"); let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc]; let mut redis_backoff_ms: u64 = 1000; @@ -52,7 +48,7 @@ impl HookWorker { loop { // Check cancellation at top of loop to avoid unnecessary work if cancel.is_cancelled() { - slog::info!(self.logger, "hook worker shutdown signal received"); + tracing::info!("hook worker shutdown signal received"); break; } @@ -67,7 +63,7 @@ impl HookWorker { } Ok(None) => continue, Err(e) => { - slog::warn!(self.logger, "failed to dequeue task: {}", e); + tracing::warn!("failed to dequeue task: {}", e); tokio::time::sleep(Duration::from_millis(redis_backoff_ms)).await; // Exponential backoff, cap at 32s redis_backoff_ms = (redis_backoff_ms * 2).min(32_000); @@ -83,7 +79,7 @@ impl HookWorker { } } - slog::info!(self.logger, "hook worker stopped"); + tracing::info!("hook worker stopped"); } async fn process_task( @@ -93,7 +89,7 @@ impl HookWorker { work_key: &str, queue_key: &str, ) { - slog::info!(self.logger, "task started task_id={} task_type={} repo_id={}", + tracing::info!("task started task_id={} task_type={} repo_id={}", task.id, task.task_type, task.repo_id); let result = match task.task_type { @@ -105,25 +101,25 @@ impl HookWorker { match result { Ok(()) => { if let Err(e) = self.consumer.ack(work_key, task_json).await { - slog::warn!(self.logger, "failed to ack task: {}", e); + tracing::warn!("failed to ack task: {}", e); } - slog::info!(self.logger, "task completed task_id={}", task.id); + tracing::info!("task completed task_id={}", task.id); } Err(e) => { let is_locked = matches!(e, crate::GitError::Locked(_)); if is_locked { // Another worker holds the lock — requeue without counting as retry. - slog::info!(self.logger, "repo locked by another worker, requeueing task_id={}", task.id); + tracing::info!("repo locked by another worker, requeueing task_id={}", task.id); if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await { - slog::warn!(self.logger, "failed to requeue locked task: {}", nak_err); + tracing::warn!("failed to requeue locked task: {}", nak_err); } } else { - slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}", + tracing::warn!("task failed task_id={} task_type={} repo_id={} error={}", task.id, task.task_type, task.repo_id, e); if task.retry_count >= self.max_retries { - slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}", + tracing::warn!("task exhausted retries, discarding task_id={} retry_count={}", task.id, task.retry_count); let _ = self.consumer.ack(work_key, task_json).await; } else { @@ -162,10 +158,9 @@ impl HookWorker { let before_tips = tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); - let logger = self.logger.clone(); let repo = repo.clone(); move || { - let sync = HookMetaDataSync::new(db, cache, repo, logger) + let sync = HookMetaDataSync::new(db, cache, repo) .map_err(|e| GitError::Internal(e.to_string()))?; Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) } @@ -177,11 +172,10 @@ impl HookWorker { // Run full sync (internally acquires/releases per-repo lock) let db = self.db.clone(); let cache = self.cache.clone(); - let logger = self.logger.clone(); let repo_clone = repo.clone(); let _sync_result = tokio::task::spawn_blocking(move || { let result = tokio::runtime::Handle::current().block_on(async { - let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone(), logger.clone())?; + let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone())?; sync.sync().await }); match result { @@ -199,10 +193,9 @@ impl HookWorker { let after_tips = tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); - let logger = self.logger.clone(); let repo = repo.clone(); move || { - let sync = HookMetaDataSync::new(db, cache, repo, logger) + let sync = HookMetaDataSync::new(db, cache, repo) .map_err(|e| GitError::Internal(e.to_string()))?; Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) } @@ -229,7 +222,6 @@ impl HookWorker { let default_branch = repo.default_branch.clone(); let http_client = self.http_client.clone(); let db = self.db.clone(); - let logger = self.logger.clone(); // Dispatch branch webhooks and collect handles let mut handles = Vec::new(); @@ -245,7 +237,6 @@ impl HookWorker { let h = tokio::spawn({ let http_client = http_client.clone(); let db = db.clone(); - let logger = logger.clone(); let repo_id_str = repo_id_str.clone(); let namespace = namespace.clone(); let repo_name = repo_name.clone(); @@ -254,7 +245,6 @@ impl HookWorker { crate::hook::webhook_dispatch::dispatch_repo_webhooks( &db, &http_client, - &logger, &repo_id_str, &namespace, &repo_name, @@ -289,7 +279,6 @@ impl HookWorker { let h = tokio::spawn({ let http_client = http_client.clone(); let db = db.clone(); - let logger = logger.clone(); let repo_id_str = repo_id_str.clone(); let namespace = namespace.clone(); let repo_name = repo_name.clone(); @@ -298,7 +287,6 @@ impl HookWorker { crate::hook::webhook_dispatch::dispatch_repo_webhooks( &db, &http_client, - &logger, &repo_id_str, &namespace, &repo_name, @@ -345,8 +333,7 @@ impl HookWorker { let db = self.db.clone(); let cache = self.cache.clone(); - let logger = self.logger.clone(); - let sync = HookMetaDataSync::new(db, cache, repo, logger)?; + let sync = HookMetaDataSync::new(db, cache, repo)?; sync.fsck_only().await?; Ok(()) @@ -371,8 +358,7 @@ impl HookWorker { let db = self.db.clone(); let cache = self.cache.clone(); - let logger = self.logger.clone(); - let sync = HookMetaDataSync::new(db, cache, repo, logger)?; + let sync = HookMetaDataSync::new(db, cache, repo)?; sync.gc_only().await?; Ok(()) diff --git a/libs/git/hook/sync/commit.rs b/libs/git/hook/sync/commit.rs index 2bef47c..0598265 100644 --- a/libs/git/hook/sync/commit.rs +++ b/libs/git/hook/sync/commit.rs @@ -61,7 +61,7 @@ impl HookMetaDataSync { let reference = match ref_result { Ok(r) => r, Err(e) => { - slog::warn!(self.logger, "failed to read reference: {}", e); + tracing::warn!("failed to read reference error={}", e); continue; } }; diff --git a/libs/git/hook/sync/fsck.rs b/libs/git/hook/sync/fsck.rs index 040a9c2..800f9ba 100644 --- a/libs/git/hook/sync/fsck.rs +++ b/libs/git/hook/sync/fsck.rs @@ -13,7 +13,6 @@ impl HookMetaDataSync { ) -> Result<(), GitError> { let snapshot = self.snapshot_refs(); let storage_path = self.repo.storage_path.clone(); - let logger = self.logger.clone(); let fsck_errors = tokio::task::spawn_blocking(move || { let output = Command::new("git") @@ -27,9 +26,8 @@ impl HookMetaDataSync { if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr).to_string(); let stdout = String::from_utf8_lossy(&output.stdout).to_string(); - slog::warn!( - logger, - "git fsck failed with code {:?}. stdout: {}, stderr: {}", + tracing::warn!( + "git fsck failed code={:?} stdout={} stderr={}", output.status.code(), stdout, stderr @@ -98,7 +96,6 @@ impl HookMetaDataSync { async fn rollback_refs(&self, snapshot: &HashMap) { let storage_path = self.repo.storage_path.clone(); - let logger = self.logger.clone(); let refs: Vec<(String, String)> = snapshot .iter() .map(|(k, v)| (k.clone(), v.clone())) @@ -119,18 +116,17 @@ impl HookMetaDataSync { match status { Ok(s) if s.success() => { - slog::info!(logger, "rolled back ref {} to {}", ref_name, oid); + tracing::info!("rolled back ref ref_name={} oid={}", ref_name, oid); } Ok(s) => { - slog::error!( - logger, - "failed to rollback ref {}: git exited with {:?}", + tracing::error!( + "failed to rollback ref ref_name={} code={:?}", ref_name, s.code() ); } Err(e) => { - slog::error!(logger, "failed to rollback ref {}: {}", ref_name, e); + tracing::error!("failed to rollback ref ref_name={} error={}", ref_name, e); } } } diff --git a/libs/git/hook/sync/gc.rs b/libs/git/hook/sync/gc.rs index 6b45663..0723142 100644 --- a/libs/git/hook/sync/gc.rs +++ b/libs/git/hook/sync/gc.rs @@ -5,7 +5,6 @@ use std::process::Command; impl HookMetaDataSync { pub async fn run_gc(&self) -> Result<(), GitError> { let storage_path = self.repo.storage_path.clone(); - let logger = self.logger.clone(); tokio::task::spawn_blocking(move || { let status = Command::new("git") @@ -20,7 +19,7 @@ impl HookMetaDataSync { if !status.success() { // git gc --auto exits non-zero when there's nothing to collect, // or when another gc is already running — both are benign. - slog::warn!(logger, "git gc exited with {:?}", status.code()); + tracing::warn!(code = ?status.code(), "git gc exited with non-zero status"); } Ok::<(), GitError>(()) diff --git a/libs/git/hook/sync/lfs.rs b/libs/git/hook/sync/lfs.rs index 796af51..64f8dd7 100644 --- a/libs/git/hook/sync/lfs.rs +++ b/libs/git/hook/sync/lfs.rs @@ -33,11 +33,7 @@ impl HookMetaDataSync { let path = match self.domain.lfs_object_path(&oid) { Ok(p) => p, Err(e) => { - slog::warn!( - self.logger, - "invalid LFS OID in local objects directory: {}", - e - ); + tracing::warn!("invalid LFS OID in local objects directory error={}", e); continue; } }; diff --git a/libs/git/hook/sync/mod.rs b/libs/git/hook/sync/mod.rs index 4cac9b4..5df0996 100644 --- a/libs/git/hook/sync/mod.rs +++ b/libs/git/hook/sync/mod.rs @@ -14,7 +14,6 @@ use models::repos::repo::Model as RepoModel; use models::RepoId; use models::ActiveModelTrait; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set}; -use slog::Logger; use std::collections::HashMap; use std::path::Path; @@ -133,7 +132,6 @@ pub struct HookMetaDataSync { pub cache: AppCache, pub repo: RepoModel, pub domain: GitDomain, - pub logger: Logger, } impl HookMetaDataSync { @@ -141,7 +139,6 @@ impl HookMetaDataSync { db: AppDatabase, cache: AppCache, repo: RepoModel, - logger: Logger, ) -> Result { let domain = GitDomain::from_model(repo.clone())?; Ok(Self { @@ -149,7 +146,6 @@ impl HookMetaDataSync { cache, repo, domain, - logger, }) } @@ -160,7 +156,7 @@ impl HookMetaDataSync { let res = self.sync_work().await; if let Err(ref e) = res { - slog::error!(self.logger, "sync failed: {}", e); + tracing::error!("sync failed error={}", e); } let _ = self.release_lock(&lock_value).await; @@ -304,11 +300,11 @@ impl HookMetaDataSync { { Ok(Ok(d)) => d, Ok(Err(e)) => { - slog::warn!(self.logger, "failed to scan skills directory: {}", e); + tracing::warn!("failed to scan skills directory error={}", e); return; } Err(e) => { - slog::warn!(self.logger, "spawn_blocking join error: {}", e); + tracing::warn!("spawn_blocking join error error={}", e); return; } }; @@ -331,7 +327,7 @@ impl HookMetaDataSync { { Ok(e) => e, Err(e) => { - slog::warn!(self.logger, "failed to query existing skills: {}", e); + tracing::warn!("failed to query existing skills error={}", e); return; } }; @@ -396,11 +392,7 @@ impl HookMetaDataSync { } if created > 0 || updated > 0 || removed > 0 { - slog::info!( - self.logger, - "skills synced: created={}, updated={}, removed={}", - created, updated, removed - ); + tracing::info!("skills synced created={} updated={} removed={}", created, updated, removed); } } } diff --git a/libs/git/hook/webhook_dispatch.rs b/libs/git/hook/webhook_dispatch.rs index 985139a..0727bd6 100644 --- a/libs/git/hook/webhook_dispatch.rs +++ b/libs/git/hook/webhook_dispatch.rs @@ -212,7 +212,6 @@ pub enum WebhookEventKind { pub async fn dispatch_repo_webhooks( db: &AppDatabase, http: &reqwest::Client, - logs: &slog::Logger, repo_uuid: &str, namespace: &str, repo_name: &str, @@ -231,7 +230,7 @@ pub async fn dispatch_repo_webhooks( { Ok(ws) => ws, Err(e) => { - slog::error!(logs, "failed to query webhooks repo={} error={}", repo_uuid, e); + tracing::error!("failed to query webhooks repo={} error={}", repo_uuid, e); return; } }; @@ -297,7 +296,7 @@ pub async fn dispatch_repo_webhooks( let body = match serde_json::to_vec(&payload) { Ok(b) => b, Err(e) => { - slog::error!(logs, "failed to serialize push payload error={}", e); + tracing::error!("failed to serialize push payload error={}", e); continue; } }; @@ -310,16 +309,16 @@ pub async fn dispatch_repo_webhooks( .await { Ok(Ok(())) => { - slog::info!(logs, "push webhook delivered webhook_id={} url={}", webhook_id, url); - let _ = touch_webhook(db, webhook_id, true, logs).await; + tracing::info!("push webhook delivered webhook_id={} url={}", webhook_id, url); + let _ = touch_webhook(db, webhook_id, true).await; } Ok(Err(e)) => { - slog::warn!(logs, "push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); - let _ = touch_webhook(db, webhook_id, false, logs).await; + tracing::warn!("push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); + let _ = touch_webhook(db, webhook_id, false).await; } Err(_) => { - slog::warn!(logs, "push webhook timed out webhook_id={} url={}", webhook_id, url); - let _ = touch_webhook(db, webhook_id, false, logs).await; + tracing::warn!("push webhook timed out webhook_id={} url={}", webhook_id, url); + let _ = touch_webhook(db, webhook_id, false).await; } } } @@ -351,7 +350,7 @@ pub async fn dispatch_repo_webhooks( let body = match serde_json::to_vec(&payload) { Ok(b) => b, Err(e) => { - slog::error!(logs, "failed to serialize tag payload error={}", e); + tracing::error!("failed to serialize tag payload error={}", e); continue; } }; @@ -364,16 +363,16 @@ pub async fn dispatch_repo_webhooks( .await { Ok(Ok(())) => { - slog::info!(logs, "tag webhook delivered webhook_id={} url={}", webhook_id, url); - let _ = touch_webhook(db, webhook_id, true, logs).await; + tracing::info!("tag webhook delivered webhook_id={} url={}", webhook_id, url); + let _ = touch_webhook(db, webhook_id, true).await; } Ok(Err(e)) => { - slog::warn!(logs, "tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); - let _ = touch_webhook(db, webhook_id, false, logs).await; + tracing::warn!("tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); + let _ = touch_webhook(db, webhook_id, false).await; } Err(_) => { - slog::warn!(logs, "tag webhook timed out webhook_id={} url={}", webhook_id, url); - let _ = touch_webhook(db, webhook_id, false, logs).await; + tracing::warn!("tag webhook timed out webhook_id={} url={}", webhook_id, url); + let _ = touch_webhook(db, webhook_id, false).await; } } } @@ -381,7 +380,7 @@ pub async fn dispatch_repo_webhooks( } } -async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &slog::Logger) { +async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool) { use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity}; use models::{ColumnTrait, EntityTrait, QueryFilter}; use sea_orm::{sea_query::Expr, ExprTrait}; @@ -405,6 +404,6 @@ async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: & }; if let Err(e) = result { - slog::warn!(logs, "failed to update webhook touch error={}", e); + tracing::warn!("failed to update webhook touch error={}", e); } } diff --git a/libs/git/http/auth.rs b/libs/git/http/auth.rs index 95ac17b..75e3419 100644 --- a/libs/git/http/auth.rs +++ b/libs/git/http/auth.rs @@ -6,7 +6,6 @@ use models::repos::repo; use models::users::{user, user_token}; use sea_orm::sqlx::types::chrono; use sea_orm::*; -use slog::Logger; pub async fn verify_access_token( db: &AppDatabase, @@ -45,7 +44,6 @@ pub async fn verify_access_token( pub async fn authorize_repo_access( req: &HttpRequest, db: &AppDatabase, - logger: &Logger, repo: &repo::Model, is_write: bool, ) -> Result<(), Error> { @@ -55,7 +53,7 @@ pub async fn authorize_repo_access( let (username, access_key) = extract_basic_credentials(req)?; let user = verify_access_token(db, &username, &access_key).await?; - let authz = SshAuthService::new(db.clone(), logger.clone()); + let authz = SshAuthService::new(db.clone()); let can_access = authz.check_repo_permission(&user, repo, is_write).await; if !can_access { diff --git a/libs/git/http/handler.rs b/libs/git/http/handler.rs index 2a4fee4..beac38b 100644 --- a/libs/git/http/handler.rs +++ b/libs/git/http/handler.rs @@ -4,7 +4,6 @@ use futures_util::Stream; use futures_util::StreamExt; use models::repos::{repo, repo_branch_protect}; use sea_orm::*; -use slog::{error, info, warn, Logger}; use std::path::PathBuf; use std::pin::Pin; use std::time::{Duration, Instant}; @@ -25,16 +24,14 @@ pub struct GitHttpHandler { storage_path: PathBuf, repo: repo::Model, db: AppDatabase, - logger: Logger, } impl GitHttpHandler { - pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase, logger: Logger) -> Self { + pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase) -> Self { Self { storage_path, repo, db, - logger, } } @@ -96,7 +93,7 @@ impl GitHttpHandler { mut payload: web::Payload, ) -> Result { let started = Instant::now(); - info!(self.logger, "git_rpc_started"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string()); + tracing::info!("git_rpc_started service={} repo={} repo_id={}", service, self.repo.repo_name, self.repo.id.to_string()); let mut child = tokio::process::Command::new("git") .arg(service) .arg("--stateless-rpc") @@ -138,7 +135,7 @@ impl GitHttpHandler { // Reject oversized pre-PACK data to prevent memory exhaustion if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT { - warn!(self.logger, "git_rpc_payload_too_large"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string()); + tracing::warn!("git_rpc_payload_too_large service={} repo={} repo_id={}", service, self.repo.repo_name, self.repo.id.to_string()); return Err(actix_web::error::ErrorPayloadTooLarge(format!( "Ref negotiation exceeds {} byte limit", PRE_PACK_LIMIT @@ -149,7 +146,7 @@ impl GitHttpHandler { pre_pack.extend_from_slice(&bytes[..pos]); if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) { - warn!(self.logger, "branch_protection_violation"; "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "message" => %msg); + tracing::warn!("branch_protection_violation repo={} repo_id={} message={}", self.repo.repo_name, self.repo.id.to_string(), msg); return Err(actix_web::error::ErrorForbidden(msg)); } @@ -210,7 +207,7 @@ impl GitHttpHandler { if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); let ms = started.elapsed().as_millis() as u64; - error!(self.logger, "git_rpc_failed"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "duration_ms" => ms, "stderr" => %stderr.to_string()); + tracing::error!("git_rpc_failed service={} repo={} repo_id={} duration_ms={} stderr={}", service, self.repo.repo_name, self.repo.id.to_string(), ms, stderr.to_string()); return Err(actix_web::error::ErrorInternalServerError(format!( "Git command failed: {}", stderr @@ -218,7 +215,7 @@ impl GitHttpHandler { } let ms = started.elapsed().as_millis() as u64; - info!(self.logger, "git_rpc_completed"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "duration_ms" => ms, "bytes_out" => output.stdout.len()); + tracing::info!("git_rpc_completed service={} repo={} repo_id={} duration_ms={} bytes_out={}", service, self.repo.repo_name, self.repo.id.to_string(), ms, output.stdout.len()); Ok(HttpResponse::Ok() .content_type(format!("application/x-git-{}-result", service)) diff --git a/libs/git/http/mod.rs b/libs/git/http/mod.rs index 86b8e83..d57283b 100644 --- a/libs/git/http/mod.rs +++ b/libs/git/http/mod.rs @@ -3,7 +3,6 @@ use actix_web::{App, HttpServer, web}; use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; -use slog::{Logger, error, info}; use std::sync::Arc; pub mod auth; @@ -20,7 +19,6 @@ pub struct HttpAppState { pub cache: AppCache, pub sync: crate::ssh::ReceiveSyncService, pub rate_limiter: Arc, - pub logger: Logger, } pub fn git_http_cfg(cfg: &mut web::ServiceConfig) { @@ -66,7 +64,7 @@ pub fn git_http_cfg(cfg: &mut web::ServiceConfig) { ); } -pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> { +pub async fn run_http(config: AppConfig) -> anyhow::Result<()> { let (db, app_cache) = tokio::join!(AppDatabase::init(&config), AppCache::init(&config),); let db = db?; let app_cache = app_cache?; @@ -76,13 +74,12 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> { db.clone(), app_cache.clone(), redis_pool.clone(), - logger.clone(), config.clone(), ); let _worker_cancel = hook.start_worker(); - slog::info!(logger, "hook worker started"); + tracing::info!("hook worker started"); - let sync = crate::ssh::ReceiveSyncService::new(redis_pool.clone(), logger.clone()); + let sync = crate::ssh::ReceiveSyncService::new(redis_pool.clone()); let rate_limiter = Arc::new(rate_limit::RateLimiter::new( rate_limit::RateLimitConfig::default(), @@ -94,11 +91,9 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> { cache: app_cache.clone(), sync, rate_limiter, - logger: logger.clone(), }; - let logger_startup = logger.clone(); - info!(&logger_startup, "Starting git HTTP server on 0.0.0.0:8021"); + tracing::info!("Starting git HTTP server on 0.0.0.0:8021"); let server = HttpServer::new(move || { App::new() @@ -112,9 +107,9 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> { // workers finish in-flight requests then exit (graceful shutdown). let result = server.await; if let Err(e) = result { - error!(&logger, "HTTP server error: {}", e); + tracing::error!("HTTP server error: {}", e); } - info!(&logger, "Git HTTP server stopped"); + tracing::info!("Git HTTP server stopped"); Ok(()) } diff --git a/libs/git/http/routes.rs b/libs/git/http/routes.rs index 322b178..6a74404 100644 --- a/libs/git/http/routes.rs +++ b/libs/git/http/routes.rs @@ -34,10 +34,10 @@ pub async fn info_refs( let path_inner = path.into_inner(); let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; let is_write = service_param == "git-receive-pack"; - authorize_repo_access(&req, &state.db, &state.logger, &model, is_write).await?; + authorize_repo_access(&req, &state.db, &model, is_write).await?; let storage_path = PathBuf::from(&model.storage_path); - let handler = GitHttpHandler::new(storage_path, model, state.db.clone(), state.logger.clone()); + let handler = GitHttpHandler::new(storage_path, model, state.db.clone()); handler.info_refs(service_param).await } @@ -56,10 +56,10 @@ pub async fn upload_pack( let path_inner = path.into_inner(); let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; - authorize_repo_access(&req, &state.db, &state.logger, &model, false).await?; + authorize_repo_access(&req, &state.db, &model, false).await?; let storage_path = PathBuf::from(&model.storage_path); - let handler = GitHttpHandler::new(storage_path, model, state.db.clone(), state.logger.clone()); + let handler = GitHttpHandler::new(storage_path, model, state.db.clone()); handler.upload_pack(payload).await } @@ -78,10 +78,10 @@ pub async fn receive_pack( let path_inner = path.into_inner(); let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; - authorize_repo_access(&req, &state.db, &state.logger, &model, true).await?; + authorize_repo_access(&req, &state.db, &model, true).await?; let storage_path = PathBuf::from(&model.storage_path); - let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone(), state.logger.clone()); + let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone()); let result = handler.receive_pack(payload).await; let _ = tokio::spawn({ diff --git a/libs/git/ssh/authz.rs b/libs/git/ssh/authz.rs index 7f53da4..3b888dd 100644 --- a/libs/git/ssh/authz.rs +++ b/libs/git/ssh/authz.rs @@ -8,17 +8,15 @@ use models::users::{user, user_ssh_key}; use sea_orm::sqlx::types::chrono; use sea_orm::*; use sha2::{Digest, Sha256}; -use slog::{Logger, error, info, warn}; /// SSH authentication service optimized for performance pub struct SshAuthService { db: AppDatabase, - logger: Logger, } impl SshAuthService { - pub fn new(db: AppDatabase, logger: Logger) -> Self { - Self { db, logger } + pub fn new(db: AppDatabase) -> Self { + Self { db } } pub async fn find_repo( @@ -103,7 +101,7 @@ impl SshAuthService { let fingerprint = match self.generate_fingerprint_from_public_key(public_key_str) { Ok(fp) => fp, Err(e) => { - error!(self.logger, "{}", format!("Failed to generate fingerprint error={}", e)); + tracing::error!("failed to generate SSH key fingerprint error={}", e); return Ok(None); } }; @@ -113,7 +111,7 @@ impl SshAuthService { } else { fingerprint.clone() }; - info!(self.logger, "{}", format!("Looking up user with SSH key fingerprint={}", fingerprint_preview)); + tracing::info!("looking up user with SSH key fingerprint={}", fingerprint_preview); let ssh_key = user_ssh_key::Entity::find() .filter(user_ssh_key::Column::Fingerprint.eq(&fingerprint)) @@ -124,13 +122,13 @@ impl SshAuthService { let ssh_key = match ssh_key { Some(key) => key, None => { - warn!(self.logger, "{}", format!("No SSH key found fingerprint={}", fingerprint)); + tracing::warn!("no SSH key found fingerprint={}", fingerprint); return Ok(None); } }; if self.is_key_expired(&ssh_key) { - warn!(self.logger, "{}", format!("SSH key expired key_id={} expires_at={:?}", ssh_key.id, ssh_key.expires_at)); + tracing::warn!("SSH key expired key_id={} expires_at={:?}", ssh_key.id, ssh_key.expires_at); return Ok(None); } @@ -140,7 +138,7 @@ impl SshAuthService { .await?; if let Some(ref user) = user_model { - info!(self.logger, "{}", format!("User authenticated user={} key={}", user.username, ssh_key.title)); + tracing::info!("user authenticated via SSH key user={} key={}", user.username, ssh_key.title); self.update_key_last_used_async(ssh_key.id); } @@ -158,17 +156,15 @@ impl SshAuthService { fn update_key_last_used_async(&self, key_id: i64) { let db_clone = self.db.clone(); - let logger = self.logger.clone(); tokio::spawn(async move { - if let Err(e) = Self::update_key_last_used_sync(db_clone, &logger, key_id).await { - warn!(&logger, "{}", format!("Failed to update key last_used key_id={} error={}", key_id, e)); + if let Err(e) = Self::update_key_last_used_sync(db_clone, key_id).await { + tracing::warn!("failed to update key last_used key_id={} error={}", key_id, e); } }); } async fn update_key_last_used_sync( db: AppDatabase, - logger: &Logger, key_id: i64, ) -> Result<(), DbErr> { let key = user_ssh_key::Entity::find_by_id(key_id) @@ -182,7 +178,7 @@ impl SshAuthService { active_key.updated_at = Set(now); active_key.update(db.writer()).await?; - info!(logger, "{}", format!("Updated key last_used key_id={}", key_id)); + tracing::info!("updated key last_used key_id={}", key_id); } Ok(()) @@ -195,12 +191,12 @@ impl SshAuthService { is_write: bool, ) -> bool { if repo.created_by == user.uid { - info!(self.logger, "{}", format!("User is repo owner user={} repo={}", user.username, repo.repo_name)); + tracing::info!("user is repo owner user={} repo={}", user.username, repo.repo_name); return true; } if !is_write && !repo.is_private { - info!(self.logger, "{}", format!("Public repo allows read repo={}", repo.repo_name)); + tracing::info!("public repo allows read access repo={}", repo.repo_name); return true; } @@ -209,7 +205,7 @@ impl SshAuthService { .await .unwrap_or(false) { - info!(self.logger, "{}", format!("User has collaborator access user={} repo={}", user.username, repo.repo_name)); + tracing::info!("user has collaborator access user={} repo={}", user.username, repo.repo_name); return true; } @@ -219,11 +215,11 @@ impl SshAuthService { .await .unwrap_or(false) { - info!(self.logger, "{}", format!("User has project member access user={} repo={}", user.username, repo.repo_name)); + tracing::info!("user has project member access user={} repo={}", user.username, repo.repo_name); return true; } - warn!(self.logger, "{}", format!("Access denied user={} repo={} write={}", user.username, repo.repo_name, is_write)); + tracing::warn!("access denied user={} repo={} is_write={}", user.username, repo.repo_name, is_write); false } @@ -251,7 +247,7 @@ impl SshAuthService { return Ok(true); } - warn!(self.logger, "{}", format!("Collaborator has no valid roles scope={}", collab.scope)); + tracing::warn!("collaborator has no valid roles scope={}", collab.scope); Ok(false) } else { Ok(false) diff --git a/libs/git/ssh/handle.rs b/libs/git/ssh/handle.rs index 68b955b..3f60b19 100644 --- a/libs/git/ssh/handle.rs +++ b/libs/git/ssh/handle.rs @@ -12,7 +12,6 @@ use russh::{Channel, ChannelId, CryptoVec, Disconnect}; use sea_orm::ColumnTrait; use sea_orm::EntityTrait; use sea_orm::QueryFilter; -use slog::{Logger, error, info, warn}; use std::collections::{HashMap, HashSet}; use std::io; use std::net::SocketAddr; @@ -77,7 +76,6 @@ pub struct SSHandle { pub cache: AppCache, pub sync: ReceiveSyncService, pub upload_pack_eof_sent: HashSet, - pub logger: Logger, pub token_service: SshTokenService, pub client_addr: Option, } @@ -87,15 +85,14 @@ impl SSHandle { db: AppDatabase, cache: AppCache, sync: ReceiveSyncService, - logger: Logger, token_service: SshTokenService, client_addr: Option, ) -> Self { - let auth = SshAuthService::new(db.clone(), logger.clone()); + let auth = SshAuthService::new(db.clone()); let addr_str = client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!(logger, "SSH handler created for client: {}", addr_str); + tracing::info!("SSH handler created client={}", addr_str); Self { repo: None, model: None, @@ -110,7 +107,6 @@ impl SSHandle { cache, sync, upload_pack_eof_sent: HashSet::new(), - logger, token_service, client_addr, } @@ -137,7 +133,7 @@ impl Drop for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!(self.logger, "ssh_handler_dropped"; "client" => %addr_str); + tracing::info!("ssh_handler_dropped client={}", addr_str); let channel_ids: Vec<_> = self.stdin.keys().copied().collect(); for channel_id in channel_ids { @@ -154,7 +150,7 @@ impl russh::server::Handler for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!(self.logger, "auth_none_received"; "user" => %user, "client" => %client_info); + tracing::info!("auth_none_received user={} client={}", user, client_info); Ok(Auth::UnsupportedMethod) } @@ -166,25 +162,25 @@ impl russh::server::Handler for SSHandle { if token.is_empty() { - warn!(self.logger, "auth_rejected_empty_token"; "client" => %client_info); + tracing::warn!("auth_rejected_empty_token client={}", client_info); return Err(russh::Error::NotAuthenticated); } - info!(self.logger, "auth_token_attempt"; "client" => %client_info); + tracing::info!("auth_token_attempt client={}", client_info); let user_model = match self.token_service.find_user_by_token(token).await { Ok(Some(model)) => model, Ok(None) => { - warn!(self.logger, "auth_rejected_token_not_found"; "client" => %client_info); + tracing::warn!("auth_rejected_token_not_found client={}", client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { - error!(self.logger, "auth_token_error"; "error" => %e.to_string(), "client" => %client_info); + tracing::error!("auth_token_error error={}", e); return Err(russh::Error::NotAuthenticated); } }; - info!(self.logger, "auth_token_success"; "user" => %user_model.username, "client" => %client_info); + tracing::info!("auth_token_success user={} client={}", user_model.username, client_info); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -206,29 +202,29 @@ impl russh::server::Handler for SSHandle { .unwrap_or_else(|| "unknown".to_string()); if user != "git" { - warn!(self.logger, "auth_rejected_invalid_username"; "user" => %user, "client" => %client_info); + tracing::warn!("auth_rejected_invalid_username user={} client={}", user, client_info); return Err(russh::Error::NotAuthenticated); } let public_key_str = public_key.to_string(); if public_key_str.len() < 32 { - warn!(self.logger, "auth_rejected_invalid_key_length"; "key_length" => public_key_str.len(), "client" => %client_info); + tracing::warn!("auth_rejected_invalid_key_length key_length={}", public_key_str.len()); return Err(russh::Error::NotAuthenticated); } - info!(self.logger, "auth_publickey_attempt"; "client" => %client_info); + tracing::info!("auth_publickey_attempt client={}", client_info); let user_model = match self.auth.find_user_by_public_key(&public_key_str).await { Ok(Some(model)) => model, Ok(None) => { - warn!(self.logger, "auth_rejected_key_not_found"; "client" => %client_info); + tracing::warn!("auth_rejected_key_not_found client={}", client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { - error!(self.logger, "auth_publickey_error"; "error" => %e.to_string(), "client" => %client_info); + tracing::error!("auth_publickey_error error={}", e); return Err(russh::Error::NotAuthenticated); } }; - info!(self.logger, "auth_publickey_success"; "user" => %user_model.username, "client" => %client_info); + tracing::info!("auth_publickey_success user={} client={}", user_model.username, client_info); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -243,29 +239,29 @@ impl russh::server::Handler for SSHandle { .unwrap_or_else(|| "unknown".to_string()); if user != "git" { - warn!(self.logger, "auth_rejected_invalid_username"; "user" => %user, "client" => %client_info); + tracing::warn!("auth_rejected_invalid_username user={} client={}", user, client_info); return Err(russh::Error::NotAuthenticated); } let public_key_str = certificate.to_string(); if public_key_str.len() < 32 { - warn!(self.logger, "auth_rejected_invalid_key_length"; "key_length" => public_key_str.len(), "client" => %client_info); + tracing::warn!("auth_rejected_invalid_key_length key_length={}", public_key_str.len()); return Err(russh::Error::NotAuthenticated); } - info!(self.logger, "auth_publickey_attempt"; "client" => %client_info); + tracing::info!("auth_publickey_attempt client={}", client_info); let user_model = match self.auth.find_user_by_public_key(&public_key_str).await { Ok(Some(model)) => model, Ok(None) => { - warn!(self.logger, "auth_rejected_key_not_found"; "client" => %client_info); + tracing::warn!("auth_rejected_key_not_found client={}", client_info); return Err(russh::Error::NotAuthenticated); } Err(e) => { - error!(self.logger, "auth_publickey_error"; "error" => %e.to_string(), "client" => %client_info); + tracing::error!("auth_publickey_error error={}", e); return Err(russh::Error::NotAuthenticated); } }; - info!(self.logger, "auth_publickey_success"; "user" => %user_model.username, "client" => %client_info); + tracing::info!("auth_publickey_success user={} client={}", user_model.username, client_info); self.operator = Some(user_model); Ok(Auth::Accept) } @@ -278,7 +274,7 @@ impl russh::server::Handler for SSHandle { channel: ChannelId, _: &mut Session, ) -> Result<(), Self::Error> { - info!(self.logger, "channel_close"; "channel" => ?channel, "client" => ?self.client_addr); + tracing::info!("channel_close channel={:?} client={:?}", channel, self.client_addr); self.cleanup_channel(channel); Ok(()) } @@ -288,35 +284,23 @@ impl russh::server::Handler for SSHandle { channel: ChannelId, _: &mut Session, ) -> Result<(), Self::Error> { - info!(self.logger, "channel_eof"; - "channel" => ?channel, - "client" => ?self.client_addr - ); + tracing::info!("channel_eof channel={:?} client={:?}", channel, self.client_addr); if let Some(eof) = self.eof.get(&channel) { let _ = eof.send(true).await; } if let Some(mut stdin) = self.stdin.remove(&channel) { - info!(self.logger, "Closing stdin"; - "channel" => ?channel, - "client" => ?self.client_addr - ); + tracing::info!("Closing stdin channel={:?} client={:?}", channel, self.client_addr); // Use timeout so we never block the SSH event loop waiting for git. let _ = tokio::time::timeout(Duration::from_secs(5), async { stdin.flush().await.ok(); let _ = stdin.shutdown().await; }) .await; - info!(self.logger, "stdin closed"; - "channel" => ?channel, - "client" => ?self.client_addr - ); + tracing::info!("stdin closed channel={:?} client={:?}", channel, self.client_addr); } else { - warn!(self.logger, "stdin already removed"; - "channel" => ?channel, - "client" => ?self.client_addr - ); + tracing::warn!("stdin already removed channel={:?} client={:?}", channel, self.client_addr); } Ok(()) @@ -331,7 +315,7 @@ impl russh::server::Handler for SSHandle { .client_addr .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!(self.logger, "channel_open_session"; "channel" => ?channel, "client" => %client_info); + tracing::info!("channel_open_session channel={:?} client={}", channel, client_info); let _ = session.flush().ok(); Ok(true) } @@ -347,11 +331,7 @@ impl russh::server::Handler for SSHandle { _modes: &[(russh::Pty, u32)], session: &mut Session, ) -> Result<(), Self::Error> { - let client_info = self - .client_addr - .map(|addr| format!("{}", addr)) - .unwrap_or_else(|| "unknown".to_string()); - warn!(self.logger, "pty_request not supported"; "channel" => ?channel, "term" => %term, "cols" => col_width, "rows" => row_height, "client" => %client_info); + tracing::warn!("pty_request not supported channel={:?} term={} cols={} rows={}", channel, term, col_width, row_height); let _ = session.flush().ok(); Ok(()) } @@ -362,11 +342,7 @@ impl russh::server::Handler for SSHandle { name: &str, session: &mut Session, ) -> Result<(), Self::Error> { - let client_info = self - .client_addr - .map(|addr| format!("{}", addr)) - .unwrap_or_else(|| "unknown".to_string()); - info!(self.logger, "subsystem_request"; "channel" => ?channel, "subsystem" => %name, "client" => %client_info); + tracing::info!("subsystem_request channel={:?} subsystem={}", channel, name); // git-clients may send "subsystem" for git protocol over ssh. // We don't use subsystem; exec_request handles it directly. let _ = session.flush().ok(); @@ -423,7 +399,7 @@ impl russh::server::Handler for SSHandle { self.branch.insert(channel, refs); } Err(e) => { - warn!(self.logger, "ref_update_parse_error"; "error" => ?e); + tracing::warn!("ref_update_parse_error error={:?}", e); self.branch.insert(channel, vec![]); } } @@ -432,7 +408,7 @@ impl russh::server::Handler for SSHandle { stdin.write_all(&buffered).await?; stdin.flush().await?; } else { - error!(self.logger, "stdin_not_found"; "channel" => ?channel); + tracing::error!("stdin_not_found channel={:?}", channel); } return Ok(()); } @@ -441,7 +417,7 @@ impl russh::server::Handler for SSHandle { stdin.write_all(data).await?; stdin.flush().await?; } else { - error!(self.logger, "stdin_not_found_forwarding"; "channel" => ?channel); + tracing::error!("stdin_not_found_forwarding channel={:?}", channel); } return Ok(()); } @@ -472,7 +448,7 @@ impl russh::server::Handler for SSHandle { user.username ); - info!(self.logger, "shell_request"; "user" => %user.username); + tracing::info!("shell_request user={}", user.username); session .data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes())) .ok(); @@ -481,7 +457,7 @@ impl russh::server::Handler for SSHandle { session.close(channel_id).ok(); let _ = session.flush().ok(); } else { - warn!(self.logger, "shell_request_unauthenticated"; "channel" => ?channel_id); + tracing::warn!("shell_request_unauthenticated channel={:?}", channel_id); let msg = "Authentication required\r\n"; session .data(channel_id, CryptoVec::from_slice(msg.as_bytes())) @@ -504,15 +480,12 @@ impl russh::server::Handler for SSHandle { .map(|addr| format!("{}", addr)) .unwrap_or_else(|| "unknown".to_string()); - info!( - self.logger, - "exec_request received, channel: {:?}, client: {}", channel_id, client_info - ); + tracing::info!("exec_request received channel={:?} client={}", channel_id, client_info); let git_shell_cmd = match std::str::from_utf8(data) { Ok(cmd) => cmd.trim(), Err(e) => { - error!(self.logger, "invalid_command_encoding"; "error" => %e.to_string()); + tracing::error!("invalid_command_encoding error={}", e); session .disconnect( Disconnect::ServiceNotAvailable, @@ -526,7 +499,7 @@ impl russh::server::Handler for SSHandle { let (service, path) = match parse_git_command(git_shell_cmd) { Some((s, p)) => (s, p), None => { - error!(self.logger, "invalid_git_command"; "command" => %git_shell_cmd); + tracing::error!("invalid_git_command command={}", git_shell_cmd); let msg = format!("Invalid git command: {}", git_shell_cmd); session .disconnect(Disconnect::ServiceNotAvailable, &msg, "") @@ -539,7 +512,7 @@ impl russh::server::Handler for SSHandle { Some(pair) => pair, None => { let msg = format!("Invalid repository path: {}", path); - error!(self.logger, "invalid_repo_path"; "path" => %path); + tracing::error!("invalid_repo_path path={}", path); session .disconnect(Disconnect::ServiceNotAvailable, &msg, "") .ok(); @@ -552,7 +525,7 @@ impl russh::server::Handler for SSHandle { Ok(repo) => repo, Err(e) => { // Log the detailed error internally; client receives generic message. - error!(self.logger, "repo_fetch_error"; "error" => %e.to_string()); + tracing::error!("repo_fetch_error error={}", e); session .disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "") .ok(); @@ -565,7 +538,7 @@ impl russh::server::Handler for SSHandle { Some(user) => user, None => { let msg = "Authentication error: no authenticated user"; - error!(self.logger, "exec_no_authenticated_user"; "channel" => ?channel_id); + tracing::error!("exec_no_authenticated_user channel={:?}", channel_id); session.disconnect(Disconnect::ByApplication, msg, "").ok(); return Err(russh::Error::Disconnect); } @@ -584,20 +557,20 @@ impl russh::server::Handler for SSHandle { if is_write { "write" } else { "read" }, repo.repo_name ); - error!(self.logger, "access_denied"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write); + tracing::error!("access_denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write); session.disconnect(Disconnect::ByApplication, &msg, "").ok(); return Err(russh::Error::Disconnect); } - info!(self.logger, "access_granted"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write); + tracing::info!("access_granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write); let repo_path = PathBuf::from(&repo.storage_path); if !repo_path.exists() { - error!(self.logger, "repo_path_not_found"; "path" => %repo.storage_path); + tracing::error!("repo_path_not_found path={}", repo.storage_path); } let mut cmd = build_git_command(service, repo_path); - let logger = self.logger.clone(); - info!(&logger, "spawn_git_process"; "service" => ?service, "path" => %repo.storage_path); + + tracing::info!("spawn_git_process service={:?} path={}", service, repo.storage_path); let mut shell = match cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -609,7 +582,7 @@ impl russh::server::Handler for SSHandle { shell } Err(e) => { - error!(&logger, "process_spawn_failed"; "error" => %e.to_string()); + tracing::error!("process_spawn_failed error={}", e); let _ = session.channel_failure(channel_id); self.cleanup_channel(channel_id); return Err(russh::Error::IO(e)); @@ -626,9 +599,9 @@ impl russh::server::Handler for SSHandle { let repo_uid = repo.id; let should_sync = service == GitService::ReceivePack; let sync = self.sync.clone(); - let logger_for_fut = self.logger.clone(); + let fut = async move { - info!(&logger_for_fut, "git_task_started"; "channel" => ?channel_id); + tracing::info!(channel = ?channel_id, "git_task_started"); let mut stdout_done = false; let mut stderr_done = false; @@ -655,7 +628,7 @@ impl russh::server::Handler for SSHandle { let status = result?; let status_code = status.code().unwrap_or(128) as u32; - info!(&logger_for_fut, "git_process_exited"; "channel" => ?channel_id, "status" => status_code); + tracing::info!("git_process_exited channel={:?} status={}", channel_id, status_code); if !stdout_done || !stderr_done { let _ = tokio::time::timeout(Duration::from_millis(100), async { @@ -685,21 +658,21 @@ impl russh::server::Handler for SSHandle { sleep(Duration::from_millis(50)).await; let _ = session_handle.eof(channel_id).await; let _ = session_handle.close(channel_id).await; - info!(&logger_for_fut, "channel_closed"; "channel" => ?channel_id); + tracing::info!(channel = ?channel_id, "channel_closed"); break; } result = &mut stdout_fut, if !stdout_done => { - info!(&logger_for_fut, "stdout completed"); + tracing::info!("stdout completed"); stdout_done = true; if let Err(e) = result { - warn!(&logger_for_fut, "stdout_forward_error"; "error" => ?e); + tracing::warn!(error = ?e, "stdout_forward_error"); } } result = &mut stderr_fut, if !stderr_done => { - info!(&logger_for_fut, "stderr completed"); + tracing::info!("stderr completed"); stderr_done = true; if let Err(e) = result { - warn!(&logger_for_fut, "stderr_forward_error"; "error" => ?e); + tracing::warn!(error = ?e, "stderr_forward_error"); } } } @@ -710,7 +683,7 @@ impl russh::server::Handler for SSHandle { tokio::spawn(async move { if let Err(e) = fut.await { - error!(&logger, "git_ssh_channel_task_error"; "error" => %e.to_string()); + tracing::error!("git_ssh_channel_task_error error={}", e); } while eof_rx.recv().await.is_some() {} }); diff --git a/libs/git/ssh/mod.rs b/libs/git/ssh/mod.rs index 57a651e..0bc4664 100644 --- a/libs/git/ssh/mod.rs +++ b/libs/git/ssh/mod.rs @@ -12,7 +12,6 @@ use russh::server::Server; use russh::{MethodKind, MethodSet, SshId, server::Config}; use sea_orm::prelude::*; use sha2::{Digest, Sha256}; -use slog::{Logger, error, info}; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -27,7 +26,6 @@ pub struct SSHHandle { pub app: AppConfig, pub cache: AppCache, pub redis_pool: RedisPool, - pub logger: Logger, } impl SSHHandle { @@ -35,7 +33,7 @@ impl SSHHandle { let this = self.clone(); tokio::spawn(async move { if let Err(e) = this.run_ssh().await { - error!(this.logger, "SSH server error: {}", e); + tracing::error!("SSH server error: {}", e); } }); } @@ -44,18 +42,16 @@ impl SSHHandle { app: AppConfig, cache: AppCache, redis_pool: RedisPool, - logger: Logger, ) -> Self { SSHHandle { db, app, cache, redis_pool, - logger, } } pub async fn run_ssh(&self) -> anyhow::Result<()> { - info!(self.logger, "SSH server starting"); + tracing::info!("SSH server starting"); let private_key_content = self.app.ssh_server_private_key()?; if private_key_content.is_empty() { return Err(anyhow::anyhow!("SSH server private key is not configured")); @@ -66,8 +62,7 @@ impl SSHHandle { } else { private_key_content.clone() }; - info!( - self.logger, + tracing::info!( "Loading SSH private key (hex, {} bytes)", private_key_content.len() ); @@ -79,8 +74,7 @@ impl SSHHandle { ) })?; - info!( - self.logger, + tracing::info!( "Hex decoded to {} bytes", private_key_bytes.len() ); @@ -89,18 +83,18 @@ impl SSHHandle { .with_context(|| "Decoded SSH private key is not valid UTF-8")?; if let Some(first_line) = private_key_pem.lines().next() { - info!(self.logger, "PEM format starts with: {}", first_line); + tracing::info!("PEM format starts with: {}", first_line); } - info!( - self.logger, - "Complete private key content:\n{}", private_key_pem + tracing::info!( + "Complete private key content:\n{}", + private_key_pem ); let private_key = { match ssh_key::PrivateKey::from_openssh(private_key_pem) { Ok(ssh_key) => { - info!(self.logger, "Successfully parsed with ssh-key crate"); + tracing::info!("Successfully parsed with ssh-key crate"); let openssh_pem = ssh_key .to_openssh(ssh_key::LineEnding::LF) .with_context(|| "Failed to serialize to OpenSSH format")?; @@ -109,8 +103,7 @@ impl SSHHandle { .with_context(|| "Failed to parse with russh after ssh-key conversion")? } Err(e) => { - info!( - self.logger, + tracing::info!( "ssh-key from_openssh failed: {}, trying direct russh parse", e ); PrivateKey::from_str(private_key_pem).with_context(|| { @@ -119,7 +112,7 @@ impl SSHHandle { } } }; - info!(self.logger, "SSH private key loaded"); + tracing::info!("SSH private key loaded"); let mut config = Config::default(); config.keys = vec![private_key]; let version = format!("SSH-2.0-GitdataAI {}", env!("CARGO_PKG_VERSION")); @@ -132,8 +125,7 @@ impl SSHHandle { config.keepalive_interval = Some(Duration::from_secs(60)); config.keepalive_max = 3; - info!( - self.logger, + tracing::info!( "SSH server configured with methods: {:?}", config.methods ); let token_service = SshTokenService::new(self.db.clone()); @@ -141,7 +133,6 @@ impl SSHHandle { self.db.clone(), self.cache.clone(), self.redis_pool.clone(), - self.logger.clone(), token_service, ); @@ -161,7 +152,7 @@ impl SSHHandle { ssh_port, public_host, ssh_port ) }; - info!(self.logger, "{}", msg); + tracing::info!("{}", msg); server.run_on_address(Arc::new(config), bind_addr).await?; Ok(()) } @@ -172,15 +163,13 @@ impl SSHHandle { #[derive(Clone)] pub struct ReceiveSyncService { pool: deadpool_redis::cluster::Pool, - logger: Logger, redis_prefix: String, } impl ReceiveSyncService { - pub fn new(pool: deadpool_redis::cluster::Pool, logger: Logger) -> Self { + pub fn new(pool: deadpool_redis::cluster::Pool) -> Self { Self { pool, - logger, redis_prefix: "{hook}".to_string(), } } @@ -199,7 +188,7 @@ impl ReceiveSyncService { let task_json = match serde_json::to_string(&hook_task) { Ok(j) => j, Err(e) => { - error!(self.logger, "failed to serialize hook task: {}", e); + tracing::error!("failed to serialize hook task: {}", e); return; } }; @@ -209,7 +198,7 @@ impl ReceiveSyncService { let redis = match self.pool.get().await { Ok(c) => c, Err(e) => { - error!(self.logger, "failed to get Redis connection: {}", e); + tracing::error!("failed to get Redis connection: {}", e); return; } }; @@ -221,7 +210,7 @@ impl ReceiveSyncService { .query_async::<()>(&mut conn) .await { - error!(self.logger, "failed to enqueue sync task repo_id={} error={}", + tracing::error!("failed to enqueue sync task repo_id={} error={}", task.repo_uid, e); } } @@ -282,8 +271,8 @@ impl SshTokenService { } } -pub async fn run_ssh(config: AppConfig, logger: Logger) -> anyhow::Result<()> { - info!(logger, "SSH server initializing"); +pub async fn run_ssh(config: AppConfig) -> anyhow::Result<()> { + tracing::info!("SSH server initializing"); let db = AppDatabase::init(&config).await?; let cache = AppCache::init(&config).await?; let redis_pool = cache.redis_pool().clone(); @@ -293,13 +282,12 @@ pub async fn run_ssh(config: AppConfig, logger: Logger) -> anyhow::Result<()> { db.clone(), cache.clone(), redis_pool.clone(), - logger.clone(), config.clone(), ); let _worker_cancel = hook.start_worker(); - slog::info!(logger, "hook worker started"); + tracing::info!("hook worker started"); - SSHHandle::new(db, config.clone(), cache, redis_pool, logger) + SSHHandle::new(db, config.clone(), cache, redis_pool) .run_ssh() .await?; Ok(()) diff --git a/libs/git/ssh/server.rs b/libs/git/ssh/server.rs index ea87b4e..29ece65 100644 --- a/libs/git/ssh/server.rs +++ b/libs/git/ssh/server.rs @@ -5,7 +5,6 @@ use db::cache::AppCache; use db::database::AppDatabase; use deadpool_redis::cluster::Pool as RedisPool; use russh::server::Handler; -use slog::{Logger, info, warn}; use std::io; use std::net::SocketAddr; @@ -13,7 +12,6 @@ pub struct SSHServer { pub db: AppDatabase, pub cache: AppCache, pub redis_pool: RedisPool, - pub logger: Logger, pub token_service: SshTokenService, } @@ -22,14 +20,12 @@ impl SSHServer { db: AppDatabase, cache: AppCache, redis_pool: RedisPool, - logger: Logger, token_service: SshTokenService, ) -> Self { SSHServer { db, cache, redis_pool, - logger, token_service, } } @@ -39,21 +35,15 @@ impl russh::server::Server for SSHServer { fn new_client(&mut self, addr: Option) -> Self::Handler { if let Some(addr) = addr { - info!( - self.logger, - "New SSH connection from {}:{}", - addr.ip(), - addr.port() - ); + tracing::info!("New SSH connection ip={} port={}", addr.ip(), addr.port()); } else { - info!(self.logger, "New SSH connection from unknown address"); + tracing::info!("New SSH connection from unknown address"); } - let sync_service = ReceiveSyncService::new(self.redis_pool.clone(), self.logger.clone()); + let sync_service = ReceiveSyncService::new(self.redis_pool.clone()); SSHandle::new( self.db.clone(), self.cache.clone(), sync_service, - self.logger.clone(), self.token_service.clone(), addr, ) @@ -62,33 +52,28 @@ impl russh::server::Server for SSHServer { fn handle_session_error(&mut self, error: ::Error) { match error { russh::Error::Disconnect => { - info!(self.logger, "Connection disconnected by peer"); + tracing::info!("Connection disconnected by peer"); } russh::Error::Inconsistent => { - warn!(self.logger, "Protocol inconsistency detected"); + tracing::warn!("Protocol inconsistency detected"); } russh::Error::NotAuthenticated => { - warn!(self.logger, "Authentication failed"); + tracing::warn!("Authentication failed"); } russh::Error::IO(ref io_err) => { - let error_msg = format!( - "IO error: kind={:?}, message={}, raw_os_error={:?}", + tracing::warn!( + "SSH IO error kind={:?} message={} raw_os_error={:?}", io_err.kind(), io_err, io_err.raw_os_error() ); - warn!(self.logger, "{}", error_msg); if io_err.kind() == io::ErrorKind::UnexpectedEof { - warn!( - self.logger, - "Client disconnected during handshake or before authentication" - ); + tracing::warn!("Client disconnected during handshake or before authentication"); } } _ => { - let error_msg = format!("SSH session error: {}", error); - warn!(self.logger, "{}", error_msg); + tracing::warn!("SSH session error error={}", error); } } }