use std::time::Duration; use deadpool_redis::cluster::Pool as RedisPool; use hmac::{Hmac, KeyInit, Mac}; use sha2::Sha256; type HmacSha256 = Hmac; fn webhook_queue_keys(repo_id: uuid::Uuid) -> (String, String) { let hash_tag = format!("{{wh:{}}}", repo_id); ( format!("{}:pending", hash_tag), format!("{}:processing", hash_tag), ) } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct WebhookDeliveryTask { pub id: String, pub webhook_id: String, pub repo_id: String, pub event: String, pub url: String, pub secret: Option, pub payload: serde_json::Value, pub created_at: chrono::DateTime, pub retry_count: usize, } pub async fn enqueue_delivery( task: WebhookDeliveryTask, redis_pool: &RedisPool, ) -> Result<(), String> { let task_json = serde_json::to_string(&task) .map_err(|e| format!("serialize error: {}", e))?; let repo_id: uuid::Uuid = task .repo_id .parse() .map_err(|e| format!("invalid repo_id: {}", e))?; let (pending_key, _) = webhook_queue_keys(repo_id); let redis = redis_pool .get() .await .map_err(|e| format!("redis pool: {}", e))?; let mut conn: deadpool_redis::cluster::Connection = redis; redis::cmd("LPUSH") .arg(&pending_key) .arg(&task_json) .query_async::<()>(&mut conn) .await .map_err(|e| format!("LPUSH error: {}", e))?; tracing::info!( webhook_id = %task.webhook_id, repo_id = %task.repo_id, event = %task.event, "webhook delivery enqueued" ); Ok(()) } pub async fn poll_delivery_for_repo( redis_pool: &RedisPool, repo_id: uuid::Uuid, block_timeout_secs: usize, ) -> Option { let (pending_key, processing_key) = webhook_queue_keys(repo_id); let redis = redis_pool.get().await.ok()?; let mut conn: deadpool_redis::cluster::Connection = redis; redis::cmd("BLMOVE") .arg(&pending_key) .arg(&processing_key) .arg("RIGHT") .arg("LEFT") .arg(block_timeout_secs) .query_async::>(&mut conn) .await .ok() .flatten() } pub async fn ack_delivery( redis_pool: &RedisPool, repo_id: uuid::Uuid, task_json: &str, ) { let (_, processing_key) = webhook_queue_keys(repo_id); let redis = match redis_pool.get().await { Ok(c) => c, Err(e) => { tracing::warn!(error = %e, "webhook ack: failed to get redis connection"); return; } }; let mut conn: deadpool_redis::cluster::Connection = redis; if let Err(e) = redis::cmd("LREM") .arg(&processing_key) .arg(1) .arg(task_json) .query_async::<()>(&mut conn) .await { tracing::warn!(error = %e, "webhook ack: LREM failed"); } } fn compute_hmac_signature(secret: &str, body: &[u8]) -> String { let mut mac = HmacSha256::new_from_slice(secret.as_bytes()) .expect("HMAC can take key of any size"); mac.update(body); let result = mac.finalize(); let code_bytes = result.into_bytes(); hex::encode(code_bytes) } pub async fn deliver_webhook( task: &WebhookDeliveryTask, ) -> WebhookDeliveryResult { let body_bytes = serde_json::to_vec(&task.payload).unwrap_or_default(); let signature = task .secret .as_ref() .map(|s| compute_hmac_signature(s, &body_bytes)); let client = reqwest::Client::builder() .timeout(Duration::from_secs(10)) .build() .unwrap_or_default(); let mut request = client .post(&task.url) .header("Content-Type", "application/json") .header("X-GitData-Event", &task.event) .header("X-GitData-Delivery", &task.id); if let Some(sig) = &signature { request = request .header("X-GitData-Signature-256", format!("sha256={}", sig)); } let response = request.body(body_bytes.clone()).send().await; match response { Ok(resp) => { let status = resp.status().as_u16() as i32; let resp_headers: String = resp .headers() .iter() .map(|(k, v)| { format!("{}: {}", k, v.to_str().unwrap_or_default()) }) .collect::>() .join("\n"); let resp_body = resp.text().await.unwrap_or_default(); WebhookDeliveryResult { response_status: Some(status), response_headers: Some(resp_headers), response_body: Some(resp_body), error: None, request_headers: Some(format!( "Content-Type: application/json\nX-GitData-Event: {}\nX-GitData-Delivery: {}", task.event, task.id )), request_body: Some( String::from_utf8_lossy(&body_bytes).to_string(), ), } } Err(e) => WebhookDeliveryResult { response_status: None, response_headers: None, response_body: None, error: Some(e.to_string()), request_headers: Some(format!( "Content-Type: application/json\nX-GitData-Event: {}\nX-GitData-Delivery: {}", task.event, task.id )), request_body: Some( String::from_utf8_lossy(&body_bytes).to_string(), ), }, } } pub struct WebhookDeliveryResult { pub request_headers: Option, pub request_body: Option, pub response_status: Option, pub response_headers: Option, pub response_body: Option, pub error: Option, }