86 lines
3.5 KiB
Rust
86 lines
3.5 KiB
Rust
use std::sync::Arc;
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
use models::RoomId;
|
|
use room::types::NotificationEvent;
|
|
|
|
use super::dispatch::EventDispatcher;
|
|
use super::session::TransportSession;
|
|
use super::types::WsOutEvent;
|
|
|
|
/// Poll all active room subscriptions and yield the next available push event.
|
|
/// AI stream chunks are handled specially:
|
|
/// - seq=0 (first chunk) → MessageStreamStart (WS only sends message_id)
|
|
/// - done=true (final chunk) → MessageStreamDone (WS notifies SSE ended)
|
|
/// - Other chunks → skipped (delivered via SSE endpoint)
|
|
/// Poll all active room subscriptions and yield the next available push event.
|
|
/// reuses persistent receivers to ensure no messages are lost and avoid subscription storms.
|
|
pub async fn poll_subscriptions(session: &TransportSession) -> Option<WsOutEvent> {
|
|
// 1. Collect a list of room IDs to poll.
|
|
let room_ids: Vec<RoomId> = session.subscriptions.iter().map(|r| *r.key()).collect();
|
|
|
|
if room_ids.is_empty() {
|
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
|
return None;
|
|
}
|
|
|
|
// 2. Iterate and check for pending messages using non-blocking try_recv.
|
|
for room_id in room_ids {
|
|
if let Some(sub) = session.subscriptions.get(&room_id) {
|
|
// Check Message Events
|
|
if let Ok(mut rx) = sub.msg_rx.try_lock() {
|
|
if let Ok(event) = rx.try_recv() {
|
|
if let Some(reactions) = event.reactions.clone() {
|
|
return Some(EventDispatcher::dispatch_reactions(
|
|
room_id,
|
|
event.message_id.unwrap_or(event.id),
|
|
&reactions,
|
|
));
|
|
}
|
|
return Some(EventDispatcher::dispatch_message(&event));
|
|
}
|
|
}
|
|
|
|
// Check Stream Events (AI typing - Start/Done)
|
|
if let Ok(mut rx) = sub.stream_rx.try_lock() {
|
|
if let Ok(chunk) = rx.try_recv() {
|
|
if chunk.seq == 0 && !chunk.done {
|
|
return Some(EventDispatcher::dispatch_stream_start(&chunk));
|
|
}
|
|
if chunk.done {
|
|
return Some(EventDispatcher::dispatch_stream_done(&chunk));
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check Typing Events
|
|
if let Ok(mut rx) = sub.typing_rx.try_lock() {
|
|
if let Ok(event) = rx.try_recv() {
|
|
return Some(EventDispatcher::dispatch_typing(&event));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 3. Prevent Busy Loop: Sleep briefly if no work was done in this iteration.
|
|
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
|
None
|
|
}
|
|
|
|
/// Poll user-level notification stream with a 100ms await window.
|
|
/// Uses `recv()` (not `try_recv()`) so the `tokio::select!` loop can poll
|
|
/// other futures (WS message stream, heartbeat, etc.) while waiting.
|
|
pub async fn poll_notifications(
|
|
notif_rx: &mut broadcast::Receiver<Arc<NotificationEvent>>,
|
|
) -> Option<WsOutEvent> {
|
|
match tokio::time::timeout(std::time::Duration::from_millis(100), notif_rx.recv()).await {
|
|
Ok(Ok(event)) => Some(EventDispatcher::dispatch_notification(&event)),
|
|
Ok(Err(broadcast::error::RecvError::Lagged(n))) => {
|
|
tracing::warn!(skipped = n, "notification channel lagged");
|
|
None
|
|
}
|
|
Ok(Err(broadcast::error::RecvError::Closed)) => None,
|
|
Err(_elapsed) => None,
|
|
}
|
|
} |