- pool/worker.rs: only dispatch webhooks after sync succeeds (skip on error); pass max_retries from PoolConfig to HookWorker - sync/branch.rs: replaced with stub (sync_refs moved to commit.rs) - sync/commit.rs: add collect_git_refs() that collects BranchTip/TagTip from git2 entirely within one sync call; sync_refs now uses owned data so no git2 types cross .await boundaries (future is Send) - sync/fsck.rs: remove extraneous "HEAD" arg from rollback git update-ref (correct syntax is: update-ref -m <msg> <ref> <new-sha> — no old-sha) - webhook_dispatch.rs: touch_count uses Expr::col().add(1) for atomic increment instead of overwriting with Expr::value(1) - pool/mod.rs: pass config.redis_max_retries to HookWorker
45 lines
1.2 KiB
Rust
45 lines
1.2 KiB
Rust
pub mod redis;
|
|
pub mod types;
|
|
pub mod worker;
|
|
|
|
pub use redis::RedisConsumer;
|
|
pub use types::{HookTask, PoolConfig, TaskType};
|
|
pub use worker::HookWorker;
|
|
|
|
use db::cache::AppCache;
|
|
use db::database::AppDatabase;
|
|
use deadpool_redis::cluster::Pool as RedisPool;
|
|
use slog::Logger;
|
|
use std::sync::Arc;
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
/// Start the hook worker background task.
|
|
/// Returns a handle to the cancellation token so the caller can shut it down.
|
|
pub fn start_worker(
|
|
db: AppDatabase,
|
|
cache: AppCache,
|
|
redis_pool: RedisPool,
|
|
logger: Logger,
|
|
config: PoolConfig,
|
|
) -> CancellationToken {
|
|
let consumer = RedisConsumer::new(
|
|
redis_pool.clone(),
|
|
config.redis_list_prefix.clone(),
|
|
config.redis_block_timeout_secs,
|
|
logger.clone(),
|
|
);
|
|
|
|
let http_client = Arc::new(reqwest::Client::new());
|
|
let max_retries = config.redis_max_retries as u32;
|
|
let worker = HookWorker::new(db, cache, logger, consumer, http_client, max_retries);
|
|
|
|
let cancel = CancellationToken::new();
|
|
let cancel_clone = cancel.clone();
|
|
|
|
tokio::spawn(async move {
|
|
worker.run(cancel_clone).await;
|
|
});
|
|
|
|
cancel
|
|
}
|