refactor(transport): split handler inbound and types into sub-modules

Extract MessageHandler methods into inbound/{message,room,reaction,misc,msg}
and type definitions into types/{in_message,out_event}.
This commit is contained in:
ZhenYi 2026-05-11 17:05:17 +08:00
parent d45e9e28f4
commit deb25614ba
15 changed files with 1612 additions and 1444 deletions

View File

@ -1,803 +0,0 @@
use room::ws_context::WsUserContext;
use uuid::Uuid;
use super::session::TransportSession;
use super::types::{WsInMessage, WsOutEvent};
use crate::error::AppTransportError;
use crate::event::{category, message, reaction};
fn service_err<E: std::fmt::Display>(op: &str, err: E) -> AppTransportError {
tracing::warn!(error = %err, operation = %op, "WS service operation failed");
AppTransportError::Internal
}
pub struct MessageHandler;
impl MessageHandler {
pub async fn handle(
session: &TransportSession,
msg: WsInMessage,
) -> Result<Option<WsOutEvent>, AppTransportError> {
match msg {
WsInMessage::Ping => Ok(Some(WsOutEvent::Pong {
protocol_version: super::types::WS_PROTOCOL_VERSION,
})),
WsInMessage::Subscribe { room } => {
let sub = session
.subscribe_room(room)
.await
.map_err(|e| service_err("subscribe_room", e))?;
session.subscriptions.insert(room, sub);
// Lazily spawn NATS broadcast workers for this room so that
// messages from other users can be delivered in real time.
// SPAWNED_ROOMS guard prevents duplicate spawns.
session.service.room.spawn_room_workers(room);
Ok(None)
}
WsInMessage::Unsubscribe { room } => {
session.unsubscribe_room(room).await;
Ok(None)
}
WsInMessage::TypingStart { room } => {
session.broadcast_typing(room, "start").await;
Ok(None)
}
WsInMessage::TypingStop { room } => {
session.broadcast_typing(room, "stop").await;
Ok(None)
}
WsInMessage::ReadReceipt {
room,
last_read_seq,
} => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_user_state_update_read_seq(room, last_read_seq, &ctx)
.await
.map_err(|e| service_err("room_user_state_update_read_seq", e))?;
Ok(None)
}
WsInMessage::MessageList {
room,
before_seq,
after_seq,
limit,
} => {
let ctx = WsUserContext::new(session.user.user_id);
let list = session
.service
.room
.room_message_list(room, before_seq, after_seq, limit, &ctx)
.await
.map_err(|e| service_err("room_message_list", e))?;
let resp = message::MessageListService {
room,
messages: list
.messages
.iter()
.map(|m| message::MessageNewService {
id: m.id,
seq: m.seq,
room: m.room,
sender_type: m.sender_type.clone(),
sender_id: m.sender_id,
display_name: m.display_name.clone(),
thread: m.thread,
in_reply_to: m.in_reply_to,
content: m.content.clone(),
content_type: m.content_type.clone(),
thinking_content: m.thinking_content.clone(),
thinking_is_chunked: m.thinking_is_chunked,
send_at: m.send_at,
reactions: Some(
m.reactions
.iter()
.map(|r| reaction::ReactionGroup {
emoji: r.emoji.clone(),
count: r.count as i64,
reacted_by_me: r.reacted_by_me,
users: r.users.iter().filter_map(|u| Uuid::parse_str(u).ok()).collect(),
})
.collect(),
),
})
.collect(),
total: list.total,
};
Ok(Some(WsOutEvent::MessageList {
room_id: room,
data: resp,
}))
}
WsInMessage::MessageCreate {
room,
content,
content_type,
thread,
in_reply_to,
} => {
let ctx = WsUserContext::new(session.user.user_id);
let msg = session
.service
.room
.room_message_create(
room,
room::RoomMessageCreateRequest {
content,
content_type,
thread,
in_reply_to,
attachment_ids: vec![],
},
&ctx,
)
.await
.map_err(|e| service_err("room_message_create", e))?;
Ok(Some(WsOutEvent::MessageNew {
room_id: room,
data: message::MessageNewService {
id: msg.id,
seq: msg.seq,
room: msg.room,
sender_type: msg.sender_type,
sender_id: msg.sender_id,
display_name: msg.display_name,
thread: msg.thread,
in_reply_to: msg.in_reply_to,
content: msg.content,
content_type: msg.content_type,
thinking_content: msg.thinking_content,
thinking_is_chunked: false,
send_at: msg.send_at,
reactions: None,
},
}))
}
WsInMessage::MessageUpdate { message, content } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_message_update(message, room::RoomMessageUpdateRequest { content }, &ctx)
.await
.map_err(|e| service_err("room_message_update", e))?;
Ok(None)
}
WsInMessage::MessageRevoke { message } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_message_revoke(message, &ctx)
.await
.map_err(|e| service_err("room_message_revoke", e))?;
Ok(None)
}
WsInMessage::RoomGet { room } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.require_room_access(room, ctx.user_id)
.await
.map_err(|e| service_err("require_room_access", e))?;
let rm = session
.service
.room
.find_room_or_404(room)
.await
.map_err(|e| service_err("find_room", e))?;
Ok(Some(WsOutEvent::RoomCreated {
room_id: rm.id,
data: crate::event::rooms::RoomCreatedService {
id: rm.id,
project: rm.project,
room_name: rm.room_name,
public: rm.public,
category: rm.category,
created_by: rm.created_by,
created_at: rm.created_at,
},
}))
}
WsInMessage::RoomCreate {
project,
room_name,
public,
category,
} => {
let ctx = WsUserContext::new(session.user.user_id);
let rm = session
.service
.room
.room_create(
project.to_string(),
room::RoomCreateRequest {
room_name,
public,
category,
},
&ctx,
)
.await
.map_err(|e| service_err("room_create", e))?;
Ok(Some(WsOutEvent::RoomCreated {
room_id: rm.id,
data: crate::event::rooms::RoomCreatedService {
id: rm.id,
project,
room_name: rm.room_name,
public: rm.public,
category,
created_by: session.user.user_id,
created_at: rm.created_at,
},
}))
}
WsInMessage::RoomUpdate {
room,
room_name,
public,
category,
} => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_update(
room,
room::RoomUpdateRequest {
room_name,
public,
category,
},
&ctx,
)
.await
.map_err(|e| service_err("room_update", e))?;
Ok(None)
}
WsInMessage::RoomDelete { room } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_delete(room, &ctx)
.await
.map_err(|e| service_err("room_delete", e))?;
Ok(None)
}
WsInMessage::CategoryCreate {
project,
name,
position,
} => {
let ctx = WsUserContext::new(session.user.user_id);
let cat = session
.service
.room
.room_category_create(
project.to_string(),
room::RoomCategoryCreateRequest { name, position },
&ctx,
)
.await
.map_err(|e| service_err("room_category_create", e))?;
Ok(Some(WsOutEvent::CategoryCreated {
project,
data: category::CategoryCreatedService {
id: cat.id,
project,
name: cat.name,
position: cat.position,
created_by: session.user.user_id,
created_at: cat.created_at,
},
}))
}
WsInMessage::CategoryUpdate { id, name, position } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_category_update(
id,
room::RoomCategoryUpdateRequest { name, position },
&ctx,
)
.await
.map_err(|e| service_err("room_category_update", e))?;
Ok(None)
}
WsInMessage::CategoryDelete { id } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_category_delete(id, &ctx)
.await
.map_err(|e| service_err("room_category_delete", e))?;
Ok(None)
}
WsInMessage::AccessGrant { room, user } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_access_grant(room, user, &ctx)
.await
.map_err(|e| service_err("room_access_grant", e))?;
Ok(None)
}
WsInMessage::AccessRevoke { room, user } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_access_revoke(room, user, &ctx)
.await
.map_err(|e| service_err("room_access_revoke", e))?;
Ok(None)
}
WsInMessage::ReactionAdd {
room,
message,
emoji,
} => {
let ctx = WsUserContext::new(session.user.user_id);
let rxs = session
.service
.room
.message_reaction_add(message, emoji, &ctx)
.await
.map_err(|e| service_err("message_reaction_add", e))?;
Ok(Some(WsOutEvent::ReactionBatchUpdated {
room_id: room,
data: reaction::ReactionBatchUpdatedService {
room: room,
message: message,
reactions: rxs
.reactions
.into_iter()
.map(|g| reaction::ReactionGroup {
emoji: g.emoji,
count: g.count as i64,
reacted_by_me: g.reacted_by_me,
users: g
.users
.iter()
.filter_map(|u| u.parse::<uuid::Uuid>().ok())
.collect(),
})
.collect(),
},
}))
}
WsInMessage::ReactionRemove {
room,
message,
emoji,
} => {
let ctx = WsUserContext::new(session.user.user_id);
let rxs = session
.service
.room
.message_reaction_remove(message, emoji, &ctx)
.await
.map_err(|e| service_err("message_reaction_remove", e))?;
Ok(Some(WsOutEvent::ReactionBatchUpdated {
room_id: room,
data: reaction::ReactionBatchUpdatedService {
room: room,
message: message,
reactions: rxs
.reactions
.into_iter()
.map(|g| reaction::ReactionGroup {
emoji: g.emoji,
count: g.count as i64,
reacted_by_me: g.reacted_by_me,
users: g
.users
.iter()
.filter_map(|u| u.parse::<uuid::Uuid>().ok())
.collect(),
})
.collect(),
},
}))
}
WsInMessage::ThreadCreate { room, parent } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_thread_create(
room,
room::RoomThreadCreateRequest { parent_seq: parent },
&ctx,
)
.await
.map_err(|e| service_err("room_thread_create", e))?;
Ok(None)
}
WsInMessage::ThreadResolve { thread_id } => {
// Dummy implementation since backend thread.rs doesn't have resolve yet
tracing::info!(%thread_id, "Thread resolved");
Ok(None)
}
WsInMessage::ThreadArchive { thread_id } => {
// Dummy implementation since backend thread.rs doesn't have archive yet
tracing::info!(%thread_id, "Thread archived");
Ok(None)
}
WsInMessage::PinAdd { room: _, message } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_pin_add(message, &ctx)
.await
.map_err(|e| service_err("room_pin_add", e))?;
Ok(None)
}
WsInMessage::PinRemove { room: _, message } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_pin_remove(message, &ctx)
.await
.map_err(|e| service_err("room_pin_remove", e))?;
Ok(None)
}
WsInMessage::DraftSave { room, content } => {
let ctx = WsUserContext::new(session.user.user_id);
let draft = session
.service
.room
.draft_save(room, content, &ctx)
.await
.map_err(|e| service_err("draft_save", e))?;
Ok(Some(WsOutEvent::DraftSaved {
room_id: room,
data: crate::event::draft::DraftSavedService {
user_id: session.user.user_id,
room: draft.room_id,
content: draft.content,
saved_at: draft.saved_at,
},
}))
}
WsInMessage::DraftClear { room } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.draft_clear(room, &ctx)
.await
.map_err(|e| service_err("draft_clear", e))?;
Ok(Some(WsOutEvent::DraftCleared {
room_id: room,
data: crate::event::draft::DraftClearedService {
user_id: session.user.user_id,
room,
cleared_at: chrono::Utc::now(),
},
}))
}
WsInMessage::NotificationMarkRead { id } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.notification_mark_read(id, &ctx)
.await
.map_err(|e| service_err("notification_mark_read", e))?;
Ok(None)
}
WsInMessage::NotificationMarkAllRead { .. } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.notification_mark_all_read(&ctx)
.await
.map_err(|e| service_err("notification_mark_all_read", e))?;
Ok(None)
}
WsInMessage::NotificationArchive { id } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.notification_archive(id, &ctx)
.await
.map_err(|e| service_err("notification_archive", e))?;
Ok(None)
}
WsInMessage::Search {
q,
room,
start_time,
end_time,
sender_id,
content_type,
limit,
offset,
} => {
let ctx = WsUserContext::new(session.user.user_id);
let room_id = room.unwrap_or_default(); // Assuming search requires a room for now
let req = room::RoomMessageSearchRequest {
q: q.clone(),
start_time,
end_time,
sender_id,
content_type,
limit,
offset,
};
let res = session
.service
.room
.room_message_search(room_id, req, &ctx)
.await
.map_err(|e| service_err("room_message_search", e))?;
let resp = crate::event::search::SearchResultService {
q,
room: room_id,
took_ms: 0,
messages: res
.messages
.into_iter()
.map(|m| crate::event::search::SearchMessageHitService {
highlighted_content: m.highlighted_content.unwrap_or_default(),
message: crate::event::message::MessageNewService {
id: m.id,
seq: m.seq,
room: m.room,
sender_type: m.sender_type,
sender_id: m.sender_id,
display_name: m.display_name,
thread: m.thread,
in_reply_to: m.in_reply_to,
content: m.content,
content_type: m.content_type,
thinking_content: m.thinking_content,
thinking_is_chunked: m.thinking_is_chunked,
send_at: m.send_at,
reactions: None,
},
})
.collect(),
total: res.total,
};
Ok(Some(WsOutEvent::SearchResult { data: resp }))
}
WsInMessage::PresenceUpdate { status } => {
// Get project context from session's subscribed rooms (first room's project)
let project_id = session.get_current_project().await;
// Convert transport status to room presence status
let presence_status = match status {
crate::event::presence::UserPresenceStatus::Online => room::presence::PresenceStatus::Online,
crate::event::presence::UserPresenceStatus::Idle => room::presence::PresenceStatus::Idle,
crate::event::presence::UserPresenceStatus::Dnd => room::presence::PresenceStatus::Dnd,
crate::event::presence::UserPresenceStatus::Offline => room::presence::PresenceStatus::Offline,
};
let event = session
.service
.room
.set_user_presence(session.user.user_id, project_id, presence_status)
.await;
if let Some(evt) = event {
// Convert to transport event type
let transport_status = match evt.status {
room::presence::PresenceStatus::Online => crate::event::presence::UserPresenceStatus::Online,
room::presence::PresenceStatus::Idle => crate::event::presence::UserPresenceStatus::Idle,
room::presence::PresenceStatus::Dnd => crate::event::presence::UserPresenceStatus::Dnd,
room::presence::PresenceStatus::Offline => crate::event::presence::UserPresenceStatus::Offline,
};
Ok(Some(WsOutEvent::PresenceChanged {
data: crate::event::presence::PresenceChangedService {
user: evt.user_id,
project: evt.project_id,
status: transport_status,
last_seen_at: evt.last_seen_at,
},
}))
} else {
Ok(None)
}
}
WsInMessage::CustomStatusUpdate {
emoji,
text,
expires_at,
} => {
let evt = session
.service
.room
.set_custom_status(session.user.user_id, emoji.clone(), text.clone(), expires_at);
if let Some(data) = evt {
Ok(Some(WsOutEvent::CustomStatusUpdated {
data: crate::event::presence::CustomStatusUpdatedService {
user: data.user_id,
emoji: data.emoji,
text: data.text,
expires_at: data.expires_at,
},
}))
} else {
Ok(None)
}
}
WsInMessage::InviteCreate { .. } => {
tracing::info!("Invite create");
Ok(None)
}
WsInMessage::InviteAccept { .. } => {
tracing::info!("Invite accept");
Ok(None)
}
WsInMessage::InviteRevoke { .. } => {
tracing::info!("Invite revoke");
Ok(None)
}
WsInMessage::BanCreate { .. } => {
tracing::info!("Ban create");
Ok(None)
}
WsInMessage::BanRemove { .. } => {
tracing::info!("Ban remove");
Ok(None)
}
WsInMessage::VoiceJoin { room } => {
tracing::info!(%room, "Voice join");
Ok(None)
}
WsInMessage::VoiceLeave { room } => {
tracing::info!(%room, "Voice leave");
Ok(None)
}
WsInMessage::VoiceMute { room, muted } => {
tracing::info!(%room, %muted, "Voice mute");
Ok(None)
}
WsInMessage::VoiceDeaf { room, deafened } => {
tracing::info!(%room, %deafened, "Voice deaf");
Ok(None)
}
WsInMessage::ScreenShare { room, start } => {
tracing::info!(%room, %start, "Screen share");
Ok(None)
}
WsInMessage::AiList { room } => {
let ctx = WsUserContext::new(session.user.user_id);
let ai_list = session
.service
.room
.room_ai_list(room, &ctx)
.await
.map_err(|e| service_err("room_ai_list", e))?;
let data = serde_json::to_value(ai_list).unwrap_or_default();
Ok(Some(WsOutEvent::Response {
request_id: Uuid::nil(),
data,
}))
}
WsInMessage::AiUpsert {
room,
model,
version,
system_prompt,
temperature,
max_tokens,
stream,
} => {
let ctx = WsUserContext::new(session.user.user_id);
let req = room::RoomAiUpsertRequest {
model,
version,
system_prompt,
temperature,
max_tokens,
stream,
history_limit: None,
use_exact: None,
think: None,
min_score: None,
agent_type: None,
};
let ai_model = session
.service
.room
.room_ai_upsert(room, req, &ctx)
.await
.map_err(|e| service_err("room_ai_upsert", e))?;
let data = serde_json::to_value(ai_model).unwrap_or_default();
Ok(Some(WsOutEvent::Response {
request_id: Uuid::nil(),
data,
}))
}
WsInMessage::AiDelete { room, agent_id } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_ai_delete(room, agent_id, &ctx)
.await
.map_err(|e| service_err("room_ai_delete", e))?;
Ok(None)
}
WsInMessage::AiStop { room } => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_ai_stop(room, &ctx)
.await
.map_err(|e| service_err("room_ai_stop", e))?;
Ok(None)
}
WsInMessage::UserSummary { username } => {
let ctx = session.to_session();
let summary = session
.service
.user_get_summary(ctx, username)
.await
.map_err(|e| service_err("user_get_summary", e))?;
let data = serde_json::to_value(summary).unwrap_or_default();
Ok(Some(WsOutEvent::Response {
request_id: Uuid::nil(), // Filled by ws_handler if rid exists
data,
}))
}
WsInMessage::StateSetReadSeq {
room,
last_read_seq,
} => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_user_state_update_read_seq(room, last_read_seq, &ctx)
.await
.map_err(|e| service_err("room_user_state_update_read_seq", e))?;
Ok(None)
}
WsInMessage::StateUpdateDnd {
room,
do_not_disturb,
dnd_start_hour,
dnd_end_hour,
} => {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_user_state_update_dnd(
room,
room::RoomUserStateUpdateDndRequest {
do_not_disturb,
dnd_start_hour,
dnd_end_hour,
},
&ctx,
)
.await
.map_err(|e| service_err("room_user_state_update_dnd", e))?;
Ok(None)
}
}
}
}

