diff --git a/libs/api/room/ws_universal.rs b/libs/api/room/ws_universal.rs index 478893f..d4694eb 100644 --- a/libs/api/room/ws_universal.rs +++ b/libs/api/room/ws_universal.rs @@ -196,7 +196,7 @@ pub async fn ws_universal( let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; } - push_event = poll_push_streams(&mut push_streams, &manager, user_id) => { + push_event = poll_push_streams(&mut push_streams, &manager, &handler.service(), user_id) => { match push_event { Some(WsPushEvent::RoomMessage { room_id, event }) => { let payload = serde_json::json!({ @@ -372,6 +372,7 @@ pub async fn ws_universal( async fn poll_push_streams( streams: &mut PushStreams, manager: &Arc, + service: &Arc, user_id: Uuid, ) -> Option { loop { @@ -412,16 +413,21 @@ async fn poll_push_streams( } } - // Re-subscribe dead rooms so we don't permanently lose events + // Re-subscribe dead rooms so we don't permanently lose events. + // Re-check access in case the user's permissions were revoked while the + // stream was dead. for room_id in dead_rooms { if streams.remove(&room_id).is_some() { - if let Ok(rx) = manager.subscribe(room_id, user_id).await { - let stream_rx = manager.subscribe_room_stream(room_id).await; - streams.insert(room_id, ( - BroadcastStream::new(rx), - BroadcastStream::new(stream_rx), - )); + if service.room.check_room_access(room_id, user_id).await.is_ok() { + if let Ok(rx) = manager.subscribe(room_id, user_id).await { + let stream_rx = manager.subscribe_room_stream(room_id).await; + streams.insert(room_id, ( + BroadcastStream::new(rx), + BroadcastStream::new(stream_rx), + )); + } } + // If access check fails, silently skip re-subscribe (user was removed) } } diff --git a/libs/room/src/helpers.rs b/libs/room/src/helpers.rs index f576830..80b9877 100644 --- a/libs/room/src/helpers.rs +++ b/libs/room/src/helpers.rs @@ -257,24 +257,6 @@ impl RoomService { } } - pub(crate) async fn next_room_message_seq( - &self, - room_id: Uuid, - db: &C, - ) -> Result - where - C: ConnectionTrait, - { - let max_seq: Option> = room_message::Entity::find() - .filter(room_message::Column::Room.eq(room_id)) - .select_only() - .column_as(room_message::Column::Seq.max(), "max_seq") - .into_tuple::>() - .one(db) - .await?; - Ok(max_seq.flatten().unwrap_or(0) + 1) - } - pub async fn utils_find_project_by_name( &self, name: String, diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs index 4f5b873..381e20e 100644 --- a/libs/room/src/message.rs +++ b/libs/room/src/message.rs @@ -134,7 +134,7 @@ impl RoomService { } } - let seq = self.next_room_message_seq(room_id, &self.db).await?; + let seq = Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await?; let now = Utc::now(); let id = Uuid::now_v7(); let project_id = room_model.project; @@ -207,6 +207,17 @@ impl RoomService { .await; let mentioned_users = self.resolve_mentions(&request.content).await; + // Look up sender display name once for all mention notifications + let sender_display_name = { + let user = user_model::Entity::find() + .filter(user_model::Column::Uid.eq(user_id)) + .one(&self.db) + .await + .ok() + .flatten(); + user.map(|u| u.display_name.unwrap_or_else(|| u.username)) + .unwrap_or_else(|| user_id.to_string()) + }; for mentioned_user_id in mentioned_users { if mentioned_user_id == user_id { continue; @@ -215,7 +226,7 @@ impl RoomService { .notification_create(super::NotificationCreateRequest { notification_type: super::NotificationType::Mention, user_id: mentioned_user_id, - title: format!("{} 在 {} 中提到了你", user_id, room_model.room_name), + title: format!("{} 在 {} 中提到了你", sender_display_name, room_model.room_name), content: Some(content.clone()), room_id: Some(room_id), project_id, @@ -228,7 +239,13 @@ impl RoomService { .await; } - let should_respond = self.should_ai_respond(room_id).await.unwrap_or(false); + let should_respond = match self.should_ai_respond(room_id).await { + Ok(v) => v, + Err(e) => { + slog::warn!(self.log, "should_ai_respond failed for room {}: {}", room_id, e); + false + } + }; let is_text_message = request .content_type .as_ref() @@ -243,23 +260,13 @@ impl RoomService { } } - let display_name = { - let user = user_model::Entity::find() - .filter(user_model::Column::Uid.eq(user_id)) - .one(&self.db) - .await - .ok() - .flatten(); - user.map(|u| u.display_name.unwrap_or_else(|| u.username)) - }; - Ok(super::RoomMessageResponse { id, seq, room: room_id, sender_type: "member".to_string(), sender_id: Some(user_id), - display_name, + display_name: Some(sender_display_name), thread: thread_id, in_reply_to, content: request.content, diff --git a/libs/room/src/reaction.rs b/libs/room/src/reaction.rs index 9a0dbfe..a56d1d7 100644 --- a/libs/room/src/reaction.rs +++ b/libs/room/src/reaction.rs @@ -54,20 +54,29 @@ impl RoomService { created_at: Set(now), }; - let result = room_message_reaction::Entity::insert(reaction) - .on_conflict( - OnConflict::columns([ - room_message_reaction::Column::Message, - room_message_reaction::Column::User, - room_message_reaction::Column::Emoji, - ]) - .do_nothing() - .to_owned(), - ) - .exec(&self.db) - .await; + // Check if reaction already exists before inserting + let existing = room_message_reaction::Entity::find() + .filter(room_message_reaction::Column::Message.eq(message_id)) + .filter(room_message_reaction::Column::User.eq(user_id)) + .filter(room_message_reaction::Column::Emoji.eq(&emoji)) + .one(&self.db) + .await?; - if result.is_ok() { + if existing.is_none() { + room_message_reaction::Entity::insert(reaction) + .on_conflict( + OnConflict::columns([ + room_message_reaction::Column::Message, + room_message_reaction::Column::User, + room_message_reaction::Column::Emoji, + ]) + .do_nothing() + .to_owned(), + ) + .exec(&self.db) + .await?; + + // Only publish if we actually inserted a new reaction let reactions = self .get_message_reactions(message_id, Some(user_id)) .await?; diff --git a/libs/room/src/room.rs b/libs/room/src/room.rs index 1c2e941..dc6d3ba 100644 --- a/libs/room/src/room.rs +++ b/libs/room/src/room.rs @@ -273,6 +273,19 @@ impl RoomService { self.room_manager.shutdown_room(room_id).await; + // Clean up Redis seq key so re-creating the room starts fresh + let seq_key = format!("room:seq:{}", room_id); + if let Ok(mut conn) = self.cache.conn().await { + let _: Option = redis::cmd("DEL") + .arg(&seq_key) + .query_async(&mut conn) + .await + .inspect_err(|e| { + slog::warn!(self.log, "room_delete: failed to DEL seq key {}: {}", seq_key, e); + }) + .ok(); + } + let event = ProjectRoomEvent { event_type: super::RoomEventType::RoomDeleted.as_str().into(), project_id, diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index bc52585..bb775f7 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -1112,7 +1112,7 @@ impl RoomService { Vec::new() } - async fn next_room_message_seq_internal( + pub(crate) async fn next_room_message_seq_internal( room_id: Uuid, db: &AppDatabase, cache: &AppCache, diff --git a/src/components/room/RoomChatPanel.tsx b/src/components/room/RoomChatPanel.tsx index d33c41e..0d418e4 100644 --- a/src/components/room/RoomChatPanel.tsx +++ b/src/components/room/RoomChatPanel.tsx @@ -131,7 +131,7 @@ const ChatInputArea = memo(function ChatInputArea({ {replyingTo && (
Replying to {replyingTo.display_name} - {replyingTo.content} + {replyingTo.content.length > 80 ? replyingTo.content.slice(0, 80) + '…' : replyingTo.content} diff --git a/src/components/room/RoomMessageList.tsx b/src/components/room/RoomMessageList.tsx index 266c437..6b9ee33 100644 --- a/src/components/room/RoomMessageList.tsx +++ b/src/components/room/RoomMessageList.tsx @@ -345,12 +345,8 @@ export const RoomMessageList = memo(function RoomMessageList({ return (