Compare commits
No commits in common. "0e53f4a69fdb4dc374dd2a076d2befdc5ff84528" and "7d7103e271e6fc3c1be8380fc0df220c03300de8" have entirely different histories.
0e53f4a69f
...
7d7103e271
@ -17,3 +17,8 @@ slog = { workspace = true }
|
|||||||
anyhow = { workspace = true }
|
anyhow = { workspace = true }
|
||||||
env_logger = { workspace = true }
|
env_logger = { workspace = true }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
strip = true
|
||||||
|
lto = "thin"
|
||||||
|
opt-level = 3
|
||||||
|
|||||||
@ -445,7 +445,6 @@ pub async fn ws_universal(
|
|||||||
for room_id in push_streams.keys() {
|
for room_id in push_streams.keys() {
|
||||||
manager.unsubscribe(*room_id, user_id).await;
|
manager.unsubscribe(*room_id, user_id).await;
|
||||||
}
|
}
|
||||||
manager.unsubscribe_user_notification(user_id).await;
|
|
||||||
manager.metrics.ws_connections_active.decrement(1.0);
|
manager.metrics.ws_connections_active.decrement(1.0);
|
||||||
manager.metrics.ws_disconnections_total.increment(1);
|
manager.metrics.ws_disconnections_total.increment(1);
|
||||||
});
|
});
|
||||||
|
|||||||
@ -270,21 +270,19 @@ pub fn spawn_room_workers(
|
|||||||
);
|
);
|
||||||
let get_redis: Arc<dyn Fn() -> queue::worker::RedisFuture + Send + Sync> =
|
let get_redis: Arc<dyn Fn() -> queue::worker::RedisFuture + Send + Sync> =
|
||||||
extract_get_redis(queue.clone());
|
extract_get_redis(queue.clone());
|
||||||
let manager1 = room_manager.clone();
|
let manager = room_manager.clone();
|
||||||
let manager2 = room_manager.clone();
|
|
||||||
let manager3 = room_manager.clone();
|
|
||||||
let redis_url_clone = redis_url.clone();
|
let redis_url_clone = redis_url.clone();
|
||||||
let redis_url3 = redis_url.clone();
|
|
||||||
let semaphore = worker_semaphore.clone();
|
let semaphore = worker_semaphore.clone();
|
||||||
|
|
||||||
|
let manager2 = room_manager.clone();
|
||||||
|
let redis_url3 = redis_url.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _permit = match semaphore.acquire_owned().await {
|
let _permit = match semaphore.acquire_owned().await {
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
};
|
};
|
||||||
// Use the manager's room shutdown channel so the worker terminates
|
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
|
||||||
// when the room is deleted (shutdown_room sends on room_shutdown_txs).
|
|
||||||
let shutdown_rx = manager1.register_room(room_id).await;
|
|
||||||
queue::room_worker_task(
|
queue::room_worker_task(
|
||||||
room_id,
|
room_id,
|
||||||
uuid::Uuid::new_v4().to_string(),
|
uuid::Uuid::new_v4().to_string(),
|
||||||
@ -293,13 +291,14 @@ pub fn spawn_room_workers(
|
|||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
let _ = shutdown_tx.send(());
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let shutdown_rx = manager2.register_room(room_id).await;
|
let shutdown_rx = manager.register_room(room_id).await;
|
||||||
crate::connection::subscribe_room_events(
|
crate::connection::subscribe_room_events(
|
||||||
redis_url_clone,
|
redis_url_clone,
|
||||||
manager2,
|
manager.clone(),
|
||||||
room_id,
|
room_id,
|
||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
)
|
)
|
||||||
@ -318,10 +317,10 @@ pub fn spawn_room_workers(
|
|||||||
None => return,
|
None => return,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let shutdown_rx = manager3.register_project(project_id).await;
|
let shutdown_rx = manager2.register_project(project_id).await;
|
||||||
crate::connection::subscribe_project_room_events(
|
crate::connection::subscribe_project_room_events(
|
||||||
redis_url3,
|
redis_url3,
|
||||||
manager3,
|
manager2,
|
||||||
project_id,
|
project_id,
|
||||||
shutdown_rx,
|
shutdown_rx,
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user