163 lines
4.2 KiB
Rust
163 lines
4.2 KiB
Rust
use serde::{Deserialize, Serialize};
|
|
use uuid::Uuid;
|
|
|
|
use crate::ChannelResult;
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct MessageAck {
|
|
pub message_id: Uuid,
|
|
pub room_id: Uuid,
|
|
pub seq: i64,
|
|
pub status: AckStatus,
|
|
pub timestamp: chrono::DateTime<chrono::Utc>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "lowercase")]
|
|
pub enum AckStatus {
|
|
Pending,
|
|
Received,
|
|
Persisted,
|
|
Delivered,
|
|
Failed,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct AckRequest {
|
|
pub message_id: Uuid,
|
|
pub room_id: Uuid,
|
|
pub client_timestamp: chrono::DateTime<chrono::Utc>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct AckResponse {
|
|
pub message_id: Uuid,
|
|
pub status: AckStatus,
|
|
pub seq: Option<i64>,
|
|
pub server_timestamp: chrono::DateTime<chrono::Utc>,
|
|
pub error: Option<String>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct AckTracker {
|
|
cache: cache::AppCache,
|
|
timeout: std::time::Duration,
|
|
}
|
|
|
|
impl AckTracker {
|
|
pub fn new(cache: cache::AppCache) -> Self {
|
|
Self::with_config(cache, std::time::Duration::from_secs(30))
|
|
}
|
|
|
|
pub fn with_config(
|
|
cache: cache::AppCache,
|
|
timeout: std::time::Duration,
|
|
) -> Self {
|
|
Self { cache, timeout }
|
|
}
|
|
async fn set_with_ttl<T>(&self, key: &str, value: &T) -> ChannelResult<()>
|
|
where
|
|
T: serde::Serialize,
|
|
{
|
|
self.cache.set(key, value).await?;
|
|
if let Some(cluster) = &self.cache.cluster {
|
|
if let Err(e) = cluster.expire(key, self.timeout).await {
|
|
tracing::warn!(error = %e, "ack TTL override failed, using default cache TTL");
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn track_pending(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
user_id: Uuid,
|
|
) -> ChannelResult<()> {
|
|
let key = format!("ack:pending:{}:{}", room_id, message_id);
|
|
let ack = MessageAck {
|
|
message_id,
|
|
room_id,
|
|
seq: 0,
|
|
status: AckStatus::Pending,
|
|
timestamp: chrono::Utc::now(),
|
|
};
|
|
let owner_key = format!("ack:owner:{}:{}", room_id, message_id);
|
|
self.set_with_ttl(&key, &ack).await?;
|
|
self.set_with_ttl(&owner_key, &user_id.to_string()).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn mark_received(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
) -> ChannelResult<()> {
|
|
self.update_status(message_id, room_id, AckStatus::Received)
|
|
.await
|
|
}
|
|
|
|
pub async fn mark_persisted(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
seq: i64,
|
|
) -> ChannelResult<()> {
|
|
let key = format!("ack:pending:{}:{}", room_id, message_id);
|
|
let ack = MessageAck {
|
|
message_id,
|
|
room_id,
|
|
seq,
|
|
status: AckStatus::Persisted,
|
|
timestamp: chrono::Utc::now(),
|
|
};
|
|
self.set_with_ttl(&key, &ack).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn mark_delivered(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
) -> ChannelResult<()> {
|
|
self.update_status(message_id, room_id, AckStatus::Delivered)
|
|
.await?;
|
|
let key = format!("ack:pending:{}:{}", room_id, message_id);
|
|
self.cache.remove(&key).await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn mark_failed(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
) -> ChannelResult<()> {
|
|
self.update_status(message_id, room_id, AckStatus::Failed)
|
|
.await
|
|
}
|
|
|
|
pub async fn get_status(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
) -> ChannelResult<Option<MessageAck>> {
|
|
let key = format!("ack:pending:{}:{}", room_id, message_id);
|
|
Ok(self.cache.get::<MessageAck>(&key).await?)
|
|
}
|
|
|
|
async fn update_status(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
status: AckStatus,
|
|
) -> ChannelResult<()> {
|
|
if let Some(mut ack) = self.get_status(message_id, room_id).await? {
|
|
ack.status = status;
|
|
ack.timestamp = chrono::Utc::now();
|
|
let key = format!("ack:pending:{}:{}", room_id, message_id);
|
|
self.set_with_ttl(&key, &ack).await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|