refactor(api): apply rustfmt and update fork API + project stats endpoint

This commit is contained in:
ZhenYi 2026-05-14 10:01:39 +08:00
parent e29ef0e76d
commit 724858a721
33 changed files with 527 additions and 327 deletions

View File

@ -33,7 +33,9 @@ pub async fn triage_issue(
let user_id = session.user().ok_or(crate::error::ApiError( let user_id = session.user().ok_or(crate::error::ApiError(
service::error::AppError::Unauthorized, service::error::AppError::Unauthorized,
))?; ))?;
let project = service.utils_find_project_by_name(project_name.clone()).await?; let project = service
.utils_find_project_by_name(project_name.clone())
.await?;
// Verify user has access to the project before triggering AI triage // Verify user has access to the project before triggering AI triage
service service
.check_project_access(project.id, user_id) .check_project_access(project.id, user_id)

View File

@ -37,10 +37,22 @@ pub fn init_auth_routes(cfg: &mut actix_web::web::ServiceConfig) {
"/password/confirm", "/password/confirm",
actix_web::web::post().to(password::api_user_confirm_password_reset), actix_web::web::post().to(password::api_user_confirm_password_reset),
) )
.route("/2fa/enable", actix_web::web::post().to(totp::api_2fa_enable)) .route(
.route("/2fa/verify", actix_web::web::post().to(totp::api_2fa_verify)) "/2fa/enable",
.route("/2fa/disable", actix_web::web::post().to(totp::api_2fa_disable)) actix_web::web::post().to(totp::api_2fa_enable),
.route("/2fa/status", actix_web::web::post().to(totp::api_2fa_status)) )
.route(
"/2fa/verify",
actix_web::web::post().to(totp::api_2fa_verify),
)
.route(
"/2fa/disable",
actix_web::web::post().to(totp::api_2fa_disable),
)
.route(
"/2fa/status",
actix_web::web::post().to(totp::api_2fa_status),
)
.route("/email", actix_web::web::post().to(email::api_email_get)) .route("/email", actix_web::web::post().to(email::api_email_get))
.route( .route(
"/email/change", "/email/change",

View File

@ -2,7 +2,9 @@ use crate::ApiResponse;
use crate::error::ApiError; use crate::error::ApiError;
use actix_web::{HttpResponse, Result, web}; use actix_web::{HttpResponse, Result, web};
use service::AppService; use service::AppService;
use service::auth::password::{ChangePasswordParams, ConfirmResetPasswordParams, ResetPasswordParams}; use service::auth::password::{
ChangePasswordParams, ConfirmResetPasswordParams, ResetPasswordParams,
};
use session::Session; use session::Session;
#[utoipa::path( #[utoipa::path(

View File

@ -30,7 +30,9 @@ pub async fn ws_token_generate(
service: web::Data<AppService>, service: web::Data<AppService>,
session: Session, session: Session,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let user_id = session.user().ok_or_else(|| ApiError::from(AppError::Unauthorized))?; let user_id = session
.user()
.ok_or_else(|| ApiError::from(AppError::Unauthorized))?;
let device_id = session.get::<String>("device_id").unwrap_or_default(); let device_id = session.get::<String>("device_id").unwrap_or_default();
let client_id = session.get::<String>("client_id").unwrap_or_default(); let client_id = session.get::<String>("client_id").unwrap_or_default();

View File

@ -7,9 +7,9 @@ use std::fs;
use std::io::Write; use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use sha2::{Digest, Sha256};
use flate2::write::GzEncoder;
use flate2::Compression; use flate2::Compression;
use flate2::write::GzEncoder;
use sha2::{Digest, Sha256};
// ── Compression helpers ────────────────────────────────────────────────── // ── Compression helpers ──────────────────────────────────────────────────
@ -37,7 +37,10 @@ fn compute_etag(data: &[u8]) -> String {
hasher.update(data); hasher.update(data);
let hash = hasher.finalize(); let hash = hasher.finalize();
// First 32 hex chars for a compact etag // First 32 hex chars for a compact etag
hash.iter().map(|b| format!("{:02x}", b)).take(16).collect::<String>() hash.iter()
.map(|b| format!("{:02x}", b))
.take(16)
.collect::<String>()
} }
// ── Asset collection ───────────────────────────────────────────────────── // ── Asset collection ─────────────────────────────────────────────────────
@ -59,9 +62,8 @@ fn collect_assets(dist_dir: &Path) -> BTreeMap<String, Asset> {
continue; continue;
} }
let data = fs::read(&entry).unwrap_or_else(|e| { let data = fs::read(&entry)
panic!("Failed to read dist file {}: {}", path_str, e) .unwrap_or_else(|e| panic!("Failed to read dist file {}: {}", path_str, e));
});
let etag = compute_etag(&data); let etag = compute_etag(&data);
let brotli_data = brotli_compress(&data); let brotli_data = brotli_compress(&data);
@ -106,7 +108,11 @@ fn rust_byte_literal(data: &[u8]) -> String {
let lines: Vec<String> = data let lines: Vec<String> = data
.chunks(80) .chunks(80)
.map(|chunk| { .map(|chunk| {
chunk.iter().map(|b| b.to_string()).collect::<Vec<_>>().join(", ") chunk
.iter()
.map(|b| b.to_string())
.collect::<Vec<_>>()
.join(", ")
}) })
.collect(); .collect();
format!("[\n{}\n]", lines.join(",\n")) format!("[\n{}\n]", lines.join(",\n"))
@ -114,7 +120,11 @@ fn rust_byte_literal(data: &[u8]) -> String {
} }
fn path_to_ident(path: &str) -> String { fn path_to_ident(path: &str) -> String {
let s = path.replace('-', "_").replace('.', "_").replace('/', "_").to_uppercase(); let s = path
.replace('-', "_")
.replace('.', "_")
.replace('/', "_")
.to_uppercase();
format!("ASSET_{s}") format!("ASSET_{s}")
} }
@ -143,12 +153,20 @@ fn generate_frontend_module(assets: &BTreeMap<String, Asset>, out_dir: &Path) {
let br_id = br_ident(path); let br_id = br_ident(path);
let gz_id = gz_ident(path); let gz_id = gz_ident(path);
code += &format!("static {}: &[u8] = &{};\n", ident, rust_byte_literal(&asset.data)); code += &format!(
"static {}: &[u8] = &{};\n",
ident,
rust_byte_literal(&asset.data)
);
code += &format!("static {}: &str = \"{}\";\n", etag_id, asset.etag); code += &format!("static {}: &str = \"{}\";\n", etag_id, asset.etag);
if let Some(ref br) = asset.brotli { if let Some(ref br) = asset.brotli {
code += &format!("static {}: &[u8] = &{};\n", br_id, rust_byte_literal(br)); code += &format!("static {}: &[u8] = &{};\n", br_id, rust_byte_literal(br));
} }
code += &format!("static {}: &[u8] = &{};\n", gz_id, rust_byte_literal(&asset.gzip)); code += &format!(
"static {}: &[u8] = &{};\n",
gz_id,
rust_byte_literal(&asset.gzip)
);
code += "\n"; code += "\n";
} }
@ -185,18 +203,13 @@ fn generate_frontend_module(assets: &BTreeMap<String, Asset>, out_dir: &Path) {
code += "}\n"; code += "}\n";
let out_path = out_dir.join("frontend.rs"); let out_path = out_dir.join("frontend.rs");
fs::write(&out_path, code).unwrap_or_else(|e| { fs::write(&out_path, code)
panic!("Failed to write generated frontend.rs: {}", e) .unwrap_or_else(|e| panic!("Failed to write generated frontend.rs: {}", e));
});
} }
fn main() { fn main() {
let manifest_dir = env::var("CARGO_MANIFEST_DIR").unwrap(); let manifest_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
let workspace_root = Path::new(&manifest_dir) let workspace_root = Path::new(&manifest_dir).parent().unwrap().parent().unwrap();
.parent()
.unwrap()
.parent()
.unwrap();
let dist_dir = workspace_root.join("dist"); let dist_dir = workspace_root.join("dist");
if !dist_dir.exists() { if !dist_dir.exists() {
@ -215,7 +228,10 @@ fn main() {
println!("cargo:rerun-if-changed=dist/"); println!("cargo:rerun-if-changed=dist/");
let assets = collect_assets(&dist_dir); let assets = collect_assets(&dist_dir);
println!("cargo:warning=Collected {} frontend assets from dist/", assets.len()); println!(
"cargo:warning=Collected {} frontend assets from dist/",
assets.len()
);
let out_dir = env::var("OUT_DIR").unwrap(); let out_dir = env::var("OUT_DIR").unwrap();
generate_frontend_module(&assets, Path::new(&out_dir)); generate_frontend_module(&assets, Path::new(&out_dir));

View File

@ -1,15 +1,17 @@
use actix_web::{web, HttpResponse, Result}; use actix_web::{HttpResponse, Result, web};
use session::Session;
use service::error::AppError; use service::error::AppError;
use session::Session;
use uuid::Uuid; use uuid::Uuid;
use crate::error::ApiError;
use crate::ApiResponse; use crate::ApiResponse;
use crate::error::ApiError;
use super::types::{ConversationListQuery, ConversationResponse, CreateConversationParams}; use super::types::{ConversationListQuery, ConversationResponse, CreateConversationParams};
fn get_user_id(session: &Session) -> Result<Uuid, ApiError> { fn get_user_id(session: &Session) -> Result<Uuid, ApiError> {
session.user().ok_or_else(|| ApiError::from(AppError::Unauthorized)) session
.user()
.ok_or_else(|| ApiError::from(AppError::Unauthorized))
} }
#[utoipa::path( #[utoipa::path(
@ -55,6 +57,7 @@ pub async fn conversation_create(
operation_id = "ai_conversation_list", operation_id = "ai_conversation_list",
params( params(
("project_id" = Option<Uuid>, Query, description = "Filter by project"), ("project_id" = Option<Uuid>, Query, description = "Filter by project"),
("q" = Option<String>, Query, description = "Search query (title)"),
), ),
responses( responses(
(status = 200, description = "List of conversations", body = ApiResponse<Vec<ConversationResponse>>), (status = 200, description = "List of conversations", body = ApiResponse<Vec<ConversationResponse>>),
@ -70,13 +73,11 @@ pub async fn conversation_list(
let user_id = get_user_id(&session)?; let user_id = get_user_id(&session)?;
let convs = service let convs = service
.list_conversations(user_id, query.project_id, 50) .list_conversations(user_id, query.project_id, 50, query.q.clone())
.await?; .await?;
let resp: Vec<ConversationResponse> = convs let resp: Vec<ConversationResponse> =
.into_iter() convs.into_iter().map(ConversationResponse::from).collect();
.map(ConversationResponse::from)
.collect();
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(resp).to_response())
} }

View File

@ -1,78 +1,64 @@
use actix_web::{web, HttpResponse, Result}; use actix_web::{HttpResponse, Result, web};
use session::Session;
use service::error::AppError; use service::error::AppError;
use session::Session;
use uuid::Uuid; use uuid::Uuid;
use crate::error::ApiError;
use crate::ApiResponse; use crate::ApiResponse;
use crate::error::ApiError;
#[derive(Debug, serde::Serialize, utoipa::ToSchema)] #[derive(Debug, serde::Serialize, utoipa::ToSchema)]
pub struct ForkResponse { pub struct ForkConversationResponse {
pub id: Uuid, pub id: Uuid,
pub conversation_id: Option<Uuid>, pub title: Option<String>,
pub source_message_id: Uuid, pub model: String,
pub fork_message_id: Uuid,
#[schema(value_type = chrono::DateTime<chrono::Utc>)]
pub created_at: chrono::DateTime<chrono::Utc>, pub created_at: chrono::DateTime<chrono::Utc>,
} }
/// Fork a conversation from a specific message, creating a new conversation
/// with all messages up to and including the source message.
#[utoipa::path( #[utoipa::path(
post, post,
path = "/api/ai/conversations/{conversation_id}/messages/{message_id}/fork/{target_message_id}", path = "/api/ai/conversations/{conversation_id}/messages/{message_id}/fork",
operation_id = "ai_message_fork", operation_id = "ai_conversation_fork",
params( params(
("conversation_id" = Uuid, Path, description = "Conversation ID"), ("conversation_id" = Uuid, Path, description = "Conversation ID"),
("message_id" = Uuid, Path, description = "Source message ID"), ("message_id" = Uuid, Path, description = "Source message ID to fork from"),
("target_message_id" = Uuid, Path, description = "Target/fork message ID to create"),
), ),
responses( responses(
(status = 200, description = "Fork created", body = ApiResponse<ForkResponse>), (status = 200, description = "Conversation forked", body = ApiResponse<ForkConversationResponse>),
), ),
tag = "AI Chat" tag = "AI Chat"
)] )]
pub async fn message_fork( pub async fn message_fork(
service: web::Data<service::AppService>, service: web::Data<service::AppService>,
session: Session, session: Session,
path: web::Path<(Uuid, Uuid, Uuid)>, path: web::Path<(Uuid, Uuid)>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let user_id = session let user_id = session
.user() .user()
.ok_or_else(|| ApiError::from(AppError::Unauthorized))?; .ok_or_else(|| ApiError::from(AppError::Unauthorized))?;
let (conversation_id, source_message_id, target_message_id) = path.into_inner(); let (conversation_id, source_message_id) = path.into_inner();
let fork_record = service let new_conv = service
.fork_message( .fork_conversation_from_message(user_id, conversation_id, source_message_id)
conversation_id,
user_id,
source_message_id,
target_message_id,
)
.await?; .await?;
let resp = ForkResponse { let resp = ForkConversationResponse {
id: fork_record.id, id: new_conv.id,
conversation_id: fork_record.conversation_id, title: new_conv.title,
source_message_id: fork_record.source_message_id, model: new_conv.model,
fork_message_id: fork_record.fork_message_id, created_at: new_conv.created_at,
created_at: fork_record.created_at,
}; };
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(resp).to_response())
} }
#[utoipa::path( #[derive(Debug, serde::Serialize, utoipa::ToSchema)]
get, pub struct ForkListResponse {
path = "/api/ai/conversations/{conversation_id}/messages/{message_id}/forks", pub forks: Vec<ForkConversationResponse>,
operation_id = "ai_message_forks", }
params(
("conversation_id" = Uuid, Path, description = "Conversation ID"), /// List all forks created from a specific message.
("message_id" = Uuid, Path, description = "Source message ID"),
),
responses(
(status = 200, description = "List forks from message", body = ApiResponse<Vec<ForkResponse>>),
),
tag = "AI Chat"
)]
pub async fn message_forks( pub async fn message_forks(
service: web::Data<service::AppService>, service: web::Data<service::AppService>,
session: Session, session: Session,
@ -87,16 +73,18 @@ pub async fn message_forks(
.list_forks(conversation_id, user_id, source_message_id) .list_forks(conversation_id, user_id, source_message_id)
.await?; .await?;
let resp: Vec<ForkResponse> = forks let fork_responses: Vec<ForkConversationResponse> = forks
.into_iter() .into_iter()
.map(|f| ForkResponse { .map(|f| ForkConversationResponse {
id: f.id, id: f.fork_message_id,
conversation_id: f.conversation_id, title: None,
source_message_id: f.source_message_id, model: String::new(),
fork_message_id: f.fork_message_id,
created_at: f.created_at, created_at: f.created_at,
}) })
.collect(); .collect();
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(ForkListResponse {
forks: fork_responses,
})
.to_response())
} }

