use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::ChannelResult; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MessageAck { pub message_id: Uuid, pub room_id: Uuid, pub seq: i64, pub status: AckStatus, pub timestamp: chrono::DateTime, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum AckStatus { Pending, Received, Persisted, Delivered, Failed, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AckRequest { pub message_id: Uuid, pub room_id: Uuid, pub client_timestamp: chrono::DateTime, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AckResponse { pub message_id: Uuid, pub status: AckStatus, pub seq: Option, pub server_timestamp: chrono::DateTime, pub error: Option, } #[derive(Clone)] pub struct AckTracker { cache: cache::AppCache, timeout: std::time::Duration, } impl AckTracker { pub fn new(cache: cache::AppCache) -> Self { Self::with_config(cache, std::time::Duration::from_secs(30)) } pub fn with_config( cache: cache::AppCache, timeout: std::time::Duration, ) -> Self { Self { cache, timeout } } async fn set_with_ttl(&self, key: &str, value: &T) -> ChannelResult<()> where T: serde::Serialize, { self.cache.set(key, value).await?; if let Some(cluster) = &self.cache.cluster { if let Err(e) = cluster.expire(key, self.timeout).await { tracing::warn!(error = %e, "ack TTL override failed, using default cache TTL"); } } Ok(()) } pub async fn track_pending( &self, message_id: Uuid, room_id: Uuid, user_id: Uuid, ) -> ChannelResult<()> { let key = format!("ack:pending:{}:{}", room_id, message_id); let ack = MessageAck { message_id, room_id, seq: 0, status: AckStatus::Pending, timestamp: chrono::Utc::now(), }; let owner_key = format!("ack:owner:{}:{}", room_id, message_id); self.set_with_ttl(&key, &ack).await?; self.set_with_ttl(&owner_key, &user_id.to_string()).await?; Ok(()) } pub async fn mark_received( &self, message_id: Uuid, room_id: Uuid, ) -> ChannelResult<()> { self.update_status(message_id, room_id, AckStatus::Received) .await } pub async fn mark_persisted( &self, message_id: Uuid, room_id: Uuid, seq: i64, ) -> ChannelResult<()> { let key = format!("ack:pending:{}:{}", room_id, message_id); let ack = MessageAck { message_id, room_id, seq, status: AckStatus::Persisted, timestamp: chrono::Utc::now(), }; self.set_with_ttl(&key, &ack).await?; Ok(()) } pub async fn mark_delivered( &self, message_id: Uuid, room_id: Uuid, ) -> ChannelResult<()> { self.update_status(message_id, room_id, AckStatus::Delivered) .await?; let key = format!("ack:pending:{}:{}", room_id, message_id); self.cache.remove(&key).await?; Ok(()) } pub async fn mark_failed( &self, message_id: Uuid, room_id: Uuid, ) -> ChannelResult<()> { self.update_status(message_id, room_id, AckStatus::Failed) .await } pub async fn get_status( &self, message_id: Uuid, room_id: Uuid, ) -> ChannelResult> { let key = format!("ack:pending:{}:{}", room_id, message_id); Ok(self.cache.get::(&key).await?) } async fn update_status( &self, message_id: Uuid, room_id: Uuid, status: AckStatus, ) -> ChannelResult<()> { if let Some(mut ack) = self.get_status(message_id, room_id).await? { ack.status = status; ack.timestamp = chrono::Utc::now(); let key = format!("ack:pending:{}:{}", room_id, message_id); self.set_with_ttl(&key, &ack).await?; } Ok(()) } }