use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use models::projects::project_skill;
use models::rooms::room_ai;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use uuid::Uuid;

use super::context::RoomMessageContext;
use super::{AiChunkType, AiRequest, AiStreamChunk, Mention, StreamCallback};
use crate::client::types::{ChatRequestMessage, ToolCall};
use crate::client::AiClientConfig;
use crate::client::{call_stream, call_with_params, StreamChunk, StreamChunkType, StreamedToolCall};
use crate::compact::{CompactConfig, CompactService};
use crate::embed::EmbedService;
use crate::error::{AgentError, Result};
use crate::perception::{PerceptionService, SkillEntry, ToolCallEvent};
use crate::react::{ReactAgent, ReactConfig, DEFAULT_SYSTEM_PROMPT};
use crate::tool::{ToolCall as AgentToolCall, ToolContext, ToolExecutor, ToolResult, registry::ToolRegistry};

/// Result from streaming AI response.
pub struct StreamResult {
    /// Answer text accumulated over every model call of the request.
    pub content: String,
    /// Reasoning text reported by the model call that produced the result.
    pub reasoning_content: String,
    /// Prompt-side token count reported for the request.
    pub input_tokens: i64,
    /// Completion-side token count reported for the request.
    pub output_tokens: i64,
    /// All chunks in arrival order — preserves ReAct multi-cycle ordering.
    pub chunks: Vec<StreamChunk>,
}

/// Result from non-streaming AI response.
pub struct ProcessResult {
    /// Final answer text.
    pub content: String,
    /// Prompt-side tokens summed over every model call of the request.
    pub input_tokens: i64,
    /// Completion-side tokens summed over every model call of the request.
    pub output_tokens: i64,
}

/// Service for handling AI chat requests in rooms.
pub struct ChatService {
    /// Base URL of the AI endpoint, if one was configured.
    ai_base_url: Option<String>,
    /// API key for the AI endpoint, if one was configured.
    ai_api_key: Option<String>,
    /// Optional service used to compact room history before prompting.
    compact_service: Option<CompactService>,
    /// Optional embedding search service used for mentions, skills and memories.
    embed_service: Option<EmbedService>,
    /// Perception service used for skill detection.
    perception_service: PerceptionService,
    /// Optional registry of tools the model may call.
    tool_registry: Option<ToolRegistry>,
}

impl Default for ChatService {
    /// Equivalent to [`ChatService::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ChatService {
    /// Creates a service with no endpoint configuration, no optional services
    /// and the default [`PerceptionService`].
    pub fn new() -> Self {
        Self {
            ai_base_url: None,
            ai_api_key: None,
            compact_service: None,
            embed_service: None,
            perception_service: PerceptionService::default(),
            tool_registry: None,
        }
    }

    /// Takes the base URL and API key from `config`.
    pub fn with_ai_client_config(mut self, config: AiClientConfig) -> Self {
        // `config` is owned, so its fields are moved instead of cloned.
        self.ai_base_url = config.base_url;
        self.ai_api_key = Some(config.api_key);
        self
    }

    /// Enables history compaction through `compact_service`.
    pub fn with_compact_service(mut self, compact_service: CompactService) -> Self {
        self.compact_service = Some(compact_service);
        self
    }

    /// Enables embedding-backed lookups through `embed_service`.
    pub fn with_embed_service(mut self, embed_service: EmbedService) -> Self {
        self.embed_service = Some(embed_service);
        self
    }

    /// Replaces the default perception service.
    pub fn with_perception_service(mut self, perception_service: PerceptionService) -> Self {
        self.perception_service = perception_service;
        self
    }

    /// Registers the tools the model may call.
    pub fn with_tool_registry(mut self, registry: ToolRegistry) -> Self {
        self.tool_registry = Some(registry);
        self
    }

