# Files
#
# 485 lines
# 18 KiB
# Python

from __future__ import annotations
import hashlib
import mimetypes
from pathlib import PurePosixPath
from typing import Any, Iterable
from uuid import uuid4
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.db.models import Campaign, CampaignAttachmentUse, FileAsset, FileBlob, FileShare, FileVersion
from app.settings import settings
from app.storage.access import ensure_owner_access, user_group_ids
from app.storage.backends import get_storage_backend
from app.storage.common import FileConflictResolution, FileStorageError, UploadedStoredFile, utcnow
from app.storage.paths import filename_from_path, join_folder_filename, normalize_folder, normalize_logical_path, safe_storage_component
def _asset_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str):
    """Build a FileAsset query restricted to one tenant and one owner.

    Args:
        session: SQLAlchemy session used to create the query.
        tenant_id: Tenant the assets must belong to.
        owner_type: Either ``"user"`` or ``"group"``.
        owner_id: Identifier matched against the owner column for ``owner_type``.

    Returns:
        An un-executed query over ``FileAsset``.

    Raises:
        FileStorageError: If ``owner_type`` is neither ``"user"`` nor ``"group"``.
    """
    base = session.query(FileAsset).filter(
        FileAsset.tenant_id == tenant_id,
        FileAsset.owner_type == owner_type,
    )
    # Pick the column that stores the owner id for this kind of owner.
    if owner_type == "user":
        owner_column = FileAsset.owner_user_id
    elif owner_type == "group":
        owner_column = FileAsset.owner_group_id
    else:
        raise FileStorageError("Unsupported owner type")
    return base.filter(owner_column == owner_id)
def _storage_bucket_name() -> str:
    """Return the configured file-storage bucket, falling back to ``settings.s3_bucket``."""
    dedicated_bucket = settings.file_storage_s3_bucket
    if dedicated_bucket:
        return dedicated_bucket
    return settings.s3_bucket
def _storage_backend_name() -> str:
    """Return ``settings.file_storage_backend`` lower-cased and stripped of surrounding whitespace."""
    configured = settings.file_storage_backend
    lowered = configured.lower()
    return lowered.strip()
def _storage_key(*, tenant_id: str, checksum: str, filename: str) -> str:
    """Build a fresh storage key for a blob.

    The key has the form
    ``tenants/<tenant_id>/files/<first two checksum chars>/<random hex>-<name>``,
    where ``<name>`` is ``filename`` passed through ``safe_storage_component``.
    A new ``uuid4`` is drawn on every call, so repeated calls with the same
    arguments return different keys.
    """
    shard = checksum[:2]
    unique_part = uuid4().hex
    name_part = safe_storage_component(filename)
    return f"tenants/{tenant_id}/files/{shard}/{unique_part}-{name_part}"
def _get_or_create_blob(
    session: Session,
    *,
    tenant_id: str,
    data: bytes,
    filename: str,
    content_type: str | None,
) -> FileBlob:
    """Return a FileBlob for ``data``, reusing an existing row when possible.

    A blob is reused when the tenant already has one with the same SHA-256
    checksum and the same size; its ``ref_count`` is then incremented.
    Otherwise the bytes are written to the storage backend under a new key and
    a new row with ``ref_count=1`` is added and flushed.

    Args:
        session: SQLAlchemy session.
        tenant_id: Tenant that owns the blob.
        data: Raw file content.
        filename: Name used when building the storage key.
        content_type: MIME type passed to the backend and stored on a new row.

    Returns:
        The reused or newly created blob.
    """
    digest = hashlib.sha256(data).hexdigest()
    byte_count = len(data)

    existing = (
        session.query(FileBlob)
        .filter(
            FileBlob.tenant_id == tenant_id,
            FileBlob.checksum_sha256 == digest,
            FileBlob.size_bytes == byte_count,
        )
        .one_or_none()
    )
    if existing:
        # Same content already stored for this tenant: just add a reference.
        existing.ref_count += 1
        session.add(existing)
        return existing

    key = _storage_key(tenant_id=tenant_id, checksum=digest, filename=filename)
    get_storage_backend().put_bytes(key, data, content_type=content_type)

    created = FileBlob(
        tenant_id=tenant_id,
        storage_backend=_storage_backend_name(),
        storage_bucket=_storage_bucket_name(),
        storage_key=key,
        checksum_sha256=digest,
        size_bytes=byte_count,
        content_type=content_type,
        ref_count=1,
    )
    session.add(created)
    # Flush so the new row is written before the caller reads it.
    session.flush()
    return created
