Compare commits
No commits in common. "01b18c97dfcabe83f08f52781fb877ccde8372eb" and "eeb99bf628d9e2574e52f10871502c544c5c64e9" have entirely different histories.
01b18c97df
...
eeb99bf628
@ -3,8 +3,6 @@ use serde::{Deserialize, Serialize};
|
|||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct PoolConfig {
|
pub struct PoolConfig {
|
||||||
/// Intended concurrency (used by K8s operator/HPA, not the worker itself).
|
|
||||||
/// The worker is single-threaded by design; K8s replicas provide parallelism.
|
|
||||||
pub max_concurrent: usize,
|
pub max_concurrent: usize,
|
||||||
pub cpu_threshold: f32,
|
pub cpu_threshold: f32,
|
||||||
/// Hash-tag-prefixed Redis key prefix for hook task queues.
|
/// Hash-tag-prefixed Redis key prefix for hook task queues.
|
||||||
|
|||||||
@ -2,19 +2,13 @@ use config::AppConfig;
|
|||||||
use db::cache::AppCache;
|
use db::cache::AppCache;
|
||||||
use db::database::AppDatabase;
|
use db::database::AppDatabase;
|
||||||
use deadpool_redis::cluster::Pool as RedisPool;
|
use deadpool_redis::cluster::Pool as RedisPool;
|
||||||
|
use models::EntityTrait;
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
use tokio_util::sync::CancellationToken;
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub mod pool;
|
/// Simplified hook service — no queue, no pool.
|
||||||
pub mod sync;
|
/// K8s StatefulSet HA scheduling ensures only one pod touches a repo at a time.
|
||||||
pub mod webhook_dispatch;
|
/// Execution is direct and sequential per invocation.
|
||||||
|
|
||||||
pub use pool::{HookWorker, PoolConfig, RedisConsumer};
|
|
||||||
pub use pool::types::{HookTask, TaskType};
|
|
||||||
|
|
||||||
/// Hook service that manages the Redis-backed task queue worker.
|
|
||||||
/// Multiple gitserver pods can run concurrently — the worker acquires a
|
|
||||||
/// per-repo Redis lock before processing each task.
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct HookService {
|
pub struct HookService {
|
||||||
pub(crate) db: AppDatabase,
|
pub(crate) db: AppDatabase,
|
||||||
@ -22,6 +16,7 @@ pub struct HookService {
|
|||||||
pub(crate) redis_pool: RedisPool,
|
pub(crate) redis_pool: RedisPool,
|
||||||
pub(crate) logger: Logger,
|
pub(crate) logger: Logger,
|
||||||
pub(crate) config: AppConfig,
|
pub(crate) config: AppConfig,
|
||||||
|
pub(crate) http: Arc<reqwest::Client>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HookService {
|
impl HookService {
|
||||||
@ -31,6 +26,7 @@ impl HookService {
|
|||||||
redis_pool: RedisPool,
|
redis_pool: RedisPool,
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
config: AppConfig,
|
config: AppConfig,
|
||||||
|
http: Arc<reqwest::Client>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
db,
|
db,
|
||||||
@ -38,18 +34,96 @@ impl HookService {
|
|||||||
redis_pool,
|
redis_pool,
|
||||||
logger,
|
logger,
|
||||||
config,
|
config,
|
||||||
|
http,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start the background worker and return a cancellation token.
|
/// Full sync: refs → commits → tags → LFS → fsck → gc → skills.
|
||||||
pub fn start_worker(&self) -> CancellationToken {
|
pub async fn sync_repo(&self, repo_id: &str) -> Result<(), crate::GitError> {
|
||||||
let pool_config = PoolConfig::from_env(&self.config);
|
let repo_uuid = models::Uuid::parse_str(repo_id)
|
||||||
pool::start_worker(
|
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||||
|
|
||||||
|
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
||||||
|
.one(self.db.reader())
|
||||||
|
.await
|
||||||
|
.map_err(crate::GitError::from)?
|
||||||
|
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||||
|
|
||||||
|
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||||
|
return Err(crate::GitError::NotFound(format!(
|
||||||
|
"storage path does not exist: {}",
|
||||||
|
repo.storage_path
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let sync = crate::hook::sync::HookMetaDataSync::new(
|
||||||
self.db.clone(),
|
self.db.clone(),
|
||||||
self.cache.clone(),
|
self.cache.clone(),
|
||||||
self.redis_pool.clone(),
|
repo,
|
||||||
self.logger.clone(),
|
self.logger.clone(),
|
||||||
pool_config,
|
)?;
|
||||||
)
|
|
||||||
|
// No distributed lock needed — K8s StatefulSet scheduling guarantees
|
||||||
|
// that at most one pod processes a given repo shard at any time.
|
||||||
|
sync.sync().await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run fsck only (no full sync).
|
||||||
|
pub async fn fsck_repo(&self, repo_id: &str) -> Result<(), crate::GitError> {
|
||||||
|
let repo_uuid = models::Uuid::parse_str(repo_id)
|
||||||
|
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||||
|
|
||||||
|
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
||||||
|
.one(self.db.reader())
|
||||||
|
.await
|
||||||
|
.map_err(crate::GitError::from)?
|
||||||
|
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||||
|
|
||||||
|
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||||
|
return Err(crate::GitError::NotFound(format!(
|
||||||
|
"storage path does not exist: {}",
|
||||||
|
repo.storage_path
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let sync = crate::hook::sync::HookMetaDataSync::new(
|
||||||
|
self.db.clone(),
|
||||||
|
self.cache.clone(),
|
||||||
|
repo,
|
||||||
|
self.logger.clone(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
sync.fsck_only().await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run gc only (no full sync).
|
||||||
|
pub async fn gc_repo(&self, repo_id: &str) -> Result<(), crate::GitError> {
|
||||||
|
let repo_uuid = models::Uuid::parse_str(repo_id)
|
||||||
|
.map_err(|_| crate::GitError::Internal("invalid repo_id uuid".into()))?;
|
||||||
|
|
||||||
|
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
||||||
|
.one(self.db.reader())
|
||||||
|
.await
|
||||||
|
.map_err(crate::GitError::from)?
|
||||||
|
.ok_or_else(|| crate::GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
||||||
|
|
||||||
|
if !std::path::Path::new(&repo.storage_path).exists() {
|
||||||
|
return Err(crate::GitError::NotFound(format!(
|
||||||
|
"storage path does not exist: {}",
|
||||||
|
repo.storage_path
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let sync = crate::hook::sync::HookMetaDataSync::new(
|
||||||
|
self.db.clone(),
|
||||||
|
self.cache.clone(),
|
||||||
|
repo,
|
||||||
|
self.logger.clone(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
sync.gc_only().await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod sync;
|
||||||
|
pub mod webhook_dispatch;
|
||||||
|
|||||||
@ -1,44 +0,0 @@
|
|||||||
pub mod redis;
|
|
||||||
pub mod types;
|
|
||||||
pub mod worker;
|
|
||||||
|
|
||||||
pub use redis::RedisConsumer;
|
|
||||||
pub use types::{HookTask, PoolConfig, TaskType};
|
|
||||||
pub use worker::HookWorker;
|
|
||||||
|
|
||||||
use db::cache::AppCache;
|
|
||||||
use db::database::AppDatabase;
|
|
||||||
use deadpool_redis::cluster::Pool as RedisPool;
|
|
||||||
use slog::Logger;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
|
|
||||||
/// Start the hook worker background task.
|
|
||||||
/// Returns a handle to the cancellation token so the caller can shut it down.
|
|
||||||
pub fn start_worker(
|
|
||||||
db: AppDatabase,
|
|
||||||
cache: AppCache,
|
|
||||||
redis_pool: RedisPool,
|
|
||||||
logger: Logger,
|
|
||||||
config: PoolConfig,
|
|
||||||
) -> CancellationToken {
|
|
||||||
let consumer = RedisConsumer::new(
|
|
||||||
redis_pool.clone(),
|
|
||||||
config.redis_list_prefix.clone(),
|
|
||||||
config.redis_block_timeout_secs,
|
|
||||||
logger.clone(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let http_client = Arc::new(reqwest::Client::new());
|
|
||||||
let max_retries = config.redis_max_retries as u32;
|
|
||||||
let worker = HookWorker::new(db, cache, logger, consumer, http_client, max_retries);
|
|
||||||
|
|
||||||
let cancel = CancellationToken::new();
|
|
||||||
let cancel_clone = cancel.clone();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
worker.run(cancel_clone).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
cancel
|
|
||||||
}
|
|
||||||
@ -1,176 +0,0 @@
|
|||||||
use crate::error::GitError;
|
|
||||||
use crate::hook::pool::types::HookTask;
|
|
||||||
use deadpool_redis::cluster::Connection as RedisConn;
|
|
||||||
use slog::Logger;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
/// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern.
|
|
||||||
pub struct RedisConsumer {
|
|
||||||
pool: deadpool_redis::cluster::Pool,
|
|
||||||
/// Hash-tag-prefixed key prefix, e.g. "{hook}".
|
|
||||||
prefix: String,
|
|
||||||
block_timeout_secs: u64,
|
|
||||||
logger: Logger,
|
|
||||||
}
|
|
||||||
|
|
||||||
const POOL_GET_TIMEOUT: Duration = Duration::from_secs(5);
|
|
||||||
|
|
||||||
impl RedisConsumer {
|
|
||||||
pub fn new(
|
|
||||||
pool: deadpool_redis::cluster::Pool,
|
|
||||||
prefix: String,
|
|
||||||
block_timeout_secs: u64,
|
|
||||||
logger: Logger,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
pool,
|
|
||||||
prefix,
|
|
||||||
block_timeout_secs,
|
|
||||||
logger,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Atomically moves a task from the main queue to the work queue using BLMOVE.
|
|
||||||
/// Blocks up to `block_timeout_secs` waiting for a task.
|
|
||||||
///
|
|
||||||
/// Returns `Some((HookTask, task_json))` where `task_json` is the raw JSON string
|
|
||||||
/// needed for LREM on ACK. Returns `None` if the blocking timed out.
|
|
||||||
pub async fn next(&self, task_type: &str) -> Result<Option<(HookTask, String)>, GitError> {
|
|
||||||
let queue_key = format!("{}:{}", self.prefix, task_type);
|
|
||||||
let work_key = format!("{}:{}:work", self.prefix, task_type);
|
|
||||||
|
|
||||||
let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get())
|
|
||||||
.await
|
|
||||||
.map_err(|_| GitError::Internal("redis pool get timed out".into()))?
|
|
||||||
.map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?;
|
|
||||||
|
|
||||||
let mut conn: RedisConn = redis;
|
|
||||||
|
|
||||||
// BLMOVE source destination <LEFT|RIGHT> <LEFT|RIGHT> timeout
|
|
||||||
let task_json: Option<String> = redis::cmd("BLMOVE")
|
|
||||||
.arg(&queue_key)
|
|
||||||
.arg(&work_key)
|
|
||||||
.arg("RIGHT")
|
|
||||||
.arg("LEFT")
|
|
||||||
.arg(self.block_timeout_secs)
|
|
||||||
.query_async(&mut conn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::Internal(format!("BLMOVE failed: {}", e)))?;
|
|
||||||
|
|
||||||
match task_json {
|
|
||||||
Some(json) => {
|
|
||||||
match serde_json::from_str::<HookTask>(&json) {
|
|
||||||
Ok(task) => {
|
|
||||||
slog::debug!(self.logger, "task dequeued";
|
|
||||||
"task_id" => %task.id,
|
|
||||||
"task_type" => %task.task_type,
|
|
||||||
"queue" => %queue_key
|
|
||||||
);
|
|
||||||
Ok(Some((task, json)))
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
// Malformed task — remove from work queue and discard
|
|
||||||
slog::warn!(self.logger, "malformed task JSON, discarding";
|
|
||||||
"error" => %e,
|
|
||||||
"queue" => %work_key
|
|
||||||
);
|
|
||||||
let _ = self.ack_raw(&work_key, &json).await;
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// Timed out, no task available
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Acknowledge a task: remove it from the work queue (LREM).
|
|
||||||
pub async fn ack(&self, work_key: &str, task_json: &str) -> Result<(), GitError> {
|
|
||||||
self.ack_raw(work_key, task_json).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn ack_raw(&self, work_key: &str, task_json: &str) -> Result<(), GitError> {
|
|
||||||
let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get())
|
|
||||||
.await
|
|
||||||
.map_err(|_| GitError::Internal("redis pool get timed out".into()))?
|
|
||||||
.map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?;
|
|
||||||
|
|
||||||
let mut conn: RedisConn = redis;
|
|
||||||
|
|
||||||
let _: i64 = redis::cmd("LREM")
|
|
||||||
.arg(work_key)
|
|
||||||
.arg(-1)
|
|
||||||
.arg(task_json)
|
|
||||||
.query_async(&mut conn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::Internal(format!("LREM failed: {}", e)))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Negative acknowledge (retry): remove from work queue and push back to main queue.
|
|
||||||
pub async fn nak(
|
|
||||||
&self,
|
|
||||||
work_key: &str,
|
|
||||||
queue_key: &str,
|
|
||||||
task_json: &str,
|
|
||||||
) -> Result<(), GitError> {
|
|
||||||
self.nak_with_retry(work_key, queue_key, task_json, task_json).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Negative acknowledge with a different (updated) task JSON — used to
|
|
||||||
/// requeue with an incremented retry_count.
|
|
||||||
/// Uses a Lua script for atomic LREM + LPUSH to prevent task loss on crash.
|
|
||||||
pub async fn nak_with_retry(
|
|
||||||
&self,
|
|
||||||
work_key: &str,
|
|
||||||
queue_key: &str,
|
|
||||||
old_task_json: &str,
|
|
||||||
new_task_json: &str,
|
|
||||||
) -> Result<(), GitError> {
|
|
||||||
let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get())
|
|
||||||
.await
|
|
||||||
.map_err(|_| GitError::Internal("redis pool get timed out".into()))?
|
|
||||||
.map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?;
|
|
||||||
|
|
||||||
let mut conn: RedisConn = redis;
|
|
||||||
|
|
||||||
// Atomic: remove from work queue AND push to retry queue in one script.
|
|
||||||
// If the process crashes mid-script, either both happen or neither — no lost tasks.
|
|
||||||
let script = r#"
|
|
||||||
redis.call("LREM", KEYS[1], 1, ARGV[1])
|
|
||||||
redis.call("LPUSH", KEYS[2], ARGV[2])
|
|
||||||
return 1
|
|
||||||
"#;
|
|
||||||
|
|
||||||
let _: i32 = redis::Script::new(script)
|
|
||||||
.key(work_key)
|
|
||||||
.key(queue_key)
|
|
||||||
.arg(old_task_json)
|
|
||||||
.arg(new_task_json)
|
|
||||||
.invoke_async(&mut conn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::Internal(format!("nak script failed: {}", e)))?;
|
|
||||||
|
|
||||||
slog::warn!(self.logger, "task nack'd and requeued queue={}", queue_key);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn prefix(&self) -> &str {
|
|
||||||
&self.prefix
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Clone for RedisConsumer {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
Self {
|
|
||||||
pool: self.pool.clone(),
|
|
||||||
prefix: self.prefix.clone(),
|
|
||||||
block_timeout_secs: self.block_timeout_secs,
|
|
||||||
logger: self.logger.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,32 +0,0 @@
|
|||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
pub use config::hook::PoolConfig;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct HookTask {
|
|
||||||
pub id: String,
|
|
||||||
pub repo_id: String,
|
|
||||||
pub task_type: TaskType,
|
|
||||||
pub payload: serde_json::Value,
|
|
||||||
pub created_at: chrono::DateTime<chrono::Utc>,
|
|
||||||
#[serde(default)]
|
|
||||||
pub retry_count: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
||||||
#[serde(rename_all = "snake_case")]
|
|
||||||
pub enum TaskType {
|
|
||||||
Sync,
|
|
||||||
Fsck,
|
|
||||||
Gc,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for TaskType {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
match self {
|
|
||||||
TaskType::Sync => write!(f, "sync"),
|
|
||||||
TaskType::Fsck => write!(f, "fsck"),
|
|
||||||
TaskType::Gc => write!(f, "gc"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,380 +0,0 @@
|
|||||||
use crate::error::GitError;
|
|
||||||
use crate::hook::pool::redis::RedisConsumer;
|
|
||||||
use crate::hook::pool::types::{HookTask, TaskType};
|
|
||||||
use crate::hook::sync::HookMetaDataSync;
|
|
||||||
use db::cache::AppCache;
|
|
||||||
use db::database::AppDatabase;
|
|
||||||
use models::EntityTrait;
|
|
||||||
use slog::Logger;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
|
|
||||||
/// Single-threaded worker that sequentially consumes tasks from Redis queues.
|
|
||||||
/// K8s can scale replicas for concurrency — each replica runs one worker.
|
|
||||||
/// Per-repo Redis locking is managed inside HookMetaDataSync methods.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct HookWorker {
|
|
||||||
db: AppDatabase,
|
|
||||||
cache: AppCache,
|
|
||||||
logger: Logger,
|
|
||||||
consumer: RedisConsumer,
|
|
||||||
http_client: Arc<reqwest::Client>,
|
|
||||||
max_retries: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HookWorker {
|
|
||||||
pub fn new(
|
|
||||||
db: AppDatabase,
|
|
||||||
cache: AppCache,
|
|
||||||
logger: Logger,
|
|
||||||
consumer: RedisConsumer,
|
|
||||||
http_client: Arc<reqwest::Client>,
|
|
||||||
max_retries: u32,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
db,
|
|
||||||
cache,
|
|
||||||
logger,
|
|
||||||
consumer,
|
|
||||||
http_client,
|
|
||||||
max_retries,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Run the worker loop. Blocks until cancelled.
|
|
||||||
pub async fn run(&self, cancel: CancellationToken) {
|
|
||||||
slog::info!(self.logger, "hook worker started");
|
|
||||||
|
|
||||||
let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc];
|
|
||||||
let mut redis_backoff_ms: u64 = 1000;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
// Check cancellation at top of loop to avoid unnecessary work
|
|
||||||
if cancel.is_cancelled() {
|
|
||||||
slog::info!(self.logger, "hook worker shutdown signal received");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
for task_type in &task_types {
|
|
||||||
let result = self.consumer.next(&task_type.to_string()).await;
|
|
||||||
|
|
||||||
let (task, task_json) = match result {
|
|
||||||
Ok(Some(pair)) => {
|
|
||||||
// Reset backoff on successful dequeue
|
|
||||||
redis_backoff_ms = 1000;
|
|
||||||
pair
|
|
||||||
}
|
|
||||||
Ok(None) => continue,
|
|
||||||
Err(e) => {
|
|
||||||
slog::warn!(self.logger, "failed to dequeue task: {}", e);
|
|
||||||
tokio::time::sleep(Duration::from_millis(redis_backoff_ms)).await;
|
|
||||||
// Exponential backoff, cap at 32s
|
|
||||||
redis_backoff_ms = (redis_backoff_ms * 2).min(32_000);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let queue_key = format!("{}:{}", self.consumer.prefix(), task_type);
|
|
||||||
let work_key = format!("{}:work", queue_key);
|
|
||||||
|
|
||||||
self.process_task(&task, &task_json, &work_key, &queue_key)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
slog::info!(self.logger, "hook worker stopped");
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn process_task(
|
|
||||||
&self,
|
|
||||||
task: &HookTask,
|
|
||||||
task_json: &str,
|
|
||||||
work_key: &str,
|
|
||||||
queue_key: &str,
|
|
||||||
) {
|
|
||||||
slog::info!(self.logger, "task started task_id={} task_type={} repo_id={}",
|
|
||||||
task.id, task.task_type, task.repo_id);
|
|
||||||
|
|
||||||
let result = match task.task_type {
|
|
||||||
TaskType::Sync => self.run_sync(&task.repo_id).await,
|
|
||||||
TaskType::Fsck => self.run_fsck(&task.repo_id).await,
|
|
||||||
TaskType::Gc => self.run_gc(&task.repo_id).await,
|
|
||||||
};
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(()) => {
|
|
||||||
if let Err(e) = self.consumer.ack(work_key, task_json).await {
|
|
||||||
slog::warn!(self.logger, "failed to ack task: {}", e);
|
|
||||||
}
|
|
||||||
slog::info!(self.logger, "task completed task_id={}", task.id);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
let is_locked = matches!(e, crate::GitError::Locked(_));
|
|
||||||
|
|
||||||
if is_locked {
|
|
||||||
// Another worker holds the lock — requeue without counting as retry.
|
|
||||||
slog::info!(self.logger, "repo locked by another worker, requeueing task_id={}", task.id);
|
|
||||||
if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await {
|
|
||||||
slog::warn!(self.logger, "failed to requeue locked task: {}", nak_err);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}",
|
|
||||||
task.id, task.task_type, task.repo_id, e);
|
|
||||||
|
|
||||||
if task.retry_count >= self.max_retries {
|
|
||||||
slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}",
|
|
||||||
task.id, task.retry_count);
|
|
||||||
let _ = self.consumer.ack(work_key, task_json).await;
|
|
||||||
} else {
|
|
||||||
let mut task = task.clone();
|
|
||||||
task.retry_count += 1;
|
|
||||||
let retry_json =
|
|
||||||
serde_json::to_string(&task).unwrap_or_else(|_| task_json.to_string());
|
|
||||||
let _ = self
|
|
||||||
.consumer
|
|
||||||
.nak_with_retry(work_key, queue_key, task_json, &retry_json)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_sync(&self, repo_id: &str) -> Result<(), GitError> {
|
|
||||||
let repo_uuid = models::Uuid::parse_str(repo_id)
|
|
||||||
.map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
|
|
||||||
|
|
||||||
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
|
||||||
.one(self.db.reader())
|
|
||||||
.await
|
|
||||||
.map_err(GitError::from)?
|
|
||||||
.ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
|
||||||
|
|
||||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
|
||||||
return Err(GitError::NotFound(format!(
|
|
||||||
"storage path does not exist: {}",
|
|
||||||
repo.storage_path
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Capture before tips for webhook diff
|
|
||||||
let before_tips = tokio::task::spawn_blocking({
|
|
||||||
let db = self.db.clone();
|
|
||||||
let cache = self.cache.clone();
|
|
||||||
let logger = self.logger.clone();
|
|
||||||
let repo = repo.clone();
|
|
||||||
move || {
|
|
||||||
let sync = HookMetaDataSync::new(db, cache, repo, logger)
|
|
||||||
.map_err(|e| GitError::Internal(e.to_string()))?;
|
|
||||||
Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
|
|
||||||
.map_err(GitError::from)?;
|
|
||||||
|
|
||||||
// Run full sync (internally acquires/releases per-repo lock)
|
|
||||||
let db = self.db.clone();
|
|
||||||
let cache = self.cache.clone();
|
|
||||||
let logger = self.logger.clone();
|
|
||||||
let repo_clone = repo.clone();
|
|
||||||
let _sync_result = tokio::task::spawn_blocking(move || {
|
|
||||||
let result = tokio::runtime::Handle::current().block_on(async {
|
|
||||||
let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone(), logger.clone())?;
|
|
||||||
sync.sync().await
|
|
||||||
});
|
|
||||||
match result {
|
|
||||||
Ok(()) => Ok::<(), GitError>(()),
|
|
||||||
Err(e) => Err(GitError::Internal(e.to_string())),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))
|
|
||||||
.and_then(|r| r.map_err(GitError::from))?;
|
|
||||||
|
|
||||||
// Only dispatch webhooks if sync succeeded
|
|
||||||
|
|
||||||
// Capture after tips and dispatch webhooks
|
|
||||||
let after_tips = tokio::task::spawn_blocking({
|
|
||||||
let db = self.db.clone();
|
|
||||||
let cache = self.cache.clone();
|
|
||||||
let logger = self.logger.clone();
|
|
||||||
let repo = repo.clone();
|
|
||||||
move || {
|
|
||||||
let sync = HookMetaDataSync::new(db, cache, repo, logger)
|
|
||||||
.map_err(|e| GitError::Internal(e.to_string()))?;
|
|
||||||
Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))?
|
|
||||||
.map_err(GitError::from)?;
|
|
||||||
|
|
||||||
let (before_branch_tips, before_tag_tips) = before_tips;
|
|
||||||
let (after_branch_tips, after_tag_tips) = after_tips;
|
|
||||||
let project = repo.project;
|
|
||||||
|
|
||||||
// Resolve namespace once outside the loop
|
|
||||||
let namespace = models::projects::Project::find_by_id(project)
|
|
||||||
.one(self.db.reader())
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.map(|p| p.name)
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
let repo_id_str = repo.id.to_string();
|
|
||||||
let repo_name = repo.repo_name.clone();
|
|
||||||
let default_branch = repo.default_branch.clone();
|
|
||||||
let http_client = self.http_client.clone();
|
|
||||||
let db = self.db.clone();
|
|
||||||
let logger = self.logger.clone();
|
|
||||||
|
|
||||||
// Dispatch branch webhooks and collect handles
|
|
||||||
let mut handles = Vec::new();
|
|
||||||
for (branch, after_oid) in after_branch_tips {
|
|
||||||
let before_oid = before_branch_tips
|
|
||||||
.iter()
|
|
||||||
.find(|(n, _)| n == &branch)
|
|
||||||
.map(|(_, o)| o.as_str());
|
|
||||||
let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true);
|
|
||||||
if changed {
|
|
||||||
let before_oid = before_oid.map_or("0", |v| v).to_string();
|
|
||||||
let branch_name = branch.clone();
|
|
||||||
let h = tokio::spawn({
|
|
||||||
let http_client = http_client.clone();
|
|
||||||
let db = db.clone();
|
|
||||||
let logger = logger.clone();
|
|
||||||
let repo_id_str = repo_id_str.clone();
|
|
||||||
let namespace = namespace.clone();
|
|
||||||
let repo_name = repo_name.clone();
|
|
||||||
let default_branch = default_branch.clone();
|
|
||||||
async move {
|
|
||||||
crate::hook::webhook_dispatch::dispatch_repo_webhooks(
|
|
||||||
&db,
|
|
||||||
&http_client,
|
|
||||||
&logger,
|
|
||||||
&repo_id_str,
|
|
||||||
&namespace,
|
|
||||||
&repo_name,
|
|
||||||
&default_branch,
|
|
||||||
"",
|
|
||||||
"",
|
|
||||||
crate::hook::webhook_dispatch::WebhookEventKind::Push {
|
|
||||||
r#ref: format!("refs/heads/{}", branch_name),
|
|
||||||
before: before_oid,
|
|
||||||
after: after_oid,
|
|
||||||
commits: vec![],
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
handles.push(h);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dispatch tag webhooks and collect handles
|
|
||||||
for (tag, after_oid) in after_tag_tips {
|
|
||||||
let before_oid = before_tag_tips
|
|
||||||
.iter()
|
|
||||||
.find(|(n, _)| n == &tag)
|
|
||||||
.map(|(_, o)| o.as_str());
|
|
||||||
let is_new = before_oid.is_none();
|
|
||||||
let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false);
|
|
||||||
if is_new || was_updated {
|
|
||||||
let before_oid = before_oid.map_or("0", |v| v).to_string();
|
|
||||||
let tag_name = tag.clone();
|
|
||||||
let h = tokio::spawn({
|
|
||||||
let http_client = http_client.clone();
|
|
||||||
let db = db.clone();
|
|
||||||
let logger = logger.clone();
|
|
||||||
let repo_id_str = repo_id_str.clone();
|
|
||||||
let namespace = namespace.clone();
|
|
||||||
let repo_name = repo_name.clone();
|
|
||||||
let default_branch = default_branch.clone();
|
|
||||||
async move {
|
|
||||||
crate::hook::webhook_dispatch::dispatch_repo_webhooks(
|
|
||||||
&db,
|
|
||||||
&http_client,
|
|
||||||
&logger,
|
|
||||||
&repo_id_str,
|
|
||||||
&namespace,
|
|
||||||
&repo_name,
|
|
||||||
&default_branch,
|
|
||||||
"",
|
|
||||||
"",
|
|
||||||
crate::hook::webhook_dispatch::WebhookEventKind::TagPush {
|
|
||||||
r#ref: format!("refs/tags/{}", tag_name),
|
|
||||||
before: before_oid,
|
|
||||||
after: after_oid,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
handles.push(h);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all webhooks to complete before returning
|
|
||||||
for h in handles {
|
|
||||||
let _ = h.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_fsck(&self, repo_id: &str) -> Result<(), GitError> {
|
|
||||||
let repo_uuid = models::Uuid::parse_str(repo_id)
|
|
||||||
.map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
|
|
||||||
|
|
||||||
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
|
||||||
.one(self.db.reader())
|
|
||||||
.await
|
|
||||||
.map_err(GitError::from)?
|
|
||||||
.ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
|
||||||
|
|
||||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
|
||||||
return Err(GitError::NotFound(format!(
|
|
||||||
"storage path does not exist: {}",
|
|
||||||
repo.storage_path
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let db = self.db.clone();
|
|
||||||
let cache = self.cache.clone();
|
|
||||||
let logger = self.logger.clone();
|
|
||||||
let sync = HookMetaDataSync::new(db, cache, repo, logger)?;
|
|
||||||
sync.fsck_only().await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_gc(&self, repo_id: &str) -> Result<(), GitError> {
|
|
||||||
let repo_uuid = models::Uuid::parse_str(repo_id)
|
|
||||||
.map_err(|_| GitError::Internal("invalid repo_id uuid".into()))?;
|
|
||||||
|
|
||||||
let repo = models::repos::repo::Entity::find_by_id(repo_uuid)
|
|
||||||
.one(self.db.reader())
|
|
||||||
.await
|
|
||||||
.map_err(GitError::from)?
|
|
||||||
.ok_or_else(|| GitError::NotFound(format!("repo {} not found", repo_id)))?;
|
|
||||||
|
|
||||||
if !std::path::Path::new(&repo.storage_path).exists() {
|
|
||||||
return Err(GitError::NotFound(format!(
|
|
||||||
"storage path does not exist: {}",
|
|
||||||
repo.storage_path
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let db = self.db.clone();
|
|
||||||
let cache = self.cache.clone();
|
|
||||||
let logger = self.logger.clone();
|
|
||||||
let sync = HookMetaDataSync::new(db, cache, repo, logger)?;
|
|
||||||
sync.gc_only().await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,2 +1,141 @@
|
|||||||
// sync_refs has been moved to commit.rs to consolidate sync helpers.
|
use crate::GitError;
|
||||||
// Keeping this module stub to avoid breaking existing module references.
|
use crate::hook::sync::HookMetaDataSync;
|
||||||
|
use db::database::AppTransaction;
|
||||||
|
use models::repos::repo;
|
||||||
|
use models::repos::repo_branch;
|
||||||
|
use sea_orm::prelude::Expr;
|
||||||
|
use sea_orm::*;
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
impl HookMetaDataSync {
|
||||||
|
pub async fn sync_refs(&self, txn: &AppTransaction) -> Result<(), GitError> {
|
||||||
|
let repo_id = self.repo.id;
|
||||||
|
let now = chrono::Utc::now();
|
||||||
|
|
||||||
|
let existing: Vec<repo_branch::Model> = repo_branch::Entity::find()
|
||||||
|
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||||
|
.all(txn)
|
||||||
|
.await
|
||||||
|
.map_err(|e| GitError::IoError(format!("failed to query branches: {}", e)))?;
|
||||||
|
let mut existing_names: HashSet<String> = existing.iter().map(|r| r.name.clone()).collect();
|
||||||
|
|
||||||
|
let references = self
|
||||||
|
.domain
|
||||||
|
.repo()
|
||||||
|
.references()
|
||||||
|
.map_err(|e| GitError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
|
// Auto-detect first local branch when default_branch is empty
|
||||||
|
let mut auto_detected_branch: Option<String> = None;
|
||||||
|
|
||||||
|
for reference in references {
|
||||||
|
let reference = reference.map_err(|e| GitError::Internal(e.to_string()))?;
|
||||||
|
let name = reference
|
||||||
|
.name()
|
||||||
|
.ok_or_else(|| GitError::RefNotFound("unnamed ref".into()))?
|
||||||
|
.to_string();
|
||||||
|
let shorthand = reference.shorthand().unwrap_or("").to_string();
|
||||||
|
|
||||||
|
let target_oid = match reference.target() {
|
||||||
|
Some(oid) => oid.to_string(),
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let is_branch = reference.is_branch();
|
||||||
|
let is_remote = reference.is_remote();
|
||||||
|
|
||||||
|
// Detect first local branch if no default is set
|
||||||
|
if self.repo.default_branch.is_empty()
|
||||||
|
&& is_branch
|
||||||
|
&& !is_remote
|
||||||
|
&& auto_detected_branch.is_none()
|
||||||
|
{
|
||||||
|
auto_detected_branch = Some(shorthand.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
let upstream = if is_branch && !is_remote {
|
||||||
|
reference
|
||||||
|
.shorthand()
|
||||||
|
.map(|short| format!("refs/remotes/{{}}/{}", short))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
if existing_names.contains(&name) {
|
||||||
|
existing_names.remove(&name);
|
||||||
|
repo_branch::Entity::update_many()
|
||||||
|
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||||
|
.filter(repo_branch::Column::Name.eq(&name))
|
||||||
|
.col_expr(repo_branch::Column::Oid, Expr::value(&target_oid))
|
||||||
|
.col_expr(repo_branch::Column::Upstream, Expr::value(upstream))
|
||||||
|
.col_expr(
|
||||||
|
repo_branch::Column::Head,
|
||||||
|
Expr::value(is_branch && shorthand == self.repo.default_branch),
|
||||||
|
)
|
||||||
|
.col_expr(repo_branch::Column::UpdatedAt, Expr::value(now))
|
||||||
|
.exec(txn)
|
||||||
|
.await
|
||||||
|
.map_err(|e| GitError::IoError(format!("failed to update branch: {}", e)))?;
|
||||||
|
} else {
|
||||||
|
let new_branch = repo_branch::ActiveModel {
|
||||||
|
repo: Set(repo_id),
|
||||||
|
name: Set(name),
|
||||||
|
oid: Set(target_oid),
|
||||||
|
upstream: Set(upstream),
|
||||||
|
head: Set(is_branch && shorthand == self.repo.default_branch),
|
||||||
|
created_at: Set(now),
|
||||||
|
updated_at: Set(now),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
new_branch
|
||||||
|
.insert(txn)
|
||||||
|
.await
|
||||||
|
.map_err(|e| GitError::IoError(format!("failed to insert branch: {}", e)))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !existing_names.is_empty() {
|
||||||
|
repo_branch::Entity::delete_many()
|
||||||
|
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||||
|
.filter(repo_branch::Column::Name.is_in(existing_names))
|
||||||
|
.exec(txn)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
GitError::IoError(format!("failed to delete stale branches: {}", e))
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Persist auto-detected default branch and update head flags
|
||||||
|
if let Some(ref branch_name) = auto_detected_branch {
|
||||||
|
// 1. Update the repo's default_branch
|
||||||
|
repo::Entity::update_many()
|
||||||
|
.filter(repo::Column::Id.eq(repo_id))
|
||||||
|
.col_expr(
|
||||||
|
repo::Column::DefaultBranch,
|
||||||
|
Expr::value(branch_name.clone()),
|
||||||
|
)
|
||||||
|
.exec(txn)
|
||||||
|
.await
|
||||||
|
.map_err(|e| GitError::IoError(format!("failed to set default branch: {}", e)))?;
|
||||||
|
|
||||||
|
// 2. Clear head on all branches
|
||||||
|
repo_branch::Entity::update_many()
|
||||||
|
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||||
|
.col_expr(repo_branch::Column::Head, Expr::value(false))
|
||||||
|
.exec(txn)
|
||||||
|
.await
|
||||||
|
.map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?;
|
||||||
|
|
||||||
|
// 3. Set head = true for the detected branch (it was inserted above)
|
||||||
|
repo_branch::Entity::update_many()
|
||||||
|
.filter(repo_branch::Column::Repo.eq(repo_id))
|
||||||
|
.filter(repo_branch::Column::Name.eq(branch_name))
|
||||||
|
.col_expr(repo_branch::Column::Head, Expr::value(true))
|
||||||
|
.exec(txn)
|
||||||
|
.await
|
||||||
|
.map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -10,225 +10,7 @@ use sea_orm::*;
|
|||||||
use sea_query::OnConflict;
|
use sea_query::OnConflict;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
/// Owned branch data collected from git2 (no git2 types after this).
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub(crate) struct BranchTip {
|
|
||||||
pub name: String,
|
|
||||||
pub shorthand: String,
|
|
||||||
pub target_oid: String,
|
|
||||||
pub is_branch: bool,
|
|
||||||
pub is_remote: bool,
|
|
||||||
pub upstream: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Owned tag data collected from git2 (no git2 types after this).
|
|
||||||
/// Used by sync_tags; currently populated by collect_git_refs but not yet consumed.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub(crate) struct TagTip {
|
|
||||||
pub name: String,
|
|
||||||
pub target_oid: String,
|
|
||||||
pub description: Option<String>,
|
|
||||||
pub tagger_name: String,
|
|
||||||
pub tagger_email: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Owned commit data collected from git2 (no git2 types after this).
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct CommitData {
|
|
||||||
oid: String,
|
|
||||||
author_name: String,
|
|
||||||
author_email: String,
|
|
||||||
committer_name: String,
|
|
||||||
committer_email: String,
|
|
||||||
message: String,
|
|
||||||
parent_ids: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HookMetaDataSync {
|
impl HookMetaDataSync {
|
||||||
/// Collect all git2 branch/tag data into owned structs.
|
|
||||||
/// This is sync and must be called from a `spawn_blocking` context.
|
|
||||||
pub(crate) fn collect_git_refs(&self) -> Result<(Vec<BranchTip>, Vec<TagTip>), GitError> {
|
|
||||||
let mut branches = Vec::new();
|
|
||||||
let mut tags = Vec::new();
|
|
||||||
|
|
||||||
let references = self
|
|
||||||
.domain
|
|
||||||
.repo()
|
|
||||||
.references()
|
|
||||||
.map_err(|e| GitError::Internal(e.to_string()))?;
|
|
||||||
|
|
||||||
for ref_result in references {
|
|
||||||
let reference = match ref_result {
|
|
||||||
Ok(r) => r,
|
|
||||||
Err(e) => {
|
|
||||||
slog::warn!(self.logger, "failed to read reference: {}", e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let name = match reference.name() {
|
|
||||||
Some(n) => n.to_string(),
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let target_oid = match reference.target() {
|
|
||||||
Some(oid) => oid.to_string(),
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
|
|
||||||
let shorthand = reference.shorthand().unwrap_or("").to_string();
|
|
||||||
let is_branch = reference.is_branch();
|
|
||||||
let is_remote = reference.is_remote();
|
|
||||||
|
|
||||||
if reference.is_tag() {
|
|
||||||
tags.push(TagTip {
|
|
||||||
name: name.strip_prefix("refs/tags/").unwrap_or(&name).to_string(),
|
|
||||||
target_oid,
|
|
||||||
description: None,
|
|
||||||
tagger_name: String::new(),
|
|
||||||
tagger_email: String::new(),
|
|
||||||
});
|
|
||||||
} else if is_branch && !is_remote {
|
|
||||||
let upstream = reference
|
|
||||||
.shorthand()
|
|
||||||
.map(|short| format!("refs/remotes/{{}}/{}", short));
|
|
||||||
|
|
||||||
branches.push(BranchTip {
|
|
||||||
name,
|
|
||||||
shorthand,
|
|
||||||
target_oid,
|
|
||||||
is_branch,
|
|
||||||
is_remote,
|
|
||||||
upstream,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((branches, tags))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn sync_refs(&self, txn: &AppTransaction) -> Result<(), GitError> {
|
|
||||||
let repo_id = self.repo.id;
|
|
||||||
let now = chrono::Utc::now();
|
|
||||||
|
|
||||||
let existing: Vec<models::repos::repo_branch::Model> =
|
|
||||||
models::repos::repo_branch::Entity::find()
|
|
||||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
|
||||||
.all(txn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to query branches: {}", e)))?;
|
|
||||||
let mut existing_names: HashSet<String> =
|
|
||||||
existing.iter().map(|r| r.name.clone()).collect();
|
|
||||||
|
|
||||||
let (branches, _) = self.collect_git_refs()?;
|
|
||||||
|
|
||||||
// Auto-detect first local branch when default_branch is empty
|
|
||||||
let mut auto_detected_branch: Option<String> = None;
|
|
||||||
|
|
||||||
for branch in &branches {
|
|
||||||
if existing_names.contains(&branch.name) {
|
|
||||||
existing_names.remove(&branch.name);
|
|
||||||
models::repos::repo_branch::Entity::update_many()
|
|
||||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
|
||||||
.filter(models::repos::repo_branch::Column::Name.eq(&branch.name))
|
|
||||||
.col_expr(
|
|
||||||
models::repos::repo_branch::Column::Oid,
|
|
||||||
sea_orm::prelude::Expr::value(&branch.target_oid),
|
|
||||||
)
|
|
||||||
.col_expr(
|
|
||||||
models::repos::repo_branch::Column::Upstream,
|
|
||||||
sea_orm::prelude::Expr::value(branch.upstream.clone()),
|
|
||||||
)
|
|
||||||
.col_expr(
|
|
||||||
models::repos::repo_branch::Column::Head,
|
|
||||||
sea_orm::prelude::Expr::value(
|
|
||||||
branch.is_branch
|
|
||||||
&& branch.shorthand == self.repo.default_branch,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.col_expr(
|
|
||||||
models::repos::repo_branch::Column::UpdatedAt,
|
|
||||||
sea_orm::prelude::Expr::value(now),
|
|
||||||
)
|
|
||||||
.exec(txn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to update branch: {}", e)))?;
|
|
||||||
} else {
|
|
||||||
let new_branch = models::repos::repo_branch::ActiveModel {
|
|
||||||
repo: Set(repo_id),
|
|
||||||
name: Set(branch.name.clone()),
|
|
||||||
oid: Set(branch.target_oid.clone()),
|
|
||||||
upstream: Set(branch.upstream.clone()),
|
|
||||||
head: Set(branch.is_branch && branch.shorthand == self.repo.default_branch),
|
|
||||||
created_at: Set(now),
|
|
||||||
updated_at: Set(now),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
new_branch
|
|
||||||
.insert(txn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to insert branch: {}", e)))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Detect first local branch if no default is set
|
|
||||||
if self.repo.default_branch.is_empty()
|
|
||||||
&& branch.is_branch
|
|
||||||
&& !branch.is_remote
|
|
||||||
&& auto_detected_branch.is_none()
|
|
||||||
{
|
|
||||||
auto_detected_branch = Some(branch.shorthand.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !existing_names.is_empty() {
|
|
||||||
models::repos::repo_branch::Entity::delete_many()
|
|
||||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
|
||||||
.filter(models::repos::repo_branch::Column::Name.is_in(existing_names))
|
|
||||||
.exec(txn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
GitError::IoError(format!("failed to delete stale branches: {}", e))
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Persist auto-detected default branch and update head flags
|
|
||||||
if let Some(ref branch_name) = auto_detected_branch {
|
|
||||||
models::repos::repo::Entity::update_many()
|
|
||||||
.filter(models::repos::repo::Column::Id.eq(repo_id))
|
|
||||||
.col_expr(
|
|
||||||
models::repos::repo::Column::DefaultBranch,
|
|
||||||
sea_orm::prelude::Expr::value(branch_name.clone()),
|
|
||||||
)
|
|
||||||
.exec(txn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to set default branch: {}", e)))?;
|
|
||||||
|
|
||||||
models::repos::repo_branch::Entity::update_many()
|
|
||||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
|
||||||
.col_expr(
|
|
||||||
models::repos::repo_branch::Column::Head,
|
|
||||||
sea_orm::prelude::Expr::value(false),
|
|
||||||
)
|
|
||||||
.exec(txn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?;
|
|
||||||
|
|
||||||
models::repos::repo_branch::Entity::update_many()
|
|
||||||
.filter(models::repos::repo_branch::Column::Repo.eq(repo_id))
|
|
||||||
.filter(models::repos::repo_branch::Column::Name.eq(branch_name))
|
|
||||||
.col_expr(
|
|
||||||
models::repos::repo_branch::Column::Head,
|
|
||||||
sea_orm::prelude::Expr::value(true),
|
|
||||||
)
|
|
||||||
.exec(txn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn sync_commits(&self, txn: &AppTransaction) -> Result<(), GitError> {
|
pub async fn sync_commits(&self, txn: &AppTransaction) -> Result<(), GitError> {
|
||||||
let repo_id = self.repo.id;
|
let repo_id = self.repo.id;
|
||||||
let repo = self.domain.repo();
|
let repo = self.domain.repo();
|
||||||
@ -418,6 +200,7 @@ impl HookMetaDataSync {
|
|||||||
created_at: Set(now),
|
created_at: Set(now),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
// Use ON CONFLICT DO NOTHING so concurrent syncs don't collide.
|
||||||
let _ = RepoCollaborator::insert(new_collab)
|
let _ = RepoCollaborator::insert(new_collab)
|
||||||
.on_conflict(
|
.on_conflict(
|
||||||
OnConflict::columns([
|
OnConflict::columns([
|
||||||
@ -450,3 +233,13 @@ impl HookMetaDataSync {
|
|||||||
names
|
names
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct CommitData {
|
||||||
|
oid: String,
|
||||||
|
author_name: String,
|
||||||
|
author_email: String,
|
||||||
|
committer_name: String,
|
||||||
|
committer_email: String,
|
||||||
|
message: String,
|
||||||
|
parent_ids: Vec<String>,
|
||||||
|
}
|
||||||
|
|||||||
@ -106,7 +106,6 @@ impl HookMetaDataSync {
|
|||||||
|
|
||||||
let _ = tokio::task::spawn_blocking(move || {
|
let _ = tokio::task::spawn_blocking(move || {
|
||||||
for (ref_name, oid) in &refs {
|
for (ref_name, oid) in &refs {
|
||||||
// git update-ref -m <msg> <ref> <new-sha> — no old-sha in rollback
|
|
||||||
let status = Command::new("git")
|
let status = Command::new("git")
|
||||||
.arg("-C")
|
.arg("-C")
|
||||||
.arg(&storage_path)
|
.arg(&storage_path)
|
||||||
@ -115,6 +114,7 @@ impl HookMetaDataSync {
|
|||||||
.arg("rollback: integrity check failed")
|
.arg("rollback: integrity check failed")
|
||||||
.arg(ref_name)
|
.arg(ref_name)
|
||||||
.arg(oid)
|
.arg(oid)
|
||||||
|
.arg("HEAD")
|
||||||
.status();
|
.status();
|
||||||
|
|
||||||
match status {
|
match status {
|
||||||
|
|||||||
@ -18,8 +18,6 @@ impl HookMetaDataSync {
|
|||||||
.map_err(|e| GitError::IoError(format!("git gc failed: {}", e)))?;
|
.map_err(|e| GitError::IoError(format!("git gc failed: {}", e)))?;
|
||||||
|
|
||||||
if !status.success() {
|
if !status.success() {
|
||||||
// git gc --auto exits non-zero when there's nothing to collect,
|
|
||||||
// or when another gc is already running — both are benign.
|
|
||||||
slog::warn!(logger, "git gc exited with {:?}", status.code());
|
slog::warn!(logger, "git gc exited with {:?}", status.code());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,66 +0,0 @@
|
|||||||
use crate::GitError;
|
|
||||||
use crate::hook::sync::HookMetaDataSync;
|
|
||||||
|
|
||||||
impl HookMetaDataSync {
|
|
||||||
const LOCK_TTL_SECS: u64 = 300;
|
|
||||||
|
|
||||||
/// Try to acquire an exclusive lock for this repo.
|
|
||||||
/// Returns the lock value if acquired, which must be passed to `release_lock`.
|
|
||||||
pub async fn acquire_lock(&self) -> Result<String, GitError> {
|
|
||||||
let lock_key = format!("git:repo:lock:{}", self.repo.id);
|
|
||||||
let lock_value = format!("{}:{}", uuid::Uuid::new_v4(), std::process::id());
|
|
||||||
|
|
||||||
let mut conn = self
|
|
||||||
.cache
|
|
||||||
.conn()
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?;
|
|
||||||
|
|
||||||
let result: bool = redis::cmd("SET")
|
|
||||||
.arg(&lock_key)
|
|
||||||
.arg(&lock_value)
|
|
||||||
.arg("NX")
|
|
||||||
.arg("EX")
|
|
||||||
.arg(Self::LOCK_TTL_SECS)
|
|
||||||
.query_async(&mut conn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to acquire lock: {}", e)))?;
|
|
||||||
|
|
||||||
if result {
|
|
||||||
Ok(lock_value)
|
|
||||||
} else {
|
|
||||||
Err(GitError::Locked(format!(
|
|
||||||
"repository {} is locked by another process",
|
|
||||||
self.repo.id
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Release the lock, but only if we still own it (value matches).
|
|
||||||
pub async fn release_lock(&self, lock_value: &str) -> Result<(), GitError> {
|
|
||||||
let lock_key = format!("git:repo:lock:{}", self.repo.id);
|
|
||||||
|
|
||||||
let mut conn = self
|
|
||||||
.cache
|
|
||||||
.conn()
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to get redis connection: {}", e)))?;
|
|
||||||
|
|
||||||
let script = r#"
|
|
||||||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
|
||||||
return redis.call("del", KEYS[1])
|
|
||||||
else
|
|
||||||
return 0
|
|
||||||
end
|
|
||||||
"#;
|
|
||||||
|
|
||||||
let _: i32 = redis::Script::new(script)
|
|
||||||
.key(&lock_key)
|
|
||||||
.arg(lock_value)
|
|
||||||
.invoke_async(&mut conn)
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::IoError(format!("failed to release lock: {}", e)))?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -3,7 +3,6 @@ pub mod commit;
|
|||||||
pub mod fsck;
|
pub mod fsck;
|
||||||
pub mod gc;
|
pub mod gc;
|
||||||
pub mod lfs;
|
pub mod lfs;
|
||||||
pub mod lock;
|
|
||||||
pub mod tag;
|
pub mod tag;
|
||||||
|
|
||||||
use db::cache::AppCache;
|
use db::cache::AppCache;
|
||||||
@ -26,6 +25,7 @@ use sha1::Digest;
|
|||||||
|
|
||||||
/// Recursively scan `base` for files named `SKILL.md`.
|
/// Recursively scan `base` for files named `SKILL.md`.
|
||||||
/// The skill slug is `{short_repo_id}/{parent_dir_name}` to ensure uniqueness across repos.
|
/// The skill slug is `{short_repo_id}/{parent_dir_name}` to ensure uniqueness across repos.
|
||||||
|
/// Populates `commit_sha` (current HEAD) and `blob_hash` for each discovered file.
|
||||||
fn scan_skills_from_dir(
|
fn scan_skills_from_dir(
|
||||||
base: &Path,
|
base: &Path,
|
||||||
repo_id: &RepoId,
|
repo_id: &RepoId,
|
||||||
@ -65,6 +65,8 @@ fn scan_skills_from_dir(
|
|||||||
Ok(discovered)
|
Ok(discovered)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compute the git blob SHA-1 hash of `content`.
|
||||||
|
/// Format: "blob {len}\0{data}"
|
||||||
fn git_blob_hash(content: &[u8]) -> String {
|
fn git_blob_hash(content: &[u8]) -> String {
|
||||||
let size = content.len();
|
let size = content.len();
|
||||||
let header = format!("blob {}\0", size);
|
let header = format!("blob {}\0", size);
|
||||||
@ -74,6 +76,7 @@ fn git_blob_hash(content: &[u8]) -> String {
|
|||||||
hex::encode(hasher.finalize())
|
hex::encode(hasher.finalize())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parse a SKILL.md file (raw bytes) to extract name, description, content, and frontmatter metadata.
|
||||||
fn parse_skill_content(slug: &str, raw: &[u8]) -> DiscoveredSkill {
|
fn parse_skill_content(slug: &str, raw: &[u8]) -> DiscoveredSkill {
|
||||||
let content = String::from_utf8_lossy(raw);
|
let content = String::from_utf8_lossy(raw);
|
||||||
let (frontmatter, body) = extract_frontmatter(&content);
|
let (frontmatter, body) = extract_frontmatter(&content);
|
||||||
@ -103,6 +106,7 @@ fn parse_skill_content(slug: &str, raw: &[u8]) -> DiscoveredSkill {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A skill discovered in a repository.
|
||||||
struct DiscoveredSkill {
|
struct DiscoveredSkill {
|
||||||
slug: String,
|
slug: String,
|
||||||
name: String,
|
name: String,
|
||||||
@ -153,42 +157,79 @@ impl HookMetaDataSync {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Full sync with lock. Caller (worker) manages locking.
|
/// Full sync: refs → commits → tags → LFS → fsck → gc → skills.
|
||||||
|
/// No distributed lock — K8s StatefulSet scheduling guarantees exclusive access.
|
||||||
pub async fn sync(&self) -> Result<(), crate::GitError> {
|
pub async fn sync(&self) -> Result<(), crate::GitError> {
|
||||||
let lock_value = self.acquire_lock().await?;
|
// All git2 operations must run on a blocking thread since git2 types are not Send.
|
||||||
|
let db = self.db.clone();
|
||||||
|
let cache = self.cache.clone();
|
||||||
|
let repo = self.repo.clone();
|
||||||
|
let logger = self.logger.clone();
|
||||||
|
|
||||||
let res = self.sync_work().await;
|
let res = tokio::task::spawn_blocking(move || {
|
||||||
|
tokio::runtime::Handle::current().block_on(async move {
|
||||||
|
let sync = Self::new(db, cache, repo, logger)?;
|
||||||
|
let out = sync.sync_full().await;
|
||||||
|
if let Err(ref e) = out {
|
||||||
|
slog::error!(sync.logger, "sync failed: {}", e);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??;
|
||||||
|
|
||||||
if let Err(ref e) = res {
|
Ok(res)
|
||||||
slog::error!(self.logger, "sync failed: {}", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
let _ = self.release_lock(&lock_value).await;
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fsck only with lock. Caller manages locking.
|
/// Run fsck only (refs snapshot + git fsck + rollback on corruption).
|
||||||
pub async fn fsck_only(&self) -> Result<(), crate::GitError> {
|
pub async fn fsck_only(&self) -> Result<(), crate::GitError> {
|
||||||
let lock_value = self.acquire_lock().await?;
|
let db = self.db.clone();
|
||||||
|
let cache = self.cache.clone();
|
||||||
|
let repo = self.repo.clone();
|
||||||
|
let logger = self.logger.clone();
|
||||||
|
|
||||||
let res = self.fsck_work().await;
|
tokio::task::spawn_blocking(move || {
|
||||||
|
tokio::runtime::Handle::current().block_on(async move {
|
||||||
|
let sync = Self::new(db, cache, repo, logger)?;
|
||||||
|
let mut txn = sync.db.begin().await.map_err(|e| {
|
||||||
|
crate::GitError::IoError(format!("failed to begin transaction: {}", e))
|
||||||
|
})?;
|
||||||
|
sync.run_fsck_and_rollback_if_corrupt(&mut txn).await?;
|
||||||
|
txn.commit().await.map_err(|e| {
|
||||||
|
crate::GitError::IoError(format!("failed to commit transaction: {}", e))
|
||||||
|
})?;
|
||||||
|
Ok::<(), crate::GitError>(())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??;
|
||||||
|
|
||||||
let _ = self.release_lock(&lock_value).await;
|
Ok(())
|
||||||
res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// GC only with lock. Caller manages locking.
|
/// Run gc only.
|
||||||
pub async fn gc_only(&self) -> Result<(), crate::GitError> {
|
pub async fn gc_only(&self) -> Result<(), crate::GitError> {
|
||||||
let lock_value = self.acquire_lock().await?;
|
let db = self.db.clone();
|
||||||
|
let cache = self.cache.clone();
|
||||||
|
let repo = self.repo.clone();
|
||||||
|
let logger = self.logger.clone();
|
||||||
|
|
||||||
let res = self.gc_work().await;
|
tokio::task::spawn_blocking(move || {
|
||||||
|
tokio::runtime::Handle::current().block_on(async move {
|
||||||
|
let sync = Self::new(db, cache, repo, logger)?;
|
||||||
|
sync.run_gc().await
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| crate::GitError::Internal(format!("spawn_blocking join error: {}", e)))??;
|
||||||
|
|
||||||
let _ = self.release_lock(&lock_value).await;
|
Ok(())
|
||||||
res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Full sync pipeline (no locking — caller is responsible).
|
/// Full sync pipeline inside a single DB transaction.
|
||||||
async fn sync_work(&self) -> Result<(), crate::GitError> {
|
/// On fsck failure, refs are rolled back and an error is returned.
|
||||||
|
async fn sync_full(&self) -> Result<(), crate::GitError> {
|
||||||
let mut txn = self
|
let mut txn = self
|
||||||
.db
|
.db
|
||||||
.begin()
|
.begin()
|
||||||
@ -212,29 +253,6 @@ impl HookMetaDataSync {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fsck only work (no locking — caller is responsible).
|
|
||||||
async fn fsck_work(&self) -> Result<(), crate::GitError> {
|
|
||||||
let mut txn = self
|
|
||||||
.db
|
|
||||||
.begin()
|
|
||||||
.await
|
|
||||||
.map_err(|e| crate::GitError::IoError(format!("failed to begin transaction: {}", e)))?;
|
|
||||||
|
|
||||||
self.run_fsck_and_rollback_if_corrupt(&mut txn)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
txn.commit().await.map_err(|e| {
|
|
||||||
crate::GitError::IoError(format!("failed to commit transaction: {}", e))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// GC only work (no locking — caller is responsible).
|
|
||||||
async fn gc_work(&self) -> Result<(), crate::GitError> {
|
|
||||||
self.run_gc().await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a list of (branch_name, oid) for all local branches.
|
/// Returns a list of (branch_name, oid) for all local branches.
|
||||||
pub fn list_branch_tips(&self) -> Vec<(String, String)> {
|
pub fn list_branch_tips(&self) -> Vec<(String, String)> {
|
||||||
let repo = self.domain.repo();
|
let repo = self.domain.repo();
|
||||||
@ -283,7 +301,7 @@ impl HookMetaDataSync {
|
|||||||
let project_uid = self.repo.project;
|
let project_uid = self.repo.project;
|
||||||
|
|
||||||
let repo_root = match self.domain.repo().workdir() {
|
let repo_root = match self.domain.repo().workdir() {
|
||||||
Some(p) => p.to_path_buf(),
|
Some(p) => p,
|
||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -296,19 +314,10 @@ impl HookMetaDataSync {
|
|||||||
.map(|oid| oid.to_string())
|
.map(|oid| oid.to_string())
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let repo_id = self.repo.id;
|
let discovered = match scan_skills_from_dir(repo_root, &self.repo.id, &commit_sha) {
|
||||||
let discovered = match tokio::task::spawn_blocking(move || {
|
Ok(d) => d,
|
||||||
scan_skills_from_dir(&repo_root, &repo_id, &commit_sha)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(Ok(d)) => d,
|
|
||||||
Ok(Err(e)) => {
|
|
||||||
slog::warn!(self.logger, "failed to scan skills directory: {}", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog::warn!(self.logger, "spawn_blocking join error: {}", e);
|
slog::warn!(self.logger, "failed to scan skills directory: {}", e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@ -231,7 +231,7 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
{
|
{
|
||||||
Ok(ws) => ws,
|
Ok(ws) => ws,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog::error!(logs, "failed to query webhooks repo={} error={}", repo_uuid, e);
|
slog::error!(logs, "{}", format!("failed to query webhooks repo={} error={}", repo_uuid, e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -297,7 +297,7 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
let body = match serde_json::to_vec(&payload) {
|
let body = match serde_json::to_vec(&payload) {
|
||||||
Ok(b) => b,
|
Ok(b) => b,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog::error!(logs, "failed to serialize push payload error={}", e);
|
slog::error!(logs, "{}", format!("failed to serialize push payload error={}", e));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -310,15 +310,15 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
slog::info!(logs, "push webhook delivered webhook_id={} url={}", webhook_id, url);
|
slog::info!(logs, "{}", format!("push webhook delivered webhook_id={} url={}", webhook_id, url));
|
||||||
let _ = touch_webhook(db, webhook_id, true, logs).await;
|
let _ = touch_webhook(db, webhook_id, true, logs).await;
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
slog::warn!(logs, "push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e);
|
slog::warn!(logs, "{}", format!("push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e));
|
||||||
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
slog::warn!(logs, "push webhook timed out webhook_id={} url={}", webhook_id, url);
|
slog::warn!(logs, "{}", format!("push webhook timed out webhook_id={} url={}", webhook_id, url));
|
||||||
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -351,7 +351,7 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
let body = match serde_json::to_vec(&payload) {
|
let body = match serde_json::to_vec(&payload) {
|
||||||
Ok(b) => b,
|
Ok(b) => b,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog::error!(logs, "failed to serialize tag payload error={}", e);
|
slog::error!(logs, "{}", format!("failed to serialize tag payload error={}", e));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -364,15 +364,15 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
slog::info!(logs, "tag webhook delivered webhook_id={} url={}", webhook_id, url);
|
slog::info!(logs, "{}", format!("tag webhook delivered webhook_id={} url={}", webhook_id, url));
|
||||||
let _ = touch_webhook(db, webhook_id, true, logs).await;
|
let _ = touch_webhook(db, webhook_id, true, logs).await;
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
slog::warn!(logs, "tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e);
|
slog::warn!(logs, "{}", format!("tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e));
|
||||||
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
slog::warn!(logs, "tag webhook timed out webhook_id={} url={}", webhook_id, url);
|
slog::warn!(logs, "{}", format!("tag webhook timed out webhook_id={} url={}", webhook_id, url));
|
||||||
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -384,7 +384,7 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &slog::Logger) {
|
async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &slog::Logger) {
|
||||||
use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity};
|
use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity};
|
||||||
use models::{ColumnTrait, EntityTrait, QueryFilter};
|
use models::{ColumnTrait, EntityTrait, QueryFilter};
|
||||||
use sea_orm::{sea_query::Expr, ExprTrait};
|
use sea_orm::prelude::Expr;
|
||||||
|
|
||||||
let result: Result<sea_orm::UpdateResult, sea_orm::DbErr> = if success {
|
let result: Result<sea_orm::UpdateResult, sea_orm::DbErr> = if success {
|
||||||
RepoWebhookEntity::update_many()
|
RepoWebhookEntity::update_many()
|
||||||
@ -393,18 +393,18 @@ async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &
|
|||||||
RwCol::LastDeliveredAt,
|
RwCol::LastDeliveredAt,
|
||||||
Expr::value(Some(chrono::Utc::now())),
|
Expr::value(Some(chrono::Utc::now())),
|
||||||
)
|
)
|
||||||
.col_expr(RwCol::TouchCount, Expr::col(RwCol::TouchCount).add(1))
|
.col_expr(RwCol::TouchCount, Expr::value(1i64))
|
||||||
.exec(db.writer())
|
.exec(db.writer())
|
||||||
.await
|
.await
|
||||||
} else {
|
} else {
|
||||||
RepoWebhookEntity::update_many()
|
RepoWebhookEntity::update_many()
|
||||||
.filter(RwCol::Id.eq(webhook_id))
|
.filter(RwCol::Id.eq(webhook_id))
|
||||||
.col_expr(RwCol::TouchCount, Expr::col(RwCol::TouchCount).add(1))
|
.col_expr(RwCol::TouchCount, Expr::value(1i64))
|
||||||
.exec(db.writer())
|
.exec(db.writer())
|
||||||
.await
|
.await
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
slog::warn!(logs, "failed to update webhook touch error={}", e);
|
slog::warn!(logs, "{}", format!("failed to update webhook touch error={}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -72,17 +72,16 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> {
|
|||||||
let app_cache = app_cache?;
|
let app_cache = app_cache?;
|
||||||
|
|
||||||
let redis_pool = app_cache.redis_pool().clone();
|
let redis_pool = app_cache.redis_pool().clone();
|
||||||
|
let http = Arc::new(reqwest::Client::new());
|
||||||
let hook = HookService::new(
|
let hook = HookService::new(
|
||||||
db.clone(),
|
db.clone(),
|
||||||
app_cache.clone(),
|
app_cache.clone(),
|
||||||
redis_pool.clone(),
|
redis_pool.clone(),
|
||||||
logger.clone(),
|
logger.clone(),
|
||||||
config.clone(),
|
config.clone(),
|
||||||
|
http,
|
||||||
);
|
);
|
||||||
let _worker_cancel = hook.start_worker();
|
let sync = crate::ssh::ReceiveSyncService::new(hook);
|
||||||
slog::info!(logger, "hook worker started");
|
|
||||||
|
|
||||||
let sync = crate::ssh::ReceiveSyncService::new(redis_pool.clone(), logger.clone());
|
|
||||||
|
|
||||||
let rate_limiter = Arc::new(rate_limit::RateLimiter::new(
|
let rate_limiter = Arc::new(rate_limit::RateLimiter::new(
|
||||||
rate_limit::RateLimitConfig::default(),
|
rate_limit::RateLimitConfig::default(),
|
||||||
|
|||||||
@ -36,9 +36,6 @@ pub use diff::types::{
|
|||||||
};
|
};
|
||||||
pub use domain::GitDomain;
|
pub use domain::GitDomain;
|
||||||
pub use error::{GitError, GitResult};
|
pub use error::{GitError, GitResult};
|
||||||
pub use hook::pool::types::{HookTask, TaskType};
|
|
||||||
pub use hook::pool::PoolConfig;
|
|
||||||
pub use hook::pool::HookWorker;
|
|
||||||
pub use hook::sync::HookMetaDataSync;
|
pub use hook::sync::HookMetaDataSync;
|
||||||
pub use lfs::types::{LfsConfig, LfsEntry, LfsOid, LfsPointer};
|
pub use lfs::types::{LfsConfig, LfsEntry, LfsOid, LfsPointer};
|
||||||
pub use merge::types::{MergeAnalysisResult, MergeOptions, MergePreferenceResult, MergeheadInfo};
|
pub use merge::types::{MergeAnalysisResult, MergeOptions, MergePreferenceResult, MergeheadInfo};
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
use crate::error::GitError;
|
use crate::error::GitError;
|
||||||
use crate::hook::pool::types::{HookTask, TaskType};
|
use crate::hook::HookService;
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
use config::AppConfig;
|
use config::AppConfig;
|
||||||
@ -137,12 +137,22 @@ impl SSHHandle {
|
|||||||
"SSH server configured with methods: {:?}", config.methods
|
"SSH server configured with methods: {:?}", config.methods
|
||||||
);
|
);
|
||||||
let token_service = SshTokenService::new(self.db.clone());
|
let token_service = SshTokenService::new(self.db.clone());
|
||||||
|
let http = Arc::new(reqwest::Client::new());
|
||||||
|
let hook = crate::hook::HookService::new(
|
||||||
|
self.db.clone(),
|
||||||
|
self.cache.clone(),
|
||||||
|
self.redis_pool.clone(),
|
||||||
|
self.logger.clone(),
|
||||||
|
self.app.clone(),
|
||||||
|
http,
|
||||||
|
);
|
||||||
let mut server = server::SSHServer::new(
|
let mut server = server::SSHServer::new(
|
||||||
self.db.clone(),
|
self.db.clone(),
|
||||||
self.cache.clone(),
|
self.cache.clone(),
|
||||||
self.redis_pool.clone(),
|
self.redis_pool.clone(),
|
||||||
self.logger.clone(),
|
self.logger.clone(),
|
||||||
token_service,
|
token_service,
|
||||||
|
hook,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Start the rate limiter cleanup background task so the HashMap
|
// Start the rate limiter cleanup background task so the HashMap
|
||||||
@ -167,63 +177,33 @@ impl SSHHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enqueues a sync task to the Redis-backed hook queue.
|
/// Direct sync service — calls HookService::sync_repo inline.
|
||||||
/// The background worker picks it up and processes it with per-repo locking.
|
/// K8s StatefulSet HA scheduling ensures exclusive access per repo shard.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ReceiveSyncService {
|
pub struct ReceiveSyncService {
|
||||||
pool: deadpool_redis::cluster::Pool,
|
hook: HookService,
|
||||||
logger: Logger,
|
|
||||||
redis_prefix: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReceiveSyncService {
|
impl ReceiveSyncService {
|
||||||
pub fn new(pool: deadpool_redis::cluster::Pool, logger: Logger) -> Self {
|
pub fn new(hook: HookService) -> Self {
|
||||||
Self {
|
Self { hook }
|
||||||
pool,
|
|
||||||
logger,
|
|
||||||
redis_prefix: "{hook}".to_string(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enqueue a sync task. Fire-and-forget — logs errors but does not block.
|
/// Execute a full repo sync synchronously.
|
||||||
pub async fn send(&self, task: RepoReceiveSyncTask) {
|
/// Returns Ok on success, Err on failure.
|
||||||
let hook_task = HookTask {
|
pub async fn send(&self, task: RepoReceiveSyncTask) -> Result<(), crate::GitError> {
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
let repo_id = task.repo_uid.to_string();
|
||||||
repo_id: task.repo_uid.to_string(),
|
slog::info!(self.hook.logger, "starting sync repo_id={}", repo_id);
|
||||||
task_type: TaskType::Sync,
|
let res = self.hook.sync_repo(&repo_id).await;
|
||||||
payload: serde_json::Value::Null,
|
match &res {
|
||||||
created_at: chrono::Utc::now(),
|
Ok(()) => {
|
||||||
retry_count: 0,
|
slog::info!(self.hook.logger, "sync completed repo_id={}", repo_id);
|
||||||
};
|
|
||||||
|
|
||||||
let task_json = match serde_json::to_string(&hook_task) {
|
|
||||||
Ok(j) => j,
|
|
||||||
Err(e) => {
|
|
||||||
error!(self.logger, "failed to serialize hook task: {}", e);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
let queue_key = format!("{}:sync", self.redis_prefix);
|
|
||||||
|
|
||||||
let redis = match self.pool.get().await {
|
|
||||||
Ok(c) => c,
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(self.logger, "failed to get Redis connection: {}", e);
|
slog::error!(self.hook.logger, "sync failed repo_id={} error={}", repo_id, e);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
||||||
if let Err(e) = redis::cmd("LPUSH")
|
|
||||||
.arg(&queue_key)
|
|
||||||
.arg(&task_json)
|
|
||||||
.query_async::<()>(&mut conn)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
error!(self.logger, "failed to enqueue sync task repo_id={} error={}",
|
|
||||||
task.repo_uid, e);
|
|
||||||
}
|
}
|
||||||
|
res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -287,18 +267,6 @@ pub async fn run_ssh(config: AppConfig, logger: Logger) -> anyhow::Result<()> {
|
|||||||
let db = AppDatabase::init(&config).await?;
|
let db = AppDatabase::init(&config).await?;
|
||||||
let cache = AppCache::init(&config).await?;
|
let cache = AppCache::init(&config).await?;
|
||||||
let redis_pool = cache.redis_pool().clone();
|
let redis_pool = cache.redis_pool().clone();
|
||||||
|
|
||||||
// Start the hook worker (Redis queue consumer)
|
|
||||||
let hook = crate::hook::HookService::new(
|
|
||||||
db.clone(),
|
|
||||||
cache.clone(),
|
|
||||||
redis_pool.clone(),
|
|
||||||
logger.clone(),
|
|
||||||
config.clone(),
|
|
||||||
);
|
|
||||||
let _worker_cancel = hook.start_worker();
|
|
||||||
slog::info!(logger, "hook worker started");
|
|
||||||
|
|
||||||
SSHHandle::new(db, config.clone(), cache, redis_pool, logger)
|
SSHHandle::new(db, config.clone(), cache, redis_pool, logger)
|
||||||
.run_ssh()
|
.run_ssh()
|
||||||
.await?;
|
.await?;
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
use crate::hook::HookService;
|
||||||
use crate::ssh::ReceiveSyncService;
|
use crate::ssh::ReceiveSyncService;
|
||||||
use crate::ssh::SshTokenService;
|
use crate::ssh::SshTokenService;
|
||||||
use crate::ssh::handle::SSHandle;
|
use crate::ssh::handle::SSHandle;
|
||||||
@ -15,6 +16,7 @@ pub struct SSHServer {
|
|||||||
pub redis_pool: RedisPool,
|
pub redis_pool: RedisPool,
|
||||||
pub logger: Logger,
|
pub logger: Logger,
|
||||||
pub token_service: SshTokenService,
|
pub token_service: SshTokenService,
|
||||||
|
pub hook: HookService,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SSHServer {
|
impl SSHServer {
|
||||||
@ -24,6 +26,7 @@ impl SSHServer {
|
|||||||
redis_pool: RedisPool,
|
redis_pool: RedisPool,
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
token_service: SshTokenService,
|
token_service: SshTokenService,
|
||||||
|
hook: HookService,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
SSHServer {
|
SSHServer {
|
||||||
db,
|
db,
|
||||||
@ -31,6 +34,7 @@ impl SSHServer {
|
|||||||
redis_pool,
|
redis_pool,
|
||||||
logger,
|
logger,
|
||||||
token_service,
|
token_service,
|
||||||
|
hook,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -48,7 +52,7 @@ impl russh::server::Server for SSHServer {
|
|||||||
} else {
|
} else {
|
||||||
info!(self.logger, "New SSH connection from unknown address");
|
info!(self.logger, "New SSH connection from unknown address");
|
||||||
}
|
}
|
||||||
let sync_service = ReceiveSyncService::new(self.redis_pool.clone(), self.logger.clone());
|
let sync_service = ReceiveSyncService::new(self.hook.clone());
|
||||||
SSHandle::new(
|
SSHandle::new(
|
||||||
self.db.clone(),
|
self.db.clone(),
|
||||||
self.cache.clone(),
|
self.cache.clone(),
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user