fix(room): reasoning chain fallback, streaming error messages, borrow fixes
Some checks are pending
CI / Rust Tests (push) Waiting to run
CI / Frontend Lint & Type Check (push) Waiting to run
CI / Rust Lint & Check (push) Waiting to run
CI / Frontend Build (push) Blocked by required conditions

- ReAct streaming: collect all ReactStep chunks into reasoning_buffer;
  if no Answer step is emitted, persist the full reasoning chain instead
  of empty content
- All AI error paths (reasoning loop failure, non-streaming errors) now
  send user-visible [AI error: ...] messages instead of silently dropping
- Fix borrow checker: clone content before struct init, use should_log bool
  to avoid double-borrow on err_msg
This commit is contained in:
ZhenYi 2026-04-24 13:17:20 +08:00
parent d89d02e81b
commit beee62832f
2 changed files with 186 additions and 92 deletions

View File

@ -284,7 +284,7 @@ impl RoomService {
.await; .await;
} }
let should_respond = match self.should_ai_respond(room_id).await { let should_respond = match self.should_ai_respond(room_id, &content).await {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
tracing::warn!(room_id = %room_id, error = %e, "should_ai_respond failed"); tracing::warn!(room_id = %room_id, error = %e, "should_ai_respond failed");

View File

@ -756,7 +756,16 @@ impl RoomService {
} }
} }
pub async fn should_ai_respond(&self, room_id: Uuid) -> Result<bool, RoomError> { /// Determine whether AI should respond to a message in this room.
/// - No room_ai config → AI not configured, never respond.
/// - use_exact = false → respond to every text message.
/// - use_exact = true → only respond when the message contains an @[ai:...] or
/// <mention type="ai">... tag that mentions this room's configured AI model.
pub async fn should_ai_respond(
&self,
room_id: Uuid,
content: &str,
) -> Result<bool, RoomError> {
use models::rooms::room_ai; use models::rooms::room_ai;
let ai_config = room_ai::Entity::find() let ai_config = room_ai::Entity::find()
@ -764,7 +773,37 @@ impl RoomService {
.one(&self.db) .one(&self.db)
.await?; .await?;
Ok(ai_config.is_some()) let config = match ai_config {
Some(c) => c,
None => return Ok(false),
};
if !config.use_exact {
return Ok(true);
}
// use_exact mode: only respond when AI is explicitly mentioned
let model_id_str = config.model.to_string();
// Check @[ai:model_id:label] format
for cap in MENTION_BRACKET_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str {
return Ok(true);
}
}
}
// Check <mention type="ai" id="model_id">label</mention> format
for cap in MENTION_TAG_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str {
return Ok(true);
}
}
}
Ok(false)
} }
pub async fn get_room_ai_config( pub async fn get_room_ai_config(
@ -1179,6 +1218,20 @@ impl RoomService {
} }
Err(e) => { Err(e) => {
tracing::error!(error = %e, "AI processing failed"); tracing::error!(error = %e, "AI processing failed");
// Send an error message so the user knows something went wrong
let _ = Self::create_and_publish_ai_message(
&db,
&cache,
&queue,
&room_manager,
room_id_for_ai,
project_id_for_ai,
Uuid::now_v7(),
format!("[AI error: {}]", e),
model_id_inner,
Some(model_display_name),
)
.await;
} }
} }
}); });
@ -1249,6 +1302,19 @@ impl RoomService {
} }
Err(e) => { Err(e) => {
tracing::error!(error = %e, "ReAct agent failed"); tracing::error!(error = %e, "ReAct agent failed");
let _ = Self::create_and_publish_ai_message(
&db,
&cache,
&queue,
&room_manager,
room_id_for_ai,
project_id_for_ai,
Uuid::now_v7(),
format!("[AI error: {}]", e),
model_id_inner,
Some(model_display_name),
)
.await;
} }
} }
}); });
@ -1288,7 +1354,9 @@ impl RoomService {
tokio::spawn(async move { tokio::spawn(async move {
let _lock_guard = lock_guard; let _lock_guard = lock_guard;
// Buffer each ReactStep and forward as a stream chunk. // Buffer all reasoning steps + the final answer separately.
let reasoning_buffer: std::sync::Arc<std::sync::Mutex<String>> =
std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let answer_buffer: std::sync::Arc<std::sync::Mutex<String>> = let answer_buffer: std::sync::Arc<std::sync::Mutex<String>> =
std::sync::Arc::new(std::sync::Mutex::new(String::new())); std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
@ -1299,6 +1367,7 @@ impl RoomService {
let room_id = room_id_inner; let room_id = room_id_inner;
let step_count = step_count.clone(); let step_count = step_count.clone();
let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone()); let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone());
let reasoning_buffer = reasoning_buffer.clone();
let answer_buffer = answer_buffer.clone(); let answer_buffer = answer_buffer.clone();
move |step: ReactStep| { move |step: ReactStep| {
let room_manager = room_manager.clone(); let room_manager = room_manager.clone();
@ -1320,31 +1389,35 @@ impl RoomService {
} }
}; };
if let ReactStep::Answer { .. } = &step { let is_answer = matches!(&step, ReactStep::Answer { .. });
if is_answer {
step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} }
let done = matches!(&step, ReactStep::Answer { .. }); let done = is_answer;
let content_for_buffer = if done {
content.clone()
} else {
String::new()
};
let ai_name = ai_display_name_for_step.clone(); let ai_name = ai_display_name_for_step.clone();
let reasoning_buf = reasoning_buffer.clone();
let answer_buf = answer_buffer.clone(); let answer_buf = answer_buffer.clone();
tokio::spawn(async move { tokio::spawn(async move {
// Always broadcast every step as a stream chunk
let event = RoomMessageStreamChunkEvent { let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id, message_id: streaming_msg_id,
room_id, room_id,
content, content: content.clone(),
done, done,
error: None, error: None,
display_name: Some((*ai_name).clone()), display_name: Some((*ai_name).clone()),
}; };
room_manager.broadcast_stream_chunk(event).await; room_manager.broadcast_stream_chunk(event).await;
if done {
answer_buf.lock().unwrap().push_str(&content_for_buffer); // Collect all steps into reasoning_buffer; Answer goes to answer_buffer
let mut rb = reasoning_buf.lock().unwrap();
rb.push_str(&content);
rb.push('\n');
drop(rb);
if is_answer {
answer_buf.lock().unwrap().push_str(&content);
} }
}); });
} }
@ -1355,89 +1428,110 @@ impl RoomService {
.await; .await;
let final_content = answer_buffer.lock().unwrap().clone(); let final_content = answer_buffer.lock().unwrap().clone();
let reasoning_chain = reasoning_buffer.lock().unwrap().clone();
match result { // Determine what to persist: prefer the answer, fall back to the reasoning chain
Ok(_) if !final_content.is_empty() => { let content_to_persist = if !final_content.is_empty() {
let envelope = RoomMessageEnvelope { final_content
id: streaming_msg_id, } else if !reasoning_chain.trim().is_empty() {
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), // No Answer step, but the reasoning chain was streamed — still send it
room_id: room_id_inner, format!(
sender_type: sender_type.clone(), "[Agent ran through {} reasoning steps but did not produce a final answer.]\n{}",
sender_id: None, step_count.load(std::sync::atomic::Ordering::Relaxed),
model_id: Some(model_id_inner), reasoning_chain.trim_end()
thread_id: None, )
content: final_content.clone(), } else {
content_type: "text".to_string(), // Nothing produced — this should not happen in practice
send_at: now, String::from("[No output from reasoning agent]")
seq, };
in_reply_to: None,
};
if let Err(e) = queue.publish(room_id_inner, envelope).await { let (err_msg, should_log) = match &result {
tracing::error!(error = %e, "Failed to publish ReAct streaming message"); Err(e) => (Some(format!("[Agent error: {}]", e)), true),
} else { _ => (None, false),
let now = Utc::now(); };
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_inner))
.filter(room_ai::Column::Model.eq(model_id_inner))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
}
let msg_event = queue::RoomMessageEvent { let content_to_persist = if let Some(msg) = &err_msg {
id: streaming_msg_id, format!(
room_id: room_id_inner, "{}\n[Error during reasoning: {}]",
sender_type: sender_type.clone(), content_to_persist.trim_end(),
sender_id: None, msg.trim_start_matches("[Agent error: ").trim_end_matches("]")
thread_id: None, )
content: final_content, } else {
content_type: "text".to_string(), content_to_persist
send_at: now, };
seq,
display_name: Some(ai_display_name.clone()),
in_reply_to: None,
reactions: None,
message_id: None,
};
room_manager.broadcast(room_id_inner, msg_event).await;
room_manager.metrics.messages_sent.increment(1);
let event = queue::ProjectRoomEvent { if should_log {
event_type: super::RoomEventType::NewMessage.as_str().into(), tracing::error!(error = %result.as_ref().unwrap_err(), "ReAct streaming failed");
project_id: project_id_inner, }
room_id: Some(room_id_inner),
category_id: None, let persist_content = content_to_persist.trim().to_string();
message_id: Some(streaming_msg_id), if persist_content.is_empty() {
seq: Some(seq), return;
timestamp: now, }
};
queue let envelope = RoomMessageEnvelope {
.publish_project_room_event(project_id_inner, event) id: streaming_msg_id,
.await; dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
} room_id: room_id_inner,
} sender_type: sender_type.clone(),
Ok(_) => { sender_id: None,
tracing::warn!("ReAct agent returned empty answer"); model_id: Some(model_id_inner),
} thread_id: None,
Err(e) => { content: persist_content.clone(),
tracing::error!(error = %e, "ReAct streaming failed"); content_type: "text".to_string(),
let event = RoomMessageStreamChunkEvent { send_at: now,
message_id: streaming_msg_id, seq,
room_id: room_id_inner, in_reply_to: None,
content: String::new(), };
done: true,
error: Some(e.to_string()), if let Err(e) = queue.publish(room_id_inner, envelope).await {
display_name: Some(ai_display_name.clone()), tracing::error!(error = %e, "Failed to publish ReAct streaming message");
}; } else {
room_manager.broadcast_stream_chunk(event).await; let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_inner))
.filter(room_ai::Column::Model.eq(model_id_inner))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
} }
let msg_event = queue::RoomMessageEvent {
id: streaming_msg_id,
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
thread_id: None,
content: persist_content,
content_type: "text".to_string(),
send_at: now,
seq,
display_name: Some(ai_display_name.clone()),
in_reply_to: None,
reactions: None,
message_id: None,
};
room_manager.broadcast(room_id_inner, msg_event).await;
room_manager.metrics.messages_sent.increment(1);
let event = queue::ProjectRoomEvent {
event_type: super::RoomEventType::NewMessage.as_str().into(),
project_id: project_id_inner,
room_id: Some(room_id_inner),
category_id: None,
message_id: Some(streaming_msg_id),
seq: Some(seq),
timestamp: now,
};
queue
.publish_project_room_event(project_id_inner, event)
.await;
} }
room_manager.close_stream_channel(streaming_msg_id).await; room_manager.close_stream_channel(streaming_msg_id).await;