def create_file_asset(
    session: Session,
    *,
    tenant_id: str,
    owner_type: str,
    owner_id: str,
    user_id: str,
    filename: str,
    data: bytes,
    folder: str | None = None,
    display_path: str | None = None,
    content_type: str | None = None,
    description: str | None = None,
    metadata: dict[str, Any] | None = None,
    campaign_id: str | None = None,
    conflict_strategy: str = "reject",
    conflict_resolutions: Iterable[FileConflictResolution] | None = None,
    is_admin: bool = False,
) -> UploadedStoredFile:
    """Store ``data`` as a new file asset with an initial version.

    The target logical path is ``display_path`` when given, otherwise
    ``folder`` joined with the normalized filename. A per-path entry in
    ``conflict_resolutions`` takes precedence over ``conflict_strategy``.

    Args:
        session: SQLAlchemy session; rows are added and flushed, not committed.
        tenant_id: Tenant that owns the asset.
        owner_type: ``"user"`` or ``"group"`` (case and whitespace are ignored).
        owner_id: Identifier of the owning user or group.
        user_id: Acting user, recorded as creator of the asset and version.
        filename: Uploaded filename; normalized before use.
        data: File content.
        folder: Folder used when ``display_path`` is not given.
        display_path: Full logical path overriding ``folder`` + filename.
        content_type: MIME type; guessed from the filename when empty, with
            ``application/octet-stream`` as the fallback.
        description: Optional description stored on the asset.
        metadata: Optional metadata dict stored on the asset (``{}`` if empty).
        campaign_id: When set, the new asset is shared read-only with it.
        conflict_strategy: Default action, one of ``reject``/``overwrite``/``rename``.
        conflict_resolutions: Per-path overrides (``overwrite``/``rename``/``skip``).
        is_admin: Forwarded to ``ensure_owner_access``.

    Returns:
        The created asset, its first version, and the backing blob.

    Raises:
        FileStorageError: If the target exists and the action is ``reject`` or
            ``skip``, or if a helper rejects the input (strategy, resolution,
            share target).
    """
    owner_type = owner_type.lower().strip()
    ensure_owner_access(
        session,
        tenant_id=tenant_id,
        owner_type=owner_type,
        owner_id=owner_id,
        user_id=user_id,
        is_admin=is_admin,
    )

    safe_filename = filename_from_path(normalize_logical_path(filename, fallback_filename="file"))
    if display_path:
        logical_path = normalize_logical_path(display_path)
    else:
        logical_path = join_folder_filename(folder, safe_filename)
    if not content_type:
        content_type = mimetypes.guess_type(safe_filename)[0] or "application/octet-stream"

    conflict_strategy = _normalize_conflict_strategy(conflict_strategy)
    resolution = _resolution_by_path(conflict_resolutions).get(logical_path)
    action = resolution.action if resolution else conflict_strategy

    # Keyword arguments shared by every owner-scoped path lookup below.
    owner_scope = {"tenant_id": tenant_id, "owner_type": owner_type, "owner_id": owner_id}

    if resolution and resolution.new_path and action == "rename":
        # The caller-chosen rename target may itself already be taken. Using
        # it unchecked would leave two active assets at the same display path,
        # so resolve it to the next free path (the same path when it is free).
        logical_path = _next_available_logical_path(session, desired_path=resolution.new_path, **owner_scope)
    elif _active_asset_exists(session, path=logical_path, **owner_scope):
        if action == "reject":
            raise FileStorageError(f"Target file already exists: {logical_path}")
        if action == "overwrite":
            _soft_delete_conflicting_asset(session, path=logical_path, **owner_scope)
        elif action == "rename":
            logical_path = _next_available_logical_path(session, desired_path=logical_path, **owner_scope)
        elif action == "skip":
            raise FileStorageError(f"Skipped upload target: {logical_path}")
    elif action == "rename":
        logical_path = _next_available_logical_path(session, desired_path=logical_path, **owner_scope)

    blob = _get_or_create_blob(
        session,
        tenant_id=tenant_id,
        data=data,
        filename=safe_filename,
        content_type=content_type,
    )

    asset = FileAsset(
        tenant_id=tenant_id,
        owner_type=owner_type,
        owner_user_id=owner_id if owner_type == "user" else None,
        owner_group_id=owner_id if owner_type == "group" else None,
        display_path=logical_path,
        filename=filename_from_path(logical_path),
        description=description,
        created_by_user_id=user_id,
        metadata_=metadata or {},
    )
    session.add(asset)
    # Flush before building the version, which reads asset.id.
    session.flush()

    version = FileVersion(
        tenant_id=tenant_id,
        file_asset_id=asset.id,
        blob_id=blob.id,
        version_number=1,
        filename_at_upload=safe_filename,
        display_path_at_upload=logical_path,
        content_type=content_type,
        size_bytes=blob.size_bytes,
        checksum_sha256=blob.checksum_sha256,
        created_by_user_id=user_id,
    )
    session.add(version)
    # Flush before pointing the asset at the version, which reads version.id.
    session.flush()

    asset.current_version_id = version.id
    session.add(asset)

    if campaign_id:
        share_file(
            session,
            tenant_id=tenant_id,
            asset=asset,
            target_type="campaign",
            target_id=campaign_id,
            permission="read",
            user_id=user_id,
        )
    return UploadedStoredFile(asset=asset, version=version, blob=blob)
