fix(git/hook): address review findings — webhooks, git2 Send, touch_count, rollback
- pool/worker.rs: only dispatch webhooks after sync succeeds (skip on error); pass max_retries from PoolConfig to HookWorker - sync/branch.rs: replaced with stub (sync_refs moved to commit.rs) - sync/commit.rs: add collect_git_refs() that collects BranchTip/TagTip from git2 entirely within one sync call; sync_refs now uses owned data so no git2 types cross .await boundaries (future is Send) - sync/fsck.rs: remove extraneous "HEAD" arg from rollback git update-ref (correct syntax is: update-ref -m <msg> <ref> <new-sha> — no old-sha) - webhook_dispatch.rs: touch_count uses Expr::col().add(1) for atomic increment instead of overwriting with Expr::value(1) - pool/mod.rs: pass config.redis_max_retries to HookWorker
This commit is contained in:
parent
1fed9fc8ab
commit
01b18c97df
@ -30,7 +30,8 @@ pub fn start_worker(
|
||||
);
|
||||
|
||||
let http_client = Arc::new(reqwest::Client::new());
|
||||
let worker = HookWorker::new(db, cache, logger, consumer, http_client);
|
||||
let max_retries = config.redis_max_retries as u32;
|
||||
let worker = HookWorker::new(db, cache, logger, consumer, http_client, max_retries);
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let cancel_clone = cancel.clone();
|
||||
|
||||
@ -20,6 +20,7 @@ pub struct HookWorker {
|
||||
logger: Logger,
|
||||
consumer: RedisConsumer,
|
||||
http_client: Arc<reqwest::Client>,
|
||||
max_retries: u32,
|
||||
}
|
||||
|
||||
impl HookWorker {
|
||||
@ -29,6 +30,7 @@ impl HookWorker {
|
||||
logger: Logger,
|
||||
consumer: RedisConsumer,
|
||||
http_client: Arc<reqwest::Client>,
|
||||
max_retries: u32,
|
||||
) -> Self {
|
||||
Self {
|
||||
db,
|
||||
@ -36,6 +38,7 @@ impl HookWorker {
|
||||
logger,
|
||||
consumer,
|
||||
http_client,
|
||||
max_retries,
|
||||
}
|
||||
}
|
||||
|
||||
@ -119,8 +122,7 @@ impl HookWorker {
|
||||
slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}",
|
||||
task.id, task.task_type, task.repo_id, e);
|
||||
|
||||
const MAX_RETRIES: u32 = 5;
|
||||
if task.retry_count >= MAX_RETRIES {
|
||||
if task.retry_count >= self.max_retries {
|
||||
slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}",
|
||||
task.id, task.retry_count);
|
||||
let _ = self.consumer.ack(work_key, task_json).await;
|
||||
@ -177,7 +179,7 @@ impl HookWorker {
|
||||
let cache = self.cache.clone();
|
||||
let logger = self.logger.clone();
|
||||
let repo_clone = repo.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _sync_result = tokio::task::spawn_blocking(move || {
|
||||
let result = tokio::runtime::Handle::current().block_on(async {
|
||||
let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone(), logger.clone())?;
|
||||
sync.sync().await
|
||||
@ -188,8 +190,10 @@ impl HookWorker {
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
|
||||
.map_err(GitError::from)?;
|
||||
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))
|
||||
.and_then(|r| r.map_err(GitError::from))?;
|
||||
|
||||
// Only dispatch webhooks if sync succeeded
|
||||
|
||||
// Capture after tips and dispatch webhooks
|
||||
let after_tips = tokio::task::spawn_blocking({
|
||||
|
||||
@ -1,141 +1,2 @@
|
||||
use crate::GitError;
|
||||
use crate::hook::sync::HookMetaDataSync;
|
||||
use db::database::AppTransaction;
|
||||
use models::repos::repo;
|
||||
use models::repos::repo_branch;
|
||||
use sea_orm::prelude::Expr;
|
||||
use sea_orm::*;
|
||||
use std::collections::HashSet;
|
||||
|
||||
impl HookMetaDataSync {
|
||||
pub async fn sync_refs(&self, txn: &AppTransaction) -> Result<(), GitError> {
|
||||
let repo_id = self.repo.id;
|
||||
let now = chrono::Utc::now();
|
||||
|
||||
let existing: Vec<repo_branch::Model> = repo_branch::Entity::find()
|
||||
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||
.all(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to query branches: {}", e)))?;
|
||||
let mut existing_names: HashSet<String> = existing.iter().map(|r| r.name.clone()).collect();
|
||||
|
||||
let references = self
|
||||
.domain
|
||||
.repo()
|
||||
.references()
|
||||
.map_err(|e| GitError::Internal(e.to_string()))?;
|
||||
|
||||
// Auto-detect first local branch when default_branch is empty
|
||||
let mut auto_detected_branch: Option<String> = None;
|
||||
|
||||
for reference in references {
|
||||
let reference = reference.map_err(|e| GitError::Internal(e.to_string()))?;
|
||||
let name = reference
|
||||
.name()
|
||||
.ok_or_else(|| GitError::RefNotFound("unnamed ref".into()))?
|
||||
.to_string();
|
||||
let shorthand = reference.shorthand().unwrap_or("").to_string();
|
||||
|
||||
let target_oid = match reference.target() {
|
||||
Some(oid) => oid.to_string(),
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let is_branch = reference.is_branch();
|
||||
let is_remote = reference.is_remote();
|
||||
|
||||
// Detect first local branch if no default is set
|
||||
if self.repo.default_branch.is_empty()
|
||||
&& is_branch
|
||||
&& !is_remote
|
||||
&& auto_detected_branch.is_none()
|
||||
{
|
||||
auto_detected_branch = Some(shorthand.clone());
|
||||
}
|
||||
|
||||
let upstream = if is_branch && !is_remote {
|
||||
reference
|
||||
.shorthand()
|
||||
.map(|short| format!("refs/remotes/{{}}/{}", short))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if existing_names.contains(&name) {
|
||||
existing_names.remove(&name);
|
||||
repo_branch::Entity::update_many()
|
||||
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||
.filter(repo_branch::Column::Name.eq(&name))
|
||||
.col_expr(repo_branch::Column::Oid, Expr::value(&target_oid))
|
||||
.col_expr(repo_branch::Column::Upstream, Expr::value(upstream))
|
||||
.col_expr(
|
||||
repo_branch::Column::Head,
|
||||
Expr::value(is_branch && shorthand == self.repo.default_branch),
|
||||
)
|
||||
.col_expr(repo_branch::Column::UpdatedAt, Expr::value(now))
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to update branch: {}", e)))?;
|
||||
} else {
|
||||
let new_branch = repo_branch::ActiveModel {
|
||||
repo: Set(repo_id),
|
||||
name: Set(name),
|
||||
oid: Set(target_oid),
|
||||
upstream: Set(upstream),
|
||||
head: Set(is_branch && shorthand == self.repo.default_branch),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
..Default::default()
|
||||
};
|
||||
new_branch
|
||||
.insert(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to insert branch: {}", e)))?;
|
||||
}
|
||||
}
|
||||
|
||||
if !existing_names.is_empty() {
|
||||
repo_branch::Entity::delete_many()
|
||||
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||
.filter(repo_branch::Column::Name.is_in(existing_names))
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
GitError::IoError(format!("failed to delete stale branches: {}", e))
|
||||
})?;
|
||||
}
|
||||
|
||||
// Persist auto-detected default branch and update head flags
|
||||
if let Some(ref branch_name) = auto_detected_branch {
|
||||
// 1. Update the repo's default_branch
|
||||
repo::Entity::update_many()
|
||||
.filter(repo::Column::Id.eq(repo_id))
|
||||
.col_expr(
|
||||
repo::Column::DefaultBranch,
|
||||
Expr::value(branch_name.clone()),
|
||||
)
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to set default branch: {}", e)))?;
|
||||
|
||||
// 2. Clear head on all branches
|
||||
repo_branch::Entity::update_many()
|
||||
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||
.col_expr(repo_branch::Column::Head, Expr::value(false))
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?;
|
||||
|
||||
// 3. Set head = true for the detected branch (it was inserted above)
|
||||
repo_branch::Entity::update_many()
|
||||
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||
.filter(repo_branch::Column::Name.eq(branch_name))
|
||||
.col_expr(repo_branch::Column::Head, Expr::value(true))
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
// sync_refs has been moved to commit.rs to consolidate sync helpers.
|
||||
// Keeping this module stub to avoid breaking existing module references.
|
||||
|
||||
@ -10,7 +10,225 @@ use sea_orm::*;
|
||||
use sea_query::OnConflict;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
/// Owned branch data collected from git2 (no git2 types after this).
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct BranchTip {
|
||||
pub name: String,
|
||||
pub shorthand: String,
|
||||
pub target_oid: String,
|
||||
pub is_branch: bool,
|
||||
pub is_remote: bool,
|
||||
pub upstream: Option<String>,
|
||||
}
|
||||
|
||||
/// Owned tag data collected from git2 (no git2 types after this).
|
||||
/// Used by sync_tags; currently populated by collect_git_refs but not yet consumed.
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct TagTip {
|
||||
pub name: String,
|
||||
pub target_oid: String,
|
||||
pub description: Option<String>,
|
||||
pub tagger_name: String,
|
||||
pub tagger_email: String,
|
||||
}
|
||||
|
||||
/// Owned commit data collected from git2 (no git2 types after this).
|
||||
#[derive(Debug)]
|
||||
struct CommitData {
|
||||
oid: String,
|
||||
author_name: String,
|
||||
author_email: String,
|
||||
committer_name: String,
|
||||
committer_email: String,
|
||||
message: String,
|
||||
parent_ids: Vec<String>,
|
||||
}
|
||||
|
||||
impl HookMetaDataSync {
|
||||
/// Collect all git2 branch/tag data into owned structs.
|
||||
/// This is sync and must be called from a `spawn_blocking` context.
|
||||
pub(crate) fn collect_git_refs(&self) -> Result<(Vec<BranchTip>, Vec<TagTip>), GitError> {
|
||||
let mut branches = Vec::new();
|
||||
let mut tags = Vec::new();
|
||||
|
||||
let references = self
|
||||
.domain
|
||||
.repo()
|
||||
.references()
|
||||
.map_err(|e| GitError::Internal(e.to_string()))?;
|
||||
|
||||
for ref_result in references {
|
||||
let reference = match ref_result {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
slog::warn!(self.logger, "failed to read reference: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let name = match reference.name() {
|
||||
Some(n) => n.to_string(),
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let target_oid = match reference.target() {
|
||||
Some(oid) => oid.to_string(),
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let shorthand = reference.shorthand().unwrap_or("").to_string();
|
||||
let is_branch = reference.is_branch();
|
||||
let is_remote = reference.is_remote();
|
||||
|
||||
if reference.is_tag() {
|
||||
tags.push(TagTip {
|
||||
name: name.strip_prefix("refs/tags/").unwrap_or(&name).to_string(),
|
||||
target_oid,
|
||||
description: None,
|
||||
tagger_name: String::new(),
|
||||
tagger_email: String::new(),
|
||||
});
|
||||
} else if is_branch && !is_remote {
|
||||
let upstream = reference
|
||||
.shorthand()
|
||||
.map(|short| format!("refs/remotes/{{}}/{}", short));
|
||||
|
||||
branches.push(BranchTip {
|
||||
name,
|
||||
shorthand,
|
||||
target_oid,
|
||||
is_branch,
|
||||
is_remote,
|
||||
upstream,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok((branches, tags))
|
||||
}
|
||||
|
||||
pub async fn sync_refs(&self, txn: &AppTransaction) -> Result<(), GitError> {
|
||||
let repo_id = self.repo.id;
|
||||
let now = chrono::Utc::now();
|
||||
|
||||
let existing: Vec<models::repos::repo_branch::Model> =
|
||||
models::repos::repo_branch::Entity::find()
|
||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
||||
.all(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to query branches: {}", e)))?;
|
||||
let mut existing_names: HashSet<String> =
|
||||
existing.iter().map(|r| r.name.clone()).collect();
|
||||
|
||||
let (branches, _) = self.collect_git_refs()?;
|
||||
|
||||
// Auto-detect first local branch when default_branch is empty
|
||||
let mut auto_detected_branch: Option<String> = None;
|
||||
|
||||
for branch in &branches {
|
||||
if existing_names.contains(&branch.name) {
|
||||
existing_names.remove(&branch.name);
|
||||
models::repos::repo_branch::Entity::update_many()
|
||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
||||
.filter(models::repos::repo_branch::Column::Name.eq(&branch.name))
|
||||
.col_expr(
|
||||
models::repos::repo_branch::Column::Oid,
|
||||
sea_orm::prelude::Expr::value(&branch.target_oid),
|
||||
)
|
||||
.col_expr(
|
||||
models::repos::repo_branch::Column::Upstream,
|
||||
sea_orm::prelude::Expr::value(branch.upstream.clone()),
|
||||
)
|
||||
.col_expr(
|
||||
models::repos::repo_branch::Column::Head,
|
||||
sea_orm::prelude::Expr::value(
|
||||
branch.is_branch
|
||||
&& branch.shorthand == self.repo.default_branch,
|
||||
),
|
||||
)
|
||||
.col_expr(
|
||||
models::repos::repo_branch::Column::UpdatedAt,
|
||||
sea_orm::prelude::Expr::value(now),
|
||||
)
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to update branch: {}", e)))?;
|
||||
} else {
|
||||
let new_branch = models::repos::repo_branch::ActiveModel {
|
||||
repo: Set(repo_id),
|
||||
name: Set(branch.name.clone()),
|
||||
oid: Set(branch.target_oid.clone()),
|
||||
upstream: Set(branch.upstream.clone()),
|
||||
head: Set(branch.is_branch && branch.shorthand == self.repo.default_branch),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
..Default::default()
|
||||
};
|
||||
new_branch
|
||||
.insert(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to insert branch: {}", e)))?;
|
||||
}
|
||||
|
||||
// Detect first local branch if no default is set
|
||||
if self.repo.default_branch.is_empty()
|
||||
&& branch.is_branch
|
||||
&& !branch.is_remote
|
||||
&& auto_detected_branch.is_none()
|
||||
{
|
||||
auto_detected_branch = Some(branch.shorthand.clone());
|
||||
}
|
||||
}
|
||||
|
||||
if !existing_names.is_empty() {
|
||||
models::repos::repo_branch::Entity::delete_many()
|
||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
||||
.filter(models::repos::repo_branch::Column::Name.is_in(existing_names))
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
GitError::IoError(format!("failed to delete stale branches: {}", e))
|
||||
})?;
|
||||
}
|
||||
|
||||
// Persist auto-detected default branch and update head flags
|
||||
if let Some(ref branch_name) = auto_detected_branch {
|
||||
models::repos::repo::Entity::update_many()
|
||||
.filter(models::repos::repo::Column::Id.eq(repo_id))
|
||||
.col_expr(
|
||||
models::repos::repo::Column::DefaultBranch,
|
||||
sea_orm::prelude::Expr::value(branch_name.clone()),
|
||||
)
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to set default branch: {}", e)))?;
|
||||
|
||||
models::repos::repo_branch::Entity::update_many()
|
||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
||||
.col_expr(
|
||||
models::repos::repo_branch::Column::Head,
|
||||
sea_orm::prelude::Expr::value(false),
|
||||
)
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?;
|
||||
|
||||
models::repos::repo_branch::Entity::update_many()
|
||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
||||
.filter(models::repos::repo_branch::Column::Name.eq(branch_name))
|
||||
.col_expr(
|
||||
models::repos::repo_branch::Column::Head,
|
||||
sea_orm::prelude::Expr::value(true),
|
||||
)
|
||||
.exec(txn)
|
||||
.await
|
||||
.map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sync_commits(&self, txn: &AppTransaction) -> Result<(), GitError> {
|
||||
let repo_id = self.repo.id;
|
||||
let repo = self.domain.repo();
|
||||
@ -200,7 +418,6 @@ impl HookMetaDataSync {
|
||||
created_at: Set(now),
|
||||
..Default::default()
|
||||
};
|
||||
// Use ON CONFLICT DO NOTHING so concurrent syncs don't collide.
|
||||
let _ = RepoCollaborator::insert(new_collab)
|
||||
.on_conflict(
|
||||
OnConflict::columns([
|
||||
@ -233,13 +450,3 @@ impl HookMetaDataSync {
|
||||
names
|
||||
}
|
||||
}
|
||||
|
||||
struct CommitData {
|
||||
oid: String,
|
||||
author_name: String,
|
||||
author_email: String,
|
||||
committer_name: String,
|
||||
committer_email: String,
|
||||
message: String,
|
||||
parent_ids: Vec<String>,
|
||||
}
|
||||
|
||||
@ -106,6 +106,7 @@ impl HookMetaDataSync {
|
||||
|
||||
let _ = tokio::task::spawn_blocking(move || {
|
||||
for (ref_name, oid) in &refs {
|
||||
// git update-ref -m <msg> <ref> <new-sha> — no old-sha in rollback
|
||||
let status = Command::new("git")
|
||||
.arg("-C")
|
||||
.arg(&storage_path)
|
||||
@ -114,7 +115,6 @@ impl HookMetaDataSync {
|
||||
.arg("rollback: integrity check failed")
|
||||
.arg(ref_name)
|
||||
.arg(oid)
|
||||
.arg("HEAD")
|
||||
.status();
|
||||
|
||||
match status {
|
||||
|
||||
@ -384,7 +384,7 @@ pub async fn dispatch_repo_webhooks(
|
||||
async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &slog::Logger) {
|
||||
use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity};
|
||||
use models::{ColumnTrait, EntityTrait, QueryFilter};
|
||||
use sea_orm::prelude::Expr;
|
||||
use sea_orm::{sea_query::Expr, ExprTrait};
|
||||
|
||||
let result: Result<sea_orm::UpdateResult, sea_orm::DbErr> = if success {
|
||||
RepoWebhookEntity::update_many()
|
||||
@ -393,13 +393,13 @@ async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &
|
||||
RwCol::LastDeliveredAt,
|
||||
Expr::value(Some(chrono::Utc::now())),
|
||||
)
|
||||
.col_expr(RwCol::TouchCount, Expr::value(1i64))
|
||||
.col_expr(RwCol::TouchCount, Expr::col(RwCol::TouchCount).add(1))
|
||||
.exec(db.writer())
|
||||
.await
|
||||
} else {
|
||||
RepoWebhookEntity::update_many()
|
||||
.filter(RwCol::Id.eq(webhook_id))
|
||||
.col_expr(RwCol::TouchCount, Expr::value(1i64))
|
||||
.col_expr(RwCol::TouchCount, Expr::col(RwCol::TouchCount).add(1))
|
||||
.exec(db.writer())
|
||||
.await
|
||||
};
|
||||
|
||||
Loading…
Reference in New Issue
Block a user