Compare commits

..

No commits in common. "8b47f677bb6c5e13d6235be37f5ad367492eea90" and "91bebba45e9cb272066515c73c0ae8dca0624f1a" have entirely different histories.

23 changed files with 75 additions and 645 deletions

2
Cargo.lock generated
View File

@ -8286,8 +8286,6 @@ dependencies = [
"actix-web", "actix-web",
"anyhow", "anyhow",
"env_logger", "env_logger",
"futures",
"log",
"mime", "mime",
"mime_guess2", "mime_guess2",
"serde", "serde",

View File

@ -1,11 +1,10 @@
use actix_cors::Cors; use actix_cors::Cors;
use actix_web::cookie::time::Duration; use actix_web::cookie::time::Duration;
use actix_web::dev::{Service, ServiceRequest, ServiceResponse}; use actix_web::middleware::Logger;
use actix_web::{cookie::Key, web, App, HttpResponse, HttpServer}; use actix_web::{cookie::Key, web, App, HttpResponse, HttpServer};
use clap::Parser; use clap::Parser;
use db::cache::AppCache; use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use futures::future::LocalBoxFuture;
use observability::{ use observability::{
init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller, init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller,
HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware, HttpMetrics, HttpSnapshotGuard, MetricsMiddleware, TracingSpanMiddleware,
@ -15,8 +14,6 @@ use service::AppService;
use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy}; use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy};
use session::storage::RedisClusterSessionStore; use session::storage::RedisClusterSessionStore;
use session::SessionMiddleware; use session::SessionMiddleware;
use std::task::{Context, Poll};
use std::time::Instant;
mod args; mod args;
@ -30,80 +27,6 @@ pub struct AppState {
pub cache: AppCache, pub cache: AppCache,
} }
/// Custom middleware that logs requests except for noisy paths.
struct RequestLogger;
impl<S, B> actix_web::dev::Transform<S, ServiceRequest> for RequestLogger
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = actix_web::Error;
type Transform = RequestLoggerService<S>;
type InitError = ();
type Future = futures::future::Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
futures::future::ok(RequestLoggerService {
service,
_marker: std::marker::PhantomData,
})
}
}
struct RequestLoggerService<S> {
service: S,
_marker: std::marker::PhantomData<fn(ServiceRequest)>,
}
impl<S, B> actix_web::dev::Service<ServiceRequest> for RequestLoggerService<S>
where
S: actix_web::dev::Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = actix_web::Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&self, req: ServiceRequest) -> Self::Future {
let path = req.path().to_string();
let method = req.method().to_string();
let should_log = !(path == "/health"
|| path == "/metrics"
|| path.starts_with("/ws")
|| path.starts_with("/assets"));
let start = Instant::now();
let fut = self.service.call(req);
Box::pin(async move {
let res = fut.await?;
if should_log {
tracing::info!(
target: "http_request",
method = %method,
path = %path,
status = res.status().as_u16(),
elapsed = ?start.elapsed(),
"{} {} {} {:?}",
method,
path,
res.status().as_u16(),
start.elapsed()
);
}
Ok(res)
})
}
}
fn build_session_key(cfg: &AppConfig) -> anyhow::Result<Key> { fn build_session_key(cfg: &AppConfig) -> anyhow::Result<Key> {
if let Some(secret) = cfg.env.get("APP_SESSION_SECRET") { if let Some(secret) = cfg.env.get("APP_SESSION_SECRET") {
let bytes: Vec<u8> = secret.as_bytes().iter().cycle().take(64).copied().collect(); let bytes: Vec<u8> = secret.as_bytes().iter().cycle().take(64).copied().collect();
@ -205,7 +128,7 @@ async fn main() -> anyhow::Result<()> {
App::new() App::new()
.wrap(cors) .wrap(cors)
.wrap(session_mw) .wrap(session_mw)
.wrap(RequestLogger) .wrap(Logger::default().exclude("/health"))
.wrap(metrics_mw) .wrap(metrics_mw)
.wrap(TracingSpanMiddleware::new()) .wrap(TracingSpanMiddleware::new())
.app_data(web::Data::new(AppState { .app_data(web::Data::new(AppState {

View File

@ -8,7 +8,6 @@ actix-web = { workspace = true }
actix-files = { workspace = true } actix-files = { workspace = true }
actix-cors = { workspace = true } actix-cors = { workspace = true }
tokio = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
mime = { workspace = true } mime = { workspace = true }
@ -16,7 +15,6 @@ mime_guess2 = { workspace = true }
slog = { workspace = true } slog = { workspace = true }
anyhow = { workspace = true } anyhow = { workspace = true }
env_logger = { workspace = true } env_logger = { workspace = true }
log = "0.4"
[profile.release] [profile.release]
strip = true strip = true

View File

@ -1,12 +1,7 @@
use actix_cors::Cors; use actix_cors::Cors;
use actix_files::Files; use actix_files::Files;
use actix_web::dev::{Service, ServiceRequest, ServiceResponse}; use actix_web::{http::header, middleware::Logger, web, App, HttpResponse, HttpServer};
use actix_web::{http::header, web, App, HttpResponse, HttpServer};
use futures::future::LocalBoxFuture;
use log::info;
use std::path::PathBuf; use std::path::PathBuf;
use std::task::{Context, Poll};
use std::time::Instant;
/// Static file server for avatar, blob, and other static files /// Static file server for avatar, blob, and other static files
/// Serves files from /data/{type} directories /// Serves files from /data/{type} directories
@ -44,79 +39,6 @@ async fn health() -> HttpResponse {
})) }))
} }
/// Custom middleware that logs requests except for noisy paths (health, metrics, static files).
struct RequestLogger;
impl<S, B> actix_web::dev::Transform<S, ServiceRequest> for RequestLogger
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = actix_web::Error;
type Transform = RequestLoggerService<S>;
type InitError = ();
type Future = futures::future::Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
futures::future::ok(RequestLoggerService {
service,
_marker: std::marker::PhantomData,
})
}
}
struct RequestLoggerService<S> {
service: S,
_marker: std::marker::PhantomData<fn(ServiceRequest)>,
}
impl<S, B> Service<ServiceRequest> for RequestLoggerService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = actix_web::Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&self, req: ServiceRequest) -> Self::Future {
let path = req.path().to_string();
let method = req.method().to_string();
let should_log = !(path == "/health"
|| path == "/metrics"
|| path.starts_with("/ws")
|| path.starts_with("/avatar")
|| path.starts_with("/blob")
|| path.starts_with("/media")
|| path.starts_with("/static"));
let start = Instant::now();
let fut = self.service.call(req);
Box::pin(async move {
let res = fut.await?;
if should_log {
info!(
target: "static_server",
"{} {} {} {:?}",
method,
path,
res.status().as_u16(),
start.elapsed()
);
}
Ok(res)
})
}
}
#[actix_web::main] #[actix_web::main]
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
@ -157,7 +79,7 @@ async fn main() -> anyhow::Result<()> {
App::new() App::new()
.wrap(cors) .wrap(cors)
.wrap(RequestLogger) .wrap(Logger::default())
.route("/health", web::get().to(health)) .route("/health", web::get().to(health))
.service( .service(
Files::new("/avatar", root.join("avatar")) Files::new("/avatar", root.join("avatar"))

View File

@ -7,7 +7,6 @@ pub mod issue;
pub mod openapi; pub mod openapi;
pub mod project; pub mod project;
pub mod pull_request; pub mod pull_request;
pub mod robots;
pub mod room; pub mod room;
pub mod route; pub mod route;
pub mod search; pub mod search;

View File

@ -1,82 +0,0 @@
use actix_multipart::Multipart;
use actix_web::{HttpResponse, Result, web};
use futures_util::StreamExt;
use service::AppService;
use session::Session;
#[derive(serde::Serialize, utoipa::ToSchema)]
pub struct AvatarUploadResponse {
pub avatar_url: String,
}
/// Upload a project's avatar.
/// Accepts a multipart form with a single file field.
#[utoipa::path(
post,
path = "/api/projects/{project_name}/avatar",
params(
("project_name" = String, Path, description = "Project name"),
),
responses(
(status = 200, description = "Avatar uploaded", body = crate::ApiResponse<AvatarUploadResponse>),
(status = 401, description = "Unauthorized"),
(status = 403, description = "No permission"),
),
tag = "Project"
)]
pub async fn upload_project_avatar(
service: web::Data<AppService>,
session: Session,
path: web::Path<String>,
mut payload: Multipart,
) -> Result<HttpResponse, crate::error::ApiError> {
let project_name = path.into_inner();
let max_size: usize = 2 * 1024 * 1024; // 2MB
let mut file_data: Vec<u8> = Vec::new();
let mut file_ext = "png".to_string();
while let Some(item) = payload.next().await {
let mut field = item.map_err(|e| {
crate::error::ApiError(service::error::AppError::BadRequest(e.to_string()))
})?;
// Detect file extension from content-type
if let Some(content_type) = field.content_type() {
let ext = match content_type.essence_str() {
"image/jpeg" | "image/jpg" => "jpg",
"image/gif" => "gif",
"image/webp" => "webp",
"image/png" | _ => "png",
};
file_ext = ext.to_string();
}
while let Some(chunk) = field.next().await {
let data = chunk.map_err(|e| {
crate::error::ApiError(service::error::AppError::BadRequest(e.to_string()))
})?;
if file_data.len() + data.len() > max_size {
return Err(crate::error::ApiError(
service::error::AppError::BadRequest(
"File exceeds maximum size of 2MB".to_string(),
),
));
}
file_data.extend_from_slice(&data);
}
}
if file_data.is_empty() {
return Err(crate::error::ApiError(
service::error::AppError::BadRequest("No file provided".to_string()),
));
}
let avatar_url = service
.project_avatar_upload(&session, project_name, file_data, file_ext)
.await
.map_err(crate::error::ApiError::from)?;
Ok(crate::ApiResponse::ok(AvatarUploadResponse { avatar_url }).to_response())
}

View File

@ -1,6 +1,5 @@
pub mod activity; pub mod activity;
pub mod audit; pub mod audit;
pub mod avatar;
pub mod billing; pub mod billing;
pub mod board; pub mod board;
pub mod info; pub mod info;
@ -169,10 +168,6 @@ pub fn init_project_routes(cfg: &mut web::ServiceConfig) {
"/{project_name}/activities", "/{project_name}/activities",
web::post().to(activity::project_log_activity), web::post().to(activity::project_log_activity),
) )
.route(
"/{project_name}/avatar",
web::post().to(avatar::upload_project_avatar),
)
.route( .route(
"/{project_name}/billing", "/{project_name}/billing",
web::get().to(billing::project_billing), web::get().to(billing::project_billing),

View File

View File

@ -276,8 +276,6 @@ pub async fn ws_universal(
"content": chunk.content, "content": chunk.content,
"done": chunk.done, "done": chunk.done,
"error": chunk.error, "error": chunk.error,
"display_name": chunk.display_name,
"chunk_type": chunk.chunk_type,
}, },
}); });
if session.text(payload.to_string()).await.is_err() { if session.text(payload.to_string()).await.is_err() {
@ -294,7 +292,6 @@ pub async fn ws_universal(
"username": event.username, "username": event.username,
"avatar_url": event.avatar_url, "avatar_url": event.avatar_url,
"action": event.action, "action": event.action,
"sender_type": event.sender_type.as_deref().unwrap_or("user"),
}, },
}); });
if session.text(payload.to_string()).await.is_err() { if session.text(payload.to_string()).await.is_err() {
@ -410,7 +407,6 @@ pub async fn ws_universal(
username: names.into_values().next().unwrap_or_else(|| "unknown".to_string()), username: names.into_values().next().unwrap_or_else(|| "unknown".to_string()),
avatar_url: None, avatar_url: None,
action: action.to_string(), action: action.to_string(),
sender_type: None,
}; };
manager.broadcast_typing(room_id, typing_event).await; manager.broadcast_typing(room_id, typing_event).await;
} }

