//! Publishes room messages via NATS JetStream (persistence) + Core NATS (broadcast). //! //! Architecture: //! - JetStream publish: durable, acknowledged, for message persistence + consumer-based delivery //! - Core NATS publish: fire-and-forget, for real-time broadcast fan-out across nodes use crate::nats_client::NatsClient; use crate::types::{ AgentTaskEvent, ChatMessageEvent, ChatStreamChunkEvent, EmailEnvelope, ProjectRoomEvent, ReactionGroup, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent, }; use std::sync::Arc; pub type NatsPublishResult = u64; /// Publishes room messages via NATS. #[derive(Clone)] pub struct MessageProducer { /// JetStream publish function for durable/persisted messages. pub jetstream_publish: Arc< dyn Fn( String, Vec, ) -> std::pin::Pin> + Send>> + Send + Sync, >, /// Core NATS publish function for real-time broadcast (fire-and-forget). pub core_publish: Arc< dyn Fn(String, Vec) -> std::pin::Pin + Send>> + Send + Sync, >, /// Redis connection getter — kept for cache/seq access (notification count, etc.) pub get_redis: Arc< dyn Fn() -> tokio::task::JoinHandle> + Send + Sync, >, /// Direct NATS client reference for subscriptions (watch endpoints, etc.) pub nats: Option>, } impl MessageProducer { pub fn new( nats: Option>, get_redis: Arc< dyn Fn() -> tokio::task::JoinHandle> + Send + Sync, >, ) -> Self { let js_fn: Arc< dyn Fn( String, Vec, ) -> std::pin::Pin< Box> + Send>, > + Send + Sync, > = if let Some(ref n) = nats { let n = n.clone(); Arc::new(move |subject: String, payload: Vec| { let n = n.clone(); Box::pin(async move { n.jetstream_publish(subject, payload).await }) as _ }) } else { Arc::new(|_subject: String, _payload: Vec| { Box::pin(async move { Ok::(0) }) as _ }) }; let core_fn: Arc< dyn Fn( String, Vec, ) -> std::pin::Pin + Send>> + Send + Sync, > = if let Some(ref n) = nats { let n = n.clone(); Arc::new(move |subject: String, payload: Vec| { let n = n.clone(); Box::pin(async move { n.core_publish(subject, payload).await }) as _ }) } else { Arc::new(|_subject: String, _payload: Vec| Box::pin(async move {}) as _) }; Self { jetstream_publish: js_fn, core_publish: core_fn, get_redis, nats, } } /// Publish a room message — broadcast via JetStream for reliable cross-node delivery. /// Persistence (DB INSERT) must happen in the caller BEFORE calling this method. pub async fn publish( &self, room_id: uuid::Uuid, envelope: RoomMessageEnvelope, ) -> anyhow::Result<()> { let subject = format!("room.message.{}", room_id); let event = RoomMessageEvent::from(envelope); let payload = serde_json::to_vec(&event)?; let seq = (self.jetstream_publish)(subject, payload).await?; tracing::info!(room_id = %room_id, seq = seq, "message broadcast via JetStream"); Ok(()) } /// Publish a stream chunk event via JetStream for reliable cross-node real-time delivery. /// Chunks are transient — NOT persisted to DB. pub async fn publish_stream_chunk(&self, event: &RoomMessageStreamChunkEvent) { let subject = format!("room.chunk.{}", event.room_id); let payload = match serde_json::to_vec(event) { Ok(p) => p, Err(e) => { tracing::error!(error = %e, "serialise stream chunk failed"); return; } }; if let Err(e) = (self.jetstream_publish)(subject, payload).await { tracing::warn!(error = %e, room_id = %event.room_id, "JetStream chunk publish failed"); } } /// Publish a project-level room event via Core NATS (no JetStream persistence). pub async fn publish_project_room_event( &self, project_id: uuid::Uuid, event: ProjectRoomEvent, ) { let subject = format!("project.event.{}", project_id); let payload = match serde_json::to_vec(&event) { Ok(p) => p, Err(e) => { tracing::error!(error = %e, "serialise ProjectRoomEvent failed"); return; } }; (self.core_publish)(subject, payload).await; } /// Publish an agent task event via Core NATS (no JetStream persistence). pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: AgentTaskEvent) { let subject = format!("task.event.{}", project_id); let payload = match serde_json::to_vec(&event) { Ok(p) => p, Err(e) => { tracing::error!(error = %e, "serialise AgentTaskEvent failed"); return; } }; (self.core_publish)(subject, payload).await; } /// Broadcast a reaction-update event via Core NATS. pub async fn publish_reaction_event( &self, room_id: uuid::Uuid, message_id: uuid::Uuid, reactions: Vec, ) { let event = RoomMessageEvent { id: uuid::Uuid::now_v7(), room_id, sender_type: String::new(), sender_id: None, thread_id: None, in_reply_to: None, content: String::new(), content_type: String::new(), thinking_content: None, send_at: chrono::Utc::now(), seq: 0, display_name: None, reactions: Some(reactions), message_id: Some(message_id), }; let payload = match serde_json::to_vec(&event) { Ok(p) => p, Err(e) => { tracing::error!(error = %e, "serialise reaction event failed"); return; } }; (self.core_publish)(format!("room.broadcast.{}", room_id), payload).await; } /// Publish an email message via JetStream for async processing. pub async fn publish_email(&self, envelope: EmailEnvelope) -> anyhow::Result { let subject = "email.queue".to_string(); let payload = serde_json::to_string(&envelope)?.into_bytes(); let seq = (self.jetstream_publish)(subject, payload).await?; let msg_id = format!("nats:{}", seq); tracing::info!(to = %envelope.to, msg_id = %msg_id, "email queued to NATS"); metrics::counter!("email_queued_total").increment(1); Ok(msg_id) } // ── Chat message publishing ────────────────────────────────────────────── /// Publish a chat message via JetStream for persistence + multi-viewer delivery. pub async fn publish_chat_message(&self, event: &ChatMessageEvent) -> anyhow::Result { let subject = format!("chat.message.{}", event.conversation_id); let payload = serde_json::to_vec(event)?; let seq = (self.jetstream_publish)(subject.clone(), payload.clone()).await?; (self.core_publish)(subject, payload).await; tracing::info!(conversation_id = %event.conversation_id, seq = seq, "chat message broadcast via JetStream"); Ok(seq) } /// Publish a chat stream chunk via JetStream for real-time multi-viewer streaming. pub async fn publish_chat_chunk(&self, event: &ChatStreamChunkEvent) { let subject = format!("chat.chunk.{}", event.conversation_id); let payload = match serde_json::to_vec(event) { Ok(p) => p, Err(e) => { tracing::error!(error = %e, "serialise chat chunk failed"); return; } }; (self.core_publish)(subject.clone(), payload.clone()).await; if let Err(e) = (self.jetstream_publish)(subject, payload).await { tracing::warn!(error = %e, conversation_id = %event.conversation_id, "JetStream chat chunk publish failed"); } } }