Compare commits
No commits in common. "99ebfc14a78d5df5a800b8f40436b126a054ff76" and "07e74c230c179b7e5506d13f46e841f0d72fe991" have entirely different histories.
99ebfc14a7
...
07e74c230c
@ -5,14 +5,12 @@
|
|||||||
|
|
||||||
pub mod tracing_fmt;
|
pub mod tracing_fmt;
|
||||||
pub mod tracing_init;
|
pub mod tracing_init;
|
||||||
pub mod msg_json_fmt;
|
|
||||||
pub mod metrics_middleware;
|
pub mod metrics_middleware;
|
||||||
pub mod prometheus_exporter;
|
pub mod prometheus_exporter;
|
||||||
pub mod otlp;
|
pub mod otlp;
|
||||||
pub mod tracing_middleware;
|
pub mod tracing_middleware;
|
||||||
|
|
||||||
pub use tracing_fmt::{init_tracing_subscriber, instance_id};
|
pub use tracing_fmt::{init_tracing_subscriber, instance_id};
|
||||||
pub use msg_json_fmt::set_span_msg;
|
|
||||||
pub use metrics_middleware::{MetricsMiddleware, HttpMetrics};
|
pub use metrics_middleware::{MetricsMiddleware, HttpMetrics};
|
||||||
pub use prometheus_exporter::{
|
pub use prometheus_exporter::{
|
||||||
install_recorder, prometheus_handler, spawn_http_metrics_poller,
|
install_recorder, prometheus_handler, spawn_http_metrics_poller,
|
||||||
|
|||||||
@ -1,231 +0,0 @@
|
|||||||
//! Custom JSON formatter that injects `_msg` as the first field.
|
|
||||||
//!
|
|
||||||
//! VictoriaLogs requires every log line to have a `_msg` field as the message
|
|
||||||
//! subject for full-text search. This formatter ensures `_msg` is always the
|
|
||||||
//! first key in the serialized JSON object.
|
|
||||||
//!
|
|
||||||
//! `_msg` derivation rules:
|
|
||||||
//! - If the event has a `message` field → use it
|
|
||||||
//! - If a parent span stored a `_msg` in the thread-local buffer (set by the
|
|
||||||
//! HTTP middleware) → use it
|
|
||||||
//! - Fallback → use the event target (module path)
|
|
||||||
|
|
||||||
use serde::Serialize;
|
|
||||||
use std::cell::RefCell;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fmt;
|
|
||||||
use tracing::field::{Field, Visit};
|
|
||||||
use tracing_subscriber::fmt::format::FormatEvent;
|
|
||||||
|
|
||||||
// Thread-local buffer holding the current span's `_msg` value.
|
|
||||||
// Set by the HTTP middleware via `set_span_msg()` at span creation time.
|
|
||||||
// The formatter reads this value when the event message is empty (span close).
|
|
||||||
thread_local! {
|
|
||||||
static SPAN_MSG: RefCell<String> = const { RefCell::new(String::new()) };
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set the `_msg` for the current span. Called by middleware at span creation.
|
|
||||||
pub fn set_span_msg(msg: String) {
|
|
||||||
SPAN_MSG.with(|cell| cell.borrow_mut().clone_from(&msg));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read and clear the current span's `_msg`. Called by the formatter.
|
|
||||||
fn take_span_msg() -> Option<String> {
|
|
||||||
SPAN_MSG.with(|cell| {
|
|
||||||
let mut s = cell.borrow_mut();
|
|
||||||
if s.is_empty() {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
let out = s.clone();
|
|
||||||
s.clear();
|
|
||||||
Some(out)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Capture all event fields via the visitor pattern.
|
|
||||||
struct FieldCollector {
|
|
||||||
message: Option<String>,
|
|
||||||
fields: HashMap<String, serde_json::Value>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FieldCollector {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
message: None,
|
|
||||||
fields: HashMap::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Visit for FieldCollector {
|
|
||||||
fn record_str(&mut self, field: &Field, value: &str) {
|
|
||||||
if field.name() == "message" {
|
|
||||||
self.message = Some(value.to_string());
|
|
||||||
} else {
|
|
||||||
self.fields
|
|
||||||
.insert(field.name().to_string(), serde_json::Value::String(value.to_string()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
|
|
||||||
self.fields.insert(
|
|
||||||
field.name().to_string(),
|
|
||||||
serde_json::Value::String(format!("{:?}", value)),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn record_i64(&mut self, field: &Field, value: i64) {
|
|
||||||
self.fields.insert(
|
|
||||||
field.name().to_string(),
|
|
||||||
serde_json::Value::Number(value.into()),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn record_u64(&mut self, field: &Field, value: u64) {
|
|
||||||
use serde_json::Number;
|
|
||||||
self.fields.insert(
|
|
||||||
field.name().to_string(),
|
|
||||||
serde_json::Value::Number(Number::from(value)),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn record_bool(&mut self, field: &Field, value: bool) {
|
|
||||||
self.fields.insert(
|
|
||||||
field.name().to_string(),
|
|
||||||
serde_json::Value::Bool(value),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Custom `FormatEvent` that outputs JSON with `_msg` as the first field.
|
|
||||||
pub struct MsgJsonFormat;
|
|
||||||
|
|
||||||
impl<S, N> FormatEvent<S, N> for MsgJsonFormat
|
|
||||||
where
|
|
||||||
S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
|
|
||||||
N: for<'a> tracing_subscriber::fmt::format::FormatFields<'a> + 'static,
|
|
||||||
{
|
|
||||||
fn format_event(
|
|
||||||
&self,
|
|
||||||
_ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
|
|
||||||
mut writer: tracing_subscriber::fmt::format::Writer<'_>,
|
|
||||||
event: &tracing::Event<'_>,
|
|
||||||
) -> std::fmt::Result {
|
|
||||||
let mut collector = FieldCollector::new();
|
|
||||||
event.record(&mut collector);
|
|
||||||
|
|
||||||
// Derive _msg: event message → thread-local span msg → target fallback
|
|
||||||
let msg = collector
|
|
||||||
.message
|
|
||||||
.clone()
|
|
||||||
.or_else(take_span_msg)
|
|
||||||
.unwrap_or_else(|| event.metadata().target().to_string());
|
|
||||||
|
|
||||||
// Build ordered JSON: _msg first, then standard fields, then event fields
|
|
||||||
let mut ordered = serde_json::Map::with_capacity(10 + collector.fields.len());
|
|
||||||
ordered.insert("_msg".to_string(), serde_json::Value::String(msg));
|
|
||||||
ordered.insert(
|
|
||||||
"timestamp".to_string(),
|
|
||||||
serde_json::Value::String(chrono::Utc::now().to_rfc3339()),
|
|
||||||
);
|
|
||||||
ordered.insert(
|
|
||||||
"level".to_string(),
|
|
||||||
serde_json::Value::String(format!("{}", event.metadata().level())),
|
|
||||||
);
|
|
||||||
ordered.insert(
|
|
||||||
"target".to_string(),
|
|
||||||
serde_json::Value::String(event.metadata().target().to_string()),
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Some(module_path) = event.metadata().module_path() {
|
|
||||||
ordered.insert(
|
|
||||||
"module".to_string(),
|
|
||||||
serde_json::Value::String(module_path.to_string()),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if let Some(file) = event.metadata().file() {
|
|
||||||
ordered.insert("file".to_string(), serde_json::Value::String(file.to_string()));
|
|
||||||
}
|
|
||||||
if let Some(line) = event.metadata().line() {
|
|
||||||
ordered.insert("line".to_string(), serde_json::Value::Number(line.into()));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Event fields (excluding `message` which is already in `_msg`)
|
|
||||||
for (key, value) in &collector.fields {
|
|
||||||
ordered.insert(key.clone(), value.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
write!(writer, "{}", serde_json::to_string(&ordered).map_err(|_| fmt::Error)?)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Log entry wrapper for VictoriaLogs-compatible output.
|
|
||||||
///
|
|
||||||
/// All fields serialize in a fixed order with `_msg` first.
|
|
||||||
#[derive(Serialize)]
|
|
||||||
pub struct VLogEntry {
|
|
||||||
_msg: String,
|
|
||||||
timestamp: String,
|
|
||||||
level: String,
|
|
||||||
target: String,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
module: Option<String>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
file: Option<String>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
line: Option<u32>,
|
|
||||||
#[serde(flatten)]
|
|
||||||
extra: serde_json::Map<String, serde_json::Value>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl VLogEntry {
|
|
||||||
pub fn new(msg: String, level: &str, target: &str) -> Self {
|
|
||||||
Self {
|
|
||||||
_msg: msg,
|
|
||||||
timestamp: chrono::Utc::now().to_rfc3339(),
|
|
||||||
level: level.to_string(),
|
|
||||||
target: target.to_string(),
|
|
||||||
module: None,
|
|
||||||
file: None,
|
|
||||||
line: None,
|
|
||||||
extra: serde_json::Map::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn module(mut self, m: String) -> Self {
|
|
||||||
self.module = Some(m);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn file(mut self, f: String) -> Self {
|
|
||||||
self.file = Some(f);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn line(mut self, l: u32) -> Self {
|
|
||||||
self.line = Some(l);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn field(mut self, key: String, value: serde_json::Value) -> Self {
|
|
||||||
self.extra.insert(key, value);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn json_order_has_msg_first() {
|
|
||||||
let entry = VLogEntry::new("GET /api/users".to_string(), "INFO", "my::module")
|
|
||||||
.module("my::module".to_string())
|
|
||||||
.field("status".to_string(), serde_json::json!(200))
|
|
||||||
.field("duration_ms".to_string(), serde_json::json!(42));
|
|
||||||
|
|
||||||
let json = serde_json::to_string(&entry).unwrap();
|
|
||||||
assert!(json.starts_with(r#"{"_msg":"GET /api/users""#));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -8,12 +8,10 @@
|
|||||||
//! already registered. This function rebuilds the global subscriber with the
|
//! already registered. This function rebuilds the global subscriber with the
|
||||||
//! OTLP tracing layer on top.
|
//! OTLP tracing layer on top.
|
||||||
|
|
||||||
use crate::msg_json_fmt::MsgJsonFormat;
|
|
||||||
|
|
||||||
use opentelemetry::trace::TracerProvider;
|
use opentelemetry::trace::TracerProvider;
|
||||||
use opentelemetry_otlp::{SpanExporter, WithExportConfig};
|
use opentelemetry_otlp::{SpanExporter, WithExportConfig};
|
||||||
use opentelemetry_sdk::trace as sdktrace;
|
use opentelemetry_sdk::trace as sdktrace;
|
||||||
use tracing_subscriber::{fmt, fmt::Layer as FmtLayer, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
|
||||||
|
|
||||||
/// Guard that shuts down the OTLP pipeline on drop.
|
/// Guard that shuts down the OTLP pipeline on drop.
|
||||||
#[must_use]
|
#[must_use]
|
||||||
@ -61,8 +59,13 @@ pub fn init_otlp(
|
|||||||
let env_filter = EnvFilter::try_from_default_env()
|
let env_filter = EnvFilter::try_from_default_env()
|
||||||
.unwrap_or_else(|_| EnvFilter::new(log_level));
|
.unwrap_or_else(|_| EnvFilter::new(log_level));
|
||||||
|
|
||||||
let fmt_layer: FmtLayer<_, _, _, _> = fmt::layer()
|
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||||
.event_format(MsgJsonFormat);
|
.json()
|
||||||
|
.with_target(true)
|
||||||
|
.with_thread_ids(false)
|
||||||
|
.with_file(true)
|
||||||
|
.with_line_number(true)
|
||||||
|
.flatten_event(true);
|
||||||
|
|
||||||
let tracer_provider = sdktrace::SdkTracerProvider::builder()
|
let tracer_provider = sdktrace::SdkTracerProvider::builder()
|
||||||
.with_batch_exporter(exporter)
|
.with_batch_exporter(exporter)
|
||||||
|
|||||||
@ -1,14 +1,12 @@
|
|||||||
//! Tracing subscriber initialisation with JSON output.
|
//! Tracing subscriber initialisation with JSON output.
|
||||||
//!
|
//!
|
||||||
//! Uses a custom `MsgJsonFormat` that injects `_msg` as the first field
|
//! Uses `tracing_subscriber::fmt` with a custom `Make` impl that injects
|
||||||
//! for VictoriaLogs compatibility.
|
//! `instance_id` into every log line.
|
||||||
|
|
||||||
use crate::msg_json_fmt::MsgJsonFormat;
|
|
||||||
|
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use tracing_subscriber::{
|
use tracing_subscriber::{
|
||||||
fmt::{self, format::FmtSpan, Layer as FmtLayer},
|
fmt::{self, format::FmtSpan},
|
||||||
layer::SubscriberExt,
|
layer::SubscriberExt,
|
||||||
util::SubscriberInitExt,
|
util::SubscriberInitExt,
|
||||||
EnvFilter,
|
EnvFilter,
|
||||||
@ -36,8 +34,8 @@ pub fn instance_id() -> String {
|
|||||||
|
|
||||||
/// Initialises the global tracing subscriber with JSON-formatted output to stderr.
|
/// Initialises the global tracing subscriber with JSON-formatted output to stderr.
|
||||||
///
|
///
|
||||||
/// Each JSON line includes `_msg` (first field), `timestamp`, `level`, `target`,
|
/// Each JSON line includes `ts`, `level`, `target` (module), `fields` (structured kv),
|
||||||
/// `file`, `line`, and structured event fields.
|
/// `line`, `file`, and `instance_id`.
|
||||||
/// `RUST_LOG` env var controls the log level filter.
|
/// `RUST_LOG` env var controls the log level filter.
|
||||||
///
|
///
|
||||||
/// Pass `defer = true` when OTLP will be initialized afterwards via `init_otlp()`;
|
/// Pass `defer = true` when OTLP will be initialized afterwards via `init_otlp()`;
|
||||||
@ -48,15 +46,22 @@ pub fn init_tracing_subscriber(level: &str, defer: bool) {
|
|||||||
.or_else(|_| EnvFilter::from_str(level))
|
.or_else(|_| EnvFilter::from_str(level))
|
||||||
.expect("invalid log level");
|
.expect("invalid log level");
|
||||||
|
|
||||||
let mut fmt_layer: FmtLayer<_, _, _, _> = fmt::layer()
|
let fmt_layer = fmt::layer()
|
||||||
.event_format(MsgJsonFormat);
|
.json()
|
||||||
fmt_layer.set_span_events(FmtSpan::CLOSE);
|
.with_target(true)
|
||||||
|
.with_thread_ids(false)
|
||||||
|
.with_file(true)
|
||||||
|
.with_line_number(true)
|
||||||
|
.with_span_events(FmtSpan::CLOSE)
|
||||||
|
.flatten_event(true);
|
||||||
|
|
||||||
let registry = tracing_subscriber::registry()
|
let registry = tracing_subscriber::registry()
|
||||||
.with(env_filter)
|
.with(env_filter)
|
||||||
.with(fmt_layer);
|
.with(fmt_layer);
|
||||||
|
|
||||||
if defer {
|
if defer {
|
||||||
|
// Caller will invoke init_otlp() which builds the full subscriber
|
||||||
|
// including the OTLP layer, then calls try_init() once.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -74,9 +74,6 @@ where
|
|||||||
let service = self.service.clone();
|
let service = self.service.clone();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
// Set _msg for VictoriaLogs before entering the span.
|
|
||||||
// The JSON formatter reads this thread-local value.
|
|
||||||
crate::msg_json_fmt::set_span_msg(format!("{} {}", method, path));
|
|
||||||
let span = tracing::info_span!("HTTP {method} {path}", method = %method, path = %path);
|
let span = tracing::info_span!("HTTP {method} {path}", method = %method, path = %path);
|
||||||
let _guard = span.enter();
|
let _guard = span.enter();
|
||||||
let res = service.call(req).await?;
|
let res = service.call(req).await?;
|
||||||
|
|||||||
@ -200,7 +200,7 @@ export const MessageList = memo(function MessageList({
|
|||||||
|
|
||||||
const scrollToBottom = useCallback((smooth = true) => {
|
const scrollToBottom = useCallback((smooth = true) => {
|
||||||
if (rows.length === 0) return;
|
if (rows.length === 0) return;
|
||||||
virtualizer.scrollToIndex(rows.length - 1, { align: 'end', behavior: smooth ? 'smooth' : 'auto' });
|
virtualizer.scrollToIndex(rows.length - 1, { align: 'end', smooth });
|
||||||
}, [virtualizer, rows.length]);
|
}, [virtualizer, rows.length]);
|
||||||
|
|
||||||
// Ensure scroll-to-bottom fires after virtualizer measures all rows
|
// Ensure scroll-to-bottom fires after virtualizer measures all rows
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user