gitdataai/libs/queue/worker.rs
ZhenYi 9a03cded8d refactor(queue): migrate from slog to tracing
- Remove all slog::Logger imports and log fields from RedisPubSub,
  MessageProducer structs
- Remove log parameter from MessageProducer::new()
- Replace slog::info!/warn!/error! with tracing equivalents
- worker.rs: remove log: slog::Logger from start(), room_worker_task(),
  start_email_worker(), run_once(), email_run_once()
2026-04-21 22:29:09 +08:00

287 lines
8.6 KiB
Rust

//! Redis Streams consumer — delegates persistence to the caller.
use crate::types::{EmailEnvelope, RoomMessageEnvelope};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
/// Maximum number of entries requested per `XREADGROUP` call.
const BATCH_SIZE: usize = 50;
/// Pause, in seconds, between consume cycles of each worker loop.
const FLUSH_INTERVAL_SECS: u64 = 1;

/// Key prefix of the per-room message streams (`room:stream:<room_id>`).
const STREAM_PREFIX: &str = "room:stream";
/// Consumer group that reads the room streams.
const GROUP: &str = "room-worker";

/// Key of the stream carrying email envelopes.
const EMAIL_STREAM_KEY: &str = "email:stream";
/// Consumer group that reads the email stream.
const EMAIL_GROUP: &str = "email-worker";
/// Redis key of the stream for `room_id`: `STREAM_PREFIX`, a colon, then the uuid's
/// `Display` form.
fn stream_key(room_id: uuid::Uuid) -> String {
    let mut key = String::from(STREAM_PREFIX);
    key.push(':');
    key.push_str(&room_id.to_string());
    key
}
/// Builds a fresh consumer name of the form `worker-<random v4 uuid>`.
fn consumer_id() -> String {
    let suffix = uuid::Uuid::new_v4();
    format!("worker-{suffix}")
}
/// Boxed future resolving to a pooled Redis Cluster connection.
pub type RedisFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<deadpool_redis::cluster::Connection>> + Send>>;
/// Shareable factory producing a new [`RedisFuture`] on each call.
pub type GetRedis = Arc<dyn Fn() -> RedisFuture + Send + Sync>;

/// Boxed future returned by a [`PersistFn`].
pub type PersistFut = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
/// Caller-supplied callback that receives a batch of decoded room-message envelopes.
pub type PersistFn = Arc<dyn Fn(Vec<RoomMessageEnvelope>) -> PersistFut + Send + Sync>;

