gitdataai/lib/service/git/init.rs
2026-06-01 22:04:38 +08:00

228 lines
8.1 KiB
Rust

use db::sqlx;
use git::rpc::{proto as p, proto::init_service_client::InitServiceClient};
use model::repos::RepoModel;
use session::Session;
use crate::{
AppService, error::AppError, git::rpc_err, metrics::with_op_metric,
session_user,
};
/// Request payload for creating a new, empty repository in a workspace.
///
/// Consumed by `AppService::git_init_bare`.
#[derive(Debug, Clone, serde::Deserialize, utoipa::ToSchema)]
pub struct CreateRepo {
/// Repository name. Surrounding whitespace is trimmed; must not be empty after trimming.
pub name: String,
/// Optional description. Stored as an empty string when omitted.
pub description: Option<String>,
/// Name of the default branch. Defaults to `"main"` when omitted.
pub default_branch: Option<String>,
/// Repository visibility. Defaults to `"private"` when omitted.
pub visibility: Option<String>,
/// Forwarded to the git init RPC as `initialize_with_readme`. Defaults to `false` when omitted.
pub initialize_with_readme: Option<bool>,
/// Forwarded to the git init RPC as `enable_lfs`. Defaults to `false` when omitted.
pub enable_lfs: Option<bool>,
}
/// Request payload for creating a repository by cloning an existing remote.
///
/// Consumed by `AppService::git_clone_bare`.
#[derive(Debug, Clone, serde::Deserialize, utoipa::ToSchema)]
pub struct CloneRepo {
/// Repository name. Surrounding whitespace is trimmed; must not be empty after trimming.
pub name: String,
/// URL of the repository to clone from. Surrounding whitespace is trimmed; must not be
/// empty after trimming. The trimmed value is forwarded to the git clone RPC.
pub source_url: String,
/// Optional description. Stored as an empty string when omitted.
pub description: Option<String>,
/// Repository visibility. Defaults to `"private"` when omitted.
pub visibility: Option<String>,
}
impl AppService {
#[tracing::instrument(skip(self, ctx), fields(workspace = %wk_name))]
pub async fn git_init_bare(
&self,
ctx: &Session,
wk_name: &str,
params: CreateRepo,
) -> Result<RepoModel, AppError> {
with_op_metric(&self.metrics.repo_operations_total, &["create"], async {
let user_uid = session_user(ctx)?;
let wk = self.workspace_resolve(wk_name).await?;
self.workspace_require_admin(wk.id, user_uid).await?;
let name = params.name.trim();
if name.is_empty() {
return Err(AppError::BadRequest(
"repo name is required".to_string(),
));
}
let existing = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM repo WHERE wk = $1 AND name = $2 AND deleted_at IS NULL)",
)
.bind(wk.id)
.bind(name)
.fetch_one(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
if existing {
return Err(AppError::RepoNameAlreadyExists);
}
let default_branch =
params.default_branch.unwrap_or_else(|| "main".to_string());
let visibility =
params.visibility.unwrap_or_else(|| "private".to_string());
let description = params.description.unwrap_or_default();
let now = chrono::Utc::now();
let repo_id = uuid::Uuid::now_v7();
let mut txn = self.db.begin().await.map_err(|_| AppError::TxnError)?;
let _repo = sqlx::query_as::<_, RepoModel>(
"INSERT INTO repo (id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at) \
VALUES ($1, $2, $3, $4, $5, $6, 0, false, false, false, $7, '', $8, $8) \
RETURNING id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at",
)
.bind(repo_id)
.bind(wk.id)
.bind(name)
.bind(&description)
.bind(&default_branch)
.bind(&visibility)
.bind(user_uid)
.bind(now)
.fetch_one(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
let storage_root = self
.config
.repos_root()
.map_err(|e| AppError::InternalServerError(e.to_string()))?;
let mut client = InitServiceClient::new(self.git.clone());
let rpc_resp = client
.init_bare(tonic::Request::new(p::InitBareRequest {
storage_root,
params: Some(p::InitRepoParams {
namespace: wk.name.clone(),
repo_name: name.to_string(),
default_branch,
description: Some(description),
initialize_with_readme: params
.initialize_with_readme
.unwrap_or(false),
enable_lfs: params.enable_lfs.unwrap_or(false),
}),
}))
.await
.map_err(rpc_err)?
.into_inner();
let repo = sqlx::query_as::<_, RepoModel>(
"UPDATE repo SET storage_path = $1 WHERE id = $2 \
RETURNING id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at",
)
.bind(&rpc_resp.storage_path)
.bind(repo_id)
.fetch_one(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
txn.commit().await.map_err(|_| AppError::TxnError)?;
self.queue_sync(repo_id).await;
Ok(repo)
}).await
}
pub async fn git_clone_bare(
&self,
ctx: &Session,
wk_name: &str,
params: CloneRepo,
) -> Result<RepoModel, AppError> {
let user_uid = session_user(ctx)?;
let wk = self.workspace_resolve(wk_name).await?;
self.workspace_require_admin(wk.id, user_uid).await?;
let name = params.name.trim();
if name.is_empty() {
return Err(AppError::BadRequest(
"repo name is required".to_string(),
));
}
let source_url = params.source_url.trim();
if source_url.is_empty() {
return Err(AppError::BadRequest(
"source URL is required".to_string(),
));
}
let existing = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM repo WHERE wk = $1 AND name = $2 AND deleted_at IS NULL)",
)
.bind(wk.id)
.bind(name)
.fetch_one(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
if existing {
return Err(AppError::RepoNameAlreadyExists);
}
let visibility =
params.visibility.unwrap_or_else(|| "private".to_string());
let description = params.description.unwrap_or_default();
let now = chrono::Utc::now();
let repo_id = uuid::Uuid::now_v7();
let mut txn = self.db.begin().await.map_err(|_| AppError::TxnError)?;
let _repo = sqlx::query_as::<_, RepoModel>(
"INSERT INTO repo (id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at) \
VALUES ($1, $2, $3, $4, '', $5, 0, false, false, true, $6, '', $7, $7) \
RETURNING id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at",
)
.bind(repo_id)
.bind(wk.id)
.bind(name)
.bind(&description)
.bind(&visibility)
.bind(user_uid)
.bind(now)
.fetch_one(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
let storage_root = self
.config
.repos_root()
.map_err(|e| AppError::InternalServerError(e.to_string()))?;
let mut client = InitServiceClient::new(self.git.clone());
let rpc_resp = client
.clone_bare(tonic::Request::new(p::CloneBareRequest {
storage_root,
source_url: source_url.to_string(),
namespace: wk.name.clone(),
repo_name: name.to_string(),
}))
.await
.map_err(rpc_err)?
.into_inner();
let repo = sqlx::query_as::<_, RepoModel>(
"UPDATE repo SET storage_path = $1 WHERE id = $2 \
RETURNING id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at",
)
.bind(&rpc_resp.storage_path)
.bind(repo_id)
.fetch_one(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
txn.commit().await.map_err(|_| AppError::TxnError)?;
self.queue_sync(repo_id).await;
Ok(repo)
}
}