gitdataai/libs/agent/task/service.rs
ZhenYi a09ff66191 refactor(room): remove NATS, use Redis pub/sub for message queue
- Remove async-nats from Cargo.toml dependencies
- Rename nats_publish_failed metric → redis_publish_failed
- Update queue lib doc comment: Redis Streams + Redis Pub/Sub
- Add Paused/Cancelled task statuses to agent_task model
- Add issue_id and retry_count fields to agent_task
- Switch tool executor Mutex from std::sync → tokio::sync (async context)
- Add timeout/rate-limited/retryable/tool-not-found error variants
2026-04-16 17:24:04 +08:00

594 lines
20 KiB
Rust

//! Task service for creating and tracking agent tasks.
//!
//! Database-facing methods are async and query the database directly; most
//! state transitions also publish a `TaskEvent` through the configured
//! publisher. Execution of the task logic (running the ReAct loop, etc.) is
//! delegated to the caller — this service only manages task lifecycle and state.
use db::database::AppDatabase;
use models::agent_task::{ActiveModel, AgentType, Column as C, Entity, Model, TaskStatus};
use models::IssueId;
use sea_orm::{
entity::EntityTrait, query::{QueryFilter, QueryOrder, QuerySelect}, ActiveModelTrait,
ColumnTrait,
DbErr,
};
use serde::Serialize;
use std::sync::Arc;
/// Event payload published to WebSocket clients via Redis Pub/Sub.
#[derive(Debug, Clone, Serialize)]
pub struct TaskEvent {
pub task_id: i64,
pub project_id: uuid::Uuid,
pub parent_id: Option<i64>,
pub event: String,
pub message: Option<String>,
pub output: Option<String>,
pub error: Option<String>,
pub status: String,
}
impl TaskEvent {
pub fn started(task_id: i64, project_id: uuid::Uuid, parent_id: Option<i64>) -> Self {
Self {
task_id,
project_id,
parent_id,
event: "started".to_string(),
message: None,
output: None,
error: None,
status: TaskStatus::Running.to_string(),
}
}
pub fn progress(
task_id: i64,
project_id: uuid::Uuid,
parent_id: Option<i64>,
msg: String,
) -> Self {
Self {
task_id,
project_id,
parent_id,
event: "progress".to_string(),
message: Some(msg),
output: None,
error: None,
status: TaskStatus::Running.to_string(),
}
}
pub fn completed(
task_id: i64,
project_id: uuid::Uuid,
parent_id: Option<i64>,
output: String,
) -> Self {
Self {
task_id,
project_id,
parent_id,
event: "done".to_string(),
message: None,
output: Some(output),
error: None,
status: TaskStatus::Done.to_string(),
}
}
pub fn failed(
task_id: i64,
project_id: uuid::Uuid,
parent_id: Option<i64>,
error: String,
) -> Self {
Self {
task_id,
project_id,
parent_id,
event: "failed".to_string(),
message: None,
output: None,
error: Some(error),
status: TaskStatus::Failed.to_string(),
}
}
pub fn cancelled(task_id: i64, project_id: uuid::Uuid, parent_id: Option<i64>) -> Self {
Self {
task_id,
project_id,
parent_id,
event: "cancelled".to_string(),
message: None,
output: None,
error: None,
status: TaskStatus::Cancelled.to_string(),
}
}
}
/// Sink for task lifecycle events (intended to be backed by Redis Pub/Sub).
///
/// An implementation is handed to `TaskEvents::new(...)`, which wraps it in an
/// `Arc<dyn TaskEventPublisher>` and calls `publish` once per lifecycle
/// transition. When no real publisher is available, `NoOpPublisher` can be
/// supplied instead, in which case events are silently dropped (graceful
/// degradation on startup).
///
/// The `Send + Sync` bounds let one publisher be shared across threads.
/// `publish` is synchronous and returns `()`, so implementations cannot report
/// failures to the caller. NOTE(review): presumably implementations hand the
/// event off rather than block on network I/O — confirm against implementors.
pub trait TaskEventPublisher: Send + Sync {
    /// Publish `event` for the project identified by `project_id`.
    fn publish(&self, project_id: uuid::Uuid, event: TaskEvent);
}
/// No-op publisher used when no Redis Pub/Sub connection is available.
///
/// Every event passed to it is discarded.
#[derive(Clone, Default)]
pub struct NoOpPublisher;
impl TaskEventPublisher for NoOpPublisher {
    // Intentionally empty: events are dropped.
    fn publish(&self, _: uuid::Uuid, _: TaskEvent) {}
}
#[derive(Clone)]
pub struct TaskEvents {
publisher: Arc<dyn TaskEventPublisher>,
}
impl TaskEvents {
pub fn new(publisher: impl TaskEventPublisher + 'static) -> Self {
Self {
publisher: Arc::new(publisher),
}
}
pub fn noop() -> Self {
Self::new(NoOpPublisher)
}
fn emit(&self, task: &Model, event: TaskEvent) {
self.publisher.publish(task.project_uuid, event);
}
pub fn emit_started(&self, task: &Model) {
self.emit(
task,
TaskEvent::started(task.id, task.project_uuid, task.parent_id),
);
}
pub fn emit_progress(&self, task: &Model, msg: String) {
self.emit(
task,
TaskEvent::progress(task.id, task.project_uuid, task.parent_id, msg),
);
}
pub fn emit_completed(&self, task: &Model, output: String) {
self.emit(
task,
TaskEvent::completed(task.id, task.project_uuid, task.parent_id, output),
);
}
pub fn emit_failed(&self, task: &Model, error: String) {
self.emit(
task,
TaskEvent::failed(task.id, task.project_uuid, task.parent_id, error),
);
}
pub fn emit_cancelled(&self, task: &Model) {
self.emit(
task,
TaskEvent::cancelled(task.id, task.project_uuid, task.parent_id),
);
}
}
/// Builder for `TaskService` that lets the events publisher be configured
/// independently of the database connection.
///
/// If no publisher is set, the built service uses `TaskEvents::noop()`.
#[derive(Clone, Default)]
pub struct TaskServiceBuilder {
    events: Option<TaskEvents>,
}
impl TaskServiceBuilder {
    /// Sets the events publisher, replacing any previously configured one.
    pub fn with_events(self, events: TaskEvents) -> Self {
        Self {
            events: Some(events),
        }
    }

