// gitdataai/libs/git/http/handler.rs
// Last modified: 2026-04-14 19:02:01 +08:00 (322 lines, 11 KiB, Rust)

use actix_web::{Error, HttpResponse, web};
use async_stream::stream;
use futures_util::Stream;
use futures_util::StreamExt;
use models::repos::{repo, repo_branch_protect};
use sea_orm::*;
use std::path::PathBuf;
use std::pin::Pin;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use db::database::AppDatabase;
// Boxed stream of request-body chunks, used to thread the HTTP payload
// through the pre-PACK inspection and on into git's stdin.
// NOTE: not `Send` — consumed on the actix worker's local runtime.
type ByteStream = Pin<Box<dyn Stream<Item = Result<Vec<u8>, std::io::Error>>>>;
// Cap (1 MiB) on buffered ref-negotiation bytes before the PACK payload,
// to bound memory used by branch-protection inspection.
const PRE_PACK_LIMIT: usize = 1_048_576;
// Upper bound on writing to / waiting for a spawned git process.
const GIT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
/// Returns `true` when `oid` is a well-formed SHA-1 object id:
/// exactly 40 ASCII hex digits.
pub fn is_valid_oid(oid: &str) -> bool {
    if oid.len() != 40 {
        return false;
    }
    oid.bytes().all(|b| b.is_ascii_hexdigit())
}
/// Smart-HTTP bridge for a single repository: proxies upload-pack /
/// receive-pack RPCs to the `git` binary and enforces branch protection
/// on pushes.
pub struct GitHttpHandler {
    // On-disk path of the repository, passed directly to `git`.
    storage_path: PathBuf,
    // Database row for this repository (used to look up protection rules).
    repo: repo::Model,
    // Database handle; only the reader side is used here.
    db: AppDatabase,
}
impl GitHttpHandler {
    /// Creates a handler for one repository: `storage_path` is the repo
    /// directory handed to `git`, `repo` its DB row, and `db` the pool used
    /// to load branch-protection rules on push.
    pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase) -> Self {
        Self {
            storage_path,
            repo,
            db,
        }
    }

    /// Handles `git-upload-pack` (fetch/clone) by proxying to the git binary.
    pub async fn upload_pack(&self, payload: web::Payload) -> Result<HttpResponse, Error> {
        self.handle_git_rpc("upload-pack", payload).await
    }

    /// Handles `git-receive-pack` (push); branch-protection checks run
    /// inside `handle_git_rpc` before any bytes reach git.
    pub async fn receive_pack(&self, payload: web::Payload) -> Result<HttpResponse, Error> {
        self.handle_git_rpc("receive-pack", payload).await
    }

    /// Smart-HTTP ref advertisement (`GET .../info/refs?service=git-*`).
    ///
    /// Runs `git <cmd> --stateless-rpc --advertise-refs` and prefixes the
    /// output with the `# service=<name>` pkt-line plus a flush-pkt, as the
    /// smart HTTP protocol requires. Unknown services yield 400.
    pub async fn info_refs(&self, service: &str) -> Result<HttpResponse, Error> {
        let git_cmd = match service {
            "git-upload-pack" => "upload-pack",
            "git-receive-pack" => "receive-pack",
            _ => {
                return Ok(HttpResponse::BadRequest().body("Invalid service"));
            }
        };
        let output = tokio::process::Command::new("git")
            .arg(git_cmd)
            .arg("--stateless-rpc")
            .arg("--advertise-refs")
            .arg(&self.storage_path)
            .output()
            .await
            .map_err(|e| {
                actix_web::error::ErrorInternalServerError(format!("Failed to execute git: {}", e))
            })?;
        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Ok(
                HttpResponse::InternalServerError().body(format!("Git command failed: {}", stderr))
            );
        }
        let mut response_body = Vec::new();
        // Protocol preamble: "# service=<name>" pkt-line, then a flush-pkt,
        // then git's raw advertisement output.
        let header = format!("# service={}\n", service);
        write_pkt_line(&mut response_body, header.as_bytes());
        write_flush_pkt(&mut response_body);
        response_body.extend_from_slice(&output.stdout);
        Ok(HttpResponse::Ok()
            .content_type(format!("application/x-{}-advertisement", service))
            .insert_header(("Cache-Control", "no-cache"))
            .body(response_body))
    }

    /// Core stateless-RPC bridge: spawns `git <service> --stateless-rpc`,
    /// streams the HTTP body into its stdin, and returns its stdout as the
    /// response body (fully buffered via `wait_with_output`).
    ///
    /// For `receive-pack`, the bytes preceding the `PACK` signature — the
    /// pkt-line ref-update commands — are buffered (up to PRE_PACK_LIMIT)
    /// and checked against the repo's branch-protection rules first.
    async fn handle_git_rpc(
        &self,
        service: &str,
        mut payload: web::Payload,
    ) -> Result<HttpResponse, Error> {
        // kill_on_drop: if we bail out early (protection failure, timeout,
        // oversized negotiation), the child is killed instead of leaking.
        let mut child = tokio::process::Command::new("git")
            .arg(service)
            .arg("--stateless-rpc")
            .arg(&self.storage_path)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::piped())
            .kill_on_drop(true)
            .spawn()
            .map_err(|e| {
                actix_web::error::ErrorInternalServerError(format!("Failed to spawn git: {}", e))
            })?;
        // Adapt the actix payload into a plain byte-chunk stream so it can be
        // re-wrapped below after the pre-PACK bytes are consumed.
        let stream = stream! {
            while let Some(chunk) = payload.next().await {
                match chunk {
                    Ok(bytes) => { yield Ok(bytes.to_vec()); }
                    Err(e) => { yield Err(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())); }
                }
            }
        };
        let mut stream: ByteStream = Box::pin(stream);
        if service == "receive-pack" {
            let branch_protects = repo_branch_protect::Entity::find()
                .filter(repo_branch_protect::Column::Repo.eq(self.repo.id))
                .all(self.db.reader())
                .await
                .map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))?;
            const PACK_SIG: &[u8] = b"PACK";
            let mut pre_pack: Vec<u8> = Vec::with_capacity(65536);
            // Buffer everything up to the PACK signature, then run the
            // protection check before letting any data reach git.
            while let Some(chunk) = stream.next().await {
                let bytes = match chunk {
                    Ok(b) => b,
                    Err(e) => return Err(Error::from(e)),
                };
                // Reject oversized pre-PACK data to prevent memory exhaustion
                if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT {
                    return Ok(HttpResponse::PayloadTooLarge()
                        .insert_header(("Content-Type", "text/plain"))
                        .body(format!(
                            "Ref negotiation exceeds {} byte limit",
                            PRE_PACK_LIMIT
                        )));
                }
                // NOTE(review): the signature search is per-chunk; a "PACK"
                // split across two chunks is missed, and pre_pack then grows
                // until the limit trips. Confirm chunk boundaries can't split
                // the signature, or carry the last 3 bytes between chunks.
                if let Some(pos) = bytes.windows(4).position(|w| w == PACK_SIG) {
                    pre_pack.extend_from_slice(&bytes[..pos]);
                    if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) {
                        return Ok(HttpResponse::Forbidden()
                            .insert_header(("Content-Type", "text/plain"))
                            .body(msg));
                    }
                    // Re-assemble the stream: the PACK payload from this chunk
                    // followed by the untouched remainder of the request body.
                    let remaining: ByteStream = Box::pin(stream! {
                        yield Ok(bytes[pos..].to_vec());
                        while let Some(chunk) = stream.next().await {
                            yield chunk;
                        }
                    });
                    stream = remaining;
                    break;
                } else {
                    pre_pack.extend_from_slice(&bytes);
                }
            }
        }
        // NOTE(review): stdin is written to completion before stdout is read
        // (wait_with_output below). If git fills its stdout pipe while the
        // client is still uploading, both sides stall until the 30s timeout.
        // Consider copying stdin and draining stdout concurrently.
        if let Some(mut stdin) = child.stdin.take() {
            let write_task = actix_web::rt::spawn(async move {
                while let Some(chunk) = stream.next().await {
                    match chunk {
                        Ok(bytes) => {
                            if let Err(e) = stdin.write_all(&bytes).await {
                                return Err(e);
                            }
                        }
                        Err(e) => {
                            return Err(std::io::Error::new(std::io::ErrorKind::Other, e));
                        }
                    }
                }
                // Close stdin so git sees EOF and can finish the RPC.
                drop(stdin);
                Ok::<_, std::io::Error>(())
            });
            let write_result = tokio::time::timeout(GIT_OPERATION_TIMEOUT, write_task)
                .await
                .map_err(|_| actix_web::error::ErrorInternalServerError("Git stdin write timeout"))?
                .map_err(|e| {
                    actix_web::error::ErrorInternalServerError(format!("Write error: {}", e))
                })?;
            if let Err(e) = write_result {
                return Err(actix_web::error::ErrorInternalServerError(format!(
                    "Failed to write to git: {}",
                    e
                )));
            }
        }
        // Entire response is buffered in memory; large fetches are held here
        // rather than streamed back to the client.
        let output = tokio::time::timeout(GIT_OPERATION_TIMEOUT, child.wait_with_output())
            .await
            .map_err(|_| actix_web::error::ErrorInternalServerError("Git operation timeout"))?
            .map_err(|e| {
                actix_web::error::ErrorInternalServerError(format!("Git wait failed: {}", e))
            })?;
        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Ok(HttpResponse::InternalServerError()
                .insert_header(("Content-Type", "text/plain"))
                .body(format!("Git command failed: {}", stderr)));
        }
        Ok(HttpResponse::Ok()
            .content_type(format!("application/x-git-{}-result", service))
            .insert_header(("Cache-Control", "no-cache"))
            .body(output.stdout))
    }
}
/// Appends `data` to `buf` in git pkt-line format: a 4-digit lowercase hex
/// length (payload length plus 4 for the length field itself) followed by
/// the payload bytes.
fn write_pkt_line(buf: &mut Vec<u8>, data: &[u8]) {
    let len = data.len() + 4;
    // pkt-line lengths are limited to 4 hex digits (max 0xffff); a larger
    // payload would emit a 5-digit length and silently corrupt the stream.
    debug_assert!(
        len <= 0xffff,
        "pkt-line payload too large: {} bytes",
        data.len()
    );
    buf.extend_from_slice(format!("{:04x}", len).as_bytes());
    buf.extend_from_slice(data);
}
/// Appends the git flush-pkt (`"0000"`), which terminates a pkt-line section.
fn write_flush_pkt(buf: &mut Vec<u8>) {
    const FLUSH_PKT: &[u8] = b"0000";
    buf.extend_from_slice(FLUSH_PKT);
}
/// One ref-update command parsed from a `receive-pack` request:
/// `<old-oid> <new-oid> <ref-name>`.
#[derive(Debug)]
struct RefUpdate {
    // Oid the client believes the ref currently points at
    // (all zeros = ref creation).
    old_oid: Option<String>,
    // Requested target oid (all zeros = ref deletion).
    new_oid: Option<String>,
    // Full ref name, e.g. "refs/heads/main" or "refs/tags/v1".
    name: String,
}
/// Validates the ref updates found in `pre_pack` against the repository's
/// branch-protection rules; returns `Err(message)` for the first violation.
///
/// A rule applies to every ref whose name starts with `protection.branch`
/// (prefix match).
/// NOTE(review): prefix matching means a rule for "refs/heads/main" also
/// covers "refs/heads/main-backup". Also confirm whether `branch` stores a
/// full ref prefix — a bare "main" would never match the "refs/..." names
/// this function compares against.
fn check_branch_protection(
    branch_protects: &[repo_branch_protect::Model],
    pre_pack: &[u8],
) -> Result<(), String> {
    // Assumes parse_ref_updates yields bare 40-char oids — TODO confirm
    // against the pkt-line framing of the receive-pack command list.
    let refs = parse_ref_updates(pre_pack)?;
    for r#ref in &refs {
        for protection in branch_protects {
            if r#ref.name.starts_with(&protection.branch) {
                // Check deletion (new_oid is all zeros / 40 zeros)
                if r#ref.new_oid.as_deref() == Some("0000000000000000000000000000000000000000") {
                    if protection.forbid_deletion {
                        return Err(format!(
                            "Deletion of protected branch '{}' is forbidden",
                            r#ref.name
                        ));
                    }
                    continue;
                }
                // Check tag push
                if r#ref.name.starts_with("refs/tags/") {
                    if protection.forbid_tag_push {
                        return Err(format!(
                            "Tag push to protected branch '{}' is forbidden",
                            r#ref.name
                        ));
                    }
                    continue;
                }
                // Check force push: old != new AND old is non-zero (non-fast-forward update)
                // NOTE(review): old != new holds for EVERY update of an existing
                // branch, fast-forward or not; real force-push detection needs an
                // ancestry check against the object graph, which this pre-pack
                // inspection cannot do. As written, forbid_force_push rejects all
                // updates to existing protected branches — confirm intent.
                if let (Some(old_oid), Some(new_oid)) =
                    (r#ref.old_oid.as_deref(), r#ref.new_oid.as_deref())
                {
                    let is_new_branch = old_oid == "0000000000000000000000000000000000000000";
                    if !is_new_branch
                        && old_oid != new_oid
                        && r#ref.name.starts_with("refs/heads/")
                        && protection.forbid_force_push
                    {
                        return Err(format!(
                            "Force push to protected branch '{}' is forbidden",
                            r#ref.name
                        ));
                    }
                }
                // Check push
                if protection.forbid_push {
                    return Err(format!(
                        "Push to protected branch '{}' is forbidden",
                        r#ref.name
                    ));
                }
            }
        }
    }
    Ok(())
}
/// Parses ref-update commands from the pre-PACK portion of a `receive-pack`
/// request body.
///
/// Each command has the shape `<old-oid> <new-oid> <ref-name>`. On the wire
/// these are pkt-lines, so a 4-hex-digit length prefix appears glued to the
/// old oid (e.g. `009aabc… def… refs/heads/main`), and the first command
/// carries a NUL-separated capability list after the ref name. Both the
/// pkt-line and plain-text forms are accepted.
fn parse_ref_updates(buffer: &[u8]) -> Result<Vec<RefUpdate>, String> {
    // Strip a 4-hex-digit pkt-line length prefix glued to a 40-char hex oid;
    // without this, zero-oid (create/delete) comparisons downstream never match.
    fn strip_pkt_len(tok: &str) -> String {
        if tok.len() == 44 && tok.bytes().all(|b| b.is_ascii_hexdigit()) {
            tok[4..].to_string()
        } else {
            tok.to_string()
        }
    }
    let text = String::from_utf8_lossy(buffer);
    let mut refs = Vec::new();
    for line in text.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') || line.starts_with("PACK") {
            continue;
        }
        // Format: "<old-oid> <new-oid> <ref-name>\n"
        let mut parts = line.split_whitespace();
        let old_oid = parts.next().map(strip_pkt_len);
        let new_oid = parts.next().map(|s| s.to_string());
        // The first command's ref name is followed by "\0<capabilities>";
        // the NUL sits mid-token, so split there and keep only the ref name.
        let name = parts
            .next()
            .unwrap_or("")
            .split('\0')
            .next()
            .unwrap_or("")
            .to_string();
        if !name.is_empty() {
            refs.push(RefUpdate {
                old_oid,
                new_oid,
                name,
            });
        }
    }
    Ok(refs)
}