use crate::GitError; use crate::hook::sync::HookMetaDataSync; use db::database::AppTransaction; use models::repos::RepoCollaborator; use models::repos::RepoCommit; use models::repos::repo_collaborator; use models::repos::repo_commit; use models::users::user_email; use sea_orm::*; use sea_query::OnConflict; use std::collections::{HashMap, HashSet}; impl HookMetaDataSync { pub async fn sync_commits(&self, txn: &AppTransaction) -> Result<(), GitError> { let repo_id = self.repo.id; let repo = self.domain.repo(); if repo.is_empty().unwrap_or(true) { return Ok(()); } let existing_oids: Vec = RepoCommit::find() .filter(repo_commit::Column::Repo.eq(repo_id)) .select_only() .column(repo_commit::Column::Oid) .into_tuple() .all(txn) .await .map_err(|e| GitError::IoError(format!("failed to query commits: {}", e)))?; let existing_set: HashSet = existing_oids.into_iter().collect(); let branch_names = self.list_branch_names(); let mut new_oid_list: Vec<(git2::Oid, String)> = Vec::new(); for ref_name in &branch_names { let mut revwalk = repo .revwalk() .map_err(|e| GitError::Internal(e.to_string()))?; revwalk .push_ref(ref_name) .map_err(|e| GitError::Internal(e.to_string()))?; revwalk .set_sorting(git2::Sort::TOPOLOGICAL | git2::Sort::TIME) .map_err(|e| GitError::Internal(e.to_string()))?; for oid_result in revwalk { let oid = oid_result.map_err(|e| GitError::Internal(e.to_string()))?; let oid_str = oid.to_string(); if !existing_set.contains(&oid_str) && !new_oid_list.iter().any(|(_, s)| s == &oid_str) { new_oid_list.push((oid, oid_str)); } } } if new_oid_list.is_empty() { return Ok(()); } let mut author_emails: Vec = Vec::with_capacity(new_oid_list.len()); let mut committer_emails: Vec = Vec::with_capacity(new_oid_list.len()); let mut commits_data: Vec = Vec::with_capacity(new_oid_list.len()); for (oid, oid_str) in &new_oid_list { let commit = repo .find_commit(*oid) .map_err(|e| GitError::Internal(e.to_string()))?; let author = commit.author(); let committer = commit.committer(); let a_email = author.email().unwrap_or("").to_string(); let c_email = committer.email().unwrap_or("").to_string(); author_emails.push(a_email.clone()); committer_emails.push(c_email.clone()); commits_data.push(CommitData { oid: oid_str.clone(), author_name: author.name().unwrap_or("").to_string(), author_email: a_email, committer_name: committer.name().unwrap_or("").to_string(), committer_email: c_email, message: commit.message().unwrap_or("").to_string(), parent_ids: commit.parent_ids().map(|p| p.to_string()).collect(), }); } let user_map = self .resolve_user_ids(&author_emails, &committer_emails, txn) .await?; let all_emails: Vec<&str> = author_emails .iter() .chain(committer_emails.iter()) .map(|s| s.as_str()) .collect(); self.ensure_collaborators(&all_emails, &user_map, txn) .await?; let now = chrono::Utc::now(); let mut batch = Vec::with_capacity(100); for data in commits_data { let author_uid = user_map.get(&data.author_email).copied(); let committer_uid = user_map.get(&data.committer_email).copied(); batch.push(repo_commit::ActiveModel { repo: Set(repo_id), oid: Set(data.oid), author_name: Set(data.author_name), author_email: Set(data.author_email), author: Set(author_uid), commiter_name: Set(data.committer_name), commiter_email: Set(data.committer_email), commiter: Set(committer_uid), message: Set(data.message), parent: Set(serde_json::json!(data.parent_ids)), created_at: Set(now), ..Default::default() }); if batch.len() >= 100 { RepoCommit::insert_many(std::mem::take(&mut batch)) .exec(txn) .await .map_err(|e| GitError::IoError(format!("failed to insert commits: {}", e)))?; } } if !batch.is_empty() { RepoCommit::insert_many(batch) .exec(txn) .await .map_err(|e| GitError::IoError(format!("failed to insert commits: {}", e)))?; } Ok(()) } async fn resolve_user_ids( &self, author_emails: &[String], committer_emails: &[String], txn: &AppTransaction, ) -> Result, GitError> { let mut emails: Vec<&str> = Vec::with_capacity(author_emails.len() + committer_emails.len()); for e in author_emails { emails.push(e.as_str()); } for e in committer_emails { emails.push(e.as_str()); } let rows: Vec<(String, models::UserId)> = user_email::Entity::find() .filter(user_email::Column::Email.is_in(emails)) .select_only() .column(user_email::Column::Email) .column(user_email::Column::User) .into_tuple() .all(txn) .await .map_err(|e| GitError::IoError(format!("failed to query user emails: {}", e)))?; let mut map = HashMap::new(); for (email, uid) in rows { map.insert(email, uid); } Ok(map) } async fn ensure_collaborators( &self, emails: &[&str], user_map: &HashMap, txn: &AppTransaction, ) -> Result<(), GitError> { let repo_id = self.repo.id; let existing: Vec<(models::UserId,)> = RepoCollaborator::find() .filter(repo_collaborator::Column::Repo.eq(repo_id)) .select_only() .column(repo_collaborator::Column::User) .into_tuple() .all(txn) .await .map_err(|e| GitError::IoError(format!("failed to query collaborators: {}", e)))?; let existing_set: HashSet = existing.into_iter().map(|(uid,)| uid).collect(); let now = chrono::Utc::now(); for &email in emails { if let Some(&uid) = user_map.get(email) { if !existing_set.contains(&uid) { let new_collab = repo_collaborator::ActiveModel { repo: Set(repo_id), user: Set(uid), scope: Set("read".to_string()), created_at: Set(now), ..Default::default() }; // Use ON CONFLICT DO NOTHING so concurrent syncs don't collide. let _ = RepoCollaborator::insert(new_collab) .on_conflict( OnConflict::columns([ repo_collaborator::Column::Repo, repo_collaborator::Column::User, ]) .do_nothing() .to_owned(), ) .exec(txn) .await; } } } Ok(()) } fn list_branch_names(&self) -> Vec { let mut names = Vec::new(); if let Ok(refs) = self.domain.repo().references() { for r in refs.flatten() { if r.is_branch() && !r.is_remote() { if let Some(name) = r.name() { names.push(name.to_string()); } } } } names } } struct CommitData { oid: String, author_name: String, author_email: String, committer_name: String, committer_email: String, message: String, parent_ids: Vec, }