gitdataai/lib/service/git/repo.rs
2026-06-01 22:04:38 +08:00

456 lines
15 KiB
Rust

use db::sqlx;
use git::rpc::{proto as p, proto::init_service_client::InitServiceClient};
use model::repos::{RepoModel, RepoTopicModel};
use serde::{Deserialize, Serialize};
use session::Session;
use crate::{
AppService, Pagination, error::AppError, git::rpc_err,
metrics::with_op_metric, session_user,
};
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct RepoResponse {
#[schema(value_type = String)]
pub id: uuid::Uuid,
pub name: String,
pub description: Option<String>,
pub default_branch: String,
pub visibility: String,
#[schema(value_type = i64)]
pub size_bytes: i64,
pub is_archived: bool,
pub is_template: bool,
pub is_mirror: bool,
#[schema(value_type = String)]
pub created_by: uuid::Uuid,
#[schema(value_type = String)]
pub created_at: chrono::DateTime<chrono::Utc>,
#[schema(value_type = String)]
pub updated_at: chrono::DateTime<chrono::Utc>,
}
pub fn repo_response(repo: RepoModel) -> RepoResponse {
RepoResponse {
id: repo.id,
name: repo.name,
description: repo.description,
default_branch: repo.default_branch,
visibility: repo.visibility,
size_bytes: repo.size_bytes,
is_archived: repo.is_archived,
is_template: repo.is_template,
is_mirror: repo.is_mirror,
created_by: repo.created_by,
created_at: repo.created_at,
updated_at: repo.updated_at,
}
}
#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)]
pub struct UpdateRepo {
pub name: Option<String>,
pub description: Option<String>,
pub default_branch: Option<String>,
pub visibility: Option<String>,
pub is_archived: Option<bool>,
pub is_template: Option<bool>,
}
#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)]
pub struct TransferRepo {
pub target_workspace: String,
}
#[derive(Debug, Clone, Deserialize, utoipa::IntoParams)]
pub struct RepoFilter {
pub visibility: Option<String>,
pub is_archived: Option<bool>,
pub search: Option<String>,
}
impl AppService {
pub async fn repo_list(
&self,
ctx: &Session,
wk_name: &str,
filter: RepoFilter,
pagination: Pagination,
) -> Result<Vec<RepoResponse>, AppError> {
let user_uid = session_user(ctx)?;
let wk = self.workspace_resolve(wk_name).await?;
self.workspace_require_member(wk.id, user_uid).await?;
let mut param_idx = 2;
let mut sql = String::from(
"SELECT id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at \
FROM repo WHERE wk = $1 AND deleted_at IS NULL",
);
if filter.visibility.is_some() {
sql.push_str(&format!(" AND visibility = ${param_idx}"));
param_idx += 1;
}
if filter.is_archived.is_some() {
sql.push_str(&format!(" AND is_archived = ${param_idx}"));
param_idx += 1;
}
if filter.search.is_some() {
sql.push_str(&format!(" AND name ILIKE ${param_idx}"));
param_idx += 1;
}
let offset_idx = param_idx;
let limit_idx = param_idx + 1;
sql.push_str(&format!(
" ORDER BY name ASC OFFSET ${offset_idx} LIMIT ${limit_idx}"
));
let mut q = sqlx::query_as::<_, RepoModel>(sqlx::AssertSqlSafe(sql))
.bind(wk.id);
if let Some(vis) = &filter.visibility {
q = q.bind(vis.clone());
}
if let Some(archived) = filter.is_archived {
q = q.bind(archived);
}
if let Some(search) = &filter.search {
q = q.bind(format!("%{}%", search));
}
q = q
.bind(pagination.offset() as i64)
.bind(pagination.limit() as i64);
let rows = q
.fetch_all(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
Ok(rows.into_iter().map(repo_response).collect())
}
pub async fn repo_get(
&self,
ctx: &Session,
wk_name: &str,
repo_name: &str,
) -> Result<RepoResponse, AppError> {
let repo = self.git_require_member(ctx, wk_name, repo_name).await?;
Ok(repo_response(repo))
}
pub async fn repo_update(
&self,
ctx: &Session,
wk_name: &str,
repo_name: &str,
params: UpdateRepo,
) -> Result<RepoResponse, AppError> {
with_op_metric(&self.metrics.repo_operations_total, &["update"], async {
let user_uid = session_user(ctx)?;
let wk = self.workspace_resolve(wk_name).await?;
self.workspace_require_admin(wk.id, user_uid).await?;
let mut repo = self.repo_resolve(wk.id, repo_name).await?;
let next_name = match params.name {
Some(name) => {
let name = name.trim();
if name.is_empty() {
return Err(AppError::BadRequest(
"repo name is required".to_string(),
));
}
if name != repo.name {
let existing = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM repo WHERE wk = $1 AND name = $2 AND deleted_at IS NULL)",
)
.bind(wk.id)
.bind(name)
.fetch_one(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
if existing {
return Err(AppError::RepoNameAlreadyExists);
}
Some(name.to_string())
} else {
None
}
}
None => None,
};
let mut txn = self.db.begin().await.map_err(|_| AppError::TxnError)?;
if let Some(next_name) = &next_name {
sqlx::query(
"INSERT INTO repo_history_name (id, repo, name, changed_by, created_at) \
VALUES ($1, $2, $3, $4, $5)",
)
.bind(uuid::Uuid::now_v7())
.bind(repo.id)
.bind(&repo.name)
.bind(user_uid)
.bind(chrono::Utc::now())
.execute(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
repo.name = next_name.clone();
}
if let Some(desc) = params.description {
repo.description = if desc.is_empty() { None } else { Some(desc) };
}
let mut default_branch_changed = false;
if let Some(branch) = params.default_branch {
repo.default_branch = branch;
default_branch_changed = true;
}
if let Some(vis) = params.visibility {
repo.visibility = vis;
}
if let Some(archived) = params.is_archived {
repo.is_archived = archived;
}
if let Some(template) = params.is_template {
repo.is_template = template;
}
let updated = sqlx::query_as::<_, RepoModel>(
"UPDATE repo SET name = $1, description = $2, default_branch = $3, \
visibility = $4, is_archived = $5, is_template = $6, updated_at = $7 \
WHERE id = $8 \
RETURNING id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at",
)
.bind(&repo.name)
.bind(&repo.description)
.bind(&repo.default_branch)
.bind(&repo.visibility)
.bind(repo.is_archived)
.bind(repo.is_template)
.bind(chrono::Utc::now())
.bind(repo.id)
.fetch_one(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
txn.commit().await.map_err(|_| AppError::TxnError)?;
if default_branch_changed {
let mut client = InitServiceClient::new(self.git.clone());
let _ = client
.set_default_branch(tonic::Request::new(
p::SetDefaultBranchRequest {
repo_id: repo.id.to_string(),
branch_name: repo.default_branch.clone(),
},
))
.await
.map_err(rpc_err);
self.queue_sync(repo.id).await;
}
Ok(repo_response(updated))
}).await
}
pub async fn repo_archive(
&self,
ctx: &Session,
wk_name: &str,
repo_name: &str,
) -> Result<RepoResponse, AppError> {
self.repo_update(
ctx,
wk_name,
repo_name,
UpdateRepo {
name: None,
description: None,
default_branch: None,
visibility: None,
is_archived: Some(true),
is_template: None,
},
)
.await
}
pub async fn repo_delete(
&self,
ctx: &Session,
wk_name: &str,
repo_name: &str,
) -> Result<(), AppError> {
with_op_metric(
&self.metrics.repo_operations_total,
&["delete"],
async {
let user_uid = session_user(ctx)?;
let wk = self.workspace_resolve(wk_name).await?;
self.workspace_require_owner(wk.id, user_uid).await?;
let repo = self.repo_resolve(wk.id, repo_name).await?;
sqlx::query("UPDATE repo SET deleted_at = $1 WHERE id = $2")
.bind(chrono::Utc::now())
.bind(repo.id)
.execute(self.db.writer())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
Ok(())
},
)
.await
}
pub async fn repo_transfer(
&self,
ctx: &Session,
wk_name: &str,
repo_name: &str,
params: TransferRepo,
) -> Result<RepoResponse, AppError> {
with_op_metric(&self.metrics.repo_transfer_total, &[], async {
let user_uid = session_user(ctx)?;
let src_wk = self.workspace_resolve(wk_name).await?;
self.workspace_require_owner(src_wk.id, user_uid).await?;
let repo = self.repo_resolve(src_wk.id, repo_name).await?;
let target_wk =
self.workspace_resolve(&params.target_workspace).await?;
self.workspace_require_admin(target_wk.id, user_uid).await?;
let existing = sqlx::query_scalar::<_, bool>(
"SELECT EXISTS(SELECT 1 FROM repo WHERE wk = $1 AND name = $2 AND deleted_at IS NULL)",
)
.bind(target_wk.id)
.bind(&repo.name)
.fetch_one(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
if existing {
return Err(AppError::Conflict(
"repo name already exists in target workspace".to_string(),
));
}
let mut txn = self.db.begin().await.map_err(|_| AppError::TxnError)?;
sqlx::query(
"INSERT INTO repo_history_name (id, repo, name, changed_by, created_at) \
VALUES ($1, $2, $3, $4, $5)",
)
.bind(uuid::Uuid::now_v7())
.bind(repo.id)
.bind(&format!("{}/{}", src_wk.name, repo.name))
.bind(user_uid)
.bind(chrono::Utc::now())
.execute(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
let updated = sqlx::query_as::<_, RepoModel>(
"UPDATE repo SET wk = $1, updated_at = $2 WHERE id = $3 \
RETURNING id, wk, name, description, default_branch, visibility, size_bytes, \
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at",
)
.bind(target_wk.id)
.bind(chrono::Utc::now())
.bind(repo.id)
.fetch_one(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
txn.commit().await.map_err(|_| AppError::TxnError)?;
Ok(repo_response(updated))
}).await
}
pub async fn repo_topics(
&self,
ctx: &Session,
wk_name: &str,
repo_name: &str,
) -> Result<Vec<String>, AppError> {
let repo = self.git_require_member(ctx, wk_name, repo_name).await?;
let rows = sqlx::query_as::<_, RepoTopicModel>(
"SELECT repo, topic, created_at FROM repo_topic WHERE repo = $1",
)
.bind(repo.id)
.fetch_all(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
Ok(rows.into_iter().map(|r| r.topic).collect())
}
pub async fn repo_update_topics(
&self,
ctx: &Session,
wk_name: &str,
repo_name: &str,
topics: Vec<String>,
) -> Result<Vec<String>, AppError> {
let user_uid = session_user(ctx)?;
let wk = self.workspace_resolve(wk_name).await?;
self.workspace_require_admin(wk.id, user_uid).await?;
let repo = self.repo_resolve(wk.id, repo_name).await?;
let mut txn = self.db.begin().await.map_err(|_| AppError::TxnError)?;
sqlx::query("DELETE FROM repo_topic WHERE repo = $1")
.bind(repo.id)
.execute(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
for topic in &topics {
let topic = topic.trim();
if topic.is_empty() {
continue;
}
sqlx::query(
"INSERT INTO repo_topic (repo, topic, created_at) VALUES ($1, $2, $3)",
)
.bind(repo.id)
.bind(topic)
.bind(chrono::Utc::now())
.execute(&mut **txn.inner_mut())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
}
txn.commit().await.map_err(|_| AppError::TxnError)?;
Ok(topics)
}
/// CMDK BFF: list repo names + descriptions for a workspace.
pub async fn repo_list_inner(
&self,
wk_name: &str,
) -> Result<Vec<(String, Option<String>)>, AppError> {
let wk = sqlx::query_as::<_, (uuid::Uuid,)>(
"SELECT id FROM workspace WHERE name = $1",
)
.bind(wk_name)
.fetch_optional(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?
.ok_or_else(|| AppError::NotFound("workspace not found".to_string()))?;
let rows = sqlx::query_as::<_, (String, Option<String>)>(
"SELECT name, description FROM repo WHERE wk = $1 AND deleted_at IS NULL ORDER BY updated_at DESC"
)
.bind(wk.0)
.fetch_all(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
Ok(rows)
}
}