gitdataai/lib/service/agent/conversation.rs

642 lines
22 KiB
Rust

use chrono::Utc;
use db::sqlx;
use model::agent::AgentConversationModel;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
use crate::AppService;
use crate::error::AppError;
/// Input for `agent_conversation_create`.
#[derive(Debug, Clone, Deserialize, ToSchema)]
pub struct CreateConversation {
/// Title stored on the new conversation.
pub title: String,
}
/// Input for `agent_conversation_update`.
#[derive(Debug, Clone, Deserialize, ToSchema)]
pub struct UpdateConversation {
/// Replacement title; when `None` the update keeps the conversation's current title.
pub title: Option<String>,
}
/// Serializable view of a conversation, converted from `AgentConversationModel`.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct ConversationResponse {
/// Conversation identifier.
pub id: Uuid,
/// Identifier of the owning session (the model's `session` field).
pub session_id: Uuid,
/// Conversation title.
pub title: String,
/// User recorded as the creator of the conversation.
pub created_by: Uuid,
/// Primary sort key of the conversation list queries (newest first, `NULL`s last).
pub last_message_at: Option<chrono::DateTime<Utc>>,
/// Set by `agent_conversation_archive`, cleared by `agent_conversation_unarchive`.
pub archived_at: Option<chrono::DateTime<Utc>>,
/// Creation timestamp.
pub created_at: chrono::DateTime<Utc>,
/// Timestamp of the last update written by this service.
pub updated_at: chrono::DateTime<Utc>,
}
/// Serializable view of one tool call, converted from `AgentToolCallLogModel`.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct ToolCallResponse {
/// The log's `tool_call_id`; the default (empty) value when the log has none.
pub id: String,
/// The log's `tool_name`.
pub name: String,
/// JSON value decoded from the log's stored `arguments` text.
pub arguments: serde_json::Value,
/// JSON value decoded from the log's stored `result` text, if any.
pub output: Option<serde_json::Value>,
/// The log's `error` field.
pub error: Option<String>,
/// The log's `status` field.
pub status: String,
/// The log's `latency_ms` field.
pub elapsed_ms: Option<i64>,
}
/// Decodes a stored tool-call payload into JSON without discarding data.
///
/// Blank text yields `None`. Text that parses as JSON is returned as the
/// parsed value; any other text is kept verbatim as a JSON string, so a
/// plain-text payload still reaches the client instead of being dropped.
fn parse_tool_payload(raw: &str) -> Option<serde_json::Value> {
    if raw.trim().is_empty() {
        return None;
    }
    Some(
        serde_json::from_str(raw)
            .unwrap_or_else(|_| serde_json::Value::String(raw.to_owned())),
    )
}

