feat(service): add push and storage service modules, update project/user/workspace services

This commit is contained in:
ZhenYi 2026-04-20 15:45:40 +08:00
parent 0c64122b80
commit 8316fe926f
6 changed files with 392 additions and 0 deletions

View File

@ -20,6 +20,11 @@ use slog::{Drain, OwnedKVList, Record};
use utoipa::ToSchema;
use ws_token::WsTokenService;
pub mod storage;
pub use storage::AppStorage;
pub mod push;
pub use push::{WebPushService, PushPayload};
#[derive(Clone)]
pub struct AppService {
pub db: AppDatabase,
@ -31,6 +36,48 @@ pub struct AppService {
pub room: RoomService,
pub ws_token: Arc<WsTokenService>,
pub queue_producer: MessageProducer,
pub storage: Option<AppStorage>,
pub push: Option<WebPushService>,
}
impl AppService {
/// Send a Web Push notification to a specific user.
/// Reads the user's push subscription from `user_notification` table.
/// Non-blocking: failures are logged but don't affect the caller.
pub fn send_push_to_user(&self, user_id: uuid::Uuid, payload: PushPayload) {
let push = self.push.clone();
let db = self.db.clone();
let log = self.logs.clone();
tokio::spawn(async move {
if let Some(push) = push {
use models::users::user_notification;
use sea_orm::EntityTrait;
let prefs = user_notification::Entity::find_by_id(user_id)
.one(&db)
.await;
if let Ok(Some(prefs)) = prefs {
if prefs.push_enabled
&& prefs.push_subscription_endpoint.is_some()
&& prefs.push_subscription_keys_p256dh.is_some()
&& prefs.push_subscription_keys_auth.is_some()
{
let endpoint = prefs.push_subscription_endpoint.unwrap();
let p256dh = prefs.push_subscription_keys_p256dh.unwrap();
let auth = prefs.push_subscription_keys_auth.unwrap();
if let Err(e) = push.send(&endpoint, &p256dh, &auth, &payload).await {
slog::warn!(log, "WebPush send failed"; "user_id" => %user_id, "error" => %e);
}
}
} else if let Err(e) = prefs {
slog::warn!(log, "Failed to read push subscription"; "user_id" => %user_id, "error" => %e);
}
}
});
}
}
impl AppService {
@ -101,6 +148,42 @@ impl AppService {
let email = AppEmail::init(&config, logs.clone()).await?;
let avatar = AppAvatar::init(&config).await?;
let storage = match AppStorage::new(&config) {
Ok(s) => {
slog::info!(logs, "Storage initialized at {}", s.base_path.display());
Some(s)
}
Err(e) => {
slog::warn!(logs, "Storage not available: {}", e);
None
}
};
let push = match (
config.vapid_public_key(),
config.vapid_private_key(),
) {
(Some(public_key), Some(private_key)) => {
match WebPushService::new(
public_key,
private_key,
config.vapid_sender_email(),
) {
Ok(s) => {
slog::info!(logs, "WebPush initialized");
Some(s)
}
Err(e) => {
slog::warn!(logs, "WebPush not available: {}", e);
None
}
}
}
_ => {
slog::warn!(logs, "WebPush disabled — VAPID keys not configured");
None
}
};
// Build get_redis closure for MessageProducer
let get_redis: Arc<
@ -162,6 +245,47 @@ impl AppService {
}
};
// Build push notification callback for RoomService
let push_fn: Option<room::PushNotificationFn> = push.clone().map(|push_svc| {
let db_clone = db.clone();
let log_clone = logs.clone();
Arc::new(move |user_id: uuid::Uuid, title: String, body: Option<String>, url: Option<String>| {
let push = push_svc.clone();
let db = db_clone.clone();
let log = log_clone.clone();
let payload = PushPayload {
title,
body: body.unwrap_or_default(),
url,
icon: None,
};
tokio::spawn(async move {
use models::users::user_notification;
use sea_orm::EntityTrait;
let prefs = user_notification::Entity::find_by_id(user_id)
.one(&db)
.await;
if let Ok(Some(prefs)) = prefs {
if prefs.push_enabled
&& prefs.push_subscription_endpoint.is_some()
&& prefs.push_subscription_keys_p256dh.is_some()
&& prefs.push_subscription_keys_auth.is_some()
{
let endpoint = prefs.push_subscription_endpoint.unwrap();
let p256dh = prefs.push_subscription_keys_p256dh.unwrap();
let auth = prefs.push_subscription_keys_auth.unwrap();
if let Err(e) = push.send(&endpoint, &p256dh, &auth, &payload).await {
slog::warn!(log, "WebPush send failed"; "user_id" => %user_id, "error" => %e);
}
}
}
});
}) as room::PushNotificationFn
});
let room = RoomService::new(
db.clone(),
cache.clone(),
@ -172,6 +296,7 @@ impl AppService {
Some(task_service.clone()),
logs.clone(),
None,
push_fn,
);
// Build WsTokenService
@ -187,6 +312,8 @@ impl AppService {
room,
ws_token,
queue_producer: message_producer,
storage,
push,
})
}

View File

@ -324,6 +324,40 @@ impl AppService {
if let Err(_e) = self.queue_producer.publish_email(envelope).await {
// Failed to queue invitation email
}
// Send in-app notification + push notification to the invitee
self.send_push_to_user(
target_uid,
crate::push::PushPayload {
title: format!("Project invitation: {}", project.name),
body: format!("{} invited you to join \"{}\" as {:?}", inviter.username, project.name, scope),
url: Some(format!("/projects/{}/invitations", project.name)),
icon: None,
},
);
let _ = self
.room
.notification_create(room::NotificationCreateRequest {
notification_type: room::NotificationType::ProjectInvitation,
user_id: target_uid,
title: format!("{} invited you to join \"{}\"", inviter.username, project.name),
content: Some(format!("Role: {:?}", scope)),
room_id: None,
project_id: project.id,
related_message_id: None,
related_user_id: Some(inviter_uid),
related_room_id: None,
metadata: Some(serde_json::json!({
"project_id": project.id,
"project_name": project.name,
"inviter_uid": inviter_uid,
"scope": format!("{:?}", scope),
})),
expires_at: None,
})
.await;
Ok(())
}

91
libs/service/push.rs Normal file
View File

@ -0,0 +1,91 @@
use std::sync::Arc;
use anyhow::{bail, Context};
use base64ct::{Base64UrlUnpadded, Encoding};
use serde::Serialize;
use web_push_native::{
jwt_simple::algorithms::ES256KeyPair, p256::PublicKey, Auth, WebPushBuilder,
};
#[derive(Clone)]
pub struct WebPushService {
http: reqwest::Client,
vapid_key_pair: Arc<ES256KeyPair>,
sender_email: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct PushPayload {
pub title: String,
pub body: String,
pub url: Option<String>,
pub icon: Option<String>,
}
impl WebPushService {
/// Create a new WebPush service with VAPID keys.
/// - `_vapid_public_key`: Base64url-encoded P-256 public key (derived from private key)
/// - `vapid_private_key`: Base64url-encoded P-256 private key
/// - `sender_email`: Contact email for VAPID (e.g. "mailto:admin@example.com")
pub fn new(
_vapid_public_key: String,
vapid_private_key: String,
sender_email: String,
) -> anyhow::Result<Self> {
// The VAPID private key bytes are used to create the ES256KeyPair.
// The public key is derived from it, so we don't need to validate the public key separately.
let key_bytes = Base64UrlUnpadded::decode_vec(&vapid_private_key)
.context("Failed to decode VAPID private key")?;
let vapid_key_pair =
ES256KeyPair::from_bytes(&key_bytes).context("Invalid VAPID private key")?;
Ok(Self {
http: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.context("Failed to build HTTP client")?,
vapid_key_pair: Arc::new(vapid_key_pair),
sender_email,
})
}
/// Send a push notification to a browser subscription.
pub async fn send(
&self,
endpoint: &str,
p256dh: &str,
auth: &str,
payload: &PushPayload,
) -> anyhow::Result<()> {
let endpoint_uri: http::Uri = endpoint
.parse()
.with_context(|| format!("Invalid endpoint URL: {}", endpoint))?;
let ua_public_bytes = Base64UrlUnpadded::decode_vec(p256dh)
.with_context(|| format!("Failed to decode p256dh: {}", p256dh))?;
let ua_public = PublicKey::from_sec1_bytes(&ua_public_bytes)
.with_context(|| "Invalid p256dh key")?;
let auth_bytes = Base64UrlUnpadded::decode_vec(auth)
.with_context(|| format!("Failed to decode auth: {}", auth))?;
let ua_auth = Auth::clone_from_slice(&auth_bytes);
let payload_bytes = serde_json::to_vec(payload)?;
let request = WebPushBuilder::new(endpoint_uri, ua_public, ua_auth)
.with_vapid(&self.vapid_key_pair, &self.sender_email)
.build(payload_bytes)?;
let reqwest_request = reqwest::Request::try_from(request)
.context("Failed to convert web-push request")?;
let response = self.http.execute(reqwest_request).await?;
let status = response.status();
if !status.is_success() && status.as_u16() != 201 {
let body = response.text().await.unwrap_or_default();
bail!("WebPush failed: {} - {}", status, body);
}
Ok(())
}
}

60
libs/service/storage.rs Normal file
View File

@ -0,0 +1,60 @@
use config::AppConfig;
use std::path::PathBuf;
#[derive(Clone)]
pub struct AppStorage {
pub base_path: PathBuf,
pub public_url_base: String,
}
impl AppStorage {
pub fn new(config: &AppConfig) -> anyhow::Result<Self> {
let base_path = config
.env
.get("STORAGE_PATH")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("/data/files"));
let public_url_base = config
.env
.get("STORAGE_PUBLIC_URL")
.cloned()
.unwrap_or_else(|| "/files".to_string());
Ok(Self {
base_path,
public_url_base,
})
}
/// Write data to a local path and return the public URL.
pub async fn upload(
&self,
key: &str,
data: Vec<u8>,
) -> anyhow::Result<String> {
let path = self.base_path.join(key);
// Create parent directories
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&path, &data).await?;
let url = format!(
"{}/{}",
self.public_url_base.trim_end_matches('/'),
key
);
Ok(url)
}
pub async fn delete(&self, key: &str) -> anyhow::Result<()> {
let path = self.base_path.join(key);
if path.exists() {
tokio::fs::remove_file(&path).await?;
}
Ok(())
}
}

