diff --git a/libs/config/lib.rs b/libs/config/lib.rs index db41622..86549c5 100644 --- a/libs/config/lib.rs +++ b/libs/config/lib.rs @@ -33,7 +33,10 @@ impl AppConfig { GLOBAL_CONFIG.get().unwrap().clone() } else { let _ = GLOBAL_CONFIG.set(this); - GLOBAL_CONFIG.get().expect("global config should be set after load").clone() + GLOBAL_CONFIG + .get() + .expect("global config should be set after load") + .clone() } } } diff --git a/libs/config/redis.rs b/libs/config/redis.rs index 6975cc1..a391630 100644 --- a/libs/config/redis.rs +++ b/libs/config/redis.rs @@ -4,9 +4,9 @@ impl AppConfig { /// Returns a single Redis URL (first from APP_REDIS_URLS or APP_REDIS_URL). pub fn redis_url(&self) -> anyhow::Result { let urls = self.redis_urls()?; - urls.into_iter().next().ok_or_else(|| { - anyhow::anyhow!("APP_REDIS_URLS or APP_REDIS_URL is empty") - }) + urls.into_iter() + .next() + .ok_or_else(|| anyhow::anyhow!("APP_REDIS_URLS or APP_REDIS_URL is empty")) } pub fn redis_urls(&self) -> anyhow::Result> { diff --git a/libs/db/cache.rs b/libs/db/cache.rs index 7d69a56..8c12d26 100644 --- a/libs/db/cache.rs +++ b/libs/db/cache.rs @@ -60,23 +60,20 @@ impl AppCache { } /// Get active chat stream state for a conversation. Returns (message_id, started_at_ts). - pub async fn get_chat_stream_active( - &self, - conversation_id: Uuid, - ) -> Option<(Uuid, i64)> { + pub async fn get_chat_stream_active(&self, conversation_id: Uuid) -> Option<(Uuid, i64)> { if let Ok(mut conn) = self.conn().await { let key = format!("{}{}", CHAT_STREAM_KEY_PREFIX, conversation_id); - if let Ok(value) = - redis::cmd("GET").arg(&key).query_async::(&mut conn).await + if let Ok(value) = redis::cmd("GET") + .arg(&key) + .query_async::(&mut conn) + .await { if let Ok(parsed) = serde_json::from_str::(&value) { let msg_id = parsed .get("message_id") .and_then(|v| v.as_str()) .and_then(|s| Uuid::parse_str(s).ok()); - let started_at = parsed - .get("started_at") - .and_then(|v| v.as_i64()); + let started_at = parsed.get("started_at").and_then(|v| v.as_i64()); if let (Some(mid), Some(ts)) = (msg_id, started_at) { return Some((mid, ts)); } @@ -93,4 +90,45 @@ impl AppCache { let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await; } } + + // ─── Chat Stream Cancellation ───────────────────────────────────────── + + /// Mark a chat stream as cancelled by the user. + /// TTL 60 seconds — prevents stale entries. + pub async fn set_chat_stream_cancelled(&self, conversation_id: Uuid) -> bool { + if let Ok(mut conn) = self.conn().await { + let key = format!("{}cancel:{}", CHAT_STREAM_KEY_PREFIX, conversation_id); + let _: Result<(), _> = redis::cmd("SETEX") + .arg(&key) + .arg(60_i64) + .arg("1") + .query_async(&mut conn) + .await; + return true; + } + false + } + + /// Check if a chat stream has been cancelled. + pub async fn is_chat_stream_cancelled(&self, conversation_id: Uuid) -> bool { + if let Ok(mut conn) = self.conn().await { + let key = format!("{}cancel:{}", CHAT_STREAM_KEY_PREFIX, conversation_id); + if let Ok(value) = redis::cmd("GET") + .arg(&key) + .query_async::>(&mut conn) + .await + { + return value.is_some(); + } + } + false + } + + /// Clear the cancel flag for a chat stream. + pub async fn clear_chat_stream_cancelled(&self, conversation_id: Uuid) { + if let Ok(mut conn) = self.conn().await { + let key = format!("{}cancel:{}", CHAT_STREAM_KEY_PREFIX, conversation_id); + let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await; + } + } } diff --git a/libs/email/lib.rs b/libs/email/lib.rs index 6e0c879..58fdaf8 100644 --- a/libs/email/lib.rs +++ b/libs/email/lib.rs @@ -1,7 +1,7 @@ use config::AppConfig; +use lettre::Transport; use lettre::message::Mailbox; use lettre::transport::smtp::{PoolConfig, SmtpTransport}; -use lettre::Transport; use metrics::counter; use regex::Regex; use serde::{Deserialize, Serialize};