View File

@ -0,0 +1,147 @@
use room::ws_context::WsUserContext;
use crate::error::AppTransportError;
use crate::event::{message, reaction};
use crate::handler::session::TransportSession;
use crate::handler::types::WsOutEvent;
pub(crate) async fn message_list(
session: &TransportSession,
room: models::RoomId,
before_seq: Option<i64>,
after_seq: Option<i64>,
limit: Option<u64>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let list = session
.service
.room
.room_message_list(room, before_seq, after_seq, limit, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_message_list failed");
AppTransportError::Internal
})?;
let resp = message::MessageListService {
room,
messages: list
.messages
.iter()
.map(|m| message::MessageNewService {
id: m.id,
seq: m.seq,
room: m.room,
sender_type: m.sender_type.clone(),
sender_id: m.sender_id,
display_name: m.display_name.clone(),
thread: m.thread,
in_reply_to: m.in_reply_to,
content: m.content.clone(),
content_type: m.content_type.clone(),
thinking_content: m.thinking_content.clone(),
thinking_is_chunked: m.thinking_is_chunked,
send_at: m.send_at,
reactions: Some(
m.reactions
.iter()
.map(|r| reaction::ReactionGroup {
emoji: r.emoji.clone(),
count: r.count as i64,
reacted_by_me: r.reacted_by_me,
users: r.users.iter().filter_map(|u| uuid::Uuid::parse_str(u).ok()).collect(),
})
.collect(),
),
})
.collect(),
total: list.total,
};
Ok(Some(WsOutEvent::MessageList {
room_id: room,
data: resp,
}))
}
pub(crate) async fn message_create(
session: &TransportSession,
room: models::RoomId,
content: String,
content_type: Option<String>,
thread: Option<models::RoomThreadId>,
in_reply_to: Option<uuid::Uuid>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let msg = session
.service
.room
.room_message_create(
room,
room::RoomMessageCreateRequest {
content,
content_type,
thread,
in_reply_to,
attachment_ids: vec![],
},
&ctx,
)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_message_create failed");
AppTransportError::Internal
})?;
Ok(Some(WsOutEvent::MessageNew {
room_id: room,
data: message::MessageNewService {
id: msg.id,
seq: msg.seq,
room: msg.room,
sender_type: msg.sender_type,
sender_id: msg.sender_id,
display_name: msg.display_name,
thread: msg.thread,
in_reply_to: msg.in_reply_to,
content: msg.content,
content_type: msg.content_type,
thinking_content: msg.thinking_content,
thinking_is_chunked: false,
send_at: msg.send_at,
reactions: None,
},
}))
}
pub(crate) async fn message_update(
session: &TransportSession,
message: uuid::Uuid,
content: String,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_message_update(message, room::RoomMessageUpdateRequest { content }, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_message_update failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn message_revoke(
session: &TransportSession,
message: uuid::Uuid,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_message_revoke(message, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_message_revoke failed");
AppTransportError::Internal
})?;
Ok(None)
}

View File

@ -0,0 +1,415 @@
use room::ws_context::WsUserContext;
use uuid::Uuid;
use crate::error::AppTransportError;
use crate::event::{message, presence as pres, search as srch};
use crate::handler::session::TransportSession;
use crate::handler::types::WsOutEvent;
// ─── Notification ──────────────────────────────────────────────────────
pub(crate) async fn notification_mark_read(
session: &TransportSession,
id: Uuid,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.notification_mark_read(id, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "notification_mark_read failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn notification_mark_all_read(
session: &TransportSession,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.notification_mark_all_read(&ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "notification_mark_all_read failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn notification_archive(
session: &TransportSession,
id: Uuid,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.notification_archive(id, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "notification_archive failed");
AppTransportError::Internal
})?;
Ok(None)
}
// ─── Search ────────────────────────────────────────────────────────────
pub(crate) async fn search(
session: &TransportSession,
q: String,
room: Option<models::RoomId>,
start_time: Option<chrono::DateTime<chrono::Utc>>,
end_time: Option<chrono::DateTime<chrono::Utc>>,
sender_id: Option<Uuid>,
content_type: Option<String>,
limit: Option<u64>,
offset: Option<u64>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let room_id = room.unwrap_or_default();
let req = room::RoomMessageSearchRequest {
q: q.clone(),
start_time,
end_time,
sender_id,
content_type,
limit,
offset,
};
let res = session
.service
.room
.room_message_search(room_id, req, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_message_search failed");
AppTransportError::Internal
})?;
let resp = srch::SearchResultService {
q,
room: room_id,
took_ms: 0,
messages: res
.messages
.into_iter()
.map(|m| srch::SearchMessageHitService {
highlighted_content: m.highlighted_content.unwrap_or_default(),
message: message::MessageNewService {
id: m.id,
seq: m.seq,
room: m.room,
sender_type: m.sender_type,
sender_id: m.sender_id,
display_name: m.display_name,
thread: m.thread,
in_reply_to: m.in_reply_to,
content: m.content,
content_type: m.content_type,
thinking_content: m.thinking_content,
thinking_is_chunked: m.thinking_is_chunked,
send_at: m.send_at,
reactions: None,
},
})
.collect(),
total: res.total,
};
Ok(Some(WsOutEvent::SearchResult { data: resp }))
}
// ─── Presence ──────────────────────────────────────────────────────────
pub(crate) async fn presence_update(
session: &TransportSession,
status: pres::UserPresenceStatus,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let project_id = session.get_current_project().await;
let presence_status = match status {
pres::UserPresenceStatus::Online => room::presence::PresenceStatus::Online,
pres::UserPresenceStatus::Idle => room::presence::PresenceStatus::Idle,
pres::UserPresenceStatus::Dnd => room::presence::PresenceStatus::Dnd,
pres::UserPresenceStatus::Offline => room::presence::PresenceStatus::Offline,
};
let event = session
.service
.room
.set_user_presence(session.user.user_id, project_id, presence_status)
.await;
if let Some(evt) = event {
let transport_status = match evt.status {
room::presence::PresenceStatus::Online => pres::UserPresenceStatus::Online,
room::presence::PresenceStatus::Idle => pres::UserPresenceStatus::Idle,
room::presence::PresenceStatus::Dnd => pres::UserPresenceStatus::Dnd,
room::presence::PresenceStatus::Offline => pres::UserPresenceStatus::Offline,
};
Ok(Some(WsOutEvent::PresenceChanged {
data: pres::PresenceChangedService {
user: evt.user_id,
project: evt.project_id,
status: transport_status,
last_seen_at: evt.last_seen_at,
},
}))
} else {
Ok(None)
}
}
pub(crate) async fn custom_status_update(
session: &TransportSession,
emoji: Option<String>,
text: Option<String>,
expires_at: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let evt = session
.service
.room
.set_custom_status(session.user.user_id, emoji.clone(), text.clone(), expires_at);
if let Some(data) = evt {
Ok(Some(WsOutEvent::CustomStatusUpdated {
data: pres::CustomStatusUpdatedService {
user: data.user_id,
emoji: data.emoji,
text: data.text,
expires_at: data.expires_at,
},
}))
} else {
Ok(None)
}
}
// ─── Invite (stubs) ────────────────────────────────────────────────────
pub(crate) async fn invite_create() -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!("Invite create");
Ok(None)
}
pub(crate) async fn invite_accept() -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!("Invite accept");
Ok(None)
}
pub(crate) async fn invite_revoke() -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!("Invite revoke");
Ok(None)
}
// ─── Ban (stubs) ──────────────────────────────────────────────────────
pub(crate) async fn ban_create() -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!("Ban create");
Ok(None)
}
pub(crate) async fn ban_remove() -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!("Ban remove");
Ok(None)
}
// ─── Voice (stubs) ────────────────────────────────────────────────────
pub(crate) async fn voice_join(room: models::RoomId) -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!(%room, "Voice join");
Ok(None)
}
pub(crate) async fn voice_leave(room: models::RoomId) -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!(%room, "Voice leave");
Ok(None)
}
pub(crate) async fn voice_mute(room: models::RoomId, muted: bool) -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!(%room, %muted, "Voice mute");
Ok(None)
}
pub(crate) async fn voice_deaf(room: models::RoomId, deafened: bool) -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!(%room, %deafened, "Voice deaf");
Ok(None)
}
pub(crate) async fn screen_share(room: models::RoomId, start: bool) -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!(%room, %start, "Screen share");
Ok(None)
}
// ─── AI ────────────────────────────────────────────────────────────────
pub(crate) async fn ai_list(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let ai_list = session
.service
.room
.room_ai_list(room, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_ai_list failed");
AppTransportError::Internal
})?;
let data = serde_json::to_value(ai_list).unwrap_or_default();
Ok(Some(WsOutEvent::Response {
request_id: Uuid::nil(),
data,
}))
}
pub(crate) async fn ai_upsert(
session: &TransportSession,
room: models::RoomId,
model: Uuid,
version: Option<Uuid>,
system_prompt: Option<String>,
temperature: Option<f64>,
max_tokens: Option<i64>,
stream: Option<bool>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let req = room::RoomAiUpsertRequest {
model,
version,
system_prompt,
temperature,
max_tokens,
stream,
history_limit: None,
use_exact: None,
think: None,
min_score: None,
agent_type: None,
};
let ai_model = session
.service
.room
.room_ai_upsert(room, req, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_ai_upsert failed");
AppTransportError::Internal
})?;
let data = serde_json::to_value(ai_model).unwrap_or_default();
Ok(Some(WsOutEvent::Response {
request_id: Uuid::nil(),
data,
}))
}
pub(crate) async fn ai_delete(
session: &TransportSession,
room: models::RoomId,
agent_id: Uuid,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_ai_delete(room, agent_id, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_ai_delete failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn ai_stop(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_ai_stop(room, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_ai_stop failed");
AppTransportError::Internal
})?;
Ok(None)
}
// ─── User ──────────────────────────────────────────────────────────────
pub(crate) async fn user_summary(
session: &TransportSession,
username: String,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = session.to_session();
let summary = session
.service
.user_get_summary(ctx, username)
.await
.map_err(|e| {
tracing::warn!(error = %e, "user_get_summary failed");
AppTransportError::Internal
})?;
let data = serde_json::to_value(summary).unwrap_or_default();
Ok(Some(WsOutEvent::Response {
request_id: Uuid::nil(),
data,
}))
}
// ─── State ─────────────────────────────────────────────────────────────
pub(crate) async fn state_set_read_seq(
session: &TransportSession,
room: models::RoomId,
last_read_seq: i64,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_user_state_update_read_seq(room, last_read_seq, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_user_state_update_read_seq failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn state_update_dnd(
session: &TransportSession,
room: models::RoomId,
do_not_disturb: Option<bool>,
dnd_start_hour: Option<i16>,
dnd_end_hour: Option<i16>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_user_state_update_dnd(
room,
room::RoomUserStateUpdateDndRequest {
do_not_disturb,
dnd_start_hour,
dnd_end_hour,
},
&ctx,
)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_user_state_update_dnd failed");
AppTransportError::Internal
})?;
Ok(None)
}

View File

@ -0,0 +1,118 @@
use crate::error::AppTransportError;
use crate::handler::session::TransportSession;
use crate::handler::types::{WsInMessage, WsOutEvent};
mod misc;
mod msg;
mod message;
mod reaction;
mod room;
pub struct MessageHandler;
impl MessageHandler {
pub async fn handle(
session: &TransportSession,
msg: WsInMessage,
) -> Result<Option<WsOutEvent>, AppTransportError> {
match msg {
WsInMessage::Ping => msg::ping().await,
WsInMessage::Subscribe { room } => msg::subscribe(session, room).await,
WsInMessage::Unsubscribe { room } => msg::unsubscribe(session, room).await,
WsInMessage::TypingStart { room } => msg::typing_start(session, room).await,
WsInMessage::TypingStop { room } => msg::typing_stop(session, room).await,
WsInMessage::ReadReceipt { room, last_read_seq } => {
msg::read_receipt(session, room, last_read_seq).await
}
WsInMessage::MessageList { room, before_seq, after_seq, limit } => {
message::message_list(session, room, before_seq, after_seq, limit).await
}
WsInMessage::MessageCreate { room, content, content_type, thread, in_reply_to } => {
message::message_create(session, room, content, content_type, thread, in_reply_to).await
}
WsInMessage::MessageUpdate { message, content } => {
message::message_update(session, message, content).await
}
WsInMessage::MessageRevoke { message } => {
message::message_revoke(session, message).await
}
WsInMessage::RoomGet { room } => room::room_get(session, room).await,
WsInMessage::RoomCreate { project, room_name, public, category } => {
room::room_create(session, project, room_name, public, category).await
}
WsInMessage::RoomUpdate { room, room_name, public, category } => {
room::room_update(session, room, room_name, public, category).await
}
WsInMessage::RoomDelete { room } => room::room_delete(session, room).await,
WsInMessage::CategoryCreate { project, name, position } => {
room::category_create(session, project, name, position).await
}
WsInMessage::CategoryUpdate { id, name, position } => {
room::category_update(session, id, name, position).await
}
WsInMessage::CategoryDelete { id } => room::category_delete(session, id).await,
WsInMessage::AccessGrant { room, user } => room::access_grant(session, room, user).await,
WsInMessage::AccessRevoke { room, user } => room::access_revoke(session, room, user).await,
WsInMessage::ReactionAdd { room, message, emoji } => {
reaction::reaction_add(session, room, message, emoji).await
}
WsInMessage::ReactionRemove { room, message, emoji } => {
reaction::reaction_remove(session, room, message, emoji).await
}
WsInMessage::ThreadCreate { room, parent } => {
reaction::thread_create(session, room, parent).await
}
WsInMessage::ThreadResolve { thread_id } => reaction::thread_resolve(thread_id).await,
WsInMessage::ThreadArchive { thread_id } => reaction::thread_archive(thread_id).await,
WsInMessage::PinAdd { message, .. } => reaction::pin_add(session, message).await,
WsInMessage::PinRemove { message, .. } => reaction::pin_remove(session, message).await,
WsInMessage::DraftSave { room, content } => {
reaction::draft_save(session, room, content).await
}
WsInMessage::DraftClear { room } => reaction::draft_clear(session, room).await,
WsInMessage::NotificationMarkRead { id } => {
misc::notification_mark_read(session, id).await
}
WsInMessage::NotificationMarkAllRead { .. } => {
misc::notification_mark_all_read(session).await
}
WsInMessage::NotificationArchive { id } => {
misc::notification_archive(session, id).await
}
WsInMessage::Search { q, room, start_time, end_time, sender_id, content_type, limit, offset } => {
misc::search(session, q, room, start_time, end_time, sender_id, content_type, limit, offset).await
}
WsInMessage::PresenceUpdate { status } => {
misc::presence_update(session, status).await
}
WsInMessage::CustomStatusUpdate { emoji, text, expires_at } => {
misc::custom_status_update(session, emoji, text, expires_at).await
}
WsInMessage::InviteCreate { .. } => misc::invite_create().await,
WsInMessage::InviteAccept { .. } => misc::invite_accept().await,
WsInMessage::InviteRevoke { .. } => misc::invite_revoke().await,
WsInMessage::BanCreate { .. } => misc::ban_create().await,
WsInMessage::BanRemove { .. } => misc::ban_remove().await,
WsInMessage::VoiceJoin { room } => misc::voice_join(room).await,
WsInMessage::VoiceLeave { room } => misc::voice_leave(room).await,
WsInMessage::VoiceMute { room, muted } => misc::voice_mute(room, muted).await,
WsInMessage::VoiceDeaf { room, deafened } => misc::voice_deaf(room, deafened).await,
WsInMessage::ScreenShare { room, start } => misc::screen_share(room, start).await,
WsInMessage::AiList { room } => misc::ai_list(session, room).await,
WsInMessage::AiUpsert { room, model, version, system_prompt, temperature, max_tokens, stream } => {
misc::ai_upsert(session, room, model, version, system_prompt, temperature, max_tokens, stream).await
}
WsInMessage::AiDelete { room, agent_id } => {
misc::ai_delete(session, room, agent_id).await
}
WsInMessage::AiStop { room } => misc::ai_stop(session, room).await,
WsInMessage::UserSummary { username } => misc::user_summary(session, username).await,
WsInMessage::StateSetReadSeq { room, last_read_seq } => {
misc::state_set_read_seq(session, room, last_read_seq).await
}
WsInMessage::StateUpdateDnd { room, do_not_disturb, dnd_start_hour, dnd_end_hour } => {
misc::state_update_dnd(session, room, do_not_disturb, dnd_start_hour, dnd_end_hour).await
}
}
}
}

View File

@ -0,0 +1,67 @@
use room::ws_context::WsUserContext;
use crate::error::AppTransportError;
use crate::handler::session::TransportSession;
use crate::handler::types::WsOutEvent;
pub(crate) async fn ping() -> Result<Option<WsOutEvent>, AppTransportError> {
Ok(Some(WsOutEvent::Pong {
protocol_version: crate::handler::types::WS_PROTOCOL_VERSION,
}))
}
pub(crate) async fn subscribe(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let sub = session.subscribe_room(room).await.map_err(|e| {
tracing::warn!(error = %e, "subscribe_room failed");
AppTransportError::Internal
})?;
session.subscriptions.insert(room, sub);
session.service.room.spawn_room_workers(room);
session.refresh_project().await;
Ok(None)
}
pub(crate) async fn unsubscribe(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
session.unsubscribe_room(room).await;
Ok(None)
}
pub(crate) async fn typing_start(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
session.broadcast_typing(room, "start").await;
Ok(None)
}
pub(crate) async fn typing_stop(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
session.broadcast_typing(room, "stop").await;
Ok(None)
}
pub(crate) async fn read_receipt(
session: &TransportSession,
room: models::RoomId,
last_read_seq: i64,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_user_state_update_read_seq(room, last_read_seq, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_user_state_update_read_seq failed");
AppTransportError::Internal
})?;
Ok(None)
}

