diff --git a/lib/api/src/auth/login.rs b/lib/api/src/auth/login.rs index 2b48e70..7a6ee6c 100644 --- a/lib/api/src/auth/login.rs +++ b/lib/api/src/auth/login.rs @@ -15,6 +15,7 @@ fn ok() -> Result { responses((status = 200)), tag = "auth" )] +#[tracing::instrument(skip(session, params, service), fields(username = %params.username))] pub async fn login( session: Session, params: web::Json, diff --git a/lib/api/src/auth/register.rs b/lib/api/src/auth/register.rs index 3918cb5..e9a3be8 100644 --- a/lib/api/src/auth/register.rs +++ b/lib/api/src/auth/register.rs @@ -16,6 +16,7 @@ fn ok_json(data: T) -> Result { responses((status = 200)), tag = "auth" )] +#[tracing::instrument(skip(session, params, service), fields(username = %params.username))] pub async fn register( session: Session, params: web::Json, diff --git a/lib/api/src/channel/mod.rs b/lib/api/src/channel/mod.rs index 3597392..e324b83 100644 --- a/lib/api/src/channel/mod.rs +++ b/lib/api/src/channel/mod.rs @@ -1,5 +1,6 @@ pub mod rest; pub mod rest_article; +pub mod rest_attachment; pub mod rest_embed; pub mod rest_interact; pub mod rest_member; @@ -197,10 +198,12 @@ pub fn configure(cfg: &mut ServiceConfig, bus: ChannelBus) { .route(actix_web::web::get().to(rest_article::article_list)), ) .service( - actix_web::web::resource("/channels/{channel_id}/articles/{article_id}") - .route(actix_web::web::get().to(rest_article::article_get)) - .route(actix_web::web::patch().to(rest_article::article_update)) - .route(actix_web::web::delete().to(rest_article::article_delete)), + actix_web::web::resource( + "/channels/{channel_id}/articles/{article_id}", + ) + .route(actix_web::web::get().to(rest_article::article_get)) + .route(actix_web::web::patch().to(rest_article::article_update)) + .route(actix_web::web::delete().to(rest_article::article_delete)), ) .service( actix_web::web::resource("/articles/{article_id}/like") @@ -208,17 +211,30 @@ pub fn configure(cfg: &mut ServiceConfig, bus: ChannelBus) { ) .service( actix_web::web::resource("/articles/{article_id}/comments") - .route(actix_web::web::post().to(rest_article::article_comment_create)) - .route(actix_web::web::get().to(rest_article::article_comment_list)), + .route( + actix_web::web::post().to(rest_article::article_comment_create), + ) + .route( + actix_web::web::get().to(rest_article::article_comment_list), + ), ) .service( - actix_web::web::resource("/articles/{article_id}/comments/{comment_id}") - .route(actix_web::web::delete().to(rest_article::article_comment_delete)), + actix_web::web::resource( + "/articles/{article_id}/comments/{comment_id}", + ) + .route( + actix_web::web::delete().to(rest_article::article_comment_delete), + ), ) .service( actix_web::web::resource("/articles/{article_id}/likes") .route(actix_web::web::get().to(rest_article::article_liked_users)), ); + cfg.service( + actix_web::web::resource("/rooms/{room_id}/attachments").route( + actix_web::web::post().to(rest_attachment::upload_attachment), + ), + ); cfg.service( actix_web::web::resource("/embed/twitter") .route(actix_web::web::get().to(rest_embed::twitter_oembed)), diff --git a/lib/api/src/channel/rest_attachment.rs b/lib/api/src/channel/rest_attachment.rs new file mode 100644 index 0000000..2050911 --- /dev/null +++ b/lib/api/src/channel/rest_attachment.rs @@ -0,0 +1,91 @@ +use actix_web::{HttpRequest, HttpResponse, web}; +use channel::ChannelError; +use uuid::Uuid; + +use super::rest::{channel_err, extract_user}; +use crate::error::ApiError; + +#[derive(serde::Deserialize, utoipa::IntoParams)] +pub struct UploadParams { + pub filename: Option, +} + +#[utoipa::path( + post, + path = "/api/v1/ws/rooms/{room_id}/attachments", + params(UploadParams), + request_body(content = String, description = "Raw file bytes", content_type = "application/octet-stream"), + responses((status = 201, description = "Attachment uploaded")), + tag = "channel", +)] +pub async fn upload_attachment( + req: HttpRequest, + room_id: web::Path, + params: web::Query, + body: web::Bytes, + bus: web::Data, +) -> Result { + let user_id = extract_user(&req)?; + let room_id = room_id.into_inner(); + + // Ensure room access + let msg = channel::http::WsInMessage::RoomGet { room: room_id }; + channel::http::WsHandler::handle(&bus, user_id, msg) + .await + .map_err(channel_err)?; + + let filename = params + .filename + .clone() + .unwrap_or_else(|| Uuid::new_v4().to_string()); + + let content_type = req + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + let attachment_id = Uuid::now_v7(); + + // Upload to CDN + let stored = bus + .inner + .cdn + .upload_file( + room_id, + attachment_id, + &body, + &filename, + content_type.clone(), + ) + .await + .map_err(channel_err)?; + + // Generate public URL for the file + let public_url = bus.inner.cdn.public_url(&stored.key).ok().flatten(); + + // Insert room_attachment row (message = null until message is created) + db::sqlx::query( + "INSERT INTO room_attachment (id, message, seq, file_name, content_type, \ + size_bytes, storage_key, url, uploaded_by, created_at) \ + VALUES ($1, NULL, 0, $2, $3, $4, $5, $6, $7, now())", + ) + .bind(attachment_id) + .bind(&filename) + .bind(&content_type) + .bind(stored.size) + .bind(&stored.key) + .bind(&public_url) + .bind(user_id) + .execute(bus.inner.db.writer()) + .await + .map_err(|e| channel_err(ChannelError::from(e)))?; + + Ok(HttpResponse::Created().json(serde_json::json!({ + "id": attachment_id, + "filename": filename, + "content_type": content_type, + "size": stored.size, + "url": public_url, + }))) +} diff --git a/lib/api/src/channel/rest_message.rs b/lib/api/src/channel/rest_message.rs index 19dc5e0..ee15c8d 100644 --- a/lib/api/src/channel/rest_message.rs +++ b/lib/api/src/channel/rest_message.rs @@ -13,6 +13,7 @@ pub struct CreateMessageRequest { pub content_type: Option, pub thread: Option, pub in_reply_to: Option, + pub attachment_ids: Option>, } #[derive(Debug, Deserialize, utoipa::ToSchema)] @@ -68,6 +69,7 @@ pub async fn create_message( content_type: body.content_type.clone(), thread: body.thread, in_reply_to: body.in_reply_to, + attachment_ids: body.attachment_ids.clone(), }; let result = WsHandler::handle(&bus, user_id, msg) .await diff --git a/lib/api/src/git/embed.rs b/lib/api/src/git/embed.rs new file mode 100644 index 0000000..3c71057 --- /dev/null +++ b/lib/api/src/git/embed.rs @@ -0,0 +1,38 @@ +use actix_web::{HttpResponse, web}; +use serde::{Deserialize, Serialize}; +use service::AppService; +use session::Session; + +use crate::error::ApiError; + +fn ok_json(data: T) -> Result { + Ok(HttpResponse::Ok().json(data)) +} + +#[derive(Deserialize, utoipa::IntoParams)] +pub struct WkRepoPath { + pub wk: String, + pub repo: String, +} + +#[utoipa::path( + get, + path = "/api/v1/workspace/{wk}/repos/{repo}/embed-card", + params(WkRepoPath), + responses( + (status = 200, description = "Aggregated repo card data for channel embed"), + (status = 401, description = "Unauthorized"), + (status = 403, description = "Not member"), + (status = 404, description = "Repo not found"), + ), + security(("session" = [])) +)] +pub async fn repo_embed_card( + session: Session, + service: web::Data, + path: web::Path, +) -> Result { + let WkRepoPath { wk, repo } = path.into_inner(); + let data = service.repo_embed_card(&session, &wk, &repo).await?; + ok_json(data) +} diff --git a/lib/api/src/git/init.rs b/lib/api/src/git/init.rs index 0347b97..aa8d05f 100644 --- a/lib/api/src/git/init.rs +++ b/lib/api/src/git/init.rs @@ -24,6 +24,7 @@ pub struct WkPath { (status = 403, description = "Permission denied"), (status = 409, description = "Repo name exists")), security(("session" = [])) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn create_repo( session: Session, service: web::Data, diff --git a/lib/api/src/git/mod.rs b/lib/api/src/git/mod.rs index 1af45f4..ff717ef 100644 --- a/lib/api/src/git/mod.rs +++ b/lib/api/src/git/mod.rs @@ -9,6 +9,7 @@ pub mod contents; pub mod contributor; pub mod diff; pub mod dto; +pub mod embed; pub mod fork; pub mod init; pub mod language; @@ -33,6 +34,10 @@ pub fn configure(cfg: &mut ServiceConfig) { cfg.service( web::resource("/clone").route(web::post().to(init::clone_repo)), ); + cfg.service( + web::resource("/{repo}/embed-card") + .route(web::get().to(embed::repo_embed_card)), + ); cfg.service( web::resource("/{repo}") .route(web::get().to(repo::get_repo)) diff --git a/lib/api/src/git/release.rs b/lib/api/src/git/release.rs index 9de77ce..6f7963f 100644 --- a/lib/api/src/git/release.rs +++ b/lib/api/src/git/release.rs @@ -107,6 +107,7 @@ pub async fn get_release_by_tag( responses((status = 201, body = ReleaseResponse)), security(("session" = [])) )] +#[tracing::instrument(skip(session, service, path, body))] pub async fn create_release( session: Session, service: web::Data, diff --git a/lib/api/src/git/repo.rs b/lib/api/src/git/repo.rs index c177df4..323f5e1 100644 --- a/lib/api/src/git/repo.rs +++ b/lib/api/src/git/repo.rs @@ -16,7 +16,7 @@ fn ok() -> Result { Ok(HttpResponse::Ok().finish()) } -#[derive(Deserialize, utoipa::IntoParams)] +#[derive(Debug, Deserialize, utoipa::IntoParams)] pub struct WkRepoPath { pub wk: String, pub repo: String, @@ -35,6 +35,7 @@ pub struct WkRepoPath { ), security(("session" = [])) )] +#[tracing::instrument(skip(session, service, filter, pagination), fields(workspace = %path))] pub async fn list_repos( session: Session, service: web::Data, @@ -59,6 +60,7 @@ pub async fn list_repos( ), security(("session" = [])) )] +#[tracing::instrument(skip(session, service), fields(workspace = %path.wk, repo = %path.repo))] pub async fn get_repo( session: Session, service: web::Data, @@ -81,6 +83,7 @@ pub async fn get_repo( ), security(("session" = [])) )] +#[tracing::instrument(skip(session, service, params), fields(workspace = %path.wk, repo = %path.repo))] pub async fn update_repo( session: Session, service: web::Data, @@ -124,6 +127,7 @@ pub async fn archive_repo( ), security(("session" = [])) )] +#[tracing::instrument(skip(session, service), fields(workspace = %path.wk, repo = %path.repo))] pub async fn delete_repo( session: Session, service: web::Data, @@ -146,6 +150,7 @@ pub async fn delete_repo( ), security(("session" = [])) )] +#[tracing::instrument(skip(session, service, params), fields(workspace = %path.wk, repo = %path.repo, target = %params.target_workspace))] pub async fn transfer_repo( session: Session, service: web::Data, diff --git a/lib/api/src/git/webhook.rs b/lib/api/src/git/webhook.rs index 40b0995..723e4fc 100644 --- a/lib/api/src/git/webhook.rs +++ b/lib/api/src/git/webhook.rs @@ -60,6 +60,7 @@ pub async fn list_webhooks( ), security(("session" = [])) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn create_webhook( session: Session, service: web::Data, diff --git a/lib/api/src/issues/issue.rs b/lib/api/src/issues/issue.rs index 67b27f6..f6c46e6 100644 --- a/lib/api/src/issues/issue.rs +++ b/lib/api/src/issues/issue.rs @@ -40,6 +40,7 @@ pub struct IssuePath { ("session" = []) ) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn create_issue( session: Session, service: web::Data, @@ -117,6 +118,7 @@ pub async fn get_issue( ("session" = []) ) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn update_issue( session: Session, service: web::Data, @@ -165,6 +167,7 @@ pub async fn delete_issue( ("session" = []) ) )] +#[tracing::instrument(skip(session, service, path))] pub async fn close_issue( session: Session, service: web::Data, @@ -187,6 +190,7 @@ pub async fn close_issue( ("session" = []) ) )] +#[tracing::instrument(skip(session, service, path))] pub async fn reopen_issue( session: Session, service: web::Data, diff --git a/lib/api/src/lib.rs b/lib/api/src/lib.rs index dbd048d..fa05278 100644 --- a/lib/api/src/lib.rs +++ b/lib/api/src/lib.rs @@ -5,6 +5,7 @@ pub mod channel; pub mod error; pub mod git; pub mod issues; +pub mod metrics; pub mod openapi; pub mod pull_request; pub mod search; @@ -15,6 +16,7 @@ pub mod workspace; use actix_web::web::{self, ServiceConfig}; pub fn configure(cfg: &mut ServiceConfig, channel_bus: channel::ChannelBus) { + cfg.route("/metrics", web::get().to(metrics::metrics)); cfg.service( web::scope("/api/v1") .configure(auth::configure) diff --git a/lib/api/src/metrics.rs b/lib/api/src/metrics.rs new file mode 100644 index 0000000..7527b3b --- /dev/null +++ b/lib/api/src/metrics.rs @@ -0,0 +1,15 @@ +use actix_web::{HttpResponse, web}; +use service::AppService; + +/// Expose Prometheus text-format metrics. +/// Scraped by Prometheus or any compatible collector. +pub async fn metrics(service: web::Data) -> HttpResponse { + match service.metrics_registry.encode() { + Ok(body) => HttpResponse::Ok() + .content_type("text/plain; version=0.0.4") + .body(body), + Err(e) => HttpResponse::InternalServerError() + .content_type("text/plain") + .body(format!("metrics encoding error: {e}")), + } +} diff --git a/lib/api/src/openapi.rs b/lib/api/src/openapi.rs index ecd1617..22ebac6 100644 --- a/lib/api/src/openapi.rs +++ b/lib/api/src/openapi.rs @@ -398,5 +398,7 @@ impl Modify for SecurityAddon { } pub fn openapi_json() -> String { - ApiDoc::openapi().to_pretty_json().unwrap() + ApiDoc::openapi() + .to_pretty_json() + .expect("OpenAPI spec serialization should never fail") } diff --git a/lib/api/src/pull_request/merge.rs b/lib/api/src/pull_request/merge.rs index eb1933a..a4cca48 100644 --- a/lib/api/src/pull_request/merge.rs +++ b/lib/api/src/pull_request/merge.rs @@ -53,6 +53,7 @@ pub async fn merge_analysis( ), security(("session" = [])) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn merge_pr( session: Session, service: web::Data, diff --git a/lib/api/src/pull_request/pull_request.rs b/lib/api/src/pull_request/pull_request.rs index 16b444d..9b8fe60 100644 --- a/lib/api/src/pull_request/pull_request.rs +++ b/lib/api/src/pull_request/pull_request.rs @@ -46,6 +46,7 @@ pub struct PrRepoPath { ), security(("session" = [])) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn create_pr( session: Session, service: web::Data, diff --git a/lib/api/src/workspace/join.rs b/lib/api/src/workspace/join.rs index 7d87635..63abfc2 100644 --- a/lib/api/src/workspace/join.rs +++ b/lib/api/src/workspace/join.rs @@ -86,6 +86,7 @@ pub async fn update_join_strategy( ("session" = []) ) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn apply_join( session: Session, service: web::Data, @@ -183,6 +184,7 @@ pub async fn list_join_applies( ("session" = []) ) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn approve_join( session: Session, service: web::Data, diff --git a/lib/api/src/workspace/workspace.rs b/lib/api/src/workspace/workspace.rs index fcf443d..363246f 100644 --- a/lib/api/src/workspace/workspace.rs +++ b/lib/api/src/workspace/workspace.rs @@ -27,6 +27,7 @@ fn ok_json(data: T) -> Result { ("session" = []) ) )] +#[tracing::instrument(skip(session, service, params))] pub async fn create_workspace( session: Session, service: web::Data, @@ -96,6 +97,7 @@ pub async fn get_workspace( ("session" = []) ) )] +#[tracing::instrument(skip(session, service, path, params))] pub async fn update_workspace( session: Session, service: web::Data, diff --git a/lib/service/agent/run.rs b/lib/service/agent/run.rs index e6dd7c5..20be119 100644 --- a/lib/service/agent/run.rs +++ b/lib/service/agent/run.rs @@ -116,6 +116,7 @@ impl AppService { .await?; let invocation_id = Uuid::now_v7(); + let model_name = ctx.provider_model_name.clone(); info!( invocation_id = %invocation_id, session_id = %ctx.session_id, @@ -144,6 +145,7 @@ impl AppService { match result { Ok(output) => { + self.metrics.record_ai_run(&model_name, "completed"); let message_id = self .persist_assistant_message( conversation_id, @@ -247,6 +249,7 @@ impl AppService { }) } Err(e) => { + self.metrics.record_ai_run(&model_name, "failed"); warn!( invocation_id = %invocation_id, error = %e, diff --git a/lib/service/agent/sse.rs b/lib/service/agent/sse.rs index 530c2ce..bf046d7 100644 --- a/lib/service/agent/sse.rs +++ b/lib/service/agent/sse.rs @@ -82,6 +82,7 @@ impl AppService { let invocation_id = Uuid::now_v7(); let ctx_clone = ctx.clone(); + let model_name = ctx.provider_model_name.clone(); let self_clone = self.clone(); info!( @@ -180,6 +181,25 @@ impl AppService { match agent_result { Ok(result) => { + self_clone.metrics.record_ai_run(&model_name, "completed"); + self_clone.metrics.record_ai_token_usage( + &model_name, + result.input_tokens, + result.output_tokens, + ); + for step in &result.steps { + for tc in &step.tool_calls { + let status = if tc.error.is_some() { + "error" + } else { + "success" + }; + self_clone + .metrics + .record_ai_tool_call(&tc.name, status); + } + } + let reasoning_content: Option = { let collected: Vec = result .steps @@ -334,6 +354,7 @@ impl AppService { } } Err(e) => { + self_clone.metrics.record_ai_run(&model_name, "failed"); warn!(invocation_id = %invocation_id, error = %e, "agent sse stream failed"); let _ = tx .send(super::persistence::stream_error(&e.to_string())); diff --git a/lib/service/auth/login.rs b/lib/service/auth/login.rs index d8135fe..54ad769 100644 --- a/lib/service/auth/login.rs +++ b/lib/service/auth/login.rs @@ -19,6 +19,7 @@ pub struct LoginParams { impl AppService { pub const TOTP_KEY: &'static str = "totp_key"; + #[tracing::instrument(skip(self, params, context), fields(username = %params.username, ip = ?context.ip_address()))] pub async fn auth_login( &self, params: LoginParams, @@ -35,6 +36,10 @@ impl AppService { Err(_) => { let _ = Argon2::default() .hash_password(password.as_bytes()); + self.metrics + .auth_login_total + .with_label_values(&["user_not_found"]) + .inc(); return Err(AppError::UserNotFound); } } @@ -58,6 +63,10 @@ impl AppService { .is_err() { tracing::warn!(username = %params.username, ip = ?context.ip_address(), "Login failed: invalid password"); + self.metrics + .auth_login_total + .with_label_values(&["invalid_password"]) + .inc(); return Err(AppError::UserNotFound); } @@ -84,6 +93,14 @@ impl AppService { .await .map_err(|e| AppError::InternalServerError(e.to_string()))?; tracing::info!(username = %params.username, ip = ?context.ip_address(), "Login 2FA triggered"); + self.metrics + .auth_login_total + .with_label_values(&["2fa_required"]) + .inc(); + self.metrics + .auth_2fa_triggered_total + .with_label_values(&[]) + .inc(); return Err(AppError::TwoFactorRequired); } @@ -99,6 +116,10 @@ impl AppService { context.remove(Self::RSA_PRIVATE_KEY); context.remove(Self::RSA_PUBLIC_KEY); tracing::info!(user_uid = %user.id, username = %user.username, ip = ?context.ip_address(), "User logged in successfully"); + self.metrics + .auth_login_total + .with_label_values(&["success"]) + .inc(); Ok(()) } diff --git a/lib/service/auth/register.rs b/lib/service/auth/register.rs index e24d7bb..f610408 100644 --- a/lib/service/auth/register.rs +++ b/lib/service/auth/register.rs @@ -15,6 +15,7 @@ pub struct RegisterParams { } impl AppService { + #[tracing::instrument(skip(self, params, context), fields(username = %params.username))] pub async fn auth_register( &self, params: RegisterParams, @@ -31,6 +32,10 @@ impl AppService { let email_exists = self.auth_find_user_by_email(¶ms.email).await.is_ok(); if username_exists || email_exists { + self.metrics + .auth_register_total + .with_label_values(&["already_exists"]) + .inc(); return Err(AppError::AccountAlreadyExists); } @@ -87,6 +92,10 @@ impl AppService { context.remove(Self::RSA_PRIVATE_KEY); context.remove(Self::RSA_PUBLIC_KEY); tracing::info!(user_uid = %user_id, username = %user.username, "User registered successfully"); + self.metrics + .auth_register_total + .with_label_values(&["success"]) + .inc(); Ok(user) } } diff --git a/lib/service/auth/reset_pass.rs b/lib/service/auth/reset_pass.rs index 9fc7e49..8c2788a 100644 --- a/lib/service/auth/reset_pass.rs +++ b/lib/service/auth/reset_pass.rs @@ -51,6 +51,10 @@ impl AppService { .await { tracing::error!(error = %e, user_uid = %user.id, "Failed to cache reset token"); + self.metrics + .auth_password_reset_total + .with_label_values(&["cache_error"]) + .inc(); return Ok(()); } @@ -58,6 +62,10 @@ impl AppService { Ok(d) => d, Err(e) => { tracing::error!(error = %e, "Domain not configured for password reset"); + self.metrics + .auth_password_reset_total + .with_label_values(&["config_error"]) + .inc(); return Ok(()); } }; @@ -79,6 +87,15 @@ impl AppService { .await { tracing::error!(error = %e, email = %params.email, "Failed to queue password reset email"); + self.metrics + .auth_password_reset_total + .with_label_values(&["queue_error"]) + .inc(); + } else { + self.metrics + .auth_password_reset_total + .with_label_values(&["request_success"]) + .inc(); } tracing::info!(email = %params.email, user_uid = %user.id, "Password reset email queued"); @@ -93,6 +110,10 @@ impl AppService { params: ResetPasswordVerifyParams, ) -> Result<(), AppError> { if params.token.is_empty() { + self.metrics + .auth_password_reset_total + .with_label_values(&["invalid_token"]) + .inc(); return Err(AppError::InvalidResetToken); } @@ -108,6 +129,10 @@ impl AppService { > Duration::hours(Self::RESET_PASS_EXPIRY_HOURS) { let _ = self.cache.remove(&cache_key).await; + self.metrics + .auth_password_reset_total + .with_label_values(&["expired"]) + .inc(); return Err(AppError::ResetTokenExpired); } @@ -136,9 +161,17 @@ impl AppService { .map_err(|e| AppError::DatabaseError(e.to_string()))?; if result.rows_affected() == 0 { + self.metrics + .auth_password_reset_total + .with_label_values(&["invalid_token"]) + .inc(); return Err(AppError::InvalidResetToken); } + self.metrics + .auth_password_reset_total + .with_label_values(&["verify_success"]) + .inc(); tracing::info!(user_uid = %pending.user_uid, "Password reset successfully"); Ok(()) } diff --git a/lib/service/git/embed.rs b/lib/service/git/embed.rs new file mode 100644 index 0000000..7a397fa --- /dev/null +++ b/lib/service/git/embed.rs @@ -0,0 +1,122 @@ +use db::sqlx; +use serde::{Deserialize, Serialize}; +use session::Session; +use utoipa::ToSchema; + +use crate::{AppService, error::AppError}; + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct RepoEmbedCardResponse { + // Repo basics + pub name: String, + pub description: Option, + pub default_branch: String, + pub visibility: String, + pub size_bytes: i64, + pub is_archived: bool, + pub updated_at: String, + + // Language + pub language: Option, + + // Stats + pub star_count: i64, + pub fork_count: i64, + + // Topics + pub topics: Vec, +} + +impl AppService { + pub async fn repo_embed_card( + &self, + ctx: &Session, + wk_name: &str, + repo_name: &str, + ) -> Result { + let repo = self.git_require_member(ctx, wk_name, repo_name).await?; + + // Fetch language, topics, star count, fork count in parallel + let (lang, topics, star_count, fork_count) = tokio::try_join!( + self.git_repo_embed_language(repo.id), + self.git_repo_embed_topics(repo.id), + self.git_repo_embed_star_count(repo.id), + self.git_repo_embed_fork_count(repo.id), + )?; + + Ok(RepoEmbedCardResponse { + name: repo.name, + description: repo.description, + default_branch: repo.default_branch, + visibility: repo.visibility, + size_bytes: repo.size_bytes, + is_archived: repo.is_archived, + updated_at: repo.updated_at.to_rfc3339(), + language: lang, + star_count, + fork_count, + topics, + }) + } + + async fn git_repo_embed_language( + &self, + repo_id: uuid::Uuid, + ) -> Result, AppError> { + let row: Option<(String,)> = sqlx::query_as( + "SELECT language FROM repo_language WHERE repo = $1 ORDER BY bytes DESC LIMIT 1", + ) + .bind(repo_id) + .fetch_optional(self.db.reader()) + .await + .map_err(|e| AppError::DatabaseError(e.to_string()))?; + + Ok(row.map(|r| r.0)) + } + + async fn git_repo_embed_topics( + &self, + repo_id: uuid::Uuid, + ) -> Result, AppError> { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT topic FROM repo_topic WHERE repo = $1 ORDER BY topic", + ) + .bind(repo_id) + .fetch_all(self.db.reader()) + .await + .map_err(|e| AppError::DatabaseError(e.to_string()))?; + + Ok(rows.into_iter().map(|r| r.0).collect()) + } + + async fn git_repo_embed_star_count( + &self, + repo_id: uuid::Uuid, + ) -> Result { + let row: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM repo_star WHERE repo = $1") + .bind(repo_id) + .fetch_one(self.db.reader()) + .await + .map_err(|e| AppError::DatabaseError(e.to_string()))?; + + Ok(row.0) + } + + async fn git_repo_embed_fork_count( + &self, + repo_id: uuid::Uuid, + ) -> Result { + let row: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM repo_fork f \ + INNER JOIN repo r ON r.id = f.repo AND r.deleted_at IS NULL \ + WHERE f.source_repo = $1", + ) + .bind(repo_id) + .fetch_one(self.db.reader()) + .await + .map_err(|e| AppError::DatabaseError(e.to_string()))?; + + Ok(row.0) + } +} diff --git a/lib/service/git/fork.rs b/lib/service/git/fork.rs index 92e9a4a..dbe14e7 100644 --- a/lib/service/git/fork.rs +++ b/lib/service/git/fork.rs @@ -4,7 +4,10 @@ use model::repos::RepoModel; use serde::{Deserialize, Serialize}; use session::Session; -use crate::{AppService, Pagination, error::AppError, session_user}; +use crate::{ + AppService, Pagination, error::AppError, metrics::with_op_metric, + session_user, +}; #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct ForkResponse { @@ -41,6 +44,7 @@ struct ForkListRow { } impl AppService { + #[tracing::instrument(skip(self, ctx), fields(workspace = %wk_name, repo = %repo_name))] pub async fn repo_fork_create( &self, ctx: &Session, @@ -48,7 +52,8 @@ impl AppService { repo_name: &str, params: CreateFork, ) -> Result { - let user_uid = session_user(ctx)?; + with_op_metric(&self.metrics.repo_fork_total, &[], async { + let user_uid = session_user(ctx)?; let src_wk = self.workspace_resolve(wk_name).await?; self.workspace_require_member(src_wk.id, user_uid).await?; let source_repo = self.repo_resolve(src_wk.id, repo_name).await?; @@ -148,7 +153,7 @@ impl AppService { txn.commit().await.map_err(|_| AppError::TxnError)?; self.queue_sync(repo_id).await; - Ok(ForkResponse { + Ok(ForkResponse { id: fork_repo.id, name: fork_repo.name, description: fork_repo.description, @@ -157,7 +162,8 @@ impl AppService { source_repo: source_repo.id, forked_by: user_uid, created_at: fork_repo.created_at, - }) + }) + }).await } pub async fn repo_fork_list( diff --git a/lib/service/git/init.rs b/lib/service/git/init.rs index d8b6dcd..85f5d16 100644 --- a/lib/service/git/init.rs +++ b/lib/service/git/init.rs @@ -3,7 +3,10 @@ use git::rpc::{proto as p, proto::init_service_client::InitServiceClient}; use model::repos::RepoModel; use session::Session; -use crate::{AppService, error::AppError, git::rpc_err, session_user}; +use crate::{ + AppService, error::AppError, git::rpc_err, metrics::with_op_metric, + session_user, +}; #[derive(Debug, Clone, serde::Deserialize, utoipa::ToSchema)] pub struct CreateRepo { @@ -24,13 +27,15 @@ pub struct CloneRepo { } impl AppService { + #[tracing::instrument(skip(self, ctx), fields(workspace = %wk_name))] pub async fn git_init_bare( &self, ctx: &Session, wk_name: &str, params: CreateRepo, ) -> Result { - let user_uid = session_user(ctx)?; + with_op_metric(&self.metrics.repo_operations_total, &["create"], async { + let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(wk_name).await?; self.workspace_require_admin(wk.id, user_uid).await?; @@ -120,7 +125,8 @@ impl AppService { txn.commit().await.map_err(|_| AppError::TxnError)?; self.queue_sync(repo_id).await; - Ok(repo) + Ok(repo) + }).await } pub async fn git_clone_bare( diff --git a/lib/service/git/mod.rs b/lib/service/git/mod.rs index 5fb92dd..7cee2b2 100644 --- a/lib/service/git/mod.rs +++ b/lib/service/git/mod.rs @@ -8,6 +8,7 @@ pub mod compare; pub mod contents; pub mod contributor; pub mod diff; +pub mod embed; pub mod fork; pub mod init; pub mod language; diff --git a/lib/service/git/repo.rs b/lib/service/git/repo.rs index 902746b..69ac380 100644 --- a/lib/service/git/repo.rs +++ b/lib/service/git/repo.rs @@ -5,7 +5,8 @@ use serde::{Deserialize, Serialize}; use session::Session; use crate::{ - AppService, Pagination, error::AppError, git::rpc_err, session_user, + AppService, Pagination, error::AppError, git::rpc_err, + metrics::with_op_metric, session_user, }; #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] @@ -148,7 +149,8 @@ impl AppService { repo_name: &str, params: UpdateRepo, ) -> Result { - let user_uid = session_user(ctx)?; + with_op_metric(&self.metrics.repo_operations_total, &["update"], async { + let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(wk_name).await?; self.workspace_require_admin(wk.id, user_uid).await?; let mut repo = self.repo_resolve(wk.id, repo_name).await?; @@ -251,7 +253,8 @@ impl AppService { self.queue_sync(repo.id).await; } - Ok(repo_response(updated)) + Ok(repo_response(updated)) + }).await } pub async fn repo_archive( @@ -282,19 +285,26 @@ impl AppService { wk_name: &str, repo_name: &str, ) -> Result<(), AppError> { - let user_uid = session_user(ctx)?; - let wk = self.workspace_resolve(wk_name).await?; - self.workspace_require_owner(wk.id, user_uid).await?; - let repo = self.repo_resolve(wk.id, repo_name).await?; + with_op_metric( + &self.metrics.repo_operations_total, + &["delete"], + async { + let user_uid = session_user(ctx)?; + let wk = self.workspace_resolve(wk_name).await?; + self.workspace_require_owner(wk.id, user_uid).await?; + let repo = self.repo_resolve(wk.id, repo_name).await?; - sqlx::query("UPDATE repo SET deleted_at = $1 WHERE id = $2") - .bind(chrono::Utc::now()) - .bind(repo.id) - .execute(self.db.writer()) - .await - .map_err(|e| AppError::DatabaseError(e.to_string()))?; + sqlx::query("UPDATE repo SET deleted_at = $1 WHERE id = $2") + .bind(chrono::Utc::now()) + .bind(repo.id) + .execute(self.db.writer()) + .await + .map_err(|e| AppError::DatabaseError(e.to_string()))?; - Ok(()) + Ok(()) + }, + ) + .await } pub async fn repo_transfer( @@ -304,7 +314,8 @@ impl AppService { repo_name: &str, params: TransferRepo, ) -> Result { - let user_uid = session_user(ctx)?; + with_op_metric(&self.metrics.repo_transfer_total, &[], async { + let user_uid = session_user(ctx)?; let src_wk = self.workspace_resolve(wk_name).await?; self.workspace_require_owner(src_wk.id, user_uid).await?; let repo = self.repo_resolve(src_wk.id, repo_name).await?; @@ -355,7 +366,8 @@ impl AppService { .map_err(|e| AppError::DatabaseError(e.to_string()))?; txn.commit().await.map_err(|_| AppError::TxnError)?; - Ok(repo_response(updated)) + Ok(repo_response(updated)) + }).await } pub async fn repo_topics( diff --git a/lib/service/issues/issue.rs b/lib/service/issues/issue.rs index f305c97..afa57c4 100644 --- a/lib/service/issues/issue.rs +++ b/lib/service/issues/issue.rs @@ -4,7 +4,10 @@ use serde::Deserialize; use session::Session; use super::types::{IssueFilter, IssueResponse, issue_author}; -use crate::{AppService, Pagination, error::AppError, session_user}; +use crate::{ + AppService, Pagination, error::AppError, metrics::with_op_metric, + session_user, +}; #[derive(Debug, Clone, Deserialize, utoipa::ToSchema)] pub struct CreateIssue { @@ -23,12 +26,14 @@ pub struct UpdateIssue { } impl AppService { + #[tracing::instrument(skip(self, ctx), fields(workspace = %wk_name))] pub async fn issue_create( &self, ctx: &Session, wk_name: &str, params: CreateIssue, ) -> Result { + with_op_metric(&self.metrics.issue_operations_total, &["create"], async { let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(wk_name).await?; self.workspace_require_member(wk.id, user_uid).await?; @@ -102,7 +107,8 @@ impl AppService { milestone: None, repos: Vec::new(), pull_requests: Vec::new(), - }) + }) + }).await } pub async fn issue_list( @@ -292,12 +298,14 @@ impl AppService { self.issue_build_response(issue).await } + #[tracing::instrument(skip(self, ctx), fields(workspace = %wk_name, issue = %number))] pub async fn issue_close( &self, ctx: &Session, wk_name: &str, number: i64, ) -> Result { + with_op_metric(&self.metrics.issue_operations_total, &["close"], async { let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(wk_name).await?; self.workspace_require_member(wk.id, user_uid).await?; @@ -335,14 +343,17 @@ impl AppService { let issue = self.issue_resolve(wk.id, number).await?; self.issue_build_response(issue).await + }).await } + #[tracing::instrument(skip(self, ctx), fields(workspace = %wk_name, issue = %number))] pub async fn issue_reopen( &self, ctx: &Session, wk_name: &str, number: i64, ) -> Result { + with_op_metric(&self.metrics.issue_operations_total, &["reopen"], async { let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(wk_name).await?; self.workspace_require_member(wk.id, user_uid).await?; @@ -379,6 +390,7 @@ impl AppService { let issue = self.issue_resolve(wk.id, number).await?; self.issue_build_response(issue).await + }).await } pub async fn issue_delete( diff --git a/lib/service/lib.rs b/lib/service/lib.rs index c6a5fef..732a122 100644 --- a/lib/service/lib.rs +++ b/lib/service/lib.rs @@ -16,6 +16,7 @@ pub mod auth; pub mod error; pub mod git; pub mod issues; +pub mod metrics; pub mod pull_request; pub mod user; pub mod users; @@ -62,4 +63,6 @@ pub struct AppService { pub config: AppConfig, pub git: Channel, pub redis_pool: RedisPool, + pub metrics_registry: track::MetricsRegistry, + pub metrics: metrics::ServiceMetrics, } diff --git a/lib/service/metrics.rs b/lib/service/metrics.rs new file mode 100644 index 0000000..de36841 --- /dev/null +++ b/lib/service/metrics.rs @@ -0,0 +1,311 @@ +use prometheus::{CounterVec, HistogramVec}; + +use track::MetricsRegistry; + +#[derive(Clone)] +pub struct ServiceMetrics { + pub auth_login_total: CounterVec, + pub auth_register_total: CounterVec, + pub auth_2fa_triggered_total: CounterVec, + pub auth_password_reset_total: CounterVec, + + pub repo_operations_total: CounterVec, + pub repo_fork_total: CounterVec, + pub repo_transfer_total: CounterVec, + + pub workspace_operations_total: CounterVec, + pub workspace_join_total: CounterVec, + + pub issue_operations_total: CounterVec, + + pub pr_operations_total: CounterVec, + pub pr_merge_total: CounterVec, + + pub ai_agent_runs_total: CounterVec, + pub ai_tool_calls_total: CounterVec, + pub ai_token_usage_total: CounterVec, + + pub db_query_duration_seconds: HistogramVec, + pub db_queries_total: CounterVec, + + pub cache_hits_total: CounterVec, + pub cache_misses_total: CounterVec, + pub cache_sets_total: CounterVec, + pub cache_removes_total: CounterVec, + + pub storage_operations_total: CounterVec, + pub storage_bytes_total: CounterVec, + + pub queue_messages_total: CounterVec, + pub queue_dlq_total: CounterVec, +} + +impl ServiceMetrics { + pub fn record_ai_run(&self, model: &str, status: &str) { + self.ai_agent_runs_total + .with_label_values(&[model, status]) + .inc(); + track::record_otel_counter( + "ai_agent_runs_total", + 1, + &[("model", model.to_string()), ("status", status.to_string())], + ); + } + + pub fn record_ai_tool_call(&self, tool_name: &str, status: &str) { + self.ai_tool_calls_total + .with_label_values(&[tool_name, status]) + .inc(); + track::record_otel_counter( + "ai_tool_calls_total", + 1, + &[ + ("tool_name", tool_name.to_string()), + ("status", status.to_string()), + ], + ); + } + + pub fn record_ai_token_usage( + &self, + model: &str, + input_tokens: i64, + output_tokens: i64, + ) { + if input_tokens > 0 { + self.ai_token_usage_total + .with_label_values(&[model, "input"]) + .inc_by(input_tokens as f64); + track::record_otel_counter( + "ai_token_usage_total", + input_tokens as u64, + &[ + ("model", model.to_string()), + ("direction", "input".to_string()), + ], + ); + } + if output_tokens > 0 { + self.ai_token_usage_total + .with_label_values(&[model, "output"]) + .inc_by(output_tokens as f64); + track::record_otel_counter( + "ai_token_usage_total", + output_tokens as u64, + &[ + ("model", model.to_string()), + ("direction", "output".to_string()), + ], + ); + } + } + + pub fn init(registry: &MetricsRegistry) -> Self { + Self { + auth_login_total: cvec( + registry, + "auth_login_total", + "Total login attempts", + &["status"], + ), + auth_register_total: cvec( + registry, + "auth_register_total", + "Total user registrations", + &["status"], + ), + auth_2fa_triggered_total: cvec( + registry, + "auth_2fa_triggered_total", + "Total 2FA challenges triggered", + &[], + ), + auth_password_reset_total: cvec( + registry, + "auth_password_reset_total", + "Total password reset operations", + &["status"], + ), + + repo_operations_total: cvec( + registry, + "repo_operations_total", + "Total repo operations", + &["operation", "status"], + ), + repo_fork_total: cvec( + registry, + "repo_fork_total", + "Total fork creations", + &["status"], + ), + repo_transfer_total: cvec( + registry, + "repo_transfer_total", + "Total repo transfers", + &["status"], + ), + + workspace_operations_total: cvec( + registry, + "workspace_operations_total", + "Total workspace operations", + &["operation", "status"], + ), + workspace_join_total: cvec( + registry, + "workspace_join_total", + "Total workspace join operations", + &["operation"], + ), + + issue_operations_total: cvec( + registry, + "issue_operations_total", + "Total issue operations", + &["operation", "status"], + ), + + pr_operations_total: cvec( + registry, + "pr_operations_total", + "Total pull request operations", + &["operation", "status"], + ), + pr_merge_total: cvec( + registry, + "pr_merge_total", + "Total PR merges", + &["method", "status"], + ), + + ai_agent_runs_total: cvec( + registry, + "ai_agent_runs_total", + "Total AI agent invocations", + &["model", "status"], + ), + ai_tool_calls_total: cvec( + registry, + "ai_tool_calls_total", + "Total AI tool calls", + &["tool_name", "status"], + ), + ai_token_usage_total: cvec( + registry, + "ai_token_usage_total", + "Total AI token usage", + &["model", "direction"], + ), + + db_query_duration_seconds: hvec( + registry, + "db_query_duration_seconds", + "DB query duration in seconds", + &["kind", "route"], + vec![ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, + 5.0, + ], + ), + db_queries_total: cvec( + registry, + "db_queries_total", + "Total database queries", + &["kind", "route", "status"], + ), + + cache_hits_total: cvec( + registry, + "cache_hits_total", + "Total cache hits", + &["tier"], + ), + cache_misses_total: cvec( + registry, + "cache_misses_total", + "Total cache misses", + &[], + ), + cache_sets_total: cvec( + registry, + "cache_sets_total", + "Total cache set operations", + &[], + ), + cache_removes_total: cvec( + registry, + "cache_removes_total", + "Total cache remove operations", + &[], + ), + + storage_operations_total: cvec( + registry, + "storage_operations_total", + "Total storage operations", + &["operation", "backend"], + ), + storage_bytes_total: cvec( + registry, + "storage_bytes_total", + "Total bytes transferred", + &["operation"], + ), + + queue_messages_total: cvec( + registry, + "queue_messages_total", + "Total queue messages", + &["topic", "status"], + ), + queue_dlq_total: cvec( + registry, + "queue_dlq_total", + "Total messages routed to DLQ", + &["topic"], + ), + } + } +} + +/// Wraps an async operation and records a business metric with `success`/`error` status. +/// `op_labels` are the dimension labels (e.g., `["create"]`). +/// The final label `"success"` or `"error"` is appended automatically. +pub(crate) async fn with_op_metric( + counter: &CounterVec, + op_labels: &[&str], + fut: Fut, +) -> Result +where + Fut: std::future::Future>, +{ + let result = fut.await; + let mut labels: Vec<&str> = op_labels.to_vec(); + labels.push(if result.is_ok() { "success" } else { "error" }); + counter.with_label_values(&labels).inc(); + result +} + +fn cvec( + registry: &MetricsRegistry, + name: &str, + help: &str, + labels: &[&str], +) -> CounterVec { + registry + .register_counter_vec(name, help, labels) + .expect("failed to register counter metric") +} + +fn hvec( + registry: &MetricsRegistry, + name: &str, + help: &str, + labels: &[&str], + buckets: Vec, +) -> HistogramVec { + registry + .register_histogram_vec(name, help, labels, buckets) + .expect("failed to register histogram metric") +} diff --git a/lib/service/pull_request/merge.rs b/lib/service/pull_request/merge.rs index 14c1d71..0b08164 100644 --- a/lib/service/pull_request/merge.rs +++ b/lib/service/pull_request/merge.rs @@ -4,7 +4,7 @@ use serde::Deserialize; use session::Session; use crate::{ - AppService, error::AppError, git::rpc_err, + AppService, error::AppError, git::rpc_err, metrics::with_op_metric, pull_request::types::PullRequestResponse, session_user, }; @@ -72,6 +72,7 @@ impl AppService { Ok(resp) } + #[tracing::instrument(skip(self, ctx, params), fields(workspace = %wk_name, repo = %repo_name, pr = %number))] pub async fn pr_merge( &self, ctx: &Session, @@ -80,6 +81,8 @@ impl AppService { number: i64, params: MergePullRequest, ) -> Result { + let method = params.method.unwrap_or_else(|| "merge".to_string()); + with_op_metric(&self.metrics.pr_merge_total, &[&method], async { let user_uid = session_user(ctx)?; let (repo_id, _) = self.pr_resolve_repo_admin(ctx, wk_name, repo_name).await?; @@ -95,8 +98,6 @@ impl AppService { "draft pull request cannot be merged".to_string(), )); } - - let method = params.method.unwrap_or_else(|| "merge".to_string()); let now = chrono::Utc::now(); let merge_result_sha = match method.as_str() { @@ -181,6 +182,7 @@ impl AppService { let pr = self.pr_resolve(repo_id, number).await?; self.pr_build_response(pr).await + }).await } pub async fn pr_merge_abort( diff --git a/lib/service/pull_request/pull_request.rs b/lib/service/pull_request/pull_request.rs index 535a59a..774266e 100644 --- a/lib/service/pull_request/pull_request.rs +++ b/lib/service/pull_request/pull_request.rs @@ -7,6 +7,7 @@ use crate::{ AppService, Pagination, error::AppError, issues::types::issue_author, + metrics::with_op_metric, pull_request::types::{PullRequestFilter, PullRequestResponse}, session_user, }; @@ -30,6 +31,7 @@ pub struct UpdatePullRequest { } impl AppService { + #[tracing::instrument(skip(self, ctx), fields(workspace = %wk_name, repo = %repo_name))] pub async fn pr_create( &self, ctx: &Session, @@ -37,6 +39,7 @@ impl AppService { repo_name: &str, params: CreatePullRequest, ) -> Result { + with_op_metric(&self.metrics.pr_operations_total, &["create"], async { let user_uid = session_user(ctx)?; let (repo_id, repo) = self.pr_resolve_repo(ctx, wk_name, repo_name).await?; @@ -104,6 +107,7 @@ impl AppService { .map_err(|e| AppError::DatabaseError(e.to_string()))?; self.pr_build_response(pr).await + }).await } pub async fn pr_list( diff --git a/lib/service/workspace/join.rs b/lib/service/workspace/join.rs index 46898b2..2656ab2 100644 --- a/lib/service/workspace/join.rs +++ b/lib/service/workspace/join.rs @@ -5,7 +5,9 @@ use model::workspace::{ use serde::{Deserialize, Serialize}; use session::Session; -use crate::{AppService, error::AppError, session_user}; +use crate::{ + AppService, error::AppError, metrics::with_op_metric, session_user, +}; const JOIN_STATUS_PENDING: &str = "pending"; const JOIN_STATUS_APPROVED: &str = "approved"; @@ -166,12 +168,14 @@ impl AppService { Ok(strategy_response(saved, &wk)) } + #[tracing::instrument(skip(self, ctx, params), fields(workspace = %name))] pub async fn workspace_apply_join( &self, ctx: &Session, name: &str, params: CreateWorkspaceJoinApply, ) -> Result { + with_op_metric(&self.metrics.workspace_join_total, &["apply"], async { let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(name).await?; if self.workspace_member(wk.id, user_uid).await.is_ok() { @@ -245,7 +249,8 @@ impl AppService { &wk, current_user.username, clean_optional(Some(current_user.avatar_url)), - )) + )) + }).await } pub async fn workspace_my_join_applies( @@ -337,6 +342,7 @@ impl AppService { .collect()) } + #[tracing::instrument(skip(self, ctx, params), fields(workspace = %name, username = %username))] pub async fn workspace_approve_join_apply( &self, ctx: &Session, @@ -344,6 +350,8 @@ impl AppService { username: &str, params: ApproveWorkspaceJoinApply, ) -> Result { + let op = if params.approved { "approve" } else { "reject" }; + with_op_metric(&self.metrics.workspace_join_total, &[op], async { let approver = session_user(ctx)?; let wk = self.workspace_resolve(name).await?; self.workspace_require_admin(wk.id, approver).await?; @@ -408,7 +416,6 @@ impl AppService { .map_err(|e| AppError::DatabaseError(e.to_string()))?; } - txn.commit().await.map_err(|_| AppError::TxnError)?; Ok(approval_response( approval, &wk, @@ -416,7 +423,8 @@ impl AppService { clean_optional(Some(applicant.avatar_url)), approver_user.username, clean_optional(Some(approver_user.avatar_url)), - )) + )) + }).await } async fn workspace_join_strategy_by_wk( diff --git a/lib/service/workspace/workspace.rs b/lib/service/workspace/workspace.rs index 990e6b6..0292204 100644 --- a/lib/service/workspace/workspace.rs +++ b/lib/service/workspace/workspace.rs @@ -7,7 +7,9 @@ use storage::{ObjectStorage, PutObjectOptions}; use super::types::{ WorkspaceListRow, WorkspaceResponse, normalize_name, workspace_response, }; -use crate::{AppService, error::AppError, session_user}; +use crate::{ + AppService, error::AppError, metrics::with_op_metric, session_user, +}; const ALLOWED_AVATAR_TYPES: &[&str] = &["image/png", "image/jpeg", "image/webp", "image/gif"]; @@ -43,11 +45,13 @@ pub struct UpdateWorkspace { } impl AppService { + #[tracing::instrument(skip(self, ctx), fields(workspace = %params.name))] pub async fn workspace_create( &self, ctx: &Session, params: CreateWorkspace, ) -> Result { + with_op_metric(&self.metrics.workspace_operations_total, &["create"], async { let user_uid = session_user(ctx)?; let name = normalize_name(¶ms.name)?; self.workspace_ensure_name_available(&name).await?; @@ -85,6 +89,7 @@ impl AppService { txn.commit().await.map_err(|_| AppError::TxnError)?; Ok(workspace_response(workspace, true, true)) + }).await } pub async fn workspace_my( @@ -132,6 +137,7 @@ impl AppService { name: &str, params: UpdateWorkspace, ) -> Result { + with_op_metric(&self.metrics.workspace_operations_total, &["update"], async { let user_uid = session_user(ctx)?; let mut wk = self.workspace_resolve(name).await?; self.workspace_require_admin(wk.id, user_uid).await?; @@ -185,6 +191,7 @@ impl AppService { let member = self.workspace_member(wk.id, user_uid).await?; Ok(workspace_response(wk, member.owner, member.admin)) + }).await } /// Get a workspace's avatar URL by workspace name.