use config::AppConfig; use deadpool_redis::cluster::{Connection, Manager, Pool}; use uuid::Uuid; const CHAT_STREAM_KEY_PREFIX: &str = "chat:stream:"; const CHAT_STREAM_TTL_SECS: u64 = 600; // 10 minutes #[derive(Clone)] pub struct AppCache { pool: Pool, redis_url: String, } impl AppCache { pub async fn init(cfg: &AppConfig) -> anyhow::Result { let urls = cfg.redis_urls()?; let pool_size = cfg.redis_pool_size()?; let conn = Manager::new(urls.clone(), true)?; let pool = deadpool_redis::cluster::Pool::builder(conn) .max_size(pool_size as usize) .build()?; let redis_url = urls .first() .cloned() .unwrap_or_else(|| "redis://127.0.0.1:6379".to_string()); Ok(Self { pool, redis_url }) } pub async fn conn(&self) -> anyhow::Result { Ok(self.pool.get().await?) } pub fn redis_pool(&self) -> &Pool { &self.pool } pub fn redis_url(&self) -> &str { &self.redis_url } /// Set chat stream active state (conversation_id → {message_id, started_at}). /// TTL 10 minutes — prevents stale entries. pub async fn set_chat_stream_active(&self, conversation_id: Uuid, message_id: Uuid) -> bool { if let Ok(mut conn) = self.conn().await { let key = format!("{}{}", CHAT_STREAM_KEY_PREFIX, conversation_id); let value = serde_json::json!({ "message_id": message_id.to_string(), "started_at": chrono::Utc::now().timestamp(), }) .to_string(); let _: Result<(), _> = redis::cmd("SETEX") .arg(&key) .arg(CHAT_STREAM_TTL_SECS as i64) .arg(&value) .query_async(&mut conn) .await; return true; } false } /// Get active chat stream state for a conversation. Returns (message_id, started_at_ts). pub async fn get_chat_stream_active( &self, conversation_id: Uuid, ) -> Option<(Uuid, i64)> { if let Ok(mut conn) = self.conn().await { let key = format!("{}{}", CHAT_STREAM_KEY_PREFIX, conversation_id); if let Ok(value) = redis::cmd("GET").arg(&key).query_async::(&mut conn).await { if let Ok(parsed) = serde_json::from_str::(&value) { let msg_id = parsed .get("message_id") .and_then(|v| v.as_str()) .and_then(|s| Uuid::parse_str(s).ok()); let started_at = parsed .get("started_at") .and_then(|v| v.as_i64()); if let (Some(mid), Some(ts)) = (msg_id, started_at) { return Some((mid, ts)); } } } } None } /// Clear chat stream active state (called when streaming finishes). pub async fn clear_chat_stream_active(&self, conversation_id: Uuid) { if let Ok(mut conn) = self.conn().await { let key = format!("{}{}", CHAT_STREAM_KEY_PREFIX, conversation_id); let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await; } } }