feat(http): improve auth verification and route handling
- Migrate access key auth from custom hash to Argon2 password verification - Check all un-revoked tokens with expiry validation - Add branch protection checks to HTTP push handlers
This commit is contained in:
parent
894c3873a4
commit
2b6b4af3db
@ -1,6 +1,8 @@
|
||||
use crate::http::utils::{extract_basic_credentials, hash_access_key};
|
||||
use crate::http::utils::extract_basic_credentials;
|
||||
use crate::ssh::authz::SshAuthService;
|
||||
use actix_web::{Error, HttpRequest};
|
||||
use argon2::Argon2;
|
||||
use argon2::password_hash::{PasswordHash, PasswordVerifier};
|
||||
use db::database::AppDatabase;
|
||||
use models::repos::repo;
|
||||
use models::users::{user, user_token};
|
||||
@ -19,27 +21,36 @@ pub async fn verify_access_token(
|
||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?
|
||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?;
|
||||
|
||||
let token_hash = hash_access_key(access_key)
|
||||
.map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?;
|
||||
|
||||
let token = user_token::Entity::find()
|
||||
let tokens = user_token::Entity::find()
|
||||
.filter(user_token::Column::User.eq(user.uid))
|
||||
.filter(user_token::Column::TokenHash.eq(token_hash))
|
||||
.filter(user_token::Column::IsRevoked.eq(false))
|
||||
.one(db.reader())
|
||||
.all(db.reader())
|
||||
.await
|
||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?
|
||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?;
|
||||
.into_iter()
|
||||
.filter(|token| {
|
||||
token
|
||||
.expires_at
|
||||
.map(|expires_at| expires_at >= chrono::Utc::now())
|
||||
.unwrap_or(true)
|
||||
});
|
||||
|
||||
if let Some(expires_at) = token.expires_at {
|
||||
if expires_at < chrono::Utc::now() {
|
||||
return Err(actix_web::error::ErrorUnauthorized(
|
||||
"Access key has expired",
|
||||
));
|
||||
for token in tokens {
|
||||
let Ok(hash) = PasswordHash::new(&token.token_hash) else {
|
||||
tracing::warn!(token_id = token.id, "invalid stored access key hash");
|
||||
continue;
|
||||
};
|
||||
if Argon2::default()
|
||||
.verify_password(access_key.as_bytes(), &hash)
|
||||
.is_ok()
|
||||
{
|
||||
return Ok(user);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(user)
|
||||
Err(actix_web::error::ErrorUnauthorized(
|
||||
"Invalid username or access key",
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn authorize_repo_access(
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
use crate::ssh::branch_protect::check_branch_protection;
|
||||
use crate::ssh::ref_update::RefUpdate;
|
||||
use actix_web::{Error, HttpResponse, web};
|
||||
use async_stream::stream;
|
||||
use futures_util::Stream;
|
||||
@ -134,7 +136,6 @@ impl GitHttpHandler {
|
||||
.await
|
||||
.map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))?;
|
||||
|
||||
const PACK_SIG: &[u8] = b"PACK";
|
||||
let mut pre_pack: Vec<u8> = Vec::with_capacity(65536);
|
||||
|
||||
while let Some(chunk) = stream.next().await {
|
||||
@ -157,10 +158,16 @@ impl GitHttpHandler {
|
||||
)));
|
||||
}
|
||||
|
||||
if let Some(pos) = bytes.windows(4).position(|w| w == PACK_SIG) {
|
||||
pre_pack.extend_from_slice(&bytes[..pos]);
|
||||
if let Some(pos) = bytes.windows(4).position(|w| w == b"0000") {
|
||||
let end = pos + 4;
|
||||
pre_pack.extend_from_slice(&bytes[..end]);
|
||||
|
||||
if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) {
|
||||
let refs = RefUpdate::parse_ref_updates(&pre_pack)
|
||||
.map_err(actix_web::error::ErrorBadRequest)?;
|
||||
if let Some(msg) = refs
|
||||
.iter()
|
||||
.find_map(|r#ref| check_branch_protection(&branch_protects, r#ref))
|
||||
{
|
||||
tracing::warn!(
|
||||
"branch_protection_violation repo={} repo_id={} message={}",
|
||||
self.repo.repo_name,
|
||||
@ -171,7 +178,10 @@ impl GitHttpHandler {
|
||||
}
|
||||
|
||||
let remaining: ByteStream = Box::pin(stream! {
|
||||
yield Ok(bytes[pos..].to_vec());
|
||||
yield Ok(pre_pack);
|
||||
if end < bytes.len() {
|
||||
yield Ok(bytes[end..].to_vec());
|
||||
}
|
||||
while let Some(chunk) = stream.next().await {
|
||||
yield chunk;
|
||||
}
|
||||
@ -267,106 +277,3 @@ fn write_pkt_line(buf: &mut Vec<u8>, data: &[u8]) {
|
||||
fn write_flush_pkt(buf: &mut Vec<u8>) {
|
||||
buf.extend_from_slice(b"0000");
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RefUpdate {
|
||||
old_oid: Option<String>,
|
||||
new_oid: Option<String>,
|
||||
name: String,
|
||||
}
|
||||
|
||||
fn check_branch_protection(
|
||||
branch_protects: &[repo_branch_protect::Model],
|
||||
pre_pack: &[u8],
|
||||
) -> Result<(), String> {
|
||||
let refs = parse_ref_updates(pre_pack)?;
|
||||
for r#ref in &refs {
|
||||
for protection in branch_protects {
|
||||
// Match exactly or as directory prefix (e.g. "refs/heads/main"
|
||||
// matches "refs/heads/main" and "refs/heads/main/*" but NOT
|
||||
// "refs/heads/main-v2").
|
||||
let matches = r#ref.name == protection.branch
|
||||
|| r#ref.name.starts_with(&format!("{}/", protection.branch));
|
||||
if !matches {
|
||||
continue;
|
||||
}
|
||||
// Check deletion (new_oid is all zeros / 40 zeros)
|
||||
if r#ref.new_oid.as_deref() == Some("0000000000000000000000000000000000000000") {
|
||||
if protection.forbid_deletion {
|
||||
return Err(format!(
|
||||
"Deletion of protected branch '{}' is forbidden",
|
||||
r#ref.name
|
||||
));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check tag push
|
||||
if r#ref.name.starts_with("refs/tags/") {
|
||||
if protection.forbid_tag_push {
|
||||
return Err(format!(
|
||||
"Tag push to protected branch '{}' is forbidden",
|
||||
r#ref.name
|
||||
));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check force push: old != new AND old is non-zero (non-fast-forward update)
|
||||
if let (Some(old_oid), Some(new_oid)) =
|
||||
(r#ref.old_oid.as_deref(), r#ref.new_oid.as_deref())
|
||||
{
|
||||
let is_new_branch = old_oid == "0000000000000000000000000000000000000000";
|
||||
if !is_new_branch
|
||||
&& old_oid != new_oid
|
||||
&& r#ref.name.starts_with("refs/heads/")
|
||||
&& protection.forbid_force_push
|
||||
{
|
||||
return Err(format!(
|
||||
"Force push to protected branch '{}' is forbidden",
|
||||
r#ref.name
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Check push
|
||||
if protection.forbid_push {
|
||||
return Err(format!(
|
||||
"Push to protected branch '{}' is forbidden",
|
||||
r#ref.name
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn parse_ref_updates(buffer: &[u8]) -> Result<Vec<RefUpdate>, String> {
|
||||
let text = String::from_utf8_lossy(buffer);
|
||||
let mut refs = Vec::new();
|
||||
|
||||
for line in text.lines() {
|
||||
let line = line.trim();
|
||||
if line.is_empty() || line.starts_with('#') || line.starts_with("PACK") {
|
||||
continue;
|
||||
}
|
||||
// Format: "<old-oid> <new-oid> <ref-name>\n"
|
||||
let mut parts = line.split_whitespace();
|
||||
let old_oid = parts.next().map(|s| s.to_string());
|
||||
let new_oid = parts.next().map(|s| s.to_string());
|
||||
let name = parts
|
||||
.next()
|
||||
.unwrap_or("")
|
||||
.trim_start_matches('\0')
|
||||
.to_string();
|
||||
if !name.is_empty() {
|
||||
refs.push(RefUpdate {
|
||||
old_oid,
|
||||
new_oid,
|
||||
name,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(refs)
|
||||
}
|
||||
|
||||
@ -1,11 +1,19 @@
|
||||
use crate::error::GitError;
|
||||
use crate::http::HttpAppState;
|
||||
use crate::http::auth::authorize_repo_access;
|
||||
use crate::http::auth::verify_access_token;
|
||||
use crate::http::handler::is_valid_lfs_oid;
|
||||
use crate::http::lfs::{BatchRequest, CreateLockRequest, LfsHandler};
|
||||
use crate::http::utils::{get_repo_model, hash_access_key};
|
||||
use crate::http::utils::{extract_basic_credentials, get_repo_model};
|
||||
use crate::ssh::authz::SshAuthService;
|
||||
use crate::ssh::push_queue::{
|
||||
PushQueueEvent, PushQueueLease, PushQueueWaitError, wait_for_push_queue_slot,
|
||||
};
|
||||
use actix_web::{Error, HttpRequest, HttpResponse, web};
|
||||
use models::users::user_token;
|
||||
use argon2::Argon2;
|
||||
use argon2::password_hash::{PasswordHash, PasswordVerifier};
|
||||
use models::repos::repo;
|
||||
use models::users::{user, user_token};
|
||||
use sea_orm::prelude::*;
|
||||
use std::path::PathBuf;
|
||||
|
||||
@ -31,43 +39,16 @@ fn bearer_token(req: &HttpRequest) -> Result<String, Error> {
|
||||
}
|
||||
}
|
||||
|
||||
fn hash_token(token: &str) -> Result<String, argon2::password_hash::Error> {
|
||||
hash_access_key(token)
|
||||
}
|
||||
|
||||
/// Derive the acting user from the authenticated bearer token.
|
||||
async fn user_uid(req: &HttpRequest, db: &db::database::AppDatabase) -> Result<uuid::Uuid, Error> {
|
||||
let auth_header = req
|
||||
.headers()
|
||||
.get("authorization")
|
||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Missing authorization header"))?;
|
||||
let auth_str = auth_header
|
||||
.to_str()
|
||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Invalid authorization header"))?;
|
||||
let token = auth_str
|
||||
.strip_prefix("Bearer ")
|
||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid authorization format"))?;
|
||||
|
||||
let token_hash = hash_token(token)
|
||||
.map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?;
|
||||
|
||||
let token_model = user_token::Entity::find()
|
||||
.filter(user_token::Column::TokenHash.eq(&token_hash))
|
||||
.filter(user_token::Column::IsRevoked.eq(false))
|
||||
.one(db.reader())
|
||||
.await
|
||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?;
|
||||
|
||||
let token_model =
|
||||
token_model.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token"))?;
|
||||
|
||||
if let Some(expires_at) = token_model.expires_at {
|
||||
if expires_at < chrono::Utc::now() {
|
||||
return Err(actix_web::error::ErrorUnauthorized("Token expired"));
|
||||
}
|
||||
if let Ok((username, access_key)) = extract_basic_credentials(req) {
|
||||
return verify_access_token(db, &username, &access_key)
|
||||
.await
|
||||
.map(|user| user.uid);
|
||||
}
|
||||
|
||||
Ok(token_model.user)
|
||||
let token = bearer_token(req)?;
|
||||
find_user_by_bearer_token(&token, db).await
|
||||
}
|
||||
|
||||
/// Store LFS batch-generated token in Redis with TTL.
|
||||
@ -140,24 +121,139 @@ async fn validate_lfs_token(
|
||||
}
|
||||
}
|
||||
|
||||
// Second: check if it's a regular user access token
|
||||
let token_hash = hash_token(token)
|
||||
.map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?;
|
||||
let token_model = user_token::Entity::find()
|
||||
.filter(user_token::Column::TokenHash.eq(&token_hash))
|
||||
.filter(user_token::Column::IsRevoked.eq(false))
|
||||
.one(db.reader())
|
||||
.await
|
||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?
|
||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token"))?;
|
||||
// Second: check if it's a regular user access token.
|
||||
find_user_by_bearer_token(token, db).await
|
||||
}
|
||||
|
||||
if let Some(expires_at) = token_model.expires_at {
|
||||
if expires_at < chrono::Utc::now() {
|
||||
return Err(actix_web::error::ErrorUnauthorized("Token expired"));
|
||||
async fn find_user_by_bearer_token(
|
||||
token: &str,
|
||||
db: &db::database::AppDatabase,
|
||||
) -> Result<uuid::Uuid, Error> {
|
||||
let tokens = user_token::Entity::find()
|
||||
.filter(user_token::Column::IsRevoked.eq(false))
|
||||
.all(db.reader())
|
||||
.await
|
||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?;
|
||||
|
||||
for token_model in tokens {
|
||||
if token_model
|
||||
.expires_at
|
||||
.map(|expires_at| expires_at < chrono::Utc::now())
|
||||
.unwrap_or(false)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let Ok(hash) = PasswordHash::new(&token_model.token_hash) else {
|
||||
tracing::warn!(
|
||||
token_id = token_model.id,
|
||||
"invalid stored bearer token hash"
|
||||
);
|
||||
continue;
|
||||
};
|
||||
if Argon2::default()
|
||||
.verify_password(token.as_bytes(), &hash)
|
||||
.is_ok()
|
||||
{
|
||||
return Ok(token_model.user);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(token_model.user)
|
||||
Err(actix_web::error::ErrorUnauthorized("Invalid token"))
|
||||
}
|
||||
|
||||
async fn authorize_user_repo_access(
|
||||
db: &db::database::AppDatabase,
|
||||
user_uid: uuid::Uuid,
|
||||
repo: &repo::Model,
|
||||
is_write: bool,
|
||||
) -> Result<(), Error> {
|
||||
let user = user::Entity::find()
|
||||
.filter(user::Column::Uid.eq(user_uid))
|
||||
.one(db.reader())
|
||||
.await
|
||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?
|
||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token user"))?;
|
||||
|
||||
let authz = SshAuthService::new(db.clone());
|
||||
if authz.check_repo_permission(&user, repo, is_write).await {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(actix_web::error::ErrorForbidden(
|
||||
"No permission for repository",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
async fn acquire_lfs_write_queue(
|
||||
state: &HttpAppState,
|
||||
repo: &repo::Model,
|
||||
operation: &'static str,
|
||||
) -> Result<PushQueueLease, Error> {
|
||||
match wait_for_push_queue_slot(state.sync.clone(), repo.id, |event, request_id| {
|
||||
let request_id = request_id.to_string();
|
||||
match event {
|
||||
PushQueueEvent::Waiting(position) => {
|
||||
tracing::info!(
|
||||
repo = %repo.repo_name,
|
||||
repo_id = %repo.id,
|
||||
request_id = %request_id,
|
||||
operation = operation,
|
||||
position = position.position,
|
||||
total = position.total,
|
||||
"lfs_write_queue_waiting"
|
||||
);
|
||||
}
|
||||
PushQueueEvent::Acquired => {
|
||||
tracing::info!(
|
||||
repo = %repo.repo_name,
|
||||
repo_id = %repo.id,
|
||||
request_id = %request_id,
|
||||
operation = operation,
|
||||
"lfs_write_queue_acquired"
|
||||
);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(lease) => Ok(lease),
|
||||
Err(PushQueueWaitError::Join(e)) => {
|
||||
tracing::error!(
|
||||
error = %e,
|
||||
repo = %repo.repo_name,
|
||||
repo_id = %repo.id,
|
||||
operation = operation,
|
||||
"lfs_write_queue_join_failed"
|
||||
);
|
||||
Err(actix_web::error::ErrorServiceUnavailable(
|
||||
"GitData: LFS write queue is temporarily unavailable. Please retry later.",
|
||||
))
|
||||
}
|
||||
Err(PushQueueWaitError::Lock(e)) => {
|
||||
tracing::error!(
|
||||
error = %e,
|
||||
repo = %repo.repo_name,
|
||||
repo_id = %repo.id,
|
||||
operation = operation,
|
||||
"lfs_write_queue_lock_failed"
|
||||
);
|
||||
Err(actix_web::error::ErrorServiceUnavailable(
|
||||
"GitData: LFS write queue lock failed. Please retry later.",
|
||||
))
|
||||
}
|
||||
Err(PushQueueWaitError::Timeout) => {
|
||||
tracing::warn!(
|
||||
repo = %repo.repo_name,
|
||||
repo_id = %repo.id,
|
||||
operation = operation,
|
||||
"lfs_write_queue_timeout"
|
||||
);
|
||||
Err(actix_web::error::ErrorServiceUnavailable(
|
||||
"GitData: LFS write queue timed out. Please retry in a moment.",
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn lfs_batch(
|
||||
@ -214,16 +310,20 @@ pub async fn lfs_upload(
|
||||
let token = bearer_token(&req)?;
|
||||
|
||||
// Validate token (batch token or user access token) with write permission
|
||||
let _uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "upload").await?;
|
||||
let uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "upload").await?;
|
||||
authorize_user_repo_access(&state.db, uid, &repo, true).await?;
|
||||
|
||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||
let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "upload").await?;
|
||||
|
||||
match handler.upload_object(&oid, payload).await {
|
||||
let result = match handler.upload_object(&oid, payload).await {
|
||||
Ok(response) => Ok(response),
|
||||
Err(GitError::InvalidOid(_)) => Err(actix_web::error::ErrorBadRequest("Invalid OID")),
|
||||
Err(GitError::AuthFailed(_)) => Err(actix_web::error::ErrorUnauthorized("Unauthorized")),
|
||||
Err(_e) => Err(actix_web::error::ErrorInternalServerError("Upload failed")),
|
||||
}
|
||||
};
|
||||
queue_lease.release().await;
|
||||
result
|
||||
}
|
||||
|
||||
pub async fn lfs_download(
|
||||
@ -242,8 +342,8 @@ pub async fn lfs_download(
|
||||
// Auth check: private repos require auth; public repos allow anonymous
|
||||
if repo.is_private {
|
||||
let token = bearer_token(&req)?;
|
||||
let _uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "download").await?;
|
||||
authorize_repo_access(&req, &state.db, &repo, false).await?;
|
||||
let uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "download").await?;
|
||||
authorize_user_repo_access(&state.db, uid, &repo, false).await?;
|
||||
}
|
||||
|
||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||
@ -270,12 +370,15 @@ pub async fn lfs_lock_create(
|
||||
let uid = user_uid(&req, &state.db).await?;
|
||||
authorize_repo_access(&req, &state.db, &repo, true).await?;
|
||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||
let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "lock_create").await?;
|
||||
|
||||
match handler.lock_object(&body.oid, uid).await {
|
||||
let result = match handler.lock_object(&body.oid, uid).await {
|
||||
Ok(lock) => Ok(HttpResponse::Created().json(lock)),
|
||||
Err(GitError::Locked(msg)) => Ok(HttpResponse::Conflict().body(msg)),
|
||||
Err(_e) => Err(actix_web::error::ErrorInternalServerError("Lock failed")),
|
||||
}
|
||||
};
|
||||
queue_lease.release().await;
|
||||
result
|
||||
}
|
||||
|
||||
pub async fn lfs_lock_list(
|
||||
@ -289,7 +392,8 @@ pub async fn lfs_lock_list(
|
||||
|
||||
// Auth check: private repos require auth for lock listing
|
||||
if repo.is_private {
|
||||
user_uid(&req, &state.db).await?;
|
||||
let uid = user_uid(&req, &state.db).await?;
|
||||
authorize_user_repo_access(&state.db, uid, &repo, false).await?;
|
||||
}
|
||||
|
||||
let maybe_oid = query.get("oid").map(|s| s.as_str());
|
||||
@ -313,7 +417,8 @@ pub async fn lfs_lock_get(
|
||||
|
||||
// Auth check: private repos require auth for lock viewing
|
||||
if repo.is_private {
|
||||
user_uid(&req, &state.db).await?;
|
||||
let uid = user_uid(&req, &state.db).await?;
|
||||
authorize_user_repo_access(&state.db, uid, &repo, false).await?;
|
||||
}
|
||||
|
||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||
@ -338,13 +443,16 @@ pub async fn lfs_lock_delete(
|
||||
let uid = user_uid(&req, &state.db).await?;
|
||||
authorize_repo_access(&req, &state.db, &repo, true).await?;
|
||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||
let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "lock_delete").await?;
|
||||
|
||||
match handler.unlock_object(&lock_id, uid).await {
|
||||
let result = match handler.unlock_object(&lock_id, uid).await {
|
||||
Ok(()) => Ok(HttpResponse::NoContent().finish()),
|
||||
Err(GitError::PermissionDenied(_)) => Err(actix_web::error::ErrorForbidden("Not allowed")),
|
||||
Err(GitError::NotFound(_)) => Err(actix_web::error::ErrorNotFound("Lock not found")),
|
||||
Err(_e) => Err(actix_web::error::ErrorInternalServerError(
|
||||
"Lock delete failed",
|
||||
)),
|
||||
}
|
||||
};
|
||||
queue_lease.release().await;
|
||||
result
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ use crate::http::auth::authorize_repo_access;
|
||||
use crate::http::handler::GitHttpHandler;
|
||||
use crate::http::utils::get_repo_model;
|
||||
use crate::ssh::RepoReceiveSyncTask;
|
||||
use crate::ssh::push_queue::{PushQueueEvent, PushQueueWaitError, wait_for_push_queue_slot};
|
||||
use actix_web::{Error, HttpRequest, HttpResponse, web};
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
@ -77,21 +78,88 @@ pub async fn receive_pack(
|
||||
let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?;
|
||||
authorize_repo_access(&req, &state.db, &model, true).await?;
|
||||
|
||||
let mut push_queue_lease =
|
||||
match wait_for_push_queue_slot(state.sync.clone(), model.id, |event, request_id| {
|
||||
let request_id = request_id.to_string();
|
||||
let repo_name = model.repo_name.clone();
|
||||
let repo_id = model.id;
|
||||
match event {
|
||||
PushQueueEvent::Waiting(position) => {
|
||||
tracing::info!(
|
||||
repo = %repo_name,
|
||||
repo_id = %repo_id,
|
||||
request_id = %request_id,
|
||||
position = position.position,
|
||||
total = position.total,
|
||||
"http_push_queue_waiting"
|
||||
);
|
||||
}
|
||||
PushQueueEvent::Acquired => {
|
||||
tracing::info!(
|
||||
repo = %repo_name,
|
||||
repo_id = %repo_id,
|
||||
request_id = %request_id,
|
||||
"http_push_queue_acquired"
|
||||
);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(lease) => lease,
|
||||
Err(PushQueueWaitError::Join(e)) => {
|
||||
tracing::error!(
|
||||
error = %e,
|
||||
repo = %model.repo_name,
|
||||
repo_id = %model.id,
|
||||
"http_push_queue_join_failed"
|
||||
);
|
||||
return Err(actix_web::error::ErrorServiceUnavailable(
|
||||
"GitData: push queue is temporarily unavailable. Please retry later.",
|
||||
));
|
||||
}
|
||||
Err(PushQueueWaitError::Lock(e)) => {
|
||||
tracing::error!(
|
||||
error = %e,
|
||||
repo = %model.repo_name,
|
||||
repo_id = %model.id,
|
||||
"http_push_queue_lock_failed"
|
||||
);
|
||||
return Err(actix_web::error::ErrorServiceUnavailable(
|
||||
"GitData: push queue lock failed. Please retry later.",
|
||||
));
|
||||
}
|
||||
Err(PushQueueWaitError::Timeout) => {
|
||||
tracing::info!(
|
||||
repo = %model.repo_name,
|
||||
repo_id = %model.id,
|
||||
"http_push_queue_timeout"
|
||||
);
|
||||
return Ok(HttpResponse::ServiceUnavailable()
|
||||
.insert_header(("Retry-After", "5"))
|
||||
.content_type("text/plain; charset=utf-8")
|
||||
.body("GitData: push queue timed out. Please retry in a moment.\n"));
|
||||
}
|
||||
};
|
||||
|
||||
let storage_path = PathBuf::from(&model.storage_path);
|
||||
let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone());
|
||||
let result = handler.receive_pack(payload).await;
|
||||
push_queue_lease.release().await;
|
||||
|
||||
let _ = tokio::spawn({
|
||||
let sync = state.sync.clone();
|
||||
let repo_uid = model.id;
|
||||
async move {
|
||||
let _ = timeout(
|
||||
Duration::from_secs(5),
|
||||
sync.send(RepoReceiveSyncTask { repo_uid }),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
if result.is_ok() {
|
||||
let _ = tokio::spawn({
|
||||
let sync = state.sync.clone();
|
||||
let repo_uid = model.id;
|
||||
async move {
|
||||
let _ = timeout(
|
||||
Duration::from_secs(5),
|
||||
sync.send(RepoReceiveSyncTask { repo_uid }),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user