use std::sync::Arc; use async_trait::async_trait; use redis::{AsyncCommands, Msg, PushInfo}; use tracing::warn; use crate::{ adapter::{Adapter, BroadcastOptions, MemoryAdapter}, error::Result, packet::Packet, server::SocketIo, }; #[derive(Clone, Debug)] pub struct RedisClusterAdapterConfig { pub channel_prefix: String, pub node_id: String, } impl Default for RedisClusterAdapterConfig { fn default() -> Self { Self { channel_prefix: "socket.io:{adapter}".to_owned(), node_id: uuid::Uuid::new_v4().to_string(), } } } pub struct RedisClusterAdapter { local: Arc, pool: deadpool_redis::cluster::Pool, config: RedisClusterAdapterConfig, } #[derive(Debug, serde::Deserialize, serde::Serialize)] struct RedisMessage { origin: String, packet: RedisPacket, opts: BroadcastOptions, } #[derive(Debug, serde::Deserialize, serde::Serialize)] struct RedisPacket { encoded: String, attachments: Vec>, } impl RedisClusterAdapter { pub fn new( pool: deadpool_redis::cluster::Pool, config: RedisClusterAdapterConfig, ) -> Arc { Arc::new(Self { local: MemoryAdapter::new(), pool, config, }) } pub async fn attach_with_push_receiver( self: Arc, io: SocketIo, mut rx: tokio::sync::mpsc::UnboundedReceiver, ) -> Result<()> { let channel = self.channel(); let mut conn = self.pool.get().await.map_err(|err| { redis::RedisError::from(( redis::ErrorKind::Io, "redis pool failed", err.to_string(), )) })?; let _: () = conn.subscribe(&channel).await?; let node_id = self.config.node_id.clone(); actix_web::rt::spawn(async move { let mut _subscribed_conn = conn; while let Some(push) = rx.recv().await { let Some(message) = Msg::from_push_info(push) else { continue; }; let Ok(payload) = message.get_payload::() else { warn!("failed to decode redis adapter payload"); continue; }; let Ok(message) = serde_json::from_str::(&payload) else { warn!("failed to parse redis adapter message"); continue; }; if message.origin == node_id { continue; } let Ok(mut packet) = Packet::decode(&message.packet.encoded) else { warn!("failed to decode redis adapter socket.io packet"); continue; }; packet.attachments = message.packet.attachments; if let Err(err) = io.deliver_remote_packet(message.opts, packet).await { warn!(error = %err, "failed to deliver redis adapter packet"); } } }); Ok(()) } fn channel(&self) -> String { format!("{}:broadcast", self.config.channel_prefix) } } #[async_trait] impl Adapter for RedisClusterAdapter { async fn add_socket(&self, namespace: &str, sid: &str) -> Result<()> { self.local.add_socket(namespace, sid).await } async fn remove_socket(&self, namespace: &str, sid: &str) -> Result<()> { self.local.remove_socket(namespace, sid).await } async fn add_to_room( &self, namespace: &str, sid: &str, room: &str, ) -> Result<()> { self.local.add_to_room(namespace, sid, room).await } async fn remove_from_room( &self, namespace: &str, sid: &str, room: &str, ) -> Result<()> { self.local.remove_from_room(namespace, sid, room).await } async fn sockets( &self, namespace: &str, opts: &BroadcastOptions, ) -> Result> { self.local.sockets(namespace, opts).await } async fn publish( &self, packet: &Packet, opts: &BroadcastOptions, ) -> Result<()> { let message = RedisMessage { origin: self.config.node_id.clone(), packet: RedisPacket { encoded: packet.encode(), attachments: packet.attachments.clone(), }, opts: opts.clone(), }; let payload = serde_json::to_string(&message)?; let mut conn = self.pool.get().await.map_err(|err| { redis::RedisError::from(( redis::ErrorKind::Io, "redis pool failed", err.to_string(), )) })?; let _: usize = conn.publish(self.channel(), payload).await?; Ok(()) } }