use actix_cors::Cors; use actix_web::cookie::time::Duration; use actix_web::middleware::Logger; use actix_web::{cookie::Key, web, App, HttpResponse, HttpServer}; use clap::Parser; use db::cache::AppCache; use db::database::AppDatabase; use observability::{ init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller, HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware, }; use sea_orm::ConnectionTrait; use service::AppService; use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy}; use session::storage::RedisClusterSessionStore; use session::SessionMiddleware; mod args; use args::ServerArgs; use config::AppConfig; use migrate::{Migrator, MigratorTrait}; #[derive(Clone)] pub struct AppState { pub db: AppDatabase, pub cache: AppCache, } fn build_session_key(cfg: &AppConfig) -> anyhow::Result { if let Some(secret) = cfg.env.get("APP_SESSION_SECRET") { let bytes: Vec = secret.as_bytes().iter().cycle().take(64).copied().collect(); return Ok(Key::from(&bytes)); } Ok(Key::generate()) } #[tokio::main] async fn main() -> anyhow::Result<()> { let cfg = AppConfig::load(); let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string()); let otel_enabled = cfg.otel_enabled().unwrap_or(false); init_tracing_subscriber(&log_level, otel_enabled); tracing::info!( app_name = %cfg.app_name().unwrap_or_default(), app_version = %cfg.app_version().unwrap_or_default(), "Starting application" ); let db = AppDatabase::init(&cfg).await?; tracing::info!("Database connected"); let redis_urls = cfg.redis_urls()?; let store: RedisClusterSessionStore = RedisClusterSessionStore::new(redis_urls).await?; tracing::info!("Redis connected"); let cache = AppCache::init(&cfg).await?; tracing::info!("Cache initialized"); run_migrations(&db).await?; let session_key = build_session_key(&cfg)?; let args = ServerArgs::parse(); let service = AppService::new(cfg.clone()).await?; tracing::info!("AppService initialized"); let _model_sync_handle = service.clone().start_sync_task(); let _billing_alert_handle = service.clone().start_billing_alert_task(); let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); let worker_service = service.clone(); let worker_handle = tokio::spawn(async move { worker_service.start_room_workers(shutdown_rx).await }); let _otel_guard = if otel_enabled { let endpoint = cfg .otel_endpoint() .unwrap_or_else(|_| "http://localhost:4317".to_string()); let service_name = cfg .otel_service_name() .unwrap_or_else(|_| "app".to_string()); let service_version = cfg .otel_service_version() .unwrap_or_else(|_| "0.1.0".to_string()); tracing::info!(endpoint = %endpoint, service = %service_name, "OTLP tracing enabled"); let guard = observability::init_otlp(&endpoint, &service_name, &service_version, &log_level) .map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))?; guard } else { None }; let prometheus_handle = install_recorder(); let prometheus_handle_arc = std::sync::Arc::new(prometheus_handle); let prometheus_handle_data = web::Data::new(prometheus_handle_arc.clone()); let http_metrics = std::sync::Arc::new(HttpMetrics::new()); let http_snapshot: HttpSnapshotGuard = std::sync::Arc::new(std::sync::RwLock::new( observability::HttpMetricsSnapshot::default(), )); let http_snapshot_for_poller = http_snapshot.clone(); spawn_http_metrics_poller( http_metrics.clone(), http_snapshot_for_poller, std::time::Duration::from_secs(15), ); let http_snapshot_data = web::Data::new(http_snapshot); let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()); tracing::info!(bind_addr = %bind_addr, "Listening"); let http_metrics_server = http_metrics.clone(); HttpServer::new(move || { let cors = Cors::default() .allow_any_origin() .allow_any_method() .allow_any_header() .supports_credentials() .max_age(3600); let session_mw = SessionMiddleware::builder(store.clone(), session_key.clone()) .cookie_name("id".to_string()) .cookie_path("/".to_string()) .cookie_secure(false) .cookie_http_only(true) .session_lifecycle(SessionLifecycle::PersistentSession( PersistentSession::default() .session_ttl(Duration::days(30)) .session_ttl_extension_policy(TtlExtensionPolicy::OnEveryRequest), )) .build(); let metrics_mw = MetricsMiddleware::new(http_metrics_server.clone()); App::new() .wrap(cors) .wrap(session_mw) .wrap(Logger::default().exclude("/health")) .wrap(metrics_mw) .wrap(TracingSpanMiddleware::new()) .app_data(web::Data::new(AppState { db: db.clone(), cache: cache.clone(), })) .app_data(web::Data::new(service.clone())) .app_data(web::Data::new(cfg.clone())) .app_data(web::Data::new(db.clone())) .app_data(web::Data::new(cache.clone())) .app_data(http_snapshot_data.clone()) .app_data(prometheus_handle_data.clone()) .route("/health", web::get().to(health_check)) .route("/metrics", web::get().to(prometheus_handler)) .configure(api::route::init_routes) }) .bind(&bind_addr)? .run() .await?; tracing::info!("Server stopped, shutting down room workers"); let _ = shutdown_tx.send(()); let _ = worker_handle.await; tracing::info!("Room workers stopped"); Ok(()) } async fn run_migrations(db: &AppDatabase) -> anyhow::Result<()> { tracing::info!("Running database migrations..."); Migrator::up(db.writer(), None) .await .map_err(|e| anyhow::anyhow!("Migration failed: {:?}", e))?; tracing::info!("Migrations completed"); Ok(()) } async fn health_check(state: web::Data) -> HttpResponse { let db_ok = db_ping(&state.db).await; let cache_ok = cache_ping(&state.cache).await; let healthy = db_ok && cache_ok; if healthy { HttpResponse::Ok().json(serde_json::json!({ "status": "ok", "db": "ok", "cache": "ok", })) } else { HttpResponse::ServiceUnavailable().json(serde_json::json!({ "status": "unhealthy", "db": if db_ok { "ok" } else { "error" }, "cache": if cache_ok { "ok" } else { "error" }, })) } } async fn db_ping(db: &AppDatabase) -> bool { db.query_one_raw(sea_orm::Statement::from_string( sea_orm::DbBackend::Postgres, "SELECT 1", )) .await .is_ok() } async fn cache_ping(cache: &AppCache) -> bool { cache.conn().await.is_ok() }