From 01285ca9ce7e7aede5da0b54bf5cb9f379f9bddb Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Sat, 25 Apr 2026 20:09:16 +0800 Subject: [PATCH] feat(room): inject text messages into Qdrant for vector search - message.rs: after creating a text message, spawn async task to embed and upsert into Qdrant collection "room:{project}:{room_id}" - service.rs: RoomService now takes optional EmbedService, embed on every message creation --- libs/room/Cargo.toml | 1 - libs/room/src/message.rs | 32 +++++++++++++++++ libs/room/src/service.rs | 75 ++++++++++++++++++++++------------------ 3 files changed, 74 insertions(+), 34 deletions(-) diff --git a/libs/room/Cargo.toml b/libs/room/Cargo.toml index c3289e7..86464f2 100644 --- a/libs/room/Cargo.toml +++ b/libs/room/Cargo.toml @@ -40,7 +40,6 @@ utoipa = { workspace = true, features = ["uuid", "chrono"] } metrics = "0.22" regex-lite = "0.1.6" redis = { workspace = true, features = ["tokio-comp", "connection-manager"] } -async-openai = { workspace = true } hostname = "0.4" dashmap = "7.0.0-rc2" lru = "0.12.0" diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs index bc6f9ab..eff9d57 100644 --- a/libs/room/src/message.rs +++ b/libs/room/src/message.rs @@ -306,6 +306,38 @@ impl RoomService { } } + // Embed user messages into Qdrant for vector memory search (non-blocking) + if is_text_message { + let embed_service = self.embed_service.clone(); + let embed_content = content.clone(); + let embed_room_id = room_id; + let embed_message_id = id; + let embed_user_id = user_id; + let embed_db = self.db.clone(); + let embed_project_id = project_id; + tokio::spawn(async move { + if let Some(embed) = embed_service { + // Look up project name for the Qdrant collection namespace + let project_name = match models::projects::project::Entity::find_by_id(embed_project_id) + .one(&embed_db) + .await + { + Ok(Some(p)) => p.display_name, + _ => return, + }; + let _ = embed + .embed_memory( + &embed_message_id.to_string(), + &embed_content, + &project_name, + &embed_room_id.to_string(), + Some(&embed_user_id.to_string()), + ) + .await; + } + }); + } + Ok(super::RoomMessageResponse { id, seq, diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index a804a0a..f87894c 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -19,6 +19,7 @@ use crate::connection::{ }; use crate::error::RoomError; use agent::chat::{AiRequest, ChatService, Mention}; +use agent::embed::EmbedService; use agent::react::ReactStep; use agent::TaskService; use models::agent_task::AgentType; @@ -61,6 +62,7 @@ pub struct RoomService { pub redis_url: String, pub chat_service: Option>, pub task_service: Option>, + pub embed_service: Option>, pub push_fn: Option, worker_semaphore: Arc, dedup_cache: DedupCache, @@ -78,6 +80,7 @@ impl RoomService { task_service: Option>, max_concurrent_workers: Option, push_fn: Option, + embed_service: Option>, ) -> Self { let dedup_cache: DedupCache = Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default())); @@ -90,6 +93,7 @@ impl RoomService { redis_url, chat_service, task_service, + embed_service, worker_semaphore: Arc::new(tokio::sync::Semaphore::new( max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS), )), @@ -889,7 +893,25 @@ impl RoomService { .await? .ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?; - let model = models::agents::model::Entity::find_by_id(ai_config.model) + // Parse @[ai:uuid:label] from content to allow per-mention model routing. + // If no mention is found, use the room's default model. + let mentioned_model_id = { + let mut found = None; + for cap in MENTION_BRACKET_RE.captures_iter(&content) { + if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) { + if type_m.as_str() == "ai" { + if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) { + found = Some(uuid); + break; + } + } + } + } + found + }; + + let model_id = mentioned_model_id.unwrap_or(ai_config.model); + let model = models::agents::model::Entity::find_by_id(model_id) .one(&self.db) .await? .ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?; @@ -942,7 +964,7 @@ impl RoomService { request, room_id, room.project, - ai_config.model, + model_id, lock_guard, ) .await; @@ -952,7 +974,7 @@ impl RoomService { request, room_id, room.project, - ai_config.model, + model_id, lock_guard, ) .await; @@ -963,7 +985,7 @@ impl RoomService { request, room_id, room.project, - ai_config.model, + model_id, lock_guard, ) .await; @@ -973,7 +995,7 @@ impl RoomService { request, room_id, room.project, - ai_config.model, + model_id, lock_guard, ) .await; @@ -1002,7 +1024,8 @@ impl RoomService { } }; - let stream_rx = self + // Register stream channel for real-time WebSocket broadcasting of chunks + let _ = self .room_manager .register_stream_channel(streaming_msg_id) .await; @@ -1066,20 +1089,7 @@ impl RoomService { let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk); match chat_service.process_stream(request, stream_callback).await { - Ok(()) => { - let full_content = { - let mut rx = stream_rx; - let mut content = String::new(); - while let Ok(chunk_event) = rx.recv().await { - if chunk_event.done { - content = chunk_event.content.clone(); - break; - } - content = chunk_event.content.clone(); - } - content - }; - + Ok(full_content) => { let envelope = RoomMessageEnvelope { id: streaming_msg_id, dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)), @@ -1396,13 +1406,21 @@ impl RoomService { step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } - let done = is_answer; + // Update buffers BEFORE spawning — must be synchronous so the main + // thread sees the updates immediately after `process_react` returns. + { + let mut rb = reasoning_buffer.lock().unwrap(); + rb.push_str(&content); + rb.push('\n'); + } + if is_answer { + let mut ab = answer_buffer.lock().unwrap(); + ab.push_str(&content); + } + let done = is_answer; let ai_name = ai_display_name_for_step.clone(); - let reasoning_buf = reasoning_buffer.clone(); - let answer_buf = answer_buffer.clone(); tokio::spawn(async move { - // Always broadcast every step as a stream chunk let event = RoomMessageStreamChunkEvent { message_id: streaming_msg_id, room_id, @@ -1417,15 +1435,6 @@ impl RoomService { }), }; room_manager.broadcast_stream_chunk(event).await; - - // Collect all steps into reasoning_buffer; Answer goes to answer_buffer - let mut rb = reasoning_buf.lock().unwrap(); - rb.push_str(&content); - rb.push('\n'); - drop(rb); - if is_answer { - answer_buf.lock().unwrap().push_str(&content); - } }); } };