gitdataai/lib/git/sync/mod.rs

267 lines
7.8 KiB
Rust

use deadpool_redis::cluster::Pool as RedisPool;
use redis::AsyncCommands;
pub mod branch;
pub mod cicheck;
pub mod commit;
pub mod consumer;
pub mod language;
pub mod lfs;
pub mod lock;
pub mod push_queue;
pub mod tag;
pub mod webhook;
pub mod worker;
/// Redis-backed service that enqueues repository sync hook tasks and
/// coordinates the per-repository push queue and push lock.
#[derive(Clone)]
pub struct ReceiveSyncService {
/// Redis cluster connection pool every operation checks a connection out of.
pool: RedisPool,
/// Prefix for the hook queue keys; the sync queue is `<prefix>:sync` and the
/// in-flight list is `<prefix>:sync:work`.
redis_prefix: String,
}
impl ReceiveSyncService {
/// Builds a service on top of `pool`.
///
/// The key prefix is fixed to `{hook}`. The braces are a Redis Cluster hash
/// tag, so every key derived from the prefix maps to the same slot.
pub fn new(pool: RedisPool) -> Self {
    let redis_prefix = String::from("{hook}");
    Self { pool, redis_prefix }
}
pub fn pool(&self) -> RedisPool {
self.pool.clone()
}
/// Estimates where a sync task for `repo_uid` sits in the hook queue.
///
/// Returns `Some((position, total))`:
/// * `position` is 1-based. It counts every in-flight item in
///   `<prefix>:sync:work`, plus the queued items found when scanning
///   `<prefix>:sync` from its tail up to the first entry whose `repo_id`
///   matches. Entries that fail to parse as [`HookTask`] count as being
///   ahead. If no entry matches, the whole queue counts as ahead.
/// * `total` is in-flight + queued + 1.
///
/// Returns `None` when no connection can be obtained or the queue cannot be
/// read. A failure to read the in-flight list is treated as an empty list.
pub async fn queue_position(
    &self,
    repo_uid: uuid::Uuid,
) -> Option<(usize, usize)> {
    let pending_key = format!("{}:sync", self.redis_prefix);
    let in_flight_key = format!("{}:work", pending_key);
    let mut conn: deadpool_redis::cluster::Connection = self.pool.get().await.ok()?;

    let pending: Vec<String> = conn.lrange(&pending_key, 0, -1).await.ok()?;
    let in_flight: Vec<String> = conn
        .lrange(&in_flight_key, 0, -1)
        .await
        .unwrap_or_default();

    // `send` pushes with LPUSH, so the tail of the list holds the oldest
    // entries; walk from there until this repository's first entry.
    let wanted = repo_uid.to_string();
    let ahead = pending
        .iter()
        .rev()
        .position(|raw| {
            serde_json::from_str::<HookTask>(raw)
                .map(|task| task.repo_id == wanted)
                .unwrap_or(false)
        })
        .unwrap_or_else(|| pending.len());

    let running = in_flight.len();
    Some((running + ahead + 1, running + pending.len() + 1))
}
/// Returns the `(queue_key, lock_key)` pair used for `repo_uid`'s push queue.
///
/// Both keys embed the same `{push:<uid>}` hash tag, which keeps them in one
/// Redis Cluster slot so a single script can touch both.
pub fn push_queue_keys(repo_uid: uuid::Uuid) -> (String, String) {
    let hash_tag = format!("{{push:{}}}", repo_uid);
    let queue_key = format!("git:{}:queue", hash_tag);
    let lock_key = format!("git:{}:lock", hash_tag);
    (queue_key, lock_key)
}
/// Appends `request_id` to the tail of `repo_uid`'s push queue (`RPUSH`).
///
/// # Errors
///
/// Returns an `Io`-kind [`redis::RedisError`] when no pooled connection can
/// be obtained, or the error reported by Redis for the `RPUSH` itself.
pub async fn join_push_queue(
    &self,
    repo_uid: uuid::Uuid,
    request_id: &str,
) -> redis::RedisResult<()> {
    let (queue_key, _lock_key) = Self::push_queue_keys(repo_uid);

    let mut conn: deadpool_redis::cluster::Connection = match self.pool.get().await {
        Ok(conn) => conn,
        Err(pool_err) => {
            return Err(redis::RedisError::from((
                redis::ErrorKind::Io,
                "failed to get Redis connection",
                pool_err.to_string(),
            )));
        }
    };

    let mut rpush = redis::cmd("RPUSH");
    rpush.arg(&queue_key).arg(request_id);
    rpush.query_async::<()>(&mut conn).await
}
/// Looks up `request_id` in `repo_uid`'s push queue.
///
/// Returns `Some((position, len))`, where `position` is the 1-based index of
/// the first matching entry counted from the head of the list and `len` is
/// the current queue length. Returns `None` when the connection or the read
/// fails, or when `request_id` is not in the queue.
pub async fn push_queue_position(
    &self,
    repo_uid: uuid::Uuid,
    request_id: &str,
) -> Option<(usize, usize)> {
    let (queue_key, _lock_key) = Self::push_queue_keys(repo_uid);
    let mut conn: deadpool_redis::cluster::Connection = self.pool.get().await.ok()?;
    let waiting: Vec<String> = conn.lrange(&queue_key, 0, -1).await.ok()?;

    waiting
        .iter()
        .position(|entry| entry.as_str() == request_id)
        .map(|index| (index + 1, waiting.len()))
}
/// Tries to take `repo_uid`'s push lock on behalf of `request_id`.
///
/// Issues `SET <lock_key> <request_id> NX EX <ttl_secs>` and returns
/// `Ok(true)` only when Redis stored the value, i.e. the key did not already
/// exist. Because of `NX`, the result is `Ok(false)` even if the existing
/// lock value is this same `request_id`.
///
/// # Errors
///
/// Returns an `Io`-kind [`redis::RedisError`] when no pooled connection can
/// be obtained, or the error reported by Redis for the `SET`.
pub async fn try_acquire_push_lock(
    &self,
    repo_uid: uuid::Uuid,
    request_id: &str,
    ttl_secs: usize,
) -> redis::RedisResult<bool> {
    let (_queue_key, lock_key) = Self::push_queue_keys(repo_uid);

    let mut conn: deadpool_redis::cluster::Connection =
        self.pool.get().await.map_err(|pool_err| {
            redis::RedisError::from((
                redis::ErrorKind::Io,
                "failed to get Redis connection",
                pool_err.to_string(),
            ))
        })?;

    let mut set_cmd = redis::cmd("SET");
    set_cmd
        .arg(&lock_key)
        .arg(request_id)
        .arg("NX")
        .arg("EX")
        .arg(ttl_secs);

    // A nil reply means the key already existed and nothing was written.
    set_cmd
        .query_async::<Option<String>>(&mut conn)
        .await
        .map(|reply| reply.is_some())
}
/// Removes `request_id` from `repo_uid`'s push queue and drops the push lock
/// if that request currently owns it.
///
/// Best effort: connection and script failures are logged at `warn` level
/// and otherwise ignored.
pub async fn release_push_queue(
    &self,
    repo_uid: uuid::Uuid,
    request_id: &str,
) {
    // One script does both steps: remove every queue entry equal to the
    // request id, then delete the lock only if its value is that same id.
    const RELEASE_LUA: &str = r#"
redis.call("LREM", KEYS[1], 0, ARGV[1])
if redis.call("GET", KEYS[2]) == ARGV[1] then
redis.call("DEL", KEYS[2])
end
return 1
"#;

    let (queue_key, lock_key) = Self::push_queue_keys(repo_uid);

    let checkout = self.pool.get().await.map_err(|e| {
        tracing::warn!(error = %e, repo_id = %repo_uid, "push_queue_release_redis_connection_failed");
    });
    let Ok(mut conn) = checkout else {
        return;
    };

    let outcome = redis::Script::new(RELEASE_LUA)
        .key(&queue_key)
        .key(&lock_key)
        .arg(request_id)
        .invoke_async::<()>(&mut conn)
        .await;

    if let Err(e) = outcome {
        tracing::warn!(error = %e, repo_id = %repo_uid, "push_queue_release_failed");
    }
}
/// Extends the TTL of `repo_uid`'s push lock, but only while `request_id`
/// still owns it.
///
/// Returns `Ok(true)` when the lock value matched and `EXPIRE` was issued
/// with `ttl_secs`, and `Ok(false)` when the lock is missing or held by a
/// different request.
///
/// # Errors
///
/// Returns an `Io`-kind [`redis::RedisError`] when no pooled connection can
/// be obtained, or the error reported by Redis for the script call.
pub async fn refresh_push_lock(
    &self,
    repo_uid: uuid::Uuid,
    request_id: &str,
    ttl_secs: usize,
) -> redis::RedisResult<bool> {
    // The ownership check and the EXPIRE run inside a single script.
    const REFRESH_LUA: &str = r#"
if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("EXPIRE", KEYS[1], ARGV[2])
return 1
end
return 0
"#;

    let (_queue_key, lock_key) = Self::push_queue_keys(repo_uid);

    let mut conn: deadpool_redis::cluster::Connection = match self.pool.get().await {
        Ok(conn) => conn,
        Err(pool_err) => {
            return Err(redis::RedisError::from((
                redis::ErrorKind::Io,
                "failed to get Redis connection",
                pool_err.to_string(),
            )));
        }
    };

    let script = redis::Script::new(REFRESH_LUA);
    let extended: i32 = script
        .key(&lock_key)
        .arg(request_id)
        .arg(ttl_secs)
        .invoke_async(&mut conn)
        .await?;

    Ok(extended == 1)
}
/// Enqueues a sync hook task for `task.repo_uid` and reports its estimated
/// queue position.
///
/// The position is sampled with [`Self::queue_position`] *before* the task
/// is pushed, and that value is returned whether or not the push succeeds.
/// Serialization, connection and `LPUSH` failures are only logged.
pub async fn send(
    &self,
    task: RepoReceiveSyncTask,
) -> Option<(usize, usize)> {
    let repo_uid = task.repo_uid;
    let position = self.queue_position(repo_uid).await;

    let serialized = serde_json::to_string(&HookTask {
        id: uuid::Uuid::new_v4().to_string(),
        repo_id: repo_uid.to_string(),
        task_type: TaskType::Sync,
        payload: serde_json::Value::Null,
        created_at: chrono::Utc::now(),
        retry_count: 0,
    })
    .map_err(|e| {
        tracing::error!("failed to serialize hook task: {}", e);
    });
    let Ok(task_json) = serialized else {
        return position;
    };

    let queue_key = format!("{}:sync", self.redis_prefix);

    let checkout = self.pool.get().await.map_err(|e| {
        tracing::error!("failed to get Redis connection: {}", e);
    });
    let Ok(mut conn) = checkout else {
        return position;
    };

    let mut lpush = redis::cmd("LPUSH");
    lpush.arg(&queue_key).arg(&task_json);
    match lpush.query_async::<()>(&mut conn).await {
        Ok(()) => {
            tracing::info!(repo_id = %repo_uid, "hook task queued to Redis");
        }
        Err(e) => {
            tracing::error!(
                "failed to enqueue sync task repo_id={} error={}",
                repo_uid,
                e
            );
        }
    }

    position
}
}
/// Request to schedule a sync for a single repository; consumed by
/// [`ReceiveSyncService::send`].
///
/// Derives `Debug` like the other public task types in this module so the
/// value can be logged and used in assertions.
#[derive(Clone, Debug)]
pub struct RepoReceiveSyncTask {
    /// UID of the repository to sync.
    pub repo_uid: uuid::Uuid,
}
/// Queue entry stored as JSON in the Redis hook queue.
///
/// `ReceiveSyncService::send` serializes one of these per enqueued task and
/// `ReceiveSyncService::queue_position` parses queue entries back into it.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct HookTask {
/// Task identifier; `send` fills it with a freshly generated v4 UUID string.
pub id: String,
/// Repository UID in string form; `queue_position` matches entries on it.
pub repo_id: String,
/// Kind of work this task represents.
pub task_type: TaskType,
/// Task-specific data; `send` sets it to JSON `null` for sync tasks.
pub payload: serde_json::Value,
/// UTC timestamp; `send` sets it to the current time when building the task.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Set to 0 by `send`.
/// NOTE(review): presumably incremented by the consumer on retry — confirm.
pub retry_count: usize,
}
/// Kind of work carried by a [`HookTask`].
///
/// No serde attributes are applied, so the derived representation is used:
/// each unit variant is serialized by its name (e.g. `"Sync"`). Renaming a
/// variant therefore changes the stored queue format.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub enum TaskType {
/// Repository sync; the only variant constructed in this file (by `send`).
Sync,
// NOTE(review): the remaining variants are not used in this file. Their
// meaning is presumably git fsck, git gc and webhook delivery — confirm
// against the consumer/worker modules.
Fsck,
Gc,
Webhook,
}