gitdataai/libs/queue/worker.rs

301 lines
9.9 KiB
Rust

//! Queue workers: room-message and email consumers built on NATS JetStream durable pull consumers.
use crate::types::{EmailEnvelope, RoomMessageEnvelope, RoomMessageEvent};
use futures::StreamExt;
use metrics::counter;
use std::sync::Arc;
/// Upper bound on the number of messages collected into a single batch.
const BATCH_SIZE: usize = 50;
/// Sleep interval, in seconds, used by worker loops between periodic wake-ups.
const FLUSH_INTERVAL_SECS: u64 = 1;
/// NATS consumer function type: returns (message_data, ack_fn) pairs.
///
/// A shareable (`Arc`, `Send + Sync`) type-erased async callback. It is called with a
/// `String` and a `usize` (presumably a subject/consumer name and a maximum batch size;
/// TODO confirm against the implementor) and resolves to `anyhow::Result` of a list of
/// `(payload bytes, ack callback)` pairs. Each ack callback is itself an async function
/// resolving to `anyhow::Result<()>`.
pub type NatsConsumeFn = Arc<
dyn Fn(
String,
usize,
) -> std::pin::Pin<
Box<
dyn std::future::Future<
Output = anyhow::Result<
Vec<(
Vec<u8>,
Box<
dyn Fn() -> std::pin::Pin<
Box<
dyn std::future::Future<Output = anyhow::Result<()>>
+ Send,
>,
> + Send,
>,
)>,
>,
> + Send,
>,
> + Send
+ Sync,
>;
/// Function that persists a batch of room message envelopes to the database.
///
/// Shared behind an `Arc` and `Send + Sync`; each call consumes the batch and returns a
/// [`PersistFut`].
pub type PersistFn = Arc<dyn Fn(Vec<RoomMessageEnvelope>) -> PersistFut + Send + Sync>;
/// Boxed, `Send` future returned by a [`PersistFn`]; resolves to `Ok(())` or an `anyhow` error.
pub type PersistFut =
std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send>>;
/// Start the room message worker that consumes from NATS JetStream per room.
pub async fn start(
nats: Option<Arc<crate::NatsClient>>,
_persist_fn: PersistFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
let Some(_nats) = nats else {
tracing::warn!("NATS not available, room workers disabled");
let _ = shutdown_rx.recv().await;
return;
};
tracing::info!("room-message worker starting");
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
tracing::info!("room-message worker shutting down");
break;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
}
// Workers are spawned lazily per room when rooms are accessed.
// This start() just keeps the worker alive; actual room workers
// are spawned by spawn_room_workers() in workers_spawn.rs.
}
}
/// Room worker that consumes from NATS JetStream for a specific room.
pub async fn room_worker_task(
room_id: uuid::Uuid,
nats: Arc<crate::NatsClient>,
persist_fn: PersistFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
let stream_name = nats.stream_name().to_string();
let subject = format!("room.message.{}", room_id);
let durable = format!("worker-{}", room_id);
tracing::info!(room_id = %room_id, durable = %durable, subject = %subject, "room worker task starting");
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
tracing::info!(room_id = %room_id, "room worker task shutting down");
break;
}
result = consume_once(&nats, &stream_name, &durable, &subject, &persist_fn) => {
match result {
Ok(0) => {}
Ok(n) => tracing::debug!(room_id = %room_id, n = n, "batch persisted"),
Err(e) => {
tracing::error!(room_id = %room_id, error = %e, "JetStream consume error");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
}
}
}
async fn consume_once(
nats: &crate::NatsClient,
stream_name: &str,
durable: &str,
subject: &str,
persist_fn: &PersistFn,
) -> anyhow::Result<usize> {
let stream = nats
.jetstream
.get_stream(stream_name)
.await
.map_err(|e| anyhow::anyhow!("get stream failed: {}", e))?;
let pull_config = async_nats::jetstream::consumer::pull::Config {
durable_name: Some(durable.to_string()),
filter_subject: subject.to_string(),
max_deliver: 3,
ack_wait: std::time::Duration::from_secs(10),
max_ack_pending: 256,
..Default::default()
};
let consumer = stream
.get_or_create_consumer(durable, pull_config)
.await
.map_err(|e| anyhow::anyhow!("create consumer failed: {}", e))?;
let mut messages = consumer
.messages()
.await
.map_err(|e| anyhow::anyhow!("consumer messages failed: {}", e))?;
let mut batch: Vec<RoomMessageEnvelope> = Vec::with_capacity(BATCH_SIZE);
let mut acks: Vec<async_nats::jetstream::message::Message> = Vec::new();
// Fetch up to BATCH_SIZE messages
for _ in 0..BATCH_SIZE {
match tokio::time::timeout(std::time::Duration::from_millis(500), messages.next()).await {
Ok(Some(Ok(msg))) => match serde_json::from_slice::<RoomMessageEvent>(&msg.payload) {
Ok(event) => {
let env = RoomMessageEnvelope::from(event);
batch.push(env);
acks.push(msg);
}
Err(e) => {
tracing::warn!(error = %e, "malformed envelope");
let _ = msg.ack().await;
}
},
Ok(Some(Err(e))) => {
tracing::warn!(error = %e, "message error");
}
Ok(None) => break,
Err(_) => break,
}
}
if batch.is_empty() {
return Ok(0);
}
let batch_size = batch.len();
if let Err(e) = persist_fn(batch).await {
tracing::error!(error = %e, "persist_fn failed — entries NOT acked (will retry)");
return Err(e);
}
for msg in &acks {
if let Err(e) = msg.ack().await {
tracing::warn!(error = %e, "ack failed");
}
}
tracing::info!(n = batch_size, "batch persisted and acked");
Ok(batch_size)
}
/// Email send function type.
///
/// Shared behind an `Arc` and `Send + Sync`; each call consumes a batch of
/// `EmailEnvelope`s and returns an [`EmailSendFut`].
pub type EmailSendFn = Arc<dyn Fn(Vec<EmailEnvelope>) -> EmailSendFut + Send + Sync>;
/// Boxed, `Send` future returned by an [`EmailSendFn`]; resolves to `Ok(())` or an `anyhow` error.
pub type EmailSendFut =
std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send>>;
/// Start the email worker that consumes from NATS JetStream.
pub async fn start_email_worker(
nats: Option<Arc<crate::NatsClient>>,
send_fn: EmailSendFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
let Some(nats) = nats else {
tracing::warn!("NATS not available, email worker disabled");
let _ = shutdown_rx.recv().await;
return;
};
let stream_name = nats.stream_name().to_string();
let consumer = format!("email-worker-{}", uuid::Uuid::new_v4());
tracing::info!(consumer = %consumer, "email worker starting");
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
tracing::info!("email worker shutting down");
break;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
result = email_consume_once(&nats, &stream_name, &consumer, &send_fn) => {
match result {
Ok(0) => {}
Ok(n) => tracing::debug!(n = n, "email batch processed"),
Err(e) => {
tracing::error!(error = %e, "email JetStream consume error");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
}
}
}
async fn email_consume_once(
nats: &crate::NatsClient,
stream_name: &str,
consumer_name: &str,
send_fn: &EmailSendFn,
) -> anyhow::Result<usize> {
let stream = nats
.jetstream
.get_stream(stream_name)
.await
.map_err(|e| anyhow::anyhow!("get stream failed: {}", e))?;
let pull_config = async_nats::jetstream::consumer::pull::Config {
durable_name: Some(consumer_name.to_string()),
filter_subject: "email.queue".to_string(),
max_deliver: 3,
ack_wait: std::time::Duration::from_secs(10),
max_ack_pending: 256,
..Default::default()
};
let consumer = stream
.get_or_create_consumer(consumer_name, pull_config)
.await
.map_err(|e| anyhow::anyhow!("create consumer failed: {}", e))?;
let mut messages = consumer
.messages()
.await
.map_err(|e| anyhow::anyhow!("consumer messages failed: {}", e))?;
let mut batch: Vec<EmailEnvelope> = Vec::with_capacity(BATCH_SIZE);
let mut acks: Vec<async_nats::jetstream::message::Message> = Vec::new();
for _ in 0..BATCH_SIZE {
match tokio::time::timeout(std::time::Duration::from_millis(500), messages.next()).await {
Ok(Some(Ok(msg))) => match serde_json::from_slice::<EmailEnvelope>(&msg.payload) {
Ok(env) => {
batch.push(env);
acks.push(msg);
}
Err(e) => {
tracing::warn!(error = %e, "malformed email envelope");
let _ = msg.ack().await;
}
},
Ok(Some(Err(e))) => {
tracing::warn!(error = %e, "email message error");
}
Ok(None) => break,
Err(_) => break,
}
}
if batch.is_empty() {
return Ok(0);
}
let batch_size = batch.len();
counter!("email_consumed_total").increment(batch_size as u64);
counter!("email_batch_size").increment(batch_size as u64);
if let Err(e) = send_fn(batch).await {
tracing::error!(error = %e, "email send_fn failed — messages NOT acked (will retry)");
return Err(e);
}
for msg in &acks {
if let Err(e) = msg.ack().await {
tracing::warn!(error = %e, "NATS ack failed");
}
}
tracing::info!(n = batch_size, "email batch sent and acked");
Ok(batch_size)
}