Compare commits

..

15 Commits

Author SHA1 Message Date
ZhenYi
eba75ee359 fix(admin): ioredis 5.x username/password in redisOptions, add logging
Some checks are pending
CI / Rust Lint & Check (push) Waiting to run
CI / Rust Tests (push) Waiting to run
CI / Frontend Lint & Type Check (push) Waiting to run
CI / Frontend Build (push) Blocked by required conditions
2026-04-20 15:50:51 +08:00
ZhenYi
779aaba575 feat: add service worker for push notifications and update room documentation 2026-04-20 15:45:51 +08:00
ZhenYi
6f6f57f062 feat(frontend): add push notification hooks, image compression, and update room/chat components 2026-04-20 15:45:47 +08:00
ZhenYi
8316fe926f feat(service): add push and storage service modules, update project/user/workspace services 2026-04-20 15:45:40 +08:00
ZhenYi
0c64122b80 chore(frontend): update frontend build configuration 2026-04-20 15:45:35 +08:00
ZhenYi
26865f8dcf chore(api): update dist.rs 2026-04-20 15:45:30 +08:00
ZhenYi
0e4631ec75 chore(api/user): update user notification endpoint 2026-04-20 15:45:26 +08:00
ZhenYi
26c86f0796 feat(api/room): add upload handler and update websocket handler 2026-04-20 15:45:22 +08:00
ZhenYi
cec8d486f1 feat(room): update room lib (connection, helpers, member, message, notification, reaction, room, search, service, types) 2026-04-20 15:45:18 +08:00
ZhenYi
1b863a9f65 chore(queue): update queue types 2026-04-20 15:45:13 +08:00
ZhenYi
2186960002 chore(models/users): update user notification model 2026-04-20 15:45:08 +08:00
ZhenYi
a2e8f5bf5b feat(models/rooms): add room attachment model and update room message/notifications 2026-04-20 15:45:03 +08:00
ZhenYi
98e6f77341 feat(migrate): add room attachment, push subscription, and model_id migrations 2026-04-20 15:44:59 +08:00
ZhenYi
d09af7c326 feat(config): add storage configuration module 2026-04-20 15:44:54 +08:00
ZhenYi
ba15324603 chore: update dependencies (cargo + npm) 2026-04-20 15:44:49 +08:00
62 changed files with 7076 additions and 283 deletions

272
Cargo.lock generated
View File

