gitdataai/libs/room/src/service.rs
ZhenYi 78eee672a4 feat(room): AI typing indicator with 60s Redis TTL and WS replay
- Add sender_type field to TypingEvent (user/ai)
- Change Redis TTL from 10s to 60s for AI typing persistence
- Broadcast typing.start/stop with sender_type=ai when AI stream starts/ends
- Replay active AI typing events from Redis on new WS subscribe
- Fix ai.stream_chunk WS payload missing display_name and chunk_type
- Add initial thinking chunk on AI stream start for immediate indicator
2026-04-25 22:45:03 +08:00

1761 lines
65 KiB
Rust

use dashmap::DashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::LazyLock;
use chrono::Utc;
use db::cache::AppCache;
use db::database::AppDatabase;
use models::projects::project_members;
use models::rooms::room;
use models::rooms::room_ai;
use models::EntityTrait;
use queue::{AgentTaskEvent, MessageProducer, ProjectRoomEvent, RoomMessageEnvelope};
use sea_orm::{sea_query::Expr, ColumnTrait, ExprTrait, QueryFilter, QueryOrder, QuerySelect};
use uuid::Uuid;
use crate::connection::{
extract_get_redis, make_persist_fn, DedupCache, PersistFn, RoomConnectionManager,
};
use crate::error::RoomError;
use agent::chat::{AiRequest, ChatService, Mention};
use agent::embed::EmbedService;
use agent::react::ReactStep;
use agent::TaskService;
use models::agent_task::AgentType;
const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024;
/// Callback type for sending push notifications.
/// The caller (AppService) provides this to RoomService so it can trigger
/// browser push notifications without depending on the service crate directly.
pub type PushNotificationFn =
Arc<dyn Fn(Uuid, String, Option<String>, Option<String>) + Send + Sync>;
/// Legacy: <user>uuid</user> or <user>username</user>
static USER_MENTION_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
LazyLock::new(|| regex_lite::Regex::new(r"<user>\s*([^<]+?)\s*</user>").unwrap());
/// Legacy: <mention type="..." id="...">label</mention>
static MENTION_TAG_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
LazyLock::new(|| {
regex_lite::Regex::new(
r#"<mention\s+type="([^"]+)"\s+id="([^"]+)"[^>]*>\s*([^<]*?)\s*</mention>"#,
)
.unwrap()
});
/// New format: @[type:id:label]
/// e.g. @[user:550e8400-e29b-41d4-a716-446655440000:alice]
/// @[repo:660e8400-e29b-41d4-a716-446655440001:my-repo]
/// @[ai:gpt-4o:Claude]
static MENTION_BRACKET_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
LazyLock::new(|| regex_lite::Regex::new(r"@\[([a-z]+):([^:\]]+):([^\]]+)\]").unwrap());
#[derive(Clone)]
pub struct RoomService {
pub db: AppDatabase,
pub cache: AppCache,
pub config: config::AppConfig,
pub room_manager: Arc<RoomConnectionManager>,
pub queue: MessageProducer,
pub redis_url: String,
pub chat_service: Option<Arc<ChatService>>,
pub task_service: Option<Arc<TaskService>>,
pub embed_service: Option<Arc<EmbedService>>,
pub push_fn: Option<PushNotificationFn>,
worker_semaphore: Arc<tokio::sync::Semaphore>,
dedup_cache: DedupCache,
}
impl RoomService {
pub fn new(
db: AppDatabase,
cache: AppCache,
config: config::AppConfig,
queue: MessageProducer,
room_manager: Arc<RoomConnectionManager>,
redis_url: String,
chat_service: Option<Arc<ChatService>>,
task_service: Option<Arc<TaskService>>,
max_concurrent_workers: Option<usize>,
push_fn: Option<PushNotificationFn>,
embed_service: Option<Arc<EmbedService>>,
) -> Self {
let dedup_cache: DedupCache =
Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default()));
Self {
db,
cache,
config,
room_manager,
queue,
redis_url,
chat_service,
task_service,
embed_service,
worker_semaphore: Arc::new(tokio::sync::Semaphore::new(
max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS),
)),
dedup_cache,
push_fn,
}
}
pub async fn start_workers(
&self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
use models::rooms::Room;
use sea_orm::EntityTrait;
let rooms: Vec<room::Model> = Room::find().all(&self.db).await?;
let room_ids: Vec<uuid::Uuid> = rooms.iter().map(|r| r.id).collect();
let project_ids: Vec<uuid::Uuid> = rooms
.iter()
.map(|r| r.project)
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect();
// Save a clone for task subscriber handles before `project_ids` gets moved.
let task_project_ids = project_ids.clone();
tracing::info!(
room_count = room_ids.len(),
project_count = project_ids.len(),
"starting room workers"
);
let persist_fn: PersistFn = make_persist_fn(
self.db.clone(),
self.room_manager.metrics.clone(),
self.dedup_cache.clone(),
);
let get_redis: Arc<dyn Fn() -> queue::worker::RedisFuture + Send + Sync> =
extract_get_redis(self.queue.clone());
let worker_room_ids = room_ids.clone();
let worker_shutdown = shutdown_rx.resubscribe();
let worker_handle = tokio::spawn({
let get_redis = get_redis.clone();
let persist_fn = persist_fn.clone();
async move {
queue::start_worker(worker_room_ids, get_redis, persist_fn, worker_shutdown).await;
}
});
let manager = self.room_manager.clone();
let redis_url = self.redis_url.clone();
let mut handles: Vec<_> = room_ids
.into_iter()
.map(|room_id| {
let manager = manager.clone();
let redis_url = redis_url.clone();
let shutdown_rx = shutdown_rx.resubscribe();
tokio::spawn(async move {
crate::connection::subscribe_room_events(
redis_url,
manager,
room_id,
shutdown_rx,
)
.await;
})
})
.collect();
let project_handles: Vec<_> = project_ids
.into_iter()
.map(|project_id| {
let manager = manager.clone();
let redis_url = redis_url.clone();
let shutdown_rx = shutdown_rx.resubscribe();
tokio::spawn(async move {
crate::connection::subscribe_project_room_events(
redis_url,
manager,
project_id,
shutdown_rx,
)
.await;
})
})
.collect();
handles.extend(project_handles);
// Subscribe to agent task events for each project.
let task_handles: Vec<_> = task_project_ids
.into_iter()
.map(|project_id| {
let manager = manager.clone();
let redis_url = redis_url.clone();
let shutdown_rx = shutdown_rx.resubscribe();
tokio::spawn(async move {
crate::connection::subscribe_task_events_fn(
redis_url,
manager,
project_id,
shutdown_rx,
)
.await;
})
})
.collect();
handles.extend(task_handles);
let cleanup_handle = {
let manager = self.room_manager.clone();
let db = self.db.clone();
let dedup_cache = self.dedup_cache.clone();
let mut cleanup_shutdown = shutdown_rx.resubscribe();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(300));
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {
manager.cleanup_rate_limit().await;
crate::connection::cleanup_dedup_cache(&dedup_cache);
if let Ok(rooms) = Room::find().all(&db).await {
let room_ids: Vec<_> = rooms.iter().map(|r| r.id).collect();
let project_ids: Vec<_> = rooms.iter().map(|r| r.project).collect();
manager.metrics.cleanup_stale_rooms(&room_ids).await;
manager.prune_stale_rooms(&room_ids).await;
manager.prune_stale_projects(&project_ids).await;
}
}
_ = cleanup_shutdown.recv() => {
tracing::info!("cleanup task shutting down");
break;
}
}
}
})
};
handles.push(cleanup_handle);
let _ = shutdown_rx.recv().await;
tracing::info!("room workers shutting down");
for h in handles {
let _ = h.abort();
}
let _ = worker_handle.await;
tracing::info!("room workers stopped");
Ok(())
}
/// Spawn a background agent task:
/// 1. Creates a DB record (status = pending → running)
/// 2. Publishes a "started" event via Redis Pub/Sub
/// 3. Runs `execute()` behind a semaphore
/// 4. On complete/fail, updates the record and publishes the final event
pub async fn spawn_agent_task<F, Fut>(
&self,
project_id: Uuid,
agent_type: AgentType,
input: String,
_title: Option<String>,
execute: F,
) -> anyhow::Result<i64>
where
F: FnOnce(i64, Arc<TaskService>) -> Fut + Send + 'static,
Fut: std::future::Future<Output = Result<String, String>> + Send,
{
let task_service = match &self.task_service {
Some(ts) => ts.clone(),
None => return Err(anyhow::anyhow!("task service not configured")),
};
let task = task_service
.create(project_id, input, agent_type)
.await
.map_err(|e| anyhow::anyhow!("create task failed: {}", e))?;
let task_id = task.id;
// Publish "started" event via Redis Pub/Sub.
let started_event = AgentTaskEvent {
task_id,
project_id,
parent_id: task.parent_id,
event: "started".to_string(),
message: None,
output: None,
error: None,
status: models::agent_task::TaskStatus::Running.to_string(),
timestamp: Utc::now(),
};
self.queue
.publish_agent_task_event(project_id, started_event)
.await;
// Mark task as running.
let _ = task_service.start(task_id).await;
let queue = self.queue.clone();
let room_manager = self.room_manager.clone();
let semaphore = self.worker_semaphore.clone();
// Spawn the background task.
tokio::spawn(async move {
let _permit = semaphore.acquire().await.expect("semaphore closed");
let result = execute(task_id, task_service.clone()).await;
let event = match result {
Ok(output) => {
let _ = task_service.complete(task_id, &output).await;
AgentTaskEvent {
task_id,
project_id,
parent_id: None,
event: "done".to_string(),
message: None,
output: Some(output),
error: None,
status: models::agent_task::TaskStatus::Done.to_string(),
timestamp: chrono::Utc::now(),
}
}
Err(err) => {
let _ = task_service.fail(task_id, &err).await;
AgentTaskEvent {
task_id,
project_id,
parent_id: None,
event: "failed".to_string(),
message: None,
output: None,
error: Some(err),
status: models::agent_task::TaskStatus::Failed.to_string(),
timestamp: chrono::Utc::now(),
}
}
};
queue
.publish_agent_task_event(project_id, event.clone())
.await;
room_manager.broadcast_agent_task(project_id, event).await;
tracing::info!(task_id = task_id, project_id = %project_id, "agent task finished");
});
Ok(task_id)
}
pub fn spawn_room_workers(&self, room_id: uuid::Uuid) {
let persist_fn: PersistFn = make_persist_fn(
self.db.clone(),
self.room_manager.metrics.clone(),
self.dedup_cache.clone(),
);
let get_redis: Arc<dyn Fn() -> queue::worker::RedisFuture + Send + Sync> =
extract_get_redis(self.queue.clone());
let manager = self.room_manager.clone();
let redis_url = self.redis_url.clone();
let semaphore = self.worker_semaphore.clone();
let db = self.db.clone();
let manager2 = self.room_manager.clone();
let redis_url2 = redis_url.clone();
let redis_url3 = redis_url.clone();
tokio::spawn(async move {
let _permit = match semaphore.acquire_owned().await {
Ok(p) => p,
Err(_) => return,
};
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
queue::room_worker_task(
room_id,
uuid::Uuid::new_v4().to_string(),
get_redis,
persist_fn,
shutdown_rx,
)
.await;
let _ = shutdown_tx.send(());
});
tokio::spawn(async move {
let shutdown_rx = manager.register_room(room_id).await;
crate::connection::subscribe_room_events(
redis_url2,
manager.clone(),
room_id,
shutdown_rx,
)
.await;
});
tokio::spawn(async move {
let project_id = {
let room = room::Entity::find_by_id(room_id)
.one(&db)
.await
.ok()
.flatten();
match room {
Some(r) => r.project,
None => return,
}
};
let shutdown_rx = manager2.register_project(project_id).await;
crate::connection::subscribe_project_room_events(
redis_url3,
manager2,
project_id,
shutdown_rx,
)
.await;
});
}
pub async fn publish_room_event(
&self,
project_id: uuid::Uuid,
event_type: super::RoomEventType,
room_id: Option<uuid::Uuid>,
category_id: Option<uuid::Uuid>,
message_id: Option<uuid::Uuid>,
seq: Option<i64>,
) {
let event = ProjectRoomEvent {
event_type: event_type.as_str().into(),
project_id,
room_id,
category_id,
message_id,
seq,
timestamp: Utc::now(),
};
self.queue
.publish_project_room_event(project_id, event)
.await;
}
pub(crate) fn notify_project_members(
&self,
project_id: uuid::Uuid,
notification_type: super::NotificationType,
title: String,
content: Option<String>,
related_room_id: Option<uuid::Uuid>,
) {
let db = self.db.clone();
let notification_type_inner = notification_type;
let title_inner = title;
let content_inner = content;
let related_room_id_inner = related_room_id;
let project_id_inner = project_id;
tokio::spawn(async move {
let members = match project_members::Entity::find()
.filter(project_members::Column::Project.eq(project_id_inner))
.all(&db)
.await
{
Ok(m) => m,
Err(e) => {
tracing::error!(project_id = %project_id_inner, error = %e,
"notify_project_members: failed to fetch members");
return;
}
};
for member in members {
let user_id = member.user;
if let Err(e) = Self::_notification_create_sync(
&db,
notification_type_inner,
user_id,
title_inner.clone(),
content_inner.clone(),
related_room_id_inner,
project_id_inner,
)
.await
{
tracing::warn!(user_id = %user_id, project_id = %project_id_inner, error = %e,
"notify_project_members: failed to create notification for user");
}
}
});
}
async fn _notification_create_sync(
db: &db::database::AppDatabase,
notification_type: super::NotificationType,
user_id: uuid::Uuid,
title: String,
content: Option<String>,
related_room_id: Option<uuid::Uuid>,
project_id: uuid::Uuid,
) -> Result<(), crate::error::RoomError> {
use chrono::Utc;
use models::rooms::room_notifications;
use sea_orm::{ActiveModelTrait, Set};
let notification_type_model = match notification_type {
super::NotificationType::Mention => room_notifications::NotificationType::Mention,
super::NotificationType::Invitation => room_notifications::NotificationType::Invitation,
super::NotificationType::RoleChange => room_notifications::NotificationType::RoleChange,
super::NotificationType::RoomCreated => {
room_notifications::NotificationType::RoomCreated
}
super::NotificationType::RoomDeleted => {
room_notifications::NotificationType::RoomDeleted
}
super::NotificationType::SystemAnnouncement => {
room_notifications::NotificationType::SystemAnnouncement
}
super::NotificationType::ProjectInvitation => {
room_notifications::NotificationType::ProjectInvitation
}
super::NotificationType::WorkspaceInvitation => {
room_notifications::NotificationType::WorkspaceInvitation
}
};
let _model = room_notifications::ActiveModel {
id: Set(uuid::Uuid::now_v7()),
room: Set(related_room_id),
project: Set(Some(project_id)),
user_id: Set(Some(user_id)),
notification_type: Set(notification_type_model),
related_message_id: Set(None),
related_user_id: Set(None),
related_room_id: Set(related_room_id),
title: Set(title),
content: Set(content),
metadata: Set(None),
is_read: Set(false),
is_archived: Set(false),
created_at: Set(Utc::now()),
read_at: Set(None),
expires_at: Set(None),
}
.insert(db)
.await
.map_err(|e| crate::error::RoomError::Database(e))?;
Ok(())
}
/// Extracts user UUIDs from all mention formats:
/// - Legacy: `<user>uuid</user>`
/// - Legacy: `<mention type="user" id="uuid">label</mention>`
/// - New: `@[user:uuid:label]`
pub fn extract_mentions(content: &str) -> Vec<Uuid> {
let mut mentioned = Vec::new();
// Legacy <user>uuid</user> format
for cap in USER_MENTION_RE.captures_iter(content) {
if let Some(inner) = cap.get(1) {
let token = inner.as_str().trim();
if let Ok(uuid) = Uuid::parse_str(token) {
if !mentioned.contains(&uuid) {
mentioned.push(uuid);
}
}
}
}
// Legacy <mention type="user" id="...">label</mention> format
for cap in MENTION_TAG_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "user" {
if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) {
if !mentioned.contains(&uuid) {
mentioned.push(uuid);
}
}
}
}
}
// New @[user:uuid:label] format
for cap in MENTION_BRACKET_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "user" {
if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) {
if !mentioned.contains(&uuid) {
mentioned.push(uuid);
}
}
}
}
}
mentioned
}
/// Resolves user mentions from all formats:
/// - Legacy: `<user>uuid</user>` or `<user>username</user>`
/// - Legacy: `<mention type="user" id="uuid">label</mention>`
/// - New: `@[user:uuid:label]`
/// Repository and AI mention types are accepted but produce no user UUIDs.
pub async fn resolve_mentions(&self, content: &str) -> Vec<Uuid> {
use models::users::User;
use sea_orm::EntityTrait;
let mut resolved: Vec<Uuid> = Vec::new();
let mut seen_usernames: Vec<String> = Vec::new();
// Legacy <user>uuid</user> or <user>username</user> format
for cap in USER_MENTION_RE.captures_iter(content) {
if let Some(inner) = cap.get(1) {
let token = inner.as_str().trim();
if let Ok(uuid) = Uuid::parse_str(token) {
if !resolved.contains(&uuid) {
resolved.push(uuid);
}
continue;
}
let token_lower = token.to_lowercase();
if seen_usernames.contains(&token_lower) {
continue;
}
seen_usernames.push(token_lower.clone());
if let Some(user) = User::find()
.filter(models::users::user::Column::Username.eq(token_lower))
.one(&self.db)
.await
.ok()
.flatten()
{
if !resolved.contains(&user.uid) {
resolved.push(user.uid);
}
}
}
}
// New <mention type="user" id="uuid">label</mention> format
for cap in MENTION_TAG_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "user" {
let id = id_m.as_str().trim();
if let Ok(uuid) = Uuid::parse_str(id) {
if !resolved.contains(&uuid) {
resolved.push(uuid);
}
} else {
// Fall back to label-based username lookup
if let Some(label_m) = cap.get(3) {
let label = label_m.as_str().trim();
if !label.is_empty() {
let label_lower = label.to_lowercase();
if seen_usernames.contains(&label_lower) {
continue;
}
seen_usernames.push(label_lower.clone());
if let Some(user) = User::find()
.filter(models::users::user::Column::Username.eq(label_lower))
.one(&self.db)
.await
.ok()
.flatten()
{
if !resolved.contains(&user.uid) {
resolved.push(user.uid);
}
}
}
}
}
}
// `repository` and `ai` mention types are accepted but do not
// produce user notification UUIDs — no-op here.
}
}
// New @[user:uuid:label] format
for cap in MENTION_BRACKET_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "user" {
let id = id_m.as_str().trim();
if let Ok(uuid) = Uuid::parse_str(id) {
if !resolved.contains(&uuid) {
resolved.push(uuid);
}
} else {
// Fall back to label-based username lookup
if let Some(label_m) = cap.get(3) {
let label = label_m.as_str().trim();
if !label.is_empty() {
let label_lower = label.to_lowercase();
if seen_usernames.contains(&label_lower) {
continue;
}
seen_usernames.push(label_lower.clone());
if let Some(user) = User::find()
.filter(models::users::user::Column::Username.eq(label_lower))
.one(&self.db)
.await
.ok()
.flatten()
{
if !resolved.contains(&user.uid) {
resolved.push(user.uid);
}
}
}
}
}
}
// `repository` and `ai` mention types: no user UUIDs
}
}
resolved
}
pub async fn check_room_access(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> {
let room = room::Entity::find_by_id(room_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Room not found".to_string()))?;
if room.public {
return Ok(());
}
if self.require_room_member(room_id, user_id).await.is_ok() {
return Ok(());
}
self.check_project_member(room.project, user_id).await?;
Ok(())
}
pub async fn check_project_member(
&self,
project_id: Uuid,
user_id: Uuid,
) -> Result<(), RoomError> {
let member = project_members::Entity::find()
.filter(project_members::Column::Project.eq(project_id))
.filter(project_members::Column::User.eq(user_id))
.one(&self.db)
.await?;
if member.is_some() {
Ok(())
} else {
Err(RoomError::NoPower)
}
}
/// Determine whether AI should respond to a message in this room.
/// - No room_ai config → AI not configured, never respond.
/// - use_exact = false → respond to every text message.
/// - use_exact = true → only respond when the message contains an @[ai:...] or
/// <mention type="ai">... tag that mentions this room's configured AI model.
pub async fn should_ai_respond(&self, room_id: Uuid, content: &str) -> Result<bool, RoomError> {
use models::rooms::room_ai;
let ai_config = room_ai::Entity::find()
.filter(room_ai::Column::Room.eq(room_id))
.one(&self.db)
.await?;
let config = match ai_config {
Some(c) => c,
None => return Ok(false),
};
if !config.use_exact {
return Ok(true);
}
// use_exact mode: only respond when AI is explicitly mentioned
let model_id_str = config.model.to_string();
// Check @[ai:model_id:label] format
for cap in MENTION_BRACKET_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str {
return Ok(true);
}
}
}
// Check <mention type="ai" id="model_id">label</mention> format
for cap in MENTION_TAG_RE.captures_iter(content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "ai" && id_m.as_str().trim() == model_id_str {
return Ok(true);
}
}
}
Ok(false)
}
pub async fn get_room_ai_config(
&self,
room_id: Uuid,
) -> Result<Option<room_ai::Model>, RoomError> {
use models::rooms::room_ai;
let ai_config = room_ai::Entity::find()
.filter(room_ai::Column::Room.eq(room_id))
.one(&self.db)
.await?;
Ok(ai_config)
}
pub async fn get_user_names(
&self,
user_ids: &[Uuid],
) -> std::collections::HashMap<Uuid, String> {
use models::users::User;
use sea_orm::EntityTrait;
let mut names = std::collections::HashMap::new();
if user_ids.is_empty() {
return names;
}
let users = User::find()
.filter(models::users::user::Column::Uid.is_in(user_ids.to_vec()))
.all(&self.db)
.await
.unwrap_or_default();
for user in users {
names.insert(user.uid, user.username);
}
names
}
pub async fn require_room_member(&self, room_id: Uuid, user_id: Uuid) -> Result<(), RoomError> {
use models::rooms::room_member::{Column as RmCol, Entity as RoomMember};
let member = RoomMember::find()
.filter(RmCol::Room.eq(room_id))
.filter(RmCol::User.eq(user_id))
.one(&self.db)
.await?;
member
.ok_or_else(|| RoomError::NotFound("Room member not found".to_string()))
.map(|_| ())
}
pub async fn find_room_or_404(&self, room_id: Uuid) -> Result<room::Model, RoomError> {
room::Entity::find_by_id(room_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Room not found".to_string()))
}
pub async fn process_message_ai(
&self,
room_id: Uuid,
_message_id: Uuid,
sender_id: Uuid,
content: String,
) -> Result<(), RoomError> {
let Some(chat_service) = &self.chat_service else {
return Ok(());
};
let Some(ai_config) = self.get_room_ai_config(room_id).await? else {
return Ok(());
};
let Some(lock_guard) =
crate::room_ai_queue::acquire_room_ai_lock(&self.cache, room_id).await?
else {
return Ok(());
};
let room = self.find_room_or_404(room_id).await?;
let project = models::projects::project::Entity::find_by_id(room.project)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?;
// Parse @[ai:uuid:label] from content to allow per-mention model routing.
// If no mention is found, use the room's default model.
let mentioned_model_id = {
let mut found = None;
for cap in MENTION_BRACKET_RE.captures_iter(&content) {
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
if type_m.as_str() == "ai" {
if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) {
found = Some(uuid);
break;
}
}
}
}
found
};
let model_id = mentioned_model_id.unwrap_or(ai_config.model);
let model = models::agents::model::Entity::find_by_id(model_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?;
let sender = models::users::User::find_by_id(sender_id)
.one(&self.db)
.await?
.ok_or_else(|| RoomError::NotFound("Sender not found".to_string()))?;
let history = self.get_room_history(room_id, 50).await?;
let user_ids: Vec<Uuid> = history
.iter()
.filter_map(|m| m.sender_id)
.chain(std::iter::once(sender_id))
.collect();
let user_names = self.get_user_names(&user_ids).await;
let mentions = self.extract_mention_context(&content).await;
let request = AiRequest {
db: self.db.clone(),
cache: self.cache.clone(),
config: self.config.clone(),
model,
project: project.clone(),
sender,
room: room.clone(),
input: content,
mention: mentions,
history,
user_names,
temperature: ai_config.temperature.unwrap_or(0.7),
max_tokens: ai_config.max_tokens.unwrap_or(4096) as i32,
top_p: 1.0,
frequency_penalty: 0.0,
presence_penalty: 0.0,
think: ai_config.think,
tools: Some(chat_service.tools()),
max_tool_depth: 1000,
};
let use_streaming = ai_config.stream;
let is_react = ai_config.agent_type.as_deref() == Some("react");
if is_react {
if use_streaming {
self.process_message_ai_react_streaming(
chat_service.clone(),
request,
room_id,
room.project,
model_id,
lock_guard,
)
.await;
} else {
self.process_message_ai_react_nonstreaming(
chat_service.clone(),
request,
room_id,
room.project,
model_id,
lock_guard,
)
.await;
}
} else if use_streaming {
self.process_message_ai_streaming(
chat_service.clone(),
request,
room_id,
room.project,
model_id,
lock_guard,
)
.await;
} else {
self.process_message_ai_nonstreaming(
chat_service.clone(),
request,
room_id,
room.project,
model_id,
lock_guard,
)
.await;
}
Ok(())
}
async fn process_message_ai_streaming(
&self,
chat_service: Arc<ChatService>,
request: AiRequest,
room_id: Uuid,
project_id: Uuid,
_model_id: Uuid,
lock_guard: crate::room_ai_queue::RoomAiLockGuard,
) {
use queue::RoomMessageStreamChunkEvent;
let streaming_msg_id = Uuid::now_v7();
let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "Failed to get seq for streaming AI message");
return;
}
};
// Register stream channel for real-time WebSocket broadcasting of chunks
let _ = self
.room_manager
.register_stream_channel(streaming_msg_id)
.await;
// Emit an initial "thinking" chunk immediately so the frontend shows the
// "AI is thinking..." indicator without waiting for the first real token.
let initial_event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id,
content: String::new(),
done: false,
error: None,
display_name: Some(request.model.name.clone()),
chunk_type: Some("thinking".to_string()),
};
self.room_manager.broadcast_stream_chunk(initial_event).await;
let room_manager = self.room_manager.clone();
let db = self.db.clone();
let room_id_inner = room_id;
let project_id_inner = project_id;
let now = Utc::now();
let sender_type = "ai".to_string();
let queue = self.queue.clone();
let ai_display_name = request.model.name.clone();
let db = db.clone();
let model_id = request.model.id;
tokio::spawn(async move {
let _lock_guard = lock_guard;
let room_manager = room_manager.clone();
let db = db.clone();
let model_id = model_id;
// Fixed UUID to identify AI typing events across WS reconnections.
let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
// Clone before closure so closure captures clone, not the original.
let ai_display_name_for_chunk = ai_display_name.clone();
let ai_display_name_for_final = ai_display_name.clone();
let streaming_msg_id = streaming_msg_id;
let room_id_for_chunk = room_id_inner;
let chunk_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let room_manager_cb = room_manager.clone();
let on_chunk = move |chunk: agent::chat::AiStreamChunk| {
Box::pin({
let room_manager = room_manager_cb.clone();
let streaming_msg_id = streaming_msg_id;
let room_id = room_id_for_chunk;
let chunk_count = chunk_count.clone();
// Clone display_name INSIDE the async block so the outer closure stays `Fn`.
let ai_display_name_for_chunk = ai_display_name_for_chunk.clone();
async move {
let chunk_type_str = match chunk.chunk_type {
agent::chat::AiChunkType::Thinking => "thinking",
agent::chat::AiChunkType::Answer => "answer",
agent::chat::AiChunkType::ToolCall => "tool_call",
agent::chat::AiChunkType::ToolResult => "tool_result",
};
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id,
content: chunk.content,
done: chunk.done,
error: None,
display_name: Some(ai_display_name_for_chunk),
chunk_type: Some(chunk_type_str.to_string()),
};
room_manager.broadcast_stream_chunk(event).await;
chunk_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}) as Pin<Box<dyn std::future::Future<Output = ()> + Send>>
};
let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk);
// Broadcast AI typing.start so WS clients (including reconnections) see the indicator.
let typing_start = queue::TypingEvent {
room_id: room_id_inner,
user_id: ai_typing_id,
username: ai_display_name.clone(),
avatar_url: None,
action: "start".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_start).await;
match chat_service.process_stream(request, stream_callback).await {
Ok(full_content) => {
let envelope = RoomMessageEnvelope {
id: streaming_msg_id,
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
model_id: Some(model_id),
thread_id: None,
content: full_content.clone(),
content_type: "text".to_string(),
send_at: now,
seq,
in_reply_to: None,
display_name: Some(ai_display_name_for_final.clone()),
};
if let Err(e) = queue.publish(room_id_inner, envelope).await {
tracing::error!(error = %e, "Failed to publish streaming AI message");
} else {
let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_inner))
.filter(room_ai::Column::Model.eq(model_id))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
}
let msg_event = queue::RoomMessageEvent {
id: streaming_msg_id,
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
thread_id: None,
content: full_content,
content_type: "text".to_string(),
send_at: now,
seq,
display_name: Some(ai_display_name_for_final.clone()),
in_reply_to: None,
reactions: None,
message_id: None,
};
room_manager.broadcast(room_id_inner, msg_event).await;
room_manager.metrics.messages_sent.increment(1);
// Stop AI typing indicator now that the message is delivered.
let typing_stop = queue::TypingEvent {
room_id: room_id_inner,
user_id: ai_typing_id,
username: ai_display_name_for_final.clone(),
avatar_url: None,
action: "stop".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_stop).await;
let event = queue::ProjectRoomEvent {
event_type: super::RoomEventType::NewMessage.as_str().into(),
project_id: project_id_inner,
room_id: Some(room_id_inner),
category_id: None,
message_id: Some(streaming_msg_id),
seq: Some(seq),
timestamp: now,
};
queue
.publish_project_room_event(project_id_inner, event)
.await;
}
}
Err(e) => {
tracing::error!(error = %e, "AI streaming failed");
// Stop AI typing indicator since the stream failed.
let typing_stop = queue::TypingEvent {
room_id: room_id_inner,
user_id: ai_typing_id,
username: ai_display_name.clone(),
avatar_url: None,
action: "stop".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_stop).await;
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id: room_id_inner,
content: String::new(),
done: true,
error: Some(e.to_string()),
display_name: Some(ai_display_name.clone()),
chunk_type: None,
};
room_manager.broadcast_stream_chunk(event).await;
}
}
room_manager.close_stream_channel(streaming_msg_id).await;
});
}
async fn process_message_ai_nonstreaming(
&self,
chat_service: Arc<ChatService>,
request: AiRequest,
room_id: Uuid,
project_id: Uuid,
model_id: Uuid,
lock_guard: crate::room_ai_queue::RoomAiLockGuard,
) {
let chat_service = chat_service.clone();
let db = self.db.clone();
let cache = self.cache.clone();
let queue = self.queue.clone();
let room_manager = self.room_manager.clone();
let room_id_for_ai = room_id;
let project_id_for_ai = project_id;
let model_id_inner = model_id;
tokio::spawn(async move {
let _lock_guard = lock_guard;
let model_display_name = request.model.name.clone();
match chat_service.process(request).await {
Ok(response) => {
if let Err(e) = Self::create_and_publish_ai_message(
&db,
&cache,
&queue,
&room_manager,
room_id_for_ai,
project_id_for_ai,
Uuid::now_v7(),
response,
model_id_inner,
Some(model_display_name),
)
.await
{
tracing::error!(error = %e, "Failed to create AI message");
} else {
let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_for_ai))
.filter(room_ai::Column::Model.eq(model_id_inner))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
}
}
}
Err(e) => {
tracing::error!(error = %e, "AI processing failed");
// Send an error message so the user knows something went wrong
let _ = Self::create_and_publish_ai_message(
&db,
&cache,
&queue,
&room_manager,
room_id_for_ai,
project_id_for_ai,
Uuid::now_v7(),
format!("[AI error: {}]", e),
model_id_inner,
Some(model_display_name),
)
.await;
}
}
});
}
/// ReAct agent — non-streaming: collect full answer then persist.
async fn process_message_ai_react_nonstreaming(
&self,
chat_service: Arc<ChatService>,
request: AiRequest,
room_id: Uuid,
project_id: Uuid,
model_id: Uuid,
lock_guard: crate::room_ai_queue::RoomAiLockGuard,
) {
let chat_service = chat_service.clone();
let db = self.db.clone();
let cache = self.cache.clone();
let queue = self.queue.clone();
let room_manager = self.room_manager.clone();
let room_id_for_ai = room_id;
let project_id_for_ai = project_id;
let model_id_inner = model_id;
tokio::spawn(async move {
let _lock_guard = lock_guard;
let model_display_name = request.model.name.clone();
let final_answer = chat_service
.process_react(&request, |_step| {
// ReAct step events are logged internally; no streaming output here.
})
.await;
match final_answer {
Ok(response) => {
if let Err(e) = Self::create_and_publish_ai_message(
&db,
&cache,
&queue,
&room_manager,
room_id_for_ai,
project_id_for_ai,
Uuid::now_v7(),
response,
model_id_inner,
Some(model_display_name),
)
.await
{
tracing::error!(error = %e, "Failed to create ReAct AI message");
} else {
let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_for_ai))
.filter(room_ai::Column::Model.eq(model_id_inner))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
}
}
}
Err(e) => {
tracing::error!(error = %e, "ReAct agent failed");
let _ = Self::create_and_publish_ai_message(
&db,
&cache,
&queue,
&room_manager,
room_id_for_ai,
project_id_for_ai,
Uuid::now_v7(),
format!("[AI error: {}]", e),
model_id_inner,
Some(model_display_name),
)
.await;
}
}
});
}
/// ReAct agent — streaming: forward each ReactStep to WebSocket, then persist final answer.
async fn process_message_ai_react_streaming(
&self,
chat_service: Arc<ChatService>,
request: AiRequest,
room_id: Uuid,
project_id: Uuid,
_model_id: Uuid,
lock_guard: crate::room_ai_queue::RoomAiLockGuard,
) {
use queue::RoomMessageStreamChunkEvent;
let streaming_msg_id = Uuid::now_v7();
let seq = match Self::next_room_message_seq_internal(room_id, &self.db, &self.cache).await {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "Failed to get seq for ReAct streaming");
return;
}
};
let room_manager = self.room_manager.clone();
let db = self.db.clone();
let room_id_inner = room_id;
let project_id_inner = project_id;
let now = Utc::now();
let sender_type = "ai".to_string();
let queue = self.queue.clone();
let ai_display_name = request.model.name.clone();
let model_id_inner = request.model.id;
tokio::spawn(async move {
let _lock_guard = lock_guard;
// Buffer all reasoning steps + the final answer separately.
let reasoning_buffer: std::sync::Arc<std::sync::Mutex<String>> =
std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let answer_buffer: std::sync::Arc<std::sync::Mutex<String>> =
std::sync::Arc::new(std::sync::Mutex::new(String::new()));
let step_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
let on_step = {
let room_manager = room_manager.clone();
let streaming_msg_id = streaming_msg_id;
let room_id = room_id_inner;
let step_count = step_count.clone();
let ai_display_name_for_step = std::sync::Arc::new(ai_display_name.clone());
let reasoning_buffer = reasoning_buffer.clone();
let answer_buffer = answer_buffer.clone();
move |step: ReactStep| {
let room_manager = room_manager.clone();
let content = match &step {
ReactStep::Thought { step: _, thought } => {
format!("[Thinking] {}", thought)
}
ReactStep::Action { step: _, action } => {
format!("[Action] Calling `{}` with {:?}", action.name, action.args)
}
ReactStep::Observation {
step: _,
observation,
} => {
format!("[Observation] {}", observation)
}
ReactStep::Answer { step: _, answer } => answer.clone(),
};
let is_answer = matches!(&step, ReactStep::Answer { .. });
if is_answer {
step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
// Update buffers BEFORE spawning — must be synchronous so the main
// thread sees the updates immediately after `process_react` returns.
{
let mut rb = reasoning_buffer.lock().unwrap();
rb.push_str(&content);
rb.push('\n');
}
if is_answer {
let mut ab = answer_buffer.lock().unwrap();
ab.push_str(&content);
}
let done = is_answer;
let ai_name = ai_display_name_for_step.clone();
tokio::spawn(async move {
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id,
content: content.clone(),
done,
error: None,
display_name: Some((*ai_name).clone()),
chunk_type: Some(if is_answer {
"answer".to_string()
} else {
"thinking".to_string()
}),
};
room_manager.broadcast_stream_chunk(event).await;
});
}
};
let result = chat_service.process_react(&request, on_step).await;
let final_content = answer_buffer.lock().unwrap().clone();
let reasoning_chain = reasoning_buffer.lock().unwrap().clone();
// Determine what to persist: prefer the answer, fall back to the reasoning chain
let content_to_persist = if !final_content.is_empty() {
final_content
} else if !reasoning_chain.trim().is_empty() {
// No Answer step, but the reasoning chain was streamed — still send it
format!(
"[Agent ran through {} reasoning steps but did not produce a final answer.]\n{}",
step_count.load(std::sync::atomic::Ordering::Relaxed),
reasoning_chain.trim_end()
)
} else {
// Nothing produced — this should not happen in practice
String::from("[No output from reasoning agent]")
};
let (err_msg, should_log) = match &result {
Err(e) => (Some(format!("[Agent error: {}]", e)), true),
_ => (None, false),
};
let content_to_persist = if let Some(msg) = &err_msg {
format!(
"{}\n[Error during reasoning: {}]",
content_to_persist.trim_end(),
msg.trim_start_matches("[Agent error: ")
.trim_end_matches("]")
)
} else {
content_to_persist
};
if should_log {
tracing::error!(error = %result.as_ref().unwrap_err(), "ReAct streaming failed");
}
let persist_content = content_to_persist.trim().to_string();
if persist_content.is_empty() {
return;
}
let envelope = RoomMessageEnvelope {
id: streaming_msg_id,
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
model_id: Some(model_id_inner),
thread_id: None,
content: persist_content.clone(),
content_type: "text".to_string(),
send_at: now,
seq,
in_reply_to: None,
display_name: Some(ai_display_name.clone()),
};
if let Err(e) = queue.publish(room_id_inner, envelope).await {
tracing::error!(error = %e, "Failed to publish ReAct streaming message");
} else {
let now = Utc::now();
if let Err(e) = room_ai::Entity::update_many()
.col_expr(
room_ai::Column::CallCount,
Expr::col(room_ai::Column::CallCount).add(1),
)
.col_expr(room_ai::Column::LastCallAt, Expr::value(Some(now)))
.filter(room_ai::Column::Room.eq(room_id_inner))
.filter(room_ai::Column::Model.eq(model_id_inner))
.exec(&db)
.await
{
tracing::warn!(error = %e, "Failed to update room_ai call stats");
}
let msg_event = queue::RoomMessageEvent {
id: streaming_msg_id,
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
thread_id: None,
content: persist_content,
content_type: "text".to_string(),
send_at: now,
seq,
display_name: Some(ai_display_name.clone()),
in_reply_to: None,
reactions: None,
message_id: None,
};
room_manager.broadcast(room_id_inner, msg_event).await;
room_manager.metrics.messages_sent.increment(1);
let event = queue::ProjectRoomEvent {
event_type: super::RoomEventType::NewMessage.as_str().into(),
project_id: project_id_inner,
room_id: Some(room_id_inner),
category_id: None,
message_id: Some(streaming_msg_id),
seq: Some(seq),
timestamp: now,
};
queue
.publish_project_room_event(project_id_inner, event)
.await;
}
room_manager.close_stream_channel(streaming_msg_id).await;
});
}
pub async fn create_and_publish_ai_message(
db: &AppDatabase,
cache: &AppCache,
queue: &MessageProducer,
room_manager: &Arc<RoomConnectionManager>,
room_id: Uuid,
project_id: Uuid,
_reply_to: Uuid,
content: String,
model_id: Uuid,
model_display_name: Option<String>,
) -> Result<Uuid, RoomError> {
let now = Utc::now();
let seq = Self::next_room_message_seq_internal(room_id, db, cache).await?;
let id = Uuid::now_v7();
let envelope = RoomMessageEnvelope {
id,
dedup_key: Some(format!("{}:{}", room_id, id)),
room_id,
sender_type: "ai".to_string(),
sender_id: None,
model_id: Some(model_id),
thread_id: None,
content: content.clone(),
content_type: "text".to_string(),
send_at: now,
seq,
in_reply_to: None,
display_name: model_display_name.clone(),
};
queue.publish(room_id, envelope).await?;
room_manager.metrics.messages_sent.increment(1);
let event = queue::RoomMessageEvent {
id,
room_id,
sender_type: "ai".to_string(),
sender_id: None,
thread_id: None,
content: content.clone(),
content_type: "text".to_string(),
send_at: now,
seq,
display_name: model_display_name,
in_reply_to: None,
reactions: None,
message_id: None,
};
room_manager.broadcast(room_id, event).await;
Self::publish_room_event_internal(
&db,
queue,
project_id,
super::RoomEventType::NewMessage,
Some(room_id),
Some(id),
Some(seq),
)
.await;
Ok(id)
}
async fn get_room_history(
&self,
room_id: Uuid,
limit: usize,
) -> Result<Vec<models::rooms::room_message::Model>, RoomError> {
use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage};
use sea_orm::EntityTrait;
let messages = RoomMessage::find()
.filter(RmCol::Room.eq(room_id))
.order_by_desc(RmCol::Seq)
.limit(limit as u64)
.all(&self.db)
.await?;
Ok(messages)
}
async fn extract_mention_context(&self, _content: &str) -> Vec<Mention> {
Vec::new()
}
pub(crate) async fn next_room_message_seq_internal(
room_id: Uuid,
db: &AppDatabase,
cache: &AppCache,
) -> Result<i64, RoomError> {
let seq_key = format!("room:seq:{}", room_id);
let mut conn = cache.conn().await.map_err(|e| {
RoomError::Internal(format!("failed to get redis connection for seq: {}", e))
})?;
// Normal path: Redis INCR is atomic and sufficient for sequence generation.
// Lua script removed — it was executing on every single message (costly).
let seq: i64 = redis::cmd("INCR")
.arg(&seq_key)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("seq INCR: {}", e)))?;
// DB reconciliation: only check every 1000 messages, not on every request.
// This handles the rare cross-server handoff case (Redis restart wipe).
if seq % 1000 == 0 {
use models::rooms::room_message::{Column as RmCol, Entity as RoomMessage};
use sea_orm::EntityTrait;
let db_seq: Option<Option<Option<i64>>> = RoomMessage::find()
.filter(RmCol::Room.eq(room_id))
.select_only()
.column_as(RmCol::Seq.max(), "max_seq")
.into_tuple::<Option<Option<i64>>>()
.one(db)
.await?
.map(|r| r);
let db_seq = db_seq.flatten().flatten().unwrap_or(0);
if db_seq >= seq {
let _: String = redis::cmd("SET")
.arg(&seq_key)
.arg(db_seq + 1)
.query_async(&mut conn)
.await
.map_err(|e| RoomError::Internal(format!("seq SET: {}", e)))?;
return Ok(db_seq + 1);
}
}
Ok(seq)
}
async fn publish_room_event_internal(
_db: &AppDatabase,
queue: &MessageProducer,
project_id: Uuid,
event_type: super::RoomEventType,
room_id: Option<Uuid>,
message_id: Option<Uuid>,
seq: Option<i64>,
) {
let event = ProjectRoomEvent {
event_type: event_type.as_str().into(),
project_id,
room_id,
category_id: None,
message_id,
seq,
timestamp: Utc::now(),
};
queue.publish_project_room_event(project_id, event).await;
}
}