Compare commits

..

No commits in common. "329b526bfb21b335df82cd8d771dd178bff234cc" and "bbf2d75fbacbc560607186b3481d6a2c51784e46" have entirely different histories.

View File

@ -153,14 +153,14 @@ impl GitHookPool {
.await
{
Ok(Ok(Ok(()))) => false, // spawn_blocking Ok, catch_unwind Ok, body Ok
Ok(Ok(Err(_))) => false, // spawn_blocking Ok, catch_unwind Ok, body Err(()) — error handled inside
Ok(Err(_)) => true, // spawn_blocking Ok, catch_unwind Err = panic
Err(_) => true, // spawn_blocking Err = thread aborted
Ok(Ok(Err(_))) => true, // spawn_blocking Ok, catch_unwind Ok, body Err(()) — never hit
Ok(Err(_)) => true, // spawn_blocking Ok, catch_unwind Err = panic
Err(_) => true, // spawn_blocking Err = thread aborted
};
drop(permit);
counter_clone.fetch_sub(1, Ordering::Relaxed);
if panicked {
slog::error!(logger_clone, "{}", format!("task panicked"));
slog::error!(logger_clone, "task panicked";);
}
});
@ -177,10 +177,13 @@ impl GitHookPool {
queue_key: String,
work_key: String,
) -> Result<(), ()> {
slog::info!(self.logger, "{}", format!(
"task started task_id={} task_type={} repo_id={} worker_id={} retry={}",
task.id, task.task_type, task.repo_id, self.config.worker_id, task.retry_count
));
slog::info!(self.logger, "task started";
"task_id" => &task.id,
"task_type" => %task.task_type,
"repo_id" => &task.repo_id,
"worker_id" => &self.config.worker_id,
"retry" => task.retry_count
);
self.log_stream
.info(
@ -208,19 +211,17 @@ impl GitHookPool {
.await;
}
Err(e) => {
// Log the actual error so we can diagnose why tasks fail immediately.
slog::warn!(self.logger, "{}", format!(
"task failed task_id={} task_type={} repo_id={} retry={} error={}",
task.id, task.task_type, task.repo_id, task.retry_count, e
));
// Check retry count and decide whether to requeue or discard.
const MAX_RETRIES: u32 = 5;
if task.retry_count >= MAX_RETRIES {
// Max retries exceeded — discard the task to prevent infinite loop.
slog::warn!(self.logger, "{}", format!(
"task exhausted retries, discarding task_id={} task_type={} repo_id={} retry_count={} last_error={}",
task.id, task.task_type, task.repo_id, task.retry_count, e
));
slog::warn!(self.logger, "task exhausted retries, discarding";
"task_id" => &task.id,
"task_type" => %task.task_type,
"repo_id" => &task.repo_id,
"retry_count" => task.retry_count,
"last_error" => %e
);
if let Err(e) = consumer.ack(&work_key, &task_json).await {
slog::warn!(self.logger, "failed to ack discarded task: {}", e);
}
@ -259,13 +260,6 @@ impl GitHookPool {
.map_err(crate::GitError::from)?
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
// Fail fast if storage path doesn't exist — avoid blocking spawn_blocking thread pool.
if !std::path::Path::new(&repo.storage_path).exists() {
return Err(crate::GitError::NotFound(format!(
"storage path does not exist: {}", repo.storage_path
)));
}
let db_clone = self.db.clone();
let cache_clone = self.cache.clone();
let repo_clone = repo.clone();
@ -439,12 +433,6 @@ impl GitHookPool {
.map_err(crate::GitError::from)?
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
if !std::path::Path::new(&repo.storage_path).exists() {
return Err(crate::GitError::NotFound(format!(
"storage path does not exist: {}", repo.storage_path
)));
}
self.log_stream
.info(&task.id, &task.repo_id, "running fsck")
.await;
@ -477,12 +465,6 @@ impl GitHookPool {
.map_err(crate::GitError::from)?
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
if !std::path::Path::new(&repo.storage_path).exists() {
return Err(crate::GitError::NotFound(format!(
"storage path does not exist: {}", repo.storage_path
)));
}
self.log_stream
.info(&task.id, &task.repo_id, "running gc")
.await;