use std::collections::HashMap; use std::io; use std::time::Duration; use anyhow::{Context, bail}; use config::AppConfig; use opentelemetry::KeyValue; use opentelemetry::trace::TracerProvider; use opentelemetry_otlp::{WithExportConfig, WithHttpConfig}; use opentelemetry_sdk::{ Resource, logs::SdkLoggerProvider, metrics::SdkMeterProvider, trace::SdkTracerProvider, }; use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt}; const OTEL_EXPORT_TIMEOUT: Duration = Duration::from_secs(10); type WorkerGuard = tracing_appender::non_blocking::WorkerGuard; /// `io::Write` adapter that strips the path prefix down to `gitdataai/` for /// cleaner log output. #[derive(Clone)] struct StripWriter { inner: W, } impl StripWriter { fn new(inner: W) -> Self { Self { inner } } } impl io::Write for StripWriter { fn write(&mut self, buf: &[u8]) -> io::Result { let s = String::from_utf8_lossy(buf); if let Some(pos) = s.rfind("gitdataai/") { let start = pos + "gitdataai/".len(); self.inner.write_all(s[start..].as_bytes())?; } else { self.inner.write_all(buf)?; } Ok(buf.len()) } fn flush(&mut self) -> io::Result<()> { self.inner.flush() } } fn strip_writer() -> impl for<'a> tracing_subscriber::fmt::MakeWriter<'a> { || StripWriter::new(io::stdout()) } pub struct LgtmGuard { tracer_provider: SdkTracerProvider, logger_provider: SdkLoggerProvider, meter_provider: SdkMeterProvider, _file_guard: Option, } struct OtlpEndpoints { traces: String, logs: String, metrics: String, } impl Drop for LgtmGuard { fn drop(&mut self) { let mut had_error = false; if let Err(err) = self.tracer_provider.shutdown() { had_error = true; eprintln!("failed to shutdown OTel tracer provider: {err}"); } if let Err(err) = self.meter_provider.shutdown() { had_error = true; eprintln!("failed to shutdown OTel meter provider: {err}"); } if let Err(err) = self.logger_provider.shutdown() { had_error = true; eprintln!("failed to shutdown OTel logger provider: {err}"); } drop(self._file_guard.take()); if !had_error { eprintln!("OpenTelemetry providers shut down"); } } } pub fn init_lgtm(config: &AppConfig) -> anyhow::Result> { let filter = EnvFilter::try_new(config.log_level()?)?; let log_format = config.log_format()?; if !config.otel_enabled()? { let file_guard = init_fmt_subscriber(filter, &log_format, config)?; if let Some(guard) = file_guard { static FILE_GUARD: std::sync::OnceLock = std::sync::OnceLock::new(); let _ = FILE_GUARD.set(guard); } return Ok(None); } let (endpoint, tracer_provider, logger_provider, meter_provider) = build_lgtm_guard(config)?; let guard = LgtmGuard { tracer_provider: tracer_provider.clone(), logger_provider: logger_provider.clone(), meter_provider: meter_provider.clone(), _file_guard: None, }; let file_guard = install_otel_subscriber(filter, &guard, &log_format, config)?; opentelemetry::global::set_meter_provider(meter_provider.clone()); tracing::info!(endpoint = %endpoint, "LGTM observability initialized"); Ok(Some(LgtmGuard { tracer_provider, logger_provider, meter_provider, _file_guard: file_guard, })) } fn build_lgtm_guard( config: &AppConfig, ) -> anyhow::Result<( String, SdkTracerProvider, SdkLoggerProvider, SdkMeterProvider, )> { let endpoint = config.otel_endpoint()?; let endpoints = build_otlp_endpoints(&endpoint)?; let headers = build_headers(config)?; let resource = build_resource(config)?; let trace_exporter = build_trace_exporter(&endpoints.traces, &headers)?; let log_exporter = build_log_exporter(&endpoints.logs, &headers)?; let metric_exporter = build_metric_exporter(&endpoints.metrics, headers)?; let tracer_provider = SdkTracerProvider::builder() .with_batch_exporter(trace_exporter) .with_resource(resource.clone()) .build(); let logger_provider = SdkLoggerProvider::builder() .with_batch_exporter(log_exporter) .with_resource(resource.clone()) .build(); let meter_provider = SdkMeterProvider::builder() .with_periodic_exporter(metric_exporter) .with_resource(resource) .build(); Ok((endpoint, tracer_provider, logger_provider, meter_provider)) } fn install_otel_subscriber( filter: EnvFilter, guard: &LgtmGuard, log_format: &str, config: &AppConfig, ) -> anyhow::Result> { let tracer = guard.tracer_provider.tracer(env!("CARGO_PKG_NAME")); let otel_log_layer = opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new( &guard.logger_provider, ); if config.log_file_enabled()? { let dir = config .log_file_path() .unwrap_or_else(|_| "./logs".to_string()); let file_appender = tracing_appender::rolling::daily(dir, "gitdataai.log"); let (non_blocking, file_guard) = tracing_appender::non_blocking(file_appender); let file_writer = { let nb = non_blocking; move || StripWriter::new(nb.clone()) }; if log_format.eq_ignore_ascii_case("json") { let subscriber = Registry::default() .with(filter) .with(tracing_opentelemetry::layer().with_tracer(tracer)) .with(otel_log_layer) .with( tracing_subscriber::fmt::layer() .json() .with_target(false) .with_file(true) .with_line_number(true) .with_writer(strip_writer()), ) .with( tracing_subscriber::fmt::layer() .json() .with_ansi(false) .with_target(false) .with_file(true) .with_line_number(true) .with_writer(file_writer), ); tracing::subscriber::set_global_default(subscriber) .context("failed to initialize tracing subscriber")?; return Ok(Some(file_guard)); } let subscriber = Registry::default() .with(filter) .with(tracing_opentelemetry::layer().with_tracer(tracer)) .with(otel_log_layer) .with( tracing_subscriber::fmt::layer() .with_target(false) .with_file(true) .with_line_number(true) .with_writer(strip_writer()), ) .with( tracing_subscriber::fmt::layer() .with_ansi(false) .with_target(false) .with_file(true) .with_line_number(true) .with_writer(file_writer), ); tracing::subscriber::set_global_default(subscriber) .context("failed to initialize tracing subscriber")?; return Ok(Some(file_guard)); } if log_format.eq_ignore_ascii_case("json") { let subscriber = Registry::default() .with(filter) .with(tracing_opentelemetry::layer().with_tracer(tracer)) .with(otel_log_layer) .with( tracing_subscriber::fmt::layer() .json() .with_target(false) .with_file(true) .with_line_number(true) .with_writer(strip_writer()), ); tracing::subscriber::set_global_default(subscriber) .context("failed to initialize tracing subscriber")?; return Ok(None); } let subscriber = Registry::default() .with(filter) .with(tracing_opentelemetry::layer().with_tracer(tracer)) .with(otel_log_layer) .with( tracing_subscriber::fmt::layer() .with_target(false) .with_file(true) .with_line_number(true) .with_writer(strip_writer()), ); tracing::subscriber::set_global_default(subscriber) .context("failed to initialize tracing subscriber")?; Ok(None) } fn build_trace_exporter( endpoint: &str, headers: &HashMap, ) -> anyhow::Result { opentelemetry_otlp::SpanExporter::builder() .with_http() .with_endpoint(endpoint) .with_timeout(OTEL_EXPORT_TIMEOUT) .with_headers(headers.clone()) .build() .context("failed to build OTLP trace exporter") } fn build_log_exporter( endpoint: &str, headers: &HashMap, ) -> anyhow::Result { opentelemetry_otlp::LogExporter::builder() .with_http() .with_endpoint(endpoint) .with_timeout(OTEL_EXPORT_TIMEOUT) .with_headers(headers.clone()) .build() .context("failed to build OTLP log exporter") } fn build_metric_exporter( endpoint: &str, headers: HashMap, ) -> anyhow::Result { opentelemetry_otlp::MetricExporter::builder() .with_http() .with_endpoint(endpoint) .with_timeout(OTEL_EXPORT_TIMEOUT) .with_headers(headers) .build() .context("failed to build OTLP metric exporter") } fn init_fmt_subscriber( filter: EnvFilter, log_format: &str, config: &AppConfig, ) -> anyhow::Result> { let file_enabled = config.log_file_enabled()?; if log_format.eq_ignore_ascii_case("json") { if file_enabled { let dir = config .log_file_path() .unwrap_or_else(|_| "./logs".to_string()); let file_appender = tracing_appender::rolling::daily(dir, "gitdataai.log"); let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); let subscriber = tracing_subscriber::fmt() .json() .with_env_filter(filter) .with_target(false) .with_file(true) .with_line_number(true) .with_writer({ let nb = non_blocking; move || StripWriter::new(nb.clone()) }) .finish(); tracing::subscriber::set_global_default(subscriber) .context("failed to initialize tracing subscriber")?; return Ok(Some(guard)); } let subscriber = tracing_subscriber::fmt() .json() .with_env_filter(filter) .with_target(false) .with_file(true) .with_line_number(true) .with_writer(strip_writer()) .finish(); tracing::subscriber::set_global_default(subscriber) .context("failed to initialize tracing subscriber")?; return Ok(None); } if file_enabled { let dir = config .log_file_path() .unwrap_or_else(|_| "./logs".to_string()); let file_appender = tracing_appender::rolling::daily(dir, "gitdataai.log"); let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); let subscriber = tracing_subscriber::fmt() .with_env_filter(filter) .with_target(false) .with_file(true) .with_line_number(true) .with_writer({ let nb = non_blocking; move || StripWriter::new(nb.clone()) }) .finish(); tracing::subscriber::set_global_default(subscriber) .context("failed to initialize tracing subscriber")?; return Ok(Some(guard)); } let subscriber = tracing_subscriber::fmt() .with_env_filter(filter) .with_target(false) .with_file(true) .with_line_number(true) .with_writer(strip_writer()) .finish(); tracing::subscriber::set_global_default(subscriber) .context("failed to initialize tracing subscriber")?; Ok(None) } fn build_resource(config: &AppConfig) -> anyhow::Result { Ok(Resource::builder() .with_service_name(config.otel_service_name()?) .with_attributes([KeyValue::new( "service.version", config.otel_service_version()?, )]) .build()) } fn build_headers( config: &AppConfig, ) -> anyhow::Result> { let mut headers = HashMap::new(); if let Some(auth) = config.otel_authorization()? { headers.insert("Authorization".to_string(), auth); } else if let Some(token) = config.otel_organization()? { headers.insert( "signoz-access-token".to_string(), format!("ingest:{token}"), ); } Ok(headers) } fn build_otlp_endpoints(endpoint: &str) -> anyhow::Result { let base = normalize_otlp_base(endpoint)?; Ok(OtlpEndpoints { traces: format!("{base}/v1/traces"), logs: format!("{base}/v1/logs"), metrics: format!("{base}/v1/metrics"), }) } fn normalize_otlp_base(endpoint: &str) -> anyhow::Result { let endpoint = endpoint.trim().trim_end_matches('/'); if endpoint.is_empty() { bail!("APP_OTEL_ENDPOINT must not be empty"); } for suffix in ["/v1/traces", "/v1/logs", "/v1/metrics"] { if let Some(base) = endpoint.strip_suffix(suffix) { let base = base.trim_end_matches('/'); if base.is_empty() { bail!("APP_OTEL_ENDPOINT base URL must not be empty"); } return Ok(base.to_string()); } } Ok(endpoint.to_string()) }