Compare commits
3 Commits
894c3873a4
...
b35d2d4fe7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b35d2d4fe7 | ||
|
|
0703816482 | ||
|
|
2b6b4af3db |
@ -1,6 +1,8 @@
|
|||||||
use crate::http::utils::{extract_basic_credentials, hash_access_key};
|
use crate::http::utils::extract_basic_credentials;
|
||||||
use crate::ssh::authz::SshAuthService;
|
use crate::ssh::authz::SshAuthService;
|
||||||
use actix_web::{Error, HttpRequest};
|
use actix_web::{Error, HttpRequest};
|
||||||
|
use argon2::Argon2;
|
||||||
|
use argon2::password_hash::{PasswordHash, PasswordVerifier};
|
||||||
use db::database::AppDatabase;
|
use db::database::AppDatabase;
|
||||||
use models::repos::repo;
|
use models::repos::repo;
|
||||||
use models::users::{user, user_token};
|
use models::users::{user, user_token};
|
||||||
@ -19,27 +21,36 @@ pub async fn verify_access_token(
|
|||||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?
|
.map_err(|_| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?
|
||||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?;
|
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?;
|
||||||
|
|
||||||
let token_hash = hash_access_key(access_key)
|
let tokens = user_token::Entity::find()
|
||||||
.map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?;
|
|
||||||
|
|
||||||
let token = user_token::Entity::find()
|
|
||||||
.filter(user_token::Column::User.eq(user.uid))
|
.filter(user_token::Column::User.eq(user.uid))
|
||||||
.filter(user_token::Column::TokenHash.eq(token_hash))
|
|
||||||
.filter(user_token::Column::IsRevoked.eq(false))
|
.filter(user_token::Column::IsRevoked.eq(false))
|
||||||
.one(db.reader())
|
.all(db.reader())
|
||||||
.await
|
.await
|
||||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?
|
.map_err(|_| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?
|
||||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid username or access key"))?;
|
.into_iter()
|
||||||
|
.filter(|token| {
|
||||||
|
token
|
||||||
|
.expires_at
|
||||||
|
.map(|expires_at| expires_at >= chrono::Utc::now())
|
||||||
|
.unwrap_or(true)
|
||||||
|
});
|
||||||
|
|
||||||
if let Some(expires_at) = token.expires_at {
|
for token in tokens {
|
||||||
if expires_at < chrono::Utc::now() {
|
let Ok(hash) = PasswordHash::new(&token.token_hash) else {
|
||||||
return Err(actix_web::error::ErrorUnauthorized(
|
tracing::warn!(token_id = token.id, "invalid stored access key hash");
|
||||||
"Access key has expired",
|
continue;
|
||||||
));
|
};
|
||||||
|
if Argon2::default()
|
||||||
|
.verify_password(access_key.as_bytes(), &hash)
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
return Ok(user);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(user)
|
Err(actix_web::error::ErrorUnauthorized(
|
||||||
|
"Invalid username or access key",
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn authorize_repo_access(
|
pub async fn authorize_repo_access(
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
use crate::ssh::branch_protect::check_branch_protection;
|
||||||
|
use crate::ssh::ref_update::RefUpdate;
|
||||||
use actix_web::{Error, HttpResponse, web};
|
use actix_web::{Error, HttpResponse, web};
|
||||||
use async_stream::stream;
|
use async_stream::stream;
|
||||||
use futures_util::Stream;
|
use futures_util::Stream;
|
||||||
@ -134,7 +136,6 @@ impl GitHttpHandler {
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))?;
|
.map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))?;
|
||||||
|
|
||||||
const PACK_SIG: &[u8] = b"PACK";
|
|
||||||
let mut pre_pack: Vec<u8> = Vec::with_capacity(65536);
|
let mut pre_pack: Vec<u8> = Vec::with_capacity(65536);
|
||||||
|
|
||||||
while let Some(chunk) = stream.next().await {
|
while let Some(chunk) = stream.next().await {
|
||||||
@ -157,10 +158,16 @@ impl GitHttpHandler {
|
|||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(pos) = bytes.windows(4).position(|w| w == PACK_SIG) {
|
if let Some(pos) = bytes.windows(4).position(|w| w == b"0000") {
|
||||||
pre_pack.extend_from_slice(&bytes[..pos]);
|
let end = pos + 4;
|
||||||
|
pre_pack.extend_from_slice(&bytes[..end]);
|
||||||
|
|
||||||
if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) {
|
let refs = RefUpdate::parse_ref_updates(&pre_pack)
|
||||||
|
.map_err(actix_web::error::ErrorBadRequest)?;
|
||||||
|
if let Some(msg) = refs
|
||||||
|
.iter()
|
||||||
|
.find_map(|r#ref| check_branch_protection(&branch_protects, r#ref))
|
||||||
|
{
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
"branch_protection_violation repo={} repo_id={} message={}",
|
"branch_protection_violation repo={} repo_id={} message={}",
|
||||||
self.repo.repo_name,
|
self.repo.repo_name,
|
||||||
@ -171,7 +178,10 @@ impl GitHttpHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let remaining: ByteStream = Box::pin(stream! {
|
let remaining: ByteStream = Box::pin(stream! {
|
||||||
yield Ok(bytes[pos..].to_vec());
|
yield Ok(pre_pack);
|
||||||
|
if end < bytes.len() {
|
||||||
|
yield Ok(bytes[end..].to_vec());
|
||||||
|
}
|
||||||
while let Some(chunk) = stream.next().await {
|
while let Some(chunk) = stream.next().await {
|
||||||
yield chunk;
|
yield chunk;
|
||||||
}
|
}
|
||||||
@ -267,106 +277,3 @@ fn write_pkt_line(buf: &mut Vec<u8>, data: &[u8]) {
|
|||||||
fn write_flush_pkt(buf: &mut Vec<u8>) {
|
fn write_flush_pkt(buf: &mut Vec<u8>) {
|
||||||
buf.extend_from_slice(b"0000");
|
buf.extend_from_slice(b"0000");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct RefUpdate {
|
|
||||||
old_oid: Option<String>,
|
|
||||||
new_oid: Option<String>,
|
|
||||||
name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_branch_protection(
|
|
||||||
branch_protects: &[repo_branch_protect::Model],
|
|
||||||
pre_pack: &[u8],
|
|
||||||
) -> Result<(), String> {
|
|
||||||
let refs = parse_ref_updates(pre_pack)?;
|
|
||||||
for r#ref in &refs {
|
|
||||||
for protection in branch_protects {
|
|
||||||
// Match exactly or as directory prefix (e.g. "refs/heads/main"
|
|
||||||
// matches "refs/heads/main" and "refs/heads/main/*" but NOT
|
|
||||||
// "refs/heads/main-v2").
|
|
||||||
let matches = r#ref.name == protection.branch
|
|
||||||
|| r#ref.name.starts_with(&format!("{}/", protection.branch));
|
|
||||||
if !matches {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// Check deletion (new_oid is all zeros / 40 zeros)
|
|
||||||
if r#ref.new_oid.as_deref() == Some("0000000000000000000000000000000000000000") {
|
|
||||||
if protection.forbid_deletion {
|
|
||||||
return Err(format!(
|
|
||||||
"Deletion of protected branch '{}' is forbidden",
|
|
||||||
r#ref.name
|
|
||||||
));
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check tag push
|
|
||||||
if r#ref.name.starts_with("refs/tags/") {
|
|
||||||
if protection.forbid_tag_push {
|
|
||||||
return Err(format!(
|
|
||||||
"Tag push to protected branch '{}' is forbidden",
|
|
||||||
r#ref.name
|
|
||||||
));
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check force push: old != new AND old is non-zero (non-fast-forward update)
|
|
||||||
if let (Some(old_oid), Some(new_oid)) =
|
|
||||||
(r#ref.old_oid.as_deref(), r#ref.new_oid.as_deref())
|
|
||||||
{
|
|
||||||
let is_new_branch = old_oid == "0000000000000000000000000000000000000000";
|
|
||||||
if !is_new_branch
|
|
||||||
&& old_oid != new_oid
|
|
||||||
&& r#ref.name.starts_with("refs/heads/")
|
|
||||||
&& protection.forbid_force_push
|
|
||||||
{
|
|
||||||
return Err(format!(
|
|
||||||
"Force push to protected branch '{}' is forbidden",
|
|
||||||
r#ref.name
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check push
|
|
||||||
if protection.forbid_push {
|
|
||||||
return Err(format!(
|
|
||||||
"Push to protected branch '{}' is forbidden",
|
|
||||||
r#ref.name
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_ref_updates(buffer: &[u8]) -> Result<Vec<RefUpdate>, String> {
|
|
||||||
let text = String::from_utf8_lossy(buffer);
|
|
||||||
let mut refs = Vec::new();
|
|
||||||
|
|
||||||
for line in text.lines() {
|
|
||||||
let line = line.trim();
|
|
||||||
if line.is_empty() || line.starts_with('#') || line.starts_with("PACK") {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// Format: "<old-oid> <new-oid> <ref-name>\n"
|
|
||||||
let mut parts = line.split_whitespace();
|
|
||||||
let old_oid = parts.next().map(|s| s.to_string());
|
|
||||||
let new_oid = parts.next().map(|s| s.to_string());
|
|
||||||
let name = parts
|
|
||||||
.next()
|
|
||||||
.unwrap_or("")
|
|
||||||
.trim_start_matches('\0')
|
|
||||||
.to_string();
|
|
||||||
if !name.is_empty() {
|
|
||||||
refs.push(RefUpdate {
|
|
||||||
old_oid,
|
|
||||||
new_oid,
|
|
||||||
name,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(refs)
|
|
||||||
}
|
|
||||||
|
|||||||
@ -1,11 +1,19 @@
|
|||||||
use crate::error::GitError;
|
use crate::error::GitError;
|
||||||
use crate::http::HttpAppState;
|
use crate::http::HttpAppState;
|
||||||
use crate::http::auth::authorize_repo_access;
|
use crate::http::auth::authorize_repo_access;
|
||||||
|
use crate::http::auth::verify_access_token;
|
||||||
use crate::http::handler::is_valid_lfs_oid;
|
use crate::http::handler::is_valid_lfs_oid;
|
||||||
use crate::http::lfs::{BatchRequest, CreateLockRequest, LfsHandler};
|
use crate::http::lfs::{BatchRequest, CreateLockRequest, LfsHandler};
|
||||||
use crate::http::utils::{get_repo_model, hash_access_key};
|
use crate::http::utils::{extract_basic_credentials, get_repo_model};
|
||||||
|
use crate::ssh::authz::SshAuthService;
|
||||||
|
use crate::ssh::push_queue::{
|
||||||
|
PushQueueEvent, PushQueueLease, PushQueueWaitError, wait_for_push_queue_slot,
|
||||||
|
};
|
||||||
use actix_web::{Error, HttpRequest, HttpResponse, web};
|
use actix_web::{Error, HttpRequest, HttpResponse, web};
|
||||||
use models::users::user_token;
|
use argon2::Argon2;
|
||||||
|
use argon2::password_hash::{PasswordHash, PasswordVerifier};
|
||||||
|
use models::repos::repo;
|
||||||
|
use models::users::{user, user_token};
|
||||||
use sea_orm::prelude::*;
|
use sea_orm::prelude::*;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
@ -31,43 +39,16 @@ fn bearer_token(req: &HttpRequest) -> Result<String, Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn hash_token(token: &str) -> Result<String, argon2::password_hash::Error> {
|
|
||||||
hash_access_key(token)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Derive the acting user from the authenticated bearer token.
|
/// Derive the acting user from the authenticated bearer token.
|
||||||
async fn user_uid(req: &HttpRequest, db: &db::database::AppDatabase) -> Result<uuid::Uuid, Error> {
|
async fn user_uid(req: &HttpRequest, db: &db::database::AppDatabase) -> Result<uuid::Uuid, Error> {
|
||||||
let auth_header = req
|
if let Ok((username, access_key)) = extract_basic_credentials(req) {
|
||||||
.headers()
|
return verify_access_token(db, &username, &access_key)
|
||||||
.get("authorization")
|
.await
|
||||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Missing authorization header"))?;
|
.map(|user| user.uid);
|
||||||
let auth_str = auth_header
|
|
||||||
.to_str()
|
|
||||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Invalid authorization header"))?;
|
|
||||||
let token = auth_str
|
|
||||||
.strip_prefix("Bearer ")
|
|
||||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid authorization format"))?;
|
|
||||||
|
|
||||||
let token_hash = hash_token(token)
|
|
||||||
.map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?;
|
|
||||||
|
|
||||||
let token_model = user_token::Entity::find()
|
|
||||||
.filter(user_token::Column::TokenHash.eq(&token_hash))
|
|
||||||
.filter(user_token::Column::IsRevoked.eq(false))
|
|
||||||
.one(db.reader())
|
|
||||||
.await
|
|
||||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?;
|
|
||||||
|
|
||||||
let token_model =
|
|
||||||
token_model.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token"))?;
|
|
||||||
|
|
||||||
if let Some(expires_at) = token_model.expires_at {
|
|
||||||
if expires_at < chrono::Utc::now() {
|
|
||||||
return Err(actix_web::error::ErrorUnauthorized("Token expired"));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(token_model.user)
|
let token = bearer_token(req)?;
|
||||||
|
find_user_by_bearer_token(&token, db).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Store LFS batch-generated token in Redis with TTL.
|
/// Store LFS batch-generated token in Redis with TTL.
|
||||||
@ -140,24 +121,139 @@ async fn validate_lfs_token(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Second: check if it's a regular user access token
|
// Second: check if it's a regular user access token.
|
||||||
let token_hash = hash_token(token)
|
find_user_by_bearer_token(token, db).await
|
||||||
.map_err(|_| actix_web::error::ErrorInternalServerError("Token hash failed"))?;
|
}
|
||||||
let token_model = user_token::Entity::find()
|
|
||||||
.filter(user_token::Column::TokenHash.eq(&token_hash))
|
|
||||||
.filter(user_token::Column::IsRevoked.eq(false))
|
|
||||||
.one(db.reader())
|
|
||||||
.await
|
|
||||||
.map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?
|
|
||||||
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token"))?;
|
|
||||||
|
|
||||||
if let Some(expires_at) = token_model.expires_at {
|
async fn find_user_by_bearer_token(
|
||||||
if expires_at < chrono::Utc::now() {
|
token: &str,
|
||||||
return Err(actix_web::error::ErrorUnauthorized("Token expired"));
|
db: &db::database::AppDatabase,
|
||||||
|
) -> Result<uuid::Uuid, Error> {
|
||||||
|
let tokens = user_token::Entity::find()
|
||||||
|
.filter(user_token::Column::IsRevoked.eq(false))
|
||||||
|
.all(db.reader())
|
||||||
|
.await
|
||||||
|
.map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?;
|
||||||
|
|
||||||
|
for token_model in tokens {
|
||||||
|
if token_model
|
||||||
|
.expires_at
|
||||||
|
.map(|expires_at| expires_at < chrono::Utc::now())
|
||||||
|
.unwrap_or(false)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let Ok(hash) = PasswordHash::new(&token_model.token_hash) else {
|
||||||
|
tracing::warn!(
|
||||||
|
token_id = token_model.id,
|
||||||
|
"invalid stored bearer token hash"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
if Argon2::default()
|
||||||
|
.verify_password(token.as_bytes(), &hash)
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
return Ok(token_model.user);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(token_model.user)
|
Err(actix_web::error::ErrorUnauthorized("Invalid token"))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn authorize_user_repo_access(
|
||||||
|
db: &db::database::AppDatabase,
|
||||||
|
user_uid: uuid::Uuid,
|
||||||
|
repo: &repo::Model,
|
||||||
|
is_write: bool,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let user = user::Entity::find()
|
||||||
|
.filter(user::Column::Uid.eq(user_uid))
|
||||||
|
.one(db.reader())
|
||||||
|
.await
|
||||||
|
.map_err(|_| actix_web::error::ErrorUnauthorized("Authentication failed"))?
|
||||||
|
.ok_or_else(|| actix_web::error::ErrorUnauthorized("Invalid token user"))?;
|
||||||
|
|
||||||
|
let authz = SshAuthService::new(db.clone());
|
||||||
|
if authz.check_repo_permission(&user, repo, is_write).await {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(actix_web::error::ErrorForbidden(
|
||||||
|
"No permission for repository",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn acquire_lfs_write_queue(
|
||||||
|
state: &HttpAppState,
|
||||||
|
repo: &repo::Model,
|
||||||
|
operation: &'static str,
|
||||||
|
) -> Result<PushQueueLease, Error> {
|
||||||
|
match wait_for_push_queue_slot(state.sync.clone(), repo.id, |event, request_id| {
|
||||||
|
let request_id = request_id.to_string();
|
||||||
|
match event {
|
||||||
|
PushQueueEvent::Waiting(position) => {
|
||||||
|
tracing::info!(
|
||||||
|
repo = %repo.repo_name,
|
||||||
|
repo_id = %repo.id,
|
||||||
|
request_id = %request_id,
|
||||||
|
operation = operation,
|
||||||
|
position = position.position,
|
||||||
|
total = position.total,
|
||||||
|
"lfs_write_queue_waiting"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
PushQueueEvent::Acquired => {
|
||||||
|
tracing::info!(
|
||||||
|
repo = %repo.repo_name,
|
||||||
|
repo_id = %repo.id,
|
||||||
|
request_id = %request_id,
|
||||||
|
operation = operation,
|
||||||
|
"lfs_write_queue_acquired"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(lease) => Ok(lease),
|
||||||
|
Err(PushQueueWaitError::Join(e)) => {
|
||||||
|
tracing::error!(
|
||||||
|
error = %e,
|
||||||
|
repo = %repo.repo_name,
|
||||||
|
repo_id = %repo.id,
|
||||||
|
operation = operation,
|
||||||
|
"lfs_write_queue_join_failed"
|
||||||
|
);
|
||||||
|
Err(actix_web::error::ErrorServiceUnavailable(
|
||||||
|
"GitData: LFS write queue is temporarily unavailable. Please retry later.",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Err(PushQueueWaitError::Lock(e)) => {
|
||||||
|
tracing::error!(
|
||||||
|
error = %e,
|
||||||
|
repo = %repo.repo_name,
|
||||||
|
repo_id = %repo.id,
|
||||||
|
operation = operation,
|
||||||
|
"lfs_write_queue_lock_failed"
|
||||||
|
);
|
||||||
|
Err(actix_web::error::ErrorServiceUnavailable(
|
||||||
|
"GitData: LFS write queue lock failed. Please retry later.",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Err(PushQueueWaitError::Timeout) => {
|
||||||
|
tracing::warn!(
|
||||||
|
repo = %repo.repo_name,
|
||||||
|
repo_id = %repo.id,
|
||||||
|
operation = operation,
|
||||||
|
"lfs_write_queue_timeout"
|
||||||
|
);
|
||||||
|
Err(actix_web::error::ErrorServiceUnavailable(
|
||||||
|
"GitData: LFS write queue timed out. Please retry in a moment.",
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn lfs_batch(
|
pub async fn lfs_batch(
|
||||||
@ -214,16 +310,20 @@ pub async fn lfs_upload(
|
|||||||
let token = bearer_token(&req)?;
|
let token = bearer_token(&req)?;
|
||||||
|
|
||||||
// Validate token (batch token or user access token) with write permission
|
// Validate token (batch token or user access token) with write permission
|
||||||
let _uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "upload").await?;
|
let uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "upload").await?;
|
||||||
|
authorize_user_repo_access(&state.db, uid, &repo, true).await?;
|
||||||
|
|
||||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||||
|
let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "upload").await?;
|
||||||
|
|
||||||
match handler.upload_object(&oid, payload).await {
|
let result = match handler.upload_object(&oid, payload).await {
|
||||||
Ok(response) => Ok(response),
|
Ok(response) => Ok(response),
|
||||||
Err(GitError::InvalidOid(_)) => Err(actix_web::error::ErrorBadRequest("Invalid OID")),
|
Err(GitError::InvalidOid(_)) => Err(actix_web::error::ErrorBadRequest("Invalid OID")),
|
||||||
Err(GitError::AuthFailed(_)) => Err(actix_web::error::ErrorUnauthorized("Unauthorized")),
|
Err(GitError::AuthFailed(_)) => Err(actix_web::error::ErrorUnauthorized("Unauthorized")),
|
||||||
Err(_e) => Err(actix_web::error::ErrorInternalServerError("Upload failed")),
|
Err(_e) => Err(actix_web::error::ErrorInternalServerError("Upload failed")),
|
||||||
}
|
};
|
||||||
|
queue_lease.release().await;
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn lfs_download(
|
pub async fn lfs_download(
|
||||||
@ -242,8 +342,8 @@ pub async fn lfs_download(
|
|||||||
// Auth check: private repos require auth; public repos allow anonymous
|
// Auth check: private repos require auth; public repos allow anonymous
|
||||||
if repo.is_private {
|
if repo.is_private {
|
||||||
let token = bearer_token(&req)?;
|
let token = bearer_token(&req)?;
|
||||||
let _uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "download").await?;
|
let uid = validate_lfs_token(&token, &state.cache, &state.db, repo.id, "download").await?;
|
||||||
authorize_repo_access(&req, &state.db, &repo, false).await?;
|
authorize_user_repo_access(&state.db, uid, &repo, false).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||||
@ -270,12 +370,15 @@ pub async fn lfs_lock_create(
|
|||||||
let uid = user_uid(&req, &state.db).await?;
|
let uid = user_uid(&req, &state.db).await?;
|
||||||
authorize_repo_access(&req, &state.db, &repo, true).await?;
|
authorize_repo_access(&req, &state.db, &repo, true).await?;
|
||||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||||
|
let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "lock_create").await?;
|
||||||
|
|
||||||
match handler.lock_object(&body.oid, uid).await {
|
let result = match handler.lock_object(&body.oid, uid).await {
|
||||||
Ok(lock) => Ok(HttpResponse::Created().json(lock)),
|
Ok(lock) => Ok(HttpResponse::Created().json(lock)),
|
||||||
Err(GitError::Locked(msg)) => Ok(HttpResponse::Conflict().body(msg)),
|
Err(GitError::Locked(msg)) => Ok(HttpResponse::Conflict().body(msg)),
|
||||||
Err(_e) => Err(actix_web::error::ErrorInternalServerError("Lock failed")),
|
Err(_e) => Err(actix_web::error::ErrorInternalServerError("Lock failed")),
|
||||||
}
|
};
|
||||||
|
queue_lease.release().await;
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn lfs_lock_list(
|
pub async fn lfs_lock_list(
|
||||||
@ -289,7 +392,8 @@ pub async fn lfs_lock_list(
|
|||||||
|
|
||||||
// Auth check: private repos require auth for lock listing
|
// Auth check: private repos require auth for lock listing
|
||||||
if repo.is_private {
|
if repo.is_private {
|
||||||
user_uid(&req, &state.db).await?;
|
let uid = user_uid(&req, &state.db).await?;
|
||||||
|
authorize_user_repo_access(&state.db, uid, &repo, false).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let maybe_oid = query.get("oid").map(|s| s.as_str());
|
let maybe_oid = query.get("oid").map(|s| s.as_str());
|
||||||
@ -313,7 +417,8 @@ pub async fn lfs_lock_get(
|
|||||||
|
|
||||||
// Auth check: private repos require auth for lock viewing
|
// Auth check: private repos require auth for lock viewing
|
||||||
if repo.is_private {
|
if repo.is_private {
|
||||||
user_uid(&req, &state.db).await?;
|
let uid = user_uid(&req, &state.db).await?;
|
||||||
|
authorize_user_repo_access(&state.db, uid, &repo, false).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||||
@ -338,13 +443,16 @@ pub async fn lfs_lock_delete(
|
|||||||
let uid = user_uid(&req, &state.db).await?;
|
let uid = user_uid(&req, &state.db).await?;
|
||||||
authorize_repo_access(&req, &state.db, &repo, true).await?;
|
authorize_repo_access(&req, &state.db, &repo, true).await?;
|
||||||
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
let handler = LfsHandler::new(PathBuf::from(&repo.storage_path), repo, state.db.clone());
|
||||||
|
let mut queue_lease = acquire_lfs_write_queue(&state, &handler.model, "lock_delete").await?;
|
||||||
|
|
||||||
match handler.unlock_object(&lock_id, uid).await {
|
let result = match handler.unlock_object(&lock_id, uid).await {
|
||||||
Ok(()) => Ok(HttpResponse::NoContent().finish()),
|
Ok(()) => Ok(HttpResponse::NoContent().finish()),
|
||||||
Err(GitError::PermissionDenied(_)) => Err(actix_web::error::ErrorForbidden("Not allowed")),
|
Err(GitError::PermissionDenied(_)) => Err(actix_web::error::ErrorForbidden("Not allowed")),
|
||||||
Err(GitError::NotFound(_)) => Err(actix_web::error::ErrorNotFound("Lock not found")),
|
Err(GitError::NotFound(_)) => Err(actix_web::error::ErrorNotFound("Lock not found")),
|
||||||
Err(_e) => Err(actix_web::error::ErrorInternalServerError(
|
Err(_e) => Err(actix_web::error::ErrorInternalServerError(
|
||||||
"Lock delete failed",
|
"Lock delete failed",
|
||||||
)),
|
)),
|
||||||
}
|
};
|
||||||
|
queue_lease.release().await;
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|||||||
@ -3,6 +3,7 @@ use crate::http::auth::authorize_repo_access;
|
|||||||
use crate::http::handler::GitHttpHandler;
|
use crate::http::handler::GitHttpHandler;
|
||||||
use crate::http::utils::get_repo_model;
|
use crate::http::utils::get_repo_model;
|
||||||
use crate::ssh::RepoReceiveSyncTask;
|
use crate::ssh::RepoReceiveSyncTask;
|
||||||
|
use crate::ssh::push_queue::{PushQueueEvent, PushQueueWaitError, wait_for_push_queue_slot};
|
||||||
use actix_web::{Error, HttpRequest, HttpResponse, web};
|
use actix_web::{Error, HttpRequest, HttpResponse, web};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -77,21 +78,88 @@ pub async fn receive_pack(
|
|||||||
let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?;
|
let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?;
|
||||||
authorize_repo_access(&req, &state.db, &model, true).await?;
|
authorize_repo_access(&req, &state.db, &model, true).await?;
|
||||||
|
|
||||||
|
let mut push_queue_lease =
|
||||||
|
match wait_for_push_queue_slot(state.sync.clone(), model.id, |event, request_id| {
|
||||||
|
let request_id = request_id.to_string();
|
||||||
|
let repo_name = model.repo_name.clone();
|
||||||
|
let repo_id = model.id;
|
||||||
|
match event {
|
||||||
|
PushQueueEvent::Waiting(position) => {
|
||||||
|
tracing::info!(
|
||||||
|
repo = %repo_name,
|
||||||
|
repo_id = %repo_id,
|
||||||
|
request_id = %request_id,
|
||||||
|
position = position.position,
|
||||||
|
total = position.total,
|
||||||
|
"http_push_queue_waiting"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
PushQueueEvent::Acquired => {
|
||||||
|
tracing::info!(
|
||||||
|
repo = %repo_name,
|
||||||
|
repo_id = %repo_id,
|
||||||
|
request_id = %request_id,
|
||||||
|
"http_push_queue_acquired"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(lease) => lease,
|
||||||
|
Err(PushQueueWaitError::Join(e)) => {
|
||||||
|
tracing::error!(
|
||||||
|
error = %e,
|
||||||
|
repo = %model.repo_name,
|
||||||
|
repo_id = %model.id,
|
||||||
|
"http_push_queue_join_failed"
|
||||||
|
);
|
||||||
|
return Err(actix_web::error::ErrorServiceUnavailable(
|
||||||
|
"GitData: push queue is temporarily unavailable. Please retry later.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Err(PushQueueWaitError::Lock(e)) => {
|
||||||
|
tracing::error!(
|
||||||
|
error = %e,
|
||||||
|
repo = %model.repo_name,
|
||||||
|
repo_id = %model.id,
|
||||||
|
"http_push_queue_lock_failed"
|
||||||
|
);
|
||||||
|
return Err(actix_web::error::ErrorServiceUnavailable(
|
||||||
|
"GitData: push queue lock failed. Please retry later.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Err(PushQueueWaitError::Timeout) => {
|
||||||
|
tracing::info!(
|
||||||
|
repo = %model.repo_name,
|
||||||
|
repo_id = %model.id,
|
||||||
|
"http_push_queue_timeout"
|
||||||
|
);
|
||||||
|
return Ok(HttpResponse::ServiceUnavailable()
|
||||||
|
.insert_header(("Retry-After", "5"))
|
||||||
|
.content_type("text/plain; charset=utf-8")
|
||||||
|
.body("GitData: push queue timed out. Please retry in a moment.\n"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let storage_path = PathBuf::from(&model.storage_path);
|
let storage_path = PathBuf::from(&model.storage_path);
|
||||||
let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone());
|
let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone());
|
||||||
let result = handler.receive_pack(payload).await;
|
let result = handler.receive_pack(payload).await;
|
||||||
|
push_queue_lease.release().await;
|
||||||
|
|
||||||
let _ = tokio::spawn({
|
if result.is_ok() {
|
||||||
let sync = state.sync.clone();
|
let _ = tokio::spawn({
|
||||||
let repo_uid = model.id;
|
let sync = state.sync.clone();
|
||||||
async move {
|
let repo_uid = model.id;
|
||||||
let _ = timeout(
|
async move {
|
||||||
Duration::from_secs(5),
|
let _ = timeout(
|
||||||
sync.send(RepoReceiveSyncTask { repo_uid }),
|
Duration::from_secs(5),
|
||||||
)
|
sync.send(RepoReceiveSyncTask { repo_uid }),
|
||||||
.await;
|
)
|
||||||
}
|
.await;
|
||||||
});
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,7 +23,7 @@ pub fn check_branch_protection(
|
|||||||
if r#ref.new_oid == "0000000000000000000000000000000000000000" {
|
if r#ref.new_oid == "0000000000000000000000000000000000000000" {
|
||||||
if protection.forbid_deletion {
|
if protection.forbid_deletion {
|
||||||
return Some(format!(
|
return Some(format!(
|
||||||
"Deletion of protected branch '{}' is forbidden",
|
"GitData: 🛡️ protected branch rejected. Deletion of '{}' is forbidden. Create a PR or ask a maintainer to update branch protection.",
|
||||||
r#ref.name
|
r#ref.name
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
@ -34,7 +34,7 @@ pub fn check_branch_protection(
|
|||||||
if r#ref.name.starts_with("refs/tags/") {
|
if r#ref.name.starts_with("refs/tags/") {
|
||||||
if protection.forbid_tag_push {
|
if protection.forbid_tag_push {
|
||||||
return Some(format!(
|
return Some(format!(
|
||||||
"Tag push to protected branch '{}' is forbidden",
|
"GitData: 🛡️ protected ref rejected. Tag push to '{}' is forbidden by branch protection.",
|
||||||
r#ref.name
|
r#ref.name
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
@ -49,7 +49,7 @@ pub fn check_branch_protection(
|
|||||||
&& protection.forbid_force_push
|
&& protection.forbid_force_push
|
||||||
{
|
{
|
||||||
return Some(format!(
|
return Some(format!(
|
||||||
"Force push to protected branch '{}' is forbidden",
|
"GitData: 🛡️ protected branch rejected. Force push to '{}' is forbidden. Create a PR instead of rewriting protected history.",
|
||||||
r#ref.name
|
r#ref.name
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
@ -57,7 +57,7 @@ pub fn check_branch_protection(
|
|||||||
// Check push
|
// Check push
|
||||||
if protection.forbid_push {
|
if protection.forbid_push {
|
||||||
return Some(format!(
|
return Some(format!(
|
||||||
"Push to protected branch '{}' is forbidden",
|
"GitData: 🛡️ protected branch rejected. Direct push to '{}' is forbidden. Please push to a feature branch and create a PR.",
|
||||||
r#ref.name
|
r#ref.name
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,6 +5,7 @@ use crate::ssh::authz::SshAuthService;
|
|||||||
use crate::ssh::branch_protect::check_branch_protection;
|
use crate::ssh::branch_protect::check_branch_protection;
|
||||||
use crate::ssh::forward::forward;
|
use crate::ssh::forward::forward;
|
||||||
use crate::ssh::git_service::{GitService, build_git_command, parse_git_command, parse_repo_path};
|
use crate::ssh::git_service::{GitService, build_git_command, parse_git_command, parse_repo_path};
|
||||||
|
use crate::ssh::push_queue::{PushQueueEvent, PushQueueWaitError, wait_for_push_queue_slot};
|
||||||
use crate::ssh::ref_update::RefUpdate;
|
use crate::ssh::ref_update::RefUpdate;
|
||||||
use db::cache::AppCache;
|
use db::cache::AppCache;
|
||||||
use db::database::AppDatabase;
|
use db::database::AppDatabase;
|
||||||
@ -21,13 +22,15 @@ use std::io;
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::process::Stdio;
|
use std::process::Stdio;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio_util::bytes::Bytes;
|
use tokio_util::bytes::Bytes;
|
||||||
|
|
||||||
const PRE_PACK_LIMIT: usize = 1_048_576;
|
const PRE_PACK_LIMIT: usize = 1_048_576;
|
||||||
|
const ZERO_OID: &str = "0000000000000000000000000000000000000000";
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::process::ChildStdin;
|
use tokio::process::ChildStdin;
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::{Mutex, mpsc::Sender};
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
pub struct SSHandle {
|
pub struct SSHandle {
|
||||||
pub repo: Option<PathBuf>,
|
pub repo: Option<PathBuf>,
|
||||||
@ -39,6 +42,7 @@ pub struct SSHandle {
|
|||||||
pub auth: SshAuthService,
|
pub auth: SshAuthService,
|
||||||
pub buffer: HashMap<ChannelId, Vec<u8>>,
|
pub buffer: HashMap<ChannelId, Vec<u8>>,
|
||||||
pub branch: HashMap<ChannelId, Vec<RefUpdate>>,
|
pub branch: HashMap<ChannelId, Vec<RefUpdate>>,
|
||||||
|
pub post_receive_refs: HashMap<ChannelId, Arc<Mutex<Vec<RefUpdate>>>>,
|
||||||
pub service: Option<GitService>,
|
pub service: Option<GitService>,
|
||||||
pub cache: AppCache,
|
pub cache: AppCache,
|
||||||
pub sync: ReceiveSyncService,
|
pub sync: ReceiveSyncService,
|
||||||
@ -70,6 +74,7 @@ impl SSHandle {
|
|||||||
auth,
|
auth,
|
||||||
buffer: HashMap::new(),
|
buffer: HashMap::new(),
|
||||||
branch: HashMap::new(),
|
branch: HashMap::new(),
|
||||||
|
post_receive_refs: HashMap::new(),
|
||||||
service: None,
|
service: None,
|
||||||
cache,
|
cache,
|
||||||
sync,
|
sync,
|
||||||
@ -94,8 +99,37 @@ impl SSHandle {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
self.eof.remove(&channel_id);
|
self.eof.remove(&channel_id);
|
||||||
|
self.post_receive_refs.remove(&channel_id);
|
||||||
self.upload_pack_eof_sent.remove(&channel_id);
|
self.upload_pack_eof_sent.remove(&channel_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn format_post_receive_hints(
|
||||||
|
namespace: &str,
|
||||||
|
repo: &repo::Model,
|
||||||
|
refs: &[RefUpdate],
|
||||||
|
queue: Option<(usize, usize)>,
|
||||||
|
) -> String {
|
||||||
|
let mut lines = Vec::new();
|
||||||
|
for r#ref in refs {
|
||||||
|
if r#ref.old_oid == ZERO_OID && r#ref.name.starts_with("refs/heads/") {
|
||||||
|
let branch = r#ref.name.trim_start_matches("refs/heads/");
|
||||||
|
lines.push(format!(
|
||||||
|
"remote: GitData: 🌱 new branch '{}' pushed. Create a PR: /{}/repo/{}/pulls/new?head={}\r\n",
|
||||||
|
branch,
|
||||||
|
namespace,
|
||||||
|
repo.repo_name,
|
||||||
|
branch
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some((position, total)) = queue {
|
||||||
|
lines.push(format!(
|
||||||
|
"remote: GitData: ⏳ repository sync queued ({}/{}). Metadata, webhooks and search indexes will update shortly.\r\n",
|
||||||
|
position, total
|
||||||
|
));
|
||||||
|
}
|
||||||
|
lines.concat()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for SSHandle {
|
impl Drop for SSHandle {
|
||||||
@ -489,10 +523,16 @@ impl russh::server::Handler for SSHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if let Some(refs_for_hints) = self.post_receive_refs.get(&channel) {
|
||||||
|
*refs_for_hints.lock().await = refs.clone();
|
||||||
|
}
|
||||||
self.branch.insert(channel, refs);
|
self.branch.insert(channel, refs);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!("ref_update_parse_error error={:?}", e);
|
tracing::warn!("ref_update_parse_error error={:?}", e);
|
||||||
|
if let Some(refs_for_hints) = self.post_receive_refs.get(&channel) {
|
||||||
|
refs_for_hints.lock().await.clear();
|
||||||
|
}
|
||||||
self.branch.insert(channel, vec![]);
|
self.branch.insert(channel, vec![]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -606,6 +646,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
return Err(russh::Error::Disconnect);
|
return Err(russh::Error::Disconnect);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let namespace = owner.to_string();
|
||||||
let repo = repo.strip_suffix(".git").unwrap_or(repo).to_string();
|
let repo = repo.strip_suffix(".git").unwrap_or(repo).to_string();
|
||||||
|
|
||||||
let repo = match self.auth.find_repo(owner, &repo).await {
|
let repo = match self.auth.find_repo(owner, &repo).await {
|
||||||
@ -660,6 +701,98 @@ impl russh::server::Handler for SSHandle {
|
|||||||
is_write
|
is_write
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut push_queue_lease = if is_write {
|
||||||
|
let repo_id = repo.id;
|
||||||
|
let queue_result =
|
||||||
|
wait_for_push_queue_slot(self.sync.clone(), repo_id, |event, request_id| {
|
||||||
|
let request_id = request_id.to_string();
|
||||||
|
match event {
|
||||||
|
PushQueueEvent::Waiting(position) => {
|
||||||
|
let msg = format!(
|
||||||
|
"remote: GitData: ⏳ another push is running for this repository. Queued {}/{}.\r\n",
|
||||||
|
position.position, position.total
|
||||||
|
);
|
||||||
|
let _ = session.extended_data(
|
||||||
|
channel_id,
|
||||||
|
1,
|
||||||
|
Bytes::copy_from_slice(msg.as_bytes()),
|
||||||
|
);
|
||||||
|
let _ = session.flush();
|
||||||
|
tracing::info!(
|
||||||
|
repo_id = %repo_id,
|
||||||
|
request_id = %request_id,
|
||||||
|
position = position.position,
|
||||||
|
total = position.total,
|
||||||
|
"push_queue_waiting"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
PushQueueEvent::Acquired => {
|
||||||
|
let msg = "remote: GitData: 🚀 push queue slot acquired. Processing now.\r\n";
|
||||||
|
let _ = session.extended_data(
|
||||||
|
channel_id,
|
||||||
|
1,
|
||||||
|
Bytes::copy_from_slice(msg.as_bytes()),
|
||||||
|
);
|
||||||
|
let _ = session.flush();
|
||||||
|
tracing::info!(
|
||||||
|
repo_id = %repo_id,
|
||||||
|
request_id = %request_id,
|
||||||
|
"push_queue_acquired"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match queue_result {
|
||||||
|
Ok(lease) => Some(lease),
|
||||||
|
Err(error) => {
|
||||||
|
match &error {
|
||||||
|
PushQueueWaitError::Join(e) => {
|
||||||
|
tracing::error!(error = %e, repo = %repo.repo_name, "push_queue_join_failed");
|
||||||
|
let msg = "remote: GitData: ⛔ push queue is temporarily unavailable. Please retry later.\r\n";
|
||||||
|
let _ = session.extended_data(
|
||||||
|
channel_id,
|
||||||
|
1,
|
||||||
|
Bytes::copy_from_slice(msg.as_bytes()),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
PushQueueWaitError::Lock(e) => {
|
||||||
|
tracing::error!(error = %e, repo_id = %repo.id, "push_queue_lock_failed");
|
||||||
|
let msg = "remote: GitData: ⛔ push queue lock failed. Please retry later.\r\n";
|
||||||
|
let _ = session.extended_data(
|
||||||
|
channel_id,
|
||||||
|
1,
|
||||||
|
Bytes::copy_from_slice(msg.as_bytes()),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
PushQueueWaitError::Timeout => {
|
||||||
|
tracing::warn!(repo_id = %repo.id, "push_queue_timeout");
|
||||||
|
let msg = "remote: GitData: ⏱️ push queue timed out. Please retry in a moment.\r\n";
|
||||||
|
let _ = session.extended_data(
|
||||||
|
channel_id,
|
||||||
|
1,
|
||||||
|
Bytes::copy_from_slice(msg.as_bytes()),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _ = session.channel_failure(channel_id);
|
||||||
|
let _ = session.close(channel_id);
|
||||||
|
self.cleanup_channel(channel_id);
|
||||||
|
return if matches!(error, PushQueueWaitError::Timeout) {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(russh::Error::IO(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
error.to_string(),
|
||||||
|
)))
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let repo_path = PathBuf::from(&repo.storage_path);
|
let repo_path = PathBuf::from(&repo.storage_path);
|
||||||
if !repo_path.exists() {
|
if !repo_path.exists() {
|
||||||
tracing::error!("repo_path_not_found path={}", repo.storage_path);
|
tracing::error!("repo_path_not_found path={}", repo.storage_path);
|
||||||
@ -683,6 +816,9 @@ impl russh::server::Handler for SSHandle {
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("process_spawn_failed error={}", e);
|
tracing::error!("process_spawn_failed error={}", e);
|
||||||
|
if let Some(lease) = &mut push_queue_lease {
|
||||||
|
lease.release().await;
|
||||||
|
}
|
||||||
let _ = session.channel_failure(channel_id);
|
let _ = session.channel_failure(channel_id);
|
||||||
self.cleanup_channel(channel_id);
|
self.cleanup_channel(channel_id);
|
||||||
return Err(russh::Error::IO(e));
|
return Err(russh::Error::IO(e));
|
||||||
@ -693,6 +829,9 @@ impl russh::server::Handler for SSHandle {
|
|||||||
Some(s) => s,
|
Some(s) => s,
|
||||||
None => {
|
None => {
|
||||||
tracing::error!("stdin pipe unavailable for channel={:?}", channel_id);
|
tracing::error!("stdin pipe unavailable for channel={:?}", channel_id);
|
||||||
|
if let Some(lease) = &mut push_queue_lease {
|
||||||
|
lease.release().await;
|
||||||
|
}
|
||||||
let _ = session_handle.channel_failure(channel_id).await;
|
let _ = session_handle.channel_failure(channel_id).await;
|
||||||
return Err(russh::Error::IO(io::Error::new(
|
return Err(russh::Error::IO(io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
@ -705,6 +844,9 @@ impl russh::server::Handler for SSHandle {
|
|||||||
Some(s) => s,
|
Some(s) => s,
|
||||||
None => {
|
None => {
|
||||||
tracing::error!("stdout pipe unavailable for channel={:?}", channel_id);
|
tracing::error!("stdout pipe unavailable for channel={:?}", channel_id);
|
||||||
|
if let Some(lease) = &mut push_queue_lease {
|
||||||
|
lease.release().await;
|
||||||
|
}
|
||||||
return Err(russh::Error::IO(io::Error::new(
|
return Err(russh::Error::IO(io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
"stdout unavailable",
|
"stdout unavailable",
|
||||||
@ -715,6 +857,9 @@ impl russh::server::Handler for SSHandle {
|
|||||||
Some(s) => s,
|
Some(s) => s,
|
||||||
None => {
|
None => {
|
||||||
tracing::error!("stderr pipe unavailable for channel={:?}", channel_id);
|
tracing::error!("stderr pipe unavailable for channel={:?}", channel_id);
|
||||||
|
if let Some(lease) = &mut push_queue_lease {
|
||||||
|
lease.release().await;
|
||||||
|
}
|
||||||
return Err(russh::Error::IO(io::Error::new(
|
return Err(russh::Error::IO(io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
"stderr unavailable",
|
"stderr unavailable",
|
||||||
@ -724,9 +869,15 @@ impl russh::server::Handler for SSHandle {
|
|||||||
|
|
||||||
let (eof_tx, mut eof_rx) = tokio::sync::mpsc::channel::<bool>(10);
|
let (eof_tx, mut eof_rx) = tokio::sync::mpsc::channel::<bool>(10);
|
||||||
self.eof.insert(channel_id, eof_tx);
|
self.eof.insert(channel_id, eof_tx);
|
||||||
|
let refs_for_hints = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
self.post_receive_refs
|
||||||
|
.insert(channel_id, refs_for_hints.clone());
|
||||||
let repo_uid = repo.id;
|
let repo_uid = repo.id;
|
||||||
|
let repo_for_hints = repo.clone();
|
||||||
|
let namespace_for_hints = namespace.clone();
|
||||||
let should_sync = service == GitService::ReceivePack;
|
let should_sync = service == GitService::ReceivePack;
|
||||||
let sync = self.sync.clone();
|
let sync = self.sync.clone();
|
||||||
|
let mut push_queue_lease = push_queue_lease;
|
||||||
|
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
tracing::info!(channel = ?channel_id, "git_task_started");
|
tracing::info!(channel = ?channel_id, "git_task_started");
|
||||||
@ -753,11 +904,23 @@ impl russh::server::Handler for SSHandle {
|
|||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = shell.wait() => {
|
result = shell.wait() => {
|
||||||
let status = result?;
|
let status = match result {
|
||||||
|
Ok(status) => status,
|
||||||
|
Err(e) => {
|
||||||
|
if let Some(lease) = &mut push_queue_lease {
|
||||||
|
lease.release().await;
|
||||||
|
}
|
||||||
|
return Err(russh::Error::IO(e));
|
||||||
|
}
|
||||||
|
};
|
||||||
let status_code = status.code().unwrap_or(128) as u32;
|
let status_code = status.code().unwrap_or(128) as u32;
|
||||||
|
|
||||||
tracing::info!("git_process_exited channel={:?} status={}", channel_id, status_code);
|
tracing::info!("git_process_exited channel={:?} status={}", channel_id, status_code);
|
||||||
|
|
||||||
|
if let Some(lease) = &mut push_queue_lease {
|
||||||
|
lease.release().await;
|
||||||
|
}
|
||||||
|
|
||||||
if !stdout_done || !stderr_done {
|
if !stdout_done || !stderr_done {
|
||||||
let _ = tokio::time::timeout(Duration::from_millis(100), async {
|
let _ = tokio::time::timeout(Duration::from_millis(100), async {
|
||||||
tokio::join!(
|
tokio::join!(
|
||||||
@ -775,11 +938,20 @@ impl russh::server::Handler for SSHandle {
|
|||||||
}).await;
|
}).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
if should_sync {
|
if should_sync && status_code == 0 {
|
||||||
let sync = sync.clone();
|
let queue = sync.send(RepoReceiveSyncTask { repo_uid }).await;
|
||||||
tokio::spawn(async move {
|
let refs_for_hints = refs_for_hints.lock().await.clone();
|
||||||
sync.send(RepoReceiveSyncTask { repo_uid }).await
|
let msg = SSHandle::format_post_receive_hints(
|
||||||
});
|
&namespace_for_hints,
|
||||||
|
&repo_for_hints,
|
||||||
|
&refs_for_hints,
|
||||||
|
queue,
|
||||||
|
);
|
||||||
|
if !msg.is_empty() {
|
||||||
|
let _ = session_handle
|
||||||
|
.extended_data(channel_id, 1, Bytes::copy_from_slice(msg.as_bytes()))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = session_handle.exit_status_request(channel_id, status_code).await;
|
let _ = session_handle.exit_status_request(channel_id, status_code).await;
|
||||||
|
|||||||
@ -1,12 +1,14 @@
|
|||||||
use crate::error::GitError;
|
use crate::error::GitError;
|
||||||
use crate::hook::pool::types::{HookTask, TaskType};
|
use crate::hook::pool::types::{HookTask, TaskType};
|
||||||
use crate::http::utils::hash_access_key;
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
use argon2::Argon2;
|
||||||
|
use argon2::password_hash::{PasswordHash, PasswordVerifier};
|
||||||
use config::AppConfig;
|
use config::AppConfig;
|
||||||
use db::cache::AppCache;
|
use db::cache::AppCache;
|
||||||
use db::database::AppDatabase;
|
use db::database::AppDatabase;
|
||||||
use deadpool_redis::cluster::Pool as RedisPool;
|
use deadpool_redis::cluster::Pool as RedisPool;
|
||||||
use models::users::{user, user_token};
|
use models::users::{user, user_token};
|
||||||
|
use redis::AsyncCommands;
|
||||||
use russh::keys::PrivateKey;
|
use russh::keys::PrivateKey;
|
||||||
use russh::server::Server;
|
use russh::server::Server;
|
||||||
use russh::{MethodKind, MethodSet, SshId, server::Config};
|
use russh::{MethodKind, MethodSet, SshId, server::Config};
|
||||||
@ -20,6 +22,7 @@ pub mod branch_protect;
|
|||||||
pub mod forward;
|
pub mod forward;
|
||||||
pub mod git_service;
|
pub mod git_service;
|
||||||
pub mod handle;
|
pub mod handle;
|
||||||
|
pub mod push_queue;
|
||||||
pub mod rate_limit;
|
pub mod rate_limit;
|
||||||
pub mod ref_update;
|
pub mod ref_update;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
@ -194,7 +197,159 @@ impl ReceiveSyncService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Enqueue a sync task. Fire-and-forget — logs errors but does not block.
|
/// Enqueue a sync task. Fire-and-forget — logs errors but does not block.
|
||||||
pub async fn send(&self, task: RepoReceiveSyncTask) {
|
pub async fn queue_position(&self, repo_uid: uuid::Uuid) -> Option<(usize, usize)> {
|
||||||
|
let queue_key = format!("{}:sync", self.redis_prefix);
|
||||||
|
let work_key = format!("{}:work", queue_key);
|
||||||
|
let redis = self.pool.get().await.ok()?;
|
||||||
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
||||||
|
let queue_items: Vec<String> = conn.lrange(&queue_key, 0, -1).await.ok()?;
|
||||||
|
let work_items: Vec<String> = conn.lrange(&work_key, 0, -1).await.unwrap_or_default();
|
||||||
|
let repo_id = repo_uid.to_string();
|
||||||
|
let queued_before = queue_items
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.take_while(|item| {
|
||||||
|
serde_json::from_str::<HookTask>(item)
|
||||||
|
.map(|task| task.repo_id != repo_id)
|
||||||
|
.unwrap_or(true)
|
||||||
|
})
|
||||||
|
.count();
|
||||||
|
let total = work_items.len() + queue_items.len() + 1;
|
||||||
|
Some((work_items.len() + queued_before + 1, total))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push_queue_keys(repo_uid: uuid::Uuid) -> (String, String) {
|
||||||
|
let hash_tag = format!("{{push:{}}}", repo_uid);
|
||||||
|
(
|
||||||
|
format!("git:{}:queue", hash_tag),
|
||||||
|
format!("git:{}:lock", hash_tag),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn join_push_queue(
|
||||||
|
&self,
|
||||||
|
repo_uid: uuid::Uuid,
|
||||||
|
request_id: &str,
|
||||||
|
) -> redis::RedisResult<()> {
|
||||||
|
let (queue_key, _) = Self::push_queue_keys(repo_uid);
|
||||||
|
let redis = self.pool.get().await.map_err(|e| {
|
||||||
|
redis::RedisError::from((
|
||||||
|
redis::ErrorKind::Io,
|
||||||
|
"failed to get Redis connection",
|
||||||
|
e.to_string(),
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
||||||
|
redis::cmd("RPUSH")
|
||||||
|
.arg(&queue_key)
|
||||||
|
.arg(request_id)
|
||||||
|
.query_async::<()>(&mut conn)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn push_queue_position(
|
||||||
|
&self,
|
||||||
|
repo_uid: uuid::Uuid,
|
||||||
|
request_id: &str,
|
||||||
|
) -> Option<(usize, usize)> {
|
||||||
|
let (queue_key, _) = Self::push_queue_keys(repo_uid);
|
||||||
|
let redis = self.pool.get().await.ok()?;
|
||||||
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
||||||
|
let queue_items: Vec<String> = conn.lrange(&queue_key, 0, -1).await.ok()?;
|
||||||
|
let position = queue_items.iter().position(|item| item == request_id)? + 1;
|
||||||
|
Some((position, queue_items.len()))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn try_acquire_push_lock(
|
||||||
|
&self,
|
||||||
|
repo_uid: uuid::Uuid,
|
||||||
|
request_id: &str,
|
||||||
|
ttl_secs: usize,
|
||||||
|
) -> redis::RedisResult<bool> {
|
||||||
|
let (_, lock_key) = Self::push_queue_keys(repo_uid);
|
||||||
|
let redis = self.pool.get().await.map_err(|e| {
|
||||||
|
redis::RedisError::from((
|
||||||
|
redis::ErrorKind::Io,
|
||||||
|
"failed to get Redis connection",
|
||||||
|
e.to_string(),
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
||||||
|
let acquired: Option<String> = redis::cmd("SET")
|
||||||
|
.arg(&lock_key)
|
||||||
|
.arg(request_id)
|
||||||
|
.arg("NX")
|
||||||
|
.arg("EX")
|
||||||
|
.arg(ttl_secs)
|
||||||
|
.query_async(&mut conn)
|
||||||
|
.await?;
|
||||||
|
Ok(acquired.is_some())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn release_push_queue(&self, repo_uid: uuid::Uuid, request_id: &str) {
|
||||||
|
let (queue_key, lock_key) = Self::push_queue_keys(repo_uid);
|
||||||
|
let redis = match self.pool.get().await {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, repo_id = %repo_uid, "push_queue_release_redis_connection_failed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
||||||
|
let script = redis::Script::new(
|
||||||
|
r#"
|
||||||
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
|
if redis.call("GET", KEYS[2]) == ARGV[1] then
|
||||||
|
redis.call("DEL", KEYS[2])
|
||||||
|
end
|
||||||
|
return 1
|
||||||
|
"#,
|
||||||
|
);
|
||||||
|
if let Err(e) = script
|
||||||
|
.key(&queue_key)
|
||||||
|
.key(&lock_key)
|
||||||
|
.arg(request_id)
|
||||||
|
.invoke_async::<()>(&mut conn)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(error = %e, repo_id = %repo_uid, "push_queue_release_failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn refresh_push_lock(
|
||||||
|
&self,
|
||||||
|
repo_uid: uuid::Uuid,
|
||||||
|
request_id: &str,
|
||||||
|
ttl_secs: usize,
|
||||||
|
) -> redis::RedisResult<bool> {
|
||||||
|
let (_, lock_key) = Self::push_queue_keys(repo_uid);
|
||||||
|
let redis = self.pool.get().await.map_err(|e| {
|
||||||
|
redis::RedisError::from((
|
||||||
|
redis::ErrorKind::Io,
|
||||||
|
"failed to get Redis connection",
|
||||||
|
e.to_string(),
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
let mut conn: deadpool_redis::cluster::Connection = redis;
|
||||||
|
let refreshed: i32 = redis::Script::new(
|
||||||
|
r#"
|
||||||
|
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||||
|
redis.call("EXPIRE", KEYS[1], ARGV[2])
|
||||||
|
return 1
|
||||||
|
end
|
||||||
|
return 0
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.key(&lock_key)
|
||||||
|
.arg(request_id)
|
||||||
|
.arg(ttl_secs)
|
||||||
|
.invoke_async(&mut conn)
|
||||||
|
.await?;
|
||||||
|
Ok(refreshed == 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn send(&self, task: RepoReceiveSyncTask) -> Option<(usize, usize)> {
|
||||||
|
let position = self.queue_position(task.repo_uid).await;
|
||||||
let hook_task = HookTask {
|
let hook_task = HookTask {
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
id: uuid::Uuid::new_v4().to_string(),
|
||||||
repo_id: task.repo_uid.to_string(),
|
repo_id: task.repo_uid.to_string(),
|
||||||
@ -210,7 +365,7 @@ impl ReceiveSyncService {
|
|||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("failed to serialize hook task: {}", e);
|
tracing::error!("failed to serialize hook task: {}", e);
|
||||||
return;
|
return position;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -218,7 +373,7 @@ impl ReceiveSyncService {
|
|||||||
Ok(seq) => {
|
Ok(seq) => {
|
||||||
tracing::info!(repo_id = %task.repo_uid, seq = seq, "hook task queued to NATS");
|
tracing::info!(repo_id = %task.repo_uid, seq = seq, "hook task queued to NATS");
|
||||||
metrics::counter!("hook_task_queued_total", "backend" => "nats").increment(1);
|
metrics::counter!("hook_task_queued_total", "backend" => "nats").increment(1);
|
||||||
return;
|
return position;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!(error = %e, "NATS publish failed, falling back to Redis");
|
tracing::warn!(error = %e, "NATS publish failed, falling back to Redis");
|
||||||
@ -231,7 +386,7 @@ impl ReceiveSyncService {
|
|||||||
Ok(j) => j,
|
Ok(j) => j,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("failed to serialize hook task: {}", e);
|
tracing::error!("failed to serialize hook task: {}", e);
|
||||||
return;
|
return position;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -241,7 +396,7 @@ impl ReceiveSyncService {
|
|||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("failed to get Redis connection: {}", e);
|
tracing::error!("failed to get Redis connection: {}", e);
|
||||||
return;
|
return position;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -261,6 +416,7 @@ impl ReceiveSyncService {
|
|||||||
tracing::info!(repo_id = %task.repo_uid, "hook task queued to Redis");
|
tracing::info!(repo_id = %task.repo_uid, "hook task queued to Redis");
|
||||||
metrics::counter!("hook_task_queued_total", "backend" => "redis").increment(1);
|
metrics::counter!("hook_task_queued_total", "backend" => "redis").increment(1);
|
||||||
}
|
}
|
||||||
|
position
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -281,40 +437,44 @@ impl SshTokenService {
|
|||||||
Self { db }
|
Self { db }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn hash_token(token: &str) -> Result<String, argon2::password_hash::Error> {
|
|
||||||
hash_access_key(token)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn find_user_by_token(&self, token: &str) -> Result<Option<user::Model>, GitError> {
|
pub async fn find_user_by_token(&self, token: &str) -> Result<Option<user::Model>, GitError> {
|
||||||
let token_hash = Self::hash_token(token)
|
let token_models = user_token::Entity::find()
|
||||||
.map_err(|e| GitError::Internal(format!("Token hash failed: {}", e)))?;
|
|
||||||
|
|
||||||
let token_model = user_token::Entity::find()
|
|
||||||
.filter(user_token::Column::TokenHash.eq(&token_hash))
|
|
||||||
.filter(user_token::Column::IsRevoked.eq(false))
|
.filter(user_token::Column::IsRevoked.eq(false))
|
||||||
.one(self.db.reader())
|
.all(self.db.reader())
|
||||||
.await
|
.await
|
||||||
.map_err(|e| GitError::Internal(e.to_string()))?;
|
.map_err(|e| GitError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
let token_model = match token_model {
|
for token_model in token_models {
|
||||||
Some(t) => t,
|
if token_model
|
||||||
None => return Ok(None),
|
.expires_at
|
||||||
};
|
.map(|expires_at| expires_at < chrono::Utc::now())
|
||||||
|
.unwrap_or(false)
|
||||||
// Check expiry
|
{
|
||||||
if let Some(expires_at) = token_model.expires_at {
|
continue;
|
||||||
if expires_at < chrono::Utc::now() {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let Ok(hash) = PasswordHash::new(&token_model.token_hash) else {
|
||||||
|
tracing::warn!(token_id = token_model.id, "invalid stored SSH token hash");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if Argon2::default()
|
||||||
|
.verify_password(token.as_bytes(), &hash)
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let user_model = user::Entity::find()
|
||||||
|
.filter(user::Column::Uid.eq(token_model.user))
|
||||||
|
.one(self.db.reader())
|
||||||
|
.await
|
||||||
|
.map_err(|e| GitError::Internal(e.to_string()))?;
|
||||||
|
|
||||||
|
return Ok(user_model);
|
||||||
}
|
}
|
||||||
|
|
||||||
let user_model = user::Entity::find()
|
Ok(None)
|
||||||
.filter(user::Column::Uid.eq(token_model.user))
|
|
||||||
.one(self.db.reader())
|
|
||||||
.await
|
|
||||||
.map_err(|e| GitError::Internal(e.to_string()))?;
|
|
||||||
|
|
||||||
Ok(user_model)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
186
libs/git/ssh/push_queue.rs
Normal file
186
libs/git/ssh/push_queue.rs
Normal file
@ -0,0 +1,186 @@
|
|||||||
|
use crate::ssh::ReceiveSyncService;
|
||||||
|
use std::fmt;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
pub const PUSH_QUEUE_TIMEOUT: Duration = Duration::from_secs(120);
|
||||||
|
pub const PUSH_LOCK_TTL_SECS: usize = 300;
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||||
|
pub struct PushQueuePosition {
|
||||||
|
pub position: usize,
|
||||||
|
pub total: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||||
|
pub enum PushQueueEvent {
|
||||||
|
Waiting(PushQueuePosition),
|
||||||
|
Acquired,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum PushQueueWaitError {
|
||||||
|
Join(redis::RedisError),
|
||||||
|
Lock(redis::RedisError),
|
||||||
|
Timeout,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for PushQueueWaitError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::Join(e) => write!(f, "failed to join push queue: {e}"),
|
||||||
|
Self::Lock(e) => write!(f, "failed to acquire push queue lock: {e}"),
|
||||||
|
Self::Timeout => write!(f, "push queue timed out"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for PushQueueWaitError {}
|
||||||
|
|
||||||
|
pub struct PushQueueLease {
|
||||||
|
service: ReceiveSyncService,
|
||||||
|
repo_uid: uuid::Uuid,
|
||||||
|
request_id: String,
|
||||||
|
heartbeat: Option<JoinHandle<()>>,
|
||||||
|
released: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PushQueueLease {
|
||||||
|
fn new(service: ReceiveSyncService, repo_uid: uuid::Uuid, request_id: String) -> Self {
|
||||||
|
let heartbeat = Some(start_lock_heartbeat(
|
||||||
|
service.clone(),
|
||||||
|
repo_uid,
|
||||||
|
request_id.clone(),
|
||||||
|
));
|
||||||
|
Self {
|
||||||
|
service,
|
||||||
|
repo_uid,
|
||||||
|
request_id,
|
||||||
|
heartbeat,
|
||||||
|
released: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn request_id(&self) -> &str {
|
||||||
|
&self.request_id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn release(&mut self) {
|
||||||
|
if self.released {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.service
|
||||||
|
.release_push_queue(self.repo_uid, &self.request_id)
|
||||||
|
.await;
|
||||||
|
if let Some(heartbeat) = self.heartbeat.take() {
|
||||||
|
heartbeat.abort();
|
||||||
|
}
|
||||||
|
self.released = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for PushQueueLease {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if self.released {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if let Some(heartbeat) = self.heartbeat.take() {
|
||||||
|
heartbeat.abort();
|
||||||
|
}
|
||||||
|
let service = self.service.clone();
|
||||||
|
let repo_uid = self.repo_uid;
|
||||||
|
let request_id = self.request_id.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
service.release_push_queue(repo_uid, &request_id).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start_lock_heartbeat(
|
||||||
|
service: ReceiveSyncService,
|
||||||
|
repo_uid: uuid::Uuid,
|
||||||
|
request_id: String,
|
||||||
|
) -> JoinHandle<()> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let interval = Duration::from_secs((PUSH_LOCK_TTL_SECS as u64 / 3).max(30));
|
||||||
|
loop {
|
||||||
|
sleep(interval).await;
|
||||||
|
match service
|
||||||
|
.refresh_push_lock(repo_uid, &request_id, PUSH_LOCK_TTL_SECS)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(true) => {}
|
||||||
|
Ok(false) => {
|
||||||
|
tracing::warn!(
|
||||||
|
repo_id = %repo_uid,
|
||||||
|
request_id = %request_id,
|
||||||
|
"push_queue_lock_lost"
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
error = %e,
|
||||||
|
repo_id = %repo_uid,
|
||||||
|
request_id = %request_id,
|
||||||
|
"push_queue_lock_refresh_failed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn wait_for_push_queue_slot<F>(
|
||||||
|
service: ReceiveSyncService,
|
||||||
|
repo_uid: uuid::Uuid,
|
||||||
|
mut on_event: F,
|
||||||
|
) -> Result<PushQueueLease, PushQueueWaitError>
|
||||||
|
where
|
||||||
|
F: FnMut(PushQueueEvent, &str),
|
||||||
|
{
|
||||||
|
let request_id = uuid::Uuid::new_v4().to_string();
|
||||||
|
service
|
||||||
|
.join_push_queue(repo_uid, &request_id)
|
||||||
|
.await
|
||||||
|
.map_err(PushQueueWaitError::Join)?;
|
||||||
|
|
||||||
|
let deadline = Instant::now() + PUSH_QUEUE_TIMEOUT;
|
||||||
|
let mut last_position = None;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let position = service.push_queue_position(repo_uid, &request_id).await;
|
||||||
|
if let Some((position, total)) = position {
|
||||||
|
let position = PushQueuePosition { position, total };
|
||||||
|
if last_position != Some(position) && position.position > 1 {
|
||||||
|
on_event(PushQueueEvent::Waiting(position), &request_id);
|
||||||
|
}
|
||||||
|
last_position = Some(position);
|
||||||
|
|
||||||
|
if position.position == 1 {
|
||||||
|
match service
|
||||||
|
.try_acquire_push_lock(repo_uid, &request_id, PUSH_LOCK_TTL_SECS)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(true) => {
|
||||||
|
on_event(PushQueueEvent::Acquired, &request_id);
|
||||||
|
return Ok(PushQueueLease::new(service, repo_uid, request_id));
|
||||||
|
}
|
||||||
|
Ok(false) => {}
|
||||||
|
Err(e) => {
|
||||||
|
service.release_push_queue(repo_uid, &request_id).await;
|
||||||
|
return Err(PushQueueWaitError::Lock(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if Instant::now() >= deadline {
|
||||||
|
service.release_push_queue(repo_uid, &request_id).await;
|
||||||
|
return Err(PushQueueWaitError::Timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -6,32 +6,108 @@ pub struct RefUpdate {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RefUpdate {
|
impl RefUpdate {
|
||||||
/// Parse git reference update commands from SSH protocol text.
|
/// Parse git receive-pack reference update commands from pkt-line data.
|
||||||
/// Format: "<old-oid> <new-oid> <ref-name>\n"
|
/// Payload format: "<old-oid> <new-oid> <ref-name>\0capabilities\n".
|
||||||
pub fn parse_ref_updates(data: &[u8]) -> Result<Vec<Self>, String> {
|
pub fn parse_ref_updates(data: &[u8]) -> Result<Vec<Self>, String> {
|
||||||
let text = String::from_utf8_lossy(data);
|
|
||||||
let mut refs = Vec::new();
|
let mut refs = Vec::new();
|
||||||
for line in text.lines() {
|
|
||||||
let line = line.trim();
|
for payload in parse_pkt_line_payloads(data)? {
|
||||||
if line.is_empty() || line.starts_with('#') || line.starts_with("PACK") {
|
let line = String::from_utf8_lossy(payload);
|
||||||
|
let line = line.trim_end_matches(['\r', '\n']);
|
||||||
|
if line.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let mut parts = line.split_whitespace();
|
|
||||||
let old_oid = parts.next().map(|s| s.to_string()).unwrap_or_default();
|
let mut parts = line.splitn(3, ' ');
|
||||||
let new_oid = parts.next().map(|s| s.to_string()).unwrap_or_default();
|
let old_oid = parts.next().unwrap_or_default();
|
||||||
let name = parts
|
let new_oid = parts.next().unwrap_or_default();
|
||||||
.next()
|
let raw_name = parts.next().unwrap_or_default();
|
||||||
.unwrap_or("")
|
let name = raw_name
|
||||||
.trim_start_matches('\0')
|
.split_once('\0')
|
||||||
.to_string();
|
.map(|(name, _)| name)
|
||||||
if !name.is_empty() {
|
.unwrap_or(raw_name)
|
||||||
refs.push(RefUpdate {
|
.trim();
|
||||||
old_oid,
|
|
||||||
new_oid,
|
if old_oid.len() != 40 || new_oid.len() != 40 || name.is_empty() {
|
||||||
name,
|
continue;
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
refs.push(RefUpdate {
|
||||||
|
old_oid: old_oid.to_string(),
|
||||||
|
new_oid: new_oid.to_string(),
|
||||||
|
name: name.to_string(),
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(refs)
|
Ok(refs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn parse_pkt_line_payloads(data: &[u8]) -> Result<Vec<&[u8]>, String> {
|
||||||
|
let mut payloads = Vec::new();
|
||||||
|
let mut offset = 0;
|
||||||
|
|
||||||
|
while offset + 4 <= data.len() {
|
||||||
|
let header = std::str::from_utf8(&data[offset..offset + 4])
|
||||||
|
.map_err(|_| "invalid pkt-line header encoding".to_string())?;
|
||||||
|
let len = usize::from_str_radix(header, 16)
|
||||||
|
.map_err(|_| format!("invalid pkt-line length: {header}"))?;
|
||||||
|
offset += 4;
|
||||||
|
|
||||||
|
match len {
|
||||||
|
0 => break,
|
||||||
|
1..=3 => return Err(format!("invalid pkt-line length: {len}")),
|
||||||
|
_ => {
|
||||||
|
let payload_len = len - 4;
|
||||||
|
if offset + payload_len > data.len() {
|
||||||
|
return Err("truncated pkt-line payload".to_string());
|
||||||
|
}
|
||||||
|
payloads.push(&data[offset..offset + payload_len]);
|
||||||
|
offset += payload_len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(payloads)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::RefUpdate;
|
||||||
|
|
||||||
|
fn pkt(payload: &str) -> Vec<u8> {
|
||||||
|
let len = payload.len() + 4;
|
||||||
|
let mut out = format!("{len:04x}").into_bytes();
|
||||||
|
out.extend_from_slice(payload.as_bytes());
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parses_receive_pack_ref_with_capabilities() {
|
||||||
|
let mut data = pkt(
|
||||||
|
"0000000000000000000000000000000000000000 1111111111111111111111111111111111111111 refs/heads/feature\0 report-status\n",
|
||||||
|
);
|
||||||
|
data.extend_from_slice(b"0000");
|
||||||
|
|
||||||
|
let refs = RefUpdate::parse_ref_updates(&data).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(refs.len(), 1);
|
||||||
|
assert_eq!(refs[0].old_oid, "0000000000000000000000000000000000000000");
|
||||||
|
assert_eq!(refs[0].new_oid, "1111111111111111111111111111111111111111");
|
||||||
|
assert_eq!(refs[0].name, "refs/heads/feature");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parses_receive_pack_ref_without_pack_payload() {
|
||||||
|
let mut data = pkt(
|
||||||
|
"2222222222222222222222222222222222222222 0000000000000000000000000000000000000000 refs/heads/old\n",
|
||||||
|
);
|
||||||
|
data.extend_from_slice(b"0000");
|
||||||
|
|
||||||
|
let refs = RefUpdate::parse_ref_updates(&data).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(refs.len(), 1);
|
||||||
|
assert_eq!(refs[0].name, "refs/heads/old");
|
||||||
|
assert_eq!(refs[0].new_oid, "0000000000000000000000000000000000000000");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -1,5 +1,7 @@
|
|||||||
use crate::AppService;
|
use crate::AppService;
|
||||||
use crate::error::AppError;
|
use crate::error::AppError;
|
||||||
|
use argon2::Argon2;
|
||||||
|
use argon2::password_hash::{PasswordHash, PasswordVerifier};
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use models::users::{user_activity_log, user_token};
|
use models::users::{user_activity_log, user_token};
|
||||||
use sea_orm::*;
|
use sea_orm::*;
|
||||||
@ -191,22 +193,33 @@ impl AppService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn user_verify_access_key(&self, access_key: String) -> Result<Uuid, AppError> {
|
pub async fn user_verify_access_key(&self, access_key: String) -> Result<Uuid, AppError> {
|
||||||
let access_key_hash = self.user_hash_access_key(&access_key);
|
let access_key_models = user_token::Entity::find()
|
||||||
|
|
||||||
let access_key_model = user_token::Entity::find()
|
|
||||||
.filter(user_token::Column::TokenHash.eq(access_key_hash))
|
|
||||||
.filter(user_token::Column::IsRevoked.eq(false))
|
.filter(user_token::Column::IsRevoked.eq(false))
|
||||||
.one(&self.db)
|
.all(&self.db)
|
||||||
.await?
|
.await?;
|
||||||
.ok_or(AppError::Unauthorized)?;
|
|
||||||
|
|
||||||
if let Some(expires_at) = access_key_model.expires_at {
|
for access_key_model in access_key_models {
|
||||||
if expires_at < Utc::now() {
|
if access_key_model
|
||||||
return Err(AppError::Unauthorized);
|
.expires_at
|
||||||
|
.map(|expires_at| expires_at < Utc::now())
|
||||||
|
.unwrap_or(false)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let Ok(hash) = PasswordHash::new(&access_key_model.token_hash) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if Argon2::default()
|
||||||
|
.verify_password(access_key.as_bytes(), &hash)
|
||||||
|
.is_ok()
|
||||||
|
{
|
||||||
|
return Ok(access_key_model.user);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(access_key_model.user)
|
Err(AppError::Unauthorized)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn user_generate_access_key(&self) -> String {
|
fn user_generate_access_key(&self) -> String {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user