Health monitoring:
- gitserver: /health endpoint on port 8021 (DB + Redis ping)
- git-hook: hyper health server on port 8083 with /health
- email-worker: hyper health server on port 8084 with /health
- K8s probes updated to httpGet for all three services

Metrics (via /metrics endpoint):
- git-hook: hook_tasks_total/success/failed/locked/retried/exhausted, hook_sync_branches/tags_changed_total
- email: email_queued/consumed/sent/failed_total, email_validation_skipped/build_errors/send_attempts_total
292 lines
8.8 KiB
Rust
292 lines
8.8 KiB
Rust
//! Redis Streams consumer — delegates persistence to the caller.
|
|
|
|
use crate::types::{EmailEnvelope, RoomMessageEnvelope};
|
|
use metrics::counter;
|
|
use std::future::Future;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
|
|
/// Maximum number of stream entries requested per `XREADGROUP` call (passed as `COUNT`);
/// also used as the initial capacity of the per-iteration batch vector.
const BATCH_SIZE: usize = 50;
|
|
/// Seconds each worker loop sleeps (or waits for shutdown) before polling its stream.
const FLUSH_INTERVAL_SECS: u64 = 1;
|
|
/// Prefix of the per-room stream key; the full key is `{STREAM_PREFIX}:{room_id}`.
const STREAM_PREFIX: &str = "room:stream";
|
|
/// Key of the single stream the email worker reads from.
const EMAIL_STREAM_KEY: &str = "email:stream";
|
|
/// Consumer group used when reading and acking room streams.
const GROUP: &str = "room-worker";
|
|
/// Consumer group used when reading and acking the email stream.
const EMAIL_GROUP: &str = "email-worker";
|
|
|
|
fn stream_key(room_id: uuid::Uuid) -> String {
|
|
format!("{STREAM_PREFIX}:{room_id}")
|
|
}
|
|
|
|
fn consumer_id() -> String {
|
|
format!("worker-{}", uuid::Uuid::new_v4())
|
|
}
|
|
|
|
/// Boxed, `Send` future that resolves to a pooled Redis cluster connection or an error.
pub type RedisFuture =
|
|
Pin<Box<dyn Future<Output = anyhow::Result<deadpool_redis::cluster::Connection>> + Send>>;
|
|
/// Shared callback that receives a batch of decoded room messages and returns a future
/// reporting whether the batch was handled; the worker only acks entries when it succeeds.
pub type PersistFn = Arc<dyn Fn(Vec<RoomMessageEnvelope>) -> PersistFut + Send + Sync>;
|
|
/// Boxed, `Send` future returned by [`PersistFn`].
pub type PersistFut = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
|
|
/// Shared factory the workers call each time they need a Redis connection.
pub type GetRedis = Arc<dyn Fn() -> RedisFuture + Send + Sync>;
|
|
/// Decoded `XREADGROUP` reply: a list of `(stream name, entries)` pairs, where each entry is
/// `(entry id, values)` and `values` is the flat `field, value, field, value, ...` list that
/// `extract_field` walks in pairs.
pub type StreamEntries = Vec<(String, Vec<(String, Vec<String>)>)>;
|
|
|
|
pub async fn start(
|
|
room_ids: Vec<uuid::Uuid>,
|
|
get_redis: GetRedis,
|
|
persist_fn: PersistFn,
|
|
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
|
|
) {
|
|
let consumer = consumer_id();
|
|
tracing::info!(consumer = %consumer, rooms = ?room_ids, "room-message worker starting");
|
|
|
|
let handles: Vec<_> = room_ids
|
|
.into_iter()
|
|
.map(|room_id| {
|
|
let get_redis = get_redis.clone();
|
|
let persist_fn = persist_fn.clone();
|
|
let shutdown = shutdown_rx.resubscribe();
|
|
let consumer = consumer.clone();
|
|
|
|
tokio::spawn(room_worker_task(
|
|
room_id, consumer, get_redis, persist_fn, shutdown,
|
|
))
|
|
})
|
|
.collect();
|
|
|
|
let _ = shutdown_rx.recv().await;
|
|
|
|
for h in handles {
|
|
let _ = h.await;
|
|
}
|
|
}
|
|
|
|
pub async fn room_worker_task(
|
|
room_id: uuid::Uuid,
|
|
consumer: String,
|
|
get_redis: GetRedis,
|
|
persist_fn: PersistFn,
|
|
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
|
|
) {
|
|
let key = stream_key(room_id);
|
|
tracing::info!(room_id = %room_id, "room worker task started");
|
|
|
|
loop {
|
|
tokio::select! {
|
|
_ = shutdown_rx.recv() => {
|
|
tracing::info!(room_id = %room_id, "room worker task shutting down");
|
|
break;
|
|
}
|
|
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
|
|
}
|
|
|
|
match run_once(&key, &consumer, &get_redis, &persist_fn).await {
|
|
Ok(0) => {}
|
|
Ok(n) => tracing::debug!(room_id = %room_id, n = n, "batch flushed"),
|
|
Err(e) => {
|
|
tracing::error!(room_id = %room_id, error = %e, "stream consume error");
|
|
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn run_once(
|
|
stream_key: &str,
|
|
consumer: &str,
|
|
get_redis: &GetRedis,
|
|
persist_fn: &PersistFn,
|
|
) -> anyhow::Result<usize> {
|
|
let redis = (get_redis)().await?;
|
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
|
|
let _: Result<(), _> = redis::cmd("XGROUP")
|
|
.arg("CREATE")
|
|
.arg(stream_key)
|
|
.arg(GROUP)
|
|
.arg("0")
|
|
.arg("MKSTREAM")
|
|
.query_async(&mut conn)
|
|
.await;
|
|
|
|
let results: StreamEntries = redis::cmd("XREADGROUP")
|
|
.arg("GROUP")
|
|
.arg(GROUP)
|
|
.arg(consumer)
|
|
.arg("COUNT")
|
|
.arg(BATCH_SIZE)
|
|
.arg("BLOCK")
|
|
.arg(1000)
|
|
.arg("STREAMS")
|
|
.arg(stream_key)
|
|
.arg(">")
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or_default();
|
|
|
|
if results.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let mut batch: Vec<(String, RoomMessageEnvelope)> = Vec::with_capacity(BATCH_SIZE);
|
|
for (_stream_name, entries) in results {
|
|
for (entry_id, field_values) in entries {
|
|
if let Some(data) = extract_field(&field_values, "data") {
|
|
match serde_json::from_str::<RoomMessageEnvelope>(&data) {
|
|
Ok(env) => batch.push((entry_id, env)),
|
|
Err(e) => tracing::warn!(entry_id = %entry_id, error = %e, "malformed envelope"),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if batch.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let entry_ids: Vec<String> = batch.iter().map(|(id, _)| id.clone()).collect();
|
|
let envelopes: Vec<RoomMessageEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
|
|
|
|
if let Err(e) = persist_fn(envelopes).await {
|
|
tracing::error!(error = %e, "persist_fn failed — entries NOT acked (will retry)");
|
|
return Err(e);
|
|
}
|
|
|
|
let redis = (get_redis)().await?;
|
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
for entry_id in &entry_ids {
|
|
let _: Result<(), _> = redis::cmd("XACK")
|
|
.arg(stream_key)
|
|
.arg(GROUP)
|
|
.arg(entry_id)
|
|
.query_async(&mut conn)
|
|
.await;
|
|
}
|
|
|
|
tracing::info!(n = entry_ids.len(), "batch persisted and acked");
|
|
Ok(entry_ids.len())
|
|
}
|
|
|
|
/// Looks up `key` in a flat `field, value, field, value, ...` list.
///
/// Returns a clone of the value paired with the first field equal to `key`, or `None`
/// if no field matches. A trailing field with no value is ignored.
fn extract_field(values: &[String], key: &str) -> Option<String> {
    values.chunks_exact(2).find_map(|pair| match pair {
        [field, value] if field.as_str() == key => Some(value.clone()),
        _ => None,
    })
}
|
|
|
|
/// Shared callback that receives a batch of decoded email envelopes and returns a future
/// reporting whether the batch was handled; the email worker only acks entries when it
/// succeeds.
|
|
pub type EmailSendFn = Arc<dyn Fn(Vec<EmailEnvelope>) -> EmailSendFut + Send + Sync>;
|
|
/// Boxed, `Send` future returned by [`EmailSendFn`].
pub type EmailSendFut = Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
|
|
|
|
/// Start the email worker that consumes from the email stream.
|
|
pub async fn start_email_worker(
|
|
get_redis: GetRedis,
|
|
send_fn: EmailSendFn,
|
|
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
|
|
) {
|
|
let consumer = format!("email-worker-{}", uuid::Uuid::new_v4());
|
|
tracing::info!(consumer = %consumer, "email worker starting");
|
|
|
|
loop {
|
|
tokio::select! {
|
|
_ = shutdown_rx.recv() => {
|
|
tracing::info!("email worker shutting down");
|
|
break;
|
|
}
|
|
_ = tokio::time::sleep(std::time::Duration::from_secs(FLUSH_INTERVAL_SECS)) => {}
|
|
}
|
|
|
|
match email_run_once(&consumer, &get_redis, &send_fn).await {
|
|
Ok(0) => {}
|
|
Ok(n) => tracing::debug!(n = n, "email batch processed"),
|
|
Err(e) => {
|
|
tracing::error!(error = %e, "email stream consume error");
|
|
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn email_run_once(
|
|
consumer: &str,
|
|
get_redis: &GetRedis,
|
|
send_fn: &EmailSendFn,
|
|
) -> anyhow::Result<usize> {
|
|
let redis = (get_redis)().await?;
|
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
|
|
let _: Result<(), _> = redis::cmd("XGROUP")
|
|
.arg("CREATE")
|
|
.arg(EMAIL_STREAM_KEY)
|
|
.arg(EMAIL_GROUP)
|
|
.arg("0")
|
|
.arg("MKSTREAM")
|
|
.query_async(&mut conn)
|
|
.await;
|
|
|
|
let results: StreamEntries = redis::cmd("XREADGROUP")
|
|
.arg("GROUP")
|
|
.arg(EMAIL_GROUP)
|
|
.arg(consumer)
|
|
.arg("COUNT")
|
|
.arg(BATCH_SIZE)
|
|
.arg("BLOCK")
|
|
.arg(1000)
|
|
.arg("STREAMS")
|
|
.arg(EMAIL_STREAM_KEY)
|
|
.arg(">")
|
|
.query_async(&mut conn)
|
|
.await
|
|
.unwrap_or_default();
|
|
|
|
if results.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let mut batch: Vec<(String, EmailEnvelope)> = Vec::with_capacity(BATCH_SIZE);
|
|
for (_stream_name, entries) in results {
|
|
for (entry_id, field_values) in entries {
|
|
if let Some(data) = extract_field(&field_values, "data") {
|
|
match serde_json::from_str::<EmailEnvelope>(&data) {
|
|
Ok(env) => batch.push((entry_id, env)),
|
|
Err(e) => tracing::warn!(entry_id = %entry_id, error = %e, "malformed email envelope"),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if batch.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let batch_size = batch.len();
|
|
counter!("email_consumed_total").increment(batch_size as u64);
|
|
counter!("email_batch_size").increment(batch_size as u64);
|
|
|
|
let entry_ids: Vec<String> = batch.iter().map(|(id, _)| id.clone()).collect();
|
|
let envelopes: Vec<EmailEnvelope> = batch.into_iter().map(|(_, e)| e).collect();
|
|
|
|
if let Err(e) = send_fn(envelopes).await {
|
|
tracing::error!(error = %e, "email send_fn failed — entries NOT acked (will retry)");
|
|
return Err(e);
|
|
}
|
|
|
|
let redis = (get_redis)().await?;
|
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
|
for entry_id in &entry_ids {
|
|
let _: Result<(), _> = redis::cmd("XACK")
|
|
.arg(EMAIL_STREAM_KEY)
|
|
.arg(EMAIL_GROUP)
|
|
.arg(entry_id)
|
|
.query_async(&mut conn)
|
|
.await;
|
|
}
|
|
|
|
tracing::info!(n = entry_ids.len(), "email batch sent and acked");
|
|
Ok(entry_ids.len())
|
|
}
|