refactor(room): migrate from slog to tracing + upgrade metrics to 0.22

- Remove all use slog::* imports and log: slog::Logger fields
- Replace slog macros with tracing::{info!, warn!, error!, debug!}
- metrics.rs: upgrade metrics 0.21→0.22, remove register_*! macros,
  use functional API: metrics::gauge!(), metrics::counter!(),
  metrics::histogram!(), metrics::describe_gauge!() etc.
- RoomMetrics: all fields now use functional metrics API, dynamic
  room_id labels passed as owned String to avoid lifetime issues
- RoomService: remove pub log: slog::Logger field
- connection.rs: remove log from subscribe_room_events,
  subscribe_project_room_events, subscribe_task_events_fn
This commit is contained in:
ZhenYi 2026-04-21 22:28:52 +08:00
parent 773da34fab
commit 57779822dc
8 changed files with 99 additions and 165 deletions

View File

@ -26,7 +26,7 @@ config = { path = "../config" }
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true } serde_json = { workspace = true }
slog = { workspace = true } tracing = { workspace = true }
chrono = { workspace = true, features = ["serde"] } chrono = { workspace = true, features = ["serde"] }
uuid = { workspace = true, features = ["serde", "v7", "v4"] } uuid = { workspace = true, features = ["serde", "v7", "v4"] }
sea-orm = { workspace = true } sea-orm = { workspace = true }
@ -37,7 +37,7 @@ tokio-stream = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] } deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] }
utoipa = { workspace = true, features = ["uuid", "chrono"] } utoipa = { workspace = true, features = ["uuid", "chrono"] }
metrics = "0.21" metrics = "0.22"
regex-lite = "0.1.6" regex-lite = "0.1.6"
redis = { workspace = true, features = ["tokio-comp", "connection-manager"] } redis = { workspace = true, features = ["tokio-comp", "connection-manager"] }
async-openai = { workspace = true } async-openai = { workspace = true }

View File

