refactor(room): remove NATS, use Redis pub/sub for message queue
- Remove async-nats from Cargo.toml dependencies - Rename nats_publish_failed metric → redis_publish_failed - Update queue lib doc comment: Redis Streams + Redis Pub/Sub - Add Paused/Cancelled task statuses to agent_task model - Add issue_id and retry_count fields to agent_task - Switch tool executor Mutex from std::sync → tokio::sync (async context) - Add timeout/rate-limited/retryable/tool-not-found error variants
This commit is contained in:
parent
41f96af064
commit
a09ff66191
@ -59,7 +59,7 @@ actix-csrf = "0.8.0"
|
|||||||
actix-rt = "2.11.0"
|
actix-rt = "2.11.0"
|
||||||
actix = "0.13"
|
actix = "0.13"
|
||||||
async-stream = "0.3"
|
async-stream = "0.3"
|
||||||
async-nats = "0.47.0"
|
|
||||||
actix-service = "2.0.3"
|
actix-service = "2.0.3"
|
||||||
actix-utils = "3.0.1"
|
actix-utils = "3.0.1"
|
||||||
redis = "1.1.0"
|
redis = "1.1.0"
|
||||||
|
|||||||
@ -4,10 +4,36 @@ use thiserror::Error;
|
|||||||
pub enum AgentError {
|
pub enum AgentError {
|
||||||
#[error("openai error: {0}")]
|
#[error("openai error: {0}")]
|
||||||
OpenAi(String),
|
OpenAi(String),
|
||||||
|
|
||||||
#[error("qdrant error: {0}")]
|
#[error("qdrant error: {0}")]
|
||||||
Qdrant(String),
|
Qdrant(String),
|
||||||
|
|
||||||
#[error("internal error: {0}")]
|
#[error("internal error: {0}")]
|
||||||
Internal(String),
|
Internal(String),
|
||||||
|
|
||||||
|
/// The task exceeded its timeout limit.
|
||||||
|
#[error("task {task_id} timed out after {seconds}s")]
|
||||||
|
Timeout { task_id: i64, seconds: u64 },
|
||||||
|
|
||||||
|
/// The agent has been rate-limited; retry after the indicated delay.
|
||||||
|
#[error("rate limited, retry after {retry_after_secs}s")]
|
||||||
|
RateLimited { retry_after_secs: u64 },
|
||||||
|
|
||||||
|
/// A transient error that can be retried.
|
||||||
|
#[error("retryable error (attempt {attempt}): {message}")]
|
||||||
|
Retryable { attempt: u32, message: String },
|
||||||
|
|
||||||
|
/// The requested tool is not registered in the tool registry.
|
||||||
|
#[error("tool not found: {tool}")]
|
||||||
|
ToolNotFound { tool: String },
|
||||||
|
|
||||||
|
/// A tool execution failed.
|
||||||
|
#[error("tool '{tool}' execution failed: {cause}")]
|
||||||
|
ToolExecutionFailed { tool: String, cause: String },
|
||||||
|
|
||||||
|
/// The request contains invalid input.
|
||||||
|
#[error("invalid input in '{field}': {reason}")]
|
||||||
|
InvalidInput { field: String, reason: String },
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, AgentError>;
|
pub type Result<T> = std::result::Result<T, AgentError>;
|
||||||
|
|||||||
@ -5,23 +5,222 @@
|
|||||||
//! to the caller — this service only manages task lifecycle and state.
|
//! to the caller — this service only manages task lifecycle and state.
|
||||||
|
|
||||||
use db::database::AppDatabase;
|
use db::database::AppDatabase;
|
||||||
use models::agent_task::{
|
use models::agent_task::{ActiveModel, AgentType, Column as C, Entity, Model, TaskStatus};
|
||||||
ActiveModel, AgentType, Column as C, Entity, Model, TaskStatus,
|
use models::IssueId;
|
||||||
};
|
|
||||||
use sea_orm::{
|
use sea_orm::{
|
||||||
entity::EntityTrait, query::{QueryFilter, QueryOrder, QuerySelect}, ActiveModelTrait,
|
entity::EntityTrait, query::{QueryFilter, QueryOrder, QuerySelect}, ActiveModelTrait,
|
||||||
ColumnTrait, DbErr,
|
ColumnTrait,
|
||||||
|
DbErr,
|
||||||
};
|
};
|
||||||
|
use serde::Serialize;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// Event payload published to WebSocket clients via Redis Pub/Sub.
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct TaskEvent {
|
||||||
|
pub task_id: i64,
|
||||||
|
pub project_id: uuid::Uuid,
|
||||||
|
pub parent_id: Option<i64>,
|
||||||
|
pub event: String,
|
||||||
|
pub message: Option<String>,
|
||||||
|
pub output: Option<String>,
|
||||||
|
pub error: Option<String>,
|
||||||
|
pub status: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskEvent {
|
||||||
|
pub fn started(task_id: i64, project_id: uuid::Uuid, parent_id: Option<i64>) -> Self {
|
||||||
|
Self {
|
||||||
|
task_id,
|
||||||
|
project_id,
|
||||||
|
parent_id,
|
||||||
|
event: "started".to_string(),
|
||||||
|
message: None,
|
||||||
|
output: None,
|
||||||
|
error: None,
|
||||||
|
status: TaskStatus::Running.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn progress(
|
||||||
|
task_id: i64,
|
||||||
|
project_id: uuid::Uuid,
|
||||||
|
parent_id: Option<i64>,
|
||||||
|
msg: String,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
task_id,
|
||||||
|
project_id,
|
||||||
|
parent_id,
|
||||||
|
event: "progress".to_string(),
|
||||||
|
message: Some(msg),
|
||||||
|
output: None,
|
||||||
|
error: None,
|
||||||
|
status: TaskStatus::Running.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn completed(
|
||||||
|
task_id: i64,
|
||||||
|
project_id: uuid::Uuid,
|
||||||
|
parent_id: Option<i64>,
|
||||||
|
output: String,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
task_id,
|
||||||
|
project_id,
|
||||||
|
parent_id,
|
||||||
|
event: "done".to_string(),
|
||||||
|
message: None,
|
||||||
|
output: Some(output),
|
||||||
|
error: None,
|
||||||
|
status: TaskStatus::Done.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn failed(
|
||||||
|
task_id: i64,
|
||||||
|
project_id: uuid::Uuid,
|
||||||
|
parent_id: Option<i64>,
|
||||||
|
error: String,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
task_id,
|
||||||
|
project_id,
|
||||||
|
parent_id,
|
||||||
|
event: "failed".to_string(),
|
||||||
|
message: None,
|
||||||
|
output: None,
|
||||||
|
error: Some(error),
|
||||||
|
status: TaskStatus::Failed.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cancelled(task_id: i64, project_id: uuid::Uuid, parent_id: Option<i64>) -> Self {
|
||||||
|
Self {
|
||||||
|
task_id,
|
||||||
|
project_id,
|
||||||
|
parent_id,
|
||||||
|
event: "cancelled".to_string(),
|
||||||
|
message: None,
|
||||||
|
output: None,
|
||||||
|
error: None,
|
||||||
|
status: TaskStatus::Cancelled.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper trait for publishing task lifecycle events via Redis Pub/Sub.
|
||||||
|
///
|
||||||
|
/// Callers inject a suitable `publish_fn` at construction time via
|
||||||
|
/// `TaskEvents::new(...)`. If no publisher is supplied events are silently
|
||||||
|
/// dropped (graceful degradation on startup).
|
||||||
|
pub trait TaskEventPublisher: Send + Sync {
|
||||||
|
fn publish(&self, project_id: uuid::Uuid, event: TaskEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// No-op publisher used when no Redis Pub/Sub connection is available.
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
pub struct NoOpPublisher;
|
||||||
|
|
||||||
|
impl TaskEventPublisher for NoOpPublisher {
|
||||||
|
fn publish(&self, _: uuid::Uuid, _: TaskEvent) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TaskEvents {
|
||||||
|
publisher: Arc<dyn TaskEventPublisher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskEvents {
|
||||||
|
pub fn new(publisher: impl TaskEventPublisher + 'static) -> Self {
|
||||||
|
Self {
|
||||||
|
publisher: Arc::new(publisher),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn noop() -> Self {
|
||||||
|
Self::new(NoOpPublisher)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit(&self, task: &Model, event: TaskEvent) {
|
||||||
|
self.publisher.publish(task.project_uuid, event);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn emit_started(&self, task: &Model) {
|
||||||
|
self.emit(
|
||||||
|
task,
|
||||||
|
TaskEvent::started(task.id, task.project_uuid, task.parent_id),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn emit_progress(&self, task: &Model, msg: String) {
|
||||||
|
self.emit(
|
||||||
|
task,
|
||||||
|
TaskEvent::progress(task.id, task.project_uuid, task.parent_id, msg),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn emit_completed(&self, task: &Model, output: String) {
|
||||||
|
self.emit(
|
||||||
|
task,
|
||||||
|
TaskEvent::completed(task.id, task.project_uuid, task.parent_id, output),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn emit_failed(&self, task: &Model, error: String) {
|
||||||
|
self.emit(
|
||||||
|
task,
|
||||||
|
TaskEvent::failed(task.id, task.project_uuid, task.parent_id, error),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn emit_cancelled(&self, task: &Model) {
|
||||||
|
self.emit(
|
||||||
|
task,
|
||||||
|
TaskEvent::cancelled(task.id, task.project_uuid, task.parent_id),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builder for TaskService so that the events publisher can be set independently
|
||||||
|
/// of the database connection.
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
pub struct TaskServiceBuilder {
|
||||||
|
events: Option<TaskEvents>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskServiceBuilder {
|
||||||
|
pub fn with_events(mut self, events: TaskEvents) -> Self {
|
||||||
|
self.events = Some(events);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn build(self, db: AppDatabase) -> TaskService {
|
||||||
|
TaskService {
|
||||||
|
db,
|
||||||
|
events: self.events.unwrap_or_else(TaskEvents::noop),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Service for managing agent tasks (root tasks and sub-tasks).
|
/// Service for managing agent tasks (root tasks and sub-tasks).
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TaskService {
|
pub struct TaskService {
|
||||||
db: AppDatabase,
|
db: AppDatabase,
|
||||||
|
events: TaskEvents,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TaskService {
|
impl TaskService {
|
||||||
pub fn new(db: AppDatabase) -> Self {
|
pub fn new(db: AppDatabase) -> Self {
|
||||||
Self { db }
|
Self {
|
||||||
|
db,
|
||||||
|
events: TaskEvents::noop(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_events(db: AppDatabase, events: TaskEvents) -> Self {
|
||||||
|
Self { db, events }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new task (root or sub-task) with status = pending.
|
/// Create a new task (root or sub-task) with status = pending.
|
||||||
@ -31,7 +230,20 @@ impl TaskService {
|
|||||||
input: impl Into<String>,
|
input: impl Into<String>,
|
||||||
agent_type: AgentType,
|
agent_type: AgentType,
|
||||||
) -> Result<Model, DbErr> {
|
) -> Result<Model, DbErr> {
|
||||||
self.create_with_parent(project_uuid, None, input, agent_type, None).await
|
self.create_with_parent(project_uuid, None, input, agent_type, None, None)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new task bound to an issue.
|
||||||
|
pub async fn create_for_issue(
|
||||||
|
&self,
|
||||||
|
project_uuid: impl Into<uuid::Uuid>,
|
||||||
|
issue_id: IssueId,
|
||||||
|
input: impl Into<String>,
|
||||||
|
agent_type: AgentType,
|
||||||
|
) -> Result<Model, DbErr> {
|
||||||
|
self.create_with_parent(project_uuid, None, input, agent_type, None, Some(issue_id))
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new sub-task with a parent reference.
|
/// Create a new sub-task with a parent reference.
|
||||||
@ -43,8 +255,15 @@ impl TaskService {
|
|||||||
agent_type: AgentType,
|
agent_type: AgentType,
|
||||||
title: Option<String>,
|
title: Option<String>,
|
||||||
) -> Result<Model, DbErr> {
|
) -> Result<Model, DbErr> {
|
||||||
self.create_with_parent(project_uuid, Some(parent_id), input, agent_type, title)
|
self.create_with_parent(
|
||||||
.await
|
project_uuid,
|
||||||
|
Some(parent_id),
|
||||||
|
input,
|
||||||
|
agent_type,
|
||||||
|
title,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_with_parent(
|
async fn create_with_parent(
|
||||||
@ -54,10 +273,12 @@ impl TaskService {
|
|||||||
input: impl Into<String>,
|
input: impl Into<String>,
|
||||||
agent_type: AgentType,
|
agent_type: AgentType,
|
||||||
title: Option<String>,
|
title: Option<String>,
|
||||||
|
issue_id: Option<IssueId>,
|
||||||
) -> Result<Model, DbErr> {
|
) -> Result<Model, DbErr> {
|
||||||
let model = ActiveModel {
|
let model = ActiveModel {
|
||||||
project_uuid: sea_orm::Set(project_uuid.into()),
|
project_uuid: sea_orm::Set(project_uuid.into()),
|
||||||
parent_id: sea_orm::Set(parent_id),
|
parent_id: sea_orm::Set(parent_id),
|
||||||
|
issue_id: sea_orm::Set(issue_id),
|
||||||
agent_type: sea_orm::Set(agent_type),
|
agent_type: sea_orm::Set(agent_type),
|
||||||
status: sea_orm::Set(TaskStatus::Pending),
|
status: sea_orm::Set(TaskStatus::Pending),
|
||||||
title: sea_orm::Set(title),
|
title: sea_orm::Set(title),
|
||||||
@ -70,61 +291,223 @@ impl TaskService {
|
|||||||
/// Mark a task as running and record the start time.
|
/// Mark a task as running and record the start time.
|
||||||
pub async fn start(&self, task_id: i64) -> Result<Model, DbErr> {
|
pub async fn start(&self, task_id: i64) -> Result<Model, DbErr> {
|
||||||
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
||||||
let model = model.ok_or_else(|| {
|
let model =
|
||||||
DbErr::RecordNotFound("agent_task not found".to_string())
|
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
})?;
|
|
||||||
|
|
||||||
let mut active: ActiveModel = model.into();
|
let mut active: ActiveModel = model.into();
|
||||||
active.status = sea_orm::Set(TaskStatus::Running);
|
active.status = sea_orm::Set(TaskStatus::Running);
|
||||||
active.started_at = sea_orm::Set(Some(chrono::Utc::now().into()));
|
active.started_at = sea_orm::Set(Some(chrono::Utc::now().into()));
|
||||||
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
active.update(&self.db).await
|
let updated = active.update(&self.db).await?;
|
||||||
|
self.events.emit_started(&updated);
|
||||||
|
Ok(updated)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update progress text (e.g., "step 2/5: analyzing PR").
|
/// Update progress text (e.g., "step 2/5: analyzing PR").
|
||||||
pub async fn update_progress(&self, task_id: i64, progress: impl Into<String>) -> Result<(), DbErr> {
|
pub async fn update_progress(
|
||||||
|
&self,
|
||||||
|
task_id: i64,
|
||||||
|
progress: impl Into<String>,
|
||||||
|
) -> Result<(), DbErr> {
|
||||||
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
||||||
let model = model.ok_or_else(|| {
|
let model =
|
||||||
DbErr::RecordNotFound("agent_task not found".to_string())
|
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
})?;
|
|
||||||
|
|
||||||
|
let progress_str = progress.into();
|
||||||
let mut active: ActiveModel = model.into();
|
let mut active: ActiveModel = model.into();
|
||||||
active.progress = sea_orm::Set(Some(progress.into()));
|
active.progress = sea_orm::Set(Some(progress_str.clone()));
|
||||||
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
active.update(&self.db).await?;
|
let updated = active.update(&self.db).await?;
|
||||||
|
self.events.emit_progress(&updated, progress_str);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark a task as completed with the output text.
|
/// Mark a task as completed with the output text.
|
||||||
pub async fn complete(&self, task_id: i64, output: impl Into<String>) -> Result<Model, DbErr> {
|
pub async fn complete(&self, task_id: i64, output: impl Into<String>) -> Result<Model, DbErr> {
|
||||||
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
||||||
let model = model.ok_or_else(|| {
|
let model =
|
||||||
DbErr::RecordNotFound("agent_task not found".to_string())
|
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
})?;
|
|
||||||
|
|
||||||
let mut active: ActiveModel = model.into();
|
let mut active: ActiveModel = model.into();
|
||||||
active.status = sea_orm::Set(TaskStatus::Done);
|
active.status = sea_orm::Set(TaskStatus::Done);
|
||||||
active.output = sea_orm::Set(Some(output.into()));
|
let out = output.into();
|
||||||
|
active.output = sea_orm::Set(Some(out.clone()));
|
||||||
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
|
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
|
||||||
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
active.update(&self.db).await
|
let updated = active.update(&self.db).await?;
|
||||||
|
self.events.emit_completed(&updated, out);
|
||||||
|
Ok(updated)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark a task as failed with an error message.
|
/// Mark a task as failed with an error message.
|
||||||
pub async fn fail(&self, task_id: i64, error: impl Into<String>) -> Result<Model, DbErr> {
|
pub async fn fail(&self, task_id: i64, error: impl Into<String>) -> Result<Model, DbErr> {
|
||||||
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
||||||
let model = model.ok_or_else(|| {
|
let model =
|
||||||
DbErr::RecordNotFound("agent_task not found".to_string())
|
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
})?;
|
|
||||||
|
|
||||||
let mut active: ActiveModel = model.into();
|
let mut active: ActiveModel = model.into();
|
||||||
active.status = sea_orm::Set(TaskStatus::Failed);
|
active.status = sea_orm::Set(TaskStatus::Failed);
|
||||||
active.error = sea_orm::Set(Some(error.into()));
|
let err = error.into();
|
||||||
|
active.error = sea_orm::Set(Some(err.clone()));
|
||||||
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
|
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
|
||||||
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
|
let updated = active.update(&self.db).await?;
|
||||||
|
self.events.emit_failed(&updated, err);
|
||||||
|
Ok(updated)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Propagate child task status up the tree.
|
||||||
|
///
|
||||||
|
/// Only allows cancelling tasks that are not yet in a terminal state
|
||||||
|
/// (Pending / Running / Paused).
|
||||||
|
///
|
||||||
|
/// Cancelled children are marked done so that `are_children_done()` returns
|
||||||
|
/// true for the parent after cancellation.
|
||||||
|
pub async fn cancel(&self, task_id: i64) -> Result<Model, DbErr> {
|
||||||
|
// Collect all task IDs (parent + descendants) using an explicit stack.
|
||||||
|
let mut stack = vec![task_id];
|
||||||
|
let mut idx = 0;
|
||||||
|
while idx < stack.len() {
|
||||||
|
let current = stack[idx];
|
||||||
|
let children = Entity::find()
|
||||||
|
.filter(C::ParentId.eq(current))
|
||||||
|
.all(&self.db)
|
||||||
|
.await?;
|
||||||
|
for child in children {
|
||||||
|
stack.push(child.id);
|
||||||
|
}
|
||||||
|
idx += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark every collected task as cancelled (terminal state).
|
||||||
|
for id in &stack {
|
||||||
|
let model = Entity::find_by_id(*id).one(&self.db).await?;
|
||||||
|
if let Some(m) = model {
|
||||||
|
if !m.is_done() {
|
||||||
|
let mut active: ActiveModel = m.into();
|
||||||
|
active.status = sea_orm::Set(TaskStatus::Cancelled);
|
||||||
|
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
|
||||||
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
|
active.update(&self.db).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let final_model = Entity::find_by_id(task_id)
|
||||||
|
.one(&self.db)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
|
self.events.emit_cancelled(&final_model);
|
||||||
|
Ok(final_model)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pause a running or pending task.
|
||||||
|
///
|
||||||
|
/// Pausing a task that is not Pending/Running is a no-op that returns
|
||||||
|
/// the current model (same behaviour as `start` on an already-running task).
|
||||||
|
pub async fn pause(&self, task_id: i64) -> Result<Model, DbErr> {
|
||||||
|
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
||||||
|
let model =
|
||||||
|
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
|
|
||||||
|
if !model.is_running() {
|
||||||
|
// Already in a terminal or paused state — return unchanged.
|
||||||
|
return Ok(model);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut active: ActiveModel = model.into();
|
||||||
|
active.status = sea_orm::Set(TaskStatus::Paused);
|
||||||
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
active.update(&self.db).await
|
active.update(&self.db).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Resume a paused task back to Running.
|
||||||
|
///
|
||||||
|
/// Returns an error if the task is not currently Paused.
|
||||||
|
pub async fn resume(&self, task_id: i64) -> Result<Model, DbErr> {
|
||||||
|
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
||||||
|
let model =
|
||||||
|
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
|
|
||||||
|
if model.status != TaskStatus::Paused {
|
||||||
|
return Err(DbErr::Custom(format!(
|
||||||
|
"cannot resume task {}: expected status Paused, got {}",
|
||||||
|
task_id, model.status
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut active: ActiveModel = model.into();
|
||||||
|
active.status = sea_orm::Set(TaskStatus::Running);
|
||||||
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
|
active.update(&self.db).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retry a failed or cancelled task by resetting it to Pending.
|
||||||
|
///
|
||||||
|
/// Clears `output`, `error`, and `done_at`; increments `retry_count`.
|
||||||
|
/// Only tasks in Failed or Cancelled state can be retried.
|
||||||
|
pub async fn retry(&self, task_id: i64) -> Result<Model, DbErr> {
|
||||||
|
let model = Entity::find_by_id(task_id).one(&self.db).await?;
|
||||||
|
let model =
|
||||||
|
model.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
|
|
||||||
|
match model.status {
|
||||||
|
TaskStatus::Failed | TaskStatus::Cancelled | TaskStatus::Done => {}
|
||||||
|
_ => {
|
||||||
|
return Err(DbErr::Custom(format!(
|
||||||
|
"cannot retry task {}: only Failed/Cancelled/Done tasks can be retried (got {})",
|
||||||
|
task_id, model.status
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let retry_count = model.retry_count.map(|c| c + 1).unwrap_or(1);
|
||||||
|
|
||||||
|
let mut active: ActiveModel = model.into();
|
||||||
|
active.status = sea_orm::Set(TaskStatus::Pending);
|
||||||
|
active.output = sea_orm::Set(None);
|
||||||
|
active.error = sea_orm::Set(None);
|
||||||
|
active.done_at = sea_orm::Set(None);
|
||||||
|
active.started_at = sea_orm::Set(None);
|
||||||
|
active.retry_count = sea_orm::Set(Some(retry_count));
|
||||||
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
|
active.update(&self.db).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Propagate child task status up the tree.
|
||||||
|
///
|
||||||
|
/// When a child task reaches a terminal state, checks whether all its
|
||||||
|
/// siblings are also terminal. If so, marks the parent as failed so that
|
||||||
|
/// a stuck parent is never left in the `Running` state.
|
||||||
|
pub async fn propagate_to_parent(&self, task_id: i64) -> Result<Option<Model>, DbErr> {
|
||||||
|
let model = self
|
||||||
|
.get(task_id)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| DbErr::RecordNotFound("agent_task not found".to_string()))?;
|
||||||
|
|
||||||
|
let Some(parent_id) = model.parent_id else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
|
||||||
|
let siblings = self.children(parent_id).await?;
|
||||||
|
if siblings.iter().all(|s| s.is_done()) {
|
||||||
|
let parent = self.get(parent_id).await?.ok_or_else(|| {
|
||||||
|
DbErr::RecordNotFound(format!("parent task {} not found", parent_id))
|
||||||
|
})?;
|
||||||
|
if parent.is_running() {
|
||||||
|
let mut active: ActiveModel = parent.into();
|
||||||
|
active.status = sea_orm::Set(TaskStatus::Failed);
|
||||||
|
active.error =
|
||||||
|
sea_orm::Set(Some("All sub-tasks failed or were cancelled".to_string()));
|
||||||
|
active.done_at = sea_orm::Set(Some(chrono::Utc::now().into()));
|
||||||
|
active.updated_at = sea_orm::Set(chrono::Utc::now().into());
|
||||||
|
let updated = active.update(&self.db).await?;
|
||||||
|
return Ok(Some(updated));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
/// Get a task by ID.
|
/// Get a task by ID.
|
||||||
pub async fn get(&self, task_id: i64) -> Result<Option<Model>, DbErr> {
|
pub async fn get(&self, task_id: i64) -> Result<Option<Model>, DbErr> {
|
||||||
Entity::find_by_id(task_id).one(&self.db).await
|
Entity::find_by_id(task_id).one(&self.db).await
|
||||||
@ -140,11 +523,14 @@ impl TaskService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// List all active (non-terminal) tasks for a project.
|
/// List all active (non-terminal) tasks for a project.
|
||||||
pub async fn active_tasks(&self, project_uuid: impl Into<uuid::Uuid>) -> Result<Vec<Model>, DbErr> {
|
pub async fn active_tasks(
|
||||||
|
&self,
|
||||||
|
project_uuid: impl Into<uuid::Uuid>,
|
||||||
|
) -> Result<Vec<Model>, DbErr> {
|
||||||
let uuid: uuid::Uuid = project_uuid.into();
|
let uuid: uuid::Uuid = project_uuid.into();
|
||||||
Entity::find()
|
Entity::find()
|
||||||
.filter(C::ProjectUuid.eq(uuid))
|
.filter(C::ProjectUuid.eq(uuid))
|
||||||
.filter(C::Status.is_in([TaskStatus::Pending, TaskStatus::Running]))
|
.filter(C::Status.is_in([TaskStatus::Pending, TaskStatus::Running, TaskStatus::Paused]))
|
||||||
.order_by_desc(C::CreatedAt)
|
.order_by_desc(C::CreatedAt)
|
||||||
.all(&self.db)
|
.all(&self.db)
|
||||||
.await
|
.await
|
||||||
@ -198,12 +584,10 @@ impl TaskService {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if all sub-tasks of a given parent are done.
|
/// Check if all sub-tasks of a given parent are in a terminal state.
|
||||||
|
/// Returns true if there are no children (empty tree counts as done).
|
||||||
pub async fn are_children_done(&self, parent_id: i64) -> Result<bool, DbErr> {
|
pub async fn are_children_done(&self, parent_id: i64) -> Result<bool, DbErr> {
|
||||||
let children = self.children(parent_id).await?;
|
let children = self.children(parent_id).await?;
|
||||||
if children.is_empty() {
|
Ok(children.is_empty() || children.iter().all(|c| c.is_done()))
|
||||||
return Ok(true);
|
|
||||||
}
|
|
||||||
Ok(children.iter().all(|c| c.is_done()))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
//! Executes tool calls and converts results to OpenAI `tool` messages.
|
//! Executes tool calls and converts results to OpenAI `tool` messages.
|
||||||
|
|
||||||
use futures::StreamExt;
|
|
||||||
use futures::stream;
|
use futures::stream;
|
||||||
|
use futures::StreamExt;
|
||||||
|
|
||||||
use async_openai::types::chat::{
|
use async_openai::types::chat::{
|
||||||
ChatCompletionRequestMessage, ChatCompletionRequestToolMessage,
|
ChatCompletionRequestMessage, ChatCompletionRequestToolMessage,
|
||||||
@ -70,8 +70,9 @@ impl ToolExecutor {
|
|||||||
ctx.increment_tool_calls();
|
ctx.increment_tool_calls();
|
||||||
|
|
||||||
let concurrency = self.max_concurrency;
|
let concurrency = self.max_concurrency;
|
||||||
use std::sync::Mutex;
|
use tokio::sync::Mutex as AsyncMutex;
|
||||||
let results: Mutex<Vec<ToolCallResult>> = Mutex::new(Vec::with_capacity(calls.len()));
|
let results: AsyncMutex<Vec<ToolCallResult>> =
|
||||||
|
AsyncMutex::new(Vec::with_capacity(calls.len()));
|
||||||
|
|
||||||
stream::iter(calls.into_iter().map(|call| {
|
stream::iter(calls.into_iter().map(|call| {
|
||||||
let child_ctx = ctx.child_context();
|
let child_ctx = ctx.child_context();
|
||||||
@ -91,12 +92,12 @@ impl ToolExecutor {
|
|||||||
e.to_string(),
|
e.to_string(),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
results.lock().unwrap().push(r);
|
results.lock().await.push(r);
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
Ok(results.into_inner().unwrap())
|
Ok(results.into_inner())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn execute_one(
|
async fn execute_one(
|
||||||
|
|||||||
@ -79,6 +79,8 @@ impl MigratorTrait for Migrator {
|
|||||||
Box::new(m20260412_000003_create_project_skill::Migration),
|
Box::new(m20260412_000003_create_project_skill::Migration),
|
||||||
Box::new(m20260413_000001_add_skill_commit_blob::Migration),
|
Box::new(m20260413_000001_add_skill_commit_blob::Migration),
|
||||||
Box::new(m20260414_000001_create_agent_task::Migration),
|
Box::new(m20260414_000001_create_agent_task::Migration),
|
||||||
|
Box::new(m20260415_000001_add_issue_id_to_agent_task::Migration),
|
||||||
|
Box::new(m20260416_000001_add_retry_count_to_agent_task::Migration),
|
||||||
// Repo tables
|
// Repo tables
|
||||||
Box::new(m20250628_000028_create_repo::Migration),
|
Box::new(m20250628_000028_create_repo::Migration),
|
||||||
Box::new(m20250628_000029_create_repo_branch::Migration),
|
Box::new(m20250628_000029_create_repo_branch::Migration),
|
||||||
@ -248,3 +250,5 @@ pub mod m20260412_000002_create_workspace_billing_history;
|
|||||||
pub mod m20260412_000003_create_project_skill;
|
pub mod m20260412_000003_create_project_skill;
|
||||||
pub mod m20260413_000001_add_skill_commit_blob;
|
pub mod m20260413_000001_add_skill_commit_blob;
|
||||||
pub mod m20260414_000001_create_agent_task;
|
pub mod m20260414_000001_create_agent_task;
|
||||||
|
pub mod m20260415_000001_add_issue_id_to_agent_task;
|
||||||
|
pub mod m20260416_000001_add_retry_count_to_agent_task;
|
||||||
|
|||||||
23
libs/migrate/m20260415_000001_add_issue_id_to_agent_task.rs
Normal file
23
libs/migrate/m20260415_000001_add_issue_id_to_agent_task.rs
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
//! SeaORM migration: add issue_id to agent_task
|
||||||
|
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
impl MigrationName for Migration {
|
||||||
|
fn name(&self) -> &str {
|
||||||
|
"m20260415_000001_add_issue_id_to_agent_task"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
let sql = include_str!("sql/m20260415_000001_add_issue_id_to_agent_task.sql");
|
||||||
|
super::execute_sql(manager, sql).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
super::execute_sql(manager, "ALTER TABLE agent_task DROP COLUMN IF EXISTS issue_id;").await
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,27 @@
|
|||||||
|
//! SeaORM migration: add retry_count to agent_task
|
||||||
|
|
||||||
|
use sea_orm_migration::prelude::*;
|
||||||
|
|
||||||
|
pub struct Migration;
|
||||||
|
|
||||||
|
impl MigrationName for Migration {
|
||||||
|
fn name(&self) -> &str {
|
||||||
|
"m20260416_000001_add_retry_count_to_agent_task"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl MigrationTrait for Migration {
|
||||||
|
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
let sql = include_str!("sql/m20260416_000001_add_retry_count_to_agent_task.sql");
|
||||||
|
super::execute_sql(manager, sql).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
|
||||||
|
super::execute_sql(
|
||||||
|
manager,
|
||||||
|
"ALTER TABLE agent_task DROP COLUMN IF EXISTS retry_count;",
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,4 @@
|
|||||||
|
-- Add issue_id column to agent_task for issue-task binding
|
||||||
|
ALTER TABLE agent_task ADD COLUMN IF NOT EXISTS issue_id UUID;
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_agent_task_issue ON agent_task (issue_id);
|
||||||
@ -0,0 +1,4 @@
|
|||||||
|
-- Add retry_count column to agent_task for retry tracking
|
||||||
|
ALTER TABLE agent_task ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0;
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_agent_task_retry_count ON agent_task (retry_count);
|
||||||
@ -10,7 +10,7 @@
|
|||||||
//! Sub-agents are represented as `agent_task` records with a parent reference,
|
//! Sub-agents are represented as `agent_task` records with a parent reference,
|
||||||
//! allowing hierarchical task trees and result aggregation.
|
//! allowing hierarchical task trees and result aggregation.
|
||||||
|
|
||||||
use crate::{DateTimeUtc, ProjectId, UserId};
|
use crate::{DateTimeUtc, IssueId, ProjectId, UserId};
|
||||||
use sea_orm::entity::prelude::*;
|
use sea_orm::entity::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
@ -51,10 +51,14 @@ pub enum TaskStatus {
|
|||||||
Pending,
|
Pending,
|
||||||
#[sea_orm(string_value = "Running")]
|
#[sea_orm(string_value = "Running")]
|
||||||
Running,
|
Running,
|
||||||
|
#[sea_orm(string_value = "Paused")]
|
||||||
|
Paused,
|
||||||
#[sea_orm(string_value = "Done")]
|
#[sea_orm(string_value = "Done")]
|
||||||
Done,
|
Done,
|
||||||
#[sea_orm(string_value = "Failed")]
|
#[sea_orm(string_value = "Failed")]
|
||||||
Failed,
|
Failed,
|
||||||
|
#[sea_orm(string_value = "Cancelled")]
|
||||||
|
Cancelled,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for TaskStatus {
|
impl Default for TaskStatus {
|
||||||
@ -68,8 +72,10 @@ impl std::fmt::Display for TaskStatus {
|
|||||||
match self {
|
match self {
|
||||||
TaskStatus::Pending => write!(f, "Pending"),
|
TaskStatus::Pending => write!(f, "Pending"),
|
||||||
TaskStatus::Running => write!(f, "Running"),
|
TaskStatus::Running => write!(f, "Running"),
|
||||||
|
TaskStatus::Paused => write!(f, "Paused"),
|
||||||
TaskStatus::Done => write!(f, "Done"),
|
TaskStatus::Done => write!(f, "Done"),
|
||||||
TaskStatus::Failed => write!(f, "Failed"),
|
TaskStatus::Failed => write!(f, "Failed"),
|
||||||
|
TaskStatus::Cancelled => write!(f, "Cancelled"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -88,6 +94,10 @@ pub struct Model {
|
|||||||
#[sea_orm(nullable)]
|
#[sea_orm(nullable)]
|
||||||
pub parent_id: Option<i64>,
|
pub parent_id: Option<i64>,
|
||||||
|
|
||||||
|
/// Issue this task is bound to (optional).
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub issue_id: Option<IssueId>,
|
||||||
|
|
||||||
/// Agent type that executes this task.
|
/// Agent type that executes this task.
|
||||||
#[sea_orm(column_type = "String(StringLen::None)", default = "React")]
|
#[sea_orm(column_type = "String(StringLen::None)", default = "React")]
|
||||||
pub agent_type: AgentType,
|
pub agent_type: AgentType,
|
||||||
@ -129,15 +139,15 @@ pub struct Model {
|
|||||||
/// Current progress description (e.g., "step 2/5: analyzing code").
|
/// Current progress description (e.g., "step 2/5: analyzing code").
|
||||||
#[sea_orm(nullable)]
|
#[sea_orm(nullable)]
|
||||||
pub progress: Option<String>,
|
pub progress: Option<String>,
|
||||||
|
|
||||||
|
/// Number of times this task has been retried.
|
||||||
|
#[sea_orm(nullable)]
|
||||||
|
pub retry_count: Option<i32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||||
pub enum Relation {
|
pub enum Relation {
|
||||||
#[sea_orm(
|
#[sea_orm(belongs_to = "Entity", from = "Column::ParentId", to = "Column::Id")]
|
||||||
belongs_to = "Entity",
|
|
||||||
from = "Column::ParentId",
|
|
||||||
to = "Column::Id"
|
|
||||||
)]
|
|
||||||
ParentTask,
|
ParentTask,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -149,6 +159,16 @@ impl Model {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_done(&self) -> bool {
|
pub fn is_done(&self) -> bool {
|
||||||
matches!(self.status, TaskStatus::Done | TaskStatus::Failed)
|
matches!(
|
||||||
|
self.status,
|
||||||
|
TaskStatus::Done | TaskStatus::Failed | TaskStatus::Cancelled
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_running(&self) -> bool {
|
||||||
|
matches!(
|
||||||
|
self.status,
|
||||||
|
TaskStatus::Running | TaskStatus::Pending | TaskStatus::Paused
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
//! Room message queue: Redis Streams + NATS.
|
//! Room message queue: Redis Streams + Redis Pub/Sub.
|
||||||
|
|
||||||
pub mod producer;
|
pub mod producer;
|
||||||
pub mod types;
|
pub mod types;
|
||||||
@ -10,6 +10,6 @@ pub use types::{
|
|||||||
RoomMessageEvent, RoomMessageStreamChunkEvent,
|
RoomMessageEvent, RoomMessageStreamChunkEvent,
|
||||||
};
|
};
|
||||||
pub use worker::{
|
pub use worker::{
|
||||||
EmailSendFn, EmailSendFut, GetRedis, PersistFn, RedisFuture, room_worker_task,
|
room_worker_task, start as start_worker, start_email_worker, EmailSendFn, EmailSendFut, GetRedis,
|
||||||
start as start_worker, start_email_worker,
|
PersistFn, RedisFuture,
|
||||||
};
|
};
|
||||||
|
|||||||
@ -2,8 +2,8 @@ use std::collections::HashMap;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use metrics::{
|
use metrics::{
|
||||||
Counter, Gauge, Histogram, Unit, describe_counter, describe_gauge, describe_histogram,
|
describe_counter, describe_gauge, describe_histogram, register_counter, register_gauge, register_histogram, Counter,
|
||||||
register_counter, register_gauge, register_histogram,
|
Gauge, Histogram, Unit,
|
||||||
};
|
};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@ -20,7 +20,7 @@ pub struct RoomMetrics {
|
|||||||
pub broadcasts_sent: Counter,
|
pub broadcasts_sent: Counter,
|
||||||
pub broadcasts_dropped: Counter,
|
pub broadcasts_dropped: Counter,
|
||||||
pub duplicates_skipped: Counter,
|
pub duplicates_skipped: Counter,
|
||||||
pub nats_publish_failed: Counter,
|
pub redis_publish_failed: Counter,
|
||||||
pub message_latency_ms: Histogram,
|
pub message_latency_ms: Histogram,
|
||||||
pub ws_rate_limit_hits: Counter,
|
pub ws_rate_limit_hits: Counter,
|
||||||
pub ws_auth_failures: Counter,
|
pub ws_auth_failures: Counter,
|
||||||
@ -78,9 +78,9 @@ impl Default for RoomMetrics {
|
|||||||
"Total duplicate messages skipped (idempotency)"
|
"Total duplicate messages skipped (idempotency)"
|
||||||
);
|
);
|
||||||
describe_counter!(
|
describe_counter!(
|
||||||
"room_nats_publish_failed_total",
|
"room_redis_publish_failed_total",
|
||||||
Unit::Count,
|
Unit::Count,
|
||||||
"Total NATS publish failures"
|
"Total Redis publish failures"
|
||||||
);
|
);
|
||||||
describe_histogram!(
|
describe_histogram!(
|
||||||
"room_message_latency_ms",
|
"room_message_latency_ms",
|
||||||
@ -130,7 +130,7 @@ impl Default for RoomMetrics {
|
|||||||
broadcasts_sent: register_counter!("room_broadcasts_sent_total"),
|
broadcasts_sent: register_counter!("room_broadcasts_sent_total"),
|
||||||
broadcasts_dropped: register_counter!("room_broadcasts_dropped_total"),
|
broadcasts_dropped: register_counter!("room_broadcasts_dropped_total"),
|
||||||
duplicates_skipped: register_counter!("room_duplicates_skipped_total"),
|
duplicates_skipped: register_counter!("room_duplicates_skipped_total"),
|
||||||
nats_publish_failed: register_counter!("room_nats_publish_failed_total"),
|
redis_publish_failed: register_counter!("room_redis_publish_failed_total"),
|
||||||
message_latency_ms: register_histogram!("room_message_latency_ms"),
|
message_latency_ms: register_histogram!("room_message_latency_ms"),
|
||||||
ws_rate_limit_hits: register_counter!("room_ws_rate_limit_hits_total"),
|
ws_rate_limit_hits: register_counter!("room_ws_rate_limit_hits_total"),
|
||||||
ws_auth_failures: register_counter!("room_ws_auth_failures_total"),
|
ws_auth_failures: register_counter!("room_ws_auth_failures_total"),
|
||||||
|
|||||||
@ -242,7 +242,7 @@ impl RoomService {
|
|||||||
project_id: Uuid,
|
project_id: Uuid,
|
||||||
agent_type: AgentType,
|
agent_type: AgentType,
|
||||||
input: String,
|
input: String,
|
||||||
title: Option<String>,
|
_title: Option<String>,
|
||||||
execute: F,
|
execute: F,
|
||||||
) -> anyhow::Result<i64>
|
) -> anyhow::Result<i64>
|
||||||
where
|
where
|
||||||
|
|||||||
@ -35,6 +35,7 @@ struct OpenRouterResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[allow(dead_code)]
|
||||||
struct OpenRouterModel {
|
struct OpenRouterModel {
|
||||||
id: String,
|
id: String,
|
||||||
name: Option<String>,
|
name: Option<String>,
|
||||||
@ -50,6 +51,7 @@ struct OpenRouterModel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[allow(dead_code)]
|
||||||
struct OpenRouterPricing {
|
struct OpenRouterPricing {
|
||||||
prompt: String,
|
prompt: String,
|
||||||
completion: String,
|
completion: String,
|
||||||
@ -68,6 +70,7 @@ struct OpenRouterPricing {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[allow(dead_code)]
|
||||||
struct OpenRouterArchitecture {
|
struct OpenRouterArchitecture {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
modality: Option<String>,
|
modality: Option<String>,
|
||||||
@ -82,6 +85,7 @@ struct OpenRouterArchitecture {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
#[allow(dead_code)]
|
||||||
struct OpenRouterTopProvider {
|
struct OpenRouterTopProvider {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
context_length: Option<u64>,
|
context_length: Option<u64>,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user