use chrono::Utc; use uuid::Uuid; use crate::event::{RoomInfo, UserInfo, message_read}; use crate::{ChannelBus, ChannelResult}; use super::WsOutEvent; use super::WsHandler; impl WsHandler { pub(super) async fn message_mark_read( bus: &ChannelBus, user_id: Uuid, room: Uuid, message_ids: Vec, ) -> ChannelResult> { Self::ensure_room_access(bus, user_id, room).await?; if message_ids.is_empty() { return Ok(None); } let now = Utc::now(); db::sqlx::query( "INSERT INTO message_read (message, room, \"user\", read_at) \ SELECT m.id, m.room, $1, $2 \ FROM room_message m \ WHERE m.id = ANY($3) AND m.room = $4 AND m.deleted_at IS NULL \ ON CONFLICT (message, \"user\") DO NOTHING", ) .bind(user_id) .bind(now) .bind(&message_ids) .bind(room) .execute(bus.inner.db.writer()) .await?; let max_seq_row: Option<(i64,)> = db::sqlx::query_as( "SELECT MAX(seq) FROM room_message \ WHERE id = ANY($1) AND room = $2 AND deleted_at IS NULL", ) .bind(&message_ids) .bind(room) .fetch_optional(bus.inner.db.reader()) .await?; let max_seq = max_seq_row.map(|r| r.0).unwrap_or(0); if max_seq > 0 { db::sqlx::query( "INSERT INTO user_room_state (\"user\", room, last_read_seq, last_read_at, updated_at) \ VALUES ($1, $2, $3, $4, $4) \ ON CONFLICT (\"user\", room) DO UPDATE \ SET last_read_seq = GREATEST(user_room_state.last_read_seq, $3), \ last_read_at = $4, updated_at = $4", ) .bind(user_id) .bind(room) .bind(max_seq) .bind(now) .execute(bus.inner.db.writer()) .await?; } let room_info = bus.lookup_room(room).await.unwrap_or_else(|_| RoomInfo::unknown(room)); let reader_info = bus.lookup_user(user_id).await.unwrap_or_else(|_| UserInfo::unknown(user_id)); let data = message_read::MessageReadBatchService { room: room_info.clone(), message_ids, last_seq: max_seq, reader: reader_info, read_at: now, }; bus.publish_room_event(room, "message.read_batch", &data).await?; Ok(Some(WsOutEvent::MessageReadBatch { room: room_info, data, })) } pub(super) async fn message_get_readers( bus: &ChannelBus, user_id: Uuid, message_id: Uuid, ) -> ChannelResult> { let msg_room: Option<(Uuid, Uuid)> = db::sqlx::query_as( "SELECT room, seq FROM room_message WHERE id = $1 AND deleted_at IS NULL", ) .bind(message_id) .fetch_optional(bus.inner.db.reader()) .await?; let Some((room, _seq)) = msg_room else { return Err(crate::ChannelError::RoomNotFound); }; Self::ensure_room_access(bus, user_id, room).await?; let rows = db::sqlx::query_as::<_, (Uuid, chrono::DateTime)>( "SELECT \"user\", read_at FROM message_read \ WHERE message = $1 ORDER BY read_at ASC", ) .bind(message_id) .fetch_all(bus.inner.db.reader()) .await?; let user_ids: Vec = rows.iter().map(|(uid, _)| *uid).collect(); let users = bus.lookup_users(&user_ids).await.unwrap_or_default(); let readers: Vec = rows .into_iter() .map(|(uid, read_at)| message_read::MessageReaderEntry { user: users .get(&uid) .cloned() .unwrap_or_else(|| UserInfo::unknown(uid)), read_at, }) .collect(); Ok(Some(WsOutEvent::MessageReaders { data: message_read::MessageReadersService { message_id, readers, }, })) } }