gitdataai/libs/observability/src/otlp.rs
ZhenYi 6eb65a5c65 feat(observability): inject _msg field for VictoriaLogs compatibility
Add MsgJsonFormat custom event formatter that outputs JSON with _msg as
the first field, required by VictoriaLogs for full-text search. HTTP
middleware stores interpolated "METHOD /path" in thread-local buffer
for the formatter to read on span-close events.
2026-04-26 13:31:05 +08:00

99 lines
3.2 KiB
Rust

//! OTLP tracer initialisation (Phase 6).
//!
//! Uses HTTP/proto transport to the OTLP endpoint.
//! The endpoint URL is passed as-is to the HTTP exporter.
//! Default Kubernetes otel-collector-agent accepts HTTP on :4318.
//!
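//! `init_otlp()` builds its own subscriber (env filter, fmt layer using
//! `MsgJsonFormat`, and the OTLP tracing layer) and installs it as the global
//! default with `try_init()`. It cannot extend a subscriber that is already
//! installed: if a global default has been set beforehand, `init_otlp()`
//! returns a `SubscriberInit` error instead of replacing it.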
use crate::msg_json_fmt::MsgJsonFormat;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::{SpanExporter, WithExportConfig};
use opentelemetry_sdk::trace as sdktrace;
use tracing_subscriber::{fmt, fmt::Layer as FmtLayer, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
/// Guard that owns the OTLP tracer provider created by `init_otlp`.
///
/// Keep the guard alive for the lifetime of the application and call
/// `shutdown` during application shutdown to stop the OTLP pipeline.
///
/// NOTE(review): no `Drop` impl for this type is visible in this file, so any
/// shutdown that happens "on drop" relies on the wrapped provider's own drop
/// behaviour — confirm against the `opentelemetry_sdk` version in use.
#[must_use]
pub struct OtelGuard {
/// Tracer provider that `shutdown` is called on.
provider: sdktrace::SdkTracerProvider,
}
impl OtelGuard {
/// Force-flush any pending spans and shut down the OTLP exporter.
pub async fn shutdown(self) {
if let Err(e) = self.provider.shutdown() {
tracing::warn!(error = %e, "OTLP tracer shutdown error");
}
}
}
/// Initialise OTLP tracing and install it as the global tracing subscriber.
///
/// Builds a subscriber consisting of an `EnvFilter`, a fmt layer that formats
/// events with `MsgJsonFormat`, and an OpenTelemetry layer whose spans are
/// exported in batches over HTTP to `endpoint`. The subscriber is installed
/// with `try_init()`, so this must be the only place that sets the global
/// default subscriber.
///
/// # Arguments
///
/// * `endpoint` – OTLP HTTP endpoint URL. Surrounding whitespace and trailing
///   `/` characters are stripped before the value is validated and used.
/// * `service_name` – name of the tracer obtained from the tracer provider.
/// * `_service_version` – currently unused.
/// * `log_level` – fallback filter directive, used only when no valid filter
///   can be read from the default environment variable (`RUST_LOG`).
///
/// # Returns
///
/// `Ok(Some(guard))` on success; the caller should store the guard and call
/// `guard.shutdown()` during app shutdown for a clean flush.
///
/// # Errors
///
/// * `InitOtlError::EmptyEndpoint` – the endpoint is empty after normalisation
///   (for example `""`, `"   "` or `"/"`).
/// * `InitOtlError::ExporterInit` – the OTLP span exporter could not be built.
/// * `InitOtlError::SubscriberInit` – the subscriber could not be installed,
///   e.g. because a global default subscriber was already set.
pub fn init_otlp(
    endpoint: &str,
    service_name: &str,
    _service_version: &str,
    log_level: &str,
) -> Result<Option<OtelGuard>, InitOtlError> {
    // Normalise *before* validating: otherwise inputs such as "/" or "   "
    // pass the emptiness check and hand an unusable URL to the exporter.
    let endpoint = endpoint.trim().trim_end_matches('/');
    if endpoint.is_empty() {
        return Err(InitOtlError::EmptyEndpoint);
    }

    let exporter = SpanExporter::builder()
        .with_http()
        .with_endpoint(endpoint)
        .build()
        .map_err(|e| InitOtlError::ExporterInit(e.to_string()))?;

    // The environment-provided filter wins; `log_level` is only the fallback.
    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new(log_level));

    let fmt_layer: FmtLayer<_, _, _, _> = fmt::layer()
        .event_format(MsgJsonFormat);

    let tracer_provider = sdktrace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .build();
    let tracer = tracer_provider.tracer(service_name.to_string());
    let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);

    let layered = tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .with(otel_layer);

    tracing::Dispatch::new(layered)
        .try_init()
        .map_err(|e| InitOtlError::SubscriberInit(e.to_string()))?;

    tracing::debug!(endpoint = %endpoint, "OTLP tracer installed");

    // The guard keeps ownership of the provider so the caller can shut it down.
    Ok(Some(OtelGuard { provider: tracer_provider }))
}
/// Errors that `init_otlp` can return.
///
/// The exporter and subscriber variants carry the underlying error rendered
/// as a `String` (via `to_string()`), not the original error value.
#[derive(Debug, thiserror::Error)]
pub enum InitOtlError {
/// `init_otlp` rejected the supplied endpoint as empty.
#[error("endpoint is empty")]
EmptyEndpoint,
/// Building the OTLP span exporter failed; holds the builder's error message.
#[error("failed to build OTLP exporter: {0}")]
ExporterInit(String),
/// Installing the tracing subscriber via `try_init()` failed; holds the
/// error message from that call.
#[error("failed to set tracing subscriber: {0}")]
SubscriberInit(String),
}