166 lines
5.7 KiB
Rust
166 lines
5.7 KiB
Rust
use std::sync::Arc;
|
|
|
|
use cache::AppCache;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tonic::{Request, Response, Status};
|
|
|
|
use crate::rpc::{
|
|
error::{spawn_blocking_error, to_status},
|
|
proto as p,
|
|
registry::RepoRegistry,
|
|
};
|
|
|
|
pub struct BlobServiceImpl {
|
|
pub registry: Arc<RepoRegistry>,
|
|
pub cache: AppCache,
|
|
}
|
|
|
|
/// Server-streaming response type for `blob_chunk_stream`: a channel-backed
/// stream whose items are either a `p::BlobChunk` or a gRPC `Status` error.
type BlobChunkStream = ReceiverStream<Result<p::BlobChunk, Status>>;
|
|
|
|
#[tonic::async_trait]
|
|
impl p::blob_service_server::BlobService for BlobServiceImpl {
|
|
async fn blob_load(
|
|
&self,
|
|
req: Request<p::BlobLoadRequest>,
|
|
) -> Result<Response<p::BlobLoadResponse>, Status> {
|
|
let inner = req.into_inner();
|
|
let repo_id = inner.repo_id.clone();
|
|
let oid_str = inner.id.clone().map(|o| o.value).unwrap_or_default();
|
|
let path = inner.path.clone();
|
|
let cache_key =
|
|
format!("git:rpc:cache:blob:load:{}:{}:{}", repo_id, oid_str, path);
|
|
|
|
if let Ok(Some(cached)) =
|
|
self.cache.get::<p::BlobLoadResponse>(&cache_key).await
|
|
{
|
|
return Ok(Response::new(cached));
|
|
}
|
|
|
|
let bare = self.registry.get(&repo_id).await?;
|
|
let params = crate::cmd::blob::BlobLoadParams {
|
|
id: inner.id.unwrap_or_default().into(),
|
|
path: inner.path.clone(),
|
|
};
|
|
let params_clone = params.clone();
|
|
let result =
|
|
tokio::task::spawn_blocking(move || bare.blob_load(¶ms_clone))
|
|
.await
|
|
.map_err(spawn_blocking_error)?
|
|
.map_err(to_status)?;
|
|
let resp = p::BlobLoadResponse { blob: result.blob };
|
|
let _ = self.cache.set(&cache_key, &resp).await;
|
|
Ok(Response::new(resp))
|
|
}
|
|
|
|
async fn blob_size(
|
|
&self,
|
|
req: Request<p::BlobSizeRequest>,
|
|
) -> Result<Response<p::BlobSizeResponse>, Status> {
|
|
let inner = req.into_inner();
|
|
let bare = self.registry.get(&inner.repo_id).await?;
|
|
let params = crate::cmd::blob::BlobSizeParams {
|
|
id: inner.id.unwrap_or_default().into(),
|
|
path: inner.path.clone(),
|
|
};
|
|
let result =
|
|
tokio::task::spawn_blocking(move || bare.blob_size(¶ms))
|
|
.await
|
|
.map_err(spawn_blocking_error)?
|
|
.map_err(to_status)?;
|
|
Ok(Response::new(p::BlobSizeResponse { size: result }))
|
|
}
|
|
|
|
async fn blob_exists(
|
|
&self,
|
|
req: Request<p::BlobExistsRequest>,
|
|
) -> Result<Response<p::BlobExistsResponse>, Status> {
|
|
let inner = req.into_inner();
|
|
let bare = self.registry.get(&inner.repo_id).await?;
|
|
let oid: crate::cmd::oid::ObjectId =
|
|
inner.id.unwrap_or_default().into();
|
|
let result = tokio::task::spawn_blocking(move || bare.blob_exists(oid))
|
|
.await
|
|
.map_err(spawn_blocking_error)?
|
|
.map_err(to_status)?;
|
|
Ok(Response::new(p::BlobExistsResponse { exists: result }))
|
|
}
|
|
|
|
async fn blob_is_binary(
|
|
&self,
|
|
req: Request<p::BlobIsBinaryRequest>,
|
|
) -> Result<Response<p::BlobIsBinaryResponse>, Status> {
|
|
let inner = req.into_inner();
|
|
let repo_id = inner.repo_id.clone();
|
|
let oid_str = inner.id.clone().map(|o| o.value).unwrap_or_default();
|
|
let cache_key =
|
|
format!("git:rpc:cache:blob:binary:{}:{}", repo_id, oid_str);
|
|
|
|
if let Ok(Some(cached)) =
|
|
self.cache.get::<p::BlobIsBinaryResponse>(&cache_key).await
|
|
{
|
|
return Ok(Response::new(cached));
|
|
}
|
|
|
|
let bare = self.registry.get(&repo_id).await?;
|
|
let oid: crate::cmd::oid::ObjectId =
|
|
inner.id.unwrap_or_default().into();
|
|
let result =
|
|
tokio::task::spawn_blocking(move || bare.blob_is_binary(oid))
|
|
.await
|
|
.map_err(spawn_blocking_error)?
|
|
.map_err(to_status)?;
|
|
let resp = p::BlobIsBinaryResponse { is_binary: result };
|
|
let _ = self.cache.set(&cache_key, &resp).await;
|
|
Ok(Response::new(resp))
|
|
}
|
|
|
|
async fn blob_upload(
|
|
&self,
|
|
req: Request<p::BlobUploadRequest>,
|
|
) -> Result<Response<p::BlobUploadResponse>, Status> {
|
|
let inner = req.into_inner();
|
|
let bare = self.registry.get(&inner.repo_id).await?;
|
|
let params = crate::cmd::blob::BlobUploadParams {
|
|
blob: inner.blob.clone(),
|
|
path: inner.path.clone(),
|
|
};
|
|
let result =
|
|
tokio::task::spawn_blocking(move || bare.blob_upload(params))
|
|
.await
|
|
.map_err(spawn_blocking_error)?
|
|
.map_err(to_status)?;
|
|
Ok(Response::new(p::BlobUploadResponse {
|
|
id: Some(result.id.into()),
|
|
}))
|
|
}
|
|
|
|
type BlobChunkStreamStream = BlobChunkStream;
|
|
|
|
async fn blob_chunk_stream(
|
|
&self,
|
|
req: Request<p::BlobChunkRequest>,
|
|
) -> Result<Response<BlobChunkStream>, Status> {
|
|
let inner = req.into_inner();
|
|
let bare = self.registry.get(&inner.repo_id).await?;
|
|
let param = crate::cmd::blob::BlobChunkParam {
|
|
path: inner.path.clone(),
|
|
oid: inner.oid.unwrap_or_default().into(),
|
|
size: inner.size as usize,
|
|
offset: inner.offset as usize,
|
|
};
|
|
let (tx, rx) = tokio::sync::mpsc::channel(4);
|
|
tokio::task::spawn_blocking(move || {
|
|
let result = bare.blob_chunk(param);
|
|
match result {
|
|
Ok(chunk) => {
|
|
let _ = tx.blocking_send(Ok(chunk.into()));
|
|
}
|
|
Err(e) => {
|
|
let _ = tx.blocking_send(Err(to_status(e)));
|
|
}
|
|
}
|
|
});
|
|
Ok(Response::new(ReceiverStream::new(rx)))
|
|
}
|
|
}
|