From 01b18c97dfcabe83f08f52781fb877ccde8372eb Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Fri, 17 Apr 2026 13:44:33 +0800 Subject: [PATCH] =?UTF-8?q?fix(git/hook):=20address=20review=20findings=20?= =?UTF-8?q?=E2=80=94=20webhooks,=20git2=20Send,=20touch=5Fcount,=20rollbac?= =?UTF-8?q?k?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pool/worker.rs: only dispatch webhooks after sync succeeds (skip on error); pass max_retries from PoolConfig to HookWorker - sync/branch.rs: replaced with stub (sync_refs moved to commit.rs) - sync/commit.rs: add collect_git_refs() that collects BranchTip/TagTip from git2 entirely within one sync call; sync_refs now uses owned data so no git2 types cross .await boundaries (future is Send) - sync/fsck.rs: remove extraneous "HEAD" arg from rollback git update-ref (correct syntax is: update-ref -m <reason> <ref> <new-sha> — no old-sha) - webhook_dispatch.rs: touch_count uses Expr::col().add(1) for atomic increment instead of overwriting with Expr::value(1) - pool/mod.rs: pass config.redis_max_retries to HookWorker --- libs/git/hook/pool/mod.rs | 3 +- libs/git/hook/pool/worker.rs | 14 +- libs/git/hook/sync/branch.rs | 143 +------------------ libs/git/hook/sync/commit.rs | 229 ++++++++++++++++++++++++++++-- libs/git/hook/sync/fsck.rs | 2 +- libs/git/hook/webhook_dispatch.rs | 6 +- 6 files changed, 235 insertions(+), 162 deletions(-) diff --git a/libs/git/hook/pool/mod.rs b/libs/git/hook/pool/mod.rs index 1fec097..2248978 100644 --- a/libs/git/hook/pool/mod.rs +++ b/libs/git/hook/pool/mod.rs @@ -30,7 +30,8 @@ pub fn start_worker( ); let http_client = Arc::new(reqwest::Client::new()); - let worker = HookWorker::new(db, cache, logger, consumer, http_client); + let max_retries = config.redis_max_retries as u32; + let worker = HookWorker::new(db, cache, logger, consumer, http_client, max_retries); let cancel = CancellationToken::new(); let cancel_clone = cancel.clone(); diff --git 
a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs index f89e648..853c2e4 100644 --- a/libs/git/hook/pool/worker.rs +++ b/libs/git/hook/pool/worker.rs @@ -20,6 +20,7 @@ pub struct HookWorker { logger: Logger, consumer: RedisConsumer, http_client: Arc, + max_retries: u32, } impl HookWorker { @@ -29,6 +30,7 @@ impl HookWorker { logger: Logger, consumer: RedisConsumer, http_client: Arc, + max_retries: u32, ) -> Self { Self { db, @@ -36,6 +38,7 @@ impl HookWorker { logger, consumer, http_client, + max_retries, } } @@ -119,8 +122,7 @@ impl HookWorker { slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}", task.id, task.task_type, task.repo_id, e); - const MAX_RETRIES: u32 = 5; - if task.retry_count >= MAX_RETRIES { + if task.retry_count >= self.max_retries { slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}", task.id, task.retry_count); let _ = self.consumer.ack(work_key, task_json).await; @@ -177,7 +179,7 @@ impl HookWorker { let cache = self.cache.clone(); let logger = self.logger.clone(); let repo_clone = repo.clone(); - tokio::task::spawn_blocking(move || { + let _sync_result = tokio::task::spawn_blocking(move || { let result = tokio::runtime::Handle::current().block_on(async { let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone(), logger.clone())?; sync.sync().await @@ -188,8 +190,10 @@ impl HookWorker { } }) .await - .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? 
- .map_err(GitError::from)?; + .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e))) + .and_then(|r| r.map_err(GitError::from))?; + + // Only dispatch webhooks if sync succeeded // Capture after tips and dispatch webhooks let after_tips = tokio::task::spawn_blocking({ diff --git a/libs/git/hook/sync/branch.rs b/libs/git/hook/sync/branch.rs index 74221f7..94f6837 100644 --- a/libs/git/hook/sync/branch.rs +++ b/libs/git/hook/sync/branch.rs @@ -1,141 +1,2 @@ -use crate::GitError; -use crate::hook::sync::HookMetaDataSync; -use db::database::AppTransaction; -use models::repos::repo; -use models::repos::repo_branch; -use sea_orm::prelude::Expr; -use sea_orm::*; -use std::collections::HashSet; - -impl HookMetaDataSync { - pub async fn sync_refs(&self, txn: &AppTransaction) -> Result<(), GitError> { - let repo_id = self.repo.id; - let now = chrono::Utc::now(); - - let existing: Vec = repo_branch::Entity::find() - .filter(repo_branch::Column::Repo.eq(repo_id)) - .all(txn) - .await - .map_err(|e| GitError::IoError(format!("failed to query branches: {}", e)))?; - let mut existing_names: HashSet = existing.iter().map(|r| r.name.clone()).collect(); - - let references = self - .domain - .repo() - .references() - .map_err(|e| GitError::Internal(e.to_string()))?; - - // Auto-detect first local branch when default_branch is empty - let mut auto_detected_branch: Option = None; - - for reference in references { - let reference = reference.map_err(|e| GitError::Internal(e.to_string()))?; - let name = reference - .name() - .ok_or_else(|| GitError::RefNotFound("unnamed ref".into()))? 
- .to_string(); - let shorthand = reference.shorthand().unwrap_or("").to_string(); - - let target_oid = match reference.target() { - Some(oid) => oid.to_string(), - None => continue, - }; - - let is_branch = reference.is_branch(); - let is_remote = reference.is_remote(); - - // Detect first local branch if no default is set - if self.repo.default_branch.is_empty() - && is_branch - && !is_remote - && auto_detected_branch.is_none() - { - auto_detected_branch = Some(shorthand.clone()); - } - - let upstream = if is_branch && !is_remote { - reference - .shorthand() - .map(|short| format!("refs/remotes/{{}}/{}", short)) - } else { - None - }; - - if existing_names.contains(&name) { - existing_names.remove(&name); - repo_branch::Entity::update_many() - .filter(repo_branch::Column::Repo.eq(repo_id)) - .filter(repo_branch::Column::Name.eq(&name)) - .col_expr(repo_branch::Column::Oid, Expr::value(&target_oid)) - .col_expr(repo_branch::Column::Upstream, Expr::value(upstream)) - .col_expr( - repo_branch::Column::Head, - Expr::value(is_branch && shorthand == self.repo.default_branch), - ) - .col_expr(repo_branch::Column::UpdatedAt, Expr::value(now)) - .exec(txn) - .await - .map_err(|e| GitError::IoError(format!("failed to update branch: {}", e)))?; - } else { - let new_branch = repo_branch::ActiveModel { - repo: Set(repo_id), - name: Set(name), - oid: Set(target_oid), - upstream: Set(upstream), - head: Set(is_branch && shorthand == self.repo.default_branch), - created_at: Set(now), - updated_at: Set(now), - ..Default::default() - }; - new_branch - .insert(txn) - .await - .map_err(|e| GitError::IoError(format!("failed to insert branch: {}", e)))?; - } - } - - if !existing_names.is_empty() { - repo_branch::Entity::delete_many() - .filter(repo_branch::Column::Repo.eq(repo_id)) - .filter(repo_branch::Column::Name.is_in(existing_names)) - .exec(txn) - .await - .map_err(|e| { - GitError::IoError(format!("failed to delete stale branches: {}", e)) - })?; - } - - // Persist 
auto-detected default branch and update head flags - if let Some(ref branch_name) = auto_detected_branch { - // 1. Update the repo's default_branch - repo::Entity::update_many() - .filter(repo::Column::Id.eq(repo_id)) - .col_expr( - repo::Column::DefaultBranch, - Expr::value(branch_name.clone()), - ) - .exec(txn) - .await - .map_err(|e| GitError::IoError(format!("failed to set default branch: {}", e)))?; - - // 2. Clear head on all branches - repo_branch::Entity::update_many() - .filter(repo_branch::Column::Repo.eq(repo_id)) - .col_expr(repo_branch::Column::Head, Expr::value(false)) - .exec(txn) - .await - .map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?; - - // 3. Set head = true for the detected branch (it was inserted above) - repo_branch::Entity::update_many() - .filter(repo_branch::Column::Repo.eq(repo_id)) - .filter(repo_branch::Column::Name.eq(branch_name)) - .col_expr(repo_branch::Column::Head, Expr::value(true)) - .exec(txn) - .await - .map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?; - } - - Ok(()) - } -} +// sync_refs has been moved to commit.rs to consolidate sync helpers. +// Keeping this module stub to avoid breaking existing module references. diff --git a/libs/git/hook/sync/commit.rs b/libs/git/hook/sync/commit.rs index ca69188..02ded7d 100644 --- a/libs/git/hook/sync/commit.rs +++ b/libs/git/hook/sync/commit.rs @@ -10,7 +10,225 @@ use sea_orm::*; use sea_query::OnConflict; use std::collections::{HashMap, HashSet}; +/// Owned branch data collected from git2 (no git2 types after this). +#[derive(Debug, Clone)] +pub(crate) struct BranchTip { + pub name: String, + pub shorthand: String, + pub target_oid: String, + pub is_branch: bool, + pub is_remote: bool, + pub upstream: Option, +} + +/// Owned tag data collected from git2 (no git2 types after this). +/// Used by sync_tags; currently populated by collect_git_refs but not yet consumed. 
+#[allow(dead_code)] +#[derive(Debug, Clone)] +pub(crate) struct TagTip { + pub name: String, + pub target_oid: String, + pub description: Option, + pub tagger_name: String, + pub tagger_email: String, +} + +/// Owned commit data collected from git2 (no git2 types after this). +#[derive(Debug)] +struct CommitData { + oid: String, + author_name: String, + author_email: String, + committer_name: String, + committer_email: String, + message: String, + parent_ids: Vec, +} + impl HookMetaDataSync { + /// Collect all git2 branch/tag data into owned structs. + /// This is sync and must be called from a `spawn_blocking` context. + pub(crate) fn collect_git_refs(&self) -> Result<(Vec, Vec), GitError> { + let mut branches = Vec::new(); + let mut tags = Vec::new(); + + let references = self + .domain + .repo() + .references() + .map_err(|e| GitError::Internal(e.to_string()))?; + + for ref_result in references { + let reference = match ref_result { + Ok(r) => r, + Err(e) => { + slog::warn!(self.logger, "failed to read reference: {}", e); + continue; + } + }; + + let name = match reference.name() { + Some(n) => n.to_string(), + None => continue, + }; + + let target_oid = match reference.target() { + Some(oid) => oid.to_string(), + None => continue, + }; + + let shorthand = reference.shorthand().unwrap_or("").to_string(); + let is_branch = reference.is_branch(); + let is_remote = reference.is_remote(); + + if reference.is_tag() { + tags.push(TagTip { + name: name.strip_prefix("refs/tags/").unwrap_or(&name).to_string(), + target_oid, + description: None, + tagger_name: String::new(), + tagger_email: String::new(), + }); + } else if is_branch && !is_remote { + let upstream = reference + .shorthand() + .map(|short| format!("refs/remotes/{{}}/{}", short)); + + branches.push(BranchTip { + name, + shorthand, + target_oid, + is_branch, + is_remote, + upstream, + }); + } + } + + Ok((branches, tags)) + } + + pub async fn sync_refs(&self, txn: &AppTransaction) -> Result<(), GitError> { + 
let repo_id = self.repo.id; + let now = chrono::Utc::now(); + + let existing: Vec = + models::repos::repo_branch::Entity::find() + .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) + .all(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to query branches: {}", e)))?; + let mut existing_names: HashSet = + existing.iter().map(|r| r.name.clone()).collect(); + + let (branches, _) = self.collect_git_refs()?; + + // Auto-detect first local branch when default_branch is empty + let mut auto_detected_branch: Option = None; + + for branch in &branches { + if existing_names.contains(&branch.name) { + existing_names.remove(&branch.name); + models::repos::repo_branch::Entity::update_many() + .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) + .filter(models::repos::repo_branch::Column::Name.eq(&branch.name)) + .col_expr( + models::repos::repo_branch::Column::Oid, + sea_orm::prelude::Expr::value(&branch.target_oid), + ) + .col_expr( + models::repos::repo_branch::Column::Upstream, + sea_orm::prelude::Expr::value(branch.upstream.clone()), + ) + .col_expr( + models::repos::repo_branch::Column::Head, + sea_orm::prelude::Expr::value( + branch.is_branch + && branch.shorthand == self.repo.default_branch, + ), + ) + .col_expr( + models::repos::repo_branch::Column::UpdatedAt, + sea_orm::prelude::Expr::value(now), + ) + .exec(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to update branch: {}", e)))?; + } else { + let new_branch = models::repos::repo_branch::ActiveModel { + repo: Set(repo_id), + name: Set(branch.name.clone()), + oid: Set(branch.target_oid.clone()), + upstream: Set(branch.upstream.clone()), + head: Set(branch.is_branch && branch.shorthand == self.repo.default_branch), + created_at: Set(now), + updated_at: Set(now), + ..Default::default() + }; + new_branch + .insert(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to insert branch: {}", e)))?; + } + + // Detect first local branch if no default is set + if 
self.repo.default_branch.is_empty() + && branch.is_branch + && !branch.is_remote + && auto_detected_branch.is_none() + { + auto_detected_branch = Some(branch.shorthand.clone()); + } + } + + if !existing_names.is_empty() { + models::repos::repo_branch::Entity::delete_many() + .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) + .filter(models::repos::repo_branch::Column::Name.is_in(existing_names)) + .exec(txn) + .await + .map_err(|e| { + GitError::IoError(format!("failed to delete stale branches: {}", e)) + })?; + } + + // Persist auto-detected default branch and update head flags + if let Some(ref branch_name) = auto_detected_branch { + models::repos::repo::Entity::update_many() + .filter(models::repos::repo::Column::Id.eq(repo_id)) + .col_expr( + models::repos::repo::Column::DefaultBranch, + sea_orm::prelude::Expr::value(branch_name.clone()), + ) + .exec(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to set default branch: {}", e)))?; + + models::repos::repo_branch::Entity::update_many() + .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) + .col_expr( + models::repos::repo_branch::Column::Head, + sea_orm::prelude::Expr::value(false), + ) + .exec(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?; + + models::repos::repo_branch::Entity::update_many() + .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) + .filter(models::repos::repo_branch::Column::Name.eq(branch_name)) + .col_expr( + models::repos::repo_branch::Column::Head, + sea_orm::prelude::Expr::value(true), + ) + .exec(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?; + } + + Ok(()) + } + pub async fn sync_commits(&self, txn: &AppTransaction) -> Result<(), GitError> { let repo_id = self.repo.id; let repo = self.domain.repo(); @@ -200,7 +418,6 @@ impl HookMetaDataSync { created_at: Set(now), ..Default::default() }; - // Use ON CONFLICT DO NOTHING so concurrent syncs don't 
collide. let _ = RepoCollaborator::insert(new_collab) .on_conflict( OnConflict::columns([ @@ -233,13 +450,3 @@ impl HookMetaDataSync { names } } - -struct CommitData { - oid: String, - author_name: String, - author_email: String, - committer_name: String, - committer_email: String, - message: String, - parent_ids: Vec, -} diff --git a/libs/git/hook/sync/fsck.rs b/libs/git/hook/sync/fsck.rs index d6def1d..040a9c2 100644 --- a/libs/git/hook/sync/fsck.rs +++ b/libs/git/hook/sync/fsck.rs @@ -106,6 +106,7 @@ impl HookMetaDataSync { let _ = tokio::task::spawn_blocking(move || { for (ref_name, oid) in &refs { + // git update-ref -m <reason> <ref> <new-sha> — no old-sha in rollback let status = Command::new("git") .arg("-C") .arg(&storage_path) @@ -114,7 +115,6 @@ impl HookMetaDataSync { .arg("rollback: integrity check failed") .arg(ref_name) .arg(oid) - .arg("HEAD") .status(); match status { diff --git a/libs/git/hook/webhook_dispatch.rs b/libs/git/hook/webhook_dispatch.rs index 78986e4..985139a 100644 --- a/libs/git/hook/webhook_dispatch.rs +++ b/libs/git/hook/webhook_dispatch.rs @@ -384,7 +384,7 @@ pub async fn dispatch_repo_webhooks( async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &slog::Logger) { use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity}; use models::{ColumnTrait, EntityTrait, QueryFilter}; - use sea_orm::prelude::Expr; + use sea_orm::{sea_query::Expr, ExprTrait}; let result: Result = if success { RepoWebhookEntity::update_many() @@ -393,13 +393,13 @@ async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: & RwCol::LastDeliveredAt, Expr::value(Some(chrono::Utc::now())), ) - .col_expr(RwCol::TouchCount, Expr::value(1i64)) + .col_expr(RwCol::TouchCount, Expr::col(RwCol::TouchCount).add(1)) .exec(db.writer()) .await } else { RepoWebhookEntity::update_many() .filter(RwCol::Id.eq(webhook_id)) - .col_expr(RwCol::TouchCount, Expr::value(1i64)) + .col_expr(RwCol::TouchCount, 
Expr::col(RwCol::TouchCount).add(1)) .exec(db.writer()) .await };