View File

@ -1,14 +1,18 @@
use crate::error::ApiError;
use crate::ApiResponse; use crate::ApiResponse;
use actix_web::{web, HttpResponse, Result}; use crate::error::ApiError;
use session::Session; use actix_web::{HttpResponse, Result, web};
use models::ai::AiMessage;
use sea_orm::EntityTrait;
use service::error::AppError; use service::error::AppError;
use session::Session;
use uuid::Uuid; use uuid::Uuid;
use super::types::{CreateMessageParams, EditMessageParams, MessageListQuery, MessageResponse}; use super::types::{CreateMessageParams, EditMessageParams, MessageListQuery, MessageResponse};
fn get_user_id(session: &Session) -> Result<Uuid, ApiError> { fn get_user_id(session: &Session) -> Result<Uuid, ApiError> {
session.user().ok_or_else(|| ApiError::from(AppError::Unauthorized)) session
.user()
.ok_or_else(|| ApiError::from(AppError::Unauthorized))
} }
#[utoipa::path( #[utoipa::path(
@ -39,10 +43,7 @@ pub async fn message_list(
.list_messages(conversation_id, user_id, limit) .list_messages(conversation_id, user_id, limit)
.await?; .await?;
let resp: Vec<MessageResponse> = msgs let resp: Vec<MessageResponse> = msgs.into_iter().map(MessageResponse::from).collect();
.into_iter()
.map(MessageResponse::from)
.collect();
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(resp).to_response())
} }
@ -75,7 +76,7 @@ pub async fn message_create(
conversation_id, conversation_id,
user_id, user_id,
params.parent_message_id, params.parent_message_id,
params.content.role.clone(), "user".to_string(),
params.content.content.clone(), params.content.content.clone(),
params.model.clone(), params.model.clone(),
params.is_fork_origin.unwrap_or(false), params.is_fork_origin.unwrap_or(false),
@ -200,10 +201,7 @@ pub async fn message_children(
.list_child_messages(conversation_id, user_id, parent_message_id) .list_child_messages(conversation_id, user_id, parent_message_id)
.await?; .await?;
let resp: Vec<MessageResponse> = msgs let resp: Vec<MessageResponse> = msgs.into_iter().map(MessageResponse::from).collect();
.into_iter()
.map(MessageResponse::from)
.collect();
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(resp).to_response())
} }
@ -235,6 +233,15 @@ pub async fn message_stream(
let model = conv.model; let model = conv.model;
let msg = AiMessage::find_by_id(message_id)
.one(service.db.reader())
.await
.map_err(AppError::from)?
.ok_or_else(|| ApiError::from(AppError::NotFound("message".into())))?;
if msg.conversation_id != conversation_id || msg.role != "user" || !msg.is_latest {
return Err(ApiError::from(AppError::NotFound("message".into())));
}
let response = actix_web::HttpResponse::Ok() let response = actix_web::HttpResponse::Ok()
.content_type("text/event-stream") .content_type("text/event-stream")
.insert_header(("Cache-Control", "no-cache")) .insert_header(("Cache-Control", "no-cache"))
@ -306,10 +313,7 @@ pub async fn message_versions(
.list_message_versions(conversation_id, user_id, message_id) .list_message_versions(conversation_id, user_id, message_id)
.await?; .await?;
let resp: Vec<MessageResponse> = versions let resp: Vec<MessageResponse> = versions.into_iter().map(MessageResponse::from).collect();
.into_iter()
.map(MessageResponse::from)
.collect();
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(resp).to_response())
} }

