fix(room): resolve remaining defects from second review
- reaction.rs: query before insert to detect new vs duplicate reactions, only publish Redis event when a reaction was actually added - room.rs: delete Redis seq key on room deletion to prevent seq collision on re-creation - message.rs: use Redis-atomic next_room_message_seq_internal for concurrent safety; look up sender display name once for both mention notifications and response body; add warn log when should_ai_respond fails instead of silent unwrap_or(false) - ws_universal.rs: re-check room access permission when re-subscribing dead streams after error to prevent revoked permissions being bypassed - RoomChatPanel.tsx: truncate reply preview content to 80 chars - RoomMessageList.tsx: remove redundant inline style on message row div
This commit is contained in:
parent
5256e72be7
commit
60d8c3a617
@ -196,7 +196,7 @@ pub async fn ws_universal(
|
|||||||
let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await;
|
let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
push_event = poll_push_streams(&mut push_streams, &manager, user_id) => {
|
push_event = poll_push_streams(&mut push_streams, &manager, &handler.service(), user_id) => {
|
||||||
match push_event {
|
match push_event {
|
||||||
Some(WsPushEvent::RoomMessage { room_id, event }) => {
|
Some(WsPushEvent::RoomMessage { room_id, event }) => {
|
||||||
let payload = serde_json::json!({
|
let payload = serde_json::json!({
|
||||||
@ -372,6 +372,7 @@ pub async fn ws_universal(
|
|||||||
async fn poll_push_streams(
|
async fn poll_push_streams(
|
||||||
streams: &mut PushStreams,
|
streams: &mut PushStreams,
|
||||||
manager: &Arc<RoomConnectionManager>,
|
manager: &Arc<RoomConnectionManager>,
|
||||||
|
service: &Arc<AppService>,
|
||||||
user_id: Uuid,
|
user_id: Uuid,
|
||||||
) -> Option<WsPushEvent> {
|
) -> Option<WsPushEvent> {
|
||||||
loop {
|
loop {
|
||||||
@ -412,16 +413,21 @@ async fn poll_push_streams(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-subscribe dead rooms so we don't permanently lose events
|
// Re-subscribe dead rooms so we don't permanently lose events.
|
||||||
|
// Re-check access in case the user's permissions were revoked while the
|
||||||
|
// stream was dead.
|
||||||
for room_id in dead_rooms {
|
for room_id in dead_rooms {
|
||||||
if streams.remove(&room_id).is_some() {
|
if streams.remove(&room_id).is_some() {
|
||||||
if let Ok(rx) = manager.subscribe(room_id, user_id).await {
|
if service.room.check_room_access(room_id, user_id).await.is_ok() {
|
||||||
let stream_rx = manager.subscribe_room_stream(room_id).await;
|
if let Ok(rx) = manager.subscribe(room_id, user_id).await {
|
||||||
streams.insert(room_id, (
|
let stream_rx = manager.subscribe_room_stream(room_id).await;
|
||||||
BroadcastStream::new(rx),
|
streams.insert(room_id, (
|
||||||
BroadcastStream::new(stream_rx),
|
BroadcastStream::new(rx),
|
||||||
));
|
BroadcastStream::new(stream_rx),
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// If access check fails, silently skip re-subscribe (user was removed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -257,24 +257,6 @@ impl RoomService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn next_room_message_seq<C>(
|
|
||||||
&self,
|
|
||||||
room_id: Uuid,
|
|
||||||
db: &C,
|
|
||||||
) -> Result<i64, RoomError>
|
|
||||||
where
|
|
||||||
C: ConnectionTrait,
|
|
||||||
{
|
|
||||||
let max_seq: Option<Option<i64>> = room_message::Entity::find()
|
|
||||||
.filter(room_message::Column::Room.eq(room_id))
|
|
||||||
.select_only()
|
|
||||||
.column_as(room_message::Column::Seq.max(), "max_seq")
|
|
||||||
.into_tuple::<Option<i64>>()
|
|
||||||
.one(db)
|
|
||||||
.await?;
|
|
||||||
Ok(max_seq.flatten().unwrap_or(0) + 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn utils_find_project_by_name(
|
pub async fn utils_find_project_by_name(
|
||||||
&self,
|
&self,
|
||||||
name: String,
|
name: String,
|
||||||
|
|||||||
@ -134,7 +134,7 @@ impl RoomService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let seq = self.next_room_message_seq(room_id, &self.db).await?;
|
let seq = Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await?;
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
let id = Uuid::now_v7();
|
let id = Uuid::now_v7();
|
||||||
let project_id = room_model.project;
|
let project_id = room_model.project;
|
||||||
@ -207,6 +207,17 @@ impl RoomService {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let mentioned_users = self.resolve_mentions(&request.content).await;
|
let mentioned_users = self.resolve_mentions(&request.content).await;
|
||||||
|
// Look up sender display name once for all mention notifications
|
||||||
|
let sender_display_name = {
|
||||||
|
let user = user_model::Entity::find()
|
||||||
|
.filter(user_model::Column::Uid.eq(user_id))
|
||||||
|
.one(&self.db)
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten();
|
||||||
|
user.map(|u| u.display_name.unwrap_or_else(|| u.username))
|
||||||
|
.unwrap_or_else(|| user_id.to_string())
|
||||||
|
};
|
||||||
for mentioned_user_id in mentioned_users {
|
for mentioned_user_id in mentioned_users {
|
||||||
if mentioned_user_id == user_id {
|
if mentioned_user_id == user_id {
|
||||||
continue;
|
continue;
|
||||||
@ -215,7 +226,7 @@ impl RoomService {
|
|||||||
.notification_create(super::NotificationCreateRequest {
|
.notification_create(super::NotificationCreateRequest {
|
||||||
notification_type: super::NotificationType::Mention,
|
notification_type: super::NotificationType::Mention,
|
||||||
user_id: mentioned_user_id,
|
user_id: mentioned_user_id,
|
||||||
title: format!("{} 在 {} 中提到了你", user_id, room_model.room_name),
|
title: format!("{} 在 {} 中提到了你", sender_display_name, room_model.room_name),
|
||||||
content: Some(content.clone()),
|
content: Some(content.clone()),
|
||||||
room_id: Some(room_id),
|
room_id: Some(room_id),
|
||||||
project_id,
|
project_id,
|
||||||
@ -228,7 +239,13 @@ impl RoomService {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let should_respond = self.should_ai_respond(room_id).await.unwrap_or(false);
|
let should_respond = match self.should_ai_respond(room_id).await {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(e) => {
|
||||||
|
slog::warn!(self.log, "should_ai_respond failed for room {}: {}", room_id, e);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
let is_text_message = request
|
let is_text_message = request
|
||||||
.content_type
|
.content_type
|
||||||
.as_ref()
|
.as_ref()
|
||||||
@ -243,23 +260,13 @@ impl RoomService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let display_name = {
|
|
||||||
let user = user_model::Entity::find()
|
|
||||||
.filter(user_model::Column::Uid.eq(user_id))
|
|
||||||
.one(&self.db)
|
|
||||||
.await
|
|
||||||
.ok()
|
|
||||||
.flatten();
|
|
||||||
user.map(|u| u.display_name.unwrap_or_else(|| u.username))
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(super::RoomMessageResponse {
|
Ok(super::RoomMessageResponse {
|
||||||
id,
|
id,
|
||||||
seq,
|
seq,
|
||||||
room: room_id,
|
room: room_id,
|
||||||
sender_type: "member".to_string(),
|
sender_type: "member".to_string(),
|
||||||
sender_id: Some(user_id),
|
sender_id: Some(user_id),
|
||||||
display_name,
|
display_name: Some(sender_display_name),
|
||||||
thread: thread_id,
|
thread: thread_id,
|
||||||
in_reply_to,
|
in_reply_to,
|
||||||
content: request.content,
|
content: request.content,
|
||||||
|
|||||||
@ -54,20 +54,29 @@ impl RoomService {
|
|||||||
created_at: Set(now),
|
created_at: Set(now),
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = room_message_reaction::Entity::insert(reaction)
|
// Check if reaction already exists before inserting
|
||||||
.on_conflict(
|
let existing = room_message_reaction::Entity::find()
|
||||||
OnConflict::columns([
|
.filter(room_message_reaction::Column::Message.eq(message_id))
|
||||||
room_message_reaction::Column::Message,
|
.filter(room_message_reaction::Column::User.eq(user_id))
|
||||||
room_message_reaction::Column::User,
|
.filter(room_message_reaction::Column::Emoji.eq(&emoji))
|
||||||
room_message_reaction::Column::Emoji,
|
.one(&self.db)
|
||||||
])
|
.await?;
|
||||||
.do_nothing()
|
|
||||||
.to_owned(),
|
|
||||||
)
|
|
||||||
.exec(&self.db)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if result.is_ok() {
|
if existing.is_none() {
|
||||||
|
room_message_reaction::Entity::insert(reaction)
|
||||||
|
.on_conflict(
|
||||||
|
OnConflict::columns([
|
||||||
|
room_message_reaction::Column::Message,
|
||||||
|
room_message_reaction::Column::User,
|
||||||
|
room_message_reaction::Column::Emoji,
|
||||||
|
])
|
||||||
|
.do_nothing()
|
||||||
|
.to_owned(),
|
||||||
|
)
|
||||||
|
.exec(&self.db)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Only publish if we actually inserted a new reaction
|
||||||
let reactions = self
|
let reactions = self
|
||||||
.get_message_reactions(message_id, Some(user_id))
|
.get_message_reactions(message_id, Some(user_id))
|
||||||
.await?;
|
.await?;
|
||||||
|
|||||||
@ -273,6 +273,19 @@ impl RoomService {
|
|||||||
|
|
||||||
self.room_manager.shutdown_room(room_id).await;
|
self.room_manager.shutdown_room(room_id).await;
|
||||||
|
|
||||||
|
// Clean up Redis seq key so re-creating the room starts fresh
|
||||||
|
let seq_key = format!("room:seq:{}", room_id);
|
||||||
|
if let Ok(mut conn) = self.cache.conn().await {
|
||||||
|
let _: Option<String> = redis::cmd("DEL")
|
||||||
|
.arg(&seq_key)
|
||||||
|
.query_async(&mut conn)
|
||||||
|
.await
|
||||||
|
.inspect_err(|e| {
|
||||||
|
slog::warn!(self.log, "room_delete: failed to DEL seq key {}: {}", seq_key, e);
|
||||||
|
})
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
|
||||||
let event = ProjectRoomEvent {
|
let event = ProjectRoomEvent {
|
||||||
event_type: super::RoomEventType::RoomDeleted.as_str().into(),
|
event_type: super::RoomEventType::RoomDeleted.as_str().into(),
|
||||||
project_id,
|
project_id,
|
||||||
|
|||||||
@ -1112,7 +1112,7 @@ impl RoomService {
|
|||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn next_room_message_seq_internal(
|
pub(crate) async fn next_room_message_seq_internal(
|
||||||
room_id: Uuid,
|
room_id: Uuid,
|
||||||
db: &AppDatabase,
|
db: &AppDatabase,
|
||||||
cache: &AppCache,
|
cache: &AppCache,
|
||||||
|
|||||||
@ -131,7 +131,7 @@ const ChatInputArea = memo(function ChatInputArea({
|
|||||||
{replyingTo && (
|
{replyingTo && (
|
||||||
<div className="mb-2 flex items-center gap-2 rounded-md bg-muted/50 px-3 py-2 text-xs">
|
<div className="mb-2 flex items-center gap-2 rounded-md bg-muted/50 px-3 py-2 text-xs">
|
||||||
<span className="font-medium text-foreground">Replying to {replyingTo.display_name}</span>
|
<span className="font-medium text-foreground">Replying to {replyingTo.display_name}</span>
|
||||||
<span className="truncate text-muted-foreground">{replyingTo.content}</span>
|
<span className="truncate text-muted-foreground" title={replyingTo.content}>{replyingTo.content.length > 80 ? replyingTo.content.slice(0, 80) + '…' : replyingTo.content}</span>
|
||||||
<button onClick={onCancelReply} className="ml-auto text-muted-foreground hover:text-foreground">
|
<button onClick={onCancelReply} className="ml-auto text-muted-foreground hover:text-foreground">
|
||||||
<X className="h-3 w-3" />
|
<X className="h-3 w-3" />
|
||||||
</button>
|
</button>
|
||||||
|
|||||||
@ -345,12 +345,8 @@ export const RoomMessageList = memo(function RoomMessageList({
|
|||||||
return (
|
return (
|
||||||
<div
|
<div
|
||||||
key={row.key}
|
key={row.key}
|
||||||
className="absolute left-0 w-full"
|
className="absolute left-0 top-0 w-full"
|
||||||
style={{
|
style={{
|
||||||
position: 'absolute',
|
|
||||||
top: 0,
|
|
||||||
left: 0,
|
|
||||||
width: '100%',
|
|
||||||
transform: `translateY(${virtualRow.start}px)`,
|
transform: `translateY(${virtualRow.start}px)`,
|
||||||
}}
|
}}
|
||||||
>
|
>
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user