use chrono::Utc; use dashmap::DashSet; use db::database::AppDatabase; use models::rooms::room; use queue::{AgentTaskEvent, MessageProducer}; use sea_orm::EntityTrait; use std::sync::Arc; use std::sync::OnceLock; use uuid::Uuid; use crate::connection::{make_persist_fn, RoomConnectionManager}; /// Tracks rooms for which NATS subscriptions have already been spawned. static SPAWNED_ROOMS: OnceLock> = OnceLock::new(); /// Remove a room from the spawned set (called on room shutdown). pub fn unmark_room_spawned(room_id: Uuid) { if let Some(set) = SPAWNED_ROOMS.get() { set.remove(&room_id); } } pub async fn spawn_agent_task( project_id: Uuid, agent_type: models::agent_task::AgentType, input: String, task_service: Arc, queue: MessageProducer, room_manager: Arc, worker_semaphore: Arc, execute: F, ) -> anyhow::Result where F: FnOnce(i64, Arc) -> Fut + Send + 'static, Fut: std::future::Future> + Send, { let task = task_service .create(project_id, input, agent_type) .await .map_err(|e| anyhow::anyhow!("create task failed: {}", e))?; let task_id = task.id; let started_event = AgentTaskEvent { task_id, project_id, parent_id: task.parent_id, event: "started".to_string(), message: None, output: None, error: None, status: models::agent_task::TaskStatus::Running.to_string(), timestamp: Utc::now(), }; queue .publish_agent_task_event(project_id, started_event) .await; if let Err(e) = task_service.start(task_id).await { tracing::warn!(error = %e, task_id = %task_id, "AI task start failed"); } let queue_clone = queue.clone(); let room_manager_clone = room_manager.clone(); let semaphore = worker_semaphore.clone(); tokio::spawn(async move { let Ok(_permit) = semaphore.acquire().await else { tracing::warn!(task_id = %task_id, "semaphore closed, skipping task"); return; }; let result = execute(task_id, task_service.clone()).await; let event = match result { Ok(output) => { if let Err(e) = task_service.complete(task_id, &output).await { tracing::warn!(error = %e, task_id = %task_id, "AI task complete failed"); } AgentTaskEvent { task_id, project_id, parent_id: None, event: "done".to_string(), message: None, output: Some(output), error: None, status: models::agent_task::TaskStatus::Done.to_string(), timestamp: chrono::Utc::now(), } } Err(err) => { if let Err(e) = task_service.fail(task_id, &err).await { tracing::warn!(error = %e, task_id = %task_id, "AI task fail failed"); } AgentTaskEvent { task_id, project_id, parent_id: None, event: "failed".to_string(), message: None, output: None, error: Some(err), status: models::agent_task::TaskStatus::Failed.to_string(), timestamp: chrono::Utc::now(), } } }; queue_clone .publish_agent_task_event(project_id, event.clone()) .await; room_manager_clone .broadcast_agent_task(project_id, event) .await; tracing::info!(task_id = task_id, project_id = %project_id, "agent task finished"); }); Ok(task_id) } /// Spawns per-room workers: NATS subscribe tasks + optional JetStream consumer. /// Replaces the old Redis Streams + Redis Pub/Sub workers. pub fn spawn_room_workers( room_id: uuid::Uuid, db: AppDatabase, room_manager: Arc, _queue: MessageProducer, nats: Option>, worker_semaphore: Arc, embed_service: Option>, ) { if !SPAWNED_ROOMS.get_or_init(DashSet::new).insert(room_id) { return; } let dedup_cache = Arc::new(dashmap::DashMap::with_capacity_and_hasher( 10000, Default::default(), )); let persist_fn = make_persist_fn( db.clone(), room_manager.metrics.clone(), dedup_cache, embed_service.clone(), ); let db1 = db.clone(); let db2 = db.clone(); let manager1 = room_manager.clone(); let manager2 = room_manager.clone(); let manager3 = room_manager.clone(); let manager4 = room_manager.clone(); let manager5 = room_manager.clone(); let semaphore = worker_semaphore.clone(); // JetStream consumer for message persistence (only if NATS available) if let Some(nats) = nats.clone() { let nats_consumer = nats.clone(); let persist_fn = persist_fn.clone(); tokio::spawn(async move { let Ok(_permit) = semaphore.acquire_owned().await else { return; }; let shutdown_rx = manager1.register_room(room_id).await; queue::room_worker_task(room_id, nats_consumer, persist_fn, shutdown_rx).await; }); } // Room message broadcast subscriber if let Some(nats) = nats.clone() { tokio::spawn(async move { let shutdown_rx = manager2.register_room(room_id).await; crate::connection::subscribe_room_events(nats, manager2, room_id, shutdown_rx).await; }); } // Project-level event subscriber if let Some(nats) = nats.clone() { tokio::spawn(async move { let project_id = match room::Entity::find_by_id(room_id) .one(&db1) .await .ok() .flatten() { Some(r) => r.project, None => return, }; let shutdown_rx = manager3.register_project(project_id).await; crate::connection::subscribe_project_room_events( nats, manager3, project_id, shutdown_rx, ) .await; }); } // Agent task event subscriber if let Some(nats) = nats.clone() { tokio::spawn(async move { let project_id = match room::Entity::find_by_id(room_id) .one(&db2) .await .ok() .flatten() { Some(r) => r.project, None => return, }; let shutdown_rx = manager5.register_project(project_id).await; crate::connection::subscribe_task_events_fn(nats, manager5, project_id, shutdown_rx) .await; }); } // Stream chunk subscriber if let Some(nats) = nats { tokio::spawn(async move { let shutdown_rx = manager4.register_room(room_id).await; crate::connection::subscribe_room_stream_chunk_events( nats, manager4, room_id, shutdown_rx, ) .await; }); } }