impl From<model::agent::AgentToolCallLogModel> for ToolCallResponse {
    /// Converts a tool-call log row into its API representation.
    ///
    /// `arguments` falls back to JSON `null` and `output` to `None` only when
    /// the stored text is missing or blank; unparseable text is preserved as a
    /// JSON string (see `parse_tool_payload`).
    fn from(m: model::agent::AgentToolCallLogModel) -> Self {
        Self {
            id: m.tool_call_id.unwrap_or_default(),
            name: m.tool_name,
            arguments: m
                .arguments
                .as_deref()
                .and_then(parse_tool_payload)
                .unwrap_or_default(),
            output: m.result.as_deref().and_then(parse_tool_payload),
            error: m.error,
            status: m.status,
            elapsed_ms: m.latency_ms,
        }
    }
}
/// Serializable view of a message, converted from `AgentMessageModel`.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct MessageResponse {
/// Message identifier.
pub id: Uuid,
/// Conversation the message belongs to (the model's `conversation` field).
pub conversation_id: Uuid,
/// The model's `parent` field.
pub parent_id: Option<Uuid>,
/// The model's `role` field.
pub role: String,
/// The model's `author` field.
pub author: Option<Uuid>,
/// Message body.
pub content: String,
/// The model's `content_type` field.
pub content_type: String,
/// The model's `status` field.
pub status: String,
/// The model's `model_invocation` field.
pub model_invocation: Option<Uuid>,
/// The model's `reasoning_content` field.
pub reasoning_content: Option<String>,
/// Tool calls attached to this message. Empty right after conversion from
/// the model; `agent_message_list` fills it from the tool-call log.
// NOTE(review): this struct derives only `Serialize`, so `default` presumably
// matters only to schema generation, not to serialization — confirm.
#[serde(default)]
pub tool_calls: Vec<ToolCallResponse>,
/// Creation timestamp; `agent_message_list` orders and pages on it.
pub created_at: chrono::DateTime<Utc>,
/// The model's `updated_at` field.
pub updated_at: chrono::DateTime<Utc>,
}
/// Row shape of the conversation/session join issued by
/// `agent_conversation_list_all`; field names match the selected column names
/// and aliases.
#[derive(Debug, Clone, sqlx::FromRow)]
struct ConversationWithSessionRow {
pub id: Uuid,
/// `agent_conversation.session`, i.e. the owning session id.
pub session: Uuid,
pub title: String,
pub created_by: Uuid,
pub last_message_at: Option<chrono::DateTime<Utc>>,
pub archived_at: Option<chrono::DateTime<Utc>>,
pub created_at: chrono::DateTime<Utc>,
pub updated_at: chrono::DateTime<Utc>,
/// `agent_session.name`, selected as `session_name`.
pub session_name: Option<String>,
// Selected by the query (`s.wk as session_wk`) but not copied into the
// response type, hence the dead-code allowance.
#[allow(dead_code)]
pub session_wk: Option<Uuid>,
}
/// Serializable view of a conversation together with the name of its session,
/// as returned by `agent_conversation_list_all`.
#[derive(Debug, Clone, Serialize, ToSchema)]
pub struct ConversationWithSessionResponse {
/// Conversation identifier.
pub id: Uuid,
/// Identifier of the owning session.
pub session_id: Uuid,
/// Name of the owning session, when the joined row carries one.
pub session_name: Option<String>,
/// Conversation title.
pub title: String,
/// User recorded as the creator of the conversation.
pub created_by: Uuid,
/// Primary sort key of the listing (newest first, `NULL`s last).
pub last_message_at: Option<chrono::DateTime<Utc>>,
/// Archive timestamp, if the conversation is archived.
pub archived_at: Option<chrono::DateTime<Utc>>,
/// Creation timestamp.
pub created_at: chrono::DateTime<Utc>,
/// Timestamp of the last update.
pub updated_at: chrono::DateTime<Utc>,
}
impl From<ConversationWithSessionRow> for ConversationWithSessionResponse {
fn from(r: ConversationWithSessionRow) -> Self {
Self {
id: r.id,
session_id: r.session,
session_name: r.session_name,
title: r.title,
created_by: r.created_by,
last_message_at: r.last_message_at,
archived_at: r.archived_at,
created_at: r.created_at,
updated_at: r.updated_at,
}
}
}
impl AppService {
    /// Loads a conversation and verifies that `user_id` may access it.
    ///
    /// The conversation must exist and not be soft-deleted. Authorization is
    /// then decided by its session through `agent_require_session_access`, so
    /// both entry points share a single implementation of the rule.
    ///
    /// # Errors
    /// - `AppError::NotFound` if the conversation, or its enabled and
    ///   non-deleted session, does not exist.
    /// - `AppError::PermissionDenied` if the user is not the session's user
    ///   and the session has no workspace.
    /// - Any error returned by `workspace_require_member`.
    /// - `AppError::DatabaseError` if a query fails.
    pub async fn agent_require_conversation_access(
        &self,
        user_id: Uuid,
        conversation_id: Uuid,
    ) -> Result<AgentConversationModel, AppError> {
        let conv = sqlx::query_as::<_, AgentConversationModel>(
            "SELECT id, session, title, created_by, last_message_at, \
             archived_at, compacted_summary, created_at, updated_at, deleted_at \
             FROM agent_conversation \
             WHERE id = $1 AND deleted_at IS NULL",
        )
        .bind(conversation_id)
        .fetch_optional(self.db.reader())
        .await
        .map_err(|e| AppError::DatabaseError(e.to_string()))?
        .ok_or_else(|| AppError::NotFound("conversation not found".to_string()))?;

        self.agent_require_session_access(user_id, conv.session)
            .await?;
        Ok(conv)
    }

