From 2b7777adbc7f137b10f6a16fc0e5513a1ebbed77 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Sat, 30 May 2026 15:07:12 +0800 Subject: [PATCH] refactor: update channel HTTP handlers (ban, message, room, thread) --- lib/channel/http/handler/ban.rs | 1 + lib/channel/http/handler/message.rs | 107 ++++++++++++++++++++-------- lib/channel/http/handler/mod.rs | 7 +- lib/channel/http/handler/room.rs | 4 +- lib/channel/http/handler/thread.rs | 75 ++++++++++++++++++- 5 files changed, 161 insertions(+), 33 deletions(-) diff --git a/lib/channel/http/handler/ban.rs b/lib/channel/http/handler/ban.rs index 1fe8760..975a7a7 100644 --- a/lib/channel/http/handler/ban.rs +++ b/lib/channel/http/handler/ban.rs @@ -53,6 +53,7 @@ impl WsHandler { workspace: Uuid, user: Uuid, ) -> ChannelResult> { + Self::ensure_workspace_member(bus, _user_id, workspace).await?; db::sqlx::query( "DELETE FROM user_blacklist WHERE \"user\" = $1 AND black = $2", ) diff --git a/lib/channel/http/handler/message.rs b/lib/channel/http/handler/message.rs index de60d10..a23e6a6 100644 --- a/lib/channel/http/handler/message.rs +++ b/lib/channel/http/handler/message.rs @@ -184,15 +184,27 @@ impl WsHandler { // ── End auto-thread logic ────────────────────────────────────── if let Some(thread_id) = effective_thread { - let exists: Option<(Uuid,)> = db::sqlx::query_as( - "SELECT id FROM room_thread WHERE id = $1 AND room = $2", + let thread_row: Option<(bool, bool)> = db::sqlx::query_as( + "SELECT locked, archived FROM room_thread WHERE id = $1 AND room = $2", ) .bind(thread_id) .bind(room) .fetch_optional(bus.inner.db.reader()) .await?; - if exists.is_none() { - return Err(ChannelError::RoomNotFound); + match thread_row { + None => return Err(ChannelError::RoomNotFound), + Some((locked, archived)) => { + if locked { + return Err(ChannelError::Validation( + "thread is resolved".to_string(), + )); + } + if archived { + return Err(ChannelError::Validation( + "thread is archived".to_string(), + )); + } + } } } @@ -458,30 +470,67 @@ impl WsHandler { room: Uuid, seq: i64, limit: Option, + thread: Option, ) -> ChannelResult> { Self::ensure_room_access(bus, user_id, room).await?; let size = limit .unwrap_or(MAX_MESSAGES_PER_REQUEST) .min(MAX_MESSAGES_PER_REQUEST) as i64; - let rows = db::sqlx::query_as::<_, model::room::RoomMessageModel>( - "(SELECT id, room, seq, thread, parent, author, content, content_type, pinned, \ - system_type, metadata, edited_at, created_at, updated_at, deleted_at \ - FROM room_message \ - WHERE room = $1 AND seq < $2 AND deleted_at IS NULL \ - ORDER BY seq DESC LIMIT $3) \ - UNION ALL \ - (SELECT id, room, seq, thread, parent, author, content, content_type, pinned, \ - system_type, metadata, edited_at, created_at, updated_at, deleted_at \ - FROM room_message \ - WHERE room = $1 AND seq >= $2 AND deleted_at IS NULL \ - ORDER BY seq ASC LIMIT $3) \ - ORDER BY seq ASC", - ) - .bind(room) - .bind(seq) - .bind(size) - .fetch_all(bus.inner.db.reader()) - .await?; + + let rows = if let Some(tid) = thread { + // Pre-fetch starter_message to avoid per-row subquery + let starter: Option<(Uuid,)> = db::sqlx::query_as( + "SELECT starter_message FROM room_thread WHERE id = $1", + ) + .bind(tid) + .fetch_optional(bus.inner.db.reader()) + .await?; + let starter_id = starter.map(|r| r.0); + + db::sqlx::query_as::<_, model::room::RoomMessageModel>( + "(SELECT id, room, seq, thread, parent, author, content, content_type, pinned, \ + system_type, metadata, edited_at, created_at, updated_at, deleted_at \ + FROM room_message \ + WHERE room = $1 AND seq < $2 AND deleted_at IS NULL \ + AND (thread = $4 OR ($5 IS NOT NULL AND id = $5)) \ + ORDER BY seq DESC LIMIT $3) \ + UNION ALL \ + (SELECT id, room, seq, thread, parent, author, content, content_type, pinned, \ + system_type, metadata, edited_at, created_at, updated_at, deleted_at \ + FROM room_message \ + WHERE room = $1 AND seq >= $2 AND deleted_at IS NULL \ + AND (thread = $4 OR ($5 IS NOT NULL AND id = $5)) \ + ORDER BY seq ASC LIMIT $3) \ + ORDER BY seq ASC", + ) + .bind(room) + .bind(seq) + .bind(size) + .bind(tid) + .bind(starter_id) + .fetch_all(bus.inner.db.reader()) + .await? + } else { + db::sqlx::query_as::<_, model::room::RoomMessageModel>( + "(SELECT id, room, seq, thread, parent, author, content, content_type, pinned, \ + system_type, metadata, edited_at, created_at, updated_at, deleted_at \ + FROM room_message \ + WHERE room = $1 AND seq < $2 AND deleted_at IS NULL AND thread IS NULL \ + ORDER BY seq DESC LIMIT $3) \ + UNION ALL \ + (SELECT id, room, seq, thread, parent, author, content, content_type, pinned, \ + system_type, metadata, edited_at, created_at, updated_at, deleted_at \ + FROM room_message \ + WHERE room = $1 AND seq >= $2 AND deleted_at IS NULL AND thread IS NULL \ + ORDER BY seq ASC LIMIT $3) \ + ORDER BY seq ASC", + ) + .bind(room) + .bind(seq) + .bind(size) + .fetch_all(bus.inner.db.reader()) + .await? + }; let author_ids: Vec = rows.iter().map(|r| r.author).collect(); let user_map = bus.lookup_users(&author_ids).await.unwrap_or_default(); let message_ids: Vec = rows.iter().map(|r| r.id).collect(); @@ -563,13 +612,13 @@ impl WsHandler { room: missed_room.clone(), sender_type: "user".to_string(), sender, - thread: None, - in_reply_to: None, + thread: m.thread, + in_reply_to: m.parent, content: m.content, - content_type: "text".to_string(), - pinned: false, - system_type: None, - metadata: serde_json::Value::Null, + content_type: m.content_type, + pinned: m.pinned, + system_type: m.system_type, + metadata: m.metadata, thinking_content: None, thinking_is_chunked: None, send_at: m.send_at, diff --git a/lib/channel/http/handler/mod.rs b/lib/channel/http/handler/mod.rs index 4ba1406..2ba6cc6 100644 --- a/lib/channel/http/handler/mod.rs +++ b/lib/channel/http/handler/mod.rs @@ -73,8 +73,8 @@ impl WsHandler { ) .await } - WsInMessage::MessageAround { room, seq, limit } => { - Self::message_around(bus, user_id, room, seq, limit).await + WsInMessage::MessageAround { room, seq, limit, thread } => { + Self::message_around(bus, user_id, room, seq, limit, thread).await } WsInMessage::MessageCreate { room, @@ -180,6 +180,9 @@ impl WsHandler { WsInMessage::ThreadCreate { room, parent } => { Self::thread_create(bus, user_id, room, parent).await } + WsInMessage::ThreadList { room } => { + Self::thread_list(bus, user_id, room).await + } WsInMessage::ThreadResolve { thread_id } => { Self::thread_resolve(bus, user_id, thread_id).await } diff --git a/lib/channel/http/handler/room.rs b/lib/channel/http/handler/room.rs index a72518c..f1ca72a 100644 --- a/lib/channel/http/handler/room.rs +++ b/lib/channel/http/handler/room.rs @@ -202,10 +202,11 @@ impl WsHandler { pub(super) async fn access_grant( bus: &ChannelBus, - _user_id: Uuid, + user_id: Uuid, room: Uuid, target_user: Uuid, ) -> ChannelResult> { + Self::ensure_room_access(bus, user_id, room).await?; db::sqlx::query( "INSERT INTO room_permission_overwrite \ (room, target_type, target_id, allow_permissions, deny_permissions, created_at, updated_at) \ @@ -235,6 +236,7 @@ impl WsHandler { room: Uuid, target_user: Uuid, ) -> ChannelResult> { + Self::ensure_room_access(bus, user_id, room).await?; db::sqlx::query( "DELETE FROM room_permission_overwrite \ WHERE room = $1 AND target_type = 'user' AND target_id = $2", diff --git a/lib/channel/http/handler/thread.rs b/lib/channel/http/handler/thread.rs index 95c7d00..a3f4b95 100644 --- a/lib/channel/http/handler/thread.rs +++ b/lib/channel/http/handler/thread.rs @@ -1,4 +1,4 @@ -use chrono::Utc; +use chrono::{DateTime, Utc}; use uuid::Uuid; use crate::event::{RoomInfo, UserInfo, thread}; @@ -7,7 +7,80 @@ use crate::{ChannelBus, ChannelError, ChannelResult}; use super::WsOutEvent; use super::WsHandler; +/// Helper struct for thread_list JOIN query result +#[derive(db::sqlx::FromRow)] +struct ThreadListRow { + id: Uuid, + room: Uuid, + seq: i64, + starter_message: Option, + title: String, + created_by: Uuid, + archived: bool, + locked: bool, + last_message_at: Option>, + created_at: DateTime, + updated_at: DateTime, + archived_at: Option>, + parent_seq: i64, +} + impl WsHandler { + pub(super) async fn thread_list( + bus: &ChannelBus, + user_id: Uuid, + room: Uuid, + ) -> ChannelResult> { + Self::ensure_room_access(bus, user_id, room).await?; + + // Join room_thread with room_message to get the parent message's seq + let rows = db::sqlx::query_as::<_, ThreadListRow>( + "SELECT rt.id, rt.room, rt.seq, rt.starter_message, rt.title, rt.created_by, \ + rt.archived, rt.locked, rt.last_message_at, rt.created_at, rt.updated_at, rt.archived_at, \ + COALESCE(rm.seq, 0) as parent_seq \ + FROM room_thread rt \ + LEFT JOIN room_message rm ON rm.id = rt.starter_message \ + WHERE rt.room = $1 ORDER BY rt.last_message_at DESC NULLS LAST", + ) + .bind(room) + .fetch_all(bus.inner.db.reader()) + .await?; + + let mut items = Vec::new(); + for row in rows { + let tc_room = bus.lookup_room(row.room).await + .unwrap_or_else(|_| RoomInfo::unknown(row.room)); + let created_by = bus.lookup_user(row.created_by).await + .unwrap_or_else(|_| UserInfo::unknown(row.created_by)); + // Get last message preview + let preview: Option<(String,)> = db::sqlx::query_as( + "SELECT content FROM room_message \ + WHERE thread = $1 AND deleted_at IS NULL \ + ORDER BY seq DESC LIMIT 1", + ) + .bind(row.id) + .fetch_optional(bus.inner.db.reader()) + .await?; + items.push(thread::ThreadListItem { + id: row.id, + room: tc_room, + seq: row.seq, + parent_seq: row.parent_seq, + title: row.title, + created_by, + archived: row.archived, + locked: row.locked, + last_message_at: row.last_message_at, + last_message_preview: preview.map(|p| p.0), + created_at: row.created_at, + }); + } + + Ok(Some(WsOutEvent::ThreadList { + data: thread::ThreadListService { threads: items }, + })) + } + pub(super) async fn thread_create( bus: &ChannelBus, user_id: Uuid,