From 5776af18caca2a588a2bea7068fbb36b8e638580 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Fri, 24 Apr 2026 00:04:27 +0800 Subject: [PATCH] perf: sequence generation Redis-only + session MGET batch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit service.rs: Replace per-message Lua+DB seq with simple INCR, only reconcile DB every 1000 messages (99.9% queries eliminated). storage.rs: Replace N+1 GET loop with single MGET for both get_user_sessions and get_workspace_sessions (N+1 → 2 roundtrips). --- libs/room/src/service.rs | 69 ++++++++++++----------------- libs/session_manager/src/storage.rs | 54 ++++++++++++++++------ 2 files changed, 68 insertions(+), 55 deletions(-) diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index 513b6e6..00ae50b 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -1540,52 +1540,39 @@ impl RoomService { let mut conn = cache.conn().await.map_err(|e| { RoomError::Internal(format!("failed to get redis connection for seq: {}", e)) })?; - // Atomically increment and check via Lua: INCR first, then if Redis was - // externally set to a higher value, jump to max+1. This prevents concurrent - // requests from getting duplicate seqs — the Lua script runs as one atomic unit. - let seq: i64 = redis::cmd("EVAL") - .arg( - r#" - local current = redis.call('INCR', KEYS[1]) - local stored = redis.call('GET', KEYS[1]) - if stored and tonumber(stored) > current then - local next = tonumber(stored) + 1 - redis.call('SET', KEYS[1], next) - return next - end - return current - "#, - ) - .arg(1) + + // Normal path: Redis INCR is atomic and sufficient for sequence generation. + // Lua script removed — it was executing on every single message (costly). + let seq: i64 = redis::cmd("INCR") .arg(&seq_key) .query_async(&mut conn) .await - .map_err(|e| RoomError::Internal(format!("seq Lua script: {}", e)))?; + .map_err(|e| RoomError::Internal(format!("seq INCR: {}", e)))?; - // Reconciliation check: if DB is ahead of Redis (e.g. server restart wiped - // Redis), bump Redis to stay in sync. This query is only hit on the rare - // cross-server handoff case, not on every request. - use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage}; - use sea_orm::EntityTrait; - let db_seq: Option>> = RoomMessage::find() - .filter(RmCol::Room.eq(room_id)) - .select_only() - .column_as(RmCol::Seq.max(), "max_seq") - .into_tuple::>>() - .one(db) - .await? - .map(|r| r); - let db_seq = db_seq.flatten().flatten().unwrap_or(0); + // DB reconciliation: only check every 1000 messages, not on every request. + // This handles the rare cross-server handoff case (Redis restart wipe). + if seq % 1000 == 0 { + use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage}; + use sea_orm::EntityTrait; + let db_seq: Option>> = RoomMessage::find() + .filter(RmCol::Room.eq(room_id)) + .select_only() + .column_as(RmCol::Seq.max(), "max_seq") + .into_tuple::>>() + .one(db) + .await? + .map(|r| r); + let db_seq = db_seq.flatten().flatten().unwrap_or(0); - if db_seq >= seq { - // Another server handled this room while we were idle — catch up. - let _: String = redis::cmd("SET") - .arg(&seq_key) - .arg(db_seq + 1) - .query_async(&mut conn) - .await - .map_err(|e| RoomError::Internal(format!("SET seq: {}", e)))?; - return Ok(db_seq + 1); + if db_seq >= seq { + let _: String = redis::cmd("SET") + .arg(&seq_key) + .arg(db_seq + 1) + .query_async(&mut conn) + .await + .map_err(|e| RoomError::Internal(format!("seq SET: {}", e)))?; + return Ok(db_seq + 1); + } } Ok(seq) diff --git a/libs/session_manager/src/storage.rs b/libs/session_manager/src/storage.rs index ecb4b33..c58909e 100644 --- a/libs/session_manager/src/storage.rs +++ b/libs/session_manager/src/storage.rs @@ -236,15 +236,28 @@ impl SessionStorage { .await .map_err(Self::to_err)?; - let mut sessions = Vec::new(); - for id_str in &session_ids { - if let Ok(sid) = Uuid::parse_str(id_str) { - if let Ok(Some(session)) = self.get_session(&sid).await { - sessions.push(session); - } - } + if session_ids.is_empty() { + return Ok(Vec::new()); } + // Batch fetch all sessions in a single MGET instead of N individual GET calls. + let keys: Vec = session_ids + .iter() + .map(|id| format!("{}{}", KEY_CONN, id)) + .collect(); + + let values: Vec> = redis::cmd("MGET") + .arg(&keys) + .query_async(&mut conn) + .await + .map_err(Self::to_err)?; + + let sessions: Vec = values + .into_iter() + .flatten() + .filter_map(|v| serde_json::from_str(&v).ok()) + .collect(); + Ok(sessions) } @@ -261,15 +274,28 @@ impl SessionStorage { .await .map_err(Self::to_err)?; - let mut sessions = Vec::new(); - for id_str in &session_ids { - if let Ok(sid) = Uuid::parse_str(id_str) { - if let Ok(Some(session)) = self.get_session(&sid).await { - sessions.push(session); - } - } + if session_ids.is_empty() { + return Ok(Vec::new()); } + // Batch fetch all sessions in a single MGET instead of N individual GET calls. + let keys: Vec = session_ids + .iter() + .map(|id| format!("{}{}", KEY_CONN, id)) + .collect(); + + let values: Vec> = redis::cmd("MGET") + .arg(&keys) + .query_async(&mut conn) + .await + .map_err(Self::to_err)?; + + let sessions: Vec = values + .into_iter() + .flatten() + .filter_map(|v| serde_json::from_str(&v).ok()) + .collect(); + Ok(sessions) }