diff --git a/libs/api/room/ws_universal.rs b/libs/api/room/ws_universal.rs index b7712e5..00504b2 100644 --- a/libs/api/room/ws_universal.rs +++ b/libs/api/room/ws_universal.rs @@ -276,6 +276,8 @@ pub async fn ws_universal( "content": chunk.content, "done": chunk.done, "error": chunk.error, + "display_name": chunk.display_name, + "chunk_type": chunk.chunk_type, }, }); if session.text(payload.to_string()).await.is_err() { @@ -292,6 +294,7 @@ pub async fn ws_universal( "username": event.username, "avatar_url": event.avatar_url, "action": event.action, + "sender_type": event.sender_type.as_deref().unwrap_or("user"), }, }); if session.text(payload.to_string()).await.is_err() { diff --git a/libs/queue/types.rs b/libs/queue/types.rs index f8f4029..eaeda8d 100644 --- a/libs/queue/types.rs +++ b/libs/queue/types.rs @@ -54,6 +54,9 @@ pub struct TypingEvent { pub avatar_url: Option, /// "start" or "stop" pub action: String, + /// Sender type: "user" or "ai". Defaults to "user" if absent. + #[serde(skip_serializing_if = "Option::is_none")] + pub sender_type: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/libs/room/src/connection.rs b/libs/room/src/connection.rs index d0991d7..7c6ef19 100644 --- a/libs/room/src/connection.rs +++ b/libs/room/src/connection.rs @@ -634,24 +634,30 @@ impl RoomConnectionManager { room_id: Uuid, ) -> broadcast::Receiver> { let mut map: tokio::sync::RwLockWriteGuard<'_, std::collections::HashMap>>> = self.typing_inner.write().await; - if let Some(tx) = map.get(&room_id) { - return tx.subscribe(); + let tx = map.entry(room_id).or_insert_with(|| { + let (tx, _) = broadcast::channel(BROADCAST_CAPACITY); + tx + }); + // Replay active typing state from Redis to the new subscriber. + // This ensures newly connected WS clients see who is currently typing. + let active_events = self.get_active_typing_events(room_id).await; + for event in active_events { + let _ = tx.send(Arc::new(event)); } - let (tx, rx) = broadcast::channel(BROADCAST_CAPACITY); - map.insert(room_id, tx); - rx + tx.subscribe() } - /// Broadcast a typing event and persist it to Redis with 10s TTL. - /// - "start": writes key with 10s expiry, broadcasts start event + /// Broadcast a typing event and persist it to Redis with 60s TTL. + /// - "start": writes key with 60s expiry, broadcasts start event /// - "stop": deletes key, broadcasts stop event pub async fn broadcast_typing(&self, room_id: Uuid, event: TypingEvent) { let user_key = format!("typing:{}:{}", room_id, event.user_id); let action = event.action.clone(); let username = event.username.clone(); let avatar_url = event.avatar_url.clone(); + let sender_type = event.sender_type.clone().unwrap_or_else(|| "user".to_string()); - // Write/delete Redis key for 10s expiry (non-blocking) + // Write/delete Redis key for 60s expiry (non-blocking) if let Ok(mut conn) = self.cache.conn().await { let key = user_key; tokio::spawn(async move { @@ -659,11 +665,12 @@ impl RoomConnectionManager { let value = serde_json::json!({ "username": username, "avatar_url": avatar_url, + "sender_type": sender_type, }) .to_string(); let _: Result<(), _> = redis::cmd("SETEX") .arg(&key) - .arg(10i64) + .arg(60i64) .arg(&value) .query_async(&mut conn) .await; @@ -679,6 +686,43 @@ impl RoomConnectionManager { let _ = tx.send(event); } } + + /// Load all active typing entries for a room from Redis and return as TypingEvents. + /// Used to replay current typing state to newly connected WS clients. + pub async fn get_active_typing_events(&self, room_id: Uuid) -> Vec { + let pattern = format!("typing:{}:*", room_id); + if let Ok(mut conn) = self.cache.conn().await { + let keys: Vec = match redis::cmd("KEYS").arg(&pattern).query_async(&mut conn).await { + Ok(k) => k, + Err(_) => return vec![], + }; + if keys.is_empty() { + return vec![]; + } + let mut results = Vec::new(); + for key in keys { + let parts: Vec<&str> = key.split(':').collect(); + let user_id = parts.get(2).and_then(|s| Uuid::parse_str(s).ok()); + if let (Some(Ok(value)), Some(Ok(user_uuid)))) = ( + redis::cmd("GET").arg(&key).query_async::(&mut conn).await.ok(), + user_id, + ) { + if let Ok(parsed) = serde_json::from_str::(&value) { + results.push(TypingEvent { + room_id, + user_id: user_uuid, + username: parsed.get("username").and_then(|v| v.as_str()).unwrap_or("").to_string(), + avatar_url: parsed.get("avatar_url").and_then(|v| v.as_str()).map(String::from), + action: "start".to_string(), + sender_type: parsed.get("sender_type").and_then(|v| v.as_str()).map(String::from), + }); + } + } + } + return results; + } + vec![] + } } fn parse_sender_type(s: &str) -> MessageSenderType { diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index f87894c..7247052 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -1030,6 +1030,19 @@ impl RoomService { .register_stream_channel(streaming_msg_id) .await; + // Emit an initial "thinking" chunk immediately so the frontend shows the + // "AI is thinking..." indicator without waiting for the first real token. + let initial_event = RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + content: String::new(), + done: false, + error: None, + display_name: Some(request.model.name.clone()), + chunk_type: Some("thinking".to_string()), + }; + self.room_manager.broadcast_stream_chunk(initial_event).await; + let room_manager = self.room_manager.clone(); let db = self.db.clone(); let room_id_inner = room_id; @@ -1046,6 +1059,8 @@ impl RoomService { let room_manager = room_manager.clone(); let db = db.clone(); let model_id = model_id; + // Fixed UUID to identify AI typing events across WS reconnections. + let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); // Clone before closure so closure captures clone, not the original. let ai_display_name_for_chunk = ai_display_name.clone(); let ai_display_name_for_final = ai_display_name.clone(); @@ -1088,6 +1103,17 @@ impl RoomService { let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk); + // Broadcast AI typing.start so WS clients (including reconnections) see the indicator. + let typing_start = queue::TypingEvent { + room_id: room_id_inner, + user_id: ai_typing_id, + username: ai_display_name.clone(), + avatar_url: None, + action: "start".to_string(), + sender_type: Some("ai".to_string()), + }; + room_manager.broadcast_typing(room_id_inner, typing_start).await; + match chat_service.process_stream(request, stream_callback).await { Ok(full_content) => { let envelope = RoomMessageEnvelope { @@ -1142,6 +1168,17 @@ impl RoomService { room_manager.broadcast(room_id_inner, msg_event).await; room_manager.metrics.messages_sent.increment(1); + // Stop AI typing indicator now that the message is delivered. + let typing_stop = queue::TypingEvent { + room_id: room_id_inner, + user_id: ai_typing_id, + username: ai_display_name_for_final.clone(), + avatar_url: None, + action: "stop".to_string(), + sender_type: Some("ai".to_string()), + }; + room_manager.broadcast_typing(room_id_inner, typing_stop).await; + let event = queue::ProjectRoomEvent { event_type: super::RoomEventType::NewMessage.as_str().into(), project_id: project_id_inner, @@ -1158,6 +1195,17 @@ impl RoomService { } Err(e) => { tracing::error!(error = %e, "AI streaming failed"); + // Stop AI typing indicator since the stream failed. + let typing_stop = queue::TypingEvent { + room_id: room_id_inner, + user_id: ai_typing_id, + username: ai_display_name.clone(), + avatar_url: None, + action: "stop".to_string(), + sender_type: Some("ai".to_string()), + }; + room_manager.broadcast_typing(room_id_inner, typing_stop).await; + let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id: room_id_inner,