gitdataai/libs/git/hook/sync/commit.rs
ZhenYi a4dc507b66 fix(git): multiple branch and upstream query bugs
1. branch_list: Fix is_head comparison
   - head_name now keeps full ref (e.g., "refs/heads/main")
   - Previously stripped to the short name, causing is_head to always be false

2. branch_get/branch_exists: Fix name resolution for branches with '/'
   - Previously, "feature/testing" was assumed to be a remote branch
   - Now tries both refs/heads/ and refs/remotes/ candidates
   - Correctly handles local branches like "feature/testing"

3. sync_refs: Fix upstream reference format
   - Was storing "refs/remotes/{}/branch" (broken pattern)
   - Now properly queries git2 for actual upstream branch name
2026-04-17 16:30:58 +08:00

467 lines
17 KiB
Rust

use crate::GitError;
use crate::hook::sync::HookMetaDataSync;
use db::database::AppTransaction;
use models::repos::RepoCollaborator;
use models::repos::RepoCommit;
use models::repos::repo_collaborator;
use models::repos::repo_commit;
use models::users::user_email;
use sea_orm::*;
use sea_query::OnConflict;
use std::collections::{HashMap, HashSet};
/// Snapshot of one local branch, copied out of git2 into owned values.
///
/// Holding only owned data means no git2 type (and no borrow of the
/// repository) outlives the collection step.
#[derive(Debug, Clone)]
pub(crate) struct BranchTip {
    /// Full reference name, e.g. `refs/heads/main`.
    pub name: String,
    /// git2's shorthand for the reference, e.g. `main`.
    pub shorthand: String,
    /// Hex object id the reference points at.
    pub target_oid: String,
    /// `true` when git2 reports the reference as a local branch.
    pub is_branch: bool,
    /// `true` when git2 reports the reference as a remote-tracking branch.
    pub is_remote: bool,
    /// Configured upstream branch name, if one could be resolved.
    pub upstream: Option<String>,
}
/// Snapshot of one tag reference, copied out of git2 into owned values.
///
/// Produced by `HookMetaDataSync::collect_git_refs`, which fills only `name`
/// and `target_oid` and leaves the remaining fields empty. Per the original
/// note it is consumed by `sync_tags` via `collect_tag_refs()`; neither of
/// those is visible in this file — NOTE(review): confirm the producer name.
#[derive(Debug, Clone)]
pub(crate) struct TagTip {
    /// Tag name with the `refs/tags/` prefix removed.
    pub name: String,
    /// Hex id of the reference's direct target (presumably the tag object,
    /// not the peeled commit, for annotated tags — verify before relying on it).
    pub target_oid: String,
    /// Optional tag description.
    pub description: Option<String>,
    /// Tagger display name (empty when not collected).
    pub tagger_name: String,
    /// Tagger email (empty when not collected).
    pub tagger_email: String,
}
/// Snapshot of one commit, copied out of git2 into owned values so it can be
/// carried across `.await` points while the database rows are written.
#[derive(Debug)]
struct CommitData {
    /// Hex commit id.
    oid: String,
    /// Author display name.
    author_name: String,
    /// Author email, used to resolve the author's user id.
    author_email: String,
    /// Committer display name.
    committer_name: String,
    /// Committer email, used to resolve the committer's user id.
    committer_email: String,
    /// Full commit message.
    message: String,
    /// Hex ids of the parent commits, in git's parent order.
    parent_ids: Vec<String>,
}
impl HookMetaDataSync {
/// Collect all git2 branch/tag data into owned structs.
/// This is sync and must be called from a `spawn_blocking` context.
pub(crate) fn collect_git_refs(&self) -> Result<(Vec<BranchTip>, Vec<TagTip>), GitError> {
let mut branches = Vec::new();
let mut tags = Vec::new();
let references = self
.domain
.repo()
.references()
.map_err(|e| GitError::Internal(e.to_string()))?;
for ref_result in references {
let reference = match ref_result {
Ok(r) => r,
Err(e) => {
slog::warn!(self.logger, "failed to read reference: {}", e);
continue;
}
};
let name = match reference.name() {
Some(n) => n.to_string(),
None => continue,
};
let target_oid = match reference.target() {
Some(oid) => oid.to_string(),
None => continue,
};
let shorthand = reference.shorthand().unwrap_or("").to_string();
let is_branch = reference.is_branch();
let is_remote = reference.is_remote();
if reference.is_tag() {
tags.push(TagTip {
name: name.strip_prefix("refs/tags/").unwrap_or(&name).to_string(),
target_oid,
description: None,
tagger_name: String::new(),
tagger_email: String::new(),
});
} else if is_branch && !is_remote {
// Try to get upstream branch name from the reference's upstream target
let upstream: Option<String> = if reference.target().is_some() {
if let Ok(branch) = self.domain.repo().find_branch(&name, git2::BranchType::Local) {
if let Ok(upstream_ref) = branch.upstream() {
if let Some(upstream_name) = upstream_ref.name().ok().flatten() {
Some(upstream_name.to_string())
} else {
None
}
} else {
None
}
} else {
None
}
} else {
None
};
branches.push(BranchTip {
name,
shorthand,
target_oid,
is_branch,
is_remote,
upstream,
});
}
}
Ok((branches, tags))
}
pub async fn sync_refs(&self, txn: &AppTransaction) -> Result<(), GitError> {
let repo_id = self.repo.id;
let now = chrono::Utc::now();
let existing: Vec<models::repos::repo_branch::Model> =
models::repos::repo_branch::Entity::find()
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
.all(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to query branches: {}", e)))?;
let mut existing_names: HashSet<String> =
existing.iter().map(|r| r.name.clone()).collect();
let (branches, _) = self.collect_git_refs()?;
// Auto-detect first local branch when default_branch is empty
let mut auto_detected_branch: Option<String> = None;
for branch in &branches {
if existing_names.contains(&branch.name) {
existing_names.remove(&branch.name);
models::repos::repo_branch::Entity::update_many()
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
.filter(models::repos::repo_branch::Column::Name.eq(&branch.name))
.col_expr(
models::repos::repo_branch::Column::Oid,
sea_orm::prelude::Expr::value(&branch.target_oid),
)
.col_expr(
models::repos::repo_branch::Column::Upstream,
sea_orm::prelude::Expr::value(branch.upstream.clone()),
)
.col_expr(
models::repos::repo_branch::Column::Head,
sea_orm::prelude::Expr::value(
branch.is_branch
&& branch.shorthand == self.repo.default_branch,
),
)
.col_expr(
models::repos::repo_branch::Column::UpdatedAt,
sea_orm::prelude::Expr::value(now),
)
.exec(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to update branch: {}", e)))?;
} else {
let new_branch = models::repos::repo_branch::ActiveModel {
repo: Set(repo_id),
name: Set(branch.name.clone()),
oid: Set(branch.target_oid.clone()),
upstream: Set(branch.upstream.clone()),
head: Set(branch.is_branch && branch.shorthand == self.repo.default_branch),
created_at: Set(now),
updated_at: Set(now),
..Default::default()
};
new_branch
.insert(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to insert branch: {}", e)))?;
}
// Detect first local branch if no default is set
if self.repo.default_branch.is_empty()
&& branch.is_branch
&& !branch.is_remote
&& auto_detected_branch.is_none()
{
auto_detected_branch = Some(branch.shorthand.clone());
}
}
if !existing_names.is_empty() {
models::repos::repo_branch::Entity::delete_many()
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
.filter(models::repos::repo_branch::Column::Name.is_in(existing_names))
.exec(txn)
.await
.map_err(|e| {
GitError::IoError(format!("failed to delete stale branches: {}", e))
})?;
}
// Persist auto-detected default branch and update head flags
if let Some(ref branch_name) = auto_detected_branch {
models::repos::repo::Entity::update_many()
.filter(models::repos::repo::Column::Id.eq(repo_id))
.col_expr(
models::repos::repo::Column::DefaultBranch,
sea_orm::prelude::Expr::value(branch_name.clone()),
)
.exec(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to set default branch: {}", e)))?;
models::repos::repo_branch::Entity::update_many()
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
.col_expr(
models::repos::repo_branch::Column::Head,
sea_orm::prelude::Expr::value(false),
)
.exec(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?;
models::repos::repo_branch::Entity::update_many()
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
.filter(models::repos::repo_branch::Column::Name.eq(branch_name))
.col_expr(
models::repos::repo_branch::Column::Head,
sea_orm::prelude::Expr::value(true),
)
.exec(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?;
}
Ok(())
}
pub async fn sync_commits(&self, txn: &AppTransaction) -> Result<(), GitError> {
let repo_id = self.repo.id;
let repo = self.domain.repo();
if repo.is_empty().unwrap_or(true) {
return Ok(());
}
let existing_oids: Vec<String> = RepoCommit::find()
.filter(repo_commit::Column::Repo.eq(repo_id))
.select_only()
.column(repo_commit::Column::Oid)
.into_tuple()
.all(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to query commits: {}", e)))?;
let existing_set: HashSet<String> = existing_oids.into_iter().collect();
let branch_names = self.list_branch_names();
let mut new_oid_list: Vec<(git2::Oid, String)> = Vec::new();
for ref_name in &branch_names {
let mut revwalk = repo
.revwalk()
.map_err(|e| GitError::Internal(e.to_string()))?;
revwalk
.push_ref(ref_name)
.map_err(|e| GitError::Internal(e.to_string()))?;
revwalk
.set_sorting(git2::Sort::TOPOLOGICAL | git2::Sort::TIME)
.map_err(|e| GitError::Internal(e.to_string()))?;
for oid_result in revwalk {
let oid = oid_result.map_err(|e| GitError::Internal(e.to_string()))?;
let oid_str = oid.to_string();
if !existing_set.contains(&oid_str)
&& !new_oid_list.iter().any(|(_, s)| s == &oid_str)
{
new_oid_list.push((oid, oid_str));
}
}
}
if new_oid_list.is_empty() {
return Ok(());
}
let mut author_emails: Vec<String> = Vec::with_capacity(new_oid_list.len());
let mut committer_emails: Vec<String> = Vec::with_capacity(new_oid_list.len());
let mut commits_data: Vec<CommitData> = Vec::with_capacity(new_oid_list.len());
for (oid, oid_str) in &new_oid_list {
let commit = repo
.find_commit(*oid)
.map_err(|e| GitError::Internal(e.to_string()))?;
let author = commit.author();
let committer = commit.committer();
let a_email = author.email().unwrap_or("").to_string();
let c_email = committer.email().unwrap_or("").to_string();
author_emails.push(a_email.clone());
committer_emails.push(c_email.clone());
commits_data.push(CommitData {
oid: oid_str.clone(),
author_name: author.name().unwrap_or("").to_string(),
author_email: a_email,
committer_name: committer.name().unwrap_or("").to_string(),
committer_email: c_email,
message: commit.message().unwrap_or("").to_string(),
parent_ids: commit.parent_ids().map(|p| p.to_string()).collect(),
});
}
let user_map = self
.resolve_user_ids(&author_emails, &committer_emails, txn)
.await?;
let all_emails: Vec<&str> = author_emails
.iter()
.chain(committer_emails.iter())
.map(|s| s.as_str())
.collect();
self.ensure_collaborators(&all_emails, &user_map, txn)
.await?;
let now = chrono::Utc::now();
let mut batch = Vec::with_capacity(100);
for data in commits_data {
let author_uid = user_map.get(&data.author_email).copied();
let committer_uid = user_map.get(&data.committer_email).copied();
batch.push(repo_commit::ActiveModel {
repo: Set(repo_id),
oid: Set(data.oid),
author_name: Set(data.author_name),
author_email: Set(data.author_email),
author: Set(author_uid),
commiter_name: Set(data.committer_name),
commiter_email: Set(data.committer_email),
commiter: Set(committer_uid),
message: Set(data.message),
parent: Set(serde_json::json!(data.parent_ids)),
created_at: Set(now),
..Default::default()
});
if batch.len() >= 100 {
RepoCommit::insert_many(std::mem::take(&mut batch))
.exec(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to insert commits: {}", e)))?;
}
}
if !batch.is_empty() {
RepoCommit::insert_many(batch)
.exec(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to insert commits: {}", e)))?;
}
Ok(())
}
async fn resolve_user_ids(
&self,
author_emails: &[String],
committer_emails: &[String],
txn: &AppTransaction,
) -> Result<HashMap<String, models::UserId>, GitError> {
let mut emails: Vec<&str> =
Vec::with_capacity(author_emails.len() + committer_emails.len());
for e in author_emails {
emails.push(e.as_str());
}
for e in committer_emails {
emails.push(e.as_str());
}
let rows: Vec<(String, models::UserId)> = user_email::Entity::find()
.filter(user_email::Column::Email.is_in(emails))
.select_only()
.column(user_email::Column::Email)
.column(user_email::Column::User)
.into_tuple()
.all(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to query user emails: {}", e)))?;
let mut map = HashMap::new();
for (email, uid) in rows {
map.insert(email, uid);
}
Ok(map)
}
async fn ensure_collaborators(
&self,
emails: &[&str],
user_map: &HashMap<String, models::UserId>,
txn: &AppTransaction,
) -> Result<(), GitError> {
let repo_id = self.repo.id;
let existing: Vec<(models::UserId,)> = RepoCollaborator::find()
.filter(repo_collaborator::Column::Repo.eq(repo_id))
.select_only()
.column(repo_collaborator::Column::User)
.into_tuple()
.all(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to query collaborators: {}", e)))?;
let existing_set: HashSet<models::UserId> =
existing.into_iter().map(|(uid,)| uid).collect();
let now = chrono::Utc::now();
for &email in emails {
if let Some(&uid) = user_map.get(email) {
if !existing_set.contains(&uid) {
let new_collab = repo_collaborator::ActiveModel {
repo: Set(repo_id),
user: Set(uid),
scope: Set("read".to_string()),
created_at: Set(now),
..Default::default()
};
let _ = RepoCollaborator::insert(new_collab)
.on_conflict(
OnConflict::columns([
repo_collaborator::Column::Repo,
repo_collaborator::Column::User,
])
.do_nothing()
.to_owned(),
)
.exec(txn)
.await;
}
}
}
Ok(())
}
fn list_branch_names(&self) -> Vec<String> {
let mut names = Vec::new();
if let Ok(refs) = self.domain.repo().references() {
for r in refs.flatten() {
if r.is_branch() && !r.is_remote() {
if let Some(name) = r.name() {
names.push(name.to_string());
}
}
}
}
names
}
}