gitdataai/libs/service/pull_request/pull_request.rs
2026-04-15 09:08:09 +08:00

842 lines
28 KiB
Rust

use crate::AppService;
use crate::error::AppError;
use crate::git::CommitMeta;
use crate::git::diff::{SideBySideDiffQuery, SideBySideDiffResponse};
use crate::project::activity::ActivityLogParams;
use chrono::Utc;
use models::projects::{MemberRole, project_members};
use models::pull_request::{PrStatus, pull_request};
use models::repos::repo;
use models::users::user;
use redis::AsyncCommands;
use sea_orm::*;
use serde::{Deserialize, Serialize};
use session::Session;
use utoipa::ToSchema;
use uuid::Uuid;
/// Request payload for opening a new pull request.
#[derive(Debug, Clone, Deserialize, ToSchema)]
pub struct PullRequestCreateRequest {
/// Title of the pull request.
pub title: String,
/// Optional free-form description.
pub body: Option<String>,
/// Branch (or full `refs/...` name) the changes are compared against.
pub base: String,
/// Branch (or full `refs/...` name) that carries the changes.
pub head: String,
/// When `true` the PR is created with the draft status instead of open.
/// Defaults to `false` if the field is omitted.
#[serde(default)]
pub draft: bool,
}
/// Request payload for a partial pull request update.
///
/// Every field is optional; only the fields that are present are applied.
#[derive(Debug, Clone, Deserialize, ToSchema)]
pub struct PullRequestUpdateRequest {
/// New title, if it should change.
pub title: Option<String>,
/// New description, if it should change. There is no way to clear an
/// existing body through this request: `None` means "leave as is".
pub body: Option<String>,
/// New base branch, if it should change.
pub base: Option<String>,
/// `Some(true)` requests the draft status, `Some(false)` requests the open
/// status, `None` leaves the status untouched.
#[serde(default)]
pub draft: Option<bool>,
}
/// API representation of a pull request.
///
/// Also deserializable because `pull_request_get` stores it as JSON in the
/// cache and reads it back.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct PullRequestResponse {
/// Id of the repository the PR belongs to.
pub repo: Uuid,
/// PR number, unique within the repository.
pub number: i64,
/// Id stored in the PR row's `issue` column.
pub issue: Option<Uuid>,
/// Title of the pull request.
pub title: String,
/// Optional description.
pub body: Option<String>,
/// User id of the author.
pub author: Uuid,
/// Username of the author. `None` when the conversion from the database
/// model is used on its own; list/get fill it in from the user table.
pub author_username: Option<String>,
/// Base branch name as stored on the PR.
pub base: String,
/// Head branch name as stored on the PR.
pub head: String,
/// String form of the PR status.
pub status: String,
/// User id recorded in the row's `merged_by` column, if any.
pub merged_by: Option<Uuid>,
/// Creation timestamp.
pub created_at: chrono::DateTime<Utc>,
/// Timestamp of the last modification.
pub updated_at: chrono::DateTime<Utc>,
/// Timestamp recorded in the row's `merged_at` column, if any.
pub merged_at: Option<chrono::DateTime<Utc>>,
/// Value of the row's `created_by_ai` flag; PRs created through
/// `pull_request_create` always store `false`.
pub created_by_ai: bool,
}
/// Converts a pull request database row into its API representation.
///
/// The row only carries the author's id, so `author_username` is left as
/// `None`; callers that have the user record overwrite it afterwards.
/// The row's `issue` id is always wrapped in `Some`.
impl From<pull_request::Model> for PullRequestResponse {
    fn from(pr: pull_request::Model) -> Self {
        // Take the row apart once instead of reaching into `pr` field by field.
        let pull_request::Model {
            repo,
            number,
            issue,
            title,
            body,
            author,
            base,
            head,
            status,
            merged_by,
            created_at,
            updated_at,
            merged_at,
            created_by_ai,
            ..
        } = pr;

        Self {
            repo,
            number,
            issue: Some(issue),
            title,
            body,
            author,
            author_username: None,
            base,
            head,
            status,
            merged_by,
            created_at,
            updated_at,
            merged_at,
            created_by_ai,
        }
    }
}
/// One page of pull requests plus paging metadata.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct PullRequestListResponse {
/// The pull requests on this page.
pub pull_requests: Vec<PullRequestResponse>,
/// Number of pull requests matching the filter across all pages.
pub total: u64,
/// Page number that was served (1-based).
pub page: i64,
/// Page size that was used.
pub per_page: i64,
}
/// Per-status pull request counters for a repository.
///
/// `total` counts every PR of the repository, so it can be larger than
/// `open + merged + closed` (for example when draft PRs exist).
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct PullRequestSummaryResponse {
/// All pull requests of the repository, regardless of status.
pub total: u64,
/// Pull requests with the open status.
pub open: u64,
/// Pull requests with the merged status.
pub merged: u64,
/// Pull requests with the closed status.
pub closed: u64,
}
/// Status view of a pull request.
///
/// NOTE(review): this type is not constructed in this part of the file; the
/// field notes below are inferred from the names only — confirm at the
/// producing call site.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct PullRequestStatusResponse {
/// Presumably the string form of the PR status.
pub status: String,
/// Presumably whether the PR can currently be merged.
pub can_merge: bool,
/// Optional merge analysis details.
pub merge_analysis: Option<MergeAnalysisResult>,
}
/// Flags describing the outcome of a merge analysis.
///
/// NOTE(review): not populated in this part of the file; the flag names
/// suggest they mirror git's merge-analysis result (fast-forward, already
/// up to date, normal merge) — confirm at the producing call site.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct MergeAnalysisResult {
pub is_fast_forward: bool,
pub is_up_to_date: bool,
pub is_normal: bool,
}
/// API representation of a single commit that belongs to a pull request.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct PrCommitResponse {
/// Full commit id rendered as a string.
pub oid: String,
/// Leading part of `oid`, at most 7 characters long.
pub short_oid: String,
/// Commit message as provided by the commit metadata.
pub message: String,
/// Commit summary as provided by the commit metadata.
pub summary: String,
/// Name from the author signature.
pub author_name: String,
/// E-mail from the author signature.
pub author_email: String,
/// Author timestamp; the Unix epoch if the stored seconds are out of range.
pub authored_at: chrono::DateTime<Utc>,
/// Name from the committer signature.
pub committer_name: String,
/// E-mail from the committer signature.
pub committer_email: String,
/// Committer timestamp; the Unix epoch if the stored seconds are out of range.
pub committed_at: chrono::DateTime<Utc>,
}
/// Converts git commit metadata into the API commit representation.
///
/// The short id is the first (up to) 7 characters of the full id, and
/// signature timestamps that cannot be represented fall back to the Unix
/// epoch.
impl From<CommitMeta> for PrCommitResponse {
    fn from(c: CommitMeta) -> Self {
        // Seconds since the epoch -> UTC timestamp, with the epoch as fallback.
        let to_utc = |secs: i64| -> chrono::DateTime<Utc> {
            match chrono::DateTime::from_timestamp(secs, 0) {
                Some(ts) => ts,
                None => chrono::DateTime::from_timestamp(0, 0).unwrap(),
            }
        };

        let oid = c.oid.to_string();
        let short_len = oid.len().min(7);
        let short_oid = oid[..short_len].to_string();

        Self {
            oid,
            short_oid,
            message: c.message,
            summary: c.summary,
            author_name: c.author.name,
            author_email: c.author.email,
            authored_at: to_utc(c.author.time_secs),
            committer_name: c.committer.name,
            committer_email: c.committer.email,
            committed_at: to_utc(c.committer.time_secs),
        }
    }
}
/// Wrapper around the commits that make up a pull request.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct PrCommitsListResponse {
/// Commits returned by the commit log for the PR's `base..head` range.
pub commits: Vec<PrCommitResponse>,
}
impl AppService {
/// List pull requests for a repo, newest first.
///
/// * `status` - when present, only PRs whose status string equals it are returned.
/// * `page` - 1-based page number; defaults to 1, values below 1 are treated as 1.
/// * `per_page` - page size; defaults to 20, values below 1 are treated as 1.
///
/// The response echoes the effective `page` / `per_page` and carries the
/// total number of matching PRs. Author usernames are resolved with a single
/// query over the user table; PRs whose author row is missing get `None`.
///
/// # Errors
/// Propagates the error of the repository lookup and any database error.
pub async fn pull_request_list(
    &self,
    namespace: String,
    repo_name: String,
    status: Option<String>,
    page: Option<i64>,
    per_page: Option<i64>,
    ctx: &Session,
) -> Result<PullRequestListResponse, AppError> {
    let repo = self.utils_find_repo(namespace, repo_name, ctx).await?;

    // Zero or negative paging values would yield a negative offset/limit that
    // wraps to an enormous u64 when cast, so clamp them before converting.
    let page = page.unwrap_or(1).max(1);
    let per_page = per_page.unwrap_or(20).max(1);
    let limit = per_page.unsigned_abs();
    // Saturate instead of overflowing on absurdly large page numbers.
    let offset = (page - 1).unsigned_abs().saturating_mul(limit);

    let mut query = pull_request::Entity::find()
        .filter(pull_request::Column::Repo.eq(repo.id))
        .order_by_desc(pull_request::Column::CreatedAt);
    if let Some(ref s) = status {
        query = query.filter(pull_request::Column::Status.eq(s));
    }

    let total = query.clone().count(&self.db).await?;
    let prs = query.offset(offset).limit(limit).all(&self.db).await?;

    // Resolve every distinct author once and index the result by user id so
    // the per-PR lookup below is a map access rather than a linear scan.
    let mut author_ids: Vec<Uuid> = prs.iter().map(|p| p.author).collect();
    author_ids.sort_unstable();
    author_ids.dedup();
    let usernames: std::collections::HashMap<Uuid, String> = if author_ids.is_empty() {
        std::collections::HashMap::new()
    } else {
        user::Entity::find()
            .filter(user::Column::Uid.is_in(author_ids))
            .all(&self.db)
            .await?
            .into_iter()
            .map(|u| (u.uid, u.username))
            .collect()
    };

    let pull_requests: Vec<PullRequestResponse> = prs
        .into_iter()
        .map(|pr| {
            let author_username = usernames.get(&pr.author).cloned();
            PullRequestResponse {
                author_username,
                ..PullRequestResponse::from(pr)
            }
        })
        .collect();

    Ok(PullRequestListResponse {
        pull_requests,
        total,
        page,
        per_page,
    })
}
/// Get a single pull request.
///
/// The repository lookup always runs first. After that the response is
/// served from the cache under `pr:get:{repo id}:{number}` when a valid
/// entry exists; otherwise it is loaded from the database, enriched with the
/// author's username and written back to the cache for 300 seconds.
///
/// Cache access and the author lookup are best-effort: failures there never
/// fail the request (the username is simply `None`).
///
/// # Errors
/// Propagates the repository lookup error and database errors of the PR
/// query; returns `AppError::NotFound` when no PR has this number.
pub async fn pull_request_get(
    &self,
    namespace: String,
    repo_name: String,
    number: i64,
    ctx: &Session,
) -> Result<PullRequestResponse, AppError> {
    let repo = self.utils_find_repo(namespace, repo_name, ctx).await?;
    let cache_key = format!("pr:get:{}:{}", repo.id, number);

    // Cache read: any failure (no connection, miss, undecodable payload)
    // silently falls through to the database.
    if let Ok(mut conn) = self.cache.conn().await {
        if let Ok(raw) = conn.get::<_, String>(cache_key.clone()).await {
            if let Ok(hit) = serde_json::from_str::<PullRequestResponse>(&raw) {
                return Ok(hit);
            }
        }
    }

    let pr = pull_request::Entity::find()
        .filter(pull_request::Column::Repo.eq(repo.id))
        .filter(pull_request::Column::Number.eq(number))
        .one(&self.db)
        .await?
        .ok_or_else(|| AppError::NotFound("Pull request not found".to_string()))?;

    let author_username = user::Entity::find_by_id(pr.author)
        .one(&self.db)
        .await
        .ok()
        .flatten()
        .map(|u| u.username);

    let response = PullRequestResponse {
        author_username,
        ..PullRequestResponse::from(pr)
    };

    // Only cache a payload that actually serialized; writing an empty string
    // would just create an entry that can never be decoded.
    if let Ok(payload) = serde_json::to_string(&response) {
        if let Ok(mut conn) = self.cache.conn().await {
            let _: Option<()> = conn
                .set_ex::<String, String, ()>(cache_key, payload, 300)
                .await
                .ok();
        }
    }

    Ok(response)
}
/// Compute the number the next pull request of `repo_id` should get.
///
/// Reads the highest existing PR number of the repository and adds one;
/// a repository without pull requests starts at 1.
///
/// NOTE(review): the read and the later insert are separate statements, so
/// two concurrent creations can obtain the same number — confirm that a
/// unique constraint or an outer lock covers this.
async fn next_pr_number(&self, repo_id: Uuid) -> Result<i64, AppError> {
    let highest = pull_request::Entity::find()
        .filter(pull_request::Column::Repo.eq(repo_id))
        .select_only()
        .column_as(pull_request::Column::Number.max(), "max_num")
        .into_tuple::<Option<i64>>()
        .one(&self.db)
        .await?
        .flatten();

    Ok(match highest {
        Some(current) => current + 1,
        None => 1,
    })
}
pub async fn pull_request_create(
&self,
namespace: String,
repo_name: String,
request: PullRequestCreateRequest,
ctx: &Session,
) -> Result<PullRequestResponse, AppError> {
let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
let repo: repo::Model = self
.utils_check_repo_admin(namespace.clone(), repo_name.clone(), ctx)
.await?;
let number = self.next_pr_number(repo.id).await?;
let now = Utc::now();
let status = if request.draft {
PrStatus::Draft
} else {
PrStatus::Open
};
let active = pull_request::ActiveModel {
repo: Set(repo.id),
number: Set(number),
issue: Set(Uuid::now_v7()),
title: Set(request.title),
body: Set(request.body),
author: Set(user_uid),
base: Set(request.base),
head: Set(request.head),
status: Set(status.to_string()),
merged_by: Set(None),
created_at: Set(now),
updated_at: Set(now),
merged_at: Set(None),
created_by_ai: Set(false),
..Default::default()
};
let model = active.insert(&self.db).await?;
let actor_username = user::Entity::find_by_id(user_uid)
.one(&self.db)
.await
.ok()
.flatten()
.map(|u| u.username)
.unwrap_or_default();
let _ = self
.project_log_activity(
repo.project,
Some(repo.id),
user_uid,
super::super::project::activity::ActivityLogParams {
event_type: "pr_open".to_string(),
title: format!(
"{} opened pull request #{}: {}",
actor_username, number, model.title
),
repo_id: Some(repo.id),
content: None,
event_id: None,
event_sub_id: Some(number),
metadata: Some(serde_json::json!({
"base": model.base,
"head": model.head,
})),
is_private: false,
},
)
.await;
if repo.ai_code_review_enabled && !request.draft {
let this = self.clone();
let namespace = namespace.clone();
let repo_name = repo.repo_name.clone();
let pr_number = Some(number);
let repo_for_bg = repo.clone();
tokio::spawn(async move {
let _ = this
.trigger_ai_code_review_internal(
namespace,
repo_name,
pr_number,
None,
repo_for_bg,
)
.await;
});
}
Ok(PullRequestResponse::from(model))
}
/// Update a PR (title, body, base, draft status).
/// Only the PR author OR admin/owner can update.
pub async fn pull_request_update(
&self,
namespace: String,
repo_name: String,
number: i64,
request: PullRequestUpdateRequest,
ctx: &Session,
) -> Result<PullRequestResponse, AppError> {
let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
let repo: repo::Model = self
.utils_check_repo_admin(namespace.clone(), repo_name.clone(), ctx)
.await?;
let pr = pull_request::Entity::find()
.filter(pull_request::Column::Repo.eq(repo.id))
.filter(pull_request::Column::Number.eq(number))
.one(&self.db)
.await?
.ok_or(AppError::NotFound("Pull request not found".to_string()))?;
// Permission: author OR admin/owner
let is_author = pr.author == user_uid;
let member = project_members::Entity::find()
.filter(project_members::Column::Project.eq(repo.project))
.filter(project_members::Column::User.eq(user_uid))
.one(&self.db)
.await?
.ok_or(AppError::NoPower)?;
let role = member.scope_role().map_err(|_| AppError::RoleParseError)?;
let is_admin = role == MemberRole::Admin || role == MemberRole::Owner;
if !is_author && !is_admin {
return Err(AppError::NoPower);
}
// Cannot update a merged PR
if pr.status == PrStatus::Merged.to_string() {
return Err(AppError::BadRequest(
"Cannot update a merged pull request".to_string(),
));
}
let mut active: pull_request::ActiveModel = pr.clone().into();
if let Some(title) = request.title {
active.title = Set(title);
}
if let Some(body) = request.body {
active.body = Set(Some(body));
}
if let Some(base) = request.base {
active.base = Set(base);
}
if let Some(draft) = request.draft {
active.status = Set(if draft {
PrStatus::Draft.to_string()
} else {
PrStatus::Open.to_string()
});
}
active.updated_at = Set(Utc::now());
let model = active.update(&self.db).await?;
super::invalidate_pr_cache(&self.cache, repo.id, number).await;
let actor_username = user::Entity::find_by_id(user_uid)
.one(&self.db)
.await
.ok()
.flatten()
.map(|u| u.username)
.unwrap_or_default();
let _ = self
.project_log_activity(
repo.project,
Some(repo.id),
user_uid,
super::super::project::activity::ActivityLogParams {
event_type: "pr_update".to_string(),
title: format!("{} updated pull request #{}", actor_username, number),
repo_id: Some(repo.id),
content: Some(model.title.clone()),
event_id: None,
event_sub_id: Some(number),
metadata: None,
is_private: false,
},
)
.await;
Ok(PullRequestResponse::from(model))
}
/// Close a pull request.
///
/// Delegates to `pr_set_status` with the closed status, which performs the
/// permission checks (author OR admin/owner) and refuses merged PRs.
pub async fn pull_request_close(
    &self,
    namespace: String,
    repo_name: String,
    number: i64,
    ctx: &Session,
) -> Result<PullRequestResponse, AppError> {
    let target = PrStatus::Closed;
    self.pr_set_status(namespace, repo_name, number, target, ctx)
        .await
}
/// Reopen a pull request.
///
/// Delegates to `pr_set_status` with the open status, which performs the
/// permission checks (author OR admin/owner) and refuses merged PRs.
pub async fn pull_request_reopen(
    &self,
    namespace: String,
    repo_name: String,
    number: i64,
    ctx: &Session,
) -> Result<PullRequestResponse, AppError> {
    let target = PrStatus::Open;
    self.pr_set_status(namespace, repo_name, number, target, ctx)
        .await
}
pub async fn pull_request_delete(
&self,
namespace: String,
repo_name: String,
number: i64,
ctx: &Session,
) -> Result<(), AppError> {
let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
let repo: repo::Model = self
.utils_check_repo_admin(namespace.clone(), repo_name.clone(), ctx)
.await?;
let pr = pull_request::Entity::find()
.filter(pull_request::Column::Repo.eq(repo.id))
.filter(pull_request::Column::Number.eq(number))
.one(&self.db)
.await?
.ok_or(AppError::NotFound("Pull request not found".to_string()))?;
// Permission: author OR admin/owner
let is_author = pr.author == user_uid;
let member = project_members::Entity::find()
.filter(project_members::Column::Project.eq(repo.project))
.filter(project_members::Column::User.eq(user_uid))
.one(&self.db)
.await?
.ok_or(AppError::NoPower)?;
let role = member.scope_role().map_err(|_| AppError::RoleParseError)?;
let is_admin = role == MemberRole::Admin || role == MemberRole::Owner;
if !is_author && !is_admin {
return Err(AppError::NoPower);
}
// Cascade delete related records
models::pull_request::PullRequestCommit::delete_many()
.filter(models::pull_request::pull_request_commit::Column::Repo.eq(repo.id))
.filter(models::pull_request::pull_request_commit::Column::Number.eq(number))
.exec(&self.db)
.await?;
models::pull_request::PullRequestReview::delete_many()
.filter(models::pull_request::pull_request_review::Column::Repo.eq(repo.id))
.filter(models::pull_request::pull_request_review::Column::Number.eq(number))
.exec(&self.db)
.await?;
models::pull_request::PullRequestReviewComment::delete_many()
.filter(models::pull_request::pull_request_review_comment::Column::Repo.eq(repo.id))
.filter(models::pull_request::pull_request_review_comment::Column::Number.eq(number))
.exec(&self.db)
.await?;
pull_request::Entity::delete_by_id((repo.id, number))
.exec(&self.db)
.await?;
super::invalidate_pr_cache(&self.cache, repo.id, number).await;
let pr_title = pr.title.clone();
let _ = self
.project_log_activity(
repo.project,
Some(repo.id),
user_uid,
ActivityLogParams {
event_type: "pr_delete".to_string(),
title: format!(
"{} deleted pull request #{}: {}",
user_uid, number, pr_title
),
repo_id: Some(repo.id),
content: Some(pr_title.clone()),
event_id: None,
event_sub_id: Some(number),
metadata: Some(serde_json::json!({
"pr_number": number,
"pr_title": pr_title,
"base": pr.base,
"head": pr.head,
})),
is_private: false,
},
)
.await;
Ok(())
}
/// Get PR summary counts for a repository.
///
/// Runs four count queries one after another: all PRs of the repository,
/// then those with the open, merged and closed status. PRs in any other
/// status are only reflected in `total`.
///
/// # Errors
/// Propagates the repository lookup error and any database error.
pub async fn pull_request_summary(
    &self,
    namespace: String,
    repo_name: String,
    ctx: &Session,
) -> Result<PullRequestSummaryResponse, AppError> {
    let repo = self.utils_find_repo(namespace, repo_name, ctx).await?;

    // Shared base query; each counter clones it and narrows by status.
    let in_repo = pull_request::Entity::find().filter(pull_request::Column::Repo.eq(repo.id));

    let total: u64 = in_repo.clone().count(&self.db).await?;

    let open_status = PrStatus::Open.to_string();
    let open: u64 = in_repo
        .clone()
        .filter(pull_request::Column::Status.eq(open_status))
        .count(&self.db)
        .await?;

    let merged_status = PrStatus::Merged.to_string();
    let merged: u64 = in_repo
        .clone()
        .filter(pull_request::Column::Status.eq(merged_status))
        .count(&self.db)
        .await?;

    let closed_status = PrStatus::Closed.to_string();
    let closed: u64 = in_repo
        .filter(pull_request::Column::Status.eq(closed_status))
        .count(&self.db)
        .await?;

    let summary = PullRequestSummaryResponse {
        total,
        open,
        merged,
        closed,
    };
    Ok(summary)
}
async fn pr_set_status(
&self,
namespace: String,
repo_name: String,
number: i64,
status: PrStatus,
ctx: &Session,
) -> Result<PullRequestResponse, AppError> {
let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
let repo: repo::Model = self
.utils_check_repo_admin(namespace.clone(), repo_name.clone(), ctx)
.await?;
let pr = pull_request::Entity::find()
.filter(pull_request::Column::Repo.eq(repo.id))
.filter(pull_request::Column::Number.eq(number))
.one(&self.db)
.await?
.ok_or(AppError::NotFound("Pull request not found".to_string()))?;
// Permission: author OR admin/owner
let is_author = pr.author == user_uid;
let member = project_members::Entity::find()
.filter(project_members::Column::Project.eq(repo.project))
.filter(project_members::Column::User.eq(user_uid))
.one(&self.db)
.await?
.ok_or(AppError::NoPower)?;
let role = member.scope_role().map_err(|_| AppError::RoleParseError)?;
let is_admin = role == MemberRole::Admin || role == MemberRole::Owner;
if !is_author && !is_admin {
return Err(AppError::NoPower);
}
if pr.status == PrStatus::Merged.to_string() {
return Err(AppError::BadRequest(
"Cannot modify a merged pull request".to_string(),
));
}
let mut active: pull_request::ActiveModel = pr.clone().into();
active.status = Set(status.to_string());
active.updated_at = Set(Utc::now());
let model = active.update(&self.db).await?;
super::invalidate_pr_cache(&self.cache, repo.id, number).await;
let actor_username = user::Entity::find_by_id(user_uid)
.one(&self.db)
.await
.ok()
.flatten()
.map(|u| u.username)
.unwrap_or_default();
let event_type = if status == PrStatus::Closed {
"pr_close"
} else {
"pr_reopen"
};
let _ = self
.project_log_activity(
repo.project,
Some(repo.id),
user_uid,
super::super::project::activity::ActivityLogParams {
event_type: event_type.to_string(),
title: format!(
"{} {} pull request #{}",
actor_username,
if status == PrStatus::Closed {
"closed"
} else {
"reopened"
},
number
),
repo_id: Some(repo.id),
content: Some(model.title.clone()),
event_id: None,
event_sub_id: Some(number),
metadata: None,
is_private: false,
},
)
.await;
Ok(PullRequestResponse::from(model))
}
/// List all commits in a pull request (from base..head).
///
/// The PR's base and head names are turned into full ref names (names that
/// do not already start with `refs/` get a `refs/heads/` prefix), resolved
/// to commit ids, and the commit log for `base..head` is read with offset 0
/// and a limit of 500 entries.
///
/// All git work runs inside a single blocking task so the repository is
/// opened once and the async executor is not blocked.
///
/// # Errors
/// Propagates the repository lookup and database errors, returns
/// `AppError::NotFound` when the PR does not exist, converts git errors
/// (including a missing base/head ref) via `AppError::from`, and reports a
/// failed blocking task as `AppError::InternalServerError`.
pub async fn pr_commits_list(
    &self,
    namespace: String,
    repo_name: String,
    pr_number: i64,
    ctx: &Session,
) -> Result<PrCommitsListResponse, AppError> {
    let repo = self.utils_find_repo(namespace, repo_name, ctx).await?;
    let pr = pull_request::Entity::find()
        .filter(pull_request::Column::Repo.eq(repo.id))
        .filter(pull_request::Column::Number.eq(pr_number))
        .one(&self.db)
        .await?
        .ok_or_else(|| AppError::NotFound("Pull request not found".to_string()))?;

    let commits = tokio::task::spawn_blocking(move || {
        /// Expand a short branch name to a full ref name.
        fn qualify(name: &str) -> String {
            if name.starts_with("refs/") {
                name.to_owned()
            } else {
                format!("refs/heads/{name}")
            }
        }

        let domain = crate::git::GitDomain::from_model(repo)?;

        let base_ref = qualify(&pr.base);
        let head_ref = qualify(&pr.head);

        let base_oid = domain
            .ref_target(&base_ref)
            .map_err(|e| crate::git::GitError::Internal(e.to_string()))?
            .ok_or_else(|| {
                crate::git::GitError::NotFound(format!("Base branch '{}' not found", pr.base))
            })?;
        let head_oid = domain
            .ref_target(&head_ref)
            .map_err(|e| crate::git::GitError::Internal(e.to_string()))?
            .ok_or_else(|| {
                crate::git::GitError::NotFound(format!("Head branch '{}' not found", pr.head))
            })?;

        // Commits reachable from head but not from base; offset 0, at most 500.
        let range = format!("{}..{}", base_oid, head_oid);
        let metas = domain.commit_log(Some(&range), 0, 500)?;
        Ok::<_, crate::git::GitError>(metas)
    })
    .await
    .map_err(|e| AppError::InternalServerError(format!("Task join error: {e}")))?
    .map_err(AppError::from)?;

    let commits: Vec<PrCommitResponse> =
        commits.into_iter().map(PrCommitResponse::from).collect();
    Ok(PrCommitsListResponse { commits })
}
/// Get the side-by-side diff for a pull request.
///
/// Looks up the PR, resolves its base and head branches to commit ids inside
/// a blocking task, and then delegates to `git_diff_side_by_side` with those
/// ids while passing the caller's `pathspec` and `context_lines` through.
/// The `base` / `head` values of the incoming `query` are replaced.
///
/// NOTE(review): branches are resolved with `branch_target` on the stored
/// names, whereas `pr_commits_list` resolves full ref names via
/// `ref_target`; confirm both behave the same for names starting with
/// `refs/`.
pub async fn pr_diff_side_by_side(
    &self,
    namespace: String,
    repo_name: String,
    pr_number: i64,
    query: SideBySideDiffQuery,
    ctx: &Session,
) -> Result<SideBySideDiffResponse, AppError> {
    let repo = self
        .utils_find_repo(namespace.clone(), repo_name.clone(), ctx)
        .await?;
    let pr = pull_request::Entity::find()
        .filter(pull_request::Column::Repo.eq(repo.id))
        .filter(pull_request::Column::Number.eq(pr_number))
        .one(&self.db)
        .await?
        .ok_or(AppError::NotFound("Pull request not found".to_string()))?;

    let resolve_task = tokio::task::spawn_blocking(move || {
        let domain = crate::git::GitDomain::from_model(repo)?;

        // One lookup routine shared by both sides of the diff.
        let lookup = |branch: &String| -> Result<_, crate::git::GitError> {
            domain
                .branch_target(branch)
                .map_err(|e| crate::git::GitError::Internal(e.to_string()))?
                .ok_or_else(|| {
                    crate::git::GitError::NotFound(format!("Branch '{}' not found", branch))
                })
        };

        let base_oid = lookup(&pr.base)?;
        let head_oid = lookup(&pr.head)?;
        Ok::<_, crate::git::GitError>((base_oid, head_oid))
    });

    let (base_oid, head_oid) = resolve_task
        .await
        .map_err(|e| AppError::InternalServerError(format!("Task join error: {e}")))?
        .map_err(AppError::from)?;

    let SideBySideDiffQuery {
        pathspec,
        context_lines,
        ..
    } = query;
    let diff_query = SideBySideDiffQuery {
        base: base_oid.to_string(),
        head: head_oid.to_string(),
        pathspec,
        context_lines,
    };

    self.git_diff_side_by_side(namespace, repo_name, diff_query, ctx)
        .await
}
}