//! Room message worker: NATS JetStream durable pull consumer. use crate::types::{EmailEnvelope, RoomMessageEvent, RoomMessageEnvelope}; use futures::StreamExt; use metrics::counter; use std::sync::Arc; const BATCH_SIZE: usize = 50; const FLUSH_INTERVAL_SECS: u64 = 1; /// NATS consumer function type: returns (message_data, ack_fn) pairs. pub type NatsConsumeFn = Arc< dyn Fn( String, usize, ) -> std::pin::Pin< Box< dyn std::future::Future< Output = anyhow::Result< Vec<( Vec, Box std::pin::Pin> + Send>> + Send>, )>, >, > + Send, >, > + Send + Sync, >; /// Function that persists a batch of room message envelopes to the database. pub type PersistFn = Arc) -> PersistFut + Send + Sync>; pub type PersistFut = std::pin::Pin> + Send>>; /// Start the room message worker that consumes from NATS JetStream per room. pub async fn start( nats: Option>, _persist_fn: PersistFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, ) { let Some(_nats) = nats else { tracing::warn!("NATS not available, room workers disabled"); let _ = shutdown_rx.recv().await; return; }; tracing::info!("room-message worker starting"); loop { tokio::select! { _ = shutdown_rx.recv() => { tracing::info!("room-message worker shutting down"); break; } _ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {} } // Workers are spawned lazily per room when rooms are accessed. // This start() just keeps the worker alive; actual room workers // are spawned by spawn_room_workers() in workers_spawn.rs. } } /// Room worker that consumes from NATS JetStream for a specific room. pub async fn room_worker_task( room_id: uuid::Uuid, nats: Arc, persist_fn: PersistFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, ) { let stream_name = nats.stream_name().to_string(); let subject = format!("room.message.{}", room_id); let durable = format!("worker-{}", room_id); tracing::info!(room_id = %room_id, durable = %durable, subject = %subject, "room worker task starting"); loop { tokio::select! { _ = shutdown_rx.recv() => { tracing::info!(room_id = %room_id, "room worker task shutting down"); break; } result = consume_once(&nats, &stream_name, &durable, &subject, &persist_fn) => { match result { Ok(0) => {} Ok(n) => tracing::debug!(room_id = %room_id, n = n, "batch persisted"), Err(e) => { tracing::error!(room_id = %room_id, error = %e, "JetStream consume error"); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } } } } } async fn consume_once( nats: &crate::NatsClient, stream_name: &str, durable: &str, subject: &str, persist_fn: &PersistFn, ) -> anyhow::Result { let stream = nats .jetstream .get_stream(stream_name) .await .map_err(|e| anyhow::anyhow!("get stream failed: {}", e))?; let pull_config = async_nats::jetstream::consumer::pull::Config { durable_name: Some(durable.to_string()), filter_subject: subject.to_string(), max_deliver: 3, ack_wait: std::time::Duration::from_secs(10), max_ack_pending: 256, ..Default::default() }; let consumer = stream .get_or_create_consumer(durable, pull_config) .await .map_err(|e| anyhow::anyhow!("create consumer failed: {}", e))?; let mut messages = consumer .messages() .await .map_err(|e| anyhow::anyhow!("consumer messages failed: {}", e))?; let mut batch: Vec = Vec::with_capacity(BATCH_SIZE); let mut acks: Vec = Vec::new(); // Fetch up to BATCH_SIZE messages for _ in 0..BATCH_SIZE { match tokio::time::timeout( std::time::Duration::from_millis(500), messages.next(), ) .await { Ok(Some(Ok(msg))) => { match serde_json::from_slice::(&msg.payload) { Ok(event) => { let env = RoomMessageEnvelope::from(event); batch.push(env); acks.push(msg); } Err(e) => { tracing::warn!(error = %e, "malformed envelope"); let _ = msg.ack().await; } } } Ok(Some(Err(e))) => { tracing::warn!(error = %e, "message error"); } Ok(None) => break, Err(_) => break, } } if batch.is_empty() { return Ok(0); } let batch_size = batch.len(); if let Err(e) = persist_fn(batch).await { tracing::error!(error = %e, "persist_fn failed — entries NOT acked (will retry)"); return Err(e); } for msg in &acks { if let Err(e) = msg.ack().await { tracing::warn!(error = %e, "ack failed"); } } tracing::info!(n = batch_size, "batch persisted and acked"); Ok(batch_size) } /// Email send function type. pub type EmailSendFn = Arc) -> EmailSendFut + Send + Sync>; pub type EmailSendFut = std::pin::Pin> + Send>>; /// Start the email worker that consumes from NATS JetStream. pub async fn start_email_worker( nats: Option>, send_fn: EmailSendFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, ) { let Some(nats) = nats else { tracing::warn!("NATS not available, email worker disabled"); let _ = shutdown_rx.recv().await; return; }; let stream_name = nats.stream_name().to_string(); let consumer = format!("email-worker-{}", uuid::Uuid::new_v4()); tracing::info!(consumer = %consumer, "email worker starting"); loop { tokio::select! { _ = shutdown_rx.recv() => { tracing::info!("email worker shutting down"); break; } _ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {} result = email_consume_once(&nats, &stream_name, &consumer, &send_fn) => { match result { Ok(0) => {} Ok(n) => tracing::debug!(n = n, "email batch processed"), Err(e) => { tracing::error!(error = %e, "email JetStream consume error"); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } } } } } async fn email_consume_once( nats: &crate::NatsClient, stream_name: &str, consumer_name: &str, send_fn: &EmailSendFn, ) -> anyhow::Result { let stream = nats .jetstream .get_stream(stream_name) .await .map_err(|e| anyhow::anyhow!("get stream failed: {}", e))?; let pull_config = async_nats::jetstream::consumer::pull::Config { durable_name: Some(consumer_name.to_string()), filter_subject: "email.queue".to_string(), max_deliver: 3, ack_wait: std::time::Duration::from_secs(10), max_ack_pending: 256, ..Default::default() }; let consumer = stream .get_or_create_consumer(consumer_name, pull_config) .await .map_err(|e| anyhow::anyhow!("create consumer failed: {}", e))?; let mut messages = consumer .messages() .await .map_err(|e| anyhow::anyhow!("consumer messages failed: {}", e))?; let mut batch: Vec = Vec::with_capacity(BATCH_SIZE); let mut acks: Vec = Vec::new(); for _ in 0..BATCH_SIZE { match tokio::time::timeout( std::time::Duration::from_millis(500), messages.next(), ) .await { Ok(Some(Ok(msg))) => { match serde_json::from_slice::(&msg.payload) { Ok(env) => { batch.push(env); acks.push(msg); } Err(e) => { tracing::warn!(error = %e, "malformed email envelope"); let _ = msg.ack().await; } } } Ok(Some(Err(e))) => { tracing::warn!(error = %e, "email message error"); } Ok(None) => break, Err(_) => break, } } if batch.is_empty() { return Ok(0); } let batch_size = batch.len(); counter!("email_consumed_total").increment(batch_size as u64); counter!("email_batch_size").increment(batch_size as u64); if let Err(e) = send_fn(batch).await { tracing::error!(error = %e, "email send_fn failed — messages NOT acked (will retry)"); return Err(e); } for msg in &acks { if let Err(e) = msg.ack().await { tracing::warn!(error = %e, "NATS ack failed"); } } tracing::info!(n = batch_size, "email batch sent and acked"); Ok(batch_size) }