gitdataai/libs/api/chat/subagent.rs

377 lines
13 KiB
Rust

//! SSE endpoint for watching a specific sub-agent's stream output.
//!
//! `GET /api/ai/subagent/{conversation_id}/{children_id}/stream`
//!
//! Prefers Redis PubSub for low-latency live delivery and falls back to NATS
//! if Redis is unavailable. The subject/channel is
//! `chat.subagent.chunk.{conversation_id}.{children_id}`.
use actix_web::{HttpResponse, Result, web};
use futures::StreamExt;
use models::ai::ai_subagent_session;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder};
use service::AppService;
use uuid::Uuid;
use crate::error::ApiError;
/// Fetches the newest stored sub-agent session for a conversation/child pair.
///
/// Rows are matched on both `conversation_id` and `children_id`; when several
/// match, the one with the greatest `CreatedAt` is returned.
///
/// Returns `None` both when nothing matches and when the query itself fails —
/// database errors are intentionally discarded here, so callers cannot tell
/// the two cases apart.
async fn find_subagent_session(
    service: &AppService,
    conversation_id: Uuid,
    children_id: &str,
) -> Option<ai_subagent_session::Model> {
    let newest_first = ai_subagent_session::Entity::find()
        .filter(ai_subagent_session::Column::ConversationId.eq(conversation_id))
        .filter(ai_subagent_session::Column::ChildrenId.eq(children_id))
        .order_by_desc(ai_subagent_session::Column::CreatedAt);

    // A failed query collapses to `None`, exactly like "no row found".
    let lookup = newest_first.one(service.db.reader()).await;
    lookup.unwrap_or_default()
}
/// Translates a stored session status into the SSE event name that closes a stream.
///
/// * `"stopped"` and `"cancelled"` both become `"stopped"`.
/// * `"error"` stays `"error"`.
/// * Every other value (including the empty string) becomes `"done"`.
///
/// Matching is exact and case-sensitive.
fn terminal_event_for_status(status: &str) -> &'static str {
    if matches!(status, "stopped" | "cancelled") {
        return "stopped";
    }
    if status == "error" {
        "error"
    } else {
        "done"
    }
}
async fn send_session_snapshot(
tx: &tokio::sync::mpsc::Sender<String>,
session: &ai_subagent_session::Model,
include_output: bool,
) -> bool {
if include_output && !session.output.is_empty() {
let payload = serde_json::json!({
"event": "token",
"data": {
"content": session.output,
"children_id": session.children_id,
},
});
if tx.send(format!("data: {}\n\n", payload)).await.is_err() {
return false;
}
}
let event = terminal_event_for_status(&session.status);
let payload = serde_json::json!({
"event": event,
"data": {
"content": "",
"children_id": session.children_id,
"error": session.error_message,
},
});
tx.send(format!("data: {}\n\n", payload)).await.is_ok()
}
/// Create an SSE stream for watching a sub-agent's stream output from Redis PubSub.
///
/// A background task is spawned that pushes pre-formatted SSE frames into a
/// bounded channel (capacity 200); the returned stream yields those frames as
/// response bytes. The task works as follows:
///
/// 1. If a session row already exists for `conversation_id` / `children_id`,
///    its snapshot (stored output plus terminal event) is sent and the task ends.
/// 2. Otherwise it subscribes to
///    `chat.subagent.chunk.{conversation_id}.{children_id}`, emits an `:ok`
///    SSE comment, and forwards every message as
///    `{"event": <chunk_type>, "data": <payload>}`. The event name is the
///    payload's `chunk_type` string, or `"chunk"` when that is absent or the
///    payload is not JSON.
/// 3. Every 500 ms the database is re-checked; once a session row appears the
///    snapshot is sent (without the stored output if `token`/`thinking`
///    chunks were already streamed) and the task ends.
///
/// The task also ends on a `done` / `stopped` / `error` chunk, when the client
/// goes away, or 90 seconds after the subscription started.
///
/// Failures while setting up Redis are reported to the client as a single
/// `error` frame whose `data` is the error message string.
pub fn create_subagent_sse_stream_redis(
    service: AppService,
    conversation_id: Uuid,
    children_id: String,
) -> std::pin::Pin<
    Box<dyn futures::Stream<Item = Result<actix_web::web::Bytes, actix_web::Error>> + Send>,
