328 lines
11 KiB
Rust
328 lines
11 KiB
Rust
use std::{
|
|
path::PathBuf,
|
|
pin::Pin,
|
|
time::{Duration, Instant},
|
|
};
|
|
|
|
use actix_web::{Error, HttpResponse, web};
|
|
use async_stream::stream;
|
|
use db::database::AppDatabase;
|
|
use futures_util::{Stream, StreamExt};
|
|
use model::repos::{RepoModel, repo_protect::RepoProtectModel};
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
use crate::ssh::{
|
|
branch_protect::check_branch_protection, ref_update::RefUpdate,
|
|
};
|
|
|
|
/// Boxed, pinned stream of request-body chunks; each item is either a chunk
/// of raw bytes or the I/O error that interrupted the body.
type ByteStream = Pin<Box<dyn Stream<Item = std::io::Result<Vec<u8>>>>>;
|
|
|
|
/// Maximum number of bytes (1 MiB) buffered from a `receive-pack` request
/// while looking for the end of the ref-update commands.
const PRE_PACK_LIMIT: usize = 1024 * 1024;

/// Time budget applied separately to each awaited git step (advertising refs,
/// feeding stdin, collecting the process output).
const GIT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
|
|
|
|
/// Returns `true` when `oid` is exactly 40 ASCII hexadecimal characters
/// (upper- or lowercase).
pub fn is_valid_oid(oid: &str) -> bool {
    const OID_HEX_LEN: usize = 40;

    if oid.len() != OID_HEX_LEN {
        return false;
    }
    // Any non-ASCII character contributes bytes >= 0x80, none of which are
    // hex digits, so a byte-wise check rejects the same inputs as a char-wise one.
    oid.bytes().all(|b| b.is_ascii_hexdigit())
}
|
|
|
|
/// Returns `true` when `oid` is exactly 64 ASCII hexadecimal characters
/// (upper- or lowercase).
pub fn is_valid_lfs_oid(oid: &str) -> bool {
    const LFS_OID_HEX_LEN: usize = 64;

    if oid.len() != LFS_OID_HEX_LEN {
        return false;
    }
    // Byte-wise is equivalent to char-wise here: no byte of a multi-byte
    // UTF-8 sequence is an ASCII hex digit.
    oid.bytes().all(|b| b.is_ascii_hexdigit())
}
|
|
|
|
pub struct GitHttpHandler {
|
|
storage_path: PathBuf,
|
|
repo: RepoModel,
|
|
db: AppDatabase,
|
|
}
|
|
|
|
impl GitHttpHandler {
|
|
pub fn new(
|
|
storage_path: PathBuf,
|
|
repo: RepoModel,
|
|
db: AppDatabase,
|
|
) -> Self {
|
|
Self {
|
|
storage_path,
|
|
repo,
|
|
db,
|
|
}
|
|
}
|
|
|
|
pub async fn upload_pack(
|
|
&self,
|
|
payload: web::Payload,
|
|
) -> Result<HttpResponse, Error> {
|
|
self.handle_git_rpc("upload-pack", payload).await
|
|
}
|
|
|
|
pub async fn receive_pack(
|
|
&self,
|
|
payload: web::Payload,
|
|
) -> Result<HttpResponse, Error> {
|
|
self.handle_git_rpc("receive-pack", payload).await
|
|
}
|
|
|
|
pub async fn info_refs(
|
|
&self,
|
|
service: &str,
|
|
) -> Result<HttpResponse, Error> {
|
|
let git_cmd = match service {
|
|
"git-upload-pack" => "upload-pack",
|
|
"git-receive-pack" => "receive-pack",
|
|
_ => {
|
|
return Err(actix_web::error::ErrorBadRequest(
|
|
"Invalid service",
|
|
));
|
|
}
|
|
};
|
|
|
|
let output = tokio::time::timeout(GIT_OPERATION_TIMEOUT, async {
|
|
tokio::process::Command::new("git")
|
|
.arg(git_cmd)
|
|
.arg("--stateless-rpc")
|
|
.arg("--advertise-refs")
|
|
.arg(&self.storage_path)
|
|
.output()
|
|
.await
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
actix_web::error::ErrorInternalServerError("Git info-refs timeout")
|
|
})?
|
|
.map_err(|e| {
|
|
actix_web::error::ErrorInternalServerError(format!(
|
|
"Failed to execute git: {}",
|
|
e
|
|
))
|
|
})?;
|
|
|
|
if !output.status.success() {
|
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
|
return Err(actix_web::error::ErrorInternalServerError(format!(
|
|
"Git command failed: {}",
|
|
stderr
|
|
)));
|
|
}
|
|
|
|
let mut response_body = Vec::new();
|
|
let header = format!("# service={}\n", service);
|
|
write_pkt_line(&mut response_body, header.as_bytes());
|
|
write_flush_pkt(&mut response_body);
|
|
response_body.extend_from_slice(&output.stdout);
|
|
|
|
Ok(HttpResponse::Ok()
|
|
.content_type(format!("application/x-{}-advertisement", service))
|
|
.insert_header(("Cache-Control", "no-cache"))
|
|
.body(response_body))
|
|
}
|
|
|
|
async fn handle_git_rpc(
|
|
&self,
|
|
service: &str,
|
|
mut payload: web::Payload,
|
|
) -> Result<HttpResponse, Error> {
|
|
let started = Instant::now();
|
|
tracing::info!(
|
|
"git_rpc_started service={} repo={} repo_id={}",
|
|
service,
|
|
self.repo.name,
|
|
self.repo.id.to_string()
|
|
);
|
|
let mut child = tokio::process::Command::new("git")
|
|
.arg(service)
|
|
.arg("--stateless-rpc")
|
|
.arg(&self.storage_path)
|
|
.stdin(std::process::Stdio::piped())
|
|
.stdout(std::process::Stdio::piped())
|
|
.stderr(std::process::Stdio::piped())
|
|
.kill_on_drop(true)
|
|
.spawn()
|
|
.map_err(|e| {
|
|
actix_web::error::ErrorInternalServerError(format!(
|
|
"Failed to spawn git: {}",
|
|
e
|
|
))
|
|
})?;
|
|
|
|
let stream = stream! {
|
|
while let Some(chunk) = payload.next().await {
|
|
match chunk {
|
|
Ok(bytes) => { yield Ok(bytes.to_vec()); }
|
|
Err(e) => { yield Err(std::io::Error::new(std::io::ErrorKind::Other, e.to_string())); }
|
|
}
|
|
}
|
|
};
|
|
let mut stream: ByteStream = Box::pin(stream);
|
|
|
|
if service == "receive-pack" {
|
|
let branch_protects: Vec<RepoProtectModel> = sqlx::query_as::<_, RepoProtectModel>(
|
|
"SELECT id, repo, pattern, require_pull_request, required_approvals, \
|
|
require_status_checks, required_status_contexts, enforce_admins, \
|
|
allow_force_pushes, allow_deletions, created_at, updated_at \
|
|
FROM repo_protect \
|
|
WHERE repo = $1",
|
|
)
|
|
.bind(self.repo.id)
|
|
.fetch_all(self.db.reader())
|
|
.await
|
|
.map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))?;
|
|
|
|
let mut pre_pack: Vec<u8> = Vec::with_capacity(65536);
|
|
|
|
while let Some(chunk) = stream.next().await {
|
|
let bytes = match chunk {
|
|
Ok(b) => b,
|
|
Err(e) => return Err(Error::from(e)),
|
|
};
|
|
|
|
if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT {
|
|
tracing::warn!(
|
|
"git_rpc_payload_too_large service={} repo={} repo_id={}",
|
|
service,
|
|
self.repo.name,
|
|
self.repo.id.to_string()
|
|
);
|
|
return Err(actix_web::error::ErrorPayloadTooLarge(
|
|
format!(
|
|
"Ref negotiation exceeds {} byte limit",
|
|
PRE_PACK_LIMIT
|
|
),
|
|
));
|
|
}
|
|
|
|
if let Some(pos) = bytes.windows(4).position(|w| w == b"0000") {
|
|
let end = pos + 4;
|
|
pre_pack.extend_from_slice(&bytes[..end]);
|
|
|
|
let refs = RefUpdate::parse_ref_updates(&pre_pack)
|
|
.map_err(actix_web::error::ErrorBadRequest)?;
|
|
if let Some(msg) = refs.iter().find_map(|r#ref| {
|
|
check_branch_protection(&branch_protects, r#ref)
|
|
}) {
|
|
tracing::warn!(
|
|
"branch_protection_violation repo={} repo_id={} message={}",
|
|
self.repo.name,
|
|
self.repo.id.to_string(),
|
|
msg
|
|
);
|
|
return Err(actix_web::error::ErrorForbidden(msg));
|
|
}
|
|
|
|
let remaining: ByteStream = Box::pin(stream! {
|
|
yield Ok(pre_pack);
|
|
if end < bytes.len() {
|
|
yield Ok(bytes[end..].to_vec());
|
|
}
|
|
while let Some(chunk) = stream.next().await {
|
|
yield chunk;
|
|
}
|
|
});
|
|
stream = remaining;
|
|
break;
|
|
} else {
|
|
pre_pack.extend_from_slice(&bytes);
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Some(mut stdin) = child.stdin.take() {
|
|
let write_task = actix_web::rt::spawn(async move {
|
|
while let Some(chunk) = stream.next().await {
|
|
match chunk {
|
|
Ok(bytes) => {
|
|
if let Err(e) = stdin.write_all(&bytes).await {
|
|
return Err(e);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
return Err(std::io::Error::new(
|
|
std::io::ErrorKind::Other,
|
|
e,
|
|
));
|
|
}
|
|
}
|
|
}
|
|
drop(stdin);
|
|
Ok::<_, std::io::Error>(())
|
|
});
|
|
|
|
let write_result =
|
|
tokio::time::timeout(GIT_OPERATION_TIMEOUT, write_task)
|
|
.await
|
|
.map_err(|_| {
|
|
actix_web::error::ErrorInternalServerError(
|
|
"Git stdin write timeout",
|
|
)
|
|
})?
|
|
.map_err(|e| {
|
|
actix_web::error::ErrorInternalServerError(format!(
|
|
"Write error: {}",
|
|
e
|
|
))
|
|
})?;
|
|
|
|
if let Err(e) = write_result {
|
|
return Err(actix_web::error::ErrorInternalServerError(
|
|
format!("Failed to write to git: {}", e),
|
|
));
|
|
}
|
|
}
|
|
|
|
let output = tokio::time::timeout(
|
|
GIT_OPERATION_TIMEOUT,
|
|
child.wait_with_output(),
|
|
)
|
|
.await
|
|
.map_err(|_| {
|
|
actix_web::error::ErrorInternalServerError("Git operation timeout")
|
|
})?
|
|
.map_err(|e| {
|
|
actix_web::error::ErrorInternalServerError(format!(
|
|
"Git wait failed: {}",
|
|
e
|
|
))
|
|
})?;
|
|
|
|
if !output.status.success() {
|
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
|
let ms = started.elapsed().as_millis() as u64;
|
|
tracing::error!(
|
|
"git_rpc_failed service={} repo={} repo_id={} duration_ms={} stderr={}",
|
|
service,
|
|
self.repo.name,
|
|
self.repo.id.to_string(),
|
|
ms,
|
|
stderr.to_string()
|
|
);
|
|
return Err(actix_web::error::ErrorInternalServerError(format!(
|
|
"Git command failed: {}",
|
|
stderr
|
|
)));
|
|
}
|
|
|
|
let ms = started.elapsed().as_millis() as u64;
|
|
tracing::info!(
|
|
"git_rpc_completed service={} repo={} repo_id={} duration_ms={} bytes_out={}",
|
|
service,
|
|
self.repo.name,
|
|
self.repo.id.to_string(),
|
|
ms,
|
|
output.stdout.len()
|
|
);
|
|
|
|
Ok(HttpResponse::Ok()
|
|
.content_type(format!("application/x-git-{}-result", service))
|
|
.insert_header(("Cache-Control", "no-cache"))
|
|
.body(output.stdout))
|
|
}
|
|
}
|
|
|
|
/// Appends `data` to `buf` as one pkt-line: a four-digit lowercase hex length
/// (which counts the four prefix bytes themselves) followed by `data`.
fn write_pkt_line(buf: &mut Vec<u8>, data: &[u8]) {
    let prefix = format!("{:04x}", data.len() + 4).into_bytes();
    buf.reserve(prefix.len() + data.len());
    buf.extend(prefix);
    buf.extend_from_slice(data);
}
|
|
|
|
/// Appends a flush-pkt (the four ASCII bytes `0000`) to `buf`.
fn write_flush_pkt(buf: &mut Vec<u8>) {
    const FLUSH_PKT: &[u8; 4] = b"0000";
    buf.extend_from_slice(FLUSH_PKT);
}
|