refactor(git): drop hook pool, sync execution is now direct and sequential
- Remove entire pool/ directory (RedisConsumer, CpuMonitor, LogStream, HookTask, TaskType) - Remove Redis distributed lock (acquire_lock/release_lock) — K8s StatefulSet scheduling guarantees exclusive access per repo shard - Remove sync/lock.rs, sync/remote.rs, sync/status.rs (dead code) - Remove hook/event.rs (GitHookEvent was never used) - New HookService exposes sync_repo / fsck_repo / gc_repo directly - ReceiveSyncService now calls HookService inline instead of LPUSH to Redis queue - sync/mod.rs: git2 operations wrapped in spawn_blocking for Send safety (git2 types are not Send — async git2 operations must not cross await points) - scripts/push.js: drop 'frontend' from docker push list (embedded into static binary)
This commit is contained in:
parent
7e42139989
commit
eeb99bf628
@ -1,7 +0,0 @@
|
||||
use models::RepoId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Ord, PartialOrd, PartialEq, Eq)]
|
||||
pub enum GitHookEvent {
|
||||
MetaDataSync(RepoId),
|
||||
}
|
||||
@ -2,13 +2,15 @@ use config::AppConfig;
|
||||
use db::cache::AppCache;
|
||||
use db::database::AppDatabase;
|
||||
use deadpool_redis::cluster::Pool as RedisPool;
|
||||
use models::EntityTrait;
|
||||
use slog::Logger;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::hook::pool::GitHookPool;
|
||||
|
||||
/// Simplified hook service — no queue, no pool.
|
||||
/// K8s StatefulSet HA scheduling ensures only one pod touches a repo at a time.
|
||||
/// Execution is direct and sequential per invocation.
|
||||
#[derive(Clone)]
|
||||
pub struct GitServiceHooks {
|
||||
pub struct HookService {
|
||||
pub(crate) db: AppDatabase,
|
||||
pub(crate) cache: AppCache,
|
||||
pub(crate) redis_pool: RedisPool,
|
||||
@ -17,7 +19,7 @@ pub struct GitServiceHooks {
|
||||
pub(crate) http: Arc<reqwest::Client>,
|
||||
}
|
||||
|
||||
impl GitServiceHooks {
|
||||
impl HookService {
|
||||
pub fn new(
|
||||
db: AppDatabase,
|
||||
cache: AppCache,
|
||||
@ -36,35 +38,92 @@ impl GitServiceHooks {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(
|
||||
self,
|
||||
cancel: tokio_util::sync::CancellationToken,
|
||||
) -> Result<(), crate::GitError> {
|
||||
let pool_config = config::hook::PoolConfig::from_env(&self.config);
|
||||
/// Full sync: refs → commits → tags → LFS → fsck → gc → skills.
|
||||
pub async fn sync_repo(&self, repo_id: &str) -> Result<(), crate::GitError> {
|
||||
let repo_uuid = models::Uuid::parse_str(repo_id)
|
||||
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||
|
||||
let pool = GitHookPool::new(
|
||||
pool_config,
|
||||
self.db,
|
||||
self.cache,
|
||||
self.redis_pool,
|
||||
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
||||
.one(self.db.reader())
|
||||
.await
|
||||
.map_err(crate::GitError::from)?
|
||||
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||
|
||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||
return Err(crate::GitError::NotFound(format!(
|
||||
"storage path does not exist: {}",
|
||||
repo.storage_path
|
||||
)));
|
||||
}
|
||||
|
||||
let sync = crate::hook::sync::HookMetaDataSync::new(
|
||||
self.db.clone(),
|
||||
self.cache.clone(),
|
||||
repo,
|
||||
self.logger.clone(),
|
||||
self.http,
|
||||
)
|
||||
.await?;
|
||||
)?;
|
||||
|
||||
let pool_arc = Arc::new(pool);
|
||||
// No distributed lock needed — K8s StatefulSet scheduling guarantees
|
||||
// that at most one pod processes a given repo shard at any time.
|
||||
sync.sync().await
|
||||
}
|
||||
|
||||
slog::info!(self.logger, "git hook service started");
|
||||
/// Run fsck only (no full sync).
|
||||
pub async fn fsck_repo(&self, repo_id: &str) -> Result<(), crate::GitError> {
|
||||
let repo_uuid = models::Uuid::parse_str(repo_id)
|
||||
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||
|
||||
pool_arc.run(cancel).await;
|
||||
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
||||
.one(self.db.reader())
|
||||
.await
|
||||
.map_err(crate::GitError::from)?
|
||||
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||
|
||||
slog::info!(self.logger, "git hook service stopped");
|
||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||
return Err(crate::GitError::NotFound(format!(
|
||||
"storage path does not exist: {}",
|
||||
repo.storage_path
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
let sync = crate::hook::sync::HookMetaDataSync::new(
|
||||
self.db.clone(),
|
||||
self.cache.clone(),
|
||||
repo,
|
||||
self.logger.clone(),
|
||||
)?;
|
||||
|
||||
sync.fsck_only().await
|
||||
}
|
||||
|
||||
/// Run gc only (no full sync).
|
||||
pub async fn gc_repo(&self, repo_id: &str) -> Result<(), crate::GitError> {
|
||||
let repo_uuid = models::Uuid::parse_str(repo_id)
|
||||
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||
|
||||
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
||||
.one(self.db.reader())
|
||||
.await
|
||||
.map_err(crate::GitError::from)?
|
||||
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||
|
||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||
return Err(crate::GitError::NotFound(format!(
|
||||
"storage path does not exist: {}",
|
||||
repo.storage_path
|
||||
)));
|
||||
}
|
||||
|
||||
let sync = crate::hook::sync::HookMetaDataSync::new(
|
||||
self.db.clone(),
|
||||
self.cache.clone(),
|
||||
repo,
|
||||
self.logger.clone(),
|
||||
)?;
|
||||
|
||||
sync.gc_only().await
|
||||
}
|
||||
}
|
||||
|
||||
pub mod event;
|
||||
pub mod pool;
|
||||
pub mod sync;
|
||||
pub mod webhook_dispatch;
|
||||
|
||||
@ -1,103 +0,0 @@
|
||||
use deadpool_redis::cluster::Pool;
|
||||
use serde::Serialize;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct TaskLog {
|
||||
pub task_id: String,
|
||||
pub repo_id: String,
|
||||
pub worker_id: String,
|
||||
pub level: String,
|
||||
pub message: String,
|
||||
pub timestamp: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
pub struct LogStream {
|
||||
channel: String,
|
||||
worker_id: String,
|
||||
pool: Arc<Pool>,
|
||||
}
|
||||
|
||||
impl Clone for LogStream {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
channel: self.channel.clone(),
|
||||
worker_id: self.worker_id.clone(),
|
||||
pool: self.pool.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LogStream {
|
||||
pub fn new(channel: String, worker_id: String, pool: Arc<Pool>) -> Self {
|
||||
Self {
|
||||
channel,
|
||||
worker_id,
|
||||
pool,
|
||||
}
|
||||
}
|
||||
|
||||
async fn publish_log(&self, log: TaskLog) {
|
||||
let data = match serde_json::to_vec(&log) {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
eprintln!("failed to serialize log: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let redis = match self.pool.get().await {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
eprintln!("redis pool get failed: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut conn: deadpool_redis::cluster::Connection = redis;
|
||||
if let Err(e) = redis::cmd("PUBLISH")
|
||||
.arg(&self.channel)
|
||||
.arg(&data)
|
||||
.query_async::<()>(&mut conn)
|
||||
.await
|
||||
{
|
||||
eprintln!("Redis PUBLISH failed: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn info(&self, task_id: &str, repo_id: &str, message: &str) {
|
||||
self.publish_log(TaskLog {
|
||||
task_id: task_id.to_string(),
|
||||
repo_id: repo_id.to_string(),
|
||||
worker_id: self.worker_id.clone(),
|
||||
level: "info".to_string(),
|
||||
message: message.to_string(),
|
||||
timestamp: chrono::Utc::now(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn error(&self, task_id: &str, repo_id: &str, message: &str) {
|
||||
self.publish_log(TaskLog {
|
||||
task_id: task_id.to_string(),
|
||||
repo_id: repo_id.to_string(),
|
||||
worker_id: self.worker_id.clone(),
|
||||
level: "error".to_string(),
|
||||
message: message.to_string(),
|
||||
timestamp: chrono::Utc::now(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn warn(&self, task_id: &str, repo_id: &str, message: &str) {
|
||||
self.publish_log(TaskLog {
|
||||
task_id: task_id.to_string(),
|
||||
repo_id: repo_id.to_string(),
|
||||
worker_id: self.worker_id.clone(),
|
||||
level: "warn".to_string(),
|
||||
message: message.to_string(),
|
||||
timestamp: chrono::Utc::now(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
@ -1,42 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
use sysinfo::System;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
pub struct CpuMonitor {
|
||||
sys: Arc<RwLock<System>>,
|
||||
}
|
||||
|
||||
impl CpuMonitor {
|
||||
pub fn new() -> Self {
|
||||
let mut sys = System::new();
|
||||
sys.refresh_cpu_all();
|
||||
Self {
|
||||
sys: Arc::new(RwLock::new(sys)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn cpu_usage(&self) -> f32 {
|
||||
let mut sys = self.sys.write().await;
|
||||
sys.refresh_cpu_all();
|
||||
sys.global_cpu_usage()
|
||||
}
|
||||
|
||||
pub async fn can_accept_task(
|
||||
&self,
|
||||
max_concurrent: usize,
|
||||
cpu_threshold: f32,
|
||||
running: usize,
|
||||
) -> bool {
|
||||
if running >= max_concurrent {
|
||||
return false;
|
||||
}
|
||||
let cpu = self.cpu_usage().await;
|
||||
cpu < cpu_threshold
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CpuMonitor {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@ -1,526 +0,0 @@
|
||||
pub mod log;
|
||||
pub mod metrics;
|
||||
pub mod redis;
|
||||
pub mod types;
|
||||
|
||||
use db::cache::AppCache;
|
||||
use db::database::AppDatabase;
|
||||
use deadpool_redis::cluster::Pool as RedisPool;
|
||||
use sea_orm::EntityTrait;
|
||||
use slog::Logger;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::task::{JoinSet, spawn_blocking};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::hook::pool::log::LogStream;
|
||||
use crate::hook::pool::metrics::CpuMonitor;
|
||||
use crate::hook::pool::redis::RedisConsumer;
|
||||
use crate::hook::pool::types::{HookTask, PoolConfig, PoolMetrics, TaskType};
|
||||
use crate::hook::sync::HookMetaDataSync;
|
||||
|
||||
pub struct GitHookPool {
|
||||
config: PoolConfig,
|
||||
db: AppDatabase,
|
||||
cache: AppCache,
|
||||
logger: Logger,
|
||||
cpu_monitor: CpuMonitor,
|
||||
consumer: RedisConsumer,
|
||||
log_stream: LogStream,
|
||||
running_count: Arc<AtomicU64>,
|
||||
total_processed: Arc<AtomicU64>,
|
||||
total_failed: Arc<AtomicU64>,
|
||||
semaphore: Arc<Semaphore>,
|
||||
http: Arc<reqwest::Client>,
|
||||
}
|
||||
|
||||
impl GitHookPool {
|
||||
pub async fn new(
|
||||
config: PoolConfig,
|
||||
db: AppDatabase,
|
||||
cache: AppCache,
|
||||
redis_pool: RedisPool,
|
||||
logger: Logger,
|
||||
http: Arc<reqwest::Client>,
|
||||
) -> Result<Self, crate::GitError> {
|
||||
let consumer = RedisConsumer::new(
|
||||
redis_pool.clone(),
|
||||
config.redis_list_prefix.clone(),
|
||||
config.redis_block_timeout_secs,
|
||||
logger.clone(),
|
||||
);
|
||||
|
||||
let log_stream = LogStream::new(
|
||||
config.redis_log_channel.clone(),
|
||||
config.worker_id.clone(),
|
||||
Arc::new(redis_pool),
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
db,
|
||||
cache,
|
||||
logger,
|
||||
cpu_monitor: CpuMonitor::new(),
|
||||
consumer,
|
||||
log_stream,
|
||||
running_count: Arc::new(AtomicU64::new(0)),
|
||||
total_processed: Arc::new(AtomicU64::new(0)),
|
||||
total_failed: Arc::new(AtomicU64::new(0)),
|
||||
semaphore: Arc::new(Semaphore::new(num_cpus::get())),
|
||||
http,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn run(self: Arc<Self>, cancel: CancellationToken) {
|
||||
let mut join_set = JoinSet::<()>::new();
|
||||
let cancel_clone = cancel.clone();
|
||||
|
||||
// Task types to poll
|
||||
let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc];
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel_clone.cancelled() => {
|
||||
slog::info!(self.logger, "pool received shutdown signal, draining {} tasks", join_set.len());
|
||||
while join_set.join_next().await.is_some() {}
|
||||
slog::info!(self.logger, "pool shutdown complete");
|
||||
break;
|
||||
}
|
||||
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {}
|
||||
}
|
||||
|
||||
let running = self.running_count.load(Ordering::Relaxed) as usize;
|
||||
let can_accept = self
|
||||
.cpu_monitor
|
||||
.can_accept_task(
|
||||
self.config.max_concurrent,
|
||||
self.config.cpu_threshold,
|
||||
running,
|
||||
)
|
||||
.await;
|
||||
|
||||
if !can_accept {
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Poll each task type in round-robin fashion
|
||||
for task_type in &task_types {
|
||||
let result = self.consumer.next(&task_type.to_string()).await;
|
||||
|
||||
let (task, task_json) = match result {
|
||||
Ok(Some(pair)) => pair,
|
||||
Ok(None) => continue, // timeout, try next queue
|
||||
Err(e) => {
|
||||
slog::warn!(self.logger, "failed to dequeue task: {}", e);
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let self_clone = self.clone();
|
||||
|
||||
// Compute queue/work keys for ACK/NAK
|
||||
let queue_key = format!(
|
||||
"{}:{}",
|
||||
self_clone.config.redis_list_prefix,
|
||||
task_type.to_string()
|
||||
);
|
||||
let work_key = format!("{}:work", queue_key);
|
||||
|
||||
let permit = match self_clone.semaphore.clone().acquire_owned().await {
|
||||
Ok(p) => p,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
let self_clone2 = self.clone();
|
||||
self_clone2.running_count.fetch_add(1, Ordering::Relaxed);
|
||||
let logger_clone = self_clone2.logger.clone();
|
||||
let counter_clone = self_clone2.running_count.clone();
|
||||
join_set.spawn(async move {
|
||||
let panicked = match spawn_blocking(move || {
|
||||
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
tokio::runtime::Handle::current().block_on(async {
|
||||
self_clone2
|
||||
.execute_task_body(task, task_json, queue_key, work_key)
|
||||
.await
|
||||
})
|
||||
}))
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(Ok(()))) => false, // spawn_blocking Ok, catch_unwind Ok, body Ok
|
||||
Ok(Ok(Err(_))) => false, // spawn_blocking Ok, catch_unwind Ok, body Err(()) — error handled inside
|
||||
Ok(Err(_)) => true, // spawn_blocking Ok, catch_unwind Err = panic
|
||||
Err(_) => true, // spawn_blocking Err = thread aborted
|
||||
};
|
||||
drop(permit);
|
||||
counter_clone.fetch_sub(1, Ordering::Relaxed);
|
||||
if panicked {
|
||||
slog::error!(logger_clone, "{}", format!("task panicked"));
|
||||
}
|
||||
});
|
||||
|
||||
// Only process one task per loop iteration to avoid overwhelming the pool
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_task_body(
|
||||
&self,
|
||||
task: HookTask,
|
||||
task_json: String,
|
||||
queue_key: String,
|
||||
work_key: String,
|
||||
) -> Result<(), ()> {
|
||||
slog::info!(self.logger, "{}", format!(
|
||||
"task started task_id={} task_type={} repo_id={} worker_id={} retry={}",
|
||||
task.id, task.task_type, task.repo_id, self.config.worker_id, task.retry_count
|
||||
));
|
||||
|
||||
self.log_stream
|
||||
.info(
|
||||
&task.id,
|
||||
&task.repo_id,
|
||||
&format!("task started: {}", task.task_type),
|
||||
)
|
||||
.await;
|
||||
|
||||
let result = match task.task_type {
|
||||
TaskType::Sync => self.run_sync(&task).await,
|
||||
TaskType::Fsck => self.run_fsck(&task).await,
|
||||
TaskType::Gc => self.run_gc(&task).await,
|
||||
};
|
||||
|
||||
let consumer = self.consumer.clone();
|
||||
match result {
|
||||
Ok(()) => {
|
||||
if let Err(e) = consumer.ack(&work_key, &task_json).await {
|
||||
slog::warn!(self.logger, "failed to ack task: {}", e);
|
||||
}
|
||||
self.total_processed.fetch_add(1, Ordering::Relaxed);
|
||||
self.log_stream
|
||||
.info(&task.id, &task.repo_id, "task completed")
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
// Log the actual error so we can diagnose why tasks fail immediately.
|
||||
slog::warn!(self.logger, "{}", format!(
|
||||
"task failed task_id={} task_type={} repo_id={} retry={} error={}",
|
||||
task.id, task.task_type, task.repo_id, task.retry_count, e
|
||||
));
|
||||
// Check retry count and decide whether to requeue or discard.
|
||||
const MAX_RETRIES: u32 = 5;
|
||||
if task.retry_count >= MAX_RETRIES {
|
||||
// Max retries exceeded — discard the task to prevent infinite loop.
|
||||
slog::warn!(self.logger, "{}", format!(
|
||||
"task exhausted retries, discarding task_id={} task_type={} repo_id={} retry_count={} last_error={}",
|
||||
task.id, task.task_type, task.repo_id, task.retry_count, e
|
||||
));
|
||||
if let Err(e) = consumer.ack(&work_key, &task_json).await {
|
||||
slog::warn!(self.logger, "failed to ack discarded task: {}", e);
|
||||
}
|
||||
self.total_failed.fetch_add(1, Ordering::Relaxed);
|
||||
self.log_stream
|
||||
.error(&task.id, &task.repo_id, &format!("task failed after {} retries: {}", task.retry_count, e))
|
||||
.await;
|
||||
} else {
|
||||
// Requeue with incremented retry count.
|
||||
let mut task = task;
|
||||
task.retry_count += 1;
|
||||
let retry_json = serde_json::to_string(&task)
|
||||
.unwrap_or_else(|_| task_json.clone());
|
||||
|
||||
if let Err(e) = consumer.nak_with_retry(&work_key, &queue_key, &task_json, &retry_json).await {
|
||||
slog::warn!(self.logger, "failed to nak task: {}", e);
|
||||
}
|
||||
self.total_failed.fetch_add(1, Ordering::Relaxed);
|
||||
self.log_stream
|
||||
.error(&task.id, &task.repo_id, &format!("task failed: {}", e))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_sync(&self, task: &HookTask) -> Result<(), crate::GitError> {
|
||||
let repo_id = models::Uuid::parse_str(&task.repo_id)
|
||||
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||
|
||||
let repo = models::repos::repo::Entity::find_by_id(repo_id)
|
||||
.one(self.db.reader())
|
||||
.await
|
||||
.map_err(crate::GitError::from)?
|
||||
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||
|
||||
// Fail fast if storage path doesn't exist — avoid blocking spawn_blocking thread pool.
|
||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||
return Err(crate::GitError::NotFound(format!(
|
||||
"storage path does not exist: {}", repo.storage_path
|
||||
)));
|
||||
}
|
||||
|
||||
let db_clone = self.db.clone();
|
||||
let cache_clone = self.cache.clone();
|
||||
let repo_clone = repo.clone();
|
||||
let logger_clone = self.logger.clone();
|
||||
|
||||
// Phase 1: capture before branch/tag tips.
|
||||
let before_tips: (Vec<(String, String)>, Vec<(String, String)>) =
|
||||
tokio::task::spawn_blocking({
|
||||
let db = db_clone.clone();
|
||||
let cache = cache_clone.clone();
|
||||
let repo = repo_clone.clone();
|
||||
let logger = logger_clone.clone();
|
||||
move || {
|
||||
let sync = HookMetaDataSync::new(db, cache, repo, logger)?;
|
||||
Ok::<_, crate::GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??;
|
||||
|
||||
// Phase 2: run sync (async operation).
|
||||
let sync_result: Result<(), crate::GitError> = tokio::task::spawn_blocking({
|
||||
let db = db_clone.clone();
|
||||
let cache = cache_clone.clone();
|
||||
let repo = repo_clone.clone();
|
||||
let logger = logger_clone.clone();
|
||||
move || {
|
||||
let sync = HookMetaDataSync::new(db, cache, repo, logger)?;
|
||||
tokio::runtime::Handle::current().block_on(async { sync.sync().await })
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))?;
|
||||
|
||||
sync_result?;
|
||||
|
||||
// Phase 3: capture after branch/tag tips.
|
||||
let after_tips: (Vec<(String, String)>, Vec<(String, String)>) =
|
||||
tokio::task::spawn_blocking({
|
||||
let db = db_clone.clone();
|
||||
let cache = cache_clone.clone();
|
||||
let repo = repo_clone.clone();
|
||||
let logger = logger_clone.clone();
|
||||
move || {
|
||||
let sync = HookMetaDataSync::new(db, cache, repo, logger)?;
|
||||
Ok::<_, crate::GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??;
|
||||
|
||||
let (before_branch_tips, before_tag_tips) = before_tips;
|
||||
let (after_branch_tips, after_tag_tips) = after_tips;
|
||||
|
||||
let repo_uuid = repo.id.to_string();
|
||||
let repo_name = repo.repo_name.clone();
|
||||
let default_branch = repo.default_branch.clone();
|
||||
|
||||
// Resolve namespace = project.name
|
||||
let namespace = models::projects::Project::find_by_id(repo.project)
|
||||
.one(self.db.reader())
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("failed to fetch project: {}", e)))?
|
||||
.map(|p| p.name)
|
||||
.unwrap_or_default();
|
||||
|
||||
let logger = self.logger.clone();
|
||||
let http = self.http.clone();
|
||||
let db = self.db.clone();
|
||||
|
||||
// Dispatch branch push webhooks.
|
||||
for (branch, after_oid) in &after_branch_tips {
|
||||
let before_oid = before_branch_tips
|
||||
.iter()
|
||||
.find(|(n, _)| n == branch)
|
||||
.map(|(_, o)| o.as_str());
|
||||
let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true);
|
||||
if changed {
|
||||
let before_oid = before_oid.map_or("0", |v| v).to_string();
|
||||
let after = after_oid.clone();
|
||||
let branch_name = branch.clone();
|
||||
|
||||
slog::info!(logger, "{}", format!("detected push on branch branch={} before={} after={}", branch_name, before_oid, after));
|
||||
|
||||
let http = http.clone();
|
||||
let db = db.clone();
|
||||
let logs = logger.clone();
|
||||
let ru = repo_uuid.clone();
|
||||
let ns = namespace.clone();
|
||||
let rn = repo_name.clone();
|
||||
let db_branch = default_branch.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
crate::hook::webhook_dispatch::dispatch_repo_webhooks(
|
||||
&db,
|
||||
&http,
|
||||
&logs,
|
||||
&ru,
|
||||
&ns,
|
||||
&rn,
|
||||
&db_branch,
|
||||
"",
|
||||
"",
|
||||
crate::hook::webhook_dispatch::WebhookEventKind::Push {
|
||||
r#ref: format!("refs/heads/{}", branch_name),
|
||||
before: before_oid,
|
||||
after,
|
||||
commits: vec![],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch tag push webhooks.
|
||||
for (tag, after_oid) in &after_tag_tips {
|
||||
let before_oid = before_tag_tips
|
||||
.iter()
|
||||
.find(|(n, _)| n == tag)
|
||||
.map(|(_, o)| o.as_str());
|
||||
let is_new = before_oid.is_none();
|
||||
let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false);
|
||||
if is_new || was_updated {
|
||||
let before_oid = before_oid.map_or("0", |v| v).to_string();
|
||||
let after = after_oid.clone();
|
||||
let tag_name = tag.clone();
|
||||
|
||||
slog::info!(logger, "{}", format!("detected tag push tag={} before={} after={}", tag_name, before_oid, after));
|
||||
|
||||
let http = http.clone();
|
||||
let db = db.clone();
|
||||
let logs = logger.clone();
|
||||
let ru = repo_uuid.clone();
|
||||
let ns = namespace.clone();
|
||||
let rn = repo_name.clone();
|
||||
let db_branch = default_branch.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
crate::hook::webhook_dispatch::dispatch_repo_webhooks(
|
||||
&db,
|
||||
&http,
|
||||
&logs,
|
||||
&ru,
|
||||
&ns,
|
||||
&rn,
|
||||
&db_branch,
|
||||
"",
|
||||
"",
|
||||
crate::hook::webhook_dispatch::WebhookEventKind::TagPush {
|
||||
r#ref: format!("refs/tags/{}", tag_name),
|
||||
before: before_oid,
|
||||
after,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_fsck(&self, task: &HookTask) -> Result<(), crate::GitError> {
|
||||
let repo_id = models::Uuid::parse_str(&task.repo_id)
|
||||
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||
|
||||
let repo = models::repos::repo::Entity::find_by_id(repo_id)
|
||||
.one(self.db.reader())
|
||||
.await
|
||||
.map_err(crate::GitError::from)?
|
||||
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||
|
||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||
return Err(crate::GitError::NotFound(format!(
|
||||
"storage path does not exist: {}", repo.storage_path
|
||||
)));
|
||||
}
|
||||
|
||||
self.log_stream
|
||||
.info(&task.id, &task.repo_id, "running fsck")
|
||||
.await;
|
||||
|
||||
let db_clone = self.db.clone();
|
||||
let cache_clone = self.cache.clone();
|
||||
let logger_clone = self.logger.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || -> Result<(), crate::GitError> {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
let sync =
|
||||
HookMetaDataSync::new(db_clone.clone(), cache_clone, repo, logger_clone)?;
|
||||
let mut txn = db_clone.begin().await.map_err(crate::GitError::from)?;
|
||||
sync.run_fsck_and_rollback_if_corrupt(&mut txn).await
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_gc(&self, task: &HookTask) -> Result<(), crate::GitError> {
|
||||
let repo_id = models::Uuid::parse_str(&task.repo_id)
|
||||
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||
|
||||
let repo = models::repos::repo::Entity::find_by_id(repo_id)
|
||||
.one(self.db.reader())
|
||||
.await
|
||||
.map_err(crate::GitError::from)?
|
||||
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||
|
||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||
return Err(crate::GitError::NotFound(format!(
|
||||
"storage path does not exist: {}", repo.storage_path
|
||||
)));
|
||||
}
|
||||
|
||||
self.log_stream
|
||||
.info(&task.id, &task.repo_id, "running gc")
|
||||
.await;
|
||||
|
||||
let db_clone = self.db.clone();
|
||||
let cache_clone = self.cache.clone();
|
||||
let logger_clone = self.logger.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || -> Result<(), crate::GitError> {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
let sync = HookMetaDataSync::new(db_clone, cache_clone, repo, logger_clone)?;
|
||||
sync.run_gc().await
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking failed: {}", e)))??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn metrics(&self) -> PoolMetrics {
|
||||
let running = self.running_count.load(Ordering::Relaxed) as usize;
|
||||
PoolMetrics {
|
||||
running,
|
||||
max_concurrent: self.config.max_concurrent,
|
||||
cpu_usage: 0.0,
|
||||
total_processed: self.total_processed.load(Ordering::Relaxed),
|
||||
total_failed: self.total_failed.load(Ordering::Relaxed),
|
||||
can_accept: running < self.config.max_concurrent,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn can_accept_task_sync(&self) -> bool {
|
||||
let running = self.running_count.load(Ordering::Relaxed) as usize;
|
||||
running < self.config.max_concurrent
|
||||
}
|
||||
|
||||
pub fn log_stream(&self) -> &LogStream {
|
||||
&self.log_stream
|
||||
}
|
||||
}
|
||||
@ -1,177 +0,0 @@
|
||||
use crate::error::GitError;
|
||||
use crate::hook::pool::types::HookTask;
|
||||
use deadpool_redis::cluster::Connection as RedisConn;
|
||||
use slog::Logger;
|
||||
|
||||
/// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern.
|
||||
/// Compatible with Redis Cluster via hash tags in key names.
|
||||
pub struct RedisConsumer {
|
||||
pool: deadpool_redis::cluster::Pool,
|
||||
/// Hash-tag-prefixed key prefix, e.g. "{hook}".
|
||||
/// Full queue key: "{hook}:{task_type}"
|
||||
/// Full work key: "{hook}:{task_type}:work"
|
||||
prefix: String,
|
||||
block_timeout_secs: u64,
|
||||
logger: Logger,
|
||||
}
|
||||
|
||||
impl RedisConsumer {
|
||||
pub fn new(
|
||||
pool: deadpool_redis::cluster::Pool,
|
||||
prefix: String,
|
||||
block_timeout_secs: u64,
|
||||
logger: Logger,
|
||||
) -> Self {
|
||||
Self {
|
||||
pool,
|
||||
prefix,
|
||||
block_timeout_secs,
|
||||
logger,
|
||||
}
|
||||
}
|
||||
|
||||
/// Atomically moves a task from the main queue to the work queue using BLMOVE.
|
||||
/// Blocks up to `block_timeout_secs` waiting for a task.
|
||||
///
|
||||
/// Returns `Some((HookTask, task_json))` where `task_json` is the raw JSON string
|
||||
/// needed for LREM on ACK. Returns `None` if the blocking timed out.
|
||||
pub async fn next(&self, task_type: &str) -> Result<Option<(HookTask, String)>, GitError> {
|
||||
let queue_key = format!("{}:{}", self.prefix, task_type);
|
||||
let work_key = format!("{}:{}:work", self.prefix, task_type);
|
||||
|
||||
let redis = self
|
||||
.pool
|
||||
.get()
|
||||
.await
|
||||
.map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?;
|
||||
|
||||
let mut conn: RedisConn = redis;
|
||||
|
||||
// BLMOVE source destination <LEFT|RIGHT> <LEFT|RIGHT> timeout
|
||||
// RIGHT LEFT = BRPOPLPUSH equivalent (pop from right of src, push to left of dst)
|
||||
let task_json: Option<String> = redis::cmd("BLMOVE")
|
||||
.arg(&queue_key)
|
||||
.arg(&work_key)
|
||||
.arg("RIGHT")
|
||||
.arg("LEFT")
|
||||
.arg(self.block_timeout_secs)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::Internal(format!("BLMOVE failed: {}", e)))?;
|
||||
|
||||
match task_json {
|
||||
Some(json) => {
|
||||
match serde_json::from_str::<HookTask>(&json) {
|
||||
Ok(task) => {
|
||||
slog::debug!(self.logger, "task dequeued";
|
||||
"task_id" => %task.id,
|
||||
"task_type" => %task.task_type,
|
||||
"queue" => %queue_key
|
||||
);
|
||||
Ok(Some((task, json)))
|
||||
}
|
||||
Err(e) => {
|
||||
// Malformed task — remove from work queue and discard
|
||||
slog::warn!(self.logger, "malformed task JSON, discarding";
|
||||
"error" => %e,
|
||||
"queue" => %work_key
|
||||
);
|
||||
let _ = self.ack_raw(&work_key, &json).await;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Timed out, no task available
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Acknowledge a task: remove it from the work queue (LREM).
|
||||
pub async fn ack(&self, work_key: &str, task_json: &str) -> Result<(), GitError> {
|
||||
self.ack_raw(work_key, task_json).await
|
||||
}
|
||||
|
||||
async fn ack_raw(&self, work_key: &str, task_json: &str) -> Result<(), GitError> {
|
||||
let redis = self
|
||||
.pool
|
||||
.get()
|
||||
.await
|
||||
.map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?;
|
||||
|
||||
let mut conn: RedisConn = redis;
|
||||
|
||||
let _: i64 = redis::cmd("LREM")
|
||||
.arg(work_key)
|
||||
.arg(-1) // remove all occurrences
|
||||
.arg(task_json)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::Internal(format!("LREM failed: {}", e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Negative acknowledge (retry): remove from work queue and push back to main queue.
|
||||
pub async fn nak(
|
||||
&self,
|
||||
work_key: &str,
|
||||
queue_key: &str,
|
||||
task_json: &str,
|
||||
) -> Result<(), GitError> {
|
||||
self.nak_with_retry(work_key, queue_key, task_json, task_json).await
|
||||
}
|
||||
|
||||
/// Negative acknowledge with a different (updated) task JSON — used to
|
||||
/// requeue with an incremented retry_count.
|
||||
pub async fn nak_with_retry(
|
||||
&self,
|
||||
work_key: &str,
|
||||
queue_key: &str,
|
||||
old_task_json: &str,
|
||||
new_task_json: &str,
|
||||
) -> Result<(), GitError> {
|
||||
// Remove the old entry from work queue
|
||||
self.ack_raw(work_key, old_task_json).await?;
|
||||
|
||||
// Push the updated entry back to main queue for retry
|
||||
let redis = self
|
||||
.pool
|
||||
.get()
|
||||
.await
|
||||
.map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?;
|
||||
|
||||
let mut conn: RedisConn = redis;
|
||||
|
||||
let _: i64 = redis::cmd("LPUSH")
|
||||
.arg(queue_key)
|
||||
.arg(new_task_json)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::Internal(format!("LPUSH retry failed: {}", e)))?;
|
||||
|
||||
slog::warn!(self.logger, "{}", format!("task nack'd and requeued queue={}", queue_key));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn pool(&self) -> &deadpool_redis::cluster::Pool {
|
||||
&self.pool
|
||||
}
|
||||
|
||||
pub fn prefix(&self) -> &str {
|
||||
&self.prefix
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for RedisConsumer {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
pool: self.pool.clone(),
|
||||
prefix: self.prefix.clone(),
|
||||
block_timeout_secs: self.block_timeout_secs,
|
||||
logger: self.logger.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,44 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub use config::hook::PoolConfig;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct HookTask {
|
||||
pub id: String,
|
||||
pub repo_id: String,
|
||||
pub task_type: TaskType,
|
||||
pub payload: serde_json::Value,
|
||||
pub created_at: chrono::DateTime<chrono::Utc>,
|
||||
/// Number of times this task has been retried after a failure.
|
||||
/// When >= MAX_TASK_RETRIES, the task is discarded instead of requeued.
|
||||
#[serde(default)]
|
||||
pub retry_count: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TaskType {
|
||||
Sync,
|
||||
Fsck,
|
||||
Gc,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TaskType {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TaskType::Sync => write!(f, "sync"),
|
||||
TaskType::Fsck => write!(f, "fsck"),
|
||||
TaskType::Gc => write!(f, "gc"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PoolMetrics {
|
||||
pub running: usize,
|
||||
pub max_concurrent: usize,
|
||||
pub cpu_usage: f32,
|
||||
pub total_processed: u64,
|
||||
pub total_failed: u64,
|
||||
pub can_accept: bool,
|
||||
}
|
||||
@ -1,63 +0,0 @@
|
||||
use crate::GitError;
|
||||
use crate::hook::sync::HookMetaDataSync;
|
||||
|
||||
impl HookMetaDataSync {
|
||||
const LOCK_TTL_SECS: u64 = 60;
|
||||
|
||||
pub async fn acquire_lock(&self) -> Result<String, GitError> {
|
||||
let lock_key = format!("git:repo:lock:{}", self.repo.id);
|
||||
let lock_value = format!("{}:{}", uuid::Uuid::new_v4(), std::process::id());
|
||||
|
||||
let mut conn = self
|
||||
.cache
|
||||
.conn()
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?;
|
||||
|
||||
let result: bool = redis::cmd("SET")
|
||||
.arg(&lock_key)
|
||||
.arg(&lock_value)
|
||||
.arg("NX")
|
||||
.arg("EX")
|
||||
.arg(Self::LOCK_TTL_SECS)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to acquire lock: {}", e)))?;
|
||||
|
||||
if result {
|
||||
Ok(lock_value)
|
||||
} else {
|
||||
Err(GitError::Locked(format!(
|
||||
"repository {} is locked by another process",
|
||||
self.repo.id
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn release_lock(&self, lock_value: &str) -> Result<(), GitError> {
|
||||
let lock_key = format!("git:repo:lock:{}", self.repo.id);
|
||||
|
||||
let mut conn = self
|
||||
.cache
|
||||
.conn()
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?;
|
||||
|
||||
let script = r#"
|
||||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("del", KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
"#;
|
||||
|
||||
let _: i32 = redis::Script::new(script)
|
||||
.key(&lock_key)
|
||||
.arg(lock_value)
|
||||
.invoke_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to release lock: {}", e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -3,8 +3,6 @@ pub mod commit;
|
||||
pub mod fsck;
|
||||
pub mod gc;
|
||||
pub mod lfs;
|
||||
pub mod lock;
|
||||
pub mod status;
|
||||
pub mod tag;
|
||||
|
||||
use db::cache::AppCache;
|
||||
@ -159,41 +157,97 @@ impl HookMetaDataSync {
|
||||
})
|
||||
}
|
||||
|
||||
/// Full sync: refs → commits → tags → LFS → fsck → gc → skills.
|
||||
/// No distributed lock — K8s StatefulSet scheduling guarantees exclusive access.
|
||||
pub async fn sync(&self) -> Result<(), crate::GitError> {
|
||||
let lock_value = self.acquire_lock().await?;
|
||||
// All git2 operations must run on a blocking thread since git2 types are not Send.
|
||||
let db = self.db.clone();
|
||||
let cache = self.cache.clone();
|
||||
let repo = self.repo.clone();
|
||||
let logger = self.logger.clone();
|
||||
|
||||
let res = self.sync_internal().await;
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
let sync = Self::new(db, cache, repo, logger)?;
|
||||
let out = sync.sync_full().await;
|
||||
if let Err(ref e) = out {
|
||||
slog::error!(sync.logger, "sync failed: {}", e);
|
||||
}
|
||||
out
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??;
|
||||
|
||||
if let Err(ref e) = res {
|
||||
slog::error!(self.logger, "sync failed: {}", e);
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
if let Err(release_err) = self.release_lock(&lock_value).await {
|
||||
slog::error!(self.logger, "failed to release lock: {}", release_err);
|
||||
}
|
||||
/// Run fsck only (refs snapshot + git fsck + rollback on corruption).
|
||||
pub async fn fsck_only(&self) -> Result<(), crate::GitError> {
|
||||
let db = self.db.clone();
|
||||
let cache = self.cache.clone();
|
||||
let repo = self.repo.clone();
|
||||
let logger = self.logger.clone();
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
async fn sync_internal(&self) -> Result<(), crate::GitError> {
|
||||
let mut txn =
|
||||
self.db.begin().await.map_err(|e| {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
let sync = Self::new(db, cache, repo, logger)?;
|
||||
let mut txn = sync.db.begin().await.map_err(|e| {
|
||||
crate::GitError::IoError(format!("failed to begin transaction: {}", e))
|
||||
})?;
|
||||
sync.run_fsck_and_rollback_if_corrupt(&mut txn).await?;
|
||||
txn.commit().await.map_err(|e| {
|
||||
crate::GitError::IoError(format!("failed to commit transaction: {}", e))
|
||||
})?;
|
||||
Ok::<(), crate::GitError>(())
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run gc only.
|
||||
pub async fn gc_only(&self) -> Result<(), crate::GitError> {
|
||||
let db = self.db.clone();
|
||||
let cache = self.cache.clone();
|
||||
let repo = self.repo.clone();
|
||||
let logger = self.logger.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
let sync = Self::new(db, cache, repo, logger)?;
|
||||
sync.run_gc().await
|
||||
})
|
||||
})
|
||||
.await
|
||||
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Full sync pipeline inside a single DB transaction.
|
||||
/// On fsck failure, refs are rolled back and an error is returned.
|
||||
async fn sync_full(&self) -> Result<(), crate::GitError> {
|
||||
let mut txn = self
|
||||
.db
|
||||
.begin()
|
||||
.await
|
||||
.map_err(|e| crate::GitError::IoError(format!("failed to begin transaction: {}", e)))?;
|
||||
|
||||
self.sync_refs(&mut txn).await?;
|
||||
self.sync_commits(&mut txn).await?;
|
||||
self.sync_tags(&mut txn).await?;
|
||||
self.sync_lfs_objects(&mut txn).await?;
|
||||
|
||||
self.run_fsck_and_rollback_if_corrupt(&mut txn).await?;
|
||||
self.run_fsck_and_rollback_if_corrupt(&mut txn)
|
||||
.await?;
|
||||
|
||||
txn.commit().await.map_err(|e| {
|
||||
crate::GitError::IoError(format!("failed to commit transaction: {}", e))
|
||||
})?;
|
||||
|
||||
self.run_gc().await?;
|
||||
|
||||
self.sync_skills().await;
|
||||
|
||||
Ok(())
|
||||
@ -208,7 +262,6 @@ impl HookMetaDataSync {
|
||||
if let Ok(r) = ref_result {
|
||||
if r.is_branch() && !r.is_remote() {
|
||||
if let Some(name) = r.name() {
|
||||
// name is like "refs/heads/main" -> extract "main"
|
||||
let branch = name.strip_prefix("refs/heads/").unwrap_or(name);
|
||||
if let Some(target) = r.target() {
|
||||
tips.push((branch.to_string(), target.to_string()));
|
||||
@ -230,7 +283,6 @@ impl HookMetaDataSync {
|
||||
if let Ok(r) = ref_result {
|
||||
if r.is_tag() {
|
||||
if let Some(name) = r.name() {
|
||||
// name is like "refs/tags/v1.0" -> extract "v1.0"
|
||||
let tag = name.strip_prefix("refs/tags/").unwrap_or(name);
|
||||
if let Some(target) = r.target() {
|
||||
tips.push((tag.to_string(), target.to_string()));
|
||||
@ -244,8 +296,7 @@ impl HookMetaDataSync {
|
||||
}
|
||||
|
||||
/// Scan the repository for `SKILL.md` files and sync skills to the project.
|
||||
///
|
||||
/// This is a best-effort operation — failures are logged but do not fail the sync.
|
||||
/// Best-effort — failures are logged but do not fail the sync.
|
||||
pub async fn sync_skills(&self) {
|
||||
let project_uid = self.repo.project;
|
||||
|
||||
@ -254,13 +305,15 @@ impl HookMetaDataSync {
|
||||
None => return,
|
||||
};
|
||||
|
||||
// Get current HEAD commit SHA for attribution
|
||||
let commit_sha = self.domain.repo().head().ok()
|
||||
let commit_sha = self
|
||||
.domain
|
||||
.repo()
|
||||
.head()
|
||||
.ok()
|
||||
.and_then(|h| h.target())
|
||||
.map(|oid| oid.to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
// Discover skills from the filesystem
|
||||
let discovered = match scan_skills_from_dir(repo_root, &self.repo.id, &commit_sha) {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
@ -278,7 +331,6 @@ impl HookMetaDataSync {
|
||||
let mut updated = 0i64;
|
||||
let mut removed = 0i64;
|
||||
|
||||
// Collect existing repo-sourced skills for this repo
|
||||
let existing: Vec<_> = match SkillEntity::find()
|
||||
.filter(SkillCol::ProjectUuid.eq(project_uid))
|
||||
.filter(SkillCol::Source.eq("repo"))
|
||||
@ -344,7 +396,6 @@ impl HookMetaDataSync {
|
||||
}
|
||||
}
|
||||
|
||||
// Remove skills no longer in the repo
|
||||
for (slug, old_skill) in existing_by_slug {
|
||||
if !seen_slugs.contains(&slug) {
|
||||
if SkillEntity::delete_by_id(old_skill.id).exec(&self.db).await.is_ok() {
|
||||
|
||||
@ -1,98 +0,0 @@
|
||||
use crate::GitError;
|
||||
use crate::hook::sync::HookMetaDataSync;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SyncStatus {
|
||||
Pending,
|
||||
Processing,
|
||||
Success,
|
||||
Failed(String),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for SyncStatus {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
SyncStatus::Pending => write!(f, "pending"),
|
||||
SyncStatus::Processing => write!(f, "processing"),
|
||||
SyncStatus::Success => write!(f, "success"),
|
||||
SyncStatus::Failed(_) => write!(f, "failed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HookMetaDataSync {
|
||||
const STATUS_TTL_SECS: u64 = 86400;
|
||||
|
||||
pub async fn update_sync_status(&self, status: SyncStatus) -> Result<(), GitError> {
|
||||
let key = format!("git:repo:sync_status:{}", self.repo.id);
|
||||
let status_str = status.to_string();
|
||||
|
||||
let mut conn = self
|
||||
.cache
|
||||
.conn()
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?;
|
||||
|
||||
let _: () = redis::cmd("SETEX")
|
||||
.arg(&key)
|
||||
.arg(Self::STATUS_TTL_SECS)
|
||||
.arg(&status_str)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to set sync status: {}", e)))?;
|
||||
|
||||
if let SyncStatus::Failed(ref error_msg) = status {
|
||||
let error_key = format!("git:repo:sync_error:{}", self.repo.id);
|
||||
let _: () = redis::cmd("SETEX")
|
||||
.arg(&error_key)
|
||||
.arg(Self::STATUS_TTL_SECS)
|
||||
.arg(error_msg)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to set sync error: {}", e)))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_sync_status(&self) -> Result<Option<SyncStatus>, GitError> {
|
||||
let key = format!("git:repo:sync_status:{}", self.repo.id);
|
||||
let error_key = format!("git:repo:sync_error:{}", self.repo.id);
|
||||
|
||||
let mut conn = self
|
||||
.cache
|
||||
.conn()
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?;
|
||||
|
||||
let status_str: Option<String> =
|
||||
redis::cmd("GET")
|
||||
.arg(&key)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to get sync status: {}", e)))?;
|
||||
|
||||
match status_str {
|
||||
Some(status) => {
|
||||
let error_msg: Option<String> = redis::cmd("GET")
|
||||
.arg(&error_key)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to get sync error: {}", e)))?;
|
||||
|
||||
let sync_status = match status.as_str() {
|
||||
"pending" => SyncStatus::Pending,
|
||||
"processing" => SyncStatus::Processing,
|
||||
"success" => SyncStatus::Success,
|
||||
"failed" => {
|
||||
SyncStatus::Failed(error_msg.unwrap_or_else(|| "Unknown error".to_string()))
|
||||
}
|
||||
_ => SyncStatus::Pending,
|
||||
};
|
||||
|
||||
Ok(Some(sync_status))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,3 +1,4 @@
|
||||
use crate::hook::HookService;
|
||||
use actix_web::{App, HttpServer, web};
|
||||
use config::AppConfig;
|
||||
use db::cache::AppCache;
|
||||
@ -71,7 +72,16 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> {
|
||||
let app_cache = app_cache?;
|
||||
|
||||
let redis_pool = app_cache.redis_pool().clone();
|
||||
let sync = crate::ssh::ReceiveSyncService::new(redis_pool, logger.clone());
|
||||
let http = Arc::new(reqwest::Client::new());
|
||||
let hook = HookService::new(
|
||||
db.clone(),
|
||||
app_cache.clone(),
|
||||
redis_pool.clone(),
|
||||
logger.clone(),
|
||||
config.clone(),
|
||||
http,
|
||||
);
|
||||
let sync = crate::ssh::ReceiveSyncService::new(hook);
|
||||
|
||||
let rate_limiter = Arc::new(rate_limit::RateLimiter::new(
|
||||
rate_limit::RateLimitConfig::default(),
|
||||
|
||||
@ -36,8 +36,7 @@ pub use diff::types::{
|
||||
};
|
||||
pub use domain::GitDomain;
|
||||
pub use error::{GitError, GitResult};
|
||||
pub use hook::pool::GitHookPool;
|
||||
pub use hook::pool::types::{HookTask, PoolConfig, PoolMetrics, TaskType};
|
||||
pub use hook::sync::HookMetaDataSync;
|
||||
pub use lfs::types::{LfsConfig, LfsEntry, LfsOid, LfsPointer};
|
||||
pub use merge::types::{MergeAnalysisResult, MergeOptions, MergePreferenceResult, MergeheadInfo};
|
||||
pub use reference::types::RefInfo;
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
use crate::error::GitError;
|
||||
use crate::hook::pool::types::{HookTask, TaskType};
|
||||
use crate::hook::HookService;
|
||||
use anyhow::Context;
|
||||
use base64::Engine;
|
||||
use config::AppConfig;
|
||||
@ -137,12 +137,22 @@ impl SSHHandle {
|
||||
"SSH server configured with methods: {:?}", config.methods
|
||||
);
|
||||
let token_service = SshTokenService::new(self.db.clone());
|
||||
let http = Arc::new(reqwest::Client::new());
|
||||
let hook = crate::hook::HookService::new(
|
||||
self.db.clone(),
|
||||
self.cache.clone(),
|
||||
self.redis_pool.clone(),
|
||||
self.logger.clone(),
|
||||
self.app.clone(),
|
||||
http,
|
||||
);
|
||||
let mut server = server::SSHServer::new(
|
||||
self.db.clone(),
|
||||
self.cache.clone(),
|
||||
self.redis_pool.clone(),
|
||||
self.logger.clone(),
|
||||
token_service,
|
||||
hook,
|
||||
);
|
||||
|
||||
// Start the rate limiter cleanup background task so the HashMap
|
||||
@ -167,60 +177,33 @@ impl SSHHandle {
|
||||
}
|
||||
}
|
||||
|
||||
/// Direct sync service — calls HookService::sync_repo inline.
|
||||
/// K8s StatefulSet HA scheduling ensures exclusive access per repo shard.
|
||||
#[derive(Clone)]
|
||||
pub struct ReceiveSyncService {
|
||||
pool: RedisPool,
|
||||
logger: Logger,
|
||||
/// Redis key prefix for hook task queues, e.g. "{hook}".
|
||||
redis_prefix: String,
|
||||
hook: HookService,
|
||||
}
|
||||
|
||||
impl ReceiveSyncService {
|
||||
pub fn new(pool: RedisPool, logger: Logger) -> Self {
|
||||
Self {
|
||||
pool,
|
||||
logger,
|
||||
redis_prefix: "{hook}".to_string(),
|
||||
}
|
||||
pub fn new(hook: HookService) -> Self {
|
||||
Self { hook }
|
||||
}
|
||||
|
||||
pub async fn send(&self, task: RepoReceiveSyncTask) {
|
||||
let hook_task = HookTask {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
repo_id: task.repo_uid.to_string(),
|
||||
task_type: TaskType::Sync,
|
||||
payload: serde_json::Value::Null,
|
||||
created_at: chrono::Utc::now(),
|
||||
retry_count: 0,
|
||||
};
|
||||
|
||||
let task_json = match serde_json::to_string(&hook_task) {
|
||||
Ok(j) => j,
|
||||
/// Execute a full repo sync synchronously.
|
||||
/// Returns Ok on success, Err on failure.
|
||||
pub async fn send(&self, task: RepoReceiveSyncTask) -> Result<(), crate::GitError> {
|
||||
let repo_id = task.repo_uid.to_string();
|
||||
slog::info!(self.hook.logger, "starting sync repo_id={}", repo_id);
|
||||
let res = self.hook.sync_repo(&repo_id).await;
|
||||
match &res {
|
||||
Ok(()) => {
|
||||
slog::info!(self.hook.logger, "sync completed repo_id={}", repo_id);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(self.logger, "Failed to serialize hook task: {}", e);
|
||||
return;
|
||||
slog::error!(self.hook.logger, "sync failed repo_id={} error={}", repo_id, e);
|
||||
}
|
||||
};
|
||||
|
||||
let queue_key = format!("{}:sync", self.redis_prefix);
|
||||
|
||||
let redis = match self.pool.get().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
error!(self.logger, "Failed to get Redis connection: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut conn: deadpool_redis::cluster::Connection = redis;
|
||||
if let Err(e) = redis::cmd("LPUSH")
|
||||
.arg(&queue_key)
|
||||
.arg(&task_json)
|
||||
.query_async::<()>(&mut conn)
|
||||
.await
|
||||
{
|
||||
error!(self.logger, "{}", format!("Failed to LPUSH sync task repo_id={} error={}", task.repo_uid, e));
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use crate::hook::HookService;
|
||||
use crate::ssh::ReceiveSyncService;
|
||||
use crate::ssh::SshTokenService;
|
||||
use crate::ssh::handle::SSHandle;
|
||||
@ -15,6 +16,7 @@ pub struct SSHServer {
|
||||
pub redis_pool: RedisPool,
|
||||
pub logger: Logger,
|
||||
pub token_service: SshTokenService,
|
||||
pub hook: HookService,
|
||||
}
|
||||
|
||||
impl SSHServer {
|
||||
@ -24,6 +26,7 @@ impl SSHServer {
|
||||
redis_pool: RedisPool,
|
||||
logger: Logger,
|
||||
token_service: SshTokenService,
|
||||
hook: HookService,
|
||||
) -> Self {
|
||||
SSHServer {
|
||||
db,
|
||||
@ -31,6 +34,7 @@ impl SSHServer {
|
||||
redis_pool,
|
||||
logger,
|
||||
token_service,
|
||||
hook,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -48,7 +52,7 @@ impl russh::server::Server for SSHServer {
|
||||
} else {
|
||||
info!(self.logger, "New SSH connection from unknown address");
|
||||
}
|
||||
let sync_service = ReceiveSyncService::new(self.redis_pool.clone(), self.logger.clone());
|
||||
let sync_service = ReceiveSyncService::new(self.hook.clone());
|
||||
SSHandle::new(
|
||||
self.db.clone(),
|
||||
self.cache.clone(),
|
||||
|
||||
@ -21,7 +21,7 @@ const TAG = process.env.TAG || GIT_SHA_SHORT;
|
||||
const DOCKER_USER = process.env.DOCKER_USER || process.env.HARBOR_USERNAME;
|
||||
const DOCKER_PASS = process.env.DOCKER_PASS || process.env.HARBOR_PASSWORD;
|
||||
|
||||
const SERVICES = ['app', 'gitserver', 'email-worker', 'git-hook', 'operator', 'static', 'frontend'];
|
||||
const SERVICES = ['app', 'gitserver', 'email-worker', 'git-hook', 'operator', 'static'];
|
||||
|
||||
const args = process.argv.slice(2);
|
||||
const targets = args.length > 0 ? args : SERVICES;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user