gitdataai/lib/queue/consumer.rs
2026-05-30 01:38:40 +08:00

207 lines
5.8 KiB
Rust

use std::{sync::Arc, time::Duration};
use async_nats::{HeaderMap, jetstream};
use config::AppConfig;
use futures_util::StreamExt;
use tracing::{error, info, warn};
use crate::{
handler::{AckAction, MessageHandler},
producer::{NatsProducer, connect_jetstream, ensure_stream},
};
/// JetStream pull consumer bound to one durable name.
///
/// Built by `NatsConsumer::new`; `start_consuming` uses these fields to
/// configure the server-side consumer and to drive retry / dead-letter routing.
pub struct NatsConsumer {
/// Stream handle returned by `ensure_stream`; the durable consumer is created on it.
stream: jetstream::stream::Stream,
/// Producer used to publish messages to the `<subject>.dlq` subject.
producer: NatsProducer,
/// Value of `config.nats_max_deliver()`; passed to the consumer config and
/// compared against the delivery count to decide between retry and DLQ.
max_deliver: i64,
/// Value of `config.nats_retry_delay_secs()`; used both as the consumer's
/// `ack_wait` and as the delay attached to a NAK.
retry_delay_secs: u64,
/// Durable consumer name derived from the group id via `durable_name`.
durable_name: String,
}
impl NatsConsumer {
pub async fn new(
config: &AppConfig,
group_id: &str,
) -> anyhow::Result<Self> {
let jetstream = connect_jetstream(config).await?;
let stream = ensure_stream(config, &jetstream).await?;
let producer = NatsProducer::new(config).await?;
Ok(Self {
stream,
producer,
max_deliver: config.nats_max_deliver(),
retry_delay_secs: config.nats_retry_delay_secs(),
durable_name: durable_name(group_id),
})
}
pub async fn start_consuming<H>(
&self,
topics: &[&str],
handler: H,
) -> anyhow::Result<()>
where
H: MessageHandler + 'static,
{
let topics_owned: Vec<String> =
topics.iter().map(|topic| topic.to_string()).collect();
let consumer = self
.stream
.get_or_create_consumer(
&self.durable_name,
jetstream::consumer::pull::Config {
durable_name: Some(self.durable_name.clone()),
ack_wait: Duration::from_secs(self.retry_delay_secs),
max_deliver: self.max_deliver,
filter_subjects: topics_owned.clone(),
..Default::default()
},
)
.await?;
info!("NATS consumer started subscribing to: {:?}", topics_owned);
let producer = self.producer.clone();
let max_deliver = self.max_deliver;
let retry_delay_secs = self.retry_delay_secs;
let handler = Arc::new(handler);
tokio::spawn(async move {
let messages = consumer.messages().await;
let mut messages = match messages {
Ok(messages) => messages,
Err(error) => {
error!(
"NATS error while opening consumer stream: {:?}",
error
);
return;
}
};
while let Some(message_result) = messages.next().await {
match message_result {
Ok(message) => {
handle_message(
&producer,
max_deliver,
retry_delay_secs,
handler.as_ref(),
message,
)
.await;
}
Err(error) => {
error!("NATS error while consuming: {:?}", error);
}
}
}
});
Ok(())
}
}
/// Runs `handler` on a single JetStream message and settles the message
/// according to the returned [`AckAction`].
///
/// * `AckAction::Ack`: the message is acknowledged.
/// * `AckAction::Nack`: the message is handed to `handle_nack`, which either
///   requests redelivery or routes it to the dead-letter subject. A failure
///   there is logged and not propagated.
async fn handle_message<H>(
    producer: &NatsProducer,
    max_deliver: i64,
    retry_delay_secs: u64,
    handler: &H,
    message: jetstream::Message,
) where
    H: MessageHandler + ?Sized,
{
    let subject = message.subject.to_string();
    let payload = message.payload.clone();
    // When the delivery metadata cannot be read, treat this as the first delivery.
    let delivered = match message.info() {
        Ok(info) => info.delivered,
        Err(_) => 1,
    };

    let routing = match handler.handle(&subject, &payload).await {
        AckAction::Ack => {
            ack_message(&message, &subject, "message").await;
            return;
        }
        AckAction::Nack => {
            handle_nack(
                producer,
                &message,
                &subject,
                &payload,
                delivered,
                max_deliver,
                retry_delay_secs,
            )
            .await
        }
    };

    if let Err(error) = routing {
        error!(
            "Failed to route NACKed message from subject {}: {:?}",
            subject, error
        );
    }
}
async fn handle_nack(
producer: &NatsProducer,
message: &jetstream::Message,
subject: &str,
payload: &[u8],
delivered: i64,
max_deliver: i64,
retry_delay_secs: u64,
) -> anyhow::Result<()> {
if delivered < max_deliver {
warn!(
"Message in subject {} failed (NACK). Retrying delivery {}/{} in {} seconds",
subject, delivered, max_deliver, retry_delay_secs
);
message
.ack_with(jetstream::AckKind::Nak(Some(Duration::from_secs(
retry_delay_secs,
))))
.await
.map_err(|error| {
anyhow::anyhow!("failed to nack message: {error}")
})?;
return Ok(());
}
let dlq_subject = format!("{subject}.dlq");
error!(
"Message in subject {} exceeded max deliver attempts ({}). Routing to DLQ: {}",
subject, max_deliver, dlq_subject
);
let mut headers = HeaderMap::new();
headers.append("x-original-subject", subject);
headers.append("x-delivered-count", delivered.to_string());
producer
.send_raw(&dlq_subject, "", payload, Some(headers))
.await
.map_err(|error| {
anyhow::anyhow!(
"failed to send DLQ message to {dlq_subject}: {error}"
)
})?;
message.ack().await.map_err(|error| {
anyhow::anyhow!("failed to ack DLQ message: {error}")
})?;
Ok(())
}
/// Acknowledges `message`, logging a failure instead of returning it.
///
/// `subject` and `description` are used only in the log line.
async fn ack_message(message: &jetstream::Message, subject: &str, description: &str) {
    let Err(error) = message.ack().await else {
        return;
    };
    error!(
        "Failed to ack {} in subject {}: {:?}",
        description, subject, error
    );
}
/// Derives a durable consumer name that NATS will accept from `name`.
///
/// NATS consumer names may not contain `.`, `*`, `>`, path separators,
/// whitespace or non-printable characters. Each such character is replaced
/// with `-`; every other character is kept unchanged.
fn durable_name(name: &str) -> String {
    name.chars()
        .map(|c| match c {
            '.' | '*' | '>' | '/' | '\\' => '-',
            c if c.is_whitespace() || c.is_control() => '-',
            c => c,
        })
        .collect()
}