From 18ea3cc355ab226b82f906d9b59c49a9e6ed2641 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Thu, 14 May 2026 10:02:21 +0800 Subject: [PATCH] refactor(room): apply rustfmt formatting --- libs/room/src/ai.rs | 12 +- libs/room/src/connection/lifecycle.rs | 2 +- libs/room/src/connection/mod.rs | 23 ++- libs/room/src/connection/persist.rs | 1 + libs/room/src/connection/project_ops.rs | 18 +- libs/room/src/connection/pubsub.rs | 16 +- libs/room/src/connection/rate_limit.rs | 22 +- libs/room/src/connection/room_ops.rs | 16 +- libs/room/src/connection/stream.rs | 40 +++- libs/room/src/connection/typing.rs | 45 ++++- libs/room/src/connection/user_ops.rs | 16 +- libs/room/src/draft_and_history.rs | 35 ++-- libs/room/src/helpers.rs | 26 ++- libs/room/src/helpers_tests.rs | 50 ++++- libs/room/src/lib.rs | 10 +- libs/room/src/member.rs | 69 +++++-- libs/room/src/message.rs | 9 +- libs/room/src/message_write.rs | 20 +- libs/room/src/metrics.rs | 7 +- libs/room/src/notification_write.rs | 2 +- libs/room/src/presence.rs | 19 +- libs/room/src/reaction.rs | 177 ++++++++++++---- libs/room/src/room_ai_queue.rs | 3 +- libs/room/src/room_write.rs | 93 +++++++-- libs/room/src/search.rs | 190 +++++++++++++++--- libs/room/src/search_write.rs | 40 +++- libs/room/src/service/access.rs | 16 +- libs/room/src/service/access_write.rs | 30 +-- .../src/service/ai_mode_streaming_post.rs | 4 +- libs/room/src/service/ai_nonstreaming.rs | 2 +- .../room/src/service/ai_react_nonstreaming.rs | 9 +- libs/room/src/service/ai_react_streaming.rs | 16 +- .../src/service/ai_react_streaming_post.rs | 65 +++--- .../src/service/ai_react_streaming_steps.rs | 4 +- libs/room/src/service/ai_service.rs | 186 ++++++++++------- libs/room/src/service/ai_streaming.rs | 29 +-- libs/room/src/service/mod.rs | 105 ++++++---- libs/room/src/service/process_ai.rs | 81 ++++---- libs/room/src/service/type_convert.rs | 10 +- libs/room/src/service/validation.rs | 50 +++-- libs/room/src/service/workers.rs | 7 +- libs/room/src/types_responses.rs | 12 +- 42 files changed, 1085 insertions(+), 502 deletions(-) diff --git a/libs/room/src/ai.rs b/libs/room/src/ai.rs index 2db74d0..aef3169 100644 --- a/libs/room/src/ai.rs +++ b/libs/room/src/ai.rs @@ -32,10 +32,8 @@ impl RoomService { .all(&self.db) .await?; - let model_names: std::collections::HashMap = models - .into_iter() - .map(|m| (m.id, m.name)) - .collect(); + let model_names: std::collections::HashMap = + models.into_iter().map(|m| (m.id, m.name)).collect(); let mut responses = Vec::with_capacity(configs.len()); for config in configs { @@ -178,11 +176,7 @@ impl RoomService { Ok(()) } - pub async fn room_ai_stop( - &self, - room_id: Uuid, - ctx: &WsUserContext, - ) -> Result<(), RoomError> { + pub async fn room_ai_stop(&self, room_id: Uuid, ctx: &WsUserContext) -> Result<(), RoomError> { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; tracing::info!(%room_id, %user_id, "AI stream stop requested"); diff --git a/libs/room/src/connection/lifecycle.rs b/libs/room/src/connection/lifecycle.rs index 2e3c087..d2be89d 100644 --- a/libs/room/src/connection/lifecycle.rs +++ b/libs/room/src/connection/lifecycle.rs @@ -1,5 +1,5 @@ -use uuid::Uuid; use tokio::sync::broadcast; +use uuid::Uuid; use super::{RoomConnectionManager, SHUTDOWN_CHANNEL_CAPACITY}; diff --git a/libs/room/src/connection/mod.rs b/libs/room/src/connection/mod.rs index 7dc55db..cadb1da 100644 --- a/libs/room/src/connection/mod.rs +++ b/libs/room/src/connection/mod.rs @@ -8,14 +8,14 @@ mod stream; mod typing; mod user_ops; -use std::sync::Arc; use std::collections::HashMap; -use uuid::Uuid; +use std::sync::Arc; use tokio::sync::{RwLock, broadcast}; +use uuid::Uuid; -use db::cache::AppCache; use crate::metrics::RoomMetrics; use crate::types::NotificationEvent; +use db::cache::AppCache; use queue::types::TypingEvent; use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEvent, RoomMessageStreamChunkEvent}; @@ -27,6 +27,8 @@ pub const CONNECTION_COOLDOWN: Duration = Duration::from_secs(30); pub const MAX_CONNECTIONS_PER_ROOM: usize = 50000; pub const MAX_CONNECTIONS_PER_PROJECT: usize = 50000; pub const MAX_CONNECTIONS_PER_USER: usize = 50000; +/// Maximum rooms a single WS session can subscribe to. +pub const MAX_ROOMS_PER_SESSION: usize = 100; pub const BATCH_SIZE: usize = 100; pub const ROOM_IDLE_TIMEOUT: Duration = Duration::from_secs(30 * 60); pub const REPLAY_BUFFER_SIZE: usize = 100; @@ -56,7 +58,8 @@ pub struct RoomConnectionManager { project_shutdown_txs: RwLock>>, user_shutdown_txs: RwLock>>, stream_inner: RwLock>>>, - room_stream_inner: RwLock>, usize)>>, + room_stream_inner: + RwLock>, usize)>>, /// Active AI streams keyed by message_id. Used to replay buffered chunks to /// late-joining subscribers who missed the stream start. active_streams: RwLock>, @@ -102,11 +105,19 @@ impl RoomConnectionManager { } pub use persist::{DedupCache, PersistFn, cleanup_dedup_cache, make_persist_fn}; -pub use pubsub::{subscribe_room_events, subscribe_room_stream_chunk_events, subscribe_project_room_events, subscribe_task_events_fn}; +pub use pubsub::{ + subscribe_project_room_events, subscribe_room_events, subscribe_room_stream_chunk_events, + subscribe_task_events_fn, +}; /// Extract a Redis connection getter from MessageProducer. /// Used for cache operations (notification unread count, etc.), NOT for message broadcasting. -pub type RedisFuture = std::pin::Pin> + Send>>; +pub type RedisFuture = std::pin::Pin< + Box< + dyn std::future::Future> + + Send, + >, +>; pub fn extract_get_redis( queue: queue::MessageProducer, diff --git a/libs/room/src/connection/persist.rs b/libs/room/src/connection/persist.rs index d4927df..dbd9c4d 100644 --- a/libs/room/src/connection/persist.rs +++ b/libs/room/src/connection/persist.rs @@ -233,6 +233,7 @@ async fn embed_persisted_messages( let project_name = room_project.get(&m.room_id)?; Some(agent::embed::EmbedMemoryInput { message_id: m.id.to_string(), + seq: m.seq, content: m.content.clone(), project_name: project_name.clone(), room_id: m.room_id.to_string(), diff --git a/libs/room/src/connection/project_ops.rs b/libs/room/src/connection/project_ops.rs index eb81ee0..ff8455f 100644 --- a/libs/room/src/connection/project_ops.rs +++ b/libs/room/src/connection/project_ops.rs @@ -1,12 +1,19 @@ use std::sync::Arc; -use uuid::Uuid; use tokio::sync::broadcast; +use uuid::Uuid; +use super::{ + AgentTaskEvent, BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_PROJECT, ProjectRoomEvent, + RoomConnectionManager, +}; use crate::error::RoomError; -use super::{RoomConnectionManager, ProjectRoomEvent, AgentTaskEvent, BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_PROJECT}; impl RoomConnectionManager { - pub async fn subscribe_project(&self, project_id: Uuid, _user_id: Uuid) -> Result>, RoomError> { + pub async fn subscribe_project( + &self, + project_id: Uuid, + _user_id: Uuid, + ) -> Result>, RoomError> { let mut map = self.project_inner.write().await; if map.get(&project_id).is_some() { drop(map); @@ -68,7 +75,10 @@ impl RoomConnectionManager { } } - pub async fn subscribe_task_events(&self, project_id: Uuid) -> Result>, RoomError> { + pub async fn subscribe_task_events( + &self, + project_id: Uuid, + ) -> Result>, RoomError> { let mut map = self.task_inner.write().await; if let Some(sender) = map.get(&project_id).cloned() { diff --git a/libs/room/src/connection/pubsub.rs b/libs/room/src/connection/pubsub.rs index 4b6d11d..aed610a 100644 --- a/libs/room/src/connection/pubsub.rs +++ b/libs/room/src/connection/pubsub.rs @@ -87,12 +87,7 @@ async fn consume_room_broadcast( let mut count = 0usize; while count < 100 { - match tokio::time::timeout( - std::time::Duration::from_millis(200), - messages.next(), - ) - .await - { + match tokio::time::timeout(std::time::Duration::from_millis(200), messages.next()).await { Ok(Some(Ok(msg))) => { match serde_json::from_slice::(&msg.payload) { Ok(event) => { @@ -192,12 +187,7 @@ async fn consume_chunk_broadcast( let mut count = 0usize; while count < 100 { - match tokio::time::timeout( - std::time::Duration::from_millis(200), - messages.next(), - ) - .await - { + match tokio::time::timeout(std::time::Duration::from_millis(200), messages.next()).await { Ok(Some(Ok(msg))) => { match serde_json::from_slice::(&msg.payload) { Ok(event) => { @@ -317,4 +307,4 @@ pub async fn subscribe_task_events_fn( } } tracing::info!(project_id = %project_id, "task events subscriber stopped"); -} \ No newline at end of file +} diff --git a/libs/room/src/connection/rate_limit.rs b/libs/room/src/connection/rate_limit.rs index af462fd..9b22fa5 100644 --- a/libs/room/src/connection/rate_limit.rs +++ b/libs/room/src/connection/rate_limit.rs @@ -1,11 +1,17 @@ use std::time::Instant; use uuid::Uuid; +use super::{ + CONNECTION_COOLDOWN, MAX_CONNECTIONS_PER_ROOM, ROOM_IDLE_TIMEOUT, RoomConnectionManager, +}; use crate::error::RoomError; -use super::{RoomConnectionManager, CONNECTION_COOLDOWN, MAX_CONNECTIONS_PER_ROOM, ROOM_IDLE_TIMEOUT}; impl RoomConnectionManager { - pub async fn check_room_connection_rate(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { + pub async fn check_room_connection_rate( + &self, + room_id: Uuid, + user_id: Uuid, + ) -> Result<(), RoomError> { let mut map = self.connection_rate.write().await; let key = (room_id, user_id); if let Some(last) = map.get(&key) { @@ -21,7 +27,11 @@ impl RoomConnectionManager { Ok(()) } - pub async fn check_project_connection_rate(&self, project_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { + pub async fn check_project_connection_rate( + &self, + project_id: Uuid, + user_id: Uuid, + ) -> Result<(), RoomError> { let mut map = self.connection_rate.write().await; let key = (project_id, user_id); if let Some(last) = map.get(&key) { @@ -61,7 +71,11 @@ impl RoomConnectionManager { let mut entries: Vec<_> = map.iter().collect(); entries.sort_by(|a, b| a.1.cmp(b.1)); let keep_count = entries.len() / 2; - let to_remove: Vec<_> = entries.into_iter().take(keep_count).map(|(k, _)| *k).collect(); + let to_remove: Vec<_> = entries + .into_iter() + .take(keep_count) + .map(|(k, _)| *k) + .collect(); for key in to_remove { map.remove(&key); } diff --git a/libs/room/src/connection/room_ops.rs b/libs/room/src/connection/room_ops.rs index f2f5634..8887221 100644 --- a/libs/room/src/connection/room_ops.rs +++ b/libs/room/src/connection/room_ops.rs @@ -1,12 +1,18 @@ use std::sync::Arc; -use uuid::Uuid; use tokio::sync::broadcast; +use uuid::Uuid; +use super::{ + BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_ROOM, RoomConnectionManager, RoomMessageEvent, +}; use crate::error::RoomError; -use super::{RoomConnectionManager, RoomMessageEvent, BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_ROOM}; impl RoomConnectionManager { - pub async fn subscribe(&self, room_id: Uuid, _user_id: Uuid) -> Result>, RoomError> { + pub async fn subscribe( + &self, + room_id: Uuid, + _user_id: Uuid, + ) -> Result>, RoomError> { let mut map = self.room_inner.write().await; if let Some(_sender) = map.get(&room_id) { drop(map); @@ -16,7 +22,9 @@ impl RoomConnectionManager { if let Some(sender) = map.get(&room_id) { return Ok(sender.subscribe()); } - return Err(RoomError::Internal("room disappeared during subscribe".into())); + return Err(RoomError::Internal( + "room disappeared during subscribe".into(), + )); } if map.len() >= MAX_CONNECTIONS_PER_ROOM { diff --git a/libs/room/src/connection/stream.rs b/libs/room/src/connection/stream.rs index 8b3f284..90139fb 100644 --- a/libs/room/src/connection/stream.rs +++ b/libs/room/src/connection/stream.rs @@ -1,11 +1,18 @@ use std::sync::Arc; +use tokio::sync::{RwLock, broadcast}; use uuid::Uuid; -use tokio::sync::{broadcast, RwLock}; -use super::{RoomConnectionManager, RoomMessageStreamChunkEvent, BROADCAST_CAPACITY, REPLAY_BUFFER_SIZE}; +use super::{ + BROADCAST_CAPACITY, REPLAY_BUFFER_SIZE, RoomConnectionManager, RoomMessageStreamChunkEvent, +}; impl RoomConnectionManager { - pub async fn register_stream_channel(&self, message_id: Uuid, room_id: Uuid, display_name: Option) -> broadcast::Receiver> { + pub async fn register_stream_channel( + &self, + message_id: Uuid, + room_id: Uuid, + display_name: Option, + ) -> broadcast::Receiver> { let mut map = self.stream_inner.write().await; if let Some(tx) = map.get(&message_id) { return tx.subscribe(); @@ -27,12 +34,18 @@ impl RoomConnectionManager { rx } - pub async fn subscribe_stream(&self, message_id: Uuid) -> Option>> { + pub async fn subscribe_stream( + &self, + message_id: Uuid, + ) -> Option>> { let map = self.stream_inner.read().await; map.get(&message_id).map(|tx| tx.subscribe()) } - pub async fn subscribe_room_stream(&self, room_id: Uuid) -> broadcast::Receiver> { + pub async fn subscribe_room_stream( + &self, + room_id: Uuid, + ) -> broadcast::Receiver> { // New subscriber: replay active streams in this room so they catch up, // then subscribe to the room's channel. @@ -47,7 +60,9 @@ impl RoomConnectionManager { // Replay buffered chunks to existing channel so all subscribers receive them. let active = self.active_streams.read().await; for (&msg_id, meta) in active.iter() { - if meta.room_id != room_id { continue; } + if meta.room_id != room_id { + continue; + } let start_event = Arc::new(RoomMessageStreamChunkEvent { message_id: msg_id, room_id, @@ -73,7 +88,9 @@ impl RoomConnectionManager { // Replay buffered chunks to new channel. let active = self.active_streams.read().await; for (&msg_id, meta) in active.iter() { - if meta.room_id != room_id { continue; } + if meta.room_id != room_id { + continue; + } let start_event = Arc::new(RoomMessageStreamChunkEvent { message_id: msg_id, room_id, @@ -121,7 +138,9 @@ impl RoomConnectionManager { // Also update room_to_streams reverse index. if is_start { let mut r2s = self.room_to_streams.write().await; - r2s.entry(event.room_id).or_default().insert(event.message_id); + r2s.entry(event.room_id) + .or_default() + .insert(event.message_id); } } @@ -182,7 +201,10 @@ impl RoomConnectionManager { } } - pub async fn register_stream_cancel(&self, room_id: Uuid) -> Arc { + pub async fn register_stream_cancel( + &self, + room_id: Uuid, + ) -> Arc { let cancel = Arc::new(std::sync::atomic::AtomicBool::new(false)); let mut map = self.stream_cancel_tokens.write().await; map.insert(room_id, cancel.clone()); diff --git a/libs/room/src/connection/typing.rs b/libs/room/src/connection/typing.rs index 4c0b380..6d2c22e 100644 --- a/libs/room/src/connection/typing.rs +++ b/libs/room/src/connection/typing.rs @@ -1,13 +1,16 @@ -use std::sync::Arc; -use uuid::Uuid; -use tokio::sync::{RwLockReadGuard, RwLockWriteGuard, broadcast}; use queue::types::TypingEvent; +use std::sync::Arc; +use tokio::sync::{RwLockReadGuard, RwLockWriteGuard, broadcast}; +use uuid::Uuid; -use super::{RoomConnectionManager, BROADCAST_CAPACITY}; +use super::{BROADCAST_CAPACITY, RoomConnectionManager}; impl RoomConnectionManager { pub async fn subscribe_typing(&self, room_id: Uuid) -> broadcast::Receiver> { - let mut map: RwLockWriteGuard<'_, std::collections::HashMap>>> = self.typing_inner.write().await; + let mut map: RwLockWriteGuard< + '_, + std::collections::HashMap>>, + > = self.typing_inner.write().await; let tx = map.entry(room_id).or_insert_with(|| { let (tx, _) = broadcast::channel(BROADCAST_CAPACITY); tx @@ -24,7 +27,10 @@ impl RoomConnectionManager { let action = event.action.clone(); let username = event.username.clone(); let avatar_url = event.avatar_url.clone(); - let sender_type = event.sender_type.clone().unwrap_or_else(|| "user".to_string()); + let sender_type = event + .sender_type + .clone() + .unwrap_or_else(|| "user".to_string()); if let Ok(mut conn) = self.cache.conn().await { let key = user_key; @@ -46,7 +52,10 @@ impl RoomConnectionManager { } } - let map: RwLockReadGuard<'_, std::collections::HashMap>>> = self.typing_inner.read().await; + let map: RwLockReadGuard< + '_, + std::collections::HashMap>>, + > = self.typing_inner.read().await; if let Some(tx) = map.get(&room_id) { let event = Arc::new(event); let _ = tx.send(event); @@ -85,17 +94,31 @@ impl RoomConnectionManager { let parts: Vec<&str> = key.split(':').collect(); let user_id = parts.get(2).and_then(|s| Uuid::parse_str(s).ok()); if let (Some(value), Some(user_uuid)) = ( - redis::cmd("GET").arg(&key).query_async::(&mut conn).await.ok(), + redis::cmd("GET") + .arg(&key) + .query_async::(&mut conn) + .await + .ok(), user_id, ) { if let Ok(parsed) = serde_json::from_str::(&value) { results.push(TypingEvent { room_id, user_id: user_uuid, - username: parsed.get("username").and_then(|v| v.as_str()).unwrap_or("").to_string(), - avatar_url: parsed.get("avatar_url").and_then(|v| v.as_str()).map(String::from), + username: parsed + .get("username") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(), + avatar_url: parsed + .get("avatar_url") + .and_then(|v| v.as_str()) + .map(String::from), action: "start".to_string(), - sender_type: parsed.get("sender_type").and_then(|v| v.as_str()).map(String::from), + sender_type: parsed + .get("sender_type") + .and_then(|v| v.as_str()) + .map(String::from), }); } } diff --git a/libs/room/src/connection/user_ops.rs b/libs/room/src/connection/user_ops.rs index 0efedbb..cc61a06 100644 --- a/libs/room/src/connection/user_ops.rs +++ b/libs/room/src/connection/user_ops.rs @@ -1,13 +1,18 @@ use std::sync::Arc; -use uuid::Uuid; use tokio::sync::broadcast; +use uuid::Uuid; +use super::{ + BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_USER, ProjectRoomEvent, RoomConnectionManager, +}; use crate::error::RoomError; use crate::types::NotificationEvent; -use super::{RoomConnectionManager, ProjectRoomEvent, BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_USER}; impl RoomConnectionManager { - pub async fn subscribe_user(&self, user_id: Uuid) -> Result>, RoomError> { + pub async fn subscribe_user( + &self, + user_id: Uuid, + ) -> Result>, RoomError> { let mut map = self.user_inner.write().await; if let Some(_sender) = map.get(&user_id) { @@ -62,7 +67,10 @@ impl RoomConnectionManager { } } - pub async fn subscribe_user_notification(&self, user_id: Uuid) -> broadcast::Receiver> { + pub async fn subscribe_user_notification( + &self, + user_id: Uuid, + ) -> broadcast::Receiver> { let mut map = self.user_notification_inner.write().await; if let Some(sender) = map.get(&user_id) { return sender.subscribe(); diff --git a/libs/room/src/draft_and_history.rs b/libs/room/src/draft_and_history.rs index dd06bf0..17dfe3d 100644 --- a/libs/room/src/draft_and_history.rs +++ b/libs/room/src/draft_and_history.rs @@ -142,10 +142,7 @@ impl RoomService { }; // Batch fetch room names to avoid N+1 queries - let room_ids: Vec = notifications - .iter() - .filter_map(|n| n.room) - .collect(); + let room_ids: Vec = notifications.iter().filter_map(|n| n.room).collect(); let rooms: std::collections::HashMap = if !room_ids.is_empty() { models::rooms::room::Entity::find() .filter(models::rooms::room::Column::Id.is_in(room_ids)) @@ -233,10 +230,14 @@ impl RoomService { ) -> Result { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; - + let key = format!("room:{}:draft:{}", room_id, user_id); - let mut conn = self.cache.conn().await.map_err(|e| RoomError::Internal(e.to_string()))?; - + let mut conn = self + .cache + .conn() + .await + .map_err(|e| RoomError::Internal(e.to_string()))?; + let now = Utc::now(); deadpool_redis::redis::cmd("SETEX") .arg(&key) @@ -245,7 +246,7 @@ impl RoomService { .query_async::<()>(&mut conn) .await .map_err(|e| RoomError::Internal(e.to_string()))?; - + Ok(DraftResponse { room_id, content, @@ -253,23 +254,23 @@ impl RoomService { }) } - pub async fn draft_clear( - &self, - room_id: Uuid, - ctx: &WsUserContext, - ) -> Result<(), RoomError> { + pub async fn draft_clear(&self, room_id: Uuid, ctx: &WsUserContext) -> Result<(), RoomError> { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; - + let key = format!("room:{}:draft:{}", room_id, user_id); - let mut conn = self.cache.conn().await.map_err(|e| RoomError::Internal(e.to_string()))?; - + let mut conn = self + .cache + .conn() + .await + .map_err(|e| RoomError::Internal(e.to_string()))?; + deadpool_redis::redis::cmd("DEL") .arg(&key) .query_async::<()>(&mut conn) .await .map_err(|e| RoomError::Internal(e.to_string()))?; - + Ok(()) } } diff --git a/libs/room/src/helpers.rs b/libs/room/src/helpers.rs index c4f6b90..f4a2907 100644 --- a/libs/room/src/helpers.rs +++ b/libs/room/src/helpers.rs @@ -45,13 +45,25 @@ impl RoomService { let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content); super::RoomMessageResponse { - id: msg.id, seq: msg.seq, room: msg.room, sender_type, - sender_id: msg.sender_id, display_name, thread: msg.thread, - content: msg.content, content_type: msg.content_type.to_string(), - thinking_content: msg.thinking_content, thinking_is_chunked: chunked, - edited_at: msg.edited_at, send_at: msg.send_at, revoked: msg.revoked, - revoked_by: msg.revoked_by, in_reply_to: msg.in_reply_to, - highlighted_content: None, attachment_ids: Vec::new(), reactions: Vec::new(), + id: msg.id, + seq: msg.seq, + room: msg.room, + sender_type, + sender_id: msg.sender_id, + display_name, + thread: msg.thread, + content: msg.content, + content_type: msg.content_type.to_string(), + thinking_content: msg.thinking_content, + thinking_is_chunked: chunked, + edited_at: msg.edited_at, + send_at: msg.send_at, + revoked: msg.revoked, + revoked_by: msg.revoked_by, + in_reply_to: msg.in_reply_to, + highlighted_content: None, + attachment_ids: Vec::new(), + reactions: Vec::new(), } } } diff --git a/libs/room/src/helpers_tests.rs b/libs/room/src/helpers_tests.rs index ef731a4..b962c00 100644 --- a/libs/room/src/helpers_tests.rs +++ b/libs/room/src/helpers_tests.rs @@ -5,22 +5,46 @@ mod tests { #[test] fn test_parse_message_content_type_valid() { - assert!(matches!(RoomService::parse_message_content_type(Some("text".into())).unwrap(), MessageContentType::Text)); - assert!(matches!(RoomService::parse_message_content_type(Some("image".into())).unwrap(), MessageContentType::Image)); - assert!(matches!(RoomService::parse_message_content_type(Some("audio".into())).unwrap(), MessageContentType::Audio)); - assert!(matches!(RoomService::parse_message_content_type(Some("video".into())).unwrap(), MessageContentType::Video)); - assert!(matches!(RoomService::parse_message_content_type(Some("file".into())).unwrap(), MessageContentType::File)); + assert!(matches!( + RoomService::parse_message_content_type(Some("text".into())).unwrap(), + MessageContentType::Text + )); + assert!(matches!( + RoomService::parse_message_content_type(Some("image".into())).unwrap(), + MessageContentType::Image + )); + assert!(matches!( + RoomService::parse_message_content_type(Some("audio".into())).unwrap(), + MessageContentType::Audio + )); + assert!(matches!( + RoomService::parse_message_content_type(Some("video".into())).unwrap(), + MessageContentType::Video + )); + assert!(matches!( + RoomService::parse_message_content_type(Some("file".into())).unwrap(), + MessageContentType::File + )); } #[test] fn test_parse_message_content_type_case_insensitive() { - assert!(matches!(RoomService::parse_message_content_type(Some("TEXT".into())).unwrap(), MessageContentType::Text)); - assert!(matches!(RoomService::parse_message_content_type(Some("Image".into())).unwrap(), MessageContentType::Image)); + assert!(matches!( + RoomService::parse_message_content_type(Some("TEXT".into())).unwrap(), + MessageContentType::Text + )); + assert!(matches!( + RoomService::parse_message_content_type(Some("Image".into())).unwrap(), + MessageContentType::Image + )); } #[test] fn test_parse_message_content_type_none_defaults_to_text() { - assert!(matches!(RoomService::parse_message_content_type(None).unwrap(), MessageContentType::Text)); + assert!(matches!( + RoomService::parse_message_content_type(None).unwrap(), + MessageContentType::Text + )); } #[test] @@ -118,7 +142,9 @@ mod tests { #[test] fn test_mention_bracket_re_matches_ai_model() { let re = crate::service::mention_bracket_re(); - let caps: Vec<_> = re.captures_iter("@[ai:550e8400-0000-0000-0000-000000000001:GPT-4]").collect(); + let caps: Vec<_> = re + .captures_iter("@[ai:550e8400-0000-0000-0000-000000000001:GPT-4]") + .collect(); assert_eq!(caps.len(), 1); assert_eq!(&caps[0][1], "ai"); assert_eq!(&caps[0][2], "550e8400-0000-0000-0000-000000000001"); @@ -127,7 +153,9 @@ mod tests { #[test] fn test_mention_bracket_re_matches_user() { let re = crate::service::mention_bracket_re(); - let caps: Vec<_> = re.captures_iter("@[user:850e8400-0000-0000-0000-000000000002:John]").collect(); + let caps: Vec<_> = re + .captures_iter("@[user:850e8400-0000-0000-0000-000000000002:John]") + .collect(); assert_eq!(caps.len(), 1); assert_eq!(&caps[0][1], "user"); } @@ -173,4 +201,4 @@ mod tests { assert_eq!(bracket_re.captures_iter(content).count(), 1); assert_eq!(tag_re.captures_iter(content).count(), 1); } -} \ No newline at end of file +} diff --git a/libs/room/src/lib.rs b/libs/room/src/lib.rs index ef68c1c..548e3ef 100644 --- a/libs/room/src/lib.rs +++ b/libs/room/src/lib.rs @@ -26,11 +26,10 @@ pub mod types; pub mod types_responses; pub mod ws_context; -pub use presence::PresenceStore; pub use connection::{ - PersistFn, RoomConnectionManager, cleanup_dedup_cache, extract_get_redis, - make_persist_fn, subscribe_project_room_events, subscribe_room_events, - subscribe_room_stream_chunk_events, subscribe_task_events_fn, + PersistFn, RoomConnectionManager, cleanup_dedup_cache, extract_get_redis, make_persist_fn, + subscribe_project_room_events, subscribe_room_events, subscribe_room_stream_chunk_events, + subscribe_task_events_fn, }; pub use draft_and_history::{ DraftResponse, DraftSaveRequest, MentionNotificationResponse, MessageEditHistoryEntry, @@ -38,7 +37,8 @@ pub use draft_and_history::{ }; pub use error::RoomError; pub use metrics::RoomMetrics; +pub use presence::PresenceStore; pub use reaction::{MessageReactionsResponse, MessageSearchResponse}; -pub use service::{RoomService, PushNotificationFn}; +pub use service::{PushNotificationFn, RoomService}; pub use types::{RoomEventType, *}; pub use types_responses::*; diff --git a/libs/room/src/member.rs b/libs/room/src/member.rs index f6b2f46..b7f4da0 100644 --- a/libs/room/src/member.rs +++ b/libs/room/src/member.rs @@ -38,7 +38,10 @@ impl RoomService { .all(&self.db) .await?; for m in project_admins { - if matches!(m.scope_role(), Ok(models::projects::MemberRole::Owner | models::projects::MemberRole::Admin)) { + if matches!( + m.scope_role(), + Ok(models::projects::MemberRole::Owner | models::projects::MemberRole::Admin) + ) { if !user_ids.contains(&m.user) { user_ids.push(m.user); } @@ -66,7 +69,16 @@ impl RoomService { .all(&self.db) .await? .into_iter() - .map(|u| (u.uid, super::UserInfo { uid: u.uid, username: u.username, avatar_url: u.avatar_url })) + .map(|u| { + ( + u.uid, + super::UserInfo { + uid: u.uid, + username: u.username, + avatar_url: u.avatar_url, + }, + ) + }) .collect() } else { std::collections::HashMap::new() @@ -89,29 +101,42 @@ impl RoomService { .await?; let role_map: std::collections::HashMap = project_member_list .into_iter() - .map(|m| (m.user, m.scope_role().map(|r| r.to_string()).unwrap_or_else(|_| "member".to_string()))) + .map(|m| { + ( + m.user, + m.scope_role() + .map(|r| r.to_string()) + .unwrap_or_else(|_| "member".to_string()), + ) + }) .collect(); - let participants = members.into_iter().map(|user_id| { - let user_info = users.get(&user_id).cloned(); - let state = state_map.get(&(room_id, user_id)); - let project_role = role_map.get(&user_id).cloned().unwrap_or_else(|| "member".to_string()); - let is_room_owner = room_model.created_by == user_id; + let participants = members + .into_iter() + .map(|user_id| { + let user_info = users.get(&user_id).cloned(); + let state = state_map.get(&(room_id, user_id)); + let project_role = role_map + .get(&user_id) + .cloned() + .unwrap_or_else(|| "member".to_string()); + let is_room_owner = room_model.created_by == user_id; - super::RoomParticipantResponse { - room: room_id, - user: user_id, - user_info, - project_role, - is_room_owner, - last_read_seq: state.and_then(|s| s.last_read_seq), - do_not_disturb: state.map(|s| s.do_not_disturb).unwrap_or(false), - dnd_start_hour: state.and_then(|s| s.dnd_start_hour), - dnd_end_hour: state.and_then(|s| s.dnd_end_hour), - joined_at: state.and_then(|s| s.joined_at), - } - }).collect(); + super::RoomParticipantResponse { + room: room_id, + user: user_id, + user_info, + project_role, + is_room_owner, + last_read_seq: state.and_then(|s| s.last_read_seq), + do_not_disturb: state.map(|s| s.do_not_disturb).unwrap_or(false), + dnd_start_hour: state.and_then(|s| s.dnd_start_hour), + dnd_end_hour: state.and_then(|s| s.dnd_end_hour), + joined_at: state.and_then(|s| s.joined_at), + } + }) + .collect(); Ok(super::RoomParticipantListResponse { participants }) } -} \ No newline at end of file +} diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs index 3dfbc41..49b8c01 100644 --- a/libs/room/src/message.rs +++ b/libs/room/src/message.rs @@ -86,7 +86,8 @@ impl RoomService { .or_else(|| Some(format!("AI({})", &id.to_string()[..8]))) }), _ => msg.sender_id.and_then(|id| users.get(&id).cloned()), - }.or_else(|| msg.sender_id.map(|id| id.to_string())); + } + .or_else(|| msg.sender_id.map(|id| id.to_string())); let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content); super::RoomMessageResponse { id: msg.id, @@ -142,8 +143,10 @@ impl RoomService { .await .unwrap_or_default(); - let mut reaction_map: std::collections::HashMap> = - std::collections::HashMap::new(); + let mut reaction_map: std::collections::HashMap< + Uuid, + Vec, + > = std::collections::HashMap::new(); for r in reactions { reaction_map.entry(r.message).or_default().push(r); } diff --git a/libs/room/src/message_write.rs b/libs/room/src/message_write.rs index 45475ee..d0803c1 100644 --- a/libs/room/src/message_write.rs +++ b/libs/room/src/message_write.rs @@ -40,7 +40,8 @@ impl RoomService { } } - let seq = crate::service::next_room_message_seq_internal(room_id, &self.db, &self.cache).await?; + let seq = + crate::service::next_room_message_seq_internal(room_id, &self.db, &self.cache).await?; let now = Utc::now(); let id = Uuid::now_v7(); let project_id = room_model.project; @@ -101,7 +102,12 @@ impl RoomService { }; let preview = if content.len() > 50 { - let end = content.char_indices().map(|(i, _)| i).take_while(|&i| i <= 50).last().unwrap_or(50); + let end = content + .char_indices() + .map(|(i, _)| i) + .take_while(|&i| i <= 50) + .last() + .unwrap_or(50); format!("{}...", &content[..end]) } else { content.clone() @@ -152,10 +158,7 @@ impl RoomService { let attachment_ids = request.attachment_ids.clone(); if !attachment_ids.is_empty() { if let Err(e) = room_attachment::Entity::update_many() - .col_expr( - room_attachment::Column::Message, - Expr::value(Some(id)), - ) + .col_expr(room_attachment::Column::Message, Expr::value(Some(id))) .filter(room_attachment::Column::Id.is_in(attachment_ids.clone())) .exec(&self.db) .await @@ -196,7 +199,10 @@ impl RoomService { .notification_create(super::NotificationCreateRequest { notification_type: super::NotificationType::Mention, user_id: mentioned_user_id, - title: format!("{} 在 {} 中提到了你", sender_display_name, room_model.room_name), + title: format!( + "{} 在 {} 中提到了你", + sender_display_name, room_model.room_name + ), content: Some(content.clone()), room_id: Some(room_id), project_id, diff --git a/libs/room/src/metrics.rs b/libs/room/src/metrics.rs index 89a3647..76980b4 100644 --- a/libs/room/src/metrics.rs +++ b/libs/room/src/metrics.rs @@ -1,4 +1,6 @@ -use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram, Unit}; +use metrics::{ + Counter, Gauge, Histogram, Unit, describe_counter, describe_gauge, describe_histogram, +}; use std::sync::Arc; use uuid::Uuid; @@ -165,8 +167,7 @@ impl RoomMetrics { } #[allow(dead_code)] - pub async fn cleanup_stale_rooms(&self, _active_room_ids: &[Uuid]) { - } + pub async fn cleanup_stale_rooms(&self, _active_room_ids: &[Uuid]) {} pub fn into_arc(self) -> Arc { Arc::new(self) diff --git a/libs/room/src/notification_write.rs b/libs/room/src/notification_write.rs index 05a7b48..9db801b 100644 --- a/libs/room/src/notification_write.rs +++ b/libs/room/src/notification_write.rs @@ -4,9 +4,9 @@ use crate::service::RoomService; use crate::ws_context::WsUserContext; use chrono::Utc; use deadpool_redis::redis; +use models::Expr; use models::rooms::room_notifications; use models::users::user as user_model; -use models::Expr; use sea_orm::*; use std::sync::Arc; use uuid::Uuid; diff --git a/libs/room/src/presence.rs b/libs/room/src/presence.rs index 2c488ec..04bd8d2 100644 --- a/libs/room/src/presence.rs +++ b/libs/room/src/presence.rs @@ -184,7 +184,11 @@ impl PresenceStore { expires_at: Option>, ) -> Option { // Find the primary presence entry (first one found) - let key = self.entries.iter().find(|e| e.user_id == user_id).map(|e| *e.key()); + let key = self + .entries + .iter() + .find(|e| e.user_id == user_id) + .map(|e| *e.key()); if let Some(key) = key { if let Some(mut entry) = self.entries.get_mut(&key) { @@ -234,7 +238,11 @@ impl PresenceStore { } /// Remove user presence when they disconnect. - pub fn remove_presence(&self, user_id: Uuid, project_id: Option) -> Option { + pub fn remove_presence( + &self, + user_id: Uuid, + project_id: Option, + ) -> Option { let key = (project_id, user_id); if let Some((_, entry)) = self.entries.remove(&key) { // Remove from user_projects index @@ -261,7 +269,10 @@ impl PresenceStore { pub fn project_online_count(&self, project_id: Uuid) -> usize { self.entries .iter() - .filter(|entry| entry.key().0 == Some(project_id) && entry.effective_status() != PresenceStatus::Offline) + .filter(|entry| { + entry.key().0 == Some(project_id) + && entry.effective_status() != PresenceStatus::Offline + }) .count() } -} \ No newline at end of file +} diff --git a/libs/room/src/reaction.rs b/libs/room/src/reaction.rs index fc0501d..8b13a96 100644 --- a/libs/room/src/reaction.rs +++ b/libs/room/src/reaction.rs @@ -1,7 +1,7 @@ use crate::error::RoomError; use crate::service::RoomService; -use crate::ws_context::WsUserContext; use crate::types_responses::ReactionGroupResponse; +use crate::ws_context::WsUserContext; use models::rooms::room_message_reaction; use models::users::user as user_model; use sea_orm::*; @@ -20,14 +20,23 @@ pub struct MessageSearchResponse { } impl RoomService { - pub async fn message_reactions_get(&self, message_id: Uuid, ctx: &WsUserContext) -> Result { + pub async fn message_reactions_get( + &self, + message_id: Uuid, + ctx: &WsUserContext, + ) -> Result { let user_id = ctx.user_id; let message = self.find_message_or_404(message_id).await?; self.require_room_access(message.room, user_id).await?; self.get_message_reactions(message_id, Some(user_id)).await } - pub async fn message_reactions_batch(&self, room_id: Uuid, message_ids: Vec, ctx: &WsUserContext) -> Result, RoomError> { + pub async fn message_reactions_batch( + &self, + room_id: Uuid, + message_ids: Vec, + ctx: &WsUserContext, + ) -> Result, RoomError> { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; let mut results = Vec::with_capacity(message_ids.len()); @@ -38,17 +47,33 @@ impl RoomService { Ok(results) } - pub async fn message_search(&self, room_id: Uuid, query: &str, limit: Option, offset: Option, ctx: &WsUserContext) -> Result { + pub async fn message_search( + &self, + room_id: Uuid, + query: &str, + limit: Option, + offset: Option, + ctx: &WsUserContext, + ) -> Result { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; if query.trim().is_empty() { - return Ok(MessageSearchResponse { messages: Vec::new(), total: 0 }); + return Ok(MessageSearchResponse { + messages: Vec::new(), + total: 0, + }); } let limit = limit.unwrap_or(20); let offset = offset.unwrap_or(0); - let search_pattern = format!("%{}%", query.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_")); + let search_pattern = format!( + "%{}%", + query + .replace('\\', "\\\\") + .replace('%', "\\%") + .replace('_', "\\_") + ); let query_builder = models::rooms::room_message::Entity::find() .filter(models::rooms::room_message::Column::Room.eq(room_id)) @@ -58,14 +83,25 @@ impl RoomService { let total = query_builder.clone().count(&self.db).await? as i64; let messages = query_builder .order_by_desc(models::rooms::room_message::Column::SendAt) - .limit(limit).offset(offset).all(&self.db).await?; + .limit(limit) + .offset(offset) + .all(&self.db) + .await?; let response_messages = self.build_messages_with_display_names(messages).await; - Ok(MessageSearchResponse { messages: response_messages, total }) + Ok(MessageSearchResponse { + messages: response_messages, + total, + }) } - pub(crate) async fn find_message_or_404(&self, message_id: Uuid) -> Result { - models::rooms::room_message::Entity::find_by_id(message_id).one(&self.db).await? + pub(crate) async fn find_message_or_404( + &self, + message_id: Uuid, + ) -> Result { + models::rooms::room_message::Entity::find_by_id(message_id) + .one(&self.db) + .await? .ok_or_else(|| RoomError::NotFound("Message not found".to_string())) } @@ -76,45 +112,110 @@ impl RoomService { Ok(()) } - pub(crate) async fn get_message_reactions(&self, message_id: Uuid, current_user_id: Option) -> Result { + pub(crate) async fn get_message_reactions( + &self, + message_id: Uuid, + current_user_id: Option, + ) -> Result { let reactions = room_message_reaction::Entity::find() .filter(room_message_reaction::Column::Message.eq(message_id)) .limit(1000) - .all(&self.db).await?; + .all(&self.db) + .await?; let reaction_groups = self.build_reaction_groups(reactions, current_user_id); - Ok(MessageReactionsResponse { message_id, reactions: reaction_groups }) + Ok(MessageReactionsResponse { + message_id, + reactions: reaction_groups, + }) } - pub(crate) fn build_reaction_groups(&self, reactions: Vec, current_user_id: Option) -> Vec { - let mut grouped: std::collections::HashMap> = std::collections::HashMap::new(); - for r in &reactions { grouped.entry(r.emoji.clone()).or_default().push(r); } + pub(crate) fn build_reaction_groups( + &self, + reactions: Vec, + current_user_id: Option, + ) -> Vec { + let mut grouped: std::collections::HashMap> = + std::collections::HashMap::new(); + for r in &reactions { + grouped.entry(r.emoji.clone()).or_default().push(r); + } - grouped.into_iter().map(|(emoji, user_reactions)| { - let count = user_reactions.len() as i32; - let reacted_by_me = current_user_id.map(|uid| user_reactions.iter().any(|r| r.user == uid)).unwrap_or(false); - let users = user_reactions.iter().take(3).map(|r| r.user.to_string()).collect(); - ReactionGroupResponse { emoji, count, reacted_by_me, users } - }).collect() + grouped + .into_iter() + .map(|(emoji, user_reactions)| { + let count = user_reactions.len() as i32; + let reacted_by_me = current_user_id + .map(|uid| user_reactions.iter().any(|r| r.user == uid)) + .unwrap_or(false); + let users = user_reactions + .iter() + .take(3) + .map(|r| r.user.to_string()) + .collect(); + ReactionGroupResponse { + emoji, + count, + reacted_by_me, + users, + } + }) + .collect() } - pub(crate) async fn build_messages_with_display_names(&self, messages: Vec) -> Vec { - let user_ids: Vec = messages.iter() - .filter(|m| m.sender_type.to_string() == "user").filter_map(|m| m.sender_id).collect(); + pub(crate) async fn build_messages_with_display_names( + &self, + messages: Vec, + ) -> Vec { + let user_ids: Vec = messages + .iter() + .filter(|m| m.sender_type.to_string() == "user") + .filter_map(|m| m.sender_id) + .collect(); let users: std::collections::HashMap = if !user_ids.is_empty() { - user_model::Entity::find().filter(user_model::Column::Uid.is_in(user_ids)) - .all(&self.db).await.unwrap_or_default().into_iter() - .map(|u| (u.uid, u.display_name.unwrap_or(u.username))).collect() - } else { std::collections::HashMap::new() }; + user_model::Entity::find() + .filter(user_model::Column::Uid.is_in(user_ids)) + .all(&self.db) + .await + .unwrap_or_default() + .into_iter() + .map(|u| (u.uid, u.display_name.unwrap_or(u.username))) + .collect() + } else { + std::collections::HashMap::new() + }; - messages.into_iter().map(|msg| { - let sender_type = msg.sender_type.to_string(); - let display_name = match sender_type.as_str() { - "user" => msg.sender_id.and_then(|id| users.get(&id).cloned()), - _ => None, - }; - let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content); - super::RoomMessageResponse { id: msg.id, seq: msg.seq, room: msg.room, sender_type, sender_id: msg.sender_id, display_name, thread: msg.thread, in_reply_to: msg.in_reply_to, content: msg.content, content_type: msg.content_type.to_string(), thinking_content: msg.thinking_content, thinking_is_chunked: chunked, edited_at: msg.edited_at, send_at: msg.send_at, revoked: msg.revoked, revoked_by: msg.revoked_by, highlighted_content: None, attachment_ids: Vec::new(), reactions: Vec::new() } - }).collect() + messages + .into_iter() + .map(|msg| { + let sender_type = msg.sender_type.to_string(); + let display_name = match sender_type.as_str() { + "user" => msg.sender_id.and_then(|id| users.get(&id).cloned()), + _ => None, + }; + let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content); + super::RoomMessageResponse { + id: msg.id, + seq: msg.seq, + room: msg.room, + sender_type, + sender_id: msg.sender_id, + display_name, + thread: msg.thread, + in_reply_to: msg.in_reply_to, + content: msg.content, + content_type: msg.content_type.to_string(), + thinking_content: msg.thinking_content, + thinking_is_chunked: chunked, + edited_at: msg.edited_at, + send_at: msg.send_at, + revoked: msg.revoked, + revoked_by: msg.revoked_by, + highlighted_content: None, + attachment_ids: Vec::new(), + reactions: Vec::new(), + } + }) + .collect() } } diff --git a/libs/room/src/room_ai_queue.rs b/libs/room/src/room_ai_queue.rs index 8ca99d7..fbe9965 100644 --- a/libs/room/src/room_ai_queue.rs +++ b/libs/room/src/room_ai_queue.rs @@ -213,7 +213,8 @@ pub async fn acquire_room_ai_lock( let lock_key_for_watchdog = lock_key.clone(); let lock_token_for_watchdog = lock_token.clone(); tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(HEARTBEAT_INTERVAL_MS)); + let mut interval = + tokio::time::interval(Duration::from_millis(HEARTBEAT_INTERVAL_MS)); loop { tokio::select! { _ = cancel_token.cancelled() => break, diff --git a/libs/room/src/room_write.rs b/libs/room/src/room_write.rs index b22689b..3f41c1e 100644 --- a/libs/room/src/room_write.rs +++ b/libs/room/src/room_write.rs @@ -2,7 +2,11 @@ use crate::error::RoomError; use crate::service::RoomService; use crate::ws_context::WsUserContext; use chrono::Utc; -use models::rooms::{room, room_ai, room_attachment, room_category, room_message, room_message_edit_history, room_message_reaction, room_notifications, room_pin, room_thread, room_access, room_user_state}; +use models::rooms::{ + room, room_access, room_ai, room_attachment, room_category, room_message, + room_message_edit_history, room_message_reaction, room_notifications, room_pin, room_thread, + room_user_state, +}; use queue::ProjectRoomEvent; use sea_orm::*; use uuid::Uuid; @@ -25,7 +29,9 @@ impl RoomService { .await? .ok_or_else(|| RoomError::NotFound("Room category not found".to_string()))?; if category.project != project.id { - return Err(RoomError::BadRequest("category does not belong to this project".to_string())); + return Err(RoomError::BadRequest( + "category does not belong to this project".to_string(), + )); } } @@ -40,7 +46,9 @@ impl RoomService { created_by: Set(user_id), created_at: Set(Utc::now()), last_msg_at: Set(Utc::now()), - }.insert(&txn).await?; + } + .insert(&txn) + .await?; // Create room_user_state for creator room_user_state::ActiveModel { @@ -51,7 +59,9 @@ impl RoomService { dnd_start_hour: Set(None), dnd_end_hour: Set(None), joined_at: Set(Some(Utc::now())), - }.insert(&txn).await?; + } + .insert(&txn) + .await?; // For private rooms, creator is auto-granted access if !request.public { @@ -60,7 +70,9 @@ impl RoomService { user: Set(user_id), granted_by: Set(user_id), granted_at: Set(Utc::now()), - }.insert(&txn).await?; + } + .insert(&txn) + .await?; } txn.commit().await?; @@ -76,7 +88,10 @@ impl RoomService { seq: None, timestamp: Utc::now(), }; - let _ = self.queue.publish_project_room_event(project.id, event).await; + let _ = self + .queue + .publish_project_room_event(project.id, event) + .await; self.notify_project_members( project.id, @@ -109,7 +124,9 @@ impl RoomService { .await? .ok_or_else(|| RoomError::NotFound("Room category not found".to_string()))?; if category.project != room_model.project { - return Err(RoomError::BadRequest("category does not belong to this project".to_string())); + return Err(RoomError::BadRequest( + "category does not belong to this project".to_string(), + )); } } @@ -140,7 +157,10 @@ impl RoomService { seq: None, timestamp: Utc::now(), }; - let _ = self.queue.publish_project_room_event(updated.project, event).await; + let _ = self + .queue + .publish_project_room_event(updated.project, event) + .await; } if moved { let event = ProjectRoomEvent { @@ -152,7 +172,10 @@ impl RoomService { seq: None, timestamp: Utc::now(), }; - let _ = self.queue.publish_project_room_event(updated.project, event).await; + let _ = self + .queue + .publish_project_room_event(updated.project, event) + .await; } let version = self.increment_room_version(room_id).await?; @@ -169,14 +192,38 @@ impl RoomService { let txn = self.db.begin().await?; - room_attachment::Entity::delete_many().filter(room_attachment::Column::Room.eq(room_id)).exec(&txn).await?; - room_message::Entity::delete_many().filter(room_message::Column::Room.eq(room_id)).exec(&txn).await?; - room_pin::Entity::delete_many().filter(room_pin::Column::Room.eq(room_id)).exec(&txn).await?; - room_thread::Entity::delete_many().filter(room_thread::Column::Room.eq(room_id)).exec(&txn).await?; - room_access::Entity::delete_many().filter(room_access::Column::Room.eq(room_id)).exec(&txn).await?; - room_user_state::Entity::delete_many().filter(room_user_state::Column::Room.eq(room_id)).exec(&txn).await?; - room_ai::Entity::delete_many().filter(room_ai::Column::Room.eq(room_id)).exec(&txn).await?; - room_message_reaction::Entity::delete_many().filter(room_message_reaction::Column::Room.eq(room_id)).exec(&txn).await?; + room_attachment::Entity::delete_many() + .filter(room_attachment::Column::Room.eq(room_id)) + .exec(&txn) + .await?; + room_message::Entity::delete_many() + .filter(room_message::Column::Room.eq(room_id)) + .exec(&txn) + .await?; + room_pin::Entity::delete_many() + .filter(room_pin::Column::Room.eq(room_id)) + .exec(&txn) + .await?; + room_thread::Entity::delete_many() + .filter(room_thread::Column::Room.eq(room_id)) + .exec(&txn) + .await?; + room_access::Entity::delete_many() + .filter(room_access::Column::Room.eq(room_id)) + .exec(&txn) + .await?; + room_user_state::Entity::delete_many() + .filter(room_user_state::Column::Room.eq(room_id)) + .exec(&txn) + .await?; + room_ai::Entity::delete_many() + .filter(room_ai::Column::Room.eq(room_id)) + .exec(&txn) + .await?; + room_message_reaction::Entity::delete_many() + .filter(room_message_reaction::Column::Room.eq(room_id)) + .exec(&txn) + .await?; let subquery = room_message::Entity::find() .filter(room_message::Column::Room.eq(room_id)) @@ -188,7 +235,10 @@ impl RoomService { .exec(&txn) .await?; - room_notifications::Entity::delete_many().filter(room_notifications::Column::Room.eq(room_id)).exec(&txn).await?; + room_notifications::Entity::delete_many() + .filter(room_notifications::Column::Room.eq(room_id)) + .exec(&txn) + .await?; room::Entity::delete_by_id(room_id).exec(&txn).await?; txn.commit().await?; @@ -212,7 +262,10 @@ impl RoomService { seq: None, timestamp: Utc::now(), }; - let _ = self.queue.publish_project_room_event(project_id, event).await; + let _ = self + .queue + .publish_project_room_event(project_id, event) + .await; self.notify_project_members( project_id, @@ -225,4 +278,4 @@ impl RoomService { observability::incr!(observability::ROOMS_DELETED_TOTAL); Ok(()) } -} \ No newline at end of file +} diff --git a/libs/room/src/search.rs b/libs/room/src/search.rs index 652f61b..515e71b 100644 --- a/libs/room/src/search.rs +++ b/libs/room/src/search.rs @@ -1,6 +1,6 @@ +use crate::RoomMessageSearchRequest; use crate::error::RoomError; use crate::service::RoomService; -use crate::RoomMessageSearchRequest; use crate::ws_context::WsUserContext; use models::rooms::room_message; use models::{DateTimeUtc, MessageId, RoomId, RoomThreadId, Seq, UserId}; @@ -8,27 +8,60 @@ use sea_orm::*; use uuid::Uuid; impl RoomService { - pub async fn room_message_search(&self, room_id: Uuid, request: RoomMessageSearchRequest, ctx: &WsUserContext) -> Result { + pub async fn room_message_search( + &self, + room_id: Uuid, + request: RoomMessageSearchRequest, + ctx: &WsUserContext, + ) -> Result { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; if request.q.trim().is_empty() { - return Ok(super::reaction::MessageSearchResponse { messages: Vec::new(), total: 0 }); + return Ok(super::reaction::MessageSearchResponse { + messages: Vec::new(), + total: 0, + }); } let limit = std::cmp::min(request.limit.unwrap_or(20), 100); let offset = request.offset.unwrap_or(0); - let mut conditions = vec!["room = $1".to_string(), "content_tsv @@ plainto_tsquery('simple', $2)".to_string(), "revoked IS NULL".to_string()]; + let mut conditions = vec![ + "room = $1".to_string(), + "content_tsv @@ plainto_tsquery('simple', $2)".to_string(), + "revoked IS NULL".to_string(), + ]; let mut param_index = 3; let mut params: Vec = vec![room_id.into(), request.q.trim().into()]; - if let Some(st) = request.start_time { conditions.push(format!("send_at >= ${}", param_index)); params.push(st.into()); param_index += 1; } - if let Some(et) = request.end_time { conditions.push(format!("send_at <= ${}", param_index)); params.push(et.into()); param_index += 1; } - if let Some(sid) = request.sender_id { conditions.push(format!("sender_id = ${}", param_index)); params.push(sid.into()); param_index += 1; } - if let Some(ref ct) = request.content_type { conditions.push(format!("content_type = ${}", param_index)); params.push(ct.clone().into()); param_index += 1; } + if let Some(st) = request.start_time { + conditions.push(format!("send_at >= ${}", param_index)); + params.push(st.into()); + param_index += 1; + } + if let Some(et) = request.end_time { + conditions.push(format!("send_at <= ${}", param_index)); + params.push(et.into()); + param_index += 1; + } + if let Some(sid) = request.sender_id { + conditions.push(format!("sender_id = ${}", param_index)); + params.push(sid.into()); + param_index += 1; + } + if let Some(ref ct) = request.content_type { + conditions.push(format!("content_type = ${}", param_index)); + params.push(ct.clone().into()); + param_index += 1; + } let where_clause = conditions.join(" AND "); - let sql = format!(r#"SELECT id, seq, room, sender_type, sender_id, thread, in_reply_to, content, content_type, edited_at, send_at, revoked, revoked_by, ts_headline('simple', content, plainto_tsquery('simple', $2), 'StartSel=, StopSel=, MaxWords=50, MinWords=15') AS highlighted_content FROM room_message WHERE {} ORDER BY send_at DESC LIMIT ${} OFFSET ${}"#, where_clause, param_index, param_index + 1); + let sql = format!( + r#"SELECT id, seq, room, sender_type, sender_id, thread, in_reply_to, content, content_type, edited_at, send_at, revoked, revoked_by, ts_headline('simple', content, plainto_tsquery('simple', $2), 'StartSel=, StopSel=, MaxWords=50, MinWords=15') AS highlighted_content FROM room_message WHERE {} ORDER BY send_at DESC LIMIT ${} OFFSET ${}"#, + where_clause, + param_index, + param_index + 1 + ); params.push(limit.into()); params.push(offset.into()); @@ -40,46 +73,147 @@ impl RoomService { for row in rows { let sender_type_str = row.try_get::("", "sender_type").unwrap_or_default(); let sender_type = match sender_type_str.as_str() { - "user" => models::rooms::MessageSenderType::User, "ai" => models::rooms::MessageSenderType::Ai, "system" => models::rooms::MessageSenderType::System, "tool" => models::rooms::MessageSenderType::Tool, "webhook" => models::rooms::MessageSenderType::Webhook, _ => models::rooms::MessageSenderType::User, + "user" => models::rooms::MessageSenderType::User, + "ai" => models::rooms::MessageSenderType::Ai, + "system" => models::rooms::MessageSenderType::System, + "tool" => models::rooms::MessageSenderType::Tool, + "webhook" => models::rooms::MessageSenderType::Webhook, + _ => models::rooms::MessageSenderType::User, }; - let content_type_str = row.try_get::("", "content_type").unwrap_or_default(); + let content_type_str = row + .try_get::("", "content_type") + .unwrap_or_default(); let content_type = match content_type_str.as_str() { - "image" => models::rooms::MessageContentType::Image, "audio" => models::rooms::MessageContentType::Audio, "video" => models::rooms::MessageContentType::Video, "file" => models::rooms::MessageContentType::File, _ => models::rooms::MessageContentType::Text, + "image" => models::rooms::MessageContentType::Image, + "audio" => models::rooms::MessageContentType::Audio, + "video" => models::rooms::MessageContentType::Video, + "file" => models::rooms::MessageContentType::File, + _ => models::rooms::MessageContentType::Text, + }; + let msg = room_message::Model { + id: row.try_get::("", "id").unwrap_or_default(), + seq: row.try_get::("", "seq").unwrap_or_default(), + room: row.try_get::("", "room").unwrap_or_default(), + sender_type, + sender_id: row + .try_get::>("", "sender_id") + .ok() + .flatten(), + model_id: row.try_get::>("", "model_id").ok().flatten(), + thread: row + .try_get::>("", "thread") + .ok() + .flatten(), + in_reply_to: row + .try_get::>("", "in_reply_to") + .ok() + .flatten(), + content: row.try_get::("", "content").unwrap_or_default(), + content_type, + thinking_content: None, + edited_at: row + .try_get::>("", "edited_at") + .ok() + .flatten(), + send_at: row + .try_get::("", "send_at") + .unwrap_or_default(), + revoked: row + .try_get::>("", "revoked") + .ok() + .flatten(), + revoked_by: row + .try_get::>("", "revoked_by") + .ok() + .flatten(), + content_tsv: None, }; - let msg = room_message::Model { id: row.try_get::("", "id").unwrap_or_default(), seq: row.try_get::("", "seq").unwrap_or_default(), room: row.try_get::("", "room").unwrap_or_default(), sender_type, sender_id: row.try_get::>("", "sender_id").ok().flatten(), model_id: row.try_get::>("", "model_id").ok().flatten(), thread: row.try_get::>("", "thread").ok().flatten(), in_reply_to: row.try_get::>("", "in_reply_to").ok().flatten(), content: row.try_get::("", "content").unwrap_or_default(), content_type, thinking_content: None, edited_at: row.try_get::>("", "edited_at").ok().flatten(), send_at: row.try_get::("", "send_at").unwrap_or_default(), revoked: row.try_get::>("", "revoked").ok().flatten(), revoked_by: row.try_get::>("", "revoked_by").ok().flatten(), content_tsv: None }; - let highlighted_content = row.try_get::("", "highlighted_content").unwrap_or_else(|_| msg.content.clone()); + let highlighted_content = row + .try_get::("", "highlighted_content") + .unwrap_or_else(|_| msg.content.clone()); let mut msg_with_name = self.resolve_display_name(msg.clone(), room_id).await; msg_with_name.highlighted_content = Some(highlighted_content); results.push(msg_with_name); } - let mut count_conditions = vec!["room = $1".to_string(), "content_tsv @@ plainto_tsquery('simple', $2)".to_string(), "revoked IS NULL".to_string()]; + let mut count_conditions = vec![ + "room = $1".to_string(), + "content_tsv @@ plainto_tsquery('simple', $2)".to_string(), + "revoked IS NULL".to_string(), + ]; let mut count_params: Vec = vec![room_id.into(), request.q.trim().into()]; let mut count_param_idx = 3; - if let Some(st) = request.start_time { count_conditions.push(format!("send_at >= ${}", count_param_idx)); count_params.push(st.into()); count_param_idx += 1; } - if let Some(et) = request.end_time { count_conditions.push(format!("send_at <= ${}", count_param_idx)); count_params.push(et.into()); count_param_idx += 1; } - if let Some(sid) = request.sender_id { count_conditions.push(format!("sender_id = ${}", count_param_idx)); count_params.push(sid.into()); count_param_idx += 1; } - if let Some(ref ct) = request.content_type { count_conditions.push(format!("content_type = ${}", count_param_idx)); count_params.push(ct.clone().into()); } + if let Some(st) = request.start_time { + count_conditions.push(format!("send_at >= ${}", count_param_idx)); + count_params.push(st.into()); + count_param_idx += 1; + } + if let Some(et) = request.end_time { + count_conditions.push(format!("send_at <= ${}", count_param_idx)); + count_params.push(et.into()); + count_param_idx += 1; + } + if let Some(sid) = request.sender_id { + count_conditions.push(format!("sender_id = ${}", count_param_idx)); + count_params.push(sid.into()); + count_param_idx += 1; + } + if let Some(ref ct) = request.content_type { + count_conditions.push(format!("content_type = ${}", count_param_idx)); + count_params.push(ct.clone().into()); + } - let count_sql = format!("SELECT COUNT(*) AS count FROM room_message WHERE {}", count_conditions.join(" AND ")); - let count_stmt = Statement::from_sql_and_values(DbBackend::Postgres, &count_sql, count_params); - let total: i64 = self.db.query_one_raw(count_stmt).await?.and_then(|r| r.try_get::("", "count").ok()).unwrap_or(0); + let count_sql = format!( + "SELECT COUNT(*) AS count FROM room_message WHERE {}", + count_conditions.join(" AND ") + ); + let count_stmt = + Statement::from_sql_and_values(DbBackend::Postgres, &count_sql, count_params); + let total: i64 = self + .db + .query_one_raw(count_stmt) + .await? + .and_then(|r| r.try_get::("", "count").ok()) + .unwrap_or(0); - Ok(super::reaction::MessageSearchResponse { messages: results, total }) + Ok(super::reaction::MessageSearchResponse { + messages: results, + total, + }) } - pub async fn room_message_edit_history(&self, room_id: Uuid, message_id: Uuid, ctx: &WsUserContext) -> Result { + pub async fn room_message_edit_history( + &self, + room_id: Uuid, + message_id: Uuid, + ctx: &WsUserContext, + ) -> Result { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; - let _msg = room_message::Entity::find_by_id(message_id).one(&self.db).await? + let _msg = room_message::Entity::find_by_id(message_id) + .one(&self.db) + .await? .ok_or_else(|| RoomError::NotFound("Message not found".to_string()))?; let history = models::rooms::room_message_edit_history::Entity::find() .filter(models::rooms::room_message_edit_history::Column::Message.eq(message_id)) - .order_by_asc(models::rooms::room_message_edit_history::Column::EditedAt).all(&self.db).await?; + .order_by_asc(models::rooms::room_message_edit_history::Column::EditedAt) + .all(&self.db) + .await?; let total_edits = history.len() as i64; - let entries: Vec = history.into_iter().map(|h| super::MessageEditHistoryEntry { old_content: h.old_content, new_content: h.new_content, edited_at: h.edited_at }).collect(); - Ok(super::MessageEditHistoryResponse { message_id, history: entries, total_edits }) + let entries: Vec = history + .into_iter() + .map(|h| super::MessageEditHistoryEntry { + old_content: h.old_content, + new_content: h.new_content, + edited_at: h.edited_at, + }) + .collect(); + Ok(super::MessageEditHistoryResponse { + message_id, + history: entries, + total_edits, + }) } } diff --git a/libs/room/src/search_write.rs b/libs/room/src/search_write.rs index 611ffeb..3b06d3a 100644 --- a/libs/room/src/search_write.rs +++ b/libs/room/src/search_write.rs @@ -7,15 +7,28 @@ use sea_orm::*; use uuid::Uuid; impl RoomService { - pub async fn room_message_reaction_list(&self, room_id: Uuid, message_id: Uuid, ctx: &WsUserContext) -> Result { + pub async fn room_message_reaction_list( + &self, + room_id: Uuid, + message_id: Uuid, + ctx: &WsUserContext, + ) -> Result { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; - let _msg = room_message::Entity::find_by_id(message_id).one(&self.db).await? + let _msg = room_message::Entity::find_by_id(message_id) + .one(&self.db) + .await? .ok_or_else(|| RoomError::NotFound("Message not found".to_string()))?; self.get_message_reactions(message_id, Some(user_id)).await } - pub async fn room_message_reaction_toggle(&self, room_id: Uuid, message_id: Uuid, emoji: String, ctx: &WsUserContext) -> Result { + pub async fn room_message_reaction_toggle( + &self, + room_id: Uuid, + message_id: Uuid, + emoji: String, + ctx: &WsUserContext, + ) -> Result { let user_id = ctx.user_id; self.require_room_access(room_id, user_id).await?; if emoji.is_empty() || emoji.len() > 50 { @@ -27,15 +40,24 @@ impl RoomService { .filter(room_message_reaction::Column::Message.eq(message_id)) .filter(room_message_reaction::Column::User.eq(user_id)) .filter(room_message_reaction::Column::Emoji.eq(&emoji)) - .one(&self.db).await? + .one(&self.db) + .await? { - room_message_reaction::Entity::delete_by_id(existing.id).exec(&self.db).await?; + room_message_reaction::Entity::delete_by_id(existing.id) + .exec(&self.db) + .await?; } else { room_message_reaction::ActiveModel { - id: Set(Uuid::now_v7()), room: Set(room_id), message: Set(message_id), - user: Set(user_id), emoji: Set(emoji), created_at: Set(Utc::now()), - }.insert(&self.db).await?; + id: Set(Uuid::now_v7()), + room: Set(room_id), + message: Set(message_id), + user: Set(user_id), + emoji: Set(emoji), + created_at: Set(Utc::now()), + } + .insert(&self.db) + .await?; } self.get_message_reactions(message_id, Some(user_id)).await } -} \ No newline at end of file +} diff --git a/libs/room/src/service/access.rs b/libs/room/src/service/access.rs index fb78ca1..7a41f24 100644 --- a/libs/room/src/service/access.rs +++ b/libs/room/src/service/access.rs @@ -1,7 +1,7 @@ use db::database::AppDatabase; +use models::projects::MemberRole; use models::projects::project_members; use models::rooms::{room, room_access, room_user_state}; -use models::projects::MemberRole; use sea_orm::*; use uuid::Uuid; @@ -65,11 +65,7 @@ pub async fn check_project_member( } /// Check if user is a project admin (Owner or Admin role). -pub async fn is_project_admin( - db: &AppDatabase, - project_id: Uuid, - user_id: Uuid, -) -> bool { +pub async fn is_project_admin(db: &AppDatabase, project_id: Uuid, user_id: Uuid) -> bool { let member = project_members::Entity::find() .filter(project_members::Column::Project.eq(project_id)) .filter(project_members::Column::User.eq(user_id)) @@ -98,11 +94,7 @@ pub async fn require_project_admin( } /// Check if user can admin a room (project admin OR room creator). -pub async fn is_room_admin( - db: &AppDatabase, - room_id: Uuid, - user_id: Uuid, -) -> bool { +pub async fn is_room_admin(db: &AppDatabase, room_id: Uuid, user_id: Uuid) -> bool { let room_model = room::Entity::find_by_id(room_id) .one(db) .await @@ -176,4 +168,4 @@ pub async fn find_room_or_404(db: &AppDatabase, room_id: Uuid) -> Result Result { self.require_room_access(room_id, ctx.user_id).await?; - let state = crate::service::access::get_or_create_room_user_state( - &self.db, room_id, ctx.user_id, - ) - .await?; + let state = + crate::service::access::get_or_create_room_user_state(&self.db, room_id, ctx.user_id) + .await?; let mut active: room_user_state::ActiveModel = state.into(); active.last_read_seq = Set(Some(last_read_seq)); @@ -145,10 +148,9 @@ impl RoomService { ) -> Result { self.require_room_access(room_id, ctx.user_id).await?; - let state = crate::service::access::get_or_create_room_user_state( - &self.db, room_id, ctx.user_id, - ) - .await?; + let state = + crate::service::access::get_or_create_room_user_state(&self.db, room_id, ctx.user_id) + .await?; let mut active: room_user_state::ActiveModel = state.into(); if let Some(dnd) = request.do_not_disturb { @@ -178,4 +180,4 @@ impl RoomService { joined_at: updated.joined_at, }) } -} \ No newline at end of file +} diff --git a/libs/room/src/service/ai_mode_streaming_post.rs b/libs/room/src/service/ai_mode_streaming_post.rs index 490f015..80fc75b 100644 --- a/libs/room/src/service/ai_mode_streaming_post.rs +++ b/libs/room/src/service/ai_mode_streaming_post.rs @@ -2,10 +2,10 @@ use chrono::Utc; use db::database::AppDatabase; use models::rooms::{room_ai, room_message}; use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; -use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set}; +use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr}; use uuid::Uuid; -use super::ai_mode_streaming_steps::{lock_or_recover, ModeStreamingState}; +use super::ai_mode_streaming_steps::{ModeStreamingState, lock_or_recover}; use crate::connection::RoomConnectionManager; use agent::chat::normalize_thinking_content; diff --git a/libs/room/src/service/ai_nonstreaming.rs b/libs/room/src/service/ai_nonstreaming.rs index ca3fc1b..2dd27a7 100644 --- a/libs/room/src/service/ai_nonstreaming.rs +++ b/libs/room/src/service/ai_nonstreaming.rs @@ -5,7 +5,7 @@ use db::cache::AppCache; use db::database::AppDatabase; use models::rooms::room_ai; use queue::MessageProducer; -use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter}; +use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, sea_query::Expr}; use uuid::Uuid; use super::ai_common::create_and_publish_ai_message; diff --git a/libs/room/src/service/ai_react_nonstreaming.rs b/libs/room/src/service/ai_react_nonstreaming.rs index 6b526c4..91db077 100644 --- a/libs/room/src/service/ai_react_nonstreaming.rs +++ b/libs/room/src/service/ai_react_nonstreaming.rs @@ -5,7 +5,7 @@ use db::cache::AppCache; use db::database::AppDatabase; use models::rooms::room_ai; use queue::MessageProducer; -use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter}; +use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, sea_query::Expr}; use uuid::Uuid; use crate::connection::RoomConnectionManager; @@ -32,8 +32,11 @@ pub async fn process_message_ai_react_nonstreaming( let final_answer = chat_service .process_react_room( - &request, |_step| async move {}, - room_tools, Some(&room_preamble), Some(queue.clone()), + &request, + |_step| async move {}, + room_tools, + Some(&room_preamble), + Some(queue.clone()), ) .await; diff --git a/libs/room/src/service/ai_react_streaming.rs b/libs/room/src/service/ai_react_streaming.rs index 573cd2d..05817b9 100644 --- a/libs/room/src/service/ai_react_streaming.rs +++ b/libs/room/src/service/ai_react_streaming.rs @@ -58,7 +58,9 @@ pub async fn process_message_ai_react_streaming( action: "start".to_string(), sender_type: Some("ai".to_string()), }; - room_manager.broadcast_typing(room_id, typing_start.clone()).await; + room_manager + .broadcast_typing(room_id, typing_start.clone()) + .await; let (typing_cancel_tx, mut typing_cancel_rx) = tokio::sync::oneshot::channel::<()>(); let typing_renew_handle = tokio::spawn({ @@ -95,9 +97,15 @@ pub async fn process_message_ai_react_streaming( true, // suppress_answer_broadcast: room mode — AI must use send_message ); - let result = chat_service.process_react_room( - &request, callback, room_tools, Some(&room_preamble), Some(queue.clone()), - ).await; + let result = chat_service + .process_react_room( + &request, + callback, + room_tools, + Some(&room_preamble), + Some(queue.clone()), + ) + .await; // In room mode, suppress final answer posting — AI communicates via send_message tool. finalize_react_stream( diff --git a/libs/room/src/service/ai_react_streaming_post.rs b/libs/room/src/service/ai_react_streaming_post.rs index 7fdacf5..d5ffcc7 100644 --- a/libs/room/src/service/ai_react_streaming_post.rs +++ b/libs/room/src/service/ai_react_streaming_post.rs @@ -2,10 +2,10 @@ use chrono::Utc; use db::database::AppDatabase; use models::rooms::{room_ai, room_message}; use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; -use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set}; +use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr}; use uuid::Uuid; -use super::ai_react_streaming_steps::{lock_or_recover, ReactStreamingState}; +use super::ai_react_streaming_steps::{ReactStreamingState, lock_or_recover}; use super::sequence::next_room_message_seq_internal; use crate::connection::RoomConnectionManager; use agent::chat::normalize_thinking_content; @@ -32,7 +32,9 @@ pub(crate) async fn finalize_react_stream( let final_event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, - seq: state.chunk_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed), + seq: state + .chunk_seq + .fetch_add(1, std::sync::atomic::Ordering::Relaxed), content: final_stream_content.clone(), done: true, error: None, @@ -51,9 +53,9 @@ pub(crate) async fn finalize_react_stream( // doesn't cross .await points (MutexGuard is not Send). let used_send_message = { let steps = lock_or_recover(&state.steps); - steps.iter().any(|(t, c)| { - t == "tool_call" && c.contains("\"name\":\"send_message\"") - }) + steps + .iter() + .any(|(t, c)| t == "tool_call" && c.contains("\"name\":\"send_message\"")) }; if !used_send_message && !final_stream_content.trim().is_empty() { @@ -112,22 +114,27 @@ pub(crate) async fn finalize_react_stream( if let Err(e) = queue.publish(room_id, envelope).await { tracing::error!(error = %e, "Failed to publish auto-send room message"); } else { - room_manager.broadcast(room_id, queue::RoomMessageEvent { - id: msg_id, - room_id, - sender_type: "ai".to_string(), - sender_id: Some(model_id), - thread_id: None, - content: final_stream_content.clone(), - content_type: "text".to_string(), - thinking_content: None, - send_at: now, - seq: msg_seq, - in_reply_to: None, - display_name: Some(ai_display_name.to_string()), - reactions: None, - message_id: None, - }).await; + room_manager + .broadcast( + room_id, + queue::RoomMessageEvent { + id: msg_id, + room_id, + sender_type: "ai".to_string(), + sender_id: Some(model_id), + thread_id: None, + content: final_stream_content.clone(), + content_type: "text".to_string(), + thinking_content: None, + send_at: now, + seq: msg_seq, + in_reply_to: None, + display_name: Some(ai_display_name.to_string()), + reactions: None, + message_id: None, + }, + ) + .await; room_manager.metrics.messages_sent.increment(1); let project_event = ProjectRoomEvent { @@ -139,7 +146,9 @@ pub(crate) async fn finalize_react_stream( seq: Some(msg_seq), timestamp: now, }; - queue.publish_project_room_event(project_id, project_event).await; + queue + .publish_project_room_event(project_id, project_event) + .await; } } @@ -147,7 +156,10 @@ pub(crate) async fn finalize_react_stream( if result.is_ok() { let now = chrono::Utc::now(); if let Err(e) = room_ai::Entity::update_many() - .col_expr(room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1)) + .col_expr( + room_ai::Column::CallCount, + Expr::col(room_ai::Column::CallCount).add(1), + ) .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) .filter(room_ai::Column::Room.eq(room_id)) .filter(room_ai::Column::Model.eq(model_id)) @@ -272,7 +284,10 @@ pub(crate) async fn finalize_react_stream( } else { let now = Utc::now(); if let Err(e) = room_ai::Entity::update_many() - .col_expr(room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1)) + .col_expr( + room_ai::Column::CallCount, + Expr::col(room_ai::Column::CallCount).add(1), + ) .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) .filter(room_ai::Column::Room.eq(room_id)) .filter(room_ai::Column::Model.eq(model_id)) diff --git a/libs/room/src/service/ai_react_streaming_steps.rs b/libs/room/src/service/ai_react_streaming_steps.rs index 8ee1d3f..4b300c9 100644 --- a/libs/room/src/service/ai_react_streaming_steps.rs +++ b/libs/room/src/service/ai_react_streaming_steps.rs @@ -7,7 +7,9 @@ use crate::connection::RoomConnectionManager; use agent::react::ReactStep; pub(crate) fn lock_or_recover(mutex: &std::sync::Mutex) -> std::sync::MutexGuard<'_, T> { - mutex.lock().unwrap_or_else(|poisoned| poisoned.into_inner()) + mutex + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) } pub(crate) struct ReactStreamingState { diff --git a/libs/room/src/service/ai_service.rs b/libs/room/src/service/ai_service.rs index fd47152..4d0b515 100644 --- a/libs/room/src/service/ai_service.rs +++ b/libs/room/src/service/ai_service.rs @@ -58,10 +58,8 @@ impl RoomAiService { return Ok(false); } - let model_ids: std::collections::HashSet = ai_configs - .iter() - .map(|c| c.model.to_string()) - .collect(); + let model_ids: std::collections::HashSet = + ai_configs.iter().map(|c| c.model.to_string()).collect(); for cap in mention_bracket_re().captures_iter(content) { if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { @@ -147,9 +145,11 @@ impl RoomAiService { content.hash(&mut hasher); let idemp_key = format!("ai:idempot:{}:{}", room_id, hasher.finish()); { - let mut conn = self.cache.conn().await.map_err(|e| { - RoomError::Internal(format!("cache conn: {}", e)) - })?; + let mut conn = self + .cache + .conn() + .await + .map_err(|e| RoomError::Internal(format!("cache conn: {}", e)))?; let exists = redis::cmd("SET") .arg(&idemp_key) .arg("1") @@ -199,15 +199,15 @@ impl RoomAiService { let user_names = history::get_user_names(&self.db, &user_ids).await; - let mentions = - history::extract_mention_context(&self.db, room.project, content).await; + let mentions = history::extract_mention_context(&self.db, room.project, content).await; - let context_setting = models::projects::project_context_setting::Entity::find_by_id(project.id) - .one(&self.db) - .await - .map_err(|_| ()) - .ok() - .and_then(|x| x); + let context_setting = + models::projects::project_context_setting::Entity::find_by_id(project.id) + .one(&self.db) + .await + .map_err(|_| ()) + .ok() + .and_then(|x| x); // Build room-only tool registry (send_message, retract_message) let mut room_tools = ToolRegistry::new(); @@ -221,35 +221,31 @@ impl RoomAiService { .await .ok() .flatten() - .map(|m| m.scope_role().map(|r| r.to_string()).unwrap_or_else(|_| "guest".into())) + .map(|m| { + m.scope_role() + .map(|r| r.to_string()) + .unwrap_or_else(|_| "guest".into()) + }) .unwrap_or_else(|| "guest".into()); - // Build room preamble: room identity, sender info, permissions, history - let room_preamble = build_room_preamble( - &room, - &project, - &model, - &sender, - &sender_role, - &history_messages, - &user_names, - ); + let max_tokens = ai_config.max_tokens.unwrap_or(4096) as i32; - let request = AiRequest { + let mut request = AiRequest { db: self.db.clone(), cache: self.cache.clone(), config: self.config.clone(), - model, + model: model.clone(), project: project.clone(), context_setting, - sender, + sender: sender.clone(), room: room.clone(), input: content.to_string(), mention: mentions, history: history_messages, + history_cutoff_seq: None, user_names, temperature: ai_config.temperature.unwrap_or(0.7), - max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32, + max_tokens, top_p: 1.0, frequency_penalty: 0.0, presence_penalty: 0.0, @@ -258,37 +254,73 @@ impl RoomAiService { max_tool_depth: 1000, }; + let (optimized_history, cutoff_seq) = chat_service + .build_room_optimized_context_text(&request) + .await + .unwrap_or_else(|e| { + tracing::warn!(error = %e, "room optimized context failed; using recent history"); + (String::new(), None) + }); + request.history_cutoff_seq = cutoff_seq; + + // Build room preamble: room identity, sender info, permissions, optimized history + let room_preamble = build_room_preamble( + &room, + &project, + &model, + &sender, + &sender_role, + &optimized_history, + ); + // Pre-flight balance check: verify the project + user can afford at least a minimal AI call - let balance_ok = agent::billing::check_balance( - &self.db, project.id, sender_id, model_id, 500, 250, - ).await; + let balance_ok = + agent::billing::check_balance(&self.db, project.id, sender_id, model_id, 500, 250) + .await; match balance_ok { - Ok(true) => {}, + Ok(true) => {} Ok(false) => { tracing::warn!(project_id = %project.id, user_id = %sender_id, "Insufficient balance for AI call"); // Persist the billing error let _ = agent::billing::persist_billing_error( - &self.db, "project", project.id, "insufficient_balance", - &format!("Insufficient balance. Project {} and user {} have no remaining funds.", project.id, sender_id), + &self.db, + "project", + project.id, + "insufficient_balance", + &format!( + "Insufficient balance. Project {} and user {} have no remaining funds.", + project.id, sender_id + ), Some(serde_json::json!({ "user_id": sender_id.to_string(), "model_id": model_id.to_string(), "project_id": project.id.to_string(), })), - ).await; + ) + .await; // Send the billing error as a visible message in the room - let error_content = format!("⚠️ Billing Error: Insufficient balance. Your project and personal account do not have enough funds to process this AI request. Please add credits to continue using AI features."); + let error_content = format!( + "⚠️ Billing Error: Insufficient balance. Your project and personal account do not have enough funds to process this AI request. Please add credits to continue using AI features." + ); let _ = super::ai_common::create_and_publish_ai_message( - &self.db, &self.cache, &self.queue, &self.room_manager, - room_id, project.id, Uuid::nil(), error_content, - model_id, Some("System".to_string()), - ).await; + &self.db, + &self.cache, + &self.queue, + &self.room_manager, + room_id, + project.id, + Uuid::nil(), + error_content, + model_id, + Some("System".to_string()), + ) + .await; return Ok(()); - }, + } Err(e) => { tracing::warn!(error = %e, "Balance check failed, proceeding without pre-flight check"); } @@ -299,18 +331,36 @@ impl RoomAiService { // Dispatch to ReAct streaming or nonstreaming with room tools and preamble if use_streaming { ai_react_streaming::process_message_ai_react_streaming( - chat_service, request, room_id, room.project, model_id, - lock_guard, self.db.clone(), self.cache.clone(), - self.queue.clone(), self.room_manager.clone(), - room_tools, room_preamble, - ).await; + chat_service, + request, + room_id, + room.project, + model_id, + lock_guard, + self.db.clone(), + self.cache.clone(), + self.queue.clone(), + self.room_manager.clone(), + room_tools, + room_preamble, + ) + .await; } else { ai_react_nonstreaming::process_message_ai_react_nonstreaming( - chat_service, request, room_id, room.project, model_id, - lock_guard, self.db.clone(), self.cache.clone(), - self.queue.clone(), self.room_manager.clone(), - room_tools, room_preamble, - ).await; + chat_service, + request, + room_id, + room.project, + model_id, + lock_guard, + self.db.clone(), + self.cache.clone(), + self.queue.clone(), + self.room_manager.clone(), + room_tools, + room_preamble, + ) + .await; } Ok(()) @@ -328,8 +378,7 @@ fn build_room_preamble( model: &models::agents::model::Model, sender: &models::users::user::Model, sender_role: &str, - history: &[models::rooms::room_message::Model], - user_names: &std::collections::HashMap, + optimized_history: &str, ) -> String { let mut preamble = String::new(); @@ -351,9 +400,7 @@ fn build_room_preamble( - **Name:** {}\n\ - **Model ID:** `{}`\n\ You are an AI assistant in this room. When referring to yourself, use your name **{}**.\n", - model.name, - model.id, - model.name, + model.name, model.id, model.name, )); // Sender info and permissions @@ -361,29 +408,16 @@ fn build_room_preamble( "\n### Who Mentioned You\n\ - **User:** {} (ID: `{}`)\n\ - **Project Role:** {}\n", - sender.username, - sender.uid, - sender_role, + sender.username, sender.uid, sender_role, )); if let Some(ref display_name) = sender.display_name { preamble.push_str(&format!("- **Display Name:** {}\n", display_name)); } - // Recent history (sliding window) - if !history.is_empty() { - preamble.push_str(&format!( - "\n### Recent Conversation (last {} messages)\n", - history.len() - )); - for msg in history.iter().rev().take(20) { - let author = msg - .sender_id - .and_then(|uid| user_names.get(&uid)) - .cloned() - .unwrap_or_else(|| "unknown".into()); - let content = msg.content.clone(); - preamble.push_str(&format!("- **{}**: {}\n", author, content)); - } + if !optimized_history.trim().is_empty() { + preamble.push_str("\n"); + preamble.push_str(optimized_history); + preamble.push_str("\n"); } // Append room communication rules diff --git a/libs/room/src/service/ai_streaming.rs b/libs/room/src/service/ai_streaming.rs index d0b4457..bf29ff3 100644 --- a/libs/room/src/service/ai_streaming.rs +++ b/libs/room/src/service/ai_streaming.rs @@ -7,13 +7,13 @@ use db::cache::AppCache; use db::database::AppDatabase; use models::rooms::{room_ai, room_message}; use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; -use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set}; +use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr}; use tokio::sync::Mutex; use uuid::Uuid; use super::sequence::next_room_message_seq_internal; use crate::connection::RoomConnectionManager; -use agent::chat::{normalize_thinking_content, AiChunkType, AiRequest, ChatService}; +use agent::chat::{AiChunkType, AiRequest, ChatService, normalize_thinking_content}; pub async fn process_message_ai_streaming( chat_service: Arc, @@ -121,9 +121,8 @@ pub async fn process_message_ai_streaming( buf.push_str(&chunk.content); // Flush on natural boundaries: newlines, or when content buffer grows large. // Small tokens (< 6 chars without newline) accumulate until boundary. - let should_flush = chunk.content.contains('\n') - || buf.len() >= 60 - || chunk.done; + let should_flush = + chunk.content.contains('\n') || buf.len() >= 60 || chunk.done; if should_flush { let content = std::mem::take(&mut *buf); buf.clear(); @@ -154,7 +153,8 @@ pub async fn process_message_ai_streaming( }; let thinking_content = { let mut buf = thinking_buf.lock().await; - let content = normalize_thinking_content(&std::mem::take(&mut *buf)); + let content = + normalize_thinking_content(&std::mem::take(&mut *buf)); buf.clear(); content }; @@ -221,7 +221,9 @@ pub async fn process_message_ai_streaming( action: "start".to_string(), sender_type: Some("ai".to_string()), }; - room_manager.broadcast_typing(room_id_inner, typing_start.clone()).await; + room_manager + .broadcast_typing(room_id_inner, typing_start.clone()) + .await; let (typing_cancel_tx, typing_cancel_rx) = tokio::sync::oneshot::channel::<()>(); let typing_renew_handle = tokio::spawn({ @@ -320,10 +322,7 @@ pub async fn process_message_ai_streaming( room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1), ) - .col_expr( - room_ai::Column::LastCallAt, - Expr::value(Some(now)), - ) + .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) .filter(room_ai::Column::Room.eq(room_id_inner)) .filter(room_ai::Column::Model.eq(model_id)) .exec(&db) @@ -363,7 +362,9 @@ pub async fn process_message_ai_streaming( action: "stop".to_string(), sender_type: Some("ai".to_string()), }; - room_manager.broadcast_typing(room_id_inner, typing_stop).await; + room_manager + .broadcast_typing(room_id_inner, typing_stop) + .await; let event = ProjectRoomEvent { event_type: crate::RoomEventType::NewMessage.as_str().into(), @@ -391,7 +392,9 @@ pub async fn process_message_ai_streaming( action: "stop".to_string(), sender_type: Some("ai".to_string()), }; - room_manager.broadcast_typing(room_id_inner, typing_stop).await; + room_manager + .broadcast_typing(room_id_inner, typing_stop) + .await; let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, diff --git a/libs/room/src/service/mod.rs b/libs/room/src/service/mod.rs index 0f9e846..172cd30 100644 --- a/libs/room/src/service/mod.rs +++ b/libs/room/src/service/mod.rs @@ -24,18 +24,22 @@ mod workers_spawn; pub use patterns::{mention_bracket_re, mention_tag_re, user_mention_re}; -pub use access::{check_room_access, check_project_member, require_room_access, require_room_admin, require_project_admin, is_room_admin, is_project_admin, find_room_or_404, get_or_create_room_user_state}; +pub use access::{ + check_project_member, check_room_access, find_room_or_404, get_or_create_room_user_state, + is_project_admin, is_room_admin, require_project_admin, require_room_access, + require_room_admin, +}; pub use ai_common::create_and_publish_ai_message; -pub use ai_service::RoomAiService; pub use ai_nonstreaming::process_message_ai_nonstreaming; pub use ai_react_nonstreaming::process_message_ai_react_nonstreaming; pub use ai_react_streaming::process_message_ai_react_streaming; +pub use ai_service::RoomAiService; pub use ai_streaming::process_message_ai_streaming; -pub use history::{get_room_history, get_user_names, get_room_ai_config, extract_mention_context}; +pub use history::{extract_mention_context, get_room_ai_config, get_room_history, get_user_names}; pub use mentions::extract_mentions; pub use notifications::{notify_project_members, publish_room_event}; pub use sequence::next_room_message_seq_internal; -pub use workers::{start_workers, PushNotificationFn}; +pub use workers::{PushNotificationFn, start_workers}; pub use workers_spawn::{spawn_agent_task, spawn_room_workers, unmark_room_spawned}; use std::sync::Arc; @@ -49,12 +53,12 @@ use queue::{MessageProducer, ProjectRoomEvent}; use sea_orm::{ColumnTrait, QueryFilter}; use uuid::Uuid; -use crate::connection::{RoomConnectionManager, DedupCache}; +use crate::connection::{DedupCache, RoomConnectionManager}; use crate::error::RoomError; use crate::presence::PresenceStore; +use agent::TaskService; use agent::chat::ChatService; use agent::embed::EmbedService; -use agent::TaskService; use models::agent_task::AgentType; const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024; @@ -91,8 +95,10 @@ impl RoomService { push_fn: Option, embed_service: Option>, ) -> Self { - let dedup_cache: DedupCache = - Arc::new(dashmap::DashMap::with_capacity_and_hasher(10000, Default::default())); + let dedup_cache: DedupCache = Arc::new(dashmap::DashMap::with_capacity_and_hasher( + 10000, + Default::default(), + )); let ai_service = RoomAiService::new( db.clone(), cache.clone(), @@ -237,20 +243,30 @@ impl RoomService { if type_m.as_str() == "user" { let id = id_m.as_str().trim(); if let Ok(uuid) = Uuid::parse_str(id) { - if !resolved.contains(&uuid) { resolved.push(uuid); } + if !resolved.contains(&uuid) { + resolved.push(uuid); + } } else if let Some(label_m) = cap.get(3) { let label = label_m.as_str().trim(); if !label.is_empty() { let label_lower = label.to_lowercase(); - if seen_usernames.contains(&label_lower) { continue; } + if seen_usernames.contains(&label_lower) { + continue; + } seen_usernames.push(label_lower.clone()); if let Some(user) = User::find() .filter(models::users::user::Column::Username.ilike(&label_lower)) - .one(&self.db).await.ok().flatten() + .one(&self.db) + .await + .ok() + .flatten() { - if !resolved.contains(&user.uid) { resolved.push(user.uid); } - } } + if !resolved.contains(&user.uid) { + resolved.push(user.uid); + } + } + } } } } @@ -300,7 +316,11 @@ impl RoomService { access::is_room_admin(&self.db, room_id, user_id).await } - pub async fn require_project_admin(&self, project_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { + pub async fn require_project_admin( + &self, + project_id: Uuid, + user_id: Uuid, + ) -> Result<(), RoomError> { access::require_project_admin(&self.db, project_id, user_id).await } @@ -322,18 +342,20 @@ impl RoomService { if event.is_some() { // Broadcast to project subscribers if let Some(pid) = project_id { - self.room_manager.broadcast_project( - pid, - queue::ProjectRoomEvent { - event_type: "presence_changed".into(), - project_id: pid, - room_id: None, - category_id: None, - message_id: None, - seq: None, - timestamp: chrono::Utc::now(), - }, - ).await; + self.room_manager + .broadcast_project( + pid, + queue::ProjectRoomEvent { + event_type: "presence_changed".into(), + project_id: pid, + room_id: None, + category_id: None, + message_id: None, + seq: None, + timestamp: chrono::Utc::now(), + }, + ) + .await; } } event @@ -347,7 +369,8 @@ impl RoomService { text: Option, expires_at: Option>, ) -> Option { - self.presence.set_custom_status(user_id, emoji, text, expires_at) + self.presence + .set_custom_status(user_id, emoji, text, expires_at) } /// Get all presence entries for a project. @@ -365,20 +388,22 @@ impl RoomService { if event.is_some() { // Broadcast to project subscribers if let Some(pid) = project_id { - self.room_manager.broadcast_project( - pid, - queue::ProjectRoomEvent { - event_type: "presence_changed".into(), - project_id: pid, - room_id: None, - category_id: None, - message_id: None, - seq: None, - timestamp: chrono::Utc::now(), - }, - ).await; + self.room_manager + .broadcast_project( + pid, + queue::ProjectRoomEvent { + event_type: "presence_changed".into(), + project_id: pid, + room_id: None, + category_id: None, + message_id: None, + seq: None, + timestamp: chrono::Utc::now(), + }, + ) + .await; } } event } -} \ No newline at end of file +} diff --git a/libs/room/src/service/process_ai.rs b/libs/room/src/service/process_ai.rs index 5195c9f..ec650f4 100644 --- a/libs/room/src/service/process_ai.rs +++ b/libs/room/src/service/process_ai.rs @@ -1,10 +1,10 @@ use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use uuid::Uuid; +use super::RoomService; use super::ai_react_nonstreaming; use super::ai_react_streaming; use super::history; -use super::RoomService; use crate::error::RoomError; use crate::service::{mention_bracket_re, mention_tag_re}; use agent::chat::AiRequest; @@ -79,12 +79,13 @@ impl RoomService { .await? .ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?; - let context_setting = models::projects::project_context_setting::Entity::find_by_id(project.id) - .one(&self.db) - .await - .map_err(|_| ()) - .ok() - .and_then(|x| x); + let context_setting = + models::projects::project_context_setting::Entity::find_by_id(project.id) + .one(&self.db) + .await + .map_err(|_| ()) + .ok() + .and_then(|x| x); let model = models::agents::model::Entity::find_by_id(model_id) .one(&self.db) @@ -119,34 +120,31 @@ impl RoomService { .await .ok() .flatten() - .map(|m| m.scope_role().map(|r| r.to_string()).unwrap_or_else(|_| "guest".into())) + .map(|m| { + m.scope_role() + .map(|r| r.to_string()) + .unwrap_or_else(|_| "guest".into()) + }) .unwrap_or_else(|| "guest".into()); - // Build room preamble: room identity, sender info, permissions, history - let room_preamble = build_room_preamble( - &room, - &project, - &sender, - &sender_role, - &history, - &user_names, - ); + let max_tokens = ai_config.max_tokens.unwrap_or(4096) as i32; - let request = AiRequest { + let mut request = AiRequest { db: self.db.clone(), cache: self.cache.clone(), config: self.config.clone(), model, project: project.clone(), context_setting, - sender, + sender: sender.clone(), room: room.clone(), input: content, mention: mentions, history, + history_cutoff_seq: None, user_names, temperature: ai_config.temperature.unwrap_or(0.7), - max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32, + max_tokens, top_p: 1.0, frequency_penalty: 0.0, presence_penalty: 0.0, @@ -155,6 +153,19 @@ impl RoomService { max_tool_depth: 1000, }; + let (optimized_history, cutoff_seq) = chat_service + .build_room_optimized_context_text(&request) + .await + .unwrap_or_else(|e| { + tracing::warn!(error = %e, "room optimized context failed; using recent history"); + (String::new(), None) + }); + request.history_cutoff_seq = cutoff_seq; + + // Build room preamble: room identity, sender info, permissions, optimized history + let room_preamble = + build_room_preamble(&room, &project, &sender, &sender_role, &optimized_history); + let use_streaming = ai_config.stream; // Dispatch to ReAct streaming or nonstreaming with room tools and preamble @@ -202,8 +213,7 @@ fn build_room_preamble( project: &models::projects::project::Model, sender: &models::users::user::Model, sender_role: &str, - history: &[models::rooms::room_message::Model], - user_names: &std::collections::HashMap, + optimized_history: &str, ) -> String { let mut preamble = String::new(); @@ -222,33 +232,16 @@ fn build_room_preamble( "\n### Who Mentioned You\n\ - **User:** {} (ID: `{}`)\n\ - **Project Role:** {}\n", - sender.username, - sender.uid, - sender_role, + sender.username, sender.uid, sender_role, )); if let Some(ref display_name) = sender.display_name { preamble.push_str(&format!("- **Display Name:** {}\n", display_name)); } - // Recent history (sliding window, last 20 messages) - if !history.is_empty() { - preamble.push_str(&format!( - "\n### Recent Conversation (last {} messages)\n", - history.len() - )); - for msg in history.iter().rev().take(20) { - let author = msg - .sender_id - .and_then(|uid| user_names.get(&uid)) - .cloned() - .unwrap_or_else(|| "unknown".into()); - let content = if msg.content.len() > 200 { - format!("{}...", &msg.content[..200]) - } else { - msg.content.clone() - }; - preamble.push_str(&format!("- **{}**: {}\n", author, content)); - } + if !optimized_history.trim().is_empty() { + preamble.push_str("\n"); + preamble.push_str(optimized_history); + preamble.push_str("\n"); } preamble.push_str(ROOM_CONTEXT_PROMPT); diff --git a/libs/room/src/service/type_convert.rs b/libs/room/src/service/type_convert.rs index 5fb8b5f..d3be804 100644 --- a/libs/room/src/service/type_convert.rs +++ b/libs/room/src/service/type_convert.rs @@ -1,9 +1,11 @@ use models::rooms::{ - room, room_ai, room_category, room_message, room_notifications, room_pin, - room_thread, + room, room_ai, room_category, room_message, room_notifications, room_pin, room_thread, }; -use crate::{RoomCategoryResponse, RoomResponse, RoomMessageResponse, RoomThreadResponse, RoomPinResponse, RoomAiResponse, NotificationResponse}; +use crate::{ + NotificationResponse, RoomAiResponse, RoomCategoryResponse, RoomMessageResponse, + RoomPinResponse, RoomResponse, RoomThreadResponse, +}; impl From for RoomCategoryResponse { fn from(value: room_category::Model) -> Self { @@ -135,4 +137,4 @@ impl From for NotificationResponse { expires_at: value.expires_at, } } -} \ No newline at end of file +} diff --git a/libs/room/src/service/validation.rs b/libs/room/src/service/validation.rs index dcf81e9..b3fd5d3 100644 --- a/libs/room/src/service/validation.rs +++ b/libs/room/src/service/validation.rs @@ -40,17 +40,26 @@ impl RoomService { pub(crate) fn parse_message_content_type( content_type: Option, ) -> Result { - match content_type.unwrap_or_else(|| "text".to_string()).to_lowercase().as_str() { + match content_type + .unwrap_or_else(|| "text".to_string()) + .to_lowercase() + .as_str() + { "text" => Ok(models::rooms::MessageContentType::Text), "image" => Ok(models::rooms::MessageContentType::Image), "audio" => Ok(models::rooms::MessageContentType::Audio), "video" => Ok(models::rooms::MessageContentType::Video), "file" => Ok(models::rooms::MessageContentType::File), - _ => Err(RoomError::BadRequest("invalid message content_type".to_string())), + _ => Err(RoomError::BadRequest( + "invalid message content_type".to_string(), + )), } } - pub async fn utils_find_project_by_name(&self, name: String) -> Result { + pub async fn utils_find_project_by_name( + &self, + name: String, + ) -> Result { match project::Entity::find() .filter(project::Column::Name.eq(name.clone())) .one(&self.db) @@ -84,7 +93,11 @@ impl RoomService { .ok_or_else(|| RoomError::NotFound("Project not found".to_string())) } - pub async fn check_project_access(&self, project_uid: Uuid, user_uid: Uuid) -> Result<(), RoomError> { + pub async fn check_project_access( + &self, + project_uid: Uuid, + user_uid: Uuid, + ) -> Result<(), RoomError> { let project = project::Entity::find_by_id(project_uid) .one(&self.db) .await @@ -103,20 +116,27 @@ impl RoomService { .one(&self.db) .await?; - if member.is_some() { Ok(()) } else { Err(RoomError::NoPower) } + if member.is_some() { + Ok(()) + } else { + Err(RoomError::NoPower) + } } pub async fn ensure_room_visible_for_user( - &self, room: &room::Model, user_id: Uuid, + &self, + room: &room::Model, + user_id: Uuid, ) -> Result<(), RoomError> { self.require_room_access(room.id, user_id).await } pub async fn get_room_version(&self, room_id: Uuid) -> Result { let version_key = format!("room:version:{}", room_id); - let mut conn = self.cache.conn().await.map_err(|e| { - RoomError::Internal(format!("failed to get redis for version: {}", e)) - })?; + let mut conn = + self.cache.conn().await.map_err(|e| { + RoomError::Internal(format!("failed to get redis for version: {}", e)) + })?; let version: Option = redis::cmd("GET") .arg(&version_key) .query_async(&mut conn) @@ -130,12 +150,14 @@ impl RoomService { } pub async fn raw_increment_room_version( - cache: &db::cache::AppCache, room_id: Uuid, + cache: &db::cache::AppCache, + room_id: Uuid, ) -> Result { let version_key = format!("room:version:{}", room_id); - let mut conn = cache.conn().await.map_err(|e| { - RoomError::Internal(format!("failed to get redis for version: {}", e)) - })?; + let mut conn = cache + .conn() + .await + .map_err(|e| RoomError::Internal(format!("failed to get redis for version: {}", e)))?; let version: i64 = redis::cmd("INCR") .arg(&version_key) .query_async(&mut conn) @@ -143,4 +165,4 @@ impl RoomService { .map_err(|e| RoomError::Internal(format!("version INCR: {}", e)))?; Ok(version) } -} \ No newline at end of file +} diff --git a/libs/room/src/service/workers.rs b/libs/room/src/service/workers.rs index 8d8fda4..03c9671 100644 --- a/libs/room/src/service/workers.rs +++ b/libs/room/src/service/workers.rs @@ -6,9 +6,10 @@ use queue::MessageProducer; use sea_orm::EntityTrait; use uuid::Uuid; -use crate::connection::{make_persist_fn, DedupCache, PersistFn, RoomConnectionManager}; +use crate::connection::{DedupCache, PersistFn, RoomConnectionManager, make_persist_fn}; -pub type PushNotificationFn = Arc, Option) + Send + Sync>; +pub type PushNotificationFn = + Arc, Option) + Send + Sync>; /// Start global workers (JetStream persist consumer + cleanup). Room broadcast /// subscriptions are spawned lazily when the first WS client subscribes to a room, @@ -99,4 +100,4 @@ pub async fn start_workers( } tracing::info!("room workers stopped"); Ok(()) -} \ No newline at end of file +} diff --git a/libs/room/src/types_responses.rs b/libs/room/src/types_responses.rs index 1f77641..1ee9104 100644 --- a/libs/room/src/types_responses.rs +++ b/libs/room/src/types_responses.rs @@ -1,9 +1,9 @@ +use crate::NotificationResponse; +use crate::NotificationType; +use crate::UserInfo; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::UserInfo; -use crate::NotificationType; -use crate::NotificationResponse; #[derive(Debug, Clone, Deserialize, Serialize, utoipa::ToSchema)] pub struct RoomCategoryCreateRequest { @@ -182,7 +182,9 @@ pub struct RoomMessageResponse { impl RoomMessageResponse { pub fn detect_chunked(thinking: &Option) -> bool { - thinking.as_ref().is_some_and(|s| s.contains("\"__chunks__\"")) + thinking + .as_ref() + .is_some_and(|s| s.contains("\"__chunks__\"")) } } @@ -280,4 +282,4 @@ pub struct NotificationListResponse { pub notifications: Vec, pub total: i64, pub unread_count: i64, -} \ No newline at end of file +}