refactor(git): migrate libs/git from slog to tracing

- Remove all use slog::* imports and log: slog::Logger fields
- ssh/handle.rs: replace slog macro chains with tracing::{info!, warn!,
  error!, debug!}; remove log field from GitSshHandle
- ssh/authz.rs, ssh/mod.rs, ssh/server.rs: remove slog Logger fields
- http/: auth.rs, handler.rs, mod.rs, routes.rs: remove slog usage
- hook/: pool worker, sync modules, webhook_dispatch.rs: remove slog
This commit is contained in:
ZhenYi 2026-04-21 22:29:26 +08:00
parent 9a03cded8d
commit 0c1a9ddf98
19 changed files with 173 additions and 293 deletions

View File

@ -28,8 +28,8 @@ models = { workspace = true }
db = { workspace = true } db = { workspace = true }
deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] } deadpool-redis = { workspace = true, features = ["rt_tokio_1", "cluster-async", "cluster"] }
config = { workspace = true } config = { workspace = true }
slog = { workspace = true }
tokio = { workspace = true, features = ["sync", "rt", "process"] } tokio = { workspace = true, features = ["sync", "rt", "process"] }
tracing = { workspace = true }
tokio-util = { workspace = true } tokio-util = { workspace = true }
qdrant-client = { workspace = true } qdrant-client = { workspace = true }
redis = { workspace = true } redis = { workspace = true }

View File

@ -2,7 +2,6 @@ use config::AppConfig;
use db::cache::AppCache; use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use deadpool_redis::cluster::Pool as RedisPool; use deadpool_redis::cluster::Pool as RedisPool;
use slog::Logger;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
pub mod pool; pub mod pool;
@ -20,7 +19,6 @@ pub struct HookService {
pub(crate) db: AppDatabase, pub(crate) db: AppDatabase,
pub(crate) cache: AppCache, pub(crate) cache: AppCache,
pub(crate) redis_pool: RedisPool, pub(crate) redis_pool: RedisPool,
pub(crate) logger: Logger,
pub(crate) config: AppConfig, pub(crate) config: AppConfig,
} }
@ -29,14 +27,12 @@ impl HookService {
db: AppDatabase, db: AppDatabase,
cache: AppCache, cache: AppCache,
redis_pool: RedisPool, redis_pool: RedisPool,
logger: Logger,
config: AppConfig, config: AppConfig,
) -> Self { ) -> Self {
Self { Self {
db, db,
cache, cache,
redis_pool, redis_pool,
logger,
config, config,
} }
} }
@ -48,7 +44,6 @@ impl HookService {
self.db.clone(), self.db.clone(),
self.cache.clone(), self.cache.clone(),
self.redis_pool.clone(), self.redis_pool.clone(),
self.logger.clone(),
pool_config, pool_config,
) )
} }

View File

@ -9,7 +9,6 @@ pub use worker::HookWorker;
use db::cache::AppCache; use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use deadpool_redis::cluster::Pool as RedisPool; use deadpool_redis::cluster::Pool as RedisPool;
use slog::Logger;
use std::sync::Arc; use std::sync::Arc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -19,19 +18,17 @@ pub fn start_worker(
db: AppDatabase, db: AppDatabase,
cache: AppCache, cache: AppCache,
redis_pool: RedisPool, redis_pool: RedisPool,
logger: Logger,
config: PoolConfig, config: PoolConfig,
) -> CancellationToken { ) -> CancellationToken {
let consumer = RedisConsumer::new( let consumer = RedisConsumer::new(
redis_pool.clone(), redis_pool.clone(),
config.redis_list_prefix.clone(), config.redis_list_prefix.clone(),
config.redis_block_timeout_secs, config.redis_block_timeout_secs,
logger.clone(),
); );
let http_client = Arc::new(reqwest::Client::new()); let http_client = Arc::new(reqwest::Client::new());
let max_retries = config.redis_max_retries as u32; let max_retries = config.redis_max_retries as u32;
let worker = HookWorker::new(db, cache, logger, consumer, http_client, max_retries); let worker = HookWorker::new(db, cache, consumer, http_client, max_retries);
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
let cancel_clone = cancel.clone(); let cancel_clone = cancel.clone();

View File

@ -1,7 +1,6 @@
use crate::error::GitError; use crate::error::GitError;
use crate::hook::pool::types::HookTask; use crate::hook::pool::types::HookTask;
use deadpool_redis::cluster::Connection as RedisConn; use deadpool_redis::cluster::Connection as RedisConn;
use slog::Logger;
use std::time::Duration; use std::time::Duration;
/// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern. /// Redis List consumer using BLMOVE for atomic move-from-queue-to-work pattern.
@ -10,7 +9,6 @@ pub struct RedisConsumer {
/// Hash-tag-prefixed key prefix, e.g. "{hook}". /// Hash-tag-prefixed key prefix, e.g. "{hook}".
prefix: String, prefix: String,
block_timeout_secs: u64, block_timeout_secs: u64,
logger: Logger,
} }
const POOL_GET_TIMEOUT: Duration = Duration::from_secs(5); const POOL_GET_TIMEOUT: Duration = Duration::from_secs(5);
@ -20,13 +18,11 @@ impl RedisConsumer {
pool: deadpool_redis::cluster::Pool, pool: deadpool_redis::cluster::Pool,
prefix: String, prefix: String,
block_timeout_secs: u64, block_timeout_secs: u64,
logger: Logger,
) -> Self { ) -> Self {
Self { Self {
pool, pool,
prefix, prefix,
block_timeout_secs, block_timeout_secs,
logger,
} }
} }
@ -61,19 +57,12 @@ impl RedisConsumer {
Some(json) => { Some(json) => {
match serde_json::from_str::<HookTask>(&json) { match serde_json::from_str::<HookTask>(&json) {
Ok(task) => { Ok(task) => {
slog::debug!(self.logger, "task dequeued"; tracing::debug!("task dequeued task_id={} task_type={} queue={}", task.id, task.task_type, queue_key);
"task_id" => %task.id,
"task_type" => %task.task_type,
"queue" => %queue_key
);
Ok(Some((task, json))) Ok(Some((task, json)))
} }
Err(e) => { Err(e) => {
// Malformed task — remove from work queue and discard // Malformed task — remove from work queue and discard
slog::warn!(self.logger, "malformed task JSON, discarding"; tracing::warn!("malformed task JSON, discarding error={} queue={}", e, work_key);
"error" => %e,
"queue" => %work_key
);
let _ = self.ack_raw(&work_key, &json).await; let _ = self.ack_raw(&work_key, &json).await;
Ok(None) Ok(None)
} }
@ -154,7 +143,7 @@ impl RedisConsumer {
.await .await
.map_err(|e| GitError::Internal(format!("nak script failed: {}", e)))?; .map_err(|e| GitError::Internal(format!("nak script failed: {}", e)))?;
slog::warn!(self.logger, "task nack'd and requeued queue={}", queue_key); tracing::warn!("task nack'd and requeued queue={}", queue_key);
Ok(()) Ok(())
} }
@ -170,7 +159,6 @@ impl Clone for RedisConsumer {
pool: self.pool.clone(), pool: self.pool.clone(),
prefix: self.prefix.clone(), prefix: self.prefix.clone(),
block_timeout_secs: self.block_timeout_secs, block_timeout_secs: self.block_timeout_secs,
logger: self.logger.clone(),
} }
} }
} }

View File

