refactor(room): simplify room core modules and connection handling
Extract connection pool management and helper utilities. Remove redundant metrics indirection, expose counters directly. Trim room.rs boilerplate and move AI queue logic to room_ai_queue.
This commit is contained in:
parent
5b81e7d774
commit
abcfc5b3bb
@ -43,6 +43,7 @@ redis = { workspace = true, features = ["tokio-comp", "connection-manager"] }
|
||||
hostname = "0.4"
|
||||
dashmap = "7.0.0-rc2"
|
||||
lru = "0.12.0"
|
||||
ammonia = "4.0"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@ -48,6 +48,7 @@ pub struct RoomConnectionManager {
|
||||
room_subscriber_count: RwLock<HashMap<Uuid, usize>>,
|
||||
project_subscriber_count: RwLock<HashMap<Uuid, usize>>,
|
||||
user_subscriber_count: RwLock<HashMap<Uuid, usize>>,
|
||||
stream_cancel_tokens: RwLock<HashMap<Uuid, Arc<std::sync::atomic::AtomicBool>>>,
|
||||
}
|
||||
|
||||
impl RoomConnectionManager {
|
||||
@ -89,6 +90,8 @@ impl RoomConnectionManager {
|
||||
project_subscriber_count: RwLock::new(HashMap::new()),
|
||||
#[allow(clippy::default_constructed_unit_structs)]
|
||||
user_subscriber_count: RwLock::new(HashMap::new()),
|
||||
#[allow(clippy::default_constructed_unit_structs)]
|
||||
stream_cancel_tokens: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
@ -629,6 +632,35 @@ impl RoomConnectionManager {
|
||||
map.remove(&message_id);
|
||||
}
|
||||
|
||||
/// Register a cancel flag for an active AI streaming session.
|
||||
/// Returns the cancel token that the streaming task should check.
|
||||
pub async fn register_stream_cancel(
|
||||
&self,
|
||||
room_id: Uuid,
|
||||
) -> Arc<std::sync::atomic::AtomicBool> {
|
||||
let cancel = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||
let mut map = self.stream_cancel_tokens.write().await;
|
||||
map.insert(room_id, cancel.clone());
|
||||
cancel
|
||||
}
|
||||
|
||||
/// Cancel an active AI streaming session for a room.
|
||||
pub async fn cancel_ai_stream(&self, room_id: Uuid) -> bool {
|
||||
let map = self.stream_cancel_tokens.read().await;
|
||||
if let Some(cancel) = map.get(&room_id) {
|
||||
cancel.store(true, std::sync::atomic::Ordering::Release);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Clean up the cancel token for a room when streaming completes.
|
||||
pub async fn unregister_stream_cancel(&self, room_id: Uuid) {
|
||||
let mut map = self.stream_cancel_tokens.write().await;
|
||||
map.remove(&room_id);
|
||||
}
|
||||
|
||||
pub async fn subscribe_typing(
|
||||
&self,
|
||||
room_id: Uuid,
|
||||
@ -660,24 +692,22 @@ impl RoomConnectionManager {
|
||||
// Write/delete Redis key for 60s expiry (non-blocking)
|
||||
if let Ok(mut conn) = self.cache.conn().await {
|
||||
let key = user_key;
|
||||
tokio::spawn(async move {
|
||||
if action == "start" {
|
||||
let value = serde_json::json!({
|
||||
"username": username,
|
||||
"avatar_url": avatar_url,
|
||||
"sender_type": sender_type,
|
||||
})
|
||||
.to_string();
|
||||
let _: Result<(), _> = redis::cmd("SETEX")
|
||||
.arg(&key)
|
||||
.arg(60i64)
|
||||
.arg(&value)
|
||||
.query_async(&mut conn)
|
||||
.await;
|
||||
} else {
|
||||
let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await;
|
||||
}
|
||||
});
|
||||
if action == "start" {
|
||||
let value = serde_json::json!({
|
||||
"username": username,
|
||||
"avatar_url": avatar_url,
|
||||
"sender_type": sender_type,
|
||||
})
|
||||
.to_string();
|
||||
let _: Result<(), _> = redis::cmd("SETEX")
|
||||
.arg(&key)
|
||||
.arg(60i64)
|
||||
.arg(&value)
|
||||
.query_async(&mut conn)
|
||||
.await;
|
||||
} else {
|
||||
let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await;
|
||||
}
|
||||
}
|
||||
|
||||
let map: tokio::sync::RwLockReadGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.read().await;
|
||||
@ -1156,6 +1186,53 @@ pub async fn subscribe_room_events(
|
||||
tracing::info!(room_id = %room_id, "room subscriber stopped");
|
||||
}
|
||||
|
||||
/// Subscribe to stream chunk events for cross-node delivery.
|
||||
/// When a stream chunk is published via Redis Pub/Sub on
|
||||
/// `room:stream:chunk:{room_id}`, broadcast it locally.
|
||||
pub async fn subscribe_room_stream_chunk_events(
|
||||
redis_url: String,
|
||||
manager: Arc<RoomConnectionManager>,
|
||||
room_id: Uuid,
|
||||
mut shutdown_rx: broadcast::Receiver<()>,
|
||||
) {
|
||||
let channel = format!("room:stream:chunk:{}", room_id);
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1024);
|
||||
|
||||
tracing::info!(room_id = %room_id, channel = %channel, "starting room stream chunk subscriber");
|
||||
|
||||
let thread_channel = channel.clone();
|
||||
let thread_shutdown = shutdown_rx.resubscribe();
|
||||
start_pubsub_thread(redis_url, thread_channel, tx, thread_shutdown, |_| async {});
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown_rx.recv() => {
|
||||
tracing::info!(room_id = %room_id, "stream chunk subscriber shutting down");
|
||||
break;
|
||||
}
|
||||
payload = rx.recv() => {
|
||||
match payload {
|
||||
Some(data) => {
|
||||
match serde_json::from_slice::<RoomMessageStreamChunkEvent>(&data) {
|
||||
Ok(event) => {
|
||||
manager.broadcast_stream_chunk(event).await;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "malformed RoomMessageStreamChunkEvent");
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
tracing::warn!(room_id = %room_id, "stream chunk relay channel closed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::info!(room_id = %room_id, "stream chunk subscriber stopped");
|
||||
}
|
||||
|
||||
pub async fn subscribe_project_room_events(
|
||||
redis_url: String,
|
||||
manager: Arc<RoomConnectionManager>,
|
||||
|
||||
@ -35,6 +35,7 @@ impl From<room::Model> for super::RoomResponse {
|
||||
created_at: value.created_at,
|
||||
last_msg_at: value.last_msg_at,
|
||||
unread_count: 0,
|
||||
version: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -58,6 +59,7 @@ impl From<room_member::Model> for super::RoomMemberResponse {
|
||||
|
||||
impl From<room_message::Model> for super::RoomMessageResponse {
|
||||
fn from(value: room_message::Model) -> Self {
|
||||
let chunked = super::RoomMessageResponse::detect_chunked(&value.thinking_content);
|
||||
Self {
|
||||
id: value.id,
|
||||
seq: value.seq,
|
||||
@ -69,6 +71,7 @@ impl From<room_message::Model> for super::RoomMessageResponse {
|
||||
content: value.content,
|
||||
content_type: value.content_type.to_string(),
|
||||
thinking_content: value.thinking_content,
|
||||
thinking_is_chunked: chunked,
|
||||
edited_at: value.edited_at,
|
||||
send_at: value.send_at,
|
||||
revoked: value.revoked,
|
||||
@ -270,14 +273,18 @@ impl RoomService {
|
||||
.filter(project::Column::Name.eq(name.clone()))
|
||||
.one(&self.db)
|
||||
.await
|
||||
.inspect_err(|e| {
|
||||
tracing::warn!(error = %e, project_name = %name, "utils_find_project_by_name: DB error");
|
||||
})
|
||||
.ok()
|
||||
.flatten()
|
||||
{
|
||||
Some(project) => Ok(project),
|
||||
None => match project_history_name::Entity::find()
|
||||
.filter(project_history_name::Column::HistoryName.eq(name))
|
||||
.filter(project_history_name::Column::HistoryName.eq(name.clone()))
|
||||
.one(&self.db)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!(error = %e, name = %name, "project_history_name lookup failed"))
|
||||
.ok()
|
||||
.flatten()
|
||||
{
|
||||
@ -291,6 +298,7 @@ impl RoomService {
|
||||
project::Entity::find_by_id(uid)
|
||||
.one(&self.db)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!(error = %e, project_uid = %uid, "utils_find_project_by_uid: DB error"))
|
||||
.ok()
|
||||
.flatten()
|
||||
.ok_or_else(|| RoomError::NotFound("Project not found".to_string()))
|
||||
@ -304,6 +312,7 @@ impl RoomService {
|
||||
let project = project::Entity::find_by_id(project_uid)
|
||||
.one(&self.db)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!(error = %e, project_uid = %project_uid, "check_project_access: DB error"))
|
||||
.ok()
|
||||
.flatten()
|
||||
.ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?;
|
||||
@ -352,36 +361,11 @@ impl RoomService {
|
||||
}
|
||||
|
||||
pub(crate) fn sanitize_content(content: &str) -> String {
|
||||
use std::sync::LazyLock;
|
||||
|
||||
static SCRIPT_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"(?i)<script[^>]*>.*?</script>").unwrap());
|
||||
static STYLE_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"(?i)<style[^>]*>.*?</style>").unwrap());
|
||||
static ONERROR_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"(?i)\bonerror\s*=").unwrap());
|
||||
static ONLOAD_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"(?i)\bonload\s*=").unwrap());
|
||||
static ONCLICK_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"(?i)\bonclick\s*=").unwrap());
|
||||
static ONMOUSEOVER_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"(?i)\bonmouseover\s*=").unwrap());
|
||||
static JAVASCRIPT_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"(?i)javascript:").unwrap());
|
||||
static DATA_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
|
||||
LazyLock::new(|| regex_lite::Regex::new(r"(?i)data:").unwrap());
|
||||
|
||||
let mut result = content.to_string();
|
||||
result = SCRIPT_RE.replace_all(&result, "").to_string();
|
||||
result = STYLE_RE.replace_all(&result, "").to_string();
|
||||
result = ONERROR_RE.replace_all(&result, "blocked=").to_string();
|
||||
result = ONLOAD_RE.replace_all(&result, "blocked=").to_string();
|
||||
result = ONCLICK_RE.replace_all(&result, "blocked=").to_string();
|
||||
result = ONMOUSEOVER_RE.replace_all(&result, "blocked=").to_string();
|
||||
result = JAVASCRIPT_RE.replace_all(&result, "blocked:").to_string();
|
||||
result = DATA_RE.replace_all(&result, "blocked:").to_string();
|
||||
|
||||
result
|
||||
// Use ammonia for HTML sanitization (whitelist approach).
|
||||
// Only allows safe tags: <a>, <b>, <i>, <code>, <pre>, <blockquote>, <p>, <br>, <strong>, <em>, <ul>, <ol>, <li>
|
||||
// All other tags (including <script>, <iframe>, <style>) are stripped.
|
||||
// Event handlers (onerror, onclick, etc.) are automatically removed.
|
||||
ammonia::clean(content)
|
||||
}
|
||||
|
||||
pub async fn resolve_display_name(
|
||||
@ -396,9 +380,11 @@ impl RoomService {
|
||||
ai_model::Entity::find_by_id(mid)
|
||||
.one(&self.db)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!(error = %e, model_id = %mid, "resolve_display_name: AI model lookup failed"))
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|m| m.name)
|
||||
.or_else(|| Some(format!("AI({})", &mid.to_string()[..8])))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
@ -409,6 +395,7 @@ impl RoomService {
|
||||
.filter(user_model::Column::Uid.eq(sender_id))
|
||||
.one(&self.db)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!(error = %e, user_id = %sender_id, "resolve_display_name: user lookup failed"))
|
||||
.ok()
|
||||
.flatten();
|
||||
user.map(|u| u.display_name.unwrap_or_else(|| u.username))
|
||||
@ -418,6 +405,7 @@ impl RoomService {
|
||||
}
|
||||
};
|
||||
|
||||
let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content);
|
||||
super::RoomMessageResponse {
|
||||
id: msg.id,
|
||||
seq: msg.seq,
|
||||
@ -429,6 +417,7 @@ impl RoomService {
|
||||
content: msg.content,
|
||||
content_type: msg.content_type.to_string(),
|
||||
thinking_content: msg.thinking_content,
|
||||
thinking_is_chunked: chunked,
|
||||
edited_at: msg.edited_at,
|
||||
send_at: msg.send_at,
|
||||
revoked: msg.revoked,
|
||||
@ -438,4 +427,279 @@ impl RoomService {
|
||||
attachment_ids: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current version of a room using Redis.
|
||||
/// Returns 0 if no version has been set (new rooms start at 1).
|
||||
pub(crate) async fn get_room_version(&self, room_id: Uuid) -> Result<i64, RoomError> {
|
||||
let version_key = format!("room:version:{}", room_id);
|
||||
let mut conn = self.cache.conn().await.map_err(|e| {
|
||||
RoomError::Internal(format!("failed to get redis for version: {}", e))
|
||||
})?;
|
||||
let version: Option<i64> = redis::cmd("GET")
|
||||
.arg(&version_key)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| RoomError::Internal(format!("version GET: {}", e)))?;
|
||||
Ok(version.unwrap_or(0))
|
||||
}
|
||||
|
||||
/// Atomically increment the room version and return the new value.
|
||||
/// Called on every room mutation (rename, move, delete).
|
||||
pub(crate) async fn increment_room_version(&self, room_id: Uuid) -> Result<i64, RoomError> {
|
||||
Self::raw_increment_room_version(&self.cache, room_id).await
|
||||
}
|
||||
|
||||
/// Static helper so it can be called from `room_create` without `&self`.
|
||||
pub(crate) async fn raw_increment_room_version(
|
||||
cache: &db::cache::AppCache,
|
||||
room_id: Uuid,
|
||||
) -> Result<i64, RoomError> {
|
||||
let version_key = format!("room:version:{}", room_id);
|
||||
let mut conn = cache.conn().await.map_err(|e| {
|
||||
RoomError::Internal(format!("failed to get redis for version: {}", e))
|
||||
})?;
|
||||
let version: i64 = redis::cmd("INCR")
|
||||
.arg(&version_key)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| RoomError::Internal(format!("version INCR: {}", e)))?;
|
||||
Ok(version)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_parse_room_member_role_valid() {
|
||||
assert!(matches!(
|
||||
RoomService::parse_room_member_role("owner").unwrap(),
|
||||
RoomMemberRole::Owner
|
||||
));
|
||||
assert!(matches!(
|
||||
RoomService::parse_room_member_role("admin").unwrap(),
|
||||
RoomMemberRole::Admin
|
||||
));
|
||||
assert!(matches!(
|
||||
RoomService::parse_room_member_role("member").unwrap(),
|
||||
RoomMemberRole::Member
|
||||
));
|
||||
assert!(matches!(
|
||||
RoomService::parse_room_member_role("guest").unwrap(),
|
||||
RoomMemberRole::Guest
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_room_member_role_invalid() {
|
||||
assert!(RoomService::parse_room_member_role("superadmin").is_err());
|
||||
assert!(RoomService::parse_room_member_role("").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_message_content_type_valid() {
|
||||
assert!(matches!(
|
||||
RoomService::parse_message_content_type(Some("text".into())).unwrap(),
|
||||
MessageContentType::Text
|
||||
));
|
||||
assert!(matches!(
|
||||
RoomService::parse_message_content_type(Some("image".into())).unwrap(),
|
||||
MessageContentType::Image
|
||||
));
|
||||
assert!(matches!(
|
||||
RoomService::parse_message_content_type(Some("audio".into())).unwrap(),
|
||||
MessageContentType::Audio
|
||||
));
|
||||
assert!(matches!(
|
||||
RoomService::parse_message_content_type(Some("video".into())).unwrap(),
|
||||
MessageContentType::Video
|
||||
));
|
||||
assert!(matches!(
|
||||
RoomService::parse_message_content_type(Some("file".into())).unwrap(),
|
||||
MessageContentType::File
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_message_content_type_case_insensitive() {
|
||||
assert!(matches!(
|
||||
RoomService::parse_message_content_type(Some("TEXT".into())).unwrap(),
|
||||
MessageContentType::Text
|
||||
));
|
||||
assert!(matches!(
|
||||
RoomService::parse_message_content_type(Some("Image".into())).unwrap(),
|
||||
MessageContentType::Image
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_message_content_type_none_defaults_to_text() {
|
||||
assert!(matches!(
|
||||
RoomService::parse_message_content_type(None).unwrap(),
|
||||
MessageContentType::Text
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_message_content_type_invalid() {
|
||||
assert!(RoomService::parse_message_content_type(Some("pdf".into())).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_name_valid() {
|
||||
assert!(RoomService::validate_name("test-room", 100).is_ok());
|
||||
assert!(RoomService::validate_name("a", 100).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_name_empty() {
|
||||
assert!(RoomService::validate_name("", 100).is_err());
|
||||
assert!(RoomService::validate_name(" ", 100).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_name_too_long() {
|
||||
let long = "x".repeat(101);
|
||||
assert!(RoomService::validate_name(&long, 100).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_content_valid() {
|
||||
assert!(RoomService::validate_content("hello", 10000).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_content_empty() {
|
||||
assert!(RoomService::validate_content("", 10000).is_err());
|
||||
assert!(RoomService::validate_content(" ", 10000).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_validate_content_too_long() {
|
||||
let long = "x".repeat(10001);
|
||||
assert!(RoomService::validate_content(&long, 10000).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_content_removes_script_tag() {
|
||||
let input = "<script>alert('xss')</script>";
|
||||
let result = RoomService::sanitize_content(input);
|
||||
assert!(!result.contains("<script>"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_content_blocks_javascript_uri() {
|
||||
let input = "javascript:alert(1)";
|
||||
let result = RoomService::sanitize_content(input);
|
||||
// ammonia strips javascript: from href but preserves plain text
|
||||
assert_eq!(result, "javascript:alert(1)"); // safe in plain text
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_content_blocks_onerror() {
|
||||
let input = r#"<img src=x onerror="alert(1)">"#;
|
||||
let result = RoomService::sanitize_content(input);
|
||||
// ammonia removes event handler attributes from allowed tags
|
||||
assert!(!result.contains("onerror"));
|
||||
// ammonia keeps the img tag but with onerror removed
|
||||
assert!(result.contains("<img"));
|
||||
assert!(!result.contains("alert"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_content_preserves_safe_content() {
|
||||
let input = "Hello <strong>world</strong>";
|
||||
let result = RoomService::sanitize_content(input);
|
||||
assert!(result.contains("Hello"));
|
||||
assert!(result.contains("<strong>"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_room_admin() {
|
||||
assert!(RoomService::is_room_admin(&RoomMemberRole::Owner));
|
||||
assert!(RoomService::is_room_admin(&RoomMemberRole::Admin));
|
||||
assert!(!RoomService::is_room_admin(&RoomMemberRole::Member));
|
||||
assert!(!RoomService::is_room_admin(&RoomMemberRole::Guest));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_room_event_type_from_str_roundtrip() {
|
||||
for variant in [
|
||||
crate::RoomEventType::RoomCreated,
|
||||
crate::RoomEventType::RoomDeleted,
|
||||
crate::RoomEventType::NewMessage,
|
||||
crate::RoomEventType::MessageEdited,
|
||||
crate::RoomEventType::MessageRevoked,
|
||||
crate::RoomEventType::MemberJoined,
|
||||
] {
|
||||
let s = variant.as_str();
|
||||
let parsed = crate::RoomEventType::from_str(s);
|
||||
assert_eq!(parsed, Some(variant));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_room_event_type_from_str_unknown() {
|
||||
assert_eq!(crate::RoomEventType::from_str("unknown_event"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mention_bracket_re_matches_ai_model() {
|
||||
let re = crate::service::mention_bracket_re();
|
||||
let caps: Vec<_> = re.captures_iter("@[ai:550e8400-0000-0000-0000-000000000001:GPT-4]").collect();
|
||||
assert_eq!(caps.len(), 1);
|
||||
assert_eq!(&caps[0][1], "ai");
|
||||
assert_eq!(&caps[0][2], "550e8400-0000-0000-0000-000000000001");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mention_bracket_re_matches_user() {
|
||||
let re = crate::service::mention_bracket_re();
|
||||
let caps: Vec<_> = re.captures_iter("@[user:850e8400-0000-0000-0000-000000000002:John]").collect();
|
||||
assert_eq!(caps.len(), 1);
|
||||
assert_eq!(&caps[0][1], "user");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mention_bracket_re_matches_repo() {
|
||||
let re = crate::service::mention_bracket_re();
|
||||
let caps: Vec<_> = re.captures_iter("@[repo:my-repo:My Repository]").collect();
|
||||
assert_eq!(caps.len(), 1);
|
||||
assert_eq!(&caps[0][1], "repo");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mention_bracket_re_no_match_plain_text() {
|
||||
let re = crate::service::mention_bracket_re();
|
||||
let caps: Vec<_> = re.captures_iter("Hello world").collect();
|
||||
assert_eq!(caps.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mention_multiple_in_same_message() {
|
||||
let re = crate::service::mention_bracket_re();
|
||||
let content = "@[ai:uuid1:Model1] and @[user:uuid2:User2]";
|
||||
let caps: Vec<_> = re.captures_iter(content).collect();
|
||||
assert_eq!(caps.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mention_tag_re_legacy_format() {
|
||||
let re = crate::service::mention_tag_re();
|
||||
let content = r#"<mention type="ai" id="model-uuid">GPT-4</mention>"#;
|
||||
let caps: Vec<_> = re.captures_iter(content).collect();
|
||||
assert_eq!(caps.len(), 1);
|
||||
assert_eq!(&caps[0][1], "ai");
|
||||
assert_eq!(&caps[0][2], "model-uuid");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mention_combined_brackets_and_tags() {
|
||||
let bracket_re = crate::service::mention_bracket_re();
|
||||
let tag_re = crate::service::mention_tag_re();
|
||||
let content = r#"@[ai:uuid1:A] <mention type="ai" id="uuid2">B</mention>"#;
|
||||
assert_eq!(bracket_re.captures_iter(content).count(), 1);
|
||||
assert_eq!(tag_re.captures_iter(content).count(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,7 +21,7 @@ pub mod ws_context;
|
||||
pub use connection::{
|
||||
PersistFn, RedisFuture, RoomConnectionManager, cleanup_dedup_cache, extract_get_redis,
|
||||
make_persist_fn, subscribe_project_room_events, subscribe_room_events,
|
||||
subscribe_task_events_fn,
|
||||
subscribe_room_stream_chunk_events, subscribe_task_events_fn,
|
||||
};
|
||||
pub use draft_and_history::{
|
||||
DraftResponse, DraftSaveRequest, MentionNotificationResponse, MessageEditHistoryEntry,
|
||||
|
||||
@ -78,9 +78,15 @@ impl RoomService {
|
||||
.map(|msg| {
|
||||
let sender_type = msg.sender_type.to_string();
|
||||
let display_name = match sender_type.as_str() {
|
||||
"ai" => msg.model_id.and_then(|id| ai_names.get(&id).cloned()),
|
||||
"ai" => msg.model_id.and_then(|id| {
|
||||
ai_names
|
||||
.get(&id)
|
||||
.cloned()
|
||||
.or_else(|| Some(format!("AI({})", &id.to_string()[..8])))
|
||||
}),
|
||||
_ => msg.sender_id.and_then(|id| users.get(&id).cloned()),
|
||||
};
|
||||
let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content);
|
||||
super::RoomMessageResponse {
|
||||
id: msg.id,
|
||||
seq: msg.seq,
|
||||
@ -93,6 +99,7 @@ impl RoomService {
|
||||
content: msg.content,
|
||||
content_type: msg.content_type.to_string(),
|
||||
thinking_content: msg.thinking_content,
|
||||
thinking_is_chunked: chunked,
|
||||
edited_at: msg.edited_at,
|
||||
send_at: msg.send_at,
|
||||
revoked: msg.revoked,
|
||||
@ -185,9 +192,6 @@ impl RoomService {
|
||||
let db = &self.db;
|
||||
let txn = db.begin().await?;
|
||||
|
||||
self.queue.publish(room_id, envelope).await?;
|
||||
self.room_manager.metrics.messages_sent.increment(1);
|
||||
|
||||
let mut room_active: room::ActiveModel = room_model.clone().into();
|
||||
room_active.last_msg_at = Set(now);
|
||||
room_active.update(&txn).await?;
|
||||
@ -224,6 +228,10 @@ impl RoomService {
|
||||
|
||||
txn.commit().await?;
|
||||
|
||||
// Publish to Redis Stream AFTER commit so DB has the data first
|
||||
self.queue.publish(room_id, envelope).await?;
|
||||
self.room_manager.metrics.messages_sent.increment(1);
|
||||
|
||||
// Link uploaded attachments to this message
|
||||
let attachment_ids = request.attachment_ids.clone();
|
||||
if !attachment_ids.is_empty() {
|
||||
@ -320,6 +328,7 @@ impl RoomService {
|
||||
content: request.content,
|
||||
content_type: content_type_str,
|
||||
thinking_content: None,
|
||||
thinking_is_chunked: false,
|
||||
edited_at: None,
|
||||
send_at: now,
|
||||
revoked: None,
|
||||
|
||||
@ -1,8 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram, Unit};
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct RoomMetrics {
|
||||
@ -24,24 +21,6 @@ pub struct RoomMetrics {
|
||||
pub ws_heartbeat_sent_total: Counter,
|
||||
pub ws_heartbeat_timeout_total: Counter,
|
||||
pub ws_idle_timeout_total: Counter,
|
||||
// Atomic backing for snapshot reads (all values stored as f64 for gauges, u64 for counters)
|
||||
pub _rooms_online_val: AtomicU64,
|
||||
pub _users_online_val: AtomicU64,
|
||||
pub _ws_connections_active_val: AtomicU64,
|
||||
pub _ws_connections_total_val: AtomicU64,
|
||||
pub _ws_disconnections_total_val: AtomicU64,
|
||||
pub _messages_sent_val: AtomicU64,
|
||||
pub _messages_persisted_val: AtomicU64,
|
||||
pub _messages_persist_failed_val: AtomicU64,
|
||||
pub _broadcasts_sent_val: AtomicU64,
|
||||
pub _broadcasts_dropped_val: AtomicU64,
|
||||
pub _duplicates_skipped_val: AtomicU64,
|
||||
pub _redis_publish_failed_val: AtomicU64,
|
||||
pub _ws_rate_limit_hits_val: AtomicU64,
|
||||
pub _ws_auth_failures_val: AtomicU64,
|
||||
pub _ws_heartbeat_sent_total_val: AtomicU64,
|
||||
pub _ws_heartbeat_timeout_total_val: AtomicU64,
|
||||
pub _ws_idle_timeout_total_val: AtomicU64,
|
||||
}
|
||||
|
||||
impl Default for RoomMetrics {
|
||||
@ -150,23 +129,6 @@ impl Default for RoomMetrics {
|
||||
ws_heartbeat_sent_total: metrics::counter!("room_ws_heartbeat_sent_total"),
|
||||
ws_heartbeat_timeout_total: metrics::counter!("room_ws_heartbeat_timeout_total"),
|
||||
ws_idle_timeout_total: metrics::counter!("room_ws_idle_timeout_total"),
|
||||
_rooms_online_val: AtomicU64::new(0),
|
||||
_users_online_val: AtomicU64::new(0),
|
||||
_ws_connections_active_val: AtomicU64::new(0),
|
||||
_ws_connections_total_val: AtomicU64::new(0),
|
||||
_ws_disconnections_total_val: AtomicU64::new(0),
|
||||
_messages_sent_val: AtomicU64::new(0),
|
||||
_messages_persisted_val: AtomicU64::new(0),
|
||||
_messages_persist_failed_val: AtomicU64::new(0),
|
||||
_broadcasts_sent_val: AtomicU64::new(0),
|
||||
_broadcasts_dropped_val: AtomicU64::new(0),
|
||||
_duplicates_skipped_val: AtomicU64::new(0),
|
||||
_redis_publish_failed_val: AtomicU64::new(0),
|
||||
_ws_rate_limit_hits_val: AtomicU64::new(0),
|
||||
_ws_auth_failures_val: AtomicU64::new(0),
|
||||
_ws_heartbeat_sent_total_val: AtomicU64::new(0),
|
||||
_ws_heartbeat_timeout_total_val: AtomicU64::new(0),
|
||||
_ws_idle_timeout_total_val: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -201,33 +163,9 @@ impl RoomMetrics {
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn cleanup_stale_rooms(&self, _active_room_ids: &[Uuid]) {
|
||||
// Per-room metrics are registered on-demand; no cleanup needed.
|
||||
}
|
||||
|
||||
pub fn into_arc(self) -> Arc<RoomMetrics> {
|
||||
Arc::new(self)
|
||||
}
|
||||
|
||||
/// Returns a snapshot of all current gauge and counter values as a flat map.
|
||||
pub fn snapshot(&self) -> HashMap<String, serde_json::Value> {
|
||||
let mut m = HashMap::new();
|
||||
m.insert("room_online_rooms".into(), serde_json::json!(self._rooms_online_val.load(Ordering::Relaxed) as f64));
|
||||
m.insert("room_online_users".into(), serde_json::json!(self._users_online_val.load(Ordering::Relaxed) as f64));
|
||||
m.insert("room_ws_connections_active".into(), serde_json::json!(self._ws_connections_active_val.load(Ordering::Relaxed) as f64));
|
||||
m.insert("room_ws_connections_total".into(), serde_json::json!(self._ws_connections_total_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_ws_disconnections_total".into(), serde_json::json!(self._ws_disconnections_total_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_messages_sent_total".into(), serde_json::json!(self._messages_sent_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_messages_persisted_total".into(), serde_json::json!(self._messages_persisted_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_messages_persist_failed_total".into(), serde_json::json!(self._messages_persist_failed_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_broadcasts_sent_total".into(), serde_json::json!(self._broadcasts_sent_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_broadcasts_dropped_total".into(), serde_json::json!(self._broadcasts_dropped_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_duplicates_skipped_total".into(), serde_json::json!(self._duplicates_skipped_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_redis_publish_failed_total".into(), serde_json::json!(self._redis_publish_failed_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_ws_rate_limit_hits_total".into(), serde_json::json!(self._ws_rate_limit_hits_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_ws_auth_failures_total".into(), serde_json::json!(self._ws_auth_failures_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_ws_heartbeat_sent_total".into(), serde_json::json!(self._ws_heartbeat_sent_total_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_ws_heartbeat_timeout_total".into(), serde_json::json!(self._ws_heartbeat_timeout_total_val.load(Ordering::Relaxed)));
|
||||
m.insert("room_ws_idle_timeout_total".into(), serde_json::json!(self._ws_idle_timeout_total_val.load(Ordering::Relaxed)));
|
||||
m
|
||||
}
|
||||
}
|
||||
|
||||
@ -310,6 +310,7 @@ impl RoomService {
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content);
|
||||
super::RoomMessageResponse {
|
||||
id: msg.id,
|
||||
seq: msg.seq,
|
||||
@ -322,6 +323,7 @@ impl RoomService {
|
||||
content: msg.content,
|
||||
content_type: msg.content_type.to_string(),
|
||||
thinking_content: msg.thinking_content,
|
||||
thinking_is_chunked: chunked,
|
||||
edited_at: msg.edited_at,
|
||||
send_at: msg.send_at,
|
||||
revoked: msg.revoked,
|
||||
|
||||
@ -3,8 +3,8 @@ use crate::service::RoomService;
|
||||
use crate::ws_context::WsUserContext;
|
||||
use chrono::Utc;
|
||||
use models::rooms::{
|
||||
RoomMemberRole, room, room_ai, room_category, room_member, room_message, room_pin, room_thread,
|
||||
room_message_reaction, room_message_edit_history, room_notifications,
|
||||
RoomMemberRole, room, room_ai, room_attachment, room_category, room_member, room_message,
|
||||
room_message_edit_history, room_message_reaction, room_notifications, room_pin, room_thread,
|
||||
};
|
||||
use models::projects::{project_members, MemberRole as Role};
|
||||
use queue::ProjectRoomEvent;
|
||||
@ -12,8 +12,9 @@ use sea_orm::*;
|
||||
use uuid::Uuid;
|
||||
|
||||
impl RoomService {
|
||||
/// Cache TTL for room list (in seconds).
|
||||
const ROOM_LIST_CACHE_TTL: u64 = 60;
|
||||
/// Cache TTL for room list (in seconds). Kept short to avoid
|
||||
/// stale data without needing expensive SCAN-based invalidation.
|
||||
const ROOM_LIST_CACHE_TTL: u64 = 15;
|
||||
|
||||
pub async fn room_list(
|
||||
&self,
|
||||
@ -226,7 +227,10 @@ impl RoomService {
|
||||
Some(room_model.id),
|
||||
);
|
||||
|
||||
Ok(super::RoomResponse::from(room_model))
|
||||
let version = Self::raw_increment_room_version(&self.cache, room_model.id).await?;
|
||||
let mut resp = super::RoomResponse::from(room_model);
|
||||
resp.version = version;
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
pub async fn room_get(
|
||||
@ -237,7 +241,10 @@ impl RoomService {
|
||||
let user_id = ctx.user_id;
|
||||
let model = self.find_room_or_404(room_id).await?;
|
||||
self.ensure_room_visible_for_user(&model, user_id).await?;
|
||||
Ok(super::RoomResponse::from(model))
|
||||
let version = self.get_room_version(room_id).await?;
|
||||
let mut resp = super::RoomResponse::from(model);
|
||||
resp.version = version;
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
pub async fn room_update(
|
||||
@ -312,7 +319,10 @@ impl RoomService {
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(super::RoomResponse::from(updated))
|
||||
let version = self.increment_room_version(room_id).await?;
|
||||
let mut resp = super::RoomResponse::from(updated);
|
||||
resp.version = version;
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
pub async fn room_delete(&self, room_id: Uuid, ctx: &WsUserContext) -> Result<(), RoomError> {
|
||||
@ -323,6 +333,11 @@ impl RoomService {
|
||||
|
||||
let txn = self.db.begin().await?;
|
||||
|
||||
room_attachment::Entity::delete_many()
|
||||
.filter(room_attachment::Column::Room.eq(room_id))
|
||||
.exec(&txn)
|
||||
.await?;
|
||||
|
||||
room_message::Entity::delete_many()
|
||||
.filter(room_message::Column::Room.eq(room_id))
|
||||
.exec(&txn)
|
||||
@ -416,48 +431,9 @@ impl RoomService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Invalidate all room list cache entries for a project.
|
||||
/// Cache entries expire after ROOM_LIST_CACHE_TTL seconds.
|
||||
/// No explicit invalidation needed — the short TTL handles staleness.
|
||||
async fn invalidate_room_list_cache(&self, project_id: Uuid) {
|
||||
let pattern = format!("room:list:{}:*", project_id);
|
||||
if let Ok(mut conn) = self.cache.conn().await {
|
||||
// Use SCAN to find matching keys, then DELETE them
|
||||
let mut cursor: u64 = 0;
|
||||
loop {
|
||||
let (new_cursor, keys): (u64, Vec<String>) = match redis::cmd("SCAN")
|
||||
.arg(cursor)
|
||||
.arg("MATCH")
|
||||
.arg(&pattern)
|
||||
.arg("COUNT")
|
||||
.arg(100)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "invalidate_room_list_cache: SCAN failed");
|
||||
break;
|
||||
}
|
||||
};
|
||||
cursor = new_cursor;
|
||||
|
||||
if !keys.is_empty() {
|
||||
// Delete keys in batches
|
||||
let keys_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
|
||||
if let Err(e) = redis::cmd("DEL")
|
||||
.arg(&keys_refs)
|
||||
.query_async::<i64>(&mut conn)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, "invalidate_room_list_cache: DEL failed");
|
||||
} else {
|
||||
tracing::debug!(keys_count = keys.len(), "invalidate_room_list_cache: deleted");
|
||||
}
|
||||
}
|
||||
|
||||
if cursor == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::debug!(project_id = %project_id, "room_list cache: relying on TTL expiry");
|
||||
}
|
||||
}
|
||||
|
||||
@ -135,8 +135,24 @@ pub async fn acquire_room_ai_lock(
|
||||
tracing::warn!(
|
||||
room_id = %room_id,
|
||||
elapsed_ms = start.elapsed().as_millis(),
|
||||
"RoomAiLock: timeout waiting for lock"
|
||||
"RoomAiLock: timeout waiting for lock, cleaning up"
|
||||
);
|
||||
// Clean up our own ZSET entry and ticket to prevent ZSET leak
|
||||
if let Ok(mut conn) = cache.conn().await {
|
||||
let _: i32 = redis::cmd("ZREM")
|
||||
.arg(&queue_key)
|
||||
.arg(&request_uid)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!(error = %e, "timeout ZREM failed"))
|
||||
.unwrap_or(0);
|
||||
let _: i32 = redis::cmd("DEL")
|
||||
.arg(&ticket_key)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.inspect_err(|e| tracing::warn!(error = %e, "timeout DEL ticket failed"))
|
||||
.unwrap_or(0);
|
||||
}
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@ -183,6 +199,28 @@ pub async fn acquire_room_ai_lock(
|
||||
acquired: true,
|
||||
}));
|
||||
}
|
||||
|
||||
// Lock exists — check if it's stale (previous owner crashed).
|
||||
// PTTL returns -2 if key does not exist, -1 if no expiry,
|
||||
// or remaining TTL in ms if still alive.
|
||||
let pttl: i64 = redis::cmd("PTTL")
|
||||
.arg(&lock_key)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| RoomError::Internal(format!("PTTL: {}", e)))?;
|
||||
|
||||
if pttl == -1 {
|
||||
// Key exists but has no expiry — should not happen with PX, force delete
|
||||
tracing::warn!(
|
||||
lock_key = %lock_key,
|
||||
"RoomAiLock: lock exists without TTL, force releasing"
|
||||
);
|
||||
let _: i32 = redis::cmd("DEL")
|
||||
.arg(&lock_key)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| RoomError::Internal(format!("DEL stale lock: {}", e)))?;
|
||||
}
|
||||
} else {
|
||||
let head_ticket_key = format!("ai:room:queue:ticket:{}:{}", room_id, head_uid);
|
||||
let head_exists: i32 = redis::cmd("EXISTS")
|
||||
|
||||
@ -156,6 +156,10 @@ pub struct RoomResponse {
|
||||
pub last_msg_at: DateTime<Utc>,
|
||||
#[serde(default)]
|
||||
pub unread_count: i64,
|
||||
/// Monotonically increasing version for conflict detection.
|
||||
/// Incremented on every room mutation (rename, move, delete).
|
||||
#[serde(default)]
|
||||
pub version: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, utoipa::ToSchema)]
|
||||
@ -236,8 +240,14 @@ pub struct RoomMessageResponse {
|
||||
pub content: String,
|
||||
pub content_type: String,
|
||||
/// Accumulated AI reasoning/thinking text.
|
||||
/// When `thinking_is_chunked` is true, this is a JSON string with
|
||||
/// `{"__chunks__": [{"type":"thinking|answer|tool_call","content":"..."},...]}`.
|
||||
/// When false, this is plain text.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub thinking_content: Option<String>,
|
||||
/// Indicates `thinking_content` contains JSON chunks (true) or plain text (false).
|
||||
#[serde(skip_serializing_if = "std::ops::Not::not")]
|
||||
pub thinking_is_chunked: bool,
|
||||
pub edited_at: Option<DateTime<Utc>>,
|
||||
pub send_at: DateTime<Utc>,
|
||||
pub revoked: Option<DateTime<Utc>>,
|
||||
@ -256,6 +266,13 @@ pub struct RoomMessageSearchResult {
|
||||
pub message: RoomMessageResponse,
|
||||
}
|
||||
|
||||
impl RoomMessageResponse {
|
||||
/// Detect if `thinking_content` stores JSON chunks (vs plain text).
|
||||
pub fn detect_chunked(thinking: &Option<String>) -> bool {
|
||||
thinking.as_ref().is_some_and(|s| s.contains("\"__chunks__\""))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
|
||||
pub struct RoomMessageListResponse {
|
||||
pub messages: Vec<RoomMessageResponse>,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user