gitdataai/lib/channel/http/handler/message_read.rs

133 lines
4.2 KiB
Rust

use chrono::Utc;
use uuid::Uuid;
use crate::event::{RoomInfo, UserInfo, message_read};
use crate::{ChannelBus, ChannelResult};
use super::WsHandler;
use super::WsOutEvent;
impl WsHandler {
/// Marks a batch of messages in `room` as read by `user_id` and broadcasts the receipt.
///
/// Steps, in order:
/// 1. Checks access through `ensure_room_access`; a failure is propagated.
/// 2. Returns `Ok(None)` without touching the database when `message_ids` is empty.
/// 3. Inserts one `message_read` row for every non-deleted message of `room` whose id is
///    listed in `message_ids`; already existing `(message, "user")` pairs are kept as-is.
/// 4. Raises `user_room_state.last_read_seq` to the highest `seq` among those messages.
///    `GREATEST` keeps the stored value from moving backwards.
/// 5. Publishes a `"message.read_batch"` room event and returns the same payload wrapped
///    in `WsOutEvent::MessageReadBatch`.
///
/// # Errors
///
/// Propagates errors from the access check, from each SQL statement and from
/// `publish_room_event`. Room and user lookups are best-effort: on failure the payload
/// carries `RoomInfo::unknown` / `UserInfo::unknown` instead.
pub(super) async fn message_mark_read(
    bus: &ChannelBus,
    user_id: Uuid,
    room: Uuid,
    message_ids: Vec<Uuid>,
) -> ChannelResult<Option<WsOutEvent>> {
    Self::ensure_room_access(bus, user_id, room).await?;
    if message_ids.is_empty() {
        return Ok(None);
    }

    // One timestamp is shared by the receipts, the room state and the event payload.
    let now = Utc::now();

    db::sqlx::query(
        "INSERT INTO message_read (message, room, \"user\", read_at) \
         SELECT m.id, m.room, $1, $2 \
         FROM room_message m \
         WHERE m.id = ANY($3) AND m.room = $4 AND m.deleted_at IS NULL \
         ON CONFLICT (message, \"user\") DO NOTHING",
    )
    .bind(user_id)
    .bind(now)
    .bind(&message_ids)
    .bind(room)
    .execute(bus.inner.db.writer())
    .await?;

    // An aggregate without GROUP BY always yields exactly one row, and MAX() is NULL
    // when nothing matched (unknown ids, other room, deleted messages). The column is
    // therefore decoded as `Option<i64>`; a plain `i64` would turn that NULL into a
    // decode error instead of the intended "nothing to advance" fallback of 0.
    let max_seq_row: Option<(Option<i64>,)> = db::sqlx::query_as(
        "SELECT MAX(seq) FROM room_message \
         WHERE id = ANY($1) AND room = $2 AND deleted_at IS NULL",
    )
    .bind(&message_ids)
    .bind(room)
    .fetch_optional(bus.inner.db.reader())
    .await?;
    let max_seq = max_seq_row.and_then(|(seq,)| seq).unwrap_or(0);

    // Only touch the per-user room state when at least one message actually matched.
    if max_seq > 0 {
        db::sqlx::query(
            "INSERT INTO user_room_state (\"user\", room, last_read_seq, last_read_at, updated_at) \
             VALUES ($1, $2, $3, $4, $4) \
             ON CONFLICT (\"user\", room) DO UPDATE \
             SET last_read_seq = GREATEST(user_room_state.last_read_seq, $3), \
             last_read_at = $4, updated_at = $4",
        )
        .bind(user_id)
        .bind(room)
        .bind(max_seq)
        .bind(now)
        .execute(bus.inner.db.writer())
        .await?;
    }

    // Display metadata is decorative; a failed lookup must not fail the receipt.
    let room_info = bus
        .lookup_room(room)
        .await
        .unwrap_or_else(|_| RoomInfo::unknown(room));
    let reader_info = bus
        .lookup_user(user_id)
        .await
        .unwrap_or_else(|_| UserInfo::unknown(user_id));

    // NOTE(review): the payload echoes the caller-supplied `message_ids` unfiltered, so
    // it may list ids that were not inserted above — confirm subscribers tolerate that.
    let data = message_read::MessageReadBatchService {
        room: room_info.clone(),
        message_ids,
        last_seq: max_seq,
        reader: reader_info,
        read_at: now,
    };
    bus.publish_room_event(room, "message.read_batch", &data)
        .await?;

    Ok(Some(WsOutEvent::MessageReadBatch {
        room: room_info,
        data,
    }))
}
/// Lists the users who have read `message_id`, earliest read first.
///
/// The message's room is resolved first (deleted messages are ignored), then
/// `user_id`'s access to that room is checked through `ensure_room_access`. Every
/// `message_read` row for the message is loaded ordered by `read_at` ascending and
/// paired with the reader's `UserInfo`.
///
/// Always returns `Ok(Some(WsOutEvent::MessageReaders { .. }))` on success; the reader
/// list is empty when nobody has read the message yet.
///
/// # Errors
///
/// * `ChannelError::RoomNotFound` when no non-deleted message with `message_id` exists.
/// * Any error from the access check or from the SQL queries.
///
/// A failing `lookup_users` call is not an error: all readers then fall back to
/// `UserInfo::unknown`.
pub(super) async fn message_get_readers(
    bus: &ChannelBus,
    user_id: Uuid,
    message_id: Uuid,
) -> ChannelResult<Option<WsOutEvent>> {
    // Only the room is needed here. The previous query also selected `seq` and decoded
    // it as a `Uuid`, which cannot succeed for a numeric sequence column and made the
    // lookup fail for every existing message.
    let msg_room: Option<(Uuid,)> = db::sqlx::query_as(
        "SELECT room FROM room_message WHERE id = $1 AND deleted_at IS NULL",
    )
    .bind(message_id)
    .fetch_optional(bus.inner.db.reader())
    .await?;
    let Some((room,)) = msg_room else {
        return Err(crate::ChannelError::RoomNotFound);
    };

    // Authorise against the message's own room before exposing who read it.
    Self::ensure_room_access(bus, user_id, room).await?;

    let rows = db::sqlx::query_as::<_, (Uuid, chrono::DateTime<Utc>)>(
        "SELECT \"user\", read_at FROM message_read \
         WHERE message = $1 ORDER BY read_at ASC",
    )
    .bind(message_id)
    .fetch_all(bus.inner.db.reader())
    .await?;

    // Resolve all reader profiles in a single batched lookup; failure degrades to an
    // empty map so each entry falls back to `UserInfo::unknown` below.
    let user_ids: Vec<Uuid> = rows.iter().map(|(uid, _)| *uid).collect();
    let users = bus.lookup_users(&user_ids).await.unwrap_or_default();

    let readers: Vec<message_read::MessageReaderEntry> = rows
        .into_iter()
        .map(|(uid, read_at)| message_read::MessageReaderEntry {
            user: users
                .get(&uid)
                .cloned()
                .unwrap_or_else(|| UserInfo::unknown(uid)),
            read_at,
        })
        .collect();

    Ok(Some(WsOutEvent::MessageReaders {
        data: message_read::MessageReadersService {
            message_id,
            readers,
        },
    }))
}
}