use deadpool_redis::redis; use std::sync::Arc; use crate::ws_context::WsUserContext; use chrono::Utc; use models::rooms::room_notifications; use models::users::user as user_model; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, Set, prelude::Expr, query::*}; use uuid::Uuid; use crate::connection::extract_get_redis; use crate::error::RoomError; use crate::service::RoomService; impl RoomService { pub async fn notification_create( &self, request: super::NotificationCreateRequest, ) -> Result { let notification_type = match request.notification_type { super::NotificationType::Mention => room_notifications::NotificationType::Mention, super::NotificationType::Invitation => room_notifications::NotificationType::Invitation, super::NotificationType::RoleChange => room_notifications::NotificationType::RoleChange, super::NotificationType::RoomCreated => { room_notifications::NotificationType::RoomCreated } super::NotificationType::RoomDeleted => { room_notifications::NotificationType::RoomDeleted } super::NotificationType::SystemAnnouncement => { room_notifications::NotificationType::SystemAnnouncement } }; let model = room_notifications::ActiveModel { id: Set(Uuid::now_v7()), room: Set(request.room_id), project: Set(Some(request.project_id)), user_id: Set(Some(request.user_id)), notification_type: Set(notification_type), related_message_id: Set(request.related_message_id), related_user_id: Set(request.related_user_id), related_room_id: Set(request.related_room_id), title: Set(request.title), content: Set(request.content), metadata: Set(request.metadata), is_read: Set(false), is_archived: Set(false), created_at: Set(Utc::now()), read_at: Set(None), expires_at: Set(request.expires_at), } .insert(&self.db) .await?; let user_info = { let user = user_model::Entity::find() .filter(user_model::Column::Uid.eq(request.user_id)) .one(&self.db) .await .ok() .flatten(); user.map(|u| super::UserInfo { uid: u.uid, username: u.username, avatar_url: u.avatar_url, }) }; let response = super::NotificationResponse { id: model.id, room: model.room, project: model.project, user_id: model.user_id, user_info, notification_type: model.notification_type.to_string(), title: model.title, content: model.content, related_message_id: model.related_message_id, related_user_id: model.related_user_id, related_room_id: model.related_room_id, metadata: model.metadata.unwrap_or(serde_json::json!({})), is_read: model.is_read, is_archived: model.is_archived, created_at: model.created_at, read_at: model.read_at, expires_at: model.expires_at, }; self.push_notification_event(request.user_id, response.clone()) .await; self.incr_unread_count_cache(request.user_id).await; Ok(response) } pub async fn notification_list( &self, only_unread: Option, archived: Option, limit: Option, ctx: &WsUserContext, ) -> Result { let user_id = ctx.user_id; let show_archived = archived.unwrap_or(false); let mut query = room_notifications::Entity::find() .filter(room_notifications::Column::UserId.eq(user_id)) .filter(room_notifications::Column::IsArchived.eq(show_archived)); if only_unread.unwrap_or(false) { query = query.filter(room_notifications::Column::IsRead.eq(false)); } let unread_count = room_notifications::Entity::find() .filter(room_notifications::Column::UserId.eq(user_id)) .filter(room_notifications::Column::IsArchived.eq(false)) .filter(room_notifications::Column::IsRead.eq(false)) .count(&self.db) .await? as i64; let total = query.clone().count(&self.db).await? as i64; let models = query .order_by_desc(room_notifications::Column::CreatedAt) .limit(limit.unwrap_or(50)) .all(&self.db) .await?; let user_ids: Vec = models.iter().filter_map(|m| m.user_id).collect(); let users: std::collections::HashMap = if !user_ids.is_empty() { user_model::Entity::find() .filter(user_model::Column::Uid.is_in(user_ids)) .all(&self.db) .await? .into_iter() .map(|u| { ( u.uid, super::UserInfo { uid: u.uid, username: u.username, avatar_url: u.avatar_url, }, ) }) .collect() } else { std::collections::HashMap::new() }; let notifications: Vec = models .into_iter() .map(|m| super::NotificationResponse { id: m.id, room: m.room, project: m.project, user_id: m.user_id, user_info: m.user_id.and_then(|uid| users.get(&uid).cloned()), notification_type: m.notification_type.to_string(), title: m.title, content: m.content, related_message_id: m.related_message_id, related_user_id: m.related_user_id, related_room_id: m.related_room_id, metadata: m.metadata.unwrap_or(serde_json::json!({})), is_read: m.is_read, is_archived: m.is_archived, created_at: m.created_at, read_at: m.read_at, expires_at: m.expires_at, }) .collect(); Ok(super::NotificationListResponse { notifications, total, unread_count, }) } pub async fn notification_mark_read( &self, notification_id: Uuid, ctx: &WsUserContext, ) -> Result<(), RoomError> { let user_id = ctx.user_id; let model = room_notifications::Entity::find_by_id(notification_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Notification not found".to_string()))?; if model.user_id != Some(user_id) { return Err(RoomError::NoPower); } if !model.is_read { let mut active: room_notifications::ActiveModel = model.into(); active.is_read = Set(true); active.read_at = Set(Some(Utc::now())); active.update(&self.db).await?; self.decr_unread_count_cache(user_id).await; } Ok(()) } pub async fn notification_mark_all_read(&self, ctx: &WsUserContext) -> Result { let user_id = ctx.user_id; let result = room_notifications::Entity::update_many() .filter(room_notifications::Column::UserId.eq(user_id)) .filter(room_notifications::Column::IsArchived.eq(false)) .filter(room_notifications::Column::IsRead.eq(false)) .col_expr(room_notifications::Column::IsRead, Expr::value(true)) .col_expr( room_notifications::Column::ReadAt, Expr::value(Some(Utc::now())), ) .exec(&self.db) .await?; self.reset_unread_count_cache(user_id).await; Ok(result.rows_affected) } pub async fn notification_archive( &self, notification_id: Uuid, ctx: &WsUserContext, ) -> Result<(), RoomError> { let user_id = ctx.user_id; let model = room_notifications::Entity::find_by_id(notification_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Notification not found".to_string()))?; if model.user_id != Some(user_id) { return Err(RoomError::NoPower); } let mut active: room_notifications::ActiveModel = model.into(); active.is_archived = Set(true); active.update(&self.db).await?; Ok(()) } pub async fn notification_cleanup_expired(&self) -> Result { let result = room_notifications::Entity::delete_many() .filter(room_notifications::Column::ExpiresAt.lt(Utc::now())) .exec(&self.db) .await?; Ok(result.rows_affected) } async fn push_notification_event( &self, user_id: Uuid, notification: super::NotificationResponse, ) { let event = super::NotificationEvent::new(notification); self.room_manager .push_user_notification(user_id, Arc::new(event)) .await; } fn unread_cache_key(user_id: Uuid) -> String { format!("room:notification:unread:{}", user_id) } async fn incr_unread_count_cache(&self, user_id: Uuid) { let get_redis = extract_get_redis(self.queue.clone()); let key = Self::unread_cache_key(user_id); tokio::spawn(async move { let redis = match (get_redis)().await { Ok(r) => r, Err(_) => return, }; let mut conn = redis; let _: Result = redis::cmd("INCR").arg(&key).query_async(&mut conn).await; let _: Result<(), _> = redis::cmd("EXPIRE") .arg(&key) .arg(3600) .query_async(&mut conn) .await; }); } async fn decr_unread_count_cache(&self, user_id: Uuid) { let get_redis = extract_get_redis(self.queue.clone()); let key = Self::unread_cache_key(user_id); tokio::spawn(async move { let redis = match (get_redis)().await { Ok(r) => r, Err(_) => return, }; let mut conn = redis; let _: Result<(), _> = redis::cmd("EVAL") .arg(r#"local c = redis.call('GET', KEYS[1]); if c and tonumber(c) > 0 then return redis.call('DECR', KEYS[1]) else return 0 end"#) .arg(1) .arg(&key) .query_async(&mut conn) .await; }); } async fn reset_unread_count_cache(&self, user_id: Uuid) { let get_redis = extract_get_redis(self.queue.clone()); let key = Self::unread_cache_key(user_id); tokio::spawn(async move { let redis = match (get_redis)().await { Ok(r) => r, Err(_) => return, }; let mut conn = redis; let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await; }); } }