diff --git a/libs/room/src/connection.rs b/libs/room/src/connection.rs
index a20dca9..fdb8692 100644
--- a/libs/room/src/connection.rs
+++ b/libs/room/src/connection.rs
@@ -788,12 +788,16 @@ pub fn make_persist_fn(
     db: AppDatabase,
     metrics: Arc,
     dedup_cache: DedupCache,
+    embed_service: Option>,
 ) -> PersistFn {
     Arc::new(move |envelopes: Vec| {
         let db = db.clone();
         let metrics = metrics.clone();
         let cache = dedup_cache.clone();
+        let embed = embed_service.clone();
         Box::pin(async move {
+            let mut persisted: Vec = Vec::new();
+
             for chunk in envelopes.chunks(BATCH_SIZE) {
                 let mut models_to_insert = Vec::new();
                 let mut ids_to_dedup: Vec = Vec::new();
@@ -885,13 +889,131 @@ pub fn make_persist_fn(
                     }
 
                     metrics.messages_persisted.increment(count);
+
+                    // Collect newly persisted messages for Qdrant embedding. Skipped when
+                    // no embed service is configured, so nothing is cloned needlessly.
+                    if embed.is_some() {
+                        persisted.extend(
+                            chunk
+                                .iter()
+                                .filter(|env| !existing_ids.contains(&env.id))
+                                .cloned(),
+                        );
+                    }
                 }
             }
+
+            // Batch-embed text messages into Qdrant (non-blocking, fire-and-forget)
+            if let Some(embed) = embed {
+                if !persisted.is_empty() {
+                    let embed_db = db.clone();
+                    tokio::spawn(async move {
+                        embed_persisted_messages(embed, embed_db, persisted).await;
+                    });
+                }
+            }
+
             Ok(())
         })
     })
 }
 
+/// Embed newly persisted messages into Qdrant.
+///
+/// Only messages whose `content_type` is `"text"`, whose content is not blank and
+/// whose `sender_type` is neither `"system"` nor `"tool"` are embedded. Each one is
+/// paired with the `display_name` of its room's project, and the whole set is sent
+/// in a single `embed_memories_batch` call.
+///
+/// Best-effort: database and embedding errors are logged with `tracing::warn!` and
+/// swallowed, and messages whose room or project row cannot be found are skipped.
+async fn embed_persisted_messages(
+    embed: Arc,
+    db: AppDatabase,
+    messages: Vec,
+) {
+    use std::collections::{HashMap, HashSet};
+
+    // Filter: only text content, non-empty, skip system and tool messages
+    let to_embed: Vec<&RoomMessageEnvelope> = messages
+        .iter()
+        .filter(|m| {
+            m.content_type == "text"
+                && !m.content.trim().is_empty()
+                && m.sender_type != "system"
+                && m.sender_type != "tool"
+        })
+        .collect();
+
+    if to_embed.is_empty() {
+        return;
+    }
+
+    // Batch-lookup room → project_id → project_name. Many messages in one batch
+    // share a room, so collapse the ids into a set first: the `IN (...)` list then
+    // scales with the number of rooms rather than the number of messages.
+    let room_ids: HashSet<_> = to_embed.iter().map(|m| m.room_id).collect();
+    let rooms = match models::rooms::room::Entity::find()
+        .filter(models::rooms::room::Column::Id.is_in(room_ids))
+        .all(&db)
+        .await
+    {
+        Ok(r) => r,
+        Err(e) => {
+            tracing::warn!(error = %e, "embed: failed to lookup rooms");
+            return;
+        }
+    };
+
+    let project_ids: Vec<_> = rooms.iter().map(|r| r.project).collect();
+    let projects = match models::projects::project::Entity::find()
+        .filter(models::projects::project::Column::Id.is_in(project_ids))
+        .all(&db)
+        .await
+    {
+        Ok(p) => p,
+        Err(e) => {
+            tracing::warn!(error = %e, "embed: failed to lookup projects");
+            return;
+        }
+    };
+
+    // Build room_id → project_name map; rooms whose project row is missing are left out
+    let room_project: HashMap<_, _> = rooms
+        .iter()
+        .filter_map(|room| {
+            projects
+                .iter()
+                .find(|p| p.id == room.project)
+                .map(|proj| (room.id, proj.display_name.clone()))
+        })
+        .collect();
+
+    // Build EmbedMemoryInput list
+    let inputs: Vec<_> = to_embed
+        .into_iter()
+        .filter_map(|m| {
+            let project_name = room_project.get(&m.room_id)?;
+            Some(agent::embed::EmbedMemoryInput {
+                message_id: m.id.to_string(),
+                content: m.content.clone(),
+                project_name: project_name.clone(),
+                room_id: m.room_id.to_string(),
+                user_id: m.sender_id.map(|id| id.to_string()),
+                sender_type: m.sender_type.clone(),
+            })
+        })
+        .collect();
+
+    if inputs.is_empty() {
+        return;
+    }
+
+    if let Err(e) = embed.embed_memories_batch(inputs).await {
+        tracing::warn!(error = %e, "batch memory embed failed");
+    }
+}
+
 pub type RedisFuture =
     Pin> + Send>>;
 