@ -783,7 +783,6 @@ fn start_pubsub_thread<F, Fut>(
channel: String, channel: String,
relay_tx: tokio::sync::mpsc::Sender<Vec<u8>>, relay_tx: tokio::sync::mpsc::Sender<Vec<u8>>,
mut shutdown_rx: broadcast::Receiver<()>, mut shutdown_rx: broadcast::Receiver<()>,
log: slog::Logger,
_on_msg: F, _on_msg: F,
) where ) where
F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static, F: Fn(Vec<u8>) -> Fut + Send + Sync + 'static,
@ -800,14 +799,14 @@ fn start_pubsub_thread<F, Fut>(
let redis_url = redis_url.clone(); let redis_url = redis_url.clone();
loop { loop {
if shutdown_rx.try_recv().is_ok() { if shutdown_rx.try_recv().is_ok() {
slog::info!(log, "pubsub thread shutting down before connect"; "channel" => %channel); tracing::info!(channel = %channel, "pubsub thread shutting down before connect");
break; break;
} }
let client = match redis::Client::open(redis_url.as_str()) { let client = match redis::Client::open(redis_url.as_str()) {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
slog::error!(log, "pubsub redis client open failed"; "channel" => %channel, "error" => %e); tracing::error!(channel = %channel, error = %e, "pubsub redis client open failed");
thread::sleep(Duration::from_secs(1)); thread::sleep(Duration::from_secs(1));
continue; continue;
} }
@ -816,16 +815,16 @@ fn start_pubsub_thread<F, Fut>(
let mut pubsub = match client.get_async_pubsub().await { let mut pubsub = match client.get_async_pubsub().await {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
slog::error!(log, "pubsub connection failed"; "channel" => %channel, "error" => %e); tracing::error!(channel = %channel, error = %e, "pubsub connection failed");
thread::sleep(Duration::from_secs(1)); thread::sleep(Duration::from_secs(1));
continue; continue;
} }
}; };
match pubsub.subscribe(&channel).await { match pubsub.subscribe(&channel).await {
Ok(_) => slog::info!(log, "pubsub subscribed"; "channel" => %channel), Ok(_) => tracing::info!(channel = %channel, "pubsub subscribed"),
Err(e) => { Err(e) => {
slog::error!(log, "pubsub subscribe failed"; "channel" => %channel, "error" => %e); tracing::error!(channel = %channel, error = %e, "pubsub subscribe failed");
thread::sleep(Duration::from_secs(1)); thread::sleep(Duration::from_secs(1));
continue; continue;
} }
@ -835,7 +834,7 @@ fn start_pubsub_thread<F, Fut>(
loop { loop {
if shutdown_rx.try_recv().is_ok() { if shutdown_rx.try_recv().is_ok() {
slog::info!(log, "pubsub thread shutting down"; "channel" => %channel); tracing::info!(channel = %channel, "pubsub thread shutting down");
return; return;
} }
@ -848,21 +847,21 @@ fn start_pubsub_thread<F, Fut>(
match msg { match msg {
Ok(Some(msg)) => { Ok(Some(msg)) => {
let payload = msg.get_payload_bytes(); let payload = msg.get_payload_bytes();
slog::debug!(log, "pubsub received"; "channel" => %channel, "len" => payload.len()); tracing::debug!(channel = %channel, len = payload.len(), "pubsub received");
if relay_tx.send(payload.to_vec()).await.is_err() { if relay_tx.send(payload.to_vec()).await.is_err() {
slog::warn!(log, "pubsub relay channel closed"; "channel" => %channel); tracing::warn!(channel = %channel, "pubsub relay channel closed");
return; return;
} }
} }
Ok(None) => { Ok(None) => {
slog::warn!(log, "pubsub stream ended, will reconnect"; "channel" => %channel); tracing::warn!(channel = %channel, "pubsub stream ended, will reconnect");
break; break;
} }
Err(_) => {} Err(_) => {}
} }
} }
slog::warn!(log, "pubsub connection lost, reconnecting"; "channel" => %channel); tracing::warn!(channel = %channel, "pubsub connection lost, reconnecting");
} }
}); });
}) })
@ -873,15 +872,13 @@ pub async fn subscribe_room_events(
redis_url: String, redis_url: String,
manager: Arc<RoomConnectionManager>, manager: Arc<RoomConnectionManager>,
room_id: Uuid, room_id: Uuid,
log: slog::Logger,
mut shutdown_rx: broadcast::Receiver<()>, mut shutdown_rx: broadcast::Receiver<()>,
) { ) {
let channel = format!("room:pub:{}", room_id); let channel = format!("room:pub:{}", room_id);
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1024); let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1024);
slog::info!(log, "starting room pubsub subscriber"; "room_id" => %room_id, "channel" => %channel); tracing::info!(room_id = %room_id, channel = %channel, "starting room pubsub subscriber");
let thread_log = log.clone();
let thread_channel = channel.clone(); let thread_channel = channel.clone();
let thread_shutdown = shutdown_rx.resubscribe(); let thread_shutdown = shutdown_rx.resubscribe();
start_pubsub_thread( start_pubsub_thread(
@ -889,14 +886,13 @@ pub async fn subscribe_room_events(
thread_channel, thread_channel,
tx, tx,
thread_shutdown, thread_shutdown,
thread_log,
|_| async {}, |_| async {},
); );
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
slog::info!(log, "room subscriber shutting down"; "room_id" => %room_id); tracing::info!(room_id = %room_id, "room subscriber shutting down");
break; break;
} }
payload = rx.recv() => { payload = rx.recv() => {
@ -907,34 +903,32 @@ pub async fn subscribe_room_events(
manager.broadcast(room_id, event).await; manager.broadcast(room_id, event).await;
} }
Err(e) => { Err(e) => {
slog::warn!(log, "malformed RoomMessageEvent"; "error" => %e); tracing::warn!(error = %e, "malformed RoomMessageEvent");
} }
} }
} }
None => { None => {
slog::warn!(log, "pubsub relay channel closed"; "room_id" => %room_id); tracing::warn!(room_id = %room_id, "pubsub relay channel closed");
break; break;
} }
} }
} }
} }
} }
slog::info!(log, "room subscriber stopped"; "room_id" => %room_id); tracing::info!(room_id = %room_id, "room subscriber stopped");
} }
pub async fn subscribe_project_room_events( pub async fn subscribe_project_room_events(
redis_url: String, redis_url: String,
manager: Arc<RoomConnectionManager>, manager: Arc<RoomConnectionManager>,
project_id: Uuid, project_id: Uuid,
log: slog::Logger,
mut shutdown_rx: broadcast::Receiver<()>, mut shutdown_rx: broadcast::Receiver<()>,
) { ) {
let channel = format!("project:pub:{}", project_id); let channel = format!("project:pub:{}", project_id);
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1024); let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1024);
slog::info!(log, "starting project pubsub subscriber"; "project_id" => %project_id, "channel" => %channel); tracing::info!(project_id = %project_id, channel = %channel, "starting project pubsub subscriber");
let thread_log = log.clone();
let thread_channel = channel.clone(); let thread_channel = channel.clone();
let thread_shutdown = shutdown_rx.resubscribe(); let thread_shutdown = shutdown_rx.resubscribe();
start_pubsub_thread( start_pubsub_thread(
@ -942,14 +936,13 @@ pub async fn subscribe_project_room_events(
thread_channel, thread_channel,
tx, tx,
thread_shutdown, thread_shutdown,
thread_log,
|_| async {}, |_| async {},
); );
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
slog::info!(log, "project subscriber shutting down"; "project_id" => %project_id); tracing::info!(project_id = %project_id, "project subscriber shutting down");
break; break;
} }
payload = rx.recv() => { payload = rx.recv() => {
@ -960,19 +953,19 @@ pub async fn subscribe_project_room_events(
manager.broadcast_project(project_id, event).await; manager.broadcast_project(project_id, event).await;
} }
Err(e) => { Err(e) => {
slog::warn!(log, "malformed ProjectRoomEvent"; "error" => %e); tracing::warn!(error = %e, "malformed ProjectRoomEvent");
} }
} }
} }
None => { None => {
slog::warn!(log, "project pubsub relay channel closed"; "project_id" => %project_id); tracing::warn!(project_id = %project_id, "project pubsub relay channel closed");
break; break;
} }
} }
} }
} }
} }
slog::info!(log, "project subscriber stopped"; "project_id" => %project_id); tracing::info!(project_id = %project_id, "project subscriber stopped");
} }
/// Subscribe to Redis Pub/Sub `task:pub:{project_id}` and relay events to /// Subscribe to Redis Pub/Sub `task:pub:{project_id}` and relay events to
@ -981,15 +974,13 @@ pub async fn subscribe_task_events_fn(
redis_url: String, redis_url: String,
manager: Arc<RoomConnectionManager>, manager: Arc<RoomConnectionManager>,
project_id: Uuid, project_id: Uuid,
log: slog::Logger,
mut shutdown_rx: broadcast::Receiver<()>, mut shutdown_rx: broadcast::Receiver<()>,
) { ) {
let channel = format!("task:pub:{}", project_id); let channel = format!("task:pub:{}", project_id);
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1024); let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1024);
slog::info!(log, "starting task pubsub subscriber"; "project_id" => %project_id, "channel" => %channel); tracing::info!(project_id = %project_id, channel = %channel, "starting task pubsub subscriber");
let thread_log = log.clone();
let thread_channel = channel.clone(); let thread_channel = channel.clone();
let thread_shutdown = shutdown_rx.resubscribe(); let thread_shutdown = shutdown_rx.resubscribe();
start_pubsub_thread( start_pubsub_thread(
@ -997,14 +988,13 @@ pub async fn subscribe_task_events_fn(
thread_channel, thread_channel,
tx, tx,
thread_shutdown, thread_shutdown,
thread_log,
|_| async {}, |_| async {},
); );
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
slog::info!(log, "task subscriber shutting down"; "project_id" => %project_id); tracing::info!(project_id = %project_id, "task subscriber shutting down");
break; break;
} }
payload = rx.recv() => { payload = rx.recv() => {
@ -1015,17 +1005,17 @@ pub async fn subscribe_task_events_fn(
manager.broadcast_agent_task(project_id, event).await; manager.broadcast_agent_task(project_id, event).await;
} }
Err(e) => { Err(e) => {
slog::warn!(log, "malformed AgentTaskEvent"; "error" => %e); tracing::warn!(error = %e, "malformed AgentTaskEvent");
} }
} }
} }
None => { None => {
slog::warn!(log, "task pubsub relay channel closed"; "project_id" => %project_id); tracing::warn!(project_id = %project_id, "task pubsub relay channel closed");
break; break;
} }
} }
} }
} }
} }
slog::info!(log, "task subscriber stopped"; "project_id" => %project_id); tracing::info!(project_id = %project_id, "task subscriber stopped");
} }

