feat(room): update room lib (connection, helpers, member, message, notification, reaction, room, search, service, types)
This commit is contained in:
parent
1b863a9f65
commit
cec8d486f1
@ -10,7 +10,7 @@ use uuid::Uuid;
|
||||
use db::database::AppDatabase;
|
||||
use models::rooms::{MessageContentType, MessageSenderType, room_message};
|
||||
use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent};
|
||||
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set};
|
||||
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set};
|
||||
|
||||
use crate::error::RoomError;
|
||||
use crate::metrics::RoomMetrics;
|
||||
@ -720,6 +720,7 @@ pub fn make_persist_fn(
|
||||
room: Set(env.room_id),
|
||||
sender_type: Set(sender_type),
|
||||
sender_id: Set(env.sender_id),
|
||||
model_id: Set(env.model_id),
|
||||
thread: Set(env.thread_id),
|
||||
content: Set(env.content.clone()),
|
||||
content_type: Set(content_type),
|
||||
@ -736,6 +737,21 @@ pub fn make_persist_fn(
|
||||
room_message::Entity::insert_many(models_to_insert)
|
||||
.exec(&db)
|
||||
.await?;
|
||||
|
||||
// Update content_tsv for inserted messages
|
||||
for env in chunk.iter() {
|
||||
let update_sql = format!(
|
||||
"UPDATE room_message SET content_tsv = to_tsvector('simple', content) WHERE id = '{}'",
|
||||
env.id
|
||||
);
|
||||
let stmt = sea_orm::Statement::from_sql_and_values(
|
||||
sea_orm::DbBackend::Postgres,
|
||||
&update_sql,
|
||||
vec![],
|
||||
);
|
||||
let _ = db.execute_raw(stmt).await;
|
||||
}
|
||||
|
||||
metrics.messages_persisted.increment(count);
|
||||
}
|
||||
}
|
||||
|
||||
@ -73,6 +73,7 @@ impl From<room_message::Model> for super::RoomMessageResponse {
|
||||
revoked: value.revoked,
|
||||
revoked_by: value.revoked_by,
|
||||
in_reply_to: value.in_reply_to,
|
||||
highlighted_content: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -388,8 +389,8 @@ impl RoomService {
|
||||
let sender_type = msg.sender_type.to_string();
|
||||
let display_name = match sender_type.as_str() {
|
||||
"ai" => {
|
||||
if let Some(sender_id) = msg.sender_id {
|
||||
ai_model::Entity::find_by_id(sender_id)
|
||||
if let Some(mid) = msg.model_id {
|
||||
ai_model::Entity::find_by_id(mid)
|
||||
.one(&self.db)
|
||||
.await
|
||||
.ok()
|
||||
@ -429,6 +430,7 @@ impl RoomService {
|
||||
revoked: msg.revoked,
|
||||
revoked_by: msg.revoked_by,
|
||||
in_reply_to: msg.in_reply_to,
|
||||
highlighted_content: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,5 +30,5 @@ pub use draft_and_history::{
|
||||
pub use error::RoomError;
|
||||
pub use metrics::RoomMetrics;
|
||||
pub use reaction::{MessageReactionsResponse, MessageSearchResponse};
|
||||
pub use service::RoomService;
|
||||
pub use service::{RoomService, PushNotificationFn};
|
||||
pub use types::{RoomEventType, *};
|
||||
|
||||
@ -9,6 +9,9 @@ use sea_orm::*;
|
||||
use uuid::Uuid;
|
||||
|
||||
impl RoomService {
|
||||
/// Cache TTL for member list (in seconds).
|
||||
const MEMBER_LIST_CACHE_TTL: u64 = 30;
|
||||
|
||||
pub async fn room_member_list(
|
||||
&self,
|
||||
room_id: Uuid,
|
||||
@ -17,6 +20,24 @@ impl RoomService {
|
||||
let user_id = ctx.user_id;
|
||||
self.require_room_member(room_id, user_id).await?;
|
||||
|
||||
// Try cache first
|
||||
let cache_key = format!("room:members:{}", room_id);
|
||||
|
||||
if let Ok(mut conn) = self.cache.conn().await {
|
||||
if let Ok(Some(cached)) = redis::cmd("GET")
|
||||
.arg(&cache_key)
|
||||
.query_async::<Option<String>>(&mut conn)
|
||||
.await
|
||||
{
|
||||
if let Ok(responses) = serde_json::from_str::<Vec<super::RoomMemberResponse>>(&cached) {
|
||||
slog::debug!(self.log, "room_member_list: cache hit for key={}", cache_key);
|
||||
return Ok(responses);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
slog::debug!(self.log, "room_member_list: cache miss for key={}", cache_key);
|
||||
|
||||
let members = room_member::Entity::find()
|
||||
.filter(room_member::Column::Room.eq(room_id))
|
||||
.all(&self.db)
|
||||
@ -60,6 +81,23 @@ impl RoomService {
|
||||
dnd_end_hour: m.dnd_end_hour,
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Cache the result
|
||||
if let Ok(mut conn) = self.cache.conn().await {
|
||||
if let Ok(json) = serde_json::to_string(&responses) {
|
||||
let _: Option<String> = redis::cmd("SETEX")
|
||||
.arg(&cache_key)
|
||||
.arg(Self::MEMBER_LIST_CACHE_TTL)
|
||||
.arg(&json)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
slog::warn!(self.log, "room_member_list: failed to cache key={}: {}", cache_key, e);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(responses)
|
||||
}
|
||||
|
||||
@ -121,6 +159,9 @@ impl RoomService {
|
||||
|
||||
drop(self.room_manager.subscribe(room_id, request.user_id).await);
|
||||
|
||||
// Invalidate member list cache
|
||||
self.invalidate_member_list_cache(room_id).await;
|
||||
|
||||
self.publish_room_event(
|
||||
room_model.project,
|
||||
super::RoomEventType::MemberJoined,
|
||||
@ -198,6 +239,9 @@ impl RoomService {
|
||||
active.role = Set(new_role);
|
||||
let updated = active.update(&self.db).await?;
|
||||
|
||||
// Invalidate member list cache
|
||||
self.invalidate_member_list_cache(room_id).await;
|
||||
|
||||
let room = self.find_room_or_404(room_id).await?;
|
||||
let _ = self
|
||||
.notification_create(super::NotificationCreateRequest {
|
||||
@ -264,6 +308,9 @@ impl RoomService {
|
||||
.exec(&self.db)
|
||||
.await?;
|
||||
|
||||
// Invalidate member list cache
|
||||
self.invalidate_member_list_cache(room_id).await;
|
||||
|
||||
self.room_manager.unsubscribe(room_id, user_id).await;
|
||||
|
||||
let room = self.find_room_or_404(room_id).await?;
|
||||
@ -367,4 +414,20 @@ impl RoomService {
|
||||
};
|
||||
Ok(updated_response)
|
||||
}
|
||||
|
||||
/// Invalidate member list cache for a room.
|
||||
async fn invalidate_member_list_cache(&self, room_id: Uuid) {
|
||||
let cache_key = format!("room:members:{}", room_id);
|
||||
if let Ok(mut conn) = self.cache.conn().await {
|
||||
if let Err(e) = redis::cmd("DEL")
|
||||
.arg(&cache_key)
|
||||
.query_async::<i64>(&mut conn)
|
||||
.await
|
||||
{
|
||||
slog::warn!(self.log, "invalidate_member_list_cache: DEL failed for {}: {}", cache_key, e);
|
||||
} else {
|
||||
slog::debug!(self.log, "invalidate_member_list_cache: deleted {}", cache_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,7 +44,7 @@ impl RoomService {
|
||||
let ai_model_ids: Vec<Uuid> = models
|
||||
.iter()
|
||||
.filter(|m| m.sender_type.to_string() == "ai")
|
||||
.filter_map(|m| m.sender_id)
|
||||
.filter_map(|m| m.model_id)
|
||||
.collect();
|
||||
|
||||
let users: std::collections::HashMap<Uuid, String> = if !user_ids.is_empty() {
|
||||
@ -78,7 +78,7 @@ impl RoomService {
|
||||
.map(|msg| {
|
||||
let sender_type = msg.sender_type.to_string();
|
||||
let display_name = match sender_type.as_str() {
|
||||
"ai" => msg.sender_id.and_then(|id| ai_names.get(&id).cloned()),
|
||||
"ai" => msg.model_id.and_then(|id| ai_names.get(&id).cloned()),
|
||||
_ => msg.sender_id.and_then(|id| users.get(&id).cloned()),
|
||||
};
|
||||
super::RoomMessageResponse {
|
||||
@ -96,6 +96,7 @@ impl RoomService {
|
||||
send_at: msg.send_at,
|
||||
revoked: msg.revoked,
|
||||
revoked_by: msg.revoked_by,
|
||||
highlighted_content: None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
@ -146,6 +147,7 @@ impl RoomService {
|
||||
room_id,
|
||||
sender_type: "member".to_string(),
|
||||
sender_id: Some(user_id),
|
||||
model_id: None,
|
||||
thread_id,
|
||||
in_reply_to,
|
||||
content: content.clone(),
|
||||
@ -275,6 +277,7 @@ impl RoomService {
|
||||
send_at: now,
|
||||
revoked: None,
|
||||
revoked_by: None,
|
||||
highlighted_content: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -30,6 +30,12 @@ impl RoomService {
|
||||
super::NotificationType::SystemAnnouncement => {
|
||||
room_notifications::NotificationType::SystemAnnouncement
|
||||
}
|
||||
super::NotificationType::ProjectInvitation => {
|
||||
room_notifications::NotificationType::ProjectInvitation
|
||||
}
|
||||
super::NotificationType::WorkspaceInvitation => {
|
||||
room_notifications::NotificationType::WorkspaceInvitation
|
||||
}
|
||||
};
|
||||
|
||||
let model = room_notifications::ActiveModel {
|
||||
@ -261,10 +267,20 @@ impl RoomService {
|
||||
user_id: Uuid,
|
||||
notification: super::NotificationResponse,
|
||||
) {
|
||||
let event = super::NotificationEvent::new(notification);
|
||||
let event = super::NotificationEvent::new(notification.clone());
|
||||
self.room_manager
|
||||
.push_user_notification(user_id, Arc::new(event))
|
||||
.await;
|
||||
|
||||
// Also trigger Web Push for offline users
|
||||
if let Some(push_fn) = &self.push_fn {
|
||||
push_fn(
|
||||
user_id,
|
||||
notification.title.clone(),
|
||||
notification.content.clone(),
|
||||
None, // URL — could be derived from room/project
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn unread_cache_key(user_id: Uuid) -> String {
|
||||
|
||||
@ -325,6 +325,7 @@ impl RoomService {
|
||||
send_at: msg.send_at,
|
||||
revoked: msg.revoked,
|
||||
revoked_by: msg.revoked_by,
|
||||
highlighted_content: None,
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
|
||||
@ -11,6 +11,9 @@ use sea_orm::*;
|
||||
use uuid::Uuid;
|
||||
|
||||
impl RoomService {
|
||||
/// Cache TTL for room list (in seconds).
|
||||
const ROOM_LIST_CACHE_TTL: u64 = 60;
|
||||
|
||||
pub async fn room_list(
|
||||
&self,
|
||||
project_name: String,
|
||||
@ -21,6 +24,29 @@ impl RoomService {
|
||||
let project = self.utils_find_project_by_name(project_name).await?;
|
||||
self.check_project_access(project.id, user_id).await?;
|
||||
|
||||
// Try cache first
|
||||
let cache_key = format!(
|
||||
"room:list:{}:{}:public={}",
|
||||
project.id,
|
||||
user_id,
|
||||
only_public.unwrap_or(false)
|
||||
);
|
||||
|
||||
if let Ok(mut conn) = self.cache.conn().await {
|
||||
if let Ok(Some(cached)) = redis::cmd("GET")
|
||||
.arg(&cache_key)
|
||||
.query_async::<Option<String>>(&mut conn)
|
||||
.await
|
||||
{
|
||||
if let Ok(responses) = serde_json::from_str::<Vec<super::RoomResponse>>(&cached) {
|
||||
slog::debug!(self.log, "room_list: cache hit for key={}", cache_key);
|
||||
return Ok(responses);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
slog::debug!(self.log, "room_list: cache miss for key={}", cache_key);
|
||||
|
||||
let mut query = room::Entity::find().filter(room::Column::Project.eq(project.id));
|
||||
if only_public.unwrap_or(false) {
|
||||
query = query.filter(room::Column::Public.eq(true));
|
||||
@ -66,6 +92,22 @@ impl RoomService {
|
||||
responses.push(response);
|
||||
}
|
||||
|
||||
// Cache the result
|
||||
if let Ok(mut conn) = self.cache.conn().await {
|
||||
if let Ok(json) = serde_json::to_string(&responses) {
|
||||
let _: Option<String> = redis::cmd("SETEX")
|
||||
.arg(&cache_key)
|
||||
.arg(Self::ROOM_LIST_CACHE_TTL)
|
||||
.arg(&json)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
slog::warn!(self.log, "room_list: failed to cache key={}: {}", cache_key, e);
|
||||
})
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(responses)
|
||||
}
|
||||
|
||||
@ -156,6 +198,9 @@ impl RoomService {
|
||||
|
||||
txn.commit().await?;
|
||||
|
||||
// Invalidate room list cache for this project
|
||||
self.invalidate_room_list_cache(project.id).await;
|
||||
|
||||
self.spawn_room_workers(room_model.id);
|
||||
|
||||
let event = ProjectRoomEvent {
|
||||
@ -232,6 +277,9 @@ impl RoomService {
|
||||
}
|
||||
let updated = active.update(&self.db).await?;
|
||||
|
||||
// Invalidate room list cache
|
||||
self.invalidate_room_list_cache(updated.project).await;
|
||||
|
||||
if renamed {
|
||||
let event = ProjectRoomEvent {
|
||||
event_type: super::RoomEventType::RoomRenamed.as_str().into(),
|
||||
@ -303,6 +351,9 @@ impl RoomService {
|
||||
|
||||
txn.commit().await?;
|
||||
|
||||
// Invalidate room list cache
|
||||
self.invalidate_room_list_cache(project_id).await;
|
||||
|
||||
self.room_manager.shutdown_room(room_id).await;
|
||||
|
||||
// Clean up Redis seq key so re-creating the room starts fresh
|
||||
@ -342,4 +393,49 @@ impl RoomService {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Invalidate all room list cache entries for a project.
|
||||
async fn invalidate_room_list_cache(&self, project_id: Uuid) {
|
||||
let pattern = format!("room:list:{}:*", project_id);
|
||||
if let Ok(mut conn) = self.cache.conn().await {
|
||||
// Use SCAN to find matching keys, then DELETE them
|
||||
let mut cursor: u64 = 0;
|
||||
loop {
|
||||
let (new_cursor, keys): (u64, Vec<String>) = match redis::cmd("SCAN")
|
||||
.arg(cursor)
|
||||
.arg("MATCH")
|
||||
.arg(&pattern)
|
||||
.arg("COUNT")
|
||||
.arg(100)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
slog::warn!(self.log, "invalidate_room_list_cache: SCAN failed: {}", e);
|
||||
break;
|
||||
}
|
||||
};
|
||||
cursor = new_cursor;
|
||||
|
||||
if !keys.is_empty() {
|
||||
// Delete keys in batches
|
||||
let keys_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
|
||||
if let Err(e) = redis::cmd("DEL")
|
||||
.arg(&keys_refs)
|
||||
.query_async::<i64>(&mut conn)
|
||||
.await
|
||||
{
|
||||
slog::warn!(self.log, "invalidate_room_list_cache: DEL failed: {}", e);
|
||||
} else {
|
||||
slog::debug!(self.log, "invalidate_room_list_cache: deleted {} keys", keys.len());
|
||||
}
|
||||
}
|
||||
|
||||
if cursor == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
use crate::error::RoomError;
|
||||
use crate::service::RoomService;
|
||||
use crate::types::RoomMessageSearchRequest;
|
||||
use crate::ws_context::WsUserContext;
|
||||
use chrono::Utc;
|
||||
use models::rooms::{room_message, room_message_reaction};
|
||||
@ -11,139 +12,177 @@ impl RoomService {
|
||||
pub async fn room_message_search(
|
||||
&self,
|
||||
room_id: Uuid,
|
||||
query: &str,
|
||||
limit: Option<u64>,
|
||||
offset: Option<u64>,
|
||||
request: RoomMessageSearchRequest,
|
||||
ctx: &WsUserContext,
|
||||
) -> Result<super::MessageSearchResponse, RoomError> {
|
||||
let user_id = ctx.user_id;
|
||||
self.require_room_member(room_id, user_id).await?;
|
||||
|
||||
if query.trim().is_empty() {
|
||||
if request.q.trim().is_empty() {
|
||||
return Ok(super::MessageSearchResponse {
|
||||
messages: Vec::new(),
|
||||
total: 0,
|
||||
});
|
||||
}
|
||||
|
||||
let limit = std::cmp::min(limit.unwrap_or(20), 100);
|
||||
let offset = offset.unwrap_or(0);
|
||||
let limit = std::cmp::min(request.limit.unwrap_or(20), 100);
|
||||
let offset = request.offset.unwrap_or(0);
|
||||
|
||||
// PostgreSQL full-text search via raw SQL with parameterized query.
|
||||
// plainto_tsquery('simple', $1) is injection-safe — it treats input as text.
|
||||
let sql = r#"
|
||||
// Build dynamic WHERE conditions
|
||||
let mut conditions = vec![
|
||||
"room = $1".to_string(),
|
||||
"content_tsv @@ plainto_tsquery('simple', $2)".to_string(),
|
||||
"revoked IS NULL".to_string(),
|
||||
];
|
||||
let mut param_index = 3;
|
||||
let mut params: Vec<sea_orm::Value> = vec![room_id.into(), request.q.trim().into()];
|
||||
|
||||
// Add time range filter
|
||||
if let Some(start_time) = request.start_time {
|
||||
conditions.push(format!("send_at >= ${}", param_index));
|
||||
params.push(start_time.into());
|
||||
param_index += 1;
|
||||
}
|
||||
if let Some(end_time) = request.end_time {
|
||||
conditions.push(format!("send_at <= ${}", param_index));
|
||||
params.push(end_time.into());
|
||||
param_index += 1;
|
||||
}
|
||||
|
||||
// Add sender filter
|
||||
if let Some(sender_id) = request.sender_id {
|
||||
conditions.push(format!("sender_id = ${}", param_index));
|
||||
params.push(sender_id.into());
|
||||
param_index += 1;
|
||||
}
|
||||
|
||||
// Add content type filter
|
||||
if let Some(ref content_type) = request.content_type {
|
||||
conditions.push(format!("content_type = ${}", param_index));
|
||||
params.push(content_type.clone().into());
|
||||
param_index += 1;
|
||||
}
|
||||
|
||||
let where_clause = conditions.join(" AND ");
|
||||
|
||||
// PostgreSQL full-text search with highlighting via raw SQL.
|
||||
// Uses ts_headline for result highlighting with <mark> tags.
|
||||
let sql = format!(
|
||||
r#"
|
||||
SELECT id, seq, room, sender_type, sender_id, thread, in_reply_to,
|
||||
content, content_type, edited_at, send_at, revoked, revoked_by
|
||||
content, content_type, edited_at, send_at, revoked, revoked_by,
|
||||
ts_headline('simple', content, plainto_tsquery('simple', $2),
|
||||
'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=15') AS highlighted_content
|
||||
FROM room_message
|
||||
WHERE room = $1
|
||||
AND content_tsv @@ plainto_tsquery('simple', $2)
|
||||
AND revoked IS NULL
|
||||
WHERE {}
|
||||
ORDER BY send_at DESC
|
||||
LIMIT $3 OFFSET $4"#;
|
||||
|
||||
let stmt = Statement::from_sql_and_values(
|
||||
DbBackend::Postgres,
|
||||
sql,
|
||||
vec![
|
||||
room_id.into(),
|
||||
query.trim().into(),
|
||||
limit.into(),
|
||||
offset.into(),
|
||||
],
|
||||
LIMIT ${} OFFSET ${}"#,
|
||||
where_clause,
|
||||
param_index,
|
||||
param_index + 1
|
||||
);
|
||||
|
||||
let rows: Vec<room_message::Model> = self
|
||||
.db
|
||||
.query_all_raw(stmt)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| {
|
||||
let sender_type = row
|
||||
.try_get::<String>("", "sender_type")
|
||||
.map(|s| match s.as_str() {
|
||||
"admin" => models::rooms::MessageSenderType::Admin,
|
||||
"owner" => models::rooms::MessageSenderType::Owner,
|
||||
"ai" => models::rooms::MessageSenderType::Ai,
|
||||
"system" => models::rooms::MessageSenderType::System,
|
||||
"tool" => models::rooms::MessageSenderType::Tool,
|
||||
"guest" => models::rooms::MessageSenderType::Guest,
|
||||
_ => models::rooms::MessageSenderType::Member,
|
||||
})
|
||||
.unwrap_or(models::rooms::MessageSenderType::Member);
|
||||
params.push(limit.into());
|
||||
params.push(offset.into());
|
||||
|
||||
let content_type = row
|
||||
.try_get::<String>("", "content_type")
|
||||
.map(|s| match s.as_str() {
|
||||
"image" => models::rooms::MessageContentType::Image,
|
||||
"audio" => models::rooms::MessageContentType::Audio,
|
||||
"video" => models::rooms::MessageContentType::Video,
|
||||
"file" => models::rooms::MessageContentType::File,
|
||||
_ => models::rooms::MessageContentType::Text,
|
||||
})
|
||||
.unwrap_or(models::rooms::MessageContentType::Text);
|
||||
let stmt = Statement::from_sql_and_values(DbBackend::Postgres, &sql, params);
|
||||
|
||||
room_message::Model {
|
||||
id: row.try_get::<MessageId>("", "id").unwrap_or_default(),
|
||||
seq: row.try_get::<Seq>("", "seq").unwrap_or_default(),
|
||||
room: row.try_get::<RoomId>("", "room").unwrap_or_default(),
|
||||
sender_type,
|
||||
sender_id: row
|
||||
.try_get::<Option<UserId>>("", "sender_id")
|
||||
.ok()
|
||||
.flatten(),
|
||||
thread: row
|
||||
.try_get::<Option<RoomThreadId>>("", "thread")
|
||||
.ok()
|
||||
.flatten(),
|
||||
in_reply_to: row
|
||||
.try_get::<Option<MessageId>>("", "in_reply_to")
|
||||
.ok()
|
||||
.flatten(),
|
||||
content: row.try_get::<String>("", "content").unwrap_or_default(),
|
||||
content_type,
|
||||
edited_at: row
|
||||
.try_get::<Option<DateTimeUtc>>("", "edited_at")
|
||||
.ok()
|
||||
.flatten(),
|
||||
send_at: row
|
||||
.try_get::<DateTimeUtc>("", "send_at")
|
||||
.unwrap_or_default(),
|
||||
revoked: row
|
||||
.try_get::<Option<DateTimeUtc>>("", "revoked")
|
||||
.ok()
|
||||
.flatten(),
|
||||
revoked_by: row
|
||||
.try_get::<Option<UserId>>("", "revoked_by")
|
||||
.ok()
|
||||
.flatten(),
|
||||
content_tsv: None,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let rows = self.db.query_all_raw(stmt).await?;
|
||||
|
||||
// Efficient COUNT query.
|
||||
let count_sql = r#"
|
||||
SELECT COUNT(*) AS count
|
||||
FROM room_message
|
||||
WHERE room = $1
|
||||
AND content_tsv @@ plainto_tsquery('simple', $2)
|
||||
AND revoked IS NULL"#;
|
||||
// Parse results and build response with highlighted content
|
||||
let mut results: Vec<super::RoomMessageResponse> = Vec::new();
|
||||
|
||||
let count_stmt = Statement::from_sql_and_values(
|
||||
DbBackend::Postgres,
|
||||
count_sql,
|
||||
vec![room_id.into(), query.trim().into()],
|
||||
for row in rows {
|
||||
let sender_type_str = row.try_get::<String>("", "sender_type").unwrap_or_default();
|
||||
let sender_type = match sender_type_str.as_str() {
|
||||
"admin" => models::rooms::MessageSenderType::Admin,
|
||||
"owner" => models::rooms::MessageSenderType::Owner,
|
||||
"ai" => models::rooms::MessageSenderType::Ai,
|
||||
"system" => models::rooms::MessageSenderType::System,
|
||||
"tool" => models::rooms::MessageSenderType::Tool,
|
||||
"guest" => models::rooms::MessageSenderType::Guest,
|
||||
_ => models::rooms::MessageSenderType::Member,
|
||||
};
|
||||
|
||||
let content_type_str = row.try_get::<String>("", "content_type").unwrap_or_default();
|
||||
let content_type = match content_type_str.as_str() {
|
||||
"image" => models::rooms::MessageContentType::Image,
|
||||
"audio" => models::rooms::MessageContentType::Audio,
|
||||
"video" => models::rooms::MessageContentType::Video,
|
||||
"file" => models::rooms::MessageContentType::File,
|
||||
_ => models::rooms::MessageContentType::Text,
|
||||
};
|
||||
|
||||
let msg = room_message::Model {
|
||||
id: row.try_get::<MessageId>("", "id").unwrap_or_default(),
|
||||
seq: row.try_get::<Seq>("", "seq").unwrap_or_default(),
|
||||
room: row.try_get::<RoomId>("", "room").unwrap_or_default(),
|
||||
sender_type,
|
||||
sender_id: row.try_get::<Option<UserId>>("", "sender_id").ok().flatten(),
|
||||
thread: row.try_get::<Option<RoomThreadId>>("", "thread").ok().flatten(),
|
||||
in_reply_to: row.try_get::<Option<MessageId>>("", "in_reply_to").ok().flatten(),
|
||||
content: row.try_get::<String>("", "content").unwrap_or_default(),
|
||||
content_type,
|
||||
edited_at: row.try_get::<Option<DateTimeUtc>>("", "edited_at").ok().flatten(),
|
||||
send_at: row.try_get::<DateTimeUtc>("", "send_at").unwrap_or_default(),
|
||||
revoked: row.try_get::<Option<DateTimeUtc>>("", "revoked").ok().flatten(),
|
||||
revoked_by: row.try_get::<Option<UserId>>("", "revoked_by").ok().flatten(),
|
||||
content_tsv: None,
|
||||
};
|
||||
|
||||
let highlighted_content = row
|
||||
.try_get::<String>("", "highlighted_content")
|
||||
.unwrap_or_else(|_| msg.content.clone());
|
||||
|
||||
// Resolve display name for this message
|
||||
let message_with_name = self.resolve_display_name(msg.clone(), room_id).await;
|
||||
|
||||
let mut msg_with_name = message_with_name;
|
||||
msg_with_name.highlighted_content = Some(highlighted_content);
|
||||
results.push(msg_with_name);
|
||||
}
|
||||
|
||||
// COUNT query for total (without pagination)
|
||||
let mut count_conditions = vec![
|
||||
"room = $1".to_string(),
|
||||
"content_tsv @@ plainto_tsquery('simple', $2)".to_string(),
|
||||
"revoked IS NULL".to_string(),
|
||||
];
|
||||
let mut count_params: Vec<sea_orm::Value> = vec![room_id.into(), request.q.trim().into()];
|
||||
let mut count_param_idx = 3;
|
||||
|
||||
if let Some(start_time) = request.start_time {
|
||||
count_conditions.push(format!("send_at >= ${}", count_param_idx));
|
||||
count_params.push(start_time.into());
|
||||
count_param_idx += 1;
|
||||
}
|
||||
if let Some(end_time) = request.end_time {
|
||||
count_conditions.push(format!("send_at <= ${}", count_param_idx));
|
||||
count_params.push(end_time.into());
|
||||
count_param_idx += 1;
|
||||
}
|
||||
if let Some(sender_id) = request.sender_id {
|
||||
count_conditions.push(format!("sender_id = ${}", count_param_idx));
|
||||
count_params.push(sender_id.into());
|
||||
count_param_idx += 1;
|
||||
}
|
||||
if let Some(ref content_type) = request.content_type {
|
||||
count_conditions.push(format!("content_type = ${}", count_param_idx));
|
||||
count_params.push(content_type.clone().into());
|
||||
}
|
||||
|
||||
let count_sql = format!(
|
||||
"SELECT COUNT(*) AS count FROM room_message WHERE {}",
|
||||
count_conditions.join(" AND ")
|
||||
);
|
||||
|
||||
let count_stmt = Statement::from_sql_and_values(DbBackend::Postgres, &count_sql, count_params);
|
||||
let count_row = self.db.query_one_raw(count_stmt).await?;
|
||||
let total: i64 = count_row
|
||||
.and_then(|r| r.try_get::<i64>("", "count").ok())
|
||||
.unwrap_or(0);
|
||||
|
||||
let response_messages = self.build_messages_with_display_names(rows).await;
|
||||
|
||||
Ok(super::MessageSearchResponse {
|
||||
messages: response_messages,
|
||||
messages: results,
|
||||
total,
|
||||
})
|
||||
}
|
||||
|
||||
@ -24,6 +24,11 @@ use models::agent_task::AgentType;
|
||||
|
||||
const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024;
|
||||
|
||||
/// Callback type for sending push notifications.
|
||||
/// The caller (AppService) provides this to RoomService so it can trigger
|
||||
/// browser push notifications without depending on the service crate directly.
|
||||
pub type PushNotificationFn = Arc<dyn Fn(Uuid, String, Option<String>, Option<String>) + Send + Sync>;
|
||||
|
||||
/// Legacy: <user>uuid</user> or <user>username</user>
|
||||
static USER_MENTION_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"<user>\s*([^<]+?)\s*</user>").unwrap());
|
||||
@ -54,6 +59,7 @@ pub struct RoomService {
|
||||
pub chat_service: Option<Arc<ChatService>>,
|
||||
pub task_service: Option<Arc<TaskService>>,
|
||||
pub log: slog::Logger,
|
||||
pub push_fn: Option<PushNotificationFn>,
|
||||
worker_semaphore: Arc<tokio::sync::Semaphore>,
|
||||
dedup_cache: DedupCache,
|
||||
}
|
||||
@ -69,6 +75,7 @@ impl RoomService {
|
||||
task_service: Option<Arc<TaskService>>,
|
||||
log: slog::Logger,
|
||||
max_concurrent_workers: Option<usize>,
|
||||
push_fn: Option<PushNotificationFn>,
|
||||
) -> Self {
|
||||
let dedup_cache: DedupCache =
|
||||
Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default()));
|
||||
@ -85,6 +92,7 @@ impl RoomService {
|
||||
max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS),
|
||||
)),
|
||||
dedup_cache,
|
||||
push_fn,
|
||||
}
|
||||
}
|
||||
|
||||
@ -523,6 +531,12 @@ impl RoomService {
|
||||
super::NotificationType::SystemAnnouncement => {
|
||||
room_notifications::NotificationType::SystemAnnouncement
|
||||
}
|
||||
super::NotificationType::ProjectInvitation => {
|
||||
room_notifications::NotificationType::ProjectInvitation
|
||||
}
|
||||
super::NotificationType::WorkspaceInvitation => {
|
||||
room_notifications::NotificationType::WorkspaceInvitation
|
||||
}
|
||||
};
|
||||
|
||||
let _model = room_notifications::ActiveModel {
|
||||
@ -975,7 +989,9 @@ impl RoomService {
|
||||
let room_manager = room_manager.clone();
|
||||
let db = db.clone();
|
||||
let model_id = model_id;
|
||||
let ai_display_name = ai_display_name;
|
||||
// Clone before closure so closure captures clone, not the original.
|
||||
let ai_display_name_for_chunk = ai_display_name.clone();
|
||||
let ai_display_name_for_final = ai_display_name.clone();
|
||||
|
||||
let streaming_msg_id = streaming_msg_id;
|
||||
let room_id_for_chunk = room_id_inner;
|
||||
@ -988,6 +1004,8 @@ impl RoomService {
|
||||
let streaming_msg_id = streaming_msg_id;
|
||||
let room_id = room_id_for_chunk;
|
||||
let chunk_count = chunk_count.clone();
|
||||
// Clone display_name INSIDE the async block so the outer closure stays `Fn`.
|
||||
let ai_display_name_for_chunk = ai_display_name_for_chunk.clone();
|
||||
async move {
|
||||
let event = RoomMessageStreamChunkEvent {
|
||||
message_id: streaming_msg_id,
|
||||
@ -995,6 +1013,7 @@ impl RoomService {
|
||||
content: chunk.content,
|
||||
done: chunk.done,
|
||||
error: None,
|
||||
display_name: Some(ai_display_name_for_chunk),
|
||||
};
|
||||
room_manager.broadcast_stream_chunk(event).await;
|
||||
|
||||
@ -1026,6 +1045,7 @@ impl RoomService {
|
||||
room_id: room_id_inner,
|
||||
sender_type: sender_type.clone(),
|
||||
sender_id: None,
|
||||
model_id: Some(model_id),
|
||||
thread_id: None,
|
||||
content: full_content.clone(),
|
||||
content_type: "text".to_string(),
|
||||
@ -1062,7 +1082,7 @@ impl RoomService {
|
||||
content_type: "text".to_string(),
|
||||
send_at: now,
|
||||
seq,
|
||||
display_name: Some(ai_display_name.clone()),
|
||||
display_name: Some(ai_display_name_for_final.clone()),
|
||||
in_reply_to: None,
|
||||
reactions: None,
|
||||
message_id: None,
|
||||
@ -1092,6 +1112,7 @@ impl RoomService {
|
||||
content: String::new(),
|
||||
done: true,
|
||||
error: Some(e.to_string()),
|
||||
display_name: Some(ai_display_name.clone()),
|
||||
};
|
||||
room_manager.broadcast_stream_chunk(event).await;
|
||||
}
|
||||
@ -1134,6 +1155,7 @@ impl RoomService {
|
||||
project_id_for_ai,
|
||||
Uuid::now_v7(),
|
||||
response,
|
||||
model_id_inner,
|
||||
Some(model_display_name),
|
||||
)
|
||||
.await
|
||||
@ -1172,6 +1194,7 @@ impl RoomService {
|
||||
project_id: Uuid,
|
||||
_reply_to: Uuid,
|
||||
content: String,
|
||||
model_id: Uuid,
|
||||
model_display_name: Option<String>,
|
||||
) -> Result<Uuid, RoomError> {
|
||||
let now = Utc::now();
|
||||
@ -1184,6 +1207,7 @@ impl RoomService {
|
||||
room_id,
|
||||
sender_type: "ai".to_string(),
|
||||
sender_id: None,
|
||||
model_id: Some(model_id),
|
||||
thread_id: None,
|
||||
content: content.clone(),
|
||||
content_type: "text".to_string(),
|
||||
|
||||
@ -126,7 +126,7 @@ pub struct RoomUpdateRequest {
|
||||
pub category: Option<Uuid>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct RoomResponse {
|
||||
pub id: Uuid,
|
||||
pub project: Uuid,
|
||||
@ -157,7 +157,7 @@ pub struct RoomMemberReadSeqRequest {
|
||||
pub last_read_seq: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
||||
pub struct RoomMemberResponse {
|
||||
pub room: Uuid,
|
||||
pub user: Uuid,
|
||||
@ -192,6 +192,17 @@ pub struct RoomMessageUpdateRequest {
|
||||
pub content: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, utoipa::ToSchema)]
|
||||
pub struct RoomMessageSearchRequest {
|
||||
pub q: String,
|
||||
pub start_time: Option<DateTime<Utc>>,
|
||||
pub end_time: Option<DateTime<Utc>>,
|
||||
pub sender_id: Option<Uuid>,
|
||||
pub content_type: Option<String>,
|
||||
pub limit: Option<u64>,
|
||||
pub offset: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
|
||||
pub struct RoomMessageResponse {
|
||||
pub id: Uuid,
|
||||
@ -208,6 +219,16 @@ pub struct RoomMessageResponse {
|
||||
pub send_at: DateTime<Utc>,
|
||||
pub revoked: Option<DateTime<Utc>>,
|
||||
pub revoked_by: Option<Uuid>,
|
||||
/// Highlighted content with <mark> tags around matched terms (for search results)
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub highlighted_content: Option<String>,
|
||||
}
|
||||
|
||||
/// Search result wrapper (keeps API compatibility)
|
||||
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
|
||||
pub struct RoomMessageSearchResult {
|
||||
#[serde(flatten)]
|
||||
pub message: RoomMessageResponse,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
|
||||
@ -285,6 +306,8 @@ pub enum NotificationType {
|
||||
RoomCreated,
|
||||
RoomDeleted,
|
||||
SystemAnnouncement,
|
||||
ProjectInvitation,
|
||||
WorkspaceInvitation,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
|
||||
|
||||
Loading…
Reference in New Issue
Block a user