From 0e53f4a69fdb4dc374dd2a076d2befdc5ff84528 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Sun, 26 Apr 2026 16:52:20 +0800 Subject: [PATCH] fix(room): fix two major memory leaks 1. WS disconnect now unsubscribes from user_notification_inner. Previously, every WebSocket connection created a broadcast channel for user notifications that was never removed on disconnect, causing unbounded growth proportional to unique connected users over time. 2. Room worker tasks now use the manager's room_shutdown_txs channel instead of a local broadcast channel. shutdown_room() sends on this channel, so when a room is deleted the worker task receives the signal and terminates, releasing its DashMap (capacity 10,000) and all captured closures. Previously the worker ran forever. --- libs/api/room/ws_universal.rs | 1 + libs/room/src/service/workers.rs | 21 +++++++++++---------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/libs/api/room/ws_universal.rs b/libs/api/room/ws_universal.rs index 64be695..466576f 100644 --- a/libs/api/room/ws_universal.rs +++ b/libs/api/room/ws_universal.rs @@ -445,6 +445,7 @@ pub async fn ws_universal( for room_id in push_streams.keys() { manager.unsubscribe(*room_id, user_id).await; } + manager.unsubscribe_user_notification(user_id).await; manager.metrics.ws_connections_active.decrement(1.0); manager.metrics.ws_disconnections_total.increment(1); }); diff --git a/libs/room/src/service/workers.rs b/libs/room/src/service/workers.rs index af7e661..ddaba2b 100644 --- a/libs/room/src/service/workers.rs +++ b/libs/room/src/service/workers.rs @@ -270,19 +270,21 @@ pub fn spawn_room_workers( ); let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = extract_get_redis(queue.clone()); - let manager = room_manager.clone(); - let redis_url_clone = redis_url.clone(); - let semaphore = worker_semaphore.clone(); - + let manager1 = room_manager.clone(); let manager2 = room_manager.clone(); + let manager3 = room_manager.clone(); + let redis_url_clone = redis_url.clone(); let redis_url3 = redis_url.clone(); + let semaphore = worker_semaphore.clone(); tokio::spawn(async move { let _permit = match semaphore.acquire_owned().await { Ok(p) => p, Err(_) => return, }; - let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); + // Use the manager's room shutdown channel so the worker terminates + // when the room is deleted (shutdown_room sends on room_shutdown_txs). + let shutdown_rx = manager1.register_room(room_id).await; queue::room_worker_task( room_id, uuid::Uuid::new_v4().to_string(), @@ -291,14 +293,13 @@ pub fn spawn_room_workers( shutdown_rx, ) .await; - let _ = shutdown_tx.send(()); }); tokio::spawn(async move { - let shutdown_rx = manager.register_room(room_id).await; + let shutdown_rx = manager2.register_room(room_id).await; crate::connection::subscribe_room_events( redis_url_clone, - manager.clone(), + manager2, room_id, shutdown_rx, ) @@ -317,10 +318,10 @@ pub fn spawn_room_workers( None => return, } }; - let shutdown_rx = manager2.register_project(project_id).await; + let shutdown_rx = manager3.register_project(project_id).await; crate::connection::subscribe_project_room_events( redis_url3, - manager2, + manager3, project_id, shutdown_rx, )