View File

@ -1,76 +0,0 @@
use actix_multipart::Multipart;
use actix_web::{HttpResponse, Result, web};
use futures_util::StreamExt;
use service::AppService;
use session::Session;
#[derive(serde::Serialize, utoipa::ToSchema)]
pub struct AvatarUploadResponse {
pub avatar_url: String,
}
/// Upload current user's avatar.
/// Accepts a multipart form with a single file field.
#[utoipa::path(
post,
path = "/api/users/me/avatar",
responses(
(status = 200, description = "Avatar uploaded", body = crate::ApiResponse<AvatarUploadResponse>),
(status = 401, description = "Unauthorized"),
),
tag = "User"
)]
pub async fn upload_avatar(
service: web::Data<AppService>,
session: Session,
mut payload: Multipart,
) -> Result<HttpResponse, crate::error::ApiError> {
let max_size: usize = 2 * 1024 * 1024; // 2MB
let mut file_data: Vec<u8> = Vec::new();
let mut file_ext = "png".to_string();
while let Some(item) = payload.next().await {
let mut field = item.map_err(|e| {
crate::error::ApiError(service::error::AppError::BadRequest(e.to_string()))
})?;
// Detect file extension from content-type
if let Some(content_type) = field.content_type() {
let ext = match content_type.essence_str() {
"image/jpeg" | "image/jpg" => "jpg",
"image/gif" => "gif",
"image/webp" => "webp",
"image/png" | _ => "png",
};
file_ext = ext.to_string();
}
while let Some(chunk) = field.next().await {
let data = chunk.map_err(|e| {
crate::error::ApiError(service::error::AppError::BadRequest(e.to_string()))
})?;
if file_data.len() + data.len() > max_size {
return Err(crate::error::ApiError(
service::error::AppError::BadRequest(
"File exceeds maximum size of 2MB".to_string(),
),
));
}
file_data.extend_from_slice(&data);
}
}
if file_data.is_empty() {
return Err(crate::error::ApiError(
service::error::AppError::BadRequest("No file provided".to_string()),
));
}
let avatar_url = service
.user_avatar_upload(session, file_data, &file_ext)
.await
.map_err(crate::error::ApiError::from)?;
Ok(crate::ApiResponse::ok(AvatarUploadResponse { avatar_url }).to_response())
}

