feat(room): batch-embed all room messages into Qdrant on persist

- make_persist_fn now accepts embed_service, collects persisted text messages
- Filters non-text, non-empty, non-system/tool messages
- Groups by room→project_name, batch-embeds via embed_memories_batch
- Removes old per-message synchronous embed_memory call
- Workers thread embed_service through to persist_fn
This commit is contained in:
ZhenYi 2026-04-28 13:03:59 +08:00
parent 026f5cf32d
commit 93ec515f29
4 changed files with 117 additions and 34 deletions

View File

@ -788,12 +788,16 @@ pub fn make_persist_fn(
db: AppDatabase,
metrics: Arc<RoomMetrics>,
dedup_cache: DedupCache,
embed_service: Option<Arc<agent::embed::EmbedService>>,
) -> PersistFn {
Arc::new(move |envelopes: Vec<RoomMessageEnvelope>| {
let db = db.clone();
let metrics = metrics.clone();
let cache = dedup_cache.clone();
let embed = embed_service.clone();
Box::pin(async move {
let mut persisted: Vec<RoomMessageEnvelope> = Vec::new();
for chunk in envelopes.chunks(BATCH_SIZE) {
let mut models_to_insert = Vec::new();
let mut ids_to_dedup: Vec<uuid::Uuid> = Vec::new();
@ -885,13 +889,115 @@ pub fn make_persist_fn(
}
metrics.messages_persisted.increment(count);
// Collect persisted messages for Qdrant embedding
for env in chunk {
if existing_ids.contains(&env.id) {
continue;
}
persisted.push(env.clone());
}
}
}
// Batch-embed text messages into Qdrant (non-blocking, fire-and-forget)
if let Some(embed) = embed {
if !persisted.is_empty() {
let embed_db = db.clone();
tokio::spawn(async move {
embed_persisted_messages(embed, embed_db, persisted).await;
});
}
}
Ok(())
})
})
}
/// Filter and batch-embed persisted messages into Qdrant per-room collections.
/// Only embeds text-type, non-empty messages (filters system/tool/non-text).
async fn embed_persisted_messages(
embed: Arc<agent::embed::EmbedService>,
db: AppDatabase,
messages: Vec<RoomMessageEnvelope>,
) {
// Filter: only text content, non-empty, skip system messages
let to_embed: Vec<&RoomMessageEnvelope> = messages
.iter()
.filter(|m| {
m.content_type == "text"
&& !m.content.trim().is_empty()
&& m.sender_type != "system"
&& m.sender_type != "tool"
})
.collect();
if to_embed.is_empty() {
return;
}
// Batch-lookup room → project_id → project_name
let room_ids: Vec<Uuid> = to_embed.iter().map(|m| m.room_id).collect();
let rooms = match models::rooms::room::Entity::find()
.filter(models::rooms::room::Column::Id.is_in(room_ids.clone()))
.all(&db)
.await
{
Ok(r) => r,
Err(e) => {
tracing::warn!(error = %e, "embed: failed to lookup rooms");
return;
}
};
let project_ids: Vec<Uuid> = rooms.iter().map(|r| r.project).collect();
let projects = match models::projects::project::Entity::find()
.filter(models::projects::project::Column::Id.is_in(project_ids))
.all(&db)
.await
{
Ok(p) => p,
Err(e) => {
tracing::warn!(error = %e, "embed: failed to lookup projects");
return;
}
};
// Build room_id → project_name map
use std::collections::HashMap;
let mut room_project: HashMap<Uuid, String> = HashMap::new();
for room in &rooms {
if let Some(proj) = projects.iter().find(|p| p.id == room.project) {
room_project.insert(room.id, proj.display_name.clone());
}
}
// Build EmbedMemoryInput list
let inputs: Vec<agent::embed::EmbedMemoryInput> = to_embed
.into_iter()
.filter_map(|m| {
let project_name = room_project.get(&m.room_id)?;
Some(agent::embed::EmbedMemoryInput {
message_id: m.id.to_string(),
content: m.content.clone(),
project_name: project_name.clone(),
room_id: m.room_id.to_string(),
user_id: m.sender_id.map(|id| id.to_string()),
sender_type: m.sender_type.clone(),
})
})
.collect();
if inputs.is_empty() {
return;
}
if let Err(e) = embed.embed_memories_batch(inputs).await {
tracing::warn!(error = %e, "batch memory embed failed");
}
}
pub type RedisFuture =
Pin<Box<dyn Future<Output = anyhow::Result<deadpool_redis::cluster::Connection>> + Send>>;

View File

@ -308,38 +308,6 @@ impl RoomService {
}
}
// Embed user messages into Qdrant for vector memory search (non-blocking)
if is_text_message {
let embed_service = self.embed_service.clone();
let embed_content = content.clone();
let embed_room_id = room_id;
let embed_message_id = id;
let embed_user_id = user_id;
let embed_db = self.db.clone();
let embed_project_id = project_id;
tokio::spawn(async move {
if let Some(embed) = embed_service {
// Look up project name for the Qdrant collection namespace
let project_name = match models::projects::project::Entity::find_by_id(embed_project_id)
.one(&embed_db)
.await
{
Ok(Some(p)) => p.display_name,
_ => return,
};
let _ = embed
.embed_memory(
&embed_message_id.to_string(),
&embed_content,
&project_name,
&embed_room_id.to_string(),
Some(&embed_user_id.to_string()),
)
.await;
}
});
}
Ok(super::RoomMessageResponse {
id,
seq,

View File

@ -1,5 +1,4 @@
mod access;
mod billing;
mod ai_common;
mod ai_nonstreaming;
mod ai_react_nonstreaming;
@ -110,6 +109,7 @@ impl RoomService {
self.task_service.clone(),
None, // max_concurrent_workers handled by semaphore
shutdown_rx,
self.embed_service.clone(),
)
.await
}
@ -152,6 +152,7 @@ impl RoomService {
self.queue.clone(),
self.redis_url.clone(),
self.worker_semaphore.clone(),
self.embed_service.clone(),
);
}

View File

@ -26,6 +26,7 @@ pub async fn start_workers(
_task_service: Option<Arc<agent::TaskService>>,
_max_concurrent_workers: Option<usize>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
embed_service: Option<Arc<agent::embed::EmbedService>>,
) -> anyhow::Result<()> {
// Load rooms with a reasonable cap to prevent resource exhaustion on large instances.
// Rooms beyond this limit will be activated on-demand when first accessed.
@ -50,7 +51,12 @@ pub async fn start_workers(
"starting room workers"
);
let persist_fn: PersistFn = make_persist_fn(db.clone(), room_manager.metrics.clone(), dedup_cache.clone());
let persist_fn: PersistFn = make_persist_fn(
db.clone(),
room_manager.metrics.clone(),
dedup_cache.clone(),
embed_service.clone(),
);
let get_redis: Arc<dyn Fn() -> queue::worker::RedisFuture + Send + Sync> =
extract_get_redis(queue.clone());
@ -272,6 +278,7 @@ pub fn spawn_room_workers(
queue: MessageProducer,
redis_url: String,
worker_semaphore: Arc<tokio::sync::Semaphore>,
embed_service: Option<Arc<agent::embed::EmbedService>>,
) {
let persist_fn: PersistFn = make_persist_fn(
db.clone(),
@ -282,6 +289,7 @@ pub fn spawn_room_workers(
Default::default(),
),
),
embed_service,
);
let get_redis: Arc<dyn Fn() -> queue::worker::RedisFuture + Send + Sync> =
extract_get_redis(queue.clone());