diff --git a/libs/service/agent/billing.rs b/libs/service/agent/billing.rs index 116161a..02a1cd3 100644 --- a/libs/service/agent/billing.rs +++ b/libs/service/agent/billing.rs @@ -41,9 +41,7 @@ impl AppService { .await? { BillingResult::Success(record) => Ok(record), - BillingResult::InsufficientBalance { message } => { - Err(AppError::BadRequest(message)) - } + BillingResult::InsufficientBalance { message } => Err(AppError::BadRequest(message)), } } } diff --git a/libs/service/agent/code_review.rs b/libs/service/agent/code_review.rs index aef194b..84f9fb3 100644 --- a/libs/service/agent/code_review.rs +++ b/libs/service/agent/code_review.rs @@ -403,9 +403,18 @@ async fn call_ai_model( let messages = vec![agent::ChatRequestMessage::user(prompt.to_string())]; - agent::call_with_params(&messages, model_name, &client_config, 0.2, 8192, None, None, None) - .await - .map_err(|e| AppError::InternalServerError(format!("AI call failed: {}", e))) + agent::call_with_params( + &messages, + model_name, + &client_config, + 0.2, + 8192, + None, + None, + None, + ) + .await + .map_err(|e| AppError::InternalServerError(format!("AI call failed: {}", e))) } fn parse_ai_response(response: &str) -> Vec { diff --git a/libs/service/agent/issue_triage.rs b/libs/service/agent/issue_triage.rs index cacc250..c56be62 100644 --- a/libs/service/agent/issue_triage.rs +++ b/libs/service/agent/issue_triage.rs @@ -106,8 +106,7 @@ async fn call_ai_for_triage( .ai_basic_url() .unwrap_or_else(|_| "https://api.openai.com".into()); - let client_config = - ::agent::AiClientConfig::new(api_key).with_base_url(base_url); + let client_config = ::agent::AiClientConfig::new(api_key).with_base_url(base_url); let messages = vec![agent::ChatRequestMessage::user(prompt.to_string())]; @@ -122,9 +121,7 @@ async fn call_ai_for_triage( None, ) .await - .map_err(|e| { - AppError::InternalServerError(format!("AI triage call failed: {}", e)) - })?; + .map_err(|e| AppError::InternalServerError(format!("AI triage call failed: {}", e)))?; Ok(response) } @@ -137,7 +134,9 @@ impl AppService { project_name: String, issue_number: i64, ) -> Result { - let project = self.utils_find_project_by_name(project_name.clone()).await?; + let project = self + .utils_find_project_by_name(project_name.clone()) + .await?; let issue_model = issue::Entity::find() .filter(issue::Column::Project.eq(project.id)) @@ -168,8 +167,11 @@ impl AppService { } }; - let prompt = - build_triage_prompt(&issue_model.title, issue_model.body.as_deref(), &existing_labels); + let prompt = build_triage_prompt( + &issue_model.title, + issue_model.body.as_deref(), + &existing_labels, + ); let ai_response = match call_ai_for_triage(&model.name, &prompt, &self.config).await { Ok(r) => r, Err(e) => { diff --git a/libs/service/agent/model.rs b/libs/service/agent/model.rs index 8fdde71..09071a5 100644 --- a/libs/service/agent/model.rs +++ b/libs/service/agent/model.rs @@ -5,7 +5,10 @@ use crate::error::AppError; use session::Session; use uuid::Uuid; -pub use agent::model::model_entry::{CreateModelRequest, ModelListResponse, ModelResponse, UpdateModelRequest, ModelWithPricingResponse}; +pub use agent::model::model_entry::{ + CreateModelRequest, ModelListResponse, ModelResponse, ModelWithPricingResponse, + UpdateModelRequest, +}; impl AppService { pub async fn agent_model_list( @@ -30,7 +33,8 @@ impl AppService { search.as_deref(), page, per_page, - ).await?) + ) + .await?) } pub async fn agent_model_get( @@ -60,11 +64,7 @@ impl AppService { Ok(agent::model::model_entry::update_model(&self.db, id, request).await?) } - pub async fn agent_model_delete( - &self, - id: Uuid, - ctx: &Session, - ) -> Result<(), AppError> { + pub async fn agent_model_delete(&self, id: Uuid, ctx: &Session) -> Result<(), AppError> { super::provider::require_system_caller(ctx)?; Ok(agent::model::model_entry::delete_model(&self.db, id).await?) } diff --git a/libs/service/agent/model_capability.rs b/libs/service/agent/model_capability.rs index b965877..199111d 100644 --- a/libs/service/agent/model_capability.rs +++ b/libs/service/agent/model_capability.rs @@ -4,7 +4,9 @@ use crate::AppService; use crate::error::AppError; use session::Session; -pub use agent::model::capability::{CreateModelCapabilityRequest, ModelCapabilityResponse, UpdateModelCapabilityRequest}; +pub use agent::model::capability::{ + CreateModelCapabilityRequest, ModelCapabilityResponse, UpdateModelCapabilityRequest, +}; impl AppService { pub async fn agent_model_capability_list( diff --git a/libs/service/agent/model_parameter_profile.rs b/libs/service/agent/model_parameter_profile.rs index 2f62d32..9532335 100644 --- a/libs/service/agent/model_parameter_profile.rs +++ b/libs/service/agent/model_parameter_profile.rs @@ -5,7 +5,10 @@ use crate::error::AppError; use session::Session; use uuid::Uuid; -pub use agent::model::parameter_profile::{CreateModelParameterProfileRequest, ModelParameterProfileResponse, UpdateModelParameterProfileRequest}; +pub use agent::model::parameter_profile::{ + CreateModelParameterProfileRequest, ModelParameterProfileResponse, + UpdateModelParameterProfileRequest, +}; impl AppService { pub async fn agent_model_parameter_profile_list( @@ -13,7 +16,10 @@ impl AppService { model_version_id: Uuid, _ctx: &Session, ) -> Result, AppError> { - Ok(agent::model::parameter_profile::list_parameter_profiles(&self.db, model_version_id).await?) + Ok( + agent::model::parameter_profile::list_parameter_profiles(&self.db, model_version_id) + .await?, + ) } pub async fn agent_model_parameter_profile_get( @@ -40,7 +46,10 @@ impl AppService { ctx: &Session, ) -> Result { super::provider::require_system_caller(ctx)?; - Ok(agent::model::parameter_profile::update_parameter_profile(&self.db, id, request).await?) + Ok( + agent::model::parameter_profile::update_parameter_profile(&self.db, id, request) + .await?, + ) } pub async fn agent_model_parameter_profile_delete( diff --git a/libs/service/agent/model_pricing.rs b/libs/service/agent/model_pricing.rs index 7f4e4ad..97d1d47 100644 --- a/libs/service/agent/model_pricing.rs +++ b/libs/service/agent/model_pricing.rs @@ -5,7 +5,9 @@ use crate::error::AppError; use session::Session; use uuid::Uuid; -pub use agent::model::pricing::{CreateModelPricingRequest, ModelPricingResponse, UpdateModelPricingRequest}; +pub use agent::model::pricing::{ + CreateModelPricingRequest, ModelPricingResponse, UpdateModelPricingRequest, +}; impl AppService { pub async fn agent_model_pricing_list( @@ -43,11 +45,7 @@ impl AppService { Ok(agent::model::pricing::update_pricing(&self.db, id, request).await?) } - pub async fn agent_model_pricing_delete( - &self, - id: i64, - ctx: &Session, - ) -> Result<(), AppError> { + pub async fn agent_model_pricing_delete(&self, id: i64, ctx: &Session) -> Result<(), AppError> { super::provider::require_system_caller(ctx)?; Ok(agent::model::pricing::delete_pricing(&self.db, id).await?) } diff --git a/libs/service/agent/model_version.rs b/libs/service/agent/model_version.rs index 5d07a31..54e505d 100644 --- a/libs/service/agent/model_version.rs +++ b/libs/service/agent/model_version.rs @@ -5,7 +5,9 @@ use crate::error::AppError; use session::Session; use uuid::Uuid; -pub use agent::model::version::{CreateModelVersionRequest, ModelVersionResponse, UpdateModelVersionRequest}; +pub use agent::model::version::{ + CreateModelVersionRequest, ModelVersionResponse, UpdateModelVersionRequest, +}; impl AppService { pub async fn agent_model_version_list( diff --git a/libs/service/agent/pr_summary.rs b/libs/service/agent/pr_summary.rs index 45c4aac..dda0ba8 100644 --- a/libs/service/agent/pr_summary.rs +++ b/libs/service/agent/pr_summary.rs @@ -138,9 +138,18 @@ async fn call_ai_model_for_description( let messages = vec![agent::ChatRequestMessage::user(prompt.to_string())]; - agent::call_with_params(&messages, model_name, &client_config, 0.3, 4096, None, None, None) - .await - .map_err(|e| AppError::InternalServerError(format!("AI call failed: {}", e))) + agent::call_with_params( + &messages, + model_name, + &client_config, + 0.3, + 4096, + None, + None, + None, + ) + .await + .map_err(|e| AppError::InternalServerError(format!("AI call failed: {}", e))) } /// Extract JSON from a response that may contain markdown code fences. diff --git a/libs/service/agent/provider.rs b/libs/service/agent/provider.rs index 5160908..445fbb6 100644 --- a/libs/service/agent/provider.rs +++ b/libs/service/agent/provider.rs @@ -49,11 +49,7 @@ impl AppService { Ok(agent::model::provider::update_provider(&self.db, id, request).await?) } - pub async fn agent_provider_delete( - &self, - id: Uuid, - ctx: &Session, - ) -> Result<(), AppError> { + pub async fn agent_provider_delete(&self, id: Uuid, ctx: &Session) -> Result<(), AppError> { require_system_caller(ctx)?; Ok(agent::model::provider::delete_provider(&self.db, id).await?) } diff --git a/libs/service/agent/sync.rs b/libs/service/agent/sync.rs index 4a4970c..164937d 100644 --- a/libs/service/agent/sync.rs +++ b/libs/service/agent/sync.rs @@ -17,8 +17,8 @@ use std::time::Duration; use tokio::task::JoinHandle; use tokio::time::interval; -use crate::error::AppError; use crate::AppService; +use crate::error::AppError; use chrono::Utc; use db::database::AppDatabase; use models::agents::model::Entity as ModelEntity; @@ -28,8 +28,8 @@ use models::agents::model_provider::Entity as ProviderEntity; use models::agents::model_provider::Model as ProviderModel; use models::agents::model_version::Entity as VersionEntity; use models::agents::{CapabilityType, ModelCapability, ModelModality, ModelStatus}; -use sea_orm::prelude::*; use sea_orm::Set; +use sea_orm::prelude::*; use serde::Deserialize; use serde::Serialize; use session::Session; @@ -444,8 +444,8 @@ fn extract_model_name(model: &UpstreamModel) -> String { /// For models with the same name from different providers, keeps the newest one /// and deletes the older duplicates. async fn mark_all_models_offline(db: &AppDatabase) -> Result { - use models::agents::model::Entity as MEntity; use models::agents::model::Column as MCol; + use models::agents::model::Entity as MEntity; let now = Utc::now(); let updated = MEntity::update_many() @@ -569,8 +569,8 @@ async fn sync_models_from_upstream( /// Deactivates models that were previously marked offline and are not in any active sync. /// These are manually added models that are no longer needed. async fn deactivate_orphaned_models(db: &AppDatabase) -> Result { - use models::agents::model::Entity as MEntity; use models::agents::model::Column as MCol; + use models::agents::model::Entity as MEntity; let now = Utc::now(); let updated = MEntity::update_many() diff --git a/libs/service/auth/login.rs b/libs/service/auth/login.rs index 62992ea..7b201c4 100644 --- a/libs/service/auth/login.rs +++ b/libs/service/auth/login.rs @@ -1,5 +1,5 @@ -use crate::error::AppError; use crate::AppService; +use crate::error::AppError; use argon2::password_hash::{PasswordHasher, SaltString}; use argon2::{Argon2, PasswordHash, PasswordVerifier}; use models::users::{user_activity_log, user_password}; diff --git a/libs/service/auth/password.rs b/libs/service/auth/password.rs index bc2d205..9c58269 100644 --- a/libs/service/auth/password.rs +++ b/libs/service/auth/password.rs @@ -209,7 +209,10 @@ impl AppService { let salt = SaltString::generate(&mut rsa::rand_core::OsRng::default()); let new_password_hash = Argon2::default() - .hash_password(params.new_password.as_bytes(), Salt::from_b64(&*salt.to_string())?) + .hash_password( + params.new_password.as_bytes(), + Salt::from_b64(&*salt.to_string())?, + ) .map_err(|_| AppError::PasswordHashError("hash failed".to_string()))? .to_string(); @@ -218,11 +221,17 @@ impl AppService { let mut active_password: user_password::ActiveModel = user_password.into(); active_password.password_hash = Set(new_password_hash); active_password.password_salt = Set(Some(salt.to_string())); - active_password.update(&txn).await.map_err(|_| AppError::TxnError)?; + active_password + .update(&txn) + .await + .map_err(|_| AppError::TxnError)?; let mut used_token: user_password_reset::ActiveModel = reset_token.clone().into(); used_token.used = Set(true); - used_token.update(&txn).await.map_err(|_| AppError::TxnError)?; + used_token + .update(&txn) + .await + .map_err(|_| AppError::TxnError)?; let _ = user_activity_log::ActiveModel { user_uid: Set(Some(reset_token.user_uid)), @@ -251,7 +260,10 @@ impl AppService { .await .map_err(|_| AppError::UserNotFound)?; - tracing::info!(count = result.rows_affected, "Expired password reset tokens cleaned up"); + tracing::info!( + count = result.rows_affected, + "Expired password reset tokens cleaned up" + ); Ok(result.rows_affected) } } diff --git a/libs/service/auth/rsa.rs b/libs/service/auth/rsa.rs index b75550c..3cece85 100644 --- a/libs/service/auth/rsa.rs +++ b/libs/service/auth/rsa.rs @@ -6,8 +6,8 @@ use hkdf::Hkdf; use rsa::pkcs1::{DecodeRsaPrivateKey, EncodeRsaPrivateKey, EncodeRsaPublicKey}; use rsa::{Pkcs1v15Encrypt, RsaPrivateKey, RsaPublicKey}; use serde::{Deserialize, Serialize}; -use sha2::Sha256; use session::Session; +use sha2::Sha256; #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct RsaResponse { @@ -21,12 +21,16 @@ impl AppService { /// Derive a ChaCha20-Poly1305 encryption key from APP_SESSION_SECRET via HKDF-SHA256. fn derive_rsa_encryption_key(&self) -> [u8; 32] { - let secret = self.config.env.get("APP_SESSION_SECRET") + let secret = self + .config + .env + .get("APP_SESSION_SECRET") .map(|s| s.as_str()) .expect("APP_SESSION_SECRET must be set in production. Do not use fallback keys."); let hk = Hkdf::::new(Some(b"rsa-session-encryption"), secret.as_bytes()); let mut okm = [0u8; 32]; - hk.expand(b"rsa-private-key-aead", &mut okm).expect("HKDF expand within hash length"); + hk.expand(b"rsa-private-key-aead", &mut okm) + .expect("HKDF expand within hash length"); okm } @@ -37,7 +41,8 @@ impl AppService { .expect("32-byte key is valid for ChaCha20Poly1305"); let nonce_bytes: [u8; 12] = rand::random(); let nonce = chacha20poly1305::aead::generic_array::GenericArray::from_slice(&nonce_bytes); - let ciphertext = cipher.encrypt(nonce, plaintext.as_bytes()) + let ciphertext = cipher + .encrypt(nonce, plaintext.as_bytes()) .map_err(|_| AppError::RsaGenerationError)?; let mut combined = nonce_bytes.to_vec(); combined.extend_from_slice(&ciphertext); @@ -55,22 +60,34 @@ impl AppService { if combined.len() < 12 { return Err(AppError::RsaDecodeError); } - let nonce = chacha20poly1305::aead::generic_array::GenericArray::from_slice(&combined[..12]); - let plaintext = cipher.decrypt(nonce, &combined[12..]) + let nonce = + chacha20poly1305::aead::generic_array::GenericArray::from_slice(&combined[..12]); + let plaintext = cipher + .decrypt(nonce, &combined[12..]) .map_err(|_| AppError::RsaDecodeError)?; Ok(String::from_utf8(plaintext).map_err(|_| AppError::RsaDecodeError)?) } pub async fn auth_rsa(&self, context: &Session) -> Result { - if context.get::(Self::RSA_PRIVATE_KEY).ok().flatten().is_some() - && context.get::(Self::RSA_PUBLIC_KEY).ok().flatten().is_some() + if context + .get::(Self::RSA_PRIVATE_KEY) + .ok() + .flatten() + .is_some() + && context + .get::(Self::RSA_PUBLIC_KEY) + .ok() + .flatten() + .is_some() { let pub_pem = context .get::(Self::RSA_PUBLIC_KEY) .ok() .flatten() .expect("checked above"); - return Ok(RsaResponse { public_key: pub_pem }); + return Ok(RsaResponse { + public_key: pub_pem, + }); } let mut rng = rsa::rand_core::OsRng; @@ -126,4 +143,4 @@ impl AppService { }; Ok(String::from_utf8_lossy(&decrypted).to_string()) } -} \ No newline at end of file +} diff --git a/libs/service/auth/totp.rs b/libs/service/auth/totp.rs index 58209a0..5169370 100644 --- a/libs/service/auth/totp.rs +++ b/libs/service/auth/totp.rs @@ -1,4 +1,4 @@ -use sha2::{Sha256, Digest}; +use sha2::{Digest, Sha256}; use crate::AppService; use crate::error::AppError; @@ -210,7 +210,8 @@ impl AppService { .ok_or(AppError::TwoFactorNotSetup)? .into(); - active_model.backup_codes = Set(serde_json::json!(Self::hash_backup_codes(&backup_codes))); + active_model.backup_codes = + Set(serde_json::json!(Self::hash_backup_codes(&backup_codes))); active_model.updated_at = Set(chrono::Utc::now()); active_model.update(&self.db).await?; @@ -363,7 +364,11 @@ impl AppService { fn hash_backup_code(code: &str) -> String { let mut hasher = Sha256::new(); hasher.update(code.as_bytes()); - hasher.finalize().iter().map(|b| format!("{:02x}", b)).collect::() + hasher + .finalize() + .iter() + .map(|b| format!("{:02x}", b)) + .collect::() } fn hash_backup_codes(codes: &[String]) -> Vec { @@ -387,7 +392,7 @@ impl AppService { } fn generate_totp_code(&self, secret: &str, counter: u64) -> Result { - use hmac::{Hmac, Mac, KeyInit}; + use hmac::{Hmac, KeyInit, Mac}; use sha1::Sha1; let secret_bytes = self.decode_base32(secret)?; diff --git a/libs/service/chat/access.rs b/libs/service/chat/access.rs index 480e9be..9393998 100644 --- a/libs/service/chat/access.rs +++ b/libs/service/chat/access.rs @@ -32,11 +32,7 @@ pub enum AccessLevel { /// with the given `access_visibility` setting. /// /// Hierarchical override: higher roles always see lower roles' chats. -pub fn can_view( - creator_role: MemberRole, - user_role: MemberRole, - access_visibility: &str, -) -> bool { +pub fn can_view(creator_role: MemberRole, user_role: MemberRole, access_visibility: &str) -> bool { // Owner sees everything if user_role == MemberRole::Owner { return true; @@ -59,11 +55,7 @@ pub fn can_view( } /// Check whether `user_role` can ASK (send messages) in a conversation. -pub fn can_ask( - creator_role: MemberRole, - user_role: MemberRole, - can_ask_setting: &str, -) -> bool { +pub fn can_ask(creator_role: MemberRole, user_role: MemberRole, can_ask_setting: &str) -> bool { // Same hierarchy as viewing if user_role == MemberRole::Owner { return true; @@ -148,7 +140,8 @@ pub async fn check_conversation_access( } // Get creator's role - let Some(creator_role) = resolve_project_role(db, project_id, conversation.user_id).await? else { + let Some(creator_role) = resolve_project_role(db, project_id, conversation.user_id).await? + else { return Ok(AccessLevel::Denied); }; diff --git a/libs/service/chat/conversation.rs b/libs/service/chat/conversation.rs index 21aabb7..a2c15dd 100644 --- a/libs/service/chat/conversation.rs +++ b/libs/service/chat/conversation.rs @@ -1,7 +1,10 @@ +use crate::error::AppError; use models::ai::{AiConversation, ai_conversation}; use models::projects::MemberRole; -use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Set}; -use crate::error::AppError; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, + QuerySelect, Set, +}; use uuid::Uuid; use crate::AppService; @@ -26,9 +29,8 @@ impl AppService { if c.user_id != user_id { // For project conversations, check access control if c.project_id.is_some() { - let access = super::access::check_conversation_access( - &self.db, &c, user_id, - ).await?; + let access = + super::access::check_conversation_access(&self.db, &c, user_id).await?; if access != super::AccessLevel::Denied { return Ok(c); } @@ -48,9 +50,7 @@ impl AppService { return Ok(c); } if c.project_id.is_some() { - let access = super::access::check_conversation_access( - &self.db, &c, user_id, - ).await?; + let access = super::access::check_conversation_access(&self.db, &c, user_id).await?; if access != super::AccessLevel::Denied { return Ok(c); } @@ -58,6 +58,37 @@ impl AppService { Err(AppError::PermissionDenied) } + pub async fn find_conversation_full_access( + &self, + conversation_id: Uuid, + user_id: Uuid, + ) -> Result { + let c = self.find_conversation(conversation_id).await?; + if c.user_id == user_id { + return Ok(c); + } + if c.project_id.is_some() { + let access = super::access::check_conversation_access(&self.db, &c, user_id).await?; + if access == super::AccessLevel::Full { + return Ok(c); + } + } + Err(AppError::PermissionDenied) + } + + pub async fn find_conversation_creator( + &self, + conversation_id: Uuid, + user_id: Uuid, + ) -> Result { + let c = self.find_conversation(conversation_id).await?; + if c.user_id == user_id { + Ok(c) + } else { + Err(AppError::PermissionDenied) + } + } + pub async fn create_conversation( &self, user_id: Uuid, @@ -81,30 +112,33 @@ impl AppService { // Auto-increment project_uid let next_uid = self.next_project_chat_uid(pid).await?; let now = chrono::Utc::now(); - let conv = ai_conversation::ActiveModel { - id: Set(Uuid::new_v4()), - user_id: Set(user_id), - project_id: Set(Some(pid)), - scope: Set("project".to_string()), - title: Set(title), - model: Set(model), - model_config: Set(model_config), - status: Set("active".to_string()), - root_message_id: Set(None), - fork_count: Set(0), - is_shared: Set(false), - message_count: Set(0), - token_usage_total: Set(None), - access_visibility: Set(access_visibility.unwrap_or_else(|| "owner".to_string())), - can_ask: Set(can_ask.unwrap_or_else(|| "owner".to_string())), - project_uid: Set(Some(next_uid)), - model_uid: Set(model_uid), - model_name: Set(model_name), - created_at: Set(now), - updated_at: Set(now), - } - .insert(self.db.writer()) - .await?; + let conv = + ai_conversation::ActiveModel { + id: Set(Uuid::new_v4()), + user_id: Set(user_id), + project_id: Set(Some(pid)), + scope: Set("project".to_string()), + title: Set(title), + model: Set(model), + model_config: Set(model_config), + status: Set("active".to_string()), + root_message_id: Set(None), + fork_count: Set(0), + is_shared: Set(false), + message_count: Set(0), + token_usage_total: Set(None), + access_visibility: Set( + access_visibility.unwrap_or_else(|| "owner".to_string()) + ), + can_ask: Set(can_ask.unwrap_or_else(|| "owner".to_string())), + project_uid: Set(Some(next_uid)), + model_uid: Set(model_uid), + model_name: Set(model_name), + created_at: Set(now), + updated_at: Set(now), + } + .insert(self.db.writer()) + .await?; observability::incr!(observability::AI_CHAT_CONVERSATIONS_CREATED); return Ok(conv); } @@ -165,10 +199,9 @@ impl AppService { user_id: Uuid, project_id: Option, page_size: u64, + search_query: Option, ) -> Result, AppError> { - let mut query = - AiConversation::find() - .order_by_desc(ai_conversation::Column::UpdatedAt); + let mut query = AiConversation::find().order_by_desc(ai_conversation::Column::UpdatedAt); if let Some(pid) = project_id { // For project chats, apply visibility rules @@ -176,14 +209,25 @@ impl AppService { match role { Some(r) => { query = query.filter(ai_conversation::Column::ProjectId.eq(pid)); - // Filter visible conversations based on role - // Owner sees all; Admin sees own + member-visible; Member sees only member-visible + own - if !matches!(r, MemberRole::Owner) { - // Not owner, so apply visibility filter: - // - Own conversations - // - OR access_visibility = "member" (for member) or "admin"/"member" (for admin) - // - OR hierarchical: admin sees member creator's chats + let convs = query + .paginate(self.db.reader(), page_size.saturating_mul(4).max(page_size)) + .fetch_page(0) + .await?; + let mut visible = Vec::new(); + for conv in convs { + if conv.user_id == user_id || matches!(r, MemberRole::Owner) { + visible.push(conv); + } else if super::access::check_conversation_access(&self.db, &conv, user_id) + .await? + != super::AccessLevel::Denied + { + visible.push(conv); + } + if visible.len() >= page_size as usize { + break; + } } + return Ok(visible); } None => { // Not a project member — only show own chats @@ -199,7 +243,17 @@ impl AppService { .filter(ai_conversation::Column::ProjectId.is_null()); } - let convs = query.paginate(self.db.reader(), page_size).fetch_page(0).await?; + // Apply search filter if provided + if let Some(ref q) = search_query { + if !q.is_empty() { + query = query.filter(ai_conversation::Column::Title.contains(q)); + } + } + + let convs = query + .paginate(self.db.reader(), page_size) + .fetch_page(0) + .await?; Ok(convs) } @@ -217,7 +271,9 @@ impl AppService { model_uid: Option, model_name: Option, ) -> Result<(), AppError> { - let c = self.find_conversation_owned(conversation_id, user_id).await?; + let c = self + .find_conversation_creator(conversation_id, user_id) + .await?; let mut active: ai_conversation::ActiveModel = c.into(); if let Some(t) = title { @@ -254,7 +310,8 @@ impl AppService { conversation_id: Uuid, user_id: Uuid, ) -> Result<(), AppError> { - self.find_conversation_owned(conversation_id, user_id).await?; + self.find_conversation_creator(conversation_id, user_id) + .await?; AiConversation::delete_by_id(conversation_id) .exec(self.db.writer()) .await?; diff --git a/libs/service/chat/fork.rs b/libs/service/chat/fork.rs index 7ec3344..4ad7fdf 100644 --- a/libs/service/chat/fork.rs +++ b/libs/service/chat/fork.rs @@ -1,51 +1,149 @@ -use models::ai::{AiMessage, ai_conversation, ai_message, ai_message_fork}; -use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; use crate::error::AppError; +use models::ai::{AiMessage, ai_conversation, ai_message, ai_message_fork}; +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, Set}; use uuid::Uuid; use crate::AppService; impl AppService { - pub async fn fork_message( + /// Fork a conversation from a specific message: creates a new conversation + /// with all messages up to and including the source message, then copies + /// the source message content as a new user message in the forked conversation. + /// + /// Returns the new conversation. + pub async fn fork_conversation_from_message( &self, - conversation_id: Uuid, user_id: Uuid, + conversation_id: Uuid, source_message_id: Uuid, - target_message_id: Uuid, - ) -> Result { - let c = self.find_conversation_owned(conversation_id, user_id).await?; + ) -> Result { + let c = self + .find_conversation_owned(conversation_id, user_id) + .await?; - // Mark source as fork origin - let mut source: ai_message::ActiveModel = AiMessage::find_by_id(source_message_id) + // Verify source message exists in this conversation + let source_msg = AiMessage::find_by_id(source_message_id) .one(self.db.reader()) .await? - .ok_or_else(|| AppError::NotFound("message".into()))? - .into(); + .ok_or_else(|| AppError::NotFound("message".into()))?; + + if source_msg.conversation_id != conversation_id { + return Err(AppError::NotFound("message not in conversation".into())); + } + + // Get all messages in the conversation up to the source message + let all_messages = AiMessage::find() + .filter(ai_message::Column::ConversationId.eq(conversation_id)) + .filter(ai_message::Column::IsLatest.eq(true)) + .order_by_asc(ai_message::Column::CreatedAt) + .all(self.db.reader()) + .await?; + + // Find the index of the source message in the ordered list + let source_idx = all_messages + .iter() + .position(|m| m.id == source_message_id) + .ok_or_else(|| AppError::NotFound("source message not found in conversation".into()))?; + + // Messages to copy: up to and including the source message + let messages_to_copy = &all_messages[..=source_idx]; + + // Create new conversation + let new_conv_id = Uuid::new_v4(); + let now = chrono::Utc::now(); + let new_conv = ai_conversation::ActiveModel { + id: Set(new_conv_id), + user_id: Set(user_id), + project_id: Set(c.project_id), + scope: Set(c.scope.clone()), + title: Set(c.title.clone().map(|t| format!("Fork: {}", t))), + model: Set(c.model.clone()), + model_config: Set(c.model_config.clone()), + status: Set("active".to_string()), + root_message_id: Set(None), + fork_count: Set(0), + is_shared: Set(false), + message_count: Set(0), + token_usage_total: Set(None), + access_visibility: Set(c.access_visibility.clone()), + can_ask: Set(c.can_ask.clone()), + project_uid: Set(c.project_uid), + model_uid: Set(c.model_uid), + model_name: Set(c.model_name.clone()), + created_at: Set(now), + updated_at: Set(now), + } + .insert(self.db.writer()) + .await?; + + // Copy messages to the new conversation + let mut prev_msg_id: Option = None; + let mut msg_count = 0; + + for msg in messages_to_copy { + let new_msg_id = Uuid::now_v7(); + ai_message::ActiveModel { + id: Set(new_msg_id), + conversation_id: Set(new_conv_id), + parent_message_id: Set(prev_msg_id), + role: Set(msg.role.clone()), + content: Set(msg.content.clone()), + model: Set(msg.model.clone()), + is_fork_origin: Set(false), + stop_reason: Set(None), + input_tokens: Set(None), + output_tokens: Set(None), + latency_ms: Set(None), + metadata: Set(None), + room_id: Set(None), + version_group_id: Set(Some(new_msg_id)), + version_number: Set(1), + is_latest: Set(true), + created_at: Set(chrono::Utc::now()), + } + .insert(self.db.writer()) + .await?; + + prev_msg_id = Some(new_msg_id); + msg_count += 1; + } + + // Update conversation message count + let mut updated: ai_conversation::ActiveModel = new_conv.clone().into(); + updated.message_count = Set(msg_count); + updated.updated_at = Set(chrono::Utc::now()); + updated.update(self.db.writer()).await?; + + // Mark source message as fork origin + let mut source: ai_message::ActiveModel = source_msg.into(); source.is_fork_origin = Set(true); source.update(self.db.writer()).await?; // Create fork record - let fork_record = ai_message_fork::ActiveModel { + ai_message_fork::ActiveModel { id: Set(Uuid::new_v4()), - conversation_id: Set(Some(conversation_id)), + conversation_id: Set(Some(new_conv_id)), source_message_id: Set(source_message_id), - fork_message_id: Set(target_message_id), + fork_message_id: Set(prev_msg_id.unwrap_or(source_message_id)), created_at: Set(chrono::Utc::now()), } .insert(self.db.writer()) .await?; - // Update conversation fork_count - let fork_count = c.fork_count; - let root_msg_id = c.root_message_id; - let mut updated: ai_conversation::ActiveModel = c.into(); - updated.fork_count = Set(fork_count + 1); - if root_msg_id.is_none() { - updated.root_message_id = Set(Some(target_message_id)); - } - updated.update(self.db.writer()).await?; + // Update original conversation fork_count + let original_fork_count = c.fork_count; + let mut orig_updated: ai_conversation::ActiveModel = c.into(); + orig_updated.fork_count = Set(original_fork_count + 1); + orig_updated.updated_at = Set(chrono::Utc::now()); + orig_updated.update(self.db.writer()).await?; - Ok(fork_record) + // Return the new conversation + let new_conv = ai_conversation::Entity::find_by_id(new_conv_id) + .one(self.db.reader()) + .await? + .ok_or_else(|| AppError::NotFound("conversation".into()))?; + + Ok(new_conv) } /// List all fork records for a message within a conversation. @@ -55,11 +153,11 @@ impl AppService { user_id: Uuid, source_message_id: Uuid, ) -> Result, AppError> { - self.find_conversation_owned(conversation_id, user_id).await?; + self.find_conversation_owned(conversation_id, user_id) + .await?; let forks = ai_message_fork::Entity::find() .filter(ai_message_fork::Column::SourceMessageId.eq(source_message_id)) - .filter(ai_message_fork::Column::ConversationId.eq(conversation_id)) .all(self.db.reader()) .await?; diff --git a/libs/service/chat/message.rs b/libs/service/chat/message.rs index 907d85c..70a8996 100644 --- a/libs/service/chat/message.rs +++ b/libs/service/chat/message.rs @@ -1,6 +1,8 @@ -use models::ai::{AiMessage, ai_conversation, ai_message}; -use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QuerySelect, QueryFilter, QueryOrder, Set}; use crate::error::AppError; +use models::ai::{AiMessage, ai_conversation, ai_message}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set, +}; use uuid::Uuid; use crate::AppService; @@ -22,7 +24,8 @@ impl AppService { user_id: Uuid, limit: u64, ) -> Result, AppError> { - self.find_conversation_owned(conversation_id, user_id).await?; + self.find_conversation_owned(conversation_id, user_id) + .await?; // Only return latest versions for each version group let msgs = AiMessage::find() @@ -48,13 +51,24 @@ impl AppService { metadata: Option, room_id: Option, ) -> Result { - let c = self.find_conversation_owned(conversation_id, user_id).await?; + let c = self + .find_conversation_full_access(conversation_id, user_id) + .await?; + + if role != "user" { + return Err(AppError::PermissionDenied); + } + + if let Some(parent_id) = parent_message_id { + let parent = self.find_message(parent_id).await?; + if parent.conversation_id != conversation_id || !parent.is_latest { + return Err(AppError::NotFound("parent message".into())); + } + } // For project chats, non-owner must also have can_ask permission if c.user_id != user_id && c.project_id.is_some() { - let access = super::access::check_conversation_access( - &self.db, &c, user_id, - ).await?; + let access = super::access::check_conversation_access(&self.db, &c, user_id).await?; if access != super::AccessLevel::Full { return Err(AppError::PermissionDenied); } @@ -99,7 +113,8 @@ impl AppService { user_id: Uuid, message_id: Uuid, ) -> Result { - self.find_conversation_owned(conversation_id, user_id).await?; + self.find_conversation_owned(conversation_id, user_id) + .await?; let msg = self.find_message(message_id).await?; if msg.conversation_id != conversation_id { @@ -114,26 +129,34 @@ impl AppService { user_id: Uuid, message_id: Uuid, ) -> Result<(), AppError> { - let c = self.find_conversation_owned(conversation_id, user_id).await?; + let c = self + .find_conversation_full_access(conversation_id, user_id) + .await?; // For project chats, non-owner must also have can_ask permission if c.user_id != user_id && c.project_id.is_some() { - let access = super::access::check_conversation_access( - &self.db, &c, user_id, - ).await?; + let access = super::access::check_conversation_access(&self.db, &c, user_id).await?; if access != super::AccessLevel::Full { return Err(AppError::PermissionDenied); } } - let mut msg: ai_message::ActiveModel = AiMessage::find_by_id(message_id) + let existing = AiMessage::find_by_id(message_id) .one(self.db.reader()) .await? - .ok_or_else(|| AppError::NotFound("message".into()))? - .into(); + .ok_or_else(|| AppError::NotFound("message".into()))?; + if existing.conversation_id != conversation_id { + return Err(AppError::NotFound("message".into())); + } + + let mut msg: ai_message::ActiveModel = existing.into(); msg.stop_reason = Set(Some("stop".to_string())); msg.update(self.db.writer()).await?; + + // Signal cancellation to the active stream + self.cache.set_chat_stream_cancelled(conversation_id).await; + Ok(()) } @@ -147,13 +170,13 @@ impl AppService { message_id: Uuid, new_content: String, ) -> Result { - let c = self.find_conversation_owned(conversation_id, user_id).await?; + let c = self + .find_conversation_full_access(conversation_id, user_id) + .await?; // For project chats, non-owner must also have can_ask permission if c.user_id != user_id && c.project_id.is_some() { - let access = super::access::check_conversation_access( - &self.db, &c, user_id, - ).await?; + let access = super::access::check_conversation_access(&self.db, &c, user_id).await?; if access != super::AccessLevel::Full { return Err(AppError::PermissionDenied); } @@ -244,7 +267,8 @@ impl AppService { user_id: Uuid, message_id: Uuid, ) -> Result, AppError> { - self.find_conversation_owned(conversation_id, user_id).await?; + self.find_conversation_owned(conversation_id, user_id) + .await?; let msg = self.find_message(message_id).await?; if msg.conversation_id != conversation_id { @@ -271,7 +295,9 @@ impl AppService { message_id: Uuid, target_version_number: i32, ) -> Result { - let c = self.find_conversation_owned(conversation_id, user_id).await?; + let c = self + .find_conversation_full_access(conversation_id, user_id) + .await?; if c.user_id != user_id { return Err(AppError::PermissionDenied); @@ -316,7 +342,8 @@ impl AppService { } // Find the target version and mark it as latest - let target = all_versions.iter() + let target = all_versions + .iter() .find(|v| v.version_number == target_version_number) .ok_or_else(|| AppError::NotFound("version".into()))?; @@ -357,13 +384,13 @@ impl AppService { user_id: Uuid, message_id: Uuid, ) -> Result { - let c = self.find_conversation_owned(conversation_id, user_id).await?; + let c = self + .find_conversation_full_access(conversation_id, user_id) + .await?; // For project chats, non-owner must also have can_ask permission if c.user_id != user_id && c.project_id.is_some() { - let access = super::access::check_conversation_access( - &self.db, &c, user_id, - ).await?; + let access = super::access::check_conversation_access(&self.db, &c, user_id).await?; if access != super::AccessLevel::Full { return Err(AppError::PermissionDenied); } @@ -451,7 +478,8 @@ impl AppService { user_id: Uuid, parent_message_id: Uuid, ) -> Result, AppError> { - self.find_conversation_owned(conversation_id, user_id).await?; + self.find_conversation_owned(conversation_id, user_id) + .await?; let msgs = AiMessage::find() .filter(ai_message::Column::ConversationId.eq(conversation_id)) diff --git a/libs/service/chat/mod.rs b/libs/service/chat/mod.rs index 08927c1..80966ab 100644 --- a/libs/service/chat/mod.rs +++ b/libs/service/chat/mod.rs @@ -4,4 +4,7 @@ pub mod fork; pub mod message; pub mod share; -pub use access::{AccessLevel, can_view, can_ask, can_create, check_access, check_conversation_access, resolve_project_role}; +pub use access::{ + AccessLevel, can_ask, can_create, can_view, check_access, check_conversation_access, + resolve_project_role, +}; diff --git a/libs/service/chat/share.rs b/libs/service/chat/share.rs index a8f1b6a..3251bed 100644 --- a/libs/service/chat/share.rs +++ b/libs/service/chat/share.rs @@ -1,6 +1,6 @@ +use crate::error::AppError; use models::ai::{AiSharedConversation, ai_conversation, ai_shared_conversation}; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, Set}; -use crate::error::AppError; use uuid::Uuid; use crate::AppService; @@ -11,7 +11,9 @@ impl AppService { conversation_id: Uuid, user_id: Uuid, ) -> Result<(ai_shared_conversation::Model, String), AppError> { - let c = self.find_conversation_owned(conversation_id, user_id).await?; + let c = self + .find_conversation_owned(conversation_id, user_id) + .await?; let share_token = Uuid::new_v4().to_string().replace("-", ""); let now = chrono::Utc::now(); diff --git a/libs/service/git/commit.rs b/libs/service/git/commit.rs index baf08ec..c0a7933 100644 --- a/libs/service/git/commit.rs +++ b/libs/service/git/commit.rs @@ -820,7 +820,8 @@ impl AppService { // Cache miss: compute count and cache it. let computed = git_spawn!(repo, domain -> { domain.commit_count(from.as_deref(), to.as_deref()) - }).unwrap_or(0); + }) + .unwrap_or(0); if let Err(e) = conn .set_ex::( total_cache_key.clone(), @@ -838,7 +839,8 @@ impl AppService { // No cache: compute directly. git_spawn!(repo, domain -> { domain.commit_count(from.as_deref(), to.as_deref()) - }).unwrap_or(0) + }) + .unwrap_or(0) }; let total_pages = if total == 0 { diff --git a/libs/service/git/init.rs b/libs/service/git/init.rs index 462df57..1f09000 100644 --- a/libs/service/git/init.rs +++ b/libs/service/git/init.rs @@ -38,7 +38,11 @@ impl AppService { }) } - pub async fn git_open_bare(&self, path: String, ctx: &Session) -> Result { + pub async fn git_open_bare( + &self, + path: String, + ctx: &Session, + ) -> Result { let _user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let domain = git::GitDomain::open_bare(&path).map_err(AppError::from)?; Ok(GitInitResponse { diff --git a/libs/service/git/refs.rs b/libs/service/git/refs.rs index b93bd41..60eeb00 100644 --- a/libs/service/git/refs.rs +++ b/libs/service/git/refs.rs @@ -8,11 +8,7 @@ use session::Session; /// Delete all cached ref list entries for a given namespace/repo. /// Redis DEL does not support glob patterns, so we SCAN and delete each key. -async fn invalidate_ref_cache( - cache: &db::cache::AppCache, - namespace: &str, - repo_name: &str, -) { +async fn invalidate_ref_cache(cache: &db::cache::AppCache, namespace: &str, repo_name: &str) { let prefix = format!("git:ref:list:{}:{}:", namespace, repo_name); if let Ok(mut conn) = cache.conn().await { let pattern = format!("{}*", prefix); diff --git a/libs/service/git/repo.rs b/libs/service/git/repo.rs index edd0144..c4cbf3b 100644 --- a/libs/service/git/repo.rs +++ b/libs/service/git/repo.rs @@ -478,14 +478,16 @@ impl AppService { active.update(&txn).await?; txn.commit().await?; - self.room.publish_room_event( - repo.project, - room::RoomEventType::RepoUpdated, - None, - None, - None, - None, - ).await; + self.room + .publish_room_event( + repo.project, + room::RoomEventType::RepoUpdated, + None, + None, + None, + None, + ) + .await; Ok(()) } diff --git a/libs/service/git/tag.rs b/libs/service/git/tag.rs index 3e7dcd8..1f3a9d4 100644 --- a/libs/service/git/tag.rs +++ b/libs/service/git/tag.rs @@ -399,10 +399,8 @@ impl AppService { let info = tokio::task::spawn_blocking(move || { let domain = git::GitDomain::from_model(repo)?; let tagger = git::CommitSignature { - name: tagger_name - .unwrap_or_else(|| "Anonymous".to_string()), - email: tagger_email - .unwrap_or_else(|| "anonymous@example.com".to_string()), + name: tagger_name.unwrap_or_else(|| "Anonymous".to_string()), + email: tagger_email.unwrap_or_else(|| "anonymous@example.com".to_string()), time_secs: chrono::Utc::now().timestamp(), offset_minutes: 0, }; diff --git a/libs/service/git/tree.rs b/libs/service/git/tree.rs index e943ce8..77d4288 100644 --- a/libs/service/git/tree.rs +++ b/libs/service/git/tree.rs @@ -12,7 +12,9 @@ pub struct TreeGetQuery { #[serde(default = "default_tree_limit")] pub limit: usize, } -fn default_tree_limit() -> usize { 1000 } +fn default_tree_limit() -> usize { + 1000 +} #[derive(Debug, Clone, Deserialize)] pub struct TreeEntryQuery { @@ -184,8 +186,11 @@ impl AppService { .map_err(|e| AppError::InternalServerError(format!("Task join error: {}", e)))? .map_err(AppError::from)?; - let response: Vec = - entries.into_iter().take(query.limit).map(TreeEntryResponse::from).collect(); + let response: Vec = entries + .into_iter() + .take(query.limit) + .map(TreeEntryResponse::from) + .collect(); if let Ok(mut conn) = self.cache.conn().await { if let Err(e) = conn diff --git a/libs/service/issue/issue.rs b/libs/service/issue/issue.rs index 0fb47ba..d138de1 100644 --- a/libs/service/issue/issue.rs +++ b/libs/service/issue/issue.rs @@ -1,8 +1,8 @@ -use crate::error::AppError; use crate::AppService; +use crate::error::AppError; use chrono::Utc; use models::issues::{ - issue, issue_assignee, issue_comment, issue_label, issue_repo, issue_subscriber, IssueState, + IssueState, issue, issue_assignee, issue_comment, issue_label, issue_repo, issue_subscriber, }; use models::projects::project_members; use models::users::user; diff --git a/libs/service/issue/label.rs b/libs/service/issue/label.rs index 5eed154..963be9b 100644 --- a/libs/service/issue/label.rs +++ b/libs/service/issue/label.rs @@ -25,7 +25,10 @@ fn default_color_for_label(name: &str) -> String { let lower = name.to_lowercase(); if lower.contains("bug") || lower.contains("critical") || lower.contains("security") { "ef4444".to_string() - } else if lower.contains("enhancement") || lower.contains("feature") || lower.contains("improvement") { + } else if lower.contains("enhancement") + || lower.contains("feature") + || lower.contains("improvement") + { "22c55e".to_string() } else if lower.contains("documentation") || lower.contains("docs") { "3b82f6".to_string() diff --git a/libs/service/issue/mod.rs b/libs/service/issue/mod.rs index df0c31e..0d2f773 100644 --- a/libs/service/issue/mod.rs +++ b/libs/service/issue/mod.rs @@ -16,7 +16,10 @@ pub use comment::{ pub use issue::{ IssueCreateRequest, IssueListResponse, IssueResponse, IssueSummaryResponse, IssueUpdateRequest, }; -pub use label::{CreateLabelRequest, IssueAddLabelRequest, IssueAddLabelsByNamesRequest, IssueLabelResponse, LabelResponse}; +pub use label::{ + CreateLabelRequest, IssueAddLabelRequest, IssueAddLabelsByNamesRequest, IssueLabelResponse, + LabelResponse, +}; pub use pull_request::{IssueLinkPullRequestRequest, IssuePullRequestResponse}; pub use reaction::{ReactionAddRequest, ReactionListResponse, ReactionResponse}; pub use repo::{IssueLinkRepoRequest, IssueRepoResponse}; diff --git a/libs/service/lib.rs b/libs/service/lib.rs index fdd2f57..53c0eef 100644 --- a/libs/service/lib.rs +++ b/libs/service/lib.rs @@ -3,17 +3,17 @@ use std::sync::Arc; use ::agent::chat::ChatService; use ::agent::client::AiClientConfig; use ::agent::tool::ToolRegistry; -use ::agent::{new_embed_client, EmbedService, TaskService}; +use ::agent::{EmbedService, TaskService, new_embed_client}; use avatar::AppAvatar; use config::AppConfig; use db::cache::AppCache; use db::database::AppDatabase; use email::AppEmail; use queue::{ - start_email_worker, EmailEnvelope, EmailSendFn, EmailSendFut, MessageProducer, NatsClient, + EmailEnvelope, EmailSendFn, EmailSendFut, MessageProducer, NatsClient, start_email_worker, }; -use room::metrics::RoomMetrics; use room::RoomService; +use room::metrics::RoomMetrics; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use ws_token::WsTokenService; @@ -147,6 +147,11 @@ impl AppService { (Ok(api_key), Ok(base_url)) => { tracing::info!(url = %base_url, "AI chat enabled"); let ai_client_config = AiClientConfig::new(api_key).with_base_url(&base_url); + let compact_service = ::agent::CompactService::new( + db.writer().clone(), + ai_client_config.clone(), + "gpt-4o-mini".to_string(), + ); let mut registry = ToolRegistry::new(); fctool::git_tools::register_all(&mut registry); fctool::file_tools::register_all(&mut registry); @@ -154,6 +159,7 @@ impl AppService { fctool::chat_tools::register_all(&mut registry); let mut chat_svc = ChatService::new() .with_ai_client_config(ai_client_config) + .with_compact_service(compact_service) .with_tool_registry(registry); if let Some(ref es) = embed_service { chat_svc = chat_svc.with_embed_service((**es).clone()); diff --git a/libs/service/project/billing.rs b/libs/service/project/billing.rs index 5606c58..6ecb88f 100644 --- a/libs/service/project/billing.rs +++ b/libs/service/project/billing.rs @@ -11,7 +11,6 @@ use session::Session; use utoipa::{IntoParams, ToSchema}; use uuid::Uuid; - #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct ProjectBillingCurrentResponse { pub project_uid: Uuid, @@ -216,16 +215,19 @@ impl AppService { .await?; Ok(BillingErrorsResponse { - list: errors.into_iter().map(|e| BillingErrorItem { - id: e.id, - scope: e.scope, - scope_id: e.scope_id, - error_type: e.error_type, - message: e.message, - details: e.details, - resolved: e.resolved, - created_at: e.created_at, - }).collect(), + list: errors + .into_iter() + .map(|e| BillingErrorItem { + id: e.id, + scope: e.scope, + scope_id: e.scope_id, + error_type: e.error_type, + message: e.message, + details: e.details, + resolved: e.resolved, + created_at: e.created_at, + }) + .collect(), }) } } diff --git a/libs/service/project/init.rs b/libs/service/project/init.rs index f4416ef..1cda75b 100644 --- a/libs/service/project/init.rs +++ b/libs/service/project/init.rs @@ -91,7 +91,9 @@ impl AppService { observability::incr!(observability::PROJECTS_CREATED_TOTAL); // Initialize project billing ($20 for first project, $0 otherwise) - if let Err(e) = agent::billing::initialize_project_billing(&self.db, _project.id, user.uid).await { + if let Err(e) = + agent::billing::initialize_project_billing(&self.db, _project.id, user.uid).await + { tracing::warn!(project_id = %_project.id, error = %e, "Failed to initialize project billing — non-critical, continuing"); } diff --git a/libs/service/project/invitation.rs b/libs/service/project/invitation.rs index 212571b..ea93b1b 100644 --- a/libs/service/project/invitation.rs +++ b/libs/service/project/invitation.rs @@ -2,7 +2,7 @@ use crate::AppService; use crate::error::AppError; use chrono::{DateTime, Utc}; use models::projects::{ - project, MemberRole, project_audit_log, project_member_invitations, project_members, + MemberRole, project, project_audit_log, project_member_invitations, project_members, }; use models::users::{user, user_email}; use sea_orm::*; @@ -330,7 +330,10 @@ impl AppService { target_uid, crate::push::PushPayload { title: format!("Project invitation: {}", project.name), - body: format!("{} invited you to join \"{}\" as {:?}", inviter.username, project.name, scope), + body: format!( + "{} invited you to join \"{}\" as {:?}", + inviter.username, project.name, scope + ), url: Some(format!("/projects/{}/invitations", project.name)), icon: None, }, @@ -341,7 +344,10 @@ impl AppService { .notification_create(room::NotificationCreateRequest { notification_type: room::NotificationType::ProjectInvitation, user_id: target_uid, - title: format!("{} invited you to join \"{}\"", inviter.username, project.name), + title: format!( + "{} invited you to join \"{}\"", + inviter.username, project.name + ), content: Some(format!("Role: {:?}", scope)), room_id: None, project_id: project.id, diff --git a/libs/service/project/members.rs b/libs/service/project/members.rs index 652d366..dfb4e5f 100644 --- a/libs/service/project/members.rs +++ b/libs/service/project/members.rs @@ -4,8 +4,8 @@ use chrono::Utc; use models::projects::{MemberRole, project_audit_log, project_members}; use models::rooms::{room, room_user_state}; use models::users::user; -use sea_orm::*; use sea_orm::sea_query::OnConflict; +use sea_orm::*; use serde::{Deserialize, Serialize}; use session::Session; use utoipa::ToSchema; @@ -148,9 +148,7 @@ impl AppService { ctx: &Session, ) -> Result { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; - let project = self - .utils_find_project_by_name(project_name) - .await?; + let project = self.utils_find_project_by_name(project_name).await?; let _requester_member = project_members::Entity::find() .filter(project_members::Column::Project.eq(project.id)) @@ -195,7 +193,8 @@ impl AppService { }) .collect(); - let mut groups: std::collections::BTreeMap> = std::collections::BTreeMap::new(); + let mut groups: std::collections::BTreeMap> = + std::collections::BTreeMap::new(); for m in member_infos { let role_str = m.scope.to_string(); groups.entry(role_str).or_default().push(m); @@ -208,8 +207,14 @@ impl AppService { .collect(); sorted_groups.sort_by(|a, b| { - let pa = role_priority.iter().position(|&r| r == a.role).unwrap_or(99); - let pb = role_priority.iter().position(|&r| r == b.role).unwrap_or(99); + let pa = role_priority + .iter() + .position(|&r| r == a.role) + .unwrap_or(99); + let pb = role_priority + .iter() + .position(|&r| r == b.role) + .unwrap_or(99); pa.cmp(&pb) }); diff --git a/libs/service/project/repo.rs b/libs/service/project/repo.rs index c7f318a..6f5911b 100644 --- a/libs/service/project/repo.rs +++ b/libs/service/project/repo.rs @@ -1,7 +1,7 @@ -use crate::error::AppError; use crate::AppService; +use crate::error::AppError; use chrono::{DateTime, Utc}; -use models::projects::{project_members, Project}; +use models::projects::{Project, project_members}; use models::repos::repo::{ ActiveModel as RepoActiveModel, Column as RepoColumn, Entity as RepoEntity, }; @@ -348,14 +348,16 @@ impl AppService { ) .await; - self.room.publish_room_event( - project.id, - room::RoomEventType::RepoCreated, - None, - None, - None, - None, - ).await; + self.room + .publish_room_event( + project.id, + room::RoomEventType::RepoCreated, + None, + None, + None, + None, + ) + .await; observability::incr!(observability::REPOS_CREATED_TOTAL); Ok(ProjectRepoCreateResponse { diff --git a/libs/service/project/stats.rs b/libs/service/project/stats.rs index bdfc383..6618d70 100644 --- a/libs/service/project/stats.rs +++ b/libs/service/project/stats.rs @@ -1,11 +1,11 @@ use crate::AppService; use crate::error::AppError; use chrono::{DateTime, Utc}; +use models::ai::ai_token_usage; +use models::issues::issue; use models::projects::{project_activity, project_members}; use models::repos::repo; use models::rooms::{room, room_ai}; -use models::issues::issue; -use models::ai::ai_token_usage; use models::users::user; use sea_orm::*; use serde::{Deserialize, Serialize}; @@ -152,7 +152,12 @@ impl AppService { .count(&self.db) .await?; - (pr_total as i64, pr_open as i64, pr_merged as i64, pr_closed as i64) + ( + pr_total as i64, + pr_open as i64, + pr_merged as i64, + pr_closed as i64, + ) }; // ── Room count ────────────────────────────────────────── @@ -200,12 +205,16 @@ impl AppService { .all(&self.db) .await?; - let input_sum = token_records.iter().map(|t| t.input_tokens as i64).sum::(); - let output_sum = token_records.iter().map(|t| t.output_tokens as i64).sum::(); - let cost_sum: rust_decimal::Decimal = token_records + let input_sum = token_records .iter() - .filter_map(|t| t.cost_usd) - .sum(); + .map(|t| t.input_tokens as i64) + .sum::(); + let output_sum = token_records + .iter() + .map(|t| t.output_tokens as i64) + .sum::(); + let cost_sum: rust_decimal::Decimal = + token_records.iter().filter_map(|t| t.cost_usd).sum(); let cost_str = if cost_sum != rust_decimal::Decimal::ZERO { Some(cost_sum.to_string()) @@ -226,7 +235,8 @@ impl AppService { .await?; // Group by event_type - let mut breakdown_map: std::collections::HashMap = std::collections::HashMap::new(); + let mut breakdown_map: std::collections::HashMap = + std::collections::HashMap::new(); for a in &activity_rows { *breakdown_map.entry(a.event_type.clone()).or_insert(0) += 1; } @@ -246,18 +256,27 @@ impl AppService { // Enrich with actor info let actor_ids: Vec = recent.iter().map(|a| a.actor).collect(); - let actor_map: std::collections::HashMap)> = if actor_ids.is_empty() { - std::collections::HashMap::new() - } else { - let users = user::Entity::find() - .filter(user::Column::Uid.is_in(actor_ids)) - .all(&self.db) - .await?; - users - .into_iter() - .map(|u| (u.uid, (u.display_name.or(Some(u.username)).unwrap_or_default(), u.avatar_url))) - .collect() - }; + let actor_map: std::collections::HashMap)> = + if actor_ids.is_empty() { + std::collections::HashMap::new() + } else { + let users = user::Entity::find() + .filter(user::Column::Uid.is_in(actor_ids)) + .all(&self.db) + .await?; + users + .into_iter() + .map(|u| { + ( + u.uid, + ( + u.display_name.or(Some(u.username)).unwrap_or_default(), + u.avatar_url, + ), + ) + }) + .collect() + }; let recent_activities: Vec = recent .into_iter() @@ -299,4 +318,4 @@ impl AppService { recent_activities, }) } -} \ No newline at end of file +} diff --git a/libs/service/pull_request/merge.rs b/libs/service/pull_request/merge.rs index 6de57b5..fecd415 100644 --- a/libs/service/pull_request/merge.rs +++ b/libs/service/pull_request/merge.rs @@ -170,8 +170,9 @@ impl AppService { .ref_target(&head_ref_name)? .ok_or_else(|| AppError::BadRequest("Head ref has no OID".to_string()))?; let (analysis, _pref) = domain.merge_analysis_for_ref(&base, &head_oid)?; - let has_conflicts = - !analysis.is_fast_forward && !analysis.is_up_to_date && domain.merge_is_conflicted(); + let has_conflicts = !analysis.is_fast_forward + && !analysis.is_up_to_date + && domain.merge_is_conflicted(); let conflicted_files = if has_conflicts { let index = domain .repo() @@ -289,7 +290,8 @@ impl AppService { let (analysis, _pref) = domain.merge_analysis_for_ref(&pr_base, &head_oid)?; - if !analysis.is_fast_forward && !analysis.is_up_to_date && domain.merge_is_conflicted() { + if !analysis.is_fast_forward && !analysis.is_up_to_date && domain.merge_is_conflicted() + { return Err(AppError::BadRequest( "Pull request has merge conflicts".to_string(), )); diff --git a/libs/service/pull_request/review_comment.rs b/libs/service/pull_request/review_comment.rs index d2d5b3f..16db5ee 100644 --- a/libs/service/pull_request/review_comment.rs +++ b/libs/service/pull_request/review_comment.rs @@ -137,7 +137,11 @@ impl AppService { } let total = stmt.clone().count(&self.db).await? as i64; - let comments = stmt.limit(query.limit.unwrap_or(200)).offset(query.offset.unwrap_or(0)).all(&self.db).await?; + let comments = stmt + .limit(query.limit.unwrap_or(200)) + .offset(query.offset.unwrap_or(0)) + .all(&self.db) + .await?; let author_ids: Vec = comments.iter().map(|c| c.author).collect(); let authors = if author_ids.is_empty() { diff --git a/libs/service/push.rs b/libs/service/push.rs index 0cc47ae..326d6fc 100644 --- a/libs/service/push.rs +++ b/libs/service/push.rs @@ -1,10 +1,10 @@ use std::sync::Arc; -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use base64ct::{Base64UrlUnpadded, Encoding}; use serde::Serialize; use web_push_native::{ - jwt_simple::algorithms::ES256KeyPair, p256::PublicKey, Auth, WebPushBuilder, + Auth, WebPushBuilder, jwt_simple::algorithms::ES256KeyPair, p256::PublicKey, }; #[derive(Clone)] @@ -63,8 +63,8 @@ impl WebPushService { let ua_public_bytes = Base64UrlUnpadded::decode_vec(p256dh) .with_context(|| format!("Failed to decode p256dh: {}", p256dh))?; - let ua_public = PublicKey::from_sec1_bytes(&ua_public_bytes) - .with_context(|| "Invalid p256dh key")?; + let ua_public = + PublicKey::from_sec1_bytes(&ua_public_bytes).with_context(|| "Invalid p256dh key")?; let auth_bytes = Base64UrlUnpadded::decode_vec(auth) .with_context(|| format!("Failed to decode auth: {}", auth))?; @@ -76,8 +76,8 @@ impl WebPushService { .with_vapid(&self.vapid_key_pair, &self.sender_email) .build(payload_bytes)?; - let reqwest_request = reqwest::Request::try_from(request) - .context("Failed to convert web-push request")?; + let reqwest_request = + reqwest::Request::try_from(request).context("Failed to convert web-push request")?; let response = self.http.execute(reqwest_request).await?; let status = response.status(); diff --git a/libs/service/search/service.rs b/libs/service/search/service.rs index dd604f4..9e80ff8 100644 --- a/libs/service/search/service.rs +++ b/libs/service/search/service.rs @@ -63,7 +63,8 @@ fn parse_types(types: Option) -> Vec { } fn build_like_pattern(q: &str) -> String { - let escaped = q.trim() + let escaped = q + .trim() .replace('\\', "\\\\") .replace('%', "\\%") .replace('_', "\\_"); @@ -678,15 +679,19 @@ impl AppService { AND m.revoked IS NULL ORDER BY m.send_at DESC LIMIT $3 OFFSET $4"#, - tsquery, - tsquery + tsquery, tsquery ); // Results query let results_sql = Statement::from_sql_and_values( DbBackend::Postgres, &sql, - vec![q.into(), accessible_rooms.clone().into(), per_page.into(), offset.into()], + vec![ + q.into(), + accessible_rooms.clone().into(), + per_page.into(), + offset.into(), + ], ); let rows = self.db.query_all_raw(results_sql).await?; @@ -694,11 +699,11 @@ impl AppService { for row in rows { let room_id: Uuid = row.try_get::("", "room").unwrap_or_default(); let sender_type_str = row.try_get::("", "sender_type").unwrap_or_default(); - let content_type_str = row.try_get::("", "content_type").unwrap_or_default(); + let content_type_str = row + .try_get::("", "content_type") + .unwrap_or_default(); - let highlighted = row - .try_get::("", "highlighted_content") - .ok(); + let highlighted = row.try_get::("", "highlighted_content").ok(); messages.push(GlobalMessageSearchItem { id: row.try_get::("", "id").unwrap_or_default(), @@ -709,7 +714,9 @@ impl AppService { display_name: None, content: row.try_get::("", "content").unwrap_or_default(), content_type: content_type_str, - send_at: row.try_get::>("", "send_at").unwrap_or_default(), + send_at: row + .try_get::>("", "send_at") + .unwrap_or_default(), highlighted_content: highlighted, }); } diff --git a/libs/service/skill/manage.rs b/libs/service/skill/manage.rs index ef58bb4..1c8964b 100644 --- a/libs/service/skill/manage.rs +++ b/libs/service/skill/manage.rs @@ -1,11 +1,11 @@ //! Create, update, delete project skills. +use super::info::SkillResponse; use crate::AppService; use crate::error::AppError; -use super::info::SkillResponse; use chrono::Utc; -use models::projects::project_skill::{Column as C, Entity as SkillEntity}; use models::ActiveModelTrait; +use models::projects::project_skill::{Column as C, Entity as SkillEntity}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set}; use serde::{Deserialize, Serialize}; use session::Session; @@ -64,9 +64,7 @@ impl AppService { let project_id = Uuid::parse_str(&project_uuid) .map_err(|_| AppError::BadRequest("Invalid project UUID".to_string()))?; - let user_id = ctx - .user() - .ok_or_else(|| AppError::Unauthorized)?; + let user_id = ctx.user().ok_or_else(|| AppError::Unauthorized)?; // Check for duplicate slug within project let exists = SkillEntity::find() @@ -82,7 +80,9 @@ impl AppService { } let now = Utc::now(); - let metadata = request.metadata.unwrap_or(serde_json::Value::Object(Default::default())); + let metadata = request + .metadata + .unwrap_or(serde_json::Value::Object(Default::default())); let name = request.name.unwrap_or_else(|| request.slug.clone()); let active = models::projects::project_skill::ActiveModel { @@ -114,7 +114,10 @@ impl AppService { let scontent = inserted.content.clone(); let sproj = inserted.project_uuid.to_string(); tokio::spawn(async move { - if let Err(e) = es.embed_skill(sid, &sname, sdesc.as_deref(), &scontent, &sproj).await { + if let Err(e) = es + .embed_skill(sid, &sname, sdesc.as_deref(), &scontent, &sproj) + .await + { tracing::warn!(error = %e, skill_id = %sid, "failed to embed skill"); } }); @@ -170,7 +173,10 @@ impl AppService { let scontent = updated.content.clone(); let sproj = updated.project_uuid.to_string(); tokio::spawn(async move { - if let Err(e) = es.embed_skill(sid, &sname, sdesc.as_deref(), &scontent, &sproj).await { + if let Err(e) = es + .embed_skill(sid, &sname, sdesc.as_deref(), &scontent, &sproj) + .await + { tracing::warn!(error = %e, skill_id = %sid, "failed to re-embed skill on update"); } }); diff --git a/libs/service/skill/scan.rs b/libs/service/skill/scan.rs index a2e1835..fba2853 100644 --- a/libs/service/skill/scan.rs +++ b/libs/service/skill/scan.rs @@ -2,8 +2,8 @@ use crate::AppService; use crate::error::AppError; -use models::repos::repo::Entity as RepoEntity; use models::repos::repo::Column as RCol; +use models::repos::repo::Entity as RepoEntity; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use uuid::Uuid; diff --git a/libs/service/skill/scanner.rs b/libs/service/skill/scanner.rs index 62fe721..2ac1747 100644 --- a/libs/service/skill/scanner.rs +++ b/libs/service/skill/scanner.rs @@ -115,7 +115,8 @@ pub fn scan_repo_for_skills( .map(|s| s.to_lowercase()) == Some("skill.md".to_string()) { - if let Some(dir_name) = path.parent() + if let Some(dir_name) = path + .parent() .and_then(|p| p.file_name()) .and_then(|n| n.to_str()) .filter(|s| !s.starts_with('.')) @@ -168,7 +169,9 @@ pub fn scan_repo_tree_for_skills( match entry.kind() { Some(git2::ObjectType::Tree) => { if !name.starts_with('.') { - if let Ok(subtree) = entry.to_object(git_repo).and_then(|o| o.peel_to_tree()) { + if let Ok(subtree) = + entry.to_object(git_repo).and_then(|o| o.peel_to_tree()) + { stack.push((subtree, entry_path)); } } @@ -220,7 +223,11 @@ pub async fn scan_and_sync_skills( } }; - let commit_sha = git_repo.head().ok().and_then(|h| h.target()).map(|oid| oid.to_string()); + let commit_sha = git_repo + .head() + .ok() + .and_then(|h| h.target()) + .map(|oid| oid.to_string()); // For bare repos (no workdir), scan git tree objects directly let mut discovered = if git_repo.is_bare() || git_repo.workdir().is_none() { @@ -265,13 +272,20 @@ async fn sync_discovered_skills( let mut updated = 0i64; // Deduplicate by {repo_id}+{blob_hash}, keep latest by commit_sha - let mut deduped: std::collections::HashMap = std::collections::HashMap::new(); + let mut deduped: std::collections::HashMap = + std::collections::HashMap::new(); for skill in discovered { - let key = format!("{}:{}", repo_id, skill.blob_hash.as_ref().unwrap_or(&skill.slug)); + let key = format!( + "{}:{}", + repo_id, + skill.blob_hash.as_ref().unwrap_or(&skill.slug) + ); match deduped.get(&key) { Some(existing) => { // Keep the one with the later commit_sha - if skill.commit_sha.as_ref().unwrap_or(&String::new()) > existing.commit_sha.as_ref().unwrap_or(&String::new()) { + if skill.commit_sha.as_ref().unwrap_or(&String::new()) + > existing.commit_sha.as_ref().unwrap_or(&String::new()) + { deduped.insert(key, skill); } } @@ -292,7 +306,11 @@ async fn sync_discovered_skills( let existing_by_hash: std::collections::HashMap<_, _> = existing .into_iter() .map(|s| { - let key = format!("{}:{}", s.repo_id.unwrap_or_default(), s.blob_hash.clone().unwrap_or_default()); + let key = format!( + "{}:{}", + s.repo_id.unwrap_or_default(), + s.blob_hash.clone().unwrap_or_default() + ); (key, s) }) .collect(); diff --git a/libs/service/storage.rs b/libs/service/storage.rs index e33ced7..f5c0cf8 100644 --- a/libs/service/storage.rs +++ b/libs/service/storage.rs @@ -39,11 +39,7 @@ impl AppStorage { } /// Write data to a local path and return the public URL. - pub async fn upload( - &self, - key: &str, - data: Vec, - ) -> anyhow::Result { + pub async fn upload(&self, key: &str, data: Vec) -> anyhow::Result { let safe_path = Self::sanitize_key(key)?; let path = self.base_path.join(safe_path); @@ -54,11 +50,7 @@ impl AppStorage { tokio::fs::write(&path, &data).await?; - let url = format!( - "{}/{}", - self.public_url_base.trim_end_matches('/'), - key - ); + let url = format!("{}/{}", self.public_url_base.trim_end_matches('/'), key); Ok(url) } diff --git a/libs/service/user/access_key.rs b/libs/service/user/access_key.rs index d035837..c7de5f2 100644 --- a/libs/service/user/access_key.rs +++ b/libs/service/user/access_key.rs @@ -222,8 +222,8 @@ impl AppService { } fn user_hash_access_key(&self, access_key: &str) -> String { - use argon2::password_hash::{SaltString, PasswordHasher}; use argon2::Argon2; + use argon2::password_hash::{PasswordHasher, SaltString}; let salt = SaltString::generate(&mut rsa::rand_core::OsRng::default()); Argon2::default() .hash_password(access_key.as_bytes(), &salt) diff --git a/libs/service/user/billing.rs b/libs/service/user/billing.rs index 7580d67..a374d23 100644 --- a/libs/service/user/billing.rs +++ b/libs/service/user/billing.rs @@ -75,10 +75,28 @@ impl AppService { ) -> Result { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; - let billing = user_billing::Entity::find_by_id(user_uid) + let billing = match user_billing::Entity::find_by_id(user_uid) .one(&self.db) .await? - .ok_or_else(|| AppError::InternalServerError("User billing not found".into()))?; + { + Some(b) => b, + None => { + let now = Utc::now(); + let active = user_billing::ActiveModel { + user: Set(user_uid), + balance: Set(rust_decimal::Decimal::from(10)), + currency: Set("USD".to_string()), + is_pro: Set(false), + monthly_quota: Set(rust_decimal::Decimal::from(0)), + month_used: Set(rust_decimal::Decimal::from(0)), + cycle_start: Set(None), + cycle_end: Set(None), + updated_at: Set(now), + created_at: Set(now), + }; + active.insert(&self.db).await? + } + }; Ok(UserBillingResponse { user_uid: billing.user, @@ -109,16 +127,19 @@ impl AppService { .await?; Ok(UserBillingErrorsResponse { - list: errors.into_iter().map(|e| UserBillingErrorItem { - id: e.id, - scope: e.scope, - scope_id: e.scope_id, - error_type: e.error_type, - message: e.message, - details: e.details, - resolved: e.resolved, - created_at: e.created_at, - }).collect(), + list: errors + .into_iter() + .map(|e| UserBillingErrorItem { + id: e.id, + scope: e.scope, + scope_id: e.scope_id, + error_type: e.error_type, + message: e.message, + details: e.details, + resolved: e.resolved, + created_at: e.created_at, + }) + .collect(), }) } @@ -138,17 +159,25 @@ impl AppService { let total = paginator.num_items().await?; let rows = paginator.fetch_page(page - 1).await?; - let list = rows.into_iter().map(|x| UserBillingHistoryItem { - uid: x.uid, - project_uid: x.project, - user_uid: x.user, - amount: x.amount.to_f64().unwrap_or_default(), - currency: x.currency, - reason: x.reason, - extra: x.extra.map(|v| v.into()), - created_at: x.created_at, - }).collect(); + let list = rows + .into_iter() + .map(|x| UserBillingHistoryItem { + uid: x.uid, + project_uid: x.project, + user_uid: x.user, + amount: x.amount.to_f64().unwrap_or_default(), + currency: x.currency, + reason: x.reason, + extra: x.extra.map(|v| v.into()), + created_at: x.created_at, + }) + .collect(); - Ok(UserBillingHistoryResponse { page, per_page, total, list }) + Ok(UserBillingHistoryResponse { + page, + per_page, + total, + list, + }) } -} \ No newline at end of file +} diff --git a/libs/service/user/chpc.rs b/libs/service/user/chpc.rs index dce8562..5a0be17 100644 --- a/libs/service/user/chpc.rs +++ b/libs/service/user/chpc.rs @@ -78,8 +78,12 @@ impl AppService { return Ok(response); } - let start_dt = start_date.and_hms_opt(0, 0, 0).unwrap_or(chrono::NaiveDateTime::MIN); - let end_dt = end_date.and_hms_opt(23, 59, 59).unwrap_or(chrono::NaiveDateTime::MAX); + let start_dt = start_date + .and_hms_opt(0, 0, 0) + .unwrap_or(chrono::NaiveDateTime::MIN); + let end_dt = end_date + .and_hms_opt(23, 59, 59) + .unwrap_or(chrono::NaiveDateTime::MAX); let commits: Vec = repo_commit::Entity::find() .filter(repo_commit::Column::AuthorEmail.is_in(emails.clone())) diff --git a/libs/service/user/notification.rs b/libs/service/user/notification.rs index 5800fea..26f1c38 100644 --- a/libs/service/user/notification.rs +++ b/libs/service/user/notification.rs @@ -135,8 +135,10 @@ impl AppService { active_prefs.push_subscription_keys_auth = Set(None); } else { active_prefs.push_subscription_endpoint = Set(Some(endpoint)); - active_prefs.push_subscription_keys_p256dh = Set(params.push_subscription_keys_p256dh.clone()); - active_prefs.push_subscription_keys_auth = Set(params.push_subscription_keys_auth.clone()); + active_prefs.push_subscription_keys_p256dh = + Set(params.push_subscription_keys_p256dh.clone()); + active_prefs.push_subscription_keys_auth = + Set(params.push_subscription_keys_auth.clone()); } } active_prefs.updated_at = Set(Utc::now()); @@ -222,10 +224,7 @@ impl AppService { Ok(NotificationPreferencesResponse::from(created_prefs)) } - pub async fn user_unsubscribe_push( - &self, - context: &Session, - ) -> Result<(), AppError> { + pub async fn user_unsubscribe_push(&self, context: &Session) -> Result<(), AppError> { let user_uid = context.user().ok_or(AppError::Unauthorized)?; let prefs = user_notification::Entity::find_by_id(user_uid) diff --git a/libs/service/user/projects.rs b/libs/service/user/projects.rs index f749fda..78a169d 100644 --- a/libs/service/user/projects.rs +++ b/libs/service/user/projects.rs @@ -81,13 +81,20 @@ impl AppService { // Union + dedup (preserving first occurrence order) let mut project_ids: Vec = created_projects; let project_id_set: std::collections::HashSet<&Uuid> = project_ids.iter().collect(); - let new_ids: Vec = member_projects.into_iter().filter(|id| !project_id_set.contains(id)).collect(); + let new_ids: Vec = member_projects + .into_iter() + .filter(|id| !project_id_set.contains(id)) + .collect(); project_ids.extend(new_ids); let total_count = project_ids.len() as u64; // Paginate - let page_ids: Vec = project_ids.into_iter().skip(offset as usize).take(per_page as usize).collect(); + let page_ids: Vec = project_ids + .into_iter() + .skip(offset as usize) + .take(per_page as usize) + .collect(); if page_ids.is_empty() { return Ok(UserProjectsResponse { @@ -105,8 +112,14 @@ impl AppService { // Preserve the order from project_ids (created projects first, then member projects) let mut sorted_projects = project_list; sorted_projects.sort_by(|a, b| { - let a_idx = page_ids.iter().position(|&x| x == a.id).unwrap_or(usize::MAX); - let b_idx = page_ids.iter().position(|&x| x == b.id).unwrap_or(usize::MAX); + let a_idx = page_ids + .iter() + .position(|&x| x == a.id) + .unwrap_or(usize::MAX); + let b_idx = page_ids + .iter() + .position(|&x| x == b.id) + .unwrap_or(usize::MAX); a_idx.cmp(&b_idx) }); diff --git a/libs/service/user/repository.rs b/libs/service/user/repository.rs index a3161f5..7f56a7b 100644 --- a/libs/service/user/repository.rs +++ b/libs/service/user/repository.rs @@ -1,16 +1,16 @@ use crate::AppService; use crate::error::AppError; use chrono::Utc; -use models::repos::repo; use models::projects::project; +use models::repos::repo; use models::users::user; use sea_orm::prelude::*; use sea_orm::*; use serde::{Deserialize, Serialize}; use session::Session; +use std::collections::HashMap; use utoipa::{IntoParams, ToSchema}; use uuid::Uuid; -use std::collections::HashMap; #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct UserRepoInfo { diff --git a/libs/service/user/stars.rs b/libs/service/user/stars.rs index 150c16b..96ed062 100644 --- a/libs/service/user/stars.rs +++ b/libs/service/user/stars.rs @@ -115,13 +115,14 @@ impl AppService { let followed_project_ids: Vec = follows.iter().map(|f| f.project).collect(); - let followed_projects: std::collections::HashMap = project::Entity::find() - .filter(project::Column::Id.is_in(followed_project_ids)) - .all(&self.db) - .await? - .into_iter() - .map(|p| (p.id, p)) - .collect(); + let followed_projects: std::collections::HashMap = + project::Entity::find() + .filter(project::Column::Id.is_in(followed_project_ids)) + .all(&self.db) + .await? + .into_iter() + .map(|p| (p.id, p)) + .collect(); let mut project_items: Vec = Vec::new(); for follow in &follows { diff --git a/libs/service/user/subscribe.rs b/libs/service/user/subscribe.rs index fd1d931..76b276f 100644 --- a/libs/service/user/subscribe.rs +++ b/libs/service/user/subscribe.rs @@ -182,13 +182,14 @@ impl AppService { let followed_uids: Vec = following.iter().map(|f| f.target).collect(); - let followed_users: std::collections::HashMap = models::users::user::Entity::find() - .filter(models::users::user::Column::Uid.is_in(followed_uids.clone())) - .all(&self.db) - .await? - .into_iter() - .map(|u| (u.uid, u)) - .collect(); + let followed_users: std::collections::HashMap = + models::users::user::Entity::find() + .filter(models::users::user::Column::Uid.is_in(followed_uids.clone())) + .all(&self.db) + .await? + .into_iter() + .map(|u| (u.uid, u)) + .collect(); // If current user is logged in, check who they also follow let current_follows: std::collections::HashSet = if let Some(uid) = current_uid { diff --git a/libs/service/user/summary.rs b/libs/service/user/summary.rs index d76d8e2..1b81b7a 100644 --- a/libs/service/user/summary.rs +++ b/libs/service/user/summary.rs @@ -1,13 +1,13 @@ +use super::chpc::{ContributionHeatmapQuery, ContributionHeatmapResponse}; +use super::projects::{UserProjectInfo, UserProjectsQuery}; +use super::repository::{UserRepoInfo, UserReposQuery}; +use super::user_activity::{UserActivityItem, UserActivityQuery}; +use super::user_info::UserInfoExternal; use crate::AppService; use crate::error::AppError; use serde::{Deserialize, Serialize}; use session::Session; use utoipa::ToSchema; -use super::user_info::UserInfoExternal; -use super::repository::{UserRepoInfo, UserReposQuery}; -use super::projects::{UserProjectInfo, UserProjectsQuery}; -use super::user_activity::{UserActivityItem, UserActivityQuery}; -use super::chpc::{ContributionHeatmapResponse, ContributionHeatmapQuery}; #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct UserSummaryResponse { @@ -28,19 +28,61 @@ impl AppService { username: String, ) -> Result { let info = self.user_info(context.clone(), username.clone()).await?; - - let repos_resp = self.get_user_repos(context.clone(), username.clone(), UserReposQuery { page: Some(1), per_page: Some(4) }).await?; - - let projects_resp = self.get_user_projects(context.clone(), username.clone(), UserProjectsQuery { page: Some(1), per_page: Some(4) }).await?; - - let activity_resp = self.get_user_activity(context.clone(), username.clone(), UserActivityQuery { page: Some(1), per_page: Some(8) }).await?; - - let heatmap = self.get_user_contribution_heatmap(context.clone(), username.clone(), ContributionHeatmapQuery { start_date: None, end_date: None }).await?; - - let follower_count = self.user_get_subscriber_count(context.clone(), username.clone()).await?; - let following_count = self.user_get_subscription_count(context.clone(), username.clone()).await?; - - let stars_resp = self.get_user_stars(context.clone(), username.clone()).await?; + + let repos_resp = self + .get_user_repos( + context.clone(), + username.clone(), + UserReposQuery { + page: Some(1), + per_page: Some(4), + }, + ) + .await?; + + let projects_resp = self + .get_user_projects( + context.clone(), + username.clone(), + UserProjectsQuery { + page: Some(1), + per_page: Some(4), + }, + ) + .await?; + + let activity_resp = self + .get_user_activity( + context.clone(), + username.clone(), + UserActivityQuery { + page: Some(1), + per_page: Some(8), + }, + ) + .await?; + + let heatmap = self + .get_user_contribution_heatmap( + context.clone(), + username.clone(), + ContributionHeatmapQuery { + start_date: None, + end_date: None, + }, + ) + .await?; + + let follower_count = self + .user_get_subscriber_count(context.clone(), username.clone()) + .await?; + let following_count = self + .user_get_subscription_count(context.clone(), username.clone()) + .await?; + + let stars_resp = self + .get_user_stars(context.clone(), username.clone()) + .await?; let stars_count = stars_resp.total; Ok(UserSummaryResponse { diff --git a/libs/service/user/user_activity.rs b/libs/service/user/user_activity.rs index 1165d1a..0dc851a 100644 --- a/libs/service/user/user_activity.rs +++ b/libs/service/user/user_activity.rs @@ -128,9 +128,7 @@ impl AppService { action: activity.event_type, title: activity.title, resource_type: Some("project".to_string()), - resource_name: proj_map - .get(&activity.project) - .map(|p| p.name.clone()), + resource_name: proj_map.get(&activity.project).map(|p| p.name.clone()), metadata: activity.metadata, created_at: activity.created_at, }); @@ -140,7 +138,11 @@ impl AppService { items.sort_by(|a, b| b.created_at.cmp(&a.created_at)); let total = items.len() as u64; - let page_items: Vec = items.into_iter().skip(offset as usize).take(per_page as usize).collect(); + let page_items: Vec = items + .into_iter() + .skip(offset as usize) + .take(per_page as usize) + .collect(); Ok(UserActivityResponse { items: page_items, diff --git a/libs/service/ws_token.rs b/libs/service/ws_token.rs index 5bca95e..4756dba 100644 --- a/libs/service/ws_token.rs +++ b/libs/service/ws_token.rs @@ -34,7 +34,12 @@ impl WsTokenService { } /// Generate a new WebSocket token for the given user - pub async fn generate_token(&self, user_id: Uuid, device_id: Option, client_id: Option) -> Result { + pub async fn generate_token( + &self, + user_id: Uuid, + device_id: Option, + client_id: Option, + ) -> Result { let token = Self::random_token(); let now = Utc::now(); let token_data = WsTokenData { @@ -92,7 +97,7 @@ impl WsTokenService { Ok(ws_token_data) } - pub async fn validate_token(&self, token: &str) -> Result { + pub async fn validate_token(&self, token: &str) -> Result { let data = self.validate_token_ctx(token).await?; Ok(data.user_id) } diff --git a/libs/session/lib.rs b/libs/session/lib.rs index ad8555d..fe8e433 100644 --- a/libs/session/lib.rs +++ b/libs/session/lib.rs @@ -8,8 +8,6 @@ pub mod storage; pub use self::{ middleware::SessionMiddleware, - session::{ - Session, SessionGetError, SessionInsertError, SessionStatus, SessionUser, - }, + session::{Session, SessionGetError, SessionInsertError, SessionStatus, SessionUser}, session_ext::SessionExt, };