use std::sync::Arc; use chrono::Utc; use db::cache::AppCache; use db::database::AppDatabase; use models::rooms::room_ai; use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter}; use uuid::Uuid; use super::sequence::next_room_message_seq_internal; use crate::connection::RoomConnectionManager; use agent::chat::{AiRequest, ChatService}; use agent::react::ReactStep; pub async fn process_message_ai_react_streaming( chat_service: Arc, request: AiRequest, room_id: Uuid, project_id: Uuid, model_id: Uuid, lock_guard: crate::room_ai_queue::RoomAiLockGuard, db: AppDatabase, _cache: AppCache, queue: MessageProducer, room_manager: Arc, ) { use queue::RoomMessageStreamChunkEvent; let streaming_msg_id = Uuid::now_v7(); let seq = match next_room_message_seq_internal(room_id, &db, &_cache).await { Ok(s) => s, Err(e) => { tracing::error!(error = %e, "Failed to get seq for ReAct streaming"); return; } }; let room_id_inner = room_id; let project_id_inner = project_id; let now = Utc::now(); let sender_type = "ai".to_string(); let ai_display_name = request.model.name.clone(); tokio::spawn(async move { let _lock_guard = lock_guard; // Collect ordered steps for storage and streaming. let steps: std::sync::Arc>> = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); let last_action_name: std::sync::Arc> = std::sync::Arc::new(std::sync::Mutex::new(String::new())); let answer_buffer: std::sync::Arc> = std::sync::Arc::new(std::sync::Mutex::new(String::new())); let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); let on_step = { let room_manager = room_manager.clone(); let streaming_msg_id = streaming_msg_id; let room_id = room_id_inner; let step_count = step_count.clone(); let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone()); let steps = steps.clone(); let answer_buffer = answer_buffer.clone(); let last_action_name = last_action_name.clone(); move |step: ReactStep| { let room_manager = room_manager.clone(); let (chunk_type, content) = match &step { ReactStep::Thought { step: _, thought } => { ("thinking".to_string(), format!("[Thinking] {}", thought)) } ReactStep::Action { step: _, action } => { *last_action_name.lock().unwrap() = action.name.clone(); ("tool_call".to_string(), format!("[Action] Calling `{}` with {:?}", action.name, action.args)) } ReactStep::Observation { step: _, observation: _, } => { // Sanitize observation — don't expose raw tool output to frontend let action_name = last_action_name.lock().unwrap().clone(); ("tool_call".to_string(), format!("[Observation] {} (completed)", action_name)) } ReactStep::Answer { step: _, answer } => { ("answer".to_string(), answer.clone()) } }; let is_answer = matches!(&step, ReactStep::Answer { .. }); if is_answer { step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } // Record ordered step for storage { let mut s = steps.lock().unwrap(); s.push((chunk_type.clone(), content.clone())); } if is_answer { let mut ab = answer_buffer.lock().unwrap(); ab.push_str(&content); } let done = is_answer; let ai_name = ai_display_name_for_step.clone(); tokio::spawn(async move { let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, content: content.clone(), done, error: None, display_name: Some((*ai_name).clone()), chunk_type: Some(chunk_type), }; room_manager.broadcast_stream_chunk(event).await; }); } }; let result = chat_service.process_react(&request, on_step).await; let final_content = answer_buffer.lock().unwrap().clone(); let all_steps = steps.lock().unwrap().clone(); let reasoning_chain: String = all_steps .iter() .filter(|(t, _)| t != "answer") .map(|(_, c)| c.clone()) .collect::>() .join("\n"); let content_to_persist = if !final_content.is_empty() { final_content } else if !reasoning_chain.trim().is_empty() { format!( "[Agent ran through {} reasoning steps but did not produce a final answer.]\n{}", step_count.load(std::sync::atomic::Ordering::Relaxed), reasoning_chain.trim_end() ) } else { String::from("[No output from reasoning agent]") }; let (err_msg, should_log) = match &result { Err(e) => (Some(format!("[Agent error: {}]", e)), true), _ => (None, false), }; let content_to_persist = if let Some(msg) = &err_msg { format!( "{}\n[Error during reasoning: {}]", content_to_persist.trim_end(), msg.trim_start_matches("[Agent error: ") .trim_end_matches("]") ) } else { content_to_persist }; if should_log { tracing::error!(error = %result.as_ref().unwrap_err(), "ReAct streaming failed"); } let persist_content = content_to_persist.trim().to_string(); if persist_content.is_empty() { return; } // Serialize ordered steps as JSON for ordered replay. let thinking_content = { let steps = steps.lock().unwrap(); if steps.is_empty() { None } else { let chunks_json = serde_json::json!({ "__chunks__": steps.iter().map(|(t, c)| serde_json::json!({ "type": t, "content": c, })).collect::>(), }); Some(chunks_json.to_string()) } }; let envelope = RoomMessageEnvelope { id: streaming_msg_id, dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), room_id: room_id_inner, sender_type: sender_type.clone(), sender_id: None, model_id: Some(model_id), thread_id: None, content: persist_content.clone(), content_type: "text".to_string(), thinking_content, send_at: now, seq, in_reply_to: None, display_name: Some(ai_display_name.clone()), }; if let Err(e) = queue.publish(room_id_inner, envelope).await { tracing::error!(error = %e, "Failed to publish ReAct streaming message"); } else { let now = Utc::now(); if let Err(e) = room_ai::Entity::update_many() .col_expr( room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1), ) .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) .filter(room_ai::Column::Room.eq(room_id_inner)) .filter(room_ai::Column::Model.eq(model_id)) .exec(&db) .await { tracing::warn!(error = %e, "Failed to update room_ai call stats"); } // Record billing (non-fatal) // TODO: ReAct agent does not track token counts yet; billing with 0/0 let _ = super::billing::record_ai_usage( &db, project_id_inner, model_id, 0, 0, ) .await; let msg_event = queue::RoomMessageEvent { id: streaming_msg_id, room_id: room_id_inner, sender_type: sender_type.clone(), sender_id: None, thread_id: None, content: persist_content, content_type: "text".to_string(), thinking_content: None, send_at: now, seq, display_name: Some(ai_display_name.clone()), in_reply_to: None, reactions: None, message_id: None, }; room_manager.broadcast(room_id_inner, msg_event).await; room_manager.metrics.messages_sent.increment(1); let event = ProjectRoomEvent { event_type: crate::RoomEventType::NewMessage.as_str().into(), project_id: project_id_inner, room_id: Some(room_id_inner), category_id: None, message_id: Some(streaming_msg_id), seq: Some(seq), timestamp: now, }; queue .publish_project_room_event(project_id_inner, event) .await; } room_manager.close_stream_channel(streaming_msg_id).await; }); }