> {
    /// How often the database is re-checked for a stored session row.
    const SESSION_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(500);
    /// Upper bound on how long a single watcher task stays subscribed.
    const MAX_WATCH_DURATION: std::time::Duration = std::time::Duration::from_secs(90);

    /// Builds an `error` SSE frame whose `data` is the JSON-encoded message.
    fn error_frame(message: &str) -> String {
        format!(
            "data: {{\"event\":\"error\",\"data\":{}}}\n\n",
            serde_json::to_string(message).unwrap_or_default()
        )
    }

    let (tx, rx) = tokio::sync::mpsc::channel::<String>(200);
    tokio::spawn(async move {
        // Fast path: a session row is already stored, so replay it and finish.
        if let Some(session) =
            find_subagent_session(&service, conversation_id, &children_id).await
        {
            let _ = send_session_snapshot(&tx, &session, true).await;
            return;
        }

        let redis_url = match service.config.redis_url() {
            Ok(url) => url,
            Err(e) => {
                let _ = tx.send(error_frame(&e.to_string())).await;
                return;
            }
        };
        let client = match redis::Client::open(redis_url) {
            Ok(client) => client,
            Err(e) => {
                let _ = tx.send(error_frame(&e.to_string())).await;
                return;
            }
        };
        let pubsub = match client.get_async_pubsub().await {
            Ok(pubsub) => pubsub,
            Err(e) => {
                let _ = tx.send(error_frame(&e.to_string())).await;
                return;
            }
        };
        // `sink` is kept in scope for the whole task alongside `stream`.
        let (mut sink, mut stream) = pubsub.split();
        let channel = format!("chat.subagent.chunk.{}.{}", conversation_id, children_id);
        if let Err(e) = sink.subscribe(channel.as_str()).await {
            let _ = tx.send(error_frame(&e.to_string())).await;
            return;
        }

        // SSE comment line: tells the client the subscription is live.
        let _ = tx.send(":ok\n\n".to_string()).await;

        let mut session_poll = tokio::time::interval(SESSION_POLL_INTERVAL);
        let stream_started = tokio::time::Instant::now();
        // Set once live content was forwarded, so the snapshot does not repeat it.
        let mut sent_content = false;

        loop {
            tokio::select! {
                // The HTTP response (receiver) was dropped: stop right away
                // instead of holding the subscription and polling the database
                // until the deadline.
                _ = tx.closed() => break,
                Some(msg) = stream.next() => {
                    let payload: String = match msg.get_payload() {
                        Ok(v) => v,
                        Err(e) => {
                            let _ = tx.send(error_frame(&e.to_string())).await;
                            break;
                        }
                    };
                    // Parse once: the parsed value supplies the event name and is
                    // re-serialised into the envelope. A payload that is not JSON
                    // is embedded as a JSON string, so the frame is always valid
                    // JSON on a single line.
                    let (event_type, data) =
                        match serde_json::from_str::<serde_json::Value>(&payload) {
                            Ok(parsed) => {
                                let event_type = parsed
                                    .get("chunk_type")
                                    .and_then(|v| v.as_str())
                                    .unwrap_or("chunk")
                                    .to_string();
                                (event_type, parsed)
                            }
                            Err(_) => ("chunk".to_string(), serde_json::Value::String(payload)),
                        };
                    if matches!(event_type.as_str(), "token" | "thinking") {
                        sent_content = true;
                    }
                    let is_terminal = matches!(event_type.as_str(), "done" | "stopped" | "error");
                    let frame = serde_json::json!({ "event": event_type, "data": data });
                    if tx.send(format!("data: {}\n\n", frame)).await.is_err() {
                        break;
                    }
                    if is_terminal {
                        break;
                    }
                }
                _ = session_poll.tick() => {
                    // A stored row means the sub-agent reached a final state;
                    // this also covers a terminal chunk published before the
                    // subscription was established.
                    if let Some(session) =
                        find_subagent_session(&service, conversation_id, &children_id).await
                    {
                        let _ = send_session_snapshot(&tx, &session, !sent_content).await;
                        break;
                    }
                    // NOTE(review): the deadline is measured from the start of the
                    // subscription, so it also ends streams that are still
                    // receiving chunks — confirm this hard cap is intended.
                    if stream_started.elapsed() > MAX_WATCH_DURATION {
                        let payload = serde_json::json!({
                            "event": "error",
                            "data": {
                                "content": "sub-agent stream timed out waiting for terminal state",
                                "children_id": children_id,
                            },
                        });
                        let _ = tx.send(format!("data: {}\n\n", payload)).await;
                        break;
                    }
                }
            }
        }
    });
    Box::pin(
        tokio_stream::wrappers::ReceiverStream::new(rx)
            .map(|s| Ok(actix_web::web::Bytes::from(s))),
    )
}
/// Create an SSE stream for watching a sub-agent's stream output via NATS fallback.
///
/// A background task subscribes to the NATS subject
/// `chat.subagent.chunk.{conversation_id}.{children_id}` and pushes
/// pre-formatted SSE frames into a bounded channel (capacity 200); the
/// returned stream yields those frames as response bytes.
///
/// Each message is forwarded as `{"event": <chunk_type>, "data": <payload>}`.
/// The event name is the payload's `chunk_type` string, or `"chunk"` when that
/// is absent or the payload is not JSON. Message bytes are decoded as UTF-8
/// lossily.
///
/// The task ends when a `done` / `stopped` / `error` chunk has been forwarded,
/// when the subscription closes, or when the client goes away.
///
/// If NATS is not configured or subscribing fails, a single `error` frame is
/// sent and the stream ends.
///
/// NOTE(review): this path never consults the database, so a watcher that
/// connects after the sub-agent already finished receives only the `:ok`
/// comment — confirm whether a snapshot replay is wanted here as well.
pub fn create_subagent_sse_stream_nats(
    service: AppService,
    conversation_id: Uuid,
    children_id: String,
) -> std::pin::Pin<
    Box<dyn futures::Stream<Item = Result<actix_web::web::Bytes, actix_web::Error>> + Send>,