View File

@ -1,15 +1,17 @@
use actix_web::{web, HttpResponse, Result}; use actix_web::{HttpResponse, Result, web};
use session::Session;
use service::error::AppError; use service::error::AppError;
use session::Session;
use uuid::Uuid; use uuid::Uuid;
use crate::error::ApiError;
use crate::ApiResponse; use crate::ApiResponse;
use crate::error::ApiError;
use super::types::{ConversationResponse, ShareResponse}; use super::types::{ConversationResponse, ShareResponse};
fn get_user_id(session: &Session) -> Result<Uuid, ApiError> { fn get_user_id(session: &Session) -> Result<Uuid, ApiError> {
session.user().ok_or_else(|| ApiError::from(AppError::Unauthorized)) session
.user()
.ok_or_else(|| ApiError::from(AppError::Unauthorized))
} }
#[utoipa::path( #[utoipa::path(
@ -33,9 +35,7 @@ pub async fn conversation_share(
let user_id = get_user_id(&session)?; let user_id = get_user_id(&session)?;
let conversation_id = path.into_inner(); let conversation_id = path.into_inner();
let (share, share_token) = service let (share, share_token) = service.share_conversation(conversation_id, user_id).await?;
.share_conversation(conversation_id, user_id)
.await?;
let resp = ShareResponse { let resp = ShareResponse {
id: share.id, id: share.id,

View File

@ -56,6 +56,7 @@ pub struct UpdateConversationParams {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct ConversationListQuery { pub struct ConversationListQuery {
pub project_id: Option<Uuid>, pub project_id: Option<Uuid>,
pub q: Option<String>,
} }
#[derive(Debug, Deserialize, utoipa::ToSchema)] #[derive(Debug, Deserialize, utoipa::ToSchema)]

View File

@ -7,7 +7,10 @@ pub mod watch;
pub fn init_chat_routes(cfg: &mut web::ServiceConfig) { pub fn init_chat_routes(cfg: &mut web::ServiceConfig) {
cfg.service( cfg.service(
web::scope("/ai/conversations") web::scope("/ai/conversations")
.route("", web::post().to(handlers::conversation::conversation_create)) .route(
"",
web::post().to(handlers::conversation::conversation_create),
)
.route("", web::get().to(handlers::conversation::conversation_list)) .route("", web::get().to(handlers::conversation::conversation_list))
.route( .route(
"/{conversation_id}", "/{conversation_id}",
@ -54,7 +57,7 @@ pub fn init_chat_routes(cfg: &mut web::ServiceConfig) {
web::post().to(handlers::message::message_resend), web::post().to(handlers::message::message_resend),
) )
.route( .route(
"/{conversation_id}/messages/{message_id}/fork/{target_message_id}", "/{conversation_id}/messages/{message_id}/fork",
web::post().to(handlers::fork::message_fork), web::post().to(handlers::fork::message_fork),
) )
.route( .route(

View File

@ -1,17 +1,20 @@
use agent::chat::chat_execution; use agent::chat::chat_execution;
use agent::chat::{normalize_thinking_content, AiChunkType, AiStreamChunk}; use agent::chat::{AiChunkType, AiStreamChunk, normalize_thinking_content};
use agent::client::AiClientConfig; use agent::client::AiClientConfig;
use agent::client::types::ChatRequestMessage;
use agent::client::StreamChunkType; use agent::client::StreamChunkType;
use agent::client::types::ChatRequestMessage;
use agent::react::PERSONAL_CONTEXT_PROMPT;
use futures::StreamExt; use futures::StreamExt;
use models::ai::{ai_message, ai_conversation, AiMessage};
use models::agents::{model, model_version}; use models::agents::{model, model_version};
use models::ai::{AiMessage, ai_conversation, ai_message};
use queue::{ChatMessageEvent, ChatStreamChunkEvent}; use queue::{ChatMessageEvent, ChatStreamChunkEvent};
use sea_orm::{EntityTrait, QueryFilter, ColumnTrait, QueryOrder, ActiveModelTrait, Set, PaginatorTrait}; use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, Set,
};
use service::AppService; use service::AppService;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid; use uuid::Uuid;
@ -40,6 +43,10 @@ pub fn create_chat_sse_stream(
msg_id, msg_id,
started_at started_at
)).await; )).await;
let _ = tx
.send("data: {\"event\":\"done\",\"data\":\"recovery\"}\n\n".to_string())
.await;
return;
} }
let queue = service.queue_producer.clone(); let queue = service.queue_producer.clone();
@ -49,7 +56,8 @@ pub fn create_chat_sse_stream(
let messages = match build_messages_from_history(&service, conversation_id).await { let messages = match build_messages_from_history(&service, conversation_id).await {
Ok(msgs) => msgs, Ok(msgs) => msgs,
Err(e) => { Err(e) => {
let _ = tx.send(format!("data: {{\"event\":\"error\",\"data\":\"{}\"}}\n\n", e)).await; let payload = serde_json::json!({"event":"error","data": e.to_string()});
let _ = tx.send(format!("data: {}\n\n", payload)).await;
return; return;
} }
}; };
@ -58,14 +66,24 @@ pub fn create_chat_sse_stream(
let api_key = match service.config.ai_api_key() { let api_key = match service.config.ai_api_key() {
Ok(k) => k, Ok(k) => k,
Err(_) => { Err(_) => {
let _ = tx.send("data: {\"event\":\"error\",\"data\":\"AI not configured\"}\n\n".to_string()).await; let _ = tx
.send(
"data: {\"event\":\"error\",\"data\":\"AI not configured\"}\n\n"
.to_string(),
)
.await;
return; return;
} }
}; };
let base_url = match service.config.ai_basic_url() { let base_url = match service.config.ai_basic_url() {
Ok(u) => u, Ok(u) => u,
Err(_) => { Err(_) => {
let _ = tx.send("data: {\"event\":\"error\",\"data\":\"AI not configured\"}\n\n".to_string()).await; let _ = tx
.send(
"data: {\"event\":\"error\",\"data\":\"AI not configured\"}\n\n"
.to_string(),
)
.await;
return; return;
} }
}; };
@ -82,55 +100,110 @@ pub fn create_chat_sse_stream(
None => (Vec::new(), None, None), None => (Vec::new(), None, None),
}; };
// Get project_id from conversation // Get project_id and scope from conversation
let project_id = match service.find_conversation(conversation_id).await { let (project_id, conv_project_id, is_personal) =
Ok(c) => c.project_id.unwrap_or(Uuid::nil()), match service.find_conversation(conversation_id).await {
Err(_) => { Ok(c) => {
let _ = tx.send("data: {\"event\":\"error\",\"data\":\"conversation not found\"}\n\n".to_string()).await; let conv_project_id = c.project_id;
return; (
} conv_project_id.unwrap_or(Uuid::nil()),
conv_project_id,
conv_project_id.is_none(),
)
}
Err(_) => {
let _ = tx
.send(
"data: {\"event\":\"error\",\"data\":\"conversation not found\"}\n\n"
.to_string(),
)
.await;
return;
}
};
// In personal scope: filter out project/git/repo tools and inject personal context prompt
let tools = if is_personal {
tools
.into_iter()
.filter(|t| {
let name = t
.get("function")
.and_then(|f| f.get("name"))
.and_then(|n| n.as_str())
.unwrap_or("");
!name.starts_with("project_")
&& !name.starts_with("git_")
&& !name.starts_with("repo_")
&& name != "send_message"
&& name != "retract_message"
})
.collect()
} else {
tools
};
// Inject personal context system prompt for non-project chats
let messages = if is_personal {
let mut msgs = messages;
msgs.insert(
0,
ChatRequestMessage::system(PERSONAL_CONTEXT_PROMPT.to_string()),
);
msgs
} else {
messages
}; };
// Pre-flight balance check: verify project + user can afford at least a minimal AI call // Pre-flight balance check: verify project + user can afford at least a minimal AI call
let balance_ok = agent::billing::check_balance( if !is_personal {
&service.db, project_id, user_id, Uuid::nil(), 500, 250, let balance_ok = agent::billing::check_balance(
).await; &service.db,
project_id,
user_id,
Uuid::nil(),
500,
250,
)
.await;
match balance_ok { match balance_ok {
Ok(true) => {}, Ok(true) => {}
Ok(false) => { Ok(false) => {
tracing::warn!(project_id = %project_id, user_id = %user_id, "Insufficient balance for chat AI call"); tracing::warn!(project_id = %project_id, user_id = %user_id, "Insufficient balance for chat AI call");
let _ = agent::billing::persist_billing_error( let _ = agent::billing::persist_billing_error(
&service.db, "user", user_id, "insufficient_balance", &service.db, "user", user_id, "insufficient_balance",
&format!("Insufficient balance. Your account does not have enough funds for this AI request."), &format!("Insufficient balance. Your account does not have enough funds for this AI request."),
Some(serde_json::json!({ Some(serde_json::json!({
"user_id": user_id.to_string(), "user_id": user_id.to_string(),
"project_id": project_id.to_string(), "project_id": project_id.to_string(),
})), })),
).await; ).await;
let error_msg = "Insufficient balance. Your account does not have enough funds to process this AI request. Please add credits to continue."; let error_msg = "Insufficient balance. Your account does not have enough funds to process this AI request. Please add credits to continue.";
let _ = tx.send(format!("data: {{\"event\":\"billing_error\",\"data\":\"{}\"}}\n\n", error_msg)).await; let payload = serde_json::json!({"event":"billing_error","data":error_msg});
let _ = tx.send("data: {\"event\":\"done\",\"data\":\"billing_error\"}\n\n".to_string()).await; let _ = tx.send(format!("data: {}\n\n", payload)).await;
return; let _ = tx
}, .send(
Err(e) => { "data: {\"event\":\"done\",\"data\":\"billing_error\"}\n\n".to_string(),
tracing::warn!(error = %e, "Balance check failed, proceeding without pre-flight check"); )
.await;
return;
}
Err(e) => {
tracing::warn!(error = %e, "Balance check failed, proceeding without pre-flight check");
}
} }
} }
let max_tool_depth = 99; let max_tool_depth = 99;
let assistant_msg_id = Uuid::now_v7();
// Determine conversation project_id for chat message event // Determine conversation project_id for chat message event
let conv_project_id = match service.find_conversation(conversation_id).await {
Ok(c) => c.project_id,
Err(_) => None,
};
// Broadcast chat message start event via NATS // Broadcast chat message start event via NATS
let chat_msg = ChatMessageEvent { let chat_msg = ChatMessageEvent {
message_id: user_message_id, message_id: assistant_msg_id,
conversation_id, conversation_id,
project_id: conv_project_id, project_id: conv_project_id,
sender_id: Uuid::nil(), sender_id: Uuid::nil(),
@ -144,7 +217,16 @@ pub fn create_chat_sse_stream(
let _ = queue.publish_chat_message(&chat_msg).await; let _ = queue.publish_chat_message(&chat_msg).await;
// Mark stream as active in Redis so page refresh can recover // Mark stream as active in Redis so page refresh can recover
let _ = cache.set_chat_stream_active(conversation_id, user_message_id).await; let _ = cache
.set_chat_stream_active(conversation_id, user_message_id)
.await;
// Clear any stale cancel flag before starting
let _ = cache.clear_chat_stream_cancelled(conversation_id).await;
// Cancellation token — checked in on_chunk and by a periodic poller
let cancelled = Arc::new(std::sync::atomic::AtomicBool::new(false));
let cancelled_for_on_chunk = cancelled.clone();
let on_chunk_tx = tx.clone(); let on_chunk_tx = tx.clone();
let on_chunk_queue = queue.clone(); let on_chunk_queue = queue.clone();
@ -160,7 +242,13 @@ pub fn create_chat_sse_stream(
let conv_id = on_chunk_conv_id; let conv_id = on_chunk_conv_id;
let msg_id = on_chunk_msg_id; let msg_id = on_chunk_msg_id;
let model = on_chunk_model.clone(); let model = on_chunk_model.clone();
let cancelled = cancelled_for_on_chunk.clone();
Box::pin(async move { Box::pin(async move {
// Check if stream has been cancelled
if cancelled.load(Ordering::Acquire) {
return;
}
let event = match chunk.chunk_type { let event = match chunk.chunk_type {
AiChunkType::Thinking => "thinking", AiChunkType::Thinking => "thinking",
AiChunkType::Answer => "token", AiChunkType::Answer => "token",
@ -171,10 +259,28 @@ pub fn create_chat_sse_stream(
AiChunkType::Thinking => normalize_thinking_content(&chunk.content), AiChunkType::Thinking => normalize_thinking_content(&chunk.content),
_ => chunk.content.clone(), _ => chunk.content.clone(),
}; };
// Build structured data payload based on chunk type
let data_json = match chunk.chunk_type {
AiChunkType::ToolCall | AiChunkType::ToolResult => {
// Use structured metadata if available
if let Some(meta) = chunk.metadata {
meta
} else {
// Fallback: wrap raw content as display text
serde_json::json!({"display": content})
}
}
_ => {
// thinking / answer: send plain text content
serde_json::Value::String(content)
}
};
let sse = format!( let sse = format!(
"data: {{\"event\":\"{}\",\"data\":{}}}\n\n", "data: {{\"event\":\"{}\",\"data\":{}}}\n\n",
event, event,
serde_json::to_string(&content).unwrap_or_default() serde_json::to_string(&data_json).unwrap_or_default()
); );
let _ = tx.send(sse).await; let _ = tx.send(sse).await;
@ -183,7 +289,7 @@ pub fn create_chat_sse_stream(
conversation_id: conv_id, conversation_id: conv_id,
message_id: msg_id, message_id: msg_id,
seq, seq,
content, content: chunk.content,
done: false, done: false,
error: None, error: None,
chunk_type: Some(event.to_string()), chunk_type: Some(event.to_string()),
@ -193,13 +299,42 @@ pub fn create_chat_sse_stream(
}) as Pin<Box<dyn std::future::Future<Output = ()> + Send>> }) as Pin<Box<dyn std::future::Future<Output = ()> + Send>>
}); });
let cancelled_for_check = cancelled.clone();
let cache_for_check = cache.clone();
let conv_id_for_check = conversation_id;
let (done_tx, mut done_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
tokio::select! {
_ = interval.tick() => {
if cache_for_check.is_chat_stream_cancelled(conv_id_for_check).await {
cancelled_for_check.store(true, Ordering::Release);
break;
}
}
_ = &mut done_rx => break,
}
}
});
// Resolve max_tokens from model config (unlimited if not set)
let max_tokens = match model::Entity::find()
.filter(model::Column::Name.eq(&model_name))
.one(service.db.reader())
.await
{
Ok(Some(m)) => m.max_output_tokens.map(|v| v as u32).unwrap_or(u32::MAX),
_ => u32::MAX,
};
let result = chat_execution::execute_chat_stream( let result = chat_execution::execute_chat_stream(
messages, messages,
tools, tools,
&model_name, &model_name,
&config, &config,
0.7, // temperature 0.7, // temperature
4096, // max_tokens max_tokens, // max_tokens from model config
max_tool_depth, max_tool_depth,
tool_registry.as_ref(), tool_registry.as_ref(),
service.db.clone(), service.db.clone(),
@ -210,17 +345,34 @@ pub fn create_chat_sse_stream(
embed_service, embed_service,
on_chunk, on_chunk,
Some(conversation_id), Some(conversation_id),
).await; )
.await;
// Clear stream active state (streaming finished) // Clear stream active state and cancel flag (streaming finished)
let _ = cache.clear_chat_stream_active(conversation_id).await; let _ = cache.clear_chat_stream_active(conversation_id).await;
let _ = cache.clear_chat_stream_cancelled(conversation_id).await;
let was_cancelled = cancelled.load(Ordering::Acquire);
let _ = done_tx.send(());
match result { match result {
Ok(stream_result) => { Ok(stream_result) => {
if was_cancelled {
let _ = tx
.send("data: {\"event\":\"done\",\"data\":\"stopped\"}\n\n".to_string())
.await;
return;
}
// Build ordered content blocks from stream chunks, merging // Build ordered content blocks from stream chunks, merging
// consecutive blocks of the same role (thinking/assistant). // consecutive blocks of the same role (thinking/assistant).
let raw_blocks: Vec<(String, String)> = stream_result.chunks.iter() let raw_blocks: Vec<(String, String)> = stream_result
.filter(|c| matches!(c.chunk_type, StreamChunkType::Thinking | StreamChunkType::Answer)) .chunks
.iter()
.filter(|c| {
matches!(
c.chunk_type,
StreamChunkType::Thinking | StreamChunkType::Answer
)
})
.map(|chunk| { .map(|chunk| {
let role = match chunk.chunk_type { let role = match chunk.chunk_type {
StreamChunkType::Thinking => "thinking", StreamChunkType::Thinking => "thinking",
@ -234,14 +386,18 @@ pub fn create_chat_sse_stream(
// Apply thinking normalization to the fully merged thinking // Apply thinking normalization to the fully merged thinking
// blocks — per-token normalization is meaningless since each // blocks — per-token normalization is meaningless since each
// chunk is a single token. // chunk is a single token.
let normalized_blocks: Vec<(String, String)> = merged_blocks.into_iter().map(|(role, content)| { let normalized_blocks: Vec<(String, String)> = merged_blocks
if role == "thinking" { .into_iter()
(role, normalize_thinking_content(&content)) .map(|(role, content)| {
} else { if role == "thinking" {
(role, content) (role, normalize_thinking_content(&content))
} } else {
}).collect(); (role, content)
let content_blocks: Vec<serde_json::Value> = normalized_blocks.iter() }
})
.collect();
let content_blocks: Vec<serde_json::Value> = normalized_blocks
.iter()
.map(|(role, content)| serde_json::json!({ "role": role, "content": content })) .map(|(role, content)| serde_json::json!({ "role": role, "content": content }))
.collect(); .collect();
let content_value = if content_blocks.is_empty() { let content_value = if content_blocks.is_empty() {
@ -251,7 +407,6 @@ pub fn create_chat_sse_stream(
}; };
// Persist assistant message // Persist assistant message
let assistant_msg_id = Uuid::now_v7();
let assistant_msg = ai_message::ActiveModel { let assistant_msg = ai_message::ActiveModel {
id: Set(assistant_msg_id), id: Set(assistant_msg_id),
conversation_id: Set(conversation_id), conversation_id: Set(conversation_id),
@ -279,10 +434,14 @@ pub fn create_chat_sse_stream(
// After AI response, check/update conversation title and emit via SSE // After AI response, check/update conversation title and emit via SSE
if let Ok(Some(conv)) = ai_conversation::Entity::find_by_id(conversation_id) if let Ok(Some(conv)) = ai_conversation::Entity::find_by_id(conversation_id)
.one(service.db.reader()).await .one(service.db.reader())
.await
{ {
let existing_title = conv.title.clone(); let existing_title = conv.title.clone();
let needs_title = existing_title.as_deref().map(|t| t.is_empty() || t == "New Chat").unwrap_or(true); let needs_title = existing_title
.as_deref()
.map(|t| t.is_empty() || t == "New Chat")
.unwrap_or(true);
if needs_title { if needs_title {
// Generate title from first user message // Generate title from first user message
@ -290,18 +449,20 @@ pub fn create_chat_sse_stream(
.filter(ai_message::Column::ConversationId.eq(conversation_id)) .filter(ai_message::Column::ConversationId.eq(conversation_id))
.filter(ai_message::Column::Role.eq("user")) .filter(ai_message::Column::Role.eq("user"))
.order_by_asc(ai_message::Column::CreatedAt) .order_by_asc(ai_message::Column::CreatedAt)
.one(service.db.reader()).await.ok().flatten(); .one(service.db.reader())
.await
.ok()
.flatten();
if let Some(user_msg) = first_user_msg { if let Some(user_msg) = first_user_msg {
let content = match &user_msg.content { let content = match &user_msg.content {
serde_json::Value::String(s) => s.clone(), serde_json::Value::String(s) => s.clone(),
serde_json::Value::Array(arr) => { serde_json::Value::Array(arr) => arr
arr.first() .first()
.and_then(|f| f.get("content")) .and_then(|f| f.get("content"))
.and_then(|c| c.as_str()) .and_then(|c| c.as_str())
.unwrap_or("") .unwrap_or("")
.to_string() .to_string(),
}
other => other.to_string(), other => other.to_string(),
}; };
@ -323,14 +484,25 @@ pub fn create_chat_sse_stream(
let _ = active.update(service.db.writer()).await; let _ = active.update(service.db.writer()).await;
// Emit title via SSE // Emit title via SSE
let title_payload = serde_json::json!({"title": truncated}).to_string(); let title_payload =
let _ = tx.send(format!("data: {{\"event\":\"title\",\"data\":{}}}\n\n", title_payload)).await; serde_json::json!({"title": truncated}).to_string();
let _ = tx
.send(format!(
"data: {{\"event\":\"title\",\"data\":{}}}\n\n",
title_payload
))
.await;
} }
} }
} else if let Some(title) = &existing_title { } else if let Some(title) = &existing_title {
// Title already set (e.g. by AI tool) — emit it // Title already set (e.g. by AI tool) — emit it
let title_payload = serde_json::json!({"title": title}).to_string(); let title_payload = serde_json::json!({"title": title}).to_string();
let _ = tx.send(format!("data: {{\"event\":\"title\",\"data\":{}}}\n\n", title_payload)).await; let _ = tx
.send(format!(
"data: {{\"event\":\"title\",\"data\":{}}}\n\n",
title_payload
))
.await;
} }
} }
} }
@ -359,7 +531,7 @@ pub fn create_chat_sse_stream(
None => None, None => None,
}; };
if let Some(version_id) = billing_version_id { if let (Some(version_id), Some(_)) = (billing_version_id, conv_project_id) {
match agent::billing::record_ai_usage( match agent::billing::record_ai_usage(
&service.db, &service.db,
project_id, project_id,
@ -392,7 +564,7 @@ pub fn create_chat_sse_stream(
// Broadcast final chat message with token usage // Broadcast final chat message with token usage
let final_msg = ChatMessageEvent { let final_msg = ChatMessageEvent {
message_id: user_message_id, message_id: assistant_msg_id,
conversation_id, conversation_id,
project_id: conv_project_id, project_id: conv_project_id,
sender_id: Uuid::nil(), sender_id: Uuid::nil(),
@ -406,10 +578,13 @@ pub fn create_chat_sse_stream(
let _ = queue.publish_chat_message(&final_msg).await; let _ = queue.publish_chat_message(&final_msg).await;
// Send final SSE done event // Send final SSE done event
let _ = tx.send("data: {\"event\":\"done\",\"data\":\"ok\"}\n\n".to_string()).await; let _ = tx
.send("data: {\"event\":\"done\",\"data\":\"ok\"}\n\n".to_string())
.await;
} }
Err(e) => { Err(e) => {
let _ = tx.send(format!("data: {{\"event\":\"error\",\"data\":\"{}\"}}\n\n", e)).await; let payload = serde_json::json!({"event":"error","data": e.to_string()});
let _ = tx.send(format!("data: {}\n\n", payload)).await;
} }
} }
}); });
@ -427,20 +602,23 @@ async fn update_conversation_after_response(
use sea_orm::EntityTrait; use sea_orm::EntityTrait;
if let Ok(Some(conv)) = ai_conversation::Entity::find_by_id(conversation_id) if let Ok(Some(conv)) = ai_conversation::Entity::find_by_id(conversation_id)
.one(service.db.reader()).await .one(service.db.reader())
.await
{ {
let input_tokens = assistant_msg.input_tokens.unwrap_or(0) as i64; let input_tokens = assistant_msg.input_tokens.unwrap_or(0) as i64;
let output_tokens = assistant_msg.output_tokens.unwrap_or(0) as i64; let output_tokens = assistant_msg.output_tokens.unwrap_or(0) as i64;
let total_tokens = input_tokens + output_tokens; let total_tokens = input_tokens + output_tokens;
let previous_token_total = conv.token_usage_total.unwrap_or(0);
let mut active: ai_conversation::ActiveModel = conv.into(); let mut active: ai_conversation::ActiveModel = conv.into();
if let Ok(count) = AiMessage::find() if let Ok(count) = AiMessage::find()
.filter(ai_message::Column::ConversationId.eq(conversation_id)) .filter(ai_message::Column::ConversationId.eq(conversation_id))
.count(service.db.reader()).await .count(service.db.reader())
.await
{ {
active.message_count = Set(count as i32); active.message_count = Set(count as i32);
} }
active.token_usage_total = Set(Some(total_tokens as i32)); active.token_usage_total = Set(Some(previous_token_total + total_tokens as i32));
active.updated_at = Set(chrono::Utc::now()); active.updated_at = Set(chrono::Utc::now());
let _ = active.update(service.db.writer()).await; let _ = active.update(service.db.writer()).await;
} }
@ -471,12 +649,15 @@ async fn build_messages_from_history(
// For user/system messages: take the first block's content // For user/system messages: take the first block's content
if role == "assistant" { if role == "assistant" {
arr.iter() arr.iter()
.filter(|item| item.get("role").and_then(|r| r.as_str()) != Some("thinking")) .filter(|item| {
item.get("role").and_then(|r| r.as_str()) != Some("thinking")
})
.filter_map(|item| item.get("content").and_then(|c| c.as_str())) .filter_map(|item| item.get("content").and_then(|c| c.as_str()))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join("\n") .join("\n")
} else if let Some(first) = arr.first() { } else if let Some(first) = arr.first() {
first.get("content") first
.get("content")
.and_then(|c| c.as_str()) .and_then(|c| c.as_str())
.unwrap_or("") .unwrap_or("")
.to_string() .to_string()
@ -506,7 +687,9 @@ async fn build_messages_from_history(
fn merge_consecutive_blocks(blocks: Vec<(String, String)>) -> Vec<(String, String)> { fn merge_consecutive_blocks(blocks: Vec<(String, String)>) -> Vec<(String, String)> {
let mut merged: Vec<(String, String)> = Vec::new(); let mut merged: Vec<(String, String)> = Vec::new();
for (role, content) in blocks { for (role, content) in blocks {
if content.is_empty() { continue; } if content.is_empty() {
continue;
}
if let Some(last) = merged.last_mut() { if let Some(last) = merged.last_mut() {
if last.0 == role { if last.0 == role {
last.1.push_str(&content); last.1.push_str(&content);

View File

@ -5,7 +5,7 @@
//! stream chunks to connected clients. This enables multiple viewers to watch //! stream chunks to connected clients. This enables multiple viewers to watch
//! the same AI conversation in real-time. //! the same AI conversation in real-time.
use actix_web::{web, HttpResponse, Result}; use actix_web::{HttpResponse, Result, web};
use futures::StreamExt; use futures::StreamExt;
use service::AppService; use service::AppService;
use std::pin::Pin; use std::pin::Pin;
@ -34,10 +34,12 @@ pub fn create_watch_sse_stream(
let nats = match &service.queue_producer.nats { let nats = match &service.queue_producer.nats {
Some(n) => n.clone(), Some(n) => n.clone(),
None => { None => {
let _ = tx.send(format!( let _ = tx
"data: {{\"event\":\"error\",\"data\":{}}}\n\n", .send(format!(
serde_json::to_string("NATS not available").unwrap_or_default() "data: {{\"event\":\"error\",\"data\":{}}}\n\n",
)).await; serde_json::to_string("NATS not available").unwrap_or_default()
))
.await;
return; return;
} }
}; };
@ -47,10 +49,12 @@ pub fn create_watch_sse_stream(
let mut chunk_sub = match nats.subscribe(&chunk_subject).await { let mut chunk_sub = match nats.subscribe(&chunk_subject).await {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
let _ = tx.send(format!( let _ = tx
"data: {{\"event\":\"error\",\"data\":{}}}\n\n", .send(format!(
serde_json::to_string(&e.to_string()).unwrap_or_default() "data: {{\"event\":\"error\",\"data\":{}}}\n\n",
)).await; serde_json::to_string(&e.to_string()).unwrap_or_default()
))
.await;
return; return;
} }
}; };
@ -60,10 +64,12 @@ pub fn create_watch_sse_stream(
let mut msg_sub = match nats.subscribe(&msg_subject).await { let mut msg_sub = match nats.subscribe(&msg_subject).await {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
let _ = tx.send(format!( let _ = tx
"data: {{\"event\":\"error\",\"data\":{}}}\n\n", .send(format!(
serde_json::to_string(&e.to_string()).unwrap_or_default() "data: {{\"event\":\"error\",\"data\":{}}}\n\n",
)).await; serde_json::to_string(&e.to_string()).unwrap_or_default()
))
.await;
return; return;
} }
}; };
@ -115,9 +121,9 @@ pub fn create_watch_sse_stream(
} }
}); });
Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).map(|s| { Box::pin(
Ok(actix_web::web::Bytes::from(s)) tokio_stream::wrappers::ReceiverStream::new(rx).map(|s| Ok(actix_web::web::Bytes::from(s))),
})) )
} }
#[utoipa::path( #[utoipa::path(
@ -137,7 +143,9 @@ pub async fn conversation_watch(
session: session::Session, session: session::Session,
path: web::Path<Uuid>, path: web::Path<Uuid>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let user_id = session.user().ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?; let user_id = session
.user()
.ok_or_else(|| ApiError::from(service::error::AppError::Unauthorized))?;
let conversation_id = path.into_inner(); let conversation_id = path.into_inner();
// Verify access (view-only is sufficient) // Verify access (view-only is sufficient)

View File

@ -1,4 +1,4 @@
use actix_web::{http::header, web, HttpRequest, HttpResponse}; use actix_web::{HttpRequest, HttpResponse, http::header, web};
use mime_guess2::MimeGuess; use mime_guess2::MimeGuess;
fn cache_control_header(path: &str) -> &'static str { fn cache_control_header(path: &str) -> &'static str {
@ -68,7 +68,9 @@ fn content_type_for_path(path: &str) -> String {
_ => {} _ => {}
} }
} }
MimeGuess::from_path(path).first_or_octet_stream().to_string() MimeGuess::from_path(path)
.first_or_octet_stream()
.to_string()
} }
/// Build an HttpResponse for the given asset. /// Build an HttpResponse for the given asset.

View File

@ -7,13 +7,15 @@ use session::Session;
fn sanitize_repo_path(path: &str) -> Result<String, ApiError> { fn sanitize_repo_path(path: &str) -> Result<String, ApiError> {
if path.contains("..") || path.contains('~') { if path.contains("..") || path.contains('~') {
return Err(ApiError(service::error::AppError::BadRequest( return Err(ApiError(service::error::AppError::BadRequest(
"Invalid repository path".to_string() "Invalid repository path".to_string(),
))); )));
} }
if path.starts_with('/') || path.starts_with('\\') || if path.starts_with('/')
(path.len() >= 3 && path.as_bytes()[1] == b':' && path.as_bytes()[2] == b'\\') { || path.starts_with('\\')
|| (path.len() >= 3 && path.as_bytes()[1] == b':' && path.as_bytes()[2] == b'\\')
{
return Err(ApiError(service::error::AppError::BadRequest( return Err(ApiError(service::error::AppError::BadRequest(
"Absolute paths are not allowed".to_string() "Absolute paths are not allowed".to_string(),
))); )));
} }
Ok(path.to_string()) Ok(path.to_string())

View File

@ -1,7 +1,7 @@
use crate::{ApiResponse, error::ApiError}; use crate::{ApiResponse, error::ApiError};
use actix_web::{HttpResponse, Result, web}; use actix_web::{HttpResponse, Result, web};
use service::issue::IssueAddLabelsByNamesRequest;
use service::AppService; use service::AppService;
use service::issue::IssueAddLabelsByNamesRequest;
use session::Session; use session::Session;
#[utoipa::path( #[utoipa::path(

View File

@ -20,4 +20,4 @@ pub mod user;
#[allow(dead_code)] #[allow(dead_code)]
mod frontend; mod frontend;
pub use error::{api_success, ApiError, ApiResponse}; pub use error::{ApiError, ApiResponse, api_success};

View File

@ -743,7 +743,7 @@ use utoipa::OpenApi;
crate::chat::handlers::types::MessageContent, crate::chat::handlers::types::MessageContent,
crate::chat::handlers::types::MessageResponse, crate::chat::handlers::types::MessageResponse,
crate::chat::handlers::types::ShareResponse, crate::chat::handlers::types::ShareResponse,
crate::chat::handlers::fork::ForkResponse, crate::chat::handlers::fork::ForkConversationResponse,
) )
), ),
tags( tags(

View File

@ -1,4 +1,4 @@
use actix_web::{web, HttpResponse}; use actix_web::{HttpResponse, web};
use service::AppService; use service::AppService;
/// Serves robots.txt, blocking all sensitive paths from crawlers. /// Serves robots.txt, blocking all sensitive paths from crawlers.

View File

@ -191,10 +191,7 @@ pub fn init_room_routes(cfg: &mut web::ServiceConfig) {
web::delete().to(draft_and_history::draft_clear), web::delete().to(draft_and_history::draft_clear),
) )
// file upload // file upload
.route( .route("/rooms/{room_id}/upload", web::post().to(upload::upload))
"/rooms/{room_id}/upload",
web::post().to(upload::upload),
)
.route( .route(
"/rooms/{room_id}/attachments/{attachment_id}", "/rooms/{room_id}/attachments/{attachment_id}",
web::get().to(upload::get_attachment), web::get().to(upload::get_attachment),

View File

@ -1,11 +1,11 @@
use crate::{ApiResponse, error::ApiError}; use crate::{ApiResponse, error::ApiError};
use actix_web::{HttpResponse, Result, web}; use actix_web::{HttpResponse, Result, web};
use room::presence::PresenceChanged;
use room::ws_context::WsUserContext; use room::ws_context::WsUserContext;
use service::AppService; use service::AppService;
use session::Session; use session::Session;
use utoipa::IntoParams; use utoipa::IntoParams;
use uuid::Uuid; use uuid::Uuid;
use room::presence::PresenceChanged;
#[derive(Debug, serde::Deserialize, IntoParams)] #[derive(Debug, serde::Deserialize, IntoParams)]
pub struct RoomListQuery { pub struct RoomListQuery {

View File

@ -1,6 +1,6 @@
use actix_multipart::Multipart; use actix_multipart::Multipart;
use actix_web::{HttpResponse, Result, web};
use actix_web::http::header::{CONTENT_DISPOSITION, CONTENT_TYPE}; use actix_web::http::header::{CONTENT_DISPOSITION, CONTENT_TYPE};
use actix_web::{HttpResponse, Result, web};
use chrono::Utc; use chrono::Utc;
use futures_util::StreamExt; use futures_util::StreamExt;
use models::rooms::room_attachment; use models::rooms::room_attachment;
@ -48,14 +48,11 @@ pub async fn upload(
.user() .user()
.ok_or_else(|| crate::error::ApiError(service::error::AppError::Unauthorized))?; .ok_or_else(|| crate::error::ApiError(service::error::AppError::Unauthorized))?;
let storage = service let storage = service.storage.as_ref().ok_or_else(|| {
.storage crate::error::ApiError(service::error::AppError::BadRequest(
.as_ref() "Storage not configured".to_string(),
.ok_or_else(|| { ))
crate::error::ApiError(service::error::AppError::BadRequest( })?;
"Storage not configured".to_string(),
))
})?;
let room_id = path.into_inner(); let room_id = path.into_inner();
service service
@ -125,14 +122,9 @@ pub async fn upload(
let key = format!("rooms/{}/{}", room_id, unique_name); let key = format!("rooms/{}/{}", room_id, unique_name);
let file_size = file_data.len() as i64; let file_size = file_data.len() as i64;
let _url = storage let _url = storage.upload(&key, file_data).await.map_err(|e| {
.upload(&key, file_data) crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string()))
.await })?;
.map_err(|e| {
crate::error::ApiError(service::error::AppError::InternalServerError(
e.to_string(),
))
})?;
// Write to room_attachment table (message will be linked when message is created) // Write to room_attachment table (message will be linked when message is created)
let attachment_id = Uuid::now_v7(); let attachment_id = Uuid::now_v7();
@ -147,14 +139,9 @@ pub async fn upload(
s3_key: Set(key), s3_key: Set(key),
created_at: Set(Utc::now()), created_at: Set(Utc::now()),
}; };
attachment attachment.insert(&service.db).await.map_err(|e| {
.insert(&service.db) crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string()))
.await })?;
.map_err(|e| {
crate::error::ApiError(service::error::AppError::InternalServerError(
e.to_string(),
))
})?;
// Return the structured attachment URL instead of the /files/... path // Return the structured attachment URL instead of the /files/... path
// (the /files/... path has no handler on the API server) // (the /files/... path has no handler on the API server)
@ -205,30 +192,35 @@ pub async fn get_attachment(
let attachment = room_attachment::Entity::find_by_id(attachment_id) let attachment = room_attachment::Entity::find_by_id(attachment_id)
.one(&service.db) .one(&service.db)
.await .await
.map_err(|e| crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string())))? .map_err(|e| {
.ok_or_else(|| crate::error::ApiError(service::error::AppError::NotFound("attachment not found".into())))?; crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string()))
})?
.ok_or_else(|| {
crate::error::ApiError(service::error::AppError::NotFound(
"attachment not found".into(),
))
})?;
// Ensure the attachment belongs to the requested room // Ensure the attachment belongs to the requested room
if attachment.room != room_id { if attachment.room != room_id {
return Err(crate::error::ApiError(service::error::AppError::NotFound("attachment not found".into()))); return Err(crate::error::ApiError(service::error::AppError::NotFound(
"attachment not found".into(),
)));
} }
let storage = service let storage = service.storage.as_ref().ok_or_else(|| {
.storage crate::error::ApiError(service::error::AppError::InternalServerError(
.as_ref() "Storage not configured".to_string(),
.ok_or_else(|| crate::error::ApiError(service::error::AppError::InternalServerError("Storage not configured".to_string())))?; ))
})?;
let (data, content_type) = storage let (data, content_type) = storage.read(&attachment.s3_key).await.map_err(|e| {
.read(&attachment.s3_key) crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string()))
.await })?;
.map_err(|e| crate::error::ApiError(service::error::AppError::InternalServerError(e.to_string())))?;
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.content_type(content_type.clone()) .content_type(content_type.clone())
.insert_header(( .insert_header((CONTENT_TYPE, content_type))
CONTENT_TYPE,
content_type,
))
.insert_header(( .insert_header((
CONTENT_DISPOSITION, CONTENT_DISPOSITION,
format!("inline; filename=\"{}\"", attachment.file_name), format!("inline; filename=\"{}\"", attachment.file_name),

View File

@ -2,7 +2,9 @@ use crate::ApiResponse;
use crate::error::ApiError; use crate::error::ApiError;
use actix_web::{HttpResponse, Result, web}; use actix_web::{HttpResponse, Result, web};
use service::AppService; use service::AppService;
use service::search::{GlobalMessageSearchQuery, GlobalMessageSearchResponse, SearchQuery, SearchResponse}; use service::search::{
GlobalMessageSearchQuery, GlobalMessageSearchResponse, SearchQuery, SearchResponse,
};
use session::Session; use session::Session;
#[utoipa::path( #[utoipa::path(
@ -52,6 +54,8 @@ pub async fn search_messages(
session: Session, session: Session,
query: web::Query<GlobalMessageSearchQuery>, query: web::Query<GlobalMessageSearchQuery>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let resp = service.global_message_search(&session, query.into_inner()).await?; let resp = service
.global_message_search(&session, query.into_inner())
.await?;
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(resp).to_response())
} }

View File

@ -1,4 +1,4 @@
use actix_web::{web, HttpResponse}; use actix_web::{HttpResponse, web};
use db::cache::AppCache; use db::cache::AppCache;
use models::projects::project::{Column as PCol, Entity as PEntity}; use models::projects::project::{Column as PCol, Entity as PEntity};
use models::repos::repo::{Column as RCol, Entity as REntity}; use models::repos::repo::{Column as RCol, Entity as REntity};

View File

@ -38,9 +38,7 @@ pub async fn skill_list(
query: web::Query<SkillQuery>, query: web::Query<SkillQuery>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let project_name = path.into_inner(); let project_name = path.into_inner();
let project = service let project = service.project_info(&session, project_name.clone()).await?;
.project_info(&session, project_name.clone())
.await?;
let q = service::skill::info::SkillListQuery { let q = service::skill::info::SkillListQuery {
source: query.source.clone(), source: query.source.clone(),
@ -73,14 +71,9 @@ pub async fn skill_get(
session: Session, session: Session,
path: web::Path<SkillPath>, path: web::Path<SkillPath>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let SkillPath { let SkillPath { project_name, slug } = path.into_inner();
project_name,
slug,
} = path.into_inner();
let project = service let project = service.project_info(&session, project_name.clone()).await?;
.project_info(&session, project_name.clone())
.await?;
let skill = service let skill = service
.skill_get(project.uid.to_string(), slug, &session) .skill_get(project.uid.to_string(), slug, &session)
@ -108,9 +101,7 @@ pub async fn skill_create(
body: web::Json<service::skill::manage::CreateSkillRequest>, body: web::Json<service::skill::manage::CreateSkillRequest>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let project_name = path.into_inner(); let project_name = path.into_inner();
let project = service let project = service.project_info(&session, project_name.clone()).await?;
.project_info(&session, project_name.clone())
.await?;
let skill = service let skill = service
.skill_create(project.uid.to_string(), body.into_inner(), &session) .skill_create(project.uid.to_string(), body.into_inner(), &session)
@ -140,14 +131,9 @@ pub async fn skill_update(
path: web::Path<SkillPath>, path: web::Path<SkillPath>,
body: web::Json<service::skill::manage::UpdateSkillRequest>, body: web::Json<service::skill::manage::UpdateSkillRequest>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let SkillPath { let SkillPath { project_name, slug } = path.into_inner();
project_name,
slug,
} = path.into_inner();
let project = service let project = service.project_info(&session, project_name.clone()).await?;
.project_info(&session, project_name.clone())
.await?;
let skill = service let skill = service
.skill_update(project.uid.to_string(), slug, body.into_inner(), &session) .skill_update(project.uid.to_string(), slug, body.into_inner(), &session)
@ -175,14 +161,9 @@ pub async fn skill_delete(
session: Session, session: Session,
path: web::Path<SkillPath>, path: web::Path<SkillPath>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let SkillPath { let SkillPath { project_name, slug } = path.into_inner();
project_name,
slug,
} = path.into_inner();
let project = service let project = service.project_info(&session, project_name.clone()).await?;
.project_info(&session, project_name.clone())
.await?;
let result = service let result = service
.skill_delete(project.uid.to_string(), slug, &session) .skill_delete(project.uid.to_string(), slug, &session)
@ -207,20 +188,17 @@ pub async fn skill_scan(
path: web::Path<String>, path: web::Path<String>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let project_name = path.into_inner(); let project_name = path.into_inner();
let project = service let project = service.project_info(&session, project_name).await?;
.project_info(&session, project_name)
.await?;
let result = service let result = service.skill_scan_repos(project.uid, project.uid).await?;
.skill_scan_repos(project.uid, project.uid)
.await?;
Ok(ApiResponse::ok(ScanResponse { Ok(ApiResponse::ok(ScanResponse {
discovered: result.discovered, discovered: result.discovered,
created: result.created, created: result.created,
updated: result.updated, updated: result.updated,
removed: result.removed, removed: result.removed,
}).to_response()) })
.to_response())
} }
#[derive(serde::Serialize, utoipa::ToSchema)] #[derive(serde::Serialize, utoipa::ToSchema)]
@ -230,4 +208,3 @@ pub struct ScanResponse {
pub updated: i64, pub updated: i64,
pub removed: i64, pub removed: i64,
} }

View File

@ -52,6 +52,8 @@ pub async fn user_billing_history(
session: Session, session: Session,
query: web::Query<service::user::billing::UserBillingHistoryQuery>, query: web::Query<service::user::billing::UserBillingHistoryQuery>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let resp = service.user_billing_history(&session, query.into_inner()).await?; let resp = service
.user_billing_history(&session, query.into_inner())
.await?;
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(resp).to_response())
} }

View File

@ -10,9 +10,9 @@ pub mod repository;
pub mod ssh_key; pub mod ssh_key;
pub mod stars; pub mod stars;
pub mod subscribe; pub mod subscribe;
pub mod summary;
pub mod user_activity; pub mod user_activity;
pub mod user_info; pub mod user_info;
pub mod summary;
use actix_web::web; use actix_web::web;
@ -73,10 +73,7 @@ pub fn init_user_routes(cfg: &mut web::ServiceConfig) {
"/me/heatmap", "/me/heatmap",
web::get().to(chpc::get_my_contribution_heatmap), web::get().to(chpc::get_my_contribution_heatmap),
) )
.route( .route("/me/billing", web::get().to(billing::user_billing))
"/me/billing",
web::get().to(billing::user_billing),
)
.route( .route(
"/me/billing/errors", "/me/billing/errors",
web::get().to(billing::user_billing_errors), web::get().to(billing::user_billing_errors),
@ -99,13 +96,22 @@ pub fn init_user_routes(cfg: &mut web::ServiceConfig) {
web::get().to(profile::get_profile_by_username), web::get().to(profile::get_profile_by_username),
) )
.route("/{username}/info", web::get().to(user_info::get_user_info)) .route("/{username}/info", web::get().to(user_info::get_user_info))
.route("/{username}/summary", web::get().to(summary::get_user_summary)) .route(
"/{username}/summary",
web::get().to(summary::get_user_summary),
)
.route( .route(
"/{username}/heatmap", "/{username}/heatmap",
web::get().to(chpc::get_contribution_heatmap), web::get().to(chpc::get_contribution_heatmap),
) )
.route("/{username}/keys", web::get().to(ssh_key::list_user_ssh_keys)) .route(
.route("/{username}/activity", web::get().to(user_activity::get_user_activity)) "/{username}/keys",
web::get().to(ssh_key::list_user_ssh_keys),
)
.route(
"/{username}/activity",
web::get().to(user_activity::get_user_activity),
)
.route("/{username}/stars", web::get().to(stars::get_user_stars)) .route("/{username}/stars", web::get().to(stars::get_user_stars))
.route( .route(
"/{username}/keys/{key_id}", "/{username}/keys/{key_id}",

View File

@ -1,9 +1,9 @@
use actix_web::{HttpResponse, Result, web}; use actix_web::{HttpResponse, Result, web};
use service::error::AppError;
use service::AppService; use service::AppService;
use service::error::AppError;
use session::Session; use session::Session;
use crate::{error::ApiError, ApiResponse}; use crate::{ApiResponse, error::ApiError};
#[derive(serde::Serialize, utoipa::ToSchema)] #[derive(serde::Serialize, utoipa::ToSchema)]
pub struct VapidKeyResponse { pub struct VapidKeyResponse {
@ -22,9 +22,7 @@ pub struct VapidKeyResponse {
pub async fn get_vapid_public_key( pub async fn get_vapid_public_key(
service: web::Data<AppService>, service: web::Data<AppService>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let public_key = service let public_key = service.config.vapid_public_key();
.config
.vapid_public_key();
let public_key = match public_key { let public_key = match public_key {
Some(k) => k, Some(k) => k,
None => { None => {

View File

@ -126,8 +126,6 @@ pub async fn list_user_ssh_keys(
path: web::Path<String>, path: web::Path<String>,
) -> Result<HttpResponse, ApiError> { ) -> Result<HttpResponse, ApiError> {
let username = path.into_inner(); let username = path.into_inner();
let resp = service let resp = service.user_list_ssh_keys_by_username(username).await?;
.user_list_ssh_keys_by_username(username)
.await?;
Ok(ApiResponse::ok(resp).to_response()) Ok(ApiResponse::ok(resp).to_response())
} }