gitdataai/lib/git/sync/commit.rs

234 lines
7.3 KiB
Rust

use std::collections::{HashMap, HashSet};
use chrono::{DateTime, Utc};
use db::{database::AppDatabase, sqlx};
use model::repos::RepoCommitterModel;
use uuid::Uuid;
use crate::{bare::GitBare, cmd::oid::ObjectId, errors::GitError};
pub async fn sync_commits(
db: &AppDatabase,
bare: &GitBare,
repo_id: Uuid,
) -> Result<(), GitError> {
let repo = bare.gix_repo()?;
let pool = db.writer();
let existing_oids: Vec<String> = sqlx::query_scalar::<_, String>(
"SELECT sha FROM repo_commit WHERE repo = $1",
)
.bind(repo_id)
.fetch_all(pool)
.await
.map_err(|e| {
GitError::Internal(format!("failed to query commits: {}", e))
})?;
let existing_set: HashSet<String> = existing_oids.into_iter().collect();
let head_id = repo
.head_id()
.map_err(|e| {
GitError::Internal(format!("failed to resolve HEAD: {}", e))
})?
.detach();
let tips = {
let refs = repo.references().map_err(|e| {
GitError::Internal(format!("failed to open references: {}", e))
})?;
let iter = refs.all().map_err(|e| {
GitError::Internal(format!("failed to iterate refs: {}", e))
})?;
let mut tips = vec![head_id];
for ref_result in iter {
let reference = ref_result.map_err(|e| {
GitError::Internal(format!("ref iteration error: {}", e))
})?;
let name = reference.name().as_bstr().to_string();
if !name.starts_with("refs/heads/") {
continue;
}
if let Some(target_id) = reference.target().try_id() {
let hex = target_id.to_hex().to_string();
if let Ok(gix_id) =
gix::hash::ObjectId::from_hex(hex.as_bytes())
{
tips.push(gix_id);
}
}
}
tips
};
let platform = repo.rev_walk(tips).sorting(
gix::revision::walk::Sorting::ByCommitTime(
gix::traverse::commit::simple::CommitTimeOrder::NewestFirst,
),
);
let walk = platform
.all()
.map_err(|e| GitError::Internal(format!("rev_walk failed: {}", e)))?;
let mut new_commits: Vec<gix::hash::ObjectId> = Vec::new();
for info in walk {
let info = info.map_err(|e| {
GitError::Internal(format!("walk step error: {}", e))
})?;
let hex = info.id().detach().to_hex().to_string();
if !existing_set.contains(&hex) {
new_commits.push(info.id().detach());
}
}
if new_commits.is_empty() {
return Ok(());
}
let mut committer_map: HashMap<String, (Uuid, String)> = HashMap::new(); // email → (committer_id, name)
let existing_committers: Vec<RepoCommitterModel> = sqlx::query_as::<_, RepoCommitterModel>(
"SELECT id, repo, \"user\", name, email, created_at, updated_at FROM repo_committer WHERE repo = $1",
)
.bind(repo_id)
.fetch_all(pool)
.await
.map_err(|e| GitError::Internal(format!("failed to query repo_committer: {}", e)))?;
for model in &existing_committers {
committer_map
.insert(model.email.clone(), (model.id, model.name.clone()));
}
let email_map = resolve_user_ids(db, &committer_map).await?;
let now = Utc::now();
for gix_id in &new_commits {
let hex_oid = gix_id.to_hex().to_string();
let oid = ObjectId::new(&hex_oid);
let commit_meta = bare.commit_info(oid).map_err(|e| {
GitError::Internal(format!(
"commit_info failed for {}: {}",
hex_oid, e
))
})?;
let author_committer_id = ensure_committer(
&mut committer_map,
pool,
repo_id,
&commit_meta.author.email,
&commit_meta.author.name,
&email_map,
now,
)
.await?;
let committer_committer_id = ensure_committer(
&mut committer_map,
pool,
repo_id,
&commit_meta.committer.email,
&commit_meta.committer.name,
&email_map,
now,
)
.await?;
let parent_shas = commit_meta
.parent_ids
.iter()
.map(|p| p.as_str())
.collect::<Vec<_>>()
.join(".");
let authored_at = git_time_to_datetime(
commit_meta.author.time_secs,
commit_meta.author.offset_minutes,
);
let committed_at = git_time_to_datetime(
commit_meta.committer.time_secs,
commit_meta.committer.offset_minutes,
);
let new_id = Uuid::new_v4();
sqlx::query(
"INSERT INTO repo_commit (id, repo, sha, tree_sha, parent_shas, author, committer, message, authored_at, committed_at, created_at) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
)
.bind(new_id)
.bind(repo_id)
.bind(&hex_oid)
.bind(commit_meta.tree_id.as_str())
.bind(&parent_shas)
.bind(author_committer_id)
.bind(committer_committer_id)
.bind(&commit_meta.message)
.bind(authored_at)
.bind(committed_at)
.bind(now)
.execute(pool)
.await
.map_err(|e| GitError::Internal(format!("failed to insert commit: {}", e)))?;
}
Ok(())
}
/// Maps each email used as a key of `committer_map` to the user that owns it.
///
/// Only `user_email` rows with `active = true` are considered, and the values
/// of `committer_map` are not read. Emails with no matching row are absent
/// from the result. An empty `committer_map` yields an empty map without
/// querying the database.
///
/// # Errors
///
/// Returns [`GitError::Internal`] if the lookup query fails.
async fn resolve_user_ids(
    db: &AppDatabase,
    committer_map: &HashMap<String, (Uuid, String)>,
) -> Result<HashMap<String, Uuid>, GitError> {
    if committer_map.is_empty() {
        return Ok(HashMap::new());
    }

    let writer = db.writer();
    let wanted: Vec<String> = committer_map.keys().map(String::clone).collect();

    let matches = sqlx::query_as::<_, (Uuid, String)>(
        "SELECT \"user\", email FROM user_email WHERE email = ANY($1) AND active = true",
    )
    .bind(&wanted)
    .fetch_all(writer)
    .await
    .map_err(|e| GitError::Internal(format!("failed to query user emails: {}", e)))?;

    // Later rows overwrite earlier ones for the same email.
    let by_email: HashMap<String, Uuid> = matches
        .into_iter()
        .map(|(user_id, email)| (email, user_id))
        .collect();
    Ok(by_email)
}
/// Returns the `repo_committer` id for `email` in `repo_id`, creating the row
/// the first time the email is seen.
///
/// `committer_map` caches `email -> (committer_id, name)`. On a cache hit the
/// cached id is returned without touching the database. On a miss, a new row
/// is inserted with `name`, `email`, `created_at = updated_at = now`, and the
/// user id that `email_map` holds for `email` (NULL when absent). The new entry
/// is then added to the cache.
///
/// # Errors
///
/// Returns [`GitError::Internal`] if the insert fails. The cache is not
/// modified in that case.
async fn ensure_committer(
    committer_map: &mut HashMap<String, (Uuid, String)>,
    pool: &sqlx::Pool<sqlx::Postgres>,
    repo_id: Uuid,
    email: &str,
    name: &str,
    email_map: &HashMap<String, Uuid>,
    now: DateTime<Utc>,
) -> Result<Uuid, GitError> {
    if let Some(cached_id) = committer_map.get(email).map(|(id, _)| *id) {
        return Ok(cached_id);
    }

    let linked_user: Option<Uuid> = email_map.get(email).copied();
    let committer_id = Uuid::new_v4();

    let insert = sqlx::query(
        "INSERT INTO repo_committer (id, repo, \"user\", name, email, created_at, updated_at) \
         VALUES ($1, $2, $3, $4, $5, $6, $7)",
    )
    .bind(committer_id)
    .bind(repo_id)
    .bind(linked_user)
    .bind(name)
    .bind(email)
    .bind(now)
    .bind(now);

    if let Err(e) = insert.execute(pool).await {
        return Err(GitError::Internal(format!(
            "failed to insert repo_committer: {}",
            e
        )));
    }

    committer_map.insert(email.to_owned(), (committer_id, name.to_owned()));
    Ok(committer_id)
}
/// Converts a git signature timestamp into a UTC [`DateTime`].
///
/// Git records a signature time as seconds since the Unix epoch, which is
/// already UTC, plus the signer's timezone offset. The offset only describes
/// how the time was shown in the signer's local zone. Applying it again would
/// shift the instant by that offset, so the second argument (offset in minutes)
/// is accepted for call compatibility but does not affect the result.
///
/// Falls back to the current time if `secs` is outside chrono's
/// representable range.
fn git_time_to_datetime(secs: i64, _offset_minutes: i32) -> DateTime<Utc> {
    DateTime::from_timestamp(secs, 0).unwrap_or_else(Utc::now)
}