use std::sync::Arc; use cache::AppCache; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use crate::rpc::{ error::{spawn_blocking_error, to_status}, proto as p, registry::RepoRegistry, }; pub struct BlobServiceImpl { pub registry: Arc, pub cache: AppCache, } type BlobChunkStream = ReceiverStream>; #[tonic::async_trait] impl p::blob_service_server::BlobService for BlobServiceImpl { async fn blob_load( &self, req: Request, ) -> Result, Status> { let inner = req.into_inner(); let repo_id = inner.repo_id.clone(); let oid_str = inner.id.clone().map(|o| o.value).unwrap_or_default(); let path = inner.path.clone(); let cache_key = format!("git:rpc:cache:blob:load:{}:{}:{}", repo_id, oid_str, path); if let Ok(Some(cached)) = self.cache.get::(&cache_key).await { return Ok(Response::new(cached)); } let bare = self.registry.get(&repo_id).await?; let params = crate::cmd::blob::BlobLoadParams { id: inner.id.unwrap_or_default().into(), path: inner.path.clone(), }; let params_clone = params.clone(); let result = tokio::task::spawn_blocking(move || bare.blob_load(¶ms_clone)) .await .map_err(spawn_blocking_error)? .map_err(to_status)?; let resp = p::BlobLoadResponse { blob: result.blob }; let _ = self.cache.set(&cache_key, &resp).await; Ok(Response::new(resp)) } async fn blob_size( &self, req: Request, ) -> Result, Status> { let inner = req.into_inner(); let bare = self.registry.get(&inner.repo_id).await?; let params = crate::cmd::blob::BlobSizeParams { id: inner.id.unwrap_or_default().into(), path: inner.path.clone(), }; let result = tokio::task::spawn_blocking(move || bare.blob_size(¶ms)) .await .map_err(spawn_blocking_error)? .map_err(to_status)?; Ok(Response::new(p::BlobSizeResponse { size: result })) } async fn blob_exists( &self, req: Request, ) -> Result, Status> { let inner = req.into_inner(); let bare = self.registry.get(&inner.repo_id).await?; let oid: crate::cmd::oid::ObjectId = inner.id.unwrap_or_default().into(); let result = tokio::task::spawn_blocking(move || bare.blob_exists(oid)) .await .map_err(spawn_blocking_error)? .map_err(to_status)?; Ok(Response::new(p::BlobExistsResponse { exists: result })) } async fn blob_is_binary( &self, req: Request, ) -> Result, Status> { let inner = req.into_inner(); let repo_id = inner.repo_id.clone(); let oid_str = inner.id.clone().map(|o| o.value).unwrap_or_default(); let cache_key = format!("git:rpc:cache:blob:binary:{}:{}", repo_id, oid_str); if let Ok(Some(cached)) = self.cache.get::(&cache_key).await { return Ok(Response::new(cached)); } let bare = self.registry.get(&repo_id).await?; let oid: crate::cmd::oid::ObjectId = inner.id.unwrap_or_default().into(); let result = tokio::task::spawn_blocking(move || bare.blob_is_binary(oid)) .await .map_err(spawn_blocking_error)? .map_err(to_status)?; let resp = p::BlobIsBinaryResponse { is_binary: result }; let _ = self.cache.set(&cache_key, &resp).await; Ok(Response::new(resp)) } async fn blob_upload( &self, req: Request, ) -> Result, Status> { let inner = req.into_inner(); let bare = self.registry.get(&inner.repo_id).await?; let params = crate::cmd::blob::BlobUploadParams { blob: inner.blob.clone(), path: inner.path.clone(), }; let result = tokio::task::spawn_blocking(move || bare.blob_upload(params)) .await .map_err(spawn_blocking_error)? .map_err(to_status)?; Ok(Response::new(p::BlobUploadResponse { id: Some(result.id.into()), })) } type BlobChunkStreamStream = BlobChunkStream; async fn blob_chunk_stream( &self, req: Request, ) -> Result, Status> { let inner = req.into_inner(); let bare = self.registry.get(&inner.repo_id).await?; let param = crate::cmd::blob::BlobChunkParam { path: inner.path.clone(), oid: inner.oid.unwrap_or_default().into(), size: inner.size as usize, offset: inner.offset as usize, }; let (tx, rx) = tokio::sync::mpsc::channel(4); tokio::task::spawn_blocking(move || { let result = bare.blob_chunk(param); match result { Ok(chunk) => { let _ = tx.blocking_send(Ok(chunk.into())); } Err(e) => { let _ = tx.blocking_send(Err(to_status(e))); } } }); Ok(Response::new(ReceiverStream::new(rx))) } }