diff --git a/libs/agent/embed/client.rs b/libs/agent/embed/client.rs index eee5166..b00f21b 100644 --- a/libs/agent/embed/client.rs +++ b/libs/agent/embed/client.rs @@ -72,6 +72,15 @@ impl EmbedClient { self.qdrant.upsert_points(points).await } + /// Upsert points into a named collection (bypasses entity_type routing). + pub async fn upsert_to_collection( + &self, + collection_name: &str, + points: Vec, + ) -> crate::Result<()> { + self.qdrant.upsert_to_collection(collection_name, points).await + } + pub async fn search( &self, query: &str, @@ -113,6 +122,18 @@ impl EmbedClient { self.qdrant.ensure_skill_collection(dimensions).await } + /// Ensure a room-specific memory collection exists. + pub async fn ensure_room_memory_collection( + &self, + project_name: &str, + room_id: &str, + dimensions: u64, + ) -> crate::Result<()> { + self.qdrant + .ensure_room_memory_collection(project_name, room_id, dimensions) + .await + } + /// Embed and store a conversation memory (message) in Qdrant. /// Uses per-room collection: `room:{project_name}:{room_id}`. pub async fn embed_memory( diff --git a/libs/agent/embed/mod.rs b/libs/agent/embed/mod.rs index 69e2eb7..e074961 100644 --- a/libs/agent/embed/mod.rs +++ b/libs/agent/embed/mod.rs @@ -4,7 +4,7 @@ pub mod service; pub use client::{EmbedClient, EmbedPayload, EmbedVector, SearchResult}; pub use qdrant::QdrantClient; -pub use service::{EmbedService, Embeddable}; +pub use service::{EmbedMemoryInput, EmbedService, Embeddable, TagEmbedInput}; pub async fn new_embed_client(config: &config::AppConfig) -> crate::Result { let base_url = config diff --git a/libs/agent/embed/service.rs b/libs/agent/embed/service.rs index 7787477..b4f58d7 100644 --- a/libs/agent/embed/service.rs +++ b/libs/agent/embed/service.rs @@ -5,6 +5,12 @@ use std::sync::Arc; use super::client::{EmbedClient, EmbedPayload, EmbedVector, SearchResult}; +/// Maximum characters per chunk for embedding (approximates token limit). 
/// The chunk budget is counted in `char`s (Unicode scalar values), not bytes.
/// text-embedding-3-small: 8192 token limit.
/// For CJK ~1 char/token, for English ~4 chars/token.
/// Conservative limit: 7000 chars to leave room for all languages.
const MAX_CHUNK_CHARS: usize = 7000;

/// Split `text` into chunks of at most [`MAX_CHUNK_CHARS`] characters.
///
/// Each chunk ends at the best natural boundary inside its window: a blank
/// line, else a newline, else the latest sentence end (`". "`, `"! "`, `"? "`).
/// When no usable boundary exists the chunk is cut at the character limit.
///
/// Guarantees:
/// * at least one chunk is returned, even for empty text;
/// * concatenating the chunks reproduces `text` exactly;
/// * every cut falls on a `char` boundary, so multi-byte text is safe.
fn chunk_text(text: &str) -> Vec<String> {
    let mut chunks = Vec::new();
    let mut rest = text;

    loop {
        // Byte offset of the first character that no longer fits in this
        // chunk. `None` means the remainder fits entirely and is the last chunk.
        let window_end = match rest.char_indices().nth(MAX_CHUNK_CHARS) {
            Some((offset, _)) => offset,
            None => {
                chunks.push(rest.to_string());
                return chunks;
            }
        };

        let window = &rest[..window_end];
        // Fall back to a hard cut at the character limit.
        let cut = find_chunk_break(window).unwrap_or(window_end);
        chunks.push(rest[..cut].to_string());
        rest = &rest[cut..];
    }
}

