57 lines
1.6 KiB
Rust
57 lines
1.6 KiB
Rust
use config::AppConfig;
|
|
use queue::{AckAction, MessageHandler, NatsConsumer};
|
|
use tracing::error;
|
|
|
|
use crate::{EmailMessage, SmtpEmailSender};
|
|
|
|
pub struct EmailWorker {
|
|
sender: SmtpEmailSender,
|
|
}
|
|
|
|
impl EmailWorker {
|
|
pub fn new(sender: SmtpEmailSender) -> Self {
|
|
Self { sender }
|
|
}
|
|
|
|
pub async fn start(config: &AppConfig) -> anyhow::Result<()> {
|
|
Self::start_with_metrics(config, None).await
|
|
}
|
|
|
|
pub async fn start_with_metrics(
|
|
config: &AppConfig,
|
|
metrics: Option<track::MetricsRegistry>,
|
|
) -> anyhow::Result<()> {
|
|
let worker = Self::new(SmtpEmailSender::new(config)?);
|
|
let mut consumer =
|
|
NatsConsumer::new(config, &config.email_consumer_group_id())
|
|
.await?;
|
|
if let Some(metrics) = metrics {
|
|
consumer.set_metrics(metrics);
|
|
}
|
|
let topic = config.email_topic();
|
|
consumer.start_consuming(&[topic.as_str()], worker).await?;
|
|
std::future::pending().await
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait]
|
|
impl MessageHandler for EmailWorker {
|
|
async fn handle(&self, topic: &str, payload: &[u8]) -> AckAction {
|
|
let message = match serde_json::from_slice::<EmailMessage>(payload) {
|
|
Ok(message) => message,
|
|
Err(error) => {
|
|
error!(topic, error = %error, "invalid email message payload");
|
|
return AckAction::Ack;
|
|
}
|
|
};
|
|
|
|
match self.sender.send(message).await {
|
|
Ok(()) => AckAction::Ack,
|
|
Err(error) => {
|
|
error!(topic, error = %error, "email message send failed");
|
|
AckAction::Nack
|
|
}
|
|
}
|
|
}
|
|
}
|