From 12eaa83b87904f2590e1d026a3cbdef7073ed88e Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Thu, 14 May 2026 10:02:36 +0800 Subject: [PATCH] refactor(transport): apply rustfmt formatting --- libs/transport/ack.rs | 61 +++-- libs/transport/bus.rs | 40 ++-- libs/transport/dedup.rs | 14 +- libs/transport/e2e.rs | 3 +- libs/transport/error.rs | 40 +++- libs/transport/event/ai.rs | 2 +- libs/transport/event/attachment.rs | 2 +- libs/transport/event/ban.rs | 2 +- libs/transport/event/category.rs | 4 +- libs/transport/event/draft.rs | 2 +- libs/transport/event/invite.rs | 2 +- libs/transport/event/member.rs | 2 +- libs/transport/event/message.rs | 2 +- libs/transport/event/notify.rs | 2 +- libs/transport/event/pin.rs | 2 +- libs/transport/event/presence.rs | 2 +- libs/transport/event/project.rs | 2 +- libs/transport/event/reaction.rs | 2 +- libs/transport/event/rooms.rs | 2 +- libs/transport/event/search.rs | 4 +- libs/transport/event/thread.rs | 2 +- libs/transport/event/voice.rs | 2 +- libs/transport/handler/dispatch.rs | 18 +- libs/transport/handler/inbound/message.rs | 8 +- libs/transport/handler/inbound/misc.rs | 35 ++- libs/transport/handler/inbound/mod.rs | 152 ++++++++---- libs/transport/handler/inbound/msg.rs | 12 +- libs/transport/handler/inbound/reaction.rs | 22 +- libs/transport/handler/inbound/room.rs | 8 +- libs/transport/handler/mod.rs | 2 +- libs/transport/handler/poll.rs | 2 +- libs/transport/handler/session.rs | 19 +- libs/transport/handler/sse.rs | 40 ++-- libs/transport/handler/types/in_message.rs | 261 +++++++++++++++++---- libs/transport/handler/types/mod.rs | 4 +- libs/transport/handler/types/out_event.rs | 4 +- libs/transport/handler/ws.rs | 77 ++++-- libs/transport/lib.rs | 15 +- libs/transport/metrics.rs | 15 +- libs/transport/pagination.rs | 22 +- libs/transport/reconnect.rs | 25 +- libs/transport/richtext.rs | 30 ++- libs/transport/search.rs | 10 +- libs/transport/security.rs | 35 ++- libs/transport/seq.rs | 2 +- libs/transport/token.rs | 4 +- libs/transport/unread.rs | 6 +- 47 files changed, 728 insertions(+), 296 deletions(-) diff --git a/libs/transport/ack.rs b/libs/transport/ack.rs index ce50435..5af78e4 100644 --- a/libs/transport/ack.rs +++ b/libs/transport/ack.rs @@ -1,7 +1,7 @@ +use redis::AsyncCommands; use serde::{Deserialize, Serialize}; use std::time::Duration; use uuid::Uuid; -use redis::AsyncCommands; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MessageAck { @@ -66,13 +66,17 @@ impl AckTracker { status: AckStatus::Pending, timestamp: chrono::Utc::now(), }; - let value = serde_json::to_string(&ack) + let value = + serde_json::to_string(&ack).map_err(|_| crate::error::AppTransportError::Internal)?; + + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let mut conn = self.cache.conn().await - .map_err(|_| crate::error::AppTransportError::Internal)?; - - let _: () = conn.set_ex(&key, &value, self.timeout.as_secs()) + let _: () = conn + .set_ex(&key, &value, self.timeout.as_secs()) .await .map_err(|_| crate::error::AppTransportError::Internal)?; @@ -84,7 +88,8 @@ impl AckTracker { message_id: Uuid, room_id: Uuid, ) -> Result<(), crate::error::AppTransportError> { - self.update_status(message_id, room_id, AckStatus::Received).await + self.update_status(message_id, room_id, AckStatus::Received) + .await } pub async fn mark_persisted( @@ -101,13 +106,17 @@ impl AckTracker { status: AckStatus::Persisted, timestamp: chrono::Utc::now(), }; - let value = serde_json::to_string(&ack) + let value = + serde_json::to_string(&ack).map_err(|_| crate::error::AppTransportError::Internal)?; + + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let mut conn = self.cache.conn().await - .map_err(|_| crate::error::AppTransportError::Internal)?; - - let _: () = conn.set_ex(&key, &value, self.timeout.as_secs()) + let _: () = conn + .set_ex(&key, &value, self.timeout.as_secs()) .await .map_err(|_| crate::error::AppTransportError::Internal)?; @@ -119,10 +128,14 @@ impl AckTracker { message_id: Uuid, room_id: Uuid, ) -> Result<(), crate::error::AppTransportError> { - self.update_status(message_id, room_id, AckStatus::Delivered).await?; + self.update_status(message_id, room_id, AckStatus::Delivered) + .await?; let key = format!("ack:pending:{}:{}", room_id, message_id); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; let _: Result<(), redis::RedisError> = conn.del(&key).await; @@ -134,7 +147,8 @@ impl AckTracker { message_id: Uuid, room_id: Uuid, ) -> Result<(), crate::error::AppTransportError> { - self.update_status(message_id, room_id, AckStatus::Failed).await + self.update_status(message_id, room_id, AckStatus::Failed) + .await } pub async fn get_status( @@ -144,10 +158,15 @@ impl AckTracker { ) -> Result, crate::error::AppTransportError> { let key = format!("ack:pending:{}:{}", room_id, message_id); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let value: Option = conn.get(&key).await + let value: Option = conn + .get(&key) + .await .map_err(|_| crate::error::AppTransportError::Internal)?; match value { @@ -173,10 +192,14 @@ impl AckTracker { let value = serde_json::to_string(&ack) .map_err(|_| crate::error::AppTransportError::Internal)?; - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let _: () = conn.set_ex(&key, &value, self.timeout.as_secs()) + let _: () = conn + .set_ex(&key, &value, self.timeout.as_secs()) .await .map_err(|_| crate::error::AppTransportError::Internal)?; } diff --git a/libs/transport/bus.rs b/libs/transport/bus.rs index a014ce4..0515c4e 100644 --- a/libs/transport/bus.rs +++ b/libs/transport/bus.rs @@ -49,13 +49,10 @@ impl NatsTransport { } }); - let client = opts - .connect(&url) - .await - .map_err(|e| { - warn!(error = %e, "NATS connect failed"); - AppTransportError::Internal - })?; + let client = opts.connect(&url).await.map_err(|e| { + warn!(error = %e, "NATS connect failed"); + AppTransportError::Internal + })?; let jetstream = jetstream::new(client); @@ -128,7 +125,7 @@ impl Transport for NatsTransport { .replace(['.', '>'], "-") .trim_end_matches('-') .to_string(); - + // Generate a unique instance-specific suffix to prevent competition in multi-node setups. // Using a short UUID-based string for reliability across dependency versions. let instance_id = uuid::Uuid::new_v4().to_string(); @@ -150,7 +147,10 @@ impl Transport for NatsTransport { ..Default::default() }; - let mut messages = match stream.get_or_create_consumer(&durable, config.clone()).await { + let mut messages = match stream + .get_or_create_consumer(&durable, config.clone()) + .await + { Ok(c) => match c.messages().await { Ok(m) => m, Err(e) => { @@ -200,19 +200,17 @@ impl Transport for NatsTransport { .get_or_create_consumer(&durable, config.clone()) .await { - Ok(new_consumer) => { - match new_consumer.messages().await { - Ok(new_messages) => { - info!(subject = %subject, "NATS consumer reconnected"); - messages = new_messages; - reconnect_retries = 0; - break; - } - Err(e) => { - warn!(subject = %subject, error = %e, "Failed to get messages from reconnected NATS consumer"); - } + Ok(new_consumer) => match new_consumer.messages().await { + Ok(new_messages) => { + info!(subject = %subject, "NATS consumer reconnected"); + messages = new_messages; + reconnect_retries = 0; + break; } - } + Err(e) => { + warn!(subject = %subject, error = %e, "Failed to get messages from reconnected NATS consumer"); + } + }, Err(e) => { warn!(subject = %subject, error = %e, "Failed to recreate NATS consumer in reconnect loop"); } diff --git a/libs/transport/dedup.rs b/libs/transport/dedup.rs index 30dab63..61963c8 100644 --- a/libs/transport/dedup.rs +++ b/libs/transport/dedup.rs @@ -23,7 +23,10 @@ impl DeduplicationManager { ) -> Result { let key = format!("dedup:{}:{}", room_id, message_id); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; // Use atomic SET NX EX to prevent race conditions. @@ -48,10 +51,15 @@ impl DeduplicationManager { ) -> Result { let key = format!("dedup:{}:{}", room_id, message_id); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let exists: bool = conn.exists(&key).await + let exists: bool = conn + .exists(&key) + .await .map_err(|_| crate::error::AppTransportError::Internal)?; Ok(exists) diff --git a/libs/transport/e2e.rs b/libs/transport/e2e.rs index 3961cdf..13a1da7 100644 --- a/libs/transport/e2e.rs +++ b/libs/transport/e2e.rs @@ -7,8 +7,7 @@ pub struct EncryptedMessage { pub recipient_key_id: String, } -pub struct E2EEncryption { -} +pub struct E2EEncryption {} impl E2EEncryption { pub fn new() -> Self { diff --git a/libs/transport/error.rs b/libs/transport/error.rs index 5c23ae9..eb2b590 100644 --- a/libs/transport/error.rs +++ b/libs/transport/error.rs @@ -8,6 +8,44 @@ pub enum AppTransportError { TokenInvalidOrExpired, #[error("renewal limit exceeded")] RenewalLimitExceeded, + #[error("rate limit exceeded")] + RateLimitExceeded, + #[error("room not found")] + RoomNotFound, + #[error("access denied")] + AccessDenied, #[error("internal error")] Internal, -} \ No newline at end of file +} + +impl From for AppTransportError { + fn from(e: room::error::RoomError) -> Self { + match e { + room::error::RoomError::Unauthorized => AppTransportError::Unauthorized, + room::error::RoomError::NoPower => AppTransportError::AccessDenied, + room::error::RoomError::NotFound(_) => AppTransportError::RoomNotFound, + room::error::RoomError::RateLimited(_) => AppTransportError::RateLimitExceeded, + room::error::RoomError::BadRequest(_) => AppTransportError::Internal, + room::error::RoomError::Database(_) => AppTransportError::Internal, + room::error::RoomError::RoleParseError => AppTransportError::Internal, + room::error::RoomError::Internal(_) => AppTransportError::Internal, + } + } +} + +impl AppTransportError { + /// Map error to a (code, error_type) pair for WS error messages. + /// Frontend can use these to distinguish auth vs rate-limit vs internal errors. + pub fn ws_error_code(&self) -> (u16, &'static str) { + match self { + AppTransportError::Unauthorized => (401, "unauthorized"), + AppTransportError::TokenInvalidOrExpired => (401, "token_invalid"), + AppTransportError::AccessDenied => (403, "access_denied"), + AppTransportError::RateLimitExceeded => (429, "rate_limit_exceeded"), + AppTransportError::RoomNotFound => (404, "not_found"), + AppTransportError::InvalidSession => (401, "invalid_session"), + AppTransportError::RenewalLimitExceeded => (429, "renewal_limit"), + AppTransportError::Internal => (500, "internal_error"), + } + } +} diff --git a/libs/transport/event/ai.rs b/libs/transport/event/ai.rs index 5e39fb7..0d4de42 100644 --- a/libs/transport/event/ai.rs +++ b/libs/transport/event/ai.rs @@ -45,4 +45,4 @@ pub struct AiAgentStatusChangedService { pub old_status: String, pub new_status: String, pub changed_at: DateTime, -} \ No newline at end of file +} diff --git a/libs/transport/event/attachment.rs b/libs/transport/event/attachment.rs index 492e2e0..d498096 100644 --- a/libs/transport/event/attachment.rs +++ b/libs/transport/event/attachment.rs @@ -48,4 +48,4 @@ pub struct AttachmentDeletedService { pub room: RoomId, pub deleted_by: Uuid, pub deleted_at: DateTime, -} \ No newline at end of file +} diff --git a/libs/transport/event/ban.rs b/libs/transport/event/ban.rs index b6571d5..624f4e2 100644 --- a/libs/transport/event/ban.rs +++ b/libs/transport/event/ban.rs @@ -47,4 +47,4 @@ pub struct BanCreateClient { pub struct BanRemoveClient { pub project: ProjectId, pub user_id: UserId, -} \ No newline at end of file +} diff --git a/libs/transport/event/category.rs b/libs/transport/event/category.rs index 3b93baf..c3d39fa 100644 --- a/libs/transport/event/category.rs +++ b/libs/transport/event/category.rs @@ -1,7 +1,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use models::{RoomCategoryId, ProjectId, UserId}; +use models::{ProjectId, RoomCategoryId, UserId}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -58,4 +58,4 @@ pub struct CategoryCreateClient { pub struct CategoryUpdateClient { pub name: Option, pub position: Option, -} \ No newline at end of file +} diff --git a/libs/transport/event/draft.rs b/libs/transport/event/draft.rs index ed8a9ee..38179a3 100644 --- a/libs/transport/event/draft.rs +++ b/libs/transport/event/draft.rs @@ -32,4 +32,4 @@ pub struct DraftLoadClient { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct DraftClearClient { pub room: RoomId, -} \ No newline at end of file +} diff --git a/libs/transport/event/invite.rs b/libs/transport/event/invite.rs index 1521fb4..bcb8336 100644 --- a/libs/transport/event/invite.rs +++ b/libs/transport/event/invite.rs @@ -75,4 +75,4 @@ pub struct InviteAcceptClient { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct InviteRevokeClient { pub id: Uuid, -} \ No newline at end of file +} diff --git a/libs/transport/event/member.rs b/libs/transport/event/member.rs index b8d7ad1..d66a1a8 100644 --- a/libs/transport/event/member.rs +++ b/libs/transport/event/member.rs @@ -111,4 +111,4 @@ pub struct DndUpdateClient { pub do_not_disturb: Option, pub dnd_start_hour: Option, pub dnd_end_hour: Option, -} \ No newline at end of file +} diff --git a/libs/transport/event/message.rs b/libs/transport/event/message.rs index aec47f3..7e79a53 100644 --- a/libs/transport/event/message.rs +++ b/libs/transport/event/message.rs @@ -119,4 +119,4 @@ pub struct MessageListService { pub room: RoomId, pub messages: Vec, pub total: i64, -} \ No newline at end of file +} diff --git a/libs/transport/event/notify.rs b/libs/transport/event/notify.rs index de4be4c..4786f58 100644 --- a/libs/transport/event/notify.rs +++ b/libs/transport/event/notify.rs @@ -63,4 +63,4 @@ pub struct NotifyReadAllClient { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct NotifyArchiveClient { pub id: Uuid, -} \ No newline at end of file +} diff --git a/libs/transport/event/pin.rs b/libs/transport/event/pin.rs index bdbef99..61c066c 100644 --- a/libs/transport/event/pin.rs +++ b/libs/transport/event/pin.rs @@ -43,4 +43,4 @@ pub struct PinAddClient { pub struct PinRemoveClient { pub room: RoomId, pub message: MessageId, -} \ No newline at end of file +} diff --git a/libs/transport/event/presence.rs b/libs/transport/event/presence.rs index d448e12..662cf84 100644 --- a/libs/transport/event/presence.rs +++ b/libs/transport/event/presence.rs @@ -52,4 +52,4 @@ pub struct CustomStatusUpdateClient { pub emoji: Option, pub text: Option, pub expires_at: Option>, -} \ No newline at end of file +} diff --git a/libs/transport/event/project.rs b/libs/transport/event/project.rs index cfbf6c7..b69bbbd 100644 --- a/libs/transport/event/project.rs +++ b/libs/transport/event/project.rs @@ -112,4 +112,4 @@ pub struct ProjectRoomUpdateClient { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ProjectRoomDeleteClient { pub room: Uuid, -} \ No newline at end of file +} diff --git a/libs/transport/event/reaction.rs b/libs/transport/event/reaction.rs index 091c268..185f779 100644 --- a/libs/transport/event/reaction.rs +++ b/libs/transport/event/reaction.rs @@ -66,4 +66,4 @@ pub struct ReactionRemoveClient { pub room: RoomId, pub message: MessageId, pub emoji: String, -} \ No newline at end of file +} diff --git a/libs/transport/event/rooms.rs b/libs/transport/event/rooms.rs index 965f3de..b840a5a 100644 --- a/libs/transport/event/rooms.rs +++ b/libs/transport/event/rooms.rs @@ -125,4 +125,4 @@ pub struct RoomDeleteClient {} #[derive(Debug, Clone, Deserialize, Serialize)] pub struct RoomLoadClient { pub room: RoomId, -} \ No newline at end of file +} diff --git a/libs/transport/event/search.rs b/libs/transport/event/search.rs index 5c5a4ef..3065336 100644 --- a/libs/transport/event/search.rs +++ b/libs/transport/event/search.rs @@ -1,8 +1,8 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; -use models::{RoomId, Uuid}; use super::message::MessageNewService; +use models::{RoomId, Uuid}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SearchResultService { @@ -30,4 +30,4 @@ pub struct SearchClient { pub content_type: Option, pub limit: Option, pub offset: Option, -} \ No newline at end of file +} diff --git a/libs/transport/event/thread.rs b/libs/transport/event/thread.rs index da82010..048dfde 100644 --- a/libs/transport/event/thread.rs +++ b/libs/transport/event/thread.rs @@ -95,4 +95,4 @@ pub struct ThreadArchiveClient { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ThreadLoadClient { pub thread_id: RoomThreadId, -} \ No newline at end of file +} diff --git a/libs/transport/event/voice.rs b/libs/transport/event/voice.rs index 4e66ff6..a50c650 100644 --- a/libs/transport/event/voice.rs +++ b/libs/transport/event/voice.rs @@ -114,4 +114,4 @@ pub struct VoiceDeafClient { pub struct ScreenShareClient { pub room: RoomId, pub start: bool, -} \ No newline at end of file +} diff --git a/libs/transport/handler/dispatch.rs b/libs/transport/handler/dispatch.rs index ab46ec4..80e21e1 100644 --- a/libs/transport/handler/dispatch.rs +++ b/libs/transport/handler/dispatch.rs @@ -2,8 +2,8 @@ use models::RoomId; use queue::{ReactionGroup, RoomMessageEvent, RoomMessageStreamChunkEvent, TypingEvent}; use room::types::NotificationEvent; -use crate::event::{member, message, reaction, notify}; use super::types::WsOutEvent; +use crate::event::{member, message, notify, reaction}; pub struct EventDispatcher; @@ -117,13 +117,17 @@ impl EventDispatcher { reactions: reactions .iter() .map(|g| reaction::ReactionGroup { - emoji: g.emoji.clone(), - count: g.count as i64, - reacted_by_me: g.reacted_by_me, - users: g.users.iter().filter_map(|u| u.parse::().ok()).collect(), - }) + emoji: g.emoji.clone(), + count: g.count as i64, + reacted_by_me: g.reacted_by_me, + users: g + .users + .iter() + .filter_map(|u| u.parse::().ok()) + .collect(), + }) .collect(), }, } } -} \ No newline at end of file +} diff --git a/libs/transport/handler/inbound/message.rs b/libs/transport/handler/inbound/message.rs index 1a2fd0e..ddde914 100644 --- a/libs/transport/handler/inbound/message.rs +++ b/libs/transport/handler/inbound/message.rs @@ -48,7 +48,11 @@ pub(crate) async fn message_list( emoji: r.emoji.clone(), count: r.count as i64, reacted_by_me: r.reacted_by_me, - users: r.users.iter().filter_map(|u| uuid::Uuid::parse_str(u).ok()).collect(), + users: r + .users + .iter() + .filter_map(|u| uuid::Uuid::parse_str(u).ok()) + .collect(), }) .collect(), ), @@ -144,4 +148,4 @@ pub(crate) async fn message_revoke( AppTransportError::Internal })?; Ok(None) -} \ No newline at end of file +} diff --git a/libs/transport/handler/inbound/misc.rs b/libs/transport/handler/inbound/misc.rs index b6de3fe..a76a3bf 100644 --- a/libs/transport/handler/inbound/misc.rs +++ b/libs/transport/handler/inbound/misc.rs @@ -171,10 +171,12 @@ pub(crate) async fn custom_status_update( text: Option, expires_at: Option>, ) -> Result, AppTransportError> { - let evt = session - .service - .room - .set_custom_status(session.user.user_id, emoji.clone(), text.clone(), expires_at); + let evt = session.service.room.set_custom_status( + session.user.user_id, + emoji.clone(), + text.clone(), + expires_at, + ); if let Some(data) = evt { Ok(Some(WsOutEvent::CustomStatusUpdated { @@ -221,27 +223,40 @@ pub(crate) async fn ban_remove() -> Result, AppTransportError // ─── Voice (stubs) ──────────────────────────────────────────────────── -pub(crate) async fn voice_join(room: models::RoomId) -> Result, AppTransportError> { +pub(crate) async fn voice_join( + room: models::RoomId, +) -> Result, AppTransportError> { tracing::info!(%room, "Voice join"); Ok(None) } -pub(crate) async fn voice_leave(room: models::RoomId) -> Result, AppTransportError> { +pub(crate) async fn voice_leave( + room: models::RoomId, +) -> Result, AppTransportError> { tracing::info!(%room, "Voice leave"); Ok(None) } -pub(crate) async fn voice_mute(room: models::RoomId, muted: bool) -> Result, AppTransportError> { +pub(crate) async fn voice_mute( + room: models::RoomId, + muted: bool, +) -> Result, AppTransportError> { tracing::info!(%room, %muted, "Voice mute"); Ok(None) } -pub(crate) async fn voice_deaf(room: models::RoomId, deafened: bool) -> Result, AppTransportError> { +pub(crate) async fn voice_deaf( + room: models::RoomId, + deafened: bool, +) -> Result, AppTransportError> { tracing::info!(%room, %deafened, "Voice deaf"); Ok(None) } -pub(crate) async fn screen_share(room: models::RoomId, start: bool) -> Result, AppTransportError> { +pub(crate) async fn screen_share( + room: models::RoomId, + start: bool, +) -> Result, AppTransportError> { tracing::info!(%room, %start, "Screen share"); Ok(None) } @@ -412,4 +427,4 @@ pub(crate) async fn state_update_dnd( AppTransportError::Internal })?; Ok(None) -} \ No newline at end of file +} diff --git a/libs/transport/handler/inbound/mod.rs b/libs/transport/handler/inbound/mod.rs index 1041778..be374fa 100644 --- a/libs/transport/handler/inbound/mod.rs +++ b/libs/transport/handler/inbound/mod.rs @@ -2,9 +2,9 @@ use crate::error::AppTransportError; use crate::handler::session::TransportSession; use crate::handler::types::{WsInMessage, WsOutEvent}; +mod message; mod misc; mod msg; -mod message; mod reaction; mod room; @@ -21,14 +21,25 @@ impl MessageHandler { WsInMessage::Unsubscribe { room } => msg::unsubscribe(session, room).await, WsInMessage::TypingStart { room } => msg::typing_start(session, room).await, WsInMessage::TypingStop { room } => msg::typing_stop(session, room).await, - WsInMessage::ReadReceipt { room, last_read_seq } => { - msg::read_receipt(session, room, last_read_seq).await - } - WsInMessage::MessageList { room, before_seq, after_seq, limit } => { - message::message_list(session, room, before_seq, after_seq, limit).await - } - WsInMessage::MessageCreate { room, content, content_type, thread, in_reply_to } => { - message::message_create(session, room, content, content_type, thread, in_reply_to).await + WsInMessage::ReadReceipt { + room, + last_read_seq, + } => msg::read_receipt(session, room, last_read_seq).await, + WsInMessage::MessageList { + room, + before_seq, + after_seq, + limit, + } => message::message_list(session, room, before_seq, after_seq, limit).await, + WsInMessage::MessageCreate { + room, + content, + content_type, + thread, + in_reply_to, + } => { + message::message_create(session, room, content, content_type, thread, in_reply_to) + .await } WsInMessage::MessageUpdate { message, content } => { message::message_update(session, message, content).await @@ -37,28 +48,44 @@ impl MessageHandler { message::message_revoke(session, message).await } WsInMessage::RoomGet { room } => room::room_get(session, room).await, - WsInMessage::RoomCreate { project, room_name, public, category } => { - room::room_create(session, project, room_name, public, category).await - } - WsInMessage::RoomUpdate { room, room_name, public, category } => { - room::room_update(session, room, room_name, public, category).await - } + WsInMessage::RoomCreate { + project, + room_name, + public, + category, + } => room::room_create(session, project, room_name, public, category).await, + WsInMessage::RoomUpdate { + room, + room_name, + public, + category, + } => room::room_update(session, room, room_name, public, category).await, WsInMessage::RoomDelete { room } => room::room_delete(session, room).await, - WsInMessage::CategoryCreate { project, name, position } => { - room::category_create(session, project, name, position).await - } + WsInMessage::CategoryCreate { + project, + name, + position, + } => room::category_create(session, project, name, position).await, WsInMessage::CategoryUpdate { id, name, position } => { room::category_update(session, id, name, position).await } WsInMessage::CategoryDelete { id } => room::category_delete(session, id).await, - WsInMessage::AccessGrant { room, user } => room::access_grant(session, room, user).await, - WsInMessage::AccessRevoke { room, user } => room::access_revoke(session, room, user).await, - WsInMessage::ReactionAdd { room, message, emoji } => { - reaction::reaction_add(session, room, message, emoji).await + WsInMessage::AccessGrant { room, user } => { + room::access_grant(session, room, user).await } - WsInMessage::ReactionRemove { room, message, emoji } => { - reaction::reaction_remove(session, room, message, emoji).await + WsInMessage::AccessRevoke { room, user } => { + room::access_revoke(session, room, user).await } + WsInMessage::ReactionAdd { + room, + message, + emoji, + } => reaction::reaction_add(session, room, message, emoji).await, + WsInMessage::ReactionRemove { + room, + message, + emoji, + } => reaction::reaction_remove(session, room, message, emoji).await, WsInMessage::ThreadCreate { room, parent } => { reaction::thread_create(session, room, parent).await } @@ -79,15 +106,35 @@ impl MessageHandler { WsInMessage::NotificationArchive { id } => { misc::notification_archive(session, id).await } - WsInMessage::Search { q, room, start_time, end_time, sender_id, content_type, limit, offset } => { - misc::search(session, q, room, start_time, end_time, sender_id, content_type, limit, offset).await - } - WsInMessage::PresenceUpdate { status } => { - misc::presence_update(session, status).await - } - WsInMessage::CustomStatusUpdate { emoji, text, expires_at } => { - misc::custom_status_update(session, emoji, text, expires_at).await + WsInMessage::Search { + q, + room, + start_time, + end_time, + sender_id, + content_type, + limit, + offset, + } => { + misc::search( + session, + q, + room, + start_time, + end_time, + sender_id, + content_type, + limit, + offset, + ) + .await } + WsInMessage::PresenceUpdate { status } => misc::presence_update(session, status).await, + WsInMessage::CustomStatusUpdate { + emoji, + text, + expires_at, + } => misc::custom_status_update(session, emoji, text, expires_at).await, WsInMessage::InviteCreate { .. } => misc::invite_create().await, WsInMessage::InviteAccept { .. } => misc::invite_accept().await, WsInMessage::InviteRevoke { .. } => misc::invite_revoke().await, @@ -99,20 +146,45 @@ impl MessageHandler { WsInMessage::VoiceDeaf { room, deafened } => misc::voice_deaf(room, deafened).await, WsInMessage::ScreenShare { room, start } => misc::screen_share(room, start).await, WsInMessage::AiList { room } => misc::ai_list(session, room).await, - WsInMessage::AiUpsert { room, model, version, system_prompt, temperature, max_tokens, stream } => { - misc::ai_upsert(session, room, model, version, system_prompt, temperature, max_tokens, stream).await + WsInMessage::AiUpsert { + room, + model, + version, + system_prompt, + temperature, + max_tokens, + stream, + } => { + misc::ai_upsert( + session, + room, + model, + version, + system_prompt, + temperature, + max_tokens, + stream, + ) + .await } WsInMessage::AiDelete { room, agent_id } => { misc::ai_delete(session, room, agent_id).await } WsInMessage::AiStop { room } => misc::ai_stop(session, room).await, WsInMessage::UserSummary { username } => misc::user_summary(session, username).await, - WsInMessage::StateSetReadSeq { room, last_read_seq } => { - misc::state_set_read_seq(session, room, last_read_seq).await - } - WsInMessage::StateUpdateDnd { room, do_not_disturb, dnd_start_hour, dnd_end_hour } => { - misc::state_update_dnd(session, room, do_not_disturb, dnd_start_hour, dnd_end_hour).await + WsInMessage::StateSetReadSeq { + room, + last_read_seq, + } => misc::state_set_read_seq(session, room, last_read_seq).await, + WsInMessage::StateUpdateDnd { + room, + do_not_disturb, + dnd_start_hour, + dnd_end_hour, + } => { + misc::state_update_dnd(session, room, do_not_disturb, dnd_start_hour, dnd_end_hour) + .await } } } -} \ No newline at end of file +} diff --git a/libs/transport/handler/inbound/msg.rs b/libs/transport/handler/inbound/msg.rs index 1fe3f98..035a4a6 100644 --- a/libs/transport/handler/inbound/msg.rs +++ b/libs/transport/handler/inbound/msg.rs @@ -3,6 +3,7 @@ use room::ws_context::WsUserContext; use crate::error::AppTransportError; use crate::handler::session::TransportSession; use crate::handler::types::WsOutEvent; +use room::connection::MAX_ROOMS_PER_SESSION; pub(crate) async fn ping() -> Result, AppTransportError> { Ok(Some(WsOutEvent::Pong { @@ -14,10 +15,11 @@ pub(crate) async fn subscribe( session: &TransportSession, room: models::RoomId, ) -> Result, AppTransportError> { - let sub = session.subscribe_room(room).await.map_err(|e| { - tracing::warn!(error = %e, "subscribe_room failed"); - AppTransportError::Internal - })?; + // Per-session room subscription limit + if session.subscriptions.len() >= MAX_ROOMS_PER_SESSION { + return Err(AppTransportError::RateLimitExceeded); + } + let sub = session.subscribe_room(room).await?; session.subscriptions.insert(room, sub); session.service.room.spawn_room_workers(room); session.refresh_project().await; @@ -64,4 +66,4 @@ pub(crate) async fn read_receipt( AppTransportError::Internal })?; Ok(None) -} \ No newline at end of file +} diff --git a/libs/transport/handler/inbound/reaction.rs b/libs/transport/handler/inbound/reaction.rs index 20dbf6f..d3ea9ef 100644 --- a/libs/transport/handler/inbound/reaction.rs +++ b/libs/transport/handler/inbound/reaction.rs @@ -60,7 +60,11 @@ fn build_reaction_batch( emoji: g.emoji, count: g.count as i64, reacted_by_me: g.reacted_by_me, - users: g.users.iter().filter_map(|u| u.parse::().ok()).collect(), + users: g + .users + .iter() + .filter_map(|u| u.parse::().ok()) + .collect(), }) .collect(), }, @@ -76,7 +80,11 @@ pub(crate) async fn thread_create( session .service .room - .room_thread_create(room, room::RoomThreadCreateRequest { parent_seq: parent }, &ctx) + .room_thread_create( + room, + room::RoomThreadCreateRequest { parent_seq: parent }, + &ctx, + ) .await .map_err(|e| { tracing::warn!(error = %e, "room_thread_create failed"); @@ -85,12 +93,16 @@ pub(crate) async fn thread_create( Ok(None) } -pub(crate) async fn thread_resolve(thread_id: models::RoomThreadId) -> Result, AppTransportError> { +pub(crate) async fn thread_resolve( + thread_id: models::RoomThreadId, +) -> Result, AppTransportError> { tracing::info!(%thread_id, "Thread resolved"); Ok(None) } -pub(crate) async fn thread_archive(thread_id: models::RoomThreadId) -> Result, AppTransportError> { +pub(crate) async fn thread_archive( + thread_id: models::RoomThreadId, +) -> Result, AppTransportError> { tracing::info!(%thread_id, "Thread archived"); Ok(None) } @@ -177,4 +189,4 @@ pub(crate) async fn draft_clear( cleared_at: chrono::Utc::now(), }, })) -} \ No newline at end of file +} diff --git a/libs/transport/handler/inbound/room.rs b/libs/transport/handler/inbound/room.rs index b354038..246ab93 100644 --- a/libs/transport/handler/inbound/room.rs +++ b/libs/transport/handler/inbound/room.rs @@ -170,11 +170,7 @@ pub(crate) async fn category_update( session .service .room - .room_category_update( - id, - room::RoomCategoryUpdateRequest { name, position }, - &ctx, - ) + .room_category_update(id, room::RoomCategoryUpdateRequest { name, position }, &ctx) .await .map_err(|e| { tracing::warn!(error = %e, "room_category_update failed"); @@ -234,4 +230,4 @@ pub(crate) async fn access_revoke( AppTransportError::Internal })?; Ok(None) -} \ No newline at end of file +} diff --git a/libs/transport/handler/mod.rs b/libs/transport/handler/mod.rs index e24361f..e355450 100644 --- a/libs/transport/handler/mod.rs +++ b/libs/transport/handler/mod.rs @@ -11,4 +11,4 @@ pub use inbound::MessageHandler; pub use poll::poll_subscriptions; pub use session::{TransportSession, WsUserCtx}; pub use sse::ws_ai_stream; -pub use types::{WsError, WsInMessage, WsOutEvent, WS_PROTOCOL_VERSION}; \ No newline at end of file +pub use types::{WS_PROTOCOL_VERSION, WsError, WsInMessage, WsOutEvent}; diff --git a/libs/transport/handler/poll.rs b/libs/transport/handler/poll.rs index c44cc43..7a2ee8e 100644 --- a/libs/transport/handler/poll.rs +++ b/libs/transport/handler/poll.rs @@ -83,4 +83,4 @@ pub async fn poll_notifications( Ok(Err(broadcast::error::RecvError::Closed)) => None, Err(_elapsed) => None, } -} \ No newline at end of file +} diff --git a/libs/transport/handler/session.rs b/libs/transport/handler/session.rs index 561d279..97cdb07 100644 --- a/libs/transport/handler/session.rs +++ b/libs/transport/handler/session.rs @@ -84,10 +84,7 @@ pub struct TransportSession { } impl TransportSession { - pub fn new( - user: WsUserCtx, - service: Arc, - ) -> Self { + pub fn new(user: WsUserCtx, service: Arc) -> Self { Self { user, subscriptions: Arc::new(DashMap::new()), @@ -116,7 +113,11 @@ impl TransportSession { } pub async fn unsubscribe_room(&self, room_id: RoomId) { - self.service.room.room_manager.unsubscribe(room_id, self.user.user_id).await; + self.service + .room + .room_manager + .unsubscribe(room_id, self.user.user_id) + .await; self.subscriptions.remove(&room_id); } @@ -129,7 +130,11 @@ impl TransportSession { action: action.to_string(), sender_type: Some("user".to_string()), }; - self.service.room.room_manager.broadcast_typing(room_id, event).await; + self.service + .room + .room_manager + .broadcast_typing(room_id, event) + .await; } /// Get the current project context from cache (populated on first subscription). @@ -181,4 +186,4 @@ impl TransportSession { s.set_user(self.user.user_id); s } -} \ No newline at end of file +} diff --git a/libs/transport/handler/sse.rs b/libs/transport/handler/sse.rs index fdc1d89..8ba45bd 100644 --- a/libs/transport/handler/sse.rs +++ b/libs/transport/handler/sse.rs @@ -1,11 +1,11 @@ -use actix_web::{web, HttpRequest, HttpResponse}; use actix_web::web::Bytes; +use actix_web::{HttpRequest, HttpResponse, web}; use tokio_stream::StreamExt; use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; use uuid::Uuid; -use service::AppService; use queue::RoomMessageStreamChunkEvent; +use service::AppService; /// SSE endpoint: GET /ws/ai-stream/{room_id}/{message_id} pub async fn ws_ai_stream( @@ -30,7 +30,9 @@ pub async fn ws_ai_stream( return Err(actix_web::error::ErrorUnauthorized("invalid auth header")); } } else if let Some(token) = req.uri().query().and_then(|q| { - q.split('&').find(|p| p.starts_with("token=")).and_then(|p| p.split('=').nth(1)) + q.split('&') + .find(|p| p.starts_with("token=")) + .and_then(|p| p.split('=').nth(1)) }) { match service.ws_token.validate_token(token).await { Ok(uid) => uid, @@ -55,21 +57,23 @@ pub async fn ws_ai_stream( None => return Err(actix_web::error::ErrorNotFound("stream not found").into()), }; - let sse_stream = BroadcastStream::new(stream_rx) - .map(move |result| match result { - Ok(chunk) => { - let data = format_sse_chunk(&chunk); - if chunk.done { - Ok::<_, std::io::Error>(Bytes::from(format!("{}event: done\ndata: \n\n", data))) - } else { - Ok::<_, std::io::Error>(Bytes::from(data)) - } + let sse_stream = BroadcastStream::new(stream_rx).map(move |result| match result { + Ok(chunk) => { + let data = format_sse_chunk(&chunk); + if chunk.done { + Ok::<_, std::io::Error>(Bytes::from(format!("{}event: done\ndata: \n\n", data))) + } else { + Ok::<_, std::io::Error>(Bytes::from(data)) } - Err(BroadcastStreamRecvError::Lagged(_)) => { - tracing::warn!(message_id = %message_id, "SSE subscriber lagged"); - Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "stream lagged")) - } - }); + } + Err(BroadcastStreamRecvError::Lagged(_)) => { + tracing::warn!(message_id = %message_id, "SSE subscriber lagged"); + Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "stream lagged", + )) + } + }); Ok(HttpResponse::Ok() .content_type("text/event-stream") @@ -91,4 +95,4 @@ fn format_sse_chunk(chunk: &RoomMessageStreamChunkEvent) -> String { "chunk_type": chunk.chunk_type, }); format!("event: chunk\ndata: {}\n\n", json) -} \ No newline at end of file +} diff --git a/libs/transport/handler/types/in_message.rs b/libs/transport/handler/types/in_message.rs index 3504433..9bd6a95 100644 --- a/libs/transport/handler/types/in_message.rs +++ b/libs/transport/handler/types/in_message.rs @@ -11,56 +11,215 @@ pub const WS_PROTOCOL_VERSION: u32 = 1; #[serde(tag = "type", rename_all = "snake_case")] pub enum WsInMessage { Ping, - Subscribe { room: RoomId }, - Unsubscribe { room: RoomId }, - TypingStart { room: RoomId }, - TypingStop { room: RoomId }, - ReadReceipt { room: RoomId, last_read_seq: i64 }, - MessageList { room: RoomId, before_seq: Option, after_seq: Option, limit: Option }, - MessageCreate { room: RoomId, content: String, content_type: Option, thread: Option, in_reply_to: Option }, - MessageUpdate { message: Uuid, content: String }, - MessageRevoke { message: Uuid }, - RoomGet { room: RoomId }, - RoomCreate { project: ProjectId, room_name: String, public: bool, category: Option }, - RoomUpdate { room: RoomId, room_name: Option, public: Option, category: Option }, - RoomDelete { room: RoomId }, - CategoryCreate { project: ProjectId, name: String, position: Option }, - CategoryUpdate { id: Uuid, name: Option, position: Option }, - CategoryDelete { id: Uuid }, - AccessGrant { room: RoomId, user: UserId }, - AccessRevoke { room: RoomId, user: UserId }, - StateSetReadSeq { room: RoomId, last_read_seq: i64 }, - StateUpdateDnd { room: RoomId, do_not_disturb: Option, dnd_start_hour: Option, dnd_end_hour: Option }, - ReactionAdd { room: RoomId, message: Uuid, emoji: String }, - ReactionRemove { room: RoomId, message: Uuid, emoji: String }, - ThreadCreate { room: RoomId, parent: i64 }, - ThreadResolve { thread_id: RoomThreadId }, - ThreadArchive { thread_id: RoomThreadId }, - PinAdd { room: RoomId, message: Uuid }, - PinRemove { room: RoomId, message: Uuid }, - DraftSave { room: RoomId, content: String }, - DraftClear { room: RoomId }, - Search { q: String, room: Option, start_time: Option>, end_time: Option>, sender_id: Option, content_type: Option, limit: Option, offset: Option }, - NotificationMarkRead { id: Uuid }, - NotificationMarkAllRead { project_id: Option }, - NotificationArchive { id: Uuid }, - PresenceUpdate { status: crate::event::presence::UserPresenceStatus }, - CustomStatusUpdate { emoji: Option, text: Option, expires_at: Option> }, - InviteCreate { project: ProjectId, room: Option, max_uses: Option, expires_at: Option> }, - InviteAccept { code: String }, - InviteRevoke { id: Uuid }, - BanCreate { project: ProjectId, user: UserId, reason: Option, expires_at: Option> }, - BanRemove { project: ProjectId, user: UserId }, - VoiceJoin { room: RoomId }, - VoiceLeave { room: RoomId }, - VoiceMute { room: RoomId, muted: bool }, - VoiceDeaf { room: RoomId, deafened: bool }, - ScreenShare { room: RoomId, start: bool }, - AiList { room: RoomId }, - AiUpsert { room: RoomId, model: Uuid, version: Option, system_prompt: Option, temperature: Option, max_tokens: Option, stream: Option }, - AiDelete { room: RoomId, agent_id: Uuid }, - AiStop { room: RoomId }, - UserSummary { username: String }, + Subscribe { + room: RoomId, + }, + Unsubscribe { + room: RoomId, + }, + TypingStart { + room: RoomId, + }, + TypingStop { + room: RoomId, + }, + ReadReceipt { + room: RoomId, + last_read_seq: i64, + }, + MessageList { + room: RoomId, + before_seq: Option, + after_seq: Option, + limit: Option, + }, + MessageCreate { + room: RoomId, + content: String, + content_type: Option, + thread: Option, + in_reply_to: Option, + }, + MessageUpdate { + message: Uuid, + content: String, + }, + MessageRevoke { + message: Uuid, + }, + RoomGet { + room: RoomId, + }, + RoomCreate { + project: ProjectId, + room_name: String, + public: bool, + category: Option, + }, + RoomUpdate { + room: RoomId, + room_name: Option, + public: Option, + category: Option, + }, + RoomDelete { + room: RoomId, + }, + CategoryCreate { + project: ProjectId, + name: String, + position: Option, + }, + CategoryUpdate { + id: Uuid, + name: Option, + position: Option, + }, + CategoryDelete { + id: Uuid, + }, + AccessGrant { + room: RoomId, + user: UserId, + }, + AccessRevoke { + room: RoomId, + user: UserId, + }, + StateSetReadSeq { + room: RoomId, + last_read_seq: i64, + }, + StateUpdateDnd { + room: RoomId, + do_not_disturb: Option, + dnd_start_hour: Option, + dnd_end_hour: Option, + }, + ReactionAdd { + room: RoomId, + message: Uuid, + emoji: String, + }, + ReactionRemove { + room: RoomId, + message: Uuid, + emoji: String, + }, + ThreadCreate { + room: RoomId, + parent: i64, + }, + ThreadResolve { + thread_id: RoomThreadId, + }, + ThreadArchive { + thread_id: RoomThreadId, + }, + PinAdd { + room: RoomId, + message: Uuid, + }, + PinRemove { + room: RoomId, + message: Uuid, + }, + DraftSave { + room: RoomId, + content: String, + }, + DraftClear { + room: RoomId, + }, + Search { + q: String, + room: Option, + start_time: Option>, + end_time: Option>, + sender_id: Option, + content_type: Option, + limit: Option, + offset: Option, + }, + NotificationMarkRead { + id: Uuid, + }, + NotificationMarkAllRead { + project_id: Option, + }, + NotificationArchive { + id: Uuid, + }, + PresenceUpdate { + status: crate::event::presence::UserPresenceStatus, + }, + CustomStatusUpdate { + emoji: Option, + text: Option, + expires_at: Option>, + }, + InviteCreate { + project: ProjectId, + room: Option, + max_uses: Option, + expires_at: Option>, + }, + InviteAccept { + code: String, + }, + InviteRevoke { + id: Uuid, + }, + BanCreate { + project: ProjectId, + user: UserId, + reason: Option, + expires_at: Option>, + }, + BanRemove { + project: ProjectId, + user: UserId, + }, + VoiceJoin { + room: RoomId, + }, + VoiceLeave { + room: RoomId, + }, + VoiceMute { + room: RoomId, + muted: bool, + }, + VoiceDeaf { + room: RoomId, + deafened: bool, + }, + ScreenShare { + room: RoomId, + start: bool, + }, + AiList { + room: RoomId, + }, + AiUpsert { + room: RoomId, + model: Uuid, + version: Option, + system_prompt: Option, + temperature: Option, + max_tokens: Option, + stream: Option, + }, + AiDelete { + room: RoomId, + agent_id: Uuid, + }, + AiStop { + room: RoomId, + }, + UserSummary { + username: String, + }, } impl WsInMessage { @@ -101,4 +260,4 @@ impl WsInMessage { _ => None, } } -} \ No newline at end of file +} diff --git a/libs/transport/handler/types/mod.rs b/libs/transport/handler/types/mod.rs index 53d36dd..05799f7 100644 --- a/libs/transport/handler/types/mod.rs +++ b/libs/transport/handler/types/mod.rs @@ -1,5 +1,5 @@ mod in_message; mod out_event; -pub use in_message::{WsInMessage, WS_PROTOCOL_VERSION}; -pub use out_event::{WsError, WsOutEvent}; \ No newline at end of file +pub use in_message::{WS_PROTOCOL_VERSION, WsInMessage}; +pub use out_event::{WsError, WsOutEvent}; diff --git a/libs/transport/handler/types/out_event.rs b/libs/transport/handler/types/out_event.rs index f042336..ae028a7 100644 --- a/libs/transport/handler/types/out_event.rs +++ b/libs/transport/handler/types/out_event.rs @@ -1,11 +1,11 @@ use serde::Serialize; use uuid::Uuid; -use models::{ProjectId, RoomId, UserId}; use crate::event::{ ai, attachment, ban, category, draft, invite, member, message, notify, pin, presence, project, reaction, rooms, search, thread, voice, }; +use models::{ProjectId, RoomId, UserId}; #[derive(Debug, Clone, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] @@ -272,4 +272,4 @@ pub struct WsError { pub code: i32, pub error: String, pub message: String, -} \ No newline at end of file +} diff --git a/libs/transport/handler/ws.rs b/libs/transport/handler/ws.rs index 61016a7..8a6e7b3 100644 --- a/libs/transport/handler/ws.rs +++ b/libs/transport/handler/ws.rs @@ -2,7 +2,7 @@ use std::panic::AssertUnwindSafe; use std::sync::Arc; use std::time::Instant; -use actix_web::{web, HttpRequest, HttpResponse}; +use actix_web::{HttpRequest, HttpResponse, web}; use actix_ws::Message as WsMessage; use futures_util::FutureExt; use uuid::Uuid; @@ -11,7 +11,10 @@ use service::AppService; use super::inbound::MessageHandler; use super::poll::{poll_notifications, poll_subscriptions}; -use super::session::{TransportSession, WsUserCtx, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT, MAX_IDLE_TIMEOUT, MAX_TEXT_MESSAGE_LEN, MAX_MESSAGES_PER_SECOND}; +use super::session::{ + HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT, MAX_IDLE_TIMEOUT, MAX_MESSAGES_PER_SECOND, + MAX_TEXT_MESSAGE_LEN, TransportSession, WsUserCtx, +}; use super::types::{WsInMessage, WsOutEvent}; /// Universal WebSocket endpoint: `/ws` @@ -183,10 +186,11 @@ pub async fn ws_handler( Err(e) => { tracing::warn!(user_id = %user_id, error = %e, "WS message processing failed"); let rid = request_id.unwrap_or(Uuid::nil()); + let (code, error_type) = e.ws_error_code(); let err_json = serde_json::json!({ "type": "error", - "code": 500, - "error": "internal_error", + "code": code, + "error": error_type, "message": e.to_string(), "_request_id": rid }); @@ -202,14 +206,21 @@ pub async fn ws_handler( } } } - Some(Ok(WsMessage::Binary(_))) => { break; } + Some(Ok(WsMessage::Binary(_))) => { + let _ = ws_session.close(Some(actix_ws::CloseCode::Unsupported.into())).await; + break; + } Some(Ok(WsMessage::Continuation(_))) => {} Some(Ok(WsMessage::Nop)) => {} Some(Ok(WsMessage::Close(reason))) => { let _ = ws_session.close(reason).await; break; } - Some(Err(e)) => { tracing::warn!(error = %e, "WS transport error"); break; } + Some(Err(e)) => { + tracing::warn!(error = %e, "WS transport error"); + let _ = ws_session.close(Some(actix_ws::CloseCode::Protocol.into())).await; + break; + } None => break, } } @@ -221,6 +232,9 @@ pub async fn ws_handler( manager.unsubscribe(sub.room_id, user_id).await; } manager.unsubscribe_user_notification(user_id).await; + // Remove presence entry so disconnected users don't appear online for up to 10 minutes + let project_id = session.project_id.lock().await; + session.service.room.remove_user_presence(user_id, *project_id).await; manager.metrics.ws_connections_active.decrement(1.0); manager.metrics.ws_disconnections_total.increment(1); }).catch_unwind(); @@ -264,13 +278,20 @@ async fn authenticate_ws( if let Ok(auth_str) = auth_header.to_str() { if let Some(token) = auth_str.strip_prefix("Bearer ") { match service.ws_token.validate_token_ctx(token).await { - Ok(ctx) => return Ok(crate::token::AppTransportTokenContext { - user_id: ctx.user_id, - device_id: ctx.device_id.unwrap_or_default(), - client_id: ctx.client_id.unwrap_or_default(), - }), + Ok(ctx) => { + return Ok(crate::token::AppTransportTokenContext { + user_id: ctx.user_id, + device_id: ctx.device_id.unwrap_or_default(), + client_id: ctx.client_id.unwrap_or_default(), + }); + } Err(_) => { - service.room.room_manager.metrics.ws_auth_failures.increment(1); + service + .room + .room_manager + .metrics + .ws_auth_failures + .increment(1); return Err(actix_web::error::ErrorUnauthorized("token auth failed")); } } @@ -280,21 +301,35 @@ async fn authenticate_ws( // Fallback: token in query string (deprecated, kept for backward compatibility) if let Some(token) = req.uri().query().and_then(|q| { - q.split('&').find(|p| p.starts_with("token=")).and_then(|p| p.split('=').nth(1)) + q.split('&') + .find(|p| p.starts_with("token=")) + .and_then(|p| p.split('=').nth(1)) }) { match service.ws_token.validate_token_ctx(token).await { - Ok(ctx) => return Ok(crate::token::AppTransportTokenContext { - user_id: ctx.user_id, - device_id: ctx.device_id.unwrap_or_default(), - client_id: ctx.client_id.unwrap_or_default(), - }), + Ok(ctx) => { + return Ok(crate::token::AppTransportTokenContext { + user_id: ctx.user_id, + device_id: ctx.device_id.unwrap_or_default(), + client_id: ctx.client_id.unwrap_or_default(), + }); + } Err(_) => { - service.room.room_manager.metrics.ws_auth_failures.increment(1); + service + .room + .room_manager + .metrics + .ws_auth_failures + .increment(1); return Err(actix_web::error::ErrorUnauthorized("token auth failed")); } } } - service.room.room_manager.metrics.ws_auth_failures.increment(1); + service + .room + .room_manager + .metrics + .ws_auth_failures + .increment(1); Err(actix_web::error::ErrorUnauthorized("no auth provided")) -} \ No newline at end of file +} diff --git a/libs/transport/lib.rs b/libs/transport/lib.rs index f52fac7..c93ce90 100644 --- a/libs/transport/lib.rs +++ b/libs/transport/lib.rs @@ -1,9 +1,9 @@ -use std::collections::HashMap; +use crate::seq::SeqAllocator; use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; use service::AppService; -use crate::seq::SeqAllocator; +use std::collections::HashMap; #[derive(Clone)] pub struct AppTransport { @@ -103,7 +103,11 @@ impl AppTransport { Self::build(service, config, nats) } - fn build(service: AppService, config: AppConfig, nats: Option) -> Result { + fn build( + service: AppService, + config: AppConfig, + nats: Option, + ) -> Result { Ok(Self { db: service.db.clone(), cache: service.cache.clone(), @@ -151,7 +155,10 @@ impl AppTransport { self.seq.seq(room).await } - pub async fn bootstrap_seq(&self, room: models::RoomId) -> Result { + pub async fn bootstrap_seq( + &self, + room: models::RoomId, + ) -> Result { self.seq.bootstrap(room).await } diff --git a/libs/transport/metrics.rs b/libs/transport/metrics.rs index 3a70c7c..56d041c 100644 --- a/libs/transport/metrics.rs +++ b/libs/transport/metrics.rs @@ -19,22 +19,27 @@ impl TransportMetrics { } pub fn increment_sent(&self) { - self.messages_sent.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.messages_sent + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } pub fn increment_received(&self) { - self.messages_received.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.messages_received + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } pub fn increment_failed(&self) { - self.messages_failed.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.messages_failed + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } pub fn increment_connections(&self) { - self.active_connections.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.active_connections + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } pub fn decrement_connections(&self) { - self.active_connections.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + self.active_connections + .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); } } diff --git a/libs/transport/pagination.rs b/libs/transport/pagination.rs index 0113fca..190beb3 100644 --- a/libs/transport/pagination.rs +++ b/libs/transport/pagination.rs @@ -49,8 +49,8 @@ impl MessagePagination { &self, params: PaginationParams, ) -> Result { - use sea_orm::*; use models::rooms::room_message; + use sea_orm::*; let limit = std::cmp::Ord::min(params.limit, 100); let cursor_seq = if let Some(cursor) = params.cursor { @@ -59,18 +59,16 @@ impl MessagePagination { None }; - let mut query = room_message::Entity::find() - .filter(room_message::Column::Room.eq(params.room_id)); + let mut query = + room_message::Entity::find().filter(room_message::Column::Room.eq(params.room_id)); query = match (params.direction, cursor_seq) { - (PaginationDirection::Before, Some(seq)) => { - query.filter(room_message::Column::Seq.lt(seq)) - .order_by_desc(room_message::Column::Seq) - } - (PaginationDirection::After, Some(seq)) => { - query.filter(room_message::Column::Seq.gt(seq)) - .order_by_asc(room_message::Column::Seq) - } + (PaginationDirection::Before, Some(seq)) => query + .filter(room_message::Column::Seq.lt(seq)) + .order_by_desc(room_message::Column::Seq), + (PaginationDirection::After, Some(seq)) => query + .filter(room_message::Column::Seq.gt(seq)) + .order_by_asc(room_message::Column::Seq), _ => query.order_by_desc(room_message::Column::Seq), }; @@ -119,8 +117,8 @@ impl MessagePagination { message_id: Uuid, context_size: u64, ) -> Result { - use sea_orm::*; use models::rooms::room_message; + use sea_orm::*; let target = room_message::Entity::find_by_id(message_id) .one(&self.db) diff --git a/libs/transport/reconnect.rs b/libs/transport/reconnect.rs index 5e18e98..b170a0c 100644 --- a/libs/transport/reconnect.rs +++ b/libs/transport/reconnect.rs @@ -1,7 +1,7 @@ +use redis::AsyncCommands; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use uuid::Uuid; -use redis::AsyncCommands; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ClientState { @@ -40,10 +40,14 @@ impl ReconnectManager { let key = format!("client:state:{}:{}", user_id, room_id); let value = last_seq.to_string(); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let _: () = conn.set_ex(&key, &value, 86400) + let _: () = conn + .set_ex(&key, &value, 86400) .await .map_err(|_| crate::error::AppTransportError::Internal)?; @@ -57,10 +61,15 @@ impl ReconnectManager { ) -> Result, crate::error::AppTransportError> { let key = format!("client:state:{}:{}", user_id, room_id); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let value: Option = conn.get(&key).await + let value: Option = conn + .get(&key) + .await .map_err(|_| crate::error::AppTransportError::Internal)?; Ok(value.and_then(|v| v.parse::().ok())) @@ -72,8 +81,8 @@ impl ReconnectManager { room_id: Uuid, since_seq: i64, ) -> Result, crate::error::AppTransportError> { - use sea_orm::*; use models::rooms::room_message; + use sea_orm::*; let messages = room_message::Entity::find() .filter(room_message::Column::Room.eq(room_id)) @@ -107,7 +116,9 @@ impl ReconnectManager { let mut result = HashMap::new(); for (room_id, client_seq) in room_states { - let missed = self.get_missed_messages(user_id, room_id, client_seq).await?; + let missed = self + .get_missed_messages(user_id, room_id, client_seq) + .await?; if !missed.is_empty() { result.insert(room_id, missed); } diff --git a/libs/transport/richtext.rs b/libs/transport/richtext.rs index 3fd9169..5b51c29 100644 --- a/libs/transport/richtext.rs +++ b/libs/transport/richtext.rs @@ -20,8 +20,7 @@ pub enum BlockType { Image, } -pub struct RichTextRenderer { -} +pub struct RichTextRenderer {} impl RichTextRenderer { pub fn new() -> Self { @@ -59,14 +58,29 @@ impl RichTextRenderer { } pub fn render_to_html(&self, blocks: &[RichTextBlock]) -> String { - blocks.iter() + blocks + .iter() .map(|block| match block.block_type { BlockType::Text => format!("

{}

", html_escape(&block.content)), - BlockType::Code => format!("
{}
", html_escape(&block.content)), - BlockType::Quote => format!("
{}
", html_escape(&block.content)), - BlockType::Link => format!("{}", html_escape(&block.content), html_escape(&block.content)), - BlockType::Mention => format!("@{}", html_escape(&block.content)), - BlockType::Emoji => format!("{}", html_escape(&block.content)), + BlockType::Code => { + format!("
{}
", html_escape(&block.content)) + } + BlockType::Quote => { + format!("
{}
", html_escape(&block.content)) + } + BlockType::Link => format!( + "{}", + html_escape(&block.content), + html_escape(&block.content) + ), + BlockType::Mention => format!( + "@{}", + html_escape(&block.content) + ), + BlockType::Emoji => format!( + "{}", + html_escape(&block.content) + ), BlockType::Image => format!("", html_escape(&block.content)), }) .collect::>() diff --git a/libs/transport/search.rs b/libs/transport/search.rs index 921d5ac..5655d83 100644 --- a/libs/transport/search.rs +++ b/libs/transport/search.rs @@ -40,8 +40,8 @@ impl SearchEngine { &self, query: SearchQuery, ) -> Result { - use sea_orm::*; use models::rooms::room_message; + use sea_orm::*; let mut db_query = room_message::Entity::find(); @@ -53,14 +53,18 @@ impl SearchEngine { db_query = db_query.filter(room_message::Column::SenderId.eq(user_id)); } - let escaped_query = query.query + let escaped_query = query + .query .replace('\\', "\\\\") .replace('%', "\\%") .replace('_', "\\_"); let search_term = format!("%{}%", escaped_query); db_query = db_query.filter(room_message::Column::Content.like(&search_term)); - let total = db_query.clone().count(&self.db).await + let total = db_query + .clone() + .count(&self.db) + .await .map_err(|_| crate::error::AppTransportError::Internal)?; let messages = db_query diff --git a/libs/transport/security.rs b/libs/transport/security.rs index 0189c38..9a3581d 100644 --- a/libs/transport/security.rs +++ b/libs/transport/security.rs @@ -25,7 +25,10 @@ impl RateLimiter { ) -> Result { let key = format!("ratelimit:{}:{}", user_id, action); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; // Atomic INCR with EX NX — sets TTL only on first creation @@ -57,10 +60,15 @@ impl RateLimiter { ) -> Result { let key = format!("ratelimit:{}:{}", user_id, action); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let count: Option = conn.get(&key).await + let count: Option = conn + .get(&key) + .await .map_err(|_| crate::error::AppTransportError::Internal)?; let current = count.unwrap_or(0); @@ -85,10 +93,14 @@ impl CsrfProtection { let token = Uuid::new_v4().to_string(); let key = format!("csrf:{}:{}", user_id, token); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let _: () = conn.set_ex(&key, "1", 3600) + let _: () = conn + .set_ex(&key, "1", 3600) .await .map_err(|_| crate::error::AppTransportError::Internal)?; @@ -102,14 +114,21 @@ impl CsrfProtection { ) -> Result { let key = format!("csrf:{}:{}", user_id, token); - let mut conn = self.cache.conn().await + let mut conn = self + .cache + .conn() + .await .map_err(|_| crate::error::AppTransportError::Internal)?; - let exists: bool = conn.exists(&key).await + let exists: bool = conn + .exists(&key) + .await .map_err(|_| crate::error::AppTransportError::Internal)?; if exists { - let _: () = conn.del(&key).await + let _: () = conn + .del(&key) + .await .map_err(|_| crate::error::AppTransportError::Internal)?; } diff --git a/libs/transport/seq.rs b/libs/transport/seq.rs index e25de23..8d34bf2 100644 --- a/libs/transport/seq.rs +++ b/libs/transport/seq.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicI64, Ordering}; use dashmap::DashMap; use tokio::sync::Mutex; diff --git a/libs/transport/token.rs b/libs/transport/token.rs index a3ea19b..f4c70e8 100644 --- a/libs/transport/token.rs +++ b/libs/transport/token.rs @@ -1,11 +1,11 @@ use base64::Engine; -use hmac::{Hmac, Mac, KeyInit}; +use hmac::{Hmac, KeyInit, Mac}; use models::UserId; use session::Session; use sha2::Sha256; -use crate::error::AppTransportError; use crate::AppTransport; +use crate::error::AppTransportError; type HmacSha256 = Hmac; diff --git a/libs/transport/unread.rs b/libs/transport/unread.rs index 908982f..68dc9b5 100644 --- a/libs/transport/unread.rs +++ b/libs/transport/unread.rs @@ -69,7 +69,7 @@ impl UnreadManager { user_id: Uuid, room_id: Uuid, ) -> Result { - use models::rooms::{room_user_state, room_message}; + use models::rooms::{room_message, room_user_state}; use sea_orm::*; let state = room_user_state::Entity::find_by_id((room_id, user_id)) @@ -93,7 +93,7 @@ impl UnreadManager { &self, user_id: Uuid, ) -> Result, crate::error::AppTransportError> { - use models::rooms::{room_user_state, room_message}; + use models::rooms::{room_message, room_user_state}; use sea_orm::*; let states = room_user_state::Entity::find() @@ -140,4 +140,4 @@ impl UnreadManager { ) -> Result<(), crate::error::AppTransportError> { Ok(()) } -} \ No newline at end of file +}