//! NATS client — single connection shared across MessageProducer, workers, and pubsub. //! //! Uses Core NATS for real-time broadcast (fan-out) and JetStream for message //! persistence (durable consumers). Replaces Redis Pub/Sub + Redis Streams. use std::sync::Arc; use std::time::Duration; use async_nats::jetstream; use config::AppConfig; pub struct NatsClient { pub client: async_nats::Client, pub jetstream: jetstream::Context, stream_name: String, connected: Arc, } impl NatsClient { pub async fn connect(config: &AppConfig) -> Option { let url = config.nats_url()?; let token = config.nats_token()?; let opts = async_nats::ConnectOptions::with_token(token) .retry_on_initial_connect() .connection_timeout(Duration::from_secs(10)) .reconnect_delay_callback(|attempts| { let ms = 100 * 2u64.saturating_pow(attempts as u32); Duration::from_millis(ms.min(30_000)) }) .event_callback(|event| async move { match event { async_nats::Event::Connected => { tracing::info!("NATS connected"); } async_nats::Event::Disconnected => { tracing::warn!("NATS disconnected, reconnecting..."); } async_nats::Event::ServerError(e) => { tracing::warn!(error = %e, "NATS server error"); } _ => {} } }); match opts.connect(&url).await { Ok(client) => { let connected = Arc::new(std::sync::atomic::AtomicBool::new(true)); let connected_clone = connected.clone(); // Track connection state for health checks let client_clone = client.clone(); tokio::spawn(async move { let mut attempts: u64 = 0; loop { tokio::time::sleep(Duration::from_secs(5)).await; let was = connected_clone.load(std::sync::atomic::Ordering::SeqCst); let is = client_clone.connection_state() == async_nats::connection::State::Connected; if was != is { connected_clone.store(is, std::sync::atomic::Ordering::SeqCst); if is { attempts = 0; } } if !is { attempts += 1; if attempts > 12 { tracing::error!("NATS disconnected for {}s", attempts * 5); } } } }); let jetstream = jetstream::new(client.clone()); let stream_name = config.nats_stream_name(); let nats = Self { client, jetstream, stream_name, connected, }; if let Err(e) = nats.ensure_stream(config).await { tracing::warn!(error = %e, "JetStream stream init failed"); } else { tracing::info!(stream = %nats.stream_name, "JetStream stream ready"); } tracing::info!(url = %url, "NATS connected"); Some(nats) } Err(e) => { tracing::warn!(error = %e, url = %url, "NATS connect failed — running without NATS"); None } } } /// Create or verify the JetStream stream for room message persistence. pub async fn ensure_stream(&self, config: &AppConfig) -> anyhow::Result<()> { let stream_config = jetstream::stream::Config { name: self.stream_name.clone(), subjects: vec![ "room.message.>".to_string(), "room.chunk.>".to_string(), "chat.message.>".to_string(), "chat.chunk.>".to_string(), "chat.subagent.chunk.>".to_string(), ], retention: jetstream::stream::RetentionPolicy::Interest, max_age: Duration::from_secs(config.nats_max_age_secs()), storage: jetstream::stream::StorageType::File, max_messages_per_subject: 100_000, ..Default::default() }; self.jetstream .get_or_create_stream(stream_config) .await .map_err(|e| anyhow::anyhow!("JetStream stream create failed: {}", e))?; Ok(()) } pub fn is_connected(&self) -> bool { self.connected.load(std::sync::atomic::Ordering::SeqCst) } pub fn stream_name(&self) -> &str { &self.stream_name } /// Publish to core NATS for real-time broadcast. Fire-and-forget. pub async fn core_publish(&self, subject: String, payload: Vec) { if let Err(e) = self.client.publish(subject.clone(), payload.into()).await { tracing::warn!(subject = %subject, error = %e, "NATS core publish failed"); } } /// Publish to JetStream and await the publish acknowledgement. /// Returns the stream sequence number, or 0 on failure. pub async fn jetstream_publish( &self, subject: String, payload: Vec, ) -> anyhow::Result { let ack_future = self .jetstream .publish(subject.clone(), payload.into()) .await .map_err(|e| anyhow::anyhow!("JetStream publish failed: {}", e))?; let ack = ack_future .await .map_err(|e| anyhow::anyhow!("JetStream publish ack failed: {}", e))?; Ok(ack.sequence) } /// Create a core NATS subscription. Returns a subscriber receiver. pub async fn subscribe(&self, subject: &str) -> anyhow::Result { self.client .subscribe(subject.to_string()) .await .map_err(|e| anyhow::anyhow!("NATS subscribe failed: {}", e)) } /// Create or get an ephemeral JetStream push consumer for broadcast delivery. /// Returns a message stream that will receive messages published to the subject. pub async fn consumer_messages( &self, durable: &str, filter_subject: &str, config: &AppConfig, ) -> anyhow::Result { let stream = self .jetstream .get_stream(&self.stream_name) .await .map_err(|e| anyhow::anyhow!("JetStream get stream failed: {}", e))?; let pull_config = async_nats::jetstream::consumer::pull::Config { durable_name: Some(durable.to_string()), filter_subject: filter_subject.to_string(), max_deliver: config.nats_max_deliver(), ack_wait: Duration::from_secs(config.nats_ack_wait_secs()), max_ack_pending: config.nats_buffer_size() as i64, ..Default::default() }; let consumer = stream .get_or_create_consumer(durable, pull_config) .await .map_err(|e| anyhow::anyhow!("JetStream consumer create failed: {}", e))?; consumer .messages() .await .map_err(|e| anyhow::anyhow!("JetStream consumer messages failed: {}", e)) } }