gitdataai/libs/queue/worker.rs
2026-04-14 19:02:01 +08:00

295 lines
8.9 KiB
Rust

//! Redis Streams consumer — delegates persistence to the caller.
use crate::types::{EmailEnvelope, RoomMessageEnvelope};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
// Maximum number of stream entries fetched (and persisted) per batch.
const BATCH_SIZE: usize = 50;
// How long each worker loop sleeps between drain attempts.
const FLUSH_INTERVAL_SECS: u64 = 1;
// Per-room stream keys are `room:stream:<room_uuid>` (see `stream_key`).
const STREAM_PREFIX: &str = "room:stream";
// Single shared stream for outbound email envelopes.
const EMAIL_STREAM_KEY: &str = "email:stream";
// Consumer-group names; created idempotently with XGROUP CREATE … MKSTREAM.
const GROUP: &str = "room-worker";
const EMAIL_GROUP: &str = "email-worker";
/// Build the Redis stream key for one room: `room:stream:<room_uuid>`.
fn stream_key(room_id: uuid::Uuid) -> String {
    let mut key = String::from(STREAM_PREFIX);
    key.push(':');
    key.push_str(&room_id.to_string());
    key
}
/// Generate a process-unique consumer name for the room-worker group.
fn consumer_id() -> String {
    let id = uuid::Uuid::new_v4();
    format!("worker-{id}")
}
/// Future resolving to a pooled Redis cluster connection.
pub type RedisFuture =
Pin<Box<dyn Future<Output = anyhow::Result<deadpool_redis::cluster::Connection>> + Send>>;
/// Caller-supplied callback that persists a batch of room messages.
/// An `Err` return leaves the batch un-acked so it will be redelivered.
pub type PersistFn = Arc<dyn Fn(Vec<RoomMessageEnvelope>) -> PersistFut + Send + Sync>;
/// Future returned by a [`PersistFn`].
pub type PersistFut = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
/// Factory that yields a fresh Redis connection per call.
pub type GetRedis = Arc<dyn Fn() -> RedisFuture + Send + Sync>;
/// XREADGROUP reply shape: (stream name, [(entry id, flat field/value list)]).
pub type StreamEntries = Vec<(String, Vec<(String, Vec<String>)>)>;
/// Spawn one consumer task per room, then block until the shutdown signal
/// fires and every task has finished.
pub async fn start(
    room_ids: Vec<uuid::Uuid>,
    get_redis: GetRedis,
    persist_fn: PersistFn,
    mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
    log: slog::Logger,
) {
    let consumer = consumer_id();
    slog::info!(log, "room-message worker starting"; "consumer" => %consumer, "rooms" => ?room_ids);
    // One independent task per room; each gets its own shutdown subscription.
    let mut handles = Vec::with_capacity(room_ids.len());
    for room_id in room_ids {
        handles.push(tokio::spawn(room_worker_task(
            room_id,
            consumer.clone(),
            get_redis.clone(),
            persist_fn.clone(),
            shutdown_rx.resubscribe(),
            log.clone(),
        )));
    }
    // Wait for shutdown, then drain every task before returning.
    let _ = shutdown_rx.recv().await;
    for handle in handles {
        let _ = handle.await;
    }
}
pub async fn room_worker_task(
room_id: uuid::Uuid,
consumer: String,
get_redis: GetRedis,
persist_fn: PersistFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
log: slog::Logger,
) {
let key = stream_key(room_id);
slog::info!(log, "room worker task started"; "room_id" => %room_id);
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
slog::info!(log, "room worker task shutting down"; "room_id" => %room_id);
break;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
}
match run_once(&key, &consumer, &get_redis, &persist_fn, &log).await {
Ok(0) => {}
Ok(n) => slog::debug!(log, "batch flushed"; "room_id" => %room_id, "n" => n),
Err(e) => {
slog::error!(log, "stream consume error"; "room_id" => %room_id, "error" => %e);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
}
/// Drain one batch from `stream_key` for the `room-worker` group, hand the
/// decoded envelopes to `persist_fn`, and XACK them only after persistence
/// succeeds — giving at-least-once delivery (a failed persist leaves the
/// entries pending for redelivery).
///
/// Returns the number of entries persisted and acked; `Ok(0)` when the
/// stream was idle or every entry was malformed.
async fn run_once(
    stream_key: &str,
    consumer: &str,
    get_redis: &GetRedis,
    persist_fn: &PersistFn,
    log: &slog::Logger,
) -> anyhow::Result<usize> {
    let mut conn: deadpool_redis::cluster::Connection = (get_redis)().await?;
    // Idempotent group creation; the expected BUSYGROUP error on subsequent
    // calls is deliberately ignored.
    let _: Result<(), _> = redis::cmd("XGROUP")
        .arg("CREATE")
        .arg(stream_key)
        .arg(GROUP)
        .arg("0")
        .arg("MKSTREAM")
        .query_async(&mut conn)
        .await;
    // NOTE(review): `unwrap_or_default()` swallows *all* errors here, not
    // just the nil reply from a BLOCK timeout — real connection/protocol
    // failures surface only as an empty batch. Consider logging the Err arm.
    let results: StreamEntries = redis::cmd("XREADGROUP")
        .arg("GROUP")
        .arg(GROUP)
        .arg(consumer)
        .arg("COUNT")
        .arg(BATCH_SIZE)
        .arg("BLOCK")
        .arg(1000)
        .arg("STREAMS")
        .arg(stream_key)
        .arg(">")
        .query_async(&mut conn)
        .await
        .unwrap_or_default();
    if results.is_empty() {
        return Ok(0);
    }
    // Decode each entry's "data" field; malformed entries are logged and
    // skipped (they stay un-acked in the PEL).
    let mut batch: Vec<(String, RoomMessageEnvelope)> = Vec::with_capacity(BATCH_SIZE);
    for (_stream_name, entries) in results {
        for (entry_id, field_values) in entries {
            if let Some(data) = extract_field(&field_values, "data") {
                match serde_json::from_str::<RoomMessageEnvelope>(&data) {
                    Ok(env) => batch.push((entry_id, env)),
                    Err(e) => slog::warn!(log, "malformed envelope";
                        "entry_id" => %entry_id, "error" => %e),
                }
            }
        }
    }
    if batch.is_empty() {
        return Ok(0);
    }
    let entry_ids: Vec<String> = batch.iter().map(|(id, _)| id.clone()).collect();
    let envelopes: Vec<RoomMessageEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
    if let Err(e) = persist_fn(envelopes).await {
        slog::error!(log, "persist_fn failed — entries NOT acked (will retry)"; "error" => %e);
        return Err(e);
    }
    // Ack the entire batch in a single round trip: XACK accepts multiple
    // entry ids, so there is no need for one command per entry.
    let mut conn: deadpool_redis::cluster::Connection = (get_redis)().await?;
    let _: Result<(), _> = redis::cmd("XACK")
        .arg(stream_key)
        .arg(GROUP)
        .arg(&entry_ids)
        .query_async(&mut conn)
        .await;
    slog::info!(log, "batch persisted and acked"; "n" => entry_ids.len());
    Ok(entry_ids.len())
}
/// Look up `key` in a flat `[field, value, field, value, …]` list (the shape
/// Redis returns stream entry fields in) and return the first matching value.
/// A trailing unpaired field is ignored, matching the pairwise scan.
fn extract_field(values: &[String], key: &str) -> Option<String> {
    values
        .chunks_exact(2)
        .find(|pair| pair[0] == key)
        .map(|pair| pair[1].clone())
}
/// Caller-supplied callback that sends a batch of emails.
/// An `Err` return leaves the batch un-acked so it will be redelivered.
pub type EmailSendFn = Arc<dyn Fn(Vec<EmailEnvelope>) -> EmailSendFut + Send + Sync>;
/// Future returned by an [`EmailSendFn`].
pub type EmailSendFut = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
/// Start the email worker that consumes from the email stream.
pub async fn start_email_worker(
get_redis: GetRedis,
send_fn: EmailSendFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
log: slog::Logger,
) {
let consumer = format!("email-worker-{}", uuid::Uuid::new_v4());
slog::info!(log, "email worker starting"; "consumer" => %consumer);
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
slog::info!(log, "email worker shutting down");
break;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
}
match email_run_once(&consumer, &get_redis, &send_fn, &log).await {
Ok(0) => {}
Ok(n) => slog::debug!(log, "email batch processed"; "n" => n),
Err(e) => {
slog::error!(log, "email stream consume error"; "error" => %e);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
}
/// Drain one batch from the email stream for the `email-worker` group, hand
/// the decoded envelopes to `send_fn`, and XACK them only after the send
/// succeeds — giving at-least-once delivery (a failed send leaves the
/// entries pending for redelivery).
///
/// Returns the number of entries sent and acked; `Ok(0)` when the stream was
/// idle or every entry was malformed.
async fn email_run_once(
    consumer: &str,
    get_redis: &GetRedis,
    send_fn: &EmailSendFn,
    log: &slog::Logger,
) -> anyhow::Result<usize> {
    let mut conn: deadpool_redis::cluster::Connection = (get_redis)().await?;
    // Idempotent group creation; the expected BUSYGROUP error on subsequent
    // calls is deliberately ignored.
    let _: Result<(), _> = redis::cmd("XGROUP")
        .arg("CREATE")
        .arg(EMAIL_STREAM_KEY)
        .arg(EMAIL_GROUP)
        .arg("0")
        .arg("MKSTREAM")
        .query_async(&mut conn)
        .await;
    // NOTE(review): `unwrap_or_default()` swallows *all* errors here, not
    // just the nil reply from a BLOCK timeout — real connection/protocol
    // failures surface only as an empty batch. Consider logging the Err arm.
    let results: StreamEntries = redis::cmd("XREADGROUP")
        .arg("GROUP")
        .arg(EMAIL_GROUP)
        .arg(consumer)
        .arg("COUNT")
        .arg(BATCH_SIZE)
        .arg("BLOCK")
        .arg(1000)
        .arg("STREAMS")
        .arg(EMAIL_STREAM_KEY)
        .arg(">")
        .query_async(&mut conn)
        .await
        .unwrap_or_default();
    if results.is_empty() {
        return Ok(0);
    }
    // Decode each entry's "data" field; malformed entries are logged and
    // skipped (they stay un-acked in the PEL).
    let mut batch: Vec<(String, EmailEnvelope)> = Vec::with_capacity(BATCH_SIZE);
    for (_stream_name, entries) in results {
        for (entry_id, field_values) in entries {
            if let Some(data) = extract_field(&field_values, "data") {
                match serde_json::from_str::<EmailEnvelope>(&data) {
                    Ok(env) => batch.push((entry_id, env)),
                    Err(e) => slog::warn!(log, "malformed email envelope";
                        "entry_id" => %entry_id, "error" => %e),
                }
            }
        }
    }
    if batch.is_empty() {
        return Ok(0);
    }
    let entry_ids: Vec<String> = batch.iter().map(|(id, _)| id.clone()).collect();
    let envelopes: Vec<EmailEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
    if let Err(e) = send_fn(envelopes).await {
        slog::error!(log, "email send_fn failed — entries NOT acked (will retry)"; "error" => %e);
        return Err(e);
    }
    // Ack the entire batch in a single round trip: XACK accepts multiple
    // entry ids, so there is no need for one command per entry.
    let mut conn: deadpool_redis::cluster::Connection = (get_redis)().await?;
    let _: Result<(), _> = redis::cmd("XACK")
        .arg(EMAIL_STREAM_KEY)
        .arg(EMAIL_GROUP)
        .arg(&entry_ids)
        .query_async(&mut conn)
        .await;
    slog::info!(log, "email batch sent and acked"; "n" => entry_ids.len());
    Ok(entry_ids.len())
}