gitdataai/lib/queue/consumer.rs

279 lines
7.9 KiB
Rust

use std::{sync::Arc, time::Duration};
use async_nats::{HeaderMap, jetstream};
use config::AppConfig;
use futures_util::StreamExt;
use tracing::{error, info, warn};
use track::CounterVec;
use crate::{
handler::{AckAction, MessageHandler},
producer::{NatsProducer, connect_jetstream, ensure_stream},
};
/// JetStream consumer that hands messages to a [`MessageHandler`] and routes
/// NACKed messages to a delayed retry or a `<subject>.dlq` subject.
pub struct NatsConsumer {
/// Stream on which the durable pull consumer is created.
stream: jetstream::stream::Stream,
/// Producer used to publish dead-lettered messages.
producer: NatsProducer,
/// Delivery limit: passed to the consumer configuration and compared with
/// the per-message delivery count when a message is NACKed.
max_deliver: i64,
/// Seconds used both as the consumer `ack_wait` and as the NAK redelivery delay.
retry_delay_secs: u64,
/// Durable consumer name, derived from the group id via `durable_name`.
durable_name: String,
/// Optional metrics registry; `None` until `set_metrics` is called.
metrics: Option<track::MetricsRegistry>,
}
impl NatsConsumer {
pub async fn new(
config: &AppConfig,
group_id: &str,
) -> anyhow::Result<Self> {
let jetstream = connect_jetstream(config).await?;
let stream = ensure_stream(config, &jetstream).await?;
let producer = NatsProducer::new(config).await?;
Ok(Self {
stream,
producer,
max_deliver: config.nats_max_deliver(),
retry_delay_secs: config.nats_retry_delay_secs(),
durable_name: durable_name(group_id),
metrics: None,
})
}
pub fn set_metrics(&mut self, registry: track::MetricsRegistry) {
self.producer.set_metrics(registry.clone());
self.metrics = Some(registry);
}
pub async fn start_consuming<H>(
&self,
topics: &[&str],
handler: H,
) -> anyhow::Result<()>
where
H: MessageHandler + 'static,
{
let topics_owned: Vec<String> =
topics.iter().map(|topic| topic.to_string()).collect();
let consumer = self
.stream
.get_or_create_consumer(
&self.durable_name,
jetstream::consumer::pull::Config {
durable_name: Some(self.durable_name.clone()),
ack_wait: Duration::from_secs(self.retry_delay_secs),
max_deliver: self.max_deliver,
filter_subjects: topics_owned.clone(),
..Default::default()
},
)
.await?;
info!(topics = ?topics_owned, durable = %self.durable_name, "NATS consumer started");
let producer = self.producer.clone();
let metrics = self.metrics.clone();
let max_deliver = self.max_deliver;
let retry_delay_secs = self.retry_delay_secs;
let handler = Arc::new(handler);
tokio::spawn(async move {
let messages = consumer.messages().await;
let mut messages = match messages {
Ok(messages) => messages,
Err(error) => {
error!(error = %error, "NATS error while opening consumer stream");
return;
}
};
while let Some(message_result) = messages.next().await {
match message_result {
Ok(message) => {
handle_message(
&producer,
metrics.as_ref(),
max_deliver,
retry_delay_secs,
handler.as_ref(),
message,
)
.await;
}
Err(error) => {
error!(error = %error, "NATS error while consuming");
}
}
}
});
Ok(())
}
}
/// Processes one JetStream message: runs the handler, then acknowledges the
/// message or hands it to the NACK routing.
///
/// A `received` metric is recorded before the handler runs. If the delivery
/// count cannot be read from the message metadata it is assumed to be `1`.
/// Failures while routing a NACKed message are logged and counted under the
/// `error` status; nothing is returned to the caller.
async fn handle_message<H>(
    producer: &NatsProducer,
    metrics: Option<&track::MetricsRegistry>,
    max_deliver: i64,
    retry_delay_secs: u64,
    handler: &H,
    message: jetstream::Message,
) where
    H: MessageHandler + ?Sized,
{
    let subject = message.subject.to_string();
    let payload = message.payload.clone();
    let delivered = match message.info() {
        Ok(info) => info.delivered,
        Err(_) => 1,
    };
    record_queue_message(metrics, &subject, "received");

    let outcome = handler.handle(&subject, &payload).await;
    let routing = match outcome {
        AckAction::Ack => {
            ack_message(metrics, &message, &subject, "message").await;
            return;
        }
        AckAction::Nack => {
            record_queue_message(metrics, &subject, "nack");
            handle_nack(
                producer,
                metrics,
                &message,
                &subject,
                &payload,
                delivered,
                max_deliver,
                retry_delay_secs,
            )
            .await
        }
    };

    if let Err(cause) = routing {
        record_queue_message(metrics, &subject, "error");
        error!(
            subject = %subject,
            delivered,
            max_deliver,
            error = %cause,
            "failed to route NACKed message"
        );
    }
}
/// Routes a NACKed message to a delayed redelivery or to the dead-letter subject.
///
/// While delivery attempts remain, the message is NAKed with a delay of
/// `retry_delay_secs` and a `retry` metric is recorded. Once `delivered`
/// reaches `max_deliver`, the payload is published to `<subject>.dlq` with
/// `x-original-subject` and `x-delivered-count` headers, the original message
/// is ACKed, and the `dlq` metrics are recorded.
///
/// A non-positive `max_deliver` is treated as "no limit", so such messages are
/// always retried and never dead-lettered. Previously any non-positive limit
/// made `delivered < max_deliver` false, which sent the very first NACK to the
/// DLQ without a single retry.
///
/// # Errors
/// Returns an error if the NAK, the DLQ publish, or the final ACK fails.
async fn handle_nack(
    producer: &NatsProducer,
    metrics: Option<&track::MetricsRegistry>,
    message: &jetstream::Message,
    subject: &str,
    payload: &[u8],
    delivered: i64,
    max_deliver: i64,
    retry_delay_secs: u64,
) -> anyhow::Result<()> {
    // JetStream documents a MaxDeliver of -1 as "redeliver until acknowledged",
    // and this same value is what the consumer is configured with. Mirror that
    // here instead of dead-lettering on the first NACK.
    let unlimited = max_deliver <= 0;

    if unlimited || delivered < max_deliver {
        warn!(
            subject,
            delivered,
            max_deliver,
            retry_delay_secs,
            "message NACKed, scheduling retry"
        );
        message
            .ack_with(jetstream::AckKind::Nak(Some(Duration::from_secs(
                retry_delay_secs,
            ))))
            .await
            .map_err(|error| {
                anyhow::anyhow!("failed to nack message: {error}")
            })?;
        record_queue_message(metrics, subject, "retry");
        return Ok(());
    }

    let dlq_subject = format!("{subject}.dlq");
    error!(
        subject,
        dlq_subject = %dlq_subject,
        delivered,
        max_deliver,
        "message exceeded max deliver attempts, routing to DLQ"
    );

    let mut headers = HeaderMap::new();
    headers.append("x-original-subject", subject);
    headers.append("x-delivered-count", delivered.to_string());

    // Publish first and ACK second, so the original is only acknowledged once
    // the DLQ copy has been accepted.
    // NOTE(review): if the publish fails on the final attempt the message is
    // left un-ACKed; whether the server redelivers it again is not visible
    // from this file — confirm.
    producer
        .send_raw(&dlq_subject, "", payload, Some(headers))
        .await
        .map_err(|error| {
            anyhow::anyhow!(
                "failed to send DLQ message to {dlq_subject}: {error}"
            )
        })?;
    message.ack().await.map_err(|error| {
        anyhow::anyhow!("failed to ack DLQ message: {error}")
    })?;

    record_queue_message(metrics, subject, "dlq");
    record_queue_dlq(metrics, subject);
    Ok(())
}
/// Acknowledges `message` and records the outcome.
///
/// Records an `ack` metric on success. On failure it records `ack_error` and
/// logs the error; `description` is attached to that log entry as a field.
/// The failure is not propagated.
async fn ack_message(
    metrics: Option<&track::MetricsRegistry>,
    message: &jetstream::Message,
    subject: &str,
    description: &str,
) {
    if let Err(cause) = message.ack().await {
        record_queue_message(metrics, subject, "ack_error");
        error!(
            subject,
            description,
            error = %cause,
            "failed to ack message"
        );
        return;
    }

    record_queue_message(metrics, subject, "ack");
}
/// Increments `queue_messages_total` for the `(topic, status)` pair.
///
/// Does nothing when no metrics registry is configured.
fn record_queue_message(
    metrics: Option<&track::MetricsRegistry>,
    topic: &str,
    status: &str,
) {
    match metrics {
        Some(registry) => {
            let counters = queue_messages_vec(registry);
            counters.with_label_values(&[topic, status]).inc();
        }
        None => {}
    }
}
/// Increments `queue_dlq_total` for `topic`.
///
/// Does nothing when no metrics registry is configured.
fn record_queue_dlq(metrics: Option<&track::MetricsRegistry>, topic: &str) {
    let registry = match metrics {
        Some(registry) => registry,
        None => return,
    };
    let counters = queue_dlq_vec(registry);
    counters.with_label_values(&[topic]).inc();
}
/// Returns the `queue_messages_total` counter family, labelled by `topic` and
/// `status`.
///
/// # Panics
/// Panics if the registry reports a registration error.
///
/// NOTE(review): this runs on every metric update, so it presumably relies on
/// `register_counter_vec` returning the existing collector on repeat calls —
/// confirm against `track::MetricsRegistry`.
fn queue_messages_vec(registry: &track::MetricsRegistry) -> CounterVec {
    let registration = registry.register_counter_vec(
        "queue_messages_total",
        "Total queue messages",
        &["topic", "status"],
    );
    registration.expect("failed to register queue_messages_total")
}
/// Returns the `queue_dlq_total` counter family, labelled by `topic`.
///
/// # Panics
/// Panics if the registry reports a registration error.
///
/// NOTE(review): this runs on every DLQ routing, so it presumably relies on
/// `register_counter_vec` returning the existing collector on repeat calls —
/// confirm against `track::MetricsRegistry`.
fn queue_dlq_vec(registry: &track::MetricsRegistry) -> CounterVec {
    let registration = registry.register_counter_vec(
        "queue_dlq_total",
        "Total messages routed to DLQ",
        &["topic"],
    );
    registration.expect("failed to register queue_dlq_total")
}
/// Converts a consumer group id into a name usable as a JetStream durable name.
///
/// JetStream rejects durable names containing `.`, `*`, `>`, whitespace, path
/// separators, or non-printable characters. Each such character is replaced
/// with `-`; all other characters are kept as-is. Inputs that contain only `.`
/// as an offending character produce the same result as before.
///
/// An empty input yields an empty string, which is not a valid durable name;
/// callers are expected to pass a non-empty group id.
fn durable_name(name: &str) -> String {
    name.chars()
        .map(|ch| match ch {
            '.' | '*' | '>' | '/' | '\\' => '-',
            other if other.is_whitespace() || other.is_control() => '-',
            other => other,
        })
        .collect()
}