View File

@ -1,5 +1,4 @@
pub mod access_key; pub mod access_key;
pub mod avatar;
pub mod chpc; pub mod chpc;
pub mod notification; pub mod notification;
pub mod preferences; pub mod preferences;
@ -19,7 +18,6 @@ pub fn init_user_routes(cfg: &mut web::ServiceConfig) {
web::scope("/users") web::scope("/users")
.route("/me/profile", web::get().to(profile::get_my_profile)) .route("/me/profile", web::get().to(profile::get_my_profile))
.route("/me/profile", web::patch().to(profile::update_my_profile)) .route("/me/profile", web::patch().to(profile::update_my_profile))
.route("/me/avatar", web::post().to(avatar::upload_avatar))
.route( .route(
"/me/preferences", "/me/preferences",
web::get().to(preferences::get_preferences), web::get().to(preferences::get_preferences),

View File

@ -54,9 +54,6 @@ pub struct TypingEvent {
pub avatar_url: Option<String>, pub avatar_url: Option<String>,
/// "start" or "stop" /// "start" or "stop"
pub action: String, pub action: String,
/// Sender type: "user" or "ai". Defaults to "user" if absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub sender_type: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -634,30 +634,24 @@ impl RoomConnectionManager {
room_id: Uuid, room_id: Uuid,
) -> broadcast::Receiver<Arc<TypingEvent>> { ) -> broadcast::Receiver<Arc<TypingEvent>> {
let mut map: tokio::sync::RwLockWriteGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.write().await; let mut map: tokio::sync::RwLockWriteGuard<'_, std::collections::HashMap<Uuid, broadcast::Sender<Arc<TypingEvent>>>> = self.typing_inner.write().await;
let tx = map.entry(room_id).or_insert_with(|| { if let Some(tx) = map.get(&room_id) {
let (tx, _) = broadcast::channel(BROADCAST_CAPACITY); return tx.subscribe();
tx
});
// Replay active typing state from Redis to the new subscriber.
// This ensures newly connected WS clients see who is currently typing.
let active_events = self.get_active_typing_events(room_id).await;
for event in active_events {
let _ = tx.send(Arc::new(event));
} }
tx.subscribe() let (tx, rx) = broadcast::channel(BROADCAST_CAPACITY);
map.insert(room_id, tx);
rx
} }
/// Broadcast a typing event and persist it to Redis with 60s TTL. /// Broadcast a typing event and persist it to Redis with 10s TTL.
/// - "start": writes key with 60s expiry, broadcasts start event /// - "start": writes key with 10s expiry, broadcasts start event
/// - "stop": deletes key, broadcasts stop event /// - "stop": deletes key, broadcasts stop event
pub async fn broadcast_typing(&self, room_id: Uuid, event: TypingEvent) { pub async fn broadcast_typing(&self, room_id: Uuid, event: TypingEvent) {
let user_key = format!("typing:{}:{}", room_id, event.user_id); let user_key = format!("typing:{}:{}", room_id, event.user_id);
let action = event.action.clone(); let action = event.action.clone();
let username = event.username.clone(); let username = event.username.clone();
let avatar_url = event.avatar_url.clone(); let avatar_url = event.avatar_url.clone();
let sender_type = event.sender_type.clone().unwrap_or_else(|| "user".to_string());
// Write/delete Redis key for 60s expiry (non-blocking) // Write/delete Redis key for 10s expiry (non-blocking)
if let Ok(mut conn) = self.cache.conn().await { if let Ok(mut conn) = self.cache.conn().await {
let key = user_key; let key = user_key;
tokio::spawn(async move { tokio::spawn(async move {
@ -665,12 +659,11 @@ impl RoomConnectionManager {
let value = serde_json::json!({ let value = serde_json::json!({
"username": username, "username": username,
"avatar_url": avatar_url, "avatar_url": avatar_url,
"sender_type": sender_type,
}) })
.to_string(); .to_string();
let _: Result<(), _> = redis::cmd("SETEX") let _: Result<(), _> = redis::cmd("SETEX")
.arg(&key) .arg(&key)
.arg(60i64) .arg(10i64)
.arg(&value) .arg(&value)
.query_async(&mut conn) .query_async(&mut conn)
.await; .await;
@ -686,43 +679,6 @@ impl RoomConnectionManager {
let _ = tx.send(event); let _ = tx.send(event);
} }
} }
/// Load all active typing entries for a room from Redis and return as TypingEvents.
/// Used to replay current typing state to newly connected WS clients.
pub async fn get_active_typing_events(&self, room_id: Uuid) -> Vec<TypingEvent> {
let pattern = format!("typing:{}:*", room_id);
if let Ok(mut conn) = self.cache.conn().await {
let keys: Vec<String> = match redis::cmd("KEYS").arg(&pattern).query_async(&mut conn).await {
Ok(k) => k,
Err(_) => return vec![],
};
if keys.is_empty() {
return vec![];
}
let mut results = Vec::new();
for key in keys {
let parts: Vec<&str> = key.split(':').collect();
let user_id = parts.get(2).and_then(|s| Uuid::parse_str(s).ok());
if let (Some(value), Some(user_uuid)) = (
redis::cmd("GET").arg(&key).query_async::<String>(&mut conn).await.ok(),
user_id,
) {
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&value) {
results.push(TypingEvent {
room_id,
user_id: user_uuid,
username: parsed.get("username").and_then(|v| v.as_str()).unwrap_or("").to_string(),
avatar_url: parsed.get("avatar_url").and_then(|v| v.as_str()).map(String::from),
action: "start".to_string(),
sender_type: parsed.get("sender_type").and_then(|v| v.as_str()).map(String::from),
});
}
}
}
return results;
}
vec![]
}
} }
fn parse_sender_type(s: &str) -> MessageSenderType { fn parse_sender_type(s: &str) -> MessageSenderType {

View File

@ -1030,19 +1030,6 @@ impl RoomService {
.register_stream_channel(streaming_msg_id) .register_stream_channel(streaming_msg_id)
.await; .await;
// Emit an initial "thinking" chunk immediately so the frontend shows the
// "AI is thinking..." indicator without waiting for the first real token.
let initial_event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id,
room_id,
content: String::new(),
done: false,
error: None,
display_name: Some(request.model.name.clone()),
chunk_type: Some("thinking".to_string()),
};
self.room_manager.broadcast_stream_chunk(initial_event).await;
let room_manager = self.room_manager.clone(); let room_manager = self.room_manager.clone();
let db = self.db.clone(); let db = self.db.clone();
let room_id_inner = room_id; let room_id_inner = room_id;
@ -1059,8 +1046,6 @@ impl RoomService {
let room_manager = room_manager.clone(); let room_manager = room_manager.clone();
let db = db.clone(); let db = db.clone();
let model_id = model_id; let model_id = model_id;
// Fixed UUID to identify AI typing events across WS reconnections.
let ai_typing_id = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
// Clone before closure so closure captures clone, not the original. // Clone before closure so closure captures clone, not the original.
let ai_display_name_for_chunk = ai_display_name.clone(); let ai_display_name_for_chunk = ai_display_name.clone();
let ai_display_name_for_final = ai_display_name.clone(); let ai_display_name_for_final = ai_display_name.clone();
@ -1103,17 +1088,6 @@ impl RoomService {
let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk); let stream_callback: agent::chat::StreamCallback = Box::new(on_chunk);
// Broadcast AI typing.start so WS clients (including reconnections) see the indicator.
let typing_start = queue::TypingEvent {
room_id: room_id_inner,
user_id: ai_typing_id,
username: ai_display_name.clone(),
avatar_url: None,
action: "start".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_start).await;
match chat_service.process_stream(request, stream_callback).await { match chat_service.process_stream(request, stream_callback).await {
Ok(full_content) => { Ok(full_content) => {
let envelope = RoomMessageEnvelope { let envelope = RoomMessageEnvelope {
@ -1168,17 +1142,6 @@ impl RoomService {
room_manager.broadcast(room_id_inner, msg_event).await; room_manager.broadcast(room_id_inner, msg_event).await;
room_manager.metrics.messages_sent.increment(1); room_manager.metrics.messages_sent.increment(1);
// Stop AI typing indicator now that the message is delivered.
let typing_stop = queue::TypingEvent {
room_id: room_id_inner,
user_id: ai_typing_id,
username: ai_display_name_for_final.clone(),
avatar_url: None,
action: "stop".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_stop).await;
let event = queue::ProjectRoomEvent { let event = queue::ProjectRoomEvent {
event_type: super::RoomEventType::NewMessage.as_str().into(), event_type: super::RoomEventType::NewMessage.as_str().into(),
project_id: project_id_inner, project_id: project_id_inner,
@ -1195,17 +1158,6 @@ impl RoomService {
} }
Err(e) => { Err(e) => {
tracing::error!(error = %e, "AI streaming failed"); tracing::error!(error = %e, "AI streaming failed");
// Stop AI typing indicator since the stream failed.
let typing_stop = queue::TypingEvent {
room_id: room_id_inner,
user_id: ai_typing_id,
username: ai_display_name.clone(),
avatar_url: None,
action: "stop".to_string(),
sender_type: Some("ai".to_string()),
};
room_manager.broadcast_typing(room_id_inner, typing_stop).await;
let event = RoomMessageStreamChunkEvent { let event = RoomMessageStreamChunkEvent {
message_id: streaming_msg_id, message_id: streaming_msg_id,
room_id: room_id_inner, room_id: room_id_inner,

View File

@ -15,7 +15,7 @@ impl AppService {
project_name: String, project_name: String,
file: Vec<u8>, file: Vec<u8>,
file_ext: String, file_ext: String,
) -> Result<String, AppError> { ) -> Result<(), AppError> {
let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
let project = self let project = self
.utils_find_project_by_name(project_name.clone()) .utils_find_project_by_name(project_name.clone())
@ -26,18 +26,14 @@ impl AppService {
if role == MemberRole::Member { if role == MemberRole::Member {
return Err(AppError::NoPower); return Err(AppError::NoPower);
} }
let time = Utc::now().timestamp(); let time = Utc::now();
let file_name = format!("{}-{}", project.id, time); let file_name = format!("{}-{}", project.id, time);
self.avatar self.avatar
.upload(file, file_name.clone(), &file_ext) .upload(file, file_name.clone(), &file_ext)
.await .await
.map_err(|e| AppError::AvatarUploadError(e.to_string()))?; .map_err(|e| AppError::AvatarUploadError(e.to_string()))?;
let static_url = self.config.static_domain().unwrap_or("/static".to_string()); let static_url = self.config.static_domain().unwrap_or("/static".to_string());
let file_url = format!( let file_url = format!("{}/{}", static_url, file_name);
"{}/avatar/{}",
static_url.trim_end_matches('/'),
format!("{}.{}", file_name, file_ext)
);
project::Entity::update_many() project::Entity::update_many()
.filter(project::Column::Id.eq(project.id)) .filter(project::Column::Id.eq(project.id))
.col_expr(project::Column::AvatarUrl, Expr::value(file_url.clone())) .col_expr(project::Column::AvatarUrl, Expr::value(file_url.clone()))
@ -58,6 +54,6 @@ impl AppService {
ctx, ctx,
) )
.await?; .await?;
Ok(file_url) Ok(())
} }
} }

View File

@ -30,7 +30,7 @@ impl AppService {
.static_domain() .static_domain()
.unwrap_or_else(|_| "/static".to_string()); .unwrap_or_else(|_| "/static".to_string());
let file_url = format!( let file_url = format!(
"{}/avatar/{}", "{}/{}",
static_url.trim_end_matches('/'), static_url.trim_end_matches('/'),
format!("{}.{}", file_name, file_ext) format!("{}.{}", file_name, file_ext)
); );

View File

@ -1,6 +1,5 @@
import { useMutation } from '@tanstack/react-query';
import { useState } from 'react'; import { useState } from 'react';
import { Loader2, Upload, User } from 'lucide-react'; import { Loader2 } from 'lucide-react';
import { toast } from 'sonner'; import { toast } from 'sonner';
import { import {
AlertDialog, AlertDialog,
@ -31,51 +30,12 @@ export function SettingsGeneral() {
const [displayName, setDisplayName] = useState(project?.display_name || ''); const [displayName, setDisplayName] = useState(project?.display_name || '');
const [description, setDescription] = useState(project?.description || ''); const [description, setDescription] = useState(project?.description || '');
const [isPublic, setIsPublic] = useState(project?.is_public ?? false); const [isPublic, setIsPublic] = useState(project?.is_public ?? false);
const [avatarUrl, setAvatarUrl] = useState(project?.avatar_url ?? '');
const hasChanges = const hasChanges =
displayName !== (project?.display_name || '') || displayName !== (project?.display_name || '') ||
description !== (project?.description || '') || description !== (project?.description || '') ||
isPublic !== (project?.is_public ?? false); isPublic !== (project?.is_public ?? false);
const uploadAvatarMutation = useMutation({
mutationFn: async (file: File) => {
const baseUrl = import.meta.env.VITE_API_BASE_URL ?? '';
const formData = new FormData();
formData.append('file', file);
const res = await fetch(`${baseUrl}/api/projects/${project?.name}/avatar`, {
method: 'POST',
body: formData,
credentials: 'include',
});
if (!res.ok) {
const err = await res.json().catch(() => null);
throw new Error(err?.message ?? 'Upload failed');
}
const json = await res.json();
return json as { data?: { avatar_url?: string } };
},
onSuccess: (data) => {
toast.success('Project avatar updated');
const url = data?.data?.avatar_url;
if (url) setAvatarUrl(url);
refetch();
},
onError: (err: Error) => {
toast.error(err.message || 'Failed to upload avatar');
},
});
const handleFileChange = (e: React.ChangeEvent<HTMLInputElement>) => {
const file = e.target.files?.[0];
if (!file) return;
if (file.size > 2 * 1024 * 1024) {
toast.error('File size must be less than 2MB');
return;
}
uploadAvatarMutation.mutate(file);
};
const handleSave = async () => { const handleSave = async () => {
if (!project?.name) return; if (!project?.name) return;
@ -120,47 +80,6 @@ export function SettingsGeneral() {
</CardHeader> </CardHeader>
<CardContent className="space-y-6 pt-2"> <CardContent className="space-y-6 pt-2">
<div className="space-y-2">
<Label>Project Avatar</Label>
<div className="flex items-center gap-4">
<div className="h-16 w-16 rounded-lg border-2 border-muted bg-muted/50 flex items-center justify-center overflow-hidden shrink-0">
{avatarUrl ? (
<img
src={avatarUrl}
alt="Project avatar"
className="h-full w-full object-cover"
onError={(e) => { e.currentTarget.style.display = 'none'; }}
/>
) : (
<User className="h-8 w-8 text-muted-foreground/50" />
)}
{uploadAvatarMutation.isPending && (
<div className="absolute inset-0 bg-background/50 flex items-center justify-center">
<Loader2 className="h-5 w-5 animate-spin" />
</div>
)}
</div>
<Button
type="button"
variant="outline"
size="sm"
onClick={() => document.getElementById('project-avatar-upload')?.click()}
disabled={uploadAvatarMutation.isPending}
>
<Upload className="h-4 w-4 mr-2" />
Upload avatar
</Button>
<Input
id="project-avatar-upload"
type="file"
className="hidden"
accept="image/png,image/jpeg,image/gif,image/webp"
onChange={handleFileChange}
/>
<span className="text-xs text-muted-foreground">PNG, JPG, GIF or WebP. Max 2MB.</span>
</div>
</div>
<div className="space-y-2"> <div className="space-y-2">
<Label htmlFor="display-name">Display Name</Label> <Label htmlFor="display-name">Display Name</Label>
<Input <Input

View File

@ -43,45 +43,45 @@ export function SettingsProfile() {
websiteUrl !== originalWebsiteUrl || websiteUrl !== originalWebsiteUrl ||
organization !== originalOrganization; organization !== originalOrganization;
const uploadAvatarMutation = useMutation({ const uploadAvatarMutation = useMutation({
mutationFn: async (file: File) => { mutationFn: async (data: { image_data: string; format?: string }) => {
const baseUrl = import.meta.env.VITE_API_BASE_URL ?? ''; // This would need an actual avatar upload API
const formData = new FormData(); // For now, we'll just use the data URL directly
formData.append('file', file); return { data: { avatar_url: `data:image/png;base64,${data.image_data}` } };
const res = await fetch(`${baseUrl}/api/users/me/avatar`, { },
method: 'POST', onSuccess: (data: unknown) => {
body: formData, toast.success('Avatar uploaded successfully');
credentials: 'include', const url = (data as { data?: { avatar_url?: string } })?.data?.avatar_url;
}); if (url) setAvatarUrl(url);
if (!res.ok) { queryClient.invalidateQueries({ queryKey: ['userProfile'] });
const err = await res.json().catch(() => null); },
throw new Error(err?.message ?? 'Upload failed'); onError: () => {
} toast.error('Failed to upload avatar');
const json = await res.json(); },
return json as { data?: { avatar_url?: string } }; });
},
onSuccess: (data) => {
toast.success('Avatar uploaded successfully');
const url = data?.data?.avatar_url;
if (url) setAvatarUrl(url);
queryClient.invalidateQueries({ queryKey: ['userProfile'] });
},
onError: (err: Error) => {
toast.error(err.message || 'Failed to upload avatar');
},
});
const handleFileChange = (e: React.ChangeEvent<HTMLInputElement>) => { const handleFileChange = (e: React.ChangeEvent<HTMLInputElement>) => {
const file = e.target.files?.[0]; const file = e.target.files?.[0];
if (!file) return; if (!file) return;
if (file.size > 2 * 1024 * 1024) { if (file.size > 2 * 1024 * 1024) {
toast.error('File size must be less than 2MB'); toast.error('File size must be less than 2MB');
return; return;
} }
uploadAvatarMutation.mutate(file); const reader = new FileReader();
}; reader.onloadend = () => {
const base64String = reader.result as string;
const base64Data = base64String.split(',')[1];
const format = file.type.split('/')[1] || 'png';
uploadAvatarMutation.mutate({
image_data: base64Data,
format: format,
});
};
reader.readAsDataURL(file);
};
const updateProfileMutation = useMutation({ const updateProfileMutation = useMutation({
mutationFn: async () => { mutationFn: async () => {

View File

@ -352,8 +352,8 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete, onToggleCha
onCreateThread={handleCreateThread} onCreateThread={handleCreateThread}
/> />
{/* AI thinking / generating indicator — hidden when typingUsers already shows AI */} {/* AI thinking / generating indicator */}
{activeAiStream && !Object.entries(typingUsers?.[room.id] ?? {}).find(([, v]) => v.sender_type === 'ai') && ( {activeAiStream && (
<div className="px-4 py-1 text-xs flex items-center gap-1.5" style={{ color: 'var(--room-text-subtle)' }}> <div className="px-4 py-1 text-xs flex items-center gap-1.5" style={{ color: 'var(--room-text-subtle)' }}>
<span className="flex gap-0.5"> <span className="flex gap-0.5">
{[0, 1, 2].map((i) => ( {[0, 1, 2].map((i) => (
@ -374,9 +374,9 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete, onToggleCha
{/* Human typing indicator — show who is typing */} {/* Human typing indicator — show who is typing */}
{(() => { {(() => {
const roomTyping = typingUsers?.[room.id] ?? {}; const roomTyping = typingUsers?.[room.id] ?? {};
const humanTyping = Object.entries(roomTyping).filter(([, v]) => v.sender_type !== 'ai'); const typingList = Object.entries(roomTyping);
if (humanTyping.length === 0) return null; if (typingList.length === 0) return null;
const names = humanTyping.map(([, v]) => v.username); const names = typingList.map(([, v]) => v.username);
const label = names.length === 1 const label = names.length === 1
? `${names[0]} is typing...` ? `${names[0]} is typing...`
: names.length === 2 : names.length === 2
@ -400,31 +400,6 @@ export function DiscordChatPanel({ room, isAdmin, onClose, onDelete, onToggleCha
); );
})()} })()}
{/* AI typing — distinct from human typing */}
{(() => {
const roomTyping = typingUsers?.[room.id] ?? {};
const aiTyping = Object.entries(roomTyping).find(([, v]) => v.sender_type === 'ai');
if (!aiTyping) return null;
const name = aiTyping[1].username;
return (
<div className="px-4 py-1 text-xs flex items-center gap-1.5" style={{ color: 'var(--room-text-subtle)' }}>
<span className="flex gap-0.5">
{[0, 1, 2].map((i) => (
<span
key={i}
className="w-1.5 h-1.5 rounded-full"
style={{ background: 'var(--room-text-subtle)', animation: `typing-bounce 1.2s infinite ${i * 0.2}s` }}
/>
))}
</span>
<span>
<span style={{ color: 'var(--room-accent)', fontWeight: 500 }}>{name}</span>
{' is thinking...'}
</span>
</div>
);
})()}
<MessageInput <MessageInput
ref={messageInputRef} ref={messageInputRef}
roomName={room.room_name ?? 'room'} roomName={room.room_name ?? 'room'}

View File

@ -95,11 +95,7 @@ export const MessageInput = forwardRef<MessageInputHandle, MessageInputProps>(fu
const typingStopTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null); const typingStopTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const sendTypingStart = useCallback(() => { const sendTypingStart = useCallback(() => {
if (!wsClient || !activeRoomId) { if (!wsClient || !activeRoomId) return;
console.debug('[MessageInput] sendTypingStart skipped: wsClient=', !!wsClient, 'activeRoomId=', activeRoomId);
return;
}
console.debug('[MessageInput] sendTypingStart room:', activeRoomId);
if (typingStopTimerRef.current) { if (typingStopTimerRef.current) {
clearTimeout(typingStopTimerRef.current); clearTimeout(typingStopTimerRef.current);
typingStopTimerRef.current = null; typingStopTimerRef.current = null;
@ -122,7 +118,6 @@ export const MessageInput = forwardRef<MessageInputHandle, MessageInputProps>(fu
// Only stop typing on explicit clear or send. // Only stop typing on explicit clear or send.
return; return;
} }
console.debug('[MessageInput] handleEditorUpdate text_len:', text.length, 'ws:', !!wsClient, 'room:', activeRoomId);
sendTypingStart(); sendTypingStart();
// Auto-stop after 1.5s of inactivity // Auto-stop after 1.5s of inactivity
if (typingStopTimerRef.current) clearTimeout(typingStopTimerRef.current); if (typingStopTimerRef.current) clearTimeout(typingStopTimerRef.current);

View File

@ -170,8 +170,8 @@ interface RoomContextValue {
roomAiConfigs: RoomAiConfig[]; roomAiConfigs: RoomAiConfig[];
aiConfigsLoading?: boolean; aiConfigsLoading?: boolean;
/** Typing users in the active room: roomId -> userId -> { username, avatar_url, sender_type } */ /** Typing users in the active room: roomId -> userId -> { username, avatar_url } */
typingUsers: Record<string, Record<string, { username: string; avatar_url?: string; sender_type?: string; timeoutId?: ReturnType<typeof setTimeout> }>>; typingUsers: Record<string, Record<string, { username: string; avatar_url?: string; timeoutId?: ReturnType<typeof setTimeout> }>>;
} }
const RoomContext = createContext<RoomContextValue | null>(null); const RoomContext = createContext<RoomContextValue | null>(null);
@ -435,8 +435,8 @@ export function RoomProvider({
// User presence map: user_id -> status // User presence map: user_id -> status
const [presence, setPresence] = useState<PresenceMap>({}); const [presence, setPresence] = useState<PresenceMap>({});
// Typing users map: roomId -> Map<userId, { username, avatar_url, sender_type, timeoutId }> // Typing users map: roomId -> Map<userId, { username, avatar_url, timeoutId }>
const [typingUsers, setTypingUsers] = useState<Record<string, Record<string, { username: string; avatar_url?: string; sender_type?: string; timeoutId?: ReturnType<typeof setTimeout> }>>>({}); const [typingUsers, setTypingUsers] = useState<Record<string, Record<string, { username: string; avatar_url?: string; timeoutId?: ReturnType<typeof setTimeout> }>>>({});
@ -563,7 +563,6 @@ export function RoomProvider({
} }
}, },
onAiStreamChunk: (chunk: { done: boolean; message_id: string; room_id: string; content: string; display_name?: string; chunk_type?: string }) => { onAiStreamChunk: (chunk: { done: boolean; message_id: string; room_id: string; content: string; display_name?: string; chunk_type?: string }) => {
console.debug('[RoomContext] onAiStreamChunk', chunk.chunk_type, chunk.done ? '(done)' : '', 'msg:', chunk.message_id);
const isToolCall = chunk.chunk_type === 'tool_call' || chunk.chunk_type === 'tool_result'; const isToolCall = chunk.chunk_type === 'tool_call' || chunk.chunk_type === 'tool_result';
if (chunk.done) { if (chunk.done) {
@ -768,32 +767,25 @@ export function RoomProvider({
setPresence((prev) => ({ ...prev, [payload.user_id]: payload.status })); setPresence((prev) => ({ ...prev, [payload.user_id]: payload.status }));
}, },
onTypingStart: (payload) => { onTypingStart: (payload) => {
console.debug('[RoomContext] onTypingStart', payload.room_id, 'user:', payload.user_id, 'username:', payload.username, 'currentRoom:', activeRoomIdRef.current, 'currentUser:', user?.uid);
if (payload.room_id !== activeRoomIdRef.current) return; if (payload.room_id !== activeRoomIdRef.current) return;
// Skip own typing events (except AI — own AI stream should still show indicator). if (payload.user_id === user?.uid) return;
if (payload.user_id === user?.uid && payload.sender_type !== 'ai') return;
setTypingUsers((prev) => { setTypingUsers((prev) => {
const roomMap = prev[payload.room_id] ?? {}; const roomMap = prev[payload.room_id] ?? {};
// Clear existing timeout for this user // Clear existing timeout for this user
const existing = roomMap[payload.user_id]; const existing = roomMap[payload.user_id];
if (existing?.timeoutId) clearTimeout(existing.timeoutId); if (existing?.timeoutId) clearTimeout(existing.timeoutId);
// AI typing has explicit backend stop — no timeout needed. const timeoutId = setTimeout(() => {
// Human typing uses 4s client-side expiry as a fallback. setTypingUsers((p) => {
let timeoutId: ReturnType<typeof setTimeout> | undefined; const rm = { ...p[payload.room_id] };
if (payload.sender_type !== 'ai') { delete rm[payload.user_id];
timeoutId = setTimeout(() => { return { ...p, [payload.room_id]: rm };
setTypingUsers((p) => { });
const rm = { ...p[payload.room_id] }; }, 4000);
delete rm[payload.user_id];
return { ...p, [payload.room_id]: rm };
});
}, 4000);
}
const next = { const next = {
...prev, ...prev,
[payload.room_id]: { [payload.room_id]: {
...roomMap, ...roomMap,
[payload.user_id]: { username: payload.username, avatar_url: payload.avatar_url, sender_type: payload.sender_type, timeoutId }, [payload.user_id]: { username: payload.username, avatar_url: payload.avatar_url, timeoutId },
}, },
}; };
return next; return next;
@ -854,25 +846,6 @@ export function RoomProvider({
} }
}, []); }, []);
// Reconnect WS when tab becomes visible again after background throttling.
// Chrome heavily throttles setInterval in background tabs (1s granularity or pauses),
// so heartbeat may not fire in time, causing the backend to close the connection.
useEffect(() => {
const handleVisibilityChange = () => {
if (document.visibilityState === 'visible') {
const client = wsClientRef.current;
if (client && client.getStatus() !== 'open') {
console.debug('[RoomContext] Tab visible, reconnecting WS...');
client.connect().catch(() => {});
}
}
};
document.addEventListener('visibilitychange', handleVisibilityChange);
return () => {
document.removeEventListener('visibilitychange', handleVisibilityChange);
};
}, []);
const disconnectWs = useCallback(() => { const disconnectWs = useCallback(() => {
wsClientRef.current?.disconnect(); wsClientRef.current?.disconnect();
}, []); }, []);

View File

@ -1054,15 +1054,13 @@ export class RoomWsClient {
case 'room.typing': case 'room.typing':
case 'room_typing': case 'room_typing':
{ {
const data = event.data as { user_id?: string; username?: string; avatar_url?: string; action?: string; sender_type?: string } | undefined; const data = event.data as { user_id?: string; username?: string; avatar_url?: string; action?: string } | undefined;
console.debug('[RoomWs] room.typing event:', data?.action, 'room:', event.room_id, 'user:', data?.user_id, 'username:', data?.username, 'sender:', data?.sender_type);
if (data?.action === 'start') { if (data?.action === 'start') {
this.callbacks.onTypingStart?.({ this.callbacks.onTypingStart?.({
room_id: event.room_id ?? '', room_id: event.room_id ?? '',
user_id: data.user_id ?? '', user_id: data.user_id ?? '',
username: data.username ?? '', username: data.username ?? '',
avatar_url: data.avatar_url, avatar_url: data.avatar_url,
sender_type: data.sender_type,
}); });
} else if (data?.action === 'stop') { } else if (data?.action === 'stop') {
this.callbacks.onTypingStop?.({ this.callbacks.onTypingStop?.({

View File

@ -144,8 +144,6 @@ export interface TypingStartPayload {
user_id: string; user_id: string;
username: string; username: string;
avatar_url?: string; avatar_url?: string;
/** "user" or "ai". Defaults to "user". */
sender_type?: string;
} }
export interface TypingStopPayload { export interface TypingStopPayload {