use deadpool_redis::cluster::Connection; use redis::AsyncCommands; use crate::sync::{ReceiveSyncService, TaskType}; pub struct SyncConsumer { service: ReceiveSyncService, block_timeout_secs: u64, } impl SyncConsumer { pub fn new(service: ReceiveSyncService, block_timeout_secs: u64) -> Self { Self { service, block_timeout_secs, } } pub async fn next(&self, task_type: &TaskType) -> Option<(String, String)> { let prefix = &self.service.redis_prefix; let queue_key = match task_type { TaskType::Sync => format!("{prefix}:sync"), TaskType::Fsck => format!("{prefix}:fsck"), TaskType::Gc => format!("{prefix}:gc"), TaskType::Webhook => format!("{prefix}:webhook"), }; let work_key = format!("{queue_key}:work"); let redis = self.service.pool.get().await.ok()?; let mut conn: Connection = redis; let result: Option = redis::cmd("BLMOVE") .arg(&queue_key) .arg(&work_key) .arg("RIGHT") .arg("LEFT") .arg(self.block_timeout_secs) .query_async(&mut conn) .await .ok()?; result.map(|json| (json, work_key)) } pub async fn ack(&self, task_json: &str, work_key: &str) -> Option<()> { let redis = self.service.pool.get().await.ok()?; let mut conn: Connection = redis; let removed: i32 = conn.lrem(work_key, 1, task_json).await.ok()?; if removed > 0 { Some(()) } else { None } } pub async fn nak_with_retry( &self, task_json: &str, work_key: &str, queue_key: &str, ) -> Option<()> { let redis = self.service.pool.get().await.ok()?; let mut conn: Connection = redis; let script = redis::Script::new( r#" local removed = redis.call("LREM", KEYS[1], 0, ARGV[1]) if removed > 0 then redis.call("LPUSH", KEYS[2], ARGV[1]) end return removed "#, ); let result: i32 = script .key(work_key) .key(queue_key) .arg(task_json) .invoke_async(&mut conn) .await .ok()?; if result > 0 { Some(()) } else { None } } pub(crate) fn queue_key_for_task_type( &self, task_type: &TaskType, ) -> String { let prefix = &self.service.redis_prefix; match task_type { TaskType::Sync => format!("{prefix}:sync"), TaskType::Fsck => format!("{prefix}:fsck"), TaskType::Gc => format!("{prefix}:gc"), TaskType::Webhook => format!("{prefix}:webhook"), } } }