diff --git a/libs/api/agent/issue_triage.rs b/libs/api/agent/issue_triage.rs index 775d94c..2cf71f2 100644 --- a/libs/api/agent/issue_triage.rs +++ b/libs/api/agent/issue_triage.rs @@ -33,7 +33,9 @@ pub async fn triage_issue( let user_id = session.user().ok_or(crate::error::ApiError( service::error::AppError::Unauthorized, ))?; - let project = service.utils_find_project_by_name(project_name.clone()).await?; + let project = service + .utils_find_project_by_name(project_name.clone()) + .await?; // Verify user has access to the project before triggering AI triage service .check_project_access(project.id, user_id) diff --git a/libs/api/auth/mod.rs b/libs/api/auth/mod.rs index a453140..a4fc8d0 100644 --- a/libs/api/auth/mod.rs +++ b/libs/api/auth/mod.rs @@ -37,10 +37,22 @@ pub fn init_auth_routes(cfg: &mut actix_web::web::ServiceConfig) { "/password/confirm", actix_web::web::post().to(password::api_user_confirm_password_reset), ) - .route("/2fa/enable", actix_web::web::post().to(totp::api_2fa_enable)) - .route("/2fa/verify", actix_web::web::post().to(totp::api_2fa_verify)) - .route("/2fa/disable", actix_web::web::post().to(totp::api_2fa_disable)) - .route("/2fa/status", actix_web::web::post().to(totp::api_2fa_status)) + .route( + "/2fa/enable", + actix_web::web::post().to(totp::api_2fa_enable), + ) + .route( + "/2fa/verify", + actix_web::web::post().to(totp::api_2fa_verify), + ) + .route( + "/2fa/disable", + actix_web::web::post().to(totp::api_2fa_disable), + ) + .route( + "/2fa/status", + actix_web::web::post().to(totp::api_2fa_status), + ) .route("/email", actix_web::web::post().to(email::api_email_get)) .route( "/email/change", diff --git a/libs/api/auth/password.rs b/libs/api/auth/password.rs index 9c416a5..c179097 100644 --- a/libs/api/auth/password.rs +++ b/libs/api/auth/password.rs @@ -2,7 +2,9 @@ use crate::ApiResponse; use crate::error::ApiError; use actix_web::{HttpResponse, Result, web}; use service::AppService; -use service::auth::password::{ChangePasswordParams, ConfirmResetPasswordParams, ResetPasswordParams}; +use service::auth::password::{ + ChangePasswordParams, ConfirmResetPasswordParams, ResetPasswordParams, +}; use session::Session; #[utoipa::path( diff --git a/libs/api/auth/totp.rs b/libs/api/auth/totp.rs index 0562fe9..ed66d2b 100644 --- a/libs/api/auth/totp.rs +++ b/libs/api/auth/totp.rs @@ -91,4 +91,4 @@ pub async fn api_2fa_status( ) -> Result { let resp = service.auth_2fa_status(&session).await?; Ok(ApiResponse::ok(resp).to_response()) -} \ No newline at end of file +} diff --git a/libs/api/auth/ws_token.rs b/libs/api/auth/ws_token.rs index b1a0524..c98b91e 100644 --- a/libs/api/auth/ws_token.rs +++ b/libs/api/auth/ws_token.rs @@ -30,11 +30,13 @@ pub async fn ws_token_generate( service: web::Data, session: Session, ) -> Result { - let user_id = session.user().ok_or_else(|| ApiError::from(AppError::Unauthorized))?; + let user_id = session + .user() + .ok_or_else(|| ApiError::from(AppError::Unauthorized))?; let device_id = session.get::("device_id").unwrap_or_default(); let client_id = session.get::("client_id").unwrap_or_default(); - + let token = service .ws_token .generate_token(user_id, device_id, client_id) diff --git a/libs/api/build.rs b/libs/api/build.rs index 94e5b2d..e2331c5 100644 --- a/libs/api/build.rs +++ b/libs/api/build.rs @@ -7,9 +7,9 @@ use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; -use sha2::{Digest, Sha256}; -use flate2::write::GzEncoder; use flate2::Compression; +use flate2::write::GzEncoder; +use sha2::{Digest, Sha256}; // ── Compression helpers ────────────────────────────────────────────────── @@ -37,7 +37,10 @@ fn compute_etag(data: &[u8]) -> String { hasher.update(data); let hash = hasher.finalize(); // First 32 hex chars for a compact etag - hash.iter().map(|b| format!("{:02x}", b)).take(16).collect::() + hash.iter() + .map(|b| format!("{:02x}", b)) + .take(16) + .collect::() } // ── Asset collection ───────────────────────────────────────────────────── @@ -59,9 +62,8 @@ fn collect_assets(dist_dir: &Path) -> BTreeMap { continue; } - let data = fs::read(&entry).unwrap_or_else(|e| { - panic!("Failed to read dist file {}: {}", path_str, e) - }); + let data = fs::read(&entry) + .unwrap_or_else(|e| panic!("Failed to read dist file {}: {}", path_str, e)); let etag = compute_etag(&data); let brotli_data = brotli_compress(&data); @@ -106,7 +108,11 @@ fn rust_byte_literal(data: &[u8]) -> String { let lines: Vec = data .chunks(80) .map(|chunk| { - chunk.iter().map(|b| b.to_string()).collect::>().join(", ") + chunk + .iter() + .map(|b| b.to_string()) + .collect::>() + .join(", ") }) .collect(); format!("[\n{}\n]", lines.join(",\n")) @@ -114,7 +120,11 @@ fn rust_byte_literal(data: &[u8]) -> String { } fn path_to_ident(path: &str) -> String { - let s = path.replace('-', "_").replace('.', "_").replace('/', "_").to_uppercase(); + let s = path + .replace('-', "_") + .replace('.', "_") + .replace('/', "_") + .to_uppercase(); format!("ASSET_{s}") } @@ -143,12 +153,20 @@ fn generate_frontend_module(assets: &BTreeMap, out_dir: &Path) { let br_id = br_ident(path); let gz_id = gz_ident(path); - code += &format!("static {}: &[u8] = &{};\n", ident, rust_byte_literal(&asset.data)); + code += &format!( + "static {}: &[u8] = &{};\n", + ident, + rust_byte_literal(&asset.data) + ); code += &format!("static {}: &str = \"{}\";\n", etag_id, asset.etag); if let Some(ref br) = asset.brotli { code += &format!("static {}: &[u8] = &{};\n", br_id, rust_byte_literal(br)); } - code += &format!("static {}: &[u8] = &{};\n", gz_id, rust_byte_literal(&asset.gzip)); + code += &format!( + "static {}: &[u8] = &{};\n", + gz_id, + rust_byte_literal(&asset.gzip) + ); code += "\n"; } @@ -185,18 +203,13 @@ fn generate_frontend_module(assets: &BTreeMap, out_dir: &Path) { code += "}\n"; let out_path = out_dir.join("frontend.rs"); - fs::write(&out_path, code).unwrap_or_else(|e| { - panic!("Failed to write generated frontend.rs: {}", e) - }); + fs::write(&out_path, code) + .unwrap_or_else(|e| panic!("Failed to write generated frontend.rs: {}", e)); } fn main() { let manifest_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); - let workspace_root = Path::new(&manifest_dir) - .parent() - .unwrap() - .parent() - .unwrap(); + let workspace_root = Path::new(&manifest_dir).parent().unwrap().parent().unwrap(); let dist_dir = workspace_root.join("dist"); if !dist_dir.exists() { @@ -215,8 +228,11 @@ fn main() { println!("cargo:rerun-if-changed=dist/"); let assets = collect_assets(&dist_dir); - println!("cargo:warning=Collected {} frontend assets from dist/", assets.len()); + println!( + "cargo:warning=Collected {} frontend assets from dist/", + assets.len() + ); let out_dir = env::var("OUT_DIR").unwrap(); generate_frontend_module(&assets, Path::new(&out_dir)); -} \ No newline at end of file +} diff --git a/libs/api/chat/handlers/conversation.rs b/libs/api/chat/handlers/conversation.rs index 5266150..c5990d4 100644 --- a/libs/api/chat/handlers/conversation.rs +++ b/libs/api/chat/handlers/conversation.rs @@ -1,15 +1,17 @@ -use actix_web::{web, HttpResponse, Result}; -use session::Session; +use actix_web::{HttpResponse, Result, web}; use service::error::AppError; +use session::Session; use uuid::Uuid; -use crate::error::ApiError; use crate::ApiResponse; +use crate::error::ApiError; use super::types::{ConversationListQuery, ConversationResponse, CreateConversationParams}; fn get_user_id(session: &Session) -> Result { - session.user().ok_or_else(|| ApiError::from(AppError::Unauthorized)) + session + .user() + .ok_or_else(|| ApiError::from(AppError::Unauthorized)) } #[utoipa::path( @@ -55,6 +57,7 @@ pub async fn conversation_create( operation_id = "ai_conversation_list", params( ("project_id" = Option, Query, description = "Filter by project"), + ("q" = Option, Query, description = "Search query (title)"), ), responses( (status = 200, description = "List of conversations", body = ApiResponse>), @@ -70,13 +73,11 @@ pub async fn conversation_list( let user_id = get_user_id(&session)?; let convs = service - .list_conversations(user_id, query.project_id, 50) + .list_conversations(user_id, query.project_id, 50, query.q.clone()) .await?; - let resp: Vec = convs - .into_iter() - .map(ConversationResponse::from) - .collect(); + let resp: Vec = + convs.into_iter().map(ConversationResponse::from).collect(); Ok(ApiResponse::ok(resp).to_response()) } diff --git a/libs/api/chat/handlers/fork.rs b/libs/api/chat/handlers/fork.rs index f9118fe..d05a863 100644 --- a/libs/api/chat/handlers/fork.rs +++ b/libs/api/chat/handlers/fork.rs @@ -1,78 +1,64 @@ -use actix_web::{web, HttpResponse, Result}; -use session::Session; +use actix_web::{HttpResponse, Result, web}; use service::error::AppError; +use session::Session; use uuid::Uuid; -use crate::error::ApiError; use crate::ApiResponse; +use crate::error::ApiError; #[derive(Debug, serde::Serialize, utoipa::ToSchema)] -pub struct ForkResponse { +pub struct ForkConversationResponse { pub id: Uuid, - pub conversation_id: Option, - pub source_message_id: Uuid, - pub fork_message_id: Uuid, - #[schema(value_type = chrono::DateTime)] + pub title: Option, + pub model: String, pub created_at: chrono::DateTime, } +/// Fork a conversation from a specific message, creating a new conversation +/// with all messages up to and including the source message. #[utoipa::path( post, - path = "/api/ai/conversations/{conversation_id}/messages/{message_id}/fork/{target_message_id}", - operation_id = "ai_message_fork", + path = "/api/ai/conversations/{conversation_id}/messages/{message_id}/fork", + operation_id = "ai_conversation_fork", params( ("conversation_id" = Uuid, Path, description = "Conversation ID"), - ("message_id" = Uuid, Path, description = "Source message ID"), - ("target_message_id" = Uuid, Path, description = "Target/fork message ID to create"), + ("message_id" = Uuid, Path, description = "Source message ID to fork from"), ), responses( - (status = 200, description = "Fork created", body = ApiResponse), + (status = 200, description = "Conversation forked", body = ApiResponse), ), tag = "AI Chat" )] pub async fn message_fork( service: web::Data, session: Session, - path: web::Path<(Uuid, Uuid, Uuid)>, + path: web::Path<(Uuid, Uuid)>, ) -> Result { let user_id = session .user() .ok_or_else(|| ApiError::from(AppError::Unauthorized))?; - let (conversation_id, source_message_id, target_message_id) = path.into_inner(); + let (conversation_id, source_message_id) = path.into_inner(); - let fork_record = service - .fork_message( - conversation_id, - user_id, - source_message_id, - target_message_id, - ) + let new_conv = service + .fork_conversation_from_message(user_id, conversation_id, source_message_id) .await?; - let resp = ForkResponse { - id: fork_record.id, - conversation_id: fork_record.conversation_id, - source_message_id: fork_record.source_message_id, - fork_message_id: fork_record.fork_message_id, - created_at: fork_record.created_at, + let resp = ForkConversationResponse { + id: new_conv.id, + title: new_conv.title, + model: new_conv.model, + created_at: new_conv.created_at, }; Ok(ApiResponse::ok(resp).to_response()) } -#[utoipa::path( - get, - path = "/api/ai/conversations/{conversation_id}/messages/{message_id}/forks", - operation_id = "ai_message_forks", - params( - ("conversation_id" = Uuid, Path, description = "Conversation ID"), - ("message_id" = Uuid, Path, description = "Source message ID"), - ), - responses( - (status = 200, description = "List forks from message", body = ApiResponse>), - ), - tag = "AI Chat" -)] +#[derive(Debug, serde::Serialize, utoipa::ToSchema)] +pub struct ForkListResponse { + pub forks: Vec, +} + +/// List all forks created from a specific message. pub async fn message_forks( service: web::Data, session: Session, @@ -87,16 +73,18 @@ pub async fn message_forks( .list_forks(conversation_id, user_id, source_message_id) .await?; - let resp: Vec = forks + let fork_responses: Vec = forks .into_iter() - .map(|f| ForkResponse { - id: f.id, - conversation_id: f.conversation_id, - source_message_id: f.source_message_id, - fork_message_id: f.fork_message_id, + .map(|f| ForkConversationResponse { + id: f.fork_message_id, + title: None, + model: String::new(), created_at: f.created_at, }) .collect(); - Ok(ApiResponse::ok(resp).to_response()) + Ok(ApiResponse::ok(ForkListResponse { + forks: fork_responses, + }) + .to_response()) } diff --git a/libs/api/chat/handlers/message.rs b/libs/api/chat/handlers/message.rs index 009283f..3f6f307 100644 --- a/libs/api/chat/handlers/message.rs +++ b/libs/api/chat/handlers/message.rs @@ -1,14 +1,18 @@ -use crate::error::ApiError; use crate::ApiResponse; -use actix_web::{web, HttpResponse, Result}; -use session::Session; +use crate::error::ApiError; +use actix_web::{HttpResponse, Result, web}; +use models::ai::AiMessage; +use sea_orm::EntityTrait; use service::error::AppError; +use session::Session; use uuid::Uuid; use super::types::{CreateMessageParams, EditMessageParams, MessageListQuery, MessageResponse}; fn get_user_id(session: &Session) -> Result { - session.user().ok_or_else(|| ApiError::from(AppError::Unauthorized)) + session + .user() + .ok_or_else(|| ApiError::from(AppError::Unauthorized)) } #[utoipa::path( @@ -39,10 +43,7 @@ pub async fn message_list( .list_messages(conversation_id, user_id, limit) .await?; - let resp: Vec = msgs - .into_iter() - .map(MessageResponse::from) - .collect(); + let resp: Vec = msgs.into_iter().map(MessageResponse::from).collect(); Ok(ApiResponse::ok(resp).to_response()) } @@ -75,7 +76,7 @@ pub async fn message_create( conversation_id, user_id, params.parent_message_id, - params.content.role.clone(), + "user".to_string(), params.content.content.clone(), params.model.clone(), params.is_fork_origin.unwrap_or(false), @@ -200,10 +201,7 @@ pub async fn message_children( .list_child_messages(conversation_id, user_id, parent_message_id) .await?; - let resp: Vec = msgs - .into_iter() - .map(MessageResponse::from) - .collect(); + let resp: Vec = msgs.into_iter().map(MessageResponse::from).collect(); Ok(ApiResponse::ok(resp).to_response()) } @@ -235,6 +233,15 @@ pub async fn message_stream( let model = conv.model; + let msg = AiMessage::find_by_id(message_id) + .one(service.db.reader()) + .await + .map_err(AppError::from)? + .ok_or_else(|| ApiError::from(AppError::NotFound("message".into())))?; + if msg.conversation_id != conversation_id || msg.role != "user" || !msg.is_latest { + return Err(ApiError::from(AppError::NotFound("message".into()))); + } + let response = actix_web::HttpResponse::Ok() .content_type("text/event-stream") .insert_header(("Cache-Control", "no-cache")) @@ -306,10 +313,7 @@ pub async fn message_versions( .list_message_versions(conversation_id, user_id, message_id) .await?; - let resp: Vec = versions - .into_iter() - .map(MessageResponse::from) - .collect(); + let resp: Vec = versions.into_iter().map(MessageResponse::from).collect(); Ok(ApiResponse::ok(resp).to_response()) } diff --git a/libs/api/chat/handlers/share.rs b/libs/api/chat/handlers/share.rs index af92d76..19a40df 100644 --- a/libs/api/chat/handlers/share.rs +++ b/libs/api/chat/handlers/share.rs @@ -1,15 +1,17 @@ -use actix_web::{web, HttpResponse, Result}; -use session::Session; +use actix_web::{HttpResponse, Result, web}; use service::error::AppError; +use session::Session; use uuid::Uuid; -use crate::error::ApiError; use crate::ApiResponse; +use crate::error::ApiError; use super::types::{ConversationResponse, ShareResponse}; fn get_user_id(session: &Session) -> Result { - session.user().ok_or_else(|| ApiError::from(AppError::Unauthorized)) + session + .user() + .ok_or_else(|| ApiError::from(AppError::Unauthorized)) } #[utoipa::path( @@ -33,9 +35,7 @@ pub async fn conversation_share( let user_id = get_user_id(&session)?; let conversation_id = path.into_inner(); - let (share, share_token) = service - .share_conversation(conversation_id, user_id) - .await?; + let (share, share_token) = service.share_conversation(conversation_id, user_id).await?; let resp = ShareResponse { id: share.id, diff --git a/libs/api/chat/handlers/types.rs b/libs/api/chat/handlers/types.rs index 4de083d..1c38478 100644 --- a/libs/api/chat/handlers/types.rs +++ b/libs/api/chat/handlers/types.rs @@ -56,6 +56,7 @@ pub struct UpdateConversationParams { #[derive(Debug, Deserialize)] pub struct ConversationListQuery { pub project_id: Option, + pub q: Option, } #[derive(Debug, Deserialize, utoipa::ToSchema)] diff --git a/libs/api/chat/mod.rs b/libs/api/chat/mod.rs index 142b172..8769945 100644 --- a/libs/api/chat/mod.rs +++ b/libs/api/chat/mod.rs @@ -7,7 +7,10 @@ pub mod watch; pub fn init_chat_routes(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/ai/conversations") - .route("", web::post().to(handlers::conversation::conversation_create)) + .route( + "", + web::post().to(handlers::conversation::conversation_create), + ) .route("", web::get().to(handlers::conversation::conversation_list)) .route( "/{conversation_id}", @@ -54,7 +57,7 @@ pub fn init_chat_routes(cfg: &mut web::ServiceConfig) { web::post().to(handlers::message::message_resend), ) .route( - "/{conversation_id}/messages/{message_id}/fork/{target_message_id}", + "/{conversation_id}/messages/{message_id}/fork", web::post().to(handlers::fork::message_fork), ) .route( diff --git a/libs/api/chat/stream.rs b/libs/api/chat/stream.rs index 4b47825..9e41ad3 100644 --- a/libs/api/chat/stream.rs +++ b/libs/api/chat/stream.rs @@ -1,17 +1,20 @@ use agent::chat::chat_execution; -use agent::chat::{normalize_thinking_content, AiChunkType, AiStreamChunk}; +use agent::chat::{AiChunkType, AiStreamChunk, normalize_thinking_content}; use agent::client::AiClientConfig; -use agent::client::types::ChatRequestMessage; use agent::client::StreamChunkType; +use agent::client::types::ChatRequestMessage; +use agent::react::PERSONAL_CONTEXT_PROMPT; use futures::StreamExt; -use models::ai::{ai_message, ai_conversation, AiMessage}; use models::agents::{model, model_version}; +use models::ai::{AiMessage, ai_conversation, ai_message}; use queue::{ChatMessageEvent, ChatStreamChunkEvent}; -use sea_orm::{EntityTrait, QueryFilter, ColumnTrait, QueryOrder, ActiveModelTrait, Set, PaginatorTrait}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, Set, +}; use service::AppService; use std::pin::Pin; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use tokio_stream::wrappers::ReceiverStream; use uuid::Uuid; @@ -40,6 +43,10 @@ pub fn create_chat_sse_stream( msg_id, started_at )).await; + let _ = tx + .send("data: {\"event\":\"done\",\"data\":\"recovery\"}\n\n".to_string()) + .await; + return; } let queue = service.queue_producer.clone(); @@ -49,7 +56,8 @@ pub fn create_chat_sse_stream( let messages = match build_messages_from_history(&service, conversation_id).await { Ok(msgs) => msgs, Err(e) => { - let _ = tx.send(format!("data: {{\"event\":\"error\",\"data\":\"{}\"}}\n\n", e)).await; + let payload = serde_json::json!({"event":"error","data": e.to_string()}); + let _ = tx.send(format!("data: {}\n\n", payload)).await; return; } }; @@ -58,14 +66,24 @@ pub fn create_chat_sse_stream( let api_key = match service.config.ai_api_key() { Ok(k) => k, Err(_) => { - let _ = tx.send("data: {\"event\":\"error\",\"data\":\"AI not configured\"}\n\n".to_string()).await; + let _ = tx + .send( + "data: {\"event\":\"error\",\"data\":\"AI not configured\"}\n\n" + .to_string(), + ) + .await; return; } }; let base_url = match service.config.ai_basic_url() { Ok(u) => u, Err(_) => { - let _ = tx.send("data: {\"event\":\"error\",\"data\":\"AI not configured\"}\n\n".to_string()).await; + let _ = tx + .send( + "data: {\"event\":\"error\",\"data\":\"AI not configured\"}\n\n" + .to_string(), + ) + .await; return; } }; @@ -82,55 +100,110 @@ pub fn create_chat_sse_stream( None => (Vec::new(), None, None), }; - // Get project_id from conversation - let project_id = match service.find_conversation(conversation_id).await { - Ok(c) => c.project_id.unwrap_or(Uuid::nil()), - Err(_) => { - let _ = tx.send("data: {\"event\":\"error\",\"data\":\"conversation not found\"}\n\n".to_string()).await; - return; - } + // Get project_id and scope from conversation + let (project_id, conv_project_id, is_personal) = + match service.find_conversation(conversation_id).await { + Ok(c) => { + let conv_project_id = c.project_id; + ( + conv_project_id.unwrap_or(Uuid::nil()), + conv_project_id, + conv_project_id.is_none(), + ) + } + Err(_) => { + let _ = tx + .send( + "data: {\"event\":\"error\",\"data\":\"conversation not found\"}\n\n" + .to_string(), + ) + .await; + return; + } + }; + + // In personal scope: filter out project/git/repo tools and inject personal context prompt + let tools = if is_personal { + tools + .into_iter() + .filter(|t| { + let name = t + .get("function") + .and_then(|f| f.get("name")) + .and_then(|n| n.as_str()) + .unwrap_or(""); + !name.starts_with("project_") + && !name.starts_with("git_") + && !name.starts_with("repo_") + && name != "send_message" + && name != "retract_message" + }) + .collect() + } else { + tools + }; + + // Inject personal context system prompt for non-project chats + let messages = if is_personal { + let mut msgs = messages; + msgs.insert( + 0, + ChatRequestMessage::system(PERSONAL_CONTEXT_PROMPT.to_string()), + ); + msgs + } else { + messages }; // Pre-flight balance check: verify project + user can afford at least a minimal AI call - let balance_ok = agent::billing::check_balance( - &service.db, project_id, user_id, Uuid::nil(), 500, 250, - ).await; + if !is_personal { + let balance_ok = agent::billing::check_balance( + &service.db, + project_id, + user_id, + Uuid::nil(), + 500, + 250, + ) + .await; - match balance_ok { - Ok(true) => {}, - Ok(false) => { - tracing::warn!(project_id = %project_id, user_id = %user_id, "Insufficient balance for chat AI call"); + match balance_ok { + Ok(true) => {} + Ok(false) => { + tracing::warn!(project_id = %project_id, user_id = %user_id, "Insufficient balance for chat AI call"); - let _ = agent::billing::persist_billing_error( - &service.db, "user", user_id, "insufficient_balance", - &format!("Insufficient balance. Your account does not have enough funds for this AI request."), - Some(serde_json::json!({ - "user_id": user_id.to_string(), - "project_id": project_id.to_string(), - })), - ).await; + let _ = agent::billing::persist_billing_error( + &service.db, "user", user_id, "insufficient_balance", + &format!("Insufficient balance. Your account does not have enough funds for this AI request."), + Some(serde_json::json!({ + "user_id": user_id.to_string(), + "project_id": project_id.to_string(), + })), + ).await; - let error_msg = "Insufficient balance. Your account does not have enough funds to process this AI request. Please add credits to continue."; - let _ = tx.send(format!("data: {{\"event\":\"billing_error\",\"data\":\"{}\"}}\n\n", error_msg)).await; - let _ = tx.send("data: {\"event\":\"done\",\"data\":\"billing_error\"}\n\n".to_string()).await; - return; - }, - Err(e) => { - tracing::warn!(error = %e, "Balance check failed, proceeding without pre-flight check"); + let error_msg = "Insufficient balance. Your account does not have enough funds to process this AI request. Please add credits to continue."; + let payload = serde_json::json!({"event":"billing_error","data":error_msg}); + let _ = tx.send(format!("data: {}\n\n", payload)).await; + let _ = tx + .send( + "data: {\"event\":\"done\",\"data\":\"billing_error\"}\n\n".to_string(), + ) + .await; + return; + } + Err(e) => { + tracing::warn!(error = %e, "Balance check failed, proceeding without pre-flight check"); + } } } let max_tool_depth = 99; + let assistant_msg_id = Uuid::now_v7(); // Determine conversation project_id for chat message event - let conv_project_id = match service.find_conversation(conversation_id).await { - Ok(c) => c.project_id, - Err(_) => None, - }; - // Broadcast chat message start event via NATS let chat_msg = ChatMessageEvent { - message_id: user_message_id, + message_id: assistant_msg_id, conversation_id, project_id: conv_project_id, sender_id: Uuid::nil(), @@ -144,7 +217,16 @@ pub fn create_chat_sse_stream( let _ = queue.publish_chat_message(&chat_msg).await; // Mark stream as active in Redis so page refresh can recover - let _ = cache.set_chat_stream_active(conversation_id, user_message_id).await; + let _ = cache + .set_chat_stream_active(conversation_id, user_message_id) + .await; + + // Clear any stale cancel flag before starting + let _ = cache.clear_chat_stream_cancelled(conversation_id).await; + + // Cancellation token — checked in on_chunk and by a periodic poller + let cancelled = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let cancelled_for_on_chunk = cancelled.clone(); let on_chunk_tx = tx.clone(); let on_chunk_queue = queue.clone(); @@ -160,7 +242,13 @@ pub fn create_chat_sse_stream( let conv_id = on_chunk_conv_id; let msg_id = on_chunk_msg_id; let model = on_chunk_model.clone(); + let cancelled = cancelled_for_on_chunk.clone(); Box::pin(async move { + // Check if stream has been cancelled + if cancelled.load(Ordering::Acquire) { + return; + } + let event = match chunk.chunk_type { AiChunkType::Thinking => "thinking", AiChunkType::Answer => "token", @@ -171,10 +259,28 @@ pub fn create_chat_sse_stream( AiChunkType::Thinking => normalize_thinking_content(&chunk.content), _ => chunk.content.clone(), }; + + // Build structured data payload based on chunk type + let data_json = match chunk.chunk_type { + AiChunkType::ToolCall | AiChunkType::ToolResult => { + // Use structured metadata if available + if let Some(meta) = chunk.metadata { + meta + } else { + // Fallback: wrap raw content as display text + serde_json::json!({"display": content}) + } + } + _ => { + // thinking / answer: send plain text content + serde_json::Value::String(content) + } + }; + let sse = format!( "data: {{\"event\":\"{}\",\"data\":{}}}\n\n", event, - serde_json::to_string(&content).unwrap_or_default() + serde_json::to_string(&data_json).unwrap_or_default() ); let _ = tx.send(sse).await; @@ -183,7 +289,7 @@ pub fn create_chat_sse_stream( conversation_id: conv_id, message_id: msg_id, seq, - content, + content: chunk.content, done: false, error: None, chunk_type: Some(event.to_string()), @@ -193,13 +299,42 @@ pub fn create_chat_sse_stream( }) as Pin + Send>> }); + let cancelled_for_check = cancelled.clone(); + let cache_for_check = cache.clone(); + let conv_id_for_check = conversation_id; + let (done_tx, mut done_rx) = tokio::sync::oneshot::channel::<()>(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); + loop { + tokio::select! { + _ = interval.tick() => { + if cache_for_check.is_chat_stream_cancelled(conv_id_for_check).await { + cancelled_for_check.store(true, Ordering::Release); + break; + } + } + _ = &mut done_rx => break, + } + } + }); + + // Resolve max_tokens from model config (unlimited if not set) + let max_tokens = match model::Entity::find() + .filter(model::Column::Name.eq(&model_name)) + .one(service.db.reader()) + .await + { + Ok(Some(m)) => m.max_output_tokens.map(|v| v as u32).unwrap_or(u32::MAX), + _ => u32::MAX, + }; + let result = chat_execution::execute_chat_stream( messages, tools, &model_name, &config, - 0.7, // temperature - 4096, // max_tokens + 0.7, // temperature + max_tokens, // max_tokens from model config max_tool_depth, tool_registry.as_ref(), service.db.clone(), @@ -210,17 +345,34 @@ pub fn create_chat_sse_stream( embed_service, on_chunk, Some(conversation_id), - ).await; + ) + .await; - // Clear stream active state (streaming finished) + // Clear stream active state and cancel flag (streaming finished) let _ = cache.clear_chat_stream_active(conversation_id).await; + let _ = cache.clear_chat_stream_cancelled(conversation_id).await; + let was_cancelled = cancelled.load(Ordering::Acquire); + let _ = done_tx.send(()); match result { Ok(stream_result) => { + if was_cancelled { + let _ = tx + .send("data: {\"event\":\"done\",\"data\":\"stopped\"}\n\n".to_string()) + .await; + return; + } // Build ordered content blocks from stream chunks, merging // consecutive blocks of the same role (thinking/assistant). - let raw_blocks: Vec<(String, String)> = stream_result.chunks.iter() - .filter(|c| matches!(c.chunk_type, StreamChunkType::Thinking | StreamChunkType::Answer)) + let raw_blocks: Vec<(String, String)> = stream_result + .chunks + .iter() + .filter(|c| { + matches!( + c.chunk_type, + StreamChunkType::Thinking | StreamChunkType::Answer + ) + }) .map(|chunk| { let role = match chunk.chunk_type { StreamChunkType::Thinking => "thinking", @@ -234,14 +386,18 @@ pub fn create_chat_sse_stream( // Apply thinking normalization to the fully merged thinking // blocks — per-token normalization is meaningless since each // chunk is a single token. - let normalized_blocks: Vec<(String, String)> = merged_blocks.into_iter().map(|(role, content)| { - if role == "thinking" { - (role, normalize_thinking_content(&content)) - } else { - (role, content) - } - }).collect(); - let content_blocks: Vec = normalized_blocks.iter() + let normalized_blocks: Vec<(String, String)> = merged_blocks + .into_iter() + .map(|(role, content)| { + if role == "thinking" { + (role, normalize_thinking_content(&content)) + } else { + (role, content) + } + }) + .collect(); + let content_blocks: Vec = normalized_blocks + .iter() .map(|(role, content)| serde_json::json!({ "role": role, "content": content })) .collect(); let content_value = if content_blocks.is_empty() { @@ -251,7 +407,6 @@ pub fn create_chat_sse_stream( }; // Persist assistant message - let assistant_msg_id = Uuid::now_v7(); let assistant_msg = ai_message::ActiveModel { id: Set(assistant_msg_id), conversation_id: Set(conversation_id), @@ -279,10 +434,14 @@ pub fn create_chat_sse_stream( // After AI response, check/update conversation title and emit via SSE if let Ok(Some(conv)) = ai_conversation::Entity::find_by_id(conversation_id) - .one(service.db.reader()).await + .one(service.db.reader()) + .await { let existing_title = conv.title.clone(); - let needs_title = existing_title.as_deref().map(|t| t.is_empty() || t == "New Chat").unwrap_or(true); + let needs_title = existing_title + .as_deref() + .map(|t| t.is_empty() || t == "New Chat") + .unwrap_or(true); if needs_title { // Generate title from first user message @@ -290,18 +449,20 @@ pub fn create_chat_sse_stream( .filter(ai_message::Column::ConversationId.eq(conversation_id)) .filter(ai_message::Column::Role.eq("user")) .order_by_asc(ai_message::Column::CreatedAt) - .one(service.db.reader()).await.ok().flatten(); + .one(service.db.reader()) + .await + .ok() + .flatten(); if let Some(user_msg) = first_user_msg { let content = match &user_msg.content { serde_json::Value::String(s) => s.clone(), - serde_json::Value::Array(arr) => { - arr.first() - .and_then(|f| f.get("content")) - .and_then(|c| c.as_str()) - .unwrap_or("") - .to_string() - } + serde_json::Value::Array(arr) => arr + .first() + .and_then(|f| f.get("content")) + .and_then(|c| c.as_str()) + .unwrap_or("") + .to_string(), other => other.to_string(), }; @@ -323,14 +484,25 @@ pub fn create_chat_sse_stream( let _ = active.update(service.db.writer()).await; // Emit title via SSE - let title_payload = serde_json::json!({"title": truncated}).to_string(); - let _ = tx.send(format!("data: {{\"event\":\"title\",\"data\":{}}}\n\n", title_payload)).await; + let title_payload = + serde_json::json!({"title": truncated}).to_string(); + let _ = tx + .send(format!( + "data: {{\"event\":\"title\",\"data\":{}}}\n\n", + title_payload + )) + .await; } } } else if let Some(title) = &existing_title { // Title already set (e.g. by AI tool) — emit it let title_payload = serde_json::json!({"title": title}).to_string(); - let _ = tx.send(format!("data: {{\"event\":\"title\",\"data\":{}}}\n\n", title_payload)).await; + let _ = tx + .send(format!( + "data: {{\"event\":\"title\",\"data\":{}}}\n\n", + title_payload + )) + .await; } } } @@ -359,7 +531,7 @@ pub fn create_chat_sse_stream( None => None, }; - if let Some(version_id) = billing_version_id { + if let (Some(version_id), Some(_)) = (billing_version_id, conv_project_id) { match agent::billing::record_ai_usage( &service.db, project_id, @@ -392,7 +564,7 @@ pub fn create_chat_sse_stream( // Broadcast final chat message with token usage let final_msg = ChatMessageEvent { - message_id: user_message_id, + message_id: assistant_msg_id, conversation_id, project_id: conv_project_id, sender_id: Uuid::nil(), @@ -406,10 +578,13 @@ pub fn create_chat_sse_stream( let _ = queue.publish_chat_message(&final_msg).await; // Send final SSE done event - let _ = tx.send("data: {\"event\":\"done\",\"data\":\"ok\"}\n\n".to_string()).await; + let _ = tx + .send("data: {\"event\":\"done\",\"data\":\"ok\"}\n\n".to_string()) + .await; } Err(e) => { - let _ = tx.send(format!("data: {{\"event\":\"error\",\"data\":\"{}\"}}\n\n", e)).await; + let payload = serde_json::json!({"event":"error","data": e.to_string()}); + let _ = tx.send(format!("data: {}\n\n", payload)).await; } } }); @@ -427,20 +602,23 @@ async fn update_conversation_after_response( use sea_orm::EntityTrait; if let Ok(Some(conv)) = ai_conversation::Entity::find_by_id(conversation_id) - .one(service.db.reader()).await + .one(service.db.reader()) + .await { let input_tokens = assistant_msg.input_tokens.unwrap_or(0) as i64; let output_tokens = assistant_msg.output_tokens.unwrap_or(0) as i64; let total_tokens = input_tokens + output_tokens; + let previous_token_total = conv.token_usage_total.unwrap_or(0); let mut active: ai_conversation::ActiveModel = conv.into(); if let Ok(count) = AiMessage::find() .filter(ai_message::Column::ConversationId.eq(conversation_id)) - .count(service.db.reader()).await + .count(service.db.reader()) + .await { active.message_count = Set(count as i32); } - active.token_usage_total = Set(Some(total_tokens as i32)); + active.token_usage_total = Set(Some(previous_token_total + total_tokens as i32)); active.updated_at = Set(chrono::Utc::now()); let _ = active.update(service.db.writer()).await; } @@ -471,12 +649,15 @@ async fn build_messages_from_history( // For user/system messages: take the first block's content if role == "assistant" { arr.iter() - .filter(|item| item.get("role").and_then(|r| r.as_str()) != Some("thinking")) + .filter(|item| { + item.get("role").and_then(|r| r.as_str()) != Some("thinking") + }) .filter_map(|item| item.get("content").and_then(|c| c.as_str())) .collect::>() .join("\n") } else if let Some(first) = arr.first() { - first.get("content") + first + .get("content") .and_then(|c| c.as_str()) .unwrap_or("") .to_string() @@ -506,7 +687,9 @@ async fn build_messages_from_history( fn merge_consecutive_blocks(blocks: Vec<(String, String)>) -> Vec<(String, String)> { let mut merged: Vec<(String, String)> = Vec::new(); for (role, content) in blocks { - if content.is_empty() { continue; } + if content.is_empty() { + continue; + } if let Some(last) = merged.last_mut() { if last.0 == role { last.1.push_str(&content); diff --git a/libs/api/chat/watch.rs b/libs/api/chat/watch.rs index adf4911..0eb6077 100644 --- a/libs/api/chat/watch.rs +++ b/libs/api/chat/watch.rs @@ -5,7 +5,7 @@ //! stream chunks to connected clients. This enables multiple viewers to watch //! the same AI conversation in real-time. -use actix_web::{web, HttpResponse, Result}; +use actix_web::{HttpResponse, Result, web}; use futures::StreamExt; use service::AppService; use std::pin::Pin; @@ -34,10 +34,12 @@ pub fn create_watch_sse_stream( let nats = match &service.queue_producer.nats { Some(n) => n.clone(), None => { - let _ = tx.send(format!( - "data: {{\"event\":\"error\",\"data\":{}}}\n\n", - serde_json::to_string("NATS not available").unwrap_or_default() - )).await; + let _ = tx + .send(format!( + "data: {{\"event\":\"error\",\"data\":{}}}\n\n", + serde_json::to_string("NATS not available").unwrap_or_default() + )) + .await; return; } }; @@ -47,10 +49,12 @@ pub fn create_watch_sse_stream( let mut chunk_sub = match nats.subscribe(&chunk_subject).await { Ok(s) => s, Err(e) => { - let _ = tx.send(format!( - "data: {{\"event\":\"error\",\"data\":{}}}\n\n", - serde_json::to_string(&e.to_string()).unwrap_or_default() - )).await; + let _ = tx + .send(format!( + "data: {{\"event\":\"error\",\"data\":{}}}\n\n", + serde_json::to_string(&e.to_string()).unwrap_or_default() + )) + .await; return; } }; @@ -60,10 +64,12 @@ pub fn create_watch_sse_stream( let mut msg_sub = match nats.subscribe(&msg_subject).await { Ok(s) => s, Err(e) => { - let _ = tx.send(format!( - "data: {{\"event\":\"error\",\"data\":{}}}\n\n", - serde_json::to_string(&e.to_string()).unwrap_or_default() - )).await; + let _ = tx + .send(format!( + "data: {{\"event\":\"error\",\"data\":{}}}\n\n", + serde_json::to_string(&e.to_string()).unwrap_or_default() + )) + .await; return; } }; @@ -115,9 +121,9 @@ pub fn create_watch_sse_stream( } }); - Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).map(|s| { - Ok(actix_web::web::Bytes::from(s)) - })) + Box::pin( + tokio_stream::wrappers::ReceiverStream::new(rx).map(|s| Ok(actix_web::web::Bytes::from(s))), + ) } #[utoipa::path( @@ -137,7 +143,9 @@ pub async fn conversation_watch( session: session::Session, path: web::Path, ) -> Result { - let user_id = session.user().ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?; + let user_id = session + .user() + .ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?; let conversation_id = path.into_inner(); // Verify access (view-only is sufficient) diff --git a/libs/api/dist.rs b/libs/api/dist.rs index 55c4dca..306effe 100644 --- a/libs/api/dist.rs +++ b/libs/api/dist.rs @@ -1,4 +1,4 @@ -use actix_web::{http::header, web, HttpRequest, HttpResponse}; +use actix_web::{HttpRequest, HttpResponse, http::header, web}; use mime_guess2::MimeGuess; fn cache_control_header(path: &str) -> &'static str { @@ -68,7 +68,9 @@ fn content_type_for_path(path: &str) -> String { _ => {} } } - MimeGuess::from_path(path).first_or_octet_stream().to_string() + MimeGuess::from_path(path) + .first_or_octet_stream() + .to_string() } /// Build an HttpResponse for the given asset. diff --git a/libs/api/frontend.rs b/libs/api/frontend.rs index eb67ce5..9712d7a 100644 --- a/libs/api/frontend.rs +++ b/libs/api/frontend.rs @@ -1,4 +1,4 @@ //! Frontend assets module — auto-generated by build.rs. //! The actual content is generated at $OUT_DIR/frontend.rs by the build script. -include!(concat!(env!("OUT_DIR"), "/frontend.rs")); \ No newline at end of file +include!(concat!(env!("OUT_DIR"), "/frontend.rs")); diff --git a/libs/api/git/init.rs b/libs/api/git/init.rs index b1b4815..00167f2 100644 --- a/libs/api/git/init.rs +++ b/libs/api/git/init.rs @@ -7,13 +7,15 @@ use session::Session; fn sanitize_repo_path(path: &str) -> Result { if path.contains("..") || path.contains('~') { return Err(ApiError(service::error::AppError::BadRequest( - "Invalid repository path".to_string() + "Invalid repository path".to_string(), ))); } - if path.starts_with('/') || path.starts_with('\\') || - (path.len() >= 3 && path.as_bytes()[1] == b':' && path.as_bytes()[2] == b'\\') { + if path.starts_with('/') + || path.starts_with('\\') + || (path.len() >= 3 && path.as_bytes()[1] == b':' && path.as_bytes()[2] == b'\\') + { return Err(ApiError(service::error::AppError::BadRequest( - "Absolute paths are not allowed".to_string() + "Absolute paths are not allowed".to_string(), ))); } Ok(path.to_string()) diff --git a/libs/api/issue/issue_label.rs b/libs/api/issue/issue_label.rs index b8ca7f8..c537419 100644 --- a/libs/api/issue/issue_label.rs +++ b/libs/api/issue/issue_label.rs @@ -1,7 +1,7 @@ use crate::{ApiResponse, error::ApiError}; use actix_web::{HttpResponse, Result, web}; -use service::issue::IssueAddLabelsByNamesRequest; use service::AppService; +use service::issue::IssueAddLabelsByNamesRequest; use session::Session; #[utoipa::path( diff --git a/libs/api/lib.rs b/libs/api/lib.rs index 1c0c972..6cbce97 100644 --- a/libs/api/lib.rs +++ b/libs/api/lib.rs @@ -20,4 +20,4 @@ pub mod user; #[allow(dead_code)] mod frontend; -pub use error::{api_success, ApiError, ApiResponse}; +pub use error::{ApiError, ApiResponse, api_success}; diff --git a/libs/api/openapi.rs b/libs/api/openapi.rs index 9aa9a17..8d134e1 100644 --- a/libs/api/openapi.rs +++ b/libs/api/openapi.rs @@ -743,7 +743,7 @@ use utoipa::OpenApi; crate::chat::handlers::types::MessageContent, crate::chat::handlers::types::MessageResponse, crate::chat::handlers::types::ShareResponse, - crate::chat::handlers::fork::ForkResponse, + crate::chat::handlers::fork::ForkConversationResponse, ) ), tags( diff --git a/libs/api/project/stats.rs b/libs/api/project/stats.rs index 96ad4ad..5ce8708 100644 --- a/libs/api/project/stats.rs +++ b/libs/api/project/stats.rs @@ -24,4 +24,4 @@ pub async fn project_stats( let project_name = path.into_inner(); let resp = service.project_stats(&session, project_name).await?; Ok(ApiResponse::ok(resp).to_response()) -} \ No newline at end of file +} diff --git a/libs/api/robots.rs b/libs/api/robots.rs index e0182d4..afd6433 100644 --- a/libs/api/robots.rs +++ b/libs/api/robots.rs @@ -1,4 +1,4 @@ -use actix_web::{web, HttpResponse}; +use actix_web::{HttpResponse, web}; use service::AppService; /// Serves robots.txt, blocking all sensitive paths from crawlers. diff --git a/libs/api/room/member.rs b/libs/api/room/member.rs index f0af166..20e9a4b 100644 --- a/libs/api/room/member.rs +++ b/libs/api/room/member.rs @@ -167,4 +167,4 @@ pub async fn state_update_dnd( .await .map_err(ApiError::from)?; Ok(ApiResponse::ok(resp).to_response()) -} \ No newline at end of file +} diff --git a/libs/api/room/mod.rs b/libs/api/room/mod.rs index b22a604..9b7de4e 100644 --- a/libs/api/room/mod.rs +++ b/libs/api/room/mod.rs @@ -191,10 +191,7 @@ pub fn init_room_routes(cfg: &mut web::ServiceConfig) { web::delete().to(draft_and_history::draft_clear), ) // file upload - .route( - "/rooms/{room_id}/upload", - web::post().to(upload::upload), - ) + .route("/rooms/{room_id}/upload", web::post().to(upload::upload)) .route( "/rooms/{room_id}/attachments/{attachment_id}", web::get().to(upload::get_attachment), diff --git a/libs/api/room/room.rs b/libs/api/room/room.rs index 5ed6cd3..8fec3b6 100644 --- a/libs/api/room/room.rs +++ b/libs/api/room/room.rs @@ -1,11 +1,11 @@ use crate::{ApiResponse, error::ApiError}; use actix_web::{HttpResponse, Result, web}; +use room::presence::PresenceChanged; use room::ws_context::WsUserContext; use service::AppService; use session::Session; use utoipa::IntoParams; use uuid::Uuid; -use room::presence::PresenceChanged; #[derive(Debug, serde::Deserialize, IntoParams)] pub struct RoomListQuery { diff --git a/libs/api/room/upload.rs b/libs/api/room/upload.rs index 88089cc..7179761 100644 --- a/libs/api/room/upload.rs +++ b/libs/api/room/upload.rs @@ -1,6 +1,6 @@ use actix_multipart::Multipart; -use actix_web::{HttpResponse, Result, web}; use actix_web::http::header::{CONTENT_DISPOSITION, CONTENT_TYPE}; +use actix_web::{HttpResponse, Result, web}; use chrono::Utc; use futures_util::StreamExt; use models::rooms::room_attachment; @@ -48,14 +48,11 @@ pub async fn upload( .user() .ok_or_else(|| crate::error::ApiError(service::error::AppError::Unauthorized))?; - let storage = service - .storage - .as_ref() - .ok_or_else(|| { - crate::error::ApiError(service::error::AppError::BadRequest( - "Storage not configured".to_string(), - )) - })?; + let storage = service.storage.as_ref().ok_or_else(|| { + crate::error::ApiError(service::error::AppError::BadRequest( + "Storage not configured".to_string(), + )) + })?; let room_id = path.into_inner(); service @@ -125,14 +122,9 @@ pub async fn upload( let key = format!("rooms/{}/{}", room_id, unique_name); let file_size = file_data.len() as i64; - let _url = storage - .upload(&key, file_data) - .await - .map_err(|e| { - crate::error::ApiError(service::error::AppError::InternalServerError( - e.to_string(), - )) - })?; + let _url = storage.upload(&key, file_data).await.map_err(|e| { + crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string())) + })?; // Write to room_attachment table (message will be linked when message is created) let attachment_id = Uuid::now_v7(); @@ -147,14 +139,9 @@ pub async fn upload( s3_key: Set(key), created_at: Set(Utc::now()), }; - attachment - .insert(&service.db) - .await - .map_err(|e| { - crate::error::ApiError(service::error::AppError::InternalServerError( - e.to_string(), - )) - })?; + attachment.insert(&service.db).await.map_err(|e| { + crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string())) + })?; // Return the structured attachment URL instead of the /files/... path // (the /files/... path has no handler on the API server) @@ -205,30 +192,35 @@ pub async fn get_attachment( let attachment = room_attachment::Entity::find_by_id(attachment_id) .one(&service.db) .await - .map_err(|e| crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string())))? - .ok_or_else(|| crate::error::ApiError(service::error::AppError::NotFound("attachment not found".into())))?; + .map_err(|e| { + crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string())) + })? + .ok_or_else(|| { + crate::error::ApiError(service::error::AppError::NotFound( + "attachment not found".into(), + )) + })?; // Ensure the attachment belongs to the requested room if attachment.room != room_id { - return Err(crate::error::ApiError(service::error::AppError::NotFound("attachment not found".into()))); + return Err(crate::error::ApiError(service::error::AppError::NotFound( + "attachment not found".into(), + ))); } - let storage = service - .storage - .as_ref() - .ok_or_else(|| crate::error::ApiError(service::error::AppError::InternalServerError("Storage not configured".to_string())))?; + let storage = service.storage.as_ref().ok_or_else(|| { + crate::error::ApiError(service::error::AppError::InternalServerError( + "Storage not configured".to_string(), + )) + })?; - let (data, content_type) = storage - .read(&attachment.s3_key) - .await - .map_err(|e| crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string())))?; + let (data, content_type) = storage.read(&attachment.s3_key).await.map_err(|e| { + crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string())) + })?; Ok(HttpResponse::Ok() .content_type(content_type.clone()) - .insert_header(( - CONTENT_TYPE, - content_type, - )) + .insert_header((CONTENT_TYPE, content_type)) .insert_header(( CONTENT_DISPOSITION, format!("inline; filename=\"{}\"", attachment.file_name), diff --git a/libs/api/search/service.rs b/libs/api/search/service.rs index 4b93710..2786115 100644 --- a/libs/api/search/service.rs +++ b/libs/api/search/service.rs @@ -2,7 +2,9 @@ use crate::ApiResponse; use crate::error::ApiError; use actix_web::{HttpResponse, Result, web}; use service::AppService; -use service::search::{GlobalMessageSearchQuery, GlobalMessageSearchResponse, SearchQuery, SearchResponse}; +use service::search::{ + GlobalMessageSearchQuery, GlobalMessageSearchResponse, SearchQuery, SearchResponse, +}; use session::Session; #[utoipa::path( @@ -52,6 +54,8 @@ pub async fn search_messages( session: Session, query: web::Query, ) -> Result { - let resp = service.global_message_search(&session, query.into_inner()).await?; + let resp = service + .global_message_search(&session, query.into_inner()) + .await?; Ok(ApiResponse::ok(resp).to_response()) } diff --git a/libs/api/sidemap.rs b/libs/api/sidemap.rs index bd0679d..128ea5b 100644 --- a/libs/api/sidemap.rs +++ b/libs/api/sidemap.rs @@ -1,4 +1,4 @@ -use actix_web::{web, HttpResponse}; +use actix_web::{HttpResponse, web}; use db::cache::AppCache; use models::projects::project::{Column as PCol, Entity as PEntity}; use models::repos::repo::{Column as RCol, Entity as REntity}; diff --git a/libs/api/skill.rs b/libs/api/skill.rs index b54c8a9..872ff4d 100644 --- a/libs/api/skill.rs +++ b/libs/api/skill.rs @@ -38,9 +38,7 @@ pub async fn skill_list( query: web::Query, ) -> Result { let project_name = path.into_inner(); - let project = service - .project_info(&session, project_name.clone()) - .await?; + let project = service.project_info(&session, project_name.clone()).await?; let q = service::skill::info::SkillListQuery { source: query.source.clone(), @@ -73,14 +71,9 @@ pub async fn skill_get( session: Session, path: web::Path, ) -> Result { - let SkillPath { - project_name, - slug, - } = path.into_inner(); + let SkillPath { project_name, slug } = path.into_inner(); - let project = service - .project_info(&session, project_name.clone()) - .await?; + let project = service.project_info(&session, project_name.clone()).await?; let skill = service .skill_get(project.uid.to_string(), slug, &session) @@ -108,9 +101,7 @@ pub async fn skill_create( body: web::Json, ) -> Result { let project_name = path.into_inner(); - let project = service - .project_info(&session, project_name.clone()) - .await?; + let project = service.project_info(&session, project_name.clone()).await?; let skill = service .skill_create(project.uid.to_string(), body.into_inner(), &session) @@ -140,14 +131,9 @@ pub async fn skill_update( path: web::Path, body: web::Json, ) -> Result { - let SkillPath { - project_name, - slug, - } = path.into_inner(); + let SkillPath { project_name, slug } = path.into_inner(); - let project = service - .project_info(&session, project_name.clone()) - .await?; + let project = service.project_info(&session, project_name.clone()).await?; let skill = service .skill_update(project.uid.to_string(), slug, body.into_inner(), &session) @@ -175,14 +161,9 @@ pub async fn skill_delete( session: Session, path: web::Path, ) -> Result { - let SkillPath { - project_name, - slug, - } = path.into_inner(); + let SkillPath { project_name, slug } = path.into_inner(); - let project = service - .project_info(&session, project_name.clone()) - .await?; + let project = service.project_info(&session, project_name.clone()).await?; let result = service .skill_delete(project.uid.to_string(), slug, &session) @@ -207,20 +188,17 @@ pub async fn skill_scan( path: web::Path, ) -> Result { let project_name = path.into_inner(); - let project = service - .project_info(&session, project_name) - .await?; + let project = service.project_info(&session, project_name).await?; - let result = service - .skill_scan_repos(project.uid, project.uid) - .await?; + let result = service.skill_scan_repos(project.uid, project.uid).await?; Ok(ApiResponse::ok(ScanResponse { discovered: result.discovered, created: result.created, updated: result.updated, removed: result.removed, - }).to_response()) + }) + .to_response()) } #[derive(serde::Serialize, utoipa::ToSchema)] @@ -230,4 +208,3 @@ pub struct ScanResponse { pub updated: i64, pub removed: i64, } - diff --git a/libs/api/user/billing.rs b/libs/api/user/billing.rs index 76be0ac..543412a 100644 --- a/libs/api/user/billing.rs +++ b/libs/api/user/billing.rs @@ -52,6 +52,8 @@ pub async fn user_billing_history( session: Session, query: web::Query, ) -> Result { - let resp = service.user_billing_history(&session, query.into_inner()).await?; + let resp = service + .user_billing_history(&session, query.into_inner()) + .await?; Ok(ApiResponse::ok(resp).to_response()) -} \ No newline at end of file +} diff --git a/libs/api/user/mod.rs b/libs/api/user/mod.rs index 07ed691..1749e21 100644 --- a/libs/api/user/mod.rs +++ b/libs/api/user/mod.rs @@ -10,9 +10,9 @@ pub mod repository; pub mod ssh_key; pub mod stars; pub mod subscribe; +pub mod summary; pub mod user_activity; pub mod user_info; -pub mod summary; use actix_web::web; @@ -73,10 +73,7 @@ pub fn init_user_routes(cfg: &mut web::ServiceConfig) { "/me/heatmap", web::get().to(chpc::get_my_contribution_heatmap), ) - .route( - "/me/billing", - web::get().to(billing::user_billing), - ) + .route("/me/billing", web::get().to(billing::user_billing)) .route( "/me/billing/errors", web::get().to(billing::user_billing_errors), @@ -99,13 +96,22 @@ pub fn init_user_routes(cfg: &mut web::ServiceConfig) { web::get().to(profile::get_profile_by_username), ) .route("/{username}/info", web::get().to(user_info::get_user_info)) - .route("/{username}/summary", web::get().to(summary::get_user_summary)) + .route( + "/{username}/summary", + web::get().to(summary::get_user_summary), + ) .route( "/{username}/heatmap", web::get().to(chpc::get_contribution_heatmap), ) - .route("/{username}/keys", web::get().to(ssh_key::list_user_ssh_keys)) - .route("/{username}/activity", web::get().to(user_activity::get_user_activity)) + .route( + "/{username}/keys", + web::get().to(ssh_key::list_user_ssh_keys), + ) + .route( + "/{username}/activity", + web::get().to(user_activity::get_user_activity), + ) .route("/{username}/stars", web::get().to(stars::get_user_stars)) .route( "/{username}/keys/{key_id}", diff --git a/libs/api/user/notification.rs b/libs/api/user/notification.rs index 0f0305a..55e3c6c 100644 --- a/libs/api/user/notification.rs +++ b/libs/api/user/notification.rs @@ -1,9 +1,9 @@ use actix_web::{HttpResponse, Result, web}; -use service::error::AppError; use service::AppService; +use service::error::AppError; use session::Session; -use crate::{error::ApiError, ApiResponse}; +use crate::{ApiResponse, error::ApiError}; #[derive(serde::Serialize, utoipa::ToSchema)] pub struct VapidKeyResponse { @@ -22,9 +22,7 @@ pub struct VapidKeyResponse { pub async fn get_vapid_public_key( service: web::Data, ) -> Result { - let public_key = service - .config - .vapid_public_key(); + let public_key = service.config.vapid_public_key(); let public_key = match public_key { Some(k) => k, None => { diff --git a/libs/api/user/ssh_key.rs b/libs/api/user/ssh_key.rs index 59931cc..3ffaf6f 100644 --- a/libs/api/user/ssh_key.rs +++ b/libs/api/user/ssh_key.rs @@ -126,8 +126,6 @@ pub async fn list_user_ssh_keys( path: web::Path, ) -> Result { let username = path.into_inner(); - let resp = service - .user_list_ssh_keys_by_username(username) - .await?; + let resp = service.user_list_ssh_keys_by_username(username).await?; Ok(ApiResponse::ok(resp).to_response()) }