gitdataai/libs/room/src/member.rs
ZhenYi 57779822dc refactor(room): migrate from slog to tracing + upgrade metrics to 0.22
- Remove all use slog::* imports and log: slog::Logger fields
- Replace slog macros with tracing::{info!, warn!, error!, debug!}
- metrics.rs: upgrade metrics 0.21→0.22, remove register_*! macros,
  use functional API: metrics::gauge!(), metrics::counter!(),
  metrics::histogram!(), metrics::describe_gauge!() etc.
- RoomMetrics: all fields now use functional metrics API, dynamic
  room_id labels passed as owned String to avoid lifetime issues
- RoomService: remove pub log: slog::Logger field
- connection.rs: remove log from subscribe_room_events,
  subscribe_project_room_events, subscribe_task_events_fn
2026-04-21 22:28:52 +08:00

434 lines
15 KiB
Rust

use crate::error::RoomError;
use crate::service::RoomService;
use crate::ws_context::WsUserContext;
use chrono::Utc;
use models::projects::project_members;
use models::rooms::{RoomMemberRole, room_member};
use models::users::user as user_model;
use sea_orm::*;
use uuid::Uuid;
impl RoomService {
/// Cache TTL for member list (in seconds).
const MEMBER_LIST_CACHE_TTL: u64 = 30;
pub async fn room_member_list(
&self,
room_id: Uuid,
ctx: &WsUserContext,
) -> Result<Vec<super::RoomMemberResponse>, RoomError> {
let user_id = ctx.user_id;
self.require_room_member(room_id, user_id).await?;
// Try cache first
let cache_key = format!("room:members:{}", room_id);
if let Ok(mut conn) = self.cache.conn().await {
if let Ok(Some(cached)) = redis::cmd("GET")
.arg(&cache_key)
.query_async::<Option<String>>(&mut conn)
.await
{
if let Ok(responses) = serde_json::from_str::<Vec<super::RoomMemberResponse>>(&cached) {
tracing::debug!(cache_key = %cache_key, "room_member_list: cache hit");
return Ok(responses);
}
}
}
tracing::debug!(cache_key = %cache_key, "room_member_list: cache miss");
let members = room_member::Entity::find()
.filter(room_member::Column::Room.eq(room_id))
.all(&self.db)
.await?;
let user_ids: Vec<Uuid> = members.iter().map(|m| m.user).collect();
let users: std::collections::HashMap<Uuid, super::UserInfo> = if !user_ids.is_empty() {
use sea_orm::ColumnTrait;
user_model::Entity::find()
.filter(user_model::Column::Uid.is_in(user_ids))
.all(&self.db)
.await?
.into_iter()
.map(|u| {
(
u.uid,
super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
},
)
})
.collect()
} else {
std::collections::HashMap::new()
};
let responses = members
.into_iter()
.map(|m| super::RoomMemberResponse {
room: m.room,
user: m.user,
user_info: users.get(&m.user).cloned(),
role: m.role.to_string(),
first_msg_in: m.first_msg_in,
joined_at: m.joined_at,
last_read_seq: m.last_read_seq,
do_not_disturb: m.do_not_disturb,
dnd_start_hour: m.dnd_start_hour,
dnd_end_hour: m.dnd_end_hour,
})
.collect();
// Cache the result
if let Ok(mut conn) = self.cache.conn().await {
if let Ok(json) = serde_json::to_string(&responses) {
let _: Option<String> = redis::cmd("SETEX")
.arg(&cache_key)
.arg(Self::MEMBER_LIST_CACHE_TTL)
.arg(&json)
.query_async(&mut conn)
.await
.inspect_err(|e| {
tracing::warn!(cache_key = %cache_key, error = %e, "room_member_list: failed to cache");
})
.ok();
}
}
Ok(responses)
}
pub async fn room_member_add(
&self,
room_id: Uuid,
request: super::RoomMemberAddRequest,
ctx: &WsUserContext,
) -> Result<super::RoomMemberResponse, RoomError> {
let actor_id = ctx.user_id;
let room_model = self.find_room_or_404(room_id).await?;
self.require_room_admin(room_id, actor_id).await?;
let target_project_member = project_members::Entity::find()
.filter(project_members::Column::Project.eq(room_model.project))
.filter(project_members::Column::User.eq(request.user_id))
.one(&self.db)
.await?;
if target_project_member.is_none() {
return Err(RoomError::NoPower);
}
if let Some(existing) = self.find_room_member(room_id, request.user_id).await? {
let user_info = user_model::Entity::find()
.filter(user_model::Column::Uid.eq(request.user_id))
.one(&self.db)
.await
.ok()
.flatten()
.map(|u| super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
});
let mut response = super::RoomMemberResponse::from(existing);
response.user_info = user_info;
return Ok(response);
}
let role = if let Some(role) = request.role {
Self::parse_room_member_role(&role.to_lowercase())?
} else {
RoomMemberRole::Member
};
let created = room_member::ActiveModel {
room: Set(room_id),
user: Set(request.user_id),
role: Set(role),
first_msg_in: Set(None),
joined_at: Set(Some(Utc::now())),
last_read_seq: Set(None),
do_not_disturb: Set(false),
dnd_start_hour: Set(None),
dnd_end_hour: Set(None),
}
.insert(&self.db)
.await?;
drop(self.room_manager.subscribe(room_id, request.user_id).await);
// Invalidate member list cache
self.invalidate_member_list_cache(room_id).await;
self.publish_room_event(
room_model.project,
super::RoomEventType::MemberJoined,
Some(room_id),
None,
None,
None,
)
.await;
let _ = self
.notification_create(super::NotificationCreateRequest {
notification_type: super::NotificationType::Invitation,
user_id: request.user_id,
title: format!("你已被邀请加入房间 {}", room_model.room_name),
content: None,
room_id: Some(room_id),
project_id: room_model.project,
related_message_id: None,
related_user_id: Some(actor_id),
related_room_id: Some(room_id),
metadata: None,
expires_at: None,
})
.await;
let created_response = {
let user_info = user_model::Entity::find()
.filter(user_model::Column::Uid.eq(request.user_id))
.one(&self.db)
.await
.ok()
.flatten()
.map(|u| super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
});
let mut r = super::RoomMemberResponse::from(created);
r.user_info = user_info;
r
};
Ok(created_response)
}
pub async fn room_member_update_role(
&self,
room_id: Uuid,
request: super::RoomMemberRoleUpdateRequest,
ctx: &WsUserContext,
) -> Result<super::RoomMemberResponse, RoomError> {
let actor_id = ctx.user_id;
let actor = self.require_room_admin(room_id, actor_id).await?;
let target = self
.find_room_member(room_id, request.user_id)
.await?
.ok_or_else(|| RoomError::NotFound("Room member not found".to_string()))?;
if target.role == RoomMemberRole::Owner {
return Err(RoomError::NoPower);
}
let new_role = Self::parse_room_member_role(&request.role.to_lowercase())?;
if matches!(new_role, RoomMemberRole::Owner) {
return Err(RoomError::NoPower);
}
if actor.role != RoomMemberRole::Owner && matches!(new_role, RoomMemberRole::Admin) {
return Err(RoomError::NoPower);
}
let old_role = target.role.clone();
let new_role_cloned = new_role.clone();
let mut active: room_member::ActiveModel = target.into();
active.role = Set(new_role);
let updated = active.update(&self.db).await?;
// Invalidate member list cache
self.invalidate_member_list_cache(room_id).await;
let room = self.find_room_or_404(room_id).await?;
let _ = self
.notification_create(super::NotificationCreateRequest {
notification_type: super::NotificationType::RoleChange,
user_id: request.user_id,
title: format!(
"你在房间 {} 的角色已变更为 {}",
room.room_name, new_role_cloned
),
content: None,
room_id: Some(room_id),
project_id: room.project,
related_message_id: None,
related_user_id: Some(actor_id),
related_room_id: Some(room_id),
metadata: Some(serde_json::json!({
"old_role": old_role.to_string(),
"new_role": new_role_cloned.to_string(),
})),
expires_at: None,
})
.await;
let updated_response = {
let user_info = user_model::Entity::find()
.filter(user_model::Column::Uid.eq(request.user_id))
.one(&self.db)
.await
.ok()
.flatten()
.map(|u| super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
});
let mut r = super::RoomMemberResponse::from(updated);
r.user_info = user_info;
r
};
Ok(updated_response)
}
pub async fn room_member_remove(
&self,
room_id: Uuid,
user_id: Uuid,
ctx: &WsUserContext,
) -> Result<(), RoomError> {
let actor_id = ctx.user_id;
let actor = self.require_room_admin(room_id, actor_id).await?;
let target = self
.find_room_member(room_id, user_id)
.await?
.ok_or_else(|| RoomError::NotFound("Room member not found".to_string()))?;
if target.role == RoomMemberRole::Owner {
return Err(RoomError::NoPower);
}
if actor.role == RoomMemberRole::Admin && target.role == RoomMemberRole::Admin {
return Err(RoomError::NoPower);
}
room_member::Entity::delete_by_id((room_id, user_id))
.exec(&self.db)
.await?;
// Invalidate member list cache
self.invalidate_member_list_cache(room_id).await;
self.room_manager.unsubscribe(room_id, user_id).await;
let room = self.find_room_or_404(room_id).await?;
self.publish_room_event(
room.project,
super::RoomEventType::MemberRemoved,
Some(room_id),
None,
None,
None,
)
.await;
Ok(())
}
pub async fn room_member_set_read_seq(
&self,
room_id: Uuid,
request: super::RoomMemberReadSeqRequest,
ctx: &WsUserContext,
) -> Result<super::RoomMemberResponse, RoomError> {
let user_id = ctx.user_id;
let member = self.require_room_member_model(room_id, user_id).await?;
let mut active: room_member::ActiveModel = member.into();
active.last_read_seq = Set(Some(request.last_read_seq));
let updated = active.update(&self.db).await?;
let room = self.find_room_or_404(room_id).await?;
self.publish_room_event(
room.project,
super::RoomEventType::ReadReceipt,
Some(room_id),
None,
Some(user_id),
Some(request.last_read_seq),
)
.await;
let updated_response = {
let user_info = user_model::Entity::find()
.filter(user_model::Column::Uid.eq(user_id))
.one(&self.db)
.await
.ok()
.flatten()
.map(|u| super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
});
let mut r = super::RoomMemberResponse::from(updated);
r.user_info = user_info;
r
};
Ok(updated_response)
}
pub async fn room_member_update_dnd(
&self,
room_id: Uuid,
request: super::RoomMemberUpdateDndRequest,
ctx: &WsUserContext,
) -> Result<super::RoomMemberResponse, RoomError> {
let user_id = ctx.user_id;
let member = self.require_room_member_model(room_id, user_id).await?;
let mut active: room_member::ActiveModel = member.into();
if let Some(dnd) = request.do_not_disturb {
active.do_not_disturb = Set(dnd);
}
if let Some(start) = request.dnd_start_hour {
if !(0..=23).contains(&start) {
return Err(RoomError::BadRequest("dnd_start_hour must be 0-23".into()));
}
active.dnd_start_hour = Set(Some(start));
}
if let Some(end) = request.dnd_end_hour {
if !(0..=23).contains(&end) {
return Err(RoomError::BadRequest("dnd_end_hour must be 0-23".into()));
}
active.dnd_end_hour = Set(Some(end));
}
let updated = active.update(&self.db).await?;
let updated_response = {
let user_info = user_model::Entity::find()
.filter(user_model::Column::Uid.eq(user_id))
.one(&self.db)
.await
.ok()
.flatten()
.map(|u| super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
});
let mut r = super::RoomMemberResponse::from(updated);
r.user_info = user_info;
r
};
Ok(updated_response)
}
/// Invalidate member list cache for a room.
async fn invalidate_member_list_cache(&self, room_id: Uuid) {
let cache_key = format!("room:members:{}", room_id);
if let Ok(mut conn) = self.cache.conn().await {
if let Err(e) = redis::cmd("DEL")
.arg(&cache_key)
.query_async::<i64>(&mut conn)
.await
{
tracing::warn!(cache_key = %cache_key, error = %e, "invalidate_member_list_cache: DEL failed");
} else {
tracing::debug!(cache_key = %cache_key, "invalidate_member_list_cache: deleted");
}
}
}
}