diff --git a/libs/git/hook/mod.rs b/libs/git/hook/mod.rs index 07a21a3..3f3cd43 100644 --- a/libs/git/hook/mod.rs +++ b/libs/git/hook/mod.rs @@ -3,7 +3,6 @@ use db::cache::AppCache; use db::database::AppDatabase; use deadpool_redis::cluster::Pool as RedisPool; use slog::Logger; -use std::sync::Arc; use tokio_util::sync::CancellationToken; pub mod pool; @@ -23,7 +22,6 @@ pub struct HookService { pub(crate) redis_pool: RedisPool, pub(crate) logger: Logger, pub(crate) config: AppConfig, - pub(crate) http: Arc, } impl HookService { @@ -33,7 +31,6 @@ impl HookService { redis_pool: RedisPool, logger: Logger, config: AppConfig, - http: Arc, ) -> Self { Self { db, @@ -41,7 +38,6 @@ impl HookService { redis_pool, logger, config, - http, } } diff --git a/libs/git/hook/pool/mod.rs b/libs/git/hook/pool/mod.rs index 9260473..1fec097 100644 --- a/libs/git/hook/pool/mod.rs +++ b/libs/git/hook/pool/mod.rs @@ -10,6 +10,7 @@ use db::cache::AppCache; use db::database::AppDatabase; use deadpool_redis::cluster::Pool as RedisPool; use slog::Logger; +use std::sync::Arc; use tokio_util::sync::CancellationToken; /// Start the hook worker background task. @@ -28,7 +29,8 @@ pub fn start_worker( logger.clone(), ); - let worker = HookWorker::new(db, cache, logger, consumer); + let http_client = Arc::new(reqwest::Client::new()); + let worker = HookWorker::new(db, cache, logger, consumer, http_client); let cancel = CancellationToken::new(); let cancel_clone = cancel.clone(); diff --git a/libs/git/hook/pool/redis.rs b/libs/git/hook/pool/redis.rs index 9815606..87c3859 100644 --- a/libs/git/hook/pool/redis.rs +++ b/libs/git/hook/pool/redis.rs @@ -121,6 +121,7 @@ impl RedisConsumer { /// Negative acknowledge with a different (updated) task JSON — used to /// requeue with an incremented retry_count. + /// Uses a Lua script for atomic LREM + LPUSH to prevent task loss on crash. 
pub async fn nak_with_retry( &self, work_key: &str, @@ -128,8 +129,6 @@ impl RedisConsumer { old_task_json: &str, new_task_json: &str, ) -> Result<(), GitError> { - self.ack_raw(work_key, old_task_json).await?; - let redis = self .pool .get() @@ -138,12 +137,22 @@ impl RedisConsumer { let mut conn: RedisConn = redis; - let _: i64 = redis::cmd("LPUSH") - .arg(queue_key) + // Atomic: remove from work queue AND push to retry queue in one script. + // The script executes as one atomic unit on the Redis server, so a worker crash between the LREM and LPUSH round trips can no longer lose the task. + let script = r#" + redis.call("LREM", KEYS[1], 1, ARGV[1]) + redis.call("LPUSH", KEYS[2], ARGV[2]) + return 1 + "#; + + let _: i32 = redis::Script::new(script) + .key(work_key) + .key(queue_key) + .arg(old_task_json) .arg(new_task_json) - .query_async(&mut conn) + .invoke_async(&mut conn) .await - .map_err(|e| GitError::Internal(format!("LPUSH retry failed: {}", e)))?; + .map_err(|e| GitError::Internal(format!("nak script failed: {}", e)))?; slog::warn!(self.logger, "task nack'd and requeued queue={}", queue_key); diff --git a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs index ed01169..6b96171 100644 --- a/libs/git/hook/pool/worker.rs +++ b/libs/git/hook/pool/worker.rs @@ -6,18 +6,20 @@ use db::cache::AppCache; use db::database::AppDatabase; use models::EntityTrait; use slog::Logger; +use std::sync::Arc; use std::time::Duration; use tokio_util::sync::CancellationToken; /// Single-threaded worker that sequentially consumes tasks from Redis queues. /// K8s can scale replicas for concurrency — each replica runs one worker. -/// Per-repo Redis locking prevents concurrent workers from processing the same repo. +/// Per-repo Redis locking is managed inside HookMetaDataSync methods. 
#[derive(Clone)] pub struct HookWorker { db: AppDatabase, cache: AppCache, logger: Logger, consumer: RedisConsumer, + http_client: Arc, } impl HookWorker { @@ -26,12 +28,14 @@ impl HookWorker { cache: AppCache, logger: Logger, consumer: RedisConsumer, + http_client: Arc, ) -> Self { Self { db, cache, logger, consumer, + http_client, } } @@ -52,10 +56,7 @@ impl HookWorker { } for task_type in &task_types { - let result = self - .consumer - .next(&task_type.to_string()) - .await; + let result = self.consumer.next(&task_type.to_string()).await; let (task, task_json) = match result { Ok(Some(pair)) => pair, @@ -102,11 +103,10 @@ impl HookWorker { slog::info!(self.logger, "task completed task_id={}", task.id); } Err(e) => { - // GitError::Locked means another worker is processing this repo — - // requeue without incrementing retry count so it can be picked up later. let is_locked = matches!(e, crate::GitError::Locked(_)); if is_locked { + // Another worker holds the lock — requeue without counting as retry. slog::info!(self.logger, "repo locked by another worker, requeueing task_id={}", task.id); if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await { slog::warn!(self.logger, "failed to requeue locked task: {}", nak_err); @@ -152,7 +152,7 @@ impl HookWorker { ))); } - // Capture before tips + // Capture before tips for webhook diff let before_tips = tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); @@ -168,28 +168,26 @@ impl HookWorker { .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? 
.map_err(GitError::from)?; - // Run sync - tokio::task::spawn_blocking({ - let db = self.db.clone(); - let cache = self.cache.clone(); - let logger = self.logger.clone(); - let repo = repo.clone(); - move || { - let result = tokio::runtime::Handle::current().block_on(async { - let sync = HookMetaDataSync::new(db, cache, repo, logger)?; - sync.sync().await - }); - match result { - Ok(()) => Ok::<(), GitError>(()), - Err(e) => Err(GitError::Internal(e.to_string())), - } + // Run full sync (internally acquires/releases per-repo lock) + let db = self.db.clone(); + let cache = self.cache.clone(); + let logger = self.logger.clone(); + let repo_clone = repo.clone(); + tokio::task::spawn_blocking(move || { + let result = tokio::runtime::Handle::current().block_on(async { + let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone(), logger.clone())?; + sync.sync().await + }); + match result { + Ok(()) => Ok::<(), GitError>(()), + Err(e) => Err(GitError::Internal(e.to_string())), } }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? 
.map_err(GitError::from)?; - // Capture after tips + // Capture after tips and dispatch webhooks let after_tips = tokio::task::spawn_blocking({ let db = self.db.clone(); let cache = self.cache.clone(); @@ -209,7 +207,24 @@ impl HookWorker { let (after_branch_tips, after_tag_tips) = after_tips; let project = repo.project; - // Dispatch branch push webhooks + // Resolve namespace once outside the loop + let namespace = models::projects::Project::find_by_id(project) + .one(self.db.reader()) + .await + .ok() + .flatten() + .map(|p| p.name) + .unwrap_or_default(); + + let repo_id_str = repo.id.to_string(); + let repo_name = repo.repo_name.clone(); + let default_branch = repo.default_branch.clone(); + let http_client = self.http_client.clone(); + let db = self.db.clone(); + let logger = self.logger.clone(); + + // Dispatch branch webhooks and collect handles + let mut handles = Vec::new(); for (branch, after_oid) in after_branch_tips { let before_oid = before_branch_tips .iter() @@ -219,43 +234,40 @@ impl HookWorker { if changed { let before_oid = before_oid.map_or("0", |v| v).to_string(); let branch_name = branch.clone(); - let db = self.db.clone(); - let logger = self.logger.clone(); - let repo_id_str = repo.id.to_string(); - let repo_name = repo.repo_name.clone(); - let default_branch = repo.default_branch.clone(); - let ns = models::projects::Project::find_by_id(project) - .one(self.db.reader()) - .await - .ok() - .flatten() - .map(|p| p.name) - .unwrap_or_default(); - - tokio::spawn(async move { - crate::hook::webhook_dispatch::dispatch_repo_webhooks( - &db, - &reqwest::Client::new(), - &logger, - &repo_id_str, - &ns, - &repo_name, - &default_branch, - "", - "", - crate::hook::webhook_dispatch::WebhookEventKind::Push { - r#ref: format!("refs/heads/{}", branch_name), - before: before_oid, - after: after_oid, - commits: vec![], - }, - ) - .await; + let h = tokio::spawn({ + let http_client = http_client.clone(); + let db = db.clone(); + let logger = logger.clone(); + 
let repo_id_str = repo_id_str.clone(); + let namespace = namespace.clone(); + let repo_name = repo_name.clone(); + let default_branch = default_branch.clone(); + async move { + crate::hook::webhook_dispatch::dispatch_repo_webhooks( + &db, + &http_client, + &logger, + &repo_id_str, + &namespace, + &repo_name, + &default_branch, + "", + "", + crate::hook::webhook_dispatch::WebhookEventKind::Push { + r#ref: format!("refs/heads/{}", branch_name), + before: before_oid, + after: after_oid, + commits: vec![], + }, + ) + .await; + } }); + handles.push(h); } } - // Dispatch tag push webhooks + // Dispatch tag webhooks and collect handles for (tag, after_oid) in after_tag_tips { let before_oid = before_tag_tips .iter() @@ -266,41 +278,43 @@ impl HookWorker { if is_new || was_updated { let before_oid = before_oid.map_or("0", |v| v).to_string(); let tag_name = tag.clone(); - let db = self.db.clone(); - let logger = self.logger.clone(); - let repo_id_str = repo.id.to_string(); - let repo_name = repo.repo_name.clone(); - let default_branch = repo.default_branch.clone(); - let ns = models::projects::Project::find_by_id(project) - .one(self.db.reader()) - .await - .ok() - .flatten() - .map(|p| p.name) - .unwrap_or_default(); - - tokio::spawn(async move { - crate::hook::webhook_dispatch::dispatch_repo_webhooks( - &db, - &reqwest::Client::new(), - &logger, - &repo_id_str, - &ns, - &repo_name, - &default_branch, - "", - "", - crate::hook::webhook_dispatch::WebhookEventKind::TagPush { - r#ref: format!("refs/tags/{}", tag_name), - before: before_oid, - after: after_oid, - }, - ) - .await; + let h = tokio::spawn({ + let http_client = http_client.clone(); + let db = db.clone(); + let logger = logger.clone(); + let repo_id_str = repo_id_str.clone(); + let namespace = namespace.clone(); + let repo_name = repo_name.clone(); + let default_branch = default_branch.clone(); + async move { + crate::hook::webhook_dispatch::dispatch_repo_webhooks( + &db, + &http_client, + &logger, + &repo_id_str, 
+ &namespace, + &repo_name, + &default_branch, + "", + "", + crate::hook::webhook_dispatch::WebhookEventKind::TagPush { + r#ref: format!("refs/tags/{}", tag_name), + before: before_oid, + after: after_oid, + }, + ) + .await; + } }); + handles.push(h); } } + // Wait for all webhooks to complete before returning + for h in handles { + let _ = h.await; + } + Ok(()) } @@ -325,17 +339,14 @@ impl HookWorker { let cache = self.cache.clone(); let logger = self.logger.clone(); - tokio::task::spawn_blocking({ - let repo = repo.clone(); - move || { - let result = tokio::runtime::Handle::current().block_on(async { - let sync = HookMetaDataSync::new(db, cache, repo, logger)?; - sync.fsck_only().await - }); - match result { - Ok(()) => Ok::<(), GitError>(()), - Err(e) => Err(GitError::Internal(e.to_string())), - } + tokio::task::spawn_blocking(move || { + let result = tokio::runtime::Handle::current().block_on(async { + let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo.clone(), logger.clone())?; + sync.fsck_only().await + }); + match result { + Ok(()) => Ok::<(), GitError>(()), + Err(e) => Err(GitError::Internal(e.to_string())), } }) .await @@ -366,17 +377,14 @@ impl HookWorker { let cache = self.cache.clone(); let logger = self.logger.clone(); - tokio::task::spawn_blocking({ - let repo = repo.clone(); - move || { - let result = tokio::runtime::Handle::current().block_on(async { - let sync = HookMetaDataSync::new(db, cache, repo, logger)?; - sync.gc_only().await - }); - match result { - Ok(()) => Ok::<(), GitError>(()), - Err(e) => Err(GitError::Internal(e.to_string())), - } + tokio::task::spawn_blocking(move || { + let result = tokio::runtime::Handle::current().block_on(async { + let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo.clone(), logger.clone())?; + sync.gc_only().await + }); + match result { + Ok(()) => Ok::<(), GitError>(()), + Err(e) => Err(GitError::Internal(e.to_string())), } }) .await diff --git a/libs/git/hook/sync/lock.rs 
b/libs/git/hook/sync/lock.rs index 660742c..bf7bafc 100644 --- a/libs/git/hook/sync/lock.rs +++ b/libs/git/hook/sync/lock.rs @@ -2,7 +2,7 @@ use crate::GitError; use crate::hook::sync::HookMetaDataSync; impl HookMetaDataSync { - const LOCK_TTL_SECS: u64 = 60; + const LOCK_TTL_SECS: u64 = 300; /// Try to acquire an exclusive lock for this repo. /// Returns the lock value if acquired, which must be passed to `release_lock`. diff --git a/libs/git/hook/sync/mod.rs b/libs/git/hook/sync/mod.rs index 5d4629a..dcefe9a 100644 --- a/libs/git/hook/sync/mod.rs +++ b/libs/git/hook/sync/mod.rs @@ -26,7 +26,6 @@ use sha1::Digest; /// Recursively scan `base` for files named `SKILL.md`. /// The skill slug is `{short_repo_id}/{parent_dir_name}` to ensure uniqueness across repos. -/// Populates `commit_sha` (current HEAD) and `blob_hash` for each discovered file. fn scan_skills_from_dir( base: &Path, repo_id: &RepoId, @@ -66,8 +65,6 @@ fn scan_skills_from_dir( Ok(discovered) } -/// Compute the git blob SHA-1 hash of `content`. -/// Format: "blob {len}\0{data}" fn git_blob_hash(content: &[u8]) -> String { let size = content.len(); let header = format!("blob {}\0", size); @@ -77,7 +74,6 @@ fn git_blob_hash(content: &[u8]) -> String { hex::encode(hasher.finalize()) } -/// Parse a SKILL.md file (raw bytes) to extract name, description, content, and frontmatter metadata. fn parse_skill_content(slug: &str, raw: &[u8]) -> DiscoveredSkill { let content = String::from_utf8_lossy(raw); let (frontmatter, body) = extract_frontmatter(&content); @@ -107,7 +103,6 @@ fn parse_skill_content(slug: &str, raw: &[u8]) -> DiscoveredSkill { } } -/// A skill discovered in a repository. struct DiscoveredSkill { slug: String, name: String, @@ -158,62 +153,42 @@ impl HookMetaDataSync { }) } - /// Full sync: refs → commits → tags → LFS → fsck → gc → skills. - /// Acquires a per-repo Redis distributed lock to prevent concurrent workers - /// from processing the same repository simultaneously. 
+ /// Full sync. Acquires and releases the per-repo Redis lock internally. pub async fn sync(&self) -> Result<(), crate::GitError> { let lock_value = self.acquire_lock().await?; - let res = self.sync_full().await; + let res = self.sync_work().await; if let Err(ref e) = res { slog::error!(self.logger, "sync failed: {}", e); } - if let Err(release_err) = self.release_lock(&lock_value).await { - slog::error!(self.logger, "failed to release lock: {}", release_err); - } - + let _ = self.release_lock(&lock_value).await; res } - /// Run fsck only (refs snapshot + git fsck + rollback on corruption). - /// Acquires a per-repo Redis lock. + /// Fsck only. Acquires and releases the per-repo Redis lock internally. pub async fn fsck_only(&self) -> Result<(), crate::GitError> { let lock_value = self.acquire_lock().await?; - let res = async { - let mut txn = self - .db - .begin() - .await - .map_err(|e| crate::GitError::IoError(format!("failed to begin transaction: {}", e)))?; - self.run_fsck_and_rollback_if_corrupt(&mut txn) - .await?; - txn.commit().await.map_err(|e| { - crate::GitError::IoError(format!("failed to commit transaction: {}", e)) - })?; - Ok::<(), crate::GitError>(()) - } - .await; + let res = self.fsck_work().await; let _ = self.release_lock(&lock_value).await; res } - /// Run gc only. Acquires a per-repo Redis lock. + /// GC only. Acquires and releases the per-repo Redis lock internally. pub async fn gc_only(&self) -> Result<(), crate::GitError> { let lock_value = self.acquire_lock().await?; - let res = self.run_gc().await; + let res = self.gc_work().await; let _ = self.release_lock(&lock_value).await; res } - /// Full sync pipeline inside a single DB transaction. - /// On fsck failure, refs are rolled back and an error is returned. - async fn sync_full(&self) -> Result<(), crate::GitError> { + /// Full sync pipeline (no locking — caller is responsible). 
+ async fn sync_work(&self) -> Result<(), crate::GitError> { let mut txn = self .db .begin() @@ -237,6 +212,29 @@ impl HookMetaDataSync { Ok(()) } + /// Fsck only work (no locking — caller is responsible). + async fn fsck_work(&self) -> Result<(), crate::GitError> { + let mut txn = self + .db + .begin() + .await + .map_err(|e| crate::GitError::IoError(format!("failed to begin transaction: {}", e)))?; + + self.run_fsck_and_rollback_if_corrupt(&mut txn) + .await?; + + txn.commit().await.map_err(|e| { + crate::GitError::IoError(format!("failed to commit transaction: {}", e)) + })?; + + Ok(()) + } + + /// GC only work (no locking — caller is responsible). + async fn gc_work(&self) -> Result<(), crate::GitError> { + self.run_gc().await + } + /// Returns a list of (branch_name, oid) for all local branches. pub fn list_branch_tips(&self) -> Vec<(String, String)> { let repo = self.domain.repo(); diff --git a/libs/git/http/mod.rs b/libs/git/http/mod.rs index 071998a..86b8e83 100644 --- a/libs/git/http/mod.rs +++ b/libs/git/http/mod.rs @@ -72,14 +72,12 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> { let app_cache = app_cache?; let redis_pool = app_cache.redis_pool().clone(); - let http_client = Arc::new(reqwest::Client::new()); let hook = HookService::new( db.clone(), app_cache.clone(), redis_pool.clone(), logger.clone(), config.clone(), - http_client, ); let _worker_cancel = hook.start_worker(); slog::info!(logger, "hook worker started"); diff --git a/libs/git/ssh/mod.rs b/libs/git/ssh/mod.rs index a81d49f..57a651e 100644 --- a/libs/git/ssh/mod.rs +++ b/libs/git/ssh/mod.rs @@ -1,5 +1,4 @@ use crate::error::GitError; -use crate::hook::HookService; use crate::hook::pool::types::{HookTask, TaskType}; use anyhow::Context; use base64::Engine; @@ -296,7 +295,6 @@ pub async fn run_ssh(config: AppConfig, logger: Logger) -> anyhow::Result<()> { redis_pool.clone(), logger.clone(), config.clone(), - Arc::new(reqwest::Client::new()), ); let 
_worker_cancel = hook.start_worker(); slog::info!(logger, "hook worker started");