Compare commits

..

No commits in common. "7c042c7b9d42c22f3560833efa06f83f0ee3f9c0" and "01b18c97dfcabe83f08f52781fb877ccde8372eb" have entirely different histories.

3 changed files with 46 additions and 55 deletions

View File

@ -2,9 +2,10 @@ use clap::Parser;
use config::AppConfig; use config::AppConfig;
use db::cache::AppCache; use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use git::hook::HookService; use git::hook::GitServiceHooks;
use slog::{Drain, OwnedKVList, Record}; use slog::{Drain, OwnedKVList, Record};
use tokio::signal; use tokio::signal;
use tokio_util::sync::CancellationToken;
mod args; mod args;
@ -28,24 +29,33 @@ async fn main() -> anyhow::Result<()> {
slog::info!(log, "cache connected"); slog::info!(log, "cache connected");
// 5. Parse CLI args // 5. Parse CLI args
let _args = HookArgs::parse(); let args = HookArgs::parse();
slog::info!(log, "git-hook worker starting"); slog::info!(log, "git-hook worker starting";
"worker_id" => %args.worker_id.unwrap_or_else(|| "default".to_string())
);
// 6. Build and start git hook service // 5. Build HTTP client for webhook delivery
let hooks = HookService::new( let http = reqwest::Client::builder()
.user_agent("Code-Git-Hook/1.0")
.build()
.unwrap_or_else(|_| reqwest::Client::new());
// 6. Build and run git hook service
let hooks = GitServiceHooks::new(
db, db,
cache.clone(), cache.clone(),
cache.redis_pool().clone(), cache.redis_pool().clone(),
log.clone(), log.clone(),
cfg, cfg,
std::sync::Arc::new(http),
); );
let cancel = hooks.start_worker(); let cancel = CancellationToken::new();
let cancel_signal = cancel.clone(); let cancel_clone = cancel.clone();
let log_clone = log.clone();
// Spawn signal handler that cancels on SIGINT/SIGTERM // Spawn signal handler
let log_clone = log.clone();
tokio::spawn(async move { tokio::spawn(async move {
let ctrl_c = async { let ctrl_c = async {
signal::ctrl_c() signal::ctrl_c()
@ -72,11 +82,11 @@ async fn main() -> anyhow::Result<()> {
slog::info!(log_clone, "received SIGTERM, initiating shutdown"); slog::info!(log_clone, "received SIGTERM, initiating shutdown");
} }
} }
cancel_signal.cancel(); cancel_clone.cancel();
}); });
// Wait until the worker is cancelled (by signal handler or otherwise) hooks.run(cancel).await?;
cancel.cancelled().await;
slog::info!(log, "git-hook worker stopped"); slog::info!(log, "git-hook worker stopped");
Ok(()) Ok(())
} }

View File

@ -22,7 +22,8 @@ pub(crate) struct BranchTip {
} }
/// Owned tag data collected from git2 (no git2 types after this). /// Owned tag data collected from git2 (no git2 types after this).
/// Consumed by sync_tags via collect_tag_refs(). /// Used by sync_tags; currently populated by collect_git_refs but not yet consumed.
#[allow(dead_code)]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct TagTip { pub(crate) struct TagTip {
pub name: String, pub name: String,

View File

