mod access; mod ai_common; mod ai_nonstreaming; mod ai_react_nonstreaming; mod ai_react_streaming; mod ai_streaming; mod history; mod mentions; mod notifications; mod patterns; mod sequence; mod workers; pub use access::{check_room_access, check_project_member, require_room_member, find_room_or_404}; pub use ai_common::create_and_publish_ai_message; pub use ai_nonstreaming::process_message_ai_nonstreaming; pub use ai_react_nonstreaming::process_message_ai_react_nonstreaming; pub use ai_react_streaming::process_message_ai_react_streaming; pub use ai_streaming::process_message_ai_streaming; pub use history::{get_room_history, get_user_names, get_room_ai_config, extract_mention_context}; pub use mentions::extract_mentions; pub use notifications::{notify_project_members, publish_room_event}; pub use sequence::next_room_message_seq_internal; pub use workers::{start_workers, spawn_agent_task, spawn_room_workers, PushNotificationFn}; use std::sync::Arc; use chrono::Utc; use db::cache::AppCache; use db::database::AppDatabase; use models::rooms::room; use models::rooms::room_ai; use queue::{MessageProducer, ProjectRoomEvent}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use uuid::Uuid; use crate::connection::{RoomConnectionManager, DedupCache}; use crate::error::RoomError; use agent::chat::{AiRequest, ChatService}; use agent::embed::EmbedService; use agent::TaskService; use models::agent_task::AgentType; use crate::service::patterns::{mention_bracket_re, mention_tag_re}; const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024; #[derive(Clone)] pub struct RoomService { pub db: AppDatabase, pub cache: AppCache, pub config: config::AppConfig, pub room_manager: Arc, pub queue: MessageProducer, pub redis_url: String, pub chat_service: Option>, pub task_service: Option>, pub embed_service: Option>, pub push_fn: Option, worker_semaphore: Arc, dedup_cache: DedupCache, } impl RoomService { pub fn new( db: AppDatabase, cache: AppCache, config: config::AppConfig, queue: MessageProducer, room_manager: Arc, redis_url: String, chat_service: Option>, task_service: Option>, max_concurrent_workers: Option, push_fn: Option, embed_service: Option>, ) -> Self { let dedup_cache: DedupCache = Arc::new(dashmap::DashMap::with_capacity_and_hasher(10000, Default::default())); Self { db, cache, config, room_manager, queue, redis_url, chat_service, task_service, embed_service, worker_semaphore: Arc::new(tokio::sync::Semaphore::new( max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS), )), dedup_cache, push_fn, } } pub async fn start_workers( &self, shutdown_rx: tokio::sync::broadcast::Receiver<()>, ) -> anyhow::Result<()> { workers::start_workers( self.db.clone(), self.cache.clone(), self.room_manager.clone(), self.queue.clone(), self.redis_url.clone(), self.dedup_cache.clone(), self.task_service.clone(), None, // max_concurrent_workers handled by semaphore shutdown_rx, self.embed_service.clone(), ) .await } pub async fn spawn_agent_task( &self, project_id: Uuid, agent_type: AgentType, input: String, _title: Option, execute: F, ) -> anyhow::Result where F: FnOnce(i64, Arc) -> Fut + Send + 'static, Fut: std::future::Future> + Send, { let task_service = match &self.task_service { Some(ts) => ts.clone(), None => return Err(anyhow::anyhow!("task service not configured")), }; workers::spawn_agent_task( project_id, agent_type, input, task_service, self.queue.clone(), self.room_manager.clone(), self.worker_semaphore.clone(), execute, ) .await } pub fn spawn_room_workers(&self, room_id: uuid::Uuid) { workers::spawn_room_workers( room_id, self.db.clone(), self.room_manager.clone(), self.queue.clone(), self.redis_url.clone(), self.worker_semaphore.clone(), self.embed_service.clone(), ); } pub async fn publish_room_event( &self, project_id: uuid::Uuid, event_type: super::RoomEventType, room_id: Option, category_id: Option, message_id: Option, seq: Option, ) { let event = ProjectRoomEvent { event_type: event_type.as_str().into(), project_id, room_id, category_id, message_id, seq, timestamp: Utc::now(), }; self.queue .publish_project_room_event(project_id, event) .await; } pub fn notify_project_members( &self, project_id: uuid::Uuid, notification_type: super::NotificationType, title: String, content: Option, related_room_id: Option, ) { notifications::notify_project_members( self.db.clone(), project_id, notification_type, title, content, related_room_id, ); } pub fn extract_mentions(content: &str) -> Vec { mentions::extract_mentions(content) } pub async fn resolve_mentions(&self, content: &str) -> Vec { use models::users::User; use sea_orm::EntityTrait; let mut resolved: Vec = Vec::new(); let mut seen_usernames: Vec = Vec::new(); for cap in mention_bracket_re().captures_iter(content) { if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { if type_m.as_str() == "user" { let id = id_m.as_str().trim(); if let Ok(uuid) = Uuid::parse_str(id) { if !resolved.contains(&uuid) { resolved.push(uuid); } } else if let Some(label_m) = cap.get(3) { let label = label_m.as_str().trim(); if !label.is_empty() { let label_lower = label.to_lowercase(); if seen_usernames.contains(&label_lower) { continue; } seen_usernames.push(label_lower.clone()); if let Some(user) = User::find() .filter(models::users::user::Column::Username.eq(label_lower)) .one(&self.db) .await .ok() .flatten() { if !resolved.contains(&user.uid) { resolved.push(user.uid); } } } } } } } resolved } pub async fn check_room_access(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { access::check_room_access(&self.db, room_id, user_id).await } pub async fn check_project_member( &self, project_id: Uuid, user_id: Uuid, ) -> Result<(), RoomError> { access::check_project_member(&self.db, project_id, user_id).await } pub async fn should_ai_respond(&self, room_id: Uuid, content: &str) -> Result { let ai_config = history::get_room_ai_config(&self.db, room_id).await?; let config = match ai_config { Some(c) => c, None => return Ok(false), }; if !config.use_exact { return Ok(true); } let model_id_str = config.model.to_string(); for cap in mention_bracket_re().captures_iter(content) { if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str { return Ok(true); } } } for cap in mention_tag_re().captures_iter(content) { if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str { return Ok(true); } } } Ok(false) } pub async fn get_room_ai_config( &self, room_id: Uuid, ) -> Result, RoomError> { history::get_room_ai_config(&self.db, room_id).await } pub async fn get_user_names( &self, user_ids: &[Uuid], ) -> std::collections::HashMap { history::get_user_names(&self.db, user_ids).await } pub async fn require_room_member(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { access::require_room_member(&self.db, room_id, user_id).await } pub async fn find_room_or_404(&self, room_id: Uuid) -> Result { access::find_room_or_404(&self.db, room_id).await } pub async fn process_message_ai( &self, room_id: Uuid, _message_id: Uuid, sender_id: Uuid, content: String, ) -> Result<(), RoomError> { let Some(chat_service) = &self.chat_service else { return Ok(()); }; let Some(ai_config) = self.get_room_ai_config(room_id).await? else { return Ok(()); }; let Some(lock_guard) = crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id).await? else { return Ok(()); }; let room = self.find_room_or_404(room_id).await?; let project = models::projects::project::Entity::find_by_id(room.project) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?; let mentioned_model_id = { let mut found = None; for cap in mention_bracket_re().captures_iter(&content) { if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { if type_m.as_str() == "ai" { if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { found = Some(uuid); break; } } } } found }; let model_id = mentioned_model_id.unwrap_or(ai_config.model); let model = models::agents::model::Entity::find_by_id(model_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?; let sender = models::users::User::find_by_id(sender_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Sender not found".to_string()))?; let history = history::get_room_history(&self.db, room_id, 50).await?; let user_ids: Vec = history .iter() .filter_map(|m| m.sender_id) .chain(std::iter::once(sender_id)) .collect(); let user_names = self.get_user_names(&user_ids).await; let mentions = history::extract_mention_context(&self.db, room.project, &content).await; let request = AiRequest { db: self.db.clone(), cache: self.cache.clone(), config: self.config.clone(), model, project: project.clone(), sender, room: room.clone(), input: content, mention: mentions, history, user_names, temperature: ai_config.temperature.unwrap_or(0.7), max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32, top_p: 1.0, frequency_penalty: 0.0, presence_penalty: 0.0, think: ai_config.think, tools: Some(chat_service.tools()), max_tool_depth: 1000, }; let use_streaming = ai_config.stream; let is_react = ai_config.agent_type.as_deref() == Some("react"); if is_react { if use_streaming { ai_react_streaming::process_message_ai_react_streaming( chat_service.clone(), request, room_id, room.project, model_id, lock_guard, self.db.clone(), self.cache.clone(), self.queue.clone(), self.room_manager.clone(), ) .await; } else { ai_react_nonstreaming::process_message_ai_react_nonstreaming( chat_service.clone(), request, room_id, room.project, model_id, lock_guard, self.db.clone(), self.cache.clone(), self.queue.clone(), self.room_manager.clone(), ) .await; } } else if use_streaming { ai_streaming::process_message_ai_streaming( chat_service.clone(), request, room_id, room.project, model_id, lock_guard, self.db.clone(), self.cache.clone(), self.queue.clone(), self.room_manager.clone(), ) .await; } else { ai_nonstreaming::process_message_ai_nonstreaming( chat_service.clone(), request, room_id, room.project, model_id, lock_guard, self.db.clone(), self.cache.clone(), self.queue.clone(), self.room_manager.clone(), ) .await; } Ok(()) } }