diff --git a/libs/config/redis.rs b/libs/config/redis.rs index a5c2c86..6975cc1 100644 --- a/libs/config/redis.rs +++ b/libs/config/redis.rs @@ -1,6 +1,14 @@ use crate::AppConfig; impl AppConfig { + /// Returns a single Redis URL (first from APP_REDIS_URLS or APP_REDIS_URL). + pub fn redis_url(&self) -> anyhow::Result { + let urls = self.redis_urls()?; + urls.into_iter().next().ok_or_else(|| { + anyhow::anyhow!("APP_REDIS_URLS or APP_REDIS_URL is empty") + }) + } + pub fn redis_urls(&self) -> anyhow::Result> { if let Some(urls) = self.env.get("APP_REDIS_URLS") { return Ok(urls.split(',').map(|s| s.trim().to_string()).collect()); diff --git a/libs/rpc/Cargo.toml b/libs/rpc/Cargo.toml index d6121f7..94bfb08 100644 --- a/libs/rpc/Cargo.toml +++ b/libs/rpc/Cargo.toml @@ -19,12 +19,13 @@ name = "rpc" tonic = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +tonic-prost = "0.14.5" # Internal session_manager = { workspace = true } -# Logging -slog = { workspace = true } +# Logging / Tracing +tracing = { workspace = true } # Utilities anyhow = { workspace = true } diff --git a/libs/rpc/admin/mod.rs b/libs/rpc/admin/mod.rs index e69de29..bac8801 100644 --- a/libs/rpc/admin/mod.rs +++ b/libs/rpc/admin/mod.rs @@ -0,0 +1,4 @@ +pub mod client; +pub mod generated; +pub mod server; +pub mod types; diff --git a/libs/rpc/admin/server.rs b/libs/rpc/admin/server.rs index 0cd9db6..7f70e0b 100644 --- a/libs/rpc/admin/server.rs +++ b/libs/rpc/admin/server.rs @@ -4,6 +4,7 @@ use session_manager::SessionManager; use std::net::SocketAddr; use tokio::sync::broadcast; use tonic::{transport::Server, Request, Response, Status}; +use tracing::{info_span, Instrument}; use super::generated::admin::{ GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse, @@ -37,14 +38,18 @@ impl SessionAdmin for SessionAdminService { let workspace_id = parse_uuid(&req.get_ref().workspace_id) .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; - let sessions = self - .session_manager - .get_workspace_sessions(&workspace_id) - .await - .map_err(|e| Status::internal(e.to_string()))?; + async { + let sessions = self + .session_manager + .get_workspace_sessions(&workspace_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; - let sessions = sessions.iter().map(to_proto_session).collect(); - Ok(Response::new(ListWorkspaceSessionsResponse { sessions })) + let sessions = sessions.iter().map(to_proto_session).collect(); + Ok(Response::new(ListWorkspaceSessionsResponse { sessions })) + } + .instrument(info_span!("list_workspace_sessions", workspace_id = %workspace_id)) + .await } async fn list_user_sessions( @@ -54,14 +59,18 @@ impl SessionAdmin for SessionAdminService { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; - let sessions = self - .session_manager - .get_user_sessions(&user_id) - .await - .map_err(|e| Status::internal(e.to_string()))?; + async { + let sessions = self + .session_manager + .get_user_sessions(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; - let sessions = sessions.iter().map(to_proto_session).collect(); - Ok(Response::new(ListUserSessionsResponse { sessions })) + let sessions = sessions.iter().map(to_proto_session).collect(); + Ok(Response::new(ListUserSessionsResponse { sessions })) + } + .instrument(info_span!("list_user_sessions", user_id = %user_id)) + .await } async fn kick_user_from_workspace( @@ -74,15 +83,19 @@ impl SessionAdmin for SessionAdminService { let workspace_id = parse_uuid(&r.workspace_id) .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; - let kicked_count = self - .session_manager - .kick_user_from_workspace(&user_id, &workspace_id) - .await - .map_err(|e| Status::internal(e.to_string()))?; + async { + let kicked_count = self + .session_manager + .kick_user_from_workspace(&user_id, &workspace_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new(KickUserFromWorkspaceResponse { - kicked_count: kicked_count as u32, - })) + Ok(Response::new(KickUserFromWorkspaceResponse { + kicked_count: kicked_count as u32, + })) + } + .instrument(info_span!("kick_user_from_workspace", user_id = %user_id, workspace_id = %workspace_id)) + .await } async fn kick_user( @@ -92,15 +105,19 @@ impl SessionAdmin for SessionAdminService { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; - let kicked_count = self - .session_manager - .kick_user(&user_id) - .await - .map_err(|e| Status::internal(e.to_string()))?; + async { + let kicked_count = self + .session_manager + .kick_user(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new(KickUserResponse { - kicked_count: kicked_count as u32, - })) + Ok(Response::new(KickUserResponse { + kicked_count: kicked_count as u32, + })) + } + .instrument(info_span!("kick_user", user_id = %user_id)) + .await } async fn get_user_status( @@ -110,15 +127,19 @@ impl SessionAdmin for SessionAdminService { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; - let status = self - .session_manager - .get_user_status(&user_id) - .await - .map_err(|e| Status::internal(e.to_string()))?; + async { + let status = self + .session_manager + .get_user_status(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new(GetUserStatusResponse { - status: to_proto_status(status) as i32, - })) + Ok(Response::new(GetUserStatusResponse { + status: to_proto_status(status) as i32, + })) + } + .instrument(info_span!("get_user_status", user_id = %user_id)) + .await } async fn get_user_info( @@ -128,15 +149,19 @@ impl SessionAdmin for SessionAdminService { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; - let info = self - .session_manager - .get_user_info(&user_id) - .await - .map_err(|e| Status::internal(e.to_string()))?; + async { + let info = self + .session_manager + .get_user_info(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new(GetUserInfoResponse { - info: info.as_ref().map(to_proto_info), - })) + Ok(Response::new(GetUserInfoResponse { + info: info.as_ref().map(to_proto_info), + })) + } + .instrument(info_span!("get_user_info", user_id = %user_id)) + .await } async fn get_workspace_online_users( @@ -146,14 +171,18 @@ impl SessionAdmin for SessionAdminService { let workspace_id = parse_uuid(&req.get_ref().workspace_id) .ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?; - let user_ids = self - .session_manager - .get_workspace_online_users(&workspace_id) - .await - .map_err(|e| Status::internal(e.to_string()))?; + async { + let user_ids = self + .session_manager + .get_workspace_online_users(&workspace_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; - let user_ids = user_ids.iter().map(|u| u.to_string()).collect(); - Ok(Response::new(GetWorkspaceOnlineUsersResponse { user_ids })) + let user_ids = user_ids.iter().map(|u| u.to_string()).collect(); + Ok(Response::new(GetWorkspaceOnlineUsersResponse { user_ids })) + } + .instrument(info_span!("get_workspace_online_users", workspace_id = %workspace_id)) + .await } async fn is_user_online( @@ -163,13 +192,17 @@ impl SessionAdmin for SessionAdminService { let user_id = parse_uuid(&req.get_ref().user_id) .ok_or_else(|| Status::invalid_argument("invalid user_id"))?; - let online = self - .session_manager - .is_user_online(&user_id) - .await - .map_err(|e| Status::internal(e.to_string()))?; + async { + let online = self + .session_manager + .is_user_online(&user_id) + .await + .map_err(|e| Status::internal(e.to_string()))?; - Ok(Response::new(IsUserOnlineResponse { online })) + Ok(Response::new(IsUserOnlineResponse { online })) + } + .instrument(info_span!("is_user_online", user_id = %user_id)) + .await } } diff --git a/libs/session_manager/Cargo.toml b/libs/session_manager/Cargo.toml index 4a52ec3..c5a92ea 100644 --- a/libs/session_manager/Cargo.toml +++ b/libs/session_manager/Cargo.toml @@ -26,7 +26,7 @@ serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "sync"] } uuid = { workspace = true, features = ["serde", "v4"] } -slog = { workspace = true } +tracing = { workspace = true } [lints] workspace = true