@ -1,4 +1,3 @@
use crate::hook::sync::commit::TagTip;
use crate::GitError; use crate::GitError;
use crate::hook::sync::HookMetaDataSync; use crate::hook::sync::HookMetaDataSync;
use db::database::AppTransaction; use db::database::AppTransaction;
@ -8,15 +7,21 @@ use sea_orm::*;
use std::collections::HashSet; use std::collections::HashSet;
impl HookMetaDataSync { impl HookMetaDataSync {
/// Collect all tag metadata from git2 into owned structs. pub async fn sync_tags(&self, txn: &AppTransaction) -> Result<(), GitError> {
/// This is sync and must be called from a `spawn_blocking` context. let repo_id = self.repo.id;
pub(crate) fn collect_tag_refs(&self) -> Result<Vec<TagTip>, GitError> {
let repo = self.domain.repo(); let repo = self.domain.repo();
let existing: Vec<repo_tag::Model> = repo_tag::Entity::find()
.filter(repo_tag::Column::Repo.eq(repo_id))
.all(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to query tags: {}", e)))?;
let mut existing_names: HashSet<String> = existing.iter().map(|t| t.name.clone()).collect();
let tag_names = repo let tag_names = repo
.tag_names(None) .tag_names(None)
.map_err(|e| GitError::Internal(e.to_string()))?; .map_err(|e| GitError::Internal(e.to_string()))?;
let mut tags = Vec::new();
for tag_name in tag_names.iter().flatten() { for tag_name in tag_names.iter().flatten() {
let full_ref = format!("refs/tags/{}", tag_name); let full_ref = format!("refs/tags/{}", tag_name);
let reference = match repo.find_reference(&full_ref) { let reference = match repo.find_reference(&full_ref) {
@ -48,53 +53,28 @@ impl HookMetaDataSync {
(None, String::new(), String::new()) (None, String::new(), String::new())
}; };
tags.push(TagTip { if existing_names.contains(tag_name) {
name: tag_name.to_string(), existing_names.remove(tag_name);
target_oid,
description,
tagger_name,
tagger_email,
});
}
Ok(tags)
}
pub async fn sync_tags(&self, txn: &AppTransaction) -> Result<(), GitError> {
let repo_id = self.repo.id;
let existing: Vec<repo_tag::Model> = repo_tag::Entity::find()
.filter(repo_tag::Column::Repo.eq(repo_id))
.all(txn)
.await
.map_err(|e| GitError::IoError(format!("failed to query tags: {}", e)))?;
let mut existing_names: HashSet<String> = existing.iter().map(|t| t.name.clone()).collect();
let tags = self.collect_tag_refs()?;
for tag in tags {
if existing_names.contains(&tag.name) {
existing_names.remove(&tag.name);
repo_tag::Entity::update_many() repo_tag::Entity::update_many()
.filter(repo_tag::Column::Repo.eq(repo_id)) .filter(repo_tag::Column::Repo.eq(repo_id))
.filter(repo_tag::Column::Name.eq(&tag.name)) .filter(repo_tag::Column::Name.eq(tag_name))
.col_expr(repo_tag::Column::Oid, Expr::value(&tag.target_oid)) .col_expr(repo_tag::Column::Oid, Expr::value(&target_oid))
.col_expr(repo_tag::Column::Description, Expr::value(tag.description.clone())) .col_expr(repo_tag::Column::Description, Expr::value(description))
.col_expr(repo_tag::Column::TaggerName, Expr::value(&tag.tagger_name)) .col_expr(repo_tag::Column::TaggerName, Expr::value(&tagger_name))
.col_expr(repo_tag::Column::TaggerEmail, Expr::value(&tag.tagger_email)) .col_expr(repo_tag::Column::TaggerEmail, Expr::value(&tagger_email))
.exec(txn) .exec(txn)
.await .await
.map_err(|e| GitError::IoError(format!("failed to update tag: {}", e)))?; .map_err(|e| GitError::IoError(format!("failed to update tag: {}", e)))?;
} else { } else {
let new_tag = repo_tag::ActiveModel { let new_tag = repo_tag::ActiveModel {
repo: Set(repo_id), repo: Set(repo_id),
name: Set(tag.name.clone()), name: Set(tag_name.to_string()),
oid: Set(tag.target_oid), oid: Set(target_oid),
color: Set(None), color: Set(None),
description: Set(tag.description.clone()), description: Set(description),
created_at: Set(chrono::Utc::now()), created_at: Set(chrono::Utc::now()),
tagger_name: Set(tag.tagger_name.clone()), tagger_name: Set(tagger_name),
tagger_email: Set(tag.tagger_email.clone()), tagger_email: Set(tagger_email),
tagger: Set(None), tagger: Set(None),
..Default::default() ..Default::default()
}; };