gitdataai/libs/db/cache.rs

97 lines
3.3 KiB
Rust

use config::AppConfig;
use deadpool_redis::cluster::{Connection, Manager, Pool};
use uuid::Uuid;
/// Namespace prepended to a conversation id to form the Redis key that marks
/// an in-progress chat stream.
const CHAT_STREAM_KEY_PREFIX: &str = "chat:stream:";
/// Expiry, in seconds, applied to a chat stream marker when it is written
/// (10 minutes), so a marker that is never cleared eventually disappears.
const CHAT_STREAM_TTL_SECS: u64 = 10 * 60;
/// Application-level cache handle backed by a Redis cluster connection pool.
///
/// Built by `AppCache::init` from the application configuration; also carries
/// the chat-stream "active" marker helpers implemented on this type.
#[derive(Clone)]
pub struct AppCache {
/// Connection pool every cache operation borrows a connection from.
pool: Pool,
/// First configured Redis URL, or `redis://127.0.0.1:6379` when the
/// configured list was empty; exposed via `redis_url()`.
redis_url: String,
}
impl AppCache {
pub async fn init(cfg: &AppConfig) -> anyhow::Result<Self> {
let urls = cfg.redis_urls()?;
let pool_size = cfg.redis_pool_size()?;
let conn = Manager::new(urls.clone(), true)?;
let pool = deadpool_redis::cluster::Pool::builder(conn)
.max_size(pool_size as usize)
.build()?;
let redis_url = urls
.first()
.cloned()
.unwrap_or_else(|| "redis://127.0.0.1:6379".to_string());
Ok(Self { pool, redis_url })
}
pub async fn conn(&self) -> anyhow::Result<Connection> {
Ok(self.pool.get().await?)
}
pub fn redis_pool(&self) -> &Pool {
&self.pool
}
pub fn redis_url(&self) -> &str {
&self.redis_url
}
/// Set chat stream active state (conversation_id → {message_id, started_at}).
/// TTL 10 minutes — prevents stale entries.
pub async fn set_chat_stream_active(&self, conversation_id: Uuid, message_id: Uuid) -> bool {
if let Ok(mut conn) = self.conn().await {
let key = format!("{}{}", CHAT_STREAM_KEY_PREFIX, conversation_id);
let value = serde_json::json!({
"message_id": message_id.to_string(),
"started_at": chrono::Utc::now().timestamp(),
})
.to_string();
let _: Result<(), _> = redis::cmd("SETEX")
.arg(&key)
.arg(CHAT_STREAM_TTL_SECS as i64)
.arg(&value)
.query_async(&mut conn)
.await;
return true;
}
false
}
/// Get active chat stream state for a conversation. Returns (message_id, started_at_ts).
pub async fn get_chat_stream_active(
&self,
conversation_id: Uuid,
) -> Option<(Uuid, i64)> {
if let Ok(mut conn) = self.conn().await {
let key = format!("{}{}", CHAT_STREAM_KEY_PREFIX, conversation_id);
if let Ok(value) =
redis::cmd("GET").arg(&key).query_async::<String>(&mut conn).await
{
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&value) {
let msg_id = parsed
.get("message_id")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let started_at = parsed
.get("started_at")
.and_then(|v| v.as_i64());
if let (Some(mid), Some(ts)) = (msg_id, started_at) {
return Some((mid, ts));
}
}
}
}
None
}
/// Clear chat stream active state (called when streaming finishes).
pub async fn clear_chat_stream_active(&self, conversation_id: Uuid) {
if let Ok(mut conn) = self.conn().await {
let key = format!("{}{}", CHAT_STREAM_KEY_PREFIX, conversation_id);
let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await;
}
}
}