207 lines
5.8 KiB
Rust
207 lines
5.8 KiB
Rust
use std::{sync::Arc, time::Duration};
|
|
|
|
use async_nats::{HeaderMap, jetstream};
|
|
use config::AppConfig;
|
|
use futures_util::StreamExt;
|
|
use tracing::{error, info, warn};
|
|
|
|
use crate::{
|
|
handler::{AckAction, MessageHandler},
|
|
producer::{NatsProducer, connect_jetstream, ensure_stream},
|
|
};
|
|
|
|
pub struct NatsConsumer {
|
|
stream: jetstream::stream::Stream,
|
|
producer: NatsProducer,
|
|
max_deliver: i64,
|
|
retry_delay_secs: u64,
|
|
durable_name: String,
|
|
}
|
|
|
|
impl NatsConsumer {
|
|
pub async fn new(
|
|
config: &AppConfig,
|
|
group_id: &str,
|
|
) -> anyhow::Result<Self> {
|
|
let jetstream = connect_jetstream(config).await?;
|
|
let stream = ensure_stream(config, &jetstream).await?;
|
|
let producer = NatsProducer::new(config).await?;
|
|
|
|
Ok(Self {
|
|
stream,
|
|
producer,
|
|
max_deliver: config.nats_max_deliver(),
|
|
retry_delay_secs: config.nats_retry_delay_secs(),
|
|
durable_name: durable_name(group_id),
|
|
})
|
|
}
|
|
|
|
pub async fn start_consuming<H>(
|
|
&self,
|
|
topics: &[&str],
|
|
handler: H,
|
|
) -> anyhow::Result<()>
|
|
where
|
|
H: MessageHandler + 'static,
|
|
{
|
|
let topics_owned: Vec<String> =
|
|
topics.iter().map(|topic| topic.to_string()).collect();
|
|
|
|
let consumer = self
|
|
.stream
|
|
.get_or_create_consumer(
|
|
&self.durable_name,
|
|
jetstream::consumer::pull::Config {
|
|
durable_name: Some(self.durable_name.clone()),
|
|
ack_wait: Duration::from_secs(self.retry_delay_secs),
|
|
max_deliver: self.max_deliver,
|
|
filter_subjects: topics_owned.clone(),
|
|
..Default::default()
|
|
},
|
|
)
|
|
.await?;
|
|
|
|
info!("NATS consumer started subscribing to: {:?}", topics_owned);
|
|
|
|
let producer = self.producer.clone();
|
|
let max_deliver = self.max_deliver;
|
|
let retry_delay_secs = self.retry_delay_secs;
|
|
let handler = Arc::new(handler);
|
|
|
|
tokio::spawn(async move {
|
|
let messages = consumer.messages().await;
|
|
let mut messages = match messages {
|
|
Ok(messages) => messages,
|
|
Err(error) => {
|
|
error!(
|
|
"NATS error while opening consumer stream: {:?}",
|
|
error
|
|
);
|
|
return;
|
|
}
|
|
};
|
|
|
|
while let Some(message_result) = messages.next().await {
|
|
match message_result {
|
|
Ok(message) => {
|
|
handle_message(
|
|
&producer,
|
|
max_deliver,
|
|
retry_delay_secs,
|
|
handler.as_ref(),
|
|
message,
|
|
)
|
|
.await;
|
|
}
|
|
Err(error) => {
|
|
error!("NATS error while consuming: {:?}", error);
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
async fn handle_message<H>(
|
|
producer: &NatsProducer,
|
|
max_deliver: i64,
|
|
retry_delay_secs: u64,
|
|
handler: &H,
|
|
message: jetstream::Message,
|
|
) where
|
|
H: MessageHandler + ?Sized,
|
|
{
|
|
let subject = message.subject.to_string();
|
|
let payload = message.payload.clone();
|
|
let delivered = message.info().map(|info| info.delivered).unwrap_or(1);
|
|
|
|
match handler.handle(&subject, &payload).await {
|
|
AckAction::Ack => ack_message(&message, &subject, "message").await,
|
|
AckAction::Nack => {
|
|
if let Err(error) = handle_nack(
|
|
producer,
|
|
&message,
|
|
&subject,
|
|
&payload,
|
|
delivered,
|
|
max_deliver,
|
|
retry_delay_secs,
|
|
)
|
|
.await
|
|
{
|
|
error!(
|
|
"Failed to route NACKed message from subject {}: {:?}",
|
|
subject, error
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn handle_nack(
|
|
producer: &NatsProducer,
|
|
message: &jetstream::Message,
|
|
subject: &str,
|
|
payload: &[u8],
|
|
delivered: i64,
|
|
max_deliver: i64,
|
|
retry_delay_secs: u64,
|
|
) -> anyhow::Result<()> {
|
|
if delivered < max_deliver {
|
|
warn!(
|
|
"Message in subject {} failed (NACK). Retrying delivery {}/{} in {} seconds",
|
|
subject, delivered, max_deliver, retry_delay_secs
|
|
);
|
|
message
|
|
.ack_with(jetstream::AckKind::Nak(Some(Duration::from_secs(
|
|
retry_delay_secs,
|
|
))))
|
|
.await
|
|
.map_err(|error| {
|
|
anyhow::anyhow!("failed to nack message: {error}")
|
|
})?;
|
|
return Ok(());
|
|
}
|
|
|
|
let dlq_subject = format!("{subject}.dlq");
|
|
error!(
|
|
"Message in subject {} exceeded max deliver attempts ({}). Routing to DLQ: {}",
|
|
subject, max_deliver, dlq_subject
|
|
);
|
|
|
|
let mut headers = HeaderMap::new();
|
|
headers.append("x-original-subject", subject);
|
|
headers.append("x-delivered-count", delivered.to_string());
|
|
producer
|
|
.send_raw(&dlq_subject, "", payload, Some(headers))
|
|
.await
|
|
.map_err(|error| {
|
|
anyhow::anyhow!(
|
|
"failed to send DLQ message to {dlq_subject}: {error}"
|
|
)
|
|
})?;
|
|
message.ack().await.map_err(|error| {
|
|
anyhow::anyhow!("failed to ack DLQ message: {error}")
|
|
})?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn ack_message(
|
|
message: &jetstream::Message,
|
|
subject: &str,
|
|
description: &str,
|
|
) {
|
|
if let Err(error) = message.ack().await {
|
|
error!(
|
|
"Failed to ack {} in subject {}: {:?}",
|
|
description, subject, error
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Derives a JetStream durable consumer name from `name`.
///
/// Every character that JetStream does not accept in a durable name — `.`,
/// `*`, `>`, `/`, `\`, whitespace and control characters — is replaced with
/// `-`. All other characters are kept unchanged, so inputs that only contain
/// dots map to the same result as a plain dot-to-dash replacement.
fn durable_name(name: &str) -> String {
    name.chars()
        .map(|c| {
            let forbidden = matches!(c, '.' | '*' | '>' | '/' | '\\')
                || c.is_whitespace()
                || c.is_control();
            if forbidden {
                '-'
            } else {
                c
            }
        })
        .collect()
}
|