- Add seq: u64 to RoomMessageStreamChunkEvent - Frontend sorts by seq on insert for ordered replay - Initial event now includes seq: 0
282 lines
12 KiB
Rust
282 lines
12 KiB
Rust
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
|
|
use chrono::Utc;
|
|
use db::cache::AppCache;
|
|
use db::database::AppDatabase;
|
|
use models::rooms::room_ai;
|
|
use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope};
|
|
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter};
|
|
use uuid::Uuid;
|
|
|
|
use super::sequence::next_room_message_seq_internal;
|
|
use crate::connection::RoomConnectionManager;
|
|
use agent::chat::{AiRequest, ChatService};
|
|
|
|
pub async fn process_message_ai_streaming(
|
|
chat_service: Arc<ChatService>,
|
|
request: AiRequest,
|
|
room_id: Uuid,
|
|
project_id: Uuid,
|
|
model_id: Uuid,
|
|
lock_guard: crate::room_ai_queue::RoomAiLockGuard,
|
|
db: AppDatabase,
|
|
cache: AppCache,
|
|
queue: MessageProducer,
|
|
room_manager: Arc<RoomConnectionManager>,
|
|
) {
|
|
use queue::RoomMessageStreamChunkEvent;
|
|
|
|
let streaming_msg_id = Uuid::now_v7();
|
|
let seq = match next_room_message_seq_internal(room_id, &db, &cache).await {
|
|
Ok(s) => s,
|
|
Err(e) => {
|
|
tracing::error!(error = %e, "Failed to get seq for streaming AI message");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let _ = room_manager
|
|
.register_stream_channel(streaming_msg_id)
|
|
.await;
|
|
|
|
let initial_event = RoomMessageStreamChunkEvent {
|
|
message_id: streaming_msg_id,
|
|
room_id,
|
|
seq: 0,
|
|
content: String::new(),
|
|
done: false,
|
|
error: None,
|
|
display_name: Some(request.model.name.clone()),
|
|
chunk_type: Some("thinking".to_string()),
|
|
};
|
|
room_manager.broadcast_stream_chunk(initial_event).await;
|
|
|
|
let room_id_inner = room_id;
|
|
let project_id_inner = project_id;
|
|
let now = Utc::now();
|
|
let sender_type = "ai".to_string();
|
|
let ai_display_name = request.model.name.clone();
|
|
|
|
tokio::spawn(async move {
|
|
let _lock_guard = lock_guard;
|
|
let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001")
|
|
.expect("constant UUID should always parse");
|
|
let ai_display_name_for_chunk = ai_display_name.clone();
|
|
let ai_display_name_for_final = ai_display_name.clone();
|
|
|
|
let chunk_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
|
let room_manager_cb = room_manager.clone();
|
|
|
|
let on_chunk = move |chunk: agent::chat::AiStreamChunk| {
|
|
Box::pin({
|
|
let room_manager = room_manager_cb.clone();
|
|
let streaming_msg_id = streaming_msg_id;
|
|
let room_id = room_id_inner;
|
|
let chunk_count = chunk_count.clone();
|
|
let ai_display_name_for_chunk = ai_display_name_for_chunk.clone();
|
|
async move {
|
|
let chunk_type_str = match chunk.chunk_type {
|
|
agent::chat::AiChunkType::Thinking => "thinking",
|
|
agent::chat::AiChunkType::Answer => "answer",
|
|
agent::chat::AiChunkType::ToolCall => "tool_call",
|
|
agent::chat::AiChunkType::ToolResult => "tool_result",
|
|
};
|
|
let seq = chunk_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
let event = RoomMessageStreamChunkEvent {
|
|
message_id: streaming_msg_id,
|
|
room_id,
|
|
seq,
|
|
content: chunk.content,
|
|
done: chunk.done,
|
|
error: None,
|
|
display_name: Some(ai_display_name_for_chunk),
|
|
chunk_type: Some(chunk_type_str.to_string()),
|
|
};
|
|
room_manager.broadcast_stream_chunk(event).await;
|
|
}
|
|
}) as Pin<Box<dyn std::future::Future<Output = ()> + Send>>
|
|
};
|
|
|
|
let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk);
|
|
|
|
let typing_start = queue::TypingEvent {
|
|
room_id: room_id_inner,
|
|
user_id: ai_typing_id,
|
|
username: ai_display_name.clone(),
|
|
avatar_url: None,
|
|
action: "start".to_string(),
|
|
sender_type: Some("ai".to_string()),
|
|
};
|
|
room_manager.broadcast_typing(room_id_inner, typing_start.clone()).await;
|
|
|
|
let (typing_cancel_tx, typing_cancel_rx) = tokio::sync::oneshot::channel::<()>();
|
|
let typing_renew_handle = tokio::spawn({
|
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
|
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
|
let mgr = room_manager.clone();
|
|
let rid = room_id_inner;
|
|
let evt = typing_start.clone();
|
|
async move {
|
|
tokio::select! {
|
|
_ = typing_cancel_rx => {}
|
|
_ = async {
|
|
loop {
|
|
interval.tick().await;
|
|
mgr.broadcast_typing(rid, evt.clone()).await;
|
|
}
|
|
} => {}
|
|
}
|
|
}
|
|
});
|
|
|
|
match chat_service.process_stream(request, stream_callback).await {
|
|
Ok(result) => {
|
|
// Store ordered chunks as JSON in thinking_content for ordered replay.
|
|
// Uses {"__chunks__": [...]} marker so legacy plain-text still works.
|
|
let thinking_content = if result.chunks.is_empty() {
|
|
None
|
|
} else {
|
|
let chunks_json = serde_json::json!({
|
|
"__chunks__": result.chunks.iter().map(|c| {
|
|
let type_str = match c.chunk_type {
|
|
agent::client::StreamChunkType::Thinking => "thinking",
|
|
agent::client::StreamChunkType::Answer => "answer",
|
|
agent::client::StreamChunkType::ToolCall => "tool_call",
|
|
};
|
|
serde_json::json!({
|
|
"type": type_str,
|
|
"content": c.content,
|
|
})
|
|
}).collect::<Vec<_>>(),
|
|
});
|
|
Some(chunks_json.to_string())
|
|
};
|
|
let envelope = RoomMessageEnvelope {
|
|
id: streaming_msg_id,
|
|
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
|
|
room_id: room_id_inner,
|
|
sender_type: sender_type.clone(),
|
|
sender_id: None,
|
|
model_id: Some(model_id),
|
|
thread_id: None,
|
|
content: result.content.clone(),
|
|
content_type: "text".to_string(),
|
|
thinking_content: thinking_content.clone(),
|
|
send_at: now,
|
|
seq,
|
|
in_reply_to: None,
|
|
display_name: Some(ai_display_name_for_final.clone()),
|
|
};
|
|
|
|
if let Err(e) = queue.publish(room_id_inner, envelope).await {
|
|
tracing::error!(error = %e, "Failed to publish streaming AI message");
|
|
} else {
|
|
let now = Utc::now();
|
|
if let Err(e) = room_ai::Entity::update_many()
|
|
.col_expr(
|
|
room_ai::Column::CallCount,
|
|
Expr::col(room_ai::Column::CallCount).add(1),
|
|
)
|
|
.col_expr(
|
|
room_ai::Column::LastCallAt,
|
|
Expr::value(Some(now)),
|
|
)
|
|
.filter(room_ai::Column::Room.eq(room_id_inner))
|
|
.filter(room_ai::Column::Model.eq(model_id))
|
|
.exec(&db)
|
|
.await
|
|
{
|
|
tracing::warn!(error = %e, "Failed to update room_ai call stats");
|
|
}
|
|
|
|
// Record billing (non-fatal)
|
|
if let Err(e) = super::billing::record_ai_usage(
|
|
&db,
|
|
project_id_inner,
|
|
model_id,
|
|
result.input_tokens,
|
|
result.output_tokens,
|
|
)
|
|
.await
|
|
{
|
|
tracing::warn!(error = %e, "AI billing recording failed");
|
|
}
|
|
|
|
let msg_event = queue::RoomMessageEvent {
|
|
id: streaming_msg_id,
|
|
room_id: room_id_inner,
|
|
sender_type: sender_type.clone(),
|
|
sender_id: None,
|
|
thread_id: None,
|
|
content: result.content.clone(),
|
|
content_type: "text".to_string(),
|
|
thinking_content: thinking_content.clone(),
|
|
send_at: now,
|
|
seq,
|
|
display_name: Some(ai_display_name_for_final.clone()),
|
|
in_reply_to: None,
|
|
reactions: None,
|
|
message_id: None,
|
|
};
|
|
room_manager.broadcast(room_id_inner, msg_event).await;
|
|
room_manager.metrics.messages_sent.increment(1);
|
|
|
|
let _ = typing_cancel_tx.send(());
|
|
typing_renew_handle.abort();
|
|
let typing_stop = queue::TypingEvent {
|
|
room_id: room_id_inner,
|
|
user_id: ai_typing_id,
|
|
username: ai_display_name_for_final.clone(),
|
|
avatar_url: None,
|
|
action: "stop".to_string(),
|
|
sender_type: Some("ai".to_string()),
|
|
};
|
|
room_manager.broadcast_typing(room_id_inner, typing_stop).await;
|
|
|
|
let event = ProjectRoomEvent {
|
|
event_type: crate::RoomEventType::NewMessage.as_str().into(),
|
|
project_id: project_id_inner,
|
|
room_id: Some(room_id_inner),
|
|
category_id: None,
|
|
message_id: Some(streaming_msg_id),
|
|
seq: Some(seq),
|
|
timestamp: now,
|
|
};
|
|
queue
|
|
.publish_project_room_event(project_id_inner, event)
|
|
.await;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(error = %e, "AI streaming failed");
|
|
let _ = typing_cancel_tx.send(());
|
|
typing_renew_handle.abort();
|
|
let typing_stop = queue::TypingEvent {
|
|
room_id: room_id_inner,
|
|
user_id: ai_typing_id,
|
|
username: ai_display_name.clone(),
|
|
avatar_url: None,
|
|
action: "stop".to_string(),
|
|
sender_type: Some("ai".to_string()),
|
|
};
|
|
room_manager.broadcast_typing(room_id_inner, typing_stop).await;
|
|
|
|
let event = RoomMessageStreamChunkEvent {
|
|
message_id: streaming_msg_id,
|
|
room_id: room_id_inner,
|
|
seq: 0,
|
|
content: String::new(),
|
|
done: true,
|
|
error: Some(e.to_string()),
|
|
display_name: Some(ai_display_name.clone()),
|
|
chunk_type: None,
|
|
};
|
|
room_manager.broadcast_stream_chunk(event).await;
|
|
}
|
|
}
|
|
|
|
room_manager.close_stream_channel(streaming_msg_id).await;
|
|
});
|
|
}
|