perf: sequence generation Redis-only + session MGET batch
service.rs: Replace per-message Lua+DB seq with simple INCR, only reconcile DB every 1000 messages (99.9% queries eliminated). storage.rs: Replace N+1 GET loop with single MGET for both get_user_sessions and get_workspace_sessions (N+1 → 2 roundtrips).
This commit is contained in:
parent
33ab7b058d
commit
5776af18ca
@ -1540,52 +1540,39 @@ impl RoomService {
|
||||
let mut conn = cache.conn().await.map_err(|e| {
|
||||
RoomError::Internal(format!("failed to get redis connection for seq: {}", e))
|
||||
})?;
|
||||
// Atomically increment and check via Lua: INCR first, then if Redis was
|
||||
// externally set to a higher value, jump to max+1. This prevents concurrent
|
||||
// requests from getting duplicate seqs — the Lua script runs as one atomic unit.
|
||||
let seq: i64 = redis::cmd("EVAL")
|
||||
.arg(
|
||||
r#"
|
||||
local current = redis.call('INCR', KEYS[1])
|
||||
local stored = redis.call('GET', KEYS[1])
|
||||
if stored and tonumber(stored) > current then
|
||||
local next = tonumber(stored) + 1
|
||||
redis.call('SET', KEYS[1], next)
|
||||
return next
|
||||
end
|
||||
return current
|
||||
"#,
|
||||
)
|
||||
.arg(1)
|
||||
|
||||
// Normal path: Redis INCR is atomic and sufficient for sequence generation.
|
||||
// Lua script removed — it was executing on every single message (costly).
|
||||
let seq: i64 = redis::cmd("INCR")
|
||||
.arg(&seq_key)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| RoomError::Internal(format!("seq Lua script: {}", e)))?;
|
||||
.map_err(|e| RoomError::Internal(format!("seq INCR: {}", e)))?;
|
||||
|
||||
// Reconciliation check: if DB is ahead of Redis (e.g. server restart wiped
|
||||
// Redis), bump Redis to stay in sync. This query is only hit on the rare
|
||||
// cross-server handoff case, not on every request.
|
||||
use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage};
|
||||
use sea_orm::EntityTrait;
|
||||
let db_seq: Option<Option<Option<i64>>> = RoomMessage::find()
|
||||
.filter(RmCol::Room.eq(room_id))
|
||||
.select_only()
|
||||
.column_as(RmCol::Seq.max(), "max_seq")
|
||||
.into_tuple::<Option<Option<i64>>>()
|
||||
.one(db)
|
||||
.await?
|
||||
.map(|r| r);
|
||||
let db_seq = db_seq.flatten().flatten().unwrap_or(0);
|
||||
// DB reconciliation: only check every 1000 messages, not on every request.
|
||||
// This handles the rare cross-server handoff case (Redis restart wipe).
|
||||
if seq % 1000 == 0 {
|
||||
use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage};
|
||||
use sea_orm::EntityTrait;
|
||||
let db_seq: Option<Option<Option<i64>>> = RoomMessage::find()
|
||||
.filter(RmCol::Room.eq(room_id))
|
||||
.select_only()
|
||||
.column_as(RmCol::Seq.max(), "max_seq")
|
||||
.into_tuple::<Option<Option<i64>>>()
|
||||
.one(db)
|
||||
.await?
|
||||
.map(|r| r);
|
||||
let db_seq = db_seq.flatten().flatten().unwrap_or(0);
|
||||
|
||||
if db_seq >= seq {
|
||||
// Another server handled this room while we were idle — catch up.
|
||||
let _: String = redis::cmd("SET")
|
||||
.arg(&seq_key)
|
||||
.arg(db_seq + 1)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| RoomError::Internal(format!("SET seq: {}", e)))?;
|
||||
return Ok(db_seq + 1);
|
||||
if db_seq >= seq {
|
||||
let _: String = redis::cmd("SET")
|
||||
.arg(&seq_key)
|
||||
.arg(db_seq + 1)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| RoomError::Internal(format!("seq SET: {}", e)))?;
|
||||
return Ok(db_seq + 1);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(seq)
|
||||
|
||||
@ -236,15 +236,28 @@ impl SessionStorage {
|
||||
.await
|
||||
.map_err(Self::to_err)?;
|
||||
|
||||
let mut sessions = Vec::new();
|
||||
for id_str in &session_ids {
|
||||
if let Ok(sid) = Uuid::parse_str(id_str) {
|
||||
if let Ok(Some(session)) = self.get_session(&sid).await {
|
||||
sessions.push(session);
|
||||
}
|
||||
}
|
||||
if session_ids.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
// Batch fetch all sessions in a single MGET instead of N individual GET calls.
|
||||
let keys: Vec<String> = session_ids
|
||||
.iter()
|
||||
.map(|id| format!("{}{}", KEY_CONN, id))
|
||||
.collect();
|
||||
|
||||
let values: Vec<Option<String>> = redis::cmd("MGET")
|
||||
.arg(&keys)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(Self::to_err)?;
|
||||
|
||||
let sessions: Vec<UserSession> = values
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.filter_map(|v| serde_json::from_str(&v).ok())
|
||||
.collect();
|
||||
|
||||
Ok(sessions)
|
||||
}
|
||||
|
||||
@ -261,15 +274,28 @@ impl SessionStorage {
|
||||
.await
|
||||
.map_err(Self::to_err)?;
|
||||
|
||||
let mut sessions = Vec::new();
|
||||
for id_str in &session_ids {
|
||||
if let Ok(sid) = Uuid::parse_str(id_str) {
|
||||
if let Ok(Some(session)) = self.get_session(&sid).await {
|
||||
sessions.push(session);
|
||||
}
|
||||
}
|
||||
if session_ids.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
// Batch fetch all sessions in a single MGET instead of N individual GET calls.
|
||||
let keys: Vec<String> = session_ids
|
||||
.iter()
|
||||
.map(|id| format!("{}{}", KEY_CONN, id))
|
||||
.collect();
|
||||
|
||||
let values: Vec<Option<String>> = redis::cmd("MGET")
|
||||
.arg(&keys)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(Self::to_err)?;
|
||||
|
||||
let sessions: Vec<UserSession> = values
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.filter_map(|v| serde_json::from_str(&v).ok())
|
||||
.collect();
|
||||
|
||||
Ok(sessions)
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user