From d3de12717d76dfd3adae6b2cbbd3434ff03adccd Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Wed, 20 May 2026 13:38:11 +0800 Subject: [PATCH] refactor(room): consolidate AI service modules Delete redundant AI service implementations: - ai_mode_streaming variants - ai_react_streaming variants - process_ai These are superseded by the consolidated ai_service module. --- libs/room/src/service/ai_mode_streaming.rs | 160 --------- .../src/service/ai_mode_streaming_post.rs | 196 ----------- .../src/service/ai_mode_streaming_steps.rs | 98 ------ .../room/src/service/ai_react_nonstreaming.rs | 73 ---- libs/room/src/service/ai_react_streaming.rs | 144 -------- .../src/service/ai_react_streaming_post.rs | 330 ------------------ .../src/service/ai_react_streaming_steps.rs | 125 ------- libs/room/src/service/process_ai.rs | 264 -------------- 8 files changed, 1390 deletions(-) delete mode 100644 libs/room/src/service/ai_mode_streaming.rs delete mode 100644 libs/room/src/service/ai_mode_streaming_post.rs delete mode 100644 libs/room/src/service/ai_mode_streaming_steps.rs delete mode 100644 libs/room/src/service/ai_react_nonstreaming.rs delete mode 100644 libs/room/src/service/ai_react_streaming.rs delete mode 100644 libs/room/src/service/ai_react_streaming_post.rs delete mode 100644 libs/room/src/service/ai_react_streaming_steps.rs delete mode 100644 libs/room/src/service/process_ai.rs diff --git a/libs/room/src/service/ai_mode_streaming.rs b/libs/room/src/service/ai_mode_streaming.rs deleted file mode 100644 index 480414c..0000000 --- a/libs/room/src/service/ai_mode_streaming.rs +++ /dev/null @@ -1,160 +0,0 @@ -use std::pin::Pin; -use std::sync::Arc; - -use chrono::Utc; -use db::cache::AppCache; -use db::database::AppDatabase; -use queue::MessageProducer; -use uuid::Uuid; - -use super::ai_mode_streaming_post::finalize_mode_stream; -use super::ai_mode_streaming_steps::build_mode_chunk_callback; -use super::sequence::next_room_message_seq_internal; -use crate::connection::RoomConnectionManager; -use agent::chat::{AiRequest, ChatService}; - -#[allow(dead_code)] - -pub type RunModeFn = Box< - dyn FnOnce( - Arc, - AiRequest, - Arc< - dyn Fn(String, String, bool) -> Pin + Send>> - + Send - + Sync, - >, - ) -> Pin< - Box> + Send>, - > + Send, ->; - -#[allow(dead_code)] -pub async fn run_mode_streaming( - chat_service: Arc, - request: AiRequest, - room_id: Uuid, - project_id: Uuid, - model_id: Uuid, - lock_guard: crate::room_ai_queue::RoomAiLockGuard, - db: AppDatabase, - cache: AppCache, - queue: MessageProducer, - room_manager: Arc, - mode_name_str: &str, - run: RunModeFn, -) { - let mode_name = mode_name_str.to_string(); - - let streaming_msg_id = Uuid::now_v7(); - let seq = match next_room_message_seq_internal(room_id, &db, &cache).await { - Ok(s) => s, - Err(e) => { - tracing::error!(error = %e, "Failed to get seq for {} streaming", mode_name); - return; - } - }; - - let ai_display_name = request.model.name.clone(); - let _ = room_manager - .register_stream_channel(streaming_msg_id, room_id, Some(ai_display_name.clone())) - .await; - - use queue::RoomMessageStreamChunkEvent; - let initial_event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - seq: 0, - content: String::new(), - done: false, - error: None, - display_name: Some(ai_display_name.clone()), - chunk_type: Some("thinking".to_string()), - }; - room_manager.broadcast_stream_chunk(initial_event).await; - - let now = Utc::now(); - - tokio::spawn(async move { - let _lock_guard = lock_guard; - let cancel = room_manager.register_stream_cancel(room_id).await; - - let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001") - .expect("constant UUID should always parse"); - - let typing_start = queue::TypingEvent { - room_id, - user_id: ai_typing_id, - username: ai_display_name.clone(), - avatar_url: None, - action: "start".to_string(), - sender_type: Some("ai".to_string()), - }; - room_manager - .broadcast_typing(room_id, typing_start.clone()) - .await; - - let (typing_cancel_tx, typing_cancel_rx) = tokio::sync::oneshot::channel::<()>(); - let typing_renew_handle = tokio::spawn({ - let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mgr = room_manager.clone(); - let evt = typing_start.clone(); - async move { - tokio::select! { - _ = typing_cancel_rx => {} - _ = async { - loop { - interval.tick().await; - mgr.broadcast_typing(room_id, evt.clone()).await; - } - } => {} - } - } - }); - - let state = build_mode_chunk_callback( - streaming_msg_id, - room_id, - &ai_display_name, - cancel, - room_manager.clone(), - queue.clone(), - ); - - let on_chunk = state.on_chunk.clone(); - let result = run(chat_service, request, on_chunk).await; - - finalize_mode_stream( - result, - state, - streaming_msg_id, - room_id, - project_id, - model_id, - seq, - now, - &ai_display_name, - &mode_name, - &db, - &queue, - &room_manager, - ) - .await; - - let _ = typing_cancel_tx.send(()); - typing_renew_handle.abort(); - let typing_stop = queue::TypingEvent { - room_id, - user_id: ai_typing_id, - username: ai_display_name.clone(), - avatar_url: None, - action: "stop".to_string(), - sender_type: Some("ai".to_string()), - }; - room_manager.broadcast_typing(room_id, typing_stop).await; - - room_manager.unregister_stream_cancel(room_id).await; - room_manager.close_stream_channel(streaming_msg_id).await; - }); -} diff --git a/libs/room/src/service/ai_mode_streaming_post.rs b/libs/room/src/service/ai_mode_streaming_post.rs deleted file mode 100644 index 80fc75b..0000000 --- a/libs/room/src/service/ai_mode_streaming_post.rs +++ /dev/null @@ -1,196 +0,0 @@ -use chrono::Utc; -use db::database::AppDatabase; -use models::rooms::{room_ai, room_message}; -use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; -use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr}; -use uuid::Uuid; - -use super::ai_mode_streaming_steps::{ModeStreamingState, lock_or_recover}; -use crate::connection::RoomConnectionManager; -use agent::chat::normalize_thinking_content; - -#[allow(dead_code)] -pub(crate) async fn finalize_mode_stream( - result: Result<(String, i64, i64), agent::AgentError>, - state: ModeStreamingState, - streaming_msg_id: Uuid, - room_id: Uuid, - project_id: Uuid, - model_id: Uuid, - seq: i64, - now: chrono::DateTime, - ai_display_name: &str, - mode_name: &str, - db: &AppDatabase, - queue: &MessageProducer, - room_manager: &RoomConnectionManager, -) { - use queue::RoomMessageStreamChunkEvent; - - let final_stream_content = normalize_thinking_content(&lock_or_recover(&state.answer_buffer)); - let final_event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - seq: state - .chunk_seq - .fetch_add(1, std::sync::atomic::Ordering::Relaxed), - content: final_stream_content.clone(), - done: true, - error: None, - display_name: Some(ai_display_name.to_string()), - chunk_type: Some("answer".to_string()), - }; - queue.publish_stream_chunk(&final_event).await; - room_manager.broadcast_stream_chunk(final_event).await; - - let (final_content, err_msg) = match result { - Ok((content, _, _)) => (content, None), - Err(e) => { - let msg = format!("AI 处理失败: {}", e); - tracing::error!(error = ?e, "{} streaming failed", mode_name); - (String::new(), Some(msg)) - } - }; - - let all_chunks_data = lock_or_recover(&state.all_chunks).clone(); - let reasoning_chain: String = all_chunks_data - .iter() - .filter(|(t, _)| t != "answer") - .map(|(_, c)| normalize_thinking_content(c)) - .collect::>() - .join("\n"); - - let content_to_persist = if !final_content.is_empty() { - final_content.clone() - } else if !reasoning_chain.trim().is_empty() { - format!( - "[Agent ran through reasoning steps but did not produce a final answer.]\n{}", - reasoning_chain.trim_end() - ) - } else { - String::from("[No output from reasoning agent]") - }; - let content_to_persist = if let Some(msg) = &err_msg { - format!("{}\n[Error: {}]", content_to_persist.trim_end(), msg) - } else { - content_to_persist - }; - - let persist_content = content_to_persist.trim().to_string(); - if persist_content.is_empty() { - return; - } - - let thinking_content_serialized = { - let chunks = lock_or_recover(&state.all_chunks); - if chunks.is_empty() { - None - } else { - let chunks_json = serde_json::json!({ - "__chunks__": chunks.iter().map(|(t, c)| { - let content = if t == "thinking" { - normalize_thinking_content(c) - } else { - c.clone() - }; - serde_json::json!({ - "type": t, - "content": content, - }) - }).collect::>(), - }); - Some(chunks_json.to_string()) - } - }; - - let envelope = RoomMessageEnvelope { - id: streaming_msg_id, - dedup_key: Some(format!("{}:{}", room_id, streaming_msg_id)), - room_id, - sender_type: "ai".to_string(), - sender_id: None, - model_id: Some(model_id), - thread_id: None, - content: persist_content.clone(), - content_type: "text".to_string(), - thinking_content: thinking_content_serialized.clone(), - send_at: now, - seq, - in_reply_to: None, - display_name: Some(ai_display_name.to_string()), - }; - - if let Err(e) = room_message::Entity::insert(room_message::ActiveModel { - id: Set(streaming_msg_id), - seq: Set(seq), - room: Set(room_id), - sender_type: Set(models::rooms::MessageSenderType::Ai), - sender_id: Set(None), - model_id: Set(Some(model_id)), - thread: Set(None), - content: Set(persist_content.clone()), - content_type: Set(models::rooms::MessageContentType::Text), - thinking_content: Set(thinking_content_serialized.clone()), - edited_at: Set(None), - send_at: Set(now), - revoked: Set(None), - revoked_by: Set(None), - in_reply_to: Set(None), - }) - .exec(db) - .await - { - tracing::error!(error = %e, room_id = %room_id, streaming_msg_id = %streaming_msg_id, - "Failed to persist {} streaming message to DB", mode_name); - return; - } - - if let Err(e) = queue.publish(room_id, envelope).await { - tracing::error!(error = %e, "Failed to publish {} streaming message", mode_name); - } else { - let now = Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id)) - .filter(room_ai::Column::Model.eq(model_id)) - .exec(db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } - - let msg_event = queue::RoomMessageEvent { - id: streaming_msg_id, - room_id, - sender_type: "ai".to_string(), - sender_id: None, - thread_id: None, - content: persist_content, - content_type: "text".to_string(), - thinking_content: thinking_content_serialized, - send_at: now, - seq, - display_name: Some(ai_display_name.to_string()), - in_reply_to: None, - reactions: None, - message_id: None, - }; - room_manager.broadcast(room_id, msg_event).await; - room_manager.metrics.messages_sent.increment(1); - - let event = ProjectRoomEvent { - event_type: crate::RoomEventType::NewMessage.as_str().into(), - project_id, - room_id: Some(room_id), - category_id: None, - message_id: Some(streaming_msg_id), - seq: Some(seq), - timestamp: now, - }; - queue.publish_project_room_event(project_id, event).await; - } -} diff --git a/libs/room/src/service/ai_mode_streaming_steps.rs b/libs/room/src/service/ai_mode_streaming_steps.rs deleted file mode 100644 index 2743ba3..0000000 --- a/libs/room/src/service/ai_mode_streaming_steps.rs +++ /dev/null @@ -1,98 +0,0 @@ -use std::pin::Pin; -use std::sync::Arc; - -use queue::{MessageProducer, RoomMessageStreamChunkEvent}; -use uuid::Uuid; - -use crate::connection::RoomConnectionManager; - -#[allow(dead_code)] -pub(crate) fn lock_or_recover(mutex: &std::sync::Mutex) -> std::sync::MutexGuard<'_, T> { - mutex - .lock() - .unwrap_or_else(|poisoned| poisoned.into_inner()) -} - -pub(crate) struct ModeStreamingState { - pub(crate) all_chunks: Arc>>, - pub(crate) answer_buffer: Arc>, - pub(crate) chunk_seq: Arc, - pub(crate) on_chunk: Arc< - dyn Fn(String, String, bool) -> Pin + Send>> - + Send - + Sync, - >, -} -#[allow(dead_code)] -pub(crate) fn build_mode_chunk_callback( - streaming_msg_id: Uuid, - room_id: Uuid, - ai_display_name: &str, - cancel: Arc, - room_manager: Arc, - queue: MessageProducer, -) -> ModeStreamingState { - let all_chunks: Arc>> = - Arc::new(std::sync::Mutex::new(Vec::new())); - let answer_buffer: Arc> = - Arc::new(std::sync::Mutex::new(String::new())); - let chunk_seq = Arc::new(std::sync::atomic::AtomicU64::new(1)); - - let on_chunk = { - let room_manager = room_manager.clone(); - let queue = queue.clone(); - let cancel = cancel.clone(); - let all_chunks = all_chunks.clone(); - let answer_buffer = answer_buffer.clone(); - let chunk_seq = chunk_seq.clone(); - let ai_display_name = ai_display_name.to_string(); - - Arc::new( - move |chunk_type: String, content: String, is_answer: bool| { - let room_manager = room_manager.clone(); - let queue = queue.clone(); - let cancel = cancel.clone(); - let all_chunks = all_chunks.clone(); - let answer_buffer = answer_buffer.clone(); - let chunk_seq = chunk_seq.clone(); - let ai_display_name = ai_display_name.clone(); - - { - let mut chunks = lock_or_recover(&all_chunks); - chunks.push((chunk_type.clone(), content.clone())); - } - if is_answer { - let mut ab = lock_or_recover(&answer_buffer); - ab.push_str(&content); - } - - let current_seq = chunk_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - seq: current_seq, - content: content.clone(), - done: false, - error: None, - display_name: Some(ai_display_name.clone()), - chunk_type: Some(chunk_type.clone()), - }; - - Box::pin(async move { - if cancel.load(std::sync::atomic::Ordering::Acquire) { - return; - } - queue.publish_stream_chunk(&event).await; - room_manager.broadcast_stream_chunk(event).await; - }) as Pin + Send>> - }, - ) - }; - - ModeStreamingState { - all_chunks, - answer_buffer, - chunk_seq, - on_chunk, - } -} diff --git a/libs/room/src/service/ai_react_nonstreaming.rs b/libs/room/src/service/ai_react_nonstreaming.rs deleted file mode 100644 index 91db077..0000000 --- a/libs/room/src/service/ai_react_nonstreaming.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::sync::Arc; - -use chrono::Utc; -use db::cache::AppCache; -use db::database::AppDatabase; -use models::rooms::room_ai; -use queue::MessageProducer; -use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, sea_query::Expr}; -use uuid::Uuid; - -use crate::connection::RoomConnectionManager; -use agent::chat::{AiRequest, ChatService}; -use agent::tool::registry::ToolRegistry; - -pub async fn process_message_ai_react_nonstreaming( - chat_service: Arc, - request: AiRequest, - room_id: Uuid, - _project_id: Uuid, - model_id: Uuid, - lock_guard: crate::room_ai_queue::RoomAiLockGuard, - db: AppDatabase, - _cache: AppCache, - queue: MessageProducer, - _room_manager: Arc, - room_tools: ToolRegistry, - room_preamble: String, -) { - tokio::spawn(async move { - let _lock_guard = lock_guard; - let model_display_name = request.model.name.clone(); - - let final_answer = chat_service - .process_react_room( - &request, - |_step| async move {}, - room_tools, - Some(&room_preamble), - Some(queue.clone()), - ) - .await; - - match final_answer { - Ok((_response, _input_tokens, _output_tokens)) => { - // In room mode, the AI communicates via send_message tool calls. - // Do not post the final answer as a room message — only update call stats. - tracing::info!( - room_id = %room_id, model = %model_display_name, - "Room AI ReAct nonstreaming completed — messages sent via send_message tool" - ); - let now = Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id)) - .filter(room_ai::Column::Model.eq(model_id)) - .exec(&db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } - } - Err(e) => { - tracing::error!(error = ?e, "ReAct agent failed"); - // Even on failure, the AI may have sent partial messages via send_message. - // We log the error but don't post it to the room (the AI can retry). - } - } - }); -} diff --git a/libs/room/src/service/ai_react_streaming.rs b/libs/room/src/service/ai_react_streaming.rs deleted file mode 100644 index 05817b9..0000000 --- a/libs/room/src/service/ai_react_streaming.rs +++ /dev/null @@ -1,144 +0,0 @@ -use std::sync::Arc; - -use chrono::Utc; -use db::cache::AppCache; -use db::database::AppDatabase; -use queue::MessageProducer; -use uuid::Uuid; - -use super::ai_react_streaming_post::finalize_react_stream; -use super::ai_react_streaming_steps::{build_react_step_state, create_react_callback}; -use super::sequence::next_room_message_seq_internal; -use crate::connection::RoomConnectionManager; -use agent::chat::{AiRequest, ChatService}; -use agent::tool::registry::ToolRegistry; - -pub async fn process_message_ai_react_streaming( - chat_service: Arc, - request: AiRequest, - room_id: Uuid, - project_id: Uuid, - model_id: Uuid, - lock_guard: crate::room_ai_queue::RoomAiLockGuard, - db: AppDatabase, - cache: AppCache, - queue: MessageProducer, - room_manager: Arc, - room_tools: ToolRegistry, - room_preamble: String, -) { - let streaming_msg_id = Uuid::now_v7(); - let seq = match next_room_message_seq_internal(room_id, &db, &cache).await { - Ok(s) => s, - Err(e) => { - tracing::error!(error = %e, "Failed to get seq for ReAct streaming"); - return; - } - }; - - let ai_display_name = request.model.name.clone(); - let _ = room_manager - .register_stream_channel(streaming_msg_id, room_id, Some(ai_display_name.clone())) - .await; - - let now = Utc::now(); - - tokio::spawn(async move { - let _lock_guard = lock_guard; - let cancel = room_manager.register_stream_cancel(room_id).await; - - let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001") - .expect("constant UUID should always parse"); - - let typing_start = queue::TypingEvent { - room_id, - user_id: ai_typing_id, - username: ai_display_name.clone(), - avatar_url: None, - action: "start".to_string(), - sender_type: Some("ai".to_string()), - }; - room_manager - .broadcast_typing(room_id, typing_start.clone()) - .await; - - let (typing_cancel_tx, mut typing_cancel_rx) = tokio::sync::oneshot::channel::<()>(); - let typing_renew_handle = tokio::spawn({ - let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mgr = room_manager.clone(); - let evt = typing_start.clone(); - async move { - tokio::select! { - _ = &mut typing_cancel_rx => {} - _ = async { - loop { - interval.tick().await; - mgr.broadcast_typing(room_id, evt.clone()).await; - } - } => {} - } - } - }); - - let state = build_react_step_state(); - - let callback = create_react_callback( - streaming_msg_id, - room_id, - &ai_display_name, - room_manager.clone(), - queue.clone(), - cancel.clone(), - state.steps.clone(), - state.answer_buffer.clone(), - state.step_count.clone(), - state.chunk_seq.clone(), - true, // suppress_answer_broadcast: room mode — AI must use send_message - ); - - let result = chat_service - .process_react_room( - &request, - callback, - room_tools, - Some(&room_preamble), - Some(queue.clone()), - ) - .await; - - // In room mode, suppress final answer posting — AI communicates via send_message tool. - finalize_react_stream( - result, - &state, - streaming_msg_id, - room_id, - project_id, - model_id, - seq, - now, - &ai_display_name, - &db, - &cache, - &queue, - &room_manager, - true, // suppress_final_message - ) - .await; - - let _ = typing_cancel_tx.send(()); - typing_renew_handle.abort(); - let typing_stop = queue::TypingEvent { - room_id, - user_id: ai_typing_id, - username: ai_display_name.clone(), - avatar_url: None, - action: "stop".to_string(), - sender_type: Some("ai".to_string()), - }; - room_manager.broadcast_typing(room_id, typing_stop).await; - - room_manager.unregister_stream_cancel(room_id).await; - room_manager.close_stream_channel(streaming_msg_id).await; - }); -} diff --git a/libs/room/src/service/ai_react_streaming_post.rs b/libs/room/src/service/ai_react_streaming_post.rs deleted file mode 100644 index d5ffcc7..0000000 --- a/libs/room/src/service/ai_react_streaming_post.rs +++ /dev/null @@ -1,330 +0,0 @@ -use chrono::Utc; -use db::database::AppDatabase; -use models::rooms::{room_ai, room_message}; -use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; -use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr}; -use uuid::Uuid; - -use super::ai_react_streaming_steps::{ReactStreamingState, lock_or_recover}; -use super::sequence::next_room_message_seq_internal; -use crate::connection::RoomConnectionManager; -use agent::chat::normalize_thinking_content; - -pub(crate) async fn finalize_react_stream( - result: Result<(String, i64, i64), agent::AgentError>, - state: &ReactStreamingState, - streaming_msg_id: Uuid, - room_id: Uuid, - project_id: Uuid, - model_id: Uuid, - seq: i64, - now: chrono::DateTime, - ai_display_name: &str, - db: &AppDatabase, - cache: &db::cache::AppCache, - queue: &MessageProducer, - room_manager: &RoomConnectionManager, - suppress_final_message: bool, -) { - use queue::RoomMessageStreamChunkEvent; - - let final_stream_content = normalize_thinking_content(&lock_or_recover(&state.answer_buffer)); - let final_event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - seq: state - .chunk_seq - .fetch_add(1, std::sync::atomic::Ordering::Relaxed), - content: final_stream_content.clone(), - done: true, - error: None, - display_name: Some(ai_display_name.to_string()), - chunk_type: Some("answer".to_string()), - }; - queue.publish_stream_chunk(&final_event).await; - room_manager.broadcast_stream_chunk(final_event).await; - - // In room mode, skip persisting and broadcasting the final answer — - // the AI communicates exclusively through send_message tool calls. - // However, if the model did NOT call send_message but produced text, - // auto-send it as a room message so the user always gets a response. - if suppress_final_message { - // Check if model used send_message — scope the MutexGuard so it - // doesn't cross .await points (MutexGuard is not Send). - let used_send_message = { - let steps = lock_or_recover(&state.steps); - steps - .iter() - .any(|(t, c)| t == "tool_call" && c.contains("\"name\":\"send_message\"")) - }; - - if !used_send_message && !final_stream_content.trim().is_empty() { - tracing::info!( - room_id = %room_id, - "Model did not call send_message, auto-sending final content as room message" - ); - // Auto-send the model's text as a room message - let msg_id = Uuid::now_v7(); - let msg_seq = match next_room_message_seq_internal(room_id, db, cache).await { - Ok(s) => s, - Err(_) => seq, - }; - let now = chrono::Utc::now(); - - let envelope = RoomMessageEnvelope { - id: msg_id, - dedup_key: Some(format!("{}:{}", room_id, msg_id)), - room_id, - sender_type: "ai".to_string(), - sender_id: Some(model_id), - model_id: Some(model_id), - thread_id: None, - content: final_stream_content.clone(), - content_type: "text".to_string(), - thinking_content: None, - send_at: now, - seq: msg_seq, - in_reply_to: None, - display_name: Some(ai_display_name.to_string()), - }; - - if let Err(e) = room_message::Entity::insert(room_message::ActiveModel { - id: Set(msg_id), - seq: Set(msg_seq), - room: Set(room_id), - sender_type: Set(models::rooms::MessageSenderType::Ai), - sender_id: Set(Some(model_id)), - model_id: Set(Some(model_id)), - thread: Set(None), - content: Set(final_stream_content.clone()), - content_type: Set(models::rooms::MessageContentType::Text), - thinking_content: Set(None), - edited_at: Set(None), - send_at: Set(now), - revoked: Set(None), - revoked_by: Set(None), - in_reply_to: Set(None), - }) - .exec(db) - .await - { - tracing::error!(error = %e, "Failed to auto-send model text as room message"); - } - - if let Err(e) = queue.publish(room_id, envelope).await { - tracing::error!(error = %e, "Failed to publish auto-send room message"); - } else { - room_manager - .broadcast( - room_id, - queue::RoomMessageEvent { - id: msg_id, - room_id, - sender_type: "ai".to_string(), - sender_id: Some(model_id), - thread_id: None, - content: final_stream_content.clone(), - content_type: "text".to_string(), - thinking_content: None, - send_at: now, - seq: msg_seq, - in_reply_to: None, - display_name: Some(ai_display_name.to_string()), - reactions: None, - message_id: None, - }, - ) - .await; - room_manager.metrics.messages_sent.increment(1); - - let project_event = ProjectRoomEvent { - event_type: "new_message".to_string(), - project_id, - room_id: Some(room_id), - category_id: None, - message_id: Some(msg_id), - seq: Some(msg_seq), - timestamp: now, - }; - queue - .publish_project_room_event(project_id, project_event) - .await; - } - } - - // Still log call stats for billing - if result.is_ok() { - let now = chrono::Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id)) - .filter(room_ai::Column::Model.eq(model_id)) - .exec(db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } - } - return; - } - - let (final_content, err_msg) = match result { - Ok((content, _, _)) => (content, None), - Err(e) => { - let msg = format!("AI 处理失败: {}", e); - tracing::error!(error = ?e, "ReAct streaming failed"); - (String::new(), Some(msg)) - } - }; - - let all_steps = lock_or_recover(&state.steps).clone(); - let reasoning_chain: String = all_steps - .iter() - .filter(|(t, _)| t != "answer") - .map(|(_, c)| normalize_thinking_content(c)) - .collect::>() - .join("\n"); - - let content_to_persist = if !final_content.is_empty() { - final_content - } else if !reasoning_chain.trim().is_empty() { - format!( - "[Agent ran through {} reasoning steps but did not produce a final answer.]\n{}", - state.step_count.load(std::sync::atomic::Ordering::Relaxed), - reasoning_chain.trim_end() - ) - } else { - String::from("[No output from reasoning agent]") - }; - let content_to_persist = if let Some(msg) = &err_msg { - format!( - "{}\n[Error during reasoning: {}]", - content_to_persist.trim_end(), - msg - ) - } else { - content_to_persist - }; - - let persist_content = content_to_persist.trim().to_string(); - if persist_content.is_empty() { - return; - } - - let thinking_content_serialized = { - let steps = lock_or_recover(&state.steps); - if steps.is_empty() { - None - } else { - let chunks_json = serde_json::json!({ - "__chunks__": steps.iter().map(|(t, c)| { - let content = if t == "thinking" { - normalize_thinking_content(c) - } else { - c.clone() - }; - serde_json::json!({ - "type": t, - "content": content, - }) - }).collect::>(), - }); - Some(chunks_json.to_string()) - } - }; - - let envelope = RoomMessageEnvelope { - id: streaming_msg_id, - dedup_key: Some(format!("{}:{}", room_id, streaming_msg_id)), - room_id, - sender_type: "ai".to_string(), - sender_id: None, - model_id: Some(model_id), - thread_id: None, - content: persist_content.clone(), - content_type: "text".to_string(), - thinking_content: thinking_content_serialized.clone(), - send_at: now, - seq, - in_reply_to: None, - display_name: Some(ai_display_name.to_string()), - }; - - if let Err(e) = room_message::Entity::insert(room_message::ActiveModel { - id: Set(streaming_msg_id), - seq: Set(seq), - room: Set(room_id), - sender_type: Set(models::rooms::MessageSenderType::Ai), - sender_id: Set(None), - model_id: Set(Some(model_id)), - thread: Set(None), - content: Set(persist_content.clone()), - content_type: Set(models::rooms::MessageContentType::Text), - thinking_content: Set(thinking_content_serialized.clone()), - edited_at: Set(None), - send_at: Set(now), - revoked: Set(None), - revoked_by: Set(None), - in_reply_to: Set(None), - }) - .exec(db) - .await - { - tracing::error!(error = %e, room_id = %room_id, streaming_msg_id = %streaming_msg_id, - "Failed to persist ReAct streaming message to DB"); - return; - } - - if let Err(e) = queue.publish(room_id, envelope).await { - tracing::error!(error = %e, "Failed to publish ReAct streaming message"); - } else { - let now = Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id)) - .filter(room_ai::Column::Model.eq(model_id)) - .exec(db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } - - let msg_event = queue::RoomMessageEvent { - id: streaming_msg_id, - room_id, - sender_type: "ai".to_string(), - sender_id: None, - thread_id: None, - content: persist_content, - content_type: "text".to_string(), - thinking_content: thinking_content_serialized, - send_at: now, - seq, - display_name: Some(ai_display_name.to_string()), - in_reply_to: None, - reactions: None, - message_id: None, - }; - room_manager.broadcast(room_id, msg_event).await; - room_manager.metrics.messages_sent.increment(1); - - let event = ProjectRoomEvent { - event_type: crate::RoomEventType::NewMessage.as_str().into(), - project_id, - room_id: Some(room_id), - category_id: None, - message_id: Some(streaming_msg_id), - seq: Some(seq), - timestamp: now, - }; - queue.publish_project_room_event(project_id, event).await; - } -} diff --git a/libs/room/src/service/ai_react_streaming_steps.rs b/libs/room/src/service/ai_react_streaming_steps.rs deleted file mode 100644 index 4b300c9..0000000 --- a/libs/room/src/service/ai_react_streaming_steps.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::sync::Arc; - -use queue::{MessageProducer, RoomMessageStreamChunkEvent}; -use uuid::Uuid; - -use crate::connection::RoomConnectionManager; -use agent::react::ReactStep; - -pub(crate) fn lock_or_recover(mutex: &std::sync::Mutex) -> std::sync::MutexGuard<'_, T> { - mutex - .lock() - .unwrap_or_else(|poisoned| poisoned.into_inner()) -} - -pub(crate) struct ReactStreamingState { - pub(crate) steps: Arc>>, - pub(crate) answer_buffer: Arc>, - pub(crate) step_count: Arc, - pub(crate) chunk_seq: Arc, -} - -pub(crate) fn build_react_step_state() -> ReactStreamingState { - ReactStreamingState { - steps: Arc::new(std::sync::Mutex::new(Vec::new())), - answer_buffer: Arc::new(std::sync::Mutex::new(String::new())), - step_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)), - chunk_seq: Arc::new(std::sync::atomic::AtomicU64::new(1)), - } -} - -pub(crate) fn create_react_callback( - streaming_msg_id: Uuid, - room_id: Uuid, - ai_display_name: &str, - room_manager: Arc, - queue: MessageProducer, - cancel: Arc, - steps: Arc>>, - answer_buffer: Arc>, - step_count: Arc, - chunk_seq: Arc, - suppress_answer_broadcast: bool, -) -> impl FnMut(ReactStep) -> std::pin::Pin + Send>> + Send -{ - let ai_name = ai_display_name.to_string(); - let suppress = suppress_answer_broadcast; - - move |step: ReactStep| { - let room_manager = room_manager.clone(); - let queue = queue.clone(); - let cancel = cancel.clone(); - let steps = steps.clone(); - let answer_buffer = answer_buffer.clone(); - let step_count = step_count.clone(); - let chunk_seq = chunk_seq.clone(); - let ai_name = ai_name.clone(); - - let (chunk_type, content) = match &step { - ReactStep::Thought { thought, .. } => ("thinking".to_string(), thought.clone()), - ReactStep::Action { action, .. } => ( - "tool_call".to_string(), - serde_json::json!({ - "name": action.name, - "arguments": action.args, - }) - .to_string(), - ), - ReactStep::Observation { observation, .. } => { - ("tool_result".to_string(), observation.clone()) - } - ReactStep::Answer { answer, .. } => ("answer".to_string(), answer.clone()), - }; - - if matches!(&step, ReactStep::Answer { .. }) { - step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - { - let mut s = lock_or_recover(&steps); - if let Some(last) = s.last_mut() { - if last.0 == chunk_type { - last.1.push_str(&content); - } else { - s.push((chunk_type.clone(), content.clone())); - } - } else { - s.push((chunk_type.clone(), content.clone())); - } - } - if matches!(&step, ReactStep::Answer { .. }) { - let mut ab = lock_or_recover(&answer_buffer); - ab.push_str(&content); - } - - let current_seq = chunk_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - - // In room mode, Answer chunks are the model's raw text output which is - // NOT visible to users — the AI must communicate through send_message. - // Only broadcast tool calls, observations, and thinking so the client - // can show streaming progress indicators. The Answer text is still - // recorded in the answer_buffer for billing/session logging. - let is_answer = matches!(&step, ReactStep::Answer { .. }); - - Box::pin(async move { - if cancel.load(std::sync::atomic::Ordering::Acquire) { - return; - } - if suppress && is_answer { - return; - } - let event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - seq: current_seq, - content: content.clone(), - done: false, - error: None, - display_name: Some(ai_name.clone()), - chunk_type: Some(chunk_type), - }; - queue.publish_stream_chunk(&event).await; - room_manager.broadcast_stream_chunk(event).await; - }) - } -} diff --git a/libs/room/src/service/process_ai.rs b/libs/room/src/service/process_ai.rs deleted file mode 100644 index 45aad98..0000000 --- a/libs/room/src/service/process_ai.rs +++ /dev/null @@ -1,264 +0,0 @@ -use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; -use uuid::Uuid; - -use super::RoomService; -use super::ai_nonstreaming; -use super::ai_streaming; -use super::history; -use crate::error::RoomError; -use crate::service::{mention_bracket_re, mention_tag_re}; -use agent::chat::AiRequest; -use agent::react::ROOM_CONTEXT_PROMPT; -use agent::tool::registry::ToolRegistry; -use models::projects::project_members; - -const ROOM_DEFAULT_MAX_OUTPUT_TOKENS: i32 = 1024; -const ROOM_MAX_OUTPUT_TOKENS_HARD_CAP: i32 = 2048; - -fn resolve_room_max_tokens(configured: Option) -> i32 { - configured - .and_then(|v| i32::try_from(v).ok()) - .unwrap_or(ROOM_DEFAULT_MAX_OUTPUT_TOKENS) - .clamp(1, ROOM_MAX_OUTPUT_TOKENS_HARD_CAP) -} - -impl RoomService { - pub async fn process_message_ai( - &self, - room_id: Uuid, - _message_id: Uuid, - sender_id: Uuid, - content: String, - ) -> Result<(), RoomError> { - let Some(chat_service) = &self.chat_service else { - return Ok(()); - }; - - let mentioned_model_id = { - let mut found = None; - for cap in mention_bracket_re().captures_iter(&content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "ai" { - if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { - found = Some(uuid); - break; - } - } - } - } - if found.is_none() { - for cap in mention_tag_re().captures_iter(&content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "ai" { - if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { - found = Some(uuid); - break; - } - } - } - } - } - found - }; - - let model_id = match mentioned_model_id { - Some(id) => id, - None => return Ok(()), - }; - - let ai_config = match models::rooms::room_ai::Entity::find() - .filter(models::rooms::room_ai::Column::Room.eq(room_id)) - .filter(models::rooms::room_ai::Column::Model.eq(model_id)) - .one(&self.db) - .await? - { - Some(c) => c, - None => return Ok(()), - }; - - let Some(lock_guard) = - crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id).await? - else { - return Ok(()); - }; - - let room = self.find_room_or_404(room_id).await?; - - let project = models::projects::project::Entity::find_by_id(room.project) - .one(&self.db) - .await? - .ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?; - - let context_setting = - models::projects::project_context_setting::Entity::find_by_id(project.id) - .one(&self.db) - .await - .map_err(|_| ()) - .ok() - .and_then(|x| x); - - let model = models::agents::model::Entity::find_by_id(model_id) - .one(&self.db) - .await? - .ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?; - - let sender = models::users::User::find_by_id(sender_id) - .one(&self.db) - .await? - .ok_or_else(|| RoomError::NotFound("Sender not found".to_string()))?; - - let history = history::get_room_history(&self.db, room_id, 50).await?; - - let user_ids: Vec = history - .iter() - .filter_map(|m| m.sender_id) - .chain(std::iter::once(sender_id)) - .collect(); - let user_names = self.get_user_names(&user_ids).await; - - let mentions = history::extract_mention_context(&self.db, room.project, &content).await; - - // Build room-only tool registry (send_message, retract_message) - let mut room_tools = ToolRegistry::new(); - fctool::chat_tools::register_room_tools(&mut room_tools); - - // Query sender's project role for permission context - let sender_role = project_members::Entity::find() - .filter(project_members::Column::Project.eq(project.id)) - .filter(project_members::Column::User.eq(sender_id)) - .one(&self.db) - .await - .ok() - .flatten() - .map(|m| { - m.scope_role() - .map(|r| r.to_string()) - .unwrap_or_else(|_| "guest".into()) - }) - .unwrap_or_else(|| "guest".into()); - - let max_tokens = resolve_room_max_tokens(ai_config.max_tokens); - - let mut request = AiRequest { - db: self.db.clone(), - cache: self.cache.clone(), - config: self.config.clone(), - model, - project: project.clone(), - context_setting, - sender: sender.clone(), - room: room.clone(), - input: content, - mention: mentions, - history, - history_cutoff_seq: None, - user_names, - temperature: ai_config.temperature.unwrap_or(0.7), - max_tokens, - top_p: 1.0, - frequency_penalty: 0.0, - presence_penalty: 0.0, - think: ai_config.think, - tools: Some(chat_service.tools()), - max_tool_depth: 1000, - execution_profile: None, - room_preamble: None, - }; - - let (optimized_history, cutoff_seq) = chat_service - .build_room_optimized_context_text(&request) - .await - .unwrap_or_else(|e| { - tracing::warn!(error = %e, "room optimized context failed; using recent history"); - (String::new(), None) - }); - request.history_cutoff_seq = cutoff_seq; - - request.room_preamble = Some(build_room_preamble( - &room, - &project, - &sender, - &sender_role, - &optimized_history, - )); - - let use_streaming = ai_config.stream; - - // Dispatch to direct streaming or nonstreaming with room tools - if use_streaming { - ai_streaming::process_message_ai_streaming( - chat_service.clone(), - request, - room_id, - room.project, - model_id, - lock_guard, - self.db.clone(), - self.cache.clone(), - self.queue.clone(), - self.room_manager.clone(), - room_tools, - ) - .await; - } else { - ai_nonstreaming::process_message_ai_nonstreaming( - chat_service.clone(), - request, - room_id, - room.project, - model_id, - lock_guard, - self.db.clone(), - self.cache.clone(), - self.queue.clone(), - self.room_manager.clone(), - room_tools, - ) - .await; - } - - Ok(()) - } -} - -/// Build a room-specific preamble for the AI system prompt. -fn build_room_preamble( - room: &models::rooms::room::Model, - project: &models::projects::project::Model, - sender: &models::users::user::Model, - sender_role: &str, - optimized_history: &str, -) -> String { - let mut preamble = String::new(); - - preamble.push_str(&format!( - "## Room Context\n\n\ - You are in room **{}** (ID: `{}`) of project **{}** (ID: `{}`).\n\ - Project description: {}\n", - room.room_name, - room.id, - project.display_name, - project.id, - project.description.as_deref().unwrap_or("(none)"), - )); - - preamble.push_str(&format!( - "\n### Who Mentioned You\n\ - - **User:** {} (ID: `{}`)\n\ - - **Project Role:** {}\n", - sender.username, sender.uid, sender_role, - )); - if let Some(ref display_name) = sender.display_name { - preamble.push_str(&format!("- **Display Name:** {}\n", display_name)); - } - - if !optimized_history.trim().is_empty() { - preamble.push_str("\n"); - preamble.push_str(optimized_history); - preamble.push_str("\n"); - } - - preamble.push_str(ROOM_CONTEXT_PROMPT); - - preamble -}