refactor(email): switch to async channel-based email queue with retry
Converts AppEmail from blocking sync sends to a background worker via mpsc channel, adds SMTP pool tuning (min_idle 5, max_size 100), and 3-retry backoff on send failures.
This commit is contained in:
parent
b693bd6beb
commit
882e86dc33
@ -2,18 +2,13 @@ use config::AppConfig;
|
|||||||
use lettre::message::Mailbox;
|
use lettre::message::Mailbox;
|
||||||
use lettre::transport::smtp::authentication::Credentials;
|
use lettre::transport::smtp::authentication::Credentials;
|
||||||
use lettre::transport::smtp::client::Tls;
|
use lettre::transport::smtp::client::Tls;
|
||||||
use lettre::{SmtpTransport, Transport};
|
use lettre::transport::smtp::{PoolConfig, SmtpTransport};
|
||||||
|
use lettre::Transport;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::sync::LazyLock;
|
use std::sync::LazyLock;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct AppEmail {
|
|
||||||
pub cred: Credentials,
|
|
||||||
pub mailer: SmtpTransport,
|
|
||||||
pub from: Mailbox,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||||
pub struct EmailMessage {
|
pub struct EmailMessage {
|
||||||
@ -25,6 +20,11 @@ pub struct EmailMessage {
|
|||||||
static EMAIL_REGEX: LazyLock<Regex> =
|
static EMAIL_REGEX: LazyLock<Regex> =
|
||||||
LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$").unwrap());
|
LazyLock::new(|| Regex::new(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$").unwrap());
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct AppEmail {
|
||||||
|
sender: mpsc::Sender<EmailMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
impl AppEmail {
|
impl AppEmail {
|
||||||
pub async fn init(cfg: &AppConfig) -> anyhow::Result<Self> {
|
pub async fn init(cfg: &AppConfig) -> anyhow::Result<Self> {
|
||||||
let smtp_host = cfg.smtp_host()?;
|
let smtp_host = cfg.smtp_host()?;
|
||||||
@ -34,12 +34,14 @@ impl AppEmail {
|
|||||||
let smtp_from = cfg.smtp_from()?;
|
let smtp_from = cfg.smtp_from()?;
|
||||||
let smtp_tls = cfg.smtp_tls()?;
|
let smtp_tls = cfg.smtp_tls()?;
|
||||||
let smtp_timeout = cfg.smtp_timeout()?;
|
let smtp_timeout = cfg.smtp_timeout()?;
|
||||||
|
|
||||||
let cred = Credentials::new(smtp_username, smtp_password);
|
let cred = Credentials::new(smtp_username, smtp_password);
|
||||||
|
|
||||||
let tls_param = if smtp_tls {
|
let tls_param = if smtp_tls {
|
||||||
Tls::Required(
|
Tls::Required(
|
||||||
lettre::transport::smtp::client::TlsParameters::builder(smtp_host.clone())
|
lettre::transport::smtp::client::TlsParameters::builder(smtp_host.clone())
|
||||||
.build()
|
.build()
|
||||||
.map_err(|e| anyhow::anyhow!("Failed to build TLS parameters: {}", e))?,
|
.map_err(|e| anyhow::anyhow!("TLS build error: {}", e))?,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
Tls::None
|
Tls::None
|
||||||
@ -49,29 +51,61 @@ impl AppEmail {
|
|||||||
.port(smtp_port)
|
.port(smtp_port)
|
||||||
.tls(tls_param)
|
.tls(tls_param)
|
||||||
.timeout(Some(Duration::from_secs(smtp_timeout)))
|
.timeout(Some(Duration::from_secs(smtp_timeout)))
|
||||||
.credentials(cred.clone())
|
.credentials(cred)
|
||||||
|
.pool_config(PoolConfig::new().min_idle(5).max_size(100))
|
||||||
.build();
|
.build();
|
||||||
Ok(AppEmail {
|
|
||||||
cred,
|
|
||||||
mailer,
|
|
||||||
from: smtp_from
|
|
||||||
.parse()
|
|
||||||
.map_err(|e| anyhow::anyhow!("Invalid from address: {}", e))?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
pub async fn send(&self, msg: EmailMessage) -> anyhow::Result<()> {
|
|
||||||
if !EMAIL_REGEX.is_match(&msg.to) {
|
|
||||||
return Err(anyhow::anyhow!("Invalid email address format: {}", msg.to));
|
|
||||||
}
|
|
||||||
|
|
||||||
let email = lettre::Message::builder()
|
let from: Mailbox = smtp_from.parse()?;
|
||||||
.from(self.from.clone())
|
|
||||||
.to(msg.to.parse()?)
|
let (tx, mut rx) = mpsc::channel::<EmailMessage>(100);
|
||||||
.subject(msg.subject)
|
|
||||||
.body(msg.body)?;
|
tokio::spawn(async move {
|
||||||
self.mailer
|
while let Some(msg) = rx.recv().await {
|
||||||
.send(&email)
|
if !EMAIL_REGEX.is_match(&msg.clone().to) {
|
||||||
.map_err(|e| anyhow::anyhow!("{}", e))?;
|
continue;
|
||||||
Ok(())
|
}
|
||||||
|
|
||||||
|
let email = match lettre::Message::builder()
|
||||||
|
.from(from.clone())
|
||||||
|
.to(msg.clone().to.parse().unwrap())
|
||||||
|
.subject(msg.clone().subject)
|
||||||
|
.body(msg.clone().body)
|
||||||
|
{
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
let mut success = false;
|
||||||
|
for i in 0..3 {
|
||||||
|
let mailer = mailer.clone();
|
||||||
|
let email = email.clone();
|
||||||
|
|
||||||
|
let result = tokio::task::spawn_blocking(move || mailer.send(&email)).await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(Ok(_)) => {
|
||||||
|
success = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
let backoff = 100 * (i + 1);
|
||||||
|
tokio::time::sleep(Duration::from_millis(backoff)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !success {
|
||||||
|
println!("[System] email send fail: {:?}", msg.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(Self { sender: tx })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send(&self, msg: EmailMessage) -> anyhow::Result<()> {
|
||||||
|
self.sender
|
||||||
|
.send(msg)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("queue send error: {}", e))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user