From 52e1831452ed2651f054cc332483a7981c95a84e Mon Sep 17 00:00:00 2001 From: ZhenYi <434836402@qq.com> Date: Thu, 14 May 2026 10:02:15 +0800 Subject: [PATCH] refactor(observability,queue): apply rustfmt formatting --- libs/observability/src/business_metrics.rs | 218 +++++++++++++++--- libs/observability/src/lib.rs | 24 +- libs/observability/src/metrics_middleware.rs | 44 +++- libs/observability/src/msg_json_fmt.rs | 17 +- libs/observability/src/otlp.rs | 15 +- libs/observability/src/prometheus_exporter.rs | 5 +- libs/observability/src/push.rs | 15 +- libs/observability/src/tracing_fmt.rs | 2 +- libs/observability/src/tracing_init.rs | 5 +- libs/observability/src/tracing_middleware.rs | 4 +- libs/queue/lib.rs | 6 +- libs/queue/nats_client.rs | 16 +- libs/queue/producer.rs | 41 ++-- libs/queue/worker.rs | 75 +++--- 14 files changed, 331 insertions(+), 156 deletions(-) diff --git a/libs/observability/src/business_metrics.rs b/libs/observability/src/business_metrics.rs index ae0eca1..327b690 100644 --- a/libs/observability/src/business_metrics.rs +++ b/libs/observability/src/business_metrics.rs @@ -89,61 +89,205 @@ pub const ACTIVE_ROOM_PARTICIPANTS: &str = "active_room_participants"; /// Called from `install_recorder()` in `prometheus_exporter.rs`. pub fn describe_business_metrics() { // Project - metrics::describe_counter!(PROJECTS_CREATED_TOTAL, metrics::Unit::Count, "Projects created"); - metrics::describe_counter!(PROJECTS_DELETED_TOTAL, metrics::Unit::Count, "Projects deleted"); - metrics::describe_counter!(PROJECT_MEMBERS_ADDED_TOTAL, metrics::Unit::Count, "Project members added"); - metrics::describe_counter!(PROJECT_MEMBERS_REMOVED_TOTAL, metrics::Unit::Count, "Project members removed"); - metrics::describe_counter!(PROJECT_MEMBERS_ROLE_CHANGED_TOTAL, metrics::Unit::Count, "Project member role changes"); + metrics::describe_counter!( + PROJECTS_CREATED_TOTAL, + metrics::Unit::Count, + "Projects created" + ); + metrics::describe_counter!( + PROJECTS_DELETED_TOTAL, + metrics::Unit::Count, + "Projects deleted" + ); + metrics::describe_counter!( + PROJECT_MEMBERS_ADDED_TOTAL, + metrics::Unit::Count, + "Project members added" + ); + metrics::describe_counter!( + PROJECT_MEMBERS_REMOVED_TOTAL, + metrics::Unit::Count, + "Project members removed" + ); + metrics::describe_counter!( + PROJECT_MEMBERS_ROLE_CHANGED_TOTAL, + metrics::Unit::Count, + "Project member role changes" + ); metrics::describe_counter!(PROJECT_LIKES_TOTAL, metrics::Unit::Count, "Project likes"); - metrics::describe_counter!(PROJECT_UNLIKES_TOTAL, metrics::Unit::Count, "Project unlikes"); - metrics::describe_counter!(PROJECT_WATCHES_TOTAL, metrics::Unit::Count, "Project watches"); - metrics::describe_counter!(PROJECT_UNWATCHES_TOTAL, metrics::Unit::Count, "Project unwatches"); + metrics::describe_counter!( + PROJECT_UNLIKES_TOTAL, + metrics::Unit::Count, + "Project unlikes" + ); + metrics::describe_counter!( + PROJECT_WATCHES_TOTAL, + metrics::Unit::Count, + "Project watches" + ); + metrics::describe_counter!( + PROJECT_UNWATCHES_TOTAL, + metrics::Unit::Count, + "Project unwatches" + ); // Issue metrics::describe_counter!(ISSUES_OPENED_TOTAL, metrics::Unit::Count, "Issues opened"); metrics::describe_counter!(ISSUES_CLOSED_TOTAL, metrics::Unit::Count, "Issues closed"); - metrics::describe_counter!(ISSUES_REOPENED_TOTAL, metrics::Unit::Count, "Issues reopened"); + metrics::describe_counter!( + ISSUES_REOPENED_TOTAL, + metrics::Unit::Count, + "Issues reopened" + ); metrics::describe_counter!(ISSUES_DELETED_TOTAL, metrics::Unit::Count, "Issues deleted"); metrics::describe_counter!(ISSUES_UPDATED_TOTAL, metrics::Unit::Count, "Issues updated"); - metrics::describe_counter!(ISSUE_COMMENTS_CREATED_TOTAL, metrics::Unit::Count, "Issue comments created"); - metrics::describe_counter!(ISSUE_COMMENTS_DELETED_TOTAL, metrics::Unit::Count, "Issue comments deleted"); + metrics::describe_counter!( + ISSUE_COMMENTS_CREATED_TOTAL, + metrics::Unit::Count, + "Issue comments created" + ); + metrics::describe_counter!( + ISSUE_COMMENTS_DELETED_TOTAL, + metrics::Unit::Count, + "Issue comments deleted" + ); // Pull Request - metrics::describe_counter!(PRS_OPENED_TOTAL, metrics::Unit::Count, "Pull requests opened"); - metrics::describe_counter!(PRS_MERGED_TOTAL, metrics::Unit::Count, "Pull requests merged"); - metrics::describe_counter!(PRS_CLOSED_TOTAL, metrics::Unit::Count, "Pull requests closed (without merge)"); - metrics::describe_counter!(PRS_UPDATED_TOTAL, metrics::Unit::Count, "Pull requests updated"); - metrics::describe_counter!(PR_REVIEWS_SUBMITTED_TOTAL, metrics::Unit::Count, "PR reviews submitted"); - metrics::describe_counter!(PR_REVIEW_COMMENTS_TOTAL, metrics::Unit::Count, "PR review comments"); + metrics::describe_counter!( + PRS_OPENED_TOTAL, + metrics::Unit::Count, + "Pull requests opened" + ); + metrics::describe_counter!( + PRS_MERGED_TOTAL, + metrics::Unit::Count, + "Pull requests merged" + ); + metrics::describe_counter!( + PRS_CLOSED_TOTAL, + metrics::Unit::Count, + "Pull requests closed (without merge)" + ); + metrics::describe_counter!( + PRS_UPDATED_TOTAL, + metrics::Unit::Count, + "Pull requests updated" + ); + metrics::describe_counter!( + PR_REVIEWS_SUBMITTED_TOTAL, + metrics::Unit::Count, + "PR reviews submitted" + ); + metrics::describe_counter!( + PR_REVIEW_COMMENTS_TOTAL, + metrics::Unit::Count, + "PR review comments" + ); // Room - metrics::describe_counter!(ROOMS_CREATED_TOTAL, metrics::Unit::Count, "Chat rooms created"); - metrics::describe_counter!(ROOMS_DELETED_TOTAL, metrics::Unit::Count, "Chat rooms deleted"); - metrics::describe_counter!(ROOMS_UPDATED_TOTAL, metrics::Unit::Count, "Chat rooms updated"); - metrics::describe_counter!(ROOM_MESSAGES_SENT_TOTAL, metrics::Unit::Count, "Room messages sent (human)"); - metrics::describe_counter!(ROOM_MESSAGES_AI_TOTAL, metrics::Unit::Count, "Room messages sent (AI)"); - metrics::describe_counter!(ROOM_THREADS_CREATED_TOTAL, metrics::Unit::Count, "Room threads created"); + metrics::describe_counter!( + ROOMS_CREATED_TOTAL, + metrics::Unit::Count, + "Chat rooms created" + ); + metrics::describe_counter!( + ROOMS_DELETED_TOTAL, + metrics::Unit::Count, + "Chat rooms deleted" + ); + metrics::describe_counter!( + ROOMS_UPDATED_TOTAL, + metrics::Unit::Count, + "Chat rooms updated" + ); + metrics::describe_counter!( + ROOM_MESSAGES_SENT_TOTAL, + metrics::Unit::Count, + "Room messages sent (human)" + ); + metrics::describe_counter!( + ROOM_MESSAGES_AI_TOTAL, + metrics::Unit::Count, + "Room messages sent (AI)" + ); + metrics::describe_counter!( + ROOM_THREADS_CREATED_TOTAL, + metrics::Unit::Count, + "Room threads created" + ); // Repo / Git metrics::describe_counter!(REPOS_CREATED_TOTAL, metrics::Unit::Count, "Repos created"); - metrics::describe_counter!(GIT_COMMITS_PUSHED_TOTAL, metrics::Unit::Count, "Git commits pushed"); - metrics::describe_counter!(GIT_BRANCHES_CREATED_TOTAL, metrics::Unit::Count, "Git branches created"); - metrics::describe_counter!(GIT_BRANCHES_DELETED_TOTAL, metrics::Unit::Count, "Git branches deleted"); - metrics::describe_counter!(GIT_TAGS_CREATED_TOTAL, metrics::Unit::Count, "Git tags created"); - metrics::describe_counter!(GIT_TAGS_DELETED_TOTAL, metrics::Unit::Count, "Git tags deleted"); - metrics::describe_counter!(GIT_CLONES_TOTAL, metrics::Unit::Count, "Git clone/fetch operations"); + metrics::describe_counter!( + GIT_COMMITS_PUSHED_TOTAL, + metrics::Unit::Count, + "Git commits pushed" + ); + metrics::describe_counter!( + GIT_BRANCHES_CREATED_TOTAL, + metrics::Unit::Count, + "Git branches created" + ); + metrics::describe_counter!( + GIT_BRANCHES_DELETED_TOTAL, + metrics::Unit::Count, + "Git branches deleted" + ); + metrics::describe_counter!( + GIT_TAGS_CREATED_TOTAL, + metrics::Unit::Count, + "Git tags created" + ); + metrics::describe_counter!( + GIT_TAGS_DELETED_TOTAL, + metrics::Unit::Count, + "Git tags deleted" + ); + metrics::describe_counter!( + GIT_CLONES_TOTAL, + metrics::Unit::Count, + "Git clone/fetch operations" + ); // Billing - metrics::describe_counter!(BILLING_CREDITS_USED_TOTAL, metrics::Unit::Count, "Billing credits consumed"); + metrics::describe_counter!( + BILLING_CREDITS_USED_TOTAL, + metrics::Unit::Count, + "Billing credits consumed" + ); metrics::describe_counter!(BILLING_ERRORS_TOTAL, metrics::Unit::Count, "Billing errors"); - metrics::describe_counter!(BILLING_CREDITS_ADDED_TOTAL, metrics::Unit::Count, "Billing credits added (top-up)"); + metrics::describe_counter!( + BILLING_CREDITS_ADDED_TOTAL, + metrics::Unit::Count, + "Billing credits added (top-up)" + ); // AI - metrics::describe_counter!(AI_ROOM_CALLS_TOTAL, metrics::Unit::Count, "AI calls in room context"); - metrics::describe_counter!(AI_CHAT_CONVERSATIONS_CREATED, metrics::Unit::Count, "AI chat conversations created"); - metrics::describe_counter!(AI_CHAT_MESSAGES_SENT, metrics::Unit::Count, "AI chat messages sent"); + metrics::describe_counter!( + AI_ROOM_CALLS_TOTAL, + metrics::Unit::Count, + "AI calls in room context" + ); + metrics::describe_counter!( + AI_CHAT_CONVERSATIONS_CREATED, + metrics::Unit::Count, + "AI chat conversations created" + ); + metrics::describe_counter!( + AI_CHAT_MESSAGES_SENT, + metrics::Unit::Count, + "AI chat messages sent" + ); // Gauges - metrics::describe_gauge!(ACTIVE_CONNECTIONS, metrics::Unit::Count, "Active WebSocket connections"); - metrics::describe_gauge!(ACTIVE_ROOM_PARTICIPANTS, metrics::Unit::Count, "Active room participants"); -} \ No newline at end of file + metrics::describe_gauge!( + ACTIVE_CONNECTIONS, + metrics::Unit::Count, + "Active WebSocket connections" + ); + metrics::describe_gauge!( + ACTIVE_ROOM_PARTICIPANTS, + metrics::Unit::Count, + "Active room participants" + ); +} diff --git a/libs/observability/src/lib.rs b/libs/observability/src/lib.rs index 026fb99..15a1e70 100644 --- a/libs/observability/src/lib.rs +++ b/libs/observability/src/lib.rs @@ -3,24 +3,24 @@ //! Call `observability::init_tracing_subscriber(level)` once at startup. //! All services then use `tracing::info!`, `tracing::warn!`, etc. directly. +pub mod business_metrics; +pub mod metrics_middleware; +pub mod msg_json_fmt; +pub mod otlp; +pub mod prometheus_exporter; +pub mod push; pub mod tracing_fmt; pub mod tracing_init; -pub mod msg_json_fmt; -pub mod metrics_middleware; -pub mod prometheus_exporter; -pub mod business_metrics; -pub mod otlp; pub mod tracing_middleware; -pub mod push; -pub use tracing_fmt::{init_tracing_subscriber, instance_id}; +pub use metrics_middleware::{HttpMetrics, MetricsMiddleware}; pub use msg_json_fmt::set_span_msg; -pub use metrics_middleware::{MetricsMiddleware, HttpMetrics}; pub use prometheus_exporter::{ - install_recorder, prometheus_handler, spawn_http_metrics_poller, - HttpMetricsSnapshot, HttpSnapshotGuard, render_to_hashmap, + HttpMetricsSnapshot, HttpSnapshotGuard, install_recorder, prometheus_handler, + render_to_hashmap, spawn_http_metrics_poller, }; +pub use tracing_fmt::{init_tracing_subscriber, instance_id}; pub type PrometheusHandle = metrics_exporter_prometheus::PrometheusHandle; -pub use otlp::{init_otlp, OtelGuard}; -pub use tracing_middleware::TracingSpanMiddleware; pub use business_metrics::*; +pub use otlp::{OtelGuard, init_otlp}; +pub use tracing_middleware::TracingSpanMiddleware; diff --git a/libs/observability/src/metrics_middleware.rs b/libs/observability/src/metrics_middleware.rs index 36fce0e..d92fd16 100644 --- a/libs/observability/src/metrics_middleware.rs +++ b/libs/observability/src/metrics_middleware.rs @@ -37,7 +37,10 @@ impl HttpMetrics { /// Increment the counter for a specific HTTP endpoint (method + path). pub fn incr_endpoint(&self, method: &str, path: &str) { let key = format!("{} {}", method, path); - let mut map = self.endpoint_counts.write().unwrap_or_else(|e| e.into_inner()); + let mut map = self + .endpoint_counts + .write() + .unwrap_or_else(|e| e.into_inner()); let counter = map.entry(key).or_insert_with(|| AtomicU64::new(0)); counter.fetch_add(1, Ordering::Relaxed); } @@ -45,19 +48,40 @@ impl HttpMetrics { /// Returns a snapshot of all current counter values. pub fn snapshot(&self) -> HashMap { let mut m = HashMap::new(); - m.insert("http_requests_total".into(), serde_json::json!(self.request_count.load(Ordering::Relaxed))); - m.insert("http_request_duration_ms_total".into(), serde_json::json!(self.total_duration_ms.load(Ordering::Relaxed))); - m.insert("http_requests_2xx".into(), serde_json::json!(self.status_2xx.load(Ordering::Relaxed))); - m.insert("http_requests_4xx".into(), serde_json::json!(self.status_4xx.load(Ordering::Relaxed))); - m.insert("http_requests_5xx".into(), serde_json::json!(self.status_5xx.load(Ordering::Relaxed))); + m.insert( + "http_requests_total".into(), + serde_json::json!(self.request_count.load(Ordering::Relaxed)), + ); + m.insert( + "http_request_duration_ms_total".into(), + serde_json::json!(self.total_duration_ms.load(Ordering::Relaxed)), + ); + m.insert( + "http_requests_2xx".into(), + serde_json::json!(self.status_2xx.load(Ordering::Relaxed)), + ); + m.insert( + "http_requests_4xx".into(), + serde_json::json!(self.status_4xx.load(Ordering::Relaxed)), + ); + m.insert( + "http_requests_5xx".into(), + serde_json::json!(self.status_5xx.load(Ordering::Relaxed)), + ); // Per-endpoint counters - let map = self.endpoint_counts.read().unwrap_or_else(|e| e.into_inner()); + let map = self + .endpoint_counts + .read() + .unwrap_or_else(|e| e.into_inner()); for (key, counter) in map.iter() { // Sanitize key for use as metric name: replace spaces and slashes with underscores let sanitized = key.replace([' ', '/'], "_").to_lowercase(); let metric_key = format!("http_endpoint_{}", sanitized); - m.insert(metric_key, serde_json::json!(counter.load(Ordering::Relaxed))); + m.insert( + metric_key, + serde_json::json!(counter.load(Ordering::Relaxed)), + ); } m @@ -139,7 +163,9 @@ where // Update counters atomically. metrics.request_count.fetch_add(1, Ordering::Relaxed); - metrics.total_duration_ms.fetch_add(elapsed_ms, Ordering::Relaxed); + metrics + .total_duration_ms + .fetch_add(elapsed_ms, Ordering::Relaxed); metrics.incr_endpoint(&method, &path); match status_code { diff --git a/libs/observability/src/msg_json_fmt.rs b/libs/observability/src/msg_json_fmt.rs index 1b98a0f..df8306b 100644 --- a/libs/observability/src/msg_json_fmt.rs +++ b/libs/observability/src/msg_json_fmt.rs @@ -63,8 +63,10 @@ impl Visit for FieldCollector { if field.name() == "message" { self.message = Some(value.to_string()); } else { - self.fields - .insert(field.name().to_string(), serde_json::Value::String(value.to_string())); + self.fields.insert( + field.name().to_string(), + serde_json::Value::String(value.to_string()), + ); } } @@ -91,10 +93,8 @@ impl Visit for FieldCollector { } fn record_bool(&mut self, field: &Field, value: bool) { - self.fields.insert( - field.name().to_string(), - serde_json::Value::Bool(value), - ); + self.fields + .insert(field.name().to_string(), serde_json::Value::Bool(value)); } } @@ -145,7 +145,10 @@ where ); } if let Some(file) = event.metadata().file() { - ordered.insert("file".to_string(), serde_json::Value::String(file.to_string())); + ordered.insert( + "file".to_string(), + serde_json::Value::String(file.to_string()), + ); } if let Some(line) = event.metadata().line() { ordered.insert("line".to_string(), serde_json::Value::Number(line.into())); diff --git a/libs/observability/src/otlp.rs b/libs/observability/src/otlp.rs index 6618776..a259a65 100644 --- a/libs/observability/src/otlp.rs +++ b/libs/observability/src/otlp.rs @@ -13,7 +13,9 @@ use crate::msg_json_fmt::MsgJsonFormat; use opentelemetry::trace::TracerProvider; use opentelemetry_otlp::{SpanExporter, WithExportConfig}; use opentelemetry_sdk::trace as sdktrace; -use tracing_subscriber::{fmt, fmt::Layer as FmtLayer, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; +use tracing_subscriber::{ + EnvFilter, fmt, fmt::Layer as FmtLayer, layer::SubscriberExt, util::SubscriberInitExt, +}; /// Guard that shuts down the OTLP pipeline on drop. #[must_use] @@ -58,11 +60,10 @@ pub fn init_otlp( .build() .map_err(|e| InitOtlError::ExporterInit(e.to_string()))?; - let env_filter = EnvFilter::try_from_default_env() - .unwrap_or_else(|_| EnvFilter::new(log_level)); + let env_filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(log_level)); - let fmt_layer: FmtLayer<_, _, _, _> = fmt::layer() - .event_format(MsgJsonFormat); + let fmt_layer: FmtLayer<_, _, _, _> = fmt::layer().event_format(MsgJsonFormat); let tracer_provider = sdktrace::SdkTracerProvider::builder() .with_batch_exporter(exporter) @@ -82,7 +83,9 @@ pub fn init_otlp( tracing::debug!(endpoint = %endpoint, "OTLP tracer installed"); - Ok(Some(OtelGuard { provider: tracer_provider })) + Ok(Some(OtelGuard { + provider: tracer_provider, + })) } #[derive(Debug, thiserror::Error)] diff --git a/libs/observability/src/prometheus_exporter.rs b/libs/observability/src/prometheus_exporter.rs index 89aca34..3d771bc 100644 --- a/libs/observability/src/prometheus_exporter.rs +++ b/libs/observability/src/prometheus_exporter.rs @@ -12,8 +12,8 @@ //! `RoomMetrics` must be constructed **after** `install_recorder()` is called, //! because its `register_*` macro calls require a global recorder to be set. -use actix_web::{web, HttpRequest, HttpResponse}; -use metrics::{describe_counter, describe_histogram, Unit}; +use actix_web::{HttpRequest, HttpResponse, web}; +use metrics::{Unit, describe_counter, describe_histogram}; use metrics_exporter_prometheus::PrometheusBuilder; use std::collections::HashMap; use std::sync::atomic::Ordering; @@ -97,7 +97,6 @@ pub fn render_to_hashmap(body: &str) -> HashMap { /// Re-export `HttpMetrics` so callers don't need to import from `metrics_middleware`. pub use crate::metrics_middleware::HttpMetrics; - /// Live HTTP metric values updated by the background poller. #[derive(Debug, Clone, Default)] pub struct HttpMetricsSnapshot { diff --git a/libs/observability/src/push.rs b/libs/observability/src/push.rs index dcfe3ca..611a95d 100644 --- a/libs/observability/src/push.rs +++ b/libs/observability/src/push.rs @@ -18,8 +18,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -use crate::metrics_middleware::HttpMetrics; use crate::instance_id; +use crate::metrics_middleware::HttpMetrics; // ── Payload types ────────────────────────────────────────────────────────────── @@ -265,7 +265,7 @@ impl MetricsPusher { system, business: business_filtered, token_usage, - tasks: None, // Populated by apps that have task queues + tasks: None, // Populated by apps that have task queues latency: HashMap::new(), // Populated from histogram data logs: Vec::new(), } @@ -310,7 +310,9 @@ mod tests { fn payload_serialises_to_json() { let _pusher = MetricsPusher::new("http://localhost:9090", "test-app"); let metrics = HttpMetrics::new(); - metrics.request_count.fetch_add(42, std::sync::atomic::Ordering::Relaxed); + metrics + .request_count + .fetch_add(42, std::sync::atomic::Ordering::Relaxed); let _handle = metrics_exporter_prometheus::PrometheusBuilder::new() .build_recorder() .handle(); @@ -318,7 +320,10 @@ mod tests { // Just check the HTTP payload portion. let snapshot = metrics.snapshot(); let http = HttpPayload { - requests_total: snapshot.get("http_requests_total").and_then(|v| v.as_u64()).unwrap_or(0), + requests_total: snapshot + .get("http_requests_total") + .and_then(|v| v.as_u64()) + .unwrap_or(0), request_duration_ms_total: 0, requests_2xx: 0, requests_4xx: 0, @@ -335,4 +340,4 @@ mod tests { assert!(payload.uptime_secs >= 100); assert!(payload.memory_total_mb > 0); } -} \ No newline at end of file +} diff --git a/libs/observability/src/tracing_fmt.rs b/libs/observability/src/tracing_fmt.rs index 9c70c0c..d3475f1 100644 --- a/libs/observability/src/tracing_fmt.rs +++ b/libs/observability/src/tracing_fmt.rs @@ -11,10 +11,10 @@ use std::io::IsTerminal; use std::str::FromStr; use tracing_subscriber::Layer; use tracing_subscriber::{ + EnvFilter, fmt::{self, format::FmtSpan}, layer::SubscriberExt, util::SubscriberInitExt, - EnvFilter, }; /// Global instance identifier, resolved once at startup. diff --git a/libs/observability/src/tracing_init.rs b/libs/observability/src/tracing_init.rs index 3c30f8e..04caa3a 100644 --- a/libs/observability/src/tracing_init.rs +++ b/libs/observability/src/tracing_init.rs @@ -3,16 +3,15 @@ //! Call `init_tracing()` during application startup to set up the //! tracing-subscriber fmt layer (writes human-readable spans to stderr). -use tracing_subscriber::{fmt, EnvFilter}; use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::{EnvFilter, fmt}; /// Initialize tracing with a fmt layer. /// /// The `EnvFilter` reads the `RUST_LOG` environment variable to /// set the log level (e.g. `RUST_LOG=info`). pub fn init_tracing() { - let env_filter = EnvFilter::try_from_default_env() - .unwrap_or_else(|_| EnvFilter::new("info")); + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); let fmt_layer = fmt::layer() .with_target(true) diff --git a/libs/observability/src/tracing_middleware.rs b/libs/observability/src/tracing_middleware.rs index e44b956..6489cf2 100644 --- a/libs/observability/src/tracing_middleware.rs +++ b/libs/observability/src/tracing_middleware.rs @@ -62,7 +62,9 @@ where { type Response = ServiceResponse; type Error = actix_web::Error; - type Future = std::pin::Pin> + 'static>>; + type Future = std::pin::Pin< + Box> + 'static>, + >; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx) diff --git a/libs/queue/lib.rs b/libs/queue/lib.rs index 3d5d361..3e40491 100644 --- a/libs/queue/lib.rs +++ b/libs/queue/lib.rs @@ -12,6 +12,6 @@ pub use types::{ ReactionGroup, RoomMessageEnvelope, RoomMessageEvent, RoomMessageStreamChunkEvent, TypingEvent, }; pub use worker::{ - room_worker_task, start as start_worker, start_email_worker, EmailSendFn, EmailSendFut, - NatsConsumeFn, PersistFn, -}; \ No newline at end of file + EmailSendFn, EmailSendFut, NatsConsumeFn, PersistFn, room_worker_task, start as start_worker, + start_email_worker, +}; diff --git a/libs/queue/nats_client.rs b/libs/queue/nats_client.rs index 7a827b0..d977bfc 100644 --- a/libs/queue/nats_client.rs +++ b/libs/queue/nats_client.rs @@ -66,10 +66,7 @@ impl NatsClient { if !is { attempts += 1; if attempts > 12 { - tracing::error!( - "NATS disconnected for {}s", - attempts * 5 - ); + tracing::error!("NATS disconnected for {}s", attempts * 5); } } } @@ -136,11 +133,7 @@ impl NatsClient { /// Publish to core NATS for real-time broadcast. Fire-and-forget. pub async fn core_publish(&self, subject: String, payload: Vec) { - if let Err(e) = self - .client - .publish(subject.clone(), payload.into()) - .await - { + if let Err(e) = self.client.publish(subject.clone(), payload.into()).await { tracing::warn!(subject = %subject, error = %e, "NATS core publish failed"); } } @@ -166,10 +159,7 @@ impl NatsClient { } /// Create a core NATS subscription. Returns a subscriber receiver. - pub async fn subscribe( - &self, - subject: &str, - ) -> anyhow::Result { + pub async fn subscribe(&self, subject: &str) -> anyhow::Result { self.client .subscribe(subject.to_string()) .await diff --git a/libs/queue/producer.rs b/libs/queue/producer.rs index a264eb4..dd0c7b0 100644 --- a/libs/queue/producer.rs +++ b/libs/queue/producer.rs @@ -18,21 +18,26 @@ pub type NatsPublishResult = u64; pub struct MessageProducer { /// JetStream publish function for durable/persisted messages. pub jetstream_publish: Arc< - dyn Fn(String, Vec) -> std::pin::Pin< - Box> + Send>, - > + Send + dyn Fn( + String, + Vec, + ) + -> std::pin::Pin> + Send>> + + Send + Sync, >, /// Core NATS publish function for real-time broadcast (fire-and-forget). pub core_publish: Arc< - dyn Fn(String, Vec) -> std::pin::Pin< - Box + Send>, - > + Send + dyn Fn(String, Vec) -> std::pin::Pin + Send>> + + Send + Sync, >, /// Redis connection getter — kept for cache/seq access (notification count, etc.) - pub get_redis: - Arc tokio::task::JoinHandle> + Send + Sync>, + pub get_redis: Arc< + dyn Fn() -> tokio::task::JoinHandle> + + Send + + Sync, + >, /// Direct NATS client reference for subscriptions (watch endpoints, etc.) pub nats: Option>, } @@ -47,7 +52,10 @@ impl MessageProducer { >, ) -> Self { let js_fn: Arc< - dyn Fn(String, Vec) -> std::pin::Pin< + dyn Fn( + String, + Vec, + ) -> std::pin::Pin< Box> + Send>, > + Send + Sync, @@ -64,9 +72,12 @@ impl MessageProducer { }; let core_fn: Arc< - dyn Fn(String, Vec) -> std::pin::Pin< - Box + Send>, - > + Send + dyn Fn( + String, + Vec, + ) + -> std::pin::Pin + Send>> + + Send + Sync, > = if let Some(ref n) = nats { let n = n.clone(); @@ -75,9 +86,7 @@ impl MessageProducer { Box::pin(async move { n.core_publish(subject, payload).await }) as _ }) } else { - Arc::new(|_subject: String, _payload: Vec| { - Box::pin(async move {}) as _ - }) + Arc::new(|_subject: String, _payload: Vec| Box::pin(async move {}) as _) }; Self { @@ -221,4 +230,4 @@ impl MessageProducer { tracing::warn!(error = %e, conversation_id = %event.conversation_id, "JetStream chat chunk publish failed"); } } -} \ No newline at end of file +} diff --git a/libs/queue/worker.rs b/libs/queue/worker.rs index 2537adf..0eb4163 100644 --- a/libs/queue/worker.rs +++ b/libs/queue/worker.rs @@ -1,6 +1,6 @@ //! Room message worker: NATS JetStream durable pull consumer. -use crate::types::{EmailEnvelope, RoomMessageEvent, RoomMessageEnvelope}; +use crate::types::{EmailEnvelope, RoomMessageEnvelope, RoomMessageEvent}; use futures::StreamExt; use metrics::counter; use std::sync::Arc; @@ -19,7 +19,14 @@ pub type NatsConsumeFn = Arc< Output = anyhow::Result< Vec<( Vec, - Box std::pin::Pin> + Send>> + Send>, + Box< + dyn Fn() -> std::pin::Pin< + Box< + dyn std::future::Future> + + Send, + >, + > + Send, + >, )>, >, > + Send, @@ -30,7 +37,8 @@ pub type NatsConsumeFn = Arc< /// Function that persists a batch of room message envelopes to the database. pub type PersistFn = Arc) -> PersistFut + Send + Sync>; -pub type PersistFut = std::pin::Pin> + Send>>; +pub type PersistFut = + std::pin::Pin> + Send>>; /// Start the room message worker that consumes from NATS JetStream per room. pub async fn start( @@ -131,25 +139,18 @@ async fn consume_once( // Fetch up to BATCH_SIZE messages for _ in 0..BATCH_SIZE { - match tokio::time::timeout( - std::time::Duration::from_millis(500), - messages.next(), - ) - .await - { - Ok(Some(Ok(msg))) => { - match serde_json::from_slice::(&msg.payload) { - Ok(event) => { - let env = RoomMessageEnvelope::from(event); - batch.push(env); - acks.push(msg); - } - Err(e) => { - tracing::warn!(error = %e, "malformed envelope"); - let _ = msg.ack().await; - } + match tokio::time::timeout(std::time::Duration::from_millis(500), messages.next()).await { + Ok(Some(Ok(msg))) => match serde_json::from_slice::(&msg.payload) { + Ok(event) => { + let env = RoomMessageEnvelope::from(event); + batch.push(env); + acks.push(msg); } - } + Err(e) => { + tracing::warn!(error = %e, "malformed envelope"); + let _ = msg.ack().await; + } + }, Ok(Some(Err(e))) => { tracing::warn!(error = %e, "message error"); } @@ -181,7 +182,8 @@ async fn consume_once( /// Email send function type. pub type EmailSendFn = Arc) -> EmailSendFut + Send + Sync>; -pub type EmailSendFut = std::pin::Pin> + Send>>; +pub type EmailSendFut = + std::pin::Pin> + Send>>; /// Start the email worker that consumes from NATS JetStream. pub async fn start_email_worker( @@ -255,24 +257,17 @@ async fn email_consume_once( let mut acks: Vec = Vec::new(); for _ in 0..BATCH_SIZE { - match tokio::time::timeout( - std::time::Duration::from_millis(500), - messages.next(), - ) - .await - { - Ok(Some(Ok(msg))) => { - match serde_json::from_slice::(&msg.payload) { - Ok(env) => { - batch.push(env); - acks.push(msg); - } - Err(e) => { - tracing::warn!(error = %e, "malformed email envelope"); - let _ = msg.ack().await; - } + match tokio::time::timeout(std::time::Duration::from_millis(500), messages.next()).await { + Ok(Some(Ok(msg))) => match serde_json::from_slice::(&msg.payload) { + Ok(env) => { + batch.push(env); + acks.push(msg); } - } + Err(e) => { + tracing::warn!(error = %e, "malformed email envelope"); + let _ = msg.ack().await; + } + }, Ok(Some(Err(e))) => { tracing::warn!(error = %e, "email message error"); } @@ -302,4 +297,4 @@ async fn email_consume_once( tracing::info!(n = batch_size, "email batch sent and acked"); Ok(batch_size) -} \ No newline at end of file +}