pub mod log; pub mod metrics; pub mod redis; pub mod types; use db::cache::AppCache; use db::database::AppDatabase; use deadpool_redis::cluster::Pool as RedisPool; use sea_orm::EntityTrait; use slog::Logger; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use tokio::sync::Semaphore; use tokio::task::{JoinSet, spawn_blocking}; use tokio_util::sync::CancellationToken; use crate::hook::pool::log::LogStream; use crate::hook::pool::metrics::CpuMonitor; use crate::hook::pool::redis::RedisConsumer; use crate::hook::pool::types::{HookTask, PoolConfig, PoolMetrics, TaskType}; use crate::hook::sync::HookMetaDataSync; pub struct GitHookPool { config: PoolConfig, db: AppDatabase, cache: AppCache, logger: Logger, cpu_monitor: CpuMonitor, consumer: RedisConsumer, log_stream: LogStream, running_count: Arc, total_processed: Arc, total_failed: Arc, semaphore: Arc, http: Arc, } impl GitHookPool { pub async fn new( config: PoolConfig, db: AppDatabase, cache: AppCache, redis_pool: RedisPool, logger: Logger, http: Arc, ) -> Result { let consumer = RedisConsumer::new( redis_pool.clone(), config.redis_list_prefix.clone(), config.redis_block_timeout_secs, logger.clone(), ); let log_stream = LogStream::new( config.redis_log_channel.clone(), config.worker_id.clone(), Arc::new(redis_pool), ); Ok(Self { config, db, cache, logger, cpu_monitor: CpuMonitor::new(), consumer, log_stream, running_count: Arc::new(AtomicU64::new(0)), total_processed: Arc::new(AtomicU64::new(0)), total_failed: Arc::new(AtomicU64::new(0)), semaphore: Arc::new(Semaphore::new(num_cpus::get())), http, }) } pub async fn run(self: Arc, cancel: CancellationToken) { let mut join_set = JoinSet::<()>::new(); let cancel_clone = cancel.clone(); // Task types to poll let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc]; loop { tokio::select! { _ = cancel_clone.cancelled() => { slog::info!(self.logger, "pool received shutdown signal, draining {} tasks", join_set.len()); while join_set.join_next().await.is_some() {} slog::info!(self.logger, "pool shutdown complete"); break; } _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {} } let running = self.running_count.load(Ordering::Relaxed) as usize; let can_accept = self .cpu_monitor .can_accept_task( self.config.max_concurrent, self.config.cpu_threshold, running, ) .await; if !can_accept { tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; continue; } // Poll each task type in round-robin fashion for task_type in &task_types { let result = self.consumer.next(&task_type.to_string()).await; let (task, task_json) = match result { Ok(Some(pair)) => pair, Ok(None) => continue, // timeout, try next queue Err(e) => { slog::warn!(self.logger, "failed to dequeue task: {}", e); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; break; } }; let self_clone = self.clone(); // Compute queue/work keys for ACK/NAK let queue_key = format!( "{}:{}", self_clone.config.redis_list_prefix, task_type.to_string() ); let work_key = format!("{}:work", queue_key); let permit = match self_clone.semaphore.clone().acquire_owned().await { Ok(p) => p, Err(_) => continue, }; let self_clone2 = self.clone(); self_clone2.running_count.fetch_add(1, Ordering::Relaxed); let logger_clone = self_clone2.logger.clone(); let counter_clone = self_clone2.running_count.clone(); join_set.spawn(async move { let panicked = match spawn_blocking(move || { std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { tokio::runtime::Handle::current().block_on(async { self_clone2 .execute_task_body(task, task_json, queue_key, work_key) .await }) })) }) .await { Ok(Ok(Ok(()))) => false, // spawn_blocking Ok, catch_unwind Ok, body Ok Ok(Ok(Err(_))) => true, // spawn_blocking Ok, catch_unwind Ok, body Err(()) — never hit Ok(Err(_)) => true, // spawn_blocking Ok, catch_unwind Err = panic Err(_) => true, // spawn_blocking Err = thread aborted }; drop(permit); counter_clone.fetch_sub(1, Ordering::Relaxed); if panicked { slog::error!(logger_clone, "task panicked";); } }); // Only process one task per loop iteration to avoid overwhelming the pool break; } } } async fn execute_task_body( &self, task: HookTask, task_json: String, queue_key: String, work_key: String, ) -> Result<(), ()> { slog::info!(self.logger, "task started"; "task_id" => &task.id, "task_type" => %task.task_type, "repo_id" => &task.repo_id, "worker_id" => &self.config.worker_id ); self.log_stream .info( &task.id, &task.repo_id, &format!("task started: {}", task.task_type), ) .await; let result = match task.task_type { TaskType::Sync => self.run_sync(&task).await, TaskType::Fsck => self.run_fsck(&task).await, TaskType::Gc => self.run_gc(&task).await, }; let consumer = self.consumer.clone(); match result { Ok(()) => { if let Err(e) = consumer.ack(&work_key, &task_json).await { slog::warn!(self.logger, "failed to ack task: {}", e); } self.total_processed.fetch_add(1, Ordering::Relaxed); self.log_stream .info(&task.id, &task.repo_id, "task completed") .await; } Err(e) => { if let Err(e) = consumer.nak(&work_key, &queue_key, &task_json).await { slog::warn!(self.logger, "failed to nak task: {}", e); } self.total_failed.fetch_add(1, Ordering::Relaxed); self.log_stream .error(&task.id, &task.repo_id, &format!("task failed: {}", e)) .await; } } Ok(()) } async fn run_sync(&self, task: &HookTask) -> Result<(), crate::GitError> { let repo_id = models::Uuid::parse_str(&task.repo_id) .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_id) .one(self.db.reader()) .await .map_err(crate::GitError::from)? .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; let db_clone = self.db.clone(); let cache_clone = self.cache.clone(); let repo_clone = repo.clone(); let logger_clone = self.logger.clone(); // Phase 1: capture before branch/tag tips. let before_tips: (Vec<(String, String)>, Vec<(String, String)>) = tokio::task::spawn_blocking({ let db = db_clone.clone(); let cache = cache_clone.clone(); let repo = repo_clone.clone(); let logger = logger_clone.clone(); move || { let sync = HookMetaDataSync::new(db, cache, repo, logger)?; Ok::<_, crate::GitError>((sync.list_branch_tips(), sync.list_tag_tips())) } }) .await .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??; // Phase 2: run sync (async operation). let sync_result: Result<(), crate::GitError> = tokio::task::spawn_blocking({ let db = db_clone.clone(); let cache = cache_clone.clone(); let repo = repo_clone.clone(); let logger = logger_clone.clone(); move || { let sync = HookMetaDataSync::new(db, cache, repo, logger)?; tokio::runtime::Handle::current().block_on(async { sync.sync().await }) } }) .await .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))?; sync_result?; // Phase 3: capture after branch/tag tips. let after_tips: (Vec<(String, String)>, Vec<(String, String)>) = tokio::task::spawn_blocking({ let db = db_clone.clone(); let cache = cache_clone.clone(); let repo = repo_clone.clone(); let logger = logger_clone.clone(); move || { let sync = HookMetaDataSync::new(db, cache, repo, logger)?; Ok::<_, crate::GitError>((sync.list_branch_tips(), sync.list_tag_tips())) } }) .await .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??; let (before_branch_tips, before_tag_tips) = before_tips; let (after_branch_tips, after_tag_tips) = after_tips; let repo_uuid = repo.id.to_string(); let repo_name = repo.repo_name.clone(); let default_branch = repo.default_branch.clone(); // Resolve namespace = project.name let namespace = models::projects::Project::find_by_id(repo.project) .one(self.db.reader()) .await .map_err(|e| crate::GitError::Internal(format!("failed to fetch project: {}", e)))? .map(|p| p.name) .unwrap_or_default(); let logger = self.logger.clone(); let http = self.http.clone(); let db = self.db.clone(); // Dispatch branch push webhooks. for (branch, after_oid) in &after_branch_tips { let before_oid = before_branch_tips .iter() .find(|(n, _)| n == branch) .map(|(_, o)| o.as_str()); let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true); if changed { let before_oid = before_oid.map_or("0", |v| v).to_string(); let after = after_oid.clone(); let branch_name = branch.clone(); slog::info!(logger, "detected push on branch"; "branch" => &branch_name, "before" => &before_oid, "after" => &after); let http = http.clone(); let db = db.clone(); let logs = logger.clone(); let ru = repo_uuid.clone(); let ns = namespace.clone(); let rn = repo_name.clone(); let db_branch = default_branch.clone(); tokio::spawn(async move { crate::hook::webhook_dispatch::dispatch_repo_webhooks( &db, &http, &logs, &ru, &ns, &rn, &db_branch, "", "", crate::hook::webhook_dispatch::WebhookEventKind::Push { r#ref: format!("refs/heads/{}", branch_name), before: before_oid, after, commits: vec![], }, ) .await; }); } } // Dispatch tag push webhooks. for (tag, after_oid) in &after_tag_tips { let before_oid = before_tag_tips .iter() .find(|(n, _)| n == tag) .map(|(_, o)| o.as_str()); let is_new = before_oid.is_none(); let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false); if is_new || was_updated { let before_oid = before_oid.map_or("0", |v| v).to_string(); let after = after_oid.clone(); let tag_name = tag.clone(); slog::info!(logger, "detected tag push"; "tag" => &tag_name, "before" => &before_oid, "after" => &after); let http = http.clone(); let db = db.clone(); let logs = logger.clone(); let ru = repo_uuid.clone(); let ns = namespace.clone(); let rn = repo_name.clone(); let db_branch = default_branch.clone(); tokio::spawn(async move { crate::hook::webhook_dispatch::dispatch_repo_webhooks( &db, &http, &logs, &ru, &ns, &rn, &db_branch, "", "", crate::hook::webhook_dispatch::WebhookEventKind::TagPush { r#ref: format!("refs/tags/{}", tag_name), before: before_oid, after, }, ) .await; }); } } Ok(()) } async fn run_fsck(&self, task: &HookTask) -> Result<(), crate::GitError> { let repo_id = models::Uuid::parse_str(&task.repo_id) .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_id) .one(self.db.reader()) .await .map_err(crate::GitError::from)? .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; self.log_stream .info(&task.id, &task.repo_id, "running fsck") .await; let db_clone = self.db.clone(); let cache_clone = self.cache.clone(); let logger_clone = self.logger.clone(); tokio::task::spawn_blocking(move || -> Result<(), crate::GitError> { tokio::runtime::Handle::current().block_on(async move { let sync = HookMetaDataSync::new(db_clone.clone(), cache_clone, repo, logger_clone)?; let mut txn = db_clone.begin().await.map_err(crate::GitError::from)?; sync.run_fsck_and_rollback_if_corrupt(&mut txn).await }) }) .await .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??; Ok(()) } async fn run_gc(&self, task: &HookTask) -> Result<(), crate::GitError> { let repo_id = models::Uuid::parse_str(&task.repo_id) .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; let repo = models::repos::repo::Entity::find_by_id(repo_id) .one(self.db.reader()) .await .map_err(crate::GitError::from)? .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; self.log_stream .info(&task.id, &task.repo_id, "running gc") .await; let db_clone = self.db.clone(); let cache_clone = self.cache.clone(); let logger_clone = self.logger.clone(); tokio::task::spawn_blocking(move || -> Result<(), crate::GitError> { tokio::runtime::Handle::current().block_on(async move { let sync = HookMetaDataSync::new(db_clone, cache_clone, repo, logger_clone)?; sync.run_gc().await }) }) .await .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??; Ok(()) } pub fn metrics(&self) -> PoolMetrics { let running = self.running_count.load(Ordering::Relaxed) as usize; PoolMetrics { running, max_concurrent: self.config.max_concurrent, cpu_usage: 0.0, total_processed: self.total_processed.load(Ordering::Relaxed), total_failed: self.total_failed.load(Ordering::Relaxed), can_accept: running < self.config.max_concurrent, } } pub fn can_accept_task_sync(&self) -> bool { let running = self.running_count.load(Ordering::Relaxed) as usize; running < self.config.max_concurrent } pub fn log_stream(&self) -> &LogStream { &self.log_stream } }