refactor: update channel and model layers

This commit is contained in:
zhenyi 2026-06-01 22:04:31 +08:00
parent 734e1c4cc8
commit 079ea3a5cf
19 changed files with 473 additions and 151 deletions

View File

@ -41,6 +41,7 @@ pub struct Inner {
pub cache: AppCache, pub cache: AppCache,
pub io: SocketIo, pub io: SocketIo,
pub config: ChannelBusConfig, pub config: ChannelBusConfig,
pub cdn: crate::CdnManager,
pub online: RwLock<HashMap<Uuid, HashMap<String, Socket>>>, pub online: RwLock<HashMap<Uuid, HashMap<String, Socket>>>,
pub user_sync_locks: DashMap<Uuid, Arc<Mutex<()>>>, pub user_sync_locks: DashMap<Uuid, Arc<Mutex<()>>>,
pub typing_states: DashMap< pub typing_states: DashMap<
@ -203,6 +204,8 @@ impl ChannelBus {
cache: AppCache, cache: AppCache,
io: SocketIo, io: SocketIo,
config: ChannelBusConfig, config: ChannelBusConfig,
cdn: crate::CdnManager,
metrics_registry: Option<track::MetricsRegistry>,
) -> Self { ) -> Self {
let seq = match config.seq_segment_size { let seq = match config.seq_segment_size {
Some(size) => { Some(size) => {
@ -217,7 +220,7 @@ impl ChannelBus {
), ),
); );
let reconnect = ReconnectManager::new(cache.clone(), db.clone()); let reconnect = ReconnectManager::new(cache.clone(), db.clone());
let rate_limiter = match ( let mut rate_limiter = match (
config.rate_limit_max_requests, config.rate_limit_max_requests,
config.rate_limit_window_secs, config.rate_limit_window_secs,
) { ) {
@ -229,7 +232,7 @@ impl ChannelBus {
_ => RateLimiter::new(cache.clone()), _ => RateLimiter::new(cache.clone()),
}; };
let csrf = CsrfProtection::new(cache.clone()); let csrf = CsrfProtection::new(cache.clone());
let circuit_breaker = match ( let mut circuit_breaker = match (
config.circuit_breaker_failure_threshold, config.circuit_breaker_failure_threshold,
config.circuit_breaker_success_threshold, config.circuit_breaker_success_threshold,
config.circuit_breaker_timeout_secs, config.circuit_breaker_timeout_secs,
@ -245,18 +248,23 @@ impl ChannelBus {
} }
_ => CircuitBreaker::new(), _ => CircuitBreaker::new(),
}; };
if let Some(ref reg) = metrics_registry {
rate_limiter.set_metrics(reg);
circuit_breaker.set_metrics(reg);
}
Self { Self {
inner: Arc::new(Inner { inner: Arc::new(Inner {
db, db,
cache, cache,
io, io,
config, config,
cdn,
online: RwLock::new(HashMap::new()), online: RwLock::new(HashMap::new()),
user_sync_locks: DashMap::new(), user_sync_locks: DashMap::new(),
typing_states: DashMap::new(), typing_states: DashMap::new(),
seq, seq,
dedup, dedup,
metrics: ChannelMetrics::new(), metrics: ChannelMetrics::new(metrics_registry),
reconnect, reconnect,
rate_limiter, rate_limiter,
csrf, csrf,

View File

@ -1,6 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use track::CounterVec;
use crate::ChannelError; use crate::ChannelError;
@ -11,6 +12,34 @@ const STATUS_HALF_OPEN: u8 = 2;
#[derive(Clone)] #[derive(Clone)]
pub struct CircuitBreaker { pub struct CircuitBreaker {
inner: Arc<Inner>, inner: Arc<Inner>,
metrics: Option<CircuitBreakerMetrics>,
}
#[derive(Clone)]
struct CircuitBreakerMetrics {
transitions: CounterVec,
calls: CounterVec,
}
impl CircuitBreakerMetrics {
fn new(registry: &track::MetricsRegistry) -> Self {
Self {
transitions: registry
.register_counter_vec(
"circuit_breaker_transitions_total",
"Circuit breaker state transitions",
&["transition"],
)
.expect("failed to register circuit_breaker_transitions_total"),
calls: registry
.register_counter_vec(
"circuit_breaker_calls_total",
"Circuit breaker call outcomes",
&["outcome"],
)
.expect("failed to register circuit_breaker_calls_total"),
}
}
} }
struct CircuitState { struct CircuitState {
@ -61,9 +90,14 @@ impl CircuitBreaker {
half_open_max_calls, half_open_max_calls,
}, },
}), }),
metrics: None,
} }
} }
pub fn set_metrics(&mut self, registry: &track::MetricsRegistry) {
self.metrics = Some(CircuitBreakerMetrics::new(registry));
}
pub async fn call<F, T>(&self, f: F) -> Result<T, CircuitBreakerError> pub async fn call<F, T>(&self, f: F) -> Result<T, CircuitBreakerError>
where where
F: std::future::Future<Output = Result<T, ChannelError>>, F: std::future::Future<Output = Result<T, ChannelError>>,
@ -90,20 +124,29 @@ impl CircuitBreaker {
false false
} }
} }
_ => true, // Closed → allow _ => true,
} }
}; // Lock released before executing the call. };
if !slot_reserved { if !slot_reserved {
if let Some(m) = &self.metrics {
m.calls.with_label_values(&["rejected"]).inc();
}
return Err(CircuitBreakerError::Open); return Err(CircuitBreakerError::Open);
} }
match f.await { match f.await {
Ok(result) => { Ok(result) => {
if let Some(m) = &self.metrics {
m.calls.with_label_values(&["success"]).inc();
}
self.on_success().await; self.on_success().await;
Ok(result) Ok(result)
} }
Err(e) => { Err(e) => {
if let Some(m) = &self.metrics {
m.calls.with_label_values(&["failure"]).inc();
}
self.on_failure().await; self.on_failure().await;
Err(CircuitBreakerError::Inner(e)) Err(CircuitBreakerError::Inner(e))
} }
@ -120,9 +163,15 @@ impl CircuitBreaker {
state.status = STATUS_CLOSED; state.status = STATUS_CLOSED;
state.success_count = 0; state.success_count = 0;
state.half_open_calls = 0; state.half_open_calls = 0;
if let Some(m) = &self.metrics {
m.transitions
.with_label_values(&["half_open_to_closed"])
.inc();
}
} }
} }
} }
async fn on_failure(&self) { async fn on_failure(&self) {
let mut state = self.inner.state.lock().await; let mut state = self.inner.state.lock().await;
state.failure_count += 1; state.failure_count += 1;
@ -132,10 +181,18 @@ impl CircuitBreaker {
state.status = STATUS_OPEN; state.status = STATUS_OPEN;
state.success_count = 0; state.success_count = 0;
state.half_open_calls = 0; state.half_open_calls = 0;
if let Some(m) = &self.metrics {
m.transitions
.with_label_values(&["half_open_to_open"])
.inc();
}
} else if state.status == STATUS_CLOSED } else if state.status == STATUS_CLOSED
&& state.failure_count >= self.inner.config.failure_threshold && state.failure_count >= self.inner.config.failure_threshold
{ {
state.status = STATUS_OPEN; state.status = STATUS_OPEN;
if let Some(m) = &self.metrics {
m.transitions.with_label_values(&["closed_to_open"]).inc();
}
} }
} }

