gitdataai/libs/agent/orao/mod.rs
ZhenYi d45e9e28f4 refactor(agent): split monolithic service files into specialized modules
Extract agent, compact, embed, task, and modes modules from single
service.rs files into focused sub-modules. Add orao module for
O1-like reasoning loop. Move RigAgentService to rig_tool.rs.
2026-05-11 17:04:57 +08:00

428 lines
15 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! ORAO (ObserveReasonActObserve) — a single-agent loop for complex engineering tasks.
//!
//! ORAO extends the ReAct paradigm with:
//! - **Multi-channel perception**: LLM-driven observation via read-only tools
//! - **Structured reasoning**: analysis + step-by-step action plan
//! - **Safety levels**: L0L4 permission grading for every action
//! - **Deadlock detection**: terminates after 3 rounds with no progress
//! - **Plan mode**: optional user-approval gate before execution
//! - **Round recording**: full audit trail for debugging and resumption
//!
//! # Architecture
//!
//! The [`OraoExecutor`] runs the O→R→A→O loop:
//! 1. **Observe** — LLM explores environment via observation tools, produces snapshot
//! 2. **Reason** — LLM analyzes snapshot, generates structured plan
//! 3. **Act** — Execute each planned action via [`ActionExecutor`] with safety checks
//! 4. **Observe** — Collect results, feed into next round
//!
//! All file access goes through function calls (tools), never direct filesystem operations.
//!
//! [`ActionExecutor`]: act::ActionExecutor
pub mod act;
pub mod observe;
pub mod reason;
pub mod types;
use std::time::Instant;
use crate::client::AiClientConfig;
use crate::error::{AgentError, Result};
pub use act::ActionExecutor;
pub use types::{
ActionResult, ActionType, ActionVerdict, FileChange, FileChangeType, OraoConfig, OraoStep,
PerceptionSnapshot, PlannedAction, ReasoningOutput, RoundRecord, SafetyLevel,
};
// ── ORAO Executor ───────────────────────────────────────────────────────────
/// Executes the ORAO loop for a single task.
///
/// All environment interaction goes through:
/// - **Observation tools** (read-only) for the Observe phase
/// - **Action executor** callback for the Act phase
///
/// No direct filesystem access — everything is mediated through function calls.
pub struct OraoExecutor {
config: AiClientConfig,
model_name: String,
action_executor: ActionExecutor,
}
impl OraoExecutor {
/// Create a new ORAO executor.
///
/// `action_executor` is called to execute each planned action. Wire it to
/// your [`ToolRegistry`] for tool-based execution, or use
/// [`act::shell_executor`] for simple shell-command execution.
///
/// [`ToolRegistry`]: crate::tool::ToolRegistry
pub fn new(
config: AiClientConfig,
model_name: impl Into<String>,
action_executor: ActionExecutor,
) -> Self {
Self {
config,
model_name: model_name.into(),
action_executor,
}
}
/// Run the ORAO loop to completion.
///
/// # Parameters
/// - `task_goal`: Description of what to accomplish.
/// - `orao_config`: ORAO-specific settings (max rounds, safety level, etc.).
/// - `tool_factory`: Called each round to produce read-only observation tools
/// (e.g. `git_diff`, `git_blob`, `repo_search`, `git_grep`). This allows
/// callers to provide fresh tool instances each round.
/// - `on_step`: Called with each [`OraoStep`] event for streaming/persistence.
/// - `on_plan_approval`: Called in plan mode; return `true` to proceed.
pub async fn execute<C, Fut, PA, PAFut, TF>(
&self,
task_goal: &str,
orao_config: &OraoConfig,
tool_factory: TF,
on_step: C,
on_plan_approval: PA,
) -> Result<OraoOutcome>
where
C: Fn(OraoStep) -> Fut + Send,
Fut: Future<Output = ()> + Send,
PA: Fn(ReasoningOutput) -> PAFut + Send,
PAFut: Future<Output = bool> + Send,
TF: Fn() -> Vec<Box<dyn rig::tool::ToolDyn + 'static>> + Send + Sync,
{
let mut round = 0usize;
let mut round_records: Vec<RoundRecord> = Vec::new();
let mut previous_result: Option<ActionResult> = None;
let mut previous_snapshot: Option<PerceptionSnapshot> = None;
let mut no_change_count: usize = 0;
// Observation turns: limit tool calls during exploration
let observe_max_turns = 10;
loop {
round += 1;
let round_start = Instant::now();
let round_input_tokens: u64 = 0;
let round_output_tokens: u64 = 0;
// ── Phase 1: Observe ───────────────────────────────────────
let snapshot = observe::observe(
&self.config,
&self.model_name,
task_goal,
previous_result.take(),
tool_factory(),
observe_max_turns,
)
.await?;
on_step(OraoStep::Observe {
round,
snapshot: snapshot.clone(),
})
.await;
// ── Deadlock detection ─────────────────────────────────────
if let Some(ref prev) = previous_snapshot {
if !observe::has_environment_changed(prev, &snapshot) {
no_change_count += 1;
if no_change_count >= orao_config.deadlock_threshold {
let reason = format!(
"Deadlock detected: no environmental change for {} consecutive rounds",
no_change_count
);
on_step(OraoStep::Failed {
total_rounds: round,
reason: reason.clone(),
})
.await;
return Ok(OraoOutcome::Failed {
reason,
rounds: round,
records: round_records,
});
}
} else {
no_change_count = 0;
}
}
previous_snapshot = Some(snapshot.clone());
// ── Phase 2: Reason ────────────────────────────────────────
let reasoning = reason::reason(
&self.config,
&self.model_name,
orao_config,
task_goal,
&snapshot,
round,
&round_records,
)
.await?;
on_step(OraoStep::Reason {
round,
reasoning: reasoning.clone(),
})
.await;
// ── Plan mode gate ─────────────────────────────────────────
if orao_config.plan_mode {
on_step(OraoStep::PlanProposed {
round,
reasoning: reasoning.clone(),
})
.await;
if !on_plan_approval(reasoning.clone()).await {
return Ok(OraoOutcome::Cancelled {
rounds: round,
records: round_records,
});
}
}
// ── Phase 3: Act ───────────────────────────────────────────
let mut round_result: Option<ActionResult> = None;
let mut all_success = true;
for planned in &reasoning.plan {
let safety = SafetyLevel::classify_command(&planned.command_or_content);
on_step(OraoStep::Act {
round,
action: planned.clone(),
safety_level: safety,
})
.await;
let result =
act::execute_action(planned.clone(), orao_config, &self.action_executor).await;
on_step(OraoStep::ObserveResult {
round,
result: result.clone(),
})
.await;
match &result.verdict {
ActionVerdict::Failure => {
all_success = false;
round_result = Some(result);
break; // Stop executing further steps on failure
}
ActionVerdict::SuccessWithWarnings => {
round_result = Some(result);
}
ActionVerdict::Success => {
round_result = Some(result);
}
}
}
// ── Phase 4: Record round ──────────────────────────────────
let duration_ms = round_start.elapsed().as_millis() as u64;
let record = RoundRecord {
round,
observe_summary: summarize_snapshot(&snapshot),
reasoning_summary: reasoning.analysis.clone(),
action: reasoning.plan.first().cloned(),
result_summary: round_result
.as_ref()
.map(|r| format!("{:?}: {}", r.verdict, truncate(&r.stdout, 200))),
tokens_input: round_input_tokens,
tokens_output: round_output_tokens,
duration_ms,
};
round_records.push(record);
// ── Check termination ──────────────────────────────────────
if all_success && !reasoning.plan.is_empty() {
let summary = format!(
"Task completed in {} round(s). Last action: {}",
round,
round_result
.as_ref()
.map(|r| truncate(&r.stdout, 500))
.unwrap_or_default()
);
on_step(OraoStep::Completed {
total_rounds: round,
summary: summary.clone(),
})
.await;
return Ok(OraoOutcome::Completed {
summary,
rounds: round,
records: round_records,
});
}
// Max rounds exceeded
if round >= orao_config.max_rounds {
let reason = format!("Reached max rounds ({})", orao_config.max_rounds);
on_step(OraoStep::Failed {
total_rounds: round,
reason: reason.clone(),
})
.await;
return Ok(OraoOutcome::Failed {
reason,
rounds: round,
records: round_records,
});
}
// Prepare for next round
previous_result = round_result;
}
}
}
// ── Outcome ─────────────────────────────────────────────────────────────────
/// Final outcome of an ORAO execution.
#[derive(Debug, Clone)]
pub enum OraoOutcome {
/// Task completed successfully.
Completed {
summary: String,
rounds: usize,
records: Vec<RoundRecord>,
},
/// Task failed (max rounds, deadlock, or unrecoverable error).
Failed {
reason: String,
rounds: usize,
records: Vec<RoundRecord>,
},
/// User cancelled the task (plan mode rejection or explicit interrupt).
Cancelled {
rounds: usize,
records: Vec<RoundRecord>,
},
}
impl OraoOutcome {
/// Number of rounds executed.
pub fn rounds(&self) -> usize {
match self {
Self::Completed { rounds, .. }
| Self::Failed { rounds, .. }
| Self::Cancelled { rounds, .. } => *rounds,
}
}
/// Whether the task was successful.
pub fn is_success(&self) -> bool {
matches!(self, Self::Completed { .. })
}
/// Round records for audit/debugging.
pub fn records(&self) -> &[RoundRecord] {
match self {
Self::Completed { records, .. }
| Self::Failed { records, .. }
| Self::Cancelled { records, .. } => records,
}
}
}
// ── Helpers ─────────────────────────────────────────────────────────────────
fn summarize_snapshot(snapshot: &PerceptionSnapshot) -> String {
let mut parts: Vec<String> = Vec::new();
if let Some(ref gs) = snapshot.git_status {
let first_line = gs.lines().next().unwrap_or("");
parts.push(format!("git: {}", truncate(first_line, 80)));
}
if !snapshot.files.is_empty() {
parts.push(format!("{} files", snapshot.files.len()));
}
if !snapshot.errors.is_empty() {
parts.push(format!("{} errors", snapshot.errors.len()));
}
if parts.is_empty() {
"no changes".to_string()
} else {
parts.join(", ")
}
}
fn truncate(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
s.to_string()
} else {
format!("{}...", &s[..max_len])
}
}
// ── Convenience builder ─────────────────────────────────────────────────────
/// Builder for [`OraoExecutor`] with chainable configuration.
pub struct OraoExecutorBuilder {
config: Option<AiClientConfig>,
model_name: Option<String>,
action_executor: Option<ActionExecutor>,
}
impl OraoExecutorBuilder {
pub fn new() -> Self {
Self {
config: None,
model_name: None,
action_executor: None,
}
}
pub fn ai_config(mut self, config: AiClientConfig) -> Self {
self.config = Some(config);
self
}
pub fn model(mut self, name: impl Into<String>) -> Self {
self.model_name = Some(name.into());
self
}
pub fn action_executor(mut self, executor: ActionExecutor) -> Self {
self.action_executor = Some(executor);
self
}
pub fn build(self) -> Result<OraoExecutor> {
let config = self.config.ok_or_else(|| AgentError::InvalidInput {
field: "config".to_string(),
reason: "AI client config is required".to_string(),
})?;
let model_name = self.model_name.ok_or_else(|| AgentError::InvalidInput {
field: "model_name".to_string(),
reason: "Model name is required".to_string(),
})?;
let action_executor = self
.action_executor
.ok_or_else(|| AgentError::InvalidInput {
field: "action_executor".to_string(),
reason: "Action executor is required".to_string(),
})?;
Ok(OraoExecutor::new(config, model_name, action_executor))
}
}
impl Default for OraoExecutorBuilder {
fn default() -> Self {
Self::new()
}
}