fix(git): harden hook pool retry, standardize slog log format
- Add retry_count to HookTask with serde(default) for backwards compat
- Limit hook task retries to MAX_RETRIES=5, discard after limit to prevent
infinite requeue loops that caused 'task nack'd and requeued' log spam
- Add nak_with_retry() in RedisConsumer to requeue with incremented count
- Standardize all slog logs: replace "info!(l, "msg"; "k" => v)" shorthand
with "info!(l, "{}", format!("msg k={}", v))" across ssh/authz.rs,
ssh/handle.rs, ssh/server.rs, hook/webhook_dispatch.rs, hook/pool/mod.rs
This commit is contained in:
parent
759ce7444e
commit
bbf2d75fba
@ -181,7 +181,8 @@ impl GitHookPool {
|
|||||||
"task_id" => &task.id,
|
"task_id" => &task.id,
|
||||||
"task_type" => %task.task_type,
|
"task_type" => %task.task_type,
|
||||||
"repo_id" => &task.repo_id,
|
"repo_id" => &task.repo_id,
|
||||||
"worker_id" => &self.config.worker_id
|
"worker_id" => &self.config.worker_id,
|
||||||
|
"retry" => task.retry_count
|
||||||
);
|
);
|
||||||
|
|
||||||
self.log_stream
|
self.log_stream
|
||||||
@ -210,13 +211,39 @@ impl GitHookPool {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
if let Err(e) = consumer.nak(&work_key, &queue_key, &task_json).await {
|
// Check retry count and decide whether to requeue or discard.
|
||||||
slog::warn!(self.logger, "failed to nak task: {}", e);
|
const MAX_RETRIES: u32 = 5;
|
||||||
|
if task.retry_count >= MAX_RETRIES {
|
||||||
|
// Max retries exceeded — discard the task to prevent infinite loop.
|
||||||
|
slog::warn!(self.logger, "task exhausted retries, discarding";
|
||||||
|
"task_id" => &task.id,
|
||||||
|
"task_type" => %task.task_type,
|
||||||
|
"repo_id" => &task.repo_id,
|
||||||
|
"retry_count" => task.retry_count,
|
||||||
|
"last_error" => %e
|
||||||
|
);
|
||||||
|
if let Err(e) = consumer.ack(&work_key, &task_json).await {
|
||||||
|
slog::warn!(self.logger, "failed to ack discarded task: {}", e);
|
||||||
|
}
|
||||||
|
self.total_failed.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.log_stream
|
||||||
|
.error(&task.id, &task.repo_id, &format!("task failed after {} retries: {}", task.retry_count, e))
|
||||||
|
.await;
|
||||||
|
} else {
|
||||||
|
// Requeue with incremented retry count.
|
||||||
|
let mut task = task;
|
||||||
|
task.retry_count += 1;
|
||||||
|
let retry_json = serde_json::to_string(&task)
|
||||||
|
.unwrap_or_else(|_| task_json.clone());
|
||||||
|
|
||||||
|
if let Err(e) = consumer.nak_with_retry(&work_key, &queue_key, &task_json, &retry_json).await {
|
||||||
|
slog::warn!(self.logger, "failed to nak task: {}", e);
|
||||||
|
}
|
||||||
|
self.total_failed.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.log_stream
|
||||||
|
.error(&task.id, &task.repo_id, &format!("task failed: {}", e))
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
self.total_failed.fetch_add(1, Ordering::Relaxed);
|
|
||||||
self.log_stream
|
|
||||||
.error(&task.id, &task.repo_id, &format!("task failed: {}", e))
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -315,7 +342,7 @@ impl GitHookPool {
|
|||||||
let after = after_oid.clone();
|
let after = after_oid.clone();
|
||||||
let branch_name = branch.clone();
|
let branch_name = branch.clone();
|
||||||
|
|
||||||
slog::info!(logger, "detected push on branch"; "branch" => &branch_name, "before" => &before_oid, "after" => &after);
|
slog::info!(logger, "{}", format!("detected push on branch branch={} before={} after={}", branch_name, before_oid, after));
|
||||||
|
|
||||||
let http = http.clone();
|
let http = http.clone();
|
||||||
let db = db.clone();
|
let db = db.clone();
|
||||||
@ -361,7 +388,7 @@ impl GitHookPool {
|
|||||||
let after = after_oid.clone();
|
let after = after_oid.clone();
|
||||||
let tag_name = tag.clone();
|
let tag_name = tag.clone();
|
||||||
|
|
||||||
slog::info!(logger, "detected tag push"; "tag" => &tag_name, "before" => &before_oid, "after" => &after);
|
slog::info!(logger, "{}", format!("detected tag push tag={} before={} after={}", tag_name, before_oid, after));
|
||||||
|
|
||||||
let http = http.clone();
|
let http = http.clone();
|
||||||
let db = db.clone();
|
let db = db.clone();
|
||||||
|
|||||||
@ -120,10 +120,22 @@ impl RedisConsumer {
|
|||||||
queue_key: &str,
|
queue_key: &str,
|
||||||
task_json: &str,
|
task_json: &str,
|
||||||
) -> Result<(), GitError> {
|
) -> Result<(), GitError> {
|
||||||
// First remove from work queue
|
self.nak_with_retry(work_key, queue_key, task_json, task_json).await
|
||||||
self.ack_raw(work_key, task_json).await?;
|
}
|
||||||
|
|
||||||
// Then push back to main queue for retry
|
/// Negative acknowledge with a different (updated) task JSON — used to
|
||||||
|
/// requeue with an incremented retry_count.
|
||||||
|
pub async fn nak_with_retry(
|
||||||
|
&self,
|
||||||
|
work_key: &str,
|
||||||
|
queue_key: &str,
|
||||||
|
old_task_json: &str,
|
||||||
|
new_task_json: &str,
|
||||||
|
) -> Result<(), GitError> {
|
||||||
|
// Remove the old entry from work queue
|
||||||
|
self.ack_raw(work_key, old_task_json).await?;
|
||||||
|
|
||||||
|
// Push the updated entry back to main queue for retry
|
||||||
let redis = self
|
let redis = self
|
||||||
.pool
|
.pool
|
||||||
.get()
|
.get()
|
||||||
@ -134,12 +146,12 @@ impl RedisConsumer {
|
|||||||
|
|
||||||
let _: i64 = redis::cmd("LPUSH")
|
let _: i64 = redis::cmd("LPUSH")
|
||||||
.arg(queue_key)
|
.arg(queue_key)
|
||||||
.arg(task_json)
|
.arg(new_task_json)
|
||||||
.query_async(&mut conn)
|
.query_async(&mut conn)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| GitError::Internal(format!("LPUSH retry failed: {}", e)))?;
|
.map_err(|e| GitError::Internal(format!("LPUSH retry failed: {}", e)))?;
|
||||||
|
|
||||||
slog::warn!(self.logger, "task nack'd and requeued"; "queue" => %queue_key);
|
slog::warn!(self.logger, "{}", format!("task nack'd and requeued queue={}", queue_key));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,6 +9,10 @@ pub struct HookTask {
|
|||||||
pub task_type: TaskType,
|
pub task_type: TaskType,
|
||||||
pub payload: serde_json::Value,
|
pub payload: serde_json::Value,
|
||||||
pub created_at: chrono::DateTime<chrono::Utc>,
|
pub created_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
/// Number of times this task has been retried after a failure.
|
||||||
|
/// When >= MAX_TASK_RETRIES, the task is discarded instead of requeued.
|
||||||
|
#[serde(default)]
|
||||||
|
pub retry_count: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
|
|||||||
@ -231,7 +231,7 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
{
|
{
|
||||||
Ok(ws) => ws,
|
Ok(ws) => ws,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog::error!(logs, "failed to query webhooks: {}", e; "repo" => repo_uuid);
|
slog::error!(logs, "{}", format!("failed to query webhooks repo={} error={}", repo_uuid, e));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -297,7 +297,7 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
let body = match serde_json::to_vec(&payload) {
|
let body = match serde_json::to_vec(&payload) {
|
||||||
Ok(b) => b,
|
Ok(b) => b,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog::error!(logs, "failed to serialize push payload"; "error" => e.to_string());
|
slog::error!(logs, "{}", format!("failed to serialize push payload error={}", e));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -310,15 +310,15 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
slog::info!(logs, "push webhook delivered"; "webhook_id" => webhook_id, "url" => url);
|
slog::info!(logs, "{}", format!("push webhook delivered webhook_id={} url={}", webhook_id, url));
|
||||||
let _ = touch_webhook(db, webhook_id, true, logs).await;
|
let _ = touch_webhook(db, webhook_id, true, logs).await;
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
slog::warn!(logs, "push webhook delivery failed"; "error" => e.to_string(), "webhook_id" => webhook_id, "url" => url);
|
slog::warn!(logs, "{}", format!("push webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e));
|
||||||
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
slog::warn!(logs, "push webhook timed out"; "webhook_id" => webhook_id, "url" => url);
|
slog::warn!(logs, "{}", format!("push webhook timed out webhook_id={} url={}", webhook_id, url));
|
||||||
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -351,7 +351,7 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
let body = match serde_json::to_vec(&payload) {
|
let body = match serde_json::to_vec(&payload) {
|
||||||
Ok(b) => b,
|
Ok(b) => b,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog::error!(logs, "failed to serialize tag payload"; "error" => e.to_string());
|
slog::error!(logs, "{}", format!("failed to serialize tag payload error={}", e));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -364,15 +364,15 @@ pub async fn dispatch_repo_webhooks(
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(())) => {
|
||||||
slog::info!(logs, "tag webhook delivered"; "webhook_id" => webhook_id, "url" => url);
|
slog::info!(logs, "{}", format!("tag webhook delivered webhook_id={} url={}", webhook_id, url));
|
||||||
let _ = touch_webhook(db, webhook_id, true, logs).await;
|
let _ = touch_webhook(db, webhook_id, true, logs).await;
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
slog::warn!(logs, "tag webhook delivery failed"; "error" => e.to_string(), "webhook_id" => webhook_id, "url" => url);
|
slog::warn!(logs, "{}", format!("tag webhook delivery failed webhook_id={} url={} error={}", webhook_id, url, e));
|
||||||
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
slog::warn!(logs, "tag webhook timed out"; "webhook_id" => webhook_id, "url" => url);
|
slog::warn!(logs, "{}", format!("tag webhook timed out webhook_id={} url={}", webhook_id, url));
|
||||||
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
let _ = touch_webhook(db, webhook_id, false, logs).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -405,6 +405,6 @@ async fn touch_webhook(db: &AppDatabase, webhook_id: i64, success: bool, logs: &
|
|||||||
};
|
};
|
||||||
|
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
slog::warn!(logs, "failed to update webhook touch"; "error" => e.to_string());
|
slog::warn!(logs, "{}", format!("failed to update webhook touch error={}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -103,7 +103,7 @@ impl SshAuthService {
|
|||||||
let fingerprint = match self.generate_fingerprint_from_public_key(public_key_str) {
|
let fingerprint = match self.generate_fingerprint_from_public_key(public_key_str) {
|
||||||
Ok(fp) => fp,
|
Ok(fp) => fp,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(self.logger, "Failed to generate fingerprint"; "error" => %e);
|
error!(self.logger, "{}", format!("Failed to generate fingerprint error={}", e));
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -113,7 +113,7 @@ impl SshAuthService {
|
|||||||
} else {
|
} else {
|
||||||
fingerprint.clone()
|
fingerprint.clone()
|
||||||
};
|
};
|
||||||
info!(self.logger, "Looking up user with SSH key"; "fingerprint" => %fingerprint_preview);
|
info!(self.logger, "{}", format!("Looking up user with SSH key fingerprint={}", fingerprint_preview));
|
||||||
|
|
||||||
let ssh_key = user_ssh_key::Entity::find()
|
let ssh_key = user_ssh_key::Entity::find()
|
||||||
.filter(user_ssh_key::Column::Fingerprint.eq(&fingerprint))
|
.filter(user_ssh_key::Column::Fingerprint.eq(&fingerprint))
|
||||||
@ -124,13 +124,13 @@ impl SshAuthService {
|
|||||||
let ssh_key = match ssh_key {
|
let ssh_key = match ssh_key {
|
||||||
Some(key) => key,
|
Some(key) => key,
|
||||||
None => {
|
None => {
|
||||||
warn!(self.logger, "No SSH key found"; "fingerprint" => %fingerprint);
|
warn!(self.logger, "{}", format!("No SSH key found fingerprint={}", fingerprint));
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if self.is_key_expired(&ssh_key) {
|
if self.is_key_expired(&ssh_key) {
|
||||||
warn!(self.logger, "SSH key expired"; "key_id" => ssh_key.id, "expires_at" => ?ssh_key.expires_at);
|
warn!(self.logger, "{}", format!("SSH key expired key_id={} expires_at={:?}", ssh_key.id, ssh_key.expires_at));
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,7 +140,7 @@ impl SshAuthService {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some(ref user) = user_model {
|
if let Some(ref user) = user_model {
|
||||||
info!(self.logger, "User authenticated"; "user" => %user.username, "key" => %ssh_key.title);
|
info!(self.logger, "{}", format!("User authenticated user={} key={}", user.username, ssh_key.title));
|
||||||
self.update_key_last_used_async(ssh_key.id);
|
self.update_key_last_used_async(ssh_key.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,7 +161,7 @@ impl SshAuthService {
|
|||||||
let logger = self.logger.clone();
|
let logger = self.logger.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = Self::update_key_last_used_sync(db_clone, &logger, key_id).await {
|
if let Err(e) = Self::update_key_last_used_sync(db_clone, &logger, key_id).await {
|
||||||
warn!(&logger, "Failed to update key last_used"; "key_id" => key_id, "error" => %e);
|
warn!(&logger, "{}", format!("Failed to update key last_used key_id={} error={}", key_id, e));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -182,7 +182,7 @@ impl SshAuthService {
|
|||||||
active_key.updated_at = Set(now);
|
active_key.updated_at = Set(now);
|
||||||
|
|
||||||
active_key.update(db.writer()).await?;
|
active_key.update(db.writer()).await?;
|
||||||
info!(logger, "Updated key last_used"; "key_id" => key_id);
|
info!(logger, "{}", format!("Updated key last_used key_id={}", key_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -195,12 +195,12 @@ impl SshAuthService {
|
|||||||
is_write: bool,
|
is_write: bool,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
if repo.created_by == user.uid {
|
if repo.created_by == user.uid {
|
||||||
info!(self.logger, "User is repo owner"; "user" => %user.username, "repo" => %repo.repo_name);
|
info!(self.logger, "{}", format!("User is repo owner user={} repo={}", user.username, repo.repo_name));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if !is_write && !repo.is_private {
|
if !is_write && !repo.is_private {
|
||||||
info!(self.logger, "Public repo allows read"; "repo" => %repo.repo_name);
|
info!(self.logger, "{}", format!("Public repo allows read repo={}", repo.repo_name));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -209,7 +209,7 @@ impl SshAuthService {
|
|||||||
.await
|
.await
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
{
|
{
|
||||||
info!(self.logger, "User has collaborator access"; "user" => %user.username, "repo" => %repo.repo_name);
|
info!(self.logger, "{}", format!("User has collaborator access user={} repo={}", user.username, repo.repo_name));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -219,11 +219,11 @@ impl SshAuthService {
|
|||||||
.await
|
.await
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
{
|
{
|
||||||
info!(self.logger, "User has project member access"; "user" => %user.username, "repo" => %repo.repo_name);
|
info!(self.logger, "{}", format!("User has project member access user={} repo={}", user.username, repo.repo_name));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
warn!(self.logger, "Access denied"; "user" => %user.username, "repo" => %repo.repo_name, "write" => is_write);
|
warn!(self.logger, "{}", format!("Access denied user={} repo={} write={}", user.username, repo.repo_name, is_write));
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,7 +251,7 @@ impl SshAuthService {
|
|||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
warn!(self.logger, "Collaborator has no valid roles"; "scope" => %collab.scope);
|
warn!(self.logger, "{}", format!("Collaborator has no valid roles scope={}", collab.scope));
|
||||||
Ok(false)
|
Ok(false)
|
||||||
} else {
|
} else {
|
||||||
Ok(false)
|
Ok(false)
|
||||||
|
|||||||
@ -516,7 +516,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
self.branch.insert(channel, refs);
|
self.branch.insert(channel, refs);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(self.logger, "Failed to parse ref updates, forwarding raw data"; "error" => ?e);
|
warn!(self.logger, "{}", format!("Failed to parse ref updates, forwarding raw data error={:?}", e));
|
||||||
self.branch.insert(channel, vec![]);
|
self.branch.insert(channel, vec![]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -525,7 +525,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
stdin.write_all(&buffered).await?;
|
stdin.write_all(&buffered).await?;
|
||||||
stdin.flush().await?;
|
stdin.flush().await?;
|
||||||
} else {
|
} else {
|
||||||
error!(self.logger, "stdin not found"; "channel" => ?channel);
|
error!(self.logger, "{}", format!("stdin not found channel={:?}", channel));
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -534,7 +534,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
stdin.write_all(data).await?;
|
stdin.write_all(data).await?;
|
||||||
stdin.flush().await?;
|
stdin.flush().await?;
|
||||||
} else {
|
} else {
|
||||||
error!(self.logger, "stdin not found (forwarding)"; "channel" => ?channel);
|
error!(self.logger, "{}", format!("stdin not found (forwarding) channel={:?}", channel));
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -565,7 +565,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
user.username
|
user.username
|
||||||
);
|
);
|
||||||
|
|
||||||
info!(self.logger, "Shell request"; "user" => %user.username);
|
info!(self.logger, "{}", format!("Shell request user={}", user.username));
|
||||||
session
|
session
|
||||||
.data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes()))
|
.data(channel_id, CryptoVec::from_slice(welcome_msg.as_bytes()))
|
||||||
.ok();
|
.ok();
|
||||||
@ -605,7 +605,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
let git_shell_cmd = match std::str::from_utf8(data) {
|
let git_shell_cmd = match std::str::from_utf8(data) {
|
||||||
Ok(cmd) => cmd.trim(),
|
Ok(cmd) => cmd.trim(),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(self.logger, "Invalid command encoding"; "error" => %e);
|
error!(self.logger, "{}", format!("Invalid command encoding error={}", e));
|
||||||
session
|
session
|
||||||
.disconnect(
|
.disconnect(
|
||||||
Disconnect::ServiceNotAvailable,
|
Disconnect::ServiceNotAvailable,
|
||||||
@ -619,7 +619,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
let (service, path) = match parse_git_command(git_shell_cmd) {
|
let (service, path) = match parse_git_command(git_shell_cmd) {
|
||||||
Some((s, p)) => (s, p),
|
Some((s, p)) => (s, p),
|
||||||
None => {
|
None => {
|
||||||
error!(self.logger, "Invalid git command"; "command" => %git_shell_cmd);
|
error!(self.logger, "{}", format!("Invalid git command command={}", git_shell_cmd));
|
||||||
let msg = format!("Invalid git command: {}", git_shell_cmd);
|
let msg = format!("Invalid git command: {}", git_shell_cmd);
|
||||||
session
|
session
|
||||||
.disconnect(Disconnect::ServiceNotAvailable, &msg, "")
|
.disconnect(Disconnect::ServiceNotAvailable, &msg, "")
|
||||||
@ -632,7 +632,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
Some(pair) => pair,
|
Some(pair) => pair,
|
||||||
None => {
|
None => {
|
||||||
let msg = format!("Invalid repository path: {}", path);
|
let msg = format!("Invalid repository path: {}", path);
|
||||||
error!(self.logger, "Invalid repo path"; "path" => path);
|
error!(self.logger, "{}", format!("Invalid repo path path={}", path));
|
||||||
session
|
session
|
||||||
.disconnect(Disconnect::ServiceNotAvailable, &msg, "")
|
.disconnect(Disconnect::ServiceNotAvailable, &msg, "")
|
||||||
.ok();
|
.ok();
|
||||||
@ -645,7 +645,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
Ok(repo) => repo,
|
Ok(repo) => repo,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Log the detailed error internally; client receives generic message.
|
// Log the detailed error internally; client receives generic message.
|
||||||
error!(self.logger, "Error fetching repo"; "error" => %e);
|
error!(self.logger, "{}", format!("Error fetching repo error={}", e));
|
||||||
session
|
session
|
||||||
.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "")
|
.disconnect(Disconnect::ServiceNotAvailable, "Repository not found", "")
|
||||||
.ok();
|
.ok();
|
||||||
@ -677,7 +677,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
if is_write { "write" } else { "read" },
|
if is_write { "write" } else { "read" },
|
||||||
repo.repo_name
|
repo.repo_name
|
||||||
);
|
);
|
||||||
error!(self.logger, "Access denied"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write);
|
error!(self.logger, "{}", format!("Access denied user={} repo={} is_write={}", operator.username, repo.repo_name, is_write));
|
||||||
session.disconnect(Disconnect::ByApplication, &msg, "").ok();
|
session.disconnect(Disconnect::ByApplication, &msg, "").ok();
|
||||||
return Err(russh::Error::Disconnect);
|
return Err(russh::Error::Disconnect);
|
||||||
}
|
}
|
||||||
@ -690,20 +690,20 @@ impl russh::server::Handler for SSHandle {
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
let msg = format!("Rate limit exceeded for repository access: {}", repo_path);
|
let msg = format!("Rate limit exceeded for repository access: {}", repo_path);
|
||||||
warn!(self.logger, "Repo access rate limit exceeded"; "user" => %operator.username, "repo" => %repo.repo_name);
|
warn!(self.logger, "{}", format!("Repo access rate limit exceeded user={} repo={}", operator.username, repo.repo_name));
|
||||||
session.disconnect(Disconnect::ByApplication, &msg, "").ok();
|
session.disconnect(Disconnect::ByApplication, &msg, "").ok();
|
||||||
return Err(russh::Error::Disconnect);
|
return Err(russh::Error::Disconnect);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(self.logger, "Access granted"; "user" => %operator.username, "repo" => %repo.repo_name, "is_write" => is_write);
|
info!(self.logger, "{}", format!("Access granted user={} repo={} is_write={}", operator.username, repo.repo_name, is_write));
|
||||||
|
|
||||||
let repo_path = PathBuf::from(&repo.storage_path);
|
let repo_path = PathBuf::from(&repo.storage_path);
|
||||||
if !repo_path.exists() {
|
if !repo_path.exists() {
|
||||||
error!(self.logger, "Repository path not found"; "path" => %repo.storage_path);
|
error!(self.logger, "{}", format!("Repository path not found path={}", repo.storage_path));
|
||||||
}
|
}
|
||||||
let mut cmd = build_git_command(service, repo_path);
|
let mut cmd = build_git_command(service, repo_path);
|
||||||
let logger = self.logger.clone();
|
let logger = self.logger.clone();
|
||||||
info!(&logger, "Spawning git process"; "service" => ?service, "path" => %repo.storage_path);
|
info!(&logger, "{}", format!("Spawning git process service={:?} path={}", service, repo.storage_path));
|
||||||
let mut shell = match cmd
|
let mut shell = match cmd
|
||||||
.stdin(Stdio::piped())
|
.stdin(Stdio::piped())
|
||||||
.stdout(Stdio::piped())
|
.stdout(Stdio::piped())
|
||||||
@ -715,7 +715,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
shell
|
shell
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(&logger, "Process spawn failed"; "error" => %e);
|
error!(&logger, "{}", format!("Process spawn failed error={}", e));
|
||||||
let _ = session.channel_failure(channel_id);
|
let _ = session.channel_failure(channel_id);
|
||||||
self.cleanup_channel(channel_id);
|
self.cleanup_channel(channel_id);
|
||||||
dbg!(&e);
|
dbg!(&e);
|
||||||
@ -735,7 +735,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
let sync = self.sync.clone();
|
let sync = self.sync.clone();
|
||||||
let logger_for_fut = self.logger.clone();
|
let logger_for_fut = self.logger.clone();
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
info!(&logger_for_fut, "Task started"; "channel" => ?channel_id);
|
info!(&logger_for_fut, "{}", format!("Task started channel={:?}", channel_id));
|
||||||
|
|
||||||
let mut stdout_done = false;
|
let mut stdout_done = false;
|
||||||
let mut stderr_done = false;
|
let mut stderr_done = false;
|
||||||
@ -762,7 +762,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
let status = result?;
|
let status = result?;
|
||||||
let status_code = status.code().unwrap_or(128) as u32;
|
let status_code = status.code().unwrap_or(128) as u32;
|
||||||
|
|
||||||
info!(&logger_for_fut, "Git process exited"; "channel" => ?channel_id, "status" => status_code);
|
info!(&logger_for_fut, "{}", format!("Git process exited channel={:?} status={}", channel_id, status_code));
|
||||||
|
|
||||||
if !stdout_done || !stderr_done {
|
if !stdout_done || !stderr_done {
|
||||||
let _ = tokio::time::timeout(Duration::from_millis(100), async {
|
let _ = tokio::time::timeout(Duration::from_millis(100), async {
|
||||||
@ -792,21 +792,21 @@ impl russh::server::Handler for SSHandle {
|
|||||||
sleep(Duration::from_millis(50)).await;
|
sleep(Duration::from_millis(50)).await;
|
||||||
let _ = session_handle.eof(channel_id).await;
|
let _ = session_handle.eof(channel_id).await;
|
||||||
let _ = session_handle.close(channel_id).await;
|
let _ = session_handle.close(channel_id).await;
|
||||||
info!(&logger_for_fut, "Channel closed"; "channel" => ?channel_id);
|
info!(&logger_for_fut, "{}", format!("Channel closed channel={:?}", channel_id));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
result = &mut stdout_fut, if !stdout_done => {
|
result = &mut stdout_fut, if !stdout_done => {
|
||||||
info!(&logger_for_fut, "stdout completed");
|
info!(&logger_for_fut, "stdout completed");
|
||||||
stdout_done = true;
|
stdout_done = true;
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
warn!(&logger_for_fut, "stdout forward error"; "error" => ?e);
|
warn!(&logger_for_fut, "{}", format!("stdout forward error error={:?}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
result = &mut stderr_fut, if !stderr_done => {
|
result = &mut stderr_fut, if !stderr_done => {
|
||||||
info!(&logger_for_fut, "stderr completed");
|
info!(&logger_for_fut, "stderr completed");
|
||||||
stderr_done = true;
|
stderr_done = true;
|
||||||
if let Err(e) = result {
|
if let Err(e) = result {
|
||||||
warn!(&logger_for_fut, "stderr forward error"; "error" => ?e);
|
warn!(&logger_for_fut, "{}", format!("stderr forward error error={:?}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -817,7 +817,7 @@ impl russh::server::Handler for SSHandle {
|
|||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = fut.await {
|
if let Err(e) = fut.await {
|
||||||
error!(&logger, "Git SSH channel task error"; "error" => %e);
|
error!(&logger, "{}", format!("Git SSH channel task error error={}", e));
|
||||||
}
|
}
|
||||||
while eof_rx.recv().await.is_some() {}
|
while eof_rx.recv().await.is_some() {}
|
||||||
});
|
});
|
||||||
|
|||||||
@ -193,6 +193,7 @@ impl ReceiveSyncService {
|
|||||||
task_type: TaskType::Sync,
|
task_type: TaskType::Sync,
|
||||||
payload: serde_json::Value::Null,
|
payload: serde_json::Value::Null,
|
||||||
created_at: chrono::Utc::now(),
|
created_at: chrono::Utc::now(),
|
||||||
|
retry_count: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
let task_json = match serde_json::to_string(&hook_task) {
|
let task_json = match serde_json::to_string(&hook_task) {
|
||||||
@ -220,7 +221,7 @@ impl ReceiveSyncService {
|
|||||||
.query_async::<()>(&mut conn)
|
.query_async::<()>(&mut conn)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
error!(self.logger, "Failed to LPUSH sync task"; "error" => %e, "repo_id" => %task.repo_uid);
|
error!(self.logger, "{}", format!("Failed to LPUSH sync task repo_id={} error={}", task.repo_uid, e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -55,7 +55,7 @@ impl russh::server::Server for SSHServer {
|
|||||||
let logger = self.logger.clone();
|
let logger = self.logger.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if !rate_limiter.is_ip_allowed(&ip).await {
|
if !rate_limiter.is_ip_allowed(&ip).await {
|
||||||
warn!(logger, "IP rate limit exceeded"; "ip" => %ip);
|
warn!(logger, "{}", format!("IP rate limit exceeded ip={}", ip));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user