//! Git LFS HTTP handler: batch negotiation, object upload/download, and
//! path-keyed locking for a single repository.
use crate::error::GitError;
|
|
use crate::http::handler::is_valid_oid;
|
|
use actix_web::{HttpResponse, web};
|
|
use base64::Engine;
|
|
use base64::engine::general_purpose::STANDARD;
|
|
use db::database::AppDatabase;
|
|
use models::repos::{repo, repo_lfs_lock, repo_lfs_object};
|
|
use sea_orm::sqlx::types::chrono;
|
|
use sea_orm::*;
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashMap;
|
|
use std::path::PathBuf;
|
|
|
|
/// Lifetime, in seconds, advertised via `expires_in` on LFS transfer actions.
const LFS_AUTH_TOKEN_EXPIRY: u64 = 3600;
|
|
|
|
/// Request body of the Git LFS Batch API (`POST .../info/lfs/objects/batch`).
#[derive(Deserialize, Serialize)]
pub struct BatchRequest {
    // Requested operation; this handler accepts only "upload" or "download".
    pub operation: String,
    // Objects the client wants to transfer.
    pub objects: Vec<LfsObjectReq>,
    // Transfer adapters the client supports (e.g. "basic"); optional.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transfers: Option<Vec<String>>,
    // Git ref the operation applies to; optional, unused by this handler.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub r#ref: Option<LfsRef>,
    // Hash algorithm of the OIDs; echoed back in the response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hash_algo: Option<String>,
}
|
|
|
|
/// A Git ref sent by the client for context in a batch request.
#[derive(Deserialize, Serialize)]
pub struct LfsRef {
    // Fully qualified ref name (e.g. "refs/heads/main").
    pub name: String,
}
|
|
|
|
/// One object referenced in a batch request.
#[derive(Deserialize, Serialize, Clone)]
pub struct LfsObjectReq {
    // Object ID (content hash) of the LFS object.
    pub oid: String,
    // Object size in bytes, as reported by the client.
    pub size: i64,
}
|
|
|
|
/// Response body of the Batch API.
#[derive(Serialize)]
pub struct BatchResponse {
    // Transfer adapter chosen by the server (always "basic" here).
    pub transfer: String,
    // Per-object results.
    pub objects: Vec<LfsObjectResponse>,
    // Hash algorithm echoed from the request, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hash_algo: Option<String>,
}
|
|
|
|
/// Per-object entry in a batch response: either transfer actions to perform
/// or a per-object error.
#[derive(Serialize)]
pub struct LfsObjectResponse {
    pub oid: String,
    pub size: i64,
    // Whether the action hrefs are already authenticated for the client.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub authenticated: Option<bool>,
    // Map of operation name ("upload"/"download") to its transfer action.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub actions: Option<HashMap<String, LfsAction>>,
    // Per-object failure (e.g. 404 when a download target does not exist).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<LfsError>,
}
|
|
|
|
/// A transfer action: where and how to upload or download one object.
#[derive(Serialize)]
pub struct LfsAction {
    // URL the client should hit for this action.
    pub href: String,
    // Extra HTTP headers the client must send (e.g. authorization).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub header: Option<HashMap<String, String>>,
    // Validity window of `href`, in seconds from now.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expires_in: Option<i64>,
    // Absolute expiry timestamp; never populated by this handler.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<String>,
}
|
|
|
|
/// Per-object error payload: HTTP-style status code plus a message.
#[derive(Serialize)]
pub struct LfsError {
    pub code: i32,
    pub message: String,
}
|
|
|
|
/// Request body for creating an LFS lock; this server keys locks by OID.
#[derive(Deserialize)]
pub struct CreateLockRequest {
    pub oid: String,
}
|
|
|
|
/// Serialized view of a lock row, returned by the lock endpoints.
#[derive(Serialize)]
pub struct LockResponse {
    // Locked path (this handler stores the object's OID here).
    pub path: String,
    // UUID of the user holding the lock.
    pub locked_by: uuid::Uuid,
    // RFC 3339 timestamp of lock creation.
    pub locked_at: String,
    // RFC 3339 timestamp of release, if the lock has been unlocked.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub unlocked_at: Option<String>,
}
|
|
|
|
/// Serves Git LFS batch, transfer, and locking operations for one repository.
pub struct LfsHandler {
    // Root of the repository's on-disk storage; LFS data lives under `.lfs/`.
    pub storage_path: PathBuf,
    // Database row of the repository being served.
    pub model: repo::Model,
    // Application database handle; provides `reader()`/`writer()` connections.
    pub db: AppDatabase,
}
|
|
|
|
impl LfsHandler {
|
|
pub fn new(storage_path: PathBuf, model: repo::Model, db: AppDatabase) -> Self {
|
|
Self {
|
|
storage_path,
|
|
model,
|
|
db,
|
|
}
|
|
}
|
|
|
|
fn get_lfs_storage_path(&self) -> PathBuf {
|
|
self.storage_path.join(".lfs")
|
|
}
|
|
|
|
fn get_object_path(&self, oid: &str) -> PathBuf {
|
|
let prefix = &oid[..2];
|
|
self.get_lfs_storage_path()
|
|
.join("objects")
|
|
.join(prefix)
|
|
.join(oid)
|
|
}
|
|
|
|
pub async fn batch(
|
|
&self,
|
|
req: BatchRequest,
|
|
base_url: &str,
|
|
) -> Result<BatchResponse, GitError> {
|
|
let operation = req.operation.as_str();
|
|
|
|
if operation != "upload" && operation != "download" {
|
|
return Err(GitError::InvalidOid(format!(
|
|
"Invalid operation: {}",
|
|
operation
|
|
)));
|
|
}
|
|
|
|
let oids: Vec<&str> = req.objects.iter().map(|o| o.oid.as_str()).collect();
|
|
|
|
// Single batch query for all OIDs
|
|
let existing: Vec<repo_lfs_object::Model> = repo_lfs_object::Entity::find()
|
|
.filter(repo_lfs_object::Column::Oid.is_in(oids.clone()))
|
|
.filter(repo_lfs_object::Column::Repo.eq(self.model.id))
|
|
.all(self.db.reader())
|
|
.await
|
|
.map_err(|e| GitError::Internal(e.to_string()))?;
|
|
|
|
let existing_map: HashMap<&str, &repo_lfs_object::Model> =
|
|
existing.iter().map(|m| (m.oid.as_str(), m)).collect();
|
|
|
|
let mut response_objects = Vec::with_capacity(req.objects.len());
|
|
|
|
for obj in req.objects {
|
|
let existing = existing_map.get(obj.oid.as_str());
|
|
|
|
let mut actions = HashMap::new();
|
|
|
|
match operation {
|
|
"upload" => {
|
|
if existing.is_none() {
|
|
let upload_url = format!(
|
|
"{}/{}/{}.git/info/lfs/objects/{}",
|
|
base_url, self.model.project, self.model.repo_name, obj.oid
|
|
);
|
|
|
|
let mut headers = HashMap::new();
|
|
headers.insert("authorization".to_string(), "Bearer token".to_string());
|
|
|
|
actions.insert(
|
|
"upload".to_string(),
|
|
LfsAction {
|
|
href: upload_url,
|
|
header: Some(headers),
|
|
expires_in: Some(LFS_AUTH_TOKEN_EXPIRY as i64),
|
|
expires_at: None,
|
|
},
|
|
);
|
|
}
|
|
}
|
|
"download" => match existing {
|
|
Some(_) => {
|
|
let download_url = format!(
|
|
"{}/{}/{}.git/info/lfs/objects/{}",
|
|
base_url, self.model.project, self.model.repo_name, obj.oid
|
|
);
|
|
|
|
let mut headers = HashMap::new();
|
|
headers.insert("authorization".to_string(), "Bearer token".to_string());
|
|
|
|
actions.insert(
|
|
"download".to_string(),
|
|
LfsAction {
|
|
href: download_url,
|
|
header: Some(headers),
|
|
expires_in: Some(LFS_AUTH_TOKEN_EXPIRY as i64),
|
|
expires_at: None,
|
|
},
|
|
);
|
|
}
|
|
None => {
|
|
response_objects.push(LfsObjectResponse {
|
|
oid: obj.oid,
|
|
size: obj.size,
|
|
authenticated: None,
|
|
actions: None,
|
|
error: Some(LfsError {
|
|
code: 404,
|
|
message: "Object does not exist".to_string(),
|
|
}),
|
|
});
|
|
continue;
|
|
}
|
|
},
|
|
_ => {}
|
|
}
|
|
|
|
response_objects.push(LfsObjectResponse {
|
|
oid: obj.oid,
|
|
size: obj.size,
|
|
authenticated: Some(true),
|
|
actions: if actions.is_empty() {
|
|
None
|
|
} else {
|
|
Some(actions)
|
|
},
|
|
error: None,
|
|
});
|
|
}
|
|
|
|
Ok(BatchResponse {
|
|
transfer: "basic".to_string(),
|
|
objects: response_objects,
|
|
hash_algo: req.hash_algo,
|
|
})
|
|
}
|
|
|
|
pub async fn upload_object(
|
|
&self,
|
|
oid: &str,
|
|
payload: web::Payload,
|
|
_auth_token: &str,
|
|
) -> Result<HttpResponse, GitError> {
|
|
if !is_valid_oid(oid) {
|
|
return Err(GitError::InvalidOid(format!("Invalid OID format: {}", oid)));
|
|
}
|
|
|
|
let object_path = self.get_object_path(oid);
|
|
if let Some(parent) = object_path.parent() {
|
|
tokio::fs::create_dir_all(parent)
|
|
.await
|
|
.map_err(|e| GitError::Internal(format!("Failed to create directory: {}", e)))?;
|
|
}
|
|
|
|
let temp_path = object_path.with_extension("tmp");
|
|
let mut file = tokio::fs::File::create(&temp_path)
|
|
.await
|
|
.map_err(|e| GitError::Internal(format!("Failed to create temp file: {}", e)))?;
|
|
|
|
use futures_util::stream::StreamExt;
|
|
use sha2::Digest;
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
let mut payload = payload;
|
|
let mut size = 0i64;
|
|
let mut hasher = sha2::Sha256::new();
|
|
|
|
while let Some(chunk) = payload.next().await {
|
|
let chunk = chunk.map_err(|e| GitError::Internal(format!("Payload error: {}", e)))?;
|
|
size += chunk.len() as i64;
|
|
hasher.update(&chunk);
|
|
if let Err(e) = file.write_all(&chunk).await {
|
|
let _ = tokio::fs::remove_file(&temp_path).await;
|
|
return Err(GitError::Internal(format!("Failed to write file: {}", e)));
|
|
}
|
|
}
|
|
|
|
file.flush()
|
|
.await
|
|
.map_err(|e| GitError::Internal(format!("Failed to flush file: {}", e)))?;
|
|
drop(file);
|
|
|
|
let hash_bytes = hasher.finalize();
|
|
let calculated_oid = STANDARD.encode(hash_bytes.as_slice());
|
|
|
|
if calculated_oid != oid {
|
|
let _ = tokio::fs::remove_file(&temp_path).await;
|
|
return Err(GitError::InvalidOid(format!(
|
|
"OID mismatch: expected {}, got {}",
|
|
oid, calculated_oid
|
|
)));
|
|
}
|
|
|
|
if let Err(e) = tokio::fs::rename(&temp_path, &object_path).await {
|
|
let _ = tokio::fs::remove_file(&temp_path).await;
|
|
return Err(GitError::Internal(format!("Failed to move file: {}", e)));
|
|
}
|
|
|
|
let now = chrono::Utc::now();
|
|
let new_object = repo_lfs_object::ActiveModel {
|
|
id: Set(0i64),
|
|
oid: Set(oid.to_string()),
|
|
repo: Set(self.model.id),
|
|
size: Set(size),
|
|
storage_path: Set(object_path.to_string_lossy().to_string()),
|
|
uploaded_by: Set(None),
|
|
uploaded_at: Set(now),
|
|
};
|
|
|
|
new_object
|
|
.insert(self.db.writer())
|
|
.await
|
|
.map_err(|e| GitError::Internal(e.to_string()))?;
|
|
|
|
Ok(HttpResponse::Ok().finish())
|
|
}
|
|
|
|
pub async fn download_object(
|
|
&self,
|
|
oid: &str,
|
|
_auth_token: &str,
|
|
) -> Result<HttpResponse, GitError> {
|
|
if !is_valid_oid(oid) {
|
|
return Err(GitError::InvalidOid(format!("Invalid OID format: {}", oid)));
|
|
}
|
|
|
|
let obj = repo_lfs_object::Entity::find()
|
|
.filter(repo_lfs_object::Column::Oid.eq(oid))
|
|
.filter(repo_lfs_object::Column::Repo.eq(self.model.id))
|
|
.one(self.db.reader())
|
|
.await
|
|
.map_err(|e| GitError::Internal(e.to_string()))?
|
|
.ok_or_else(|| GitError::NotFound("Object not found".to_string()))?;
|
|
|
|
let file = tokio::fs::File::open(&obj.storage_path)
|
|
.await
|
|
.map_err(|e| GitError::Internal(format!("Failed to open file: {}", e)))?;
|
|
|
|
use actix_web::body::BodyStream;
|
|
use futures_util::stream;
|
|
use tokio::io::AsyncReadExt;
|
|
|
|
let chunk_size: usize = 65536;
|
|
|
|
let stream = stream::unfold(file, move |mut file| async move {
|
|
let mut buffer = vec![0u8; chunk_size];
|
|
match file.read(&mut buffer).await {
|
|
Ok(0) => None,
|
|
Ok(n) => {
|
|
buffer.truncate(n);
|
|
Some((
|
|
Ok::<_, std::io::Error>(actix_web::web::Bytes::from(buffer)),
|
|
file,
|
|
))
|
|
}
|
|
Err(e) => Some((Err(e), file)),
|
|
}
|
|
});
|
|
|
|
Ok(HttpResponse::Ok()
|
|
.content_type("application/octet-stream")
|
|
.insert_header(("Content-Length", obj.size.to_string()))
|
|
.body(BodyStream::new(stream)))
|
|
}
|
|
|
|
pub async fn lock_object(
|
|
&self,
|
|
oid: &str,
|
|
user_uid: uuid::Uuid,
|
|
) -> Result<LockResponse, GitError> {
|
|
use sea_orm::ActiveModelTrait;
|
|
|
|
if !is_valid_oid(oid) {
|
|
return Err(GitError::InvalidOid(format!("Invalid OID format: {}", oid)));
|
|
}
|
|
|
|
let now = chrono::Utc::now();
|
|
|
|
let am = repo_lfs_lock::ActiveModel {
|
|
repo: Set(self.model.id),
|
|
path: Set(oid.to_string()),
|
|
lock_type: Set("upload".to_string()),
|
|
locked_by: Set(user_uid),
|
|
locked_at: Set(now),
|
|
unlocked_at: Set(None),
|
|
};
|
|
|
|
match am.insert(self.db.writer()).await {
|
|
Ok(model) => Ok(LockResponse {
|
|
path: model.path,
|
|
locked_by: model.locked_by,
|
|
locked_at: model.locked_at.to_rfc3339(),
|
|
unlocked_at: model.unlocked_at.map(|t| t.to_rfc3339()),
|
|
}),
|
|
Err(e) => {
|
|
let err_msg = format!("{}", e);
|
|
if err_msg.contains("duplicate key") || err_msg.contains("23505") {
|
|
return Err(GitError::Locked("Already locked".to_string()));
|
|
}
|
|
Err(GitError::Internal(format!("DB error: {}", e)))
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn unlock_object(&self, lock_id: &str, user_uid: uuid::Uuid) -> Result<(), GitError> {
|
|
let existing = repo_lfs_lock::Entity::find()
|
|
.filter(repo_lfs_lock::Column::Repo.eq(self.model.id))
|
|
.filter(repo_lfs_lock::Column::Path.eq(lock_id.to_string()))
|
|
.one(self.db.reader())
|
|
.await
|
|
.map_err(|e| GitError::Internal(e.to_string()))?
|
|
.ok_or_else(|| GitError::NotFound("Lock not found".to_string()))?;
|
|
|
|
if existing.locked_by != user_uid && existing.locked_by != self.model.created_by {
|
|
return Err(GitError::PermissionDenied(
|
|
"Not allowed to unlock".to_string(),
|
|
));
|
|
}
|
|
|
|
let now = chrono::Utc::now();
|
|
let mut am: repo_lfs_lock::ActiveModel = existing.into();
|
|
am.unlocked_at = Set(Some(now));
|
|
let _: repo_lfs_lock::Model = am
|
|
.update(self.db.writer())
|
|
.await
|
|
.map_err(|e| GitError::Internal(e.to_string()))?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn list_locks(&self, maybe_oid: Option<&str>) -> Result<Vec<LockResponse>, GitError> {
|
|
let mut q =
|
|
repo_lfs_lock::Entity::find().filter(repo_lfs_lock::Column::Repo.eq(self.model.id));
|
|
if let Some(oid) = maybe_oid {
|
|
q = q.filter(repo_lfs_lock::Column::Path.eq(oid.to_string()));
|
|
}
|
|
let rows: Vec<repo_lfs_lock::Model> = q
|
|
.all(self.db.reader())
|
|
.await
|
|
.map_err(|e| GitError::Internal(e.to_string()))?;
|
|
Ok(rows
|
|
.into_iter()
|
|
.map(|r| LockResponse {
|
|
path: r.path,
|
|
locked_by: r.locked_by,
|
|
locked_at: r.locked_at.to_rfc3339(),
|
|
unlocked_at: r.unlocked_at.map(|t| t.to_rfc3339()),
|
|
})
|
|
.collect())
|
|
}
|
|
|
|
pub async fn get_lock(&self, path: &str) -> Result<LockResponse, GitError> {
|
|
let r = repo_lfs_lock::Entity::find()
|
|
.filter(repo_lfs_lock::Column::Repo.eq(self.model.id))
|
|
.filter(repo_lfs_lock::Column::Path.eq(path.to_string()))
|
|
.one(self.db.reader())
|
|
.await
|
|
.map_err(|e| GitError::Internal(e.to_string()))?
|
|
.ok_or_else(|| GitError::NotFound("Lock not found".to_string()))?;
|
|
Ok(LockResponse {
|
|
path: r.path,
|
|
locked_by: r.locked_by,
|
|
locked_at: r.locked_at.to_rfc3339(),
|
|
unlocked_at: r.unlocked_at.map(|t| t.to_rfc3339()),
|
|
})
|
|
}
|
|
}
|