- service now delegates model/provider/pricing logic to agent crate - ChatService built at startup with EmbedService (graceful degradation) - RoomService wired with EmbedService for Qdrant embedding - Add error types for embedding service
346 lines
12 KiB
Rust
346 lines
12 KiB
Rust
use std::sync::Arc;
|
|
|
|
use ::agent::chat::ChatService;
|
|
use ::agent::client::AiClientConfig;
|
|
use ::agent::task::service::TaskService;
|
|
use ::agent::tool::ToolRegistry;
|
|
use ::agent::{EmbedService, new_embed_client};
|
|
use avatar::AppAvatar;
|
|
use config::AppConfig;
|
|
use db::cache::AppCache;
|
|
use db::database::AppDatabase;
|
|
use email::AppEmail;
|
|
use queue::{
|
|
start_email_worker, EmailEnvelope, EmailSendFn, EmailSendFut, GetRedis, MessageProducer, RedisFuture,
|
|
RedisPubSub,
|
|
};
|
|
use room::metrics::RoomMetrics;
|
|
use room::RoomService;
|
|
use serde::{Deserialize, Serialize};
|
|
use utoipa::ToSchema;
|
|
use ws_token::WsTokenService;
|
|
|
|
pub mod storage;
|
|
pub use storage::AppStorage;
|
|
pub mod push;
|
|
pub use push::{WebPushService, PushPayload};
|
|
|
|
#[derive(Clone)]
|
|
pub struct AppService {
|
|
pub db: AppDatabase,
|
|
pub config: AppConfig,
|
|
pub cache: AppCache,
|
|
pub email: AppEmail,
|
|
pub avatar: AppAvatar,
|
|
pub room: RoomService,
|
|
pub ws_token: Arc<WsTokenService>,
|
|
pub queue_producer: MessageProducer,
|
|
pub storage: Option<AppStorage>,
|
|
pub push: Option<WebPushService>,
|
|
}
|
|
|
|
impl AppService {
|
|
/// Send a Web Push notification to a specific user.
|
|
/// Reads the user's push subscription from `user_notification` table.
|
|
/// Non-blocking: failures are logged but don't affect the caller.
|
|
pub fn send_push_to_user(&self, user_id: uuid::Uuid, payload: PushPayload) {
|
|
let push = self.push.clone();
|
|
let db = self.db.clone();
|
|
|
|
tokio::spawn(async move {
|
|
if let Some(push) = push {
|
|
use models::users::user_notification;
|
|
use sea_orm::EntityTrait;
|
|
|
|
let prefs = user_notification::Entity::find_by_id(user_id)
|
|
.one(&db)
|
|
.await;
|
|
|
|
if let Ok(Some(prefs)) = prefs {
|
|
if prefs.push_enabled
|
|
&& prefs.push_subscription_endpoint.is_some()
|
|
&& prefs.push_subscription_keys_p256dh.is_some()
|
|
&& prefs.push_subscription_keys_auth.is_some()
|
|
{
|
|
let endpoint = prefs.push_subscription_endpoint.unwrap();
|
|
let p256dh = prefs.push_subscription_keys_p256dh.unwrap();
|
|
let auth = prefs.push_subscription_keys_auth.unwrap();
|
|
|
|
if let Err(e) = push.send(&endpoint, &p256dh, &auth, &payload).await {
|
|
tracing::warn!(user_id = %user_id, error = %e, "WebPush send failed");
|
|
}
|
|
}
|
|
} else if let Err(e) = prefs {
|
|
tracing::warn!(user_id = %user_id, error = %e, "Failed to read push subscription");
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
impl AppService {
|
|
pub async fn start_room_workers(
|
|
&self,
|
|
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
|
|
) -> anyhow::Result<()> {
|
|
self.room.start_workers(shutdown_rx).await
|
|
}
|
|
|
|
pub async fn new(config: AppConfig) -> anyhow::Result<Self> {
|
|
let db = AppDatabase::init(&config).await?;
|
|
let cache = AppCache::init(&config).await?;
|
|
|
|
let email = AppEmail::init(&config).await?;
|
|
let avatar = AppAvatar::init(&config).await?;
|
|
let storage = match AppStorage::new(&config) {
|
|
Ok(s) => {
|
|
tracing::info!(path = %s.base_path.display(), "Storage initialized");
|
|
Some(s)
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "Storage not available");
|
|
None
|
|
}
|
|
};
|
|
|
|
let push = match (
|
|
config.vapid_public_key(),
|
|
config.vapid_private_key(),
|
|
) {
|
|
(Some(public_key), Some(private_key)) => {
|
|
match WebPushService::new(
|
|
public_key,
|
|
private_key,
|
|
config.vapid_sender_email(),
|
|
) {
|
|
Ok(s) => {
|
|
tracing::info!("WebPush initialized");
|
|
Some(s)
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "WebPush not available");
|
|
None
|
|
}
|
|
}
|
|
}
|
|
_ => {
|
|
tracing::warn!("WebPush disabled — VAPID keys not configured");
|
|
None
|
|
}
|
|
};
|
|
|
|
// Build get_redis closure for MessageProducer
|
|
let get_redis: Arc<
|
|
dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<deadpool_redis::cluster::Connection>>
|
|
+ Send
|
|
+ Sync,
|
|
> = Arc::new({
|
|
let pool = cache.redis_pool().clone();
|
|
move || {
|
|
let pool = pool.clone();
|
|
tokio::spawn(async move { pool.get().await.map_err(|e| anyhow::anyhow!("{}", e)) })
|
|
}
|
|
});
|
|
|
|
let redis_pubsub = Some(RedisPubSub {
|
|
get_redis: get_redis.clone(),
|
|
});
|
|
|
|
let message_producer =
|
|
MessageProducer::new(get_redis.clone(), redis_pubsub.clone(), 10000);
|
|
|
|
// Build RoomService
|
|
let task_service = Arc::new(TaskService::new(db.clone()));
|
|
let room_metrics = Arc::new(RoomMetrics::default());
|
|
let room_manager = Arc::new(room::connection::RoomConnectionManager::new(
|
|
room_metrics.clone(),
|
|
cache.clone(),
|
|
));
|
|
|
|
let redis_url = config
|
|
.redis_urls()
|
|
.ok()
|
|
.and_then(|urls| urls.first().cloned())
|
|
.unwrap_or_else(|| "redis://127.0.0.1:6379".to_string());
|
|
|
|
// Build EmbedService if Qdrant and embedding model are configured (graceful degradation)
|
|
let embed_service: Option<Arc<EmbedService>> =
|
|
match new_embed_client(&config).await {
|
|
Ok(client) => {
|
|
let model_name = config
|
|
.get_embed_model_name()
|
|
.unwrap_or_else(|_| "text-embedding-3-small".into());
|
|
let dimensions = config
|
|
.get_embed_model_dimensions()
|
|
.unwrap_or(1536);
|
|
let svc = EmbedService::new(
|
|
client,
|
|
db.writer().clone(),
|
|
model_name,
|
|
dimensions,
|
|
);
|
|
let _ = svc.ensure_collections().await;
|
|
tracing::info!("EmbedService initialized (Qdrant + embeddings)");
|
|
Some(Arc::new(svc))
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "EmbedService not available — vector search disabled");
|
|
None
|
|
}
|
|
};
|
|
|
|
// Build ChatService if AI is configured; otherwise AI chat is disabled (graceful degradation)
|
|
let chat_service: Option<Arc<ChatService>> =
|
|
match (config.ai_api_key(), config.ai_basic_url()) {
|
|
(Ok(api_key), Ok(base_url)) => {
|
|
tracing::info!(url = %base_url, "AI chat enabled");
|
|
let ai_client_config = AiClientConfig::new(api_key).with_base_url(&base_url);
|
|
let mut registry = ToolRegistry::new();
|
|
git_tools::register_all(&mut registry);
|
|
file_tools::register_all(&mut registry);
|
|
project_tools::register_all(&mut registry);
|
|
let mut chat_svc = ChatService::new()
|
|
.with_ai_client_config(ai_client_config)
|
|
.with_tool_registry(registry);
|
|
if let Some(ref es) = embed_service {
|
|
chat_svc = chat_svc.with_embed_service((**es).clone());
|
|
}
|
|
Some(Arc::new(chat_svc))
|
|
}
|
|
(Err(e), _) => {
|
|
tracing::warn!(error = %e, "AI chat disabled");
|
|
None
|
|
}
|
|
(_, Err(e)) => {
|
|
tracing::warn!(error = %e, "AI chat disabled");
|
|
None
|
|
}
|
|
};
|
|
|
|
// Build push notification callback for RoomService
|
|
let push_fn: Option<room::PushNotificationFn> = push.clone().map(|push_svc| {
|
|
let db_clone = db.clone();
|
|
Arc::new(move |user_id: uuid::Uuid, title: String, body: Option<String>, url: Option<String>| {
|
|
let push = push_svc.clone();
|
|
let db = db_clone.clone();
|
|
let payload = PushPayload {
|
|
title,
|
|
body: body.unwrap_or_default(),
|
|
url,
|
|
icon: None,
|
|
};
|
|
tokio::spawn(async move {
|
|
use models::users::user_notification;
|
|
use sea_orm::EntityTrait;
|
|
|
|
let prefs = user_notification::Entity::find_by_id(user_id)
|
|
.one(&db)
|
|
.await;
|
|
|
|
if let Ok(Some(prefs)) = prefs {
|
|
if prefs.push_enabled
|
|
&& prefs.push_subscription_endpoint.is_some()
|
|
&& prefs.push_subscription_keys_p256dh.is_some()
|
|
&& prefs.push_subscription_keys_auth.is_some()
|
|
{
|
|
let endpoint = prefs.push_subscription_endpoint.unwrap();
|
|
let p256dh = prefs.push_subscription_keys_p256dh.unwrap();
|
|
let auth = prefs.push_subscription_keys_auth.unwrap();
|
|
|
|
if let Err(e) = push.send(&endpoint, &p256dh, &auth, &payload).await {
|
|
tracing::warn!(user_id = %user_id, error = %e, "WebPush send failed");
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}) as room::PushNotificationFn
|
|
});
|
|
|
|
let room = RoomService::new(
|
|
db.clone(),
|
|
cache.clone(),
|
|
config.clone(),
|
|
message_producer.clone(),
|
|
room_manager,
|
|
redis_url,
|
|
chat_service,
|
|
Some(task_service.clone()),
|
|
None,
|
|
push_fn,
|
|
embed_service,
|
|
);
|
|
|
|
// Build WsTokenService
|
|
let ws_token = Arc::new(WsTokenService::new(get_redis.clone()));
|
|
|
|
Ok(Self {
|
|
db,
|
|
config,
|
|
cache,
|
|
email,
|
|
avatar,
|
|
room,
|
|
ws_token,
|
|
queue_producer: message_producer,
|
|
storage,
|
|
push,
|
|
})
|
|
}
|
|
|
|
pub async fn start_email_workers(
|
|
&self,
|
|
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
|
|
) -> anyhow::Result<()> {
|
|
let get_redis_fn = self.queue_producer.get_redis.clone();
|
|
let get_redis: GetRedis = Arc::new(move || -> RedisFuture {
|
|
let get_redis_fn = get_redis_fn.clone();
|
|
Box::pin(async move { get_redis_fn().await? })
|
|
});
|
|
|
|
let email = self.email.clone();
|
|
let send_fn: EmailSendFn = Arc::new(move |envelopes: Vec<EmailEnvelope>| -> EmailSendFut {
|
|
let email = email.clone();
|
|
Box::pin(async move {
|
|
for envelope in envelopes {
|
|
let to = envelope.to.clone();
|
|
let msg = email::EmailMessage {
|
|
to: envelope.to,
|
|
subject: envelope.subject,
|
|
body: envelope.body,
|
|
};
|
|
if let Err(e) = email.send(msg).await {
|
|
tracing::error!(to = %to, error = %e, "email send failed");
|
|
}
|
|
}
|
|
Ok(())
|
|
})
|
|
});
|
|
|
|
start_email_worker(get_redis, send_fn, shutdown_rx).await;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub mod agent;
|
|
pub mod auth;
|
|
pub mod error;
|
|
pub mod file_tools;
|
|
pub mod git;
|
|
pub mod git_tools;
|
|
pub mod issue;
|
|
pub mod project;
|
|
pub mod project_tools;
|
|
pub mod pull_request;
|
|
pub mod search;
|
|
pub mod skill;
|
|
pub mod user;
|
|
pub mod utils;
|
|
pub mod workspace;
|
|
pub mod ws_token;
|
|
|
|
#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)]
|
|
pub struct Pager {
|
|
pub page: i64,
|
|
pub par_page: i64,
|
|
}
|