def get_asset_for_user(session: Session, *, tenant_id: str, user_id: str, asset_id: str, require_write: bool = False, is_admin: bool = False) -> FileAsset:
    """Load an asset and verify that the user may access it.

    Access is granted, in order, when the caller is an admin, when the user
    owns the asset directly or through a group from ``user_group_ids``, or
    when an un-revoked share with a sufficient permission targets the user,
    one of those groups, or the whole tenant.

    Args:
        session: SQLAlchemy session.
        tenant_id: Tenant the asset must belong to.
        user_id: User requesting access.
        asset_id: Primary key of the asset.
        require_write: When true, only ``write``/``manage`` shares qualify;
            otherwise ``read`` shares qualify as well.
        is_admin: Skips ownership and share checks.

    Returns:
        The asset.

    Raises:
        FileStorageError: ``"File not found"`` if the asset is missing, in
            another tenant, or soft-deleted; ``"No access to this file"`` if
            no ownership or share grants access.
    """
    asset = session.get(FileAsset, asset_id)
    visible = bool(asset) and asset.tenant_id == tenant_id and asset.deleted_at is None
    if not visible:
        raise FileStorageError("File not found")
    if is_admin:
        return asset

    memberships = user_group_ids(session, tenant_id=tenant_id, user_id=user_id)

    # Owners pass regardless of require_write.
    if asset.owner_type == "user" and asset.owner_user_id == user_id:
        return asset
    if asset.owner_type == "group" and asset.owner_group_id in memberships:
        return asset

    accepted = ["write", "manage"] if require_write else ["read", "write", "manage"]
    shared_with_user = (FileShare.target_type == "user") & (FileShare.target_id == user_id)
    shared_with_group = (FileShare.target_type == "group") & (FileShare.target_id.in_(memberships))
    shared_with_tenant = (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id)

    grant = (
        session.query(FileShare)
        .filter(
            FileShare.tenant_id == tenant_id,
            FileShare.file_asset_id == asset.id,
            FileShare.revoked_at.is_(None),
            FileShare.permission.in_(accepted),
            or_(shared_with_user, shared_with_group, shared_with_tenant),
        )
        .first()
    )
    if not grant:
        raise FileStorageError("No access to this file")
    return asset
