refactor(db): simplify read-replica to single connection for CNPG
Some checks are pending
CI / Rust Lint & Check (push) Waiting to run
CI / Rust Tests (push) Waiting to run
CI / Frontend Lint & Type Check (push) Waiting to run
CI / Frontend Build (push) Blocked by required conditions

CNPG's cluster-ro service already handles load balancing and failover,
so the application-level Vec + random_range is redundant.

- db_read: Vec<DatabaseConnection> → Option<DatabaseConnection>
- database_read_replicas returns Option<String> instead of Vec<String>
- health checks now explicitly ping both writer() and reader()
- remove unused rand dependency from libs/db
This commit is contained in:
ZhenYi 2026-04-26 01:03:39 +08:00
parent 468007177f
commit 0b5dc98ce5
7 changed files with 46 additions and 34 deletions

1
Cargo.lock generated
View File

@ -2134,7 +2134,6 @@ dependencies = [
"async-trait", "async-trait",
"config", "config",
"deadpool-redis", "deadpool-redis",
"rand 0.10.0",
"sea-orm", "sea-orm",
"tokio", "tokio",
] ]

View File

@ -275,12 +275,17 @@ async fn health_check(state: web::Data<AppState>) -> HttpResponse {
} }
async fn db_ping(db: &AppDatabase) -> bool { async fn db_ping(db: &AppDatabase) -> bool {
db.query_one_raw(sea_orm::Statement::from_string( let writer_ok = db
sea_orm::DbBackend::Postgres, .writer()
"SELECT 1", .execute_unprepared("SELECT 1")
))
.await .await
.is_ok() .is_ok();
let reader_ok = db
.reader()
.execute_unprepared("SELECT 1")
.await
.is_ok();
writer_ok && reader_ok
} }
async fn cache_ping(cache: &AppCache) -> bool { async fn cache_ping(cache: &AppCache) -> bool {

View File

@ -23,13 +23,17 @@ async fn http_handler(
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> { ) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
match req.uri().path() { match req.uri().path() {
"/health" => { "/health" => {
let db_ok = db let writer_ok = db
.query_one_raw(sea_orm::Statement::from_string( .writer()
sea_orm::DbBackend::Postgres, .execute_unprepared("SELECT 1")
"SELECT 1",
))
.await .await
.is_ok(); .is_ok();
let reader_ok = db
.reader()
.execute_unprepared("SELECT 1")
.await
.is_ok();
let db_ok = writer_ok && reader_ok;
let cache_ok = cache.conn().await.is_ok(); let cache_ok = cache.conn().await.is_ok();
let body = serde_json::json!({ let body = serde_json::json!({

View File

@ -22,13 +22,17 @@ async fn http_handler(
) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> { ) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
match req.uri().path() { match req.uri().path() {
"/health" => { "/health" => {
let db_ok = db let writer_ok = db
.query_one_raw(sea_orm::Statement::from_string( .writer()
sea_orm::DbBackend::Postgres, .execute_unprepared("SELECT 1")
"SELECT 1",
))
.await .await
.is_ok(); .is_ok();
let reader_ok = db
.reader()
.execute_unprepared("SELECT 1")
.await
.is_ok();
let db_ok = writer_ok && reader_ok;
let cache_ok = cache.conn().await.is_ok(); let cache_ok = cache.conn().await.is_ok();
let body = serde_json::json!({ let body = serde_json::json!({

View File

@ -43,11 +43,14 @@ impl AppConfig {
} }
Ok("public".to_string()) Ok("public".to_string())
} }
pub fn database_read_replicas(&self) -> anyhow::Result<Vec<String>> { pub fn database_read_replicas(&self) -> anyhow::Result<Option<String>> {
if let Some(replicas) = self.env.get("APP_DATABASE_REPLICAS") { if let Some(replicas) = self.env.get("APP_DATABASE_REPLICAS") {
return Ok(replicas.split(',').map(|s| s.to_string()).collect()); if replicas.is_empty() {
return Ok(None);
} }
Ok(vec![]) return Ok(Some(replicas.to_string()));
}
Ok(None)
} }
pub fn database_health_check_interval(&self) -> anyhow::Result<u64> { pub fn database_health_check_interval(&self) -> anyhow::Result<u64> {
if let Some(interval) = self.env.get("APP_DATABASE_HEALTH_CHECK_INTERVAL") { if let Some(interval) = self.env.get("APP_DATABASE_HEALTH_CHECK_INTERVAL") {

View File

@ -19,7 +19,6 @@ sea-orm = { workspace = true, features = ["sqlx-all", "runtime-tokio"] }
deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] } deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] }
config = { workspace = true } config = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
rand = { workspace = true, features = [] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
async-trait = { workspace = true } async-trait = { workspace = true }
[lints] [lints]

View File

@ -1,5 +1,4 @@
use config::AppConfig; use config::AppConfig;
use rand::random_range;
use sea_orm::prelude::async_trait::async_trait; use sea_orm::prelude::async_trait::async_trait;
use sea_orm::{ use sea_orm::{
ConnectionTrait, Database, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr, ConnectionTrait, Database, DatabaseConnection, DatabaseTransaction, DbBackend, DbErr,
@ -10,7 +9,7 @@ use std::time::Duration;
#[derive(Clone)] #[derive(Clone)]
pub struct AppDatabase { pub struct AppDatabase {
db_write: DatabaseConnection, db_write: DatabaseConnection,
db_read: Vec<DatabaseConnection>, db_read: Option<DatabaseConnection>,
} }
impl AppDatabase { impl AppDatabase {
@ -22,7 +21,7 @@ impl AppDatabase {
let max_lifetime = cfg.database_max_lifetime()?; let max_lifetime = cfg.database_max_lifetime()?;
let connection_timeout = cfg.database_connection_timeout()?; let connection_timeout = cfg.database_connection_timeout()?;
let schema_search_path = cfg.database_schema_search_path()?; let schema_search_path = cfg.database_schema_search_path()?;
let read_replicas = cfg.database_read_replicas()?; let read_replica = cfg.database_read_replicas()?;
let conn_cfg = sea_orm::ConnectOptions::new(db_url) let conn_cfg = sea_orm::ConnectOptions::new(db_url)
.max_connections(max_connections) .max_connections(max_connections)
@ -36,9 +35,8 @@ impl AppDatabase {
let db_write = Database::connect(conn_cfg).await?; let db_write = Database::connect(conn_cfg).await?;
let mut db_read = vec![]; let db_read = if let Some(ref replica_url) = read_replica {
for replica in read_replicas { let conn_cfg = sea_orm::ConnectOptions::new(replica_url.clone())
let conn_cfg = sea_orm::ConnectOptions::new(replica.clone())
.max_connections(max_connections) .max_connections(max_connections)
.min_connections(min_connections) .min_connections(min_connections)
.idle_timeout(Duration::from_secs(idle_timeout)) .idle_timeout(Duration::from_secs(idle_timeout))
@ -46,9 +44,10 @@ impl AppDatabase {
.connect_timeout(Duration::from_secs(connection_timeout)) .connect_timeout(Duration::from_secs(connection_timeout))
.to_owned(); .to_owned();
let conn = Database::connect(conn_cfg).await?; Some(Database::connect(conn_cfg).await?)
db_read.push(conn); } else {
} None
};
Ok(Self { db_write, db_read }) Ok(Self { db_write, db_read })
} }
@ -58,11 +57,10 @@ impl AppDatabase {
} }
pub fn reader(&self) -> &DatabaseConnection { pub fn reader(&self) -> &DatabaseConnection {
if self.db_read.is_empty() { match &self.db_read {
return &self.db_write; Some(conn) => conn,
None => &self.db_write,
} }
&self.db_read[random_range(0..self.db_read.len())]
} }
pub async fn begin(&self) -> Result<AppTransaction, DbErr> { pub async fn begin(&self) -> Result<AppTransaction, DbErr> {