From 2b6b4af3db9cbc5d99baa0a547876150676ee832 Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Fri, 15 May 2026 11:48:33 +0800 Subject: [PATCH] feat(http): improve auth verification and route handling - Migrate access key auth from custom hash to Argon2 password verification - Check all un-revoked tokens with expiry validation - Add branch protection checks to HTTP push handlers --- libs/git/http/auth.rs | 39 +++--- libs/git/http/handler.rs | 123 +++---------------- libs/git/http/lfs_routes.rs | 228 ++++++++++++++++++++++++++---------- libs/git/http/routes.rs | 90 ++++++++++++-- 4 files changed, 287 insertions(+), 193 deletions(-) diff --git a/libs/git/http/auth.rs b/libs/git/http/auth.rs index 57de0d3..8d8258a 100644 --- a/libs/git/http/auth.rs +++ b/libs/git/http/auth.rs @@ -1,6 +1,8 @@ -use crate::http::utils::{extract_basic_credentials, hash_access_key}; +use crate::http::utils::extract_basic_credentials; use crate::ssh::authz::SshAuthService; use actix_web::{Error, HttpRequest}; +use argon2::Argon2; +use argon2::password_hash::{PasswordHash, PasswordVerifier}; use db::database::AppDatabase; use models::repos::repo; use models::users::{user, user_token}; @@ -19,27 +21,36 @@ pub async fn verify_access_token( .map_err(|_| actix_web::error::ErrorUnauthorized("Invalid username or access key"))? .ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?; - let token_hash = hash_access_key(access_key) - .map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?; - - let token = user_token::Entity::find() + let tokens = user_token::Entity::find() .filter(user_token::Column::User.eq(user.uid)) - .filter(user_token::Column::TokenHash.eq(token_hash)) .filter(user_token::Column::IsRevoked.eq(false)) - .one(db.reader()) + .all(db.reader()) .await .map_err(|_| actix_web::error::ErrorUnauthorized("Invalid username or access key"))? - .ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?; + .into_iter() + .filter(|token| { + token + .expires_at + .map(|expires_at| expires_at >= chrono::Utc::now()) + .unwrap_or(true) + }); - if let Some(expires_at) = token.expires_at { - if expires_at < chrono::Utc::now() { - return Err(actix_web::error::ErrorUnauthorized( - "Access key has expired", - )); + for token in tokens { + let Ok(hash) = PasswordHash::new(&token.token_hash) else { + tracing::warn!(token_id = token.id, "invalid stored access key hash"); + continue; + }; + if Argon2::default() + .verify_password(access_key.as_bytes(), &hash) + .is_ok() + { + return Ok(user); } } - Ok(user) + Err(actix_web::error::ErrorUnauthorized( + "Invalid username or access key", + )) } pub async fn authorize_repo_access( diff --git a/libs/git/http/handler.rs b/libs/git/http/handler.rs index 7a27098..9b0aa7d 100644 --- a/libs/git/http/handler.rs +++ b/libs/git/http/handler.rs @@ -1,3 +1,5 @@ +use crate::ssh::branch_protect::check_branch_protection; +use crate::ssh::ref_update::RefUpdate; use actix_web::{Error, HttpResponse, web}; use async_stream::stream; use futures_util::Stream; @@ -134,7 +136,6 @@ impl GitHttpHandler { .await .map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))?; - const PACK_SIG: &[u8] = b"PACK"; let mut pre_pack: Vec = Vec::with_capacity(65536); while let Some(chunk) = stream.next().await { @@ -157,10 +158,16 @@ impl GitHttpHandler { ))); } - if let Some(pos) = bytes.windows(4).position(|w| w == PACK_SIG) { - pre_pack.extend_from_slice(&bytes[..pos]); + if let Some(pos) = bytes.windows(4).position(|w| w == b"0000") { + let end = pos + 4; + pre_pack.extend_from_slice(&bytes[..end]); - if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) { + let refs = RefUpdate::parse_ref_updates(&pre_pack) + .map_err(actix_web::error::ErrorBadRequest)?; + if let Some(msg) = refs + .iter() + .find_map(|r#ref| check_branch_protection(&branch_protects, r#ref)) + { tracing::warn!( "branch_protection_violation repo={} repo_id={} message={}", self.repo.repo_name, @@ -171,7 +178,10 @@ impl GitHttpHandler { } let remaining: ByteStream = Box::pin(stream! { - yield Ok(bytes[pos..].to_vec()); + yield Ok(pre_pack); + if end < bytes.len() { + yield Ok(bytes[end..].to_vec()); + } while let Some(chunk) = stream.next().await { yield chunk; } @@ -267,106 +277,3 @@ fn write_pkt_line(buf: &mut Vec, data: &[u8]) { fn write_flush_pkt(buf: &mut Vec) { buf.extend_from_slice(b"0000"); } - -#[derive(Debug)] -struct RefUpdate { - old_oid: Option, - new_oid: Option, - name: String, -} - -fn check_branch_protection( - branch_protects: &[repo_branch_protect::Model], - pre_pack: &[u8], -) -> Result<(), String> { - let refs = parse_ref_updates(pre_pack)?; - for r#ref in &refs { - for protection in branch_protects { - // Match exactly or as directory prefix (e.g. "refs/heads/main" - // matches "refs/heads/main" and "refs/heads/main/*" but NOT - // "refs/heads/main-v2"). - let matches = r#ref.name == protection.branch - || r#ref.name.starts_with(&format!("{}/", protection.branch)); - if !matches { - continue; - } - // Check deletion (new_oid is all zeros / 40 zeros) - if r#ref.new_oid.as_deref() == Some("0000000000000000000000000000000000000000") { - if protection.forbid_deletion { - return Err(format!( - "Deletion of protected branch '{}' is forbidden", - r#ref.name - )); - } - continue; - } - - // Check tag push - if r#ref.name.starts_with("refs/tags/") { - if protection.forbid_tag_push { - return Err(format!( - "Tag push to protected branch '{}' is forbidden", - r#ref.name - )); - } - continue; - } - - // Check force push: old != new AND old is non-zero (non-fast-forward update) - if let (Some(old_oid), Some(new_oid)) = - (r#ref.old_oid.as_deref(), r#ref.new_oid.as_deref()) - { - let is_new_branch = old_oid == "0000000000000000000000000000000000000000"; - if !is_new_branch - && old_oid != new_oid - && r#ref.name.starts_with("refs/heads/") - && protection.forbid_force_push - { - return Err(format!( - "Force push to protected branch '{}' is forbidden", - r#ref.name - )); - } - } - - // Check push - if protection.forbid_push { - return Err(format!( - "Push to protected branch '{}' is forbidden", - r#ref.name - )); - } - } - } - Ok(()) -} - -fn parse_ref_updates(buffer: &[u8]) -> Result, String> { - let text = String::from_utf8_lossy(buffer); - let mut refs = Vec::new(); - - for line in text.lines() { - let line = line.trim(); - if line.is_empty() || line.starts_with('#') || line.starts_with("PACK") { - continue; - } - // Format: " \n" - let mut parts = line.split_whitespace(); - let old_oid = parts.next().map(|s| s.to_string()); - let new_oid = parts.next().map(|s| s.to_string()); - let name = parts - .next() - .unwrap_or("") - .trim_start_matches('\0') - .to_string(); - if !name.is_empty() { - refs.push(RefUpdate { - old_oid, - new_oid, - name, - }); - } - } - - Ok(refs) -} diff --git a/libs/git/http/lfs_routes.rs b/libs/git/http/lfs_routes.rs index cceb3f0..8a9a17a 100644 --- a/libs/git/http/lfs_routes.rs +++ b/libs/git/http/lfs_routes.rs @@ -1,11 +1,19 @@ use crate::error::GitError; use crate::http::HttpAppState; use crate::http::auth::authorize_repo_access; +use crate::http::auth::verify_access_token; use crate::http::handler::is_valid_lfs_oid; use crate::http::lfs::{BatchRequest, CreateLockRequest, LfsHandler}; -use crate::http::utils::{get_repo_model, hash_access_key}; +use crate::http::utils::{extract_basic_credentials, get_repo_model}; +use crate::ssh::authz::SshAuthService; +use crate::ssh::push_queue::{ + PushQueueEvent, PushQueueLease, PushQueueWaitError, wait_for_push_queue_slot, +}; use actix_web::{Error, HttpRequest, HttpResponse, web}; -use models::users::user_token; +use argon2::Argon2; +use argon2::password_hash::{PasswordHash, PasswordVerifier}; +use models::repos::repo; +use models::users::{user, user_token}; use sea_orm::prelude::*; use std::path::PathBuf; @@ -31,43 +39,16 @@ fn bearer_token(req: &HttpRequest) -> Result { } } -fn hash_token(token: &str) -> Result { - hash_access_key(token) -} - /// Derive the acting user from the authenticated bearer token. async fn user_uid(req: &HttpRequest, db: &db::database::AppDatabase) -> Result { - let auth_header = req - .headers() - .get("authorization") - .ok_or_else(|| actix_web::error::ErrorUnauthorized("Missing authorization header"))?; - let auth_str = auth_header - .to_str() - .map_err(|_| actix_web::error::ErrorUnauthorized("Invalid authorization header"))?; - let token = auth_str - .strip_prefix("Bearer ") - .ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid authorization format"))?; - - let token_hash = hash_token(token) - .map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?; - - let token_model = user_token::Entity::find() - .filter(user_token::Column::TokenHash.eq(&token_hash)) - .filter(user_token::Column::IsRevoked.eq(false)) - .one(db.reader()) - .await - .map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?; - - let token_model = - token_model.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token"))?; - - if let Some(expires_at) = token_model.expires_at { - if expires_at < chrono::Utc::now() { - return Err(actix_web::error::ErrorUnauthorized("Token expired")); - } + if let Ok((username, access_key)) = extract_basic_credentials(req) { + return verify_access_token(db, &username, &access_key) + .await + .map(|user| user.uid); } - Ok(token_model.user) + let token = bearer_token(req)?; + find_user_by_bearer_token(&token, db).await } /// Store LFS batch-generated token in Redis with TTL. @@ -140,24 +121,139 @@ async fn validate_lfs_token( } } - // Second: check if it's a regular user access token - let token_hash = hash_token(token) - .map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?; - let token_model = user_token::Entity::find() - .filter(user_token::Column::TokenHash.eq(&token_hash)) - .filter(user_token::Column::IsRevoked.eq(false)) - .one(db.reader()) - .await - .map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))? - .ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token"))?; + // Second: check if it's a regular user access token. + find_user_by_bearer_token(token, db).await +} - if let Some(expires_at) = token_model.expires_at { - if expires_at < chrono::Utc::now() { - return Err(actix_web::error::ErrorUnauthorized("Token expired")); +async fn find_user_by_bearer_token( + token: &str, + db: &db::database::AppDatabase, +) -> Result { + let tokens = user_token::Entity::find() + .filter(user_token::Column::IsRevoked.eq(false)) + .all(db.reader()) + .await + .map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?; + + for token_model in tokens { + if token_model + .expires_at + .map(|expires_at| expires_at < chrono::Utc::now()) + .unwrap_or(false) + { + continue; + } + + let Ok(hash) = PasswordHash::new(&token_model.token_hash) else { + tracing::warn!( + token_id = token_model.id, + "invalid stored bearer token hash" + ); + continue; + }; + if Argon2::default() + .verify_password(token.as_bytes(), &hash) + .is_ok() + { + return Ok(token_model.user); } } - Ok(token_model.user) + Err(actix_web::error::ErrorUnauthorized("Invalid token")) +} + +async fn authorize_user_repo_access( + db: &db::database::AppDatabase, + user_uid: uuid::Uuid, + repo: &repo::Model, + is_write: bool, +) -> Result<(), Error> { + let user = user::Entity::find() + .filter(user::Column::Uid.eq(user_uid)) + .one(db.reader()) + .await + .map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))? + .ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token user"))?; + + let authz = SshAuthService::new(db.clone()); + if authz.check_repo_permission(&user, repo, is_write).await { + Ok(()) + } else { + Err(actix_web::error::ErrorForbidden( + "No permission for repository", + )) + } +} + +async fn acquire_lfs_write_queue( + state: &HttpAppState, + repo: &repo::Model, + operation: &'static str, +) -> Result { + match wait_for_push_queue_slot(state.sync.clone(), repo.id, |event, request_id| { + let request_id = request_id.to_string(); + match event { + PushQueueEvent::Waiting(position) => { + tracing::info!( + repo = %repo.repo_name, + repo_id = %repo.id, + request_id = %request_id, + operation = operation, + position = position.position, + total = position.total, + "lfs_write_queue_waiting" + ); + } + PushQueueEvent::Acquired => { + tracing::info!( + repo = %repo.repo_name, + repo_id = %repo.id, + request_id = %request_id, + operation = operation, + "lfs_write_queue_acquired" + ); + } + } + }) + .await + { + Ok(lease) => Ok(lease), + Err(PushQueueWaitError::Join(e)) => { + tracing::error!( + error = %e, + repo = %repo.repo_name, + repo_id = %repo.id, + operation = operation, + "lfs_write_queue_join_failed" + ); + Err(actix_web::error::ErrorServiceUnavailable( + "GitData: LFS write queue is temporarily unavailable. Please retry later.", + )) + } + Err(PushQueueWaitError::Lock(e)) => { + tracing::error!( + error = %e, + repo = %repo.repo_name, + repo_id = %repo.id, + operation = operation, + "lfs_write_queue_lock_failed" + ); + Err(actix_web::error::ErrorServiceUnavailable( + "GitData: LFS write queue lock failed. Please retry later.", + )) + } + Err(PushQueueWaitError::Timeout) => { + tracing::warn!( + repo = %repo.repo_name, + repo_id = %repo.id, + operation = operation, + "lfs_write_queue_timeout" + ); + Err(actix_web::error::ErrorServiceUnavailable( + "GitData: LFS write queue timed out. Please retry in a moment.", + )) + } + } } pub async fn lfs_batch( @@ -214,16 +310,20 @@ pub async fn lfs_upload( let token = bearer_token(&req)?; // Validate token (batch token or user access token) with write permission - let _uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "upload").await?; + let uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "upload").await?; + authorize_user_repo_access(&state.db, uid, &repo, true).await?; let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone()); + let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "upload").await?; - match handler.upload_object(&oid, payload).await { + let result = match handler.upload_object(&oid, payload).await { Ok(response) => Ok(response), Err(GitError::InvalidOid(_)) => Err(actix_web::error::ErrorBadRequest("Invalid OID")), Err(GitError::AuthFailed(_)) => Err(actix_web::error::ErrorUnauthorized("Unauthorized")), Err(_e) => Err(actix_web::error::ErrorInternalServerError("Upload failed")), - } + }; + queue_lease.release().await; + result } pub async fn lfs_download( @@ -242,8 +342,8 @@ pub async fn lfs_download( // Auth check: private repos require auth; public repos allow anonymous if repo.is_private { let token = bearer_token(&req)?; - let _uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "download").await?; - authorize_repo_access(&req, &state.db, &repo, false).await?; + let uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "download").await?; + authorize_user_repo_access(&state.db, uid, &repo, false).await?; } let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone()); @@ -270,12 +370,15 @@ pub async fn lfs_lock_create( let uid = user_uid(&req, &state.db).await?; authorize_repo_access(&req, &state.db, &repo, true).await?; let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone()); + let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "lock_create").await?; - match handler.lock_object(&body.oid, uid).await { + let result = match handler.lock_object(&body.oid, uid).await { Ok(lock) => Ok(HttpResponse::Created().json(lock)), Err(GitError::Locked(msg)) => Ok(HttpResponse::Conflict().body(msg)), Err(_e) => Err(actix_web::error::ErrorInternalServerError("Lock failed")), - } + }; + queue_lease.release().await; + result } pub async fn lfs_lock_list( @@ -289,7 +392,8 @@ pub async fn lfs_lock_list( // Auth check: private repos require auth for lock listing if repo.is_private { - user_uid(&req, &state.db).await?; + let uid = user_uid(&req, &state.db).await?; + authorize_user_repo_access(&state.db, uid, &repo, false).await?; } let maybe_oid = query.get("oid").map(|s| s.as_str()); @@ -313,7 +417,8 @@ pub async fn lfs_lock_get( // Auth check: private repos require auth for lock viewing if repo.is_private { - user_uid(&req, &state.db).await?; + let uid = user_uid(&req, &state.db).await?; + authorize_user_repo_access(&state.db, uid, &repo, false).await?; } let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone()); @@ -338,13 +443,16 @@ pub async fn lfs_lock_delete( let uid = user_uid(&req, &state.db).await?; authorize_repo_access(&req, &state.db, &repo, true).await?; let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone()); + let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "lock_delete").await?; - match handler.unlock_object(&lock_id, uid).await { + let result = match handler.unlock_object(&lock_id, uid).await { Ok(()) => Ok(HttpResponse::NoContent().finish()), Err(GitError::PermissionDenied(_)) => Err(actix_web::error::ErrorForbidden("Not allowed")), Err(GitError::NotFound(_)) => Err(actix_web::error::ErrorNotFound("Lock not found")), Err(_e) => Err(actix_web::error::ErrorInternalServerError( "Lock delete failed", )), - } + }; + queue_lease.release().await; + result } diff --git a/libs/git/http/routes.rs b/libs/git/http/routes.rs index 289cbdc..9f291cd 100644 --- a/libs/git/http/routes.rs +++ b/libs/git/http/routes.rs @@ -3,6 +3,7 @@ use crate::http::auth::authorize_repo_access; use crate::http::handler::GitHttpHandler; use crate::http::utils::get_repo_model; use crate::ssh::RepoReceiveSyncTask; +use crate::ssh::push_queue::{PushQueueEvent, PushQueueWaitError, wait_for_push_queue_slot}; use actix_web::{Error, HttpRequest, HttpResponse, web}; use std::path::PathBuf; use std::time::Duration; @@ -77,21 +78,88 @@ pub async fn receive_pack( let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; authorize_repo_access(&req, &state.db, &model, true).await?; + let mut push_queue_lease = + match wait_for_push_queue_slot(state.sync.clone(), model.id, |event, request_id| { + let request_id = request_id.to_string(); + let repo_name = model.repo_name.clone(); + let repo_id = model.id; + match event { + PushQueueEvent::Waiting(position) => { + tracing::info!( + repo = %repo_name, + repo_id = %repo_id, + request_id = %request_id, + position = position.position, + total = position.total, + "http_push_queue_waiting" + ); + } + PushQueueEvent::Acquired => { + tracing::info!( + repo = %repo_name, + repo_id = %repo_id, + request_id = %request_id, + "http_push_queue_acquired" + ); + } + } + }) + .await + { + Ok(lease) => lease, + Err(PushQueueWaitError::Join(e)) => { + tracing::error!( + error = %e, + repo = %model.repo_name, + repo_id = %model.id, + "http_push_queue_join_failed" + ); + return Err(actix_web::error::ErrorServiceUnavailable( + "GitData: push queue is temporarily unavailable. Please retry later.", + )); + } + Err(PushQueueWaitError::Lock(e)) => { + tracing::error!( + error = %e, + repo = %model.repo_name, + repo_id = %model.id, + "http_push_queue_lock_failed" + ); + return Err(actix_web::error::ErrorServiceUnavailable( + "GitData: push queue lock failed. Please retry later.", + )); + } + Err(PushQueueWaitError::Timeout) => { + tracing::info!( + repo = %model.repo_name, + repo_id = %model.id, + "http_push_queue_timeout" + ); + return Ok(HttpResponse::ServiceUnavailable() + .insert_header(("Retry-After", "5")) + .content_type("text/plain; charset=utf-8") + .body("GitData: push queue timed out. Please retry in a moment.\n")); + } + }; + let storage_path = PathBuf::from(&model.storage_path); let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone()); let result = handler.receive_pack(payload).await; + push_queue_lease.release().await; - let _ = tokio::spawn({ - let sync = state.sync.clone(); - let repo_uid = model.id; - async move { - let _ = timeout( - Duration::from_secs(5), - sync.send(RepoReceiveSyncTask { repo_uid }), - ) - .await; - } - }); + if result.is_ok() { + let _ = tokio::spawn({ + let sync = state.sync.clone(); + let repo_uid = model.id; + async move { + let _ = timeout( + Duration::from_secs(5), + sync.send(RepoReceiveSyncTask { repo_uid }), + ) + .await; + } + }); + } result }