//! SSE endpoint for watching a chat conversation in real-time via NATS. //! //! Unlike the primary SSE stream (which triggers AI execution), this endpoint //! passively subscribes to NATS Core subjects and forwards chat messages and //! stream chunks to connected clients. This enables multiple viewers to watch //! the same AI conversation in real-time. use actix_web::{web, HttpResponse, Result}; use futures::StreamExt; use service::AppService; use std::pin::Pin; use uuid::Uuid; use crate::error::ApiError; /// SSE endpoint for watching a chat conversation. /// /// `GET /api/ai/conversations/{conversation_id}/watch` /// /// Subscribes to NATS Core subjects (`chat.chunk.{id}` and `chat.message.{id}`) /// and forwards received events as SSE to the connected client. /// /// SSE events: /// - `chunk` — a stream chunk (thinking, token, tool_call, tool_result, done, error) /// - `message` — a complete chat message /// - `error` — an error event pub fn create_watch_sse_stream( service: AppService, conversation_id: Uuid, ) -> Pin> + Send>> { let (tx, rx) = tokio::sync::mpsc::channel::(200); tokio::spawn(async move { let nats = match &service.queue_producer.nats { Some(n) => n.clone(), None => { let _ = tx.send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string("NATS not available").unwrap_or_default() )).await; return; } }; // Subscribe to chat chunks let chunk_subject = format!("chat.chunk.{}", conversation_id); let mut chunk_sub = match nats.subscribe(&chunk_subject).await { Ok(s) => s, Err(e) => { let _ = tx.send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string(&e.to_string()).unwrap_or_default() )).await; return; } }; // Subscribe to chat messages let msg_subject = format!("chat.message.{}", conversation_id); let mut msg_sub = match nats.subscribe(&msg_subject).await { Ok(s) => s, Err(e) => { let _ = tx.send(format!( "data: {{\"event\":\"error\",\"data\":{}}}\n\n", serde_json::to_string(&e.to_string()).unwrap_or_default() )).await; return; } }; let _ = tx.send(":ok\n\n".to_string()).await; loop { tokio::select! { chunk_msg = chunk_sub.next() => { match chunk_msg { Some(msg) => { let payload = String::from_utf8_lossy(&msg.payload); // Parse to get chunk_type for the event field let event_type = if let Ok(parsed) = serde_json::from_str::(&payload) { parsed.get("chunk_type") .and_then(|v| v.as_str()) .unwrap_or("chunk") .to_string() } else { "chunk".to_string() }; let sse = format!( "data: {{\"event\":\"{}\",\"data\":{}}}\n\n", event_type, payload ); if tx.send(sse).await.is_err() { break; } } None => break, } } msg = msg_sub.next() => { match msg { Some(msg) => { let payload = String::from_utf8_lossy(&msg.payload); let sse = format!( "data: {{\"event\":\"message\",\"data\":{}}}\n\n", payload ); if tx.send(sse).await.is_err() { break; } } None => break, } } } } }); Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).map(|s| { Ok(actix_web::web::Bytes::from(s)) })) } #[utoipa::path( get, path = "/api/ai/conversations/{conversation_id}/watch", params( ("conversation_id" = Uuid, Path, description = "Conversation ID"), ), responses( (status = 200, description = "SSE stream of conversation events"), (status = 404, description = "Not found"), ), tag = "AI Chat" )] pub async fn conversation_watch( service: web::Data, session: session::Session, path: web::Path, ) -> Result { let user_id = session.user().ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?; let conversation_id = path.into_inner(); // Verify access (view-only is sufficient) let _conv = service .find_conversation_owned(conversation_id, user_id) .await?; let response = HttpResponse::Ok() .content_type("text/event-stream") .insert_header(("Cache-Control", "no-cache")) .insert_header(("X-Accel-Buffering", "no")) .streaming(create_watch_sse_stream( service.get_ref().clone(), conversation_id, )); Ok(response.into()) }