use crate::error::GitError; use crate::hook::pool::redis::RedisConsumer; use crate::hook::pool::types::{HookTask, TaskType}; use crate::hook::sync::HookMetaDataSync; use db::cache::AppCache; use db::database::AppDatabase; use models::EntityTrait; use slog::Logger; use std::time::Duration; use tokio_util::sync::CancellationToken; /// Single-threaded worker that sequentially consumes tasks from Redis queues. /// K8s can scale replicas for concurrency — each replica runs one worker. /// Per-repo Redis locking prevents concurrent workers from processing the same repo. #[derive(Clone)] pub struct HookWorker { db: AppDatabase, cache: AppCache, logger: Logger, consumer: RedisConsumer, } impl HookWorker { pub fn new( db: AppDatabase, cache: AppCache, logger: Logger, consumer: RedisConsumer, ) -> Self { Self { db, cache, logger, consumer, } } /// Run the worker loop. Blocks until cancelled. pub async fn run(&self, cancel: CancellationToken) { slog::info!(self.logger, "hook worker started"); let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc]; let poll_interval = Duration::from_millis(500); loop { tokio::select! { _ = cancel.cancelled() => { slog::info!(self.logger, "hook worker shutdown signal received"); break; } _ = tokio::time::sleep(poll_interval) => {} } for task_type in &task_types { let result = self .consumer .next(&task_type.to_string()) .await; let (task, task_json) = match result { Ok(Some(pair)) => pair, Ok(None) => continue, Err(e) => { slog::warn!(self.logger, "failed to dequeue task: {}", e); tokio::time::sleep(Duration::from_secs(1)).await; break; } }; let queue_key = format!("{}:{}", self.consumer.prefix(), task_type); let work_key = format!("{}:work", queue_key); self.process_task(&task, &task_json, &work_key, &queue_key) .await; } } slog::info!(self.logger, "hook worker stopped"); } async fn process_task( &self, task: &HookTask, task_json: &str, work_key: &str, queue_key: &str, ) { slog::info!(self.logger, "task started task_id={} task_type={} repo_id={}", task.id, task.task_type, task.repo_id); let result = match task.task_type { TaskType::Sync => self.run_sync(&task.repo_id).await, TaskType::Fsck => self.run_fsck(&task.repo_id).await, TaskType::Gc => self.run_gc(&task.repo_id).await, }; match result { Ok(()) => { if let Err(e) = self.consumer.ack(work_key, task_json).await { slog::warn!(self.logger, "failed to ack task: {}", e); } slog::info!(self.logger, "task completed task_id={}", task.id); } Err(e) => { // GitError::Locked means another worker is processing this repo — // requeue without incrementing retry count so it can be picked up later. let is_locked = matches!(e, crate::GitError::Locked(_)); if is_locked { slog::info!(self.logger, "repo locked by another worker, requeueing task_id={}", task.id); if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await { slog::warn!(self.logger, "failed to requeue locked task: {}", nak_err); } } else { slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}", task.id, task.task_type, task.repo_id, e); const MAX_RETRIES: u32 = 5; if task.retry_count >= MAX_RETRIES { slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}", task.id, task.retry_count); let _ = self.consumer.ack(work_key, task_json).await; } else { let mut task = task.clone(); task.retry_count += 1; let retry_json = serde_json::to_string(&task).unwrap_or_else(|_| task_json.to_string()); let _ = self .consumer .nak_with_retry(work_key, queue_key, task_json, &retry_json) .await; } } } } } async fn run_sync(&self, repo_id: &str) -> Result<(), GitError> { let repo_uuid = models::Uuid::parse_str(repo_id) .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_uuid) .one(self.db.reader()) .await .map_err(GitError::from)? .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; if !std::path::Path::new(&repo.storage_path).exists() { return Err(GitError::NotFound(format!( "storage path does not exist: {}", repo.storage_path ))); } // Capture before tips let before_tips = tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); let logger = self.logger.clone(); let repo = repo.clone(); move || { let sync = HookMetaDataSync::new(db, cache, repo, logger) .map_err(|e| GitError::Internal(e.to_string()))?; Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? .map_err(GitError::from)?; // Run sync tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); let logger = self.logger.clone(); let repo = repo.clone(); move || { let result = tokio::runtime::Handle::current().block_on(async { let sync = HookMetaDataSync::new(db, cache, repo, logger)?; sync.sync().await }); match result { Ok(()) => Ok::<(), GitError>(()), Err(e) => Err(GitError::Internal(e.to_string())), } } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? .map_err(GitError::from)?; // Capture after tips let after_tips = tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); let logger = self.logger.clone(); let repo = repo.clone(); move || { let sync = HookMetaDataSync::new(db, cache, repo, logger) .map_err(|e| GitError::Internal(e.to_string()))?; Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? .map_err(GitError::from)?; let (before_branch_tips, before_tag_tips) = before_tips; let (after_branch_tips, after_tag_tips) = after_tips; let project = repo.project; // Dispatch branch push webhooks for (branch, after_oid) in after_branch_tips { let before_oid = before_branch_tips .iter() .find(|(n, _)| n == &branch) .map(|(_, o)| o.as_str()); let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true); if changed { let before_oid = before_oid.map_or("0", |v| v).to_string(); let branch_name = branch.clone(); let db = self.db.clone(); let logger = self.logger.clone(); let repo_id_str = repo.id.to_string(); let repo_name = repo.repo_name.clone(); let default_branch = repo.default_branch.clone(); let ns = models::projects::Project::find_by_id(project) .one(self.db.reader()) .await .ok() .flatten() .map(|p| p.name) .unwrap_or_default(); tokio::spawn(async move { crate::hook::webhook_dispatch::dispatch_repo_webhooks( &db, &reqwest::Client::new(), &logger, &repo_id_str, &ns, &repo_name, &default_branch, "", "", crate::hook::webhook_dispatch::WebhookEventKind::Push { r#ref: format!("refs/heads/{}", branch_name), before: before_oid, after: after_oid, commits: vec![], }, ) .await; }); } } // Dispatch tag push webhooks for (tag, after_oid) in after_tag_tips { let before_oid = before_tag_tips .iter() .find(|(n, _)| n == &tag) .map(|(_, o)| o.as_str()); let is_new = before_oid.is_none(); let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false); if is_new || was_updated { let before_oid = before_oid.map_or("0", |v| v).to_string(); let tag_name = tag.clone(); let db = self.db.clone(); let logger = self.logger.clone(); let repo_id_str = repo.id.to_string(); let repo_name = repo.repo_name.clone(); let default_branch = repo.default_branch.clone(); let ns = models::projects::Project::find_by_id(project) .one(self.db.reader()) .await .ok() .flatten() .map(|p| p.name) .unwrap_or_default(); tokio::spawn(async move { crate::hook::webhook_dispatch::dispatch_repo_webhooks( &db, &reqwest::Client::new(), &logger, &repo_id_str, &ns, &repo_name, &default_branch, "", "", crate::hook::webhook_dispatch::WebhookEventKind::TagPush { r#ref: format!("refs/tags/{}", tag_name), before: before_oid, after: after_oid, }, ) .await; }); } } Ok(()) } async fn run_fsck(&self, repo_id: &str) -> Result<(), GitError> { let repo_uuid = models::Uuid::parse_str(repo_id) .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_uuid) .one(self.db.reader()) .await .map_err(GitError::from)? .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; if !std::path::Path::new(&repo.storage_path).exists() { return Err(GitError::NotFound(format!( "storage path does not exist: {}", repo.storage_path ))); } let db = self.db.clone(); let cache = self.cache.clone(); let logger = self.logger.clone(); tokio::task::spawn_blocking({ let repo = repo.clone(); move || { let result = tokio::runtime::Handle::current().block_on(async { let sync = HookMetaDataSync::new(db, cache, repo, logger)?; sync.fsck_only().await }); match result { Ok(()) => Ok::<(), GitError>(()), Err(e) => Err(GitError::Internal(e.to_string())), } } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? .map_err(GitError::from)?; Ok(()) } async fn run_gc(&self, repo_id: &str) -> Result<(), GitError> { let repo_uuid = models::Uuid::parse_str(repo_id) .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_uuid) .one(self.db.reader()) .await .map_err(GitError::from)? .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; if !std::path::Path::new(&repo.storage_path).exists() { return Err(GitError::NotFound(format!( "storage path does not exist: {}", repo.storage_path ))); } let db = self.db.clone(); let cache = self.cache.clone(); let logger = self.logger.clone(); tokio::task::spawn_blocking({ let repo = repo.clone(); move || { let result = tokio::runtime::Handle::current().block_on(async { let sync = HookMetaDataSync::new(db, cache, repo, logger)?; sync.gc_only().await }); match result { Ok(()) => Ok::<(), GitError>(()), Err(e) => Err(GitError::Internal(e.to_string())), } } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? .map_err(GitError::from)?; Ok(()) } }