gitdataai/libs/transport/bus.rs

228 lines
8.4 KiB
Rust

use async_nats::jetstream;
use config::AppConfig;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use crate::error::AppTransportError;
/// Abstraction over a publish/subscribe message bus that carries raw byte payloads.
///
/// Implementors must be `Send + Sync`, and both methods return `Send` futures.
pub trait Transport: Send + Sync {
/// Publishes `payload` on `subject`.
///
/// The returned future resolves to `Ok(())` on success or to an
/// [`AppTransportError`] on failure.
fn publish(
&self,
subject: &str,
payload: &[u8],
) -> impl std::future::Future<Output = Result<(), AppTransportError>> + Send;
/// Subscribes to `subject`.
///
/// The returned future resolves to the receiving half of a Tokio `mpsc`
/// channel that yields one `Vec<u8>` per delivered payload.
///
/// The signature has no error channel. `NatsTransport` in this file reports a
/// failed subscription setup by returning a receiver whose sender has already
/// been dropped, so the first `recv()` yields `None`.
fn subscribe(
&self,
subject: &str,
) -> impl std::future::Future<Output = mpsc::Receiver<Vec<u8>>> + Send;
}
/// [`Transport`] implementation backed by a NATS JetStream context.
///
/// Instances are created with `NatsTransport::connect`.
pub struct NatsTransport {
/// JetStream context used to publish messages and to look up the stream.
jetstream: jetstream::Context,
/// Name of the JetStream stream; `connect` fills it from `AppConfig::nats_stream_name()`.
stream_name: String,
}
impl NatsTransport {
    /// Connects to NATS and makes sure the JetStream stream for room events exists.
    ///
    /// The server URL, auth token, stream name and maximum message age are read
    /// from `config`. The connection uses token authentication, a 10 second
    /// connection timeout, `retry_on_initial_connect`, and a reconnect delay that
    /// doubles from 1 second up to a 30 second cap.
    ///
    /// The stream is bound to the `room.events.>` subjects, kept in memory, and
    /// fetched if it already exists (otherwise created).
    ///
    /// # Errors
    ///
    /// Returns [`AppTransportError::Internal`] when the URL or token is not
    /// configured, when the connection attempt fails, or when the stream cannot
    /// be fetched or created. The cause is logged at `warn` level first, because
    /// the returned error carries no detail.
    pub async fn connect(config: &AppConfig) -> Result<Self, AppTransportError> {
        const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
        const RECONNECT_BASE_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
        const RECONNECT_MAX_DELAY: std::time::Duration = std::time::Duration::from_secs(30);

        let url = config.nats_url().ok_or_else(|| {
            warn!("NATS URL is not configured");
            AppTransportError::Internal
        })?;
        // Never log the token itself, only the fact that it is missing.
        let token = config.nats_token().ok_or_else(|| {
            warn!("NATS token is not configured");
            AppTransportError::Internal
        })?;

        let opts = async_nats::ConnectOptions::with_token(token)
            .retry_on_initial_connect()
            .connection_timeout(CONNECT_TIMEOUT)
            .reconnect_delay_callback(|attempts| {
                // Clamp rather than truncate: an `as u32` cast would wrap for
                // very large attempt counts and shrink the delay again.
                let exponent = u32::try_from(attempts).unwrap_or(u32::MAX);
                RECONNECT_BASE_DELAY
                    .saturating_mul(2u32.saturating_pow(exponent))
                    .min(RECONNECT_MAX_DELAY)
            })
            .event_callback(|event| async move {
                match event {
                    async_nats::Event::Connected => debug!("NATS connected"),
                    async_nats::Event::Disconnected => warn!("NATS disconnected, reconnecting"),
                    async_nats::Event::ServerError(e) => warn!(error = %e, "NATS server error"),
                    _ => {}
                }
            });

        let client = opts.connect(&url).await.map_err(|e| {
            warn!(error = %e, "NATS connect failed");
            AppTransportError::Internal
        })?;
        let jetstream = jetstream::new(client);

        // Read the stream name once; it is needed for the config, the log line
        // and the returned value.
        let stream_name = config.nats_stream_name();

        // NOTE(review): `subscribe` creates a separate consumer (unique name suffix)
        // per call. JetStream documents that WorkQueue retention does not allow
        // consumers with overlapping filter subjects, so a second subscription to
        // the same subject may be rejected. Confirm whether Interest or Limits
        // retention was intended.
        let stream_config = jetstream::stream::Config {
            name: stream_name.clone(),
            subjects: vec!["room.events.>".to_string()],
            retention: jetstream::stream::RetentionPolicy::WorkQueue,
            max_age: std::time::Duration::from_secs(config.nats_max_age_secs()),
            storage: jetstream::stream::StorageType::Memory,
            ..Default::default()
        };
        jetstream
            .get_or_create_stream(stream_config)
            .await
            .map_err(|e| {
                warn!(error = %e, "Failed to create JetStream stream");
                AppTransportError::Internal
            })?;
        info!(stream = %stream_name, "JetStream stream ready");

        Ok(Self {
            jetstream,
            stream_name,
        })
    }
}
impl Transport for NatsTransport {
async fn publish(&self, subject: &str, payload: &[u8]) -> Result<(), AppTransportError> {
let ack = self
.jetstream
.publish(subject.to_string(), payload.to_vec().into())
.await
.map_err(|e| {
warn!(error = %e, subject = %subject, "NATS publish failed");
AppTransportError::Internal
})?;
ack.await.map_err(|e| {
warn!(error = %e, "NATS publish ack failed");
AppTransportError::Internal
})?;
Ok(())
}
fn subscribe(
&self,
subject: &str,
) -> impl std::future::Future<Output = mpsc::Receiver<Vec<u8>>> + Send {
let subject = subject.to_string();
let jetstream = self.jetstream.clone();
let stream_name = self.stream_name.clone();
let buffer_size = 256;
async move {
let (tx, rx) = mpsc::channel(buffer_size);
let stream = match jetstream.get_stream(&stream_name).await {
Ok(s) => s,
Err(e) => {
warn!(error = %e, "Failed to get stream for subscription");
return rx;
}
};
let consumer_name = subject
.replace(['.', '>'], "-")
.trim_end_matches('-')
.to_string();
// Generate a unique instance-specific suffix to prevent competition in multi-node setups.
// Using a short UUID-based string for reliability across dependency versions.
let instance_id = uuid::Uuid::new_v4().to_string();
let instance_id_short = &instance_id[..8];
let durable = if consumer_name.is_empty() {
format!("room-events-default-{}", instance_id_short)
} else {
format!("room-events-sub-{}-{}", consumer_name, instance_id_short)
};
let config = async_nats::jetstream::consumer::pull::Config {
durable_name: Some(durable.clone()),
filter_subject: subject.clone(),
max_deliver: 3,
ack_wait: std::time::Duration::from_secs(10),
// Ensure temporary consumers are cleaned up by the server after inactivity.
inactive_threshold: std::time::Duration::from_secs(3600),
..Default::default()
};
let mut messages = match stream
.get_or_create_consumer(&durable, config.clone())
.await
{
Ok(c) => match c.messages().await {
Ok(m) => m,
Err(e) => {
warn!(error = %e, "Failed to start consumer message stream");
return rx;
}
},
Err(e) => {
warn!(error = %e, "Failed to create subscriber consumer");
return rx;
}
};
tokio::spawn(async move {
use futures_util::StreamExt;
const MAX_RECONNECT_RETRIES: u32 = 50;
let mut reconnect_retries: u32 = 0;
loop {
while let Some(result) = messages.next().await {
match result {
Ok(msg) => {
if tx.send(msg.payload.to_vec()).await.is_err() {
debug!("NATS subscriber channel closed");
return;
}
let _ = msg.ack().await;
}
Err(e) => warn!(error = %e, "NATS consumer message error"),
}
}
reconnect_retries += 1;
if reconnect_retries >= MAX_RECONNECT_RETRIES {
warn!(subject = %subject, "NATS consumer reconnect limit exceeded, giving up");
return;
}
warn!(subject = %subject, retry = reconnect_retries, "NATS consumer stream ended, reconnecting");
let mut delay = std::time::Duration::from_secs(1);
let max_delay = std::time::Duration::from_secs(30);
loop {
tokio::time::sleep(delay).await;
match stream
.get_or_create_consumer(&durable, config.clone())
.await
{
Ok(new_consumer) => match new_consumer.messages().await {
Ok(new_messages) => {
info!(subject = %subject, "NATS consumer reconnected");
messages = new_messages;
reconnect_retries = 0;
break;
}
Err(e) => {
warn!(subject = %subject, error = %e, "Failed to get messages from reconnected NATS consumer");
}
},
Err(e) => {
warn!(subject = %subject, error = %e, "Failed to recreate NATS consumer in reconnect loop");
}
}
warn!(delay = ?delay, "Reconnect failed, retrying");
delay = std::cmp::min(delay.saturating_mul(2), max_delay);
}
}
});
rx
}
}
}