gitdataai/libs/git/hook/pool/worker.rs
ZhenYi ef61b193c4 fix(git/hook): refine Redis queue worker, remove dead code, fix warnings
- pool/mod.rs: pass shared http_client Arc to HookWorker
- worker.rs: remove double-locking (sync() manages its own lock),
  await all webhook handles before returning, share http_client,
  hoist namespace query out of loop
- redis.rs: atomic NAK via Lua script (LREM + LPUSH in one eval)
- sync/lock.rs: increase LOCK_TTL from 60s to 300s for large repos
- sync/mod.rs: split sync/sync_work, fsck_only/fsck_work, gc_only/gc_work
  so callers can choose locked vs lock-free path; run_gc + sync_skills
  outside the DB transaction
- hook/mod.rs: remove unused http field from HookService
- ssh/mod.rs, http/mod.rs: remove unused HookService/http imports
2026-04-17 13:05:07 +08:00

397 lines
15 KiB
Rust

use crate::error::GitError;
use crate::hook::pool::redis::RedisConsumer;
use crate::hook::pool::types::{HookTask, TaskType};
use crate::hook::sync::HookMetaDataSync;
use db::cache::AppCache;
use db::database::AppDatabase;
use models::EntityTrait;
use slog::Logger;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
/// Single-threaded worker that sequentially consumes tasks from Redis queues.
/// K8s can scale replicas for concurrency — each replica runs one worker.
/// Per-repo Redis locking is managed inside HookMetaDataSync methods.
#[derive(Clone)]
pub struct HookWorker {
    /// Database handle; the reader side is used for repo/project lookups.
    db: AppDatabase,
    /// Application cache, forwarded to `HookMetaDataSync`.
    cache: AppCache,
    /// Structured logger for worker lifecycle and task events.
    logger: Logger,
    /// Redis queue consumer (next/ack/nak and key-prefix source).
    consumer: RedisConsumer,
    /// Shared HTTP client (Arc) so webhook dispatches reuse one connection pool.
    http_client: Arc<reqwest::Client>,
}
impl HookWorker {
pub fn new(
db: AppDatabase,
cache: AppCache,
logger: Logger,
consumer: RedisConsumer,
http_client: Arc<reqwest::Client>,
) -> Self {
Self {
db,
cache,
logger,
consumer,
http_client,
}
}
/// Run the worker loop. Blocks until cancelled.
pub async fn run(&self, cancel: CancellationToken) {
slog::info!(self.logger, "hook worker started");
let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc];
let poll_interval = Duration::from_millis(500);
loop {
tokio::select! {
_ = cancel.cancelled() => {
slog::info!(self.logger, "hook worker shutdown signal received");
break;
}
_ = tokio::time::sleep(poll_interval) => {}
}
for task_type in &task_types {
let result = self.consumer.next(&task_type.to_string()).await;
let (task, task_json) = match result {
Ok(Some(pair)) => pair,
Ok(None) => continue,
Err(e) => {
slog::warn!(self.logger, "failed to dequeue task: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
break;
}
};
let queue_key = format!("{}:{}", self.consumer.prefix(), task_type);
let work_key = format!("{}:work", queue_key);
self.process_task(&task, &task_json, &work_key, &queue_key)
.await;
}
}
slog::info!(self.logger, "hook worker stopped");
}
async fn process_task(
&self,
task: &HookTask,
task_json: &str,
work_key: &str,
queue_key: &str,
) {
slog::info!(self.logger, "task started task_id={} task_type={} repo_id={}",
task.id, task.task_type, task.repo_id);
let result = match task.task_type {
TaskType::Sync => self.run_sync(&task.repo_id).await,
TaskType::Fsck => self.run_fsck(&task.repo_id).await,
TaskType::Gc => self.run_gc(&task.repo_id).await,
};
match result {
Ok(()) => {
if let Err(e) = self.consumer.ack(work_key, task_json).await {
slog::warn!(self.logger, "failed to ack task: {}", e);
}
slog::info!(self.logger, "task completed task_id={}", task.id);
}
Err(e) => {
let is_locked = matches!(e, crate::GitError::Locked(_));
if is_locked {
// Another worker holds the lock — requeue without counting as retry.
slog::info!(self.logger, "repo locked by another worker, requeueing task_id={}", task.id);
if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await {
slog::warn!(self.logger, "failed to requeue locked task: {}", nak_err);
}
} else {
slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}",
task.id, task.task_type, task.repo_id, e);
const MAX_RETRIES: u32 = 5;
if task.retry_count >= MAX_RETRIES {
slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}",
task.id, task.retry_count);
let _ = self.consumer.ack(work_key, task_json).await;
} else {
let mut task = task.clone();
task.retry_count += 1;
let retry_json =
serde_json::to_string(&task).unwrap_or_else(|_| task_json.to_string());
let _ = self
.consumer
.nak_with_retry(work_key, queue_key, task_json, &retry_json)
.await;
}
}
}
}
}
async fn run_sync(&self, repo_id: &str) -> Result<(), GitError> {
let repo_uuid = models::Uuid::parse_str(repo_id)
.map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
.one(self.db.reader())
.await
.map_err(GitError::from)?
.ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
if !std::path::Path::new(&repo.storage_path).exists() {
return Err(GitError::NotFound(format!(
"storage path does not exist: {}",
repo.storage_path
)));
}
// Capture before tips for webhook diff
let before_tips = tokio::task::spawn_blocking({
let db = self.db.clone();
let cache = self.cache.clone();
let logger = self.logger.clone();
let repo = repo.clone();
move || {
let sync = HookMetaDataSync::new(db, cache, repo, logger)
.map_err(|e| GitError::Internal(e.to_string()))?;
Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
}
})
.await
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
.map_err(GitError::from)?;
// Run full sync (internally acquires/releases per-repo lock)
let db = self.db.clone();
let cache = self.cache.clone();
let logger = self.logger.clone();
let repo_clone = repo.clone();
tokio::task::spawn_blocking(move || {
let result = tokio::runtime::Handle::current().block_on(async {
let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone(), logger.clone())?;
sync.sync().await
});
match result {
Ok(()) => Ok::<(), GitError>(()),
Err(e) => Err(GitError::Internal(e.to_string())),
}
})
.await
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
.map_err(GitError::from)?;
// Capture after tips and dispatch webhooks
let after_tips = tokio::task::spawn_blocking({
let db = self.db.clone();
let cache = self.cache.clone();
let logger = self.logger.clone();
let repo = repo.clone();
move || {
let sync = HookMetaDataSync::new(db, cache, repo, logger)
.map_err(|e| GitError::Internal(e.to_string()))?;
Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
}
})
.await
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
.map_err(GitError::from)?;
let (before_branch_tips, before_tag_tips) = before_tips;
let (after_branch_tips, after_tag_tips) = after_tips;
let project = repo.project;
// Resolve namespace once outside the loop
let namespace = models::projects::Project::find_by_id(project)
.one(self.db.reader())
.await
.ok()
.flatten()
.map(|p| p.name)
.unwrap_or_default();
let repo_id_str = repo.id.to_string();
let repo_name = repo.repo_name.clone();
let default_branch = repo.default_branch.clone();
let http_client = self.http_client.clone();
let db = self.db.clone();
let logger = self.logger.clone();
// Dispatch branch webhooks and collect handles
let mut handles = Vec::new();
for (branch, after_oid) in after_branch_tips {
let before_oid = before_branch_tips
.iter()
.find(|(n, _)| n == &branch)
.map(|(_, o)| o.as_str());
let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true);
if changed {
let before_oid = before_oid.map_or("0", |v| v).to_string();
let branch_name = branch.clone();
let h = tokio::spawn({
let http_client = http_client.clone();
let db = db.clone();
let logger = logger.clone();
let repo_id_str = repo_id_str.clone();
let namespace = namespace.clone();
let repo_name = repo_name.clone();
let default_branch = default_branch.clone();
async move {
crate::hook::webhook_dispatch::dispatch_repo_webhooks(
&db,
&http_client,
&logger,
&repo_id_str,
&namespace,
&repo_name,
&default_branch,
"",
"",
crate::hook::webhook_dispatch::WebhookEventKind::Push {
r#ref: format!("refs/heads/{}", branch_name),
before: before_oid,
after: after_oid,
commits: vec![],
},
)
.await;
}
});
handles.push(h);
}
}
// Dispatch tag webhooks and collect handles
for (tag, after_oid) in after_tag_tips {
let before_oid = before_tag_tips
.iter()
.find(|(n, _)| n == &tag)
.map(|(_, o)| o.as_str());
let is_new = before_oid.is_none();
let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false);
if is_new || was_updated {
let before_oid = before_oid.map_or("0", |v| v).to_string();
let tag_name = tag.clone();
let h = tokio::spawn({
let http_client = http_client.clone();
let db = db.clone();
let logger = logger.clone();
let repo_id_str = repo_id_str.clone();
let namespace = namespace.clone();
let repo_name = repo_name.clone();
let default_branch = default_branch.clone();
async move {
crate::hook::webhook_dispatch::dispatch_repo_webhooks(
&db,
&http_client,
&logger,
&repo_id_str,
&namespace,
&repo_name,
&default_branch,
"",
"",
crate::hook::webhook_dispatch::WebhookEventKind::TagPush {
r#ref: format!("refs/tags/{}", tag_name),
before: before_oid,
after: after_oid,
},
)
.await;
}
});
handles.push(h);
}
}
// Wait for all webhooks to complete before returning
for h in handles {
let _ = h.await;
}
Ok(())
}
async fn run_fsck(&self, repo_id: &str) -> Result<(), GitError> {
let repo_uuid = models::Uuid::parse_str(repo_id)
.map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
.one(self.db.reader())
.await
.map_err(GitError::from)?
.ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
if !std::path::Path::new(&repo.storage_path).exists() {
return Err(GitError::NotFound(format!(
"storage path does not exist: {}",
repo.storage_path
)));
}
let db = self.db.clone();
let cache = self.cache.clone();
let logger = self.logger.clone();
tokio::task::spawn_blocking(move || {
let result = tokio::runtime::Handle::current().block_on(async {
let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo.clone(), logger.clone())?;
sync.fsck_only().await
});
match result {
Ok(()) => Ok::<(), GitError>(()),
Err(e) => Err(GitError::Internal(e.to_string())),
}
})
.await
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
.map_err(GitError::from)?;
Ok(())
}
async fn run_gc(&self, repo_id: &str) -> Result<(), GitError> {
let repo_uuid = models::Uuid::parse_str(repo_id)
.map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
.one(self.db.reader())
.await
.map_err(GitError::from)?
.ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
if !std::path::Path::new(&repo.storage_path).exists() {
return Err(GitError::NotFound(format!(
"storage path does not exist: {}",
repo.storage_path
)));
}
let db = self.db.clone();
let cache = self.cache.clone();
let logger = self.logger.clone();
tokio::task::spawn_blocking(move || {
let result = tokio::runtime::Handle::current().block_on(async {
let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo.clone(), logger.clone())?;
sync.gc_only().await
});
match result {
Ok(()) => Ok::<(), GitError>(()),
Err(e) => Err(GitError::Internal(e.to_string())),
}
})
.await
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
.map_err(GitError::from)?;
Ok(())
}
}