56 lines
1.4 KiB
Rust
56 lines
1.4 KiB
Rust
use std::time::Duration;
|
|
use uuid::Uuid;
|
|
|
|
use crate::{ChannelError, ChannelResult, security::require_cluster};
|
|
|
|
#[derive(Clone)]
|
|
pub struct DeduplicationManager {
|
|
cache: cache::AppCache,
|
|
window: Duration,
|
|
}
|
|
|
|
impl DeduplicationManager {
|
|
pub fn new(cache: cache::AppCache) -> Self {
|
|
Self {
|
|
cache,
|
|
window: Duration::from_secs(300),
|
|
}
|
|
}
|
|
|
|
pub fn with_config(cache: cache::AppCache, window: Duration) -> Self {
|
|
Self { cache, window }
|
|
}
|
|
|
|
pub async fn check_and_mark(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
) -> ChannelResult<bool> {
|
|
let cluster = require_cluster(&self.cache)?;
|
|
let key = format!("dedup:{}:{}", room_id, message_id);
|
|
let mut conn = cluster.conn();
|
|
|
|
let result: Option<String> = redis::cmd("SET")
|
|
.arg(&key)
|
|
.arg("1")
|
|
.arg("NX")
|
|
.arg("EX")
|
|
.arg(self.window.as_secs())
|
|
.query_async(&mut conn)
|
|
.await
|
|
.map_err(|e| ChannelError::Cache(cache::CacheError::Redis(e)))?;
|
|
|
|
Ok(result.is_some())
|
|
}
|
|
|
|
pub async fn is_duplicate(
|
|
&self,
|
|
message_id: Uuid,
|
|
room_id: Uuid,
|
|
) -> ChannelResult<bool> {
|
|
let cluster = require_cluster(&self.cache)?;
|
|
let key = format!("dedup:{}:{}", room_id, message_id);
|
|
Ok(cluster.exists(&key).await?)
|
|
}
|
|
}
|