use crate::AppService; use crate::error::AppError; use crate::git::CommitMeta; use crate::git::diff::{SideBySideDiffQuery, SideBySideDiffResponse}; use crate::project::activity::ActivityLogParams; use chrono::Utc; use models::projects::{MemberRole, project_members}; use models::pull_request::{PrStatus, pull_request}; use models::repos::repo; use models::users::user; use redis::AsyncCommands; use sea_orm::*; use serde::{Deserialize, Serialize}; use session::Session; use utoipa::ToSchema; use uuid::Uuid; #[derive(Debug, Clone, Deserialize, ToSchema)] pub struct PullRequestCreateRequest { pub title: String, pub body: Option, pub base: String, pub head: String, #[serde(default)] pub draft: bool, } #[derive(Debug, Clone, Deserialize, ToSchema)] pub struct PullRequestUpdateRequest { pub title: Option, pub body: Option, pub base: Option, #[serde(default)] pub draft: Option, } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct PullRequestResponse { pub repo: Uuid, pub number: i64, pub issue: Option, pub title: String, pub body: Option, pub author: Uuid, pub author_username: Option, pub base: String, pub head: String, pub status: String, pub merged_by: Option, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, pub merged_at: Option>, pub created_by_ai: bool, } impl From for PullRequestResponse { fn from(pr: pull_request::Model) -> Self { Self { repo: pr.repo, number: pr.number, issue: Some(pr.issue), title: pr.title, body: pr.body, author: pr.author, author_username: None, base: pr.base, head: pr.head, status: pr.status, merged_by: pr.merged_by, created_at: pr.created_at, updated_at: pr.updated_at, merged_at: pr.merged_at, created_by_ai: pr.created_by_ai, } } } #[derive(Debug, Clone, Serialize, ToSchema)] pub struct PullRequestListResponse { pub pull_requests: Vec, pub total: u64, pub page: i64, pub per_page: i64, } #[derive(Debug, Clone, Serialize, ToSchema)] pub struct PullRequestSummaryResponse { pub total: u64, pub open: u64, pub merged: u64, pub closed: u64, } #[derive(Debug, Clone, Serialize, ToSchema)] pub struct PullRequestStatusResponse { pub status: String, pub can_merge: bool, pub merge_analysis: Option, } #[derive(Debug, Clone, Serialize, ToSchema)] pub struct MergeAnalysisResult { pub is_fast_forward: bool, pub is_up_to_date: bool, pub is_normal: bool, } #[derive(Debug, Clone, Serialize, ToSchema)] pub struct PrCommitResponse { pub oid: String, pub short_oid: String, pub message: String, pub summary: String, pub author_name: String, pub author_email: String, pub authored_at: chrono::DateTime, pub committer_name: String, pub committer_email: String, pub committed_at: chrono::DateTime, } impl From for PrCommitResponse { fn from(c: CommitMeta) -> Self { let oid_str = c.oid.to_string(); fn sig_to_dt(time_secs: i64) -> chrono::DateTime { chrono::DateTime::from_timestamp(time_secs, 0) .unwrap_or_else(|| chrono::DateTime::from_timestamp(0, 0).unwrap()) } Self { oid: oid_str.clone(), short_oid: oid_str[..std::cmp::min(7, oid_str.len())].to_string(), message: c.message, summary: c.summary, author_name: c.author.name, author_email: c.author.email, authored_at: sig_to_dt(c.author.time_secs), committer_name: c.committer.name, committer_email: c.committer.email, committed_at: sig_to_dt(c.committer.time_secs), } } } #[derive(Debug, Clone, Serialize, ToSchema)] pub struct PrCommitsListResponse { pub commits: Vec, } impl AppService { /// List pull requests for a repo. pub async fn pull_request_list( &self, namespace: String, repo_name: String, status: Option, page: Option, per_page: Option, ctx: &Session, ) -> Result { let repo = self.utils_find_repo(namespace, repo_name, ctx).await?; let page = page.unwrap_or(1); let per_page = per_page.unwrap_or(20); let offset = (page - 1) * per_page; let mut query = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .order_by_desc(pull_request::Column::CreatedAt); if let Some(ref s) = status { query = query.filter(pull_request::Column::Status.eq(s)); } let total = query.clone().count(&self.db).await?; let prs = query .offset(offset as u64) .limit(per_page as u64) .all(&self.db) .await?; let author_ids: Vec = prs.iter().map(|p| p.author).collect(); let authors = if author_ids.is_empty() { vec![] } else { user::Entity::find() .filter(user::Column::Uid.is_in(author_ids)) .all(&self.db) .await? }; let responses: Vec = prs .into_iter() .map(|pr| { let username = authors .iter() .find(|u| u.uid == pr.author) .map(|u| u.username.clone()); PullRequestResponse { author_username: username, ..PullRequestResponse::from(pr) } }) .collect(); Ok(PullRequestListResponse { pull_requests: responses, total, page, per_page, }) } /// Get a single pull request. pub async fn pull_request_get( &self, namespace: String, repo_name: String, number: i64, ctx: &Session, ) -> Result { let repo = self.utils_find_repo(namespace, repo_name, ctx).await?; let cache_key = format!("pr:get:{}:{}", repo.id, number); if let Ok(mut conn) = self.cache.conn().await { if let Ok(cached) = conn.get::<_, String>(cache_key.clone()).await { if let Ok(cached) = serde_json::from_str::(&cached) { return Ok(cached); } } } let pr = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Number.eq(number)) .one(&self.db) .await? .ok_or(AppError::NotFound("Pull request not found".to_string()))?; let author = user::Entity::find_by_id(pr.author) .one(&self.db) .await .ok() .flatten(); let username = author.map(|u| u.username); let response = PullRequestResponse { author_username: username, ..PullRequestResponse::from(pr) }; if let Ok(mut conn) = self.cache.conn().await { let _: Option<()> = conn .set_ex::( cache_key, serde_json::to_string(&response).unwrap_or_default(), 300, ) .await .ok(); } Ok(response) } /// Get the next PR number for a repo. async fn next_pr_number(&self, repo_id: Uuid) -> Result { let max_num: Option> = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo_id)) .select_only() .column_as(pull_request::Column::Number.max(), "max_num") .into_tuple::>() .one(&self.db) .await?; Ok(max_num.flatten().unwrap_or(0) + 1) } pub async fn pull_request_create( &self, namespace: String, repo_name: String, request: PullRequestCreateRequest, ctx: &Session, ) -> Result { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let repo: repo::Model = self .utils_check_repo_admin(namespace.clone(), repo_name.clone(), ctx) .await?; let number = self.next_pr_number(repo.id).await?; let now = Utc::now(); let status = if request.draft { PrStatus::Draft } else { PrStatus::Open }; let active = pull_request::ActiveModel { repo: Set(repo.id), number: Set(number), issue: Set(Uuid::now_v7()), title: Set(request.title), body: Set(request.body), author: Set(user_uid), base: Set(request.base), head: Set(request.head), status: Set(status.to_string()), merged_by: Set(None), created_at: Set(now), updated_at: Set(now), merged_at: Set(None), created_by_ai: Set(false), ..Default::default() }; let model = active.insert(&self.db).await?; let actor_username = user::Entity::find_by_id(user_uid) .one(&self.db) .await .ok() .flatten() .map(|u| u.username) .unwrap_or_default(); let _ = self .project_log_activity( repo.project, Some(repo.id), user_uid, super::super::project::activity::ActivityLogParams { event_type: "pr_open".to_string(), title: format!( "{} opened pull request #{}: {}", actor_username, number, model.title ), repo_id: Some(repo.id), content: None, event_id: None, event_sub_id: Some(number), metadata: Some(serde_json::json!({ "base": model.base, "head": model.head, })), is_private: false, }, ) .await; if repo.ai_code_review_enabled && !request.draft { let this = self.clone(); let namespace = namespace.clone(); let repo_name = repo.repo_name.clone(); let pr_number = Some(number); let repo_for_bg = repo.clone(); tokio::spawn(async move { let _ = this .trigger_ai_code_review_internal( namespace, repo_name, pr_number, None, repo_for_bg, ) .await; }); } Ok(PullRequestResponse::from(model)) } /// Update a PR (title, body, base, draft status). /// Only the PR author OR admin/owner can update. pub async fn pull_request_update( &self, namespace: String, repo_name: String, number: i64, request: PullRequestUpdateRequest, ctx: &Session, ) -> Result { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let repo: repo::Model = self .utils_check_repo_admin(namespace.clone(), repo_name.clone(), ctx) .await?; let pr = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Number.eq(number)) .one(&self.db) .await? .ok_or(AppError::NotFound("Pull request not found".to_string()))?; // Permission: author OR admin/owner let is_author = pr.author == user_uid; let member = project_members::Entity::find() .filter(project_members::Column::Project.eq(repo.project)) .filter(project_members::Column::User.eq(user_uid)) .one(&self.db) .await? .ok_or(AppError::NoPower)?; let role = member.scope_role().map_err(|_| AppError::RoleParseError)?; let is_admin = role == MemberRole::Admin || role == MemberRole::Owner; if !is_author && !is_admin { return Err(AppError::NoPower); } // Cannot update a merged PR if pr.status == PrStatus::Merged.to_string() { return Err(AppError::BadRequest( "Cannot update a merged pull request".to_string(), )); } let mut active: pull_request::ActiveModel = pr.clone().into(); if let Some(title) = request.title { active.title = Set(title); } if let Some(body) = request.body { active.body = Set(Some(body)); } if let Some(base) = request.base { active.base = Set(base); } if let Some(draft) = request.draft { active.status = Set(if draft { PrStatus::Draft.to_string() } else { PrStatus::Open.to_string() }); } active.updated_at = Set(Utc::now()); let model = active.update(&self.db).await?; super::invalidate_pr_cache(&self.cache, repo.id, number).await; let actor_username = user::Entity::find_by_id(user_uid) .one(&self.db) .await .ok() .flatten() .map(|u| u.username) .unwrap_or_default(); let _ = self .project_log_activity( repo.project, Some(repo.id), user_uid, super::super::project::activity::ActivityLogParams { event_type: "pr_update".to_string(), title: format!("{} updated pull request #{}", actor_username, number), repo_id: Some(repo.id), content: Some(model.title.clone()), event_id: None, event_sub_id: Some(number), metadata: None, is_private: false, }, ) .await; Ok(PullRequestResponse::from(model)) } /// Close a pull request. Author OR admin/owner only. pub async fn pull_request_close( &self, namespace: String, repo_name: String, number: i64, ctx: &Session, ) -> Result { self.pr_set_status(namespace, repo_name, number, PrStatus::Closed, ctx) .await } /// Reopen a pull request. Author OR admin/owner only. pub async fn pull_request_reopen( &self, namespace: String, repo_name: String, number: i64, ctx: &Session, ) -> Result { self.pr_set_status(namespace, repo_name, number, PrStatus::Open, ctx) .await } pub async fn pull_request_delete( &self, namespace: String, repo_name: String, number: i64, ctx: &Session, ) -> Result<(), AppError> { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let repo: repo::Model = self .utils_check_repo_admin(namespace.clone(), repo_name.clone(), ctx) .await?; let pr = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Number.eq(number)) .one(&self.db) .await? .ok_or(AppError::NotFound("Pull request not found".to_string()))?; // Permission: author OR admin/owner let is_author = pr.author == user_uid; let member = project_members::Entity::find() .filter(project_members::Column::Project.eq(repo.project)) .filter(project_members::Column::User.eq(user_uid)) .one(&self.db) .await? .ok_or(AppError::NoPower)?; let role = member.scope_role().map_err(|_| AppError::RoleParseError)?; let is_admin = role == MemberRole::Admin || role == MemberRole::Owner; if !is_author && !is_admin { return Err(AppError::NoPower); } // Cascade delete related records models::pull_request::PullRequestCommit::delete_many() .filter(models::pull_request::pull_request_commit::Column::Repo.eq(repo.id)) .filter(models::pull_request::pull_request_commit::Column::Number.eq(number)) .exec(&self.db) .await?; models::pull_request::PullRequestReview::delete_many() .filter(models::pull_request::pull_request_review::Column::Repo.eq(repo.id)) .filter(models::pull_request::pull_request_review::Column::Number.eq(number)) .exec(&self.db) .await?; models::pull_request::PullRequestReviewComment::delete_many() .filter(models::pull_request::pull_request_review_comment::Column::Repo.eq(repo.id)) .filter(models::pull_request::pull_request_review_comment::Column::Number.eq(number)) .exec(&self.db) .await?; pull_request::Entity::delete_by_id((repo.id, number)) .exec(&self.db) .await?; super::invalidate_pr_cache(&self.cache, repo.id, number).await; let pr_title = pr.title.clone(); let _ = self .project_log_activity( repo.project, Some(repo.id), user_uid, ActivityLogParams { event_type: "pr_delete".to_string(), title: format!( "{} deleted pull request #{}: {}", user_uid, number, pr_title ), repo_id: Some(repo.id), content: Some(pr_title.clone()), event_id: None, event_sub_id: Some(number), metadata: Some(serde_json::json!({ "pr_number": number, "pr_title": pr_title, "base": pr.base, "head": pr.head, })), is_private: false, }, ) .await; Ok(()) } /// Get PR summary counts. pub async fn pull_request_summary( &self, namespace: String, repo_name: String, ctx: &Session, ) -> Result { let repo = self.utils_find_repo(namespace, repo_name, ctx).await?; let total: u64 = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .count(&self.db) .await?; let open: u64 = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Status.eq(PrStatus::Open.to_string())) .count(&self.db) .await?; let merged: u64 = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Status.eq(PrStatus::Merged.to_string())) .count(&self.db) .await?; let closed: u64 = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Status.eq(PrStatus::Closed.to_string())) .count(&self.db) .await?; Ok(PullRequestSummaryResponse { total, open, merged, closed, }) } async fn pr_set_status( &self, namespace: String, repo_name: String, number: i64, status: PrStatus, ctx: &Session, ) -> Result { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let repo: repo::Model = self .utils_check_repo_admin(namespace.clone(), repo_name.clone(), ctx) .await?; let pr = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Number.eq(number)) .one(&self.db) .await? .ok_or(AppError::NotFound("Pull request not found".to_string()))?; // Permission: author OR admin/owner let is_author = pr.author == user_uid; let member = project_members::Entity::find() .filter(project_members::Column::Project.eq(repo.project)) .filter(project_members::Column::User.eq(user_uid)) .one(&self.db) .await? .ok_or(AppError::NoPower)?; let role = member.scope_role().map_err(|_| AppError::RoleParseError)?; let is_admin = role == MemberRole::Admin || role == MemberRole::Owner; if !is_author && !is_admin { return Err(AppError::NoPower); } if pr.status == PrStatus::Merged.to_string() { return Err(AppError::BadRequest( "Cannot modify a merged pull request".to_string(), )); } let mut active: pull_request::ActiveModel = pr.clone().into(); active.status = Set(status.to_string()); active.updated_at = Set(Utc::now()); let model = active.update(&self.db).await?; super::invalidate_pr_cache(&self.cache, repo.id, number).await; let actor_username = user::Entity::find_by_id(user_uid) .one(&self.db) .await .ok() .flatten() .map(|u| u.username) .unwrap_or_default(); let event_type = if status == PrStatus::Closed { "pr_close" } else { "pr_reopen" }; let _ = self .project_log_activity( repo.project, Some(repo.id), user_uid, super::super::project::activity::ActivityLogParams { event_type: event_type.to_string(), title: format!( "{} {} pull request #{}", actor_username, if status == PrStatus::Closed { "closed" } else { "reopened" }, number ), repo_id: Some(repo.id), content: Some(model.title.clone()), event_id: None, event_sub_id: Some(number), metadata: None, is_private: false, }, ) .await; Ok(PullRequestResponse::from(model)) } /// List all commits in a pull request (from base..head). pub async fn pr_commits_list( &self, namespace: String, repo_name: String, pr_number: i64, ctx: &Session, ) -> Result { let repo = self .utils_find_repo(namespace.clone(), repo_name.clone(), ctx) .await?; let pr = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Number.eq(pr_number)) .one(&self.db) .await? .ok_or(AppError::NotFound("Pull request not found".to_string()))?; // Clone repo once so it can be used in both spawn_blocking calls let repo_for_base = repo.clone(); let repo_for_commits = repo.clone(); let (base_oid, head_oid) = tokio::task::spawn_blocking({ let pr = pr.clone(); move || { let domain = crate::git::GitDomain::from_model(repo_for_base)?; let base_ref = if pr.base.starts_with("refs/") { pr.base.clone() } else { format!("refs/heads/{}", pr.base) }; let head_ref = if pr.head.starts_with("refs/") { pr.head.clone() } else { format!("refs/heads/{}", pr.head) }; let base_oid = domain .ref_target(&base_ref) .map_err(|e| crate::git::GitError::Internal(e.to_string()))? .ok_or_else(|| { crate::git::GitError::NotFound(format!( "Base branch '{}' not found", pr.base )) })?; let head_oid = domain .ref_target(&head_ref) .map_err(|e| crate::git::GitError::Internal(e.to_string()))? .ok_or_else(|| { crate::git::GitError::NotFound(format!( "Head branch '{}' not found", pr.head )) })?; Ok::<_, crate::git::GitError>((base_oid, head_oid)) } }) .await .map_err(|e| AppError::InternalServerError(format!("Task join error: {e}")))? .map_err(AppError::from)?; let commits = tokio::task::spawn_blocking(move || { let domain = crate::git::GitDomain::from_model(repo_for_commits)?; let range = format!("{}..{}", base_oid, head_oid); let metas = domain.commit_log(Some(&range), 0, 500)?; Ok::<_, crate::git::GitError>(metas) }) .await .map_err(|e| AppError::InternalServerError(format!("Task join error: {e}")))? .map_err(AppError::from)?; let commits: Vec = commits.into_iter().map(PrCommitResponse::from).collect(); Ok(PrCommitsListResponse { commits }) } /// Get the side-by-side diff for a pull request. /// /// Resolves the PR's base and head branch refs to commit OIDs and /// generates a side-by-side diff suitable for UI rendering. pub async fn pr_diff_side_by_side( &self, namespace: String, repo_name: String, pr_number: i64, query: SideBySideDiffQuery, ctx: &Session, ) -> Result { let repo = self .utils_find_repo(namespace.clone(), repo_name.clone(), ctx) .await?; let pr = pull_request::Entity::find() .filter(pull_request::Column::Repo.eq(repo.id)) .filter(pull_request::Column::Number.eq(pr_number)) .one(&self.db) .await? .ok_or(AppError::NotFound("Pull request not found".to_string()))?; let (base_oid, head_oid) = tokio::task::spawn_blocking(move || { let domain = crate::git::GitDomain::from_model(repo)?; let base_oid = domain .branch_target(&pr.base) .map_err(|e| crate::git::GitError::Internal(e.to_string()))? .ok_or_else(|| { crate::git::GitError::NotFound(format!("Branch '{}' not found", pr.base)) })?; let head_oid = domain .branch_target(&pr.head) .map_err(|e| crate::git::GitError::Internal(e.to_string()))? .ok_or_else(|| { crate::git::GitError::NotFound(format!("Branch '{}' not found", pr.head)) })?; Ok::<_, crate::git::GitError>((base_oid, head_oid)) }) .await .map_err(|e| AppError::InternalServerError(format!("Task join error: {e}")))? .map_err(AppError::from)?; let diff_query = SideBySideDiffQuery { base: base_oid.to_string(), head: head_oid.to_string(), pathspec: query.pathspec, context_lines: query.context_lines, }; self.git_diff_side_by_side(namespace, repo_name, diff_query, ctx) .await } }