gitdataai/libs/service/agent/sync.rs
ZhenYi 8a23a22c9b fix(agent/sync): make OpenRouter fetch optional, fallback to direct sync
When OpenRouter's public /api/v1/models endpoint fails (network error,
timeout, parse failure), the entire sync was aborted — meaning models
accessible from the user's AI endpoint were never synced.

Now: if OpenRouter fetch fails, fall back to sync_models_direct for all
available models instead of returning an error. Both sync_upstream_models
(API) and sync_once (background task) have this fix.
2026-04-26 15:49:34 +08:00

1098 lines
38 KiB
Rust

//! Synchronizes AI model metadata from OpenRouter into the local database.
//!
//! Flow:
//! 1. Call `GET /v1/models` on the configured AI endpoint with the real API key to list accessible model IDs.
//! 2. Fetch full metadata (pricing, context_length, capabilities) for those
//! model IDs from OpenRouter's public `/api/v1/models` endpoint (no auth).
//! 3. Upsert provider / model / version / pricing / capability / profile
//! records for models the client can actually call.
//! 4. Models accessible from the user's endpoint but NOT in OpenRouter's
//! catalog ("stranger" models) are also upserted through the same
//! pipeline with inferred defaults (direct sync).
//!
//! Usage: call `start_sync_task()` to launch a background task that syncs
//! immediately and then every 10 minutes. On app startup, run it once
//! eagerly before accepting traffic.
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::interval;
use tokio::time::sleep;
use crate::error::AppError;
use crate::AppService;
use chrono::Utc;
use db::database::AppDatabase;
use models::agents::model::Entity as ModelEntity;
use models::agents::model_capability::Entity as CapabilityEntity;
use models::agents::model_parameter_profile::Entity as ProfileEntity;
use models::agents::model_provider::Entity as ProviderEntity;
use models::agents::model_provider::Model as ProviderModel;
use models::agents::model_version::Entity as VersionEntity;
use models::agents::{CapabilityType, ModelCapability, ModelModality, ModelStatus};
use sea_orm::prelude::*;
use sea_orm::Set;
use serde::Deserialize;
use serde::Serialize;
use session::Session;
use utoipa::ToSchema;
use uuid::Uuid;
/// URL of OpenRouter's model catalog; `fetch_openrouter_models` requests it
/// without attaching any credentials.
const OPENROUTER_URL: &str = "https://openrouter.ai/api/v1/models";
// OpenRouter API types -------------------------------------------------------
/// Top-level JSON body deserialized from the OpenRouter catalog response.
#[derive(Debug, Clone, Deserialize)]
struct OpenRouterResponse {
/// Catalog entries, one per model.
data: Vec<OpenRouterModel>,
}
/// One model entry from the OpenRouter catalog.
///
/// Every field except `id` and `name` carries `#[serde(default)]`, so a
/// missing key deserializes to `None` instead of failing the whole response.
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
struct OpenRouterModel {
/// Model identifier; matched against the accessible-model set and stored as
/// the local model name.
id: String,
/// Display name (not read by the sync code in this file).
name: Option<String>,
#[serde(default)]
/// Free-form description (not read by the sync code in this file).
description: Option<String>,
#[serde(default)]
/// Context window size; fed to `infer_context_length`.
context_length: Option<u64>,
#[serde(default)]
/// Architecture details; `modality` drives modality/capability inference.
architecture: Option<OpenRouterArchitecture>,
#[serde(default)]
/// Provider limits; `max_completion_tokens` becomes the max output tokens.
top_provider: Option<OpenRouterTopProvider>,
}
/// The `architecture` object of an OpenRouter catalog entry.
///
/// Only `modality` is read by this file; the other fields are deserialized
/// but unused (hence `allow(dead_code)`).
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
struct OpenRouterArchitecture {
#[serde(default)]
/// Modality descriptor string, searched for substrings such as "text",
/// "image", "vision", "audio".
modality: Option<String>,
#[serde(default)]
/// Input modalities as reported by the catalog (unused here).
input_modalities: Option<Vec<String>>,
#[serde(default)]
/// Output modalities as reported by the catalog (unused here).
output_modalities: Option<Vec<String>>,
#[serde(default)]
/// Tokenizer name as reported by the catalog (unused here).
tokenizer: Option<String>,
#[serde(default)]
/// Instruction format as reported by the catalog (unused here).
instruct_type: Option<String>,
}
/// The `top_provider` object of an OpenRouter catalog entry.
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
struct OpenRouterTopProvider {
#[serde(default)]
/// Provider-side context length (unused here; the model-level value is used).
context_length: Option<u64>,
#[serde(default)]
/// Maximum completion tokens; stored as the model's `max_output_tokens`.
max_completion_tokens: Option<u64>,
#[serde(default)]
/// Moderation flag as reported by the catalog (unused here).
is_moderated: Option<bool>,
}
/// Fallback model type used when the user's AI endpoint is NOT OpenRouter
/// (e.g. Bailian/MiniMax). OpenRouter has no metadata for these models,
/// so we sync them directly from the endpoint's own /models response.
///
/// NOTE(review): nothing in the visible source constructs this type; the
/// direct sync path passes plain model-id strings instead. Confirm whether it
/// is still needed.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct DirectModel {
/// Model identifier as reported by the endpoint.
id: String,
/// Optional display name.
name: Option<String>,
/// Optional context window size.
context_length: Option<u64>,
}
// Response type --------------------------------------------------------------
/// Counters describing what a sync pass wrote to the database.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct SyncModelsResponse {
/// Number of model rows inserted.
pub models_created: i64,
/// Number of existing model rows refreshed.
pub models_updated: i64,
/// Number of default version rows inserted.
pub versions_created: i64,
/// Number of default pricing rows inserted.
pub pricing_created: i64,
/// Number of capability rows inserted.
pub capabilities_created: i64,
/// Number of parameter-profile rows inserted.
pub profiles_created: i64,
}
// Inference helpers ----------------------------------------------------------
/// Infers a model's modality from its architecture descriptor, falling back
/// to heuristics on the model name.
///
/// `arch_modality` is the catalog's modality string (for example
/// `text+image->text`). The more specific markers are tested first because
/// such descriptors almost always contain "text" as well: testing "text"
/// first would classify every multimodal or audio model as plain text.
///
/// When no descriptor is given, or it matches none of the markers, the model
/// name is inspected instead; anything unrecognised is treated as text.
fn infer_modality(name: &str, arch_modality: Option<&str>) -> ModelModality {
    if let Some(declared) = arch_modality {
        let declared = declared.to_lowercase();
        if declared.contains("image") || declared.contains("vision") {
            return ModelModality::Multimodal;
        }
        if declared.contains("audio") || declared.contains("speech") {
            return ModelModality::Audio;
        }
        if declared.contains("text") || declared.contains("chat") {
            return ModelModality::Text;
        }
    }

    let lower = name.to_lowercase();
    if lower.contains("vision")
        || lower.contains("dall-e")
        || lower.contains("gpt-image")
        || lower.contains("gpt-4o")
    {
        ModelModality::Multimodal
    } else if lower.contains("embedding") {
        ModelModality::Text
    } else if lower.contains("whisper") || lower.contains("audio") {
        ModelModality::Audio
    } else {
        ModelModality::Text
    }
}
/// Classifies a model as an embedding model when its name contains
/// "embedding" (case-insensitive); every other model is treated as chat.
fn infer_capability(name: &str) -> ModelCapability {
    let is_embedding = name.to_lowercase().contains("embedding");
    if is_embedding {
        return ModelCapability::Embedding;
    }
    ModelCapability::Chat
}
/// Converts an optional upstream context length into the stored `i64`,
/// defaulting to 8 192 tokens when the upstream gives none.
///
/// Values above `i64::MAX` saturate instead of wrapping to a negative number.
fn infer_context_length(ctx: Option<u64>) -> i64 {
    ctx.map_or(8_192, |c| i64::try_from(c).unwrap_or(i64::MAX))
}
/// Converts the provider's optional max-completion-token count into the
/// stored `i64`, keeping `None` when the upstream gives no limit.
///
/// Values above `i64::MAX` saturate instead of wrapping to a negative number.
fn infer_max_output(top_provider_max: Option<u64>) -> Option<i64> {
    top_provider_max.map(|v| i64::try_from(v).unwrap_or(i64::MAX))
}
/// Builds the capability flags recorded for a model version.
///
/// `FunctionCall` is always reported as supported. `Vision` is added when the
/// modality descriptor mentions "image" or "vision", and `ToolUse` when it
/// mentions "text" or "chat" (both checks are case-insensitive and
/// independent of each other).
fn infer_capability_list(arch: &OpenRouterArchitecture) -> Vec<(CapabilityType, bool)> {
    let descriptor = arch.modality.as_deref().map(str::to_lowercase);
    let mentions = |needle: &str| {
        descriptor
            .as_deref()
            .map_or(false, |text| text.contains(needle))
    };

    let mut capabilities = Vec::with_capacity(3);
    capabilities.push((CapabilityType::FunctionCall, true));
    if mentions("image") || mentions("vision") {
        capabilities.push((CapabilityType::Vision, true));
    }
    if mentions("text") || mentions("chat") {
        capabilities.push((CapabilityType::ToolUse, true));
    }
    capabilities
}
// Provider helpers -----------------------------------------------------------
/// Returns the provider slug of a `provider/model` identifier.
///
/// Identifiers without a `/` separator (common on non-OpenRouter endpoints),
/// or with an empty prefix, map to `"unknown"`. `str::split` always yields at
/// least one item, so splitting and taking the first piece would return the
/// whole model id as its own provider and never reach the fallback.
fn extract_provider(model_id: &str) -> &str {
    match model_id.split_once('/') {
        Some((provider, _)) if !provider.is_empty() => provider,
        _ => "unknown",
    }
}
/// Maps a provider slug to its canonical provider name.
///
/// Known aliases collapse to one name (`"mistralai"` → `"mistral"`,
/// `"meta-llama"` → `"meta"`, …). Any other slug is returned unchanged.
///
/// The return type is `&'static str`, so an unrecognised slug has to be
/// promoted to a static string. Each distinct slug is leaked at most once and
/// then served from an intern table; leaking on every call would grow memory
/// without bound, because the sync runs periodically over every model.
fn normalize_provider_name(slug: &str) -> &'static str {
    // Intern table of unrecognised slugs already promoted to 'static.
    static INTERNED: std::sync::Mutex<std::collections::BTreeSet<&'static str>> =
        std::sync::Mutex::new(std::collections::BTreeSet::new());

    match slug {
        "openai" => "openai",
        "anthropic" => "anthropic",
        "google" | "google-ai" => "google",
        "mistralai" => "mistral",
        "meta-llama" | "meta" => "meta",
        "deepseek" => "deepseek",
        "azure" | "azure-openai" => "azure",
        "x-ai" | "xai" => "xai",
        other => {
            // A poisoned lock only means another thread panicked while
            // holding it; the set itself is still usable.
            let mut interned = INTERNED
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            if let Some(&known) = interned.get(other) {
                return known;
            }
            let leaked: &'static str = Box::leak(other.to_owned().into_boxed_str());
            interned.insert(leaked);
            leaked
        }
    }
}
/// Returns the human-readable label for a canonical provider name; names
/// without a dedicated label are returned as-is.
fn provider_display_name(name: &str) -> String {
    let label = match name {
        "openai" => "OpenAI",
        "anthropic" => "Anthropic",
        "google" => "Google DeepMind",
        "mistral" => "Mistral AI",
        "meta" => "Meta",
        "deepseek" => "DeepSeek",
        "azure" => "Microsoft Azure",
        "xai" => "xAI",
        other => other,
    };
    String::from(label)
}
// Upsert helpers -------------------------------------------------------------
/// Inserts the provider identified by `slug`, or bumps `updated_at` on the
/// existing row, and returns the resulting record.
///
/// The slug is normalized with `normalize_provider_name` before lookup, so
/// aliases share one provider row.
///
/// # Errors
/// Returns any database error from the lookup, update or insert.
async fn upsert_provider(db: &AppDatabase, slug: &str) -> Result<ProviderModel, AppError> {
    use models::agents::model_provider::Column as PCol;

    let name = normalize_provider_name(slug);
    let now = Utc::now();

    let existing = ProviderEntity::find()
        .filter(PCol::Name.eq(name))
        .one(db)
        .await?;

    match existing {
        Some(row) => {
            let mut active: models::agents::model_provider::ActiveModel = row.into();
            active.updated_at = Set(now);
            // `update` already returns the refreshed model; re-querying the
            // row would cost a round trip and would panic on `unwrap()` if
            // the row were deleted in between.
            active.update(db).await.map_err(AppError::from)
        }
        None => {
            let active = models::agents::model_provider::ActiveModel {
                id: Set(Uuid::now_v7()),
                name: Set(name.to_string()),
                display_name: Set(provider_display_name(name)),
                website: Set(None),
                status: Set(ModelStatus::Active.to_string()),
                created_at: Set(now),
                updated_at: Set(now),
            };
            active.insert(db).await.map_err(AppError::from)
        }
    }
}
/// Inserts or refreshes the model row named `model_id_str` under
/// `provider_id`, using OpenRouter catalog metadata.
///
/// On update only `context_length`, `max_output_tokens`, `status` and
/// `updated_at` are rewritten; modality and capability are inferred once, at
/// insert time.
///
/// Returns the stored model and `true` when a row was created, `false` when
/// an existing row was updated.
///
/// # Errors
/// Returns any database error from the lookup, update or insert.
async fn upsert_model(
    db: &AppDatabase,
    provider_id: Uuid,
    model_id_str: &str,
    or_model: &OpenRouterModel,
) -> Result<(models::agents::model::Model, bool), AppError> {
    use models::agents::model::Column as MCol;

    let now = Utc::now();
    let context_length = infer_context_length(or_model.context_length);
    let max_output = infer_max_output(
        or_model
            .top_provider
            .as_ref()
            .and_then(|p| p.max_completion_tokens),
    );

    let existing = ModelEntity::find()
        .filter(MCol::ProviderId.eq(provider_id))
        .filter(MCol::Name.eq(model_id_str))
        .one(db)
        .await?;

    if let Some(row) = existing {
        let mut active: models::agents::model::ActiveModel = row.into();
        active.context_length = Set(context_length);
        active.max_output_tokens = Set(max_output);
        active.status = Set(ModelStatus::Active.to_string());
        active.updated_at = Set(now);
        // `update` returns the refreshed model, so no re-query (and no
        // `unwrap()` on a possibly vanished row) is needed.
        let updated = active.update(db).await.map_err(AppError::from)?;
        return Ok((updated, false));
    }

    let declared_modality = or_model
        .architecture
        .as_ref()
        .and_then(|a| a.modality.as_deref());
    let modality = infer_modality(model_id_str, declared_modality);
    let capability = infer_capability(model_id_str);

    let active = models::agents::model::ActiveModel {
        id: Set(Uuid::now_v7()),
        provider_id: Set(provider_id),
        name: Set(model_id_str.to_string()),
        modality: Set(modality.to_string()),
        capability: Set(capability.to_string()),
        context_length: Set(context_length),
        max_output_tokens: Set(max_output),
        training_cutoff: Set(None),
        is_open_source: Set(false),
        status: Set(ModelStatus::Active.to_string()),
        created_at: Set(now),
        updated_at: Set(now),
        ..Default::default()
    };
    let inserted = active.insert(db).await.map_err(AppError::from)?;
    Ok((inserted, true))
}
/// Upsert a model directly from the user's AI endpoint response (no OpenRouter metadata).
/// Used as fallback when the endpoint is not OpenRouter-compatible.
///
/// `_name` is accepted for interface symmetry but not stored. When
/// `context_length` is `None`, a new row gets the inferred default, while an
/// existing row keeps its stored value: the direct path also runs as a
/// fallback when the OpenRouter fetch fails, and must not replace a context
/// length learned from the catalog with the default.
///
/// Returns the stored model and `true` when a row was created, `false` when
/// an existing row was updated.
///
/// # Errors
/// Returns any database error from the lookup, update or insert.
async fn upsert_model_direct(
    db: &AppDatabase,
    provider_id: Uuid,
    model_id_str: &str,
    _name: Option<&str>,
    context_length: Option<u64>,
) -> Result<(models::agents::model::Model, bool), AppError> {
    use models::agents::model::Column as MCol;

    let now = Utc::now();

    let existing = ModelEntity::find()
        .filter(MCol::ProviderId.eq(provider_id))
        .filter(MCol::Name.eq(model_id_str))
        .one(db)
        .await?;

    if let Some(row) = existing {
        let mut active: models::agents::model::ActiveModel = row.into();
        if context_length.is_some() {
            active.context_length = Set(infer_context_length(context_length));
        }
        active.status = Set(ModelStatus::Active.to_string());
        active.updated_at = Set(now);
        // `update` returns the refreshed model, so no re-query (and no
        // `unwrap()` on a possibly vanished row) is needed.
        let updated = active.update(db).await.map_err(AppError::from)?;
        return Ok((updated, false));
    }

    let modality = infer_modality(model_id_str, None);
    let capability = infer_capability(model_id_str);

    let active = models::agents::model::ActiveModel {
        id: Set(Uuid::now_v7()),
        provider_id: Set(provider_id),
        name: Set(model_id_str.to_string()),
        modality: Set(modality.to_string()),
        capability: Set(capability.to_string()),
        context_length: Set(infer_context_length(context_length)),
        max_output_tokens: Set(None),
        training_cutoff: Set(None),
        is_open_source: Set(false),
        status: Set(ModelStatus::Active.to_string()),
        created_at: Set(now),
        updated_at: Set(now),
        ..Default::default()
    };
    let inserted = active.insert(db).await.map_err(AppError::from)?;
    Ok((inserted, true))
}
/// Ensures `model_uuid` has a default version row.
///
/// Returns the existing default version with `false`, or inserts version
/// `"1"` (marked default and active) and returns it with `true`.
async fn upsert_version(
    db: &AppDatabase,
    model_uuid: Uuid,
) -> Result<(models::agents::model_version::Model, bool), AppError> {
    use models::agents::model_version::Column as VCol;

    let now = Utc::now();
    let current_default = VersionEntity::find()
        .filter(VCol::ModelId.eq(model_uuid))
        .filter(VCol::IsDefault.eq(true))
        .one(db)
        .await?;

    match current_default {
        Some(version) => Ok((version, false)),
        None => {
            let fresh = models::agents::model_version::ActiveModel {
                id: Set(Uuid::now_v7()),
                model_id: Set(model_uuid),
                version: Set("1".to_string()),
                release_date: Set(None),
                change_log: Set(None),
                is_default: Set(true),
                status: Set(ModelStatus::Active.to_string()),
                created_at: Set(now),
            };
            let version = fresh.insert(db).await.map_err(AppError::from)?;
            Ok((version, true))
        }
    }
}
/// Ensures a pricing row exists for `version_uuid`.
///
/// When none exists, a placeholder priced at "0.00" USD per 1k input and
/// output tokens is inserted so that an administrator can adjust it later.
/// Returns `true` when a row was inserted, `false` when one already existed.
async fn upsert_pricing(
    db: &AppDatabase,
    version_uuid: Uuid,
) -> Result<bool, AppError> {
    use models::agents::model_pricing::Column as PCol;
    use models::agents::model_pricing::Entity as PricingEntity;

    let already_priced = PricingEntity::find()
        .filter(PCol::ModelVersionId.eq(version_uuid))
        .one(db)
        .await?
        .is_some();
    if already_priced {
        return Ok(false);
    }

    let placeholder = models::agents::model_pricing::ActiveModel {
        // Row id is the low 64 bits of a fresh v7 UUID.
        id: Set(Uuid::now_v7().as_u128() as i64),
        model_version_id: Set(version_uuid),
        input_price_per_1k_tokens: Set("0.00".to_string()),
        output_price_per_1k_tokens: Set("0.00".to_string()),
        currency: Set("USD".to_string()),
        effective_from: Set(Utc::now()),
    };
    placeholder.insert(db).await.map_err(AppError::from)?;
    Ok(true)
}
/// Inserts any missing capability rows for a model version and returns how
/// many were created.
///
/// Capabilities come from `infer_capability_list`; with no architecture
/// information an all-`None` architecture is used, which yields only the
/// always-present capability.
///
/// The capability table keys the version by the UUID truncated to `i64`.
/// The existence check uses that same truncated key. Filtering by the raw
/// `Uuid` can never match what the insert writes, which leads to a
/// type-mismatch error or to duplicate rows on every sync, depending on the
/// database.
///
/// # Errors
/// Returns any database error from the lookups or inserts.
async fn upsert_capabilities(
    db: &AppDatabase,
    version_uuid: Uuid,
    arch: Option<&OpenRouterArchitecture>,
) -> Result<i64, AppError> {
    use models::agents::model_capability::Column as CCol;

    let caps = match arch {
        Some(known) => infer_capability_list(known),
        None => infer_capability_list(&OpenRouterArchitecture {
            modality: None,
            input_modalities: None,
            output_modalities: None,
            tokenizer: None,
            instruct_type: None,
        }),
    };

    // Lossy by design of the schema: low 64 bits of the version UUID.
    let version_key = version_uuid.as_u128() as i64;
    let now = Utc::now();
    let mut created = 0i64;

    for (cap_type, supported) in caps {
        let capability = cap_type.to_string();
        let exists = CapabilityEntity::find()
            .filter(CCol::ModelVersionId.eq(version_key))
            .filter(CCol::Capability.eq(capability.clone()))
            .one(db)
            .await?
            .is_some();
        if exists {
            continue;
        }

        let active = models::agents::model_capability::ActiveModel {
            id: Set(Uuid::now_v7().as_u128() as i64),
            model_version_id: Set(version_key),
            capability: Set(capability),
            is_supported: Set(supported),
            created_at: Set(now),
        };
        active.insert(db).await.map_err(AppError::from)?;
        created += 1;
    }
    Ok(created)
}
/// Ensures a sampling-parameter profile exists for `version_uuid`.
///
/// Models whose lowercase name contains "o1" or "o3" get a temperature range
/// pinned to 1.0; all others get 0.0–2.0. `top_p` spans 0.0–1.0 and both
/// penalties are marked supported. Returns `true` when a row was inserted,
/// `false` when one already existed.
async fn upsert_parameter_profile(
    db: &AppDatabase,
    version_uuid: Uuid,
    model_name: &str,
) -> Result<bool, AppError> {
    use models::agents::model_parameter_profile::Column as PCol;

    let already_profiled = ProfileEntity::find()
        .filter(PCol::ModelVersionId.eq(version_uuid))
        .one(db)
        .await?
        .is_some();
    if already_profiled {
        return Ok(false);
    }

    let name_lc = model_name.to_lowercase();
    let pinned_temperature = ["o1", "o3"].iter().any(|tag| name_lc.contains(*tag));
    let (t_min, t_max) = match pinned_temperature {
        true => (1.0, 1.0),
        false => (0.0, 2.0),
    };

    let profile = models::agents::model_parameter_profile::ActiveModel {
        id: Set(Uuid::now_v7().as_u128() as i64),
        model_version_id: Set(version_uuid),
        temperature_min: Set(t_min),
        temperature_max: Set(t_max),
        top_p_min: Set(0.0),
        top_p_max: Set(1.0),
        frequency_penalty_supported: Set(true),
        presence_penalty_supported: Set(true),
    };
    profile.insert(db).await.map_err(AppError::from)?;
    Ok(true)
}
/// Sync models directly from the user's AI endpoint when OpenRouter has no matching models.
/// This handles non-OpenRouter endpoints (e.g. Bailian, MiniMax) gracefully.
///
/// Each id in `available_ids` goes through provider → model → version →
/// pricing → capabilities → parameter profile, with defaults inferred from
/// the id alone. The sync is best-effort: a failure on one model is logged
/// and the loop moves on. Failures in the pricing, capability and profile
/// steps are logged as well rather than silently dropped.
///
/// Returns the counters of rows created or updated.
async fn sync_models_direct(
    db: &AppDatabase,
    available_ids: &std::collections::HashSet<String>,
) -> SyncModelsResponse {
    tracing::info!(
        model_count = available_ids.len(),
        "sync_models_direct: {} models from endpoint (no OpenRouter metadata)",
        available_ids.len()
    );

    let mut totals = SyncModelsResponse {
        models_created: 0,
        models_updated: 0,
        versions_created: 0,
        pricing_created: 0,
        capabilities_created: 0,
        profiles_created: 0,
    };

    for model_id in available_ids {
        let provider_slug = extract_provider(model_id);
        let provider = match upsert_provider(db, provider_slug).await {
            Ok(p) => p,
            Err(e) => {
                tracing::warn!(
                    provider = %provider_slug,
                    error = ?e,
                    "sync_models_direct: upsert_provider error"
                );
                continue;
            }
        };

        let model_record = match upsert_model_direct(db, provider.id, model_id, None, None).await {
            Ok((record, true)) => {
                totals.models_created += 1;
                record
            }
            Ok((record, false)) => {
                totals.models_updated += 1;
                record
            }
            Err(e) => {
                tracing::warn!(
                    model = %model_id,
                    error = ?e,
                    "sync_models_direct: upsert_model_direct error"
                );
                continue;
            }
        };

        let version_record = match upsert_version(db, model_record.id).await {
            Ok((record, is_new)) => {
                if is_new {
                    totals.versions_created += 1;
                }
                record
            }
            Err(e) => {
                tracing::warn!(
                    model = %model_id,
                    error = ?e,
                    "sync_models_direct: upsert_version error"
                );
                continue;
            }
        };

        // The remaining steps are independent of each other: a failure in
        // one is logged and the others still run.
        match upsert_pricing(db, version_record.id).await {
            Ok(true) => {
                totals.pricing_created += 1;
            }
            Ok(false) => {}
            Err(e) => {
                tracing::warn!(
                    model = %model_id,
                    error = ?e,
                    "sync_models_direct: upsert_pricing error"
                );
            }
        }
        match upsert_capabilities(db, version_record.id, None).await {
            Ok(created) => {
                totals.capabilities_created += created;
            }
            Err(e) => {
                tracing::warn!(
                    model = %model_id,
                    error = ?e,
                    "sync_models_direct: upsert_capabilities error"
                );
            }
        }
        match upsert_parameter_profile(db, version_record.id, model_id).await {
            Ok(true) => {
                totals.profiles_created += 1;
            }
            Ok(false) => {}
            Err(e) => {
                tracing::warn!(
                    model = %model_id,
                    error = ?e,
                    "sync_models_direct: upsert_parameter_profile error"
                );
            }
        }
    }

    tracing::info!(
        matched = available_ids.len(),
        models_created = totals.models_created,
        models_updated = totals.models_updated,
        versions_created = totals.versions_created,
        pricing_created = totals.pricing_created,
        capabilities_created = totals.capabilities_created,
        profiles_created = totals.profiles_created,
        "sync_models_direct complete"
    );
    totals
}
// HTTP helpers ---------------------------------------------------------------
/// Downloads and parses the OpenRouter model catalog.
///
/// Transport-level failures (connect, timeout, other request errors) are
/// retried up to `MAX_RETRIES` attempts with exponential backoff (1 s, 2 s).
/// A non-success HTTP status or an unparsable body is returned immediately
/// without a retry.
///
/// Every attempt carries an explicit request timeout: a client built with
/// `reqwest::Client::new()` has none, so a stalled connection would otherwise
/// block the sync indefinitely.
///
/// # Errors
/// Returns a human-readable message describing the final failure.
async fn fetch_openrouter_models(
    client: &reqwest::Client,
) -> Result<OpenRouterResponse, String> {
    const MAX_RETRIES: u32 = 3;
    const BASE_DELAY_MS: u64 = 1_000;
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

    let mut attempt: u32 = 0;
    loop {
        attempt += 1;

        let send_error = match client
            .get(OPENROUTER_URL)
            .timeout(REQUEST_TIMEOUT)
            .send()
            .await
        {
            Ok(response) => {
                let response = match response.error_for_status() {
                    Ok(ok) => ok,
                    Err(e) => {
                        return Err(format!(
                            "HTTP status error after {} attempt(s): url={} status={}",
                            attempt,
                            e.url()
                                .map(|u| u.to_string())
                                .unwrap_or_else(|| OPENROUTER_URL.to_string()),
                            e
                        ));
                    }
                };
                return response.json::<OpenRouterResponse>().await.map_err(|e| {
                    format!(
                        "failed to parse response after {} attempt(s): {}",
                        attempt, e
                    )
                });
            }
            Err(e) => e,
        };

        let kind = if send_error.is_timeout() {
            "timeout"
        } else if send_error.is_connect() {
            "connect"
        } else {
            "request"
        };
        let url = send_error
            .url()
            .map(|u| u.to_string())
            .unwrap_or_else(|| OPENROUTER_URL.to_string());

        if attempt >= MAX_RETRIES {
            return Err(format!(
                "OpenRouter connection failed after {} attempt(s): [{}] url={} error={:?}",
                attempt, kind, url, send_error
            ));
        }

        // Exponential backoff: 1 s after the first failure, 2 s after the second.
        let delay_ms = BASE_DELAY_MS * (1u64 << (attempt - 1));
        tracing::warn!(
            attempt = attempt,
            max_retries = MAX_RETRIES,
            kind = %kind,
            url = %url,
            error = ?send_error,
            retry_delay_ms = delay_ms,
            "OpenRouter connection attempt failed, retrying"
        );
        sleep(Duration::from_millis(delay_ms)).await;
    }
}
/// Assembles what is needed to talk to the configured AI endpoint.
///
/// Returns `(http client, base URL, API key)`. A missing API key is an
/// internal-server error; a missing base URL falls back to
/// `https://api.openai.com`.
fn build_ai_client(config: &config::AppConfig) -> Result<(reqwest::Client, String, String), AppError> {
    let api_key = match config.ai_api_key() {
        Ok(key) => key,
        Err(e) => {
            return Err(AppError::InternalServerError(format!(
                "AI API key not configured: {}",
                e
            )));
        }
    };
    let base_url = match config.ai_basic_url() {
        Ok(url) => url,
        Err(_) => "https://api.openai.com".into(),
    };
    Ok((reqwest::Client::new(), base_url, api_key))
}
/// Response from `GET /v1/models`.
///
/// Only the `data` array is deserialized; each entry is reduced to its id.
#[derive(Debug, Deserialize)]
struct ModelsListResponse {
/// Models the API key can access.
data: Vec<ModelEntry>,
}
/// One entry of the `GET /v1/models` listing; only the identifier is kept.
#[derive(Debug, Deserialize)]
struct ModelEntry {
/// Model identifier as reported by the endpoint.
id: String,
}
/// List accessible model IDs from the AI endpoint.
///
/// Sends `GET {base_url}/v1/models` with a bearer token and collects the
/// returned ids into a set. Trailing slashes on `base_url` are ignored.
///
/// # Errors
/// Returns `AppError::InternalServerError` when the request fails, when the
/// endpoint answers with a non-success status (for example 401 for a bad
/// key), or when the body is not the expected JSON. The status is checked
/// before parsing so that an error body is reported as a listing failure and
/// not as a misleading parse error.
async fn list_accessible_models(
    client: &reqwest::Client,
    base_url: &str,
    api_key: &str,
) -> Result<std::collections::HashSet<String>, AppError> {
    let url = format!("{}/v1/models", base_url.trim_end_matches('/'));
    let resp = client
        .get(&url)
        .header("Authorization", format!("Bearer {}", api_key))
        .send()
        .await
        .map_err(|e| AppError::InternalServerError(format!("failed to list models: {}", e)))?
        .error_for_status()
        .map_err(|e| AppError::InternalServerError(format!("failed to list models: {}", e)))?;
    let body: ModelsListResponse = resp
        .json()
        .await
        .map_err(|e| AppError::InternalServerError(format!("failed to parse models response: {}", e)))?;
    Ok(body.data.into_iter().map(|m| m.id).collect())
}
impl AppService {
/// Sync metadata for models that are accessible by the configured AI client.
///
/// Steps:
/// 1. Call `client.models().list()` to get the set of accessible model IDs.
/// 2. Fetch full model list from OpenRouter's public `/api/v1/models` endpoint.
/// 3. Keep only models whose ID appears in the accessible set, then upsert
/// with full OpenRouter metadata.
/// 4. Models NOT in OpenRouter's catalog are also upserted through the
/// same pipeline (provider → model → version → pricing → capabilities
/// → parameter_profile) with inferred defaults.
pub async fn sync_upstream_models(
&self,
_ctx: &Session,
) -> Result<SyncModelsResponse, AppError> {
// Step 1: list models the AI client can access.
let (http_client, base_url, api_key) = build_ai_client(&self.config)?;
let available_ids = list_accessible_models(&http_client, &base_url, &api_key).await?;
tracing::info!(
model_count = available_ids.len(),
"sync_upstream_models: {} accessible models found",
available_ids.len()
);
// Step 2: fetch OpenRouter metadata (optional — failure falls back to
// direct sync for all available models).
let http_client = reqwest::Client::new();
let or_models: Vec<OpenRouterModel> = match fetch_openrouter_models(&http_client).await {
Ok(resp) => resp.data,
Err(msg) => {
tracing::warn!(error = %msg, "sync_upstream_models: OpenRouter fetch failed, falling back to direct sync");
let direct_result = sync_models_direct(&self.db, &available_ids).await;
return Ok(direct_result);
}
};
// Step 3: filter to only accessible models.
let filtered: Vec<&OpenRouterModel> = or_models
.iter()
.filter(|m| available_ids.contains(&m.id))
.filter(|m| m.id != "openrouter/auto")
.collect();
// Identify "stranger" models: accessible from the user's endpoint but
// NOT present in OpenRouter's public catalog. These are also upserted
// through the same pipeline (provider → model → version → pricing →
// capabilities → parameter_profile) with inferred defaults.
let or_matched_ids: std::collections::HashSet<&str> = filtered
.iter()
.map(|m| m.id.as_str())
.collect();
let unknown_ids: Vec<&str> = available_ids
.iter()
.filter(|id| !or_matched_ids.contains(id.as_str()))
.map(|s| s.as_str())
.collect();
let filtered_count = filtered.len();
let mut models_created = 0i64;
let mut models_updated = 0i64;
let mut versions_created = 0i64;
let mut pricing_created = 0i64;
let mut capabilities_created = 0i64;
let mut profiles_created = 0i64;
// Sync stranger models (non-OpenRouter) through the direct pipeline.
if !unknown_ids.is_empty() {
tracing::info!(
unknown_count = unknown_ids.len(),
"sync_upstream_models: {} models not in OpenRouter catalog, syncing directly",
unknown_ids.len()
);
let unknown_set: std::collections::HashSet<String> =
unknown_ids.iter().map(|s| ToString::to_string(s)).collect();
let direct_result = sync_models_direct(&self.db, &unknown_set).await;
models_created += direct_result.models_created;
models_updated += direct_result.models_updated;
versions_created += direct_result.versions_created;
pricing_created += direct_result.pricing_created;
capabilities_created += direct_result.capabilities_created;
profiles_created += direct_result.profiles_created;
}
// If no OpenRouter metadata matched at all, the direct sync above
// already handled everything — return early.
if filtered_count == 0 {
return Ok(SyncModelsResponse {
models_created,
models_updated,
versions_created,
pricing_created,
capabilities_created,
profiles_created,
});
}
for or_model in filtered {
let provider_slug = extract_provider(&or_model.id);
let provider = match upsert_provider(&self.db, provider_slug).await {
Ok(p) => p,
Err(e) => {
tracing::warn!(
provider = %provider_slug,
error = ?e,
"sync_upstream_models: upsert_provider error"
);
continue;
}
};
let (model_record, _is_new) =
match upsert_model(&self.db, provider.id, &or_model.id, or_model).await {
Ok((m, n)) => {
if n {
models_created += 1;
} else {
models_updated += 1;
}
(m, n)
}
Err(e) => {
tracing::warn!(
model = %or_model.id,
error = ?e,
"sync_upstream_models: upsert_model error"
);
continue;
}
};
let (version_record, version_is_new) =
match upsert_version(&self.db, model_record.id).await {
Ok(v) => v,
Err(e) => {
tracing::warn!(
model = %or_model.id,
error = ?e,
"sync_upstream_models: upsert_version error"
);
continue;
}
};
if version_is_new {
versions_created += 1;
}
if upsert_pricing(&self.db, version_record.id).await.unwrap_or(false) {
pricing_created += 1;
}
capabilities_created +=
upsert_capabilities(&self.db, version_record.id, or_model.architecture.as_ref())
.await
.unwrap_or(0);
if upsert_parameter_profile(&self.db, version_record.id, &or_model.id)
.await
.unwrap_or(false)
{
profiles_created += 1;
}
}
tracing::info!(
filtered_count,
models_created,
models_updated,
"sync_upstream_models: synced {} accessible models ({}) OpenRouter + ({}) direct",
filtered_count + unknown_ids.len(),
filtered_count,
unknown_ids.len()
);
Ok(SyncModelsResponse {
models_created,
models_updated,
versions_created,
pricing_created,
capabilities_created,
profiles_created,
})
}
/// Spawn a background task that syncs model metadata immediately
/// and then every 10 minutes. Returns the `JoinHandle`.
///
/// Failures are logged but do not stop the task — it keeps retrying.
pub fn start_sync_task(self) -> JoinHandle<()> {
let db = self.db.clone();
let ai_api_key = self.config.ai_api_key().ok();
let ai_base_url = self.config.ai_basic_url().ok();
tokio::spawn(async move {
// Run once immediately on startup before taking traffic.
Self::sync_once(&db, ai_api_key.clone(), ai_base_url.clone()).await;
let mut tick = interval(Duration::from_secs(60 * 10));
loop {
tick.tick().await;
Self::sync_once(&db, ai_api_key.clone(), ai_base_url.clone()).await;
}
})
}
/// Perform a single sync pass. Errors are logged and silently swallowed
/// so the periodic task never stops.
async fn sync_once(
db: &AppDatabase,
ai_api_key: Option<String>,
ai_base_url: Option<String>,
) {
// Build AI client to list accessible models.
let (http_client, base_url, api_key) = match build_ai_client_from_parts(ai_api_key, ai_base_url) {
Ok(c) => c,
Err(msg) => {
tracing::warn!(error = %msg, "OpenRouter model sync");
return;
}
};
let available_ids = match list_accessible_models(&http_client, &base_url, &api_key).await {
Ok(ids) => ids,
Err(e) => {
tracing::warn!(error = ?e, "OpenRouter model sync: failed to list available models");
return;
}
};
let http_client = reqwest::Client::new();
let or_models: Vec<OpenRouterModel> = match fetch_openrouter_models(&http_client).await {
Ok(resp) => resp.data,
Err(msg) => {
tracing::warn!(error = %msg, "OpenRouter model sync: fetch failed, falling back to direct sync");
sync_models_direct(db, &available_ids).await;
return;
}
};
let filtered: Vec<&OpenRouterModel> = or_models
.iter()
.filter(|m| available_ids.contains(&m.id))
.filter(|m| m.id != "openrouter/auto")
.collect();
// Identify "stranger" models: accessible from the user's endpoint but
// NOT present in OpenRouter's public catalog.
let or_matched_ids: std::collections::HashSet<&str> = filtered
.iter()
.map(|m| m.id.as_str())
.collect();
let unknown_ids: Vec<&str> = available_ids
.iter()
.filter(|id| !or_matched_ids.contains(id.as_str()))
.map(|s| s.as_str())
.collect();
let filtered_count = filtered.len();
// Sync stranger models (non-OpenRouter) through the direct pipeline.
let mut models_created = 0i64;
let mut models_updated = 0i64;
let mut versions_created = 0i64;
let mut pricing_created = 0i64;
let mut capabilities_created = 0i64;
let mut profiles_created = 0i64;
if !unknown_ids.is_empty() {
tracing::info!(
unknown_count = unknown_ids.len(),
"OpenRouter model sync: {} models not in OpenRouter catalog, syncing directly",
unknown_ids.len()
);
let unknown_set: std::collections::HashSet<String> =
unknown_ids.iter().map(|s| ToString::to_string(s)).collect();
let direct_result = sync_models_direct(db, &unknown_set).await;
models_created += direct_result.models_created;
models_updated += direct_result.models_updated;
versions_created += direct_result.versions_created;
pricing_created += direct_result.pricing_created;
capabilities_created += direct_result.capabilities_created;
profiles_created += direct_result.profiles_created;
}
// If no OpenRouter metadata matched at all, the direct sync above
// already handled everything — return early.
if filtered_count == 0 {
tracing::info!(
matched = filtered_count,
models_created,
models_updated,
versions_created,
pricing_created,
capabilities_created,
profiles_created,
"OpenRouter model sync complete (direct only)"
);
return;
}
for or_model in filtered {
let provider_slug = extract_provider(&or_model.id);
let provider = match upsert_provider(db, provider_slug).await {
Ok(p) => p,
Err(e) => {
tracing::warn!(
provider = %provider_slug,
error = ?e,
"OpenRouter model sync: upsert_provider error"
);
continue;
}
};
let (model_record, _is_new) =
match upsert_model(db, provider.id, &or_model.id, or_model).await {
Ok((m, true)) => {
models_created += 1;
(m, true)
}
Ok((m, false)) => {
models_updated += 1;
(m, false)
}
Err(e) => {
tracing::warn!(
model = %or_model.id,
error = ?e,
"OpenRouter model sync: upsert_model error"
);
continue;
}
};
let (version_record, version_is_new) =
match upsert_version(db, model_record.id).await {
Ok(v) => v,
Err(e) => {
tracing::warn!(
model = %or_model.id,
error = ?e,
"OpenRouter model sync: upsert_version error"
);
continue;
}
};
if version_is_new {
versions_created += 1;
}
if upsert_pricing(db, version_record.id).await.unwrap_or(false) {
pricing_created += 1;
}
capabilities_created +=
upsert_capabilities(db, version_record.id, or_model.architecture.as_ref())
.await
.unwrap_or(0);
if upsert_parameter_profile(db, version_record.id, &or_model.id)
.await
.unwrap_or(false)
{
profiles_created += 1;
}
}
tracing::info!(
matched = filtered_count,
unknown = unknown_ids.len(),
models_created,
models_updated,
versions_created,
pricing_created,
capabilities_created,
profiles_created,
"OpenRouter model sync complete: {} total ({} OpenRouter + {} direct)",
filtered_count + unknown_ids.len(),
filtered_count,
unknown_ids.len()
);
}
}
/// Assembles the HTTP client, base URL and API key for the background sync
/// from values captured when the task was spawned.
///
/// A missing API key yields the error string `AI API key not configured`; a
/// missing base URL falls back to `https://api.openai.com`.
fn build_ai_client_from_parts(
    api_key: Option<String>,
    base_url: Option<String>,
) -> Result<(reqwest::Client, String, String), String> {
    let api_key = match api_key {
        Some(key) => key,
        None => return Err("AI API key not configured".to_string()),
    };
    let base_url = match base_url {
        Some(url) => url,
        None => "https://api.openai.com".into(),
    };
    Ok((reqwest::Client::new(), base_url, api_key))
}