use std::time::Duration; use cache::AppCache; use config::AppConfig; use db::{database::AppDatabase, sqlx}; use deadpool_redis::cluster::Pool as RedisPool; use model::repos::RepoModel; use parsefile::TriggerEvent; use track::CounterVec; use crate::sync::{ HookTask, TaskType, cicheck::{CiCheckOutcome, check_and_enqueue}, consumer::SyncConsumer, lock::{acquire_repo_lock, release_repo_lock}, webhook::{WebhookDeliveryTask, deliver_webhook}, }; pub struct SyncWorker { pub consumer: SyncConsumer, pub db: AppDatabase, pub cache: AppCache, pub redis_pool: RedisPool, pub config: AppConfig, pub max_retries: usize, pub worker_id: String, pub metrics: Option, } impl SyncWorker { pub fn new( consumer: SyncConsumer, db: AppDatabase, cache: AppCache, redis_pool: RedisPool, config: AppConfig, worker_id: String, ) -> Self { Self { consumer, db, cache, redis_pool, config, max_retries: 3, worker_id, metrics: None, } } pub fn set_metrics(&mut self, registry: track::MetricsRegistry) { self.metrics = Some(registry); } #[tracing::instrument(skip(self))] pub async fn run(&self) { tracing::info!(worker_id = %self.worker_id, "sync worker starting"); let mut backoff_secs: u64 = 1; loop { let mut had_error = false; for task_type in &[ TaskType::Sync, TaskType::Fsck, TaskType::Gc, TaskType::Webhook, ] { let queue_key = self.consumer.queue_key_for_task_type(task_type); if let Some((task_json, work_key)) = self.consumer.next(task_type).await { let task: HookTask = match serde_json::from_str(&task_json) { Ok(t) => t, Err(e) => { tracing::error!(error = %e, "failed to deserialize hook task"); self.consumer.ack(&task_json, &work_key).await; continue; } }; tracing::info!( task_id = %task.id, repo_id = %task.repo_id, task_type = ?task.task_type, "processing hook task" ); let result = self .process_task(&task, &task_json, &work_key, &queue_key) .await; match result { ProcessResult::Success => { self.record_sync_metric(&task, "success"); self.consumer.ack(&task_json, &work_key).await; backoff_secs = 1; } ProcessResult::Locked => { self.record_sync_metric(&task, "locked"); self.consumer .nak_with_retry( &task_json, &work_key, &queue_key, ) .await; backoff_secs = 1; } ProcessResult::Error => { if task.retry_count >= self.max_retries { tracing::warn!( task_id = %task.id, repo_id = %task.repo_id, retry_count = task.retry_count, "max retries exceeded, dropping task" ); self.record_sync_metric(&task, "dropped"); self.consumer.ack(&task_json, &work_key).await; } else { tracing::warn!( task_id = %task.id, repo_id = %task.repo_id, retry_count = task.retry_count, "task failed, re-queueing" ); let mut updated_task = task.clone(); updated_task.retry_count += 1; if let Ok(updated_json) = serde_json::to_string(&updated_task) { self.consumer .nak_with_retry( &updated_json, &work_key, &queue_key, ) .await; } else { self.consumer .nak_with_retry( &task_json, &work_key, &queue_key, ) .await; } } had_error = true; } } } } if had_error { tokio::time::sleep(Duration::from_secs(backoff_secs)).await; backoff_secs = (backoff_secs * 2).min(32); } } } #[tracing::instrument(skip(self, _task_json, _work_key, _queue_key), fields(task_id = %task.id, task_type = ?task.task_type))] async fn process_task( &self, task: &HookTask, _task_json: &str, _work_key: &str, _queue_key: &str, ) -> ProcessResult { match task.task_type { TaskType::Sync => self.run_sync(task).await, TaskType::Fsck => self.run_fsck(task).await, TaskType::Gc => self.run_gc(task).await, TaskType::Webhook => self.run_webhook(task).await, } } #[tracing::instrument(skip(self), fields(repo_id = %task.repo_id))] async fn run_sync(&self, task: &HookTask) -> ProcessResult { let repo_id = match task.repo_id.parse::() { Ok(id) => id, Err(e) => { tracing::error!(error = %e, repo_id = %task.repo_id, "invalid repo_id UUID"); return ProcessResult::Error; } }; let lock_value = format!("{}:{}", self.worker_id, task.id); let lock_result = acquire_repo_lock( &self.redis_pool, repo_id, &lock_value, 300, // 5 min TTL ) .await; match lock_result { Ok(true) => {} Ok(false) => return ProcessResult::Locked, Err(e) => { tracing::error!(error = %e, repo_id = %repo_id, "failed to acquire repo lock"); return ProcessResult::Error; } } let result = self.do_sync(repo_id).await; release_repo_lock(&self.redis_pool, repo_id, &lock_value).await; match result { Ok(()) => ProcessResult::Success, Err(e) => { tracing::error!(error = %e, repo_id = %repo_id, "sync pipeline failed"); ProcessResult::Error } } } #[tracing::instrument(skip(self), fields(repo_id = %repo_id))] async fn do_sync(&self, repo_id: uuid::Uuid) -> anyhow::Result<()> { let pool = self.db.reader(); let repo_model = sqlx::query_as::<_, RepoModel>( "SELECT id, wk, name, description, default_branch, visibility, size_bytes, is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at FROM repo WHERE id = $1" ) .bind(repo_id) .fetch_optional(pool) .await? .ok_or_else(|| anyhow::anyhow!("repo not found: {}", repo_id))?; let bare_dir = std::path::PathBuf::from(&repo_model.storage_path); if !bare_dir.exists() { tracing::warn!(repo_id = %repo_id, "bare repo directory missing, skipping sync"); return Ok(()); } let bare = crate::bare::GitBare { bare_dir }; if let Err(e) = crate::sync::branch::sync_refs(&self.db, &bare, repo_id).await { tracing::error!(error = %e, repo_id = %repo_id, "sync_refs failed"); } if let Err(e) = crate::sync::commit::sync_commits(&self.db, &bare, repo_id).await { tracing::error!(error = %e, repo_id = %repo_id, "sync_commits failed"); } if let Err(e) = crate::sync::tag::sync_tags(&self.db, &bare, repo_id).await { tracing::error!(error = %e, repo_id = %repo_id, "sync_tags failed"); } if let Err(e) = crate::sync::lfs::sync_lfs_objects(&self.db, &bare, repo_id).await { tracing::error!(error = %e, repo_id = %repo_id, "sync_lfs_objects failed"); } if let Err(e) = crate::sync::language::sync_languages(&self.db, &bare, repo_id) .await { tracing::error!(error = %e, repo_id = %repo_id, "sync_languages failed"); } let gc_result = bare.git_command_trusted_unchecked(vec![ "gc".to_string(), "--auto".to_string(), "--quiet".to_string(), ]); if let Ok(output) = gc_result { if !output.success { tracing::warn!(repo_id = %repo_id, "git gc failed: {}", output.stderr_lossy()); } } let pattern = format!("git:rpc:cache:*:{}:*", repo_id); let _ = self.cache.delete_pattern(&pattern).await; tracing::info!(repo_id = %repo_id, "sync completed"); if let Err(e) = self .run_ci_check(&repo_model.default_branch, &bare, repo_id) .await { tracing::warn!(error = %e, repo_id = %repo_id, "CI check failed"); } Ok(()) } #[tracing::instrument(skip(self), fields(repo_id = %task.repo_id))] async fn run_fsck(&self, task: &HookTask) -> ProcessResult { let repo_id = match task.repo_id.parse::() { Ok(id) => id, Err(_) => return ProcessResult::Error, }; let pool = self.db.reader(); let storage_path = sqlx::query_scalar::<_, String>( "SELECT storage_path FROM repo WHERE id = $1", ) .bind(repo_id) .fetch_optional(pool) .await; let storage_path = match storage_path { Ok(Some(s)) => s, _ => return ProcessResult::Error, }; let bare = crate::bare::GitBare { bare_dir: std::path::PathBuf::from(&storage_path), }; let result = bare.git_command_trusted_unchecked(vec![ "fsck".to_string(), "--full".to_string(), ]); match result { Ok(output) if output.success => ProcessResult::Success, Ok(output) => { tracing::warn!(repo_id = %repo_id, "fsck failed: {}", output.stderr_lossy()); ProcessResult::Error } Err(e) => { tracing::error!(error = %e, repo_id = %repo_id, "fsck command failed"); ProcessResult::Error } } } #[tracing::instrument(skip(self), fields(repo_id = %task.repo_id))] async fn run_gc(&self, task: &HookTask) -> ProcessResult { let repo_id = match task.repo_id.parse::() { Ok(id) => id, Err(_) => return ProcessResult::Error, }; let pool = self.db.reader(); let storage_path = sqlx::query_scalar::<_, String>( "SELECT storage_path FROM repo WHERE id = $1", ) .bind(repo_id) .fetch_optional(pool) .await; let storage_path = match storage_path { Ok(Some(s)) => s, _ => return ProcessResult::Error, }; let bare = crate::bare::GitBare { bare_dir: std::path::PathBuf::from(&storage_path), }; let result = bare .git_command_trusted(vec!["gc".to_string(), "--auto".to_string()]); match result { Ok(_) => ProcessResult::Success, Err(e) => { tracing::error!(error = %e, repo_id = %repo_id, "gc command failed"); ProcessResult::Error } } } async fn run_ci_check( &self, default_branch: &str, bare: &crate::bare::GitBare, repo_id: uuid::Uuid, ) -> anyhow::Result<()> { let event = TriggerEvent::PushBranch(default_branch.to_owned()); let outcome = check_and_enqueue(bare, repo_id, &event, &self.redis_pool).await?; match outcome { CiCheckOutcome::Enqueued => { tracing::info!( repo_id = %repo_id, branch = %default_branch, "CI pipeline triggered" ); } CiCheckOutcome::NoPipelineFile => { tracing::debug!(repo_id = %repo_id, "no pipeline.yaml found"); } CiCheckOutcome::NotTriggered => { tracing::debug!( repo_id = %repo_id, branch = %default_branch, "pipeline.yaml exists but not triggered for this event" ); } } Ok(()) } #[tracing::instrument(skip(self), fields(repo_id = %task.repo_id))] async fn run_webhook(&self, task: &HookTask) -> ProcessResult { let repo_id = match task.repo_id.parse::() { Ok(id) => id, Err(_) => return ProcessResult::Error, }; let event = task .payload .get("webhook_event") .and_then(|v| v.as_str()) .unwrap_or("push"); let webhooks: Vec<(uuid::Uuid, String, Option, String)> = sqlx::query_as( "SELECT id, url, secret_hash, events \ FROM repo_webhook WHERE repo = $1 AND active = true", ) .bind(repo_id) .fetch_all(self.db.reader()) .await .unwrap_or_default(); for (wh_id, wh_url, wh_secret, wh_events) in webhooks { let subscribed: Vec<&str> = wh_events.split('.').filter(|s| !s.is_empty()).collect(); let matches = subscribed.iter().any(|e| { *e == event || (*e == "push" && (event == "push_branch" || event == "push_tag")) }); if !matches { continue; } let delivery_id = uuid::Uuid::now_v7(); let now = chrono::Utc::now(); sqlx::query( "INSERT INTO repo_webhook_delivery \ (id, repo, webhook, event, request_headers, request_body, \ response_status, response_headers, response_body, error, delivered_at, created_at) \ VALUES ($1, $2, $3, $4, NULL, NULL, NULL, NULL, NULL, NULL, NULL, $5)", ) .bind(delivery_id) .bind(repo_id) .bind(wh_id) .bind(event) .bind(now) .execute(self.db.writer()) .await .ok(); let wh_task = WebhookDeliveryTask { id: delivery_id.to_string(), webhook_id: wh_id.to_string(), repo_id: repo_id.to_string(), event: event.to_string(), url: wh_url, secret: wh_secret, payload: task.payload.clone(), created_at: now, retry_count: 0, }; let result = deliver_webhook(&wh_task).await; sqlx::query( "UPDATE repo_webhook_delivery SET \ request_headers = $1, request_body = $2, \ response_status = $3, response_headers = $4, response_body = $5, \ error = $6, delivered_at = $7 WHERE id = $8", ) .bind(&result.request_headers) .bind(&result.request_body) .bind(result.response_status) .bind(&result.response_headers) .bind(&result.response_body) .bind(&result.error) .bind(now) .bind(delivery_id) .execute(self.db.writer()) .await .ok(); if result.error.is_some() { tracing::warn!( webhook_id = %wh_id, repo_id = %repo_id, error = ?result.error, "webhook delivery failed" ); } else { tracing::info!( webhook_id = %wh_id, repo_id = %repo_id, status = ?result.response_status, "webhook delivered" ); } } ProcessResult::Success } } impl SyncWorker { fn record_sync_metric(&self, task: &HookTask, outcome: &str) { if let Some(reg) = &self.metrics { let task_label = task_type_label(&task.task_type); sync_metrics_vec(reg) .with_label_values(&[task_label, outcome]) .inc(); } } } fn task_type_label(tt: &TaskType) -> &str { match tt { TaskType::Sync => "sync", TaskType::Fsck => "fsck", TaskType::Gc => "gc", TaskType::Webhook => "webhook", } } fn sync_metrics_vec(registry: &track::MetricsRegistry) -> CounterVec { registry .register_counter_vec( "gitsync_tasks_total", "Total sync tasks processed", &["task_type", "outcome"], ) .expect("failed to register gitsync_tasks_total") } enum ProcessResult { Success, Locked, Error, }