@ -5,7 +5,6 @@ use crate::hook::sync::HookMetaDataSync;
use db::cache::AppCache; use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use models::EntityTrait; use models::EntityTrait;
use slog::Logger;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -17,7 +16,6 @@ use tokio_util::sync::CancellationToken;
pub struct HookWorker { pub struct HookWorker {
db: AppDatabase, db: AppDatabase,
cache: AppCache, cache: AppCache,
logger: Logger,
consumer: RedisConsumer, consumer: RedisConsumer,
http_client: Arc<reqwest::Client>, http_client: Arc<reqwest::Client>,
max_retries: u32, max_retries: u32,
@ -27,7 +25,6 @@ impl HookWorker {
pub fn new( pub fn new(
db: AppDatabase, db: AppDatabase,
cache: AppCache, cache: AppCache,
logger: Logger,
consumer: RedisConsumer, consumer: RedisConsumer,
http_client: Arc<reqwest::Client>, http_client: Arc<reqwest::Client>,
max_retries: u32, max_retries: u32,
@ -35,7 +32,6 @@ impl HookWorker {
Self { Self {
db, db,
cache, cache,
logger,
consumer, consumer,
http_client, http_client,
max_retries, max_retries,
@ -44,7 +40,7 @@ impl HookWorker {
/// Run the worker loop. Blocks until cancelled. /// Run the worker loop. Blocks until cancelled.
pub async fn run(&self, cancel: CancellationToken) { pub async fn run(&self, cancel: CancellationToken) {
slog::info!(self.logger, "hook worker started"); tracing::info!("hook worker started");
let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc]; let task_types = [TaskType::Sync, TaskType::Fsck, TaskType::Gc];
let mut redis_backoff_ms: u64 = 1000; let mut redis_backoff_ms: u64 = 1000;
@ -52,7 +48,7 @@ impl HookWorker {
loop { loop {
// Check cancellation at top of loop to avoid unnecessary work // Check cancellation at top of loop to avoid unnecessary work
if cancel.is_cancelled() { if cancel.is_cancelled() {
slog::info!(self.logger, "hook worker shutdown signal received"); tracing::info!("hook worker shutdown signal received");
break; break;
} }
@ -67,7 +63,7 @@ impl HookWorker {
} }
Ok(None) => continue, Ok(None) => continue,
Err(e) => { Err(e) => {
slog::warn!(self.logger, "failed to dequeue task: {}", e); tracing::warn!("failed to dequeue task: {}", e);
tokio::time::sleep(Duration::from_millis(redis_backoff_ms)).await; tokio::time::sleep(Duration::from_millis(redis_backoff_ms)).await;
// Exponential backoff, cap at 32s // Exponential backoff, cap at 32s
redis_backoff_ms = (redis_backoff_ms * 2).min(32_000); redis_backoff_ms = (redis_backoff_ms * 2).min(32_000);
@ -83,7 +79,7 @@ impl HookWorker {
} }
} }
slog::info!(self.logger, "hook worker stopped"); tracing::info!("hook worker stopped");
} }
async fn process_task( async fn process_task(
@ -93,7 +89,7 @@ impl HookWorker {
work_key: &str, work_key: &str,
queue_key: &str, queue_key: &str,
) { ) {
slog::info!(self.logger, "task started task_id={} task_type={} repo_id={}", tracing::info!("task started task_id={} task_type={} repo_id={}",
task.id, task.task_type, task.repo_id); task.id, task.task_type, task.repo_id);
let result = match task.task_type { let result = match task.task_type {
@ -105,25 +101,25 @@ impl HookWorker {
match result { match result {
Ok(()) => { Ok(()) => {
if let Err(e) = self.consumer.ack(work_key, task_json).await { if let Err(e) = self.consumer.ack(work_key, task_json).await {
slog::warn!(self.logger, "failed to ack task: {}", e); tracing::warn!("failed to ack task: {}", e);
} }
slog::info!(self.logger, "task completed task_id={}", task.id); tracing::info!("task completed task_id={}", task.id);
} }
Err(e) => { Err(e) => {
let is_locked = matches!(e, crate::GitError::Locked(_)); let is_locked = matches!(e, crate::GitError::Locked(_));
if is_locked { if is_locked {
// Another worker holds the lock — requeue without counting as retry. // Another worker holds the lock — requeue without counting as retry.
slog::info!(self.logger, "repo locked by another worker, requeueing task_id={}", task.id); tracing::info!("repo locked by another worker, requeueing task_id={}", task.id);
if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await { if let Err(nak_err) = self.consumer.nak(work_key, queue_key, task_json).await {
slog::warn!(self.logger, "failed to requeue locked task: {}", nak_err); tracing::warn!("failed to requeue locked task: {}", nak_err);
} }
} else { } else {
slog::warn!(self.logger, "task failed task_id={} task_type={} repo_id={} error={}", tracing::warn!("task failed task_id={} task_type={} repo_id={} error={}",
task.id, task.task_type, task.repo_id, e); task.id, task.task_type, task.repo_id, e);
if task.retry_count >= self.max_retries { if task.retry_count >= self.max_retries {
slog::warn!(self.logger, "task exhausted retries, discarding task_id={} retry_count={}", tracing::warn!("task exhausted retries, discarding task_id={} retry_count={}",
task.id, task.retry_count); task.id, task.retry_count);
let _ = self.consumer.ack(work_key, task_json).await; let _ = self.consumer.ack(work_key, task_json).await;
} else { } else {
@ -162,10 +158,9 @@ impl HookWorker {
let before_tips = tokio::task::spawn_blocking({ let before_tips = tokio::task::spawn_blocking({
let db = self.db.clone(); let db = self.db.clone();
let cache = self.cache.clone(); let cache = self.cache.clone();
let logger = self.logger.clone();
let repo = repo.clone(); let repo = repo.clone();
move || { move || {
let sync = HookMetaDataSync::new(db, cache, repo, logger) let sync = HookMetaDataSync::new(db, cache, repo)
.map_err(|e| GitError::Internal(e.to_string()))?; .map_err(|e| GitError::Internal(e.to_string()))?;
Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
} }
@ -177,11 +172,10 @@ impl HookWorker {
// Run full sync (internally acquires/releases per-repo lock) // Run full sync (internally acquires/releases per-repo lock)
let db = self.db.clone(); let db = self.db.clone();
let cache = self.cache.clone(); let cache = self.cache.clone();
let logger = self.logger.clone();
let repo_clone = repo.clone(); let repo_clone = repo.clone();
let _sync_result = tokio::task::spawn_blocking(move || { let _sync_result = tokio::task::spawn_blocking(move || {
let result = tokio::runtime::Handle::current().block_on(async { let result = tokio::runtime::Handle::current().block_on(async {
let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone(), logger.clone())?; let sync = HookMetaDataSync::new(db.clone(), cache.clone(), repo_clone.clone())?;
sync.sync().await sync.sync().await
}); });
match result { match result {
@ -199,10 +193,9 @@ impl HookWorker {
let after_tips = tokio::task::spawn_blocking({ let after_tips = tokio::task::spawn_blocking({
let db = self.db.clone(); let db = self.db.clone();
let cache = self.cache.clone(); let cache = self.cache.clone();
let logger = self.logger.clone();
let repo = repo.clone(); let repo = repo.clone();
move || { move || {
let sync = HookMetaDataSync::new(db, cache, repo, logger) let sync = HookMetaDataSync::new(db, cache, repo)
.map_err(|e| GitError::Internal(e.to_string()))?; .map_err(|e| GitError::Internal(e.to_string()))?;
Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips())) Ok::<_, GitError>((sync.list_branch_tips(), sync.list_tag_tips()))
} }
@ -229,7 +222,6 @@ impl HookWorker {
let default_branch = repo.default_branch.clone(); let default_branch = repo.default_branch.clone();
let http_client = self.http_client.clone(); let http_client = self.http_client.clone();
let db = self.db.clone(); let db = self.db.clone();
let logger = self.logger.clone();
// Dispatch branch webhooks and collect handles // Dispatch branch webhooks and collect handles
let mut handles = Vec::new(); let mut handles = Vec::new();
@ -245,7 +237,6 @@ impl HookWorker {
let h = tokio::spawn({ let h = tokio::spawn({
let http_client = http_client.clone(); let http_client = http_client.clone();
let db = db.clone(); let db = db.clone();
let logger = logger.clone();
let repo_id_str = repo_id_str.clone(); let repo_id_str = repo_id_str.clone();
let namespace = namespace.clone(); let namespace = namespace.clone();
let repo_name = repo_name.clone(); let repo_name = repo_name.clone();
@ -254,7 +245,6 @@ impl HookWorker {
crate::hook::webhook_dispatch::dispatch_repo_webhooks( crate::hook::webhook_dispatch::dispatch_repo_webhooks(
&db, &db,
&http_client, &http_client,
&logger,
&repo_id_str, &repo_id_str,
&namespace, &namespace,
&repo_name, &repo_name,
@ -289,7 +279,6 @@ impl HookWorker {
let h = tokio::spawn({ let h = tokio::spawn({
let http_client = http_client.clone(); let http_client = http_client.clone();
let db = db.clone(); let db = db.clone();
let logger = logger.clone();
let repo_id_str = repo_id_str.clone(); let repo_id_str = repo_id_str.clone();
let namespace = namespace.clone(); let namespace = namespace.clone();
let repo_name = repo_name.clone(); let repo_name = repo_name.clone();
@ -298,7 +287,6 @@ impl HookWorker {
crate::hook::webhook_dispatch::dispatch_repo_webhooks( crate::hook::webhook_dispatch::dispatch_repo_webhooks(
&db, &db,
&http_client, &http_client,
&logger,
&repo_id_str, &repo_id_str,
&namespace, &namespace,
&repo_name, &repo_name,
@ -345,8 +333,7 @@ impl HookWorker {
let db = self.db.clone(); let db = self.db.clone();
let cache = self.cache.clone(); let cache = self.cache.clone();
let logger = self.logger.clone(); let sync = HookMetaDataSync::new(db, cache, repo)?;
let sync = HookMetaDataSync::new(db, cache, repo, logger)?;
sync.fsck_only().await?; sync.fsck_only().await?;
Ok(()) Ok(())
@ -371,8 +358,7 @@ impl HookWorker {
let db = self.db.clone(); let db = self.db.clone();
let cache = self.cache.clone(); let cache = self.cache.clone();
let logger = self.logger.clone(); let sync = HookMetaDataSync::new(db, cache, repo)?;
let sync = HookMetaDataSync::new(db, cache, repo, logger)?;
sync.gc_only().await?; sync.gc_only().await?;
Ok(()) Ok(())

View File

@ -61,7 +61,7 @@ impl HookMetaDataSync {
let reference = match ref_result { let reference = match ref_result {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
slog::warn!(self.logger, "failed to read reference: {}", e); tracing::warn!("failed to read reference error={}", e);
continue; continue;
} }
}; };

View File

@ -13,7 +13,6 @@ impl HookMetaDataSync {
) -> Result<(), GitError> { ) -> Result<(), GitError> {
let snapshot = self.snapshot_refs(); let snapshot = self.snapshot_refs();
let storage_path = self.repo.storage_path.clone(); let storage_path = self.repo.storage_path.clone();
let logger = self.logger.clone();
let fsck_errors = tokio::task::spawn_blocking(move || { let fsck_errors = tokio::task::spawn_blocking(move || {
let output = Command::new("git") let output = Command::new("git")
@ -27,9 +26,8 @@ impl HookMetaDataSync {
if !output.status.success() { if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr).to_string(); let stderr = String::from_utf8_lossy(&output.stderr).to_string();
let stdout = String::from_utf8_lossy(&output.stdout).to_string(); let stdout = String::from_utf8_lossy(&output.stdout).to_string();
slog::warn!( tracing::warn!(
logger, "git fsck failed code={:?} stdout={} stderr={}",
"git fsck failed with code {:?}. stdout: {}, stderr: {}",
output.status.code(), output.status.code(),
stdout, stdout,
stderr stderr
@ -98,7 +96,6 @@ impl HookMetaDataSync {
async fn rollback_refs(&self, snapshot: &HashMap<String, String>) { async fn rollback_refs(&self, snapshot: &HashMap<String, String>) {
let storage_path = self.repo.storage_path.clone(); let storage_path = self.repo.storage_path.clone();
let logger = self.logger.clone();
let refs: Vec<(String, String)> = snapshot let refs: Vec<(String, String)> = snapshot
.iter() .iter()
.map(|(k, v)| (k.clone(), v.clone())) .map(|(k, v)| (k.clone(), v.clone()))
@ -119,18 +116,17 @@ impl HookMetaDataSync {
match status { match status {
Ok(s) if s.success() => { Ok(s) if s.success() => {
slog::info!(logger, "rolled back ref {} to {}", ref_name, oid); tracing::info!("rolled back ref ref_name={} oid={}", ref_name, oid);
} }
Ok(s) => { Ok(s) => {
slog::error!( tracing::error!(
logger, "failed to rollback ref ref_name={} code={:?}",
"failed to rollback ref {}: git exited with {:?}",
ref_name, ref_name,
s.code() s.code()
); );
} }
Err(e) => { Err(e) => {
slog::error!(logger, "failed to rollback ref {}: {}", ref_name, e); tracing::error!("failed to rollback ref ref_name={} error={}", ref_name, e);
} }
} }
} }

View File

@ -5,7 +5,6 @@ use std::process::Command;
impl HookMetaDataSync { impl HookMetaDataSync {
pub async fn run_gc(&self) -> Result<(), GitError> { pub async fn run_gc(&self) -> Result<(), GitError> {
let storage_path = self.repo.storage_path.clone(); let storage_path = self.repo.storage_path.clone();
let logger = self.logger.clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let status = Command::new("git") let status = Command::new("git")
@ -20,7 +19,7 @@ impl HookMetaDataSync {
if !status.success() { if !status.success() {
// git gc --auto exits non-zero when there's nothing to collect, // git gc --auto exits non-zero when there's nothing to collect,
// or when another gc is already running — both are benign. // or when another gc is already running — both are benign.
slog::warn!(logger, "git gc exited with {:?}", status.code()); tracing::warn!(code = ?status.code(), "git gc exited with non-zero status");
} }
Ok::<(), GitError>(()) Ok::<(), GitError>(())

View File

@ -33,11 +33,7 @@ impl HookMetaDataSync {
let path = match self.domain.lfs_object_path(&oid) { let path = match self.domain.lfs_object_path(&oid) {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
slog::warn!( tracing::warn!("invalid LFS OID in local objects directory error={}", e);
self.logger,
"invalid LFS OID in local objects directory: {}",
e
);
continue; continue;
} }
}; };

View File

@ -14,7 +14,6 @@ use models::repos::repo::Model as RepoModel;
use models::RepoId; use models::RepoId;
use models::ActiveModelTrait; use models::ActiveModelTrait;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set}; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, Set};
use slog::Logger;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
@ -133,7 +132,6 @@ pub struct HookMetaDataSync {
pub cache: AppCache, pub cache: AppCache,
pub repo: RepoModel, pub repo: RepoModel,
pub domain: GitDomain, pub domain: GitDomain,
pub logger: Logger,
} }
impl HookMetaDataSync { impl HookMetaDataSync {
@ -141,7 +139,6 @@ impl HookMetaDataSync {
db: AppDatabase, db: AppDatabase,
cache: AppCache, cache: AppCache,
repo: RepoModel, repo: RepoModel,
logger: Logger,
) -> Result<Self, crate::GitError> { ) -> Result<Self, crate::GitError> {
let domain = GitDomain::from_model(repo.clone())?; let domain = GitDomain::from_model(repo.clone())?;
Ok(Self { Ok(Self {
@ -149,7 +146,6 @@ impl HookMetaDataSync {
cache, cache,
repo, repo,
domain, domain,
logger,
}) })
} }
@ -160,7 +156,7 @@ impl HookMetaDataSync {
let res = self.sync_work().await; let res = self.sync_work().await;
if let Err(ref e) = res { if let Err(ref e) = res {
slog::error!(self.logger, "sync failed: {}", e); tracing::error!("sync failed error={}", e);
} }
let _ = self.release_lock(&lock_value).await; let _ = self.release_lock(&lock_value).await;
@ -304,11 +300,11 @@ impl HookMetaDataSync {
{ {
Ok(Ok(d)) => d, Ok(Ok(d)) => d,
Ok(Err(e)) => { Ok(Err(e)) => {
slog::warn!(self.logger, "failed to scan skills directory: {}", e); tracing::warn!("failed to scan skills directory error={}", e);
return; return;
} }
Err(e) => { Err(e) => {
slog::warn!(self.logger, "spawn_blocking join error: {}", e); tracing::warn!("spawn_blocking join error error={}", e);
return; return;
} }
}; };
@ -331,7 +327,7 @@ impl HookMetaDataSync {
{ {
Ok(e) => e, Ok(e) => e,
Err(e) => { Err(e) => {
slog::warn!(self.logger, "failed to query existing skills: {}", e); tracing::warn!("failed to query existing skills error={}", e);
return; return;
} }
}; };
@ -396,11 +392,7 @@ impl HookMetaDataSync {
} }
if created > 0 || updated > 0 || removed > 0 { if created > 0 || updated > 0 || removed > 0 {
slog::info!( tracing::info!("skills synced created={} updated={} removed={}", created, updated, removed);
self.logger,
"skills synced: created={}, updated={}, removed={}",
created, updated, removed
);
} }
} }
} }

View File

@ -212,7 +212,6 @@ pub enum WebhookEventKind {
pub async fn dispatch_repo_webhooks( pub async fn dispatch_repo_webhooks(
db: &AppDatabase, db: &AppDatabase,
http: &reqwest::Client, http: &reqwest::Client,
logs: &slog::Logger,
repo_uuid: &str, repo_uuid: &str,
namespace: &str, namespace: &str,
repo_name: &str, repo_name: &str,
@ -231,7 +230,7 @@ pub async fn dispatch_repo_webhooks(
{ {
Ok(ws) => ws, Ok(ws) => ws,
Err(e) => { Err(e) => {
slog::error!(logs, "failed to query webhooks repo={} error={}", repo_uuid, e); tracing::error!("failed to query webhooks repo={} error={}", repo_uuid, e);
return; return;
} }
}; };
@ -297,7 +296,7 @@ pub async fn dispatch_repo_webhooks(
let body = match serde_json::to_vec(&payload) { let body = match serde_json::to_vec(&payload) {
Ok(b) => b, Ok(b) => b,
Err(e) => { Err(e) => {
slog::error!(logs, "failed to serialize push payload error={}", e); tracing::error!("failed to serialize push payload error={}", e);
continue; continue;
} }
}; };
@ -310,16 +309,16 @@ pub async fn dispatch_repo_webhooks(
.await .await
{ {
Ok(Ok(())) => { Ok(Ok(())) => {
slog::info!(logs, "push webhook delivered webhook_id={} url={}", webhook_id, url); tracing::info!("push webhook delivered webhook_id={} url={}", webhook_id, url);
let _ = touch_webhook(db, webhook_id, true, logs).await; let _ = touch_webhook(db, webhook_id, true).await;
} }
Ok(Err(e)) => { Ok(Err(e)) => {
slog::warn!(logs, "push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); tracing::warn!("push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e);
let _ = touch_webhook(db, webhook_id, false, logs).await; let _ = touch_webhook(db, webhook_id, false).await;
} }
Err(_) => { Err(_) => {
slog::warn!(logs, "push webhook timed out webhook_id={} url={}", webhook_id, url); tracing::warn!("push webhook timed out webhook_id={} url={}", webhook_id, url);
let _ = touch_webhook(db, webhook_id, false, logs).await; let _ = touch_webhook(db, webhook_id, false).await;
} }
} }
} }
@ -351,7 +350,7 @@ pub async fn dispatch_repo_webhooks(
let body = match serde_json::to_vec(&payload) { let body = match serde_json::to_vec(&payload) {
Ok(b) => b, Ok(b) => b,
Err(e) => { Err(e) => {
slog::error!(logs, "failed to serialize tag payload error={}", e); tracing::error!("failed to serialize tag payload error={}", e);
continue; continue;
} }
}; };
@ -364,16 +363,16 @@ pub async fn dispatch_repo_webhooks(
.await .await
{ {
Ok(Ok(())) => { Ok(Ok(())) => {
slog::info!(logs, "tag webhook delivered webhook_id={} url={}", webhook_id, url); tracing::info!("tag webhook delivered webhook_id={} url={}", webhook_id, url);
let _ = touch_webhook(db, webhook_id, true, logs).await; let _ = touch_webhook(db, webhook_id, true).await;
} }
Ok(Err(e)) => { Ok(Err(e)) => {
slog::warn!(logs, "tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e); tracing::warn!("tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e);
let _ = touch_webhook(db, webhook_id, false, logs).await; let _ = touch_webhook(db, webhook_id, false).await;
} }
Err(_) => { Err(_) => {
slog::warn!(logs, "tag webhook timed out webhook_id={} url={}", webhook_id, url); tracing::warn!("tag webhook timed out webhook_id={} url={}", webhook_id, url);
let _ = touch_webhook(db, webhook_id, false, logs).await; let _ = touch_webhook(db, webhook_id, false).await;
} }
} }
} }
@ -381,7 +380,7 @@ pub async fn dispatch_repo_webhooks(
} }
} }
async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &slog::Logger) { async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool) {
use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity}; use models::repos::repo_webhook::{Column as RwCol, Entity as RepoWebhookEntity};
use models::{ColumnTrait, EntityTrait, QueryFilter}; use models::{ColumnTrait, EntityTrait, QueryFilter};
use sea_orm::{sea_query::Expr, ExprTrait}; use sea_orm::{sea_query::Expr, ExprTrait};
@ -405,6 +404,6 @@ async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &
}; };
if let Err(e) = result { if let Err(e) = result {
slog::warn!(logs, "failed to update webhook touch error={}", e); tracing::warn!("failed to update webhook touch error={}", e);
} }
} }

View File

@ -6,7 +6,6 @@ use models::repos::repo;
use models::users::{user, user_token}; use models::users::{user, user_token};
use sea_orm::sqlx::types::chrono; use sea_orm::sqlx::types::chrono;
use sea_orm::*; use sea_orm::*;
use slog::Logger;
pub async fn verify_access_token( pub async fn verify_access_token(
db: &AppDatabase, db: &AppDatabase,
@ -45,7 +44,6 @@ pub async fn verify_access_token(
pub async fn authorize_repo_access( pub async fn authorize_repo_access(
req: &HttpRequest, req: &HttpRequest,
db: &AppDatabase, db: &AppDatabase,
logger: &Logger,
repo: &repo::Model, repo: &repo::Model,
is_write: bool, is_write: bool,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -55,7 +53,7 @@ pub async fn authorize_repo_access(
let (username, access_key) = extract_basic_credentials(req)?; let (username, access_key) = extract_basic_credentials(req)?;
let user = verify_access_token(db, &username, &access_key).await?; let user = verify_access_token(db, &username, &access_key).await?;
let authz = SshAuthService::new(db.clone(), logger.clone()); let authz = SshAuthService::new(db.clone());
let can_access = authz.check_repo_permission(&user, repo, is_write).await; let can_access = authz.check_repo_permission(&user, repo, is_write).await;
if !can_access { if !can_access {

View File

@ -4,7 +4,6 @@ use futures_util::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use models::repos::{repo, repo_branch_protect}; use models::repos::{repo, repo_branch_protect};
use sea_orm::*; use sea_orm::*;
use slog::{error, info, warn, Logger};
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -25,16 +24,14 @@ pub struct GitHttpHandler {
storage_path: PathBuf, storage_path: PathBuf,
repo: repo::Model, repo: repo::Model,
db: AppDatabase, db: AppDatabase,
logger: Logger,
} }
impl GitHttpHandler { impl GitHttpHandler {
pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase, logger: Logger) -> Self { pub fn new(storage_path: PathBuf, repo: repo::Model, db: AppDatabase) -> Self {
Self { Self {
storage_path, storage_path,
repo, repo,
db, db,
logger,
} }
} }
@ -96,7 +93,7 @@ impl GitHttpHandler {
mut payload: web::Payload, mut payload: web::Payload,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let started = Instant::now(); let started = Instant::now();
info!(self.logger, "git_rpc_started"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string()); tracing::info!("git_rpc_started service={} repo={} repo_id={}", service, self.repo.repo_name, self.repo.id.to_string());
let mut child = tokio::process::Command::new("git") let mut child = tokio::process::Command::new("git")
.arg(service) .arg(service)
.arg("--stateless-rpc") .arg("--stateless-rpc")
@ -138,7 +135,7 @@ impl GitHttpHandler {
// Reject oversized pre-PACK data to prevent memory exhaustion // Reject oversized pre-PACK data to prevent memory exhaustion
if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT { if pre_pack.len() + bytes.len() > PRE_PACK_LIMIT {
warn!(self.logger, "git_rpc_payload_too_large"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string()); tracing::warn!("git_rpc_payload_too_large service={} repo={} repo_id={}", service, self.repo.repo_name, self.repo.id.to_string());
return Err(actix_web::error::ErrorPayloadTooLarge(format!( return Err(actix_web::error::ErrorPayloadTooLarge(format!(
"Ref negotiation exceeds {} byte limit", "Ref negotiation exceeds {} byte limit",
PRE_PACK_LIMIT PRE_PACK_LIMIT
@ -149,7 +146,7 @@ impl GitHttpHandler {
pre_pack.extend_from_slice(&bytes[..pos]); pre_pack.extend_from_slice(&bytes[..pos]);
if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) { if let Err(msg) = check_branch_protection(&branch_protects, &pre_pack) {
warn!(self.logger, "branch_protection_violation"; "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "message" => %msg); tracing::warn!("branch_protection_violation repo={} repo_id={} message={}", self.repo.repo_name, self.repo.id.to_string(), msg);
return Err(actix_web::error::ErrorForbidden(msg)); return Err(actix_web::error::ErrorForbidden(msg));
} }
@ -210,7 +207,7 @@ impl GitHttpHandler {
if !output.status.success() { if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr); let stderr = String::from_utf8_lossy(&output.stderr);
let ms = started.elapsed().as_millis() as u64; let ms = started.elapsed().as_millis() as u64;
error!(self.logger, "git_rpc_failed"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "duration_ms" => ms, "stderr" => %stderr.to_string()); tracing::error!("git_rpc_failed service={} repo={} repo_id={} duration_ms={} stderr={}", service, self.repo.repo_name, self.repo.id.to_string(), ms, stderr.to_string());
return Err(actix_web::error::ErrorInternalServerError(format!( return Err(actix_web::error::ErrorInternalServerError(format!(
"Git command failed: {}", "Git command failed: {}",
stderr stderr
@ -218,7 +215,7 @@ impl GitHttpHandler {
} }
let ms = started.elapsed().as_millis() as u64; let ms = started.elapsed().as_millis() as u64;
info!(self.logger, "git_rpc_completed"; "service" => %service, "repo" => %self.repo.repo_name, "repo_id" => self.repo.id.to_string(), "duration_ms" => ms, "bytes_out" => output.stdout.len()); tracing::info!("git_rpc_completed service={} repo={} repo_id={} duration_ms={} bytes_out={}", service, self.repo.repo_name, self.repo.id.to_string(), ms, output.stdout.len());
Ok(HttpResponse::Ok() Ok(HttpResponse::Ok()
.content_type(format!("application/x-git-{}-result", service)) .content_type(format!("application/x-git-{}-result", service))

View File

@ -3,7 +3,6 @@ use actix_web::{App, HttpServer, web};
use config::AppConfig; use config::AppConfig;
use db::cache::AppCache; use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use slog::{Logger, error, info};
use std::sync::Arc; use std::sync::Arc;
pub mod auth; pub mod auth;
@ -20,7 +19,6 @@ pub struct HttpAppState {
pub cache: AppCache, pub cache: AppCache,
pub sync: crate::ssh::ReceiveSyncService, pub sync: crate::ssh::ReceiveSyncService,
pub rate_limiter: Arc<rate_limit::RateLimiter>, pub rate_limiter: Arc<rate_limit::RateLimiter>,
pub logger: Logger,
} }
pub fn git_http_cfg(cfg: &mut web::ServiceConfig) { pub fn git_http_cfg(cfg: &mut web::ServiceConfig) {
@ -66,7 +64,7 @@ pub fn git_http_cfg(cfg: &mut web::ServiceConfig) {
); );
} }
pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> { pub async fn run_http(config: AppConfig) -> anyhow::Result<()> {
let (db, app_cache) = tokio::join!(AppDatabase::init(&config), AppCache::init(&config),); let (db, app_cache) = tokio::join!(AppDatabase::init(&config), AppCache::init(&config),);
let db = db?; let db = db?;
let app_cache = app_cache?; let app_cache = app_cache?;
@ -76,13 +74,12 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> {
db.clone(), db.clone(),
app_cache.clone(), app_cache.clone(),
redis_pool.clone(), redis_pool.clone(),
logger.clone(),
config.clone(), config.clone(),
); );
let _worker_cancel = hook.start_worker(); let _worker_cancel = hook.start_worker();
slog::info!(logger, "hook worker started"); tracing::info!("hook worker started");
let sync = crate::ssh::ReceiveSyncService::new(redis_pool.clone(), logger.clone()); let sync = crate::ssh::ReceiveSyncService::new(redis_pool.clone());
let rate_limiter = Arc::new(rate_limit::RateLimiter::new( let rate_limiter = Arc::new(rate_limit::RateLimiter::new(
rate_limit::RateLimitConfig::default(), rate_limit::RateLimitConfig::default(),
@ -94,11 +91,9 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> {
cache: app_cache.clone(), cache: app_cache.clone(),
sync, sync,
rate_limiter, rate_limiter,
logger: logger.clone(),
}; };
let logger_startup = logger.clone(); tracing::info!("Starting git HTTP server on 0.0.0.0:8021");
info!(&logger_startup, "Starting git HTTP server on 0.0.0.0:8021");
let server = HttpServer::new(move || { let server = HttpServer::new(move || {
App::new() App::new()
@ -112,9 +107,9 @@ pub async fn run_http(config: AppConfig, logger: Logger) -> anyhow::Result<()> {
// workers finish in-flight requests then exit (graceful shutdown). // workers finish in-flight requests then exit (graceful shutdown).
let result = server.await; let result = server.await;
if let Err(e) = result { if let Err(e) = result {
error!(&logger, "HTTP server error: {}", e); tracing::error!("HTTP server error: {}", e);
} }
info!(&logger, "Git HTTP server stopped"); tracing::info!("Git HTTP server stopped");
Ok(()) Ok(())
} }

View File

@ -34,10 +34,10 @@ pub async fn info_refs(
let path_inner = path.into_inner(); let path_inner = path.into_inner();
let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?;
let is_write = service_param == "git-receive-pack"; let is_write = service_param == "git-receive-pack";
authorize_repo_access(&req, &state.db, &state.logger, &model, is_write).await?; authorize_repo_access(&req, &state.db, &model, is_write).await?;
let storage_path = PathBuf::from(&model.storage_path); let storage_path = PathBuf::from(&model.storage_path);
let handler = GitHttpHandler::new(storage_path, model, state.db.clone(), state.logger.clone()); let handler = GitHttpHandler::new(storage_path, model, state.db.clone());
handler.info_refs(service_param).await handler.info_refs(service_param).await
} }
@ -56,10 +56,10 @@ pub async fn upload_pack(
let path_inner = path.into_inner(); let path_inner = path.into_inner();
let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?;
authorize_repo_access(&req, &state.db, &state.logger, &model, false).await?; authorize_repo_access(&req, &state.db, &model, false).await?;
let storage_path = PathBuf::from(&model.storage_path); let storage_path = PathBuf::from(&model.storage_path);
let handler = GitHttpHandler::new(storage_path, model, state.db.clone(), state.logger.clone()); let handler = GitHttpHandler::new(storage_path, model, state.db.clone());
handler.upload_pack(payload).await handler.upload_pack(payload).await
} }
@ -78,10 +78,10 @@ pub async fn receive_pack(
let path_inner = path.into_inner(); let path_inner = path.into_inner();
let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?; let model = get_repo_model(&path_inner.0, &path_inner.1, &state.db).await?;
authorize_repo_access(&req, &state.db, &state.logger, &model, true).await?; authorize_repo_access(&req, &state.db, &model, true).await?;
let storage_path = PathBuf::from(&model.storage_path); let storage_path = PathBuf::from(&model.storage_path);
let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone(), state.logger.clone()); let handler = GitHttpHandler::new(storage_path, model.clone(), state.db.clone());
let result = handler.receive_pack(payload).await; let result = handler.receive_pack(payload).await;
let _ = tokio::spawn({ let _ = tokio::spawn({

View File

@ -8,17 +8,15 @@ use models::users::{user, user_ssh_key};
use sea_orm::sqlx::types::chrono; use sea_orm::sqlx::types::chrono;
use sea_orm::*; use sea_orm::*;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use slog::{Logger, error, info, warn};
/// SSH authentication service optimized for performance /// SSH authentication service optimized for performance
pub struct SshAuthService { pub struct SshAuthService {
db: AppDatabase, db: AppDatabase,
logger: Logger,
} }
impl SshAuthService { impl SshAuthService {
pub fn new(db: AppDatabase, logger: Logger) -> Self { pub fn new(db: AppDatabase) -> Self {
Self { db, logger } Self { db }
} }
pub async fn find_repo( pub async fn find_repo(
@ -103,7 +101,7 @@ impl SshAuthService {
let fingerprint = match self.generate_fingerprint_from_public_key(public_key_str) { let fingerprint = match self.generate_fingerprint_from_public_key(public_key_str) {
Ok(fp) => fp, Ok(fp) => fp,
Err(e) => { Err(e) => {
error!(self.logger, "{}", format!("Failed to generate fingerprint error={}", e)); tracing::error!("failed to generate SSH key fingerprint error={}", e);
return Ok(None); return Ok(None);
} }
}; };
@ -113,7 +111,7 @@ impl SshAuthService {
} else { } else {
fingerprint.clone() fingerprint.clone()
}; };
info!(self.logger, "{}", format!("Looking up user with SSH key fingerprint={}", fingerprint_preview)); tracing::info!("looking up user with SSH key fingerprint={}", fingerprint_preview);
let ssh_key = user_ssh_key::Entity::find() let ssh_key = user_ssh_key::Entity::find()
.filter(user_ssh_key::Column::Fingerprint.eq(&fingerprint)) .filter(user_ssh_key::Column::Fingerprint.eq(&fingerprint))
@ -124,13 +122,13 @@ impl SshAuthService {
let ssh_key = match ssh_key { let ssh_key = match ssh_key {
Some(key) => key, Some(key) => key,
None => { None => {
warn!(self.logger, "{}", format!("No SSH key found fingerprint={}", fingerprint)); tracing::warn!("no SSH key found fingerprint={}", fingerprint);
return Ok(None); return Ok(None);
} }
}; };
if self.is_key_expired(&ssh_key) { if self.is_key_expired(&ssh_key) {
warn!(self.logger, "{}", format!("SSH key expired key_id={} expires_at={:?}", ssh_key.id, ssh_key.expires_at)); tracing::warn!("SSH key expired key_id={} expires_at={:?}", ssh_key.id, ssh_key.expires_at);
return Ok(None); return Ok(None);
} }
@ -140,7 +138,7 @@ impl SshAuthService {
.await?; .await?;
if let Some(ref user) = user_model { if let Some(ref user) = user_model {
info!(self.logger, "{}", format!("User authenticated user={} key={}", user.username, ssh_key.title)); tracing::info!("user authenticated via SSH key user={} key={}", user.username, ssh_key.title);
self.update_key_last_used_async(ssh_key.id); self.update_key_last_used_async(ssh_key.id);
} }
@ -158,17 +156,15 @@ impl SshAuthService {
fn update_key_last_used_async(&self, key_id: i64) { fn update_key_last_used_async(&self, key_id: i64) {
let db_clone = self.db.clone(); let db_clone = self.db.clone();
let logger = self.logger.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = Self::update_key_last_used_sync(db_clone, &logger, key_id).await { if let Err(e) = Self::update_key_last_used_sync(db_clone, key_id).await {
warn!(&logger, "{}", format!("Failed to update key last_used key_id={} error={}", key_id, e)); tracing::warn!("failed to update key last_used key_id={} error={}", key_id, e);
} }
}); });
} }
async fn update_key_last_used_sync( async fn update_key_last_used_sync(
db: AppDatabase, db: AppDatabase,
logger: &Logger,
key_id: i64, key_id: i64,
) -> Result<(), DbErr> { ) -> Result<(), DbErr> {
let key = user_ssh_key::Entity::find_by_id(key_id) let key = user_ssh_key::Entity::find_by_id(key_id)
@ -182,7 +178,7 @@ impl SshAuthService {
active_key.updated_at = Set(now); active_key.updated_at = Set(now);
active_key.update(db.writer()).await?; active_key.update(db.writer()).await?;
info!(logger, "{}", format!("Updated key last_used key_id={}", key_id)); tracing::info!("updated key last_used key_id={}", key_id);
} }
Ok(()) Ok(())
@ -195,12 +191,12 @@ impl SshAuthService {
is_write: bool, is_write: bool,
) -> bool { ) -> bool {
if repo.created_by == user.uid { if repo.created_by == user.uid {
info!(self.logger, "{}", format!("User is repo owner user={} repo={}", user.username, repo.repo_name)); tracing::info!("user is repo owner user={} repo={}", user.username, repo.repo_name);
return true; return true;
} }
if !is_write && !repo.is_private { if !is_write && !repo.is_private {
info!(self.logger, "{}", format!("Public repo allows read repo={}", repo.repo_name)); tracing::info!("public repo allows read access repo={}", repo.repo_name);
return true; return true;
} }
@ -209,7 +205,7 @@ impl SshAuthService {
.await .await
.unwrap_or(false) .unwrap_or(false)
{ {
info!(self.logger, "{}", format!("User has collaborator access user={} repo={}", user.username, repo.repo_name)); tracing::info!("user has collaborator access user={} repo={}", user.username, repo.repo_name);
return true; return true;
} }
@ -219,11 +215,11 @@ impl SshAuthService {
.await .await
.unwrap_or(false) .unwrap_or(false)
{ {
info!(self.logger, "{}", format!("User has project member access user={} repo={}", user.username, repo.repo_name)); tracing::info!("user has project member access user={} repo={}", user.username, repo.repo_name);
return true; return true;
} }
warn!(self.logger, "{}", format!("Access denied user={} repo={} write={}", user.username, repo.repo_name, is_write)); tracing::warn!("access denied user={} repo={} is_write={}", user.username, repo.repo_name, is_write);
false false
} }
@ -251,7 +247,7 @@ impl SshAuthService {
return Ok(true); return Ok(true);
} }
warn!(self.logger, "{}", format!("Collaborator has no valid roles scope={}", collab.scope)); tracing::warn!("collaborator has no valid roles scope={}", collab.scope);
Ok(false) Ok(false)
} else { } else {
Ok(false) Ok(false)

View File

@ -12,7 +12,6 @@ use russh::{Channel, ChannelId, CryptoVec, Disconnect};
use sea_orm::ColumnTrait; use sea_orm::ColumnTrait;
use sea_orm::EntityTrait; use sea_orm::EntityTrait;
use sea_orm::QueryFilter; use sea_orm::QueryFilter;
use slog::{Logger, error, info, warn};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -77,7 +76,6 @@ pub struct SSHandle {
pub cache: AppCache, pub cache: AppCache,
pub sync: ReceiveSyncService, pub sync: ReceiveSyncService,
pub upload_pack_eof_sent: HashSet<ChannelId>, pub upload_pack_eof_sent: HashSet<ChannelId>,
pub logger: Logger,
pub token_service: SshTokenService, pub token_service: SshTokenService,
pub client_addr: Option<SocketAddr>, pub client_addr: Option<SocketAddr>,
} }
@ -87,15 +85,14 @@ impl SSHandle {
db: AppDatabase, db: AppDatabase,
cache: AppCache, cache: AppCache,
sync: ReceiveSyncService, sync: ReceiveSyncService,
logger: Logger,
token_service: SshTokenService, token_service: SshTokenService,
client_addr: Option<SocketAddr>, client_addr: Option<SocketAddr>,
) -> Self { ) -> Self {
let auth = SshAuthService::new(db.clone(), logger.clone()); let auth = SshAuthService::new(db.clone());
let addr_str = client_addr let addr_str = client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!(logger, "SSH handler created for client: {}", addr_str); tracing::info!("SSH handler created client={}", addr_str);
Self { Self {
repo: None, repo: None,
model: None, model: None,
@ -110,7 +107,6 @@ impl SSHandle {
cache, cache,
sync, sync,
upload_pack_eof_sent: HashSet::new(), upload_pack_eof_sent: HashSet::new(),
logger,
token_service, token_service,
client_addr, client_addr,
} }
@ -137,7 +133,7 @@ impl Drop for SSHandle {
.client_addr .client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!(self.logger, "ssh_handler_dropped"; "client" => %addr_str); tracing::info!("ssh_handler_dropped client={}", addr_str);
let channel_ids: Vec<_> = self.stdin.keys().copied().collect(); let channel_ids: Vec<_> = self.stdin.keys().copied().collect();
for channel_id in channel_ids { for channel_id in channel_ids {
@ -154,7 +150,7 @@ impl russh::server::Handler for SSHandle {
.client_addr .client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!(self.logger, "auth_none_received"; "user" => %user, "client" => %client_info); tracing::info!("auth_none_received user={} client={}", user, client_info);
Ok(Auth::UnsupportedMethod) Ok(Auth::UnsupportedMethod)
} }
@ -166,25 +162,25 @@ impl russh::server::Handler for SSHandle {
if token.is_empty() { if token.is_empty() {
warn!(self.logger, "auth_rejected_empty_token"; "client" => %client_info); tracing::warn!("auth_rejected_empty_token client={}", client_info);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
info!(self.logger, "auth_token_attempt"; "client" => %client_info); tracing::info!("auth_token_attempt client={}", client_info);
let user_model = match self.token_service.find_user_by_token(token).await { let user_model = match self.token_service.find_user_by_token(token).await {
Ok(Some(model)) => model, Ok(Some(model)) => model,
Ok(None) => { Ok(None) => {
warn!(self.logger, "auth_rejected_token_not_found"; "client" => %client_info); tracing::warn!("auth_rejected_token_not_found client={}", client_info);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
Err(e) => { Err(e) => {
error!(self.logger, "auth_token_error"; "error" => %e.to_string(), "client" => %client_info); tracing::error!("auth_token_error error={}", e);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
}; };
info!(self.logger, "auth_token_success"; "user" => %user_model.username, "client" => %client_info); tracing::info!("auth_token_success user={} client={}", user_model.username, client_info);
self.operator = Some(user_model); self.operator = Some(user_model);
Ok(Auth::Accept) Ok(Auth::Accept)
} }
@ -206,29 +202,29 @@ impl russh::server::Handler for SSHandle {
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
if user != "git" { if user != "git" {
warn!(self.logger, "auth_rejected_invalid_username"; "user" => %user, "client" => %client_info); tracing::warn!("auth_rejected_invalid_username user={} client={}", user, client_info);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
let public_key_str = public_key.to_string(); let public_key_str = public_key.to_string();
if public_key_str.len() < 32 { if public_key_str.len() < 32 {
warn!(self.logger, "auth_rejected_invalid_key_length"; "key_length" => public_key_str.len(), "client" => %client_info); tracing::warn!("auth_rejected_invalid_key_length key_length={}", public_key_str.len());
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
info!(self.logger, "auth_publickey_attempt"; "client" => %client_info); tracing::info!("auth_publickey_attempt client={}", client_info);
let user_model = match self.auth.find_user_by_public_key(&public_key_str).await { let user_model = match self.auth.find_user_by_public_key(&public_key_str).await {
Ok(Some(model)) => model, Ok(Some(model)) => model,
Ok(None) => { Ok(None) => {
warn!(self.logger, "auth_rejected_key_not_found"; "client" => %client_info); tracing::warn!("auth_rejected_key_not_found client={}", client_info);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
Err(e) => { Err(e) => {
error!(self.logger, "auth_publickey_error"; "error" => %e.to_string(), "client" => %client_info); tracing::error!("auth_publickey_error error={}", e);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
}; };
info!(self.logger, "auth_publickey_success"; "user" => %user_model.username, "client" => %client_info); tracing::info!("auth_publickey_success user={} client={}", user_model.username, client_info);
self.operator = Some(user_model); self.operator = Some(user_model);
Ok(Auth::Accept) Ok(Auth::Accept)
} }
@ -243,29 +239,29 @@ impl russh::server::Handler for SSHandle {
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
if user != "git" { if user != "git" {
warn!(self.logger, "auth_rejected_invalid_username"; "user" => %user, "client" => %client_info); tracing::warn!("auth_rejected_invalid_username user={} client={}", user, client_info);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
let public_key_str = certificate.to_string(); let public_key_str = certificate.to_string();
if public_key_str.len() < 32 { if public_key_str.len() < 32 {
warn!(self.logger, "auth_rejected_invalid_key_length"; "key_length" => public_key_str.len(), "client" => %client_info); tracing::warn!("auth_rejected_invalid_key_length key_length={}", public_key_str.len());
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
info!(self.logger, "auth_publickey_attempt"; "client" => %client_info); tracing::info!("auth_publickey_attempt client={}", client_info);
let user_model = match self.auth.find_user_by_public_key(&public_key_str).await { let user_model = match self.auth.find_user_by_public_key(&public_key_str).await {
Ok(Some(model)) => model, Ok(Some(model)) => model,
Ok(None) => { Ok(None) => {
warn!(self.logger, "auth_rejected_key_not_found"; "client" => %client_info); tracing::warn!("auth_rejected_key_not_found client={}", client_info);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
Err(e) => { Err(e) => {
error!(self.logger, "auth_publickey_error"; "error" => %e.to_string(), "client" => %client_info); tracing::error!("auth_publickey_error error={}", e);
return Err(russh::Error::NotAuthenticated); return Err(russh::Error::NotAuthenticated);
} }
}; };
info!(self.logger, "auth_publickey_success"; "user" => %user_model.username, "client" => %client_info); tracing::info!("auth_publickey_success user={} client={}", user_model.username, client_info);
self.operator = Some(user_model); self.operator = Some(user_model);
Ok(Auth::Accept) Ok(Auth::Accept)
} }
@ -278,7 +274,7 @@ impl russh::server::Handler for SSHandle {
channel: ChannelId, channel: ChannelId,
_: &mut Session, _: &mut Session,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
info!(self.logger, "channel_close"; "channel" => ?channel, "client" => ?self.client_addr); tracing::info!("channel_close channel={:?} client={:?}", channel, self.client_addr);
self.cleanup_channel(channel); self.cleanup_channel(channel);
Ok(()) Ok(())
} }
@ -288,35 +284,23 @@ impl russh::server::Handler for SSHandle {
channel: ChannelId, channel: ChannelId,
_: &mut Session, _: &mut Session,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
info!(self.logger, "channel_eof"; tracing::info!("channel_eof channel={:?} client={:?}", channel, self.client_addr);
"channel" => ?channel,
"client" => ?self.client_addr
);
if let Some(eof) = self.eof.get(&channel) { if let Some(eof) = self.eof.get(&channel) {
let _ = eof.send(true).await; let _ = eof.send(true).await;
} }
if let Some(mut stdin) = self.stdin.remove(&channel) { if let Some(mut stdin) = self.stdin.remove(&channel) {
info!(self.logger, "Closing stdin"; tracing::info!("Closing stdin channel={:?} client={:?}", channel, self.client_addr);
"channel" => ?channel,
"client" => ?self.client_addr
);
// Use timeout so we never block the SSH event loop waiting for git. // Use timeout so we never block the SSH event loop waiting for git.
let _ = tokio::time::timeout(Duration::from_secs(5), async { let _ = tokio::time::timeout(Duration::from_secs(5), async {
stdin.flush().await.ok(); stdin.flush().await.ok();
let _ = stdin.shutdown().await; let _ = stdin.shutdown().await;
}) })
.await; .await;
info!(self.logger, "stdin closed"; tracing::info!("stdin closed channel={:?} client={:?}", channel, self.client_addr);
"channel" => ?channel,
"client" => ?self.client_addr
);
} else { } else {
warn!(self.logger, "stdin already removed"; tracing::warn!("stdin already removed channel={:?} client={:?}", channel, self.client_addr);
"channel" => ?channel,
"client" => ?self.client_addr
);
} }
Ok(()) Ok(())
@ -331,7 +315,7 @@ impl russh::server::Handler for SSHandle {
.client_addr .client_addr
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!(self.logger, "channel_open_session"; "channel" => ?channel, "client" => %client_info); tracing::info!("channel_open_session channel={:?} client={}", channel, client_info);
let _ = session.flush().ok(); let _ = session.flush().ok();
Ok(true) Ok(true)
} }
@ -347,11 +331,7 @@ impl russh::server::Handler for SSHandle {
_modes: &[(russh::Pty, u32)], _modes: &[(russh::Pty, u32)],
session: &mut Session, session: &mut Session,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let client_info = self tracing::warn!("pty_request not supported channel={:?} term={} cols={} rows={}", channel, term, col_width, row_height);
.client_addr
.map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string());
warn!(self.logger, "pty_request not supported"; "channel" => ?channel, "term" => %term, "cols" => col_width, "rows" => row_height, "client" => %client_info);
let _ = session.flush().ok(); let _ = session.flush().ok();
Ok(()) Ok(())
} }
@ -362,11 +342,7 @@ impl russh::server::Handler for SSHandle {
name: &str, name: &str,
session: &mut Session, session: &mut Session,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
let client_info = self tracing::info!("subsystem_request channel={:?} subsystem={}", channel, name);
.client_addr
.map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string());
info!(self.logger, "subsystem_request"; "channel" => ?channel, "subsystem" => %name, "client" => %client_info);
// git-clients may send "subsystem" for git protocol over ssh. // git-clients may send "subsystem" for git protocol over ssh.
// We don't use subsystem; exec_request handles it directly. // We don't use subsystem; exec_request handles it directly.
let _ = session.flush().ok(); let _ = session.flush().ok();
@ -423,7 +399,7 @@ impl russh::server::Handler for SSHandle {
self.branch.insert(channel, refs); self.branch.insert(channel, refs);
} }
Err(e) => { Err(e) => {
warn!(self.logger, "ref_update_parse_error"; "error" => ?e); tracing::warn!("ref_update_parse_error error={:?}", e);
self.branch.insert(channel, vec![]); self.branch.insert(channel, vec![]);
} }
} }
@ -432,7 +408,7 @@ impl russh::server::Handler for SSHandle {
stdin.write_all(&buffered).await?; stdin.write_all(&buffered).await?;
stdin.flush().await?; stdin.flush().await?;
} else { } else {
error!(self.logger, "stdin_not_found"; "channel" => ?channel); tracing::error!("stdin_not_found channel={:?}", channel);
} }
return Ok(()); return Ok(());
} }
@ -441,7 +417,7 @@ impl russh::server::Handler for SSHandle {
stdin.write_all(data).await?; stdin.write_all(data).await?;
stdin.flush().await?; stdin.flush().await?;
} else { } else {
error!(self.logger, "stdin_not_found_forwarding"; "channel" => ?channel); tracing::error!("stdin_not_found_forwarding channel={:?}", channel);
} }
return Ok(()); return Ok(());
} }
@ -472,7 +448,7 @@ impl russh::server::Handler for SSHandle {
user.username user.username
); );
info!(self.logger, "shell_request"; "user" => %user.username); tracing::info!("shell_request user={}", user.username);
session session
.data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes())) .data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes()))
.ok(); .ok();
@ -481,7 +457,7 @@ impl russh::server::Handler for SSHandle {
session.close(channel_id).ok(); session.close(channel_id).ok();
let _ = session.flush().ok(); let _ = session.flush().ok();
} else { } else {
warn!(self.logger, "shell_request_unauthenticated"; "channel" => ?channel_id); tracing::warn!("shell_request_unauthenticated channel={:?}", channel_id);
let msg = "Authentication required\r\n"; let msg = "Authentication required\r\n";
session session
.data(channel_id, CryptoVec::from_slice(msg.as_bytes())) .data(channel_id, CryptoVec::from_slice(msg.as_bytes()))
@ -504,15 +480,12 @@ impl russh::server::Handler for SSHandle {
.map(|addr| format!("{}", addr)) .map(|addr| format!("{}", addr))
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
info!( tracing::info!("exec_request received channel={:?} client={}", channel_id, client_info);
self.logger,
"exec_request received, channel: {:?}, client: {}", channel_id, client_info
);
let git_shell_cmd = match std::str::from_utf8(data) { let git_shell_cmd = match std::str::from_utf8(data) {
Ok(cmd) => cmd.trim(), Ok(cmd) => cmd.trim(),
Err(e) => { Err(e) => {
error!(self.logger, "invalid_command_encoding"; "error" => %e.to_string()); tracing::error!("invalid_command_encoding error={}", e);
session session
.disconnect( .disconnect(
Disconnect::ServiceNotAvailable, Disconnect::ServiceNotAvailable,
@ -526,7 +499,7 @@ impl russh::server::Handler for SSHandle {
let (service, path) = match parse_git_command(git_shell_cmd) { let (service, path) = match parse_git_command(git_shell_cmd) {
Some((s, p)) => (s, p), Some((s, p)) => (s, p),
None => { None => {
error!(self.logger, "invalid_git_command"; "command" => %git_shell_cmd); tracing::error!("invalid_git_command command={}", git_shell_cmd);
let msg = format!("Invalid git command: {}", git_shell_cmd); let msg = format!("Invalid git command: {}", git_shell_cmd);
session session
.disconnect(Disconnect::ServiceNotAvailable, &msg, "") .disconnect(Disconnect::ServiceNotAvailable, &msg, "")
@ -539,7 +512,7 @@ impl russh::server::Handler for SSHandle {
Some(pair) => pair, Some(pair) => pair,
None => { None => {
let msg = format!("Invalid repository path: {}", path); let msg = format!("Invalid repository path: {}", path);
error!(self.logger, "invalid_repo_path"; "path" => %path); tracing::error!("invalid_repo_path path={}", path);
session session
.disconnect(Disconnect::ServiceNotAvailable, &msg, "") .disconnect(Disconnect::ServiceNotAvailable, &msg, "")
.ok(); .ok();
@ -552,7 +525,7 @@ impl russh::server::Handler for SSHandle {
Ok(repo) => repo, Ok(repo) => repo,
Err(e) => { Err(e) => {
// Log the detailed error internally; client receives generic message. // Log the detailed error internally; client receives generic message.
error!(self.logger, "repo_fetch_error"; "error" => %e.to_string()); tracing::error!("repo_fetch_error error={}", e);
session session
.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "") .disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "")
.ok(); .ok();
@ -565,7 +538,7 @@ impl russh::server::Handler for SSHandle {
Some(user) => user, Some(user) => user,
None => { None => {
let msg = "Authentication error: no authenticated user"; let msg = "Authentication error: no authenticated user";
error!(self.logger, "exec_no_authenticated_user"; "channel" => ?channel_id); tracing::error!("exec_no_authenticated_user channel={:?}", channel_id);
session.disconnect(Disconnect::ByApplication, msg, "").ok(); session.disconnect(Disconnect::ByApplication, msg, "").ok();
return Err(russh::Error::Disconnect); return Err(russh::Error::Disconnect);
} }
@ -584,20 +557,20 @@ impl russh::server::Handler for SSHandle {
if is_write { "write" } else { "read" }, if is_write { "write" } else { "read" },
repo.repo_name repo.repo_name
); );
error!(self.logger, "access_denied"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write); tracing::error!("access_denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write);
session.disconnect(Disconnect::ByApplication, &msg, "").ok(); session.disconnect(Disconnect::ByApplication, &msg, "").ok();
return Err(russh::Error::Disconnect); return Err(russh::Error::Disconnect);
} }
info!(self.logger, "access_granted"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write); tracing::info!("access_granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write);
let repo_path = PathBuf::from(&repo.storage_path); let repo_path = PathBuf::from(&repo.storage_path);
if !repo_path.exists() { if !repo_path.exists() {
error!(self.logger, "repo_path_not_found"; "path" => %repo.storage_path); tracing::error!("repo_path_not_found path={}", repo.storage_path);
} }
let mut cmd = build_git_command(service, repo_path); let mut cmd = build_git_command(service, repo_path);
let logger = self.logger.clone();
info!(&logger, "spawn_git_process"; "service" => ?service, "path" => %repo.storage_path); tracing::info!("spawn_git_process service={:?} path={}", service, repo.storage_path);
let mut shell = match cmd let mut shell = match cmd
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
@ -609,7 +582,7 @@ impl russh::server::Handler for SSHandle {
shell shell
} }
Err(e) => { Err(e) => {
error!(&logger, "process_spawn_failed"; "error" => %e.to_string()); tracing::error!("process_spawn_failed error={}", e);
let _ = session.channel_failure(channel_id); let _ = session.channel_failure(channel_id);
self.cleanup_channel(channel_id); self.cleanup_channel(channel_id);
return Err(russh::Error::IO(e)); return Err(russh::Error::IO(e));
@ -626,9 +599,9 @@ impl russh::server::Handler for SSHandle {
let repo_uid = repo.id; let repo_uid = repo.id;
let should_sync = service == GitService::ReceivePack; let should_sync = service == GitService::ReceivePack;
let sync = self.sync.clone(); let sync = self.sync.clone();
let logger_for_fut = self.logger.clone();
let fut = async move { let fut = async move {
info!(&logger_for_fut, "git_task_started"; "channel" => ?channel_id); tracing::info!(channel = ?channel_id, "git_task_started");
let mut stdout_done = false; let mut stdout_done = false;
let mut stderr_done = false; let mut stderr_done = false;
@ -655,7 +628,7 @@ impl russh::server::Handler for SSHandle {
let status = result?; let status = result?;
let status_code = status.code().unwrap_or(128) as u32; let status_code = status.code().unwrap_or(128) as u32;
info!(&logger_for_fut, "git_process_exited"; "channel" => ?channel_id, "status" => status_code); tracing::info!("git_process_exited channel={:?} status={}", channel_id, status_code);
if !stdout_done || !stderr_done { if !stdout_done || !stderr_done {
let _ = tokio::time::timeout(Duration::from_millis(100), async { let _ = tokio::time::timeout(Duration::from_millis(100), async {
@ -685,21 +658,21 @@ impl russh::server::Handler for SSHandle {
sleep(Duration::from_millis(50)).await; sleep(Duration::from_millis(50)).await;
let _ = session_handle.eof(channel_id).await; let _ = session_handle.eof(channel_id).await;
let _ = session_handle.close(channel_id).await; let _ = session_handle.close(channel_id).await;
info!(&logger_for_fut, "channel_closed"; "channel" => ?channel_id); tracing::info!(channel = ?channel_id, "channel_closed");
break; break;
} }
result = &mut stdout_fut, if !stdout_done => { result = &mut stdout_fut, if !stdout_done => {
info!(&logger_for_fut, "stdout completed"); tracing::info!("stdout completed");
stdout_done = true; stdout_done = true;
if let Err(e) = result { if let Err(e) = result {
warn!(&logger_for_fut, "stdout_forward_error"; "error" => ?e); tracing::warn!(error = ?e, "stdout_forward_error");
} }
} }
result = &mut stderr_fut, if !stderr_done => { result = &mut stderr_fut, if !stderr_done => {
info!(&logger_for_fut, "stderr completed"); tracing::info!("stderr completed");
stderr_done = true; stderr_done = true;
if let Err(e) = result { if let Err(e) = result {
warn!(&logger_for_fut, "stderr_forward_error"; "error" => ?e); tracing::warn!(error = ?e, "stderr_forward_error");
} }
} }
} }
@ -710,7 +683,7 @@ impl russh::server::Handler for SSHandle {
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = fut.await { if let Err(e) = fut.await {
error!(&logger, "git_ssh_channel_task_error"; "error" => %e.to_string()); tracing::error!("git_ssh_channel_task_error error={}", e);
} }
while eof_rx.recv().await.is_some() {} while eof_rx.recv().await.is_some() {}
}); });

View File

@ -12,7 +12,6 @@ use russh::server::Server;
use russh::{MethodKind, MethodSet, SshId, server::Config}; use russh::{MethodKind, MethodSet, SshId, server::Config};
use sea_orm::prelude::*; use sea_orm::prelude::*;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use slog::{Logger, error, info};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -27,7 +26,6 @@ pub struct SSHHandle {
pub app: AppConfig, pub app: AppConfig,
pub cache: AppCache, pub cache: AppCache,
pub redis_pool: RedisPool, pub redis_pool: RedisPool,
pub logger: Logger,
} }
impl SSHHandle { impl SSHHandle {
@ -35,7 +33,7 @@ impl SSHHandle {
let this = self.clone(); let this = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = this.run_ssh().await { if let Err(e) = this.run_ssh().await {
error!(this.logger, "SSH server error: {}", e); tracing::error!("SSH server error: {}", e);
} }
}); });
} }
@ -44,18 +42,16 @@ impl SSHHandle {
app: AppConfig, app: AppConfig,
cache: AppCache, cache: AppCache,
redis_pool: RedisPool, redis_pool: RedisPool,
logger: Logger,
) -> Self { ) -> Self {
SSHHandle { SSHHandle {
db, db,
app, app,
cache, cache,
redis_pool, redis_pool,
logger,
} }
} }
pub async fn run_ssh(&self) -> anyhow::Result<()> { pub async fn run_ssh(&self) -> anyhow::Result<()> {
info!(self.logger, "SSH server starting"); tracing::info!("SSH server starting");
let private_key_content = self.app.ssh_server_private_key()?; let private_key_content = self.app.ssh_server_private_key()?;
if private_key_content.is_empty() { if private_key_content.is_empty() {
return Err(anyhow::anyhow!("SSH server private key is not configured")); return Err(anyhow::anyhow!("SSH server private key is not configured"));
@ -66,8 +62,7 @@ impl SSHHandle {
} else { } else {
private_key_content.clone() private_key_content.clone()
}; };
info!( tracing::info!(
self.logger,
"Loading SSH private key (hex, {} bytes)", "Loading SSH private key (hex, {} bytes)",
private_key_content.len() private_key_content.len()
); );
@ -79,8 +74,7 @@ impl SSHHandle {
) )
})?; })?;
info!( tracing::info!(
self.logger,
"Hex decoded to {} bytes", "Hex decoded to {} bytes",
private_key_bytes.len() private_key_bytes.len()
); );
@ -89,18 +83,18 @@ impl SSHHandle {
.with_context(|| "Decoded SSH private key is not valid UTF-8")?; .with_context(|| "Decoded SSH private key is not valid UTF-8")?;
if let Some(first_line) = private_key_pem.lines().next() { if let Some(first_line) = private_key_pem.lines().next() {
info!(self.logger, "PEM format starts with: {}", first_line); tracing::info!("PEM format starts with: {}", first_line);
} }
info!( tracing::info!(
self.logger, "Complete private key content:\n{}",
"Complete private key content:\n{}", private_key_pem private_key_pem
); );
let private_key = { let private_key = {
match ssh_key::PrivateKey::from_openssh(private_key_pem) { match ssh_key::PrivateKey::from_openssh(private_key_pem) {
Ok(ssh_key) => { Ok(ssh_key) => {
info!(self.logger, "Successfully parsed with ssh-key crate"); tracing::info!("Successfully parsed with ssh-key crate");
let openssh_pem = ssh_key let openssh_pem = ssh_key
.to_openssh(ssh_key::LineEnding::LF) .to_openssh(ssh_key::LineEnding::LF)
.with_context(|| "Failed to serialize to OpenSSH format")?; .with_context(|| "Failed to serialize to OpenSSH format")?;
@ -109,8 +103,7 @@ impl SSHHandle {
.with_context(|| "Failed to parse with russh after ssh-key conversion")? .with_context(|| "Failed to parse with russh after ssh-key conversion")?
} }
Err(e) => { Err(e) => {
info!( tracing::info!(
self.logger,
"ssh-key from_openssh failed: {}, trying direct russh parse", e "ssh-key from_openssh failed: {}, trying direct russh parse", e
); );
PrivateKey::from_str(private_key_pem).with_context(|| { PrivateKey::from_str(private_key_pem).with_context(|| {
@ -119,7 +112,7 @@ impl SSHHandle {
} }
} }
}; };
info!(self.logger, "SSH private key loaded"); tracing::info!("SSH private key loaded");
let mut config = Config::default(); let mut config = Config::default();
config.keys = vec![private_key]; config.keys = vec![private_key];
let version = format!("SSH-2.0-GitdataAI {}", env!("CARGO_PKG_VERSION")); let version = format!("SSH-2.0-GitdataAI {}", env!("CARGO_PKG_VERSION"));
@ -132,8 +125,7 @@ impl SSHHandle {
config.keepalive_interval = Some(Duration::from_secs(60)); config.keepalive_interval = Some(Duration::from_secs(60));
config.keepalive_max = 3; config.keepalive_max = 3;
info!( tracing::info!(
self.logger,
"SSH server configured with methods: {:?}", config.methods "SSH server configured with methods: {:?}", config.methods
); );
let token_service = SshTokenService::new(self.db.clone()); let token_service = SshTokenService::new(self.db.clone());
@ -141,7 +133,6 @@ impl SSHHandle {
self.db.clone(), self.db.clone(),
self.cache.clone(), self.cache.clone(),
self.redis_pool.clone(), self.redis_pool.clone(),
self.logger.clone(),
token_service, token_service,
); );
@ -161,7 +152,7 @@ impl SSHHandle {
ssh_port, public_host, ssh_port ssh_port, public_host, ssh_port
) )
}; };
info!(self.logger, "{}", msg); tracing::info!("{}", msg);
server.run_on_address(Arc::new(config), bind_addr).await?; server.run_on_address(Arc::new(config), bind_addr).await?;
Ok(()) Ok(())
} }
@ -172,15 +163,13 @@ impl SSHHandle {
#[derive(Clone)] #[derive(Clone)]
pub struct ReceiveSyncService { pub struct ReceiveSyncService {
pool: deadpool_redis::cluster::Pool, pool: deadpool_redis::cluster::Pool,
logger: Logger,
redis_prefix: String, redis_prefix: String,
} }
impl ReceiveSyncService { impl ReceiveSyncService {
pub fn new(pool: deadpool_redis::cluster::Pool, logger: Logger) -> Self { pub fn new(pool: deadpool_redis::cluster::Pool) -> Self {
Self { Self {
pool, pool,
logger,
redis_prefix: "{hook}".to_string(), redis_prefix: "{hook}".to_string(),
} }
} }
@ -199,7 +188,7 @@ impl ReceiveSyncService {
let task_json = match serde_json::to_string(&hook_task) { let task_json = match serde_json::to_string(&hook_task) {
Ok(j) => j, Ok(j) => j,
Err(e) => { Err(e) => {
error!(self.logger, "failed to serialize hook task: {}", e); tracing::error!("failed to serialize hook task: {}", e);
return; return;
} }
}; };
@ -209,7 +198,7 @@ impl ReceiveSyncService {
let redis = match self.pool.get().await { let redis = match self.pool.get().await {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
error!(self.logger, "failed to get Redis connection: {}", e); tracing::error!("failed to get Redis connection: {}", e);
return; return;
} }
}; };
@ -221,7 +210,7 @@ impl ReceiveSyncService {
.query_async::<()>(&mut conn) .query_async::<()>(&mut conn)
.await .await
{ {
error!(self.logger, "failed to enqueue sync task repo_id={} error={}", tracing::error!("failed to enqueue sync task repo_id={} error={}",
task.repo_uid, e); task.repo_uid, e);
} }
} }
@ -282,8 +271,8 @@ impl SshTokenService {
} }
} }
pub async fn run_ssh(config: AppConfig, logger: Logger) -> anyhow::Result<()> { pub async fn run_ssh(config: AppConfig) -> anyhow::Result<()> {
info!(logger, "SSH server initializing"); tracing::info!("SSH server initializing");
let db = AppDatabase::init(&config).await?; let db = AppDatabase::init(&config).await?;
let cache = AppCache::init(&config).await?; let cache = AppCache::init(&config).await?;
let redis_pool = cache.redis_pool().clone(); let redis_pool = cache.redis_pool().clone();
@ -293,13 +282,12 @@ pub async fn run_ssh(config: AppConfig, logger: Logger) -> anyhow::Result<()> {
db.clone(), db.clone(),
cache.clone(), cache.clone(),
redis_pool.clone(), redis_pool.clone(),
logger.clone(),
config.clone(), config.clone(),
); );
let _worker_cancel = hook.start_worker(); let _worker_cancel = hook.start_worker();
slog::info!(logger, "hook worker started"); tracing::info!("hook worker started");
SSHHandle::new(db, config.clone(), cache, redis_pool, logger) SSHHandle::new(db, config.clone(), cache, redis_pool)
.run_ssh() .run_ssh()
.await?; .await?;
Ok(()) Ok(())

View File

@ -5,7 +5,6 @@ use db::cache::AppCache;
use db::database::AppDatabase; use db::database::AppDatabase;
use deadpool_redis::cluster::Pool as RedisPool; use deadpool_redis::cluster::Pool as RedisPool;
use russh::server::Handler; use russh::server::Handler;
use slog::{Logger, info, warn};
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -13,7 +12,6 @@ pub struct SSHServer {
pub db: AppDatabase, pub db: AppDatabase,
pub cache: AppCache, pub cache: AppCache,
pub redis_pool: RedisPool, pub redis_pool: RedisPool,
pub logger: Logger,
pub token_service: SshTokenService, pub token_service: SshTokenService,
} }
@ -22,14 +20,12 @@ impl SSHServer {
db: AppDatabase, db: AppDatabase,
cache: AppCache, cache: AppCache,
redis_pool: RedisPool, redis_pool: RedisPool,
logger: Logger,
token_service: SshTokenService, token_service: SshTokenService,
) -> Self { ) -> Self {
SSHServer { SSHServer {
db, db,
cache, cache,
redis_pool, redis_pool,
logger,
token_service, token_service,
} }
} }
@ -39,21 +35,15 @@ impl russh::server::Server for SSHServer {
fn new_client(&mut self, addr: Option<SocketAddr>) -> Self::Handler { fn new_client(&mut self, addr: Option<SocketAddr>) -> Self::Handler {
if let Some(addr) = addr { if let Some(addr) = addr {
info!( tracing::info!("New SSH connection ip={} port={}", addr.ip(), addr.port());
self.logger,
"New SSH connection from {}:{}",
addr.ip(),
addr.port()
);
} else { } else {
info!(self.logger, "New SSH connection from unknown address"); tracing::info!("New SSH connection from unknown address");
} }
let sync_service = ReceiveSyncService::new(self.redis_pool.clone(), self.logger.clone()); let sync_service = ReceiveSyncService::new(self.redis_pool.clone());
SSHandle::new( SSHandle::new(
self.db.clone(), self.db.clone(),
self.cache.clone(), self.cache.clone(),
sync_service, sync_service,
self.logger.clone(),
self.token_service.clone(), self.token_service.clone(),
addr, addr,
) )
@ -62,33 +52,28 @@ impl russh::server::Server for SSHServer {
fn handle_session_error(&mut self, error: <Self::Handler as Handler>::Error) { fn handle_session_error(&mut self, error: <Self::Handler as Handler>::Error) {
match error { match error {
russh::Error::Disconnect => { russh::Error::Disconnect => {
info!(self.logger, "Connection disconnected by peer"); tracing::info!("Connection disconnected by peer");
} }
russh::Error::Inconsistent => { russh::Error::Inconsistent => {
warn!(self.logger, "Protocol inconsistency detected"); tracing::warn!("Protocol inconsistency detected");
} }
russh::Error::NotAuthenticated => { russh::Error::NotAuthenticated => {
warn!(self.logger, "Authentication failed"); tracing::warn!("Authentication failed");
} }
russh::Error::IO(ref io_err) => { russh::Error::IO(ref io_err) => {
let error_msg = format!( tracing::warn!(
"IO error: kind={:?}, message={}, raw_os_error={:?}", "SSH IO error kind={:?} message={} raw_os_error={:?}",
io_err.kind(), io_err.kind(),
io_err, io_err,
io_err.raw_os_error() io_err.raw_os_error()
); );
warn!(self.logger, "{}", error_msg);
if io_err.kind() == io::ErrorKind::UnexpectedEof { if io_err.kind() == io::ErrorKind::UnexpectedEof {
warn!( tracing::warn!("Client disconnected during handshake or before authentication");
self.logger,
"Client disconnected during handshake or before authentication"
);
} }
} }
_ => { _ => {
let error_msg = format!("SSH session error: {}", error); tracing::warn!("SSH session error error={}", error);
warn!(self.logger, "{}", error_msg);
} }
} }
} }