> {
    /// Builds an `error` SSE frame whose `data` is the JSON-encoded message.
    fn error_frame(message: &str) -> String {
        format!(
            "data: {{\"event\":\"error\",\"data\":{}}}\n\n",
            serde_json::to_string(message).unwrap_or_default()
        )
    }

    let (tx, rx) = tokio::sync::mpsc::channel::<String>(200);
    tokio::spawn(async move {
        let nats = match &service.queue_producer.nats {
            Some(n) => n.clone(),
            None => {
                let _ = tx.send(error_frame("NATS not available")).await;
                return;
            }
        };
        let subject = format!("chat.subagent.chunk.{}.{}", conversation_id, children_id);
        let mut sub = match nats.subscribe(&subject).await {
            Ok(s) => s,
            Err(e) => {
                let _ = tx.send(error_frame(&e.to_string())).await;
                return;
            }
        };

        // SSE comment line: tells the client the subscription is live.
        let _ = tx.send(":ok\n\n".to_string()).await;

        loop {
            let msg = tokio::select! {
                // The HTTP response (receiver) was dropped. Without this branch
                // an idle subscription would never notice the disconnect and
                // the task would wait on `sub.next()` forever.
                _ = tx.closed() => break,
                next = sub.next() => match next {
                    Some(msg) => msg,
                    // Subscription closed on the NATS side.
                    None => break,
                },
            };

            let payload = String::from_utf8_lossy(&msg.payload);
            // Parse once: the parsed value supplies the event name and is
            // re-serialised into the envelope. A payload that is not JSON is
            // embedded as a JSON string, so the frame is always valid JSON on a
            // single line.
            let (event_type, data) = match serde_json::from_str::<serde_json::Value>(&payload) {
                Ok(parsed) => {
                    let event_type = parsed
                        .get("chunk_type")
                        .and_then(|v| v.as_str())
                        .unwrap_or("chunk")
                        .to_string();
                    (event_type, parsed)
                }
                Err(_) => (
                    "chunk".to_string(),
                    serde_json::Value::String(payload.into_owned()),
                ),
            };
            let is_terminal = matches!(event_type.as_str(), "done" | "stopped" | "error");
            let frame = serde_json::json!({ "event": event_type, "data": data });
            if tx.send(format!("data: {}\n\n", frame)).await.is_err() {
                break;
            }
            // Same terminal events as the Redis-backed stream: nothing more is
            // expected afterwards, so release the subscription.
            if is_terminal {
                break;
            }
        }
    });
    Box::pin(
        tokio_stream::wrappers::ReceiverStream::new(rx)
            .map(|s| Ok(actix_web::web::Bytes::from(s))),
    )
}
/// SSE endpoint for watching a sub-agent's stream output.
///
/// `GET /api/ai/subagent/{conversation_id}/{children_id}/stream`
#[utoipa::path(
    get,
    path = "/api/ai/subagent/{conversation_id}/{children_id}/stream",
    params(
        ("conversation_id" = Uuid, Path, description = "Conversation ID"),
        ("children_id" = String, Path, description = "Sub-agent children ID"),
    ),
    responses(
        (status = 200, description = "SSE stream of sub-agent events"),
        (status = 404, description = "Not found"),
    ),
    tag = "AI Chat"
)]
pub async fn subagent_stream_watch(
    service: web::Data<AppService>,
    session: session::Session,
    path: web::Path<(Uuid, String)>,
) -> Result<HttpResponse, ApiError> {
    // Callers without a logged-in user are rejected with `Unauthorized`.
    let user_id = session
        .user()
        .ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?;
    let (conversation_id, children_id) = path.into_inner();

    // Verify access to the conversation. Only the error path matters here;
    // the returned value itself is not used.
    let _conv = service
        .find_conversation_owned(conversation_id, user_id)
        .await?;

    // This handler always uses the Redis-backed stream. `children_id` is not
    // needed afterwards, so it is moved into the stream instead of cloned.
    let events = create_subagent_sse_stream_redis(
        service.get_ref().clone(),
        conversation_id,
        children_id,
    );

    // `Cache-Control: no-cache` and `X-Accel-Buffering: no` are sent so
    // intermediaries are asked not to cache or buffer the event stream.
    Ok(HttpResponse::Ok()
        .content_type("text/event-stream")
        .insert_header(("Cache-Control", "no-cache"))
        .insert_header(("X-Accel-Buffering", "no"))
        .streaming(events))
}
/// NATS fallback retained for deployments where Redis PubSub is unavailable.
pub async fn subagent_stream_watch_nats(
service: web::Data<AppService>,
session: session::Session,
path: web::Path<(Uuid, String)>,
) -> Result<HttpResponse, ApiError> {
let user_id = session
.user()
.ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?;
let (conversation_id, children_id) = path.into_inner();
let _conv = service
.find_conversation_owned(conversation_id, user_id)
.await?;
let response = HttpResponse::Ok()
.content_type("text/event-stream")
.insert_header(("Cache-Control", "no-cache"))
.insert_header(("X-Accel-Buffering", "no"))
.streaming(create_subagent_sse_stream_nats(
service.get_ref().clone(),
conversation_id,
children_id,
));
Ok(response.into())
}
#[utoipa::path(
post,
path = "/api/ai/subagent/{conversation_id}/{children_id}/stop",
params(
("conversation_id" = Uuid, Path, description = "Conversation ID"),
("children_id" = String, Path, description = "Sub-agent children ID"),
),
responses(
(status = 200, description = "Sub-agent stop requested"),
(status = 404, description = "Not found"),
),
tag = "AI Chat"
)]
pub async fn subagent_stop(
service: web::Data<AppService>,
session: session::Session,
path: web::Path<(Uuid, String)>,
) -> Result<HttpResponse, ApiError> {
let user_id = session
.user()
.ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?;
let (conversation_id, children_id) = path.into_inner();
let _conv = service
.find_conversation_full_access(conversation_id, user_id)
.await?;
service
.cache
.set_sub_agent_cancelled(conversation_id, &children_id)
.await;
Ok(crate::api_success())
}