295 lines
8.9 KiB
Rust
295 lines
8.9 KiB
Rust
//! Redis Streams consumer — delegates persistence to the caller.
|
|
|
|
use crate::types::{EmailEnvelope, RoomMessageEnvelope};
|
|
use std::future::Future;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
|
|
const BATCH_SIZE: usize = 50;
|
|
const FLUSH_INTERVAL_SECS: u64 = 1;
|
|
const STREAM_PREFIX: &str = "room:stream";
|
|
const EMAIL_STREAM_KEY: &str = "email:stream";
|
|
const GROUP: &str = "room-worker";
|
|
const EMAIL_GROUP: &str = "email-worker";
|
|
|
|
fn stream_key(room_id: uuid::Uuid) -> String {
|
|
format!("{STREAM_PREFIX}:{room_id}")
|
|
}
|
|
|
|
fn consumer_id() -> String {
|
|
format!("worker-{}", uuid::Uuid::new_v4())
|
|
}
|
|
|
|
pub type RedisFuture =
|
|
Pin<Box<dyn Future<Output = anyhow::Result<deadpool_redis::cluster::Connection>> + Send>>;
|
|
pub type PersistFn = Arc<dyn Fn(Vec<RoomMessageEnvelope>) -> PersistFut + Send + Sync>;
|
|
pub type PersistFut = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
|
|
pub type GetRedis = Arc<dyn Fn() -> RedisFuture + Send + Sync>;
|
|
pub type StreamEntries = Vec<(String, Vec<(String, Vec<String>)>)>;
|
|
|
|
pub async fn start(
|
|
room_ids: Vec<uuid::Uuid>,
|
|
get_redis: GetRedis,
|
|
persist_fn: PersistFn,
|
|
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
|
|
log: slog::Logger,
|
|
) {
|
|
let consumer = consumer_id();
|
|
slog::info!(log, "room-message worker starting"; "consumer" => %consumer, "rooms" => ?room_ids);
|
|
|
|
let handles: Vec<_> = room_ids
|
|
.into_iter()
|
|
.map(|room_id| {
|
|
let get_redis = get_redis.clone();
|
|
let persist_fn = persist_fn.clone();
|
|
let shutdown = shutdown_rx.resubscribe();
|
|
let consumer = consumer.clone();
|
|
let log = log.clone();
|
|
|
|
tokio::spawn(room_worker_task(
|
|
room_id, consumer, get_redis, persist_fn, shutdown, log,
|
|
))
|
|
})
|
|
.collect();
|
|
|
|
let _ = shutdown_rx.recv().await;
|
|
|
|
for h in handles {
|
|
let _ = h.await;
|
|
}
|
|
}
|
|
|
|
pub async fn room_worker_task(
|
|
room_id: uuid::Uuid,
|
|
consumer: String,
|
|
get_redis: GetRedis,
|
|
persist_fn: PersistFn,
|
|
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
|
|
log: slog::Logger,
|
|
) {
|
|
let key = stream_key(room_id);
|
|
slog::info!(log, "room worker task started"; "room_id" => %room_id);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
_ = shutdown_rx.recv() => {
|
|
slog::info!(log, "room worker task shutting down"; "room_id" => %room_id);
|
|
break;
|
|
}
|
|
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
|
|
}
|
|
|
|
match run_once(&key, &consumer, &get_redis, &persist_fn, &log).await {
|
|
Ok(0) => {}
|
|
Ok(n) => slog::debug!(log, "batch flushed"; "room_id" => %room_id, "n" => n),
|
|
Err(e) => {
|
|
slog::error!(log, "stream consume error"; "room_id" => %room_id, "error" => %e);
|
|
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn run_once(
|
|
stream_key: &str,
|
|
consumer: &str,
|
|
get_redis: &GetRedis,
|
|
persist_fn: &PersistFn,
|
|
log: &slog::Logger,
|
|
) -> anyhow::Result<usize> {
|
|
let redis = (get_redis)().await?;
|
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
|
|
let _: Result<(), _> = redis::cmd("XGROUP")
|
|
.arg("CREATE")
|
|
.arg(stream_key)
|
|
.arg(GROUP)
|
|
.arg("0")
|
|
.arg("MKSTREAM")
|
|
.query_async(&mut conn)
|
|
.await;
|
|
|
|
let results: StreamEntries = redis::cmd("XREADGROUP")
|
|
.arg("GROUP")
|
|
.arg(GROUP)
|
|
.arg(consumer)
|
|
.arg("COUNT")
|
|
.arg(BATCH_SIZE)
|
|
.arg("BLOCK")
|
|
.arg(1000)
|
|
.arg("STREAMS")
|
|
.arg(stream_key)
|
|
.arg(">")
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or_default();
|
|
|
|
if results.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let mut batch: Vec<(String, RoomMessageEnvelope)> = Vec::with_capacity(BATCH_SIZE);
|
|
for (_stream_name, entries) in results {
|
|
for (entry_id, field_values) in entries {
|
|
if let Some(data) = extract_field(&field_values, "data") {
|
|
match serde_json::from_str::<RoomMessageEnvelope>(&data) {
|
|
Ok(env) => batch.push((entry_id, env)),
|
|
Err(e) => slog::warn!(log, "malformed envelope";
|
|
"entry_id" => %entry_id, "error" => %e),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if batch.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let entry_ids: Vec<String> = batch.iter().map(|(id, _)| id.clone()).collect();
|
|
let envelopes: Vec<RoomMessageEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
|
|
|
|
if let Err(e) = persist_fn(envelopes).await {
|
|
slog::error!(log, "persist_fn failed — entries NOT acked (will retry)"; "error" => %e);
|
|
return Err(e);
|
|
}
|
|
|
|
let redis = (get_redis)().await?;
|
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
for entry_id in &entry_ids {
|
|
let _: Result<(), _> = redis::cmd("XACK")
|
|
.arg(stream_key)
|
|
.arg(GROUP)
|
|
.arg(entry_id)
|
|
.query_async(&mut conn)
|
|
.await;
|
|
}
|
|
|
|
slog::info!(log, "batch persisted and acked"; "n" => entry_ids.len());
|
|
Ok(entry_ids.len())
|
|
}
|
|
|
|
fn extract_field(values: &[String], key: &str) -> Option<String> {
|
|
let mut it = values.iter();
|
|
loop {
|
|
let field = match it.next() {
|
|
Some(f) => f,
|
|
None => return None,
|
|
};
|
|
let value = it.next()?;
|
|
if field == key {
|
|
return Some(value.clone());
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Email send function type.
|
|
pub type EmailSendFn = Arc<dyn Fn(Vec<EmailEnvelope>) -> EmailSendFut + Send + Sync>;
|
|
pub type EmailSendFut = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
|
|
|
|
/// Start the email worker that consumes from the email stream.
|
|
pub async fn start_email_worker(
|
|
get_redis: GetRedis,
|
|
send_fn: EmailSendFn,
|
|
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
|
|
log: slog::Logger,
|
|
) {
|
|
let consumer = format!("email-worker-{}", uuid::Uuid::new_v4());
|
|
slog::info!(log, "email worker starting"; "consumer" => %consumer);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
_ = shutdown_rx.recv() => {
|
|
slog::info!(log, "email worker shutting down");
|
|
break;
|
|
}
|
|
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
|
|
}
|
|
|
|
match email_run_once(&consumer, &get_redis, &send_fn, &log).await {
|
|
Ok(0) => {}
|
|
Ok(n) => slog::debug!(log, "email batch processed"; "n" => n),
|
|
Err(e) => {
|
|
slog::error!(log, "email stream consume error"; "error" => %e);
|
|
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn email_run_once(
|
|
consumer: &str,
|
|
get_redis: &GetRedis,
|
|
send_fn: &EmailSendFn,
|
|
log: &slog::Logger,
|
|
) -> anyhow::Result<usize> {
|
|
let redis = (get_redis)().await?;
|
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
|
|
let _: Result<(), _> = redis::cmd("XGROUP")
|
|
.arg("CREATE")
|
|
.arg(EMAIL_STREAM_KEY)
|
|
.arg(EMAIL_GROUP)
|
|
.arg("0")
|
|
.arg("MKSTREAM")
|
|
.query_async(&mut conn)
|
|
.await;
|
|
|
|
let results: StreamEntries = redis::cmd("XREADGROUP")
|
|
.arg("GROUP")
|
|
.arg(EMAIL_GROUP)
|
|
.arg(consumer)
|
|
.arg("COUNT")
|
|
.arg(BATCH_SIZE)
|
|
.arg("BLOCK")
|
|
.arg(1000)
|
|
.arg("STREAMS")
|
|
.arg(EMAIL_STREAM_KEY)
|
|
.arg(">")
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or_default();
|
|
|
|
if results.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let mut batch: Vec<(String, EmailEnvelope)> = Vec::with_capacity(BATCH_SIZE);
|
|
for (_stream_name, entries) in results {
|
|
for (entry_id, field_values) in entries {
|
|
if let Some(data) = extract_field(&field_values, "data") {
|
|
match serde_json::from_str::<EmailEnvelope>(&data) {
|
|
Ok(env) => batch.push((entry_id, env)),
|
|
Err(e) => slog::warn!(log, "malformed email envelope";
|
|
"entry_id" => %entry_id, "error" => %e),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if batch.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let entry_ids: Vec<String> = batch.iter().map(|(id, _)| id.clone()).collect();
|
|
let envelopes: Vec<EmailEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
|
|
|
|
if let Err(e) = send_fn(envelopes).await {
|
|
slog::error!(log, "email send_fn failed — entries NOT acked (will retry)"; "error" => %e);
|
|
return Err(e);
|
|
}
|
|
|
|
let redis = (get_redis)().await?;
|
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
for entry_id in &entry_ids {
|
|
let _: Result<(), _> = redis::cmd("XACK")
|
|
.arg(EMAIL_STREAM_KEY)
|
|
.arg(EMAIL_GROUP)
|
|
.arg(entry_id)
|
|
.query_async(&mut conn)
|
|
.await;
|
|
}
|
|
|
|
slog::info!(log, "email batch sent and acked"; "n" => entry_ids.len());
|
|
Ok(entry_ids.len())
|
|
}
|