From 57779822dc0b2d530fc90cacfc73ede43ea8253c Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Tue, 21 Apr 2026 22:28:52 +0800 Subject: [PATCH] refactor(room): migrate from slog to tracing + upgrade metrics to 0.22 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove all use slog::* imports and log: slog::Logger fields - Replace slog macros with tracing::{info!, warn!, error!, debug!} - metrics.rs: upgrade metrics 0.21→0.22, remove register_*! macros, use functional API: metrics::gauge!(), metrics::counter!(), metrics::histogram!(), metrics::describe_gauge!() etc. - RoomMetrics: all fields now use functional metrics API, dynamic room_id labels passed as owned String to avoid lifetime issues - RoomService: remove pub log: slog::Logger field - connection.rs: remove log from subscribe_room_events, subscribe_project_room_events, subscribe_task_events_fn --- libs/room/Cargo.toml | 4 +- libs/room/src/connection.rs | 60 +++++++++++---------------- libs/room/src/member.rs | 10 ++--- libs/room/src/message.rs | 13 +++--- libs/room/src/metrics.rs | 76 +++++++++++++--------------------- libs/room/src/room.rs | 14 +++---- libs/room/src/room_ai_queue.rs | 24 ++++------- libs/room/src/service.rs | 63 ++++++++-------------------- 8 files changed, 99 insertions(+), 165 deletions(-) diff --git a/libs/room/Cargo.toml b/libs/room/Cargo.toml index 659e035..c3289e7 100644 --- a/libs/room/Cargo.toml +++ b/libs/room/Cargo.toml @@ -26,7 +26,7 @@ config = { path = "../config" } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -slog = { workspace = true } +tracing = { workspace = true } chrono = { workspace = true, features = ["serde"] } uuid = { workspace = true, features = ["serde", "v7", "v4"] } sea-orm = { workspace = true } @@ -37,7 +37,7 @@ tokio-stream = { workspace = true } futures = { workspace = true } deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] } utoipa = { workspace = true, features = ["uuid", "chrono"] } -metrics = "0.21" +metrics = "0.22" regex-lite = "0.1.6" redis = { workspace = true, features = ["tokio-comp", "connection-manager"] } async-openai = { workspace = true } diff --git a/libs/room/src/connection.rs b/libs/room/src/connection.rs index 545eb6e..3b44211 100644 --- a/libs/room/src/connection.rs +++ b/libs/room/src/connection.rs @@ -783,7 +783,6 @@ fn start_pubsub_thread( channel: String, relay_tx: tokio::sync::mpsc::Sender>, mut shutdown_rx: broadcast::Receiver<()>, - log: slog::Logger, _on_msg: F, ) where F: Fn(Vec) -> Fut + Send + Sync + 'static, @@ -800,14 +799,14 @@ fn start_pubsub_thread( let redis_url = redis_url.clone(); loop { if shutdown_rx.try_recv().is_ok() { - slog::info!(log, "pubsub thread shutting down before connect"; "channel" => %channel); + tracing::info!(channel = %channel, "pubsub thread shutting down before connect"); break; } let client = match redis::Client::open(redis_url.as_str()) { Ok(c) => c, Err(e) => { - slog::error!(log, "pubsub redis client open failed"; "channel" => %channel, "error" => %e); + tracing::error!(channel = %channel, error = %e, "pubsub redis client open failed"); thread::sleep(Duration::from_secs(1)); continue; } @@ -816,16 +815,16 @@ fn start_pubsub_thread( let mut pubsub = match client.get_async_pubsub().await { Ok(p) => p, Err(e) => { - slog::error!(log, "pubsub connection failed"; "channel" => %channel, "error" => %e); + tracing::error!(channel = %channel, error = %e, "pubsub connection failed"); thread::sleep(Duration::from_secs(1)); continue; } }; match pubsub.subscribe(&channel).await { - Ok(_) => slog::info!(log, "pubsub subscribed"; "channel" => %channel), + Ok(_) => tracing::info!(channel = %channel, "pubsub subscribed"), Err(e) => { - slog::error!(log, "pubsub subscribe failed"; "channel" => %channel, "error" => %e); + tracing::error!(channel = %channel, error = %e, "pubsub subscribe failed"); thread::sleep(Duration::from_secs(1)); continue; } @@ -835,7 +834,7 @@ fn start_pubsub_thread( loop { if shutdown_rx.try_recv().is_ok() { - slog::info!(log, "pubsub thread shutting down"; "channel" => %channel); + tracing::info!(channel = %channel, "pubsub thread shutting down"); return; } @@ -848,21 +847,21 @@ fn start_pubsub_thread( match msg { Ok(Some(msg)) => { let payload = msg.get_payload_bytes(); - slog::debug!(log, "pubsub received"; "channel" => %channel, "len" => payload.len()); + tracing::debug!(channel = %channel, len = payload.len(), "pubsub received"); if relay_tx.send(payload.to_vec()).await.is_err() { - slog::warn!(log, "pubsub relay channel closed"; "channel" => %channel); + tracing::warn!(channel = %channel, "pubsub relay channel closed"); return; } } Ok(None) => { - slog::warn!(log, "pubsub stream ended, will reconnect"; "channel" => %channel); + tracing::warn!(channel = %channel, "pubsub stream ended, will reconnect"); break; } Err(_) => {} } } - slog::warn!(log, "pubsub connection lost, reconnecting"; "channel" => %channel); + tracing::warn!(channel = %channel, "pubsub connection lost, reconnecting"); } }); }) @@ -873,15 +872,13 @@ pub async fn subscribe_room_events( redis_url: String, manager: Arc, room_id: Uuid, - log: slog::Logger, mut shutdown_rx: broadcast::Receiver<()>, ) { let channel = format!("room:pub:{}", room_id); let (tx, mut rx) = tokio::sync::mpsc::channel::>(1024); - slog::info!(log, "starting room pubsub subscriber"; "room_id" => %room_id, "channel" => %channel); + tracing::info!(room_id = %room_id, channel = %channel, "starting room pubsub subscriber"); - let thread_log = log.clone(); let thread_channel = channel.clone(); let thread_shutdown = shutdown_rx.resubscribe(); start_pubsub_thread( @@ -889,14 +886,13 @@ pub async fn subscribe_room_events( thread_channel, tx, thread_shutdown, - thread_log, |_| async {}, ); loop { tokio::select! { _ = shutdown_rx.recv() => { - slog::info!(log, "room subscriber shutting down"; "room_id" => %room_id); + tracing::info!(room_id = %room_id, "room subscriber shutting down"); break; } payload = rx.recv() => { @@ -907,34 +903,32 @@ pub async fn subscribe_room_events( manager.broadcast(room_id, event).await; } Err(e) => { - slog::warn!(log, "malformed RoomMessageEvent"; "error" => %e); + tracing::warn!(error = %e, "malformed RoomMessageEvent"); } } } None => { - slog::warn!(log, "pubsub relay channel closed"; "room_id" => %room_id); + tracing::warn!(room_id = %room_id, "pubsub relay channel closed"); break; } } } } } - slog::info!(log, "room subscriber stopped"; "room_id" => %room_id); + tracing::info!(room_id = %room_id, "room subscriber stopped"); } pub async fn subscribe_project_room_events( redis_url: String, manager: Arc, project_id: Uuid, - log: slog::Logger, mut shutdown_rx: broadcast::Receiver<()>, ) { let channel = format!("project:pub:{}", project_id); let (tx, mut rx) = tokio::sync::mpsc::channel::>(1024); - slog::info!(log, "starting project pubsub subscriber"; "project_id" => %project_id, "channel" => %channel); + tracing::info!(project_id = %project_id, channel = %channel, "starting project pubsub subscriber"); - let thread_log = log.clone(); let thread_channel = channel.clone(); let thread_shutdown = shutdown_rx.resubscribe(); start_pubsub_thread( @@ -942,14 +936,13 @@ pub async fn subscribe_project_room_events( thread_channel, tx, thread_shutdown, - thread_log, |_| async {}, ); loop { tokio::select! { _ = shutdown_rx.recv() => { - slog::info!(log, "project subscriber shutting down"; "project_id" => %project_id); + tracing::info!(project_id = %project_id, "project subscriber shutting down"); break; } payload = rx.recv() => { @@ -960,19 +953,19 @@ pub async fn subscribe_project_room_events( manager.broadcast_project(project_id, event).await; } Err(e) => { - slog::warn!(log, "malformed ProjectRoomEvent"; "error" => %e); + tracing::warn!(error = %e, "malformed ProjectRoomEvent"); } } } None => { - slog::warn!(log, "project pubsub relay channel closed"; "project_id" => %project_id); + tracing::warn!(project_id = %project_id, "project pubsub relay channel closed"); break; } } } } } - slog::info!(log, "project subscriber stopped"; "project_id" => %project_id); + tracing::info!(project_id = %project_id, "project subscriber stopped"); } /// Subscribe to Redis Pub/Sub `task:pub:{project_id}` and relay events to @@ -981,15 +974,13 @@ pub async fn subscribe_task_events_fn( redis_url: String, manager: Arc, project_id: Uuid, - log: slog::Logger, mut shutdown_rx: broadcast::Receiver<()>, ) { let channel = format!("task:pub:{}", project_id); let (tx, mut rx) = tokio::sync::mpsc::channel::>(1024); - slog::info!(log, "starting task pubsub subscriber"; "project_id" => %project_id, "channel" => %channel); + tracing::info!(project_id = %project_id, channel = %channel, "starting task pubsub subscriber"); - let thread_log = log.clone(); let thread_channel = channel.clone(); let thread_shutdown = shutdown_rx.resubscribe(); start_pubsub_thread( @@ -997,14 +988,13 @@ pub async fn subscribe_task_events_fn( thread_channel, tx, thread_shutdown, - thread_log, |_| async {}, ); loop { tokio::select! { _ = shutdown_rx.recv() => { - slog::info!(log, "task subscriber shutting down"; "project_id" => %project_id); + tracing::info!(project_id = %project_id, "task subscriber shutting down"); break; } payload = rx.recv() => { @@ -1015,17 +1005,17 @@ pub async fn subscribe_task_events_fn( manager.broadcast_agent_task(project_id, event).await; } Err(e) => { - slog::warn!(log, "malformed AgentTaskEvent"; "error" => %e); + tracing::warn!(error = %e, "malformed AgentTaskEvent"); } } } None => { - slog::warn!(log, "task pubsub relay channel closed"; "project_id" => %project_id); + tracing::warn!(project_id = %project_id, "task pubsub relay channel closed"); break; } } } } } - slog::info!(log, "task subscriber stopped"; "project_id" => %project_id); + tracing::info!(project_id = %project_id, "task subscriber stopped"); } diff --git a/libs/room/src/member.rs b/libs/room/src/member.rs index 2c4a641..139df2f 100644 --- a/libs/room/src/member.rs +++ b/libs/room/src/member.rs @@ -30,13 +30,13 @@ impl RoomService { .await { if let Ok(responses) = serde_json::from_str::>(&cached) { - slog::debug!(self.log, "room_member_list: cache hit for key={}", cache_key); + tracing::debug!(cache_key = %cache_key, "room_member_list: cache hit"); return Ok(responses); } } } - slog::debug!(self.log, "room_member_list: cache miss for key={}", cache_key); + tracing::debug!(cache_key = %cache_key, "room_member_list: cache miss"); let members = room_member::Entity::find() .filter(room_member::Column::Room.eq(room_id)) @@ -92,7 +92,7 @@ impl RoomService { .query_async(&mut conn) .await .inspect_err(|e| { - slog::warn!(self.log, "room_member_list: failed to cache key={}: {}", cache_key, e); + tracing::warn!(cache_key = %cache_key, error = %e, "room_member_list: failed to cache"); }) .ok(); } @@ -424,9 +424,9 @@ impl RoomService { .query_async::(&mut conn) .await { - slog::warn!(self.log, "invalidate_member_list_cache: DEL failed for {}: {}", cache_key, e); + tracing::warn!(cache_key = %cache_key, error = %e, "invalidate_member_list_cache: DEL failed"); } else { - slog::debug!(self.log, "invalidate_member_list_cache: deleted {}", cache_key); + tracing::debug!(cache_key = %cache_key, "invalidate_member_list_cache: deleted"); } } } diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs index d35d759..e1808b2 100644 --- a/libs/room/src/message.rs +++ b/libs/room/src/message.rs @@ -233,11 +233,10 @@ impl RoomService { .exec(&self.db) .await { - slog::warn!( - self.log, - "Failed to link attachments to message {}: {}", - id, - e + tracing::warn!( + message_id = %id, + error = %e, + "Failed to link attachments to message" ); } } @@ -288,7 +287,7 @@ impl RoomService { let should_respond = match self.should_ai_respond(room_id).await { Ok(v) => v, Err(e) => { - slog::warn!(self.log, "should_ai_respond failed for room {}: {}", room_id, e); + tracing::warn!(room_id = %room_id, error = %e, "should_ai_respond failed"); false } }; @@ -302,7 +301,7 @@ impl RoomService { .process_message_ai(room_id, id, user_id, content.clone()) .await { - slog::warn!(self.log, "Failed to process AI message: {}", e); + tracing::warn!(error = %e, "Failed to process AI message"); } } diff --git a/libs/room/src/metrics.rs b/libs/room/src/metrics.rs index 145e97c..38f99b8 100644 --- a/libs/room/src/metrics.rs +++ b/libs/room/src/metrics.rs @@ -1,11 +1,6 @@ -use std::collections::HashMap; use std::sync::Arc; -use metrics::{ - describe_counter, describe_gauge, describe_histogram, register_counter, register_gauge, register_histogram, Counter, - Gauge, Histogram, Unit, -}; -use tokio::sync::RwLock; +use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram, Unit}; use uuid::Uuid; pub struct RoomMetrics { @@ -27,8 +22,6 @@ pub struct RoomMetrics { pub ws_heartbeat_sent_total: Counter, pub ws_heartbeat_timeout_total: Counter, pub ws_idle_timeout_total: Counter, - room_connections: RwLock>, - room_messages: RwLock>, } impl Default for RoomMetrics { @@ -119,26 +112,24 @@ impl Default for RoomMetrics { ); Self { - rooms_online: register_gauge!("room_online_rooms"), - users_online: register_gauge!("room_online_users"), - ws_connections_active: register_gauge!("room_ws_connections_active"), - ws_connections_total: register_counter!("room_ws_connections_total"), - ws_disconnections_total: register_counter!("room_ws_disconnections_total"), - messages_sent: register_counter!("room_messages_sent_total"), - messages_persisted: register_counter!("room_messages_persisted_total"), - messages_persist_failed: register_counter!("room_messages_persist_failed_total"), - broadcasts_sent: register_counter!("room_broadcasts_sent_total"), - broadcasts_dropped: register_counter!("room_broadcasts_dropped_total"), - duplicates_skipped: register_counter!("room_duplicates_skipped_total"), - redis_publish_failed: register_counter!("room_redis_publish_failed_total"), - message_latency_ms: register_histogram!("room_message_latency_ms"), - ws_rate_limit_hits: register_counter!("room_ws_rate_limit_hits_total"), - ws_auth_failures: register_counter!("room_ws_auth_failures_total"), - ws_heartbeat_sent_total: register_counter!("room_ws_heartbeat_sent_total"), - ws_heartbeat_timeout_total: register_counter!("room_ws_heartbeat_timeout_total"), - ws_idle_timeout_total: register_counter!("room_ws_idle_timeout_total"), - room_connections: RwLock::new(HashMap::new()), - room_messages: RwLock::new(HashMap::new()), + rooms_online: metrics::gauge!("room_online_rooms"), + users_online: metrics::gauge!("room_online_users"), + ws_connections_active: metrics::gauge!("room_ws_connections_active"), + ws_connections_total: metrics::counter!("room_ws_connections_total"), + ws_disconnections_total: metrics::counter!("room_ws_disconnections_total"), + messages_sent: metrics::counter!("room_messages_sent_total"), + messages_persisted: metrics::counter!("room_messages_persisted_total"), + messages_persist_failed: metrics::counter!("room_messages_persist_failed_total"), + broadcasts_sent: metrics::counter!("room_broadcasts_sent_total"), + broadcasts_dropped: metrics::counter!("room_broadcasts_dropped_total"), + duplicates_skipped: metrics::counter!("room_duplicates_skipped_total"), + redis_publish_failed: metrics::counter!("room_redis_publish_failed_total"), + message_latency_ms: metrics::histogram!("room_message_latency_ms"), + ws_rate_limit_hits: metrics::counter!("room_ws_rate_limit_hits_total"), + ws_auth_failures: metrics::counter!("room_ws_auth_failures_total"), + ws_heartbeat_sent_total: metrics::counter!("room_ws_heartbeat_sent_total"), + ws_heartbeat_timeout_total: metrics::counter!("room_ws_heartbeat_timeout_total"), + ws_idle_timeout_total: metrics::counter!("room_ws_idle_timeout_total"), } } } @@ -157,34 +148,23 @@ impl RoomMetrics { } pub async fn incr_room_connections(&self, room_id: Uuid) { - let mut map = self.room_connections.write().await; - let counter = map.entry(room_id).or_insert_with(|| { - register_gauge!(format!("room_connections{{room_id=\"{}\"}}", room_id)) - }); - counter.increment(1.0); + let name = format!("room_connections{{room_id=\"{}\"}}", room_id); + metrics::gauge!(name).increment(1.0); } pub async fn dec_room_connections(&self, room_id: Uuid) { - let map = self.room_connections.read().await; - if let Some(counter) = map.get(&room_id) { - counter.decrement(1.0); - } + let name = format!("room_connections{{room_id=\"{}\"}}", room_id); + metrics::gauge!(name).decrement(1.0); } pub async fn incr_room_messages(&self, room_id: Uuid) { - let mut map = self.room_messages.write().await; - let counter = map.entry(room_id).or_insert_with(|| { - register_counter!(format!("room_messages_total{{room_id=\"{}\"}}", room_id)) - }); - counter.increment(1); + let name = format!("room_messages_total{{room_id=\"{}\"}}", room_id); + metrics::counter!(name).increment(1); } - pub async fn cleanup_stale_rooms(&self, active_room_ids: &[Uuid]) { - let mut conn_map = self.room_connections.write().await; - conn_map.retain(|room_id, _| active_room_ids.contains(room_id)); - - let mut msg_map = self.room_messages.write().await; - msg_map.retain(|room_id, _| active_room_ids.contains(room_id)); + #[allow(dead_code)] + pub async fn cleanup_stale_rooms(&self, _active_room_ids: &[Uuid]) { + // Per-room metrics are registered on-demand; no cleanup needed. } pub fn into_arc(self) -> Arc { diff --git a/libs/room/src/room.rs b/libs/room/src/room.rs index 8f34fae..b0d73b2 100644 --- a/libs/room/src/room.rs +++ b/libs/room/src/room.rs @@ -39,13 +39,13 @@ impl RoomService { .await { if let Ok(responses) = serde_json::from_str::>(&cached) { - slog::debug!(self.log, "room_list: cache hit for key={}", cache_key); + tracing::debug!(cache_key = %cache_key, "room_list: cache hit"); return Ok(responses); } } } - slog::debug!(self.log, "room_list: cache miss for key={}", cache_key); + tracing::debug!(cache_key = %cache_key, "room_list: cache miss"); let mut query = room::Entity::find().filter(room::Column::Project.eq(project.id)); if only_public.unwrap_or(false) { @@ -102,7 +102,7 @@ impl RoomService { .query_async(&mut conn) .await .inspect_err(|e| { - slog::warn!(self.log, "room_list: failed to cache key={}: {}", cache_key, e); + tracing::warn!(cache_key = %cache_key, error = %e, "room_list: failed to cache"); }) .ok(); } @@ -364,7 +364,7 @@ impl RoomService { .query_async(&mut conn) .await .inspect_err(|e| { - slog::warn!(self.log, "room_delete: failed to DEL seq key {}: {}", seq_key, e); + tracing::warn!(seq_key = %seq_key, error = %e, "room_delete: failed to DEL seq key"); }) .ok(); } @@ -412,7 +412,7 @@ impl RoomService { { Ok(result) => result, Err(e) => { - slog::warn!(self.log, "invalidate_room_list_cache: SCAN failed: {}", e); + tracing::warn!(error = %e, "invalidate_room_list_cache: SCAN failed"); break; } }; @@ -426,9 +426,9 @@ impl RoomService { .query_async::(&mut conn) .await { - slog::warn!(self.log, "invalidate_room_list_cache: DEL failed: {}", e); + tracing::warn!(error = %e, "invalidate_room_list_cache: DEL failed"); } else { - slog::debug!(self.log, "invalidate_room_list_cache: deleted {} keys", keys.len()); + tracing::debug!(keys_count = keys.len(), "invalidate_room_list_cache: deleted"); } } diff --git a/libs/room/src/room_ai_queue.rs b/libs/room/src/room_ai_queue.rs index 6855262..f837fb9 100644 --- a/libs/room/src/room_ai_queue.rs +++ b/libs/room/src/room_ai_queue.rs @@ -15,7 +15,6 @@ pub struct RoomAiLockGuard { lock_token: String, request_uid: String, acquired: bool, - log: slog::Logger, } impl Drop for RoomAiLockGuard { @@ -29,7 +28,6 @@ impl Drop for RoomAiLockGuard { let lock_key = self.lock_key.clone(); let lock_token = self.lock_token.clone(); let request_uid = self.request_uid.clone(); - let log = self.log.clone(); tokio::spawn(async move { if let Err(e) = release_lock( &cache, @@ -41,12 +39,11 @@ impl Drop for RoomAiLockGuard { ) .await { - slog::warn!( - log, - "RoomAiLockGuard: failed to release lock key={} token={} err={}", - lock_key, - lock_token, - e + tracing::warn!( + lock_key = %lock_key, + lock_token = %lock_token, + error = %e, + "RoomAiLockGuard: failed to release lock" ); } }); @@ -56,7 +53,6 @@ impl Drop for RoomAiLockGuard { pub async fn acquire_room_ai_lock( cache: &AppCache, room_id: Uuid, - log: &slog::Logger, ) -> Result, RoomError> { let request_uid = Uuid::now_v7().to_string(); let hostname = hostname::get() @@ -103,11 +99,10 @@ pub async fn acquire_room_ai_lock( let mut retry_count: u32 = 0; loop { if start.elapsed().as_millis() as usize >= TICKET_TTL_MS { - slog::warn!( - log, - "RoomAiLock: timeout waiting for lock after {}ms, room_id={}", - start.elapsed().as_millis(), - room_id + tracing::warn!( + room_id = %room_id, + elapsed_ms = start.elapsed().as_millis(), + "RoomAiLock: timeout waiting for lock" ); return Ok(None); } @@ -153,7 +148,6 @@ pub async fn acquire_room_ai_lock( lock_token, request_uid, acquired: true, - log: log.clone(), })); } } else { diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index d7a5cd2..c383fab 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -59,7 +59,6 @@ pub struct RoomService { pub redis_url: String, pub chat_service: Option>, pub task_service: Option>, - pub log: slog::Logger, pub push_fn: Option, worker_semaphore: Arc, dedup_cache: DedupCache, @@ -75,7 +74,6 @@ impl RoomService { redis_url: String, chat_service: Option>, task_service: Option>, - log: slog::Logger, max_concurrent_workers: Option, push_fn: Option, ) -> Self { @@ -90,7 +88,6 @@ impl RoomService { redis_url, chat_service, task_service, - log, worker_semaphore: Arc::new(tokio::sync::Semaphore::new( max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS), )), @@ -102,7 +99,6 @@ impl RoomService { pub async fn start_workers( &self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - log: slog::Logger, ) -> anyhow::Result<()> { use models::rooms::Room; use sea_orm::EntityTrait; @@ -119,8 +115,7 @@ impl RoomService { // Save a clone for task subscriber handles before `project_ids` gets moved. let task_project_ids = project_ids.clone(); - slog::info!(log, "starting room workers"; - "room_count" => room_ids.len(), "project_count" => project_ids.len()); + tracing::info!(room_count = room_ids.len(), project_count = project_ids.len(), "starting room workers"); let persist_fn: PersistFn = make_persist_fn( self.db.clone(), @@ -131,7 +126,6 @@ impl RoomService { let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = extract_get_redis(self.queue.clone()); - let worker_log = log.clone(); let worker_room_ids = room_ids.clone(); let worker_shutdown = shutdown_rx.resubscribe(); let worker_handle = tokio::spawn({ @@ -143,21 +137,18 @@ impl RoomService { get_redis, persist_fn, worker_shutdown, - worker_log, ) .await; } }); let manager = self.room_manager.clone(); - let subscriber_log = log.clone(); let redis_url = self.redis_url.clone(); let mut handles: Vec<_> = room_ids .into_iter() .map(|room_id| { let manager = manager.clone(); - let log = subscriber_log.clone(); let redis_url = redis_url.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { @@ -165,7 +156,6 @@ impl RoomService { redis_url, manager, room_id, - log, shutdown_rx, ) .await; @@ -177,7 +167,6 @@ impl RoomService { .into_iter() .map(|project_id| { let manager = manager.clone(); - let log = subscriber_log.clone(); let redis_url = redis_url.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { @@ -185,7 +174,6 @@ impl RoomService { redis_url, manager, project_id, - log, shutdown_rx, ) .await; @@ -199,7 +187,6 @@ impl RoomService { .into_iter() .map(|project_id| { let manager = manager.clone(); - let log = subscriber_log.clone(); let redis_url = redis_url.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { @@ -207,7 +194,6 @@ impl RoomService { redis_url, manager, project_id, - log, shutdown_rx, ) .await; @@ -238,7 +224,7 @@ impl RoomService { } } _ = cleanup_shutdown.recv() => { - slog::info!(slog::Logger::root(slog::Discard, slog::o!()), "cleanup task shutting down"); + tracing::info!("cleanup task shutting down"); break; } } @@ -249,14 +235,14 @@ impl RoomService { let _ = shutdown_rx.recv().await; - slog::info!(log, "room workers shutting down"); + tracing::info!("room workers shutting down"); for h in handles { let _ = h.abort(); } let _ = worker_handle.await; - slog::info!(log, "room workers stopped"); + tracing::info!("room workers stopped"); Ok(()) } @@ -311,7 +297,6 @@ impl RoomService { let queue = self.queue.clone(); let room_manager = self.room_manager.clone(); let semaphore = self.worker_semaphore.clone(); - let log = self.log.clone(); // Spawn the background task. tokio::spawn(async move { @@ -354,7 +339,7 @@ impl RoomService { .publish_agent_task_event(project_id, event.clone()) .await; room_manager.broadcast_agent_task(project_id, event).await; - slog::info!(log, "agent task finished"; "task_id" => task_id, "project_id" => %project_id); + tracing::info!(task_id = task_id, project_id = %project_id, "agent task finished"); }); Ok(task_id) @@ -370,12 +355,9 @@ impl RoomService { extract_get_redis(self.queue.clone()); let manager = self.room_manager.clone(); let redis_url = self.redis_url.clone(); - let log = self.log.clone(); let semaphore = self.worker_semaphore.clone(); let db = self.db.clone(); - let log2 = log.clone(); - let log3 = log.clone(); let manager2 = self.room_manager.clone(); let redis_url2 = redis_url.clone(); let redis_url3 = redis_url.clone(); @@ -392,7 +374,6 @@ impl RoomService { get_redis, persist_fn, shutdown_rx, - log, ) .await; let _ = shutdown_tx.send(()); @@ -404,7 +385,6 @@ impl RoomService { redis_url2, manager.clone(), room_id, - log2, shutdown_rx, ) .await; @@ -427,7 +407,6 @@ impl RoomService { redis_url3, manager2, project_id, - log3, shutdown_rx, ) .await; @@ -480,9 +459,8 @@ impl RoomService { { Ok(m) => m, Err(e) => { - slog::error!(slog::Logger::root(slog::Discard, slog::o!()), - "notify_project_members: failed to fetch members"; - "project_id" => %project_id_inner, "error" => %e); + tracing::error!(project_id = %project_id_inner, error = %e, + "notify_project_members: failed to fetch members"); return; } }; @@ -500,9 +478,8 @@ impl RoomService { ) .await { - slog::warn!(slog::Logger::root(slog::Discard, slog::o!()), - "notify_project_members: failed to create notification for user"; - "user_id" => %user_id, "project_id" => %project_id_inner, "error" => %e); + tracing::warn!(user_id = %user_id, project_id = %project_id_inner, error = %e, + "notify_project_members: failed to create notification for user"); } } }); @@ -865,7 +842,7 @@ impl RoomService { }; let Some(lock_guard) = - crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id, &self.log).await? + crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id).await? else { return Ok(()); }; @@ -962,11 +939,7 @@ impl RoomService { let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await { Ok(s) => s, Err(e) => { - slog::error!( - self.log, - "Failed to get seq for streaming AI message: {}", - e - ); + tracing::error!(error = %e, "Failed to get seq for streaming AI message"); return; } }; @@ -987,7 +960,6 @@ impl RoomService { let db = db.clone(); let model_id = request.model.id; - let log = self.log.clone(); tokio::spawn(async move { let _lock_guard = lock_guard; let room_manager = room_manager.clone(); @@ -1059,7 +1031,7 @@ impl RoomService { }; if let Err(e) = queue.publish(room_id_inner, envelope).await { - slog::error!(log, "Failed to publish streaming AI message: {}", e); + tracing::error!(error = %e, "Failed to publish streaming AI message"); } else { let now = Utc::now(); if let Err(e) = room_ai::Entity::update_many() @@ -1073,7 +1045,7 @@ impl RoomService { .exec(&db) .await { - slog::warn!(log, "Failed to update room_ai call stats: {}", e); + tracing::warn!(error = %e, "Failed to update room_ai call stats"); } let msg_event = queue::RoomMessageEvent { @@ -1109,7 +1081,7 @@ impl RoomService { } } Err(e) => { - slog::error!(log, "AI streaming failed: {}", e); + tracing::error!(error = %e, "AI streaming failed"); let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id: room_id_inner, @@ -1140,7 +1112,6 @@ impl RoomService { let cache = self.cache.clone(); let queue = self.queue.clone(); let room_manager = self.room_manager.clone(); - let log = self.log.clone(); let room_id_for_ai = room_id; let project_id_for_ai = project_id; let model_id_inner = model_id; @@ -1164,7 +1135,7 @@ impl RoomService { ) .await { - slog::error!(log, "Failed to create AI message: {}", e); + tracing::error!(error = %e, "Failed to create AI message"); } else { let now = Utc::now(); if let Err(e) = room_ai::Entity::update_many() @@ -1178,12 +1149,12 @@ impl RoomService { .exec(&db) .await { - slog::warn!(log, "Failed to update room_ai call stats: {}", e); + tracing::warn!(error = %e, "Failed to update room_ai call stats"); } } } Err(e) => { - slog::error!(log, "AI processing failed: {}", e); + tracing::error!(error = %e, "AI processing failed"); } } });