gitdataai/lib/socketio/nats.rs
2026-05-30 01:38:40 +08:00

212 lines
6.3 KiB
Rust

use std::{sync::Arc, time::Duration};
use async_nats::jetstream;
use async_trait::async_trait;
use futures_util::StreamExt;
use tracing::warn;
use crate::{
adapter::{Adapter, BroadcastOptions, MemoryAdapter},
error::{Result, SocketIoError},
packet::Packet,
server::SocketIo,
};
#[derive(Clone, Debug)]
pub struct NatsJetStreamAdapterConfig {
pub stream_name: String,
pub subject: String,
pub durable_name: String,
pub node_id: String,
pub max_age: Duration,
pub ack_wait: Duration,
}
impl Default for NatsJetStreamAdapterConfig {
fn default() -> Self {
Self {
stream_name: "SOCKETIO_ADAPTER".to_owned(),
subject: "socketio.adapter.broadcast".to_owned(),
durable_name: format!("socketio-adapter-{}", uuid::Uuid::new_v4()),
node_id: uuid::Uuid::new_v4().to_string(),
max_age: Duration::from_secs(60 * 60),
ack_wait: Duration::from_secs(30),
}
}
}
pub struct NatsJetStreamAdapter {
local: Arc<MemoryAdapter>,
jetstream: jetstream::Context,
config: NatsJetStreamAdapterConfig,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct NatsMessage {
origin: String,
packet: NatsPacket,
opts: BroadcastOptions,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct NatsPacket {
encoded: String,
attachments: Vec<Vec<u8>>,
}
impl NatsJetStreamAdapter {
pub fn new(
jetstream: jetstream::Context,
config: NatsJetStreamAdapterConfig,
) -> Arc<Self> {
Arc::new(Self {
local: MemoryAdapter::new(),
jetstream,
config,
})
}
pub async fn attach(self: Arc<Self>, io: SocketIo) -> Result<()> {
let stream = self
.jetstream
.get_or_create_stream(jetstream::stream::Config {
name: self.config.stream_name.clone(),
subjects: vec![self.config.subject.clone()],
max_age: self.config.max_age,
..Default::default()
})
.await
.map_err(|err| {
SocketIoError::Adapter(format!("nats stream failed: {err}"))
})?;
let consumer = stream
.get_or_create_consumer(
&self.config.durable_name,
jetstream::consumer::pull::Config {
durable_name: Some(self.config.durable_name.clone()),
filter_subject: self.config.subject.clone(),
ack_wait: self.config.ack_wait,
..Default::default()
},
)
.await
.map_err(|err| {
SocketIoError::Adapter(format!("nats consumer failed: {err}"))
})?;
let node_id = self.config.node_id.clone();
actix_web::rt::spawn(async move {
let messages = consumer.messages().await;
let mut messages = match messages {
Ok(messages) => messages,
Err(err) => {
warn!(error = %err, "failed to open nats jetstream adapter consumer");
return;
}
};
while let Some(message) = messages.next().await {
let Ok(message) = message else {
warn!("failed to receive nats jetstream adapter message");
continue;
};
let payload = message.payload.to_vec();
let Ok(message_data) =
serde_json::from_slice::<NatsMessage>(&payload)
else {
warn!("failed to parse nats jetstream adapter message");
let _ = message.ack().await;
continue;
};
if message_data.origin == node_id {
let _ = message.ack().await;
continue;
}
let Ok(mut packet) =
Packet::decode(&message_data.packet.encoded)
else {
warn!(
"failed to decode nats jetstream adapter socket.io packet"
);
let _ = message.ack().await;
continue;
};
packet.attachments = message_data.packet.attachments;
if let Err(err) =
io.deliver_remote_packet(message_data.opts, packet).await
{
warn!(error = %err, "failed to deliver nats jetstream adapter packet");
}
let _ = message.ack().await;
}
});
Ok(())
}
}
#[async_trait]
impl Adapter for NatsJetStreamAdapter {
async fn add_socket(&self, namespace: &str, sid: &str) -> Result<()> {
self.local.add_socket(namespace, sid).await
}
async fn remove_socket(&self, namespace: &str, sid: &str) -> Result<()> {
self.local.remove_socket(namespace, sid).await
}
async fn add_to_room(
&self,
namespace: &str,
sid: &str,
room: &str,
) -> Result<()> {
self.local.add_to_room(namespace, sid, room).await
}
async fn remove_from_room(
&self,
namespace: &str,
sid: &str,
room: &str,
) -> Result<()> {
self.local.remove_from_room(namespace, sid, room).await
}
async fn sockets(
&self,
namespace: &str,
opts: &BroadcastOptions,
) -> Result<Vec<String>> {
self.local.sockets(namespace, opts).await
}
async fn publish(
&self,
packet: &Packet,
opts: &BroadcastOptions,
) -> Result<()> {
let message = NatsMessage {
origin: self.config.node_id.clone(),
packet: NatsPacket {
encoded: packet.encode(),
attachments: packet.attachments.clone(),
},
opts: opts.clone(),
};
let payload = serde_json::to_vec(&message)?;
let ack = self
.jetstream
.publish(self.config.subject.clone(), payload.into())
.await
.map_err(|err| {
SocketIoError::Adapter(format!("nats publish failed: {err}"))
})?;
ack.await.map_err(|err| {
SocketIoError::Adapter(format!("nats publish ack failed: {err}"))
})?;
Ok(())
}
}