use crate::ssh::ReceiveSyncService; use std::fmt; use std::time::{Duration, Instant}; use tokio::task::JoinHandle; use tokio::time::sleep; pub const PUSH_QUEUE_TIMEOUT: Duration = Duration::from_secs(120); pub const PUSH_LOCK_TTL_SECS: usize = 300; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct PushQueuePosition { pub position: usize, pub total: usize, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum PushQueueEvent { Waiting(PushQueuePosition), Acquired, } #[derive(Debug)] pub enum PushQueueWaitError { Join(redis::RedisError), Lock(redis::RedisError), Timeout, } impl fmt::Display for PushQueueWaitError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Join(e) => write!(f, "failed to join push queue: {e}"), Self::Lock(e) => write!(f, "failed to acquire push queue lock: {e}"), Self::Timeout => write!(f, "push queue timed out"), } } } impl std::error::Error for PushQueueWaitError {} pub struct PushQueueLease { service: ReceiveSyncService, repo_uid: uuid::Uuid, request_id: String, heartbeat: Option>, released: bool, } impl PushQueueLease { fn new(service: ReceiveSyncService, repo_uid: uuid::Uuid, request_id: String) -> Self { let heartbeat = Some(start_lock_heartbeat( service.clone(), repo_uid, request_id.clone(), )); Self { service, repo_uid, request_id, heartbeat, released: false, } } pub fn request_id(&self) -> &str { &self.request_id } pub async fn release(&mut self) { if self.released { return; } self.service .release_push_queue(self.repo_uid, &self.request_id) .await; if let Some(heartbeat) = self.heartbeat.take() { heartbeat.abort(); } self.released = true; } } impl Drop for PushQueueLease { fn drop(&mut self) { if self.released { return; } if let Some(heartbeat) = self.heartbeat.take() { heartbeat.abort(); } let service = self.service.clone(); let repo_uid = self.repo_uid; let request_id = self.request_id.clone(); tokio::spawn(async move { service.release_push_queue(repo_uid, &request_id).await; }); } } fn start_lock_heartbeat( service: ReceiveSyncService, repo_uid: uuid::Uuid, request_id: String, ) -> JoinHandle<()> { tokio::spawn(async move { let interval = Duration::from_secs((PUSH_LOCK_TTL_SECS as u64 / 3).max(30)); loop { sleep(interval).await; match service .refresh_push_lock(repo_uid, &request_id, PUSH_LOCK_TTL_SECS) .await { Ok(true) => {} Ok(false) => { tracing::warn!( repo_id = %repo_uid, request_id = %request_id, "push_queue_lock_lost" ); break; } Err(e) => { tracing::warn!( error = %e, repo_id = %repo_uid, request_id = %request_id, "push_queue_lock_refresh_failed" ); } } } }) } pub async fn wait_for_push_queue_slot( service: ReceiveSyncService, repo_uid: uuid::Uuid, mut on_event: F, ) -> Result where F: FnMut(PushQueueEvent, &str), { let request_id = uuid::Uuid::new_v4().to_string(); service .join_push_queue(repo_uid, &request_id) .await .map_err(PushQueueWaitError::Join)?; let deadline = Instant::now() + PUSH_QUEUE_TIMEOUT; let mut last_position = None; loop { let position = service.push_queue_position(repo_uid, &request_id).await; if let Some((position, total)) = position { let position = PushQueuePosition { position, total }; if last_position != Some(position) && position.position > 1 { on_event(PushQueueEvent::Waiting(position), &request_id); } last_position = Some(position); if position.position == 1 { match service .try_acquire_push_lock(repo_uid, &request_id, PUSH_LOCK_TTL_SECS) .await { Ok(true) => { on_event(PushQueueEvent::Acquired, &request_id); return Ok(PushQueueLease::new(service, repo_uid, request_id)); } Ok(false) => {} Err(e) => { service.release_push_queue(repo_uid, &request_id).await; return Err(PushQueueWaitError::Lock(e)); } } } } if Instant::now() >= deadline { service.release_push_queue(repo_uid, &request_id).await; return Err(PushQueueWaitError::Timeout); } sleep(Duration::from_secs(1)).await; } }