// gitdataai/libs/transport/handler/inbound.rs
//
// 804 lines
// 32 KiB
// Rust

use room::ws_context::WsUserContext;
use uuid::Uuid;
use super::session::TransportSession;
use super::types::{WsInMessage, WsOutEvent};
use crate::error::AppTransportError;
use crate::event::{category, message, reaction};
/// Records a failed service call and maps it to the generic transport error.
///
/// `op` names the operation that failed; `err` is logged through its `Display`
/// form at `warn` level. The original error is not carried in the returned
/// value: the result is always [`AppTransportError::Internal`].
fn service_err<E>(op: &str, err: E) -> AppTransportError
where
    E: std::fmt::Display,
{
    tracing::warn!(
        error = %err,
        operation = %op,
        "WS service operation failed"
    );
    AppTransportError::Internal
}
/// Field-less dispatcher type for inbound WebSocket messages; its behaviour
/// lives in the associated function `MessageHandler::handle`.
pub struct MessageHandler;
impl MessageHandler {
    /// Dispatches a single inbound WebSocket message on behalf of `session`.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(event))` when the message produces a direct reply for the
    ///   sender (pong, list/search results, created entities, ...).
    /// * `Ok(None)` when there is nothing to send back directly.
    ///
    /// # Errors
    ///
    /// Every fallible service call, and every failure to serialize a generic
    /// `Response` payload, is logged by `service_err` and surfaces as
    /// [`AppTransportError::Internal`].
    pub async fn handle(
        session: &TransportSession,
        msg: WsInMessage,
    ) -> Result<Option<WsOutEvent>, AppTransportError> {
        // Built once per message; every arm that calls into the room service
        // on behalf of the user borrows this context.
        let ctx = WsUserContext::new(session.user.user_id);

        match msg {
            WsInMessage::Ping => Ok(Some(WsOutEvent::Pong {
                protocol_version: super::types::WS_PROTOCOL_VERSION,
            })),

            // ---- subscriptions / ephemeral signals ----
            WsInMessage::Subscribe { room } => {
                let sub = session
                    .subscribe_room(room)
                    .await
                    .map_err(|e| service_err("subscribe_room", e))?;
                session.subscriptions.insert(room, sub);
                // Lazily spawn NATS broadcast workers for this room so that
                // messages from other users can be delivered in real time.
                // SPAWNED_ROOMS guard prevents duplicate spawns.
                session.service.room.spawn_room_workers(room);
                Ok(None)
            }
            WsInMessage::Unsubscribe { room } => {
                session.unsubscribe_room(room).await;
                Ok(None)
            }
            WsInMessage::TypingStart { room } => {
                session.broadcast_typing(room, "start").await;
                Ok(None)
            }
            WsInMessage::TypingStop { room } => {
                session.broadcast_typing(room, "stop").await;
                Ok(None)
            }
            WsInMessage::ReadReceipt {
                room,
                last_read_seq,
            } => {
                session
                    .service
                    .room
                    .room_user_state_update_read_seq(room, last_read_seq, &ctx)
                    .await
                    .map_err(|e| service_err("room_user_state_update_read_seq", e))?;
                Ok(None)
            }

            // ---- messages ----
            WsInMessage::MessageList {
                room,
                before_seq,
                after_seq,
                limit,
            } => {
                let list = session
                    .service
                    .room
                    .room_message_list(room, before_seq, after_seq, limit, &ctx)
                    .await
                    .map_err(|e| service_err("room_message_list", e))?;
                let messages = list
                    .messages
                    .iter()
                    .map(|m| message::MessageNewService {
                        id: m.id,
                        seq: m.seq,
                        room: m.room,
                        sender_type: m.sender_type.clone(),
                        sender_id: m.sender_id,
                        display_name: m.display_name.clone(),
                        thread: m.thread,
                        in_reply_to: m.in_reply_to,
                        content: m.content.clone(),
                        content_type: m.content_type.clone(),
                        thinking_content: m.thinking_content.clone(),
                        thinking_is_chunked: m.thinking_is_chunked,
                        send_at: m.send_at,
                        reactions: Some(
                            m.reactions
                                .iter()
                                .map(|r| reaction::ReactionGroup {
                                    emoji: r.emoji.clone(),
                                    count: r.count as i64,
                                    reacted_by_me: r.reacted_by_me,
                                    // User ids that do not parse as UUIDs are
                                    // dropped rather than failing the request.
                                    users: r
                                        .users
                                        .iter()
                                        .filter_map(|u| Uuid::parse_str(u).ok())
                                        .collect(),
                                })
                                .collect(),
                        ),
                    })
                    .collect();
                Ok(Some(WsOutEvent::MessageList {
                    room_id: room,
                    data: message::MessageListService {
                        room,
                        messages,
                        total: list.total,
                    },
                }))
            }
            WsInMessage::MessageCreate {
                room,
                content,
                content_type,
                thread,
                in_reply_to,
            } => {
                let created = session
                    .service
                    .room
                    .room_message_create(
                        room,
                        room::RoomMessageCreateRequest {
                            content,
                            content_type,
                            thread,
                            in_reply_to,
                            attachment_ids: vec![],
                        },
                        &ctx,
                    )
                    .await
                    .map_err(|e| service_err("room_message_create", e))?;
                Ok(Some(WsOutEvent::MessageNew {
                    room_id: room,
                    data: message::MessageNewService {
                        id: created.id,
                        seq: created.seq,
                        room: created.room,
                        sender_type: created.sender_type,
                        sender_id: created.sender_id,
                        display_name: created.display_name,
                        thread: created.thread,
                        in_reply_to: created.in_reply_to,
                        content: created.content,
                        content_type: created.content_type,
                        thinking_content: created.thinking_content,
                        thinking_is_chunked: false,
                        send_at: created.send_at,
                        reactions: None,
                    },
                }))
            }
            WsInMessage::MessageUpdate { message, content } => {
                session
                    .service
                    .room
                    .room_message_update(message, room::RoomMessageUpdateRequest { content }, &ctx)
                    .await
                    .map_err(|e| service_err("room_message_update", e))?;
                Ok(None)
            }
            WsInMessage::MessageRevoke { message } => {
                session
                    .service
                    .room
                    .room_message_revoke(message, &ctx)
                    .await
                    .map_err(|e| service_err("room_message_revoke", e))?;
                Ok(None)
            }

            // ---- rooms ----
            WsInMessage::RoomGet { room } => {
                session
                    .service
                    .room
                    .require_room_access(room, ctx.user_id)
                    .await
                    .map_err(|e| service_err("require_room_access", e))?;
                let rm = session
                    .service
                    .room
                    .find_room_or_404(room)
                    .await
                    .map_err(|e| service_err("find_room", e))?;
                // The room snapshot is delivered using the `RoomCreated` event shape.
                Ok(Some(WsOutEvent::RoomCreated {
                    room_id: rm.id,
                    data: crate::event::rooms::RoomCreatedService {
                        id: rm.id,
                        project: rm.project,
                        room_name: rm.room_name,
                        public: rm.public,
                        category: rm.category,
                        created_by: rm.created_by,
                        created_at: rm.created_at,
                    },
                }))
            }
            WsInMessage::RoomCreate {
                project,
                room_name,
                public,
                category,
            } => {
                let rm = session
                    .service
                    .room
                    .room_create(
                        project.to_string(),
                        room::RoomCreateRequest {
                            room_name,
                            public,
                            category,
                        },
                        &ctx,
                    )
                    .await
                    .map_err(|e| service_err("room_create", e))?;
                // `project`, `category` and `created_by` echo the request and
                // the session user rather than values read back from `rm`.
                Ok(Some(WsOutEvent::RoomCreated {
                    room_id: rm.id,
                    data: crate::event::rooms::RoomCreatedService {
                        id: rm.id,
                        project,
                        room_name: rm.room_name,
                        public: rm.public,
                        category,
                        created_by: session.user.user_id,
                        created_at: rm.created_at,
                    },
                }))
            }
            WsInMessage::RoomUpdate {
                room,
                room_name,
                public,
                category,
            } => {
                session
                    .service
                    .room
                    .room_update(
                        room,
                        room::RoomUpdateRequest {
                            room_name,
                            public,
                            category,
                        },
                        &ctx,
                    )
                    .await
                    .map_err(|e| service_err("room_update", e))?;
                Ok(None)
            }
            WsInMessage::RoomDelete { room } => {
                session
                    .service
                    .room
                    .room_delete(room, &ctx)
                    .await
                    .map_err(|e| service_err("room_delete", e))?;
                Ok(None)
            }

            // ---- categories ----
            WsInMessage::CategoryCreate {
                project,
                name,
                position,
            } => {
                let cat = session
                    .service
                    .room
                    .room_category_create(
                        project.to_string(),
                        room::RoomCategoryCreateRequest { name, position },
                        &ctx,
                    )
                    .await
                    .map_err(|e| service_err("room_category_create", e))?;
                Ok(Some(WsOutEvent::CategoryCreated {
                    project,
                    data: category::CategoryCreatedService {
                        id: cat.id,
                        project,
                        name: cat.name,
                        position: cat.position,
                        created_by: session.user.user_id,
                        created_at: cat.created_at,
                    },
                }))
            }
            WsInMessage::CategoryUpdate { id, name, position } => {
                session
                    .service
                    .room
                    .room_category_update(
                        id,
                        room::RoomCategoryUpdateRequest { name, position },
                        &ctx,
                    )
                    .await
                    .map_err(|e| service_err("room_category_update", e))?;
                Ok(None)
            }
            WsInMessage::CategoryDelete { id } => {
                session
                    .service
                    .room
                    .room_category_delete(id, &ctx)
                    .await
                    .map_err(|e| service_err("room_category_delete", e))?;
                Ok(None)
            }

            // ---- access control ----
            WsInMessage::AccessGrant { room, user } => {
                session
                    .service
                    .room
                    .room_access_grant(room, user, &ctx)
                    .await
                    .map_err(|e| service_err("room_access_grant", e))?;
                Ok(None)
            }
            WsInMessage::AccessRevoke { room, user } => {
                session
                    .service
                    .room
                    .room_access_revoke(room, user, &ctx)
                    .await
                    .map_err(|e| service_err("room_access_revoke", e))?;
                Ok(None)
            }

            // ---- reactions ----
            WsInMessage::ReactionAdd {
                room,
                message,
                emoji,
            } => {
                let rxs = session
                    .service
                    .room
                    .message_reaction_add(message, emoji, &ctx)
                    .await
                    .map_err(|e| service_err("message_reaction_add", e))?;
                Ok(Some(WsOutEvent::ReactionBatchUpdated {
                    room_id: room,
                    data: reaction::ReactionBatchUpdatedService {
                        room,
                        message,
                        reactions: rxs
                            .reactions
                            .into_iter()
                            .map(|g| reaction::ReactionGroup {
                                emoji: g.emoji,
                                count: g.count as i64,
                                reacted_by_me: g.reacted_by_me,
                                // Unparsable user ids are silently skipped.
                                users: g
                                    .users
                                    .iter()
                                    .filter_map(|u| Uuid::parse_str(u).ok())
                                    .collect(),
                            })
                            .collect(),
                    },
                }))
            }
            WsInMessage::ReactionRemove {
                room,
                message,
                emoji,
            } => {
                let rxs = session
                    .service
                    .room
                    .message_reaction_remove(message, emoji, &ctx)
                    .await
                    .map_err(|e| service_err("message_reaction_remove", e))?;
                Ok(Some(WsOutEvent::ReactionBatchUpdated {
                    room_id: room,
                    data: reaction::ReactionBatchUpdatedService {
                        room,
                        message,
                        reactions: rxs
                            .reactions
                            .into_iter()
                            .map(|g| reaction::ReactionGroup {
                                emoji: g.emoji,
                                count: g.count as i64,
                                reacted_by_me: g.reacted_by_me,
                                // Unparsable user ids are silently skipped.
                                users: g
                                    .users
                                    .iter()
                                    .filter_map(|u| Uuid::parse_str(u).ok())
                                    .collect(),
                            })
                            .collect(),
                    },
                }))
            }

            // ---- threads ----
            WsInMessage::ThreadCreate { room, parent } => {
                session
                    .service
                    .room
                    .room_thread_create(
                        room,
                        room::RoomThreadCreateRequest { parent_seq: parent },
                        &ctx,
                    )
                    .await
                    .map_err(|e| service_err("room_thread_create", e))?;
                Ok(None)
            }
            WsInMessage::ThreadResolve { thread_id } => {
                // Dummy implementation since backend thread.rs doesn't have resolve yet
                tracing::info!(%thread_id, "Thread resolved");
                Ok(None)
            }
            WsInMessage::ThreadArchive { thread_id } => {
                // Dummy implementation since backend thread.rs doesn't have archive yet
                tracing::info!(%thread_id, "Thread archived");
                Ok(None)
            }

            // ---- pins (the room carried by the message is not used) ----
            WsInMessage::PinAdd { room: _, message } => {
                session
                    .service
                    .room
                    .room_pin_add(message, &ctx)
                    .await
                    .map_err(|e| service_err("room_pin_add", e))?;
                Ok(None)
            }
            WsInMessage::PinRemove { room: _, message } => {
                session
                    .service
                    .room
                    .room_pin_remove(message, &ctx)
                    .await
                    .map_err(|e| service_err("room_pin_remove", e))?;
                Ok(None)
            }

            // ---- drafts ----
            WsInMessage::DraftSave { room, content } => {
                let draft = session
                    .service
                    .room
                    .draft_save(room, content, &ctx)
                    .await
                    .map_err(|e| service_err("draft_save", e))?;
                Ok(Some(WsOutEvent::DraftSaved {
                    room_id: room,
                    data: crate::event::draft::DraftSavedService {
                        user_id: session.user.user_id,
                        room: draft.room_id,
                        content: draft.content,
                        saved_at: draft.saved_at,
                    },
                }))
            }
            WsInMessage::DraftClear { room } => {
                session
                    .service
                    .room
                    .draft_clear(room, &ctx)
                    .await
                    .map_err(|e| service_err("draft_clear", e))?;
                Ok(Some(WsOutEvent::DraftCleared {
                    room_id: room,
                    data: crate::event::draft::DraftClearedService {
                        user_id: session.user.user_id,
                        room,
                        // Timestamp is taken here, after the service call returns.
                        cleared_at: chrono::Utc::now(),
                    },
                }))
            }

            // ---- notifications ----
            WsInMessage::NotificationMarkRead { id } => {
                session
                    .service
                    .room
                    .notification_mark_read(id, &ctx)
                    .await
                    .map_err(|e| service_err("notification_mark_read", e))?;
                Ok(None)
            }
            WsInMessage::NotificationMarkAllRead { .. } => {
                session
                    .service
                    .room
                    .notification_mark_all_read(&ctx)
                    .await
                    .map_err(|e| service_err("notification_mark_all_read", e))?;
                Ok(None)
            }
            WsInMessage::NotificationArchive { id } => {
                session
                    .service
                    .room
                    .notification_archive(id, &ctx)
                    .await
                    .map_err(|e| service_err("notification_archive", e))?;
                Ok(None)
            }

            // ---- search ----
            WsInMessage::Search {
                q,
                room,
                start_time,
                end_time,
                sender_id,
                content_type,
                limit,
                offset,
            } => {
                // Assuming search requires a room for now.
                // NOTE(review): when `room` is absent this falls back to the
                // type's default id and still calls the service — confirm that
                // this is the intended behaviour rather than an early error.
                let room_id = room.unwrap_or_default();
                let req = room::RoomMessageSearchRequest {
                    // `q` is echoed back in the response, hence the copy.
                    q: q.clone(),
                    start_time,
                    end_time,
                    sender_id,
                    content_type,
                    limit,
                    offset,
                };
                let res = session
                    .service
                    .room
                    .room_message_search(room_id, req, &ctx)
                    .await
                    .map_err(|e| service_err("room_message_search", e))?;
                let hits = res
                    .messages
                    .into_iter()
                    .map(|m| crate::event::search::SearchMessageHitService {
                        highlighted_content: m.highlighted_content.unwrap_or_default(),
                        message: crate::event::message::MessageNewService {
                            id: m.id,
                            seq: m.seq,
                            room: m.room,
                            sender_type: m.sender_type,
                            sender_id: m.sender_id,
                            display_name: m.display_name,
                            thread: m.thread,
                            in_reply_to: m.in_reply_to,
                            content: m.content,
                            content_type: m.content_type,
                            thinking_content: m.thinking_content,
                            thinking_is_chunked: m.thinking_is_chunked,
                            send_at: m.send_at,
                            reactions: None,
                        },
                    })
                    .collect();
                Ok(Some(WsOutEvent::SearchResult {
                    data: crate::event::search::SearchResultService {
                        q,
                        room: room_id,
                        // Search latency is not measured here; always reported as 0.
                        took_ms: 0,
                        messages: hits,
                        total: res.total,
                    },
                }))
            }

            // ---- presence ----
            WsInMessage::PresenceUpdate { status } => {
                // Get project context from session's subscribed rooms (first room's project)
                let project_id = session.get_current_project().await;
                // Convert transport status to room presence status
                let presence_status = match status {
                    crate::event::presence::UserPresenceStatus::Online => {
                        room::presence::PresenceStatus::Online
                    }
                    crate::event::presence::UserPresenceStatus::Idle => {
                        room::presence::PresenceStatus::Idle
                    }
                    crate::event::presence::UserPresenceStatus::Dnd => {
                        room::presence::PresenceStatus::Dnd
                    }
                    crate::event::presence::UserPresenceStatus::Offline => {
                        room::presence::PresenceStatus::Offline
                    }
                };
                let changed = session
                    .service
                    .room
                    .set_user_presence(session.user.user_id, project_id, presence_status)
                    .await;
                // A reply is produced only when the service returns an event.
                Ok(changed.map(|evt| {
                    // Convert back to the transport event type
                    let transport_status = match evt.status {
                        room::presence::PresenceStatus::Online => {
                            crate::event::presence::UserPresenceStatus::Online
                        }
                        room::presence::PresenceStatus::Idle => {
                            crate::event::presence::UserPresenceStatus::Idle
                        }
                        room::presence::PresenceStatus::Dnd => {
                            crate::event::presence::UserPresenceStatus::Dnd
                        }
                        room::presence::PresenceStatus::Offline => {
                            crate::event::presence::UserPresenceStatus::Offline
                        }
                    };
                    WsOutEvent::PresenceChanged {
                        data: crate::event::presence::PresenceChangedService {
                            user: evt.user_id,
                            project: evt.project_id,
                            status: transport_status,
                            last_seen_at: evt.last_seen_at,
                        },
                    }
                }))
            }
            WsInMessage::CustomStatusUpdate {
                emoji,
                text,
                expires_at,
            } => {
                // `emoji` and `text` are owned and not needed afterwards, so
                // they are moved into the call instead of being cloned.
                let updated = session.service.room.set_custom_status(
                    session.user.user_id,
                    emoji,
                    text,
                    expires_at,
                );
                Ok(updated.map(|data| WsOutEvent::CustomStatusUpdated {
                    data: crate::event::presence::CustomStatusUpdatedService {
                        user: data.user_id,
                        emoji: data.emoji,
                        text: data.text,
                        expires_at: data.expires_at,
                    },
                }))
            }

            // ---- not implemented yet: these arms only log and send no reply ----
            WsInMessage::InviteCreate { .. } => {
                tracing::info!("Invite create");
                Ok(None)
            }
            WsInMessage::InviteAccept { .. } => {
                tracing::info!("Invite accept");
                Ok(None)
            }
            WsInMessage::InviteRevoke { .. } => {
                tracing::info!("Invite revoke");
                Ok(None)
            }
            WsInMessage::BanCreate { .. } => {
                tracing::info!("Ban create");
                Ok(None)
            }
            WsInMessage::BanRemove { .. } => {
                tracing::info!("Ban remove");
                Ok(None)
            }
            WsInMessage::VoiceJoin { room } => {
                tracing::info!(%room, "Voice join");
                Ok(None)
            }
            WsInMessage::VoiceLeave { room } => {
                tracing::info!(%room, "Voice leave");
                Ok(None)
            }
            WsInMessage::VoiceMute { room, muted } => {
                tracing::info!(%room, %muted, "Voice mute");
                Ok(None)
            }
            WsInMessage::VoiceDeaf { room, deafened } => {
                tracing::info!(%room, %deafened, "Voice deaf");
                Ok(None)
            }
            WsInMessage::ScreenShare { room, start } => {
                tracing::info!(%room, %start, "Screen share");
                Ok(None)
            }

            // ---- room AI configuration ----
            WsInMessage::AiList { room } => {
                let ai_list = session
                    .service
                    .room
                    .room_ai_list(room, &ctx)
                    .await
                    .map_err(|e| service_err("room_ai_list", e))?;
                // A serialization failure is reported as an error instead of
                // being sent to the client as a `null` payload.
                let data = serde_json::to_value(ai_list)
                    .map_err(|e| service_err("room_ai_list_serialize", e))?;
                Ok(Some(WsOutEvent::Response {
                    // Nil placeholder id. NOTE(review): presumably replaced by
                    // the ws handler, as noted on the UserSummary arm — confirm.
                    request_id: Uuid::nil(),
                    data,
                }))
            }
            WsInMessage::AiUpsert {
                room,
                model,
                version,
                system_prompt,
                temperature,
                max_tokens,
                stream,
            } => {
                // Options that the inbound message does not carry are left unset.
                let req = room::RoomAiUpsertRequest {
                    model,
                    version,
                    system_prompt,
                    temperature,
                    max_tokens,
                    stream,
                    history_limit: None,
                    use_exact: None,
                    think: None,
                    min_score: None,
                    agent_type: None,
                };
                let ai_model = session
                    .service
                    .room
                    .room_ai_upsert(room, req, &ctx)
                    .await
                    .map_err(|e| service_err("room_ai_upsert", e))?;
                let data = serde_json::to_value(ai_model)
                    .map_err(|e| service_err("room_ai_upsert_serialize", e))?;
                Ok(Some(WsOutEvent::Response {
                    // Nil placeholder id. NOTE(review): presumably replaced by
                    // the ws handler, as noted on the UserSummary arm — confirm.
                    request_id: Uuid::nil(),
                    data,
                }))
            }
            WsInMessage::AiDelete { room, agent_id } => {
                session
                    .service
                    .room
                    .room_ai_delete(room, agent_id, &ctx)
                    .await
                    .map_err(|e| service_err("room_ai_delete", e))?;
                Ok(None)
            }
            WsInMessage::AiStop { room } => {
                session
                    .service
                    .room
                    .room_ai_stop(room, &ctx)
                    .await
                    .map_err(|e| service_err("room_ai_stop", e))?;
                Ok(None)
            }

            // ---- users ----
            WsInMessage::UserSummary { username } => {
                // This call takes the session-derived value, not the room context.
                let user_session = session.to_session();
                let summary = session
                    .service
                    .user_get_summary(user_session, username)
                    .await
                    .map_err(|e| service_err("user_get_summary", e))?;
                let data = serde_json::to_value(summary)
                    .map_err(|e| service_err("user_get_summary_serialize", e))?;
                Ok(Some(WsOutEvent::Response {
                    request_id: Uuid::nil(), // Filled by ws_handler if rid exists
                    data,
                }))
            }

            // ---- per-room user state ----
            WsInMessage::StateSetReadSeq {
                room,
                last_read_seq,
            } => {
                session
                    .service
                    .room
                    .room_user_state_update_read_seq(room, last_read_seq, &ctx)
                    .await
                    .map_err(|e| service_err("room_user_state_update_read_seq", e))?;
                Ok(None)
            }
            WsInMessage::StateUpdateDnd {
                room,
                do_not_disturb,
                dnd_start_hour,
                dnd_end_hour,
            } => {
                session
                    .service
                    .room
                    .room_user_state_update_dnd(
                        room,
                        room::RoomUserStateUpdateDndRequest {
                            do_not_disturb,
                            dnd_start_hour,
                            dnd_end_hour,
                        },
                        &ctx,
                    )
                    .await
                    .map_err(|e| service_err("room_user_state_update_dnd", e))?;
                Ok(None)
            }
        }
    }
}