From deb25614ba3f92450727d96e0b5656f1a3f10542 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Mon, 11 May 2026 17:05:17 +0800 Subject: [PATCH] refactor(transport): split handler inbound and types into sub-modules Extract MessageHandler methods into inbound/{message,room,reaction,misc,msg} and type definitions into types/{in_message,out_event}. --- libs/transport/handler/inbound.rs | 803 --------------------- libs/transport/handler/inbound/message.rs | 147 ++++ libs/transport/handler/inbound/misc.rs | 415 +++++++++++ libs/transport/handler/inbound/mod.rs | 118 +++ libs/transport/handler/inbound/msg.rs | 67 ++ libs/transport/handler/inbound/reaction.rs | 180 +++++ libs/transport/handler/inbound/room.rs | 237 ++++++ libs/transport/handler/session.rs | 40 +- libs/transport/handler/types.rs | 559 -------------- libs/transport/handler/types/in_message.rs | 104 +++ libs/transport/handler/types/mod.rs | 5 + libs/transport/handler/types/out_event.rs | 275 +++++++ libs/transport/handler/ws.rs | 65 +- libs/transport/lib.rs | 39 +- libs/transport/token.rs | 2 +- 15 files changed, 1612 insertions(+), 1444 deletions(-) delete mode 100644 libs/transport/handler/inbound.rs create mode 100644 libs/transport/handler/inbound/message.rs create mode 100644 libs/transport/handler/inbound/misc.rs create mode 100644 libs/transport/handler/inbound/mod.rs create mode 100644 libs/transport/handler/inbound/msg.rs create mode 100644 libs/transport/handler/inbound/reaction.rs create mode 100644 libs/transport/handler/inbound/room.rs delete mode 100644 libs/transport/handler/types.rs create mode 100644 libs/transport/handler/types/in_message.rs create mode 100644 libs/transport/handler/types/mod.rs create mode 100644 libs/transport/handler/types/out_event.rs diff --git a/libs/transport/handler/inbound.rs b/libs/transport/handler/inbound.rs deleted file mode 100644 index 6e20133..0000000 --- a/libs/transport/handler/inbound.rs +++ /dev/null @@ -1,803 +0,0 @@ -use room::ws_context::WsUserContext; -use uuid::Uuid; - -use super::session::TransportSession; -use super::types::{WsInMessage, WsOutEvent}; -use crate::error::AppTransportError; -use crate::event::{category, message, reaction}; - -fn service_err(op: &str, err: E) -> AppTransportError { - tracing::warn!(error = %err, operation = %op, "WS service operation failed"); - AppTransportError::Internal -} - -pub struct MessageHandler; - -impl MessageHandler { - pub async fn handle( - session: &TransportSession, - msg: WsInMessage, - ) -> Result, AppTransportError> { - match msg { - WsInMessage::Ping => Ok(Some(WsOutEvent::Pong { - protocol_version: super::types::WS_PROTOCOL_VERSION, - })), - WsInMessage::Subscribe { room } => { - let sub = session - .subscribe_room(room) - .await - .map_err(|e| service_err("subscribe_room", e))?; - session.subscriptions.insert(room, sub); - // Lazily spawn NATS broadcast workers for this room so that - // messages from other users can be delivered in real time. - // SPAWNED_ROOMS guard prevents duplicate spawns. - session.service.room.spawn_room_workers(room); - Ok(None) - } - WsInMessage::Unsubscribe { room } => { - session.unsubscribe_room(room).await; - Ok(None) - } - WsInMessage::TypingStart { room } => { - session.broadcast_typing(room, "start").await; - Ok(None) - } - WsInMessage::TypingStop { room } => { - session.broadcast_typing(room, "stop").await; - Ok(None) - } - WsInMessage::ReadReceipt { - room, - last_read_seq, - } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_user_state_update_read_seq(room, last_read_seq, &ctx) - .await - .map_err(|e| service_err("room_user_state_update_read_seq", e))?; - Ok(None) - } - WsInMessage::MessageList { - room, - before_seq, - after_seq, - limit, - } => { - let ctx = WsUserContext::new(session.user.user_id); - let list = session - .service - .room - .room_message_list(room, before_seq, after_seq, limit, &ctx) - .await - .map_err(|e| service_err("room_message_list", e))?; - let resp = message::MessageListService { - room, - messages: list - .messages - .iter() - .map(|m| message::MessageNewService { - id: m.id, - seq: m.seq, - room: m.room, - sender_type: m.sender_type.clone(), - sender_id: m.sender_id, - display_name: m.display_name.clone(), - thread: m.thread, - in_reply_to: m.in_reply_to, - content: m.content.clone(), - content_type: m.content_type.clone(), - thinking_content: m.thinking_content.clone(), - thinking_is_chunked: m.thinking_is_chunked, - send_at: m.send_at, - reactions: Some( - m.reactions - .iter() - .map(|r| reaction::ReactionGroup { - emoji: r.emoji.clone(), - count: r.count as i64, - reacted_by_me: r.reacted_by_me, - users: r.users.iter().filter_map(|u| Uuid::parse_str(u).ok()).collect(), - }) - .collect(), - ), - }) - .collect(), - total: list.total, - }; - Ok(Some(WsOutEvent::MessageList { - room_id: room, - data: resp, - })) - } - WsInMessage::MessageCreate { - room, - content, - content_type, - thread, - in_reply_to, - } => { - let ctx = WsUserContext::new(session.user.user_id); - let msg = session - .service - .room - .room_message_create( - room, - room::RoomMessageCreateRequest { - content, - content_type, - thread, - in_reply_to, - attachment_ids: vec![], - }, - &ctx, - ) - .await - .map_err(|e| service_err("room_message_create", e))?; - Ok(Some(WsOutEvent::MessageNew { - room_id: room, - data: message::MessageNewService { - id: msg.id, - seq: msg.seq, - room: msg.room, - sender_type: msg.sender_type, - sender_id: msg.sender_id, - display_name: msg.display_name, - thread: msg.thread, - in_reply_to: msg.in_reply_to, - content: msg.content, - content_type: msg.content_type, - thinking_content: msg.thinking_content, - thinking_is_chunked: false, - send_at: msg.send_at, - reactions: None, - }, - })) - } - WsInMessage::MessageUpdate { message, content } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_message_update(message, room::RoomMessageUpdateRequest { content }, &ctx) - .await - .map_err(|e| service_err("room_message_update", e))?; - Ok(None) - } - WsInMessage::MessageRevoke { message } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_message_revoke(message, &ctx) - .await - .map_err(|e| service_err("room_message_revoke", e))?; - Ok(None) - } - WsInMessage::RoomGet { room } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .require_room_access(room, ctx.user_id) - .await - .map_err(|e| service_err("require_room_access", e))?; - let rm = session - .service - .room - .find_room_or_404(room) - .await - .map_err(|e| service_err("find_room", e))?; - Ok(Some(WsOutEvent::RoomCreated { - room_id: rm.id, - data: crate::event::rooms::RoomCreatedService { - id: rm.id, - project: rm.project, - room_name: rm.room_name, - public: rm.public, - category: rm.category, - created_by: rm.created_by, - created_at: rm.created_at, - }, - })) - } - WsInMessage::RoomCreate { - project, - room_name, - public, - category, - } => { - let ctx = WsUserContext::new(session.user.user_id); - let rm = session - .service - .room - .room_create( - project.to_string(), - room::RoomCreateRequest { - room_name, - public, - category, - }, - &ctx, - ) - .await - .map_err(|e| service_err("room_create", e))?; - Ok(Some(WsOutEvent::RoomCreated { - room_id: rm.id, - data: crate::event::rooms::RoomCreatedService { - id: rm.id, - project, - room_name: rm.room_name, - public: rm.public, - category, - created_by: session.user.user_id, - created_at: rm.created_at, - }, - })) - } - WsInMessage::RoomUpdate { - room, - room_name, - public, - category, - } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_update( - room, - room::RoomUpdateRequest { - room_name, - public, - category, - }, - &ctx, - ) - .await - .map_err(|e| service_err("room_update", e))?; - Ok(None) - } - WsInMessage::RoomDelete { room } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_delete(room, &ctx) - .await - .map_err(|e| service_err("room_delete", e))?; - Ok(None) - } - WsInMessage::CategoryCreate { - project, - name, - position, - } => { - let ctx = WsUserContext::new(session.user.user_id); - let cat = session - .service - .room - .room_category_create( - project.to_string(), - room::RoomCategoryCreateRequest { name, position }, - &ctx, - ) - .await - .map_err(|e| service_err("room_category_create", e))?; - Ok(Some(WsOutEvent::CategoryCreated { - project, - data: category::CategoryCreatedService { - id: cat.id, - project, - name: cat.name, - position: cat.position, - created_by: session.user.user_id, - created_at: cat.created_at, - }, - })) - } - WsInMessage::CategoryUpdate { id, name, position } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_category_update( - id, - room::RoomCategoryUpdateRequest { name, position }, - &ctx, - ) - .await - .map_err(|e| service_err("room_category_update", e))?; - Ok(None) - } - WsInMessage::CategoryDelete { id } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_category_delete(id, &ctx) - .await - .map_err(|e| service_err("room_category_delete", e))?; - Ok(None) - } - WsInMessage::AccessGrant { room, user } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_access_grant(room, user, &ctx) - .await - .map_err(|e| service_err("room_access_grant", e))?; - Ok(None) - } - WsInMessage::AccessRevoke { room, user } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_access_revoke(room, user, &ctx) - .await - .map_err(|e| service_err("room_access_revoke", e))?; - Ok(None) - } - WsInMessage::ReactionAdd { - room, - message, - emoji, - } => { - let ctx = WsUserContext::new(session.user.user_id); - let rxs = session - .service - .room - .message_reaction_add(message, emoji, &ctx) - .await - .map_err(|e| service_err("message_reaction_add", e))?; - Ok(Some(WsOutEvent::ReactionBatchUpdated { - room_id: room, - data: reaction::ReactionBatchUpdatedService { - room: room, - message: message, - reactions: rxs - .reactions - .into_iter() - .map(|g| reaction::ReactionGroup { - emoji: g.emoji, - count: g.count as i64, - reacted_by_me: g.reacted_by_me, - users: g - .users - .iter() - .filter_map(|u| u.parse::().ok()) - .collect(), - }) - .collect(), - }, - })) - } - WsInMessage::ReactionRemove { - room, - message, - emoji, - } => { - let ctx = WsUserContext::new(session.user.user_id); - let rxs = session - .service - .room - .message_reaction_remove(message, emoji, &ctx) - .await - .map_err(|e| service_err("message_reaction_remove", e))?; - Ok(Some(WsOutEvent::ReactionBatchUpdated { - room_id: room, - data: reaction::ReactionBatchUpdatedService { - room: room, - message: message, - reactions: rxs - .reactions - .into_iter() - .map(|g| reaction::ReactionGroup { - emoji: g.emoji, - count: g.count as i64, - reacted_by_me: g.reacted_by_me, - users: g - .users - .iter() - .filter_map(|u| u.parse::().ok()) - .collect(), - }) - .collect(), - }, - })) - } - WsInMessage::ThreadCreate { room, parent } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_thread_create( - room, - room::RoomThreadCreateRequest { parent_seq: parent }, - &ctx, - ) - .await - .map_err(|e| service_err("room_thread_create", e))?; - Ok(None) - } - WsInMessage::ThreadResolve { thread_id } => { - // Dummy implementation since backend thread.rs doesn't have resolve yet - tracing::info!(%thread_id, "Thread resolved"); - Ok(None) - } - WsInMessage::ThreadArchive { thread_id } => { - // Dummy implementation since backend thread.rs doesn't have archive yet - tracing::info!(%thread_id, "Thread archived"); - Ok(None) - } - WsInMessage::PinAdd { room: _, message } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_pin_add(message, &ctx) - .await - .map_err(|e| service_err("room_pin_add", e))?; - Ok(None) - } - WsInMessage::PinRemove { room: _, message } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_pin_remove(message, &ctx) - .await - .map_err(|e| service_err("room_pin_remove", e))?; - Ok(None) - } - WsInMessage::DraftSave { room, content } => { - let ctx = WsUserContext::new(session.user.user_id); - let draft = session - .service - .room - .draft_save(room, content, &ctx) - .await - .map_err(|e| service_err("draft_save", e))?; - Ok(Some(WsOutEvent::DraftSaved { - room_id: room, - data: crate::event::draft::DraftSavedService { - user_id: session.user.user_id, - room: draft.room_id, - content: draft.content, - saved_at: draft.saved_at, - }, - })) - } - WsInMessage::DraftClear { room } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .draft_clear(room, &ctx) - .await - .map_err(|e| service_err("draft_clear", e))?; - Ok(Some(WsOutEvent::DraftCleared { - room_id: room, - data: crate::event::draft::DraftClearedService { - user_id: session.user.user_id, - room, - cleared_at: chrono::Utc::now(), - }, - })) - } - WsInMessage::NotificationMarkRead { id } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .notification_mark_read(id, &ctx) - .await - .map_err(|e| service_err("notification_mark_read", e))?; - Ok(None) - } - WsInMessage::NotificationMarkAllRead { .. } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .notification_mark_all_read(&ctx) - .await - .map_err(|e| service_err("notification_mark_all_read", e))?; - Ok(None) - } - WsInMessage::NotificationArchive { id } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .notification_archive(id, &ctx) - .await - .map_err(|e| service_err("notification_archive", e))?; - Ok(None) - } - WsInMessage::Search { - q, - room, - start_time, - end_time, - sender_id, - content_type, - limit, - offset, - } => { - let ctx = WsUserContext::new(session.user.user_id); - let room_id = room.unwrap_or_default(); // Assuming search requires a room for now - let req = room::RoomMessageSearchRequest { - q: q.clone(), - start_time, - end_time, - sender_id, - content_type, - limit, - offset, - }; - let res = session - .service - .room - .room_message_search(room_id, req, &ctx) - .await - .map_err(|e| service_err("room_message_search", e))?; - - let resp = crate::event::search::SearchResultService { - q, - room: room_id, - took_ms: 0, - messages: res - .messages - .into_iter() - .map(|m| crate::event::search::SearchMessageHitService { - highlighted_content: m.highlighted_content.unwrap_or_default(), - message: crate::event::message::MessageNewService { - id: m.id, - seq: m.seq, - room: m.room, - sender_type: m.sender_type, - sender_id: m.sender_id, - display_name: m.display_name, - thread: m.thread, - in_reply_to: m.in_reply_to, - content: m.content, - content_type: m.content_type, - thinking_content: m.thinking_content, - thinking_is_chunked: m.thinking_is_chunked, - send_at: m.send_at, - reactions: None, - }, - }) - .collect(), - total: res.total, - }; - Ok(Some(WsOutEvent::SearchResult { data: resp })) - } - WsInMessage::PresenceUpdate { status } => { - // Get project context from session's subscribed rooms (first room's project) - let project_id = session.get_current_project().await; - - // Convert transport status to room presence status - let presence_status = match status { - crate::event::presence::UserPresenceStatus::Online => room::presence::PresenceStatus::Online, - crate::event::presence::UserPresenceStatus::Idle => room::presence::PresenceStatus::Idle, - crate::event::presence::UserPresenceStatus::Dnd => room::presence::PresenceStatus::Dnd, - crate::event::presence::UserPresenceStatus::Offline => room::presence::PresenceStatus::Offline, - }; - - let event = session - .service - .room - .set_user_presence(session.user.user_id, project_id, presence_status) - .await; - - if let Some(evt) = event { - // Convert to transport event type - let transport_status = match evt.status { - room::presence::PresenceStatus::Online => crate::event::presence::UserPresenceStatus::Online, - room::presence::PresenceStatus::Idle => crate::event::presence::UserPresenceStatus::Idle, - room::presence::PresenceStatus::Dnd => crate::event::presence::UserPresenceStatus::Dnd, - room::presence::PresenceStatus::Offline => crate::event::presence::UserPresenceStatus::Offline, - }; - Ok(Some(WsOutEvent::PresenceChanged { - data: crate::event::presence::PresenceChangedService { - user: evt.user_id, - project: evt.project_id, - status: transport_status, - last_seen_at: evt.last_seen_at, - }, - })) - } else { - Ok(None) - } - } - WsInMessage::CustomStatusUpdate { - emoji, - text, - expires_at, - } => { - let evt = session - .service - .room - .set_custom_status(session.user.user_id, emoji.clone(), text.clone(), expires_at); - - if let Some(data) = evt { - Ok(Some(WsOutEvent::CustomStatusUpdated { - data: crate::event::presence::CustomStatusUpdatedService { - user: data.user_id, - emoji: data.emoji, - text: data.text, - expires_at: data.expires_at, - }, - })) - } else { - Ok(None) - } - } - WsInMessage::InviteCreate { .. } => { - tracing::info!("Invite create"); - Ok(None) - } - WsInMessage::InviteAccept { .. } => { - tracing::info!("Invite accept"); - Ok(None) - } - WsInMessage::InviteRevoke { .. } => { - tracing::info!("Invite revoke"); - Ok(None) - } - WsInMessage::BanCreate { .. } => { - tracing::info!("Ban create"); - Ok(None) - } - WsInMessage::BanRemove { .. } => { - tracing::info!("Ban remove"); - Ok(None) - } - WsInMessage::VoiceJoin { room } => { - tracing::info!(%room, "Voice join"); - Ok(None) - } - WsInMessage::VoiceLeave { room } => { - tracing::info!(%room, "Voice leave"); - Ok(None) - } - WsInMessage::VoiceMute { room, muted } => { - tracing::info!(%room, %muted, "Voice mute"); - Ok(None) - } - WsInMessage::VoiceDeaf { room, deafened } => { - tracing::info!(%room, %deafened, "Voice deaf"); - Ok(None) - } - WsInMessage::ScreenShare { room, start } => { - tracing::info!(%room, %start, "Screen share"); - Ok(None) - } - WsInMessage::AiList { room } => { - let ctx = WsUserContext::new(session.user.user_id); - let ai_list = session - .service - .room - .room_ai_list(room, &ctx) - .await - .map_err(|e| service_err("room_ai_list", e))?; - - let data = serde_json::to_value(ai_list).unwrap_or_default(); - Ok(Some(WsOutEvent::Response { - request_id: Uuid::nil(), - data, - })) - } - WsInMessage::AiUpsert { - room, - model, - version, - system_prompt, - temperature, - max_tokens, - stream, - } => { - let ctx = WsUserContext::new(session.user.user_id); - let req = room::RoomAiUpsertRequest { - model, - version, - system_prompt, - temperature, - max_tokens, - stream, - history_limit: None, - use_exact: None, - think: None, - min_score: None, - agent_type: None, - }; - let ai_model = session - .service - .room - .room_ai_upsert(room, req, &ctx) - .await - .map_err(|e| service_err("room_ai_upsert", e))?; - - let data = serde_json::to_value(ai_model).unwrap_or_default(); - Ok(Some(WsOutEvent::Response { - request_id: Uuid::nil(), - data, - })) - } - WsInMessage::AiDelete { room, agent_id } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_ai_delete(room, agent_id, &ctx) - .await - .map_err(|e| service_err("room_ai_delete", e))?; - Ok(None) - } - WsInMessage::AiStop { room } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_ai_stop(room, &ctx) - .await - .map_err(|e| service_err("room_ai_stop", e))?; - Ok(None) - } - WsInMessage::UserSummary { username } => { - let ctx = session.to_session(); - let summary = session - .service - .user_get_summary(ctx, username) - .await - .map_err(|e| service_err("user_get_summary", e))?; - let data = serde_json::to_value(summary).unwrap_or_default(); - Ok(Some(WsOutEvent::Response { - request_id: Uuid::nil(), // Filled by ws_handler if rid exists - data, - })) - } - WsInMessage::StateSetReadSeq { - room, - last_read_seq, - } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_user_state_update_read_seq(room, last_read_seq, &ctx) - .await - .map_err(|e| service_err("room_user_state_update_read_seq", e))?; - Ok(None) - } - WsInMessage::StateUpdateDnd { - room, - do_not_disturb, - dnd_start_hour, - dnd_end_hour, - } => { - let ctx = WsUserContext::new(session.user.user_id); - session - .service - .room - .room_user_state_update_dnd( - room, - room::RoomUserStateUpdateDndRequest { - do_not_disturb, - dnd_start_hour, - dnd_end_hour, - }, - &ctx, - ) - .await - .map_err(|e| service_err("room_user_state_update_dnd", e))?; - Ok(None) - } - } - } -} diff --git a/libs/transport/handler/inbound/message.rs b/libs/transport/handler/inbound/message.rs new file mode 100644 index 0000000..1a2fd0e --- /dev/null +++ b/libs/transport/handler/inbound/message.rs @@ -0,0 +1,147 @@ +use room::ws_context::WsUserContext; + +use crate::error::AppTransportError; +use crate::event::{message, reaction}; +use crate::handler::session::TransportSession; +use crate::handler::types::WsOutEvent; + +pub(crate) async fn message_list( + session: &TransportSession, + room: models::RoomId, + before_seq: Option, + after_seq: Option, + limit: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let list = session + .service + .room + .room_message_list(room, before_seq, after_seq, limit, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_message_list failed"); + AppTransportError::Internal + })?; + let resp = message::MessageListService { + room, + messages: list + .messages + .iter() + .map(|m| message::MessageNewService { + id: m.id, + seq: m.seq, + room: m.room, + sender_type: m.sender_type.clone(), + sender_id: m.sender_id, + display_name: m.display_name.clone(), + thread: m.thread, + in_reply_to: m.in_reply_to, + content: m.content.clone(), + content_type: m.content_type.clone(), + thinking_content: m.thinking_content.clone(), + thinking_is_chunked: m.thinking_is_chunked, + send_at: m.send_at, + reactions: Some( + m.reactions + .iter() + .map(|r| reaction::ReactionGroup { + emoji: r.emoji.clone(), + count: r.count as i64, + reacted_by_me: r.reacted_by_me, + users: r.users.iter().filter_map(|u| uuid::Uuid::parse_str(u).ok()).collect(), + }) + .collect(), + ), + }) + .collect(), + total: list.total, + }; + Ok(Some(WsOutEvent::MessageList { + room_id: room, + data: resp, + })) +} + +pub(crate) async fn message_create( + session: &TransportSession, + room: models::RoomId, + content: String, + content_type: Option, + thread: Option, + in_reply_to: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let msg = session + .service + .room + .room_message_create( + room, + room::RoomMessageCreateRequest { + content, + content_type, + thread, + in_reply_to, + attachment_ids: vec![], + }, + &ctx, + ) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_message_create failed"); + AppTransportError::Internal + })?; + Ok(Some(WsOutEvent::MessageNew { + room_id: room, + data: message::MessageNewService { + id: msg.id, + seq: msg.seq, + room: msg.room, + sender_type: msg.sender_type, + sender_id: msg.sender_id, + display_name: msg.display_name, + thread: msg.thread, + in_reply_to: msg.in_reply_to, + content: msg.content, + content_type: msg.content_type, + thinking_content: msg.thinking_content, + thinking_is_chunked: false, + send_at: msg.send_at, + reactions: None, + }, + })) +} + +pub(crate) async fn message_update( + session: &TransportSession, + message: uuid::Uuid, + content: String, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_message_update(message, room::RoomMessageUpdateRequest { content }, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_message_update failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn message_revoke( + session: &TransportSession, + message: uuid::Uuid, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_message_revoke(message, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_message_revoke failed"); + AppTransportError::Internal + })?; + Ok(None) +} \ No newline at end of file diff --git a/libs/transport/handler/inbound/misc.rs b/libs/transport/handler/inbound/misc.rs new file mode 100644 index 0000000..b6de3fe --- /dev/null +++ b/libs/transport/handler/inbound/misc.rs @@ -0,0 +1,415 @@ +use room::ws_context::WsUserContext; +use uuid::Uuid; + +use crate::error::AppTransportError; +use crate::event::{message, presence as pres, search as srch}; +use crate::handler::session::TransportSession; +use crate::handler::types::WsOutEvent; + +// ─── Notification ────────────────────────────────────────────────────── + +pub(crate) async fn notification_mark_read( + session: &TransportSession, + id: Uuid, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .notification_mark_read(id, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "notification_mark_read failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn notification_mark_all_read( + session: &TransportSession, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .notification_mark_all_read(&ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "notification_mark_all_read failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn notification_archive( + session: &TransportSession, + id: Uuid, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .notification_archive(id, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "notification_archive failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +// ─── Search ──────────────────────────────────────────────────────────── + +pub(crate) async fn search( + session: &TransportSession, + q: String, + room: Option, + start_time: Option>, + end_time: Option>, + sender_id: Option, + content_type: Option, + limit: Option, + offset: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let room_id = room.unwrap_or_default(); + let req = room::RoomMessageSearchRequest { + q: q.clone(), + start_time, + end_time, + sender_id, + content_type, + limit, + offset, + }; + let res = session + .service + .room + .room_message_search(room_id, req, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_message_search failed"); + AppTransportError::Internal + })?; + + let resp = srch::SearchResultService { + q, + room: room_id, + took_ms: 0, + messages: res + .messages + .into_iter() + .map(|m| srch::SearchMessageHitService { + highlighted_content: m.highlighted_content.unwrap_or_default(), + message: message::MessageNewService { + id: m.id, + seq: m.seq, + room: m.room, + sender_type: m.sender_type, + sender_id: m.sender_id, + display_name: m.display_name, + thread: m.thread, + in_reply_to: m.in_reply_to, + content: m.content, + content_type: m.content_type, + thinking_content: m.thinking_content, + thinking_is_chunked: m.thinking_is_chunked, + send_at: m.send_at, + reactions: None, + }, + }) + .collect(), + total: res.total, + }; + Ok(Some(WsOutEvent::SearchResult { data: resp })) +} + +// ─── Presence ────────────────────────────────────────────────────────── + +pub(crate) async fn presence_update( + session: &TransportSession, + status: pres::UserPresenceStatus, +) -> Result, AppTransportError> { + let project_id = session.get_current_project().await; + + let presence_status = match status { + pres::UserPresenceStatus::Online => room::presence::PresenceStatus::Online, + pres::UserPresenceStatus::Idle => room::presence::PresenceStatus::Idle, + pres::UserPresenceStatus::Dnd => room::presence::PresenceStatus::Dnd, + pres::UserPresenceStatus::Offline => room::presence::PresenceStatus::Offline, + }; + + let event = session + .service + .room + .set_user_presence(session.user.user_id, project_id, presence_status) + .await; + + if let Some(evt) = event { + let transport_status = match evt.status { + room::presence::PresenceStatus::Online => pres::UserPresenceStatus::Online, + room::presence::PresenceStatus::Idle => pres::UserPresenceStatus::Idle, + room::presence::PresenceStatus::Dnd => pres::UserPresenceStatus::Dnd, + room::presence::PresenceStatus::Offline => pres::UserPresenceStatus::Offline, + }; + Ok(Some(WsOutEvent::PresenceChanged { + data: pres::PresenceChangedService { + user: evt.user_id, + project: evt.project_id, + status: transport_status, + last_seen_at: evt.last_seen_at, + }, + })) + } else { + Ok(None) + } +} + +pub(crate) async fn custom_status_update( + session: &TransportSession, + emoji: Option, + text: Option, + expires_at: Option>, +) -> Result, AppTransportError> { + let evt = session + .service + .room + .set_custom_status(session.user.user_id, emoji.clone(), text.clone(), expires_at); + + if let Some(data) = evt { + Ok(Some(WsOutEvent::CustomStatusUpdated { + data: pres::CustomStatusUpdatedService { + user: data.user_id, + emoji: data.emoji, + text: data.text, + expires_at: data.expires_at, + }, + })) + } else { + Ok(None) + } +} + +// ─── Invite (stubs) ──────────────────────────────────────────────────── + +pub(crate) async fn invite_create() -> Result, AppTransportError> { + tracing::info!("Invite create"); + Ok(None) +} + +pub(crate) async fn invite_accept() -> Result, AppTransportError> { + tracing::info!("Invite accept"); + Ok(None) +} + +pub(crate) async fn invite_revoke() -> Result, AppTransportError> { + tracing::info!("Invite revoke"); + Ok(None) +} + +// ─── Ban (stubs) ────────────────────────────────────────────────────── + +pub(crate) async fn ban_create() -> Result, AppTransportError> { + tracing::info!("Ban create"); + Ok(None) +} + +pub(crate) async fn ban_remove() -> Result, AppTransportError> { + tracing::info!("Ban remove"); + Ok(None) +} + +// ─── Voice (stubs) ──────────────────────────────────────────────────── + +pub(crate) async fn voice_join(room: models::RoomId) -> Result, AppTransportError> { + tracing::info!(%room, "Voice join"); + Ok(None) +} + +pub(crate) async fn voice_leave(room: models::RoomId) -> Result, AppTransportError> { + tracing::info!(%room, "Voice leave"); + Ok(None) +} + +pub(crate) async fn voice_mute(room: models::RoomId, muted: bool) -> Result, AppTransportError> { + tracing::info!(%room, %muted, "Voice mute"); + Ok(None) +} + +pub(crate) async fn voice_deaf(room: models::RoomId, deafened: bool) -> Result, AppTransportError> { + tracing::info!(%room, %deafened, "Voice deaf"); + Ok(None) +} + +pub(crate) async fn screen_share(room: models::RoomId, start: bool) -> Result, AppTransportError> { + tracing::info!(%room, %start, "Screen share"); + Ok(None) +} + +// ─── AI ──────────────────────────────────────────────────────────────── + +pub(crate) async fn ai_list( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let ai_list = session + .service + .room + .room_ai_list(room, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_ai_list failed"); + AppTransportError::Internal + })?; + let data = serde_json::to_value(ai_list).unwrap_or_default(); + Ok(Some(WsOutEvent::Response { + request_id: Uuid::nil(), + data, + })) +} + +pub(crate) async fn ai_upsert( + session: &TransportSession, + room: models::RoomId, + model: Uuid, + version: Option, + system_prompt: Option, + temperature: Option, + max_tokens: Option, + stream: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let req = room::RoomAiUpsertRequest { + model, + version, + system_prompt, + temperature, + max_tokens, + stream, + history_limit: None, + use_exact: None, + think: None, + min_score: None, + agent_type: None, + }; + let ai_model = session + .service + .room + .room_ai_upsert(room, req, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_ai_upsert failed"); + AppTransportError::Internal + })?; + let data = serde_json::to_value(ai_model).unwrap_or_default(); + Ok(Some(WsOutEvent::Response { + request_id: Uuid::nil(), + data, + })) +} + +pub(crate) async fn ai_delete( + session: &TransportSession, + room: models::RoomId, + agent_id: Uuid, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_ai_delete(room, agent_id, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_ai_delete failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn ai_stop( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_ai_stop(room, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_ai_stop failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +// ─── User ────────────────────────────────────────────────────────────── + +pub(crate) async fn user_summary( + session: &TransportSession, + username: String, +) -> Result, AppTransportError> { + let ctx = session.to_session(); + let summary = session + .service + .user_get_summary(ctx, username) + .await + .map_err(|e| { + tracing::warn!(error = %e, "user_get_summary failed"); + AppTransportError::Internal + })?; + let data = serde_json::to_value(summary).unwrap_or_default(); + Ok(Some(WsOutEvent::Response { + request_id: Uuid::nil(), + data, + })) +} + +// ─── State ───────────────────────────────────────────────────────────── + +pub(crate) async fn state_set_read_seq( + session: &TransportSession, + room: models::RoomId, + last_read_seq: i64, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_user_state_update_read_seq(room, last_read_seq, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_user_state_update_read_seq failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn state_update_dnd( + session: &TransportSession, + room: models::RoomId, + do_not_disturb: Option, + dnd_start_hour: Option, + dnd_end_hour: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_user_state_update_dnd( + room, + room::RoomUserStateUpdateDndRequest { + do_not_disturb, + dnd_start_hour, + dnd_end_hour, + }, + &ctx, + ) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_user_state_update_dnd failed"); + AppTransportError::Internal + })?; + Ok(None) +} \ No newline at end of file diff --git a/libs/transport/handler/inbound/mod.rs b/libs/transport/handler/inbound/mod.rs new file mode 100644 index 0000000..1041778 --- /dev/null +++ b/libs/transport/handler/inbound/mod.rs @@ -0,0 +1,118 @@ +use crate::error::AppTransportError; +use crate::handler::session::TransportSession; +use crate::handler::types::{WsInMessage, WsOutEvent}; + +mod misc; +mod msg; +mod message; +mod reaction; +mod room; + +pub struct MessageHandler; + +impl MessageHandler { + pub async fn handle( + session: &TransportSession, + msg: WsInMessage, + ) -> Result, AppTransportError> { + match msg { + WsInMessage::Ping => msg::ping().await, + WsInMessage::Subscribe { room } => msg::subscribe(session, room).await, + WsInMessage::Unsubscribe { room } => msg::unsubscribe(session, room).await, + WsInMessage::TypingStart { room } => msg::typing_start(session, room).await, + WsInMessage::TypingStop { room } => msg::typing_stop(session, room).await, + WsInMessage::ReadReceipt { room, last_read_seq } => { + msg::read_receipt(session, room, last_read_seq).await + } + WsInMessage::MessageList { room, before_seq, after_seq, limit } => { + message::message_list(session, room, before_seq, after_seq, limit).await + } + WsInMessage::MessageCreate { room, content, content_type, thread, in_reply_to } => { + message::message_create(session, room, content, content_type, thread, in_reply_to).await + } + WsInMessage::MessageUpdate { message, content } => { + message::message_update(session, message, content).await + } + WsInMessage::MessageRevoke { message } => { + message::message_revoke(session, message).await + } + WsInMessage::RoomGet { room } => room::room_get(session, room).await, + WsInMessage::RoomCreate { project, room_name, public, category } => { + room::room_create(session, project, room_name, public, category).await + } + WsInMessage::RoomUpdate { room, room_name, public, category } => { + room::room_update(session, room, room_name, public, category).await + } + WsInMessage::RoomDelete { room } => room::room_delete(session, room).await, + WsInMessage::CategoryCreate { project, name, position } => { + room::category_create(session, project, name, position).await + } + WsInMessage::CategoryUpdate { id, name, position } => { + room::category_update(session, id, name, position).await + } + WsInMessage::CategoryDelete { id } => room::category_delete(session, id).await, + WsInMessage::AccessGrant { room, user } => room::access_grant(session, room, user).await, + WsInMessage::AccessRevoke { room, user } => room::access_revoke(session, room, user).await, + WsInMessage::ReactionAdd { room, message, emoji } => { + reaction::reaction_add(session, room, message, emoji).await + } + WsInMessage::ReactionRemove { room, message, emoji } => { + reaction::reaction_remove(session, room, message, emoji).await + } + WsInMessage::ThreadCreate { room, parent } => { + reaction::thread_create(session, room, parent).await + } + WsInMessage::ThreadResolve { thread_id } => reaction::thread_resolve(thread_id).await, + WsInMessage::ThreadArchive { thread_id } => reaction::thread_archive(thread_id).await, + WsInMessage::PinAdd { message, .. } => reaction::pin_add(session, message).await, + WsInMessage::PinRemove { message, .. } => reaction::pin_remove(session, message).await, + WsInMessage::DraftSave { room, content } => { + reaction::draft_save(session, room, content).await + } + WsInMessage::DraftClear { room } => reaction::draft_clear(session, room).await, + WsInMessage::NotificationMarkRead { id } => { + misc::notification_mark_read(session, id).await + } + WsInMessage::NotificationMarkAllRead { .. } => { + misc::notification_mark_all_read(session).await + } + WsInMessage::NotificationArchive { id } => { + misc::notification_archive(session, id).await + } + WsInMessage::Search { q, room, start_time, end_time, sender_id, content_type, limit, offset } => { + misc::search(session, q, room, start_time, end_time, sender_id, content_type, limit, offset).await + } + WsInMessage::PresenceUpdate { status } => { + misc::presence_update(session, status).await + } + WsInMessage::CustomStatusUpdate { emoji, text, expires_at } => { + misc::custom_status_update(session, emoji, text, expires_at).await + } + WsInMessage::InviteCreate { .. } => misc::invite_create().await, + WsInMessage::InviteAccept { .. } => misc::invite_accept().await, + WsInMessage::InviteRevoke { .. } => misc::invite_revoke().await, + WsInMessage::BanCreate { .. } => misc::ban_create().await, + WsInMessage::BanRemove { .. } => misc::ban_remove().await, + WsInMessage::VoiceJoin { room } => misc::voice_join(room).await, + WsInMessage::VoiceLeave { room } => misc::voice_leave(room).await, + WsInMessage::VoiceMute { room, muted } => misc::voice_mute(room, muted).await, + WsInMessage::VoiceDeaf { room, deafened } => misc::voice_deaf(room, deafened).await, + WsInMessage::ScreenShare { room, start } => misc::screen_share(room, start).await, + WsInMessage::AiList { room } => misc::ai_list(session, room).await, + WsInMessage::AiUpsert { room, model, version, system_prompt, temperature, max_tokens, stream } => { + misc::ai_upsert(session, room, model, version, system_prompt, temperature, max_tokens, stream).await + } + WsInMessage::AiDelete { room, agent_id } => { + misc::ai_delete(session, room, agent_id).await + } + WsInMessage::AiStop { room } => misc::ai_stop(session, room).await, + WsInMessage::UserSummary { username } => misc::user_summary(session, username).await, + WsInMessage::StateSetReadSeq { room, last_read_seq } => { + misc::state_set_read_seq(session, room, last_read_seq).await + } + WsInMessage::StateUpdateDnd { room, do_not_disturb, dnd_start_hour, dnd_end_hour } => { + misc::state_update_dnd(session, room, do_not_disturb, dnd_start_hour, dnd_end_hour).await + } + } + } +} \ No newline at end of file diff --git a/libs/transport/handler/inbound/msg.rs b/libs/transport/handler/inbound/msg.rs new file mode 100644 index 0000000..1fe3f98 --- /dev/null +++ b/libs/transport/handler/inbound/msg.rs @@ -0,0 +1,67 @@ +use room::ws_context::WsUserContext; + +use crate::error::AppTransportError; +use crate::handler::session::TransportSession; +use crate::handler::types::WsOutEvent; + +pub(crate) async fn ping() -> Result, AppTransportError> { + Ok(Some(WsOutEvent::Pong { + protocol_version: crate::handler::types::WS_PROTOCOL_VERSION, + })) +} + +pub(crate) async fn subscribe( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + let sub = session.subscribe_room(room).await.map_err(|e| { + tracing::warn!(error = %e, "subscribe_room failed"); + AppTransportError::Internal + })?; + session.subscriptions.insert(room, sub); + session.service.room.spawn_room_workers(room); + session.refresh_project().await; + Ok(None) +} + +pub(crate) async fn unsubscribe( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + session.unsubscribe_room(room).await; + Ok(None) +} + +pub(crate) async fn typing_start( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + session.broadcast_typing(room, "start").await; + Ok(None) +} + +pub(crate) async fn typing_stop( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + session.broadcast_typing(room, "stop").await; + Ok(None) +} + +pub(crate) async fn read_receipt( + session: &TransportSession, + room: models::RoomId, + last_read_seq: i64, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_user_state_update_read_seq(room, last_read_seq, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_user_state_update_read_seq failed"); + AppTransportError::Internal + })?; + Ok(None) +} \ No newline at end of file diff --git a/libs/transport/handler/inbound/reaction.rs b/libs/transport/handler/inbound/reaction.rs new file mode 100644 index 0000000..20dbf6f --- /dev/null +++ b/libs/transport/handler/inbound/reaction.rs @@ -0,0 +1,180 @@ +use room::ws_context::WsUserContext; +use uuid::Uuid; + +use crate::error::AppTransportError; +use crate::event::reaction; +use crate::handler::session::TransportSession; +use crate::handler::types::WsOutEvent; + +pub(crate) async fn reaction_add( + session: &TransportSession, + room: models::RoomId, + message: Uuid, + emoji: String, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let rxs = session + .service + .room + .message_reaction_add(message, emoji, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "message_reaction_add failed"); + AppTransportError::Internal + })?; + Ok(Some(build_reaction_batch(room, message, rxs.reactions))) +} + +pub(crate) async fn reaction_remove( + session: &TransportSession, + room: models::RoomId, + message: Uuid, + emoji: String, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let rxs = session + .service + .room + .message_reaction_remove(message, emoji, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "message_reaction_remove failed"); + AppTransportError::Internal + })?; + Ok(Some(build_reaction_batch(room, message, rxs.reactions))) +} + +fn build_reaction_batch( + room: models::RoomId, + message: Uuid, + reactions: Vec, +) -> WsOutEvent { + WsOutEvent::ReactionBatchUpdated { + room_id: room, + data: reaction::ReactionBatchUpdatedService { + room, + message, + reactions: reactions + .into_iter() + .map(|g| reaction::ReactionGroup { + emoji: g.emoji, + count: g.count as i64, + reacted_by_me: g.reacted_by_me, + users: g.users.iter().filter_map(|u| u.parse::().ok()).collect(), + }) + .collect(), + }, + } +} + +pub(crate) async fn thread_create( + session: &TransportSession, + room: models::RoomId, + parent: i64, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_thread_create(room, room::RoomThreadCreateRequest { parent_seq: parent }, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_thread_create failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn thread_resolve(thread_id: models::RoomThreadId) -> Result, AppTransportError> { + tracing::info!(%thread_id, "Thread resolved"); + Ok(None) +} + +pub(crate) async fn thread_archive(thread_id: models::RoomThreadId) -> Result, AppTransportError> { + tracing::info!(%thread_id, "Thread archived"); + Ok(None) +} + +pub(crate) async fn pin_add( + session: &TransportSession, + message: Uuid, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_pin_add(message, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_pin_add failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn pin_remove( + session: &TransportSession, + message: Uuid, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_pin_remove(message, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_pin_remove failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn draft_save( + session: &TransportSession, + room: models::RoomId, + content: String, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let draft = session + .service + .room + .draft_save(room, content, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "draft_save failed"); + AppTransportError::Internal + })?; + Ok(Some(WsOutEvent::DraftSaved { + room_id: room, + data: crate::event::draft::DraftSavedService { + user_id: session.user.user_id, + room: draft.room_id, + content: draft.content, + saved_at: draft.saved_at, + }, + })) +} + +pub(crate) async fn draft_clear( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .draft_clear(room, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "draft_clear failed"); + AppTransportError::Internal + })?; + Ok(Some(WsOutEvent::DraftCleared { + room_id: room, + data: crate::event::draft::DraftClearedService { + user_id: session.user.user_id, + room, + cleared_at: chrono::Utc::now(), + }, + })) +} \ No newline at end of file diff --git a/libs/transport/handler/inbound/room.rs b/libs/transport/handler/inbound/room.rs new file mode 100644 index 0000000..b354038 --- /dev/null +++ b/libs/transport/handler/inbound/room.rs @@ -0,0 +1,237 @@ +use room::ws_context::WsUserContext; +use uuid::Uuid; + +use crate::error::AppTransportError; +use crate::event::{category, rooms}; +use crate::handler::session::TransportSession; +use crate::handler::types::WsOutEvent; + +pub(crate) async fn room_get( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .require_room_access(room, ctx.user_id) + .await + .map_err(|e| { + tracing::warn!(error = %e, "require_room_access failed"); + AppTransportError::Internal + })?; + let rm = session + .service + .room + .find_room_or_404(room) + .await + .map_err(|e| { + tracing::warn!(error = %e, "find_room failed"); + AppTransportError::Internal + })?; + Ok(Some(WsOutEvent::RoomCreated { + room_id: rm.id, + data: rooms::RoomCreatedService { + id: rm.id, + project: rm.project, + room_name: rm.room_name, + public: rm.public, + category: rm.category, + created_by: rm.created_by, + created_at: rm.created_at, + }, + })) +} + +pub(crate) async fn room_create( + session: &TransportSession, + project: models::ProjectId, + room_name: String, + public: bool, + category: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let rm = session + .service + .room + .room_create( + project.to_string(), + room::RoomCreateRequest { + room_name, + public, + category, + }, + &ctx, + ) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_create failed"); + AppTransportError::Internal + })?; + Ok(Some(WsOutEvent::RoomCreated { + room_id: rm.id, + data: rooms::RoomCreatedService { + id: rm.id, + project, + room_name: rm.room_name, + public: rm.public, + category, + created_by: session.user.user_id, + created_at: rm.created_at, + }, + })) +} + +pub(crate) async fn room_update( + session: &TransportSession, + room: models::RoomId, + room_name: Option, + public: Option, + category: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_update( + room, + room::RoomUpdateRequest { + room_name, + public, + category, + }, + &ctx, + ) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_update failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn room_delete( + session: &TransportSession, + room: models::RoomId, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_delete(room, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_delete failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn category_create( + session: &TransportSession, + project: models::ProjectId, + name: String, + position: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + let cat = session + .service + .room + .room_category_create( + project.to_string(), + room::RoomCategoryCreateRequest { name, position }, + &ctx, + ) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_category_create failed"); + AppTransportError::Internal + })?; + Ok(Some(WsOutEvent::CategoryCreated { + project, + data: category::CategoryCreatedService { + id: cat.id, + project, + name: cat.name, + position: cat.position, + created_by: session.user.user_id, + created_at: cat.created_at, + }, + })) +} + +pub(crate) async fn category_update( + session: &TransportSession, + id: Uuid, + name: Option, + position: Option, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_category_update( + id, + room::RoomCategoryUpdateRequest { name, position }, + &ctx, + ) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_category_update failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn category_delete( + session: &TransportSession, + id: Uuid, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_category_delete(id, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_category_delete failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn access_grant( + session: &TransportSession, + room: models::RoomId, + user: models::UserId, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_access_grant(room, user, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_access_grant failed"); + AppTransportError::Internal + })?; + Ok(None) +} + +pub(crate) async fn access_revoke( + session: &TransportSession, + room: models::RoomId, + user: models::UserId, +) -> Result, AppTransportError> { + let ctx = WsUserContext::new(session.user.user_id); + session + .service + .room + .room_access_revoke(room, user, &ctx) + .await + .map_err(|e| { + tracing::warn!(error = %e, "room_access_revoke failed"); + AppTransportError::Internal + })?; + Ok(None) +} \ No newline at end of file diff --git a/libs/transport/handler/session.rs b/libs/transport/handler/session.rs index 1de6e02..561d279 100644 --- a/libs/transport/handler/session.rs +++ b/libs/transport/handler/session.rs @@ -78,6 +78,9 @@ pub struct TransportSession { pub subscriptions: Arc>, pub seq: Arc, pub service: Arc, + /// Cached project_id resolved from the first subscribed room. + /// Avoids repeated DB lookups on every PresenceUpdate. + pub project_id: Mutex>, } impl TransportSession { @@ -90,6 +93,7 @@ impl TransportSession { subscriptions: Arc::new(DashMap::new()), seq: Arc::new(SeqAllocator::new(service.cache.clone(), service.db.clone())), service, + project_id: Mutex::new(None), } } @@ -128,26 +132,50 @@ impl TransportSession { self.service.room.room_manager.broadcast_typing(room_id, event).await; } - /// Get the current project context from the first subscribed room. - /// Returns the project_id if the user has any subscribed rooms. + /// Get the current project context from cache (populated on first subscription). pub async fn get_current_project(&self) -> Option { - use models::rooms::room; - use sea_orm::EntityTrait; + let cached = self.project_id.lock().await; + if cached.is_some() { + return *cached; + } + drop(cached); - // Try to get the first subscribed room + // Lazy init: query first subscribed room, cache result. let first_room = self.subscriptions.iter().next().map(|r| *r.key()); if let Some(room_id) = first_room { - // Query the room to get its project_id + use models::rooms::room; + use sea_orm::EntityTrait; if let Ok(Some(rm)) = room::Entity::find_by_id(room_id) .one(&self.service.db) .await { + let mut cached = self.project_id.lock().await; + *cached = Some(rm.project); return Some(rm.project); } } None } + /// Refresh cached project_id — call after Subscribe/Unsubscribe if needed. + pub async fn refresh_project(&self) { + let first_room = self.subscriptions.iter().next().map(|r| *r.key()); + let new_project = if let Some(room_id) = first_room { + use models::rooms::room; + use sea_orm::EntityTrait; + room::Entity::find_by_id(room_id) + .one(&self.service.db) + .await + .ok() + .flatten() + .map(|rm| rm.project) + } else { + None + }; + let mut cached = self.project_id.lock().await; + *cached = new_project; + } + pub fn to_session(&self) -> session::Session { let s = session::Session::no_op(); s.set_user(self.user.user_id); diff --git a/libs/transport/handler/types.rs b/libs/transport/handler/types.rs deleted file mode 100644 index f3038c9..0000000 --- a/libs/transport/handler/types.rs +++ /dev/null @@ -1,559 +0,0 @@ -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; - -use models::{ProjectId, RoomId, RoomThreadId, UserId}; -use uuid::Uuid; - -use crate::event::{ - ai, attachment, ban, category, draft, invite, member, message, notify, pin, presence, project, - reaction, rooms, search, thread, voice, -}; - -/// Current WS protocol version — bump when event types or fields change. -pub const WS_PROTOCOL_VERSION: u32 = 1; - -// ─── Outbound Event Envelope ──────────────────────────────────────────────── - -#[derive(Debug, Clone, Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum WsOutEvent { - Pong { - protocol_version: u32, - }, - Error(WsError), - // ── Room events ── - RoomCreated { - room_id: RoomId, - data: rooms::RoomCreatedService, - }, - RoomDeleted { - room_id: RoomId, - data: rooms::RoomDeletedService, - }, - RoomRenamed { - room_id: RoomId, - data: rooms::RoomRenamedService, - }, - RoomTopicUpdated { - room_id: RoomId, - data: rooms::RoomTopicUpdatedService, - }, - RoomSettingsUpdated { - room_id: RoomId, - data: rooms::RoomSettingsUpdatedService, - }, - RoomMoved { - room_id: RoomId, - data: rooms::RoomMovedService, - }, - // ── Message events ── - MessageNew { - room_id: RoomId, - data: message::MessageNewService, - }, - MessageEdited { - room_id: RoomId, - data: message::MessageEditedService, - }, - MessageRevoked { - room_id: RoomId, - data: message::MessageRevokedService, - }, - MessageStreamStart { - room_id: RoomId, - data: message::MessageStreamStartService, - }, - MessageStreamChunk { - room_id: RoomId, - data: message::MessageStreamChunkService, - }, - MessageStreamDone { - room_id: RoomId, - data: message::MessageStreamDoneService, - }, - MessageList { - room_id: RoomId, - data: message::MessageListService, - }, - // ── Member events ── - MemberJoined { - room_id: RoomId, - data: member::MemberJoinedService, - }, - MemberRemoved { - room_id: RoomId, - data: member::MemberRemovedService, - }, - ReadReceipt { - room_id: RoomId, - data: member::ReadReceiptService, - }, - TypingStart { - room_id: RoomId, - data: member::TypingStartService, - }, - TypingStop { - room_id: RoomId, - data: member::TypingStopService, - }, - // ── Reaction events ── - ReactionAdded { - room_id: RoomId, - data: reaction::ReactionAddedService, - }, - ReactionRemoved { - room_id: RoomId, - data: reaction::ReactionRemovedService, - }, - ReactionBatchUpdated { - room_id: RoomId, - data: reaction::ReactionBatchUpdatedService, - }, - // ── Thread events ── - ThreadCreated { - room_id: RoomId, - data: thread::ThreadCreatedService, - }, - ThreadUpdated { - room_id: RoomId, - data: thread::ThreadUpdatedService, - }, - ThreadResolved { - room_id: RoomId, - data: thread::ThreadResolvedService, - }, - ThreadArchived { - room_id: RoomId, - data: thread::ThreadArchivedService, - }, - ThreadParticipantJoined { - room_id: RoomId, - data: thread::ThreadParticipantJoinedService, - }, - ThreadParticipantLeft { - room_id: RoomId, - data: thread::ThreadParticipantLeftService, - }, - // ── Category events ── - CategoryCreated { - project: ProjectId, - data: category::CategoryCreatedService, - }, - CategoryUpdated { - project: ProjectId, - data: category::CategoryUpdatedService, - }, - CategoryDeleted { - project: ProjectId, - data: category::CategoryDeletedService, - }, - // ── Pin events ── - PinAdded { - room_id: RoomId, - data: pin::PinAddedService, - }, - PinRemoved { - room_id: RoomId, - data: pin::PinRemovedService, - }, - // ── Project events ── - ProjectRoomCreated { - project: ProjectId, - data: project::ProjectRoomCreatedService, - }, - ProjectRoomDeleted { - project: ProjectId, - data: project::ProjectRoomDeletedService, - }, - ProjectRoomRenamed { - project: ProjectId, - data: project::ProjectRoomRenamedService, - }, - ProjectRoomMoved { - project: ProjectId, - data: project::ProjectRoomMovedService, - }, - // ── Draft events ── - DraftSaved { - room_id: RoomId, - data: draft::DraftSavedService, - }, - DraftCleared { - room_id: RoomId, - data: draft::DraftClearedService, - }, - // ── Search ── - SearchResult { - data: search::SearchResultService, - }, - // ── Notification events ── - NotifyCreated { - data: notify::NotifyCreatedService, - }, - NotifyRead { - data: notify::NotifyReadService, - }, - // ── Presence events ── - PresenceChanged { - data: presence::PresenceChangedService, - }, - CustomStatusUpdated { - data: presence::CustomStatusUpdatedService, - }, - // ── Invite events ── - InviteCreated { - data: invite::InviteCreatedService, - }, - InviteAccepted { - data: invite::InviteAcceptedService, - }, - InviteRejected { - data: invite::InviteRejectedService, - }, - InviteRevoked { - data: invite::InviteRevokedService, - }, - // ── Attachment events ── - AttachmentUploaded { - data: attachment::AttachmentUploadedService, - }, - AttachmentThumbnailGenerated { - data: attachment::AttachmentThumbnailService, - }, - AttachmentDeleted { - data: attachment::AttachmentDeletedService, - }, - // ── Ban events ── - UserBanned { - data: ban::BannedService, - }, - UserUnbanned { - data: ban::UnbannedService, - }, - // ── AI events ── - AiAgentJoined { - data: ai::AiAgentJoinedService, - }, - AiAgentLeft { - data: ai::AiAgentLeftService, - }, - AiAgentStatusChanged { - data: ai::AiAgentStatusChangedService, - }, - // ── Voice events ── - VoiceChannelJoined { - data: voice::VoiceChannelJoinedService, - }, - VoiceChannelLeft { - data: voice::VoiceChannelLeftService, - }, - VoiceMuteUpdated { - data: voice::VoiceMuteUpdatedService, - }, - VoiceDeafUpdated { - data: voice::VoiceDeafUpdatedService, - }, - ScreenShareStarted { - data: voice::ScreenShareStartedService, - }, - ScreenShareStopped { - data: voice::ScreenShareStoppedService, - }, - SpeakingStarted { - room_id: RoomId, - user_id: UserId, - }, - SpeakingStopped { - room_id: RoomId, - user_id: UserId, - }, - // ── Request-response ── - Response { - request_id: Uuid, - data: serde_json::Value, - }, -} - -#[derive(Debug, Clone, Serialize)] -pub struct WsError { - pub code: i32, - pub error: String, - pub message: String, -} - -// ─── Inbound Client Message ────────────────────────────────────────────────── - -#[derive(Debug, Clone, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum WsInMessage { - Ping, - // ── Room subscribe/unsubscribe ── - Subscribe { - room: RoomId, - }, - Unsubscribe { - room: RoomId, - }, - // ── Typing ── - TypingStart { - room: RoomId, - }, - TypingStop { - room: RoomId, - }, - // ── Read receipt ── - ReadReceipt { - room: RoomId, - last_read_seq: i64, - }, - // ── Message operations ── - MessageList { - room: RoomId, - before_seq: Option, - after_seq: Option, - limit: Option, - }, - MessageCreate { - room: RoomId, - content: String, - content_type: Option, - thread: Option, - in_reply_to: Option, - }, - MessageUpdate { - message: Uuid, - content: String, - }, - MessageRevoke { - message: Uuid, - }, - // ── Room queries ── - RoomGet { - room: RoomId, - }, - // ── Room CRUD ── - RoomCreate { - project: ProjectId, - room_name: String, - public: bool, - category: Option, - }, - RoomUpdate { - room: RoomId, - room_name: Option, - public: Option, - category: Option, - }, - RoomDelete { - room: RoomId, - }, - // ── Category CRUD ── - CategoryCreate { - project: ProjectId, - name: String, - position: Option, - }, - CategoryUpdate { - id: Uuid, - name: Option, - position: Option, - }, - CategoryDelete { - id: Uuid, - }, - // ── Access & state operations ── - AccessGrant { - room: RoomId, - user: UserId, - }, - AccessRevoke { - room: RoomId, - user: UserId, - }, - StateSetReadSeq { - room: RoomId, - last_read_seq: i64, - }, - StateUpdateDnd { - room: RoomId, - do_not_disturb: Option, - dnd_start_hour: Option, - dnd_end_hour: Option, - }, - // ── Reaction ── - ReactionAdd { - room: RoomId, - message: Uuid, - emoji: String, - }, - ReactionRemove { - room: RoomId, - message: Uuid, - emoji: String, - }, - // ── Thread ── - ThreadCreate { - room: RoomId, - parent: i64, - }, - ThreadResolve { - thread_id: RoomThreadId, - }, - ThreadArchive { - thread_id: RoomThreadId, - }, - // ── Pin ── - PinAdd { - room: RoomId, - message: Uuid, - }, - PinRemove { - room: RoomId, - message: Uuid, - }, - // ── Draft ── - DraftSave { - room: RoomId, - content: String, - }, - DraftClear { - room: RoomId, - }, - // ── Search ── - Search { - q: String, - room: Option, - start_time: Option>, - end_time: Option>, - sender_id: Option, - content_type: Option, - limit: Option, - offset: Option, - }, - // ── Notification ── - NotificationMarkRead { - id: Uuid, - }, - NotificationMarkAllRead { - project_id: Option, - }, - NotificationArchive { - id: Uuid, - }, - // ── Presence ── - PresenceUpdate { - status: crate::event::presence::UserPresenceStatus, - }, - CustomStatusUpdate { - emoji: Option, - text: Option, - expires_at: Option>, - }, - // ── Invite ── - InviteCreate { - project: ProjectId, - room: Option, - max_uses: Option, - expires_at: Option>, - }, - InviteAccept { - code: String, - }, - InviteRevoke { - id: Uuid, - }, - // ── Ban ── - BanCreate { - project: ProjectId, - user: UserId, - reason: Option, - expires_at: Option>, - }, - BanRemove { - project: ProjectId, - user: UserId, - }, - // ── Voice ── - VoiceJoin { - room: RoomId, - }, - VoiceLeave { - room: RoomId, - }, - VoiceMute { - room: RoomId, - muted: bool, - }, - VoiceDeaf { - room: RoomId, - deafened: bool, - }, - ScreenShare { - room: RoomId, - start: bool, - }, - // ── AI ── - AiList { - room: RoomId, - }, - AiUpsert { - room: RoomId, - model: Uuid, - version: Option, - system_prompt: Option, - temperature: Option, - max_tokens: Option, - stream: Option, - }, - AiDelete { - room: RoomId, - agent_id: Uuid, - }, - AiStop { - room: RoomId, - }, - // ── User ── - UserSummary { - username: String, - }, -} - -impl WsInMessage { - pub fn room_id(&self) -> Option { - match self { - Self::Subscribe { room } - | Self::Unsubscribe { room } - | Self::TypingStart { room } - | Self::TypingStop { room } - | Self::ReadReceipt { room, .. } - | Self::MessageCreate { room, .. } - | Self::RoomUpdate { room, .. } - | Self::RoomDelete { room } - | Self::AccessGrant { room, .. } - | Self::AccessRevoke { room, .. } - | Self::StateSetReadSeq { room, .. } - | Self::StateUpdateDnd { room, .. } - | Self::ReactionAdd { room, .. } - | Self::ReactionRemove { room, .. } - | Self::ThreadCreate { room, .. } - | Self::PinAdd { room, .. } - | Self::PinRemove { room, .. } - | Self::DraftSave { room, .. } - | Self::DraftClear { room } - | Self::VoiceJoin { room } - | Self::VoiceLeave { room } - | Self::VoiceMute { room, .. } - | Self::VoiceDeaf { room, .. } - | Self::ScreenShare { room, .. } - | Self::AiList { room } - | Self::AiUpsert { room, .. } - | Self::AiDelete { room, .. } - | Self::MessageList { room, .. } - | Self::RoomGet { room } - | Self::Search { - room: Some(room), .. - } => Some(*room), - _ => None, - } - } -} diff --git a/libs/transport/handler/types/in_message.rs b/libs/transport/handler/types/in_message.rs new file mode 100644 index 0000000..3504433 --- /dev/null +++ b/libs/transport/handler/types/in_message.rs @@ -0,0 +1,104 @@ +use chrono::{DateTime, Utc}; +use serde::Deserialize; +use uuid::Uuid; + +use models::{ProjectId, RoomId, RoomThreadId, UserId}; + +/// Current WS protocol version — bump when event types or fields change. +pub const WS_PROTOCOL_VERSION: u32 = 1; + +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsInMessage { + Ping, + Subscribe { room: RoomId }, + Unsubscribe { room: RoomId }, + TypingStart { room: RoomId }, + TypingStop { room: RoomId }, + ReadReceipt { room: RoomId, last_read_seq: i64 }, + MessageList { room: RoomId, before_seq: Option, after_seq: Option, limit: Option }, + MessageCreate { room: RoomId, content: String, content_type: Option, thread: Option, in_reply_to: Option }, + MessageUpdate { message: Uuid, content: String }, + MessageRevoke { message: Uuid }, + RoomGet { room: RoomId }, + RoomCreate { project: ProjectId, room_name: String, public: bool, category: Option }, + RoomUpdate { room: RoomId, room_name: Option, public: Option, category: Option }, + RoomDelete { room: RoomId }, + CategoryCreate { project: ProjectId, name: String, position: Option }, + CategoryUpdate { id: Uuid, name: Option, position: Option }, + CategoryDelete { id: Uuid }, + AccessGrant { room: RoomId, user: UserId }, + AccessRevoke { room: RoomId, user: UserId }, + StateSetReadSeq { room: RoomId, last_read_seq: i64 }, + StateUpdateDnd { room: RoomId, do_not_disturb: Option, dnd_start_hour: Option, dnd_end_hour: Option }, + ReactionAdd { room: RoomId, message: Uuid, emoji: String }, + ReactionRemove { room: RoomId, message: Uuid, emoji: String }, + ThreadCreate { room: RoomId, parent: i64 }, + ThreadResolve { thread_id: RoomThreadId }, + ThreadArchive { thread_id: RoomThreadId }, + PinAdd { room: RoomId, message: Uuid }, + PinRemove { room: RoomId, message: Uuid }, + DraftSave { room: RoomId, content: String }, + DraftClear { room: RoomId }, + Search { q: String, room: Option, start_time: Option>, end_time: Option>, sender_id: Option, content_type: Option, limit: Option, offset: Option }, + NotificationMarkRead { id: Uuid }, + NotificationMarkAllRead { project_id: Option }, + NotificationArchive { id: Uuid }, + PresenceUpdate { status: crate::event::presence::UserPresenceStatus }, + CustomStatusUpdate { emoji: Option, text: Option, expires_at: Option> }, + InviteCreate { project: ProjectId, room: Option, max_uses: Option, expires_at: Option> }, + InviteAccept { code: String }, + InviteRevoke { id: Uuid }, + BanCreate { project: ProjectId, user: UserId, reason: Option, expires_at: Option> }, + BanRemove { project: ProjectId, user: UserId }, + VoiceJoin { room: RoomId }, + VoiceLeave { room: RoomId }, + VoiceMute { room: RoomId, muted: bool }, + VoiceDeaf { room: RoomId, deafened: bool }, + ScreenShare { room: RoomId, start: bool }, + AiList { room: RoomId }, + AiUpsert { room: RoomId, model: Uuid, version: Option, system_prompt: Option, temperature: Option, max_tokens: Option, stream: Option }, + AiDelete { room: RoomId, agent_id: Uuid }, + AiStop { room: RoomId }, + UserSummary { username: String }, +} + +impl WsInMessage { + pub fn room_id(&self) -> Option { + match self { + Self::Subscribe { room } + | Self::Unsubscribe { room } + | Self::TypingStart { room } + | Self::TypingStop { room } + | Self::ReadReceipt { room, .. } + | Self::MessageCreate { room, .. } + | Self::RoomUpdate { room, .. } + | Self::RoomDelete { room } + | Self::AccessGrant { room, .. } + | Self::AccessRevoke { room, .. } + | Self::StateSetReadSeq { room, .. } + | Self::StateUpdateDnd { room, .. } + | Self::ReactionAdd { room, .. } + | Self::ReactionRemove { room, .. } + | Self::ThreadCreate { room, .. } + | Self::PinAdd { room, .. } + | Self::PinRemove { room, .. } + | Self::DraftSave { room, .. } + | Self::DraftClear { room } + | Self::VoiceJoin { room } + | Self::VoiceLeave { room } + | Self::VoiceMute { room, .. } + | Self::VoiceDeaf { room, .. } + | Self::ScreenShare { room, .. } + | Self::AiList { room } + | Self::AiUpsert { room, .. } + | Self::AiDelete { room, .. } + | Self::MessageList { room, .. } + | Self::RoomGet { room } + | Self::Search { + room: Some(room), .. + } => Some(*room), + _ => None, + } + } +} \ No newline at end of file diff --git a/libs/transport/handler/types/mod.rs b/libs/transport/handler/types/mod.rs new file mode 100644 index 0000000..53d36dd --- /dev/null +++ b/libs/transport/handler/types/mod.rs @@ -0,0 +1,5 @@ +mod in_message; +mod out_event; + +pub use in_message::{WsInMessage, WS_PROTOCOL_VERSION}; +pub use out_event::{WsError, WsOutEvent}; \ No newline at end of file diff --git a/libs/transport/handler/types/out_event.rs b/libs/transport/handler/types/out_event.rs new file mode 100644 index 0000000..f042336 --- /dev/null +++ b/libs/transport/handler/types/out_event.rs @@ -0,0 +1,275 @@ +use serde::Serialize; +use uuid::Uuid; + +use models::{ProjectId, RoomId, UserId}; +use crate::event::{ + ai, attachment, ban, category, draft, invite, member, message, notify, pin, presence, project, + reaction, rooms, search, thread, voice, +}; + +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsOutEvent { + Pong { + protocol_version: u32, + }, + Error(WsError), + // ── Room events ── + RoomCreated { + room_id: RoomId, + data: rooms::RoomCreatedService, + }, + RoomDeleted { + room_id: RoomId, + data: rooms::RoomDeletedService, + }, + RoomRenamed { + room_id: RoomId, + data: rooms::RoomRenamedService, + }, + RoomTopicUpdated { + room_id: RoomId, + data: rooms::RoomTopicUpdatedService, + }, + RoomSettingsUpdated { + room_id: RoomId, + data: rooms::RoomSettingsUpdatedService, + }, + RoomMoved { + room_id: RoomId, + data: rooms::RoomMovedService, + }, + // ── Message events ── + MessageNew { + room_id: RoomId, + data: message::MessageNewService, + }, + MessageEdited { + room_id: RoomId, + data: message::MessageEditedService, + }, + MessageRevoked { + room_id: RoomId, + data: message::MessageRevokedService, + }, + MessageStreamStart { + room_id: RoomId, + data: message::MessageStreamStartService, + }, + MessageStreamChunk { + room_id: RoomId, + data: message::MessageStreamChunkService, + }, + MessageStreamDone { + room_id: RoomId, + data: message::MessageStreamDoneService, + }, + MessageList { + room_id: RoomId, + data: message::MessageListService, + }, + // ── Member events ── + MemberJoined { + room_id: RoomId, + data: member::MemberJoinedService, + }, + MemberRemoved { + room_id: RoomId, + data: member::MemberRemovedService, + }, + ReadReceipt { + room_id: RoomId, + data: member::ReadReceiptService, + }, + TypingStart { + room_id: RoomId, + data: member::TypingStartService, + }, + TypingStop { + room_id: RoomId, + data: member::TypingStopService, + }, + // ── Reaction events ── + ReactionAdded { + room_id: RoomId, + data: reaction::ReactionAddedService, + }, + ReactionRemoved { + room_id: RoomId, + data: reaction::ReactionRemovedService, + }, + ReactionBatchUpdated { + room_id: RoomId, + data: reaction::ReactionBatchUpdatedService, + }, + // ── Thread events ── + ThreadCreated { + room_id: RoomId, + data: thread::ThreadCreatedService, + }, + ThreadUpdated { + room_id: RoomId, + data: thread::ThreadUpdatedService, + }, + ThreadResolved { + room_id: RoomId, + data: thread::ThreadResolvedService, + }, + ThreadArchived { + room_id: RoomId, + data: thread::ThreadArchivedService, + }, + ThreadParticipantJoined { + room_id: RoomId, + data: thread::ThreadParticipantJoinedService, + }, + ThreadParticipantLeft { + room_id: RoomId, + data: thread::ThreadParticipantLeftService, + }, + // ── Category events ── + CategoryCreated { + project: ProjectId, + data: category::CategoryCreatedService, + }, + CategoryUpdated { + project: ProjectId, + data: category::CategoryUpdatedService, + }, + CategoryDeleted { + project: ProjectId, + data: category::CategoryDeletedService, + }, + // ── Pin events ── + PinAdded { + room_id: RoomId, + data: pin::PinAddedService, + }, + PinRemoved { + room_id: RoomId, + data: pin::PinRemovedService, + }, + // ── Project events ── + ProjectRoomCreated { + project: ProjectId, + data: project::ProjectRoomCreatedService, + }, + ProjectRoomDeleted { + project: ProjectId, + data: project::ProjectRoomDeletedService, + }, + ProjectRoomRenamed { + project: ProjectId, + data: project::ProjectRoomRenamedService, + }, + ProjectRoomMoved { + project: ProjectId, + data: project::ProjectRoomMovedService, + }, + // ── Draft events ── + DraftSaved { + room_id: RoomId, + data: draft::DraftSavedService, + }, + DraftCleared { + room_id: RoomId, + data: draft::DraftClearedService, + }, + // ── Search ── + SearchResult { + data: search::SearchResultService, + }, + // ── Notification events ── + NotifyCreated { + data: notify::NotifyCreatedService, + }, + NotifyRead { + data: notify::NotifyReadService, + }, + // ── Presence events ── + PresenceChanged { + data: presence::PresenceChangedService, + }, + CustomStatusUpdated { + data: presence::CustomStatusUpdatedService, + }, + // ── Invite events ── + InviteCreated { + data: invite::InviteCreatedService, + }, + InviteAccepted { + data: invite::InviteAcceptedService, + }, + InviteRejected { + data: invite::InviteRejectedService, + }, + InviteRevoked { + data: invite::InviteRevokedService, + }, + // ── Attachment events ── + AttachmentUploaded { + data: attachment::AttachmentUploadedService, + }, + AttachmentThumbnailGenerated { + data: attachment::AttachmentThumbnailService, + }, + AttachmentDeleted { + data: attachment::AttachmentDeletedService, + }, + // ── Ban events ── + UserBanned { + data: ban::BannedService, + }, + UserUnbanned { + data: ban::UnbannedService, + }, + // ── AI events ── + AiAgentJoined { + data: ai::AiAgentJoinedService, + }, + AiAgentLeft { + data: ai::AiAgentLeftService, + }, + AiAgentStatusChanged { + data: ai::AiAgentStatusChangedService, + }, + // ── Voice events ── + VoiceChannelJoined { + data: voice::VoiceChannelJoinedService, + }, + VoiceChannelLeft { + data: voice::VoiceChannelLeftService, + }, + VoiceMuteUpdated { + data: voice::VoiceMuteUpdatedService, + }, + VoiceDeafUpdated { + data: voice::VoiceDeafUpdatedService, + }, + ScreenShareStarted { + data: voice::ScreenShareStartedService, + }, + ScreenShareStopped { + data: voice::ScreenShareStoppedService, + }, + SpeakingStarted { + room_id: RoomId, + user_id: UserId, + }, + SpeakingStopped { + room_id: RoomId, + user_id: UserId, + }, + // ── Request-response ── + Response { + request_id: Uuid, + data: serde_json::Value, + }, +} + +#[derive(Debug, Clone, Serialize)] +pub struct WsError { + pub code: i32, + pub error: String, + pub message: String, +} \ No newline at end of file diff --git a/libs/transport/handler/ws.rs b/libs/transport/handler/ws.rs index 06f3c58..61016a7 100644 --- a/libs/transport/handler/ws.rs +++ b/libs/transport/handler/ws.rs @@ -146,7 +146,7 @@ pub async fn ws_handler( continue; } - // Pre-parse as Value to extract _request_id, then parse as WsInMessage + // Parse once — extract request_id and deserialize together. let json_value = serde_json::from_str::(&text); // Application-level JSON ping (distinguish from WebSocket Ping frame) @@ -155,62 +155,41 @@ pub async fn ws_handler( continue; } - // Extract _request_id before full parsing to support request-response correlation + // Extract _request_id from the Value, then deserialize WsInMessage let request_id: Option = json_value .ok() .and_then(|v| v.get("_request_id") .and_then(|r| serde_json::from_value(r.clone()).ok())); - // Re-parse from text as WsInMessage for the handler match serde_json::from_str::(&text) { Ok(in_msg) => { match MessageHandler::handle(&session, in_msg).await { Ok(Some(event)) => { - if let Some(rid) = request_id { - // Unwrap inner data for MessageList to avoid double nesting - let json_data = match &event { - WsOutEvent::MessageList { data, .. } => { - serde_json::to_value(data).unwrap_or_default() - } - _ => { - serde_json::to_value(&event).unwrap_or_default() - } - }; - let resp = WsOutEvent::Response { request_id: rid, data: json_data }; - if send_event(&mut ws_session, &resp).await.is_err() { break; } - } else { - if send_event(&mut ws_session, &event).await.is_err() { break; } - } + let rid = request_id.unwrap_or(Uuid::nil()); + let resp = WsOutEvent::Response { + request_id: rid, + data: serde_json::to_value(&event).unwrap_or_default(), + }; + if send_event(&mut ws_session, &resp).await.is_err() { break; } } Ok(None) => { - if let Some(rid) = request_id { - let ack = WsOutEvent::Response { - request_id: rid, - data: serde_json::json!({"ok": true}), - }; - if send_event(&mut ws_session, &ack).await.is_err() { break; } - } else { - let _ = ws_session.text(r#"{"type":"ack"}"#).await; - } + let rid = request_id.unwrap_or(Uuid::nil()); + let ack = WsOutEvent::Response { + request_id: rid, + data: serde_json::json!({"ok": true}), + }; + if send_event(&mut ws_session, &ack).await.is_err() { break; } } Err(e) => { tracing::warn!(user_id = %user_id, error = %e, "WS message processing failed"); - let err_json = if let Some(rid) = request_id { - serde_json::json!({ - "type": "error", - "code": 500, - "error": "internal_error", - "message": e.to_string(), - "_request_id": rid - }) - } else { - serde_json::json!({ - "type": "error", - "code": 500, - "error": "internal_error", - "message": e.to_string() - }) - }; + let rid = request_id.unwrap_or(Uuid::nil()); + let err_json = serde_json::json!({ + "type": "error", + "code": 500, + "error": "internal_error", + "message": e.to_string(), + "_request_id": rid + }); let _ = ws_session.text(err_json.to_string()).await; } } diff --git a/libs/transport/lib.rs b/libs/transport/lib.rs index 95f5523..f52fac7 100644 --- a/libs/transport/lib.rs +++ b/libs/transport/lib.rs @@ -51,40 +51,14 @@ impl AppTransport { Some(u) => u, None => { tracing::warn!("NATS_URL not configured, running without NATS transport"); - return Ok(Self { - db: service.db.clone(), - cache: service.cache.clone(), - seq: SeqAllocator::new(service.cache.clone(), service.db.clone()), - ack: ack::AckTracker::new(service.cache.clone()), - reconnect: reconnect::ReconnectManager::new(service.cache.clone(), service.db.clone()), - dedup: dedup::DeduplicationManager::new(service.cache.clone()), - rate_limiter: security::RateLimiter::new(service.cache.clone()), - csrf: security::CsrfProtection::new(service.cache.clone()), - circuit_breaker: circuit_breaker::CircuitBreaker::new(), - service, - config, - nats: None, - }); + return Self::build(service, config, None); } }; let token = match config.nats_token() { Some(t) => t, None => { tracing::warn!("NATS_TOKEN not configured, running without NATS transport"); - return Ok(Self { - db: service.db.clone(), - cache: service.cache.clone(), - seq: SeqAllocator::new(service.cache.clone(), service.db.clone()), - ack: ack::AckTracker::new(service.cache.clone()), - reconnect: reconnect::ReconnectManager::new(service.cache.clone(), service.db.clone()), - dedup: dedup::DeduplicationManager::new(service.cache.clone()), - rate_limiter: security::RateLimiter::new(service.cache.clone()), - csrf: security::CsrfProtection::new(service.cache.clone()), - circuit_breaker: circuit_breaker::CircuitBreaker::new(), - service, - config, - nats: None, - }); + return Self::build(service, config, None); } }; @@ -126,13 +100,14 @@ impl AppTransport { None }; + Self::build(service, config, nats) + } + + fn build(service: AppService, config: AppConfig, nats: Option) -> Result { Ok(Self { db: service.db.clone(), cache: service.cache.clone(), - seq: SeqAllocator::new( - service.cache.clone(), - service.db.clone(), - ), + seq: SeqAllocator::new(service.cache.clone(), service.db.clone()), ack: ack::AckTracker::new(service.cache.clone()), reconnect: reconnect::ReconnectManager::new(service.cache.clone(), service.db.clone()), dedup: dedup::DeduplicationManager::new(service.cache.clone()), diff --git a/libs/transport/token.rs b/libs/transport/token.rs index 8894c33..a3ea19b 100644 --- a/libs/transport/token.rs +++ b/libs/transport/token.rs @@ -1,5 +1,5 @@ use base64::Engine; -use hmac::{Hmac, Mac}; +use hmac::{Hmac, Mac, KeyInit}; use models::UserId; use session::Session; use sha2::Sha256;