use std::pin::Pin; use std::sync::Arc; use chrono::Utc; use db::cache::AppCache; use db::database::AppDatabase; use queue::MessageProducer; use uuid::Uuid; use super::ai_mode_streaming_post::finalize_mode_stream; use super::ai_mode_streaming_steps::build_mode_chunk_callback; use super::sequence::next_room_message_seq_internal; use crate::connection::RoomConnectionManager; use agent::chat::{AiRequest, ChatService}; #[allow(dead_code)] pub type RunModeFn = Box< dyn FnOnce( Arc, AiRequest, Arc< dyn Fn(String, String, bool) -> Pin + Send>> + Send + Sync, >, ) -> Pin< Box> + Send>, > + Send, >; #[allow(dead_code)] pub async fn run_mode_streaming( chat_service: Arc, request: AiRequest, room_id: Uuid, project_id: Uuid, model_id: Uuid, lock_guard: crate::room_ai_queue::RoomAiLockGuard, db: AppDatabase, cache: AppCache, queue: MessageProducer, room_manager: Arc, mode_name_str: &str, run: RunModeFn, ) { let mode_name = mode_name_str.to_string(); let streaming_msg_id = Uuid::now_v7(); let seq = match next_room_message_seq_internal(room_id, &db, &cache).await { Ok(s) => s, Err(e) => { tracing::error!(error = %e, "Failed to get seq for {} streaming", mode_name); return; } }; let ai_display_name = request.model.name.clone(); let _ = room_manager .register_stream_channel(streaming_msg_id, room_id, Some(ai_display_name.clone())) .await; use queue::RoomMessageStreamChunkEvent; let initial_event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, seq: 0, content: String::new(), done: false, error: None, display_name: Some(ai_display_name.clone()), chunk_type: Some("thinking".to_string()), }; room_manager.broadcast_stream_chunk(initial_event).await; let now = Utc::now(); tokio::spawn(async move { let _lock_guard = lock_guard; let cancel = room_manager.register_stream_cancel(room_id).await; let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001") .expect("constant UUID should always parse"); let typing_start = queue::TypingEvent { room_id, user_id: ai_typing_id, username: ai_display_name.clone(), avatar_url: None, action: "start".to_string(), sender_type: Some("ai".to_string()), }; room_manager .broadcast_typing(room_id, typing_start.clone()) .await; let (typing_cancel_tx, typing_cancel_rx) = tokio::sync::oneshot::channel::<()>(); let typing_renew_handle = tokio::spawn({ let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let mgr = room_manager.clone(); let evt = typing_start.clone(); async move { tokio::select! { _ = typing_cancel_rx => {} _ = async { loop { interval.tick().await; mgr.broadcast_typing(room_id, evt.clone()).await; } } => {} } } }); let state = build_mode_chunk_callback( streaming_msg_id, room_id, &ai_display_name, cancel, room_manager.clone(), queue.clone(), ); let on_chunk = state.on_chunk.clone(); let result = run(chat_service, request, on_chunk).await; finalize_mode_stream( result, state, streaming_msg_id, room_id, project_id, model_id, seq, now, &ai_display_name, &mode_name, &db, &queue, &room_manager, ) .await; let _ = typing_cancel_tx.send(()); typing_renew_handle.abort(); let typing_stop = queue::TypingEvent { room_id, user_id: ai_typing_id, username: ai_display_name.clone(), avatar_url: None, action: "stop".to_string(), sender_type: Some("ai".to_string()), }; room_manager.broadcast_typing(room_id, typing_stop).await; room_manager.unregister_stream_cancel(room_id).await; room_manager.close_stream_channel(streaming_msg_id).await; }); }