@ -68,7 +68,7 @@ checksum = "daa239b93927be1ff123eebada5a3ff23e89f0124ccb8609234e5103d5a5ae6d"
dependencies = [
"actix-utils",
"actix-web",
"derive_more",
"derive_more 2.1.1",
"futures-util",
"log",
"once_cell",
@ -87,7 +87,7 @@ dependencies = [
"actix-web",
"bitflags",
"bytes",
"derive_more",
"derive_more 2.1.1",
"futures-core",
"http-range",
"log",
@ -113,7 +113,7 @@ dependencies = [
"brotli",
"bytes",
"bytestring",
"derive_more",
"derive_more 2.1.1",
"encoding_rs",
"flate2",
"foldhash",
@ -147,6 +147,44 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "actix-multipart"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5118a26dee7e34e894f7e85aa0ee5080ae4c18bf03c0e30d49a80e418f00a53"
dependencies = [
"actix-multipart-derive",
"actix-utils",
"actix-web",
"derive_more 0.99.20",
"futures-core",
"futures-util",
"httparse",
"local-waker",
"log",
"memchr",
"mime",
"rand 0.8.5",
"serde",
"serde_json",
"serde_plain",
"tempfile",
"tokio",
]
[[package]]
name = "actix-multipart-derive"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e11eb847f49a700678ea2fa73daeb3208061afa2b9d1a8527c03390f4c4a1c6b"
dependencies = [
"darling",
"parse-size",
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "actix-router"
version = "0.5.4"
@ -228,7 +266,7 @@ dependencies = [
"bytestring",
"cfg-if",
"cookie",
"derive_more",
"derive_more 2.1.1",
"encoding_rs",
"foldhash",
"futures-core",
@ -369,6 +407,16 @@ dependencies = [
"zeroize",
]
[[package]]
name = "aes-keywrap"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10b6f24a1f796bc46415a1d0d18dc0a8203ccba088acf5def3291c4f61225522"
dependencies = [
"aes 0.9.0-rc.4",
"byteorder",
]
[[package]]
name = "agent"
version = "0.2.9"
@ -556,6 +604,7 @@ version = "0.2.9"
dependencies = [
"actix",
"actix-cors",
"actix-multipart",
"actix-web",
"actix-ws",
"anyhow",
@ -566,6 +615,7 @@ dependencies = [
"email",
"frontend",
"futures",
"futures-util",
"git",
"mime_guess2",
"models",
@ -672,6 +722,12 @@ dependencies = [
"password-hash",
]
[[package]]
name = "arrayref"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb"
[[package]]
name = "arrayvec"
version = "0.7.6"
@ -1141,6 +1197,12 @@ dependencies = [
"serde",
]
[[package]]
name = "binstring"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0669d5a35b64fdb5ab7fb19cae13148b6b5cbdf4b8247faf54ece47f699c8cef"
[[package]]
name = "bit-set"
version = "0.5.3"
@ -1201,6 +1263,17 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "blake2b_simd"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b79834656f71332577234b50bfc009996f7449e0c056884e6a02492ded0ca2f3"
dependencies = [
"arrayref",
"arrayvec",
"constant_time_eq",
]
[[package]]
name = "block-buffer"
version = "0.10.4"
@ -1538,6 +1611,17 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de0758edba32d61d1fd9f4d69491b47604b91ee2f7e6b33de7e54ca4ebe55dc3"
[[package]]
name = "coarsetime"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e58eb270476aa4fc7843849f8a35063e8743b4dbcdf6dd0f8ea0886980c204c2"
dependencies = [
"libc",
"wasix",
"wasm-bindgen",
]
[[package]]
name = "codepage"
version = "0.1.2"
@ -1631,6 +1715,12 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d52eff69cd5e647efe296129160853a42795992097e8af39800e1060caeea9b"
[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "convert_case"
version = "0.10.0"
@ -1728,9 +1818,9 @@ dependencies = [
[[package]]
name = "crc"
version = "3.4.0"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d"
checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675"
dependencies = [
"crc-catalog",
]
@ -1858,6 +1948,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "ct-codecs"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b10589d1a5e400d61f9f38f12f884cfd080ff345de8f17efda36fe0e4a02aa8"
[[package]]
name = "ctr"
version = "0.9.2"
@ -2090,6 +2186,19 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "derive_more"
version = "0.99.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6edb4b64a43d977b8e99788fe3a04d483834fba1215a7e02caa415b626497f7f"
dependencies = [
"convert_case 0.4.0",
"proc-macro2",
"quote",
"rustc_version",
"syn 2.0.117",
]
[[package]]
name = "derive_more"
version = "2.1.1"
@ -2203,6 +2312,17 @@ dependencies = [
"spki",
]
[[package]]
name = "ece-native"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30d8e2c05464ca407a32663c1f119abd2a0f8d948879c160fc6cf5b86b6c05d6"
dependencies = [
"aes-gcm 0.10.3",
"hkdf",
"sha2 0.10.9",
]
[[package]]
name = "ed25519"
version = "2.2.3"
@ -2213,6 +2333,16 @@ dependencies = [
"signature 2.2.0",
]
[[package]]
name = "ed25519-compact"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ce99a9e19c84beb4cc35ece85374335ccc398240712114c85038319ed709bd"
dependencies = [
"ct-codecs",
"getrandom 0.3.4",
]
[[package]]
name = "ed25519-dalek"
version = "2.2.0"
@ -2622,6 +2752,7 @@ name = "frontend"
version = "0.2.9"
dependencies = [
"lazy_static",
"md5",
"walkdir",
]
@ -3267,6 +3398,30 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "hmac-sha1-compact"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0b3ba31f6dc772cc8221ce81dbbbd64fa1e668255a6737d95eeace59b5a8823"
[[package]]
name = "hmac-sha256"
version = "1.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec9d92d097f4749b64e8cc33d924d9f40a2d4eb91402b458014b781f5733d60f"
dependencies = [
"digest 0.10.7",
]
[[package]]
name = "hmac-sha512"
version = "1.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "019ece39bbefc17f13f677a690328cb978dbf6790e141a3c24e66372cb38588b"
dependencies = [
"digest 0.10.7",
]
[[package]]
name = "home"
version = "0.5.12"
@ -3903,6 +4058,46 @@ dependencies = [
"serde_json",
]
[[package]]
name = "jwt-simple"
version = "0.12.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3991f54af4b009bb6efe01aa5a4fcce9ca52f3de7a104a3f6b6e2ad36c852c48"
dependencies = [
"anyhow",
"binstring",
"blake2b_simd",
"coarsetime",
"ct-codecs",
"ed25519-compact",
"hmac-sha1-compact",
"hmac-sha256",
"hmac-sha512",
"k256",
"p256",
"p384",
"rand 0.8.5",
"serde",
"serde_json",
"superboring",
"thiserror 2.0.18",
"zeroize",
]
[[package]]
name = "k256"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6e3919bbaa2945715f0bb6d3934a173d1e9a59ac23767fbaaef277265a7411b"
dependencies = [
"cfg-if",
"ecdsa",
"elliptic-curve",
"once_cell",
"sha2 0.10.9",
"signature 2.2.0",
]
[[package]]
name = "k8s-openapi"
version = "0.24.0"
@ -5033,6 +5228,12 @@ dependencies = [
"windows-link",
]
[[package]]
name = "parse-size"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "487f2ccd1e17ce8c1bfab3a65c89525af41cfad4c8659021a1e9a2aacd73b89b"
[[package]]
name = "password-hash"
version = "0.5.0"
@ -6597,7 +6798,7 @@ dependencies = [
"async-trait",
"bigdecimal",
"chrono",
"derive_more",
"derive_more 2.1.1",
"futures-util",
"itertools",
"log",
@ -6893,6 +7094,15 @@ dependencies = [
"zmij",
]
[[package]]
name = "serde_plain"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ce1fc6db65a611022b23a0dec6975d63fb80a302cb3388835ff02c097258d50"
dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -6928,6 +7138,7 @@ dependencies = [
"async-openai",
"avatar",
"base64 0.22.1",
"base64ct",
"calamine",
"captcha-rs",
"chrono",
@ -6942,9 +7153,12 @@ dependencies = [
"git2",
"hex",
"hmac",
"http 1.4.0",
"jwt-simple",
"lopdf",
"models",
"moka",
"p256",
"pulldown-cmark",
"queue",
"quick-xml 0.37.5",
@ -6970,6 +7184,7 @@ dependencies = [
"utoipa",
"uuid",
"walkdir",
"web-push-native",
"zip 8.4.0",
]
@ -6982,7 +7197,7 @@ dependencies = [
"actix-web",
"anyhow",
"deadpool-redis",
"derive_more",
"derive_more 2.1.1",
"rand 0.10.0",
"redis",
"serde",
@ -7550,6 +7765,21 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "superboring"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d8af9125d1ea290cf5c297b94d0e518c939bbe1f45ef130c19525dae7afba99"
dependencies = [
"aes-gcm 0.10.3",
"aes-keywrap",
"getrandom 0.2.17",
"hmac-sha256",
"hmac-sha512",
"rand 0.8.5",
"rsa",
]
[[package]]
name = "syn"
version = "1.0.109"
@ -8342,6 +8572,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasix"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1757e0d1f8456693c7e5c6c629bdb54884e032aa0bb53c155f6a39f94440d332"
dependencies = [
"wasi",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.115"
@ -8445,6 +8684,23 @@ dependencies = [
"semver",
]
[[package]]
name = "web-push-native"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2175ef28a9a693fa88f322d88484f702a340da864a449a2b6b2a1fe26db712f3"
dependencies = [
"aes-gcm 0.10.3",
"base64ct",
"ece-native",
"hkdf",
"http 1.4.0",
"jwt-simple",
"p256",
"serde",
"sha2 0.10.9",
]
[[package]]
name = "web-sys"
version = "0.3.92"

View File

@ -106,7 +106,8 @@ sha1_smol = "1.0.1"
rsa = { version = "0.9.7", package = "rsa" }
reqwest = { version = "0.13.2", default-features = false }
dotenvy = "0.15.7"
aws-sdk-s3 = "1.127.0"
# aws-lc-sys requires NASM on Windows, so we use local filesystem storage instead of S3
# aws-sdk-s3 = "1.127.0"
sea-orm = "2.0.0-rc.37"
sea-orm-migration = "2.0.0-rc.37"
sha1 = { version = "0.10.6", features = ["compress"] }
@ -149,6 +150,7 @@ pulldown-cmark = "0.12"
quick-xml = "0.37"
sqlparser = "0.55"
lazy_static = "1.5"
md5 = "0.7"
moka = "0.12.15"
serde = "1.0.228"
serde_json = "1.0.149"
@ -157,6 +159,9 @@ serde_bytes = "0.11.19"
phf = "0.13.1"
phf_codegen = "0.13.1"
base64 = "0.22.1"
base64ct = "1"
p256 = { version = "0.13", features = ["ecdsa", "std"] }
http = "1"
tempfile = "3"
[workspace.package]

View File

@ -1,7 +1,7 @@
import type { NextConfig } from "next";
const nextConfig: NextConfig = {
serverExternalPackages: ["bcrypt"],
serverExternalPackages: ["bcrypt", "ioredis"],
turbopack: {
root: process.cwd(),
},

View File

@ -35,16 +35,20 @@ function createClusterClient(): Redis {
});
const firstUrl = new URL(REDIS_CLUSTER_URLS[0]);
const username = firstUrl.username || undefined;
const password = firstUrl.password || undefined;
// ioredis 5.x: Cluster 是 default export, redisOptions 展开到顶层, 无 clusterRetryStrategy
// ioredis 5.x: username/password 必须放在 redisOptions 里,不能放顶层
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const cluster = new (Cluster as any)(nodes, {
lazyConnect: true,
enableReadyCheck: true,
maxRetriesPerRequest: 3,
retryStrategy: (times: number) => Math.min(times * 100, 3000),
// 从第一个 URL 提取认证信息(所有节点共用相同密码)
username: firstUrl.username || undefined,
password: firstUrl.password || undefined,
redisOptions: {
username,
password,
retryStrategy: (times: number) => Math.min(times * 100, 3000),
},
});
return cluster as Redis;
@ -52,6 +56,16 @@ function createClusterClient(): Redis {
export function getRedis(): Redis {
if (!redis) {
const mode = REDIS_CLUSTER_URLS.length > 1 ? "cluster" : "single";
const clusterNodes = REDIS_CLUSTER_URLS.map((url) => {
const u = new URL(url);
return `${u.hostname}:${u.port}`;
});
console.log("[Redis] Initializing", {
mode,
clusterNodes,
singleUrl: mode === "single" ? REDIS_URL : undefined,
});
redis =
REDIS_CLUSTER_URLS.length > 1 ? createClusterClient() : createSingleClient();
}

View File

@ -42,10 +42,12 @@ actix-ws = { workspace = true, features = [] }
actix = { workspace = true, features = ["macros"] }
tokio-stream = { workspace = true, features = ["sync"] }
futures = { workspace = true }
futures-util = { workspace = true }
tokio = { workspace = true, features = ["sync", "rt"] }
chrono = { workspace = true }
mime_guess2 = { workspace = true, features = ["phf-map"] }
sea-orm = "2.0.0-rc.37"
rust_decimal = "1.40.0"
actix-multipart = { workspace = true, features = ["tempfile"] }
[lints]
workspace = true

View File

@ -1,29 +1,69 @@
use actix_web::{web, HttpResponse};
use actix_web::{http::header, web, HttpRequest, HttpResponse};
use mime_guess2::MimeGuess;
pub async fn serve_frontend(path: web::Path<String>) -> HttpResponse {
let path = path.into_inner();
let path = if path.is_empty() || path == "/" {
"index.html"
fn cache_control_header(path: &str) -> &'static str {
if path == "index.html" {
"no-cache, no-store, must-revalidate"
} else if path.ends_with(".js")
|| path.ends_with(".css")
|| path.ends_with(".woff2")
|| path.ends_with(".woff")
|| path.ends_with(".ttf")
|| path.ends_with(".otf")
|| path.ends_with(".png")
|| path.ends_with(".jpg")
|| path.ends_with(".jpeg")
|| path.ends_with(".gif")
|| path.ends_with(".svg")
|| path.ends_with(".ico")
|| path.ends_with(".webp")
|| path.ends_with(".avif")
|| path.ends_with(".map")
{
"public, max-age=31536000, immutable"
} else {
&path
};
match frontend::get_frontend_asset(path) {
Some(data) => {
let mime = MimeGuess::from_path(path).first_or_octet_stream();
HttpResponse::Ok()
.content_type(mime.as_ref())
.body(data.to_vec())
}
None => {
// Fallback to index.html for SPA routing
match frontend::get_frontend_asset("index.html") {
Some(data) => HttpResponse::Ok()
.content_type("text/html")
.body(data.to_vec()),
None => HttpResponse::NotFound().finish(),
}
}
"no-cache"
}
}
pub async fn serve_frontend(req: HttpRequest, path: web::Path<String>) -> HttpResponse {
let path = path.into_inner();
let path_str = if path.is_empty() || path == "/" {
"index.html"
} else {
path.as_str()
};
let cc = cache_control_header(path_str);
match frontend::get_frontend_asset_with_etag(path_str) {
Some((data, etag)) => {
// Check If-None-Match for conditional request
if let Some(if_none_match) = req.headers().get(header::IF_NONE_MATCH) {
if let Ok(client_etag) = if_none_match.to_str() {
if client_etag == etag {
return HttpResponse::NotModified()
.insert_header(("Cache-Control", cc))
.insert_header(("ETag", etag))
.finish();
}
}
}
let mime = MimeGuess::from_path(path_str).first_or_octet_stream();
HttpResponse::Ok()
.content_type(mime.as_ref())
.insert_header(("Cache-Control", cc))
.insert_header(("ETag", etag))
.body(data.to_vec())
}
None => match frontend::get_frontend_asset_with_etag("index.html") {
Some((data, etag)) => HttpResponse::Ok()
.content_type("text/html")
.insert_header(("Cache-Control", "no-cache, no-store, must-revalidate"))
.insert_header(("ETag", etag))
.body(data.to_vec()),
None => HttpResponse::NotFound().finish(),
},
}
}

View File

@ -8,6 +8,7 @@ pub mod pin;
pub mod reaction;
pub mod room;
pub mod thread;
pub mod upload;
pub mod ws;
pub mod ws_handler;
pub mod ws_types;
@ -173,6 +174,11 @@ pub fn init_room_routes(cfg: &mut web::ServiceConfig) {
.route(
"/me/notifications/{notification_id}/archive",
web::post().to(notification::notification_archive),
)
// file upload
.route(
"/rooms/{room_id}/upload",
web::post().to(upload::upload),
),
);
}

140
libs/api/room/upload.rs Normal file
View File

@ -0,0 +1,140 @@
use actix_multipart::Multipart;
use actix_web::{HttpResponse, Result, web};
use actix_web::http::header::{CONTENT_DISPOSITION, CONTENT_TYPE};
use futures_util::StreamExt;
use service::AppService;
use session::Session;
use uuid::Uuid;
#[derive(Debug, serde::Serialize, utoipa::ToSchema)]
pub struct UploadResponse {
pub url: String,
pub file_name: String,
pub file_size: i64,
pub content_type: String,
}
fn sanitize_key(key: &str) -> String {
key.replace(['/', '\\', ':'], "_")
}
fn extract_filename(disposition: &actix_web::http::header::HeaderValue) -> Option<String> {
let s = disposition.to_str().ok()?;
// Simple parsing: extract filename from Content-Disposition header
// e.g., "form-data; filename="test.png""
for part in s.split(';') {
let part = part.trim();
if part.starts_with("filename=") {
let value = &part[9..].trim_matches('"');
if !value.is_empty() {
return Some(value.to_string());
}
}
}
None
}
pub async fn upload(
service: web::Data<AppService>,
session: Session,
path: web::Path<Uuid>,
mut payload: Multipart,
) -> Result<HttpResponse, crate::error::ApiError> {
let user_id = session
.user()
.ok_or_else(|| crate::error::ApiError(service::error::AppError::Unauthorized))?;
let storage = service
.storage
.as_ref()
.ok_or_else(|| {
crate::error::ApiError(service::error::AppError::BadRequest(
"Storage not configured".to_string(),
))
})?;
let room_id = path.into_inner();
service
.room
.require_room_member(room_id, user_id)
.await
.map_err(crate::error::ApiError::from)?;
let max_size = service.config.storage_max_file_size();
let mut file_data: Vec<u8> = Vec::new();
let mut file_name = String::new();
let mut content_type = "application/octet-stream".to_string();
while let Some(item) = payload.next().await {
let mut field = item.map_err(|e| {
crate::error::ApiError(service::error::AppError::BadRequest(e.to_string()))
})?;
if let Some(disposition) = field.headers().get(&CONTENT_DISPOSITION) {
if let Some(name) = extract_filename(disposition) {
file_name = name;
}
}
if let Some(ct) = field.headers().get(&CONTENT_TYPE) {
if let Ok(ct_str) = ct.to_str() {
if !ct_str.is_empty() && ct_str != "application/octet-stream" {
content_type = ct_str.to_string();
}
}
}
while let Some(chunk) = field.next().await {
let data = chunk.map_err(|e| {
crate::error::ApiError(service::error::AppError::BadRequest(e.to_string()))
})?;
if file_data.len() + data.len() > max_size {
return Err(crate::error::ApiError(
service::error::AppError::BadRequest(format!(
"File exceeds maximum size of {} bytes",
max_size
)),
));
}
file_data.extend_from_slice(&data);
}
}
if file_data.is_empty() {
return Err(crate::error::ApiError(
service::error::AppError::BadRequest("No file provided".to_string()),
));
}
if file_name.is_empty() {
file_name = format!("upload_{}", Uuid::now_v7());
}
// Detect content type from file extension if still octet-stream
if content_type == "application/octet-stream" {
content_type = mime_guess2::from_path(&file_name)
.first_or_octet_stream()
.to_string();
}
let unique_name = format!("{}_{}", Uuid::now_v7(), sanitize_key(&file_name));
let key = format!("rooms/{}/{}", room_id, unique_name);
let file_size = file_data.len() as i64;
let url = storage
.upload(&key, file_data)
.await
.map_err(|e| {
crate::error::ApiError(service::error::AppError::InternalServerError(
e.to_string(),
))
})?;
Ok(crate::ApiResponse::ok(UploadResponse {
url,
file_name,
file_size,
content_type,
})
.to_response())
}

View File

@ -98,6 +98,8 @@ pub struct AiStreamChunkPayload {
pub content: String,
pub done: bool,
pub error: Option<String>,
/// Human-readable AI model name for display in the UI.
pub display_name: Option<String>,
}
impl From<RoomMessageStreamChunkEvent> for AiStreamChunkPayload {
@ -108,6 +110,7 @@ impl From<RoomMessageStreamChunkEvent> for AiStreamChunkPayload {
content: e.content,
done: e.done,
error: e.error,
display_name: e.display_name,
}
}
}

View File

@ -55,6 +55,14 @@ pub fn init_user_routes(cfg: &mut web::ServiceConfig) {
"/me/notifications/preferences",
web::patch().to(notification::update_notification_preferences),
)
.route(
"/me/notifications/push/vapid-key",
web::get().to(notification::get_vapid_public_key),
)
.route(
"/me/notifications/push/subscription",
web::delete().to(notification::unsubscribe_push),
)
.route(
"/me/heatmap",
web::get().to(chpc::get_my_contribution_heatmap),

View File

@ -1,8 +1,57 @@
use crate::{ApiResponse, error::ApiError};
use actix_web::{HttpResponse, Result, web};
use service::error::AppError;
use service::AppService;
use session::Session;
use crate::{error::ApiError, ApiResponse};
#[derive(serde::Serialize, utoipa::ToSchema)]
pub struct VapidKeyResponse {
pub public_key: String,
}
#[utoipa::path(
get,
path = "/api/users/me/notifications/push/vapid-key",
responses(
(status = 200, description = "Get VAPID public key for push subscription", body = ApiResponse<VapidKeyResponse>),
(status = 503, description = "Push notifications not configured"),
),
tag = "User"
)]
pub async fn get_vapid_public_key(
service: web::Data<AppService>,
) -> Result<HttpResponse, ApiError> {
let public_key = service
.config
.vapid_public_key();
let public_key = match public_key {
Some(k) => k,
None => {
let err: AppError = AppError::InternalError;
return Err(err.into());
}
};
Ok(ApiResponse::ok(VapidKeyResponse { public_key }).to_response())
}
#[utoipa::path(
delete,
path = "/api/users/me/notifications/push/subscription",
responses(
(status = 200, description = "Unsubscribe from push notifications", body = ApiResponse<serde_json::Value>),
(status = 401, description = "Unauthorized"),
),
tag = "User"
)]
pub async fn unsubscribe_push(
service: web::Data<AppService>,
session: Session,
) -> Result<HttpResponse, ApiError> {
service.user_unsubscribe_push(&session).await?;
Ok(ApiResponse::ok(serde_json::json!({ "success": true })).to_response())
}
#[utoipa::path(
get,
path = "/api/users/me/notifications/preferences",

View File

@ -22,7 +22,8 @@ impl AppConfig {
}
}
}
env = env.into_iter().chain(std::env::vars()).collect();
// Environment variables (e.g. K8s injected APP_DOMAIN_URL) take precedence over .env files
env = std::env::vars().chain(env).collect();
let this = AppConfig { env };
if let Err(config) = GLOBAL_CONFIG.set(this) {
eprintln!("Failed to set global config: {:?}", config);
@ -47,3 +48,4 @@ pub mod qdrant;
pub mod redis;
pub mod smtp;
pub mod ssh;
pub mod storage;

39
libs/config/storage.rs Normal file
View File

@ -0,0 +1,39 @@
use crate::AppConfig;
impl AppConfig {
pub fn storage_path(&self) -> String {
self.env
.get("STORAGE_PATH")
.cloned()
.unwrap_or_else(|| "/data/files".to_string())
}
pub fn storage_public_url(&self) -> String {
self.env
.get("STORAGE_PUBLIC_URL")
.cloned()
.unwrap_or_else(|| "/files".to_string())
}
pub fn storage_max_file_size(&self) -> usize {
self.env
.get("STORAGE_MAX_FILE_SIZE")
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(10 * 1024 * 1024) // 10MB default
}
pub fn vapid_public_key(&self) -> Option<String> {
self.env.get("VAPID_PUBLIC_KEY").cloned()
}
pub fn vapid_private_key(&self) -> Option<String> {
self.env.get("VAPID_PRIVATE_KEY").cloned()
}
pub fn vapid_sender_email(&self) -> String {
self.env
.get("VAPID_SENDER_EMAIL")
.cloned()
.unwrap_or_else(|| "mailto:admin@example.com".to_string())
}
}

View File

@ -8,3 +8,4 @@ lazy_static.workspace = true
[build-dependencies]
walkdir.workspace = true
md5.workspace = true

View File

@ -1,3 +1,4 @@
use md5::compute as md5_hash;
use std::{env, fs, path::PathBuf, process::Command};
fn run_pnpm(args: &[&str], cwd: &str) {
@ -66,15 +67,24 @@ fn main() {
let safe_name = key.replace('/', "_").replace('\\', "_");
let blob_path = blob_dir.join(&safe_name);
fs::copy(&file, &blob_path).unwrap();
// Compute ETag (MD5 hex of content) at build time
let content = fs::read(&file).unwrap();
let hash = md5_hash(&content);
let etag_literal = format!("\"{:x}\"", hash);
let key_literal = format!("\"{}\"", key.replace('"', "\\\""));
strs.push(format!(" ({}, include_bytes!(\"dist_blobs/{}\")),", key_literal, safe_name));
strs.push(format!(
" ({}, include_bytes!(\"dist_blobs/{}\"), {}),",
key_literal, safe_name, etag_literal
));
}
let out_file = out_dir.join("frontend.rs");
let content = format!(
"lazy_static::lazy_static! {{\n pub static ref FRONTEND: Vec<(&'static str, &'static [u8])> = vec![\n{} ];\n}}\n",
let generated = format!(
"lazy_static::lazy_static! {{\n pub static ref FRONTEND: Vec<(&'static str, &'static [u8], &'static str)> = vec![\n{} ];\n}}\n",
strs.join("\n")
);
fs::write(&out_file, content).unwrap();
fs::write(&out_file, generated).unwrap();
println!("cargo:include={}", out_file.display());
}

View File

@ -4,5 +4,12 @@ include!(concat!(env!("OUT_DIR"), "/frontend.rs"));
/// Returns the embedded frontend static asset for the given path, or `None` if not found.
pub fn get_frontend_asset(path: &str) -> Option<&'static [u8]> {
FRONTEND.iter().find(|(k, _)| *k == path).map(|(_, v)| *v)
FRONTEND.iter().find(|(k, _, _)| *k == path).map(|(_, v, _)| *v)
}
/// Returns the embedded frontend static asset and its ETag for the given path.
pub fn get_frontend_asset_with_etag(path: &str) -> Option<(&'static [u8], &'static str)> {
FRONTEND.iter()
.find(|(k, _, _)| *k == path)
.map(|(_, v, etag)| (v as &_, etag as &_))
}

View File

@ -82,6 +82,9 @@ impl MigratorTrait for Migrator {
Box::new(m20260415_000001_add_issue_id_to_agent_task::Migration),
Box::new(m20260416_000001_add_retry_count_to_agent_task::Migration),
Box::new(m20260417_000001_add_stream_to_room_ai::Migration),
Box::new(m20260420_000001_create_room_attachment::Migration),
Box::new(m20260420_000002_add_push_subscription::Migration),
Box::new(m20260420_000003_add_model_id_to_room_message::Migration),
// Repo tables
Box::new(m20250628_000028_create_repo::Migration),
Box::new(m20250628_000029_create_repo_branch::Migration),
@ -254,3 +257,5 @@ pub mod m20260414_000001_create_agent_task;
pub mod m20260415_000001_add_issue_id_to_agent_task;
pub mod m20260416_000001_add_retry_count_to_agent_task;
pub mod m20260417_000001_add_stream_to_room_ai;
pub mod m20260420_000001_create_room_attachment;
pub mod m20260420_000002_add_push_subscription;

View File

@ -0,0 +1,30 @@
//! SeaORM migration: create room_attachment table
use sea_orm_migration::prelude::*;
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20260420_000001_create_room_attachment"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let sql = include_str!("sql/m20260420_000001_create_room_attachment.sql");
super::execute_sql(manager, sql).await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_raw(sea_orm::Statement::from_string(
sea_orm::DbBackend::Postgres,
"DROP TABLE IF EXISTS room_attachment;".to_string(),
))
.await?;
Ok(())
}
}

View File

@ -0,0 +1,35 @@
//! SeaORM migration: add push subscription fields to user_notification
use sea_orm_migration::prelude::*;
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20260420_000002_add_push_subscription"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let sql = include_str!("sql/m20260420_000002_add_push_subscription.sql");
super::execute_sql(manager, sql).await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_raw(sea_orm::Statement::from_string(
sea_orm::DbBackend::Postgres,
r#"
ALTER TABLE user_notification
DROP COLUMN IF EXISTS push_subscription_endpoint,
DROP COLUMN IF EXISTS push_subscription_keys_p256dh,
DROP COLUMN IF EXISTS push_subscription_keys_auth;
"#.to_string(),
))
.await?;
Ok(())
}
}

View File

@ -0,0 +1,30 @@
//! SeaORM migration: add model_id column to room_message
use sea_orm_migration::prelude::*;
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20260420_000003_add_model_id_to_room_message"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let sql = include_str!("sql/m20260420_000003_add_model_id_to_room_message.sql");
super::execute_sql(manager, sql).await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_raw(
sea_orm::DbBackend::Postgres,
"ALTER TABLE room_message DROP COLUMN IF EXISTS model_id;",
)
.await?;
Ok(())
}
}

View File

@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS room_attachment (
id UUID PRIMARY KEY,
room UUID NOT NULL,
message UUID NOT NULL,
uploader UUID NOT NULL,
file_name VARCHAR(255) NOT NULL,
file_size BIGINT NOT NULL,
content_type VARCHAR(100) NOT NULL,
s3_key VARCHAR(500) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_room_attachment_room ON room_attachment (room);
CREATE INDEX idx_room_attachment_message ON room_attachment (message);
CREATE INDEX idx_room_attachment_uploader ON room_attachment (uploader);

View File

@ -0,0 +1,5 @@
-- Add push subscription fields to user_notification
ALTER TABLE user_notification
ADD COLUMN push_subscription_endpoint TEXT,
ADD COLUMN push_subscription_keys_p256dh TEXT,
ADD COLUMN push_subscription_keys_auth TEXT;

View File

@ -0,0 +1,2 @@
ALTER TABLE room_message ADD COLUMN IF NOT EXISTS model_id UUID;
CREATE INDEX IF NOT EXISTS idx_room_message_model_id ON room_message (model_id) WHERE model_id IS NOT NULL;

View File

@ -114,6 +114,7 @@ impl std::fmt::Display for ToolCallStatus {
pub use room::Entity as Room;
pub use room_ai::Entity as RoomAi;
pub use room_attachment::Entity as RoomAttachment;
pub use room_category::Entity as RoomCategory;
pub use room_member::Entity as RoomMember;
pub use room_message::Entity as RoomMessage;
@ -125,6 +126,7 @@ pub use room_pin::Entity as RoomPin;
pub use room_thread::Entity as RoomThread;
pub mod room;
pub mod room_ai;
pub mod room_attachment;
pub mod room_category;
pub mod room_member;
pub mod room_message;

View File

@ -0,0 +1,22 @@
use crate::{DateTimeUtc, MessageId, RoomId, UserId};
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "room_attachment")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: Uuid,
pub room: RoomId,
pub message: MessageId,
pub uploader: UserId,
pub file_name: String,
pub file_size: i64,
pub content_type: String,
pub s3_key: String,
pub created_at: DateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -13,6 +13,8 @@ pub struct Model {
pub room: RoomId,
pub sender_type: MessageSenderType,
pub sender_id: Option<UserId>,
/// AI model ID — set when sender_type = "ai", used for display name lookups.
pub model_id: Option<Uuid>,
pub thread: Option<RoomThreadId>,
pub in_reply_to: Option<MessageId>,
pub content: String,

View File

@ -18,6 +18,10 @@ pub enum NotificationType {
RoomDeleted,
#[sea_orm(string_value = "system_announcement")]
SystemAnnouncement,
#[sea_orm(string_value = "project_invitation")]
ProjectInvitation,
#[sea_orm(string_value = "workspace_invitation")]
WorkspaceInvitation,
}
impl std::fmt::Display for NotificationType {
@ -29,6 +33,8 @@ impl std::fmt::Display for NotificationType {
NotificationType::RoomCreated => "room_created",
NotificationType::RoomDeleted => "room_deleted",
NotificationType::SystemAnnouncement => "system_announcement",
NotificationType::ProjectInvitation => "project_invitation",
NotificationType::WorkspaceInvitation => "workspace_invitation",
};
write!(f, "{}", s)
}

View File

@ -47,6 +47,9 @@ pub struct Model {
pub marketing_enabled: bool,
pub security_enabled: bool,
pub product_enabled: bool,
pub push_subscription_endpoint: Option<String>,
pub push_subscription_keys_p256dh: Option<String>,
pub push_subscription_keys_auth: Option<String>,
pub created_at: DateTimeUtc,
pub updated_at: DateTimeUtc,
}

View File

@ -11,6 +11,8 @@ pub struct RoomMessageEnvelope {
pub room_id: Uuid,
pub sender_type: String,
pub sender_id: Option<Uuid>,
/// AI model ID — set when sender_type = "ai", used for display name lookups.
pub model_id: Option<Uuid>,
pub thread_id: Option<Uuid>,
pub in_reply_to: Option<Uuid>,
pub content: String,
@ -87,6 +89,8 @@ pub struct RoomMessageStreamChunkEvent {
pub content: String,
pub done: bool,
pub error: Option<String>,
/// Human-readable AI model name (e.g. "Claude 3.5 Sonnet") for display.
pub display_name: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -10,7 +10,7 @@ use uuid::Uuid;
use db::database::AppDatabase;
use models::rooms::{MessageContentType, MessageSenderType, room_message};
use queue::{AgentTaskEvent, ProjectRoomEvent, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent};
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set};
use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter, Set};
use crate::error::RoomError;
use crate::metrics::RoomMetrics;
@ -720,6 +720,7 @@ pub fn make_persist_fn(
room: Set(env.room_id),
sender_type: Set(sender_type),
sender_id: Set(env.sender_id),
model_id: Set(env.model_id),
thread: Set(env.thread_id),
content: Set(env.content.clone()),
content_type: Set(content_type),
@ -736,6 +737,21 @@ pub fn make_persist_fn(
room_message::Entity::insert_many(models_to_insert)
.exec(&db)
.await?;
// Update content_tsv for inserted messages
for env in chunk.iter() {
let update_sql = format!(
"UPDATE room_message SET content_tsv = to_tsvector('simple', content) WHERE id = '{}'",
env.id
);
let stmt = sea_orm::Statement::from_sql_and_values(
sea_orm::DbBackend::Postgres,
&update_sql,
vec![],
);
let _ = db.execute_raw(stmt).await;
}
metrics.messages_persisted.increment(count);
}
}

View File

@ -73,6 +73,7 @@ impl From<room_message::Model> for super::RoomMessageResponse {
revoked: value.revoked,
revoked_by: value.revoked_by,
in_reply_to: value.in_reply_to,
highlighted_content: None,
}
}
}
@ -388,8 +389,8 @@ impl RoomService {
let sender_type = msg.sender_type.to_string();
let display_name = match sender_type.as_str() {
"ai" => {
if let Some(sender_id) = msg.sender_id {
ai_model::Entity::find_by_id(sender_id)
if let Some(mid) = msg.model_id {
ai_model::Entity::find_by_id(mid)
.one(&self.db)
.await
.ok()
@ -429,6 +430,7 @@ impl RoomService {
revoked: msg.revoked,
revoked_by: msg.revoked_by,
in_reply_to: msg.in_reply_to,
highlighted_content: None,
}
}
}

View File

@ -30,5 +30,5 @@ pub use draft_and_history::{
pub use error::RoomError;
pub use metrics::RoomMetrics;
pub use reaction::{MessageReactionsResponse, MessageSearchResponse};
pub use service::RoomService;
pub use service::{RoomService, PushNotificationFn};
pub use types::{RoomEventType, *};

View File

@ -9,6 +9,9 @@ use sea_orm::*;
use uuid::Uuid;
impl RoomService {
/// Cache TTL for member list (in seconds).
const MEMBER_LIST_CACHE_TTL: u64 = 30;
pub async fn room_member_list(
&self,
room_id: Uuid,
@ -17,6 +20,24 @@ impl RoomService {
let user_id = ctx.user_id;
self.require_room_member(room_id, user_id).await?;
// Try cache first
let cache_key = format!("room:members:{}", room_id);
if let Ok(mut conn) = self.cache.conn().await {
if let Ok(Some(cached)) = redis::cmd("GET")
.arg(&cache_key)
.query_async::<Option<String>>(&mut conn)
.await
{
if let Ok(responses) = serde_json::from_str::<Vec<super::RoomMemberResponse>>(&cached) {
slog::debug!(self.log, "room_member_list: cache hit for key={}", cache_key);
return Ok(responses);
}
}
}
slog::debug!(self.log, "room_member_list: cache miss for key={}", cache_key);
let members = room_member::Entity::find()
.filter(room_member::Column::Room.eq(room_id))
.all(&self.db)
@ -60,6 +81,23 @@ impl RoomService {
dnd_end_hour: m.dnd_end_hour,
})
.collect();
// Cache the result
if let Ok(mut conn) = self.cache.conn().await {
if let Ok(json) = serde_json::to_string(&responses) {
let _: Option<String> = redis::cmd("SETEX")
.arg(&cache_key)
.arg(Self::MEMBER_LIST_CACHE_TTL)
.arg(&json)
.query_async(&mut conn)
.await
.inspect_err(|e| {
slog::warn!(self.log, "room_member_list: failed to cache key={}: {}", cache_key, e);
})
.ok();
}
}
Ok(responses)
}
@ -121,6 +159,9 @@ impl RoomService {
drop(self.room_manager.subscribe(room_id, request.user_id).await);
// Invalidate member list cache
self.invalidate_member_list_cache(room_id).await;
self.publish_room_event(
room_model.project,
super::RoomEventType::MemberJoined,
@ -198,6 +239,9 @@ impl RoomService {
active.role = Set(new_role);
let updated = active.update(&self.db).await?;
// Invalidate member list cache
self.invalidate_member_list_cache(room_id).await;
let room = self.find_room_or_404(room_id).await?;
let _ = self
.notification_create(super::NotificationCreateRequest {
@ -264,6 +308,9 @@ impl RoomService {
.exec(&self.db)
.await?;
// Invalidate member list cache
self.invalidate_member_list_cache(room_id).await;
self.room_manager.unsubscribe(room_id, user_id).await;
let room = self.find_room_or_404(room_id).await?;
@ -367,4 +414,20 @@ impl RoomService {
};
Ok(updated_response)
}
/// Invalidate member list cache for a room.
async fn invalidate_member_list_cache(&self, room_id: Uuid) {
let cache_key = format!("room:members:{}", room_id);
if let Ok(mut conn) = self.cache.conn().await {
if let Err(e) = redis::cmd("DEL")
.arg(&cache_key)
.query_async::<i64>(&mut conn)
.await
{
slog::warn!(self.log, "invalidate_member_list_cache: DEL failed for {}: {}", cache_key, e);
} else {
slog::debug!(self.log, "invalidate_member_list_cache: deleted {}", cache_key);
}
}
}
}

View File

@ -44,7 +44,7 @@ impl RoomService {
let ai_model_ids: Vec<Uuid> = models
.iter()
.filter(|m| m.sender_type.to_string() == "ai")
.filter_map(|m| m.sender_id)
.filter_map(|m| m.model_id)
.collect();
let users: std::collections::HashMap<Uuid, String> = if !user_ids.is_empty() {
@ -78,7 +78,7 @@ impl RoomService {
.map(|msg| {
let sender_type = msg.sender_type.to_string();
let display_name = match sender_type.as_str() {
"ai" => msg.sender_id.and_then(|id| ai_names.get(&id).cloned()),
"ai" => msg.model_id.and_then(|id| ai_names.get(&id).cloned()),
_ => msg.sender_id.and_then(|id| users.get(&id).cloned()),
};
super::RoomMessageResponse {
@ -96,6 +96,7 @@ impl RoomService {
send_at: msg.send_at,
revoked: msg.revoked,
revoked_by: msg.revoked_by,
highlighted_content: None,
}
})
.collect();
@ -146,6 +147,7 @@ impl RoomService {
room_id,
sender_type: "member".to_string(),
sender_id: Some(user_id),
model_id: None,
thread_id,
in_reply_to,
content: content.clone(),
@ -275,6 +277,7 @@ impl RoomService {
send_at: now,
revoked: None,
revoked_by: None,
highlighted_content: None,
})
}

View File

@ -30,6 +30,12 @@ impl RoomService {
super::NotificationType::SystemAnnouncement => {
room_notifications::NotificationType::SystemAnnouncement
}
super::NotificationType::ProjectInvitation => {
room_notifications::NotificationType::ProjectInvitation
}
super::NotificationType::WorkspaceInvitation => {
room_notifications::NotificationType::WorkspaceInvitation
}
};
let model = room_notifications::ActiveModel {
@ -261,10 +267,20 @@ impl RoomService {
user_id: Uuid,
notification: super::NotificationResponse,
) {
let event = super::NotificationEvent::new(notification);
let event = super::NotificationEvent::new(notification.clone());
self.room_manager
.push_user_notification(user_id, Arc::new(event))
.await;
// Also trigger Web Push for offline users
if let Some(push_fn) = &self.push_fn {
push_fn(
user_id,
notification.title.clone(),
notification.content.clone(),
None, // URL — could be derived from room/project
);
}
}
fn unread_cache_key(user_id: Uuid) -> String {

View File

@ -325,6 +325,7 @@ impl RoomService {
send_at: msg.send_at,
revoked: msg.revoked,
revoked_by: msg.revoked_by,
highlighted_content: None,
}
})
.collect()

View File

@ -11,6 +11,9 @@ use sea_orm::*;
use uuid::Uuid;
impl RoomService {
/// Cache TTL for room list (in seconds).
const ROOM_LIST_CACHE_TTL: u64 = 60;
pub async fn room_list(
&self,
project_name: String,
@ -21,6 +24,29 @@ impl RoomService {
let project = self.utils_find_project_by_name(project_name).await?;
self.check_project_access(project.id, user_id).await?;
// Try cache first
let cache_key = format!(
"room:list:{}:{}:public={}",
project.id,
user_id,
only_public.unwrap_or(false)
);
if let Ok(mut conn) = self.cache.conn().await {
if let Ok(Some(cached)) = redis::cmd("GET")
.arg(&cache_key)
.query_async::<Option<String>>(&mut conn)
.await
{
if let Ok(responses) = serde_json::from_str::<Vec<super::RoomResponse>>(&cached) {
slog::debug!(self.log, "room_list: cache hit for key={}", cache_key);
return Ok(responses);
}
}
}
slog::debug!(self.log, "room_list: cache miss for key={}", cache_key);
let mut query = room::Entity::find().filter(room::Column::Project.eq(project.id));
if only_public.unwrap_or(false) {
query = query.filter(room::Column::Public.eq(true));
@ -66,6 +92,22 @@ impl RoomService {
responses.push(response);
}
// Cache the result
if let Ok(mut conn) = self.cache.conn().await {
if let Ok(json) = serde_json::to_string(&responses) {
let _: Option<String> = redis::cmd("SETEX")
.arg(&cache_key)
.arg(Self::ROOM_LIST_CACHE_TTL)
.arg(&json)
.query_async(&mut conn)
.await
.inspect_err(|e| {
slog::warn!(self.log, "room_list: failed to cache key={}: {}", cache_key, e);
})
.ok();
}
}
Ok(responses)
}
@ -156,6 +198,9 @@ impl RoomService {
txn.commit().await?;
// Invalidate room list cache for this project
self.invalidate_room_list_cache(project.id).await;
self.spawn_room_workers(room_model.id);
let event = ProjectRoomEvent {
@ -232,6 +277,9 @@ impl RoomService {
}
let updated = active.update(&self.db).await?;
// Invalidate room list cache
self.invalidate_room_list_cache(updated.project).await;
if renamed {
let event = ProjectRoomEvent {
event_type: super::RoomEventType::RoomRenamed.as_str().into(),
@ -303,6 +351,9 @@ impl RoomService {
txn.commit().await?;
// Invalidate room list cache
self.invalidate_room_list_cache(project_id).await;
self.room_manager.shutdown_room(room_id).await;
// Clean up Redis seq key so re-creating the room starts fresh
@ -342,4 +393,49 @@ impl RoomService {
Ok(())
}
/// Invalidate all room list cache entries for a project.
async fn invalidate_room_list_cache(&self, project_id: Uuid) {
let pattern = format!("room:list:{}:*", project_id);
if let Ok(mut conn) = self.cache.conn().await {
// Use SCAN to find matching keys, then DELETE them
let mut cursor: u64 = 0;
loop {
let (new_cursor, keys): (u64, Vec<String>) = match redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await
{
Ok(result) => result,
Err(e) => {
slog::warn!(self.log, "invalidate_room_list_cache: SCAN failed: {}", e);
break;
}
};
cursor = new_cursor;
if !keys.is_empty() {
// Delete keys in batches
let keys_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
if let Err(e) = redis::cmd("DEL")
.arg(&keys_refs)
.query_async::<i64>(&mut conn)
.await
{
slog::warn!(self.log, "invalidate_room_list_cache: DEL failed: {}", e);
} else {
slog::debug!(self.log, "invalidate_room_list_cache: deleted {} keys", keys.len());
}
}
if cursor == 0 {
break;
}
}
}
}
}

View File

@ -1,5 +1,6 @@
use crate::error::RoomError;
use crate::service::RoomService;
use crate::types::RoomMessageSearchRequest;
use crate::ws_context::WsUserContext;
use chrono::Utc;
use models::rooms::{room_message, room_message_reaction};
@ -11,139 +12,177 @@ impl RoomService {
pub async fn room_message_search(
&self,
room_id: Uuid,
query: &str,
limit: Option<u64>,
offset: Option<u64>,
request: RoomMessageSearchRequest,
ctx: &WsUserContext,
) -> Result<super::MessageSearchResponse, RoomError> {
let user_id = ctx.user_id;
self.require_room_member(room_id, user_id).await?;
if query.trim().is_empty() {
if request.q.trim().is_empty() {
return Ok(super::MessageSearchResponse {
messages: Vec::new(),
total: 0,
});
}
let limit = std::cmp::min(limit.unwrap_or(20), 100);
let offset = offset.unwrap_or(0);
let limit = std::cmp::min(request.limit.unwrap_or(20), 100);
let offset = request.offset.unwrap_or(0);
// PostgreSQL full-text search via raw SQL with parameterized query.
// plainto_tsquery('simple', $1) is injection-safe — it treats input as text.
let sql = r#"
// Build dynamic WHERE conditions
let mut conditions = vec![
"room = $1".to_string(),
"content_tsv @@ plainto_tsquery('simple', $2)".to_string(),
"revoked IS NULL".to_string(),
];
let mut param_index = 3;
let mut params: Vec<sea_orm::Value> = vec![room_id.into(), request.q.trim().into()];
// Add time range filter
if let Some(start_time) = request.start_time {
conditions.push(format!("send_at >= ${}", param_index));
params.push(start_time.into());
param_index += 1;
}
if let Some(end_time) = request.end_time {
conditions.push(format!("send_at <= ${}", param_index));
params.push(end_time.into());
param_index += 1;
}
// Add sender filter
if let Some(sender_id) = request.sender_id {
conditions.push(format!("sender_id = ${}", param_index));
params.push(sender_id.into());
param_index += 1;
}
// Add content type filter
if let Some(ref content_type) = request.content_type {
conditions.push(format!("content_type = ${}", param_index));
params.push(content_type.clone().into());
param_index += 1;
}
let where_clause = conditions.join(" AND ");
// PostgreSQL full-text search with highlighting via raw SQL.
// Uses ts_headline for result highlighting with <mark> tags.
let sql = format!(
r#"
SELECT id, seq, room, sender_type, sender_id, thread, in_reply_to,
content, content_type, edited_at, send_at, revoked, revoked_by
content, content_type, edited_at, send_at, revoked, revoked_by,
ts_headline('simple', content, plainto_tsquery('simple', $2),
'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=15') AS highlighted_content
FROM room_message
WHERE room = $1
AND content_tsv @@ plainto_tsquery('simple', $2)
AND revoked IS NULL
WHERE {}
ORDER BY send_at DESC
LIMIT $3 OFFSET $4"#;
let stmt = Statement::from_sql_and_values(
DbBackend::Postgres,
sql,
vec![
room_id.into(),
query.trim().into(),
limit.into(),
offset.into(),
],
LIMIT ${} OFFSET ${}"#,
where_clause,
param_index,
param_index + 1
);
let rows: Vec<room_message::Model> = self
.db
.query_all_raw(stmt)
.await?
.into_iter()
.map(|row| {
let sender_type = row
.try_get::<String>("", "sender_type")
.map(|s| match s.as_str() {
"admin" => models::rooms::MessageSenderType::Admin,
"owner" => models::rooms::MessageSenderType::Owner,
"ai" => models::rooms::MessageSenderType::Ai,
"system" => models::rooms::MessageSenderType::System,
"tool" => models::rooms::MessageSenderType::Tool,
"guest" => models::rooms::MessageSenderType::Guest,
_ => models::rooms::MessageSenderType::Member,
})
.unwrap_or(models::rooms::MessageSenderType::Member);
params.push(limit.into());
params.push(offset.into());
let content_type = row
.try_get::<String>("", "content_type")
.map(|s| match s.as_str() {
"image" => models::rooms::MessageContentType::Image,
"audio" => models::rooms::MessageContentType::Audio,
"video" => models::rooms::MessageContentType::Video,
"file" => models::rooms::MessageContentType::File,
_ => models::rooms::MessageContentType::Text,
})
.unwrap_or(models::rooms::MessageContentType::Text);
let stmt = Statement::from_sql_and_values(DbBackend::Postgres, &sql, params);
room_message::Model {
id: row.try_get::<MessageId>("", "id").unwrap_or_default(),
seq: row.try_get::<Seq>("", "seq").unwrap_or_default(),
room: row.try_get::<RoomId>("", "room").unwrap_or_default(),
sender_type,
sender_id: row
.try_get::<Option<UserId>>("", "sender_id")
.ok()
.flatten(),
thread: row
.try_get::<Option<RoomThreadId>>("", "thread")
.ok()
.flatten(),
in_reply_to: row
.try_get::<Option<MessageId>>("", "in_reply_to")
.ok()
.flatten(),
content: row.try_get::<String>("", "content").unwrap_or_default(),
content_type,
edited_at: row
.try_get::<Option<DateTimeUtc>>("", "edited_at")
.ok()
.flatten(),
send_at: row
.try_get::<DateTimeUtc>("", "send_at")
.unwrap_or_default(),
revoked: row
.try_get::<Option<DateTimeUtc>>("", "revoked")
.ok()
.flatten(),
revoked_by: row
.try_get::<Option<UserId>>("", "revoked_by")
.ok()
.flatten(),
content_tsv: None,
}
})
.collect();
let rows = self.db.query_all_raw(stmt).await?;
// Efficient COUNT query.
let count_sql = r#"
SELECT COUNT(*) AS count
FROM room_message
WHERE room = $1
AND content_tsv @@ plainto_tsquery('simple', $2)
AND revoked IS NULL"#;
// Parse results and build response with highlighted content
let mut results: Vec<super::RoomMessageResponse> = Vec::new();
let count_stmt = Statement::from_sql_and_values(
DbBackend::Postgres,
count_sql,
vec![room_id.into(), query.trim().into()],
for row in rows {
let sender_type_str = row.try_get::<String>("", "sender_type").unwrap_or_default();
let sender_type = match sender_type_str.as_str() {
"admin" => models::rooms::MessageSenderType::Admin,
"owner" => models::rooms::MessageSenderType::Owner,
"ai" => models::rooms::MessageSenderType::Ai,
"system" => models::rooms::MessageSenderType::System,
"tool" => models::rooms::MessageSenderType::Tool,
"guest" => models::rooms::MessageSenderType::Guest,
_ => models::rooms::MessageSenderType::Member,
};
let content_type_str = row.try_get::<String>("", "content_type").unwrap_or_default();
let content_type = match content_type_str.as_str() {
"image" => models::rooms::MessageContentType::Image,
"audio" => models::rooms::MessageContentType::Audio,
"video" => models::rooms::MessageContentType::Video,
"file" => models::rooms::MessageContentType::File,
_ => models::rooms::MessageContentType::Text,
};
let msg = room_message::Model {
id: row.try_get::<MessageId>("", "id").unwrap_or_default(),
seq: row.try_get::<Seq>("", "seq").unwrap_or_default(),
room: row.try_get::<RoomId>("", "room").unwrap_or_default(),
sender_type,
sender_id: row.try_get::<Option<UserId>>("", "sender_id").ok().flatten(),
thread: row.try_get::<Option<RoomThreadId>>("", "thread").ok().flatten(),
in_reply_to: row.try_get::<Option<MessageId>>("", "in_reply_to").ok().flatten(),
content: row.try_get::<String>("", "content").unwrap_or_default(),
content_type,
edited_at: row.try_get::<Option<DateTimeUtc>>("", "edited_at").ok().flatten(),
send_at: row.try_get::<DateTimeUtc>("", "send_at").unwrap_or_default(),
revoked: row.try_get::<Option<DateTimeUtc>>("", "revoked").ok().flatten(),
revoked_by: row.try_get::<Option<UserId>>("", "revoked_by").ok().flatten(),
content_tsv: None,
};
let highlighted_content = row
.try_get::<String>("", "highlighted_content")
.unwrap_or_else(|_| msg.content.clone());
// Resolve display name for this message
let message_with_name = self.resolve_display_name(msg.clone(), room_id).await;
let mut msg_with_name = message_with_name;
msg_with_name.highlighted_content = Some(highlighted_content);
results.push(msg_with_name);
}
// COUNT query for total (without pagination)
let mut count_conditions = vec![
"room = $1".to_string(),
"content_tsv @@ plainto_tsquery('simple', $2)".to_string(),
"revoked IS NULL".to_string(),
];
let mut count_params: Vec<sea_orm::Value> = vec![room_id.into(), request.q.trim().into()];
let mut count_param_idx = 3;
if let Some(start_time) = request.start_time {
count_conditions.push(format!("send_at >= ${}", count_param_idx));
count_params.push(start_time.into());
count_param_idx += 1;
}
if let Some(end_time) = request.end_time {
count_conditions.push(format!("send_at <= ${}", count_param_idx));
count_params.push(end_time.into());
count_param_idx += 1;
}
if let Some(sender_id) = request.sender_id {
count_conditions.push(format!("sender_id = ${}", count_param_idx));
count_params.push(sender_id.into());
count_param_idx += 1;
}
if let Some(ref content_type) = request.content_type {
count_conditions.push(format!("content_type = ${}", count_param_idx));
count_params.push(content_type.clone().into());
}
let count_sql = format!(
"SELECT COUNT(*) AS count FROM room_message WHERE {}",
count_conditions.join(" AND ")
);
let count_stmt = Statement::from_sql_and_values(DbBackend::Postgres, &count_sql, count_params);
let count_row = self.db.query_one_raw(count_stmt).await?;
let total: i64 = count_row
.and_then(|r| r.try_get::<i64>("", "count").ok())
.unwrap_or(0);
let response_messages = self.build_messages_with_display_names(rows).await;
Ok(super::MessageSearchResponse {
messages: response_messages,
messages: results,
total,
})
}

View File

@ -24,6 +24,11 @@ use models::agent_task::AgentType;
const DEFAULT_MAX_CONCURRENT_WORKERS: usize = 1024;
/// Callback type for sending push notifications.
/// The caller (AppService) provides this to RoomService so it can trigger
/// browser push notifications without depending on the service crate directly.
pub type PushNotificationFn = Arc<dyn Fn(Uuid, String, Option<String>, Option<String>) + Send + Sync>;
/// Legacy: <user>uuid</user> or <user>username</user>
static USER_MENTION_RE: LazyLock<regex_lite::Regex, fn() -> regex_lite::Regex> =
LazyLock::new(|| regex_lite::Regex::new(r"<user>\s*([^<]+?)\s*</user>").unwrap());
@ -54,6 +59,7 @@ pub struct RoomService {
pub chat_service: Option<Arc<ChatService>>,
pub task_service: Option<Arc<TaskService>>,
pub log: slog::Logger,
pub push_fn: Option<PushNotificationFn>,
worker_semaphore: Arc<tokio::sync::Semaphore>,
dedup_cache: DedupCache,
}
@ -69,6 +75,7 @@ impl RoomService {
task_service: Option<Arc<TaskService>>,
log: slog::Logger,
max_concurrent_workers: Option<usize>,
push_fn: Option<PushNotificationFn>,
) -> Self {
let dedup_cache: DedupCache =
Arc::new(DashMap::with_capacity_and_hasher(10000, Default::default()));
@ -85,6 +92,7 @@ impl RoomService {
max_concurrent_workers.unwrap_or(DEFAULT_MAX_CONCURRENT_WORKERS),
)),
dedup_cache,
push_fn,
}
}
@ -523,6 +531,12 @@ impl RoomService {
super::NotificationType::SystemAnnouncement => {
room_notifications::NotificationType::SystemAnnouncement
}
super::NotificationType::ProjectInvitation => {
room_notifications::NotificationType::ProjectInvitation
}
super::NotificationType::WorkspaceInvitation => {
room_notifications::NotificationType::WorkspaceInvitation
}
};
let _model = room_notifications::ActiveModel {
@ -975,7 +989,9 @@ impl RoomService {
let room_manager = room_manager.clone();
let db = db.clone();
let model_id = model_id;
let ai_display_name = ai_display_name;
// Clone before closure so closure captures clone, not the original.
let ai_display_name_for_chunk = ai_display_name.clone();
let ai_display_name_for_final = ai_display_name.clone();
let streaming_msg_id = streaming_msg_id;
let room_id_for_chunk = room_id_inner;
@ -988,6 +1004,8 @@ impl RoomService {
let streaming_msg_id = streaming_msg_id;
let room_id = room_id_for_chunk;
let chunk_count = chunk_count.clone();
// Clone display_name INSIDE the async block so the outer closure stays `Fn`.
let ai_display_name_for_chunk = ai_display_name_for_chunk.clone();
async move {
let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
@ -995,6 +1013,7 @@ impl RoomService {
content: chunk.content,
done: chunk.done,
error: None,
display_name: Some(ai_display_name_for_chunk),
};
room_manager.broadcast_stream_chunk(event).await;
@ -1026,6 +1045,7 @@ impl RoomService {
room_id: room_id_inner,
sender_type: sender_type.clone(),
sender_id: None,
model_id: Some(model_id),
thread_id: None,
content: full_content.clone(),
content_type: "text".to_string(),
@ -1062,7 +1082,7 @@ impl RoomService {
content_type: "text".to_string(),
send_at: now,
seq,
display_name: Some(ai_display_name.clone()),
display_name: Some(ai_display_name_for_final.clone()),
in_reply_to: None,
reactions: None,
message_id: None,
@ -1092,6 +1112,7 @@ impl RoomService {
content: String::new(),
done: true,
error: Some(e.to_string()),
display_name: Some(ai_display_name.clone()),
};
room_manager.broadcast_stream_chunk(event).await;
}
@ -1134,6 +1155,7 @@ impl RoomService {
project_id_for_ai,
Uuid::now_v7(),
response,
model_id_inner,
Some(model_display_name),
)
.await
@ -1172,6 +1194,7 @@ impl RoomService {
project_id: Uuid,
_reply_to: Uuid,
content: String,
model_id: Uuid,
model_display_name: Option<String>,
) -> Result<Uuid, RoomError> {
let now = Utc::now();
@ -1184,6 +1207,7 @@ impl RoomService {
room_id,
sender_type: "ai".to_string(),
sender_id: None,
model_id: Some(model_id),
thread_id: None,
content: content.clone(),
content_type: "text".to_string(),

View File

@ -126,7 +126,7 @@ pub struct RoomUpdateRequest {
pub category: Option<Uuid>,
}
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct RoomResponse {
pub id: Uuid,
pub project: Uuid,
@ -157,7 +157,7 @@ pub struct RoomMemberReadSeqRequest {
pub last_read_seq: i64,
}
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct RoomMemberResponse {
pub room: Uuid,
pub user: Uuid,
@ -192,6 +192,17 @@ pub struct RoomMessageUpdateRequest {
pub content: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, utoipa::ToSchema)]
pub struct RoomMessageSearchRequest {
pub q: String,
pub start_time: Option<DateTime<Utc>>,
pub end_time: Option<DateTime<Utc>>,
pub sender_id: Option<Uuid>,
pub content_type: Option<String>,
pub limit: Option<u64>,
pub offset: Option<u64>,
}
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
pub struct RoomMessageResponse {
pub id: Uuid,
@ -208,6 +219,16 @@ pub struct RoomMessageResponse {
pub send_at: DateTime<Utc>,
pub revoked: Option<DateTime<Utc>>,
pub revoked_by: Option<Uuid>,
/// Highlighted content with <mark> tags around matched terms (for search results)
#[serde(skip_serializing_if = "Option::is_none")]
pub highlighted_content: Option<String>,
}
/// Search result wrapper (keeps API compatibility)
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
pub struct RoomMessageSearchResult {
#[serde(flatten)]
pub message: RoomMessageResponse,
}
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]
@ -285,6 +306,8 @@ pub enum NotificationType {
RoomCreated,
RoomDeleted,
SystemAnnouncement,
ProjectInvitation,
WorkspaceInvitation,
}
#[derive(Debug, Clone, Serialize, utoipa::ToSchema)]

View File

@ -43,6 +43,10 @@ base64 = { workspace = true }
rsa = { workspace = true }
rand = { workspace = true }
hex = { workspace = true }
base64ct = { workspace = true }
p256 = { workspace = true }
jwt-simple = { version = "0.12.6", features = ["pure-rust"], default-features = false }
http = { workspace = true }
sha2 = { workspace = true }
hmac = { workspace = true }
sha1 = { workspace = true }
@ -65,6 +69,7 @@ zip = { workspace = true }
regex = { workspace = true }
flate2 = { workspace = true }
tempfile = { workspace = true }
web-push-native = { version = "0.4.0", features = ["vapid"] }
[lints]
workspace = true

View File

@ -20,6 +20,11 @@ use slog::{Drain, OwnedKVList, Record};
use utoipa::ToSchema;
use ws_token::WsTokenService;
pub mod storage;
pub use storage::AppStorage;
pub mod push;
pub use push::{WebPushService, PushPayload};
#[derive(Clone)]
pub struct AppService {
pub db: AppDatabase,
@ -31,6 +36,48 @@ pub struct AppService {
pub room: RoomService,
pub ws_token: Arc<WsTokenService>,
pub queue_producer: MessageProducer,
pub storage: Option<AppStorage>,
pub push: Option<WebPushService>,
}
impl AppService {
/// Send a Web Push notification to a specific user.
/// Reads the user's push subscription from `user_notification` table.
/// Non-blocking: failures are logged but don't affect the caller.
pub fn send_push_to_user(&self, user_id: uuid::Uuid, payload: PushPayload) {
let push = self.push.clone();
let db = self.db.clone();
let log = self.logs.clone();
tokio::spawn(async move {
if let Some(push) = push {
use models::users::user_notification;
use sea_orm::EntityTrait;
let prefs = user_notification::Entity::find_by_id(user_id)
.one(&db)
.await;
if let Ok(Some(prefs)) = prefs {
if prefs.push_enabled
&& prefs.push_subscription_endpoint.is_some()
&& prefs.push_subscription_keys_p256dh.is_some()
&& prefs.push_subscription_keys_auth.is_some()
{
let endpoint = prefs.push_subscription_endpoint.unwrap();
let p256dh = prefs.push_subscription_keys_p256dh.unwrap();
let auth = prefs.push_subscription_keys_auth.unwrap();
if let Err(e) = push.send(&endpoint, &p256dh, &auth, &payload).await {
slog::warn!(log, "WebPush send failed"; "user_id" => %user_id, "error" => %e);
}
}
} else if let Err(e) = prefs {
slog::warn!(log, "Failed to read push subscription"; "user_id" => %user_id, "error" => %e);
}
}
});
}
}
impl AppService {
@ -101,6 +148,42 @@ impl AppService {
let email = AppEmail::init(&config, logs.clone()).await?;
let avatar = AppAvatar::init(&config).await?;
let storage = match AppStorage::new(&config) {
Ok(s) => {
slog::info!(logs, "Storage initialized at {}", s.base_path.display());
Some(s)
}
Err(e) => {
slog::warn!(logs, "Storage not available: {}", e);
None
}
};
let push = match (
config.vapid_public_key(),
config.vapid_private_key(),
) {
(Some(public_key), Some(private_key)) => {
match WebPushService::new(
public_key,
private_key,
config.vapid_sender_email(),
) {
Ok(s) => {
slog::info!(logs, "WebPush initialized");
Some(s)
}
Err(e) => {
slog::warn!(logs, "WebPush not available: {}", e);
None
}
}
}
_ => {
slog::warn!(logs, "WebPush disabled — VAPID keys not configured");
None
}
};
// Build get_redis closure for MessageProducer
let get_redis: Arc<
@ -162,6 +245,47 @@ impl AppService {
}
};
// Build push notification callback for RoomService
let push_fn: Option<room::PushNotificationFn> = push.clone().map(|push_svc| {
let db_clone = db.clone();
let log_clone = logs.clone();
Arc::new(move |user_id: uuid::Uuid, title: String, body: Option<String>, url: Option<String>| {
let push = push_svc.clone();
let db = db_clone.clone();
let log = log_clone.clone();
let payload = PushPayload {
title,
body: body.unwrap_or_default(),
url,
icon: None,
};
tokio::spawn(async move {
use models::users::user_notification;
use sea_orm::EntityTrait;
let prefs = user_notification::Entity::find_by_id(user_id)
.one(&db)
.await;
if let Ok(Some(prefs)) = prefs {
if prefs.push_enabled
&& prefs.push_subscription_endpoint.is_some()
&& prefs.push_subscription_keys_p256dh.is_some()
&& prefs.push_subscription_keys_auth.is_some()
{
let endpoint = prefs.push_subscription_endpoint.unwrap();
let p256dh = prefs.push_subscription_keys_p256dh.unwrap();
let auth = prefs.push_subscription_keys_auth.unwrap();
if let Err(e) = push.send(&endpoint, &p256dh, &auth, &payload).await {
slog::warn!(log, "WebPush send failed"; "user_id" => %user_id, "error" => %e);
}
}
}
});
}) as room::PushNotificationFn
});
let room = RoomService::new(
db.clone(),
cache.clone(),
@ -172,6 +296,7 @@ impl AppService {
Some(task_service.clone()),
logs.clone(),
None,
push_fn,
);
// Build WsTokenService
@ -187,6 +312,8 @@ impl AppService {
room,
ws_token,
queue_producer: message_producer,
storage,
push,
})
}

View File

@ -324,6 +324,40 @@ impl AppService {
if let Err(_e) = self.queue_producer.publish_email(envelope).await {
// Failed to queue invitation email
}
// Send in-app notification + push notification to the invitee
self.send_push_to_user(
target_uid,
crate::push::PushPayload {
title: format!("Project invitation: {}", project.name),
body: format!("{} invited you to join \"{}\" as {:?}", inviter.username, project.name, scope),
url: Some(format!("/projects/{}/invitations", project.name)),
icon: None,
},
);
let _ = self
.room
.notification_create(room::NotificationCreateRequest {
notification_type: room::NotificationType::ProjectInvitation,
user_id: target_uid,
title: format!("{} invited you to join \"{}\"", inviter.username, project.name),
content: Some(format!("Role: {:?}", scope)),
room_id: None,
project_id: project.id,
related_message_id: None,
related_user_id: Some(inviter_uid),
related_room_id: None,
metadata: Some(serde_json::json!({
"project_id": project.id,
"project_name": project.name,
"inviter_uid": inviter_uid,
"scope": format!("{:?}", scope),
})),
expires_at: None,
})
.await;
Ok(())
}

91
libs/service/push.rs Normal file
View File

@ -0,0 +1,91 @@
use std::sync::Arc;
use anyhow::{bail, Context};
use base64ct::{Base64UrlUnpadded, Encoding};
use serde::Serialize;
use web_push_native::{
jwt_simple::algorithms::ES256KeyPair, p256::PublicKey, Auth, WebPushBuilder,
};
#[derive(Clone)]
pub struct WebPushService {
http: reqwest::Client,
vapid_key_pair: Arc<ES256KeyPair>,
sender_email: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct PushPayload {
pub title: String,
pub body: String,
pub url: Option<String>,
pub icon: Option<String>,
}
impl WebPushService {
/// Create a new WebPush service with VAPID keys.
/// - `_vapid_public_key`: Base64url-encoded P-256 public key (derived from private key)
/// - `vapid_private_key`: Base64url-encoded P-256 private key
/// - `sender_email`: Contact email for VAPID (e.g. "mailto:admin@example.com")
pub fn new(
_vapid_public_key: String,
vapid_private_key: String,
sender_email: String,
) -> anyhow::Result<Self> {
// The VAPID private key bytes are used to create the ES256KeyPair.
// The public key is derived from it, so we don't need to validate the public key separately.
let key_bytes = Base64UrlUnpadded::decode_vec(&vapid_private_key)
.context("Failed to decode VAPID private key")?;
let vapid_key_pair =
ES256KeyPair::from_bytes(&key_bytes).context("Invalid VAPID private key")?;
Ok(Self {
http: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.context("Failed to build HTTP client")?,
vapid_key_pair: Arc::new(vapid_key_pair),
sender_email,
})
}
/// Send a push notification to a browser subscription.
pub async fn send(
&self,
endpoint: &str,
p256dh: &str,
auth: &str,
payload: &PushPayload,
) -> anyhow::Result<()> {
let endpoint_uri: http::Uri = endpoint
.parse()
.with_context(|| format!("Invalid endpoint URL: {}", endpoint))?;
let ua_public_bytes = Base64UrlUnpadded::decode_vec(p256dh)
.with_context(|| format!("Failed to decode p256dh: {}", p256dh))?;
let ua_public = PublicKey::from_sec1_bytes(&ua_public_bytes)
.with_context(|| "Invalid p256dh key")?;
let auth_bytes = Base64UrlUnpadded::decode_vec(auth)
.with_context(|| format!("Failed to decode auth: {}", auth))?;
let ua_auth = Auth::clone_from_slice(&auth_bytes);
let payload_bytes = serde_json::to_vec(payload)?;
let request = WebPushBuilder::new(endpoint_uri, ua_public, ua_auth)
.with_vapid(&self.vapid_key_pair, &self.sender_email)
.build(payload_bytes)?;
let reqwest_request = reqwest::Request::try_from(request)
.context("Failed to convert web-push request")?;
let response = self.http.execute(reqwest_request).await?;
let status = response.status();
if !status.is_success() && status.as_u16() != 201 {
let body = response.text().await.unwrap_or_default();
bail!("WebPush failed: {} - {}", status, body);
}
Ok(())
}
}

60
libs/service/storage.rs Normal file
View File

@ -0,0 +1,60 @@
use config::AppConfig;
use std::path::PathBuf;
#[derive(Clone)]
pub struct AppStorage {
pub base_path: PathBuf,
pub public_url_base: String,
}
impl AppStorage {
pub fn new(config: &AppConfig) -> anyhow::Result<Self> {
let base_path = config
.env
.get("STORAGE_PATH")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("/data/files"));
let public_url_base = config
.env
.get("STORAGE_PUBLIC_URL")
.cloned()
.unwrap_or_else(|| "/files".to_string());
Ok(Self {
base_path,
public_url_base,
})
}
/// Write data to a local path and return the public URL.
pub async fn upload(
&self,
key: &str,
data: Vec<u8>,
) -> anyhow::Result<String> {
let path = self.base_path.join(key);
// Create parent directories
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(&path, &data).await?;
let url = format!(
"{}/{}",
self.public_url_base.trim_end_matches('/'),
key
);
Ok(url)
}
pub async fn delete(&self, key: &str) -> anyhow::Result<()> {
let path = self.base_path.join(key);
if path.exists() {
tokio::fs::remove_file(&path).await?;
}
Ok(())
}
}

View File

@ -19,6 +19,12 @@ pub struct NotificationPreferencesParams {
pub marketing_enabled: Option<bool>,
pub security_enabled: Option<bool>,
pub product_enabled: Option<bool>,
/// Web Push subscription endpoint (set to null to unsubscribe)
pub push_subscription_endpoint: Option<String>,
/// Web Push subscription p256dh key
pub push_subscription_keys_p256dh: Option<String>,
/// Web Push subscription auth key
pub push_subscription_keys_auth: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
@ -121,6 +127,18 @@ impl AppService {
if let Some(product_enabled) = params.product_enabled {
active_prefs.product_enabled = Set(product_enabled);
}
if let Some(endpoint) = params.push_subscription_endpoint.clone() {
if endpoint.is_empty() {
// Empty string means unsubscribe — clear all subscription fields
active_prefs.push_subscription_endpoint = Set(None);
active_prefs.push_subscription_keys_p256dh = Set(None);
active_prefs.push_subscription_keys_auth = Set(None);
} else {
active_prefs.push_subscription_endpoint = Set(Some(endpoint));
active_prefs.push_subscription_keys_p256dh = Set(params.push_subscription_keys_p256dh.clone());
active_prefs.push_subscription_keys_auth = Set(params.push_subscription_keys_auth.clone());
}
}
active_prefs.updated_at = Set(Utc::now());
active_prefs.update(&self.db).await?
@ -140,6 +158,9 @@ impl AppService {
marketing_enabled: Set(params.marketing_enabled.unwrap_or(false)),
security_enabled: Set(params.security_enabled.unwrap_or(true)),
product_enabled: Set(params.product_enabled.unwrap_or(false)),
push_subscription_endpoint: Set(None),
push_subscription_keys_p256dh: Set(None),
push_subscription_keys_auth: Set(None),
created_at: Set(Utc::now()),
updated_at: Set(Utc::now()),
};
@ -189,6 +210,9 @@ impl AppService {
marketing_enabled: Set(false),
security_enabled: Set(true),
product_enabled: Set(false),
push_subscription_endpoint: Set(None),
push_subscription_keys_p256dh: Set(None),
push_subscription_keys_auth: Set(None),
created_at: Set(Utc::now()),
updated_at: Set(Utc::now()),
};
@ -197,4 +221,26 @@ impl AppService {
Ok(NotificationPreferencesResponse::from(created_prefs))
}
pub async fn user_unsubscribe_push(
&self,
context: &Session,
) -> Result<(), AppError> {
let user_uid = context.user().ok_or(AppError::Unauthorized)?;
let prefs = user_notification::Entity::find_by_id(user_uid)
.one(&self.db)
.await?;
if let Some(prefs) = prefs {
let mut active_prefs: user_notification::ActiveModel = prefs.into();
active_prefs.push_subscription_endpoint = Set(None);
active_prefs.push_subscription_keys_p256dh = Set(None);
active_prefs.push_subscription_keys_auth = Set(None);
active_prefs.updated_at = Set(Utc::now());
active_prefs.update(&self.db).await?;
}
Ok(())
}
}

View File

@ -373,6 +373,40 @@ impl AppService {
self.email.send(envelope).await.map_err(|e| {
AppError::InternalServerError(format!("Failed to send invitation email: {}", e))
})?;
// Send in-app notification + push notification to the invitee
self.send_push_to_user(
target_user.uid,
crate::push::PushPayload {
title: format!("Workspace invitation: {}", ws.name),
body: format!("{} invited you to join the workspace \"{}\"", inviter.username, ws.name),
url: Some(format!("/workspaces/{}/invitations", ws.slug)),
icon: None,
},
);
let _ = self
.room
.notification_create(room::NotificationCreateRequest {
notification_type: room::NotificationType::WorkspaceInvitation,
user_id: target_user.uid,
title: format!("{} invited you to join \"{}\"", inviter.username, ws.name),
content: None,
room_id: None,
project_id: Default::default(), // workspace invitations don't have a project_id
related_message_id: None,
related_user_id: Some(user_uid),
related_room_id: None,
metadata: Some(serde_json::json!({
"workspace_id": ws.id,
"workspace_name": ws.name,
"workspace_slug": ws.slug,
"inviter_uid": user_uid,
})),
expires_at: None,
})
.await;
Ok(())
}

View File

@ -19,6 +19,7 @@
"@dnd-kit/utilities": "^3.2.2",
"@fontsource-variable/geist": "^5.2.8",
"@gitgraph/react": "^1.6.0",
"@lobehub/icons": "^5.4.0",
"@tailwindcss/vite": "^4.2.2",
"@tanstack/react-query": "^5.96.0",
"@tanstack/react-virtual": "^3.13.23",
@ -29,6 +30,7 @@
"@tiptap/starter-kit": "^3.22.3",
"@tiptap/suggestion": "^3.22.3",
"axios": "^1.7.0",
"browser-image-compression": "^2.0.2",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^1.1.1",

4051
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

51
public/sw.js Normal file
View File

@ -0,0 +1,51 @@
// Service Worker for Web Push Notifications
const CACHE_NAME = 'app-v1';
self.addEventListener('push', (event) => {
if (!event.data) return;
let data;
try {
data = event.data.json();
} catch {
data = { title: 'Notification', body: event.data.text() };
}
const options = {
body: data.body || '',
icon: data.icon || '/icon.png',
badge: '/badge.png',
data: { url: data.url || '/' },
vibrate: [200, 100, 200],
requireInteraction: false,
};
event.waitUntil(
self.registration.showNotification(data.title || 'App Notification', options)
);
});
self.addEventListener('notificationclick', (event) => {
event.notification.close();
const url = event.notification.data?.url || '/';
event.waitUntil(
self.clients.matchAll({ type: 'window', includeUncontrolled: true }).then((clients) => {
for (const client of clients) {
if (client.url === url && 'focus' in client) {
return client.focus();
}
}
return self.clients.openWindow(url);
})
);
});
self.addEventListener('install', () => {
self.skipWaiting();
});
self.addEventListener('activate', (event) => {
event.waitUntil(self.clients.claim());
});

814
room.md Normal file
View File

@ -0,0 +1,814 @@
# Room 模块设计文档
## 1. 概述
`Room` 模块是本系统核心的协作与消息通信功能模块,提供稳定、高效、可扩展的实时消息平台。系统采用 Rust (Actix-web) 后端 + React 19 前端的技术栈,通过 WebSocket 实现实时通信,集成了 AI 代理进行智能协作。
### 技术栈
- **后端**: Rust + Actix-web + SeaORM + Redis + NATS
- **前端**: React 19 + TypeScript + Tailwind CSS
- **实时通信**: WebSocket (自定义协议)
- **消息队列**: NATS (事件分发)
- **缓存**: Redis (序列号管理、去重)
---
## 2. 数据模型
### 2.1 核心表结构
#### `room` - 房间表
| 字段 | 类型 | 说明 |
|------|------|------|
| id | UUID (v7) | 主键,时间排序 |
| project | UUID | 所属项目 |
| room_name | VARCHAR(128) | 房间名称 |
| public | BOOLEAN | 是否公开房间 |
| category | UUID (nullable) | 所属分类 |
| created_by | UUID | 创建者 |
| created_at | TIMESTAMPTZ | 创建时间 |
| last_msg_at | TIMESTAMPTZ | 最后消息时间 |
#### `room_message` - 消息表
| 字段 | 类型 | 说明 |
|------|------|------|
| id | UUID (v7) | 主键 |
| seq | BIGINT | 全局序列号(递增) |
| room | UUID | 所属房间 |
| sender_type | ENUM | `member` / `ai` / `system` |
| sender_id | UUID (nullable) | 发送者 ID |
| thread | UUID (nullable) | 所属线程 |
| in_reply_to | UUID (nullable) | 回复的消息 ID |
| content | TEXT | 消息内容 |
| content_type | ENUM | `text` / `markdown` / `code` / `mention` |
| edited_at | TIMESTAMPTZ (nullable) | 编辑时间 |
| send_at | TIMESTAMPTZ | 发送时间 |
| revoked | TIMESTAMPTZ (nullable) | 撤回时间 |
| revoked_by | UUID (nullable) | 撤回操作者 |
#### `room_member` - 房间成员表
| 字段 | 类型 | 说明 |
|------|------|------|
| room | UUID | 房间 ID |
| user | UUID | 用户 ID |
| role | ENUM | `Owner` / `Admin` / `Member` |
| first_msg_in | TIMESTAMPTZ (nullable) | 首次发消息时间 |
| joined_at | TIMESTAMPTZ (nullable) | 加入时间 |
| last_read_seq | BIGINT (nullable) | 已读序列号 |
| do_not_disturb | BOOLEAN | 免打扰 |
| dnd_start_hour | INT (nullable) | DND 开始小时 |
| dnd_end_hour | INT (nullable) | DND 结束小时 |
#### `room_category` - 频道分类表
| 字段 | 类型 | 说明 |
|------|------|------|
| id | UUID | 主键 |
| project | UUID | 所属项目 |
| name | VARCHAR(64) | 分类名称 |
| position | INT | 排序位置 |
#### `room_thread` - 线程表
| 字段 | 类型 | 说明 |
|------|------|------|
| id | UUID | 主键 |
| room | UUID | 所属房间 |
| parent_message | UUID | 父消息 ID |
| created_by | UUID | 创建者 |
| created_at | TIMESTAMPTZ | 创建时间 |
#### `room_ai` - AI 配置表
| 字段 | 类型 | 说明 |
|------|------|------|
| room | UUID | 房间 ID |
| model | UUID | AI 模型 ID |
| version | VARCHAR (nullable) | 模型版本 |
| call_count | INT | 调用次数 |
| last_call_at | TIMESTAMPTZ (nullable) | 最后调用时间 |
| history_limit | INT | 上下文历史限制 |
| system_prompt | TEXT | 系统提示词 |
| temperature | FLOAT | 温度参数 |
| max_tokens | INT | 最大 token 数 |
| use_exact | BOOLEAN | 精确模式 |
| think | BOOLEAN | 思考模式 |
| stream | BOOLEAN | 流式输出 |
| min_score | FLOAT (nullable) | 最小分数阈值 |
#### `room_message_reaction` - 消息反应表
| 字段 | 类型 | 说明 |
|------|------|------|
| id | UUID | 主键 |
| message | UUID | 消息 ID |
| user | UUID | 用户 ID |
| emoji | VARCHAR(32) | emoji |
| created_at | TIMESTAMPTZ | 创建时间 |
#### `room_pin` - 置顶消息表
| 字段 | 类型 | 说明 |
|------|------|------|
| id | UUID | 主键 |
| room | UUID | 房间 ID |
| message | UUID | 消息 ID |
| pinned_by | UUID | 置顶者 |
| pinned_at | TIMESTAMPTZ | 置顶时间 |
### 2.2 消息内容类型
```typescript
type MessageContentType = 'text' | 'markdown' | 'code' | 'mention';
type MessageSenderType = 'member' | 'ai' | 'system';
```
---
## 3. WebSocket 通信协议
### 3.1 协议格式
#### 请求
```typescript
interface WsRequest {
type: 'request';
request_id: string; // 唯一请求 ID
action: WsAction; // 操作类型
params?: WsRequestParams;
}
```
#### 响应
```typescript
interface WsResponse {
type: 'response';
request_id: string;
action: string;
data?: WsResponseData;
error?: WsError;
}
```
### 3.2 Action 列表
#### 房间管理
| Action | 说明 | 权限 |
|--------|------|------|
| `room.list` | 获取房间列表 | Member |
| `room.get` | 获取房间详情 | Member |
| `room.create` | 创建房间 | Admin |
| `room.update` | 更新房间 | Admin |
| `room.delete` | 删除房间 | Admin |
| `room.subscribe` | 订阅房间事件 | Member |
| `room.unsubscribe` | 取消订阅 | Member |
#### 消息管理
| Action | 说明 | 权限 |
|--------|------|------|
| `message.list` | 获取消息列表(分页) | Member |
| `message.create` | 发送消息 | Member |
| `message.update` | 编辑消息 | Owner |
| `message.revoke` | 撤回消息 | Owner |
| `message.get` | 获取单条消息 | Member |
| `message.search` | 搜索消息 | Member |
#### 成员管理
| Action | 说明 | 权限 |
|--------|------|------|
| `member.list` | 获取成员列表 | Member |
| `member.add` | 添加成员 | Admin |
| `member.remove` | 移除成员 | Admin |
| `member.leave` | 离开房间 | Member |
| `member.update_role` | 更新角色 | Admin |
| `member.set_read_seq` | 设置已读位置 | Member |
#### 分类管理
| Action | 说明 | 权限 |
|--------|------|------|
| `category.list` | 获取分类列表 | Member |
| `category.create` | 创建分类 | Admin |
| `category.update` | 更新分类 | Admin |
| `category.delete` | 删除分类 | Admin |
#### 线程管理
| Action | 说明 | 权限 |
|--------|------|------|
| `thread.list` | 获取线程列表 | Member |
| `thread.create` | 创建线程 | Member |
| `thread.messages` | 获取线程消息 | Member |
#### 反应管理
| Action | 说明 | 权限 |
|--------|------|------|
| `reaction.add` | 添加反应 | Member |
| `reaction.remove` | 移除反应 | Owner |
| `reaction.list_batch` | 批量获取反应 | Member |
#### 置顶管理
| Action | 说明 | 权限 |
|--------|------|------|
| `pin.list` | 获取置顶列表 | Member |
| `pin.add` | 添加置顶 | Admin |
| `pin.remove` | 移除置顶 | Admin |
#### AI 管理
| Action | 说明 | 权限 |
|--------|------|------|
| `ai.list` | 获取 AI 配置列表 | Member |
| `ai.upsert` | 创建/更新 AI 配置 | Admin |
| `ai.delete` | 删除 AI 配置 | Admin |
#### 通知管理
| Action | 说明 | 权限 |
|--------|------|------|
| `notification.list` | 获取通知列表 | Member |
| `notification.mark_read` | 标记已读 | Member |
| `notification.mark_all_read` | 全部标记已读 | Member |
| `notification.archive` | 归档通知 | Member |
#### 提及管理
| Action | 说明 | 权限 |
|--------|------|------|
| `mention.list` | 获取提及列表 | Member |
| `mention.read_all` | 全部标记已读 | Member |
### 3.3 实时事件推送
```typescript
type WsEventType =
| 'room.created'
| 'room.updated'
| 'room.deleted'
| 'message.created'
| 'message.updated'
| 'message.revoked'
| 'member.joined'
| 'member.left'
| 'member.role_changed'
| 'thread.created'
| 'reaction.updated'
| 'pin.updated';
```
---
## 4. 后端架构
### 4.1 目录结构
```
libs/room/src/
├── lib.rs # 模块入口
├── service.rs # RoomService 主服务
├── room.rs # 房间 CRUD
├── message.rs # 消息管理
├── member.rs # 成员管理
├── category.rs # 分类管理
├── thread.rs # 线程管理
├── reaction.rs # 反应管理
├── pin.rs # 置顶管理
├── ai.rs # AI 配置管理
├── notification.rs # 通知管理
├── search.rs # 搜索功能
├── connection.rs # WebSocket 连接管理
├── room_ai_queue.rs # AI 队列锁
├── error.rs # 错误类型
├── types.rs # 请求/响应类型
├── helpers.rs # 辅助函数
├── metrics.rs # 指标收集
├── ws_context.rs # WebSocket 上下文
└── draft_and_history.rs # 草稿和编辑历史
```
### 4.2 核心服务 RoomService
```rust
pub struct RoomService {
db: DatabaseConnection,
cache: AppCache,
queue: NatsContext,
ai_client: AiClient,
rate_limiter: RateLimiter,
log: Logger,
}
```
### 4.3 并发控制
- **物理并发限制**: `tokio::sync::Semaphore`
- **消息序列号**: Redis INCR 原子递增
- **消息去重**: `DashMap<UUID, HashSet<i64>>` 内存缓存
- **AI 队列锁**: Redis 分布式锁 (`ai:room:queue:lock:{room_id}`)
### 4.4 AI 队列机制
```rust
// Redis 键结构
ai:room:queue:{room_id} // 队列
ai:room:queue:seq:{room_id} // 序号
ai:room:queue:lock:{room_id} // 分布式锁
ai:room:queue:ticket:{room_id}:{ticket_id} // 票据
// 锁参数
LOCK_TTL_MS: 120_000 // 锁超时 2 分钟
TICKET_TTL_MS: 90_000 // 票据超时 1.5 分钟
MAX_BACKOFF_MS: 200 // 最大退避 200ms
```
### 4.5 定时任务
- **空闲房间清理**: 30 天无消息的房间标记为归档
- **频率限制刷新**: 每分钟重置计数
- **陈旧指标清理**: 定期清理过期数据
---
## 5. 前端架构
### 5.1 目录结构
```
src/
├── components/room/
│ ├── DiscordServerSidebar.tsx # 服务器图标侧边栏 (72px)
│ ├── DiscordChannelSidebar.tsx # 可折叠频道列表
│ ├── DiscordChatPanel.tsx # 主聊天面板
│ ├── DiscordMemberList.tsx # 成员列表 (带在线状态)
│ ├── message/
│ │ ├── MessageList.tsx # 消息列表
│ │ ├── MessageInput.tsx # 消息输入框
│ │ ├── MessageBubble.tsx # 消息气泡
│ │ └── MessageActions.tsx # 消息操作菜单
│ ├── RoomThreadPanel.tsx # 线程侧边栏
│ ├── RoomMentionPanel.tsx # 提及面板
│ ├── RoomPinBar.tsx # 置顶栏
│ ├── RoomMessageSearch.tsx # 消息搜索
│ ├── RoomSettingsPanel.tsx # 设置面板
│ ├── RoomAiAuthBanner.tsx # AI 认证提示
│ ├── RoomAiTasksPanel.tsx # AI 任务面板
│ └── ...
├── contexts/
│ └── room-context.tsx # 房间状态管理
└── lib/
└── ws-protocol.ts # WebSocket 协议定义
```
### 5.2 React Context 状态管理
```typescript
interface RoomContextValue {
// 房间状态
rooms: RoomResponse[];
currentRoom: RoomResponse | null;
roomsLoading: boolean;
// 消息状态
messages: MessageWithMeta[];
threads: RoomThreadResponse[];
// 成员状态
members: RoomMember[];
membersLoading: boolean;
// AI 配置
roomAiConfigs: RoomAiResponse[];
// WebSocket 状态
wsStatus: 'open' | 'connecting' | 'disconnected';
wsError: Error | null;
wsClient: WebSocketClient | null;
// 操作方法
sendMessage: (content: string, inReplyTo?: string) => Promise<void>;
editMessage: (messageId: string, content: string) => Promise<void>;
revokeMessage: (messageId: string) => Promise<void>;
updateRoom: (roomId: string, data: Partial<RoomUpdateRequest>) => Promise<void>;
refreshThreads: () => Promise<void>;
}
```
### 5.3 Discord 风格 UI 布局
```
┌─────────────────────────────────────────────────────────────────┐
│ [Icon] │ # channel-name [🔍][👤] │
├──────────┼─────────────────────────────────────────┬────────────┤
│ │ ┌─────────────────────────────────┐ │ │
│ Category │ │ Message List │ │ Members │
│ ────────│ │ (with virtual scrolling) │ │ Online(3) │
│ # gen │ │ │ │ ● user1 │
│ # dev │ │ user1 [10:30] Hello! │ │ ● user2 │
│ # ai │ │ ↳ user2 [10:32] Hi! │ │ │
│ │ │ │ │ Offline(5)│
│ Category │ │ 🤖 AI [10:33] Thinking... │ │ ○ user3 │
│ ────────│ │ │ │ │
│ 🔒 priv │ └─────────────────────────────────┘ │ │
│ ├─────────────────────────────────────────┤ │
│ │ [+] Type a message... [@][📎] │ │
└──────────┴─────────────────────────────────────────┴────────────┘
72px Flex: 1 240px
```
### 5.4 消息数据流
```
用户输入 → MessageInput
WebSocket.send({ action: 'message.create', params: { content, room_id } })
后端处理 → Redis INCR seq → DB insert → NATS publish
WebSocket.push({ event: 'message.created', data: message })
RoomContext.handleMessage() → setMessages(prev => [...prev, message])
MessageList 渲染
```
---
## 6. API 端点 (REST)
### 6.1 房间管理
```
GET /api/projects/{project}/rooms # 房间列表
GET /api/projects/{project}/rooms/{room} # 房间详情
POST /api/projects/{project}/rooms # 创建房间
PATCH /api/projects/{project}/rooms/{room} # 更新房间
DELETE /api/projects/{project}/rooms/{room} # 删除房间
```
### 6.2 消息管理
```
GET /api/rooms/{room}/messages # 消息列表 (分页)
GET /api/rooms/{room}/messages/{message} # 单条消息
POST /api/rooms/{room}/messages # 发送消息
PATCH /api/rooms/{room}/messages/{message} # 编辑消息
DELETE /api/rooms/{room}/messages/{message} # 撤回消息
GET /api/rooms/{room}/messages/search # 搜索消息
GET /api/rooms/{room}/messages/{message}/history # 编辑历史
```
### 6.3 AI 端点
```
GET /api/rooms/{room}/ai # AI 配置列表
POST /api/rooms/{room}/ai # 创建 AI 配置
PATCH /api/rooms/{room}/ai/{model} # 更新 AI 配置
DELETE /api/rooms/{room}/ai/{model} # 删除 AI 配置
```
---
## 7. 关键设计原则
| 原则 | 实现方式 |
|------|----------|
| 高内聚低耦合 | 模块化服务层、清晰的领域边界 |
| 异步非阻塞 | Tokio async/await、Redis 异步客户端 |
| 事件驱动 | NATS Pub/Sub 解耦组件 |
| 强类型安全 | TypeScript + Rust 编译期检查 |
| 分层架构 | Service → Repository → Database |
| 鲁棒性优先 | 完善的错误处理、限流、资源回收 |
---
## 8. 已实现功能清单
### 8.1 核心功能 ✅
- [x] 房间 CRUD (创建/读取/更新/删除)
- [x] 消息 CRUD (发送/编辑/撤回/历史)
- [x] 成员管理 (邀请/移除/角色变更)
- [x] 频道分类 (创建/排序/折叠)
- [x] 线程回复 (创建线程/线程消息)
- [x] 消息反应 (emoji 反应)
- [x] 消息置顶
- [x] 消息搜索
- [x] 未读计数
### 8.2 实时功能 ✅
- [x] WebSocket 长连接
- [x] 消息实时推送
- [x] 成员状态同步
- [x] 在线状态显示
- [x] 消息去重
- [x] IndexedDB 离线缓存
- [x] 虚拟滚动列表 (@tanstack/react-virtual)
### 8.3 AI 集成 ✅
- [x] AI 模型配置
- [x] AI 消息发送
- [x] 系统提示词
- [x] 上下文历史限制
- [x] 流式输出支持
- [x] AI 队列锁
### 8.4 富媒体消息 ✅
- [x] 消息内容类型: text, image, audio, video, file
- [x] Tiptap 富文本编辑器 (IMEditor)
- [x] FileNode Tiptap 扩展 (文件/图片节点)
- [x] 文件上传状态管理 (uploading/done/error)
- [x] Markdown 支持
### 8.5 全文搜索 ✅
- [x] PostgreSQL 全文索引 (GIN + tsvector)
- [x] 搜索 API (room_message_search)
- [x] 分页与结果计数
### 8.6 通知系统 ✅
- [x] 提及通知
- [x] 线程通知
- [x] DND 免打扰时段 (do_not_disturb, dnd_start/end_hour)
### 8.4 用户体验 ✅
- [x] 消息草稿自动保存
- [x] @提及功能
- [x] 回复引用
- [x] 消息时间格式化
- [x] Discord 风格 UI
- [x] 侧边栏折叠
- [x] 成员列表按角色着色
---
## 9. 已完成功能 (详细技术方案)
### 9.1 富媒体消息支持 ✅
#### 9.1.1 图片消息
```
技术实现:
- 前端: FileNode.tsx Tiptap 扩展,支持 inline 文件节点
- 富媒体消息类型: MessageContentType::Image, Audio, Video, File
- 文件上传: Tiptap IMEditor 支持拖拽上传
- 状态管理: uploading/done/error 三种状态
代码位置:
- src/components/room/message/editor/FileNode.tsx (Tiptap 文件节点)
- src/components/room/message/editor/IMEditor.tsx (富文本编辑器)
```
#### 9.1.2 消息内容类型 ✅
```
已实现的消息类型:
- Text (text)
- Image (image)
- Audio (audio)
- Video (video)
- File (file)
代码位置:
- libs/models/rooms/mod.rs (MessageContentType enum)
```
### 9.2 历史消息优化 ✅
#### 9.2.1 虚拟滚动列表
```
技术实现:
- @tanstack/react-virtual + useVirtualizer
- 按需渲染可见区域消息 (overscan: 30)
- 动态高度估算 (estimateMessageRowHeight)
- 日期分隔符自动插入
- 滚动位置保持 (加载更多时)
代码位置:
- src/components/room/message/MessageList.tsx
功能特性:
- [x] IntersectionObserver 自动加载更多
- [x] 滚动位置恢复
- [x] 滚动到底部按钮
- [x] 日期分组分隔符
```
#### 9.2.2 IndexedDB 离线缓存 ✅
```
技术实现:
- IndexedDB 本地持久化存储
- 双索引: by_room, by_room_seq
- 支持离线消息恢复
- 自动保存/加载消息
API:
- saveMessage(msg) # 保存单条
- saveMessages(roomId, msgs) # 批量保存
- loadMessages(roomId) # 加载房间消息
- loadOlderMessagesFromIdb() # 加载历史消息
- getMaxSeq(roomId) # 获取最大序列号 (去重)
代码位置:
- src/lib/storage/indexed-db.ts
- src/contexts/room-context.tsx (集成缓存)
```
### 9.3 全文搜索 ✅
#### 9.3.1 PostgreSQL 全文索引
```
技术实现:
- content_tsv TSVECTOR 列
- GIN 索引 (idx_room_message_content_tsv)
- plainto_tsquery('simple', query) 全文搜索
代码位置:
- libs/room/src/search.rs (room_message_search)
- libs/migrate/sql/m20250628_000080_add_message_reactions_and_search.sql
搜索功能:
- [x] 全文搜索 API
- [x] 分页支持 (limit, offset)
- [x] 结果计数 (total)
- [x] 显示名称解析
```
### 9.4 通知系统完善 ✅
#### 9.4.1 多维度通知配置 ✅
```
技术实现:
- room_member 表新增字段:
- do_not_disturb: BOOLEAN (免打扰开关)
- dnd_start_hour: INT (DND 开始时间, 0-23)
- dnd_end_hour: INT (DND 结束时间, 0-23)
数据库迁移:
- libs/migrate/m20250628_000078_add_room_member_do_not_disturb.rs
代码位置:
- libs/models/rooms/room_member.rs
```
#### 9.4.2 通知系统
```
已实现功能:
- [x] 提及通知 (Mention)
- [x] 线程通知 (Thread)
- [x] DND 免打扰时段
- [x] 通知列表 API
- [x] 标记已读 API
代码位置:
- libs/room/src/notification.rs
- libs/service/user/notification.rs
```
---
## 10. 待实现功能
### 10.1 富媒体消息完善
```
待实现:
1. [ ] 对象存储集成 (S3/MinIO)
2. [ ] 文件下载 API
3. [ ] 图片预览 Modal
4. [ ] 视频播放器集成
5. [ ] Office 文档预览
6. [ ] 文件大小/类型验证
7. [ ] 图片压缩 (WebWorker)
```
### 10.2 全文搜索增强
```
待实现:
1. [ ] 时间范围筛选
2. [ ] 用户筛选 (@username)
3. [ ] 文件类型筛选 (content_type)
4. [ ] 搜索历史记录
5. [ ] 结果高亮
6. [ ] 正则搜索支持
7. [ ] 数据库触发器自动更新 tsvector
```
### 10.3 推送通知
```
待实现:
1. [ ] Web Push 集成 (service worker)
2. [ ] 移动端推送
3. [ ] 通知中心 UI
4. [ ] 未读计数 Badge
5. [ ] 关键词提醒
```
### 10.4 性能优化
```
待实现:
1. [ ] room_message 表分区 (按时间)
2. [ ] 读写分离
3. [ ] 房间列表缓存 (Redis)
4. [ ] 成员列表缓存
5. [ ] Redis Pipeline 批量操作
6. [ ] 组件代码分割 (React.lazy)
7. [ ] 图片懒加载
```
### 10.5 AI 增强功能
```
待实现:
1. [ ] AI 连续对话上下文管理
2. [ ] AI 会话历史管理
3. [ ] AI 切换对话线程
4. [ ] AI 输出 Markdown 渲染优化
5. [ ] AI 工具调用扩展 (消息引用/代码执行/搜索)
6. [ ] 定时 AI 任务
7. [ ] 会议纪要生成
```
### 10.6 国际化 (i18n)
```
待实现:
1. [ ] 前端 i18n (react-i18next)
2. [ ] 后端 i18n (rust-i18n)
3. [ ] 提取 UI 字符串
4. [ ] 语言切换器
5. [ ] 日期/时间本地化
6. [ ] RTL 语言支持
```
---
## 11. 测试计划
### 11.1 单元测试
- [ ] RoomService 业务逻辑测试
- [ ] 消息序列号生成测试
- [ ] 权限检查测试
- [ ] React Hooks 测试
### 11.2 集成测试
- [ ] WebSocket 连接测试
- [ ] 数据库事务测试
- [ ] Redis 缓存测试
- [ ] NATS 消息分发测试
### 11.3 E2E 测试
- [ ] 房间创建流程
- [ ] 消息发送与接收
- [ ] 消息编辑与撤回
- [ ] AI 对话流程
---
## 12. 部署与运维
### 11.1 环境变量
```
# 数据库
DATABASE_URL=postgresql://user:pass@host:5432/db
# Redis
REDIS_URL=redis://host:6379
# NATS
NATS_URL=nats://host:4222
# 对象存储
S3_ENDPOINT=https://s3.example.com
S3_BUCKET=room-media
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
# AI
OPENAI_API_KEY=sk-xxx
OPENROUTER_API_KEY=xxx
```
### 11.2 Kubernetes 配置
```
# Room Service Deployment
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "1Gi"
cpu: "500m"
# HPA 自动扩缩容
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
```
### 11.3 监控指标
- WebSocket 连接数
- 消息吞吐量 (msg/s)
- AI 调用延迟
- Redis 缓存命中率
- 数据库查询延迟
---
## 13. 安全考虑
### 12.1 权限模型
```
项目权限 → 房间权限 → 成员角色
Admin/Owner → Admin/Owner/Member
```
### 12.2 输入验证
- 消息内容长度限制 (MAX: 10000 chars)
- 文件大小限制 (MAX: 100MB)
- 文件类型白名单
### 12.3 CSRF/XSS 防护
- WebSocket 请求携带 JWT
- 消息内容转义
- 文件名 sanitize

View File

@ -1,5 +1,5 @@
import { useEffect, useRef, useState } from 'react';
import { Bell, Loader2, Mail, Moon, Package, Shield } from 'lucide-react';
import { Bell, Loader2, Mail, Moon, Package, Shield, BellRing } from 'lucide-react';
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
import { toast } from 'sonner';
import { Button } from '@/components/ui/button';
@ -9,11 +9,20 @@ import { Label } from '@/components/ui/label';
import { Separator } from '@/components/ui/separator';
import { Switch } from '@/components/ui/switch';
import { getNotificationPreferences, updateNotificationPreferences } from '@/client';
import {getApiErrorMessage} from '@/lib/api-error';
import { getApiErrorMessage } from '@/lib/api-error';
import { usePushNotification } from '@/hooks/usePushNotification';
export function SettingsPreferences() {
const queryClient = useQueryClient();
const isInitialized = useRef(false);
const { permission: pushPermission, isSubscribed: isPushSubscribed, isLoading: isPushLoading, error: pushError, subscribe: subscribePush, unsubscribe: unsubscribePush } = usePushNotification();
// Sync push_enabled state with subscription
const [pushEnabled, setPushEnabled] = useState(false);
useEffect(() => {
setPushEnabled(isPushSubscribed);
}, [isPushSubscribed]);
const [emailEnabled, setEmailEnabled] = useState(true);
const [inAppEnabled, setInAppEnabled] = useState(true);
@ -80,6 +89,7 @@ export function SettingsPreferences() {
marketing_enabled: marketingEnabled,
security_enabled: securityEnabled,
product_enabled: productEnabled,
push_enabled: pushEnabled,
},
});
},
@ -146,6 +156,46 @@ export function SettingsPreferences() {
</div>
<Switch id="in-app-enabled" checked={inAppEnabled} onCheckedChange={setInAppEnabled} />
</div>
<Separator />
{/* Browser Push Notifications */}
<div className="flex items-center justify-between">
<div className="space-y-0.5">
<div className="flex items-center gap-2">
<BellRing className="h-4 w-4 text-muted-foreground" />
<Label htmlFor="push-enabled" className="cursor-pointer">
Browser Push Notifications
</Label>
</div>
<p className="text-sm text-muted-foreground">
{pushPermission === 'unsupported'
? 'Your browser does not support push notifications'
: pushPermission === 'denied'
? 'Blocked by browser. Enable in site settings.'
: isPushSubscribed
? 'Subscribed — you will receive browser notifications'
: 'Receive notifications even when the tab is closed'}
</p>
{pushError && <p className="text-sm text-destructive">{pushError}</p>}
</div>
<Button
id="push-enabled"
size="sm"
variant={isPushSubscribed ? 'destructive' : 'default'}
disabled={isPushLoading || pushPermission === 'unsupported' || pushPermission === 'denied'}
onClick={isPushSubscribed ? unsubscribePush : subscribePush}
>
{isPushLoading ? (
<Loader2 className="h-4 w-4 animate-spin mr-1" />
) : null}
{isPushLoading
? 'Loading...'
: isPushSubscribed
? 'Disable'
: 'Enable'}
</Button>
</div>
</CardContent>
</Card>

View File

@ -1,26 +1,39 @@
import type { ReactNode } from 'react';
/**
* AI model icon using @lobehub/icons.
* Pass the AI model display name (e.g. "anthropic/claude-3-5-sonnet-20241022")
* as the `model` prop lobehub's keyword regex matching handles the rest.
*/
import { memo } from 'react';
import LobehubModelIcon from '@lobehub/icons/es/features/ModelIcon';
/** Map model IDs to colored circles (fallback for AI sender avatars) */
export function ModelIcon({ modelId, className }: { modelId?: string; className?: string }): ReactNode {
const colors: Record<string, string> = {
claude: 'bg-orange-500',
gpt: 'bg-green-600',
gemini: 'bg-blue-600',
deepseek: 'bg-purple-600',
o1: 'bg-pink-600',
o3: 'bg-pink-700',
o4: 'bg-pink-800',
ai: 'bg-primary',
};
const color = modelId ? (colors[modelId.toLowerCase()] ?? 'bg-primary') : 'bg-primary';
return (
<span
className={`inline-block h-5 w-5 rounded-full ${color} flex items-center justify-center ${className ?? ''}`}
title={modelId}
>
<span className="text-[10px] font-bold text-white">A</span>
</span>
);
/** Derive a readable label from the display name for the tooltip. */
export function modelDisplayLabel(displayName?: string): string {
if (!displayName) return 'AI';
// e.g. "anthropic/claude-3-5-sonnet-20241022" → "Claude 3 5 Sonnet 20241022"
const last = displayName.includes('/')
? displayName.split('/').pop()!
: displayName;
return last
.replace(/[-_]/g, ' ')
.replace(/\b\w/g, (c) => c.toUpperCase());
}
interface ModelIconProps {
/** AI model display name from the backend (e.g. "anthropic/claude-3-5-sonnet"). */
model?: string;
className?: string;
}
/** Colored AI model logo icon from @lobehub/icons.
* Supports 250+ AI models via keyword regex matching.
* Falls back to a default circle when no match is found. */
export const ModelIcon = memo(function ModelIcon({ model, className }: ModelIconProps) {
return (
<LobehubModelIcon
model={model}
type="avatar"
size={20}
className={className}
/>
);
});

View File

@ -17,7 +17,7 @@ import { ModelIcon } from '../icon-match';
import { FunctionCallBadge } from '../FunctionCallBadge';
import { MessageContent } from './MessageContent';
import { ThreadIndicator } from '../RoomThreadPanel';
import { getSenderDisplayName, getSenderModelId, getAvatarFromUiMessage, getSenderUserUid, isUserSender } from '../sender';
import { getSenderDisplayName, getAvatarFromUiMessage, getSenderUserUid, isUserSender } from '../sender';
import { MessageReactions } from './MessageReactions';
import { ReactionPicker } from './ReactionPicker';
@ -78,7 +78,6 @@ export const MessageBubble = memo(function MessageBubble({
const isAi = ['ai', 'system', 'tool'].includes(message.sender_type);
const isSystem = message.sender_type === 'system';
const displayName = getSenderDisplayName(message);
const senderModelId = getSenderModelId(message);
const avatarUrl = getAvatarFromUiMessage(message);
const initial = (displayName?.charAt(0) ?? '?').toUpperCase();
const isStreaming = !!message.is_streaming;
@ -209,7 +208,7 @@ export const MessageBubble = memo(function MessageBubble({
className="text-sm font-semibold"
style={{ background: `${senderColor}22`, color: senderColor }}
>
{isAi ? <ModelIcon modelId={senderModelId} /> : initial}
{isAi ? <ModelIcon model={displayName} /> : initial}
</AvatarFallback>
</Avatar>
</button>
@ -318,12 +317,10 @@ export const MessageBubble = memo(function MessageBubble({
</div>
))
) : (
<div className="whitespace-pre-wrap break-words">
<MessageContent
<MessageContent
content={displayContent}
onMentionClick={handleMentionClick}
/>
</div>
)}
{/* Streaming cursor */}

View File

@ -1,9 +1,14 @@
'use client';
/**
* Renders message content parses @[type:id:label] mentions into styled spans.
* Renders message content markdown with @[type:id:label] mentions.
* Mentions are protected from markdown parsing by replacing them with
* placeholder tokens before rendering, then restored in custom text components.
*/
import { memo, useMemo } from 'react';
import Markdown from 'react-markdown';
import remarkGfm from 'remark-gfm';
import { cn } from '@/lib/utils';
interface MessageContentProps {
@ -11,35 +16,23 @@ interface MessageContentProps {
onMentionClick?: (type: string, id: string, label: string) => void;
}
/** Parses @[type:id:label] patterns from message content */
function parseContent(content: string): Array<{ type: 'text' | 'mention'; text?: string; mention?: { type: string; id: string; label: string } }> {
const parts: Array<{ type: 'text' | 'mention'; text?: string; mention?: { type: string; id: string; label: string } }> = [];
const RE = /@\[([a-z]+):([^:\]]+):([^\]]+)\]/g;
let lastIndex = 0;
let match: RegExpExecArray | null;
const MENTION_RE = /@\[([a-z]+):([^:\]]+):([^\]]+)\]/g;
while ((match = RE.exec(content)) !== null) {
// Text before this match
if (match.index > lastIndex) {
parts.push({ type: 'text', text: content.slice(lastIndex, match.index) });
}
parts.push({
type: 'mention',
mention: {
type: match[1],
id: match[2],
label: match[3],
},
});
lastIndex = RE.lastIndex;
}
interface MentionInfo {
type: string;
id: string;
label: string;
}
// Remaining text
if (lastIndex < content.length) {
parts.push({ type: 'text', text: content.slice(lastIndex) });
}
return parts;
/** Replace @[type:id:label] with ◊MENTION_i◊ placeholders (◊ is unlikely in real content) */
function extractMentions(content: string): { safeContent: string; mentions: MentionInfo[] } {
const mentions: MentionInfo[] = [];
const safeContent = content.replace(MENTION_RE, (_match, type, id, label) => {
const idx = mentions.length;
mentions.push({ type, id, label });
return `\u200BMENTION_${idx}\u200B`; // zero-width spaces prevent markdown parsing
});
return { safeContent, mentions };
}
function getMentionStyle(type: string): string {
@ -52,42 +45,149 @@ function getMentionStyle(type: string): string {
}
}
export function MessageContent({ content, onMentionClick }: MessageContentProps) {
const parts = parseContent(content);
/** Restore mention placeholders inside a text node into React elements */
function restoreMentions(text: string, mentions: MentionInfo[], onMentionClick?: (type: string, id: string, label: string) => void): React.ReactNode[] {
const MENTION_PLACEHOLDER_RE = /\u200BMENTION_(\d+)\u200B/g;
const parts: React.ReactNode[] = [];
let lastIndex = 0;
let match: RegExpExecArray | null;
while ((match = MENTION_PLACEHOLDER_RE.exec(text)) !== null) {
if (match.index > lastIndex) {
parts.push(text.slice(lastIndex, match.index));
}
const idx = parseInt(match[1], 10);
const m = mentions[idx];
if (m) {
parts.push(
<span
key={`mention-${idx}`}
role={onMentionClick ? 'button' : undefined}
tabIndex={onMentionClick ? 0 : undefined}
className={cn(
'inline-flex items-center gap-0.5 rounded px-1 py-0.5 font-medium text-xs mx-0.5',
getMentionStyle(m.type),
)}
onClick={() => onMentionClick?.(m.type, m.id, m.label)}
onKeyDown={(e) => {
if ((e.key === 'Enter' || e.key === ' ') && onMentionClick) {
e.preventDefault();
onMentionClick(m.type, m.id, m.label);
}
}}
>
@{m.label}
</span>,
);
}
lastIndex = MENTION_PLACEHOLDER_RE.lastIndex;
}
if (lastIndex < text.length) {
parts.push(text.slice(lastIndex));
}
return parts;
}
export const MessageContent = memo(function MessageContent({ content, onMentionClick }: MessageContentProps) {
const { safeContent, mentions } = useMemo(() => extractMentions(content), [content]);
return (
<div
className={cn(
'text-sm text-foreground',
'max-w-full min-w-0 break-words whitespace-pre-wrap',
'text-[15px] text-foreground',
'max-w-full min-w-0 break-words',
'[&_code]:rounded [&_code]:bg-muted [&_code]:px-1 [&_code]:py-0.5 [&_code]:font-mono [&_code]:text-xs',
'[&_pre]:rounded-md [&_pre]:bg-muted [&_pre]:p-3 [&_pre]:overflow-x-auto',
'[&_p]:whitespace-pre-wrap [&_p]:leading-[1.4] [&_p]:my-1',
'[&_ul]:list-disc [&_ul]:pl-6 [&_ul]:my-1',
'[&_ol]:list-decimal [&_ol]:pl-6 [&_ol]:my-1',
'[&_li]:my-0.5',
'[&_blockquote]:border-l-2 [&_blockquote]:border-primary [&_blockquote]:pl-4 [&_blockquote]:my-1',
'[&_h1]:text-xl [&_h1]:font-semibold [&_h1]:my-2',
'[&_h2]:text-lg [&_h2]:font-semibold [&_h2]:my-2',
'[&_h3]:text-base [&_h3]:font-semibold [&_h3]:my-1.5',
'[&_strong]:font-semibold',
'[&_a]:text-primary [&_a]:underline [&_a]:underline-offset-2',
'[&_hr]:border-foreground/20 [&_hr]:my-2',
'[&_table]:w-full [&_table]:border-collapse [&_table]:rounded-md [&_table]:border [&_table]:border-foreground/20 [&_table]:my-2',
'[&_th]:border [&_th]:border-foreground/20 [&_th]:px-2 [&_th]:py-1 [&_th]:text-left [&_th]:font-bold',
'[&_td]:border [&_td]:border-foreground/20 [&_td]:px-2 [&_td]:py-1 [&_td]:text-left',
'[&_tr]:border-t [&_tr]:even:bg-muted',
)}
>
{parts.map((part, i) =>
part.type === 'text' ? (
<span key={i}>{part.text}</span>
) : (
<span
key={i}
role={onMentionClick ? 'button' : undefined}
tabIndex={onMentionClick ? 0 : undefined}
className={cn(
'inline-flex items-center gap-0.5 rounded px-1 py-0.5 font-medium text-xs mx-0.5',
getMentionStyle(part.mention!.type),
)}
onClick={() => onMentionClick?.(part.mention!.type, part.mention!.id, part.mention!.label)}
onKeyDown={(e) => {
if ((e.key === 'Enter' || e.key === ' ') && onMentionClick) {
e.preventDefault();
onMentionClick(part.mention!.type, part.mention!.id, part.mention!.label);
<Markdown
remarkPlugins={[remarkGfm]}
components={{
p: ({ children }) => {
// Restore mentions in paragraph text nodes
if (typeof children === 'string') {
return <p>{restoreMentions(children, mentions, onMentionClick)}</p>;
}
// Children may be an array of strings/elements
if (Array.isArray(children)) {
const restored = children.map((child) => {
if (typeof child === 'string') {
return restoreMentions(child, mentions, onMentionClick);
}
}}
>
<span>@{part.mention!.label}</span>
</span>
),
)}
return child;
});
return <p>{restored}</p>;
}
return <p>{children}</p>;
},
li: ({ children }) => {
if (typeof children === 'string') {
return <li>{restoreMentions(children, mentions, onMentionClick)}</li>;
}
if (Array.isArray(children)) {
const restored = children.map((child) => {
if (typeof child === 'string') {
return restoreMentions(child, mentions, onMentionClick);
}
return child;
});
return <li>{restored}</li>;
}
return <li>{children}</li>;
},
strong: ({ children }) => {
if (typeof children === 'string') {
return <strong>{restoreMentions(children, mentions, onMentionClick)}</strong>;
}
return <strong>{children}</strong>;
},
em: ({ children }) => {
if (typeof children === 'string') {
return <em>{restoreMentions(children, mentions, onMentionClick)}</em>;
}
return <em>{children}</em>;
},
code: ({ className, children, ...props }) => {
// Inline code — don't restore mentions inside code blocks
const isBlock = typeof className === 'string' && className.includes('language-');
if (isBlock) {
// Fenced code block — let the pre wrapper handle it
return <code className={className} {...props}>{children}</code>;
}
return (
<code
className="font-mono rounded bg-muted px-1 py-0.5 text-xs"
{...props}
>
{children}
</code>
);
},
pre: ({ children }) => {
// Preserve code blocks as-is, no mention restoration
return <pre className="rounded-md bg-muted p-3 overflow-x-auto">{children}</pre>;
},
}}
>
{safeContent}
</Markdown>
</div>
);
}
});

View File

@ -15,6 +15,7 @@ import { Paperclip, Smile, Send, X } from 'lucide-react';
import { cn } from '@/lib/utils';
import { COMMON_EMOJIS } from '../../shared';
import { useTheme } from '@/contexts';
import { useImageCompress } from '@/hooks/useImageCompress';
export interface IMEditorProps {
replyingTo?: { id: string; display_name?: string; content: string } | null;
@ -243,6 +244,7 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
) {
const { resolvedTheme } = useTheme();
const p = resolvedTheme === 'dark' ? DARK : LIGHT;
const { compress } = useImageCompress();
const [showEmoji, setShowEmoji] = useState(false);
const [mentionOpen, setMentionOpen] = useState(false);
@ -338,8 +340,14 @@ export const IMEditor = forwardRef<IMEditorHandle, IMEditorProps>(function IMEdi
const doUpload = async (file: File) => {
if (!editor || !onUploadFile) return;
try {
const res = await onUploadFile(file);
editor.chain().focus().insertContent({ type: 'file', attrs: { id: res.id, name: file.name, url: res.url, size: file.size, type: file.type, status: 'done' } }).insertContent(' ').run();
// Compress image before upload (only if it's an image and > 500KB)
let uploadFile = file;
if (file.type.startsWith('image/') && file.size > 500 * 1024) {
const result = await compress(file, { maxSizeMB: 1, maxWidthOrHeight: 1920, useWebWorker: true });
uploadFile = result.file;
}
const res = await onUploadFile(uploadFile);
editor.chain().focus().insertContent({ type: 'file', attrs: { id: res.id, name: uploadFile.name, url: res.url, size: uploadFile.size, type: uploadFile.type, status: 'done' } }).insertContent(' ').run();
} catch { /* ignore */ }
};

View File

@ -178,6 +178,9 @@ export function RoomProvider({
const [wsStatus, setWsStatus] = useState<RoomWsStatus>('idle');
const [wsError, setWsError] = useState<string | null>(null);
const [wsToken, setWsToken] = useState<string | null>(null);
// Buffer for messages received while user is in a different room (Bug 3 fix).
// Merged into state when the user switches to that room.
const pendingRoomMessagesRef = useRef<Map<string, RoomMessagePayload[]>>(new Map());
// Keep ref updated with latest activeRoomId
activeRoomIdRef.current = activeRoomId;
@ -237,6 +240,19 @@ export function RoomProvider({
setMessages([]);
setIsHistoryLoaded(false);
setNextCursor(null);
// Merge any buffered messages for the new room (Bug 3 fix)
if (activeRoomId) {
const pending = pendingRoomMessagesRef.current.get(activeRoomId);
if (pending && pending.length > 0) {
pendingRoomMessagesRef.current.delete(activeRoomId);
setMessages((prev) => {
const merged = [...prev, ...pending.map(wsMessageToUiMessage)];
merged.sort((a, b) => a.seq - b.seq);
return merged;
});
}
}
// NOTE: intentionally NOT clearing IndexedDB — keeping it enables instant
// load when the user returns to this room without waiting for API.
}
@ -396,6 +412,16 @@ export function RoomProvider({
const [roomAiConfigs, setRoomAiConfigs] = useState<RoomAiConfig[]>([]);
const [aiConfigsLoading, setAiConfigsLoading] = useState(false);
// ── Update WS token on existing client (instead of recreating client) ────────
useEffect(() => {
if (wsToken && wsClientRef.current) {
wsClientRef.current.setWsToken(wsToken);
}
}, [wsToken]);
// ── Create WS client ONCE on mount ──────────────────────────────────────────
// Recreating the client on wsToken change caused multiple invalid connections
// (Bug 1). Instead, create once and update the token in-place.
useEffect(() => {
const baseUrl = import.meta.env.VITE_API_BASE_URL ?? window.location.origin;
const client = createRoomWsClient(
@ -407,13 +433,22 @@ export function RoomProvider({
setMessages((prev) => {
const existingIdx = prev.findIndex((m) => m.id === payload.id);
if (existingIdx !== -1) {
// Message already exists — update reactions if provided
if (payload.reactions !== undefined) {
// Message already exists (e.g. created by streaming chunk) —
// merge server-side fields (display_name, reactions) that the
// chunk didn't have.
const existing = prev[existingIdx];
const needsUpdate =
(!existing.display_name && payload.display_name) ||
(payload.reactions !== undefined && existing.reactions === undefined);
if (needsUpdate) {
const updated = [...prev];
updated[existingIdx] = { ...updated[existingIdx], reactions: payload.reactions };
updated[existingIdx] = {
...existing,
display_name: payload.display_name ?? existing.display_name,
reactions: payload.reactions ?? existing.reactions,
};
return updated;
}
// Duplicate of a real message — ignore
return prev;
}
// Replace optimistic message with server-confirmed one
@ -437,9 +472,16 @@ export function RoomProvider({
}
return updated;
});
} else {
// Buffer messages for non-active rooms (Bug 3 fix).
// When user switches to that room, pending messages are merged.
pendingRoomMessagesRef.current.set(payload.room_id, [
...pendingRoomMessagesRef.current.get(payload.room_id) ?? [],
payload,
]);
}
},
onAiStreamChunk: (chunk: { done: boolean; message_id: string; room_id: string; content: string }) => {
onAiStreamChunk: (chunk: { done: boolean; message_id: string; room_id: string; content: string; display_name?: string }) => {
if (chunk.done) {
setStreamingContent((prev) => {
prev.delete(chunk.message_id);
@ -480,6 +522,7 @@ export function RoomProvider({
room: chunk.room_id,
seq: 0,
sender_type: 'ai',
display_name: chunk.display_name,
content: accumulated,
display_content: accumulated,
content_type: 'text',
@ -493,7 +536,7 @@ export function RoomProvider({
}
},
onRoomReactionUpdated: (payload: RoomReactionUpdatedPayload) => {
if (!activeRoomIdRef.current) return;
if (payload.room_id !== activeRoomIdRef.current) return;
setMessages((prev) => {
const existingIdx = prev.findIndex((m) => m.id === payload.message_id);
if (existingIdx === -1) return prev;
@ -589,26 +632,21 @@ export function RoomProvider({
setWsError(error.message);
},
},
{ wsToken: wsToken ?? undefined },
);
setWsClient(client);
wsClientRef.current = client;
return () => {
client.disconnect();
wsClientRef.current = null;
};
}, [wsToken]);
// ── Connect WS whenever a new client is created ─────────────────────────────
// Intentionally depends on wsClient (not wsClientRef) so a new client triggers connect().
// connect() is idempotent — no-op if already connecting/open.
useEffect(() => {
wsClientRef.current?.connect().catch((e) => {
// Connect immediately — connect() fetches its own token if needed
client.connect().catch((e) => {
console.error('[RoomContext] WS connect error:', e);
});
}, [wsClient]);
return () => {
client.disconnect(); // Intentional disconnect on unmount — no reconnect
wsClientRef.current = null;
};
}, []); // ← empty deps: create once on mount
const connectWs = useCallback(async () => {
const client = wsClientRef.current;

View File

@ -0,0 +1,56 @@
import { useState, useCallback } from 'react';
import imageCompression from 'browser-image-compression';
export interface ImageCompressOptions {
maxSizeMB?: number;
maxWidthOrHeight?: number;
useWebWorker?: boolean;
}
export interface CompressionResult {
file: File;
originalSize: number;
compressedSize: number;
}
/**
* Compresses an image file using browser-image-compression.
* Runs in a WebWorker by default to avoid blocking the main thread.
*/
export function useImageCompress() {
const [isCompressing, setIsCompressing] = useState(false);
const [error, setError] = useState<string | null>(null);
const compress = useCallback(async (
file: File,
options: ImageCompressOptions = {}
): Promise<CompressionResult> => {
setIsCompressing(true);
setError(null);
const defaultOptions = {
maxSizeMB: 1,
maxWidthOrHeight: 1920,
useWebWorker: true,
fileType: file.type,
...options,
};
try {
const compressed = await imageCompression(file, defaultOptions);
setIsCompressing(false);
return {
file: compressed as File,
originalSize: file.size,
compressedSize: compressed.size,
};
} catch (e) {
const msg = e instanceof Error ? e.message : 'Compression failed';
setError(msg);
setIsCompressing(false);
throw e;
}
}, []);
return { compress, isCompressing, error };
}

View File

@ -0,0 +1,170 @@
'use client';
import { useCallback, useEffect, useState } from 'react';
import axios from 'axios';
export type PushPermissionState = NotificationPermission | 'unsupported';
export interface PushSubscriptionInfo {
endpoint: string;
p256dh: string;
auth: string;
}
interface UsePushNotificationReturn {
permission: PushPermissionState;
isSubscribed: boolean;
isLoading: boolean;
error: string | null;
subscribe: () => Promise<void>;
unsubscribe: () => Promise<void>;
}
/**
* Hook for managing Web Push notification subscriptions.
* Handles Service Worker registration, push permission, and subscription lifecycle.
*/
export function usePushNotification(): UsePushNotificationReturn {
const [permission, setPermission] = useState<PushPermissionState>('unsupported');
const [isSubscribed, setIsSubscribed] = useState(false);
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
// Check initial state on mount
useEffect(() => {
if (!('Notification' in self) || !('serviceWorker' in navigator) || !('PushManager' in self)) {
setPermission('unsupported');
return;
}
setPermission(Notification.permission);
// Check if already subscribed
navigator.serviceWorker.ready.then((registration) => {
registration.pushManager.getSubscription().then((sub) => {
setIsSubscribed(!!sub);
}).catch(() => {
setIsSubscribed(false);
});
}).catch(() => {
setIsSubscribed(false);
});
}, []);
const subscribe = useCallback(async () => {
if (permission === 'unsupported') {
setError('Push notifications are not supported in this browser.');
return;
}
if (permission === 'denied') {
setError('Push notifications are blocked. Please enable them in your browser settings.');
return;
}
setIsLoading(true);
setError(null);
try {
// 1. Register / get Service Worker
const registration = await navigator.serviceWorker.register('/sw.js', { scope: '/' }).catch(() => {
// If already registered, just get it
return navigator.serviceWorker.ready;
});
// 2. Request permission if not granted
if (Notification.permission !== 'granted') {
const result = await Notification.requestPermission();
if (result !== 'granted') {
setPermission(result);
setError('Permission denied. Cannot subscribe to push notifications.');
setIsLoading(false);
return;
}
setPermission('granted');
}
// 3. Get VAPID public key from server
const vapidResponse = await axios.get<{ data?: { public_key?: string } }>(
'/api/users/me/notifications/push/vapid-key'
);
const publicKey = vapidResponse.data?.data?.public_key;
if (!publicKey) {
throw new Error('VAPID public key not available from server.');
}
// 4. Subscribe to push
const subscription = await registration.pushManager.subscribe({
userVisibleOnly: true,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
applicationServerKey: urlBase64ToUint8Array(publicKey) as any,
});
// 5. Extract subscription details
const raw = subscription.toJSON();
const pushSubscription: PushSubscriptionInfo = {
endpoint: raw.endpoint ?? '',
p256dh: raw.keys?.p256dh ?? '',
auth: raw.keys?.auth ?? '',
};
// 6. Save subscription to server via the preferences endpoint
// The server stores these in user_notification.push_subscription_* columns
await axios.patch('/api/users/me/notifications/preferences', {
push_subscription_endpoint: pushSubscription.endpoint,
push_subscription_keys_p256dh: pushSubscription.p256dh,
push_subscription_keys_auth: pushSubscription.auth,
});
setIsSubscribed(true);
} catch (err) {
const msg = err instanceof Error ? err.message : 'Failed to subscribe to push notifications';
setError(msg);
} finally {
setIsLoading(false);
}
}, [permission]);
const unsubscribe = useCallback(async () => {
setIsLoading(true);
setError(null);
try {
// 1. Unsubscribe from push manager
const registration = await navigator.serviceWorker.ready;
const subscription = await registration.pushManager.getSubscription();
if (subscription) {
await subscription.unsubscribe();
}
// 2. Clear subscription on server
await axios.delete('/api/users/me/notifications/push/subscription');
setIsSubscribed(false);
} catch (err) {
const msg = err instanceof Error ? err.message : 'Failed to unsubscribe from push notifications';
setError(msg);
} finally {
setIsLoading(false);
}
}, []);
return { permission, isSubscribed, isLoading, error, subscribe, unsubscribe };
}
// ─── Utility ────────────────────────────────────────────────────────────────
/**
* Convert a Base64url string to a Uint8Array (for applicationServerKey).
* Matches the browser's built-in urlBase64ToUint8Array behavior.
*/
function urlBase64ToUint8Array(base64String: string): Uint8Array {
const padding = '='.repeat((4 - (base64String.length % 4)) % 4);
const base64 = (base64String + padding).replace(/-/g, '+').replace(/_/g, '/');
const rawData = atob(base64);
const outputArray = new Uint8Array(rawData.length);
for (let i = 0; i < rawData.length; ++i) {
outputArray[i] = rawData.charCodeAt(i);
}
return outputArray;
}

View File

@ -249,8 +249,12 @@ export class RoomWsClient {
});
}
disconnect(): void {
this.shouldReconnect = false;
disconnect(graceful = false): void {
// Only permanently disable reconnect on intentional disconnect (user action).
// Graceful disconnect (cleanup from effect swap) allows reconnect to continue.
if (!graceful) {
this.shouldReconnect = false;
}
this.stopHeartbeat();
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
@ -1011,7 +1015,10 @@ export class RoomWsClient {
break;
case 'room.reaction_updated':
case 'room_reaction_updated':
this.callbacks.onRoomReactionUpdated?.(event.data as RoomReactionUpdatedPayload);
this.callbacks.onRoomReactionUpdated?.({
...(event.data as Omit<RoomReactionUpdatedPayload, 'room_id'>),
room_id: event.room_id ?? '',
});
break;
default:
// Unknown event type - ignore silently
@ -1063,9 +1070,12 @@ export class RoomWsClient {
}
private async resubscribeAll(): Promise<void> {
// Subscribe/unsubscribe are WS-only actions — request() would fall back to HTTP
// which maps them to POST /ws (not a real REST endpoint), causing 404 failures.
// Use requestWs() to ensure they go through the WebSocket.
for (const roomId of this.subscribedRooms) {
try {
await this.request('room.subscribe', { room_id: roomId });
await this.requestWs<SubscribeData>('room.subscribe', { room_id: roomId });
} catch (err) {
// Resubscribe failure is non-fatal — messages still arrive via REST poll.
// Log at warn level so operators can observe patterns (e.g. auth expiry).
@ -1074,7 +1084,7 @@ export class RoomWsClient {
}
for (const projectName of this.subscribedProjects) {
try {
await this.request('project.subscribe', { project_name: projectName });
await this.requestWs<SubscribeData>('project.subscribe', { project_name: projectName });
} catch (err) {
console.warn(`[RoomWs] resubscribe project failed (will retry on next reconnect): ${projectName}`, err);
}
@ -1089,7 +1099,9 @@ export class RoomWsClient {
// (thundering herd) after a server restart, overwhelming it.
const baseDelay = this.reconnectBaseDelay * Math.pow(2, this.reconnectAttempt);
const cappedDelay = Math.min(baseDelay, this.reconnectMaxDelay);
const jitter = Math.random() * cappedDelay;
// Ensure minimum 500ms delay to avoid hitting backend 30s connection cooldown
const minDelay = 500;
const jitter = minDelay + Math.random() * (cappedDelay - minDelay);
const delay = Math.floor(jitter);
this.reconnectAttempt++;

View File

@ -186,6 +186,8 @@ export interface AiStreamChunkPayload {
content: string;
done: boolean;
error?: string;
/** Human-readable AI model name for display (e.g. "Claude 3.5 Sonnet"). */
display_name?: string;
}
export interface RoomResponse {
@ -283,6 +285,7 @@ export interface ReactionListData {
}
export interface RoomReactionUpdatedPayload {
room_id: string;
message_id: string;
reactions: ReactionItem[];
}

View File

@ -0,0 +1,17 @@
declare module 'browser-image-compression' {
export interface Options {
maxSizeMB?: number;
maxWidthOrHeight?: number;
useWebWorker?: boolean;
fileType?: string;
initialQuality?: number;
alwaysKeepResolution?: boolean;
preserveExif?: boolean;
onProgress?: (progress: number) => void;
usePixelLength?: boolean;
signal?: AbortSignal;
}
function imageCompression(file: File | Blob, options?: Options): Promise<Blob>;
export default imageCompression;
}