366 lines
12 KiB
Rust
366 lines
12 KiB
Rust
use db::sqlx;
|
|
use hmac::{Hmac, KeyInit, Mac};
|
|
use model::repos::{RepoWebhookDeliveryModel, RepoWebhookModel};
|
|
use serde::{Deserialize, Serialize};
|
|
use session::Session;
|
|
use sha2::Sha256;
|
|
|
|
use crate::{AppService, Pagination, error::AppError, session_user};
|
|
|
|
type HmacSha256 = Hmac<Sha256>;
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub enum WebhookEvent {
|
|
Push,
|
|
PushBranch,
|
|
PushTag,
|
|
Issue,
|
|
PullRequest,
|
|
Comment,
|
|
Release,
|
|
Fork,
|
|
Wiki,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
|
pub struct WebhookResponse {
|
|
#[schema(value_type = String)]
|
|
pub id: uuid::Uuid,
|
|
#[schema(value_type = String)]
|
|
pub repo: uuid::Uuid,
|
|
pub url: String,
|
|
pub events: Vec<String>,
|
|
pub active: bool,
|
|
#[schema(value_type = String)]
|
|
pub created_by: uuid::Uuid,
|
|
#[schema(value_type = String)]
|
|
pub created_at: chrono::DateTime<chrono::Utc>,
|
|
#[schema(value_type = String)]
|
|
pub updated_at: chrono::DateTime<chrono::Utc>,
|
|
}
|
|
|
|
pub fn webhook_response(w: RepoWebhookModel) -> WebhookResponse {
|
|
WebhookResponse {
|
|
id: w.id,
|
|
repo: w.repo,
|
|
url: w.url,
|
|
events: w
|
|
.events
|
|
.split('.')
|
|
.filter(|s| !s.is_empty())
|
|
.map(|s| s.to_string())
|
|
.collect(),
|
|
active: w.active,
|
|
created_by: w.created_by,
|
|
created_at: w.created_at,
|
|
updated_at: w.updated_at,
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
|
pub struct WebhookDeliveryResponse {
|
|
#[schema(value_type = String)]
|
|
pub id: uuid::Uuid,
|
|
#[schema(value_type = String)]
|
|
pub webhook: uuid::Uuid,
|
|
pub event: String,
|
|
pub response_status: Option<i32>,
|
|
pub error: Option<String>,
|
|
#[schema(value_type = Option<String>)]
|
|
pub delivered_at: Option<chrono::DateTime<chrono::Utc>>,
|
|
#[schema(value_type = String)]
|
|
pub created_at: chrono::DateTime<chrono::Utc>,
|
|
}
|
|
|
|
pub fn delivery_response(
|
|
d: RepoWebhookDeliveryModel,
|
|
) -> WebhookDeliveryResponse {
|
|
WebhookDeliveryResponse {
|
|
id: d.id,
|
|
webhook: d.webhook,
|
|
event: d.event,
|
|
response_status: d.response_status,
|
|
error: d.error,
|
|
delivered_at: d.delivered_at,
|
|
created_at: d.created_at,
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)]
|
|
pub struct CreateWebhook {
|
|
pub url: String,
|
|
pub secret: Option<String>,
|
|
pub events: Vec<String>,
|
|
pub active: Option<bool>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)]
|
|
pub struct UpdateWebhook {
|
|
pub url: Option<String>,
|
|
pub secret: Option<String>,
|
|
pub events: Option<Vec<String>>,
|
|
pub active: Option<bool>,
|
|
}
|
|
|
|
impl AppService {
|
|
pub async fn repo_webhook_list(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
pagination: Pagination,
|
|
) -> Result<Vec<WebhookResponse>, AppError> {
|
|
let repo = self.git_require_member(ctx, wk_name, repo_name).await?;
|
|
|
|
let rows = sqlx::query_as::<_, RepoWebhookModel>(
|
|
"SELECT id, repo, url, secret_hash, events, active, created_by, created_at, updated_at \
|
|
FROM repo_webhook WHERE repo = $1 \
|
|
ORDER BY created_at DESC OFFSET $2 LIMIT $3",
|
|
)
|
|
.bind(repo.id)
|
|
.bind(pagination.offset() as i64)
|
|
.bind(pagination.limit() as i64)
|
|
.fetch_all(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
Ok(rows.into_iter().map(webhook_response).collect())
|
|
}
|
|
|
|
pub async fn repo_webhook_create(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
params: CreateWebhook,
|
|
) -> Result<WebhookResponse, AppError> {
|
|
let user_uid = session_user(ctx)?;
|
|
let wk = self.workspace_resolve(wk_name).await?;
|
|
self.workspace_require_admin(wk.id, user_uid).await?;
|
|
let repo = self.repo_resolve(wk.id, repo_name).await?;
|
|
|
|
let url = params.url.trim();
|
|
if url.is_empty() {
|
|
return Err(AppError::BadRequest("url is required".to_string()));
|
|
}
|
|
|
|
let secret_hash = params.secret.map(|s| {
|
|
let mut mac = HmacSha256::new_from_slice(b"gitdata-webhook-secret")
|
|
.expect("HMAC can take key of any size");
|
|
mac.update(s.as_bytes());
|
|
let result = mac.finalize();
|
|
let code_bytes = result.into_bytes();
|
|
hex::encode(code_bytes)
|
|
});
|
|
|
|
let events = params.events.join(".");
|
|
let active = params.active.unwrap_or(true);
|
|
let id = uuid::Uuid::now_v7();
|
|
let now = chrono::Utc::now();
|
|
|
|
let row = sqlx::query_as::<_, RepoWebhookModel>(
|
|
"INSERT INTO repo_webhook \
|
|
(id, repo, url, secret_hash, events, active, created_by, created_at, updated_at) \
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $8) \
|
|
RETURNING id, repo, url, secret_hash, events, active, created_by, created_at, updated_at",
|
|
)
|
|
.bind(id)
|
|
.bind(repo.id)
|
|
.bind(url)
|
|
.bind(&secret_hash)
|
|
.bind(&events)
|
|
.bind(active)
|
|
.bind(user_uid)
|
|
.bind(now)
|
|
.fetch_one(self.db.writer())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
Ok(webhook_response(row))
|
|
}
|
|
|
|
pub async fn repo_webhook_update(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
webhook_id: uuid::Uuid,
|
|
params: UpdateWebhook,
|
|
) -> Result<WebhookResponse, AppError> {
|
|
let user_uid = session_user(ctx)?;
|
|
let wk = self.workspace_resolve(wk_name).await?;
|
|
self.workspace_require_admin(wk.id, user_uid).await?;
|
|
let repo = self.repo_resolve(wk.id, repo_name).await?;
|
|
|
|
let existing = sqlx::query_as::<_, RepoWebhookModel>(
|
|
"SELECT id, repo, url, secret_hash, events, active, created_by, created_at, updated_at \
|
|
FROM repo_webhook WHERE id = $1 AND repo = $2",
|
|
)
|
|
.bind(webhook_id)
|
|
.bind(repo.id)
|
|
.fetch_optional(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?
|
|
.ok_or(AppError::NotFound("webhook not found".to_string()))?;
|
|
|
|
let url = params.url.unwrap_or(existing.url);
|
|
let events = params
|
|
.events
|
|
.map(|e| e.join("."))
|
|
.unwrap_or(existing.events);
|
|
let active = params.active.unwrap_or(existing.active);
|
|
|
|
let secret_hash = if let Some(secret) = params.secret {
|
|
let mut mac = HmacSha256::new_from_slice(b"gitdata-webhook-secret")
|
|
.expect("HMAC can take key of any size");
|
|
mac.update(secret.as_bytes());
|
|
let result = mac.finalize();
|
|
let code_bytes = result.into_bytes();
|
|
Some(hex::encode(code_bytes))
|
|
} else {
|
|
existing.secret_hash
|
|
};
|
|
|
|
let row = sqlx::query_as::<_, RepoWebhookModel>(
|
|
"UPDATE repo_webhook SET url = $1, secret_hash = $2, events = $3, \
|
|
active = $4, updated_at = $5 WHERE id = $6 \
|
|
RETURNING id, repo, url, secret_hash, events, active, created_by, created_at, updated_at",
|
|
)
|
|
.bind(&url)
|
|
.bind(&secret_hash)
|
|
.bind(&events)
|
|
.bind(active)
|
|
.bind(chrono::Utc::now())
|
|
.bind(webhook_id)
|
|
.fetch_one(self.db.writer())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
Ok(webhook_response(row))
|
|
}
|
|
|
|
pub async fn repo_webhook_delete(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
webhook_id: uuid::Uuid,
|
|
) -> Result<(), AppError> {
|
|
let user_uid = session_user(ctx)?;
|
|
let wk = self.workspace_resolve(wk_name).await?;
|
|
self.workspace_require_admin(wk.id, user_uid).await?;
|
|
let repo = self.repo_resolve(wk.id, repo_name).await?;
|
|
|
|
let result =
|
|
sqlx::query("DELETE FROM repo_webhook WHERE id = $1 AND repo = $2")
|
|
.bind(webhook_id)
|
|
.bind(repo.id)
|
|
.execute(self.db.writer())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
if result.rows_affected() == 0 {
|
|
return Err(AppError::NotFound("webhook not found".to_string()));
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn repo_webhook_deliveries(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
webhook_id: uuid::Uuid,
|
|
pagination: Pagination,
|
|
) -> Result<Vec<WebhookDeliveryResponse>, AppError> {
|
|
let repo = self.git_require_member(ctx, wk_name, repo_name).await?;
|
|
|
|
let rows = sqlx::query_as::<_, RepoWebhookDeliveryModel>(
|
|
"SELECT id, repo, webhook, event, request_headers, request_body, \
|
|
response_status, response_headers, response_body, error, delivered_at, created_at \
|
|
FROM repo_webhook_delivery WHERE webhook = $1 AND repo = $2 \
|
|
ORDER BY created_at DESC OFFSET $3 LIMIT $4",
|
|
)
|
|
.bind(webhook_id)
|
|
.bind(repo.id)
|
|
.bind(pagination.offset() as i64)
|
|
.bind(pagination.limit() as i64)
|
|
.fetch_all(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
Ok(rows.into_iter().map(delivery_response).collect())
|
|
}
|
|
pub async fn trigger_webhook_event(
|
|
&self,
|
|
repo_id: uuid::Uuid,
|
|
event: &str,
|
|
payload: serde_json::Value,
|
|
) -> Result<(), AppError> {
|
|
let webhooks = sqlx::query_as::<_, RepoWebhookModel>(
|
|
"SELECT id, repo, url, secret_hash, events, active, created_by, created_at, updated_at \
|
|
FROM repo_webhook WHERE repo = $1 AND active = true",
|
|
)
|
|
.bind(repo_id)
|
|
.fetch_all(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
for wh in webhooks {
|
|
let subscribed_events: Vec<&str> =
|
|
wh.events.split('.').filter(|s| !s.is_empty()).collect();
|
|
let matches = subscribed_events.iter().any(|e| {
|
|
*e == event
|
|
|| *e == "push"
|
|
&& (event == "push_branch" || event == "push_tag")
|
|
});
|
|
|
|
if !matches {
|
|
continue;
|
|
}
|
|
|
|
let delivery_id = uuid::Uuid::now_v7();
|
|
let now = chrono::Utc::now();
|
|
sqlx::query(
|
|
"INSERT INTO repo_webhook_delivery \
|
|
(id, repo, webhook, event, request_headers, request_body, \
|
|
response_status, response_headers, response_body, error, delivered_at, created_at) \
|
|
VALUES ($1, $2, $3, $4, NULL, NULL, NULL, NULL, NULL, NULL, NULL, $5)",
|
|
)
|
|
.bind(delivery_id)
|
|
.bind(repo_id)
|
|
.bind(wh.id)
|
|
.bind(event)
|
|
.bind(now)
|
|
.execute(self.db.writer())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
let task = git::sync::webhook::WebhookDeliveryTask {
|
|
id: delivery_id.to_string(),
|
|
webhook_id: wh.id.to_string(),
|
|
repo_id: repo_id.to_string(),
|
|
event: event.to_string(),
|
|
url: wh.url.clone(),
|
|
secret: wh.secret_hash.clone(),
|
|
payload: payload.clone(),
|
|
created_at: now,
|
|
retry_count: 0,
|
|
};
|
|
|
|
if let Err(e) =
|
|
git::sync::webhook::enqueue_delivery(task, &self.redis_pool)
|
|
.await
|
|
{
|
|
tracing::error!(
|
|
repo_id = %repo_id,
|
|
webhook_id = %wh.id,
|
|
error = %e,
|
|
"failed to enqueue webhook delivery"
|
|
);
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|