    /// Consumes the builder and produces a `TaskService` bound to `db`.
    ///
    /// Declared `async` for API compatibility; it performs no I/O.
    pub async fn build(self, db: AppDatabase) -> TaskService {
        let events = match self.events {
            Some(configured) => configured,
            None => TaskEvents::noop(),
        };
        TaskService { db, events }
    }
}
/// Service for managing agent tasks (root tasks and sub-tasks).
#[derive(Clone)]
pub struct TaskService {
db: AppDatabase,
events: TaskEvents,
}
impl TaskService {
pub fn new(db: AppDatabase) -> Self {
Self {
db,
events: TaskEvents::noop(),
}
}
pub fn with_events(db: AppDatabase, events: TaskEvents) -> Self {
Self { db, events }
}
/// Create a new task (root or sub-task) with status = pending.
pub async fn create(
&self,
project_uuid: impl Into<uuid::Uuid>,
input: impl Into<String>,
agent_type: AgentType,
) -> Result<Model, DbErr> {
self.create_with_parent(project_uuid, None, input, agent_type, None, None)
.await
}
/// Create a new task bound to an issue.
pub async fn create_for_issue(
&self,
project_uuid: impl Into<uuid::Uuid>,
issue_id: IssueId,
input: impl Into<String>,
agent_type: AgentType,
) -> Result<Model, DbErr> {
self.create_with_parent(project_uuid, None, input, agent_type, None, Some(issue_id))
.await
}
/// Create a new sub-task with a parent reference.
pub async fn create_subtask(
&self,
project_uuid: impl Into<uuid::Uuid>,
parent_id: i64,
input: impl Into<String>,
agent_type: AgentType,
title: Option<String>,
) -> Result<Model, DbErr> {
self.create_with_parent(
project_uuid,
Some(parent_id),
input,
agent_type,
title,
None,
)
.await
}
async fn create_with_parent(
&self,
project_uuid: impl Into<uuid::Uuid>,
parent_id: Option<i64>,
input: impl Into<String>,
agent_type: AgentType,
title: Option<String>,
issue_id: Option<IssueId>,
) -> Result<Model, DbErr> {
let model = ActiveModel {
project_uuid: sea_orm::Set(project_uuid.into()),
parent_id: sea_orm::Set(parent_id),
issue_id: sea_orm::Set(issue_id),
agent_type: sea_orm::Set(agent_type),
status: sea_orm::Set(TaskStatus::Pending),
title: sea_orm::Set(title),
input: sea_orm::Set(input.into()),
..Default::default()
};
model.insert(&self.db).await
}
/// Mark a task as running and record the start time.
pub async fn start(&self, task_id: i64) -> Result<Model, DbErr> {
let model = Entity::find_by_id(task_id).one(&self.db).await?;
let model =
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
let mut active: ActiveModel = model.into();
active.status = sea_orm::Set(TaskStatus::Running);
active.started_at = sea_orm::Set(Some(chrono::Utc::now().into()));
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
let updated = active.update(&self.db).await?;
self.events.emit_started(&updated);
Ok(updated)
}
/// Update progress text (e.g., "step 2/5: analyzing PR").
pub async fn update_progress(
&self,
task_id: i64,
progress: impl Into<String>,
) -> Result<(), DbErr> {
let model = Entity::find_by_id(task_id).one(&self.db).await?;
let model =
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
let progress_str = progress.into();
let mut active: ActiveModel = model.into();
active.progress = sea_orm::Set(Some(progress_str.clone()));
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
let updated = active.update(&self.db).await?;
self.events.emit_progress(&updated, progress_str);
Ok(())
}
/// Mark a task as completed with the output text.
pub async fn complete(&self, task_id: i64, output: impl Into<String>) -> Result<Model, DbErr> {
let model = Entity::find_by_id(task_id).one(&self.db).await?;
let model =
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
let mut active: ActiveModel = model.into();
active.status = sea_orm::Set(TaskStatus::Done);
let out = output.into();
active.output = sea_orm::Set(Some(out.clone()));
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
let updated = active.update(&self.db).await?;
self.events.emit_completed(&updated, out);
Ok(updated)
}
/// Mark a task as failed with an error message.
pub async fn fail(&self, task_id: i64, error: impl Into<String>) -> Result<Model, DbErr> {
let model = Entity::find_by_id(task_id).one(&self.db).await?;
let model =
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
let mut active: ActiveModel = model.into();
active.status = sea_orm::Set(TaskStatus::Failed);
let err = error.into();
active.error = sea_orm::Set(Some(err.clone()));
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
let updated = active.update(&self.db).await?;
self.events.emit_failed(&updated, err);
Ok(updated)
}
/// Propagate child task status up the tree.
///
/// Only allows cancelling tasks that are not yet in a terminal state
/// (Pending / Running / Paused).
///
/// Cancelled children are marked done so that `are_children_done()` returns
/// true for the parent after cancellation.
pub async fn cancel(&self, task_id: i64) -> Result<Model, DbErr> {
// Collect all task IDs (parent + descendants) using an explicit stack.
let mut stack = vec![task_id];
let mut idx = 0;
while idx < stack.len() {
let current = stack[idx];
let children = Entity::find()
.filter(C::ParentId.eq(current))
.all(&self.db)
.await?;
for child in children {
stack.push(child.id);
}
idx += 1;
}
// Mark every collected task as cancelled (terminal state).
for id in &stack {
let model = Entity::find_by_id(*id).one(&self.db).await?;
if let Some(m) = model {
if !m.is_done() {
let mut active: ActiveModel = m.into();
active.status = sea_orm::Set(TaskStatus::Cancelled);
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
active.update(&self.db).await?;
}
}
}
let final_model = Entity::find_by_id(task_id)
.one(&self.db)
.await?
.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
self.events.emit_cancelled(&final_model);
Ok(final_model)
}
/// Pause a running or pending task.
///
/// Pausing a task that is not Pending/Running is a no-op that returns
/// the current model (same behaviour as `start` on an already-running task).
pub async fn pause(&self, task_id: i64) -> Result<Model, DbErr> {
let model = Entity::find_by_id(task_id).one(&self.db).await?;
let model =
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
if !model.is_running() {
// Already in a terminal or paused state — return unchanged.
return Ok(model);
}
let mut active: ActiveModel = model.into();
active.status = sea_orm::Set(TaskStatus::Paused);
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
active.update(&self.db).await
}
/// Resume a paused task back to Running.
///
/// Returns an error if the task is not currently Paused.
pub async fn resume(&self, task_id: i64) -> Result<Model, DbErr> {
let model = Entity::find_by_id(task_id).one(&self.db).await?;
let model =
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
if model.status != TaskStatus::Paused {
return Err(DbErr::Custom(format!(
"cannot resume task {}: expected status Paused, got {}",
task_id, model.status
)));
}
let mut active: ActiveModel = model.into();
active.status = sea_orm::Set(TaskStatus::Running);
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
active.update(&self.db).await
}
/// Retry a failed or cancelled task by resetting it to Pending.
///
/// Clears `output`, `error`, and `done_at`; increments `retry_count`.
/// Only tasks in Failed or Cancelled state can be retried.
pub async fn retry(&self, task_id: i64) -> Result<Model, DbErr> {
let model = Entity::find_by_id(task_id).one(&self.db).await?;
let model =
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
match model.status {
TaskStatus::Failed | TaskStatus::Cancelled | TaskStatus::Done => {}
_ => {
return Err(DbErr::Custom(format!(
"cannot retry task {}: only Failed/Cancelled/Done tasks can be retried (got {})",
task_id, model.status
)));
}
}
let retry_count = model.retry_count.map(|c| c + 1).unwrap_or(1);
let mut active: ActiveModel = model.into();
active.status = sea_orm::Set(TaskStatus::Pending);
active.output = sea_orm::Set(None);
active.error = sea_orm::Set(None);
active.done_at = sea_orm::Set(None);
active.started_at = sea_orm::Set(None);
active.retry_count = sea_orm::Set(Some(retry_count));
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
active.update(&self.db).await
}
/// Propagate child task status up the tree.
///
/// When a child task reaches a terminal state, checks whether all its
/// siblings are also terminal. If so, marks the parent as failed so that
/// a stuck parent is never left in the `Running` state.
pub async fn propagate_to_parent(&self, task_id: i64) -> Result<Option<Model>, DbErr> {
let model = self
.get(task_id)
.await?
.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
let Some(parent_id) = model.parent_id else {
return Ok(None);
};
let siblings = self.children(parent_id).await?;
if siblings.iter().all(|s| s.is_done()) {
let parent = self.get(parent_id).await?.ok_or_else(|| {
DbErr::RecordNotFound(format!("parent task {} not found", parent_id))
})?;
if parent.is_running() {
let mut active: ActiveModel = parent.into();
active.status = sea_orm::Set(TaskStatus::Failed);
active.error =
sea_orm::Set(Some("All sub-tasks failed or were cancelled".to_string()));
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
let updated = active.update(&self.db).await?;
return Ok(Some(updated));
}
}
Ok(None)
}
/// Get a task by ID.
pub async fn get(&self, task_id: i64) -> Result<Option<Model>, DbErr> {
Entity::find_by_id(task_id).one(&self.db).await
}
/// List all sub-tasks for a parent task.
pub async fn children(&self, parent_id: i64) -> Result<Vec<Model>, DbErr> {
Entity::find()
.filter(C::ParentId.eq(parent_id))
.order_by_asc(C::CreatedAt)
.all(&self.db)
.await
}
/// List all active (non-terminal) tasks for a project.
pub async fn active_tasks(
&self,
project_uuid: impl Into<uuid::Uuid>,
) -> Result<Vec<Model>, DbErr> {
let uuid: uuid::Uuid = project_uuid.into();
Entity::find()
.filter(C::ProjectUuid.eq(uuid))
.filter(C::Status.is_in([TaskStatus::Pending, TaskStatus::Running, TaskStatus::Paused]))
.order_by_desc(C::CreatedAt)
.all(&self.db)
.await
}
/// List all tasks (root only) for a project.
pub async fn list(
&self,
project_uuid: impl Into<uuid::Uuid>,
limit: u64,
) -> Result<Vec<Model>, DbErr> {
let uuid: uuid::Uuid = project_uuid.into();
Entity::find()
.filter(C::ProjectUuid.eq(uuid))
.filter(C::ParentId.is_null())
.order_by_desc(C::CreatedAt)
.limit(limit)
.all(&self.db)
.await
}
/// Delete a task and all its sub-tasks recursively.
/// Only allows deletion of root tasks.
pub async fn delete(&self, task_id: i64) -> Result<(), DbErr> {
self.delete_recursive(task_id).await
}
async fn delete_recursive(&self, task_id: i64) -> Result<(), DbErr> {
// Collect all task IDs to delete using an explicit stack (avoiding async recursion).
let mut stack = vec![task_id];
let mut idx = 0;
while idx < stack.len() {
let current = stack[idx];
let children = Entity::find()
.filter(C::ParentId.eq(current))
.all(&self.db)
.await?;
for child in children {
stack.push(child.id);
}
idx += 1;
}
for task_id in stack {
let model = Entity::find_by_id(task_id).one(&self.db).await?;
if let Some(m) = model {
let active: ActiveModel = m.into();
active.delete(&self.db).await?;
}
}
Ok(())
}
/// Check if all sub-tasks of a given parent are in a terminal state.
/// Returns true if there are no children (empty tree counts as done).
pub async fn are_children_done(&self, parent_id: i64) -> Result<bool, DbErr> {
let children = self.children(parent_id).await?;
Ok(children.is_empty() || children.iter().all(|c| c.is_done()))
}
}