//! Redis Streams consumer — delegates persistence to the caller. use crate::types::{EmailEnvelope, RoomMessageEnvelope}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; const BATCH_SIZE: usize = 50; const FLUSH_INTERVAL_SECS: u64 = 1; const STREAM_PREFIX: &str = "room:stream"; const EMAIL_STREAM_KEY: &str = "email:stream"; const GROUP: &str = "room-worker"; const EMAIL_GROUP: &str = "email-worker"; fn stream_key(room_id: uuid::Uuid) -> String { format!("{STREAM_PREFIX}:{room_id}") } fn consumer_id() -> String { format!("worker-{}", uuid::Uuid::new_v4()) } pub type RedisFuture = Pin> + Send>>; pub type PersistFn = Arc) -> PersistFut + Send + Sync>; pub type PersistFut = Pin> + Send>>; pub type GetRedis = Arc RedisFuture + Send + Sync>; pub type StreamEntries = Vec<(String, Vec<(String, Vec)>)>; pub async fn start( room_ids: Vec, get_redis: GetRedis, persist_fn: PersistFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, log: slog::Logger, ) { let consumer = consumer_id(); slog::info!(log, "room-message worker starting"; "consumer" => %consumer, "rooms" => ?room_ids); let handles: Vec<_> = room_ids .into_iter() .map(|room_id| { let get_redis = get_redis.clone(); let persist_fn = persist_fn.clone(); let shutdown = shutdown_rx.resubscribe(); let consumer = consumer.clone(); let log = log.clone(); tokio::spawn(room_worker_task( room_id, consumer, get_redis, persist_fn, shutdown, log, )) }) .collect(); let _ = shutdown_rx.recv().await; for h in handles { let _ = h.await; } } pub async fn room_worker_task( room_id: uuid::Uuid, consumer: String, get_redis: GetRedis, persist_fn: PersistFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, log: slog::Logger, ) { let key = stream_key(room_id); slog::info!(log, "room worker task started"; "room_id" => %room_id); loop { tokio::select! { _ = shutdown_rx.recv() => { slog::info!(log, "room worker task shutting down"; "room_id" => %room_id); break; } _ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {} } match run_once(&key, &consumer, &get_redis, &persist_fn, &log).await { Ok(0) => {} Ok(n) => slog::debug!(log, "batch flushed"; "room_id" => %room_id, "n" => n), Err(e) => { slog::error!(log, "stream consume error"; "room_id" => %room_id, "error" => %e); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } } } async fn run_once( stream_key: &str, consumer: &str, get_redis: &GetRedis, persist_fn: &PersistFn, log: &slog::Logger, ) -> anyhow::Result { let redis = (get_redis)().await?; let mut conn: deadpool_redis::cluster::Connection = redis; let _: Result<(), _> = redis::cmd("XGROUP") .arg("CREATE") .arg(stream_key) .arg(GROUP) .arg("0") .arg("MKSTREAM") .query_async(&mut conn) .await; let results: StreamEntries = redis::cmd("XREADGROUP") .arg("GROUP") .arg(GROUP) .arg(consumer) .arg("COUNT") .arg(BATCH_SIZE) .arg("BLOCK") .arg(1000) .arg("STREAMS") .arg(stream_key) .arg(">") .query_async(&mut conn) .await .unwrap_or_default(); if results.is_empty() { return Ok(0); } let mut batch: Vec<(String, RoomMessageEnvelope)> = Vec::with_capacity(BATCH_SIZE); for (_stream_name, entries) in results { for (entry_id, field_values) in entries { if let Some(data) = extract_field(&field_values, "data") { match serde_json::from_str::(&data) { Ok(env) => batch.push((entry_id, env)), Err(e) => slog::warn!(log, "malformed envelope"; "entry_id" => %entry_id, "error" => %e), } } } } if batch.is_empty() { return Ok(0); } let entry_ids: Vec = batch.iter().map(|(id, _)| id.clone()).collect(); let envelopes: Vec = batch.into_iter().map(|(_, e)| e).collect(); if let Err(e) = persist_fn(envelopes).await { slog::error!(log, "persist_fn failed — entries NOT acked (will retry)"; "error" => %e); return Err(e); } let redis = (get_redis)().await?; let mut conn: deadpool_redis::cluster::Connection = redis; for entry_id in &entry_ids { let _: Result<(), _> = redis::cmd("XACK") .arg(stream_key) .arg(GROUP) .arg(entry_id) .query_async(&mut conn) .await; } slog::info!(log, "batch persisted and acked"; "n" => entry_ids.len()); Ok(entry_ids.len()) } fn extract_field(values: &[String], key: &str) -> Option { let mut it = values.iter(); loop { let field = match it.next() { Some(f) => f, None => return None, }; let value = it.next()?; if field == key { return Some(value.clone()); } } } /// Email send function type. pub type EmailSendFn = Arc) -> EmailSendFut + Send + Sync>; pub type EmailSendFut = Pin> + Send>>; /// Start the email worker that consumes from the email stream. pub async fn start_email_worker( get_redis: GetRedis, send_fn: EmailSendFn, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, log: slog::Logger, ) { let consumer = format!("email-worker-{}", uuid::Uuid::new_v4()); slog::info!(log, "email worker starting"; "consumer" => %consumer); loop { tokio::select! { _ = shutdown_rx.recv() => { slog::info!(log, "email worker shutting down"); break; } _ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {} } match email_run_once(&consumer, &get_redis, &send_fn, &log).await { Ok(0) => {} Ok(n) => slog::debug!(log, "email batch processed"; "n" => n), Err(e) => { slog::error!(log, "email stream consume error"; "error" => %e); tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } } } async fn email_run_once( consumer: &str, get_redis: &GetRedis, send_fn: &EmailSendFn, log: &slog::Logger, ) -> anyhow::Result { let redis = (get_redis)().await?; let mut conn: deadpool_redis::cluster::Connection = redis; let _: Result<(), _> = redis::cmd("XGROUP") .arg("CREATE") .arg(EMAIL_STREAM_KEY) .arg(EMAIL_GROUP) .arg("0") .arg("MKSTREAM") .query_async(&mut conn) .await; let results: StreamEntries = redis::cmd("XREADGROUP") .arg("GROUP") .arg(EMAIL_GROUP) .arg(consumer) .arg("COUNT") .arg(BATCH_SIZE) .arg("BLOCK") .arg(1000) .arg("STREAMS") .arg(EMAIL_STREAM_KEY) .arg(">") .query_async(&mut conn) .await .unwrap_or_default(); if results.is_empty() { return Ok(0); } let mut batch: Vec<(String, EmailEnvelope)> = Vec::with_capacity(BATCH_SIZE); for (_stream_name, entries) in results { for (entry_id, field_values) in entries { if let Some(data) = extract_field(&field_values, "data") { match serde_json::from_str::(&data) { Ok(env) => batch.push((entry_id, env)), Err(e) => slog::warn!(log, "malformed email envelope"; "entry_id" => %entry_id, "error" => %e), } } } } if batch.is_empty() { return Ok(0); } let entry_ids: Vec = batch.iter().map(|(id, _)| id.clone()).collect(); let envelopes: Vec = batch.into_iter().map(|(_, e)| e).collect(); if let Err(e) = send_fn(envelopes).await { slog::error!(log, "email send_fn failed — entries NOT acked (will retry)"; "error" => %e); return Err(e); } let redis = (get_redis)().await?; let mut conn: deadpool_redis::cluster::Connection = redis; for entry_id in &entry_ids { let _: Result<(), _> = redis::cmd("XACK") .arg(EMAIL_STREAM_KEY) .arg(EMAIL_GROUP) .arg(entry_id) .query_async(&mut conn) .await; } slog::info!(log, "email batch sent and acked"; "n" => entry_ids.len()); Ok(entry_ids.len()) }