pub mod error; pub mod local; pub mod s3; use std::time::Duration; use async_trait::async_trait; pub use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::primitives::ByteStreamError; pub use error::{StorageError, StorageResult}; pub use local::{LocalStorage, LocalStorageConfig}; pub use s3::{S3Storage, S3StorageConfig}; use track::CounterVec; #[derive(Clone, Debug)] pub enum AppStorageConfig { Local(LocalStorageConfig), S3(S3StorageConfig), } #[derive(Clone)] pub struct AppStorage { inner: StorageBackend, metrics: Option, } #[derive(Clone)] enum StorageBackend { Local(LocalStorage), S3(S3Storage), } impl AppStorage { pub fn set_metrics(&mut self, registry: track::MetricsRegistry) { self.metrics = Some(registry); } fn backend_name(&self) -> &str { match &self.inner { StorageBackend::Local(_) => "local", StorageBackend::S3(_) => "s3", } } fn record_upload(&self, bytes: usize) { if let Some(reg) = &self.metrics { storage_ops_vec(reg) .with_label_values(&["upload", self.backend_name()]) .inc(); storage_bytes_vec(reg) .with_label_values(&["upload"]) .inc_by(bytes as f64); } } fn record_download(&self, bytes: usize) { if let Some(reg) = &self.metrics { storage_ops_vec(reg) .with_label_values(&["download", self.backend_name()]) .inc(); storage_bytes_vec(reg) .with_label_values(&["download"]) .inc_by(bytes as f64); } } fn record_delete(&self) { if let Some(reg) = &self.metrics { storage_ops_vec(reg) .with_label_values(&["delete", self.backend_name()]) .inc(); } } } #[derive(Clone, Debug, Default)] pub struct PutObjectOptions { pub content_type: Option, pub content_length: Option, pub cache_control: Option, } #[derive(Clone, Debug)] pub struct StoredObject { pub key: String, pub url: String, pub e_tag: Option, pub version_id: Option, } #[derive(Debug)] pub struct StorageObjectStream { pub body: ByteStream, pub content_length: Option, pub content_type: Option, pub e_tag: Option, } #[derive(Clone, Debug)] pub struct StorageObject { pub bytes: Vec, pub content_length: Option, pub content_type: Option, pub e_tag: Option, } #[async_trait] pub trait ObjectStorage: Send + Sync { async fn put_stream( &self, key: &str, body: ByteStream, options: PutObjectOptions, ) -> StorageResult; async fn put_bytes( &self, key: &str, bytes: Vec, options: PutObjectOptions, ) -> StorageResult; async fn get_stream(&self, key: &str) -> StorageResult; async fn get_bytes(&self, key: &str) -> StorageResult; async fn delete(&self, key: &str) -> StorageResult<()>; fn public_url(&self, key: &str) -> StorageResult>; async fn presigned_get_url( &self, key: &str, expires_in: Duration, ) -> StorageResult; } impl AppStorage { #[tracing::instrument(skip(config))] pub async fn init(config: AppStorageConfig) -> StorageResult { let inner = match config { AppStorageConfig::Local(config) => { tracing::info!("initializing local storage"); StorageBackend::Local(LocalStorage::connect(config).await?) } AppStorageConfig::S3(config) => { tracing::info!(bucket = %config.bucket, region = %config.region, "initializing S3 storage"); StorageBackend::S3(S3Storage::connect(config).await?) } }; Ok(Self { inner, metrics: None, }) } } #[async_trait] impl ObjectStorage for AppStorage { #[tracing::instrument(skip(self, body), fields(storage.key = %key))] async fn put_stream( &self, key: &str, body: ByteStream, options: PutObjectOptions, ) -> StorageResult { let result = match &self.inner { StorageBackend::Local(storage) => { storage.put_stream(key, body, options).await } StorageBackend::S3(storage) => { storage.put_stream(key, body, options).await } }; if result.is_ok() { self.record_upload(0); } result } #[tracing::instrument(skip(self, bytes), fields(storage.key = %key, storage.size = bytes.len()))] async fn put_bytes( &self, key: &str, bytes: Vec, options: PutObjectOptions, ) -> StorageResult { let size = bytes.len(); let result = match &self.inner { StorageBackend::Local(storage) => { storage.put_bytes(key, bytes, options).await } StorageBackend::S3(storage) => { storage.put_bytes(key, bytes, options).await } }; if result.is_ok() { self.record_upload(size); } result } #[tracing::instrument(skip(self), fields(storage.key = %key))] async fn get_stream( &self, key: &str, ) -> StorageResult { match &self.inner { StorageBackend::Local(storage) => storage.get_stream(key).await, StorageBackend::S3(storage) => storage.get_stream(key).await, } } #[tracing::instrument(skip(self), fields(storage.key = %key))] async fn get_bytes(&self, key: &str) -> StorageResult { let result = match &self.inner { StorageBackend::Local(storage) => storage.get_bytes(key).await, StorageBackend::S3(storage) => storage.get_bytes(key).await, }; if let Ok(obj) = &result { self.record_download(obj.bytes.len()); } result } #[tracing::instrument(skip(self), fields(storage.key = %key))] async fn delete(&self, key: &str) -> StorageResult<()> { let result = match &self.inner { StorageBackend::Local(storage) => storage.delete(key).await, StorageBackend::S3(storage) => storage.delete(key).await, }; if result.is_ok() { self.record_delete(); } result } fn public_url(&self, key: &str) -> StorageResult> { match &self.inner { StorageBackend::Local(storage) => storage.public_url(key), StorageBackend::S3(storage) => storage.public_url(key), } } async fn presigned_get_url( &self, key: &str, expires_in: Duration, ) -> StorageResult { match &self.inner { StorageBackend::Local(storage) => { storage.presigned_get_url(key, expires_in).await } StorageBackend::S3(storage) => { storage.presigned_get_url(key, expires_in).await } } } } fn storage_ops_vec(registry: &track::MetricsRegistry) -> CounterVec { registry .register_counter_vec( "storage_operations_total", "Total storage operations", &["operation", "backend"], ) .expect("failed to register storage_operations_total") } fn storage_bytes_vec(registry: &track::MetricsRegistry) -> CounterVec { registry .register_counter_vec( "storage_bytes_total", "Total bytes transferred", &["operation"], ) .expect("failed to register storage_bytes_total") } pub async fn collect_byte_stream( body: ByteStream, ) -> Result, ByteStreamError> { body.collect().await.map(|data| data.to_vec()) } impl TryFrom<&config::AppConfig> for AppStorageConfig { type Error = StorageError; fn try_from(config: &config::AppConfig) -> Result { let backend = config .storage_backend() .map_err(|error| StorageError::Config(error.to_string()))?; match backend.as_str() { "local" | "fs" | "filesystem" => { Ok(Self::Local(LocalStorageConfig::try_from(config)?)) } "s3" => Ok(Self::S3(S3StorageConfig::try_from(config)?)), backend => Err(StorageError::Config(format!( "unsupported storage backend: {backend}" ))), } } }