use db::sqlx; use hmac::{Hmac, KeyInit, Mac}; use model::repos::{RepoWebhookDeliveryModel, RepoWebhookModel}; use serde::{Deserialize, Serialize}; use session::Session; use sha2::Sha256; use crate::{AppService, Pagination, error::AppError, session_user}; type HmacSha256 = Hmac; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum WebhookEvent { Push, PushBranch, PushTag, Issue, PullRequest, Comment, Release, Fork, Wiki, } #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct WebhookResponse { #[schema(value_type = String)] pub id: uuid::Uuid, #[schema(value_type = String)] pub repo: uuid::Uuid, pub url: String, pub events: Vec, pub active: bool, #[schema(value_type = String)] pub created_by: uuid::Uuid, #[schema(value_type = String)] pub created_at: chrono::DateTime, #[schema(value_type = String)] pub updated_at: chrono::DateTime, } pub fn webhook_response(w: RepoWebhookModel) -> WebhookResponse { WebhookResponse { id: w.id, repo: w.repo, url: w.url, events: w .events .split('.') .filter(|s| !s.is_empty()) .map(|s| s.to_string()) .collect(), active: w.active, created_by: w.created_by, created_at: w.created_at, updated_at: w.updated_at, } } #[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)] pub struct WebhookDeliveryResponse { #[schema(value_type = String)] pub id: uuid::Uuid, #[schema(value_type = String)] pub webhook: uuid::Uuid, pub event: String, pub response_status: Option, pub error: Option, #[schema(value_type = Option)] pub delivered_at: Option>, #[schema(value_type = String)] pub created_at: chrono::DateTime, } pub fn delivery_response( d: RepoWebhookDeliveryModel, ) -> WebhookDeliveryResponse { WebhookDeliveryResponse { id: d.id, webhook: d.webhook, event: d.event, response_status: d.response_status, error: d.error, delivered_at: d.delivered_at, created_at: d.created_at, } } #[derive(Debug, Clone, Deserialize, utoipa::ToSchema)] pub struct CreateWebhook { pub url: String, pub secret: Option, pub events: Vec, pub active: Option, } #[derive(Debug, Clone, Deserialize, utoipa::ToSchema)] pub struct UpdateWebhook { pub url: Option, pub secret: Option, pub events: Option>, pub active: Option, } impl AppService { pub async fn repo_webhook_list( &self, ctx: &Session, wk_name: &str, repo_name: &str, pagination: Pagination, ) -> Result, AppError> { let repo = self.git_require_member(ctx, wk_name, repo_name).await?; let rows = sqlx::query_as::<_, RepoWebhookModel>( "SELECT id, repo, url, secret_hash, events, active, created_by, created_at, updated_at \ FROM repo_webhook WHERE repo = $1 \ ORDER BY created_at DESC OFFSET $2 LIMIT $3", ) .bind(repo.id) .bind(pagination.offset() as i64) .bind(pagination.limit() as i64) .fetch_all(self.db.reader()) .await .map_err(|e| AppError::DatabaseError(e.to_string()))?; Ok(rows.into_iter().map(webhook_response).collect()) } pub async fn repo_webhook_create( &self, ctx: &Session, wk_name: &str, repo_name: &str, params: CreateWebhook, ) -> Result { let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(wk_name).await?; self.workspace_require_admin(wk.id, user_uid).await?; let repo = self.repo_resolve(wk.id, repo_name).await?; let url = params.url.trim(); if url.is_empty() { return Err(AppError::BadRequest("url is required".to_string())); } let secret_hash = params.secret.map(|s| { let mut mac = HmacSha256::new_from_slice(b"gitdata-webhook-secret") .expect("HMAC can take key of any size"); mac.update(s.as_bytes()); let result = mac.finalize(); let code_bytes = result.into_bytes(); hex::encode(code_bytes) }); let events = params.events.join("."); let active = params.active.unwrap_or(true); let id = uuid::Uuid::now_v7(); let now = chrono::Utc::now(); let row = sqlx::query_as::<_, RepoWebhookModel>( "INSERT INTO repo_webhook \ (id, repo, url, secret_hash, events, active, created_by, created_at, updated_at) \ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $8) \ RETURNING id, repo, url, secret_hash, events, active, created_by, created_at, updated_at", ) .bind(id) .bind(repo.id) .bind(url) .bind(&secret_hash) .bind(&events) .bind(active) .bind(user_uid) .bind(now) .fetch_one(self.db.writer()) .await .map_err(|e| AppError::DatabaseError(e.to_string()))?; Ok(webhook_response(row)) } pub async fn repo_webhook_update( &self, ctx: &Session, wk_name: &str, repo_name: &str, webhook_id: uuid::Uuid, params: UpdateWebhook, ) -> Result { let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(wk_name).await?; self.workspace_require_admin(wk.id, user_uid).await?; let repo = self.repo_resolve(wk.id, repo_name).await?; let existing = sqlx::query_as::<_, RepoWebhookModel>( "SELECT id, repo, url, secret_hash, events, active, created_by, created_at, updated_at \ FROM repo_webhook WHERE id = $1 AND repo = $2", ) .bind(webhook_id) .bind(repo.id) .fetch_optional(self.db.reader()) .await .map_err(|e| AppError::DatabaseError(e.to_string()))? .ok_or(AppError::NotFound("webhook not found".to_string()))?; let url = params.url.unwrap_or(existing.url); let events = params .events .map(|e| e.join(".")) .unwrap_or(existing.events); let active = params.active.unwrap_or(existing.active); let secret_hash = if let Some(secret) = params.secret { let mut mac = HmacSha256::new_from_slice(b"gitdata-webhook-secret") .expect("HMAC can take key of any size"); mac.update(secret.as_bytes()); let result = mac.finalize(); let code_bytes = result.into_bytes(); Some(hex::encode(code_bytes)) } else { existing.secret_hash }; let row = sqlx::query_as::<_, RepoWebhookModel>( "UPDATE repo_webhook SET url = $1, secret_hash = $2, events = $3, \ active = $4, updated_at = $5 WHERE id = $6 \ RETURNING id, repo, url, secret_hash, events, active, created_by, created_at, updated_at", ) .bind(&url) .bind(&secret_hash) .bind(&events) .bind(active) .bind(chrono::Utc::now()) .bind(webhook_id) .fetch_one(self.db.writer()) .await .map_err(|e| AppError::DatabaseError(e.to_string()))?; Ok(webhook_response(row)) } pub async fn repo_webhook_delete( &self, ctx: &Session, wk_name: &str, repo_name: &str, webhook_id: uuid::Uuid, ) -> Result<(), AppError> { let user_uid = session_user(ctx)?; let wk = self.workspace_resolve(wk_name).await?; self.workspace_require_admin(wk.id, user_uid).await?; let repo = self.repo_resolve(wk.id, repo_name).await?; let result = sqlx::query("DELETE FROM repo_webhook WHERE id = $1 AND repo = $2") .bind(webhook_id) .bind(repo.id) .execute(self.db.writer()) .await .map_err(|e| AppError::DatabaseError(e.to_string()))?; if result.rows_affected() == 0 { return Err(AppError::NotFound("webhook not found".to_string())); } Ok(()) } pub async fn repo_webhook_deliveries( &self, ctx: &Session, wk_name: &str, repo_name: &str, webhook_id: uuid::Uuid, pagination: Pagination, ) -> Result, AppError> { let repo = self.git_require_member(ctx, wk_name, repo_name).await?; let rows = sqlx::query_as::<_, RepoWebhookDeliveryModel>( "SELECT id, repo, webhook, event, request_headers, request_body, \ response_status, response_headers, response_body, error, delivered_at, created_at \ FROM repo_webhook_delivery WHERE webhook = $1 AND repo = $2 \ ORDER BY created_at DESC OFFSET $3 LIMIT $4", ) .bind(webhook_id) .bind(repo.id) .bind(pagination.offset() as i64) .bind(pagination.limit() as i64) .fetch_all(self.db.reader()) .await .map_err(|e| AppError::DatabaseError(e.to_string()))?; Ok(rows.into_iter().map(delivery_response).collect()) } pub async fn trigger_webhook_event( &self, repo_id: uuid::Uuid, event: &str, payload: serde_json::Value, ) -> Result<(), AppError> { let webhooks = sqlx::query_as::<_, RepoWebhookModel>( "SELECT id, repo, url, secret_hash, events, active, created_by, created_at, updated_at \ FROM repo_webhook WHERE repo = $1 AND active = true", ) .bind(repo_id) .fetch_all(self.db.reader()) .await .map_err(|e| AppError::DatabaseError(e.to_string()))?; for wh in webhooks { let subscribed_events: Vec<&str> = wh.events.split('.').filter(|s| !s.is_empty()).collect(); let matches = subscribed_events.iter().any(|e| { *e == event || *e == "push" && (event == "push_branch" || event == "push_tag") }); if !matches { continue; } let delivery_id = uuid::Uuid::now_v7(); let now = chrono::Utc::now(); sqlx::query( "INSERT INTO repo_webhook_delivery \ (id, repo, webhook, event, request_headers, request_body, \ response_status, response_headers, response_body, error, delivered_at, created_at) \ VALUES ($1, $2, $3, $4, NULL, NULL, NULL, NULL, NULL, NULL, NULL, $5)", ) .bind(delivery_id) .bind(repo_id) .bind(wh.id) .bind(event) .bind(now) .execute(self.db.writer()) .await .map_err(|e| AppError::DatabaseError(e.to_string()))?; let task = git::sync::webhook::WebhookDeliveryTask { id: delivery_id.to_string(), webhook_id: wh.id.to_string(), repo_id: repo_id.to_string(), event: event.to_string(), url: wh.url.clone(), secret: wh.secret_hash.clone(), payload: payload.clone(), created_at: now, retry_count: 0, }; if let Err(e) = git::sync::webhook::enqueue_delivery(task, &self.redis_pool) .await { tracing::error!( repo_id = %repo_id, webhook_id = %wh.id, error = %e, "failed to enqueue webhook delivery" ); } } Ok(()) } }