From 18a4adb32fb8e70e023f21b01276891451aec14b Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Sun, 17 May 2026 16:37:39 +0800 Subject: [PATCH] feat(service): add project members, activity and billing services --- libs/db/cache.rs | 44 +++ libs/git/Cargo.toml | 1 + libs/git/hook/sync/mod.rs | 21 +- libs/migrate/ai_subagent_session.rs | 63 ++++ libs/migrate/lib.rs | 6 + libs/migrate/project_message_favorite.rs | 57 ++++ libs/migrate/user_billing_history.rs | 56 ++++ libs/models/ai/ai_subagent_session.rs | 29 ++ libs/models/ai/mod.rs | 2 + libs/models/projects/mod.rs | 2 + .../projects/project_message_favorite.rs | 22 ++ libs/models/users/mod.rs | 2 + libs/models/users/user_billing_history.rs | 40 +++ libs/queue/nats_client.rs | 1 + libs/queue/producer.rs | 88 ++++++ libs/queue/types.rs | 28 ++ libs/service/Cargo.toml | 1 + libs/service/agent/billing.rs | 3 +- libs/service/agent/code_review.rs | 1 + libs/service/agent/issue_triage.rs | 1 + libs/service/agent/pr_summary.rs | 1 + libs/service/agent/sync.rs | 65 ++-- libs/service/auth/me.rs | 16 + libs/service/chat/context.rs | 167 +++++++++++ libs/service/chat/mod.rs | 1 + libs/service/project/activity.rs | 1 + libs/service/project/invitation.rs | 3 + libs/service/project/join_answers.rs | 10 + libs/service/project/join_request.rs | 100 +++++++ libs/service/project/members.rs | 27 +- libs/service/project/message_favorite.rs | 278 ++++++++++++++++++ libs/service/project/mod.rs | 1 + libs/service/skill/scanner.rs | 62 +++- libs/service/user/billing.rs | 15 +- 34 files changed, 1161 insertions(+), 54 deletions(-) create mode 100644 libs/migrate/ai_subagent_session.rs create mode 100644 libs/migrate/project_message_favorite.rs create mode 100644 libs/migrate/user_billing_history.rs create mode 100644 libs/models/ai/ai_subagent_session.rs create mode 100644 libs/models/projects/project_message_favorite.rs create mode 100644 libs/models/users/user_billing_history.rs create mode 100644 libs/service/chat/context.rs create mode 100644 libs/service/project/message_favorite.rs diff --git a/libs/db/cache.rs b/libs/db/cache.rs index 8c12d26..2e849ad 100644 --- a/libs/db/cache.rs +++ b/libs/db/cache.rs @@ -131,4 +131,48 @@ impl AppCache { let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await; } } + + pub async fn set_sub_agent_cancelled(&self, conversation_id: Uuid, children_id: &str) -> bool { + if let Ok(mut conn) = self.conn().await { + let key = format!( + "{}subagent:cancel:{}:{}", + CHAT_STREAM_KEY_PREFIX, conversation_id, children_id + ); + let _: Result<(), _> = redis::cmd("SETEX") + .arg(&key) + .arg(300_i64) + .arg("1") + .query_async(&mut conn) + .await; + return true; + } + false + } + + pub async fn is_sub_agent_cancelled(&self, conversation_id: Uuid, children_id: &str) -> bool { + if let Ok(mut conn) = self.conn().await { + let key = format!( + "{}subagent:cancel:{}:{}", + CHAT_STREAM_KEY_PREFIX, conversation_id, children_id + ); + if let Ok(value) = redis::cmd("GET") + .arg(&key) + .query_async::>(&mut conn) + .await + { + return value.is_some(); + } + } + false + } + + pub async fn clear_sub_agent_cancelled(&self, conversation_id: Uuid, children_id: &str) { + if let Ok(mut conn) = self.conn().await { + let key = format!( + "{}subagent:cancel:{}:{}", + CHAT_STREAM_KEY_PREFIX, conversation_id, children_id + ); + let _: Result<(), _> = redis::cmd("DEL").arg(&key).query_async(&mut conn).await; + } + } } diff --git a/libs/git/Cargo.toml b/libs/git/Cargo.toml index c5662e1..aa56f7f 100644 --- a/libs/git/Cargo.toml +++ b/libs/git/Cargo.toml @@ -20,6 +20,7 @@ git2-hooks = { workspace = true, features = [] } git2-ext = { workspace = true, features = [] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +serde_yaml = { workspace = true } tar = { workspace = true } flate2 = { workspace = true } zip = { workspace = true } diff --git a/libs/git/hook/sync/mod.rs b/libs/git/hook/sync/mod.rs index 1c2df49..a1c11ad 100644 --- a/libs/git/hook/sync/mod.rs +++ b/libs/git/hook/sync/mod.rs @@ -23,6 +23,10 @@ use crate::GitDomain; use sha1::Digest; +fn should_descend_dir(name: &str) -> bool { + name != ".git" +} + /// Recursively scan `base` for files named `SKILL.md`. /// The skill slug is `{short_repo_id}/{parent_dir_name}` to ensure uniqueness across repos. fn scan_skills_from_dir( @@ -53,7 +57,6 @@ fn scan_skills_from_dir( .parent() .and_then(|p| p.file_name()) .and_then(|n| n.to_str()) - .filter(|s| !s.starts_with('.')) { let slug = format!("{}/{}", repo_id_prefix, dir_name); if let Ok(raw) = std::fs::read(&path) { @@ -79,12 +82,17 @@ fn git_blob_hash(content: &[u8]) -> String { hex::encode(hasher.finalize()) } +fn parse_frontmatter(frontmatter: Option<&str>) -> serde_json::Value { + frontmatter + .and_then(|fm| serde_json::from_str(fm).ok()) + .or_else(|| frontmatter.and_then(|fm| serde_yaml::from_str(fm).ok())) + .unwrap_or_default() +} + fn parse_skill_content(slug: &str, raw: &[u8]) -> DiscoveredSkill { let content = String::from_utf8_lossy(raw); let (frontmatter, body) = extract_frontmatter(&content); - let metadata: serde_json::Value = frontmatter - .and_then(|fm| serde_json::from_str(fm).ok()) - .unwrap_or_default(); + let metadata = parse_frontmatter(frontmatter); let name = metadata .get("name") @@ -159,7 +167,7 @@ fn scan_skills_from_tree( match entry.kind() { Some(git2::ObjectType::Tree) => { - if !name.starts_with('.') { + if should_descend_dir(name) { if let Ok(subtree) = entry.to_object(git_repo).and_then(|o| o.peel_to_tree()) { @@ -171,8 +179,7 @@ fn scan_skills_from_tree( let dir_name = std::path::Path::new(&entry_path) .parent() .and_then(|p| p.file_name()) - .and_then(|n| n.to_str()) - .filter(|s| !s.starts_with('.')); + .and_then(|n| n.to_str()); let Some(dir_name) = dir_name else { continue }; let slug = format!("{}/{}", repo_id_prefix, dir_name); diff --git a/libs/migrate/ai_subagent_session.rs b/libs/migrate/ai_subagent_session.rs new file mode 100644 index 0000000..c44fb8a --- /dev/null +++ b/libs/migrate/ai_subagent_session.rs @@ -0,0 +1,63 @@ +use sea_orm_migration::prelude::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "ai_subagent_session" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute_unprepared( + r#" +create table if not exists ai_subagent_session +( + id uuid not null primary key, + conversation_id uuid not null, + message_id uuid not null, + children_id varchar(255) not null, + role varchar(64) not null, + task text not null, + output text not null, + input_tokens bigint default 0 not null, + output_tokens bigint default 0 not null, + model_name varchar(255), + status varchar(32) default 'completed' not null, + error_message text, + created_at timestamp with time zone not null +); + +create index if not exists idx_ai_subagent_session_conv + on ai_subagent_session (conversation_id); + +create index if not exists idx_ai_subagent_session_children + on ai_subagent_session (children_id); + +create index if not exists idx_ai_subagent_session_message + on ai_subagent_session (message_id); +"#, + ) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute_unprepared( + r#" +drop index if exists idx_ai_subagent_session_message; +drop index if exists idx_ai_subagent_session_children; +drop index if exists idx_ai_subagent_session_conv; +drop table if exists ai_subagent_session; +"#, + ) + .await?; + Ok(()) + } +} \ No newline at end of file diff --git a/libs/migrate/lib.rs b/libs/migrate/lib.rs index 56a6008..8ee8498 100644 --- a/libs/migrate/lib.rs +++ b/libs/migrate/lib.rs @@ -43,9 +43,15 @@ impl MigratorTrait for Migrator { vec![ Box::new(init::Migration), Box::new(room_compact_summary::Migration), + Box::new(user_billing_history::Migration), + Box::new(project_message_favorite::Migration), + Box::new(ai_subagent_session::Migration), ] } } +pub mod ai_subagent_session; pub mod init; +pub mod project_message_favorite; pub mod room_compact_summary; +pub mod user_billing_history; diff --git a/libs/migrate/project_message_favorite.rs b/libs/migrate/project_message_favorite.rs new file mode 100644 index 0000000..1d81e45 --- /dev/null +++ b/libs/migrate/project_message_favorite.rs @@ -0,0 +1,57 @@ +use sea_orm_migration::prelude::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "project_message_favorite" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute_unprepared( + r#" +create table if not exists project_message_favorite +( + uid uuid not null + primary key, + project uuid not null, + room uuid not null, + message uuid not null, + user_uuid uuid not null, + created_at timestamp with time zone not null +); + +create unique index if not exists idx_project_message_favorite_user_message + on project_message_favorite (user_uuid, message); + +create index if not exists idx_project_message_favorite_project_user + on project_message_favorite (project, user_uuid, created_at desc); + +create index if not exists idx_project_message_favorite_room + on project_message_favorite (room); +"#, + ) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute_unprepared( + r#" +drop index if exists idx_project_message_favorite_room; +drop index if exists idx_project_message_favorite_project_user; +drop index if exists idx_project_message_favorite_user_message; +drop table if exists project_message_favorite; +"#, + ) + .await?; + Ok(()) + } +} diff --git a/libs/migrate/user_billing_history.rs b/libs/migrate/user_billing_history.rs new file mode 100644 index 0000000..fe90567 --- /dev/null +++ b/libs/migrate/user_billing_history.rs @@ -0,0 +1,56 @@ +use sea_orm_migration::prelude::*; + +pub struct Migration; + +impl MigrationName for Migration { + fn name(&self) -> &str { + "user_billing_history" + } +} + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute_unprepared( + r#" +create table if not exists user_billing_history +( + uid uuid not null + primary key, + user_uuid uuid not null, + amount numeric not null, + currency text not null, + reason text not null, + extra jsonb, + created_at timestamp with time zone not null +); + +create index if not exists idx_user_billing_history_user + on user_billing_history (user_uuid); + +create index if not exists idx_user_billing_history_created_at + on user_billing_history (created_at); +"#, + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute_unprepared( + r#" +drop index if exists idx_user_billing_history_created_at; +drop index if exists idx_user_billing_history_user; +drop table if exists user_billing_history; +"#, + ) + .await?; + + Ok(()) + } +} diff --git a/libs/models/ai/ai_subagent_session.rs b/libs/models/ai/ai_subagent_session.rs new file mode 100644 index 0000000..38f1c31 --- /dev/null +++ b/libs/models/ai/ai_subagent_session.rs @@ -0,0 +1,29 @@ +use crate::{DateTimeUtc, Uuid}; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "ai_subagent_session")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: Uuid, + pub conversation_id: Uuid, + pub message_id: Uuid, + pub children_id: String, + pub role: String, + pub task: String, + #[sea_orm(column_type = "Text")] + pub output: String, + pub input_tokens: i64, + pub output_tokens: i64, + pub model_name: Option, + pub status: String, + #[sea_orm(column_type = "Text", nullable)] + pub error_message: Option, + pub created_at: DateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} \ No newline at end of file diff --git a/libs/models/ai/mod.rs b/libs/models/ai/mod.rs index ce56af2..9fe0026 100644 --- a/libs/models/ai/mod.rs +++ b/libs/models/ai/mod.rs @@ -41,6 +41,7 @@ pub use ai_conversation::Entity as AiConversation; pub use ai_message::Entity as AiMessage; pub use ai_message_fork::Entity as AiMessageFork; pub use ai_shared_conversation::Entity as AiSharedConversation; +pub use ai_subagent_session::Entity as AiSubAgentSession; pub use ai_token_usage::Entity as AiTokenUsage; pub use billing_error::Entity as BillingError; pub use subscription::Entity as Subscription; @@ -50,6 +51,7 @@ pub mod ai_message; pub mod ai_message_fork; pub mod ai_session; pub mod ai_shared_conversation; +pub mod ai_subagent_session; pub mod ai_token_usage; pub mod ai_tool_auth; pub mod ai_tool_call; diff --git a/libs/models/projects/mod.rs b/libs/models/projects/mod.rs index adda7e1..45c4fb4 100644 --- a/libs/models/projects/mod.rs +++ b/libs/models/projects/mod.rs @@ -52,6 +52,7 @@ pub use project_member_join_answers::Entity as ProjectMemberJoinAnswers; pub use project_member_join_request::Entity as ProjectMemberJoinRequest; pub use project_member_join_settings::Entity as ProjectMemberJoinSettings; pub use project_members::Entity as ProjectMember; +pub use project_message_favorite::Entity as ProjectMessageFavorite; pub use project_role_priority::Entity as ProjectRolePriority; pub use project_watch::Entity as ProjectWatch; @@ -73,5 +74,6 @@ pub mod project_member_join_answers; pub mod project_member_join_request; pub mod project_member_join_settings; pub mod project_members; +pub mod project_message_favorite; pub mod project_role_priority; pub mod project_watch; diff --git a/libs/models/projects/project_message_favorite.rs b/libs/models/projects/project_message_favorite.rs new file mode 100644 index 0000000..6c6ee55 --- /dev/null +++ b/libs/models/projects/project_message_favorite.rs @@ -0,0 +1,22 @@ +use crate::{DateTimeUtc, MessageId, ProjectId, RoomId, UserId}; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +/// User-saved room messages scoped to a project. +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "project_message_favorite")] +pub struct Model { + #[sea_orm(primary_key)] + pub uid: Uuid, + pub project: ProjectId, + pub room: RoomId, + pub message: MessageId, + #[sea_orm(column_name = "user_uuid")] + pub user: UserId, + pub created_at: DateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/libs/models/users/mod.rs b/libs/models/users/mod.rs index 8582df6..aa0fa28 100644 --- a/libs/models/users/mod.rs +++ b/libs/models/users/mod.rs @@ -2,6 +2,7 @@ pub use user::Entity as User; pub use user_2fa::Entity as User2fa; pub use user_activity_log::Entity as UserActivityLog; pub use user_billing::Entity as UserBilling; +pub use user_billing_history::Entity as UserBillingHistory; pub use user_email::Entity as UserEmail; pub use user_email_change::Entity as UserEmailChange; pub use user_notification::Entity as UserNotification; @@ -16,6 +17,7 @@ pub mod user; pub mod user_2fa; pub mod user_activity_log; pub mod user_billing; +pub mod user_billing_history; pub mod user_email; pub mod user_email_change; pub mod user_notification; diff --git a/libs/models/users/user_billing_history.rs b/libs/models/users/user_billing_history.rs new file mode 100644 index 0000000..21f95a5 --- /dev/null +++ b/libs/models/users/user_billing_history.rs @@ -0,0 +1,40 @@ +use crate::{DateTimeUtc, UserId}; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +/// Billing transaction history for a user's personal account. +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] +#[sea_orm(table_name = "user_billing_history")] +pub struct Model { + #[sea_orm(primary_key)] + pub uid: Uuid, + #[sea_orm(column_name = "user_uuid")] + pub user: UserId, + #[sea_orm(column_type = "Decimal(Some((20, 4)))")] + pub amount: Decimal, + #[sea_orm(column_type = "Text")] + pub currency: String, + #[sea_orm(column_type = "Text")] + pub reason: String, + #[sea_orm(column_type = "JsonBinary", nullable)] + pub extra: Option, + pub created_at: DateTimeUtc, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation { + #[sea_orm( + belongs_to = "crate::users::user::Entity", + from = "Column::User", + to = "crate::users::user::Column::Uid" + )] + User, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::User.def() + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/libs/queue/nats_client.rs b/libs/queue/nats_client.rs index d977bfc..e4c812c 100644 --- a/libs/queue/nats_client.rs +++ b/libs/queue/nats_client.rs @@ -107,6 +107,7 @@ impl NatsClient { "room.chunk.>".to_string(), "chat.message.>".to_string(), "chat.chunk.>".to_string(), + "chat.subagent.chunk.>".to_string(), ], retention: jetstream::stream::RetentionPolicy::Interest, max_age: Duration::from_secs(config.nats_max_age_secs()), diff --git a/libs/queue/producer.rs b/libs/queue/producer.rs index dd0c7b0..90082a6 100644 --- a/libs/queue/producer.rs +++ b/libs/queue/producer.rs @@ -230,4 +230,92 @@ impl MessageProducer { tracing::warn!(error = %e, conversation_id = %event.conversation_id, "JetStream chat chunk publish failed"); } } + + /// Publish a sub-agent stream chunk via JetStream for real-time multi-viewer delivery. + /// Subject: `chat.subagent.chunk.{conversation_id}.{children_id}` + pub async fn publish_sub_agent_chunk(&self, event: &crate::types::SubAgentStreamChunkEvent) { + let subject = format!( + "chat.subagent.chunk.{}.{}", + event.conversation_id, event.children_id + ); + let payload = match serde_json::to_vec(event) { + Ok(p) => p, + Err(e) => { + tracing::error!(error = %e, "serialise sub-agent chunk failed"); + return; + } + }; + if let Err(e) = self + .publish_sub_agent_chunk_redis(&subject, payload.clone()) + .await + { + tracing::warn!(error = %e, conversation_id = %event.conversation_id, children_id = %event.children_id, "Redis sub-agent chunk publish failed"); + } + let core_publish = self.core_publish.clone(); + let core_subject = subject.clone(); + let core_payload = payload.clone(); + tokio::spawn(async move { + core_publish(core_subject, core_payload).await; + }); + let jetstream_publish = self.jetstream_publish.clone(); + let conversation_id = event.conversation_id; + let children_id = event.children_id.clone(); + tokio::spawn(async move { + if let Err(e) = (jetstream_publish)(subject, payload).await { + tracing::warn!(error = %e, conversation_id = %conversation_id, children_id = %children_id, "JetStream sub-agent chunk publish failed"); + } + }); + } + + /// Publish a sub-agent chunk on Core NATS only. + /// Token-level sub-agent output is transient and latency-sensitive; final + /// output is persisted separately as an AI sub-agent session. + pub async fn publish_sub_agent_chunk_realtime( + &self, + event: &crate::types::SubAgentStreamChunkEvent, + ) { + let subject = format!( + "chat.subagent.chunk.{}.{}", + event.conversation_id, event.children_id + ); + let payload = match serde_json::to_vec(event) { + Ok(p) => p, + Err(e) => { + tracing::error!(error = %e, "serialise realtime sub-agent chunk failed"); + return; + } + }; + if let Err(e) = self + .publish_sub_agent_chunk_redis(&subject, payload.clone()) + .await + { + tracing::warn!(error = %e, conversation_id = %event.conversation_id, children_id = %event.children_id, "Redis realtime sub-agent chunk publish failed"); + } + let core_publish = self.core_publish.clone(); + tokio::spawn(async move { + core_publish(subject, payload).await; + }); + } + + async fn publish_sub_agent_chunk_redis( + &self, + subject: &str, + payload: Vec, + ) -> anyhow::Result<()> { + tokio::time::timeout(std::time::Duration::from_millis(500), async { + let handle = (self.get_redis)(); + let mut conn = handle + .await + .map_err(|e| anyhow::anyhow!("redis pool task panicked: {}", e))??; + let _: i32 = redis::cmd("PUBLISH") + .arg(subject) + .arg(payload) + .query_async(&mut conn) + .await + .map_err(|e| anyhow::anyhow!("redis publish failed: {}", e))?; + Ok(()) + }) + .await + .map_err(|_| anyhow::anyhow!("redis publish timed out"))? + } } diff --git a/libs/queue/types.rs b/libs/queue/types.rs index 3641516..b52ca25 100644 --- a/libs/queue/types.rs +++ b/libs/queue/types.rs @@ -202,3 +202,31 @@ pub struct ChatStreamChunkEvent { pub chunk_type: Option, pub model_name: Option, } + +/// Sub-agent stream chunk event — published to dedicated NATS subject `chat.subagent.chunk.{conversation_id}.{children_id}`. +/// Frontend subscribes to this subject via the sub-agent SSE endpoint using children_id. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubAgentStreamChunkEvent { + pub conversation_id: Uuid, + pub children_id: String, + pub seq: u64, + pub content: String, + pub done: bool, + pub error: Option, + pub chunk_type: Option, + pub role: String, + pub task: String, +} + +/// Sub-agent session record — persisted after sub-agent completes. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubAgentSessionRecord { + pub conversation_id: Uuid, + pub children_id: String, + pub role: String, + pub task: String, + pub output: String, + pub input_tokens: i64, + pub output_tokens: i64, + pub created_at: DateTime, +} diff --git a/libs/service/Cargo.toml b/libs/service/Cargo.toml index b7500f9..9b7f004 100644 --- a/libs/service/Cargo.toml +++ b/libs/service/Cargo.toml @@ -31,6 +31,7 @@ anyhow = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, features = [] } +serde_yaml = { workspace = true } slog = { workspace = true, features = ["anyhow"] } captcha-rs = { workspace = true } utoipa = { workspace = true, features = ["uuid", "chrono"] } diff --git a/libs/service/agent/billing.rs b/libs/service/agent/billing.rs index 02a1cd3..c68ac9d 100644 --- a/libs/service/agent/billing.rs +++ b/libs/service/agent/billing.rs @@ -13,6 +13,7 @@ impl AppService { pub async fn record_ai_usage( &self, project_uid: Uuid, + user_uid: Uuid, model_id: Uuid, input_tokens: i64, output_tokens: i64, @@ -33,7 +34,7 @@ impl AppService { match agent::billing::record_ai_usage( &self.db, project_uid, - Uuid::nil(), + user_uid, version_id, input_tokens, output_tokens, diff --git a/libs/service/agent/code_review.rs b/libs/service/agent/code_review.rs index 84f9fb3..2fe76e4 100644 --- a/libs/service/agent/code_review.rs +++ b/libs/service/agent/code_review.rs @@ -155,6 +155,7 @@ impl AppService { let billing = self .record_ai_usage( repo.project, + repo.created_by, model.id, ai_response.input_tokens, ai_response.output_tokens, diff --git a/libs/service/agent/issue_triage.rs b/libs/service/agent/issue_triage.rs index c56be62..f6c5211 100644 --- a/libs/service/agent/issue_triage.rs +++ b/libs/service/agent/issue_triage.rs @@ -192,6 +192,7 @@ impl AppService { let _ = self .record_ai_usage( project.id, + project.created_by, model.id, ai_response.input_tokens, ai_response.output_tokens, diff --git a/libs/service/agent/pr_summary.rs b/libs/service/agent/pr_summary.rs index dda0ba8..1e047ec 100644 --- a/libs/service/agent/pr_summary.rs +++ b/libs/service/agent/pr_summary.rs @@ -245,6 +245,7 @@ impl AppService { let billing = self .record_ai_usage( repo.project, + repo.created_by, model.id, ai_response.input_tokens, ai_response.output_tokens, diff --git a/libs/service/agent/sync.rs b/libs/service/agent/sync.rs index 164937d..448948e 100644 --- a/libs/service/agent/sync.rs +++ b/libs/service/agent/sync.rs @@ -332,33 +332,52 @@ async fn upsert_pricing( .filter(PCol::ModelVersionId.eq(version_uuid)) .one(db) .await?; - if existing.is_some() { + + let Some(p) = pricing else { + tracing::warn!( + model_version_id = %version_uuid, + "sync_models_from_upstream: skipping pricing because upstream did not provide pricing" + ); + return Ok(false); + }; + + let input = p.input.unwrap_or(0.0); + let output = p.output.unwrap_or(0.0); + if input <= 0.0 && output <= 0.0 { + tracing::warn!( + model_version_id = %version_uuid, + input_price = input, + output_price = output, + "sync_models_from_upstream: skipping zero pricing" + ); return Ok(false); } - let (input_price, output_price) = if let Some(p) = pricing { - ( - format!("{:.2}", p.input.unwrap_or(0.0)), - format!("{:.2}", p.output.unwrap_or(0.0)), - ) + let (input_price, output_price) = (format!("{:.4}", input), format!("{:.4}", output)); + + let currency = p.currency.clone().unwrap_or_else(|| "USD".to_string()); + + if let Some(existing) = existing { + // Update existing pricing with corrected per-1K values + let mut active: models::agents::model_pricing::ActiveModel = existing.into(); + active.input_price_per_1k_tokens = Set(input_price); + active.output_price_per_1k_tokens = Set(output_price); + active.currency = Set(currency); + active.effective_from = Set(Utc::now()); + active.update(db).await.map_err(AppError::from)?; + Ok(true) } else { - ("0.00".to_string(), "0.00".to_string()) - }; - - let currency = pricing - .and_then(|p| p.currency.clone()) - .unwrap_or_else(|| "USD".to_string()); - - let active = models::agents::model_pricing::ActiveModel { - id: Set(Utc::now().timestamp_millis()), - model_version_id: Set(version_uuid), - input_price_per_1k_tokens: Set(input_price), - output_price_per_1k_tokens: Set(output_price), - currency: Set(currency), - effective_from: Set(Utc::now()), - }; - active.insert(db).await.map_err(AppError::from)?; - Ok(true) + let active = models::agents::model_pricing::ActiveModel { + id: Set(Utc::now().timestamp_millis()), + model_version_id: Set(version_uuid), + input_price_per_1k_tokens: Set(input_price), + output_price_per_1k_tokens: Set(output_price), + currency: Set(currency), + effective_from: Set(Utc::now()), + }; + active.insert(db).await.map_err(AppError::from)?; + Ok(true) + } } async fn upsert_capabilities( diff --git a/libs/service/auth/me.rs b/libs/service/auth/me.rs index 64b19b9..f7b9c60 100644 --- a/libs/service/auth/me.rs +++ b/libs/service/auth/me.rs @@ -1,5 +1,7 @@ use crate::AppService; use crate::error::AppError; +use models::users::user_preferences; +use sea_orm::*; use serde::{Deserialize, Serialize}; use session::Session; use uuid::Uuid; @@ -11,17 +13,31 @@ pub struct ContextMe { pub display_name: Option, pub avatar_url: Option, pub has_unread_notifications: u64, + pub language: String, + pub timezone: String, } impl AppService { pub async fn auth_me(&self, ctx: Session) -> Result { let user_id = ctx.user().ok_or(AppError::Unauthorized)?; let user = self.utils_find_user_by_uid(user_id).await?; + let preferences = user_preferences::Entity::find_by_id(user_id) + .one(&self.db) + .await?; + Ok(ContextMe { uid: user.uid, username: user.username, display_name: user.display_name, avatar_url: user.avatar_url, has_unread_notifications: 0, + language: preferences + .as_ref() + .map(|prefs| prefs.language.clone()) + .unwrap_or_else(|| "en".to_string()), + timezone: preferences + .as_ref() + .map(|prefs| prefs.timezone.clone()) + .unwrap_or_else(|| "UTC".to_string()), }) } } diff --git a/libs/service/chat/context.rs b/libs/service/chat/context.rs new file mode 100644 index 0000000..06d3691 --- /dev/null +++ b/libs/service/chat/context.rs @@ -0,0 +1,167 @@ +use std::collections::HashSet; + +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; +use uuid::Uuid; + +use crate::AppService; +use crate::error::AppError; +use models::projects::project_skill; +use models::repos::repo; + +fn metadata_object( + metadata: Option<&serde_json::Value>, +) -> Option<&serde_json::Map> { + metadata?.as_object() +} + +fn slash_context_object( + metadata: Option<&serde_json::Value>, +) -> Option<&serde_json::Map> { + metadata_object(metadata)? + .get("slash_context")? + .as_object() +} + +fn stringify_text(value: &serde_json::Value) -> Option { + value + .as_str() + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToOwned::to_owned) +} + +fn repo_ids_from_metadata(metadata: Option<&serde_json::Value>) -> Vec { + let Some(context) = slash_context_object(metadata) else { + return Vec::new(); + }; + + let Some(repos) = context.get("repos").and_then(|value| value.as_array()) else { + return Vec::new(); + }; + + let mut seen = HashSet::new(); + let mut ids = Vec::new(); + + for repo in repos { + let Some(repo_id) = repo + .as_object() + .and_then(|value| value.get("id")) + .and_then(stringify_text) + else { + continue; + }; + + let Ok(repo_uuid) = Uuid::parse_str(&repo_id) else { + continue; + }; + + if seen.insert(repo_uuid) { + ids.push(repo_uuid); + } + } + + ids +} + +fn skill_ids_from_metadata(metadata: Option<&serde_json::Value>) -> Vec { + let Some(context) = slash_context_object(metadata) else { + return Vec::new(); + }; + + let Some(skills) = context.get("skills").and_then(|value| value.as_array()) else { + return Vec::new(); + }; + + let mut seen = HashSet::new(); + let mut ids = Vec::new(); + + for skill in skills { + let Some(skill_id) = skill + .as_object() + .and_then(|value| value.get("id")) + .and_then(stringify_text) + else { + continue; + }; + + let Ok(skill_id) = skill_id.parse::() else { + continue; + }; + + if seen.insert(skill_id) { + ids.push(skill_id); + } + } + + ids +} + +impl AppService { + pub async fn build_message_context_prompts( + &self, + project_id: Option, + metadata: Option<&serde_json::Value>, + ) -> Result, AppError> { + let mut prompts = Vec::new(); + + let repo_ids = repo_ids_from_metadata(metadata); + for repo_id in repo_ids { + let mut query = repo::Entity::find().filter(repo::Column::Id.eq(repo_id)); + if let Some(project_id) = project_id { + query = query.filter(repo::Column::Project.eq(project_id)); + } + + if let Some(repo) = query.one(self.db.reader()).await? { + let mut parts = vec![ + format!("Repository name: {}", repo.repo_name), + format!("Repository id: {}", repo.id), + format!("Default branch: {}", repo.default_branch), + format!( + "Visibility: {}", + if repo.is_private { "private" } else { "public" } + ), + ]; + + if let Some(description) = repo.description.as_deref() { + parts.push(format!("Description: {}", description)); + } + + prompts.push(format!( + "[Selected repository context]\n{}", + parts.join("\n") + )); + } + } + + let skill_ids = skill_ids_from_metadata(metadata); + if let Some(project_id) = project_id { + for skill_id in skill_ids { + if let Some(skill) = project_skill::Entity::find() + .filter(project_skill::Column::Id.eq(skill_id)) + .filter(project_skill::Column::ProjectUuid.eq(project_id)) + .filter(project_skill::Column::Enabled.eq(true)) + .one(self.db.reader()) + .await? + { + let mut header = vec![ + format!("Skill name: {}", skill.name), + format!("Skill slug: {}", skill.slug), + format!("Skill source: {}", skill.source), + ]; + + if let Some(description) = skill.description.as_deref() { + header.push(format!("Description: {}", description)); + } + + prompts.push(format!( + "[Selected skill context]\n{}\n\n{}", + header.join("\n"), + skill.content + )); + } + } + } + + Ok(prompts) + } +} diff --git a/libs/service/chat/mod.rs b/libs/service/chat/mod.rs index 80966ab..fda1108 100644 --- a/libs/service/chat/mod.rs +++ b/libs/service/chat/mod.rs @@ -1,5 +1,6 @@ pub mod access; pub mod conversation; +pub mod context; pub mod fork; pub mod message; pub mod share; diff --git a/libs/service/project/activity.rs b/libs/service/project/activity.rs index e1294f7..bc3fb6b 100644 --- a/libs/service/project/activity.rs +++ b/libs/service/project/activity.rs @@ -75,6 +75,7 @@ impl ActivityLogResponse { self.event_type.as_str(), "member_role_change" | "member_remove" + | "member_leave" | "member_invite" | "invitation_cancelled" | "join_request_approve" diff --git a/libs/service/project/invitation.rs b/libs/service/project/invitation.rs index ea93b1b..06b7361 100644 --- a/libs/service/project/invitation.rs +++ b/libs/service/project/invitation.rs @@ -201,6 +201,9 @@ impl AppService { if role != MemberRole::Owner && role != MemberRole::Admin { return Err(AppError::NoPower); } + if scope == MemberRole::Owner || (scope == MemberRole::Admin && role != MemberRole::Owner) { + return Err(AppError::NoPower); + } let target_user = user_email::Entity::find() .filter(user_email::Column::Email.eq(invitee_email.clone())) .one(&self.db) diff --git a/libs/service/project/join_answers.rs b/libs/service/project/join_answers.rs index 245e543..269548b 100644 --- a/libs/service/project/join_answers.rs +++ b/libs/service/project/join_answers.rs @@ -38,6 +38,16 @@ impl AppService { let _user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let project = self.utils_find_project_by_name(project_name).await?; + let role = self + .utils_project_context_role(&ctx, project.name.clone()) + .await + .map_err(|_| AppError::NoPower)?; + if role != models::projects::MemberRole::Owner + && role != models::projects::MemberRole::Admin + { + return Err(AppError::NoPower); + } + // Verify the request exists and belongs to this project let join_request = project_member_join_request::Entity::find_by_id(request_id) .filter(project_member_join_request::Column::Project.eq(project.id)) diff --git a/libs/service/project/join_request.rs b/libs/service/project/join_request.rs index 580d666..3e99803 100644 --- a/libs/service/project/join_request.rs +++ b/libs/service/project/join_request.rs @@ -49,6 +49,32 @@ pub struct ProcessJoinRequest { } impl AppService { + fn validate_join_request_answers( + questions: &[String], + answers: &[AnswerRequest], + ) -> Result<(), AppError> { + if questions.len() != answers.len() { + return Err(AppError::BadRequest( + "Answer count does not match required questions".to_string(), + )); + } + + for (expected, submitted) in questions.iter().zip(answers.iter()) { + if expected != &submitted.question { + return Err(AppError::BadRequest( + "Submitted answers do not match required questions".to_string(), + )); + } + if submitted.answer.trim().is_empty() { + return Err(AppError::BadRequest( + "All required questions must be answered".to_string(), + )); + } + } + + Ok(()) + } + pub async fn project_get_join_requests( &self, project_name: String, @@ -174,6 +200,18 @@ impl AppService { .one(&self.db) .await?; + if let Some(ref s) = settings { + if s.require_questions { + let required_questions: Vec = + serde_json::from_value(s.questions.clone()).map_err(|_| { + AppError::InternalServerError( + "Invalid join question configuration".to_string(), + ) + })?; + Self::validate_join_request_answers(&required_questions, &request.answers)?; + } + } + // Clone message for audit log before moving let message = request.message.clone(); @@ -271,6 +309,12 @@ impl AppService { return Err(AppError::NoPower); } + if process.scope == MemberRole::Owner + || (process.scope == MemberRole::Admin && role != MemberRole::Owner) + { + return Err(AppError::NoPower); + } + let join_request = project_member_join_request::Entity::find_by_id(request_id) .filter(project_member_join_request::Column::Project.eq(project.id)) .one(&self.db) @@ -507,3 +551,59 @@ impl AppService { }) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn answers(items: &[(&str, &str)]) -> Vec { + items + .iter() + .map(|(question, answer)| AnswerRequest { + question: (*question).to_string(), + answer: (*answer).to_string(), + }) + .collect() + } + + #[test] + fn validates_matching_answers() { + let questions = vec!["Why join?".to_string(), "Experience?".to_string()]; + let answers = answers(&[("Why join?", "Because"), ("Experience?", "Five years")]); + + assert!(AppService::validate_join_request_answers(&questions, &answers).is_ok()); + } + + #[test] + fn rejects_missing_answers() { + let questions = vec!["Why join?".to_string()]; + let answers = Vec::new(); + + assert!(matches!( + AppService::validate_join_request_answers(&questions, &answers), + Err(AppError::BadRequest(_)) + )); + } + + #[test] + fn rejects_mismatched_questions() { + let questions = vec!["Why join?".to_string()]; + let answers = answers(&[("Other question", "Answer")]); + + assert!(matches!( + AppService::validate_join_request_answers(&questions, &answers), + Err(AppError::BadRequest(_)) + )); + } + + #[test] + fn rejects_blank_answers() { + let questions = vec!["Why join?".to_string()]; + let answers = answers(&[("Why join?", " ")]); + + assert!(matches!( + AppService::validate_join_request_answers(&questions, &answers), + Err(AppError::BadRequest(_)) + )); + } +} diff --git a/libs/service/project/members.rs b/libs/service/project/members.rs index dfb4e5f..f046b58 100644 --- a/libs/service/project/members.rs +++ b/libs/service/project/members.rs @@ -354,7 +354,10 @@ impl AppService { .scope_role() .map_err(|_| AppError::RoleParseError)?; - if actor_role != MemberRole::Owner && actor_role != MemberRole::Admin { + if actor_role != MemberRole::Owner + && actor_role != MemberRole::Admin + && actor_uid != user_id + { return Err(AppError::NoPower); } @@ -369,11 +372,13 @@ impl AppService { .scope_role() .map_err(|_| AppError::RoleParseError)?; + let is_self = actor_uid == user_id; + if target_role == MemberRole::Owner { return Err(AppError::NoPower); } - if actor_role == MemberRole::Admin && target_role == MemberRole::Admin { + if !is_self && actor_role == MemberRole::Admin && target_role == MemberRole::Admin { return Err(AppError::NoPower); } @@ -403,11 +408,19 @@ impl AppService { None, actor_uid, super::activity::ActivityLogParams { - event_type: "member_remove".to_string(), - title: format!( - "{} removed {} from the project", - actor_username, target_username - ), + event_type: if is_self { + "member_leave".to_string() + } else { + "member_remove".to_string() + }, + title: if is_self { + format!("{} left the project", actor_username) + } else { + format!( + "{} removed {} from the project", + actor_username, target_username + ) + }, repo_id: None, content: None, event_id: None, diff --git a/libs/service/project/message_favorite.rs b/libs/service/project/message_favorite.rs new file mode 100644 index 0000000..76a59cc --- /dev/null +++ b/libs/service/project/message_favorite.rs @@ -0,0 +1,278 @@ +use crate::AppService; +use crate::error::AppError; +use chrono::{DateTime, Utc}; +use models::projects::{project_members, project_message_favorite}; +use models::rooms::{room, room_message}; +use sea_orm::sea_query::Expr; +use sea_orm::*; +use serde::{Deserialize, Serialize}; +use session::Session; +use utoipa::{IntoParams, ToSchema}; +use uuid::Uuid; + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema, IntoParams)] +pub struct ProjectMessageFavoriteQuery { + pub page: Option, + pub per_page: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +pub struct ProjectMessageFavoriteItem { + pub uid: Uuid, + pub project_uid: Uuid, + pub room_id: Uuid, + pub room_name: String, + pub message_id: Uuid, + pub sender_id: Option, + pub sender_type: String, + pub display_name: Option, + pub content: String, + pub content_type: String, + pub send_at: DateTime, + pub favorited_at: DateTime, +} + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +pub struct ProjectMessageFavoriteResponse { + pub page: u64, + pub per_page: u64, + pub total: u64, + pub list: Vec, +} + +impl AppService { + pub async fn project_message_favorites( + &self, + ctx: &Session, + project_name: String, + query: ProjectMessageFavoriteQuery, + ) -> Result { + let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; + let project = self.utils_find_project_by_name(project_name).await?; + self.require_project_member(project.id, user_uid).await?; + + let page = std::cmp::max(query.page.unwrap_or(1), 1); + let per_page = std::cmp::min(std::cmp::max(query.per_page.unwrap_or(20), 1), 100); + + let base = project_message_favorite::Entity::find() + .filter(project_message_favorite::Column::Project.eq(project.id)) + .filter(project_message_favorite::Column::User.eq(user_uid)); + let total = base.clone().count(&self.db).await?; + + let rows = base + .join( + JoinType::InnerJoin, + project_message_favorite::Entity::belongs_to(room_message::Entity) + .from(project_message_favorite::Column::Message) + .to(room_message::Column::Id) + .into(), + ) + .join( + JoinType::InnerJoin, + project_message_favorite::Entity::belongs_to(room::Entity) + .from(project_message_favorite::Column::Room) + .to(room::Column::Id) + .into(), + ) + .filter(room_message::Column::Revoked.is_null()) + .order_by_desc(project_message_favorite::Column::CreatedAt) + .select_only() + .column_as(project_message_favorite::Column::Uid, "uid") + .column_as(project_message_favorite::Column::Project, "project_uid") + .column_as(project_message_favorite::Column::Room, "room_id") + .column_as(room::Column::RoomName, "room_name") + .column_as(project_message_favorite::Column::Message, "message_id") + .column_as(room_message::Column::SenderId, "sender_id") + .column_as(Expr::cust("room_message.sender_type::text"), "sender_type") + .column_as(room_message::Column::Content, "content") + .column_as( + Expr::cust("room_message.content_type::text"), + "content_type", + ) + .column_as(room_message::Column::SendAt, "send_at") + .column_as(project_message_favorite::Column::CreatedAt, "favorited_at") + .column_as( + Expr::cust( + "coalesce(room_message.sender_id::text, room_message.sender_type::text)", + ), + "display_name", + ) + .limit(per_page) + .offset((page - 1) * per_page) + .into_tuple::<( + Uuid, + Uuid, + Uuid, + String, + Uuid, + Option, + String, + String, + String, + DateTime, + DateTime, + Option, + )>() + .all(&self.db) + .await?; + + let list = rows + .into_iter() + .map( + |( + uid, + project_uid, + room_id, + room_name, + message_id, + sender_id, + sender_type, + content, + content_type, + send_at, + favorited_at, + display_name, + )| ProjectMessageFavoriteItem { + uid, + project_uid, + room_id, + room_name, + message_id, + sender_id, + sender_type, + display_name, + content, + content_type, + send_at, + favorited_at, + }, + ) + .collect(); + + Ok(ProjectMessageFavoriteResponse { + page, + per_page, + total, + list, + }) + } + + pub async fn project_message_favorite_add( + &self, + ctx: &Session, + project_name: String, + message_id: Uuid, + ) -> Result { + let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; + let project = self + .utils_find_project_by_name(project_name.clone()) + .await?; + self.require_project_member(project.id, user_uid).await?; + + let message = room_message::Entity::find_by_id(message_id) + .one(&self.db) + .await? + .ok_or_else(|| AppError::NotFound("message".to_string()))?; + if message.revoked.is_some() { + return Err(AppError::NotFound("message".to_string())); + } + + let room = room::Entity::find_by_id(message.room) + .one(&self.db) + .await? + .ok_or_else(|| AppError::NotFound("room".to_string()))?; + if room.project != project.id { + return Err(AppError::NotFound("message".to_string())); + } + + if let Some(existing) = project_message_favorite::Entity::find() + .filter(project_message_favorite::Column::User.eq(user_uid)) + .filter(project_message_favorite::Column::Message.eq(message.id)) + .one(&self.db) + .await? + { + return Ok(ProjectMessageFavoriteItem { + uid: existing.uid, + project_uid: existing.project, + room_id: existing.room, + room_name: room.room_name, + message_id: existing.message, + sender_id: message.sender_id, + sender_type: message.sender_type.to_string(), + display_name: message + .sender_id + .map(|id| id.to_string()) + .or_else(|| Some(message.sender_type.to_string())), + content: message.content, + content_type: message.content_type.to_string(), + send_at: message.send_at, + favorited_at: existing.created_at, + }); + } + + let created = project_message_favorite::ActiveModel { + uid: Set(Uuid::new_v4()), + project: Set(project.id), + room: Set(message.room), + message: Set(message.id), + user: Set(user_uid), + created_at: Set(Utc::now()), + } + .insert(&self.db) + .await?; + + Ok(ProjectMessageFavoriteItem { + uid: created.uid, + project_uid: created.project, + room_id: created.room, + room_name: room.room_name, + message_id: created.message, + sender_id: message.sender_id, + sender_type: message.sender_type.to_string(), + display_name: message + .sender_id + .map(|id| id.to_string()) + .or_else(|| Some(message.sender_type.to_string())), + content: message.content, + content_type: message.content_type.to_string(), + send_at: message.send_at, + favorited_at: created.created_at, + }) + } + + pub async fn project_message_favorite_remove( + &self, + ctx: &Session, + project_name: String, + message_id: Uuid, + ) -> Result<(), AppError> { + let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; + let project = self.utils_find_project_by_name(project_name).await?; + self.require_project_member(project.id, user_uid).await?; + + project_message_favorite::Entity::delete_many() + .filter(project_message_favorite::Column::Project.eq(project.id)) + .filter(project_message_favorite::Column::User.eq(user_uid)) + .filter(project_message_favorite::Column::Message.eq(message_id)) + .exec(&self.db) + .await?; + Ok(()) + } + + async fn require_project_member( + &self, + project_id: Uuid, + user_id: Uuid, + ) -> Result<(), AppError> { + let member = project_members::Entity::find() + .filter(project_members::Column::Project.eq(project_id)) + .filter(project_members::Column::User.eq(user_id)) + .one(&self.db) + .await?; + if member.is_some() { + Ok(()) + } else { + Err(AppError::NoPower) + } + } +} diff --git a/libs/service/project/mod.rs b/libs/service/project/mod.rs index a5701e6..5399c7f 100644 --- a/libs/service/project/mod.rs +++ b/libs/service/project/mod.rs @@ -13,6 +13,7 @@ pub mod join_settings; pub mod labels; pub mod like; pub mod members; +pub mod message_favorite; pub mod repo; pub mod settings; pub mod standard; diff --git a/libs/service/skill/scanner.rs b/libs/service/skill/scanner.rs index 2ac1747..dd6f094 100644 --- a/libs/service/skill/scanner.rs +++ b/libs/service/skill/scanner.rs @@ -15,6 +15,10 @@ use sha1::Digest; use std::path::Path; use uuid::Uuid; +fn should_descend_dir(name: &str) -> bool { + name != ".git" +} + /// Skill discovery result from a single repository. #[derive(Debug)] pub struct DiscoveredSkill { @@ -45,13 +49,17 @@ fn git_blob_hash(content: &[u8]) -> String { hex::encode(hasher.finalize()) } +fn parse_frontmatter(frontmatter: Option<&str>) -> serde_json::Value { + frontmatter + .and_then(|fm| serde_json::from_str(fm).ok()) + .or_else(|| frontmatter.and_then(|fm| serde_yaml::from_str(fm).ok())) + .unwrap_or_default() +} + /// Parse a SKILL.md file and extract metadata + content. fn parse_skill_file(slug: &str, raw: &str) -> DiscoveredSkill { let (frontmatter, content) = extract_frontmatter(raw); - - let metadata: serde_json::Value = frontmatter - .map(|fm| serde_json::from_str(fm).unwrap_or_default()) - .unwrap_or_default(); + let metadata = parse_frontmatter(frontmatter); let name = metadata .get("name") @@ -119,7 +127,6 @@ pub fn scan_repo_for_skills( .parent() .and_then(|p| p.file_name()) .and_then(|n| n.to_str()) - .filter(|s| !s.starts_with('.')) { let slug = format!("{}/{}", repo_id_prefix, dir_name); if let Ok(raw) = std::fs::read(&path) { @@ -168,7 +175,7 @@ pub fn scan_repo_tree_for_skills( match entry.kind() { Some(git2::ObjectType::Tree) => { - if !name.starts_with('.') { + if should_descend_dir(name) { if let Ok(subtree) = entry.to_object(git_repo).and_then(|o| o.peel_to_tree()) { @@ -181,8 +188,7 @@ pub fn scan_repo_tree_for_skills( let dir_name = std::path::Path::new(&entry_path) .parent() .and_then(|p| p.file_name()) - .and_then(|n| n.to_str()) - .filter(|s| !s.starts_with('.')); + .and_then(|n| n.to_str()); let Some(dir_name) = dir_name else { continue }; let slug = format!("{}/{}", repo_id_prefix, dir_name); @@ -202,6 +208,46 @@ pub fn scan_repo_tree_for_skills( Ok(discovered) } +#[cfg(test)] +mod tests { + use super::scan_repo_tree_for_skills; + use git2::{Repository, Signature}; + use std::fs; + use tempfile::tempdir; + use uuid::Uuid; + + #[test] + fn tree_scan_discovers_skills_under_hidden_dirs() { + let temp = tempdir().expect("tempdir"); + let repo = Repository::init(temp.path()).expect("init repo"); + + let skill_dir = temp.path().join(".claude").join("skills").join("demo-skill"); + fs::create_dir_all(&skill_dir).expect("create skill dir"); + fs::write( + skill_dir.join("SKILL.md"), + "---\nname: Demo Skill\ndescription: test\n---\ncontent", + ) + .expect("write skill"); + + let mut index = repo.index().expect("index"); + index + .add_path(std::path::Path::new(".claude/skills/demo-skill/SKILL.md")) + .expect("add skill"); + index.write().expect("write index"); + let tree_id = index.write_tree().expect("write tree"); + let tree = repo.find_tree(tree_id).expect("find tree"); + let sig = Signature::now("tester", "tester@example.com").expect("signature"); + repo.commit(Some("HEAD"), &sig, &sig, "add skill", &tree, &[]) + .expect("commit"); + + let discovered = scan_repo_tree_for_skills(&repo, Uuid::nil()).expect("scan tree"); + + assert_eq!(discovered.len(), 1); + assert_eq!(discovered[0].name, "Demo Skill"); + assert_eq!(discovered[0].slug, "00000000/demo-skill"); + } +} + /// Scan a git2::Repository for skills and upsert them into the database. /// Uses filesystem walk for normal repos, git tree traversal for bare repos. pub async fn scan_and_sync_skills( diff --git a/libs/service/user/billing.rs b/libs/service/user/billing.rs index a374d23..31e8e9a 100644 --- a/libs/service/user/billing.rs +++ b/libs/service/user/billing.rs @@ -2,8 +2,7 @@ use crate::AppService; use crate::error::AppError; use chrono::{DateTime, Utc}; use models::ai::billing_error; -use models::projects::project_billing_history; -use models::users::user_billing; +use models::users::{user_billing, user_billing_history}; use sea_orm::sea_query::prelude::rust_decimal::prelude::ToPrimitive; use sea_orm::*; use serde::{Deserialize, Serialize}; @@ -51,8 +50,8 @@ pub struct UserBillingHistoryQuery { #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct UserBillingHistoryItem { pub uid: Uuid, - pub project_uid: Uuid, - pub user_uid: Option, + pub project_uid: Option, + pub user_uid: Uuid, pub amount: f64, pub currency: String, pub reason: String, @@ -152,9 +151,9 @@ impl AppService { let page = std::cmp::max(query.page.unwrap_or(1), 1); let per_page = query.per_page.unwrap_or(20).clamp(1, 200); - let paginator = project_billing_history::Entity::find() - .filter(project_billing_history::Column::User.eq(user_uid)) - .order_by_desc(project_billing_history::Column::CreatedAt) + let paginator = user_billing_history::Entity::find() + .filter(user_billing_history::Column::User.eq(user_uid)) + .order_by_desc(user_billing_history::Column::CreatedAt) .paginate(&self.db, per_page); let total = paginator.num_items().await?; let rows = paginator.fetch_page(page - 1).await?; @@ -163,7 +162,7 @@ impl AppService { .into_iter() .map(|x| UserBillingHistoryItem { uid: x.uid, - project_uid: x.project, + project_uid: None, user_uid: x.user, amount: x.amount.to_f64().unwrap_or_default(), currency: x.currency,