    /// Returns all registered tools as JSON tool definitions.
    ///
    /// Returns an empty list when no registry is configured.
    // NOTE(review): the element type of this `Vec` is missing in the text under review and
    // cannot be recovered from it (it is whatever `ToolRegistry::to_openai_tools` yields) —
    // restore it from the original file.
    pub fn tools(&self) -> Vec {
        self.tool_registry
            .as_ref()
            .map(|registry| registry.to_openai_tools())
            .unwrap_or_default()
    }

    /// Build a RigToolSet from the registered tool registry.
    ///
    /// This enables using the same tools with `RigAgentService` via rig's native Agent.
    /// The context (db, cache, config, room_id, sender_id) is passed through to each
    /// tool handler at creation time.
    ///
    /// Returns `None` when no registry is configured.
    // NOTE(review): `sender_id` is presumed to be `Option<uuid::Uuid>` (its generic argument
    // is missing in the text under review) — confirm against `RigToolSet::from_registry`.
    #[cfg(feature = "rig")]
    pub fn rig_toolset(
        &self,
        db: db::database::AppDatabase,
        cache: db::cache::AppCache,
        config: config::AppConfig,
        room_id: uuid::Uuid,
        sender_id: Option<uuid::Uuid>,
    ) -> Option<crate::RigToolSet> {
        self.tool_registry.as_ref().map(|registry| {
            crate::RigToolSet::from_registry(registry, db, cache, config, room_id, sender_id)
        })
    }

