use std::{sync::Arc, time::Duration}; use async_nats::{HeaderMap, jetstream}; use config::AppConfig; use futures_util::StreamExt; use tracing::{error, info, warn}; use track::CounterVec; use crate::{ handler::{AckAction, MessageHandler}, producer::{NatsProducer, connect_jetstream, ensure_stream}, }; pub struct NatsConsumer { stream: jetstream::stream::Stream, producer: NatsProducer, max_deliver: i64, retry_delay_secs: u64, durable_name: String, metrics: Option, } impl NatsConsumer { pub async fn new( config: &AppConfig, group_id: &str, ) -> anyhow::Result { let jetstream = connect_jetstream(config).await?; let stream = ensure_stream(config, &jetstream).await?; let producer = NatsProducer::new(config).await?; Ok(Self { stream, producer, max_deliver: config.nats_max_deliver(), retry_delay_secs: config.nats_retry_delay_secs(), durable_name: durable_name(group_id), metrics: None, }) } pub fn set_metrics(&mut self, registry: track::MetricsRegistry) { self.producer.set_metrics(registry.clone()); self.metrics = Some(registry); } pub async fn start_consuming( &self, topics: &[&str], handler: H, ) -> anyhow::Result<()> where H: MessageHandler + 'static, { let topics_owned: Vec = topics.iter().map(|topic| topic.to_string()).collect(); let consumer = self .stream .get_or_create_consumer( &self.durable_name, jetstream::consumer::pull::Config { durable_name: Some(self.durable_name.clone()), ack_wait: Duration::from_secs(self.retry_delay_secs), max_deliver: self.max_deliver, filter_subjects: topics_owned.clone(), ..Default::default() }, ) .await?; info!(topics = ?topics_owned, durable = %self.durable_name, "NATS consumer started"); let producer = self.producer.clone(); let metrics = self.metrics.clone(); let max_deliver = self.max_deliver; let retry_delay_secs = self.retry_delay_secs; let handler = Arc::new(handler); tokio::spawn(async move { let messages = consumer.messages().await; let mut messages = match messages { Ok(messages) => messages, Err(error) => { error!(error = %error, "NATS error while opening consumer stream"); return; } }; while let Some(message_result) = messages.next().await { match message_result { Ok(message) => { handle_message( &producer, metrics.as_ref(), max_deliver, retry_delay_secs, handler.as_ref(), message, ) .await; } Err(error) => { error!(error = %error, "NATS error while consuming"); } } } }); Ok(()) } } async fn handle_message( producer: &NatsProducer, metrics: Option<&track::MetricsRegistry>, max_deliver: i64, retry_delay_secs: u64, handler: &H, message: jetstream::Message, ) where H: MessageHandler + ?Sized, { let subject = message.subject.to_string(); let payload = message.payload.clone(); let delivered = message.info().map(|info| info.delivered).unwrap_or(1); record_queue_message(metrics, &subject, "received"); match handler.handle(&subject, &payload).await { AckAction::Ack => { ack_message(metrics, &message, &subject, "message").await } AckAction::Nack => { record_queue_message(metrics, &subject, "nack"); if let Err(error) = handle_nack( producer, metrics, &message, &subject, &payload, delivered, max_deliver, retry_delay_secs, ) .await { record_queue_message(metrics, &subject, "error"); error!( subject = %subject, delivered, max_deliver, error = %error, "failed to route NACKed message" ); } } } } async fn handle_nack( producer: &NatsProducer, metrics: Option<&track::MetricsRegistry>, message: &jetstream::Message, subject: &str, payload: &[u8], delivered: i64, max_deliver: i64, retry_delay_secs: u64, ) -> anyhow::Result<()> { if delivered < max_deliver { warn!( subject, delivered, max_deliver, retry_delay_secs, "message NACKed, scheduling retry" ); message .ack_with(jetstream::AckKind::Nak(Some(Duration::from_secs( retry_delay_secs, )))) .await .map_err(|error| { anyhow::anyhow!("failed to nack message: {error}") })?; record_queue_message(metrics, subject, "retry"); return Ok(()); } let dlq_subject = format!("{subject}.dlq"); error!( subject, dlq_subject = %dlq_subject, delivered, max_deliver, "message exceeded max deliver attempts, routing to DLQ" ); let mut headers = HeaderMap::new(); headers.append("x-original-subject", subject); headers.append("x-delivered-count", delivered.to_string()); producer .send_raw(&dlq_subject, "", payload, Some(headers)) .await .map_err(|error| { anyhow::anyhow!( "failed to send DLQ message to {dlq_subject}: {error}" ) })?; message.ack().await.map_err(|error| { anyhow::anyhow!("failed to ack DLQ message: {error}") })?; record_queue_message(metrics, subject, "dlq"); record_queue_dlq(metrics, subject); Ok(()) } async fn ack_message( metrics: Option<&track::MetricsRegistry>, message: &jetstream::Message, subject: &str, description: &str, ) { match message.ack().await { Ok(()) => record_queue_message(metrics, subject, "ack"), Err(error) => { record_queue_message(metrics, subject, "ack_error"); error!( subject, description, error = %error, "failed to ack message" ); } } } fn record_queue_message( metrics: Option<&track::MetricsRegistry>, topic: &str, status: &str, ) { if let Some(metrics) = metrics { queue_messages_vec(metrics) .with_label_values(&[topic, status]) .inc(); } } fn record_queue_dlq(metrics: Option<&track::MetricsRegistry>, topic: &str) { if let Some(metrics) = metrics { queue_dlq_vec(metrics).with_label_values(&[topic]).inc(); } } fn queue_messages_vec(registry: &track::MetricsRegistry) -> CounterVec { registry .register_counter_vec( "queue_messages_total", "Total queue messages", &["topic", "status"], ) .expect("failed to register queue_messages_total") } fn queue_dlq_vec(registry: &track::MetricsRegistry) -> CounterVec { registry .register_counter_vec( "queue_dlq_total", "Total messages routed to DLQ", &["topic"], ) .expect("failed to register queue_dlq_total") } fn durable_name(name: &str) -> String { name.replace('.', "-") }