gitdataai/libs/git/hook/pool/worker.rs
ZhenYi 1fed9fc8ab fix(git/hook): address review findings — fs blocking, redis timeout, backoff, slog
- sync/mod.rs: wrap scan_skills_from_dir in spawn_blocking to avoid
  blocking the async executor; use to_path_buf() to get owned PathBuf
- pool/worker.rs: replace the 500 ms poll sleep with a
  CancellationToken::is_cancelled() check (eliminates artificial latency);
  add exponential backoff on Redis errors (1 s base, 32 s cap, reset on
  success)
- pool/redis.rs: add 5s timeout on pool.get() for all three methods
  (next, ack_raw, nak_with_retry) to prevent indefinite blocking on
  unresponsive Redis
- sync/gc.rs: add comment explaining why git gc --auto non-zero exit
  is benign
- webhook_dispatch.rs: remove unnecessary format! wrappers in slog macros
- config/hook.rs: document max_concurrent intent (K8s operator/HPA, not
  the single-threaded worker itself)
2026-04-17 13:20:31 +08:00

377 lines
14 KiB
Rust

use crate::error::GitError;
use crate::hook::pool::redis::RedisConsumer;
use crate::hook::pool::types::{HookTask, TaskType};
use crate::hook::sync::HookMetaDataSync;
use db::cache::AppCache;
use db::database::AppDatabase;
use models::EntityTrait;
use slog::Logger;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
/// Single-threaded worker that sequentially consumes tasks from Redis queues.
/// K8s can scale replicas for concurrency — each replica runs one worker.
/// Per-repo Redis locking is managed inside HookMetaDataSync methods.
#[derive(Clone)]
pub struct HookWorker {
// Database handle; reads go through `db.reader()`.
db: AppDatabase,
// App-level cache, passed through to HookMetaDataSync.
cache: AppCache,
// Structured logger (slog) for worker lifecycle and task events.
logger: Logger,
// Redis queue consumer used for dequeue/ack/nak of hook tasks.
consumer: RedisConsumer,
// Shared HTTP client for webhook dispatch; Arc so spawned dispatch
// tasks can clone it cheaply.
http_client: Arc<reqwest::Client>,
}
impl HookWorker {
pub fn new(
db: AppDatabase,
cache: AppCache,
logger: Logger,
consumer: RedisConsumer,
http_client: Arc<reqwest::Client>,
) -> Self {
Self {
db,
cache,
logger,
consumer,
http_client,
}
}
/// Run the worker loop. Blocks until cancelled.
pub async fn run(&self, cancel: CancellationToken) {
slog::info!(self.logger, "hook worker started");
let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc];
let mut redis_backoff_ms: u64 = 1000;
loop {
// Check cancellation at top of loop to avoid unnecessary work
if cancel.is_cancelled() {
slog::info!(self.logger, "hook worker shutdown signal received");
break;
}
for task_type in &task_types {
let result = self.consumer.next(&task_type.to_string()).await;
let (task, task_json) = match result {
Ok(Some(pair)) => {
// Reset backoff on successful dequeue
redis_backoff_ms = 1000;
pair
}
Ok(None) => continue,
Err(e) => {
slog::warn!(self.logger, "failed to dequeue task: {}", e);
tokio::time::sleep(Duration::from_millis(redis_backoff_ms)).await;
// Exponential backoff, cap at 32s
redis_backoff_ms = (redis_backoff_ms * 2).min(32_000);
break;
}
};
let queue_key = format!("{}:{}", self.consumer.prefix(), task_type);
let work_key = format!("{}:work", queue_key);
self.process_task(&task, &task_json, &work_key, &queue_key)
.await;
}
}
slog::info!(self.logger, "hook worker stopped");
}
async fn process_task(
&self,
task: &HookTask,
task_json: &str,
work_key: &str,
queue_key: &str,
) {
slog::info!(self.logger, "task started task_id={} task_type={} repo_id={}",
task.id, task.task_type, task.repo_id);
let result = match task.task_type {
TaskType::Sync => self.run_sync(&task.repo_id).await,
TaskType::Fsck => self.run_fsck(&task.repo_id).await,
TaskType::Gc => self.run_gc(&task.repo_id).await,
};
match result {
Ok(()) => {
if let Err(e) = self.consumer.ack(work_key, task_json).await {
slog::warn!(self.logger, "failed to ack task: {}", e);
}
slog::info!(self.logger, "task completed task_id={}", task.id);
}
Err(e) => {
let is_locked = matches!(e, crate::GitError::Locked(_));
if is_locked {
// Another worker holds the lock — requeue without counting as retry.
slog::info!(self.logger, "repo locked by another worker, requeueing task_id={}", task.id);
if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await {
slog::warn!(self.logger, "failed to requeue locked task: {}", nak_err);
}
} else {
slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}",
task.id, task.task_type, task.repo_id, e);
const MAX_RETRIES: u32 = 5;
if task.retry_count >= MAX_RETRIES {
slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}",
task.id, task.retry_count);
let _ = self.consumer.ack(work_key, task_json).await;
} else {
let mut task = task.clone();
task.retry_count += 1;
let retry_json =
serde_json::to_string(&task).unwrap_or_else(|_| task_json.to_string());
let _ = self
.consumer
.nak_with_retry(work_key, queue_key, task_json, &retry_json)
.await;
}
}
}
}
}
/// Full metadata sync for one repo, followed by webhook dispatch for
/// every branch/tag tip that changed across the sync.
///
/// Flow: load the repo row → snapshot branch/tag tips → run the sync
/// (which acquires/releases the per-repo lock internally) → snapshot
/// tips again → spawn a Push/TagPush webhook task per changed ref and
/// wait for all of them.
///
/// Errors: invalid `repo_id`, missing repo row, missing storage path,
/// or a failure inside the sync itself.
async fn run_sync(&self, repo_id: &str) -> Result<(), GitError> {
let repo_uuid = models::Uuid::parse_str(repo_id)
.map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
.one(self.db.reader())
.await
.map_err(GitError::from)?
.ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
if !std::path::Path::new(&repo.storage_path).exists() {
return Err(GitError::NotFound(format!(
"storage path does not exist: {}",
repo.storage_path
)));
}
// Capture before tips for webhook diff
// (tip listing touches the on-disk repo, hence spawn_blocking).
let before_tips = tokio::task::spawn_blocking({
let db = self.db.clone();
let cache = self.cache.clone();
let logger = self.logger.clone();
let repo = repo.clone();
move || {
let sync = HookMetaDataSync::new(db, cache, repo, logger)
.map_err(|e| GitError::Internal(e.to_string()))?;
Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
}
})
.await
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
.map_err(GitError::from)?;
// Run full sync (internally acquires/releases per-repo lock)
// NOTE(review): `Handle::current().block_on` inside spawn_blocking
// panics/deadlocks on a current-thread Tokio runtime; this assumes
// the multi-thread runtime flavor — confirm the binary's setup.
let db = self.db.clone();
let cache = self.cache.clone();
let logger = self.logger.clone();
let repo_clone = repo.clone();
tokio::task::spawn_blocking(move || {
let result = tokio::runtime::Handle::current().block_on(async {
let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone(), logger.clone())?;
sync.sync().await
});
match result {
Ok(()) => Ok::<(), GitError>(()),
Err(e) => Err(GitError::Internal(e.to_string())),
}
})
.await
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
.map_err(GitError::from)?;
// Capture after tips and dispatch webhooks
let after_tips = tokio::task::spawn_blocking({
let db = self.db.clone();
let cache = self.cache.clone();
let logger = self.logger.clone();
let repo = repo.clone();
move || {
let sync = HookMetaDataSync::new(db, cache, repo, logger)
.map_err(|e| GitError::Internal(e.to_string()))?;
Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
}
})
.await
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
.map_err(GitError::from)?;
let (before_branch_tips, before_tag_tips) = before_tips;
let (after_branch_tips, after_tag_tips) = after_tips;
let project = repo.project;
// Resolve namespace once outside the loop
// Note: lookup failures are swallowed (`.ok().flatten()`), so a DB
// error here yields an empty namespace rather than a failed sync.
let namespace = models::projects::Project::find_by_id(project)
.one(self.db.reader())
.await
.ok()
.flatten()
.map(|p| p.name)
.unwrap_or_default();
let repo_id_str = repo.id.to_string();
let repo_name = repo.repo_name.clone();
let default_branch = repo.default_branch.clone();
let http_client = self.http_client.clone();
let db = self.db.clone();
let logger = self.logger.clone();
// Dispatch branch webhooks and collect handles
let mut handles = Vec::new();
for (branch, after_oid) in after_branch_tips {
let before_oid = before_branch_tips
.iter()
.find(|(n, _)| n == &branch)
.map(|(_, o)| o.as_str());
// A branch absent from the "before" snapshot counts as changed
// (newly created branch).
let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true);
if changed {
// "0" is the sentinel oid for a previously-unknown ref.
let before_oid = before_oid.map_or("0", |v| v).to_string();
let branch_name = branch.clone();
let h = tokio::spawn({
let http_client = http_client.clone();
let db = db.clone();
let logger = logger.clone();
let repo_id_str = repo_id_str.clone();
let namespace = namespace.clone();
let repo_name = repo_name.clone();
let default_branch = default_branch.clone();
async move {
crate::hook::webhook_dispatch::dispatch_repo_webhooks(
&db,
&http_client,
&logger,
&repo_id_str,
&namespace,
&repo_name,
&default_branch,
// NOTE(review): the two empty-string args are presumably
// pusher identity fields, unknown in worker context —
// confirm against dispatch_repo_webhooks' signature.
"",
"",
crate::hook::webhook_dispatch::WebhookEventKind::Push {
r#ref: format!("refs/heads/{}", branch_name),
before: before_oid,
after: after_oid,
commits: vec![],
},
)
.await;
}
});
handles.push(h);
}
}
// Dispatch tag webhooks and collect handles
for (tag, after_oid) in after_tag_tips {
let before_oid = before_tag_tips
.iter()
.find(|(n, _)| n == &tag)
.map(|(_, o)| o.as_str());
// Unlike branches, new and updated tags are distinguished here,
// but both fire the same TagPush event.
let is_new = before_oid.is_none();
let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false);
if is_new || was_updated {
let before_oid = before_oid.map_or("0", |v| v).to_string();
let tag_name = tag.clone();
let h = tokio::spawn({
let http_client = http_client.clone();
let db = db.clone();
let logger = logger.clone();
let repo_id_str = repo_id_str.clone();
let namespace = namespace.clone();
let repo_name = repo_name.clone();
let default_branch = default_branch.clone();
async move {
crate::hook::webhook_dispatch::dispatch_repo_webhooks(
&db,
&http_client,
&logger,
&repo_id_str,
&namespace,
&repo_name,
&default_branch,
"",
"",
crate::hook::webhook_dispatch::WebhookEventKind::TagPush {
r#ref: format!("refs/tags/{}", tag_name),
before: before_oid,
after: after_oid,
},
)
.await;
}
});
handles.push(h);
}
}
// Wait for all webhooks to complete before returning
// (dispatch errors are logged inside the task; join errors ignored).
for h in handles {
let _ = h.await;
}
Ok(())
}
/// Run an integrity check only (no metadata sync) for the repository
/// identified by `repo_id`.
///
/// Fails with `Internal` on a malformed uuid and `NotFound` when the
/// repo row or its on-disk storage path is missing.
async fn run_fsck(&self, repo_id: &str) -> Result<(), GitError> {
    let uuid = models::Uuid::parse_str(repo_id)
        .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
    let repo = models::repos::repo::Entity::find_by_id(uuid)
        .one(self.db.reader())
        .await
        .map_err(GitError::from)?
        .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
    // Bail out early if the bare repo is gone from disk.
    if !std::path::Path::new(&repo.storage_path).exists() {
        return Err(GitError::NotFound(format!(
            "storage path does not exist: {}",
            repo.storage_path
        )));
    }
    HookMetaDataSync::new(self.db.clone(), self.cache.clone(), repo, self.logger.clone())?
        .fsck_only()
        .await?;
    Ok(())
}
/// Run garbage collection only (no metadata sync) for the repository
/// identified by `repo_id`.
///
/// Fails with `Internal` on a malformed uuid and `NotFound` when the
/// repo row or its on-disk storage path is missing.
async fn run_gc(&self, repo_id: &str) -> Result<(), GitError> {
    let uuid = models::Uuid::parse_str(repo_id)
        .map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
    let repo = models::repos::repo::Entity::find_by_id(uuid)
        .one(self.db.reader())
        .await
        .map_err(GitError::from)?
        .ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
    // Bail out early if the bare repo is gone from disk.
    if !std::path::Path::new(&repo.storage_path).exists() {
        return Err(GitError::NotFound(format!(
            "storage path does not exist: {}",
            repo.storage_path
        )));
    }
    HookMetaDataSync::new(self.db.clone(), self.cache.clone(), repo, self.logger.clone())?
        .gc_only()
        .await?;
    Ok(())
}
}