gitdataai/libs/queue/producer.rs
2026-04-14 19:02:01 +08:00

229 lines
7.5 KiB
Rust

//! Publishes room messages and emails into Redis Streams and broadcasts room, project and
//! agent-task events over Redis Pub/Sub (replaces NATS).
use crate::types::{
AgentTaskEvent, EmailEnvelope, ProjectRoomEvent, ReactionGroup, RoomMessageEnvelope,
RoomMessageEvent,
};
use anyhow::Context;
use deadpool_redis::cluster::Connection as RedisConn;
use std::sync::Arc;
/// Redis Pub/Sub client for broadcasting room events to all server instances.
///
/// `get_redis` sits behind an `Arc`, so every clone of this struct calls the same
/// connection factory.
#[derive(Clone)]
pub struct RedisPubSub {
/// Connection factory: each call returns a `JoinHandle` that resolves to a Redis
/// cluster connection, or to the error raised while obtaining one.
/// NOTE(review): presumably backed by a shared deadpool pool — confirm where this
/// closure is constructed.
pub get_redis:
Arc<dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<RedisConn>> + Send + Sync>,
/// Logger used to report connection, serialisation and `PUBLISH` failures.
pub log: slog::Logger,
}
impl RedisPubSub {
/// Publish a serialised event to a Redis channel.
async fn publish_channel(&self, channel: &str, payload: &[u8]) {
let redis = match (self.get_redis)().await {
Ok(Ok(c)) => c,
Ok(Err(e)) => {
slog::error!(self.log, "redis pool get failed"; "error" => %e);
return;
}
Err(_) => {
slog::error!(self.log, "redis pool task panicked");
return;
}
};
let mut conn: RedisConn = redis;
if let Err(e) = redis::cmd("PUBLISH")
.arg(channel)
.arg(payload)
.query_async::<()>(&mut conn)
.await
{
slog::error!(self.log, "Redis PUBLISH failed"; "channel" => %channel, "error" => %e);
}
}
/// Broadcast a RoomMessageEvent to all servers subscribed to this room.
pub async fn publish_room_message(&self, room_id: uuid::Uuid, event: &RoomMessageEvent) {
let channel = format!("room:pub:{}", room_id);
let payload = match serde_json::to_vec(event) {
Ok(p) => p,
Err(e) => {
slog::error!(self.log, "serialise RoomMessageEvent failed"; "error" => %e);
return;
}
};
self.publish_channel(&channel, &payload).await;
}
/// Broadcast a project-level event to all servers subscribed to this project.
pub async fn publish_project_room_event(
&self,
project_id: uuid::Uuid,
event: &ProjectRoomEvent,
) {
let channel = format!("project:pub:{}", project_id);
let payload = match serde_json::to_vec(event) {
Ok(p) => p,
Err(e) => {
slog::error!(self.log, "serialise ProjectRoomEvent failed"; "error" => %e);
return;
}
};
self.publish_channel(&channel, &payload).await;
}
/// Broadcast an agent task event to all servers subscribed to this project.
pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: &AgentTaskEvent) {
let channel = format!("task:pub:{}", project_id);
let payload = match serde_json::to_vec(event) {
Ok(p) => p,
Err(e) => {
slog::error!(self.log, "serialise AgentTaskEvent failed"; "error" => %e);
return;
}
};
self.publish_channel(&channel, &payload).await;
}
}
/// Produces room messages into Redis Streams + Redis Pub/Sub.
///
/// Also queues email envelopes onto a Redis Stream and relays project and agent-task
/// events through the optional Pub/Sub client.
#[derive(Clone)]
pub struct MessageProducer {
/// Connection factory: each call returns a `JoinHandle` that resolves to a Redis
/// cluster connection, or to the error raised while obtaining one.
pub get_redis:
Arc<dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<RedisConn>> + Send + Sync>,
/// Length cap passed to `XADD ... MAXLEN ~` on every stream write.
maxlen: i64,
/// Redis Pub/Sub client used to fan-out events to all server instances.
/// When `None`, the Pub/Sub-only methods log a warning and do nothing, and `publish`
/// skips its fan-out step.
pub pubsub: Option<RedisPubSub>,
/// Logger for "queued to stream" confirmations and skipped-event warnings.
log: slog::Logger,
}
impl MessageProducer {
pub fn new(
get_redis: Arc<
dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<RedisConn>> + Send + Sync,
>,
pubsub: Option<RedisPubSub>,
maxlen: i64,
log: slog::Logger,
) -> Self {
Self {
get_redis,
maxlen,
pubsub,
log,
}
}
/// Append `envelope` to the room's Redis Stream, then fan it out over Pub/Sub.
///
/// The envelope is serialised to JSON and stored under the `data` field of
/// `room:stream:{room_id}`, with the stream trimmed via `MAXLEN ~ maxlen`. When a Pub/Sub
/// client is configured, the envelope is converted to a `RoomMessageEvent` and broadcast.
///
/// Returns the entry ID that Redis assigned to the new stream entry.
///
/// # Errors
///
/// Fails if serialisation, obtaining a connection, or the `XADD` command fails.
/// The fan-out step does not produce an error here.
pub async fn publish(
    &self,
    room_id: uuid::Uuid,
    envelope: RoomMessageEnvelope,
) -> anyhow::Result<String> {
    let stream_key = format!("room:stream:{room_id}");
    let body = serde_json::to_string(&envelope)?;
    // First `?` unwraps the join result, second the connection result.
    let mut conn: RedisConn = (self.get_redis)().await??;

    let mut xadd = redis::cmd("XADD");
    xadd.arg(&stream_key)
        .arg("MAXLEN")
        .arg("~")
        .arg(self.maxlen)
        .arg("*")
        .arg("data")
        .arg(&body);
    let entry_id: String = xadd
        .query_async(&mut conn)
        .await
        .context("XADD to Redis Stream")?;

    slog::info!(self.log, "message queued to stream";
        "room_id" => %room_id, "entry_id" => %entry_id);

    // Fan-out via Redis Pub/Sub so all server instances can push to their WS clients.
    if let Some(broadcaster) = self.pubsub.as_ref() {
        let event = RoomMessageEvent::from(envelope);
        broadcaster.publish_room_message(room_id, &event).await;
    }
    Ok(entry_id)
}
/// Relay a project-level room event through Pub/Sub only; no Redis Stream entry is written.
///
/// Without a configured Pub/Sub client the event is dropped after logging a warning.
pub async fn publish_project_room_event(
    &self,
    project_id: uuid::Uuid,
    event: ProjectRoomEvent,
) {
    if let Some(broadcaster) = self.pubsub.as_ref() {
        broadcaster
            .publish_project_room_event(project_id, &event)
            .await;
    } else {
        slog::warn!(self.log, "pubsub not configured, skipping project event");
    }
}
/// Relay an agent task event through Pub/Sub only; no Redis Stream entry is written.
///
/// Without a configured Pub/Sub client the event is dropped after logging a warning.
pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: AgentTaskEvent) {
    if let Some(broadcaster) = self.pubsub.as_ref() {
        broadcaster
            .publish_agent_task_event(project_id, &event)
            .await;
    } else {
        slog::warn!(self.log, "pubsub not configured, skipping task event");
    }
}
/// Broadcast a reaction update for a room over Pub/Sub.
/// Unlike `publish()`, this does NOT write to the Redis Stream.
///
/// The update is sent as a `RoomMessageEvent` whose only meaningful payload is
/// `reactions`; the text fields are empty, `seq` is 0 and the optional fields are `None`.
/// Without a configured Pub/Sub client the update is dropped after logging a warning.
///
/// NOTE(review): `_message_id` is unused and the event `id` is a freshly generated v7
/// UUID — confirm that receivers can still tell which message the reactions belong to.
pub async fn publish_reaction_event(
    &self,
    room_id: uuid::Uuid,
    _message_id: uuid::Uuid,
    reactions: Vec<ReactionGroup>,
) {
    match self.pubsub.as_ref() {
        None => {
            slog::warn!(self.log, "pubsub not configured, skipping reaction event");
        }
        Some(broadcaster) => {
            let reaction_update = RoomMessageEvent {
                id: uuid::Uuid::now_v7(),
                room_id,
                sender_type: String::new(),
                sender_id: None,
                thread_id: None,
                in_reply_to: None,
                content: String::new(),
                content_type: String::new(),
                send_at: chrono::Utc::now(),
                seq: 0,
                display_name: None,
                reactions: Some(reactions),
            };
            broadcaster
                .publish_room_message(room_id, &reaction_update)
                .await;
        }
    }
}
/// Queue an email envelope on the `email:stream` Redis Stream for async processing.
///
/// The envelope is serialised to JSON and stored under the `data` field, with the stream
/// trimmed via `MAXLEN ~ maxlen`. Nothing is sent over Pub/Sub.
///
/// Returns the entry ID that Redis assigned to the new stream entry.
///
/// # Errors
///
/// Fails if serialisation, obtaining a connection, or the `XADD` command fails.
pub async fn publish_email(&self, envelope: EmailEnvelope) -> anyhow::Result<String> {
    const EMAIL_STREAM: &str = "email:stream";

    let body = serde_json::to_string(&envelope)?;
    // First `?` unwraps the join result, second the connection result.
    let mut conn: RedisConn = (self.get_redis)().await??;

    let mut xadd = redis::cmd("XADD");
    xadd.arg(EMAIL_STREAM)
        .arg("MAXLEN")
        .arg("~")
        .arg(self.maxlen)
        .arg("*")
        .arg("data")
        .arg(&body);
    let entry_id: String = xadd
        .query_async(&mut conn)
        .await
        .context("XADD email to Redis Stream")?;

    slog::info!(self.log, "email queued to stream";
        "to" => %envelope.to, "entry_id" => %entry_id);
    Ok(entry_id)
}
}