From 9a03cded8d3831edb675ff413cc0f13fe2cd5caf Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Tue, 21 Apr 2026 22:29:09 +0800 Subject: [PATCH] refactor(queue): migrate from slog to tracing - Remove all slog::Logger imports and log fields from RedisPubSub, MessageProducer structs - Remove log parameter from MessageProducer::new() - Replace slog::info!/warn!/error! with tracing equivalents - worker.rs: remove log: slog::Logger from start(), room_worker_task(), start_email_worker(), run_once(), email_run_once() --- libs/queue/Cargo.toml | 2 +- libs/queue/producer.rs | 28 +++++++++++---------------- libs/queue/worker.rs | 44 +++++++++++++++++------------------------- 3 files changed, 30 insertions(+), 44 deletions(-) diff --git a/libs/queue/Cargo.toml b/libs/queue/Cargo.toml index 7c6f461..3aabd66 100644 --- a/libs/queue/Cargo.toml +++ b/libs/queue/Cargo.toml @@ -28,7 +28,7 @@ anyhow = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true, features = ["v7", "v4", "serde"] } chrono = { workspace = true, features = ["serde"] } -slog = { workspace = true } +tracing = { workspace = true } [lints] workspace = true diff --git a/libs/queue/producer.rs b/libs/queue/producer.rs index 9c4f9cd..59384fe 100644 --- a/libs/queue/producer.rs +++ b/libs/queue/producer.rs @@ -14,7 +14,6 @@ pub struct RedisPubSub { /// Shared connection pool; cloned handles share the same pool. pub get_redis: Arc tokio::task::JoinHandle> + Send + Sync>, - pub log: slog::Logger, } impl RedisPubSub { @@ -23,11 +22,11 @@ impl RedisPubSub { let redis = match (self.get_redis)().await { Ok(Ok(c)) => c, Ok(Err(e)) => { - slog::error!(self.log, "redis pool get failed"; "error" => %e); + tracing::error!(error = %e, "redis pool get failed"); return; } Err(_) => { - slog::error!(self.log, "redis pool task panicked"); + tracing::error!("redis pool task panicked"); return; } }; @@ -38,7 +37,7 @@ impl RedisPubSub { .query_async::<()>(&mut conn) .await { - slog::error!(self.log, "Redis PUBLISH failed"; "channel" => %channel, "error" => %e); + tracing::error!(channel = %channel, error = %e, "Redis PUBLISH failed"); } } @@ -48,7 +47,7 @@ impl RedisPubSub { let payload = match serde_json::to_vec(event) { Ok(p) => p, Err(e) => { - slog::error!(self.log, "serialise RoomMessageEvent failed"; "error" => %e); + tracing::error!(error = %e, "serialise RoomMessageEvent failed"); return; } }; @@ -65,7 +64,7 @@ impl RedisPubSub { let payload = match serde_json::to_vec(event) { Ok(p) => p, Err(e) => { - slog::error!(self.log, "serialise ProjectRoomEvent failed"; "error" => %e); + tracing::error!(error = %e, "serialise ProjectRoomEvent failed"); return; } }; @@ -78,7 +77,7 @@ impl RedisPubSub { let payload = match serde_json::to_vec(event) { Ok(p) => p, Err(e) => { - slog::error!(self.log, "serialise AgentTaskEvent failed"; "error" => %e); + tracing::error!(error = %e, "serialise AgentTaskEvent failed"); return; } }; @@ -94,7 +93,6 @@ pub struct MessageProducer { maxlen: i64, /// Redis Pub/Sub client used to fan-out events to all server instances. pub pubsub: Option, - log: slog::Logger, } impl MessageProducer { @@ -104,13 +102,11 @@ impl MessageProducer { >, pubsub: Option, maxlen: i64, - log: slog::Logger, ) -> Self { Self { get_redis, maxlen, pubsub, - log, } } @@ -137,8 +133,7 @@ impl MessageProducer { .await .context("XADD to Redis Stream")?; - slog::info!(self.log, "message queued to stream"; - "room_id" => %room_id, "entry_id" => %entry_id); + tracing::info!(room_id = %room_id, entry_id = %entry_id, "message queued to stream"); // Fan-out via Redis Pub/Sub so all server instances can push to their WS clients. if let Some(pubsub) = &self.pubsub { @@ -156,7 +151,7 @@ impl MessageProducer { event: ProjectRoomEvent, ) { let Some(pubsub) = &self.pubsub else { - slog::warn!(self.log, "pubsub not configured, skipping project event"); + tracing::warn!(project_id = %project_id, "pubsub not configured, skipping project event"); return; }; pubsub.publish_project_room_event(project_id, &event).await; @@ -165,7 +160,7 @@ impl MessageProducer { /// Publish an agent task event via Pub/Sub (no Redis Stream write). pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: AgentTaskEvent) { let Some(pubsub) = &self.pubsub else { - slog::warn!(self.log, "pubsub not configured, skipping task event"); + tracing::warn!(project_id = %project_id, "pubsub not configured, skipping task event"); return; }; pubsub.publish_agent_task_event(project_id, &event).await; @@ -180,7 +175,7 @@ impl MessageProducer { reactions: Vec, ) { let Some(pubsub) = &self.pubsub else { - slog::warn!(self.log, "pubsub not configured, skipping reaction event"); + tracing::warn!(room_id = %room_id, message_id = %message_id, "pubsub not configured, skipping reaction event"); return; }; let event = RoomMessageEvent { @@ -221,8 +216,7 @@ impl MessageProducer { .await .context("XADD email to Redis Stream")?; - slog::info!(self.log, "email queued to stream"; - "to" => %envelope.to, "entry_id" => %entry_id); + tracing::info!(to = %envelope.to, entry_id = %entry_id, "email queued to stream"); Ok(entry_id) } diff --git a/libs/queue/worker.rs b/libs/queue/worker.rs index 153aa6d..1192984 100644 --- a/libs/queue/worker.rs +++ b/libs/queue/worker.rs @@ -32,10 +32,9 @@ pub async fn start( get_redis: GetRedis, persist_fn: PersistFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - log: slog::Logger, ) { let consumer = consumer_id(); - slog::info!(log, "room-message worker starting"; "consumer" => %consumer, "rooms" => ?room_ids); + tracing::info!(consumer = %consumer, rooms = ?room_ids, "room-message worker starting"); let handles: Vec<_> = room_ids .into_iter() @@ -44,10 +43,9 @@ pub async fn start( let persist_fn = persist_fn.clone(); let shutdown = shutdown_rx.resubscribe(); let consumer = consumer.clone(); - let log = log.clone(); tokio::spawn(room_worker_task( - room_id, consumer, get_redis, persist_fn, shutdown, log, + room_id, consumer, get_redis, persist_fn, shutdown, )) }) .collect(); @@ -65,25 +63,24 @@ pub async fn room_worker_task( get_redis: GetRedis, persist_fn: PersistFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - log: slog::Logger, ) { let key = stream_key(room_id); - slog::info!(log, "room worker task started"; "room_id" => %room_id); + tracing::info!(room_id = %room_id, "room worker task started"); loop { tokio::select! { _ = shutdown_rx.recv() => { - slog::info!(log, "room worker task shutting down"; "room_id" => %room_id); + tracing::info!(room_id = %room_id, "room worker task shutting down"); break; } _ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {} } - match run_once(&key, &consumer, &get_redis, &persist_fn, &log).await { + match run_once(&key, &consumer, &get_redis, &persist_fn).await { Ok(0) => {} - Ok(n) => slog::debug!(log, "batch flushed"; "room_id" => %room_id, "n" => n), + Ok(n) => tracing::debug!(room_id = %room_id, n = n, "batch flushed"), Err(e) => { - slog::error!(log, "stream consume error"; "room_id" => %room_id, "error" => %e); + tracing::error!(room_id = %room_id, error = %e, "stream consume error"); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } @@ -95,7 +92,6 @@ async fn run_once( consumer: &str, get_redis: &GetRedis, persist_fn: &PersistFn, - log: &slog::Logger, ) -> anyhow::Result { let redis = (get_redis)().await?; let mut conn: deadpool_redis::cluster::Connection = redis; @@ -134,8 +130,7 @@ async fn run_once( if let Some(data) = extract_field(&field_values, "data") { match serde_json::from_str::(&data) { Ok(env) => batch.push((entry_id, env)), - Err(e) => slog::warn!(log, "malformed envelope"; - "entry_id" => %entry_id, "error" => %e), + Err(e) => tracing::warn!(entry_id = %entry_id, error = %e, "malformed envelope"), } } } @@ -149,7 +144,7 @@ async fn run_once( let envelopes: Vec = batch.into_iter().map(|(_, e)| e).collect(); if let Err(e) = persist_fn(envelopes).await { - slog::error!(log, "persist_fn failed — entries NOT acked (will retry)"; "error" => %e); + tracing::error!(error = %e, "persist_fn failed — entries NOT acked (will retry)"); return Err(e); } @@ -164,7 +159,7 @@ async fn run_once( .await; } - slog::info!(log, "batch persisted and acked"; "n" => entry_ids.len()); + tracing::info!(n = entry_ids.len(), "batch persisted and acked"); Ok(entry_ids.len()) } @@ -191,25 +186,24 @@ pub async fn start_email_worker( get_redis: GetRedis, send_fn: EmailSendFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - log: slog::Logger, ) { let consumer = format!("email-worker-{}", uuid::Uuid::new_v4()); - slog::info!(log, "email worker starting"; "consumer" => %consumer); + tracing::info!(consumer = %consumer, "email worker starting"); loop { tokio::select! { _ = shutdown_rx.recv() => { - slog::info!(log, "email worker shutting down"); + tracing::info!("email worker shutting down"); break; } _ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {} } - match email_run_once(&consumer, &get_redis, &send_fn, &log).await { + match email_run_once(&consumer, &get_redis, &send_fn).await { Ok(0) => {} - Ok(n) => slog::debug!(log, "email batch processed"; "n" => n), + Ok(n) => tracing::debug!(n = n, "email batch processed"), Err(e) => { - slog::error!(log, "email stream consume error"; "error" => %e); + tracing::error!(error = %e, "email stream consume error"); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } @@ -220,7 +214,6 @@ async fn email_run_once( consumer: &str, get_redis: &GetRedis, send_fn: &EmailSendFn, - log: &slog::Logger, ) -> anyhow::Result { let redis = (get_redis)().await?; let mut conn: deadpool_redis::cluster::Connection = redis; @@ -259,8 +252,7 @@ async fn email_run_once( if let Some(data) = extract_field(&field_values, "data") { match serde_json::from_str::(&data) { Ok(env) => batch.push((entry_id, env)), - Err(e) => slog::warn!(log, "malformed email envelope"; - "entry_id" => %entry_id, "error" => %e), + Err(e) => tracing::warn!(entry_id = %entry_id, error = %e, "malformed email envelope"), } } } @@ -274,7 +266,7 @@ async fn email_run_once( let envelopes: Vec = batch.into_iter().map(|(_, e)| e).collect(); if let Err(e) = send_fn(envelopes).await { - slog::error!(log, "email send_fn failed — entries NOT acked (will retry)"; "error" => %e); + tracing::error!(error = %e, "email send_fn failed — entries NOT acked (will retry)"); return Err(e); } @@ -289,6 +281,6 @@ async fn email_run_once( .await; } - slog::info!(log, "email batch sent and acked"; "n" => entry_ids.len()); + tracing::info!(n = entry_ids.len(), "email batch sent and acked"); Ok(entry_ids.len()) }