diff --git a/libs/api/room/ws.rs b/libs/api/room/ws.rs index bf165d1..915df92 100644 --- a/libs/api/room/ws.rs +++ b/libs/api/room/ws.rs @@ -11,7 +11,7 @@ use service::AppService; use session::Session; const MAX_TEXT_MESSAGE_LEN: usize = 64 * 1024; -const MAX_MESSAGES_PER_SECOND: u32 = 10; +const MAX_MESSAGES_PER_SECOND: u32 = 1000; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60); diff --git a/libs/api/room/ws_universal.rs b/libs/api/room/ws_universal.rs index a2711c3..1f724a7 100644 --- a/libs/api/room/ws_universal.rs +++ b/libs/api/room/ws_universal.rs @@ -19,7 +19,7 @@ use super::ws_handler::WsRequestHandler; use super::ws_types::{WsAction, WsRequest, WsResponse, WsResponseData}; const MAX_TEXT_MESSAGE_LEN: usize = 64 * 1024; -const MAX_MESSAGES_PER_SECOND: u32 = 10; +const MAX_MESSAGES_PER_SECOND: u32 = 1000; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60); const MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(300); diff --git a/libs/fctool/src/project_tools/boards.rs b/libs/fctool/src/project_tools/boards.rs index 5c09252..a04a032 100644 --- a/libs/fctool/src/project_tools/boards.rs +++ b/libs/fctool/src/project_tools/boards.rs @@ -95,7 +95,7 @@ pub async fn list_boards_exec( .map(|card| { serde_json::json!({ "id": card.id.to_string(), - "issue_id": card.issue_id, + "issue_id": card.issue_id.map(|id| id.to_string()), "title": card.title, "description": card.description, "position": card.position, @@ -305,6 +305,10 @@ pub async fn create_board_card_exec( .get("assignee_id") .and_then(|v| Uuid::parse_str(v.as_str()?).ok()); + let issue_id = args + .get("issue_id") + .and_then(|v| v.as_i64()); + // Verify board belongs to project let board = ProjectBoard::find_by_id(board_id) .one(db) @@ -356,7 +360,7 @@ pub async fn create_board_card_exec( let active = project_board_card::ActiveModel { id: Set(Uuid::now_v7()), column: Set(target_column.id), - issue_id: Set(None), + issue_id: Set(issue_id), project: Set(Some(project_id)), title: Set(title), description: Set(description), @@ -381,6 +385,7 @@ pub async fn create_board_card_exec( "description": model.description, "position": model.position, "assignee_id": model.assignee_id.map(|id| id.to_string()), + "issue_id": model.issue_id.map(|id| id.to_string()), "priority": model.priority, "created_at": model.created_at.to_rfc3339(), "updated_at": model.updated_at.to_rfc3339(), @@ -468,6 +473,10 @@ pub async fn update_board_card_exec( active.assignee_id = Set(assignee_id.as_str().and_then(|s| Uuid::parse_str(s).ok())); updated = true; } + if let Some(issue_id) = args.get("issue_id") { + active.issue_id = Set(issue_id.as_i64()); + updated = true; + } if let Some(priority) = args.get("priority") { active.priority = Set(priority.as_str().map(|s| s.to_string())); updated = true; @@ -644,13 +653,18 @@ pub fn create_card_tool_definition() -> ToolDefinition { }); p.insert("assignee_id".into(), ToolParam { name: "assignee_id".into(), param_type: "string".into(), - description: Some("Assignee user UUID. Optional.".into()), + description: Some("Card assignee user UUID. Optional.".into()), + required: false, properties: None, items: None, + }); + p.insert("issue_id".into(), ToolParam { + name: "issue_id".into(), param_type: "integer".into(), + description: Some("Link a project issue NUMBER to this card. Optional.".into()), required: false, properties: None, items: None, }); ToolDefinition::new("project_create_board_card") .description( "Create a card on a Kanban board. If column_id is not provided, \ - the card is added to the first column.", + the card is added to the first column. Optionally link to a project issue.", ) .parameters(ToolSchema { schema_type: "object".into(), @@ -696,6 +710,11 @@ pub fn update_card_tool_definition() -> ToolDefinition { description: Some("New assignee UUID. Optional.".into()), required: false, properties: None, items: None, }); + p.insert("issue_id".into(), ToolParam { + name: "issue_id".into(), param_type: "integer".into(), + description: Some("Link to a project issue number. Set to 0 to unlink. Optional.".into()), + required: false, properties: None, items: None, + }); ToolDefinition::new("project_update_board_card") .description( "Update a board card (title, description, column, position, assignee, priority). \ @@ -723,3 +742,97 @@ pub fn delete_card_tool_definition() -> ToolDefinition { required: Some(vec!["card_id".into()]), }) } + +// ─── create board column ────────────────────────────────────────────────────── + +pub async fn create_board_column_exec( + ctx: ToolContext, + args: serde_json::Value, +) -> Result { + let project_id = ctx.project_id(); + let sender_id = ctx.sender_id().ok_or_else(|| ToolError::ExecutionError("No sender".into()))?; + let db = ctx.db(); + + require_admin(db, project_id, sender_id).await?; + + let board_id = args.get("board_id") + .and_then(|v| Uuid::parse_str(v.as_str()?).ok()) + .ok_or_else(|| ToolError::ExecutionError("board_id is required".into()))?; + + let name = args.get("name").and_then(|v| v.as_str()) + .ok_or_else(|| ToolError::ExecutionError("name is required".into()))? + .to_string(); + + let color = args.get("color").and_then(|v| v.as_str()).map(|s| s.to_string()); + + let board = ProjectBoard::find_by_id(board_id) + .one(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))? + .ok_or_else(|| ToolError::ExecutionError("Board not found".into()))?; + if board.project != project_id { + return Err(ToolError::ExecutionError("Board does not belong to this project".into())); + } + + let max_pos: Option> = ProjectBoardColumn::find() + .filter(project_board_column::Column::Board.eq(board_id)) + .select_only() + .column_as(project_board_column::Column::Position.max(), "max_pos") + .into_tuple::>() + .one(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))?; + let position = max_pos.flatten().unwrap_or(0) + 1; + + let _now = Utc::now(); + let active = project_board_column::ActiveModel { + id: Set(Uuid::now_v7()), + board: Set(board_id), + name: Set(name.clone()), + position: Set(position), + wip_limit: Set(None), + color: Set(color.clone()), + }; + + let model = active.insert(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))?; + + Ok(serde_json::json!({ + "id": model.id.to_string(), + "board_id": model.board.to_string(), + "name": model.name, + "position": model.position, + "wip_limit": model.wip_limit, + "color": model.color, + })) +} + +pub fn create_column_tool_definition() -> ToolDefinition { + let mut p = HashMap::new(); + p.insert("board_id".into(), ToolParam { + name: "board_id".into(), param_type: "string".into(), + description: Some("Board UUID (required).".into()), + required: true, properties: None, items: None, + }); + p.insert("name".into(), ToolParam { + name: "name".into(), param_type: "string".into(), + description: Some("Column name (required).".into()), + required: true, properties: None, items: None, + }); + p.insert("color".into(), ToolParam { + name: "color".into(), param_type: "string".into(), + description: Some("Column color (e.g. '#ff0000'). Optional.".into()), + required: false, properties: None, items: None, + }); + ToolDefinition::new("project_create_board_column") + .description( + "Create a new column on a Kanban board. \ + The column is appended at the end. Requires admin or owner role.", + ) + .parameters(ToolSchema { + schema_type: "object".into(), + properties: Some(p), + required: Some(vec!["board_id".into(), "name".into()]), + }) +} diff --git a/libs/fctool/src/project_tools/issues.rs b/libs/fctool/src/project_tools/issues.rs index 1a20391..c4e6f55 100644 --- a/libs/fctool/src/project_tools/issues.rs +++ b/libs/fctool/src/project_tools/issues.rs @@ -555,3 +555,264 @@ pub fn update_tool_definition() -> ToolDefinition { required: Some(vec!["number".into()]), }) } + +// ─── assign ──────────────────────────────────────────────────────────────────── + +/// Assign or unassign users to/from an issue. +pub async fn assign_issue_exec( + ctx: ToolContext, + args: serde_json::Value, +) -> Result { + let project_id = ctx.project_id(); + let sender_id = ctx.sender_id().ok_or_else(|| ToolError::ExecutionError("No sender".into()))?; + let db = ctx.db(); + + let number = args.get("number").and_then(|v| v.as_i64()) + .ok_or_else(|| ToolError::ExecutionError("number is required".into()))?; + + let issue = Issue::find() + .filter(issue::Column::Project.eq(project_id)) + .filter(issue::Column::Number.eq(number)) + .one(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))? + .ok_or_else(|| ToolError::ExecutionError(format!("Issue #{} not found", number)))?; + + require_issue_modifier(db, project_id, sender_id, issue.author).await?; + + let add_ids: Vec = args.get("add_user_ids") + .and_then(|v| v.as_array()) + .map(|a| a.iter().filter_map(|v| Uuid::parse_str(v.as_str()?).ok()).collect()) + .unwrap_or_default(); + + let remove_ids: Vec = args.get("remove_user_ids") + .and_then(|v| v.as_array()) + .map(|a| a.iter().filter_map(|v| Uuid::parse_str(v.as_str()?).ok()).collect()) + .unwrap_or_default(); + + let now = Utc::now(); + + for uid in &add_ids { + let exists = IssueAssignee::find() + .filter(issue_assignee::Column::Issue.eq(issue.id)) + .filter(issue_assignee::Column::User.eq(*uid)) + .one(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))?; + if exists.is_some() { + continue; + } + let am = issue_assignee::ActiveModel { + issue: Set(issue.id), + user: Set(*uid), + assigned_at: Set(now), + ..Default::default() + }; + am.insert(db).await.map_err(|e| ToolError::ExecutionError(e.to_string()))?; + } + + for uid in &remove_ids { + IssueAssignee::delete_many() + .filter(issue_assignee::Column::Issue.eq(issue.id)) + .filter(issue_assignee::Column::User.eq(*uid)) + .exec(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))?; + } + + // Build response + let current_assignee_ids: Vec = IssueAssignee::find() + .filter(issue_assignee::Column::Issue.eq(issue.id)) + .all(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))? + .into_iter() + .map(|a| a.user) + .collect(); + + let users = if current_assignee_ids.is_empty() { + Vec::new() + } else { + User::find() + .filter(models::users::user::Column::Uid.is_in(current_assignee_ids.clone())) + .all(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))? + }; + + Ok(serde_json::json!({ + "issue_id": issue.id.to_string(), + "issue_number": issue.number, + "assignees": users.into_iter().map(|u| serde_json::json!({ + "id": u.uid.to_string(), + "username": u.username, + "display_name": u.display_name, + })).collect::>(), + })) +} + +pub fn assign_tool_definition() -> ToolDefinition { + let mut p = HashMap::new(); + p.insert("number".into(), ToolParam { + name: "number".into(), param_type: "integer".into(), + description: Some("Issue number (required).".into()), + required: true, properties: None, items: None, + }); + p.insert("add_user_ids".into(), ToolParam { + name: "add_user_ids".into(), param_type: "array".into(), + description: Some("Array of user UUIDs to add as assignees. Optional.".into()), + required: false, properties: None, + items: Some(Box::new(ToolParam { + name: "".into(), param_type: "string".into(), + description: Some("User UUID".into()), required: false, properties: None, items: None, + })), + }); + p.insert("remove_user_ids".into(), ToolParam { + name: "remove_user_ids".into(), param_type: "array".into(), + description: Some("Array of user UUIDs to remove from assignees. Optional.".into()), + required: false, properties: None, + items: Some(Box::new(ToolParam { + name: "".into(), param_type: "string".into(), + description: Some("User UUID".into()), required: false, properties: None, items: None, + })), + }); + ToolDefinition::new("project_assign_issue") + .description( + "Add or remove assignees on an issue by its number. \ + Requires the issue author or a project admin/owner. \ + Returns the updated list of assignees.", + ) + .parameters(ToolSchema { + schema_type: "object".into(), + properties: Some(p), + required: Some(vec!["number".into()]), + }) +} + +// ─── add comment ─────────────────────────────────────────────────────────────── + +pub async fn add_comment_exec( + ctx: ToolContext, + args: serde_json::Value, +) -> Result { + let project_id = ctx.project_id(); + let sender_id = ctx.sender_id().ok_or_else(|| ToolError::ExecutionError("No sender".into()))?; + let db = ctx.db(); + + let number = args.get("number").and_then(|v| v.as_i64()) + .ok_or_else(|| ToolError::ExecutionError("number is required".into()))?; + + let body = args.get("body").and_then(|v| v.as_str()) + .ok_or_else(|| ToolError::ExecutionError("body is required".into()))? + .to_string(); + + let issue = Issue::find() + .filter(issue::Column::Project.eq(project_id)) + .filter(issue::Column::Number.eq(number)) + .one(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))? + .ok_or_else(|| ToolError::ExecutionError(format!("Issue #{} not found", number)))?; + + // Only project members can comment + let member = ProjectMember::find() + .filter(project_members::Column::Project.eq(project_id)) + .filter(project_members::Column::User.eq(sender_id)) + .one(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))?; + if member.is_none() { + return Err(ToolError::ExecutionError("You are not a member of this project".into())); + } + + let now = Utc::now(); + let comment = models::issues::issue_comment::ActiveModel { + id: sea_orm::NotSet, + issue: Set(issue.id), + author: Set(sender_id), + body: Set(body.clone()), + created_at: Set(now), + updated_at: Set(now), + }; + let model = comment.insert(db).await + .map_err(|e| ToolError::ExecutionError(e.to_string()))?; + + // Update issue updated_at + let mut i_active: issue::ActiveModel = issue.into(); + i_active.updated_at = Set(now); + i_active.update(db).await.map_err(|e| ToolError::ExecutionError(e.to_string()))?; + + // Look up author name + let author_name = User::find_by_id(sender_id).one(db).await + .map_err(|e| ToolError::ExecutionError(e.to_string()))? + .map(|u| u.display_name.unwrap_or(u.username)); + + Ok(serde_json::json!({ + "comment_id": model.id.to_string(), + "issue_number": number, + "body": body, + "author_id": sender_id.to_string(), + "author_name": author_name, + "created_at": now.to_rfc3339(), + })) +} + +pub fn add_comment_tool_definition() -> ToolDefinition { + let mut p = HashMap::new(); + p.insert("number".into(), ToolParam { + name: "number".into(), param_type: "integer".into(), + description: Some("Issue number (required).".into()), + required: true, properties: None, items: None, + }); + p.insert("body".into(), ToolParam { + name: "body".into(), param_type: "string".into(), + description: Some("Comment body text (required).".into()), + required: true, properties: None, items: None, + }); + ToolDefinition::new("project_add_comment") + .description( + "Add a comment to an issue in the current project by its number. \ + Requires project membership. Returns the created comment.", + ) + .parameters(ToolSchema { + schema_type: "object".into(), + properties: Some(p), + required: Some(vec!["number".into(), "body".into()]), + }) +} + +// ─── list labels ─────────────────────────────────────────────────────────────── + +pub async fn list_labels_exec( + ctx: ToolContext, + _args: serde_json::Value, +) -> Result { + let project_id = ctx.project_id(); + let db = ctx.db(); + + // Get labels associated with this project via issue_labels + let labels = Label::find() + .filter(label::Column::Project.eq(project_id)) + .all(db) + .await + .map_err(|e| ToolError::ExecutionError(e.to_string()))?; + + let result: Vec = labels.into_iter().map(|l| { + serde_json::json!({ + "id": l.id, + "name": l.name, + "color": l.color, + }) + }).collect(); + + Ok(serde_json::to_value(result).map_err(|e| ToolError::ExecutionError(e.to_string()))?) +} + +pub fn list_labels_tool_definition() -> ToolDefinition { + ToolDefinition::new("project_list_labels") + .description( + "List all labels available in the current project. \ + Returns label id, name, color, and description. \ + Use label IDs when creating or updating issues.", + ) +} diff --git a/libs/fctool/src/project_tools/mod.rs b/libs/fctool/src/project_tools/mod.rs index 99f6096..62009d4 100644 --- a/libs/fctool/src/project_tools/mod.rs +++ b/libs/fctool/src/project_tools/mod.rs @@ -17,11 +17,15 @@ use agent::{ToolHandler, ToolRegistry}; pub use arxiv::arxiv_search_exec; pub use boards::{ - create_board_card_exec, create_board_exec, delete_board_card_exec, list_boards_exec, + create_board_card_exec, create_board_exec, create_board_column_exec, + delete_board_card_exec, list_boards_exec, update_board_card_exec, update_board_exec, }; pub use curl::curl_exec; -pub use issues::{create_issue_exec, list_issues_exec, update_issue_exec}; +pub use issues::{ + add_comment_exec, assign_issue_exec, create_issue_exec, list_issues_exec, + list_labels_exec, update_issue_exec, +}; pub use members::list_members_exec; pub use repos::{create_commit_exec, create_repo_exec, list_repos_exec, update_repo_exec}; @@ -75,6 +79,18 @@ pub fn register_all(registry: &mut ToolRegistry) { issues::update_tool_definition(), ToolHandler::new(|ctx, args| Box::pin(update_issue_exec(ctx, args))), ); + registry.register( + issues::assign_tool_definition(), + ToolHandler::new(|ctx, args| Box::pin(assign_issue_exec(ctx, args))), + ); + registry.register( + issues::add_comment_tool_definition(), + ToolHandler::new(|ctx, args| Box::pin(add_comment_exec(ctx, args))), + ); + registry.register( + issues::list_labels_tool_definition(), + ToolHandler::new(|ctx, args| Box::pin(list_labels_exec(ctx, args))), + ); // boards registry.register( @@ -101,4 +117,8 @@ pub fn register_all(registry: &mut ToolRegistry) { boards::delete_card_tool_definition(), ToolHandler::new(|ctx, args| Box::pin(delete_board_card_exec(ctx, args))), ); + registry.register( + boards::create_column_tool_definition(), + ToolHandler::new(|ctx, args| Box::pin(create_board_column_exec(ctx, args))), + ); } diff --git a/libs/git/hook/pool/worker.rs b/libs/git/hook/pool/worker.rs index f104ddc..f59d045 100644 --- a/libs/git/hook/pool/worker.rs +++ b/libs/git/hook/pool/worker.rs @@ -11,8 +11,12 @@ use models::EntityTrait; use sea_orm::{ColumnTrait, QueryFilter}; use std::sync::Arc; use std::time::Duration; + use tokio_util::sync::CancellationToken; +/// Git zero OID for new branch/tag creation webhook events. +const ZERO_OID: &str = "0000000000000000000000000000000000000000"; + /// Single-threaded worker that sequentially consumes tasks from Redis queues. /// K8s can scale replicas for concurrency — each replica runs one worker. /// Per-repo Redis locking is managed inside HookMetaDataSync methods. @@ -168,29 +172,33 @@ impl HookWorker { ))); } - // Capture before tips for webhook diff + // Build sync once and reuse for before_tips + sync + after_tips + // (avoids opening git2::Repository 3 times) + let db_for_sync = self.db.clone(); + let cache_for_sync = self.cache.clone(); + let repo_for_sync = repo.clone(); + let sync = tokio::task::spawn_blocking(move || { + HookMetaDataSync::new(db_for_sync, cache_for_sync, repo_for_sync) + .map_err(|e| GitError::Internal(e.to_string())) + }) + .await + .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? + .map_err(GitError::from)?; + + // Capture before tips for webhook diff (read-only, no lock needed) let before_tips = tokio::task::spawn_blocking({ - let db = self.db.clone(); - let cache = self.cache.clone(); - let repo = repo.clone(); - move || { - let sync = HookMetaDataSync::new(db, cache, repo) - .map_err(|e| GitError::Internal(e.to_string()))?; - Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) - } + let sync = sync.clone(); + move || Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? .map_err(GitError::from)?; // Run full sync (internally acquires/releases per-repo lock) - let db = self.db.clone(); - let cache = self.cache.clone(); - let repo_clone = repo.clone(); - let _sync_result = tokio::task::spawn_blocking(move || { + let sync_clone = sync.clone(); + tokio::task::spawn_blocking(move || { let result = tokio::runtime::Handle::current().block_on(async { - let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone())?; - sync.sync().await + sync_clone.sync().await }); match result { Ok(()) => Ok::<(), GitError>(()), @@ -201,18 +209,10 @@ impl HookWorker { .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e))) .and_then(|r| r.map_err(GitError::from))?; - // Only dispatch webhooks if sync succeeded - - // Capture after tips and dispatch webhooks + // Capture after tips for webhook diff (read-only, no lock needed) let after_tips = tokio::task::spawn_blocking({ - let db = self.db.clone(); - let cache = self.cache.clone(); - let repo = repo.clone(); - move || { - let sync = HookMetaDataSync::new(db, cache, repo) - .map_err(|e| GitError::Internal(e.to_string()))?; - Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) - } + let sync = sync.clone(); + move || Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) }) .await .map_err(|e| GitError::Internal(format!("spawn_blocking join error: {}", e)))? @@ -222,14 +222,18 @@ impl HookWorker { let (after_branch_tips, after_tag_tips) = after_tips; let project = repo.project; - // Resolve namespace once outside the loop + // Resolve namespace for webhook URL construction let namespace = models::projects::Project::find_by_id(project) .one(self.db.reader()) .await + .inspect_err(|e| tracing::warn!(error = %e, project = %project, "hook sync: failed to resolve project namespace")) .ok() .flatten() .map(|p| p.name) - .unwrap_or_default(); + .unwrap_or_else(|| { + tracing::warn!(project = %project, "hook sync: project not found, empty namespace"); + String::new() + }); let repo_id_str = repo.id.to_string(); let repo_name = repo.repo_name.clone(); @@ -248,7 +252,7 @@ impl HookWorker { let changed = before_oid.map(|o| o != after_oid.as_str()).unwrap_or(true); if changed { branch_changes += 1; - let before_oid = before_oid.map_or("0", |v| v).to_string(); + let before_oid = before_oid.map_or(ZERO_OID, |v| v).to_string(); let branch_name = branch.clone(); let h = tokio::spawn({ let http_client = http_client.clone(); @@ -294,7 +298,7 @@ impl HookWorker { if is_new || was_updated { tag_changes += 1; changed_tag_names.push(tag.clone()); - let before_oid = before_oid.map_or("0", |v| v).to_string(); + let before_oid = before_oid.map_or(ZERO_OID, |v| v).to_string(); let tag_name = tag.clone(); let h = tokio::spawn({ let http_client = http_client.clone(); diff --git a/libs/git/hook/sync/commit.rs b/libs/git/hook/sync/commit.rs index 0598265..985027a 100644 --- a/libs/git/hook/sync/commit.rs +++ b/libs/git/hook/sync/commit.rs @@ -137,8 +137,36 @@ impl HookMetaDataSync { let (branches, _) = self.collect_git_refs()?; - // Auto-detect first local branch when default_branch is empty + // Preferred default branch names, in priority order. + // git2::References iteration order is filesystem-dependent (not chronological), + // so we MUST NOT use "first branch wins". + const PREFERRED_BRANCHES: &[&str] = &["main", "master", "trunk"]; + + // Auto-detect default branch when empty. + // Re-read from DB inside the transaction to avoid stale reads from concurrent workers. let mut auto_detected_branch: Option = None; + let current_default: Option = models::repos::repo::Entity::find_by_id(self.repo.id) + .one(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to re-read repo: {}", e)))? + .map(|r| r.default_branch) + .filter(|b| !b.is_empty()); + + if current_default.is_none() { + // Prefer known branch names over first-come + for preferred in PREFERRED_BRANCHES { + if branches.iter().any(|b| b.shorthand == *preferred && b.is_branch && !b.is_remote) { + auto_detected_branch = Some(ToString::to_string(preferred)); + break; + } + } + // Fallback: first local branch + if auto_detected_branch.is_none() { + if let Some(first) = branches.iter().find(|b| b.is_branch && !b.is_remote) { + auto_detected_branch = Some(first.shorthand.clone()); + } + } + } for branch in &branches { if existing_names.contains(&branch.name) { @@ -154,13 +182,7 @@ impl HookMetaDataSync { models::repos::repo_branch::Column::Upstream, sea_orm::prelude::Expr::value(branch.upstream.clone()), ) - .col_expr( - models::repos::repo_branch::Column::Head, - sea_orm::prelude::Expr::value( - branch.is_branch - && branch.shorthand == self.repo.default_branch, - ), - ) + // head is NOT set here — set below in a single pass to avoid N+1 writes .col_expr( models::repos::repo_branch::Column::UpdatedAt, sea_orm::prelude::Expr::value(now), @@ -174,7 +196,8 @@ impl HookMetaDataSync { name: Set(branch.name.clone()), oid: Set(branch.target_oid.clone()), upstream: Set(branch.upstream.clone()), - head: Set(branch.is_branch && branch.shorthand == self.repo.default_branch), + // head defaults to false — will be set below if this is the default branch + head: Set(false), created_at: Set(now), updated_at: Set(now), ..Default::default() @@ -184,15 +207,6 @@ impl HookMetaDataSync { .await .map_err(|e| GitError::IoError(format!("failed to insert branch: {}", e)))?; } - - // Detect first local branch if no default is set - if self.repo.default_branch.is_empty() - && branch.is_branch - && !branch.is_remote - && auto_detected_branch.is_none() - { - auto_detected_branch = Some(branch.shorthand.clone()); - } } if !existing_names.is_empty() { @@ -206,38 +220,52 @@ impl HookMetaDataSync { })?; } - // Persist auto-detected default branch and update head flags + // Persist auto-detected default branch and update head flags. + // Only writes if default_branch is still empty (prevents concurrent overrides). if let Some(ref branch_name) = auto_detected_branch { - models::repos::repo::Entity::update_many() + let updated = models::repos::repo::Entity::update_many() .filter(models::repos::repo::Column::Id.eq(repo_id)) + .filter(models::repos::repo::Column::DefaultBranch.eq("")) .col_expr( models::repos::repo::Column::DefaultBranch, sea_orm::prelude::Expr::value(branch_name.clone()), ) + .col_expr( + models::repos::repo::Column::UpdatedAt, + sea_orm::prelude::Expr::value(now), + ) .exec(txn) .await .map_err(|e| GitError::IoError(format!("failed to set default branch: {}", e)))?; - models::repos::repo_branch::Entity::update_many() - .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) - .col_expr( - models::repos::repo_branch::Column::Head, - sea_orm::prelude::Expr::value(false), - ) - .exec(txn) - .await - .map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?; + if updated.rows_affected > 0 { + models::repos::repo_branch::Entity::update_many() + .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) + .col_expr( + models::repos::repo_branch::Column::Head, + sea_orm::prelude::Expr::value(false), + ) + .exec(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to clear head flags: {}", e)))?; - models::repos::repo_branch::Entity::update_many() - .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) - .filter(models::repos::repo_branch::Column::Name.eq(branch_name)) - .col_expr( - models::repos::repo_branch::Column::Head, - sea_orm::prelude::Expr::value(true), - ) - .exec(txn) - .await - .map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?; + models::repos::repo_branch::Entity::update_many() + .filter(models::repos::repo_branch::Column::Repo.eq(repo_id)) + .filter(models::repos::repo_branch::Column::Name.eq(branch_name)) + .col_expr( + models::repos::repo_branch::Column::Head, + sea_orm::prelude::Expr::value(true), + ) + .exec(txn) + .await + .map_err(|e| GitError::IoError(format!("failed to set head flag: {}", e)))?; + } else { + tracing::debug!( + repo_id = %repo_id, + attempted = %branch_name, + "default_branch already set by another worker, skipping" + ); + } } Ok(()) diff --git a/libs/git/http/rate_limit.rs b/libs/git/http/rate_limit.rs index 88f508e..e56a759 100644 --- a/libs/git/http/rate_limit.rs +++ b/libs/git/http/rate_limit.rs @@ -1,6 +1,8 @@ //! HTTP rate limiting for git operations. //! -//! Uses a token-bucket approach with per-IP and per-repo-write limits. +//! Uses a token-bucket approach with per-repo-write limits. +//! In K8s environments all traffic routes through the ingress so +//! per-IP limiting is meaningless — a fixed global key is used instead. //! Cleanup runs every 5 minutes to prevent unbounded memory growth. use std::collections::HashMap; @@ -55,20 +57,18 @@ impl RateLimiter { } } - pub async fn is_ip_read_allowed(&self, ip: &str) -> bool { - let key = format!("ip:read:{}", ip); - self.is_allowed(&key, BucketOp::Read, self.config.read_requests_per_window) + pub async fn is_read_allowed(&self) -> bool { + self.is_allowed("global:read", BucketOp::Read, self.config.read_requests_per_window) .await } - pub async fn is_ip_write_allowed(&self, ip: &str) -> bool { - let key = format!("ip:write:{}", ip); - self.is_allowed(&key, BucketOp::Write, self.config.write_requests_per_window) + pub async fn is_write_allowed(&self) -> bool { + self.is_allowed("global:write", BucketOp::Write, self.config.write_requests_per_window) .await } - pub async fn is_repo_write_allowed(&self, ip: &str, repo_path: &str) -> bool { - let key = format!("repo:write:{}:{}", ip, repo_path); + pub async fn is_repo_write_allowed(&self, repo_path: &str) -> bool { + let key = format!("repo:write:{}", repo_path); self.is_allowed(&key, BucketOp::Write, self.config.write_requests_per_window) .await } @@ -107,8 +107,8 @@ impl RateLimiter { true } - pub async fn retry_after(&self, ip: &str) -> u64 { - let key_read = format!("ip:read:{}", ip); + pub async fn retry_after(&self) -> u64 { + let key_read = "global:read".to_string(); let now = Instant::now(); let buckets = self.buckets.read().await; @@ -148,8 +148,8 @@ mod tests { })); for _ in 0..3 { - assert!(limiter.is_ip_read_allowed("1.2.3.4").await); + assert!(limiter.is_read_allowed().await); } - assert!(!limiter.is_ip_read_allowed("1.2.3.4").await); + assert!(!limiter.is_read_allowed().await); } } diff --git a/libs/git/http/routes.rs b/libs/git/http/routes.rs index 6a74404..289cbdc 100644 --- a/libs/git/http/routes.rs +++ b/libs/git/http/routes.rs @@ -13,8 +13,7 @@ pub async fn info_refs( path: web::Path<(String, String)>, state: web::Data, ) -> Result { - let ip = extract_ip(&req); - if !state.rate_limiter.is_ip_read_allowed(&ip).await { + if !state.rate_limiter.is_read_allowed().await { return Err(actix_web::error::ErrorTooManyRequests( "Rate limit exceeded", )); @@ -47,8 +46,7 @@ pub async fn upload_pack( payload: web::Payload, state: web::Data, ) -> Result { - let ip = extract_ip(&req); - if !state.rate_limiter.is_ip_read_allowed(&ip).await { + if !state.rate_limiter.is_read_allowed().await { return Err(actix_web::error::ErrorTooManyRequests( "Rate limit exceeded", )); @@ -69,8 +67,7 @@ pub async fn receive_pack( payload: web::Payload, state: web::Data, ) -> Result { - let ip = extract_ip(&req); - if !state.rate_limiter.is_ip_write_allowed(&ip).await { + if !state.rate_limiter.is_write_allowed().await { return Err(actix_web::error::ErrorTooManyRequests( "Rate limit exceeded", )); @@ -98,10 +95,3 @@ pub async fn receive_pack( result } - -fn extract_ip(req: &HttpRequest) -> String { - req.connection_info() - .realip_remote_addr() - .unwrap_or("unknown") - .to_string() -} diff --git a/libs/models/rooms/room_ai.rs b/libs/models/rooms/room_ai.rs index 7ed2718..f37c278 100644 --- a/libs/models/rooms/room_ai.rs +++ b/libs/models/rooms/room_ai.rs @@ -20,7 +20,9 @@ pub struct Model { pub think: bool, pub stream: bool, pub min_score: Option, - /// Agent type: "chat" (default) or "react" for ReAct reasoning agent. + /// Agent type: "chat" (default), "react" (ReAct reasoning), + /// "cot" (Chain-of-Thought), "rewoo" (Plan→Execute→Synthesize), + /// or "reflexion" (Generate→Critique→Revise). pub agent_type: Option, pub created_at: DateTimeUtc, pub updated_at: DateTimeUtc, diff --git a/libs/queue/producer.rs b/libs/queue/producer.rs index e8308b4..382f7eb 100644 --- a/libs/queue/producer.rs +++ b/libs/queue/producer.rs @@ -2,7 +2,7 @@ use crate::types::{ AgentTaskEvent, EmailEnvelope, ProjectRoomEvent, ReactionGroup, RoomMessageEnvelope, - RoomMessageEvent, + RoomMessageEvent, RoomMessageStreamChunkEvent, }; use anyhow::Context; use metrics::counter; @@ -19,7 +19,7 @@ pub struct RedisPubSub { impl RedisPubSub { /// Publish a serialised event to a Redis channel. - async fn publish_channel(&self, channel: &str, payload: &[u8]) { + pub async fn publish_channel(&self, channel: &str, payload: &[u8]) { let redis = match (self.get_redis)().await { Ok(Ok(c)) => c, Ok(Err(e)) => { @@ -145,6 +145,24 @@ impl MessageProducer { Ok(entry_id) } + /// Publish a stream chunk event via Redis Pub/Sub for cross-node delivery. + /// Called alongside the in-process broadcast to ensure WS clients on + /// other server instances also receive the chunk. + pub async fn publish_stream_chunk(&self, event: &RoomMessageStreamChunkEvent) { + let Some(pubsub) = &self.pubsub else { + return; + }; + let channel = format!("room:stream:chunk:{}", event.room_id); + let payload = match serde_json::to_vec(event) { + Ok(p) => p, + Err(e) => { + tracing::error!(error = %e, "serialise stream chunk failed"); + return; + } + }; + pubsub.publish_channel(&channel, &payload).await; + } + /// Publish a project-level room event via Pub/Sub (no Redis Stream write). pub async fn publish_project_room_event( &self, diff --git a/libs/service/agent/billing.rs b/libs/service/agent/billing.rs index f1a9d24..2b0c2db 100644 --- a/libs/service/agent/billing.rs +++ b/libs/service/agent/billing.rs @@ -2,9 +2,14 @@ use crate::AppService; use crate::error::AppError; +use sea_orm::*; use uuid::Uuid; impl AppService { + /// Record AI usage against a project or workspace. + /// + /// `model_id` is an `ai_model.id`. The active/default model version is resolved + /// internally so callers do not need to distinguish ModelId from ModelVersionId. pub async fn record_ai_usage( &self, project_uid: Uuid, @@ -13,11 +18,22 @@ impl AppService { output_tokens: i64, ) -> Result { use agent::billing::BillingResult; + use models::agents::model_version; + + let version_id = model_version::Entity::find() + .filter(model_version::Column::ModelId.eq(model_id)) + .filter(model_version::Column::Status.eq("active")) + .order_by_desc(model_version::Column::IsDefault) + .order_by_desc(model_version::Column::ReleaseDate) + .one(&self.db) + .await? + .map(|v| v.id) + .unwrap_or(model_id); match agent::billing::record_ai_usage( &self.db, project_uid, - model_id, + version_id, input_tokens, output_tokens, ) diff --git a/libs/service/project/billing.rs b/libs/service/project/billing.rs index 68a9956..0c220b4 100644 --- a/libs/service/project/billing.rs +++ b/libs/service/project/billing.rs @@ -69,14 +69,15 @@ impl AppService { let month_used = project_billing_history::Entity::find() .filter(project_billing_history::Column::Project.eq(project.id)) - .filter(project_billing_history::Column::Reason.eq("ai_usage_monthly")) + .filter(project_billing_history::Column::Reason.like("ai_usage%")) .filter(project_billing_history::Column::CreatedAt.gte(month_start)) .filter(project_billing_history::Column::CreatedAt.lt(next_month_start)) - .order_by_desc(project_billing_history::Column::CreatedAt) - .one(&self.db) + .all(&self.db) .await? + .into_iter() .map(|m| m.amount) - .unwrap_or(Decimal::ZERO); + .sum::(); + let month_used = -month_used; Ok(ProjectBillingCurrentResponse { project_uid: project.id, @@ -155,7 +156,7 @@ impl AppService { .filter(models::projects::project::Column::CreatedBy.eq(uid)) .all(&self.db) .await?; - if existing_projects.is_empty() { + if existing_projects.len() <= 1 { Decimal::from_f64_retain(DEFAULT_PROJECT_MONTHLY_CREDIT).unwrap_or(Decimal::ZERO) } else { Decimal::ZERO diff --git a/libs/service/workspace/alert.rs b/libs/service/workspace/alert.rs index 2aef095..37852ea 100644 --- a/libs/service/workspace/alert.rs +++ b/libs/service/workspace/alert.rs @@ -249,7 +249,8 @@ impl AppService { .unwrap_or_default() .into_iter() .map(|r| r.amount.to_f64().unwrap_or_default()) - .sum() + .sum(); + -month_used } /// Get email addresses for workspace owners and admins who have email notifications enabled. diff --git a/libs/service/workspace/billing.rs b/libs/service/workspace/billing.rs index 99ecce8..7237ca5 100644 --- a/libs/service/workspace/billing.rs +++ b/libs/service/workspace/billing.rs @@ -92,6 +92,7 @@ impl AppService { .into_iter() .map(|m| m.amount.to_f64().unwrap_or_default()) .sum::(); + let month_used = -month_used; Ok(WorkspaceBillingCurrentResponse { workspace_id: ws.id, @@ -188,25 +189,22 @@ impl AppService { let billing = self.ensure_workspace_billing(ws.id, Some(user_uid)).await?; let now_utc = Utc::now(); - let new_balance = - Decimal::from_f64_retain(billing.balance.to_f64().unwrap_or_default() + params.amount) - .unwrap_or(Decimal::ZERO); + let amount_dec = + Decimal::from_f64_retain(params.amount).unwrap_or(Decimal::ZERO); + let new_balance = billing.balance + amount_dec; + let currency = billing.currency.clone(); - let _ = workspace_billing::ActiveModel { - workspace_id: Unchanged(ws.id), - balance: Set(new_balance), - updated_at: Set(now_utc), - ..Default::default() - } - .update(&self.db) - .await; + let mut updated: workspace_billing::ActiveModel = billing.into(); + updated.balance = Set(new_balance); + updated.updated_at = Set(now_utc); + updated.update(&self.db).await?; let _ = workspace_billing_history::ActiveModel { uid: Set(Uuid::now_v7()), workspace_id: Set(ws.id), user_id: Set(Some(user_uid)), amount: Set(Decimal::from_f64_retain(params.amount).unwrap_or(Decimal::ZERO)), - currency: Set(billing.currency.clone()), + currency: Set(currency), reason: Set(params.reason.unwrap_or_else(|| "credit_added".to_string())), extra: Set(None), created_at: Set(now_utc), diff --git a/libs/service/workspace/init.rs b/libs/service/workspace/init.rs index ce91587..7c66063 100644 --- a/libs/service/workspace/init.rs +++ b/libs/service/workspace/init.rs @@ -97,7 +97,7 @@ impl AppService { .filter(workspace_membership::Column::Status.eq("active")) .all(&self.db) .await?; - let initial_balance = if existing_workspaces.len() <= 1 { + let initial_balance = if existing_workspaces.is_empty() { Decimal::from_f64_retain(30.0).unwrap_or(Decimal::ZERO) } else { Decimal::ZERO diff --git a/src/components/room/RoomSettingsPanel.tsx b/src/components/room/RoomSettingsPanel.tsx index 8adac22..f98438b 100644 --- a/src/components/room/RoomSettingsPanel.tsx +++ b/src/components/room/RoomSettingsPanel.tsx @@ -464,7 +464,10 @@ export const RoomSettingsPanel = memo(function RoomSettingsPanel({ style={{ background: 'var(--room-bg)', borderColor: 'var(--room-border)', color: 'var(--room-text)' }} > - {agentType === 'react' ? 'ReAct (multi-step reasoning)' : 'Chat (simple)'} + {agentType === 'react' ? 'ReAct' : + agentType === 'cot' ? 'CoT' : + agentType === 'rewoo' ? 'ReWOO' : + agentType === 'reflexion' ? 'Reflexion' : 'Chat'} @@ -476,6 +479,18 @@ export const RoomSettingsPanel = memo(function RoomSettingsPanel({ ReAct Multi-step + tools + + CoT + Chain-of-Thought + + + ReWOO + Plan → Execute → Synthesize + + + Reflexion + Generate → Critique → Revise + @@ -704,12 +719,12 @@ export const RoomSettingsPanel = memo(function RoomSettingsPanel({ think )} - {config.agent_type === 'react' && ( + {config.agent_type && ['react', 'cot', 'rewoo', 'reflexion'].includes(config.agent_type) && ( - react + {config.agent_type} )}