feat(room): inject text messages into Qdrant for vector search
- message.rs: after creating a text message, spawn an async task to
embed it and upsert it into the Qdrant collection "room:{project}:{room_id}"
- service.rs: RoomService now takes an optional EmbedService and embeds
every newly created text message
This commit is contained in:
parent
215846b1db
commit
01285ca9ce
@ -40,7 +40,6 @@ utoipa = { workspace = true, features = ["uuid", "chrono"] }
|
|||||||
metrics = "0.22"
|
metrics = "0.22"
|
||||||
regex-lite = "0.1.6"
|
regex-lite = "0.1.6"
|
||||||
redis = { workspace = true, features = ["tokio-comp", "connection-manager"] }
|
redis = { workspace = true, features = ["tokio-comp", "connection-manager"] }
|
||||||
async-openai = { workspace = true }
|
|
||||||
hostname = "0.4"
|
hostname = "0.4"
|
||||||
dashmap = "7.0.0-rc2"
|
dashmap = "7.0.0-rc2"
|
||||||
lru = "0.12.0"
|
lru = "0.12.0"
|
||||||
|
|||||||
@ -306,6 +306,38 @@ impl RoomService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Embed user messages into Qdrant for vector memory search (non-blocking)
|
||||||
|
if is_text_message {
|
||||||
|
let embed_service = self.embed_service.clone();
|
||||||
|
let embed_content = content.clone();
|
||||||
|
let embed_room_id = room_id;
|
||||||
|
let embed_message_id = id;
|
||||||
|
let embed_user_id = user_id;
|
||||||
|
let embed_db = self.db.clone();
|
||||||
|
let embed_project_id = project_id;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Some(embed) = embed_service {
|
||||||
|
// Look up project name for the Qdrant collection namespace
|
||||||
|
let project_name = match models::projects::project::Entity::find_by_id(embed_project_id)
|
||||||
|
.one(&embed_db)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(Some(p)) => p.display_name,
|
||||||
|
_ => return,
|
||||||
|
};
|
||||||
|
let _ = embed
|
||||||
|
.embed_memory(
|
||||||
|
&embed_message_id.to_string(),
|
||||||
|
&embed_content,
|
||||||
|
&project_name,
|
||||||
|
&embed_room_id.to_string(),
|
||||||
|
Some(&embed_user_id.to_string()),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
Ok(super::RoomMessageResponse {
|
Ok(super::RoomMessageResponse {
|
||||||
id,
|
id,
|
||||||
seq,
|
seq,
|
||||||
|
|||||||
@ -19,6 +19,7 @@ use crate::connection::{
|
|||||||
};
|
};
|
||||||
use crate::error::RoomError;
|
use crate::error::RoomError;
|
||||||
use agent::chat::{AiRequest, ChatService, Mention};
|
use agent::chat::{AiRequest, ChatService, Mention};
|
||||||
|
use agent::embed::EmbedService;
|
||||||
use agent::react::ReactStep;
|
use agent::react::ReactStep;
|
||||||
use agent::TaskService;
|
use agent::TaskService;
|
||||||
use models::agent_task::AgentType;
|
use models::agent_task::AgentType;
|
||||||
@ -61,6 +62,7 @@ pub struct RoomService {
|
|||||||
pub redis_url: String,
|
pub redis_url: String,
|
||||||
pub chat_service: Option<Arc<ChatService>>,
|
pub chat_service: Option<Arc<ChatService>>,
|
||||||
pub task_service: Option<Arc<TaskService>>,
|
pub task_service: Option<Arc<TaskService>>,
|
||||||
|
pub embed_service: Option<Arc<EmbedService>>,
|
||||||
pub push_fn: Option<PushNotificationFn>,
|
pub push_fn: Option<PushNotificationFn>,
|
||||||
worker_semaphore: Arc<tokio::sync::Semaphore>,
|
worker_semaphore: Arc<tokio::sync::Semaphore>,
|
||||||
dedup_cache: DedupCache,
|
dedup_cache: DedupCache,
|
||||||
@ -78,6 +80,7 @@ impl RoomService {
|
|||||||
task_service: Option<Arc<TaskService>>,
|
task_service: Option<Arc<TaskService>>,
|
||||||
max_concurrent_workers: Option<usize>,
|
max_concurrent_workers: Option<usize>,
|
||||||
push_fn: Option<PushNotificationFn>,
|
push_fn: Option<PushNotificationFn>,
|
||||||
|
embed_service: Option<Arc<EmbedService>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let dedup_cache: DedupCache =
|
let dedup_cache: DedupCache =
|
||||||
Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default()));
|
Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default()));
|
||||||
@ -90,6 +93,7 @@ impl RoomService {
|
|||||||
redis_url,
|
redis_url,
|
||||||
chat_service,
|
chat_service,
|
||||||
task_service,
|
task_service,
|
||||||
|
embed_service,
|
||||||
worker_semaphore: Arc::new(tokio::sync::Semaphore::new(
|
worker_semaphore: Arc::new(tokio::sync::Semaphore::new(
|
||||||
max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS),
|
max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS),
|
||||||
)),
|
)),
|
||||||
@ -889,7 +893,25 @@ impl RoomService {
|
|||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?;
|
.ok_or_else(|| RoomError::NotFound("Project not found".to_string()))?;
|
||||||
|
|
||||||
let model = models::agents::model::Entity::find_by_id(ai_config.model)
|
// Parse @[ai:uuid:label] from content to allow per-mention model routing.
|
||||||
|
// If no mention is found, use the room's default model.
|
||||||
|
let mentioned_model_id = {
|
||||||
|
let mut found = None;
|
||||||
|
for cap in MENTION_BRACKET_RE.captures_iter(&content) {
|
||||||
|
if let (Some(type_m), Some(id_m)) = (cap.get(1), cap.get(2)) {
|
||||||
|
if type_m.as_str() == "ai" {
|
||||||
|
if let Ok(uuid) = Uuid::parse_str(id_m.as_str().trim()) {
|
||||||
|
found = Some(uuid);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
found
|
||||||
|
};
|
||||||
|
|
||||||
|
let model_id = mentioned_model_id.unwrap_or(ai_config.model);
|
||||||
|
let model = models::agents::model::Entity::find_by_id(model_id)
|
||||||
.one(&self.db)
|
.one(&self.db)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?;
|
.ok_or_else(|| RoomError::NotFound("AI model not found".to_string()))?;
|
||||||
@ -942,7 +964,7 @@ impl RoomService {
|
|||||||
request,
|
request,
|
||||||
room_id,
|
room_id,
|
||||||
room.project,
|
room.project,
|
||||||
ai_config.model,
|
model_id,
|
||||||
lock_guard,
|
lock_guard,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -952,7 +974,7 @@ impl RoomService {
|
|||||||
request,
|
request,
|
||||||
room_id,
|
room_id,
|
||||||
room.project,
|
room.project,
|
||||||
ai_config.model,
|
model_id,
|
||||||
lock_guard,
|
lock_guard,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -963,7 +985,7 @@ impl RoomService {
|
|||||||
request,
|
request,
|
||||||
room_id,
|
room_id,
|
||||||
room.project,
|
room.project,
|
||||||
ai_config.model,
|
model_id,
|
||||||
lock_guard,
|
lock_guard,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -973,7 +995,7 @@ impl RoomService {
|
|||||||
request,
|
request,
|
||||||
room_id,
|
room_id,
|
||||||
room.project,
|
room.project,
|
||||||
ai_config.model,
|
model_id,
|
||||||
lock_guard,
|
lock_guard,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -1002,7 +1024,8 @@ impl RoomService {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let stream_rx = self
|
// Register stream channel for real-time WebSocket broadcasting of chunks
|
||||||
|
let _ = self
|
||||||
.room_manager
|
.room_manager
|
||||||
.register_stream_channel(streaming_msg_id)
|
.register_stream_channel(streaming_msg_id)
|
||||||
.await;
|
.await;
|
||||||
@ -1066,20 +1089,7 @@ impl RoomService {
|
|||||||
let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk);
|
let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk);
|
||||||
|
|
||||||
match chat_service.process_stream(request, stream_callback).await {
|
match chat_service.process_stream(request, stream_callback).await {
|
||||||
Ok(()) => {
|
Ok(full_content) => {
|
||||||
let full_content = {
|
|
||||||
let mut rx = stream_rx;
|
|
||||||
let mut content = String::new();
|
|
||||||
while let Ok(chunk_event) = rx.recv().await {
|
|
||||||
if chunk_event.done {
|
|
||||||
content = chunk_event.content.clone();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
content = chunk_event.content.clone();
|
|
||||||
}
|
|
||||||
content
|
|
||||||
};
|
|
||||||
|
|
||||||
let envelope = RoomMessageEnvelope {
|
let envelope = RoomMessageEnvelope {
|
||||||
id: streaming_msg_id,
|
id: streaming_msg_id,
|
||||||
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
|
dedup_key: Some(format!("{}:{}", room_id_inner, streaming_msg_id)),
|
||||||
@ -1396,13 +1406,21 @@ impl RoomService {
|
|||||||
step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
step_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
let done = is_answer;
|
// Update buffers BEFORE spawning — must be synchronous so the caller
|
||||||
|
// sees the updates immediately after `process_react` returns.
|
||||||
|
{
|
||||||
|
let mut rb = reasoning_buffer.lock().unwrap();
|
||||||
|
rb.push_str(&content);
|
||||||
|
rb.push('\n');
|
||||||
|
}
|
||||||
|
if is_answer {
|
||||||
|
let mut ab = answer_buffer.lock().unwrap();
|
||||||
|
ab.push_str(&content);
|
||||||
|
}
|
||||||
|
|
||||||
|
let done = is_answer;
|
||||||
let ai_name = ai_display_name_for_step.clone();
|
let ai_name = ai_display_name_for_step.clone();
|
||||||
let reasoning_buf = reasoning_buffer.clone();
|
|
||||||
let answer_buf = answer_buffer.clone();
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Always broadcast every step as a stream chunk
|
|
||||||
let event = RoomMessageStreamChunkEvent {
|
let event = RoomMessageStreamChunkEvent {
|
||||||
message_id: streaming_msg_id,
|
message_id: streaming_msg_id,
|
||||||
room_id,
|
room_id,
|
||||||
@ -1417,15 +1435,6 @@ impl RoomService {
|
|||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
room_manager.broadcast_stream_chunk(event).await;
|
room_manager.broadcast_stream_chunk(event).await;
|
||||||
|
|
||||||
// Collect all steps into reasoning_buffer; Answer goes to answer_buffer
|
|
||||||
let mut rb = reasoning_buf.lock().unwrap();
|
|
||||||
rb.push_str(&content);
|
|
||||||
rb.push('\n');
|
|
||||||
drop(rb);
|
|
||||||
if is_answer {
|
|
||||||
answer_buf.lock().unwrap().push_str(&content);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user