diff --git a/libs/queue/producer.rs b/libs/queue/producer.rs index 0eebf00..e8308b4 100644 --- a/libs/queue/producer.rs +++ b/libs/queue/producer.rs @@ -188,6 +188,7 @@ impl MessageProducer { in_reply_to: None, content: String::new(), content_type: String::new(), + thinking_content: None, send_at: chrono::Utc::now(), seq: 0, display_name: None, diff --git a/libs/queue/types.rs b/libs/queue/types.rs index eaeda8d..ee3f6e4 100644 --- a/libs/queue/types.rs +++ b/libs/queue/types.rs @@ -17,6 +17,9 @@ pub struct RoomMessageEnvelope { pub in_reply_to: Option, pub content: String, pub content_type: String, + /// Accumulated AI reasoning/thinking text. + #[serde(skip_serializing_if = "Option::is_none")] + pub thinking_content: Option, pub send_at: DateTime, pub seq: i64, /// Pre-resolved display name for the sender (e.g. AI model name). @@ -34,6 +37,9 @@ pub struct RoomMessageEvent { pub in_reply_to: Option, pub content: String, pub content_type: String, + /// Accumulated AI reasoning/thinking text. + #[serde(skip_serializing_if = "Option::is_none")] + pub thinking_content: Option, pub send_at: DateTime, pub seq: i64, pub display_name: Option, @@ -79,6 +85,7 @@ impl From for RoomMessageEvent { in_reply_to: e.in_reply_to, content: e.content, content_type: e.content_type, + thinking_content: e.thinking_content, send_at: e.send_at, seq: e.seq, display_name: e.display_name, diff --git a/libs/room/src/connection.rs b/libs/room/src/connection.rs index 34b7351..8d159fc 100644 --- a/libs/room/src/connection.rs +++ b/libs/room/src/connection.rs @@ -826,6 +826,7 @@ pub fn make_persist_fn( thread: Set(env.thread_id), content: Set(env.content.clone()), content_type: Set(content_type), + thinking_content: Set(env.thinking_content.clone()), edited_at: Set(None), send_at: Set(env.send_at.clone()), revoked: Set(None), diff --git a/libs/room/src/error.rs b/libs/room/src/error.rs index dd069ff..3670cdf 100644 --- a/libs/room/src/error.rs +++ b/libs/room/src/error.rs @@ -32,3 +32,9 @@ impl From for RoomError { RoomError::Internal(e.to_string()) } } + +impl From for RoomError { + fn from(e: agent::error::AgentError) -> Self { + RoomError::Internal(e.to_string()) + } +} diff --git a/libs/room/src/helpers.rs b/libs/room/src/helpers.rs index 773fe47..0afb6df 100644 --- a/libs/room/src/helpers.rs +++ b/libs/room/src/helpers.rs @@ -68,6 +68,7 @@ impl From for super::RoomMessageResponse { thread: value.thread, content: value.content, content_type: value.content_type.to_string(), + thinking_content: value.thinking_content, edited_at: value.edited_at, send_at: value.send_at, revoked: value.revoked, @@ -427,6 +428,7 @@ impl RoomService { thread: msg.thread, content: msg.content, content_type: msg.content_type.to_string(), + thinking_content: msg.thinking_content, edited_at: msg.edited_at, send_at: msg.send_at, revoked: msg.revoked, diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs index eff9d57..1ec8f5b 100644 --- a/libs/room/src/message.rs +++ b/libs/room/src/message.rs @@ -92,6 +92,7 @@ impl RoomService { in_reply_to: msg.in_reply_to, content: msg.content, content_type: msg.content_type.to_string(), + thinking_content: msg.thinking_content, edited_at: msg.edited_at, send_at: msg.send_at, revoked: msg.revoked, @@ -158,7 +159,7 @@ impl RoomService { } } - let seq = Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await?; + let seq = crate::service::next_room_message_seq_internal(room_id, &self.db, &self.cache).await?; let now = Utc::now(); let id = Uuid::now_v7(); let project_id = room_model.project; @@ -175,6 +176,7 @@ impl RoomService { in_reply_to, content: content.clone(), content_type: content_type_str.clone(), + thinking_content: None, send_at: now, seq, display_name: None, @@ -349,6 +351,7 @@ impl RoomService { in_reply_to, content: request.content, content_type: content_type_str, + thinking_content: None, edited_at: None, send_at: now, revoked: None, diff --git a/libs/room/src/reaction.rs b/libs/room/src/reaction.rs index b2d9aa3..bc7f277 100644 --- a/libs/room/src/reaction.rs +++ b/libs/room/src/reaction.rs @@ -321,6 +321,7 @@ impl RoomService { in_reply_to: msg.in_reply_to, content: msg.content, content_type: msg.content_type.to_string(), + thinking_content: msg.thinking_content, edited_at: msg.edited_at, send_at: msg.send_at, revoked: msg.revoked, diff --git a/libs/room/src/search.rs b/libs/room/src/search.rs index 3ec2f71..c55b870 100644 --- a/libs/room/src/search.rs +++ b/libs/room/src/search.rs @@ -124,6 +124,7 @@ impl RoomService { in_reply_to: row.try_get::>("", "in_reply_to").ok().flatten(), content: row.try_get::("", "content").unwrap_or_default(), content_type, + thinking_content: None, edited_at: row.try_get::>("", "edited_at").ok().flatten(), send_at: row.try_get::("", "send_at").unwrap_or_default(), revoked: row.try_get::>("", "revoked").ok().flatten(), diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs deleted file mode 100644 index 7247052..0000000 --- a/libs/room/src/service.rs +++ /dev/null @@ -1,1760 +0,0 @@ -use dashmap::DashMap; -use std::pin::Pin; -use std::sync::Arc; -use std::sync::LazyLock; - -use chrono::Utc; -use db::cache::AppCache; -use db::database::AppDatabase; -use models::projects::project_members; -use models::rooms::room; -use models::rooms::room_ai; -use models::EntityTrait; -use queue::{AgentTaskEvent, MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; -use sea_orm::{sea_query::Expr, ColumnTrait, ExprTrait, QueryFilter, QueryOrder, QuerySelect}; -use uuid::Uuid; - -use crate::connection::{ - extract_get_redis, make_persist_fn, DedupCache, PersistFn, RoomConnectionManager, -}; -use crate::error::RoomError; -use agent::chat::{AiRequest, ChatService, Mention}; -use agent::embed::EmbedService; -use agent::react::ReactStep; -use agent::TaskService; -use models::agent_task::AgentType; - -const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024; - -/// Callback type for sending push notifications. -/// The caller (AppService) provides this to RoomService so it can trigger -/// browser push notifications without depending on the service crate directly. -pub type PushNotificationFn = - Arc, Option) + Send + Sync>; - -/// Legacy: uuid or username -static USER_MENTION_RE: LazyLock regex_lite::Regex> = - LazyLock::new(|| regex_lite::Regex::new(r"\s*([^<]+?)\s*").unwrap()); - -/// Legacy: label -static MENTION_TAG_RE: LazyLock regex_lite::Regex> = - LazyLock::new(|| { - regex_lite::Regex::new( - r#"]*>\s*([^<]*?)\s*"#, - ) - .unwrap() - }); - -/// New format: @[type:id:label] -/// e.g. @[user:550e8400-e29b-41d4-a716-446655440000:alice] -/// @[repo:660e8400-e29b-41d4-a716-446655440001:my-repo] -/// @[ai:gpt-4o:Claude] -static MENTION_BRACKET_RE: LazyLock regex_lite::Regex> = - LazyLock::new(|| regex_lite::Regex::new(r"@\[([a-z]+):([^:\]]+):([^\]]+)\]").unwrap()); - -#[derive(Clone)] -pub struct RoomService { - pub db: AppDatabase, - pub cache: AppCache, - pub config: config::AppConfig, - pub room_manager: Arc, - pub queue: MessageProducer, - pub redis_url: String, - pub chat_service: Option>, - pub task_service: Option>, - pub embed_service: Option>, - pub push_fn: Option, - worker_semaphore: Arc, - dedup_cache: DedupCache, -} - -impl RoomService { - pub fn new( - db: AppDatabase, - cache: AppCache, - config: config::AppConfig, - queue: MessageProducer, - room_manager: Arc, - redis_url: String, - chat_service: Option>, - task_service: Option>, - max_concurrent_workers: Option, - push_fn: Option, - embed_service: Option>, - ) -> Self { - let dedup_cache: DedupCache = - Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default())); - Self { - db, - cache, - config, - room_manager, - queue, - redis_url, - chat_service, - task_service, - embed_service, - worker_semaphore: Arc::new(tokio::sync::Semaphore::new( - max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS), - )), - dedup_cache, - push_fn, - } - } - - pub async fn start_workers( - &self, - mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, - ) -> anyhow::Result<()> { - use models::rooms::Room; - use sea_orm::EntityTrait; - - let rooms: Vec = Room::find().all(&self.db).await?; - let room_ids: Vec = rooms.iter().map(|r| r.id).collect(); - let project_ids: Vec = rooms - .iter() - .map(|r| r.project) - .collect::>() - .into_iter() - .collect(); - - // Save a clone for task subscriber handles before `project_ids` gets moved. - let task_project_ids = project_ids.clone(); - - tracing::info!( - room_count = room_ids.len(), - project_count = project_ids.len(), - "starting room workers" - ); - - let persist_fn: PersistFn = make_persist_fn( - self.db.clone(), - self.room_manager.metrics.clone(), - self.dedup_cache.clone(), - ); - - let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = - extract_get_redis(self.queue.clone()); - - let worker_room_ids = room_ids.clone(); - let worker_shutdown = shutdown_rx.resubscribe(); - let worker_handle = tokio::spawn({ - let get_redis = get_redis.clone(); - let persist_fn = persist_fn.clone(); - async move { - queue::start_worker(worker_room_ids, get_redis, persist_fn, worker_shutdown).await; - } - }); - - let manager = self.room_manager.clone(); - let redis_url = self.redis_url.clone(); - - let mut handles: Vec<_> = room_ids - .into_iter() - .map(|room_id| { - let manager = manager.clone(); - let redis_url = redis_url.clone(); - let shutdown_rx = shutdown_rx.resubscribe(); - tokio::spawn(async move { - crate::connection::subscribe_room_events( - redis_url, - manager, - room_id, - shutdown_rx, - ) - .await; - }) - }) - .collect(); - - let project_handles: Vec<_> = project_ids - .into_iter() - .map(|project_id| { - let manager = manager.clone(); - let redis_url = redis_url.clone(); - let shutdown_rx = shutdown_rx.resubscribe(); - tokio::spawn(async move { - crate::connection::subscribe_project_room_events( - redis_url, - manager, - project_id, - shutdown_rx, - ) - .await; - }) - }) - .collect(); - handles.extend(project_handles); - - // Subscribe to agent task events for each project. - let task_handles: Vec<_> = task_project_ids - .into_iter() - .map(|project_id| { - let manager = manager.clone(); - let redis_url = redis_url.clone(); - let shutdown_rx = shutdown_rx.resubscribe(); - tokio::spawn(async move { - crate::connection::subscribe_task_events_fn( - redis_url, - manager, - project_id, - shutdown_rx, - ) - .await; - }) - }) - .collect(); - handles.extend(task_handles); - - let cleanup_handle = { - let manager = self.room_manager.clone(); - let db = self.db.clone(); - let dedup_cache = self.dedup_cache.clone(); - let mut cleanup_shutdown = shutdown_rx.resubscribe(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); - interval.tick().await; - loop { - tokio::select! { - _ = interval.tick() => { - manager.cleanup_rate_limit().await; - crate::connection::cleanup_dedup_cache(&dedup_cache); - if let Ok(rooms) = Room::find().all(&db).await { - let room_ids: Vec<_> = rooms.iter().map(|r| r.id).collect(); - let project_ids: Vec<_> = rooms.iter().map(|r| r.project).collect(); - manager.metrics.cleanup_stale_rooms(&room_ids).await; - manager.prune_stale_rooms(&room_ids).await; - manager.prune_stale_projects(&project_ids).await; - } - } - _ = cleanup_shutdown.recv() => { - tracing::info!("cleanup task shutting down"); - break; - } - } - } - }) - }; - handles.push(cleanup_handle); - - let _ = shutdown_rx.recv().await; - - tracing::info!("room workers shutting down"); - - for h in handles { - let _ = h.abort(); - } - let _ = worker_handle.await; - - tracing::info!("room workers stopped"); - Ok(()) - } - - /// Spawn a background agent task: - /// 1. Creates a DB record (status = pending → running) - /// 2. Publishes a "started" event via Redis Pub/Sub - /// 3. Runs `execute()` behind a semaphore - /// 4. On complete/fail, updates the record and publishes the final event - pub async fn spawn_agent_task( - &self, - project_id: Uuid, - agent_type: AgentType, - input: String, - _title: Option, - execute: F, - ) -> anyhow::Result - where - F: FnOnce(i64, Arc) -> Fut + Send + 'static, - Fut: std::future::Future> + Send, - { - let task_service = match &self.task_service { - Some(ts) => ts.clone(), - None => return Err(anyhow::anyhow!("task service not configured")), - }; - - let task = task_service - .create(project_id, input, agent_type) - .await - .map_err(|e| anyhow::anyhow!("create task failed: {}", e))?; - - let task_id = task.id; - - // Publish "started" event via Redis Pub/Sub. - let started_event = AgentTaskEvent { - task_id, - project_id, - parent_id: task.parent_id, - event: "started".to_string(), - message: None, - output: None, - error: None, - status: models::agent_task::TaskStatus::Running.to_string(), - timestamp: Utc::now(), - }; - self.queue - .publish_agent_task_event(project_id, started_event) - .await; - - // Mark task as running. - let _ = task_service.start(task_id).await; - - let queue = self.queue.clone(); - let room_manager = self.room_manager.clone(); - let semaphore = self.worker_semaphore.clone(); - - // Spawn the background task. - tokio::spawn(async move { - let _permit = semaphore.acquire().await.expect("semaphore closed"); - - let result = execute(task_id, task_service.clone()).await; - - let event = match result { - Ok(output) => { - let _ = task_service.complete(task_id, &output).await; - AgentTaskEvent { - task_id, - project_id, - parent_id: None, - event: "done".to_string(), - message: None, - output: Some(output), - error: None, - status: models::agent_task::TaskStatus::Done.to_string(), - timestamp: chrono::Utc::now(), - } - } - Err(err) => { - let _ = task_service.fail(task_id, &err).await; - AgentTaskEvent { - task_id, - project_id, - parent_id: None, - event: "failed".to_string(), - message: None, - output: None, - error: Some(err), - status: models::agent_task::TaskStatus::Failed.to_string(), - timestamp: chrono::Utc::now(), - } - } - }; - - queue - .publish_agent_task_event(project_id, event.clone()) - .await; - room_manager.broadcast_agent_task(project_id, event).await; - tracing::info!(task_id = task_id, project_id = %project_id, "agent task finished"); - }); - - Ok(task_id) - } - - pub fn spawn_room_workers(&self, room_id: uuid::Uuid) { - let persist_fn: PersistFn = make_persist_fn( - self.db.clone(), - self.room_manager.metrics.clone(), - self.dedup_cache.clone(), - ); - let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = - extract_get_redis(self.queue.clone()); - let manager = self.room_manager.clone(); - let redis_url = self.redis_url.clone(); - let semaphore = self.worker_semaphore.clone(); - let db = self.db.clone(); - - let manager2 = self.room_manager.clone(); - let redis_url2 = redis_url.clone(); - let redis_url3 = redis_url.clone(); - - tokio::spawn(async move { - let _permit = match semaphore.acquire_owned().await { - Ok(p) => p, - Err(_) => return, - }; - let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); - queue::room_worker_task( - room_id, - uuid::Uuid::new_v4().to_string(), - get_redis, - persist_fn, - shutdown_rx, - ) - .await; - let _ = shutdown_tx.send(()); - }); - - tokio::spawn(async move { - let shutdown_rx = manager.register_room(room_id).await; - crate::connection::subscribe_room_events( - redis_url2, - manager.clone(), - room_id, - shutdown_rx, - ) - .await; - }); - - tokio::spawn(async move { - let project_id = { - let room = room::Entity::find_by_id(room_id) - .one(&db) - .await - .ok() - .flatten(); - match room { - Some(r) => r.project, - None => return, - } - }; - let shutdown_rx = manager2.register_project(project_id).await; - crate::connection::subscribe_project_room_events( - redis_url3, - manager2, - project_id, - shutdown_rx, - ) - .await; - }); - } - - pub async fn publish_room_event( - &self, - project_id: uuid::Uuid, - event_type: super::RoomEventType, - room_id: Option, - category_id: Option, - message_id: Option, - seq: Option, - ) { - let event = ProjectRoomEvent { - event_type: event_type.as_str().into(), - project_id, - room_id, - category_id, - message_id, - seq, - timestamp: Utc::now(), - }; - self.queue - .publish_project_room_event(project_id, event) - .await; - } - - pub(crate) fn notify_project_members( - &self, - project_id: uuid::Uuid, - notification_type: super::NotificationType, - title: String, - content: Option, - related_room_id: Option, - ) { - let db = self.db.clone(); - let notification_type_inner = notification_type; - let title_inner = title; - let content_inner = content; - let related_room_id_inner = related_room_id; - let project_id_inner = project_id; - - tokio::spawn(async move { - let members = match project_members::Entity::find() - .filter(project_members::Column::Project.eq(project_id_inner)) - .all(&db) - .await - { - Ok(m) => m, - Err(e) => { - tracing::error!(project_id = %project_id_inner, error = %e, - "notify_project_members: failed to fetch members"); - return; - } - }; - - for member in members { - let user_id = member.user; - if let Err(e) = Self::_notification_create_sync( - &db, - notification_type_inner, - user_id, - title_inner.clone(), - content_inner.clone(), - related_room_id_inner, - project_id_inner, - ) - .await - { - tracing::warn!(user_id = %user_id, project_id = %project_id_inner, error = %e, - "notify_project_members: failed to create notification for user"); - } - } - }); - } - - async fn _notification_create_sync( - db: &db::database::AppDatabase, - notification_type: super::NotificationType, - user_id: uuid::Uuid, - title: String, - content: Option, - related_room_id: Option, - project_id: uuid::Uuid, - ) -> Result<(), crate::error::RoomError> { - use chrono::Utc; - use models::rooms::room_notifications; - use sea_orm::{ActiveModelTrait, Set}; - - let notification_type_model = match notification_type { - super::NotificationType::Mention => room_notifications::NotificationType::Mention, - super::NotificationType::Invitation => room_notifications::NotificationType::Invitation, - super::NotificationType::RoleChange => room_notifications::NotificationType::RoleChange, - super::NotificationType::RoomCreated => { - room_notifications::NotificationType::RoomCreated - } - super::NotificationType::RoomDeleted => { - room_notifications::NotificationType::RoomDeleted - } - super::NotificationType::SystemAnnouncement => { - room_notifications::NotificationType::SystemAnnouncement - } - super::NotificationType::ProjectInvitation => { - room_notifications::NotificationType::ProjectInvitation - } - super::NotificationType::WorkspaceInvitation => { - room_notifications::NotificationType::WorkspaceInvitation - } - }; - - let _model = room_notifications::ActiveModel { - id: Set(uuid::Uuid::now_v7()), - room: Set(related_room_id), - project: Set(Some(project_id)), - user_id: Set(Some(user_id)), - notification_type: Set(notification_type_model), - related_message_id: Set(None), - related_user_id: Set(None), - related_room_id: Set(related_room_id), - title: Set(title), - content: Set(content), - metadata: Set(None), - is_read: Set(false), - is_archived: Set(false), - created_at: Set(Utc::now()), - read_at: Set(None), - expires_at: Set(None), - } - .insert(db) - .await - .map_err(|e| crate::error::RoomError::Database(e))?; - - Ok(()) - } - - /// Extracts user UUIDs from all mention formats: - /// - Legacy: `uuid` - /// - Legacy: `label` - /// - New: `@[user:uuid:label]` - pub fn extract_mentions(content: &str) -> Vec { - let mut mentioned = Vec::new(); - - // Legacy uuid format - for cap in USER_MENTION_RE.captures_iter(content) { - if let Some(inner) = cap.get(1) { - let token = inner.as_str().trim(); - if let Ok(uuid) = Uuid::parse_str(token) { - if !mentioned.contains(&uuid) { - mentioned.push(uuid); - } - } - } - } - - // Legacy label format - for cap in MENTION_TAG_RE.captures_iter(content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "user" { - if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { - if !mentioned.contains(&uuid) { - mentioned.push(uuid); - } - } - } - } - } - - // New @[user:uuid:label] format - for cap in MENTION_BRACKET_RE.captures_iter(content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "user" { - if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { - if !mentioned.contains(&uuid) { - mentioned.push(uuid); - } - } - } - } - } - - mentioned - } - - /// Resolves user mentions from all formats: - /// - Legacy: `uuid` or `username` - /// - Legacy: `label` - /// - New: `@[user:uuid:label]` - /// Repository and AI mention types are accepted but produce no user UUIDs. - pub async fn resolve_mentions(&self, content: &str) -> Vec { - use models::users::User; - use sea_orm::EntityTrait; - - let mut resolved: Vec = Vec::new(); - let mut seen_usernames: Vec = Vec::new(); - - // Legacy uuid or username format - for cap in USER_MENTION_RE.captures_iter(content) { - if let Some(inner) = cap.get(1) { - let token = inner.as_str().trim(); - - if let Ok(uuid) = Uuid::parse_str(token) { - if !resolved.contains(&uuid) { - resolved.push(uuid); - } - continue; - } - - let token_lower = token.to_lowercase(); - if seen_usernames.contains(&token_lower) { - continue; - } - seen_usernames.push(token_lower.clone()); - - if let Some(user) = User::find() - .filter(models::users::user::Column::Username.eq(token_lower)) - .one(&self.db) - .await - .ok() - .flatten() - { - if !resolved.contains(&user.uid) { - resolved.push(user.uid); - } - } - } - } - - // New label format - for cap in MENTION_TAG_RE.captures_iter(content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "user" { - let id = id_m.as_str().trim(); - if let Ok(uuid) = Uuid::parse_str(id) { - if !resolved.contains(&uuid) { - resolved.push(uuid); - } - } else { - // Fall back to label-based username lookup - if let Some(label_m) = cap.get(3) { - let label = label_m.as_str().trim(); - if !label.is_empty() { - let label_lower = label.to_lowercase(); - if seen_usernames.contains(&label_lower) { - continue; - } - seen_usernames.push(label_lower.clone()); - - if let Some(user) = User::find() - .filter(models::users::user::Column::Username.eq(label_lower)) - .one(&self.db) - .await - .ok() - .flatten() - { - if !resolved.contains(&user.uid) { - resolved.push(user.uid); - } - } - } - } - } - } - // `repository` and `ai` mention types are accepted but do not - // produce user notification UUIDs — no-op here. - } - } - - // New @[user:uuid:label] format - for cap in MENTION_BRACKET_RE.captures_iter(content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "user" { - let id = id_m.as_str().trim(); - if let Ok(uuid) = Uuid::parse_str(id) { - if !resolved.contains(&uuid) { - resolved.push(uuid); - } - } else { - // Fall back to label-based username lookup - if let Some(label_m) = cap.get(3) { - let label = label_m.as_str().trim(); - if !label.is_empty() { - let label_lower = label.to_lowercase(); - if seen_usernames.contains(&label_lower) { - continue; - } - seen_usernames.push(label_lower.clone()); - - if let Some(user) = User::find() - .filter(models::users::user::Column::Username.eq(label_lower)) - .one(&self.db) - .await - .ok() - .flatten() - { - if !resolved.contains(&user.uid) { - resolved.push(user.uid); - } - } - } - } - } - } - // `repository` and `ai` mention types: no user UUIDs - } - } - - resolved - } - - pub async fn check_room_access(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { - let room = room::Entity::find_by_id(room_id) - .one(&self.db) - .await? - .ok_or_else(|| RoomError::NotFound("Room not found".to_string()))?; - - if room.public { - return Ok(()); - } - - if self.require_room_member(room_id, user_id).await.is_ok() { - return Ok(()); - } - - self.check_project_member(room.project, user_id).await?; - - Ok(()) - } - - pub async fn check_project_member( - &self, - project_id: Uuid, - user_id: Uuid, - ) -> Result<(), RoomError> { - let member = project_members::Entity::find() - .filter(project_members::Column::Project.eq(project_id)) - .filter(project_members::Column::User.eq(user_id)) - .one(&self.db) - .await?; - - if member.is_some() { - Ok(()) - } else { - Err(RoomError::NoPower) - } - } - - /// Determine whether AI should respond to a message in this room. - /// - No room_ai config → AI not configured, never respond. - /// - use_exact = false → respond to every text message. - /// - use_exact = true → only respond when the message contains an @[ai:...] or - /// ... tag that mentions this room's configured AI model. - pub async fn should_ai_respond(&self, room_id: Uuid, content: &str) -> Result { - use models::rooms::room_ai; - - let ai_config = room_ai::Entity::find() - .filter(room_ai::Column::Room.eq(room_id)) - .one(&self.db) - .await?; - - let config = match ai_config { - Some(c) => c, - None => return Ok(false), - }; - - if !config.use_exact { - return Ok(true); - } - - // use_exact mode: only respond when AI is explicitly mentioned - let model_id_str = config.model.to_string(); - - // Check @[ai:model_id:label] format - for cap in MENTION_BRACKET_RE.captures_iter(content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str { - return Ok(true); - } - } - } - - // Check label format - for cap in MENTION_TAG_RE.captures_iter(content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str { - return Ok(true); - } - } - } - - Ok(false) - } - - pub async fn get_room_ai_config( - &self, - room_id: Uuid, - ) -> Result, RoomError> { - use models::rooms::room_ai; - - let ai_config = room_ai::Entity::find() - .filter(room_ai::Column::Room.eq(room_id)) - .one(&self.db) - .await?; - - Ok(ai_config) - } - - pub async fn get_user_names( - &self, - user_ids: &[Uuid], - ) -> std::collections::HashMap { - use models::users::User; - use sea_orm::EntityTrait; - - let mut names = std::collections::HashMap::new(); - if user_ids.is_empty() { - return names; - } - - let users = User::find() - .filter(models::users::user::Column::Uid.is_in(user_ids.to_vec())) - .all(&self.db) - .await - .unwrap_or_default(); - - for user in users { - names.insert(user.uid, user.username); - } - - names - } - - pub async fn require_room_member(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { - use models::rooms::room_member::{Column as RmCol, Entity as RoomMember}; - - let member = RoomMember::find() - .filter(RmCol::Room.eq(room_id)) - .filter(RmCol::User.eq(user_id)) - .one(&self.db) - .await?; - - member - .ok_or_else(|| RoomError::NotFound("Room member not found".to_string())) - .map(|_| ()) - } - - pub async fn find_room_or_404(&self, room_id: Uuid) -> Result { - room::Entity::find_by_id(room_id) - .one(&self.db) - .await? - .ok_or_else(|| RoomError::NotFound("Room not found".to_string())) - } - - pub async fn process_message_ai( - &self, - room_id: Uuid, - _message_id: Uuid, - sender_id: Uuid, - content: String, - ) -> Result<(), RoomError> { - let Some(chat_service) = &self.chat_service else { - return Ok(()); - }; - - let Some(ai_config) = self.get_room_ai_config(room_id).await? else { - return Ok(()); - }; - - let Some(lock_guard) = - crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id).await? - else { - return Ok(()); - }; - - let room = self.find_room_or_404(room_id).await?; - - let project = models::projects::project::Entity::find_by_id(room.project) - .one(&self.db) - .await? - .ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?; - - // Parse @[ai:uuid:label] from content to allow per-mention model routing. - // If no mention is found, use the room's default model. - let mentioned_model_id = { - let mut found = None; - for cap in MENTION_BRACKET_RE.captures_iter(&content) { - if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { - if type_m.as_str() == "ai" { - if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { - found = Some(uuid); - break; - } - } - } - } - found - }; - - let model_id = mentioned_model_id.unwrap_or(ai_config.model); - let model = models::agents::model::Entity::find_by_id(model_id) - .one(&self.db) - .await? - .ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?; - - let sender = models::users::User::find_by_id(sender_id) - .one(&self.db) - .await? - .ok_or_else(|| RoomError::NotFound("Sender not found".to_string()))?; - - let history = self.get_room_history(room_id, 50).await?; - - let user_ids: Vec = history - .iter() - .filter_map(|m| m.sender_id) - .chain(std::iter::once(sender_id)) - .collect(); - let user_names = self.get_user_names(&user_ids).await; - - let mentions = self.extract_mention_context(&content).await; - - let request = AiRequest { - db: self.db.clone(), - cache: self.cache.clone(), - config: self.config.clone(), - model, - project: project.clone(), - sender, - room: room.clone(), - input: content, - mention: mentions, - history, - user_names, - temperature: ai_config.temperature.unwrap_or(0.7), - max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32, - top_p: 1.0, - frequency_penalty: 0.0, - presence_penalty: 0.0, - think: ai_config.think, - tools: Some(chat_service.tools()), - max_tool_depth: 1000, - }; - - let use_streaming = ai_config.stream; - let is_react = ai_config.agent_type.as_deref() == Some("react"); - - if is_react { - if use_streaming { - self.process_message_ai_react_streaming( - chat_service.clone(), - request, - room_id, - room.project, - model_id, - lock_guard, - ) - .await; - } else { - self.process_message_ai_react_nonstreaming( - chat_service.clone(), - request, - room_id, - room.project, - model_id, - lock_guard, - ) - .await; - } - } else if use_streaming { - self.process_message_ai_streaming( - chat_service.clone(), - request, - room_id, - room.project, - model_id, - lock_guard, - ) - .await; - } else { - self.process_message_ai_nonstreaming( - chat_service.clone(), - request, - room_id, - room.project, - model_id, - lock_guard, - ) - .await; - } - - Ok(()) - } - - async fn process_message_ai_streaming( - &self, - chat_service: Arc, - request: AiRequest, - room_id: Uuid, - project_id: Uuid, - _model_id: Uuid, - lock_guard: crate::room_ai_queue::RoomAiLockGuard, - ) { - use queue::RoomMessageStreamChunkEvent; - - let streaming_msg_id = Uuid::now_v7(); - let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await { - Ok(s) => s, - Err(e) => { - tracing::error!(error = %e, "Failed to get seq for streaming AI message"); - return; - } - }; - - // Register stream channel for real-time WebSocket broadcasting of chunks - let _ = self - .room_manager - .register_stream_channel(streaming_msg_id) - .await; - - // Emit an initial "thinking" chunk immediately so the frontend shows the - // "AI is thinking..." indicator without waiting for the first real token. - let initial_event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - content: String::new(), - done: false, - error: None, - display_name: Some(request.model.name.clone()), - chunk_type: Some("thinking".to_string()), - }; - self.room_manager.broadcast_stream_chunk(initial_event).await; - - let room_manager = self.room_manager.clone(); - let db = self.db.clone(); - let room_id_inner = room_id; - let project_id_inner = project_id; - let now = Utc::now(); - let sender_type = "ai".to_string(); - let queue = self.queue.clone(); - let ai_display_name = request.model.name.clone(); - - let db = db.clone(); - let model_id = request.model.id; - tokio::spawn(async move { - let _lock_guard = lock_guard; - let room_manager = room_manager.clone(); - let db = db.clone(); - let model_id = model_id; - // Fixed UUID to identify AI typing events across WS reconnections. - let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); - // Clone before closure so closure captures clone, not the original. - let ai_display_name_for_chunk = ai_display_name.clone(); - let ai_display_name_for_final = ai_display_name.clone(); - - let streaming_msg_id = streaming_msg_id; - let room_id_for_chunk = room_id_inner; - let chunk_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); - let room_manager_cb = room_manager.clone(); - - let on_chunk = move |chunk: agent::chat::AiStreamChunk| { - Box::pin({ - let room_manager = room_manager_cb.clone(); - let streaming_msg_id = streaming_msg_id; - let room_id = room_id_for_chunk; - let chunk_count = chunk_count.clone(); - // Clone display_name INSIDE the async block so the outer closure stays `Fn`. - let ai_display_name_for_chunk = ai_display_name_for_chunk.clone(); - async move { - let chunk_type_str = match chunk.chunk_type { - agent::chat::AiChunkType::Thinking => "thinking", - agent::chat::AiChunkType::Answer => "answer", - agent::chat::AiChunkType::ToolCall => "tool_call", - agent::chat::AiChunkType::ToolResult => "tool_result", - }; - let event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - content: chunk.content, - done: chunk.done, - error: None, - display_name: Some(ai_display_name_for_chunk), - chunk_type: Some(chunk_type_str.to_string()), - }; - room_manager.broadcast_stream_chunk(event).await; - - chunk_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - }) as Pin + Send>> - }; - - let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk); - - // Broadcast AI typing.start so WS clients (including reconnections) see the indicator. - let typing_start = queue::TypingEvent { - room_id: room_id_inner, - user_id: ai_typing_id, - username: ai_display_name.clone(), - avatar_url: None, - action: "start".to_string(), - sender_type: Some("ai".to_string()), - }; - room_manager.broadcast_typing(room_id_inner, typing_start).await; - - match chat_service.process_stream(request, stream_callback).await { - Ok(full_content) => { - let envelope = RoomMessageEnvelope { - id: streaming_msg_id, - dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), - room_id: room_id_inner, - sender_type: sender_type.clone(), - sender_id: None, - model_id: Some(model_id), - thread_id: None, - content: full_content.clone(), - content_type: "text".to_string(), - send_at: now, - seq, - in_reply_to: None, - display_name: Some(ai_display_name_for_final.clone()), - }; - - if let Err(e) = queue.publish(room_id_inner, envelope).await { - tracing::error!(error = %e, "Failed to publish streaming AI message"); - } else { - let now = Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id_inner)) - .filter(room_ai::Column::Model.eq(model_id)) - .exec(&db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } - - let msg_event = queue::RoomMessageEvent { - id: streaming_msg_id, - room_id: room_id_inner, - sender_type: sender_type.clone(), - sender_id: None, - thread_id: None, - content: full_content, - content_type: "text".to_string(), - send_at: now, - seq, - display_name: Some(ai_display_name_for_final.clone()), - in_reply_to: None, - reactions: None, - message_id: None, - }; - room_manager.broadcast(room_id_inner, msg_event).await; - room_manager.metrics.messages_sent.increment(1); - - // Stop AI typing indicator now that the message is delivered. - let typing_stop = queue::TypingEvent { - room_id: room_id_inner, - user_id: ai_typing_id, - username: ai_display_name_for_final.clone(), - avatar_url: None, - action: "stop".to_string(), - sender_type: Some("ai".to_string()), - }; - room_manager.broadcast_typing(room_id_inner, typing_stop).await; - - let event = queue::ProjectRoomEvent { - event_type: super::RoomEventType::NewMessage.as_str().into(), - project_id: project_id_inner, - room_id: Some(room_id_inner), - category_id: None, - message_id: Some(streaming_msg_id), - seq: Some(seq), - timestamp: now, - }; - queue - .publish_project_room_event(project_id_inner, event) - .await; - } - } - Err(e) => { - tracing::error!(error = %e, "AI streaming failed"); - // Stop AI typing indicator since the stream failed. - let typing_stop = queue::TypingEvent { - room_id: room_id_inner, - user_id: ai_typing_id, - username: ai_display_name.clone(), - avatar_url: None, - action: "stop".to_string(), - sender_type: Some("ai".to_string()), - }; - room_manager.broadcast_typing(room_id_inner, typing_stop).await; - - let event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id: room_id_inner, - content: String::new(), - done: true, - error: Some(e.to_string()), - display_name: Some(ai_display_name.clone()), - chunk_type: None, - }; - room_manager.broadcast_stream_chunk(event).await; - } - } - - room_manager.close_stream_channel(streaming_msg_id).await; - }); - } - - async fn process_message_ai_nonstreaming( - &self, - chat_service: Arc, - request: AiRequest, - room_id: Uuid, - project_id: Uuid, - model_id: Uuid, - lock_guard: crate::room_ai_queue::RoomAiLockGuard, - ) { - let chat_service = chat_service.clone(); - let db = self.db.clone(); - let cache = self.cache.clone(); - let queue = self.queue.clone(); - let room_manager = self.room_manager.clone(); - let room_id_for_ai = room_id; - let project_id_for_ai = project_id; - let model_id_inner = model_id; - - tokio::spawn(async move { - let _lock_guard = lock_guard; - let model_display_name = request.model.name.clone(); - match chat_service.process(request).await { - Ok(response) => { - if let Err(e) = Self::create_and_publish_ai_message( - &db, - &cache, - &queue, - &room_manager, - room_id_for_ai, - project_id_for_ai, - Uuid::now_v7(), - response, - model_id_inner, - Some(model_display_name), - ) - .await - { - tracing::error!(error = %e, "Failed to create AI message"); - } else { - let now = Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id_for_ai)) - .filter(room_ai::Column::Model.eq(model_id_inner)) - .exec(&db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } - } - } - Err(e) => { - tracing::error!(error = %e, "AI processing failed"); - // Send an error message so the user knows something went wrong - let _ = Self::create_and_publish_ai_message( - &db, - &cache, - &queue, - &room_manager, - room_id_for_ai, - project_id_for_ai, - Uuid::now_v7(), - format!("[AI error: {}]", e), - model_id_inner, - Some(model_display_name), - ) - .await; - } - } - }); - } - - /// ReAct agent — non-streaming: collect full answer then persist. - async fn process_message_ai_react_nonstreaming( - &self, - chat_service: Arc, - request: AiRequest, - room_id: Uuid, - project_id: Uuid, - model_id: Uuid, - lock_guard: crate::room_ai_queue::RoomAiLockGuard, - ) { - let chat_service = chat_service.clone(); - let db = self.db.clone(); - let cache = self.cache.clone(); - let queue = self.queue.clone(); - let room_manager = self.room_manager.clone(); - let room_id_for_ai = room_id; - let project_id_for_ai = project_id; - let model_id_inner = model_id; - - tokio::spawn(async move { - let _lock_guard = lock_guard; - let model_display_name = request.model.name.clone(); - - let final_answer = chat_service - .process_react(&request, |_step| { - // ReAct step events are logged internally; no streaming output here. - }) - .await; - - match final_answer { - Ok(response) => { - if let Err(e) = Self::create_and_publish_ai_message( - &db, - &cache, - &queue, - &room_manager, - room_id_for_ai, - project_id_for_ai, - Uuid::now_v7(), - response, - model_id_inner, - Some(model_display_name), - ) - .await - { - tracing::error!(error = %e, "Failed to create ReAct AI message"); - } else { - let now = Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id_for_ai)) - .filter(room_ai::Column::Model.eq(model_id_inner)) - .exec(&db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } - } - } - Err(e) => { - tracing::error!(error = %e, "ReAct agent failed"); - let _ = Self::create_and_publish_ai_message( - &db, - &cache, - &queue, - &room_manager, - room_id_for_ai, - project_id_for_ai, - Uuid::now_v7(), - format!("[AI error: {}]", e), - model_id_inner, - Some(model_display_name), - ) - .await; - } - } - }); - } - - /// ReAct agent — streaming: forward each ReactStep to WebSocket, then persist final answer. - async fn process_message_ai_react_streaming( - &self, - chat_service: Arc, - request: AiRequest, - room_id: Uuid, - project_id: Uuid, - _model_id: Uuid, - lock_guard: crate::room_ai_queue::RoomAiLockGuard, - ) { - use queue::RoomMessageStreamChunkEvent; - - let streaming_msg_id = Uuid::now_v7(); - let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await { - Ok(s) => s, - Err(e) => { - tracing::error!(error = %e, "Failed to get seq for ReAct streaming"); - return; - } - }; - - let room_manager = self.room_manager.clone(); - let db = self.db.clone(); - let room_id_inner = room_id; - let project_id_inner = project_id; - let now = Utc::now(); - let sender_type = "ai".to_string(); - let queue = self.queue.clone(); - let ai_display_name = request.model.name.clone(); - let model_id_inner = request.model.id; - - tokio::spawn(async move { - let _lock_guard = lock_guard; - - // Buffer all reasoning steps + the final answer separately. - let reasoning_buffer: std::sync::Arc> = - std::sync::Arc::new(std::sync::Mutex::new(String::new())); - let answer_buffer: std::sync::Arc> = - std::sync::Arc::new(std::sync::Mutex::new(String::new())); - let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); - - let on_step = { - let room_manager = room_manager.clone(); - let streaming_msg_id = streaming_msg_id; - let room_id = room_id_inner; - let step_count = step_count.clone(); - let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone()); - let reasoning_buffer = reasoning_buffer.clone(); - let answer_buffer = answer_buffer.clone(); - move |step: ReactStep| { - let room_manager = room_manager.clone(); - let content = match &step { - ReactStep::Thought { step: _, thought } => { - format!("[Thinking] {}", thought) - } - ReactStep::Action { step: _, action } => { - format!("[Action] Calling `{}` with {:?}", action.name, action.args) - } - ReactStep::Observation { - step: _, - observation, - } => { - format!("[Observation] {}", observation) - } - ReactStep::Answer { step: _, answer } => answer.clone(), - }; - - let is_answer = matches!(&step, ReactStep::Answer { .. }); - if is_answer { - step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - - // Update buffers BEFORE spawning — must be synchronous so the main - // thread sees the updates immediately after `process_react` returns. - { - let mut rb = reasoning_buffer.lock().unwrap(); - rb.push_str(&content); - rb.push('\n'); - } - if is_answer { - let mut ab = answer_buffer.lock().unwrap(); - ab.push_str(&content); - } - - let done = is_answer; - let ai_name = ai_display_name_for_step.clone(); - tokio::spawn(async move { - let event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - content: content.clone(), - done, - error: None, - display_name: Some((*ai_name).clone()), - chunk_type: Some(if is_answer { - "answer".to_string() - } else { - "thinking".to_string() - }), - }; - room_manager.broadcast_stream_chunk(event).await; - }); - } - }; - - let result = chat_service.process_react(&request, on_step).await; - - let final_content = answer_buffer.lock().unwrap().clone(); - let reasoning_chain = reasoning_buffer.lock().unwrap().clone(); - - // Determine what to persist: prefer the answer, fall back to the reasoning chain - let content_to_persist = if !final_content.is_empty() { - final_content - } else if !reasoning_chain.trim().is_empty() { - // No Answer step, but the reasoning chain was streamed — still send it - format!( - "[Agent ran through {} reasoning steps but did not produce a final answer.]\n{}", - step_count.load(std::sync::atomic::Ordering::Relaxed), - reasoning_chain.trim_end() - ) - } else { - // Nothing produced — this should not happen in practice - String::from("[No output from reasoning agent]") - }; - - let (err_msg, should_log) = match &result { - Err(e) => (Some(format!("[Agent error: {}]", e)), true), - _ => (None, false), - }; - - let content_to_persist = if let Some(msg) = &err_msg { - format!( - "{}\n[Error during reasoning: {}]", - content_to_persist.trim_end(), - msg.trim_start_matches("[Agent error: ") - .trim_end_matches("]") - ) - } else { - content_to_persist - }; - - if should_log { - tracing::error!(error = %result.as_ref().unwrap_err(), "ReAct streaming failed"); - } - - let persist_content = content_to_persist.trim().to_string(); - if persist_content.is_empty() { - return; - } - - let envelope = RoomMessageEnvelope { - id: streaming_msg_id, - dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), - room_id: room_id_inner, - sender_type: sender_type.clone(), - sender_id: None, - model_id: Some(model_id_inner), - thread_id: None, - content: persist_content.clone(), - content_type: "text".to_string(), - send_at: now, - seq, - in_reply_to: None, - display_name: Some(ai_display_name.clone()), - }; - - if let Err(e) = queue.publish(room_id_inner, envelope).await { - tracing::error!(error = %e, "Failed to publish ReAct streaming message"); - } else { - let now = Utc::now(); - if let Err(e) = room_ai::Entity::update_many() - .col_expr( - room_ai::Column::CallCount, - Expr::col(room_ai::Column::CallCount).add(1), - ) - .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) - .filter(room_ai::Column::Room.eq(room_id_inner)) - .filter(room_ai::Column::Model.eq(model_id_inner)) - .exec(&db) - .await - { - tracing::warn!(error = %e, "Failed to update room_ai call stats"); - } - - let msg_event = queue::RoomMessageEvent { - id: streaming_msg_id, - room_id: room_id_inner, - sender_type: sender_type.clone(), - sender_id: None, - thread_id: None, - content: persist_content, - content_type: "text".to_string(), - send_at: now, - seq, - display_name: Some(ai_display_name.clone()), - in_reply_to: None, - reactions: None, - message_id: None, - }; - room_manager.broadcast(room_id_inner, msg_event).await; - room_manager.metrics.messages_sent.increment(1); - - let event = queue::ProjectRoomEvent { - event_type: super::RoomEventType::NewMessage.as_str().into(), - project_id: project_id_inner, - room_id: Some(room_id_inner), - category_id: None, - message_id: Some(streaming_msg_id), - seq: Some(seq), - timestamp: now, - }; - queue - .publish_project_room_event(project_id_inner, event) - .await; - } - - room_manager.close_stream_channel(streaming_msg_id).await; - }); - } - - pub async fn create_and_publish_ai_message( - db: &AppDatabase, - cache: &AppCache, - queue: &MessageProducer, - room_manager: &Arc, - room_id: Uuid, - project_id: Uuid, - _reply_to: Uuid, - content: String, - model_id: Uuid, - model_display_name: Option, - ) -> Result { - let now = Utc::now(); - let seq = Self::next_room_message_seq_internal(room_id, db, cache).await?; - let id = Uuid::now_v7(); - - let envelope = RoomMessageEnvelope { - id, - dedup_key: Some(format!("{}:{}", room_id, id)), - room_id, - sender_type: "ai".to_string(), - sender_id: None, - model_id: Some(model_id), - thread_id: None, - content: content.clone(), - content_type: "text".to_string(), - send_at: now, - seq, - in_reply_to: None, - display_name: model_display_name.clone(), - }; - - queue.publish(room_id, envelope).await?; - room_manager.metrics.messages_sent.increment(1); - - let event = queue::RoomMessageEvent { - id, - room_id, - sender_type: "ai".to_string(), - sender_id: None, - thread_id: None, - content: content.clone(), - content_type: "text".to_string(), - send_at: now, - seq, - display_name: model_display_name, - in_reply_to: None, - reactions: None, - message_id: None, - }; - room_manager.broadcast(room_id, event).await; - - Self::publish_room_event_internal( - &db, - queue, - project_id, - super::RoomEventType::NewMessage, - Some(room_id), - Some(id), - Some(seq), - ) - .await; - - Ok(id) - } - - async fn get_room_history( - &self, - room_id: Uuid, - limit: usize, - ) -> Result, RoomError> { - use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage}; - use sea_orm::EntityTrait; - - let messages = RoomMessage::find() - .filter(RmCol::Room.eq(room_id)) - .order_by_desc(RmCol::Seq) - .limit(limit as u64) - .all(&self.db) - .await?; - - Ok(messages) - } - - async fn extract_mention_context(&self, _content: &str) -> Vec { - Vec::new() - } - - pub(crate) async fn next_room_message_seq_internal( - room_id: Uuid, - db: &AppDatabase, - cache: &AppCache, - ) -> Result { - let seq_key = format!("room:seq:{}", room_id); - let mut conn = cache.conn().await.map_err(|e| { - RoomError::Internal(format!("failed to get redis connection for seq: {}", e)) - })?; - - // Normal path: Redis INCR is atomic and sufficient for sequence generation. - // Lua script removed — it was executing on every single message (costly). - let seq: i64 = redis::cmd("INCR") - .arg(&seq_key) - .query_async(&mut conn) - .await - .map_err(|e| RoomError::Internal(format!("seq INCR: {}", e)))?; - - // DB reconciliation: only check every 1000 messages, not on every request. - // This handles the rare cross-server handoff case (Redis restart wipe). - if seq % 1000 == 0 { - use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage}; - use sea_orm::EntityTrait; - let db_seq: Option>> = RoomMessage::find() - .filter(RmCol::Room.eq(room_id)) - .select_only() - .column_as(RmCol::Seq.max(), "max_seq") - .into_tuple::>>() - .one(db) - .await? - .map(|r| r); - let db_seq = db_seq.flatten().flatten().unwrap_or(0); - - if db_seq >= seq { - let _: String = redis::cmd("SET") - .arg(&seq_key) - .arg(db_seq + 1) - .query_async(&mut conn) - .await - .map_err(|e| RoomError::Internal(format!("seq SET: {}", e)))?; - return Ok(db_seq + 1); - } - } - - Ok(seq) - } - - async fn publish_room_event_internal( - _db: &AppDatabase, - queue: &MessageProducer, - project_id: Uuid, - event_type: super::RoomEventType, - room_id: Option, - message_id: Option, - seq: Option, - ) { - let event = ProjectRoomEvent { - event_type: event_type.as_str().into(), - project_id, - room_id, - category_id: None, - message_id, - seq, - timestamp: Utc::now(), - }; - queue.publish_project_room_event(project_id, event).await; - } -} diff --git a/libs/room/src/service/access.rs b/libs/room/src/service/access.rs new file mode 100644 index 0000000..746982b --- /dev/null +++ b/libs/room/src/service/access.rs @@ -0,0 +1,73 @@ +use db::database::AppDatabase; +use models::projects::project_members; +use models::rooms::room; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; +use uuid::Uuid; + +use crate::error::RoomError; + +pub async fn check_room_access( + db: &AppDatabase, + room_id: Uuid, + user_id: Uuid, +) -> Result<(), RoomError> { + let room = room::Entity::find_by_id(room_id) + .one(db) + .await? + .ok_or_else(|| RoomError::NotFound("Room not found".to_string()))?; + + if room.public { + return Ok(()); + } + + if require_room_member(db, room_id, user_id).await.is_ok() { + return Ok(()); + } + + check_project_member(db, room.project, user_id).await?; + + Ok(()) +} + +pub async fn check_project_member( + db: &AppDatabase, + project_id: Uuid, + user_id: Uuid, +) -> Result<(), RoomError> { + let member = project_members::Entity::find() + .filter(project_members::Column::Project.eq(project_id)) + .filter(project_members::Column::User.eq(user_id)) + .one(db) + .await?; + + if member.is_some() { + Ok(()) + } else { + Err(RoomError::NoPower) + } +} + +pub async fn require_room_member( + db: &AppDatabase, + room_id: Uuid, + user_id: Uuid, +) -> Result<(), RoomError> { + use models::rooms::room_member::{Column as RmCol, Entity as RoomMember}; + + let member = RoomMember::find() + .filter(RmCol::Room.eq(room_id)) + .filter(RmCol::User.eq(user_id)) + .one(db) + .await?; + + member + .ok_or_else(|| RoomError::NotFound("Room member not found".to_string())) + .map(|_| ()) +} + +pub async fn find_room_or_404(db: &AppDatabase, room_id: Uuid) -> Result { + room::Entity::find_by_id(room_id) + .one(db) + .await? + .ok_or_else(|| RoomError::NotFound("Room not found".to_string())) +} diff --git a/libs/room/src/service/ai_common.rs b/libs/room/src/service/ai_common.rs new file mode 100644 index 0000000..a8a9942 --- /dev/null +++ b/libs/room/src/service/ai_common.rs @@ -0,0 +1,77 @@ +use std::sync::Arc; + +use chrono::Utc; +use db::cache::AppCache; +use db::database::AppDatabase; +use queue::MessageProducer; +use uuid::Uuid; + +use super::sequence::next_room_message_seq_internal; +use crate::connection::RoomConnectionManager; +use crate::error::RoomError; + +pub async fn create_and_publish_ai_message( + db: &AppDatabase, + cache: &AppCache, + queue: &MessageProducer, + room_manager: &Arc, + room_id: Uuid, + project_id: Uuid, + _reply_to: Uuid, + content: String, + model_id: Uuid, + model_display_name: Option, +) -> Result { + let now = Utc::now(); + let seq = next_room_message_seq_internal(room_id, db, cache).await?; + let id = Uuid::now_v7(); + + let envelope = queue::RoomMessageEnvelope { + id, + dedup_key: Some(format!("{}:{}", room_id, id)), + room_id, + sender_type: "ai".to_string(), + sender_id: None, + model_id: Some(model_id), + thread_id: None, + content: content.clone(), + content_type: "text".to_string(), + thinking_content: None, + send_at: now, + seq, + in_reply_to: None, + display_name: model_display_name.clone(), + }; + + queue.publish(room_id, envelope).await?; + room_manager.metrics.messages_sent.increment(1); + + let event = queue::RoomMessageEvent { + id, + room_id, + sender_type: "ai".to_string(), + sender_id: None, + thread_id: None, + content: content.clone(), + content_type: "text".to_string(), + thinking_content: None, + send_at: now, + seq, + display_name: model_display_name, + in_reply_to: None, + reactions: None, + message_id: None, + }; + room_manager.broadcast(room_id, event).await; + + super::notifications::publish_room_event( + queue, + project_id, + crate::RoomEventType::NewMessage, + Some(room_id), + Some(id), + Some(seq), + ); + + Ok(id) +} diff --git a/libs/room/src/service/ai_nonstreaming.rs b/libs/room/src/service/ai_nonstreaming.rs new file mode 100644 index 0000000..089c757 --- /dev/null +++ b/libs/room/src/service/ai_nonstreaming.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use chrono::Utc; +use db::cache::AppCache; +use db::database::AppDatabase; +use models::rooms::room_ai; +use queue::MessageProducer; +use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter}; +use uuid::Uuid; + +use super::ai_common::create_and_publish_ai_message; +use crate::connection::RoomConnectionManager; +use agent::chat::{AiRequest, ChatService}; + +pub async fn process_message_ai_nonstreaming( + chat_service: Arc, + request: AiRequest, + room_id: Uuid, + project_id: Uuid, + model_id: Uuid, + lock_guard: crate::room_ai_queue::RoomAiLockGuard, + db: AppDatabase, + cache: AppCache, + queue: MessageProducer, + room_manager: Arc, +) { + let chat_service = chat_service.clone(); + + tokio::spawn(async move { + let _lock_guard = lock_guard; + let model_display_name = request.model.name.clone(); + match chat_service.process(request).await { + Ok(result) => { + if let Err(e) = create_and_publish_ai_message( + &db, + &cache, + &queue, + &room_manager, + room_id, + project_id, + Uuid::now_v7(), + result.content, + model_id, + Some(model_display_name), + ) + .await + { + tracing::error!(error = %e, "Failed to create AI message"); + } else { + let now = Utc::now(); + if let Err(e) = room_ai::Entity::update_many() + .col_expr( + room_ai::Column::CallCount, + Expr::col(room_ai::Column::CallCount).add(1), + ) + .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) + .filter(room_ai::Column::Room.eq(room_id)) + .filter(room_ai::Column::Model.eq(model_id)) + .exec(&db) + .await + { + tracing::warn!(error = %e, "Failed to update room_ai call stats"); + } + + // Record billing (non-fatal) + let _ = super::billing::record_ai_usage( + &db, + project_id, + model_id, + result.input_tokens, + result.output_tokens, + ) + .await; + } + } + Err(e) => { + tracing::error!(error = %e, "AI processing failed"); + let _ = create_and_publish_ai_message( + &db, + &cache, + &queue, + &room_manager, + room_id, + project_id, + Uuid::now_v7(), + format!("[AI error: {}]", e), + model_id, + Some(model_display_name), + ) + .await; + } + } + }); +} diff --git a/libs/room/src/service/ai_react_nonstreaming.rs b/libs/room/src/service/ai_react_nonstreaming.rs new file mode 100644 index 0000000..1b23bcb --- /dev/null +++ b/libs/room/src/service/ai_react_nonstreaming.rs @@ -0,0 +1,98 @@ +use std::sync::Arc; + +use chrono::Utc; +use db::cache::AppCache; +use db::database::AppDatabase; +use models::rooms::room_ai; +use queue::MessageProducer; +use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter}; +use uuid::Uuid; + +use super::ai_common::create_and_publish_ai_message; +use crate::connection::RoomConnectionManager; +use agent::chat::{AiRequest, ChatService}; + +pub async fn process_message_ai_react_nonstreaming( + chat_service: Arc, + request: AiRequest, + room_id: Uuid, + project_id: Uuid, + model_id: Uuid, + lock_guard: crate::room_ai_queue::RoomAiLockGuard, + db: AppDatabase, + cache: AppCache, + queue: MessageProducer, + room_manager: Arc, +) { + tokio::spawn(async move { + let _lock_guard = lock_guard; + let model_display_name = request.model.name.clone(); + + let final_answer = chat_service + .process_react(&request, |_step| {}) + .await; + + match final_answer { + Ok(response) => { + if let Err(e) = create_and_publish_ai_message( + &db, + &cache, + &queue, + &room_manager, + room_id, + project_id, + Uuid::now_v7(), + response, + model_id, + Some(model_display_name), + ) + .await + { + tracing::error!(error = %e, "Failed to create ReAct AI message"); + } else { + let now = Utc::now(); + if let Err(e) = room_ai::Entity::update_many() + .col_expr( + room_ai::Column::CallCount, + Expr::col(room_ai::Column::CallCount).add(1), + ) + .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) + .filter(room_ai::Column::Room.eq(room_id)) + .filter(room_ai::Column::Model.eq(model_id)) + .exec(&db) + .await + { + tracing::warn!(error = %e, "Failed to update room_ai call stats"); + } + + // Record billing (non-fatal) + // TODO: ReAct agent does not track token counts yet; billing with 0/0 + let _ = super::billing::record_ai_usage( + &db, + project_id, + model_id, + 0, + 0, + ) + .await; + } + } + Err(e) => { + tracing::error!(error = %e, "ReAct agent failed"); + let _ = create_and_publish_ai_message( + &db, + &cache, + &queue, + &room_manager, + room_id, + project_id, + Uuid::now_v7(), + format!("[AI error: {}]", e), + model_id, + Some(model_display_name), + ) + .await; + } + } + }); +} diff --git a/libs/room/src/service/ai_react_streaming.rs b/libs/room/src/service/ai_react_streaming.rs new file mode 100644 index 0000000..5e7c679 --- /dev/null +++ b/libs/room/src/service/ai_react_streaming.rs @@ -0,0 +1,266 @@ +use std::sync::Arc; + +use chrono::Utc; +use db::cache::AppCache; +use db::database::AppDatabase; +use models::rooms::room_ai; +use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; +use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter}; +use uuid::Uuid; + +use super::sequence::next_room_message_seq_internal; +use crate::connection::RoomConnectionManager; +use agent::chat::{AiRequest, ChatService}; +use agent::react::ReactStep; + +pub async fn process_message_ai_react_streaming( + chat_service: Arc, + request: AiRequest, + room_id: Uuid, + project_id: Uuid, + model_id: Uuid, + lock_guard: crate::room_ai_queue::RoomAiLockGuard, + db: AppDatabase, + _cache: AppCache, + queue: MessageProducer, + room_manager: Arc, +) { + use queue::RoomMessageStreamChunkEvent; + + let streaming_msg_id = Uuid::now_v7(); + let seq = match next_room_message_seq_internal(room_id, &db, &_cache).await { + Ok(s) => s, + Err(e) => { + tracing::error!(error = %e, "Failed to get seq for ReAct streaming"); + return; + } + }; + + let room_id_inner = room_id; + let project_id_inner = project_id; + let now = Utc::now(); + let sender_type = "ai".to_string(); + let ai_display_name = request.model.name.clone(); + + tokio::spawn(async move { + let _lock_guard = lock_guard; + + // Collect ordered steps for storage and streaming. + let steps: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let last_action_name: std::sync::Arc> = + std::sync::Arc::new(std::sync::Mutex::new(String::new())); + let answer_buffer: std::sync::Arc> = + std::sync::Arc::new(std::sync::Mutex::new(String::new())); + let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + + let on_step = { + let room_manager = room_manager.clone(); + let streaming_msg_id = streaming_msg_id; + let room_id = room_id_inner; + let step_count = step_count.clone(); + let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone()); + let steps = steps.clone(); + let answer_buffer = answer_buffer.clone(); + let last_action_name = last_action_name.clone(); + move |step: ReactStep| { + let room_manager = room_manager.clone(); + let (chunk_type, content) = match &step { + ReactStep::Thought { step: _, thought } => { + ("thinking".to_string(), format!("[Thinking] {}", thought)) + } + ReactStep::Action { step: _, action } => { + *last_action_name.lock().unwrap() = action.name.clone(); + ("tool_call".to_string(), format!("[Action] Calling `{}` with {:?}", action.name, action.args)) + } + ReactStep::Observation { + step: _, + observation: _, + } => { + // Sanitize observation — don't expose raw tool output to frontend + let action_name = last_action_name.lock().unwrap().clone(); + ("tool_call".to_string(), format!("[Observation] {} (completed)", action_name)) + } + ReactStep::Answer { step: _, answer } => { + ("answer".to_string(), answer.clone()) + } + }; + + let is_answer = matches!(&step, ReactStep::Answer { .. }); + if is_answer { + step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + + // Record ordered step for storage + { + let mut s = steps.lock().unwrap(); + s.push((chunk_type.clone(), content.clone())); + } + if is_answer { + let mut ab = answer_buffer.lock().unwrap(); + ab.push_str(&content); + } + + let done = is_answer; + let ai_name = ai_display_name_for_step.clone(); + tokio::spawn(async move { + let event = RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + content: content.clone(), + done, + error: None, + display_name: Some((*ai_name).clone()), + chunk_type: Some(chunk_type), + }; + room_manager.broadcast_stream_chunk(event).await; + }); + } + }; + + let result = chat_service.process_react(&request, on_step).await; + + let final_content = answer_buffer.lock().unwrap().clone(); + let all_steps = steps.lock().unwrap().clone(); + let reasoning_chain: String = all_steps + .iter() + .filter(|(t, _)| t != "answer") + .map(|(_, c)| c.clone()) + .collect::>() + .join("\n"); + + let content_to_persist = if !final_content.is_empty() { + final_content + } else if !reasoning_chain.trim().is_empty() { + format!( + "[Agent ran through {} reasoning steps but did not produce a final answer.]\n{}", + step_count.load(std::sync::atomic::Ordering::Relaxed), + reasoning_chain.trim_end() + ) + } else { + String::from("[No output from reasoning agent]") + }; + + let (err_msg, should_log) = match &result { + Err(e) => (Some(format!("[Agent error: {}]", e)), true), + _ => (None, false), + }; + + let content_to_persist = if let Some(msg) = &err_msg { + format!( + "{}\n[Error during reasoning: {}]", + content_to_persist.trim_end(), + msg.trim_start_matches("[Agent error: ") + .trim_end_matches("]") + ) + } else { + content_to_persist + }; + + if should_log { + tracing::error!(error = %result.as_ref().unwrap_err(), "ReAct streaming failed"); + } + + let persist_content = content_to_persist.trim().to_string(); + if persist_content.is_empty() { + return; + } + + // Serialize ordered steps as JSON for ordered replay. + let thinking_content = { + let steps = steps.lock().unwrap(); + if steps.is_empty() { + None + } else { + let chunks_json = serde_json::json!({ + "__chunks__": steps.iter().map(|(t, c)| serde_json::json!({ + "type": t, + "content": c, + })).collect::>(), + }); + Some(chunks_json.to_string()) + } + }; + + let envelope = RoomMessageEnvelope { + id: streaming_msg_id, + dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), + room_id: room_id_inner, + sender_type: sender_type.clone(), + sender_id: None, + model_id: Some(model_id), + thread_id: None, + content: persist_content.clone(), + content_type: "text".to_string(), + thinking_content, + send_at: now, + seq, + in_reply_to: None, + display_name: Some(ai_display_name.clone()), + }; + + if let Err(e) = queue.publish(room_id_inner, envelope).await { + tracing::error!(error = %e, "Failed to publish ReAct streaming message"); + } else { + let now = Utc::now(); + if let Err(e) = room_ai::Entity::update_many() + .col_expr( + room_ai::Column::CallCount, + Expr::col(room_ai::Column::CallCount).add(1), + ) + .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) + .filter(room_ai::Column::Room.eq(room_id_inner)) + .filter(room_ai::Column::Model.eq(model_id)) + .exec(&db) + .await + { + tracing::warn!(error = %e, "Failed to update room_ai call stats"); + } + + // Record billing (non-fatal) + // TODO: ReAct agent does not track token counts yet; billing with 0/0 + let _ = super::billing::record_ai_usage( + &db, + project_id_inner, + model_id, + 0, + 0, + ) + .await; + + let msg_event = queue::RoomMessageEvent { + id: streaming_msg_id, + room_id: room_id_inner, + sender_type: sender_type.clone(), + sender_id: None, + thread_id: None, + content: persist_content, + content_type: "text".to_string(), + thinking_content: None, + send_at: now, + seq, + display_name: Some(ai_display_name.clone()), + in_reply_to: None, + reactions: None, + message_id: None, + }; + room_manager.broadcast(room_id_inner, msg_event).await; + room_manager.metrics.messages_sent.increment(1); + + let event = ProjectRoomEvent { + event_type: crate::RoomEventType::NewMessage.as_str().into(), + project_id: project_id_inner, + room_id: Some(room_id_inner), + category_id: None, + message_id: Some(streaming_msg_id), + seq: Some(seq), + timestamp: now, + }; + queue + .publish_project_room_event(project_id_inner, event) + .await; + } + + room_manager.close_stream_channel(streaming_msg_id).await; + }); +} diff --git a/libs/room/src/service/ai_streaming.rs b/libs/room/src/service/ai_streaming.rs new file mode 100644 index 0000000..e1254d5 --- /dev/null +++ b/libs/room/src/service/ai_streaming.rs @@ -0,0 +1,274 @@ +use std::pin::Pin; +use std::sync::Arc; + +use chrono::Utc; +use db::cache::AppCache; +use db::database::AppDatabase; +use models::rooms::room_ai; +use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; +use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter}; +use uuid::Uuid; + +use super::sequence::next_room_message_seq_internal; +use crate::connection::RoomConnectionManager; +use agent::chat::{AiRequest, ChatService}; + +pub async fn process_message_ai_streaming( + chat_service: Arc, + request: AiRequest, + room_id: Uuid, + project_id: Uuid, + model_id: Uuid, + lock_guard: crate::room_ai_queue::RoomAiLockGuard, + db: AppDatabase, + cache: AppCache, + queue: MessageProducer, + room_manager: Arc, +) { + use queue::RoomMessageStreamChunkEvent; + + let streaming_msg_id = Uuid::now_v7(); + let seq = match next_room_message_seq_internal(room_id, &db, &cache).await { + Ok(s) => s, + Err(e) => { + tracing::error!(error = %e, "Failed to get seq for streaming AI message"); + return; + } + }; + + let _ = room_manager + .register_stream_channel(streaming_msg_id) + .await; + + let initial_event = RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + content: String::new(), + done: false, + error: None, + display_name: Some(request.model.name.clone()), + chunk_type: Some("thinking".to_string()), + }; + room_manager.broadcast_stream_chunk(initial_event).await; + + let room_id_inner = room_id; + let project_id_inner = project_id; + let now = Utc::now(); + let sender_type = "ai".to_string(); + let ai_display_name = request.model.name.clone(); + + tokio::spawn(async move { + let _lock_guard = lock_guard; + let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(); + let ai_display_name_for_chunk = ai_display_name.clone(); + let ai_display_name_for_final = ai_display_name.clone(); + + let chunk_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); + let room_manager_cb = room_manager.clone(); + + let on_chunk = move |chunk: agent::chat::AiStreamChunk| { + Box::pin({ + let room_manager = room_manager_cb.clone(); + let streaming_msg_id = streaming_msg_id; + let room_id = room_id_inner; + let chunk_count = chunk_count.clone(); + let ai_display_name_for_chunk = ai_display_name_for_chunk.clone(); + async move { + let chunk_type_str = match chunk.chunk_type { + agent::chat::AiChunkType::Thinking => "thinking", + agent::chat::AiChunkType::Answer => "answer", + agent::chat::AiChunkType::ToolCall => "tool_call", + agent::chat::AiChunkType::ToolResult => "tool_result", + }; + let event = RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + content: chunk.content, + done: chunk.done, + error: None, + display_name: Some(ai_display_name_for_chunk), + chunk_type: Some(chunk_type_str.to_string()), + }; + room_manager.broadcast_stream_chunk(event).await; + chunk_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + }) as Pin + Send>> + }; + + let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk); + + let typing_start = queue::TypingEvent { + room_id: room_id_inner, + user_id: ai_typing_id, + username: ai_display_name.clone(), + avatar_url: None, + action: "start".to_string(), + sender_type: Some("ai".to_string()), + }; + room_manager.broadcast_typing(room_id_inner, typing_start.clone()).await; + + let (typing_cancel_tx, typing_cancel_rx) = tokio::sync::oneshot::channel::<()>(); + let typing_renew_handle = tokio::spawn({ + let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mgr = room_manager.clone(); + let rid = room_id_inner; + let evt = typing_start.clone(); + async move { + tokio::select! { + _ = typing_cancel_rx => {} + _ = async { + loop { + interval.tick().await; + mgr.broadcast_typing(rid, evt.clone()).await; + } + } => {} + } + } + }); + + match chat_service.process_stream(request, stream_callback).await { + Ok(result) => { + // Store ordered chunks as JSON in thinking_content for ordered replay. + // Uses {"__chunks__": [...]} marker so legacy plain-text still works. + let thinking_content = if result.chunks.is_empty() { + None + } else { + let chunks_json = serde_json::json!({ + "__chunks__": result.chunks.iter().map(|c| { + let type_str = match c.chunk_type { + agent::client::StreamChunkType::Thinking => "thinking", + agent::client::StreamChunkType::Answer => "answer", + agent::client::StreamChunkType::ToolCall => "tool_call", + }; + serde_json::json!({ + "type": type_str, + "content": c.content, + }) + }).collect::>(), + }); + Some(chunks_json.to_string()) + }; + let envelope = RoomMessageEnvelope { + id: streaming_msg_id, + dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), + room_id: room_id_inner, + sender_type: sender_type.clone(), + sender_id: None, + model_id: Some(model_id), + thread_id: None, + content: result.content.clone(), + content_type: "text".to_string(), + thinking_content: thinking_content.clone(), + send_at: now, + seq, + in_reply_to: None, + display_name: Some(ai_display_name_for_final.clone()), + }; + + if let Err(e) = queue.publish(room_id_inner, envelope).await { + tracing::error!(error = %e, "Failed to publish streaming AI message"); + } else { + let now = Utc::now(); + if let Err(e) = room_ai::Entity::update_many() + .col_expr( + room_ai::Column::CallCount, + Expr::col(room_ai::Column::CallCount).add(1), + ) + .col_expr( + room_ai::Column::LastCallAt, + Expr::value(Some(now)), + ) + .filter(room_ai::Column::Room.eq(room_id_inner)) + .filter(room_ai::Column::Model.eq(model_id)) + .exec(&db) + .await + { + tracing::warn!(error = %e, "Failed to update room_ai call stats"); + } + + // Record billing (non-fatal) + let _ = super::billing::record_ai_usage( + &db, + project_id_inner, + model_id, + result.input_tokens, + result.output_tokens, + ) + .await; + + let msg_event = queue::RoomMessageEvent { + id: streaming_msg_id, + room_id: room_id_inner, + sender_type: sender_type.clone(), + sender_id: None, + thread_id: None, + content: result.content.clone(), + content_type: "text".to_string(), + thinking_content: thinking_content.clone(), + send_at: now, + seq, + display_name: Some(ai_display_name_for_final.clone()), + in_reply_to: None, + reactions: None, + message_id: None, + }; + room_manager.broadcast(room_id_inner, msg_event).await; + room_manager.metrics.messages_sent.increment(1); + + let _ = typing_cancel_tx.send(()); + typing_renew_handle.abort(); + let typing_stop = queue::TypingEvent { + room_id: room_id_inner, + user_id: ai_typing_id, + username: ai_display_name_for_final.clone(), + avatar_url: None, + action: "stop".to_string(), + sender_type: Some("ai".to_string()), + }; + room_manager.broadcast_typing(room_id_inner, typing_stop).await; + + let event = ProjectRoomEvent { + event_type: crate::RoomEventType::NewMessage.as_str().into(), + project_id: project_id_inner, + room_id: Some(room_id_inner), + category_id: None, + message_id: Some(streaming_msg_id), + seq: Some(seq), + timestamp: now, + }; + queue + .publish_project_room_event(project_id_inner, event) + .await; + } + } + Err(e) => { + tracing::error!(error = %e, "AI streaming failed"); + let _ = typing_cancel_tx.send(()); + typing_renew_handle.abort(); + let typing_stop = queue::TypingEvent { + room_id: room_id_inner, + user_id: ai_typing_id, + username: ai_display_name.clone(), + avatar_url: None, + action: "stop".to_string(), + sender_type: Some("ai".to_string()), + }; + room_manager.broadcast_typing(room_id_inner, typing_stop).await; + + let event = RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id: room_id_inner, + content: String::new(), + done: true, + error: Some(e.to_string()), + display_name: Some(ai_display_name.clone()), + chunk_type: None, + }; + room_manager.broadcast_stream_chunk(event).await; + } + } + + room_manager.close_stream_channel(streaming_msg_id).await; + }); +} diff --git a/libs/room/src/service/billing.rs b/libs/room/src/service/billing.rs new file mode 100644 index 0000000..6823ddb --- /dev/null +++ b/libs/room/src/service/billing.rs @@ -0,0 +1,51 @@ +//! AI usage billing helper for room service. +//! +//! Delegates to `agent::billing::record_ai_usage`. +//! Billing is non-fatal — failures are logged but do not block AI responses. + +use db::database::AppDatabase; +use uuid::Uuid; + +use crate::error::RoomError; + +/// Record AI token usage against a project's billing balance. +/// +/// Returns `Ok(())` on success. On billing failure (e.g. insufficient balance, +/// missing pricing), returns `Err` but the caller should still complete the AI +/// request — billing is a non-critical side-effect. +pub async fn record_ai_usage( + db: &AppDatabase, + project_id: Uuid, + model_id: Uuid, + input_tokens: i64, + output_tokens: i64, +) -> Result<(), RoomError> { + if input_tokens == 0 && output_tokens == 0 { + return Ok(()); + } + + match agent::billing::record_ai_usage(db, project_id, model_id, input_tokens, output_tokens).await { + Ok(record) => { + tracing::info!( + project_id = %project_id, + model_id = %model_id, + input_tokens = input_tokens, + output_tokens = output_tokens, + cost_usd = %record.cost, + "ai_usage_recorded" + ); + Ok(()) + } + Err(e) => { + tracing::warn!( + project_id = %project_id, + model_id = %model_id, + input_tokens = input_tokens, + output_tokens = output_tokens, + error = %e, + "ai_billing_failed_non_fatal" + ); + Err(e.into()) + } + } +} diff --git a/libs/room/src/service/history.rs b/libs/room/src/service/history.rs new file mode 100644 index 0000000..c225f16 --- /dev/null +++ b/libs/room/src/service/history.rs @@ -0,0 +1,62 @@ +use db::database::AppDatabase; +use models::rooms::room_ai; +use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect}; +use uuid::Uuid; + +use crate::error::RoomError; + +pub async fn get_room_history( + db: &AppDatabase, + room_id: Uuid, + limit: usize, +) -> Result, RoomError> { + let messages = RoomMessage::find() + .filter(RmCol::Room.eq(room_id)) + .order_by_desc(RmCol::Seq) + .limit(limit as u64) + .all(db) + .await?; + + Ok(messages) +} + +pub async fn get_user_names( + db: &AppDatabase, + user_ids: &[Uuid], +) -> std::collections::HashMap { + use models::users::User; + + let mut names = std::collections::HashMap::new(); + if user_ids.is_empty() { + return names; + } + + let users = User::find() + .filter(models::users::user::Column::Uid.is_in(user_ids.to_vec())) + .all(db) + .await + .unwrap_or_default(); + + for user in users { + names.insert(user.uid, user.username); + } + + names +} + +pub async fn get_room_ai_config( + db: &AppDatabase, + room_id: Uuid, +) -> Result, RoomError> { + let ai_config = room_ai::Entity::find() + .filter(room_ai::Column::Room.eq(room_id)) + .one(db) + .await?; + + Ok(ai_config) +} + +pub async fn extract_mention_context(_content: &str) -> Vec { + Vec::new() +} diff --git a/libs/room/src/service/mentions.rs b/libs/room/src/service/mentions.rs new file mode 100644 index 0000000..6ff7f4a --- /dev/null +++ b/libs/room/src/service/mentions.rs @@ -0,0 +1,48 @@ +use uuid::Uuid; + +use super::patterns::{mention_bracket_re, mention_tag_re, user_mention_re}; + +/// Extracts user UUIDs from all mention formats: +/// - Legacy: `uuid` +/// - Legacy: `label` +/// - New: `@[user:uuid:label]` +pub fn extract_mentions(content: &str) -> Vec { + let mut mentioned = Vec::new(); + + for cap in user_mention_re().captures_iter(content) { + if let Some(inner) = cap.get(1) { + let token = inner.as_str().trim(); + if let Ok(uuid) = Uuid::parse_str(token) { + if !mentioned.contains(&uuid) { + mentioned.push(uuid); + } + } + } + } + + for cap in mention_tag_re().captures_iter(content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "user" { + if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { + if !mentioned.contains(&uuid) { + mentioned.push(uuid); + } + } + } + } + } + + for cap in mention_bracket_re().captures_iter(content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "user" { + if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { + if !mentioned.contains(&uuid) { + mentioned.push(uuid); + } + } + } + } + } + + mentioned +} diff --git a/libs/room/src/service/mod.rs b/libs/room/src/service/mod.rs new file mode 100644 index 0000000..1cfea77 --- /dev/null +++ b/libs/room/src/service/mod.rs @@ -0,0 +1,466 @@ +mod access; +mod billing; +mod ai_common; +mod ai_nonstreaming; +mod ai_react_nonstreaming; +mod ai_react_streaming; +mod ai_streaming; +mod history; +mod mentions; +mod notifications; +mod patterns; +mod sequence; +mod workers; + +pub use access::{check_room_access, check_project_member, require_room_member, find_room_or_404}; +pub use ai_common::create_and_publish_ai_message; +pub use ai_nonstreaming::process_message_ai_nonstreaming; +pub use ai_react_nonstreaming::process_message_ai_react_nonstreaming; +pub use ai_react_streaming::process_message_ai_react_streaming; +pub use ai_streaming::process_message_ai_streaming; +pub use history::{get_room_history, get_user_names, get_room_ai_config, extract_mention_context}; +pub use mentions::extract_mentions; +pub use notifications::{notify_project_members, publish_room_event}; +pub use sequence::next_room_message_seq_internal; +pub use workers::{start_workers, spawn_agent_task, spawn_room_workers, PushNotificationFn}; + +use std::sync::Arc; + +use chrono::Utc; +use db::cache::AppCache; +use db::database::AppDatabase; +use models::rooms::room; +use models::rooms::room_ai; +use queue::{MessageProducer, ProjectRoomEvent}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; +use uuid::Uuid; + +use crate::connection::{RoomConnectionManager, DedupCache}; +use crate::error::RoomError; +use agent::chat::{AiRequest, ChatService}; +use agent::embed::EmbedService; +use agent::TaskService; +use models::agent_task::AgentType; + +use crate::service::patterns::{mention_bracket_re, mention_tag_re}; + +const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024; + +#[derive(Clone)] +pub struct RoomService { + pub db: AppDatabase, + pub cache: AppCache, + pub config: config::AppConfig, + pub room_manager: Arc, + pub queue: MessageProducer, + pub redis_url: String, + pub chat_service: Option>, + pub task_service: Option>, + pub embed_service: Option>, + pub push_fn: Option, + worker_semaphore: Arc, + dedup_cache: DedupCache, +} + +impl RoomService { + pub fn new( + db: AppDatabase, + cache: AppCache, + config: config::AppConfig, + queue: MessageProducer, + room_manager: Arc, + redis_url: String, + chat_service: Option>, + task_service: Option>, + max_concurrent_workers: Option, + push_fn: Option, + embed_service: Option>, + ) -> Self { + let dedup_cache: DedupCache = + Arc::new(dashmap::DashMap::with_capacity_and_hasher(10000, Default::default())); + Self { + db, + cache, + config, + room_manager, + queue, + redis_url, + chat_service, + task_service, + embed_service, + worker_semaphore: Arc::new(tokio::sync::Semaphore::new( + max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS), + )), + dedup_cache, + push_fn, + } + } + + pub async fn start_workers( + &self, + shutdown_rx: tokio::sync::broadcast::Receiver<()>, + ) -> anyhow::Result<()> { + workers::start_workers( + self.db.clone(), + self.cache.clone(), + self.room_manager.clone(), + self.queue.clone(), + self.redis_url.clone(), + self.dedup_cache.clone(), + self.task_service.clone(), + None, // max_concurrent_workers handled by semaphore + shutdown_rx, + ) + .await + } + + pub async fn spawn_agent_task( + &self, + project_id: Uuid, + agent_type: AgentType, + input: String, + _title: Option, + execute: F, + ) -> anyhow::Result + where + F: FnOnce(i64, Arc) -> Fut + Send + 'static, + Fut: std::future::Future> + Send, + { + let task_service = match &self.task_service { + Some(ts) => ts.clone(), + None => return Err(anyhow::anyhow!("task service not configured")), + }; + + workers::spawn_agent_task( + project_id, + agent_type, + input, + task_service, + self.queue.clone(), + self.room_manager.clone(), + self.worker_semaphore.clone(), + execute, + ) + .await + } + + pub fn spawn_room_workers(&self, room_id: uuid::Uuid) { + workers::spawn_room_workers( + room_id, + self.db.clone(), + self.room_manager.clone(), + self.queue.clone(), + self.redis_url.clone(), + self.worker_semaphore.clone(), + ); + } + + pub async fn publish_room_event( + &self, + project_id: uuid::Uuid, + event_type: super::RoomEventType, + room_id: Option, + category_id: Option, + message_id: Option, + seq: Option, + ) { + let event = ProjectRoomEvent { + event_type: event_type.as_str().into(), + project_id, + room_id, + category_id, + message_id, + seq, + timestamp: Utc::now(), + }; + self.queue + .publish_project_room_event(project_id, event) + .await; + } + + pub fn notify_project_members( + &self, + project_id: uuid::Uuid, + notification_type: super::NotificationType, + title: String, + content: Option, + related_room_id: Option, + ) { + notifications::notify_project_members( + self.db.clone(), + project_id, + notification_type, + title, + content, + related_room_id, + ); + } + + pub fn extract_mentions(content: &str) -> Vec { + mentions::extract_mentions(content) + } + + pub async fn resolve_mentions(&self, content: &str) -> Vec { + use models::users::User; + use sea_orm::EntityTrait; + + let mut resolved: Vec = Vec::new(); + let mut seen_usernames: Vec = Vec::new(); + + for cap in mention_bracket_re().captures_iter(content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "user" { + let id = id_m.as_str().trim(); + if let Ok(uuid) = Uuid::parse_str(id) { + if !resolved.contains(&uuid) { + resolved.push(uuid); + } + } else if let Some(label_m) = cap.get(3) { + let label = label_m.as_str().trim(); + if !label.is_empty() { + let label_lower = label.to_lowercase(); + if seen_usernames.contains(&label_lower) { + continue; + } + seen_usernames.push(label_lower.clone()); + + if let Some(user) = User::find() + .filter(models::users::user::Column::Username.eq(label_lower)) + .one(&self.db) + .await + .ok() + .flatten() + { + if !resolved.contains(&user.uid) { + resolved.push(user.uid); + } + } + } + } + } + } + } + + resolved + } + + pub async fn check_room_access(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { + access::check_room_access(&self.db, room_id, user_id).await + } + + pub async fn check_project_member( + &self, + project_id: Uuid, + user_id: Uuid, + ) -> Result<(), RoomError> { + access::check_project_member(&self.db, project_id, user_id).await + } + + pub async fn should_ai_respond(&self, room_id: Uuid, content: &str) -> Result { + let ai_config = history::get_room_ai_config(&self.db, room_id).await?; + + let config = match ai_config { + Some(c) => c, + None => return Ok(false), + }; + + if !config.use_exact { + return Ok(true); + } + + let model_id_str = config.model.to_string(); + + for cap in mention_bracket_re().captures_iter(content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str { + return Ok(true); + } + } + } + + for cap in mention_tag_re().captures_iter(content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str { + return Ok(true); + } + } + } + + Ok(false) + } + + pub async fn get_room_ai_config( + &self, + room_id: Uuid, + ) -> Result, RoomError> { + history::get_room_ai_config(&self.db, room_id).await + } + + pub async fn get_user_names( + &self, + user_ids: &[Uuid], + ) -> std::collections::HashMap { + history::get_user_names(&self.db, user_ids).await + } + + pub async fn require_room_member(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { + access::require_room_member(&self.db, room_id, user_id).await + } + + pub async fn find_room_or_404(&self, room_id: Uuid) -> Result { + access::find_room_or_404(&self.db, room_id).await + } + + pub async fn process_message_ai( + &self, + room_id: Uuid, + _message_id: Uuid, + sender_id: Uuid, + content: String, + ) -> Result<(), RoomError> { + let Some(chat_service) = &self.chat_service else { + return Ok(()); + }; + + let Some(ai_config) = self.get_room_ai_config(room_id).await? else { + return Ok(()); + }; + + let Some(lock_guard) = + crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id).await? + else { + return Ok(()); + }; + + let room = self.find_room_or_404(room_id).await?; + + let project = models::projects::project::Entity::find_by_id(room.project) + .one(&self.db) + .await? + .ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?; + + let mentioned_model_id = { + let mut found = None; + for cap in mention_bracket_re().captures_iter(&content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "ai" { + if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { + found = Some(uuid); + break; + } + } + } + } + found + }; + + let model_id = mentioned_model_id.unwrap_or(ai_config.model); + let model = models::agents::model::Entity::find_by_id(model_id) + .one(&self.db) + .await? + .ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?; + + let sender = models::users::User::find_by_id(sender_id) + .one(&self.db) + .await? + .ok_or_else(|| RoomError::NotFound("Sender not found".to_string()))?; + + let history = history::get_room_history(&self.db, room_id, 50).await?; + + let user_ids: Vec = history + .iter() + .filter_map(|m| m.sender_id) + .chain(std::iter::once(sender_id)) + .collect(); + let user_names = self.get_user_names(&user_ids).await; + + let mentions = history::extract_mention_context(&content).await; + + let request = AiRequest { + db: self.db.clone(), + cache: self.cache.clone(), + config: self.config.clone(), + model, + project: project.clone(), + sender, + room: room.clone(), + input: content, + mention: mentions, + history, + user_names, + temperature: ai_config.temperature.unwrap_or(0.7), + max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32, + top_p: 1.0, + frequency_penalty: 0.0, + presence_penalty: 0.0, + think: ai_config.think, + tools: Some(chat_service.tools()), + max_tool_depth: 1000, + }; + + let use_streaming = ai_config.stream; + let is_react = ai_config.agent_type.as_deref() == Some("react"); + + if is_react { + if use_streaming { + ai_react_streaming::process_message_ai_react_streaming( + chat_service.clone(), + request, + room_id, + room.project, + model_id, + lock_guard, + self.db.clone(), + self.cache.clone(), + self.queue.clone(), + self.room_manager.clone(), + ) + .await; + } else { + ai_react_nonstreaming::process_message_ai_react_nonstreaming( + chat_service.clone(), + request, + room_id, + room.project, + model_id, + lock_guard, + self.db.clone(), + self.cache.clone(), + self.queue.clone(), + self.room_manager.clone(), + ) + .await; + } + } else if use_streaming { + ai_streaming::process_message_ai_streaming( + chat_service.clone(), + request, + room_id, + room.project, + model_id, + lock_guard, + self.db.clone(), + self.cache.clone(), + self.queue.clone(), + self.room_manager.clone(), + ) + .await; + } else { + ai_nonstreaming::process_message_ai_nonstreaming( + chat_service.clone(), + request, + room_id, + room.project, + model_id, + lock_guard, + self.db.clone(), + self.cache.clone(), + self.queue.clone(), + self.room_manager.clone(), + ) + .await; + } + + Ok(()) + } +} diff --git a/libs/room/src/service/notifications.rs b/libs/room/src/service/notifications.rs new file mode 100644 index 0000000..d17f17d --- /dev/null +++ b/libs/room/src/service/notifications.rs @@ -0,0 +1,134 @@ +use chrono::Utc; +use db::database::AppDatabase; +use models::projects::project_members; +use queue::ProjectRoomEvent; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; +use uuid::Uuid; + +use crate::error::RoomError; + +pub fn notify_project_members( + db: AppDatabase, + project_id: Uuid, + notification_type: crate::NotificationType, + title: String, + content: Option, + related_room_id: Option, +) { + let notification_type_inner = notification_type; + let title_inner = title; + let content_inner = content; + let related_room_id_inner = related_room_id; + let project_id_inner = project_id; + + tokio::spawn(async move { + let members = match project_members::Entity::find() + .filter(project_members::Column::Project.eq(project_id_inner)) + .all(&db) + .await + { + Ok(m) => m, + Err(e) => { + tracing::error!(project_id = %project_id_inner, error = %e, + "notify_project_members: failed to fetch members"); + return; + } + }; + + for member in members { + let user_id = member.user; + if let Err(e) = create_notification_sync( + &db, + notification_type_inner, + user_id, + title_inner.clone(), + content_inner.clone(), + related_room_id_inner, + project_id_inner, + ) + .await + { + tracing::warn!(user_id = %user_id, project_id = %project_id_inner, error = %e, + "notify_project_members: failed to create notification for user"); + } + } + }); +} + +async fn create_notification_sync( + db: &AppDatabase, + notification_type: crate::NotificationType, + user_id: Uuid, + title: String, + content: Option, + related_room_id: Option, + project_id: Uuid, +) -> Result<(), RoomError> { + use models::rooms::room_notifications; + use sea_orm::{ActiveModelTrait, Set}; + + let notification_type_model = match notification_type { + crate::NotificationType::Mention => room_notifications::NotificationType::Mention, + crate::NotificationType::Invitation => room_notifications::NotificationType::Invitation, + crate::NotificationType::RoleChange => room_notifications::NotificationType::RoleChange, + crate::NotificationType::RoomCreated => room_notifications::NotificationType::RoomCreated, + crate::NotificationType::RoomDeleted => room_notifications::NotificationType::RoomDeleted, + crate::NotificationType::SystemAnnouncement => { + room_notifications::NotificationType::SystemAnnouncement + } + crate::NotificationType::ProjectInvitation => { + room_notifications::NotificationType::ProjectInvitation + } + crate::NotificationType::WorkspaceInvitation => { + room_notifications::NotificationType::WorkspaceInvitation + } + }; + + let _model = room_notifications::ActiveModel { + id: Set(Uuid::now_v7()), + room: Set(related_room_id), + project: Set(Some(project_id)), + user_id: Set(Some(user_id)), + notification_type: Set(notification_type_model), + related_message_id: Set(None), + related_user_id: Set(None), + related_room_id: Set(related_room_id), + title: Set(title), + content: Set(content), + metadata: Set(None), + is_read: Set(false), + is_archived: Set(false), + created_at: Set(Utc::now()), + read_at: Set(None), + expires_at: Set(None), + } + .insert(db) + .await + .map_err(|e| RoomError::Database(e))?; + + Ok(()) +} + +pub fn publish_room_event( + queue: &queue::MessageProducer, + project_id: Uuid, + event_type: crate::RoomEventType, + room_id: Option, + message_id: Option, + seq: Option, +) { + let event = ProjectRoomEvent { + event_type: event_type.as_str().into(), + project_id, + room_id, + category_id: None, + message_id, + seq, + timestamp: Utc::now(), + }; + // Fire-and-forget — caller doesn't need to await. + let queue = queue.clone(); + tokio::spawn(async move { + queue.publish_project_room_event(project_id, event).await; + }); +} diff --git a/libs/room/src/service/patterns.rs b/libs/room/src/service/patterns.rs new file mode 100644 index 0000000..01495b9 --- /dev/null +++ b/libs/room/src/service/patterns.rs @@ -0,0 +1,30 @@ +use std::sync::LazyLock; + +/// Legacy: uuid or username +static USER_MENTION_RE: LazyLock regex_lite::Regex> = + LazyLock::new(|| regex_lite::Regex::new(r"\s*([^<]+?)\s*").unwrap()); + +/// Legacy: label +static MENTION_TAG_RE: LazyLock regex_lite::Regex> = + LazyLock::new(|| { + regex_lite::Regex::new( + r#"]*>\s*([^<]*?)\s*"#, + ) + .unwrap() + }); + +/// New format: @[type:id:label] +static MENTION_BRACKET_RE: LazyLock regex_lite::Regex> = + LazyLock::new(|| regex_lite::Regex::new(r"@\[([a-z]+):([^:\]]+):([^\]]+)\]").unwrap()); + +pub fn user_mention_re() -> &'static regex_lite::Regex { + &USER_MENTION_RE +} + +pub fn mention_tag_re() -> &'static regex_lite::Regex { + &MENTION_TAG_RE +} + +pub fn mention_bracket_re() -> &'static regex_lite::Regex { + &MENTION_BRACKET_RE +} diff --git a/libs/room/src/service/sequence.rs b/libs/room/src/service/sequence.rs new file mode 100644 index 0000000..965978e --- /dev/null +++ b/libs/room/src/service/sequence.rs @@ -0,0 +1,49 @@ +use db::cache::AppCache; +use db::database::AppDatabase; +use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect}; +use uuid::Uuid; + +use crate::error::RoomError; + +pub async fn next_room_message_seq_internal( + room_id: Uuid, + db: &AppDatabase, + cache: &AppCache, +) -> Result { + let seq_key = format!("room:seq:{}", room_id); + let mut conn = cache.conn().await.map_err(|e| { + RoomError::Internal(format!("failed to get redis connection for seq: {}", e)) + })?; + + let seq: i64 = redis::cmd("INCR") + .arg(&seq_key) + .query_async(&mut conn) + .await + .map_err(|e| RoomError::Internal(format!("seq INCR: {}", e)))?; + + // DB reconciliation: only check every 1000 messages + if seq % 1000 == 0 { + let db_seq: Option>> = RoomMessage::find() + .filter(RmCol::Room.eq(room_id)) + .select_only() + .column_as(RmCol::Seq.max(), "max_seq") + .into_tuple::>>() + .one(db) + .await? + .map(|r| r); + let db_seq = db_seq.flatten().flatten().unwrap_or(0); + + if db_seq >= seq { + let _: String = redis::cmd("SET") + .arg(&seq_key) + .arg(db_seq + 1) + .query_async(&mut conn) + .await + .map_err(|e| RoomError::Internal(format!("seq SET: {}", e)))?; + return Ok(db_seq + 1); + } + } + + Ok(seq) +} diff --git a/libs/room/src/service/workers.rs b/libs/room/src/service/workers.rs new file mode 100644 index 0000000..af7e661 --- /dev/null +++ b/libs/room/src/service/workers.rs @@ -0,0 +1,329 @@ +use std::sync::Arc; + +use chrono::Utc; +use db::cache::AppCache; +use db::database::AppDatabase; +use models::rooms::room; +use queue::{AgentTaskEvent, MessageProducer}; +use sea_orm::EntityTrait; +use uuid::Uuid; + +use crate::connection::{ + extract_get_redis, make_persist_fn, DedupCache, PersistFn, RoomConnectionManager, +}; + +/// Callback type for sending push notifications. +pub type PushNotificationFn = + Arc, Option) + Send + Sync>; + +pub async fn start_workers( + db: AppDatabase, + _cache: AppCache, + room_manager: Arc, + queue: MessageProducer, + redis_url: String, + dedup_cache: DedupCache, + _task_service: Option>, + _max_concurrent_workers: Option, + mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, +) -> anyhow::Result<()> { + let rooms: Vec = room::Entity::find().all(&db).await?; + let room_ids: Vec = rooms.iter().map(|r| r.id).collect(); + let project_ids: Vec = rooms + .iter() + .map(|r| r.project) + .collect::>() + .into_iter() + .collect(); + + let task_project_ids = project_ids.clone(); + + tracing::info!( + room_count = room_ids.len(), + project_count = project_ids.len(), + "starting room workers" + ); + + let persist_fn: PersistFn = make_persist_fn(db.clone(), room_manager.metrics.clone(), dedup_cache.clone()); + + let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = + extract_get_redis(queue.clone()); + + let worker_room_ids = room_ids.clone(); + let worker_shutdown = shutdown_rx.resubscribe(); + let worker_handle = tokio::spawn({ + let get_redis = get_redis.clone(); + let persist_fn = persist_fn.clone(); + async move { + queue::start_worker(worker_room_ids, get_redis, persist_fn, worker_shutdown).await; + } + }); + + let manager = room_manager.clone(); + let redis_url_clone = redis_url.clone(); + + let mut handles: Vec<_> = room_ids + .into_iter() + .map(|room_id| { + let manager = manager.clone(); + let redis_url = redis_url_clone.clone(); + let shutdown_rx = shutdown_rx.resubscribe(); + tokio::spawn(async move { + crate::connection::subscribe_room_events( + redis_url, + manager, + room_id, + shutdown_rx, + ) + .await; + }) + }) + .collect(); + + let project_handles: Vec<_> = project_ids + .into_iter() + .map(|project_id| { + let manager = manager.clone(); + let redis_url = redis_url_clone.clone(); + let shutdown_rx = shutdown_rx.resubscribe(); + tokio::spawn(async move { + crate::connection::subscribe_project_room_events( + redis_url, + manager, + project_id, + shutdown_rx, + ) + .await; + }) + }) + .collect(); + handles.extend(project_handles); + + let task_handles: Vec<_> = task_project_ids + .into_iter() + .map(|project_id| { + let manager = manager.clone(); + let redis_url = redis_url_clone.clone(); + let shutdown_rx = shutdown_rx.resubscribe(); + tokio::spawn(async move { + crate::connection::subscribe_task_events_fn( + redis_url, + manager, + project_id, + shutdown_rx, + ) + .await; + }) + }) + .collect(); + handles.extend(task_handles); + + let cleanup_handle = { + let manager = room_manager.clone(); + let db = db.clone(); + let dedup_cache = dedup_cache.clone(); + let mut cleanup_shutdown = shutdown_rx.resubscribe(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); + interval.tick().await; + loop { + tokio::select! { + _ = interval.tick() => { + manager.cleanup_rate_limit().await; + crate::connection::cleanup_dedup_cache(&dedup_cache); + if let Ok(rooms) = room::Entity::find().all(&db).await { + let room_ids: Vec<_> = rooms.iter().map(|r| r.id).collect(); + let project_ids: Vec<_> = rooms.iter().map(|r| r.project).collect(); + manager.metrics.cleanup_stale_rooms(&room_ids).await; + manager.prune_stale_rooms(&room_ids).await; + manager.prune_stale_projects(&project_ids).await; + } + } + _ = cleanup_shutdown.recv() => { + tracing::info!("cleanup task shutting down"); + break; + } + } + } + }) + }; + handles.push(cleanup_handle); + + let _ = shutdown_rx.recv().await; + + tracing::info!("room workers shutting down"); + + for h in handles { + let _ = h.abort(); + } + let _ = worker_handle.await; + + tracing::info!("room workers stopped"); + Ok(()) +} + +pub async fn spawn_agent_task( + project_id: Uuid, + agent_type: models::agent_task::AgentType, + input: String, + task_service: Arc, + queue: MessageProducer, + room_manager: Arc, + worker_semaphore: Arc, + execute: F, +) -> anyhow::Result +where + F: FnOnce(i64, Arc) -> Fut + Send + 'static, + Fut: std::future::Future> + Send, +{ + let task = task_service + .create(project_id, input, agent_type) + .await + .map_err(|e| anyhow::anyhow!("create task failed: {}", e))?; + + let task_id = task.id; + + let started_event = AgentTaskEvent { + task_id, + project_id, + parent_id: task.parent_id, + event: "started".to_string(), + message: None, + output: None, + error: None, + status: models::agent_task::TaskStatus::Running.to_string(), + timestamp: Utc::now(), + }; + queue + .publish_agent_task_event(project_id, started_event) + .await; + + let _ = task_service.start(task_id).await; + + let queue_clone = queue.clone(); + let room_manager_clone = room_manager.clone(); + let semaphore = worker_semaphore.clone(); + + tokio::spawn(async move { + let _permit = semaphore.acquire().await.expect("semaphore closed"); + + let result = execute(task_id, task_service.clone()).await; + + let event = match result { + Ok(output) => { + let _ = task_service.complete(task_id, &output).await; + AgentTaskEvent { + task_id, + project_id, + parent_id: None, + event: "done".to_string(), + message: None, + output: Some(output), + error: None, + status: models::agent_task::TaskStatus::Done.to_string(), + timestamp: chrono::Utc::now(), + } + } + Err(err) => { + let _ = task_service.fail(task_id, &err).await; + AgentTaskEvent { + task_id, + project_id, + parent_id: None, + event: "failed".to_string(), + message: None, + output: None, + error: Some(err), + status: models::agent_task::TaskStatus::Failed.to_string(), + timestamp: chrono::Utc::now(), + } + } + }; + + queue_clone + .publish_agent_task_event(project_id, event.clone()) + .await; + room_manager_clone.broadcast_agent_task(project_id, event).await; + tracing::info!(task_id = task_id, project_id = %project_id, "agent task finished"); + }); + + Ok(task_id) +} + +pub fn spawn_room_workers( + room_id: uuid::Uuid, + db: AppDatabase, + room_manager: Arc, + queue: MessageProducer, + redis_url: String, + worker_semaphore: Arc, +) { + let persist_fn: PersistFn = make_persist_fn( + db.clone(), + room_manager.metrics.clone(), + Arc::new( + dashmap::DashMap::with_capacity_and_hasher( + 10000, + Default::default(), + ), + ), + ); + let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = + extract_get_redis(queue.clone()); + let manager = room_manager.clone(); + let redis_url_clone = redis_url.clone(); + let semaphore = worker_semaphore.clone(); + + let manager2 = room_manager.clone(); + let redis_url3 = redis_url.clone(); + + tokio::spawn(async move { + let _permit = match semaphore.acquire_owned().await { + Ok(p) => p, + Err(_) => return, + }; + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); + queue::room_worker_task( + room_id, + uuid::Uuid::new_v4().to_string(), + get_redis, + persist_fn, + shutdown_rx, + ) + .await; + let _ = shutdown_tx.send(()); + }); + + tokio::spawn(async move { + let shutdown_rx = manager.register_room(room_id).await; + crate::connection::subscribe_room_events( + redis_url_clone, + manager.clone(), + room_id, + shutdown_rx, + ) + .await; + }); + + tokio::spawn(async move { + let project_id = { + let room = room::Entity::find_by_id(room_id) + .one(&db) + .await + .ok() + .flatten(); + match room { + Some(r) => r.project, + None => return, + } + }; + let shutdown_rx = manager2.register_project(project_id).await; + crate::connection::subscribe_project_room_events( + redis_url3, + manager2, + project_id, + shutdown_rx, + ) + .await; + }); +} diff --git a/libs/room/src/types.rs b/libs/room/src/types.rs index e251ad8..00a2357 100644 --- a/libs/room/src/types.rs +++ b/libs/room/src/types.rs @@ -223,6 +223,9 @@ pub struct RoomMessageResponse { pub in_reply_to: Option, pub content: String, pub content_type: String, + /// Accumulated AI reasoning/thinking text. + #[serde(skip_serializing_if = "Option::is_none")] + pub thinking_content: Option, pub edited_at: Option>, pub send_at: DateTime, pub revoked: Option>,