use std::sync::Arc; use chrono::Utc; use db::cache::AppCache; use db::database::AppDatabase; use models::rooms::room; use queue::{AgentTaskEvent, MessageProducer}; use sea_orm::EntityTrait; use uuid::Uuid; use crate::connection::{ extract_get_redis, make_persist_fn, DedupCache, PersistFn, RoomConnectionManager, }; /// Callback type for sending push notifications. pub type PushNotificationFn = Arc, Option) + Send + Sync>; pub async fn start_workers( db: AppDatabase, _cache: AppCache, room_manager: Arc, queue: MessageProducer, redis_url: String, dedup_cache: DedupCache, _task_service: Option>, _max_concurrent_workers: Option, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, ) -> anyhow::Result<()> { let rooms: Vec = room::Entity::find().all(&db).await?; let room_ids: Vec = rooms.iter().map(|r| r.id).collect(); let project_ids: Vec = rooms .iter() .map(|r| r.project) .collect::>() .into_iter() .collect(); let task_project_ids = project_ids.clone(); tracing::info!( room_count = room_ids.len(), project_count = project_ids.len(), "starting room workers" ); let persist_fn: PersistFn = make_persist_fn(db.clone(), room_manager.metrics.clone(), dedup_cache.clone()); let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = extract_get_redis(queue.clone()); let worker_room_ids = room_ids.clone(); let worker_shutdown = shutdown_rx.resubscribe(); let worker_handle = tokio::spawn({ let get_redis = get_redis.clone(); let persist_fn = persist_fn.clone(); async move { queue::start_worker(worker_room_ids, get_redis, persist_fn, worker_shutdown).await; } }); let manager = room_manager.clone(); let redis_url_clone = redis_url.clone(); let mut handles: Vec<_> = room_ids .into_iter() .map(|room_id| { let manager = manager.clone(); let redis_url = redis_url_clone.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { crate::connection::subscribe_room_events( redis_url, manager, room_id, shutdown_rx, ) .await; }) }) .collect(); let project_handles: Vec<_> = project_ids .into_iter() .map(|project_id| { let manager = manager.clone(); let redis_url = redis_url_clone.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { crate::connection::subscribe_project_room_events( redis_url, manager, project_id, shutdown_rx, ) .await; }) }) .collect(); handles.extend(project_handles); let task_handles: Vec<_> = task_project_ids .into_iter() .map(|project_id| { let manager = manager.clone(); let redis_url = redis_url_clone.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { crate::connection::subscribe_task_events_fn( redis_url, manager, project_id, shutdown_rx, ) .await; }) }) .collect(); handles.extend(task_handles); let cleanup_handle = { let manager = room_manager.clone(); let db = db.clone(); let dedup_cache = dedup_cache.clone(); let mut cleanup_shutdown = shutdown_rx.resubscribe(); tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); interval.tick().await; loop { tokio::select! { _ = interval.tick() => { manager.cleanup_rate_limit().await; crate::connection::cleanup_dedup_cache(&dedup_cache); if let Ok(rooms) = room::Entity::find().all(&db).await { let room_ids: Vec<_> = rooms.iter().map(|r| r.id).collect(); let project_ids: Vec<_> = rooms.iter().map(|r| r.project).collect(); manager.metrics.cleanup_stale_rooms(&room_ids).await; manager.prune_stale_rooms(&room_ids).await; manager.prune_stale_projects(&project_ids).await; } } _ = cleanup_shutdown.recv() => { tracing::info!("cleanup task shutting down"); break; } } } }) }; handles.push(cleanup_handle); let _ = shutdown_rx.recv().await; tracing::info!("room workers shutting down"); for h in handles { let _ = h.abort(); } let _ = worker_handle.await; tracing::info!("room workers stopped"); Ok(()) } pub async fn spawn_agent_task( project_id: Uuid, agent_type: models::agent_task::AgentType, input: String, task_service: Arc, queue: MessageProducer, room_manager: Arc, worker_semaphore: Arc, execute: F, ) -> anyhow::Result where F: FnOnce(i64, Arc) -> Fut + Send + 'static, Fut: std::future::Future> + Send, { let task = task_service .create(project_id, input, agent_type) .await .map_err(|e| anyhow::anyhow!("create task failed: {}", e))?; let task_id = task.id; let started_event = AgentTaskEvent { task_id, project_id, parent_id: task.parent_id, event: "started".to_string(), message: None, output: None, error: None, status: models::agent_task::TaskStatus::Running.to_string(), timestamp: Utc::now(), }; queue .publish_agent_task_event(project_id, started_event) .await; if let Err(e) = task_service.start(task_id).await { tracing::warn!(error = %e, task_id = %task_id, "AI task start failed"); } let queue_clone = queue.clone(); let room_manager_clone = room_manager.clone(); let semaphore = worker_semaphore.clone(); tokio::spawn(async move { let _permit = semaphore.acquire().await.expect("semaphore closed"); let result = execute(task_id, task_service.clone()).await; let event = match result { Ok(output) => { if let Err(e) = task_service.complete(task_id, &output).await { tracing::warn!(error = %e, task_id = %task_id, "AI task complete failed"); } AgentTaskEvent { task_id, project_id, parent_id: None, event: "done".to_string(), message: None, output: Some(output), error: None, status: models::agent_task::TaskStatus::Done.to_string(), timestamp: chrono::Utc::now(), } } Err(err) => { if let Err(e) = task_service.fail(task_id, &err).await { tracing::warn!(error = %e, task_id = %task_id, "AI task fail failed"); } AgentTaskEvent { task_id, project_id, parent_id: None, event: "failed".to_string(), message: None, output: None, error: Some(err), status: models::agent_task::TaskStatus::Failed.to_string(), timestamp: chrono::Utc::now(), } } }; queue_clone .publish_agent_task_event(project_id, event.clone()) .await; room_manager_clone.broadcast_agent_task(project_id, event).await; tracing::info!(task_id = task_id, project_id = %project_id, "agent task finished"); }); Ok(task_id) } pub fn spawn_room_workers( room_id: uuid::Uuid, db: AppDatabase, room_manager: Arc, queue: MessageProducer, redis_url: String, worker_semaphore: Arc, ) { let persist_fn: PersistFn = make_persist_fn( db.clone(), room_manager.metrics.clone(), Arc::new( dashmap::DashMap::with_capacity_and_hasher( 10000, Default::default(), ), ), ); let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = extract_get_redis(queue.clone()); let manager1 = room_manager.clone(); let manager2 = room_manager.clone(); let manager3 = room_manager.clone(); let redis_url_clone = redis_url.clone(); let redis_url3 = redis_url.clone(); let semaphore = worker_semaphore.clone(); tokio::spawn(async move { let _permit = match semaphore.acquire_owned().await { Ok(p) => p, Err(_) => return, }; // Use the manager's room shutdown channel so the worker terminates // when the room is deleted (shutdown_room sends on room_shutdown_txs). let shutdown_rx = manager1.register_room(room_id).await; queue::room_worker_task( room_id, uuid::Uuid::new_v4().to_string(), get_redis, persist_fn, shutdown_rx, ) .await; }); tokio::spawn(async move { let shutdown_rx = manager2.register_room(room_id).await; crate::connection::subscribe_room_events( redis_url_clone, manager2, room_id, shutdown_rx, ) .await; }); tokio::spawn(async move { let project_id = { let room = room::Entity::find_by_id(room_id) .one(&db) .await .ok() .flatten(); match room { Some(r) => r.project, None => return, } }; let shutdown_rx = manager3.register_project(project_id).await; crate::connection::subscribe_project_room_events( redis_url3, manager3, project_id, shutdown_rx, ) .await; }); }