From bbf2d75fbacbc560607186b3481d6a2c51784e46 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Thu, 16 Apr 2026 21:41:35 +0800 Subject: [PATCH] fix(git): harden hook pool retry, standardize slog log format - Add retry_count to HookTask with serde(default) for backwards compat - Limit hook task retries to MAX_RETRIES=5, discard after limit to prevent infinite requeue loops that caused 'task nack'd and requeued' log spam - Add nak_with_retry() in RedisConsumer to requeue with incremented count - Standardize all slog logs: replace "info!(l, "msg"; "k" => v)" shorthand with "info!(l, "{}", format!("msg k={}", v))" across ssh/authz.rs, ssh/handle.rs, ssh/server.rs, hook/webhook_dispatch.rs, hook/pool/mod.rs --- libs/git/hook/pool/mod.rs | 45 ++++++++++++++++++++++++------- libs/git/hook/pool/redis.rs | 22 +++++++++++---- libs/git/hook/pool/types.rs | 4 +++ libs/git/hook/webhook_dispatch.rs | 20 +++++++------- libs/git/ssh/authz.rs | 26 +++++++++--------- libs/git/ssh/handle.rs | 40 +++++++++++++-------------- libs/git/ssh/mod.rs | 3 ++- libs/git/ssh/server.rs | 2 +- 8 files changed, 103 insertions(+), 59 deletions(-) diff --git a/libs/git/hook/pool/mod.rs b/libs/git/hook/pool/mod.rs index 82dd8de..bf86a6e 100644 --- a/libs/git/hook/pool/mod.rs +++ b/libs/git/hook/pool/mod.rs @@ -181,7 +181,8 @@ impl GitHookPool { "task_id" => &task.id, "task_type" => %task.task_type, "repo_id" => &task.repo_id, - "worker_id" => &self.config.worker_id + "worker_id" => &self.config.worker_id, + "retry" => task.retry_count ); self.log_stream @@ -210,13 +211,39 @@ impl GitHookPool { .await; } Err(e) => { - if let Err(e) = consumer.nak(&work_key, &queue_key, &task_json).await { - slog::warn!(self.logger, "failed to nak task: {}", e); + // Check retry count and decide whether to requeue or discard. + const MAX_RETRIES: u32 = 5; + if task.retry_count >= MAX_RETRIES { + // Max retries exceeded — discard the task to prevent infinite loop. + slog::warn!(self.logger, "task exhausted retries, discarding"; + "task_id" => &task.id, + "task_type" => %task.task_type, + "repo_id" => &task.repo_id, + "retry_count" => task.retry_count, + "last_error" => %e + ); + if let Err(e) = consumer.ack(&work_key, &task_json).await { + slog::warn!(self.logger, "failed to ack discarded task: {}", e); + } + self.total_failed.fetch_add(1, Ordering::Relaxed); + self.log_stream + .error(&task.id, &task.repo_id, &format!("task failed after {} retries: {}", task.retry_count, e)) + .await; + } else { + // Requeue with incremented retry count. + let mut task = task; + task.retry_count += 1; + let retry_json = serde_json::to_string(&task) + .unwrap_or_else(|_| task_json.clone()); + + if let Err(e) = consumer.nak_with_retry(&work_key, &queue_key, &task_json, &retry_json).await { + slog::warn!(self.logger, "failed to nak task: {}", e); + } + self.total_failed.fetch_add(1, Ordering::Relaxed); + self.log_stream + .error(&task.id, &task.repo_id, &format!("task failed: {}", e)) + .await; } - self.total_failed.fetch_add(1, Ordering::Relaxed); - self.log_stream - .error(&task.id, &task.repo_id, &format!("task failed: {}", e)) - .await; } } @@ -315,7 +342,7 @@ impl GitHookPool { let after = after_oid.clone(); let branch_name = branch.clone(); - slog::info!(logger, "detected push on branch"; "branch" => &branch_name, "before" => &before_oid, "after" => &after); + slog::info!(logger, "{}", format!("detected push on branch branch={} before={} after={}", branch_name, before_oid, after)); let http = http.clone(); let db = db.clone(); @@ -361,7 +388,7 @@ impl GitHookPool { let after = after_oid.clone(); let tag_name = tag.clone(); - slog::info!(logger, "detected tag push"; "tag" => &tag_name, "before" => &before_oid, "after" => &after); + slog::info!(logger, "{}", format!("detected tag push tag={} before={} after={}", tag_name, before_oid, after)); let http = http.clone(); let db = db.clone(); diff --git a/libs/git/hook/pool/redis.rs b/libs/git/hook/pool/redis.rs index 95c11e4..2ea6265 100644 --- a/libs/git/hook/pool/redis.rs +++ b/libs/git/hook/pool/redis.rs @@ -120,10 +120,22 @@ impl RedisConsumer { queue_key: &str, task_json: &str, ) -> Result<(), GitError> { - // First remove from work queue - self.ack_raw(work_key, task_json).await?; + self.nak_with_retry(work_key, queue_key, task_json, task_json).await + } - // Then push back to main queue for retry + /// Negative acknowledge with a different (updated) task JSON — used to + /// requeue with an incremented retry_count. + pub async fn nak_with_retry( + &self, + work_key: &str, + queue_key: &str, + old_task_json: &str, + new_task_json: &str, + ) -> Result<(), GitError> { + // Remove the old entry from work queue + self.ack_raw(work_key, old_task_json).await?; + + // Push the updated entry back to main queue for retry let redis = self .pool .get() @@ -134,12 +146,12 @@ impl RedisConsumer { let _: i64 = redis::cmd("LPUSH") .arg(queue_key) - .arg(task_json) + .arg(new_task_json) .query_async(&mut conn) .await .map_err(|e| GitError::Internal(format!("LPUSH retry failed: {}", e)))?; - slog::warn!(self.logger, "task nack'd and requeued"; "queue" => %queue_key); + slog::warn!(self.logger, "{}", format!("task nack'd and requeued queue={}", queue_key)); Ok(()) } diff --git a/libs/git/hook/pool/types.rs b/libs/git/hook/pool/types.rs index fc07999..b1643db 100644 --- a/libs/git/hook/pool/types.rs +++ b/libs/git/hook/pool/types.rs @@ -9,6 +9,10 @@ pub struct HookTask { pub task_type: TaskType, pub payload: serde_json::Value, pub created_at: chrono::DateTime, + /// Number of times this task has been retried after a failure. + /// When >= MAX_TASK_RETRIES, the task is discarded instead of requeued. + #[serde(default)] + pub retry_count: u32, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] diff --git a/libs/git/hook/webhook_dispatch.rs b/libs/git/hook/webhook_dispatch.rs index 53dca25..e8f62fa 100644 --- a/libs/git/hook/webhook_dispatch.rs +++ b/libs/git/hook/webhook_dispatch.rs @@ -231,7 +231,7 @@ pub async fn dispatch_repo_webhooks( { Ok(ws) => ws, Err(e) => { - slog::error!(logs, "failed to query webhooks: {}", e; "repo" => repo_uuid); + slog::error!(logs, "{}", format!("failed to query webhooks repo={} error={}", repo_uuid, e)); return; } }; @@ -297,7 +297,7 @@ pub async fn dispatch_repo_webhooks( let body = match serde_json::to_vec(&payload) { Ok(b) => b, Err(e) => { - slog::error!(logs, "failed to serialize push payload"; "error" => e.to_string()); + slog::error!(logs, "{}", format!("failed to serialize push payload error={}", e)); continue; } }; @@ -310,15 +310,15 @@ pub async fn dispatch_repo_webhooks( .await { Ok(Ok(())) => { - slog::info!(logs, "push webhook delivered"; "webhook_id" => webhook_id, "url" => url); + slog::info!(logs, "{}", format!("push webhook delivered webhook_id={} url={}", webhook_id, url)); let _ = touch_webhook(db, webhook_id, true, logs).await; } Ok(Err(e)) => { - slog::warn!(logs, "push webhook delivery failed"; "error" => e.to_string(), "webhook_id" => webhook_id, "url" => url); + slog::warn!(logs, "{}", format!("push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e)); let _ = touch_webhook(db, webhook_id, false, logs).await; } Err(_) => { - slog::warn!(logs, "push webhook timed out"; "webhook_id" => webhook_id, "url" => url); + slog::warn!(logs, "{}", format!("push webhook timed out webhook_id={} url={}", webhook_id, url)); let _ = touch_webhook(db, webhook_id, false, logs).await; } } @@ -351,7 +351,7 @@ pub async fn dispatch_repo_webhooks( let body = match serde_json::to_vec(&payload) { Ok(b) => b, Err(e) => { - slog::error!(logs, "failed to serialize tag payload"; "error" => e.to_string()); + slog::error!(logs, "{}", format!("failed to serialize tag payload error={}", e)); continue; } }; @@ -364,15 +364,15 @@ pub async fn dispatch_repo_webhooks( .await { Ok(Ok(())) => { - slog::info!(logs, "tag webhook delivered"; "webhook_id" => webhook_id, "url" => url); + slog::info!(logs, "{}", format!("tag webhook delivered webhook_id={} url={}", webhook_id, url)); let _ = touch_webhook(db, webhook_id, true, logs).await; } Ok(Err(e)) => { - slog::warn!(logs, "tag webhook delivery failed"; "error" => e.to_string(), "webhook_id" => webhook_id, "url" => url); + slog::warn!(logs, "{}", format!("tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e)); let _ = touch_webhook(db, webhook_id, false, logs).await; } Err(_) => { - slog::warn!(logs, "tag webhook timed out"; "webhook_id" => webhook_id, "url" => url); + slog::warn!(logs, "{}", format!("tag webhook timed out webhook_id={} url={}", webhook_id, url)); let _ = touch_webhook(db, webhook_id, false, logs).await; } } @@ -405,6 +405,6 @@ async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: & }; if let Err(e) = result { - slog::warn!(logs, "failed to update webhook touch"; "error" => e.to_string()); + slog::warn!(logs, "{}", format!("failed to update webhook touch error={}", e)); } } diff --git a/libs/git/ssh/authz.rs b/libs/git/ssh/authz.rs index 55c708f..7f53da4 100644 --- a/libs/git/ssh/authz.rs +++ b/libs/git/ssh/authz.rs @@ -103,7 +103,7 @@ impl SshAuthService { let fingerprint = match self.generate_fingerprint_from_public_key(public_key_str) { Ok(fp) => fp, Err(e) => { - error!(self.logger, "Failed to generate fingerprint"; "error" => %e); + error!(self.logger, "{}", format!("Failed to generate fingerprint error={}", e)); return Ok(None); } }; @@ -113,7 +113,7 @@ impl SshAuthService { } else { fingerprint.clone() }; - info!(self.logger, "Looking up user with SSH key"; "fingerprint" => %fingerprint_preview); + info!(self.logger, "{}", format!("Looking up user with SSH key fingerprint={}", fingerprint_preview)); let ssh_key = user_ssh_key::Entity::find() .filter(user_ssh_key::Column::Fingerprint.eq(&fingerprint)) @@ -124,13 +124,13 @@ impl SshAuthService { let ssh_key = match ssh_key { Some(key) => key, None => { - warn!(self.logger, "No SSH key found"; "fingerprint" => %fingerprint); + warn!(self.logger, "{}", format!("No SSH key found fingerprint={}", fingerprint)); return Ok(None); } }; if self.is_key_expired(&ssh_key) { - warn!(self.logger, "SSH key expired"; "key_id" => ssh_key.id, "expires_at" => ?ssh_key.expires_at); + warn!(self.logger, "{}", format!("SSH key expired key_id={} expires_at={:?}", ssh_key.id, ssh_key.expires_at)); return Ok(None); } @@ -140,7 +140,7 @@ impl SshAuthService { .await?; if let Some(ref user) = user_model { - info!(self.logger, "User authenticated"; "user" => %user.username, "key" => %ssh_key.title); + info!(self.logger, "{}", format!("User authenticated user={} key={}", user.username, ssh_key.title)); self.update_key_last_used_async(ssh_key.id); } @@ -161,7 +161,7 @@ impl SshAuthService { let logger = self.logger.clone(); tokio::spawn(async move { if let Err(e) = Self::update_key_last_used_sync(db_clone, &logger, key_id).await { - warn!(&logger, "Failed to update key last_used"; "key_id" => key_id, "error" => %e); + warn!(&logger, "{}", format!("Failed to update key last_used key_id={} error={}", key_id, e)); } }); } @@ -182,7 +182,7 @@ impl SshAuthService { active_key.updated_at = Set(now); active_key.update(db.writer()).await?; - info!(logger, "Updated key last_used"; "key_id" => key_id); + info!(logger, "{}", format!("Updated key last_used key_id={}", key_id)); } Ok(()) @@ -195,12 +195,12 @@ impl SshAuthService { is_write: bool, ) -> bool { if repo.created_by == user.uid { - info!(self.logger, "User is repo owner"; "user" => %user.username, "repo" => %repo.repo_name); + info!(self.logger, "{}", format!("User is repo owner user={} repo={}", user.username, repo.repo_name)); return true; } if !is_write && !repo.is_private { - info!(self.logger, "Public repo allows read"; "repo" => %repo.repo_name); + info!(self.logger, "{}", format!("Public repo allows read repo={}", repo.repo_name)); return true; } @@ -209,7 +209,7 @@ impl SshAuthService { .await .unwrap_or(false) { - info!(self.logger, "User has collaborator access"; "user" => %user.username, "repo" => %repo.repo_name); + info!(self.logger, "{}", format!("User has collaborator access user={} repo={}", user.username, repo.repo_name)); return true; } @@ -219,11 +219,11 @@ impl SshAuthService { .await .unwrap_or(false) { - info!(self.logger, "User has project member access"; "user" => %user.username, "repo" => %repo.repo_name); + info!(self.logger, "{}", format!("User has project member access user={} repo={}", user.username, repo.repo_name)); return true; } - warn!(self.logger, "Access denied"; "user" => %user.username, "repo" => %repo.repo_name, "write" => is_write); + warn!(self.logger, "{}", format!("Access denied user={} repo={} write={}", user.username, repo.repo_name, is_write)); false } @@ -251,7 +251,7 @@ impl SshAuthService { return Ok(true); } - warn!(self.logger, "Collaborator has no valid roles"; "scope" => %collab.scope); + warn!(self.logger, "{}", format!("Collaborator has no valid roles scope={}", collab.scope)); Ok(false) } else { Ok(false) diff --git a/libs/git/ssh/handle.rs b/libs/git/ssh/handle.rs index 1e93bc6..822f271 100644 --- a/libs/git/ssh/handle.rs +++ b/libs/git/ssh/handle.rs @@ -516,7 +516,7 @@ impl russh::server::Handler for SSHandle { self.branch.insert(channel, refs); } Err(e) => { - warn!(self.logger, "Failed to parse ref updates, forwarding raw data"; "error" => ?e); + warn!(self.logger, "{}", format!("Failed to parse ref updates, forwarding raw data error={:?}", e)); self.branch.insert(channel, vec![]); } } @@ -525,7 +525,7 @@ impl russh::server::Handler for SSHandle { stdin.write_all(&buffered).await?; stdin.flush().await?; } else { - error!(self.logger, "stdin not found"; "channel" => ?channel); + error!(self.logger, "{}", format!("stdin not found channel={:?}", channel)); } return Ok(()); } @@ -534,7 +534,7 @@ impl russh::server::Handler for SSHandle { stdin.write_all(data).await?; stdin.flush().await?; } else { - error!(self.logger, "stdin not found (forwarding)"; "channel" => ?channel); + error!(self.logger, "{}", format!("stdin not found (forwarding) channel={:?}", channel)); } return Ok(()); } @@ -565,7 +565,7 @@ impl russh::server::Handler for SSHandle { user.username ); - info!(self.logger, "Shell request"; "user" => %user.username); + info!(self.logger, "{}", format!("Shell request user={}", user.username)); session .data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes())) .ok(); @@ -605,7 +605,7 @@ impl russh::server::Handler for SSHandle { let git_shell_cmd = match std::str::from_utf8(data) { Ok(cmd) => cmd.trim(), Err(e) => { - error!(self.logger, "Invalid command encoding"; "error" => %e); + error!(self.logger, "{}", format!("Invalid command encoding error={}", e)); session .disconnect( Disconnect::ServiceNotAvailable, @@ -619,7 +619,7 @@ impl russh::server::Handler for SSHandle { let (service, path) = match parse_git_command(git_shell_cmd) { Some((s, p)) => (s, p), None => { - error!(self.logger, "Invalid git command"; "command" => %git_shell_cmd); + error!(self.logger, "{}", format!("Invalid git command command={}", git_shell_cmd)); let msg = format!("Invalid git command: {}", git_shell_cmd); session .disconnect(Disconnect::ServiceNotAvailable, &msg, "") @@ -632,7 +632,7 @@ impl russh::server::Handler for SSHandle { Some(pair) => pair, None => { let msg = format!("Invalid repository path: {}", path); - error!(self.logger, "Invalid repo path"; "path" => path); + error!(self.logger, "{}", format!("Invalid repo path path={}", path)); session .disconnect(Disconnect::ServiceNotAvailable, &msg, "") .ok(); @@ -645,7 +645,7 @@ impl russh::server::Handler for SSHandle { Ok(repo) => repo, Err(e) => { // Log the detailed error internally; client receives generic message. - error!(self.logger, "Error fetching repo"; "error" => %e); + error!(self.logger, "{}", format!("Error fetching repo error={}", e)); session .disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "") .ok(); @@ -677,7 +677,7 @@ impl russh::server::Handler for SSHandle { if is_write { "write" } else { "read" }, repo.repo_name ); - error!(self.logger, "Access denied"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write); + error!(self.logger, "{}", format!("Access denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write)); session.disconnect(Disconnect::ByApplication, &msg, "").ok(); return Err(russh::Error::Disconnect); } @@ -690,20 +690,20 @@ impl russh::server::Handler for SSHandle { .await { let msg = format!("Rate limit exceeded for repository access: {}", repo_path); - warn!(self.logger, "Repo access rate limit exceeded"; "user" => %operator.username, "repo" => %repo.repo_name); + warn!(self.logger, "{}", format!("Repo access rate limit exceeded user={} repo={}", operator.username, repo.repo_name)); session.disconnect(Disconnect::ByApplication, &msg, "").ok(); return Err(russh::Error::Disconnect); } - info!(self.logger, "Access granted"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write); + info!(self.logger, "{}", format!("Access granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write)); let repo_path = PathBuf::from(&repo.storage_path); if !repo_path.exists() { - error!(self.logger, "Repository path not found"; "path" => %repo.storage_path); + error!(self.logger, "{}", format!("Repository path not found path={}", repo.storage_path)); } let mut cmd = build_git_command(service, repo_path); let logger = self.logger.clone(); - info!(&logger, "Spawning git process"; "service" => ?service, "path" => %repo.storage_path); + info!(&logger, "{}", format!("Spawning git process service={:?} path={}", service, repo.storage_path)); let mut shell = match cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) @@ -715,7 +715,7 @@ impl russh::server::Handler for SSHandle { shell } Err(e) => { - error!(&logger, "Process spawn failed"; "error" => %e); + error!(&logger, "{}", format!("Process spawn failed error={}", e)); let _ = session.channel_failure(channel_id); self.cleanup_channel(channel_id); dbg!(&e); @@ -735,7 +735,7 @@ impl russh::server::Handler for SSHandle { let sync = self.sync.clone(); let logger_for_fut = self.logger.clone(); let fut = async move { - info!(&logger_for_fut, "Task started"; "channel" => ?channel_id); + info!(&logger_for_fut, "{}", format!("Task started channel={:?}", channel_id)); let mut stdout_done = false; let mut stderr_done = false; @@ -762,7 +762,7 @@ impl russh::server::Handler for SSHandle { let status = result?; let status_code = status.code().unwrap_or(128) as u32; - info!(&logger_for_fut, "Git process exited"; "channel" => ?channel_id, "status" => status_code); + info!(&logger_for_fut, "{}", format!("Git process exited channel={:?} status={}", channel_id, status_code)); if !stdout_done || !stderr_done { let _ = tokio::time::timeout(Duration::from_millis(100), async { @@ -792,21 +792,21 @@ impl russh::server::Handler for SSHandle { sleep(Duration::from_millis(50)).await; let _ = session_handle.eof(channel_id).await; let _ = session_handle.close(channel_id).await; - info!(&logger_for_fut, "Channel closed"; "channel" => ?channel_id); + info!(&logger_for_fut, "{}", format!("Channel closed channel={:?}", channel_id)); break; } result = &mut stdout_fut, if !stdout_done => { info!(&logger_for_fut, "stdout completed"); stdout_done = true; if let Err(e) = result { - warn!(&logger_for_fut, "stdout forward error"; "error" => ?e); + warn!(&logger_for_fut, "{}", format!("stdout forward error error={:?}", e)); } } result = &mut stderr_fut, if !stderr_done => { info!(&logger_for_fut, "stderr completed"); stderr_done = true; if let Err(e) = result { - warn!(&logger_for_fut, "stderr forward error"; "error" => ?e); + warn!(&logger_for_fut, "{}", format!("stderr forward error error={:?}", e)); } } } @@ -817,7 +817,7 @@ impl russh::server::Handler for SSHandle { tokio::spawn(async move { if let Err(e) = fut.await { - error!(&logger, "Git SSH channel task error"; "error" => %e); + error!(&logger, "{}", format!("Git SSH channel task error error={}", e)); } while eof_rx.recv().await.is_some() {} }); diff --git a/libs/git/ssh/mod.rs b/libs/git/ssh/mod.rs index a03fd17..bc8c498 100644 --- a/libs/git/ssh/mod.rs +++ b/libs/git/ssh/mod.rs @@ -193,6 +193,7 @@ impl ReceiveSyncService { task_type: TaskType::Sync, payload: serde_json::Value::Null, created_at: chrono::Utc::now(), + retry_count: 0, }; let task_json = match serde_json::to_string(&hook_task) { @@ -220,7 +221,7 @@ impl ReceiveSyncService { .query_async::<()>(&mut conn) .await { - error!(self.logger, "Failed to LPUSH sync task"; "error" => %e, "repo_id" => %task.repo_uid); + error!(self.logger, "{}", format!("Failed to LPUSH sync task repo_id={} error={}", task.repo_uid, e)); } } } diff --git a/libs/git/ssh/server.rs b/libs/git/ssh/server.rs index b06e31d..ae4e680 100644 --- a/libs/git/ssh/server.rs +++ b/libs/git/ssh/server.rs @@ -55,7 +55,7 @@ impl russh::server::Server for SSHServer { let logger = self.logger.clone(); tokio::spawn(async move { if !rate_limiter.is_ip_allowed(&ip).await { - warn!(logger, "IP rate limit exceeded"; "ip" => %ip); + warn!(logger, "{}", format!("IP rate limit exceeded ip={}", ip)); } }); } else {