View File

@ -0,0 +1,180 @@
use room::ws_context::WsUserContext;
use uuid::Uuid;
use crate::error::AppTransportError;
use crate::event::reaction;
use crate::handler::session::TransportSession;
use crate::handler::types::WsOutEvent;
pub(crate) async fn reaction_add(
session: &TransportSession,
room: models::RoomId,
message: Uuid,
emoji: String,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let rxs = session
.service
.room
.message_reaction_add(message, emoji, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "message_reaction_add failed");
AppTransportError::Internal
})?;
Ok(Some(build_reaction_batch(room, message, rxs.reactions)))
}
pub(crate) async fn reaction_remove(
session: &TransportSession,
room: models::RoomId,
message: Uuid,
emoji: String,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let rxs = session
.service
.room
.message_reaction_remove(message, emoji, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "message_reaction_remove failed");
AppTransportError::Internal
})?;
Ok(Some(build_reaction_batch(room, message, rxs.reactions)))
}
fn build_reaction_batch(
room: models::RoomId,
message: Uuid,
reactions: Vec<room::ReactionGroupResponse>,
) -> WsOutEvent {
WsOutEvent::ReactionBatchUpdated {
room_id: room,
data: reaction::ReactionBatchUpdatedService {
room,
message,
reactions: reactions
.into_iter()
.map(|g| reaction::ReactionGroup {
emoji: g.emoji,
count: g.count as i64,
reacted_by_me: g.reacted_by_me,
users: g.users.iter().filter_map(|u| u.parse::<Uuid>().ok()).collect(),
})
.collect(),
},
}
}
pub(crate) async fn thread_create(
session: &TransportSession,
room: models::RoomId,
parent: i64,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_thread_create(room, room::RoomThreadCreateRequest { parent_seq: parent }, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_thread_create failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn thread_resolve(thread_id: models::RoomThreadId) -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!(%thread_id, "Thread resolved");
Ok(None)
}
pub(crate) async fn thread_archive(thread_id: models::RoomThreadId) -> Result<Option<WsOutEvent>, AppTransportError> {
tracing::info!(%thread_id, "Thread archived");
Ok(None)
}
pub(crate) async fn pin_add(
session: &TransportSession,
message: Uuid,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_pin_add(message, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_pin_add failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn pin_remove(
session: &TransportSession,
message: Uuid,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_pin_remove(message, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_pin_remove failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn draft_save(
session: &TransportSession,
room: models::RoomId,
content: String,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let draft = session
.service
.room
.draft_save(room, content, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "draft_save failed");
AppTransportError::Internal
})?;
Ok(Some(WsOutEvent::DraftSaved {
room_id: room,
data: crate::event::draft::DraftSavedService {
user_id: session.user.user_id,
room: draft.room_id,
content: draft.content,
saved_at: draft.saved_at,
},
}))
}
pub(crate) async fn draft_clear(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.draft_clear(room, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "draft_clear failed");
AppTransportError::Internal
})?;
Ok(Some(WsOutEvent::DraftCleared {
room_id: room,
data: crate::event::draft::DraftClearedService {
user_id: session.user.user_id,
room,
cleared_at: chrono::Utc::now(),
},
}))
}

View File

@ -0,0 +1,237 @@
use room::ws_context::WsUserContext;
use uuid::Uuid;
use crate::error::AppTransportError;
use crate::event::{category, rooms};
use crate::handler::session::TransportSession;
use crate::handler::types::WsOutEvent;
pub(crate) async fn room_get(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.require_room_access(room, ctx.user_id)
.await
.map_err(|e| {
tracing::warn!(error = %e, "require_room_access failed");
AppTransportError::Internal
})?;
let rm = session
.service
.room
.find_room_or_404(room)
.await
.map_err(|e| {
tracing::warn!(error = %e, "find_room failed");
AppTransportError::Internal
})?;
Ok(Some(WsOutEvent::RoomCreated {
room_id: rm.id,
data: rooms::RoomCreatedService {
id: rm.id,
project: rm.project,
room_name: rm.room_name,
public: rm.public,
category: rm.category,
created_by: rm.created_by,
created_at: rm.created_at,
},
}))
}
pub(crate) async fn room_create(
session: &TransportSession,
project: models::ProjectId,
room_name: String,
public: bool,
category: Option<Uuid>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let rm = session
.service
.room
.room_create(
project.to_string(),
room::RoomCreateRequest {
room_name,
public,
category,
},
&ctx,
)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_create failed");
AppTransportError::Internal
})?;
Ok(Some(WsOutEvent::RoomCreated {
room_id: rm.id,
data: rooms::RoomCreatedService {
id: rm.id,
project,
room_name: rm.room_name,
public: rm.public,
category,
created_by: session.user.user_id,
created_at: rm.created_at,
},
}))
}
pub(crate) async fn room_update(
session: &TransportSession,
room: models::RoomId,
room_name: Option<String>,
public: Option<bool>,
category: Option<Uuid>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_update(
room,
room::RoomUpdateRequest {
room_name,
public,
category,
},
&ctx,
)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_update failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn room_delete(
session: &TransportSession,
room: models::RoomId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_delete(room, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_delete failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn category_create(
session: &TransportSession,
project: models::ProjectId,
name: String,
position: Option<i32>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
let cat = session
.service
.room
.room_category_create(
project.to_string(),
room::RoomCategoryCreateRequest { name, position },
&ctx,
)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_category_create failed");
AppTransportError::Internal
})?;
Ok(Some(WsOutEvent::CategoryCreated {
project,
data: category::CategoryCreatedService {
id: cat.id,
project,
name: cat.name,
position: cat.position,
created_by: session.user.user_id,
created_at: cat.created_at,
},
}))
}
pub(crate) async fn category_update(
session: &TransportSession,
id: Uuid,
name: Option<String>,
position: Option<i32>,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_category_update(
id,
room::RoomCategoryUpdateRequest { name, position },
&ctx,
)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_category_update failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn category_delete(
session: &TransportSession,
id: Uuid,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_category_delete(id, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_category_delete failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn access_grant(
session: &TransportSession,
room: models::RoomId,
user: models::UserId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_access_grant(room, user, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_access_grant failed");
AppTransportError::Internal
})?;
Ok(None)
}
pub(crate) async fn access_revoke(
session: &TransportSession,
room: models::RoomId,
user: models::UserId,
) -> Result<Option<WsOutEvent>, AppTransportError> {
let ctx = WsUserContext::new(session.user.user_id);
session
.service
.room
.room_access_revoke(room, user, &ctx)
.await
.map_err(|e| {
tracing::warn!(error = %e, "room_access_revoke failed");
AppTransportError::Internal
})?;
Ok(None)
}

View File

@ -78,6 +78,9 @@ pub struct TransportSession {
pub subscriptions: Arc<DashMap<RoomId, RoomSubscription>>,
pub seq: Arc<SeqAllocator>,
pub service: Arc<AppService>,
/// Cached project_id resolved from the first subscribed room.
/// Avoids repeated DB lookups on every PresenceUpdate.
pub project_id: Mutex<Option<uuid::Uuid>>,
}
impl TransportSession {
@ -90,6 +93,7 @@ impl TransportSession {
subscriptions: Arc::new(DashMap::new()),
seq: Arc::new(SeqAllocator::new(service.cache.clone(), service.db.clone())),
service,
project_id: Mutex::new(None),
}
}
@ -128,26 +132,50 @@ impl TransportSession {
self.service.room.room_manager.broadcast_typing(room_id, event).await;
}
/// Get the current project context from the first subscribed room.
/// Returns the project_id if the user has any subscribed rooms.
/// Get the current project context from cache (populated on first subscription).
pub async fn get_current_project(&self) -> Option<uuid::Uuid> {
use models::rooms::room;
use sea_orm::EntityTrait;
let cached = self.project_id.lock().await;
if cached.is_some() {
return *cached;
}
drop(cached);
// Try to get the first subscribed room
// Lazy init: query first subscribed room, cache result.
let first_room = self.subscriptions.iter().next().map(|r| *r.key());
if let Some(room_id) = first_room {
// Query the room to get its project_id
use models::rooms::room;
use sea_orm::EntityTrait;
if let Ok(Some(rm)) = room::Entity::find_by_id(room_id)
.one(&self.service.db)
.await
{
let mut cached = self.project_id.lock().await;
*cached = Some(rm.project);
return Some(rm.project);
}
}
None
}
/// Refresh cached project_id — call after Subscribe/Unsubscribe if needed.
pub async fn refresh_project(&self) {
let first_room = self.subscriptions.iter().next().map(|r| *r.key());
let new_project = if let Some(room_id) = first_room {
use models::rooms::room;
use sea_orm::EntityTrait;
room::Entity::find_by_id(room_id)
.one(&self.service.db)
.await
.ok()
.flatten()
.map(|rm| rm.project)
} else {
None
};
let mut cached = self.project_id.lock().await;
*cached = new_project;
}
pub fn to_session(&self) -> session::Session {
let s = session::Session::no_op();
s.set_user(self.user.user_id);

View File

@ -1,559 +0,0 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use models::{ProjectId, RoomId, RoomThreadId, UserId};
use uuid::Uuid;
use crate::event::{
ai, attachment, ban, category, draft, invite, member, message, notify, pin, presence, project,
reaction, rooms, search, thread, voice,
};
/// Current WS protocol version — bump when event types or fields change.
pub const WS_PROTOCOL_VERSION: u32 = 1;
// ─── Outbound Event Envelope ────────────────────────────────────────────────
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WsOutEvent {
Pong {
protocol_version: u32,
},
Error(WsError),
// ── Room events ──
RoomCreated {
room_id: RoomId,
data: rooms::RoomCreatedService,
},
RoomDeleted {
room_id: RoomId,
data: rooms::RoomDeletedService,
},
RoomRenamed {
room_id: RoomId,
data: rooms::RoomRenamedService,
},
RoomTopicUpdated {
room_id: RoomId,
data: rooms::RoomTopicUpdatedService,
},
RoomSettingsUpdated {
room_id: RoomId,
data: rooms::RoomSettingsUpdatedService,
},
RoomMoved {
room_id: RoomId,
data: rooms::RoomMovedService,
},
// ── Message events ──
MessageNew {
room_id: RoomId,
data: message::MessageNewService,
},
MessageEdited {
room_id: RoomId,
data: message::MessageEditedService,
},
MessageRevoked {
room_id: RoomId,
data: message::MessageRevokedService,
},
MessageStreamStart {
room_id: RoomId,
data: message::MessageStreamStartService,
},
MessageStreamChunk {
room_id: RoomId,
data: message::MessageStreamChunkService,
},
MessageStreamDone {
room_id: RoomId,
data: message::MessageStreamDoneService,
},
MessageList {
room_id: RoomId,
data: message::MessageListService,
},
// ── Member events ──
MemberJoined {
room_id: RoomId,
data: member::MemberJoinedService,
},
MemberRemoved {
room_id: RoomId,
data: member::MemberRemovedService,
},
ReadReceipt {
room_id: RoomId,
data: member::ReadReceiptService,
},
TypingStart {
room_id: RoomId,
data: member::TypingStartService,
},
TypingStop {
room_id: RoomId,
data: member::TypingStopService,
},
// ── Reaction events ──
ReactionAdded {
room_id: RoomId,
data: reaction::ReactionAddedService,
},
ReactionRemoved {
room_id: RoomId,
data: reaction::ReactionRemovedService,
},
ReactionBatchUpdated {
room_id: RoomId,
data: reaction::ReactionBatchUpdatedService,
},
// ── Thread events ──
ThreadCreated {
room_id: RoomId,
data: thread::ThreadCreatedService,
},
ThreadUpdated {
room_id: RoomId,
data: thread::ThreadUpdatedService,
},
ThreadResolved {
room_id: RoomId,
data: thread::ThreadResolvedService,
},
ThreadArchived {
room_id: RoomId,
data: thread::ThreadArchivedService,
},
ThreadParticipantJoined {
room_id: RoomId,
data: thread::ThreadParticipantJoinedService,
},
ThreadParticipantLeft {
room_id: RoomId,
data: thread::ThreadParticipantLeftService,
},
// ── Category events ──
CategoryCreated {
project: ProjectId,
data: category::CategoryCreatedService,
},
CategoryUpdated {
project: ProjectId,
data: category::CategoryUpdatedService,
},
CategoryDeleted {
project: ProjectId,
data: category::CategoryDeletedService,
},
// ── Pin events ──
PinAdded {
room_id: RoomId,
data: pin::PinAddedService,
},
PinRemoved {
room_id: RoomId,
data: pin::PinRemovedService,
},
// ── Project events ──
ProjectRoomCreated {
project: ProjectId,
data: project::ProjectRoomCreatedService,
},
ProjectRoomDeleted {
project: ProjectId,
data: project::ProjectRoomDeletedService,
},
ProjectRoomRenamed {
project: ProjectId,
data: project::ProjectRoomRenamedService,
},
ProjectRoomMoved {
project: ProjectId,
data: project::ProjectRoomMovedService,
},
// ── Draft events ──
DraftSaved {
room_id: RoomId,
data: draft::DraftSavedService,
},
DraftCleared {
room_id: RoomId,
data: draft::DraftClearedService,
},
// ── Search ──
SearchResult {
data: search::SearchResultService,
},
// ── Notification events ──
NotifyCreated {
data: notify::NotifyCreatedService,
},
NotifyRead {
data: notify::NotifyReadService,
},
// ── Presence events ──
PresenceChanged {
data: presence::PresenceChangedService,
},
CustomStatusUpdated {
data: presence::CustomStatusUpdatedService,
},
// ── Invite events ──
InviteCreated {
data: invite::InviteCreatedService,
},
InviteAccepted {
data: invite::InviteAcceptedService,
},
InviteRejected {
data: invite::InviteRejectedService,
},
InviteRevoked {
data: invite::InviteRevokedService,
},
// ── Attachment events ──
AttachmentUploaded {
data: attachment::AttachmentUploadedService,
},
AttachmentThumbnailGenerated {
data: attachment::AttachmentThumbnailService,
},
AttachmentDeleted {
data: attachment::AttachmentDeletedService,
},
// ── Ban events ──
UserBanned {
data: ban::BannedService,
},
UserUnbanned {
data: ban::UnbannedService,
},
// ── AI events ──
AiAgentJoined {
data: ai::AiAgentJoinedService,
},
AiAgentLeft {
data: ai::AiAgentLeftService,
},
AiAgentStatusChanged {
data: ai::AiAgentStatusChangedService,
},
// ── Voice events ──
VoiceChannelJoined {
data: voice::VoiceChannelJoinedService,
},
VoiceChannelLeft {
data: voice::VoiceChannelLeftService,
},
VoiceMuteUpdated {
data: voice::VoiceMuteUpdatedService,
},
VoiceDeafUpdated {
data: voice::VoiceDeafUpdatedService,
},
ScreenShareStarted {
data: voice::ScreenShareStartedService,
},
ScreenShareStopped {
data: voice::ScreenShareStoppedService,
},
SpeakingStarted {
room_id: RoomId,
user_id: UserId,
},
SpeakingStopped {
room_id: RoomId,
user_id: UserId,
},
// ── Request-response ──
Response {
request_id: Uuid,
data: serde_json::Value,
},
}
#[derive(Debug, Clone, Serialize)]
pub struct WsError {
pub code: i32,
pub error: String,
pub message: String,
}
// ─── Inbound Client Message ──────────────────────────────────────────────────
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WsInMessage {
Ping,
// ── Room subscribe/unsubscribe ──
Subscribe {
room: RoomId,
},
Unsubscribe {
room: RoomId,
},
// ── Typing ──
TypingStart {
room: RoomId,
},
TypingStop {
room: RoomId,
},
// ── Read receipt ──
ReadReceipt {
room: RoomId,
last_read_seq: i64,
},
// ── Message operations ──
MessageList {
room: RoomId,
before_seq: Option<i64>,
after_seq: Option<i64>,
limit: Option<u64>,
},
MessageCreate {
room: RoomId,
content: String,
content_type: Option<String>,
thread: Option<RoomThreadId>,
in_reply_to: Option<Uuid>,
},
MessageUpdate {
message: Uuid,
content: String,
},
MessageRevoke {
message: Uuid,
},
// ── Room queries ──
RoomGet {
room: RoomId,
},
// ── Room CRUD ──
RoomCreate {
project: ProjectId,
room_name: String,
public: bool,
category: Option<Uuid>,
},
RoomUpdate {
room: RoomId,
room_name: Option<String>,
public: Option<bool>,
category: Option<Uuid>,
},
RoomDelete {
room: RoomId,
},
// ── Category CRUD ──
CategoryCreate {
project: ProjectId,
name: String,
position: Option<i32>,
},
CategoryUpdate {
id: Uuid,
name: Option<String>,
position: Option<i32>,
},
CategoryDelete {
id: Uuid,
},
// ── Access & state operations ──
AccessGrant {
room: RoomId,
user: UserId,
},
AccessRevoke {
room: RoomId,
user: UserId,
},
StateSetReadSeq {
room: RoomId,
last_read_seq: i64,
},
StateUpdateDnd {
room: RoomId,
do_not_disturb: Option<bool>,
dnd_start_hour: Option<i16>,
dnd_end_hour: Option<i16>,
},
// ── Reaction ──
ReactionAdd {
room: RoomId,
message: Uuid,
emoji: String,
},
ReactionRemove {
room: RoomId,
message: Uuid,
emoji: String,
},
// ── Thread ──
ThreadCreate {
room: RoomId,
parent: i64,
},
ThreadResolve {
thread_id: RoomThreadId,
},
ThreadArchive {
thread_id: RoomThreadId,
},
// ── Pin ──
PinAdd {
room: RoomId,
message: Uuid,
},
PinRemove {
room: RoomId,
message: Uuid,
},
// ── Draft ──
DraftSave {
room: RoomId,
content: String,
},
DraftClear {
room: RoomId,
},
// ── Search ──
Search {
q: String,
room: Option<RoomId>,
start_time: Option<DateTime<Utc>>,
end_time: Option<DateTime<Utc>>,
sender_id: Option<Uuid>,
content_type: Option<String>,
limit: Option<u64>,
offset: Option<u64>,
},
// ── Notification ──
NotificationMarkRead {
id: Uuid,
},
NotificationMarkAllRead {
project_id: Option<ProjectId>,
},
NotificationArchive {
id: Uuid,
},
// ── Presence ──
PresenceUpdate {
status: crate::event::presence::UserPresenceStatus,
},
CustomStatusUpdate {
emoji: Option<String>,
text: Option<String>,
expires_at: Option<DateTime<Utc>>,
},
// ── Invite ──
InviteCreate {
project: ProjectId,
room: Option<RoomId>,
max_uses: Option<i32>,
expires_at: Option<DateTime<Utc>>,
},
InviteAccept {
code: String,
},
InviteRevoke {
id: Uuid,
},
// ── Ban ──
BanCreate {
project: ProjectId,
user: UserId,
reason: Option<String>,
expires_at: Option<DateTime<Utc>>,
},
BanRemove {
project: ProjectId,
user: UserId,
},
// ── Voice ──
VoiceJoin {
room: RoomId,
},
VoiceLeave {
room: RoomId,
},
VoiceMute {
room: RoomId,
muted: bool,
},
VoiceDeaf {
room: RoomId,
deafened: bool,
},
ScreenShare {
room: RoomId,
start: bool,
},
// ── AI ──
AiList {
room: RoomId,
},
AiUpsert {
room: RoomId,
model: Uuid,
version: Option<Uuid>,
system_prompt: Option<String>,
temperature: Option<f64>,
max_tokens: Option<i64>,
stream: Option<bool>,
},
AiDelete {
room: RoomId,
agent_id: Uuid,
},
AiStop {
room: RoomId,
},
// ── User ──
UserSummary {
username: String,
},
}
impl WsInMessage {
pub fn room_id(&self) -> Option<RoomId> {
match self {
Self::Subscribe { room }
| Self::Unsubscribe { room }
| Self::TypingStart { room }
| Self::TypingStop { room }
| Self::ReadReceipt { room, .. }
| Self::MessageCreate { room, .. }
| Self::RoomUpdate { room, .. }
| Self::RoomDelete { room }
| Self::AccessGrant { room, .. }
| Self::AccessRevoke { room, .. }
| Self::StateSetReadSeq { room, .. }
| Self::StateUpdateDnd { room, .. }
| Self::ReactionAdd { room, .. }
| Self::ReactionRemove { room, .. }
| Self::ThreadCreate { room, .. }
| Self::PinAdd { room, .. }
| Self::PinRemove { room, .. }
| Self::DraftSave { room, .. }
| Self::DraftClear { room }
| Self::VoiceJoin { room }
| Self::VoiceLeave { room }
| Self::VoiceMute { room, .. }
| Self::VoiceDeaf { room, .. }
| Self::ScreenShare { room, .. }
| Self::AiList { room }
| Self::AiUpsert { room, .. }
| Self::AiDelete { room, .. }
| Self::MessageList { room, .. }
| Self::RoomGet { room }
| Self::Search {
room: Some(room), ..
} => Some(*room),
_ => None,
}
}
}

View File

@ -0,0 +1,104 @@
use chrono::{DateTime, Utc};
use serde::Deserialize;
use uuid::Uuid;
use models::{ProjectId, RoomId, RoomThreadId, UserId};
/// Current WS protocol version — bump when event types or fields change.
pub const WS_PROTOCOL_VERSION: u32 = 1;
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WsInMessage {
Ping,
Subscribe { room: RoomId },
Unsubscribe { room: RoomId },
TypingStart { room: RoomId },
TypingStop { room: RoomId },
ReadReceipt { room: RoomId, last_read_seq: i64 },
MessageList { room: RoomId, before_seq: Option<i64>, after_seq: Option<i64>, limit: Option<u64> },
MessageCreate { room: RoomId, content: String, content_type: Option<String>, thread: Option<RoomThreadId>, in_reply_to: Option<Uuid> },
MessageUpdate { message: Uuid, content: String },
MessageRevoke { message: Uuid },
RoomGet { room: RoomId },
RoomCreate { project: ProjectId, room_name: String, public: bool, category: Option<Uuid> },
RoomUpdate { room: RoomId, room_name: Option<String>, public: Option<bool>, category: Option<Uuid> },
RoomDelete { room: RoomId },
CategoryCreate { project: ProjectId, name: String, position: Option<i32> },
CategoryUpdate { id: Uuid, name: Option<String>, position: Option<i32> },
CategoryDelete { id: Uuid },
AccessGrant { room: RoomId, user: UserId },
AccessRevoke { room: RoomId, user: UserId },
StateSetReadSeq { room: RoomId, last_read_seq: i64 },
StateUpdateDnd { room: RoomId, do_not_disturb: Option<bool>, dnd_start_hour: Option<i16>, dnd_end_hour: Option<i16> },
ReactionAdd { room: RoomId, message: Uuid, emoji: String },
ReactionRemove { room: RoomId, message: Uuid, emoji: String },
ThreadCreate { room: RoomId, parent: i64 },
ThreadResolve { thread_id: RoomThreadId },
ThreadArchive { thread_id: RoomThreadId },
PinAdd { room: RoomId, message: Uuid },
PinRemove { room: RoomId, message: Uuid },
DraftSave { room: RoomId, content: String },
DraftClear { room: RoomId },
Search { q: String, room: Option<RoomId>, start_time: Option<DateTime<Utc>>, end_time: Option<DateTime<Utc>>, sender_id: Option<Uuid>, content_type: Option<String>, limit: Option<u64>, offset: Option<u64> },
NotificationMarkRead { id: Uuid },
NotificationMarkAllRead { project_id: Option<ProjectId> },
NotificationArchive { id: Uuid },
PresenceUpdate { status: crate::event::presence::UserPresenceStatus },
CustomStatusUpdate { emoji: Option<String>, text: Option<String>, expires_at: Option<DateTime<Utc>> },
InviteCreate { project: ProjectId, room: Option<RoomId>, max_uses: Option<i32>, expires_at: Option<DateTime<Utc>> },
InviteAccept { code: String },
InviteRevoke { id: Uuid },
BanCreate { project: ProjectId, user: UserId, reason: Option<String>, expires_at: Option<DateTime<Utc>> },
BanRemove { project: ProjectId, user: UserId },
VoiceJoin { room: RoomId },
VoiceLeave { room: RoomId },
VoiceMute { room: RoomId, muted: bool },
VoiceDeaf { room: RoomId, deafened: bool },
ScreenShare { room: RoomId, start: bool },
AiList { room: RoomId },
AiUpsert { room: RoomId, model: Uuid, version: Option<Uuid>, system_prompt: Option<String>, temperature: Option<f64>, max_tokens: Option<i64>, stream: Option<bool> },
AiDelete { room: RoomId, agent_id: Uuid },
AiStop { room: RoomId },
UserSummary { username: String },
}
impl WsInMessage {
pub fn room_id(&self) -> Option<RoomId> {
match self {
Self::Subscribe { room }
| Self::Unsubscribe { room }
| Self::TypingStart { room }
| Self::TypingStop { room }
| Self::ReadReceipt { room, .. }
| Self::MessageCreate { room, .. }
| Self::RoomUpdate { room, .. }
| Self::RoomDelete { room }
| Self::AccessGrant { room, .. }
| Self::AccessRevoke { room, .. }
| Self::StateSetReadSeq { room, .. }
| Self::StateUpdateDnd { room, .. }
| Self::ReactionAdd { room, .. }
| Self::ReactionRemove { room, .. }
| Self::ThreadCreate { room, .. }
| Self::PinAdd { room, .. }
| Self::PinRemove { room, .. }
| Self::DraftSave { room, .. }
| Self::DraftClear { room }
| Self::VoiceJoin { room }
| Self::VoiceLeave { room }
| Self::VoiceMute { room, .. }
| Self::VoiceDeaf { room, .. }
| Self::ScreenShare { room, .. }
| Self::AiList { room }
| Self::AiUpsert { room, .. }
| Self::AiDelete { room, .. }
| Self::MessageList { room, .. }
| Self::RoomGet { room }
| Self::Search {
room: Some(room), ..
} => Some(*room),
_ => None,
}
}
}

View File

@ -0,0 +1,5 @@
mod in_message;
mod out_event;
pub use in_message::{WsInMessage, WS_PROTOCOL_VERSION};
pub use out_event::{WsError, WsOutEvent};

View File

@ -0,0 +1,275 @@
use serde::Serialize;
use uuid::Uuid;
use models::{ProjectId, RoomId, UserId};
use crate::event::{
ai, attachment, ban, category, draft, invite, member, message, notify, pin, presence, project,
reaction, rooms, search, thread, voice,
};
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WsOutEvent {
Pong {
protocol_version: u32,
},
Error(WsError),
// ── Room events ──
RoomCreated {
room_id: RoomId,
data: rooms::RoomCreatedService,
},
RoomDeleted {
room_id: RoomId,
data: rooms::RoomDeletedService,
},
RoomRenamed {
room_id: RoomId,
data: rooms::RoomRenamedService,
},
RoomTopicUpdated {
room_id: RoomId,
data: rooms::RoomTopicUpdatedService,
},
RoomSettingsUpdated {
room_id: RoomId,
data: rooms::RoomSettingsUpdatedService,
},
RoomMoved {
room_id: RoomId,
data: rooms::RoomMovedService,
},
// ── Message events ──
MessageNew {
room_id: RoomId,
data: message::MessageNewService,
},
MessageEdited {
room_id: RoomId,
data: message::MessageEditedService,
},
MessageRevoked {
room_id: RoomId,
data: message::MessageRevokedService,
},
MessageStreamStart {
room_id: RoomId,
data: message::MessageStreamStartService,
},
MessageStreamChunk {
room_id: RoomId,
data: message::MessageStreamChunkService,
},
MessageStreamDone {
room_id: RoomId,
data: message::MessageStreamDoneService,
},
MessageList {
room_id: RoomId,
data: message::MessageListService,
},
// ── Member events ──
MemberJoined {
room_id: RoomId,
data: member::MemberJoinedService,
},
MemberRemoved {
room_id: RoomId,
data: member::MemberRemovedService,
},
ReadReceipt {
room_id: RoomId,
data: member::ReadReceiptService,
},
TypingStart {
room_id: RoomId,
data: member::TypingStartService,
},
TypingStop {
room_id: RoomId,
data: member::TypingStopService,
},
// ── Reaction events ──
ReactionAdded {
room_id: RoomId,
data: reaction::ReactionAddedService,
},
ReactionRemoved {
room_id: RoomId,
data: reaction::ReactionRemovedService,
},
ReactionBatchUpdated {
room_id: RoomId,
data: reaction::ReactionBatchUpdatedService,
},
// ── Thread events ──
ThreadCreated {
room_id: RoomId,
data: thread::ThreadCreatedService,
},
ThreadUpdated {
room_id: RoomId,
data: thread::ThreadUpdatedService,
},
ThreadResolved {
room_id: RoomId,
data: thread::ThreadResolvedService,
},
ThreadArchived {
room_id: RoomId,
data: thread::ThreadArchivedService,
},
ThreadParticipantJoined {
room_id: RoomId,
data: thread::ThreadParticipantJoinedService,
},
ThreadParticipantLeft {
room_id: RoomId,
data: thread::ThreadParticipantLeftService,
},
// ── Category events ──
CategoryCreated {
project: ProjectId,
data: category::CategoryCreatedService,
},
CategoryUpdated {
project: ProjectId,
data: category::CategoryUpdatedService,
},
CategoryDeleted {
project: ProjectId,
data: category::CategoryDeletedService,
},
// ── Pin events ──
PinAdded {
room_id: RoomId,
data: pin::PinAddedService,
},
PinRemoved {
room_id: RoomId,
data: pin::PinRemovedService,
},
// ── Project events ──
ProjectRoomCreated {
project: ProjectId,
data: project::ProjectRoomCreatedService,
},
ProjectRoomDeleted {
project: ProjectId,
data: project::ProjectRoomDeletedService,
},
ProjectRoomRenamed {
project: ProjectId,
data: project::ProjectRoomRenamedService,
},
ProjectRoomMoved {
project: ProjectId,
data: project::ProjectRoomMovedService,
},
// ── Draft events ──
DraftSaved {
room_id: RoomId,
data: draft::DraftSavedService,
},
DraftCleared {
room_id: RoomId,
data: draft::DraftClearedService,
},
// ── Search ──
SearchResult {
data: search::SearchResultService,
},
// ── Notification events ──
NotifyCreated {
data: notify::NotifyCreatedService,
},
NotifyRead {
data: notify::NotifyReadService,
},
// ── Presence events ──
PresenceChanged {
data: presence::PresenceChangedService,
},
CustomStatusUpdated {
data: presence::CustomStatusUpdatedService,
},
// ── Invite events ──
InviteCreated {
data: invite::InviteCreatedService,
},
InviteAccepted {
data: invite::InviteAcceptedService,
},
InviteRejected {
data: invite::InviteRejectedService,
},
InviteRevoked {
data: invite::InviteRevokedService,
},
// ── Attachment events ──
AttachmentUploaded {
data: attachment::AttachmentUploadedService,
},
AttachmentThumbnailGenerated {
data: attachment::AttachmentThumbnailService,
},
AttachmentDeleted {
data: attachment::AttachmentDeletedService,
},
// ── Ban events ──
UserBanned {
data: ban::BannedService,
},
UserUnbanned {
data: ban::UnbannedService,
},
// ── AI events ──
AiAgentJoined {
data: ai::AiAgentJoinedService,
},
AiAgentLeft {
data: ai::AiAgentLeftService,
},
AiAgentStatusChanged {
data: ai::AiAgentStatusChangedService,
},
// ── Voice events ──
VoiceChannelJoined {
data: voice::VoiceChannelJoinedService,
},
VoiceChannelLeft {
data: voice::VoiceChannelLeftService,
},
VoiceMuteUpdated {
data: voice::VoiceMuteUpdatedService,
},
VoiceDeafUpdated {
data: voice::VoiceDeafUpdatedService,
},
ScreenShareStarted {
data: voice::ScreenShareStartedService,
},
ScreenShareStopped {
data: voice::ScreenShareStoppedService,
},
SpeakingStarted {
room_id: RoomId,
user_id: UserId,
},
SpeakingStopped {
room_id: RoomId,
user_id: UserId,
},
// ── Request-response ──
Response {
request_id: Uuid,
data: serde_json::Value,
},
}
#[derive(Debug, Clone, Serialize)]
pub struct WsError {
pub code: i32,
pub error: String,
pub message: String,
}

View File

@ -146,7 +146,7 @@ pub async fn ws_handler(
continue;
}
// Pre-parse as Value to extract _request_id, then parse as WsInMessage
// Parse once — extract request_id and deserialize together.
let json_value = serde_json::from_str::<serde_json::Value>(&text);
// Application-level JSON ping (distinguish from WebSocket Ping frame)
@ -155,62 +155,41 @@ pub async fn ws_handler(
continue;
}
// Extract _request_id before full parsing to support request-response correlation
// Extract _request_id from the Value, then deserialize WsInMessage
let request_id: Option<Uuid> = json_value
.ok()
.and_then(|v| v.get("_request_id")
.and_then(|r| serde_json::from_value(r.clone()).ok()));
// Re-parse from text as WsInMessage for the handler
match serde_json::from_str::<WsInMessage>(&text) {
Ok(in_msg) => {
match MessageHandler::handle(&session, in_msg).await {
Ok(Some(event)) => {
if let Some(rid) = request_id {
// Unwrap inner data for MessageList to avoid double nesting
let json_data = match &event {
WsOutEvent::MessageList { data, .. } => {
serde_json::to_value(data).unwrap_or_default()
}
_ => {
serde_json::to_value(&event).unwrap_or_default()
}
};
let resp = WsOutEvent::Response { request_id: rid, data: json_data };
if send_event(&mut ws_session, &resp).await.is_err() { break; }
} else {
if send_event(&mut ws_session, &event).await.is_err() { break; }
}
let rid = request_id.unwrap_or(Uuid::nil());
let resp = WsOutEvent::Response {
request_id: rid,
data: serde_json::to_value(&event).unwrap_or_default(),
};
if send_event(&mut ws_session, &resp).await.is_err() { break; }
}
Ok(None) => {
if let Some(rid) = request_id {
let ack = WsOutEvent::Response {
request_id: rid,
data: serde_json::json!({"ok": true}),
};
if send_event(&mut ws_session, &ack).await.is_err() { break; }
} else {
let _ = ws_session.text(r#"{"type":"ack"}"#).await;
}
let rid = request_id.unwrap_or(Uuid::nil());
let ack = WsOutEvent::Response {
request_id: rid,
data: serde_json::json!({"ok": true}),
};
if send_event(&mut ws_session, &ack).await.is_err() { break; }
}
Err(e) => {
tracing::warn!(user_id = %user_id, error = %e, "WS message processing failed");
let err_json = if let Some(rid) = request_id {
serde_json::json!({
"type": "error",
"code": 500,
"error": "internal_error",
"message": e.to_string(),
"_request_id": rid
})
} else {
serde_json::json!({
"type": "error",
"code": 500,
"error": "internal_error",
"message": e.to_string()
})
};
let rid = request_id.unwrap_or(Uuid::nil());
let err_json = serde_json::json!({
"type": "error",
"code": 500,
"error": "internal_error",
"message": e.to_string(),
"_request_id": rid
});
let _ = ws_session.text(err_json.to_string()).await;
}
}

View File

@ -51,40 +51,14 @@ impl AppTransport {
Some(u) => u,
None => {
tracing::warn!("NATS_URL not configured, running without NATS transport");
return Ok(Self {
db: service.db.clone(),
cache: service.cache.clone(),
seq: SeqAllocator::new(service.cache.clone(), service.db.clone()),
ack: ack::AckTracker::new(service.cache.clone()),
reconnect: reconnect::ReconnectManager::new(service.cache.clone(), service.db.clone()),
dedup: dedup::DeduplicationManager::new(service.cache.clone()),
rate_limiter: security::RateLimiter::new(service.cache.clone()),
csrf: security::CsrfProtection::new(service.cache.clone()),
circuit_breaker: circuit_breaker::CircuitBreaker::new(),
service,
config,
nats: None,
});
return Self::build(service, config, None);
}
};
let token = match config.nats_token() {
Some(t) => t,
None => {
tracing::warn!("NATS_TOKEN not configured, running without NATS transport");
return Ok(Self {
db: service.db.clone(),
cache: service.cache.clone(),
seq: SeqAllocator::new(service.cache.clone(), service.db.clone()),
ack: ack::AckTracker::new(service.cache.clone()),
reconnect: reconnect::ReconnectManager::new(service.cache.clone(), service.db.clone()),
dedup: dedup::DeduplicationManager::new(service.cache.clone()),
rate_limiter: security::RateLimiter::new(service.cache.clone()),
csrf: security::CsrfProtection::new(service.cache.clone()),
circuit_breaker: circuit_breaker::CircuitBreaker::new(),
service,
config,
nats: None,
});
return Self::build(service, config, None);
}
};
@ -126,13 +100,14 @@ impl AppTransport {
None
};
Self::build(service, config, nats)
}
fn build(service: AppService, config: AppConfig, nats: Option<async_nats::Client>) -> Result<Self, crate::error::AppTransportError> {
Ok(Self {
db: service.db.clone(),
cache: service.cache.clone(),
seq: SeqAllocator::new(
service.cache.clone(),
service.db.clone(),
),
seq: SeqAllocator::new(service.cache.clone(), service.db.clone()),
ack: ack::AckTracker::new(service.cache.clone()),
reconnect: reconnect::ReconnectManager::new(service.cache.clone(), service.db.clone()),
dedup: dedup::DeduplicationManager::new(service.cache.clone()),

View File

@ -1,5 +1,5 @@
use base64::Engine;
use hmac::{Hmac, Mac};
use hmac::{Hmac, Mac, KeyInit};
use models::UserId;
use session::Session;
use sha2::Sha256;