From 8fb2436f22762d11abf293505bf290f969fd33aa Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Fri, 17 Apr 2026 12:33:58 +0800 Subject: [PATCH] feat(git): add Redis-backed hook worker with per-repo distributed locking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pool/worker.rs: single-threaded consumer that dequeues via BLMOVE from Redis queues sequentially. K8s replicas provide HA — each pod runs one worker. - pool/redis.rs: RedisConsumer with BLMOVE atomic dequeue, ACK/NAK, and retry-with-json support. - pool/types.rs: HookTask, TaskType, PoolConfig (minimal — no pool metrics). - sync/lock.rs: Redis SET NX EX per-repo lock to prevent concurrent workers from processing the same repo. Lock conflicts are handled by requeueing without incrementing retry count. - hook/mod.rs: HookService.start_worker() spawns the background worker. - ssh/mod.rs / http/mod.rs: ReceiveSyncService LPUSHes to the Redis queue (paired with BLMOVE RIGHT→LEFT for FIFO order). Both run_http and run_ssh call start_worker() to launch the consumer. - Lock conflicts (GitError::Locked) in the worker are requeued without incrementing retry_count so another worker can pick them up. 
--- libs/git/hook/mod.rs | 106 ++-------- libs/git/hook/pool/mod.rs | 41 ++++ libs/git/hook/pool/redis.rs | 167 +++++++++++++++ libs/git/hook/pool/types.rs | 32 +++ libs/git/hook/pool/worker.rs | 388 +++++++++++++++++++++++++++++++++++ libs/git/hook/sync/lock.rs | 66 ++++++ libs/git/hook/sync/mod.rs | 90 ++++---- libs/git/http/mod.rs | 9 +- libs/git/lib.rs | 3 + libs/git/ssh/mod.rs | 88 +++++--- libs/git/ssh/server.rs | 6 +- 11 files changed, 820 insertions(+), 176 deletions(-) create mode 100644 libs/git/hook/pool/mod.rs create mode 100644 libs/git/hook/pool/redis.rs create mode 100644 libs/git/hook/pool/types.rs create mode 100644 libs/git/hook/pool/worker.rs create mode 100644 libs/git/hook/sync/lock.rs diff --git a/libs/git/hook/mod.rs b/libs/git/hook/mod.rs index 568de2b..07a21a3 100644 --- a/libs/git/hook/mod.rs +++ b/libs/git/hook/mod.rs @@ -2,13 +2,20 @@ use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; use deadpool_redis::cluster::Pool as RedisPool; -use models::EntityTrait; use slog::Logger; use std::sync::Arc; +use tokio_util::sync::CancellationToken; -/// Simplified hook service — no queue, no pool. -/// K8s StatefulSet HA scheduling ensures only one pod touches a repo at a time. -/// Execution is direct and sequential per invocation. +pub mod pool; +pub mod sync; +pub mod webhook_dispatch; + +pub use pool::{HookWorker, PoolConfig, RedisConsumer}; +pub use pool::types::{HookTask, TaskType}; + +/// Hook service that manages the Redis-backed task queue worker. +/// Multiple gitserver pods can run concurrently — the worker acquires a +/// per-repo Redis lock before processing each task. #[derive(Clone)] pub struct HookService { pub(crate) db: AppDatabase, @@ -38,92 +45,15 @@ impl HookService { } } - /// Full sync: refs → commits → tags → LFS → fsck → gc → skills. 
- pub async fn sync_repo(&self, repo_id: &str) -> Result<(), crate::GitError> { - let repo_uuid = models::Uuid::parse_str(repo_id) - .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; - - let repo = models::repos::repo::Entity::find_by_id(repo_uuid) - .one(self.db.reader()) - .await - .map_err(crate::GitError::from)? - .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; - - if !std::path::Path::new(&repo.storage_path).exists() { - return Err(crate::GitError::NotFound(format!( - "storage path does not exist: {}", - repo.storage_path - ))); - } - - let sync = crate::hook::sync::HookMetaDataSync::new( + /// Start the background worker and return a cancellation token. + pub fn start_worker(&self) -> CancellationToken { + let pool_config = PoolConfig::from_env(&self.config); + pool::start_worker( self.db.clone(), self.cache.clone(), - repo, + self.redis_pool.clone(), self.logger.clone(), - )?; - - // No distributed lock needed — K8s StatefulSet scheduling guarantees - // that at most one pod processes a given repo shard at any time. - sync.sync().await - } - - /// Run fsck only (no full sync). - pub async fn fsck_repo(&self, repo_id: &str) -> Result<(), crate::GitError> { - let repo_uuid = models::Uuid::parse_str(repo_id) - .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; - - let repo = models::repos::repo::Entity::find_by_id(repo_uuid) - .one(self.db.reader()) - .await - .map_err(crate::GitError::from)? - .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; - - if !std::path::Path::new(&repo.storage_path).exists() { - return Err(crate::GitError::NotFound(format!( - "storage path does not exist: {}", - repo.storage_path - ))); - } - - let sync = crate::hook::sync::HookMetaDataSync::new( - self.db.clone(), - self.cache.clone(), - repo, - self.logger.clone(), - )?; - - sync.fsck_only().await - } - - /// Run gc only (no full sync). 
- pub async fn gc_repo(&self, repo_id: &str) -> Result<(), crate::GitError> { - let repo_uuid = models::Uuid::parse_str(repo_id) - .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; - - let repo = models::repos::repo::Entity::find_by_id(repo_uuid) - .one(self.db.reader()) - .await - .map_err(crate::GitError::from)? - .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; - - if !std::path::Path::new(&repo.storage_path).exists() { - return Err(crate::GitError::NotFound(format!( - "storage path does not exist: {}", - repo.storage_path - ))); - } - - let sync = crate::hook::sync::HookMetaDataSync::new( - self.db.clone(), - self.cache.clone(), - repo, - self.logger.clone(), - )?; - - sync.gc_only().await + pool_config, + ) } } - -pub mod sync; -pub mod webhook_dispatch; diff --git a/libs/git/hook/pool/mod.rs b/libs/git/hook/pool/mod.rs new file mode 100644 index 0000000..9260473 --- /dev/null +++ b/libs/git/hook/pool/mod.rs @@ -0,0 +1,41 @@ +pub mod redis; +pub mod types; +pub mod worker; + +pub use redis::RedisConsumer; +pub use types::{HookTask, PoolConfig, TaskType}; +pub use worker::HookWorker; + +use db::cache::AppCache; +use db::database::AppDatabase; +use deadpool_redis::cluster::Pool as RedisPool; +use slog::Logger; +use tokio_util::sync::CancellationToken; + +/// Start the hook worker background task. +/// Returns a handle to the cancellation token so the caller can shut it down. 
+pub fn start_worker( + db: AppDatabase, + cache: AppCache, + redis_pool: RedisPool, + logger: Logger, + config: PoolConfig, +) -> CancellationToken { + let consumer = RedisConsumer::new( + redis_pool.clone(), + config.redis_list_prefix.clone(), + config.redis_block_timeout_secs, + logger.clone(), + ); + + let worker = HookWorker::new(db, cache, logger, consumer); + + let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); + + tokio::spawn(async move { + worker.run(cancel_clone).await; + }); + + cancel +} diff --git a/libs/git/hook/pool/redis.rs b/libs/git/hook/pool/redis.rs new file mode 100644 index 0000000..9815606 --- /dev/null +++ b/libs/git/hook/pool/redis.rs @@ -0,0 +1,167 @@ +use crate::error::GitError; +use crate::hook::pool::types::HookTask; +use deadpool_redis::cluster::Connection as RedisConn; +use slog::Logger; + +/// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern. +pub struct RedisConsumer { + pool: deadpool_redis::cluster::Pool, + /// Hash-tag-prefixed key prefix, e.g. "{hook}". + prefix: String, + block_timeout_secs: u64, + logger: Logger, +} + +impl RedisConsumer { + pub fn new( + pool: deadpool_redis::cluster::Pool, + prefix: String, + block_timeout_secs: u64, + logger: Logger, + ) -> Self { + Self { + pool, + prefix, + block_timeout_secs, + logger, + } + } + + /// Atomically moves a task from the main queue to the work queue using BLMOVE. + /// Blocks up to `block_timeout_secs` waiting for a task. + /// + /// Returns `Some((HookTask, task_json))` where `task_json` is the raw JSON string + /// needed for LREM on ACK. Returns `None` if the blocking timed out. 
+ pub async fn next(&self, task_type: &str) -> Result, GitError> { + let queue_key = format!("{}:{}", self.prefix, task_type); + let work_key = format!("{}:{}:work", self.prefix, task_type); + + let redis = self + .pool + .get() + .await + .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; + + let mut conn: RedisConn = redis; + + // BLMOVE source destination timeout + let task_json: Option = redis::cmd("BLMOVE") + .arg(&queue_key) + .arg(&work_key) + .arg("RIGHT") + .arg("LEFT") + .arg(self.block_timeout_secs) + .query_async(&mut conn) + .await + .map_err(|e| GitError::Internal(format!("BLMOVE failed: {}", e)))?; + + match task_json { + Some(json) => { + match serde_json::from_str::(&json) { + Ok(task) => { + slog::debug!(self.logger, "task dequeued"; + "task_id" => %task.id, + "task_type" => %task.task_type, + "queue" => %queue_key + ); + Ok(Some((task, json))) + } + Err(e) => { + // Malformed task — remove from work queue and discard + slog::warn!(self.logger, "malformed task JSON, discarding"; + "error" => %e, + "queue" => %work_key + ); + let _ = self.ack_raw(&work_key, &json).await; + Ok(None) + } + } + } + None => { + // Timed out, no task available + Ok(None) + } + } + } + + /// Acknowledge a task: remove it from the work queue (LREM). + pub async fn ack(&self, work_key: &str, task_json: &str) -> Result<(), GitError> { + self.ack_raw(work_key, task_json).await + } + + async fn ack_raw(&self, work_key: &str, task_json: &str) -> Result<(), GitError> { + let redis = self + .pool + .get() + .await + .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; + + let mut conn: RedisConn = redis; + + let _: i64 = redis::cmd("LREM") + .arg(work_key) + .arg(-1) + .arg(task_json) + .query_async(&mut conn) + .await + .map_err(|e| GitError::Internal(format!("LREM failed: {}", e)))?; + + Ok(()) + } + + /// Negative acknowledge (retry): remove from work queue and push back to main queue. 
+ pub async fn nak( + &self, + work_key: &str, + queue_key: &str, + task_json: &str, + ) -> Result<(), GitError> { + self.nak_with_retry(work_key, queue_key, task_json, task_json).await + } + + /// Negative acknowledge with a different (updated) task JSON — used to + /// requeue with an incremented retry_count. + pub async fn nak_with_retry( + &self, + work_key: &str, + queue_key: &str, + old_task_json: &str, + new_task_json: &str, + ) -> Result<(), GitError> { + self.ack_raw(work_key, old_task_json).await?; + + let redis = self + .pool + .get() + .await + .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; + + let mut conn: RedisConn = redis; + + let _: i64 = redis::cmd("LPUSH") + .arg(queue_key) + .arg(new_task_json) + .query_async(&mut conn) + .await + .map_err(|e| GitError::Internal(format!("LPUSH retry failed: {}", e)))?; + + slog::warn!(self.logger, "task nack'd and requeued queue={}", queue_key); + + Ok(()) + } + + pub fn prefix(&self) -> &str { + &self.prefix + } +} + +impl Clone for RedisConsumer { + fn clone(&self) -> Self { + Self { + pool: self.pool.clone(), + prefix: self.prefix.clone(), + block_timeout_secs: self.block_timeout_secs, + logger: self.logger.clone(), + } + } +} diff --git a/libs/git/hook/pool/types.rs b/libs/git/hook/pool/types.rs new file mode 100644 index 0000000..1c6d7de --- /dev/null +++ b/libs/git/hook/pool/types.rs @@ -0,0 +1,32 @@ +use serde::{Deserialize, Serialize}; + +pub use config::hook::PoolConfig; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HookTask { + pub id: String, + pub repo_id: String, + pub task_type: TaskType, + pub payload: serde_json::Value, + pub created_at: chrono::DateTime, + #[serde(default)] + pub retry_count: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum TaskType { + Sync, + Fsck, + Gc, +} + +impl std::fmt::Display for TaskType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result 
{ + match self { + TaskType::Sync => write!(f, "sync"), + TaskType::Fsck => write!(f, "fsck"), + TaskType::Gc => write!(f, "gc"), + } + } +} diff --git a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs new file mode 100644 index 0000000..ed01169 --- /dev/null +++ b/libs/git/hook/pool/worker.rs @@ -0,0 +1,388 @@ +use crate::error::GitError; +use crate::hook::pool::redis::RedisConsumer; +use crate::hook::pool::types::{HookTask, TaskType}; +use crate::hook::sync::HookMetaDataSync; +use db::cache::AppCache; +use db::database::AppDatabase; +use models::EntityTrait; +use slog::Logger; +use std::time::Duration; +use tokio_util::sync::CancellationToken; + +/// Single-threaded worker that sequentially consumes tasks from Redis queues. +/// K8s can scale replicas for concurrency — each replica runs one worker. +/// Per-repo Redis locking prevents concurrent workers from processing the same repo. +#[derive(Clone)] +pub struct HookWorker { + db: AppDatabase, + cache: AppCache, + logger: Logger, + consumer: RedisConsumer, +} + +impl HookWorker { + pub fn new( + db: AppDatabase, + cache: AppCache, + logger: Logger, + consumer: RedisConsumer, + ) -> Self { + Self { + db, + cache, + logger, + consumer, + } + } + + /// Run the worker loop. Blocks until cancelled. + pub async fn run(&self, cancel: CancellationToken) { + slog::info!(self.logger, "hook worker started"); + + let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc]; + let poll_interval = Duration::from_millis(500); + + loop { + tokio::select! 
{ + _ = cancel.cancelled() => { + slog::info!(self.logger, "hook worker shutdown signal received"); + break; + } + _ = tokio::time::sleep(poll_interval) => {} + } + + for task_type in &task_types { + let result = self + .consumer + .next(&task_type.to_string()) + .await; + + let (task, task_json) = match result { + Ok(Some(pair)) => pair, + Ok(None) => continue, + Err(e) => { + slog::warn!(self.logger, "failed to dequeue task: {}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + break; + } + }; + + let queue_key = format!("{}:{}", self.consumer.prefix(), task_type); + let work_key = format!("{}:work", queue_key); + + self.process_task(&task, &task_json, &work_key, &queue_key) + .await; + } + } + + slog::info!(self.logger, "hook worker stopped"); + } + + async fn process_task( + &self, + task: &HookTask, + task_json: &str, + work_key: &str, + queue_key: &str, + ) { + slog::info!(self.logger, "task started task_id={} task_type={} repo_id={}", + task.id, task.task_type, task.repo_id); + + let result = match task.task_type { + TaskType::Sync => self.run_sync(&task.repo_id).await, + TaskType::Fsck => self.run_fsck(&task.repo_id).await, + TaskType::Gc => self.run_gc(&task.repo_id).await, + }; + + match result { + Ok(()) => { + if let Err(e) = self.consumer.ack(work_key, task_json).await { + slog::warn!(self.logger, "failed to ack task: {}", e); + } + slog::info!(self.logger, "task completed task_id={}", task.id); + } + Err(e) => { + // GitError::Locked means another worker is processing this repo — + // requeue without incrementing retry count so it can be picked up later. 
+ let is_locked = matches!(e, crate::GitError::Locked(_)); + + if is_locked { + slog::info!(self.logger, "repo locked by another worker, requeueing task_id={}", task.id); + if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await { + slog::warn!(self.logger, "failed to requeue locked task: {}", nak_err); + } + } else { + slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}", + task.id, task.task_type, task.repo_id, e); + + const MAX_RETRIES: u32 = 5; + if task.retry_count >= MAX_RETRIES { + slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}", + task.id, task.retry_count); + let _ = self.consumer.ack(work_key, task_json).await; + } else { + let mut task = task.clone(); + task.retry_count += 1; + let retry_json = + serde_json::to_string(&task).unwrap_or_else(|_| task_json.to_string()); + let _ = self + .consumer + .nak_with_retry(work_key, queue_key, task_json, &retry_json) + .await; + } + } + } + } + } + + async fn run_sync(&self, repo_id: &str) -> Result<(), GitError> { + let repo_uuid = models::Uuid::parse_str(repo_id) + .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; + + let repo = models::repos::repo::Entity::find_by_id(repo_uuid) + .one(self.db.reader()) + .await + .map_err(GitError::from)? 
+ .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; + + if !std::path::Path::new(&repo.storage_path).exists() { + return Err(GitError::NotFound(format!( + "storage path does not exist: {}", + repo.storage_path + ))); + } + + // Capture before tips + let before_tips = tokio::task::spawn_blocking({ + let db = self.db.clone(); + let cache = self.cache.clone(); + let logger = self.logger.clone(); + let repo = repo.clone(); + move || { + let sync = HookMetaDataSync::new(db, cache, repo, logger) + .map_err(|e| GitError::Internal(e.to_string()))?; + Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) + } + }) + .await + .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? + .map_err(GitError::from)?; + + // Run sync + tokio::task::spawn_blocking({ + let db = self.db.clone(); + let cache = self.cache.clone(); + let logger = self.logger.clone(); + let repo = repo.clone(); + move || { + let result = tokio::runtime::Handle::current().block_on(async { + let sync = HookMetaDataSync::new(db, cache, repo, logger)?; + sync.sync().await + }); + match result { + Ok(()) => Ok::<(), GitError>(()), + Err(e) => Err(GitError::Internal(e.to_string())), + } + } + }) + .await + .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? + .map_err(GitError::from)?; + + // Capture after tips + let after_tips = tokio::task::spawn_blocking({ + let db = self.db.clone(); + let cache = self.cache.clone(); + let logger = self.logger.clone(); + let repo = repo.clone(); + move || { + let sync = HookMetaDataSync::new(db, cache, repo, logger) + .map_err(|e| GitError::Internal(e.to_string()))?; + Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) + } + }) + .await + .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? 
+ .map_err(GitError::from)?; + + let (before_branch_tips, before_tag_tips) = before_tips; + let (after_branch_tips, after_tag_tips) = after_tips; + let project = repo.project; + + // Dispatch branch push webhooks + for (branch, after_oid) in after_branch_tips { + let before_oid = before_branch_tips + .iter() + .find(|(n, _)| n == &branch) + .map(|(_, o)| o.as_str()); + let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true); + if changed { + let before_oid = before_oid.map_or("0", |v| v).to_string(); + let branch_name = branch.clone(); + let db = self.db.clone(); + let logger = self.logger.clone(); + let repo_id_str = repo.id.to_string(); + let repo_name = repo.repo_name.clone(); + let default_branch = repo.default_branch.clone(); + let ns = models::projects::Project::find_by_id(project) + .one(self.db.reader()) + .await + .ok() + .flatten() + .map(|p| p.name) + .unwrap_or_default(); + + tokio::spawn(async move { + crate::hook::webhook_dispatch::dispatch_repo_webhooks( + &db, + &reqwest::Client::new(), + &logger, + &repo_id_str, + &ns, + &repo_name, + &default_branch, + "", + "", + crate::hook::webhook_dispatch::WebhookEventKind::Push { + r#ref: format!("refs/heads/{}", branch_name), + before: before_oid, + after: after_oid, + commits: vec![], + }, + ) + .await; + }); + } + } + + // Dispatch tag push webhooks + for (tag, after_oid) in after_tag_tips { + let before_oid = before_tag_tips + .iter() + .find(|(n, _)| n == &tag) + .map(|(_, o)| o.as_str()); + let is_new = before_oid.is_none(); + let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false); + if is_new || was_updated { + let before_oid = before_oid.map_or("0", |v| v).to_string(); + let tag_name = tag.clone(); + let db = self.db.clone(); + let logger = self.logger.clone(); + let repo_id_str = repo.id.to_string(); + let repo_name = repo.repo_name.clone(); + let default_branch = repo.default_branch.clone(); + let ns = models::projects::Project::find_by_id(project) + 
.one(self.db.reader()) + .await + .ok() + .flatten() + .map(|p| p.name) + .unwrap_or_default(); + + tokio::spawn(async move { + crate::hook::webhook_dispatch::dispatch_repo_webhooks( + &db, + &reqwest::Client::new(), + &logger, + &repo_id_str, + &ns, + &repo_name, + &default_branch, + "", + "", + crate::hook::webhook_dispatch::WebhookEventKind::TagPush { + r#ref: format!("refs/tags/{}", tag_name), + before: before_oid, + after: after_oid, + }, + ) + .await; + }); + } + } + + Ok(()) + } + + async fn run_fsck(&self, repo_id: &str) -> Result<(), GitError> { + let repo_uuid = models::Uuid::parse_str(repo_id) + .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; + + let repo = models::repos::repo::Entity::find_by_id(repo_uuid) + .one(self.db.reader()) + .await + .map_err(GitError::from)? + .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; + + if !std::path::Path::new(&repo.storage_path).exists() { + return Err(GitError::NotFound(format!( + "storage path does not exist: {}", + repo.storage_path + ))); + } + + let db = self.db.clone(); + let cache = self.cache.clone(); + let logger = self.logger.clone(); + + tokio::task::spawn_blocking({ + let repo = repo.clone(); + move || { + let result = tokio::runtime::Handle::current().block_on(async { + let sync = HookMetaDataSync::new(db, cache, repo, logger)?; + sync.fsck_only().await + }); + match result { + Ok(()) => Ok::<(), GitError>(()), + Err(e) => Err(GitError::Internal(e.to_string())), + } + } + }) + .await + .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? + .map_err(GitError::from)?; + + Ok(()) + } + + async fn run_gc(&self, repo_id: &str) -> Result<(), GitError> { + let repo_uuid = models::Uuid::parse_str(repo_id) + .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?; + + let repo = models::repos::repo::Entity::find_by_id(repo_uuid) + .one(self.db.reader()) + .await + .map_err(GitError::from)? 
+ .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?; + + if !std::path::Path::new(&repo.storage_path).exists() { + return Err(GitError::NotFound(format!( + "storage path does not exist: {}", + repo.storage_path + ))); + } + + let db = self.db.clone(); + let cache = self.cache.clone(); + let logger = self.logger.clone(); + + tokio::task::spawn_blocking({ + let repo = repo.clone(); + move || { + let result = tokio::runtime::Handle::current().block_on(async { + let sync = HookMetaDataSync::new(db, cache, repo, logger)?; + sync.gc_only().await + }); + match result { + Ok(()) => Ok::<(), GitError>(()), + Err(e) => Err(GitError::Internal(e.to_string())), + } + } + }) + .await + .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? + .map_err(GitError::from)?; + + Ok(()) + } +} diff --git a/libs/git/hook/sync/lock.rs b/libs/git/hook/sync/lock.rs new file mode 100644 index 0000000..660742c --- /dev/null +++ b/libs/git/hook/sync/lock.rs @@ -0,0 +1,66 @@ +use crate::GitError; +use crate::hook::sync::HookMetaDataSync; + +impl HookMetaDataSync { + const LOCK_TTL_SECS: u64 = 60; + + /// Try to acquire an exclusive lock for this repo. + /// Returns the lock value if acquired, which must be passed to `release_lock`. 
+ pub async fn acquire_lock(&self) -> Result { + let lock_key = format!("git:repo:lock:{}", self.repo.id); + let lock_value = format!("{}:{}", uuid::Uuid::new_v4(), std::process::id()); + + let mut conn = self + .cache + .conn() + .await + .map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?; + + let result: bool = redis::cmd("SET") + .arg(&lock_key) + .arg(&lock_value) + .arg("NX") + .arg("EX") + .arg(Self::LOCK_TTL_SECS) + .query_async(&mut conn) + .await + .map_err(|e| GitError::IoError(format!("failed to acquire lock: {}", e)))?; + + if result { + Ok(lock_value) + } else { + Err(GitError::Locked(format!( + "repository {} is locked by another process", + self.repo.id + ))) + } + } + + /// Release the lock, but only if we still own it (value matches). + pub async fn release_lock(&self, lock_value: &str) -> Result<(), GitError> { + let lock_key = format!("git:repo:lock:{}", self.repo.id); + + let mut conn = self + .cache + .conn() + .await + .map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?; + + let script = r#" + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + "#; + + let _: i32 = redis::Script::new(script) + .key(&lock_key) + .arg(lock_value) + .invoke_async(&mut conn) + .await + .map_err(|e| GitError::IoError(format!("failed to release lock: {}", e)))?; + + Ok(()) + } +} diff --git a/libs/git/hook/sync/mod.rs b/libs/git/hook/sync/mod.rs index 00112cb..5d4629a 100644 --- a/libs/git/hook/sync/mod.rs +++ b/libs/git/hook/sync/mod.rs @@ -3,6 +3,7 @@ pub mod commit; pub mod fsck; pub mod gc; pub mod lfs; +pub mod lock; pub mod tag; use db::cache::AppCache; @@ -158,73 +159,56 @@ impl HookMetaDataSync { } /// Full sync: refs → commits → tags → LFS → fsck → gc → skills. - /// No distributed lock — K8s StatefulSet scheduling guarantees exclusive access. 
+ /// Acquires a per-repo Redis distributed lock to prevent concurrent workers + /// from processing the same repository simultaneously. pub async fn sync(&self) -> Result<(), crate::GitError> { - // All git2 operations must run on a blocking thread since git2 types are not Send. - let db = self.db.clone(); - let cache = self.cache.clone(); - let repo = self.repo.clone(); - let logger = self.logger.clone(); + let lock_value = self.acquire_lock().await?; - let res = tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current().block_on(async move { - let sync = Self::new(db, cache, repo, logger)?; - let out = sync.sync_full().await; - if let Err(ref e) = out { - slog::error!(sync.logger, "sync failed: {}", e); - } - out - }) - }) - .await - .map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??; + let res = self.sync_full().await; - Ok(res) + if let Err(ref e) = res { + slog::error!(self.logger, "sync failed: {}", e); + } + + if let Err(release_err) = self.release_lock(&lock_value).await { + slog::error!(self.logger, "failed to release lock: {}", release_err); + } + + res } /// Run fsck only (refs snapshot + git fsck + rollback on corruption). + /// Acquires a per-repo Redis lock. 
pub async fn fsck_only(&self) -> Result<(), crate::GitError> { - let db = self.db.clone(); - let cache = self.cache.clone(); - let repo = self.repo.clone(); - let logger = self.logger.clone(); + let lock_value = self.acquire_lock().await?; - tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current().block_on(async move { - let sync = Self::new(db, cache, repo, logger)?; - let mut txn = sync.db.begin().await.map_err(|e| { - crate::GitError::IoError(format!("failed to begin transaction: {}", e)) - })?; - sync.run_fsck_and_rollback_if_corrupt(&mut txn).await?; - txn.commit().await.map_err(|e| { - crate::GitError::IoError(format!("failed to commit transaction: {}", e)) - })?; - Ok::<(), crate::GitError>(()) - }) - }) - .await - .map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??; + let res = async { + let mut txn = self + .db + .begin() + .await + .map_err(|e| crate::GitError::IoError(format!("failed to begin transaction: {}", e)))?; + self.run_fsck_and_rollback_if_corrupt(&mut txn) + .await?; + txn.commit().await.map_err(|e| { + crate::GitError::IoError(format!("failed to commit transaction: {}", e)) + })?; + Ok::<(), crate::GitError>(()) + } + .await; - Ok(()) + let _ = self.release_lock(&lock_value).await; + res } - /// Run gc only. + /// Run gc only. Acquires a per-repo Redis lock. 
pub async fn gc_only(&self) -> Result<(), crate::GitError> { - let db = self.db.clone(); - let cache = self.cache.clone(); - let repo = self.repo.clone(); - let logger = self.logger.clone(); + let lock_value = self.acquire_lock().await?; - tokio::task::spawn_blocking(move || { - tokio::runtime::Handle::current().block_on(async move { - let sync = Self::new(db, cache, repo, logger)?; - sync.run_gc().await - }) - }) - .await - .map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??; + let res = self.run_gc().await; - Ok(()) + let _ = self.release_lock(&lock_value).await; + res } /// Full sync pipeline inside a single DB transaction. diff --git a/libs/git/http/mod.rs b/libs/git/http/mod.rs index 76df9ed..071998a 100644 --- a/libs/git/http/mod.rs +++ b/libs/git/http/mod.rs @@ -72,16 +72,19 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> { let app_cache = app_cache?; let redis_pool = app_cache.redis_pool().clone(); - let http = Arc::new(reqwest::Client::new()); + let http_client = Arc::new(reqwest::Client::new()); let hook = HookService::new( db.clone(), app_cache.clone(), redis_pool.clone(), logger.clone(), config.clone(), - http, + http_client, ); - let sync = crate::ssh::ReceiveSyncService::new(hook); + let _worker_cancel = hook.start_worker(); + slog::info!(logger, "hook worker started"); + + let sync = crate::ssh::ReceiveSyncService::new(redis_pool.clone(), logger.clone()); let rate_limiter = Arc::new(rate_limit::RateLimiter::new( rate_limit::RateLimitConfig::default(), diff --git a/libs/git/lib.rs b/libs/git/lib.rs index c5de5ec..ddefdb2 100644 --- a/libs/git/lib.rs +++ b/libs/git/lib.rs @@ -36,6 +36,9 @@ pub use diff::types::{ }; pub use domain::GitDomain; pub use error::{GitError, GitResult}; +pub use hook::pool::types::{HookTask, TaskType}; +pub use hook::pool::PoolConfig; +pub use hook::pool::HookWorker; pub use hook::sync::HookMetaDataSync; pub use lfs::types::{LfsConfig, LfsEntry, LfsOid, 
LfsPointer}; pub use merge::types::{MergeAnalysisResult, MergeOptions, MergePreferenceResult, MergeheadInfo}; diff --git a/libs/git/ssh/mod.rs b/libs/git/ssh/mod.rs index bcdebc9..a81d49f 100644 --- a/libs/git/ssh/mod.rs +++ b/libs/git/ssh/mod.rs @@ -1,5 +1,6 @@ use crate::error::GitError; use crate::hook::HookService; +use crate::hook::pool::types::{HookTask, TaskType}; use anyhow::Context; use base64::Engine; use config::AppConfig; @@ -137,22 +138,12 @@ impl SSHHandle { "SSH server configured with methods: {:?}", config.methods ); let token_service = SshTokenService::new(self.db.clone()); - let http = Arc::new(reqwest::Client::new()); - let hook = crate::hook::HookService::new( - self.db.clone(), - self.cache.clone(), - self.redis_pool.clone(), - self.logger.clone(), - self.app.clone(), - http, - ); let mut server = server::SSHServer::new( self.db.clone(), self.cache.clone(), self.redis_pool.clone(), self.logger.clone(), token_service, - hook, ); // Start the rate limiter cleanup background task so the HashMap @@ -177,33 +168,63 @@ impl SSHHandle { } } -/// Direct sync service — calls HookService::sync_repo inline. -/// K8s StatefulSet HA scheduling ensures exclusive access per repo shard. +/// Enqueues a sync task to the Redis-backed hook queue. +/// The background worker picks it up and processes it with per-repo locking. #[derive(Clone)] pub struct ReceiveSyncService { - hook: HookService, + pool: deadpool_redis::cluster::Pool, + logger: Logger, + redis_prefix: String, } impl ReceiveSyncService { - pub fn new(hook: HookService) -> Self { - Self { hook } + pub fn new(pool: deadpool_redis::cluster::Pool, logger: Logger) -> Self { + Self { + pool, + logger, + redis_prefix: "{hook}".to_string(), + } } - /// Execute a full repo sync synchronously. - /// Returns Ok on success, Err on failure. 
- pub async fn send(&self, task: RepoReceiveSyncTask) -> Result<(), crate::GitError> { - let repo_id = task.repo_uid.to_string(); - slog::info!(self.hook.logger, "starting sync repo_id={}", repo_id); - let res = self.hook.sync_repo(&repo_id).await; - match &res { - Ok(()) => { - slog::info!(self.hook.logger, "sync completed repo_id={}", repo_id); - } + /// Enqueue a sync task. Fire-and-forget — logs errors but does not block. + pub async fn send(&self, task: RepoReceiveSyncTask) { + let hook_task = HookTask { + id: uuid::Uuid::new_v4().to_string(), + repo_id: task.repo_uid.to_string(), + task_type: TaskType::Sync, + payload: serde_json::Value::Null, + created_at: chrono::Utc::now(), + retry_count: 0, + }; + + let task_json = match serde_json::to_string(&hook_task) { + Ok(j) => j, Err(e) => { - slog::error!(self.hook.logger, "sync failed repo_id={} error={}", repo_id, e); + error!(self.logger, "failed to serialize hook task: {}", e); + return; } + }; + + let queue_key = format!("{}:sync", self.redis_prefix); + + let redis = match self.pool.get().await { + Ok(c) => c, + Err(e) => { + error!(self.logger, "failed to get Redis connection: {}", e); + return; + } + }; + + let mut conn: deadpool_redis::cluster::Connection = redis; + if let Err(e) = redis::cmd("LPUSH") + .arg(&queue_key) + .arg(&task_json) + .query_async::<()>(&mut conn) + .await + { + error!(self.logger, "failed to enqueue sync task repo_id={} error={}", + task.repo_uid, e); } - res } } @@ -267,6 +288,19 @@ pub async fn run_ssh(config: AppConfig, logger: Logger) -> anyhow::Result<()> { let db = AppDatabase::init(&config).await?; let cache = AppCache::init(&config).await?; let redis_pool = cache.redis_pool().clone(); + + // Start the hook worker (Redis queue consumer) + let hook = crate::hook::HookService::new( + db.clone(), + cache.clone(), + redis_pool.clone(), + logger.clone(), + config.clone(), + Arc::new(reqwest::Client::new()), + ); + let _worker_cancel = hook.start_worker(); + slog::info!(logger, 
"hook worker started"); + SSHHandle::new(db, config.clone(), cache, redis_pool, logger) .run_ssh() .await?; diff --git a/libs/git/ssh/server.rs b/libs/git/ssh/server.rs index 989b8a6..ea87b4e 100644 --- a/libs/git/ssh/server.rs +++ b/libs/git/ssh/server.rs @@ -1,4 +1,3 @@ -use crate::hook::HookService; use crate::ssh::ReceiveSyncService; use crate::ssh::SshTokenService; use crate::ssh::handle::SSHandle; @@ -16,7 +15,6 @@ pub struct SSHServer { pub redis_pool: RedisPool, pub logger: Logger, pub token_service: SshTokenService, - pub hook: HookService, } impl SSHServer { @@ -26,7 +24,6 @@ impl SSHServer { redis_pool: RedisPool, logger: Logger, token_service: SshTokenService, - hook: HookService, ) -> Self { SSHServer { db, @@ -34,7 +31,6 @@ impl SSHServer { redis_pool, logger, token_service, - hook, } } } @@ -52,7 +48,7 @@ impl russh::server::Server for SSHServer { } else { info!(self.logger, "New SSH connection from unknown address"); } - let sync_service = ReceiveSyncService::new(self.hook.clone()); + let sync_service = ReceiveSyncService::new(self.redis_pool.clone(), self.logger.clone()); SSHandle::new( self.db.clone(), self.cache.clone(),