View File

@ -19,6 +19,12 @@ pub struct NotificationPreferencesParams {
pub marketing_enabled: Option<bool>,
pub security_enabled: Option<bool>,
pub product_enabled: Option<bool>,
/// Web Push subscription endpoint (set to null to unsubscribe)
pub push_subscription_endpoint: Option<String>,
/// Web Push subscription p256dh key
pub push_subscription_keys_p256dh: Option<String>,
/// Web Push subscription auth key
pub push_subscription_keys_auth: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
@ -121,6 +127,18 @@ impl AppService {
if let Some(product_enabled) = params.product_enabled {
active_prefs.product_enabled = Set(product_enabled);
}
if let Some(endpoint) = params.push_subscription_endpoint.clone() {
if endpoint.is_empty() {
// Empty string means unsubscribe — clear all subscription fields
active_prefs.push_subscription_endpoint = Set(None);
active_prefs.push_subscription_keys_p256dh = Set(None);
active_prefs.push_subscription_keys_auth = Set(None);
} else {
active_prefs.push_subscription_endpoint = Set(Some(endpoint));
active_prefs.push_subscription_keys_p256dh = Set(params.push_subscription_keys_p256dh.clone());
active_prefs.push_subscription_keys_auth = Set(params.push_subscription_keys_auth.clone());
}
}
active_prefs.updated_at = Set(Utc::now());
active_prefs.update(&self.db).await?
@ -140,6 +158,9 @@ impl AppService {
marketing_enabled: Set(params.marketing_enabled.unwrap_or(false)),
security_enabled: Set(params.security_enabled.unwrap_or(true)),
product_enabled: Set(params.product_enabled.unwrap_or(false)),
push_subscription_endpoint: Set(None),
push_subscription_keys_p256dh: Set(None),
push_subscription_keys_auth: Set(None),
created_at: Set(Utc::now()),
updated_at: Set(Utc::now()),
};
@ -189,6 +210,9 @@ impl AppService {
marketing_enabled: Set(false),
security_enabled: Set(true),
product_enabled: Set(false),
push_subscription_endpoint: Set(None),
push_subscription_keys_p256dh: Set(None),
push_subscription_keys_auth: Set(None),
created_at: Set(Utc::now()),
updated_at: Set(Utc::now()),
};
@ -197,4 +221,26 @@ impl AppService {
Ok(NotificationPreferencesResponse::from(created_prefs))
}
pub async fn user_unsubscribe_push(
&self,
context: &Session,
) -> Result<(), AppError> {
let user_uid = context.user().ok_or(AppError::Unauthorized)?;
let prefs = user_notification::Entity::find_by_id(user_uid)
.one(&self.db)
.await?;
if let Some(prefs) = prefs {
let mut active_prefs: user_notification::ActiveModel = prefs.into();
active_prefs.push_subscription_endpoint = Set(None);
active_prefs.push_subscription_keys_p256dh = Set(None);
active_prefs.push_subscription_keys_auth = Set(None);
active_prefs.updated_at = Set(Utc::now());
active_prefs.update(&self.db).await?;
}
Ok(())
}
}

View File

@ -373,6 +373,40 @@ impl AppService {
self.email.send(envelope).await.map_err(|e| {
AppError::InternalServerError(format!("Failed to send invitation email: {}", e))
})?;
// Send in-app notification + push notification to the invitee
self.send_push_to_user(
target_user.uid,
crate::push::PushPayload {
title: format!("Workspace invitation: {}", ws.name),
body: format!("{} invited you to join the workspace \"{}\"", inviter.username, ws.name),
url: Some(format!("/workspaces/{}/invitations", ws.slug)),
icon: None,
},
);
let _ = self
.room
.notification_create(room::NotificationCreateRequest {
notification_type: room::NotificationType::WorkspaceInvitation,
user_id: target_user.uid,
title: format!("{} invited you to join \"{}\"", inviter.username, ws.name),
content: None,
room_id: None,
project_id: Default::default(), // workspace invitations don't have a project_id
related_message_id: None,
related_user_id: Some(user_uid),
related_room_id: None,
metadata: Some(serde_json::json!({
"workspace_id": ws.id,
"workspace_name": ws.name,
"workspace_slug": ws.slug,
"inviter_uid": user_uid,
})),
expires_at: None,
})
.await;
Ok(())
}