use crate::error::GitError; use crate::hook::pool::types::HookTask; use deadpool_redis::cluster::Connection as RedisConn; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; /// NATS consumer function type: returns (task, ack_fn) pairs. pub type NatsHookConsumeFn = Arc< dyn Fn( String, usize, ) -> Pin< Box< dyn Future< Output = anyhow::Result< Vec<( Vec, Box< dyn Fn() -> Pin< Box> + Send>, > + Send, >, )>, >, > + Send, >, > + Send + Sync, >; /// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern. pub struct RedisConsumer { pool: deadpool_redis::cluster::Pool, /// Hash-tag-prefixed key prefix, e.g. "{hook}". prefix: String, block_timeout_secs: u64, /// Optional NATS consume function for JetStream integration. nats_consume: Option, } const POOL_GET_TIMEOUT: Duration = Duration::from_secs(5); impl RedisConsumer { pub fn new( pool: deadpool_redis::cluster::Pool, mut prefix: String, block_timeout_secs: u64, ) -> Self { // Redis Cluster requires hash tags ({...}) for multi-key commands // like BLMOVE and Lua scripts to ensure keys hash to the same slot. if !prefix.contains('{') { prefix = format!("{{{}}}", prefix); } Self { pool, prefix, block_timeout_secs, nats_consume: None, } } pub fn with_nats( pool: deadpool_redis::cluster::Pool, mut prefix: String, block_timeout_secs: u64, nats_consume: NatsHookConsumeFn, ) -> Self { if !prefix.contains('{') { prefix = format!("{{{}}}", prefix); } Self { pool, prefix, block_timeout_secs, nats_consume: Some(nats_consume), } } /// Atomically moves a task from the main queue to the work queue using BLMOVE or NATS. /// Blocks up to `block_timeout_secs` waiting for a task. /// /// Returns `Some((HookTask, task_json))` where `task_json` is the raw JSON string /// needed for LREM on ACK. Returns `None` if the blocking timed out. pub async fn next(&self, task_type: &str) -> Result, GitError> { // Try NATS first if available if let Some(nats_consume) = &self.nats_consume { match self.next_nats(task_type, nats_consume).await { Ok(Some(result)) => return Ok(Some(result)), Ok(None) => {} // No messages, fall through to Redis Err(e) => { tracing::warn!(error = %e, "NATS consume failed, falling back to Redis"); } } } // Fallback to Redis List self.next_redis(task_type).await } async fn next_nats( &self, task_type: &str, nats_consume: &NatsHookConsumeFn, ) -> Result, GitError> { let subject = format!("queue.hook.{}", task_type); let messages = nats_consume(subject, 1) .await .map_err(|e| GitError::Internal(format!("NATS consume failed: {}", e)))?; if messages.is_empty() { return Ok(None); } let (data, ack_fn) = messages.into_iter().next().unwrap(); match serde_json::from_slice::(&data) { Ok(task) => { let task_json = String::from_utf8_lossy(&data).to_string(); tracing::debug!( "task dequeued from NATS task_id={} task_type={}", task.id, task.task_type ); // Store ack_fn for later use - we'll need to refactor to support async ack // For now, we'll ack immediately after processing in the worker Ok(Some((task, task_json))) } Err(e) => { tracing::warn!("malformed task JSON from NATS, discarding error={}", e); let _ = ack_fn().await; Ok(None) } } } async fn next_redis(&self, task_type: &str) -> Result, GitError> { let queue_key = format!("{}:{}", self.prefix, task_type); let work_key = format!("{}:{}:work", self.prefix, task_type); let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get()) .await .map_err(|_| GitError::Internal("redis pool get timed out".into()))? .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; let mut conn: RedisConn = redis; // BLMOVE source destination timeout let task_json: Option = redis::cmd("BLMOVE") .arg(&queue_key) .arg(&work_key) .arg("RIGHT") .arg("LEFT") .arg(self.block_timeout_secs) .query_async(&mut conn) .await .map_err(|e| GitError::Internal(format!("BLMOVE failed: {}", e)))?; match task_json { Some(json) => { match serde_json::from_str::(&json) { Ok(task) => { tracing::debug!( "task dequeued task_id={} task_type={} queue={}", task.id, task.task_type, queue_key ); Ok(Some((task, json))) } Err(e) => { // Malformed task — remove from work queue and discard tracing::warn!( "malformed task JSON, discarding error={} queue={}", e, work_key ); let _ = self.ack_raw(&work_key, &json).await; Ok(None) } } } None => { // Timed out, no task available Ok(None) } } } /// Acknowledge a task: remove it from the work queue (LREM). pub async fn ack(&self, work_key: &str, task_json: &str) -> Result<(), GitError> { self.ack_raw(work_key, task_json).await } async fn ack_raw(&self, work_key: &str, task_json: &str) -> Result<(), GitError> { let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get()) .await .map_err(|_| GitError::Internal("redis pool get timed out".into()))? .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; let mut conn: RedisConn = redis; let _: i64 = redis::cmd("LREM") .arg(work_key) .arg(-1) .arg(task_json) .query_async(&mut conn) .await .map_err(|e| GitError::Internal(format!("LREM failed: {}", e)))?; Ok(()) } /// Negative acknowledge (retry): remove from work queue and push back to main queue. pub async fn nak( &self, work_key: &str, queue_key: &str, task_json: &str, ) -> Result<(), GitError> { self.nak_with_retry(work_key, queue_key, task_json, task_json) .await } /// Negative acknowledge with a different (updated) task JSON — used to /// requeue with an incremented retry_count. /// Uses a Lua script for atomic LREM + LPUSH to prevent task loss on crash. pub async fn nak_with_retry( &self, work_key: &str, queue_key: &str, old_task_json: &str, new_task_json: &str, ) -> Result<(), GitError> { let redis = tokio::time::timeout(POOL_GET_TIMEOUT, self.pool.get()) .await .map_err(|_| GitError::Internal("redis pool get timed out".into()))? .map_err(|e| GitError::Internal(format!("redis pool get failed: {}", e)))?; let mut conn: RedisConn = redis; // Atomic: remove from work queue AND push to retry queue in one script. // If the process crashes mid-script, either both happen or neither — no lost tasks. let script = r#" redis.call("LREM", KEYS[1], 1, ARGV[1]) redis.call("LPUSH", KEYS[2], ARGV[2]) return 1 "#; let _: i32 = redis::Script::new(script) .key(work_key) .key(queue_key) .arg(old_task_json) .arg(new_task_json) .invoke_async(&mut conn) .await .map_err(|e| GitError::Internal(format!("nak script failed: {}", e)))?; tracing::warn!("task nack'd and requeued queue={}", queue_key); Ok(()) } pub fn prefix(&self) -> &str { &self.prefix } } impl Clone for RedisConsumer { fn clone(&self) -> Self { Self { pool: self.pool.clone(), prefix: self.prefix.clone(), block_timeout_secs: self.block_timeout_secs, nats_consume: self.nats_consume.clone(), } } }