gitdataai/libs/git/hook/webhook_dispatch.rs
2026-04-14 19:02:01 +08:00

411 lines
13 KiB
Rust

use db::database::AppDatabase;
use serde::Deserialize;
use sha2::{Digest, Sha256};
use std::time::Duration;
use tokio::time::timeout;
/// Sign `body` with HMAC-SHA256 keyed by `secret`.
///
/// Returns the signature formatted as `sha256=<lowercase hex>`, or `None`
/// when `secret` is empty (no signature should be sent in that case).
///
/// The MAC is assembled by hand from two SHA-256 passes:
/// `SHA256((key ^ opad) || SHA256((key ^ ipad) || body))`.
pub fn sign_payload(body: &[u8], secret: &str) -> Option<String> {
    const BLOCK_SIZE: usize = 64; // SHA-256 block size in bytes
    const IPAD: u8 = 0x36;
    const OPAD: u8 = 0x5c;

    if secret.is_empty() {
        return None;
    }

    // Normalise the key to exactly one block: keys longer than a block are
    // replaced by their digest, everything is then zero-padded on the right.
    let secret_bytes = secret.as_bytes();
    let mut padded_key = [0u8; BLOCK_SIZE];
    if secret_bytes.len() > BLOCK_SIZE {
        let key_digest = Sha256::digest(secret_bytes);
        padded_key[..key_digest.len()].copy_from_slice(key_digest.as_slice());
    } else {
        padded_key[..secret_bytes.len()].copy_from_slice(secret_bytes);
    }

    let inner_pad = padded_key.map(|byte| byte ^ IPAD);
    let outer_pad = padded_key.map(|byte| byte ^ OPAD);

    // Inner pass: SHA256(inner_pad || body)
    let mut hasher = Sha256::new();
    hasher.update(&inner_pad);
    hasher.update(body);
    let inner_digest = hasher.finalize();

    // Outer pass: SHA256(outer_pad || inner_digest)
    let mut hasher = Sha256::new();
    hasher.update(&outer_pad);
    hasher.update(inner_digest);
    let mac = hasher.finalize();

    let hex: String = mac.iter().map(|byte| format!("{:02x}", byte)).collect();
    Some(format!("sha256={}", hex))
}
#[derive(Debug, Clone, Default, Deserialize)]
pub struct WebhookEvents {
pub push: bool,
pub tag_push: bool,
pub pull_request: bool,
pub issue_comment: bool,
pub release: bool,
}
/// Lenient conversion from an arbitrary JSON value.
///
/// A flag is `true` only when the corresponding key holds the JSON boolean
/// `true`; a missing key, a non-boolean value, or a non-object input all
/// yield `false` for that flag.
impl From<serde_json::Value> for WebhookEvents {
    fn from(v: serde_json::Value) -> Self {
        let enabled =
            |key: &str| -> bool { matches!(v.get(key), Some(serde_json::Value::Bool(true))) };

        Self {
            push: enabled("push"),
            tag_push: enabled("tag_push"),
            pull_request: enabled("pull_request"),
            issue_comment: enabled("issue_comment"),
            release: enabled("release"),
        }
    }
}
/// JSON body sent to webhooks for a push event.
///
/// Built and serialized with `serde_json` in `dispatch_repo_webhooks`.
#[derive(Debug, serde::Serialize)]
pub struct PushPayload {
/// The pushed ref; emitted under the JSON key `ref`.
#[serde(rename = "ref")]
pub r#ref: String,
/// Copied from `WebhookEventKind::Push::before`
/// (presumably the ref's previous object id — confirm with the caller).
pub before: String,
/// Copied from `WebhookEventKind::Push::after`
/// (presumably the ref's new object id — confirm with the caller).
pub after: String,
/// Repository the event belongs to.
pub repository: RepositoryPayload,
/// Who performed the push.
pub pusher: PusherPayload,
/// Commits attached to the event; the `commits` key is omitted from the
/// JSON entirely when this list is empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub commits: Vec<CommitPayload>,
}
/// JSON body sent to webhooks for a tag push event.
///
/// Same shape as `PushPayload` but without a `commits` list.
#[derive(Debug, serde::Serialize)]
pub struct TagPushPayload {
/// The pushed ref; emitted under the JSON key `ref`.
#[serde(rename = "ref")]
pub r#ref: String,
/// Copied from `WebhookEventKind::TagPush::before`
/// (presumably the ref's previous object id — confirm with the caller).
pub before: String,
/// Copied from `WebhookEventKind::TagPush::after`
/// (presumably the ref's new object id — confirm with the caller).
pub after: String,
/// Repository the event belongs to.
pub repository: RepositoryPayload,
/// Who performed the push.
pub pusher: PusherPayload,
}
/// Repository description embedded in every webhook payload.
#[derive(Debug, serde::Serialize)]
pub struct RepositoryPayload {
/// Repository identifier, filled from the `repo_uuid` string passed to
/// `dispatch_repo_webhooks`.
pub id: String,
/// Repository name without its namespace.
pub name: String,
/// `"{namespace}/{name}"`, as built by `dispatch_repo_webhooks`.
pub full_name: String,
/// Namespace the repository lives under.
pub namespace: String,
/// Name of the repository's default branch.
pub default_branch: String,
}
/// Identity of the user who triggered the event, as embedded in webhook payloads.
#[derive(Debug, serde::Serialize)]
pub struct PusherPayload {
/// Filled from the `pusher_name` argument of `dispatch_repo_webhooks`.
pub name: String,
/// Filled from the `pusher_email` argument of `dispatch_repo_webhooks`.
pub email: String,
}
/// One commit entry inside `PushPayload::commits`.
///
/// Built from a `CommitDispatch` in `dispatch_repo_webhooks`.
#[derive(Debug, serde::Serialize)]
pub struct CommitPayload {
/// Copied from `CommitDispatch::id`.
pub id: String,
/// Copied from `CommitDispatch::message`.
pub message: String,
/// Commit author, built from `CommitDispatch::author_name` / `author_email`.
pub author: AuthorPayload,
}
/// Author of a commit, as embedded in `CommitPayload`.
#[derive(Debug, serde::Serialize)]
pub struct AuthorPayload {
/// Copied from `CommitDispatch::author_name`.
pub name: String,
/// Copied from `CommitDispatch::author_email`.
pub email: String,
}
/// Reasons a single webhook delivery attempt can fail.
///
/// Implements [`std::error::Error`] so it can be boxed, wrapped, or propagated
/// with `?` into generic error types.
#[derive(Debug)]
pub enum DispatchError {
    /// The HTTP client reported that the request timed out.
    Timeout,
    /// The HTTP client could not establish a connection.
    ConnectionFailed,
    /// Any other transport-level failure; carries the client's error text.
    RequestFailed(String),
    /// The endpoint answered with a non-success HTTP status code.
    HttpError(u16),
}

