117 lines
3.8 KiB
Python
117 lines
3.8 KiB
Python
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Protocol

import boto3

from app.settings import settings
|
|
|
|
|
|
class StorageBackendError(RuntimeError):
    """Error raised by this module's storage backends and backend factory.

    Used for keys that escape the local storage root, for objects that
    cannot be read, and for an unsupported backend name in the settings.
    """
|
|
|
|
|
|
class StorageBackend(Protocol):
|
|
name: str
|
|
|
|
def put_bytes(self, key: str, data: bytes, *, content_type: str | None = None) -> None: ...
|
|
def get_bytes(self, key: str) -> bytes: ...
|
|
def delete(self, key: str) -> None: ...
|
|
def exists(self, key: str) -> bool: ...
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class LocalFilesystemStorageBackend:
|
|
root: Path
|
|
name: str = "local"
|
|
|
|
def __post_init__(self) -> None:
|
|
self.root = self.root.expanduser().resolve()
|
|
self.root.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _path(self, key: str) -> Path:
|
|
path = (self.root / key).resolve()
|
|
if not path.is_relative_to(self.root):
|
|
raise StorageBackendError("Storage key escapes local storage root")
|
|
return path
|
|
|
|
def put_bytes(self, key: str, data: bytes, *, content_type: str | None = None) -> None:
|
|
path = self._path(key)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_bytes(data)
|
|
|
|
def get_bytes(self, key: str) -> bytes:
|
|
path = self._path(key)
|
|
if not path.exists() or not path.is_file():
|
|
raise StorageBackendError("Stored object does not exist")
|
|
return path.read_bytes()
|
|
|
|
def delete(self, key: str) -> None:
|
|
path = self._path(key)
|
|
if path.exists() and path.is_file():
|
|
path.unlink()
|
|
|
|
def exists(self, key: str) -> bool:
|
|
path = self._path(key)
|
|
return path.exists() and path.is_file()
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class S3StorageBackend:
|
|
bucket: str
|
|
endpoint_url: str
|
|
region_name: str
|
|
access_key_id: str
|
|
secret_access_key: str
|
|
name: str = "s3"
|
|
|
|
@property
|
|
def client(self):
|
|
return boto3.client(
|
|
"s3",
|
|
endpoint_url=self.endpoint_url,
|
|
region_name=self.region_name,
|
|
aws_access_key_id=self.access_key_id,
|
|
aws_secret_access_key=self.secret_access_key,
|
|
)
|
|
|
|
def put_bytes(self, key: str, data: bytes, *, content_type: str | None = None) -> None:
|
|
kwargs = {"Bucket": self.bucket, "Key": key, "Body": data}
|
|
if content_type:
|
|
kwargs["ContentType"] = content_type
|
|
self.client.put_object(**kwargs)
|
|
|
|
def get_bytes(self, key: str) -> bytes:
|
|
try:
|
|
obj = self.client.get_object(Bucket=self.bucket, Key=key)
|
|
return obj["Body"].read()
|
|
except Exception as exc: # pragma: no cover - depends on S3 backend
|
|
raise StorageBackendError(str(exc)) from exc
|
|
|
|
def delete(self, key: str) -> None:
|
|
self.client.delete_object(Bucket=self.bucket, Key=key)
|
|
|
|
def exists(self, key: str) -> bool:
|
|
try:
|
|
self.client.head_object(Bucket=self.bucket, Key=key)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def get_storage_backend() -> StorageBackend:
    """Build the storage backend selected by ``settings.file_storage_backend``.

    The setting is lower-cased and stripped before matching. ``local``,
    ``filesystem`` and ``fs`` select the local filesystem backend; ``s3`` and
    ``garage`` select the S3 backend.

    Raises:
        StorageBackendError: If the configured name matches no known backend.
    """
    choice = settings.file_storage_backend.lower().strip()

    if choice in ("local", "filesystem", "fs"):
        local_root = Path(settings.file_storage_local_root)
        return LocalFilesystemStorageBackend(local_root)

    if choice not in ("s3", "garage"):
        raise StorageBackendError(f"Unsupported file storage backend: {settings.file_storage_backend}")

    # Each value prefers the file-storage specific setting and falls back to
    # the general S3 setting when the specific one is empty.
    bucket = settings.file_storage_s3_bucket or settings.s3_bucket
    endpoint_url = settings.file_storage_s3_endpoint_url or settings.s3_endpoint_url
    region_name = settings.file_storage_s3_region or settings.s3_region
    access_key_id = settings.file_storage_s3_access_key_id or settings.s3_access_key_id
    secret_access_key = settings.file_storage_s3_secret_access_key or settings.s3_secret_access_key
    return S3StorageBackend(
        bucket=bucket,
        endpoint_url=endpoint_url,
        region_name=region_name,
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )
|