gitdataai/libs/transport/handler/session.rs

156 lines
5.4 KiB
Rust

use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::broadcast;
use models::RoomId;
use queue::{RoomMessageEvent, RoomMessageStreamChunkEvent, TypingEvent};
use service::AppService;
use crate::error::AppTransportError;
use crate::seq::SeqAllocator;
use crate::token::AppTransportTokenContext;
// ─── Constants ────────────────────────────────────────────────────────────────
/// Heartbeat interval: 30 seconds.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);

/// Heartbeat timeout: 60 seconds.
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60);

/// Maximum idle timeout: 5 minutes (300 seconds).
pub const MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Length of one rate-limit window: 1 second.
pub const RATE_LIMIT_WINDOW: Duration = Duration::from_secs(1);

/// Message-count ceiling per second.
pub const MAX_MESSAGES_PER_SECOND: u32 = 1_000;

/// Maximum length of a text message: 64 KiB (65 536).
pub const MAX_TEXT_MESSAGE_LEN: usize = 64 * 1024;
use std::time::Duration;
// ─── User Context ─────────────────────────────────────────────────────────────
/// Identity of the user behind a WebSocket connection.
#[derive(Clone)]
pub struct WsUserCtx {
    /// Id of the authenticated user.
    pub user_id: uuid::Uuid,
    /// Device identifier copied from the transport token.
    pub device_id: String,
    /// Client identifier copied from the transport token.
    pub client_id: String,
    /// Cached display name.
    ///
    /// The `From<AppTransportTokenContext>` conversion fills this with the
    /// stringified `user_id`; any friendlier name has to be written by code
    /// outside this block.
    pub display_name: String,
}

impl From<AppTransportTokenContext> for WsUserCtx {
    /// Builds a user context from a transport token context.
    ///
    /// `user_id`, `device_id` and `client_id` are taken over as-is.
    fn from(ctx: AppTransportTokenContext) -> Self {
        // No display name is read from the token here; the stringified user
        // id is used as the value instead.
        let display_name = ctx.user_id.to_string();

        WsUserCtx {
            user_id: ctx.user_id,
            device_id: ctx.device_id,
            client_id: ctx.client_id,
            display_name,
        }
    }
}
// ─── Per-Room Subscription ────────────────────────────────────────────────────
use tokio::sync::Mutex;
/// The three broadcast receivers a session holds for a single room.
///
/// Each receiver sits behind an async `Mutex`. `broadcast::Receiver::recv`
/// takes `&mut self`, so the lock presumably exists to allow receiving through
/// a shared reference (e.g. a map entry) — confirm against the consumers.
pub struct RoomSubscription {
    /// Room this subscription belongs to.
    pub room_id: RoomId,
    /// Receiver of `RoomMessageEvent`s.
    pub msg_rx: Mutex<broadcast::Receiver<Arc<RoomMessageEvent>>>,
    /// Receiver of `RoomMessageStreamChunkEvent`s.
    pub stream_rx: Mutex<broadcast::Receiver<Arc<RoomMessageStreamChunkEvent>>>,
    /// Receiver of `TypingEvent`s.
    pub typing_rx: Mutex<broadcast::Receiver<Arc<TypingEvent>>>,
}

impl RoomSubscription {
    /// Bundles the given receivers for `room_id`, wrapping each in a `Mutex`.
    pub fn new(
        room_id: RoomId,
        msg_rx: broadcast::Receiver<Arc<RoomMessageEvent>>,
        stream_rx: broadcast::Receiver<Arc<RoomMessageStreamChunkEvent>>,
        typing_rx: broadcast::Receiver<Arc<TypingEvent>>,
    ) -> Self {
        // Shadow each bare receiver with its locked counterpart so the struct
        // literal below can use field-init shorthand.
        let msg_rx = Mutex::new(msg_rx);
        let stream_rx = Mutex::new(stream_rx);
        let typing_rx = Mutex::new(typing_rx);

        RoomSubscription {
            room_id,
            msg_rx,
            stream_rx,
            typing_rx,
        }
    }
}
// ─── TransportSession ─────────────────────────────────────────────────────────
/// Per-connection state: the connected user, that user's room subscriptions,
/// a sequence allocator and a handle to the application services.
pub struct TransportSession {
    /// The user this session belongs to.
    pub user: WsUserCtx,
    /// Room subscriptions keyed by room id.
    ///
    /// No method in this block inserts into the map: `unsubscribe_room`
    /// removes from it and `get_current_project` reads it. Insertion is
    /// presumably done by the caller of `subscribe_room`.
    pub subscriptions: Arc<DashMap<RoomId, RoomSubscription>>,
    /// Sequence allocator built from the service's cache and database handles.
    pub seq: Arc<SeqAllocator>,
    /// Shared application services.
    pub service: Arc<AppService>,
}