    /// Verifies that `user_id` may use the session `session_id`.
    ///
    /// Access is granted when the session's `"user"` column equals `user_id`.
    /// Otherwise, if the session has a workspace (`wk`), the decision is
    /// delegated to `workspace_require_member`.
    ///
    /// # Errors
    /// - `AppError::NotFound` if no enabled, non-deleted session has this id.
    /// - `AppError::PermissionDenied` if the user is not the session's user
    ///   and the session has no workspace.
    /// - Any error returned by `workspace_require_member`.
    /// - `AppError::DatabaseError` if the query fails.
    async fn agent_require_session_access(
        &self,
        user_id: Uuid,
        session_id: Uuid,
    ) -> Result<(), AppError> {
        let (session_user, session_wk): (Option<Uuid>, Option<Uuid>) = sqlx::query_as(
            "SELECT \"user\", wk \
             FROM agent_session \
             WHERE id = $1 AND deleted_at IS NULL AND enabled = true",
        )
        .bind(session_id)
        .fetch_optional(self.db.reader())
        .await
        .map_err(|e| AppError::DatabaseError(e.to_string()))?
        .ok_or_else(|| AppError::NotFound("agent session not found".to_string()))?;

        if session_user == Some(user_id) {
            return Ok(());
        }
        match session_wk {
            Some(wk) => {
                // Only the success/failure of the membership check matters here.
                let _ = crate::AppService::workspace_require_member(self, wk, user_id).await?;
                Ok(())
            }
            None => Err(AppError::PermissionDenied),
        }
    }
}
impl AppService {
pub async fn agent_conversation_create(
&self,
user_id: Uuid,
session_id: Uuid,
params: CreateConversation,
) -> Result<ConversationResponse, AppError> {
self.agent_require_session_access(user_id, session_id)
.await?;
let id = Uuid::now_v7();
let now = Utc::now();
let row = sqlx::query_as::<_, AgentConversationModel>(
"INSERT INTO agent_conversation \
(id, session, title, created_by, created_at, updated_at) \
VALUES ($1, $2, $3, $4, $5, $5) \
RETURNING id, session, title, created_by, last_message_at, \
archived_at, compacted_summary, created_at, updated_at, deleted_at",
)
.bind(id)
.bind(session_id)
.bind(&params.title)
.bind(user_id)
.bind(now)
.fetch_one(self.db.writer())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
Ok(row.into())
}
pub async fn agent_conversation_list(
&self,
user_id: Uuid,
session_id: Uuid,
) -> Result<Vec<ConversationResponse>, AppError> {
self.agent_require_session_access(user_id, session_id)
.await?;
let rows = sqlx::query_as::<_, AgentConversationModel>(
"SELECT id, session, title, created_by, last_message_at, \
archived_at, compacted_summary, created_at, updated_at, deleted_at \
FROM agent_conversation \
WHERE session = $1 AND deleted_at IS NULL \
ORDER BY last_message_at DESC NULLS LAST, created_at DESC \
LIMIT 100",
)
.bind(session_id)
.fetch_all(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
Ok(rows.into_iter().map(Into::into).collect())
}
pub async fn agent_conversation_list_all(
&self,
user_id: Uuid,
wk: Option<&str>,
) -> Result<Vec<ConversationWithSessionResponse>, AppError> {
let rows: Vec<ConversationWithSessionRow> = if let Some(wk_name) = wk {
sqlx::query_as(
"SELECT c.id, c.session, c.title, c.created_by, c.last_message_at, \
c.archived_at, c.created_at, c.updated_at, \
s.name as session_name, s.wk as session_wk \
FROM agent_conversation c \
INNER JOIN agent_session s ON s.id = c.session \
WHERE c.deleted_at IS NULL AND s.deleted_at IS NULL AND s.enabled = true \
AND (s.\"user\" = $1 OR (s.wk = (SELECT id FROM workspace WHERE name = $2) AND s.wk IN (SELECT wk FROM wk_member WHERE \"user\" = $1))) \
ORDER BY c.last_message_at DESC NULLS LAST, c.created_at DESC \
LIMIT 100",
)
.bind(user_id)
.bind(wk_name)
.fetch_all(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?
} else {
sqlx::query_as(
"SELECT c.id, c.session, c.title, c.created_by, c.last_message_at, \
c.archived_at, c.created_at, c.updated_at, \
s.name as session_name, s.wk as session_wk \
FROM agent_conversation c \
INNER JOIN agent_session s ON s.id = c.session \
WHERE c.deleted_at IS NULL AND s.deleted_at IS NULL AND s.enabled = true \
AND s.\"user\" = $1 \
ORDER BY c.last_message_at DESC NULLS LAST, c.created_at DESC \
LIMIT 100",
)
.bind(user_id)
.fetch_all(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?
};
Ok(rows.into_iter().map(Into::into).collect())
}
pub async fn agent_conversation_get(
&self,
user_id: Uuid,
conversation_id: Uuid,
) -> Result<ConversationResponse, AppError> {
Ok(self
.agent_require_conversation_access(user_id, conversation_id)
.await?
.into())
}
/// Updates a conversation's title and bumps `updated_at`.
///
/// When `params.title` is `None` the existing title is written back, so the
/// call still refreshes `updated_at`.
///
/// # Errors
/// - Propagates every error of `agent_require_conversation_access`.
/// - `AppError::NotFound` if the conversation is soft-deleted between the
///   access check and the update, matching archive/unarchive.
/// - `AppError::DatabaseError` if the statement fails.
pub async fn agent_conversation_update(
    &self,
    user_id: Uuid,
    conversation_id: Uuid,
    params: UpdateConversation,
) -> Result<ConversationResponse, AppError> {
    let existing = self
        .agent_require_conversation_access(user_id, conversation_id)
        .await?;
    let title = params.title.unwrap_or(existing.title);
    let now = Utc::now();
    let row = sqlx::query_as::<_, AgentConversationModel>(
        "UPDATE agent_conversation SET title = $1, updated_at = $2 \
         WHERE id = $3 AND deleted_at IS NULL \
         RETURNING id, session, title, created_by, last_message_at, \
         archived_at, compacted_summary, created_at, updated_at, deleted_at",
    )
    .bind(&title)
    .bind(now)
    .bind(conversation_id)
    // `fetch_optional` rather than `fetch_one`: a row that vanished since the
    // access check must surface as NotFound, not as a database error.
    .fetch_optional(self.db.writer())
    .await
    .map_err(|e| AppError::DatabaseError(e.to_string()))?
    .ok_or_else(|| AppError::NotFound("conversation not found".to_string()))?;
    Ok(row.into())
}
/// Soft-deletes a conversation by stamping `deleted_at` (and `updated_at`)
/// with the current time.
///
/// # Errors
/// - Propagates every error of `agent_require_conversation_access`.
/// - `AppError::NotFound` if the update touched no row.
/// - `AppError::DatabaseError` if the statement fails.
pub async fn agent_conversation_delete(
    &self,
    user_id: Uuid,
    conversation_id: Uuid,
) -> Result<(), AppError> {
    const SOFT_DELETE_SQL: &str =
        "UPDATE agent_conversation SET deleted_at = $1, updated_at = $1 \
        WHERE id = $2 AND deleted_at IS NULL";

    self.agent_require_conversation_access(user_id, conversation_id)
        .await?;

    let deleted_at = Utc::now();
    let outcome = sqlx::query(SOFT_DELETE_SQL)
        .bind(deleted_at)
        .bind(conversation_id)
        .execute(self.db.writer())
        .await
        .map_err(|err| AppError::DatabaseError(err.to_string()))?;

    match outcome.rows_affected() {
        0 => Err(AppError::NotFound("conversation not found".to_string())),
        _ => Ok(()),
    }
}
pub async fn agent_conversation_archive(
&self,
user_id: Uuid,
conversation_id: Uuid,
) -> Result<ConversationResponse, AppError> {
self.agent_require_conversation_access(user_id, conversation_id)
.await?;
let now = Utc::now();
let row = sqlx::query_as::<_, AgentConversationModel>(
"UPDATE agent_conversation SET archived_at = $1, updated_at = $1 \
WHERE id = $2 AND deleted_at IS NULL \
RETURNING id, session, title, created_by, last_message_at, \
archived_at, compacted_summary, created_at, updated_at, deleted_at",
)
.bind(now)
.bind(conversation_id)
.fetch_optional(self.db.writer())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?
.ok_or_else(|| AppError::NotFound("conversation not found".to_string()))?;
Ok(row.into())
}
pub async fn agent_conversation_unarchive(
&self,
user_id: Uuid,
conversation_id: Uuid,
) -> Result<ConversationResponse, AppError> {
self.agent_require_conversation_access(user_id, conversation_id)
.await?;
let now = Utc::now();
let row = sqlx::query_as::<_, AgentConversationModel>(
"UPDATE agent_conversation SET archived_at = NULL, updated_at = $1 \
WHERE id = $2 AND deleted_at IS NULL \
RETURNING id, session, title, created_by, last_message_at, \
archived_at, compacted_summary, created_at, updated_at, deleted_at",
)
.bind(now)
.bind(conversation_id)
.fetch_optional(self.db.writer())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?
.ok_or_else(|| AppError::NotFound("conversation not found".to_string()))?;
Ok(row.into())
}
}
impl AppService {
/// Returns one page of a conversation's messages in chronological order,
/// each with its logged tool calls attached.
///
/// * `limit` — page size; defaults to 50 and is capped at 100.
/// * `before` — optional cursor: only messages strictly older than this
///   message (which must belong to the same conversation) are returned. An
///   id that is not in the conversation yields an empty page.
///
/// Paging and ordering use the pair `(created_at, id)` rather than
/// `created_at` alone, so messages that share a timestamp are neither skipped
/// at a page boundary nor returned in an unstable order.
///
/// # Errors
/// Propagates every error of `agent_require_conversation_access`, or
/// `AppError::DatabaseError` if the message query fails. A failure while
/// loading tool calls is not an error (see below).
pub async fn agent_message_list(
    &self,
    user_id: Uuid,
    conversation_id: Uuid,
    limit: Option<u32>,
    before: Option<Uuid>,
) -> Result<Vec<MessageResponse>, AppError> {
    self.agent_require_conversation_access(user_id, conversation_id)
        .await?;
    // Lossless widening; `as` would hide a truncation if the types changed.
    let limit = i64::from(limit.unwrap_or(50).min(100));

    // Newest-first so that LIMIT keeps the most recent page; reversed below.
    let rows = if let Some(before_id) = before {
        sqlx::query_as::<_, model::agent::AgentMessageModel>(
            "SELECT id, conversation, parent, role, author, content, content_type, \
             status, model_invocation, reasoning_content, created_at, updated_at, deleted_at \
             FROM agent_message \
             WHERE conversation = $1 AND deleted_at IS NULL \
             AND (created_at, id) < (SELECT created_at, id FROM agent_message WHERE id = $2 AND conversation = $1) \
             ORDER BY created_at DESC, id DESC LIMIT $3",
        )
        .bind(conversation_id)
        .bind(before_id)
        .bind(limit)
        .fetch_all(self.db.reader())
        .await
        .map_err(|e| AppError::DatabaseError(e.to_string()))?
    } else {
        sqlx::query_as::<_, model::agent::AgentMessageModel>(
            "SELECT id, conversation, parent, role, author, content, content_type, \
             status, model_invocation, reasoning_content, created_at, updated_at, deleted_at \
             FROM agent_message \
             WHERE conversation = $1 AND deleted_at IS NULL \
             ORDER BY created_at DESC, id DESC LIMIT $2",
        )
        .bind(conversation_id)
        .bind(limit)
        .fetch_all(self.db.reader())
        .await
        .map_err(|e| AppError::DatabaseError(e.to_string()))?
    };

    // Fetch the tool calls of every message on the page in one query.
    let message_ids: Vec<Uuid> = rows.iter().map(|r| r.id).collect();
    let tool_call_logs = if message_ids.is_empty() {
        Vec::new()
    } else {
        sqlx::query_as::<_, model::agent::AgentToolCallLogModel>(
            "SELECT id, invocation, session, conversation, message, tool_call_id, \
             tool_name, arguments, result, error, status, \
             started_at, finished_at, latency_ms \
             FROM agent_tool_call_log \
             WHERE message = ANY($1) \
             ORDER BY started_at ASC",
        )
        .bind(&message_ids)
        .fetch_all(self.db.reader())
        .await
        // Best-effort: if the log cannot be read, the messages are still
        // returned, just without tool calls.
        .unwrap_or_default()
    };

    // Group tool calls by the message they belong to, preserving query order.
    let mut tool_calls_by_message: std::collections::HashMap<Uuid, Vec<ToolCallResponse>> =
        std::collections::HashMap::new();
    for log in tool_call_logs {
        if let Some(msg_id) = log.message {
            tool_calls_by_message
                .entry(msg_id)
                .or_default()
                .push(log.into());
        }
    }

    // `rev()` turns the newest-first page into chronological order.
    let messages: Vec<MessageResponse> = rows
        .into_iter()
        .rev()
        .map(|row| {
            let mut msg: MessageResponse = row.into();
            msg.tool_calls = tool_calls_by_message.remove(&msg.id).unwrap_or_default();
            msg
        })
        .collect();
    Ok(messages)
}
pub async fn agent_conversation_fork(
&self,
user_id: Uuid,
source_conversation_id: Uuid,
up_to_message_id: Option<Uuid>,
title_override: Option<&str>,
) -> Result<ConversationResponse, AppError> {
let source = self
.agent_require_conversation_access(user_id, source_conversation_id)
.await?;
let session_id = source.session;
let base_title = title_override
.map(|t| t.to_string())
.unwrap_or_else(|| format!("{} (fork)", source.title));
let new_id = Uuid::now_v7();
let now = Utc::now();
sqlx::query(
"INSERT INTO agent_conversation \
(id, session, title, created_by, created_at, updated_at) \
VALUES ($1, $2, $3, $4, $5, $5)",
)
.bind(new_id)
.bind(session_id)
.bind(&base_title)
.bind(user_id)
.bind(now)
.execute(self.db.writer())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
let messages = if let Some(msg_id) = up_to_message_id {
sqlx::query_as::<_, model::agent::AgentMessageModel>(
"SELECT id, conversation, parent, role, author, content, content_type, \
status, model_invocation, reasoning_content, \
created_at, updated_at, deleted_at \
FROM agent_message \
WHERE conversation = $1 \
AND deleted_at IS NULL \
AND created_at <= (SELECT created_at FROM agent_message WHERE id = $2) \
ORDER BY created_at ASC",
)
.bind(source_conversation_id)
.bind(msg_id)
.fetch_all(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?
} else {
sqlx::query_as::<_, model::agent::AgentMessageModel>(
"SELECT id, conversation, parent, role, author, content, content_type, \
status, model_invocation, reasoning_content, \
created_at, updated_at, deleted_at \
FROM agent_message \
WHERE conversation = $1 AND deleted_at IS NULL \
ORDER BY created_at ASC",
)
.bind(source_conversation_id)
.fetch_all(self.db.reader())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?
};
for msg in &messages {
let new_msg_id = Uuid::now_v7();
sqlx::query(
"INSERT INTO agent_message \
(id, conversation, parent, role, author, content, content_type, \
status, model_invocation, reasoning_content, created_at, updated_at) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $11)",
)
.bind(new_msg_id)
.bind(new_id)
.bind::<Option<Uuid>>(None)
.bind(&msg.role)
.bind(msg.author)
.bind(&msg.content)
.bind(&msg.content_type)
.bind(&msg.status)
.bind(msg.model_invocation)
.bind(&msg.reasoning_content)
.bind(msg.created_at)
.execute(self.db.writer())
.await
.map_err(|e| AppError::DatabaseError(e.to_string()))?;
let _ = sqlx::query(
"INSERT INTO agent_message_fork \
(source_message, forked_conversation, forked_by, created_at) \
VALUES ($1, $2, $3, $4)",
)
.bind(msg.id)
.bind(new_id)
.bind(user_id)
.bind(now)
.execute(self.db.writer())
.await;
}
self.agent_conversation_get(user_id, new_id).await
}
}
impl From<AgentConversationModel> for ConversationResponse {
fn from(m: AgentConversationModel) -> Self {
Self {
id: m.id,
session_id: m.session,
title: m.title,
created_by: m.created_by,
last_message_at: m.last_message_at,
archived_at: m.archived_at,
created_at: m.created_at,
updated_at: m.updated_at,
}
}
}
impl From<model::agent::AgentMessageModel> for MessageResponse {
    /// Projects a message model onto the response type. `conversation` and
    /// `parent` are renamed to `conversation_id` and `parent_id`, and
    /// `tool_calls` starts out empty.
    fn from(model: model::agent::AgentMessageModel) -> Self {
        let model::agent::AgentMessageModel {
            id,
            conversation,
            parent,
            role,
            author,
            content,
            content_type,
            status,
            model_invocation,
            reasoning_content,
            created_at,
            updated_at,
            ..
        } = model;

        Self {
            id,
            conversation_id: conversation,
            parent_id: parent,
            role,
            author,
            content,
            content_type,
            status,
            model_invocation,
            reasoning_content,
            tool_calls: Vec::new(),
            created_at,
            updated_at,
        }
    }
}