use dashmap::DashMap; use std::pin::Pin; use std::sync::Arc; use std::sync::LazyLock; use chrono::Utc; use db::cache::AppCache; use db::database::AppDatabase; use models::projects::project_members; use models::rooms::room; use models::rooms::room_ai; use models::EntityTrait; use queue::{AgentTaskEvent, MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; use sea_orm::{sea_query::Expr, ColumnTrait, ExprTrait, QueryFilter, QueryOrder, QuerySelect}; use uuid::Uuid; use crate::connection::{ extract_get_redis, make_persist_fn, DedupCache, PersistFn, RoomConnectionManager, }; use crate::error::RoomError; use agent::chat::{AiRequest, ChatService, Mention}; use agent::TaskService; use models::agent_task::AgentType; const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024; static USER_MENTION_RE: LazyLock regex_lite::Regex> = LazyLock::new(|| regex_lite::Regex::new(r"\s*([^<]+?)\s*").unwrap()); #[derive(Clone)] pub struct RoomService { pub db: AppDatabase, pub cache: AppCache, pub room_manager: Arc, pub queue: MessageProducer, pub redis_url: String, pub chat_service: Option>, pub task_service: Option>, pub log: slog::Logger, worker_semaphore: Arc, dedup_cache: DedupCache, } impl RoomService { pub fn new( db: AppDatabase, cache: AppCache, queue: MessageProducer, room_manager: Arc, redis_url: String, chat_service: Option>, task_service: Option>, log: slog::Logger, max_concurrent_workers: Option, ) -> Self { let dedup_cache: DedupCache = Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default())); Self { db, cache, room_manager, queue, redis_url, chat_service, task_service, log, worker_semaphore: Arc::new(tokio::sync::Semaphore::new( max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS), )), dedup_cache, } } pub async fn start_workers( &self, mut shutdown_rx: tokio::sync::broadcast::Receiver<()>, log: slog::Logger, ) -> anyhow::Result<()> { use models::rooms::Room; use sea_orm::EntityTrait; let rooms: Vec = Room::find().all(&self.db).await?; let room_ids: Vec = rooms.iter().map(|r| r.id).collect(); let project_ids: Vec = rooms .iter() .map(|r| r.project) .collect::>() .into_iter() .collect(); // Save a clone for task subscriber handles before `project_ids` gets moved. let task_project_ids = project_ids.clone(); slog::info!(log, "starting room workers"; "room_count" => room_ids.len(), "project_count" => project_ids.len()); let persist_fn: PersistFn = make_persist_fn( self.db.clone(), self.room_manager.metrics.clone(), self.dedup_cache.clone(), ); let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = extract_get_redis(self.queue.clone()); let worker_log = log.clone(); let worker_room_ids = room_ids.clone(); let worker_shutdown = shutdown_rx.resubscribe(); let worker_handle = tokio::spawn({ let get_redis = get_redis.clone(); let persist_fn = persist_fn.clone(); async move { queue::start_worker( worker_room_ids, get_redis, persist_fn, worker_shutdown, worker_log, ) .await; } }); let manager = self.room_manager.clone(); let subscriber_log = log.clone(); let redis_url = self.redis_url.clone(); let mut handles: Vec<_> = room_ids .into_iter() .map(|room_id| { let manager = manager.clone(); let log = subscriber_log.clone(); let redis_url = redis_url.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { crate::connection::subscribe_room_events( redis_url, manager, room_id, log, shutdown_rx, ) .await; }) }) .collect(); let project_handles: Vec<_> = project_ids .into_iter() .map(|project_id| { let manager = manager.clone(); let log = subscriber_log.clone(); let redis_url = redis_url.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { crate::connection::subscribe_project_room_events( redis_url, manager, project_id, log, shutdown_rx, ) .await; }) }) .collect(); handles.extend(project_handles); // Subscribe to agent task events for each project. let task_handles: Vec<_> = task_project_ids .into_iter() .map(|project_id| { let manager = manager.clone(); let log = subscriber_log.clone(); let redis_url = redis_url.clone(); let shutdown_rx = shutdown_rx.resubscribe(); tokio::spawn(async move { crate::connection::subscribe_task_events_fn( redis_url, manager, project_id, log, shutdown_rx, ) .await; }) }) .collect(); handles.extend(task_handles); let cleanup_handle = { let manager = self.room_manager.clone(); let db = self.db.clone(); let dedup_cache = self.dedup_cache.clone(); let mut cleanup_shutdown = shutdown_rx.resubscribe(); tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300)); interval.tick().await; loop { tokio::select! { _ = interval.tick() => { manager.cleanup_rate_limit().await; crate::connection::cleanup_dedup_cache(&dedup_cache); if let Ok(rooms) = Room::find().all(&db).await { let room_ids: Vec<_> = rooms.iter().map(|r| r.id).collect(); let project_ids: Vec<_> = rooms.iter().map(|r| r.project).collect(); manager.metrics.cleanup_stale_rooms(&room_ids).await; manager.prune_stale_rooms(&room_ids).await; manager.prune_stale_projects(&project_ids).await; } } _ = cleanup_shutdown.recv() => { slog::info!(slog::Logger::root(slog::Discard, slog::o!()), "cleanup task shutting down"); break; } } } }) }; handles.push(cleanup_handle); let _ = shutdown_rx.recv().await; slog::info!(log, "room workers shutting down"); for h in handles { let _ = h.abort(); } let _ = worker_handle.await; slog::info!(log, "room workers stopped"); Ok(()) } /// Spawn a background agent task: /// 1. Creates a DB record (status = pending → running) /// 2. Publishes a "started" event via Redis Pub/Sub /// 3. Runs `execute()` behind a semaphore /// 4. On complete/fail, updates the record and publishes the final event pub async fn spawn_agent_task( &self, project_id: Uuid, agent_type: AgentType, input: String, _title: Option, execute: F, ) -> anyhow::Result where F: FnOnce(i64, Arc) -> Fut + Send + 'static, Fut: std::future::Future> + Send, { let task_service = match &self.task_service { Some(ts) => ts.clone(), None => return Err(anyhow::anyhow!("task service not configured")), }; let task = task_service .create(project_id, input, agent_type) .await .map_err(|e| anyhow::anyhow!("create task failed: {}", e))?; let task_id = task.id; // Publish "started" event via Redis Pub/Sub. let started_event = AgentTaskEvent { task_id, project_id, parent_id: task.parent_id, event: "started".to_string(), message: None, output: None, error: None, status: models::agent_task::TaskStatus::Running.to_string(), timestamp: Utc::now(), }; self.queue .publish_agent_task_event(project_id, started_event) .await; // Mark task as running. let _ = task_service.start(task_id).await; let queue = self.queue.clone(); let room_manager = self.room_manager.clone(); let semaphore = self.worker_semaphore.clone(); let log = self.log.clone(); // Spawn the background task. tokio::spawn(async move { let _permit = semaphore.acquire().await.expect("semaphore closed"); let result = execute(task_id, task_service.clone()).await; let event = match result { Ok(output) => { let _ = task_service.complete(task_id, &output).await; AgentTaskEvent { task_id, project_id, parent_id: None, event: "done".to_string(), message: None, output: Some(output), error: None, status: models::agent_task::TaskStatus::Done.to_string(), timestamp: chrono::Utc::now(), } } Err(err) => { let _ = task_service.fail(task_id, &err).await; AgentTaskEvent { task_id, project_id, parent_id: None, event: "failed".to_string(), message: None, output: None, error: Some(err), status: models::agent_task::TaskStatus::Failed.to_string(), timestamp: chrono::Utc::now(), } } }; queue .publish_agent_task_event(project_id, event.clone()) .await; room_manager.broadcast_agent_task(project_id, event).await; slog::info!(log, "agent task finished"; "task_id" => task_id, "project_id" => %project_id); }); Ok(task_id) } pub fn spawn_room_workers(&self, room_id: uuid::Uuid) { let persist_fn: PersistFn = make_persist_fn( self.db.clone(), self.room_manager.metrics.clone(), self.dedup_cache.clone(), ); let get_redis: Arc queue::worker::RedisFuture + Send + Sync> = extract_get_redis(self.queue.clone()); let manager = self.room_manager.clone(); let redis_url = self.redis_url.clone(); let log = self.log.clone(); let semaphore = self.worker_semaphore.clone(); let db = self.db.clone(); let log2 = log.clone(); let log3 = log.clone(); let manager2 = self.room_manager.clone(); let redis_url2 = redis_url.clone(); let redis_url3 = redis_url.clone(); tokio::spawn(async move { let _permit = match semaphore.acquire_owned().await { Ok(p) => p, Err(_) => return, }; let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1); queue::room_worker_task( room_id, uuid::Uuid::new_v4().to_string(), get_redis, persist_fn, shutdown_rx, log, ) .await; let _ = shutdown_tx.send(()); }); tokio::spawn(async move { let shutdown_rx = manager.register_room(room_id).await; crate::connection::subscribe_room_events( redis_url2, manager.clone(), room_id, log2, shutdown_rx, ) .await; }); tokio::spawn(async move { let project_id = { let room = room::Entity::find_by_id(room_id) .one(&db) .await .ok() .flatten(); match room { Some(r) => r.project, None => return, } }; let shutdown_rx = manager2.register_project(project_id).await; crate::connection::subscribe_project_room_events( redis_url3, manager2, project_id, log3, shutdown_rx, ) .await; }); } pub async fn publish_room_event( &self, project_id: uuid::Uuid, event_type: super::RoomEventType, room_id: Option, category_id: Option, message_id: Option, seq: Option, ) { let event = ProjectRoomEvent { event_type: event_type.as_str().into(), project_id, room_id, category_id, message_id, seq, timestamp: Utc::now(), }; self.queue .publish_project_room_event(project_id, event) .await; } pub(crate) fn notify_project_members( &self, project_id: uuid::Uuid, notification_type: super::NotificationType, title: String, content: Option, related_room_id: Option, ) { let db = self.db.clone(); let notification_type_inner = notification_type; let title_inner = title; let content_inner = content; let related_room_id_inner = related_room_id; let project_id_inner = project_id; tokio::spawn(async move { let members = match project_members::Entity::find() .filter(project_members::Column::Project.eq(project_id_inner)) .all(&db) .await { Ok(m) => m, Err(e) => { slog::error!(slog::Logger::root(slog::Discard, slog::o!()), "notify_project_members: failed to fetch members"; "project_id" => %project_id_inner, "error" => %e); return; } }; for member in members { let user_id = member.user; if let Err(e) = Self::_notification_create_sync( &db, notification_type_inner, user_id, title_inner.clone(), content_inner.clone(), related_room_id_inner, project_id_inner, ) .await { slog::warn!(slog::Logger::root(slog::Discard, slog::o!()), "notify_project_members: failed to create notification for user"; "user_id" => %user_id, "project_id" => %project_id_inner, "error" => %e); } } }); } async fn _notification_create_sync( db: &db::database::AppDatabase, notification_type: super::NotificationType, user_id: uuid::Uuid, title: String, content: Option, related_room_id: Option, project_id: uuid::Uuid, ) -> Result<(), crate::error::RoomError> { use chrono::Utc; use models::rooms::room_notifications; use sea_orm::{ActiveModelTrait, Set}; let notification_type_model = match notification_type { super::NotificationType::Mention => room_notifications::NotificationType::Mention, super::NotificationType::Invitation => room_notifications::NotificationType::Invitation, super::NotificationType::RoleChange => room_notifications::NotificationType::RoleChange, super::NotificationType::RoomCreated => { room_notifications::NotificationType::RoomCreated } super::NotificationType::RoomDeleted => { room_notifications::NotificationType::RoomDeleted } super::NotificationType::SystemAnnouncement => { room_notifications::NotificationType::SystemAnnouncement } }; let _model = room_notifications::ActiveModel { id: Set(uuid::Uuid::now_v7()), room: Set(related_room_id), project: Set(Some(project_id)), user_id: Set(Some(user_id)), notification_type: Set(notification_type_model), related_message_id: Set(None), related_user_id: Set(None), related_room_id: Set(related_room_id), title: Set(title), content: Set(content), metadata: Set(None), is_read: Set(false), is_archived: Set(false), created_at: Set(Utc::now()), read_at: Set(None), expires_at: Set(None), } .insert(db) .await .map_err(|e| crate::error::RoomError::Database(e))?; Ok(()) } pub fn extract_mentions(content: &str) -> Vec { let mut mentioned = Vec::new(); for cap in USER_MENTION_RE.captures_iter(content) { if let Some(inner) = cap.get(1) { let token = inner.as_str().trim(); if let Ok(uuid) = Uuid::parse_str(token) { if !mentioned.contains(&uuid) { mentioned.push(uuid); } } } } mentioned } pub async fn resolve_mentions(&self, content: &str) -> Vec { use models::users::User; use sea_orm::EntityTrait; let mut resolved: Vec = Vec::new(); let mut seen_usernames: Vec = Vec::new(); for cap in USER_MENTION_RE.captures_iter(content) { if let Some(inner) = cap.get(1) { let token = inner.as_str().trim(); if let Ok(uuid) = Uuid::parse_str(token) { if !resolved.contains(&uuid) { resolved.push(uuid); } continue; } let token_lower = token.to_lowercase(); if seen_usernames.contains(&token_lower) { continue; } seen_usernames.push(token_lower.clone()); if let Some(user) = User::find() .filter(models::users::user::Column::Username.eq(token_lower)) .one(&self.db) .await .ok() .flatten() { if !resolved.contains(&user.uid) { resolved.push(user.uid); } } } } resolved } pub async fn check_room_access(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { let room = room::Entity::find_by_id(room_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Room not found".to_string()))?; if room.public { return Ok(()); } if self.require_room_member(room_id, user_id).await.is_ok() { return Ok(()); } self.check_project_member(room.project, user_id).await?; Ok(()) } pub async fn check_project_member( &self, project_id: Uuid, user_id: Uuid, ) -> Result<(), RoomError> { let member = project_members::Entity::find() .filter(project_members::Column::Project.eq(project_id)) .filter(project_members::Column::User.eq(user_id)) .one(&self.db) .await?; if member.is_some() { Ok(()) } else { Err(RoomError::NoPower) } } pub async fn should_ai_respond(&self, room_id: Uuid) -> Result { use models::rooms::room_ai; let ai_config = room_ai::Entity::find() .filter(room_ai::Column::Room.eq(room_id)) .one(&self.db) .await?; Ok(ai_config.is_some()) } pub async fn get_room_ai_config( &self, room_id: Uuid, ) -> Result, RoomError> { use models::rooms::room_ai; let ai_config = room_ai::Entity::find() .filter(room_ai::Column::Room.eq(room_id)) .one(&self.db) .await?; Ok(ai_config) } pub async fn get_user_names( &self, user_ids: &[Uuid], ) -> std::collections::HashMap { use models::users::User; use sea_orm::EntityTrait; let mut names = std::collections::HashMap::new(); if user_ids.is_empty() { return names; } let users = User::find() .filter(models::users::user::Column::Uid.is_in(user_ids.to_vec())) .all(&self.db) .await .unwrap_or_default(); for user in users { names.insert(user.uid, user.username); } names } pub async fn require_room_member(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> { use models::rooms::room_member::{Column as RmCol, Entity as RoomMember}; let member = RoomMember::find() .filter(RmCol::Room.eq(room_id)) .filter(RmCol::User.eq(user_id)) .one(&self.db) .await?; member .ok_or_else(|| RoomError::NotFound("Room member not found".to_string())) .map(|_| ()) } pub async fn find_room_or_404(&self, room_id: Uuid) -> Result { room::Entity::find_by_id(room_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Room not found".to_string())) } pub async fn process_message_ai( &self, room_id: Uuid, _message_id: Uuid, sender_id: Uuid, content: String, ) -> Result<(), RoomError> { let Some(chat_service) = &self.chat_service else { return Ok(()); }; let Some(ai_config) = self.get_room_ai_config(room_id).await? else { return Ok(()); }; let Some(lock_guard) = crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id, &self.log).await? else { return Ok(()); }; let room = self.find_room_or_404(room_id).await?; let project = models::projects::project::Entity::find_by_id(room.project) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?; let model = models::agents::model::Entity::find_by_id(ai_config.model) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?; let sender = models::users::User::find_by_id(sender_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("Sender not found".to_string()))?; let history = self.get_room_history(room_id, 50).await?; let user_ids: Vec = history .iter() .filter_map(|m| m.sender_id) .chain(std::iter::once(sender_id)) .collect(); let user_names = self.get_user_names(&user_ids).await; let mentions = self.extract_mention_context(&content).await; let request = AiRequest { db: self.db.clone(), cache: self.cache.clone(), model, project: project.clone(), sender, room: room.clone(), input: content, mention: mentions, history, user_names, temperature: ai_config.temperature.unwrap_or(0.7), max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32, top_p: 1.0, frequency_penalty: 0.0, presence_penalty: 0.0, think: ai_config.think, tools: None, max_tool_depth: 0, }; let use_streaming = ai_config.stream; if use_streaming { self.process_message_ai_streaming( chat_service.clone(), request, room_id, room.project, ai_config.model, lock_guard, ) .await; } else { self.process_message_ai_nonstreaming( chat_service.clone(), request, room_id, room.project, ai_config.model, lock_guard, ) .await; } Ok(()) } async fn process_message_ai_streaming( &self, chat_service: Arc, request: AiRequest, room_id: Uuid, project_id: Uuid, _model_id: Uuid, lock_guard: crate::room_ai_queue::RoomAiLockGuard, ) { use queue::RoomMessageStreamChunkEvent; let streaming_msg_id = Uuid::now_v7(); let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await { Ok(s) => s, Err(e) => { slog::error!( self.log, "Failed to get seq for streaming AI message: {}", e ); return; } }; let stream_rx = self .room_manager .register_stream_channel(streaming_msg_id) .await; let room_manager = self.room_manager.clone(); let db = self.db.clone(); let room_id_inner = room_id; let project_id_inner = project_id; let now = Utc::now(); let sender_type = "ai".to_string(); let queue = self.queue.clone(); let ai_display_name = request.model.name.clone(); let db = db.clone(); let model_id = request.model.id; let log = self.log.clone(); tokio::spawn(async move { let _lock_guard = lock_guard; let room_manager = room_manager.clone(); let db = db.clone(); let model_id = model_id; let ai_display_name = ai_display_name; let streaming_msg_id = streaming_msg_id; let room_id_for_chunk = room_id_inner; let chunk_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let room_manager_cb = room_manager.clone(); let on_chunk = move |chunk: agent::chat::AiStreamChunk| { Box::pin({ let room_manager = room_manager_cb.clone(); let streaming_msg_id = streaming_msg_id; let room_id = room_id_for_chunk; let chunk_count = chunk_count.clone(); async move { let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, content: chunk.content, done: chunk.done, error: None, }; room_manager.broadcast_stream_chunk(event).await; chunk_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } }) as Pin + Send>> }; let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk); match chat_service.process_stream(request, stream_callback).await { Ok(()) => { let full_content = { let mut rx = stream_rx; let mut content = String::new(); while let Ok(chunk_event) = rx.recv().await { if chunk_event.done { content = chunk_event.content.clone(); break; } content = chunk_event.content.clone(); } content }; let envelope = RoomMessageEnvelope { id: streaming_msg_id, dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), room_id: room_id_inner, sender_type: sender_type.clone(), sender_id: None, thread_id: None, content: full_content.clone(), content_type: "text".to_string(), send_at: now, seq, in_reply_to: None, }; if let Err(e) = queue.publish(room_id_inner, envelope).await { slog::error!(log, "Failed to publish streaming AI message: {}", e); } else { let now = Utc::now(); if let Err(e) = room_ai::Entity::update_many() .col_expr( room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1), ) .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) .filter(room_ai::Column::Room.eq(room_id_inner)) .filter(room_ai::Column::Model.eq(model_id)) .exec(&db) .await { slog::warn!(log, "Failed to update room_ai call stats: {}", e); } let msg_event = queue::RoomMessageEvent { id: streaming_msg_id, room_id: room_id_inner, sender_type: sender_type.clone(), sender_id: None, thread_id: None, content: full_content, content_type: "text".to_string(), send_at: now, seq, display_name: Some(ai_display_name.clone()), in_reply_to: None, reactions: None, }; room_manager.broadcast(room_id_inner, msg_event).await; room_manager.metrics.messages_sent.increment(1); let event = queue::ProjectRoomEvent { event_type: super::RoomEventType::NewMessage.as_str().into(), project_id: project_id_inner, room_id: Some(room_id_inner), category_id: None, message_id: Some(streaming_msg_id), seq: Some(seq), timestamp: now, }; queue .publish_project_room_event(project_id_inner, event) .await; } } Err(e) => { slog::error!(log, "AI streaming failed: {}", e); let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id: room_id_inner, content: String::new(), done: true, error: Some(e.to_string()), }; room_manager.broadcast_stream_chunk(event).await; } } room_manager.close_stream_channel(streaming_msg_id).await; }); } async fn process_message_ai_nonstreaming( &self, chat_service: Arc, request: AiRequest, room_id: Uuid, project_id: Uuid, model_id: Uuid, lock_guard: crate::room_ai_queue::RoomAiLockGuard, ) { let chat_service = chat_service.clone(); let db = self.db.clone(); let cache = self.cache.clone(); let queue = self.queue.clone(); let room_manager = self.room_manager.clone(); let log = self.log.clone(); let room_id_for_ai = room_id; let project_id_for_ai = project_id; let model_id_inner = model_id; tokio::spawn(async move { let _lock_guard = lock_guard; let model_display_name = request.model.name.clone(); match chat_service.process(request).await { Ok(response) => { if let Err(e) = Self::create_and_publish_ai_message( &db, &cache, &queue, &room_manager, room_id_for_ai, project_id_for_ai, Uuid::now_v7(), response, Some(model_display_name), ) .await { slog::error!(log, "Failed to create AI message: {}", e); } else { let now = Utc::now(); if let Err(e) = room_ai::Entity::update_many() .col_expr( room_ai::Column::CallCount, Expr::col(room_ai::Column::CallCount).add(1), ) .col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now))) .filter(room_ai::Column::Room.eq(room_id_for_ai)) .filter(room_ai::Column::Model.eq(model_id_inner)) .exec(&db) .await { slog::warn!(log, "Failed to update room_ai call stats: {}", e); } } } Err(e) => { slog::error!(log, "AI processing failed: {}", e); } } }); } pub async fn create_and_publish_ai_message( db: &AppDatabase, cache: &AppCache, queue: &MessageProducer, room_manager: &Arc, room_id: Uuid, project_id: Uuid, _reply_to: Uuid, content: String, model_display_name: Option, ) -> Result { let now = Utc::now(); let seq = Self::next_room_message_seq_internal(room_id, db, cache).await?; let id = Uuid::now_v7(); let envelope = RoomMessageEnvelope { id, dedup_key: Some(format!("{}:{}", room_id, id)), room_id, sender_type: "ai".to_string(), sender_id: None, thread_id: None, content: content.clone(), content_type: "text".to_string(), send_at: now, seq, in_reply_to: None, }; queue.publish(room_id, envelope).await?; room_manager.metrics.messages_sent.increment(1); let event = queue::RoomMessageEvent { id, room_id, sender_type: "ai".to_string(), sender_id: None, thread_id: None, content: content.clone(), content_type: "text".to_string(), send_at: now, seq, display_name: model_display_name, in_reply_to: None, reactions: None, }; room_manager.broadcast(room_id, event).await; Self::publish_room_event_internal( &db, queue, project_id, super::RoomEventType::NewMessage, Some(room_id), Some(id), Some(seq), ) .await; Ok(id) } async fn get_room_history( &self, room_id: Uuid, limit: usize, ) -> Result, RoomError> { use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage}; use sea_orm::EntityTrait; let messages = RoomMessage::find() .filter(RmCol::Room.eq(room_id)) .order_by_desc(RmCol::Seq) .limit(limit as u64) .all(&self.db) .await?; Ok(messages) } async fn extract_mention_context(&self, _content: &str) -> Vec { Vec::new() } async fn next_room_message_seq_internal( room_id: Uuid, db: &AppDatabase, cache: &AppCache, ) -> Result { let seq_key = format!("room:seq:{}", room_id); let mut conn = cache.conn().await.map_err(|e| { RoomError::Internal(format!("failed to get redis connection for seq: {}", e)) })?; // Atomically increment and check via Lua: INCR first, then if Redis was // externally set to a higher value, jump to max+1. This prevents concurrent // requests from getting duplicate seqs — the Lua script runs as one atomic unit. let seq: i64 = redis::cmd("EVAL") .arg( r#" local current = redis.call('INCR', KEYS[1]) local stored = redis.call('GET', KEYS[1]) if stored and tonumber(stored) > current then local next = tonumber(stored) + 1 redis.call('SET', KEYS[1], next) return next end return current "#, ) .arg(1) .arg(&seq_key) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("seq Lua script: {}", e)))?; // Reconciliation check: if DB is ahead of Redis (e.g. server restart wiped // Redis), bump Redis to stay in sync. This query is only hit on the rare // cross-server handoff case, not on every request. use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage}; use sea_orm::EntityTrait; let db_seq: Option>> = RoomMessage::find() .filter(RmCol::Room.eq(room_id)) .select_only() .column_as(RmCol::Seq.max(), "max_seq") .into_tuple::>>() .one(db) .await? .map(|r| r); let db_seq = db_seq.flatten().flatten().unwrap_or(0); if db_seq >= seq { // Another server handled this room while we were idle — catch up. let _: String = redis::cmd("SET") .arg(&seq_key) .arg(db_seq + 1) .query_async(&mut conn) .await .map_err(|e| RoomError::Internal(format!("SET seq: {}", e)))?; return Ok(db_seq + 1); } Ok(seq) } async fn publish_room_event_internal( _db: &AppDatabase, queue: &MessageProducer, project_id: Uuid, event_type: super::RoomEventType, room_id: Option, message_id: Option, seq: Option, ) { let event = ProjectRoomEvent { event_type: event_type.as_str().into(), project_id, room_id, category_id: None, message_id, seq, timestamp: Utc::now(), }; queue.publish_project_room_event(project_id, event).await; } }