impl TransportSession {
    /// Creates a session for `user` with an empty subscription map and a
    /// fresh `SeqAllocator` built from clones of `service.cache` and
    /// `service.db`.
    pub fn new(user: WsUserCtx, service: Arc<AppService>) -> Self {
        let allocator = SeqAllocator::new(service.cache.clone(), service.db.clone());

        TransportSession {
            user,
            subscriptions: Arc::new(DashMap::new()),
            seq: Arc::new(allocator),
            service,
        }
    }

    /// Returns whatever `SeqAllocator::seq` yields for `room`.
    ///
    /// # Errors
    ///
    /// Forwards the allocator's `AppTransportError` unchanged.
    pub async fn next_seq(&self, room: RoomId) -> Result<i64, AppTransportError> {
        self.seq.seq(room).await
    }

    /// Subscribes the session's user to the message, stream-chunk and typing
    /// channels of `room_id` through the room manager.
    ///
    /// The resulting [`RoomSubscription`] is returned to the caller; it is
    /// *not* stored in `self.subscriptions` by this method.
    ///
    /// # Errors
    ///
    /// Any failure of the message subscription is reported as
    /// `AppTransportError::Internal`; the underlying error is discarded.
    pub async fn subscribe_room(
        &self,
        room_id: RoomId,
    ) -> Result<RoomSubscription, AppTransportError> {
        let rooms = &self.service.room.room_manager;

        let msg_rx = match rooms.subscribe(room_id, self.user.user_id).await {
            Ok(receiver) => receiver,
            Err(_) => return Err(AppTransportError::Internal),
        };
        let stream_rx = rooms.subscribe_room_stream(room_id).await;
        let typing_rx = rooms.subscribe_typing(room_id).await;

        Ok(RoomSubscription::new(room_id, msg_rx, stream_rx, typing_rx))
    }

    /// Unsubscribes the user from `room_id` in the room manager, then drops
    /// the matching entry (if any) from `self.subscriptions`.
    pub async fn unsubscribe_room(&self, room_id: RoomId) {
        let user_id = self.user.user_id;
        let rooms = &self.service.room.room_manager;

        rooms.unsubscribe(room_id, user_id).await;
        self.subscriptions.remove(&room_id);
    }

    /// Broadcasts a typing event for this user to `room_id`.
    ///
    /// The event carries the session's cached display name as `username`,
    /// no avatar URL, the given `action` string and a `sender_type` of
    /// `"user"`.
    pub async fn broadcast_typing(&self, room_id: RoomId, action: &str) {
        let username = self.user.display_name.clone();
        let typing = TypingEvent {
            room_id,
            user_id: self.user.user_id,
            username,
            avatar_url: None,
            action: action.to_owned(),
            sender_type: Some(String::from("user")),
        };

        self.service
            .room
            .room_manager
            .broadcast_typing(room_id, typing)
            .await;
    }

    /// Returns the project of one subscribed room, if any.
    ///
    /// The room is whichever entry the subscription map's iterator yields
    /// first; `DashMap` does not define an iteration order, so with several
    /// subscriptions the choice is arbitrary. `None` is returned when there
    /// are no subscriptions, when the database lookup fails, or when the room
    /// row does not exist.
    pub async fn get_current_project(&self) -> Option<uuid::Uuid> {
        use models::rooms::room;
        use sea_orm::EntityTrait;

        // The map entry guard is a temporary of this statement, so it is
        // released before the database query is awaited.
        let room_id = self.subscriptions.iter().next().map(|entry| *entry.key())?;

        room::Entity::find_by_id(room_id)
            .one(&self.service.db)
            .await
            .ok()
            .flatten()
            .map(|found| found.project)
    }

    /// Creates a `session::Session::no_op()` and sets this session's user id
    /// on it.
    pub fn to_session(&self) -> session::Session {
        let sess = session::Session::no_op();
        sess.set_user(self.user.user_id);
        sess
    }
}