fix(room): include display_name in RoomMessageEnvelope for AI streaming

RoomMessageEvent was losing the AI model name because the
From<RoomMessageEnvelope> impl hardcoded display_name: None.
Add display_name to RoomMessageEnvelope and propagate it through
all AI streaming code paths (chat, ReAct, non-streaming).
Member messages keep display_name: None.
This commit is contained in:
ZhenYi 2026-04-25 09:52:41 +08:00
parent 6b3b77384e
commit 57d0fc371e
3 changed files with 41 additions and 27 deletions

View File

@ -19,6 +19,9 @@ pub struct RoomMessageEnvelope {
pub content_type: String, pub content_type: String,
pub send_at: DateTime<Utc>, pub send_at: DateTime<Utc>,
pub seq: i64, pub seq: i64,
/// Pre-resolved display name for the sender (e.g. AI model name).
#[serde(skip_serializing_if = "Option::is_none")]
pub display_name: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -75,7 +78,7 @@ impl From<RoomMessageEnvelope> for RoomMessageEvent {
content_type: e.content_type, content_type: e.content_type,
send_at: e.send_at, send_at: e.send_at,
seq: e.seq, seq: e.seq,
display_name: None, display_name: e.display_name,
reactions: None, reactions: None,
message_id: None, message_id: None,
} }
@ -102,6 +105,8 @@ pub struct RoomMessageStreamChunkEvent {
pub error: Option<String>, pub error: Option<String>,
/// Human-readable AI model name (e.g. "Claude 3.5 Sonnet") for display. /// Human-readable AI model name (e.g. "Claude 3.5 Sonnet") for display.
pub display_name: Option<String>, pub display_name: Option<String>,
/// What kind of content this chunk contains: "thinking", "answer", "tool_call", "tool_result".
pub chunk_type: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -177,6 +177,7 @@ impl RoomService {
content_type: content_type_str.clone(), content_type: content_type_str.clone(),
send_at: now, send_at: now,
seq, seq,
display_name: None,
}; };
let db = &self.db; let db = &self.db;

View File

