use crate::RoomError; use db::cache::AppCache; use std::time::{Duration, Instant}; use uuid::Uuid; const LOCK_TTL_MS: usize = 120_000; const TICKET_TTL_MS: usize = 90_000; const MAX_BACKOFF_MS: u64 = 200; pub struct RoomAiLockGuard { cache: AppCache, queue_key: String, ticket_key: String, lock_key: String, lock_token: String, request_uid: String, acquired: bool, log: slog::Logger, } impl Drop for RoomAiLockGuard { fn drop(&mut self) { if !self.acquired { return; } let cache = self.cache.clone(); let queue_key = self.queue_key.clone(); let ticket_key = self.ticket_key.clone(); let lock_key = self.lock_key.clone(); let lock_token = self.lock_token.clone(); let request_uid = self.request_uid.clone(); let log = self.log.clone(); tokio::spawn(async move { if let Err(e) = release_lock( &cache, &queue_key, &ticket_key, &lock_key, &lock_token, &request_uid, ) .await { slog::warn!( log, "RoomAiLockGuard: failed to release lock key={} token={} err={}", lock_key, lock_token, e ); } }); } } pub async fn acquire_room_ai_lock( cache: &AppCache, room_id: Uuid, log: &slog::Logger, ) -> Result, RoomError> { let request_uid = Uuid::now_v7().to_string(); let hostname = hostname::get() .map(|h| h.to_string_lossy().into_owned()) .unwrap_or_else(|_| "unknown".to_string()); let pid = std::process::id(); let lock_token = format!("{}:{}:{}", hostname, pid, request_uid); let queue_key = format!("ai:room:queue:{}", room_id); let seq_key = format!("ai:room:queue:seq:{}", room_id); let lock_key = format!("ai:room:queue:lock:{}", room_id); let ticket_key = format!("ai:room:queue:ticket:{}:{}", room_id, request_uid); { let mut conn = cache .conn() .await .map_err(|e| RoomError::Internal(e.to_string()))?; let seq: i64 = redis::cmd("INCR") .arg(&seq_key) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("INCR: {}", e)))?; let _: i32 = redis::cmd("ZADD") .arg(&queue_key) .arg(seq) .arg(&request_uid) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("ZADD: {}", e)))?; let _: () = redis::cmd("SET") .arg(&ticket_key) .arg("1") .arg("PX") .arg(TICKET_TTL_MS) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("SET ticket: {}", e)))?; } let start = Instant::now(); let mut retry_count: u32 = 0; loop { if start.elapsed().as_millis() as usize >= TICKET_TTL_MS { slog::warn!( log, "RoomAiLock: timeout waiting for lock after {}ms, room_id={}", start.elapsed().as_millis(), room_id ); return Ok(None); } let mut conn = cache .conn() .await .map_err(|e| RoomError::Internal(e.to_string()))?; let _: () = redis::cmd("PEXPIRE") .arg(&ticket_key) .arg(TICKET_TTL_MS) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("PEXPIRE: {}", e)))?; let head: Vec = redis::cmd("ZRANGE") .arg(&queue_key) .arg(0) .arg(0) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("ZRANGE: {}", e)))?; if let Some(head_uid) = head.first() { if head_uid == &request_uid { let ok: Option = redis::cmd("SET") .arg(&lock_key) .arg(&lock_token) .arg("NX") .arg("PX") .arg(LOCK_TTL_MS) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("SET NX PX: {}", e)))?; if ok.is_some() { return Ok(Some(RoomAiLockGuard { cache: cache.clone(), queue_key, ticket_key, lock_key, lock_token, request_uid, acquired: true, log: log.clone(), })); } } else { let head_ticket_key = format!("ai:room:queue:ticket:{}:{}", room_id, head_uid); let head_exists: i32 = redis::cmd("EXISTS") .arg(&head_ticket_key) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("EXISTS: {}", e)))?; if head_exists == 0 { let _: i32 = redis::cmd("ZREM") .arg(&queue_key) .arg(head_uid) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("ZREM: {}", e)))?; } } } retry_count += 1; let backoff_exp = retry_count.min(5); let backoff_ms = std::cmp::min(10 * (2_u64.pow(backoff_exp)), MAX_BACKOFF_MS); tokio::time::sleep(Duration::from_millis(backoff_ms)).await; } } async fn release_lock( cache: &AppCache, queue_key: &str, ticket_key: &str, lock_key: &str, lock_token: &str, request_uid: &str, ) -> Result<(), String> { let mut conn = cache.conn().await.map_err(|e| e.to_string())?; let release_script = redis::Script::new( r#" if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end "#, ); let _: () = release_script .key(lock_key) .arg(lock_token) .invoke_async(&mut conn) .await .map_err(|e| format!("DEL lock: {}", e))?; let _: i32 = redis::cmd("ZREM") .arg(queue_key) .arg(request_uid) .query_async(&mut conn) .await .map_err(|e| format!("ZREM: {}", e))?; let _: () = redis::cmd("DEL") .arg(ticket_key) .query_async(&mut conn) .await .map_err(|e| format!("DEL ticket: {}", e))?; Ok(()) }