/// Returns the byte offset just past the preferred break point in `window`,
/// or `None` when the window should be hard-cut.
///
/// Priority is paragraph break, then newline, then sentence end. The returned
/// offset always follows an ASCII delimiter, so it is a valid `char` boundary.
fn find_chunk_break(window: &str) -> Option<usize> {
    // A boundary in the first half of the window would yield a degenerate
    // chunk (down to a bare "\n\n"), so such candidates are ignored.
    let min_end = window.len() / 2;
    let accept = |end: usize| if end >= min_end { Some(end) } else { None };

    if let Some(end) = window.rfind("\n\n").and_then(|pos| accept(pos + 2)) {
        return Some(end);
    }
    if let Some(end) = window.rfind('\n').and_then(|pos| accept(pos + 1)) {
        return Some(end);
    }

    // Take the latest sentence end of any kind, and cut right after the
    // punctuation so the following space starts the next chunk.
    [". ", "! ", "? "]
        .iter()
        .filter_map(|pat| window.rfind(*pat))
        .max()
        .and_then(|pos| accept(pos + 1))
}
self.client.upsert(points).await + } + } + + /// Embed an issue with auto-chunking for long content. + pub async fn embed_issue_chunked( + &self, + id: &str, + title: &str, + body: Option<&str>, + ) -> crate::Result<()> { + let text = match body { + Some(b) if !b.is_empty() => format!("{}\n\n{}", title, b), + _ => title.to_string(), + }; + + let chunks = chunk_text(&text); + if chunks.len() == 1 { + return self.embed_issue(id, title, body).await; + } + + let embeddings = self.client.embed_batch(&chunks, &self.model_name).await?; + + let points: Vec = embeddings.into_iter().enumerate().map(|(i, vector)| { + EmbedVector { + id: format!("{}:chunk:{}", id, i), + vector, + payload: EmbedPayload { + entity_type: "issue".to_string(), + entity_id: id.to_string(), + text: chunks[i].clone(), + extra: serde_json::json!({ + "chunk_index": i, + "total_chunks": chunks.len(), + }).into(), + }, + } + }).collect(); + + self.client.upsert(points).await + } + + /// Batch-embed multiple conversation messages into per-room Qdrant collections. + /// Auto-chunks long messages and filters non-text/system/empty content. + /// Handles all filtering internally: only text-type, non-empty, non-system messages are embedded. 
+ pub async fn embed_memories_batch( + &self, + messages: Vec, + ) -> crate::Result<()> { + if messages.is_empty() { + return Ok(()); + } + + // Group by room collection for batch upsert to reduce Qdrant calls + use std::collections::HashMap; + let mut by_room: HashMap)>> = HashMap::new(); + + for msg in messages { + let chunks = chunk_text(&msg.content); + if chunks.is_empty() || chunks.iter().all(|c| c.trim().is_empty()) { + continue; + } + let collection = crate::embed::qdrant::QdrantClient::room_memory_collection_name( + &msg.project_name, &msg.room_id, + ); + by_room.entry(collection).or_default().push((msg, chunks)); + } + + for (collection, entries) in &by_room { + // Collect all texts for batch embedding + let all_texts: Vec = entries.iter() + .flat_map(|(_, chunks)| chunks.iter().cloned()) + .collect(); + + if all_texts.is_empty() { + continue; + } + + let embeddings = self.client.embed_batch(&all_texts, &self.model_name).await?; + + // Ensure the room collection exists with correct dimensions + if let Some((first, _)) = entries.first() { + let _ = self.client + .ensure_room_memory_collection(&first.project_name, &first.room_id, self.dimensions) + .await; + } + + // Build points: one per chunk + let mut points = Vec::new(); + let mut embed_idx = 0; + for (msg, chunks) in entries { + for (chunk_i, chunk) in chunks.iter().enumerate() { + if embed_idx >= embeddings.len() { + break; + } + let point_id = if chunks.len() == 1 { + msg.message_id.clone() + } else { + format!("{}:chunk:{}", msg.message_id, chunk_i) + }; + points.push(EmbedVector { + id: point_id, + vector: embeddings[embed_idx].clone(), + payload: EmbedPayload { + entity_type: "memory".to_string(), + entity_id: msg.room_id.clone(), + text: chunk.clone(), + extra: serde_json::json!({ + "user_id": msg.user_id, + "sender_type": msg.sender_type, + "chunk_index": if chunks.len() > 1 { Some(chunk_i) } else { None }, + "total_chunks": if chunks.len() > 1 { Some(chunks.len()) } else { None }, + }).into(), 
+ }, + }); + embed_idx += 1; + } + } + + if let Err(e) = self.client.upsert_to_collection(collection, points).await { + tracing::warn!(collection = %collection, error = %e, "batch memory embed failed"); + } + } + + Ok(()) + } + + /// Batch-embed repo tags with project isolation. + /// Each tag stores project_id as entity_id for post-filtering. + pub async fn embed_tags_batch( + &self, + tags: Vec, + ) -> crate::Result<()> { + if tags.is_empty() { + return Ok(()); + } + + let texts: Vec = tags + .iter() + .map(|t| { + if let Some(ref desc) = t.description { + if !desc.is_empty() { + format!("{}: {}", t.name, desc) + } else { + t.name.clone() + } + } else { + t.name.clone() + } + }) + .collect(); + + let embeddings = self.client.embed_batch(&texts, &self.model_name).await?; + + let points: Vec = tags + .into_iter() + .zip(embeddings.into_iter()) + .map(|(tag, vector)| { + let point_id = format!("{}:{}", tag.repo_id, tag.name); + EmbedVector { + id: point_id, + vector, + payload: EmbedPayload { + entity_type: "repo_tag".to_string(), + entity_id: tag.project_id.clone(), + text: tag.name.clone(), + extra: serde_json::json!({ + "repo_id": tag.repo_id, + "repo_name": tag.repo_name, + "tag_name": tag.name, + "description": tag.description, + }) + .into(), + }, + } + }) + .collect(); + + self.client.upsert(points).await + } + + /// Search repo tags by semantic similarity within a project. + /// Filters by project_id (stored in entity_id) for project isolation. 
+ pub async fn search_tags( + &self, + query: &str, + project_id: &str, + limit: usize, + ) -> crate::Result> { + let mut results = self + .client + .search(query, "repo_tag", &self.model_name, limit + 1) + .await?; + results.retain(|r| r.payload.entity_id == project_id); + results.truncate(limit); + Ok(results) + } + + pub fn model_name(&self) -> &str { + &self.model_name + } + + pub fn dimensions(&self) -> u64 { + self.dimensions + } + + pub fn embed_client(&self) -> &EmbedClient { + &self.client } /// Search skills by semantic similarity within a project. @@ -232,3 +542,24 @@ impl EmbedService { .await } } + +/// Input struct for batch memory embedding into per-room Qdrant collections. +#[derive(Debug, Clone)] +pub struct EmbedMemoryInput { + pub message_id: String, + pub content: String, + pub project_name: String, + pub room_id: String, + pub user_id: Option, + pub sender_type: String, +} + +/// Input struct for batch tag embedding. +#[derive(Debug, Clone)] +pub struct TagEmbedInput { + pub repo_id: String, + pub repo_name: String, + pub project_id: String, + pub name: String, + pub description: Option, +} diff --git a/libs/agent/lib.rs b/libs/agent/lib.rs index 34c4c94..9c224fa 100644 --- a/libs/agent/lib.rs +++ b/libs/agent/lib.rs @@ -29,7 +29,9 @@ pub use chat::{ pub use client::{AiCallResponse, AiClientConfig, call_with_params, call_with_retry}; pub use client::types::ChatRequestMessage; pub use compact::{CompactConfig, CompactLevel, CompactService, CompactSummary, MessageSummary}; -pub use embed::{new_embed_client, EmbedClient, EmbedService, QdrantClient, SearchResult}; +pub use embed::{ + EmbedClient, EmbedMemoryInput, EmbedService, QdrantClient, SearchResult, TagEmbedInput, new_embed_client, +}; pub use error::{AgentError, Result}; pub use react::{ReactConfig, ReactStep, DEFAULT_SYSTEM_PROMPT}; pub use tool::{