gitdataai/lib/channel/ack.rs
2026-05-30 01:38:40 +08:00

163 lines
4.2 KiB
Rust

use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::ChannelResult;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAck {
pub message_id: Uuid,
pub room_id: Uuid,
pub seq: i64,
pub status: AckStatus,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AckStatus {
Pending,
Received,
Persisted,
Delivered,
Failed,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AckRequest {
pub message_id: Uuid,
pub room_id: Uuid,
pub client_timestamp: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AckResponse {
pub message_id: Uuid,
pub status: AckStatus,
pub seq: Option<i64>,
pub server_timestamp: chrono::DateTime<chrono::Utc>,
pub error: Option<String>,
}
#[derive(Clone)]
pub struct AckTracker {
cache: cache::AppCache,
timeout: std::time::Duration,
}
impl AckTracker {
pub fn new(cache: cache::AppCache) -> Self {
Self::with_config(cache, std::time::Duration::from_secs(30))
}
pub fn with_config(
cache: cache::AppCache,
timeout: std::time::Duration,
) -> Self {
Self { cache, timeout }
}
async fn set_with_ttl<T>(&self, key: &str, value: &T) -> ChannelResult<()>
where
T: serde::Serialize,
{
self.cache.set(key, value).await?;
if let Some(cluster) = &self.cache.cluster {
if let Err(e) = cluster.expire(key, self.timeout).await {
tracing::warn!(error = %e, "ack TTL override failed, using default cache TTL");
}
}
Ok(())
}
pub async fn track_pending(
&self,
message_id: Uuid,
room_id: Uuid,
user_id: Uuid,
) -> ChannelResult<()> {
let key = format!("ack:pending:{}:{}", room_id, message_id);
let ack = MessageAck {
message_id,
room_id,
seq: 0,
status: AckStatus::Pending,
timestamp: chrono::Utc::now(),
};
let owner_key = format!("ack:owner:{}:{}", room_id, message_id);
self.set_with_ttl(&key, &ack).await?;
self.set_with_ttl(&owner_key, &user_id.to_string()).await?;
Ok(())
}
pub async fn mark_received(
&self,
message_id: Uuid,
room_id: Uuid,
) -> ChannelResult<()> {
self.update_status(message_id, room_id, AckStatus::Received)
.await
}
pub async fn mark_persisted(
&self,
message_id: Uuid,
room_id: Uuid,
seq: i64,
) -> ChannelResult<()> {
let key = format!("ack:pending:{}:{}", room_id, message_id);
let ack = MessageAck {
message_id,
room_id,
seq,
status: AckStatus::Persisted,
timestamp: chrono::Utc::now(),
};
self.set_with_ttl(&key, &ack).await?;
Ok(())
}
pub async fn mark_delivered(
&self,
message_id: Uuid,
room_id: Uuid,
) -> ChannelResult<()> {
self.update_status(message_id, room_id, AckStatus::Delivered)
.await?;
let key = format!("ack:pending:{}:{}", room_id, message_id);
self.cache.remove(&key).await?;
Ok(())
}
pub async fn mark_failed(
&self,
message_id: Uuid,
room_id: Uuid,
) -> ChannelResult<()> {
self.update_status(message_id, room_id, AckStatus::Failed)
.await
}
pub async fn get_status(
&self,
message_id: Uuid,
room_id: Uuid,
) -> ChannelResult<Option<MessageAck>> {
let key = format!("ack:pending:{}:{}", room_id, message_id);
Ok(self.cache.get::<MessageAck>(&key).await?)
}
async fn update_status(
&self,
message_id: Uuid,
room_id: Uuid,
status: AckStatus,
) -> ChannelResult<()> {
if let Some(mut ack) = self.get_status(message_id, room_id).await? {
ack.status = status;
ack.timestamp = chrono::Utc::now();
let key = format!("ack:pending:{}:{}", room_id, message_id);
self.set_with_ttl(&key, &ack).await?;
}
Ok(())
}
}