diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs index e1808b2..1675319 100644 --- a/libs/room/src/message.rs +++ b/libs/room/src/message.rs @@ -284,7 +284,7 @@ impl RoomService { .await; } - let should_respond = match self.should_ai_respond(room_id).await { + let should_respond = match self.should_ai_respond(room_id, &content).await { Ok(v) => v, Err(e) => { tracing::warn!(room_id = %room_id, error = %e, "should_ai_respond failed"); diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index 00ae50b..5d881cd 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -756,7 +756,16 @@ impl RoomService { } } - pub async fn should_ai_respond(&self, room_id: Uuid) -> Result { + /// Determine whether AI should respond to a message in this room. + /// - No room_ai config → AI not configured, never respond. + /// - use_exact = false → respond to every text message. + /// - use_exact = true → only respond when the message contains an @[ai:...] or + /// ... tag that mentions this room's configured AI model. + pub async fn should_ai_respond( + &self, + room_id: Uuid, + content: &str, + ) -> Result { use models::rooms::room_ai; let ai_config = room_ai::Entity::find() @@ -764,7 +773,37 @@ impl RoomService { .one(&self.db) .await?; - Ok(ai_config.is_some()) + let config = match ai_config { + Some(c) => c, + None => return Ok(false), + }; + + if !config.use_exact { + return Ok(true); + } + + // use_exact mode: only respond when AI is explicitly mentioned + let model_id_str = config.model.to_string(); + + // Check @[ai:model_id:label] format + for cap in MENTION_BRACKET_RE.captures_iter(content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str { + return Ok(true); + } + } + } + + // Check label format + for cap in MENTION_TAG_RE.captures_iter(content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str { + return Ok(true); + } + } + } + + Ok(false) } pub async fn get_room_ai_config( @@ -1179,6 +1218,20 @@ impl RoomService { } Err(e) => { tracing::error!(error = %e, "AI processing failed"); + // Send an error message so the user knows something went wrong + let _ = Self::create_and_publish_ai_message( + &db, + &cache, + &queue, + &room_manager, + room_id_for_ai, + project_id_for_ai, + Uuid::now_v7(), + format!("[AI error: {}]", e), + model_id_inner, + Some(model_display_name), + ) + .await; } } }); @@ -1249,6 +1302,19 @@ impl RoomService { } Err(e) => { tracing::error!(error = %e, "ReAct agent failed"); + let _ = Self::create_and_publish_ai_message( + &db, + &cache, + &queue, + &room_manager, + room_id_for_ai, + project_id_for_ai, + Uuid::now_v7(), + format!("[AI error: {}]", e), + model_id_inner, + Some(model_display_name), + ) + .await; } } }); @@ -1288,7 +1354,9 @@ impl RoomService { tokio::spawn(async move { let _lock_guard = lock_guard; - // Buffer each ReactStep and forward as a stream chunk. + // Buffer all reasoning steps + the final answer separately. + let reasoning_buffer: std::sync::Arc> = + std::sync::Arc::new(std::sync::Mutex::new(String::new())); let answer_buffer: std::sync::Arc> = std::sync::Arc::new(std::sync::Mutex::new(String::new())); let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); @@ -1299,6 +1367,7 @@ impl RoomService { let room_id = room_id_inner; let step_count = step_count.clone(); let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone()); + let reasoning_buffer = reasoning_buffer.clone(); let answer_buffer = answer_buffer.clone(); move |step: ReactStep| { let room_manager = room_manager.clone(); @@ -1320,31 +1389,35 @@ impl RoomService { } }; - if let ReactStep::Answer { .. } = &step { + let is_answer = matches!(&step, ReactStep::Answer { .. }); + if is_answer { step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } - let done = matches!(&step, ReactStep::Answer { .. }); - let content_for_buffer = if done { - content.clone() - } else { - String::new() - }; + let done = is_answer; let ai_name = ai_display_name_for_step.clone(); + let reasoning_buf = reasoning_buffer.clone(); let answer_buf = answer_buffer.clone(); tokio::spawn(async move { + // Always broadcast every step as a stream chunk let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, - content, + content: content.clone(), done, error: None, display_name: Some((*ai_name).clone()), }; room_manager.broadcast_stream_chunk(event).await; - if done { - answer_buf.lock().unwrap().push_str(&content_for_buffer); + + // Collect all steps into reasoning_buffer; Answer goes to answer_buffer + let mut rb = reasoning_buf.lock().unwrap(); + rb.push_str(&content); + rb.push('\n'); + drop(rb); + if is_answer { + answer_buf.lock().unwrap().push_str(&content); } }); } @@ -1355,89 +1428,110 @@ impl RoomService { .await; let final_content = answer_buffer.lock().unwrap().clone(); + let reasoning_chain = reasoning_buffer.lock().unwrap().clone(); - match result { - Ok(_) if !final_content.is_empty() => { - let envelope = RoomMessageEnvelope { - id: streaming_msg_id, - dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), - room_id: room_id_inner, - sender_type: sender_type.clone(), - sender_id: None, - model_id: Some(model_id_inner), - thread_id: None, - content: final_content.clone(), - content_type: "text".to_string(), - send_at: now, - seq, - in_reply_to: None, - }; + // Determine what to persist: prefer the answer, fall back to the reasoning chain + let content_to_persist = if !final_content.is_empty() { + final_content + } else if !reasoning_chain.trim().is_empty() { + // No Answer step, but the reasoning chain was streamed — still send it + format!( + "[Agent ran through {} reasoning steps but did not produce a final answer.]\n{}", + step_count.load(std::sync::atomic::Ordering::Relaxed), + reasoning_chain.trim_end() + ) + } else { + // Nothing produced — this should not happen in practice + String::from("[No output from reasoning agent]") + }; - if let Err(e) = queue.publish(room_id_inner, envelope).await { - tracing::error!(error = %e, "Failed to publish ReAct streaming message"); - } else { - let now = Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id_inner)) - .filter(room_ai::Column::Model.eq(model_id_inner)) - .exec(&db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } + let (err_msg, should_log) = match &result { + Err(e) => (Some(format!("[Agent error: {}]", e)), true), + _ => (None, false), + }; - let msg_event = queue::RoomMessageEvent { - id: streaming_msg_id, - room_id: room_id_inner, - sender_type: sender_type.clone(), - sender_id: None, - thread_id: None, - content: final_content, - content_type: "text".to_string(), - send_at: now, - seq, - display_name: Some(ai_display_name.clone()), - in_reply_to: None, - reactions: None, - message_id: None, - }; - room_manager.broadcast(room_id_inner, msg_event).await; - room_manager.metrics.messages_sent.increment(1); + let content_to_persist = if let Some(msg) = &err_msg { + format!( + "{}\n[Error during reasoning: {}]", + content_to_persist.trim_end(), + msg.trim_start_matches("[Agent error: ").trim_end_matches("]") + ) + } else { + content_to_persist + }; - let event = queue::ProjectRoomEvent { - event_type: super::RoomEventType::NewMessage.as_str().into(), - project_id: project_id_inner, - room_id: Some(room_id_inner), - category_id: None, - message_id: Some(streaming_msg_id), - seq: Some(seq), - timestamp: now, - }; - queue - .publish_project_room_event(project_id_inner, event) - .await; - } - } - Ok(_) => { - tracing::warn!("ReAct agent returned empty answer"); - } - Err(e) => { - tracing::error!(error = %e, "ReAct streaming failed"); - let event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id: room_id_inner, - content: String::new(), - done: true, - error: Some(e.to_string()), - display_name: Some(ai_display_name.clone()), - }; - room_manager.broadcast_stream_chunk(event).await; + if should_log { + tracing::error!(error = %result.as_ref().unwrap_err(), "ReAct streaming failed"); + } + + let persist_content = content_to_persist.trim().to_string(); + if persist_content.is_empty() { + return; + } + + let envelope = RoomMessageEnvelope { + id: streaming_msg_id, + dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), + room_id: room_id_inner, + sender_type: sender_type.clone(), + sender_id: None, + model_id: Some(model_id_inner), + thread_id: None, + content: persist_content.clone(), + content_type: "text".to_string(), + send_at: now, + seq, + in_reply_to: None, + }; + + if let Err(e) = queue.publish(room_id_inner, envelope).await { + tracing::error!(error = %e, "Failed to publish ReAct streaming message"); + } else { + let now = Utc::now(); + if let Err(e) = room_ai::Entity::update_many() + .col_expr( + room_ai::Column::CallCount, + Expr::col(room_ai::Column::CallCount).add(1), + ) + .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) + .filter(room_ai::Column::Room.eq(room_id_inner)) + .filter(room_ai::Column::Model.eq(model_id_inner)) + .exec(&db) + .await + { + tracing::warn!(error = %e, "Failed to update room_ai call stats"); } + + let msg_event = queue::RoomMessageEvent { + id: streaming_msg_id, + room_id: room_id_inner, + sender_type: sender_type.clone(), + sender_id: None, + thread_id: None, + content: persist_content, + content_type: "text".to_string(), + send_at: now, + seq, + display_name: Some(ai_display_name.clone()), + in_reply_to: None, + reactions: None, + message_id: None, + }; + room_manager.broadcast(room_id_inner, msg_event).await; + room_manager.metrics.messages_sent.increment(1); + + let event = queue::ProjectRoomEvent { + event_type: super::RoomEventType::NewMessage.as_str().into(), + project_id: project_id_inner, + room_id: Some(room_id_inner), + category_id: None, + message_id: Some(streaming_msg_id), + seq: Some(seq), + timestamp: now, + }; + queue + .publish_project_room_event(project_id_inner, event) + .await; } room_manager.close_stream_channel(streaming_msg_id).await;