gitdataai/libs/observability/src/tracing_middleware.rs
ZhenYi 6eb65a5c65 feat(observability): inject _msg field for VictoriaLogs compatibility
Add MsgJsonFormat custom event formatter that outputs JSON with _msg as
the first field, required by VictoriaLogs for full-text search. HTTP
middleware stores interpolated "METHOD /path" in thread-local buffer
for the formatter to read on span-close events.
2026-04-26 13:31:05 +08:00

87 lines
2.7 KiB
Rust

//! Actix-web middleware that creates a `tracing::Span` for each HTTP request.
//!
//! The span is set as the current context so all downstream async code is
//! automatically instrumented. When the `tracing_opentelemetry` layer is
//! active, spans are exported via OTLP.
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
use std::sync::Arc;
use std::task::{Context, Poll};
/// Actix-web middleware that creates a tracing span per request.
///
/// This is a zero-sized marker type: it carries no configuration. The
/// per-request work is done by the service built in its `Transform`
/// implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct TracingSpanMiddleware;

impl TracingSpanMiddleware {
    /// Creates the middleware.
    ///
    /// Equivalent to [`TracingSpanMiddleware::default`]; usable in `const`
    /// contexts because the type has no fields.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
/// Middleware factory: turns the next service in the chain into a
/// [`TracingSpanMiddlewareService`] that wraps it.
impl<S, B> Transform<S, ServiceRequest> for TracingSpanMiddleware
where
    S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
    S::Future: 'static,
    B: 'static,
{
    type Response = ServiceResponse<B>;
    type Error = actix_web::Error;
    type Transform = TracingSpanMiddlewareService<S>;
    type InitError = ();
    type Future = std::future::Ready<Result<Self::Transform, Self::InitError>>;

    /// Wraps `service` in an `Arc` and returns the tracing service
    /// immediately; construction cannot fail.
    fn new_transform(&self, service: S) -> Self::Future {
        let wrapped = TracingSpanMiddlewareService {
            service: Arc::new(service),
        };
        std::future::ready(Ok(wrapped))
    }
}
/// Service produced by `TracingSpanMiddleware`; it holds the wrapped
/// downstream service.
pub struct TracingSpanMiddlewareService<S> {
    /// The downstream service, shared so a handle can be moved into each
    /// per-request future.
    service: Arc<S>,
}

// Implemented by hand rather than derived so that cloning only bumps the
// `Arc` reference count and does not require `S: Clone`.
impl<S> Clone for TracingSpanMiddlewareService<S> {
    fn clone(&self) -> Self {
        let service = Arc::clone(&self.service);
        Self { service }
    }
}
impl<S, B> Service<ServiceRequest> for TracingSpanMiddlewareService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = actix_web::Error> + 'static,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = actix_web::Error;
type Future = std::pin::Pin<Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + 'static>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&self, req: ServiceRequest) -> Self::Future {
let method = req.method().to_string();
let path = req.path().to_string();
let service = self.service.clone();
Box::pin(async move {
// Set _msg for VictoriaLogs before entering the span.
// The JSON formatter reads this thread-local value.
crate::msg_json_fmt::set_span_msg(format!("{} {}", method, path));
let span = tracing::info_span!("HTTP {method} {path}", method = %method, path = %path);
let _guard = span.enter();
let res = service.call(req).await?;
Ok(res)
})
}
}