456 lines
15 KiB
Rust
456 lines
15 KiB
Rust
use db::sqlx;
|
|
use git::rpc::{proto as p, proto::init_service_client::InitServiceClient};
|
|
use model::repos::{RepoModel, RepoTopicModel};
|
|
use serde::{Deserialize, Serialize};
|
|
use session::Session;
|
|
|
|
use crate::{
|
|
AppService, Pagination, error::AppError, git::rpc_err,
|
|
metrics::with_op_metric, session_user,
|
|
};
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
|
|
pub struct RepoResponse {
|
|
#[schema(value_type = String)]
|
|
pub id: uuid::Uuid,
|
|
pub name: String,
|
|
pub description: Option<String>,
|
|
pub default_branch: String,
|
|
pub visibility: String,
|
|
#[schema(value_type = i64)]
|
|
pub size_bytes: i64,
|
|
pub is_archived: bool,
|
|
pub is_template: bool,
|
|
pub is_mirror: bool,
|
|
#[schema(value_type = String)]
|
|
pub created_by: uuid::Uuid,
|
|
#[schema(value_type = String)]
|
|
pub created_at: chrono::DateTime<chrono::Utc>,
|
|
#[schema(value_type = String)]
|
|
pub updated_at: chrono::DateTime<chrono::Utc>,
|
|
}
|
|
|
|
pub fn repo_response(repo: RepoModel) -> RepoResponse {
|
|
RepoResponse {
|
|
id: repo.id,
|
|
name: repo.name,
|
|
description: repo.description,
|
|
default_branch: repo.default_branch,
|
|
visibility: repo.visibility,
|
|
size_bytes: repo.size_bytes,
|
|
is_archived: repo.is_archived,
|
|
is_template: repo.is_template,
|
|
is_mirror: repo.is_mirror,
|
|
created_by: repo.created_by,
|
|
created_at: repo.created_at,
|
|
updated_at: repo.updated_at,
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)]
|
|
pub struct UpdateRepo {
|
|
pub name: Option<String>,
|
|
pub description: Option<String>,
|
|
pub default_branch: Option<String>,
|
|
pub visibility: Option<String>,
|
|
pub is_archived: Option<bool>,
|
|
pub is_template: Option<bool>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Deserialize, utoipa::ToSchema)]
|
|
pub struct TransferRepo {
|
|
pub target_workspace: String,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Deserialize, utoipa::IntoParams)]
|
|
pub struct RepoFilter {
|
|
pub visibility: Option<String>,
|
|
pub is_archived: Option<bool>,
|
|
pub search: Option<String>,
|
|
}
|
|
|
|
impl AppService {
|
|
pub async fn repo_list(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
filter: RepoFilter,
|
|
pagination: Pagination,
|
|
) -> Result<Vec<RepoResponse>, AppError> {
|
|
let user_uid = session_user(ctx)?;
|
|
let wk = self.workspace_resolve(wk_name).await?;
|
|
self.workspace_require_member(wk.id, user_uid).await?;
|
|
|
|
let mut param_idx = 2;
|
|
let mut sql = String::from(
|
|
"SELECT id, wk, name, description, default_branch, visibility, size_bytes, \
|
|
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at \
|
|
FROM repo WHERE wk = $1 AND deleted_at IS NULL",
|
|
);
|
|
|
|
if filter.visibility.is_some() {
|
|
sql.push_str(&format!(" AND visibility = ${param_idx}"));
|
|
param_idx += 1;
|
|
}
|
|
if filter.is_archived.is_some() {
|
|
sql.push_str(&format!(" AND is_archived = ${param_idx}"));
|
|
param_idx += 1;
|
|
}
|
|
if filter.search.is_some() {
|
|
sql.push_str(&format!(" AND name ILIKE ${param_idx}"));
|
|
param_idx += 1;
|
|
}
|
|
|
|
let offset_idx = param_idx;
|
|
let limit_idx = param_idx + 1;
|
|
sql.push_str(&format!(
|
|
" ORDER BY name ASC OFFSET ${offset_idx} LIMIT ${limit_idx}"
|
|
));
|
|
|
|
let mut q = sqlx::query_as::<_, RepoModel>(sqlx::AssertSqlSafe(sql))
|
|
.bind(wk.id);
|
|
|
|
if let Some(vis) = &filter.visibility {
|
|
q = q.bind(vis.clone());
|
|
}
|
|
if let Some(archived) = filter.is_archived {
|
|
q = q.bind(archived);
|
|
}
|
|
if let Some(search) = &filter.search {
|
|
q = q.bind(format!("%{}%", search));
|
|
}
|
|
|
|
q = q
|
|
.bind(pagination.offset() as i64)
|
|
.bind(pagination.limit() as i64);
|
|
|
|
let rows = q
|
|
.fetch_all(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
Ok(rows.into_iter().map(repo_response).collect())
|
|
}
|
|
|
|
pub async fn repo_get(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
) -> Result<RepoResponse, AppError> {
|
|
let repo = self.git_require_member(ctx, wk_name, repo_name).await?;
|
|
Ok(repo_response(repo))
|
|
}
|
|
|
|
pub async fn repo_update(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
params: UpdateRepo,
|
|
) -> Result<RepoResponse, AppError> {
|
|
with_op_metric(&self.metrics.repo_operations_total, &["update"], async {
|
|
let user_uid = session_user(ctx)?;
|
|
let wk = self.workspace_resolve(wk_name).await?;
|
|
self.workspace_require_admin(wk.id, user_uid).await?;
|
|
let mut repo = self.repo_resolve(wk.id, repo_name).await?;
|
|
|
|
let next_name = match params.name {
|
|
Some(name) => {
|
|
let name = name.trim();
|
|
if name.is_empty() {
|
|
return Err(AppError::BadRequest(
|
|
"repo name is required".to_string(),
|
|
));
|
|
}
|
|
if name != repo.name {
|
|
let existing = sqlx::query_scalar::<_, bool>(
|
|
"SELECT EXISTS(SELECT 1 FROM repo WHERE wk = $1 AND name = $2 AND deleted_at IS NULL)",
|
|
)
|
|
.bind(wk.id)
|
|
.bind(name)
|
|
.fetch_one(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
if existing {
|
|
return Err(AppError::RepoNameAlreadyExists);
|
|
}
|
|
Some(name.to_string())
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
None => None,
|
|
};
|
|
|
|
let mut txn = self.db.begin().await.map_err(|_| AppError::TxnError)?;
|
|
|
|
if let Some(next_name) = &next_name {
|
|
sqlx::query(
|
|
"INSERT INTO repo_history_name (id, repo, name, changed_by, created_at) \
|
|
VALUES ($1, $2, $3, $4, $5)",
|
|
)
|
|
.bind(uuid::Uuid::now_v7())
|
|
.bind(repo.id)
|
|
.bind(&repo.name)
|
|
.bind(user_uid)
|
|
.bind(chrono::Utc::now())
|
|
.execute(&mut **txn.inner_mut())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
repo.name = next_name.clone();
|
|
}
|
|
|
|
if let Some(desc) = params.description {
|
|
repo.description = if desc.is_empty() { None } else { Some(desc) };
|
|
}
|
|
let mut default_branch_changed = false;
|
|
if let Some(branch) = params.default_branch {
|
|
repo.default_branch = branch;
|
|
default_branch_changed = true;
|
|
}
|
|
if let Some(vis) = params.visibility {
|
|
repo.visibility = vis;
|
|
}
|
|
if let Some(archived) = params.is_archived {
|
|
repo.is_archived = archived;
|
|
}
|
|
if let Some(template) = params.is_template {
|
|
repo.is_template = template;
|
|
}
|
|
|
|
let updated = sqlx::query_as::<_, RepoModel>(
|
|
"UPDATE repo SET name = $1, description = $2, default_branch = $3, \
|
|
visibility = $4, is_archived = $5, is_template = $6, updated_at = $7 \
|
|
WHERE id = $8 \
|
|
RETURNING id, wk, name, description, default_branch, visibility, size_bytes, \
|
|
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at",
|
|
)
|
|
.bind(&repo.name)
|
|
.bind(&repo.description)
|
|
.bind(&repo.default_branch)
|
|
.bind(&repo.visibility)
|
|
.bind(repo.is_archived)
|
|
.bind(repo.is_template)
|
|
.bind(chrono::Utc::now())
|
|
.bind(repo.id)
|
|
.fetch_one(&mut **txn.inner_mut())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
txn.commit().await.map_err(|_| AppError::TxnError)?;
|
|
if default_branch_changed {
|
|
let mut client = InitServiceClient::new(self.git.clone());
|
|
let _ = client
|
|
.set_default_branch(tonic::Request::new(
|
|
p::SetDefaultBranchRequest {
|
|
repo_id: repo.id.to_string(),
|
|
branch_name: repo.default_branch.clone(),
|
|
},
|
|
))
|
|
.await
|
|
.map_err(rpc_err);
|
|
self.queue_sync(repo.id).await;
|
|
}
|
|
|
|
Ok(repo_response(updated))
|
|
}).await
|
|
}
|
|
|
|
pub async fn repo_archive(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
) -> Result<RepoResponse, AppError> {
|
|
self.repo_update(
|
|
ctx,
|
|
wk_name,
|
|
repo_name,
|
|
UpdateRepo {
|
|
name: None,
|
|
description: None,
|
|
default_branch: None,
|
|
visibility: None,
|
|
is_archived: Some(true),
|
|
is_template: None,
|
|
},
|
|
)
|
|
.await
|
|
}
|
|
|
|
pub async fn repo_delete(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
) -> Result<(), AppError> {
|
|
with_op_metric(
|
|
&self.metrics.repo_operations_total,
|
|
&["delete"],
|
|
async {
|
|
let user_uid = session_user(ctx)?;
|
|
let wk = self.workspace_resolve(wk_name).await?;
|
|
self.workspace_require_owner(wk.id, user_uid).await?;
|
|
let repo = self.repo_resolve(wk.id, repo_name).await?;
|
|
|
|
sqlx::query("UPDATE repo SET deleted_at = $1 WHERE id = $2")
|
|
.bind(chrono::Utc::now())
|
|
.bind(repo.id)
|
|
.execute(self.db.writer())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
Ok(())
|
|
},
|
|
)
|
|
.await
|
|
}
|
|
|
|
pub async fn repo_transfer(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
params: TransferRepo,
|
|
) -> Result<RepoResponse, AppError> {
|
|
with_op_metric(&self.metrics.repo_transfer_total, &[], async {
|
|
let user_uid = session_user(ctx)?;
|
|
let src_wk = self.workspace_resolve(wk_name).await?;
|
|
self.workspace_require_owner(src_wk.id, user_uid).await?;
|
|
let repo = self.repo_resolve(src_wk.id, repo_name).await?;
|
|
|
|
let target_wk =
|
|
self.workspace_resolve(¶ms.target_workspace).await?;
|
|
self.workspace_require_admin(target_wk.id, user_uid).await?;
|
|
|
|
let existing = sqlx::query_scalar::<_, bool>(
|
|
"SELECT EXISTS(SELECT 1 FROM repo WHERE wk = $1 AND name = $2 AND deleted_at IS NULL)",
|
|
)
|
|
.bind(target_wk.id)
|
|
.bind(&repo.name)
|
|
.fetch_one(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
if existing {
|
|
return Err(AppError::Conflict(
|
|
"repo name already exists in target workspace".to_string(),
|
|
));
|
|
}
|
|
|
|
let mut txn = self.db.begin().await.map_err(|_| AppError::TxnError)?;
|
|
|
|
sqlx::query(
|
|
"INSERT INTO repo_history_name (id, repo, name, changed_by, created_at) \
|
|
VALUES ($1, $2, $3, $4, $5)",
|
|
)
|
|
.bind(uuid::Uuid::now_v7())
|
|
.bind(repo.id)
|
|
.bind(&format!("{}/{}", src_wk.name, repo.name))
|
|
.bind(user_uid)
|
|
.bind(chrono::Utc::now())
|
|
.execute(&mut **txn.inner_mut())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
let updated = sqlx::query_as::<_, RepoModel>(
|
|
"UPDATE repo SET wk = $1, updated_at = $2 WHERE id = $3 \
|
|
RETURNING id, wk, name, description, default_branch, visibility, size_bytes, \
|
|
is_archived, is_template, is_mirror, created_by, storage_path, created_at, updated_at, deleted_at",
|
|
)
|
|
.bind(target_wk.id)
|
|
.bind(chrono::Utc::now())
|
|
.bind(repo.id)
|
|
.fetch_one(&mut **txn.inner_mut())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
txn.commit().await.map_err(|_| AppError::TxnError)?;
|
|
Ok(repo_response(updated))
|
|
}).await
|
|
}
|
|
|
|
pub async fn repo_topics(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
) -> Result<Vec<String>, AppError> {
|
|
let repo = self.git_require_member(ctx, wk_name, repo_name).await?;
|
|
|
|
let rows = sqlx::query_as::<_, RepoTopicModel>(
|
|
"SELECT repo, topic, created_at FROM repo_topic WHERE repo = $1",
|
|
)
|
|
.bind(repo.id)
|
|
.fetch_all(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
Ok(rows.into_iter().map(|r| r.topic).collect())
|
|
}
|
|
|
|
pub async fn repo_update_topics(
|
|
&self,
|
|
ctx: &Session,
|
|
wk_name: &str,
|
|
repo_name: &str,
|
|
topics: Vec<String>,
|
|
) -> Result<Vec<String>, AppError> {
|
|
let user_uid = session_user(ctx)?;
|
|
let wk = self.workspace_resolve(wk_name).await?;
|
|
self.workspace_require_admin(wk.id, user_uid).await?;
|
|
let repo = self.repo_resolve(wk.id, repo_name).await?;
|
|
|
|
let mut txn = self.db.begin().await.map_err(|_| AppError::TxnError)?;
|
|
|
|
sqlx::query("DELETE FROM repo_topic WHERE repo = $1")
|
|
.bind(repo.id)
|
|
.execute(&mut **txn.inner_mut())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
|
|
for topic in &topics {
|
|
let topic = topic.trim();
|
|
if topic.is_empty() {
|
|
continue;
|
|
}
|
|
sqlx::query(
|
|
"INSERT INTO repo_topic (repo, topic, created_at) VALUES ($1, $2, $3)",
|
|
)
|
|
.bind(repo.id)
|
|
.bind(topic)
|
|
.bind(chrono::Utc::now())
|
|
.execute(&mut **txn.inner_mut())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
}
|
|
|
|
txn.commit().await.map_err(|_| AppError::TxnError)?;
|
|
Ok(topics)
|
|
}
|
|
|
|
/// CMDK BFF: list repo names + descriptions for a workspace.
|
|
pub async fn repo_list_inner(
|
|
&self,
|
|
wk_name: &str,
|
|
) -> Result<Vec<(String, Option<String>)>, AppError> {
|
|
let wk = sqlx::query_as::<_, (uuid::Uuid,)>(
|
|
"SELECT id FROM workspace WHERE name = $1",
|
|
)
|
|
.bind(wk_name)
|
|
.fetch_optional(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?
|
|
.ok_or_else(|| AppError::NotFound("workspace not found".to_string()))?;
|
|
|
|
let rows = sqlx::query_as::<_, (String, Option<String>)>(
|
|
"SELECT name, description FROM repo WHERE wk = $1 AND deleted_at IS NULL ORDER BY updated_at DESC"
|
|
)
|
|
.bind(wk.0)
|
|
.fetch_all(self.db.reader())
|
|
.await
|
|
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
|
|
Ok(rows)
|
|
}
|
|
}
|