use crate::error::GitError; use crate::hook::pool::redis::RedisConsumer; use crate::hook::pool::types::{HookTask, TaskType}; use crate::hook::sync::HookMetaDataSync; use agent::TagEmbedInput; use db::cache::AppCache; use db::database::AppDatabase; use metrics::counter; use models::repos::repo_tag; use models::EntityTrait; use sea_orm::{ColumnTrait, QueryFilter}; use std::sync::Arc; use std::time::Duration; use tokio_util::sync::CancellationToken; /// Single-threaded worker that sequentially consumes tasks from Redis queues. /// K8s can scale replicas for concurrency — each replica runs one worker. /// Per-repo Redis locking is managed inside HookMetaDataSync methods. #[derive(Clone)] pub struct HookWorker { db: AppDatabase, cache: AppCache, consumer: RedisConsumer, http_client: Arc, max_retries: u32, embed_service: Option, } impl HookWorker { pub fn new( db: AppDatabase, cache: AppCache, consumer: RedisConsumer, http_client: Arc, max_retries: u32, embed_service: Option, ) -> Self { Self { db, cache, consumer, http_client, max_retries, embed_service, } } /// Run the worker loop. Blocks until cancelled. pub async fn run(&self, cancel: CancellationToken) { tracing::info!("hook worker started"); let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc]; let mut redis_backoff_ms: u64 = 1000; loop { // Check cancellation at top of loop to avoid unnecessary work if cancel.is_cancelled() { tracing::info!("hook worker shutdown signal received"); break; } for task_type in &task_types { let result = self.consumer.next(&task_type.to_string()).await; let (task, task_json) = match result { Ok(Some(pair)) => { // Reset backoff on successful dequeue redis_backoff_ms = 1000; pair } Ok(None) => continue, Err(e) => { tracing::warn!("failed to dequeue task: {}", e); tokio::time::sleep(Duration::from_millis(redis_backoff_ms)).await; // Exponential backoff, cap at 32s redis_backoff_ms = (redis_backoff_ms * 2).min(32_000); break; } }; let queue_key = format!("{}:{}", self.consumer.prefix(), task_type); let work_key = format!("{}:work", queue_key); self.process_task(&task, &task_json, &work_key, &queue_key) .await; } } tracing::info!("hook worker stopped"); } async fn process_task( &self, task: &HookTask, task_json: &str, work_key: &str, queue_key: &str, ) { tracing::info!("task started task_id={} task_type={} repo_id={}", task.id, task.task_type, task.repo_id); counter!("hook_tasks_total", "task_type" => task.task_type.to_string()).increment(1); let result = match task.task_type { TaskType::Sync => self.run_sync(&task.repo_id).await, TaskType::Fsck => self.run_fsck(&task.repo_id).await, TaskType::Gc => self.run_gc(&task.repo_id).await, }; match result { Ok(()) => { counter!("hook_tasks_success_total", "task_type" => task.task_type.to_string()).increment(1); if let Err(e) = self.consumer.ack(work_key, task_json).await { tracing::warn!("failed to ack task: {}", e); } tracing::info!("task completed task_id={}", task.id); } Err(e) => { let is_locked = matches!(e, crate::GitError::Locked(_)); if is_locked { counter!("hook_tasks_locked_total").increment(1); // Another worker holds the lock — requeue without counting as retry. tracing::info!("repo locked by another worker, requeueing task_id={}", task.id); if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await { tracing::warn!("failed to requeue locked task: {}", nak_err); } } else { counter!("hook_tasks_failed_total", "task_type" => task.task_type.to_string()).increment(1); tracing::warn!("task failed task_id={} task_type={} repo_id={} error={}", task.id, task.task_type, task.repo_id, e); if task.retry_count >= self.max_retries { counter!("hook_tasks_exhausted_total").increment(1); tracing::warn!("task exhausted retries, discarding task_id={} retry_count={}", task.id, task.retry_count); let _ = self.consumer.ack(work_key, task_json).await; } else { counter!("hook_tasks_retried_total").increment(1); let mut task = task.clone(); task.retry_count += 1; let retry_json = serde_json::to_string(&task).unwrap_or_else(|_| task_json.to_string()); let _ = self .consumer .nak_with_retry(work_key, queue_key, task_json, &retry_json) .await; } } } } } async fn run_sync(&self, repo_id: &str) -> Result<(), GitError> { let repo_uuid = models::Uuid::parse_str(repo_id) .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_uuid) .one(self.db.reader()) .await .map_err(GitError::from)? .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; if !std::path::Path::new(&repo.storage_path).exists() { return Err(GitError::NotFound(format!( "storage path does not exist: {}", repo.storage_path ))); } // Capture before tips for webhook diff let before_tips = tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); let repo = repo.clone(); move || { let sync = HookMetaDataSync::new(db, cache, repo) .map_err(|e| GitError::Internal(e.to_string()))?; Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? .map_err(GitError::from)?; // Run full sync (internally acquires/releases per-repo lock) let db = self.db.clone(); let cache = self.cache.clone(); let repo_clone = repo.clone(); let _sync_result = tokio::task::spawn_blocking(move || { let result = tokio::runtime::Handle::current().block_on(async { let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone())?; sync.sync().await }); match result { Ok(()) => Ok::<(), GitError>(()), Err(e) => Err(GitError::Internal(e.to_string())), } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e))) .and_then(|r| r.map_err(GitError::from))?; // Only dispatch webhooks if sync succeeded // Capture after tips and dispatch webhooks let after_tips = tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); let repo = repo.clone(); move || { let sync = HookMetaDataSync::new(db, cache, repo) .map_err(|e| GitError::Internal(e.to_string()))?; Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? .map_err(GitError::from)?; let (before_branch_tips, before_tag_tips) = before_tips; let (after_branch_tips, after_tag_tips) = after_tips; let project = repo.project; // Resolve namespace once outside the loop let namespace = models::projects::Project::find_by_id(project) .one(self.db.reader()) .await .ok() .flatten() .map(|p| p.name) .unwrap_or_default(); let repo_id_str = repo.id.to_string(); let repo_name = repo.repo_name.clone(); let default_branch = repo.default_branch.clone(); let http_client = self.http_client.clone(); let db = self.db.clone(); // Dispatch branch webhooks and collect handles let mut handles = Vec::new(); let mut branch_changes: u64 = 0; for (branch, after_oid) in after_branch_tips { let before_oid = before_branch_tips .iter() .find(|(n, _)| n == &branch) .map(|(_, o)| o.as_str()); let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true); if changed { branch_changes += 1; let before_oid = before_oid.map_or("0", |v| v).to_string(); let branch_name = branch.clone(); let h = tokio::spawn({ let http_client = http_client.clone(); let db = db.clone(); let repo_id_str = repo_id_str.clone(); let namespace = namespace.clone(); let repo_name = repo_name.clone(); let default_branch = default_branch.clone(); async move { crate::hook::webhook_dispatch::dispatch_repo_webhooks( &db, &http_client, &repo_id_str, &namespace, &repo_name, &default_branch, "", "", crate::hook::webhook_dispatch::WebhookEventKind::Push { r#ref: format!("refs/heads/{}", branch_name), before: before_oid, after: after_oid, commits: vec![], }, ) .await; } }); handles.push(h); } } // Dispatch tag webhooks and collect handles let mut tag_changes: u64 = 0; let mut changed_tag_names: Vec = Vec::new(); for (tag, after_oid) in after_tag_tips { let before_oid = before_tag_tips .iter() .find(|(n, _)| n == &tag) .map(|(_, o)| o.as_str()); let is_new = before_oid.is_none(); let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false); if is_new || was_updated { tag_changes += 1; changed_tag_names.push(tag.clone()); let before_oid = before_oid.map_or("0", |v| v).to_string(); let tag_name = tag.clone(); let h = tokio::spawn({ let http_client = http_client.clone(); let db = db.clone(); let repo_id_str = repo_id_str.clone(); let namespace = namespace.clone(); let repo_name = repo_name.clone(); let default_branch = default_branch.clone(); async move { crate::hook::webhook_dispatch::dispatch_repo_webhooks( &db, &http_client, &repo_id_str, &namespace, &repo_name, &default_branch, "", "", crate::hook::webhook_dispatch::WebhookEventKind::TagPush { r#ref: format!("refs/tags/{}", tag_name), before: before_oid, after: after_oid, }, ) .await; } }); handles.push(h); } } // Wait for all webhooks to complete before returning for h in handles { let _ = h.await; } counter!("hook_sync_branches_changed_total").increment(branch_changes); counter!("hook_sync_tags_changed_total").increment(tag_changes); // Embed changed tags into Qdrant for semantic search (non-blocking, fire-and-forget) if let Some(ref embed) = self.embed_service { if !changed_tag_names.is_empty() { let es = embed.clone(); let db = self.db.clone(); let repo_uuid = repo_uuid; let repo_name = repo_name.clone(); let project_id = project.to_string(); let tag_names = changed_tag_names.clone(); tokio::spawn(async move { let tags = repo_tag::Entity::find() .filter(repo_tag::Column::Repo.eq(repo_uuid)) .filter(repo_tag::Column::Name.is_in(tag_names)) .all(db.reader()) .await; match tags { Ok(rows) => { let count = rows.len(); let inputs: Vec = rows .into_iter() .map(|t| TagEmbedInput { repo_id: repo_uuid.to_string(), repo_name: repo_name.clone(), project_id: project_id.clone(), name: t.name, description: t.description, }) .collect(); if !inputs.is_empty() { if let Err(e) = es.embed_tags_batch(inputs).await { tracing::warn!(error = %e, "failed to embed changed tags"); } else { tracing::debug!(count, "embedded changed tags into Qdrant"); } } } Err(e) => { tracing::warn!(error = %e, "failed to query changed tags for embedding"); } } }); } } Ok(()) } async fn run_fsck(&self, repo_id: &str) -> Result<(), GitError> { let repo_uuid = models::Uuid::parse_str(repo_id) .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_uuid) .one(self.db.reader()) .await .map_err(GitError::from)? .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; if !std::path::Path::new(&repo.storage_path).exists() { return Err(GitError::NotFound(format!( "storage path does not exist: {}", repo.storage_path ))); } let db = self.db.clone(); let cache = self.cache.clone(); let sync = HookMetaDataSync::new(db, cache, repo)?; sync.fsck_only().await?; Ok(()) } async fn run_gc(&self, repo_id: &str) -> Result<(), GitError> { let repo_uuid = models::Uuid::parse_str(repo_id) .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_uuid) .one(self.db.reader()) .await .map_err(GitError::from)? .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; if !std::path::Path::new(&repo.storage_path).exists() { return Err(GitError::NotFound(format!( "storage path does not exist: {}", repo.storage_path ))); } let db = self.db.clone(); let cache = self.cache.clone(); let sync = HookMetaDataSync::new(db, cache, repo)?; sync.gc_only().await?; Ok(()) } }