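//! Per-connection session state: the active transport, joined namespaces,
//! pending acknowledgement waiters and the outbound engine-packet queue.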
use std::{
|
|
collections::{HashMap, HashSet, VecDeque},
|
|
sync::Arc,
|
|
sync::Mutex as StdMutex,
|
|
sync::atomic::AtomicBool,
|
|
time::Instant,
|
|
};
|
|
|
|
use serde_json::Value;
|
|
use tokio::sync::{Mutex, Notify, oneshot};
|
|
use uuid::Uuid;
|
|
|
|
use crate::{
|
|
engine_packet::{EnginePacket, SocketPayload},
|
|
packet::Packet,
|
|
};
|
|
|
|
/// Transport kind a [`Session`] is currently using.
///
/// A freshly created session starts on [`Transport::Polling`]
/// (see `Session::new`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Transport {
    /// Polling transport; the initial transport of every new session.
    Polling,
    /// WebSocket transport.
    WebSocket,
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct SocketState {
|
|
pub sid: String,
|
|
pub rooms: HashSet<String>,
|
|
pub auth: Option<Value>,
|
|
}
|
|
|
|
pub struct PendingBinary {
|
|
pub packet: Packet,
|
|
}
|
|
|
|
pub struct Session {
|
|
pub engine_id: String,
|
|
pub user: StdMutex<Option<Uuid>>,
|
|
pub transport: Mutex<Transport>,
|
|
pub namespaces: Mutex<HashMap<String, SocketState>>,
|
|
pub pending_binary: Mutex<Option<PendingBinary>>,
|
|
pub ack_waiters: Mutex<AckWaiters>,
|
|
pub last_pong: Mutex<Instant>,
|
|
queue: Mutex<VecDeque<EnginePacket>>,
|
|
pub get_active: Arc<AtomicBool>,
|
|
pub post_active: Arc<AtomicBool>,
|
|
pub notify: Notify,
|
|
}
|
|
|
|
/// Map from a `(String, u64)` key to the one-shot sender that delivers the
/// acknowledgement arguments (`Vec<Value>`) to whoever is waiting for them.
///
/// NOTE(review): the key is presumably `(namespace, ack id)` — confirm against
/// the code that inserts into this map.
pub type AckWaiters = HashMap<(String, u64), oneshot::Sender<Vec<Value>>>;
|
|
|
|
impl Session {
|
|
pub fn new(user: Option<Uuid>) -> Arc<Self> {
|
|
Arc::new(Self {
|
|
engine_id: Uuid::new_v4().to_string(),
|
|
user: StdMutex::new(user),
|
|
transport: Mutex::new(Transport::Polling),
|
|
namespaces: Mutex::new(HashMap::new()),
|
|
pending_binary: Mutex::new(None),
|
|
ack_waiters: Mutex::new(HashMap::new()),
|
|
last_pong: Mutex::new(Instant::now()),
|
|
queue: Mutex::new(VecDeque::new()),
|
|
get_active: Arc::new(AtomicBool::new(false)),
|
|
post_active: Arc::new(AtomicBool::new(false)),
|
|
notify: Notify::new(),
|
|
})
|
|
}
|
|
|
|
pub async fn enqueue(&self, packet: EnginePacket) {
|
|
self.queue.lock().await.push_back(packet);
|
|
self.notify.notify_waiters();
|
|
}
|
|
|
|
pub async fn enqueue_socket_packet(&self, packet: Packet) {
|
|
self.enqueue(EnginePacket::Message(SocketPayload::Text(
|
|
packet.encode(),
|
|
)))
|
|
.await;
|
|
for attachment in packet.attachments {
|
|
self.enqueue(EnginePacket::Message(SocketPayload::Binary(
|
|
attachment,
|
|
)))
|
|
.await;
|
|
}
|
|
}
|
|
|
|
pub async fn drain(&self) -> Vec<EnginePacket> {
|
|
self.queue.lock().await.drain(..).collect()
|
|
}
|
|
}
|