133 lines
4.2 KiB
Rust
133 lines
4.2 KiB
Rust
use chrono::Utc;
|
|
use uuid::Uuid;
|
|
|
|
use crate::event::{RoomInfo, UserInfo, message_read};
|
|
use crate::{ChannelBus, ChannelResult};
|
|
|
|
use super::WsHandler;
|
|
use super::WsOutEvent;
|
|
|
|
impl WsHandler {
|
|
pub(super) async fn message_mark_read(
|
|
bus: &ChannelBus,
|
|
user_id: Uuid,
|
|
room: Uuid,
|
|
message_ids: Vec<Uuid>,
|
|
) -> ChannelResult<Option<WsOutEvent>> {
|
|
Self::ensure_room_access(bus, user_id, room).await?;
|
|
|
|
if message_ids.is_empty() {
|
|
return Ok(None);
|
|
}
|
|
|
|
let now = Utc::now();
|
|
db::sqlx::query(
|
|
"INSERT INTO message_read (message, room, \"user\", read_at) \
|
|
SELECT m.id, m.room, $1, $2 \
|
|
FROM room_message m \
|
|
WHERE m.id = ANY($3) AND m.room = $4 AND m.deleted_at IS NULL \
|
|
ON CONFLICT (message, \"user\") DO NOTHING",
|
|
)
|
|
.bind(user_id)
|
|
.bind(now)
|
|
.bind(&message_ids)
|
|
.bind(room)
|
|
.execute(bus.inner.db.writer())
|
|
.await?;
|
|
let max_seq_row: Option<(i64,)> = db::sqlx::query_as(
|
|
"SELECT MAX(seq) FROM room_message \
|
|
WHERE id = ANY($1) AND room = $2 AND deleted_at IS NULL",
|
|
)
|
|
.bind(&message_ids)
|
|
.bind(room)
|
|
.fetch_optional(bus.inner.db.reader())
|
|
.await?;
|
|
let max_seq = max_seq_row.map(|r| r.0).unwrap_or(0);
|
|
|
|
if max_seq > 0 {
|
|
db::sqlx::query(
|
|
"INSERT INTO user_room_state (\"user\", room, last_read_seq, last_read_at, updated_at) \
|
|
VALUES ($1, $2, $3, $4, $4) \
|
|
ON CONFLICT (\"user\", room) DO UPDATE \
|
|
SET last_read_seq = GREATEST(user_room_state.last_read_seq, $3), \
|
|
last_read_at = $4, updated_at = $4",
|
|
)
|
|
.bind(user_id)
|
|
.bind(room)
|
|
.bind(max_seq)
|
|
.bind(now)
|
|
.execute(bus.inner.db.writer())
|
|
.await?;
|
|
}
|
|
|
|
let room_info = bus
|
|
.lookup_room(room)
|
|
.await
|
|
.unwrap_or_else(|_| RoomInfo::unknown(room));
|
|
let reader_info = bus
|
|
.lookup_user(user_id)
|
|
.await
|
|
.unwrap_or_else(|_| UserInfo::unknown(user_id));
|
|
|
|
let data = message_read::MessageReadBatchService {
|
|
room: room_info.clone(),
|
|
message_ids,
|
|
last_seq: max_seq,
|
|
reader: reader_info,
|
|
read_at: now,
|
|
};
|
|
bus.publish_room_event(room, "message.read_batch", &data)
|
|
.await?;
|
|
Ok(Some(WsOutEvent::MessageReadBatch {
|
|
room: room_info,
|
|
data,
|
|
}))
|
|
}
|
|
pub(super) async fn message_get_readers(
|
|
bus: &ChannelBus,
|
|
user_id: Uuid,
|
|
message_id: Uuid,
|
|
) -> ChannelResult<Option<WsOutEvent>> {
|
|
let msg_room: Option<(Uuid, Uuid)> = db::sqlx::query_as(
|
|
"SELECT room, seq FROM room_message WHERE id = $1 AND deleted_at IS NULL",
|
|
)
|
|
.bind(message_id)
|
|
.fetch_optional(bus.inner.db.reader())
|
|
.await?;
|
|
|
|
let Some((room, _seq)) = msg_room else {
|
|
return Err(crate::ChannelError::RoomNotFound);
|
|
};
|
|
Self::ensure_room_access(bus, user_id, room).await?;
|
|
|
|
let rows = db::sqlx::query_as::<_, (Uuid, chrono::DateTime<Utc>)>(
|
|
"SELECT \"user\", read_at FROM message_read \
|
|
WHERE message = $1 ORDER BY read_at ASC",
|
|
)
|
|
.bind(message_id)
|
|
.fetch_all(bus.inner.db.reader())
|
|
.await?;
|
|
|
|
let user_ids: Vec<Uuid> = rows.iter().map(|(uid, _)| *uid).collect();
|
|
let users = bus.lookup_users(&user_ids).await.unwrap_or_default();
|
|
|
|
let readers: Vec<message_read::MessageReaderEntry> = rows
|
|
.into_iter()
|
|
.map(|(uid, read_at)| message_read::MessageReaderEntry {
|
|
user: users
|
|
.get(&uid)
|
|
.cloned()
|
|
.unwrap_or_else(|| UserInfo::unknown(uid)),
|
|
read_at,
|
|
})
|
|
.collect();
|
|
|
|
Ok(Some(WsOutEvent::MessageReaders {
|
|
data: message_read::MessageReadersService {
|
|
message_id,
|
|
readers,
|
|
},
|
|
}))
|
|
}
|
|
}
|