From 418f9a5d8b79031ce7dcbdf909030efa5198ed75 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Tue, 21 Apr 2026 13:44:25 +0800 Subject: [PATCH] feat(rpc): migrate admin from Redis Pub/Sub JSON-RPC to Tonic gRPC - libs/rpc/proto/: admin.proto with 8 RPC methods - libs/rpc/admin/: tonic server impl (SessionAdminService), client wrapper (AdminGrpcClient), types, generated/ tonic-prost build output - libs/rpc/build.rs: tonic-prost-build two-step (proto -> message types + manual service defs) - libs/rpc/lib.rs: module re-exports - libs/session_manager/: session manager types used by admin service --- libs/rpc/Cargo.toml | 20 + libs/rpc/admin/client.rs | 168 +++ .../rpc/admin/generated/admin.SessionAdmin.rs | 906 ++++++++++++++++ libs/rpc/admin/generated/admin.rs | 991 ++++++++++++++++++ libs/rpc/admin/generated/mod.rs | 7 + libs/rpc/admin/server.rs | 215 ++++ libs/rpc/admin/types.rs | 68 ++ libs/rpc/build.rs | 93 ++ libs/rpc/lib.rs | 15 +- libs/rpc/proto/admin.proto | 109 ++ libs/session_manager/Cargo.toml | 32 + libs/session_manager/src/lib.rs | 7 + libs/session_manager/src/manager.rs | 197 ++++ libs/session_manager/src/storage.rs | 329 ++++++ libs/session_manager/src/types.rs | 36 + 15 files changed, 3179 insertions(+), 14 deletions(-) create mode 100644 libs/rpc/admin/client.rs create mode 100644 libs/rpc/admin/generated/admin.SessionAdmin.rs create mode 100644 libs/rpc/admin/generated/admin.rs create mode 100644 libs/rpc/admin/generated/mod.rs create mode 100644 libs/rpc/admin/server.rs create mode 100644 libs/rpc/admin/types.rs create mode 100644 libs/rpc/build.rs create mode 100644 libs/rpc/proto/admin.proto create mode 100644 libs/session_manager/Cargo.toml create mode 100644 libs/session_manager/src/lib.rs create mode 100644 libs/session_manager/src/manager.rs create mode 100644 libs/session_manager/src/storage.rs create mode 100644 libs/session_manager/src/types.rs diff --git a/libs/rpc/Cargo.toml b/libs/rpc/Cargo.toml index 9d28dfe..d6121f7 100644 --- a/libs/rpc/Cargo.toml +++ b/libs/rpc/Cargo.toml @@ -15,6 +15,26 @@ documentation.workspace = true path = "lib.rs" name = "rpc" [dependencies] +# gRPC / Prost +tonic = { workspace = true } +prost = { workspace = true } +prost-types = { workspace = true } + +# Internal +session_manager = { workspace = true } + +# Logging +slog = { workspace = true } + +# Utilities +anyhow = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +tokio = { workspace = true, features = ["sync"] } [lints] workspace = true + +[build-dependencies] +tonic-prost-build = "0.14.5" +prost-types = { workspace = true } diff --git a/libs/rpc/admin/client.rs b/libs/rpc/admin/client.rs new file mode 100644 index 0000000..82de891 --- /dev/null +++ b/libs/rpc/admin/client.rs @@ -0,0 +1,168 @@ +//! Tonic gRPC client wrapper for SessionAdmin service. + +use session_manager::{OnlineStatus, SessionInfo, UserSession}; +use tonic::transport::Channel; +use uuid::Uuid; + +use super::generated::admin::{ + ListWorkspaceSessionsRequest, ListUserSessionsRequest, + KickUserFromWorkspaceRequest, KickUserRequest, + GetUserStatusRequest, GetUserInfoRequest, + GetWorkspaceOnlineUsersRequest, IsUserOnlineRequest, +}; +use super::generated::admin_session_admin::session_admin_client::SessionAdminClient; +use super::types::from_proto_status; + +/// Auto-generated gRPC client type. +pub type GrpcClient = SessionAdminClient; + +/// Thin wrapper around the generated SessionAdminClient. +pub struct AdminGrpcClient { + inner: GrpcClient, +} + +impl AdminGrpcClient { + /// Connect to the gRPC server at the given URI. + pub async fn connect(uri: tonic::codegen::http::Uri) -> anyhow::Result { + let inner = SessionAdminClient::connect(uri).await?; + Ok(Self { inner }) + } + + /// Wrap an existing channel. + pub fn new(inner: GrpcClient) -> Self { + Self { inner } + } + + pub async fn list_workspace_sessions( + &mut self, + workspace_id: Uuid, + ) -> anyhow::Result> { + let req = tonic::Request::new(ListWorkspaceSessionsRequest { + workspace_id: workspace_id.to_string(), + }); + let res = self.inner.list_workspace_sessions(req).await + .map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?; + Ok(res.into_inner().sessions + .into_iter() + .map(from_proto_session) + .collect()) + } + + pub async fn list_user_sessions( + &mut self, + user_id: Uuid, + ) -> anyhow::Result> { + let req = tonic::Request::new(ListUserSessionsRequest { + user_id: user_id.to_string(), + }); + let res = self.inner.list_user_sessions(req).await + .map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?; + Ok(res.into_inner().sessions + .into_iter() + .map(from_proto_session) + .collect()) + } + + pub async fn kick_user_from_workspace( + &mut self, + user_id: Uuid, + workspace_id: Uuid, + ) -> anyhow::Result { + let req = tonic::Request::new(KickUserFromWorkspaceRequest { + user_id: user_id.to_string(), + workspace_id: workspace_id.to_string(), + }); + let res = self.inner.kick_user_from_workspace(req).await + .map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?; + Ok(res.into_inner().kicked_count as usize) + } + + pub async fn kick_user(&mut self, user_id: Uuid) -> anyhow::Result { + let req = tonic::Request::new(KickUserRequest { + user_id: user_id.to_string(), + }); + let res = self.inner.kick_user(req).await + .map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?; + Ok(res.into_inner().kicked_count as usize) + } + + pub async fn get_user_status(&mut self, user_id: Uuid) -> anyhow::Result { + let req = tonic::Request::new(GetUserStatusRequest { + user_id: user_id.to_string(), + }); + let res = self.inner.get_user_status(req).await + .map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?; + let status = super::generated::admin::OnlineStatus::try_from(res.get_ref().status) + .unwrap_or(super::generated::admin::OnlineStatus::Offline); + Ok(from_proto_status(status)) + } + + pub async fn get_user_info( + &mut self, + user_id: Uuid, + ) -> anyhow::Result> { + let req = tonic::Request::new(GetUserInfoRequest { + user_id: user_id.to_string(), + }); + let res = self.inner.get_user_info(req).await + .map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?; + Ok(res.into_inner().info.map(|i| from_proto_info(&i))) + } + + pub async fn get_workspace_online_users( + &mut self, + workspace_id: Uuid, + ) -> anyhow::Result> { + let req = tonic::Request::new(GetWorkspaceOnlineUsersRequest { + workspace_id: workspace_id.to_string(), + }); + let res = self.inner.get_workspace_online_users(req).await + .map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?; + res.into_inner().user_ids + .into_iter() + .map(|s| Uuid::parse_str(&s)) + .collect::, _>>() + .map_err(|e| anyhow::anyhow!("invalid UUID in response: {}", e)) + } + + pub async fn is_user_online(&mut self, user_id: Uuid) -> anyhow::Result { + let req = tonic::Request::new(IsUserOnlineRequest { + user_id: user_id.to_string(), + }); + let res = self.inner.is_user_online(req).await + .map_err(|e| anyhow::anyhow!("gRPC error: {}", e))?; + Ok(res.into_inner().online) + } +} + +// --------------------------------------------------------------------------- +// Proto → session_manager type conversions +// --------------------------------------------------------------------------- + +fn from_proto_session(s: super::generated::admin::UserSession) -> UserSession { + UserSession { + session_id: Uuid::parse_str(&s.session_id).unwrap_or_default(), + user_id: Uuid::parse_str(&s.user_id).unwrap_or_default(), + workspace_id: Uuid::parse_str(&s.workspace_id).unwrap_or_default(), + ip_address: s.ip_address, + user_agent: s.user_agent, + connected_at: s.connected_at + .and_then(|ts| chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32)) + .unwrap_or_else(chrono::Utc::now), + last_heartbeat: s.last_heartbeat + .and_then(|ts| chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32)) + .unwrap_or_else(chrono::Utc::now), + } +} + +fn from_proto_info(info: &super::generated::admin::SessionInfo) -> SessionInfo { + SessionInfo { + user_id: Uuid::parse_str(&info.user_id).unwrap_or_default(), + session_count: info.session_count as usize, + workspaces: info.workspaces + .iter() + .filter_map(|w| Uuid::parse_str(w).ok()) + .collect(), + latest_session: info.latest_session.as_ref().map(|s| from_proto_session(s.clone())), + } +} diff --git a/libs/rpc/admin/generated/admin.SessionAdmin.rs b/libs/rpc/admin/generated/admin.SessionAdmin.rs new file mode 100644 index 0000000..46fa12a --- /dev/null +++ b/libs/rpc/admin/generated/admin.SessionAdmin.rs @@ -0,0 +1,906 @@ +/// Generated client implementations. +pub mod session_admin_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct SessionAdminClient { + inner: tonic::client::Grpc, + } + impl SessionAdminClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl SessionAdminClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SessionAdminClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + SessionAdminClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn list_workspace_sessions( + &mut self, + request: impl tonic::IntoRequest< + crate::admin::generated::admin::ListWorkspaceSessionsRequest, + >, + ) -> std::result::Result< + tonic::Response< + crate::admin::generated::admin::ListWorkspaceSessionsResponse, + >, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/ListWorkspaceSessions", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "ListWorkspaceSessions")); + self.inner.unary(req, path, codec).await + } + pub async fn list_user_sessions( + &mut self, + request: impl tonic::IntoRequest< + crate::admin::generated::admin::ListUserSessionsRequest, + >, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/ListUserSessions", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "ListUserSessions")); + self.inner.unary(req, path, codec).await + } + pub async fn kick_user_from_workspace( + &mut self, + request: impl tonic::IntoRequest< + crate::admin::generated::admin::KickUserFromWorkspaceRequest, + >, + ) -> std::result::Result< + tonic::Response< + crate::admin::generated::admin::KickUserFromWorkspaceResponse, + >, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/KickUserFromWorkspace", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "KickUserFromWorkspace")); + self.inner.unary(req, path, codec).await + } + pub async fn kick_user( + &mut self, + request: impl tonic::IntoRequest< + crate::admin::generated::admin::KickUserRequest, + >, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/KickUser", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "KickUser")); + self.inner.unary(req, path, codec).await + } + pub async fn get_user_status( + &mut self, + request: impl tonic::IntoRequest< + crate::admin::generated::admin::GetUserStatusRequest, + >, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/GetUserStatus", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "GetUserStatus")); + self.inner.unary(req, path, codec).await + } + pub async fn get_user_info( + &mut self, + request: impl tonic::IntoRequest< + crate::admin::generated::admin::GetUserInfoRequest, + >, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/GetUserInfo", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "GetUserInfo")); + self.inner.unary(req, path, codec).await + } + pub async fn get_workspace_online_users( + &mut self, + request: impl tonic::IntoRequest< + crate::admin::generated::admin::GetWorkspaceOnlineUsersRequest, + >, + ) -> std::result::Result< + tonic::Response< + crate::admin::generated::admin::GetWorkspaceOnlineUsersResponse, + >, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/GetWorkspaceOnlineUsers", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("admin.SessionAdmin", "GetWorkspaceOnlineUsers"), + ); + self.inner.unary(req, path, codec).await + } + pub async fn is_user_online( + &mut self, + request: impl tonic::IntoRequest< + crate::admin::generated::admin::IsUserOnlineRequest, + >, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/IsUserOnline", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "IsUserOnline")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod session_admin_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with SessionAdminServer. + #[async_trait] + pub trait SessionAdmin: std::marker::Send + std::marker::Sync + 'static { + async fn list_workspace_sessions( + &self, + request: tonic::Request< + crate::admin::generated::admin::ListWorkspaceSessionsRequest, + >, + ) -> std::result::Result< + tonic::Response< + crate::admin::generated::admin::ListWorkspaceSessionsResponse, + >, + tonic::Status, + >; + async fn list_user_sessions( + &self, + request: tonic::Request< + crate::admin::generated::admin::ListUserSessionsRequest, + >, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn kick_user_from_workspace( + &self, + request: tonic::Request< + crate::admin::generated::admin::KickUserFromWorkspaceRequest, + >, + ) -> std::result::Result< + tonic::Response< + crate::admin::generated::admin::KickUserFromWorkspaceResponse, + >, + tonic::Status, + >; + async fn kick_user( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_user_status( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_user_info( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_workspace_online_users( + &self, + request: tonic::Request< + crate::admin::generated::admin::GetWorkspaceOnlineUsersRequest, + >, + ) -> std::result::Result< + tonic::Response< + crate::admin::generated::admin::GetWorkspaceOnlineUsersResponse, + >, + tonic::Status, + >; + async fn is_user_online( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct SessionAdminServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl SessionAdminServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for SessionAdminServer + where + T: SessionAdmin, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/admin.SessionAdmin/ListWorkspaceSessions" => { + #[allow(non_camel_case_types)] + struct ListWorkspaceSessionsSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService< + crate::admin::generated::admin::ListWorkspaceSessionsRequest, + > for ListWorkspaceSessionsSvc { + type Response = crate::admin::generated::admin::ListWorkspaceSessionsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + crate::admin::generated::admin::ListWorkspaceSessionsRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::list_workspace_sessions( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListWorkspaceSessionsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/ListUserSessions" => { + #[allow(non_camel_case_types)] + struct ListUserSessionsSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService< + crate::admin::generated::admin::ListUserSessionsRequest, + > for ListUserSessionsSvc { + type Response = crate::admin::generated::admin::ListUserSessionsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + crate::admin::generated::admin::ListUserSessionsRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::list_user_sessions(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListUserSessionsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/KickUserFromWorkspace" => { + #[allow(non_camel_case_types)] + struct KickUserFromWorkspaceSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService< + crate::admin::generated::admin::KickUserFromWorkspaceRequest, + > for KickUserFromWorkspaceSvc { + type Response = crate::admin::generated::admin::KickUserFromWorkspaceResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + crate::admin::generated::admin::KickUserFromWorkspaceRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::kick_user_from_workspace( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = KickUserFromWorkspaceSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/KickUser" => { + #[allow(non_camel_case_types)] + struct KickUserSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService< + crate::admin::generated::admin::KickUserRequest, + > for KickUserSvc { + type Response = crate::admin::generated::admin::KickUserResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + crate::admin::generated::admin::KickUserRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::kick_user(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = KickUserSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/GetUserStatus" => { + #[allow(non_camel_case_types)] + struct GetUserStatusSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService< + crate::admin::generated::admin::GetUserStatusRequest, + > for GetUserStatusSvc { + type Response = crate::admin::generated::admin::GetUserStatusResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + crate::admin::generated::admin::GetUserStatusRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_user_status(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetUserStatusSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/GetUserInfo" => { + #[allow(non_camel_case_types)] + struct GetUserInfoSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService< + crate::admin::generated::admin::GetUserInfoRequest, + > for GetUserInfoSvc { + type Response = crate::admin::generated::admin::GetUserInfoResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + crate::admin::generated::admin::GetUserInfoRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_user_info(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetUserInfoSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/GetWorkspaceOnlineUsers" => { + #[allow(non_camel_case_types)] + struct GetWorkspaceOnlineUsersSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService< + crate::admin::generated::admin::GetWorkspaceOnlineUsersRequest, + > for GetWorkspaceOnlineUsersSvc { + type Response = crate::admin::generated::admin::GetWorkspaceOnlineUsersResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + crate::admin::generated::admin::GetWorkspaceOnlineUsersRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_workspace_online_users( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetWorkspaceOnlineUsersSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/IsUserOnline" => { + #[allow(non_camel_case_types)] + struct IsUserOnlineSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService< + crate::admin::generated::admin::IsUserOnlineRequest, + > for IsUserOnlineSvc { + type Response = crate::admin::generated::admin::IsUserOnlineResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + crate::admin::generated::admin::IsUserOnlineRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_user_online(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsUserOnlineSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new( + tonic::body::Body::default(), + ); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for SessionAdminServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "admin.SessionAdmin"; + impl tonic::server::NamedService for SessionAdminServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/libs/rpc/admin/generated/admin.rs b/libs/rpc/admin/generated/admin.rs new file mode 100644 index 0000000..eaf3525 --- /dev/null +++ b/libs/rpc/admin/generated/admin.rs @@ -0,0 +1,991 @@ +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct UserSession { + #[prost(string, tag = "1")] + pub session_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub user_id: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub workspace_id: ::prost::alloc::string::String, + #[prost(string, optional, tag = "4")] + pub ip_address: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "5")] + pub user_agent: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag = "6")] + pub connected_at: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "7")] + pub last_heartbeat: ::core::option::Option<::prost_types::Timestamp>, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SessionInfo { + #[prost(string, tag = "1")] + pub user_id: ::prost::alloc::string::String, + #[prost(uint32, tag = "2")] + pub session_count: u32, + #[prost(string, repeated, tag = "3")] + pub workspaces: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(message, optional, tag = "4")] + pub latest_session: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ListWorkspaceSessionsRequest { + #[prost(string, tag = "1")] + pub workspace_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListWorkspaceSessionsResponse { + #[prost(message, repeated, tag = "1")] + pub sessions: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ListUserSessionsRequest { + #[prost(string, tag = "1")] + pub user_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListUserSessionsResponse { + #[prost(message, repeated, tag = "1")] + pub sessions: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct KickUserFromWorkspaceRequest { + #[prost(string, tag = "1")] + pub user_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub workspace_id: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct KickUserFromWorkspaceResponse { + #[prost(uint32, tag = "1")] + pub kicked_count: u32, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct KickUserRequest { + #[prost(string, tag = "1")] + pub user_id: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct KickUserResponse { + #[prost(uint32, tag = "1")] + pub kicked_count: u32, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetUserStatusRequest { + #[prost(string, tag = "1")] + pub user_id: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetUserStatusResponse { + #[prost(enumeration = "OnlineStatus", tag = "1")] + pub status: i32, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetUserInfoRequest { + #[prost(string, tag = "1")] + pub user_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetUserInfoResponse { + #[prost(message, optional, tag = "1")] + pub info: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetWorkspaceOnlineUsersRequest { + #[prost(string, tag = "1")] + pub workspace_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetWorkspaceOnlineUsersResponse { + #[prost(string, repeated, tag = "1")] + pub user_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct IsUserOnlineRequest { + #[prost(string, tag = "1")] + pub user_id: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct IsUserOnlineResponse { + #[prost(bool, tag = "1")] + pub online: bool, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum OnlineStatus { + Unspecified = 0, + Online = 1, + Idle = 2, + Offline = 3, +} +impl OnlineStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "ONLINE_STATUS_UNSPECIFIED", + Self::Online => "ONLINE_STATUS_ONLINE", + Self::Idle => "ONLINE_STATUS_IDLE", + Self::Offline => "ONLINE_STATUS_OFFLINE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "ONLINE_STATUS_UNSPECIFIED" => Some(Self::Unspecified), + "ONLINE_STATUS_ONLINE" => Some(Self::Online), + "ONLINE_STATUS_IDLE" => Some(Self::Idle), + "ONLINE_STATUS_OFFLINE" => Some(Self::Offline), + _ => None, + } + } +} +/// Generated client implementations. +pub mod session_admin_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct SessionAdminClient { + inner: tonic::client::Grpc, + } + impl SessionAdminClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl SessionAdminClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SessionAdminClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + SessionAdminClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn list_workspace_sessions( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/ListWorkspaceSessions", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "ListWorkspaceSessions")); + self.inner.unary(req, path, codec).await + } + pub async fn list_user_sessions( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/ListUserSessions", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "ListUserSessions")); + self.inner.unary(req, path, codec).await + } + pub async fn kick_user_from_workspace( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/KickUserFromWorkspace", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "KickUserFromWorkspace")); + self.inner.unary(req, path, codec).await + } + pub async fn kick_user( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/KickUser", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "KickUser")); + self.inner.unary(req, path, codec).await + } + pub async fn get_user_status( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/GetUserStatus", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "GetUserStatus")); + self.inner.unary(req, path, codec).await + } + pub async fn get_user_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/GetUserInfo", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "GetUserInfo")); + self.inner.unary(req, path, codec).await + } + pub async fn get_workspace_online_users( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/GetWorkspaceOnlineUsers", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("admin.SessionAdmin", "GetWorkspaceOnlineUsers"), + ); + self.inner.unary(req, path, codec).await + } + pub async fn is_user_online( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/admin.SessionAdmin/IsUserOnline", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("admin.SessionAdmin", "IsUserOnline")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod session_admin_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with SessionAdminServer. + #[async_trait] + pub trait SessionAdmin: std::marker::Send + std::marker::Sync + 'static { + async fn list_workspace_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn list_user_sessions( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn kick_user_from_workspace( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn kick_user( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_user_status( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_user_info( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn get_workspace_online_users( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + async fn is_user_online( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + #[derive(Debug)] + pub struct SessionAdminServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl SessionAdminServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for SessionAdminServer + where + T: SessionAdmin, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/admin.SessionAdmin/ListWorkspaceSessions" => { + #[allow(non_camel_case_types)] + struct ListWorkspaceSessionsSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService + for ListWorkspaceSessionsSvc { + type Response = super::ListWorkspaceSessionsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::list_workspace_sessions( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListWorkspaceSessionsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/ListUserSessions" => { + #[allow(non_camel_case_types)] + struct ListUserSessionsSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService + for ListUserSessionsSvc { + type Response = super::ListUserSessionsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::list_user_sessions(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListUserSessionsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/KickUserFromWorkspace" => { + #[allow(non_camel_case_types)] + struct KickUserFromWorkspaceSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService + for KickUserFromWorkspaceSvc { + type Response = super::KickUserFromWorkspaceResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::kick_user_from_workspace( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = KickUserFromWorkspaceSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/KickUser" => { + #[allow(non_camel_case_types)] + struct KickUserSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService + for KickUserSvc { + type Response = super::KickUserResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::kick_user(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = KickUserSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/GetUserStatus" => { + #[allow(non_camel_case_types)] + struct GetUserStatusSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService + for GetUserStatusSvc { + type Response = super::GetUserStatusResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_user_status(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetUserStatusSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/GetUserInfo" => { + #[allow(non_camel_case_types)] + struct GetUserInfoSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService + for GetUserInfoSvc { + type Response = super::GetUserInfoResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_user_info(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetUserInfoSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/GetWorkspaceOnlineUsers" => { + #[allow(non_camel_case_types)] + struct GetWorkspaceOnlineUsersSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService + for GetWorkspaceOnlineUsersSvc { + type Response = super::GetWorkspaceOnlineUsersResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::GetWorkspaceOnlineUsersRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_workspace_online_users( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetWorkspaceOnlineUsersSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/admin.SessionAdmin/IsUserOnline" => { + #[allow(non_camel_case_types)] + struct IsUserOnlineSvc(pub Arc); + impl< + T: SessionAdmin, + > tonic::server::UnaryService + for IsUserOnlineSvc { + type Response = super::IsUserOnlineResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_user_online(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsUserOnlineSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new( + tonic::body::Body::default(), + ); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for SessionAdminServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "admin.SessionAdmin"; + impl tonic::server::NamedService for SessionAdminServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/libs/rpc/admin/generated/mod.rs b/libs/rpc/admin/generated/mod.rs new file mode 100644 index 0000000..f0f5f65 --- /dev/null +++ b/libs/rpc/admin/generated/mod.rs @@ -0,0 +1,7 @@ +// Generated message types from proto/admin.proto. + +pub mod admin; + +// Generated tonic service client/server from manual definitions. +#[path = "admin.SessionAdmin.rs"] +pub mod admin_session_admin; diff --git a/libs/rpc/admin/server.rs b/libs/rpc/admin/server.rs new file mode 100644 index 0000000..18eafb9 --- /dev/null +++ b/libs/rpc/admin/server.rs @@ -0,0 +1,215 @@ +//! Tonic gRPC server implementation for SessionAdmin service. + +use session_manager::SessionManager; +use slog::Logger; +use std::net::SocketAddr; +use tokio::sync::broadcast; +use tonic::{transport::Server, Request, Response, Status}; + +use super::generated::admin::{ + GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse, + GetWorkspaceOnlineUsersRequest, GetWorkspaceOnlineUsersResponse, IsUserOnlineRequest, + IsUserOnlineResponse, KickUserFromWorkspaceRequest, KickUserFromWorkspaceResponse, + KickUserRequest, KickUserResponse, ListUserSessionsRequest, ListUserSessionsResponse, + ListWorkspaceSessionsRequest, ListWorkspaceSessionsResponse, +}; +use super::generated::admin_session_admin::session_admin_server::{ + SessionAdmin, SessionAdminServer, +}; +use super::types::{parse_uuid, to_proto_info, to_proto_session, to_proto_status}; + +#[derive(Clone)] +pub struct SessionAdminService { + session_manager: SessionManager, +} + +impl SessionAdminService { + pub fn new(session_manager: SessionManager) -> Self { + Self { session_manager } + } +} + +#[tonic::async_trait] +impl SessionAdmin for SessionAdminService { + async fn list_workspace_sessions( + &self, + req: Request, + ) -> Result, Status> { + let workspace_id = parse_uuid(&req.get_ref().workspace_id) + .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; + + let sessions = self + .session_manager + .get_workspace_sessions(&workspace_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let sessions = sessions.iter().map(to_proto_session).collect(); + Ok(Response::new(ListWorkspaceSessionsResponse { sessions })) + } + + async fn list_user_sessions( + &self, + req: Request, + ) -> Result, Status> { + let user_id = parse_uuid(&req.get_ref().user_id) + .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; + + let sessions = self + .session_manager + .get_user_sessions(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let sessions = sessions.iter().map(to_proto_session).collect(); + Ok(Response::new(ListUserSessionsResponse { sessions })) + } + + async fn kick_user_from_workspace( + &self, + req: Request, + ) -> Result, Status> { + let r = req.get_ref(); + let user_id = + parse_uuid(&r.user_id).ok_or_else(|| Status::invalid_argument("invalid user_id"))?; + let workspace_id = parse_uuid(&r.workspace_id) + .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; + + let kicked_count = self + .session_manager + .kick_user_from_workspace(&user_id, &workspace_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(KickUserFromWorkspaceResponse { + kicked_count: kicked_count as u32, + })) + } + + async fn kick_user( + &self, + req: Request, + ) -> Result, Status> { + let user_id = parse_uuid(&req.get_ref().user_id) + .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; + + let kicked_count = self + .session_manager + .kick_user(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(KickUserResponse { + kicked_count: kicked_count as u32, + })) + } + + async fn get_user_status( + &self, + req: Request, + ) -> Result, Status> { + let user_id = parse_uuid(&req.get_ref().user_id) + .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; + + let status = self + .session_manager + .get_user_status(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(GetUserStatusResponse { + status: to_proto_status(status) as i32, + })) + } + + async fn get_user_info( + &self, + req: Request, + ) -> Result, Status> { + let user_id = parse_uuid(&req.get_ref().user_id) + .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; + + let info = self + .session_manager + .get_user_info(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(GetUserInfoResponse { + info: info.as_ref().map(to_proto_info), + })) + } + + async fn get_workspace_online_users( + &self, + req: Request, + ) -> Result, Status> { + let workspace_id = parse_uuid(&req.get_ref().workspace_id) + .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; + + let user_ids = self + .session_manager + .get_workspace_online_users(&workspace_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let user_ids = user_ids.iter().map(|u| u.to_string()).collect(); + Ok(Response::new(GetWorkspaceOnlineUsersResponse { user_ids })) + } + + async fn is_user_online( + &self, + req: Request, + ) -> Result, Status> { + let user_id = parse_uuid(&req.get_ref().user_id) + .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; + + let online = self + .session_manager + .is_user_online(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + Ok(Response::new(IsUserOnlineResponse { online })) + } +} + +/// Default gRPC admin port. +pub const DEFAULT_GRPC_PORT: u16 = 9090; + +/// Start the Tonic gRPC server on the given address. +pub async fn serve( + addr: SocketAddr, + session_manager: SessionManager, + log: Logger, +) -> anyhow::Result<()> { + let service = SessionAdminService::new(session_manager); + let incoming = tonic::transport::server::TcpIncoming::bind(addr) + .map_err(|e| anyhow::anyhow!("failed to bind TcpIncoming: {}", e))?; + + slog::info!(log, "Admin gRPC server listening on {}", addr); + + Server::builder() + .add_service(SessionAdminServer::new(service)) + .serve_with_incoming(incoming) + .await?; + + Ok(()) +} + +/// Spawn the gRPC server as a background task. +pub fn spawn( + addr: SocketAddr, + session_manager: SessionManager, + log: Logger, + mut shutdown_rx: broadcast::Receiver<()>, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let result = serve(addr, session_manager, log).await; + if let Err(e) = result { + eprintln!("Admin gRPC server error: {}", e); + } + + let _ = shutdown_rx.recv().await; + }) +} diff --git a/libs/rpc/admin/types.rs b/libs/rpc/admin/types.rs new file mode 100644 index 0000000..d9c7d03 --- /dev/null +++ b/libs/rpc/admin/types.rs @@ -0,0 +1,68 @@ +//! Conversion between proto types and session_manager types. + +use chrono::{DateTime, Utc}; + +use super::generated::admin::{OnlineStatus as ProtoStatus, UserSession as ProtoSession, SessionInfo as ProtoInfo}; + +use session_manager::{OnlineStatus, SessionInfo, UserSession}; + +/// Convert DateTime → prost_types::Timestamp. +fn datetime_to_timestamp(dt: DateTime) -> prost_types::Timestamp { + prost_types::Timestamp { + seconds: dt.timestamp(), + nanos: dt.timestamp_subsec_nanos() as i32, + } +} + +/// Convert session_manager UserSession → proto UserSession. +pub fn to_proto_session(s: &UserSession) -> ProtoSession { + ProtoSession { + session_id: s.session_id.to_string(), + user_id: s.user_id.to_string(), + workspace_id: s.workspace_id.to_string(), + ip_address: s.ip_address.clone(), + user_agent: s.user_agent.clone(), + connected_at: Some(datetime_to_timestamp(s.connected_at)), + last_heartbeat: Some(datetime_to_timestamp(s.last_heartbeat)), + } +} + +/// Convert session_manager OnlineStatus → proto OnlineStatus. +pub fn to_proto_status(s: OnlineStatus) -> ProtoStatus { + match s { + OnlineStatus::Online => ProtoStatus::Online, + OnlineStatus::Idle => ProtoStatus::Idle, + OnlineStatus::Offline => ProtoStatus::Offline, + } +} + +/// Convert session_manager SessionInfo → proto SessionInfo. +pub fn to_proto_info(info: &SessionInfo) -> ProtoInfo { + ProtoInfo { + user_id: info.user_id.to_string(), + session_count: info.session_count as u32, + workspaces: info.workspaces.iter().map(|w| w.to_string()).collect(), + latest_session: info.latest_session.as_ref().map(to_proto_session), + } +} + +/// Convert prost_types::Timestamp → chrono::DateTime. +pub fn prost_ts_to_chrono(ts: &prost_types::Timestamp) -> DateTime { + DateTime::from_timestamp(ts.seconds, ts.nanos as u32) + .unwrap_or_else(Utc::now) +} + +/// Convert proto OnlineStatus → session_manager OnlineStatus. +pub fn from_proto_status(s: ProtoStatus) -> OnlineStatus { + match s { + ProtoStatus::Online => OnlineStatus::Online, + ProtoStatus::Idle => OnlineStatus::Idle, + ProtoStatus::Offline => OnlineStatus::Offline, + ProtoStatus::Unspecified => OnlineStatus::Offline, + } +} + +/// Parse a string UUID. Returns None on parse failure. +pub fn parse_uuid(s: &str) -> Option { + uuid::Uuid::parse_str(s).ok() +} diff --git a/libs/rpc/build.rs b/libs/rpc/build.rs new file mode 100644 index 0000000..f614842 --- /dev/null +++ b/libs/rpc/build.rs @@ -0,0 +1,93 @@ +fn main() -> Result<(), Box> { + let proto_file = "proto/admin.proto"; + let out_dir = "admin/generated"; + std::fs::create_dir_all(out_dir)?; + + tonic_prost_build::configure() + .out_dir(out_dir) + .compile_protos(&[proto_file], &["proto/"])?; + + let service = tonic_prost_build::manual::Service::builder() + .name("SessionAdmin") + .package("admin") + .method( + tonic_prost_build::manual::Method::builder() + .name("list_workspace_sessions") + .route_name("ListWorkspaceSessions") + .input_type("crate::admin::generated::admin::ListWorkspaceSessionsRequest") + .output_type("crate::admin::generated::admin::ListWorkspaceSessionsResponse") + .codec_path("tonic_prost::ProstCodec") + .build(), + ) + .method( + tonic_prost_build::manual::Method::builder() + .name("list_user_sessions") + .route_name("ListUserSessions") + .input_type("crate::admin::generated::admin::ListUserSessionsRequest") + .output_type("crate::admin::generated::admin::ListUserSessionsResponse") + .codec_path("tonic_prost::ProstCodec") + .build(), + ) + .method( + tonic_prost_build::manual::Method::builder() + .name("kick_user_from_workspace") + .route_name("KickUserFromWorkspace") + .input_type("crate::admin::generated::admin::KickUserFromWorkspaceRequest") + .output_type("crate::admin::generated::admin::KickUserFromWorkspaceResponse") + .codec_path("tonic_prost::ProstCodec") + .build(), + ) + .method( + tonic_prost_build::manual::Method::builder() + .name("kick_user") + .route_name("KickUser") + .input_type("crate::admin::generated::admin::KickUserRequest") + .output_type("crate::admin::generated::admin::KickUserResponse") + .codec_path("tonic_prost::ProstCodec") + .build(), + ) + .method( + tonic_prost_build::manual::Method::builder() + .name("get_user_status") + .route_name("GetUserStatus") + .input_type("crate::admin::generated::admin::GetUserStatusRequest") + .output_type("crate::admin::generated::admin::GetUserStatusResponse") + .codec_path("tonic_prost::ProstCodec") + .build(), + ) + .method( + tonic_prost_build::manual::Method::builder() + .name("get_user_info") + .route_name("GetUserInfo") + .input_type("crate::admin::generated::admin::GetUserInfoRequest") + .output_type("crate::admin::generated::admin::GetUserInfoResponse") + .codec_path("tonic_prost::ProstCodec") + .build(), + ) + .method( + tonic_prost_build::manual::Method::builder() + .name("get_workspace_online_users") + .route_name("GetWorkspaceOnlineUsers") + .input_type("crate::admin::generated::admin::GetWorkspaceOnlineUsersRequest") + .output_type("crate::admin::generated::admin::GetWorkspaceOnlineUsersResponse") + .codec_path("tonic_prost::ProstCodec") + .build(), + ) + .method( + tonic_prost_build::manual::Method::builder() + .name("is_user_online") + .route_name("IsUserOnline") + .input_type("crate::admin::generated::admin::IsUserOnlineRequest") + .output_type("crate::admin::generated::admin::IsUserOnlineResponse") + .codec_path("tonic_prost::ProstCodec") + .build(), + ) + .build(); + + tonic_prost_build::manual::Builder::new() + .out_dir(out_dir) + .build_transport(true) + .compile(&[service]); + + Ok(()) +} diff --git a/libs/rpc/lib.rs b/libs/rpc/lib.rs index b93cf3f..92918b0 100644 --- a/libs/rpc/lib.rs +++ b/libs/rpc/lib.rs @@ -1,14 +1 @@ -pub fn add(left: u64, right: u64) -> u64 { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} +pub mod admin; diff --git a/libs/rpc/proto/admin.proto b/libs/rpc/proto/admin.proto new file mode 100644 index 0000000..dcacf07 --- /dev/null +++ b/libs/rpc/proto/admin.proto @@ -0,0 +1,109 @@ +syntax = "proto3"; + +package admin; + +import "google/protobuf/timestamp.proto"; + +// --------------------------------------------------------------------------- +// Session entities +// --------------------------------------------------------------------------- + +message UserSession { + string session_id = 1; + string user_id = 2; + string workspace_id = 3; + optional string ip_address = 4; + optional string user_agent = 5; + google.protobuf.Timestamp connected_at = 6; + google.protobuf.Timestamp last_heartbeat = 7; +} + +message SessionInfo { + string user_id = 1; + uint32 session_count = 2; + repeated string workspaces = 3; + optional UserSession latest_session = 4; +} + +enum OnlineStatus { + ONLINE_STATUS_UNSPECIFIED = 0; + ONLINE_STATUS_ONLINE = 1; + ONLINE_STATUS_IDLE = 2; + ONLINE_STATUS_OFFLINE = 3; +} + +// --------------------------------------------------------------------------- +// Requests & Responses +// --------------------------------------------------------------------------- + +message ListWorkspaceSessionsRequest { + string workspace_id = 1; +} +message ListWorkspaceSessionsResponse { + repeated UserSession sessions = 1; +} + +message ListUserSessionsRequest { + string user_id = 1; +} +message ListUserSessionsResponse { + repeated UserSession sessions = 1; +} + +message KickUserFromWorkspaceRequest { + string user_id = 1; + string workspace_id = 2; +} +message KickUserFromWorkspaceResponse { + uint32 kicked_count = 1; +} + +message KickUserRequest { + string user_id = 1; +} +message KickUserResponse { + uint32 kicked_count = 1; +} + +message GetUserStatusRequest { + string user_id = 1; +} +message GetUserStatusResponse { + OnlineStatus status = 1; +} + +message GetUserInfoRequest { + string user_id = 1; +} +message GetUserInfoResponse { + optional SessionInfo info = 1; +} + +message GetWorkspaceOnlineUsersRequest { + string workspace_id = 1; +} +message GetWorkspaceOnlineUsersResponse { + repeated string user_ids = 1; +} + +message IsUserOnlineRequest { + string user_id = 1; +} +message IsUserOnlineResponse { + bool online = 1; +} + +// --------------------------------------------------------------------------- +// Service +// --------------------------------------------------------------------------- + +service SessionAdmin { + rpc ListWorkspaceSessions(ListWorkspaceSessionsRequest) returns (ListWorkspaceSessionsResponse); + rpc ListUserSessions(ListUserSessionsRequest) returns (ListUserSessionsResponse); + rpc KickUserFromWorkspace(KickUserFromWorkspaceRequest) returns (KickUserFromWorkspaceResponse); + rpc KickUser(KickUserRequest) returns (KickUserResponse); + rpc GetUserStatus(GetUserStatusRequest) returns (GetUserStatusResponse); + rpc GetUserInfo(GetUserInfoRequest) returns (GetUserInfoResponse); + rpc GetWorkspaceOnlineUsers(GetWorkspaceOnlineUsersRequest) returns (GetWorkspaceOnlineUsersResponse); + rpc IsUserOnline(IsUserOnlineRequest) returns (IsUserOnlineResponse); +} diff --git a/libs/session_manager/Cargo.toml b/libs/session_manager/Cargo.toml new file mode 100644 index 0000000..4a52ec3 --- /dev/null +++ b/libs/session_manager/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "session_manager" +version.workspace = true +edition.workspace = true +authors.workspace = true +description.workspace = true +repository.workspace = true +readme.workspace = true +homepage.workspace = true +license.workspace = true +keywords.workspace = true +categories.workspace = true +documentation.workspace = true +[lib] +path = "src/lib.rs" +name = "session_manager" + +[dependencies] +anyhow = { workspace = true } +chrono = { workspace = true, features = ["serde"] } +deadpool-redis = { workspace = true, features = ["cluster"] } +rand = { workspace = true } +redis = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } +uuid = { workspace = true, features = ["serde", "v4"] } +slog = { workspace = true } + +[lints] +workspace = true diff --git a/libs/session_manager/src/lib.rs b/libs/session_manager/src/lib.rs new file mode 100644 index 0000000..580202d --- /dev/null +++ b/libs/session_manager/src/lib.rs @@ -0,0 +1,7 @@ +mod manager; +mod storage; +mod types; + +pub use manager::{SessionManager, SessionManagerConfig}; +pub use storage::SessionStorage; +pub use types::{OnlineStatus, SessionInfo, UserSession}; diff --git a/libs/session_manager/src/manager.rs b/libs/session_manager/src/manager.rs new file mode 100644 index 0000000..69fd26d --- /dev/null +++ b/libs/session_manager/src/manager.rs @@ -0,0 +1,197 @@ +use chrono::Utc; +use uuid::Uuid; + +use crate::storage::{SessionStorage, SessionStorageError}; +use crate::types::{OnlineStatus, SessionInfo, UserSession}; +use slog::info; + +#[derive(Debug, Clone)] +pub struct SessionManagerConfig { + pub heartbeat_interval_secs: u64, + pub idle_threshold_secs: u64, +} + +impl Default for SessionManagerConfig { + fn default() -> Self { + Self { + heartbeat_interval_secs: 60, + idle_threshold_secs: 300, + } + } +} + +#[derive(Clone)] +pub struct SessionManager { + storage: SessionStorage, + #[allow(dead_code)] + config: SessionManagerConfig, + logger: slog::Logger, +} + +impl SessionManager { + pub fn new(storage: SessionStorage, logger: slog::Logger) -> Self { + Self { + storage, + config: SessionManagerConfig::default(), + logger, + } + } + + pub fn with_config( + storage: SessionStorage, + config: SessionManagerConfig, + logger: slog::Logger, + ) -> Self { + Self { + storage, + config, + logger, + } + } + + /// Register a new user session. Returns the generated session ID. + pub async fn register_session( + &self, + user_id: Uuid, + workspace_id: Uuid, + ip_address: Option, + user_agent: Option, + ) -> Result { + let now = Utc::now(); + let session = UserSession { + session_id: Uuid::new_v4(), + user_id, + workspace_id, + ip_address, + user_agent, + connected_at: now, + last_heartbeat: now, + }; + + self.storage.save_session(&session).await?; + info!(self.logger, "session_registered"; + "session_id" => %session.session_id, + "user_id" => %session.user_id, + "workspace_id" => %session.workspace_id + ); + Ok(session) + } + + /// Refresh a session's heartbeat to keep it alive. + pub async fn heartbeat(&self, session_id: &Uuid) -> Result<(), SessionStorageError> { + self.storage.heartbeat(session_id).await + } + + /// Remove a single session (logout from one device/tab). + pub async fn remove_session(&self, session_id: &Uuid) -> Result<(), SessionStorageError> { + self.storage.delete_session(session_id).await?; + info!(self.logger, "session_removed"; "session_id" => %session_id); + Ok(()) + } + + /// Kick a user from a workspace (remove all their sessions in that workspace). + pub async fn kick_user_from_workspace( + &self, + user_id: &Uuid, + workspace_id: &Uuid, + ) -> Result { + let deleted = self + .storage + .delete_user_workspace_sessions(user_id, workspace_id) + .await?; + let count = deleted.len(); + info!(self.logger, "user_kicked_from_workspace"; + "user_id" => %user_id, + "workspace_id" => %workspace_id, + "sessions_removed" => count + ); + Ok(count) + } + + /// Kick a user from all workspaces (full logout across all devices). + pub async fn kick_user(&self, user_id: &Uuid) -> Result { + let deleted = self.storage.delete_user_sessions(user_id).await?; + let count = deleted.len(); + info!(self.logger, "user_kicked"; "user_id" => %user_id, "sessions_removed" => count); + Ok(count) + } + + /// Get all sessions for a user. + pub async fn get_user_sessions( + &self, + user_id: &Uuid, + ) -> Result, SessionStorageError> { + self.storage.get_user_sessions(user_id).await + } + + /// Get session info (summary) for a user. + pub async fn get_user_info( + &self, + user_id: &Uuid, + ) -> Result, SessionStorageError> { + let sessions = self.storage.get_user_sessions(user_id).await?; + if sessions.is_empty() { + return Ok(None); + } + + let mut workspaces: Vec = sessions.iter().map(|s| s.workspace_id).collect(); + workspaces.sort(); + workspaces.dedup(); + + let latest = sessions.iter().max_by_key(|s| s.connected_at).cloned(); + + Ok(Some(SessionInfo { + user_id: *user_id, + session_count: sessions.len(), + workspaces, + latest_session: latest, + })) + } + + /// Get all active sessions in a workspace. + pub async fn get_workspace_sessions( + &self, + workspace_id: &Uuid, + ) -> Result, SessionStorageError> { + self.storage.get_workspace_sessions(workspace_id).await + } + + /// Get distinct user IDs in a workspace. + pub async fn get_workspace_online_users( + &self, + workspace_id: &Uuid, + ) -> Result, SessionStorageError> { + self.storage.get_workspace_online_users(workspace_id).await + } + + /// Get online status for a user. + pub async fn get_user_status( + &self, + user_id: &Uuid, + ) -> Result { + self.storage.get_user_status(user_id).await + } + + /// Check if a user is online in any workspace. + pub async fn is_user_online(&self, user_id: &Uuid) -> Result { + self.storage.is_user_online(user_id).await + } + + /// Returns a reference to the underlying Redis pool. + pub fn pool(&self) -> &deadpool_redis::cluster::Pool { + self.storage.pool() + } + + /// Get online status for multiple users at once. + pub async fn get_bulk_status( + &self, + user_ids: &[Uuid], + ) -> Result, SessionStorageError> { + let mut results = Vec::with_capacity(user_ids.len()); + for uid in user_ids { + let status = self.storage.get_user_status(uid).await?; + results.push((*uid, status)); + } + Ok(results) + } +} diff --git a/libs/session_manager/src/storage.rs b/libs/session_manager/src/storage.rs new file mode 100644 index 0000000..ecb4b33 --- /dev/null +++ b/libs/session_manager/src/storage.rs @@ -0,0 +1,329 @@ +use anyhow::Context; +use chrono::Utc; +use deadpool_redis::cluster::Pool; +use redis::AsyncCommands; +use serde_json; +use thiserror::Error; +use uuid::Uuid; + +use crate::types::UserSession; + +#[derive(Error, Debug)] +pub enum SessionStorageError { + #[error("Redis error: {0}")] + Redis(#[from] anyhow::Error), + + #[error("session not found: {0}")] + NotFound(Uuid), +} + +const KEY_CONN: &str = "user:conn:"; +const KEY_USER_SESSIONS: &str = "user:user_sessions:"; +const KEY_WORKSPACE_SESSIONS: &str = "user:workspace_sessions:"; + +#[derive(Clone)] +pub struct SessionStorage { + pool: Pool, + heartbeat_ttl_secs: u64, +} + +impl SessionStorage { + pub fn new(pool: Pool) -> Self { + Self { + pool, + heartbeat_ttl_secs: 120, + } + } + + pub fn with_heartbeat_ttl(mut self, ttl_secs: u64) -> Self { + self.heartbeat_ttl_secs = ttl_secs; + self + } + + async fn get_conn(&self) -> Result { + self.pool + .get() + .await + .context("failed to get Redis connection from pool") + .map_err(SessionStorageError::Redis) + } + + fn conn_key(session_id: &Uuid) -> String { + format!("{KEY_CONN}{session_id}") + } + + fn user_sessions_key(user_id: &Uuid) -> String { + format!("{KEY_USER_SESSIONS}{user_id}") + } + + fn workspace_sessions_key(workspace_id: &Uuid) -> String { + format!("{KEY_WORKSPACE_SESSIONS}{workspace_id}") + } + + fn to_err(e: E) -> SessionStorageError { + SessionStorageError::Redis(anyhow::anyhow!(e)) + } + + /// Store a new user session and associate it with user + workspace indexes. + pub async fn save_session(&self, session: &UserSession) -> Result<(), SessionStorageError> { + let mut conn = self.get_conn().await?; + let key = Self::conn_key(&session.session_id); + let user_key = Self::user_sessions_key(&session.user_id); + let ws_key = Self::workspace_sessions_key(&session.workspace_id); + + let value = serde_json::to_string(session) + .context("serialize UserSession") + .map_err(SessionStorageError::Redis)?; + + let ttl = self.heartbeat_ttl_secs; + + let _: () = redis::pipe() + .set_ex(&key, &value, ttl) + .sadd(&user_key, session_id_str(&session.session_id)) + .expire(&user_key, 0) + .sadd(&ws_key, session_id_str(&session.session_id)) + .expire(&ws_key, 0) + .query_async(&mut conn) + .await + .map_err(Self::to_err)?; + + Ok(()) + } + + /// Get a session by its ID. + pub async fn get_session(&self, session_id: &Uuid) -> Result, SessionStorageError> { + let mut conn = self.get_conn().await?; + let key = Self::conn_key(session_id); + + let value: Option = conn + .get(&key) + .await + .map_err(Self::to_err)?; + + match value { + Some(v) => { + let session: UserSession = serde_json::from_str(&v) + .context("deserialize UserSession") + .map_err(SessionStorageError::Redis)?; + Ok(Some(session)) + } + None => Ok(None), + } + } + + /// Update the heartbeat timestamp and refresh TTL. + pub async fn heartbeat(&self, session_id: &Uuid) -> Result<(), SessionStorageError> { + let mut conn = self.get_conn().await?; + let key = Self::conn_key(session_id); + let ttl = self.heartbeat_ttl_secs; + let updated = Utc::now(); + + let script = redis::Script::new( + r#" + local v = redis.call('GET', KEYS[1]) + if not v then return 0 end + local session = cjson.decode(v) + session.last_heartbeat = ARGV[1] + redis.call('SETEX', KEYS[1], ARGV[2], cjson.encode(session)) + return 1 + "#, + ); + + let result: i64 = script + .key(&key) + .arg(updated.to_rfc3339()) + .arg(ttl) + .invoke_async(&mut conn) + .await + .map_err(Self::to_err)?; + + if result == 0 { + return Err(SessionStorageError::NotFound(*session_id)); + } + + Ok(()) + } + + /// Delete a session by ID and clean up indexes. + pub async fn delete_session(&self, session_id: &Uuid) -> Result<(), SessionStorageError> { + let session = self.get_session(session_id).await?; + + let key = Self::conn_key(session_id); + + let _: () = self + .get_conn() + .await? + .del(&key) + .await + .map_err(Self::to_err)?; + + if let Some(ref s) = session { + let mut conn = self.get_conn().await?; + let user_key = Self::user_sessions_key(&s.user_id); + let ws_key = Self::workspace_sessions_key(&s.workspace_id); + let id_str = session_id_str(session_id); + let _: () = conn.srem::<_, _, ()>(&user_key, &id_str).await.map_err(Self::to_err)?; + let _: () = conn.srem::<_, _, ()>(&ws_key, &id_str).await.map_err(Self::to_err)?; + } + + Ok(()) + } + + /// Delete all sessions for a specific user. + pub async fn delete_user_sessions(&self, user_id: &Uuid) -> Result, SessionStorageError> { + let mut conn = self.get_conn().await?; + let user_key = Self::user_sessions_key(user_id); + + let session_ids: Vec = conn + .smembers(&user_key) + .await + .map_err(Self::to_err)?; + + let mut deleted = Vec::new(); + for id_str in &session_ids { + if let Ok(sid) = Uuid::parse_str(id_str) { + let conn_key = Self::conn_key(&sid); + let _: () = conn.del(&conn_key).await.map_err(Self::to_err)?; + deleted.push(sid); + } + } + + let _: () = conn.del(&user_key).await.map_err(Self::to_err)?; + + Ok(deleted) + } + + /// Delete all sessions for a user within a specific workspace. + pub async fn delete_user_workspace_sessions( + &self, + user_id: &Uuid, + workspace_id: &Uuid, + ) -> Result, SessionStorageError> { + let mut conn = self.get_conn().await?; + let ws_key = Self::workspace_sessions_key(workspace_id); + let user_key = Self::user_sessions_key(user_id); + + let ws_session_ids: Vec = conn + .smembers(&ws_key) + .await + .map_err(Self::to_err)?; + + let mut deleted = Vec::new(); + for id_str in &ws_session_ids { + if let Ok(sid) = Uuid::parse_str(id_str) { + let session = self.get_session(&sid).await?; + if let Some(ref s) = session { + if s.user_id == *user_id { + let conn_key = Self::conn_key(&sid); + let _: () = conn.del(&conn_key).await.map_err(Self::to_err)?; + let _: () = conn.srem::<_, _, ()>(&user_key, id_str).await.map_err(Self::to_err)?; + deleted.push(sid); + } + } + } + } + + Ok(deleted) + } + + /// Get all active sessions for a user. + pub async fn get_user_sessions(&self, user_id: &Uuid) -> Result, SessionStorageError> { + let mut conn = self.get_conn().await?; + let user_key = Self::user_sessions_key(user_id); + + let session_ids: Vec = conn + .smembers(&user_key) + .await + .map_err(Self::to_err)?; + + let mut sessions = Vec::new(); + for id_str in &session_ids { + if let Ok(sid) = Uuid::parse_str(id_str) { + if let Ok(Some(session)) = self.get_session(&sid).await { + sessions.push(session); + } + } + } + + Ok(sessions) + } + + /// Get all active sessions in a workspace. + pub async fn get_workspace_sessions( + &self, + workspace_id: &Uuid, + ) -> Result, SessionStorageError> { + let mut conn = self.get_conn().await?; + let ws_key = Self::workspace_sessions_key(workspace_id); + + let session_ids: Vec = conn + .smembers(&ws_key) + .await + .map_err(Self::to_err)?; + + let mut sessions = Vec::new(); + for id_str in &session_ids { + if let Ok(sid) = Uuid::parse_str(id_str) { + if let Ok(Some(session)) = self.get_session(&sid).await { + sessions.push(session); + } + } + } + + Ok(sessions) + } + + /// Get distinct user IDs active in a workspace. + pub async fn get_workspace_online_users( + &self, + workspace_id: &Uuid, + ) -> Result, SessionStorageError> { + let sessions = self.get_workspace_sessions(workspace_id).await?; + let mut seen = std::collections::HashSet::new(); + let mut result = Vec::new(); + for s in sessions { + if seen.insert(s.user_id) { + result.push(s.user_id); + } + } + Ok(result) + } + + /// Get the count of online sessions for a user. + pub async fn get_user_session_count(&self, user_id: &Uuid) -> Result { + let sessions = self.get_user_sessions(user_id).await?; + Ok(sessions.len()) + } + + /// Check if a user has any active sessions (online status). + pub async fn is_user_online(&self, user_id: &Uuid) -> Result { + let count = self.get_user_session_count(user_id).await?; + Ok(count > 0) + } + + /// Returns a reference to the underlying Redis pool. + pub fn pool(&self) -> &deadpool_redis::cluster::Pool { + &self.pool + } + + /// Get online status for a user. + pub async fn get_user_status(&self, user_id: &Uuid) -> Result { + let sessions = self.get_user_sessions(user_id).await?; + if sessions.is_empty() { + return Ok(crate::types::OnlineStatus::Offline); + } + + let now = Utc::now(); + let idle_threshold = chrono::Duration::minutes(5); + + if sessions.iter().any(|s| now - s.last_heartbeat < idle_threshold) { + Ok(crate::types::OnlineStatus::Online) + } else { + Ok(crate::types::OnlineStatus::Idle) + } + } +} + +fn session_id_str(id: &Uuid) -> String { + id.to_string() +} diff --git a/libs/session_manager/src/types.rs b/libs/session_manager/src/types.rs new file mode 100644 index 0000000..9eba4a8 --- /dev/null +++ b/libs/session_manager/src/types.rs @@ -0,0 +1,36 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum OnlineStatus { + Online, + Idle, + Offline, +} + +impl Default for OnlineStatus { + fn default() -> Self { + Self::Offline + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UserSession { + pub session_id: Uuid, + pub user_id: Uuid, + pub workspace_id: Uuid, + pub ip_address: Option, + pub user_agent: Option, + pub connected_at: DateTime, + pub last_heartbeat: DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SessionInfo { + pub user_id: Uuid, + pub session_count: usize, + pub workspaces: Vec, + pub latest_session: Option, +}