diff --git a/lib/channel/http/out_event.rs b/lib/channel/http/out_event.rs index 4b9bdcf..05fee98 100644 --- a/lib/channel/http/out_event.rs +++ b/lib/channel/http/out_event.rs @@ -118,6 +118,9 @@ pub enum WsOutEvent { room: RoomInfo, data: thread::ThreadArchivedService, }, + ThreadList { + data: thread::ThreadListService, + }, CategoryCreated { workspace: WorkspaceInfo, data: category::CategoryCreatedService, diff --git a/lib/channel/http/types.rs b/lib/channel/http/types.rs index aa6b56b..cf63640 100644 --- a/lib/channel/http/types.rs +++ b/lib/channel/http/types.rs @@ -36,6 +36,7 @@ pub enum WsInMessage { room: Uuid, seq: i64, limit: Option, + thread: Option, }, MessageCreate { room: Uuid, @@ -120,6 +121,9 @@ pub enum WsInMessage { room: Uuid, parent: i64, }, + ThreadList { + room: Uuid, + }, ThreadResolve { thread_id: Uuid, }, @@ -298,6 +302,7 @@ impl WsInMessage { ReactionAdd, ReactionRemove, ThreadCreate, + ThreadList, PinAdd, PinRemove, DraftSave, diff --git a/lib/channel/http/ws.rs b/lib/channel/http/ws.rs index f4d3373..3fc92d9 100644 --- a/lib/channel/http/ws.rs +++ b/lib/channel/http/ws.rs @@ -55,7 +55,7 @@ async fn handle_inbound(bus: &ChannelBus, socket: &Socket, data: EventPayload) { let pong = WsOutEvent::Pong { protocol_version: super::types::WS_PROTOCOL_VERSION, }; - send_event(socket, &pong).await.ok(); + send_event(socket, &pong).await; return; } if !check_rate_limit(bus, user_id).await { @@ -85,7 +85,7 @@ async fn handle_inbound(bus: &ChannelBus, socket: &Socket, data: EventPayload) { request_id: rid, data: serde_json::to_value(&event).unwrap_or_default(), }; - send_event(socket, &resp).await.ok(); + send_event(socket, &resp).await; } Ok(None) => { let rid = request_id.unwrap_or(Uuid::nil()); @@ -93,7 +93,7 @@ async fn handle_inbound(bus: &ChannelBus, socket: &Socket, data: EventPayload) { request_id: rid, data: serde_json::json!({"ok": true}), }; - send_event(socket, &ack).await.ok(); + send_event(socket, &ack).await; } Err(e) => { tracing::warn!(user_id = %user_id, error = %e, "WS message processing failed"); @@ -103,20 +103,31 @@ async fn handle_inbound(bus: &ChannelBus, socket: &Socket, data: EventPayload) { data: serde_json::to_value(&e.to_ws_error()) .unwrap_or_default(), }; - send_event(socket, &err_resp).await.ok(); + send_event(socket, &err_resp).await; } }, Err(e) => { tracing::warn!(error = %e, "WS transport parse error"); - send_error( - socket, - WsError { - code: 400, - error: "parse_error".to_string(), - message: e.to_string(), - }, - ) - .await; + if let Some(rid) = request_id { + let err_resp = WsOutEvent::Response { + request_id: rid, + data: serde_json::to_value(&WsError { + code: 400, + error: "parse_error".to_string(), + message: e.to_string(), + }).unwrap_or_default(), + }; + send_event(socket, &err_resp).await; + } else { + send_error( + socket, + WsError { + code: 400, + error: "parse_error".to_string(), + message: e.to_string(), + }, + ).await; + } } } } @@ -129,25 +140,22 @@ async fn check_rate_limit(bus: &ChannelBus, user_id: Uuid) -> bool { .unwrap_or(true) } -async fn send_event(socket: &Socket, event: &WsOutEvent) -> ChannelResult<()> { - let json = serde_json::to_string(event)?; - socket - .emit(CHANNEL_EVENT, &json) - .await - .map_err(|e| { - tracing::warn!(error = %e, "WS send failed"); - ChannelError::SocketIo(e) - }) +async fn send_event(socket: &Socket, event: &WsOutEvent) { + // Serialize to Value (not String) to avoid double JSON encoding + let value = serde_json::to_value(event).unwrap_or_default(); + if let Err(e) = socket.emit(CHANNEL_EVENT, value).await { + tracing::warn!(error = %e, "WS response send failed"); + } } async fn send_error(socket: &Socket, error: WsError) { - let json = serde_json::json!({ + let value = serde_json::json!({ "type": "error", "code": error.code, "error": error.error, "message": error.message, }); - if let Err(e) = socket.emit(CHANNEL_EVENT, json.to_string()).await { + if let Err(e) = socket.emit(CHANNEL_EVENT, value).await { tracing::warn!(error = %e, "WS error send failed"); } }