gitdataai/apps/email/src/main.rs
ZhenYi 0b5dc98ce5
Some checks are pending
CI / Rust Lint & Check (push) Waiting to run
CI / Rust Tests (push) Waiting to run
CI / Frontend Lint & Type Check (push) Waiting to run
CI / Frontend Build (push) Blocked by required conditions
refactor(db): simplify read-replica to single connection for CNPG
CNPG's cluster-ro service already handles load balancing and failover,
so the application-level Vec + random_range is redundant.

- db_read: Vec<DatabaseConnection> → Option<DatabaseConnection>
- database_read_replicas returns Option<String> instead of Vec<String>
- health checks now explicitly ping both writer() and reader()
- remove unused rand dependency from libs/db
2026-04-26 01:03:39 +08:00

125 lines
4.7 KiB
Rust

use clap::Parser;
use config::AppConfig;
use metrics::{describe_counter, Unit};
use metrics_exporter_prometheus::PrometheusHandle;
use observability::{init_tracing_subscriber, install_recorder};
use sea_orm::ConnectionTrait;
use service::AppService;
use std::sync::Arc;
// Command-line options accepted by the email worker binary.
// Plain `//` comments are used deliberately: clap's derive turns `///` doc comments
// into help text, which would alter the `--help` output.
#[derive(Debug, Parser)]
#[command(name = "email-worker", version)]
struct Args {
    // Value of `--log-level`; falls back to "info" when the flag is omitted.
    #[arg(long, default_value = "info")]
    log_level: String,
}
async fn http_handler(
db: Arc<db::database::AppDatabase>,
cache: Arc<db::cache::AppCache>,
metrics: Arc<PrometheusHandle>,
req: hyper::Request<hyper::Body>,
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
match req.uri().path() {
"/health" => {
let writer_ok = db
.writer()
.execute_unprepared("SELECT 1")
.await
.is_ok();
let reader_ok = db
.reader()
.execute_unprepared("SELECT 1")
.await
.is_ok();
let db_ok = writer_ok && reader_ok;
let cache_ok = cache.conn().await.is_ok();
let body = serde_json::json!({
"status": if db_ok && cache_ok { "ok" } else { "unhealthy" },
"db": if db_ok { "ok" } else { "error" },
"cache": if cache_ok { "ok" } else { "error" },
});
let status = if db_ok && cache_ok { 200 } else { 503 };
Ok(hyper::Response::builder()
.status(status)
.header("content-type", "application/json")
.body(hyper::Body::from(serde_json::to_string(&body).unwrap()))
.unwrap())
}
"/metrics" => {
let body = metrics.render();
Ok(hyper::Response::builder()
.status(200)
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
.body(hyper::Body::from(body))
.unwrap())
}
_ => Ok(hyper::Response::builder()
.status(404)
.body(hyper::Body::from("not found"))
.unwrap()),
}
}
/// Entry point of the email worker.
///
/// Startup sequence: parse CLI arguments, load configuration, initialise tracing,
/// install the metrics recorder and describe the email counters, build `AppService`,
/// spawn the Ctrl-C listener, start the health/metrics HTTP server on `0.0.0.0:8084`,
/// then run the email workers until they return.
///
/// # Errors
/// Returns an error when `AppService::new` fails, when the health/metrics port cannot
/// be bound, or when `start_email_workers` fails.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Port of the dedicated health/metrics listener.
    const HEALTH_PORT: u16 = 8084;

    let args = Args::parse();
    let cfg = AppConfig::load();
    init_tracing_subscriber(&args.log_level, false);

    // Install the recorder BEFORE describing metrics: the `describe_*!` macros forward
    // to whatever global recorder is installed at call time and do nothing when there
    // is none, so descriptions issued earlier would be silently discarded.
    // (Assumes `install_recorder` installs the global recorder, as its name indicates.)
    let metrics_handle = Arc::new(install_recorder());

    // Describe all email/queue counters up front so their unit and help text are
    // registered before the first event is recorded.
    describe_counter!("email_queued_total", Unit::Count, "Emails written to Redis stream");
    describe_counter!("email_consumed_total", Unit::Count, "Emails consumed from queue");
    describe_counter!("email_batch_size", Unit::Count, "Email consumer batch sizes accumulated");
    describe_counter!("email_validation_skipped_total", Unit::Count, "Emails skipped due to invalid recipient");
    describe_counter!("email_build_errors_total", Unit::Count, "Email message build failures");
    describe_counter!("email_send_attempts_total", Unit::Count, "SMTP send attempts (including retries)");
    describe_counter!("email_sent_total", Unit::Count, "Emails sent successfully");
    describe_counter!("email_send_failures_total", Unit::Count, "Emails that failed after all retries");

    tracing::info!("Starting email worker");
    let service = AppService::new(cfg).await?;

    // Shutdown is broadcast to the workers once Ctrl-C is received.
    // NOTE(review): only SIGINT is handled here; orchestrators usually stop containers
    // with SIGTERM — consider listening for it as well.
    let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        tracing::info!("shutting down email worker");
        let _ = shutdown_tx.send(());
    });

    // Start health/metrics server on a dedicated port.
    let health_db = Arc::new(service.db.clone());
    let health_cache = Arc::new(service.cache.clone());
    let health_metrics = metrics_handle;
    let health_addr = std::net::SocketAddr::from(([0, 0, 0, 0], HEALTH_PORT));
    let health_service = hyper::service::make_service_fn(move |_| {
        // One set of handles per connection; each request clones them again because
        // the handler takes ownership of its arguments.
        let db = health_db.clone();
        let cache = health_cache.clone();
        let metrics = health_metrics.clone();
        let service = hyper::service::service_fn(move |req| {
            http_handler(db.clone(), cache.clone(), metrics.clone(), req)
        });
        async move { Ok::<_, std::convert::Infallible>(service) }
    });

    // `try_bind` reports an occupied or forbidden port as an error; `bind` would panic.
    let health_server = hyper::Server::try_bind(&health_addr)
        .map_err(|e| {
            anyhow::anyhow!("failed to bind health/metrics server on {}: {}", health_addr, e)
        })?
        .serve(health_service);
    tracing::info!(port = HEALTH_PORT, "health/metrics server started");
    tokio::spawn(async move {
        if let Err(e) = health_server.await {
            tracing::error!("health check server error: {}", e);
        }
    });

    service.start_email_workers(shutdown_rx).await?;
    tracing::info!("email worker stopped");
    Ok(())
}