From 10baa7fbd268fbb593a829630c1ecb3911203b26 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Mon, 1 Jun 2026 22:04:46 +0800 Subject: [PATCH] refactor: update git, email, AI libs and app entries --- app/email/src/context.rs | 24 +++---- app/email/src/health.rs | 74 +++++++++++++++++++++ app/email/src/main.rs | 16 ++++- app/gitdata/src/context.rs | 42 ++++++++---- app/gitdata/src/main.rs | 130 ++++++++++++++++++++++++++----------- app/gitpod/src/context.rs | 25 +++---- app/gitpod/src/main.rs | 68 ++++++++++++++++--- app/gitsync/src/context.rs | 25 +++---- app/gitsync/src/health.rs | 86 ++++++++++++++---------- app/gitsync/src/main.rs | 27 ++++++-- lib/ai/agent/agent.rs | 1 + lib/email/app.rs | 4 ++ lib/email/worker.rs | 12 +++- lib/git/graphql/mod.rs | 20 ++++++ lib/git/http/lfs_routes.rs | 54 +++++++++++++-- lib/git/http/mod.rs | 80 +++++++++++++++++++++-- lib/git/http/routes.rs | 3 + lib/git/rpc/server.rs | 1 + lib/git/ssh/handler.rs | 34 ++++++++++ lib/git/ssh/mod.rs | 2 + lib/git/ssh/server.rs | 1 + lib/git/sync/branch.rs | 1 + lib/git/sync/commit.rs | 1 + lib/git/sync/language.rs | 1 + lib/git/sync/lfs.rs | 1 + lib/git/sync/tag.rs | 1 + lib/git/sync/worker.rs | 47 ++++++++++++++ 27 files changed, 637 insertions(+), 144 deletions(-) create mode 100644 app/email/src/health.rs diff --git a/app/email/src/context.rs b/app/email/src/context.rs index 5872453..b2df6e4 100644 --- a/app/email/src/context.rs +++ b/app/email/src/context.rs @@ -1,24 +1,26 @@ use config::AppConfig; -use tracing_subscriber::EnvFilter; pub struct AppContext { pub config: AppConfig, + pub metrics: track::MetricsRegistry, + _otel_guard: Option, } impl AppContext { pub fn init() -> anyhow::Result { let config = AppConfig::load(); - init_tracing(&config)?; - Ok(Self { config }) + let otel_guard = init_tracing(&config)?; + let metrics = track::MetricsRegistry::new(); + Ok(Self { + config, + metrics, + _otel_guard: otel_guard, + }) } } -fn init_tracing(config: &AppConfig) -> anyhow::Result<()> { - let level = config.log_level()?; - let filter = EnvFilter::try_new(&level)?; - tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(false) - .init(); - Ok(()) +fn init_tracing( + config: &AppConfig, +) -> anyhow::Result> { + track::init_lgtm(config) } diff --git a/app/email/src/health.rs b/app/email/src/health.rs new file mode 100644 index 0000000..813cee9 --- /dev/null +++ b/app/email/src/health.rs @@ -0,0 +1,74 @@ +use std::time::Instant; + +use actix_web::dev::Service; +use actix_web::{App, HttpResponse, HttpServer, dev::Server, web}; +use tracing_actix_web::TracingLogger; + +use serde_json; + +async fn health() -> HttpResponse { + HttpResponse::Ok().json(serde_json::json!({ "status": "ok" })) +} + +async fn metrics(metrics: web::Data) -> HttpResponse { + match metrics.encode() { + Ok(body) => HttpResponse::Ok() + .content_type("text/plain; version=0.0.4") + .body(body), + Err(e) => HttpResponse::InternalServerError() + .content_type("text/plain") + .body(format!("metrics encoding error: {e}")), + } +} + +pub fn start_health( + port: u16, + metrics_registry: track::MetricsRegistry, +) -> anyhow::Result { + tracing::info!("email health endpoint starting on 0.0.0.0:{}", port); + + let srv = HttpServer::new(move || { + App::new() + .app_data(web::Data::new(metrics_registry.clone())) + .wrap(TracingLogger::default()) + .wrap_fn(|req, srv| { + let method = req.method().clone(); + let path = req.path().to_owned(); + let peer_addr = + req.connection_info().peer_addr().map(str::to_owned); + let started_at = Instant::now(); + let fut = srv.call(req); + async move { + match fut.await { + Ok(res) => { + tracing::info!( + method = %method, + path = %path, + status = res.status().as_u16(), + elapsed_ms = started_at.elapsed().as_millis(), + peer_addr = peer_addr.as_deref().unwrap_or("-"), + "http request" + ); + Ok(res) + } + Err(err) => { + tracing::warn!( + method = %method, + path = %path, + elapsed_ms = started_at.elapsed().as_millis(), + peer_addr = peer_addr.as_deref().unwrap_or("-"), + error = %err, + "http request failed" + ); + Err(err) + } + } + } + }) + .route("/health", web::get().to(health)) + .route("/metrics", web::get().to(metrics)) + }) + .bind(format!("0.0.0.0:{}", port))?; + + Ok(srv.run()) +} diff --git a/app/email/src/main.rs b/app/email/src/main.rs index 50f7f01..e4a32b7 100644 --- a/app/email/src/main.rs +++ b/app/email/src/main.rs @@ -1,4 +1,5 @@ mod context; +mod health; use context::AppContext; @@ -7,14 +8,27 @@ async fn main() -> anyhow::Result<()> { let ctx = AppContext::init()?; tracing::info!("email service starting"); + let health_port = ctx.config.email_health_port(); + let health_server = health::start_health(health_port, ctx.metrics.clone())?; + let health_handle = health_server.handle(); + let health_task = tokio::spawn(health_server); + tokio::select! { - result = email::EmailWorker::start(&ctx.config) => { + result = email::EmailWorker::start_with_metrics(&ctx.config, Some(ctx.metrics.clone())) => { if let Err(e) = result { tracing::error!("email worker exited with error: {}", e); } } _ = tokio::signal::ctrl_c() => { tracing::info!("shutdown signal received, stopping email service"); + health_handle.stop(true).await; + } + result = health_task => { + match result { + Ok(Ok(())) => tracing::info!("health server stopped"), + Ok(Err(e)) => tracing::error!("health server error: {}", e), + Err(e) => tracing::error!("health task panicked: {}", e), + } } } diff --git a/app/gitdata/src/context.rs b/app/gitdata/src/context.rs index c4c9c58..127fd8d 100644 --- a/app/gitdata/src/context.rs +++ b/app/gitdata/src/context.rs @@ -12,7 +12,7 @@ use session::storage::RedisClusterSessionStore; use storage::{AppStorage, AppStorageConfig}; use tonic::transport::Channel; -use channel::{ChannelBus, ChannelBusConfig}; +use channel::{CdnManager, ChannelBus, ChannelBusConfig}; use socketio::SocketIo; pub struct AppContext { @@ -21,26 +21,30 @@ pub struct AppContext { pub session_store: RedisClusterSessionStore, pub session_key: Key, pub channel_bus: ChannelBus, + _otel_guard: Option, } impl AppContext { pub async fn init() -> anyhow::Result { let config = AppConfig::load(); - init_tracing(&config)?; + let otel_guard = init_tracing(&config)?; tracing::info!("initializing database"); - let db = AppDatabase::init(&config).await?; + let mut db = AppDatabase::init(&config).await?; + + tracing::info!("running database migrations"); + migrate::run_up(db.writer()).await?; tracing::info!("initializing cache"); let cache_config = AppCacheConfig::try_from(&config)?; - let cache = AppCache::init(cache_config).await?; + let mut cache = AppCache::init(cache_config).await?; tracing::info!("initializing storage"); let storage_config = AppStorageConfig::try_from(&config)?; - let storage = AppStorage::init(storage_config).await?; + let mut storage = AppStorage::init(storage_config).await?; tracing::info!("initializing email"); - let email = AppEmail::init(&config).await?; + let mut email = AppEmail::init(&config).await?; tracing::info!("connecting to git RPC"); let rpc_addr = config.git_rpc_addr()?; @@ -51,6 +55,14 @@ impl AppContext { .connect() .await?; + let metrics_registry = track::MetricsRegistry::new(); + db.set_metrics(metrics_registry.clone()); + cache.set_metrics(metrics_registry.clone()); + storage.set_metrics(metrics_registry.clone()); + email.set_metrics(metrics_registry.clone()); + let service_metrics = + service::metrics::ServiceMetrics::init(&metrics_registry); + let service = AppService { db, cache, @@ -59,6 +71,8 @@ impl AppContext { config: config.clone(), git: git_channel, redis_pool: init_redis_pool(&config)?, + metrics_registry, + metrics: service_metrics, }; tracing::info!("initializing session store"); @@ -76,11 +90,14 @@ impl AppContext { signing_secret: Some(secret.clone()), ..Default::default() }; + let cdn = CdnManager::new(service.storage.clone()); let channel_bus = ChannelBus::new( service.db.clone(), service.cache.clone(), io, channel_config, + cdn, + Some(service.metrics_registry.clone()), ); channel_bus.attach().await?; @@ -90,18 +107,15 @@ impl AppContext { session_store, session_key, channel_bus, + _otel_guard: otel_guard, }) } } -fn init_tracing(config: &AppConfig) -> anyhow::Result<()> { - let level = config.log_level()?; - let filter = tracing_subscriber::EnvFilter::try_new(&level)?; - tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(false) - .init(); - Ok(()) +fn init_tracing( + config: &AppConfig, +) -> anyhow::Result> { + track::init_lgtm(config) } fn init_redis_pool(config: &AppConfig) -> anyhow::Result { diff --git a/app/gitdata/src/main.rs b/app/gitdata/src/main.rs index 1c4ebae..272f270 100644 --- a/app/gitdata/src/main.rs +++ b/app/gitdata/src/main.rs @@ -6,6 +6,8 @@ use std::time::Instant; use actix_web::{App, dev::Service}; use context::AppContext; use service::ai::sync::spawn_model_sync_loop; +use tracing_actix_web::TracingLogger; +use track::{CounterVec, HistogramVec}; const REQUEST_LOG_EXCLUDED_PATHS: &[&str] = &[ "/health", @@ -20,6 +22,41 @@ fn should_log_request(path: &str) -> bool { !REQUEST_LOG_EXCLUDED_PATHS.contains(&path) } +fn record_http_request( + registry: &track::MetricsRegistry, + method: &str, + status: u16, + elapsed: std::time::Duration, +) { + http_requests_total(registry) + .with_label_values(&[method, &status.to_string()]) + .inc(); + http_request_duration(registry) + .with_label_values(&[method, &status.to_string()]) + .observe(elapsed.as_secs_f64()); +} + +fn http_requests_total(registry: &track::MetricsRegistry) -> CounterVec { + registry + .register_counter_vec( + "http_requests_total", + "Total HTTP requests", + &["method", "status"], + ) + .expect("failed to register http_requests_total") +} + +fn http_request_duration(registry: &track::MetricsRegistry) -> HistogramVec { + registry + .register_histogram_vec( + "http_request_duration_seconds", + "HTTP request duration in seconds", + &["method", "status"], + vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0], + ) + .expect("failed to register http_request_duration_seconds") +} + #[tokio::main] async fn main() -> anyhow::Result<()> { let ctx = AppContext::init().await?; @@ -45,51 +82,70 @@ async fn main() -> anyhow::Result<()> { ) .build(); + let request_metrics = service.metrics_registry.clone(); + App::new() .app_data(actix_web::web::Data::new(service.clone())) .app_data(actix_web::web::Data::new(channel_bus.clone())) - .wrap_fn(|req, srv| { - let should_log = should_log_request(req.path()); - let method = req.method().clone(); - let path = req.path().to_owned(); - let peer_addr = - req.connection_info().peer_addr().map(str::to_owned); - let started_at = Instant::now(); - let fut = srv.call(req); + .wrap(TracingLogger::default()) + .wrap_fn(move |req, srv| { + let metrics = request_metrics.clone(); + let should_log = should_log_request(req.path()); + let method = req.method().clone(); + let path = req.path().to_owned(); + let peer_addr = + req.connection_info().peer_addr().map(str::to_owned); + let started_at = Instant::now(); + let fut = srv.call(req); - async move { - match fut.await { - Ok(res) => { - if should_log { - tracing::info!( - method = %method, - path = %path, - status = res.status().as_u16(), - elapsed_ms = started_at.elapsed().as_millis(), - peer_addr = peer_addr.as_deref().unwrap_or("-"), - "http request" - ); - } - Ok(res) + async move { + match fut.await { + Ok(res) => { + let elapsed = started_at.elapsed(); + let status = res.status().as_u16(); + record_http_request( + &metrics, + method.as_str(), + status, + elapsed, + ); + if should_log { + tracing::info!( + method = %method, + path = %path, + status = status, + elapsed_ms = elapsed.as_millis(), + peer_addr = peer_addr.as_deref().unwrap_or("-"), + "http request" + ); } - Err(err) => { - if should_log { - tracing::warn!( - method = %method, - path = %path, - elapsed_ms = started_at.elapsed().as_millis(), - peer_addr = peer_addr.as_deref().unwrap_or("-"), - error = %err, - "http request failed" - ); - } - Err(err) + Ok(res) + } + Err(err) => { + let elapsed = started_at.elapsed(); + record_http_request( + &metrics, + method.as_str(), + 500, + elapsed, + ); + if should_log { + tracing::warn!( + method = %method, + path = %path, + elapsed_ms = elapsed.as_millis(), + peer_addr = peer_addr.as_deref().unwrap_or("-"), + error = %err, + "http request failed" + ); } + Err(err) } } - }) - .wrap(session_middleware) - .configure(|cfg| api::configure(cfg, channel_bus.clone())) + } + }) + .wrap(session_middleware) + .configure(|cfg| api::configure(cfg, channel_bus.clone())) }) .bind(format!("0.0.0.0:{}", api_port))?; diff --git a/app/gitpod/src/context.rs b/app/gitpod/src/context.rs index 45b5a8e..d6d5303 100644 --- a/app/gitpod/src/context.rs +++ b/app/gitpod/src/context.rs @@ -10,19 +10,21 @@ pub struct AppContext { pub db: AppDatabase, pub cache: AppCache, pub redis_pool: deadpool_redis::cluster::Pool, + pub metrics: track::MetricsRegistry, + _otel_guard: Option, } impl AppContext { pub async fn init() -> anyhow::Result { let config = AppConfig::load(); - init_tracing(&config)?; + let otel_guard = init_tracing(&config)?; tracing::info!("initializing database"); - let db = AppDatabase::init(&config).await?; + let mut db = AppDatabase::init(&config).await?; tracing::info!("initializing cache"); let cache_config = AppCacheConfig::try_from(&config)?; - let cache = AppCache::init(cache_config).await?; + let mut cache = AppCache::init(cache_config).await?; tracing::info!("initializing redis pool"); let redis_urls = config.redis_urls()?; @@ -44,22 +46,23 @@ impl AppContext { read_from_replicas: false, }; let redis_pool = cfg.create_pool(Some(Runtime::Tokio1))?; + let metrics = track::MetricsRegistry::new(); + db.set_metrics(metrics.clone()); + cache.set_metrics(metrics.clone()); Ok(Self { config, db, cache, redis_pool, + metrics, + _otel_guard: otel_guard, }) } } -fn init_tracing(config: &AppConfig) -> anyhow::Result<()> { - let level = config.log_level()?; - let filter = tracing_subscriber::EnvFilter::try_new(&level)?; - tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(false) - .init(); - Ok(()) +fn init_tracing( + config: &AppConfig, +) -> anyhow::Result> { + track::init_lgtm(config) } diff --git a/app/gitpod/src/main.rs b/app/gitpod/src/main.rs index 0b8bd62..b15d7e6 100644 --- a/app/gitpod/src/main.rs +++ b/app/gitpod/src/main.rs @@ -20,18 +20,21 @@ async fn main() -> anyhow::Result<()> { rpc_port ); - let http_task = tokio::spawn(git::http::run_http( + let metrics = ctx.metrics.clone(); + let http_task = tokio::spawn(run_http_with_metrics( ctx.config.clone(), ctx.db.clone(), ctx.cache.clone(), ctx.redis_pool.clone(), + metrics.clone(), )); - let ssh_task = tokio::spawn(git::ssh::run_ssh( + let ssh_task = tokio::spawn(run_ssh_with_metrics( ctx.config.clone(), ctx.db.clone(), ctx.cache.clone(), ctx.redis_pool.clone(), + metrics.clone(), )); let rpc_addr_parsed = @@ -44,12 +47,7 @@ async fn main() -> anyhow::Result<()> { ctx.cache.clone(), sync_service, ); - let rpc_task = tokio::spawn(async move { - git_server - .serve() - .await - .map_err(|e| anyhow::anyhow!("{}", e)) - }); + let rpc_task = tokio::spawn(run_rpc_with_metrics(git_server, metrics)); tokio::select! { result = http_task => { @@ -80,3 +78,57 @@ async fn main() -> anyhow::Result<()> { Ok(()) } + +async fn run_http_with_metrics( + config: config::AppConfig, + db: db::database::AppDatabase, + cache: cache::AppCache, + redis_pool: deadpool_redis::cluster::Pool, + metrics: track::MetricsRegistry, +) -> anyhow::Result<()> { + let result = git::http::run_http( + config, + db, + cache, + redis_pool, + Some(metrics.clone()), + ) + .await; + let status = if result.is_ok() { "stop" } else { "error" }; + gitpod_counter(&metrics, "gitpod_http_events_total", status); + result +} + +async fn run_ssh_with_metrics( + config: config::AppConfig, + db: db::database::AppDatabase, + cache: cache::AppCache, + redis_pool: deadpool_redis::cluster::Pool, + metrics: track::MetricsRegistry, +) -> anyhow::Result<()> { + git::ssh::handler::set_ssh_metrics(metrics.clone()); + let result = git::ssh::run_ssh(config, db, cache, redis_pool).await; + let status = if result.is_ok() { "stop" } else { "error" }; + gitpod_counter(&metrics, "gitpod_ssh_events_total", status); + result +} + +async fn run_rpc_with_metrics( + server: git::rpc::server::GitServer, + metrics: track::MetricsRegistry, +) -> anyhow::Result<()> { + let result = server.serve().await.map_err(|e| anyhow::anyhow!("{}", e)); + let status = if result.is_ok() { "stop" } else { "error" }; + gitpod_counter(&metrics, "gitpod_rpc_events_total", status); + result +} + +fn gitpod_counter(registry: &track::MetricsRegistry, name: &str, label: &str) { + if let Ok(cv) = registry.register_counter_vec( + name, + "Gitpod server lifecycle events", + &["event"], + ) { + cv.with_label_values(&[label]).inc(); + } +} diff --git a/app/gitsync/src/context.rs b/app/gitsync/src/context.rs index 45b5a8e..d6d5303 100644 --- a/app/gitsync/src/context.rs +++ b/app/gitsync/src/context.rs @@ -10,19 +10,21 @@ pub struct AppContext { pub db: AppDatabase, pub cache: AppCache, pub redis_pool: deadpool_redis::cluster::Pool, + pub metrics: track::MetricsRegistry, + _otel_guard: Option, } impl AppContext { pub async fn init() -> anyhow::Result { let config = AppConfig::load(); - init_tracing(&config)?; + let otel_guard = init_tracing(&config)?; tracing::info!("initializing database"); - let db = AppDatabase::init(&config).await?; + let mut db = AppDatabase::init(&config).await?; tracing::info!("initializing cache"); let cache_config = AppCacheConfig::try_from(&config)?; - let cache = AppCache::init(cache_config).await?; + let mut cache = AppCache::init(cache_config).await?; tracing::info!("initializing redis pool"); let redis_urls = config.redis_urls()?; @@ -44,22 +46,23 @@ impl AppContext { read_from_replicas: false, }; let redis_pool = cfg.create_pool(Some(Runtime::Tokio1))?; + let metrics = track::MetricsRegistry::new(); + db.set_metrics(metrics.clone()); + cache.set_metrics(metrics.clone()); Ok(Self { config, db, cache, redis_pool, + metrics, + _otel_guard: otel_guard, }) } } -fn init_tracing(config: &AppConfig) -> anyhow::Result<()> { - let level = config.log_level()?; - let filter = tracing_subscriber::EnvFilter::try_new(&level)?; - tracing_subscriber::fmt() - .with_env_filter(filter) - .with_target(false) - .init(); - Ok(()) +fn init_tracing( + config: &AppConfig, +) -> anyhow::Result> { + track::init_lgtm(config) } diff --git a/app/gitsync/src/health.rs b/app/gitsync/src/health.rs index e4a7e18..88b8684 100644 --- a/app/gitsync/src/health.rs +++ b/app/gitsync/src/health.rs @@ -4,6 +4,7 @@ use actix_web::dev::Service; use actix_web::{App, HttpResponse, HttpServer, dev::Server, web}; use cache::AppCache; use db::database::AppDatabase; +use tracing_actix_web::TracingLogger; const REQUEST_LOG_EXCLUDED_PATHS: &[&str] = &[ "/health", @@ -40,10 +41,22 @@ async fn health( } } +async fn metrics(metrics: web::Data) -> HttpResponse { + match metrics.encode() { + Ok(body) => HttpResponse::Ok() + .content_type("text/plain; version=0.0.4") + .body(body), + Err(e) => HttpResponse::InternalServerError() + .content_type("text/plain") + .body(format!("metrics encoding error: {e}")), + } +} + pub fn start_health( port: u16, db: AppDatabase, cache: AppCache, + metrics_registry: track::MetricsRegistry, ) -> anyhow::Result { tracing::info!("health endpoint starting on 0.0.0.0:{}", port); @@ -51,47 +64,50 @@ pub fn start_health( App::new() .app_data(web::Data::new(db.clone())) .app_data(web::Data::new(cache.clone())) + .app_data(web::Data::new(metrics_registry.clone())) + .wrap(TracingLogger::default()) .wrap_fn(|req, srv| { - let should_log = should_log_request(req.path()); - let method = req.method().clone(); - let path = req.path().to_owned(); - let peer_addr = - req.connection_info().peer_addr().map(str::to_owned); - let started_at = Instant::now(); - let fut = srv.call(req); + let should_log = should_log_request(req.path()); + let method = req.method().clone(); + let path = req.path().to_owned(); + let peer_addr = + req.connection_info().peer_addr().map(str::to_owned); + let started_at = Instant::now(); + let fut = srv.call(req); - async move { - match fut.await { - Ok(res) => { - if should_log { - tracing::info!( - method = %method, - path = %path, - status = res.status().as_u16(), - elapsed_ms = started_at.elapsed().as_millis(), - peer_addr = peer_addr.as_deref().unwrap_or("-"), - "http request" - ); - } - Ok(res) + async move { + match fut.await { + Ok(res) => { + if should_log { + tracing::info!( + method = %method, + path = %path, + status = res.status().as_u16(), + elapsed_ms = started_at.elapsed().as_millis(), + peer_addr = peer_addr.as_deref().unwrap_or("-"), + "http request" + ); } - Err(err) => { - if should_log { - tracing::warn!( - method = %method, - path = %path, - elapsed_ms = started_at.elapsed().as_millis(), - peer_addr = peer_addr.as_deref().unwrap_or("-"), - error = %err, - "http request failed" - ); - } - Err(err) + Ok(res) + } + Err(err) => { + if should_log { + tracing::warn!( + method = %method, + path = %path, + elapsed_ms = started_at.elapsed().as_millis(), + peer_addr = peer_addr.as_deref().unwrap_or("-"), + error = %err, + "http request failed" + ); } + Err(err) } } - }) - .route("/health", web::get().to(health)) + } + }) + .route("/health", web::get().to(health)) + .route("/metrics", web::get().to(metrics)) }) .bind(format!("0.0.0.0:{}", port))?; diff --git a/app/gitsync/src/main.rs b/app/gitsync/src/main.rs index 55bd35a..3f31d5f 100644 --- a/app/gitsync/src/main.rs +++ b/app/gitsync/src/main.rs @@ -11,15 +11,19 @@ async fn main() -> anyhow::Result<()> { tracing::info!("gitsync service starting"); let health_port = ctx.config.gitsync_health_port(); - let health_server = - health::start_health(health_port, ctx.db.clone(), ctx.cache.clone())?; + let health_server = health::start_health( + health_port, + ctx.db.clone(), + ctx.cache.clone(), + ctx.metrics.clone(), + )?; let health_handle = health_server.handle(); let health_task = tokio::spawn(health_server); let sync_service = git::sync::ReceiveSyncService::new(ctx.redis_pool.clone()); let consumer = git::sync::consumer::SyncConsumer::new(sync_service, 5); - let worker = git::sync::worker::SyncWorker::new( + let mut worker = git::sync::worker::SyncWorker::new( consumer, ctx.db.clone(), ctx.cache.clone(), @@ -27,8 +31,13 @@ async fn main() -> anyhow::Result<()> { ctx.config.clone(), format!("gitsync-{}", uuid::Uuid::new_v4()), ); + worker.set_metrics(ctx.metrics.clone()); - let worker_task = tokio::spawn(async move { worker.run().await }); + let metrics = ctx.metrics.clone(); + let worker_task = tokio::spawn(async move { + worker.run().await; + record_sync_event(&metrics, "worker_stopped"); + }); tokio::select! { result = health_task => { @@ -49,3 +58,13 @@ async fn main() -> anyhow::Result<()> { Ok(()) } + +fn record_sync_event(registry: &track::MetricsRegistry, event: &str) { + if let Ok(cv) = registry.register_counter_vec( + "gitsync_worker_events_total", + "Gitsync worker lifecycle events", + &["event"], + ) { + cv.with_label_values(&[event]).inc(); + } +} diff --git a/lib/ai/agent/agent.rs b/lib/ai/agent/agent.rs index 62612cc..10584d7 100644 --- a/lib/ai/agent/agent.rs +++ b/lib/ai/agent/agent.rs @@ -45,6 +45,7 @@ impl RigAgent { &self.config } + #[tracing::instrument(skip(self, tools), fields(model = %self.config.model))] pub async fn chat( &self, request: AgentRequest, diff --git a/lib/email/app.rs b/lib/email/app.rs index cec57db..fed1c76 100644 --- a/lib/email/app.rs +++ b/lib/email/app.rs @@ -17,6 +17,10 @@ impl AppEmail { }) } + pub fn set_metrics(&mut self, registry: track::MetricsRegistry) { + self.producer.set_metrics(registry); + } + pub fn with_topic( producer: NatsProducer, topic: impl Into, diff --git a/lib/email/worker.rs b/lib/email/worker.rs index f48693f..75b6759 100644 --- a/lib/email/worker.rs +++ b/lib/email/worker.rs @@ -14,10 +14,20 @@ impl EmailWorker { } pub async fn start(config: &AppConfig) -> anyhow::Result<()> { + Self::start_with_metrics(config, None).await + } + + pub async fn start_with_metrics( + config: &AppConfig, + metrics: Option, + ) -> anyhow::Result<()> { let worker = Self::new(SmtpEmailSender::new(config)?); - let consumer = + let mut consumer = NatsConsumer::new(config, &config.email_consumer_group_id()) .await?; + if let Some(metrics) = metrics { + consumer.set_metrics(metrics); + } let topic = config.email_topic(); consumer.start_consuming(&[topic.as_str()], worker).await?; std::future::pending().await diff --git a/lib/git/graphql/mod.rs b/lib/git/graphql/mod.rs index 31892b1..c8108de 100644 --- a/lib/git/graphql/mod.rs +++ b/lib/git/graphql/mod.rs @@ -14,6 +14,7 @@ use juniper::{ EmptyMutation, EmptySubscription, FieldResult, RootNode, graphql_object, }; use serde_json::json; +use track::CounterVec; use crate::{ bare::GitBare, @@ -76,6 +77,7 @@ pub async fn graphql_handle( let response = body.execute(&schema, &ctx).await; let status_code = if response.is_ok() { 200 } else { 400 }; + record_graphql_metric(&state, response.is_ok()); HttpResponse::build( actix_web::http::StatusCode::from_u16(status_code).unwrap(), ) @@ -189,3 +191,21 @@ impl GraphqlQuery { resolve_blob(ctx, oid).await.map_err(to_field_error) } } + +fn record_graphql_metric(state: &crate::http::HttpAppState, ok: bool) { + if let Some(reg) = &state.metrics { + graphql_counter(reg) + .with_label_values(&[if ok { "success" } else { "error" }]) + .inc(); + } +} + +fn graphql_counter(registry: &track::MetricsRegistry) -> CounterVec { + registry + .register_counter_vec( + "git_graphql_queries_total", + "Total Git GraphQL queries", + &["outcome"], + ) + .expect("failed to register git_graphql_queries_total") +} diff --git a/lib/git/http/lfs_routes.rs b/lib/git/http/lfs_routes.rs index 2671883..0fff3b7 100644 --- a/lib/git/http/lfs_routes.rs +++ b/lib/git/http/lfs_routes.rs @@ -9,6 +9,7 @@ use model::{ repos::RepoModel, users::{user::UserModel, user_token::UserTokenModel}, }; +use track::CounterVec; use crate::{ errors::GitError, @@ -308,6 +309,7 @@ pub async fn lfs_batch( .map_err(|_| { actix_web::error::ErrorInternalServerError("LFS batch failed") })?; + record_lfs_op(&state, "batch", "success"); Ok(HttpResponse::Ok() .content_type("application/vnd.git-lfs+json") .json(response)) @@ -324,12 +326,29 @@ pub async fn lfs_batch( .map_err(|_| { actix_web::error::ErrorInternalServerError("LFS batch failed") })?; + record_lfs_op(&state, "batch", "success"); Ok(HttpResponse::Ok() .content_type("application/vnd.git-lfs+json") .json(response)) } } +fn record_lfs_op(state: &HttpAppState, op: &str, outcome: &str) { + if let Some(reg) = &state.metrics { + lfs_counter(reg).with_label_values(&[op, outcome]).inc(); + } +} + +fn lfs_counter(registry: &track::MetricsRegistry) -> CounterVec { + registry + .register_counter_vec( + "git_lfs_operations_total", + "Total Git LFS operations", + &["operation", "outcome"], + ) + .expect("failed to register git_lfs_operations_total") +} + pub async fn lfs_upload( req: HttpRequest, path: web::Path<(String, String, String)>, @@ -360,7 +379,10 @@ pub async fn lfs_upload( acquire_lfs_write_queue(&state, &handler.model, "upload").await?; let result = match handler.upload_object(&oid, payload).await { - Ok(response) => Ok(response), + Ok(response) => { + record_lfs_op(&state, "upload", "success"); + Ok(response) + } Err(GitError::InvalidOid(_)) => { Err(actix_web::error::ErrorBadRequest("Invalid OID")) } @@ -409,7 +431,10 @@ pub async fn lfs_download( ); match handler.download_object(&oid).await { - Ok(response) => Ok(response), + Ok(response) => { + record_lfs_op(&state, "download", "success"); + Ok(response) + } Err(GitError::NotFound(_)) => { Err(actix_web::error::ErrorNotFound("Object not found")) } @@ -443,8 +468,14 @@ pub async fn lfs_lock_create( acquire_lfs_write_queue(&state, &handler.model, "lock_create").await?; let result = match handler.lock_object(&body.oid, uid).await { - Ok(lock) => Ok(HttpResponse::Created().json(lock)), - Err(GitError::Locked(msg)) => Ok(HttpResponse::Conflict().body(msg)), + Ok(lock) => { + record_lfs_op(&state, "lock_create", "success"); + Ok(HttpResponse::Created().json(lock)) + } + Err(GitError::Locked(msg)) => { + record_lfs_op(&state, "lock_create", "already_locked"); + Ok(HttpResponse::Conflict().body(msg)) + } Err(_e) => { Err(actix_web::error::ErrorInternalServerError("Lock failed")) } @@ -476,7 +507,10 @@ pub async fn lfs_lock_list( ); match handler.list_locks(maybe_oid).await { - Ok(list) => Ok(HttpResponse::Ok().json(list)), + Ok(list) => { + record_lfs_op(&state, "lock_list", "success"); + Ok(HttpResponse::Ok().json(list)) + } Err(_e) => Err(actix_web::error::ErrorInternalServerError( "Lock list failed", )), @@ -504,7 +538,10 @@ pub async fn lfs_lock_get( ); match handler.get_lock(&lock_id).await { - Ok(lock) => Ok(HttpResponse::Ok().json(lock)), + Ok(lock) => { + record_lfs_op(&state, "lock_get", "success"); + Ok(HttpResponse::Ok().json(lock)) + } Err(GitError::NotFound(_)) => { Err(actix_web::error::ErrorNotFound("Lock not found")) } @@ -534,7 +571,10 @@ pub async fn lfs_lock_delete( acquire_lfs_write_queue(&state, &handler.model, "lock_delete").await?; let result = match handler.unlock_object(&lock_id, uid).await { - Ok(()) => Ok(HttpResponse::NoContent().finish()), + Ok(()) => { + record_lfs_op(&state, "lock_delete", "success"); + Ok(HttpResponse::NoContent().finish()) + } Err(GitError::PermissionDenied(_)) => { Err(actix_web::error::ErrorForbidden("Not allowed")) } diff --git a/lib/git/http/mod.rs b/lib/git/http/mod.rs index 3c524c0..709d0ae 100644 --- a/lib/git/http/mod.rs +++ b/lib/git/http/mod.rs @@ -5,6 +5,8 @@ use cache::AppCache; use config::AppConfig; use db::database::AppDatabase; use sqlx; +use tracing_actix_web::TracingLogger; +use track::{CounterVec, HistogramVec}; pub mod action; pub mod auth; @@ -28,6 +30,41 @@ fn should_log_request(path: &str) -> bool { !REQUEST_LOG_EXCLUDED_PATHS.contains(&path) } +fn record_http_request( + registry: &track::MetricsRegistry, + method: &str, + status: u16, + elapsed: std::time::Duration, +) { + http_requests_total(registry) + .with_label_values(&[method, &status.to_string()]) + .inc(); + http_request_duration(registry) + .with_label_values(&[method, &status.to_string()]) + .observe(elapsed.as_secs_f64()); +} + +fn http_requests_total(registry: &track::MetricsRegistry) -> CounterVec { + registry + .register_counter_vec( + "http_requests_total", + "Total HTTP requests", + &["method", "status"], + ) + .expect("failed to register http_requests_total") +} + +fn http_request_duration(registry: &track::MetricsRegistry) -> HistogramVec { + registry + .register_histogram_vec( + "http_request_duration_seconds", + "HTTP request duration in seconds", + &["method", "status"], + vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0], + ) + .expect("failed to register http_request_duration_seconds") +} + #[derive(Clone)] pub struct HttpAppState { pub db: AppDatabase, @@ -36,6 +73,7 @@ pub struct HttpAppState { pub rate_limiter: Arc, pub config: AppConfig, pub git_state: crate::AppGitState, + pub metrics: Option, } async fn robots(state: web::Data) -> HttpResponse { @@ -78,9 +116,27 @@ async fn health(state: web::Data) -> HttpResponse { } } +async fn metrics(state: web::Data) -> HttpResponse { + let Some(metrics) = &state.metrics else { + return HttpResponse::NotFound() + .content_type("text/plain") + .body("metrics not configured"); + }; + + match metrics.encode() { + Ok(body) => HttpResponse::Ok() + .content_type("text/plain; version=0.0.4") + .body(body), + Err(e) => HttpResponse::InternalServerError() + .content_type("text/plain") + .body(format!("metrics encoding error: {e}")), + } +} + pub fn git_http_cfg(cfg: &mut web::ServiceConfig) { cfg.route("/robots.txt", web::get().to(robots)) .route("/health", web::get().to(health)) + .route("/metrics", web::get().to(metrics)) .route( "/{namespace}/{repo_name}.git/action", web::get().to(action::action_poll), @@ -131,11 +187,13 @@ pub fn git_http_cfg(cfg: &mut web::ServiceConfig) { ); } +#[tracing::instrument(skip(config, db, cache, redis_pool, metrics_registry))] pub async fn run_http( config: AppConfig, db: AppDatabase, cache: AppCache, redis_pool: deadpool_redis::cluster::Pool, + metrics_registry: Option, ) -> anyhow::Result<()> { let sync = crate::sync::ReceiveSyncService::new(redis_pool); @@ -156,15 +214,20 @@ pub async fn run_http( rate_limiter, config: config.clone(), git_state, + metrics: metrics_registry, }; let http_port = config.git_http_port()?; tracing::info!("Starting git HTTP server on 0.0.0.0:{}", http_port); let server = HttpServer::new(move || { + let request_metrics = state.metrics.clone(); + App::new() .app_data(web::Data::new(state.clone())) - .wrap_fn(|req, srv| { + .wrap(TracingLogger::default()) + .wrap_fn(move |req, srv| { + let metrics = request_metrics.clone(); let should_log = should_log_request(req.path()); let method = req.method().clone(); let path = req.path().to_owned(); @@ -176,12 +239,17 @@ pub async fn run_http( async move { match fut.await { Ok(res) => { + let elapsed = started_at.elapsed(); + let status = res.status().as_u16(); + if let Some(metrics) = &metrics { + record_http_request(metrics, method.as_str(), status, elapsed); + } if should_log { tracing::info!( method = %method, path = %path, - status = res.status().as_u16(), - elapsed_ms = started_at.elapsed().as_millis(), + status = status, + elapsed_ms = elapsed.as_millis(), peer_addr = peer_addr.as_deref().unwrap_or("-"), "http request" ); @@ -189,11 +257,15 @@ pub async fn run_http( Ok(res) } Err(err) => { + let elapsed = started_at.elapsed(); + if let Some(metrics) = &metrics { + record_http_request(metrics, method.as_str(), 500, elapsed); + } if should_log { tracing::warn!( method = %method, path = %path, - elapsed_ms = started_at.elapsed().as_millis(), + elapsed_ms = elapsed.as_millis(), peer_addr = peer_addr.as_deref().unwrap_or("-"), error = %err, "http request failed" diff --git a/lib/git/http/routes.rs b/lib/git/http/routes.rs index 6498201..c8f6b8e 100644 --- a/lib/git/http/routes.rs +++ b/lib/git/http/routes.rs @@ -16,6 +16,7 @@ use crate::{ }, }; +#[tracing::instrument(skip(req, state), fields(namespace = %path.0, repo = %path.1))] pub async fn info_refs( req: HttpRequest, path: web::Path<(String, String)>, @@ -51,6 +52,7 @@ pub async fn info_refs( handler.info_refs(service_param).await } +#[tracing::instrument(skip(req, payload, state), fields(namespace = %path.0, repo = %path.1))] pub async fn upload_pack( req: HttpRequest, path: web::Path<(String, String)>, @@ -72,6 +74,7 @@ pub async fn upload_pack( handler.upload_pack(payload).await } +#[tracing::instrument(skip(req, payload, state), fields(namespace = %path.0, repo = %path.1))] pub async fn receive_pack( req: HttpRequest, path: web::Path<(String, String)>, diff --git a/lib/git/rpc/server.rs b/lib/git/rpc/server.rs index 6f6f47f..705cc03 100644 --- a/lib/git/rpc/server.rs +++ b/lib/git/rpc/server.rs @@ -62,6 +62,7 @@ impl GitServer { self.registry.register(repo_id, bare_dir); } + #[tracing::instrument(skip(self))] pub async fn serve(self) -> Result<(), Box> { let archive = ArchiveServiceServer::new(ArchiveServiceImpl { registry: self.registry.clone(), diff --git a/lib/git/ssh/handler.rs b/lib/git/ssh/handler.rs index b48312e..d341ae0 100644 --- a/lib/git/ssh/handler.rs +++ b/lib/git/ssh/handler.rs @@ -45,6 +45,38 @@ use crate::{ }, }; +static SSH_METRICS: std::sync::OnceLock = + std::sync::OnceLock::new(); + +/// Call once during server init to wire SSH metrics into the Prometheus registry. +pub fn set_ssh_metrics(registry: track::MetricsRegistry) { + let _ = SSH_METRICS.set(registry); +} + +fn record_ssh(operation: &str, outcome: &str) { + if let Some(reg) = SSH_METRICS.get() { + reg.register_counter_vec( + "git_ssh_operations_total", + "Total SSH git operations", + &["operation", "outcome"], + ) + .map(|cv| cv.with_label_values(&[operation, outcome]).inc()) + .ok(); + } +} + +fn record_ssh_auth(method: &str, outcome: &str) { + if let Some(reg) = SSH_METRICS.get() { + reg.register_counter_vec( + "git_ssh_auth_attempts_total", + "SSH authentication attempts", + &["method", "outcome"], + ) + .map(|cv| cv.with_label_values(&[method, outcome]).inc()) + .ok(); + } +} + const PRE_PACK_LIMIT: usize = 1_048_576; const ZERO_OID: &str = "0000000000000000000000000000000000000000"; @@ -431,6 +463,7 @@ impl russh::server::Handler for SSHandle { Ok(()) } + #[tracing::instrument(skip(self, channel, session))] async fn channel_open_session( &mut self, channel: Channel, @@ -642,6 +675,7 @@ impl russh::server::Handler for SSHandle { Ok(()) } + #[tracing::instrument(skip(self, session), fields(cmd = ?String::from_utf8_lossy(data)))] async fn exec_request( &mut self, channel_id: ChannelId, diff --git a/lib/git/ssh/mod.rs b/lib/git/ssh/mod.rs index 94b3b4f..506aae7 100644 --- a/lib/git/ssh/mod.rs +++ b/lib/git/ssh/mod.rs @@ -49,6 +49,7 @@ impl SSHHandle { } } + #[tracing::instrument(skip(self))] pub async fn run_ssh(&self) -> anyhow::Result<()> { tracing::info!("SSH server starting"); let key_file = self.app.ssh_server_private_key_file()?; @@ -183,6 +184,7 @@ impl SshTokenService { } } +#[tracing::instrument(skip(config, db, cache, redis_pool))] pub async fn run_ssh( config: AppConfig, db: AppDatabase, diff --git a/lib/git/ssh/server.rs b/lib/git/ssh/server.rs index 47bc127..b326890 100644 --- a/lib/git/ssh/server.rs +++ b/lib/git/ssh/server.rs @@ -38,6 +38,7 @@ impl SSHServer { impl russh::server::Server for SSHServer { type Handler = SSHandle; + #[tracing::instrument(skip(self), fields(peer = ?addr))] fn new_client(&mut self, addr: Option) -> Self::Handler { if let Some(addr) = addr { let ip = addr.ip().to_string(); diff --git a/lib/git/sync/branch.rs b/lib/git/sync/branch.rs index 68766c8..a169a01 100644 --- a/lib/git/sync/branch.rs +++ b/lib/git/sync/branch.rs @@ -45,6 +45,7 @@ pub fn collect_branch_tips(bare: &GitBare) -> Result, GitError> { } Ok(branches) } +#[tracing::instrument(skip(db, bare), fields(repo_id = %repo_id))] pub async fn sync_refs( db: &AppDatabase, bare: &GitBare, diff --git a/lib/git/sync/commit.rs b/lib/git/sync/commit.rs index b476101..a9e08c3 100644 --- a/lib/git/sync/commit.rs +++ b/lib/git/sync/commit.rs @@ -6,6 +6,7 @@ use model::repos::RepoCommitterModel; use uuid::Uuid; use crate::{bare::GitBare, cmd::oid::ObjectId, errors::GitError}; +#[tracing::instrument(skip(db, bare), fields(repo_id = %repo_id))] pub async fn sync_commits( db: &AppDatabase, bare: &GitBare, diff --git a/lib/git/sync/language.rs b/lib/git/sync/language.rs index 1bf5a5b..e3c6c75 100644 --- a/lib/git/sync/language.rs +++ b/lib/git/sync/language.rs @@ -126,6 +126,7 @@ fn blob_size(bare: &GitBare, oid: &ObjectId) -> Result { })?; Ok(header.size() as u64) } +#[tracing::instrument(skip(db, bare), fields(repo_id = %repo_id))] pub async fn sync_languages( db: &AppDatabase, bare: &GitBare, diff --git a/lib/git/sync/lfs.rs b/lib/git/sync/lfs.rs index 010d48c..c6a8b55 100644 --- a/lib/git/sync/lfs.rs +++ b/lib/git/sync/lfs.rs @@ -5,6 +5,7 @@ use model::repos::RepoLfsObjectModel; use uuid::Uuid; use crate::{bare::GitBare, errors::GitError}; +#[tracing::instrument(skip(db, bare), fields(repo_id = %repo_id))] pub async fn sync_lfs_objects( db: &AppDatabase, bare: &GitBare, diff --git a/lib/git/sync/tag.rs b/lib/git/sync/tag.rs index aae5443..39e791f 100644 --- a/lib/git/sync/tag.rs +++ b/lib/git/sync/tag.rs @@ -43,6 +43,7 @@ pub fn collect_tag_tips(bare: &GitBare) -> Result, GitError> { } Ok(tags) } +#[tracing::instrument(skip(db, bare), fields(repo_id = %repo_id))] pub async fn sync_tags( db: &AppDatabase, bare: &GitBare, diff --git a/lib/git/sync/worker.rs b/lib/git/sync/worker.rs index 60ed476..5fbad09 100644 --- a/lib/git/sync/worker.rs +++ b/lib/git/sync/worker.rs @@ -6,6 +6,7 @@ use db::{database::AppDatabase, sqlx}; use deadpool_redis::cluster::Pool as RedisPool; use model::repos::RepoModel; use parsefile::TriggerEvent; +use track::CounterVec; use crate::sync::{ HookTask, TaskType, @@ -22,6 +23,7 @@ pub struct SyncWorker { pub config: AppConfig, pub max_retries: usize, pub worker_id: String, + pub metrics: Option, } impl SyncWorker { @@ -41,8 +43,14 @@ impl SyncWorker { config, max_retries: 3, worker_id, + metrics: None, } } + pub fn set_metrics(&mut self, registry: track::MetricsRegistry) { + self.metrics = Some(registry); + } + + #[tracing::instrument(skip(self))] pub async fn run(&self) { tracing::info!(worker_id = %self.worker_id, "sync worker starting"); let mut backoff_secs: u64 = 1; @@ -85,10 +93,12 @@ impl SyncWorker { match result { ProcessResult::Success => { + self.record_sync_metric(&task, "success"); self.consumer.ack(&task_json, &work_key).await; backoff_secs = 1; } ProcessResult::Locked => { + self.record_sync_metric(&task, "locked"); self.consumer .nak_with_retry( &task_json, &work_key, &queue_key, @@ -104,6 +114,7 @@ impl SyncWorker { retry_count = task.retry_count, "max retries exceeded, dropping task" ); + self.record_sync_metric(&task, "dropped"); self.consumer.ack(&task_json, &work_key).await; } else { tracing::warn!( @@ -145,6 +156,7 @@ impl SyncWorker { } } + #[tracing::instrument(skip(self, _task_json, _work_key, _queue_key), fields(task_id = %task.id, task_type = ?task.task_type))] async fn process_task( &self, task: &HookTask, @@ -160,6 +172,7 @@ impl SyncWorker { } } + #[tracing::instrument(skip(self), fields(repo_id = %task.repo_id))] async fn run_sync(&self, task: &HookTask) -> ProcessResult { let repo_id = match task.repo_id.parse::() { Ok(id) => id, @@ -200,6 +213,7 @@ impl SyncWorker { } } + #[tracing::instrument(skip(self), fields(repo_id = %repo_id))] async fn do_sync(&self, repo_id: uuid::Uuid) -> anyhow::Result<()> { let pool = self.db.reader(); @@ -274,6 +288,7 @@ impl SyncWorker { Ok(()) } + #[tracing::instrument(skip(self), fields(repo_id = %task.repo_id))] async fn run_fsck(&self, task: &HookTask) -> ProcessResult { let repo_id = match task.repo_id.parse::() { Ok(id) => id, @@ -316,6 +331,7 @@ impl SyncWorker { } } + #[tracing::instrument(skip(self), fields(repo_id = %task.repo_id))] async fn run_gc(&self, task: &HookTask) -> ProcessResult { let repo_id = match task.repo_id.parse::() { Ok(id) => id, @@ -385,6 +401,7 @@ impl SyncWorker { Ok(()) } + #[tracing::instrument(skip(self), fields(repo_id = %task.repo_id))] async fn run_webhook(&self, task: &HookTask) -> ProcessResult { let repo_id = match task.repo_id.parse::() { Ok(id) => id, @@ -490,6 +507,36 @@ impl SyncWorker { } } +impl SyncWorker { + fn record_sync_metric(&self, task: &HookTask, outcome: &str) { + if let Some(reg) = &self.metrics { + let task_label = task_type_label(&task.task_type); + sync_metrics_vec(reg) + .with_label_values(&[task_label, outcome]) + .inc(); + } + } +} + +fn task_type_label(tt: &TaskType) -> &str { + match tt { + TaskType::Sync => "sync", + TaskType::Fsck => "fsck", + TaskType::Gc => "gc", + TaskType::Webhook => "webhook", + } +} + +fn sync_metrics_vec(registry: &track::MetricsRegistry) -> CounterVec { + registry + .register_counter_vec( + "gitsync_tasks_total", + "Total sync tasks processed", + &["task_type", "outcome"], + ) + .expect("failed to register gitsync_tasks_total") +} + enum ProcessResult { Success, Locked,