//! Workspace billing alert checker. //! //! Periodically checks all workspaces against their alert config and enqueues //! email notifications when thresholds are exceeded. //! //! Alert types: //! - `low_balance` — balance falls below threshold //! - `monthly_quota` — month_used exceeds monthly_quota * threshold (e.g. 0.8 = 80%) use crate::AppService; use chrono::{Datelike, Utc}; use models::workspaces::{ workspace, workspace_alert_config, workspace_billing, workspace_billing_history, workspace_membership, }; use queue::EmailEnvelope; use rust_decimal::prelude::ToPrimitive; use sea_orm::*; use serde::{Deserialize, Serialize}; use tokio::time::{interval, Duration}; use uuid::Uuid; // ─── Types ───────────────────────────────────────────────────────────────── #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct CheckAlertsResponse { pub workspaces_checked: usize, pub alerts_sent: usize, pub details: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct AlertDetail { pub workspace_id: Uuid, pub workspace_name: String, pub alert_type: String, pub threshold: f64, pub current_value: f64, pub recipients: Vec, } // ─── AppService impl ──────────────────────────────────────────────────────── impl AppService { /// Run billing alert checks for all workspaces. /// Called periodically by the background task and optionally via admin API. pub async fn check_billing_alerts(&self) -> CheckAlertsResponse { let mut details = Vec::new(); let mut alerts_sent = 0; let workspaces = workspace::Entity::find() .filter(workspace::Column::DeletedAt.is_null()) .all(&self.db) .await .unwrap_or_default(); let count = workspaces.len(); for ws in workspaces { let ws_details = self.check_workspace_alerts(&ws).await; alerts_sent += ws_details.alerts_sent; details.extend(ws_details.details); } CheckAlertsResponse { workspaces_checked: count, alerts_sent, details } } async fn check_workspace_alerts( &self, ws: &workspace::Model, ) -> CheckAlertsResponse { let mut details = Vec::new(); let mut alerts_sent = 0usize; let Some(billing) = workspace_billing::Entity::find_by_id(ws.id) .one(&self.db) .await .ok() .flatten() else { return CheckAlertsResponse { workspaces_checked: 1, alerts_sent: 0, details }; }; let configs = workspace_alert_config::Entity::find() .filter(workspace_alert_config::Column::WorkspaceId.eq(ws.id)) .filter(workspace_alert_config::Column::Enabled.eq(true)) .all(&self.db) .await .unwrap_or_default(); if configs.is_empty() { return CheckAlertsResponse { workspaces_checked: 1, alerts_sent: 0, details }; } let month_used = self.current_month_usage(ws.id).await; let recipients = self.alert_recipients(ws.id).await; if recipients.is_empty() { return CheckAlertsResponse { workspaces_checked: 1, alerts_sent: 0, details }; } let threshold_f64 = |d: &rust_decimal::Decimal| -> f64 { use rust_decimal::prelude::ToPrimitive; d.to_f64().unwrap_or_default() }; for config in configs { if !config.email_enabled { continue; } let triggered = match config.alert_type.as_str() { "low_balance" => threshold_f64(&billing.balance) < threshold_f64(&config.threshold), "monthly_quota" => { let quota = threshold_f64(&billing.monthly_quota); quota > 0.0 && month_used > quota * threshold_f64(&config.threshold) } "usage_surge" => { let quota = threshold_f64(&billing.monthly_quota); quota > 0.0 && month_used > quota * 0.5 } _ => false, }; if triggered { let (subject, body) = self.build_alert_email(ws, &billing, month_used, &config); for recipient in &recipients { let envelope = EmailEnvelope { id: Uuid::now_v7(), to: recipient.clone(), subject: subject.clone(), body: body.clone(), created_at: Utc::now(), }; if self.queue_producer.publish_email(envelope).await.is_ok() { alerts_sent += 1; } } details.push(AlertDetail { workspace_id: ws.id, workspace_name: ws.name.clone(), alert_type: config.alert_type.clone(), threshold: threshold_f64(&config.threshold), current_value: if config.alert_type == "low_balance" { threshold_f64(&billing.balance) } else { month_used }, recipients: recipients.clone(), }); } } CheckAlertsResponse { workspaces_checked: 1, alerts_sent, details } } fn build_alert_email( &self, ws: &workspace::Model, billing: &workspace_billing::Model, month_used: f64, config: &workspace_alert_config::Model, ) -> (String, String) { let currency = &billing.currency; let threshold = config.threshold.to_f64().unwrap_or_default(); match config.alert_type.as_str() { "low_balance" => { let balance = billing.balance.to_f64().unwrap_or_default(); ( format!("[告警] Workspace「{}」余额不足", ws.name), format!( "您好,\n\n\ Workspace「{}」余额已低于告警阈值。\n\n\ 当前余额: {:.4} {}\n\ 告警阈值: {:.4} {}\n\n\ 请及时充值以避免服务中断。\n\n\ 此邮件由系统自动发送。", ws.name, balance, currency, threshold, currency ), ) } "monthly_quota" => { let quota = billing.monthly_quota.to_f64().unwrap_or_default(); ( format!("[告警] Workspace「{}」月度配额即将用尽", ws.name), format!( "您好,\n\n\ Workspace「{}」月度用量已超过告警阈值。\n\n\ 当前使用: {:.4} {}\n\ 月度配额: {:.4} {}\n\ 触发阈值: {:.0}%\n\n\ 请关注使用情况。\n\n\ 此邮件由系统自动发送。", ws.name, month_used, currency, quota, currency, threshold * 100.0 ), ) } "usage_surge" => { let quota = billing.monthly_quota.to_f64().unwrap_or_default(); ( format!("[告警] Workspace「{}」使用量激增", ws.name), format!( "您好,\n\n\ Workspace「{}」月度使用量激增,请关注。\n\n\ 当前使用: {:.4} {}(配额: {:.4} {})\n\n\ 此邮件由系统自动发送。", ws.name, month_used, currency, quota, currency ), ) } _ => ( format!("[告警] Workspace「{}」", ws.name), format!( "您好,\n\nWorkspace「{}」触发告警: {}\n\n此邮件由系统自动发送。", ws.name, config.alert_type ), ), } } async fn current_month_usage(&self, workspace_id: Uuid) -> f64 { let now = Utc::now(); let year = now.year(); let month = now.month(); let month_start = chrono::NaiveDate::from_ymd_opt(year, month, 1) .and_then(|d| d.and_hms_opt(0, 0, 0)) .map(|d| chrono::TimeZone::from_utc_datetime(&chrono::Utc, &d)) .unwrap_or(now); let next_month_date = if month == 12 { chrono::NaiveDate::from_ymd_opt(year + 1, 1, 1) } else { chrono::NaiveDate::from_ymd_opt(year, month + 1, 1) } .and_then(|d| d.and_hms_opt(0, 0, 0)) .map(|d| chrono::TimeZone::from_utc_datetime(&chrono::Utc, &d)) .unwrap_or(now); workspace_billing_history::Entity::find() .filter(workspace_billing_history::Column::WorkspaceId.eq(workspace_id)) .filter(workspace_billing_history::Column::Reason.like("ai_usage%")) .filter(workspace_billing_history::Column::CreatedAt.gte(month_start)) .filter(workspace_billing_history::Column::CreatedAt.lt(next_month_date)) .all(&self.db) .await .ok() .unwrap_or_default() .into_iter() .map(|r| r.amount.to_f64().unwrap_or_default()) .sum() } /// Get email addresses for workspace owners and admins who have email notifications enabled. async fn alert_recipients(&self, workspace_id: Uuid) -> Vec { let members = workspace_membership::Entity::find() .filter(workspace_membership::Column::WorkspaceId.eq(workspace_id)) .filter( workspace_membership::Column::Role .is_in(["owner", "admin"]), ) .filter(workspace_membership::Column::Status.eq("active")) .all(&self.db) .await .ok() .unwrap_or_default(); let mut emails = Vec::new(); for member in members { // Check if user has email notifications enabled let notif_enabled = models::users::user_notification::Entity::find_by_id(member.user_id) .one(&self.db) .await .ok() .flatten() .map(|n| n.email_enabled) .unwrap_or(true); // default to enabled if !notif_enabled { continue; } if let Some(email) = models::users::user_email::Entity::find_by_id(member.user_id) .one(&self.db) .await .ok() .flatten() { emails.push(email.email); } } emails } /// Spawn the background billing alert checker task. /// Runs every ALERT_CHECK_INTERVAL seconds. pub fn start_billing_alert_task(self) -> tokio::task::JoinHandle<()> { const ALERT_CHECK_INTERVAL: u64 = 30 * 60; // 30 minutes let logs = self.logs.clone(); tokio::spawn(async move { let mut tick = interval(Duration::from_secs(ALERT_CHECK_INTERVAL)); loop { tick.tick().await; let result = self.check_billing_alerts().await; if result.alerts_sent > 0 { slog::info!( logs, "Billing alerts: checked {} workspaces, sent {} emails", result.workspaces_checked, result.alerts_sent ); } } }) } }