gitdataai/libs/transport/ack.rs
ZhenYi 14f6e1e500 feat(core): initialize project with access control and AI integration
- Add gitignore and prettier configuration files for project scaffolding
- Implement room access control service with project member verification
- Create user access key management with CRUD operations and activity logging
- Add accordion UI component for frontend expandable sections
- Implement room AI configuration with list, upsert, and delete operations
- Add AI event types for agent join/leave/status change tracking
- Create streaming AI processing services for mode and react patterns
- Build room AI service with model detection and idempotency handling
- Integrate chat service orchestration for AI message processing
- Add typing indicators and stream cancellation for AI interactions
- Implement mention parsing and context extraction for AI agents
2026-05-03 06:04:31 +08:00

190 lines
5.5 KiB
Rust

use serde::{Deserialize, Serialize};
use std::time::Duration;
use uuid::Uuid;
use redis::AsyncCommands;
/// Acknowledgement record for a single message.
///
/// `AckTracker` serializes this value to JSON and stores it in the cache under
/// the key `ack:pending:{room_id}:{message_id}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAck {
/// Identifier of the message this record refers to.
pub message_id: Uuid,
/// Identifier of the room the message belongs to.
pub room_id: Uuid,
/// Sequence number. Written as `0` by `AckTracker::track_pending` and set to
/// the caller-supplied value by `AckTracker::mark_persisted`.
pub seq: i64,
/// Current acknowledgement state.
pub status: AckStatus,
/// UTC time of the most recent write to this record by `AckTracker`.
pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Acknowledgement state of a message.
///
/// Serialized by serde using lowercase variant names (e.g. `"pending"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AckStatus {
/// Initial state written by `AckTracker::track_pending`.
Pending,
/// Set by `AckTracker::mark_received`.
Received,
/// Set by `AckTracker::mark_persisted`, together with the sequence number.
Persisted,
/// Set by `AckTracker::mark_delivered`, which then deletes the record.
Delivered,
/// Set by `AckTracker::mark_failed`.
Failed,
}
/// Acknowledgement request payload.
///
/// NOTE(review): not referenced elsewhere in this file; presumably received
/// from a client — confirm against the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AckRequest {
/// Identifier of the message being acknowledged.
pub message_id: Uuid,
/// Identifier of the room the message belongs to.
pub room_id: Uuid,
/// UTC timestamp carried with the request; presumably taken from the
/// client's clock — TODO confirm.
pub client_timestamp: chrono::DateTime<chrono::Utc>,
}
/// Acknowledgement response payload.
///
/// NOTE(review): not constructed anywhere in this file; presumably sent back
/// in reply to an `AckRequest` — confirm against the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AckResponse {
/// Identifier of the message this response refers to.
pub message_id: Uuid,
/// Acknowledgement state being reported.
pub status: AckStatus,
/// Sequence number, when one is available.
pub seq: Option<i64>,
/// UTC timestamp carried with the response; presumably taken from the
/// server's clock — TODO confirm.
pub server_timestamp: chrono::DateTime<chrono::Utc>,
/// Error description, when there is one.
pub error: Option<String>,
}
/// Tracks per-message acknowledgement state in the cache.
///
/// Each record is a JSON-encoded `MessageAck` stored under
/// `ack:pending:{room_id}:{message_id}` with an expiry of `timeout`.
#[derive(Clone)]
pub struct AckTracker {
/// Cache handle from which connections are obtained via `conn()`.
cache: db::cache::AppCache,
/// Lifetime applied to every stored record; passed to `set_ex` in whole
/// seconds. `AckTracker::new` sets this to 30 seconds.
timeout: Duration,
}
impl AckTracker {
pub fn new(cache: db::cache::AppCache) -> Self {
Self {
cache,
timeout: Duration::from_secs(30),
}
}
pub async fn track_pending(
&self,
message_id: Uuid,
room_id: Uuid,
_user_id: Uuid,
) -> Result<(), crate::error::AppTransportError> {
let key = format!("ack:pending:{}:{}", room_id, message_id);
let ack = MessageAck {
message_id,
room_id,
seq: 0,
status: AckStatus::Pending,
timestamp: chrono::Utc::now(),
};
let value = serde_json::to_string(&ack)
.map_err(|_| crate::error::AppTransportError::Internal)?;
let mut conn = self.cache.conn().await
.map_err(|_| crate::error::AppTransportError::Internal)?;
let _: () = conn.set_ex(&key, &value, self.timeout.as_secs())
.await
.map_err(|_| crate::error::AppTransportError::Internal)?;
Ok(())
}
pub async fn mark_received(
&self,
message_id: Uuid,
room_id: Uuid,
) -> Result<(), crate::error::AppTransportError> {
self.update_status(message_id, room_id, AckStatus::Received).await
}
pub async fn mark_persisted(
&self,
message_id: Uuid,
room_id: Uuid,
seq: i64,
) -> Result<(), crate::error::AppTransportError> {
let key = format!("ack:pending:{}:{}", room_id, message_id);
let ack = MessageAck {
message_id,
room_id,
seq,
status: AckStatus::Persisted,
timestamp: chrono::Utc::now(),
};
let value = serde_json::to_string(&ack)
.map_err(|_| crate::error::AppTransportError::Internal)?;
let mut conn = self.cache.conn().await
.map_err(|_| crate::error::AppTransportError::Internal)?;
let _: () = conn.set_ex(&key, &value, self.timeout.as_secs())
.await
.map_err(|_| crate::error::AppTransportError::Internal)?;
Ok(())
}
pub async fn mark_delivered(
&self,
message_id: Uuid,
room_id: Uuid,
) -> Result<(), crate::error::AppTransportError> {
self.update_status(message_id, room_id, AckStatus::Delivered).await?;
let key = format!("ack:pending:{}:{}", room_id, message_id);
let mut conn = self.cache.conn().await
.map_err(|_| crate::error::AppTransportError::Internal)?;
let _: Result<(), redis::RedisError> = conn.del(&key).await;
Ok(())
}
pub async fn mark_failed(
&self,
message_id: Uuid,
room_id: Uuid,
) -> Result<(), crate::error::AppTransportError> {
self.update_status(message_id, room_id, AckStatus::Failed).await
}
pub async fn get_status(
&self,
message_id: Uuid,
room_id: Uuid,
) -> Result<Option<MessageAck>, crate::error::AppTransportError> {
let key = format!("ack:pending:{}:{}", room_id, message_id);
let mut conn = self.cache.conn().await
.map_err(|_| crate::error::AppTransportError::Internal)?;
let value: Option<String> = conn.get(&key).await
.map_err(|_| crate::error::AppTransportError::Internal)?;
match value {
Some(v) => {
let ack: MessageAck = serde_json::from_str(&v)
.map_err(|_| crate::error::AppTransportError::Internal)?;
Ok(Some(ack))
}
None => Ok(None),
}
}
async fn update_status(
&self,
message_id: Uuid,
room_id: Uuid,
status: AckStatus,
) -> Result<(), crate::error::AppTransportError> {
let key = format!("ack:pending:{}:{}", room_id, message_id);
if let Some(mut ack) = self.get_status(message_id, room_id).await? {
ack.status = status;
ack.timestamp = chrono::Utc::now();
let value = serde_json::to_string(&ack)
.map_err(|_| crate::error::AppTransportError::Internal)?;
let mut conn = self.cache.conn().await
.map_err(|_| crate::error::AppTransportError::Internal)?;
let _: () = conn.set_ex(&key, &value, self.timeout.as_secs())
.await
.map_err(|_| crate::error::AppTransportError::Internal)?;
}
Ok(())
}
pub async fn cleanup_expired(&self) -> Result<(), crate::error::AppTransportError> {
Ok(())
}
}