Both init_tracing_subscriber() and init_otlp() were calling try_init() on the global tracing dispatcher, causing "global default trace dispatcher has already been set" at runtime when APP_OTEL_ENABLED=true. Fix: simplify the API so that exactly one of the two functions installs the subscriber. init_tracing_subscriber() either calls try_init() immediately (non-OTLP mode) or returns without installing anything (OTLP mode, defer=true). init_otlp() now builds the complete subscriber stack (registry + env_filter + fmt_layer + otel_layer) and calls try_init() once. Signatures: init_tracing_subscriber(level, defer) → (); init_otlp(endpoint, service_name, _, log_level) → Result. The fmt layer is replicated inside init_otlp() for the OTLP path.
215 lines
7.6 KiB
Rust
215 lines
7.6 KiB
Rust
use actix_cors::Cors;
|
|
use actix_web::cookie::time::Duration;
|
|
use actix_web::middleware::Logger;
|
|
use actix_web::{cookie::Key, web, App, HttpResponse, HttpServer};
|
|
use clap::Parser;
|
|
use db::cache::AppCache;
|
|
use db::database::AppDatabase;
|
|
use observability::{
|
|
init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller,
|
|
spawn_redis_metrics_flusher,
|
|
HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware, instance_id,
|
|
};
|
|
use sea_orm::ConnectionTrait;
|
|
use service::AppService;
|
|
use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy};
|
|
use session::storage::RedisClusterSessionStore;
|
|
use session::SessionMiddleware;
|
|
|
|
mod args;
|
|
|
|
use args::ServerArgs;
|
|
use config::AppConfig;
|
|
use migrate::{Migrator, MigratorTrait};
|
|
|
|
#[derive(Clone)]
|
|
pub struct AppState {
|
|
pub db: AppDatabase,
|
|
pub cache: AppCache,
|
|
}
|
|
|
|
fn build_session_key(cfg: &AppConfig) -> anyhow::Result<Key> {
|
|
if let Some(secret) = cfg.env.get("APP_SESSION_SECRET") {
|
|
let bytes: Vec<u8> = secret.as_bytes().iter().cycle().take(64).copied().collect();
|
|
return Ok(Key::from(&bytes));
|
|
}
|
|
Ok(Key::generate())
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> anyhow::Result<()> {
|
|
let cfg = AppConfig::load();
|
|
let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string());
|
|
let otel_enabled = cfg.otel_enabled().unwrap_or(false);
|
|
init_tracing_subscriber(&log_level, otel_enabled);
|
|
tracing::info!(
|
|
app_name = %cfg.app_name().unwrap_or_default(),
|
|
app_version = %cfg.app_version().unwrap_or_default(),
|
|
"Starting application"
|
|
);
|
|
let db = AppDatabase::init(&cfg).await?;
|
|
tracing::info!("Database connected");
|
|
let redis_urls = cfg.redis_urls()?;
|
|
let store: RedisClusterSessionStore = RedisClusterSessionStore::new(redis_urls).await?;
|
|
tracing::info!("Redis connected");
|
|
let cache = AppCache::init(&cfg).await?;
|
|
tracing::info!("Cache initialized");
|
|
run_migrations(&db).await?;
|
|
let session_key = build_session_key(&cfg)?;
|
|
let args = ServerArgs::parse();
|
|
let service = AppService::new(cfg.clone()).await?;
|
|
tracing::info!("AppService initialized");
|
|
let _model_sync_handle = service.clone().start_sync_task();
|
|
let _billing_alert_handle = service.clone().start_billing_alert_task();
|
|
|
|
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
|
|
let worker_service = service.clone();
|
|
let worker_handle =
|
|
tokio::spawn(async move { worker_service.start_room_workers(shutdown_rx).await });
|
|
|
|
let _otel_guard = if otel_enabled {
|
|
let endpoint = cfg
|
|
.otel_endpoint()
|
|
.unwrap_or_else(|_| "http://localhost:4317".to_string());
|
|
let service_name = cfg
|
|
.otel_service_name()
|
|
.unwrap_or_else(|_| "app".to_string());
|
|
let service_version = cfg
|
|
.otel_service_version()
|
|
.unwrap_or_else(|_| "0.1.0".to_string());
|
|
tracing::info!(endpoint = %endpoint, service = %service_name, "OTLP tracing enabled");
|
|
let guard =
|
|
observability::init_otlp(&endpoint, &service_name, &service_version, &log_level)
|
|
.map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))?;
|
|
guard
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let prometheus_handle = install_recorder();
|
|
let prometheus_handle_arc = std::sync::Arc::new(prometheus_handle);
|
|
let prometheus_handle_data = web::Data::new(prometheus_handle_arc.clone());
|
|
|
|
let http_metrics = std::sync::Arc::new(HttpMetrics::new());
|
|
let http_snapshot: HttpSnapshotGuard = std::sync::Arc::new(std::sync::RwLock::new(
|
|
observability::HttpMetricsSnapshot::default(),
|
|
));
|
|
let http_snapshot_for_poller = http_snapshot.clone();
|
|
spawn_http_metrics_poller(
|
|
http_metrics.clone(),
|
|
http_snapshot_for_poller,
|
|
std::time::Duration::from_secs(15),
|
|
);
|
|
let http_snapshot_data = web::Data::new(http_snapshot);
|
|
|
|
// ── Redis metrics flusher (every 5s → key: metrics:{instance}:{ts}) ──────
|
|
let http_for_flusher = http_metrics.clone();
|
|
let prometheus_for_flusher = prometheus_handle_arc.clone();
|
|
spawn_redis_metrics_flusher(
|
|
cache.redis_pool().clone(),
|
|
instance_id(),
|
|
prometheus_for_flusher,
|
|
http_for_flusher,
|
|
std::time::Duration::from_secs(5),
|
|
);
|
|
tracing::info!("Redis metrics flusher started (5s interval)");
|
|
|
|
let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string());
|
|
tracing::info!(bind_addr = %bind_addr, "Listening");
|
|
let http_metrics_server = http_metrics.clone();
|
|
HttpServer::new(move || {
|
|
let cors = Cors::default()
|
|
.allow_any_origin()
|
|
.allow_any_method()
|
|
.allow_any_header()
|
|
.supports_credentials()
|
|
.max_age(3600);
|
|
|
|
let session_mw = SessionMiddleware::builder(store.clone(), session_key.clone())
|
|
.cookie_name("id".to_string())
|
|
.cookie_path("/".to_string())
|
|
.cookie_secure(false)
|
|
.cookie_http_only(true)
|
|
.session_lifecycle(SessionLifecycle::PersistentSession(
|
|
PersistentSession::default()
|
|
.session_ttl(Duration::days(30))
|
|
.session_ttl_extension_policy(TtlExtensionPolicy::OnEveryRequest),
|
|
))
|
|
.build();
|
|
|
|
let metrics_mw = MetricsMiddleware::new(http_metrics_server.clone());
|
|
|
|
App::new()
|
|
.wrap(cors)
|
|
.wrap(session_mw)
|
|
.wrap(Logger::default().exclude("/health"))
|
|
.wrap(metrics_mw)
|
|
.wrap(TracingSpanMiddleware::new())
|
|
.app_data(web::Data::new(AppState {
|
|
db: db.clone(),
|
|
cache: cache.clone(),
|
|
}))
|
|
.app_data(web::Data::new(service.clone()))
|
|
.app_data(web::Data::new(cfg.clone()))
|
|
.app_data(web::Data::new(db.clone()))
|
|
.app_data(web::Data::new(cache.clone()))
|
|
.app_data(http_snapshot_data.clone())
|
|
.app_data(prometheus_handle_data.clone())
|
|
.route("/health", web::get().to(health_check))
|
|
.route("/metrics", web::get().to(prometheus_handler))
|
|
.configure(api::route::init_routes)
|
|
})
|
|
.bind(&bind_addr)?
|
|
.run()
|
|
.await?;
|
|
|
|
tracing::info!("Server stopped, shutting down room workers");
|
|
let _ = shutdown_tx.send(());
|
|
let _ = worker_handle.await;
|
|
tracing::info!("Room workers stopped");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn run_migrations(db: &AppDatabase) -> anyhow::Result<()> {
|
|
tracing::info!("Running database migrations...");
|
|
Migrator::up(db.writer(), None)
|
|
.await
|
|
.map_err(|e| anyhow::anyhow!("Migration failed: {:?}", e))?;
|
|
tracing::info!("Migrations completed");
|
|
Ok(())
|
|
}
|
|
|
|
async fn health_check(state: web::Data<AppState>) -> HttpResponse {
|
|
let db_ok = db_ping(&state.db).await;
|
|
let cache_ok = cache_ping(&state.cache).await;
|
|
|
|
let healthy = db_ok && cache_ok;
|
|
if healthy {
|
|
HttpResponse::Ok().json(serde_json::json!({
|
|
"status": "ok",
|
|
"db": "ok",
|
|
"cache": "ok",
|
|
}))
|
|
} else {
|
|
HttpResponse::ServiceUnavailable().json(serde_json::json!({
|
|
"status": "unhealthy",
|
|
"db": if db_ok { "ok" } else { "error" },
|
|
"cache": if cache_ok { "ok" } else { "error" },
|
|
}))
|
|
}
|
|
}
|
|
|
|
async fn db_ping(db: &AppDatabase) -> bool {
|
|
db.query_one_raw(sea_orm::Statement::from_string(
|
|
sea_orm::DbBackend::Postgres,
|
|
"SELECT 1",
|
|
))
|
|
.await
|
|
.is_ok()
|
|
}
|
|
|
|
async fn cache_ping(cache: &AppCache) -> bool {
|
|
cache.conn().await.is_ok()
|
|
}
|