From 882e86dc33777e900409c37f7eaa8793e1011070 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Sun, 19 Apr 2026 00:08:10 +0800 Subject: [PATCH] refactor(email): switch to async channel-based email queue with retry Converts AppEmail from blocking sync sends to a background worker via mpsc channel, adds SMTP pool tuning (min_idle 5, max_size 100), and 3-retry backoff on send failures. --- libs/email/lib.rs | 96 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 65 insertions(+), 31 deletions(-) diff --git a/libs/email/lib.rs b/libs/email/lib.rs index 8c6d5ea..094688c 100644 --- a/libs/email/lib.rs +++ b/libs/email/lib.rs @@ -2,18 +2,13 @@ use config::AppConfig; use lettre::message::Mailbox; use lettre::transport::smtp::authentication::Credentials; use lettre::transport::smtp::client::Tls; -use lettre::{SmtpTransport, Transport}; +use lettre::transport::smtp::{PoolConfig, SmtpTransport}; +use lettre::Transport; use regex::Regex; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; use std::time::Duration; - -#[derive(Clone)] -pub struct AppEmail { - pub cred: Credentials, - pub mailer: SmtpTransport, - pub from: Mailbox, -} +use tokio::sync::mpsc; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct EmailMessage { @@ -25,6 +20,11 @@ pub struct EmailMessage { static EMAIL_REGEX: LazyLock = LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$").unwrap()); +#[derive(Clone)] +pub struct AppEmail { + sender: mpsc::Sender, +} + impl AppEmail { pub async fn init(cfg: &AppConfig) -> anyhow::Result { let smtp_host = cfg.smtp_host()?; @@ -34,12 +34,14 @@ impl AppEmail { let smtp_from = cfg.smtp_from()?; let smtp_tls = cfg.smtp_tls()?; let smtp_timeout = cfg.smtp_timeout()?; + let cred = Credentials::new(smtp_username, smtp_password); + let tls_param = if smtp_tls { Tls::Required( lettre::transport::smtp::client::TlsParameters::builder(smtp_host.clone()) .build() - .map_err(|e| anyhow::anyhow!("Failed to build TLS parameters: {}", e))?, + .map_err(|e| anyhow::anyhow!("TLS build error: {}", e))?, ) } else { Tls::None @@ -49,29 +51,61 @@ impl AppEmail { .port(smtp_port) .tls(tls_param) .timeout(Some(Duration::from_secs(smtp_timeout))) - .credentials(cred.clone()) + .credentials(cred) + .pool_config(PoolConfig::new().min_idle(5).max_size(100)) .build(); - Ok(AppEmail { - cred, - mailer, - from: smtp_from - .parse() - .map_err(|e| anyhow::anyhow!("Invalid from address: {}", e))?, - }) - } - pub async fn send(&self, msg: EmailMessage) -> anyhow::Result<()> { - if !EMAIL_REGEX.is_match(&msg.to) { - return Err(anyhow::anyhow!("Invalid email address format: {}", msg.to)); - } - let email = lettre::Message::builder() - .from(self.from.clone()) - .to(msg.to.parse()?) - .subject(msg.subject) - .body(msg.body)?; - self.mailer - .send(&email) - .map_err(|e| anyhow::anyhow!("{}", e))?; - Ok(()) + let from: Mailbox = smtp_from.parse()?; + + let (tx, mut rx) = mpsc::channel::(100); + + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + if !EMAIL_REGEX.is_match(&msg.clone().to) { + continue; + } + + let email = match lettre::Message::builder() + .from(from.clone()) + .to(msg.clone().to.parse().unwrap()) + .subject(msg.clone().subject) + .body(msg.clone().body) + { + Ok(e) => e, + Err(_) => continue, + }; + let mut success = false; + for i in 0..3 { + let mailer = mailer.clone(); + let email = email.clone(); + + let result = tokio::task::spawn_blocking(move || mailer.send(&email)).await; + + match result { + Ok(Ok(_)) => { + success = true; + break; + } + _ => { + let backoff = 100 * (i + 1); + tokio::time::sleep(Duration::from_millis(backoff)).await; + } + } + } + + if !success { + println!("[System] email send fail: {:?}", msg.clone()); + } + } + }); + + Ok(Self { sender: tx }) + } + + pub async fn send(&self, msg: EmailMessage) -> anyhow::Result<()> { + self.sender + .send(msg) + .await + .map_err(|e| anyhow::anyhow!("queue send error: {}", e)) } }