gitdataai/libs/room/src/notification.rs
2026-04-15 09:08:09 +08:00

323 lines
11 KiB
Rust

use deadpool_redis::redis;
use std::sync::Arc;
use crate::ws_context::WsUserContext;
use chrono::Utc;
use models::rooms::room_notifications;
use models::users::user as user_model;
use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, Set, prelude::Expr, query::*};
use uuid::Uuid;
use crate::connection::extract_get_redis;
use crate::error::RoomError;
use crate::service::RoomService;
impl RoomService {
pub async fn notification_create(
&self,
request: super::NotificationCreateRequest,
) -> Result<super::NotificationResponse, RoomError> {
let notification_type = match request.notification_type {
super::NotificationType::Mention => room_notifications::NotificationType::Mention,
super::NotificationType::Invitation => room_notifications::NotificationType::Invitation,
super::NotificationType::RoleChange => room_notifications::NotificationType::RoleChange,
super::NotificationType::RoomCreated => {
room_notifications::NotificationType::RoomCreated
}
super::NotificationType::RoomDeleted => {
room_notifications::NotificationType::RoomDeleted
}
super::NotificationType::SystemAnnouncement => {
room_notifications::NotificationType::SystemAnnouncement
}
};
let model = room_notifications::ActiveModel {
id: Set(Uuid::now_v7()),
room: Set(request.room_id),
project: Set(Some(request.project_id)),
user_id: Set(Some(request.user_id)),
notification_type: Set(notification_type),
related_message_id: Set(request.related_message_id),
related_user_id: Set(request.related_user_id),
related_room_id: Set(request.related_room_id),
title: Set(request.title),
content: Set(request.content),
metadata: Set(request.metadata),
is_read: Set(false),
is_archived: Set(false),
created_at: Set(Utc::now()),
read_at: Set(None),
expires_at: Set(request.expires_at),
}
.insert(&self.db)
.await?;
let user_info = {
let user = user_model::Entity::find()
.filter(user_model::Column::Uid.eq(request.user_id))
.one(&self.db)
.await
.ok()
.flatten();
user.map(|u| super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
})
};
let response = super::NotificationResponse {
id: model.id,
room: model.room,
project: model.project,
user_id: model.user_id,
user_info,
notification_type: model.notification_type.to_string(),
title: model.title,
content: model.content,
related_message_id: model.related_message_id,
related_user_id: model.related_user_id,
related_room_id: model.related_room_id,
metadata: model.metadata.unwrap_or(serde_json::json!({})),
is_read: model.is_read,
is_archived: model.is_archived,
created_at: model.created_at,
read_at: model.read_at,
expires_at: model.expires_at,
};
self.push_notification_event(request.user_id, response.clone())
.await;
self.incr_unread_count_cache(request.user_id).await;
Ok(response)
}
pub async fn notification_list(
&self,
only_unread: Option<bool>,
archived: Option<bool>,
limit: Option<u64>,
ctx: &WsUserContext,
) -> Result<super::NotificationListResponse, RoomError> {
let user_id = ctx.user_id;
let show_archived = archived.unwrap_or(false);
let mut query = room_notifications::Entity::find()
.filter(room_notifications::Column::UserId.eq(user_id))
.filter(room_notifications::Column::IsArchived.eq(show_archived));
if only_unread.unwrap_or(false) {
query = query.filter(room_notifications::Column::IsRead.eq(false));
}
let unread_count = room_notifications::Entity::find()
.filter(room_notifications::Column::UserId.eq(user_id))
.filter(room_notifications::Column::IsArchived.eq(false))
.filter(room_notifications::Column::IsRead.eq(false))
.count(&self.db)
.await? as i64;
let total = query.clone().count(&self.db).await? as i64;
let models = query
.order_by_desc(room_notifications::Column::CreatedAt)
.limit(limit.unwrap_or(50))
.all(&self.db)
.await?;
let user_ids: Vec<Uuid> = models.iter().filter_map(|m| m.user_id).collect();
let users: std::collections::HashMap<Uuid, super::UserInfo> = if !user_ids.is_empty() {
user_model::Entity::find()
.filter(user_model::Column::Uid.is_in(user_ids))
.all(&self.db)
.await?
.into_iter()
.map(|u| {
(
u.uid,
super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
},
)
})
.collect()
} else {
std::collections::HashMap::new()
};
let notifications: Vec<super::NotificationResponse> = models
.into_iter()
.map(|m| super::NotificationResponse {
id: m.id,
room: m.room,
project: m.project,
user_id: m.user_id,
user_info: m.user_id.and_then(|uid| users.get(&uid).cloned()),
notification_type: m.notification_type.to_string(),
title: m.title,
content: m.content,
related_message_id: m.related_message_id,
related_user_id: m.related_user_id,
related_room_id: m.related_room_id,
metadata: m.metadata.unwrap_or(serde_json::json!({})),
is_read: m.is_read,
is_archived: m.is_archived,
created_at: m.created_at,
read_at: m.read_at,
expires_at: m.expires_at,
})
.collect();
Ok(super::NotificationListResponse {
notifications,
total,
unread_count,
})
}
pub async fn notification_mark_read(
&self,
notification_id: Uuid,
ctx: &WsUserContext,
) -> Result<(), RoomError> {
let user_id = ctx.user_id;
let model = room_notifications::Entity::find_by_id(notification_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Notification not found".to_string()))?;
if model.user_id != Some(user_id) {
return Err(RoomError::NoPower);
}
if !model.is_read {
let mut active: room_notifications::ActiveModel = model.into();
active.is_read = Set(true);
active.read_at = Set(Some(Utc::now()));
active.update(&self.db).await?;
self.decr_unread_count_cache(user_id).await;
}
Ok(())
}
pub async fn notification_mark_all_read(&self, ctx: &WsUserContext) -> Result<u64, RoomError> {
let user_id = ctx.user_id;
let result = room_notifications::Entity::update_many()
.filter(room_notifications::Column::UserId.eq(user_id))
.filter(room_notifications::Column::IsArchived.eq(false))
.filter(room_notifications::Column::IsRead.eq(false))
.col_expr(room_notifications::Column::IsRead, Expr::value(true))
.col_expr(
room_notifications::Column::ReadAt,
Expr::value(Some(Utc::now())),
)
.exec(&self.db)
.await?;
self.reset_unread_count_cache(user_id).await;
Ok(result.rows_affected)
}
pub async fn notification_archive(
&self,
notification_id: Uuid,
ctx: &WsUserContext,
) -> Result<(), RoomError> {
let user_id = ctx.user_id;
let model = room_notifications::Entity::find_by_id(notification_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Notification not found".to_string()))?;
if model.user_id != Some(user_id) {
return Err(RoomError::NoPower);
}
let mut active: room_notifications::ActiveModel = model.into();
active.is_archived = Set(true);
active.update(&self.db).await?;
Ok(())
}
pub async fn notification_cleanup_expired(&self) -> Result<u64, RoomError> {
let result = room_notifications::Entity::delete_many()
.filter(room_notifications::Column::ExpiresAt.lt(Utc::now()))
.exec(&self.db)
.await?;
Ok(result.rows_affected)
}
async fn push_notification_event(
&self,
user_id: Uuid,
notification: super::NotificationResponse,
) {
let event = super::NotificationEvent::new(notification);
self.room_manager
.push_user_notification(user_id, Arc::new(event))
.await;
}
fn unread_cache_key(user_id: Uuid) -> String {
format!("room:notification:unread:{}", user_id)
}
async fn incr_unread_count_cache(&self, user_id: Uuid) {
let get_redis = extract_get_redis(self.queue.clone());
let key = Self::unread_cache_key(user_id);
tokio::spawn(async move {
let redis = match (get_redis)().await {
Ok(r) => r,
Err(_) => return,
};
let mut conn = redis;
let _: Result<i64, _> = redis::cmd("INCR").arg(&key).query_async(&mut conn).await;
let _: Result<(), _> = redis::cmd("EXPIRE")
.arg(&key)
.arg(3600)
.query_async(&mut conn)
.await;
});
}
async fn decr_unread_count_cache(&self, user_id: Uuid) {
let get_redis = extract_get_redis(self.queue.clone());
let key = Self::unread_cache_key(user_id);
tokio::spawn(async move {
let redis = match (get_redis)().await {
Ok(r) => r,
Err(_) => return,
};
let mut conn = redis;
let _: Result<(), _> = redis::cmd("EVAL")
.arg(r#"local c = redis.call('GET', KEYS[1]); if c and tonumber(c) > 0 then return redis.call('DECR', KEYS[1]) else return 0 end"#)
.arg(1)
.arg(&key)
.query_async(&mut conn)
.await;
});
}
async fn reset_unread_count_cache(&self, user_id: Uuid) {
let get_redis = extract_get_redis(self.queue.clone());
let key = Self::unread_cache_key(user_id);
tokio::spawn(async move {
let redis = match (get_redis)().await {
Ok(r) => r,
Err(_) => return,
};
let mut conn = redis;
let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await;
});
}
}