def list_assets_for_user(
    session: Session,
    *,
    tenant_id: str,
    user_id: str,
    owner_type: str | None = None,
    owner_id: str | None = None,
    campaign_id: str | None = None,
    path_prefix: str | None = None,
    include_deleted: bool = False,
    is_admin: bool = False,
) -> list[FileAsset]:
    """List a tenant's file assets, optionally filtered.

    Args:
        session: SQLAlchemy session.
        tenant_id: Tenant whose assets are listed.
        user_id: Requesting user; used for the ownership/share filter.
        owner_type: Restrict to assets with this owner type.
        owner_id: With ``owner_type`` ``"user"`` or ``"group"``, restrict to
            this owner.
        campaign_id: Restrict to assets with an un-revoked share targeting
            this campaign.
        path_prefix: Restrict to assets whose display path lies under this
            folder (after ``normalize_folder``).
        include_deleted: Include soft-deleted assets when true.
        is_admin: Skips the ownership/share filter.

    Returns:
        Matching assets ordered by display path ascending, then by
        ``updated_at`` descending.
    """
    query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id)
    if not include_deleted:
        query = query.filter(FileAsset.deleted_at.is_(None))

    if owner_type:
        query = query.filter(FileAsset.owner_type == owner_type)
        if owner_type == "user" and owner_id:
            query = query.filter(FileAsset.owner_user_id == owner_id)
        if owner_type == "group" and owner_id:
            query = query.filter(FileAsset.owner_group_id == owner_id)

    if campaign_id:
        query = query.join(FileShare, FileShare.file_asset_id == FileAsset.id).filter(
            FileShare.tenant_id == tenant_id,
            FileShare.target_type == "campaign",
            FileShare.target_id == campaign_id,
            FileShare.revoked_at.is_(None),
        )
    elif not is_admin and not owner_type:
        # NOTE(review): the ownership/share filter below is applied only when
        # neither campaign_id nor owner_type is given; presumably callers
        # authorize those cases themselves — confirm.
        group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id)
        share_active = FileShare.revoked_at.is_(None)
        query = query.outerjoin(FileShare, FileShare.file_asset_id == FileAsset.id).filter(
            or_(
                (FileAsset.owner_type == "user") & (FileAsset.owner_user_id == user_id),
                (FileAsset.owner_type == "group") & (FileAsset.owner_group_id.in_(group_ids)),
                share_active & (FileShare.target_type == "user") & (FileShare.target_id == user_id),
                share_active & (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)),
                share_active & (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id),
            )
        )

    if path_prefix:
        prefix = normalize_folder(path_prefix)
        if prefix:
            # autoescape makes "%" and "_" in folder names match literally;
            # a raw LIKE pattern would treat them as wildcards, so "a_b"
            # would also list files under "axb/".
            query = query.filter(FileAsset.display_path.startswith(f"{prefix}/", autoescape=True))

    return query.order_by(FileAsset.display_path.asc(), FileAsset.updated_at.desc()).all()
def current_version_and_blob(session: Session, asset: FileAsset) -> tuple[FileVersion, FileBlob]:
    """Resolve an asset's current version and the blob that version points to.

    Raises:
        FileStorageError: If the asset has no current version id, or the
            version row or blob row cannot be loaded.
    """
    if not asset.current_version_id:
        raise FileStorageError("File has no current version")

    current = session.get(FileVersion, asset.current_version_id)
    if not current:
        raise FileStorageError("File version not found")

    backing_blob = session.get(FileBlob, current.blob_id)
    if not backing_blob:
        raise FileStorageError("File blob not found")

    return current, backing_blob
def read_asset_bytes(session: Session, asset: FileAsset) -> tuple[bytes, FileVersion, FileBlob]:
    """Fetch the content of an asset's current version from the storage backend.

    Returns:
        A ``(content, version, blob)`` tuple for the current version.

    Raises:
        FileStorageError: Propagated from ``current_version_and_blob``.
    """
    version, blob = current_version_and_blob(session, asset)
    content = get_storage_backend().get_bytes(blob.storage_key)
    return content, version, blob
