//! SSE endpoint for watching a specific sub-agent's stream output. //! //! `GET /api/ai/subagent/{conversation_id}/{children_id}/stream` //! //! Prefers Redis PubSub for low-latency live delivery and falls back to NATS //! if Redis is unavailable. The subject/channel is //! `chat.subagent.chunk.{conversation_id}.{children_id}`. use actix_web::{HttpResponse, Result, web}; use futures::StreamExt; use models::ai::ai_subagent_session; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder}; use service::AppService; use uuid::Uuid; use crate::error::ApiError; async fn find_subagent_session( service: &AppService, conversation_id: Uuid, children_id: &str, ) -> Option { ai_subagent_session::Entity::find() .filter(ai_subagent_session::Column::ConversationId.eq(conversation_id)) .filter(ai_subagent_session::Column::ChildrenId.eq(children_id)) .order_by_desc(ai_subagent_session::Column::CreatedAt) .one(service.db.reader()) .await .ok() .flatten() } fn terminal_event_for_status(status: &str) -> &'static str { match status { "stopped" | "cancelled" => "stopped", "error" => "error", _ => "done", } } async fn send_session_snapshot( tx: &tokio::sync::mpsc::Sender, session: &ai_subagent_session::Model, include_output: bool, ) -> bool { if include_output && !session.output.is_empty() { let payload = serde_json::json!({ "event": "token", "data": { "content": session.output, "children_id": session.children_id, }, }); if tx.send(format!("data: {}\n\n", payload)).await.is_err() { return false; } } let event = terminal_event_for_status(&session.status); let payload = serde_json::json!({ "event": event, "data": { "content": "", "children_id": session.children_id, "error": session.error_message, }, }); tx.send(format!("data: {}\n\n", payload)).await.is_ok() } /// Create an SSE stream for watching a sub-agent's stream output from Redis PubSub. 
pub fn create_subagent_sse_stream_redis( service: AppService, conversation_id: Uuid, children_id: String, ) -> std::pin::Pin< Box> + Send>, > { let (tx, rx) = tokio::sync::mpsc::channel::(200); tokio::spawn(async move { if let Some(session) = find_subagent_session(&service, conversation_id, &children_id).await { let _ = send_session_snapshot(&tx, &session, true).await; return; } let redis_url = match service.config.redis_url() { Ok(url) => url, Err(e) => { let _ = tx .send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string(&e.to_string()).unwrap_or_default() )) .await; return; } }; let client = match redis::Client::open(redis_url) { Ok(client) => client, Err(e) => { let _ = tx .send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string(&e.to_string()).unwrap_or_default() )) .await; return; } }; let pubsub = match client.get_async_pubsub().await { Ok(pubsub) => pubsub, Err(e) => { let _ = tx .send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string(&e.to_string()).unwrap_or_default() )) .await; return; } }; let (mut sink, mut stream) = pubsub.split(); let channel = format!("chat.subagent.chunk.{}.{}", conversation_id, children_id); if let Err(e) = sink.subscribe(channel.as_str()).await { let _ = tx .send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string(&e.to_string()).unwrap_or_default() )) .await; return; } let _ = tx.send(":ok\n\n".to_string()).await; let mut session_poll = tokio::time::interval(std::time::Duration::from_millis(500)); let stream_started = tokio::time::Instant::now(); let mut sent_content = false; loop { tokio::select! 
{ Some(msg) = stream.next() => { let payload: String = match msg.get_payload() { Ok(v) => v, Err(e) => { let _ = tx.send(format!("data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string(&e.to_string()).unwrap_or_default())).await; break; } }; let event_type = if let Ok(parsed) = serde_json::from_str::(&payload) { parsed .get("chunk_type") .and_then(|v| v.as_str()) .unwrap_or("chunk") .to_string() } else { "chunk".to_string() }; if matches!(event_type.as_str(), "token" | "thinking") { sent_content = true; } let sse = format!("data: {{\"event\":\"{}\",\"data\":{}}}\n\n", event_type, payload); if tx.send(sse).await.is_err() { break; } if matches!(event_type.as_str(), "done" | "stopped" | "error") { break; } } _ = session_poll.tick() => { if let Some(session) = find_subagent_session(&service, conversation_id, &children_id).await { let _ = send_session_snapshot(&tx, &session, !sent_content).await; break; } if stream_started.elapsed() > std::time::Duration::from_secs(90) { let payload = serde_json::json!({ "event": "error", "data": { "content": "sub-agent stream timed out waiting for terminal state", "children_id": children_id, }, }); let _ = tx.send(format!("data: {}\n\n", payload)).await; break; } } } } }); Box::pin( tokio_stream::wrappers::ReceiverStream::new(rx).map(|s| Ok(actix_web::web::Bytes::from(s))), ) } /// Create an SSE stream for watching a sub-agent's stream output via NATS fallback. 
pub fn create_subagent_sse_stream_nats( service: AppService, conversation_id: Uuid, children_id: String, ) -> std::pin::Pin< Box> + Send>, > { let (tx, rx) = tokio::sync::mpsc::channel::(200); tokio::spawn(async move { let nats = match &service.queue_producer.nats { Some(n) => n.clone(), None => { let _ = tx .send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string("NATS not available").unwrap_or_default() )) .await; return; } }; let subject = format!("chat.subagent.chunk.{}.{}", conversation_id, children_id); let mut sub = match nats.subscribe(&subject).await { Ok(s) => s, Err(e) => { let _ = tx .send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string(&e.to_string()).unwrap_or_default() )) .await; return; } }; let _ = tx.send(":ok\n\n".to_string()).await; loop { match sub.next().await { Some(msg) => { let payload = String::from_utf8_lossy(&msg.payload); let event_type = if let Ok(parsed) = serde_json::from_str::(&payload) { parsed .get("chunk_type") .and_then(|v| v.as_str()) .unwrap_or("chunk") .to_string() } else { "chunk".to_string() }; let sse = format!( "data: {{\"event\":\"{}\",\"data\":{}}}\n\n", event_type, payload ); if tx.send(sse).await.is_err() { break; } } None => break, } } }); Box::pin( tokio_stream::wrappers::ReceiverStream::new(rx).map(|s| Ok(actix_web::web::Bytes::from(s))), ) } /// SSE endpoint for watching a sub-agent's stream output. 
///
/// `GET /api/ai/subagent/{conversation_id}/{children_id}/stream`
///
/// Requires a logged-in session user and passes only if
/// `find_conversation_owned` succeeds for that user and conversation. The
/// response body is the Redis-backed SSE stream; the `Cache-Control` and
/// `X-Accel-Buffering` headers ask intermediaries not to cache or buffer it.
///
/// # Errors
///
/// Returns an [`ApiError`] when the session has no user or when the
/// conversation lookup fails.
///
/// NOTE(review): `web::Data<AppService>` and the `Result<HttpResponse, ApiError>`
/// return type were reconstructed from context — confirm against the route
/// registration. The trailing `.into()` is kept for that reason.
#[utoipa::path(
    get,
    path = "/api/ai/subagent/{conversation_id}/{children_id}/stream",
    params(
        ("conversation_id" = Uuid, Path, description = "Conversation ID"),
        ("children_id" = String, Path, description = "Sub-agent children ID"),
    ),
    responses(
        (status = 200, description = "SSE stream of sub-agent events"),
        (status = 404, description = "Not found"),
    ),
    tag = "AI Chat"
)]
pub async fn subagent_stream_watch(
    service: web::Data<AppService>,
    session: session::Session,
    path: web::Path<(Uuid, String)>,
) -> Result<HttpResponse, ApiError> {
    let user_id = session
        .user()
        .ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?;

    let (conversation_id, children_id) = path.into_inner();

    // Verify access to the conversation; only the success of the lookup matters.
    let _conv = service
        .find_conversation_owned(conversation_id, user_id)
        .await?;

    // `children_id` is not used afterwards, so it is moved rather than cloned.
    let redis_stream =
        create_subagent_sse_stream_redis(service.get_ref().clone(), conversation_id, children_id);

    let response = HttpResponse::Ok()
        .content_type("text/event-stream")
        .insert_header(("Cache-Control", "no-cache"))
        .insert_header(("X-Accel-Buffering", "no"))
        .streaming(redis_stream);

    Ok(response.into())
}

