use std::pin::Pin; use std::sync::Arc; use queue::{MessageProducer, RoomMessageStreamChunkEvent}; use uuid::Uuid; use crate::connection::RoomConnectionManager; #[allow(dead_code)] pub(crate) fn lock_or_recover(mutex: &std::sync::Mutex) -> std::sync::MutexGuard<'_, T> { mutex .lock() .unwrap_or_else(|poisoned| poisoned.into_inner()) } pub(crate) struct ModeStreamingState { pub(crate) all_chunks: Arc>>, pub(crate) answer_buffer: Arc>, pub(crate) chunk_seq: Arc, pub(crate) on_chunk: Arc< dyn Fn(String, String, bool) -> Pin + Send>> + Send + Sync, >, } #[allow(dead_code)] pub(crate) fn build_mode_chunk_callback( streaming_msg_id: Uuid, room_id: Uuid, ai_display_name: &str, cancel: Arc, room_manager: Arc, queue: MessageProducer, ) -> ModeStreamingState { let all_chunks: Arc>> = Arc::new(std::sync::Mutex::new(Vec::new())); let answer_buffer: Arc> = Arc::new(std::sync::Mutex::new(String::new())); let chunk_seq = Arc::new(std::sync::atomic::AtomicU64::new(1)); let on_chunk = { let room_manager = room_manager.clone(); let queue = queue.clone(); let cancel = cancel.clone(); let all_chunks = all_chunks.clone(); let answer_buffer = answer_buffer.clone(); let chunk_seq = chunk_seq.clone(); let ai_display_name = ai_display_name.to_string(); Arc::new( move |chunk_type: String, content: String, is_answer: bool| { let room_manager = room_manager.clone(); let queue = queue.clone(); let cancel = cancel.clone(); let all_chunks = all_chunks.clone(); let answer_buffer = answer_buffer.clone(); let chunk_seq = chunk_seq.clone(); let ai_display_name = ai_display_name.clone(); { let mut chunks = lock_or_recover(&all_chunks); chunks.push((chunk_type.clone(), content.clone())); } if is_answer { let mut ab = lock_or_recover(&answer_buffer); ab.push_str(&content); } let current_seq = chunk_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, seq: current_seq, content: content.clone(), done: false, error: None, display_name: Some(ai_display_name.clone()), chunk_type: Some(chunk_type.clone()), }; Box::pin(async move { if cancel.load(std::sync::atomic::Ordering::Acquire) { return; } queue.publish_stream_chunk(&event).await; room_manager.broadcast_stream_chunk(event).await; }) as Pin + Send>> }, ) }; ModeStreamingState { all_chunks, answer_buffer, chunk_seq, on_chunk, } }