diff --git a/libs/agent/client.rs b/libs/agent/client.rs index 058bde7..62efea8 100644 --- a/libs/agent/client.rs +++ b/libs/agent/client.rs @@ -14,22 +14,19 @@ use async_openai::types::chat::{ use std::time::Instant; use crate::error::{AgentError, Result}; -use slog::warn; /// Configuration for the AI client. #[derive(Clone)] pub struct AiClientConfig { pub api_key: String, pub base_url: Option, - pub logger: slog::Logger, } impl AiClientConfig { - pub fn new(api_key: String, logger: slog::Logger) -> Self { + pub fn new(api_key: String) -> Self { Self { api_key, base_url: None, - logger, } } @@ -175,12 +172,13 @@ pub async fn call_with_retry( Err(err) => { if state.should_retry() && is_retryable_error(&err) { let duration = state.backoff_duration(); - warn!(config.logger, "ai_call_retry"; - "attempt" => state.attempt + 1, - "max_retries" => state.max_retries, - "backoff_ms" => duration.as_millis() as u64, - "model" => %model, - "error" => %err.to_string() + tracing::warn!( + attempt = state.attempt + 1, + max_retries = state.max_retries, + backoff_ms = duration.as_millis() as u64, + model = %model, + error = %err.to_string(), + "ai_call_retry" ); tokio::time::sleep(duration).await; state.next(); @@ -243,12 +241,13 @@ pub async fn call_with_params( Err(err) => { if state.should_retry() && is_retryable_error(&err) { let duration = state.backoff_duration(); - warn!(config.logger, "ai_call_retry"; - "attempt" => state.attempt + 1, - "max_retries" => state.max_retries, - "backoff_ms" => duration.as_millis() as u64, - "model" => %model, - "error" => %err.to_string() + tracing::warn!( + attempt = state.attempt + 1, + max_retries = state.max_retries, + backoff_ms = duration.as_millis() as u64, + model = %model, + error = %err.to_string(), + "ai_call_retry" ); tokio::time::sleep(duration).await; state.next(); diff --git a/libs/api/room/ws.rs b/libs/api/room/ws.rs index 79e197b..bf165d1 100644 --- a/libs/api/room/ws.rs +++ b/libs/api/room/ws.rs @@ -31,11 +31,11 @@ async fn authenticate_ws_request( }) { match service.ws_token.validate_token(token).await { Ok(uid) => { - slog::debug!(service.logs, "WS: token auth successful for uid={}", uid); + tracing::debug!(uid = %uid, "WS: token auth successful"); return Ok(uid); } Err(_) => { - slog::warn!(service.logs, "WS: token auth failed"); + tracing::warn!("WS: token auth failed"); service .room .room_manager @@ -64,7 +64,6 @@ async fn authenticate_ws_request( } async fn check_ws_rate_limit( - log: &slog::Logger, manager: &Arc, message_count: &mut u32, rate_window_start: &mut Instant, @@ -75,7 +74,7 @@ async fn check_ws_rate_limit( } *message_count += 1; if *message_count > MAX_MESSAGES_PER_SECOND { - slog::warn!(log, "WS rate limit exceeded"); + tracing::warn!("WS rate limit exceeded"); manager.metrics.ws_rate_limit_hits.increment(1); true } else { @@ -351,21 +350,19 @@ pub async fn ws_room( .get("origin") .and_then(|v| v.to_str().ok()) .unwrap_or("(none)"); - slog::debug!( - service.logs, - "WS room connection attempt user_id={} room_id={} origin={}", - user_id, - room_id, - origin_val + tracing::debug!( + user_id = %user_id, + room_id = %room_id, + origin = %origin_val, + "WS room connection attempt" ); if !validate_origin(&req) { - slog::warn!( - service.logs, - "WS room: origin rejected user_id={} room_id={} origin={}", - user_id, - room_id, - origin_val + tracing::warn!( + user_id = %user_id, + room_id = %room_id, + origin = %origin_val, + "WS room: origin rejected" ); service .room @@ -380,12 +377,11 @@ pub async fn ws_room( } if let Err(e) = service.room.check_room_access(room_id, user_id).await { - slog::warn!( - service.logs, - "WS room: access denied for user_id={} room_id={} error={}", - user_id, - room_id, - e + tracing::warn!( + user_id = %user_id, + room_id = %room_id, + error = ?e, + "WS room: access denied" ); return Err(crate::error::ApiError::from(e).into()); } @@ -401,7 +397,7 @@ pub async fn ws_room( let mut receiver = match manager.subscribe(room_id, user_id).await { Ok(r) => r, Err(e) => { - slog::error!(service.logs, "Failed to subscribe to room: {}", e); + tracing::error!(error = ?e, "Failed to subscribe to room"); return; } }; @@ -420,14 +416,14 @@ pub async fn ws_room( tokio::select! { _ = heartbeat_interval.tick() => { if last_heartbeat.elapsed() > HEARTBEAT_TIMEOUT { - slog::warn!(service.logs, "WS room {} heartbeat timeout for user {}", room_id, user_id); + tracing::warn!(room_id = %room_id, user_id = %user_id, "WS room heartbeat timeout"); manager.metrics.ws_heartbeat_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Policy.into())).await; break; } if last_activity.elapsed() > MAX_IDLE_TIMEOUT { - slog::info!(service.logs, "WS room {} idle timeout for user {}", room_id, user_id); + tracing::info!(room_id = %room_id, user_id = %user_id, "WS room idle timeout"); manager.metrics.ws_idle_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; @@ -439,7 +435,7 @@ pub async fn ws_room( manager.metrics.ws_heartbeat_sent_total.increment(1); } _ = shutdown_rx.recv() => { - slog::info!(service.logs, "WS room {} shutdown", room_id); + tracing::info!(room_id = %room_id, "WS room shutdown"); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; } @@ -457,13 +453,13 @@ pub async fn ws_room( #[allow(unused_assignments)] Some(Ok(WsMessage::Text(text))) => { if last_activity.elapsed() > MAX_IDLE_TIMEOUT { - slog::info!(service.logs, "WS room {} idle timeout for user {}", room_id, user_id); + tracing::info!(room_id = %room_id, user_id = %user_id, "WS room idle timeout"); manager.metrics.ws_idle_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; } last_activity = Instant::now(); - if check_ws_rate_limit(&service.logs, &manager, &mut message_count, &mut rate_window_start).await { + if check_ws_rate_limit(&manager, &mut message_count, &mut rate_window_start).await { let _ = session.text(serde_json::json!({ "type": "error", "error": "rate_limit_exceeded", @@ -473,7 +469,7 @@ pub async fn ws_room( } if text.len() > MAX_TEXT_MESSAGE_LEN { - slog::warn!(service.logs, "WS room {} message too long from user {}: {} bytes", room_id, user_id, text.len()); + tracing::warn!(room_id = %room_id, user_id = %user_id, bytes = text.len(), "WS room message too long"); let _ = session.text(serde_json::json!({ "type": "error", "error": "message_too_long", @@ -482,7 +478,7 @@ pub async fn ws_room( break; } - slog::warn!(service.logs, "WS room {} unexpected text message from user {} ({} bytes) — WS is push-only, use REST to send messages", room_id, user_id, text.len()); + tracing::warn!(room_id = %room_id, user_id = %user_id, bytes = text.len(), "WS room unexpected text message — WS is push-only, use REST to send messages"); let _ = session.text(serde_json::json!({ "type": "error", "error": "ws_push_only", @@ -491,10 +487,10 @@ pub async fn ws_room( break; } Some(Ok(WsMessage::Binary(_))) => { - if check_ws_rate_limit(&service.logs, &manager, &mut message_count, &mut rate_window_start).await { + if check_ws_rate_limit(&manager, &mut message_count, &mut rate_window_start).await { break; } - slog::warn!(service.logs, "WS room {} unexpected binary from user {}", room_id, user_id); + tracing::warn!(room_id = %room_id, user_id = %user_id, "WS room unexpected binary"); break; } Some(Ok(WsMessage::Close(reason))) => { @@ -503,7 +499,7 @@ pub async fn ws_room( } Some(Ok(_)) => {} Some(Err(e)) => { - slog::warn!(service.logs, "WS room error: {}", e); + tracing::warn!(error = ?e, "WS room error"); break; } None => break, @@ -525,7 +521,7 @@ pub async fn ws_room( } } Err(e) => { - slog::error!(service.logs, "WS serialize error: {}", e); + tracing::error!(error = ?e, "WS serialize error"); break; } } @@ -549,7 +545,7 @@ pub async fn ws_room( } } Err(e) => { - slog::error!(service.logs, "WS streaming serialize error: {}", e); + tracing::error!(error = ?e, "WS streaming serialize error"); } } } @@ -627,7 +623,7 @@ pub async fn ws_project( let mut receiver = match manager.subscribe_project(project_id, user_id).await { Ok(r) => r, Err(e) => { - slog::error!(service.logs, "Failed to subscribe to project: {}", e); + tracing::error!(error = ?e, "Failed to subscribe to project"); return; } }; @@ -645,14 +641,14 @@ pub async fn ws_project( tokio::select! { _ = heartbeat_interval.tick() => { if last_heartbeat.elapsed() > HEARTBEAT_TIMEOUT { - slog::warn!(service.logs, "WS project {} heartbeat timeout for user {}", project_id, user_id); + tracing::warn!(project_id = %project_id, user_id = %user_id, "WS project heartbeat timeout"); manager.metrics.ws_heartbeat_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Policy.into())).await; break; } if last_activity.elapsed() > MAX_IDLE_TIMEOUT { - slog::info!(service.logs, "WS project {} idle timeout for user {}", project_id, user_id); + tracing::info!(project_id = %project_id, user_id = %user_id, "WS project idle timeout"); manager.metrics.ws_idle_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; @@ -664,7 +660,7 @@ pub async fn ws_project( manager.metrics.ws_heartbeat_sent_total.increment(1); } _ = shutdown_rx.recv() => { - slog::info!(service.logs, "WS project {} shutdown", project_id); + tracing::info!(project_id = %project_id, "WS project shutdown"); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; } @@ -682,13 +678,13 @@ pub async fn ws_project( #[allow(unused_assignments)] Some(Ok(WsMessage::Text(text))) => { if last_activity.elapsed() > MAX_IDLE_TIMEOUT { - slog::info!(service.logs, "WS project {} idle timeout for user {}", project_id, user_id); + tracing::info!(project_id = %project_id, user_id = %user_id, "WS project idle timeout"); manager.metrics.ws_idle_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; } last_activity = Instant::now(); - slog::warn!(service.logs, "WS project {} unexpected text from user {} ({} bytes) — WS is push-only", project_id, user_id, text.len()); + tracing::warn!(project_id = %project_id, user_id = %user_id, bytes = text.len(), "WS project unexpected text — WS is push-only"); let _ = session.text(serde_json::json!({ "type": "error", "error": "ws_push_only", @@ -697,8 +693,8 @@ pub async fn ws_project( break; } Some(Ok(WsMessage::Binary(_))) => { - if check_ws_rate_limit(&service.logs, &manager, &mut message_count, &mut rate_window_start).await { - slog::warn!(service.logs, "WS project {} rate limit exceeded for user {}", project_id, user_id); + if check_ws_rate_limit(&manager, &mut message_count, &mut rate_window_start).await { + tracing::warn!(project_id = %project_id, user_id = %user_id, "WS project rate limit exceeded"); let _ = session.text(serde_json::json!({ "type": "error", "error": "rate_limit_exceeded", @@ -706,7 +702,7 @@ pub async fn ws_project( }).to_string()).await; break; } - slog::warn!(service.logs, "WS project {} unexpected binary from user {}", project_id, user_id); + tracing::warn!(project_id = %project_id, user_id = %user_id, "WS project unexpected binary"); break; } Some(Ok(WsMessage::Close(reason))) => { @@ -715,7 +711,7 @@ pub async fn ws_project( } Some(Ok(_)) => {} Some(Err(e)) => { - slog::warn!(service.logs, "WS project error: {}", e); + tracing::warn!(error = ?e, "WS project error"); break; } None => break, @@ -737,7 +733,7 @@ pub async fn ws_project( } } Err(e) => { - slog::error!(service.logs, "WS serialize error: {}", e); + tracing::error!(error = ?e, "WS serialize error"); break; } } diff --git a/libs/api/room/ws_universal.rs b/libs/api/room/ws_universal.rs index bd8dfc1..bc8ffc6 100644 --- a/libs/api/room/ws_universal.rs +++ b/libs/api/room/ws_universal.rs @@ -62,10 +62,9 @@ pub async fn ws_universal( .and_then(|v| v.to_str().ok()) .unwrap_or("(none)"); if !validate_origin(&req) { - slog::warn!( - service.logs, - "WS universal: origin rejected origin={}", - origin_val + tracing::warn!( + origin = %origin_val, + "WS universal: origin rejected" ); return Err(ApiError(service::error::AppError::BadRequest( "Invalid origin".into(), @@ -83,28 +82,25 @@ pub async fn ws_universal( .find(|p| p.starts_with("token=")) .and_then(|p| p.split('=').nth(1)) }) { - slog::info!( - service.logs, - "WS universal: validating token token={} origin={}", - token, - origin_val + tracing::info!( + token = %token, + origin = %origin_val, + "WS universal: validating token" ); match service.ws_token.validate_token(token).await { Ok(uid) => { - slog::info!( - service.logs, - "WS universal: token auth successful uid={} origin={}", - uid, - origin_val + tracing::info!( + uid = %uid, + origin = %origin_val, + "WS universal: token auth successful" ); uid } Err(e) => { - slog::warn!( - service.logs, - "WS universal: token auth failed: {:?} token={}", - e, - token + tracing::warn!( + error = ?e, + token = %token, + "WS universal: token auth failed" ); service .room @@ -147,11 +143,10 @@ pub async fn ws_universal( } }; - slog::debug!( - service.logs, - "WS universal connection established user_id={} origin={}", - user_id, - origin_val + tracing::debug!( + user_id = %user_id, + origin = %origin_val, + "WS universal connection established" ); let service = service.get_ref().clone(); @@ -159,7 +154,6 @@ pub async fn ws_universal( manager.metrics.ws_connections_active.increment(1.0); manager.metrics.ws_connections_total.increment(1); - let logs = service.logs.clone(); let (response, mut session, mut msg_stream) = actix_ws::handle(&req, stream)?; actix::spawn(async move { let handler = WsRequestHandler::new(Arc::new(service), user_id); @@ -175,13 +169,13 @@ pub async fn ws_universal( tokio::select! { _ = heartbeat_interval.tick() => { if last_heartbeat.elapsed() > HEARTBEAT_TIMEOUT { - slog::warn!(logs, "WS universal heartbeat timeout for user {}", user_id); + tracing::warn!(user_id = %user_id, "WS universal heartbeat timeout"); manager.metrics.ws_heartbeat_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Policy.into())).await; break; } if last_activity.elapsed() > MAX_IDLE_TIMEOUT { - slog::info!(logs, "WS universal idle timeout for user {}", user_id); + tracing::info!(user_id = %user_id, "WS universal idle timeout"); manager.metrics.ws_idle_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; @@ -192,7 +186,7 @@ pub async fn ws_universal( manager.metrics.ws_heartbeat_sent_total.increment(1); } _ = shutdown_rx.recv() => { - slog::info!(logs, "WS universal shutdown"); + tracing::info!("WS universal shutdown"); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; } @@ -264,7 +258,7 @@ pub async fn ws_universal( Some(Ok(WsMessage::Pong(_))) => { last_heartbeat = Instant::now(); } Some(Ok(WsMessage::Text(text))) => { if last_activity.elapsed() > MAX_IDLE_TIMEOUT { - slog::info!(logs, "WS universal idle timeout for user {}", user_id); + tracing::info!(user_id = %user_id, "WS universal idle timeout"); manager.metrics.ws_idle_timeout_total.increment(1); let _ = session.close(Some(actix_ws::CloseCode::Normal.into())).await; break; @@ -277,14 +271,14 @@ pub async fn ws_universal( } message_count += 1; if message_count > MAX_MESSAGES_PER_SECOND { - slog::warn!(logs, "WS universal rate limit exceeded for user {}", user_id); + tracing::warn!(user_id = %user_id, "WS universal rate limit exceeded"); manager.metrics.ws_rate_limit_hits.increment(1); let _ = session.text(serde_json::json!({"type":"error","error":"rate_limit_exceeded"}).to_string()).await; continue; } if text.len() > MAX_TEXT_MESSAGE_LEN { - slog::warn!(logs, "WS universal message too long from user {}: {} bytes", user_id, text.len()); + tracing::warn!(user_id = %user_id, bytes = text.len(), "WS universal message too long"); let _ = session.text(serde_json::json!({"type":"error","error":"message_too_long"}).to_string()).await; continue; } @@ -351,7 +345,7 @@ pub async fn ws_universal( } } Err(e) => { - slog::warn!(logs, "WS universal parse error from user {}: {}", user_id, e); + tracing::warn!(user_id = %user_id, error = %e, "WS universal parse error"); let _ = session.text(serde_json::json!({"type":"error","error":"parse_error"}).to_string()).await; } } @@ -360,7 +354,7 @@ pub async fn ws_universal( Some(Ok(WsMessage::Continuation(_))) => {} Some(Ok(WsMessage::Nop)) => {} Some(Ok(WsMessage::Close(reason))) => { let _ = session.close(reason).await; break; } - Some(Err(e)) => { slog::warn!(logs, "WS error: {}", e); break; } + Some(Err(e)) => { tracing::warn!(error = %e, "WS error"); break; } None => break, } }