gitdataai/libs/service/workspace/alert.rs
ZhenYi fb91f5a6c5 feat(admin): add admin panel with billing alerts and model sync
- Add libs/api/admin with admin API endpoints:
  sync models, workspace credit, billing alert check
- Add workspace_alert_config model and alert service
- Add Session::no_op() for background tasks without user context
- Add admin/ Next.js admin panel (AI models, billing, workspaces, audit)
- Start billing alert background task every 30 minutes
2026-04-19 20:48:59 +08:00

318 lines
12 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Workspace billing alert checker.
//!
//! Periodically checks all workspaces against their alert config and enqueues
//! email notifications when thresholds are exceeded.
//!
//! Alert types:
//! - `low_balance` — balance falls below threshold
//! - `monthly_quota` — month_used exceeds monthly_quota * threshold (e.g. 0.8 = 80%)
use crate::AppService;
use chrono::{Datelike, Utc};
use models::workspaces::{
workspace, workspace_alert_config, workspace_billing, workspace_billing_history,
workspace_membership,
};
use queue::EmailEnvelope;
use rust_decimal::prelude::ToPrimitive;
use sea_orm::*;
use serde::{Deserialize, Serialize};
use tokio::time::{interval, Duration};
use uuid::Uuid;
// ─── Types ─────────────────────────────────────────────────────────────────
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct CheckAlertsResponse {
pub workspaces_checked: usize,
pub alerts_sent: usize,
pub details: Vec<AlertDetail>,
}
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct AlertDetail {
pub workspace_id: Uuid,
pub workspace_name: String,
pub alert_type: String,
pub threshold: f64,
pub current_value: f64,
pub recipients: Vec<String>,
}
// ─── AppService impl ────────────────────────────────────────────────────────
impl AppService {
/// Run billing alert checks for all workspaces.
/// Called periodically by the background task and optionally via admin API.
pub async fn check_billing_alerts(&self) -> CheckAlertsResponse {
let mut details = Vec::new();
let mut alerts_sent = 0;
let workspaces = workspace::Entity::find()
.filter(workspace::Column::DeletedAt.is_null())
.all(&self.db)
.await
.unwrap_or_default();
let count = workspaces.len();
for ws in workspaces {
let ws_details = self.check_workspace_alerts(&ws).await;
alerts_sent += ws_details.alerts_sent;
details.extend(ws_details.details);
}
CheckAlertsResponse { workspaces_checked: count, alerts_sent, details }
}
async fn check_workspace_alerts(
&self,
ws: &workspace::Model,
) -> CheckAlertsResponse {
let mut details = Vec::new();
let mut alerts_sent = 0usize;
let Some(billing) = workspace_billing::Entity::find_by_id(ws.id)
.one(&self.db)
.await
.ok()
.flatten()
else {
return CheckAlertsResponse { workspaces_checked: 1, alerts_sent: 0, details };
};
let configs = workspace_alert_config::Entity::find()
.filter(workspace_alert_config::Column::WorkspaceId.eq(ws.id))
.filter(workspace_alert_config::Column::Enabled.eq(true))
.all(&self.db)
.await
.unwrap_or_default();
if configs.is_empty() {
return CheckAlertsResponse { workspaces_checked: 1, alerts_sent: 0, details };
}
let month_used = self.current_month_usage(ws.id).await;
let recipients = self.alert_recipients(ws.id).await;
if recipients.is_empty() {
return CheckAlertsResponse { workspaces_checked: 1, alerts_sent: 0, details };
}
let threshold_f64 = |d: &rust_decimal::Decimal| -> f64 {
use rust_decimal::prelude::ToPrimitive;
d.to_f64().unwrap_or_default()
};
for config in configs {
if !config.email_enabled {
continue;
}
let triggered = match config.alert_type.as_str() {
"low_balance" => threshold_f64(&billing.balance) < threshold_f64(&config.threshold),
"monthly_quota" => {
let quota = threshold_f64(&billing.monthly_quota);
quota > 0.0 && month_used > quota * threshold_f64(&config.threshold)
}
"usage_surge" => {
let quota = threshold_f64(&billing.monthly_quota);
quota > 0.0 && month_used > quota * 0.5
}
_ => false,
};
if triggered {
let (subject, body) = self.build_alert_email(ws, &billing, month_used, &config);
for recipient in &recipients {
let envelope = EmailEnvelope {
id: Uuid::now_v7(),
to: recipient.clone(),
subject: subject.clone(),
body: body.clone(),
created_at: Utc::now(),
};
if self.queue_producer.publish_email(envelope).await.is_ok() {
alerts_sent += 1;
}
}
details.push(AlertDetail {
workspace_id: ws.id,
workspace_name: ws.name.clone(),
alert_type: config.alert_type.clone(),
threshold: threshold_f64(&config.threshold),
current_value: if config.alert_type == "low_balance" {
threshold_f64(&billing.balance)
} else {
month_used
},
recipients: recipients.clone(),
});
}
}
CheckAlertsResponse { workspaces_checked: 1, alerts_sent, details }
}
fn build_alert_email(
&self,
ws: &workspace::Model,
billing: &workspace_billing::Model,
month_used: f64,
config: &workspace_alert_config::Model,
) -> (String, String) {
let currency = &billing.currency;
let threshold = config.threshold.to_f64().unwrap_or_default();
match config.alert_type.as_str() {
"low_balance" => {
let balance = billing.balance.to_f64().unwrap_or_default();
(
format!("[告警] Workspace「{}」余额不足", ws.name),
format!(
"您好,\n\n\
Workspace「{}」余额已低于告警阈值。\n\n\
当前余额: {:.4} {}\n\
告警阈值: {:.4} {}\n\n\
请及时充值以避免服务中断。\n\n\
此邮件由系统自动发送。",
ws.name, balance, currency, threshold, currency
),
)
}
"monthly_quota" => {
let quota = billing.monthly_quota.to_f64().unwrap_or_default();
(
format!("[告警] Workspace「{}」月度配额即将用尽", ws.name),
format!(
"您好,\n\n\
Workspace「{}」月度用量已超过告警阈值。\n\n\
当前使用: {:.4} {}\n\
月度配额: {:.4} {}\n\
触发阈值: {:.0}%\n\n\
请关注使用情况。\n\n\
此邮件由系统自动发送。",
ws.name, month_used, currency, quota, currency,
threshold * 100.0
),
)
}
"usage_surge" => {
let quota = billing.monthly_quota.to_f64().unwrap_or_default();
(
format!("[告警] Workspace「{}」使用量激增", ws.name),
format!(
"您好,\n\n\
Workspace「{}」月度使用量激增,请关注。\n\n\
当前使用: {:.4} {}(配额: {:.4} {}\n\n\
此邮件由系统自动发送。",
ws.name, month_used, currency, quota, currency
),
)
}
_ => (
format!("[告警] Workspace「{}", ws.name),
format!(
"您好,\n\nWorkspace「{}」触发告警: {}\n\n此邮件由系统自动发送。",
ws.name, config.alert_type
),
),
}
}
async fn current_month_usage(&self, workspace_id: Uuid) -> f64 {
let now = Utc::now();
let year = now.year();
let month = now.month();
let month_start = chrono::NaiveDate::from_ymd_opt(year, month, 1)
.and_then(|d| d.and_hms_opt(0, 0, 0))
.map(|d| chrono::TimeZone::from_utc_datetime(&chrono::Utc, &d))
.unwrap_or(now);
let next_month_date = if month == 12 {
chrono::NaiveDate::from_ymd_opt(year + 1, 1, 1)
} else {
chrono::NaiveDate::from_ymd_opt(year, month + 1, 1)
}
.and_then(|d| d.and_hms_opt(0, 0, 0))
.map(|d| chrono::TimeZone::from_utc_datetime(&chrono::Utc, &d))
.unwrap_or(now);
workspace_billing_history::Entity::find()
.filter(workspace_billing_history::Column::WorkspaceId.eq(workspace_id))
.filter(workspace_billing_history::Column::Reason.like("ai_usage%"))
.filter(workspace_billing_history::Column::CreatedAt.gte(month_start))
.filter(workspace_billing_history::Column::CreatedAt.lt(next_month_date))
.all(&self.db)
.await
.ok()
.unwrap_or_default()
.into_iter()
.map(|r| r.amount.to_f64().unwrap_or_default())
.sum()
}
/// Get email addresses for workspace owners and admins who have email notifications enabled.
async fn alert_recipients(&self, workspace_id: Uuid) -> Vec<String> {
let members = workspace_membership::Entity::find()
.filter(workspace_membership::Column::WorkspaceId.eq(workspace_id))
.filter(
workspace_membership::Column::Role
.is_in(["owner", "admin"]),
)
.filter(workspace_membership::Column::Status.eq("active"))
.all(&self.db)
.await
.ok()
.unwrap_or_default();
let mut emails = Vec::new();
for member in members {
// Check if user has email notifications enabled
let notif_enabled = models::users::user_notification::Entity::find_by_id(member.user_id)
.one(&self.db)
.await
.ok()
.flatten()
.map(|n| n.email_enabled)
.unwrap_or(true); // default to enabled
if !notif_enabled {
continue;
}
if let Some(email) = models::users::user_email::Entity::find_by_id(member.user_id)
.one(&self.db)
.await
.ok()
.flatten()
{
emails.push(email.email);
}
}
emails
}
/// Spawn the background billing alert checker task.
/// Runs every ALERT_CHECK_INTERVAL seconds.
pub fn start_billing_alert_task(self) -> tokio::task::JoinHandle<()> {
const ALERT_CHECK_INTERVAL: u64 = 30 * 60; // 30 minutes
let logs = self.logs.clone();
tokio::spawn(async move {
let mut tick = interval(Duration::from_secs(ALERT_CHECK_INTERVAL));
loop {
tick.tick().await;
let result = self.check_billing_alerts().await;
if result.alerts_sent > 0 {
slog::info!(
logs,
"Billing alerts: checked {} workspaces, sent {} emails",
result.workspaces_checked,
result.alerts_sent
);
}
}
})
}
}