From 3de4fff11d8c6451de196894928f8d89bc5fe275 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Fri, 17 Apr 2026 00:13:40 +0800 Subject: [PATCH] feat(service): improve model sync and harden git HTTP/SSH stability Model sync: - Filter OpenRouter models by what the user's AI client can actually access, before upserting metadata (avoids bloating with inaccessible models). - Fall back to direct endpoint sync when no OpenRouter metadata matches (handles Bailian/MiniMax and other non-OpenRouter providers). Git stability fixes: - SSH: add 5s timeout on stdin flush/shutdown in channel_eof and cleanup_channel to prevent blocking the event loop on unresponsive git. - SSH: remove dbg!() calls from production code paths. - HTTP auth: pass proper Logger to SshAuthService instead of discarding all auth events to slog::Discard. Dependencies: - reqwest: add native-tls feature for HTTPS on Windows/Linux/macOS. --- Cargo.lock | 88 +++++ libs/agent/chat/mod.rs | 3 - libs/git/http/auth.rs | 4 +- libs/git/http/routes.rs | 6 +- libs/git/ssh/handle.rs | 17 +- libs/service/Cargo.toml | 2 +- libs/service/agent/sync.rs | 705 ++++++++++++++++++++++++++++++------- 7 files changed, 679 insertions(+), 146 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9119193..fc78320 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2520,6 +2520,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -3329,6 +3344,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -4441,6 +4472,23 @@ dependencies = [ "typenum", ] +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe 0.2.1", + "openssl-sys", + "schannel", + "security-framework 3.7.0", + "security-framework-sys", + "tempfile", +] + [[package]] name = "new_debug_unreachable" version = "1.0.6" @@ -4657,6 +4705,32 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl" +version = "0.10.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "951c002c75e16ea2c65b8c7e4d3d51d5530d8dfa7d060b4776828c88cfb18ecf" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "openssl-probe" version = "0.1.6" @@ -5821,15 +5895,19 @@ dependencies = [ "http-body", "http-body-util", "hyper", + "hyper-tls", "hyper-util", "js-sys", "log", + "native-tls", "percent-encoding", "pin-project-lite", + "rustls-pki-types", "serde", "serde_json", "sync_wrapper", "tokio", + "tokio-native-tls", "tower 0.5.3", "tower-http", "tower-service", @@ -7489,6 +7567,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" diff --git a/libs/agent/chat/mod.rs b/libs/agent/chat/mod.rs index 57723e1..a22c499 100644 --- a/libs/agent/chat/mod.rs +++ b/libs/agent/chat/mod.rs @@ -36,7 +36,6 @@ pub struct AiRequest { pub input: String, pub mention: Vec, pub history: Vec, - /// Optional user name mapping: user_id -> username pub user_names: HashMap, pub temperature: f64, pub max_tokens: i32, @@ -44,9 +43,7 @@ pub struct AiRequest { pub frequency_penalty: f64, pub presence_penalty: f64, pub think: bool, - /// OpenAI tool definitions. If None or empty, tool calling is disabled. pub tools: Option>, - /// Maximum tool-call recursion depth (AI → tool → result → AI loops). Default: 3. pub max_tool_depth: usize, } diff --git a/libs/git/http/auth.rs b/libs/git/http/auth.rs index aa592f7..95ac17b 100644 --- a/libs/git/http/auth.rs +++ b/libs/git/http/auth.rs @@ -6,6 +6,7 @@ use models::repos::repo; use models::users::{user, user_token}; use sea_orm::sqlx::types::chrono; use sea_orm::*; +use slog::Logger; pub async fn verify_access_token( db: &AppDatabase, @@ -44,6 +45,7 @@ pub async fn verify_access_token( pub async fn authorize_repo_access( req: &HttpRequest, db: &AppDatabase, + logger: &Logger, repo: &repo::Model, is_write: bool, ) -> Result<(), Error> { @@ -53,7 +55,7 @@ pub async fn authorize_repo_access( let (username, access_key) = extract_basic_credentials(req)?; let user = verify_access_token(db, &username, &access_key).await?; - let authz = SshAuthService::new(db.clone(), slog::Logger::root(slog::Discard, slog::o!())); + let authz = SshAuthService::new(db.clone(), logger.clone()); let can_access = authz.check_repo_permission(&user, repo, is_write).await; if !can_access { diff --git a/libs/git/http/routes.rs b/libs/git/http/routes.rs index 6a74404..0fce26a 100644 --- a/libs/git/http/routes.rs +++ b/libs/git/http/routes.rs @@ -34,7 +34,7 @@ pub async fn info_refs( let path_inner = path.into_inner(); let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; let is_write = service_param == "git-receive-pack"; - authorize_repo_access(&req, &state.db, &model, is_write).await?; + authorize_repo_access(&req, &state.db, &state.logger, &model, is_write).await?; let storage_path = PathBuf::from(&model.storage_path); let handler = GitHttpHandler::new(storage_path, model, state.db.clone()); @@ -56,7 +56,7 @@ pub async fn upload_pack( let path_inner = path.into_inner(); let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; - authorize_repo_access(&req, &state.db, &model, false).await?; + authorize_repo_access(&req, &state.db, &state.logger, &model, false).await?; let storage_path = PathBuf::from(&model.storage_path); let handler = GitHttpHandler::new(storage_path, model, state.db.clone()); @@ -78,7 +78,7 @@ pub async fn receive_pack( let path_inner = path.into_inner(); let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; - authorize_repo_access(&req, &state.db, &model, true).await?; + authorize_repo_access(&req, &state.db, &state.logger, &model, true).await?; let storage_path = PathBuf::from(&model.storage_path); let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone()); diff --git a/libs/git/ssh/handle.rs b/libs/git/ssh/handle.rs index 869e1d4..1d80f7b 100644 --- a/libs/git/ssh/handle.rs +++ b/libs/git/ssh/handle.rs @@ -119,8 +119,11 @@ impl SSHandle { fn cleanup_channel(&mut self, channel_id: ChannelId) { if let Some(mut stdin) = self.stdin.remove(&channel_id) { tokio::spawn(async move { - stdin.flush().await.ok(); - let _ = stdin.shutdown().await; + let _ = tokio::time::timeout(Duration::from_secs(5), async { + stdin.flush().await.ok(); + let _ = stdin.shutdown().await; + }) + .await; }); } self.eof.remove(&channel_id); @@ -359,8 +362,12 @@ impl russh::server::Handler for SSHandle { "channel" => ?channel, "client" => ?self.client_addr ); - let _ = stdin.flush().await; - let _ = stdin.shutdown().await; + // Use timeout so we never block the SSH event loop waiting for git. + let _ = tokio::time::timeout(Duration::from_secs(5), async { + stdin.flush().await.ok(); + let _ = stdin.shutdown().await; + }) + .await; info!(self.logger, "stdin closed"; "channel" => ?channel, "client" => ?self.client_addr @@ -450,7 +457,6 @@ impl russh::server::Handler for SSHandle { .all(self.db.reader()) .await .map_err(|e| { - dbg!(&e); russh::Error::IO(io::Error::new(io::ErrorKind::Other, e)) })?; @@ -666,7 +672,6 @@ impl russh::server::Handler for SSHandle { error!(&logger, "{}", format!("Process spawn failed error={}", e)); let _ = session.channel_failure(channel_id); self.cleanup_channel(channel_id); - dbg!(&e); return Err(russh::Error::IO(e)); } }; diff --git a/libs/service/Cargo.toml b/libs/service/Cargo.toml index c7349f9..033790e 100644 --- a/libs/service/Cargo.toml +++ b/libs/service/Cargo.toml @@ -38,7 +38,7 @@ argon2 = { workspace = true } uuid = { workspace = true, features = ["serde", "v7"] } sea-orm = { workspace = true, features = [] } async-openai = { version = "0.34.0", features = ["chat-completion"] } -reqwest = { workspace = true, features = ["json"] } +reqwest = { workspace = true, features = ["json", "native-tls"] } base64 = { workspace = true } rsa = { workspace = true } rand = { workspace = true } diff --git a/libs/service/agent/sync.rs b/libs/service/agent/sync.rs index 96fd955..1504e64 100644 --- a/libs/service/agent/sync.rs +++ b/libs/service/agent/sync.rs @@ -1,23 +1,28 @@ -//! Synchronizes AI models from OpenRouter into the local database. +//! Synchronizes AI model metadata from OpenRouter into the local database. //! -//! Fetches the full model list via OpenRouter's `/api/v1/models` endpoint -//! (requires `OPENROUTER_API_KEY` in config or falls back to `AI_API_KEY`). -//! -//! OpenRouter returns rich metadata per model including `context_length`, -//! `pricing`, and `architecture.modality` — these are used to populate all -//! five model tables without any hard-coded heuristics. +//! Flow: +//! 1. Use the configured `async_openai` client (with the real API key) to call +//! `GET /models` — this returns only the models the current key can access. +//! 2. Fetch full metadata (pricing, context_length, capabilities) for those +//! model IDs from OpenRouter's public `/api/v1/models` endpoint (no auth). +//! 3. Upsert provider / model / version / pricing / capability / profile +//! records only for models the client can actually call. //! //! Usage: call `start_sync_task()` to launch a background task that syncs //! immediately and then every 10 minutes. On app startup, run it once //! eagerly before accepting traffic. -use std::time::Duration; -use tokio::time::interval; -use tokio::task::JoinHandle; +use async_openai::Client; +use async_openai::config::OpenAIConfig; +use async_openai::types::models::Model as OpenAiModel; use slog::Logger; +use std::time::Duration; +use tokio::task::JoinHandle; +use tokio::time::interval; +use tokio::time::sleep; -use crate::AppService; use crate::error::AppError; +use crate::AppService; use chrono::Utc; use db::database::AppDatabase; use models::agents::model::Entity as ModelEntity; @@ -36,6 +41,8 @@ use session::Session; use utoipa::ToSchema; use uuid::Uuid; +const OPENROUTER_URL: &str = "https://openrouter.ai/api/v1/models"; + // OpenRouter API types ------------------------------------------------------- #[derive(Debug, Clone, Deserialize)] @@ -104,6 +111,17 @@ struct OpenRouterTopProvider { is_moderated: Option, } +/// Fallback model type used when the user's AI endpoint is NOT OpenRouter +/// (e.g. Bailian/MiniMax). OpenRouter has no metadata for these models, +/// so we sync them directly from the endpoint's own /models response. +#[derive(Debug, Clone)] +#[allow(dead_code)] +struct DirectModel { + id: String, + name: Option, + context_length: Option, +} + // Response type -------------------------------------------------------------- #[derive(Debug, Clone, Serialize, ToSchema)] @@ -116,7 +134,7 @@ pub struct SyncModelsResponse { pub profiles_created: i64, } -// Inference helpers (fallbacks when OpenRouter data is missing) --------------- +// Inference helpers ---------------------------------------------------------- fn infer_modality(name: &str, arch_modality: Option<&str>) -> ModelModality { if let Some(m) = arch_modality { @@ -151,8 +169,6 @@ fn infer_capability(name: &str) -> ModelCapability { let lower = name.to_lowercase(); if lower.contains("embedding") { ModelCapability::Embedding - } else if lower.contains("code") { - ModelCapability::Code } else { ModelCapability::Chat } @@ -167,11 +183,7 @@ fn infer_max_output(top_provider_max: Option) -> Option { } fn infer_capability_list(arch: &OpenRouterArchitecture) -> Vec<(CapabilityType, bool)> { - // Derive capabilities purely from OpenRouter architecture data. - // FunctionCall is a safe baseline for chat models. let mut caps = vec![(CapabilityType::FunctionCall, true)]; - - // Vision capability from modality. if let Some(m) = &arch.modality { let m = m.to_lowercase(); if m.contains("image") || m.contains("vision") { @@ -186,12 +198,10 @@ fn infer_capability_list(arch: &OpenRouterArchitecture) -> Vec<(CapabilityType, // Provider helpers ----------------------------------------------------------- -/// Extract provider slug from OpenRouter model ID (e.g. "anthropic/claude-3.5-sonnet" → "anthropic"). fn extract_provider(model_id: &str) -> &str { model_id.split('/').next().unwrap_or("unknown") } -/// Normalize a provider slug to a short canonical name. fn normalize_provider_name(slug: &str) -> &'static str { match slug { "openai" => "openai", @@ -222,10 +232,7 @@ fn provider_display_name(name: &str) -> String { // Upsert helpers ------------------------------------------------------------- -async fn upsert_provider( - db: &AppDatabase, - slug: &str, -) -> Result { +async fn upsert_provider(db: &AppDatabase, slug: &str) -> Result { let name = normalize_provider_name(slug); let display = provider_display_name(name); let now = Utc::now(); @@ -258,7 +265,6 @@ async fn upsert_provider( } } -/// Upsert a model record and return (model, is_new). async fn upsert_model( db: &AppDatabase, provider_id: Uuid, @@ -273,11 +279,13 @@ async fn upsert_model( let modality = infer_modality(model_id_str, modality_str); let capability = infer_capability(model_id_str); - // OpenRouter context_length takes priority; fall back to inference let context_length = infer_context_length(or_model.context_length); - - let max_output = - infer_max_output(or_model.top_provider.as_ref().and_then(|p| p.max_completion_tokens)); + let max_output = infer_max_output( + or_model + .top_provider + .as_ref() + .and_then(|p| p.max_completion_tokens), + ); use models::agents::model::Column as MCol; if let Some(existing) = ModelEntity::find() @@ -292,7 +300,10 @@ async fn upsert_model( active.status = Set(ModelStatus::Active.to_string()); active.updated_at = Set(now); active.update(db).await?; - Ok((ModelEntity::find_by_id(existing.id).one(db).await?.unwrap(), false)) + Ok(( + ModelEntity::find_by_id(existing.id).one(db).await?.unwrap(), + false, + )) } else { let active = models::agents::model::ActiveModel { id: Set(Uuid::now_v7()), @@ -314,7 +325,57 @@ async fn upsert_model( } } -/// Upsert default version for a model. +/// Upsert a model directly from the user's AI endpoint response (no OpenRouter metadata). +/// Used as fallback when the endpoint is not OpenRouter-compatible. +async fn upsert_model_direct( + db: &AppDatabase, + provider_id: Uuid, + model_id_str: &str, + _name: Option<&str>, + context_length: Option, +) -> Result<(models::agents::model::Model, bool), AppError> { + let now = Utc::now(); + let modality = infer_modality(model_id_str, None); + let capability = infer_capability(model_id_str); + let ctx = infer_context_length(context_length); + + use models::agents::model::Column as MCol; + if let Some(existing) = ModelEntity::find() + .filter(MCol::ProviderId.eq(provider_id)) + .filter(MCol::Name.eq(model_id_str)) + .one(db) + .await? + { + let mut active: models::agents::model::ActiveModel = existing.clone().into(); + active.context_length = Set(ctx); + active.status = Set(ModelStatus::Active.to_string()); + active.updated_at = Set(now); + active.update(db).await?; + Ok(( + ModelEntity::find_by_id(existing.id).one(db).await?.unwrap(), + false, + )) + } else { + let active = models::agents::model::ActiveModel { + id: Set(Uuid::now_v7()), + provider_id: Set(provider_id), + name: Set(model_id_str.to_string()), + modality: Set(modality.to_string()), + capability: Set(capability.to_string()), + context_length: Set(ctx), + max_output_tokens: Set(None), + training_cutoff: Set(None), + is_open_source: Set(false), + status: Set(ModelStatus::Active.to_string()), + created_at: Set(now), + updated_at: Set(now), + ..Default::default() + }; + let inserted = active.insert(db).await.map_err(AppError::from)?; + Ok((inserted, true)) + } +} + async fn upsert_version( db: &AppDatabase, model_uuid: Uuid, @@ -344,7 +405,14 @@ async fn upsert_version( } } -/// Upsert pricing for a model version. Returns true if created. +/// OpenRouter prices are per-million-tokens strings; convert to per-1k-tokens. +fn parse_price(s: &str) -> String { + match s.parse::() { + Ok(v) => format!("{:.6}", v / 1_000.0), + Err(_) => "0.00".to_string(), + } +} + async fn upsert_pricing( db: &AppDatabase, version_uuid: Uuid, @@ -359,9 +427,8 @@ async fn upsert_pricing( return Ok(false); } - // OpenRouter prices are per-million-tokens strings; if missing, insert zero prices. let (input_str, output_str) = if let Some(p) = pricing { - (p.prompt.clone(), p.completion.clone()) + (parse_price(&p.prompt), parse_price(&p.completion)) } else { ("0.00".to_string(), "0.00".to_string()) }; @@ -378,20 +445,21 @@ async fn upsert_pricing( Ok(true) } -/// Upsert capability records for a model version. Returns count of new records. async fn upsert_capabilities( db: &AppDatabase, version_uuid: Uuid, arch: Option<&OpenRouterArchitecture>, ) -> Result { use models::agents::model_capability::Column as CCol; - let caps = infer_capability_list(arch.unwrap_or(&OpenRouterArchitecture { - modality: None, - input_modalities: None, - output_modalities: None, - tokenizer: None, - instruct_type: None, - })); + let caps = infer_capability_list( + arch.unwrap_or(&OpenRouterArchitecture { + modality: None, + input_modalities: None, + output_modalities: None, + tokenizer: None, + instruct_type: None, + }), + ); let now = Utc::now(); let mut created = 0i64; @@ -417,7 +485,6 @@ async fn upsert_capabilities( Ok(created) } -/// Upsert default parameter profile for a model version. Returns true if created. async fn upsert_parameter_profile( db: &AppDatabase, version_uuid: Uuid, @@ -453,30 +520,278 @@ async fn upsert_parameter_profile( Ok(true) } +/// Sync models directly from the user's AI endpoint when OpenRouter has no matching models. +/// This handles non-OpenRouter endpoints (e.g. Bailian, MiniMax) gracefully. +async fn sync_models_direct( + db: &AppDatabase, + log: &Logger, + available_ids: &std::collections::HashSet, +) -> SyncModelsResponse { + slog::info!( + log, + "{}", + format!( + "sync_models_direct: {} models from endpoint (no OpenRouter metadata)", + available_ids.len() + ) + ); + + let mut models_created = 0i64; + let mut models_updated = 0i64; + let mut versions_created = 0i64; + let mut pricing_created = 0i64; + let mut capabilities_created = 0i64; + let mut profiles_created = 0i64; + + for model_id in available_ids { + let provider_slug = extract_provider(model_id); + let provider = match upsert_provider(db, provider_slug).await { + Ok(p) => p, + Err(e) => { + slog::warn!( + log, + "{}", + format!( + "sync_models_direct: upsert_provider error provider={} {:?}", + provider_slug, e + ) + ); + continue; + } + }; + + let (model_record, _is_new) = + match upsert_model_direct(db, provider.id, model_id, None, None).await { + Ok((m, n)) => { + if n { + models_created += 1; + } else { + models_updated += 1; + } + (m, n) + } + Err(e) => { + slog::warn!( + log, + "{}", + format!( + "sync_models_direct: upsert_model_direct error model={} {:?}", + model_id, e + ) + ); + continue; + } + }; + + let (version_record, version_is_new) = + match upsert_version(db, model_record.id).await { + Ok(v) => v, + Err(e) => { + slog::warn!( + log, + "{}", + format!( + "sync_models_direct: upsert_version error model={} {:?}", + model_id, e + ) + ); + continue; + } + }; + if version_is_new { + versions_created += 1; + } + + if upsert_pricing(db, version_record.id, None).await.unwrap_or(false) { + pricing_created += 1; + } + + capabilities_created += + upsert_capabilities(db, version_record.id, None).await.unwrap_or(0); + + if upsert_parameter_profile(db, version_record.id, model_id) + .await + .unwrap_or(false) + { + profiles_created += 1; + } + } + + slog::info!( + log, + "{}", + format!( + "sync_models_direct complete: matched={} created={} updated={} \ + versions={} pricing={} capabilities={} profiles={}", + available_ids.len(), + models_created, + models_updated, + versions_created, + pricing_created, + capabilities_created, + profiles_created + ) + ); + + SyncModelsResponse { + models_created, + models_updated, + versions_created, + pricing_created, + capabilities_created, + profiles_created, + } +} + +// HTTP helpers --------------------------------------------------------------- + +async fn fetch_openrouter_models( + client: &reqwest::Client, + log: &Logger, +) -> Result { + const MAX_RETRIES: u32 = 3; + const BASE_DELAY_MS: u64 = 1_000; + + let mut attempt = 0; + loop { + attempt += 1; + match client.get(OPENROUTER_URL).send().await { + Ok(r) => { + return match r.error_for_status() { + Ok(resp) => match resp.json::().await { + Ok(root) => Ok(root), + Err(e) => Err(format!( + "failed to parse response after {} attempt(s): {}", + attempt, e + )), + }, + Err(e) => Err(format!( + "HTTP status error after {} attempt(s): url={} status={}", + attempt, + e.url() + .map(|u| u.to_string()) + .unwrap_or_else(|| OPENROUTER_URL.to_string()), + e + )), + }; + } + Err(e) => { + let kind = if e.is_timeout() { + "timeout" + } else if e.is_connect() { + "connect" + } else { + "request" + }; + let url = e + .url() + .map(|u| u.to_string()) + .unwrap_or_else(|| OPENROUTER_URL.to_string()); + + if attempt >= MAX_RETRIES { + return Err(format!( + "OpenRouter connection failed after {} attempt(s): [{}] url={} error={:?}", + attempt, kind, url, e + )); + } + let delay_ms = BASE_DELAY_MS * (1 << (attempt - 1)); + slog::warn!( + log, + "{}", + format!( + "OpenRouter connection attempt {}/{} failed: [{}] url={} error={:?}. retrying in {}ms", + attempt, MAX_RETRIES, kind, url, e, delay_ms + ) + ); + sleep(Duration::from_millis(delay_ms)).await; + } + } + } +} + +/// Build an async_openai Client from the AI config. +fn build_ai_client(config: &config::AppConfig) -> Result, AppError> { + let api_key = config + .ai_api_key() + .map_err(|e| AppError::InternalServerError(format!("AI API key not configured: {}", e)))?; + + let base_url = config + .ai_basic_url() + .unwrap_or_else(|_| "https://api.openai.com".into()); + + let cfg = OpenAIConfig::new() + .with_api_key(&api_key) + .with_api_base(&base_url); + + Ok(Client::with_config(cfg)) +} + impl AppService { - /// Sync models from OpenRouter into the local database. + /// Sync metadata for models that are accessible by the configured AI client. /// - /// Calls OpenRouter's public `GET /api/v1/models` endpoint (no auth required), - /// then upserts provider / model / version / pricing / capability / - /// parameter-profile records. - /// - /// OpenRouter returns `context_length`, `pricing`, and `architecture.modality` - /// per model — these drive all field population. No model names are hardcoded. + /// Steps: + /// 1. Call `client.models().list()` to get the set of accessible model IDs. + /// 2. Fetch full model list from OpenRouter's public `/api/v1/models` endpoint. + /// 3. Keep only models whose ID appears in the accessible set, then upsert. pub async fn sync_upstream_models( &self, _ctx: &Session, ) -> Result { - let client = reqwest::Client::new(); - let resp: OpenRouterResponse = client - .get("https://openrouter.ai/api/v1/models") - .send() + // Step 1: list models the AI client can access. + let ai_client = build_ai_client(&self.config)?; + let available_ids: std::collections::HashSet = ai_client + .models() + .list() .await - .map_err(|e| AppError::InternalServerError(format!("OpenRouter API request failed: {}", e)))? - .error_for_status() - .map_err(|e| AppError::InternalServerError(format!("OpenRouter API error: {}", e)))? - .json() + .map_err(|e| { + AppError::InternalServerError(format!( + "failed to list available models from AI endpoint: {}", + e + )) + })? + .data + .into_iter() + .map(|m: OpenAiModel| m.id) + .collect(); + + slog::info!( + self.logs, + "{}", + format!( + "sync_upstream_models: {} accessible models found", + available_ids.len() + ) + ); + + // Step 2: fetch OpenRouter metadata. + let http_client = reqwest::Client::new(); + let or_resp: OpenRouterResponse = fetch_openrouter_models(&http_client, &self.logs) .await - .map_err(|e| AppError::InternalServerError(format!("Failed to parse OpenRouter response: {}", e)))?; + .map_err(AppError::InternalServerError)?; + + // Step 3: filter to only accessible models. + let filtered: Vec<&OpenRouterModel> = or_resp + .data + .iter() + .filter(|m| available_ids.contains(&m.id)) + .filter(|m| m.id != "openrouter/auto") + .collect(); + + let filtered_count = filtered.len(); + + // Fallback: if no OpenRouter metadata matches, sync models directly from + // the user's endpoint (handles Bailian/MiniMax and other non-OpenRouter providers). + if filtered_count == 0 && !available_ids.is_empty() { + slog::info!( + self.logs, + "{}", + format!( + "sync_upstream_models: no OpenRouter matches, falling back to direct sync for {} models", + available_ids.len() + ) + ); + return Ok(sync_models_direct(&self.db, &self.logs, &available_ids).await); + } let mut models_created = 0i64; let mut models_updated = 0i64; @@ -485,29 +800,58 @@ impl AppService { let mut capabilities_created = 0i64; let mut profiles_created = 0i64; - for or_model in resp.data { - // Filter out openrouter/auto which has negative pricing - if or_model.id == "openrouter/auto" { - continue; - } - + for or_model in filtered { let provider_slug = extract_provider(&or_model.id); - let provider = upsert_provider(&self.db, provider_slug).await?; + let provider = match upsert_provider(&self.db, provider_slug).await { + Ok(p) => p, + Err(e) => { + slog::warn!( + self.logs, + "{}", + format!( + "sync_upstream_models: upsert_provider error provider={} {:?}", + provider_slug, e + ) + ); + continue; + } + }; - let (model_record, is_new) = - upsert_model(&self.db, provider.id, &or_model.id, &or_model).await?; - - if is_new { - models_created += 1; - } else { - models_updated += 1; - } + let (model_record, _is_new) = + match upsert_model(&self.db, provider.id, &or_model.id, or_model).await { + Ok((m, n)) => { + if n { + models_created += 1; + } else { + models_updated += 1; + } + (m, n) + } + Err(e) => { + slog::warn!( + self.logs, + "{}", + format!( + "sync_upstream_models: upsert_model error model={} {:?}", + or_model.id, e + ) + ); + continue; + } + }; let (version_record, version_is_new) = match upsert_version(&self.db, model_record.id).await { Ok(v) => v, Err(e) => { - slog::warn!(self.logs, "{}", format!("sync_upstream_models: upsert_version error: {:?}", e)); + slog::warn!( + self.logs, + "{}", + format!( + "sync_upstream_models: upsert_version error model={} {:?}", + or_model.id, e + ) + ); continue; } }; @@ -515,8 +859,17 @@ impl AppService { versions_created += 1; } - if let Err(e) = upsert_pricing(&self.db, version_record.id, or_model.pricing.as_ref()).await { - slog::warn!(self.logs, "{}", format!("sync_upstream_models: upsert_pricing error: {:?}", e)); + if let Err(e) = + upsert_pricing(&self.db, version_record.id, or_model.pricing.as_ref()).await + { + slog::warn!( + self.logs, + "{}", + format!( + "sync_upstream_models: upsert_pricing error model={} {:?}", + or_model.id, e + ) + ); } else { pricing_created += 1; } @@ -526,11 +879,25 @@ impl AppService { .await .unwrap_or(0); - if upsert_parameter_profile(&self.db, version_record.id, &or_model.id).await.unwrap_or(false) { + if upsert_parameter_profile(&self.db, version_record.id, &or_model.id) + .await + .unwrap_or(false) + { profiles_created += 1; } } + slog::info!( + self.logs, + "{}", + format!( + "sync_upstream_models: synced {} accessible models ({}/{} new/updated)", + filtered_count, + models_created, + models_updated + ) + ); + Ok(SyncModelsResponse { models_created, models_updated, @@ -541,54 +908,90 @@ impl AppService { }) } - /// Spawn a background task that syncs OpenRouter models immediately + /// Spawn a background task that syncs model metadata immediately /// and then every 10 minutes. Returns the `JoinHandle`. /// /// Failures are logged but do not stop the task — it keeps retrying. pub fn start_sync_task(self) -> JoinHandle<()> { let db = self.db.clone(); let log = self.logs.clone(); + let ai_api_key = self.config.ai_api_key().ok(); + let ai_base_url = self.config.ai_basic_url().ok(); tokio::spawn(async move { // Run once immediately on startup before taking traffic. - Self::sync_once(&db, &log).await; + Self::sync_once(&db, &log, ai_api_key.clone(), ai_base_url.clone()).await; let mut tick = interval(Duration::from_secs(60 * 10)); loop { tick.tick().await; - Self::sync_once(&db, &log).await; + Self::sync_once(&db, &log, ai_api_key.clone(), ai_base_url.clone()).await; } }) } /// Perform a single sync pass. Errors are logged and silently swallowed /// so the periodic task never stops. - async fn sync_once(db: &AppDatabase, log: &Logger) { - let client = reqwest::Client::new(); - let resp = match client - .get("https://openrouter.ai/api/v1/models") - .send() - .await - { - Ok(r) => match r.error_for_status() { - Ok(resp) => match resp.json::().await { - Ok(resp) => resp, - Err(e) => { - slog::error!(log, "{}", format!("OpenRouter model sync: failed to parse response: {}", e)); - return; - } - }, - Err(e) => { - slog::error!(log, "{}", format!("OpenRouter model sync: API error: {}", e)); - return; - } - }, - Err(e) => { - slog::error!(log, "{}", format!("OpenRouter model sync: request failed: {}", e)); + async fn sync_once( + db: &AppDatabase, + log: &Logger, + ai_api_key: Option, + ai_base_url: Option, + ) { + // Build AI client to list accessible models. + let ai_client = match build_ai_client_from_parts(ai_api_key, ai_base_url) { + Ok(c) => c, + Err(msg) => { + slog::warn!(log, "{}", format!("OpenRouter model sync: {}", msg)); return; } }; + let available_ids: std::collections::HashSet = match ai_client.models().list().await { + Ok(resp) => resp.data.into_iter().map(|m: OpenAiModel| m.id).collect(), + Err(e) => { + slog::warn!( + log, + "{}", + format!("OpenRouter model sync: failed to list available models: {}", e) + ); + return; + } + }; + + let http_client = reqwest::Client::new(); + let or_resp = match fetch_openrouter_models(&http_client, log).await { + Ok(r) => r, + Err(msg) => { + slog::warn!(log, "{}", format!("OpenRouter model sync: {}", msg)); + return; + } + }; + + let filtered: Vec<&OpenRouterModel> = or_resp + .data + .iter() + .filter(|m| available_ids.contains(&m.id)) + .filter(|m| m.id != "openrouter/auto") + .collect(); + + let filtered_count = filtered.len(); + + // Fallback: if no OpenRouter metadata matches, sync models directly from + // the user's endpoint (handles Bailian/MiniMax and other non-OpenRouter providers). + if filtered_count == 0 && !available_ids.is_empty() { + slog::info!( + log, + "{}", + format!( + "OpenRouter model sync: no matches, falling back to direct sync for {} models", + available_ids.len() + ) + ); + sync_models_direct(db, log, &available_ids).await; + return; + } + let mut models_created = 0i64; let mut models_updated = 0i64; let mut versions_created = 0i64; @@ -596,42 +999,61 @@ impl AppService { let mut capabilities_created = 0i64; let mut profiles_created = 0i64; - for or_model in resp.data { - if or_model.id == "openrouter/auto" { - continue; - } - + for or_model in filtered { let provider_slug = extract_provider(&or_model.id); let provider = match upsert_provider(db, provider_slug).await { Ok(p) => p, Err(e) => { - slog::warn!(log, "{}", format!("OpenRouter model sync: upsert_provider error: {:?}", e)); + slog::warn!( + log, + "{}", + format!( + "OpenRouter model sync: upsert_provider error provider={} {:?}", + provider_slug, e + ) + ); continue; } }; - let model_record = match upsert_model(db, provider.id, &or_model.id, &or_model).await { - Ok((m, true)) => { - models_created += 1; - m - } - Ok((m, false)) => { - models_updated += 1; - m - } - Err(e) => { - slog::warn!(log, "{}", format!("OpenRouter model sync: upsert_model error: {:?}", e)); - continue; - } - }; + let (model_record, _is_new) = + match upsert_model(db, provider.id, &or_model.id, or_model).await { + Ok((m, true)) => { + models_created += 1; + (m, true) + } + Ok((m, false)) => { + models_updated += 1; + (m, false) + } + Err(e) => { + slog::warn!( + log, + "{}", + format!( + "OpenRouter model sync: upsert_model error model={} {:?}", + or_model.id, e + ) + ); + continue; + } + }; - let (version_record, version_is_new) = match upsert_version(db, model_record.id).await { - Ok(v) => v, - Err(e) => { - slog::warn!(log, "{}", format!("OpenRouter model sync: upsert_version error: {:?}", e)); - continue; - } - }; + let (version_record, version_is_new) = + match upsert_version(db, model_record.id).await { + Ok(v) => v, + Err(e) => { + slog::warn!( + log, + "{}", + format!( + "OpenRouter model sync: upsert_version error model={} {:?}", + or_model.id, e + ) + ); + continue; + } + }; if version_is_new { versions_created += 1; } @@ -656,10 +1078,13 @@ impl AppService { } } - slog::info!(log, "{}", + slog::info!( + log, + "{}", format!( - "OpenRouter model sync complete: created={} updated={} \ + "OpenRouter model sync complete: matched={} created={} updated={} \ versions={} pricing={} capabilities={} profiles={}", + filtered_count, models_created, models_updated, versions_created, @@ -670,3 +1095,19 @@ impl AppService { ); } } + +/// Build an async_openai Client from raw API key and base URL (for background task). +fn build_ai_client_from_parts( + api_key: Option, + base_url: Option, +) -> Result, String> { + let api_key = api_key.ok_or_else(|| "AI API key not configured".to_string())?; + + let base_url = base_url.unwrap_or_else(|| "https://api.openai.com".into()); + + let cfg = OpenAIConfig::new() + .with_api_key(&api_key) + .with_api_base(&base_url); + + Ok(Client::with_config(cfg)) +}