diff --git a/Cargo.lock b/Cargo.lock index c439e97..ac792c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3108,6 +3108,7 @@ name = "git" version = "0.2.9" dependencies = [ "actix-web", + "agent", "anyhow", "async-stream", "base64 0.22.1", diff --git a/apps/git-hook/src/main.rs b/apps/git-hook/src/main.rs index 285a235..2732a2d 100644 --- a/apps/git-hook/src/main.rs +++ b/apps/git-hook/src/main.rs @@ -112,7 +112,7 @@ async fn main() -> anyhow::Result<()> { cfg, ); - let cancel = hooks.start_worker(); + let cancel = hooks.start_worker().await; let cancel_signal = cancel.clone(); // 7. Start health/metrics server on a dedicated port diff --git a/libs/fctool/src/git_tools/tag.rs b/libs/fctool/src/git_tools/tag.rs index eed8808..a7b239b 100644 --- a/libs/fctool/src/git_tools/tag.rs +++ b/libs/fctool/src/git_tools/tag.rs @@ -2,6 +2,7 @@ use super::ctx::GitToolCtx; use agent::{ToolDefinition, ToolHandler, ToolParam, ToolRegistry, ToolSchema}; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use std::collections::HashMap; async fn git_tag_list_exec(ctx: GitToolCtx, args: serde_json::Value) -> Result { @@ -63,6 +64,56 @@ async fn git_tag_info_exec(ctx: GitToolCtx, args: serde_json::Value) -> Result Result { + let p: serde_json::Map = serde_json::from_value(args).map_err(|e| e.to_string())?; + let project_name = p.get("project_name").and_then(|v| v.as_str()).ok_or("missing project_name")?; + let query = p.get("query").and_then(|v| v.as_str()).ok_or("missing query")?; + let limit = p.get("limit").and_then(|v| v.as_u64()).unwrap_or(5) as usize; + + // Resolve project_id from project_name for project isolation + let db = ctx.ctx.db(); + let project = models::projects::project::Entity::find() + .filter(models::projects::project::Column::Name.eq(project_name)) + .one(db) + .await + .map_err(|e| format!("DB error looking up project '{}': {}", project_name, e))? + .ok_or_else(|| format!("project '{}' not found", project_name))?; + let project_id = project.id.to_string(); + + // Get embed_service from context + let embed = ctx.ctx.embed_service() + .ok_or_else(|| "EmbedService not available — Qdrant vector search is disabled".to_string())?; + + let results = embed.search_tags(query, &project_id, limit).await + .map_err(|e| format!("tag search failed: {}", e))?; + + let json_results: Vec = results.into_iter().map(|r| { + let repo_name = r.payload.extra.as_ref() + .and_then(|e| e.get("repo_name")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let tag_name = r.payload.extra.as_ref() + .and_then(|e| e.get("tag_name")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let description = r.payload.extra.as_ref() + .and_then(|e| e.get("description")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + serde_json::json!({ + "tag_name": tag_name, + "repo_name": repo_name, + "description": description, + "score": r.score, + }) + }).collect(); + + Ok(serde_json::to_value(json_results).map_err(|e| e.to_string())?) +} + pub fn register_git_tools(registry: &mut ToolRegistry) { // git_tag_list let p = HashMap::from([ @@ -97,4 +148,21 @@ pub fn register_git_tools(registry: &mut ToolRegistry) { }) }), ); -} \ No newline at end of file + + // git_tag_search + let p = HashMap::from([ + ("project_name".into(), ToolParam { name: "project_name".into(), param_type: "string".into(), description: Some("Project name (slug)".into()), required: true, properties: None, items: None }), + ("query".into(), ToolParam { name: "query".into(), param_type: "string".into(), description: Some("Semantic search query to find relevant tags".into()), required: true, properties: None, items: None }), + ("limit".into(), ToolParam { name: "limit".into(), param_type: "integer".into(), description: Some("Maximum number of results (default 5)".into()), required: false, properties: None, items: None }), + ]); + let schema = ToolSchema { schema_type: "object".into(), properties: Some(p), required: Some(vec!["project_name".into(), "query".into()]) }; + registry.register( + ToolDefinition::new("git_tag_search").description("Semantically search tags across repositories in a project. Uses vector search to find tags relevant to the query, with project-level isolation.").parameters(schema), + ToolHandler::new(|ctx, args| { + let gctx = super::ctx::GitToolCtx::new(ctx); + Box::pin(async move { + git_tag_search_exec(gctx, args).await.map_err(agent::ToolError::ExecutionError) + }) + }), + ); +} diff --git a/libs/git/Cargo.toml b/libs/git/Cargo.toml index 53cb7bb..396de24 100644 --- a/libs/git/Cargo.toml +++ b/libs/git/Cargo.toml @@ -51,5 +51,6 @@ actix-web = { workspace = true } hex = "0.4.3" reqwest = { workspace = true } metrics = { workspace = true } +agent = { workspace = true } [lints] workspace = true diff --git a/libs/git/hook/mod.rs b/libs/git/hook/mod.rs index 6a63bb6..1ec9994 100644 --- a/libs/git/hook/mod.rs +++ b/libs/git/hook/mod.rs @@ -11,6 +11,34 @@ pub mod webhook_dispatch; pub use pool::{HookWorker, PoolConfig, RedisConsumer}; pub use pool::types::{HookTask, TaskType}; +/// Helper to initialize an optional EmbedService from config (graceful degradation). +async fn init_embed_service(config: &AppConfig, db: &AppDatabase) -> Option { + match agent::new_embed_client(config).await { + Ok(client) => { + let model_name = config + .get_embed_model_name() + .unwrap_or_else(|_| "text-embedding-3-small".into()); + let dimensions = config + .get_embed_model_dimensions() + .unwrap_or(1536); + let svc = agent::embed::EmbedService::new( + client, + db.writer().clone(), + model_name, + dimensions, + ); + // Ensure the repo_tag collection exists + let _ = svc.ensure_collections().await; + tracing::info!("hook worker: EmbedService initialized for tag embedding"); + Some(svc) + } + Err(e) => { + tracing::warn!(error = %e, "hook worker: EmbedService not available — tag embedding disabled"); + None + } + } +} + /// Hook service that manages the Redis-backed task queue worker. /// Multiple gitserver pods can run concurrently — the worker acquires a /// per-repo Redis lock before processing each task. @@ -20,6 +48,7 @@ pub struct HookService { pub(crate) cache: AppCache, pub(crate) redis_pool: RedisPool, pub(crate) config: AppConfig, + pub(crate) embed_service: Option, } impl HookService { @@ -34,17 +63,31 @@ impl HookService { cache, redis_pool, config, + embed_service: None, } } + /// Set an externally-initialized EmbedService (e.g. from the web app). + pub fn with_embed_service(mut self, svc: agent::embed::EmbedService) -> Self { + self.embed_service = Some(svc); + self + } + /// Start the background worker and return a cancellation token. - pub fn start_worker(&self) -> CancellationToken { + pub async fn start_worker(&self) -> CancellationToken { + // Auto-init embed_service if not set and config allows (standalone binaries) + let embed = match self.embed_service.clone() { + Some(svc) => Some(svc), + None => init_embed_service(&self.config, &self.db).await, + }; + let pool_config = PoolConfig::from_env(&self.config); pool::start_worker( self.db.clone(), self.cache.clone(), self.redis_pool.clone(), pool_config, + embed, ) } } diff --git a/libs/git/hook/pool/mod.rs b/libs/git/hook/pool/mod.rs index 5c0948e..4327870 100644 --- a/libs/git/hook/pool/mod.rs +++ b/libs/git/hook/pool/mod.rs @@ -19,6 +19,7 @@ pub fn start_worker( cache: AppCache, redis_pool: RedisPool, config: PoolConfig, + embed_service: Option, ) -> CancellationToken { let consumer = RedisConsumer::new( redis_pool.clone(), @@ -28,7 +29,7 @@ pub fn start_worker( let http_client = Arc::new(reqwest::Client::new()); let max_retries = config.redis_max_retries as u32; - let worker = HookWorker::new(db, cache, consumer, http_client, max_retries); + let worker = HookWorker::new(db, cache, consumer, http_client, max_retries, embed_service); let cancel = CancellationToken::new(); let cancel_clone = cancel.clone(); diff --git a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs index a503f5d..f104ddc 100644 --- a/libs/git/hook/pool/worker.rs +++ b/libs/git/hook/pool/worker.rs @@ -2,10 +2,13 @@ use crate::error::GitError; use crate::hook::pool::redis::RedisConsumer; use crate::hook::pool::types::{HookTask, TaskType}; use crate::hook::sync::HookMetaDataSync; +use agent::TagEmbedInput; use db::cache::AppCache; use db::database::AppDatabase; use metrics::counter; +use models::repos::repo_tag; use models::EntityTrait; +use sea_orm::{ColumnTrait, QueryFilter}; use std::sync::Arc; use std::time::Duration; use tokio_util::sync::CancellationToken; @@ -20,6 +23,7 @@ pub struct HookWorker { consumer: RedisConsumer, http_client: Arc, max_retries: u32, + embed_service: Option, } impl HookWorker { @@ -29,6 +33,7 @@ impl HookWorker { consumer: RedisConsumer, http_client: Arc, max_retries: u32, + embed_service: Option, ) -> Self { Self { db, @@ -36,6 +41,7 @@ impl HookWorker { consumer, http_client, max_retries, + embed_service, } } @@ -277,6 +283,7 @@ impl HookWorker { // Dispatch tag webhooks and collect handles let mut tag_changes: u64 = 0; + let mut changed_tag_names: Vec = Vec::new(); for (tag, after_oid) in after_tag_tips { let before_oid = before_tag_tips .iter() @@ -286,6 +293,7 @@ impl HookWorker { let was_updated = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(false); if is_new || was_updated { tag_changes += 1; + changed_tag_names.push(tag.clone()); let before_oid = before_oid.map_or("0", |v| v).to_string(); let tag_name = tag.clone(); let h = tokio::spawn({ @@ -326,6 +334,51 @@ impl HookWorker { counter!("hook_sync_branches_changed_total").increment(branch_changes); counter!("hook_sync_tags_changed_total").increment(tag_changes); + // Embed changed tags into Qdrant for semantic search (non-blocking, fire-and-forget) + if let Some(ref embed) = self.embed_service { + if !changed_tag_names.is_empty() { + let es = embed.clone(); + let db = self.db.clone(); + let repo_uuid = repo_uuid; + let repo_name = repo_name.clone(); + let project_id = project.to_string(); + let tag_names = changed_tag_names.clone(); + tokio::spawn(async move { + let tags = repo_tag::Entity::find() + .filter(repo_tag::Column::Repo.eq(repo_uuid)) + .filter(repo_tag::Column::Name.is_in(tag_names)) + .all(db.reader()) + .await; + + match tags { + Ok(rows) => { + let count = rows.len(); + let inputs: Vec = rows + .into_iter() + .map(|t| TagEmbedInput { + repo_id: repo_uuid.to_string(), + repo_name: repo_name.clone(), + project_id: project_id.clone(), + name: t.name, + description: t.description, + }) + .collect(); + if !inputs.is_empty() { + if let Err(e) = es.embed_tags_batch(inputs).await { + tracing::warn!(error = %e, "failed to embed changed tags"); + } else { + tracing::debug!(count, "embedded changed tags into Qdrant"); + } + } + } + Err(e) => { + tracing::warn!(error = %e, "failed to query changed tags for embedding"); + } + } + }); + } + } + Ok(()) } diff --git a/libs/git/http/mod.rs b/libs/git/http/mod.rs index 56607e8..6c02a60 100644 --- a/libs/git/http/mod.rs +++ b/libs/git/http/mod.rs @@ -118,14 +118,12 @@ pub async fn run_http(config: AppConfig) -> anyhow::Result<()> { let app_cache = app_cache?; let redis_pool = app_cache.redis_pool().clone(); - let hook = HookService::new( + let _hook = HookService::new( db.clone(), app_cache.clone(), redis_pool.clone(), config.clone(), ); - let _worker_cancel = hook.start_worker(); - tracing::info!("hook worker started"); let sync = crate::ssh::ReceiveSyncService::new(redis_pool.clone()); diff --git a/libs/git/ssh/mod.rs b/libs/git/ssh/mod.rs index 0f2ad72..9d3044a 100644 --- a/libs/git/ssh/mod.rs +++ b/libs/git/ssh/mod.rs @@ -279,15 +279,12 @@ pub async fn run_ssh(config: AppConfig) -> anyhow::Result<()> { let cache = AppCache::init(&config).await?; let redis_pool = cache.redis_pool().clone(); - // Start the hook worker (Redis queue consumer) - let hook = crate::hook::HookService::new( + let _hook = crate::hook::HookService::new( db.clone(), cache.clone(), redis_pool.clone(), config.clone(), ); - let _worker_cancel = hook.start_worker(); - tracing::info!("hook worker started"); SSHHandle::new(db, config.clone(), cache, redis_pool) .run_ssh()