def share_file(
    session: Session,
    *,
    tenant_id: str,
    asset: FileAsset,
    target_type: str,
    target_id: str,
    permission: str,
    user_id: str,
) -> FileShare:
    """Grant a permission on an asset to a user, group, campaign, or tenant.

    If an un-revoked share for the same asset and target already exists, its
    permission is replaced; otherwise a new share is added to the session.
    The session is not flushed here.

    Args:
        session: SQLAlchemy session.
        tenant_id: Tenant the share belongs to.
        asset: Asset being shared.
        target_type: ``user``, ``group``, ``campaign``, or ``tenant``
            (case and whitespace are ignored).
        target_id: Identifier of the share target.
        permission: ``read``, ``write``, or ``manage`` (case and whitespace
            are ignored).
        user_id: Recorded as creator of a newly created share.

    Returns:
        The updated or newly created share.

    Raises:
        FileStorageError: For an unsupported target type or permission, or a
            campaign target that is missing or belongs to another tenant.
    """
    kind = target_type.lower().strip()
    level = permission.lower().strip()

    if kind not in ("user", "group", "campaign", "tenant"):
        raise FileStorageError("Unsupported share target")
    if level not in ("read", "write", "manage"):
        raise FileStorageError("Unsupported file permission")

    if kind == "campaign":
        campaign = session.get(Campaign, target_id)
        if not campaign or campaign.tenant_id != tenant_id:
            raise FileStorageError("Campaign not found")

    active_share = (
        session.query(FileShare)
        .filter(
            FileShare.tenant_id == tenant_id,
            FileShare.file_asset_id == asset.id,
            FileShare.target_type == kind,
            FileShare.target_id == target_id,
            FileShare.revoked_at.is_(None),
        )
        .one_or_none()
    )

    if not active_share:
        new_share = FileShare(
            tenant_id=tenant_id,
            file_asset_id=asset.id,
            target_type=kind,
            target_id=target_id,
            permission=level,
            created_by_user_id=user_id,
        )
        session.add(new_share)
        return new_share

    # Reuse the active share and just replace its permission.
    active_share.permission = level
    session.add(active_share)
    return active_share
def soft_delete_assets(session: Session, assets: Iterable[FileAsset]) -> int:
    """Mark every not-yet-deleted asset as deleted.

    All assets changed by one call get the same ``deleted_at`` value, taken
    from a single ``utcnow()`` call.

    Returns:
        The number of assets that were newly marked as deleted.
    """
    deleted = 0
    timestamp = utcnow()
    for asset in assets:
        if asset.deleted_at is not None:
            # Already deleted: keep its original deletion timestamp.
            continue
        asset.deleted_at = timestamp
        session.add(asset)
        deleted += 1
    return deleted
def asset_is_audit_relevant(session: Session, asset: FileAsset) -> bool:
    """Return True when a CampaignAttachmentUse row with stage ``"sent"`` references the asset."""
    sent_use = (
        session.query(CampaignAttachmentUse)
        .filter(
            CampaignAttachmentUse.file_asset_id == asset.id,
            CampaignAttachmentUse.use_stage == "sent",
        )
        .first()
    )
    return sent_use is not None
def _asset_owner_id(asset: FileAsset) -> str:
    """Return the id of the user or group that owns ``asset``.

    Raises:
        FileStorageError: If the owner type is neither ``"user"`` nor
            ``"group"``, or the matching owner id is empty.
    """
    kind = asset.owner_type
    if kind == "user":
        owner = asset.owner_user_id
    elif kind == "group":
        owner = asset.owner_group_id
    else:
        owner = None

    if owner:
        return owner
    raise FileStorageError("File has no valid owner")
