492 lines
19 KiB
Rust
492 lines
19 KiB
Rust
use models::projects::project_skill;
|
|
use sea_orm::*;
|
|
|
|
use super::context::RoomMessageContext;
|
|
use super::{AiRequest, Mention};
|
|
use crate::client::types::ChatRequestMessage;
|
|
use crate::compact::CompactService;
|
|
use crate::embed::EmbedService;
|
|
use crate::error::Result;
|
|
use crate::perception::{PerceptionService, SkillEntry};
|
|
|
|
pub struct MessageBuilder {
|
|
pub compact_service: Option<CompactService>,
|
|
pub embed_service: Option<EmbedService>,
|
|
pub perception_service: PerceptionService,
|
|
}
|
|
|
|
impl MessageBuilder {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
compact_service: None,
|
|
embed_service: None,
|
|
perception_service: PerceptionService::default(),
|
|
}
|
|
}
|
|
|
|
pub fn with_compact_service(mut self, cs: CompactService) -> Self {
|
|
self.compact_service = Some(cs);
|
|
self
|
|
}
|
|
|
|
pub fn with_embed_service(mut self, es: EmbedService) -> Self {
|
|
self.embed_service = Some(es);
|
|
self
|
|
}
|
|
|
|
pub fn with_perception_service(mut self, ps: PerceptionService) -> Self {
|
|
self.perception_service = ps;
|
|
self
|
|
}
|
|
|
|
pub async fn build_messages(&self, request: &AiRequest) -> Result<Vec<ChatRequestMessage>> {
|
|
let mut messages = Vec::new();
|
|
|
|
messages.push(ChatRequestMessage::system(
|
|
"When receiving a question or problem, follow this reasoning process:\n\
|
|
1. ANALYZE: Break down the question. Identify what is being asked, what context is available, and what information is missing.\n\
|
|
2. GATHER: Use available tools (repository search, file reading, etc.) to collect relevant information before answering.\n\
|
|
3. REASON: Synthesize the gathered information. Consider edge cases and potential issues.\n\
|
|
4. ANSWER: Provide a clear, actionable answer based on your analysis.\n\
|
|
\n\
|
|
Do NOT guess or assume when tools can provide concrete answers. Always verify claims against actual code or documentation.".to_string()
|
|
));
|
|
|
|
let mut processed_history = Vec::new();
|
|
if let Some(compact_service) = &self.compact_service {
|
|
let compact_cache_key = format!("ai:compact:{}", request.room.id);
|
|
let cached_summary: Option<String> = match request.cache.conn().await {
|
|
Ok(mut conn) => redis::cmd("GET")
|
|
.arg(&compact_cache_key)
|
|
.query_async::<Option<String>>(&mut conn)
|
|
.await
|
|
.unwrap_or(None),
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "compact cache: conn failed");
|
|
None
|
|
}
|
|
};
|
|
|
|
if let Some(cached_json) = cached_summary {
|
|
if let Ok(summary) =
|
|
serde_json::from_str::<crate::compact::CompactSummary>(&cached_json)
|
|
{
|
|
if !summary.summary.is_empty() {
|
|
messages.push(ChatRequestMessage::system(format!(
|
|
"Conversation summary:\n{}",
|
|
summary.summary
|
|
)));
|
|
}
|
|
processed_history = summary.retained;
|
|
}
|
|
}
|
|
|
|
if processed_history.is_empty() {
|
|
let compact_config = request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| {
|
|
crate::compact::CompactConfig::from_project_setting(
|
|
s.context_window_tokens,
|
|
s.compaction_threshold,
|
|
s.compaction_max_summary_ratio,
|
|
)
|
|
})
|
|
.unwrap_or_default();
|
|
let history_text = request
|
|
.history
|
|
.iter()
|
|
.map(|m| m.content.as_str())
|
|
.collect::<Vec<_>>()
|
|
.join("\n");
|
|
let estimated_tokens =
|
|
crate::tokent::count_message_text(&history_text, &request.model.name)
|
|
.unwrap_or_else(|_| history_text.len() / 4);
|
|
|
|
if estimated_tokens < compact_config.token_threshold {
|
|
tracing::debug!(
|
|
estimated_tokens,
|
|
threshold = compact_config.token_threshold,
|
|
"conversation compaction skipped below threshold"
|
|
);
|
|
} else {
|
|
match compact_service
|
|
.for_model(&request.model.name)
|
|
.compact_room(
|
|
request.room.id,
|
|
compact_config.default_level,
|
|
Some(request.user_names.clone()),
|
|
request.sender.uid,
|
|
request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| s.context_window_tokens)
|
|
.unwrap_or(128000),
|
|
request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| s.compaction_max_summary_ratio)
|
|
.unwrap_or(0.2),
|
|
)
|
|
.await
|
|
{
|
|
Ok(compact_summary) => {
|
|
if !compact_summary.summary.is_empty() {
|
|
messages.push(ChatRequestMessage::system(format!(
|
|
"Conversation summary:\n{}",
|
|
compact_summary.summary
|
|
)));
|
|
}
|
|
if let Ok(json) = serde_json::to_string(&compact_summary) {
|
|
if let Ok(mut conn) = request.cache.conn().await {
|
|
let _ = redis::cmd("SETEX").arg(&compact_cache_key).arg(300u64).arg(&json).query_async::<()>(&mut conn).await
|
|
.inspect_err(|e| { tracing::warn!(error = %e, "compact cache: SETEX failed"); });
|
|
}
|
|
}
|
|
processed_history = compact_summary.retained;
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "conversation compaction failed, using full history")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if !processed_history.is_empty() {
|
|
for msg_summary in processed_history {
|
|
let ctx = RoomMessageContext::from(msg_summary);
|
|
messages.push(ctx.to_message());
|
|
}
|
|
} else {
|
|
for msg in &request.history {
|
|
let ctx = RoomMessageContext::from_model_with_names(msg, &request.user_names);
|
|
messages.push(ctx.to_message());
|
|
}
|
|
}
|
|
|
|
for mention in &request.mention {
|
|
match mention {
|
|
Mention::Repo(repo) => {
|
|
let mut parts = vec![
|
|
format!("Name: {}", repo.repo_name),
|
|
format!("ID: {}", repo.id),
|
|
];
|
|
if let Some(ref desc) = repo.description {
|
|
parts.push(format!("Description: {}", desc));
|
|
}
|
|
parts.push(format!("Default branch: {}", repo.default_branch));
|
|
parts.push(format!(
|
|
"Private: {}",
|
|
if repo.is_private { "yes" } else { "no" }
|
|
));
|
|
parts.push(format!("Created: {}", repo.created_at.format("%Y-%m-%d")));
|
|
messages.push(ChatRequestMessage::system(format!(
|
|
"Mentioned repository:\n{}",
|
|
parts.join("\n")
|
|
)));
|
|
|
|
if let Some(embed_service) = &self.embed_service {
|
|
let query = format!(
|
|
"{} {}",
|
|
repo.repo_name,
|
|
repo.description.as_deref().unwrap_or_default()
|
|
);
|
|
if let Ok(issues) = embed_service.search_issues(&query, 5).await {
|
|
if !issues.is_empty() {
|
|
let context = format!(
|
|
"Related issues for repo {}:\n{}",
|
|
repo.repo_name,
|
|
issues
|
|
.iter()
|
|
.map(|i| format!("- {}", i.payload.text))
|
|
.collect::<Vec<_>>()
|
|
.join("\n")
|
|
);
|
|
messages.push(ChatRequestMessage::system(context));
|
|
}
|
|
}
|
|
if let Ok(repos) = embed_service.search_repos(&query, 3).await {
|
|
if !repos.is_empty() {
|
|
let context = format!(
|
|
"Similar repositories:\n{}",
|
|
repos
|
|
.iter()
|
|
.map(|r| format!("- {}", r.payload.text))
|
|
.collect::<Vec<_>>()
|
|
.join("\n")
|
|
);
|
|
messages.push(ChatRequestMessage::system(context));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Mention::User(user) => {
|
|
let mut profile_parts = vec![format!("Username: {}", user.username)];
|
|
if let Some(ref display_name) = user.display_name {
|
|
profile_parts.push(format!("Display name: {}", display_name));
|
|
}
|
|
if let Some(ref org) = user.organization {
|
|
profile_parts.push(format!("Organization: {}", org));
|
|
}
|
|
if let Some(ref website) = user.website_url {
|
|
profile_parts.push(format!("Website: {}", website));
|
|
}
|
|
messages.push(ChatRequestMessage::system(format!(
|
|
"Mentioned user profile:\n{}",
|
|
profile_parts.join("\n")
|
|
)));
|
|
}
|
|
}
|
|
}
|
|
|
|
for ctx in self.build_skill_context(request).await {
|
|
messages.push(ctx.to_system_message());
|
|
}
|
|
for mem in self.build_memory_context(request).await {
|
|
messages.push(mem.to_system_message());
|
|
}
|
|
|
|
messages.push(ChatRequestMessage::system(format!(
|
|
"Current Project:\n{}\nDescription: {}\nPublic: {}",
|
|
request.project.display_name,
|
|
request.project.description.as_deref().unwrap_or("(none)"),
|
|
if request.project.is_public {
|
|
"yes"
|
|
} else {
|
|
"no"
|
|
}
|
|
)));
|
|
|
|
let mut sender_parts = vec![format!("**Sender:** {}", request.sender.username)];
|
|
if let Some(ref display_name) = request.sender.display_name {
|
|
sender_parts.push(display_name.clone());
|
|
}
|
|
if let Some(ref org) = request.sender.organization {
|
|
sender_parts.push(format!("({})", org));
|
|
}
|
|
messages.push(ChatRequestMessage::system(format!(
|
|
"The person sending the next message:\n{}",
|
|
sender_parts.join(" ")
|
|
)));
|
|
messages.push(ChatRequestMessage::user(&request.input));
|
|
Ok(messages)
|
|
}
|
|
|
|
pub async fn build_room_optimized_context_text(
|
|
&self,
|
|
request: &AiRequest,
|
|
) -> Result<(String, Option<i64>)> {
|
|
let Some(compact_service) = &self.compact_service else {
|
|
return Ok((Self::recent_history_text(request, None), None));
|
|
};
|
|
|
|
let compact_config = request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| {
|
|
crate::compact::CompactConfig::from_project_setting(
|
|
s.context_window_tokens,
|
|
s.compaction_threshold,
|
|
s.compaction_max_summary_ratio,
|
|
)
|
|
})
|
|
.unwrap_or_default();
|
|
|
|
let context = match compact_service
|
|
.for_model(&request.model.name)
|
|
.prepare_room_compact_context(
|
|
request.room.id,
|
|
request.sender.uid,
|
|
Some(request.user_names.clone()),
|
|
compact_config,
|
|
)
|
|
.await
|
|
{
|
|
Ok(context) => context,
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "room compact context unavailable; using recent history");
|
|
return Ok((Self::recent_history_text(request, None), None));
|
|
}
|
|
};
|
|
|
|
let mut parts = Vec::new();
|
|
if let Some(summary) = context.summary.as_ref().filter(|s| !s.trim().is_empty()) {
|
|
parts.push(format!(
|
|
"### Compressed Conversation Before Seq {}\n{}",
|
|
context.cutoff_seq.unwrap_or_default(),
|
|
summary
|
|
));
|
|
}
|
|
if !context.retained.is_empty() {
|
|
parts.push(format!(
|
|
"### Recent Conversation After Compression\n{}",
|
|
crate::compact::helpers::retained_as_text(&context.retained)
|
|
));
|
|
} else if parts.is_empty() {
|
|
parts.push(Self::recent_history_text(request, context.cutoff_seq));
|
|
}
|
|
|
|
Ok((parts.join("\n\n"), context.cutoff_seq))
|
|
}
|
|
|
|
fn recent_history_text(request: &AiRequest, cutoff_seq: Option<i64>) -> String {
|
|
let mut lines = Vec::new();
|
|
for msg in request
|
|
.history
|
|
.iter()
|
|
.filter(|m| cutoff_seq.map(|seq| m.seq > seq).unwrap_or(true))
|
|
.rev()
|
|
.take(20)
|
|
.collect::<Vec<_>>()
|
|
.into_iter()
|
|
.rev()
|
|
{
|
|
let author = msg
|
|
.sender_id
|
|
.and_then(|uid| request.user_names.get(&uid))
|
|
.cloned()
|
|
.unwrap_or_else(|| msg.sender_type.to_string());
|
|
let content = msg.content.split_whitespace().collect::<Vec<_>>().join(" ");
|
|
let content = if content.len() > 500 {
|
|
format!("{}...", content.chars().take(500).collect::<String>())
|
|
} else {
|
|
content
|
|
};
|
|
lines.push(format!("[{}] {}: {}", msg.send_at, author, content));
|
|
}
|
|
if lines.is_empty() {
|
|
"### Recent Conversation\nnone".to_string()
|
|
} else {
|
|
format!("### Recent Conversation\n{}", lines.join("\n"))
|
|
}
|
|
}
|
|
|
|
async fn build_skill_context(
|
|
&self,
|
|
request: &AiRequest,
|
|
) -> Vec<crate::perception::SkillContext> {
|
|
let db_skills: Vec<SkillEntry> = match project_skill::Entity::find()
|
|
.filter(project_skill::Column::ProjectUuid.eq(request.project.id))
|
|
.filter(project_skill::Column::Enabled.eq(true))
|
|
.all(&request.db)
|
|
.await
|
|
{
|
|
Ok(models) => models
|
|
.into_iter()
|
|
.map(|s| SkillEntry {
|
|
slug: s.slug,
|
|
name: s.name,
|
|
description: s.description,
|
|
content: s.content,
|
|
})
|
|
.collect(),
|
|
Err(_) => Vec::new(),
|
|
};
|
|
let mut all_skills: Vec<SkillEntry> = db_skills;
|
|
for built_in in crate::skills::all_skills() {
|
|
if !all_skills.iter().any(|s| s.slug == built_in.slug) {
|
|
all_skills.push(SkillEntry {
|
|
slug: built_in.slug.to_string(),
|
|
name: built_in.name.to_string(),
|
|
description: Some(built_in.description.to_string()),
|
|
content: built_in.content.clone(),
|
|
});
|
|
}
|
|
}
|
|
if all_skills.is_empty() {
|
|
return Vec::new();
|
|
}
|
|
|
|
let history_texts: Vec<String> = request
|
|
.history
|
|
.iter()
|
|
.rev()
|
|
.take(10)
|
|
.map(|msg| msg.content.clone())
|
|
.collect();
|
|
let keyword_skills = self
|
|
.perception_service
|
|
.inject_skills(&request.input, &history_texts, &[], &all_skills)
|
|
.await;
|
|
let mut vector_skills = Vec::new();
|
|
if let Some(es) = &self.embed_service {
|
|
let rag_enabled = request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| s.rag_enabled)
|
|
.unwrap_or(true);
|
|
if rag_enabled {
|
|
let max_results = request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| s.rag_max_results as usize)
|
|
.unwrap_or(3);
|
|
let min_score = request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| s.rag_min_score)
|
|
.unwrap_or(0.70);
|
|
let awareness =
|
|
crate::perception::VectorActiveAwareness::new(max_results, min_score);
|
|
vector_skills = awareness
|
|
.detect(es, &request.input, &request.project.id.to_string())
|
|
.await;
|
|
}
|
|
}
|
|
let mut seen = std::collections::HashSet::new();
|
|
let mut result = Vec::new();
|
|
for ctx in vector_skills {
|
|
if seen.insert(ctx.label.clone()) {
|
|
result.push(ctx);
|
|
}
|
|
}
|
|
for ctx in keyword_skills {
|
|
if seen.insert(ctx.label.clone()) {
|
|
result.push(ctx);
|
|
}
|
|
}
|
|
result
|
|
}
|
|
|
|
async fn build_memory_context(
|
|
&self,
|
|
request: &AiRequest,
|
|
) -> Vec<crate::perception::vector::MemoryContext> {
|
|
let rag_enabled = request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| s.rag_enabled)
|
|
.unwrap_or(true);
|
|
if !rag_enabled {
|
|
return Vec::new();
|
|
}
|
|
match &self.embed_service {
|
|
Some(es) => {
|
|
let max_results = request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| s.rag_max_results as usize)
|
|
.unwrap_or(3);
|
|
let min_score = request
|
|
.context_setting
|
|
.as_ref()
|
|
.map(|s| s.rag_min_score)
|
|
.unwrap_or(0.72);
|
|
let awareness =
|
|
crate::perception::VectorPassiveAwareness::new(max_results, min_score);
|
|
awareness
|
|
.detect(
|
|
es,
|
|
&request.input,
|
|
&request.project.display_name,
|
|
&request.room.id.to_string(),
|
|
request.history_cutoff_seq,
|
|
)
|
|
.await
|
|
}
|
|
None => Vec::new(),
|
|
}
|
|
}
|
|
}
|