gitdataai/libs/agent/task/tree.rs
ZhenYi d45e9e28f4 refactor(agent): split monolithic service files into specialized modules
Extract agent, compact, embed, task, and modes modules from single
service.rs files into focused sub-modules. Add orao module for
O1-like reasoning loop. Move RigAgentService to rig_tool.rs.
2026-05-11 17:04:57 +08:00

89 lines
3.6 KiB
Rust

use models::agent_task::{ActiveModel, Column as C, Entity, Model, TaskStatus};
use sea_orm::{ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, QueryFilter, QueryOrder};
impl super::TaskService {
    /// Propagate a child task's outcome to its direct parent.
    ///
    /// Loads the task identified by `task_id`. If it has a parent and every
    /// child of that parent (this task included) reports `is_done()`, a parent
    /// that still reports `is_running()` is finalised:
    /// - `Done` (with `error` cleared) if at least one child has status `Done`
    /// - `Failed` (with a fixed error message) otherwise
    ///
    /// Only the direct parent is updated by a single call; this method does not
    /// walk further up the tree itself.
    ///
    /// # Returns
    /// `Ok(Some(parent))` with the updated parent row when it was finalised,
    /// `Ok(None)` when the task has no parent, some sibling is not done yet, or
    /// the parent is not running.
    ///
    /// # Errors
    /// `DbErr::RecordNotFound` if the task or its parent cannot be loaded, or
    /// any error returned by the underlying queries.
    pub async fn propagate_to_parent(&self, task_id: i64) -> Result<Option<Model>, DbErr> {
        let task = self
            .get(task_id)
            .await?
            .ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
        let Some(parent_id) = task.parent_id else {
            return Ok(None);
        };

        // The sibling list includes the task itself, so it is never empty here.
        let siblings = self.children(parent_id).await?;
        if !siblings.iter().all(|s| s.is_done()) {
            return Ok(None);
        }

        let parent = self.get(parent_id).await?.ok_or_else(|| {
            DbErr::RecordNotFound(format!("parent task {} not found", parent_id))
        })?;
        // A parent that is no longer running is left untouched.
        if !parent.is_running() {
            return Ok(None);
        }

        let any_succeeded = siblings.iter().any(|s| s.status == TaskStatus::Done);
        // Take the timestamp once so `done_at` and `updated_at` agree exactly.
        let now = chrono::Utc::now();

        let mut active: ActiveModel = parent.into();
        if any_succeeded {
            active.status = sea_orm::Set(TaskStatus::Done);
            active.error = sea_orm::Set(None);
        } else {
            active.status = sea_orm::Set(TaskStatus::Failed);
            active.error =
                sea_orm::Set(Some("All sub-tasks failed or were cancelled".to_string()));
        }
        active.done_at = sea_orm::Set(Some(now.into()));
        active.updated_at = sea_orm::Set(now.into());

        let updated = active.update(self.db()).await?;
        Ok(Some(updated))
    }

    /// List the direct sub-tasks of `parent_id`, ordered by ascending
    /// `created_at`.
    ///
    /// # Errors
    /// Any error returned by the underlying query.
    pub async fn children(&self, parent_id: i64) -> Result<Vec<Model>, DbErr> {
        Entity::find()
            .filter(C::ParentId.eq(parent_id))
            .order_by_asc(C::CreatedAt)
            .all(self.db())
            .await
    }

    /// Check whether every direct sub-task of `parent_id` reports `is_done()`.
    ///
    /// Returns `true` when there are no children at all (an empty tree counts
    /// as done).
    ///
    /// # Errors
    /// Any error returned by the underlying query.
    pub async fn are_children_done(&self, parent_id: i64) -> Result<bool, DbErr> {
        let children = self.children(parent_id).await?;
        // `all` is vacuously true for an empty list, which covers the
        // "no children" case without a separate check.
        Ok(children.iter().all(|c| c.is_done()))
    }

    /// Delete a task together with all of its descendants.
    ///
    /// The sub-tree is discovered breadth-first and then removed leaves-first,
    /// so a row is never deleted while rows that reference it as their parent
    /// still exist. If a deletion fails part-way, the rows that remain still
    /// form a consistent tree.
    ///
    /// NOTE(review): this method does not check that `task_id` is a root task;
    /// if only root tasks may be deleted, that rule has to be enforced by the
    /// caller.
    ///
    /// The operation is not wrapped in a transaction. A `task_id` that does not
    /// exist is not an error; the call then deletes nothing.
    ///
    /// # Errors
    /// Any error returned by the underlying queries.
    pub async fn delete(&self, task_id: i64) -> Result<(), DbErr> {
        // Breadth-first discovery with an index cursor instead of async recursion.
        let mut ordered = vec![task_id];
        // Guards against corrupted data in which `parent_id` links form a cycle
        // (including a task that is its own parent); without it the walk below
        // would never terminate.
        let mut seen = std::collections::HashSet::from([task_id]);
        let mut cursor = 0;
        while cursor < ordered.len() {
            let current = ordered[cursor];
            cursor += 1;
            let children = Entity::find()
                .filter(C::ParentId.eq(current))
                .all(self.db())
                .await?;
            ordered.extend(
                children
                    .into_iter()
                    .map(|child| child.id)
                    .filter(|id| seen.insert(*id)),
            );
        }

        // Discovery order lists every parent before its children; reversing it
        // deletes the deepest rows first.
        for id in ordered.into_iter().rev() {
            // Rows are loaded and deleted through `ActiveModel`, as before; a
            // row that has already disappeared is skipped.
            if let Some(row) = Entity::find_by_id(id).one(self.db()).await? {
                let active: ActiveModel = row.into();
                active.delete(self.db()).await?;
            }
        }
        Ok(())
    }
}