diff --git a/libs/room/src/message.rs b/libs/room/src/message.rs
index 1ec8f5b..8f1952c 100644
--- a/libs/room/src/message.rs
+++ b/libs/room/src/message.rs
@@ -308,38 +308,6 @@ impl RoomService {
             }
         }
 
-        // Embed user messages into Qdrant for vector memory search (non-blocking)
-        if is_text_message {
-            let embed_service = self.embed_service.clone();
-            let embed_content = content.clone();
-            let embed_room_id = room_id;
-            let embed_message_id = id;
-            let embed_user_id = user_id;
-            let embed_db = self.db.clone();
-            let embed_project_id = project_id;
-            tokio::spawn(async move {
-                if let Some(embed) = embed_service {
-                    // Look up project name for the Qdrant collection namespace
-                    let project_name = match models::projects::project::Entity::find_by_id(embed_project_id)
-                        .one(&embed_db)
-                        .await
-                    {
-                        Ok(Some(p)) => p.display_name,
-                        _ => return,
-                    };
-                    let _ = embed
-                        .embed_memory(
-                            &embed_message_id.to_string(),
-                            &embed_content,
-                            &project_name,
-                            &embed_room_id.to_string(),
-                            Some(&embed_user_id.to_string()),
-                        )
-                        .await;
-                }
-            });
-        }
-
         Ok(super::RoomMessageResponse {
             id,
             seq,
diff --git a/libs/room/src/service/mod.rs b/libs/room/src/service/mod.rs
index 21ff430..ac2f8d6 100644
--- a/libs/room/src/service/mod.rs
+++ b/libs/room/src/service/mod.rs
@@ -1,5 +1,4 @@
 mod access;
-mod billing;
 mod ai_common;
 mod ai_nonstreaming;
 mod ai_react_nonstreaming;
@@ -110,6 +109,7 @@ impl RoomService {
             self.task_service.clone(),
             None, // max_concurrent_workers handled by semaphore
             shutdown_rx,
+            self.embed_service.clone(),
         )
         .await
     }
@@ -152,6 +152,7 @@ impl RoomService {
             self.queue.clone(),
             self.redis_url.clone(),
             self.worker_semaphore.clone(),
+            self.embed_service.clone(),
         );
     }
 
diff --git a/libs/room/src/service/workers.rs b/libs/room/src/service/workers.rs
index 74baa17..44e4602 100644
--- a/libs/room/src/service/workers.rs
+++ b/libs/room/src/service/workers.rs
@@ -26,6 +26,7 @@ pub async fn start_workers(
     _task_service: Option>,
     _max_concurrent_workers: Option,
     mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
+    embed_service: Option>,
 ) -> anyhow::Result<()> {
     // Load rooms with a reasonable cap to prevent resource exhaustion on large instances.
     // Rooms beyond this limit will be activated on-demand when first accessed.
@@ -50,7 +51,12 @@ pub async fn start_workers(
         "starting room workers"
     );
 
-    let persist_fn: PersistFn = make_persist_fn(db.clone(), room_manager.metrics.clone(), dedup_cache.clone());
+    let persist_fn: PersistFn = make_persist_fn(
+        db.clone(),
+        room_manager.metrics.clone(),
+        dedup_cache.clone(),
+        embed_service.clone(),
+    );
     let get_redis: Arc queue::worker::RedisFuture + Send + Sync> =
         extract_get_redis(queue.clone());
 
@@ -272,6 +278,7 @@ pub fn spawn_room_workers(
     queue: MessageProducer,
     redis_url: String,
     worker_semaphore: Arc,
+    embed_service: Option>,
 ) {
     let persist_fn: PersistFn = make_persist_fn(
         db.clone(),
@@ -282,6 +289,7 @@ pub fn spawn_room_workers(
                 Default::default(),
             ),
         ),
+        embed_service,
     );
     let get_redis: Arc queue::worker::RedisFuture + Send + Sync> =
         extract_get_redis(queue.clone());