diff --git a/libs/api/room/ws_types.rs b/libs/api/room/ws_types.rs index 885be30..80dbad1 100644 --- a/libs/api/room/ws_types.rs +++ b/libs/api/room/ws_types.rs @@ -114,6 +114,10 @@ pub enum WsAction { SubscribeProject, #[serde(rename = "project.unsubscribe")] UnsubscribeProject, + #[serde(rename = "typing.start")] + TypingStart, + #[serde(rename = "typing.stop")] + TypingStop, } impl std::fmt::Display for WsAction { @@ -164,6 +168,8 @@ impl std::fmt::Display for WsAction { WsAction::UnsubscribeRoom => write!(f, "room.unsubscribe"), WsAction::SubscribeProject => write!(f, "project.subscribe"), WsAction::UnsubscribeProject => write!(f, "project.unsubscribe"), + WsAction::TypingStart => write!(f, "typing.start"), + WsAction::TypingStop => write!(f, "typing.stop"), } } } @@ -211,6 +217,8 @@ pub struct WsRequestParams { pub min_score: Option, pub query: Option, pub attachment_ids: Option>, + /// Typing event: "start" or "stop" + pub typing: Option, } #[derive(Debug, Clone, Serialize)] diff --git a/libs/api/room/ws_universal.rs b/libs/api/room/ws_universal.rs index bc8ffc6..4650ab4 100644 --- a/libs/api/room/ws_universal.rs +++ b/libs/api/room/ws_universal.rs @@ -9,7 +9,7 @@ use tokio_stream::wrappers::BroadcastStream; use uuid::Uuid; use crate::error::ApiError; -use queue::{ReactionGroup, RoomMessageEvent, RoomMessageStreamChunkEvent}; +use queue::{ReactionGroup, RoomMessageEvent, RoomMessageStreamChunkEvent, TypingEvent}; use room::connection::RoomConnectionManager; use service::AppService; @@ -40,6 +40,10 @@ pub enum WsPushEvent { room_id: Uuid, chunk: Arc, }, + TypingIndicator { + room_id: Uuid, + event: Arc, + }, } /// Maps room_id -> (room_message_broadcast_stream, stream_chunk_broadcast_stream) @@ -48,6 +52,7 @@ type PushStreams = HashMap< ( BroadcastStream>, BroadcastStream>, + BroadcastStream>, ), >; @@ -245,6 +250,22 @@ pub async fn ws_universal( break; } } + Some(WsPushEvent::TypingIndicator { room_id, event }) => { + let payload = serde_json::json!({ + "type": "event", + "event": "room.typing", + "room_id": room_id, + "data": { + "user_id": event.user_id, + "username": event.username, + "avatar_url": event.avatar_url, + "action": event.action, + }, + }); + if session.text(payload.to_string()).await.is_err() { + break; + } + } None => { } } @@ -307,9 +328,11 @@ pub async fn ws_universal( match manager.subscribe(room_id, user_id).await { Ok(rx) => { let stream_rx = manager.subscribe_room_stream(room_id).await; + let typing_rx = manager.subscribe_typing(room_id).await; push_streams.insert(room_id, ( BroadcastStream::new(rx), BroadcastStream::new(stream_rx), + BroadcastStream::new(typing_rx), )); let _ = session.text(serde_json::to_string(&WsResponse::success( request.request_id, &action_str, @@ -338,6 +361,24 @@ pub async fn ws_universal( request.request_id, &action_str, WsResponseData::bool(true) )).unwrap_or_default()).await; } + WsAction::TypingStart | WsAction::TypingStop => { + if let (Some(room_id), Some(action)) = + (request.params().room_id, request.params().typing.as_deref()) + { + let names = handler.service().room.get_user_names(&[user_id]).await; + let typing_event = TypingEvent { + room_id, + user_id, + username: names.into_values().next().unwrap_or_else(|| "unknown".to_string()), + avatar_url: None, + action: action.to_string(), + }; + manager.broadcast_typing(room_id, typing_event).await; + } + let _ = session.text(serde_json::to_string(&WsResponse::success( + request.request_id, &action_str, WsResponseData::bool(true) + )).unwrap_or_default()).await; + } _ => { let resp = handler.handle(request).await; let _ = session.text(serde_json::to_string(&resp).unwrap_or_default()).await; @@ -383,7 +424,7 @@ async fn poll_push_streams( let mut dead_rooms: Vec = Vec::new(); for room_id in room_ids { - if let Some((msg_stream, chunk_stream)) = streams.get_mut(&room_id) { + if let Some((msg_stream, chunk_stream, typing_stream)) = streams.get_mut(&room_id) { tokio::select! { result = msg_stream.next() => { match result { @@ -412,6 +453,16 @@ async fn poll_push_streams( } } } + result = typing_stream.next() => { + match result { + Some(Ok(event)) => { + return Some(WsPushEvent::TypingIndicator { room_id, event }); + } + Some(Err(_)) | None => { + // Typing channel going dead is non-fatal — typing is ephemeral + } + } + } } } } @@ -424,9 +475,11 @@ async fn poll_push_streams( if service.room.check_room_access(room_id, user_id).await.is_ok() { if let Ok(rx) = manager.subscribe(room_id, user_id).await { let stream_rx = manager.subscribe_room_stream(room_id).await; + let typing_rx = manager.subscribe_typing(room_id).await; streams.insert(room_id, ( BroadcastStream::new(rx), BroadcastStream::new(stream_rx), + BroadcastStream::new(typing_rx), )); } } diff --git a/libs/room/src/connection.rs b/libs/room/src/connection.rs index 3b44211..d0991d7 100644 --- a/libs/room/src/connection.rs +++ b/libs/room/src/connection.rs @@ -7,8 +7,10 @@ use std::time::{Duration, Instant}; use tokio::sync::{RwLock, broadcast}; use uuid::Uuid; +use db::cache::AppCache; use db::database::AppDatabase; use models::rooms::{MessageContentType, MessageSenderType, room_message}; +use queue::types::TypingEvent; use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent}; use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set}; @@ -33,6 +35,7 @@ pub struct RoomConnectionManager { /// Broadcast channel for agent task events per project. task_inner: RwLock>>>, pub metrics: Arc, + cache: AppCache, connection_rate: RwLock>, shutdown_tx: broadcast::Sender<()>, room_shutdown_txs: RwLock>>, @@ -40,6 +43,7 @@ pub struct RoomConnectionManager { user_shutdown_txs: RwLock>>, stream_inner: RwLock>>>, room_stream_inner: RwLock>>>, + typing_inner: RwLock>>>, room_last_activity: RwLock>, room_subscriber_count: RwLock>, project_subscriber_count: RwLock>, @@ -47,7 +51,7 @@ pub struct RoomConnectionManager { } impl RoomConnectionManager { - pub fn new(metrics: Arc) -> Self { + pub fn new(metrics: Arc, cache: AppCache) -> Self { let (shutdown_tx, _) = broadcast::channel(SHUTDOWN_CHANNEL_CAPACITY); Self { #[allow(clippy::default_constructed_unit_structs)] @@ -61,6 +65,7 @@ impl RoomConnectionManager { #[allow(clippy::default_constructed_unit_structs)] task_inner: RwLock::new(HashMap::new()), metrics, + cache, #[allow(clippy::default_constructed_unit_structs)] connection_rate: RwLock::new(HashMap::new()), shutdown_tx, @@ -75,6 +80,8 @@ impl RoomConnectionManager { #[allow(clippy::default_constructed_unit_structs)] room_stream_inner: RwLock::new(HashMap::new()), #[allow(clippy::default_constructed_unit_structs)] + typing_inner: RwLock::new(HashMap::new()), + #[allow(clippy::default_constructed_unit_structs)] room_last_activity: RwLock::new(HashMap::new()), #[allow(clippy::default_constructed_unit_structs)] room_subscriber_count: RwLock::new(HashMap::new()), @@ -621,6 +628,57 @@ impl RoomConnectionManager { let mut map = self.stream_inner.write().await; map.remove(&message_id); } + + pub async fn subscribe_typing( + &self, + room_id: Uuid, + ) -> broadcast::Receiver> { + let mut map: tokio::sync::RwLockWriteGuard<'_, std::collections::HashMap>>> = self.typing_inner.write().await; + if let Some(tx) = map.get(&room_id) { + return tx.subscribe(); + } + let (tx, rx) = broadcast::channel(BROADCAST_CAPACITY); + map.insert(room_id, tx); + rx + } + + /// Broadcast a typing event and persist it to Redis with 10s TTL. + /// - "start": writes key with 10s expiry, broadcasts start event + /// - "stop": deletes key, broadcasts stop event + pub async fn broadcast_typing(&self, room_id: Uuid, event: TypingEvent) { + let user_key = format!("typing:{}:{}", room_id, event.user_id); + let action = event.action.clone(); + let username = event.username.clone(); + let avatar_url = event.avatar_url.clone(); + + // Write/delete Redis key for 10s expiry (non-blocking) + if let Ok(mut conn) = self.cache.conn().await { + let key = user_key; + tokio::spawn(async move { + if action == "start" { + let value = serde_json::json!({ + "username": username, + "avatar_url": avatar_url, + }) + .to_string(); + let _: Result<(), _> = redis::cmd("SETEX") + .arg(&key) + .arg(10i64) + .arg(&value) + .query_async(&mut conn) + .await; + } else { + let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await; + } + }); + } + + let map: tokio::sync::RwLockReadGuard<'_, std::collections::HashMap>>> = self.typing_inner.read().await; + if let Some(tx) = map.get(&room_id) { + let event = Arc::new(event); + let _ = tx.send(event); + } + } } fn parse_sender_type(s: &str) -> MessageSenderType { @@ -738,15 +796,24 @@ pub fn make_persist_fn( .exec(&db) .await?; - // Update content_tsv for inserted messages - for env in chunk.iter() { - let update_sql = format!( - "UPDATE room_message SET content_tsv = to_tsvector('simple', content) WHERE id = '{}'", - env.id + // Batch update content_tsv using a single UPDATE with subquery + // instead of N individual UPDATE statements (N=chunk size, up to 100) + let ids: Vec = chunk + .iter() + .filter(|e| !existing_ids.contains(&e.id)) + .map(|e| format!("'{}'", e.id)) + .collect(); + + if !ids.is_empty() { + let batch_sql = format!( + "UPDATE room_message AS t \ + SET content_tsv = to_tsvector('simple', content) \ + WHERE t.id IN ({})", + ids.join(",") ); let stmt = sea_orm::Statement::from_sql_and_values( sea_orm::DbBackend::Postgres, - &update_sql, + &batch_sql, vec![], ); let _ = db.execute_raw(stmt).await; diff --git a/libs/service/lib.rs b/libs/service/lib.rs index 99841f8..1e428b8 100644 --- a/libs/service/lib.rs +++ b/libs/service/lib.rs @@ -154,6 +154,7 @@ impl AppService { let room_metrics = Arc::new(RoomMetrics::default()); let room_manager = Arc::new(room::connection::RoomConnectionManager::new( room_metrics.clone(), + cache.clone(), )); let redis_url = config