279 lines
8.9 KiB
Rust
279 lines
8.9 KiB
Rust
//! In-memory presence store for tracking user online status.
//!
//! This module maintains an in-memory store of user presence states,
//! indexed by `(project_id, user_id)`. Presence is updated via WebSocket
//! messages and is broadcast to relevant project subscribers.
//!
//! Note: This is a per-instance in-memory store. In a multi-node deployment,
//! presence state is not synchronized between nodes. For production use,
//! consider using Redis Sorted Sets with TTL-based idle detection.
|
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use dashmap::DashMap;
|
|
use serde::{Deserialize, Serialize};
|
|
use uuid::Uuid;
|
|
|
|
/// User presence status enum.
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, utoipa::ToSchema)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum PresenceStatus {
|
|
Online,
|
|
Idle,
|
|
Dnd,
|
|
Offline,
|
|
}
|
|
|
|
impl Default for PresenceStatus {
|
|
fn default() -> Self {
|
|
PresenceStatus::Offline
|
|
}
|
|
}
|
|
|
|
/// Presence changed event for broadcasting.
|
|
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
|
pub struct PresenceChanged {
|
|
pub user_id: Uuid,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub project_id: Option<Uuid>,
|
|
pub status: PresenceStatus,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub last_seen_at: Option<DateTime<Utc>>,
|
|
}
|
|
|
|
/// Custom status event for broadcasting.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct CustomStatusChanged {
|
|
pub user_id: Uuid,
|
|
pub emoji: Option<String>,
|
|
pub text: Option<String>,
|
|
pub expires_at: Option<DateTime<Utc>>,
|
|
}
|
|
|
|
/// Time without a presence update after which an `Online` user is reported
/// as idle (5 minutes).
pub const IDLE_TIMEOUT: Duration = Duration::from_secs(60 * 5);