/// Decoded `XREADGROUP` reply: `(stream name, [(entry id, flat field/value list)])`.
pub type StreamEntries = Vec<(String, Vec<(String, Vec<String>)>)>;
pub async fn start(
room_ids: Vec<uuid::Uuid>,
get_redis: GetRedis,
persist_fn: PersistFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
let consumer = consumer_id();
tracing::info!(consumer = %consumer, rooms = ?room_ids, "room-message worker starting");
let handles: Vec<_> = room_ids
.into_iter()
.map(|room_id| {
let get_redis = get_redis.clone();
let persist_fn = persist_fn.clone();
let shutdown = shutdown_rx.resubscribe();
let consumer = consumer.clone();
tokio::spawn(room_worker_task(
room_id, consumer, get_redis, persist_fn, shutdown,
))
})
.collect();
let _ = shutdown_rx.recv().await;
for h in handles {
let _ = h.await;
}
}
pub async fn room_worker_task(
room_id: uuid::Uuid,
consumer: String,
get_redis: GetRedis,
persist_fn: PersistFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
let key = stream_key(room_id);
tracing::info!(room_id = %room_id, "room worker task started");
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
tracing::info!(room_id = %room_id, "room worker task shutting down");
break;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
}
match run_once(&key, &consumer, &get_redis, &persist_fn).await {
Ok(0) => {}
Ok(n) => tracing::debug!(room_id = %room_id, n = n, "batch flushed"),
Err(e) => {
tracing::error!(room_id = %room_id, error = %e, "stream consume error");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
}
async fn run_once(
stream_key: &str,
consumer: &str,
get_redis: &GetRedis,
persist_fn: &PersistFn,
) -> anyhow::Result<usize> {
let redis = (get_redis)().await?;
let mut conn: deadpool_redis::cluster::Connection = redis;
let _: Result<(), _> = redis::cmd("XGROUP")
.arg("CREATE")
.arg(stream_key)
.arg(GROUP)
.arg("0")
.arg("MKSTREAM")
.query_async(&mut conn)
.await;
let results: StreamEntries = redis::cmd("XREADGROUP")
.arg("GROUP")
.arg(GROUP)
.arg(consumer)
.arg("COUNT")
.arg(BATCH_SIZE)
.arg("BLOCK")
.arg(1000)
.arg("STREAMS")
.arg(stream_key)
.arg(">")
.query_async(&mut conn)
.await
.unwrap_or_default();
if results.is_empty() {
return Ok(0);
}
let mut batch: Vec<(String, RoomMessageEnvelope)> = Vec::with_capacity(BATCH_SIZE);
for (_stream_name, entries) in results {
for (entry_id, field_values) in entries {
if let Some(data) = extract_field(&field_values, "data") {
match serde_json::from_str::<RoomMessageEnvelope>(&data) {
Ok(env) => batch.push((entry_id, env)),
Err(e) => tracing::warn!(entry_id = %entry_id, error = %e, "malformed envelope"),
}
}
}
}
if batch.is_empty() {
return Ok(0);
}
let entry_ids: Vec<String> = batch.iter().map(|(id, _)| id.clone()).collect();
let envelopes: Vec<RoomMessageEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
if let Err(e) = persist_fn(envelopes).await {
tracing::error!(error = %e, "persist_fn failed — entries NOT acked (will retry)");
return Err(e);
}
let redis = (get_redis)().await?;
let mut conn: deadpool_redis::cluster::Connection = redis;
for entry_id in &entry_ids {
let _: Result<(), _> = redis::cmd("XACK")
.arg(stream_key)
.arg(GROUP)
.arg(entry_id)
.query_async(&mut conn)
.await;
}
tracing::info!(n = entry_ids.len(), "batch persisted and acked");
Ok(entry_ids.len())
}
/// Looks up `key` in a flat `[field, value, field, value, ...]` list, as returned for a
/// stream entry, and returns a copy of the value of its first occurrence.
///
/// Only even positions are compared against `key`. A trailing field without a value is
/// ignored. Returns `None` when the key is absent.
fn extract_field(values: &[String], key: &str) -> Option<String> {
    values
        .chunks_exact(2)
        .find(|pair| pair[0] == key)
        .map(|pair| pair[1].clone())
}
/// Boxed future returned by an [`EmailSendFn`].
pub type EmailSendFut = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
/// Caller-supplied callback that receives a batch of decoded email envelopes.
pub type EmailSendFn = Arc<dyn Fn(Vec<EmailEnvelope>) -> EmailSendFut + Send + Sync>;
/// Start the email worker that consumes from the email stream.
pub async fn start_email_worker(
get_redis: GetRedis,
send_fn: EmailSendFn,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
let consumer = format!("email-worker-{}", uuid::Uuid::new_v4());
tracing::info!(consumer = %consumer, "email worker starting");
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
tracing::info!("email worker shutting down");
break;
}
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
}
match email_run_once(&consumer, &get_redis, &send_fn).await {
Ok(0) => {}
Ok(n) => tracing::debug!(n = n, "email batch processed"),
Err(e) => {
tracing::error!(error = %e, "email stream consume error");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
}
}
async fn email_run_once(
consumer: &str,
get_redis: &GetRedis,
send_fn: &EmailSendFn,
) -> anyhow::Result<usize> {
let redis = (get_redis)().await?;
let mut conn: deadpool_redis::cluster::Connection = redis;
let _: Result<(), _> = redis::cmd("XGROUP")
.arg("CREATE")
.arg(EMAIL_STREAM_KEY)
.arg(EMAIL_GROUP)
.arg("0")
.arg("MKSTREAM")
.query_async(&mut conn)
.await;
let results: StreamEntries = redis::cmd("XREADGROUP")
.arg("GROUP")
.arg(EMAIL_GROUP)
.arg(consumer)
.arg("COUNT")
.arg(BATCH_SIZE)
.arg("BLOCK")
.arg(1000)
.arg("STREAMS")
.arg(EMAIL_STREAM_KEY)
.arg(">")
.query_async(&mut conn)
.await
.unwrap_or_default();
if results.is_empty() {
return Ok(0);
}
let mut batch: Vec<(String, EmailEnvelope)> = Vec::with_capacity(BATCH_SIZE);
for (_stream_name, entries) in results {
for (entry_id, field_values) in entries {
if let Some(data) = extract_field(&field_values, "data") {
match serde_json::from_str::<EmailEnvelope>(&data) {
Ok(env) => batch.push((entry_id, env)),
Err(e) => tracing::warn!(entry_id = %entry_id, error = %e, "malformed email envelope"),
}
}
}
}
if batch.is_empty() {
return Ok(0);
}
let entry_ids: Vec<String> = batch.iter().map(|(id, _)| id.clone()).collect();
let envelopes: Vec<EmailEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
if let Err(e) = send_fn(envelopes).await {
tracing::error!(error = %e, "email send_fn failed — entries NOT acked (will retry)");
return Err(e);
}
let redis = (get_redis)().await?;
let mut conn: deadpool_redis::cluster::Connection = redis;
for entry_id in &entry_ids {
let _: Result<(), _> = redis::cmd("XACK")
.arg(EMAIL_STREAM_KEY)
.arg(EMAIL_GROUP)
.arg(entry_id)
.query_async(&mut conn)
.await;
}
tracing::info!(n = entry_ids.len(), "email batch sent and acked");
Ok(entry_ids.len())
}