feat(room): implement typing indicator broadcast with Redis 10s TTL

RoomConnectionManager now holds a cache field and typing_inner broadcast
map. broadcast_typing() persists start/stop to Redis (SETEX 10s / DEL)
and broadcasts via tokio channel. ws_universal.rs handles TypingStart/
TypingStop actions and streams typing events to WS clients.
This commit is contained in:
ZhenYi 2026-04-24 00:04:09 +08:00
parent e83512382f
commit fb28fdd056
4 changed files with 138 additions and 9 deletions

View File

@ -114,6 +114,10 @@ pub enum WsAction {
SubscribeProject, SubscribeProject,
#[serde(rename = "project.unsubscribe")] #[serde(rename = "project.unsubscribe")]
UnsubscribeProject, UnsubscribeProject,
#[serde(rename = "typing.start")]
TypingStart,
#[serde(rename = "typing.stop")]
TypingStop,
} }
impl std::fmt::Display for WsAction { impl std::fmt::Display for WsAction {
@ -164,6 +168,8 @@ impl std::fmt::Display for WsAction {
WsAction::UnsubscribeRoom => write!(f, "room.unsubscribe"), WsAction::UnsubscribeRoom => write!(f, "room.unsubscribe"),
WsAction::SubscribeProject => write!(f, "project.subscribe"), WsAction::SubscribeProject => write!(f, "project.subscribe"),
WsAction::UnsubscribeProject => write!(f, "project.unsubscribe"), WsAction::UnsubscribeProject => write!(f, "project.unsubscribe"),
WsAction::TypingStart => write!(f, "typing.start"),
WsAction::TypingStop => write!(f, "typing.stop"),
} }
} }
} }
@ -211,6 +217,8 @@ pub struct WsRequestParams {
pub min_score: Option<f32>, pub min_score: Option<f32>,
pub query: Option<String>, pub query: Option<String>,
pub attachment_ids: Option<Vec<Uuid>>, pub attachment_ids: Option<Vec<Uuid>>,
/// Typing event: "start" or "stop"
pub typing: Option<String>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]

View File