View File

@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use super::common::{UserInfo, RoomInfo}; use super::common::{RoomInfo, UserInfo};
/// Created when a user publishes an article in an article channel. /// Created when a user publishes an article in an article channel.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -60,12 +60,10 @@ impl WsHandler {
author: author.clone(), author: author.clone(),
}; };
bus.publish_room_event(channel, "article.created", &data).await?; bus.publish_room_event(channel, "article.created", &data)
.await?;
Ok(Some(WsOutEvent::ArticleCreated { Ok(Some(WsOutEvent::ArticleCreated { room, data }))
room,
data,
}))
} }
pub(super) async fn article_update( pub(super) async fn article_update(
@ -142,12 +140,10 @@ impl WsHandler {
channel: room.clone(), channel: room.clone(),
}; };
bus.publish_room_event(row.channel, "article.updated", &data).await?; bus.publish_room_event(row.channel, "article.updated", &data)
.await?;
Ok(Some(WsOutEvent::ArticleUpdated { Ok(Some(WsOutEvent::ArticleUpdated { room, data }))
room,
data,
}))
} }
pub(super) async fn article_delete( pub(super) async fn article_delete(
@ -193,12 +189,10 @@ impl WsHandler {
deleted_by: deleted_by.clone(), deleted_by: deleted_by.clone(),
}; };
bus.publish_room_event(old.channel, "article.deleted", &data).await?; bus.publish_room_event(old.channel, "article.deleted", &data)
.await?;
Ok(Some(WsOutEvent::ArticleDeleted { Ok(Some(WsOutEvent::ArticleDeleted { room, data }))
room,
data,
}))
} }
pub(super) async fn article_list( pub(super) async fn article_list(
@ -383,7 +377,8 @@ impl WsHandler {
user, user,
like_count: new_count, like_count: new_count,
}; };
bus.publish_room_event(art.channel, "article.liked", &data).await?; bus.publish_room_event(art.channel, "article.liked", &data)
.await?;
Ok(Some(WsOutEvent::ArticleLiked { room, data })) Ok(Some(WsOutEvent::ArticleLiked { room, data }))
} else { } else {
let data = article::ArticleUnlikedService { let data = article::ArticleUnlikedService {
@ -392,7 +387,8 @@ impl WsHandler {
user, user,
like_count: new_count, like_count: new_count,
}; };
bus.publish_room_event(art.channel, "article.unliked", &data).await?; bus.publish_room_event(art.channel, "article.unliked", &data)
.await?;
Ok(Some(WsOutEvent::ArticleUnliked { room, data })) Ok(Some(WsOutEvent::ArticleUnliked { room, data }))
} }
} }
@ -462,7 +458,8 @@ impl WsHandler {
comment_count: count_row.0, comment_count: count_row.0,
}; };
bus.publish_room_event(art.channel, "article.comment.created", &data).await?; bus.publish_room_event(art.channel, "article.comment.created", &data)
.await?;
Ok(Some(WsOutEvent::ArticleCommentCreated { room, data })) Ok(Some(WsOutEvent::ArticleCommentCreated { room, data }))
} }
@ -601,7 +598,8 @@ impl WsHandler {
comment_count: count_row.0, comment_count: count_row.0,
}; };
bus.publish_room_event(art.channel, "article.comment.deleted", &data).await?; bus.publish_room_event(art.channel, "article.comment.deleted", &data)
.await?;
Ok(Some(WsOutEvent::ArticleCommentDeleted { room, data })) Ok(Some(WsOutEvent::ArticleCommentDeleted { room, data }))
} }
@ -657,7 +655,12 @@ impl WsHandler {
let users_map = bus.lookup_users(&user_ids).await?; let users_map = bus.lookup_users(&user_ids).await?;
let users: Vec<UserInfo> = user_ids let users: Vec<UserInfo> = user_ids
.iter() .iter()
.map(|id| users_map.get(id).cloned().unwrap_or_else(|| UserInfo::unknown(*id))) .map(|id| {
users_map
.get(id)
.cloned()
.unwrap_or_else(|| UserInfo::unknown(*id))
})
.collect(); .collect();
Ok(Some(WsOutEvent::ArticleLikedUsers { Ok(Some(WsOutEvent::ArticleLikedUsers {

View File

@ -103,6 +103,89 @@ impl WsHandler {
Ok(()) Ok(())
} }
/// Store parsed mentions in the room_mention table.
/// For `@all` mentions, also insert notifications for all workspace members.
async fn persist_mentions(
bus: &ChannelBus,
message_id: Uuid,
seq: i64,
mentions: &[crate::richtext::Mention],
sender_id: Uuid,
room: Uuid,
workspace: Uuid,
) -> ChannelResult<()> {
let mut has_all = false;
for mention in mentions {
db::sqlx::query(
"INSERT INTO room_mention (message, seq, mention_type, target_id) \
VALUES ($1, $2, $3, $4)",
)
.bind(message_id)
.bind(seq)
.bind(&mention.mention_type)
.bind(&mention.target_id)
.execute(bus.inner.db.writer())
.await?;
if mention.mention_type == "all" {
has_all = true;
}
}
// When @all is used, insert notification records for every workspace member
if has_all && !workspace.is_nil() {
let sender = bus
.lookup_user(sender_id)
.await
.unwrap_or_else(|_| crate::event::UserInfo::unknown(sender_id));
let room_info = bus
.lookup_room(room)
.await
.unwrap_or_else(|_| crate::event::RoomInfo::unknown(room));
let sender_name: &str = if sender.display_name.is_empty() {
&sender.username
} else {
&sender.display_name
};
let title = format!(
"{} mentioned @everyone in #{}",
sender_name, room_info.name
);
let members = bus
.list_workspace_members(workspace)
.await
.unwrap_or_default();
for (member_id, _username, _display_name, _avatar_url) in &members {
if *member_id == sender_id {
continue; // Don't notify the sender
}
let notify_id = Uuid::now_v7();
db::sqlx::query(
"INSERT INTO user_app_notify \
(id, \"user\", title, body, notify_type, target_type, target_id, \
created_at, updated_at) \
VALUES ($1, $2, $3, $4, $5, $6, $7, now(), now())",
)
.bind(notify_id)
.bind(member_id)
.bind(&title)
.bind(&format!("Message from {}", sender_name))
.bind("mention_all")
.bind("room")
.bind(room)
.execute(bus.inner.db.writer())
.await?;
}
}
Ok(())
}
pub(super) async fn message_create( pub(super) async fn message_create(
bus: &ChannelBus, bus: &ChannelBus,
user_id: Uuid, user_id: Uuid,
@ -111,6 +194,7 @@ impl WsHandler {
content_type: Option<String>, content_type: Option<String>,
thread: Option<Uuid>, thread: Option<Uuid>,
in_reply_to: Option<Uuid>, in_reply_to: Option<Uuid>,
attachment_ids: Option<Vec<Uuid>>,
) -> ChannelResult<Option<WsOutEvent>> { ) -> ChannelResult<Option<WsOutEvent>> {
Self::ensure_room_access(bus, user_id, room).await?; Self::ensure_room_access(bus, user_id, room).await?;
if content.len() > MAX_TEXT_LEN { if content.len() > MAX_TEXT_LEN {
@ -226,6 +310,10 @@ impl WsHandler {
let seq = bus.inner.seq.seq(room).await?; let seq = bus.inner.seq.seq(room).await?;
let sender = bus.lookup_user(user_id).await?; let sender = bus.lookup_user(user_id).await?;
let sender_for_response = sender.clone(); let sender_for_response = sender.clone();
// Parse mentions from content before inserting
let mentions = crate::richtext::parse_mentions(&content);
let row = db::sqlx::query_as::<_, model::channel::RoomMessageModel>( let row = db::sqlx::query_as::<_, model::channel::RoomMessageModel>(
"INSERT INTO room_message (room, seq, thread, parent, author, content, content_type) \ "INSERT INTO room_message (room, seq, thread, parent, author, content, content_type) \
VALUES ($1, $2, $3, $4, $5, $6, $7) \ VALUES ($1, $2, $3, $4, $5, $6, $7) \
@ -237,11 +325,35 @@ impl WsHandler {
.bind(effective_thread) .bind(effective_thread)
.bind(in_reply_to) .bind(in_reply_to)
.bind(user_id) .bind(user_id)
.bind(content) .bind(&content)
.bind(content_type.unwrap_or_else(|| "text".to_string())) .bind(content_type.clone().unwrap_or_else(|| "text".to_string()))
.fetch_one(bus.inner.db.writer()) .fetch_one(bus.inner.db.writer())
.await?; .await?;
// Store mentions in the room_mention table
let workspace = crate::rooms::room_workspace(&bus.inner.db, room)
.await?
.unwrap_or(Uuid::nil());
Self::persist_mentions(
bus, row.id, row.seq, &mentions, user_id, room, workspace,
)
.await?;
// Link attachments to the created message
if let Some(ref att_ids) = attachment_ids {
if !att_ids.is_empty() {
db::sqlx::query(
"UPDATE room_attachment SET message = $1, seq = $2 WHERE id = ANY($3) AND uploaded_by = $4",
)
.bind(row.id)
.bind(row.seq)
.bind(att_ids)
.bind(user_id)
.execute(bus.inner.db.writer())
.await?;
}
}
bus.publish_room_message(row.clone(), Some(sender)).await?; bus.publish_room_message(row.clone(), Some(sender)).await?;
let msg_room = bus let msg_room = bus
.lookup_room(room) .lookup_room(room)

View File

@ -87,6 +87,7 @@ impl WsHandler {
content_type, content_type,
thread, thread,
in_reply_to, in_reply_to,
attachment_ids,
} => { } => {
Self::message_create( Self::message_create(
bus, bus,
@ -96,6 +97,7 @@ impl WsHandler {
content_type, content_type,
thread, thread,
in_reply_to, in_reply_to,
attachment_ids,
) )
.await .await
} }
@ -117,8 +119,14 @@ impl WsHandler {
channel_type, channel_type,
} => { } => {
Self::room_create( Self::room_create(
bus, user_id, workspace, room_name, public, category, bus,
ai_enabled, channel_type, user_id,
workspace,
room_name,
public,
category,
ai_enabled,
channel_type,
) )
.await .await
} }
@ -357,8 +365,16 @@ impl WsHandler {
status, status,
} => { } => {
Self::article_create( Self::article_create(
bus, user_id, channel, title, cover_url, content, bus,
content_type, summary, tags, status, user_id,
channel,
title,
cover_url,
content,
content_type,
summary,
tags,
status,
) )
.await .await
} }
@ -374,8 +390,17 @@ impl WsHandler {
status, status,
} => { } => {
Self::article_update( Self::article_update(
bus, user_id, article_id, title, cover_url, content, bus,
content_type, summary, tags, is_pinned, status, user_id,
article_id,
title,
cover_url,
content,
content_type,
summary,
tags,
is_pinned,
status,
) )
.await .await
} }
@ -386,9 +411,7 @@ impl WsHandler {
channel, channel,
before, before,
limit, limit,
} => { } => Self::article_list(bus, user_id, channel, before, limit).await,
Self::article_list(bus, user_id, channel, before, limit).await
}
WsInMessage::ArticleGet { article_id } => { WsInMessage::ArticleGet { article_id } => {
Self::article_get(bus, user_id, article_id).await Self::article_get(bus, user_id, article_id).await
} }

View File

@ -9,6 +9,7 @@ use super::WsOutEvent;
/// Helper struct for thread_list JOIN query result /// Helper struct for thread_list JOIN query result
#[derive(db::sqlx::FromRow)] #[derive(db::sqlx::FromRow)]
#[allow(dead_code)]
struct ThreadListRow { struct ThreadListRow {
id: Uuid, id: Uuid,
room: Uuid, room: Uuid,

View File

@ -2,9 +2,9 @@ use serde::Serialize;
use uuid::Uuid; use uuid::Uuid;
use crate::event::{ use crate::event::{
RoomInfo, WorkspaceInfo, article, attachment, ban, category, conversation, draft, RoomInfo, WorkspaceInfo, article, attachment, ban, category, conversation,
forward, invite, member, message, message_read, notify, pin, presence, draft, forward, invite, member, message, message_read, notify, pin,
reaction, rooms, search, star, thread, voice, workspace, presence, reaction, rooms, search, star, thread, voice, workspace,
}; };
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]

View File

@ -44,6 +44,7 @@ pub enum WsInMessage {
content_type: Option<String>, content_type: Option<String>,
thread: Option<Uuid>, thread: Option<Uuid>,
in_reply_to: Option<Uuid>, in_reply_to: Option<Uuid>,
attachment_ids: Option<Vec<Uuid>>,
}, },
MessageUpdate { MessageUpdate {
message: Uuid, message: Uuid,

View File

@ -1,7 +1,7 @@
use socketio::{EventPayload, Socket}; use socketio::{EventPayload, Socket};
use uuid::Uuid; use uuid::Uuid;
use crate::{ChannelBus, ChannelError, ChannelResult}; use crate::{ChannelBus, ChannelError};
use super::handler::WsHandler; use super::handler::WsHandler;
use super::out_event::{WsError, WsOutEvent}; use super::out_event::{WsError, WsOutEvent};

View File

@ -11,6 +11,7 @@ pub mod http;
mod metrics; mod metrics;
mod pagination; mod pagination;
mod reconnect; mod reconnect;
pub mod richtext;
pub mod rooms; pub mod rooms;
mod search; mod search;
mod security; mod security;
@ -32,6 +33,7 @@ pub use pagination::{
PaginationParams, PaginationParams,
}; };
pub use reconnect::{ClientState, MissedMessage, ReconnectManager}; pub use reconnect::{ClientState, MissedMessage, ReconnectManager};
pub use richtext::{Mention, parse_mentions};
pub use search::{SearchEngine, SearchHit, SearchQuery, SearchResult}; pub use search::{SearchEngine, SearchHit, SearchQuery, SearchResult};
pub use security::{CsrfProtection, RateLimiter}; pub use security::{CsrfProtection, RateLimiter};
pub use seq::SeqAllocator; pub use seq::SeqAllocator;

View File

@ -1,45 +1,94 @@
use std::sync::Arc; use std::sync::Arc;
use track::{CounterVec, Gauge};
#[derive(Clone)] #[derive(Clone)]
pub struct ChannelMetrics { pub struct ChannelMetrics {
pub messages_sent: Arc<std::sync::atomic::AtomicU64>, pub messages_sent: Arc<std::sync::atomic::AtomicU64>,
pub messages_received: Arc<std::sync::atomic::AtomicU64>, pub messages_received: Arc<std::sync::atomic::AtomicU64>,
pub messages_failed: Arc<std::sync::atomic::AtomicU64>, pub messages_failed: Arc<std::sync::atomic::AtomicU64>,
pub active_connections: Arc<std::sync::atomic::AtomicI64>, pub active_connections: Arc<std::sync::atomic::AtomicI64>,
events_total: Option<CounterVec>,
active_connections_gauge: Option<Gauge>,
} }
impl ChannelMetrics { impl ChannelMetrics {
pub fn new() -> Self { pub fn new(registry: Option<track::MetricsRegistry>) -> Self {
let events_total = registry.as_ref().and_then(|registry| {
registry
.register_counter_vec(
"channel_events_total",
"Total channel socket and message events",
&["event"],
)
.map_err(|error| {
tracing::warn!(%error, "failed to register channel_events_total");
error
})
.ok()
});
let active_connections_gauge = registry.as_ref().and_then(|registry| {
registry
.register_gauge(
"channel_active_connections",
"Current active channel socket connections",
)
.map_err(|error| {
tracing::warn!(%error, "failed to register channel_active_connections");
error
})
.ok()
});
Self { Self {
messages_sent: Arc::new(std::sync::atomic::AtomicU64::new(0)), messages_sent: Arc::new(std::sync::atomic::AtomicU64::new(0)),
messages_received: Arc::new(std::sync::atomic::AtomicU64::new(0)), messages_received: Arc::new(std::sync::atomic::AtomicU64::new(0)),
messages_failed: Arc::new(std::sync::atomic::AtomicU64::new(0)), messages_failed: Arc::new(std::sync::atomic::AtomicU64::new(0)),
active_connections: Arc::new(std::sync::atomic::AtomicI64::new(0)), active_connections: Arc::new(std::sync::atomic::AtomicI64::new(0)),
events_total,
active_connections_gauge,
} }
} }
pub fn increment_sent(&self) { pub fn increment_sent(&self) {
self.messages_sent self.messages_sent
.fetch_add(1, std::sync::atomic::Ordering::Relaxed); .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.record_event("sent");
} }
pub fn increment_received(&self) { pub fn increment_received(&self) {
self.messages_received self.messages_received
.fetch_add(1, std::sync::atomic::Ordering::Relaxed); .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.record_event("received");
} }
pub fn increment_failed(&self) { pub fn increment_failed(&self) {
self.messages_failed self.messages_failed
.fetch_add(1, std::sync::atomic::Ordering::Relaxed); .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.record_event("failed");
} }
pub fn increment_connections(&self) { pub fn increment_connections(&self) {
self.active_connections self.active_connections
.fetch_add(1, std::sync::atomic::Ordering::Relaxed); .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Some(gauge) = &self.active_connections_gauge {
gauge.inc();
}
self.record_event("connected");
} }
pub fn decrement_connections(&self) { pub fn decrement_connections(&self) {
self.active_connections self.active_connections
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
if let Some(gauge) = &self.active_connections_gauge {
gauge.dec();
}
self.record_event("disconnected");
}
fn record_event(&self, event: &str) {
if let Some(counter) = &self.events_total {
counter.with_label_values(&[event]).inc();
}
} }
} }

View File

@ -1,112 +1,150 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)] /// Parsed mention from `@[type:id:label]` IR format.
pub struct RichTextBlock { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub block_type: BlockType, pub struct Mention {
pub content: String, pub mention_type: String,
pub attributes: Option<serde_json::Value>, pub target_id: String,
pub label: String,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] /// Parse all `@[type:id:label]` mentions from content.
#[serde(rename_all = "lowercase")] /// Returns deduplicated mentions in order of first appearance.
pub enum BlockType { pub fn parse_mentions(content: &str) -> Vec<Mention> {
Text, let mut mentions = Vec::new();
Code, let mut seen = std::collections::HashSet::new();
Quote,
Link,
Mention,
Emoji,
Image,
}
pub struct RichTextRenderer; // Simple manual parser for @[type:id:label]
let bytes = content.as_bytes();
let len = bytes.len();
let mut i = 0;
impl RichTextRenderer { while i < len {
pub fn new() -> Self { // Look for "@["
Self {} if i + 2 < len && bytes[i] == b'@' && bytes[i + 1] == b'[' {
} let start = i + 2; // after "@["
pub fn parse_markdown(&self, content: &str) -> Vec<RichTextBlock> { // Find first ':'' after start
vec![RichTextBlock { if let Some(type_end) = content[start..].find(':') {
block_type: BlockType::Text, let mention_type = &content[start..start + type_end];
content: content.to_string(), let after_type = start + type_end + 1; // after first ':'
attributes: None,
}]
}
pub fn parse_mentions(&self, content: &str) -> Vec<Uuid> { // Find second ':' (between id and label)
content if let Some(id_end) = content[after_type..].find(':') {
.split_whitespace() let target_id = &content[after_type..after_type + id_end];
.filter(|w| w.starts_with('@')) let after_id = after_type + id_end + 1; // after second ':'
.filter_map(|w| Uuid::parse_str(&w[1..]).ok())
.collect()
}
pub fn highlight_code(&self, code: &str, language: &str) -> String { // Find closing ']'
format!("```{}\n{}\n```", language, code) if let Some(close) = content[after_id..].find(']') {
} let label = &content[after_id..after_id + close];
pub fn render_to_html(&self, blocks: &[RichTextBlock]) -> String { if !mention_type.is_empty() && !target_id.is_empty() {
blocks let key = format!("{}:{}", mention_type, target_id);
.iter() if seen.insert(key) {
.map(|block| match block.block_type { mentions.push(Mention {
BlockType::Text => { mention_type: mention_type.to_string(),
format!("<p>{}</p>", html_escape(&block.content)) target_id: target_id.to_string(),
} label: label.to_string(),
BlockType::Code => format!( });
"<pre><code>{}</code></pre>", }
html_escape(&block.content) }
),
BlockType::Quote => format!( i = after_id + close + 1; // skip past ']'
"<blockquote>{}</blockquote>", continue;
html_escape(&block.content)
),
BlockType::Link => {
let safe_href = sanitize_uri(&block.content);
format!(
"<a href=\"{}\">{}</a>",
html_escape(&safe_href),
html_escape(&block.content)
)
}
BlockType::Mention => format!(
"<span class=\"mention\">@{}</span>",
html_escape(&block.content)
),
BlockType::Emoji => format!(
"<span class=\"emoji\">{}</span>",
html_escape(&block.content)
),
BlockType::Image => {
let safe_src = sanitize_uri(&block.content);
if safe_src.is_empty() {
String::new()
} else {
format!("<img src=\"{}\" />", html_escape(&safe_src))
} }
} }
}) }
.collect::<Vec<_>>() }
.join("\n") i += 1;
}
}
fn sanitize_uri(uri: &str) -> String {
let lower = uri.to_lowercase();
if lower.starts_with("http://")
|| lower.starts_with("https://")
|| lower.starts_with("mailto:")
{
uri.to_string()
} else {
String::new()
} }
mentions
} }
fn html_escape(s: &str) -> String { /// Extract unique target IDs of a specific mention type.
s.replace('&', "&amp;") pub fn extract_mention_ids(
.replace('<', "&lt;") mentions: &[Mention],
.replace('>', "&gt;") mention_type: &str,
.replace('"', "&quot;") ) -> Vec<String> {
.replace('\'', "&#x27;") mentions
.iter()
.filter(|m| m.mention_type == mention_type)
.map(|m| m.target_id.clone())
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_single_room_mention() {
let input = "hey check out @[room:abc123:general]";
let mentions = parse_mentions(input);
assert_eq!(mentions.len(), 1);
assert_eq!(mentions[0].mention_type, "room");
assert_eq!(mentions[0].target_id, "abc123");
assert_eq!(mentions[0].label, "general");
}
#[test]
fn test_parse_single_repo_mention() {
let input = "look at @[repo:my-repo:my-repo]";
let mentions = parse_mentions(input);
assert_eq!(mentions.len(), 1);
assert_eq!(mentions[0].mention_type, "repo");
assert_eq!(mentions[0].target_id, "my-repo");
}
#[test]
fn test_parse_multiple_mentions() {
let input =
"compare @[repo:backend:backend] with @[repo:frontend:frontend]";
let mentions = parse_mentions(input);
assert_eq!(mentions.len(), 2);
assert_eq!(mentions[0].target_id, "backend");
assert_eq!(mentions[1].target_id, "frontend");
}
#[test]
fn test_deduplicate() {
let input = "look at @[repo:a:a] and also @[repo:a:a] please";
let mentions = parse_mentions(input);
assert_eq!(mentions.len(), 1);
}
#[test]
fn test_no_mentions() {
let input = "hello world no mentions here";
let mentions = parse_mentions(input);
assert_eq!(mentions.len(), 0);
}
#[test]
fn test_mixed_mentions() {
let input = "@[user:abc:John] and @[room:xyz:general] and @[repo:r:r]";
let mentions = parse_mentions(input);
assert_eq!(mentions.len(), 3);
}
#[test]
fn test_incomplete_mention_ignored() {
let input = "this @[incomplete is just text";
let mentions = parse_mentions(input);
assert_eq!(mentions.len(), 0);
}
#[test]
fn test_empty_input() {
let mentions = parse_mentions("");
assert_eq!(mentions.len(), 0);
}
#[test]
fn test_extract_mention_ids() {
let input = "@[repo:a:a] and @[room:b:general] and @[repo:c:c]";
let mentions = parse_mentions(input);
let repo_ids = extract_mention_ids(&mentions, "repo");
assert_eq!(repo_ids, vec!["a", "c"]);
}
} }

View File

@ -1,4 +1,5 @@
use std::time::Duration; use std::time::Duration;
use track::CounterVec;
use uuid::Uuid; use uuid::Uuid;
use crate::{ChannelError, ChannelResult}; use crate::{ChannelError, ChannelResult};
@ -22,6 +23,26 @@ pub struct RateLimiter {
cache: cache::AppCache, cache: cache::AppCache,
max_requests: u32, max_requests: u32,
window: Duration, window: Duration,
metrics: Option<RateLimiterMetrics>,
}
#[derive(Clone)]
struct RateLimiterMetrics {
outcomes: CounterVec,
}
impl RateLimiterMetrics {
fn new(registry: &track::MetricsRegistry) -> Self {
Self {
outcomes: registry
.register_counter_vec(
"rate_limiter_decisions_total",
"Rate limiter decisions",
&["action", "outcome"],
)
.expect("failed to register rate_limiter_decisions_total"),
}
}
} }
impl RateLimiter { impl RateLimiter {
@ -30,6 +51,7 @@ impl RateLimiter {
cache, cache,
max_requests: 100, max_requests: 100,
window: Duration::from_secs(60), window: Duration::from_secs(60),
metrics: None,
} }
} }
@ -42,9 +64,14 @@ impl RateLimiter {
cache, cache,
max_requests, max_requests,
window, window,
metrics: None,
} }
} }
pub fn set_metrics(&mut self, registry: &track::MetricsRegistry) {
self.metrics = Some(RateLimiterMetrics::new(registry));
}
pub async fn check_rate_limit( pub async fn check_rate_limit(
&self, &self,
user_id: Uuid, user_id: Uuid,
@ -65,7 +92,12 @@ impl RateLimiter {
.await .await
.map_err(|e| ChannelError::Cache(cache::CacheError::Redis(e)))?; .map_err(|e| ChannelError::Cache(cache::CacheError::Redis(e)))?;
Ok(allowed == 1) let is_allowed = allowed == 1;
if let Some(m) = &self.metrics {
let outcome = if is_allowed { "allowed" } else { "blocked" };
m.outcomes.with_label_values(&[action, outcome]).inc();
}
Ok(is_allowed)
} }
pub async fn get_remaining( pub async fn get_remaining(

View File

@ -3,7 +3,6 @@ use serde::{Deserialize, Serialize};
use sqlx::FromRow; use sqlx::FromRow;
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, FromRow)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, FromRow)]
pub struct ChannelArticleModel { pub struct ChannelArticleModel {
pub id: Uuid, pub id: Uuid,
@ -26,7 +25,6 @@ pub struct ChannelArticleModel {
pub deleted_at: Option<DateTime<Utc>>, pub deleted_at: Option<DateTime<Utc>>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateArticlePayload { pub struct CreateArticlePayload {
pub channel: Uuid, pub channel: Uuid,
@ -39,7 +37,6 @@ pub struct CreateArticlePayload {
pub status: Option<String>, pub status: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateArticlePayload { pub struct UpdateArticlePayload {
pub title: Option<String>, pub title: Option<String>,
@ -52,7 +49,6 @@ pub struct UpdateArticlePayload {
pub status: Option<String>, pub status: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)] #[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct ChannelArticleCard { pub struct ChannelArticleCard {
pub id: Uuid, pub id: Uuid,

View File

@ -15,19 +15,19 @@ pub mod room_server_label;
pub mod room_threads; pub mod room_threads;
pub mod user_room_state; pub mod user_room_state;
pub use message_read::MessageReadModel;
pub use message_star::MessageStarModel;
pub use channel::ChannelModel; pub use channel::ChannelModel;
pub use channel::ChannelType; pub use channel::ChannelType;
pub use channel_article::ChannelArticleModel;
pub use channel_article::ChannelArticleCard; pub use channel_article::ChannelArticleCard;
pub use channel_article::ChannelArticleModel;
pub use channel_article::CreateArticlePayload; pub use channel_article::CreateArticlePayload;
pub use channel_article::UpdateArticlePayload; pub use channel_article::UpdateArticlePayload;
pub use channel_article_interact::ArticleLikeModel;
pub use channel_article_interact::ArticleCommentModel;
pub use channel_article_interact::ArticleCommentItem; pub use channel_article_interact::ArticleCommentItem;
pub use channel_article_interact::ArticleCommentList; pub use channel_article_interact::ArticleCommentList;
pub use channel_article_interact::ArticleCommentModel;
pub use channel_article_interact::ArticleLikeModel;
pub use channel_article_interact::CreateCommentPayload; pub use channel_article_interact::CreateCommentPayload;
pub use message_read::MessageReadModel;
pub use message_star::MessageStarModel;
pub use room_attachments::RoomAttachmentModel; pub use room_attachments::RoomAttachmentModel;
pub use room_categories::RoomCategoryModel; pub use room_categories::RoomCategoryModel;
pub use room_mention::RoomMentionModel; pub use room_mention::RoomMentionModel;

View File

@ -6,7 +6,7 @@ use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, FromRow)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, FromRow)]
pub struct RoomAttachmentModel { pub struct RoomAttachmentModel {
pub id: Uuid, pub id: Uuid,
pub message: Uuid, pub message: Option<Uuid>,
pub seq: i64, pub seq: i64,
pub file_name: String, pub file_name: String,
pub content_type: Option<String>, pub content_type: Option<String>,

View File

@ -9,6 +9,6 @@ pub struct RoomMentionModel {
pub message: Uuid, pub message: Uuid,
pub seq: i64, pub seq: i64,
pub mention_type: String, pub mention_type: String,
pub target_id: Uuid, pub target_id: String,
pub created_at: DateTime<Utc>, pub created_at: DateTime<Utc>,
} }

View File

@ -2,12 +2,12 @@ use db::AppDatabase;
pub mod agent; pub mod agent;
pub mod ai; pub mod ai;
pub mod channel;
pub mod issues; pub mod issues;
pub mod logs; pub mod logs;
pub mod notify; pub mod notify;
pub mod pull_request; pub mod pull_request;
pub mod repos; pub mod repos;
pub mod channel;
pub mod system; pub mod system;
pub mod users; pub mod users;
pub mod workspace; pub mod workspace;