impl std::fmt::Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DispatchError::Timeout => f.write_str("timeout"),
            DispatchError::ConnectionFailed => f.write_str("connection failed"),
            DispatchError::RequestFailed(reason) => write!(f, "request failed: {}", reason),
            DispatchError::HttpError(code) => write!(f, "http error: {}", code),
        }
    }
}

// No underlying source is retained (only its text, in `RequestFailed`), so the
// default trait methods are sufficient.
impl std::error::Error for DispatchError {}
/// POST `body` to `url` and report whether the endpoint accepted it.
///
/// The request carries the given `Content-Type`, a fixed `User-Agent`, and a
/// 10 second request timeout. When `secret` is present and non-empty, an
/// `X-Hub-Signature-256` header with the HMAC-SHA256 of `body` is attached.
///
/// # Errors
///
/// * [`DispatchError::Timeout`] — the client reported a timeout.
/// * [`DispatchError::ConnectionFailed`] — the client reported a connect error.
/// * [`DispatchError::RequestFailed`] — any other send failure.
/// * [`DispatchError::HttpError`] — the response status was not a success code.
pub async fn deliver(
    client: &reqwest::Client,
    url: &str,
    secret: Option<&str>,
    content_type: &str,
    body: &[u8],
) -> Result<(), DispatchError> {
    // `sign_payload` yields `None` for an empty secret, so both "no secret"
    // and "empty secret" result in an unsigned request.
    let signature = secret.and_then(|key| sign_payload(body, key));

    let unsigned = client
        .post(url)
        .header("Content-Type", content_type)
        .header("User-Agent", "Code-Git-Hook/1.0")
        .timeout(Duration::from_secs(10))
        .body(body.to_vec());
    let request = match signature {
        Some(sig) => unsigned.header("X-Hub-Signature-256", sig),
        None => unsigned,
    };

    let response = match request.send().await {
        Ok(response) => response,
        Err(err) if err.is_timeout() => return Err(DispatchError::Timeout),
        Err(err) if err.is_connect() => return Err(DispatchError::ConnectionFailed),
        Err(err) => return Err(DispatchError::RequestFailed(err.to_string())),
    };

    let status = response.status();
    if !status.is_success() {
        return Err(DispatchError::HttpError(status.as_u16()));
    }
    Ok(())
}
/// Caller-supplied description of one commit in a push.
///
/// `dispatch_repo_webhooks` converts each of these into a `CommitPayload`.
pub struct CommitDispatch {
/// Commit identifier; becomes `CommitPayload::id`.
pub id: String,
/// Commit message; becomes `CommitPayload::message`.
pub message: String,
/// Author name; becomes `AuthorPayload::name`.
pub author_name: String,
/// Author email; becomes `AuthorPayload::email`.
pub author_email: String,
}
/// The event being announced by `dispatch_repo_webhooks`.
///
/// Only webhooks whose `WebhookEvents` flag for the matching kind is set
/// (`push` for `Push`, `tag_push` for `TagPush`) receive a delivery.
pub enum WebhookEventKind {
/// A push event; serialized as a `PushPayload`.
Push {
/// The pushed ref.
r#ref: String,
/// Forwarded verbatim as `PushPayload::before`.
before: String,
/// Forwarded verbatim as `PushPayload::after`.
after: String,
/// Commits to include in the payload.
commits: Vec<CommitDispatch>,
},
/// A tag push event; serialized as a `TagPushPayload`.
TagPush {
/// The pushed ref.
r#ref: String,
/// Forwarded verbatim as `TagPushPayload::before`.
before: String,
/// Forwarded verbatim as `TagPushPayload::after`.
after: String,
},
}
/// Dispatch webhooks for a repository after a push or tag event.
/// Queries active webhooks from the DB and sends HTTP POST requests.
pub async fn dispatch_repo_webhooks(
db: &AppDatabase,
http: &reqwest::Client,
logs: &slog::Logger,
repo_uuid: &str,
namespace: &str,
repo_name: &str,
default_branch: &str,
pusher_name: &str,
pusher_email: &str,
event: WebhookEventKind,
) {
use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity};
use models::{ColumnTrait, EntityTrait, QueryFilter, Uuid};
let webhooks: Vec<<RepoWebhookEntity as EntityTrait>::Model> = match RepoWebhookEntity::find()
.filter(RwCol::Repo.eq(Uuid::parse_str(repo_uuid).ok()))
.all(db.reader())
.await
{
Ok(ws) => ws,
Err(e) => {
slog::error!(logs, "failed to query webhooks: {}", e; "repo" => repo_uuid);
return;
}
};
if webhooks.is_empty() {
return;
}
for webhook in webhooks {
let event_config: WebhookEvents =
serde_json::from_value(webhook.event.clone()).unwrap_or_default();
let content_type = webhook
.event
.get("content_type")
.and_then(|v: &serde_json::Value| v.as_str())
.unwrap_or("application/json");
let url = webhook.url.as_deref().unwrap_or("");
if url.is_empty() {
continue;
}
let secret = webhook.secret_key.as_deref();
match &event {
WebhookEventKind::Push {
r#ref,
before,
after,
commits,
} => {
if !event_config.push {
continue;
}
let payload = PushPayload {
r#ref: r#ref.clone(),
before: before.clone(),
after: after.clone(),
repository: RepositoryPayload {
id: repo_uuid.to_owned(),
name: repo_name.to_owned(),
full_name: format!("{}/{}", namespace, repo_name),
namespace: namespace.to_owned(),
default_branch: default_branch.to_owned(),
},
pusher: PusherPayload {
name: pusher_name.to_owned(),
email: pusher_email.to_owned(),
},
commits: commits
.iter()
.map(|c| CommitPayload {
id: c.id.clone(),
message: c.message.clone(),
author: AuthorPayload {
name: c.author_name.clone(),
email: c.author_email.clone(),
},
})
.collect(),
};
let body = match serde_json::to_vec(&payload) {
Ok(b) => b,
Err(e) => {
slog::error!(logs, "failed to serialize push payload"; "error" => e.to_string());
continue;
}
};
let webhook_id = webhook.id;
match timeout(
Duration::from_secs(10),
deliver(http, url, secret, content_type, &body),
)
.await
{
Ok(Ok(())) => {
slog::info!(logs, "push webhook delivered"; "webhook_id" => webhook_id, "url" => url);
let _ = touch_webhook(db, webhook_id, true, logs).await;
}
Ok(Err(e)) => {
slog::warn!(logs, "push webhook delivery failed"; "error" => e.to_string(), "webhook_id" => webhook_id, "url" => url);
let _ = touch_webhook(db, webhook_id, false, logs).await;
}
Err(_) => {
slog::warn!(logs, "push webhook timed out"; "webhook_id" => webhook_id, "url" => url);
let _ = touch_webhook(db, webhook_id, false, logs).await;
}
}
}
WebhookEventKind::TagPush {
r#ref,
before,
after,
} => {
if !event_config.tag_push {
continue;
}
let payload = TagPushPayload {
r#ref: r#ref.clone(),
before: before.clone(),
after: after.clone(),
repository: RepositoryPayload {
id: repo_uuid.to_owned(),
name: repo_name.to_owned(),
full_name: format!("{}/{}", namespace, repo_name),
namespace: namespace.to_owned(),
default_branch: default_branch.to_owned(),
},
pusher: PusherPayload {
name: pusher_name.to_owned(),
email: pusher_email.to_owned(),
},
};
let body = match serde_json::to_vec(&payload) {
Ok(b) => b,
Err(e) => {
slog::error!(logs, "failed to serialize tag payload"; "error" => e.to_string());
continue;
}
};
let webhook_id = webhook.id;
match timeout(
Duration::from_secs(10),
deliver(http, url, secret, content_type, &body),
)
.await
{
Ok(Ok(())) => {
slog::info!(logs, "tag webhook delivered"; "webhook_id" => webhook_id, "url" => url);
let _ = touch_webhook(db, webhook_id, true, logs).await;
}
Ok(Err(e)) => {
slog::warn!(logs, "tag webhook delivery failed"; "error" => e.to_string(), "webhook_id" => webhook_id, "url" => url);
let _ = touch_webhook(db, webhook_id, false, logs).await;
}
Err(_) => {
slog::warn!(logs, "tag webhook timed out"; "webhook_id" => webhook_id, "url" => url);
let _ = touch_webhook(db, webhook_id, false, logs).await;
}
}
}
}
}
}
/// Record one delivery attempt for `webhook_id`.
///
/// Every call increments the row's `TouchCount` by one; when `success` is
/// true, `LastDeliveredAt` is additionally set to the current UTC time.
/// A failed update is logged to `logs` and otherwise ignored, since this
/// bookkeeping must never abort webhook dispatch.
async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &slog::Logger) {
    use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity};
    use models::{ColumnTrait, EntityTrait, QueryFilter};
    use sea_orm::prelude::Expr;
    // `.add` is an inherent method on older sea-query releases and is provided
    // by `ExprTrait` on newer ones; the import is only needed for the latter.
    #[allow(unused_imports)]
    use sea_orm::sea_query::ExprTrait;

    let mut update = RepoWebhookEntity::update_many().filter(RwCol::Id.eq(webhook_id));
    if success {
        update = update.col_expr(
            RwCol::LastDeliveredAt,
            Expr::value(Some(chrono::Utc::now())),
        );
    }
    // Increment in SQL (`touch_count = touch_count + 1`). Assigning the
    // constant 1 would pin the counter at 1 no matter how many attempts ran.
    let update = update.col_expr(RwCol::TouchCount, Expr::col(RwCol::TouchCount).add(1i64));

    let result: Result<sea_orm::UpdateResult, sea_orm::DbErr> = update.exec(db.writer()).await;
    if let Err(e) = result {
        slog::warn!(logs, "failed to update webhook touch"; "error" => e.to_string());
    }
}