gitdataai/libs/queue/producer.rs

224 lines
8.7 KiB
Rust

//! Publishes room messages via NATS JetStream (persistence) + Core NATS (broadcast).
//!
//! Architecture:
//! - JetStream publish: durable, acknowledged, for message persistence + consumer-based delivery
//! - Core NATS publish: fire-and-forget, for real-time broadcast fan-out across nodes
use crate::nats_client::NatsClient;
use crate::types::{
AgentTaskEvent, ChatMessageEvent, ChatStreamChunkEvent, EmailEnvelope, ProjectRoomEvent,
ReactionGroup, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent,
};
use std::sync::Arc;
pub type NatsPublishResult = u64;
/// Publishes room messages via NATS.
#[derive(Clone)]
pub struct MessageProducer {
/// JetStream publish function for durable/persisted messages.
pub jetstream_publish: Arc<
dyn Fn(String, Vec<u8>) -> std::pin::Pin<
Box<dyn std::future::Future<Output = anyhow::Result<u64>> + Send>,
> + Send
+ Sync,
>,
/// Core NATS publish function for real-time broadcast (fire-and-forget).
pub core_publish: Arc<
dyn Fn(String, Vec<u8>) -> std::pin::Pin<
Box<dyn std::future::Future<Output = ()> + Send>,
> + Send
+ Sync,
>,
/// Redis connection getter — kept for cache/seq access (notification count, etc.)
pub get_redis:
Arc<dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<deadpool_redis::cluster::Connection>> + Send + Sync>,
/// Direct NATS client reference for subscriptions (watch endpoints, etc.)
pub nats: Option<Arc<NatsClient>>,
}
impl MessageProducer {
pub fn new(
nats: Option<Arc<NatsClient>>,
get_redis: Arc<
dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<deadpool_redis::cluster::Connection>>
+ Send
+ Sync,
>,
) -> Self {
let js_fn: Arc<
dyn Fn(String, Vec<u8>) -> std::pin::Pin<
Box<dyn std::future::Future<Output = anyhow::Result<u64>> + Send>,
> + Send
+ Sync,
> = if let Some(ref n) = nats {
let n = n.clone();
Arc::new(move |subject: String, payload: Vec<u8>| {
let n = n.clone();
Box::pin(async move { n.jetstream_publish(subject, payload).await }) as _
})
} else {
Arc::new(|_subject: String, _payload: Vec<u8>| {
Box::pin(async move { Ok::<u64, anyhow::Error>(0) }) as _
})
};
let core_fn: Arc<
dyn Fn(String, Vec<u8>) -> std::pin::Pin<
Box<dyn std::future::Future<Output = ()> + Send>,
> + Send
+ Sync,
> = if let Some(ref n) = nats {
let n = n.clone();
Arc::new(move |subject: String, payload: Vec<u8>| {
let n = n.clone();
Box::pin(async move { n.core_publish(subject, payload).await }) as _
})
} else {
Arc::new(|_subject: String, _payload: Vec<u8>| {
Box::pin(async move {}) as _
})
};
Self {
jetstream_publish: js_fn,
core_publish: core_fn,
get_redis,
nats,
}
}
/// Publish a room message — broadcast via JetStream for reliable cross-node delivery.
/// Persistence (DB INSERT) must happen in the caller BEFORE calling this method.
pub async fn publish(
&self,
room_id: uuid::Uuid,
envelope: RoomMessageEnvelope,
) -> anyhow::Result<()> {
let subject = format!("room.message.{}", room_id);
let event = RoomMessageEvent::from(envelope);
let payload = serde_json::to_vec(&event)?;
let seq = (self.jetstream_publish)(subject, payload).await?;
tracing::info!(room_id = %room_id, seq = seq, "message broadcast via JetStream");
Ok(())
}
/// Publish a stream chunk event via JetStream for reliable cross-node real-time delivery.
/// Chunks are transient — NOT persisted to DB.
pub async fn publish_stream_chunk(&self, event: &RoomMessageStreamChunkEvent) {
let subject = format!("room.chunk.{}", event.room_id);
let payload = match serde_json::to_vec(event) {
Ok(p) => p,
Err(e) => {
tracing::error!(error = %e, "serialise stream chunk failed");
return;
}
};
if let Err(e) = (self.jetstream_publish)(subject, payload).await {
tracing::warn!(error = %e, room_id = %event.room_id, "JetStream chunk publish failed");
}
}
/// Publish a project-level room event via Core NATS (no JetStream persistence).
pub async fn publish_project_room_event(
&self,
project_id: uuid::Uuid,
event: ProjectRoomEvent,
) {
let subject = format!("project.event.{}", project_id);
let payload = match serde_json::to_vec(&event) {
Ok(p) => p,
Err(e) => {
tracing::error!(error = %e, "serialise ProjectRoomEvent failed");
return;
}
};
(self.core_publish)(subject, payload).await;
}
/// Publish an agent task event via Core NATS (no JetStream persistence).
pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: AgentTaskEvent) {
let subject = format!("task.event.{}", project_id);
let payload = match serde_json::to_vec(&event) {
Ok(p) => p,
Err(e) => {
tracing::error!(error = %e, "serialise AgentTaskEvent failed");
return;
}
};
(self.core_publish)(subject, payload).await;
}
/// Broadcast a reaction-update event via Core NATS.
pub async fn publish_reaction_event(
&self,
room_id: uuid::Uuid,
message_id: uuid::Uuid,
reactions: Vec<ReactionGroup>,
) {
let event = RoomMessageEvent {
id: uuid::Uuid::now_v7(),
room_id,
sender_type: String::new(),
sender_id: None,
thread_id: None,
in_reply_to: None,
content: String::new(),
content_type: String::new(),
thinking_content: None,
send_at: chrono::Utc::now(),
seq: 0,
display_name: None,
reactions: Some(reactions),
message_id: Some(message_id),
};
let payload = match serde_json::to_vec(&event) {
Ok(p) => p,
Err(e) => {
tracing::error!(error = %e, "serialise reaction event failed");
return;
}
};
(self.core_publish)(format!("room.broadcast.{}", room_id), payload).await;
}
/// Publish an email message via JetStream for async processing.
pub async fn publish_email(&self, envelope: EmailEnvelope) -> anyhow::Result<String> {
let subject = "email.queue".to_string();
let payload = serde_json::to_string(&envelope)?.into_bytes();
let seq = (self.jetstream_publish)(subject, payload).await?;
let msg_id = format!("nats:{}", seq);
tracing::info!(to = %envelope.to, msg_id = %msg_id, "email queued to NATS");
metrics::counter!("email_queued_total").increment(1);
Ok(msg_id)
}
// ── Chat message publishing ──────────────────────────────────────────────
/// Publish a chat message via JetStream for persistence + multi-viewer delivery.
pub async fn publish_chat_message(&self, event: &ChatMessageEvent) -> anyhow::Result<u64> {
let subject = format!("chat.message.{}", event.conversation_id);
let payload = serde_json::to_vec(event)?;
let seq = (self.jetstream_publish)(subject.clone(), payload.clone()).await?;
(self.core_publish)(subject, payload).await;
tracing::info!(conversation_id = %event.conversation_id, seq = seq, "chat message broadcast via JetStream");
Ok(seq)
}
/// Publish a chat stream chunk via JetStream for real-time multi-viewer streaming.
pub async fn publish_chat_chunk(&self, event: &ChatStreamChunkEvent) {
let subject = format!("chat.chunk.{}", event.conversation_id);
let payload = match serde_json::to_vec(event) {
Ok(p) => p,
Err(e) => {
tracing::error!(error = %e, "serialise chat chunk failed");
return;
}
};
(self.core_publish)(subject.clone(), payload.clone()).await;
if let Err(e) = (self.jetstream_publish)(subject, payload).await {
tracing::warn!(error = %e, conversation_id = %event.conversation_id, "JetStream chat chunk publish failed");
}
}
}