229 lines
7.5 KiB
Rust
229 lines
7.5 KiB
Rust
//! Publishes room messages into Redis Streams + Redis Pub/Sub (replaces NATS).
|
|
|
|
use crate::types::{
|
|
AgentTaskEvent, EmailEnvelope, ProjectRoomEvent, ReactionGroup, RoomMessageEnvelope,
|
|
RoomMessageEvent,
|
|
};
|
|
use anyhow::Context;
|
|
use deadpool_redis::cluster::Connection as RedisConn;
|
|
use std::sync::Arc;
|
|
|
|
/// Redis Pub/Sub client for broadcasting room events to all server instances.
|
|
#[derive(Clone)]
|
|
pub struct RedisPubSub {
|
|
/// Shared connection pool; cloned handles share the same pool.
|
|
pub get_redis:
|
|
Arc<dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<RedisConn>> + Send + Sync>,
|
|
pub log: slog::Logger,
|
|
}
|
|
|
|
impl RedisPubSub {
|
|
/// Publish a serialised event to a Redis channel.
|
|
async fn publish_channel(&self, channel: &str, payload: &[u8]) {
|
|
let redis = match (self.get_redis)().await {
|
|
Ok(Ok(c)) => c,
|
|
Ok(Err(e)) => {
|
|
slog::error!(self.log, "redis pool get failed"; "error" => %e);
|
|
return;
|
|
}
|
|
Err(_) => {
|
|
slog::error!(self.log, "redis pool task panicked");
|
|
return;
|
|
}
|
|
};
|
|
let mut conn: RedisConn = redis;
|
|
if let Err(e) = redis::cmd("PUBLISH")
|
|
.arg(channel)
|
|
.arg(payload)
|
|
.query_async::<()>(&mut conn)
|
|
.await
|
|
{
|
|
slog::error!(self.log, "Redis PUBLISH failed"; "channel" => %channel, "error" => %e);
|
|
}
|
|
}
|
|
|
|
/// Broadcast a RoomMessageEvent to all servers subscribed to this room.
|
|
pub async fn publish_room_message(&self, room_id: uuid::Uuid, event: &RoomMessageEvent) {
|
|
let channel = format!("room:pub:{}", room_id);
|
|
let payload = match serde_json::to_vec(event) {
|
|
Ok(p) => p,
|
|
Err(e) => {
|
|
slog::error!(self.log, "serialise RoomMessageEvent failed"; "error" => %e);
|
|
return;
|
|
}
|
|
};
|
|
self.publish_channel(&channel, &payload).await;
|
|
}
|
|
|
|
/// Broadcast a project-level event to all servers subscribed to this project.
|
|
pub async fn publish_project_room_event(
|
|
&self,
|
|
project_id: uuid::Uuid,
|
|
event: &ProjectRoomEvent,
|
|
) {
|
|
let channel = format!("project:pub:{}", project_id);
|
|
let payload = match serde_json::to_vec(event) {
|
|
Ok(p) => p,
|
|
Err(e) => {
|
|
slog::error!(self.log, "serialise ProjectRoomEvent failed"; "error" => %e);
|
|
return;
|
|
}
|
|
};
|
|
self.publish_channel(&channel, &payload).await;
|
|
}
|
|
|
|
/// Broadcast an agent task event to all servers subscribed to this project.
|
|
pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: &AgentTaskEvent) {
|
|
let channel = format!("task:pub:{}", project_id);
|
|
let payload = match serde_json::to_vec(event) {
|
|
Ok(p) => p,
|
|
Err(e) => {
|
|
slog::error!(self.log, "serialise AgentTaskEvent failed"; "error" => %e);
|
|
return;
|
|
}
|
|
};
|
|
self.publish_channel(&channel, &payload).await;
|
|
}
|
|
}
|
|
|
|
/// Produces room messages into Redis Streams + Redis Pub/Sub.
|
|
#[derive(Clone)]
|
|
pub struct MessageProducer {
|
|
pub get_redis:
|
|
Arc<dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<RedisConn>> + Send + Sync>,
|
|
maxlen: i64,
|
|
/// Redis Pub/Sub client used to fan-out events to all server instances.
|
|
pub pubsub: Option<RedisPubSub>,
|
|
log: slog::Logger,
|
|
}
|
|
|
|
impl MessageProducer {
|
|
pub fn new(
|
|
get_redis: Arc<
|
|
dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<RedisConn>> + Send + Sync,
|
|
>,
|
|
pubsub: Option<RedisPubSub>,
|
|
maxlen: i64,
|
|
log: slog::Logger,
|
|
) -> Self {
|
|
Self {
|
|
get_redis,
|
|
maxlen,
|
|
pubsub,
|
|
log,
|
|
}
|
|
}
|
|
|
|
pub async fn publish(
|
|
&self,
|
|
room_id: uuid::Uuid,
|
|
envelope: RoomMessageEnvelope,
|
|
) -> anyhow::Result<String> {
|
|
let redis_key = format!("room:stream:{room_id}");
|
|
let payload = serde_json::to_string(&envelope)?;
|
|
|
|
let redis = (self.get_redis)().await??;
|
|
let mut conn: RedisConn = redis;
|
|
|
|
let entry_id: String = redis::cmd("XADD")
|
|
.arg(&redis_key)
|
|
.arg("MAXLEN")
|
|
.arg("~")
|
|
.arg(self.maxlen)
|
|
.arg("*")
|
|
.arg("data")
|
|
.arg(&payload)
|
|
.query_async(&mut conn)
|
|
.await
|
|
.context("XADD to Redis Stream")?;
|
|
|
|
slog::info!(self.log, "message queued to stream";
|
|
"room_id" => %room_id, "entry_id" => %entry_id);
|
|
|
|
// Fan-out via Redis Pub/Sub so all server instances can push to their WS clients.
|
|
if let Some(pubsub) = &self.pubsub {
|
|
let event = RoomMessageEvent::from(envelope);
|
|
pubsub.publish_room_message(room_id, &event).await;
|
|
}
|
|
|
|
Ok(entry_id)
|
|
}
|
|
|
|
/// Publish a project-level room event via Pub/Sub (no Redis Stream write).
|
|
pub async fn publish_project_room_event(
|
|
&self,
|
|
project_id: uuid::Uuid,
|
|
event: ProjectRoomEvent,
|
|
) {
|
|
let Some(pubsub) = &self.pubsub else {
|
|
slog::warn!(self.log, "pubsub not configured, skipping project event");
|
|
return;
|
|
};
|
|
pubsub.publish_project_room_event(project_id, &event).await;
|
|
}
|
|
|
|
/// Publish an agent task event via Pub/Sub (no Redis Stream write).
|
|
pub async fn publish_agent_task_event(&self, project_id: uuid::Uuid, event: AgentTaskEvent) {
|
|
let Some(pubsub) = &self.pubsub else {
|
|
slog::warn!(self.log, "pubsub not configured, skipping task event");
|
|
return;
|
|
};
|
|
pubsub.publish_agent_task_event(project_id, &event).await;
|
|
}
|
|
|
|
/// Broadcast a reaction-update event to all WS clients subscribed to this room.
|
|
/// Unlike `publish()`, this does NOT write to the Redis Stream.
|
|
pub async fn publish_reaction_event(
|
|
&self,
|
|
room_id: uuid::Uuid,
|
|
_message_id: uuid::Uuid,
|
|
reactions: Vec<ReactionGroup>,
|
|
) {
|
|
let Some(pubsub) = &self.pubsub else {
|
|
slog::warn!(self.log, "pubsub not configured, skipping reaction event");
|
|
return;
|
|
};
|
|
let event = RoomMessageEvent {
|
|
id: uuid::Uuid::now_v7(),
|
|
room_id,
|
|
sender_type: String::new(),
|
|
sender_id: None,
|
|
thread_id: None,
|
|
in_reply_to: None,
|
|
content: String::new(),
|
|
content_type: String::new(),
|
|
send_at: chrono::Utc::now(),
|
|
seq: 0,
|
|
display_name: None,
|
|
reactions: Some(reactions),
|
|
};
|
|
pubsub.publish_room_message(room_id, &event).await;
|
|
}
|
|
|
|
/// Publish an email message to the Redis Stream for async processing.
|
|
pub async fn publish_email(&self, envelope: EmailEnvelope) -> anyhow::Result<String> {
|
|
let redis_key = "email:stream";
|
|
let payload = serde_json::to_string(&envelope)?;
|
|
|
|
let redis = (self.get_redis)().await??;
|
|
let mut conn: RedisConn = redis;
|
|
|
|
let entry_id: String = redis::cmd("XADD")
|
|
.arg(redis_key)
|
|
.arg("MAXLEN")
|
|
.arg("~")
|
|
.arg(self.maxlen)
|
|
.arg("*")
|
|
.arg("data")
|
|
.arg(&payload)
|
|
.query_async(&mut conn)
|
|
.await
|
|
.context("XADD email to Redis Stream")?;
|
|
|
|
slog::info!(self.log, "email queued to stream";
|
|
"to" => %envelope.to, "entry_id" => %entry_id);
|
|
|
|
Ok(entry_id)
|
|
}
|
|
}
|