diff --git a/libs/service/lib.rs b/libs/service/lib.rs index eb7c790..e82cda1 100644 --- a/libs/service/lib.rs +++ b/libs/service/lib.rs @@ -20,6 +20,11 @@ use slog::{Drain, OwnedKVList, Record}; use utoipa::ToSchema; use ws_token::WsTokenService; +pub mod storage; +pub use storage::AppStorage; +pub mod push; +pub use push::{WebPushService, PushPayload}; + #[derive(Clone)] pub struct AppService { pub db: AppDatabase, @@ -31,6 +36,48 @@ pub struct AppService { pub room: RoomService, pub ws_token: Arc, pub queue_producer: MessageProducer, + pub storage: Option, + pub push: Option, +} + +impl AppService { + /// Send a Web Push notification to a specific user. + /// Reads the user's push subscription from `user_notification` table. + /// Non-blocking: failures are logged but don't affect the caller. + pub fn send_push_to_user(&self, user_id: uuid::Uuid, payload: PushPayload) { + let push = self.push.clone(); + let db = self.db.clone(); + let log = self.logs.clone(); + + tokio::spawn(async move { + if let Some(push) = push { + use models::users::user_notification; + use sea_orm::EntityTrait; + + let prefs = user_notification::Entity::find_by_id(user_id) + .one(&db) + .await; + + if let Ok(Some(prefs)) = prefs { + if prefs.push_enabled + && prefs.push_subscription_endpoint.is_some() + && prefs.push_subscription_keys_p256dh.is_some() + && prefs.push_subscription_keys_auth.is_some() + { + let endpoint = prefs.push_subscription_endpoint.unwrap(); + let p256dh = prefs.push_subscription_keys_p256dh.unwrap(); + let auth = prefs.push_subscription_keys_auth.unwrap(); + + if let Err(e) = push.send(&endpoint, &p256dh, &auth, &payload).await { + slog::warn!(log, "WebPush send failed"; "user_id" => %user_id, "error" => %e); + } + } + } else if let Err(e) = prefs { + slog::warn!(log, "Failed to read push subscription"; "user_id" => %user_id, "error" => %e); + } + } + }); + } } impl AppService { @@ -101,6 +148,42 @@ impl AppService { let email = AppEmail::init(&config, logs.clone()).await?; let avatar = AppAvatar::init(&config).await?; + let storage = match AppStorage::new(&config) { + Ok(s) => { + slog::info!(logs, "Storage initialized at {}", s.base_path.display()); + Some(s) + } + Err(e) => { + slog::warn!(logs, "Storage not available: {}", e); + None + } + }; + + let push = match ( + config.vapid_public_key(), + config.vapid_private_key(), + ) { + (Some(public_key), Some(private_key)) => { + match WebPushService::new( + public_key, + private_key, + config.vapid_sender_email(), + ) { + Ok(s) => { + slog::info!(logs, "WebPush initialized"); + Some(s) + } + Err(e) => { + slog::warn!(logs, "WebPush not available: {}", e); + None + } + } + } + _ => { + slog::warn!(logs, "WebPush disabled — VAPID keys not configured"); + None + } + }; // Build get_redis closure for MessageProducer let get_redis: Arc< @@ -162,6 +245,47 @@ impl AppService { } }; + // Build push notification callback for RoomService + let push_fn: Option = push.clone().map(|push_svc| { + let db_clone = db.clone(); + let log_clone = logs.clone(); + Arc::new(move |user_id: uuid::Uuid, title: String, body: Option, url: Option| { + let push = push_svc.clone(); + let db = db_clone.clone(); + let log = log_clone.clone(); + let payload = PushPayload { + title, + body: body.unwrap_or_default(), + url, + icon: None, + }; + tokio::spawn(async move { + use models::users::user_notification; + use sea_orm::EntityTrait; + + let prefs = user_notification::Entity::find_by_id(user_id) + .one(&db) + .await; + + if let Ok(Some(prefs)) = prefs { + if prefs.push_enabled + && prefs.push_subscription_endpoint.is_some() + && prefs.push_subscription_keys_p256dh.is_some() + && prefs.push_subscription_keys_auth.is_some() + { + let endpoint = prefs.push_subscription_endpoint.unwrap(); + let p256dh = prefs.push_subscription_keys_p256dh.unwrap(); + let auth = prefs.push_subscription_keys_auth.unwrap(); + + if let Err(e) = push.send(&endpoint, &p256dh, &auth, &payload).await { + slog::warn!(log, "WebPush send failed"; "user_id" => %user_id, "error" => %e); + } + } + } + }); + }) as room::PushNotificationFn + }); + let room = RoomService::new( db.clone(), cache.clone(), @@ -172,6 +296,7 @@ impl AppService { Some(task_service.clone()), logs.clone(), None, + push_fn, ); // Build WsTokenService @@ -187,6 +312,8 @@ impl AppService { room, ws_token, queue_producer: message_producer, + storage, + push, }) } diff --git a/libs/service/project/invitation.rs b/libs/service/project/invitation.rs index 8d575c8..3ecbfa3 100644 --- a/libs/service/project/invitation.rs +++ b/libs/service/project/invitation.rs @@ -324,6 +324,40 @@ impl AppService { if let Err(_e) = self.queue_producer.publish_email(envelope).await { // Failed to queue invitation email } + + // Send in-app notification + push notification to the invitee + self.send_push_to_user( + target_uid, + crate::push::PushPayload { + title: format!("Project invitation: {}", project.name), + body: format!("{} invited you to join \"{}\" as {:?}", inviter.username, project.name, scope), + url: Some(format!("/projects/{}/invitations", project.name)), + icon: None, + }, + ); + + let _ = self + .room + .notification_create(room::NotificationCreateRequest { + notification_type: room::NotificationType::ProjectInvitation, + user_id: target_uid, + title: format!("{} invited you to join \"{}\"", inviter.username, project.name), + content: Some(format!("Role: {:?}", scope)), + room_id: None, + project_id: project.id, + related_message_id: None, + related_user_id: Some(inviter_uid), + related_room_id: None, + metadata: Some(serde_json::json!({ + "project_id": project.id, + "project_name": project.name, + "inviter_uid": inviter_uid, + "scope": format!("{:?}", scope), + })), + expires_at: None, + }) + .await; + Ok(()) } diff --git a/libs/service/push.rs b/libs/service/push.rs new file mode 100644 index 0000000..d6b1850 --- /dev/null +++ b/libs/service/push.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; + +use anyhow::{bail, Context}; +use base64ct::{Base64UrlUnpadded, Encoding}; +use serde::Serialize; +use web_push_native::{ + jwt_simple::algorithms::ES256KeyPair, p256::PublicKey, Auth, WebPushBuilder, +}; + +#[derive(Clone)] +pub struct WebPushService { + http: reqwest::Client, + vapid_key_pair: Arc, + sender_email: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct PushPayload { + pub title: String, + pub body: String, + pub url: Option, + pub icon: Option, +} + +impl WebPushService { + /// Create a new WebPush service with VAPID keys. + /// - `_vapid_public_key`: Base64url-encoded P-256 public key (derived from private key) + /// - `vapid_private_key`: Base64url-encoded P-256 private key + /// - `sender_email`: Contact email for VAPID (e.g. "mailto:admin@example.com") + pub fn new( + _vapid_public_key: String, + vapid_private_key: String, + sender_email: String, + ) -> anyhow::Result { + // The VAPID private key bytes are used to create the ES256KeyPair. + // The public key is derived from it, so we don't need to validate the public key separately. + let key_bytes = Base64UrlUnpadded::decode_vec(&vapid_private_key) + .context("Failed to decode VAPID private key")?; + let vapid_key_pair = + ES256KeyPair::from_bytes(&key_bytes).context("Invalid VAPID private key")?; + + Ok(Self { + http: reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .context("Failed to build HTTP client")?, + vapid_key_pair: Arc::new(vapid_key_pair), + sender_email, + }) + } + + /// Send a push notification to a browser subscription. + pub async fn send( + &self, + endpoint: &str, + p256dh: &str, + auth: &str, + payload: &PushPayload, + ) -> anyhow::Result<()> { + let endpoint_uri: http::Uri = endpoint + .parse() + .with_context(|| format!("Invalid endpoint URL: {}", endpoint))?; + + let ua_public_bytes = Base64UrlUnpadded::decode_vec(p256dh) + .with_context(|| format!("Failed to decode p256dh: {}", p256dh))?; + let ua_public = PublicKey::from_sec1_bytes(&ua_public_bytes) + .with_context(|| "Invalid p256dh key")?; + + let auth_bytes = Base64UrlUnpadded::decode_vec(auth) + .with_context(|| format!("Failed to decode auth: {}", auth))?; + let ua_auth = Auth::clone_from_slice(&auth_bytes); + + let payload_bytes = serde_json::to_vec(payload)?; + + let request = WebPushBuilder::new(endpoint_uri, ua_public, ua_auth) + .with_vapid(&self.vapid_key_pair, &self.sender_email) + .build(payload_bytes)?; + + let reqwest_request = reqwest::Request::try_from(request) + .context("Failed to convert web-push request")?; + let response = self.http.execute(reqwest_request).await?; + + let status = response.status(); + if !status.is_success() && status.as_u16() != 201 { + let body = response.text().await.unwrap_or_default(); + bail!("WebPush failed: {} - {}", status, body); + } + + Ok(()) + } +} diff --git a/libs/service/storage.rs b/libs/service/storage.rs new file mode 100644 index 0000000..214f915 --- /dev/null +++ b/libs/service/storage.rs @@ -0,0 +1,60 @@ +use config::AppConfig; +use std::path::PathBuf; + +#[derive(Clone)] +pub struct AppStorage { + pub base_path: PathBuf, + pub public_url_base: String, +} + +impl AppStorage { + pub fn new(config: &AppConfig) -> anyhow::Result { + let base_path = config + .env + .get("STORAGE_PATH") + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from("/data/files")); + + let public_url_base = config + .env + .get("STORAGE_PUBLIC_URL") + .cloned() + .unwrap_or_else(|| "/files".to_string()); + + Ok(Self { + base_path, + public_url_base, + }) + } + + /// Write data to a local path and return the public URL. + pub async fn upload( + &self, + key: &str, + data: Vec, + ) -> anyhow::Result { + let path = self.base_path.join(key); + + // Create parent directories + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + + tokio::fs::write(&path, &data).await?; + + let url = format!( + "{}/{}", + self.public_url_base.trim_end_matches('/'), + key + ); + Ok(url) + } + + pub async fn delete(&self, key: &str) -> anyhow::Result<()> { + let path = self.base_path.join(key); + if path.exists() { + tokio::fs::remove_file(&path).await?; + } + Ok(()) + } +} diff --git a/libs/service/user/notification.rs b/libs/service/user/notification.rs index 09830ca..5800fea 100644 --- a/libs/service/user/notification.rs +++ b/libs/service/user/notification.rs @@ -19,6 +19,12 @@ pub struct NotificationPreferencesParams { pub marketing_enabled: Option, pub security_enabled: Option, pub product_enabled: Option, + /// Web Push subscription endpoint (set to null to unsubscribe) + pub push_subscription_endpoint: Option, + /// Web Push subscription p256dh key + pub push_subscription_keys_p256dh: Option, + /// Web Push subscription auth key + pub push_subscription_keys_auth: Option, } #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] @@ -121,6 +127,18 @@ impl AppService { if let Some(product_enabled) = params.product_enabled { active_prefs.product_enabled = Set(product_enabled); } + if let Some(endpoint) = params.push_subscription_endpoint.clone() { + if endpoint.is_empty() { + // Empty string means unsubscribe — clear all subscription fields + active_prefs.push_subscription_endpoint = Set(None); + active_prefs.push_subscription_keys_p256dh = Set(None); + active_prefs.push_subscription_keys_auth = Set(None); + } else { + active_prefs.push_subscription_endpoint = Set(Some(endpoint)); + active_prefs.push_subscription_keys_p256dh = Set(params.push_subscription_keys_p256dh.clone()); + active_prefs.push_subscription_keys_auth = Set(params.push_subscription_keys_auth.clone()); + } + } active_prefs.updated_at = Set(Utc::now()); active_prefs.update(&self.db).await? @@ -140,6 +158,9 @@ impl AppService { marketing_enabled: Set(params.marketing_enabled.unwrap_or(false)), security_enabled: Set(params.security_enabled.unwrap_or(true)), product_enabled: Set(params.product_enabled.unwrap_or(false)), + push_subscription_endpoint: Set(None), + push_subscription_keys_p256dh: Set(None), + push_subscription_keys_auth: Set(None), created_at: Set(Utc::now()), updated_at: Set(Utc::now()), }; @@ -189,6 +210,9 @@ impl AppService { marketing_enabled: Set(false), security_enabled: Set(true), product_enabled: Set(false), + push_subscription_endpoint: Set(None), + push_subscription_keys_p256dh: Set(None), + push_subscription_keys_auth: Set(None), created_at: Set(Utc::now()), updated_at: Set(Utc::now()), }; @@ -197,4 +221,26 @@ impl AppService { Ok(NotificationPreferencesResponse::from(created_prefs)) } + + pub async fn user_unsubscribe_push( + &self, + context: &Session, + ) -> Result<(), AppError> { + let user_uid = context.user().ok_or(AppError::Unauthorized)?; + + let prefs = user_notification::Entity::find_by_id(user_uid) + .one(&self.db) + .await?; + + if let Some(prefs) = prefs { + let mut active_prefs: user_notification::ActiveModel = prefs.into(); + active_prefs.push_subscription_endpoint = Set(None); + active_prefs.push_subscription_keys_p256dh = Set(None); + active_prefs.push_subscription_keys_auth = Set(None); + active_prefs.updated_at = Set(Utc::now()); + active_prefs.update(&self.db).await?; + } + + Ok(()) + } } diff --git a/libs/service/workspace/members.rs b/libs/service/workspace/members.rs index 63b4d44..4966f23 100644 --- a/libs/service/workspace/members.rs +++ b/libs/service/workspace/members.rs @@ -373,6 +373,40 @@ impl AppService { self.email.send(envelope).await.map_err(|e| { AppError::InternalServerError(format!("Failed to send invitation email: {}", e)) })?; + + // Send in-app notification + push notification to the invitee + self.send_push_to_user( + target_user.uid, + crate::push::PushPayload { + title: format!("Workspace invitation: {}", ws.name), + body: format!("{} invited you to join the workspace \"{}\"", inviter.username, ws.name), + url: Some(format!("/workspaces/{}/invitations", ws.slug)), + icon: None, + }, + ); + + let _ = self + .room + .notification_create(room::NotificationCreateRequest { + notification_type: room::NotificationType::WorkspaceInvitation, + user_id: target_user.uid, + title: format!("{} invited you to join \"{}\"", inviter.username, ws.name), + content: None, + room_id: None, + project_id: Default::default(), // workspace invitations don't have a project_id + related_message_id: None, + related_user_id: Some(user_uid), + related_room_id: None, + metadata: Some(serde_json::json!({ + "workspace_id": ws.id, + "workspace_name": ws.name, + "workspace_slug": ws.slug, + "inviter_uid": user_uid, + })), + expires_at: None, + }) + .await; + Ok(()) }