View File

@ -30,13 +30,13 @@ impl RoomService {
.await .await
{ {
if let Ok(responses) = serde_json::from_str::<Vec<super::RoomMemberResponse>>(&cached) { if let Ok(responses) = serde_json::from_str::<Vec<super::RoomMemberResponse>>(&cached) {
slog::debug!(self.log, "room_member_list: cache hit for key={}", cache_key); tracing::debug!(cache_key = %cache_key, "room_member_list: cache hit");
return Ok(responses); return Ok(responses);
} }
} }
} }
slog::debug!(self.log, "room_member_list: cache miss for key={}", cache_key); tracing::debug!(cache_key = %cache_key, "room_member_list: cache miss");
let members = room_member::Entity::find() let members = room_member::Entity::find()
.filter(room_member::Column::Room.eq(room_id)) .filter(room_member::Column::Room.eq(room_id))
@ -92,7 +92,7 @@ impl RoomService {
.query_async(&mut conn) .query_async(&mut conn)
.await .await
.inspect_err(|e| { .inspect_err(|e| {
slog::warn!(self.log, "room_member_list: failed to cache key={}: {}", cache_key, e); tracing::warn!(cache_key = %cache_key, error = %e, "room_member_list: failed to cache");
}) })
.ok(); .ok();
} }
@ -424,9 +424,9 @@ impl RoomService {
.query_async::<i64>(&mut conn) .query_async::<i64>(&mut conn)
.await .await
{ {
slog::warn!(self.log, "invalidate_member_list_cache: DEL failed for {}: {}", cache_key, e); tracing::warn!(cache_key = %cache_key, error = %e, "invalidate_member_list_cache: DEL failed");
} else { } else {
slog::debug!(self.log, "invalidate_member_list_cache: deleted {}", cache_key); tracing::debug!(cache_key = %cache_key, "invalidate_member_list_cache: deleted");
} }
} }
} }

View File