@ -28,7 +28,8 @@ const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024;
/// Callback type for sending push notifications. /// Callback type for sending push notifications.
/// The caller (AppService) provides this to RoomService so it can trigger /// The caller (AppService) provides this to RoomService so it can trigger
/// browser push notifications without depending on the service crate directly. /// browser push notifications without depending on the service crate directly.
pub type PushNotificationFn = Arc<dyn Fn(Uuid, String, Option<String>, Option<String>) + Send + Sync>; pub type PushNotificationFn =
Arc<dyn Fn(Uuid, String, Option<String>, Option<String>) + Send + Sync>;
/// Legacy: <user>uuid</user> or <user>username</user> /// Legacy: <user>uuid</user> or <user>username</user>
static USER_MENTION_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> = static USER_MENTION_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
@ -116,7 +117,11 @@ impl RoomService {
// Save a clone for task subscriber handles before `project_ids` gets moved. // Save a clone for task subscriber handles before `project_ids` gets moved.
let task_project_ids = project_ids.clone(); let task_project_ids = project_ids.clone();
tracing::info!(room_count = room_ids.len(), project_count = project_ids.len(), "starting room workers"); tracing::info!(
room_count = room_ids.len(),
project_count = project_ids.len(),
"starting room workers"
);
let persist_fn: PersistFn = make_persist_fn( let persist_fn: PersistFn = make_persist_fn(
self.db.clone(), self.db.clone(),
@ -133,13 +138,7 @@ impl RoomService {
let get_redis = get_redis.clone(); let get_redis = get_redis.clone();
let persist_fn = persist_fn.clone(); let persist_fn = persist_fn.clone();
async move { async move {
queue::start_worker( queue::start_worker(worker_room_ids, get_redis, persist_fn, worker_shutdown).await;
worker_room_ids,
get_redis,
persist_fn,
worker_shutdown,
)
.await;
} }
}); });
@ -761,11 +760,7 @@ impl RoomService {
/// - use_exact = false → respond to every text message. /// - use_exact = false → respond to every text message.
/// - use_exact = true → only respond when the message contains an @[ai:...] or /// - use_exact = true → only respond when the message contains an @[ai:...] or
/// <mention type="ai">... tag that mentions this room's configured AI model. /// <mention type="ai">... tag that mentions this room's configured AI model.
pub async fn should_ai_respond( pub async fn should_ai_respond(&self, room_id: Uuid, content: &str) -> Result<bool, RoomError> {
&self,
room_id: Uuid,
content: &str,
) -> Result<bool, RoomError> {
use models::rooms::room_ai; use models::rooms::room_ai;
let ai_config = room_ai::Entity::find() let ai_config = room_ai::Entity::find()
@ -1046,6 +1041,12 @@ impl RoomService {
// Clone display_name INSIDE the async block so the outer closure stays `Fn`. // Clone display_name INSIDE the async block so the outer closure stays `Fn`.
let ai_display_name_for_chunk = ai_display_name_for_chunk.clone(); let ai_display_name_for_chunk = ai_display_name_for_chunk.clone();
async move { async move {
let chunk_type_str = match chunk.chunk_type {
agent::chat::AiChunkType::Thinking => "thinking",
agent::chat::AiChunkType::Answer => "answer",
agent::chat::AiChunkType::ToolCall => "tool_call",
agent::chat::AiChunkType::ToolResult => "tool_result",
};
let event = RoomMessageStreamChunkEvent { let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id, message_id: streaming_msg_id,
room_id, room_id,
@ -1053,6 +1054,7 @@ impl RoomService {
done: chunk.done, done: chunk.done,
error: None, error: None,
display_name: Some(ai_display_name_for_chunk), display_name: Some(ai_display_name_for_chunk),
chunk_type: Some(chunk_type_str.to_string()),
}; };
room_manager.broadcast_stream_chunk(event).await; room_manager.broadcast_stream_chunk(event).await;
@ -1091,6 +1093,7 @@ impl RoomService {
send_at: now, send_at: now,
seq, seq,
in_reply_to: None, in_reply_to: None,
display_name: Some(ai_display_name_for_final.clone()),
}; };
if let Err(e) = queue.publish(room_id_inner, envelope).await { if let Err(e) = queue.publish(room_id_inner, envelope).await {
@ -1152,6 +1155,7 @@ impl RoomService {
done: true, done: true,
error: Some(e.to_string()), error: Some(e.to_string()),
display_name: Some(ai_display_name.clone()), display_name: Some(ai_display_name.clone()),
chunk_type: None,
}; };
room_manager.broadcast_stream_chunk(event).await; room_manager.broadcast_stream_chunk(event).await;
} }
@ -1376,17 +1380,15 @@ impl RoomService {
format!("[Thinking] {}", thought) format!("[Thinking] {}", thought)
} }
ReactStep::Action { step: _, action } => { ReactStep::Action { step: _, action } => {
format!( format!("[Action] Calling `{}` with {:?}", action.name, action.args)
"[Action] Calling `{}` with {:?}",
action.name, action.args
)
} }
ReactStep::Observation { step: _, observation } => { ReactStep::Observation {
step: _,
observation,
} => {
format!("[Observation] {}", observation) format!("[Observation] {}", observation)
} }
ReactStep::Answer { step: _, answer } => { ReactStep::Answer { step: _, answer } => answer.clone(),
answer.clone()
}
}; };
let is_answer = matches!(&step, ReactStep::Answer { .. }); let is_answer = matches!(&step, ReactStep::Answer { .. });
@ -1408,6 +1410,11 @@ impl RoomService {
done, done,
error: None, error: None,
display_name: Some((*ai_name).clone()), display_name: Some((*ai_name).clone()),
chunk_type: Some(if is_answer {
"answer".to_string()
} else {
"thinking".to_string()
}),
}; };
room_manager.broadcast_stream_chunk(event).await; room_manager.broadcast_stream_chunk(event).await;
@ -1423,9 +1430,7 @@ impl RoomService {
} }
}; };
let result = chat_service let result = chat_service.process_react(&request, on_step).await;
.process_react(&request, on_step)
.await;
let final_content = answer_buffer.lock().unwrap().clone(); let final_content = answer_buffer.lock().unwrap().clone();
let reasoning_chain = reasoning_buffer.lock().unwrap().clone(); let reasoning_chain = reasoning_buffer.lock().unwrap().clone();
@ -1454,7 +1459,8 @@ impl RoomService {
format!( format!(
"{}\n[Error during reasoning: {}]", "{}\n[Error during reasoning: {}]",
content_to_persist.trim_end(), content_to_persist.trim_end(),
msg.trim_start_matches("[Agent error: ").trim_end_matches("]") msg.trim_start_matches("[Agent error: ")
.trim_end_matches("]")
) )
} else { } else {
content_to_persist content_to_persist
@ -1482,6 +1488,7 @@ impl RoomService {
send_at: now, send_at: now,
seq, seq,
in_reply_to: None, in_reply_to: None,
display_name: Some(ai_display_name.clone()),
}; };
if let Err(e) = queue.publish(room_id_inner, envelope).await { if let Err(e) = queue.publish(room_id_inner, envelope).await {
@ -1567,6 +1574,7 @@ impl RoomService {
send_at: now, send_at: now,
seq, seq,
in_reply_to: None, in_reply_to: None,
display_name: model_display_name.clone(),
}; };
queue.publish(room_id, envelope).await?; queue.publish(room_id, envelope).await?;