refactor(apps): migrate app, gitserver, git-hook, email from slog to tracing
- apps/app: remove mod logging, replace init_tracing_subscriber() call, remove slog macros from main.rs, remove logging.rs - apps/gitserver: remove slog usage from main.rs - apps/git-hook: remove slog from main.rs - apps/email: remove slog from main.rs
This commit is contained in:
parent
e99feb236b
commit
236aebe4ea
@ -25,7 +25,7 @@ migrate = { workspace = true }
|
||||
actix-web = { workspace = true }
|
||||
actix-cors = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
slog = "2"
|
||||
tracing = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
clap = { workspace = true }
|
||||
sea-orm = { workspace = true }
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
//! Structured HTTP request logging middleware using slog.
|
||||
//! Structured HTTP request logging middleware using tracing.
|
||||
//!
|
||||
//! Logs every incoming request with method, path, status code,
|
||||
//! response time, client IP, authenticated user ID, and trace_id.
|
||||
@ -6,7 +6,6 @@
|
||||
use actix_web::dev::{Service, ServiceRequest, ServiceResponse, Transform};
|
||||
use futures::future::{LocalBoxFuture, Ready, ok};
|
||||
use session::SessionExt;
|
||||
use slog::{error as slog_error, info as slog_info, warn as slog_warn};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Instant;
|
||||
@ -14,12 +13,12 @@ use uuid::Uuid;
|
||||
|
||||
/// Default log format: `{method} {path} {status} {duration_ms}ms`
|
||||
pub struct RequestLogger {
|
||||
log: slog::Logger,
|
||||
trace_id_header: String,
|
||||
}
|
||||
|
||||
impl RequestLogger {
|
||||
pub fn new(log: slog::Logger) -> Self {
|
||||
Self { log }
|
||||
pub fn new(trace_id_header: String) -> Self {
|
||||
Self { trace_id_header }
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,21 +37,21 @@ where
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
ok(RequestLoggerMiddleware {
|
||||
service: Arc::new(service),
|
||||
log: self.log.clone(),
|
||||
trace_id_header: self.trace_id_header.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RequestLoggerMiddleware<S> {
|
||||
service: Arc<S>,
|
||||
log: slog::Logger,
|
||||
trace_id_header: String,
|
||||
}
|
||||
|
||||
impl<S> Clone for RequestLoggerMiddleware<S> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
service: self.service.clone(),
|
||||
log: self.log.clone(),
|
||||
trace_id_header: self.trace_id_header.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -73,7 +72,7 @@ where
|
||||
|
||||
fn call(&self, req: ServiceRequest) -> Self::Future {
|
||||
let started = Instant::now();
|
||||
let log = self.log.clone();
|
||||
let trace_id_header = self.trace_id_header.clone();
|
||||
let method = req.method().to_string();
|
||||
let path = req.path().to_string();
|
||||
let query = req.query_string().to_string();
|
||||
@ -91,7 +90,6 @@ where
|
||||
format!("{}?{}", path, query)
|
||||
};
|
||||
|
||||
// Clone the Arc<S> so it can be moved into the async block
|
||||
let service = self.service.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
@ -107,39 +105,25 @@ where
|
||||
.unwrap_or_else(|| "-".to_string());
|
||||
let duration_ms = elapsed.as_millis() as u64;
|
||||
|
||||
let log_args = (
|
||||
method = %method,
|
||||
path = %full_path,
|
||||
status = status_code,
|
||||
duration_ms = duration_ms,
|
||||
remote = %remote,
|
||||
user_id = %user_id_str,
|
||||
trace_id = %trace_id,
|
||||
);
|
||||
|
||||
match status_code {
|
||||
200..=299 => {
|
||||
slog_info!(log, "http_request";
|
||||
"method" => %method,
|
||||
"path" => %full_path,
|
||||
"status" => status_code,
|
||||
"duration_ms" => duration_ms,
|
||||
"remote" => %remote,
|
||||
"user_id" => %user_id_str,
|
||||
"trace_id" => %trace_id
|
||||
);
|
||||
tracing::info!(log_args, "http_request");
|
||||
}
|
||||
400..=499 => {
|
||||
slog_warn!(log, "http_request";
|
||||
"method" => %method,
|
||||
"path" => %full_path,
|
||||
"status" => status_code,
|
||||
"duration_ms" => duration_ms,
|
||||
"remote" => %remote,
|
||||
"user_id" => %user_id_str,
|
||||
"trace_id" => %trace_id
|
||||
);
|
||||
tracing::warn!(log_args, "http_request");
|
||||
}
|
||||
_ => {
|
||||
slog_error!(log, "http_request";
|
||||
"method" => %method,
|
||||
"path" => %full_path,
|
||||
"status" => status_code,
|
||||
"duration_ms" => duration_ms,
|
||||
"remote" => %remote,
|
||||
"user_id" => %user_id_str,
|
||||
"trace_id" => %trace_id
|
||||
);
|
||||
tracing::error!(log_args, "http_request");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -10,10 +10,12 @@ use service::AppService;
|
||||
use session::SessionMiddleware;
|
||||
use session::config::{PersistentSession, SessionLifecycle, TtlExtensionPolicy};
|
||||
use session::storage::RedisClusterSessionStore;
|
||||
use observability::{build_logger, MetricsMiddleware, HttpMetrics};
|
||||
use observability::{
|
||||
init_tracing_subscriber, install_recorder, prometheus_handler, spawn_http_metrics_poller,
|
||||
MetricsMiddleware, HttpMetrics, TracingSpanMiddleware, HttpSnapshotGuard,
|
||||
};
|
||||
|
||||
mod args;
|
||||
mod logging;
|
||||
|
||||
use args::ServerArgs;
|
||||
use config::AppConfig;
|
||||
@ -37,25 +39,24 @@ fn build_session_key(cfg: &AppConfig) -> anyhow::Result<Key> {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let cfg = AppConfig::load();
|
||||
let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string());
|
||||
let log = build_logger(&log_level);
|
||||
slog::info!(
|
||||
log,
|
||||
"Starting {} {}",
|
||||
cfg.app_name().unwrap_or_default(),
|
||||
cfg.app_version().unwrap_or_default()
|
||||
init_tracing_subscriber(&log_level);
|
||||
tracing::info!(
|
||||
app_name = %cfg.app_name().unwrap_or_default(),
|
||||
app_version = %cfg.app_version().unwrap_or_default(),
|
||||
"Starting application"
|
||||
);
|
||||
let db = AppDatabase::init(&cfg).await?;
|
||||
slog::info!(log, "Database connected");
|
||||
tracing::info!("Database connected");
|
||||
let redis_urls = cfg.redis_urls()?;
|
||||
let store: RedisClusterSessionStore = RedisClusterSessionStore::new(redis_urls).await?;
|
||||
slog::info!(log, "Redis connected");
|
||||
tracing::info!("Redis connected");
|
||||
let cache = AppCache::init(&cfg).await?;
|
||||
slog::info!(log, "Cache initialized");
|
||||
run_migrations(&db, &log).await?;
|
||||
tracing::info!("Cache initialized");
|
||||
run_migrations(&db).await?;
|
||||
let session_key = build_session_key(&cfg)?;
|
||||
let args = ServerArgs::parse();
|
||||
let service = AppService::new(cfg.clone()).await?;
|
||||
slog::info!(log, "AppService initialized");
|
||||
tracing::info!("AppService initialized");
|
||||
|
||||
// Spawn background task: sync OpenRouter models immediately on startup,
|
||||
// then every 10 minutes.
|
||||
@ -66,17 +67,48 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
|
||||
let worker_service = service.clone();
|
||||
let log_for_http = log.clone();
|
||||
let log_for_worker = log.clone();
|
||||
let worker_handle = tokio::spawn(async move {
|
||||
worker_service
|
||||
.start_room_workers(shutdown_rx, log_for_worker)
|
||||
.start_room_workers(shutdown_rx)
|
||||
.await
|
||||
});
|
||||
|
||||
let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string());
|
||||
// ── Phase 6: OTLP tracing ──────────────────────────────────────────────
|
||||
let _otel_guard = if cfg.otel_enabled().unwrap_or(false) {
|
||||
let endpoint = cfg.otel_endpoint().unwrap_or_else(|_| "http://localhost:4317".to_string());
|
||||
let service_name = cfg.otel_service_name().unwrap_or_else(|_| "app".to_string());
|
||||
let service_version = cfg.otel_service_version().unwrap_or_else(|_| "0.1.0".to_string());
|
||||
tracing::info!(endpoint = %endpoint, service = %service_name, "OTLP tracing enabled");
|
||||
let guard = observability::init_otlp(
|
||||
&endpoint,
|
||||
&service_name,
|
||||
&service_version,
|
||||
&log_level,
|
||||
)
|
||||
.map_err(|e| anyhow::anyhow!("OTLP init failed: {}", e))?;
|
||||
guard
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// ── Phase 6: Prometheus metrics ─────────────────────────────────────────
|
||||
install_recorder();
|
||||
|
||||
let http_metrics = std::sync::Arc::new(HttpMetrics::new());
|
||||
slog::info!(log, "Listening on {}", bind_addr);
|
||||
let http_snapshot: HttpSnapshotGuard =
|
||||
std::sync::Arc::new(std::sync::RwLock::new(
|
||||
observability::HttpMetricsSnapshot::default(),
|
||||
));
|
||||
let http_snapshot_for_poller = http_snapshot.clone();
|
||||
spawn_http_metrics_poller(
|
||||
http_metrics.clone(),
|
||||
http_snapshot_for_poller,
|
||||
std::time::Duration::from_secs(15),
|
||||
);
|
||||
let http_snapshot_data = web::Data::new(http_snapshot);
|
||||
|
||||
let bind_addr = args.bind.unwrap_or_else(|| "127.0.0.1:8080".to_string());
|
||||
tracing::info!(bind_addr = %bind_addr, "Listening");
|
||||
let http_metrics_server = http_metrics.clone();
|
||||
HttpServer::new(move || {
|
||||
let cors = Cors::default()
|
||||
@ -105,6 +137,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
.wrap(session_mw)
|
||||
.wrap(Logger::default().exclude("/health"))
|
||||
.wrap(metrics_mw)
|
||||
.wrap(TracingSpanMiddleware::new())
|
||||
.app_data(web::Data::new(AppState {
|
||||
db: db.clone(),
|
||||
cache: cache.clone(),
|
||||
@ -113,28 +146,29 @@ async fn main() -> anyhow::Result<()> {
|
||||
.app_data(web::Data::new(cfg.clone()))
|
||||
.app_data(web::Data::new(db.clone()))
|
||||
.app_data(web::Data::new(cache.clone()))
|
||||
.wrap(logging::RequestLogger::new(log_for_http.clone()))
|
||||
.app_data(http_snapshot_data.clone())
|
||||
.route("/health", web::get().to(health_check))
|
||||
.route("/metrics", web::get().to(prometheus_handler))
|
||||
.configure(api::route::init_routes)
|
||||
})
|
||||
.bind(&bind_addr)?
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
slog::info!(log, "Server stopped, shutting down room workers");
|
||||
tracing::info!("Server stopped, shutting down room workers");
|
||||
let _ = shutdown_tx.send(());
|
||||
let _ = worker_handle.await;
|
||||
slog::info!(log, "Room workers stopped");
|
||||
tracing::info!("Room workers stopped");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_migrations(db: &AppDatabase, log: &slog::Logger) -> anyhow::Result<()> {
|
||||
slog::info!(log, "Running database migrations...");
|
||||
async fn run_migrations(db: &AppDatabase) -> anyhow::Result<()> {
|
||||
tracing::info!("Running database migrations...");
|
||||
Migrator::up(db.writer(), None)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Migration failed: {:?}", e))?;
|
||||
slog::info!(log, "Migrations completed");
|
||||
tracing::info!("Migrations completed");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@ -21,7 +21,8 @@ tokio = { workspace = true, features = ["full"] }
|
||||
service = { workspace = true }
|
||||
db = { workspace = true }
|
||||
config = { workspace = true }
|
||||
slog = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
observability = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use clap::Parser;
|
||||
use config::AppConfig;
|
||||
use observability::init_tracing_subscriber;
|
||||
use service::AppService;
|
||||
use slog::{Drain, OwnedKVList, Record};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(name = "email-worker")]
|
||||
@ -15,70 +15,19 @@ struct Args {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let args = Args::parse();
|
||||
let cfg = AppConfig::load();
|
||||
let log = build_logger(&args.log_level);
|
||||
init_tracing_subscriber(&args.log_level);
|
||||
|
||||
slog::info!(log, "Starting email worker");
|
||||
tracing::info!("Starting email worker");
|
||||
let service = AppService::new(cfg).await?;
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
|
||||
let log_for_signal = log.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::signal::ctrl_c().await.ok();
|
||||
slog::info!(log_for_signal, "shutting down email worker");
|
||||
tracing::info!("shutting down email worker");
|
||||
let _ = shutdown_tx.send(());
|
||||
});
|
||||
|
||||
service.start_email_workers(shutdown_rx).await?;
|
||||
slog::info!(log, "email worker stopped");
|
||||
tracing::info!("email worker stopped");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build_logger(level: &str) -> slog::Logger {
|
||||
let level_filter = match level {
|
||||
"trace" => 0usize,
|
||||
"debug" => 1usize,
|
||||
"info" => 2usize,
|
||||
"warn" => 3usize,
|
||||
"error" => 4usize,
|
||||
_ => 2usize,
|
||||
};
|
||||
|
||||
struct StderrDrain(usize);
|
||||
|
||||
impl Drain for StderrDrain {
|
||||
type Ok = ();
|
||||
type Err = ();
|
||||
#[inline]
|
||||
fn log(&self, record: &Record, _logger: &OwnedKVList) -> Result<(), ()> {
|
||||
let slog_level = match record.level() {
|
||||
slog::Level::Trace => 0,
|
||||
slog::Level::Debug => 1,
|
||||
slog::Level::Info => 2,
|
||||
slog::Level::Warning => 3,
|
||||
slog::Level::Error => 4,
|
||||
slog::Level::Critical => 5,
|
||||
};
|
||||
if slog_level < self.0 {
|
||||
return Ok(());
|
||||
}
|
||||
let _ = eprintln!(
|
||||
"{} [{}] {}:{} - {}",
|
||||
chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.3fZ"),
|
||||
record.level().to_string(),
|
||||
record
|
||||
.file()
|
||||
.rsplit_once('/')
|
||||
.map(|(_, s)| s)
|
||||
.unwrap_or(record.file()),
|
||||
record.line(),
|
||||
record.msg(),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let drain = StderrDrain(level_filter);
|
||||
let drain = std::sync::Mutex::new(drain);
|
||||
let drain = slog::Fuse::new(drain);
|
||||
slog::Logger::root(drain, slog::o!())
|
||||
}
|
||||
|
||||
@ -21,7 +21,6 @@ config = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { workspace = true, features = ["json"] }
|
||||
anyhow = { workspace = true }
|
||||
slog = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
tokio-util = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
|
||||
@ -3,7 +3,7 @@ use config::AppConfig;
|
||||
use db::cache::AppCache;
|
||||
use db::database::AppDatabase;
|
||||
use git::hook::HookService;
|
||||
use observability::build_logger;
|
||||
use observability::init_tracing_subscriber;
|
||||
use tokio::signal;
|
||||
|
||||
mod args;
|
||||
@ -15,35 +15,33 @@ async fn main() -> anyhow::Result<()> {
|
||||
// 1. Load configuration
|
||||
let cfg = AppConfig::load();
|
||||
|
||||
// 2. Init slog logging
|
||||
// 2. Init tracing logging
|
||||
let log_level = cfg.log_level().unwrap_or_else(|_| "info".to_string());
|
||||
let log = build_logger(&log_level);
|
||||
init_tracing_subscriber(&log_level);
|
||||
|
||||
// 3. Connect to database
|
||||
let db = AppDatabase::init(&cfg).await?;
|
||||
slog::info!(log, "database connected");
|
||||
tracing::info!("database connected");
|
||||
|
||||
// 4. Connect to Redis cache (also provides the cluster pool for hook queue)
|
||||
let cache = AppCache::init(&cfg).await?;
|
||||
slog::info!(log, "cache connected");
|
||||
tracing::info!("cache connected");
|
||||
|
||||
// 5. Parse CLI args
|
||||
let _args = HookArgs::parse();
|
||||
|
||||
slog::info!(log, "git-hook worker starting");
|
||||
tracing::info!("git-hook worker starting");
|
||||
|
||||
// 6. Build and start git hook service
|
||||
let hooks = HookService::new(
|
||||
db,
|
||||
cache.clone(),
|
||||
cache.redis_pool().clone(),
|
||||
log.clone(),
|
||||
cfg,
|
||||
);
|
||||
|
||||
let cancel = hooks.start_worker();
|
||||
let cancel_signal = cancel.clone();
|
||||
let log_clone = log.clone();
|
||||
|
||||
// Spawn signal handler that cancels on SIGINT/SIGTERM
|
||||
tokio::spawn(async move {
|
||||
@ -66,10 +64,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
tokio::select! {
|
||||
_ = ctrl_c => {
|
||||
slog::info!(log_clone, "received SIGINT, initiating shutdown");
|
||||
tracing::info!("received SIGINT, initiating shutdown");
|
||||
}
|
||||
_ = term => {
|
||||
slog::info!(log_clone, "received SIGTERM, initiating shutdown");
|
||||
tracing::info!("received SIGTERM, initiating shutdown");
|
||||
}
|
||||
}
|
||||
cancel_signal.cancel();
|
||||
@ -77,7 +75,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
// Wait until the worker is cancelled (by signal handler or otherwise)
|
||||
cancel.cancelled().await;
|
||||
slog::info!(log, "git-hook worker stopped");
|
||||
tracing::info!("git-hook worker stopped");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@ -20,9 +20,9 @@ path = "src/main.rs"
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
git = { workspace = true }
|
||||
observability = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
db = { workspace = true }
|
||||
config = { workspace = true }
|
||||
slog = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use clap::Parser;
|
||||
use config::AppConfig;
|
||||
use observability::build_logger;
|
||||
use observability::init_tracing_subscriber;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(name = "gitserver")]
|
||||
@ -14,32 +14,31 @@ struct Args {
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let args = Args::parse();
|
||||
let cfg = AppConfig::load();
|
||||
let log = build_logger(&args.log_level);
|
||||
init_tracing_subscriber(&args.log_level);
|
||||
|
||||
let http_handle = tokio::spawn(git::http::run_http(cfg.clone(), log.clone()));
|
||||
let ssh_handle = tokio::spawn(git::ssh::run_ssh(cfg, log.clone()));
|
||||
let http_handle = tokio::spawn(git::http::run_http(cfg.clone()));
|
||||
let ssh_handle = tokio::spawn(git::ssh::run_ssh(cfg));
|
||||
|
||||
tokio::select! {
|
||||
result = http_handle => {
|
||||
match result {
|
||||
Ok(Ok(())) => slog::info!(log, "HTTP server stopped"),
|
||||
Ok(Err(e)) => slog::error!(log, "HTTP server error: {}", e),
|
||||
Err(e) => slog::error!(log, "HTTP server task panicked: {}", e),
|
||||
Ok(Ok(())) => tracing::info!("HTTP server stopped"),
|
||||
Ok(Err(e)) => tracing::error!("HTTP server error: {}", e),
|
||||
Err(e) => tracing::error!("HTTP server task panicked: {}", e),
|
||||
}
|
||||
}
|
||||
result = ssh_handle => {
|
||||
match result {
|
||||
Ok(Ok(())) => slog::info!(log, "SSH server stopped"),
|
||||
Ok(Err(e)) => slog::error!(log, "SSH server error: {}", e),
|
||||
Err(e) => slog::error!(log, "SSH server task panicked: {}", e),
|
||||
Ok(Ok(())) => tracing::info!("SSH server stopped"),
|
||||
Ok(Err(e)) => tracing::error!("SSH server error: {}", e),
|
||||
Err(e) => tracing::error!("SSH server task panicked: {}", e),
|
||||
}
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
slog::info!(log, "received shutdown signal");
|
||||
tracing::info!("received shutdown signal");
|
||||
}
|
||||
}
|
||||
|
||||
slog::info!(log, "shutting down");
|
||||
tracing::info!("shutting down");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user