diff --git a/libs/config/hook.rs b/libs/config/hook.rs index 51c5544..dba3539 100644 --- a/libs/config/hook.rs +++ b/libs/config/hook.rs @@ -3,6 +3,8 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PoolConfig { + /// Intended concurrency (used by K8s operator/HPA, not the worker itself). + /// The worker is single-threaded by design; K8s replicas provide parallelism. pub max_concurrent: usize, pub cpu_threshold: f32, /// Hash-tag-prefixed Redis key prefix for hook task queues. diff --git a/libs/git/hook/pool/redis.rs b/libs/git/hook/pool/redis.rs index 87c3859..796ffc0 100644 --- a/libs/git/hook/pool/redis.rs +++ b/libs/git/hook/pool/redis.rs @@ -2,6 +2,7 @@ use crate::error::GitError; use crate::hook::pool::types::HookTask; use deadpool_redis::cluster::Connection as RedisConn; use slog::Logger; +use std::time::Duration; /// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern. pub struct RedisConsumer { @@ -12,6 +13,8 @@ pub struct RedisConsumer { logger: Logger, } +const POOL_GET_TIMEOUT: Duration = Duration::from_secs(5); + impl RedisConsumer { pub fn new( pool: deadpool_redis::cluster::Pool, @@ -36,10 +39,9 @@ impl RedisConsumer { let queue_key = format!("{}:{}", self.prefix, task_type); let work_key = format!("{}:{}:work", self.prefix, task_type); - let redis = self - .pool - .get() + let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get()) .await + .map_err(|_| GitError::Internal("redis pool get timed out".into()))? .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; let mut conn: RedisConn = redis; @@ -90,10 +92,9 @@ impl RedisConsumer { } async fn ack_raw(&self, work_key: &str, task_json: &str) -> Result<(), GitError> { - let redis = self - .pool - .get() + let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get()) .await + .map_err(|_| GitError::Internal("redis pool get timed out".into()))? .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; let mut conn: RedisConn = redis; @@ -129,10 +130,9 @@ impl RedisConsumer { old_task_json: &str, new_task_json: &str, ) -> Result<(), GitError> { - let redis = self - .pool - .get() + let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get()) .await + .map_err(|_| GitError::Internal("redis pool get timed out".into()))? .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; let mut conn: RedisConn = redis; diff --git a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs index 6b96171..f89e648 100644 --- a/libs/git/hook/pool/worker.rs +++ b/libs/git/hook/pool/worker.rs @@ -44,26 +44,30 @@ impl HookWorker { slog::info!(self.logger, "hook worker started"); let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc]; - let poll_interval = Duration::from_millis(500); + let mut redis_backoff_ms: u64 = 1000; loop { - tokio::select! { - _ = cancel.cancelled() => { - slog::info!(self.logger, "hook worker shutdown signal received"); - break; - } - _ = tokio::time::sleep(poll_interval) => {} + // Check cancellation at top of loop to avoid unnecessary work + if cancel.is_cancelled() { + slog::info!(self.logger, "hook worker shutdown signal received"); + break; } for task_type in &task_types { let result = self.consumer.next(&task_type.to_string()).await; let (task, task_json) = match result { - Ok(Some(pair)) => pair, + Ok(Some(pair)) => { + // Reset backoff on successful dequeue + redis_backoff_ms = 1000; + pair + } Ok(None) => continue, Err(e) => { slog::warn!(self.logger, "failed to dequeue task: {}", e); - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(redis_backoff_ms)).await; + // Exponential backoff, cap at 32s + redis_backoff_ms = (redis_backoff_ms * 2).min(32_000); break; } }; @@ -338,20 +342,8 @@ impl HookWorker { let db = self.db.clone(); let cache = self.cache.clone(); let logger = self.logger.clone(); - - tokio::task::spawn_blocking(move || { - let result = tokio::runtime::Handle::current().block_on(async { - let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo.clone(), logger.clone())?; - sync.fsck_only().await - }); - match result { - Ok(()) => Ok::<(), GitError>(()), - Err(e) => Err(GitError::Internal(e.to_string())), - } - }) - .await - .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? - .map_err(GitError::from)?; + let sync = HookMetaDataSync::new(db, cache, repo, logger)?; + sync.fsck_only().await?; Ok(()) } @@ -376,20 +368,8 @@ impl HookWorker { let db = self.db.clone(); let cache = self.cache.clone(); let logger = self.logger.clone(); - - tokio::task::spawn_blocking(move || { - let result = tokio::runtime::Handle::current().block_on(async { - let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo.clone(), logger.clone())?; - sync.gc_only().await - }); - match result { - Ok(()) => Ok::<(), GitError>(()), - Err(e) => Err(GitError::Internal(e.to_string())), - } - }) - .await - .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? - .map_err(GitError::from)?; + let sync = HookMetaDataSync::new(db, cache, repo, logger)?; + sync.gc_only().await?; Ok(()) } diff --git a/libs/git/hook/sync/gc.rs b/libs/git/hook/sync/gc.rs index 963bd8f..6b45663 100644 --- a/libs/git/hook/sync/gc.rs +++ b/libs/git/hook/sync/gc.rs @@ -18,6 +18,8 @@ impl HookMetaDataSync { .map_err(|e| GitError::IoError(format!("git gc failed: {}", e)))?; if !status.success() { + // git gc --auto exits non-zero when there's nothing to collect, + // or when another gc is already running — both are benign. slog::warn!(logger, "git gc exited with {:?}", status.code()); } diff --git a/libs/git/hook/sync/mod.rs b/libs/git/hook/sync/mod.rs index dcefe9a..4cac9b4 100644 --- a/libs/git/hook/sync/mod.rs +++ b/libs/git/hook/sync/mod.rs @@ -283,7 +283,7 @@ impl HookMetaDataSync { let project_uid = self.repo.project; let repo_root = match self.domain.repo().workdir() { - Some(p) => p, + Some(p) => p.to_path_buf(), None => return, }; @@ -296,12 +296,21 @@ impl HookMetaDataSync { .map(|oid| oid.to_string()) .unwrap_or_default(); - let discovered = match scan_skills_from_dir(repo_root, &self.repo.id, &commit_sha) { - Ok(d) => d, - Err(e) => { + let repo_id = self.repo.id; + let discovered = match tokio::task::spawn_blocking(move || { + scan_skills_from_dir(&repo_root, &repo_id, &commit_sha) + }) + .await + { + Ok(Ok(d)) => d, + Ok(Err(e)) => { slog::warn!(self.logger, "failed to scan skills directory: {}", e); return; } + Err(e) => { + slog::warn!(self.logger, "spawn_blocking join error: {}", e); + return; + } }; if discovered.is_empty() { diff --git a/libs/git/hook/webhook_dispatch.rs b/libs/git/hook/webhook_dispatch.rs index e8f62fa..78986e4 100644 --- a/libs/git/hook/webhook_dispatch.rs +++ b/libs/git/hook/webhook_dispatch.rs @@ -231,7 +231,7 @@ pub async fn dispatch_repo_webhooks( { Ok(ws) => ws, Err(e) => { - slog::error!(logs, "{}", format!("failed to query webhooks repo={} error={}", repo_uuid, e)); + slog::error!(logs, "failed to query webhooks repo={} error={}", repo_uuid, e); return; } }; @@ -297,7 +297,7 @@ pub async fn dispatch_repo_webhooks( let body = match serde_json::to_vec(&payload) { Ok(b) => b, Err(e) => { - slog::error!(logs, "{}", format!("failed to serialize push payload error={}", e)); + slog::error!(logs, "failed to serialize push payload error={}", e); continue; } }; @@ -310,15 +310,15 @@ pub async fn dispatch_repo_webhooks( .await { Ok(Ok(())) => { - slog::info!(logs, "{}", format!("push webhook delivered webhook_id={} url={}", webhook_id, url)); + slog::info!(logs, "push webhook delivered webhook_id={} url={}", webhook_id, url); let _ = touch_webhook(db, webhook_id, true, logs).await; } Ok(Err(e)) => { - slog::warn!(logs, "{}", format!("push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e)); + slog::warn!(logs, "push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); let _ = touch_webhook(db, webhook_id, false, logs).await; } Err(_) => { - slog::warn!(logs, "{}", format!("push webhook timed out webhook_id={} url={}", webhook_id, url)); + slog::warn!(logs, "push webhook timed out webhook_id={} url={}", webhook_id, url); let _ = touch_webhook(db, webhook_id, false, logs).await; } } @@ -351,7 +351,7 @@ pub async fn dispatch_repo_webhooks( let body = match serde_json::to_vec(&payload) { Ok(b) => b, Err(e) => { - slog::error!(logs, "{}", format!("failed to serialize tag payload error={}", e)); + slog::error!(logs, "failed to serialize tag payload error={}", e); continue; } }; @@ -364,15 +364,15 @@ pub async fn dispatch_repo_webhooks( .await { Ok(Ok(())) => { - slog::info!(logs, "{}", format!("tag webhook delivered webhook_id={} url={}", webhook_id, url)); + slog::info!(logs, "tag webhook delivered webhook_id={} url={}", webhook_id, url); let _ = touch_webhook(db, webhook_id, true, logs).await; } Ok(Err(e)) => { - slog::warn!(logs, "{}", format!("tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e)); + slog::warn!(logs, "tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); let _ = touch_webhook(db, webhook_id, false, logs).await; } Err(_) => { - slog::warn!(logs, "{}", format!("tag webhook timed out webhook_id={} url={}", webhook_id, url)); + slog::warn!(logs, "tag webhook timed out webhook_id={} url={}", webhook_id, url); let _ = touch_webhook(db, webhook_id, false, logs).await; } } @@ -405,6 +405,6 @@ async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: & }; if let Err(e) = result { - slog::warn!(logs, "{}", format!("failed to update webhook touch error={}", e)); + slog::warn!(logs, "failed to update webhook touch error={}", e); } }