diff --git a/libs/api/room/ws_universal.rs b/libs/api/room/ws_universal.rs index 64be695..466576f 100644 --- a/libs/api/room/ws_universal.rs +++ b/libs/api/room/ws_universal.rs @@ -445,6 +445,7 @@ pub async fn ws_universal( for room_id in push_streams.keys() { manager.unsubscribe(*room_id, user_id).await; } + manager.unsubscribe_user_notification(user_id).await; manager.metrics.ws_connections_active.decrement(1.0); manager.metrics.ws_disconnections_total.increment(1); }); diff --git a/libs/room/src/service/workers.rs b/libs/room/src/service/workers.rs index af7e661..ddaba2b 100644 --- a/libs/room/src/service/workers.rs +++ b/libs/room/src/service/workers.rs @@ -270,19 +270,21 @@ pub fn spawn_room_workers( ); let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = extract_get_redis(queue.clone()); - let manager = room_manager.clone(); - let redis_url_clone = redis_url.clone(); - let semaphore = worker_semaphore.clone(); - + let manager1 = room_manager.clone(); let manager2 = room_manager.clone(); + let manager3 = room_manager.clone(); + let redis_url_clone = redis_url.clone(); let redis_url3 = redis_url.clone(); + let semaphore = worker_semaphore.clone(); tokio::spawn(async move { let _permit = match semaphore.acquire_owned().await { Ok(p) => p, Err(_) => return, }; - let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); + // Use the manager's room shutdown channel so the worker terminates + // when the room is deleted (shutdown_room sends on room_shutdown_txs). + let shutdown_rx = manager1.register_room(room_id).await; queue::room_worker_task( room_id, uuid::Uuid::new_v4().to_string(), @@ -291,14 +293,13 @@ pub fn spawn_room_workers( shutdown_rx, ) .await; - let _ = shutdown_tx.send(()); }); tokio::spawn(async move { - let shutdown_rx = manager.register_room(room_id).await; + let shutdown_rx = manager2.register_room(room_id).await; crate::connection::subscribe_room_events( redis_url_clone, - manager.clone(), + manager2, room_id, shutdown_rx, ) @@ -317,10 +318,10 @@ pub fn spawn_room_workers( None => return, } }; - let shutdown_rx = manager2.register_project(project_id).await; + let shutdown_rx = manager3.register_project(project_id).await; crate::connection::subscribe_project_room_events( redis_url3, - manager2, + manager3, project_id, shutdown_rx, )