refactor(queue): migrate from slog to tracing

- Remove all slog::Logger imports and log fields from RedisPubSub,
  MessageProducer structs
- Remove log parameter from MessageProducer::new()
- Replace slog::info!/warn!/error! with tracing equivalents
- worker.rs: remove log: slog::Logger from start(), room_worker_task(),
  start_email_worker(), run_once(), email_run_once()
This commit is contained in:
ZhenYi 2026-04-21 22:29:09 +08:00
parent 57779822dc
commit 9a03cded8d
3 changed files with 30 additions and 44 deletions

View File

@ -28,7 +28,7 @@ anyhow = { workspace = true }
thiserror = { workspace = true } thiserror = { workspace = true }
uuid = { workspace = true, features = ["v7", "v4", "serde"] } uuid = { workspace = true, features = ["v7", "v4", "serde"] }
chrono = { workspace = true, features = ["serde"] } chrono = { workspace = true, features = ["serde"] }
slog = { workspace = true } tracing = { workspace = true }
[lints] [lints]
workspace = true workspace = true

View File

@ -14,7 +14,6 @@ pub struct RedisPubSub {
/// Shared connection pool; cloned handles share the same pool. /// Shared connection pool; cloned handles share the same pool.
pub get_redis: pub get_redis:
Arc<dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<RedisConn>> + Send + Sync>, Arc<dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<RedisConn>> + Send + Sync>,
pub log: slog::Logger,
} }
impl RedisPubSub { impl RedisPubSub {
@ -23,11 +22,11 @@ impl RedisPubSub {
let redis = match (self.get_redis)().await { let redis = match (self.get_redis)().await {
Ok(Ok(c)) => c, Ok(Ok(c)) => c,
Ok(Err(e)) => { Ok(Err(e)) => {
slog::error!(self.log, "redis pool get failed"; "error" => %e); tracing::error!(error = %e, "redis pool get failed");
return; return;
} }
Err(_) => { Err(_) => {
slog::error!(self.log, "redis pool task panicked"); tracing::error!("redis pool task panicked");
return; return;
} }
}; };
@ -38,7 +37,7 @@ impl RedisPubSub {
.query_async::<()>(&mut conn) .query_async::<()>(&mut conn)
.await .await
{ {
slog::error!(self.log, "Redis PUBLISH failed"; "channel" => %channel, "error" => %e); tracing::error!(channel = %channel, error = %e, "Redis PUBLISH failed");
} }
} }
@ -48,7 +47,7 @@ impl RedisPubSub {
let payload = match serde_json::to_vec(event) { let payload = match serde_json::to_vec(event) {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
slog::error!(self.log, "serialise RoomMessageEvent failed"; "error" => %e); tracing::error!(error = %e, "serialise RoomMessageEvent failed");
return; return;
} }
}; };
@ -65,7 +64,7 @@ impl RedisPubSub {
let payload = match serde_json::to_vec(event) { let payload = match serde_json::to_vec(event) {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
slog::error!(self.log, "serialise ProjectRoomEvent failed"; "error" => %e); tracing::error!(error = %e, "serialise ProjectRoomEvent failed");
return; return;
} }
}; };
@ -78,7 +77,7 @@ impl RedisPubSub {
let payload = match serde_json::to_vec(event) { let payload = match serde_json::to_vec(event) {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
slog::error!(self.log, "serialise AgentTaskEvent failed"; "error" => %e); tracing::error!(error = %e, "serialise AgentTaskEvent failed");
return; return;
} }
}; };
@ -94,7 +93,6 @@ pub struct MessageProducer {
maxlen: i64, maxlen: i64,
/// Redis Pub/Sub client used to fan-out events to all server instances. /// Redis Pub/Sub client used to fan-out events to all server instances.
pub pubsub: Option<RedisPubSub>, pub pubsub: Option<RedisPubSub>,
log: slog::Logger,
} }
impl MessageProducer { impl MessageProducer {
@ -104,13 +102,11 @@ impl MessageProducer {
>, >,
pubsub: Option<RedisPubSub>, pubsub: Option<RedisPubSub>,
maxlen: i64, maxlen: i64,
log: slog::Logger,
) -> Self { ) -> Self {
Self { Self {
get_redis, get_redis,
maxlen, maxlen,
pubsub, pubsub,
log,
} }
} }
@ -137,8 +133,7 @@ impl MessageProducer {
.await .await
.context("XADD to Redis Stream")?; .context("XADD to Redis Stream")?;
slog::info!(self.log, "message queued to stream"; tracing::info!(room_id = %room_id, entry_id = %entry_id, "message queued to stream");
"room_id" => %room_id, "entry_id" => %entry_id);
// Fan-out via Redis Pub/Sub so all server instances can push to their WS clients. // Fan-out via Redis Pub/Sub so all server instances can push to their WS clients.
if let Some(pubsub) = &self.pubsub { if let Some(pubsub) = &self.pubsub {
@ -156,7 +151,7 @@ impl MessageProducer {
event: ProjectRoomEvent, event: ProjectRoomEvent,
) { ) {
let Some(pubsub) = &self.pubsub else { let Some(pubsub) = &self.pubsub else {
slog::warn!(self.log, "pubsub not configured, skipping project event"); tracing::warn!(project_id = %project_id, "pubsub not configured, skipping project event");
return; return;
}; };
pubsub.publish_project_room_event(project_id, &event).await; pubsub.publish_project_room_event(project_id, &event).await;
@ -165,7 +160,7 @@ impl MessageProducer {
/// Publish an agent task event via Pub/Sub (no Redis Stream write). /// Publish an agent task event via Pub/Sub (no Redis Stream write).
pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: AgentTaskEvent) { pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: AgentTaskEvent) {
let Some(pubsub) = &self.pubsub else { let Some(pubsub) = &self.pubsub else {
slog::warn!(self.log, "pubsub not configured, skipping task event"); tracing::warn!(project_id = %project_id, "pubsub not configured, skipping task event");
return; return;
}; };
pubsub.publish_agent_task_event(project_id, &event).await; pubsub.publish_agent_task_event(project_id, &event).await;
@ -180,7 +175,7 @@ impl MessageProducer {
reactions: Vec<ReactionGroup>, reactions: Vec<ReactionGroup>,
) { ) {
let Some(pubsub) = &self.pubsub else { let Some(pubsub) = &self.pubsub else {
slog::warn!(self.log, "pubsub not configured, skipping reaction event"); tracing::warn!(room_id = %room_id, message_id = %message_id, "pubsub not configured, skipping reaction event");
return; return;
}; };
let event = RoomMessageEvent { let event = RoomMessageEvent {
@ -221,8 +216,7 @@ impl MessageProducer {
.await .await
.context("XADD email to Redis Stream")?; .context("XADD email to Redis Stream")?;
slog::info!(self.log, "email queued to stream"; tracing::info!(to = %envelope.to, entry_id = %entry_id, "email queued to stream");
"to" => %envelope.to, "entry_id" => %entry_id);
Ok(entry_id) Ok(entry_id)
} }

View File

@ -32,10 +32,9 @@ pub async fn start(
get_redis: GetRedis, get_redis: GetRedis,
persist_fn: PersistFn, persist_fn: PersistFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
log: slog::Logger,
) { ) {
let consumer = consumer_id(); let consumer = consumer_id();
slog::info!(log, "room-message worker starting"; "consumer" => %consumer, "rooms" => ?room_ids); tracing::info!(consumer = %consumer, rooms = ?room_ids, "room-message worker starting");
let handles: Vec<_> = room_ids let handles: Vec<_> = room_ids
.into_iter() .into_iter()
@ -44,10 +43,9 @@ pub async fn start(
let persist_fn = persist_fn.clone(); let persist_fn = persist_fn.clone();
let shutdown = shutdown_rx.resubscribe(); let shutdown = shutdown_rx.resubscribe();
let consumer = consumer.clone(); let consumer = consumer.clone();
let log = log.clone();
tokio::spawn(room_worker_task( tokio::spawn(room_worker_task(
room_id, consumer, get_redis, persist_fn, shutdown, log, room_id, consumer, get_redis, persist_fn, shutdown,
)) ))
}) })
.collect(); .collect();
@ -65,25 +63,24 @@ pub async fn room_worker_task(
get_redis: GetRedis, get_redis: GetRedis,
persist_fn: PersistFn, persist_fn: PersistFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
log: slog::Logger,
) { ) {
let key = stream_key(room_id); let key = stream_key(room_id);
slog::info!(log, "room worker task started"; "room_id" => %room_id); tracing::info!(room_id = %room_id, "room worker task started");
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
slog::info!(log, "room worker task shutting down"; "room_id" => %room_id); tracing::info!(room_id = %room_id, "room worker task shutting down");
break; break;
} }
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {} _ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
} }
match run_once(&key, &consumer, &get_redis, &persist_fn, &log).await { match run_once(&key, &consumer, &get_redis, &persist_fn).await {
Ok(0) => {} Ok(0) => {}
Ok(n) => slog::debug!(log, "batch flushed"; "room_id" => %room_id, "n" => n), Ok(n) => tracing::debug!(room_id = %room_id, n = n, "batch flushed"),
Err(e) => { Err(e) => {
slog::error!(log, "stream consume error"; "room_id" => %room_id, "error" => %e); tracing::error!(room_id = %room_id, error = %e, "stream consume error");
tokio::time::sleep(std::time::Duration::from_secs(2)).await; tokio::time::sleep(std::time::Duration::from_secs(2)).await;
} }
} }
@ -95,7 +92,6 @@ async fn run_once(
consumer: &str, consumer: &str,
get_redis: &GetRedis, get_redis: &GetRedis,
persist_fn: &PersistFn, persist_fn: &PersistFn,
log: &slog::Logger,
) -> anyhow::Result<usize> { ) -> anyhow::Result<usize> {
let redis = (get_redis)().await?; let redis = (get_redis)().await?;
let mut conn: deadpool_redis::cluster::Connection = redis; let mut conn: deadpool_redis::cluster::Connection = redis;
@ -134,8 +130,7 @@ async fn run_once(
if let Some(data) = extract_field(&field_values, "data") { if let Some(data) = extract_field(&field_values, "data") {
match serde_json::from_str::<RoomMessageEnvelope>(&data) { match serde_json::from_str::<RoomMessageEnvelope>(&data) {
Ok(env) => batch.push((entry_id, env)), Ok(env) => batch.push((entry_id, env)),
Err(e) => slog::warn!(log, "malformed envelope"; Err(e) => tracing::warn!(entry_id = %entry_id, error = %e, "malformed envelope"),
"entry_id" => %entry_id, "error" => %e),
} }
} }
} }
@ -149,7 +144,7 @@ async fn run_once(
let envelopes: Vec<RoomMessageEnvelope> = batch.into_iter().map(|(_, e)| e).collect(); let envelopes: Vec<RoomMessageEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
if let Err(e) = persist_fn(envelopes).await { if let Err(e) = persist_fn(envelopes).await {
slog::error!(log, "persist_fn failed — entries NOT acked (will retry)"; "error" => %e); tracing::error!(error = %e, "persist_fn failed — entries NOT acked (will retry)");
return Err(e); return Err(e);
} }
@ -164,7 +159,7 @@ async fn run_once(
.await; .await;
} }
slog::info!(log, "batch persisted and acked"; "n" => entry_ids.len()); tracing::info!(n = entry_ids.len(), "batch persisted and acked");
Ok(entry_ids.len()) Ok(entry_ids.len())
} }
@ -191,25 +186,24 @@ pub async fn start_email_worker(
get_redis: GetRedis, get_redis: GetRedis,
send_fn: EmailSendFn, send_fn: EmailSendFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
log: slog::Logger,
) { ) {
let consumer = format!("email-worker-{}", uuid::Uuid::new_v4()); let consumer = format!("email-worker-{}", uuid::Uuid::new_v4());
slog::info!(log, "email worker starting"; "consumer" => %consumer); tracing::info!(consumer = %consumer, "email worker starting");
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
slog::info!(log, "email worker shutting down"); tracing::info!("email worker shutting down");
break; break;
} }
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {} _ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
} }
match email_run_once(&consumer, &get_redis, &send_fn, &log).await { match email_run_once(&consumer, &get_redis, &send_fn).await {
Ok(0) => {} Ok(0) => {}
Ok(n) => slog::debug!(log, "email batch processed"; "n" => n), Ok(n) => tracing::debug!(n = n, "email batch processed"),
Err(e) => { Err(e) => {
slog::error!(log, "email stream consume error"; "error" => %e); tracing::error!(error = %e, "email stream consume error");
tokio::time::sleep(std::time::Duration::from_secs(2)).await; tokio::time::sleep(std::time::Duration::from_secs(2)).await;
} }
} }
@ -220,7 +214,6 @@ async fn email_run_once(
consumer: &str, consumer: &str,
get_redis: &GetRedis, get_redis: &GetRedis,
send_fn: &EmailSendFn, send_fn: &EmailSendFn,
log: &slog::Logger,
) -> anyhow::Result<usize> { ) -> anyhow::Result<usize> {
let redis = (get_redis)().await?; let redis = (get_redis)().await?;
let mut conn: deadpool_redis::cluster::Connection = redis; let mut conn: deadpool_redis::cluster::Connection = redis;
@ -259,8 +252,7 @@ async fn email_run_once(
if let Some(data) = extract_field(&field_values, "data") { if let Some(data) = extract_field(&field_values, "data") {
match serde_json::from_str::<EmailEnvelope>(&data) { match serde_json::from_str::<EmailEnvelope>(&data) {
Ok(env) => batch.push((entry_id, env)), Ok(env) => batch.push((entry_id, env)),
Err(e) => slog::warn!(log, "malformed email envelope"; Err(e) => tracing::warn!(entry_id = %entry_id, error = %e, "malformed email envelope"),
"entry_id" => %entry_id, "error" => %e),
} }
} }
} }
@ -274,7 +266,7 @@ async fn email_run_once(
let envelopes: Vec<EmailEnvelope> = batch.into_iter().map(|(_, e)| e).collect(); let envelopes: Vec<EmailEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
if let Err(e) = send_fn(envelopes).await { if let Err(e) = send_fn(envelopes).await {
slog::error!(log, "email send_fn failed — entries NOT acked (will retry)"; "error" => %e); tracing::error!(error = %e, "email send_fn failed — entries NOT acked (will retry)");
return Err(e); return Err(e);
} }
@ -289,6 +281,6 @@ async fn email_run_once(
.await; .await;
} }
slog::info!(log, "email batch sent and acked"; "n" => entry_ids.len()); tracing::info!(n = entry_ids.len(), "email batch sent and acked");
Ok(entry_ids.len()) Ok(entry_ids.len())
} }