use std::{sync::Arc, time::Duration}; use async_nats::{HeaderMap, jetstream}; use config::AppConfig; use futures_util::StreamExt; use tracing::{error, info, warn}; use crate::{ handler::{AckAction, MessageHandler}, producer::{NatsProducer, connect_jetstream, ensure_stream}, }; pub struct NatsConsumer { stream: jetstream::stream::Stream, producer: NatsProducer, max_deliver: i64, retry_delay_secs: u64, durable_name: String, } impl NatsConsumer { pub async fn new( config: &AppConfig, group_id: &str, ) -> anyhow::Result { let jetstream = connect_jetstream(config).await?; let stream = ensure_stream(config, &jetstream).await?; let producer = NatsProducer::new(config).await?; Ok(Self { stream, producer, max_deliver: config.nats_max_deliver(), retry_delay_secs: config.nats_retry_delay_secs(), durable_name: durable_name(group_id), }) } pub async fn start_consuming( &self, topics: &[&str], handler: H, ) -> anyhow::Result<()> where H: MessageHandler + 'static, { let topics_owned: Vec = topics.iter().map(|topic| topic.to_string()).collect(); let consumer = self .stream .get_or_create_consumer( &self.durable_name, jetstream::consumer::pull::Config { durable_name: Some(self.durable_name.clone()), ack_wait: Duration::from_secs(self.retry_delay_secs), max_deliver: self.max_deliver, filter_subjects: topics_owned.clone(), ..Default::default() }, ) .await?; info!("NATS consumer started subscribing to: {:?}", topics_owned); let producer = self.producer.clone(); let max_deliver = self.max_deliver; let retry_delay_secs = self.retry_delay_secs; let handler = Arc::new(handler); tokio::spawn(async move { let messages = consumer.messages().await; let mut messages = match messages { Ok(messages) => messages, Err(error) => { error!( "NATS error while opening consumer stream: {:?}", error ); return; } }; while let Some(message_result) = messages.next().await { match message_result { Ok(message) => { handle_message( &producer, max_deliver, retry_delay_secs, handler.as_ref(), message, ) .await; } Err(error) => { error!("NATS error while consuming: {:?}", error); } } } }); Ok(()) } } async fn handle_message( producer: &NatsProducer, max_deliver: i64, retry_delay_secs: u64, handler: &H, message: jetstream::Message, ) where H: MessageHandler + ?Sized, { let subject = message.subject.to_string(); let payload = message.payload.clone(); let delivered = message.info().map(|info| info.delivered).unwrap_or(1); match handler.handle(&subject, &payload).await { AckAction::Ack => ack_message(&message, &subject, "message").await, AckAction::Nack => { if let Err(error) = handle_nack( producer, &message, &subject, &payload, delivered, max_deliver, retry_delay_secs, ) .await { error!( "Failed to route NACKed message from subject {}: {:?}", subject, error ); } } } } async fn handle_nack( producer: &NatsProducer, message: &jetstream::Message, subject: &str, payload: &[u8], delivered: i64, max_deliver: i64, retry_delay_secs: u64, ) -> anyhow::Result<()> { if delivered < max_deliver { warn!( "Message in subject {} failed (NACK). Retrying delivery {}/{} in {} seconds", subject, delivered, max_deliver, retry_delay_secs ); message .ack_with(jetstream::AckKind::Nak(Some(Duration::from_secs( retry_delay_secs, )))) .await .map_err(|error| { anyhow::anyhow!("failed to nack message: {error}") })?; return Ok(()); } let dlq_subject = format!("{subject}.dlq"); error!( "Message in subject {} exceeded max deliver attempts ({}). Routing to DLQ: {}", subject, max_deliver, dlq_subject ); let mut headers = HeaderMap::new(); headers.append("x-original-subject", subject); headers.append("x-delivered-count", delivered.to_string()); producer .send_raw(&dlq_subject, "", payload, Some(headers)) .await .map_err(|error| { anyhow::anyhow!( "failed to send DLQ message to {dlq_subject}: {error}" ) })?; message.ack().await.map_err(|error| { anyhow::anyhow!("failed to ack DLQ message: {error}") })?; Ok(()) } async fn ack_message( message: &jetstream::Message, subject: &str, description: &str, ) { if let Err(error) = message.ack().await { error!( "Failed to ack {} in subject {}: {:?}", description, subject, error ); } } fn durable_name(name: &str) -> String { name.replace('.', "-") }