gitdataai/libs/service/chat/message.rs

494 lines
17 KiB
Rust

use crate::error::AppError;
use models::ai::{AiMessage, ai_conversation, ai_message};
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set,
};
use uuid::Uuid;
use crate::AppService;
impl AppService {
/// Loads a single message row by primary key using the read connection.
///
/// # Errors
/// Returns `AppError::NotFound("message")` when no row has `message_id`;
/// database errors are propagated. No conversation or access check is done
/// here — callers are responsible for that.
pub(crate) async fn find_message(
    &self,
    message_id: Uuid,
) -> Result<ai_message::Model, AppError> {
    let found = AiMessage::find_by_id(message_id)
        .one(self.db.reader())
        .await?;
    match found {
        Some(message) => Ok(message),
        None => Err(AppError::NotFound("message".into())),
    }
}
/// Lists the latest-version messages of a conversation in ascending
/// `created_at` order, returning at most `limit` rows.
///
/// # Errors
/// Propagates whatever `find_conversation_owned` returns for the
/// (conversation, user) pair, plus any database error.
pub async fn list_messages(
    &self,
    conversation_id: Uuid,
    user_id: Uuid,
    limit: u64,
) -> Result<Vec<ai_message::Model>, AppError> {
    self.find_conversation_owned(conversation_id, user_id).await?;

    // Superseded versions carry `is_latest = false` and are filtered out, so
    // each version group contributes only its current row.
    let latest_only = AiMessage::find()
        .filter(ai_message::Column::ConversationId.eq(conversation_id))
        .filter(ai_message::Column::IsLatest.eq(true))
        .order_by_asc(ai_message::Column::CreatedAt)
        .limit(limit);

    Ok(latest_only.all(self.db.reader()).await?)
}
/// Inserts a new user message into a conversation and bumps the
/// conversation's message counter.
///
/// The new row starts its own version group (`version_group_id` equals its
/// own id, `version_number` 1, `is_latest` true). When `model` is `None` the
/// conversation's model is stored instead.
///
/// # Errors
/// * Whatever `find_conversation_full_access` returns for the caller.
/// * `AppError::PermissionDenied` if `role` is not `"user"`, or if a
///   non-owner of a project conversation does not have `AccessLevel::Full`.
/// * `AppError::NotFound("parent message")` if the parent belongs to another
///   conversation or is not a latest version.
pub async fn create_message(
    &self,
    conversation_id: Uuid,
    user_id: Uuid,
    parent_message_id: Option<Uuid>,
    role: String,
    content: serde_json::Value,
    model: Option<String>,
    is_fork_origin: bool,
    metadata: Option<serde_json::Value>,
    room_id: Option<Uuid>,
) -> Result<ai_message::Model, AppError> {
    let conversation = self
        .find_conversation_full_access(conversation_id, user_id)
        .await?;

    // Only user-authored messages may be created through this entry point.
    if role != "user" {
        return Err(AppError::PermissionDenied);
    }

    // A parent, when supplied, has to be a latest-version message that lives
    // in this very conversation.
    if let Some(parent_id) = parent_message_id {
        let parent = self.find_message(parent_id).await?;
        let usable_parent = parent.conversation_id == conversation_id && parent.is_latest;
        if !usable_parent {
            return Err(AppError::NotFound("parent message".into()));
        }
    }

    // For project chats, non-owner must also have can_ask permission
    let caller_is_owner = conversation.user_id == user_id;
    if !caller_is_owner && conversation.project_id.is_some() {
        let level =
            super::access::check_conversation_access(&self.db, &conversation, user_id).await?;
        if level != super::AccessLevel::Full {
            return Err(AppError::PermissionDenied);
        }
    }

    let new_id = Uuid::now_v7();
    // Fall back to the conversation's model when the caller gave none.
    let effective_model = Some(model.unwrap_or_else(|| conversation.model.clone()));

    let row = ai_message::ActiveModel {
        id: Set(new_id),
        conversation_id: Set(conversation_id),
        parent_message_id: Set(parent_message_id),
        role: Set(role),
        content: Set(content),
        model: Set(effective_model),
        is_fork_origin: Set(is_fork_origin),
        stop_reason: Set(None),
        input_tokens: Set(None),
        output_tokens: Set(None),
        latency_ms: Set(None),
        metadata: Set(metadata),
        room_id: Set(room_id),
        version_group_id: Set(Some(new_id)),
        version_number: Set(1),
        is_latest: Set(true),
        created_at: Set(chrono::Utc::now()),
    };
    let inserted = row.insert(self.db.writer()).await?;

    // Counter bump is best effort: a failure here is deliberately ignored so
    // the already-inserted message is still returned.
    let previous_count = conversation.message_count;
    let mut conversation_patch: ai_conversation::ActiveModel = conversation.into();
    conversation_patch.message_count = Set(previous_count + 1);
    conversation_patch.updated_at = Set(chrono::Utc::now());
    let _ = conversation_patch.update(self.db.writer()).await;

    observability::incr!(observability::AI_CHAT_MESSAGES_SENT);
    Ok(inserted)
}
/// Fetches one message, provided it belongs to `conversation_id`.
///
/// # Errors
/// Propagates the result of `find_conversation_owned`, and returns
/// `AppError::NotFound("message")` when the message does not exist or lives
/// in a different conversation.
pub async fn get_message(
    &self,
    conversation_id: Uuid,
    user_id: Uuid,
    message_id: Uuid,
) -> Result<ai_message::Model, AppError> {
    self.find_conversation_owned(conversation_id, user_id).await?;

    let message = self.find_message(message_id).await?;
    // A message from another conversation is reported as missing rather than
    // as a permission problem.
    if message.conversation_id == conversation_id {
        Ok(message)
    } else {
        Err(AppError::NotFound("message".into()))
    }
}
/// Marks a message as stopped and signals cancellation for the
/// conversation's active stream.
///
/// The message's `stop_reason` is set to `"stop"` first; only after that
/// write succeeds is the cancellation flag set through the cache.
///
/// # Errors
/// * Whatever `find_conversation_full_access` returns for the caller.
/// * `AppError::PermissionDenied` if a non-owner of a project conversation
///   does not have `AccessLevel::Full`.
/// * `AppError::NotFound("message")` if the message is missing or belongs to
///   another conversation.
pub async fn stop_message(
    &self,
    conversation_id: Uuid,
    user_id: Uuid,
    message_id: Uuid,
) -> Result<(), AppError> {
    let conversation = self
        .find_conversation_full_access(conversation_id, user_id)
        .await?;

    // For project chats, non-owner must also have can_ask permission
    let caller_is_owner = conversation.user_id == user_id;
    if !caller_is_owner && conversation.project_id.is_some() {
        let level =
            super::access::check_conversation_access(&self.db, &conversation, user_id).await?;
        if level != super::AccessLevel::Full {
            return Err(AppError::PermissionDenied);
        }
    }

    let target = self.find_message(message_id).await?;
    if target.conversation_id != conversation_id {
        return Err(AppError::NotFound("message".into()));
    }

    let mut stopped: ai_message::ActiveModel = target.into();
    stopped.stop_reason = Set(Some(String::from("stop")));
    stopped.update(self.db.writer()).await?;

    // Signal cancellation to the active stream
    self.cache.set_chat_stream_cancelled(conversation_id).await;
    Ok(())
}
/// Edit a user message: creates a new version in the same version group,
/// marks the old version as non-latest, and inserts the new version as latest.
/// Also marks the latest child messages of the old version (e.g. the old
/// assistant response) as non-latest.
///
/// All reads and writes that make up the version change run on the same
/// transaction, so the children and the highest version number are seen
/// consistently with the writes made here.
///
/// # Errors
/// * Whatever `find_conversation_full_access` returns for the caller.
/// * `AppError::PermissionDenied` if a non-owner of a project conversation
///   does not have `AccessLevel::Full`, or if the message's role is not
///   `"user"`.
/// * `AppError::NotFound("message")` if the message is missing or belongs to
///   another conversation.
/// * Any database error, including a failure to update the conversation
///   counters; in that case the transaction is not committed.
pub async fn edit_message(
    &self,
    conversation_id: Uuid,
    user_id: Uuid,
    message_id: Uuid,
    new_content: String,
) -> Result<ai_message::Model, AppError> {
    let c = self
        .find_conversation_full_access(conversation_id, user_id)
        .await?;
    // For project chats, non-owner must also have can_ask permission
    if c.user_id != user_id && c.project_id.is_some() {
        let access = super::access::check_conversation_access(&self.db, &c, user_id).await?;
        if access != super::AccessLevel::Full {
            return Err(AppError::PermissionDenied);
        }
    }

    let original = self.find_message(message_id).await?;
    if original.conversation_id != conversation_id {
        return Err(AppError::NotFound("message".into()));
    }
    if original.role != "user" {
        return Err(AppError::PermissionDenied);
    }

    // Rows without an explicit group are treated as the root of their own group.
    let version_group_id = original.version_group_id.unwrap_or(original.id);

    // Begin transaction for atomic version update
    let txn = self.db.begin().await?;

    // Mark the old version as non-latest
    let mut old_active: ai_message::ActiveModel = original.clone().into();
    old_active.is_latest = Set(false);
    old_active.update(&txn).await?;

    // Also mark any latest child of this user message as non-latest. The
    // lookup goes through the transaction so it cannot observe a different
    // snapshot than the writes around it.
    let children = AiMessage::find()
        .filter(ai_message::Column::ConversationId.eq(conversation_id))
        .filter(ai_message::Column::ParentMessageId.eq(message_id))
        .filter(ai_message::Column::IsLatest.eq(true))
        .all(&txn)
        .await?;
    for child in children {
        let mut child_active: ai_message::ActiveModel = child.into();
        child_active.is_latest = Set(false);
        child_active.update(&txn).await?;
    }

    // Determine the next version number from the highest one in the group,
    // fetching only that single row. The result is never lower than the
    // edited row's own number, which covers rows that are not stored with
    // this group id.
    let max_version = AiMessage::find()
        .filter(ai_message::Column::VersionGroupId.eq(version_group_id))
        .order_by_desc(ai_message::Column::VersionNumber)
        .one(&txn)
        .await?
        .map_or(1, |m| m.version_number)
        .max(original.version_number);

    let new_msg_id = Uuid::now_v7();
    // Create new version of the user message
    let new_msg = ai_message::ActiveModel {
        id: Set(new_msg_id),
        conversation_id: Set(conversation_id),
        parent_message_id: Set(original.parent_message_id),
        role: Set("user".to_string()),
        content: Set(serde_json::json!(new_content)),
        model: Set(original.model),
        is_fork_origin: Set(false),
        stop_reason: Set(None),
        input_tokens: Set(None),
        output_tokens: Set(None),
        latency_ms: Set(None),
        metadata: Set(original.metadata),
        room_id: Set(original.room_id),
        version_group_id: Set(Some(version_group_id)),
        version_number: Set(max_version + 1),
        is_latest: Set(true),
        created_at: Set(chrono::Utc::now()),
    }
    .insert(&txn)
    .await?;

    // Update conversation message count. The error is propagated instead of
    // being discarded: a failed statement inside a transaction can leave the
    // whole transaction unusable (PostgreSQL aborts it), which would make a
    // later "successful" return report an edit that was never persisted.
    let msg_count = c.message_count;
    let mut updated: ai_conversation::ActiveModel = c.into();
    updated.message_count = Set(msg_count + 1);
    updated.updated_at = Set(chrono::Utc::now());
    updated.update(&txn).await?;

    txn.commit().await?;
    Ok(new_msg)
}
/// Returns every version stored in the version group of `message_id`,
/// highest `version_number` first.
///
/// A message without a `version_group_id` uses its own id as the group id.
///
/// # Errors
/// Propagates the result of `find_conversation_owned`, and returns
/// `AppError::NotFound("message")` when the message is missing or belongs to
/// another conversation.
pub async fn list_message_versions(
    &self,
    conversation_id: Uuid,
    user_id: Uuid,
    message_id: Uuid,
) -> Result<Vec<ai_message::Model>, AppError> {
    self.find_conversation_owned(conversation_id, user_id).await?;

    let anchor = self.find_message(message_id).await?;
    if anchor.conversation_id != conversation_id {
        return Err(AppError::NotFound("message".into()));
    }

    let group_id = match anchor.version_group_id {
        Some(group) => group,
        None => anchor.id,
    };

    let newest_first = AiMessage::find()
        .filter(ai_message::Column::VersionGroupId.eq(group_id))
        .order_by_desc(ai_message::Column::VersionNumber);

    Ok(newest_first.all(self.db.reader()).await?)
}
/// Switch to a specific version of a message: marks the current latest as
/// non-latest, marks the target version as latest, and adjusts child messages.
///
/// Only the conversation owner may switch versions. Every read and write of
/// the switch runs on one transaction, so the child re-promotion step sees
/// the demotions performed earlier in the same call. The target version is
/// resolved before any row is modified.
///
/// # Errors
/// * Whatever `find_conversation_full_access` returns for the caller.
/// * `AppError::PermissionDenied` if the caller is not the conversation owner.
/// * `AppError::NotFound("message")` if the message is missing or belongs to
///   another conversation.
/// * `AppError::NotFound("version")` if the group has no version numbered
///   `target_version_number`.
pub async fn switch_message_version(
    &self,
    conversation_id: Uuid,
    user_id: Uuid,
    message_id: Uuid,
    target_version_number: i32,
) -> Result<ai_message::Model, AppError> {
    let c = self
        .find_conversation_full_access(conversation_id, user_id)
        .await?;
    if c.user_id != user_id {
        return Err(AppError::PermissionDenied);
    }

    let msg = self.find_message(message_id).await?;
    if msg.conversation_id != conversation_id {
        return Err(AppError::NotFound("message".into()));
    }
    let version_group_id = msg.version_group_id.unwrap_or(msg.id);

    // Begin transaction for atomic version switch
    let txn = self.db.begin().await?;

    let all_versions = AiMessage::find()
        .filter(ai_message::Column::VersionGroupId.eq(version_group_id))
        .all(&txn)
        .await?;

    // Resolve the target up front so an unknown version number is rejected
    // before any row has been touched.
    let target = all_versions
        .iter()
        .find(|v| v.version_number == target_version_number)
        .cloned()
        .ok_or_else(|| AppError::NotFound("version".into()))?;

    // Demote every version currently flagged as latest, together with its
    // latest children.
    for current in all_versions.iter().filter(|v| v.is_latest) {
        let mut active: ai_message::ActiveModel = current.clone().into();
        active.is_latest = Set(false);
        active.update(&txn).await?;

        let children = AiMessage::find()
            .filter(ai_message::Column::ConversationId.eq(conversation_id))
            .filter(ai_message::Column::ParentMessageId.eq(current.id))
            .filter(ai_message::Column::IsLatest.eq(true))
            .all(&txn)
            .await?;
        for child in children {
            let mut child_active: ai_message::ActiveModel = child.into();
            child_active.is_latest = Set(false);
            child_active.update(&txn).await?;
        }
    }

    // Mark the target version as latest
    let target_id = target.id;
    let mut target_active: ai_message::ActiveModel = target.into();
    target_active.is_latest = Set(true);
    let updated_target = target_active.update(&txn).await?;

    // Re-promote children of the target version. Children are visited
    // newest version first, so when several children share a version group
    // the most recent one becomes the latest.
    let target_children = AiMessage::find()
        .filter(ai_message::Column::ConversationId.eq(conversation_id))
        .filter(ai_message::Column::ParentMessageId.eq(target_id))
        .order_by_desc(ai_message::Column::VersionNumber)
        .all(&txn)
        .await?;
    for child in target_children {
        let child_group_id = child.version_group_id.unwrap_or(child.id);
        // This check must run on the transaction: it has to observe the
        // demotions above, otherwise switching to the version that is
        // already latest would leave its children hidden.
        let group_has_latest = AiMessage::find()
            .filter(ai_message::Column::VersionGroupId.eq(child_group_id))
            .filter(ai_message::Column::IsLatest.eq(true))
            .one(&txn)
            .await?
            .is_some();
        if !group_has_latest {
            let mut child_active: ai_message::ActiveModel = child.into();
            child_active.is_latest = Set(true);
            child_active.update(&txn).await?;
        }
    }

    txn.commit().await?;
    Ok(updated_target)
}
pub async fn resend_message(
&self,
conversation_id: Uuid,
user_id: Uuid,
message_id: Uuid,
) -> Result<ai_message::Model, AppError> {
let c = self
.find_conversation_full_access(conversation_id, user_id)
.await?;
// For project chats, non-owner must also have can_ask permission
if c.user_id != user_id && c.project_id.is_some() {
let access = super::access::check_conversation_access(&self.db, &c, user_id).await?;
if access != super::AccessLevel::Full {
return Err(AppError::PermissionDenied);
}
}
let original = self.find_message(message_id).await?;
if original.conversation_id != conversation_id {
return Err(AppError::NotFound("message".into()));
}
// resend_message now uses the same versioning mechanism as edit_message
// but keeps the same content — it just creates a new version to trigger
// a new AI response
let version_group_id = original.version_group_id.unwrap_or(original.id);
// Begin transaction for atomic version update
let txn = self.db.begin().await?;
// Mark the old version as non-latest
let mut old_active: ai_message::ActiveModel = original.clone().into();
old_active.is_latest = Set(false);
old_active.update(&txn).await?;
// Mark old assistant response as non-latest
let children = AiMessage::find()
.filter(ai_message::Column::ConversationId.eq(conversation_id))
.filter(ai_message::Column::ParentMessageId.eq(message_id))
.filter(ai_message::Column::IsLatest.eq(true))
.all(self.db.reader())
.await?;
for child in children {
let mut child_active: ai_message::ActiveModel = child.into();
child_active.is_latest = Set(false);
child_active.update(&txn).await?;
}
let max_version = AiMessage::find()
.filter(ai_message::Column::VersionGroupId.eq(version_group_id))
.all(self.db.reader())
.await?
.iter()
.map(|m| m.version_number)
.max()
.unwrap_or(1);
let new_msg_id = Uuid::now_v7();
let new_msg = ai_message::ActiveModel {
id: Set(new_msg_id),
conversation_id: Set(conversation_id),
parent_message_id: Set(original.parent_message_id),
role: Set(original.role),
content: Set(original.content),
model: Set(original.model),
is_fork_origin: Set(false),
stop_reason: Set(None),
input_tokens: Set(None),
output_tokens: Set(None),
latency_ms: Set(None),
metadata: Set(original.metadata),
room_id: Set(original.room_id),
version_group_id: Set(Some(version_group_id)),
version_number: Set(max_version + 1),
is_latest: Set(true),
created_at: Set(chrono::Utc::now()),
}
.insert(&txn)
.await?;
// Update conversation message count
let msg_count = c.message_count;
let mut updated: ai_conversation::ActiveModel = c.into();
updated.message_count = Set(msg_count + 1);
updated.updated_at = Set(chrono::Utc::now());
let _ = updated.update(&txn).await;
txn.commit().await?;
Ok(new_msg)
}
/// Lists the latest-version messages whose parent is `parent_message_id`
/// within the given conversation.
///
/// # Errors
/// Propagates the result of `find_conversation_owned` and any database error.
pub async fn list_child_messages(
    &self,
    conversation_id: Uuid,
    user_id: Uuid,
    parent_message_id: Uuid,
) -> Result<Vec<ai_message::Model>, AppError> {
    self.find_conversation_owned(conversation_id, user_id).await?;

    let latest_children = AiMessage::find()
        .filter(ai_message::Column::ConversationId.eq(conversation_id))
        .filter(ai_message::Column::ParentMessageId.eq(parent_message_id))
        .filter(ai_message::Column::IsLatest.eq(true));

    let rows = latest_children.all(self.db.reader()).await?;
    Ok(rows)
}
}