gitdataai/libs/git/hook/sync/mod.rs
ZhenYi 3faaff6220 refactor(git): expand hook sync with skill scanning and multi-root discovery
Update sync module to support .claude/skills and .codex/skills roots,
add system/source tracking to discovered skills, and refactor
migration path for the new SQL-file based migrator.
2026-05-18 20:43:16 +08:00

682 lines
22 KiB
Rust

pub mod branch;
pub mod commit;
pub mod fsck;
pub mod gc;
pub mod lfs;
pub mod lock;
pub mod tag;
use db::cache::AppCache;
use db::database::AppDatabase;
use models::ActiveModelTrait;
use models::RepoId;
use models::projects::project_skill::ActiveModel as SkillActiveModel;
use models::projects::project_skill::{Column as SkillCol, Entity as SkillEntity};
use models::repos::repo::Model as RepoModel;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set};
use std::collections::HashMap;
use std::path::Path;
use crate::GitDomain;
// ── Skill discovery (local, no service crate dependency) ────────────────────────
use sha1::Digest;
/// Skill directories that are scanned recursively, as `(path relative to the repository
/// root, system label)` pairs. The label becomes the middle segment of the skill slug and
/// the default `system` metadata value.
const SKILL_ROOTS: &[(&str, &str)] = &[(".claude/skills", "claude"), (".codex/skills", "codex")];
/// System label recorded for legacy skill packs: directories directly under the repository
/// root that contain a `SKILL.md`.
const ROOT_SKILL_SYSTEM: &str = "root";
/// Tells a directory walker whether it may enter a directory with the given name.
///
/// Only the exact name `.git` is rejected; every other name (including the empty string)
/// is accepted.
fn should_descend_dir(name: &str) -> bool {
    !matches!(name, ".git")
}
/// Scan a checked-out working directory for files named `SKILL.md`.
///
/// Every entry of `SKILL_ROOTS` that exists under `base` is walked recursively, then the
/// direct children of `base` are checked for legacy root-level skill packs.
///
/// Root-level skill packs keep the legacy slug `{short_repo_id}/{skill_dir}`.
/// System skills use `{short_repo_id}/{system}/{relative_skill_dir}`.
///
/// `short_repo_id` is the first eight bytes of `repo_id`'s string form. If the rendered id
/// is shorter than that (or byte 8 is not a character boundary) the whole id is used
/// instead of panicking on an out-of-range slice.
///
/// The helpers swallow I/O errors, so this currently always returns `Ok`; the `Result`
/// return type is kept for the caller.
fn scan_skills_from_dir(
    base: &Path,
    repo_id: &RepoId,
    commit_sha: &str,
) -> Result<Vec<DiscoveredSkill>, std::io::Error> {
    let repo_id_text = repo_id.to_string();
    // Checked slicing: `[..8]` would panic for ids that render to fewer than eight bytes.
    let repo_id_prefix = repo_id_text.get(..8).unwrap_or(repo_id_text.as_str());

    let mut discovered = Vec::new();
    for (root, system) in SKILL_ROOTS {
        let root_path = base.join(root);
        if root_path.exists() {
            scan_skill_root_from_dir(
                &root_path,
                repo_id_prefix,
                system,
                root,
                commit_sha,
                &mut discovered,
            );
        }
    }
    scan_root_skill_pack_from_dir(base, repo_id_prefix, commit_sha, &mut discovered);
    Ok(discovered)
}
fn scan_skill_root_from_dir(
root_path: &Path,
repo_id_prefix: &str,
system: &str,
root: &str,
commit_sha: &str,
discovered: &mut Vec<DiscoveredSkill>,
) {
let mut stack = vec![root_path.to_path_buf()];
while let Some(dir) = stack.pop() {
let entries = match std::fs::read_dir(&dir) {
Ok(e) => e,
Err(_) => continue,
};
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
stack.push(path);
continue;
}
if !is_skill_file_name(&path) {
continue;
}
let Some(parent) = path.parent() else {
continue;
};
let relative_skill_dir = parent
.strip_prefix(root_path)
.ok()
.and_then(path_to_slug)
.filter(|s| !s.is_empty());
let Some(relative_skill_dir) = relative_skill_dir else {
continue;
};
let slug = format!("{}/{}/{}", repo_id_prefix, system, relative_skill_dir);
if let Ok(raw) = std::fs::read(&path) {
let blob_hash = git_blob_hash(&raw);
let mut skill = parse_skill_content(&slug, &raw);
skill.commit_sha = Some(commit_sha.to_string());
skill.blob_hash = Some(blob_hash);
skill.metadata = enrich_metadata(
skill.metadata,
system,
Some(&format!("{}/{}/SKILL.md", root, relative_skill_dir)),
);
discovered.push(skill);
}
}
}
}
/// Discover legacy root-level skill packs: direct child directories of `base` that contain
/// a file named exactly `SKILL.md`.
///
/// * `base` - repository working directory.
/// * `repo_id_prefix` - first slug segment; the slug is `{repo_id_prefix}/{slugified dir}`.
/// * `commit_sha` - recorded on every discovered skill.
/// * `discovered` - output list that new skills are appended to.
///
/// `.git`, `.claude` and `.codex` are skipped. Repository content is untrusted, so
/// symlinked directories are not entered and `SKILL.md` must be a regular file rather than
/// a symlink or special file. Unreadable entries are skipped silently (best-effort).
fn scan_root_skill_pack_from_dir(
    base: &Path,
    repo_id_prefix: &str,
    commit_sha: &str,
    discovered: &mut Vec<DiscoveredSkill>,
) {
    let entries = match std::fs::read_dir(base) {
        Ok(e) => e,
        Err(_) => return,
    };
    for entry in entries.flatten() {
        // `DirEntry::file_type` does not follow symlinks, so a link to a directory is
        // reported as a symlink and rejected here.
        let is_real_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
        if !is_real_dir {
            continue;
        }
        let path = entry.path();
        let Some(dir_name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        if dir_name == ".git" || dir_name == ".claude" || dir_name == ".codex" {
            continue;
        }
        let skill_file = path.join("SKILL.md");
        // `symlink_metadata` inspects the entry itself; only a regular file is accepted.
        let is_regular_file = std::fs::symlink_metadata(&skill_file)
            .map(|meta| meta.is_file())
            .unwrap_or(false);
        if !is_regular_file {
            continue;
        }
        let relative_skill_dir = slugify_segment(dir_name);
        if relative_skill_dir.is_empty() {
            continue;
        }
        let slug = format!("{}/{}", repo_id_prefix, relative_skill_dir);
        if let Ok(raw) = std::fs::read(&skill_file) {
            let blob_hash = git_blob_hash(&raw);
            let mut skill = parse_skill_content(&slug, &raw);
            skill.commit_sha = Some(commit_sha.to_string());
            skill.blob_hash = Some(blob_hash);
            skill.metadata = enrich_metadata(
                skill.metadata,
                ROOT_SKILL_SYSTEM,
                Some(&format!("{}/SKILL.md", relative_skill_dir)),
            );
            discovered.push(skill);
        }
    }
}
/// Hex-encoded SHA-1 of `content` prefixed with the header `blob <len>\0`, where `<len>`
/// is the content length in bytes.
fn git_blob_hash(content: &[u8]) -> String {
    let mut sha = sha1::Sha1::new();
    // Header first, then the raw bytes.
    sha.update(format!("blob {}\0", content.len()).as_bytes());
    sha.update(content);
    let digest = sha.finalize();
    hex::encode(digest)
}
/// Parse raw front matter text into a JSON value.
///
/// JSON is tried first, then YAML. A missing front matter, or text that parses as neither,
/// yields the default value (`null`).
fn parse_frontmatter(frontmatter: Option<&str>) -> serde_json::Value {
    let Some(text) = frontmatter else {
        return serde_json::Value::default();
    };
    if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(text) {
        return parsed;
    }
    serde_yaml::from_str::<serde_json::Value>(text).unwrap_or_default()
}
/// Build a [`DiscoveredSkill`] from the raw bytes of a `SKILL.md` file.
///
/// The bytes are decoded lossily as UTF-8 and split into front matter and body.
///
/// * `name` comes from the string-valued `name` front matter key; otherwise it is the slug
///   with every `-` and `_` replaced by a space.
/// * `description` comes from the string-valued `description` key, if present.
/// * `content` is the trimmed body.
/// * `commit_sha` and `blob_hash` are left as `None` for the caller to fill in.
fn parse_skill_content(slug: &str, raw: &[u8]) -> DiscoveredSkill {
    let text = String::from_utf8_lossy(raw);
    let (front, body) = extract_frontmatter(&text);
    let metadata = parse_frontmatter(front);

    // Reads a front matter key only when its value is a JSON string.
    let string_field = |key: &str| -> Option<String> {
        metadata.get(key).and_then(|value| value.as_str()).map(String::from)
    };

    let name = match string_field("name") {
        Some(explicit) => explicit,
        None => slug
            .chars()
            .map(|c| if matches!(c, '-' | '_') { ' ' } else { c })
            .collect(),
    };
    let description = string_field("description");

    DiscoveredSkill {
        slug: slug.to_string(),
        name,
        description,
        content: body.trim().to_string(),
        metadata,
        commit_sha: None,
        blob_hash: None,
    }
}
/// A skill found while scanning a repository, before it is reconciled with the database.
struct DiscoveredSkill {
/// Identifier built by the scanners: `{repo id prefix}/{dir}` for root-level packs,
/// `{repo id prefix}/{system}/{dir}` for skills below a system root.
slug: String,
/// `name` from the front matter, or a name derived from the slug when absent.
name: String,
/// `description` from the front matter, if present as a string.
description: Option<String>,
/// Body of `SKILL.md` with the front matter removed and surrounding whitespace trimmed.
content: String,
/// Parsed front matter; the scanners add `system` and `path` keys when they are missing.
metadata: serde_json::Value,
/// Commit the scan ran against; `None` until a scanner fills it in.
commit_sha: Option<String>,
/// Git blob hash of the raw file bytes; `None` until a scanner fills it in.
blob_hash: Option<String>,
}
/// Returns `true` when the final component of `path` is `SKILL.md`, compared ASCII
/// case-insensitively. Paths without a final component, or with a non-UTF-8 one, yield
/// `false`.
fn is_skill_file_name(path: &Path) -> bool {
    match path.file_name().and_then(|os_name| os_name.to_str()) {
        Some(file_name) => file_name.eq_ignore_ascii_case("SKILL.md"),
        None => false,
    }
}
fn path_to_slug(path: &Path) -> Option<String> {
let parts: Vec<String> = path
.components()
.filter_map(|c| c.as_os_str().to_str())
.map(slugify_segment)
.filter(|s| !s.is_empty())
.collect();
(!parts.is_empty()).then(|| parts.join("/"))
}
/// Normalise one path segment into slug form.
///
/// ASCII letters are lower-cased; ASCII letters, digits, `_` and `-` are kept. Every run of
/// other characters collapses into a single `-`. Leading and trailing `-` are stripped from
/// the result.
///
/// A literal `-` in the input does not count as part of a run, so `"a- b"` yields `"a--b"`.
fn slugify_segment(input: &str) -> String {
    let mut slug = String::with_capacity(input.len());
    let mut in_separator_run = false;
    for lowered in input.chars().map(|c| c.to_ascii_lowercase()) {
        let keep = matches!(lowered, 'a'..='z' | '0'..='9' | '_' | '-');
        match (keep, in_separator_run) {
            (true, _) => {
                slug.push(lowered);
                in_separator_run = false;
            }
            (false, false) => {
                slug.push('-');
                in_separator_run = true;
            }
            // Already inside a run of replaced characters: emit nothing.
            (false, true) => {}
        }
    }
    slug.trim_matches('-').to_string()
}
/// Ensure `metadata` is a JSON object carrying `system` and, optionally, `path`.
///
/// A non-object input is replaced by an empty object. Keys already present are never
/// overwritten, so values supplied by the front matter win. `path` is only considered when
/// `relative_path` is `Some`.
fn enrich_metadata(
    metadata: serde_json::Value,
    system: &str,
    relative_path: Option<&str>,
) -> serde_json::Value {
    let mut fields = match metadata {
        serde_json::Value::Object(existing) => existing,
        _ => serde_json::Map::new(),
    };
    if !fields.contains_key("system") {
        fields.insert(
            "system".to_string(),
            serde_json::Value::String(system.to_string()),
        );
    }
    if let Some(location) = relative_path {
        if !fields.contains_key("path") {
            fields.insert(
                "path".to_string(),
                serde_json::Value::String(location.to_string()),
            );
        }
    }
    serde_json::Value::Object(fields)
}
/// Split a document into its front matter and body.
///
/// Leading whitespace is ignored. The document has front matter when it starts with `---`
/// and a later line consists of `---` alone (trailing whitespace allowed). The closing
/// fence must sit on its own line so that a `---` inside a value, such as
/// `description: a---b`, does not end the front matter early.
///
/// Returns `(Some(front_matter), body)` where `front_matter` is the text between the two
/// fences, including its surrounding newlines, and `body` has its leading whitespace
/// removed. Without a well-formed fence pair it returns `(None, document)` with only the
/// leading whitespace trimmed.
fn extract_frontmatter(raw: &str) -> (Option<&str>, &str) {
    const FENCE: &str = "---";
    let trimmed = raw.trim_start();
    let Some(after_open) = trimmed.strip_prefix(FENCE) else {
        return (None, trimmed);
    };

    let mut search_from = 0;
    while let Some(found) = after_open[search_from..].find("\n---") {
        // `found` points at the newline; the fence itself starts one byte later.
        let fence_start = search_from + found + 1;
        let fence_end = fence_start + FENCE.len();
        let tail = &after_open[fence_end..];
        let rest_of_line = tail.split('\n').next().unwrap_or("");
        if rest_of_line.trim().is_empty() {
            return (Some(&after_open[..fence_start]), tail.trim_start());
        }
        // A line that merely begins with `---` (e.g. `---x`) is content; keep looking.
        search_from = fence_end;
    }
    (None, trimmed)
}
/// Scan the tree of `HEAD` for `SKILL.md` blobs (works for bare repos).
///
/// The whole tree is walked iteratively, skipping directories rejected by
/// `should_descend_dir`. A blob whose name matches `SKILL.md` ASCII case-insensitively is
/// classified by `skill_location_from_path`; paths it does not recognise are ignored.
///
/// Slugs are `{short_repo_id}/{dir}` for legacy root-level packs and
/// `{short_repo_id}/{system}/{dir}` otherwise. `short_repo_id` is the first eight bytes of
/// `repo_id`'s string form; if the rendered id is shorter than that (or byte 8 is not a
/// character boundary) the whole id is used instead of panicking on an out-of-range slice.
///
/// # Errors
///
/// Returns a message when `HEAD` cannot be resolved or peeled to a tree. Subtrees and
/// blobs that fail to load are skipped silently.
fn scan_skills_from_tree(
    git_repo: &git2::Repository,
    repo_id: &RepoId,
    commit_sha: &str,
) -> Result<Vec<DiscoveredSkill>, String> {
    let repo_id_text = repo_id.to_string();
    // Checked slicing: `[..8]` would panic for ids that render to fewer than eight bytes.
    let repo_id_prefix = repo_id_text.get(..8).unwrap_or(repo_id_text.as_str());
    let head = git_repo.head().map_err(|e| format!("no HEAD: {e}"))?;
    let tree = head.peel_to_tree().map_err(|e| format!("no tree: {e}"))?;
    let mut discovered = Vec::new();
    // Each stack item is a tree plus its repository-relative path ("" for the root).
    let mut stack: Vec<(git2::Tree<'_>, String)> = vec![(tree, String::new())];
    while let Some((current_tree, prefix)) = stack.pop() {
        for entry in current_tree.iter() {
            let name = match entry.name() {
                Some(n) => n,
                None => continue,
            };
            let entry_path = if prefix.is_empty() {
                name.to_string()
            } else {
                format!("{}/{}", prefix, name)
            };
            match entry.kind() {
                Some(git2::ObjectType::Tree) => {
                    if should_descend_dir(name) {
                        if let Ok(subtree) =
                            entry.to_object(git_repo).and_then(|o| o.peel_to_tree())
                        {
                            stack.push((subtree, entry_path));
                        }
                    }
                }
                Some(git2::ObjectType::Blob) if name.eq_ignore_ascii_case("SKILL.md") => {
                    let Some((system, relative_skill_dir, legacy_slug)) =
                        skill_location_from_path(&entry_path)
                    else {
                        continue;
                    };
                    let slug = if legacy_slug {
                        format!("{}/{}", repo_id_prefix, relative_skill_dir)
                    } else {
                        format!("{}/{}/{}", repo_id_prefix, system, relative_skill_dir)
                    };
                    if let Ok(blob) = entry.to_object(git_repo).and_then(|o| o.peel_to_blob()) {
                        let raw = blob.content();
                        let blob_hash = git_blob_hash(raw);
                        let mut skill = parse_skill_content(&slug, raw);
                        skill.commit_sha = Some(commit_sha.to_string());
                        skill.blob_hash = Some(blob_hash);
                        skill.metadata = enrich_metadata(skill.metadata, system, Some(&entry_path));
                        discovered.push(skill);
                    }
                }
                _ => {}
            }
        }
    }
    Ok(discovered)
}
/// Classify a repository-relative path to a skill file.
///
/// Returns `(system, slugified skill directory, is_legacy_slug)`, or `None` when the path
/// is not a recognised skill location. Backslashes are treated as path separators.
///
/// * Below an entry of `SKILL_ROOTS`: the system is that root's label, the directory is
///   the slugified path between the root and the file, and `is_legacy_slug` is `false`.
///   The file name is compared ASCII case-insensitively, matching the tree scanner's blob
///   filter and `is_skill_file_name`.
/// * `<dir>/SKILL.md` with a single, non-dot-prefixed directory: a legacy root-level pack
///   (`ROOT_SKILL_SYSTEM`, `is_legacy_slug` is `true`). The file name must match exactly,
///   like the `SKILL.md` lookup of the filesystem scanner.
///
/// A skill file lying directly in a skill root (for example `.claude/skills/SKILL.md`) has
/// no skill directory and yields `None`. Slicing by prefix and suffix lengths used to
/// panic on that input because the two overlap on the shared `/`.
fn skill_location_from_path(path: &str) -> Option<(&'static str, String, bool)> {
    let normalized = path.replace('\\', "/");
    // Everything before the last `/` is the candidate skill directory; a bare file name
    // has no directory and can never be a skill location.
    let (skill_dir, file_name) = normalized.rsplit_once('/')?;

    if file_name.eq_ignore_ascii_case("SKILL.md") {
        for (root, system) in SKILL_ROOTS {
            // Require `<root>/<something>`; `strip_prefix` cannot slice out of range.
            let Some(relative) = skill_dir
                .strip_prefix(*root)
                .and_then(|rest| rest.strip_prefix('/'))
            else {
                continue;
            };
            let slug = relative
                .split('/')
                .map(slugify_segment)
                .filter(|s| !s.is_empty())
                .collect::<Vec<_>>()
                .join("/");
            if !slug.is_empty() {
                return Some((*system, slug, false));
            }
        }
    }

    if file_name == "SKILL.md" && !normalized.starts_with('.') && !skill_dir.contains('/') {
        let slug = slugify_segment(skill_dir);
        if !slug.is_empty() {
            return Some((ROOT_SKILL_SYSTEM, slug, true));
        }
    }
    None
}
/// Synchronises repository metadata (refs, commits, tags, LFS objects, skills) for a
/// single repository.
#[derive(Clone)]
pub struct HookMetaDataSync {
/// Database handle used for transactions and skill queries.
pub db: AppDatabase,
/// Application cache handle. NOTE(review): not used in this part of the file —
/// presumably consumed by the submodules; confirm.
pub cache: AppCache,
/// Repository row this instance operates on.
pub repo: RepoModel,
/// Git domain object built from `repo` in `new`; gives access to the git repository.
pub domain: GitDomain,
}
impl HookMetaDataSync {
/// Create a sync handle for `repo`.
///
/// # Errors
///
/// Propagates the error from `GitDomain::from_model` when the git domain cannot be built
/// from the repository model.
pub fn new(db: AppDatabase, cache: AppCache, repo: RepoModel) -> Result<Self, crate::GitError> {
    // The model is cloned because the handle also keeps the model itself.
    let git_domain = GitDomain::from_model(repo.clone())?;
    let handle = Self {
        db,
        cache,
        repo,
        domain: git_domain,
    };
    Ok(handle)
}
/// Full sync with lock. Caller (worker) manages locking.
pub async fn sync(&self) -> Result<(), crate::GitError> {
let lock_value = self.acquire_lock().await?;
let res = self.sync_work().await;
if let Err(ref e) = res {
tracing::error!("sync failed error={}", e);
}
let _ = self.release_lock(&lock_value).await;
res
}
/// Fsck only, under the lock.
///
/// Acquires the lock, runs the fsck step, and releases the lock whether or not fsck
/// succeeded. An error while releasing the lock is ignored.
///
/// # Errors
///
/// Fails when the lock cannot be acquired or when the fsck step fails.
pub async fn fsck_only(&self) -> Result<(), crate::GitError> {
    let lock_value = self.acquire_lock().await?;
    let outcome = self.fsck_work().await;
    // Release happens on both the success and the failure path.
    let _ = self.release_lock(&lock_value).await;
    outcome
}
/// GC only, under the lock.
///
/// Acquires the lock, runs garbage collection, and releases the lock whether or not GC
/// succeeded. An error while releasing the lock is ignored.
///
/// # Errors
///
/// Fails when the lock cannot be acquired or when GC fails.
pub async fn gc_only(&self) -> Result<(), crate::GitError> {
    let lock_value = self.acquire_lock().await?;
    let outcome = self.gc_work().await;
    // Release happens on both the success and the failure path.
    let _ = self.release_lock(&lock_value).await;
    outcome
}
/// Full sync pipeline (no locking — the caller is responsible).
///
/// Refs, commits, tags and LFS objects are synced and the fsck step runs inside one
/// database transaction. After the transaction is committed, GC runs, followed by the
/// best-effort skill sync.
///
/// # Errors
///
/// Returns `GitError::IoError` when the transaction cannot be started or committed, and
/// propagates the first error from any sync step, the fsck step, or GC.
async fn sync_work(&self) -> Result<(), crate::GitError> {
    let mut tx = match self.db.begin().await {
        Ok(tx) => tx,
        Err(e) => {
            return Err(crate::GitError::IoError(format!(
                "failed to begin transaction: {}",
                e
            )));
        }
    };

    self.sync_refs(&mut tx).await?;
    self.sync_commits(&mut tx).await?;
    self.sync_tags(&mut tx).await?;
    self.sync_lfs_objects(&mut tx).await?;
    self.run_fsck_and_rollback_if_corrupt(&mut tx).await?;

    if let Err(e) = tx.commit().await {
        return Err(crate::GitError::IoError(format!(
            "failed to commit transaction: {}",
            e
        )));
    }

    self.run_gc().await?;
    // Skill sync reports nothing back, so it cannot fail the pipeline.
    self.sync_skills().await;
    Ok(())
}
/// Fsck-only work (no locking — the caller is responsible).
///
/// Runs the fsck step inside its own database transaction and commits it.
///
/// # Errors
///
/// Returns `GitError::IoError` when the transaction cannot be started or committed, and
/// propagates an error from the fsck step.
async fn fsck_work(&self) -> Result<(), crate::GitError> {
    let mut tx = match self.db.begin().await {
        Ok(tx) => tx,
        Err(e) => {
            return Err(crate::GitError::IoError(format!(
                "failed to begin transaction: {}",
                e
            )));
        }
    };

    self.run_fsck_and_rollback_if_corrupt(&mut tx).await?;

    match tx.commit().await {
        Ok(_) => Ok(()),
        Err(e) => Err(crate::GitError::IoError(format!(
            "failed to commit transaction: {}",
            e
        ))),
    }
}
/// GC only work (no locking — caller is responsible).
///
/// Delegates directly to `run_gc` and returns its result unchanged.
async fn gc_work(&self) -> Result<(), crate::GitError> {
self.run_gc().await
}
/// Returns `(branch_name, oid)` for every local branch that has a direct target.
///
/// `branch_name` is the reference name without the `refs/heads/` prefix. References that
/// fail to load, have a non-UTF-8 name, or have no direct target are skipped. If the
/// references cannot be listed at all, the result is empty.
pub fn list_branch_tips(&self) -> Vec<(String, String)> {
    let repo = self.domain.repo();
    let Ok(references) = repo.references() else {
        return Vec::new();
    };
    references
        .flatten()
        .filter(|reference| reference.is_branch() && !reference.is_remote())
        .filter_map(|reference| {
            let full_name = reference.name()?;
            let short_name = full_name.strip_prefix("refs/heads/").unwrap_or(full_name);
            let oid = reference.target()?;
            Some((short_name.to_string(), oid.to_string()))
        })
        .collect()
}
/// Returns `(tag_name, oid)` for every tag reference that has a direct target.
///
/// `tag_name` is the reference name without the `refs/tags/` prefix. References that fail
/// to load, have a non-UTF-8 name, or have no direct target are skipped. If the references
/// cannot be listed at all, the result is empty.
pub fn list_tag_tips(&self) -> Vec<(String, String)> {
    let repo = self.domain.repo();
    let Ok(references) = repo.references() else {
        return Vec::new();
    };
    references
        .flatten()
        .filter(|reference| reference.is_tag())
        .filter_map(|reference| {
            let full_name = reference.name()?;
            let short_name = full_name.strip_prefix("refs/tags/").unwrap_or(full_name);
            let oid = reference.target()?;
            Some((short_name.to_string(), oid.to_string()))
        })
        .collect()
}
/// Scan the repository for `SKILL.md` files and sync skills to the project.
/// Best-effort — failures are logged but do not fail the sync.
pub async fn sync_skills(&self) {
let project_uid = self.repo.project;
let git_repo = self.domain.repo();
let commit_sha = git_repo
.head()
.ok()
.and_then(|h| h.target())
.map(|oid| oid.to_string())
.unwrap_or_default();
let repo_id = self.repo.id;
let is_bare = git_repo.is_bare() || git_repo.workdir().is_none();
let discovered = if is_bare {
// Bare repo: scan git tree objects directly
let git_repo_ref = self.domain.repo();
match scan_skills_from_tree(git_repo_ref, &repo_id, &commit_sha) {
Ok(d) => d,
Err(e) => {
tracing::warn!("failed to scan skills from tree error={}", e);
return;
}
}
} else {
// Normal repo: walk filesystem
let repo_root = match git_repo.workdir() {
Some(path) => path.to_path_buf(),
None => {
tracing::warn!("workdir not available for non-bare repo");
return;
}
};
match tokio::task::spawn_blocking(move || {
scan_skills_from_dir(&repo_root, &repo_id, &commit_sha)
})
.await
{
Ok(Ok(d)) => d,
Ok(Err(e)) => {
tracing::warn!("failed to scan skills directory error={}", e);
return;
}
Err(e) => {
tracing::warn!("spawn_blocking join error error={}", e);
return;
}
}
};
if discovered.is_empty() {
return;
}
let now = chrono::Utc::now();
let mut created = 0i64;
let mut updated = 0i64;
let mut removed = 0i64;
let existing: Vec<_> = match SkillEntity::find()
.filter(SkillCol::ProjectUuid.eq(project_uid))
.filter(SkillCol::Source.eq("repo"))
.filter(SkillCol::RepoId.eq(self.repo.id))
.all(&self.db)
.await
{
Ok(e) => e,
Err(e) => {
tracing::warn!("failed to query existing skills error={}", e);
return;
}
};
// Deduplicate by stable slug. Blob hash changes when content changes and must not be the
// upsert key because project_skill has a unique (project_uuid, slug) constraint.
let mut deduped: std::collections::HashMap<String, DiscoveredSkill> =
std::collections::HashMap::new();
for skill in discovered {
match deduped.get(&skill.slug) {
Some(existing) => {
if skill.commit_sha.as_ref().unwrap_or(&String::new())
> existing.commit_sha.as_ref().unwrap_or(&String::new())
{
deduped.insert(skill.slug.clone(), skill);
}
}
None => {
deduped.insert(skill.slug.clone(), skill);
}
}
}
let existing_by_slug: HashMap<_, _> =
existing.into_iter().map(|s| (s.slug.clone(), s)).collect();
let mut seen_keys = std::collections::HashSet::new();
for (key, skill) in deduped {
seen_keys.insert(key.clone());
let json_meta = serde_json::to_value(&skill.metadata).unwrap_or_default();
if let Some(existing_skill) = existing_by_slug.get(&key) {
if existing_skill.content != skill.content
|| existing_skill.metadata != json_meta
|| existing_skill.commit_sha != skill.commit_sha
|| existing_skill.blob_hash != skill.blob_hash
|| existing_skill.name != skill.name
|| existing_skill.description != skill.description
{
let mut active: SkillActiveModel = existing_skill.clone().into();
active.name = Set(skill.name);
active.description = Set(skill.description);
active.content = Set(skill.content);
active.metadata = Set(json_meta);
active.commit_sha = Set(skill.commit_sha);
active.blob_hash = Set(skill.blob_hash);
active.updated_at = Set(now);
if active.update(&self.db).await.is_ok() {
updated += 1;
}
}
} else {
let active = SkillActiveModel {
id: Set(0),
project_uuid: Set(project_uid),
slug: Set(skill.slug.clone()),
name: Set(skill.name),
description: Set(skill.description),
source: Set("repo".to_string()),
repo_id: Set(Some(self.repo.id)),
commit_sha: Set(skill.commit_sha),
blob_hash: Set(skill.blob_hash),
content: Set(skill.content),
metadata: Set(json_meta),
enabled: Set(true),
created_by: Set(None),
created_at: Set(now),
updated_at: Set(now),
};
if SkillEntity::insert(active).exec(&self.db).await.is_ok() {
created += 1;
}
}
}
for (key, old_skill) in existing_by_slug {
if !seen_keys.contains(&key) {
if SkillEntity::delete_by_id(old_skill.id)
.exec(&self.db)
.await
.is_ok()
{
removed += 1;
}
}
}
if created > 0 || updated > 0 || removed > 0 {
tracing::info!(
"skills synced created={} updated={} removed={}",
created,
updated,
removed
);
}
}
}