diff --git a/Cargo.lock b/Cargo.lock index 710eca7..dc60e72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2134,7 +2134,6 @@ dependencies = [ "async-trait", "config", "deadpool-redis", - "rand 0.10.0", "sea-orm", "tokio", ] diff --git a/apps/app/src/main.rs b/apps/app/src/main.rs index 723bb5d..9c27d81 100644 --- a/apps/app/src/main.rs +++ b/apps/app/src/main.rs @@ -275,12 +275,17 @@ async fn health_check(state: web::Data) -> HttpResponse { } async fn db_ping(db: &AppDatabase) -> bool { - db.query_one_raw(sea_orm::Statement::from_string( - sea_orm::DbBackend::Postgres, - "SELECT 1", - )) - .await - .is_ok() + let writer_ok = db + .writer() + .execute_unprepared("SELECT 1") + .await + .is_ok(); + let reader_ok = db + .reader() + .execute_unprepared("SELECT 1") + .await + .is_ok(); + writer_ok && reader_ok } async fn cache_ping(cache: &AppCache) -> bool { diff --git a/apps/email/src/main.rs b/apps/email/src/main.rs index 8e4b168..d94af7b 100644 --- a/apps/email/src/main.rs +++ b/apps/email/src/main.rs @@ -23,13 +23,17 @@ async fn http_handler( ) -> Result, std::convert::Infallible> { match req.uri().path() { "/health" => { - let db_ok = db - .query_one_raw(sea_orm::Statement::from_string( - sea_orm::DbBackend::Postgres, - "SELECT 1", - )) + let writer_ok = db + .writer() + .execute_unprepared("SELECT 1") .await .is_ok(); + let reader_ok = db + .reader() + .execute_unprepared("SELECT 1") + .await + .is_ok(); + let db_ok = writer_ok && reader_ok; let cache_ok = cache.conn().await.is_ok(); let body = serde_json::json!({ diff --git a/apps/git-hook/src/main.rs b/apps/git-hook/src/main.rs index 209a3c1..08b1710 100644 --- a/apps/git-hook/src/main.rs +++ b/apps/git-hook/src/main.rs @@ -22,13 +22,17 @@ async fn http_handler( ) -> Result, std::convert::Infallible> { match req.uri().path() { "/health" => { - let db_ok = db - .query_one_raw(sea_orm::Statement::from_string( - sea_orm::DbBackend::Postgres, - "SELECT 1", - )) + let writer_ok = db + .writer() + .execute_unprepared("SELECT 1") .await .is_ok(); + let reader_ok = db + .reader() + .execute_unprepared("SELECT 1") + .await + .is_ok(); + let db_ok = writer_ok && reader_ok; let cache_ok = cache.conn().await.is_ok(); let body = serde_json::json!({ diff --git a/libs/config/database.rs b/libs/config/database.rs index 52652fe..bc31059 100644 --- a/libs/config/database.rs +++ b/libs/config/database.rs @@ -43,11 +43,14 @@ impl AppConfig { } Ok("public".to_string()) } - pub fn database_read_replicas(&self) -> anyhow::Result> { + pub fn database_read_replicas(&self) -> anyhow::Result> { if let Some(replicas) = self.env.get("APP_DATABASE_REPLICAS") { - return Ok(replicas.split(',').map(|s| s.to_string()).collect()); + if replicas.is_empty() { + return Ok(None); + } + return Ok(Some(replicas.to_string())); } - Ok(vec![]) + Ok(None) } pub fn database_health_check_interval(&self) -> anyhow::Result { if let Some(interval) = self.env.get("APP_DATABASE_HEALTH_CHECK_INTERVAL") { diff --git a/libs/db/Cargo.toml b/libs/db/Cargo.toml index 6957bed..a91ce80 100644 --- a/libs/db/Cargo.toml +++ b/libs/db/Cargo.toml @@ -19,7 +19,6 @@ sea-orm = { workspace = true, features = ["sqlx-all", "runtime-tokio"] } deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] } config = { workspace = true } anyhow = { workspace = true } -rand = { workspace = true, features = [] } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } async-trait = { workspace = true } [lints] diff --git a/libs/db/database.rs b/libs/db/database.rs index c4f043b..b965878 100644 --- a/libs/db/database.rs +++ b/libs/db/database.rs @@ -1,5 +1,4 @@ use config::AppConfig; -use rand::random_range; use sea_orm::prelude::async_trait::async_trait; use sea_orm::{ ConnectionTrait, Database, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr, @@ -10,7 +9,7 @@ use std::time::Duration; #[derive(Clone)] pub struct AppDatabase { db_write: DatabaseConnection, - db_read: Vec, + db_read: Option, } impl AppDatabase { @@ -22,7 +21,7 @@ impl AppDatabase { let max_lifetime = cfg.database_max_lifetime()?; let connection_timeout = cfg.database_connection_timeout()?; let schema_search_path = cfg.database_schema_search_path()?; - let read_replicas = cfg.database_read_replicas()?; + let read_replica = cfg.database_read_replicas()?; let conn_cfg = sea_orm::ConnectOptions::new(db_url) .max_connections(max_connections) @@ -36,9 +35,8 @@ impl AppDatabase { let db_write = Database::connect(conn_cfg).await?; - let mut db_read = vec![]; - for replica in read_replicas { - let conn_cfg = sea_orm::ConnectOptions::new(replica.clone()) + let db_read = if let Some(ref replica_url) = read_replica { + let conn_cfg = sea_orm::ConnectOptions::new(replica_url.clone()) .max_connections(max_connections) .min_connections(min_connections) .idle_timeout(Duration::from_secs(idle_timeout)) @@ -46,9 +44,10 @@ impl AppDatabase { .connect_timeout(Duration::from_secs(connection_timeout)) .to_owned(); - let conn = Database::connect(conn_cfg).await?; - db_read.push(conn); - } + Some(Database::connect(conn_cfg).await?) + } else { + None + }; Ok(Self { db_write, db_read }) } @@ -58,11 +57,10 @@ impl AppDatabase { } pub fn reader(&self) -> &DatabaseConnection { - if self.db_read.is_empty() { - return &self.db_write; + match &self.db_read { + Some(conn) => conn, + None => &self.db_write, } - - &self.db_read[random_range(0..self.db_read.len())] } pub async fn begin(&self) -> Result {