diff --git a/Cargo.toml b/Cargo.toml index d47bb32..5d956bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ actix-csrf = "0.8.0" actix-rt = "2.11.0" actix = "0.13" async-stream = "0.3" -async-nats = "0.47.0" + actix-service = "2.0.3" actix-utils = "3.0.1" redis = "1.1.0" diff --git a/libs/agent/error.rs b/libs/agent/error.rs index 51320b7..4f7afb9 100644 --- a/libs/agent/error.rs +++ b/libs/agent/error.rs @@ -4,10 +4,36 @@ use thiserror::Error; pub enum AgentError { #[error("openai error: {0}")] OpenAi(String), + #[error("qdrant error: {0}")] Qdrant(String), + #[error("internal error: {0}")] Internal(String), + + /// The task exceeded its timeout limit. + #[error("task {task_id} timed out after {seconds}s")] + Timeout { task_id: i64, seconds: u64 }, + + /// The agent has been rate-limited; retry after the indicated delay. + #[error("rate limited, retry after {retry_after_secs}s")] + RateLimited { retry_after_secs: u64 }, + + /// A transient error that can be retried. + #[error("retryable error (attempt {attempt}): {message}")] + Retryable { attempt: u32, message: String }, + + /// The requested tool is not registered in the tool registry. + #[error("tool not found: {tool}")] + ToolNotFound { tool: String }, + + /// A tool execution failed. + #[error("tool '{tool}' execution failed: {cause}")] + ToolExecutionFailed { tool: String, cause: String }, + + /// The request contains invalid input. + #[error("invalid input in '{field}': {reason}")] + InvalidInput { field: String, reason: String }, } pub type Result = std::result::Result; diff --git a/libs/agent/task/service.rs b/libs/agent/task/service.rs index 801b59c..e19aa61 100644 --- a/libs/agent/task/service.rs +++ b/libs/agent/task/service.rs @@ -5,23 +5,222 @@ //! to the caller — this service only manages task lifecycle and state. use db::database::AppDatabase; -use models::agent_task::{ - ActiveModel, AgentType, Column as C, Entity, Model, TaskStatus, -}; +use models::agent_task::{ActiveModel, AgentType, Column as C, Entity, Model, TaskStatus}; +use models::IssueId; use sea_orm::{ entity::EntityTrait, query::{QueryFilter, QueryOrder, QuerySelect}, ActiveModelTrait, - ColumnTrait, DbErr, + ColumnTrait, + DbErr, }; +use serde::Serialize; +use std::sync::Arc; + +/// Event payload published to WebSocket clients via Redis Pub/Sub. +#[derive(Debug, Clone, Serialize)] +pub struct TaskEvent { + pub task_id: i64, + pub project_id: uuid::Uuid, + pub parent_id: Option, + pub event: String, + pub message: Option, + pub output: Option, + pub error: Option, + pub status: String, +} + +impl TaskEvent { + pub fn started(task_id: i64, project_id: uuid::Uuid, parent_id: Option) -> Self { + Self { + task_id, + project_id, + parent_id, + event: "started".to_string(), + message: None, + output: None, + error: None, + status: TaskStatus::Running.to_string(), + } + } + + pub fn progress( + task_id: i64, + project_id: uuid::Uuid, + parent_id: Option, + msg: String, + ) -> Self { + Self { + task_id, + project_id, + parent_id, + event: "progress".to_string(), + message: Some(msg), + output: None, + error: None, + status: TaskStatus::Running.to_string(), + } + } + + pub fn completed( + task_id: i64, + project_id: uuid::Uuid, + parent_id: Option, + output: String, + ) -> Self { + Self { + task_id, + project_id, + parent_id, + event: "done".to_string(), + message: None, + output: Some(output), + error: None, + status: TaskStatus::Done.to_string(), + } + } + + pub fn failed( + task_id: i64, + project_id: uuid::Uuid, + parent_id: Option, + error: String, + ) -> Self { + Self { + task_id, + project_id, + parent_id, + event: "failed".to_string(), + message: None, + output: None, + error: Some(error), + status: TaskStatus::Failed.to_string(), + } + } + + pub fn cancelled(task_id: i64, project_id: uuid::Uuid, parent_id: Option) -> Self { + Self { + task_id, + project_id, + parent_id, + event: "cancelled".to_string(), + message: None, + output: None, + error: None, + status: TaskStatus::Cancelled.to_string(), + } + } +} + +/// Helper trait for publishing task lifecycle events via Redis Pub/Sub. +/// +/// Callers inject a suitable `publish_fn` at construction time via +/// `TaskEvents::new(...)`. If no publisher is supplied events are silently +/// dropped (graceful degradation on startup). +pub trait TaskEventPublisher: Send + Sync { + fn publish(&self, project_id: uuid::Uuid, event: TaskEvent); +} + +/// No-op publisher used when no Redis Pub/Sub connection is available. +#[derive(Clone, Default)] +pub struct NoOpPublisher; + +impl TaskEventPublisher for NoOpPublisher { + fn publish(&self, _: uuid::Uuid, _: TaskEvent) {} +} + +#[derive(Clone)] +pub struct TaskEvents { + publisher: Arc, +} + +impl TaskEvents { + pub fn new(publisher: impl TaskEventPublisher + 'static) -> Self { + Self { + publisher: Arc::new(publisher), + } + } + + pub fn noop() -> Self { + Self::new(NoOpPublisher) + } + + fn emit(&self, task: &Model, event: TaskEvent) { + self.publisher.publish(task.project_uuid, event); + } + + pub fn emit_started(&self, task: &Model) { + self.emit( + task, + TaskEvent::started(task.id, task.project_uuid, task.parent_id), + ); + } + + pub fn emit_progress(&self, task: &Model, msg: String) { + self.emit( + task, + TaskEvent::progress(task.id, task.project_uuid, task.parent_id, msg), + ); + } + + pub fn emit_completed(&self, task: &Model, output: String) { + self.emit( + task, + TaskEvent::completed(task.id, task.project_uuid, task.parent_id, output), + ); + } + + pub fn emit_failed(&self, task: &Model, error: String) { + self.emit( + task, + TaskEvent::failed(task.id, task.project_uuid, task.parent_id, error), + ); + } + + pub fn emit_cancelled(&self, task: &Model) { + self.emit( + task, + TaskEvent::cancelled(task.id, task.project_uuid, task.parent_id), + ); + } +} + +/// Builder for TaskService so that the events publisher can be set independently +/// of the database connection. +#[derive(Clone, Default)] +pub struct TaskServiceBuilder { + events: Option, +} + +impl TaskServiceBuilder { + pub fn with_events(mut self, events: TaskEvents) -> Self { + self.events = Some(events); + self + } + + pub async fn build(self, db: AppDatabase) -> TaskService { + TaskService { + db, + events: self.events.unwrap_or_else(TaskEvents::noop), + } + } +} /// Service for managing agent tasks (root tasks and sub-tasks). #[derive(Clone)] pub struct TaskService { db: AppDatabase, + events: TaskEvents, } impl TaskService { pub fn new(db: AppDatabase) -> Self { - Self { db } + Self { + db, + events: TaskEvents::noop(), + } + } + + pub fn with_events(db: AppDatabase, events: TaskEvents) -> Self { + Self { db, events } } /// Create a new task (root or sub-task) with status = pending. @@ -31,7 +230,20 @@ impl TaskService { input: impl Into, agent_type: AgentType, ) -> Result { - self.create_with_parent(project_uuid, None, input, agent_type, None).await + self.create_with_parent(project_uuid, None, input, agent_type, None, None) + .await + } + + /// Create a new task bound to an issue. + pub async fn create_for_issue( + &self, + project_uuid: impl Into, + issue_id: IssueId, + input: impl Into, + agent_type: AgentType, + ) -> Result { + self.create_with_parent(project_uuid, None, input, agent_type, None, Some(issue_id)) + .await } /// Create a new sub-task with a parent reference. @@ -43,8 +255,15 @@ impl TaskService { agent_type: AgentType, title: Option, ) -> Result { - self.create_with_parent(project_uuid, Some(parent_id), input, agent_type, title) - .await + self.create_with_parent( + project_uuid, + Some(parent_id), + input, + agent_type, + title, + None, + ) + .await } async fn create_with_parent( @@ -54,10 +273,12 @@ impl TaskService { input: impl Into, agent_type: AgentType, title: Option, + issue_id: Option, ) -> Result { let model = ActiveModel { project_uuid: sea_orm::Set(project_uuid.into()), parent_id: sea_orm::Set(parent_id), + issue_id: sea_orm::Set(issue_id), agent_type: sea_orm::Set(agent_type), status: sea_orm::Set(TaskStatus::Pending), title: sea_orm::Set(title), @@ -70,61 +291,223 @@ impl TaskService { /// Mark a task as running and record the start time. pub async fn start(&self, task_id: i64) -> Result { let model = Entity::find_by_id(task_id).one(&self.db).await?; - let model = model.ok_or_else(|| { - DbErr::RecordNotFound("agent_task not found".to_string()) - })?; + let model = + model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; let mut active: ActiveModel = model.into(); active.status = sea_orm::Set(TaskStatus::Running); active.started_at = sea_orm::Set(Some(chrono::Utc::now().into())); active.updated_at = sea_orm::Set(chrono::Utc::now().into()); - active.update(&self.db).await + let updated = active.update(&self.db).await?; + self.events.emit_started(&updated); + Ok(updated) } /// Update progress text (e.g., "step 2/5: analyzing PR"). - pub async fn update_progress(&self, task_id: i64, progress: impl Into) -> Result<(), DbErr> { + pub async fn update_progress( + &self, + task_id: i64, + progress: impl Into, + ) -> Result<(), DbErr> { let model = Entity::find_by_id(task_id).one(&self.db).await?; - let model = model.ok_or_else(|| { - DbErr::RecordNotFound("agent_task not found".to_string()) - })?; + let model = + model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; + let progress_str = progress.into(); let mut active: ActiveModel = model.into(); - active.progress = sea_orm::Set(Some(progress.into())); + active.progress = sea_orm::Set(Some(progress_str.clone())); active.updated_at = sea_orm::Set(chrono::Utc::now().into()); - active.update(&self.db).await?; + let updated = active.update(&self.db).await?; + self.events.emit_progress(&updated, progress_str); Ok(()) } /// Mark a task as completed with the output text. pub async fn complete(&self, task_id: i64, output: impl Into) -> Result { let model = Entity::find_by_id(task_id).one(&self.db).await?; - let model = model.ok_or_else(|| { - DbErr::RecordNotFound("agent_task not found".to_string()) - })?; + let model = + model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; let mut active: ActiveModel = model.into(); active.status = sea_orm::Set(TaskStatus::Done); - active.output = sea_orm::Set(Some(output.into())); + let out = output.into(); + active.output = sea_orm::Set(Some(out.clone())); active.done_at = sea_orm::Set(Some(chrono::Utc::now().into())); active.updated_at = sea_orm::Set(chrono::Utc::now().into()); - active.update(&self.db).await + let updated = active.update(&self.db).await?; + self.events.emit_completed(&updated, out); + Ok(updated) } /// Mark a task as failed with an error message. pub async fn fail(&self, task_id: i64, error: impl Into) -> Result { let model = Entity::find_by_id(task_id).one(&self.db).await?; - let model = model.ok_or_else(|| { - DbErr::RecordNotFound("agent_task not found".to_string()) - })?; + let model = + model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; let mut active: ActiveModel = model.into(); active.status = sea_orm::Set(TaskStatus::Failed); - active.error = sea_orm::Set(Some(error.into())); + let err = error.into(); + active.error = sea_orm::Set(Some(err.clone())); active.done_at = sea_orm::Set(Some(chrono::Utc::now().into())); active.updated_at = sea_orm::Set(chrono::Utc::now().into()); + let updated = active.update(&self.db).await?; + self.events.emit_failed(&updated, err); + Ok(updated) + } + + /// Propagate child task status up the tree. + /// + /// Only allows cancelling tasks that are not yet in a terminal state + /// (Pending / Running / Paused). + /// + /// Cancelled children are marked done so that `are_children_done()` returns + /// true for the parent after cancellation. + pub async fn cancel(&self, task_id: i64) -> Result { + // Collect all task IDs (parent + descendants) using an explicit stack. + let mut stack = vec![task_id]; + let mut idx = 0; + while idx < stack.len() { + let current = stack[idx]; + let children = Entity::find() + .filter(C::ParentId.eq(current)) + .all(&self.db) + .await?; + for child in children { + stack.push(child.id); + } + idx += 1; + } + + // Mark every collected task as cancelled (terminal state). + for id in &stack { + let model = Entity::find_by_id(*id).one(&self.db).await?; + if let Some(m) = model { + if !m.is_done() { + let mut active: ActiveModel = m.into(); + active.status = sea_orm::Set(TaskStatus::Cancelled); + active.done_at = sea_orm::Set(Some(chrono::Utc::now().into())); + active.updated_at = sea_orm::Set(chrono::Utc::now().into()); + active.update(&self.db).await?; + } + } + } + + let final_model = Entity::find_by_id(task_id) + .one(&self.db) + .await? + .ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; + self.events.emit_cancelled(&final_model); + Ok(final_model) + } + + /// Pause a running or pending task. + /// + /// Pausing a task that is not Pending/Running is a no-op that returns + /// the current model (same behaviour as `start` on an already-running task). + pub async fn pause(&self, task_id: i64) -> Result { + let model = Entity::find_by_id(task_id).one(&self.db).await?; + let model = + model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; + + if !model.is_running() { + // Already in a terminal or paused state — return unchanged. + return Ok(model); + } + + let mut active: ActiveModel = model.into(); + active.status = sea_orm::Set(TaskStatus::Paused); + active.updated_at = sea_orm::Set(chrono::Utc::now().into()); active.update(&self.db).await } + /// Resume a paused task back to Running. + /// + /// Returns an error if the task is not currently Paused. + pub async fn resume(&self, task_id: i64) -> Result { + let model = Entity::find_by_id(task_id).one(&self.db).await?; + let model = + model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; + + if model.status != TaskStatus::Paused { + return Err(DbErr::Custom(format!( + "cannot resume task {}: expected status Paused, got {}", + task_id, model.status + ))); + } + + let mut active: ActiveModel = model.into(); + active.status = sea_orm::Set(TaskStatus::Running); + active.updated_at = sea_orm::Set(chrono::Utc::now().into()); + active.update(&self.db).await + } + + /// Retry a failed or cancelled task by resetting it to Pending. + /// + /// Clears `output`, `error`, and `done_at`; increments `retry_count`. + /// Only tasks in Failed or Cancelled state can be retried. + pub async fn retry(&self, task_id: i64) -> Result { + let model = Entity::find_by_id(task_id).one(&self.db).await?; + let model = + model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; + + match model.status { + TaskStatus::Failed | TaskStatus::Cancelled | TaskStatus::Done => {} + _ => { + return Err(DbErr::Custom(format!( + "cannot retry task {}: only Failed/Cancelled/Done tasks can be retried (got {})", + task_id, model.status + ))); + } + } + + let retry_count = model.retry_count.map(|c| c + 1).unwrap_or(1); + + let mut active: ActiveModel = model.into(); + active.status = sea_orm::Set(TaskStatus::Pending); + active.output = sea_orm::Set(None); + active.error = sea_orm::Set(None); + active.done_at = sea_orm::Set(None); + active.started_at = sea_orm::Set(None); + active.retry_count = sea_orm::Set(Some(retry_count)); + active.updated_at = sea_orm::Set(chrono::Utc::now().into()); + active.update(&self.db).await + } + + /// Propagate child task status up the tree. + /// + /// When a child task reaches a terminal state, checks whether all its + /// siblings are also terminal. If so, marks the parent as failed so that + /// a stuck parent is never left in the `Running` state. + pub async fn propagate_to_parent(&self, task_id: i64) -> Result, DbErr> { + let model = self + .get(task_id) + .await? + .ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?; + + let Some(parent_id) = model.parent_id else { + return Ok(None); + }; + + let siblings = self.children(parent_id).await?; + if siblings.iter().all(|s| s.is_done()) { + let parent = self.get(parent_id).await?.ok_or_else(|| { + DbErr::RecordNotFound(format!("parent task {} not found", parent_id)) + })?; + if parent.is_running() { + let mut active: ActiveModel = parent.into(); + active.status = sea_orm::Set(TaskStatus::Failed); + active.error = + sea_orm::Set(Some("All sub-tasks failed or were cancelled".to_string())); + active.done_at = sea_orm::Set(Some(chrono::Utc::now().into())); + active.updated_at = sea_orm::Set(chrono::Utc::now().into()); + let updated = active.update(&self.db).await?; + return Ok(Some(updated)); + } + } + Ok(None) + } + /// Get a task by ID. pub async fn get(&self, task_id: i64) -> Result, DbErr> { Entity::find_by_id(task_id).one(&self.db).await @@ -140,11 +523,14 @@ impl TaskService { } /// List all active (non-terminal) tasks for a project. - pub async fn active_tasks(&self, project_uuid: impl Into) -> Result, DbErr> { + pub async fn active_tasks( + &self, + project_uuid: impl Into, + ) -> Result, DbErr> { let uuid: uuid::Uuid = project_uuid.into(); Entity::find() .filter(C::ProjectUuid.eq(uuid)) - .filter(C::Status.is_in([TaskStatus::Pending, TaskStatus::Running])) + .filter(C::Status.is_in([TaskStatus::Pending, TaskStatus::Running, TaskStatus::Paused])) .order_by_desc(C::CreatedAt) .all(&self.db) .await @@ -198,12 +584,10 @@ impl TaskService { Ok(()) } - /// Check if all sub-tasks of a given parent are done. + /// Check if all sub-tasks of a given parent are in a terminal state. + /// Returns true if there are no children (empty tree counts as done). pub async fn are_children_done(&self, parent_id: i64) -> Result { let children = self.children(parent_id).await?; - if children.is_empty() { - return Ok(true); - } - Ok(children.iter().all(|c| c.is_done())) + Ok(children.is_empty() || children.iter().all(|c| c.is_done())) } } diff --git a/libs/agent/tool/executor.rs b/libs/agent/tool/executor.rs index 1753fde..b372e93 100644 --- a/libs/agent/tool/executor.rs +++ b/libs/agent/tool/executor.rs @@ -1,7 +1,7 @@ //! Executes tool calls and converts results to OpenAI `tool` messages. -use futures::StreamExt; use futures::stream; +use futures::StreamExt; use async_openai::types::chat::{ ChatCompletionRequestMessage, ChatCompletionRequestToolMessage, @@ -70,8 +70,9 @@ impl ToolExecutor { ctx.increment_tool_calls(); let concurrency = self.max_concurrency; - use std::sync::Mutex; - let results: Mutex> = Mutex::new(Vec::with_capacity(calls.len())); + use tokio::sync::Mutex as AsyncMutex; + let results: AsyncMutex> = + AsyncMutex::new(Vec::with_capacity(calls.len())); stream::iter(calls.into_iter().map(|call| { let child_ctx = ctx.child_context(); @@ -91,12 +92,12 @@ impl ToolExecutor { e.to_string(), ) }); - results.lock().unwrap().push(r); + results.lock().await.push(r); }, ) .await; - Ok(results.into_inner().unwrap()) + Ok(results.into_inner()) } async fn execute_one( diff --git a/libs/migrate/lib.rs b/libs/migrate/lib.rs index b539866..d7ecbcf 100644 --- a/libs/migrate/lib.rs +++ b/libs/migrate/lib.rs @@ -79,6 +79,8 @@ impl MigratorTrait for Migrator { Box::new(m20260412_000003_create_project_skill::Migration), Box::new(m20260413_000001_add_skill_commit_blob::Migration), Box::new(m20260414_000001_create_agent_task::Migration), + Box::new(m20260415_000001_add_issue_id_to_agent_task::Migration), + Box::new(m20260416_000001_add_retry_count_to_agent_task::Migration), // Repo tables Box::new(m20250628_000028_create_repo::Migration), Box::new(m20250628_000029_create_repo_branch::Migration), @@ -248,3 +250,5 @@ pub mod m20260412_000002_create_workspace_billing_history; pub mod m20260412_000003_create_project_skill; pub mod m20260413_000001_add_skill_commit_blob; pub mod m20260414_000001_create_agent_task; +pub mod m20260415_000001_add_issue_id_to_agent_task; +pub mod m20260416_000001_add_retry_count_to_agent_task; diff --git a/libs/migrate/m20260415_000001_add_issue_id_to_agent_task.rs b/libs/migrate/m20260415_000001_add_issue_id_to_agent_task.rs new file mode 100644 index 0000000..46dc4f2 --- /dev/null +++ b/libs/migrate/m20260415_000001_add_issue_id_to_agent_task.rs @@ -0,0 +1,23 @@ +//! SeaORM migration: add issue_id to agent_task + +use sea_orm_migration::prelude::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20260415_000001_add_issue_id_to_agent_task" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = include_str!("sql/m20260415_000001_add_issue_id_to_agent_task.sql"); + super::execute_sql(manager, sql).await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + super::execute_sql(manager, "ALTER TABLE agent_task DROP COLUMN IF EXISTS issue_id;").await + } +} diff --git a/libs/migrate/m20260416_000001_add_retry_count_to_agent_task.rs b/libs/migrate/m20260416_000001_add_retry_count_to_agent_task.rs new file mode 100644 index 0000000..96ea6ee --- /dev/null +++ b/libs/migrate/m20260416_000001_add_retry_count_to_agent_task.rs @@ -0,0 +1,27 @@ +//! SeaORM migration: add retry_count to agent_task + +use sea_orm_migration::prelude::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "m20260416_000001_add_retry_count_to_agent_task" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let sql = include_str!("sql/m20260416_000001_add_retry_count_to_agent_task.sql"); + super::execute_sql(manager, sql).await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + super::execute_sql( + manager, + "ALTER TABLE agent_task DROP COLUMN IF EXISTS retry_count;", + ) + .await + } +} diff --git a/libs/migrate/sql/m20260415_000001_add_issue_id_to_agent_task.sql b/libs/migrate/sql/m20260415_000001_add_issue_id_to_agent_task.sql new file mode 100644 index 0000000..b4c085d --- /dev/null +++ b/libs/migrate/sql/m20260415_000001_add_issue_id_to_agent_task.sql @@ -0,0 +1,4 @@ +-- Add issue_id column to agent_task for issue-task binding +ALTER TABLE agent_task ADD COLUMN IF NOT EXISTS issue_id UUID; + +CREATE INDEX IF NOT EXISTS idx_agent_task_issue ON agent_task (issue_id); diff --git a/libs/migrate/sql/m20260416_000001_add_retry_count_to_agent_task.sql b/libs/migrate/sql/m20260416_000001_add_retry_count_to_agent_task.sql new file mode 100644 index 0000000..2fe281f --- /dev/null +++ b/libs/migrate/sql/m20260416_000001_add_retry_count_to_agent_task.sql @@ -0,0 +1,4 @@ +-- Add retry_count column to agent_task for retry tracking +ALTER TABLE agent_task ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0; + +CREATE INDEX IF NOT EXISTS idx_agent_task_retry_count ON agent_task (retry_count); diff --git a/libs/models/agent_task/mod.rs b/libs/models/agent_task/mod.rs index abf2205..ea4aa9d 100644 --- a/libs/models/agent_task/mod.rs +++ b/libs/models/agent_task/mod.rs @@ -10,7 +10,7 @@ //! Sub-agents are represented as `agent_task` records with a parent reference, //! allowing hierarchical task trees and result aggregation. -use crate::{DateTimeUtc, ProjectId, UserId}; +use crate::{DateTimeUtc, IssueId, ProjectId, UserId}; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -51,10 +51,14 @@ pub enum TaskStatus { Pending, #[sea_orm(string_value = "Running")] Running, + #[sea_orm(string_value = "Paused")] + Paused, #[sea_orm(string_value = "Done")] Done, #[sea_orm(string_value = "Failed")] Failed, + #[sea_orm(string_value = "Cancelled")] + Cancelled, } impl Default for TaskStatus { @@ -68,8 +72,10 @@ impl std::fmt::Display for TaskStatus { match self { TaskStatus::Pending => write!(f, "Pending"), TaskStatus::Running => write!(f, "Running"), + TaskStatus::Paused => write!(f, "Paused"), TaskStatus::Done => write!(f, "Done"), TaskStatus::Failed => write!(f, "Failed"), + TaskStatus::Cancelled => write!(f, "Cancelled"), } } } @@ -88,6 +94,10 @@ pub struct Model { #[sea_orm(nullable)] pub parent_id: Option, + /// Issue this task is bound to (optional). + #[sea_orm(nullable)] + pub issue_id: Option, + /// Agent type that executes this task. #[sea_orm(column_type = "String(StringLen::None)", default = "React")] pub agent_type: AgentType, @@ -129,15 +139,15 @@ pub struct Model { /// Current progress description (e.g., "step 2/5: analyzing code"). #[sea_orm(nullable)] pub progress: Option, + + /// Number of times this task has been retried. + #[sea_orm(nullable)] + pub retry_count: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { - #[sea_orm( - belongs_to = "Entity", - from = "Column::ParentId", - to = "Column::Id" - )] + #[sea_orm(belongs_to = "Entity", from = "Column::ParentId", to = "Column::Id")] ParentTask, } @@ -149,6 +159,16 @@ impl Model { } pub fn is_done(&self) -> bool { - matches!(self.status, TaskStatus::Done | TaskStatus::Failed) + matches!( + self.status, + TaskStatus::Done | TaskStatus::Failed | TaskStatus::Cancelled + ) + } + + pub fn is_running(&self) -> bool { + matches!( + self.status, + TaskStatus::Running | TaskStatus::Pending | TaskStatus::Paused + ) } } diff --git a/libs/queue/lib.rs b/libs/queue/lib.rs index d0084ce..80e6ed7 100644 --- a/libs/queue/lib.rs +++ b/libs/queue/lib.rs @@ -1,4 +1,4 @@ -//! Room message queue: Redis Streams + NATS. +//! Room message queue: Redis Streams + Redis Pub/Sub. pub mod producer; pub mod types; @@ -10,6 +10,6 @@ pub use types::{ RoomMessageEvent, RoomMessageStreamChunkEvent, }; pub use worker::{ - EmailSendFn, EmailSendFut, GetRedis, PersistFn, RedisFuture, room_worker_task, - start as start_worker, start_email_worker, + room_worker_task, start as start_worker, start_email_worker, EmailSendFn, EmailSendFut, GetRedis, + PersistFn, RedisFuture, }; diff --git a/libs/room/src/metrics.rs b/libs/room/src/metrics.rs index f119f35..145e97c 100644 --- a/libs/room/src/metrics.rs +++ b/libs/room/src/metrics.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; use metrics::{ - Counter, Gauge, Histogram, Unit, describe_counter, describe_gauge, describe_histogram, - register_counter, register_gauge, register_histogram, + describe_counter, describe_gauge, describe_histogram, register_counter, register_gauge, register_histogram, Counter, + Gauge, Histogram, Unit, }; use tokio::sync::RwLock; use uuid::Uuid; @@ -20,7 +20,7 @@ pub struct RoomMetrics { pub broadcasts_sent: Counter, pub broadcasts_dropped: Counter, pub duplicates_skipped: Counter, - pub nats_publish_failed: Counter, + pub redis_publish_failed: Counter, pub message_latency_ms: Histogram, pub ws_rate_limit_hits: Counter, pub ws_auth_failures: Counter, @@ -78,9 +78,9 @@ impl Default for RoomMetrics { "Total duplicate messages skipped (idempotency)" ); describe_counter!( - "room_nats_publish_failed_total", + "room_redis_publish_failed_total", Unit::Count, - "Total NATS publish failures" + "Total Redis publish failures" ); describe_histogram!( "room_message_latency_ms", @@ -130,7 +130,7 @@ impl Default for RoomMetrics { broadcasts_sent: register_counter!("room_broadcasts_sent_total"), broadcasts_dropped: register_counter!("room_broadcasts_dropped_total"), duplicates_skipped: register_counter!("room_duplicates_skipped_total"), - nats_publish_failed: register_counter!("room_nats_publish_failed_total"), + redis_publish_failed: register_counter!("room_redis_publish_failed_total"), message_latency_ms: register_histogram!("room_message_latency_ms"), ws_rate_limit_hits: register_counter!("room_ws_rate_limit_hits_total"), ws_auth_failures: register_counter!("room_ws_auth_failures_total"), diff --git a/libs/room/src/service.rs b/libs/room/src/service.rs index e9cd852..0356301 100644 --- a/libs/room/src/service.rs +++ b/libs/room/src/service.rs @@ -242,7 +242,7 @@ impl RoomService { project_id: Uuid, agent_type: AgentType, input: String, - title: Option, + _title: Option, execute: F, ) -> anyhow::Result where diff --git a/libs/service/agent/sync.rs b/libs/service/agent/sync.rs index 4f9883e..cf32f57 100644 --- a/libs/service/agent/sync.rs +++ b/libs/service/agent/sync.rs @@ -35,6 +35,7 @@ struct OpenRouterResponse { } #[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] struct OpenRouterModel { id: String, name: Option, @@ -50,6 +51,7 @@ struct OpenRouterModel { } #[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] struct OpenRouterPricing { prompt: String, completion: String, @@ -68,6 +70,7 @@ struct OpenRouterPricing { } #[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] struct OpenRouterArchitecture { #[serde(default)] modality: Option, @@ -82,6 +85,7 @@ struct OpenRouterArchitecture { } #[derive(Debug, Clone, Deserialize)] +#[allow(dead_code)] struct OpenRouterTopProvider { #[serde(default)] context_length: Option,