use crate::error::RoomError; use crate::service::RoomService; use crate::ws_context::WsUserContext; use chrono::Utc; use models::rooms::room_message_reaction; use models::users::user as user_model; use queue::ReactionGroup; use sea_orm::*; use sea_query::OnConflict; use uuid::Uuid; #[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)] pub struct ReactionGroupResponse { pub emoji: String, pub count: i64, pub reacted_by_me: bool, pub users: Vec, } #[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)] pub struct MessageReactionsResponse { pub message_id: Uuid, pub reactions: Vec, } #[derive(Debug, Clone, serde::Serialize, utoipa::ToSchema)] pub struct MessageSearchResponse { pub messages: Vec, pub total: i64, } impl RoomService { pub async fn message_reaction_add( &self, message_id: Uuid, emoji: String, ctx: &WsUserContext, ) -> Result { let user_id = ctx.user_id; let message = self.find_message_or_404(message_id).await?; self.require_room_member(message.room, user_id).await?; Self::validate_emoji(&emoji)?; let now = Utc::now(); let reaction = room_message_reaction::ActiveModel { id: Set(Uuid::now_v7()), room: Set(message.room), message: Set(message_id), user: Set(user_id), emoji: Set(emoji.clone()), created_at: Set(now), }; let result = room_message_reaction::Entity::insert(reaction) .on_conflict( OnConflict::columns([ room_message_reaction::Column::Message, room_message_reaction::Column::User, room_message_reaction::Column::Emoji, ]) .do_nothing() .to_owned(), ) .exec(&self.db) .await; if result.is_ok() { let reactions = self .get_message_reactions(message_id, Some(user_id)) .await?; let reaction_groups = reactions .reactions .into_iter() .map(|g| ReactionGroup { emoji: g.emoji, count: g.count, reacted_by_me: g.reacted_by_me, users: g.users, }) .collect(); self.queue .publish_reaction_event(message.room, message_id, reaction_groups) .await; } self.get_message_reactions(message_id, Some(user_id)).await } pub async fn message_reaction_remove( &self, message_id: Uuid, emoji: String, ctx: &WsUserContext, ) -> Result { let user_id = ctx.user_id; let message = self.find_message_or_404(message_id).await?; self.require_room_member(message.room, user_id).await?; room_message_reaction::Entity::delete_many() .filter(room_message_reaction::Column::Message.eq(message_id)) .filter(room_message_reaction::Column::User.eq(user_id)) .filter(room_message_reaction::Column::Emoji.eq(emoji)) .exec(&self.db) .await?; let reactions = self .get_message_reactions(message_id, Some(user_id)) .await?; let reaction_groups = reactions .reactions .into_iter() .map(|g| ReactionGroup { emoji: g.emoji, count: g.count, reacted_by_me: g.reacted_by_me, users: g.users, }) .collect(); self.queue .publish_reaction_event(message.room, message_id, reaction_groups) .await; self.get_message_reactions(message_id, Some(user_id)).await } pub async fn message_reactions_get( &self, message_id: Uuid, ctx: &WsUserContext, ) -> Result { let user_id = ctx.user_id; let message = self.find_message_or_404(message_id).await?; self.require_room_member(message.room, user_id).await?; self.get_message_reactions(message_id, Some(user_id)).await } pub async fn message_reactions_batch( &self, room_id: Uuid, message_ids: Vec, ctx: &WsUserContext, ) -> Result, RoomError> { let user_id = ctx.user_id; self.require_room_member(room_id, user_id).await?; let mut results = Vec::with_capacity(message_ids.len()); for msg_id in message_ids { let reactions = self.get_message_reactions(msg_id, Some(user_id)).await?; results.push(reactions); } Ok(results) } pub async fn message_search( &self, room_id: Uuid, query: &str, limit: Option, offset: Option, ctx: &WsUserContext, ) -> Result { let user_id = ctx.user_id; self.require_room_member(room_id, user_id).await?; if query.trim().is_empty() { return Ok(MessageSearchResponse { messages: Vec::new(), total: 0, }); } let limit = limit.unwrap_or(20); let offset = offset.unwrap_or(0); let search_pattern = format!("%{}%", query); let query_builder = models::rooms::room_message::Entity::find() .filter(models::rooms::room_message::Column::Room.eq(room_id)) .filter(models::rooms::room_message::Column::Content.like(&search_pattern)) .filter(models::rooms::room_message::Column::Revoked.is_null()); let total = query_builder.clone().count(&self.db).await? as i64; let messages = query_builder .order_by_desc(models::rooms::room_message::Column::SendAt) .limit(limit) .offset(offset) .all(&self.db) .await?; let response_messages = self.build_messages_with_display_names(messages).await; Ok(MessageSearchResponse { messages: response_messages, total, }) } pub(crate) async fn find_message_or_404( &self, message_id: Uuid, ) -> Result { models::rooms::room_message::Entity::find_by_id(message_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Message not found".to_string())) } pub(crate) fn validate_emoji(emoji: &str) -> Result<(), RoomError> { if emoji.is_empty() || emoji.len() > 50 { return Err(RoomError::BadRequest("Invalid emoji format".to_string())); } Ok(()) } pub(crate) async fn get_message_reactions( &self, message_id: Uuid, current_user_id: Option, ) -> Result { let reactions = room_message_reaction::Entity::find() .filter(room_message_reaction::Column::Message.eq(message_id)) .all(&self.db) .await?; let reaction_groups = self.build_reaction_groups(reactions, current_user_id); Ok(MessageReactionsResponse { message_id, reactions: reaction_groups, }) } pub(crate) fn build_reaction_groups( &self, reactions: Vec, current_user_id: Option, ) -> Vec { let mut grouped: std::collections::HashMap> = std::collections::HashMap::new(); for r in &reactions { grouped.entry(r.emoji.clone()).or_default().push(r); } grouped .into_iter() .map(|(emoji, user_reactions)| { let count = user_reactions.len() as i64; let reacted_by_me = current_user_id .map(|uid| user_reactions.iter().any(|r| r.user == uid)) .unwrap_or(false); let users = user_reactions.iter().take(3).map(|r| r.user).collect(); ReactionGroupResponse { emoji, count, reacted_by_me, users, } }) .collect() } pub(crate) async fn build_messages_with_display_names( &self, messages: Vec, ) -> Vec { let user_ids: Vec = messages .iter() .filter(|m| m.sender_type.to_string() == "member") .filter_map(|m| m.sender_id) .collect(); let users: std::collections::HashMap = if !user_ids.is_empty() { user_model::Entity::find() .filter(user_model::Column::Uid.is_in(user_ids)) .all(&self.db) .await .unwrap_or_default() .into_iter() .map(|u| (u.uid, u.display_name.unwrap_or(u.username))) .collect() } else { std::collections::HashMap::new() }; messages .into_iter() .map(|msg| { let sender_type = msg.sender_type.to_string(); let display_name = match sender_type.as_str() { "member" => msg.sender_id.and_then(|id| users.get(&id).cloned()), _ => None, }; super::RoomMessageResponse { id: msg.id, seq: msg.seq, room: msg.room, sender_type, sender_id: msg.sender_id, display_name, thread: msg.thread, in_reply_to: msg.in_reply_to, content: msg.content, content_type: msg.content_type.to_string(), edited_at: msg.edited_at, send_at: msg.send_at, revoked: msg.revoked, revoked_by: msg.revoked_by, } }) .collect() } }