    /// Get a reference to the underlying ToolRegistry.
pub fn tool_registry(&self) -> Option<&ToolRegistry> { self.tool_registry.as_ref() } pub async fn process(&self, request: AiRequest) -> Result { let tools: Vec = request.tools.clone().unwrap_or_default(); let tools_enabled = !tools.is_empty(); let max_tool_depth = request.max_tool_depth; let mut messages = self.build_messages(&request).await?; let room_ai = room_ai::Entity::find() .filter(room_ai::Column::Room.eq(request.room.id)) .filter(room_ai::Column::Model.eq(request.model.id)) .one(&request.db) .await?; let model_name = request.model.name.clone(); let temperature = room_ai .as_ref() .and_then(|r| r.temperature.map(|v| v as f32)) .unwrap_or(request.temperature as f32); let max_tokens = room_ai .as_ref() .and_then(|r| r.max_tokens.map(|v| v as u32)) .unwrap_or(request.max_tokens as u32); let mut tool_depth = 0; let mut input_tokens = 0i64; let mut output_tokens = 0i64; let config = AiClientConfig::new( self.ai_api_key.clone().unwrap_or_default(), ) .with_base_url(self.ai_base_url.clone().unwrap_or_else(|| "https://api.openai.com".into())); loop { let response = call_with_params( &messages, &model_name, &config, temperature, max_tokens, None, if tools_enabled { Some(&tools) } else { None }, if tools_enabled { None } else { Some("none") }, ) .await?; let text = response.content.clone(); input_tokens += response.input_tokens; output_tokens += response.output_tokens; if tools_enabled && !response.tool_calls_finished.is_empty() { // Build assistant message with tool_calls let tool_call_messages: Vec<_> = response .tool_calls_finished .iter() .map(|name| { // We need ID and arguments — for non-streaming we reconstruct from content // The model returns tool_calls in its content; for now we create a placeholder // that will be replaced by actual tool results ToolCall { id: Uuid::new_v4().to_string(), type_: "function".into(), function: crate::client::types::ToolCallFunction { name: name.clone(), arguments: "{}".into(), }, } }) .collect(); messages.push( 
ChatRequestMessage::assistant(Some(text.clone()), Some(tool_call_messages.clone())) ); // Create ToolCall list for executor (we need real IDs and args) // Since we can't get args from streaming, use name matching from the text let calls: Vec = tool_call_messages .into_iter() .map(|tc| AgentToolCall { id: tc.id.clone(), name: tc.function.name.clone(), arguments: tc.function.arguments.clone(), }) .collect(); let tool_messages = { let mut ctx = ToolContext::new( request.db.clone(), request.cache.clone(), request.config.clone(), request.room.id, Some(request.sender.uid), ) .with_project(request.project.id); if let Some(ref registry) = self.tool_registry { ctx.registry_mut().merge(registry.clone()); } let executor = ToolExecutor::new(); match executor.execute_batch(calls, &mut ctx).await { Ok(results) => ToolExecutor::to_tool_messages(&results), Err(e) => { let err_msg = format!("[Tool call failed: {}]", e); response .tool_calls_finished .iter() .map(|_| ChatRequestMessage::tool(Uuid::new_v4().to_string(), &err_msg)) .collect() } } }; messages.extend(tool_messages); // Inject passive-detected skills based on tool calls if let Ok(skills) = project_skill::Entity::find() .filter(project_skill::Column::ProjectUuid.eq(request.project.id)) .filter(project_skill::Column::Enabled.eq(true)) .all(&request.db) .await { let skill_entries: Vec = skills .into_iter() .map(|s| SkillEntry { slug: s.slug, name: s.name, description: s.description, content: s.content, }) .collect(); let tool_events: Vec = response .tool_calls_finished .iter() .map(|name| ToolCallEvent { tool_name: name.clone(), arguments: String::new(), }) .collect(); for event in &tool_events { if let Some(ctx) = self.perception_service.passive.detect(event, &skill_entries) { messages.push(ctx.to_system_message()); } } } tool_depth += 1; if tool_depth >= max_tool_depth { let content = if text.is_empty() { format!( "[AI reached maximum tool depth ({}) — no final answer produced]", max_tool_depth ) } else { text }; return 
Ok(ProcessResult { content, input_tokens, output_tokens }); } continue; } return Ok(ProcessResult { content: text, input_tokens, output_tokens }); } } pub async fn process_stream(&self, request: AiRequest, on_chunk: StreamCallback) -> Result { // Wrap on_chunk in Arc so it can be shared across loop iterations let on_chunk = Arc::new(on_chunk); let tools: Vec = request.tools.clone().unwrap_or_default(); let tools_enabled = !tools.is_empty(); let max_tool_depth = request.max_tool_depth; let mut messages = self.build_messages(&request).await?; let room_ai = room_ai::Entity::find() .filter(room_ai::Column::Room.eq(request.room.id)) .filter(room_ai::Column::Model.eq(request.model.id)) .one(&request.db) .await?; let model_name = request.model.name.clone(); let temperature = room_ai .as_ref() .and_then(|r| r.temperature.map(|v| v as f32)) .unwrap_or(request.temperature as f32); let max_tokens = room_ai .as_ref() .and_then(|r| r.max_tokens.map(|v| v as u32)) .unwrap_or(request.max_tokens as u32); let mut tool_depth = 0; let config = AiClientConfig::new( self.ai_api_key.clone().unwrap_or_default(), ) .with_base_url(self.ai_base_url.clone().unwrap_or_else(|| "https://api.openai.com".into())); let mut full_content = String::new(); let mut all_chunks: Vec = Vec::new(); // Collect tool calls during streaming, push them incrementally after. 
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); loop { let on_chunk_cb = on_chunk.clone(); let on_chunk_cb2 = on_chunk_cb.clone(); let tx_arc = Arc::new(tx.clone()); let tx_arc2 = tx_arc.clone(); let response = call_stream( &messages, &model_name, &config, temperature, max_tokens, if tools_enabled { Some(&tools) } else { None }, Arc::new(move |delta| { let fut = on_chunk_cb(AiStreamChunk { content: delta.to_string(), done: false, chunk_type: AiChunkType::Answer, }); fut }), Arc::new(move |delta| { let fut = on_chunk_cb2(AiStreamChunk { content: delta.to_string(), done: false, chunk_type: AiChunkType::Thinking, }); fut }), Arc::new(move |tc: &StreamedToolCall| { let tx = tx_arc2.clone(); let tc_owned = tc.clone(); Box::pin(async move { let _ = tx.send(tc_owned); }) as Pin + Send>> }), ) .await?; // Collect chunks from this streaming iteration in order. all_chunks.extend(response.chunks); let has_tool_calls = tools_enabled && !response.tool_calls.is_empty(); if has_tool_calls { // Accumulate the assistant's text before tool calls full_content.push_str(&response.content); full_content.push('\n'); // Build assistant message with tool_calls from streaming response let tool_calls: Vec = response .tool_calls .iter() .map(|tc| ToolCall { id: tc.id.clone(), type_: "function".into(), function: crate::client::types::ToolCallFunction { name: tc.name.clone(), arguments: tc.arguments.clone(), }, }) .collect(); messages.push(ChatRequestMessage::assistant( Some(response.content.clone()), Some(tool_calls.clone()), )); // Push each tool call incrementally to frontend. // Use try_recv() — tx is never dropped so recv() would deadlock. 
loop { match rx.try_recv() { Ok(tc) => { let args_display = if tc.arguments.len() > 100 { format!("{}...", &tc.arguments[..100]) } else { tc.arguments.clone() }; let tool_display = format!("🔧 {}({})", tc.name, args_display); on_chunk(AiStreamChunk { content: tool_display.clone(), done: false, chunk_type: AiChunkType::ToolCall, }) .await; all_chunks.push(StreamChunk { chunk_type: StreamChunkType::ToolCall, content: tool_display, }); } Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break, Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break, } } // Execute tools one at a time, push each result incrementally let calls: Vec = response .tool_calls .iter() .map(|tc| AgentToolCall { id: tc.id.clone(), name: tc.name.clone(), arguments: tc.arguments.clone(), }) .collect(); let mut tool_messages = Vec::new(); for call in &calls { let ctx = &mut crate::tool::ToolContext::new( request.db.clone(), request.cache.clone(), request.config.clone(), request.room.id, Some(request.sender.uid), ); if let Some(ref registry) = self.tool_registry { ctx.registry_mut().merge(registry.clone()); } let executor = crate::tool::ToolExecutor::new(); let results = match executor.execute_batch(vec![call.clone()], ctx).await { Ok(r) => r, Err(e) => { let err_text = format!("[Tool call failed: {}]", e); tracing::warn!(tool = %call.name, error = %e, "tool_call_failed"); // Do NOT emit tool_result chunks to frontend — show error via tool_call instead let err_display = format!("❌ {} (failed)", call.name); on_chunk(AiStreamChunk { content: err_display.clone(), done: false, chunk_type: AiChunkType::ToolCall, }) .await; all_chunks.push(StreamChunk { chunk_type: StreamChunkType::ToolCall, content: err_display, }); tool_messages.push(ChatRequestMessage::tool(&call.id, &err_text)); continue; } }; for result in &results { let text = match &result.result { crate::tool::ToolResult::Ok(v) => v.to_string(), crate::tool::ToolResult::Error(msg) => msg.clone(), }; let preview = if text.len() > 300 
{ format!("{}...", &text[..300]) } else { text.clone() }; tracing::debug!("tool_result: {} — {}", call.name, preview); // Do NOT emit tool_result chunks to frontend — raw output may contain sensitive data. // Log server-side only; frontend sees tool_call status via on_chunk below. } let success_display = format!("✅ {}", call.name); on_chunk(AiStreamChunk { content: success_display.clone(), done: false, chunk_type: AiChunkType::ToolCall, }) .await; all_chunks.push(StreamChunk { chunk_type: StreamChunkType::ToolCall, content: success_display, }); let msgs = crate::tool::ToolExecutor::to_tool_messages(&results); tool_messages.extend(msgs); } messages.extend(tool_messages); // Inject passive-detected skills based on tool calls if let Ok(skills) = project_skill::Entity::find() .filter(project_skill::Column::ProjectUuid.eq(request.project.id)) .filter(project_skill::Column::Enabled.eq(true)) .all(&request.db) .await { let skill_entries: Vec = skills .into_iter() .map(|s| SkillEntry { slug: s.slug, name: s.name, description: s.description, content: s.content, }) .collect(); let tool_events: Vec = response .tool_calls .iter() .map(|tc| ToolCallEvent { tool_name: tc.name.clone(), arguments: tc.arguments.clone(), }) .collect(); for event in &tool_events { if let Some(ctx) = self.perception_service.passive.detect(event, &skill_entries) { messages.push(ctx.to_system_message()); } } } tool_depth += 1; if tool_depth >= max_tool_depth { let max_depth_text = format!( "[AI reached maximum tool depth ({}) — no final answer produced]", max_tool_depth ); on_chunk(AiStreamChunk { content: max_depth_text.clone(), done: true, chunk_type: AiChunkType::Answer, }) .await; all_chunks.push(StreamChunk { chunk_type: StreamChunkType::Answer, content: max_depth_text, }); return Ok(StreamResult { content: full_content, reasoning_content: String::new(), input_tokens: 0, output_tokens: 0, chunks: all_chunks, }); } continue; } // Final answer — accumulate and return 
full_content.push_str(&response.content); on_chunk(AiStreamChunk { content: response.content.clone(), done: true, chunk_type: AiChunkType::Answer, }) .await; all_chunks.push(StreamChunk { chunk_type: StreamChunkType::Answer, content: response.content.clone(), }); return Ok(StreamResult { content: full_content, reasoning_content: response.reasoning_content, input_tokens: response.input_tokens, output_tokens: response.output_tokens, chunks: all_chunks, }); } } async fn build_messages(&self, request: &AiRequest) -> Result> { let mut messages = Vec::new(); let mut processed_history = Vec::new(); if let Some(compact_service) = &self.compact_service { let config = CompactConfig::default(); match compact_service .compact_room_auto(request.room.id, Some(request.user_names.clone()), config) .await { Ok(compact_summary) => { if !compact_summary.summary.is_empty() { messages.push(ChatRequestMessage::system(format!( "Conversation summary:\n{}", compact_summary.summary ))); } processed_history = compact_summary.retained; } Err(e) => { let _ = e; } } } if !processed_history.is_empty() { for msg_summary in processed_history { let ctx = RoomMessageContext::from(msg_summary); messages.push(ctx.to_message()); } } else { for msg in &request.history { let ctx = RoomMessageContext::from_model_with_names(msg, &request.user_names); messages.push(ctx.to_message()); } } if let Some(embed_service) = &self.embed_service { for mention in &request.mention { match mention { Mention::Repo(repo) => { let query = format!( "{} {}", repo.repo_name, repo.description.as_deref().unwrap_or_default() ); match embed_service.search_issues(&query, 5).await { Ok(issues) if !issues.is_empty() => { let context = format!( "Related issues:\n{}", issues .iter() .map(|i| format!("- {}", i.payload.text)) .collect::>() .join("\n") ); messages.push(ChatRequestMessage::system(context)); } Err(e) => { let _ = e; } _ => {} } match embed_service.search_repos(&query, 3).await { Ok(repos) if !repos.is_empty() => { let 
context = format!( "Related repositories:\n{}", repos .iter() .map(|r| format!("- {}", r.payload.text)) .collect::>() .join("\n") ); messages.push(ChatRequestMessage::system(context)); } Err(e) => { let _ = e; } _ => {} } } Mention::User(user) => { let mut profile_parts = vec![format!("Username: {}", user.username)]; if let Some(ref display_name) = user.display_name { profile_parts.push(format!("Display name: {}", display_name)); } if let Some(ref org) = user.organization { profile_parts.push(format!("Organization: {}", org)); } if let Some(ref website) = user.website_url { profile_parts.push(format!("Website: {}", website)); } messages.push(ChatRequestMessage::system(format!( "Mentioned user profile:\n{}", profile_parts.join("\n") ))); } } } } let skill_contexts = self.build_skill_context(request).await; for ctx in skill_contexts { messages.push(ctx.to_system_message()); } let memories = self.build_memory_context(request).await; for mem in memories { messages.push(mem.to_system_message()); } messages.push(ChatRequestMessage::system(format!( "Current Project:\n{}\nDescription: {}\nPublic: {}", request.project.display_name, request.project.description.as_deref().unwrap_or("(none)"), if request.project.is_public { "yes" } else { "no" } ))); let mut sender_parts = vec![format!("**Sender:** {}", request.sender.username)]; if let Some(ref display_name) = request.sender.display_name { sender_parts.push(display_name.clone()); } if let Some(ref org) = request.sender.organization { sender_parts.push(format!("({})", org)); } let sender_display = sender_parts.join(" "); messages.push(ChatRequestMessage::system(format!( "The person sending the next message:\n{}", sender_display ))); messages.push(ChatRequestMessage::user(&request.input)); Ok(messages) } async fn build_skill_context( &self, request: &AiRequest, ) -> Vec { let skills: Vec = match project_skill::Entity::find() .filter(project_skill::Column::ProjectUuid.eq(request.project.id)) 
.filter(project_skill::Column::Enabled.eq(true)) .all(&request.db) .await { Ok(models) => models .into_iter() .map(|s| SkillEntry { slug: s.slug, name: s.name, description: s.description, content: s.content, }) .collect(), Err(_) => return Vec::new(), }; if skills.is_empty() { return Vec::new(); } let history_texts: Vec = request .history .iter() .rev() .take(10) .map(|msg| msg.content.clone()) .collect(); let tool_events: Vec = Vec::new(); let keyword_skills = self .perception_service .inject_skills(&request.input, &history_texts, &tool_events, &skills) .await; let mut vector_skills = Vec::new(); if let Some(embed_service) = &self.embed_service { let awareness = crate::perception::VectorActiveAwareness::default(); vector_skills = awareness .detect(embed_service, &request.input, &request.project.id.to_string()) .await; } let mut seen = std::collections::HashSet::new(); let mut result = Vec::new(); for ctx in vector_skills { if seen.insert(ctx.label.clone()) { result.push(ctx); } } for ctx in keyword_skills { if seen.insert(ctx.label.clone()) { result.push(ctx); } } result } async fn build_memory_context( &self, request: &AiRequest, ) -> Vec { let embed_service = match &self.embed_service { Some(s) => s, None => return Vec::new(), }; let awareness = crate::perception::VectorPassiveAwareness::default(); awareness .detect( embed_service, &request.input, &request.project.display_name, &request.room.id.to_string(), ) .await } fn is_retryable_tool_error(msg: &str) -> bool { let msg_lower = msg.to_lowercase(); msg_lower.contains("connection") || msg_lower.contains("timeout") || msg_lower.contains("timed out") || msg_lower.contains("rate limit") || msg_lower.contains("too many") || msg_lower.contains("unavailable") || msg_lower.contains("service unavailable") || msg_lower.contains("temporarily") || msg_lower.contains("refused") || msg_lower.contains("reset") || msg_lower.contains("broken pipe") || msg_lower.contains("deadline exceeded") || msg_lower.contains("try again") } 
/// Answers the request with a [`ReactAgent`] driven by the registered tools.
///
/// The agent is seeded with `DEFAULT_SYSTEM_PROMPT` and `request.input`, runs for at
/// most `request.max_tool_depth` steps, and reports every step to `on_chunk`.
///
/// Tool calls are retried up to three times with exponential backoff (100 ms,
/// 200 ms, 400 ms) when the error message looks transient.
///
/// # Errors
/// Returns `AgentError::Internal` when no tool registry is configured; otherwise
/// returns whatever the agent run returns.
// NOTE(review): the generic argument of the return type is missing in the text under review
// and cannot be recovered from it (it is whatever `ReactAgent::run` yields) — restore it
// from the original file.
pub async fn process_react<C>(
    &self,
    request: &AiRequest,
    mut on_chunk: C,
) -> Result
where
    C: FnMut(crate::react::ReactStep) + Send,
{
    let client_config = AiClientConfig::new(self.ai_api_key.clone().unwrap_or_default())
        .with_base_url(
            self.ai_base_url
                .clone()
                .unwrap_or_else(|| "https://api.openai.com".into()),
        );

    let Some(registry) = &self.tool_registry else {
        return Err(AgentError::Internal("no tool registry registered".into()));
    };

    // Owned copies of everything the executor needs, so the closure borrows nothing.
    let db = request.db.clone();
    let cache = request.cache.clone();
    let app_config = request.config.clone();
    let room_id = request.room.id;
    let project_id = request.project.id;
    let sender_uid = Some(request.sender.uid);
    let registry = registry.clone();

    let executor: Arc<
        dyn Fn(
                String,
                serde_json::Value,
            ) -> Pin<
                Box<
                    dyn std::future::Future<Output = std::result::Result<serde_json::Value, String>>
                        + Send,
                >,
            > + Send
            + Sync,
    > = Arc::new(move |name: String, args: serde_json::Value| {
        let db = db.clone();
        let cache = cache.clone();
        let app_config = app_config.clone();
        let registry = registry.clone();
        Box::pin(async move {
            let max_retries = 3;
            // The arguments do not change between attempts, so serialize them once.
            let arguments = serde_json::to_string(&args).unwrap_or_else(|_| "{}".into());
            let mut last_err = String::new();

            for attempt in 0..=max_retries {
                // Every attempt runs against a fresh context and a fresh call id.
                let mut ctx = ToolContext::new(
                    db.clone(),
                    cache.clone(),
                    app_config.clone(),
                    room_id,
                    sender_uid,
                )
                .with_project(project_id);
                ctx.registry_mut().merge(registry.clone());

                let call = AgentToolCall {
                    id: Uuid::new_v4().to_string(),
                    name: name.clone(),
                    arguments: arguments.clone(),
                };
                let tool_executor = ToolExecutor::new();

                // Success and "no result" return immediately; any other outcome
                // becomes a failure message handled below.
                last_err = match tool_executor.execute_batch(vec![call], &mut ctx).await {
                    Ok(results) => match results.into_iter().next() {
                        None => return Err("no tool result returned".to_string()),
                        Some(result) => match result.result {
                            ToolResult::Ok(value) => return Ok(value),
                            ToolResult::Error(msg) => msg,
                        },
                    },
                    Err(e) => e.to_string(),
                };

                if attempt >= max_retries || !Self::is_retryable_tool_error(&last_err) {
                    break;
                }

                let backoff_ms = 100u64.saturating_mul(2u64.pow(attempt as u32));
                tracing::warn!(
                    tool = %name,
                    attempt = attempt + 1,
                    backoff_ms = backoff_ms,
                    error = %last_err,
                    "tool_execute_retry"
                );
                tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
            }

            Err(last_err)
        })
            as Pin<
                Box<
                    dyn std::future::Future<Output = std::result::Result<serde_json::Value, String>>
                        + Send,
                >,
            >
    });

    let react_config = ReactConfig {
        max_steps: request.max_tool_depth,
        stop_sequences: Vec::new(),
        tool_executor: Some(executor),
    };
    let mut agent = ReactAgent::new(DEFAULT_SYSTEM_PROMPT, self.tools(), react_config);
    agent.add_user_message(&request.input);
    agent
        .run(&request.model.name, &client_config, |step| {
            on_chunk(step);
        })
        .await
}
}