diff --git a/libs/migrate/sql/m20250628_000003_create_user_2fa.sql b/libs/migrate/sql/m20250628_000003_create_user_2fa.sql index 40ba790..7083f56 100644 --- a/libs/migrate/sql/m20250628_000003_create_user_2fa.sql +++ b/libs/migrate/sql/m20250628_000003_create_user_2fa.sql @@ -1,9 +1,10 @@ -CREATE TABLE IF NOT EXISTS user_2fa ( - "user" UUID PRIMARY KEY, - method VARCHAR(255) NOT NULL, - secret VARCHAR(255), - backup_codes JSONB NOT NULL, - is_enabled BOOLEAN NOT NULL DEFAULT false, - created_at TIMESTAMPTZ NOT NULL, - updated_at TIMESTAMPTZ NOT NULL +CREATE TABLE IF NOT EXISTS user_2fa +( + "user" UUID PRIMARY KEY, + method VARCHAR(255) NOT NULL, + secret VARCHAR(255), + backup_codes JSONB NOT NULL, + is_enabled BOOLEAN NOT NULL DEFAULT false, + created_at TIMESTAMPTZ NOT NULL, + updated_at TIMESTAMPTZ NOT NULL ); diff --git a/libs/room/src/service/ai_streaming.rs b/libs/room/src/service/ai_streaming.rs index bfe1185..d0b4457 100644 --- a/libs/room/src/service/ai_streaming.rs +++ b/libs/room/src/service/ai_streaming.rs @@ -1,5 +1,6 @@ use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::Ordering; use chrono::Utc; use db::cache::AppCache; @@ -7,11 +8,12 @@ use db::database::AppDatabase; use models::rooms::{room_ai, room_message}; use queue::{MessageProducer, ProjectRoomEvent, RoomMessageEnvelope}; use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, ExprTrait, QueryFilter, Set}; +use tokio::sync::Mutex; use uuid::Uuid; use super::sequence::next_room_message_seq_internal; use crate::connection::RoomConnectionManager; -use agent::chat::{normalize_thinking_content, AiRequest, ChatService}; +use agent::chat::{normalize_thinking_content, AiChunkType, AiRequest, ChatService}; pub async fn process_message_ai_streaming( chat_service: Arc, @@ -70,6 +72,10 @@ pub async fn process_message_ai_streaming( let chunk_count = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let queue_for_chunk = queue.clone(); + // Answer/thinking buffering for smooth SSE — reduce per-token event overhead. + let answer_buf = Arc::new(Mutex::new(String::new())); + let thinking_buf = Arc::new(Mutex::new(String::new())); + let on_chunk = move |chunk: agent::chat::AiStreamChunk| { Box::pin({ let queue = queue_for_chunk.clone(); @@ -78,35 +84,129 @@ pub async fn process_message_ai_streaming( let chunk_count = chunk_count.clone(); let ai_display_name_for_chunk = ai_display_name_for_chunk.clone(); let cancel = cancel.clone(); + let answer_buf = answer_buf.clone(); + let thinking_buf = thinking_buf.clone(); async move { if cancel.load(std::sync::atomic::Ordering::Acquire) { - // Stream was explicitly cancelled via cancel_ai_stream — drop this chunk return; } - let chunk_type_str = match chunk.chunk_type { - agent::chat::AiChunkType::Thinking => "thinking", - agent::chat::AiChunkType::Answer => "answer", - agent::chat::AiChunkType::ToolCall => "tool_call", - agent::chat::AiChunkType::ToolResult => "tool_result", - }; - let content = match chunk.chunk_type { - agent::chat::AiChunkType::Thinking => normalize_thinking_content(&chunk.content), - _ => chunk.content, - }; - let seq = chunk_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let event = RoomMessageStreamChunkEvent { - message_id: streaming_msg_id, - room_id, - seq, - content, - done: chunk.done, - error: None, - display_name: Some(ai_display_name_for_chunk), - chunk_type: Some(chunk_type_str.to_string()), - }; - // Ignore send error (let _) so the task continues even if consumer is gone. - // This ensures the AI continues to generate and the final result is persisted to DB. - let _ = queue.publish_stream_chunk(&event).await; + + match chunk.chunk_type { + AiChunkType::Thinking => { + let mut buf = thinking_buf.lock().await; + buf.push_str(&chunk.content); + // Only flush when we hit a newline boundary (natural paragraph break) + if chunk.content.contains('\n') || buf.len() > 500 { + let content = normalize_thinking_content(&buf); + let flush = !buf.is_empty(); + buf.clear(); + if flush && !cancel.clone().load(Ordering::Acquire) { + let seq = chunk_count.fetch_add(1, Ordering::Relaxed); + let event = queue::RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + seq, + content, + done: false, + error: None, + display_name: Some(ai_display_name_for_chunk.clone()), + chunk_type: Some("thinking".to_string()), + }; + let _ = queue.publish_stream_chunk(&event).await; + } + } + } + AiChunkType::Answer => { + let mut buf = answer_buf.lock().await; + buf.push_str(&chunk.content); + // Flush on natural boundaries: newlines, or when content buffer grows large. + // Small tokens (< 6 chars without newline) accumulate until boundary. + let should_flush = chunk.content.contains('\n') + || buf.len() >= 60 + || chunk.done; + if should_flush { + let content = std::mem::take(&mut *buf); + buf.clear(); + if !cancel.load(Ordering::Acquire) { + let seq = chunk_count.fetch_add(1, Ordering::Relaxed); + let event = queue::RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + seq, + content, + done: chunk.done, + error: None, + display_name: Some(ai_display_name_for_chunk.clone()), + chunk_type: Some("answer".to_string()), + }; + let _ = queue.publish_stream_chunk(&event).await; + } + } + } + AiChunkType::ToolCall | AiChunkType::ToolResult => { + // Tool events are critical — flush buffers first, then emit immediately. + // For ToolCall: flush any accumulated content before showing the call. + let answer_content = { + let mut buf = answer_buf.lock().await; + let content = std::mem::take(&mut *buf); + buf.clear(); + content + }; + let thinking_content = { + let mut buf = thinking_buf.lock().await; + let content = normalize_thinking_content(&std::mem::take(&mut *buf)); + buf.clear(); + content + }; + if !cancel.load(Ordering::Acquire) { + let mut seq = chunk_count.fetch_add(1, Ordering::Relaxed); + if !answer_content.is_empty() { + let event = queue::RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + seq, + content: answer_content, + done: false, + error: None, + display_name: Some(ai_display_name_for_chunk.clone()), + chunk_type: Some("answer".to_string()), + }; + let _ = queue.publish_stream_chunk(&event).await; + seq = chunk_count.fetch_add(1, Ordering::Relaxed); + } + if !thinking_content.is_empty() { + let event = queue::RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + seq, + content: thinking_content, + done: false, + error: None, + display_name: Some(ai_display_name_for_chunk.clone()), + chunk_type: Some("thinking".to_string()), + }; + let _ = queue.publish_stream_chunk(&event).await; + seq = chunk_count.fetch_add(1, Ordering::Relaxed); + } + let chunk_type_str = match chunk.chunk_type { + AiChunkType::ToolCall => "tool_call", + _ => "tool_result", + }; + let content = chunk.content; + let event = queue::RoomMessageStreamChunkEvent { + message_id: streaming_msg_id, + room_id, + seq, + content, + done: false, + error: None, + display_name: Some(ai_display_name_for_chunk.clone()), + chunk_type: Some(chunk_type_str.to_string()), + }; + let _ = queue.publish_stream_chunk(&event).await; + } + } + } } }) as Pin + Send>> }; diff --git a/libs/service/auth/totp.rs b/libs/service/auth/totp.rs index a2e1726..58209a0 100644 --- a/libs/service/auth/totp.rs +++ b/libs/service/auth/totp.rs @@ -363,7 +363,7 @@ impl AppService { fn hash_backup_code(code: &str) -> String { let mut hasher = Sha256::new(); hasher.update(code.as_bytes()); - format!("{:x}", hasher.finalize()) + hasher.finalize().iter().map(|b| format!("{:02x}", b)).collect::() } fn hash_backup_codes(codes: &[String]) -> Vec { @@ -387,7 +387,7 @@ impl AppService { } fn generate_totp_code(&self, secret: &str, counter: u64) -> Result { - use hmac::{Hmac, Mac}; + use hmac::{Hmac, Mac, KeyInit}; use sha1::Sha1; let secret_bytes = self.decode_base32(secret)?; diff --git a/libs/service/lib.rs b/libs/service/lib.rs index 5464e2c..fdd2f57 100644 --- a/libs/service/lib.rs +++ b/libs/service/lib.rs @@ -2,9 +2,8 @@ use std::sync::Arc; use ::agent::chat::ChatService; use ::agent::client::AiClientConfig; -use ::agent::task::service::TaskService; use ::agent::tool::ToolRegistry; -use ::agent::{new_embed_client, EmbedService}; +use ::agent::{new_embed_client, EmbedService, TaskService}; use avatar::AppAvatar; use config::AppConfig; use db::cache::AppCache; @@ -43,9 +42,6 @@ pub struct AppService { } impl AppService { - /// Send a Web Push notification to a specific user. - /// Reads the user's push subscription from `user_notification` table. - /// Non-blocking: failures are logged but don't affect the caller. pub fn send_push_to_user(&self, user_id: uuid::Uuid, payload: PushPayload) { push_helper::spawn_push_notification(self.push.clone(), self.db.clone(), user_id, payload); } diff --git a/libs/service/webhook_dispatch.rs b/libs/service/webhook_dispatch.rs index 808d881..b472c95 100644 --- a/libs/service/webhook_dispatch.rs +++ b/libs/service/webhook_dispatch.rs @@ -1,4 +1,4 @@ -use hmac::{Hmac, Mac}; +use hmac::{Hmac, Mac, KeyInit}; use sha2::Sha256; use std::time::Duration;