From eeb99bf628d9e2574e52f10871502c544c5c64e9 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Fri, 17 Apr 2026 12:22:09 +0800 Subject: [PATCH] refactor(git): drop hook pool, sync execution is now direct and sequential MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove entire pool/ directory (RedisConsumer, CpuMonitor, LogStream, HookTask, TaskType) - Remove Redis distributed lock (acquire_lock/release_lock) — K8s StatefulSet scheduling guarantees exclusive access per repo shard - Remove sync/lock.rs, sync/remote.rs, sync/status.rs (dead code) - Remove hook/event.rs (GitHookEvent was never used) - New HookService exposes sync_repo / fsck_repo / gc_repo directly - ReceiveSyncService now calls HookService inline instead of LPUSH to Redis queue - sync/mod.rs: git2 operations wrapped in spawn_blocking for Send safety (git2 types are not Send — async git2 operations must not cross await points) - scripts/push.js: drop 'frontend' from docker push list (embedded into static binary) --- libs/git/hook/event.rs | 7 - libs/git/hook/mod.rs | 107 +++++-- libs/git/hook/pool/log.rs | 103 ------- libs/git/hook/pool/metrics.rs | 42 --- libs/git/hook/pool/mod.rs | 526 ---------------------------------- libs/git/hook/pool/redis.rs | 177 ------------ libs/git/hook/pool/types.rs | 44 --- libs/git/hook/sync/lock.rs | 63 ---- libs/git/hook/sync/mod.rs | 111 +++++-- libs/git/hook/sync/remote.rs | 0 libs/git/hook/sync/status.rs | 98 ------- libs/git/http/mod.rs | 12 +- libs/git/lib.rs | 3 +- libs/git/ssh/mod.rs | 71 ++--- libs/git/ssh/server.rs | 6 +- scripts/push.js | 46 +-- 16 files changed, 231 insertions(+), 1185 deletions(-) delete mode 100644 libs/git/hook/event.rs delete mode 100644 libs/git/hook/pool/log.rs delete mode 100644 libs/git/hook/pool/metrics.rs delete mode 100644 libs/git/hook/pool/mod.rs delete mode 100644 libs/git/hook/pool/redis.rs delete mode 100644 libs/git/hook/pool/types.rs delete mode 100644 libs/git/hook/sync/lock.rs delete mode 100644 libs/git/hook/sync/remote.rs delete mode 100644 libs/git/hook/sync/status.rs diff --git a/libs/git/hook/event.rs b/libs/git/hook/event.rs deleted file mode 100644 index 75d4712..0000000 --- a/libs/git/hook/event.rs +++ /dev/null @@ -1,7 +0,0 @@ -use models::RepoId; -use serde::{Deserialize, Serialize}; - -#[derive(Deserialize, Serialize, Clone, Debug, Ord, PartialOrd, PartialEq, Eq)] -pub enum GitHookEvent { - MetaDataSync(RepoId), -} diff --git a/libs/git/hook/mod.rs b/libs/git/hook/mod.rs index 9d1dc9e..568de2b 100644 --- a/libs/git/hook/mod.rs +++ b/libs/git/hook/mod.rs @@ -2,13 +2,15 @@ use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; use deadpool_redis::cluster::Pool as RedisPool; +use models::EntityTrait; use slog::Logger; use std::sync::Arc; -use crate::hook::pool::GitHookPool; - +/// Simplified hook service — no queue, no pool. +/// K8s StatefulSet HA scheduling ensures only one pod touches a repo at a time. +/// Execution is direct and sequential per invocation. #[derive(Clone)] -pub struct GitServiceHooks { +pub struct HookService { pub(crate) db: AppDatabase, pub(crate) cache: AppCache, pub(crate) redis_pool: RedisPool, @@ -17,7 +19,7 @@ pub struct GitServiceHooks { pub(crate) http: Arc, } -impl GitServiceHooks { +impl HookService { pub fn new( db: AppDatabase, cache: AppCache, @@ -36,35 +38,92 @@ impl GitServiceHooks { } } - pub async fn run( - self, - cancel: tokio_util::sync::CancellationToken, - ) -> Result<(), crate::GitError> { - let pool_config = config::hook::PoolConfig::from_env(&self.config); + /// Full sync: refs → commits → tags → LFS → fsck → gc → skills. + pub async fn sync_repo(&self, repo_id: &str) -> Result<(), crate::GitError> { + let repo_uuid = models::Uuid::parse_str(repo_id) + .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; - let pool = GitHookPool::new( - pool_config, - self.db, - self.cache, - self.redis_pool, + let repo = models::repos::repo::Entity::find_by_id(repo_uuid) + .one(self.db.reader()) + .await + .map_err(crate::GitError::from)? + .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; + + if !std::path::Path::new(&repo.storage_path).exists() { + return Err(crate::GitError::NotFound(format!( + "storage path does not exist: {}", + repo.storage_path + ))); + } + + let sync = crate::hook::sync::HookMetaDataSync::new( + self.db.clone(), + self.cache.clone(), + repo, self.logger.clone(), - self.http, - ) - .await?; + )?; - let pool_arc = Arc::new(pool); + // No distributed lock needed — K8s StatefulSet scheduling guarantees + // that at most one pod processes a given repo shard at any time. + sync.sync().await + } - slog::info!(self.logger, "git hook service started"); + /// Run fsck only (no full sync). + pub async fn fsck_repo(&self, repo_id: &str) -> Result<(), crate::GitError> { + let repo_uuid = models::Uuid::parse_str(repo_id) + .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; - pool_arc.run(cancel).await; + let repo = models::repos::repo::Entity::find_by_id(repo_uuid) + .one(self.db.reader()) + .await + .map_err(crate::GitError::from)? + .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; - slog::info!(self.logger, "git hook service stopped"); + if !std::path::Path::new(&repo.storage_path).exists() { + return Err(crate::GitError::NotFound(format!( + "storage path does not exist: {}", + repo.storage_path + ))); + } - Ok(()) + let sync = crate::hook::sync::HookMetaDataSync::new( + self.db.clone(), + self.cache.clone(), + repo, + self.logger.clone(), + )?; + + sync.fsck_only().await + } + + /// Run gc only (no full sync). + pub async fn gc_repo(&self, repo_id: &str) -> Result<(), crate::GitError> { + let repo_uuid = models::Uuid::parse_str(repo_id) + .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; + + let repo = models::repos::repo::Entity::find_by_id(repo_uuid) + .one(self.db.reader()) + .await + .map_err(crate::GitError::from)? + .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; + + if !std::path::Path::new(&repo.storage_path).exists() { + return Err(crate::GitError::NotFound(format!( + "storage path does not exist: {}", + repo.storage_path + ))); + } + + let sync = crate::hook::sync::HookMetaDataSync::new( + self.db.clone(), + self.cache.clone(), + repo, + self.logger.clone(), + )?; + + sync.gc_only().await } } -pub mod event; -pub mod pool; pub mod sync; pub mod webhook_dispatch; diff --git a/libs/git/hook/pool/log.rs b/libs/git/hook/pool/log.rs deleted file mode 100644 index 142ebd6..0000000 --- a/libs/git/hook/pool/log.rs +++ /dev/null @@ -1,103 +0,0 @@ -use deadpool_redis::cluster::Pool; -use serde::Serialize; -use std::sync::Arc; - -#[derive(Debug, Clone, Serialize)] -pub struct TaskLog { - pub task_id: String, - pub repo_id: String, - pub worker_id: String, - pub level: String, - pub message: String, - pub timestamp: chrono::DateTime, -} - -pub struct LogStream { - channel: String, - worker_id: String, - pool: Arc, -} - -impl Clone for LogStream { - fn clone(&self) -> Self { - Self { - channel: self.channel.clone(), - worker_id: self.worker_id.clone(), - pool: self.pool.clone(), - } - } -} - -impl LogStream { - pub fn new(channel: String, worker_id: String, pool: Arc) -> Self { - Self { - channel, - worker_id, - pool, - } - } - - async fn publish_log(&self, log: TaskLog) { - let data = match serde_json::to_vec(&log) { - Ok(d) => d, - Err(e) => { - eprintln!("failed to serialize log: {}", e); - return; - } - }; - - let redis = match self.pool.get().await { - Ok(c) => c, - Err(e) => { - eprintln!("redis pool get failed: {}", e); - return; - } - }; - - let mut conn: deadpool_redis::cluster::Connection = redis; - if let Err(e) = redis::cmd("PUBLISH") - .arg(&self.channel) - .arg(&data) - .query_async::<()>(&mut conn) - .await - { - eprintln!("Redis PUBLISH failed: {}", e); - } - } - - pub async fn info(&self, task_id: &str, repo_id: &str, message: &str) { - self.publish_log(TaskLog { - task_id: task_id.to_string(), - repo_id: repo_id.to_string(), - worker_id: self.worker_id.clone(), - level: "info".to_string(), - message: message.to_string(), - timestamp: chrono::Utc::now(), - }) - .await; - } - - pub async fn error(&self, task_id: &str, repo_id: &str, message: &str) { - self.publish_log(TaskLog { - task_id: task_id.to_string(), - repo_id: repo_id.to_string(), - worker_id: self.worker_id.clone(), - level: "error".to_string(), - message: message.to_string(), - timestamp: chrono::Utc::now(), - }) - .await; - } - - pub async fn warn(&self, task_id: &str, repo_id: &str, message: &str) { - self.publish_log(TaskLog { - task_id: task_id.to_string(), - repo_id: repo_id.to_string(), - worker_id: self.worker_id.clone(), - level: "warn".to_string(), - message: message.to_string(), - timestamp: chrono::Utc::now(), - }) - .await; - } -} diff --git a/libs/git/hook/pool/metrics.rs b/libs/git/hook/pool/metrics.rs deleted file mode 100644 index ba58382..0000000 --- a/libs/git/hook/pool/metrics.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::sync::Arc; -use sysinfo::System; -use tokio::sync::RwLock; - -pub struct CpuMonitor { - sys: Arc>, -} - -impl CpuMonitor { - pub fn new() -> Self { - let mut sys = System::new(); - sys.refresh_cpu_all(); - Self { - sys: Arc::new(RwLock::new(sys)), - } - } - - pub async fn cpu_usage(&self) -> f32 { - let mut sys = self.sys.write().await; - sys.refresh_cpu_all(); - sys.global_cpu_usage() - } - - pub async fn can_accept_task( - &self, - max_concurrent: usize, - cpu_threshold: f32, - running: usize, - ) -> bool { - if running >= max_concurrent { - return false; - } - let cpu = self.cpu_usage().await; - cpu < cpu_threshold - } -} - -impl Default for CpuMonitor { - fn default() -> Self { - Self::new() - } -} diff --git a/libs/git/hook/pool/mod.rs b/libs/git/hook/pool/mod.rs deleted file mode 100644 index c7e513f..0000000 --- a/libs/git/hook/pool/mod.rs +++ /dev/null @@ -1,526 +0,0 @@ -pub mod log; -pub mod metrics; -pub mod redis; -pub mod types; - -use db::cache::AppCache; -use db::database::AppDatabase; -use deadpool_redis::cluster::Pool as RedisPool; -use sea_orm::EntityTrait; -use slog::Logger; -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; -use tokio::sync::Semaphore; -use tokio::task::{JoinSet, spawn_blocking}; -use tokio_util::sync::CancellationToken; - -use crate::hook::pool::log::LogStream; -use crate::hook::pool::metrics::CpuMonitor; -use crate::hook::pool::redis::RedisConsumer; -use crate::hook::pool::types::{HookTask, PoolConfig, PoolMetrics, TaskType}; -use crate::hook::sync::HookMetaDataSync; - -pub struct GitHookPool { - config: PoolConfig, - db: AppDatabase, - cache: AppCache, - logger: Logger, - cpu_monitor: CpuMonitor, - consumer: RedisConsumer, - log_stream: LogStream, - running_count: Arc, - total_processed: Arc, - total_failed: Arc, - semaphore: Arc, - http: Arc, -} - -impl GitHookPool { - pub async fn new( - config: PoolConfig, - db: AppDatabase, - cache: AppCache, - redis_pool: RedisPool, - logger: Logger, - http: Arc, - ) -> Result { - let consumer = RedisConsumer::new( - redis_pool.clone(), - config.redis_list_prefix.clone(), - config.redis_block_timeout_secs, - logger.clone(), - ); - - let log_stream = LogStream::new( - config.redis_log_channel.clone(), - config.worker_id.clone(), - Arc::new(redis_pool), - ); - - Ok(Self { - config, - db, - cache, - logger, - cpu_monitor: CpuMonitor::new(), - consumer, - log_stream, - running_count: Arc::new(AtomicU64::new(0)), - total_processed: Arc::new(AtomicU64::new(0)), - total_failed: Arc::new(AtomicU64::new(0)), - semaphore: Arc::new(Semaphore::new(num_cpus::get())), - http, - }) - } - - pub async fn run(self: Arc, cancel: CancellationToken) { - let mut join_set = JoinSet::<()>::new(); - let cancel_clone = cancel.clone(); - - // Task types to poll - let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc]; - - loop { - tokio::select! { - _ = cancel_clone.cancelled() => { - slog::info!(self.logger, "pool received shutdown signal, draining {} tasks", join_set.len()); - while join_set.join_next().await.is_some() {} - slog::info!(self.logger, "pool shutdown complete"); - break; - } - - _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {} - } - - let running = self.running_count.load(Ordering::Relaxed) as usize; - let can_accept = self - .cpu_monitor - .can_accept_task( - self.config.max_concurrent, - self.config.cpu_threshold, - running, - ) - .await; - - if !can_accept { - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - continue; - } - - // Poll each task type in round-robin fashion - for task_type in &task_types { - let result = self.consumer.next(&task_type.to_string()).await; - - let (task, task_json) = match result { - Ok(Some(pair)) => pair, - Ok(None) => continue, // timeout, try next queue - Err(e) => { - slog::warn!(self.logger, "failed to dequeue task: {}", e); - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - break; - } - }; - - let self_clone = self.clone(); - - // Compute queue/work keys for ACK/NAK - let queue_key = format!( - "{}:{}", - self_clone.config.redis_list_prefix, - task_type.to_string() - ); - let work_key = format!("{}:work", queue_key); - - let permit = match self_clone.semaphore.clone().acquire_owned().await { - Ok(p) => p, - Err(_) => continue, - }; - - let self_clone2 = self.clone(); - self_clone2.running_count.fetch_add(1, Ordering::Relaxed); - let logger_clone = self_clone2.logger.clone(); - let counter_clone = self_clone2.running_count.clone(); - join_set.spawn(async move { - let panicked = match spawn_blocking(move || { - std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - tokio::runtime::Handle::current().block_on(async { - self_clone2 - .execute_task_body(task, task_json, queue_key, work_key) - .await - }) - })) - }) - .await - { - Ok(Ok(Ok(()))) => false, // spawn_blocking Ok, catch_unwind Ok, body Ok - Ok(Ok(Err(_))) => false, // spawn_blocking Ok, catch_unwind Ok, body Err(()) — error handled inside - Ok(Err(_)) => true, // spawn_blocking Ok, catch_unwind Err = panic - Err(_) => true, // spawn_blocking Err = thread aborted - }; - drop(permit); - counter_clone.fetch_sub(1, Ordering::Relaxed); - if panicked { - slog::error!(logger_clone, "{}", format!("task panicked")); - } - }); - - // Only process one task per loop iteration to avoid overwhelming the pool - break; - } - } - } - - async fn execute_task_body( - &self, - task: HookTask, - task_json: String, - queue_key: String, - work_key: String, - ) -> Result<(), ()> { - slog::info!(self.logger, "{}", format!( - "task started task_id={} task_type={} repo_id={} worker_id={} retry={}", - task.id, task.task_type, task.repo_id, self.config.worker_id, task.retry_count - )); - - self.log_stream - .info( - &task.id, - &task.repo_id, - &format!("task started: {}", task.task_type), - ) - .await; - - let result = match task.task_type { - TaskType::Sync => self.run_sync(&task).await, - TaskType::Fsck => self.run_fsck(&task).await, - TaskType::Gc => self.run_gc(&task).await, - }; - - let consumer = self.consumer.clone(); - match result { - Ok(()) => { - if let Err(e) = consumer.ack(&work_key, &task_json).await { - slog::warn!(self.logger, "failed to ack task: {}", e); - } - self.total_processed.fetch_add(1, Ordering::Relaxed); - self.log_stream - .info(&task.id, &task.repo_id, "task completed") - .await; - } - Err(e) => { - // Log the actual error so we can diagnose why tasks fail immediately. - slog::warn!(self.logger, "{}", format!( - "task failed task_id={} task_type={} repo_id={} retry={} error={}", - task.id, task.task_type, task.repo_id, task.retry_count, e - )); - // Check retry count and decide whether to requeue or discard. - const MAX_RETRIES: u32 = 5; - if task.retry_count >= MAX_RETRIES { - // Max retries exceeded — discard the task to prevent infinite loop. - slog::warn!(self.logger, "{}", format!( - "task exhausted retries, discarding task_id={} task_type={} repo_id={} retry_count={} last_error={}", - task.id, task.task_type, task.repo_id, task.retry_count, e - )); - if let Err(e) = consumer.ack(&work_key, &task_json).await { - slog::warn!(self.logger, "failed to ack discarded task: {}", e); - } - self.total_failed.fetch_add(1, Ordering::Relaxed); - self.log_stream - .error(&task.id, &task.repo_id, &format!("task failed after {} retries: {}", task.retry_count, e)) - .await; - } else { - // Requeue with incremented retry count. - let mut task = task; - task.retry_count += 1; - let retry_json = serde_json::to_string(&task) - .unwrap_or_else(|_| task_json.clone()); - - if let Err(e) = consumer.nak_with_retry(&work_key, &queue_key, &task_json, &retry_json).await { - slog::warn!(self.logger, "failed to nak task: {}", e); - } - self.total_failed.fetch_add(1, Ordering::Relaxed); - self.log_stream - .error(&task.id, &task.repo_id, &format!("task failed: {}", e)) - .await; - } - } - } - - Ok(()) - } - - async fn run_sync(&self, task: &HookTask) -> Result<(), crate::GitError> { - let repo_id = models::Uuid::parse_str(&task.repo_id) - .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; - - let repo = models::repos::repo::Entity::find_by_id(repo_id) - .one(self.db.reader()) - .await - .map_err(crate::GitError::from)? - .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; - - // Fail fast if storage path doesn't exist — avoid blocking spawn_blocking thread pool. - if !std::path::Path::new(&repo.storage_path).exists() { - return Err(crate::GitError::NotFound(format!( - "storage path does not exist: {}", repo.storage_path - ))); - } - - let db_clone = self.db.clone(); - let cache_clone = self.cache.clone(); - let repo_clone = repo.clone(); - let logger_clone = self.logger.clone(); - - // Phase 1: capture before branch/tag tips. - let before_tips: (Vec<(String, String)>, Vec<(String, String)>) = - tokio::task::spawn_blocking({ - let db = db_clone.clone(); - let cache = cache_clone.clone(); - let repo = repo_clone.clone(); - let logger = logger_clone.clone(); - move || { - let sync = HookMetaDataSync::new(db, cache, repo, logger)?; - Ok::<_, crate::GitError>((sync.list_branch_tips(), sync.list_tag_tips())) - } - }) - .await - .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??; - - // Phase 2: run sync (async operation). - let sync_result: Result<(), crate::GitError> = tokio::task::spawn_blocking({ - let db = db_clone.clone(); - let cache = cache_clone.clone(); - let repo = repo_clone.clone(); - let logger = logger_clone.clone(); - move || { - let sync = HookMetaDataSync::new(db, cache, repo, logger)?; - tokio::runtime::Handle::current().block_on(async { sync.sync().await }) - } - }) - .await - .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))?; - - sync_result?; - - // Phase 3: capture after branch/tag tips. - let after_tips: (Vec<(String, String)>, Vec<(String, String)>) = - tokio::task::spawn_blocking({ - let db = db_clone.clone(); - let cache = cache_clone.clone(); - let repo = repo_clone.clone(); - let logger = logger_clone.clone(); - move || { - let sync = HookMetaDataSync::new(db, cache, repo, logger)?; - Ok::<_, crate::GitError>((sync.list_branch_tips(), sync.list_tag_tips())) - } - }) - .await - .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??; - - let (before_branch_tips, before_tag_tips) = before_tips; - let (after_branch_tips, after_tag_tips) = after_tips; - - let repo_uuid = repo.id.to_string(); - let repo_name = repo.repo_name.clone(); - let default_branch = repo.default_branch.clone(); - - // Resolve namespace = project.name - let namespace = models::projects::Project::find_by_id(repo.project) - .one(self.db.reader()) - .await - .map_err(|e| crate::GitError::Internal(format!("failed to fetch project: {}", e)))? - .map(|p| p.name) - .unwrap_or_default(); - - let logger = self.logger.clone(); - let http = self.http.clone(); - let db = self.db.clone(); - - // Dispatch branch push webhooks. - for (branch, after_oid) in &after_branch_tips { - let before_oid = before_branch_tips - .iter() - .find(|(n, _)| n == branch) - .map(|(_, o)| o.as_str()); - let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true); - if changed { - let before_oid = before_oid.map_or("0", |v| v).to_string(); - let after = after_oid.clone(); - let branch_name = branch.clone(); - - slog::info!(logger, "{}", format!("detected push on branch branch={} before={} after={}", branch_name, before_oid, after)); - - let http = http.clone(); - let db = db.clone(); - let logs = logger.clone(); - let ru = repo_uuid.clone(); - let ns = namespace.clone(); - let rn = repo_name.clone(); - let db_branch = default_branch.clone(); - - tokio::spawn(async move { - crate::hook::webhook_dispatch::dispatch_repo_webhooks( - &db, - &http, - &logs, - &ru, - &ns, - &rn, - &db_branch, - "", - "", - crate::hook::webhook_dispatch::WebhookEventKind::Push { - r#ref: format!("refs/heads/{}", branch_name), - before: before_oid, - after, - commits: vec![], - }, - ) - .await; - }); - } - } - - // Dispatch tag push webhooks. - for (tag, after_oid) in &after_tag_tips { - let before_oid = before_tag_tips - .iter() - .find(|(n, _)| n == tag) - .map(|(_, o)| o.as_str()); - let is_new = before_oid.is_none(); - let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false); - if is_new || was_updated { - let before_oid = before_oid.map_or("0", |v| v).to_string(); - let after = after_oid.clone(); - let tag_name = tag.clone(); - - slog::info!(logger, "{}", format!("detected tag push tag={} before={} after={}", tag_name, before_oid, after)); - - let http = http.clone(); - let db = db.clone(); - let logs = logger.clone(); - let ru = repo_uuid.clone(); - let ns = namespace.clone(); - let rn = repo_name.clone(); - let db_branch = default_branch.clone(); - - tokio::spawn(async move { - crate::hook::webhook_dispatch::dispatch_repo_webhooks( - &db, - &http, - &logs, - &ru, - &ns, - &rn, - &db_branch, - "", - "", - crate::hook::webhook_dispatch::WebhookEventKind::TagPush { - r#ref: format!("refs/tags/{}", tag_name), - before: before_oid, - after, - }, - ) - .await; - }); - } - } - - Ok(()) - } - - async fn run_fsck(&self, task: &HookTask) -> Result<(), crate::GitError> { - let repo_id = models::Uuid::parse_str(&task.repo_id) - .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; - - let repo = models::repos::repo::Entity::find_by_id(repo_id) - .one(self.db.reader()) - .await - .map_err(crate::GitError::from)? - .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; - - if !std::path::Path::new(&repo.storage_path).exists() { - return Err(crate::GitError::NotFound(format!( - "storage path does not exist: {}", repo.storage_path - ))); - } - - self.log_stream - .info(&task.id, &task.repo_id, "running fsck") - .await; - - let db_clone = self.db.clone(); - let cache_clone = self.cache.clone(); - let logger_clone = self.logger.clone(); - - tokio::task::spawn_blocking(move || -> Result<(), crate::GitError> { - tokio::runtime::Handle::current().block_on(async move { - let sync = - HookMetaDataSync::new(db_clone.clone(), cache_clone, repo, logger_clone)?; - let mut txn = db_clone.begin().await.map_err(crate::GitError::from)?; - sync.run_fsck_and_rollback_if_corrupt(&mut txn).await - }) - }) - .await - .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??; - - Ok(()) - } - - async fn run_gc(&self, task: &HookTask) -> Result<(), crate::GitError> { - let repo_id = models::Uuid::parse_str(&task.repo_id) - .map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?; - - let repo = models::repos::repo::Entity::find_by_id(repo_id) - .one(self.db.reader()) - .await - .map_err(crate::GitError::from)? - .ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?; - - if !std::path::Path::new(&repo.storage_path).exists() { - return Err(crate::GitError::NotFound(format!( - "storage path does not exist: {}", repo.storage_path - ))); - } - - self.log_stream - .info(&task.id, &task.repo_id, "running gc") - .await; - - let db_clone = self.db.clone(); - let cache_clone = self.cache.clone(); - let logger_clone = self.logger.clone(); - - tokio::task::spawn_blocking(move || -> Result<(), crate::GitError> { - tokio::runtime::Handle::current().block_on(async move { - let sync = HookMetaDataSync::new(db_clone, cache_clone, repo, logger_clone)?; - sync.run_gc().await - }) - }) - .await - .map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??; - - Ok(()) - } - - pub fn metrics(&self) -> PoolMetrics { - let running = self.running_count.load(Ordering::Relaxed) as usize; - PoolMetrics { - running, - max_concurrent: self.config.max_concurrent, - cpu_usage: 0.0, - total_processed: self.total_processed.load(Ordering::Relaxed), - total_failed: self.total_failed.load(Ordering::Relaxed), - can_accept: running < self.config.max_concurrent, - } - } - - pub fn can_accept_task_sync(&self) -> bool { - let running = self.running_count.load(Ordering::Relaxed) as usize; - running < self.config.max_concurrent - } - - pub fn log_stream(&self) -> &LogStream { - &self.log_stream - } -} diff --git a/libs/git/hook/pool/redis.rs b/libs/git/hook/pool/redis.rs deleted file mode 100644 index 2ea6265..0000000 --- a/libs/git/hook/pool/redis.rs +++ /dev/null @@ -1,177 +0,0 @@ -use crate::error::GitError; -use crate::hook::pool::types::HookTask; -use deadpool_redis::cluster::Connection as RedisConn; -use slog::Logger; - -/// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern. -/// Compatible with Redis Cluster via hash tags in key names. -pub struct RedisConsumer { - pool: deadpool_redis::cluster::Pool, - /// Hash-tag-prefixed key prefix, e.g. "{hook}". - /// Full queue key: "{hook}:{task_type}" - /// Full work key: "{hook}:{task_type}:work" - prefix: String, - block_timeout_secs: u64, - logger: Logger, -} - -impl RedisConsumer { - pub fn new( - pool: deadpool_redis::cluster::Pool, - prefix: String, - block_timeout_secs: u64, - logger: Logger, - ) -> Self { - Self { - pool, - prefix, - block_timeout_secs, - logger, - } - } - - /// Atomically moves a task from the main queue to the work queue using BLMOVE. - /// Blocks up to `block_timeout_secs` waiting for a task. - /// - /// Returns `Some((HookTask, task_json))` where `task_json` is the raw JSON string - /// needed for LREM on ACK. Returns `None` if the blocking timed out. - pub async fn next(&self, task_type: &str) -> Result, GitError> { - let queue_key = format!("{}:{}", self.prefix, task_type); - let work_key = format!("{}:{}:work", self.prefix, task_type); - - let redis = self - .pool - .get() - .await - .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; - - let mut conn: RedisConn = redis; - - // BLMOVE source destination timeout - // RIGHT LEFT = BRPOPLPUSH equivalent (pop from right of src, push to left of dst) - let task_json: Option = redis::cmd("BLMOVE") - .arg(&queue_key) - .arg(&work_key) - .arg("RIGHT") - .arg("LEFT") - .arg(self.block_timeout_secs) - .query_async(&mut conn) - .await - .map_err(|e| GitError::Internal(format!("BLMOVE failed: {}", e)))?; - - match task_json { - Some(json) => { - match serde_json::from_str::(&json) { - Ok(task) => { - slog::debug!(self.logger, "task dequeued"; - "task_id" => %task.id, - "task_type" => %task.task_type, - "queue" => %queue_key - ); - Ok(Some((task, json))) - } - Err(e) => { - // Malformed task — remove from work queue and discard - slog::warn!(self.logger, "malformed task JSON, discarding"; - "error" => %e, - "queue" => %work_key - ); - let _ = self.ack_raw(&work_key, &json).await; - Ok(None) - } - } - } - None => { - // Timed out, no task available - Ok(None) - } - } - } - - /// Acknowledge a task: remove it from the work queue (LREM). - pub async fn ack(&self, work_key: &str, task_json: &str) -> Result<(), GitError> { - self.ack_raw(work_key, task_json).await - } - - async fn ack_raw(&self, work_key: &str, task_json: &str) -> Result<(), GitError> { - let redis = self - .pool - .get() - .await - .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; - - let mut conn: RedisConn = redis; - - let _: i64 = redis::cmd("LREM") - .arg(work_key) - .arg(-1) // remove all occurrences - .arg(task_json) - .query_async(&mut conn) - .await - .map_err(|e| GitError::Internal(format!("LREM failed: {}", e)))?; - - Ok(()) - } - - /// Negative acknowledge (retry): remove from work queue and push back to main queue. - pub async fn nak( - &self, - work_key: &str, - queue_key: &str, - task_json: &str, - ) -> Result<(), GitError> { - self.nak_with_retry(work_key, queue_key, task_json, task_json).await - } - - /// Negative acknowledge with a different (updated) task JSON — used to - /// requeue with an incremented retry_count. - pub async fn nak_with_retry( - &self, - work_key: &str, - queue_key: &str, - old_task_json: &str, - new_task_json: &str, - ) -> Result<(), GitError> { - // Remove the old entry from work queue - self.ack_raw(work_key, old_task_json).await?; - - // Push the updated entry back to main queue for retry - let redis = self - .pool - .get() - .await - .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; - - let mut conn: RedisConn = redis; - - let _: i64 = redis::cmd("LPUSH") - .arg(queue_key) - .arg(new_task_json) - .query_async(&mut conn) - .await - .map_err(|e| GitError::Internal(format!("LPUSH retry failed: {}", e)))?; - - slog::warn!(self.logger, "{}", format!("task nack'd and requeued queue={}", queue_key)); - - Ok(()) - } - - pub fn pool(&self) -> &deadpool_redis::cluster::Pool { - &self.pool - } - - pub fn prefix(&self) -> &str { - &self.prefix - } -} - -impl Clone for RedisConsumer { - fn clone(&self) -> Self { - Self { - pool: self.pool.clone(), - prefix: self.prefix.clone(), - block_timeout_secs: self.block_timeout_secs, - logger: self.logger.clone(), - } - } -} diff --git a/libs/git/hook/pool/types.rs b/libs/git/hook/pool/types.rs deleted file mode 100644 index b1643db..0000000 --- a/libs/git/hook/pool/types.rs +++ /dev/null @@ -1,44 +0,0 @@ -use serde::{Deserialize, Serialize}; - -pub use config::hook::PoolConfig; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct HookTask { - pub id: String, - pub repo_id: String, - pub task_type: TaskType, - pub payload: serde_json::Value, - pub created_at: chrono::DateTime, - /// Number of times this task has been retried after a failure. - /// When >= MAX_TASK_RETRIES, the task is discarded instead of requeued. - #[serde(default)] - pub retry_count: u32, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[serde(rename_all = "snake_case")] -pub enum TaskType { - Sync, - Fsck, - Gc, -} - -impl std::fmt::Display for TaskType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TaskType::Sync => write!(f, "sync"), - TaskType::Fsck => write!(f, "fsck"), - TaskType::Gc => write!(f, "gc"), - } - } -} - -#[derive(Debug, Clone)] -pub struct PoolMetrics { - pub running: usize, - pub max_concurrent: usize, - pub cpu_usage: f32, - pub total_processed: u64, - pub total_failed: u64, - pub can_accept: bool, -} diff --git a/libs/git/hook/sync/lock.rs b/libs/git/hook/sync/lock.rs deleted file mode 100644 index d9bb081..0000000 --- a/libs/git/hook/sync/lock.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::GitError; -use crate::hook::sync::HookMetaDataSync; - -impl HookMetaDataSync { - const LOCK_TTL_SECS: u64 = 60; - - pub async fn acquire_lock(&self) -> Result { - let lock_key = format!("git:repo:lock:{}", self.repo.id); - let lock_value = format!("{}:{}", uuid::Uuid::new_v4(), std::process::id()); - - let mut conn = self - .cache - .conn() - .await - .map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?; - - let result: bool = redis::cmd("SET") - .arg(&lock_key) - .arg(&lock_value) - .arg("NX") - .arg("EX") - .arg(Self::LOCK_TTL_SECS) - .query_async(&mut conn) - .await - .map_err(|e| GitError::IoError(format!("failed to acquire lock: {}", e)))?; - - if result { - Ok(lock_value) - } else { - Err(GitError::Locked(format!( - "repository {} is locked by another process", - self.repo.id - ))) - } - } - - pub async fn release_lock(&self, lock_value: &str) -> Result<(), GitError> { - let lock_key = format!("git:repo:lock:{}", self.repo.id); - - let mut conn = self - .cache - .conn() - .await - .map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?; - - let script = r#" - if redis.call("get", KEYS[1]) == ARGV[1] then - return redis.call("del", KEYS[1]) - else - return 0 - end - "#; - - let _: i32 = redis::Script::new(script) - .key(&lock_key) - .arg(lock_value) - .invoke_async(&mut conn) - .await - .map_err(|e| GitError::IoError(format!("failed to release lock: {}", e)))?; - - Ok(()) - } -} diff --git a/libs/git/hook/sync/mod.rs b/libs/git/hook/sync/mod.rs index af840e4..00112cb 100644 --- a/libs/git/hook/sync/mod.rs +++ b/libs/git/hook/sync/mod.rs @@ -3,8 +3,6 @@ pub mod commit; pub mod fsck; pub mod gc; pub mod lfs; -pub mod lock; -pub mod status; pub mod tag; use db::cache::AppCache; @@ -159,41 +157,97 @@ impl HookMetaDataSync { }) } + /// Full sync: refs → commits → tags → LFS → fsck → gc → skills. + /// No distributed lock — K8s StatefulSet scheduling guarantees exclusive access. pub async fn sync(&self) -> Result<(), crate::GitError> { - let lock_value = self.acquire_lock().await?; + // All git2 operations must run on a blocking thread since git2 types are not Send. + let db = self.db.clone(); + let cache = self.cache.clone(); + let repo = self.repo.clone(); + let logger = self.logger.clone(); - let res = self.sync_internal().await; + let res = tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(async move { + let sync = Self::new(db, cache, repo, logger)?; + let out = sync.sync_full().await; + if let Err(ref e) = out { + slog::error!(sync.logger, "sync failed: {}", e); + } + out + }) + }) + .await + .map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??; - if let Err(ref e) = res { - slog::error!(self.logger, "sync failed: {}", e); - } - - if let Err(release_err) = self.release_lock(&lock_value).await { - slog::error!(self.logger, "failed to release lock: {}", release_err); - } - - res + Ok(res) } - async fn sync_internal(&self) -> Result<(), crate::GitError> { - let mut txn = - self.db.begin().await.map_err(|e| { - crate::GitError::IoError(format!("failed to begin transaction: {}", e)) - })?; + /// Run fsck only (refs snapshot + git fsck + rollback on corruption). + pub async fn fsck_only(&self) -> Result<(), crate::GitError> { + let db = self.db.clone(); + let cache = self.cache.clone(); + let repo = self.repo.clone(); + let logger = self.logger.clone(); + + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(async move { + let sync = Self::new(db, cache, repo, logger)?; + let mut txn = sync.db.begin().await.map_err(|e| { + crate::GitError::IoError(format!("failed to begin transaction: {}", e)) + })?; + sync.run_fsck_and_rollback_if_corrupt(&mut txn).await?; + txn.commit().await.map_err(|e| { + crate::GitError::IoError(format!("failed to commit transaction: {}", e)) + })?; + Ok::<(), crate::GitError>(()) + }) + }) + .await + .map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??; + + Ok(()) + } + + /// Run gc only. + pub async fn gc_only(&self) -> Result<(), crate::GitError> { + let db = self.db.clone(); + let cache = self.cache.clone(); + let repo = self.repo.clone(); + let logger = self.logger.clone(); + + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(async move { + let sync = Self::new(db, cache, repo, logger)?; + sync.run_gc().await + }) + }) + .await + .map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??; + + Ok(()) + } + + /// Full sync pipeline inside a single DB transaction. + /// On fsck failure, refs are rolled back and an error is returned. + async fn sync_full(&self) -> Result<(), crate::GitError> { + let mut txn = self + .db + .begin() + .await + .map_err(|e| crate::GitError::IoError(format!("failed to begin transaction: {}", e)))?; self.sync_refs(&mut txn).await?; self.sync_commits(&mut txn).await?; self.sync_tags(&mut txn).await?; self.sync_lfs_objects(&mut txn).await?; - - self.run_fsck_and_rollback_if_corrupt(&mut txn).await?; + self.run_fsck_and_rollback_if_corrupt(&mut txn) + .await?; txn.commit().await.map_err(|e| { crate::GitError::IoError(format!("failed to commit transaction: {}", e)) })?; self.run_gc().await?; - self.sync_skills().await; Ok(()) @@ -208,7 +262,6 @@ impl HookMetaDataSync { if let Ok(r) = ref_result { if r.is_branch() && !r.is_remote() { if let Some(name) = r.name() { - // name is like "refs/heads/main" -> extract "main" let branch = name.strip_prefix("refs/heads/").unwrap_or(name); if let Some(target) = r.target() { tips.push((branch.to_string(), target.to_string())); @@ -230,7 +283,6 @@ impl HookMetaDataSync { if let Ok(r) = ref_result { if r.is_tag() { if let Some(name) = r.name() { - // name is like "refs/tags/v1.0" -> extract "v1.0" let tag = name.strip_prefix("refs/tags/").unwrap_or(name); if let Some(target) = r.target() { tips.push((tag.to_string(), target.to_string())); @@ -244,8 +296,7 @@ impl HookMetaDataSync { } /// Scan the repository for `SKILL.md` files and sync skills to the project. - /// - /// This is a best-effort operation — failures are logged but do not fail the sync. + /// Best-effort — failures are logged but do not fail the sync. pub async fn sync_skills(&self) { let project_uid = self.repo.project; @@ -254,13 +305,15 @@ impl HookMetaDataSync { None => return, }; - // Get current HEAD commit SHA for attribution - let commit_sha = self.domain.repo().head().ok() + let commit_sha = self + .domain + .repo() + .head() + .ok() .and_then(|h| h.target()) .map(|oid| oid.to_string()) .unwrap_or_default(); - // Discover skills from the filesystem let discovered = match scan_skills_from_dir(repo_root, &self.repo.id, &commit_sha) { Ok(d) => d, Err(e) => { @@ -278,7 +331,6 @@ impl HookMetaDataSync { let mut updated = 0i64; let mut removed = 0i64; - // Collect existing repo-sourced skills for this repo let existing: Vec<_> = match SkillEntity::find() .filter(SkillCol::ProjectUuid.eq(project_uid)) .filter(SkillCol::Source.eq("repo")) @@ -344,7 +396,6 @@ impl HookMetaDataSync { } } - // Remove skills no longer in the repo for (slug, old_skill) in existing_by_slug { if !seen_slugs.contains(&slug) { if SkillEntity::delete_by_id(old_skill.id).exec(&self.db).await.is_ok() { diff --git a/libs/git/hook/sync/remote.rs b/libs/git/hook/sync/remote.rs deleted file mode 100644 index e69de29..0000000 diff --git a/libs/git/hook/sync/status.rs b/libs/git/hook/sync/status.rs deleted file mode 100644 index ce6f708..0000000 --- a/libs/git/hook/sync/status.rs +++ /dev/null @@ -1,98 +0,0 @@ -use crate::GitError; -use crate::hook::sync::HookMetaDataSync; - -#[derive(Debug, Clone)] -pub enum SyncStatus { - Pending, - Processing, - Success, - Failed(String), -} - -impl std::fmt::Display for SyncStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - SyncStatus::Pending => write!(f, "pending"), - SyncStatus::Processing => write!(f, "processing"), - SyncStatus::Success => write!(f, "success"), - SyncStatus::Failed(_) => write!(f, "failed"), - } - } -} - -impl HookMetaDataSync { - const STATUS_TTL_SECS: u64 = 86400; - - pub async fn update_sync_status(&self, status: SyncStatus) -> Result<(), GitError> { - let key = format!("git:repo:sync_status:{}", self.repo.id); - let status_str = status.to_string(); - - let mut conn = self - .cache - .conn() - .await - .map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?; - - let _: () = redis::cmd("SETEX") - .arg(&key) - .arg(Self::STATUS_TTL_SECS) - .arg(&status_str) - .query_async(&mut conn) - .await - .map_err(|e| GitError::IoError(format!("failed to set sync status: {}", e)))?; - - if let SyncStatus::Failed(ref error_msg) = status { - let error_key = format!("git:repo:sync_error:{}", self.repo.id); - let _: () = redis::cmd("SETEX") - .arg(&error_key) - .arg(Self::STATUS_TTL_SECS) - .arg(error_msg) - .query_async(&mut conn) - .await - .map_err(|e| GitError::IoError(format!("failed to set sync error: {}", e)))?; - } - - Ok(()) - } - - pub async fn get_sync_status(&self) -> Result, GitError> { - let key = format!("git:repo:sync_status:{}", self.repo.id); - let error_key = format!("git:repo:sync_error:{}", self.repo.id); - - let mut conn = self - .cache - .conn() - .await - .map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?; - - let status_str: Option = - redis::cmd("GET") - .arg(&key) - .query_async(&mut conn) - .await - .map_err(|e| GitError::IoError(format!("failed to get sync status: {}", e)))?; - - match status_str { - Some(status) => { - let error_msg: Option = redis::cmd("GET") - .arg(&error_key) - .query_async(&mut conn) - .await - .map_err(|e| GitError::IoError(format!("failed to get sync error: {}", e)))?; - - let sync_status = match status.as_str() { - "pending" => SyncStatus::Pending, - "processing" => SyncStatus::Processing, - "success" => SyncStatus::Success, - "failed" => { - SyncStatus::Failed(error_msg.unwrap_or_else(|| "Unknown error".to_string())) - } - _ => SyncStatus::Pending, - }; - - Ok(Some(sync_status)) - } - None => Ok(None), - } - } -} diff --git a/libs/git/http/mod.rs b/libs/git/http/mod.rs index dc34384..76df9ed 100644 --- a/libs/git/http/mod.rs +++ b/libs/git/http/mod.rs @@ -1,3 +1,4 @@ +use crate::hook::HookService; use actix_web::{App, HttpServer, web}; use config::AppConfig; use db::cache::AppCache; @@ -71,7 +72,16 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> { let app_cache = app_cache?; let redis_pool = app_cache.redis_pool().clone(); - let sync = crate::ssh::ReceiveSyncService::new(redis_pool, logger.clone()); + let http = Arc::new(reqwest::Client::new()); + let hook = HookService::new( + db.clone(), + app_cache.clone(), + redis_pool.clone(), + logger.clone(), + config.clone(), + http, + ); + let sync = crate::ssh::ReceiveSyncService::new(hook); let rate_limiter = Arc::new(rate_limit::RateLimiter::new( rate_limit::RateLimitConfig::default(), diff --git a/libs/git/lib.rs b/libs/git/lib.rs index 4c11430..c5de5ec 100644 --- a/libs/git/lib.rs +++ b/libs/git/lib.rs @@ -36,8 +36,7 @@ pub use diff::types::{ }; pub use domain::GitDomain; pub use error::{GitError, GitResult}; -pub use hook::pool::GitHookPool; -pub use hook::pool::types::{HookTask, PoolConfig, PoolMetrics, TaskType}; +pub use hook::sync::HookMetaDataSync; pub use lfs::types::{LfsConfig, LfsEntry, LfsOid, LfsPointer}; pub use merge::types::{MergeAnalysisResult, MergeOptions, MergePreferenceResult, MergeheadInfo}; pub use reference::types::RefInfo; diff --git a/libs/git/ssh/mod.rs b/libs/git/ssh/mod.rs index 015cfff..bcdebc9 100644 --- a/libs/git/ssh/mod.rs +++ b/libs/git/ssh/mod.rs @@ -1,5 +1,5 @@ use crate::error::GitError; -use crate::hook::pool::types::{HookTask, TaskType}; +use crate::hook::HookService; use anyhow::Context; use base64::Engine; use config::AppConfig; @@ -137,12 +137,22 @@ impl SSHHandle { "SSH server configured with methods: {:?}", config.methods ); let token_service = SshTokenService::new(self.db.clone()); + let http = Arc::new(reqwest::Client::new()); + let hook = crate::hook::HookService::new( + self.db.clone(), + self.cache.clone(), + self.redis_pool.clone(), + self.logger.clone(), + self.app.clone(), + http, + ); let mut server = server::SSHServer::new( self.db.clone(), self.cache.clone(), self.redis_pool.clone(), self.logger.clone(), token_service, + hook, ); // Start the rate limiter cleanup background task so the HashMap @@ -167,60 +177,33 @@ impl SSHHandle { } } +/// Direct sync service — calls HookService::sync_repo inline. +/// K8s StatefulSet HA scheduling ensures exclusive access per repo shard. #[derive(Clone)] pub struct ReceiveSyncService { - pool: RedisPool, - logger: Logger, - /// Redis key prefix for hook task queues, e.g. "{hook}". - redis_prefix: String, + hook: HookService, } impl ReceiveSyncService { - pub fn new(pool: RedisPool, logger: Logger) -> Self { - Self { - pool, - logger, - redis_prefix: "{hook}".to_string(), - } + pub fn new(hook: HookService) -> Self { + Self { hook } } - pub async fn send(&self, task: RepoReceiveSyncTask) { - let hook_task = HookTask { - id: uuid::Uuid::new_v4().to_string(), - repo_id: task.repo_uid.to_string(), - task_type: TaskType::Sync, - payload: serde_json::Value::Null, - created_at: chrono::Utc::now(), - retry_count: 0, - }; - - let task_json = match serde_json::to_string(&hook_task) { - Ok(j) => j, - Err(e) => { - error!(self.logger, "Failed to serialize hook task: {}", e); - return; + /// Execute a full repo sync synchronously. + /// Returns Ok on success, Err on failure. + pub async fn send(&self, task: RepoReceiveSyncTask) -> Result<(), crate::GitError> { + let repo_id = task.repo_uid.to_string(); + slog::info!(self.hook.logger, "starting sync repo_id={}", repo_id); + let res = self.hook.sync_repo(&repo_id).await; + match &res { + Ok(()) => { + slog::info!(self.hook.logger, "sync completed repo_id={}", repo_id); } - }; - - let queue_key = format!("{}:sync", self.redis_prefix); - - let redis = match self.pool.get().await { - Ok(conn) => conn, Err(e) => { - error!(self.logger, "Failed to get Redis connection: {}", e); - return; + slog::error!(self.hook.logger, "sync failed repo_id={} error={}", repo_id, e); } - }; - - let mut conn: deadpool_redis::cluster::Connection = redis; - if let Err(e) = redis::cmd("LPUSH") - .arg(&queue_key) - .arg(&task_json) - .query_async::<()>(&mut conn) - .await - { - error!(self.logger, "{}", format!("Failed to LPUSH sync task repo_id={} error={}", task.repo_uid, e)); } + res } } diff --git a/libs/git/ssh/server.rs b/libs/git/ssh/server.rs index ea87b4e..989b8a6 100644 --- a/libs/git/ssh/server.rs +++ b/libs/git/ssh/server.rs @@ -1,3 +1,4 @@ +use crate::hook::HookService; use crate::ssh::ReceiveSyncService; use crate::ssh::SshTokenService; use crate::ssh::handle::SSHandle; @@ -15,6 +16,7 @@ pub struct SSHServer { pub redis_pool: RedisPool, pub logger: Logger, pub token_service: SshTokenService, + pub hook: HookService, } impl SSHServer { @@ -24,6 +26,7 @@ impl SSHServer { redis_pool: RedisPool, logger: Logger, token_service: SshTokenService, + hook: HookService, ) -> Self { SSHServer { db, @@ -31,6 +34,7 @@ impl SSHServer { redis_pool, logger, token_service, + hook, } } } @@ -48,7 +52,7 @@ impl russh::server::Server for SSHServer { } else { info!(self.logger, "New SSH connection from unknown address"); } - let sync_service = ReceiveSyncService::new(self.redis_pool.clone(), self.logger.clone()); + let sync_service = ReceiveSyncService::new(self.hook.clone()); SSHandle::new( self.db.clone(), self.cache.clone(), diff --git a/scripts/push.js b/scripts/push.js index 7df6ddb..5d9afa9 100644 --- a/scripts/push.js +++ b/scripts/push.js @@ -13,7 +13,7 @@ * DOCKER_PASS - Registry password */ -const { execSync } = require('child_process'); +const {execSync} = require('child_process'); const REGISTRY = process.env.REGISTRY || 'harbor.gitdata.me/gta_team'; const GIT_SHA_SHORT = execSync('git rev-parse --short HEAD', {encoding: 'utf8'}).trim(); @@ -21,15 +21,15 @@ const TAG = process.env.TAG || GIT_SHA_SHORT; const DOCKER_USER = process.env.DOCKER_USER || process.env.HARBOR_USERNAME; const DOCKER_PASS = process.env.DOCKER_PASS || process.env.HARBOR_PASSWORD; -const SERVICES = ['app', 'gitserver', 'email-worker', 'git-hook', 'operator', 'static', 'frontend']; +const SERVICES = ['app', 'gitserver', 'email-worker', 'git-hook', 'operator', 'static']; const args = process.argv.slice(2); const targets = args.length > 0 ? args : SERVICES; if (!DOCKER_USER || !DOCKER_PASS) { - console.error('Error: DOCKER_USER and DOCKER_PASS environment variables are required'); - console.error('Set HARBOR_USERNAME and HARBOR_PASSWORD as alternative'); - process.exit(1); + console.error('Error: DOCKER_USER and DOCKER_PASS environment variables are required'); + console.error('Set HARBOR_USERNAME and HARBOR_PASSWORD as alternative'); + process.exit(1); } console.log(`\n=== Push Configuration ===`); @@ -40,31 +40,31 @@ console.log(`Services: ${targets.join(', ')}\n`); // Login console.log(`==> Logging in to ${REGISTRY}`); try { - execSync(`docker login ${REGISTRY} -u "${DOCKER_USER}" -p "${DOCKER_PASS}"`, { - stdio: 'inherit' - }); + execSync(`docker login ${REGISTRY} -u "${DOCKER_USER}" -p "${DOCKER_PASS}"`, { + stdio: 'inherit' + }); } catch (error) { - console.error('Login failed'); - process.exit(1); + console.error('Login failed'); + process.exit(1); } for (const service of targets) { - if (!SERVICES.includes(service)) { - console.error(`Unknown service: ${service}`); - process.exit(1); - } + if (!SERVICES.includes(service)) { + console.error(`Unknown service: ${service}`); + process.exit(1); + } - const image = `${REGISTRY}/${service}:${TAG}`; + const image = `${REGISTRY}/${service}:${TAG}`; - console.log(`\n==> Pushing ${image}`); + console.log(`\n==> Pushing ${image}`); - try { - execSync(`docker push "${image}"`, { stdio: 'inherit' }); - console.log(` [OK] ${image}`); - } catch (error) { - console.error(` [FAIL] ${service}`); - process.exit(1); - } + try { + execSync(`docker push "${image}"`, {stdio: 'inherit'}); + console.log(` [OK] ${image}`); + } catch (error) { + console.error(` [FAIL] ${service}`); + process.exit(1); + } } console.log(`\n=== Push Complete ===`);