From ecf9f33b2649820d0c9c5c9202a5026b7ce85d9f Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Sun, 26 Apr 2026 16:30:41 +0800 Subject: [PATCH] refactor(agent/sync): remove OpenRouter dependency, use upstream /v1/models directly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The upstream AI endpoint already returns complete model metadata: - name, owned_by, context_length, max_output_tokens - capabilities (vision, tool_call, reasoning) - pricing (input, output, cache_read, cache_write, currency) Remove the OpenRouter fallback entirely and parse the upstream response directly for all sync operations. Both sync_upstream_models (API) and sync_once (background task) now use a single unified path. Changes: - Remove OpenRouter types and fetch_openrouter_models() - Add UpstreamModel / UpstreamCapabilities / UpstreamPricing types - Parse capabilities from upstream instead of inferring from name - Use real pricing from upstream instead of defaulting to 0.00 - Simplify sync flow: list → parse → upsert (no filtering/matching) - Add provider normalizations for moonshot, zai, minimax, qwen --- libs/service/agent/sync.rs | 886 +++++++++---------------------------- 1 file changed, 221 insertions(+), 665 deletions(-) diff --git a/libs/service/agent/sync.rs b/libs/service/agent/sync.rs index 020a903..60f4496 100644 --- a/libs/service/agent/sync.rs +++ b/libs/service/agent/sync.rs @@ -1,14 +1,12 @@ -//! Synchronizes AI model metadata from OpenRouter into the local database. +//! Synchronizes AI model metadata from the upstream AI endpoint +//! (`GET /v1/models`) into the local database. //! //! Flow: -//! 1. Call `GET /models` with the real API key to list accessible model IDs. -//! 2. Fetch full metadata (pricing, context_length, capabilities) for those -//! model IDs from OpenRouter's public `/api/v1/models` endpoint (no auth). +//! 1. Call `GET /v1/models` with the configured AI API key. +//! 2. Parse the rich response (name, context_length, max_output_tokens, +//! capabilities, pricing, owned_by) — no external metadata source needed. //! 3. Upsert provider / model / version / pricing / capability / profile -//! records for models the client can actually call. -//! 4. Models accessible from the user's endpoint but NOT in OpenRouter's -//! catalog ("stranger" models) are also upserted through the same -//! pipeline with inferred defaults (direct sync). +//! records for all accessible models. //! //! Usage: call `start_sync_task()` to launch a background task that syncs //! immediately and then every 10 minutes. On app startup, run it once @@ -17,7 +15,6 @@ use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::interval; -use tokio::time::sleep; use crate::error::AppError; use crate::AppService; @@ -38,65 +35,56 @@ use session::Session; use utoipa::ToSchema; use uuid::Uuid; -const OPENROUTER_URL: &str = "https://openrouter.ai/api/v1/models"; - -// OpenRouter API types ------------------------------------------------------- +// Upstream /v1/models response types ----------------------------------------- #[derive(Debug, Clone, Deserialize)] -struct OpenRouterResponse { - data: Vec, +struct ModelsListResponse { + data: Vec, } #[derive(Debug, Clone, Deserialize)] -#[allow(dead_code)] -struct OpenRouterModel { +struct UpstreamModel { id: String, + #[serde(default)] name: Option, #[serde(default)] - description: Option, + owned_by: Option, #[serde(default)] context_length: Option, #[serde(default)] - architecture: Option, + max_output_tokens: Option, #[serde(default)] - top_provider: Option, + capabilities: Option, + #[serde(default)] + pricing: Option, + #[serde(default)] + r#type: Option, } #[derive(Debug, Clone, Deserialize)] -#[allow(dead_code)] -struct OpenRouterArchitecture { +struct UpstreamCapabilities { #[serde(default)] - modality: Option, + vision: Option, #[serde(default)] - input_modalities: Option>, + tool_call: Option, #[serde(default)] - output_modalities: Option>, - #[serde(default)] - tokenizer: Option, - #[serde(default)] - instruct_type: Option, + reasoning: Option, } #[derive(Debug, Clone, Deserialize)] -#[allow(dead_code)] -struct OpenRouterTopProvider { +struct UpstreamPricing { #[serde(default)] - context_length: Option, + input: Option, #[serde(default)] - max_completion_tokens: Option, + output: Option, #[serde(default)] - is_moderated: Option, -} - -/// Fallback model type used when the user's AI endpoint is NOT OpenRouter -/// (e.g. Bailian/MiniMax). OpenRouter has no metadata for these models, -/// so we sync them directly from the endpoint's own /models response. -#[derive(Debug, Clone)] -#[allow(dead_code)] -struct DirectModel { - id: String, - name: Option, - context_length: Option, + cache_read: Option, + #[serde(default)] + cache_write: Option, + #[serde(default)] + unit: Option, + #[serde(default)] + currency: Option, } // Response type -------------------------------------------------------------- @@ -111,22 +99,15 @@ pub struct SyncModelsResponse { pub profiles_created: i64, } -// Inference helpers ---------------------------------------------------------- +// Mapping helpers ------------------------------------------------------------ -fn infer_modality(name: &str, arch_modality: Option<&str>) -> ModelModality { - if let Some(m) = arch_modality { - let m = m.to_lowercase(); - if m.contains("text") || m.contains("chat") { - return ModelModality::Text; - } - if m.contains("image") || m.contains("vision") { +fn infer_modality(model: &UpstreamModel) -> ModelModality { + if let Some(caps) = &model.capabilities { + if caps.vision == Some(true) { return ModelModality::Multimodal; } - if m.contains("audio") || m.contains("speech") { - return ModelModality::Audio; - } } - let lower = name.to_lowercase(); + let lower = model.id.to_lowercase(); if lower.contains("vision") || lower.contains("dall-e") || lower.contains("gpt-image") @@ -142,8 +123,8 @@ fn infer_modality(name: &str, arch_modality: Option<&str>) -> ModelModality { } } -fn infer_capability(name: &str) -> ModelCapability { - let lower = name.to_lowercase(); +fn infer_capability(model: &UpstreamModel) -> ModelCapability { + let lower = model.id.to_lowercase(); if lower.contains("embedding") { ModelCapability::Embedding } else { @@ -151,32 +132,44 @@ fn infer_capability(name: &str) -> ModelCapability { } } -fn infer_context_length(ctx: Option) -> i64 { - ctx.map(|c| c as i64).unwrap_or(8_192) +fn context_length(model: &UpstreamModel) -> i64 { + model.context_length.map(|c| c as i64).unwrap_or(8_192) } -fn infer_max_output(top_provider_max: Option) -> Option { - top_provider_max.map(|v| v as i64) +fn max_output_tokens(model: &UpstreamModel) -> Option { + model.max_output_tokens.map(|v| v as i64) } -fn infer_capability_list(arch: &OpenRouterArchitecture) -> Vec<(CapabilityType, bool)> { - let mut caps = vec![(CapabilityType::FunctionCall, true)]; - if let Some(m) = &arch.modality { - let m = m.to_lowercase(); - if m.contains("image") || m.contains("vision") { - caps.push((CapabilityType::Vision, true)); - } - if m.contains("text") || m.contains("chat") { +fn capability_list(model: &UpstreamModel) -> Vec<(CapabilityType, bool)> { + let mut caps = Vec::new(); + + // Function call / tool use + if let Some(u) = &model.capabilities { + if u.tool_call == Some(true) { caps.push((CapabilityType::ToolUse, true)); } + if u.vision == Some(true) { + caps.push((CapabilityType::Vision, true)); + } } + + // Always mark function call as supported by default for chat models + if caps.is_empty() { + caps.push((CapabilityType::FunctionCall, true)); + } + caps } // Provider helpers ----------------------------------------------------------- -fn extract_provider(model_id: &str) -> &str { - model_id.split('/').next().unwrap_or("unknown") +fn extract_provider_name(model: &UpstreamModel) -> &str { + if let Some(owned) = &model.owned_by { + if !owned.is_empty() { + return normalize_provider_name(owned); + } + } + normalize_provider_name(model.id.split('/').next().unwrap_or("unknown")) } fn normalize_provider_name(slug: &str) -> &'static str { @@ -189,6 +182,10 @@ fn normalize_provider_name(slug: &str) -> &'static str { "deepseek" => "deepseek", "azure" | "azure-openai" => "azure", "x-ai" | "xai" => "xai", + "moonshot" => "moonshot", + "zai" => "zai", + "minimax" => "minimax", + "alibaba" | "qwen" => "qwen", s => Box::leak(s.to_string().into_boxed_str()), } } @@ -203,6 +200,10 @@ fn provider_display_name(name: &str) -> String { "deepseek" => "DeepSeek".to_string(), "azure" => "Microsoft Azure".to_string(), "xai" => "xAI".to_string(), + "moonshot" => "Moonshot AI".to_string(), + "zai" => "Zhipu AI".to_string(), + "minimax" => "MiniMax".to_string(), + "qwen" => "Alibaba Qwen".to_string(), s => s.to_string(), } } @@ -210,13 +211,12 @@ fn provider_display_name(name: &str) -> String { // Upsert helpers ------------------------------------------------------------- async fn upsert_provider(db: &AppDatabase, slug: &str) -> Result { - let name = normalize_provider_name(slug); - let display = provider_display_name(name); + let display = provider_display_name(slug); let now = Utc::now(); use models::agents::model_provider::Column as PCol; if let Some(existing) = ProviderEntity::find() - .filter(PCol::Name.eq(name)) + .filter(PCol::Name.eq(slug)) .one(db) .await? { @@ -224,14 +224,14 @@ async fn upsert_provider(db: &AppDatabase, slug: &str) -> Result Result Result<(models::agents::model::Model, bool), AppError> { let now = Utc::now(); - let modality_str = or_model - .architecture - .as_ref() - .and_then(|a| a.modality.as_deref()); - let modality = infer_modality(model_id_str, modality_str); - let capability = infer_capability(model_id_str); - - let context_length = infer_context_length(or_model.context_length); - let max_output = infer_max_output( - or_model - .top_provider - .as_ref() - .and_then(|p| p.max_completion_tokens), - ); + let modality = infer_modality(model); + let capability = infer_capability(model); + let ctx = context_length(model); + let max_out = max_output_tokens(model); use models::agents::model::Column as MCol; if let Some(existing) = ModelEntity::find() .filter(MCol::ProviderId.eq(provider_id)) - .filter(MCol::Name.eq(model_id_str)) - .one(db) - .await? - { - let mut active: models::agents::model::ActiveModel = existing.clone().into(); - active.context_length = Set(context_length); - active.max_output_tokens = Set(max_output); - active.status = Set(ModelStatus::Active.to_string()); - active.updated_at = Set(now); - active.update(db).await?; - Ok(( - ModelEntity::find_by_id(existing.id).one(db).await?.unwrap(), - false, - )) - } else { - let active = models::agents::model::ActiveModel { - id: Set(Uuid::now_v7()), - provider_id: Set(provider_id), - name: Set(model_id_str.to_string()), - modality: Set(modality.to_string()), - capability: Set(capability.to_string()), - context_length: Set(context_length), - max_output_tokens: Set(max_output), - training_cutoff: Set(None), - is_open_source: Set(false), - status: Set(ModelStatus::Active.to_string()), - created_at: Set(now), - updated_at: Set(now), - ..Default::default() - }; - let inserted = active.insert(db).await.map_err(AppError::from)?; - Ok((inserted, true)) - } -} - -/// Upsert a model directly from the user's AI endpoint response (no OpenRouter metadata). -/// Used as fallback when the endpoint is not OpenRouter-compatible. -async fn upsert_model_direct( - db: &AppDatabase, - provider_id: Uuid, - model_id_str: &str, - _name: Option<&str>, - context_length: Option, -) -> Result<(models::agents::model::Model, bool), AppError> { - let now = Utc::now(); - let modality = infer_modality(model_id_str, None); - let capability = infer_capability(model_id_str); - let ctx = infer_context_length(context_length); - - use models::agents::model::Column as MCol; - if let Some(existing) = ModelEntity::find() - .filter(MCol::ProviderId.eq(provider_id)) - .filter(MCol::Name.eq(model_id_str)) + .filter(MCol::Name.eq(&model.id)) .one(db) .await? { let mut active: models::agents::model::ActiveModel = existing.clone().into(); active.context_length = Set(ctx); + active.max_output_tokens = Set(max_out); active.status = Set(ModelStatus::Active.to_string()); active.updated_at = Set(now); active.update(db).await?; @@ -336,11 +274,11 @@ async fn upsert_model_direct( let active = models::agents::model::ActiveModel { id: Set(Uuid::now_v7()), provider_id: Set(provider_id), - name: Set(model_id_str.to_string()), + name: Set(model.id.clone()), modality: Set(modality.to_string()), capability: Set(capability.to_string()), context_length: Set(ctx), - max_output_tokens: Set(None), + max_output_tokens: Set(max_out), training_cutoff: Set(None), is_open_source: Set(false), status: Set(ModelStatus::Active.to_string()), @@ -382,10 +320,10 @@ async fn upsert_version( } } -/// Create default pricing record with 0 price for admin-side modification. async fn upsert_pricing( db: &AppDatabase, version_uuid: Uuid, + pricing: Option<&UpstreamPricing>, ) -> Result { use models::agents::model_pricing::Column as PCol; use models::agents::model_pricing::Entity as PricingEntity; @@ -397,12 +335,25 @@ async fn upsert_pricing( return Ok(false); } + let (input_price, output_price) = if let Some(p) = pricing { + ( + format!("{:.2}", p.input.unwrap_or(0.0)), + format!("{:.2}", p.output.unwrap_or(0.0)), + ) + } else { + ("0.00".to_string(), "0.00".to_string()) + }; + + let currency = pricing + .and_then(|p| p.currency.clone()) + .unwrap_or_else(|| "USD".to_string()); + let active = models::agents::model_pricing::ActiveModel { id: Set(Uuid::now_v7().as_u128() as i64), model_version_id: Set(version_uuid), - input_price_per_1k_tokens: Set("0.00".to_string()), - output_price_per_1k_tokens: Set("0.00".to_string()), - currency: Set("USD".to_string()), + input_price_per_1k_tokens: Set(input_price), + output_price_per_1k_tokens: Set(output_price), + currency: Set(currency), effective_from: Set(Utc::now()), }; active.insert(db).await.map_err(AppError::from)?; @@ -412,18 +363,10 @@ async fn upsert_pricing( async fn upsert_capabilities( db: &AppDatabase, version_uuid: Uuid, - arch: Option<&OpenRouterArchitecture>, + model: &UpstreamModel, ) -> Result { use models::agents::model_capability::Column as CCol; - let caps = infer_capability_list( - arch.unwrap_or(&OpenRouterArchitecture { - modality: None, - input_modalities: None, - output_modalities: None, - tokenizer: None, - instruct_type: None, - }), - ); + let caps = capability_list(model); let now = Utc::now(); let mut created = 0i64; @@ -452,7 +395,7 @@ async fn upsert_capabilities( async fn upsert_parameter_profile( db: &AppDatabase, version_uuid: Uuid, - model_name: &str, + model: &UpstreamModel, ) -> Result { use models::agents::model_parameter_profile::Column as PCol; let existing = ProfileEntity::find() @@ -463,7 +406,7 @@ async fn upsert_parameter_profile( return Ok(false); } - let lower = model_name.to_lowercase(); + let lower = model.id.to_lowercase(); let (t_min, t_max) = if lower.contains("o1") || lower.contains("o3") { (1.0, 1.0) } else { @@ -484,18 +427,12 @@ async fn upsert_parameter_profile( Ok(true) } -/// Sync models directly from the user's AI endpoint when OpenRouter has no matching models. -/// This handles non-OpenRouter endpoints (e.g. Bailian, MiniMax) gracefully. -async fn sync_models_direct( - db: &AppDatabase, - available_ids: &std::collections::HashSet, -) -> SyncModelsResponse { - tracing::info!( - model_count = available_ids.len(), - "sync_models_direct: {} models from endpoint (no OpenRouter metadata)", - available_ids.len() - ); +// Core sync logic ------------------------------------------------------------ +async fn sync_models_from_upstream( + db: &AppDatabase, + models: Vec, +) -> SyncModelsResponse { let mut models_created = 0i64; let mut models_updated = 0i64; let mut versions_created = 0i64; @@ -503,64 +440,67 @@ async fn sync_models_direct( let mut capabilities_created = 0i64; let mut profiles_created = 0i64; - for model_id in available_ids { - let provider_slug = extract_provider(model_id); + for model in models { + let provider_slug = extract_provider_name(&model); let provider = match upsert_provider(db, provider_slug).await { Ok(p) => p, Err(e) => { tracing::warn!( provider = %provider_slug, error = ?e, - "sync_models_direct: upsert_provider error" + "sync_models_from_upstream: upsert_provider error" ); continue; } }; - let (model_record, _is_new) = - match upsert_model_direct(db, provider.id, model_id, None, None).await { - Ok((m, n)) => { - if n { - models_created += 1; - } else { - models_updated += 1; - } - (m, n) + let (model_record, _is_new) = match upsert_model(db, provider.id, &model).await { + Ok((m, n)) => { + if n { + models_created += 1; + } else { + models_updated += 1; } - Err(e) => { - tracing::warn!( - model = %model_id, - error = ?e, - "sync_models_direct: upsert_model_direct error" - ); - continue; - } - }; + (m, n) + } + Err(e) => { + tracing::warn!( + model = %model.id, + error = ?e, + "sync_models_from_upstream: upsert_model error" + ); + continue; + } + }; - let (version_record, version_is_new) = - match upsert_version(db, model_record.id).await { - Ok(v) => v, - Err(e) => { - tracing::warn!( - model = %model_id, - error = ?e, - "sync_models_direct: upsert_version error" - ); - continue; - } - }; + let (version_record, version_is_new) = match upsert_version(db, model_record.id).await { + Ok(v) => v, + Err(e) => { + tracing::warn!( + model = %model.id, + error = ?e, + "sync_models_from_upstream: upsert_version error" + ); + continue; + } + }; if version_is_new { versions_created += 1; } - if upsert_pricing(db, version_record.id).await.unwrap_or(false) { + if upsert_pricing(db, version_record.id, model.pricing.as_ref()) + .await + .unwrap_or(false) + { pricing_created += 1; } capabilities_created += - upsert_capabilities(db, version_record.id, None).await.unwrap_or(0); + upsert_capabilities(db, version_record.id, &model) + .await + .unwrap_or(0); - if upsert_parameter_profile(db, version_record.id, model_id) + if upsert_parameter_profile(db, version_record.id, &model) .await .unwrap_or(false) { @@ -568,17 +508,6 @@ async fn sync_models_direct( } } - tracing::info!( - matched = available_ids.len(), - models_created, - models_updated, - versions_created, - pricing_created, - capabilities_created, - profiles_created, - "sync_models_direct complete" - ); - SyncModelsResponse { models_created, models_updated, @@ -591,104 +520,12 @@ async fn sync_models_direct( // HTTP helpers --------------------------------------------------------------- -async fn fetch_openrouter_models( - client: &reqwest::Client, -) -> Result { - const MAX_RETRIES: u32 = 3; - const BASE_DELAY_MS: u64 = 1_000; - - let mut attempt = 0; - loop { - attempt += 1; - match client.get(OPENROUTER_URL).send().await { - Ok(r) => { - return match r.error_for_status() { - Ok(resp) => match resp.json::().await { - Ok(root) => Ok(root), - Err(e) => Err(format!( - "failed to parse response after {} attempt(s): {}", - attempt, e - )), - }, - Err(e) => Err(format!( - "HTTP status error after {} attempt(s): url={} status={}", - attempt, - e.url() - .map(|u| u.to_string()) - .unwrap_or_else(|| OPENROUTER_URL.to_string()), - e - )), - }; - } - Err(e) => { - let kind = if e.is_timeout() { - "timeout" - } else if e.is_connect() { - "connect" - } else { - "request" - }; - let url = e - .url() - .map(|u| u.to_string()) - .unwrap_or_else(|| OPENROUTER_URL.to_string()); - - if attempt >= MAX_RETRIES { - return Err(format!( - "OpenRouter connection failed after {} attempt(s): [{}] url={} error={:?}", - attempt, kind, url, e - )); - } - let delay_ms = BASE_DELAY_MS * (1 << (attempt - 1)); - tracing::warn!( - attempt = attempt, - max_retries = MAX_RETRIES, - kind = %kind, - url = %url, - error = ?e, - retry_delay_ms = delay_ms, - "OpenRouter connection attempt failed, retrying" - ); - sleep(Duration::from_millis(delay_ms)).await; - } - } - } -} - -/// Build reqwest client and config from the AI config. -fn build_ai_client(config: &config::AppConfig) -> Result<(reqwest::Client, String, String), AppError> { - let api_key = config - .ai_api_key() - .map_err(|e| AppError::InternalServerError(format!("AI API key not configured: {}", e)))?; - - let base_url = config - .ai_basic_url() - .unwrap_or_else(|_| "https://api.openai.com".into()); - - Ok((reqwest::Client::new(), base_url, api_key)) -} - -/// Response from `GET /v1/models`. -#[derive(Debug, Deserialize)] -struct ModelsListResponse { - data: Vec, -} - -#[derive(Debug, Deserialize)] -struct ModelEntry { - id: String, -} - -/// List accessible model IDs from the AI endpoint. -/// Supports multiple response formats: -/// - OpenAI: `{ "data": [{"id": "..."}, ...] }` -/// - Array: `[{"id": "..."}, ...]` -/// - Custom: `{ "models": [{"id": "..."}, ...] }` -async fn list_accessible_models( +/// List models from the upstream AI endpoint (`GET /v1/models`). +async fn list_upstream_models( client: &reqwest::Client, base_url: &str, api_key: &str, -) -> Result, AppError> { +) -> Result, AppError> { let url = format!("{}/v1/models", base_url.trim_end_matches('/')); let resp = client .get(&url) @@ -702,215 +539,80 @@ async fn list_accessible_models( .await .map_err(|e| AppError::InternalServerError(format!("failed to read models body: {}", e)))?; - // Try OpenAI format: { "data": [{"id": "..."}, ...] } + // Try standard OpenAI-compatible format: { "data": [{...}, ...] } if let Ok(parsed) = serde_json::from_str::(&body) { - return Ok(parsed.data.into_iter().map(|m| m.id).collect()); + return Ok(parsed.data); } - // Try raw array: [{"id": "..."}, ...] - if let Ok(parsed) = serde_json::from_str::>(&body) { - return Ok(parsed.into_iter().map(|m| m.id).collect()); + // Try raw array: [{...}, ...] + if let Ok(parsed) = serde_json::from_str::>(&body) { + return Ok(parsed); } - // Try { "models": [{"id": "..."}, ...] } - #[derive(Debug, Deserialize)] - struct ModelsAltResponse { - models: Vec, - } - if let Ok(parsed) = serde_json::from_str::(&body) { - return Ok(parsed.models.into_iter().map(|m| m.id).collect()); - } - - tracing::warn!(body = %body.chars().take(500).collect::(), "list_accessible_models: unknown response format"); + tracing::warn!( + body = %body.chars().take(500).collect::(), + "list_upstream_models: unknown response format" + ); Err(AppError::InternalServerError(format!( "unexpected /v1/models response format (first 200 chars): {}", body.chars().take(200).collect::() ))) } +fn build_ai_client(config: &config::AppConfig) -> Result<(reqwest::Client, String, String), AppError> { + let api_key = config + .ai_api_key() + .map_err(|e| AppError::InternalServerError(format!("AI API key not configured: {}", e)))?; + + let base_url = config + .ai_basic_url() + .unwrap_or_else(|_| "https://api.openai.com".into()); + + Ok((reqwest::Client::new(), base_url, api_key)) +} + +fn build_ai_client_from_parts( + api_key: Option, + base_url: Option, +) -> Result<(reqwest::Client, String, String), String> { + let api_key = api_key.ok_or_else(|| "AI API key not configured".to_string())?; + let base_url = base_url.unwrap_or_else(|| "https://api.openai.com".into()); + Ok((reqwest::Client::new(), base_url, api_key)) +} + +// Public API ----------------------------------------------------------------- + impl AppService { - /// Sync metadata for models that are accessible by the configured AI client. + /// Sync model metadata from the upstream AI endpoint (`GET /v1/models`). /// - /// Steps: - /// 1. Call `client.models().list()` to get the set of accessible model IDs. - /// 2. Fetch full model list from OpenRouter's public `/api/v1/models` endpoint. - /// 3. Keep only models whose ID appears in the accessible set, then upsert - /// with full OpenRouter metadata. - /// 4. Models NOT in OpenRouter's catalog are also upserted through the - /// same pipeline (provider → model → version → pricing → capabilities - /// → parameter_profile) with inferred defaults. + /// Parses the full response (name, context_length, max_output_tokens, + /// capabilities, pricing, owned_by) and upserts all related records. pub async fn sync_upstream_models( &self, _ctx: &Session, ) -> Result { - // Step 1: list models the AI client can access. let (http_client, base_url, api_key) = build_ai_client(&self.config)?; - let available_ids = list_accessible_models(&http_client, &base_url, &api_key).await?; + let upstream_models = list_upstream_models(&http_client, &base_url, &api_key).await?; tracing::info!( - model_count = available_ids.len(), - "sync_upstream_models: {} accessible models found", - available_ids.len() + model_count = upstream_models.len(), + "sync_upstream_models: {} models from upstream endpoint", + upstream_models.len() ); - // Step 2: fetch OpenRouter metadata (optional — failure falls back to - // direct sync for all available models). - let http_client = reqwest::Client::new(); - let or_models: Vec = match fetch_openrouter_models(&http_client).await { - Ok(resp) => resp.data, - Err(msg) => { - tracing::warn!(error = %msg, "sync_upstream_models: OpenRouter fetch failed, falling back to direct sync"); - let direct_result = sync_models_direct(&self.db, &available_ids).await; - return Ok(direct_result); - } - }; - - // Step 3: filter to only accessible models. - let filtered: Vec<&OpenRouterModel> = or_models - .iter() - .filter(|m| available_ids.contains(&m.id)) - .filter(|m| m.id != "openrouter/auto") - .collect(); - - // Identify "stranger" models: accessible from the user's endpoint but - // NOT present in OpenRouter's public catalog. These are also upserted - // through the same pipeline (provider → model → version → pricing → - // capabilities → parameter_profile) with inferred defaults. - let or_matched_ids: std::collections::HashSet<&str> = filtered - .iter() - .map(|m| m.id.as_str()) - .collect(); - let unknown_ids: Vec<&str> = available_ids - .iter() - .filter(|id| !or_matched_ids.contains(id.as_str())) - .map(|s| s.as_str()) - .collect(); - - let filtered_count = filtered.len(); - - let mut models_created = 0i64; - let mut models_updated = 0i64; - let mut versions_created = 0i64; - let mut pricing_created = 0i64; - let mut capabilities_created = 0i64; - let mut profiles_created = 0i64; - - // Sync stranger models (non-OpenRouter) through the direct pipeline. - if !unknown_ids.is_empty() { - tracing::info!( - unknown_count = unknown_ids.len(), - "sync_upstream_models: {} models not in OpenRouter catalog, syncing directly", - unknown_ids.len() - ); - let unknown_set: std::collections::HashSet = - unknown_ids.iter().map(|s| ToString::to_string(s)).collect(); - let direct_result = sync_models_direct(&self.db, &unknown_set).await; - models_created += direct_result.models_created; - models_updated += direct_result.models_updated; - versions_created += direct_result.versions_created; - pricing_created += direct_result.pricing_created; - capabilities_created += direct_result.capabilities_created; - profiles_created += direct_result.profiles_created; - } - - // If no OpenRouter metadata matched at all, the direct sync above - // already handled everything — return early. - if filtered_count == 0 { - return Ok(SyncModelsResponse { - models_created, - models_updated, - versions_created, - pricing_created, - capabilities_created, - profiles_created, - }); - } - - for or_model in filtered { - let provider_slug = extract_provider(&or_model.id); - let provider = match upsert_provider(&self.db, provider_slug).await { - Ok(p) => p, - Err(e) => { - tracing::warn!( - provider = %provider_slug, - error = ?e, - "sync_upstream_models: upsert_provider error" - ); - continue; - } - }; - - let (model_record, _is_new) = - match upsert_model(&self.db, provider.id, &or_model.id, or_model).await { - Ok((m, n)) => { - if n { - models_created += 1; - } else { - models_updated += 1; - } - (m, n) - } - Err(e) => { - tracing::warn!( - model = %or_model.id, - error = ?e, - "sync_upstream_models: upsert_model error" - ); - continue; - } - }; - - let (version_record, version_is_new) = - match upsert_version(&self.db, model_record.id).await { - Ok(v) => v, - Err(e) => { - tracing::warn!( - model = %or_model.id, - error = ?e, - "sync_upstream_models: upsert_version error" - ); - continue; - } - }; - if version_is_new { - versions_created += 1; - } - - if upsert_pricing(&self.db, version_record.id).await.unwrap_or(false) { - pricing_created += 1; - } - - capabilities_created += - upsert_capabilities(&self.db, version_record.id, or_model.architecture.as_ref()) - .await - .unwrap_or(0); - - if upsert_parameter_profile(&self.db, version_record.id, &or_model.id) - .await - .unwrap_or(false) - { - profiles_created += 1; - } - } + let result = sync_models_from_upstream(&self.db, upstream_models).await; tracing::info!( - filtered_count, - models_created, - models_updated, - "sync_upstream_models: synced {} accessible models ({}) OpenRouter + ({}) direct", - filtered_count + unknown_ids.len(), - filtered_count, - unknown_ids.len() + models_created = result.models_created, + models_updated = result.models_updated, + versions_created = result.versions_created, + pricing_created = result.pricing_created, + capabilities_created = result.capabilities_created, + profiles_created = result.profiles_created, + "sync_upstream_models: complete" ); - Ok(SyncModelsResponse { - models_created, - models_updated, - versions_created, - pricing_created, - capabilities_created, - profiles_created, - }) + Ok(result) } /// Spawn a background task that syncs model metadata immediately @@ -941,184 +643,38 @@ impl AppService { ai_api_key: Option, ai_base_url: Option, ) { - // Build AI client to list accessible models. let (http_client, base_url, api_key) = match build_ai_client_from_parts(ai_api_key, ai_base_url) { Ok(c) => c, Err(msg) => { - tracing::warn!(error = %msg, "OpenRouter model sync"); + tracing::warn!(error = %msg, "Model sync: AI client config error"); return; } }; - let available_ids = match list_accessible_models(&http_client, &base_url, &api_key).await { - Ok(ids) => ids, + let upstream_models = match list_upstream_models(&http_client, &base_url, &api_key).await { + Ok(models) => models, Err(e) => { - tracing::warn!(error = ?e, "OpenRouter model sync: failed to list available models"); + tracing::warn!(error = ?e, "Model sync: failed to list upstream models"); return; } }; - let http_client = reqwest::Client::new(); - let or_models: Vec = match fetch_openrouter_models(&http_client).await { - Ok(resp) => resp.data, - Err(msg) => { - tracing::warn!(error = %msg, "OpenRouter model sync: fetch failed, falling back to direct sync"); - sync_models_direct(db, &available_ids).await; - return; - } - }; - - let filtered: Vec<&OpenRouterModel> = or_models - .iter() - .filter(|m| available_ids.contains(&m.id)) - .filter(|m| m.id != "openrouter/auto") - .collect(); - - // Identify "stranger" models: accessible from the user's endpoint but - // NOT present in OpenRouter's public catalog. - let or_matched_ids: std::collections::HashSet<&str> = filtered - .iter() - .map(|m| m.id.as_str()) - .collect(); - let unknown_ids: Vec<&str> = available_ids - .iter() - .filter(|id| !or_matched_ids.contains(id.as_str())) - .map(|s| s.as_str()) - .collect(); - - let filtered_count = filtered.len(); - - // Sync stranger models (non-OpenRouter) through the direct pipeline. - let mut models_created = 0i64; - let mut models_updated = 0i64; - let mut versions_created = 0i64; - let mut pricing_created = 0i64; - let mut capabilities_created = 0i64; - let mut profiles_created = 0i64; - - if !unknown_ids.is_empty() { - tracing::info!( - unknown_count = unknown_ids.len(), - "OpenRouter model sync: {} models not in OpenRouter catalog, syncing directly", - unknown_ids.len() - ); - let unknown_set: std::collections::HashSet = - unknown_ids.iter().map(|s| ToString::to_string(s)).collect(); - let direct_result = sync_models_direct(db, &unknown_set).await; - models_created += direct_result.models_created; - models_updated += direct_result.models_updated; - versions_created += direct_result.versions_created; - pricing_created += direct_result.pricing_created; - capabilities_created += direct_result.capabilities_created; - profiles_created += direct_result.profiles_created; - } - - // If no OpenRouter metadata matched at all, the direct sync above - // already handled everything — return early. - if filtered_count == 0 { - tracing::info!( - matched = filtered_count, - models_created, - models_updated, - versions_created, - pricing_created, - capabilities_created, - profiles_created, - "OpenRouter model sync complete (direct only)" - ); - return; - } - - for or_model in filtered { - let provider_slug = extract_provider(&or_model.id); - let provider = match upsert_provider(db, provider_slug).await { - Ok(p) => p, - Err(e) => { - tracing::warn!( - provider = %provider_slug, - error = ?e, - "OpenRouter model sync: upsert_provider error" - ); - continue; - } - }; - - let (model_record, _is_new) = - match upsert_model(db, provider.id, &or_model.id, or_model).await { - Ok((m, true)) => { - models_created += 1; - (m, true) - } - Ok((m, false)) => { - models_updated += 1; - (m, false) - } - Err(e) => { - tracing::warn!( - model = %or_model.id, - error = ?e, - "OpenRouter model sync: upsert_model error" - ); - continue; - } - }; - - let (version_record, version_is_new) = - match upsert_version(db, model_record.id).await { - Ok(v) => v, - Err(e) => { - tracing::warn!( - model = %or_model.id, - error = ?e, - "OpenRouter model sync: upsert_version error" - ); - continue; - } - }; - if version_is_new { - versions_created += 1; - } - - if upsert_pricing(db, version_record.id).await.unwrap_or(false) { - pricing_created += 1; - } - - capabilities_created += - upsert_capabilities(db, version_record.id, or_model.architecture.as_ref()) - .await - .unwrap_or(0); - - if upsert_parameter_profile(db, version_record.id, &or_model.id) - .await - .unwrap_or(false) - { - profiles_created += 1; - } - } - tracing::info!( - matched = filtered_count, - unknown = unknown_ids.len(), - models_created, - models_updated, - versions_created, - pricing_created, - capabilities_created, - profiles_created, - "OpenRouter model sync complete: {} total ({} OpenRouter + {} direct)", - filtered_count + unknown_ids.len(), - filtered_count, - unknown_ids.len() + model_count = upstream_models.len(), + "Model sync: {} models from upstream", + upstream_models.len() + ); + + let result = sync_models_from_upstream(db, upstream_models).await; + + tracing::info!( + models_created = result.models_created, + models_updated = result.models_updated, + versions_created = result.versions_created, + pricing_created = result.pricing_created, + capabilities_created = result.capabilities_created, + profiles_created = result.profiles_created, + "Model sync complete" ); } } - -/// Build a reqwest client and config parts for background sync task. -fn build_ai_client_from_parts( - api_key: Option, - base_url: Option, -) -> Result<(reqwest::Client, String, String), String> { - let api_key = api_key.ok_or_else(|| "AI API key not configured".to_string())?; - let base_url = base_url.unwrap_or_else(|| "https://api.openai.com".into()); - Ok((reqwest::Client::new(), base_url, api_key)) -}