/// Time without a presence update after which a user is reported as offline
/// (10 minutes).
pub const OFFLINE_TIMEOUT: Duration = Duration::from_secs(60 * 10);
|
|
|
|
/// Presence entry for a single user in a project context.
|
|
#[derive(Clone, Debug)]
|
|
pub struct PresenceEntry {
|
|
pub user_id: Uuid,
|
|
pub project_id: Option<Uuid>,
|
|
pub status: PresenceStatus,
|
|
pub custom_emoji: Option<String>,
|
|
pub custom_text: Option<String>,
|
|
pub custom_expires_at: Option<DateTime<Utc>>,
|
|
pub last_seen_at: Option<DateTime<Utc>>,
|
|
pub last_activity: Instant,
|
|
}
|
|
|
|
impl PresenceEntry {
|
|
pub fn new(user_id: Uuid, project_id: Option<Uuid>, status: PresenceStatus) -> Self {
|
|
Self {
|
|
user_id,
|
|
project_id,
|
|
status,
|
|
custom_emoji: None,
|
|
custom_text: None,
|
|
custom_expires_at: None,
|
|
last_seen_at: Some(Utc::now()),
|
|
last_activity: Instant::now(),
|
|
}
|
|
}
|
|
|
|
/// Update the presence status and activity timestamp.
|
|
pub fn update_status(&mut self, status: PresenceStatus) {
|
|
self.status = status;
|
|
self.last_seen_at = Some(Utc::now());
|
|
self.last_activity = Instant::now();
|
|
}
|
|
|
|
/// Update custom status (emoji, text, expires_at).
|
|
pub fn update_custom_status(
|
|
&mut self,
|
|
emoji: Option<String>,
|
|
text: Option<String>,
|
|
expires_at: Option<DateTime<Utc>>,
|
|
) {
|
|
self.custom_emoji = emoji;
|
|
self.custom_text = text;
|
|
self.custom_expires_at = expires_at;
|
|
}
|
|
|
|
/// Compute effective status based on last activity.
|
|
/// If user hasn't sent any presence update for a while, they're "idle".
|
|
pub fn effective_status(&self) -> PresenceStatus {
|
|
match self.status {
|
|
PresenceStatus::Online | PresenceStatus::Idle | PresenceStatus::Dnd => {
|
|
let elapsed = self.last_activity.elapsed();
|
|
if elapsed >= OFFLINE_TIMEOUT {
|
|
PresenceStatus::Offline
|
|
} else if elapsed >= IDLE_TIMEOUT && self.status == PresenceStatus::Online {
|
|
PresenceStatus::Idle
|
|
} else {
|
|
self.status
|
|
}
|
|
}
|
|
PresenceStatus::Offline => PresenceStatus::Offline,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Global presence store - keyed by (project_id, user_id).
|
|
/// project_id = None means "global" presence (across all projects).
|
|
#[derive(Default, Clone)]
|
|
pub struct PresenceStore {
|
|
/// Main presence storage: (project_id, user_id) -> PresenceEntry
|
|
entries: DashMap<(Option<Uuid>, Uuid), PresenceEntry>,
|
|
/// Quick lookup: user_id -> Set of project_ids they're present in
|
|
user_projects: DashMap<Uuid, std::collections::HashSet<Uuid>>,
|
|
}
|
|
|
|
impl PresenceStore {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
entries: DashMap::new(),
|
|
user_projects: DashMap::new(),
|
|
}
|
|
}
|
|
|
|
/// Set user presence in a project context.
|
|
pub fn set_presence(
|
|
&self,
|
|
user_id: Uuid,
|
|
project_id: Option<Uuid>,
|
|
status: PresenceStatus,
|
|
) -> Option<PresenceChanged> {
|
|
let key = (project_id, user_id);
|
|
let now = Utc::now();
|
|
|
|
let mut entry = self.entries.entry(key).or_insert_with(|| {
|
|
// Add to user_projects index
|
|
if let Some(pid) = project_id {
|
|
let mut projects = self.user_projects.entry(user_id).or_default();
|
|
projects.insert(pid);
|
|
}
|
|
PresenceEntry::new(user_id, project_id, status)
|
|
});
|
|
|
|
let old_status = entry.status;
|
|
entry.update_status(status);
|
|
|
|
// Return event if status actually changed
|
|
if old_status != status {
|
|
Some(PresenceChanged {
|
|
user_id,
|
|
project_id,
|
|
status,
|
|
last_seen_at: Some(now),
|
|
})
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Update custom status for a user.
|
|
pub fn set_custom_status(
|
|
&self,
|
|
user_id: Uuid,
|
|
emoji: Option<String>,
|
|
text: Option<String>,
|
|
expires_at: Option<DateTime<Utc>>,
|
|
) -> Option<CustomStatusChanged> {
|
|
// Find the primary presence entry (first one found)
|
|
let key = self
|
|
.entries
|
|
.iter()
|
|
.find(|e| e.user_id == user_id)
|
|
.map(|e| *e.key());
|
|
|
|
if let Some(key) = key {
|
|
if let Some(mut entry) = self.entries.get_mut(&key) {
|
|
entry.update_custom_status(emoji.clone(), text.clone(), expires_at.clone());
|
|
return Some(CustomStatusChanged {
|
|
user_id,
|
|
emoji,
|
|
text,
|
|
expires_at,
|
|
});
|
|
}
|
|
}
|
|
None
|
|
}
|
|
|
|
/// Get presence entry for a user in a specific project.
|
|
pub fn get_presence(&self, user_id: Uuid, project_id: Option<Uuid>) -> Option<PresenceEntry> {
|
|
self.entries.get(&(project_id, user_id)).map(|e| e.clone())
|
|
}
|
|
|
|
/// Get all presence entries for a project.
|
|
pub fn get_project_presence(&self, project_id: Uuid) -> Vec<PresenceChanged> {
|
|
self.entries
|
|
.iter()
|
|
.filter(|entry| entry.key().0 == Some(project_id))
|
|
.map(|entry| PresenceChanged {
|
|
user_id: entry.user_id,
|
|
project_id: entry.project_id,
|
|
status: entry.effective_status(),
|
|
last_seen_at: entry.last_seen_at,
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Get all online users across all projects.
|
|
pub fn get_all_online(&self) -> Vec<PresenceChanged> {
|
|
self.entries
|
|
.iter()
|
|
.filter(|entry| entry.effective_status() != PresenceStatus::Offline)
|
|
.map(|entry| PresenceChanged {
|
|
user_id: entry.user_id,
|
|
project_id: entry.project_id,
|
|
status: entry.effective_status(),
|
|
last_seen_at: entry.last_seen_at,
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Remove user presence when they disconnect.
|
|
pub fn remove_presence(
|
|
&self,
|
|
user_id: Uuid,
|
|
project_id: Option<Uuid>,
|
|
) -> Option<PresenceChanged> {
|
|
let key = (project_id, user_id);
|
|
if let Some((_, entry)) = self.entries.remove(&key) {
|
|
// Remove from user_projects index
|
|
if let Some(pid) = project_id {
|
|
if let Some(mut projects) = self.user_projects.get_mut(&user_id) {
|
|
projects.remove(&pid);
|
|
if projects.is_empty() {
|
|
drop(projects);
|
|
self.user_projects.remove(&user_id);
|
|
}
|
|
}
|
|
}
|
|
return Some(PresenceChanged {
|
|
user_id: entry.user_id,
|
|
project_id,
|
|
status: PresenceStatus::Offline,
|
|
last_seen_at: entry.last_seen_at,
|
|
});
|
|
}
|
|
None
|
|
}
|
|
|
|
/// Get count of online users in a project.
|
|
pub fn project_online_count(&self, project_id: Uuid) -> usize {
|
|
self.entries
|
|
.iter()
|
|
.filter(|entry| {
|
|
entry.key().0 == Some(project_id)
|
|
&& entry.effective_status() != PresenceStatus::Offline
|
|
})
|
|
.count()
|
|
}
|
|
}
|