refactor(room): apply rustfmt formatting

This commit is contained in:
ZhenYi 2026-05-14 10:02:21 +08:00
parent 52e1831452
commit 18ea3cc355
42 changed files with 1085 additions and 502 deletions

View File

@ -32,10 +32,8 @@ impl RoomService {
.all(&self.db)
.await?;
let model_names: std::collections::HashMap<Uuid, String> = models
.into_iter()
.map(|m| (m.id, m.name))
.collect();
let model_names: std::collections::HashMap<Uuid, String> =
models.into_iter().map(|m| (m.id, m.name)).collect();
let mut responses = Vec::with_capacity(configs.len());
for config in configs {
@ -178,11 +176,7 @@ impl RoomService {
Ok(())
}
pub async fn room_ai_stop(
&self,
room_id: Uuid,
ctx: &WsUserContext,
) -> Result<(), RoomError> {
pub async fn room_ai_stop(&self, room_id: Uuid, ctx: &WsUserContext) -> Result<(), RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
tracing::info!(%room_id, %user_id, "AI stream stop requested");

View File

@ -1,5 +1,5 @@
use uuid::Uuid;
use tokio::sync::broadcast;
use uuid::Uuid;
use super::{RoomConnectionManager, SHUTDOWN_CHANNEL_CAPACITY};

View File

@ -8,14 +8,14 @@ mod stream;
mod typing;
mod user_ops;
use std::sync::Arc;
use std::collections::HashMap;
use uuid::Uuid;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
use uuid::Uuid;
use db::cache::AppCache;
use crate::metrics::RoomMetrics;
use crate::types::NotificationEvent;
use db::cache::AppCache;
use queue::types::TypingEvent;
use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEvent, RoomMessageStreamChunkEvent};
@ -27,6 +27,8 @@ pub const CONNECTION_COOLDOWN: Duration = Duration::from_secs(30);
pub const MAX_CONNECTIONS_PER_ROOM: usize = 50000;
pub const MAX_CONNECTIONS_PER_PROJECT: usize = 50000;
pub const MAX_CONNECTIONS_PER_USER: usize = 50000;
/// Maximum rooms a single WS session can subscribe to.
pub const MAX_ROOMS_PER_SESSION: usize = 100;
pub const BATCH_SIZE: usize = 100;
pub const ROOM_IDLE_TIMEOUT: Duration = Duration::from_secs(30 * 60);
pub const REPLAY_BUFFER_SIZE: usize = 100;
@ -56,7 +58,8 @@ pub struct RoomConnectionManager {
project_shutdown_txs: RwLock<HashMap<Uuid, broadcast::Sender<()>>>,
user_shutdown_txs: RwLock<HashMap<Uuid, broadcast::Sender<()>>>,
stream_inner: RwLock<HashMap<Uuid, broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>>>,
room_stream_inner: RwLock<HashMap<Uuid, (broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>, usize)>>,
room_stream_inner:
RwLock<HashMap<Uuid, (broadcast::Sender<Arc<RoomMessageStreamChunkEvent>>, usize)>>,
/// Active AI streams keyed by message_id. Used to replay buffered chunks to
/// late-joining subscribers who missed the stream start.
active_streams: RwLock<HashMap<Uuid, ActiveStreamMeta>>,
@ -102,11 +105,19 @@ impl RoomConnectionManager {
}
pub use persist::{DedupCache, PersistFn, cleanup_dedup_cache, make_persist_fn};
pub use pubsub::{subscribe_room_events, subscribe_room_stream_chunk_events, subscribe_project_room_events, subscribe_task_events_fn};
pub use pubsub::{
subscribe_project_room_events, subscribe_room_events, subscribe_room_stream_chunk_events,
subscribe_task_events_fn,
};
/// Extract a Redis connection getter from MessageProducer.
/// Used for cache operations (notification unread count, etc.), NOT for message broadcasting.
pub type RedisFuture = std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<deadpool_redis::cluster::Connection>> + Send>>;
pub type RedisFuture = std::pin::Pin<
Box<
dyn std::future::Future<Output = anyhow::Result<deadpool_redis::cluster::Connection>>
+ Send,
>,
>;
pub fn extract_get_redis(
queue: queue::MessageProducer,

View File

@ -233,6 +233,7 @@ async fn embed_persisted_messages(
let project_name = room_project.get(&m.room_id)?;
Some(agent::embed::EmbedMemoryInput {
message_id: m.id.to_string(),
seq: m.seq,
content: m.content.clone(),
project_name: project_name.clone(),
room_id: m.room_id.to_string(),

View File

@ -1,12 +1,19 @@
use std::sync::Arc;
use uuid::Uuid;
use tokio::sync::broadcast;
use uuid::Uuid;
use super::{
AgentTaskEvent, BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_PROJECT, ProjectRoomEvent,
RoomConnectionManager,
};
use crate::error::RoomError;
use super::{RoomConnectionManager, ProjectRoomEvent, AgentTaskEvent, BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_PROJECT};
impl RoomConnectionManager {
pub async fn subscribe_project(&self, project_id: Uuid, _user_id: Uuid) -> Result<broadcast::Receiver<Arc<ProjectRoomEvent>>, RoomError> {
pub async fn subscribe_project(
&self,
project_id: Uuid,
_user_id: Uuid,
) -> Result<broadcast::Receiver<Arc<ProjectRoomEvent>>, RoomError> {
let mut map = self.project_inner.write().await;
if map.get(&project_id).is_some() {
drop(map);
@ -68,7 +75,10 @@ impl RoomConnectionManager {
}
}
pub async fn subscribe_task_events(&self, project_id: Uuid) -> Result<broadcast::Receiver<Arc<AgentTaskEvent>>, RoomError> {
pub async fn subscribe_task_events(
&self,
project_id: Uuid,
) -> Result<broadcast::Receiver<Arc<AgentTaskEvent>>, RoomError> {
let mut map = self.task_inner.write().await;
if let Some(sender) = map.get(&project_id).cloned() {

View File

@ -87,12 +87,7 @@ async fn consume_room_broadcast(
let mut count = 0usize;
while count < 100 {
match tokio::time::timeout(
std::time::Duration::from_millis(200),
messages.next(),
)
.await
{
match tokio::time::timeout(std::time::Duration::from_millis(200), messages.next()).await {
Ok(Some(Ok(msg))) => {
match serde_json::from_slice::<RoomMessageEvent>(&msg.payload) {
Ok(event) => {
@ -192,12 +187,7 @@ async fn consume_chunk_broadcast(
let mut count = 0usize;
while count < 100 {
match tokio::time::timeout(
std::time::Duration::from_millis(200),
messages.next(),
)
.await
{
match tokio::time::timeout(std::time::Duration::from_millis(200), messages.next()).await {
Ok(Some(Ok(msg))) => {
match serde_json::from_slice::<RoomMessageStreamChunkEvent>(&msg.payload) {
Ok(event) => {
@ -317,4 +307,4 @@ pub async fn subscribe_task_events_fn(
}
}
tracing::info!(project_id = %project_id, "task events subscriber stopped");
}
}

View File

@ -1,11 +1,17 @@
use std::time::Instant;
use uuid::Uuid;
use super::{
CONNECTION_COOLDOWN, MAX_CONNECTIONS_PER_ROOM, ROOM_IDLE_TIMEOUT, RoomConnectionManager,
};
use crate::error::RoomError;
use super::{RoomConnectionManager, CONNECTION_COOLDOWN, MAX_CONNECTIONS_PER_ROOM, ROOM_IDLE_TIMEOUT};
impl RoomConnectionManager {
pub async fn check_room_connection_rate(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> {
pub async fn check_room_connection_rate(
&self,
room_id: Uuid,
user_id: Uuid,
) -> Result<(), RoomError> {
let mut map = self.connection_rate.write().await;
let key = (room_id, user_id);
if let Some(last) = map.get(&key) {
@ -21,7 +27,11 @@ impl RoomConnectionManager {
Ok(())
}
pub async fn check_project_connection_rate(&self, project_id: Uuid, user_id: Uuid) -> Result<(), RoomError> {
pub async fn check_project_connection_rate(
&self,
project_id: Uuid,
user_id: Uuid,
) -> Result<(), RoomError> {
let mut map = self.connection_rate.write().await;
let key = (project_id, user_id);
if let Some(last) = map.get(&key) {
@ -61,7 +71,11 @@ impl RoomConnectionManager {
let mut entries: Vec<_> = map.iter().collect();
entries.sort_by(|a, b| a.1.cmp(b.1));
let keep_count = entries.len() / 2;
let to_remove: Vec<_> = entries.into_iter().take(keep_count).map(|(k, _)| *k).collect();
let to_remove: Vec<_> = entries
.into_iter()
.take(keep_count)
.map(|(k, _)| *k)
.collect();
for key in to_remove {
map.remove(&key);
}

View File

@ -1,12 +1,18 @@
use std::sync::Arc;
use uuid::Uuid;
use tokio::sync::broadcast;
use uuid::Uuid;
use super::{
BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_ROOM, RoomConnectionManager, RoomMessageEvent,
};
use crate::error::RoomError;
use super::{RoomConnectionManager, RoomMessageEvent, BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_ROOM};
impl RoomConnectionManager {
pub async fn subscribe(&self, room_id: Uuid, _user_id: Uuid) -> Result<broadcast::Receiver<Arc<RoomMessageEvent>>, RoomError> {
pub async fn subscribe(
&self,
room_id: Uuid,
_user_id: Uuid,
) -> Result<broadcast::Receiver<Arc<RoomMessageEvent>>, RoomError> {
let mut map = self.room_inner.write().await;
if let Some(_sender) = map.get(&room_id) {
drop(map);
@ -16,7 +22,9 @@ impl RoomConnectionManager {
if let Some(sender) = map.get(&room_id) {
return Ok(sender.subscribe());
}
return Err(RoomError::Internal("room disappeared during subscribe".into()));
return Err(RoomError::Internal(
"room disappeared during subscribe".into(),
));
}
if map.len() >= MAX_CONNECTIONS_PER_ROOM {

View File

@ -1,11 +1,18 @@
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
use uuid::Uuid;
use tokio::sync::{broadcast, RwLock};
use super::{RoomConnectionManager, RoomMessageStreamChunkEvent, BROADCAST_CAPACITY, REPLAY_BUFFER_SIZE};
use super::{
BROADCAST_CAPACITY, REPLAY_BUFFER_SIZE, RoomConnectionManager, RoomMessageStreamChunkEvent,
};
impl RoomConnectionManager {
pub async fn register_stream_channel(&self, message_id: Uuid, room_id: Uuid, display_name: Option<String>) -> broadcast::Receiver<Arc<RoomMessageStreamChunkEvent>> {
pub async fn register_stream_channel(
&self,
message_id: Uuid,
room_id: Uuid,
display_name: Option<String>,
) -> broadcast::Receiver<Arc<RoomMessageStreamChunkEvent>> {
let mut map = self.stream_inner.write().await;
if let Some(tx) = map.get(&message_id) {
return tx.subscribe();
@ -27,12 +34,18 @@ impl RoomConnectionManager {
rx
}
pub async fn subscribe_stream(&self, message_id: Uuid) -> Option<broadcast::Receiver<Arc<RoomMessageStreamChunkEvent>>> {
pub async fn subscribe_stream(
&self,
message_id: Uuid,
) -> Option<broadcast::Receiver<Arc<RoomMessageStreamChunkEvent>>> {
let map = self.stream_inner.read().await;
map.get(&message_id).map(|tx| tx.subscribe())
}
pub async fn subscribe_room_stream(&self, room_id: Uuid) -> broadcast::Receiver<Arc<RoomMessageStreamChunkEvent>> {
pub async fn subscribe_room_stream(
&self,
room_id: Uuid,
) -> broadcast::Receiver<Arc<RoomMessageStreamChunkEvent>> {
// New subscriber: replay active streams in this room so they catch up,
// then subscribe to the room's channel.
@ -47,7 +60,9 @@ impl RoomConnectionManager {
// Replay buffered chunks to existing channel so all subscribers receive them.
let active = self.active_streams.read().await;
for (&msg_id, meta) in active.iter() {
if meta.room_id != room_id { continue; }
if meta.room_id != room_id {
continue;
}
let start_event = Arc::new(RoomMessageStreamChunkEvent {
message_id: msg_id,
room_id,
@ -73,7 +88,9 @@ impl RoomConnectionManager {
// Replay buffered chunks to new channel.
let active = self.active_streams.read().await;
for (&msg_id, meta) in active.iter() {
if meta.room_id != room_id { continue; }
if meta.room_id != room_id {
continue;
}
let start_event = Arc::new(RoomMessageStreamChunkEvent {
message_id: msg_id,
room_id,
@ -121,7 +138,9 @@ impl RoomConnectionManager {
// Also update room_to_streams reverse index.
if is_start {
let mut r2s = self.room_to_streams.write().await;
r2s.entry(event.room_id).or_default().insert(event.message_id);
r2s.entry(event.room_id)
.or_default()
.insert(event.message_id);
}
}
@ -182,7 +201,10 @@ impl RoomConnectionManager {
}
}
pub async fn register_stream_cancel(&self, room_id: Uuid) -> Arc<std::sync::atomic::AtomicBool> {
pub async fn register_stream_cancel(
&self,
room_id: Uuid,
) -> Arc<std::sync::atomic::AtomicBool> {
let cancel = Arc::new(std::sync::atomic::AtomicBool::new(false));
let mut map = self.stream_cancel_tokens.write().await;
map.insert(room_id, cancel.clone());

View File

@ -1,13 +1,16 @@
use std::sync::Arc;
use uuid::Uuid;
use tokio::sync::{RwLockReadGuard, RwLockWriteGuard, broadcast};
use queue::types::TypingEvent;
use std::sync::Arc;
use tokio::sync::{RwLockReadGuard, RwLockWriteGuard, broadcast};
use uuid::Uuid;
use super::{RoomConnectionManager, BROADCAST_CAPACITY};
use super::{BROADCAST_CAPACITY, RoomConnectionManager};
impl RoomConnectionManager {
pub async fn subscribe_typing(&self, room_id: Uuid) -> broadcast::Receiver<Arc<TypingEvent>> {
let mut map: RwLockWriteGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.write().await;
let mut map: RwLockWriteGuard<
'_,
std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>,
> = self.typing_inner.write().await;
let tx = map.entry(room_id).or_insert_with(|| {
let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
tx
@ -24,7 +27,10 @@ impl RoomConnectionManager {
let action = event.action.clone();
let username = event.username.clone();
let avatar_url = event.avatar_url.clone();
let sender_type = event.sender_type.clone().unwrap_or_else(|| "user".to_string());
let sender_type = event
.sender_type
.clone()
.unwrap_or_else(|| "user".to_string());
if let Ok(mut conn) = self.cache.conn().await {
let key = user_key;
@ -46,7 +52,10 @@ impl RoomConnectionManager {
}
}
let map: RwLockReadGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.read().await;
let map: RwLockReadGuard<
'_,
std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>,
> = self.typing_inner.read().await;
if let Some(tx) = map.get(&room_id) {
let event = Arc::new(event);
let _ = tx.send(event);
@ -85,17 +94,31 @@ impl RoomConnectionManager {
let parts: Vec<&str> = key.split(':').collect();
let user_id = parts.get(2).and_then(|s| Uuid::parse_str(s).ok());
if let (Some(value), Some(user_uuid)) = (
redis::cmd("GET").arg(&key).query_async::<String>(&mut conn).await.ok(),
redis::cmd("GET")
.arg(&key)
.query_async::<String>(&mut conn)
.await
.ok(),
user_id,
) {
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&value) {
results.push(TypingEvent {
room_id,
user_id: user_uuid,
username: parsed.get("username").and_then(|v| v.as_str()).unwrap_or("").to_string(),
avatar_url: parsed.get("avatar_url").and_then(|v| v.as_str()).map(String::from),
username: parsed
.get("username")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
avatar_url: parsed
.get("avatar_url")
.and_then(|v| v.as_str())
.map(String::from),
action: "start".to_string(),
sender_type: parsed.get("sender_type").and_then(|v| v.as_str()).map(String::from),
sender_type: parsed
.get("sender_type")
.and_then(|v| v.as_str())
.map(String::from),
});
}
}

View File

@ -1,13 +1,18 @@
use std::sync::Arc;
use uuid::Uuid;
use tokio::sync::broadcast;
use uuid::Uuid;
use super::{
BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_USER, ProjectRoomEvent, RoomConnectionManager,
};
use crate::error::RoomError;
use crate::types::NotificationEvent;
use super::{RoomConnectionManager, ProjectRoomEvent, BROADCAST_CAPACITY, MAX_CONNECTIONS_PER_USER};
impl RoomConnectionManager {
pub async fn subscribe_user(&self, user_id: Uuid) -> Result<broadcast::Receiver<Arc<ProjectRoomEvent>>, RoomError> {
pub async fn subscribe_user(
&self,
user_id: Uuid,
) -> Result<broadcast::Receiver<Arc<ProjectRoomEvent>>, RoomError> {
let mut map = self.user_inner.write().await;
if let Some(_sender) = map.get(&user_id) {
@ -62,7 +67,10 @@ impl RoomConnectionManager {
}
}
pub async fn subscribe_user_notification(&self, user_id: Uuid) -> broadcast::Receiver<Arc<NotificationEvent>> {
pub async fn subscribe_user_notification(
&self,
user_id: Uuid,
) -> broadcast::Receiver<Arc<NotificationEvent>> {
let mut map = self.user_notification_inner.write().await;
if let Some(sender) = map.get(&user_id) {
return sender.subscribe();

View File

@ -142,10 +142,7 @@ impl RoomService {
};
// Batch fetch room names to avoid N+1 queries
let room_ids: Vec<Uuid> = notifications
.iter()
.filter_map(|n| n.room)
.collect();
let room_ids: Vec<Uuid> = notifications.iter().filter_map(|n| n.room).collect();
let rooms: std::collections::HashMap<Uuid, String> = if !room_ids.is_empty() {
models::rooms::room::Entity::find()
.filter(models::rooms::room::Column::Id.is_in(room_ids))
@ -233,10 +230,14 @@ impl RoomService {
) -> Result<DraftResponse, RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
let key = format!("room:{}:draft:{}", room_id, user_id);
let mut conn = self.cache.conn().await.map_err(|e| RoomError::Internal(e.to_string()))?;
let mut conn = self
.cache
.conn()
.await
.map_err(|e| RoomError::Internal(e.to_string()))?;
let now = Utc::now();
deadpool_redis::redis::cmd("SETEX")
.arg(&key)
@ -245,7 +246,7 @@ impl RoomService {
.query_async::<()>(&mut conn)
.await
.map_err(|e| RoomError::Internal(e.to_string()))?;
Ok(DraftResponse {
room_id,
content,
@ -253,23 +254,23 @@ impl RoomService {
})
}
pub async fn draft_clear(
&self,
room_id: Uuid,
ctx: &WsUserContext,
) -> Result<(), RoomError> {
pub async fn draft_clear(&self, room_id: Uuid, ctx: &WsUserContext) -> Result<(), RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
let key = format!("room:{}:draft:{}", room_id, user_id);
let mut conn = self.cache.conn().await.map_err(|e| RoomError::Internal(e.to_string()))?;
let mut conn = self
.cache
.conn()
.await
.map_err(|e| RoomError::Internal(e.to_string()))?;
deadpool_redis::redis::cmd("DEL")
.arg(&key)
.query_async::<()>(&mut conn)
.await
.map_err(|e| RoomError::Internal(e.to_string()))?;
Ok(())
}
}

View File

@ -45,13 +45,25 @@ impl RoomService {
let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content);
super::RoomMessageResponse {
id: msg.id, seq: msg.seq, room: msg.room, sender_type,
sender_id: msg.sender_id, display_name, thread: msg.thread,
content: msg.content, content_type: msg.content_type.to_string(),
thinking_content: msg.thinking_content, thinking_is_chunked: chunked,
edited_at: msg.edited_at, send_at: msg.send_at, revoked: msg.revoked,
revoked_by: msg.revoked_by, in_reply_to: msg.in_reply_to,
highlighted_content: None, attachment_ids: Vec::new(), reactions: Vec::new(),
id: msg.id,
seq: msg.seq,
room: msg.room,
sender_type,
sender_id: msg.sender_id,
display_name,
thread: msg.thread,
content: msg.content,
content_type: msg.content_type.to_string(),
thinking_content: msg.thinking_content,
thinking_is_chunked: chunked,
edited_at: msg.edited_at,
send_at: msg.send_at,
revoked: msg.revoked,
revoked_by: msg.revoked_by,
in_reply_to: msg.in_reply_to,
highlighted_content: None,
attachment_ids: Vec::new(),
reactions: Vec::new(),
}
}
}

View File

@ -5,22 +5,46 @@ mod tests {
#[test]
fn test_parse_message_content_type_valid() {
assert!(matches!(RoomService::parse_message_content_type(Some("text".into())).unwrap(), MessageContentType::Text));
assert!(matches!(RoomService::parse_message_content_type(Some("image".into())).unwrap(), MessageContentType::Image));
assert!(matches!(RoomService::parse_message_content_type(Some("audio".into())).unwrap(), MessageContentType::Audio));
assert!(matches!(RoomService::parse_message_content_type(Some("video".into())).unwrap(), MessageContentType::Video));
assert!(matches!(RoomService::parse_message_content_type(Some("file".into())).unwrap(), MessageContentType::File));
assert!(matches!(
RoomService::parse_message_content_type(Some("text".into())).unwrap(),
MessageContentType::Text
));
assert!(matches!(
RoomService::parse_message_content_type(Some("image".into())).unwrap(),
MessageContentType::Image
));
assert!(matches!(
RoomService::parse_message_content_type(Some("audio".into())).unwrap(),
MessageContentType::Audio
));
assert!(matches!(
RoomService::parse_message_content_type(Some("video".into())).unwrap(),
MessageContentType::Video
));
assert!(matches!(
RoomService::parse_message_content_type(Some("file".into())).unwrap(),
MessageContentType::File
));
}
#[test]
fn test_parse_message_content_type_case_insensitive() {
assert!(matches!(RoomService::parse_message_content_type(Some("TEXT".into())).unwrap(), MessageContentType::Text));
assert!(matches!(RoomService::parse_message_content_type(Some("Image".into())).unwrap(), MessageContentType::Image));
assert!(matches!(
RoomService::parse_message_content_type(Some("TEXT".into())).unwrap(),
MessageContentType::Text
));
assert!(matches!(
RoomService::parse_message_content_type(Some("Image".into())).unwrap(),
MessageContentType::Image
));
}
#[test]
fn test_parse_message_content_type_none_defaults_to_text() {
assert!(matches!(RoomService::parse_message_content_type(None).unwrap(), MessageContentType::Text));
assert!(matches!(
RoomService::parse_message_content_type(None).unwrap(),
MessageContentType::Text
));
}
#[test]
@ -118,7 +142,9 @@ mod tests {
#[test]
fn test_mention_bracket_re_matches_ai_model() {
let re = crate::service::mention_bracket_re();
let caps: Vec<_> = re.captures_iter("@[ai:550e8400-0000-0000-0000-000000000001:GPT-4]").collect();
let caps: Vec<_> = re
.captures_iter("@[ai:550e8400-0000-0000-0000-000000000001:GPT-4]")
.collect();
assert_eq!(caps.len(), 1);
assert_eq!(&caps[0][1], "ai");
assert_eq!(&caps[0][2], "550e8400-0000-0000-0000-000000000001");
@ -127,7 +153,9 @@ mod tests {
#[test]
fn test_mention_bracket_re_matches_user() {
let re = crate::service::mention_bracket_re();
let caps: Vec<_> = re.captures_iter("@[user:850e8400-0000-0000-0000-000000000002:John]").collect();
let caps: Vec<_> = re
.captures_iter("@[user:850e8400-0000-0000-0000-000000000002:John]")
.collect();
assert_eq!(caps.len(), 1);
assert_eq!(&caps[0][1], "user");
}
@ -173,4 +201,4 @@ mod tests {
assert_eq!(bracket_re.captures_iter(content).count(), 1);
assert_eq!(tag_re.captures_iter(content).count(), 1);
}
}
}

View File

@ -26,11 +26,10 @@ pub mod types;
pub mod types_responses;
pub mod ws_context;
pub use presence::PresenceStore;
pub use connection::{
PersistFn, RoomConnectionManager, cleanup_dedup_cache, extract_get_redis,
make_persist_fn, subscribe_project_room_events, subscribe_room_events,
subscribe_room_stream_chunk_events, subscribe_task_events_fn,
PersistFn, RoomConnectionManager, cleanup_dedup_cache, extract_get_redis, make_persist_fn,
subscribe_project_room_events, subscribe_room_events, subscribe_room_stream_chunk_events,
subscribe_task_events_fn,
};
pub use draft_and_history::{
DraftResponse, DraftSaveRequest, MentionNotificationResponse, MessageEditHistoryEntry,
@ -38,7 +37,8 @@ pub use draft_and_history::{
};
pub use error::RoomError;
pub use metrics::RoomMetrics;
pub use presence::PresenceStore;
pub use reaction::{MessageReactionsResponse, MessageSearchResponse};
pub use service::{RoomService, PushNotificationFn};
pub use service::{PushNotificationFn, RoomService};
pub use types::{RoomEventType, *};
pub use types_responses::*;

View File

@ -38,7 +38,10 @@ impl RoomService {
.all(&self.db)
.await?;
for m in project_admins {
if matches!(m.scope_role(), Ok(models::projects::MemberRole::Owner | models::projects::MemberRole::Admin)) {
if matches!(
m.scope_role(),
Ok(models::projects::MemberRole::Owner | models::projects::MemberRole::Admin)
) {
if !user_ids.contains(&m.user) {
user_ids.push(m.user);
}
@ -66,7 +69,16 @@ impl RoomService {
.all(&self.db)
.await?
.into_iter()
.map(|u| (u.uid, super::UserInfo { uid: u.uid, username: u.username, avatar_url: u.avatar_url }))
.map(|u| {
(
u.uid,
super::UserInfo {
uid: u.uid,
username: u.username,
avatar_url: u.avatar_url,
},
)
})
.collect()
} else {
std::collections::HashMap::new()
@ -89,29 +101,42 @@ impl RoomService {
.await?;
let role_map: std::collections::HashMap<Uuid, String> = project_member_list
.into_iter()
.map(|m| (m.user, m.scope_role().map(|r| r.to_string()).unwrap_or_else(|_| "member".to_string())))
.map(|m| {
(
m.user,
m.scope_role()
.map(|r| r.to_string())
.unwrap_or_else(|_| "member".to_string()),
)
})
.collect();
let participants = members.into_iter().map(|user_id| {
let user_info = users.get(&user_id).cloned();
let state = state_map.get(&(room_id, user_id));
let project_role = role_map.get(&user_id).cloned().unwrap_or_else(|| "member".to_string());
let is_room_owner = room_model.created_by == user_id;
let participants = members
.into_iter()
.map(|user_id| {
let user_info = users.get(&user_id).cloned();
let state = state_map.get(&(room_id, user_id));
let project_role = role_map
.get(&user_id)
.cloned()
.unwrap_or_else(|| "member".to_string());
let is_room_owner = room_model.created_by == user_id;
super::RoomParticipantResponse {
room: room_id,
user: user_id,
user_info,
project_role,
is_room_owner,
last_read_seq: state.and_then(|s| s.last_read_seq),
do_not_disturb: state.map(|s| s.do_not_disturb).unwrap_or(false),
dnd_start_hour: state.and_then(|s| s.dnd_start_hour),
dnd_end_hour: state.and_then(|s| s.dnd_end_hour),
joined_at: state.and_then(|s| s.joined_at),
}
}).collect();
super::RoomParticipantResponse {
room: room_id,
user: user_id,
user_info,
project_role,
is_room_owner,
last_read_seq: state.and_then(|s| s.last_read_seq),
do_not_disturb: state.map(|s| s.do_not_disturb).unwrap_or(false),
dnd_start_hour: state.and_then(|s| s.dnd_start_hour),
dnd_end_hour: state.and_then(|s| s.dnd_end_hour),
joined_at: state.and_then(|s| s.joined_at),
}
})
.collect();
Ok(super::RoomParticipantListResponse { participants })
}
}
}

View File

@ -86,7 +86,8 @@ impl RoomService {
.or_else(|| Some(format!("AI({})", &id.to_string()[..8])))
}),
_ => msg.sender_id.and_then(|id| users.get(&id).cloned()),
}.or_else(|| msg.sender_id.map(|id| id.to_string()));
}
.or_else(|| msg.sender_id.map(|id| id.to_string()));
let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content);
super::RoomMessageResponse {
id: msg.id,
@ -142,8 +143,10 @@ impl RoomService {
.await
.unwrap_or_default();
let mut reaction_map: std::collections::HashMap<Uuid, Vec<room_message_reaction::Model>> =
std::collections::HashMap::new();
let mut reaction_map: std::collections::HashMap<
Uuid,
Vec<room_message_reaction::Model>,
> = std::collections::HashMap::new();
for r in reactions {
reaction_map.entry(r.message).or_default().push(r);
}

View File

@ -40,7 +40,8 @@ impl RoomService {
}
}
let seq = crate::service::next_room_message_seq_internal(room_id, &self.db, &self.cache).await?;
let seq =
crate::service::next_room_message_seq_internal(room_id, &self.db, &self.cache).await?;
let now = Utc::now();
let id = Uuid::now_v7();
let project_id = room_model.project;
@ -101,7 +102,12 @@ impl RoomService {
};
let preview = if content.len() > 50 {
let end = content.char_indices().map(|(i, _)| i).take_while(|&i| i <= 50).last().unwrap_or(50);
let end = content
.char_indices()
.map(|(i, _)| i)
.take_while(|&i| i <= 50)
.last()
.unwrap_or(50);
format!("{}...", &content[..end])
} else {
content.clone()
@ -152,10 +158,7 @@ impl RoomService {
let attachment_ids = request.attachment_ids.clone();
if !attachment_ids.is_empty() {
if let Err(e) = room_attachment::Entity::update_many()
.col_expr(
room_attachment::Column::Message,
Expr::value(Some(id)),
)
.col_expr(room_attachment::Column::Message, Expr::value(Some(id)))
.filter(room_attachment::Column::Id.is_in(attachment_ids.clone()))
.exec(&self.db)
.await
@ -196,7 +199,10 @@ impl RoomService {
.notification_create(super::NotificationCreateRequest {
notification_type: super::NotificationType::Mention,
user_id: mentioned_user_id,
title: format!("{} 在 {} 中提到了你", sender_display_name, room_model.room_name),
title: format!(
"{} 在 {} 中提到了你",
sender_display_name, room_model.room_name
),
content: Some(content.clone()),
room_id: Some(room_id),
project_id,

View File

@ -1,4 +1,6 @@
use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram, Unit};
use metrics::{
Counter, Gauge, Histogram, Unit, describe_counter, describe_gauge, describe_histogram,
};
use std::sync::Arc;
use uuid::Uuid;
@ -165,8 +167,7 @@ impl RoomMetrics {
}
#[allow(dead_code)]
pub async fn cleanup_stale_rooms(&self, _active_room_ids: &[Uuid]) {
}
pub async fn cleanup_stale_rooms(&self, _active_room_ids: &[Uuid]) {}
pub fn into_arc(self) -> Arc<RoomMetrics> {
Arc::new(self)

View File

@ -4,9 +4,9 @@ use crate::service::RoomService;
use crate::ws_context::WsUserContext;
use chrono::Utc;
use deadpool_redis::redis;
use models::Expr;
use models::rooms::room_notifications;
use models::users::user as user_model;
use models::Expr;
use sea_orm::*;
use std::sync::Arc;
use uuid::Uuid;

View File

@ -184,7 +184,11 @@ impl PresenceStore {
expires_at: Option<DateTime<Utc>>,
) -> Option<CustomStatusChanged> {
// Find the primary presence entry (first one found)
let key = self.entries.iter().find(|e| e.user_id == user_id).map(|e| *e.key());
let key = self
.entries
.iter()
.find(|e| e.user_id == user_id)
.map(|e| *e.key());
if let Some(key) = key {
if let Some(mut entry) = self.entries.get_mut(&key) {
@ -234,7 +238,11 @@ impl PresenceStore {
}
/// Remove user presence when they disconnect.
pub fn remove_presence(&self, user_id: Uuid, project_id: Option<Uuid>) -> Option<PresenceChanged> {
pub fn remove_presence(
&self,
user_id: Uuid,
project_id: Option<Uuid>,
) -> Option<PresenceChanged> {
let key = (project_id, user_id);
if let Some((_, entry)) = self.entries.remove(&key) {
// Remove from user_projects index
@ -261,7 +269,10 @@ impl PresenceStore {
pub fn project_online_count(&self, project_id: Uuid) -> usize {
self.entries
.iter()
.filter(|entry| entry.key().0 == Some(project_id) && entry.effective_status() != PresenceStatus::Offline)
.filter(|entry| {
entry.key().0 == Some(project_id)
&& entry.effective_status() != PresenceStatus::Offline
})
.count()
}
}
}

View File

@ -1,7 +1,7 @@
use crate::error::RoomError;
use crate::service::RoomService;
use crate::ws_context::WsUserContext;
use crate::types_responses::ReactionGroupResponse;
use crate::ws_context::WsUserContext;
use models::rooms::room_message_reaction;
use models::users::user as user_model;
use sea_orm::*;
@ -20,14 +20,23 @@ pub struct MessageSearchResponse {
}
impl RoomService {
pub async fn message_reactions_get(&self, message_id: Uuid, ctx: &WsUserContext) -> Result<MessageReactionsResponse, RoomError> {
pub async fn message_reactions_get(
&self,
message_id: Uuid,
ctx: &WsUserContext,
) -> Result<MessageReactionsResponse, RoomError> {
let user_id = ctx.user_id;
let message = self.find_message_or_404(message_id).await?;
self.require_room_access(message.room, user_id).await?;
self.get_message_reactions(message_id, Some(user_id)).await
}
pub async fn message_reactions_batch(&self, room_id: Uuid, message_ids: Vec<Uuid>, ctx: &WsUserContext) -> Result<Vec<MessageReactionsResponse>, RoomError> {
pub async fn message_reactions_batch(
&self,
room_id: Uuid,
message_ids: Vec<Uuid>,
ctx: &WsUserContext,
) -> Result<Vec<MessageReactionsResponse>, RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
let mut results = Vec::with_capacity(message_ids.len());
@ -38,17 +47,33 @@ impl RoomService {
Ok(results)
}
pub async fn message_search(&self, room_id: Uuid, query: &str, limit: Option<u64>, offset: Option<u64>, ctx: &WsUserContext) -> Result<MessageSearchResponse, RoomError> {
pub async fn message_search(
&self,
room_id: Uuid,
query: &str,
limit: Option<u64>,
offset: Option<u64>,
ctx: &WsUserContext,
) -> Result<MessageSearchResponse, RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
if query.trim().is_empty() {
return Ok(MessageSearchResponse { messages: Vec::new(), total: 0 });
return Ok(MessageSearchResponse {
messages: Vec::new(),
total: 0,
});
}
let limit = limit.unwrap_or(20);
let offset = offset.unwrap_or(0);
let search_pattern = format!("%{}%", query.replace('\\', "\\\\").replace('%', "\\%").replace('_', "\\_"));
let search_pattern = format!(
"%{}%",
query
.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_")
);
let query_builder = models::rooms::room_message::Entity::find()
.filter(models::rooms::room_message::Column::Room.eq(room_id))
@ -58,14 +83,25 @@ impl RoomService {
let total = query_builder.clone().count(&self.db).await? as i64;
let messages = query_builder
.order_by_desc(models::rooms::room_message::Column::SendAt)
.limit(limit).offset(offset).all(&self.db).await?;
.limit(limit)
.offset(offset)
.all(&self.db)
.await?;
let response_messages = self.build_messages_with_display_names(messages).await;
Ok(MessageSearchResponse { messages: response_messages, total })
Ok(MessageSearchResponse {
messages: response_messages,
total,
})
}
pub(crate) async fn find_message_or_404(&self, message_id: Uuid) -> Result<models::rooms::room_message::Model, RoomError> {
models::rooms::room_message::Entity::find_by_id(message_id).one(&self.db).await?
pub(crate) async fn find_message_or_404(
&self,
message_id: Uuid,
) -> Result<models::rooms::room_message::Model, RoomError> {
models::rooms::room_message::Entity::find_by_id(message_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Message not found".to_string()))
}
@ -76,45 +112,110 @@ impl RoomService {
Ok(())
}
pub(crate) async fn get_message_reactions(&self, message_id: Uuid, current_user_id: Option<Uuid>) -> Result<MessageReactionsResponse, RoomError> {
pub(crate) async fn get_message_reactions(
&self,
message_id: Uuid,
current_user_id: Option<Uuid>,
) -> Result<MessageReactionsResponse, RoomError> {
let reactions = room_message_reaction::Entity::find()
.filter(room_message_reaction::Column::Message.eq(message_id))
.limit(1000)
.all(&self.db).await?;
.all(&self.db)
.await?;
let reaction_groups = self.build_reaction_groups(reactions, current_user_id);
Ok(MessageReactionsResponse { message_id, reactions: reaction_groups })
Ok(MessageReactionsResponse {
message_id,
reactions: reaction_groups,
})
}
pub(crate) fn build_reaction_groups(&self, reactions: Vec<room_message_reaction::Model>, current_user_id: Option<Uuid>) -> Vec<ReactionGroupResponse> {
let mut grouped: std::collections::HashMap<String, Vec<&room_message_reaction::Model>> = std::collections::HashMap::new();
for r in &reactions { grouped.entry(r.emoji.clone()).or_default().push(r); }
pub(crate) fn build_reaction_groups(
&self,
reactions: Vec<room_message_reaction::Model>,
current_user_id: Option<Uuid>,
) -> Vec<ReactionGroupResponse> {
let mut grouped: std::collections::HashMap<String, Vec<&room_message_reaction::Model>> =
std::collections::HashMap::new();
for r in &reactions {
grouped.entry(r.emoji.clone()).or_default().push(r);
}
grouped.into_iter().map(|(emoji, user_reactions)| {
let count = user_reactions.len() as i32;
let reacted_by_me = current_user_id.map(|uid| user_reactions.iter().any(|r| r.user == uid)).unwrap_or(false);
let users = user_reactions.iter().take(3).map(|r| r.user.to_string()).collect();
ReactionGroupResponse { emoji, count, reacted_by_me, users }
}).collect()
grouped
.into_iter()
.map(|(emoji, user_reactions)| {
let count = user_reactions.len() as i32;
let reacted_by_me = current_user_id
.map(|uid| user_reactions.iter().any(|r| r.user == uid))
.unwrap_or(false);
let users = user_reactions
.iter()
.take(3)
.map(|r| r.user.to_string())
.collect();
ReactionGroupResponse {
emoji,
count,
reacted_by_me,
users,
}
})
.collect()
}
pub(crate) async fn build_messages_with_display_names(&self, messages: Vec<models::rooms::room_message::Model>) -> Vec<super::RoomMessageResponse> {
let user_ids: Vec<Uuid> = messages.iter()
.filter(|m| m.sender_type.to_string() == "user").filter_map(|m| m.sender_id).collect();
pub(crate) async fn build_messages_with_display_names(
&self,
messages: Vec<models::rooms::room_message::Model>,
) -> Vec<super::RoomMessageResponse> {
let user_ids: Vec<Uuid> = messages
.iter()
.filter(|m| m.sender_type.to_string() == "user")
.filter_map(|m| m.sender_id)
.collect();
let users: std::collections::HashMap<Uuid, String> = if !user_ids.is_empty() {
user_model::Entity::find().filter(user_model::Column::Uid.is_in(user_ids))
.all(&self.db).await.unwrap_or_default().into_iter()
.map(|u| (u.uid, u.display_name.unwrap_or(u.username))).collect()
} else { std::collections::HashMap::new() };
user_model::Entity::find()
.filter(user_model::Column::Uid.is_in(user_ids))
.all(&self.db)
.await
.unwrap_or_default()
.into_iter()
.map(|u| (u.uid, u.display_name.unwrap_or(u.username)))
.collect()
} else {
std::collections::HashMap::new()
};
messages.into_iter().map(|msg| {
let sender_type = msg.sender_type.to_string();
let display_name = match sender_type.as_str() {
"user" => msg.sender_id.and_then(|id| users.get(&id).cloned()),
_ => None,
};
let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content);
super::RoomMessageResponse { id: msg.id, seq: msg.seq, room: msg.room, sender_type, sender_id: msg.sender_id, display_name, thread: msg.thread, in_reply_to: msg.in_reply_to, content: msg.content, content_type: msg.content_type.to_string(), thinking_content: msg.thinking_content, thinking_is_chunked: chunked, edited_at: msg.edited_at, send_at: msg.send_at, revoked: msg.revoked, revoked_by: msg.revoked_by, highlighted_content: None, attachment_ids: Vec::new(), reactions: Vec::new() }
}).collect()
messages
.into_iter()
.map(|msg| {
let sender_type = msg.sender_type.to_string();
let display_name = match sender_type.as_str() {
"user" => msg.sender_id.and_then(|id| users.get(&id).cloned()),
_ => None,
};
let chunked = super::RoomMessageResponse::detect_chunked(&msg.thinking_content);
super::RoomMessageResponse {
id: msg.id,
seq: msg.seq,
room: msg.room,
sender_type,
sender_id: msg.sender_id,
display_name,
thread: msg.thread,
in_reply_to: msg.in_reply_to,
content: msg.content,
content_type: msg.content_type.to_string(),
thinking_content: msg.thinking_content,
thinking_is_chunked: chunked,
edited_at: msg.edited_at,
send_at: msg.send_at,
revoked: msg.revoked,
revoked_by: msg.revoked_by,
highlighted_content: None,
attachment_ids: Vec::new(),
reactions: Vec::new(),
}
})
.collect()
}
}

View File

@ -213,7 +213,8 @@ pub async fn acquire_room_ai_lock(
let lock_key_for_watchdog = lock_key.clone();
let lock_token_for_watchdog = lock_token.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(HEARTBEAT_INTERVAL_MS));
let mut interval =
tokio::time::interval(Duration::from_millis(HEARTBEAT_INTERVAL_MS));
loop {
tokio::select! {
_ = cancel_token.cancelled() => break,

View File

@ -2,7 +2,11 @@ use crate::error::RoomError;
use crate::service::RoomService;
use crate::ws_context::WsUserContext;
use chrono::Utc;
use models::rooms::{room, room_ai, room_attachment, room_category, room_message, room_message_edit_history, room_message_reaction, room_notifications, room_pin, room_thread, room_access, room_user_state};
use models::rooms::{
room, room_access, room_ai, room_attachment, room_category, room_message,
room_message_edit_history, room_message_reaction, room_notifications, room_pin, room_thread,
room_user_state,
};
use queue::ProjectRoomEvent;
use sea_orm::*;
use uuid::Uuid;
@ -25,7 +29,9 @@ impl RoomService {
.await?
.ok_or_else(|| RoomError::NotFound("Room category not found".to_string()))?;
if category.project != project.id {
return Err(RoomError::BadRequest("category does not belong to this project".to_string()));
return Err(RoomError::BadRequest(
"category does not belong to this project".to_string(),
));
}
}
@ -40,7 +46,9 @@ impl RoomService {
created_by: Set(user_id),
created_at: Set(Utc::now()),
last_msg_at: Set(Utc::now()),
}.insert(&txn).await?;
}
.insert(&txn)
.await?;
// Create room_user_state for creator
room_user_state::ActiveModel {
@ -51,7 +59,9 @@ impl RoomService {
dnd_start_hour: Set(None),
dnd_end_hour: Set(None),
joined_at: Set(Some(Utc::now())),
}.insert(&txn).await?;
}
.insert(&txn)
.await?;
// For private rooms, creator is auto-granted access
if !request.public {
@ -60,7 +70,9 @@ impl RoomService {
user: Set(user_id),
granted_by: Set(user_id),
granted_at: Set(Utc::now()),
}.insert(&txn).await?;
}
.insert(&txn)
.await?;
}
txn.commit().await?;
@ -76,7 +88,10 @@ impl RoomService {
seq: None,
timestamp: Utc::now(),
};
let _ = self.queue.publish_project_room_event(project.id, event).await;
let _ = self
.queue
.publish_project_room_event(project.id, event)
.await;
self.notify_project_members(
project.id,
@ -109,7 +124,9 @@ impl RoomService {
.await?
.ok_or_else(|| RoomError::NotFound("Room category not found".to_string()))?;
if category.project != room_model.project {
return Err(RoomError::BadRequest("category does not belong to this project".to_string()));
return Err(RoomError::BadRequest(
"category does not belong to this project".to_string(),
));
}
}
@ -140,7 +157,10 @@ impl RoomService {
seq: None,
timestamp: Utc::now(),
};
let _ = self.queue.publish_project_room_event(updated.project, event).await;
let _ = self
.queue
.publish_project_room_event(updated.project, event)
.await;
}
if moved {
let event = ProjectRoomEvent {
@ -152,7 +172,10 @@ impl RoomService {
seq: None,
timestamp: Utc::now(),
};
let _ = self.queue.publish_project_room_event(updated.project, event).await;
let _ = self
.queue
.publish_project_room_event(updated.project, event)
.await;
}
let version = self.increment_room_version(room_id).await?;
@ -169,14 +192,38 @@ impl RoomService {
let txn = self.db.begin().await?;
room_attachment::Entity::delete_many().filter(room_attachment::Column::Room.eq(room_id)).exec(&txn).await?;
room_message::Entity::delete_many().filter(room_message::Column::Room.eq(room_id)).exec(&txn).await?;
room_pin::Entity::delete_many().filter(room_pin::Column::Room.eq(room_id)).exec(&txn).await?;
room_thread::Entity::delete_many().filter(room_thread::Column::Room.eq(room_id)).exec(&txn).await?;
room_access::Entity::delete_many().filter(room_access::Column::Room.eq(room_id)).exec(&txn).await?;
room_user_state::Entity::delete_many().filter(room_user_state::Column::Room.eq(room_id)).exec(&txn).await?;
room_ai::Entity::delete_many().filter(room_ai::Column::Room.eq(room_id)).exec(&txn).await?;
room_message_reaction::Entity::delete_many().filter(room_message_reaction::Column::Room.eq(room_id)).exec(&txn).await?;
room_attachment::Entity::delete_many()
.filter(room_attachment::Column::Room.eq(room_id))
.exec(&txn)
.await?;
room_message::Entity::delete_many()
.filter(room_message::Column::Room.eq(room_id))
.exec(&txn)
.await?;
room_pin::Entity::delete_many()
.filter(room_pin::Column::Room.eq(room_id))
.exec(&txn)
.await?;
room_thread::Entity::delete_many()
.filter(room_thread::Column::Room.eq(room_id))
.exec(&txn)
.await?;
room_access::Entity::delete_many()
.filter(room_access::Column::Room.eq(room_id))
.exec(&txn)
.await?;
room_user_state::Entity::delete_many()
.filter(room_user_state::Column::Room.eq(room_id))
.exec(&txn)
.await?;
room_ai::Entity::delete_many()
.filter(room_ai::Column::Room.eq(room_id))
.exec(&txn)
.await?;
room_message_reaction::Entity::delete_many()
.filter(room_message_reaction::Column::Room.eq(room_id))
.exec(&txn)
.await?;
let subquery = room_message::Entity::find()
.filter(room_message::Column::Room.eq(room_id))
@ -188,7 +235,10 @@ impl RoomService {
.exec(&txn)
.await?;
room_notifications::Entity::delete_many().filter(room_notifications::Column::Room.eq(room_id)).exec(&txn).await?;
room_notifications::Entity::delete_many()
.filter(room_notifications::Column::Room.eq(room_id))
.exec(&txn)
.await?;
room::Entity::delete_by_id(room_id).exec(&txn).await?;
txn.commit().await?;
@ -212,7 +262,10 @@ impl RoomService {
seq: None,
timestamp: Utc::now(),
};
let _ = self.queue.publish_project_room_event(project_id, event).await;
let _ = self
.queue
.publish_project_room_event(project_id, event)
.await;
self.notify_project_members(
project_id,
@ -225,4 +278,4 @@ impl RoomService {
observability::incr!(observability::ROOMS_DELETED_TOTAL);
Ok(())
}
}
}

View File

@ -1,6 +1,6 @@
use crate::RoomMessageSearchRequest;
use crate::error::RoomError;
use crate::service::RoomService;
use crate::RoomMessageSearchRequest;
use crate::ws_context::WsUserContext;
use models::rooms::room_message;
use models::{DateTimeUtc, MessageId, RoomId, RoomThreadId, Seq, UserId};
@ -8,27 +8,60 @@ use sea_orm::*;
use uuid::Uuid;
impl RoomService {
pub async fn room_message_search(&self, room_id: Uuid, request: RoomMessageSearchRequest, ctx: &WsUserContext) -> Result<super::reaction::MessageSearchResponse, RoomError> {
pub async fn room_message_search(
&self,
room_id: Uuid,
request: RoomMessageSearchRequest,
ctx: &WsUserContext,
) -> Result<super::reaction::MessageSearchResponse, RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
if request.q.trim().is_empty() {
return Ok(super::reaction::MessageSearchResponse { messages: Vec::new(), total: 0 });
return Ok(super::reaction::MessageSearchResponse {
messages: Vec::new(),
total: 0,
});
}
let limit = std::cmp::min(request.limit.unwrap_or(20), 100);
let offset = request.offset.unwrap_or(0);
let mut conditions = vec!["room = $1".to_string(), "content_tsv @@ plainto_tsquery('simple', $2)".to_string(), "revoked IS NULL".to_string()];
let mut conditions = vec![
"room = $1".to_string(),
"content_tsv @@ plainto_tsquery('simple', $2)".to_string(),
"revoked IS NULL".to_string(),
];
let mut param_index = 3;
let mut params: Vec<sea_orm::Value> = vec![room_id.into(), request.q.trim().into()];
if let Some(st) = request.start_time { conditions.push(format!("send_at >= ${}", param_index)); params.push(st.into()); param_index += 1; }
if let Some(et) = request.end_time { conditions.push(format!("send_at <= ${}", param_index)); params.push(et.into()); param_index += 1; }
if let Some(sid) = request.sender_id { conditions.push(format!("sender_id = ${}", param_index)); params.push(sid.into()); param_index += 1; }
if let Some(ref ct) = request.content_type { conditions.push(format!("content_type = ${}", param_index)); params.push(ct.clone().into()); param_index += 1; }
if let Some(st) = request.start_time {
conditions.push(format!("send_at >= ${}", param_index));
params.push(st.into());
param_index += 1;
}
if let Some(et) = request.end_time {
conditions.push(format!("send_at <= ${}", param_index));
params.push(et.into());
param_index += 1;
}
if let Some(sid) = request.sender_id {
conditions.push(format!("sender_id = ${}", param_index));
params.push(sid.into());
param_index += 1;
}
if let Some(ref ct) = request.content_type {
conditions.push(format!("content_type = ${}", param_index));
params.push(ct.clone().into());
param_index += 1;
}
let where_clause = conditions.join(" AND ");
let sql = format!(r#"SELECT id, seq, room, sender_type, sender_id, thread, in_reply_to, content, content_type, edited_at, send_at, revoked, revoked_by, ts_headline('simple', content, plainto_tsquery('simple', $2), 'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=15') AS highlighted_content FROM room_message WHERE {} ORDER BY send_at DESC LIMIT ${} OFFSET ${}"#, where_clause, param_index, param_index + 1);
let sql = format!(
r#"SELECT id, seq, room, sender_type, sender_id, thread, in_reply_to, content, content_type, edited_at, send_at, revoked, revoked_by, ts_headline('simple', content, plainto_tsquery('simple', $2), 'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=15') AS highlighted_content FROM room_message WHERE {} ORDER BY send_at DESC LIMIT ${} OFFSET ${}"#,
where_clause,
param_index,
param_index + 1
);
params.push(limit.into());
params.push(offset.into());
@ -40,46 +73,147 @@ impl RoomService {
for row in rows {
let sender_type_str = row.try_get::<String>("", "sender_type").unwrap_or_default();
let sender_type = match sender_type_str.as_str() {
"user" => models::rooms::MessageSenderType::User, "ai" => models::rooms::MessageSenderType::Ai, "system" => models::rooms::MessageSenderType::System, "tool" => models::rooms::MessageSenderType::Tool, "webhook" => models::rooms::MessageSenderType::Webhook, _ => models::rooms::MessageSenderType::User,
"user" => models::rooms::MessageSenderType::User,
"ai" => models::rooms::MessageSenderType::Ai,
"system" => models::rooms::MessageSenderType::System,
"tool" => models::rooms::MessageSenderType::Tool,
"webhook" => models::rooms::MessageSenderType::Webhook,
_ => models::rooms::MessageSenderType::User,
};
let content_type_str = row.try_get::<String>("", "content_type").unwrap_or_default();
let content_type_str = row
.try_get::<String>("", "content_type")
.unwrap_or_default();
let content_type = match content_type_str.as_str() {
"image" => models::rooms::MessageContentType::Image, "audio" => models::rooms::MessageContentType::Audio, "video" => models::rooms::MessageContentType::Video, "file" => models::rooms::MessageContentType::File, _ => models::rooms::MessageContentType::Text,
"image" => models::rooms::MessageContentType::Image,
"audio" => models::rooms::MessageContentType::Audio,
"video" => models::rooms::MessageContentType::Video,
"file" => models::rooms::MessageContentType::File,
_ => models::rooms::MessageContentType::Text,
};
let msg = room_message::Model {
id: row.try_get::<MessageId>("", "id").unwrap_or_default(),
seq: row.try_get::<Seq>("", "seq").unwrap_or_default(),
room: row.try_get::<RoomId>("", "room").unwrap_or_default(),
sender_type,
sender_id: row
.try_get::<Option<UserId>>("", "sender_id")
.ok()
.flatten(),
model_id: row.try_get::<Option<Uuid>>("", "model_id").ok().flatten(),
thread: row
.try_get::<Option<RoomThreadId>>("", "thread")
.ok()
.flatten(),
in_reply_to: row
.try_get::<Option<MessageId>>("", "in_reply_to")
.ok()
.flatten(),
content: row.try_get::<String>("", "content").unwrap_or_default(),
content_type,
thinking_content: None,
edited_at: row
.try_get::<Option<DateTimeUtc>>("", "edited_at")
.ok()
.flatten(),
send_at: row
.try_get::<DateTimeUtc>("", "send_at")
.unwrap_or_default(),
revoked: row
.try_get::<Option<DateTimeUtc>>("", "revoked")
.ok()
.flatten(),
revoked_by: row
.try_get::<Option<UserId>>("", "revoked_by")
.ok()
.flatten(),
content_tsv: None,
};
let msg = room_message::Model { id: row.try_get::<MessageId>("", "id").unwrap_or_default(), seq: row.try_get::<Seq>("", "seq").unwrap_or_default(), room: row.try_get::<RoomId>("", "room").unwrap_or_default(), sender_type, sender_id: row.try_get::<Option<UserId>>("", "sender_id").ok().flatten(), model_id: row.try_get::<Option<Uuid>>("", "model_id").ok().flatten(), thread: row.try_get::<Option<RoomThreadId>>("", "thread").ok().flatten(), in_reply_to: row.try_get::<Option<MessageId>>("", "in_reply_to").ok().flatten(), content: row.try_get::<String>("", "content").unwrap_or_default(), content_type, thinking_content: None, edited_at: row.try_get::<Option<DateTimeUtc>>("", "edited_at").ok().flatten(), send_at: row.try_get::<DateTimeUtc>("", "send_at").unwrap_or_default(), revoked: row.try_get::<Option<DateTimeUtc>>("", "revoked").ok().flatten(), revoked_by: row.try_get::<Option<UserId>>("", "revoked_by").ok().flatten(), content_tsv: None };
let highlighted_content = row.try_get::<String>("", "highlighted_content").unwrap_or_else(|_| msg.content.clone());
let highlighted_content = row
.try_get::<String>("", "highlighted_content")
.unwrap_or_else(|_| msg.content.clone());
let mut msg_with_name = self.resolve_display_name(msg.clone(), room_id).await;
msg_with_name.highlighted_content = Some(highlighted_content);
results.push(msg_with_name);
}
let mut count_conditions = vec!["room = $1".to_string(), "content_tsv @@ plainto_tsquery('simple', $2)".to_string(), "revoked IS NULL".to_string()];
let mut count_conditions = vec![
"room = $1".to_string(),
"content_tsv @@ plainto_tsquery('simple', $2)".to_string(),
"revoked IS NULL".to_string(),
];
let mut count_params: Vec<sea_orm::Value> = vec![room_id.into(), request.q.trim().into()];
let mut count_param_idx = 3;
if let Some(st) = request.start_time { count_conditions.push(format!("send_at >= ${}", count_param_idx)); count_params.push(st.into()); count_param_idx += 1; }
if let Some(et) = request.end_time { count_conditions.push(format!("send_at <= ${}", count_param_idx)); count_params.push(et.into()); count_param_idx += 1; }
if let Some(sid) = request.sender_id { count_conditions.push(format!("sender_id = ${}", count_param_idx)); count_params.push(sid.into()); count_param_idx += 1; }
if let Some(ref ct) = request.content_type { count_conditions.push(format!("content_type = ${}", count_param_idx)); count_params.push(ct.clone().into()); }
if let Some(st) = request.start_time {
count_conditions.push(format!("send_at >= ${}", count_param_idx));
count_params.push(st.into());
count_param_idx += 1;
}
if let Some(et) = request.end_time {
count_conditions.push(format!("send_at <= ${}", count_param_idx));
count_params.push(et.into());
count_param_idx += 1;
}
if let Some(sid) = request.sender_id {
count_conditions.push(format!("sender_id = ${}", count_param_idx));
count_params.push(sid.into());
count_param_idx += 1;
}
if let Some(ref ct) = request.content_type {
count_conditions.push(format!("content_type = ${}", count_param_idx));
count_params.push(ct.clone().into());
}
let count_sql = format!("SELECT COUNT(*) AS count FROM room_message WHERE {}", count_conditions.join(" AND "));
let count_stmt = Statement::from_sql_and_values(DbBackend::Postgres, &count_sql, count_params);
let total: i64 = self.db.query_one_raw(count_stmt).await?.and_then(|r| r.try_get::<i64>("", "count").ok()).unwrap_or(0);
let count_sql = format!(
"SELECT COUNT(*) AS count FROM room_message WHERE {}",
count_conditions.join(" AND ")
);
let count_stmt =
Statement::from_sql_and_values(DbBackend::Postgres, &count_sql, count_params);
let total: i64 = self
.db
.query_one_raw(count_stmt)
.await?
.and_then(|r| r.try_get::<i64>("", "count").ok())
.unwrap_or(0);
Ok(super::reaction::MessageSearchResponse { messages: results, total })
Ok(super::reaction::MessageSearchResponse {
messages: results,
total,
})
}
pub async fn room_message_edit_history(&self, room_id: Uuid, message_id: Uuid, ctx: &WsUserContext) -> Result<super::MessageEditHistoryResponse, RoomError> {
pub async fn room_message_edit_history(
&self,
room_id: Uuid,
message_id: Uuid,
ctx: &WsUserContext,
) -> Result<super::MessageEditHistoryResponse, RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
let _msg = room_message::Entity::find_by_id(message_id).one(&self.db).await?
let _msg = room_message::Entity::find_by_id(message_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Message not found".to_string()))?;
let history = models::rooms::room_message_edit_history::Entity::find()
.filter(models::rooms::room_message_edit_history::Column::Message.eq(message_id))
.order_by_asc(models::rooms::room_message_edit_history::Column::EditedAt).all(&self.db).await?;
.order_by_asc(models::rooms::room_message_edit_history::Column::EditedAt)
.all(&self.db)
.await?;
let total_edits = history.len() as i64;
let entries: Vec<super::MessageEditHistoryEntry> = history.into_iter().map(|h| super::MessageEditHistoryEntry { old_content: h.old_content, new_content: h.new_content, edited_at: h.edited_at }).collect();
Ok(super::MessageEditHistoryResponse { message_id, history: entries, total_edits })
let entries: Vec<super::MessageEditHistoryEntry> = history
.into_iter()
.map(|h| super::MessageEditHistoryEntry {
old_content: h.old_content,
new_content: h.new_content,
edited_at: h.edited_at,
})
.collect();
Ok(super::MessageEditHistoryResponse {
message_id,
history: entries,
total_edits,
})
}
}

View File

@ -7,15 +7,28 @@ use sea_orm::*;
use uuid::Uuid;
impl RoomService {
pub async fn room_message_reaction_list(&self, room_id: Uuid, message_id: Uuid, ctx: &WsUserContext) -> Result<super::reaction::MessageReactionsResponse, RoomError> {
pub async fn room_message_reaction_list(
&self,
room_id: Uuid,
message_id: Uuid,
ctx: &WsUserContext,
) -> Result<super::reaction::MessageReactionsResponse, RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
let _msg = room_message::Entity::find_by_id(message_id).one(&self.db).await?
let _msg = room_message::Entity::find_by_id(message_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Message not found".to_string()))?;
self.get_message_reactions(message_id, Some(user_id)).await
}
pub async fn room_message_reaction_toggle(&self, room_id: Uuid, message_id: Uuid, emoji: String, ctx: &WsUserContext) -> Result<super::reaction::MessageReactionsResponse, RoomError> {
pub async fn room_message_reaction_toggle(
&self,
room_id: Uuid,
message_id: Uuid,
emoji: String,
ctx: &WsUserContext,
) -> Result<super::reaction::MessageReactionsResponse, RoomError> {
let user_id = ctx.user_id;
self.require_room_access(room_id, user_id).await?;
if emoji.is_empty() || emoji.len() > 50 {
@ -27,15 +40,24 @@ impl RoomService {
.filter(room_message_reaction::Column::Message.eq(message_id))
.filter(room_message_reaction::Column::User.eq(user_id))
.filter(room_message_reaction::Column::Emoji.eq(&emoji))
.one(&self.db).await?
.one(&self.db)
.await?
{
room_message_reaction::Entity::delete_by_id(existing.id).exec(&self.db).await?;
room_message_reaction::Entity::delete_by_id(existing.id)
.exec(&self.db)
.await?;
} else {
room_message_reaction::ActiveModel {
id: Set(Uuid::now_v7()), room: Set(room_id), message: Set(message_id),
user: Set(user_id), emoji: Set(emoji), created_at: Set(Utc::now()),
}.insert(&self.db).await?;
id: Set(Uuid::now_v7()),
room: Set(room_id),
message: Set(message_id),
user: Set(user_id),
emoji: Set(emoji),
created_at: Set(Utc::now()),
}
.insert(&self.db)
.await?;
}
self.get_message_reactions(message_id, Some(user_id)).await
}
}
}

View File

@ -1,7 +1,7 @@
use db::database::AppDatabase;
use models::projects::MemberRole;
use models::projects::project_members;
use models::rooms::{room, room_access, room_user_state};
use models::projects::MemberRole;
use sea_orm::*;
use uuid::Uuid;
@ -65,11 +65,7 @@ pub async fn check_project_member(
}
/// Check if user is a project admin (Owner or Admin role).
pub async fn is_project_admin(
db: &AppDatabase,
project_id: Uuid,
user_id: Uuid,
) -> bool {
pub async fn is_project_admin(db: &AppDatabase, project_id: Uuid, user_id: Uuid) -> bool {
let member = project_members::Entity::find()
.filter(project_members::Column::Project.eq(project_id))
.filter(project_members::Column::User.eq(user_id))
@ -98,11 +94,7 @@ pub async fn require_project_admin(
}
/// Check if user can admin a room (project admin OR room creator).
pub async fn is_room_admin(
db: &AppDatabase,
room_id: Uuid,
user_id: Uuid,
) -> bool {
pub async fn is_room_admin(db: &AppDatabase, room_id: Uuid, user_id: Uuid) -> bool {
let room_model = room::Entity::find_by_id(room_id)
.one(db)
.await
@ -176,4 +168,4 @@ pub async fn find_room_or_404(db: &AppDatabase, room_id: Uuid) -> Result<room::M
.one(db)
.await?
.ok_or_else(|| RoomError::NotFound("Room not found".to_string()))
}
}

View File

@ -1,9 +1,9 @@
use crate::error::RoomError;
use crate::service::RoomService;
use crate::ws_context::WsUserContext;
use crate::RoomEventType;
use crate::RoomUserStateResponse;
use crate::RoomUserStateUpdateDndRequest;
use crate::error::RoomError;
use crate::service::RoomService;
use crate::ws_context::WsUserContext;
use chrono::Utc;
use db::database::AppDatabase;
use models::rooms::{room_access, room_user_state};
@ -72,7 +72,9 @@ impl RoomService {
// Ensure user state exists (for last_read_seq tracking etc.)
let _ = crate::service::access::get_or_create_room_user_state(
&self.db, room_id, target_user_id,
&self.db,
room_id,
target_user_id,
)
.await;
@ -89,7 +91,9 @@ impl RoomService {
self.require_room_admin(room_id, ctx.user_id).await?;
if target_user_id == ctx.user_id {
return Err(RoomError::BadRequest("cannot revoke your own access".to_string()));
return Err(RoomError::BadRequest(
"cannot revoke your own access".to_string(),
));
}
revoke_room_access(&self.db, room_id, target_user_id).await?;
@ -105,10 +109,9 @@ impl RoomService {
) -> Result<RoomUserStateResponse, RoomError> {
self.require_room_access(room_id, ctx.user_id).await?;
let state = crate::service::access::get_or_create_room_user_state(
&self.db, room_id, ctx.user_id,
)
.await?;
let state =
crate::service::access::get_or_create_room_user_state(&self.db, room_id, ctx.user_id)
.await?;
let mut active: room_user_state::ActiveModel = state.into();
active.last_read_seq = Set(Some(last_read_seq));
@ -145,10 +148,9 @@ impl RoomService {
) -> Result<RoomUserStateResponse, RoomError> {
self.require_room_access(room_id, ctx.user_id).await?;
let state = crate::service::access::get_or_create_room_user_state(
&self.db, room_id, ctx.user_id,
)
.await?;
let state =
crate::service::access::get_or_create_room_user_state(&self.db, room_id, ctx.user_id)
.await?;
let mut active: room_user_state::ActiveModel = state.into();
if let Some(dnd) = request.do_not_disturb {
@ -178,4 +180,4 @@ impl RoomService {
joined_at: updated.joined_at,
})
}
}
}

View File

@ -2,10 +2,10 @@ use chrono::Utc;
use db::database::AppDatabase;
use models::rooms::{room_ai, room_message};
use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope};
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set};
use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr};
use uuid::Uuid;
use super::ai_mode_streaming_steps::{lock_or_recover, ModeStreamingState};
use super::ai_mode_streaming_steps::{ModeStreamingState, lock_or_recover};
use crate::connection::RoomConnectionManager;
use agent::chat::normalize_thinking_content;

View File

@ -5,7 +5,7 @@ use db::cache::AppCache;
use db::database::AppDatabase;
use models::rooms::room_ai;
use queue::MessageProducer;
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter};
use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, sea_query::Expr};
use uuid::Uuid;
use super::ai_common::create_and_publish_ai_message;

View File

@ -5,7 +5,7 @@ use db::cache::AppCache;
use db::database::AppDatabase;
use models::rooms::room_ai;
use queue::MessageProducer;
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter};
use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, sea_query::Expr};
use uuid::Uuid;
use crate::connection::RoomConnectionManager;
@ -32,8 +32,11 @@ pub async fn process_message_ai_react_nonstreaming(
let final_answer = chat_service
.process_react_room(
&request, |_step| async move {},
room_tools, Some(&room_preamble), Some(queue.clone()),
&request,
|_step| async move {},
room_tools,
Some(&room_preamble),
Some(queue.clone()),
)
.await;

View File

@ -58,7 +58,9 @@ pub async fn process_message_ai_react_streaming(
action: "start".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id, typing_start.clone()).await;
room_manager
.broadcast_typing(room_id, typing_start.clone())
.await;
let (typing_cancel_tx, mut typing_cancel_rx) = tokio::sync::oneshot::channel::<()>();
let typing_renew_handle = tokio::spawn({
@ -95,9 +97,15 @@ pub async fn process_message_ai_react_streaming(
true, // suppress_answer_broadcast: room mode — AI must use send_message
);
let result = chat_service.process_react_room(
&request, callback, room_tools, Some(&room_preamble), Some(queue.clone()),
).await;
let result = chat_service
.process_react_room(
&request,
callback,
room_tools,
Some(&room_preamble),
Some(queue.clone()),
)
.await;
// In room mode, suppress final answer posting — AI communicates via send_message tool.
finalize_react_stream(

View File

@ -2,10 +2,10 @@ use chrono::Utc;
use db::database::AppDatabase;
use models::rooms::{room_ai, room_message};
use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope};
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set};
use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr};
use uuid::Uuid;
use super::ai_react_streaming_steps::{lock_or_recover, ReactStreamingState};
use super::ai_react_streaming_steps::{ReactStreamingState, lock_or_recover};
use super::sequence::next_room_message_seq_internal;
use crate::connection::RoomConnectionManager;
use agent::chat::normalize_thinking_content;
@ -32,7 +32,9 @@ pub(crate) async fn finalize_react_stream(
let final_event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id,
seq: state.chunk_seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
seq: state
.chunk_seq
.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
content: final_stream_content.clone(),
done: true,
error: None,
@ -51,9 +53,9 @@ pub(crate) async fn finalize_react_stream(
// doesn't cross .await points (MutexGuard is not Send).
let used_send_message = {
let steps = lock_or_recover(&state.steps);
steps.iter().any(|(t, c)| {
t == "tool_call" && c.contains("\"name\":\"send_message\"")
})
steps
.iter()
.any(|(t, c)| t == "tool_call" && c.contains("\"name\":\"send_message\""))
};
if !used_send_message && !final_stream_content.trim().is_empty() {
@ -112,22 +114,27 @@ pub(crate) async fn finalize_react_stream(
if let Err(e) = queue.publish(room_id, envelope).await {
tracing::error!(error = %e, "Failed to publish auto-send room message");
} else {
room_manager.broadcast(room_id, queue::RoomMessageEvent {
id: msg_id,
room_id,
sender_type: "ai".to_string(),
sender_id: Some(model_id),
thread_id: None,
content: final_stream_content.clone(),
content_type: "text".to_string(),
thinking_content: None,
send_at: now,
seq: msg_seq,
in_reply_to: None,
display_name: Some(ai_display_name.to_string()),
reactions: None,
message_id: None,
}).await;
room_manager
.broadcast(
room_id,
queue::RoomMessageEvent {
id: msg_id,
room_id,
sender_type: "ai".to_string(),
sender_id: Some(model_id),
thread_id: None,
content: final_stream_content.clone(),
content_type: "text".to_string(),
thinking_content: None,
send_at: now,
seq: msg_seq,
in_reply_to: None,
display_name: Some(ai_display_name.to_string()),
reactions: None,
message_id: None,
},
)
.await;
room_manager.metrics.messages_sent.increment(1);
let project_event = ProjectRoomEvent {
@ -139,7 +146,9 @@ pub(crate) async fn finalize_react_stream(
seq: Some(msg_seq),
timestamp: now,
};
queue.publish_project_room_event(project_id, project_event).await;
queue
.publish_project_room_event(project_id, project_event)
.await;
}
}
@ -147,7 +156,10 @@ pub(crate) async fn finalize_react_stream(
if result.is_ok() {
let now = chrono::Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1))
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id))
.filter(room_ai::Column::Model.eq(model_id))
@ -272,7 +284,10 @@ pub(crate) async fn finalize_react_stream(
} else {
let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1))
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id))
.filter(room_ai::Column::Model.eq(model_id))

View File

@ -7,7 +7,9 @@ use crate::connection::RoomConnectionManager;
use agent::react::ReactStep;
pub(crate) fn lock_or_recover<T>(mutex: &std::sync::Mutex<T>) -> std::sync::MutexGuard<'_, T> {
mutex.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
mutex
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
pub(crate) struct ReactStreamingState {

View File

@ -58,10 +58,8 @@ impl RoomAiService {
return Ok(false);
}
let model_ids: std::collections::HashSet<String> = ai_configs
.iter()
.map(|c| c.model.to_string())
.collect();
let model_ids: std::collections::HashSet<String> =
ai_configs.iter().map(|c| c.model.to_string()).collect();
for cap in mention_bracket_re().captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
@ -147,9 +145,11 @@ impl RoomAiService {
content.hash(&mut hasher);
let idemp_key = format!("ai:idempot:{}:{}", room_id, hasher.finish());
{
let mut conn = self.cache.conn().await.map_err(|e| {
RoomError::Internal(format!("cache conn: {}", e))
})?;
let mut conn = self
.cache
.conn()
.await
.map_err(|e| RoomError::Internal(format!("cache conn: {}", e)))?;
let exists = redis::cmd("SET")
.arg(&idemp_key)
.arg("1")
@ -199,15 +199,15 @@ impl RoomAiService {
let user_names = history::get_user_names(&self.db, &user_ids).await;
let mentions =
history::extract_mention_context(&self.db, room.project, content).await;
let mentions = history::extract_mention_context(&self.db, room.project, content).await;
let context_setting = models::projects::project_context_setting::Entity::find_by_id(project.id)
.one(&self.db)
.await
.map_err(|_| ())
.ok()
.and_then(|x| x);
let context_setting =
models::projects::project_context_setting::Entity::find_by_id(project.id)
.one(&self.db)
.await
.map_err(|_| ())
.ok()
.and_then(|x| x);
// Build room-only tool registry (send_message, retract_message)
let mut room_tools = ToolRegistry::new();
@ -221,35 +221,31 @@ impl RoomAiService {
.await
.ok()
.flatten()
.map(|m| m.scope_role().map(|r| r.to_string()).unwrap_or_else(|_| "guest".into()))
.map(|m| {
m.scope_role()
.map(|r| r.to_string())
.unwrap_or_else(|_| "guest".into())
})
.unwrap_or_else(|| "guest".into());
// Build room preamble: room identity, sender info, permissions, history
let room_preamble = build_room_preamble(
&room,
&project,
&model,
&sender,
&sender_role,
&history_messages,
&user_names,
);
let max_tokens = ai_config.max_tokens.unwrap_or(4096) as i32;
let request = AiRequest {
let mut request = AiRequest {
db: self.db.clone(),
cache: self.cache.clone(),
config: self.config.clone(),
model,
model: model.clone(),
project: project.clone(),
context_setting,
sender,
sender: sender.clone(),
room: room.clone(),
input: content.to_string(),
mention: mentions,
history: history_messages,
history_cutoff_seq: None,
user_names,
temperature: ai_config.temperature.unwrap_or(0.7),
max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32,
max_tokens,
top_p: 1.0,
frequency_penalty: 0.0,
presence_penalty: 0.0,
@ -258,37 +254,73 @@ impl RoomAiService {
max_tool_depth: 1000,
};
let (optimized_history, cutoff_seq) = chat_service
.build_room_optimized_context_text(&request)
.await
.unwrap_or_else(|e| {
tracing::warn!(error = %e, "room optimized context failed; using recent history");
(String::new(), None)
});
request.history_cutoff_seq = cutoff_seq;
// Build room preamble: room identity, sender info, permissions, optimized history
let room_preamble = build_room_preamble(
&room,
&project,
&model,
&sender,
&sender_role,
&optimized_history,
);
// Pre-flight balance check: verify the project + user can afford at least a minimal AI call
let balance_ok = agent::billing::check_balance(
&self.db, project.id, sender_id, model_id, 500, 250,
).await;
let balance_ok =
agent::billing::check_balance(&self.db, project.id, sender_id, model_id, 500, 250)
.await;
match balance_ok {
Ok(true) => {},
Ok(true) => {}
Ok(false) => {
tracing::warn!(project_id = %project.id, user_id = %sender_id, "Insufficient balance for AI call");
// Persist the billing error
let _ = agent::billing::persist_billing_error(
&self.db, "project", project.id, "insufficient_balance",
&format!("Insufficient balance. Project {} and user {} have no remaining funds.", project.id, sender_id),
&self.db,
"project",
project.id,
"insufficient_balance",
&format!(
"Insufficient balance. Project {} and user {} have no remaining funds.",
project.id, sender_id
),
Some(serde_json::json!({
"user_id": sender_id.to_string(),
"model_id": model_id.to_string(),
"project_id": project.id.to_string(),
})),
).await;
)
.await;
// Send the billing error as a visible message in the room
let error_content = format!("⚠️ Billing Error: Insufficient balance. Your project and personal account do not have enough funds to process this AI request. Please add credits to continue using AI features.");
let error_content = format!(
"⚠️ Billing Error: Insufficient balance. Your project and personal account do not have enough funds to process this AI request. Please add credits to continue using AI features."
);
let _ = super::ai_common::create_and_publish_ai_message(
&self.db, &self.cache, &self.queue, &self.room_manager,
room_id, project.id, Uuid::nil(), error_content,
model_id, Some("System".to_string()),
).await;
&self.db,
&self.cache,
&self.queue,
&self.room_manager,
room_id,
project.id,
Uuid::nil(),
error_content,
model_id,
Some("System".to_string()),
)
.await;
return Ok(());
},
}
Err(e) => {
tracing::warn!(error = %e, "Balance check failed, proceeding without pre-flight check");
}
@ -299,18 +331,36 @@ impl RoomAiService {
// Dispatch to ReAct streaming or nonstreaming with room tools and preamble
if use_streaming {
ai_react_streaming::process_message_ai_react_streaming(
chat_service, request, room_id, room.project, model_id,
lock_guard, self.db.clone(), self.cache.clone(),
self.queue.clone(), self.room_manager.clone(),
room_tools, room_preamble,
).await;
chat_service,
request,
room_id,
room.project,
model_id,
lock_guard,
self.db.clone(),
self.cache.clone(),
self.queue.clone(),
self.room_manager.clone(),
room_tools,
room_preamble,
)
.await;
} else {
ai_react_nonstreaming::process_message_ai_react_nonstreaming(
chat_service, request, room_id, room.project, model_id,
lock_guard, self.db.clone(), self.cache.clone(),
self.queue.clone(), self.room_manager.clone(),
room_tools, room_preamble,
).await;
chat_service,
request,
room_id,
room.project,
model_id,
lock_guard,
self.db.clone(),
self.cache.clone(),
self.queue.clone(),
self.room_manager.clone(),
room_tools,
room_preamble,
)
.await;
}
Ok(())
@ -328,8 +378,7 @@ fn build_room_preamble(
model: &models::agents::model::Model,
sender: &models::users::user::Model,
sender_role: &str,
history: &[models::rooms::room_message::Model],
user_names: &std::collections::HashMap<Uuid, String>,
optimized_history: &str,
) -> String {
let mut preamble = String::new();
@ -351,9 +400,7 @@ fn build_room_preamble(
- **Name:** {}\n\
- **Model ID:** `{}`\n\
You are an AI assistant in this room. When referring to yourself, use your name **{}**.\n",
model.name,
model.id,
model.name,
model.name, model.id, model.name,
));
// Sender info and permissions
@ -361,29 +408,16 @@ fn build_room_preamble(
"\n### Who Mentioned You\n\
- **User:** {} (ID: `{}`)\n\
- **Project Role:** {}\n",
sender.username,
sender.uid,
sender_role,
sender.username, sender.uid, sender_role,
));
if let Some(ref display_name) = sender.display_name {
preamble.push_str(&format!("- **Display Name:** {}\n", display_name));
}
// Recent history (sliding window)
if !history.is_empty() {
preamble.push_str(&format!(
"\n### Recent Conversation (last {} messages)\n",
history.len()
));
for msg in history.iter().rev().take(20) {
let author = msg
.sender_id
.and_then(|uid| user_names.get(&uid))
.cloned()
.unwrap_or_else(|| "unknown".into());
let content = msg.content.clone();
preamble.push_str(&format!("- **{}**: {}\n", author, content));
}
if !optimized_history.trim().is_empty() {
preamble.push_str("\n");
preamble.push_str(optimized_history);
preamble.push_str("\n");
}
// Append room communication rules

View File

@ -7,13 +7,13 @@ use db::cache::AppCache;
use db::database::AppDatabase;
use models::rooms::{room_ai, room_message};
use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope};
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set};
use sea_orm::{ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set, sea_query::Expr};
use tokio::sync::Mutex;
use uuid::Uuid;
use super::sequence::next_room_message_seq_internal;
use crate::connection::RoomConnectionManager;
use agent::chat::{normalize_thinking_content, AiChunkType, AiRequest, ChatService};
use agent::chat::{AiChunkType, AiRequest, ChatService, normalize_thinking_content};
pub async fn process_message_ai_streaming(
chat_service: Arc<ChatService>,
@ -121,9 +121,8 @@ pub async fn process_message_ai_streaming(
buf.push_str(&chunk.content);
// Flush on natural boundaries: newlines, or when content buffer grows large.
// Small tokens (< 6 chars without newline) accumulate until boundary.
let should_flush = chunk.content.contains('\n')
|| buf.len() >= 60
|| chunk.done;
let should_flush =
chunk.content.contains('\n') || buf.len() >= 60 || chunk.done;
if should_flush {
let content = std::mem::take(&mut *buf);
buf.clear();
@ -154,7 +153,8 @@ pub async fn process_message_ai_streaming(
};
let thinking_content = {
let mut buf = thinking_buf.lock().await;
let content = normalize_thinking_content(&std::mem::take(&mut *buf));
let content =
normalize_thinking_content(&std::mem::take(&mut *buf));
buf.clear();
content
};
@ -221,7 +221,9 @@ pub async fn process_message_ai_streaming(
action: "start".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_start.clone()).await;
room_manager
.broadcast_typing(room_id_inner, typing_start.clone())
.await;
let (typing_cancel_tx, typing_cancel_rx) = tokio::sync::oneshot::channel::<()>();
let typing_renew_handle = tokio::spawn({
@ -320,10 +322,7 @@ pub async fn process_message_ai_streaming(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(
room_ai::Column::LastCallAt,
Expr::value(Some(now)),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_inner))
.filter(room_ai::Column::Model.eq(model_id))
.exec(&db)
@ -363,7 +362,9 @@ pub async fn process_message_ai_streaming(
action: "stop".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_stop).await;
room_manager
.broadcast_typing(room_id_inner, typing_stop)
.await;
let event = ProjectRoomEvent {
event_type: crate::RoomEventType::NewMessage.as_str().into(),
@ -391,7 +392,9 @@ pub async fn process_message_ai_streaming(
action: "stop".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_stop).await;
room_manager
.broadcast_typing(room_id_inner, typing_stop)
.await;
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,

View File

@ -24,18 +24,22 @@ mod workers_spawn;
pub use patterns::{mention_bracket_re, mention_tag_re, user_mention_re};
pub use access::{check_room_access, check_project_member, require_room_access, require_room_admin, require_project_admin, is_room_admin, is_project_admin, find_room_or_404, get_or_create_room_user_state};
pub use access::{
check_project_member, check_room_access, find_room_or_404, get_or_create_room_user_state,
is_project_admin, is_room_admin, require_project_admin, require_room_access,
require_room_admin,
};
pub use ai_common::create_and_publish_ai_message;
pub use ai_service::RoomAiService;
pub use ai_nonstreaming::process_message_ai_nonstreaming;
pub use ai_react_nonstreaming::process_message_ai_react_nonstreaming;
pub use ai_react_streaming::process_message_ai_react_streaming;
pub use ai_service::RoomAiService;
pub use ai_streaming::process_message_ai_streaming;
pub use history::{get_room_history, get_user_names, get_room_ai_config, extract_mention_context};
pub use history::{extract_mention_context, get_room_ai_config, get_room_history, get_user_names};
pub use mentions::extract_mentions;
pub use notifications::{notify_project_members, publish_room_event};
pub use sequence::next_room_message_seq_internal;
pub use workers::{start_workers, PushNotificationFn};
pub use workers::{PushNotificationFn, start_workers};
pub use workers_spawn::{spawn_agent_task, spawn_room_workers, unmark_room_spawned};
use std::sync::Arc;
@ -49,12 +53,12 @@ use queue::{MessageProducer, ProjectRoomEvent};
use sea_orm::{ColumnTrait, QueryFilter};
use uuid::Uuid;
use crate::connection::{RoomConnectionManager, DedupCache};
use crate::connection::{DedupCache, RoomConnectionManager};
use crate::error::RoomError;
use crate::presence::PresenceStore;
use agent::TaskService;
use agent::chat::ChatService;
use agent::embed::EmbedService;
use agent::TaskService;
use models::agent_task::AgentType;
const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024;
@ -91,8 +95,10 @@ impl RoomService {
push_fn: Option<workers::PushNotificationFn>,
embed_service: Option<Arc<EmbedService>>,
) -> Self {
let dedup_cache: DedupCache =
Arc::new(dashmap::DashMap::with_capacity_and_hasher(10000, Default::default()));
let dedup_cache: DedupCache = Arc::new(dashmap::DashMap::with_capacity_and_hasher(
10000,
Default::default(),
));
let ai_service = RoomAiService::new(
db.clone(),
cache.clone(),
@ -237,20 +243,30 @@ impl RoomService {
if type_m.as_str() == "user" {
let id = id_m.as_str().trim();
if let Ok(uuid) = Uuid::parse_str(id) {
if !resolved.contains(&uuid) { resolved.push(uuid); }
if !resolved.contains(&uuid) {
resolved.push(uuid);
}
} else if let Some(label_m) = cap.get(3) {
let label = label_m.as_str().trim();
if !label.is_empty() {
let label_lower = label.to_lowercase();
if seen_usernames.contains(&label_lower) { continue; }
if seen_usernames.contains(&label_lower) {
continue;
}
seen_usernames.push(label_lower.clone());
if let Some(user) = User::find()
.filter(models::users::user::Column::Username.ilike(&label_lower))
.one(&self.db).await.ok().flatten()
.one(&self.db)
.await
.ok()
.flatten()
{
if !resolved.contains(&user.uid) { resolved.push(user.uid); }
} }
if !resolved.contains(&user.uid) {
resolved.push(user.uid);
}
}
}
}
}
}
@ -300,7 +316,11 @@ impl RoomService {
access::is_room_admin(&self.db, room_id, user_id).await
}
pub async fn require_project_admin(&self, project_id: Uuid, user_id: Uuid) -> Result<(), RoomError> {
pub async fn require_project_admin(
&self,
project_id: Uuid,
user_id: Uuid,
) -> Result<(), RoomError> {
access::require_project_admin(&self.db, project_id, user_id).await
}
@ -322,18 +342,20 @@ impl RoomService {
if event.is_some() {
// Broadcast to project subscribers
if let Some(pid) = project_id {
self.room_manager.broadcast_project(
pid,
queue::ProjectRoomEvent {
event_type: "presence_changed".into(),
project_id: pid,
room_id: None,
category_id: None,
message_id: None,
seq: None,
timestamp: chrono::Utc::now(),
},
).await;
self.room_manager
.broadcast_project(
pid,
queue::ProjectRoomEvent {
event_type: "presence_changed".into(),
project_id: pid,
room_id: None,
category_id: None,
message_id: None,
seq: None,
timestamp: chrono::Utc::now(),
},
)
.await;
}
}
event
@ -347,7 +369,8 @@ impl RoomService {
text: Option<String>,
expires_at: Option<chrono::DateTime<chrono::Utc>>,
) -> Option<crate::presence::CustomStatusChanged> {
self.presence.set_custom_status(user_id, emoji, text, expires_at)
self.presence
.set_custom_status(user_id, emoji, text, expires_at)
}
/// Get all presence entries for a project.
@ -365,20 +388,22 @@ impl RoomService {
if event.is_some() {
// Broadcast to project subscribers
if let Some(pid) = project_id {
self.room_manager.broadcast_project(
pid,
queue::ProjectRoomEvent {
event_type: "presence_changed".into(),
project_id: pid,
room_id: None,
category_id: None,
message_id: None,
seq: None,
timestamp: chrono::Utc::now(),
},
).await;
self.room_manager
.broadcast_project(
pid,
queue::ProjectRoomEvent {
event_type: "presence_changed".into(),
project_id: pid,
room_id: None,
category_id: None,
message_id: None,
seq: None,
timestamp: chrono::Utc::now(),
},
)
.await;
}
}
event
}
}
}

View File

@ -1,10 +1,10 @@
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use uuid::Uuid;
use super::RoomService;
use super::ai_react_nonstreaming;
use super::ai_react_streaming;
use super::history;
use super::RoomService;
use crate::error::RoomError;
use crate::service::{mention_bracket_re, mention_tag_re};
use agent::chat::AiRequest;
@ -79,12 +79,13 @@ impl RoomService {
.await?
.ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?;
let context_setting = models::projects::project_context_setting::Entity::find_by_id(project.id)
.one(&self.db)
.await
.map_err(|_| ())
.ok()
.and_then(|x| x);
let context_setting =
models::projects::project_context_setting::Entity::find_by_id(project.id)
.one(&self.db)
.await
.map_err(|_| ())
.ok()
.and_then(|x| x);
let model = models::agents::model::Entity::find_by_id(model_id)
.one(&self.db)
@ -119,34 +120,31 @@ impl RoomService {
.await
.ok()
.flatten()
.map(|m| m.scope_role().map(|r| r.to_string()).unwrap_or_else(|_| "guest".into()))
.map(|m| {
m.scope_role()
.map(|r| r.to_string())
.unwrap_or_else(|_| "guest".into())
})
.unwrap_or_else(|| "guest".into());
// Build room preamble: room identity, sender info, permissions, history
let room_preamble = build_room_preamble(
&room,
&project,
&sender,
&sender_role,
&history,
&user_names,
);
let max_tokens = ai_config.max_tokens.unwrap_or(4096) as i32;
let request = AiRequest {
let mut request = AiRequest {
db: self.db.clone(),
cache: self.cache.clone(),
config: self.config.clone(),
model,
project: project.clone(),
context_setting,
sender,
sender: sender.clone(),
room: room.clone(),
input: content,
mention: mentions,
history,
history_cutoff_seq: None,
user_names,
temperature: ai_config.temperature.unwrap_or(0.7),
max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32,
max_tokens,
top_p: 1.0,
frequency_penalty: 0.0,
presence_penalty: 0.0,
@ -155,6 +153,19 @@ impl RoomService {
max_tool_depth: 1000,
};
let (optimized_history, cutoff_seq) = chat_service
.build_room_optimized_context_text(&request)
.await
.unwrap_or_else(|e| {
tracing::warn!(error = %e, "room optimized context failed; using recent history");
(String::new(), None)
});
request.history_cutoff_seq = cutoff_seq;
// Build room preamble: room identity, sender info, permissions, optimized history
let room_preamble =
build_room_preamble(&room, &project, &sender, &sender_role, &optimized_history);
let use_streaming = ai_config.stream;
// Dispatch to ReAct streaming or nonstreaming with room tools and preamble
@ -202,8 +213,7 @@ fn build_room_preamble(
project: &models::projects::project::Model,
sender: &models::users::user::Model,
sender_role: &str,
history: &[models::rooms::room_message::Model],
user_names: &std::collections::HashMap<Uuid, String>,
optimized_history: &str,
) -> String {
let mut preamble = String::new();
@ -222,33 +232,16 @@ fn build_room_preamble(
"\n### Who Mentioned You\n\
- **User:** {} (ID: `{}`)\n\
- **Project Role:** {}\n",
sender.username,
sender.uid,
sender_role,
sender.username, sender.uid, sender_role,
));
if let Some(ref display_name) = sender.display_name {
preamble.push_str(&format!("- **Display Name:** {}\n", display_name));
}
// Recent history (sliding window, last 20 messages)
if !history.is_empty() {
preamble.push_str(&format!(
"\n### Recent Conversation (last {} messages)\n",
history.len()
));
for msg in history.iter().rev().take(20) {
let author = msg
.sender_id
.and_then(|uid| user_names.get(&uid))
.cloned()
.unwrap_or_else(|| "unknown".into());
let content = if msg.content.len() > 200 {
format!("{}...", &msg.content[..200])
} else {
msg.content.clone()
};
preamble.push_str(&format!("- **{}**: {}\n", author, content));
}
if !optimized_history.trim().is_empty() {
preamble.push_str("\n");
preamble.push_str(optimized_history);
preamble.push_str("\n");
}
preamble.push_str(ROOM_CONTEXT_PROMPT);

View File

@ -1,9 +1,11 @@
use models::rooms::{
room, room_ai, room_category, room_message, room_notifications, room_pin,
room_thread,
room, room_ai, room_category, room_message, room_notifications, room_pin, room_thread,
};
use crate::{RoomCategoryResponse, RoomResponse, RoomMessageResponse, RoomThreadResponse, RoomPinResponse, RoomAiResponse, NotificationResponse};
use crate::{
NotificationResponse, RoomAiResponse, RoomCategoryResponse, RoomMessageResponse,
RoomPinResponse, RoomResponse, RoomThreadResponse,
};
impl From<room_category::Model> for RoomCategoryResponse {
fn from(value: room_category::Model) -> Self {
@ -135,4 +137,4 @@ impl From<room_notifications::Model> for NotificationResponse {
expires_at: value.expires_at,
}
}
}
}

View File

@ -40,17 +40,26 @@ impl RoomService {
pub(crate) fn parse_message_content_type(
content_type: Option<String>,
) -> Result<models::rooms::MessageContentType, RoomError> {
match content_type.unwrap_or_else(|| "text".to_string()).to_lowercase().as_str() {
match content_type
.unwrap_or_else(|| "text".to_string())
.to_lowercase()
.as_str()
{
"text" => Ok(models::rooms::MessageContentType::Text),
"image" => Ok(models::rooms::MessageContentType::Image),
"audio" => Ok(models::rooms::MessageContentType::Audio),
"video" => Ok(models::rooms::MessageContentType::Video),
"file" => Ok(models::rooms::MessageContentType::File),
_ => Err(RoomError::BadRequest("invalid message content_type".to_string())),
_ => Err(RoomError::BadRequest(
"invalid message content_type".to_string(),
)),
}
}
pub async fn utils_find_project_by_name(&self, name: String) -> Result<project::Model, RoomError> {
pub async fn utils_find_project_by_name(
&self,
name: String,
) -> Result<project::Model, RoomError> {
match project::Entity::find()
.filter(project::Column::Name.eq(name.clone()))
.one(&self.db)
@ -84,7 +93,11 @@ impl RoomService {
.ok_or_else(|| RoomError::NotFound("Project not found".to_string()))
}
pub async fn check_project_access(&self, project_uid: Uuid, user_uid: Uuid) -> Result<(), RoomError> {
pub async fn check_project_access(
&self,
project_uid: Uuid,
user_uid: Uuid,
) -> Result<(), RoomError> {
let project = project::Entity::find_by_id(project_uid)
.one(&self.db)
.await
@ -103,20 +116,27 @@ impl RoomService {
.one(&self.db)
.await?;
if member.is_some() { Ok(()) } else { Err(RoomError::NoPower) }
if member.is_some() {
Ok(())
} else {
Err(RoomError::NoPower)
}
}
pub async fn ensure_room_visible_for_user(
&self, room: &room::Model, user_id: Uuid,
&self,
room: &room::Model,
user_id: Uuid,
) -> Result<(), RoomError> {
self.require_room_access(room.id, user_id).await
}
pub async fn get_room_version(&self, room_id: Uuid) -> Result<i64, RoomError> {
let version_key = format!("room:version:{}", room_id);
let mut conn = self.cache.conn().await.map_err(|e| {
RoomError::Internal(format!("failed to get redis for version: {}", e))
})?;
let mut conn =
self.cache.conn().await.map_err(|e| {
RoomError::Internal(format!("failed to get redis for version: {}", e))
})?;
let version: Option<i64> = redis::cmd("GET")
.arg(&version_key)
.query_async(&mut conn)
@ -130,12 +150,14 @@ impl RoomService {
}
pub async fn raw_increment_room_version(
cache: &db::cache::AppCache, room_id: Uuid,
cache: &db::cache::AppCache,
room_id: Uuid,
) -> Result<i64, RoomError> {
let version_key = format!("room:version:{}", room_id);
let mut conn = cache.conn().await.map_err(|e| {
RoomError::Internal(format!("failed to get redis for version: {}", e))
})?;
let mut conn = cache
.conn()
.await
.map_err(|e| RoomError::Internal(format!("failed to get redis for version: {}", e)))?;
let version: i64 = redis::cmd("INCR")
.arg(&version_key)
.query_async(&mut conn)
@ -143,4 +165,4 @@ impl RoomService {
.map_err(|e| RoomError::Internal(format!("version INCR: {}", e)))?;
Ok(version)
}
}
}

View File

@ -6,9 +6,10 @@ use queue::MessageProducer;
use sea_orm::EntityTrait;
use uuid::Uuid;
use crate::connection::{make_persist_fn, DedupCache, PersistFn, RoomConnectionManager};
use crate::connection::{DedupCache, PersistFn, RoomConnectionManager, make_persist_fn};
pub type PushNotificationFn = Arc<dyn Fn(Uuid, String, Option<String>, Option<String>) + Send + Sync>;
pub type PushNotificationFn =
Arc<dyn Fn(Uuid, String, Option<String>, Option<String>) + Send + Sync>;
/// Start global workers (JetStream persist consumer + cleanup). Room broadcast
/// subscriptions are spawned lazily when the first WS client subscribes to a room,
@ -99,4 +100,4 @@ pub async fn start_workers(
}
tracing::info!("room workers stopped");
Ok(())
}
}

View File

@ -1,9 +1,9 @@
use crate::NotificationResponse;
use crate::NotificationType;
use crate::UserInfo;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::UserInfo;
use crate::NotificationType;
use crate::NotificationResponse;
#[derive(Debug, Clone, Deserialize, Serialize, utoipa::ToSchema)]
pub struct RoomCategoryCreateRequest {
@ -182,7 +182,9 @@ pub struct RoomMessageResponse {
impl RoomMessageResponse {
pub fn detect_chunked(thinking: &Option<String>) -> bool {
thinking.as_ref().is_some_and(|s| s.contains("\"__chunks__\""))
thinking
.as_ref()
.is_some_and(|s| s.contains("\"__chunks__\""))
}
}
@ -280,4 +282,4 @@ pub struct NotificationListResponse {
pub notifications: Vec<NotificationResponse>,
pub total: i64,
pub unread_count: i64,
}
}