feat(embed): add chunked embedding, batch memory embed, and tag vectorization support

- chunk_text(): char-boundary-safe text chunking at paragraph/sentence breaks (7000 char limit)
- embed_memories_batch(): groups messages by room, batch-embeds all texts to reduce Qdrant calls
- embed_issue_chunked(): auto-chunks long issue bodies
- embed_skill(): upgraded with auto-chunking via chunk_text
- TagEmbedInput struct for batch tag embedding
- embed_tags_batch() / search_tags() with project isolation
- ensure_collections() now creates embed_repo_tag collection
This commit is contained in:
ZhenYi 2026-04-28 13:03:51 +08:00
parent 32d7b3b902
commit bfdb934443
4 changed files with 359 additions and 5 deletions

View File

@ -72,6 +72,15 @@ impl EmbedClient {
self.qdrant.upsert_points(points).await
}
/// Upsert points into a named collection (bypasses entity_type routing).
pub async fn upsert_to_collection(
&self,
collection_name: &str,
points: Vec<EmbedVector>,
) -> crate::Result<()> {
self.qdrant.upsert_to_collection(collection_name, points).await
}
pub async fn search(
&self,
query: &str,
@ -113,6 +122,18 @@ impl EmbedClient {
self.qdrant.ensure_skill_collection(dimensions).await
}
/// Ensure a room-specific memory collection exists.
pub async fn ensure_room_memory_collection(
&self,
project_name: &str,
room_id: &str,
dimensions: u64,
) -> crate::Result<()> {
self.qdrant
.ensure_room_memory_collection(project_name, room_id, dimensions)
.await
}
/// Embed and store a conversation memory (message) in Qdrant.
/// Uses per-room collection: `room:{project_name}:{room_id}`.
pub async fn embed_memory(

View File

@ -4,7 +4,7 @@ pub mod service;
pub use client::{EmbedClient, EmbedPayload, EmbedVector, SearchResult};
pub use qdrant::QdrantClient;
pub use service::{EmbedService, Embeddable};
pub use service::{EmbedMemoryInput, EmbedService, Embeddable, TagEmbedInput};
pub async fn new_embed_client(config: &config::AppConfig) -> crate::Result<EmbedClient> {
let base_url = config

View File

@ -5,6 +5,12 @@ use std::sync::Arc;
use super::client::{EmbedClient, EmbedPayload, EmbedVector, SearchResult};
/// Maximum characters per chunk for embedding (approximates token limit).
/// text-embedding-3-small: 8192 token limit.
/// For CJK ~1 char/token, for English ~4 chars/token.
/// Conservative limit: 7000 chars to leave room for all languages.
const MAX_CHUNK_CHARS: usize = 7000;
#[async_trait]
pub trait Embeddable {
fn entity_type(&self) -> &'static str;
@ -12,6 +18,72 @@ pub trait Embeddable {
fn entity_id(&self) -> String;
}
/// Split long text into chunks at paragraph/sentence boundaries.
/// Returns at least one chunk even for empty text.
/// Safe for multi-byte characters (uses char indices, not byte indices).
fn chunk_text(text: &str) -> Vec<String> {
if text.is_empty() {
return vec![String::new()];
}
if text.len() <= MAX_CHUNK_CHARS {
return vec![text.to_string()];
}
// Collect char boundary byte positions
let char_indices: Vec<usize> = text.char_indices().map(|(i, _)| i).collect();
let total_chars = char_indices.len();
let mut chunks = Vec::new();
let mut start_idx = 0; // char index
while start_idx < total_chars {
// Start byte offset
let byte_start = char_indices[start_idx];
// Find end char index: at most MAX_CHUNK_CHARS characters
let end_char_idx = (start_idx + MAX_CHUNK_CHARS).min(total_chars);
let byte_end_candidate = char_indices[end_char_idx - 1] + text[char_indices[end_char_idx - 1]..].chars().next().map(|c| c.len_utf8()).unwrap_or(1);
if end_char_idx >= total_chars {
chunks.push(text[byte_start..].to_string());
break;
}
// Try to break at paragraph or sentence boundary in the allowed range
let search_range = &text[byte_start..byte_end_candidate];
let break_at = if let Some(pos) = search_range.rfind("\n\n") {
Some(pos + 2) // after the paragraph break
} else if let Some(pos) = search_range.rfind('\n') {
Some(pos + 1)
} else if let Some(pos) = search_range.rfind(". ") {
Some(pos + 1)
} else if let Some(pos) = search_range.rfind("! ") {
Some(pos + 1)
} else if let Some(pos) = search_range.rfind("? ") {
Some(pos + 1)
} else {
None
};
if let Some(offset) = break_at {
let byte_end = byte_start + offset;
chunks.push(text[byte_start..byte_end].to_string());
// Advance char index to match the byte break
let mut advance = start_idx + 1;
while advance < total_chars && char_indices[advance] < byte_end {
advance += 1;
}
start_idx = advance;
} else {
// Hard break at char boundary
chunks.push(text[byte_start..byte_end_candidate].to_string());
start_idx = end_char_idx;
}
}
chunks
}
#[derive(Clone)]
pub struct EmbedService {
client: Arc<EmbedClient>,
@ -165,6 +237,9 @@ impl EmbedService {
.ensure_collection("repo", self.dimensions)
.await?;
self.client.ensure_skill_collection(self.dimensions).await?;
self.client
.ensure_collection("repo_tag", self.dimensions)
.await?;
// Room memory collections are created per-room on first embed
Ok(())
}
@ -188,9 +263,244 @@ impl EmbedService {
) -> crate::Result<()> {
let desc = description.unwrap_or_default();
let id = skill_id.to_string();
self.client
.embed_skill(&id, name, desc, content, project_uuid, &self.model_name)
.await
// Auto-chunk long content
let texts = chunk_text(content);
if texts.len() == 1 {
self.client
.embed_skill(&id, name, desc, content, project_uuid, &self.model_name)
.await
} else {
// Multi-chunk: embed each chunk with chunk_index metadata
let full_texts: Vec<String> = texts.iter().map(|t| format!("{}: {} {}", name, desc, t)).collect();
let embeddings = self.client.embed_batch(&full_texts, &self.model_name).await?;
let points: Vec<EmbedVector> = embeddings.into_iter().enumerate().map(|(i, vector)| {
EmbedVector {
id: format!("{}:chunk:{}", id, i),
vector,
payload: EmbedPayload {
entity_type: "skill".to_string(),
entity_id: project_uuid.to_string(),
text: texts[i].clone(),
extra: serde_json::json!({
"name": name,
"description": desc,
"chunk_index": i,
"total_chunks": texts.len(),
}).into(),
},
}
}).collect();
self.client.upsert(points).await
}
}
/// Embed an issue with auto-chunking for long content.
pub async fn embed_issue_chunked(
&self,
id: &str,
title: &str,
body: Option<&str>,
) -> crate::Result<()> {
let text = match body {
Some(b) if !b.is_empty() => format!("{}\n\n{}", title, b),
_ => title.to_string(),
};
let chunks = chunk_text(&text);
if chunks.len() == 1 {
return self.embed_issue(id, title, body).await;
}
let embeddings = self.client.embed_batch(&chunks, &self.model_name).await?;
let points: Vec<EmbedVector> = embeddings.into_iter().enumerate().map(|(i, vector)| {
EmbedVector {
id: format!("{}:chunk:{}", id, i),
vector,
payload: EmbedPayload {
entity_type: "issue".to_string(),
entity_id: id.to_string(),
text: chunks[i].clone(),
extra: serde_json::json!({
"chunk_index": i,
"total_chunks": chunks.len(),
}).into(),
},
}
}).collect();
self.client.upsert(points).await
}
/// Batch-embed multiple conversation messages into per-room Qdrant collections.
/// Auto-chunks long messages and filters non-text/system/empty content.
/// Handles all filtering internally: only text-type, non-empty, non-system messages are embedded.
pub async fn embed_memories_batch(
&self,
messages: Vec<EmbedMemoryInput>,
) -> crate::Result<()> {
if messages.is_empty() {
return Ok(());
}
// Group by room collection for batch upsert to reduce Qdrant calls
use std::collections::HashMap;
let mut by_room: HashMap<String, Vec<(EmbedMemoryInput, Vec<String>)>> = HashMap::new();
for msg in messages {
let chunks = chunk_text(&msg.content);
if chunks.is_empty() || chunks.iter().all(|c| c.trim().is_empty()) {
continue;
}
let collection = crate::embed::qdrant::QdrantClient::room_memory_collection_name(
&msg.project_name, &msg.room_id,
);
by_room.entry(collection).or_default().push((msg, chunks));
}
for (collection, entries) in &by_room {
// Collect all texts for batch embedding
let all_texts: Vec<String> = entries.iter()
.flat_map(|(_, chunks)| chunks.iter().cloned())
.collect();
if all_texts.is_empty() {
continue;
}
let embeddings = self.client.embed_batch(&all_texts, &self.model_name).await?;
// Ensure the room collection exists with correct dimensions
if let Some((first, _)) = entries.first() {
let _ = self.client
.ensure_room_memory_collection(&first.project_name, &first.room_id, self.dimensions)
.await;
}
// Build points: one per chunk
let mut points = Vec::new();
let mut embed_idx = 0;
for (msg, chunks) in entries {
for (chunk_i, chunk) in chunks.iter().enumerate() {
if embed_idx >= embeddings.len() {
break;
}
let point_id = if chunks.len() == 1 {
msg.message_id.clone()
} else {
format!("{}:chunk:{}", msg.message_id, chunk_i)
};
points.push(EmbedVector {
id: point_id,
vector: embeddings[embed_idx].clone(),
payload: EmbedPayload {
entity_type: "memory".to_string(),
entity_id: msg.room_id.clone(),
text: chunk.clone(),
extra: serde_json::json!({
"user_id": msg.user_id,
"sender_type": msg.sender_type,
"chunk_index": if chunks.len() > 1 { Some(chunk_i) } else { None },
"total_chunks": if chunks.len() > 1 { Some(chunks.len()) } else { None },
}).into(),
},
});
embed_idx += 1;
}
}
if let Err(e) = self.client.upsert_to_collection(collection, points).await {
tracing::warn!(collection = %collection, error = %e, "batch memory embed failed");
}
}
Ok(())
}
/// Batch-embed repo tags with project isolation.
/// Each tag stores project_id as entity_id for post-filtering.
pub async fn embed_tags_batch(
&self,
tags: Vec<TagEmbedInput>,
) -> crate::Result<()> {
if tags.is_empty() {
return Ok(());
}
let texts: Vec<String> = tags
.iter()
.map(|t| {
if let Some(ref desc) = t.description {
if !desc.is_empty() {
format!("{}: {}", t.name, desc)
} else {
t.name.clone()
}
} else {
t.name.clone()
}
})
.collect();
let embeddings = self.client.embed_batch(&texts, &self.model_name).await?;
let points: Vec<EmbedVector> = tags
.into_iter()
.zip(embeddings.into_iter())
.map(|(tag, vector)| {
let point_id = format!("{}:{}", tag.repo_id, tag.name);
EmbedVector {
id: point_id,
vector,
payload: EmbedPayload {
entity_type: "repo_tag".to_string(),
entity_id: tag.project_id.clone(),
text: tag.name.clone(),
extra: serde_json::json!({
"repo_id": tag.repo_id,
"repo_name": tag.repo_name,
"tag_name": tag.name,
"description": tag.description,
})
.into(),
},
}
})
.collect();
self.client.upsert(points).await
}
/// Search repo tags by semantic similarity within a project.
/// Filters by project_id (stored in entity_id) for project isolation.
pub async fn search_tags(
&self,
query: &str,
project_id: &str,
limit: usize,
) -> crate::Result<Vec<SearchResult>> {
let mut results = self
.client
.search(query, "repo_tag", &self.model_name, limit + 1)
.await?;
results.retain(|r| r.payload.entity_id == project_id);
results.truncate(limit);
Ok(results)
}
pub fn model_name(&self) -> &str {
&self.model_name
}
pub fn dimensions(&self) -> u64 {
self.dimensions
}
pub fn embed_client(&self) -> &EmbedClient {
&self.client
}
/// Search skills by semantic similarity within a project.
@ -232,3 +542,24 @@ impl EmbedService {
.await
}
}
/// Input struct for batch memory embedding into per-room Qdrant collections.
#[derive(Debug, Clone)]
pub struct EmbedMemoryInput {
pub message_id: String,
pub content: String,
pub project_name: String,
pub room_id: String,
pub user_id: Option<String>,
pub sender_type: String,
}
/// Input struct for batch tag embedding.
#[derive(Debug, Clone)]
pub struct TagEmbedInput {
pub repo_id: String,
pub repo_name: String,
pub project_id: String,
pub name: String,
pub description: Option<String>,
}

View File

@ -29,7 +29,9 @@ pub use chat::{
pub use client::{AiCallResponse, AiClientConfig, call_with_params, call_with_retry};
pub use client::types::ChatRequestMessage;
pub use compact::{CompactConfig, CompactLevel, CompactService, CompactSummary, MessageSummary};
pub use embed::{new_embed_client, EmbedClient, EmbedService, QdrantClient, SearchResult};
pub use embed::{
EmbedClient, EmbedMemoryInput, EmbedService, QdrantClient, SearchResult, TagEmbedInput, new_embed_client,
};
pub use error::{AgentError, Result};
pub use react::{ReactConfig, ReactStep, DEFAULT_SYSTEM_PROMPT};
pub use tool::{