From cec8d486f102a69b845303a8ee9e443edde6f8e4 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Mon, 20 Apr 2026 15:45:18 +0800 Subject: [PATCH] feat(room): update room lib (connection, helpers, member, message, notification, reaction, room, search, service, types) --- libs/room/src/connection.rs | 18 ++- libs/room/src/helpers.rs | 6 +- libs/room/src/lib.rs | 2 +- libs/room/src/member.rs | 63 +++++++++ libs/room/src/message.rs | 7 +- libs/room/src/notification.rs | 18 ++- libs/room/src/reaction.rs | 1 + libs/room/src/room.rs | 96 +++++++++++++ libs/room/src/search.rs | 249 ++++++++++++++++++++-------------- libs/room/src/service.rs | 28 +++- libs/room/src/types.rs | 27 +++- 11 files changed, 399 insertions(+), 116 deletions(-) diff --git a/libs/room/src/connection.rs b/libs/room/src/connection.rs index 9d5622d..545eb6e 100644 --- a/libs/room/src/connection.rs +++ b/libs/room/src/connection.rs @@ -10,7 +10,7 @@ use uuid::Uuid; use db::database::AppDatabase; use models::rooms::{MessageContentType, MessageSenderType, room_message}; use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent}; -use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set}; +use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set}; use crate::error::RoomError; use crate::metrics::RoomMetrics; @@ -720,6 +720,7 @@ pub fn make_persist_fn( room: Set(env.room_id), sender_type: Set(sender_type), sender_id: Set(env.sender_id), + model_id: Set(env.model_id), thread: Set(env.thread_id), content: Set(env.content.clone()), content_type: Set(content_type), @@ -736,6 +737,21 @@ pub fn make_persist_fn( room_message::Entity::insert_many(models_to_insert) .exec(&db) .await?; + + // Update content_tsv for inserted messages + for env in chunk.iter() { + let update_sql = format!( + "UPDATE room_message SET content_tsv = to_tsvector('simple', content) WHERE id = '{}'", + env.id + ); + let stmt = sea_orm::Statement::from_sql_and_values( + sea_orm::DbBackend::Postgres, + &update_sql, + vec![], + ); + let _ = db.execute_raw(stmt).await; + } + metrics.messages_persisted.increment(count); } } diff --git a/libs/room/src/helpers.rs b/libs/room/src/helpers.rs index b34bce8..fb22c61 100644 --- a/libs/room/src/helpers.rs +++ b/libs/room/src/helpers.rs @@ -73,6 +73,7 @@ impl From for super::RoomMessageResponse { revoked: value.revoked, revoked_by: value.revoked_by, in_reply_to: value.in_reply_to, + highlighted_content: None, } } } @@ -388,8 +389,8 @@ impl RoomService { let sender_type = msg.sender_type.to_string(); let display_name = match sender_type.as_str() { "ai" => { - if let Some(sender_id) = msg.sender_id { - ai_model::Entity::find_by_id(sender_id) + if let Some(mid) = msg.model_id { + ai_model::Entity::find_by_id(mid) .one(&self.db) .await .ok() @@ -429,6 +430,7 @@ impl RoomService { revoked: msg.revoked, revoked_by: msg.revoked_by, in_reply_to: msg.in_reply_to, + highlighted_content: None, } } } diff --git a/libs/room/src/lib.rs b/libs/room/src/lib.rs index 9738740..b112dd9 100644 --- a/libs/room/src/lib.rs +++ b/libs/room/src/lib.rs @@ -30,5 +30,5 @@ pub use draft_and_history::{ pub use error::RoomError; pub use metrics::RoomMetrics; pub use reaction::{MessageReactionsResponse, MessageSearchResponse}; -pub use service::RoomService; +pub use service::{RoomService, PushNotificationFn}; pub use types::{RoomEventType, *}; diff --git a/libs/room/src/member.rs b/libs/room/src/member.rs index cbffb79..2c4a641 100644 --- a/libs/room/src/member.rs +++ b/libs/room/src/member.rs @@ -9,6 +9,9 @@ use sea_orm::*; use uuid::Uuid; impl RoomService { + /// Cache TTL for member list (in seconds). + const MEMBER_LIST_CACHE_TTL: u64 = 30; + pub async fn room_member_list( &self, room_id: Uuid, @@ -17,6 +20,24 @@ impl RoomService { let user_id = ctx.user_id; self.require_room_member(room_id, user_id).await?; + // Try cache first + let cache_key = format!("room:members:{}", room_id); + + if let Ok(mut conn) = self.cache.conn().await { + if let Ok(Some(cached)) = redis::cmd("GET") + .arg(&cache_key) + .query_async::>(&mut conn) + .await + { + if let Ok(responses) = serde_json::from_str::>(&cached) { + slog::debug!(self.log, "room_member_list: cache hit for key={}", cache_key); + return Ok(responses); + } + } + } + + slog::debug!(self.log, "room_member_list: cache miss for key={}", cache_key); + let members = room_member::Entity::find() .filter(room_member::Column::Room.eq(room_id)) .all(&self.db) @@ -60,6 +81,23 @@ impl RoomService { dnd_end_hour: m.dnd_end_hour, }) .collect(); + + // Cache the result + if let Ok(mut conn) = self.cache.conn().await { + if let Ok(json) = serde_json::to_string(&responses) { + let _: Option = redis::cmd("SETEX") + .arg(&cache_key) + .arg(Self::MEMBER_LIST_CACHE_TTL) + .arg(&json) + .query_async(&mut conn) + .await + .inspect_err(|e| { + slog::warn!(self.log, "room_member_list: failed to cache key={}: {}", cache_key, e); + }) + .ok(); + } + } + Ok(responses) } @@ -121,6 +159,9 @@ impl RoomService { drop(self.room_manager.subscribe(room_id, request.user_id).await); + // Invalidate member list cache + self.invalidate_member_list_cache(room_id).await; + self.publish_room_event( room_model.project, super::RoomEventType::MemberJoined, @@ -198,6 +239,9 @@ impl RoomService { active.role = Set(new_role); let updated = active.update(&self.db).await?; + // Invalidate member list cache + self.invalidate_member_list_cache(room_id).await; + let room = self.find_room_or_404(room_id).await?; let _ = self .notification_create(super::NotificationCreateRequest { @@ -264,6 +308,9 @@ impl RoomService { .exec(&self.db) .await?; + // Invalidate member list cache + self.invalidate_member_list_cache(room_id).await; + self.room_manager.unsubscribe(room_id, user_id).await; let room = self.find_room_or_404(room_id).await?; @@ -367,4 +414,20 @@ impl RoomService { }; Ok(updated_response) } + + /// Invalidate member list cache for a room. + async fn invalidate_member_list_cache(&self, room_id: Uuid) { + let cache_key = format!("room:members:{}", room_id); + if let Ok(mut conn) = self.cache.conn().await { + if let Err(e) = redis::cmd("DEL") + .arg(&cache_key) + .query_async::(&mut conn) + .await + { + slog::warn!(self.log, "invalidate_member_list_cache: DEL failed for {}: {}", cache_key, e); + } else { + slog::debug!(self.log, "invalidate_member_list_cache: deleted {}", cache_key); + } + } + } } diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs index 381e20e..59c6e0e 100644 --- a/libs/room/src/message.rs +++ b/libs/room/src/message.rs @@ -44,7 +44,7 @@ impl RoomService { let ai_model_ids: Vec = models .iter() .filter(|m| m.sender_type.to_string() == "ai") - .filter_map(|m| m.sender_id) + .filter_map(|m| m.model_id) .collect(); let users: std::collections::HashMap = if !user_ids.is_empty() { @@ -78,7 +78,7 @@ impl RoomService { .map(|msg| { let sender_type = msg.sender_type.to_string(); let display_name = match sender_type.as_str() { - "ai" => msg.sender_id.and_then(|id| ai_names.get(&id).cloned()), + "ai" => msg.model_id.and_then(|id| ai_names.get(&id).cloned()), _ => msg.sender_id.and_then(|id| users.get(&id).cloned()), }; super::RoomMessageResponse { @@ -96,6 +96,7 @@ impl RoomService { send_at: msg.send_at, revoked: msg.revoked, revoked_by: msg.revoked_by, + highlighted_content: None, } }) .collect(); @@ -146,6 +147,7 @@ impl RoomService { room_id, sender_type: "member".to_string(), sender_id: Some(user_id), + model_id: None, thread_id, in_reply_to, content: content.clone(), @@ -275,6 +277,7 @@ impl RoomService { send_at: now, revoked: None, revoked_by: None, + highlighted_content: None, }) } diff --git a/libs/room/src/notification.rs b/libs/room/src/notification.rs index ee61d3f..b031fd8 100644 --- a/libs/room/src/notification.rs +++ b/libs/room/src/notification.rs @@ -30,6 +30,12 @@ impl RoomService { super::NotificationType::SystemAnnouncement => { room_notifications::NotificationType::SystemAnnouncement } + super::NotificationType::ProjectInvitation => { + room_notifications::NotificationType::ProjectInvitation + } + super::NotificationType::WorkspaceInvitation => { + room_notifications::NotificationType::WorkspaceInvitation + } }; let model = room_notifications::ActiveModel { @@ -261,10 +267,20 @@ impl RoomService { user_id: Uuid, notification: super::NotificationResponse, ) { - let event = super::NotificationEvent::new(notification); + let event = super::NotificationEvent::new(notification.clone()); self.room_manager .push_user_notification(user_id, Arc::new(event)) .await; + + // Also trigger Web Push for offline users + if let Some(push_fn) = &self.push_fn { + push_fn( + user_id, + notification.title.clone(), + notification.content.clone(), + None, // URL — could be derived from room/project + ); + } } fn unread_cache_key(user_id: Uuid) -> String { diff --git a/libs/room/src/reaction.rs b/libs/room/src/reaction.rs index eea5089..a91476f 100644 --- a/libs/room/src/reaction.rs +++ b/libs/room/src/reaction.rs @@ -325,6 +325,7 @@ impl RoomService { send_at: msg.send_at, revoked: msg.revoked, revoked_by: msg.revoked_by, + highlighted_content: None, } }) .collect() diff --git a/libs/room/src/room.rs b/libs/room/src/room.rs index 125e5fa..8f34fae 100644 --- a/libs/room/src/room.rs +++ b/libs/room/src/room.rs @@ -11,6 +11,9 @@ use sea_orm::*; use uuid::Uuid; impl RoomService { + /// Cache TTL for room list (in seconds). + const ROOM_LIST_CACHE_TTL: u64 = 60; + pub async fn room_list( &self, project_name: String, @@ -21,6 +24,29 @@ impl RoomService { let project = self.utils_find_project_by_name(project_name).await?; self.check_project_access(project.id, user_id).await?; + // Try cache first + let cache_key = format!( + "room:list:{}:{}:public={}", + project.id, + user_id, + only_public.unwrap_or(false) + ); + + if let Ok(mut conn) = self.cache.conn().await { + if let Ok(Some(cached)) = redis::cmd("GET") + .arg(&cache_key) + .query_async::>(&mut conn) + .await + { + if let Ok(responses) = serde_json::from_str::>(&cached) { + slog::debug!(self.log, "room_list: cache hit for key={}", cache_key); + return Ok(responses); + } + } + } + + slog::debug!(self.log, "room_list: cache miss for key={}", cache_key); + let mut query = room::Entity::find().filter(room::Column::Project.eq(project.id)); if only_public.unwrap_or(false) { query = query.filter(room::Column::Public.eq(true)); @@ -66,6 +92,22 @@ impl RoomService { responses.push(response); } + // Cache the result + if let Ok(mut conn) = self.cache.conn().await { + if let Ok(json) = serde_json::to_string(&responses) { + let _: Option = redis::cmd("SETEX") + .arg(&cache_key) + .arg(Self::ROOM_LIST_CACHE_TTL) + .arg(&json) + .query_async(&mut conn) + .await + .inspect_err(|e| { + slog::warn!(self.log, "room_list: failed to cache key={}: {}", cache_key, e); + }) + .ok(); + } + } + Ok(responses) } @@ -156,6 +198,9 @@ impl RoomService { txn.commit().await?; + // Invalidate room list cache for this project + self.invalidate_room_list_cache(project.id).await; + self.spawn_room_workers(room_model.id); let event = ProjectRoomEvent { @@ -232,6 +277,9 @@ impl RoomService { } let updated = active.update(&self.db).await?; + // Invalidate room list cache + self.invalidate_room_list_cache(updated.project).await; + if renamed { let event = ProjectRoomEvent { event_type: super::RoomEventType::RoomRenamed.as_str().into(), @@ -303,6 +351,9 @@ impl RoomService { txn.commit().await?; + // Invalidate room list cache + self.invalidate_room_list_cache(project_id).await; + self.room_manager.shutdown_room(room_id).await; // Clean up Redis seq key so re-creating the room starts fresh @@ -342,4 +393,49 @@ impl RoomService { Ok(()) } + + /// Invalidate all room list cache entries for a project. + async fn invalidate_room_list_cache(&self, project_id: Uuid) { + let pattern = format!("room:list:{}:*", project_id); + if let Ok(mut conn) = self.cache.conn().await { + // Use SCAN to find matching keys, then DELETE them + let mut cursor: u64 = 0; + loop { + let (new_cursor, keys): (u64, Vec) = match redis::cmd("SCAN") + .arg(cursor) + .arg("MATCH") + .arg(&pattern) + .arg("COUNT") + .arg(100) + .query_async(&mut conn) + .await + { + Ok(result) => result, + Err(e) => { + slog::warn!(self.log, "invalidate_room_list_cache: SCAN failed: {}", e); + break; + } + }; + cursor = new_cursor; + + if !keys.is_empty() { + // Delete keys in batches + let keys_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); + if let Err(e) = redis::cmd("DEL") + .arg(&keys_refs) + .query_async::(&mut conn) + .await + { + slog::warn!(self.log, "invalidate_room_list_cache: DEL failed: {}", e); + } else { + slog::debug!(self.log, "invalidate_room_list_cache: deleted {} keys", keys.len()); + } + } + + if cursor == 0 { + break; + } + } + } + } } diff --git a/libs/room/src/search.rs b/libs/room/src/search.rs index 73dcf9e..31efac4 100644 --- a/libs/room/src/search.rs +++ b/libs/room/src/search.rs @@ -1,5 +1,6 @@ use crate::error::RoomError; use crate::service::RoomService; +use crate::types::RoomMessageSearchRequest; use crate::ws_context::WsUserContext; use chrono::Utc; use models::rooms::{room_message, room_message_reaction}; @@ -11,139 +12,177 @@ impl RoomService { pub async fn room_message_search( &self, room_id: Uuid, - query: &str, - limit: Option, - offset: Option, + request: RoomMessageSearchRequest, ctx: &WsUserContext, ) -> Result { let user_id = ctx.user_id; self.require_room_member(room_id, user_id).await?; - if query.trim().is_empty() { + if request.q.trim().is_empty() { return Ok(super::MessageSearchResponse { messages: Vec::new(), total: 0, }); } - let limit = std::cmp::min(limit.unwrap_or(20), 100); - let offset = offset.unwrap_or(0); + let limit = std::cmp::min(request.limit.unwrap_or(20), 100); + let offset = request.offset.unwrap_or(0); - // PostgreSQL full-text search via raw SQL with parameterized query. - // plainto_tsquery('simple', $1) is injection-safe — it treats input as text. - let sql = r#" + // Build dynamic WHERE conditions + let mut conditions = vec![ + "room = $1".to_string(), + "content_tsv @@ plainto_tsquery('simple', $2)".to_string(), + "revoked IS NULL".to_string(), + ]; + let mut param_index = 3; + let mut params: Vec = vec![room_id.into(), request.q.trim().into()]; + + // Add time range filter + if let Some(start_time) = request.start_time { + conditions.push(format!("send_at >= ${}", param_index)); + params.push(start_time.into()); + param_index += 1; + } + if let Some(end_time) = request.end_time { + conditions.push(format!("send_at <= ${}", param_index)); + params.push(end_time.into()); + param_index += 1; + } + + // Add sender filter + if let Some(sender_id) = request.sender_id { + conditions.push(format!("sender_id = ${}", param_index)); + params.push(sender_id.into()); + param_index += 1; + } + + // Add content type filter + if let Some(ref content_type) = request.content_type { + conditions.push(format!("content_type = ${}", param_index)); + params.push(content_type.clone().into()); + param_index += 1; + } + + let where_clause = conditions.join(" AND "); + + // PostgreSQL full-text search with highlighting via raw SQL. + // Uses ts_headline for result highlighting with tags. + let sql = format!( + r#" SELECT id, seq, room, sender_type, sender_id, thread, in_reply_to, - content, content_type, edited_at, send_at, revoked, revoked_by + content, content_type, edited_at, send_at, revoked, revoked_by, + ts_headline('simple', content, plainto_tsquery('simple', $2), + 'StartSel=, StopSel=, MaxWords=50, MinWords=15') AS highlighted_content FROM room_message - WHERE room = $1 - AND content_tsv @@ plainto_tsquery('simple', $2) - AND revoked IS NULL + WHERE {} ORDER BY send_at DESC - LIMIT $3 OFFSET $4"#; - - let stmt = Statement::from_sql_and_values( - DbBackend::Postgres, - sql, - vec![ - room_id.into(), - query.trim().into(), - limit.into(), - offset.into(), - ], + LIMIT ${} OFFSET ${}"#, + where_clause, + param_index, + param_index + 1 ); - let rows: Vec = self - .db - .query_all_raw(stmt) - .await? - .into_iter() - .map(|row| { - let sender_type = row - .try_get::("", "sender_type") - .map(|s| match s.as_str() { - "admin" => models::rooms::MessageSenderType::Admin, - "owner" => models::rooms::MessageSenderType::Owner, - "ai" => models::rooms::MessageSenderType::Ai, - "system" => models::rooms::MessageSenderType::System, - "tool" => models::rooms::MessageSenderType::Tool, - "guest" => models::rooms::MessageSenderType::Guest, - _ => models::rooms::MessageSenderType::Member, - }) - .unwrap_or(models::rooms::MessageSenderType::Member); + params.push(limit.into()); + params.push(offset.into()); - let content_type = row - .try_get::("", "content_type") - .map(|s| match s.as_str() { - "image" => models::rooms::MessageContentType::Image, - "audio" => models::rooms::MessageContentType::Audio, - "video" => models::rooms::MessageContentType::Video, - "file" => models::rooms::MessageContentType::File, - _ => models::rooms::MessageContentType::Text, - }) - .unwrap_or(models::rooms::MessageContentType::Text); + let stmt = Statement::from_sql_and_values(DbBackend::Postgres, &sql, params); - room_message::Model { - id: row.try_get::("", "id").unwrap_or_default(), - seq: row.try_get::("", "seq").unwrap_or_default(), - room: row.try_get::("", "room").unwrap_or_default(), - sender_type, - sender_id: row - .try_get::>("", "sender_id") - .ok() - .flatten(), - thread: row - .try_get::>("", "thread") - .ok() - .flatten(), - in_reply_to: row - .try_get::>("", "in_reply_to") - .ok() - .flatten(), - content: row.try_get::("", "content").unwrap_or_default(), - content_type, - edited_at: row - .try_get::>("", "edited_at") - .ok() - .flatten(), - send_at: row - .try_get::("", "send_at") - .unwrap_or_default(), - revoked: row - .try_get::>("", "revoked") - .ok() - .flatten(), - revoked_by: row - .try_get::>("", "revoked_by") - .ok() - .flatten(), - content_tsv: None, - } - }) - .collect(); + let rows = self.db.query_all_raw(stmt).await?; - // Efficient COUNT query. - let count_sql = r#" - SELECT COUNT(*) AS count - FROM room_message - WHERE room = $1 - AND content_tsv @@ plainto_tsquery('simple', $2) - AND revoked IS NULL"#; + // Parse results and build response with highlighted content + let mut results: Vec = Vec::new(); - let count_stmt = Statement::from_sql_and_values( - DbBackend::Postgres, - count_sql, - vec![room_id.into(), query.trim().into()], + for row in rows { + let sender_type_str = row.try_get::("", "sender_type").unwrap_or_default(); + let sender_type = match sender_type_str.as_str() { + "admin" => models::rooms::MessageSenderType::Admin, + "owner" => models::rooms::MessageSenderType::Owner, + "ai" => models::rooms::MessageSenderType::Ai, + "system" => models::rooms::MessageSenderType::System, + "tool" => models::rooms::MessageSenderType::Tool, + "guest" => models::rooms::MessageSenderType::Guest, + _ => models::rooms::MessageSenderType::Member, + }; + + let content_type_str = row.try_get::("", "content_type").unwrap_or_default(); + let content_type = match content_type_str.as_str() { + "image" => models::rooms::MessageContentType::Image, + "audio" => models::rooms::MessageContentType::Audio, + "video" => models::rooms::MessageContentType::Video, + "file" => models::rooms::MessageContentType::File, + _ => models::rooms::MessageContentType::Text, + }; + + let msg = room_message::Model { + id: row.try_get::("", "id").unwrap_or_default(), + seq: row.try_get::("", "seq").unwrap_or_default(), + room: row.try_get::("", "room").unwrap_or_default(), + sender_type, + sender_id: row.try_get::>("", "sender_id").ok().flatten(), + thread: row.try_get::>("", "thread").ok().flatten(), + in_reply_to: row.try_get::>("", "in_reply_to").ok().flatten(), + content: row.try_get::("", "content").unwrap_or_default(), + content_type, + edited_at: row.try_get::>("", "edited_at").ok().flatten(), + send_at: row.try_get::("", "send_at").unwrap_or_default(), + revoked: row.try_get::>("", "revoked").ok().flatten(), + revoked_by: row.try_get::>("", "revoked_by").ok().flatten(), + content_tsv: None, + }; + + let highlighted_content = row + .try_get::("", "highlighted_content") + .unwrap_or_else(|_| msg.content.clone()); + + // Resolve display name for this message + let message_with_name = self.resolve_display_name(msg.clone(), room_id).await; + + let mut msg_with_name = message_with_name; + msg_with_name.highlighted_content = Some(highlighted_content); + results.push(msg_with_name); + } + + // COUNT query for total (without pagination) + let mut count_conditions = vec![ + "room = $1".to_string(), + "content_tsv @@ plainto_tsquery('simple', $2)".to_string(), + "revoked IS NULL".to_string(), + ]; + let mut count_params: Vec = vec![room_id.into(), request.q.trim().into()]; + let mut count_param_idx = 3; + + if let Some(start_time) = request.start_time { + count_conditions.push(format!("send_at >= ${}", count_param_idx)); + count_params.push(start_time.into()); + count_param_idx += 1; + } + if let Some(end_time) = request.end_time { + count_conditions.push(format!("send_at <= ${}", count_param_idx)); + count_params.push(end_time.into()); + count_param_idx += 1; + } + if let Some(sender_id) = request.sender_id { + count_conditions.push(format!("sender_id = ${}", count_param_idx)); + count_params.push(sender_id.into()); + count_param_idx += 1; + } + if let Some(ref content_type) = request.content_type { + count_conditions.push(format!("content_type = ${}", count_param_idx)); + count_params.push(content_type.clone().into()); + } + + let count_sql = format!( + "SELECT COUNT(*) AS count FROM room_message WHERE {}", + count_conditions.join(" AND ") ); - + let count_stmt = Statement::from_sql_and_values(DbBackend::Postgres, &count_sql, count_params); let count_row = self.db.query_one_raw(count_stmt).await?; let total: i64 = count_row .and_then(|r| r.try_get::("", "count").ok()) .unwrap_or(0); - let response_messages = self.build_messages_with_display_names(rows).await; - Ok(super::MessageSearchResponse { - messages: response_messages, + messages: results, total, }) } diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index cd0def1..04c1c12 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -24,6 +24,11 @@ use models::agent_task::AgentType; const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024; +/// Callback type for sending push notifications. +/// The caller (AppService) provides this to RoomService so it can trigger +/// browser push notifications without depending on the service crate directly. +pub type PushNotificationFn = Arc, Option) + Send + Sync>; + /// Legacy: uuid or username static USER_MENTION_RE: LazyLock regex_lite::Regex> = LazyLock::new(|| regex_lite::Regex::new(r"\s*([^<]+?)\s*").unwrap()); @@ -54,6 +59,7 @@ pub struct RoomService { pub chat_service: Option>, pub task_service: Option>, pub log: slog::Logger, + pub push_fn: Option, worker_semaphore: Arc, dedup_cache: DedupCache, } @@ -69,6 +75,7 @@ impl RoomService { task_service: Option>, log: slog::Logger, max_concurrent_workers: Option, + push_fn: Option, ) -> Self { let dedup_cache: DedupCache = Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default())); @@ -85,6 +92,7 @@ impl RoomService { max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS), )), dedup_cache, + push_fn, } } @@ -523,6 +531,12 @@ impl RoomService { super::NotificationType::SystemAnnouncement => { room_notifications::NotificationType::SystemAnnouncement } + super::NotificationType::ProjectInvitation => { + room_notifications::NotificationType::ProjectInvitation + } + super::NotificationType::WorkspaceInvitation => { + room_notifications::NotificationType::WorkspaceInvitation + } }; let _model = room_notifications::ActiveModel { @@ -975,7 +989,9 @@ impl RoomService { let room_manager = room_manager.clone(); let db = db.clone(); let model_id = model_id; - let ai_display_name = ai_display_name; + // Clone before closure so closure captures clone, not the original. + let ai_display_name_for_chunk = ai_display_name.clone(); + let ai_display_name_for_final = ai_display_name.clone(); let streaming_msg_id = streaming_msg_id; let room_id_for_chunk = room_id_inner; @@ -988,6 +1004,8 @@ impl RoomService { let streaming_msg_id = streaming_msg_id; let room_id = room_id_for_chunk; let chunk_count = chunk_count.clone(); + // Clone display_name INSIDE the async block so the outer closure stays `Fn`. + let ai_display_name_for_chunk = ai_display_name_for_chunk.clone(); async move { let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, @@ -995,6 +1013,7 @@ impl RoomService { content: chunk.content, done: chunk.done, error: None, + display_name: Some(ai_display_name_for_chunk), }; room_manager.broadcast_stream_chunk(event).await; @@ -1026,6 +1045,7 @@ impl RoomService { room_id: room_id_inner, sender_type: sender_type.clone(), sender_id: None, + model_id: Some(model_id), thread_id: None, content: full_content.clone(), content_type: "text".to_string(), @@ -1062,7 +1082,7 @@ impl RoomService { content_type: "text".to_string(), send_at: now, seq, - display_name: Some(ai_display_name.clone()), + display_name: Some(ai_display_name_for_final.clone()), in_reply_to: None, reactions: None, message_id: None, @@ -1092,6 +1112,7 @@ impl RoomService { content: String::new(), done: true, error: Some(e.to_string()), + display_name: Some(ai_display_name.clone()), }; room_manager.broadcast_stream_chunk(event).await; } @@ -1134,6 +1155,7 @@ impl RoomService { project_id_for_ai, Uuid::now_v7(), response, + model_id_inner, Some(model_display_name), ) .await @@ -1172,6 +1194,7 @@ impl RoomService { project_id: Uuid, _reply_to: Uuid, content: String, + model_id: Uuid, model_display_name: Option, ) -> Result { let now = Utc::now(); @@ -1184,6 +1207,7 @@ impl RoomService { room_id, sender_type: "ai".to_string(), sender_id: None, + model_id: Some(model_id), thread_id: None, content: content.clone(), content_type: "text".to_string(), diff --git a/libs/room/src/types.rs b/libs/room/src/types.rs index a79665b..5b3c4a5 100644 --- a/libs/room/src/types.rs +++ b/libs/room/src/types.rs @@ -126,7 +126,7 @@ pub struct RoomUpdateRequest { pub category: Option, } -#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct RoomResponse { pub id: Uuid, pub project: Uuid, @@ -157,7 +157,7 @@ pub struct RoomMemberReadSeqRequest { pub last_read_seq: i64, } -#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct RoomMemberResponse { pub room: Uuid, pub user: Uuid, @@ -192,6 +192,17 @@ pub struct RoomMessageUpdateRequest { pub content: String, } +#[derive(Debug, Clone, Deserialize, Serialize, utoipa::ToSchema)] +pub struct RoomMessageSearchRequest { + pub q: String, + pub start_time: Option>, + pub end_time: Option>, + pub sender_id: Option, + pub content_type: Option, + pub limit: Option, + pub offset: Option, +} + #[derive(Debug, Clone, Serialize, utoipa::ToSchema)] pub struct RoomMessageResponse { pub id: Uuid, @@ -208,6 +219,16 @@ pub struct RoomMessageResponse { pub send_at: DateTime, pub revoked: Option>, pub revoked_by: Option, + /// Highlighted content with tags around matched terms (for search results) + #[serde(skip_serializing_if = "Option::is_none")] + pub highlighted_content: Option, +} + +/// Search result wrapper (keeps API compatibility) +#[derive(Debug, Clone, Serialize, utoipa::ToSchema)] +pub struct RoomMessageSearchResult { + #[serde(flatten)] + pub message: RoomMessageResponse, } #[derive(Debug, Clone, Serialize, utoipa::ToSchema)] @@ -285,6 +306,8 @@ pub enum NotificationType { RoomCreated, RoomDeleted, SystemAnnouncement, + ProjectInvitation, + WorkspaceInvitation, } #[derive(Debug, Clone, Serialize, utoipa::ToSchema)]