use chrono::Utc; use db::database::AppDatabase; use models::rooms::{room_ai, room_message}; use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr}; use uuid::Uuid; use super::ai_mode_streaming_steps::{ModeStreamingState, lock_or_recover}; use crate::connection::RoomConnectionManager; use agent::chat::normalize_thinking_content; #[allow(dead_code)] pub(crate) async fn finalize_mode_stream( result: Result<(String, i64, i64), agent::AgentError>, state: ModeStreamingState, streaming_msg_id: Uuid, room_id: Uuid, project_id: Uuid, model_id: Uuid, seq: i64, now: chrono::DateTime, ai_display_name: &str, mode_name: &str, db: &AppDatabase, queue: &MessageProducer, room_manager: &RoomConnectionManager, ) { use queue::RoomMessageStreamChunkEvent; let final_stream_content = normalize_thinking_content(&lock_or_recover(&state.answer_buffer)); let final_event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, seq: state .chunk_seq .fetch_add(1, std::sync::atomic::Ordering::Relaxed), content: final_stream_content.clone(), done: true, error: None, display_name: Some(ai_display_name.to_string()), chunk_type: Some("answer".to_string()), }; queue.publish_stream_chunk(&final_event).await; room_manager.broadcast_stream_chunk(final_event).await; let (final_content, err_msg) = match result { Ok((content, _, _)) => (content, None), Err(e) => { let msg = format!("AI 处理失败: {}", e); tracing::error!(error = ?e, "{} streaming failed", mode_name); (String::new(), Some(msg)) } }; let all_chunks_data = lock_or_recover(&state.all_chunks).clone(); let reasoning_chain: String = all_chunks_data .iter() .filter(|(t, _)| t != "answer") .map(|(_, c)| normalize_thinking_content(c)) .collect::>() .join("\n"); let content_to_persist = if !final_content.is_empty() { final_content.clone() } else if !reasoning_chain.trim().is_empty() { format!( "[Agent ran through reasoning steps but did not produce a final answer.]\n{}", reasoning_chain.trim_end() ) } else { String::from("[No output from reasoning agent]") }; let content_to_persist = if let Some(msg) = &err_msg { format!("{}\n[Error: {}]", content_to_persist.trim_end(), msg) } else { content_to_persist }; let persist_content = content_to_persist.trim().to_string(); if persist_content.is_empty() { return; } let thinking_content_serialized = { let chunks = lock_or_recover(&state.all_chunks); if chunks.is_empty() { None } else { let chunks_json = serde_json::json!({ "__chunks__": chunks.iter().map(|(t, c)| { let content = if t == "thinking" { normalize_thinking_content(c) } else { c.clone() }; serde_json::json!({ "type": t, "content": content, }) }).collect::>(), }); Some(chunks_json.to_string()) } }; let envelope = RoomMessageEnvelope { id: streaming_msg_id, dedup_key: Some(format!("{}:{}", room_id, streaming_msg_id)), room_id, sender_type: "ai".to_string(), sender_id: None, model_id: Some(model_id), thread_id: None, content: persist_content.clone(), content_type: "text".to_string(), thinking_content: thinking_content_serialized.clone(), send_at: now, seq, in_reply_to: None, display_name: Some(ai_display_name.to_string()), }; if let Err(e) = room_message::Entity::insert(room_message::ActiveModel { id: Set(streaming_msg_id), seq: Set(seq), room: Set(room_id), sender_type: Set(models::rooms::MessageSenderType::Ai), sender_id: Set(None), model_id: Set(Some(model_id)), thread: Set(None), content: Set(persist_content.clone()), content_type: Set(models::rooms::MessageContentType::Text), thinking_content: Set(thinking_content_serialized.clone()), edited_at: Set(None), send_at: Set(now), revoked: Set(None), revoked_by: Set(None), in_reply_to: Set(None), }) .exec(db) .await { tracing::error!(error = %e, room_id = %room_id, streaming_msg_id = %streaming_msg_id, "Failed to persist {} streaming message to DB", mode_name); return; } if let Err(e) = queue.publish(room_id, envelope).await { tracing::error!(error = %e, "Failed to publish {} streaming message", mode_name); } else { let now = Utc::now(); if let Err(e) = room_ai::Entity::update_many() .col_expr( room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1), ) .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) .filter(room_ai::Column::Room.eq(room_id)) .filter(room_ai::Column::Model.eq(model_id)) .exec(db) .await { tracing::warn!(error = %e, "Failed to update room_ai call stats"); } let msg_event = queue::RoomMessageEvent { id: streaming_msg_id, room_id, sender_type: "ai".to_string(), sender_id: None, thread_id: None, content: persist_content, content_type: "text".to_string(), thinking_content: thinking_content_serialized, send_at: now, seq, display_name: Some(ai_display_name.to_string()), in_reply_to: None, reactions: None, message_id: None, }; room_manager.broadcast(room_id, msg_event).await; room_manager.metrics.messages_sent.increment(1); let event = ProjectRoomEvent { event_type: crate::RoomEventType::NewMessage.as_str().into(), project_id, room_id: Some(room_id), category_id: None, message_id: Some(streaming_msg_id), seq: Some(seq), timestamp: now, }; queue.publish_project_room_event(project_id, event).await; } }