def _active_asset_exists(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> bool:
    """Return True when the owner has a non-deleted asset at ``path``.

    ``exclude_asset_id``, when given, is ignored during the lookup.
    """
    match = _active_asset_at_path(
        session,
        tenant_id=tenant_id,
        owner_type=owner_type,
        owner_id=owner_id,
        path=path,
        exclude_asset_id=exclude_asset_id,
    )
    return match is not None
def _active_asset_at_path(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> FileAsset | None:
    """Return the owner's non-deleted asset at ``path``, or None.

    Args:
        session: SQLAlchemy session.
        tenant_id: Tenant to search in.
        owner_type: ``"user"`` or ``"group"``.
        owner_id: Identifier of the owner.
        path: Logical path; normalized before comparison.
        exclude_asset_id: Asset id to leave out of the search, if any.

    Raises:
        FileStorageError: Propagated from ``_asset_query_for_owner`` for an
            unsupported owner type.
    """
    owned = _asset_query_for_owner(
        session,
        tenant_id=tenant_id,
        owner_type=owner_type,
        owner_id=owner_id,
    )
    target = normalize_logical_path(path)
    active = owned.filter(FileAsset.deleted_at.is_(None), FileAsset.display_path == target)
    if exclude_asset_id:
        active = active.filter(FileAsset.id != exclude_asset_id)
    return active.first()
def _soft_delete_conflicting_asset(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> None:
    """Soft-delete the owner's active asset at ``path``, if there is one.

    Does nothing when no active asset (other than ``exclude_asset_id``) is
    found at that path.
    """
    conflicting = _active_asset_at_path(
        session,
        tenant_id=tenant_id,
        owner_type=owner_type,
        owner_id=owner_id,
        path=path,
        exclude_asset_id=exclude_asset_id,
    )
    if conflicting is None:
        return
    conflicting.deleted_at = utcnow()
    session.add(conflicting)
def _split_logical_path(path: str) -> tuple[str, str, str, str]:
    """Split a logical path into ``(folder, filename, stem, suffixes)``.

    ``folder`` is an empty string when the normalized path has no parent
    directory. ``suffixes`` is every dot-suffix of the filename joined
    together (``archive.tar.gz`` gives ``.tar.gz``), and ``stem`` is the
    filename with that tail removed.
    """
    parsed = PurePosixPath(normalize_logical_path(path))

    parent = str(parsed.parent)
    folder = parent if parent != "." else ""

    filename = parsed.name
    suffixes = "".join(PurePosixPath(filename).suffixes)
    # The joined suffixes are always the tail of the filename, so dropping
    # that many trailing characters leaves the stem (all of it when empty).
    stem = filename[: len(filename) - len(suffixes)]
    return folder, filename, stem, suffixes
def _candidate_renamed_path(path: str, counter: int) -> str:
    """Build the ``counter``-th alternative name for ``path``.

    The stem gets ``" copy"`` appended for counter 1 and ``" copy <counter>"``
    for any other value; folder and suffixes are kept.
    """
    folder, _, stem, suffixes = _split_logical_path(path)
    if counter == 1:
        marker = " copy"
    else:
        marker = f" copy {counter}"
    renamed = f"{stem}{marker}{suffixes}"
    if folder:
        renamed = f"{folder}/{renamed}"
    return normalize_logical_path(renamed)
def _next_available_logical_path(
    session: Session,
    *,
    tenant_id: str,
    owner_type: str,
    owner_id: str,
    desired_path: str,
    reserved_paths: set[str] | None = None,
    exclude_asset_id: str | None = None,
) -> str:
    """Return ``desired_path`` or the first free ``" copy"`` variant of it.

    A path is free when it is not in ``reserved_paths`` and the owner has no
    active asset there (ignoring ``exclude_asset_id``). Variants are tried in
    order: ``" copy"``, ``" copy 2"``, ``" copy 3"``, and so on.

    Args:
        session: SQLAlchemy session.
        tenant_id: Tenant to search in.
        owner_type: ``"user"`` or ``"group"``.
        owner_id: Identifier of the owner.
        desired_path: Preferred logical path; normalized before use.
        reserved_paths: Paths treated as taken without querying the database.
        exclude_asset_id: Asset id that does not count as a conflict.

    Returns:
        The normalized free path.
    """
    taken = reserved_paths or set()

    def _is_free(path: str) -> bool:
        # Reserved paths are rejected without a database lookup.
        if path in taken:
            return False
        return not _active_asset_exists(
            session,
            tenant_id=tenant_id,
            owner_type=owner_type,
            owner_id=owner_id,
            path=path,
            exclude_asset_id=exclude_asset_id,
        )

    base = normalize_logical_path(desired_path)
    candidate = base
    attempt = 0
    while not _is_free(candidate):
        attempt += 1
        candidate = _candidate_renamed_path(base, attempt)
    return candidate
def _resolution_by_path(conflict_resolutions: Iterable[FileConflictResolution] | None) -> dict[str, FileConflictResolution]:
    """Index conflict resolutions by their normalized target path.

    Each entry is rebuilt with a normalized ``target_path`` and a lower-cased,
    stripped ``action``; ``new_path`` is copied as given. A later entry for
    the same path replaces an earlier one.

    Raises:
        FileStorageError: If an action is not ``overwrite``, ``rename``, or
            ``skip``.
    """
    allowed_actions = ("overwrite", "rename", "skip")
    by_path: dict[str, FileConflictResolution] = {}
    for entry in conflict_resolutions or ():
        key = normalize_logical_path(entry.target_path)
        chosen = entry.action.lower().strip()
        if chosen not in allowed_actions:
            raise FileStorageError("Unsupported conflict resolution")
        by_path[key] = FileConflictResolution(target_path=key, action=chosen, new_path=entry.new_path)
    return by_path
def _normalize_conflict_strategy(strategy: str | None) -> str:
    """Return the lower-cased, stripped strategy, defaulting to ``"reject"``.

    Raises:
        FileStorageError: If the result is not ``reject``, ``overwrite``, or
            ``rename``.
    """
    chosen = strategy if strategy else "reject"
    chosen = chosen.lower().strip()
    if chosen in ("reject", "overwrite", "rename"):
        return chosen
    raise FileStorageError("Unsupported conflict strategy")
def _copy_asset_to_path(
    session: Session,
    asset: FileAsset,
    *,
    tenant_id: str,
    target_owner_type: str,
    target_owner_id: str,
    target_path: str,
    user_id: str,
) -> FileAsset:
    """Create a new asset at ``target_path`` that shares the source's blob.

    No bytes are copied: the source's current blob gets one more reference,
    and the new asset starts with a version 1 pointing at that blob.
    Description and metadata are copied from the source asset.

    Args:
        session: SQLAlchemy session; rows are added and flushed.
        asset: Source asset.
        tenant_id: Tenant of the copy.
        target_owner_type: ``"user"`` or ``"group"`` owner of the copy.
        target_owner_id: Identifier of that owner.
        target_path: Logical path of the copy; normalized before use.
        user_id: Recorded as creator of the copy and its version.

    Returns:
        The newly created asset.

    Raises:
        FileStorageError: Propagated from ``current_version_and_blob``.
    """
    source_version, shared_blob = current_version_and_blob(session, asset)
    shared_blob.ref_count += 1
    session.add(shared_blob)

    destination = normalize_logical_path(target_path)
    is_user_owned = target_owner_type == "user"
    is_group_owned = target_owner_type == "group"

    duplicate = FileAsset(
        tenant_id=tenant_id,
        owner_type=target_owner_type,
        owner_user_id=target_owner_id if is_user_owned else None,
        owner_group_id=target_owner_id if is_group_owned else None,
        display_path=destination,
        filename=filename_from_path(destination),
        description=asset.description,
        created_by_user_id=user_id,
        # Copy the dict so the two assets do not share one metadata object.
        metadata_=dict(asset.metadata_ or {}),
    )
    session.add(duplicate)
    # Flush before building the version, which reads duplicate.id.
    session.flush()

    first_version = FileVersion(
        tenant_id=tenant_id,
        file_asset_id=duplicate.id,
        blob_id=shared_blob.id,
        version_number=1,
        filename_at_upload=source_version.filename_at_upload,
        display_path_at_upload=destination,
        content_type=source_version.content_type,
        size_bytes=shared_blob.size_bytes,
        checksum_sha256=shared_blob.checksum_sha256,
        created_by_user_id=user_id,
    )
    session.add(first_version)
    # Flush before linking the asset to the version, which reads its id.
    session.flush()

    duplicate.current_version_id = first_version.id
    session.add(duplicate)
    return duplicate
def rename_asset(asset: FileAsset, *, new_path: str) -> None:
    """Set the asset's display path and filename from ``new_path``.

    The path is normalized first and the filename is derived from the
    normalized path. This function does not check whether the path is taken.
    """
    canonical_path = normalize_logical_path(new_path)
    asset.display_path = canonical_path
    derived_name = filename_from_path(canonical_path)
    asset.filename = derived_name