gitdataai/libs/room/src/room_ai_queue.rs
2026-04-14 19:02:01 +08:00

226 lines
6.8 KiB
Rust

use crate::RoomError;
use db::cache::AppCache;
use std::time::{Duration, Instant};
use uuid::Uuid;
/// Expiry, in milliseconds, given to the lock key when it is created with
/// `SET ... NX PX` in `acquire_room_ai_lock`.
const LOCK_TTL_MS: usize = 120_000;
/// Expiry, in milliseconds, of a waiter's ticket key. It is re-applied on every
/// polling iteration and is also used as the maximum time
/// `acquire_room_ai_lock` waits before giving up.
const TICKET_TTL_MS: usize = 90_000;
/// Upper bound, in milliseconds, on the sleep between two polling attempts.
const MAX_BACKOFF_MS: u64 = 200;
/// Handle representing ownership of a room's AI lock.
///
/// The guard is produced by `acquire_room_ai_lock`. When it is dropped while
/// `acquired` is `true`, a background task releases the lock key, removes the
/// request from the room queue and deletes its ticket key.
#[must_use = "dropping the guard immediately releases the room AI lock"]
pub struct RoomAiLockGuard {
    /// Cache handle used to obtain a connection when releasing.
    cache: AppCache,
    /// Key of the sorted set that orders waiting requests for the room.
    queue_key: String,
    /// Key of this request's ticket (a short-lived marker with an expiry).
    ticket_key: String,
    /// Key holding the lock itself.
    lock_key: String,
    /// Value stored under `lock_key`; compared on release so that only the
    /// owner's token deletes the lock.
    lock_token: String,
    /// Member name of this request inside the queue sorted set.
    request_uid: String,
    /// When `false`, dropping the guard does nothing.
    acquired: bool,
    /// Logger used to report a failed release.
    log: slog::Logger,
}
impl Drop for RoomAiLockGuard {
    /// Releases the lock in a background task when the guard goes out of scope.
    ///
    /// `drop` cannot await, so the release is handed to the current tokio
    /// runtime. Failures are logged, never propagated.
    fn drop(&mut self) {
        if !self.acquired {
            return;
        }

        // `tokio::spawn` panics when no runtime is active. A panic inside
        // `drop` while already unwinding aborts the process, so look the
        // runtime up explicitly and degrade to a log line instead.
        let handle = match tokio::runtime::Handle::try_current() {
            Ok(handle) => handle,
            Err(e) => {
                slog::warn!(
                    self.log,
                    "RoomAiLockGuard: no tokio runtime, cannot release lock key={} token={} err={}",
                    self.lock_key,
                    self.lock_token,
                    e
                );
                return;
            }
        };

        // The guard is going away, so the strings can be moved out rather
        // than cloned.
        let cache = self.cache.clone();
        let log = self.log.clone();
        let queue_key = std::mem::take(&mut self.queue_key);
        let ticket_key = std::mem::take(&mut self.ticket_key);
        let lock_key = std::mem::take(&mut self.lock_key);
        let lock_token = std::mem::take(&mut self.lock_token);
        let request_uid = std::mem::take(&mut self.request_uid);

        handle.spawn(async move {
            let released = release_lock(
                &cache,
                &queue_key,
                &ticket_key,
                &lock_key,
                &lock_token,
                &request_uid,
            )
            .await;
            if let Err(e) = released {
                slog::warn!(
                    log,
                    "RoomAiLockGuard: failed to release lock key={} token={} err={}",
                    lock_key,
                    lock_token,
                    e
                );
            }
        });
    }
}
pub async fn acquire_room_ai_lock(
cache: &AppCache,
room_id: Uuid,
log: &slog::Logger,
) -> Result<Option<RoomAiLockGuard>, RoomError> {
let request_uid = Uuid::now_v7().to_string();
let hostname = hostname::get()
.map(|h| h.to_string_lossy().into_owned())
.unwrap_or_else(|_| "unknown".to_string());
let pid = std::process::id();
let lock_token = format!("{}:{}:{}", hostname, pid, request_uid);
let queue_key = format!("ai:room:queue:{}", room_id);
let seq_key = format!("ai:room:queue:seq:{}", room_id);
let lock_key = format!("ai:room:queue:lock:{}", room_id);
let ticket_key = format!("ai:room:queue:ticket:{}:{}", room_id, request_uid);
{
let mut conn = cache
.conn()
.await
.map_err(|e| RoomError::Internal(e.to_string()))?;
let seq: i64 = redis::cmd("INCR")
.arg(&seq_key)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("INCR: {}", e)))?;
let _: i32 = redis::cmd("ZADD")
.arg(&queue_key)
.arg(seq)
.arg(&request_uid)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("ZADD: {}", e)))?;
let _: () = redis::cmd("SET")
.arg(&ticket_key)
.arg("1")
.arg("PX")
.arg(TICKET_TTL_MS)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("SET ticket: {}", e)))?;
}
let start = Instant::now();
let mut retry_count: u32 = 0;
loop {
if start.elapsed().as_millis() as usize >= TICKET_TTL_MS {
slog::warn!(
log,
"RoomAiLock: timeout waiting for lock after {}ms, room_id={}",
start.elapsed().as_millis(),
room_id
);
return Ok(None);
}
let mut conn = cache
.conn()
.await
.map_err(|e| RoomError::Internal(e.to_string()))?;
let _: () = redis::cmd("PEXPIRE")
.arg(&ticket_key)
.arg(TICKET_TTL_MS)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("PEXPIRE: {}", e)))?;
let head: Vec<String> = redis::cmd("ZRANGE")
.arg(&queue_key)
.arg(0)
.arg(0)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("ZRANGE: {}", e)))?;
if let Some(head_uid) = head.first() {
if head_uid == &request_uid {
let ok: Option<String> = redis::cmd("SET")
.arg(&lock_key)
.arg(&lock_token)
.arg("NX")
.arg("PX")
.arg(LOCK_TTL_MS)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("SET NX PX: {}", e)))?;
if ok.is_some() {
return Ok(Some(RoomAiLockGuard {
cache: cache.clone(),
queue_key,
ticket_key,
lock_key,
lock_token,
request_uid,
acquired: true,
log: log.clone(),
}));
}
} else {
let head_ticket_key = format!("ai:room:queue:ticket:{}:{}", room_id, head_uid);
let head_exists: i32 = redis::cmd("EXISTS")
.arg(&head_ticket_key)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("EXISTS: {}", e)))?;
if head_exists == 0 {
let _: i32 = redis::cmd("ZREM")
.arg(&queue_key)
.arg(head_uid)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("ZREM: {}", e)))?;
}
}
}
retry_count += 1;
let backoff_exp = retry_count.min(5);
let backoff_ms = std::cmp::min(10 * (2_u64.pow(backoff_exp)), MAX_BACKOFF_MS);
tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
}
}
/// Releases everything a request holds in Redis for a room.
///
/// Three independent steps are attempted:
/// 1. delete `lock_key`, but only when its value equals `lock_token`
///    (done in a Lua script so the compare and delete are one operation);
/// 2. remove `request_uid` from the sorted set at `queue_key`;
/// 3. delete `ticket_key`.
///
/// A failing step does not prevent the following ones from running, so a
/// failed lock deletion cannot leave the queue entry or ticket behind.
///
/// # Errors
/// Returns the connection error if no connection can be obtained; otherwise
/// the messages of all failed steps joined with `"; "`.
async fn release_lock(
    cache: &AppCache,
    queue_key: &str,
    ticket_key: &str,
    lock_key: &str,
    lock_token: &str,
    request_uid: &str,
) -> Result<(), String> {
    let mut conn = cache.conn().await.map_err(|e| e.to_string())?;
    let release_script = redis::Script::new(
        r#"
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
"#,
    );

    let mut failures: Vec<String> = Vec::new();

    let unlocked: redis::RedisResult<()> = release_script
        .key(lock_key)
        .arg(lock_token)
        .invoke_async(&mut conn)
        .await;
    if let Err(e) = unlocked {
        failures.push(format!("DEL lock: {}", e));
    }

    let dequeued: redis::RedisResult<i32> = redis::cmd("ZREM")
        .arg(queue_key)
        .arg(request_uid)
        .query_async(&mut conn)
        .await;
    if let Err(e) = dequeued {
        failures.push(format!("ZREM: {}", e));
    }

    let ticket_removed: redis::RedisResult<()> = redis::cmd("DEL")
        .arg(ticket_key)
        .query_async(&mut conn)
        .await;
    if let Err(e) = ticket_removed {
        failures.push(format!("DEL ticket: {}", e));
    }

    if failures.is_empty() {
        Ok(())
    } else {
        Err(failures.join("; "))
    }
}