gitdataai/libs/session_manager/src/manager.rs
ZhenYi e99feb236b refactor(core): migrate session_manager, email, rpc from slog to tracing
- session_manager/manager.rs: remove slog::Logger field, update new()
  and with_config() to remove log parameter
- email/lib.rs: remove slog::Logger from AppEmail::init()
- rpc/admin/server.rs: remove slog::Logger from serve() and spawn(),
  replace with tracing::info!/error!
2026-04-21 22:29:43 +08:00

195 lines
5.7 KiB
Rust

use chrono::Utc;
use uuid::Uuid;
use crate::storage::{SessionStorage, SessionStorageError};
use crate::types::{OnlineStatus, SessionInfo, UserSession};
#[derive(Debug, Clone)]
pub struct SessionManagerConfig {
pub heartbeat_interval_secs: u64,
pub idle_threshold_secs: u64,
}
impl Default for SessionManagerConfig {
fn default() -> Self {
Self {
heartbeat_interval_secs: 60,
idle_threshold_secs: 300,
}
}
}
#[derive(Clone)]
pub struct SessionManager {
storage: SessionStorage,
#[allow(dead_code)]
config: SessionManagerConfig,
}
impl SessionManager {
pub fn new(storage: SessionStorage) -> Self {
Self {
storage,
config: SessionManagerConfig::default(),
}
}
pub fn with_config(
storage: SessionStorage,
config: SessionManagerConfig,
) -> Self {
Self {
storage,
config,
}
}
/// Register a new user session. Returns the generated session ID.
pub async fn register_session(
&self,
user_id: Uuid,
workspace_id: Uuid,
ip_address: Option<String>,
user_agent: Option<String>,
) -> Result<UserSession, SessionStorageError> {
let now = Utc::now();
let session = UserSession {
session_id: Uuid::new_v4(),
user_id,
workspace_id,
ip_address,
user_agent,
connected_at: now,
last_heartbeat: now,
};
self.storage.save_session(&session).await?;
tracing::info!(
session_id = %session.session_id,
user_id = %session.user_id,
workspace_id = %session.workspace_id,
"session_registered"
);
Ok(session)
}
/// Refresh a session's heartbeat to keep it alive.
pub async fn heartbeat(&self, session_id: &Uuid) -> Result<(), SessionStorageError> {
self.storage.heartbeat(session_id).await
}
/// Remove a single session (logout from one device/tab).
pub async fn remove_session(&self, session_id: &Uuid) -> Result<(), SessionStorageError> {
self.storage.delete_session(session_id).await?;
tracing::info!(session_id = %session_id, "session_removed");
Ok(())
}
/// Kick a user from a workspace (remove all their sessions in that workspace).
pub async fn kick_user_from_workspace(
&self,
user_id: &Uuid,
workspace_id: &Uuid,
) -> Result<usize, SessionStorageError> {
let deleted = self
.storage
.delete_user_workspace_sessions(user_id, workspace_id)
.await?;
let count = deleted.len();
tracing::info!(
user_id = %user_id,
workspace_id = %workspace_id,
sessions_removed = count,
"user_kicked_from_workspace"
);
Ok(count)
}
/// Kick a user from all workspaces (full logout across all devices).
pub async fn kick_user(&self, user_id: &Uuid) -> Result<usize, SessionStorageError> {
let deleted = self.storage.delete_user_sessions(user_id).await?;
let count = deleted.len();
tracing::info!(user_id = %user_id, sessions_removed = count, "user_kicked");
Ok(count)
}
/// Get all sessions for a user.
pub async fn get_user_sessions(
&self,
user_id: &Uuid,
) -> Result<Vec<UserSession>, SessionStorageError> {
self.storage.get_user_sessions(user_id).await
}
/// Get session info (summary) for a user.
pub async fn get_user_info(
&self,
user_id: &Uuid,
) -> Result<Option<SessionInfo>, SessionStorageError> {
let sessions = self.storage.get_user_sessions(user_id).await?;
if sessions.is_empty() {
return Ok(None);
}
let mut workspaces: Vec<Uuid> = sessions.iter().map(|s| s.workspace_id).collect();
workspaces.sort();
workspaces.dedup();
let latest = sessions.iter().max_by_key(|s| s.connected_at).cloned();
Ok(Some(SessionInfo {
user_id: *user_id,
session_count: sessions.len(),
workspaces,
latest_session: latest,
}))
}
/// Get all active sessions in a workspace.
pub async fn get_workspace_sessions(
&self,
workspace_id: &Uuid,
) -> Result<Vec<UserSession>, SessionStorageError> {
self.storage.get_workspace_sessions(workspace_id).await
}
/// Get distinct user IDs in a workspace.
pub async fn get_workspace_online_users(
&self,
workspace_id: &Uuid,
) -> Result<Vec<Uuid>, SessionStorageError> {
self.storage.get_workspace_online_users(workspace_id).await
}
/// Get online status for a user.
pub async fn get_user_status(
&self,
user_id: &Uuid,
) -> Result<OnlineStatus, SessionStorageError> {
self.storage.get_user_status(user_id).await
}
/// Check if a user is online in any workspace.
pub async fn is_user_online(&self, user_id: &Uuid) -> Result<bool, SessionStorageError> {
self.storage.is_user_online(user_id).await
}
/// Returns a reference to the underlying Redis pool.
pub fn pool(&self) -> &deadpool_redis::cluster::Pool {
self.storage.pool()
}
/// Get online status for multiple users at once.
pub async fn get_bulk_status(
&self,
user_ids: &[Uuid],
) -> Result<Vec<(Uuid, OnlineStatus)>, SessionStorageError> {
let mut results = Vec::with_capacity(user_ids.len());
for uid in user_ids {
let status = self.storage.get_user_status(uid).await?;
results.push((*uid, status));
}
Ok(results)
}
}