@ -233,11 +233,10 @@ impl RoomService {
.exec(&self.db) .exec(&self.db)
.await .await
{ {
slog::warn!( tracing::warn!(
self.log, message_id = %id,
"Failed to link attachments to message {}: {}", error = %e,
id, "Failed to link attachments to message"
e
); );
} }
} }
@ -288,7 +287,7 @@ impl RoomService {
let should_respond = match self.should_ai_respond(room_id).await { let should_respond = match self.should_ai_respond(room_id).await {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
slog::warn!(self.log, "should_ai_respond failed for room {}: {}", room_id, e); tracing::warn!(room_id = %room_id, error = %e, "should_ai_respond failed");
false false
} }
}; };
@ -302,7 +301,7 @@ impl RoomService {
.process_message_ai(room_id, id, user_id, content.clone()) .process_message_ai(room_id, id, user_id, content.clone())
.await .await
{ {
slog::warn!(self.log, "Failed to process AI message: {}", e); tracing::warn!(error = %e, "Failed to process AI message");
} }
} }

View File

@ -1,11 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use metrics::{ use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram, Unit};
describe_counter, describe_gauge, describe_histogram, register_counter, register_gauge, register_histogram, Counter,
Gauge, Histogram, Unit,
};
use tokio::sync::RwLock;
use uuid::Uuid; use uuid::Uuid;
pub struct RoomMetrics { pub struct RoomMetrics {
@ -27,8 +22,6 @@ pub struct RoomMetrics {
pub ws_heartbeat_sent_total: Counter, pub ws_heartbeat_sent_total: Counter,
pub ws_heartbeat_timeout_total: Counter, pub ws_heartbeat_timeout_total: Counter,
pub ws_idle_timeout_total: Counter, pub ws_idle_timeout_total: Counter,
room_connections: RwLock<HashMap<Uuid, Gauge>>,
room_messages: RwLock<HashMap<Uuid, Counter>>,
} }
impl Default for RoomMetrics { impl Default for RoomMetrics {
@ -119,26 +112,24 @@ impl Default for RoomMetrics {
); );
Self { Self {
rooms_online: register_gauge!("room_online_rooms"), rooms_online: metrics::gauge!("room_online_rooms"),
users_online: register_gauge!("room_online_users"), users_online: metrics::gauge!("room_online_users"),
ws_connections_active: register_gauge!("room_ws_connections_active"), ws_connections_active: metrics::gauge!("room_ws_connections_active"),
ws_connections_total: register_counter!("room_ws_connections_total"), ws_connections_total: metrics::counter!("room_ws_connections_total"),
ws_disconnections_total: register_counter!("room_ws_disconnections_total"), ws_disconnections_total: metrics::counter!("room_ws_disconnections_total"),
messages_sent: register_counter!("room_messages_sent_total"), messages_sent: metrics::counter!("room_messages_sent_total"),
messages_persisted: register_counter!("room_messages_persisted_total"), messages_persisted: metrics::counter!("room_messages_persisted_total"),
messages_persist_failed: register_counter!("room_messages_persist_failed_total"), messages_persist_failed: metrics::counter!("room_messages_persist_failed_total"),
broadcasts_sent: register_counter!("room_broadcasts_sent_total"), broadcasts_sent: metrics::counter!("room_broadcasts_sent_total"),
broadcasts_dropped: register_counter!("room_broadcasts_dropped_total"), broadcasts_dropped: metrics::counter!("room_broadcasts_dropped_total"),
duplicates_skipped: register_counter!("room_duplicates_skipped_total"), duplicates_skipped: metrics::counter!("room_duplicates_skipped_total"),
redis_publish_failed: register_counter!("room_redis_publish_failed_total"), redis_publish_failed: metrics::counter!("room_redis_publish_failed_total"),
message_latency_ms: register_histogram!("room_message_latency_ms"), message_latency_ms: metrics::histogram!("room_message_latency_ms"),
ws_rate_limit_hits: register_counter!("room_ws_rate_limit_hits_total"), ws_rate_limit_hits: metrics::counter!("room_ws_rate_limit_hits_total"),
ws_auth_failures: register_counter!("room_ws_auth_failures_total"), ws_auth_failures: metrics::counter!("room_ws_auth_failures_total"),
ws_heartbeat_sent_total: register_counter!("room_ws_heartbeat_sent_total"), ws_heartbeat_sent_total: metrics::counter!("room_ws_heartbeat_sent_total"),
ws_heartbeat_timeout_total: register_counter!("room_ws_heartbeat_timeout_total"), ws_heartbeat_timeout_total: metrics::counter!("room_ws_heartbeat_timeout_total"),
ws_idle_timeout_total: register_counter!("room_ws_idle_timeout_total"), ws_idle_timeout_total: metrics::counter!("room_ws_idle_timeout_total"),
room_connections: RwLock::new(HashMap::new()),
room_messages: RwLock::new(HashMap::new()),
} }
} }
} }
@ -157,34 +148,23 @@ impl RoomMetrics {
} }
pub async fn incr_room_connections(&self, room_id: Uuid) { pub async fn incr_room_connections(&self, room_id: Uuid) {
let mut map = self.room_connections.write().await; let name = format!("room_connections{{room_id=\"{}\"}}", room_id);
let counter = map.entry(room_id).or_insert_with(|| { metrics::gauge!(name).increment(1.0);
register_gauge!(format!("room_connections{{room_id=\"{}\"}}", room_id))
});
counter.increment(1.0);
} }
pub async fn dec_room_connections(&self, room_id: Uuid) { pub async fn dec_room_connections(&self, room_id: Uuid) {
let map = self.room_connections.read().await; let name = format!("room_connections{{room_id=\"{}\"}}", room_id);
if let Some(counter) = map.get(&room_id) { metrics::gauge!(name).decrement(1.0);
counter.decrement(1.0);
}
} }
pub async fn incr_room_messages(&self, room_id: Uuid) { pub async fn incr_room_messages(&self, room_id: Uuid) {
let mut map = self.room_messages.write().await; let name = format!("room_messages_total{{room_id=\"{}\"}}", room_id);
let counter = map.entry(room_id).or_insert_with(|| { metrics::counter!(name).increment(1);
register_counter!(format!("room_messages_total{{room_id=\"{}\"}}", room_id))
});
counter.increment(1);
} }
pub async fn cleanup_stale_rooms(&self, active_room_ids: &[Uuid]) { #[allow(dead_code)]
let mut conn_map = self.room_connections.write().await; pub async fn cleanup_stale_rooms(&self, _active_room_ids: &[Uuid]) {
conn_map.retain(|room_id, _| active_room_ids.contains(room_id)); // Per-room metrics are registered on-demand; no cleanup needed.
let mut msg_map = self.room_messages.write().await;
msg_map.retain(|room_id, _| active_room_ids.contains(room_id));
} }
pub fn into_arc(self) -> Arc<RoomMetrics> { pub fn into_arc(self) -> Arc<RoomMetrics> {

View File

@ -39,13 +39,13 @@ impl RoomService {
.await .await
{ {
if let Ok(responses) = serde_json::from_str::<Vec<super::RoomResponse>>(&cached) { if let Ok(responses) = serde_json::from_str::<Vec<super::RoomResponse>>(&cached) {
slog::debug!(self.log, "room_list: cache hit for key={}", cache_key); tracing::debug!(cache_key = %cache_key, "room_list: cache hit");
return Ok(responses); return Ok(responses);
} }
} }
} }
slog::debug!(self.log, "room_list: cache miss for key={}", cache_key); tracing::debug!(cache_key = %cache_key, "room_list: cache miss");
let mut query = room::Entity::find().filter(room::Column::Project.eq(project.id)); let mut query = room::Entity::find().filter(room::Column::Project.eq(project.id));
if only_public.unwrap_or(false) { if only_public.unwrap_or(false) {
@ -102,7 +102,7 @@ impl RoomService {
.query_async(&mut conn) .query_async(&mut conn)
.await .await
.inspect_err(|e| { .inspect_err(|e| {
slog::warn!(self.log, "room_list: failed to cache key={}: {}", cache_key, e); tracing::warn!(cache_key = %cache_key, error = %e, "room_list: failed to cache");
}) })
.ok(); .ok();
} }
@ -364,7 +364,7 @@ impl RoomService {
.query_async(&mut conn) .query_async(&mut conn)
.await .await
.inspect_err(|e| { .inspect_err(|e| {
slog::warn!(self.log, "room_delete: failed to DEL seq key {}: {}", seq_key, e); tracing::warn!(seq_key = %seq_key, error = %e, "room_delete: failed to DEL seq key");
}) })
.ok(); .ok();
} }
@ -412,7 +412,7 @@ impl RoomService {
{ {
Ok(result) => result, Ok(result) => result,
Err(e) => { Err(e) => {
slog::warn!(self.log, "invalidate_room_list_cache: SCAN failed: {}", e); tracing::warn!(error = %e, "invalidate_room_list_cache: SCAN failed");
break; break;
} }
}; };
@ -426,9 +426,9 @@ impl RoomService {
.query_async::<i64>(&mut conn) .query_async::<i64>(&mut conn)
.await .await
{ {
slog::warn!(self.log, "invalidate_room_list_cache: DEL failed: {}", e); tracing::warn!(error = %e, "invalidate_room_list_cache: DEL failed");
} else { } else {
slog::debug!(self.log, "invalidate_room_list_cache: deleted {} keys", keys.len()); tracing::debug!(keys_count = keys.len(), "invalidate_room_list_cache: deleted");
} }
} }

View File

@ -15,7 +15,6 @@ pub struct RoomAiLockGuard {
lock_token: String, lock_token: String,
request_uid: String, request_uid: String,
acquired: bool, acquired: bool,
log: slog::Logger,
} }
impl Drop for RoomAiLockGuard { impl Drop for RoomAiLockGuard {
@ -29,7 +28,6 @@ impl Drop for RoomAiLockGuard {
let lock_key = self.lock_key.clone(); let lock_key = self.lock_key.clone();
let lock_token = self.lock_token.clone(); let lock_token = self.lock_token.clone();
let request_uid = self.request_uid.clone(); let request_uid = self.request_uid.clone();
let log = self.log.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = release_lock( if let Err(e) = release_lock(
&cache, &cache,
@ -41,12 +39,11 @@ impl Drop for RoomAiLockGuard {
) )
.await .await
{ {
slog::warn!( tracing::warn!(
log, lock_key = %lock_key,
"RoomAiLockGuard: failed to release lock key={} token={} err={}", lock_token = %lock_token,
lock_key, error = %e,
lock_token, "RoomAiLockGuard: failed to release lock"
e
); );
} }
}); });
@ -56,7 +53,6 @@ impl Drop for RoomAiLockGuard {
pub async fn acquire_room_ai_lock( pub async fn acquire_room_ai_lock(
cache: &AppCache, cache: &AppCache,
room_id: Uuid, room_id: Uuid,
log: &slog::Logger,
) -> Result<Option<RoomAiLockGuard>, RoomError> { ) -> Result<Option<RoomAiLockGuard>, RoomError> {
let request_uid = Uuid::now_v7().to_string(); let request_uid = Uuid::now_v7().to_string();
let hostname = hostname::get() let hostname = hostname::get()
@ -103,11 +99,10 @@ pub async fn acquire_room_ai_lock(
let mut retry_count: u32 = 0; let mut retry_count: u32 = 0;
loop { loop {
if start.elapsed().as_millis() as usize >= TICKET_TTL_MS { if start.elapsed().as_millis() as usize >= TICKET_TTL_MS {
slog::warn!( tracing::warn!(
log, room_id = %room_id,
"RoomAiLock: timeout waiting for lock after {}ms, room_id={}", elapsed_ms = start.elapsed().as_millis(),
start.elapsed().as_millis(), "RoomAiLock: timeout waiting for lock"
room_id
); );
return Ok(None); return Ok(None);
} }
@ -153,7 +148,6 @@ pub async fn acquire_room_ai_lock(
lock_token, lock_token,
request_uid, request_uid,
acquired: true, acquired: true,
log: log.clone(),
})); }));
} }
} else { } else {

View File

@ -59,7 +59,6 @@ pub struct RoomService {
pub redis_url: String, pub redis_url: String,
pub chat_service: Option<Arc<ChatService>>, pub chat_service: Option<Arc<ChatService>>,
pub task_service: Option<Arc<TaskService>>, pub task_service: Option<Arc<TaskService>>,
pub log: slog::Logger,
pub push_fn: Option<PushNotificationFn>, pub push_fn: Option<PushNotificationFn>,
worker_semaphore: Arc<tokio::sync::Semaphore>, worker_semaphore: Arc<tokio::sync::Semaphore>,
dedup_cache: DedupCache, dedup_cache: DedupCache,
@ -75,7 +74,6 @@ impl RoomService {
redis_url: String, redis_url: String,
chat_service: Option<Arc<ChatService>>, chat_service: Option<Arc<ChatService>>,
task_service: Option<Arc<TaskService>>, task_service: Option<Arc<TaskService>>,
log: slog::Logger,
max_concurrent_workers: Option<usize>, max_concurrent_workers: Option<usize>,
push_fn: Option<PushNotificationFn>, push_fn: Option<PushNotificationFn>,
) -> Self { ) -> Self {
@ -90,7 +88,6 @@ impl RoomService {
redis_url, redis_url,
chat_service, chat_service,
task_service, task_service,
log,
worker_semaphore: Arc::new(tokio::sync::Semaphore::new( worker_semaphore: Arc::new(tokio::sync::Semaphore::new(
max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS), max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS),
)), )),
@ -102,7 +99,6 @@ impl RoomService {
pub async fn start_workers( pub async fn start_workers(
&self, &self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
log: slog::Logger,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
use models::rooms::Room; use models::rooms::Room;
use sea_orm::EntityTrait; use sea_orm::EntityTrait;
@ -119,8 +115,7 @@ impl RoomService {
// Save a clone for task subscriber handles before `project_ids` gets moved. // Save a clone for task subscriber handles before `project_ids` gets moved.
let task_project_ids = project_ids.clone(); let task_project_ids = project_ids.clone();
slog::info!(log, "starting room workers"; tracing::info!(room_count = room_ids.len(), project_count = project_ids.len(), "starting room workers");
"room_count" => room_ids.len(), "project_count" => project_ids.len());
let persist_fn: PersistFn = make_persist_fn( let persist_fn: PersistFn = make_persist_fn(
self.db.clone(), self.db.clone(),
@ -131,7 +126,6 @@ impl RoomService {
let get_redis: Arc<dyn Fn() -> queue::worker::RedisFuture + Send + Sync> = let get_redis: Arc<dyn Fn() -> queue::worker::RedisFuture + Send + Sync> =
extract_get_redis(self.queue.clone()); extract_get_redis(self.queue.clone());
let worker_log = log.clone();
let worker_room_ids = room_ids.clone(); let worker_room_ids = room_ids.clone();
let worker_shutdown = shutdown_rx.resubscribe(); let worker_shutdown = shutdown_rx.resubscribe();
let worker_handle = tokio::spawn({ let worker_handle = tokio::spawn({
@ -143,21 +137,18 @@ impl RoomService {
get_redis, get_redis,
persist_fn, persist_fn,
worker_shutdown, worker_shutdown,
worker_log,
) )
.await; .await;
} }
}); });
let manager = self.room_manager.clone(); let manager = self.room_manager.clone();
let subscriber_log = log.clone();
let redis_url = self.redis_url.clone(); let redis_url = self.redis_url.clone();
let mut handles: Vec<_> = room_ids let mut handles: Vec<_> = room_ids
.into_iter() .into_iter()
.map(|room_id| { .map(|room_id| {
let manager = manager.clone(); let manager = manager.clone();
let log = subscriber_log.clone();
let redis_url = redis_url.clone(); let redis_url = redis_url.clone();
let shutdown_rx = shutdown_rx.resubscribe(); let shutdown_rx = shutdown_rx.resubscribe();
tokio::spawn(async move { tokio::spawn(async move {
@ -165,7 +156,6 @@ impl RoomService {
redis_url, redis_url,
manager, manager,
room_id, room_id,
log,
shutdown_rx, shutdown_rx,
) )
.await; .await;
@ -177,7 +167,6 @@ impl RoomService {
.into_iter() .into_iter()
.map(|project_id| { .map(|project_id| {
let manager = manager.clone(); let manager = manager.clone();
let log = subscriber_log.clone();
let redis_url = redis_url.clone(); let redis_url = redis_url.clone();
let shutdown_rx = shutdown_rx.resubscribe(); let shutdown_rx = shutdown_rx.resubscribe();
tokio::spawn(async move { tokio::spawn(async move {
@ -185,7 +174,6 @@ impl RoomService {
redis_url, redis_url,
manager, manager,
project_id, project_id,
log,
shutdown_rx, shutdown_rx,
) )
.await; .await;
@ -199,7 +187,6 @@ impl RoomService {
.into_iter() .into_iter()
.map(|project_id| { .map(|project_id| {
let manager = manager.clone(); let manager = manager.clone();
let log = subscriber_log.clone();
let redis_url = redis_url.clone(); let redis_url = redis_url.clone();
let shutdown_rx = shutdown_rx.resubscribe(); let shutdown_rx = shutdown_rx.resubscribe();
tokio::spawn(async move { tokio::spawn(async move {
@ -207,7 +194,6 @@ impl RoomService {
redis_url, redis_url,
manager, manager,
project_id, project_id,
log,
shutdown_rx, shutdown_rx,
) )
.await; .await;
@ -238,7 +224,7 @@ impl RoomService {
} }
} }
_ = cleanup_shutdown.recv() => { _ = cleanup_shutdown.recv() => {
slog::info!(slog::Logger::root(slog::Discard, slog::o!()), "cleanup task shutting down"); tracing::info!("cleanup task shutting down");
break; break;
} }
} }
@ -249,14 +235,14 @@ impl RoomService {
let _ = shutdown_rx.recv().await; let _ = shutdown_rx.recv().await;
slog::info!(log, "room workers shutting down"); tracing::info!("room workers shutting down");
for h in handles { for h in handles {
let _ = h.abort(); let _ = h.abort();
} }
let _ = worker_handle.await; let _ = worker_handle.await;
slog::info!(log, "room workers stopped"); tracing::info!("room workers stopped");
Ok(()) Ok(())
} }
@ -311,7 +297,6 @@ impl RoomService {
let queue = self.queue.clone(); let queue = self.queue.clone();
let room_manager = self.room_manager.clone(); let room_manager = self.room_manager.clone();
let semaphore = self.worker_semaphore.clone(); let semaphore = self.worker_semaphore.clone();
let log = self.log.clone();
// Spawn the background task. // Spawn the background task.
tokio::spawn(async move { tokio::spawn(async move {
@ -354,7 +339,7 @@ impl RoomService {
.publish_agent_task_event(project_id, event.clone()) .publish_agent_task_event(project_id, event.clone())
.await; .await;
room_manager.broadcast_agent_task(project_id, event).await; room_manager.broadcast_agent_task(project_id, event).await;
slog::info!(log, "agent task finished"; "task_id" => task_id, "project_id" => %project_id); tracing::info!(task_id = task_id, project_id = %project_id, "agent task finished");
}); });
Ok(task_id) Ok(task_id)
@ -370,12 +355,9 @@ impl RoomService {
extract_get_redis(self.queue.clone()); extract_get_redis(self.queue.clone());
let manager = self.room_manager.clone(); let manager = self.room_manager.clone();
let redis_url = self.redis_url.clone(); let redis_url = self.redis_url.clone();
let log = self.log.clone();
let semaphore = self.worker_semaphore.clone(); let semaphore = self.worker_semaphore.clone();
let db = self.db.clone(); let db = self.db.clone();
let log2 = log.clone();
let log3 = log.clone();
let manager2 = self.room_manager.clone(); let manager2 = self.room_manager.clone();
let redis_url2 = redis_url.clone(); let redis_url2 = redis_url.clone();
let redis_url3 = redis_url.clone(); let redis_url3 = redis_url.clone();
@ -392,7 +374,6 @@ impl RoomService {
get_redis, get_redis,
persist_fn, persist_fn,
shutdown_rx, shutdown_rx,
log,
) )
.await; .await;
let _ = shutdown_tx.send(()); let _ = shutdown_tx.send(());
@ -404,7 +385,6 @@ impl RoomService {
redis_url2, redis_url2,
manager.clone(), manager.clone(),
room_id, room_id,
log2,
shutdown_rx, shutdown_rx,
) )
.await; .await;
@ -427,7 +407,6 @@ impl RoomService {
redis_url3, redis_url3,
manager2, manager2,
project_id, project_id,
log3,
shutdown_rx, shutdown_rx,
) )
.await; .await;
@ -480,9 +459,8 @@ impl RoomService {
{ {
Ok(m) => m, Ok(m) => m,
Err(e) => { Err(e) => {
slog::error!(slog::Logger::root(slog::Discard, slog::o!()), tracing::error!(project_id = %project_id_inner, error = %e,
"notify_project_members: failed to fetch members"; "notify_project_members: failed to fetch members");
"project_id" => %project_id_inner, "error" => %e);
return; return;
} }
}; };
@ -500,9 +478,8 @@ impl RoomService {
) )
.await .await
{ {
slog::warn!(slog::Logger::root(slog::Discard, slog::o!()), tracing::warn!(user_id = %user_id, project_id = %project_id_inner, error = %e,
"notify_project_members: failed to create notification for user"; "notify_project_members: failed to create notification for user");
"user_id" => %user_id, "project_id" => %project_id_inner, "error" => %e);
} }
} }
}); });
@ -865,7 +842,7 @@ impl RoomService {
}; };
let Some(lock_guard) = let Some(lock_guard) =
crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id, &self.log).await? crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id).await?
else { else {
return Ok(()); return Ok(());
}; };
@ -962,11 +939,7 @@ impl RoomService {
let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await { let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
slog::error!( tracing::error!(error = %e, "Failed to get seq for streaming AI message");
self.log,
"Failed to get seq for streaming AI message: {}",
e
);
return; return;
} }
}; };
@ -987,7 +960,6 @@ impl RoomService {
let db = db.clone(); let db = db.clone();
let model_id = request.model.id; let model_id = request.model.id;
let log = self.log.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _lock_guard = lock_guard; let _lock_guard = lock_guard;
let room_manager = room_manager.clone(); let room_manager = room_manager.clone();
@ -1059,7 +1031,7 @@ impl RoomService {
}; };
if let Err(e) = queue.publish(room_id_inner, envelope).await { if let Err(e) = queue.publish(room_id_inner, envelope).await {
slog::error!(log, "Failed to publish streaming AI message: {}", e); tracing::error!(error = %e, "Failed to publish streaming AI message");
} else { } else {
let now = Utc::now(); let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many() if let Err(e) = room_ai::Entity::update_many()
@ -1073,7 +1045,7 @@ impl RoomService {
.exec(&db) .exec(&db)
.await .await
{ {
slog::warn!(log, "Failed to update room_ai call stats: {}", e); tracing::warn!(error = %e, "Failed to update room_ai call stats");
} }
let msg_event = queue::RoomMessageEvent { let msg_event = queue::RoomMessageEvent {
@ -1109,7 +1081,7 @@ impl RoomService {
} }
} }
Err(e) => { Err(e) => {
slog::error!(log, "AI streaming failed: {}", e); tracing::error!(error = %e, "AI streaming failed");
let event = RoomMessageStreamChunkEvent { let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id, message_id: streaming_msg_id,
room_id: room_id_inner, room_id: room_id_inner,
@ -1140,7 +1112,6 @@ impl RoomService {
let cache = self.cache.clone(); let cache = self.cache.clone();
let queue = self.queue.clone(); let queue = self.queue.clone();
let room_manager = self.room_manager.clone(); let room_manager = self.room_manager.clone();
let log = self.log.clone();
let room_id_for_ai = room_id; let room_id_for_ai = room_id;
let project_id_for_ai = project_id; let project_id_for_ai = project_id;
let model_id_inner = model_id; let model_id_inner = model_id;
@ -1164,7 +1135,7 @@ impl RoomService {
) )
.await .await
{ {
slog::error!(log, "Failed to create AI message: {}", e); tracing::error!(error = %e, "Failed to create AI message");
} else { } else {
let now = Utc::now(); let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many() if let Err(e) = room_ai::Entity::update_many()
@ -1178,12 +1149,12 @@ impl RoomService {
.exec(&db) .exec(&db)
.await .await
{ {
slog::warn!(log, "Failed to update room_ai call stats: {}", e); tracing::warn!(error = %e, "Failed to update room_ai call stats");
} }
} }
} }
Err(e) => { Err(e) => {
slog::error!(log, "AI processing failed: {}", e); tracing::error!(error = %e, "AI processing failed");
} }
} }
}); });