// gitdataai/libs/room/src/room_ai_queue.rs
// 2026-05-14 10:02:21 +08:00
//
// 333 lines
// 11 KiB
// Rust

use crate::RoomError;
use db::cache::AppCache;
use std::time::{Duration, Instant};
use uuid::Uuid;
use tokio_util::sync::CancellationToken;
/// PX expiry (ms) written on the room lock key; kept short because a watchdog renews it.
const LOCK_TTL_MS: usize = 20 * 1_000;
/// Period (ms) of the lock-renewal watchdog: half of the lock's TTL.
const HEARTBEAT_INTERVAL_MS: u64 = (LOCK_TTL_MS / 2) as u64;
/// PX expiry (ms) of a waiter's ticket; also the longest a caller waits for the lock.
const TICKET_TTL_MS: usize = 90 * 1_000;
/// Ceiling (ms) for the sleep between two polling attempts.
const MAX_BACKOFF_MS: u64 = 200;
/// Handle for a held per-room AI lock, returned by `acquire_room_ai_lock`.
///
/// While the guard is alive a background watchdog task keeps renewing the lock's TTL.
/// Call `RoomAiLockGuard::release` to release it and await the Redis cleanup. Dropping
/// the guard instead stops the watchdog and schedules the same cleanup on the current
/// Tokio runtime, if there is one.
#[must_use = "dropping the guard releases the room AI lock immediately"]
pub struct RoomAiLockGuard {
    /// Redis access used by the release steps.
    cache: AppCache,
    /// Per-room sorted set of waiting request uids.
    queue_key: String,
    /// This request's ticket key, whose presence marks the queue entry as live.
    ticket_key: String,
    /// Per-room lock key.
    lock_key: String,
    /// Value stored in `lock_key`; release only deletes the key while it still holds this.
    lock_token: String,
    /// This request's member in the `queue_key` sorted set.
    request_uid: String,
    /// `true` until a release has been started; keeps `Drop` from releasing twice.
    acquired: bool,
    /// Cancelled on release/drop to stop the TTL-renewal watchdog.
    cancel_token: CancellationToken,
}
impl RoomAiLockGuard {
    /// Releases the lock now and waits for the Redis cleanup to finish.
    ///
    /// The watchdog is stopped first. Then the lock key is deleted (only if it still
    /// holds this guard's token), the request is removed from the room queue, and its
    /// ticket is deleted. Failures are logged with `tracing::warn!`, not returned.
    pub async fn release(mut self) {
        if !self.acquired {
            return;
        }
        // Cleared before the await so that `Drop`, which runs when `self` goes out of
        // scope at the end of this method, does not schedule a second release.
        self.acquired = false;
        self.cancel_token.cancel();
        // `self` is owned for the whole call, so its fields can be borrowed directly.
        if let Err(e) = release_lock(
            &self.cache,
            &self.queue_key,
            &self.ticket_key,
            &self.lock_key,
            &self.lock_token,
            &self.request_uid,
        )
        .await
        {
            tracing::warn!(
                lock_key = %self.lock_key,
                lock_token = %self.lock_token,
                error = %e,
                "RoomAiLockGuard: failed to release lock"
            );
        }
    }
}
impl Drop for RoomAiLockGuard {
fn drop(&mut self) {
if !self.acquired {
return;
}
// Signal watchdog to stop
self.cancel_token.cancel();
let cache = self.cache.clone();
let queue_key = self.queue_key.clone();
let ticket_key = self.ticket_key.clone();
let lock_key = self.lock_key.clone();
let lock_token = self.lock_token.clone();
let request_uid = self.request_uid.clone();
// Fire-and-forget release in background if runtime is available.
// We don't block here or spawn threads anymore, as the watchdog
// mechanism ensures the lock will expire safely anyway.
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
let _ = release_lock(
&cache,
&queue_key,
&ticket_key,
&lock_key,
&lock_token,
&request_uid,
)
.await;
});
}
}
}
pub async fn acquire_room_ai_lock(
cache: &AppCache,
room_id: Uuid,
) -> Result<Option<RoomAiLockGuard>, RoomError> {
let request_uid = Uuid::now_v7().to_string();
let hostname = hostname::get()
.map(|h| h.to_string_lossy().into_owned())
.unwrap_or_else(|_| "unknown".to_string());
let pid = std::process::id();
let lock_token = format!("{}:{}:{}", hostname, pid, request_uid);
let queue_key = format!("ai:room:queue:{}", room_id);
let seq_key = format!("ai:room:queue:seq:{}", room_id);
let lock_key = format!("ai:room:queue:lock:{}", room_id);
let ticket_key = format!("ai:room:queue:ticket:{}:{}", room_id, request_uid);
{
let mut conn = cache
.conn()
.await
.map_err(|e| RoomError::Internal(e.to_string()))?;
let seq: i64 = redis::cmd("INCR")
.arg(&seq_key)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("INCR: {}", e)))?;
let _: i32 = redis::cmd("ZADD")
.arg(&queue_key)
.arg(seq)
.arg(&request_uid)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("ZADD: {}", e)))?;
let _: () = redis::cmd("SET")
.arg(&ticket_key)
.arg("1")
.arg("PX")
.arg(TICKET_TTL_MS)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("SET ticket: {}", e)))?;
}
let start = Instant::now();
let mut retry_count: u32 = 0;
loop {
if start.elapsed().as_millis() as usize >= TICKET_TTL_MS {
tracing::warn!(
room_id = %room_id,
elapsed_ms = start.elapsed().as_millis(),
"RoomAiLock: timeout waiting for lock, cleaning up"
);
// Clean up our own ZSET entry and ticket to prevent ZSET leak
if let Ok(mut conn) = cache.conn().await {
let _: i32 = redis::cmd("ZREM")
.arg(&queue_key)
.arg(&request_uid)
.query_async(&mut conn)
.await
.inspect_err(|e| tracing::warn!(error = %e, "timeout ZREM failed"))
.unwrap_or(0);
let _: i32 = redis::cmd("DEL")
.arg(&ticket_key)
.query_async(&mut conn)
.await
.inspect_err(|e| tracing::warn!(error = %e, "timeout DEL ticket failed"))
.unwrap_or(0);
}
return Ok(None);
}
let mut conn = cache
.conn()
.await
.map_err(|e| RoomError::Internal(e.to_string()))?;
let _: () = redis::cmd("PEXPIRE")
.arg(&ticket_key)
.arg(TICKET_TTL_MS)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("PEXPIRE: {}", e)))?;
let head: Vec<String> = redis::cmd("ZRANGE")
.arg(&queue_key)
.arg(0)
.arg(0)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("ZRANGE: {}", e)))?;
if let Some(head_uid) = head.first() {
if head_uid == &request_uid {
let ok: Option<String> = redis::cmd("SET")
.arg(&lock_key)
.arg(&lock_token)
.arg("NX")
.arg("PX")
.arg(LOCK_TTL_MS)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("SET NX PX: {}", e)))?;
if ok.is_some() {
let cancel_token = CancellationToken::new();
let guard = RoomAiLockGuard {
cache: cache.clone(),
queue_key: queue_key.clone(),
ticket_key: ticket_key.clone(),
lock_key: lock_key.clone(),
lock_token: lock_token.clone(),
request_uid: request_uid.clone(),
acquired: true,
cancel_token: cancel_token.clone(),
};
// Start Watchdog task to renew lock TTL
let cache_for_watchdog = cache.clone();
let lock_key_for_watchdog = lock_key.clone();
let lock_token_for_watchdog = lock_token.clone();
tokio::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_millis(HEARTBEAT_INTERVAL_MS));
loop {
tokio::select! {
_ = cancel_token.cancelled() => break,
_ = interval.tick() => {
if let Ok(mut conn) = cache_for_watchdog.conn().await {
let renew_script = redis::Script::new(
r#"
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
return 0
end
"#,
);
let _: i32 = renew_script
.key(&lock_key_for_watchdog)
.arg(&lock_token_for_watchdog)
.arg(LOCK_TTL_MS)
.invoke_async(&mut conn)
.await
.unwrap_or(0);
}
}
}
}
});
return Ok(Some(guard));
}
// Lock exists — check if it's stale (previous owner crashed).
let pttl: i64 = redis::cmd("PTTL")
.arg(&lock_key)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("PTTL: {}", e)))?;
if pttl == -1 {
tracing::warn!(
lock_key = %lock_key,
"RoomAiLock: lock exists without TTL, force releasing"
);
let _: i32 = redis::cmd("DEL")
.arg(&lock_key)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("DEL stale lock: {}", e)))?;
}
} else {
let head_ticket_key = format!("ai:room:queue:ticket:{}:{}", room_id, head_uid);
let head_exists: i32 = redis::cmd("EXISTS")
.arg(&head_ticket_key)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("EXISTS: {}", e)))?;
if head_exists == 0 {
let _: i32 = redis::cmd("ZREM")
.arg(&queue_key)
.arg(head_uid)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("ZREM: {}", e)))?;
}
}
}
retry_count += 1;
let backoff_exp = retry_count.min(5);
let backoff_ms = std::cmp::min(10 * (2_u64.pow(backoff_exp)), MAX_BACKOFF_MS);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
}
}
/// Releases `lock_key` (only while it still holds `lock_token`), removes `request_uid`
/// from `queue_key`, and deletes `ticket_key`.
///
/// Every step is attempted even if an earlier one fails. In particular, a failed lock
/// deletion must not keep our entry at the head of the queue, where it would block the
/// next waiter until the ticket expired.
///
/// # Errors
/// Returns the connection error if no connection could be obtained. Otherwise returns
/// every failed step's description, joined with `"; "`.
async fn release_lock(
    cache: &AppCache,
    queue_key: &str,
    ticket_key: &str,
    lock_key: &str,
    lock_token: &str,
    request_uid: &str,
) -> Result<(), String> {
    let mut conn = cache.conn().await.map_err(|e| e.to_string())?;
    // Compare-and-delete, so a lock that has since passed to another owner is left alone.
    let release_script = redis::Script::new(
        r#"
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end
"#,
    );
    let mut failures: Vec<String> = Vec::new();

    let lock_released: Result<(), _> = release_script
        .key(lock_key)
        .arg(lock_token)
        .invoke_async(&mut conn)
        .await;
    if let Err(e) = lock_released {
        failures.push(format!("DEL lock: {}", e));
    }

    let entry_removed: Result<i32, _> = redis::cmd("ZREM")
        .arg(queue_key)
        .arg(request_uid)
        .query_async(&mut conn)
        .await;
    if let Err(e) = entry_removed {
        failures.push(format!("ZREM: {}", e));
    }

    let ticket_deleted: Result<(), _> = redis::cmd("DEL")
        .arg(ticket_key)
        .query_async(&mut conn)
        .await;
    if let Err(e) = ticket_deleted {
        failures.push(format!("DEL ticket: {}", e));
    }

    if failures.is_empty() {
        Ok(())
    } else {
        Err(failures.join("; "))
    }
}