381 lines
11 KiB
Rust
381 lines
11 KiB
Rust
use std::{
|
|
path::{Component, Path, PathBuf},
|
|
time::Duration,
|
|
};
|
|
|
|
use async_trait::async_trait;
|
|
use aws_sdk_s3::primitives::ByteStream;
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::fs;
|
|
|
|
use crate::{
|
|
ObjectStorage, PutObjectOptions, StorageError, StorageObject,
|
|
StorageObjectStream, StorageResult, StoredObject,
|
|
};
|
|
|
|
/// Settings for the filesystem-backed [`LocalStorage`] backend.
#[derive(Debug, Clone)]
pub struct LocalStorageConfig {
    /// Directory under which objects are stored; `LocalStorage::connect`
    /// rejects an empty path and creates the directory if it is missing.
    pub root_path: PathBuf,
    /// Base URL that object keys are appended to when building public URLs.
    /// `None` means no public (or "presigned") URL can be produced.
    pub public_url: Option<String>,
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct LocalStorage {
|
|
config: LocalStorageConfig,
|
|
}
|
|
|
|
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
|
struct LocalObjectMetadata {
|
|
content_type: Option<String>,
|
|
}
|
|
|
|
impl LocalStorage {
|
|
pub async fn connect(config: LocalStorageConfig) -> StorageResult<Self> {
|
|
if config.root_path.as_os_str().is_empty() {
|
|
return Err(StorageError::Config(
|
|
"local storage root path cannot be empty".to_string(),
|
|
));
|
|
}
|
|
|
|
fs::create_dir_all(&config.root_path)
|
|
.await
|
|
.map_err(|error| StorageError::Local(error.to_string()))?;
|
|
|
|
Ok(Self { config })
|
|
}
|
|
|
|
pub async fn put_bytes(
|
|
&self,
|
|
key: &str,
|
|
bytes: Vec<u8>,
|
|
options: PutObjectOptions,
|
|
) -> StorageResult<StoredObject> {
|
|
self.put_stream(key, ByteStream::from(bytes), options).await
|
|
}
|
|
|
|
fn normalize_key(key: &str) -> StorageResult<String> {
|
|
let key = key.trim().trim_start_matches('/');
|
|
if key.is_empty() {
|
|
return Err(StorageError::InvalidKey(key.to_string()));
|
|
}
|
|
|
|
let mut parts = Vec::new();
|
|
for component in Path::new(key).components() {
|
|
match component {
|
|
Component::Normal(part) => {
|
|
let part = part.to_str().ok_or_else(|| {
|
|
StorageError::InvalidKey(key.to_string())
|
|
})?;
|
|
parts.push(part);
|
|
}
|
|
Component::CurDir
|
|
| Component::ParentDir
|
|
| Component::RootDir
|
|
| Component::Prefix(_) => {
|
|
return Err(StorageError::InvalidKey(key.to_string()));
|
|
}
|
|
}
|
|
}
|
|
|
|
if parts.is_empty() {
|
|
return Err(StorageError::InvalidKey(key.to_string()));
|
|
}
|
|
|
|
Ok(parts.join("/"))
|
|
}
|
|
|
|
fn object_path(&self, key: &str) -> PathBuf {
|
|
self.config.root_path.join(key)
|
|
}
|
|
|
|
fn metadata_path(&self, key: &str) -> PathBuf {
|
|
self.config
|
|
.root_path
|
|
.join(".metadata")
|
|
.join(format!("{key}.json"))
|
|
}
|
|
|
|
fn public_url_for_config(
|
|
config: &LocalStorageConfig,
|
|
key: &str,
|
|
) -> Option<String> {
|
|
let base_url = config.public_url.as_ref()?.trim_end_matches('/');
|
|
Some(format!("{base_url}/{key}"))
|
|
}
|
|
|
|
async fn write_metadata(
|
|
&self,
|
|
key: &str,
|
|
metadata: &LocalObjectMetadata,
|
|
) -> StorageResult<()> {
|
|
let path = self.metadata_path(key);
|
|
if let Some(parent) = path.parent() {
|
|
fs::create_dir_all(parent)
|
|
.await
|
|
.map_err(|error| StorageError::Local(error.to_string()))?;
|
|
}
|
|
|
|
let bytes = serde_json::to_vec(metadata)
|
|
.map_err(|error| StorageError::Local(error.to_string()))?;
|
|
fs::write(path, bytes)
|
|
.await
|
|
.map_err(|error| StorageError::Local(error.to_string()))
|
|
}
|
|
|
|
async fn read_metadata(
|
|
&self,
|
|
key: &str,
|
|
) -> StorageResult<LocalObjectMetadata> {
|
|
let path = self.metadata_path(key);
|
|
let bytes = fs::read(path).await.map_err(|error| {
|
|
if error.kind() == std::io::ErrorKind::NotFound {
|
|
StorageError::NotFound(key.to_string())
|
|
} else {
|
|
StorageError::Local(error.to_string())
|
|
}
|
|
})?;
|
|
serde_json::from_slice(&bytes)
|
|
.map_err(|error| StorageError::Local(error.to_string()))
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl ObjectStorage for LocalStorage {
|
|
async fn put_stream(
|
|
&self,
|
|
key: &str,
|
|
body: ByteStream,
|
|
options: PutObjectOptions,
|
|
) -> StorageResult<StoredObject> {
|
|
let key = Self::normalize_key(key)?;
|
|
let path = self.object_path(&key);
|
|
if let Some(parent) = path.parent() {
|
|
fs::create_dir_all(parent)
|
|
.await
|
|
.map_err(|error| StorageError::Local(error.to_string()))?;
|
|
}
|
|
|
|
let bytes = crate::collect_byte_stream(body)
|
|
.await
|
|
.map_err(|error| StorageError::Stream(error.to_string()))?;
|
|
if let Some(content_length) = options.content_length
|
|
&& content_length >= 0
|
|
&& bytes.len() as i64 != content_length
|
|
{
|
|
return Err(StorageError::Local(format!(
|
|
"content length mismatch for {key}: expected {content_length}, got {}",
|
|
bytes.len()
|
|
)));
|
|
}
|
|
|
|
let metadata = LocalObjectMetadata {
|
|
content_type: options.content_type,
|
|
};
|
|
|
|
fs::write(&path, bytes)
|
|
.await
|
|
.map_err(|error| StorageError::Local(error.to_string()))?;
|
|
self.write_metadata(&key, &metadata).await?;
|
|
|
|
Ok(StoredObject {
|
|
url: self.public_url(&key)?.unwrap_or_else(|| key.clone()),
|
|
key,
|
|
e_tag: None,
|
|
version_id: None,
|
|
})
|
|
}
|
|
|
|
async fn put_bytes(
|
|
&self,
|
|
key: &str,
|
|
bytes: Vec<u8>,
|
|
options: PutObjectOptions,
|
|
) -> StorageResult<StoredObject> {
|
|
LocalStorage::put_bytes(self, key, bytes, options).await
|
|
}
|
|
|
|
async fn get_stream(
|
|
&self,
|
|
key: &str,
|
|
) -> StorageResult<StorageObjectStream> {
|
|
let key = Self::normalize_key(key)?;
|
|
let path = self.object_path(&key);
|
|
let metadata = fs::metadata(&path).await.map_err(|error| {
|
|
if error.kind() == std::io::ErrorKind::NotFound {
|
|
StorageError::NotFound(key.clone())
|
|
} else {
|
|
StorageError::Local(error.to_string())
|
|
}
|
|
})?;
|
|
let object_metadata = match self.read_metadata(&key).await {
|
|
Ok(metadata) => metadata,
|
|
Err(StorageError::NotFound(_)) => LocalObjectMetadata::default(),
|
|
Err(error) => return Err(error),
|
|
};
|
|
let body = ByteStream::from_path(&path)
|
|
.await
|
|
.map_err(|error| StorageError::Stream(error.to_string()))?;
|
|
|
|
Ok(StorageObjectStream {
|
|
body,
|
|
content_length: Some(metadata.len() as i64),
|
|
content_type: object_metadata.content_type,
|
|
e_tag: None,
|
|
})
|
|
}
|
|
|
|
async fn get_bytes(&self, key: &str) -> StorageResult<StorageObject> {
|
|
let stream = self.get_stream(key).await?;
|
|
let bytes = crate::collect_byte_stream(stream.body)
|
|
.await
|
|
.map_err(|error| StorageError::Stream(error.to_string()))?;
|
|
|
|
Ok(StorageObject {
|
|
bytes,
|
|
content_length: stream.content_length,
|
|
content_type: stream.content_type,
|
|
e_tag: stream.e_tag,
|
|
})
|
|
}
|
|
|
|
async fn delete(&self, key: &str) -> StorageResult<()> {
|
|
let key = Self::normalize_key(key)?;
|
|
let path = self.object_path(&key);
|
|
fs::remove_file(path).await.map_or_else(
|
|
|error| {
|
|
if error.kind() == std::io::ErrorKind::NotFound {
|
|
Ok(())
|
|
} else {
|
|
Err(StorageError::Local(error.to_string()))
|
|
}
|
|
},
|
|
|_| Ok(()),
|
|
)?;
|
|
fs::remove_file(self.metadata_path(&key)).await.map_or_else(
|
|
|error| {
|
|
if error.kind() == std::io::ErrorKind::NotFound {
|
|
Ok(())
|
|
} else {
|
|
Err(StorageError::Local(error.to_string()))
|
|
}
|
|
},
|
|
|_| Ok(()),
|
|
)
|
|
}
|
|
|
|
fn public_url(&self, key: &str) -> StorageResult<Option<String>> {
|
|
let key = Self::normalize_key(key)?;
|
|
Ok(Self::public_url_for_config(&self.config, &key))
|
|
}
|
|
|
|
async fn presigned_get_url(
|
|
&self,
|
|
key: &str,
|
|
_expires_in: Duration,
|
|
) -> StorageResult<String> {
|
|
self.public_url(key)?.ok_or_else(|| {
|
|
StorageError::Config(
|
|
"local storage public URL is not configured".to_string(),
|
|
)
|
|
})
|
|
}
|
|
}
|
|
|
|
impl TryFrom<&config::AppConfig> for LocalStorageConfig {
|
|
type Error = StorageError;
|
|
|
|
fn try_from(config: &config::AppConfig) -> Result<Self, Self::Error> {
|
|
Ok(Self {
|
|
root_path: PathBuf::from(config.storage_path()),
|
|
public_url: Some(config.storage_public_url()),
|
|
})
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use std::{
        sync::atomic::{AtomicU64, Ordering},
        time::{Duration, SystemTime, UNIX_EPOCH},
    };

    use tokio::fs;

    use super::{LocalStorage, LocalStorageConfig};
    use crate::{ObjectStorage, PutObjectOptions, StorageError};

    /// Returns a unique, not-yet-created directory path under the system temp dir.
    ///
    /// Tests run concurrently, and a coarse clock could hand two of them the
    /// same timestamp, letting one test's cleanup delete the other's files. A
    /// per-process counter is mixed in alongside the pid and timestamp to
    /// prevent that.
    fn temp_root() -> Result<std::path::PathBuf, StorageError> {
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);

        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_err(|error| StorageError::Local(error.to_string()))?
            .as_nanos();
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        Ok(std::env::temp_dir().join(format!(
            "gitdataai-local-storage-{}-{nanos}-{id}",
            std::process::id()
        )))
    }

    /// Connects a storage rooted at a fresh temp directory and returns it with
    /// that root, so the test can clean it up.
    async fn new_storage(
        public_url: Option<&str>,
    ) -> Result<(LocalStorage, std::path::PathBuf), StorageError> {
        let root = temp_root()?;
        let storage = LocalStorage::connect(LocalStorageConfig {
            root_path: root.clone(),
            public_url: public_url.map(String::from),
        })
        .await?;
        Ok((storage, root))
    }

    /// Recursively deletes a test root created by `new_storage`.
    async fn remove_root(root: std::path::PathBuf) -> Result<(), StorageError> {
        fs::remove_dir_all(root)
            .await
            .map_err(|error| StorageError::Local(error.to_string()))
    }

    #[tokio::test]
    async fn stores_reads_and_deletes_bytes() -> Result<(), StorageError> {
        let (storage, root) = new_storage(Some("/files")).await?;

        let stored = storage
            .put_bytes(
                "avatars/user-1.txt",
                b"hello".to_vec(),
                PutObjectOptions {
                    content_type: Some("text/plain".to_string()),
                    content_length: Some(5),
                    ..PutObjectOptions::default()
                },
            )
            .await?;
        assert_eq!(stored.key, "avatars/user-1.txt");
        assert_eq!(stored.url, "/files/avatars/user-1.txt");

        let object = storage.get_bytes("avatars/user-1.txt").await?;
        assert_eq!(object.bytes, b"hello");
        assert_eq!(object.content_length, Some(5));
        assert_eq!(object.content_type.as_deref(), Some("text/plain"));

        storage.delete("avatars/user-1.txt").await?;
        assert!(matches!(
            storage.get_bytes("avatars/user-1.txt").await,
            Err(StorageError::NotFound(_))
        ));
        // Deleting an already-absent key is not an error.
        storage.delete("avatars/user-1.txt").await?;

        remove_root(root).await
    }

    #[tokio::test]
    async fn rejects_path_traversal_keys() -> Result<(), StorageError> {
        let (storage, root) = new_storage(Some("/files")).await?;

        assert!(matches!(
            storage
                .put_bytes(
                    "../escape.txt",
                    b"bad".to_vec(),
                    PutObjectOptions::default()
                )
                .await,
            Err(StorageError::InvalidKey(_))
        ));
        assert!(matches!(
            storage.public_url("nested/../escape.txt"),
            Err(StorageError::InvalidKey(_))
        ));

        remove_root(root).await
    }

    #[tokio::test]
    async fn normalizes_keys_without_public_url() -> Result<(), StorageError> {
        let (storage, root) = new_storage(None).await?;

        let stored = storage
            .put_bytes(
                "/docs//readme.txt",
                b"hi".to_vec(),
                PutObjectOptions::default(),
            )
            .await?;
        assert_eq!(stored.key, "docs/readme.txt");
        // With no public URL configured, the stored URL falls back to the key.
        assert_eq!(stored.url, "docs/readme.txt");

        let object = storage.get_bytes("docs/readme.txt").await?;
        assert_eq!(object.content_type, None);

        assert_eq!(storage.public_url("docs/readme.txt")?, None);
        assert!(matches!(
            storage
                .presigned_get_url("docs/readme.txt", Duration::from_secs(60))
                .await,
            Err(StorageError::Config(_))
        ));
        assert!(matches!(
            storage.public_url("   "),
            Err(StorageError::InvalidKey(_))
        ));

        remove_root(root).await
    }

    #[tokio::test]
    async fn rejects_content_length_mismatch() -> Result<(), StorageError> {
        let (storage, root) = new_storage(None).await?;

        let result = storage
            .put_bytes(
                "short.txt",
                b"abc".to_vec(),
                PutObjectOptions {
                    content_length: Some(5),
                    ..PutObjectOptions::default()
                },
            )
            .await;
        assert!(matches!(result, Err(StorageError::Local(_))));
        assert!(matches!(
            storage.get_bytes("short.txt").await,
            Err(StorageError::NotFound(_))
        ));

        remove_root(root).await
    }
}
|