feat(observability): Phase 6 OTLP tracing for gRPC + config helper
- libs/rpc: slog → tracing; 8 gRPC methods instrumented with info_span + Instrument for W3C trace propagation - libs/session_manager: slog → tracing dependency - libs/config: add redis_url() singleton helper for adminrpc
This commit is contained in:
parent
1e6ba34827
commit
beae9bdea0
@ -1,6 +1,14 @@
|
||||
use crate::AppConfig;
|
||||
|
||||
impl AppConfig {
|
||||
/// Returns a single Redis URL (first from APP_REDIS_URLS or APP_REDIS_URL).
|
||||
pub fn redis_url(&self) -> anyhow::Result<String> {
|
||||
let urls = self.redis_urls()?;
|
||||
urls.into_iter().next().ok_or_else(|| {
|
||||
anyhow::anyhow!("APP_REDIS_URLS or APP_REDIS_URL is empty")
|
||||
})
|
||||
}
|
||||
|
||||
pub fn redis_urls(&self) -> anyhow::Result<Vec<String>> {
|
||||
if let Some(urls) = self.env.get("APP_REDIS_URLS") {
|
||||
return Ok(urls.split(',').map(|s| s.trim().to_string()).collect());
|
||||
|
||||
@ -19,12 +19,13 @@ name = "rpc"
|
||||
tonic = { workspace = true }
|
||||
prost = { workspace = true }
|
||||
prost-types = { workspace = true }
|
||||
tonic-prost = "0.14.5"
|
||||
|
||||
# Internal
|
||||
session_manager = { workspace = true }
|
||||
|
||||
# Logging
|
||||
slog = { workspace = true }
|
||||
# Logging / Tracing
|
||||
tracing = { workspace = true }
|
||||
|
||||
# Utilities
|
||||
anyhow = { workspace = true }
|
||||
|
||||
@ -0,0 +1,4 @@
|
||||
pub mod client;
|
||||
pub mod generated;
|
||||
pub mod server;
|
||||
pub mod types;
|
||||
@ -4,6 +4,7 @@ use session_manager::SessionManager;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::sync::broadcast;
|
||||
use tonic::{transport::Server, Request, Response, Status};
|
||||
use tracing::{info_span, Instrument};
|
||||
|
||||
use super::generated::admin::{
|
||||
GetUserInfoRequest, GetUserInfoResponse, GetUserStatusRequest, GetUserStatusResponse,
|
||||
@ -37,14 +38,18 @@ impl SessionAdmin for SessionAdminService {
|
||||
let workspace_id = parse_uuid(&req.get_ref().workspace_id)
|
||||
.ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
|
||||
|
||||
let sessions = self
|
||||
.session_manager
|
||||
.get_workspace_sessions(&workspace_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
async {
|
||||
let sessions = self
|
||||
.session_manager
|
||||
.get_workspace_sessions(&workspace_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
|
||||
let sessions = sessions.iter().map(to_proto_session).collect();
|
||||
Ok(Response::new(ListWorkspaceSessionsResponse { sessions }))
|
||||
let sessions = sessions.iter().map(to_proto_session).collect();
|
||||
Ok(Response::new(ListWorkspaceSessionsResponse { sessions }))
|
||||
}
|
||||
.instrument(info_span!("list_workspace_sessions", workspace_id = %workspace_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn list_user_sessions(
|
||||
@ -54,14 +59,18 @@ impl SessionAdmin for SessionAdminService {
|
||||
let user_id = parse_uuid(&req.get_ref().user_id)
|
||||
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
||||
|
||||
let sessions = self
|
||||
.session_manager
|
||||
.get_user_sessions(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
async {
|
||||
let sessions = self
|
||||
.session_manager
|
||||
.get_user_sessions(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
|
||||
let sessions = sessions.iter().map(to_proto_session).collect();
|
||||
Ok(Response::new(ListUserSessionsResponse { sessions }))
|
||||
let sessions = sessions.iter().map(to_proto_session).collect();
|
||||
Ok(Response::new(ListUserSessionsResponse { sessions }))
|
||||
}
|
||||
.instrument(info_span!("list_user_sessions", user_id = %user_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn kick_user_from_workspace(
|
||||
@ -74,15 +83,19 @@ impl SessionAdmin for SessionAdminService {
|
||||
let workspace_id = parse_uuid(&r.workspace_id)
|
||||
.ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
|
||||
|
||||
let kicked_count = self
|
||||
.session_manager
|
||||
.kick_user_from_workspace(&user_id, &workspace_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
async {
|
||||
let kicked_count = self
|
||||
.session_manager
|
||||
.kick_user_from_workspace(&user_id, &workspace_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
|
||||
Ok(Response::new(KickUserFromWorkspaceResponse {
|
||||
kicked_count: kicked_count as u32,
|
||||
}))
|
||||
Ok(Response::new(KickUserFromWorkspaceResponse {
|
||||
kicked_count: kicked_count as u32,
|
||||
}))
|
||||
}
|
||||
.instrument(info_span!("kick_user_from_workspace", user_id = %user_id, workspace_id = %workspace_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn kick_user(
|
||||
@ -92,15 +105,19 @@ impl SessionAdmin for SessionAdminService {
|
||||
let user_id = parse_uuid(&req.get_ref().user_id)
|
||||
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
||||
|
||||
let kicked_count = self
|
||||
.session_manager
|
||||
.kick_user(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
async {
|
||||
let kicked_count = self
|
||||
.session_manager
|
||||
.kick_user(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
|
||||
Ok(Response::new(KickUserResponse {
|
||||
kicked_count: kicked_count as u32,
|
||||
}))
|
||||
Ok(Response::new(KickUserResponse {
|
||||
kicked_count: kicked_count as u32,
|
||||
}))
|
||||
}
|
||||
.instrument(info_span!("kick_user", user_id = %user_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_user_status(
|
||||
@ -110,15 +127,19 @@ impl SessionAdmin for SessionAdminService {
|
||||
let user_id = parse_uuid(&req.get_ref().user_id)
|
||||
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
||||
|
||||
let status = self
|
||||
.session_manager
|
||||
.get_user_status(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
async {
|
||||
let status = self
|
||||
.session_manager
|
||||
.get_user_status(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
|
||||
Ok(Response::new(GetUserStatusResponse {
|
||||
status: to_proto_status(status) as i32,
|
||||
}))
|
||||
Ok(Response::new(GetUserStatusResponse {
|
||||
status: to_proto_status(status) as i32,
|
||||
}))
|
||||
}
|
||||
.instrument(info_span!("get_user_status", user_id = %user_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_user_info(
|
||||
@ -128,15 +149,19 @@ impl SessionAdmin for SessionAdminService {
|
||||
let user_id = parse_uuid(&req.get_ref().user_id)
|
||||
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
||||
|
||||
let info = self
|
||||
.session_manager
|
||||
.get_user_info(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
async {
|
||||
let info = self
|
||||
.session_manager
|
||||
.get_user_info(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
|
||||
Ok(Response::new(GetUserInfoResponse {
|
||||
info: info.as_ref().map(to_proto_info),
|
||||
}))
|
||||
Ok(Response::new(GetUserInfoResponse {
|
||||
info: info.as_ref().map(to_proto_info),
|
||||
}))
|
||||
}
|
||||
.instrument(info_span!("get_user_info", user_id = %user_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_workspace_online_users(
|
||||
@ -146,14 +171,18 @@ impl SessionAdmin for SessionAdminService {
|
||||
let workspace_id = parse_uuid(&req.get_ref().workspace_id)
|
||||
.ok_or_else(|| Status::invalid_argument("invalid workspace_id"))?;
|
||||
|
||||
let user_ids = self
|
||||
.session_manager
|
||||
.get_workspace_online_users(&workspace_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
async {
|
||||
let user_ids = self
|
||||
.session_manager
|
||||
.get_workspace_online_users(&workspace_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
|
||||
let user_ids = user_ids.iter().map(|u| u.to_string()).collect();
|
||||
Ok(Response::new(GetWorkspaceOnlineUsersResponse { user_ids }))
|
||||
let user_ids = user_ids.iter().map(|u| u.to_string()).collect();
|
||||
Ok(Response::new(GetWorkspaceOnlineUsersResponse { user_ids }))
|
||||
}
|
||||
.instrument(info_span!("get_workspace_online_users", workspace_id = %workspace_id))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn is_user_online(
|
||||
@ -163,13 +192,17 @@ impl SessionAdmin for SessionAdminService {
|
||||
let user_id = parse_uuid(&req.get_ref().user_id)
|
||||
.ok_or_else(|| Status::invalid_argument("invalid user_id"))?;
|
||||
|
||||
let online = self
|
||||
.session_manager
|
||||
.is_user_online(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
async {
|
||||
let online = self
|
||||
.session_manager
|
||||
.is_user_online(&user_id)
|
||||
.await
|
||||
.map_err(|e| Status::internal(e.to_string()))?;
|
||||
|
||||
Ok(Response::new(IsUserOnlineResponse { online }))
|
||||
Ok(Response::new(IsUserOnlineResponse { online }))
|
||||
}
|
||||
.instrument(info_span!("is_user_online", user_id = %user_id))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "sync"] }
|
||||
uuid = { workspace = true, features = ["serde", "v4"] }
|
||||
slog = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
Loading…
Reference in New Issue
Block a user