166 lines
5.7 KiB
Rust
166 lines
5.7 KiB
Rust
use clap::Parser;
|
|
use config::AppConfig;
|
|
use metrics::{Unit, describe_counter};
|
|
use metrics_exporter_prometheus::PrometheusHandle;
|
|
use observability::{HttpMetrics, init_tracing_subscriber, install_recorder, push::MetricsPusher};
|
|
use sea_orm::ConnectionTrait;
|
|
use service::AppService;
|
|
use std::sync::Arc;
|
|
|
|
#[derive(Parser, Debug)]
|
|
#[command(name = "email-worker")]
|
|
#[command(version)]
|
|
struct Args {
|
|
#[arg(long, default_value = "info")]
|
|
log_level: String,
|
|
}
|
|
|
|
async fn http_handler(
|
|
db: Arc<db::database::AppDatabase>,
|
|
cache: Arc<db::cache::AppCache>,
|
|
metrics: Arc<PrometheusHandle>,
|
|
req: hyper::Request<hyper::Body>,
|
|
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
|
|
match req.uri().path() {
|
|
"/health" => {
|
|
let writer_ok = db.writer().execute_unprepared("SELECT 1").await.is_ok();
|
|
let reader_ok = db.reader().execute_unprepared("SELECT 1").await.is_ok();
|
|
let db_ok = writer_ok && reader_ok;
|
|
let cache_ok = cache.conn().await.is_ok();
|
|
|
|
let body = serde_json::json!({
|
|
"status": if db_ok && cache_ok { "ok" } else { "unhealthy" },
|
|
"db": if db_ok { "ok" } else { "error" },
|
|
"cache": if cache_ok { "ok" } else { "error" },
|
|
});
|
|
|
|
let status = if db_ok && cache_ok { 200 } else { 503 };
|
|
let body_bytes = match serde_json::to_string(&body) {
|
|
Ok(s) => hyper::Body::from(s),
|
|
Err(e) => {
|
|
return Ok(hyper::Response::builder()
|
|
.status(500)
|
|
.body(hyper::Body::from(format!("serialize error: {}", e)))
|
|
.expect("static response"));
|
|
}
|
|
};
|
|
Ok(hyper::Response::builder()
|
|
.status(status)
|
|
.header("content-type", "application/json")
|
|
.body(body_bytes)
|
|
.expect("static response"))
|
|
}
|
|
"/metrics" => {
|
|
let body = metrics.render();
|
|
Ok(hyper::Response::builder()
|
|
.status(200)
|
|
.header("content-type", "text/plain; version=0.0.4; charset=utf-8")
|
|
.body(hyper::Body::from(body))
|
|
.unwrap())
|
|
}
|
|
_ => Ok(hyper::Response::builder()
|
|
.status(404)
|
|
.body(hyper::Body::from("not found"))
|
|
.unwrap()),
|
|
}
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
let args = Args::parse();
|
|
let cfg = AppConfig::load();
|
|
init_tracing_subscriber(&args.log_level, false);
|
|
|
|
// Pre-register all email/queue metrics so they appear in /metrics even before first event.
|
|
describe_counter!(
|
|
"email_queued_total",
|
|
Unit::Count,
|
|
"Emails written to Redis stream"
|
|
);
|
|
describe_counter!(
|
|
"email_consumed_total",
|
|
Unit::Count,
|
|
"Emails consumed from queue"
|
|
);
|
|
describe_counter!(
|
|
"email_batch_size",
|
|
Unit::Count,
|
|
"Email consumer batch sizes accumulated"
|
|
);
|
|
describe_counter!(
|
|
"email_validation_skipped_total",
|
|
Unit::Count,
|
|
"Emails skipped due to invalid recipient"
|
|
);
|
|
describe_counter!(
|
|
"email_build_errors_total",
|
|
Unit::Count,
|
|
"Email message build failures"
|
|
);
|
|
describe_counter!(
|
|
"email_send_attempts_total",
|
|
Unit::Count,
|
|
"SMTP send attempts (including retries)"
|
|
);
|
|
describe_counter!("email_sent_total", Unit::Count, "Emails sent successfully");
|
|
describe_counter!(
|
|
"email_send_failures_total",
|
|
Unit::Count,
|
|
"Emails that failed after all retries"
|
|
);
|
|
|
|
let metrics_handle = Arc::new(install_recorder());
|
|
let http_metrics = Arc::new(HttpMetrics::new()); // Worker app — HTTP section will be empty
|
|
|
|
// Metrics pusher: periodically push all metrics to apps/metrics aggregator
|
|
if let Some(push_url) = std::env::var("METRICS_PUSH_URL").ok() {
|
|
let pusher = MetricsPusher::new(&push_url, "email");
|
|
pusher.spawn(
|
|
http_metrics.clone(),
|
|
metrics_handle.clone(),
|
|
std::time::Duration::from_secs(15),
|
|
);
|
|
tracing::info!(push_url = %push_url, "Metrics pusher started (interval 15s)");
|
|
}
|
|
|
|
tracing::info!("Starting email worker");
|
|
let service = AppService::new(cfg).await?;
|
|
|
|
let db = Arc::new(service.db.clone());
|
|
let cache = Arc::new(service.cache.clone());
|
|
|
|
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
|
|
tokio::spawn(async move {
|
|
tokio::signal::ctrl_c().await.ok();
|
|
tracing::info!("shutting down email worker");
|
|
let _ = shutdown_tx.send(());
|
|
});
|
|
|
|
// Start health/metrics server on a dedicated port
|
|
let health_db = db.clone();
|
|
let health_cache = cache.clone();
|
|
let health_metrics = metrics_handle.clone();
|
|
let health_addr: std::net::SocketAddr = ([0, 0, 0, 0], 8084).into();
|
|
let health_service = hyper::service::make_service_fn(move |_| {
|
|
let db = health_db.clone();
|
|
let cache = health_cache.clone();
|
|
let metrics = health_metrics.clone();
|
|
let service = hyper::service::service_fn(move |req| {
|
|
http_handler(db.clone(), cache.clone(), metrics.clone(), req)
|
|
});
|
|
async move { Ok::<_, std::convert::Infallible>(service) }
|
|
});
|
|
|
|
let health_server = hyper::Server::bind(&health_addr).serve(health_service);
|
|
tracing::info!(port = 8084, "health/metrics server started");
|
|
tokio::spawn(async move {
|
|
if let Err(e) = health_server.await {
|
|
tracing::error!("health check server error: {}", e);
|
|
}
|
|
});
|
|
|
|
service.start_email_workers(shutdown_rx).await?;
|
|
tracing::info!("email worker stopped");
|
|
Ok(())
|
|
}
|