@ -9,7 +9,7 @@ use tokio_stream::wrappers::BroadcastStream;
use uuid::Uuid; use uuid::Uuid;
use crate::error::ApiError; use crate::error::ApiError;
use queue::{ReactionGroup, RoomMessageEvent, RoomMessageStreamChunkEvent}; use queue::{ReactionGroup, RoomMessageEvent, RoomMessageStreamChunkEvent, TypingEvent};
use room::connection::RoomConnectionManager; use room::connection::RoomConnectionManager;
use service::AppService; use service::AppService;
@ -40,6 +40,10 @@ pub enum WsPushEvent {
room_id: Uuid, room_id: Uuid,
chunk: Arc<RoomMessageStreamChunkEvent>, chunk: Arc<RoomMessageStreamChunkEvent>,
}, },
TypingIndicator {
room_id: Uuid,
event: Arc<TypingEvent>,
},
} }
/// Maps room_id -> (room_message_broadcast_stream, stream_chunk_broadcast_stream) /// Maps room_id -> (room_message_broadcast_stream, stream_chunk_broadcast_stream)
@ -48,6 +52,7 @@ type PushStreams = HashMap<
( (
BroadcastStream<Arc<RoomMessageEvent>>, BroadcastStream<Arc<RoomMessageEvent>>,
BroadcastStream<Arc<RoomMessageStreamChunkEvent>>, BroadcastStream<Arc<RoomMessageStreamChunkEvent>>,
BroadcastStream<Arc<TypingEvent>>,
), ),
>; >;
@ -245,6 +250,22 @@ pub async fn ws_universal(
break; break;
} }
} }
Some(WsPushEvent::TypingIndicator { room_id, event }) => {
let payload = serde_json::json!({
"type": "event",
"event": "room.typing",
"room_id": room_id,
"data": {
"user_id": event.user_id,
"username": event.username,
"avatar_url": event.avatar_url,
"action": event.action,
},
});
if session.text(payload.to_string()).await.is_err() {
break;
}
}
None => { None => {
} }
} }
@ -307,9 +328,11 @@ pub async fn ws_universal(
match manager.subscribe(room_id, user_id).await { match manager.subscribe(room_id, user_id).await {
Ok(rx) => { Ok(rx) => {
let stream_rx = manager.subscribe_room_stream(room_id).await; let stream_rx = manager.subscribe_room_stream(room_id).await;
let typing_rx = manager.subscribe_typing(room_id).await;
push_streams.insert(room_id, ( push_streams.insert(room_id, (
BroadcastStream::new(rx), BroadcastStream::new(rx),
BroadcastStream::new(stream_rx), BroadcastStream::new(stream_rx),
BroadcastStream::new(typing_rx),
)); ));
let _ = session.text(serde_json::to_string(&WsResponse::success( let _ = session.text(serde_json::to_string(&WsResponse::success(
request.request_id, &action_str, request.request_id, &action_str,
@ -338,6 +361,24 @@ pub async fn ws_universal(
request.request_id, &action_str, WsResponseData::bool(true) request.request_id, &action_str, WsResponseData::bool(true)
)).unwrap_or_default()).await; )).unwrap_or_default()).await;
} }
WsAction::TypingStart | WsAction::TypingStop => {
if let (Some(room_id), Some(action)) =
(request.params().room_id, request.params().typing.as_deref())
{
let names = handler.service().room.get_user_names(&[user_id]).await;
let typing_event = TypingEvent {
room_id,
user_id,
username: names.into_values().next().unwrap_or_else(|| "unknown".to_string()),
avatar_url: None,
action: action.to_string(),
};
manager.broadcast_typing(room_id, typing_event).await;
}
let _ = session.text(serde_json::to_string(&WsResponse::success(
request.request_id, &action_str, WsResponseData::bool(true)
)).unwrap_or_default()).await;
}
_ => { _ => {
let resp = handler.handle(request).await; let resp = handler.handle(request).await;
let _ = session.text(serde_json::to_string(&resp).unwrap_or_default()).await; let _ = session.text(serde_json::to_string(&resp).unwrap_or_default()).await;
@ -383,7 +424,7 @@ async fn poll_push_streams(
let mut dead_rooms: Vec<Uuid> = Vec::new(); let mut dead_rooms: Vec<Uuid> = Vec::new();
for room_id in room_ids { for room_id in room_ids {
if let Some((msg_stream, chunk_stream)) = streams.get_mut(&room_id) { if let Some((msg_stream, chunk_stream, typing_stream)) = streams.get_mut(&room_id) {
tokio::select! { tokio::select! {
result = msg_stream.next() => { result = msg_stream.next() => {
match result { match result {
@ -412,6 +453,16 @@ async fn poll_push_streams(
} }
} }
} }
result = typing_stream.next() => {
match result {
Some(Ok(event)) => {
return Some(WsPushEvent::TypingIndicator { room_id, event });
}
Some(Err(_)) | None => {
// Typing channel going dead is non-fatal — typing is ephemeral
}
}
}
} }
} }
} }
@ -424,9 +475,11 @@ async fn poll_push_streams(
if service.room.check_room_access(room_id, user_id).await.is_ok() { if service.room.check_room_access(room_id, user_id).await.is_ok() {
if let Ok(rx) = manager.subscribe(room_id, user_id).await { if let Ok(rx) = manager.subscribe(room_id, user_id).await {
let stream_rx = manager.subscribe_room_stream(room_id).await; let stream_rx = manager.subscribe_room_stream(room_id).await;
let typing_rx = manager.subscribe_typing(room_id).await;
streams.insert(room_id, ( streams.insert(room_id, (
BroadcastStream::new(rx), BroadcastStream::new(rx),
BroadcastStream::new(stream_rx), BroadcastStream::new(stream_rx),
BroadcastStream::new(typing_rx),
)); ));
} }
} }

View File

@ -7,8 +7,10 @@ use std::time::{Duration, Instant};
use tokio::sync::{RwLock, broadcast}; use tokio::sync::{RwLock, broadcast};
use uuid::Uuid; use uuid::Uuid;
use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use models::rooms::{MessageContentType, MessageSenderType, room_message}; use models::rooms::{MessageContentType, MessageSenderType, room_message};
use queue::types::TypingEvent;
use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent}; use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent};
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set}; use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set};
@ -33,6 +35,7 @@ pub struct RoomConnectionManager {
/// Broadcast channel for agent task events per project. /// Broadcast channel for agent task events per project.
task_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<AgentTaskEvent>>>>, task_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<AgentTaskEvent>>>>,
pub metrics: Arc<RoomMetrics>, pub metrics: Arc<RoomMetrics>,
cache: AppCache,
connection_rate: RwLock<HashMap<(Uuid, Uuid), Instant>>, connection_rate: RwLock<HashMap<(Uuid, Uuid), Instant>>,
shutdown_tx: broadcast::Sender<()>, shutdown_tx: broadcast::Sender<()>,
room_shutdown_txs: RwLock<HashMap<Uuid, broadcast::Sender<()>>>, room_shutdown_txs: RwLock<HashMap<Uuid, broadcast::Sender<()>>>,
@ -40,6 +43,7 @@ pub struct RoomConnectionManager {
user_shutdown_txs: RwLock<HashMap<Uuid, broadcast::Sender<()>>>, user_shutdown_txs: RwLock<HashMap<Uuid, broadcast::Sender<()>>>,
stream_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>>>, stream_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>>>,
room_stream_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>>>, room_stream_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>>>,
typing_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>>,
room_last_activity: RwLock<HashMap<Uuid, Instant>>, room_last_activity: RwLock<HashMap<Uuid, Instant>>,
room_subscriber_count: RwLock<HashMap<Uuid, usize>>, room_subscriber_count: RwLock<HashMap<Uuid, usize>>,
project_subscriber_count: RwLock<HashMap<Uuid, usize>>, project_subscriber_count: RwLock<HashMap<Uuid, usize>>,
@ -47,7 +51,7 @@ pub struct RoomConnectionManager {
} }
impl RoomConnectionManager { impl RoomConnectionManager {
pub fn new(metrics: Arc<RoomMetrics>) -> Self { pub fn new(metrics: Arc<RoomMetrics>, cache: AppCache) -> Self {
let (shutdown_tx, _) = broadcast::channel(SHUTDOWN_CHANNEL_CAPACITY); let (shutdown_tx, _) = broadcast::channel(SHUTDOWN_CHANNEL_CAPACITY);
Self { Self {
#[allow(clippy::default_constructed_unit_structs)] #[allow(clippy::default_constructed_unit_structs)]
@ -61,6 +65,7 @@ impl RoomConnectionManager {
#[allow(clippy::default_constructed_unit_structs)] #[allow(clippy::default_constructed_unit_structs)]
task_inner: RwLock::new(HashMap::new()), task_inner: RwLock::new(HashMap::new()),
metrics, metrics,
cache,
#[allow(clippy::default_constructed_unit_structs)] #[allow(clippy::default_constructed_unit_structs)]
connection_rate: RwLock::new(HashMap::new()), connection_rate: RwLock::new(HashMap::new()),
shutdown_tx, shutdown_tx,
@ -75,6 +80,8 @@ impl RoomConnectionManager {
#[allow(clippy::default_constructed_unit_structs)] #[allow(clippy::default_constructed_unit_structs)]
room_stream_inner: RwLock::new(HashMap::new()), room_stream_inner: RwLock::new(HashMap::new()),
#[allow(clippy::default_constructed_unit_structs)] #[allow(clippy::default_constructed_unit_structs)]
typing_inner: RwLock::new(HashMap::new()),
#[allow(clippy::default_constructed_unit_structs)]
room_last_activity: RwLock::new(HashMap::new()), room_last_activity: RwLock::new(HashMap::new()),
#[allow(clippy::default_constructed_unit_structs)] #[allow(clippy::default_constructed_unit_structs)]
room_subscriber_count: RwLock::new(HashMap::new()), room_subscriber_count: RwLock::new(HashMap::new()),
@ -621,6 +628,57 @@ impl RoomConnectionManager {
let mut map = self.stream_inner.write().await; let mut map = self.stream_inner.write().await;
map.remove(&message_id); map.remove(&message_id);
} }
pub async fn subscribe_typing(
&self,
room_id: Uuid,
) -> broadcast::Receiver<Arc<TypingEvent>> {
let mut map: tokio::sync::RwLockWriteGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.write().await;
if let Some(tx) = map.get(&room_id) {
return tx.subscribe();
}
let (tx, rx) = broadcast::channel(BROADCAST_CAPACITY);
map.insert(room_id, tx);
rx
}
/// Broadcast a typing event and persist it to Redis with 10s TTL.
/// - "start": writes key with 10s expiry, broadcasts start event
/// - "stop": deletes key, broadcasts stop event
pub async fn broadcast_typing(&self, room_id: Uuid, event: TypingEvent) {
let user_key = format!("typing:{}:{}", room_id, event.user_id);
let action = event.action.clone();
let username = event.username.clone();
let avatar_url = event.avatar_url.clone();
// Write/delete Redis key for 10s expiry (non-blocking)
if let Ok(mut conn) = self.cache.conn().await {
let key = user_key;
tokio::spawn(async move {
if action == "start" {
let value = serde_json::json!({
"username": username,
"avatar_url": avatar_url,
})
.to_string();
let _: Result<(), _> = redis::cmd("SETEX")
.arg(&key)
.arg(10i64)
.arg(&value)
.query_async(&mut conn)
.await;
} else {
let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await;
}
});
}
let map: tokio::sync::RwLockReadGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.read().await;
if let Some(tx) = map.get(&room_id) {
let event = Arc::new(event);
let _ = tx.send(event);
}
}
} }
fn parse_sender_type(s: &str) -> MessageSenderType { fn parse_sender_type(s: &str) -> MessageSenderType {
@ -738,15 +796,24 @@ pub fn make_persist_fn(
.exec(&db) .exec(&db)
.await?; .await?;
// Update content_tsv for inserted messages // Batch update content_tsv using a single UPDATE with subquery
for env in chunk.iter() { // instead of N individual UPDATE statements (N=chunk size, up to 100)
let update_sql = format!( let ids: Vec<String> = chunk
"UPDATE room_message SET content_tsv = to_tsvector('simple', content) WHERE id = '{}'", .iter()
env.id .filter(|e| !existing_ids.contains(&e.id))
.map(|e| format!("'{}'", e.id))
.collect();
if !ids.is_empty() {
let batch_sql = format!(
"UPDATE room_message AS t \
SET content_tsv = to_tsvector('simple', content) \
WHERE t.id IN ({})",
ids.join(",")
); );
let stmt = sea_orm::Statement::from_sql_and_values( let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DbBackend::Postgres, sea_orm::DbBackend::Postgres,
&update_sql, &batch_sql,
vec![], vec![],
); );
let _ = db.execute_raw(stmt).await; let _ = db.execute_raw(stmt).await;

View File

@ -154,6 +154,7 @@ impl AppService {
let room_metrics = Arc::new(RoomMetrics::default()); let room_metrics = Arc::new(RoomMetrics::default());
let room_manager = Arc::new(room::connection::RoomConnectionManager::new( let room_manager = Arc::new(room::connection::RoomConnectionManager::new(
room_metrics.clone(), room_metrics.clone(),
cache.clone(),
)); ));
let redis_url = config let redis_url = config