diff --git a/Cargo.lock b/Cargo.lock index 5bac54c..9840d0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2510,6 +2510,7 @@ dependencies = [ "anyhow", "config", "lettre", + "metrics", "regex", "serde", "tokio", @@ -2535,7 +2536,11 @@ dependencies = [ "clap", "config", "db", + "hyper 0.14.32", + "metrics-exporter-prometheus", "observability", + "sea-orm", + "serde_json", "service", "tokio", "tracing", @@ -3091,6 +3096,7 @@ dependencies = [ "git2-hooks", "globset", "hex", + "metrics", "models", "num_cpus", "qdrant-client", @@ -3122,8 +3128,12 @@ dependencies = [ "config", "db", "git", + "hyper 0.14.32", + "metrics-exporter-prometheus", "observability", "reqwest 0.13.2", + "sea-orm", + "serde_json", "tokio", "tokio-util", "tracing", @@ -6266,6 +6276,7 @@ dependencies = [ "chrono", "deadpool-redis", "futures", + "metrics", "redis", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index f638934..812b2a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,6 +169,7 @@ base64 = "0.22.1" base64ct = "1" p256 = { version = "0.13", features = ["ecdsa", "std"] } http = "1" +hyper = "0.14" tempfile = "3" rig-core = "0.30" diff --git a/apps/email/Cargo.toml b/apps/email/Cargo.toml index aae2237..433d94d 100644 --- a/apps/email/Cargo.toml +++ b/apps/email/Cargo.toml @@ -26,6 +26,10 @@ observability = { workspace = true } anyhow = { workspace = true } clap = { workspace = true, features = ["derive"] } chrono = { workspace = true, features = ["serde"] } +hyper = { workspace = true } +serde_json = { workspace = true } +sea-orm = { workspace = true } +metrics-exporter-prometheus = "0.13" [lints] workspace = true diff --git a/apps/email/src/main.rs b/apps/email/src/main.rs index 4397d52..0a8faf2 100644 --- a/apps/email/src/main.rs +++ b/apps/email/src/main.rs @@ -1,7 +1,10 @@ use clap::Parser; use config::AppConfig; -use observability::init_tracing_subscriber; +use metrics_exporter_prometheus::PrometheusHandle; +use observability::{init_tracing_subscriber, install_recorder}; 
+use sea_orm::ConnectionTrait; use service::AppService; +use std::sync::Arc; #[derive(Parser, Debug)] #[command(name = "email-worker")] @@ -11,15 +14,64 @@ struct Args { log_level: String, } +async fn http_handler( + db: Arc, + cache: Arc, + metrics: Arc, + req: hyper::Request, +) -> Result, std::convert::Infallible> { + match req.uri().path() { + "/health" => { + let db_ok = db + .query_one_raw(sea_orm::Statement::from_string( + sea_orm::DbBackend::Postgres, + "SELECT 1", + )) + .await + .is_ok(); + let cache_ok = cache.conn().await.is_ok(); + + let body = serde_json::json!({ + "status": if db_ok && cache_ok { "ok" } else { "unhealthy" }, + "db": if db_ok { "ok" } else { "error" }, + "cache": if cache_ok { "ok" } else { "error" }, + }); + + let status = if db_ok && cache_ok { 200 } else { 503 }; + Ok(hyper::Response::builder() + .status(status) + .header("content-type", "application/json") + .body(hyper::Body::from(serde_json::to_string(&body).unwrap())) + .unwrap()) + } + "/metrics" => { + let body = metrics.render(); + Ok(hyper::Response::builder() + .status(200) + .header("content-type", "text/plain; version=0.0.4; charset=utf-8") + .body(hyper::Body::from(body)) + .unwrap()) + } + _ => Ok(hyper::Response::builder() + .status(404) + .body(hyper::Body::from("not found")) + .unwrap()), + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); let cfg = AppConfig::load(); init_tracing_subscriber(&args.log_level, false); + let metrics_handle = Arc::new(install_recorder()); tracing::info!("Starting email worker"); let service = AppService::new(cfg).await?; + let db = Arc::new(service.db.clone()); + let cache = Arc::new(service.cache.clone()); + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); tokio::spawn(async move { tokio::signal::ctrl_c().await.ok(); @@ -27,6 +79,29 @@ async fn main() -> anyhow::Result<()> { let _ = shutdown_tx.send(()); }); + // Start health/metrics server on a dedicated port + let 
health_db = db.clone(); + let health_cache = cache.clone(); + let health_metrics = metrics_handle.clone(); + let health_addr: std::net::SocketAddr = ([0, 0, 0, 0], 8084).into(); + let health_service = hyper::service::make_service_fn(move |_| { + let db = health_db.clone(); + let cache = health_cache.clone(); + let metrics = health_metrics.clone(); + let service = hyper::service::service_fn(move |req| { + http_handler(db.clone(), cache.clone(), metrics.clone(), req) + }); + async move { Ok::<_, std::convert::Infallible>(service) } + }); + + let health_server = hyper::Server::try_bind(&health_addr)?.serve(health_service); + tracing::info!(port = 8084, "health/metrics server started"); + tokio::spawn(async move { + if let Err(e) = health_server.await { + tracing::error!("health check server error: {}", e); + } + }); + service.start_email_workers(shutdown_rx).await?; tracing::info!("email worker stopped"); Ok(()) diff --git a/apps/git-hook/Cargo.toml b/apps/git-hook/Cargo.toml index c70a0c7..bec78a0 100644 --- a/apps/git-hook/Cargo.toml +++ b/apps/git-hook/Cargo.toml @@ -23,5 +23,9 @@ tracing-subscriber = { workspace = true, features = ["json"] } anyhow = { workspace = true } clap = { workspace = true, features = ["derive"] } tokio-util = { workspace = true } +hyper = { workspace = true } +serde_json = { workspace = true } +sea-orm = { workspace = true } +metrics-exporter-prometheus = "0.13" chrono = { workspace = true, features = ["serde"] } reqwest = { workspace = true } diff --git a/apps/git-hook/src/main.rs b/apps/git-hook/src/main.rs index b4b2920..cff90e9 100644 --- a/apps/git-hook/src/main.rs +++ b/apps/git-hook/src/main.rs @@ -3,28 +3,77 @@ use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; use git::hook::HookService; -use observability::init_tracing_subscriber; +use metrics_exporter_prometheus::PrometheusHandle; +use observability::{init_tracing_subscriber, install_recorder}; +use sea_orm::ConnectionTrait; +use std::sync::Arc; use 
tokio::signal; mod args; use args::HookArgs; +async fn http_handler( + db: Arc, + cache: Arc, + metrics: Arc, + req: hyper::Request, +) -> Result, std::convert::Infallible> { + match req.uri().path() { + "/health" => { + let db_ok = db + .query_one_raw(sea_orm::Statement::from_string( + sea_orm::DbBackend::Postgres, + "SELECT 1", + )) + .await + .is_ok(); + let cache_ok = cache.conn().await.is_ok(); + + let body = serde_json::json!({ + "status": if db_ok && cache_ok { "ok" } else { "unhealthy" }, + "db": if db_ok { "ok" } else { "error" }, + "cache": if cache_ok { "ok" } else { "error" }, + }); + + let status = if db_ok && cache_ok { 200 } else { 503 }; + Ok(hyper::Response::builder() + .status(status) + .header("content-type", "application/json") + .body(hyper::Body::from(serde_json::to_string(&body).unwrap())) + .unwrap()) + } + "/metrics" => { + let body = metrics.render(); + Ok(hyper::Response::builder() + .status(200) + .header("content-type", "text/plain; version=0.0.4; charset=utf-8") + .body(hyper::Body::from(body)) + .unwrap()) + } + _ => Ok(hyper::Response::builder() + .status(404) + .body(hyper::Body::from("not found")) + .unwrap()), + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { // 1. Load configuration let cfg = AppConfig::load(); - // 2. Init tracing logging + // 2. Init tracing + metrics let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string()); init_tracing_subscriber(&log_level, false); + let metrics_handle = Arc::new(install_recorder()); // 3. Connect to database - let db = AppDatabase::init(&cfg).await?; + let db = Arc::new(AppDatabase::init(&cfg).await?); tracing::info!("database connected"); // 4. Connect to Redis cache (also provides the cluster pool for hook queue) - let cache = AppCache::init(&cfg).await?; + let cache = Arc::new(AppCache::init(&cfg).await?); tracing::info!("cache connected"); // 5. Parse CLI args @@ -34,8 +83,8 @@ async fn main() -> anyhow::Result<()> { // 6. 
Build and start git hook service let hooks = HookService::new( - db, - cache.clone(), + (*db).clone(), + (*cache).clone(), cache.redis_pool().clone(), cfg, ); @@ -43,6 +92,30 @@ async fn main() -> anyhow::Result<()> { let cancel = hooks.start_worker(); let cancel_signal = cancel.clone(); + // 7. Start health/metrics server on a dedicated port + let health_db = db.clone(); + let health_cache = cache.clone(); + let health_metrics = metrics_handle.clone(); + let health_addr: std::net::SocketAddr = + ([0, 0, 0, 0], 8083).into(); + let health_service = hyper::service::make_service_fn(move |_| { + let db = health_db.clone(); + let cache = health_cache.clone(); + let metrics = health_metrics.clone(); + let service = hyper::service::service_fn(move |req| { + http_handler(db.clone(), cache.clone(), metrics.clone(), req) + }); + async move { Ok::<_, std::convert::Infallible>(service) } + }); + + let health_server = hyper::Server::bind(&health_addr).serve(health_service); + tracing::info!(port = 8083, "health/metrics server started"); + tokio::spawn(async move { + if let Err(e) = health_server.await { + tracing::error!("health check server error: {}", e); + } + }); + // Spawn signal handler that cancels on SIGINT/SIGTERM tokio::spawn(async move { let ctrl_c = async { diff --git a/deploy/templates/email-worker-deployment.yaml b/deploy/templates/email-worker-deployment.yaml index 9d42762..8b6a68d 100644 --- a/deploy/templates/email-worker-deployment.yaml +++ b/deploy/templates/email-worker-deployment.yaml @@ -30,6 +30,10 @@ spec: - name: email-worker image: "{{ .Values.image.registry }}/{{ .Values.emailWorker.image.repository }}:{{ .Values.emailWorker.image.tag }}" imagePullPolicy: {{ .Values.emailWorker.image.pullPolicy | default .Values.image.pullPolicy }} + ports: + - name: health + containerPort: 8084 + protocol: TCP envFrom: - configMapRef: name: {{ include "gitdata.fullname" . 
}}-config @@ -39,11 +43,17 @@ spec: {{- end }} {{- if .Values.emailWorker.livenessProbe }} livenessProbe: + {{- if .Values.emailWorker.livenessProbe.httpGet }} + httpGet: + path: {{ .Values.emailWorker.livenessProbe.httpGet.path }} + port: {{ .Values.emailWorker.livenessProbe.httpGet.port }} + {{- else }} exec: command: {{- range .Values.emailWorker.livenessProbe.exec.command }} - {{ . | quote }} {{- end }} + {{- end }} initialDelaySeconds: {{ .Values.emailWorker.livenessProbe.initialDelaySeconds }} periodSeconds: {{ .Values.emailWorker.livenessProbe.periodSeconds }} timeoutSeconds: {{ .Values.emailWorker.livenessProbe.timeoutSeconds }} @@ -51,11 +61,17 @@ spec: {{- end }} {{- if .Values.emailWorker.readinessProbe }} readinessProbe: + {{- if .Values.emailWorker.readinessProbe.httpGet }} + httpGet: + path: {{ .Values.emailWorker.readinessProbe.httpGet.path }} + port: {{ .Values.emailWorker.readinessProbe.httpGet.port }} + {{- else }} exec: command: {{- range .Values.emailWorker.readinessProbe.exec.command }} - {{ . | quote }} {{- end }} + {{- end }} initialDelaySeconds: {{ .Values.emailWorker.readinessProbe.initialDelaySeconds }} periodSeconds: {{ .Values.emailWorker.readinessProbe.periodSeconds }} timeoutSeconds: {{ .Values.emailWorker.readinessProbe.timeoutSeconds }} diff --git a/deploy/templates/git-hook-deployment.yaml b/deploy/templates/git-hook-deployment.yaml index 7968c88..7b96f7b 100644 --- a/deploy/templates/git-hook-deployment.yaml +++ b/deploy/templates/git-hook-deployment.yaml @@ -30,6 +30,10 @@ spec: - name: git-hook image: "{{ .Values.image.registry }}/{{ .Values.gitHook.image.repository }}:{{ .Values.gitHook.image.tag }}" imagePullPolicy: {{ .Values.gitHook.image.pullPolicy | default .Values.image.pullPolicy }} + ports: + - name: health + containerPort: 8083 + protocol: TCP envFrom: - configMapRef: name: {{ include "gitdata.fullname" . 
}}-config @@ -42,11 +46,17 @@ spec: {{- end }} {{- if .Values.gitHook.livenessProbe }} livenessProbe: + {{- if .Values.gitHook.livenessProbe.httpGet }} + httpGet: + path: {{ .Values.gitHook.livenessProbe.httpGet.path }} + port: {{ .Values.gitHook.livenessProbe.httpGet.port }} + {{- else }} exec: command: {{- range .Values.gitHook.livenessProbe.exec.command }} - {{ . | quote }} {{- end }} + {{- end }} initialDelaySeconds: {{ .Values.gitHook.livenessProbe.initialDelaySeconds }} periodSeconds: {{ .Values.gitHook.livenessProbe.periodSeconds }} timeoutSeconds: {{ .Values.gitHook.livenessProbe.timeoutSeconds }} @@ -54,11 +64,17 @@ spec: {{- end }} {{- if .Values.gitHook.readinessProbe }} readinessProbe: + {{- if .Values.gitHook.readinessProbe.httpGet }} + httpGet: + path: {{ .Values.gitHook.readinessProbe.httpGet.path }} + port: {{ .Values.gitHook.readinessProbe.httpGet.port }} + {{- else }} exec: command: {{- range .Values.gitHook.readinessProbe.exec.command }} - {{ . | quote }} {{- end }} + {{- end }} initialDelaySeconds: {{ .Values.gitHook.readinessProbe.initialDelaySeconds }} periodSeconds: {{ .Values.gitHook.readinessProbe.periodSeconds }} timeoutSeconds: {{ .Values.gitHook.readinessProbe.timeoutSeconds }} diff --git a/deploy/templates/gitserver-deployment.yaml b/deploy/templates/gitserver-deployment.yaml index 35cd633..6d5d103 100644 --- a/deploy/templates/gitserver-deployment.yaml +++ b/deploy/templates/gitserver-deployment.yaml @@ -44,6 +44,29 @@ spec: - name: ssh containerPort: {{ $svc.service.ssh.port }} protocol: TCP + - name: health + containerPort: 8021 + protocol: TCP + {{- if $svc.livenessProbe }} + livenessProbe: + httpGet: + path: {{ $svc.livenessProbe.httpGet.path }} + port: {{ $svc.livenessProbe.httpGet.port }} + initialDelaySeconds: {{ $svc.livenessProbe.initialDelaySeconds }} + periodSeconds: {{ $svc.livenessProbe.periodSeconds }} + timeoutSeconds: {{ $svc.livenessProbe.timeoutSeconds | default 3 }} + failureThreshold: {{ $svc.livenessProbe.failureThreshold | 
default 3 }} + {{- end }} + {{- if $svc.readinessProbe }} + readinessProbe: + httpGet: + path: {{ $svc.readinessProbe.httpGet.path }} + port: {{ $svc.readinessProbe.httpGet.port }} + initialDelaySeconds: {{ $svc.readinessProbe.initialDelaySeconds }} + periodSeconds: {{ $svc.readinessProbe.periodSeconds }} + timeoutSeconds: {{ $svc.readinessProbe.timeoutSeconds | default 3 }} + failureThreshold: {{ $svc.readinessProbe.failureThreshold | default 3 }} + {{- end }} envFrom: - configMapRef: name: {{ $fullName }}-config diff --git a/deploy/values.yaml b/deploy/values.yaml index ec9b7e9..ae63a4a 100644 --- a/deploy/values.yaml +++ b/deploy/values.yaml @@ -255,6 +255,24 @@ gitserver: env: [] + livenessProbe: + httpGet: + path: /health + port: 8021 + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 3 + failureThreshold: 3 + + readinessProbe: + httpGet: + path: /health + port: 8021 + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + nodeSelector: {} tolerations: [] affinity: {} @@ -270,22 +288,18 @@ emailWorker: tag: latest livenessProbe: - exec: - command: - - /bin/sh - - -c - - "kill -0 1 || exit 1" + httpGet: + path: /health + port: 8084 initialDelaySeconds: 10 periodSeconds: 30 timeoutSeconds: 5 failureThreshold: 3 readinessProbe: - exec: - command: - - /bin/sh - - -c - - "kill -0 1 || exit 1" + httpGet: + path: /health + port: 8084 initialDelaySeconds: 5 periodSeconds: 15 timeoutSeconds: 3 @@ -319,22 +333,18 @@ gitHook: minAvailable: 1 livenessProbe: - exec: - command: - - /bin/sh - - -c - - "kill -0 1 || exit 1" + httpGet: + path: /health + port: 8083 initialDelaySeconds: 10 periodSeconds: 15 timeoutSeconds: 5 failureThreshold: 3 readinessProbe: - exec: - command: - - /bin/sh - - -c - - "kill -0 1 || exit 1" + httpGet: + path: /health + port: 8083 initialDelaySeconds: 5 periodSeconds: 10 timeoutSeconds: 3 diff --git a/libs/email/Cargo.toml b/libs/email/Cargo.toml index 418438b..0e17114 100644 --- a/libs/email/Cargo.toml +++ 
b/libs/email/Cargo.toml @@ -22,5 +22,6 @@ serde = { workspace = true, features = ["derive"] } anyhow = { workspace = true } regex = { workspace = true } tracing = { workspace = true } +metrics = { workspace = true } [lints] workspace = true diff --git a/libs/email/lib.rs b/libs/email/lib.rs index b3bf05f..2c69fb2 100644 --- a/libs/email/lib.rs +++ b/libs/email/lib.rs @@ -2,6 +2,7 @@ use config::AppConfig; use lettre::message::Mailbox; use lettre::transport::smtp::{PoolConfig, SmtpTransport}; use lettre::Transport; +use metrics::counter; use regex::Regex; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; @@ -64,6 +65,7 @@ impl AppEmail { tokio::spawn(async move { while let Some(msg) = rx.recv().await { if !EMAIL_REGEX.is_match(&msg.clone().to) { + counter!("email_validation_skipped_total").increment(1); continue; } @@ -75,12 +77,14 @@ impl AppEmail { { Ok(e) => e, Err(_) => { + counter!("email_build_errors_total").increment(1); tracing::warn!(to = %msg.to, "Email build error"); continue; } }; let mut success = false; for i in 0..3 { + counter!("email_send_attempts_total").increment(1); let mailer = mailer.clone(); let email = email.clone(); let result = tokio::task::spawn_blocking(move || mailer.send(&email)).await; @@ -92,6 +96,7 @@ impl AppEmail { } Ok(Err(e)) => { if i == 2 { + counter!("email_send_failures_total").increment(1); tracing::error!(to = %msg.to, error = %e, "Email send failed after retries"); } tokio::time::sleep(Duration::from_secs((1 << i) as u64)).await; @@ -103,8 +108,8 @@ impl AppEmail { } } - if !success { - tracing::warn!(to = %msg.to, "Email send permanently failed"); + if success { + counter!("email_sent_total").increment(1); } } }); diff --git a/libs/git/Cargo.toml b/libs/git/Cargo.toml index 7392543..53cb7bb 100644 --- a/libs/git/Cargo.toml +++ b/libs/git/Cargo.toml @@ -50,5 +50,6 @@ ssh-key = { workspace = true } actix-web = { workspace = true } hex = "0.4.3" reqwest = { workspace = true } +metrics = { workspace = true } 
[lints] workspace = true diff --git a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs index ffe1c6d..a503f5d 100644 --- a/libs/git/hook/pool/worker.rs +++ b/libs/git/hook/pool/worker.rs @@ -4,6 +4,7 @@ use crate::hook::pool::types::{HookTask, TaskType}; use crate::hook::sync::HookMetaDataSync; use db::cache::AppCache; use db::database::AppDatabase; +use metrics::counter; use models::EntityTrait; use std::sync::Arc; use std::time::Duration; @@ -92,6 +93,8 @@ impl HookWorker { tracing::info!("task started task_id={} task_type={} repo_id={}", task.id, task.task_type, task.repo_id); + counter!("hook_tasks_total", "task_type" => task.task_type.to_string()).increment(1); + let result = match task.task_type { TaskType::Sync => self.run_sync(&task.repo_id).await, TaskType::Fsck => self.run_fsck(&task.repo_id).await, @@ -100,6 +103,7 @@ impl HookWorker { match result { Ok(()) => { + counter!("hook_tasks_success_total", "task_type" => task.task_type.to_string()).increment(1); if let Err(e) = self.consumer.ack(work_key, task_json).await { tracing::warn!("failed to ack task: {}", e); } @@ -109,20 +113,24 @@ impl HookWorker { let is_locked = matches!(e, crate::GitError::Locked(_)); if is_locked { + counter!("hook_tasks_locked_total").increment(1); // Another worker holds the lock — requeue without counting as retry. 
tracing::info!("repo locked by another worker, requeueing task_id={}", task.id); if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await { tracing::warn!("failed to requeue locked task: {}", nak_err); } } else { + counter!("hook_tasks_failed_total", "task_type" => task.task_type.to_string()).increment(1); tracing::warn!("task failed task_id={} task_type={} repo_id={} error={}", task.id, task.task_type, task.repo_id, e); if task.retry_count >= self.max_retries { + counter!("hook_tasks_exhausted_total").increment(1); tracing::warn!("task exhausted retries, discarding task_id={} retry_count={}", task.id, task.retry_count); let _ = self.consumer.ack(work_key, task_json).await; } else { + counter!("hook_tasks_retried_total").increment(1); let mut task = task.clone(); task.retry_count += 1; let retry_json = @@ -225,6 +233,7 @@ impl HookWorker { // Dispatch branch webhooks and collect handles let mut handles = Vec::new(); + let mut branch_changes: u64 = 0; for (branch, after_oid) in after_branch_tips { let before_oid = before_branch_tips .iter() @@ -232,6 +241,7 @@ impl HookWorker { .map(|(_, o)| o.as_str()); let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true); if changed { + branch_changes += 1; let before_oid = before_oid.map_or("0", |v| v).to_string(); let branch_name = branch.clone(); let h = tokio::spawn({ @@ -266,6 +276,7 @@ impl HookWorker { } // Dispatch tag webhooks and collect handles + let mut tag_changes: u64 = 0; for (tag, after_oid) in after_tag_tips { let before_oid = before_tag_tips .iter() @@ -274,6 +285,7 @@ impl HookWorker { let is_new = before_oid.is_none(); let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false); if is_new || was_updated { + tag_changes += 1; let before_oid = before_oid.map_or("0", |v| v).to_string(); let tag_name = tag.clone(); let h = tokio::spawn({ @@ -311,6 +323,9 @@ impl HookWorker { let _ = h.await; } + 
counter!("hook_sync_branches_changed_total").increment(branch_changes); + counter!("hook_sync_tags_changed_total").increment(tag_changes); + Ok(()) } diff --git a/libs/git/http/mod.rs b/libs/git/http/mod.rs index d57283b..c9b4967 100644 --- a/libs/git/http/mod.rs +++ b/libs/git/http/mod.rs @@ -1,8 +1,9 @@ use crate::hook::HookService; -use actix_web::{App, HttpServer, web}; +use actix_web::{App, HttpServer, HttpResponse, web}; use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; +use sea_orm::ConnectionTrait; use std::sync::Arc; pub mod auth; @@ -21,8 +22,35 @@ pub struct HttpAppState { pub rate_limiter: Arc, } +async fn health(state: web::Data) -> HttpResponse { + let db_ok = state + .db + .query_one_raw(sea_orm::Statement::from_string( + sea_orm::DbBackend::Postgres, + "SELECT 1", + )) + .await + .is_ok(); + let cache_ok = state.cache.conn().await.is_ok(); + + if db_ok && cache_ok { + HttpResponse::Ok().json(serde_json::json!({ + "status": "ok", + "db": "ok", + "cache": "ok", + })) + } else { + HttpResponse::ServiceUnavailable().json(serde_json::json!({ + "status": "unhealthy", + "db": if db_ok { "ok" } else { "error" }, + "cache": if cache_ok { "ok" } else { "error" }, + })) + } +} + pub fn git_http_cfg(cfg: &mut web::ServiceConfig) { - cfg.route( + cfg.route("/health", web::get().to(health)) + .route( "/{namespace}/{repo_name}.git/info/refs", web::get().to(routes::info_refs), ) diff --git a/libs/queue/Cargo.toml b/libs/queue/Cargo.toml index 3aabd66..bccd2db 100644 --- a/libs/queue/Cargo.toml +++ b/libs/queue/Cargo.toml @@ -29,6 +29,7 @@ thiserror = { workspace = true } uuid = { workspace = true, features = ["v7", "v4", "serde"] } chrono = { workspace = true, features = ["serde"] } tracing = { workspace = true } +metrics = { workspace = true } [lints] workspace = true diff --git a/libs/queue/producer.rs b/libs/queue/producer.rs index 59384fe..0eebf00 100644 --- a/libs/queue/producer.rs +++ b/libs/queue/producer.rs @@ -5,6 +5,7 @@ use 
crate::types::{ RoomMessageEvent, }; use anyhow::Context; +use metrics::counter; use deadpool_redis::cluster::Connection as RedisConn; use std::sync::Arc; @@ -217,6 +218,7 @@ impl MessageProducer { .context("XADD email to Redis Stream")?; tracing::info!(to = %envelope.to, entry_id = %entry_id, "email queued to stream"); + counter!("email_queued_total").increment(1); Ok(entry_id) } diff --git a/libs/queue/worker.rs b/libs/queue/worker.rs index 1192984..8365f73 100644 --- a/libs/queue/worker.rs +++ b/libs/queue/worker.rs @@ -1,6 +1,7 @@ //! Redis Streams consumer — delegates persistence to the caller. use crate::types::{EmailEnvelope, RoomMessageEnvelope}; +use metrics::counter; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -262,6 +263,10 @@ async fn email_run_once( return Ok(0); } + let batch_size = batch.len(); + counter!("email_consumed_total").increment(batch_size as u64); + metrics::histogram!("email_batch_size").record(batch_size as f64); + let entry_ids: Vec = batch.iter().map(|(id, _)| id.clone()).collect(); let envelopes: Vec = batch.into_iter().map(|(_, e)| e).collect();