use std::{ path::{Component, Path, PathBuf}, time::Duration, }; use async_trait::async_trait; use aws_sdk_s3::primitives::ByteStream; use serde::{Deserialize, Serialize}; use tokio::fs; use crate::{ ObjectStorage, PutObjectOptions, StorageError, StorageObject, StorageObjectStream, StorageResult, StoredObject, }; #[derive(Clone, Debug)] pub struct LocalStorageConfig { pub root_path: PathBuf, pub public_url: Option, } #[derive(Clone)] pub struct LocalStorage { config: LocalStorageConfig, } #[derive(Clone, Debug, Default, Deserialize, Serialize)] struct LocalObjectMetadata { content_type: Option, } impl LocalStorage { pub async fn connect(config: LocalStorageConfig) -> StorageResult { if config.root_path.as_os_str().is_empty() { return Err(StorageError::Config( "local storage root path cannot be empty".to_string(), )); } fs::create_dir_all(&config.root_path) .await .map_err(|error| StorageError::Local(error.to_string()))?; Ok(Self { config }) } pub async fn put_bytes( &self, key: &str, bytes: Vec, options: PutObjectOptions, ) -> StorageResult { self.put_stream(key, ByteStream::from(bytes), options).await } fn normalize_key(key: &str) -> StorageResult { let key = key.trim().trim_start_matches('/'); if key.is_empty() { return Err(StorageError::InvalidKey(key.to_string())); } let mut parts = Vec::new(); for component in Path::new(key).components() { match component { Component::Normal(part) => { let part = part.to_str().ok_or_else(|| { StorageError::InvalidKey(key.to_string()) })?; parts.push(part); } Component::CurDir | Component::ParentDir | Component::RootDir | Component::Prefix(_) => { return Err(StorageError::InvalidKey(key.to_string())); } } } if parts.is_empty() { return Err(StorageError::InvalidKey(key.to_string())); } Ok(parts.join("/")) } fn object_path(&self, key: &str) -> PathBuf { self.config.root_path.join(key) } fn metadata_path(&self, key: &str) -> PathBuf { self.config .root_path .join(".metadata") .join(format!("{key}.json")) } fn public_url_for_config( config: &LocalStorageConfig, key: &str, ) -> Option { let base_url = config.public_url.as_ref()?.trim_end_matches('/'); Some(format!("{base_url}/{key}")) } async fn write_metadata( &self, key: &str, metadata: &LocalObjectMetadata, ) -> StorageResult<()> { let path = self.metadata_path(key); if let Some(parent) = path.parent() { fs::create_dir_all(parent) .await .map_err(|error| StorageError::Local(error.to_string()))?; } let bytes = serde_json::to_vec(metadata) .map_err(|error| StorageError::Local(error.to_string()))?; fs::write(path, bytes) .await .map_err(|error| StorageError::Local(error.to_string())) } async fn read_metadata( &self, key: &str, ) -> StorageResult { let path = self.metadata_path(key); let bytes = fs::read(path).await.map_err(|error| { if error.kind() == std::io::ErrorKind::NotFound { StorageError::NotFound(key.to_string()) } else { StorageError::Local(error.to_string()) } })?; serde_json::from_slice(&bytes) .map_err(|error| StorageError::Local(error.to_string())) } } #[async_trait] impl ObjectStorage for LocalStorage { async fn put_stream( &self, key: &str, body: ByteStream, options: PutObjectOptions, ) -> StorageResult { let key = Self::normalize_key(key)?; let path = self.object_path(&key); if let Some(parent) = path.parent() { fs::create_dir_all(parent) .await .map_err(|error| StorageError::Local(error.to_string()))?; } let bytes = crate::collect_byte_stream(body) .await .map_err(|error| StorageError::Stream(error.to_string()))?; if let Some(content_length) = options.content_length && content_length >= 0 && bytes.len() as i64 != content_length { return Err(StorageError::Local(format!( "content length mismatch for {key}: expected {content_length}, got {}", bytes.len() ))); } let metadata = LocalObjectMetadata { content_type: options.content_type, }; fs::write(&path, bytes) .await .map_err(|error| StorageError::Local(error.to_string()))?; self.write_metadata(&key, &metadata).await?; Ok(StoredObject { url: self.public_url(&key)?.unwrap_or_else(|| key.clone()), key, e_tag: None, version_id: None, }) } async fn put_bytes( &self, key: &str, bytes: Vec, options: PutObjectOptions, ) -> StorageResult { LocalStorage::put_bytes(self, key, bytes, options).await } async fn get_stream( &self, key: &str, ) -> StorageResult { let key = Self::normalize_key(key)?; let path = self.object_path(&key); let metadata = fs::metadata(&path).await.map_err(|error| { if error.kind() == std::io::ErrorKind::NotFound { StorageError::NotFound(key.clone()) } else { StorageError::Local(error.to_string()) } })?; let object_metadata = match self.read_metadata(&key).await { Ok(metadata) => metadata, Err(StorageError::NotFound(_)) => LocalObjectMetadata::default(), Err(error) => return Err(error), }; let body = ByteStream::from_path(&path) .await .map_err(|error| StorageError::Stream(error.to_string()))?; Ok(StorageObjectStream { body, content_length: Some(metadata.len() as i64), content_type: object_metadata.content_type, e_tag: None, }) } async fn get_bytes(&self, key: &str) -> StorageResult { let stream = self.get_stream(key).await?; let bytes = crate::collect_byte_stream(stream.body) .await .map_err(|error| StorageError::Stream(error.to_string()))?; Ok(StorageObject { bytes, content_length: stream.content_length, content_type: stream.content_type, e_tag: stream.e_tag, }) } async fn delete(&self, key: &str) -> StorageResult<()> { let key = Self::normalize_key(key)?; let path = self.object_path(&key); fs::remove_file(path).await.map_or_else( |error| { if error.kind() == std::io::ErrorKind::NotFound { Ok(()) } else { Err(StorageError::Local(error.to_string())) } }, |_| Ok(()), )?; fs::remove_file(self.metadata_path(&key)).await.map_or_else( |error| { if error.kind() == std::io::ErrorKind::NotFound { Ok(()) } else { Err(StorageError::Local(error.to_string())) } }, |_| Ok(()), ) } fn public_url(&self, key: &str) -> StorageResult> { let key = Self::normalize_key(key)?; Ok(Self::public_url_for_config(&self.config, &key)) } async fn presigned_get_url( &self, key: &str, _expires_in: Duration, ) -> StorageResult { self.public_url(key)?.ok_or_else(|| { StorageError::Config( "local storage public URL is not configured".to_string(), ) }) } } impl TryFrom<&config::AppConfig> for LocalStorageConfig { type Error = StorageError; fn try_from(config: &config::AppConfig) -> Result { Ok(Self { root_path: PathBuf::from(config.storage_path()), public_url: Some(config.storage_public_url()), }) } } #[cfg(test)] mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use tokio::fs; use super::{LocalStorage, LocalStorageConfig}; use crate::{ObjectStorage, PutObjectOptions, StorageError}; fn temp_root() -> Result { let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(|error| StorageError::Local(error.to_string()))? .as_nanos(); Ok(std::env::temp_dir().join(format!( "gitdataai-local-storage-{}-{nanos}", std::process::id() ))) } #[tokio::test] async fn stores_reads_and_deletes_bytes() -> Result<(), StorageError> { let root = temp_root()?; let storage = LocalStorage::connect(LocalStorageConfig { root_path: root.clone(), public_url: Some("/files".to_string()), }) .await?; let stored = storage .put_bytes( "avatars/user-1.txt", b"hello".to_vec(), PutObjectOptions { content_type: Some("text/plain".to_string()), content_length: Some(5), ..PutObjectOptions::default() }, ) .await?; assert_eq!(stored.key, "avatars/user-1.txt"); assert_eq!(stored.url, "/files/avatars/user-1.txt"); let object = storage.get_bytes("avatars/user-1.txt").await?; assert_eq!(object.bytes, b"hello"); assert_eq!(object.content_length, Some(5)); assert_eq!(object.content_type.as_deref(), Some("text/plain")); storage.delete("avatars/user-1.txt").await?; assert!(matches!( storage.get_bytes("avatars/user-1.txt").await, Err(StorageError::NotFound(_)) )); fs::remove_dir_all(root) .await .map_err(|error| StorageError::Local(error.to_string()))?; Ok(()) } #[tokio::test] async fn rejects_path_traversal_keys() -> Result<(), StorageError> { let storage = LocalStorage::connect(LocalStorageConfig { root_path: temp_root()?, public_url: Some("/files".to_string()), }) .await?; assert!(matches!( storage .put_bytes( "../escape.txt", b"bad".to_vec(), PutObjectOptions::default() ) .await, Err(StorageError::InvalidKey(_)) )); assert!(matches!( storage.public_url("nested/../escape.txt"), Err(StorageError::InvalidKey(_)) )); Ok(()) } }