/// NATS fallback retained for deployments where Redis PubSub is unavailable.
pub async fn subagent_stream_watch_nats( service: web::Data, session: session::Session, path: web::Path<(Uuid, String)>, ) -> Result { let user_id = session .user() .ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?; let (conversation_id, children_id) = path.into_inner(); let _conv = service .find_conversation_owned(conversation_id, user_id) .await?; let response = HttpResponse::Ok() .content_type("text/event-stream") .insert_header(("Cache-Control", "no-cache")) .insert_header(("X-Accel-Buffering", "no")) .streaming(create_subagent_sse_stream_nats( service.get_ref().clone(), conversation_id, children_id, )); Ok(response.into()) } #[utoipa::path( post, path = "/api/ai/subagent/{conversation_id}/{children_id}/stop", params( ("conversation_id" = Uuid, Path, description = "Conversation ID"), ("children_id" = String, Path, description = "Sub-agent children ID"), ), responses( (status = 200, description = "Sub-agent stop requested"), (status = 404, description = "Not found"), ), tag = "AI Chat" )] pub async fn subagent_stop( service: web::Data, session: session::Session, path: web::Path<(Uuid, String)>, ) -> Result { let user_id = session .user() .ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?; let (conversation_id, children_id) = path.into_inner(); let _conv = service .find_conversation_full_access(conversation_id, user_id) .await?; service .cache .set_sub_agent_cancelled(conversation_id, &children_id) .await; Ok(crate::api_success()) }