From 57d0fc371e6840b7965eb4ec6442b83436e7ea8d Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Sat, 25 Apr 2026 09:52:41 +0800 Subject: [PATCH] fix(room): include display_name in RoomMessageEnvelope for AI streaming RoomMessageEvent was losing the AI model name because the From impl hardcoded display_name: None. Add display_name to RoomMessageEnvelope and propagate it through all AI streaming code paths (chat, ReAct, non-streaming). Member messages keep display_name: None. --- libs/queue/types.rs | 7 ++++- libs/room/src/message.rs | 1 + libs/room/src/service.rs | 60 +++++++++++++++++++++++----------------- 3 files changed, 41 insertions(+), 27 deletions(-) diff --git a/libs/queue/types.rs b/libs/queue/types.rs index f7cf8ed..f8f4029 100644 --- a/libs/queue/types.rs +++ b/libs/queue/types.rs @@ -19,6 +19,9 @@ pub struct RoomMessageEnvelope { pub content_type: String, pub send_at: DateTime, pub seq: i64, + /// Pre-resolved display name for the sender (e.g. AI model name). + #[serde(skip_serializing_if = "Option::is_none")] + pub display_name: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -75,7 +78,7 @@ impl From for RoomMessageEvent { content_type: e.content_type, send_at: e.send_at, seq: e.seq, - display_name: None, + display_name: e.display_name, reactions: None, message_id: None, } @@ -102,6 +105,8 @@ pub struct RoomMessageStreamChunkEvent { pub error: Option, /// Human-readable AI model name (e.g. "Claude 3.5 Sonnet") for display. pub display_name: Option, + /// What kind of content this chunk contains: "thinking", "answer", "tool_call", "tool_result". + pub chunk_type: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs index 1675319..bc6f9ab 100644 --- a/libs/room/src/message.rs +++ b/libs/room/src/message.rs @@ -177,6 +177,7 @@ impl RoomService { content_type: content_type_str.clone(), send_at: now, seq, + display_name: None, }; let db = &self.db; diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index 5d881cd..a804a0a 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -28,7 +28,8 @@ const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024; /// Callback type for sending push notifications. /// The caller (AppService) provides this to RoomService so it can trigger /// browser push notifications without depending on the service crate directly. -pub type PushNotificationFn = Arc, Option) + Send + Sync>; +pub type PushNotificationFn = + Arc, Option) + Send + Sync>; /// Legacy: uuid or username static USER_MENTION_RE: LazyLock regex_lite::Regex> = @@ -116,7 +117,11 @@ impl RoomService { // Save a clone for task subscriber handles before `project_ids` gets moved. let task_project_ids = project_ids.clone(); - tracing::info!(room_count = room_ids.len(), project_count = project_ids.len(), "starting room workers"); + tracing::info!( + room_count = room_ids.len(), + project_count = project_ids.len(), + "starting room workers" + ); let persist_fn: PersistFn = make_persist_fn( self.db.clone(), @@ -133,13 +138,7 @@ impl RoomService { let get_redis = get_redis.clone(); let persist_fn = persist_fn.clone(); async move { - queue::start_worker( - worker_room_ids, - get_redis, - persist_fn, - worker_shutdown, - ) - .await; + queue::start_worker(worker_room_ids, get_redis, persist_fn, worker_shutdown).await; } }); @@ -761,11 +760,7 @@ impl RoomService { /// - use_exact = false → respond to every text message. /// - use_exact = true → only respond when the message contains an @[ai:...] or /// ... tag that mentions this room's configured AI model. - pub async fn should_ai_respond( - &self, - room_id: Uuid, - content: &str, - ) -> Result { + pub async fn should_ai_respond(&self, room_id: Uuid, content: &str) -> Result { use models::rooms::room_ai; let ai_config = room_ai::Entity::find() @@ -1046,6 +1041,12 @@ impl RoomService { // Clone display_name INSIDE the async block so the outer closure stays `Fn`. let ai_display_name_for_chunk = ai_display_name_for_chunk.clone(); async move { + let chunk_type_str = match chunk.chunk_type { + agent::chat::AiChunkType::Thinking => "thinking", + agent::chat::AiChunkType::Answer => "answer", + agent::chat::AiChunkType::ToolCall => "tool_call", + agent::chat::AiChunkType::ToolResult => "tool_result", + }; let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, @@ -1053,6 +1054,7 @@ impl RoomService { done: chunk.done, error: None, display_name: Some(ai_display_name_for_chunk), + chunk_type: Some(chunk_type_str.to_string()), }; room_manager.broadcast_stream_chunk(event).await; @@ -1091,6 +1093,7 @@ impl RoomService { send_at: now, seq, in_reply_to: None, + display_name: Some(ai_display_name_for_final.clone()), }; if let Err(e) = queue.publish(room_id_inner, envelope).await { @@ -1152,6 +1155,7 @@ impl RoomService { done: true, error: Some(e.to_string()), display_name: Some(ai_display_name.clone()), + chunk_type: None, }; room_manager.broadcast_stream_chunk(event).await; } @@ -1376,17 +1380,15 @@ impl RoomService { format!("[Thinking] {}", thought) } ReactStep::Action { step: _, action } => { - format!( - "[Action] Calling `{}` with {:?}", - action.name, action.args - ) + format!("[Action] Calling `{}` with {:?}", action.name, action.args) } - ReactStep::Observation { step: _, observation } => { + ReactStep::Observation { + step: _, + observation, + } => { format!("[Observation] {}", observation) } - ReactStep::Answer { step: _, answer } => { - answer.clone() - } + ReactStep::Answer { step: _, answer } => answer.clone(), }; let is_answer = matches!(&step, ReactStep::Answer { .. }); @@ -1408,6 +1410,11 @@ impl RoomService { done, error: None, display_name: Some((*ai_name).clone()), + chunk_type: Some(if is_answer { + "answer".to_string() + } else { + "thinking".to_string() + }), }; room_manager.broadcast_stream_chunk(event).await; @@ -1423,9 +1430,7 @@ impl RoomService { } }; - let result = chat_service - .process_react(&request, on_step) - .await; + let result = chat_service.process_react(&request, on_step).await; let final_content = answer_buffer.lock().unwrap().clone(); let reasoning_chain = reasoning_buffer.lock().unwrap().clone(); @@ -1454,7 +1459,8 @@ impl RoomService { format!( "{}\n[Error during reasoning: {}]", content_to_persist.trim_end(), - msg.trim_start_matches("[Agent error: ").trim_end_matches("]") + msg.trim_start_matches("[Agent error: ") + .trim_end_matches("]") ) } else { content_to_persist @@ -1482,6 +1488,7 @@ impl RoomService { send_at: now, seq, in_reply_to: None, + display_name: Some(ai_display_name.clone()), }; if let Err(e) = queue.publish(room_id_inner, envelope).await { @@ -1567,6 +1574,7 @@ impl RoomService { send_at: now, seq, in_reply_to: None, + display_name: model_display_name.clone(), }; queue.publish(room_id, envelope).await?;