From 36e9211ee63757dbff0314441847f3b4a2226745 Mon Sep 17 00:00:00 2001 From: Albrecht Degering Date: Sat, 13 Jun 2026 04:07:46 +0200 Subject: [PATCH] Refactoring of services.py; tests --- server/app/api/v1/file_schemas.py | 186 ++++++ server/app/api/v1/files.py | 288 ++++------ server/app/storage/access.py | 44 ++ server/app/storage/archives.py | 75 +++ server/app/storage/campaign_usage.py | 127 ++++ server/app/storage/common.py | 42 ++ server/app/storage/files.py | 484 ++++++++++++++++ server/app/storage/folders.py | 173 ++++++ server/app/storage/search.py | 77 +++ server/app/storage/services.py | 828 +++------------------------ server/app/storage/transfers.py | 447 +++++++++++++++ server/multimailer-dev.db | Bin 1011712 -> 1032192 bytes server/requirements-dev.txt | 4 + server/tests/__init__.py | 1 + server/tests/test_api_smoke.py | 257 +++++++++ 15 files changed, 2113 insertions(+), 920 deletions(-) create mode 100644 server/app/api/v1/file_schemas.py create mode 100644 server/app/storage/access.py create mode 100644 server/app/storage/archives.py create mode 100644 server/app/storage/campaign_usage.py create mode 100644 server/app/storage/common.py create mode 100644 server/app/storage/files.py create mode 100644 server/app/storage/folders.py create mode 100644 server/app/storage/search.py create mode 100644 server/app/storage/transfers.py create mode 100644 server/requirements-dev.txt create mode 100644 server/tests/__init__.py create mode 100644 server/tests/test_api_smoke.py diff --git a/server/app/api/v1/file_schemas.py b/server/app/api/v1/file_schemas.py new file mode 100644 index 0000000..ea592c3 --- /dev/null +++ b/server/app/api/v1/file_schemas.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +from typing import Any, Literal + +from pydantic import BaseModel, Field + +from app.storage.common import FileConflictResolution + + +class FileSpaceResponse(BaseModel): + id: str + label: str + owner_type: Literal["user", "group"] + owner_id: str + description: str | None = None + + +class FileSpacesResponse(BaseModel): + spaces: list[FileSpaceResponse] + + +class FileShareResponse(BaseModel): + id: str + target_type: str + target_id: str + permission: str + created_at: str + revoked_at: str | None = None + + +class FileAssetResponse(BaseModel): + id: str + tenant_id: str + owner_type: str + owner_id: str + display_path: str + filename: str + description: str | None = None + size_bytes: int + content_type: str | None = None + checksum_sha256: str + version_id: str + created_at: str + updated_at: str + deleted_at: str | None = None + audit_relevant: bool = False + metadata: dict[str, Any] | None = None + shares: list[FileShareResponse] = Field(default_factory=list) + + +class FileFolderResponse(BaseModel): + id: str + tenant_id: str + owner_type: str + owner_id: str + path: str + created_at: str + updated_at: str + deleted_at: str | None = None + + +class FileFoldersResponse(BaseModel): + folders: list[FileFolderResponse] + + +class FileFolderCreateRequest(BaseModel): + owner_type: Literal["user", "group"] + owner_id: str + path: str + + +class FileFolderDeleteRequest(BaseModel): + owner_type: Literal["user", "group"] + owner_id: str + path: str + recursive: bool = True + + +class FileFolderDeleteResponse(BaseModel): + deleted_folders: int + deleted_files: int + + +class FileListResponse(BaseModel): + files: list[FileAssetResponse] + + +class FileUploadResponse(BaseModel): + files: list[FileAssetResponse] + + +class BulkDeleteRequest(BaseModel): + file_ids: list[str] + + +class BulkDeleteResponse(BaseModel): + deleted_count: int + + +class ConflictResolutionRequest(BaseModel): + target_path: str + action: Literal["overwrite", "rename", "skip"] + new_path: str | None = None + + +def _conflict_resolutions(items: list[ConflictResolutionRequest] | None) -> list[FileConflictResolution]: + return [FileConflictResolution(target_path=item.target_path, action=item.action, new_path=item.new_path) for item in items or []] + + +class FileShareRequest(BaseModel): + target_type: Literal["user", "group", "campaign", "tenant"] + target_id: str + permission: Literal["read", "write", "manage"] = "read" + + +class RenameRequest(BaseModel): + file_ids: list[str] = Field(default_factory=list) + folder_paths: list[str] = Field(default_factory=list) + owner_type: Literal["user", "group"] | None = None + owner_id: str | None = None + mode: Literal["direct", "prefix", "suffix", "replace"] + new_name: str | None = None + find: str | None = None + replacement: str = "" + prefix: str = "" + suffix: str = "" + recursive: bool = False + dry_run: bool = True + + +class RenamePreviewItem(BaseModel): + kind: Literal["file", "folder"] + id: str + file_id: str | None = None + folder_path: str | None = None + old_path: str + new_path: str + + +class RenameResponse(BaseModel): + dry_run: bool + items: list[RenamePreviewItem] + + +class TransferRequest(BaseModel): + operation: Literal["move", "copy"] + file_ids: list[str] = Field(default_factory=list) + folder_paths: list[str] = Field(default_factory=list) + source_owner_type: Literal["user", "group"] + source_owner_id: str + target_owner_type: Literal["user", "group"] + target_owner_id: str + target_folder: str = "" + conflict_strategy: Literal["reject", "overwrite", "rename"] = "reject" + conflict_resolutions: list[ConflictResolutionRequest] = Field(default_factory=list) + + +class TransferResponse(BaseModel): + operation: str + files: int + folders: int + + +class ArchiveRequest(BaseModel): + file_ids: list[str] + filename: str = "files.zip" + + +class PatternResolveRequest(BaseModel): + patterns: list[str] + owner_type: Literal["user", "group"] | None = None + owner_id: str | None = None + campaign_id: str | None = None + path_prefix: str | None = None + include_unmatched: bool = True + case_sensitive: bool = False + + +class PatternMatchResponse(BaseModel): + pattern: str + matches: list[FileAssetResponse] + + +class PatternResolveResponse(BaseModel): + patterns: list[PatternMatchResponse] + unmatched: list[FileAssetResponse] = Field(default_factory=list) diff --git a/server/app/api/v1/files.py b/server/app/api/v1/files.py index b70b0bb..8d6460d 100644 --- a/server/app/api/v1/files.py +++ b/server/app/api/v1/files.py @@ -1,180 +1,63 @@ from __future__ import annotations +import json from io import BytesIO -from typing import Any, Literal - +from typing import Literal from fastapi import APIRouter, Depends, File as FastAPIFile, Form, HTTPException, UploadFile, status from fastapi.responses import StreamingResponse -from pydantic import BaseModel, Field from sqlalchemy.orm import Session from app.auth.dependencies import ApiPrincipal, require_scope +from app.api.v1.file_schemas import ( + ArchiveRequest, + BulkDeleteRequest, + BulkDeleteResponse, + ConflictResolutionRequest, + FileAssetResponse, + FileFolderCreateRequest, + FileFolderDeleteRequest, + FileFolderDeleteResponse, + FileFolderResponse, + FileFoldersResponse, + FileListResponse, + FileShareRequest, + FileShareResponse, + FileSpaceResponse, + FileSpacesResponse, + FileUploadResponse, + PatternMatchResponse, + PatternResolveRequest, + PatternResolveResponse, + RenamePreviewItem, + RenameRequest, + RenameResponse, + TransferRequest, + TransferResponse, + _conflict_resolutions, +) from app.db.models import Campaign, FileAsset, FileFolder, FileShare, Group from app.db.session import get_session from app.storage.paths import UnsafeFilePathError, filename_from_path, normalize_folder, normalize_logical_path -from app.storage.services import ( - FileStorageError, +from app.storage.access import ensure_group_access, user_group_ids +from app.storage.archives import create_zip_bytes, extract_zip_upload +from app.storage.common import FileStorageError +from app.storage.files import ( asset_is_audit_relevant, - build_rename_preview, create_file_asset, - create_folder, - create_zip_bytes, current_version_and_blob, - extract_zip_upload, get_asset_for_user, list_assets_for_user, - list_folders_for_user, - rename_asset, - resolve_patterns, + read_asset_bytes, share_file, soft_delete_assets, - soft_delete_folder, - user_group_ids, - read_asset_bytes, ) +from app.storage.folders import create_folder, list_folders_for_user, soft_delete_folder +from app.storage.search import resolve_patterns +from app.storage.transfers import rename_selection, transfer_selection router = APIRouter(prefix="/files", tags=["files"]) -class FileSpaceResponse(BaseModel): - id: str - label: str - owner_type: Literal["user", "group"] - owner_id: str - description: str | None = None - - -class FileSpacesResponse(BaseModel): - spaces: list[FileSpaceResponse] - - -class FileShareResponse(BaseModel): - id: str - target_type: str - target_id: str - permission: str - created_at: str - revoked_at: str | None = None - - -class FileAssetResponse(BaseModel): - id: str - tenant_id: str - owner_type: str - owner_id: str - display_path: str - filename: str - description: str | None = None - size_bytes: int - content_type: str | None = None - checksum_sha256: str - version_id: str - created_at: str - updated_at: str - deleted_at: str | None = None - audit_relevant: bool = False - metadata: dict[str, Any] | None = None - shares: list[FileShareResponse] = Field(default_factory=list) - - -class FileFolderResponse(BaseModel): - id: str - tenant_id: str - owner_type: str - owner_id: str - path: str - created_at: str - updated_at: str - deleted_at: str | None = None - - -class FileFoldersResponse(BaseModel): - folders: list[FileFolderResponse] - - -class FileFolderCreateRequest(BaseModel): - owner_type: Literal["user", "group"] - owner_id: str - path: str - - -class FileFolderDeleteRequest(BaseModel): - owner_type: Literal["user", "group"] - owner_id: str - path: str - recursive: bool = True - - -class FileFolderDeleteResponse(BaseModel): - deleted_folders: int - deleted_files: int - - -class FileListResponse(BaseModel): - files: list[FileAssetResponse] - - -class FileUploadResponse(BaseModel): - files: list[FileAssetResponse] - - -class BulkDeleteRequest(BaseModel): - file_ids: list[str] - - -class BulkDeleteResponse(BaseModel): - deleted_count: int - - -class FileShareRequest(BaseModel): - target_type: Literal["user", "group", "campaign", "tenant"] - target_id: str - permission: Literal["read", "write", "manage"] = "read" - - -class RenameRequest(BaseModel): - file_ids: list[str] - mode: Literal["prefix", "suffix", "replace"] - find: str | None = None - replacement: str = "" - prefix: str = "" - suffix: str = "" - dry_run: bool = True - - -class RenamePreviewItem(BaseModel): - file_id: str - old_path: str - new_path: str - - -class RenameResponse(BaseModel): - dry_run: bool - items: list[RenamePreviewItem] - - -class ArchiveRequest(BaseModel): - file_ids: list[str] - filename: str = "files.zip" - - -class PatternResolveRequest(BaseModel): - patterns: list[str] - owner_type: Literal["user", "group"] | None = None - owner_id: str | None = None - campaign_id: str | None = None - path_prefix: str | None = None - include_unmatched: bool = True - - -class PatternMatchResponse(BaseModel): - pattern: str - matches: list[FileAssetResponse] - - -class PatternResolveResponse(BaseModel): - patterns: list[PatternMatchResponse] - unmatched: list[FileAssetResponse] = Field(default_factory=list) def _is_admin(principal: ApiPrincipal) -> bool: @@ -251,8 +134,6 @@ def _ensure_list_owner_access(session: Session, principal: ApiPrincipal, owner_t raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No access to this user file space") if owner_type == "group" and owner_id: try: - from app.storage.services import ensure_group_access - ensure_group_access( session, tenant_id=principal.tenant_id, @@ -393,12 +274,16 @@ async def upload_files( path: str = Form(default=""), campaign_id: str | None = Form(default=None), unpack_zip: bool = Form(default=False), + conflict_strategy: Literal["reject", "overwrite", "rename"] = Form(default="reject"), + conflict_resolutions_json: str | None = Form(default=None), session: Session = Depends(get_session), principal: ApiPrincipal = Depends(require_scope("attachments:write")), ): target_owner = owner_id or principal.user.id uploaded_assets: list[FileAsset] = [] try: + raw_resolutions = json.loads(conflict_resolutions_json) if conflict_resolutions_json else [] + upload_resolutions = _conflict_resolutions([ConflictResolutionRequest(**item) for item in raw_resolutions]) for upload in files: data = await upload.read() filename = upload.filename or "file" @@ -413,6 +298,8 @@ async def upload_files( zip_data=data, folder=path, campaign_id=campaign_id, + conflict_strategy=conflict_strategy, + conflict_resolutions=upload_resolutions, is_admin=_is_admin(principal), ) uploaded_assets.extend(item.asset for item in extracted) @@ -428,6 +315,8 @@ async def upload_files( folder=path, content_type=content_type, campaign_id=campaign_id, + conflict_strategy=conflict_strategy, + conflict_resolutions=upload_resolutions, is_admin=_is_admin(principal), ) uploaded_assets.append(stored.asset) @@ -445,12 +334,16 @@ async def upload_zip( owner_id: str | None = Form(default=None), path: str = Form(default=""), campaign_id: str | None = Form(default=None), + conflict_strategy: Literal["reject", "overwrite", "rename"] = Form(default="reject"), + conflict_resolutions_json: str | None = Form(default=None), session: Session = Depends(get_session), principal: ApiPrincipal = Depends(require_scope("attachments:write")), ): data = await file.read() target_owner = owner_id or principal.user.id try: + raw_resolutions = json.loads(conflict_resolutions_json) if conflict_resolutions_json else [] + upload_resolutions = _conflict_resolutions([ConflictResolutionRequest(**item) for item in raw_resolutions]) extracted = extract_zip_upload( session, tenant_id=principal.tenant_id, @@ -460,6 +353,8 @@ async def upload_zip( zip_data=data, folder=path, campaign_id=campaign_id, + conflict_strategy=conflict_strategy, + conflict_resolutions=upload_resolutions, is_admin=_is_admin(principal), ) session.commit() @@ -571,25 +466,70 @@ def bulk_rename( principal: ApiPrincipal = Depends(require_scope("attachments:write")), ): try: - assets = [ - get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal)) - for file_id in payload.file_ids - ] - previews = [ - RenamePreviewItem( - file_id=asset.id, - old_path=asset.display_path, - new_path=normalize_logical_path(build_rename_preview(asset, mode=payload.mode, find=payload.find, replacement=payload.replacement, prefix=payload.prefix, suffix=payload.suffix)), - ) - for asset in assets - ] + plan = rename_selection( + session, + tenant_id=principal.tenant_id, + user_id=principal.user.id, + file_ids=payload.file_ids, + folder_paths=payload.folder_paths, + owner_type=payload.owner_type, + owner_id=payload.owner_id, + mode=payload.mode, + new_name=payload.new_name, + find=payload.find, + replacement=payload.replacement, + prefix=payload.prefix, + suffix=payload.suffix, + recursive=payload.recursive, + dry_run=payload.dry_run, + is_admin=_is_admin(principal), + ) if not payload.dry_run: - by_id = {asset.id: asset for asset in assets} - for item in previews: - rename_asset(by_id[item.file_id], new_path=item.new_path) - session.add(by_id[item.file_id]) session.commit() - return RenameResponse(dry_run=payload.dry_run, items=previews) + return RenameResponse( + dry_run=payload.dry_run, + items=[ + RenamePreviewItem( + kind=item.kind, + id=item.id, + file_id=item.id if item.kind == "file" else None, + folder_path=item.old_path if item.kind == "folder" else None, + old_path=item.old_path, + new_path=item.new_path, + ) + for item in plan + ], + ) + except (FileStorageError, UnsafeFilePathError, ValueError) as exc: + session.rollback() + raise _http_error(exc) from exc + + +@router.post("/transfer", response_model=TransferResponse) +def transfer_files( + payload: TransferRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + try: + files, folders = transfer_selection( + session, + tenant_id=principal.tenant_id, + user_id=principal.user.id, + operation=payload.operation, + file_ids=payload.file_ids, + folder_paths=payload.folder_paths, + source_owner_type=payload.source_owner_type, + source_owner_id=payload.source_owner_id, + target_owner_type=payload.target_owner_type, + target_owner_id=payload.target_owner_id, + target_folder=payload.target_folder, + conflict_strategy=payload.conflict_strategy, + conflict_resolutions=_conflict_resolutions(payload.conflict_resolutions), + is_admin=_is_admin(principal), + ) + session.commit() + return TransferResponse(operation=payload.operation, files=files, folders=folders) except (FileStorageError, UnsafeFilePathError, ValueError) as exc: session.rollback() raise _http_error(exc) from exc @@ -632,7 +572,7 @@ def resolve_file_patterns( path_prefix=payload.path_prefix, is_admin=_is_admin(principal), ) - resolved, unmatched = resolve_patterns(assets, payload.patterns, base_path=payload.path_prefix) + resolved, unmatched = resolve_patterns(assets, payload.patterns, base_path=payload.path_prefix, case_sensitive=payload.case_sensitive) return PatternResolveResponse( patterns=[PatternMatchResponse(pattern=item.pattern, matches=[_asset_response(session, asset) for asset in item.matches]) for item in resolved], unmatched=[_asset_response(session, asset) for asset in unmatched] if payload.include_unmatched else [], diff --git a/server/app/storage/access.py b/server/app/storage/access.py new file mode 100644 index 0000000..c0e8f84 --- /dev/null +++ b/server/app/storage/access.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from sqlalchemy.orm import Session + +from app.db.models import Group, UserGroupMembership +from app.storage.common import FileStorageError + + +def user_group_ids(session: Session, *, tenant_id: str, user_id: str, include_admin_groups: bool = False) -> list[str]: + if include_admin_groups: + return [row.id for row in session.query(Group).filter(Group.tenant_id == tenant_id).order_by(Group.name.asc()).all()] + return [ + row.group_id + for row in session.query(UserGroupMembership) + .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id) + .all() + ] + + +def ensure_group_access(session: Session, *, tenant_id: str, group_id: str, user_id: str, is_admin: bool = False) -> None: + group = session.get(Group, group_id) + if not group or group.tenant_id != tenant_id: + raise FileStorageError("Group not found") + if is_admin: + return + membership = ( + session.query(UserGroupMembership) + .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id, UserGroupMembership.group_id == group_id) + .one_or_none() + ) + if membership is None: + raise FileStorageError("No access to this group file space") + + +def ensure_owner_access(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, user_id: str, is_admin: bool = False) -> None: + owner_type = owner_type.lower().strip() + if owner_type == "user": + if owner_id != user_id and not is_admin: + raise FileStorageError("No access to this user file space") + return + if owner_type == "group": + ensure_group_access(session, tenant_id=tenant_id, group_id=owner_id, user_id=user_id, is_admin=is_admin) + return + raise FileStorageError("Files must be owned by a user or group") diff --git a/server/app/storage/archives.py b/server/app/storage/archives.py new file mode 100644 index 0000000..aaf97a0 --- /dev/null +++ b/server/app/storage/archives.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import mimetypes +import zipfile +from io import BytesIO +from typing import Iterable + +from sqlalchemy.orm import Session + +from app.db.models import FileAsset +from app.storage.common import FileStorageError, UploadedStoredFile +from app.storage.files import create_file_asset, read_asset_bytes +from app.storage.paths import filename_from_path, normalize_folder, normalize_logical_path + + +def create_zip_bytes(session: Session, assets: Iterable[FileAsset]) -> bytes: + buffer = BytesIO() + with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: + for asset in assets: + data, _, _ = read_asset_bytes(session, asset) + archive.writestr(asset.display_path, data) + buffer.seek(0) + return buffer.getvalue() + + +def extract_zip_upload( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + zip_data: bytes, + folder: str | None, + campaign_id: str | None, + conflict_strategy: str = "reject", + conflict_resolutions: Iterable[FileConflictResolution] | None = None, + is_admin: bool = False, + max_files: int = 1000, + max_total_bytes: int = 250 * 1024 * 1024, +) -> list[UploadedStoredFile]: + uploaded: list[UploadedStoredFile] = [] + total = 0 + base_folder = normalize_folder(folder) + with zipfile.ZipFile(BytesIO(zip_data)) as archive: + infos = [info for info in archive.infolist() if not info.is_dir()] + if len(infos) > max_files: + raise FileStorageError(f"ZIP contains too many files (limit {max_files})") + for info in infos: + if info.file_size < 0: + raise FileStorageError("Invalid ZIP member") + total += info.file_size + if total > max_total_bytes: + raise FileStorageError("ZIP is too large after extraction") + inner_path = normalize_logical_path(info.filename) + target_path = f"{base_folder}/{inner_path}" if base_folder else inner_path + data = archive.read(info) + uploaded.append( + create_file_asset( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + user_id=user_id, + filename=filename_from_path(inner_path), + data=data, + display_path=target_path, + content_type=mimetypes.guess_type(inner_path)[0] or "application/octet-stream", + campaign_id=campaign_id, + conflict_strategy=conflict_strategy, + conflict_resolutions=conflict_resolutions, + is_admin=is_admin, + ) + ) + return uploaded diff --git a/server/app/storage/campaign_usage.py b/server/app/storage/campaign_usage.py new file mode 100644 index 0000000..825007a --- /dev/null +++ b/server/app/storage/campaign_usage.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +from pathlib import PurePosixPath + +from sqlalchemy.orm import Session + +from app.db.models import CampaignAttachmentUse, CampaignJob, FileAsset +from app.storage.common import utcnow +from app.storage.files import current_version_and_blob, list_assets_for_user + + +def _candidate_match_keys(raw_match: str) -> set[str]: + cleaned = raw_match.replace("\\", "/").strip().strip("/") + result = {cleaned} + if cleaned: + result.add(PurePosixPath(cleaned).name) + return {item for item in result if item} + + +def record_campaign_attachment_uses_for_job(session: Session, job: CampaignJob, *, stage: str = "built") -> None: + """Create best-effort immutable file-use records for matched managed files. + + Existing attachment resolution is still filesystem/path based. This bridge + records uses when a resolved attachment match can be tied to a managed file + by logical path or filename among files shared with the campaign. + """ + + attachments = job.resolved_attachments or [] + if not isinstance(attachments, list): + return + assets = list_assets_for_user( + session, + tenant_id=job.tenant_id, + user_id="", + campaign_id=job.campaign_id, + is_admin=True, + ) + by_key: dict[str, FileAsset] = {} + for asset in assets: + by_key[asset.display_path.strip("/")] = asset + by_key[asset.filename] = asset + for attachment in attachments: + if not isinstance(attachment, dict): + continue + matches = attachment.get("matches") if isinstance(attachment.get("matches"), list) else [] + for raw in matches: + if not isinstance(raw, str): + continue + asset = next((by_key[key] for key in _candidate_match_keys(raw) if key in by_key), None) + if not asset: + continue + version, blob = current_version_and_blob(session, asset) + exists = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.file_version_id == version.id, + CampaignAttachmentUse.filename_used == asset.filename, + CampaignAttachmentUse.use_stage == stage, + ) + .one_or_none() + ) + if exists: + continue + session.add( + CampaignAttachmentUse( + tenant_id=job.tenant_id, + campaign_id=job.campaign_id, + campaign_version_id=job.campaign_version_id, + campaign_job_id=job.id, + entry_index=job.entry_index, + entry_id=job.entry_id, + file_asset_id=asset.id, + file_version_id=version.id, + file_blob_id=blob.id, + filename_used=asset.filename, + checksum_sha256=blob.checksum_sha256, + size_bytes=blob.size_bytes, + content_type=blob.content_type, + use_stage=stage, + ) + ) + + +def mark_job_attachment_uses_sent(session: Session, job: CampaignJob) -> None: + record_campaign_attachment_uses_for_job(session, job, stage="built") + now = utcnow() + uses = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.tenant_id == job.tenant_id, + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.use_stage == "built", + ) + .all() + ) + for use in uses: + sent = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.file_version_id == use.file_version_id, + CampaignAttachmentUse.use_stage == "sent", + ) + .one_or_none() + ) + if sent: + continue + session.add( + CampaignAttachmentUse( + tenant_id=use.tenant_id, + campaign_id=use.campaign_id, + campaign_version_id=use.campaign_version_id, + campaign_job_id=use.campaign_job_id, + entry_index=use.entry_index, + entry_id=use.entry_id, + file_asset_id=use.file_asset_id, + file_version_id=use.file_version_id, + file_blob_id=use.file_blob_id, + filename_used=use.filename_used, + checksum_sha256=use.checksum_sha256, + size_bytes=use.size_bytes, + content_type=use.content_type, + use_stage="sent", + used_at=now, + ) + ) diff --git a/server/app/storage/common.py b/server/app/storage/common.py new file mode 100644 index 0000000..271ba50 --- /dev/null +++ b/server/app/storage/common.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone + +from app.db.models import FileAsset, FileBlob, FileVersion + + +class FileStorageError(RuntimeError): + pass + + +@dataclass(slots=True) +class UploadedStoredFile: + asset: FileAsset + version: FileVersion + blob: FileBlob + + +@dataclass(slots=True) +class ResolvedPattern: + pattern: str + matches: list[FileAsset] + + +@dataclass(slots=True) +class FileConflictResolution: + target_path: str + action: str + new_path: str | None = None + + +@dataclass(slots=True) +class RenamePlanItem: + kind: str + id: str + old_path: str + new_path: str + + +def utcnow() -> datetime: + return datetime.now(timezone.utc) diff --git a/server/app/storage/files.py b/server/app/storage/files.py new file mode 100644 index 0000000..b2a55e5 --- /dev/null +++ b/server/app/storage/files.py @@ -0,0 +1,484 @@ +from __future__ import annotations + +import hashlib +import mimetypes +from pathlib import PurePosixPath +from typing import Any, Iterable +from uuid import uuid4 + +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from app.db.models import Campaign, CampaignAttachmentUse, FileAsset, FileBlob, FileShare, FileVersion +from app.settings import settings +from app.storage.access import ensure_owner_access, user_group_ids +from app.storage.backends import get_storage_backend +from app.storage.common import FileConflictResolution, FileStorageError, UploadedStoredFile, utcnow +from app.storage.paths import filename_from_path, join_folder_filename, normalize_folder, normalize_logical_path, safe_storage_component + + +def _asset_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str): + query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id, FileAsset.owner_type == owner_type) + if owner_type == "user": + return query.filter(FileAsset.owner_user_id == owner_id) + if owner_type == "group": + return query.filter(FileAsset.owner_group_id == owner_id) + raise FileStorageError("Unsupported owner type") + + +def _storage_bucket_name() -> str: + return settings.file_storage_s3_bucket or settings.s3_bucket + + +def _storage_backend_name() -> str: + return settings.file_storage_backend.lower().strip() + + +def _storage_key(*, tenant_id: str, checksum: str, filename: str) -> str: + return f"tenants/{tenant_id}/files/{checksum[:2]}/{uuid4().hex}-{safe_storage_component(filename)}" + + +def _get_or_create_blob( + session: Session, + *, + tenant_id: str, + data: bytes, + filename: str, + content_type: str | None, +) -> FileBlob: + checksum = hashlib.sha256(data).hexdigest() + size = len(data) + blob = ( + session.query(FileBlob) + .filter(FileBlob.tenant_id == tenant_id, FileBlob.checksum_sha256 == checksum, FileBlob.size_bytes == size) + .one_or_none() + ) + if blob: + blob.ref_count += 1 + session.add(blob) + return blob + + storage_key = _storage_key(tenant_id=tenant_id, checksum=checksum, filename=filename) + backend = get_storage_backend() + backend.put_bytes(storage_key, data, content_type=content_type) + blob = FileBlob( + tenant_id=tenant_id, + storage_backend=_storage_backend_name(), + storage_bucket=_storage_bucket_name(), + storage_key=storage_key, + checksum_sha256=checksum, + size_bytes=size, + content_type=content_type, + ref_count=1, + ) + session.add(blob) + session.flush() + return blob + + +def create_file_asset( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + filename: str, + data: bytes, + folder: str | None = None, + display_path: str | None = None, + content_type: str | None = None, + description: str | None = None, + metadata: dict[str, Any] | None = None, + campaign_id: str | None = None, + conflict_strategy: str = "reject", + conflict_resolutions: Iterable[FileConflictResolution] | None = None, + is_admin: bool = False, +) -> UploadedStoredFile: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + + safe_filename = filename_from_path(normalize_logical_path(filename, fallback_filename="file")) + logical_path = normalize_logical_path(display_path) if display_path else join_folder_filename(folder, safe_filename) + if not content_type: + content_type = mimetypes.guess_type(safe_filename)[0] or "application/octet-stream" + + conflict_strategy = _normalize_conflict_strategy(conflict_strategy) + resolutions = _resolution_by_path(conflict_resolutions) + resolution = resolutions.get(logical_path) + action = resolution.action if resolution else conflict_strategy + if resolution and resolution.new_path and action == "rename": + logical_path = normalize_logical_path(resolution.new_path) + elif _active_asset_exists(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, path=logical_path): + if action == "reject": + raise FileStorageError(f"Target file already exists: {logical_path}") + if action == "overwrite": + _soft_delete_conflicting_asset(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, path=logical_path) + elif action == "rename": + logical_path = _next_available_logical_path(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, desired_path=logical_path) + elif action == "skip": + raise FileStorageError(f"Skipped upload target: {logical_path}") + elif action == "rename": + logical_path = _next_available_logical_path(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, desired_path=logical_path) + + blob = _get_or_create_blob(session, tenant_id=tenant_id, data=data, filename=safe_filename, content_type=content_type) + asset = FileAsset( + tenant_id=tenant_id, + owner_type=owner_type, + owner_user_id=owner_id if owner_type == "user" else None, + owner_group_id=owner_id if owner_type == "group" else None, + display_path=logical_path, + filename=filename_from_path(logical_path), + description=description, + created_by_user_id=user_id, + metadata_=metadata or {}, + ) + session.add(asset) + session.flush() + version = FileVersion( + tenant_id=tenant_id, + file_asset_id=asset.id, + blob_id=blob.id, + version_number=1, + filename_at_upload=safe_filename, + display_path_at_upload=logical_path, + content_type=content_type, + size_bytes=blob.size_bytes, + checksum_sha256=blob.checksum_sha256, + created_by_user_id=user_id, + ) + session.add(version) + session.flush() + asset.current_version_id = version.id + session.add(asset) + if campaign_id: + share_file(session, tenant_id=tenant_id, asset=asset, target_type="campaign", target_id=campaign_id, permission="read", user_id=user_id) + return UploadedStoredFile(asset=asset, version=version, blob=blob) + + +def get_asset_for_user(session: Session, *, tenant_id: str, user_id: str, asset_id: str, require_write: bool = False, is_admin: bool = False) -> FileAsset: + asset = session.get(FileAsset, asset_id) + if not asset or asset.tenant_id != tenant_id or asset.deleted_at is not None: + raise FileStorageError("File not found") + if is_admin: + return asset + group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) + owns = (asset.owner_type == "user" and asset.owner_user_id == user_id) or (asset.owner_type == "group" and asset.owner_group_id in group_ids) + if owns: + return asset + permission_values = ["read", "write", "manage"] if not require_write else ["write", "manage"] + share = ( + session.query(FileShare) + .filter( + FileShare.tenant_id == tenant_id, + FileShare.file_asset_id == asset.id, + FileShare.revoked_at.is_(None), + FileShare.permission.in_(permission_values), + or_( + (FileShare.target_type == "user") & (FileShare.target_id == user_id), + (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), + (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), + ), + ) + .first() + ) + if not share: + raise FileStorageError("No access to this file") + return asset + + +def list_assets_for_user( + session: Session, + *, + tenant_id: str, + user_id: str, + owner_type: str | None = None, + owner_id: str | None = None, + campaign_id: str | None = None, + path_prefix: str | None = None, + include_deleted: bool = False, + is_admin: bool = False, +) -> list[FileAsset]: + query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id) + if not include_deleted: + query = query.filter(FileAsset.deleted_at.is_(None)) + if owner_type: + query = query.filter(FileAsset.owner_type == owner_type) + if owner_type == "user" and owner_id: + query = query.filter(FileAsset.owner_user_id == owner_id) + if owner_type == "group" and owner_id: + query = query.filter(FileAsset.owner_group_id == owner_id) + if campaign_id: + query = query.join(FileShare, FileShare.file_asset_id == FileAsset.id).filter( + FileShare.tenant_id == tenant_id, + FileShare.target_type == "campaign", + FileShare.target_id == campaign_id, + FileShare.revoked_at.is_(None), + ) + elif not is_admin and not owner_type: + group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) + query = query.outerjoin(FileShare, FileShare.file_asset_id == FileAsset.id).filter( + or_( + (FileAsset.owner_type == "user") & (FileAsset.owner_user_id == user_id), + (FileAsset.owner_type == "group") & (FileAsset.owner_group_id.in_(group_ids)), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "user") & (FileShare.target_id == user_id), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), + ) + ) + if path_prefix: + prefix = normalize_folder(path_prefix) + if prefix: + query = query.filter(FileAsset.display_path.like(f"{prefix}/%")) + return query.order_by(FileAsset.display_path.asc(), FileAsset.updated_at.desc()).all() + + +def current_version_and_blob(session: Session, asset: FileAsset) -> tuple[FileVersion, FileBlob]: + if not asset.current_version_id: + raise FileStorageError("File has no current version") + version = session.get(FileVersion, asset.current_version_id) + if not version: + raise FileStorageError("File version not found") + blob = session.get(FileBlob, version.blob_id) + if not blob: + raise FileStorageError("File blob not found") + return version, blob + + +def read_asset_bytes(session: Session, asset: FileAsset) -> tuple[bytes, FileVersion, FileBlob]: + version, blob = current_version_and_blob(session, asset) + backend = get_storage_backend() + return backend.get_bytes(blob.storage_key), version, blob + + +def share_file( + session: Session, + *, + tenant_id: str, + asset: FileAsset, + target_type: str, + target_id: str, + permission: str, + user_id: str, +) -> FileShare: + target_type = target_type.lower().strip() + permission = permission.lower().strip() + if target_type not in {"user", "group", "campaign", "tenant"}: + raise FileStorageError("Unsupported share target") + if permission not in {"read", "write", "manage"}: + raise FileStorageError("Unsupported file permission") + if target_type == "campaign": + campaign = session.get(Campaign, target_id) + if not campaign or campaign.tenant_id != tenant_id: + raise FileStorageError("Campaign not found") + existing = ( + session.query(FileShare) + .filter( + FileShare.tenant_id == tenant_id, + FileShare.file_asset_id == asset.id, + FileShare.target_type == target_type, + FileShare.target_id == target_id, + FileShare.revoked_at.is_(None), + ) + .one_or_none() + ) + if existing: + existing.permission = permission + session.add(existing) + return existing + share = FileShare( + tenant_id=tenant_id, + file_asset_id=asset.id, + target_type=target_type, + target_id=target_id, + permission=permission, + created_by_user_id=user_id, + ) + session.add(share) + return share + + +def soft_delete_assets(session: Session, assets: Iterable[FileAsset]) -> int: + count = 0 + now = utcnow() + for asset in assets: + if asset.deleted_at is None: + asset.deleted_at = now + session.add(asset) + count += 1 + return count + + +def asset_is_audit_relevant(session: Session, asset: FileAsset) -> bool: + return ( + session.query(CampaignAttachmentUse) + .filter(CampaignAttachmentUse.file_asset_id == asset.id, CampaignAttachmentUse.use_stage == "sent") + .first() + is not None + ) + + +def _asset_owner_id(asset: FileAsset) -> str: + if asset.owner_type == "user" and asset.owner_user_id: + return asset.owner_user_id + if asset.owner_type == "group" and asset.owner_group_id: + return asset.owner_group_id + raise FileStorageError("File has no valid owner") + + +def _active_asset_exists(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> bool: + return _active_asset_at_path( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + path=path, + exclude_asset_id=exclude_asset_id, + ) is not None + + +def _active_asset_at_path(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> FileAsset | None: + query = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter( + FileAsset.deleted_at.is_(None), + FileAsset.display_path == normalize_logical_path(path), + ) + if exclude_asset_id: + query = query.filter(FileAsset.id != exclude_asset_id) + return query.first() + + +def _soft_delete_conflicting_asset(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> None: + asset = _active_asset_at_path( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + path=path, + exclude_asset_id=exclude_asset_id, + ) + if asset is not None: + asset.deleted_at = utcnow() + session.add(asset) + + +def _split_logical_path(path: str) -> tuple[str, str, str, str]: + normalized = normalize_logical_path(path) + logical = PurePosixPath(normalized) + folder = "" if str(logical.parent) == "." else str(logical.parent) + filename = logical.name + suffixes = "".join(PurePosixPath(filename).suffixes) + stem = filename[: -len(suffixes)] if suffixes else filename + return folder, filename, stem, suffixes + + +def _candidate_renamed_path(path: str, counter: int) -> str: + folder, _filename, stem, suffixes = _split_logical_path(path) + suffix = " copy" if counter == 1 else f" copy {counter}" + next_name = f"{stem}{suffix}{suffixes}" + return normalize_logical_path(f"{folder}/{next_name}" if folder else next_name) + + +def _next_available_logical_path( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + desired_path: str, + reserved_paths: set[str] | None = None, + exclude_asset_id: str | None = None, +) -> str: + reserved = reserved_paths or set() + desired = normalize_logical_path(desired_path) + if desired not in reserved and not _active_asset_exists( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + path=desired, + exclude_asset_id=exclude_asset_id, + ): + return desired + counter = 1 + while True: + candidate = _candidate_renamed_path(desired, counter) + if candidate not in reserved and not _active_asset_exists( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + path=candidate, + exclude_asset_id=exclude_asset_id, + ): + return candidate + counter += 1 + + +def _resolution_by_path(conflict_resolutions: Iterable[FileConflictResolution] | None) -> dict[str, FileConflictResolution]: + result: dict[str, FileConflictResolution] = {} + for item in conflict_resolutions or []: + target_path = normalize_logical_path(item.target_path) + action = item.action.lower().strip() + if action not in {"overwrite", "rename", "skip"}: + raise FileStorageError("Unsupported conflict resolution") + result[target_path] = FileConflictResolution(target_path=target_path, action=action, new_path=item.new_path) + return result + + +def _normalize_conflict_strategy(strategy: str | None) -> str: + normalized = (strategy or "reject").lower().strip() + if normalized not in {"reject", "overwrite", "rename"}: + raise FileStorageError("Unsupported conflict strategy") + return normalized + + +def _copy_asset_to_path( + session: Session, + asset: FileAsset, + *, + tenant_id: str, + target_owner_type: str, + target_owner_id: str, + target_path: str, + user_id: str, +) -> FileAsset: + version, blob = current_version_and_blob(session, asset) + blob.ref_count += 1 + session.add(blob) + normalized_path = normalize_logical_path(target_path) + copied = FileAsset( + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_user_id=target_owner_id if target_owner_type == "user" else None, + owner_group_id=target_owner_id if target_owner_type == "group" else None, + display_path=normalized_path, + filename=filename_from_path(normalized_path), + description=asset.description, + created_by_user_id=user_id, + metadata_=dict(asset.metadata_ or {}), + ) + session.add(copied) + session.flush() + copied_version = FileVersion( + tenant_id=tenant_id, + file_asset_id=copied.id, + blob_id=blob.id, + version_number=1, + filename_at_upload=version.filename_at_upload, + display_path_at_upload=normalized_path, + content_type=version.content_type, + size_bytes=blob.size_bytes, + checksum_sha256=blob.checksum_sha256, + created_by_user_id=user_id, + ) + session.add(copied_version) + session.flush() + copied.current_version_id = copied_version.id + session.add(copied) + return copied + + +def rename_asset(asset: FileAsset, *, new_path: str) -> None: + normalized = normalize_logical_path(new_path) + asset.display_path = normalized + asset.filename = filename_from_path(normalized) diff --git a/server/app/storage/folders.py b/server/app/storage/folders.py new file mode 100644 index 0000000..1e47a01 --- /dev/null +++ b/server/app/storage/folders.py @@ -0,0 +1,173 @@ +from __future__ import annotations + +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from app.db.models import FileAsset, FileFolder +from app.storage.access import ensure_owner_access +from app.storage.common import FileStorageError, utcnow +from app.storage.files import _asset_query_for_owner +from app.storage.paths import normalize_folder + + +def _owner_filter(query, owner_type: str, owner_id: str): + if owner_type == "user": + return query.filter(FileFolder.owner_user_id == owner_id) + if owner_type == "group": + return query.filter(FileFolder.owner_group_id == owner_id) + raise FileStorageError("Unsupported owner type") + + +def create_folder( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + path: str, + is_admin: bool = False, +) -> FileFolder: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + normalized = normalize_folder(path) + if not normalized: + raise FileStorageError("Folder path is required") + query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.path == normalized) + query = _owner_filter(query, owner_type, owner_id) + active_existing = query.filter(FileFolder.deleted_at.is_(None)).first() + if active_existing is not None: + raise FileStorageError(f"Folder already exists: {normalized}") + deleted_existing = query.filter(FileFolder.deleted_at.is_not(None)).first() + if deleted_existing is not None: + deleted_existing.deleted_at = None + session.add(deleted_existing) + session.flush() + return deleted_existing + folder = FileFolder( + tenant_id=tenant_id, + owner_type=owner_type, + owner_user_id=owner_id if owner_type == "user" else None, + owner_group_id=owner_id if owner_type == "group" else None, + path=normalized, + created_by_user_id=user_id, + metadata_={}, + ) + session.add(folder) + session.flush() + return folder + + +def list_folders_for_user( + session: Session, + *, + tenant_id: str, + user_id: str, + owner_type: str, + owner_id: str, + include_deleted: bool = False, + is_admin: bool = False, +) -> list[FileFolder]: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type) + query = _owner_filter(query, owner_type, owner_id) + if not include_deleted: + query = query.filter(FileFolder.deleted_at.is_(None)) + return query.order_by(FileFolder.path.asc()).all() + + +def soft_delete_folder( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + path: str, + recursive: bool = True, + is_admin: bool = False, +) -> tuple[int, int]: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + normalized = normalize_folder(path) + if not normalized: + raise FileStorageError("Folder path is required") + prefix = f"{normalized}/" + now = utcnow() + + folder_query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.deleted_at.is_(None)) + folder_query = _owner_filter(folder_query, owner_type, owner_id) + if recursive: + folder_query = folder_query.filter(or_(FileFolder.path == normalized, FileFolder.path.like(f"{prefix}%"))) + else: + child_exists = folder_query.filter(FileFolder.path.like(f"{prefix}%")).first() is not None + file_exists = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.display_path.like(f"{prefix}%")).first() is not None + if child_exists or file_exists: + raise FileStorageError("Folder is not empty") + folder_query = folder_query.filter(FileFolder.path == normalized) + + folders = folder_query.all() + for folder in folders: + folder.deleted_at = now + session.add(folder) + + file_query = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{prefix}%")) + assets = file_query.all() if recursive else [] + for asset in assets: + asset.deleted_at = now + session.add(asset) + + return len(folders), len(assets) + + +def _active_folder_exists(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_paths: set[str] | None = None) -> bool: + normalized = normalize_folder(path) + if not normalized: + return False + query = session.query(FileFolder).filter( + FileFolder.tenant_id == tenant_id, + FileFolder.owner_type == owner_type, + FileFolder.deleted_at.is_(None), + FileFolder.path == normalized, + ) + query = _owner_filter(query, owner_type, owner_id) + if exclude_paths: + query = query.filter(FileFolder.path.notin_(exclude_paths)) + return query.first() is not None + + +def _folder_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str): + query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type) + return _owner_filter(query, owner_type, owner_id) + + +def _ensure_target_folder_hierarchy( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + path: str, +) -> None: + parts = normalize_folder(path).split("/") if normalize_folder(path) else [] + for index in range(1, len(parts) + 1): + partial = "/".join(parts[:index]) + query = _folder_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileFolder.path == partial) + existing = query.one_or_none() + if existing: + if existing.deleted_at is not None: + existing.deleted_at = None + session.add(existing) + continue + folder = FileFolder( + tenant_id=tenant_id, + owner_type=owner_type, + owner_user_id=owner_id if owner_type == "user" else None, + owner_group_id=owner_id if owner_type == "group" else None, + path=partial, + created_by_user_id=user_id, + metadata_={}, + ) + session.add(folder) diff --git a/server/app/storage/search.py b/server/app/storage/search.py new file mode 100644 index 0000000..202797a --- /dev/null +++ b/server/app/storage/search.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import re +from typing import Iterable + +from app.db.models import FileAsset +from app.storage.common import ResolvedPattern +from app.storage.paths import normalize_folder, normalize_logical_path + + +def _normalize_pattern(pattern: str) -> str: + if pattern.strip() in {"", "*"}: + return "*" + return normalize_logical_path(pattern, fallback_filename="*") + + +def _logical_glob_regex(pattern: str, *, case_sensitive: bool = False) -> re.Pattern[str]: + """Compile Multi Seal Mail logical globs. + + `*` and `?` stay within one folder segment. `**` crosses folder + boundaries, and `**/` also matches the current folder so `**/*.pdf` + returns direct and nested PDF files. + """ + + pattern = _normalize_pattern(pattern) + pieces = ["^"] + index = 0 + while index < len(pattern): + char = pattern[index] + if char == "*": + if index + 1 < len(pattern) and pattern[index + 1] == "*": + index += 2 + if index < len(pattern) and pattern[index] == "/": + pieces.append("(?:.*/)?") + index += 1 + else: + pieces.append(".*") + continue + pieces.append("[^/]*") + elif char == "?": + pieces.append("[^/]") + else: + pieces.append(re.escape(char)) + index += 1 + pieces.append("$") + flags = 0 if case_sensitive else re.IGNORECASE + return re.compile("".join(pieces), flags) + + +def _relative_display_path(asset: FileAsset, base_path: str | None) -> str: + path = normalize_logical_path(asset.display_path) + base = normalize_folder(base_path) + if not base: + return path + prefix = f"{base}/" + if path.startswith(prefix): + return path[len(prefix) :] + return path + + +def match_assets(assets: Iterable[FileAsset], pattern: str, *, base_path: str | None = None, case_sensitive: bool = False) -> list[FileAsset]: + regex = _logical_glob_regex(pattern, case_sensitive=case_sensitive) + normalized_pattern = _normalize_pattern(pattern) + has_path_context = base_path is not None or "/" in normalized_pattern or "**" in normalized_pattern + matches: list[FileAsset] = [] + for asset in assets: + candidates = [_relative_display_path(asset, base_path)] if has_path_context else [asset.display_path, asset.filename] + if any(regex.match(candidate) for candidate in candidates): + matches.append(asset) + return matches + + +def resolve_patterns(assets: list[FileAsset], patterns: list[str], *, base_path: str | None = None, case_sensitive: bool = False) -> tuple[list[ResolvedPattern], list[FileAsset]]: + resolved = [ResolvedPattern(pattern=pattern, matches=match_assets(assets, pattern, base_path=base_path, case_sensitive=case_sensitive)) for pattern in patterns] + matched_ids = {asset.id for item in resolved for asset in item.matches} + unmatched = [asset for asset in assets if asset.id not in matched_ids] + return resolved, unmatched diff --git a/server/app/storage/services.py b/server/app/storage/services.py index 9ac9b64..e991d8e 100644 --- a/server/app/storage/services.py +++ b/server/app/storage/services.py @@ -1,750 +1,86 @@ from __future__ import annotations -import hashlib -import mimetypes -import re -import zipfile -from dataclasses import dataclass -from datetime import datetime, timezone -from io import BytesIO -from pathlib import PurePosixPath -from typing import Any, Iterable -from uuid import uuid4 +# Compatibility facade for existing storage imports. New storage code should +# prefer importing from the focused modules directly. -from sqlalchemy import or_ -from sqlalchemy.orm import Session - -from app.db.models import ( - Campaign, - CampaignAttachmentUse, - CampaignJob, - FileAsset, - FileBlob, - FileFolder, - FileShare, - FileVersion, - Group, - UserGroupMembership, +from app.storage.access import ensure_group_access, ensure_owner_access, user_group_ids +from app.storage.archives import create_zip_bytes, extract_zip_upload +from app.storage.campaign_usage import mark_job_attachment_uses_sent, record_campaign_attachment_uses_for_job +from app.storage.common import ( + FileConflictResolution, + FileStorageError, + RenamePlanItem, + ResolvedPattern, + UploadedStoredFile, + utcnow, ) -from app.settings import settings -from app.storage.backends import get_storage_backend -from app.storage.paths import filename_from_path, join_folder_filename, normalize_folder, normalize_logical_path, safe_storage_component - - -class FileStorageError(RuntimeError): - pass - - -@dataclass(slots=True) -class UploadedStoredFile: - asset: FileAsset - version: FileVersion - blob: FileBlob - - -@dataclass(slots=True) -class ResolvedPattern: - pattern: str - matches: list[FileAsset] - - -def utcnow() -> datetime: - return datetime.now(timezone.utc) - - -def user_group_ids(session: Session, *, tenant_id: str, user_id: str, include_admin_groups: bool = False) -> list[str]: - if include_admin_groups: - return [row.id for row in session.query(Group).filter(Group.tenant_id == tenant_id).order_by(Group.name.asc()).all()] - return [ - row.group_id - for row in session.query(UserGroupMembership) - .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id) - .all() - ] - - -def ensure_group_access(session: Session, *, tenant_id: str, group_id: str, user_id: str, is_admin: bool = False) -> None: - group = session.get(Group, group_id) - if not group or group.tenant_id != tenant_id: - raise FileStorageError("Group not found") - if is_admin: - return - membership = ( - session.query(UserGroupMembership) - .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id, UserGroupMembership.group_id == group_id) - .one_or_none() - ) - if membership is None: - raise FileStorageError("No access to this group file space") - - - - -def _owner_filter(query, owner_type: str, owner_id: str): - if owner_type == "user": - return query.filter(FileFolder.owner_user_id == owner_id) - if owner_type == "group": - return query.filter(FileFolder.owner_group_id == owner_id) - raise FileStorageError("Unsupported owner type") - - -def ensure_owner_access(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, user_id: str, is_admin: bool = False) -> None: - owner_type = owner_type.lower().strip() - if owner_type == "user": - if owner_id != user_id and not is_admin: - raise FileStorageError("No access to this user file space") - return - if owner_type == "group": - ensure_group_access(session, tenant_id=tenant_id, group_id=owner_id, user_id=user_id, is_admin=is_admin) - return - raise FileStorageError("Files must be owned by a user or group") - - -def create_folder( - session: Session, - *, - tenant_id: str, - owner_type: str, - owner_id: str, - user_id: str, - path: str, - is_admin: bool = False, -) -> FileFolder: - owner_type = owner_type.lower().strip() - ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) - normalized = normalize_folder(path) - if not normalized: - raise FileStorageError("Folder path is required") - query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.path == normalized) - query = _owner_filter(query, owner_type, owner_id) - existing = query.order_by(FileFolder.deleted_at.asc()).first() - if existing: - if existing.deleted_at is not None: - existing.deleted_at = None - session.add(existing) - return existing - folder = FileFolder( - tenant_id=tenant_id, - owner_type=owner_type, - owner_user_id=owner_id if owner_type == "user" else None, - owner_group_id=owner_id if owner_type == "group" else None, - path=normalized, - created_by_user_id=user_id, - metadata_={}, - ) - session.add(folder) - session.flush() - return folder - - -def list_folders_for_user( - session: Session, - *, - tenant_id: str, - user_id: str, - owner_type: str, - owner_id: str, - include_deleted: bool = False, - is_admin: bool = False, -) -> list[FileFolder]: - owner_type = owner_type.lower().strip() - ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) - query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type) - query = _owner_filter(query, owner_type, owner_id) - if not include_deleted: - query = query.filter(FileFolder.deleted_at.is_(None)) - return query.order_by(FileFolder.path.asc()).all() - - -def soft_delete_folder( - session: Session, - *, - tenant_id: str, - owner_type: str, - owner_id: str, - user_id: str, - path: str, - recursive: bool = True, - is_admin: bool = False, -) -> tuple[int, int]: - owner_type = owner_type.lower().strip() - ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) - normalized = normalize_folder(path) - if not normalized: - raise FileStorageError("Folder path is required") - prefix = f"{normalized}/" - now = utcnow() - - folder_query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.deleted_at.is_(None)) - folder_query = _owner_filter(folder_query, owner_type, owner_id) - if recursive: - folder_query = folder_query.filter(or_(FileFolder.path == normalized, FileFolder.path.like(f"{prefix}%"))) - else: - child_exists = folder_query.filter(FileFolder.path.like(f"{prefix}%")).first() is not None - file_exists = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.display_path.like(f"{prefix}%")).first() is not None - if child_exists or file_exists: - raise FileStorageError("Folder is not empty") - folder_query = folder_query.filter(FileFolder.path == normalized) - - folders = folder_query.all() - for folder in folders: - folder.deleted_at = now - session.add(folder) - - file_query = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{prefix}%")) - assets = file_query.all() if recursive else [] - for asset in assets: - asset.deleted_at = now - session.add(asset) - - return len(folders), len(assets) - - -def _asset_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str): - query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id, FileAsset.owner_type == owner_type) - if owner_type == "user": - return query.filter(FileAsset.owner_user_id == owner_id) - if owner_type == "group": - return query.filter(FileAsset.owner_group_id == owner_id) - raise FileStorageError("Unsupported owner type") - -def _storage_bucket_name() -> str: - return settings.file_storage_s3_bucket or settings.s3_bucket - - -def _storage_backend_name() -> str: - return settings.file_storage_backend.lower().strip() - - -def _storage_key(*, tenant_id: str, checksum: str, filename: str) -> str: - return f"tenants/{tenant_id}/files/{checksum[:2]}/{uuid4().hex}-{safe_storage_component(filename)}" - - -def _get_or_create_blob( - session: Session, - *, - tenant_id: str, - data: bytes, - filename: str, - content_type: str | None, -) -> FileBlob: - checksum = hashlib.sha256(data).hexdigest() - size = len(data) - blob = ( - session.query(FileBlob) - .filter(FileBlob.tenant_id == tenant_id, FileBlob.checksum_sha256 == checksum, FileBlob.size_bytes == size) - .one_or_none() - ) - if blob: - blob.ref_count += 1 - session.add(blob) - return blob - - storage_key = _storage_key(tenant_id=tenant_id, checksum=checksum, filename=filename) - backend = get_storage_backend() - backend.put_bytes(storage_key, data, content_type=content_type) - blob = FileBlob( - tenant_id=tenant_id, - storage_backend=_storage_backend_name(), - storage_bucket=_storage_bucket_name(), - storage_key=storage_key, - checksum_sha256=checksum, - size_bytes=size, - content_type=content_type, - ref_count=1, - ) - session.add(blob) - session.flush() - return blob - - -def create_file_asset( - session: Session, - *, - tenant_id: str, - owner_type: str, - owner_id: str, - user_id: str, - filename: str, - data: bytes, - folder: str | None = None, - display_path: str | None = None, - content_type: str | None = None, - description: str | None = None, - metadata: dict[str, Any] | None = None, - campaign_id: str | None = None, - is_admin: bool = False, -) -> UploadedStoredFile: - owner_type = owner_type.lower().strip() - ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) - - safe_filename = filename_from_path(normalize_logical_path(filename, fallback_filename="file")) - logical_path = normalize_logical_path(display_path) if display_path else join_folder_filename(folder, safe_filename) - if not content_type: - content_type = mimetypes.guess_type(safe_filename)[0] or "application/octet-stream" - - blob = _get_or_create_blob(session, tenant_id=tenant_id, data=data, filename=safe_filename, content_type=content_type) - asset = FileAsset( - tenant_id=tenant_id, - owner_type=owner_type, - owner_user_id=owner_id if owner_type == "user" else None, - owner_group_id=owner_id if owner_type == "group" else None, - display_path=logical_path, - filename=filename_from_path(logical_path), - description=description, - created_by_user_id=user_id, - metadata_=metadata or {}, - ) - session.add(asset) - session.flush() - version = FileVersion( - tenant_id=tenant_id, - file_asset_id=asset.id, - blob_id=blob.id, - version_number=1, - filename_at_upload=safe_filename, - display_path_at_upload=logical_path, - content_type=content_type, - size_bytes=blob.size_bytes, - checksum_sha256=blob.checksum_sha256, - created_by_user_id=user_id, - ) - session.add(version) - session.flush() - asset.current_version_id = version.id - session.add(asset) - if campaign_id: - share_file(session, tenant_id=tenant_id, asset=asset, target_type="campaign", target_id=campaign_id, permission="read", user_id=user_id) - return UploadedStoredFile(asset=asset, version=version, blob=blob) - - -def get_asset_for_user(session: Session, *, tenant_id: str, user_id: str, asset_id: str, require_write: bool = False, is_admin: bool = False) -> FileAsset: - asset = session.get(FileAsset, asset_id) - if not asset or asset.tenant_id != tenant_id or asset.deleted_at is not None: - raise FileStorageError("File not found") - if is_admin: - return asset - group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) - owns = (asset.owner_type == "user" and asset.owner_user_id == user_id) or (asset.owner_type == "group" and asset.owner_group_id in group_ids) - if owns: - return asset - permission_values = ["read", "write", "manage"] if not require_write else ["write", "manage"] - share = ( - session.query(FileShare) - .filter( - FileShare.tenant_id == tenant_id, - FileShare.file_asset_id == asset.id, - FileShare.revoked_at.is_(None), - FileShare.permission.in_(permission_values), - or_( - (FileShare.target_type == "user") & (FileShare.target_id == user_id), - (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), - (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), - ), - ) - .first() - ) - if not share: - raise FileStorageError("No access to this file") - return asset - - -def list_assets_for_user( - session: Session, - *, - tenant_id: str, - user_id: str, - owner_type: str | None = None, - owner_id: str | None = None, - campaign_id: str | None = None, - path_prefix: str | None = None, - include_deleted: bool = False, - is_admin: bool = False, -) -> list[FileAsset]: - query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id) - if not include_deleted: - query = query.filter(FileAsset.deleted_at.is_(None)) - if owner_type: - query = query.filter(FileAsset.owner_type == owner_type) - if owner_type == "user" and owner_id: - query = query.filter(FileAsset.owner_user_id == owner_id) - if owner_type == "group" and owner_id: - query = query.filter(FileAsset.owner_group_id == owner_id) - if campaign_id: - query = query.join(FileShare, FileShare.file_asset_id == FileAsset.id).filter( - FileShare.tenant_id == tenant_id, - FileShare.target_type == "campaign", - FileShare.target_id == campaign_id, - FileShare.revoked_at.is_(None), - ) - elif not is_admin and not owner_type: - group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) - query = query.outerjoin(FileShare, FileShare.file_asset_id == FileAsset.id).filter( - or_( - (FileAsset.owner_type == "user") & (FileAsset.owner_user_id == user_id), - (FileAsset.owner_type == "group") & (FileAsset.owner_group_id.in_(group_ids)), - (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "user") & (FileShare.target_id == user_id), - (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), - (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), - ) - ) - if path_prefix: - prefix = normalize_folder(path_prefix) - if prefix: - query = query.filter(FileAsset.display_path.like(f"{prefix}/%")) - return query.order_by(FileAsset.display_path.asc(), FileAsset.updated_at.desc()).all() - - -def current_version_and_blob(session: Session, asset: FileAsset) -> tuple[FileVersion, FileBlob]: - if not asset.current_version_id: - raise FileStorageError("File has no current version") - version = session.get(FileVersion, asset.current_version_id) - if not version: - raise FileStorageError("File version not found") - blob = session.get(FileBlob, version.blob_id) - if not blob: - raise FileStorageError("File blob not found") - return version, blob - - -def read_asset_bytes(session: Session, asset: FileAsset) -> tuple[bytes, FileVersion, FileBlob]: - version, blob = current_version_and_blob(session, asset) - backend = get_storage_backend() - return backend.get_bytes(blob.storage_key), version, blob - - -def share_file( - session: Session, - *, - tenant_id: str, - asset: FileAsset, - target_type: str, - target_id: str, - permission: str, - user_id: str, -) -> FileShare: - target_type = target_type.lower().strip() - permission = permission.lower().strip() - if target_type not in {"user", "group", "campaign", "tenant"}: - raise FileStorageError("Unsupported share target") - if permission not in {"read", "write", "manage"}: - raise FileStorageError("Unsupported file permission") - if target_type == "campaign": - campaign = session.get(Campaign, target_id) - if not campaign or campaign.tenant_id != tenant_id: - raise FileStorageError("Campaign not found") - existing = ( - session.query(FileShare) - .filter( - FileShare.tenant_id == tenant_id, - FileShare.file_asset_id == asset.id, - FileShare.target_type == target_type, - FileShare.target_id == target_id, - FileShare.revoked_at.is_(None), - ) - .one_or_none() - ) - if existing: - existing.permission = permission - session.add(existing) - return existing - share = FileShare( - tenant_id=tenant_id, - file_asset_id=asset.id, - target_type=target_type, - target_id=target_id, - permission=permission, - created_by_user_id=user_id, - ) - session.add(share) - return share - - -def soft_delete_assets(session: Session, assets: Iterable[FileAsset]) -> int: - count = 0 - now = utcnow() - for asset in assets: - if asset.deleted_at is None: - asset.deleted_at = now - session.add(asset) - count += 1 - return count - - -def asset_is_audit_relevant(session: Session, asset: FileAsset) -> bool: - return ( - session.query(CampaignAttachmentUse) - .filter(CampaignAttachmentUse.file_asset_id == asset.id, CampaignAttachmentUse.use_stage == "sent") - .first() - is not None - ) - - -def _normalize_pattern(pattern: str) -> str: - if pattern.strip() in {"", "*"}: - return "*" - return normalize_logical_path(pattern, fallback_filename="*") - - -def _logical_glob_regex(pattern: str) -> re.Pattern[str]: - """Compile Multi Seal Mail logical globs. - - `*` and `?` stay within one folder segment. `**` crosses folder - boundaries, and `**/` also matches the current folder so `**/*.pdf` - returns direct and nested PDF files. - """ - - pattern = _normalize_pattern(pattern) - pieces = ["^"] - index = 0 - while index < len(pattern): - char = pattern[index] - if char == "*": - if index + 1 < len(pattern) and pattern[index + 1] == "*": - index += 2 - if index < len(pattern) and pattern[index] == "/": - pieces.append("(?:.*/)?") - index += 1 - else: - pieces.append(".*") - continue - pieces.append("[^/]*") - elif char == "?": - pieces.append("[^/]") - else: - pieces.append(re.escape(char)) - index += 1 - pieces.append("$") - return re.compile("".join(pieces)) - - -def _relative_display_path(asset: FileAsset, base_path: str | None) -> str: - path = normalize_logical_path(asset.display_path) - base = normalize_folder(base_path) - if not base: - return path - prefix = f"{base}/" - if path.startswith(prefix): - return path[len(prefix) :] - return path - - -def match_assets(assets: Iterable[FileAsset], pattern: str, *, base_path: str | None = None) -> list[FileAsset]: - regex = _logical_glob_regex(pattern) - normalized_pattern = _normalize_pattern(pattern) - has_path_context = base_path is not None or "/" in normalized_pattern or "**" in normalized_pattern - matches: list[FileAsset] = [] - for asset in assets: - candidates = [_relative_display_path(asset, base_path)] if has_path_context else [asset.display_path, asset.filename] - if any(regex.match(candidate) for candidate in candidates): - matches.append(asset) - return matches - - -def resolve_patterns(assets: list[FileAsset], patterns: list[str], *, base_path: str | None = None) -> tuple[list[ResolvedPattern], list[FileAsset]]: - resolved = [ResolvedPattern(pattern=pattern, matches=match_assets(assets, pattern, base_path=base_path)) for pattern in patterns] - matched_ids = {asset.id for item in resolved for asset in item.matches} - unmatched = [asset for asset in assets if asset.id not in matched_ids] - return resolved, unmatched - - -def rename_asset(asset: FileAsset, *, new_path: str) -> None: - normalized = normalize_logical_path(new_path) - asset.display_path = normalized - asset.filename = filename_from_path(normalized) - - -def build_rename_preview(asset: FileAsset, *, mode: str, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: - path = PurePosixPath(asset.display_path) - folder = "" if str(path.parent) == "." else str(path.parent) - name = path.name - stem = PurePosixPath(name).stem - ext = "".join(PurePosixPath(name).suffixes) - if mode == "prefix": - next_name = prefix + name - elif mode == "suffix": - next_name = f"{stem}{suffix}{ext}" - elif mode == "replace": - if not find: - next_name = name - else: - next_name = name.replace(find, replacement) - else: - raise FileStorageError("Unsupported rename mode") - return f"{folder}/{next_name}" if folder else next_name - - -def create_zip_bytes(session: Session, assets: Iterable[FileAsset]) -> bytes: - buffer = BytesIO() - with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: - for asset in assets: - data, _, _ = read_asset_bytes(session, asset) - archive.writestr(asset.display_path, data) - buffer.seek(0) - return buffer.getvalue() - - -def extract_zip_upload( - session: Session, - *, - tenant_id: str, - owner_type: str, - owner_id: str, - user_id: str, - zip_data: bytes, - folder: str | None, - campaign_id: str | None, - is_admin: bool = False, - max_files: int = 1000, - max_total_bytes: int = 250 * 1024 * 1024, -) -> list[UploadedStoredFile]: - uploaded: list[UploadedStoredFile] = [] - total = 0 - base_folder = normalize_folder(folder) - with zipfile.ZipFile(BytesIO(zip_data)) as archive: - infos = [info for info in archive.infolist() if not info.is_dir()] - if len(infos) > max_files: - raise FileStorageError(f"ZIP contains too many files (limit {max_files})") - for info in infos: - if info.file_size < 0: - raise FileStorageError("Invalid ZIP member") - total += info.file_size - if total > max_total_bytes: - raise FileStorageError("ZIP is too large after extraction") - inner_path = normalize_logical_path(info.filename) - target_path = f"{base_folder}/{inner_path}" if base_folder else inner_path - data = archive.read(info) - uploaded.append( - create_file_asset( - session, - tenant_id=tenant_id, - owner_type=owner_type, - owner_id=owner_id, - user_id=user_id, - filename=filename_from_path(inner_path), - data=data, - display_path=target_path, - content_type=mimetypes.guess_type(inner_path)[0] or "application/octet-stream", - campaign_id=campaign_id, - is_admin=is_admin, - ) - ) - return uploaded - - -def _candidate_match_keys(raw_match: str) -> set[str]: - cleaned = raw_match.replace("\\", "/").strip().strip("/") - result = {cleaned} - if cleaned: - result.add(PurePosixPath(cleaned).name) - return {item for item in result if item} - - -def record_campaign_attachment_uses_for_job(session: Session, job: CampaignJob, *, stage: str = "built") -> None: - """Create best-effort immutable file-use records for matched managed files. - - Existing attachment resolution is still filesystem/path based. This bridge - records uses when a resolved attachment match can be tied to a managed file - by logical path or filename among files shared with the campaign. - """ - - attachments = job.resolved_attachments or [] - if not isinstance(attachments, list): - return - assets = list_assets_for_user( - session, - tenant_id=job.tenant_id, - user_id="", - campaign_id=job.campaign_id, - is_admin=True, - ) - by_key: dict[str, FileAsset] = {} - for asset in assets: - by_key[asset.display_path.strip("/")] = asset - by_key[asset.filename] = asset - for attachment in attachments: - if not isinstance(attachment, dict): - continue - matches = attachment.get("matches") if isinstance(attachment.get("matches"), list) else [] - for raw in matches: - if not isinstance(raw, str): - continue - asset = next((by_key[key] for key in _candidate_match_keys(raw) if key in by_key), None) - if not asset: - continue - version, blob = current_version_and_blob(session, asset) - exists = ( - session.query(CampaignAttachmentUse) - .filter( - CampaignAttachmentUse.campaign_job_id == job.id, - CampaignAttachmentUse.file_version_id == version.id, - CampaignAttachmentUse.filename_used == asset.filename, - CampaignAttachmentUse.use_stage == stage, - ) - .one_or_none() - ) - if exists: - continue - session.add( - CampaignAttachmentUse( - tenant_id=job.tenant_id, - campaign_id=job.campaign_id, - campaign_version_id=job.campaign_version_id, - campaign_job_id=job.id, - entry_index=job.entry_index, - entry_id=job.entry_id, - file_asset_id=asset.id, - file_version_id=version.id, - file_blob_id=blob.id, - filename_used=asset.filename, - checksum_sha256=blob.checksum_sha256, - size_bytes=blob.size_bytes, - content_type=blob.content_type, - use_stage=stage, - ) - ) - - -def mark_job_attachment_uses_sent(session: Session, job: CampaignJob) -> None: - record_campaign_attachment_uses_for_job(session, job, stage="built") - now = utcnow() - uses = ( - session.query(CampaignAttachmentUse) - .filter( - CampaignAttachmentUse.tenant_id == job.tenant_id, - CampaignAttachmentUse.campaign_job_id == job.id, - CampaignAttachmentUse.use_stage == "built", - ) - .all() - ) - for use in uses: - sent = ( - session.query(CampaignAttachmentUse) - .filter( - CampaignAttachmentUse.campaign_job_id == job.id, - CampaignAttachmentUse.file_version_id == use.file_version_id, - CampaignAttachmentUse.use_stage == "sent", - ) - .one_or_none() - ) - if sent: - continue - session.add( - CampaignAttachmentUse( - tenant_id=use.tenant_id, - campaign_id=use.campaign_id, - campaign_version_id=use.campaign_version_id, - campaign_job_id=use.campaign_job_id, - entry_index=use.entry_index, - entry_id=use.entry_id, - file_asset_id=use.file_asset_id, - file_version_id=use.file_version_id, - file_blob_id=use.file_blob_id, - filename_used=use.filename_used, - checksum_sha256=use.checksum_sha256, - size_bytes=use.size_bytes, - content_type=use.content_type, - use_stage="sent", - used_at=now, - ) - ) +from app.storage.files import ( + _active_asset_at_path, + _active_asset_exists, + _asset_owner_id, + _asset_query_for_owner, + _candidate_renamed_path, + _copy_asset_to_path, + _get_or_create_blob, + _next_available_logical_path, + _normalize_conflict_strategy, + _resolution_by_path, + _soft_delete_conflicting_asset, + _split_logical_path, + _storage_backend_name, + _storage_bucket_name, + _storage_key, + asset_is_audit_relevant, + create_file_asset, + current_version_and_blob, + get_asset_for_user, + list_assets_for_user, + read_asset_bytes, + rename_asset, + share_file, + soft_delete_assets, +) +from app.storage.folders import ( + _active_folder_exists, + _ensure_target_folder_hierarchy, + _folder_query_for_owner, + _owner_filter, + create_folder, + list_folders_for_user, + soft_delete_folder, +) +from app.storage.search import match_assets, resolve_patterns +from app.storage.transfers import build_rename_preview, rename_selection, transfer_selection + +__all__ = [ + "FileConflictResolution", + "FileStorageError", + "RenamePlanItem", + "ResolvedPattern", + "UploadedStoredFile", + "asset_is_audit_relevant", + "build_rename_preview", + "create_file_asset", + "create_folder", + "create_zip_bytes", + "current_version_and_blob", + "ensure_group_access", + "ensure_owner_access", + "extract_zip_upload", + "get_asset_for_user", + "list_assets_for_user", + "list_folders_for_user", + "mark_job_attachment_uses_sent", + "match_assets", + "read_asset_bytes", + "record_campaign_attachment_uses_for_job", + "rename_asset", + "rename_selection", + "resolve_patterns", + "share_file", + "soft_delete_assets", + "soft_delete_folder", + "transfer_selection", + "user_group_ids", + "utcnow", +] diff --git a/server/app/storage/transfers.py b/server/app/storage/transfers.py new file mode 100644 index 0000000..cc278bd --- /dev/null +++ b/server/app/storage/transfers.py @@ -0,0 +1,447 @@ +from __future__ import annotations + +from pathlib import PurePosixPath +from typing import Iterable + +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from app.db.models import FileAsset, FileFolder +from app.storage.access import ensure_owner_access +from app.storage.common import FileConflictResolution, FileStorageError, RenamePlanItem, utcnow +from app.storage.files import ( + _active_asset_exists, + _asset_owner_id, + _asset_query_for_owner, + _candidate_renamed_path, + _copy_asset_to_path, + _next_available_logical_path, + _normalize_conflict_strategy, + _resolution_by_path, + _soft_delete_conflicting_asset, + current_version_and_blob, + get_asset_for_user, + rename_asset, +) +from app.storage.folders import _active_folder_exists, _ensure_target_folder_hierarchy, _folder_query_for_owner +from app.storage.paths import filename_from_path, join_folder_filename, normalize_folder, normalize_logical_path + + +def transfer_selection( + session: Session, + *, + tenant_id: str, + user_id: str, + operation: str, + file_ids: list[str], + folder_paths: list[str], + source_owner_type: str, + source_owner_id: str, + target_owner_type: str, + target_owner_id: str, + target_folder: str, + conflict_strategy: str = "reject", + conflict_resolutions: Iterable[FileConflictResolution] | None = None, + is_admin: bool = False, +) -> tuple[int, int]: + """Move or copy files/folders between user/group file spaces. + + Folder transfers preserve the selected folder's basename below the target + folder. File transfers place files directly in the target folder. Existing + active target paths are handled by the requested conflict strategy. Copies + create new file assets/versions that reference the existing immutable blob. + """ + + operation = operation.lower().strip() + if operation not in {"move", "copy"}: + raise FileStorageError("Unsupported transfer operation") + source_owner_type = source_owner_type.lower().strip() + target_owner_type = target_owner_type.lower().strip() + source_folder_paths = [normalize_folder(path) for path in folder_paths if normalize_folder(path)] + target_folder = normalize_folder(target_folder) + conflict_strategy = _normalize_conflict_strategy(conflict_strategy) + conflict_resolution_map = _resolution_by_path(conflict_resolutions) + + ensure_owner_access(session, tenant_id=tenant_id, owner_type=source_owner_type, owner_id=source_owner_id, user_id=user_id, is_admin=is_admin) + ensure_owner_access(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, user_id=user_id, is_admin=is_admin) + + if operation == "move" and source_owner_type == target_owner_type and source_owner_id == target_owner_id: + for folder_path in source_folder_paths: + if target_folder == folder_path or target_folder.startswith(f"{folder_path}/"): + raise FileStorageError("Cannot move a folder into itself or one of its child folders") + + assets_by_id: dict[str, FileAsset] = {} + for file_id in file_ids: + asset = get_asset_for_user( + session, + tenant_id=tenant_id, + user_id=user_id, + asset_id=file_id, + require_write=operation == "move", + is_admin=is_admin, + ) + assets_by_id[asset.id] = asset + + folder_asset_targets: dict[str, str] = {} + folder_target_paths: dict[str, str] = {} + for folder_path in source_folder_paths: + folder_basename = PurePosixPath(folder_path).name + target_prefix = normalize_folder(f"{target_folder}/{folder_basename}" if target_folder else folder_basename) + folder_target_paths[folder_path] = target_prefix + if operation == "copy" or not (source_owner_type == target_owner_type and source_owner_id == target_owner_id and target_prefix == folder_path): + if _active_folder_exists(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, path=target_prefix): + resolution = conflict_resolution_map.get(target_prefix) + action = resolution.action if resolution else conflict_strategy + if action == "skip": + folder_target_paths.pop(folder_path, None) + continue + if action == "rename": + # Rename the selected folder root while preserving its contents below it. + counter = 1 + candidate = target_prefix + while _active_folder_exists(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, path=candidate): + candidate = _candidate_renamed_path(target_prefix, counter) + counter += 1 + target_prefix = normalize_folder(candidate) + folder_target_paths[folder_path] = target_prefix + elif action == "reject": + raise FileStorageError(f"Target folder already exists: {target_prefix}") + # overwrite on folders means merge into the existing folder; conflicting files are still handled below. + source_assets = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=source_owner_type, owner_id=source_owner_id).filter( + FileAsset.deleted_at.is_(None), + FileAsset.display_path.like(f"{folder_path}/%"), + ).all() + for asset in source_assets: + relative = asset.display_path[len(folder_path) + 1 :] + folder_asset_targets[asset.id] = normalize_logical_path(f"{target_prefix}/{relative}") + assets_by_id[asset.id] = asset + + direct_file_targets: dict[str, str] = {} + for file_id, asset in assets_by_id.items(): + if file_id in folder_asset_targets: + continue + direct_file_targets[file_id] = normalize_logical_path(join_folder_filename(target_folder, filename_from_path(asset.display_path))) + + all_targets = {**folder_asset_targets, **direct_file_targets} + if not all_targets and not source_folder_paths: + return 0, 0 + + resolved_targets: dict[str, str] = {} + skipped_asset_ids: set[str] = set() + reserved_targets: set[str] = set() + for asset_id, target_path in all_targets.items(): + source_asset = assets_by_id[asset_id] + same_owner = (source_asset.owner_type == target_owner_type and _asset_owner_id(source_asset) == target_owner_id) + exclude = source_asset.id if operation == "move" and same_owner else None + normalized_target = normalize_logical_path(target_path) + resolution = conflict_resolution_map.get(normalized_target) + action = resolution.action if resolution else conflict_strategy + if resolution and resolution.new_path and action == "rename": + normalized_target = normalize_logical_path(resolution.new_path) + conflict = _active_asset_exists( + session, + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_id=target_owner_id, + path=normalized_target, + exclude_asset_id=exclude, + ) or normalized_target in reserved_targets + if conflict: + if action == "reject": + raise FileStorageError(f"Target file already exists: {normalized_target}") + if action == "skip": + skipped_asset_ids.add(asset_id) + continue + if action == "overwrite": + _soft_delete_conflicting_asset( + session, + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_id=target_owner_id, + path=normalized_target, + exclude_asset_id=exclude, + ) + elif action == "rename": + normalized_target = _next_available_logical_path( + session, + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_id=target_owner_id, + desired_path=normalized_target, + reserved_paths=reserved_targets, + exclude_asset_id=exclude, + ) + elif action == "rename": + normalized_target = _next_available_logical_path( + session, + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_id=target_owner_id, + desired_path=normalized_target, + reserved_paths=reserved_targets, + exclude_asset_id=exclude, + ) + reserved_targets.add(normalized_target) + resolved_targets[asset_id] = normalized_target + all_targets = resolved_targets + + copied_or_moved_files = 0 + copied_or_moved_folders = 0 + + # Create target folder hierarchy first, including selected folder roots and + # any nested folders/files parents. + same_owner_transfer = source_owner_type == target_owner_type and source_owner_id == target_owner_id + for target_path in folder_target_paths.values(): + path_to_create = target_path + if operation == "move" and same_owner_transfer: + parent = str(PurePosixPath(target_path).parent) + path_to_create = "" if parent == "." else parent + if path_to_create: + _ensure_target_folder_hierarchy(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, user_id=user_id, path=path_to_create) + copied_or_moved_folders += 1 + for target_path in all_targets.values(): + parent = str(PurePosixPath(target_path).parent) + if parent and parent != ".": + _ensure_target_folder_hierarchy(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, user_id=user_id, path=parent) + + if operation == "copy": + for asset_id, target_path in all_targets.items(): + _copy_asset_to_path(session, assets_by_id[asset_id], tenant_id=tenant_id, target_owner_type=target_owner_type, target_owner_id=target_owner_id, target_path=target_path, user_id=user_id) + copied_or_moved_files += 1 + return copied_or_moved_files, copied_or_moved_folders + + now = utcnow() + for asset_id, target_path in all_targets.items(): + asset = assets_by_id[asset_id] + same_owner = asset.owner_type == target_owner_type and _asset_owner_id(asset) == target_owner_id + if same_owner: + if normalize_logical_path(asset.display_path) == target_path: + continue + rename_asset(asset, new_path=target_path) + session.add(asset) + else: + _copy_asset_to_path(session, asset, tenant_id=tenant_id, target_owner_type=target_owner_type, target_owner_id=target_owner_id, target_path=target_path, user_id=user_id) + asset.deleted_at = now + session.add(asset) + copied_or_moved_files += 1 + + # Move/copy persisted folder records after files. For cross-owner moves, create + # target records and soft-delete source records. For same-owner moves, rename + # them in place. + for source_path, target_prefix in folder_target_paths.items(): + folder_rows = _folder_query_for_owner(session, tenant_id=tenant_id, owner_type=source_owner_type, owner_id=source_owner_id).filter( + FileFolder.deleted_at.is_(None), + or_(FileFolder.path == source_path, FileFolder.path.like(f"{source_path}/%")), + ).all() + for folder in folder_rows: + relative = "" if folder.path == source_path else folder.path[len(source_path) + 1 :] + new_path = normalize_folder(f"{target_prefix}/{relative}" if relative else target_prefix) + if operation == "move" and source_owner_type == target_owner_type and source_owner_id == target_owner_id: + folder.path = new_path + session.add(folder) + else: + _ensure_target_folder_hierarchy(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, user_id=user_id, path=new_path) + if operation == "move": + folder.deleted_at = now + session.add(folder) + return copied_or_moved_files, copied_or_moved_folders + + +def _rename_basename(name: str, *, mode: str, new_name: str | None = None, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + if mode == "direct": + cleaned = normalize_logical_path(new_name or "", fallback_filename="item") + if "/" in cleaned: + raise FileStorageError("Rename expects a name, not a path") + return cleaned + stem_path = PurePosixPath(name) + suffixes = "".join(stem_path.suffixes) + stem = name[: -len(suffixes)] if suffixes else name + if mode == "prefix": + return prefix + name + if mode == "suffix": + return f"{stem}{suffix}{suffixes}" + if mode == "replace": + return name.replace(find or "", replacement) if find else name + raise FileStorageError("Unsupported rename mode") + + +def _rename_path(path: str, *, mode: str, new_name: str | None = None, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + normalized = normalize_logical_path(path) + logical = PurePosixPath(normalized) + folder = "" if str(logical.parent) == "." else str(logical.parent) + next_name = _rename_basename(logical.name, mode=mode, new_name=new_name, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + return normalize_logical_path(f"{folder}/{next_name}" if folder else next_name) + + +def _rename_relative_recursive(relative_path: str, *, mode: str, new_name: str | None = None, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + parts = normalize_logical_path(relative_path).split("/") + return normalize_logical_path( + "/".join( + _rename_basename(part, mode=mode, new_name=new_name if len(parts) == 1 else None, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + for part in parts + if part + ) + ) + + +def build_rename_preview(asset: FileAsset, *, mode: str, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + return _rename_path(asset.display_path, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + + +def _collapse_folder_roots(folder_paths: list[str]) -> list[str]: + roots = sorted({normalize_folder(path) for path in folder_paths if normalize_folder(path)}, key=lambda item: (item.count("/"), item)) + collapsed: list[str] = [] + for path in roots: + if any(path == root or path.startswith(f"{root}/") for root in collapsed): + continue + collapsed.append(path) + return collapsed + + +def _path_under_root(path: str, root: str) -> bool: + normalized = normalize_logical_path(path) + return normalized == root or normalized.startswith(f"{root}/") + + +def _folder_new_path_for_root(path: str, root: str, new_root: str, *, recursive: bool, mode: str, find: str | None, replacement: str, prefix: str, suffix: str) -> str: + if path == root: + return normalize_folder(new_root) + relative = path[len(root) + 1 :] + if recursive: + relative = _rename_relative_recursive(relative, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + return normalize_folder(f"{new_root}/{relative}") + + +def _file_new_path_for_root(path: str, root: str, new_root: str, *, recursive: bool, mode: str, find: str | None, replacement: str, prefix: str, suffix: str) -> str: + relative = path[len(root) + 1 :] + if recursive: + relative = _rename_relative_recursive(relative, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + return normalize_logical_path(f"{new_root}/{relative}") + + +def rename_selection( + session: Session, + *, + tenant_id: str, + user_id: str, + file_ids: list[str], + folder_paths: list[str], + owner_type: str | None, + owner_id: str | None, + mode: str, + new_name: str | None = None, + find: str | None = None, + replacement: str = "", + prefix: str = "", + suffix: str = "", + recursive: bool = False, + dry_run: bool = True, + is_admin: bool = False, +) -> list[RenamePlanItem]: + mode = mode.lower().strip() + if mode not in {"direct", "prefix", "suffix", "replace"}: + raise FileStorageError("Unsupported rename mode") + selected_file_ids = list(dict.fromkeys(file_ids)) + selected_folder_roots = _collapse_folder_roots(folder_paths) + if not selected_file_ids and not selected_folder_roots: + raise FileStorageError("No files or folders selected") + if selected_folder_roots and (not owner_type or not owner_id): + raise FileStorageError("Folder rename requires an owner file space") + if mode == "direct" and (len(selected_file_ids) + len(selected_folder_roots)) != 1: + raise FileStorageError("Direct rename requires exactly one selected item") + + assets: dict[str, FileAsset] = {} + for file_id in selected_file_ids: + asset = get_asset_for_user(session, tenant_id=tenant_id, user_id=user_id, asset_id=file_id, require_write=True, is_admin=is_admin) + assets[asset.id] = asset + + owner_type_norm = owner_type.lower().strip() if owner_type else None + owner_id_norm = owner_id + folder_rows_by_path: dict[str, FileFolder] = {} + affected_folder_paths: set[str] = set() + if selected_folder_roots and owner_type_norm and owner_id_norm: + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm, user_id=user_id, is_admin=is_admin) + for root in selected_folder_roots: + rows = _folder_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm).filter( + FileFolder.deleted_at.is_(None), + or_(FileFolder.path == root, FileFolder.path.like(f"{root}/%")), + ).all() + if not rows and not _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{root}/%")).first(): + raise FileStorageError(f"Folder not found: {root}") + for row in rows: + folder_rows_by_path[row.path] = row + affected_folder_paths.add(row.path) + for asset in _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{root}/%")).all(): + assets[asset.id] = asset + + folder_plan: dict[str, str] = {} + root_target_paths: dict[str, str] = {} + for root in selected_folder_roots: + if mode == "direct": + root_parent = "" if str(PurePosixPath(root).parent) == "." else str(PurePosixPath(root).parent) + root_new_name = _rename_basename(PurePosixPath(root).name, mode=mode, new_name=new_name) + new_root = normalize_folder(f"{root_parent}/{root_new_name}" if root_parent else root_new_name) + else: + new_root = normalize_folder(_rename_path(root, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix)) + root_target_paths[root] = new_root + if root in affected_folder_paths: + folder_plan[root] = new_root + for path in sorted(affected_folder_paths, key=lambda item: item.count("/")): + if path != root and _path_under_root(path, root): + folder_plan[path] = _folder_new_path_for_root(path, root, new_root, recursive=recursive, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + + file_plan: dict[str, str] = {} + for asset in assets.values(): + matched_root = next((root for root in sorted(selected_folder_roots, key=len, reverse=True) if _path_under_root(asset.display_path, root)), None) + if matched_root: + root_new = root_target_paths.get(matched_root) + if not root_new: + continue + file_plan[asset.id] = _file_new_path_for_root(asset.display_path, matched_root, root_new, recursive=recursive, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + else: + file_plan[asset.id] = _rename_path(asset.display_path, mode=mode, new_name=new_name, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + + # Detect duplicate targets and existing active target conflicts before applying. + target_files: set[str] = set() + target_folders: set[str] = set() + affected_file_ids = set(file_plan) + for asset_id, target in file_plan.items(): + normalized = normalize_logical_path(target) + if normalized in target_files: + raise FileStorageError(f"Rename would create duplicate file path: {normalized}") + target_files.add(normalized) + asset = assets[asset_id] + if _active_asset_exists(session, tenant_id=tenant_id, owner_type=asset.owner_type, owner_id=_asset_owner_id(asset), path=normalized, exclude_asset_id=asset.id): + raise FileStorageError(f"Target file already exists: {normalized}") + if _active_folder_exists(session, tenant_id=tenant_id, owner_type=asset.owner_type, owner_id=_asset_owner_id(asset), path=normalized): + raise FileStorageError(f"Target path is already a folder: {normalized}") + if selected_folder_roots and owner_type_norm and owner_id_norm: + for old_path, target in folder_plan.items(): + normalized = normalize_folder(target) + if normalized in target_folders: + raise FileStorageError(f"Rename would create duplicate folder path: {normalized}") + target_folders.add(normalized) + if _active_folder_exists(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm, path=normalized, exclude_paths=set(folder_plan.keys())): + raise FileStorageError(f"Target folder already exists: {normalized}") + if _active_asset_exists(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm, path=normalized): + raise FileStorageError(f"Target path is already a file: {normalized}") + + plan: list[RenamePlanItem] = [] + for old_path, new_path in sorted(folder_plan.items()): + plan.append(RenamePlanItem(kind="folder", id=old_path, old_path=old_path, new_path=new_path)) + for asset_id, new_path in file_plan.items(): + asset = assets[asset_id] + plan.append(RenamePlanItem(kind="file", id=asset.id, old_path=asset.display_path, new_path=new_path)) + + if dry_run: + return plan + + for old_path, new_path in folder_plan.items(): + folder = folder_rows_by_path.get(old_path) + if folder: + folder.path = normalize_folder(new_path) + session.add(folder) + for asset_id, new_path in file_plan.items(): + rename_asset(assets[asset_id], new_path=new_path) + session.add(assets[asset_id]) + return plan diff --git a/server/multimailer-dev.db b/server/multimailer-dev.db index 332338a7f9d571facb58719ee9a1d7aa1dafbeee..2fda82b7047153667b1d3f2477f2d1f403b90dba 100644 GIT binary patch delta 20459 zcmeHv3zQVqxo%fiKkC)B8DJPdfdPicz*JAYA0v+$iNOb`NJa!fsj99rD9Vh0IU3D8 zNDw1gf~foW$caR9H71HCkrus)qInz>i4tSv7!whXF^L9ZqIgu4`|s-R>FxogZ@g!% zyUq%0sNLOr*WUkr{g3be_wL;{v$uElrqQY9r(&_#;mwV+;>}O>Eq`b^HFCD7*ln8C zW*AvRmo*))pG{qY%D-ujA=^9Jse_Lqtx;Cm6unKhvZ~=|`Hw!JQa@%I(HT%@>@XG{ zkI#yuGrnKf9!K=|`mOweXX5wEjK#%b)GStbC{3RqpTMqULv}g4g#8?QCVMJ-GCPAc zS(%;2HnC&bk?c@bfcH0~dXjW3PR)v*Go$B>=y_7~Y>S>w^t7X=6+O-9X+%#wdTP;A zjh;&Ml%r>B^vp(2DSEa<&*{-~S~7n-`g&`Gw)|{R$4c9cvKIi3!^>w+V;D{h22-Z;G>! zj<$>SnQ@tJv<&gWZgHpZMYK0_s(6vO39T2-f!?1e;I8ZVL{DhzzG7;Dqy{-%(sQ0J zd9vzCre@fttOYsEvKyw-os=uvcIc|Ul(P+A(%qaaIYCa5Oi%N5BXpdcZ;c=YW!3Nv zBWFvFn^Ps-Q%uPV9a-{q%L#2q^<_V-Cj~7(7minQjsk@OXxpdX#&H{ZLrV*Nb-vJB1_s*32oWRb0l+$y_fs@UOFZ?mGT_ zK@$H2o$O=rGk;DcdptLU+AJr~EiaTkP0@0mYUqX<20>`6PU!1_E1SA1dzNf^p<*bZ z6>4grc#dy{R>LWYz6_-+mTcIr>*$UaxRw`~o@FYE9q6F~Yg*A2-!(%|R@_i_WLI_^ z&9f{`4tqky@?<$25mf^(Yk}c;x@(!1>Da2~*iJ6=EI06#K(hiDsIf!O4gy0C<(#Fc zvg#S3Z@@_Vo>L#y61Zvrqi%${=URrJgL*>W)MdvDbFOc@rmP2{rnn|dMZK=-t^xC6 zD!LgOImOp>ZFnAr239ew9-MAuT8nGqR(@*!syq0v(t|U;O){Q_jDJqWf7?`KY?zAl z6ee?X=8DXsj16-V&tT^H0AnZWXo-anKe|^vW?+`v-3Ze((Oal2JuHRbQryhet@>4dl83{ zC?B5`UQb3{*Ci()+uIi=CaFeVLAqSl_5| z9UX*`9EXqK?%~{NJ^weG=2&}XV7Lq8SutQ0IkMI!M>&>eD7Gpn z)AOgRjRPeN)v#pw*zcN-D&vQ}^sA!UX2?VwnwoCO4&EH5&%;zMJ*FbVuq`X!l1sN7 z9?Biye$3s*b@3-tH}Q}2x9}_RdCQq|@#D*xz4$XOGl^=$`+TN3|GLY3jVb(M<93&6 zrmRAu7ZNQhM!j43<`j}yg=9t{IVno=!&Wi}=uwG-vGZfk$C7T+N@miXsRvWnrk16& z6pNWIX8iag@!j#e;#bDcicg|Gp!QIAQ`b-nspi=8$(NH4C9h9U$8BAVmr8adM(`_&eP|twJPYGc~G^$c3b}kYuAI|7DjAflH8AoA6DSFiY_B zq!P1G-Y6t0NkG^hiBkNp7c;#HkfUdBscWA%@cJZbX0g~I+_9FKn!o!C%rb^rjFY|0 zq8R9iFM8P`1DG;@I?X+Au3&+sHKumedmWi=-oxlmSpN+BgdCdfGz zDgk)}qBvB1C}~-eZ>zHI+aS~om&%foO8%sN=BW(5lwxpt6Qc+eL#-DxnO8GAsTc9A zO^o?jN@dSsr!q&F-!R{0)-g*Mg<;Y!$NS@n^p5n#v`0OVw$mu}TI$i%jj0u>*{Ko9 zza*bb-bO7=b|+6wj!V3g_(ftXetZ*iYN5xb>AAia`jV{JmZaOBFWHLaOO9^1pd=W9 z>RZ%ILY)oL$Z?72cXJ_(jjc$I?OBrM+Loi`OdDh~HG`CNa)GSbzAu3^gOZ-+N}g&3 z5{v|V20c;NJ<1{_%^pk1d77t5nw--lvWq1mSYiB?oGqKC9XJ7HMwP0nVaNukFq#2V z6RL(}o2o43TvrKTv_WdClpIy+xQYg9f}{s3OpOIvkfX>djJl@ip%Eyd=Tohuq@BBY z0Y4$%awGHn%v9pLv8yNnr=MkJPe^?^H6MoVqvRjxZ_$_0R{WLtz46uYCh9>-NCl~E z@@M$mXBjn4O~)IaWtI;~ekBO02ANdV)I@@$f{Ha*R z%8V4=6J8V^748tO5iSsB3ylKB@8_SS)A2vXpTf*OW=H<_`V;Gh2* z*1vKEH#Yy7SD9H!vZGGe+|iJSGLz{-Bh2&67An8yFU+Qsse5r!W)}~o57P(eee|#B zAJaSNd+6KfjranYy?<7!H+4zs+*C)ZCDo9M!;;$zi|+p9t;sJZS0$GuPf5zj6OyUK zTZxwvKTCWs@%6-p#OlNaiPI8#VqAj5OBME;acUC`%ti_yP*|adnw6o)#C`YDE_cO6 zmr*mSzMWL{t*z>tQ}xZR`es#q)2qH|!)b{swRa&tV6*A`zg^0Dar}zS4yVq~-)yog zo70)Ku`Sf)vHWM3vm>Yx)aB^U=-22+=pJ+4XYqu*To;83VAaNZ|HwWj zOy)o2f6ae~@8=iuGN0yt&ppI#gdlY!iO*3T7KM zr@$&3)-q30`7hkTer+PP5GNmJTgP(e2&ZvV*^h*A{2{8HYT%!yR?yed%V?eE*k9w* zA7_{4w?EGAf~rHP+M^20#4~cH6Bv%9=!OitS$2Vay1-kArcI>2t*g{5vhzIIw?UW! z_fX^j_NX69jzzR1&o^{kag@+dDW_Ock)ePj8({6|mIfQybydj!?jJzGknU(U;T6Tx*$o7k5S{JtvKxKOH(5{JE{Dl zVX^vpP*YAA)tvv1%KdwiS#jAFt5*2IYTUjt?cn=t?z9#n|78Q%r7X*eG%b7HyoKjY zXlb0_xvPTN(AGF%;I+EjxM2;`rnMgHB|IK;U&41e+_LHOuWSrgtPEB+wl=N_ zReXCckz1$B-ZI`;2xfcbwwzaE+%iFNq8gCj-3Us+^K3~-s(Aba&-Ic5JbK1DJ zIKJ?9uHh75d0-hjmLqjr>Se1hTV=XFd3&Yn1uMzx9c$LO{^u_ZE?d*6d}2TFf}e;J z3DLnYcz?~6T@i86Wdyb(L=zCM=2_?OZ~p7bRBXbQgD}kerzullU<4{lpgdrMHWl8c#e2 z)AmCeFI&rv1F;WGY#gWzQDWc)S8&agTKFaxlGZ|!jgtJ;Yq<>^t)(Bv>0WN!*!07h zEt!im?U@n6o5B;qtwI-L3Ukx@g)#g)jF6wv%dJTl7Iy#|one_E#sc3U@=paJsfUuH z$c}A=hT*vmbrM;KU}y(^U~7`=IIs>?P!K#_mnF>!0?Tl7mZK_^PD+NVsp|n4(Y_59 zx~}D5#j1ua!MauhFbqN`m!q_(&PgC}!NLtC&xb~J2jr=j10m;utb^rkSQ@A%Y992d zZHwpy@f|q5kL$xr?qnyxXbExL`x*?)Ec~DYx)h9BA)Qf7Pb#J$dZK)%n1b+$zFWoA zET%>=)r+ZCOx0ql6jQmFwieTDl;-bwja$kclgM`pL@q888KppElmd}a3PeUJ5E-RF zWRwDtQ3^yxDG(W@KxC8xkx>dnMkx>(r9fnq0+CS)L_(Us=N)e8=+QhS#Ka$D=J3De zE@l@p^{M-k^Xb>(Ybaq$UHbv8Ki6*wVBW-LkLm1N-O*8l>Vsd|mOcj`*vUP7L1FEy zZL*dHltb6F0pntDj@^H0zZo^uAw>*g$T9lc`cMooHHTdrrtR7B-1FsqSHuyV zHpR*smZfXu6pPx^1)gKF-7zx*nnWG*@m6-ecui32+&o*g3TG8EdjC$pswqH>>-w) zD{Hdt1qJ}5=5X9~r7$%=b|(La_~`o72Qlsv{vkAp9WP8oTZGN257IC2bJHvGqucr4 zrSJ=L`RQ{2c9soS4=e{Xvk=5vP6h4G0ioykj;Xi`ps*m=v_J*pP?tRqAXCSOF$qiy z%6hH8sK?_^gNj_Riw)2lbtUw@AZO<4byc4+wfEq;BZ-U zjSvh+Bb2ifhes9V=T)B zLJX~EyJE5Y(q4XZy5OQPd>eF5RRtRW^cr2YVMT=jI23>p1Begq2s>1XLm>yMyl(}n zWcUtI>;qGB0X>nx>)@Iyfc^lbE2N+*0|uV+z)Z*)CaCp#xEfTP}c#jcLB=qw46)Y2BpsT1IZ6m04^L@$fhZgg1%=szN!a35@1N69tOS)b_$q6 zs;2;_cO4^_^IAwr%Lkm_34usM2Awab8xmxKA}1RLD26VeLe6wj5NNOgV980gs}P7u z%|Su1e0*O~eUPWQTxd=s1%uFXz?%Uuho<_5m&*KsgRdOcf?eIqp|P+-^dA13BSxtMCS?|Zh3&u3c|g@^}Vu-jx{|C+7X>d3*_a+?k? zw4F6U8X2b>Eyu_Quce-KEKM`4X~)P`+6*P@fOIj{QH1{K4Q}I&{Fy^z4T#P9;)l6W-=b^^6$l&ON0^p zQ~c-n7Cynf#Qgy5{~UKR*T{avzQjI~{$Bb;HqPF_UckS>O8gB>oKcylnVa)1`}j3! zcM82Q@=Oq;RwZVw}0m><+6_rz3X`d zUR9l2&djt-qZbckMkEy*9Vyd6Iivdk-;_DA>L+NRDKq`3*V6@>r{PBr^W*cs{S&{H z8pl5#8y3-DepFQjwV#0RJOi=GF{nGn;vw(z!|_WW@k29mRsB$IB<|TSrtn|h;K$*1 z7P0x6Z}K_;5MvXz)U}UGOr{r5c>HVP1pLTB{`3hdU^u8-xRw1o+s7L0BJ`-x&Hh3> zD4fRLCGHlcu*2}kzw&m1d6Ib}fA&7!WzMdi&`&?{c=t#AcuL0I2l?>{z*h@1hz8AI zlnq<1`!H-PGHp}~`8)-nRPrKK9#21>zx6PG2~DdgeRG;7yzF)nrxE?$EklIFiA21i zWz1ddSY8vFGvmOK&htx|slqmPE*?2VXlwzI_Z4m?7qYi9PqSU@8{$PmjD12l1j6hb zVK?(m`Uw95bCp;Rf^jcYP8j8Ne6q(-9n%0qJ9IT*w52k6=XN3doFS|q?U z*92G@0C8}N1F&z}Il!)gKNGp2nWkd_dL0QccuVtS$B;mw0U_c7((NfA?!c*_gV!mi z+Oii3u)tOnrzZqa7kD-p`8xRJZE$x>ssRw1Z_BnHSdjn&zqf7rCWtl-kZm2%2FdY2 z(wPo;!$Cr+Ca}p!h=AafJp;rRpx*>O11!P>#V`QMp*BY*fLbI(bYQuG?Maup;7o@q z0l~91kj#J``K|$=gJqhL0E4!{DFRRhxY(ge4djxmIVJ=G1Oc#p!wPe9B*1hq1YFMp z+3&hU!bALl6Ii+g(39ombQk2mGcDgWO1LRCG>Kj(d*(f&?V`8Po9K0jocJ}0zuSV} zY!ycSbGHAd%(erwy+lp#j)AWIf~YZPp!=Bd+#6^y4~n>O68jV88~p6d%iJ!A6ZtK7 zX?nM~EaM6{pd|Bm4l`@{AMz_imK~FMNO(14i4Tg`vIm8Lc}l1gc5{CC1q>q=l@pKJ_#jP-RfNH{g#8RO4StrDbk2e7yXc6wqqcU|)9!s_(3=uxy**qPoP z?_~3dU+}M|K8WAW%}FlGT+e(X{tEX@^3}{K%wqP#WS#h@%pT@7`aQlr)hym6E@QhB zU!lKGUrJBmx27HvjxcyvFEjP~jqOiR|8MXMHHu^V)cA#hzj2Rn3Q-cj4M}Su$rcg` z@3{|tY{9MDApv!e6q$wr@Hizu&U@cg6te z3{G0*;uTZ@oCe+@EtDW?0@O*+=_EU+Xb>9#@gs(&c#1;6&`Ee} zBWn7DR#nro?J5wx)GEnwtiB$qqE&Fb0VD1S)y}vVbErER01h zynhf?PQ_R#hG9G4=q;n`C8{6?>_~z`(P(`PcKdp?9cJxc5RM{QYnZH%juhDAkDnT! zLU*z?bUBR35#Ni6$@kE=(E_074RQLS6s(|FO;ArX5^1{d2HtgsFe&<_C4O#H2V3)K z-ZCVfNEHSqKjdus193}ZdtZdp;QNn2B*oEL5dMqNg>+Lg8Wp^CFWZ!i`W<~F^>H+M zF^1koucH@1cYYe-7iY7R#eo@EPdILQ2By}s3rJg0H?^p#TDmvT-rmZN0ak+VZ;hV| zH2$p+llV8F@jsDB@E4jq;xrv&zqwrA(UqYdn3UKWLlB;Z#)*xY$wDhHaVC2TAjWm3WMPB>r6N`7L$r+tt1+5a9E`07i9o)PY+C;)QhxG>}xV*8tjvkShq`^`*d3 zRDg$b5a)~CSJRVl*CN!MT%B`QM;pAg2Lc>bh`Q9X;0~}=<%?x!QWX5C>SXKN25kJY zkEf7Rd<)`&)&4kJAV!IZ0lW|lLhu$tkOLri5ZEBOE&!ssWkW=APK6C(*me%0nGJmF zOmqtVMpon)06Pk}2{0bHr+}C_Sw)8)$W>%P=)K{z>5ylGrB-$cfk$EBEkiR6xi2Rc z$ZGD4!5HZQ_@!ltZG$*1fJp)82fHi?q|gPl)AJ$h3Id%AWSRKh_r=ddbR|e+LKJ?0 z_nnKxB@mwqeiEA;5dau|fCWU2mvdC5FPEuo+%y9Nu;82oBLia+08_MO06Dds41j3P zhL|$qA)SPKC2{Jp8g3~x43Vh1Lp*LJ-c#AI=4ghKUK52F0s%rL0ig`e1{i(`ya-@p zL(BmHwP7d$FP2@!gHRRR*T_!(*!H30APQ79t62(oY(K7j)9B0bmF+?-ovy&}N)Sj6 zxG%(fOCAKBN@fV*<1$Dp&&74;qWQ<_bD>R$2L>a((q}uU&!*Ow<0{*9bvICL9gJ_A zID?&i>5-ZL!47RBNpMMAT<6CN9j;W*o zqSs+o0sl8mg+x?Z3tYr&}q+ku(&C8I2(YEOM#C>6C z0x4O;LbiBPu!hyAa-`q@pX~VrM#++rH7qBOAq8t#0J$_NXevJV)?h~(pfKi7oGt!w z2n|z)(|=8kD^3jYo;>kk>f|_$&{YKGCmEkZ5u}g%U*`Ma_Z@&xlAkvryaNemj6)M+ zNc=E{9zi#u7eFX~1Fc0v&{^V#Xa@37b6)Hfe<$_~rJDz!=ZNR4{Lw!#Xk!dq3`b{( zNB<-o{S$ZePni+4LfSQ=;I^pY=s~b|2L8u5dK3kc5&lpeJ*Aq&G=A^~apB-GBF>N} zGHEhK)q@vh+{d@aoBCly4pC?`+K8@2*Pt(eKJpw`fWL#|2{)tjkb~+)I4;%xtVqsK zAbpTPMuWXM10pUGj4aV1MB=Qe$7en!HjgIX`51X}=x{6{yvhViP?rfOLW0&CPA;B2L!Eyx^!*YIeF2*TPTaRr(MP_dc6 zkw;4@yo^U2zUoC$z=;g1gEImu$W##MaGnc%c@s0pqY5pIz;}pfr~v*i2%9kt$Fb!6 zBO;nhoj|55=ESIH#1+D?_>^bq5Vsh%I$zHPftN-=S8-pVKvNOPE%INlTOz40!2MY>X)flZuAjVQ4zC-R` zGP3yE;<>R%8N|{?z6n3>pssQi#50B;eN|ittE1BMNn}t1{#F~RYaGac*ap?;pLb%3 znuW1Q(&3lqf!+9+Qduo=V&T*0qltLq5it^I4fp{S)s2G6AT|@cv#@?3?&_S;#mc(z ziz>QZta95`c-HWT`^32gnht5vGA)UI2vjulYnGvnaT418S=hcOQ+$8>38lVcX~v)- z8;M_@fx1tC7}3a43&)qN{$-iuYwY!L%rONIu>{5^bBN zT98-l_6SXQ`IF+rOx38DJ(jq2JbGwpal@M$oIb49L&`a}x~^k-k=lhNYHRS);O^7e zNlmpFb@&Q4;v)Bh9An+AX$6AjE-n#NgEt7`b+}1QwSdgn zoefKT`@f5025qM4KNBCF48|I4MBQ{CaJ+iZ$~lIX-xi>y+~|z>!x)=jV?tf#GVZR_ zc6M?4UGb;Mvl1+f?8aO1HAL&J+Plt1dsnnoJO`r z<^m!yAwB})A2f)rsEjbH$gv=Luz#6S(IsLw*Wdst8wx=!p3fR&!Y3EOl!0_&mRAOp zRN223T;7zyG#||Wqc9X(v+e#HULvB0CUhfVa*@78!3?J2r%pDj+6~q8EZpA;v|A&2ULek=|8A`oMVCz`x5jt5Snl zFpnDVLc|J@tDQX+F-=DGkVo-Ok-jI}dO1iHjNgGsC{2@Pr+>+`Vi!uvMlC$FgglMP zb7P{V(^3PAc1m4|LTHHY0v)Ibx*FVpBEyBVvk=@?-k_udB@IBa_5OL&iya7xYE3Ot zsnmgQzb~HBQiHscD`~%k@XR5A!9^-N93C_(RZBu2;7mpqj#a?9Bv1(paY_A_TqJaE zk(1Qm5GIkw1*D;=mQWD9=Oca${``KigODiBC>sT;V^@I@im=7_GF`@Z{z{yC za_cWXZ742r=(JV}OMsy&`9p=UgdEVVl)-~{eEElW-Oo(!sX?l$H5g+NvptS|RUX~4 z3C5K}^YB&}yN-$fs1;T+!lI$Ouq<~06Rbi&t8>hChFsJF=a#g<8a6=0TbpXwS3d69 z1)GF93q5;5c`LxN!YWQvnWHG{=Cl?`@=GMuATaT+A2E|B))3E8)=Q68>qlZxPIq1~ zAV)1g=Bs#4WsV}RGqQsRuLj9Tu1=WLRExwNAob5K2$Zt4)CPgtRXN%^LmEt#dwk6( z(j9B_yOtFDKRX4gKuQp(T?ZFKrTiGsrbv!KSRc&;>)R-N#1p1>H4hh0G*n+aQN!gq zg^MRX?FBRgbb_O_`WleR90-#px@{J`x2miSzIfuo(v=(0B}|HDmwO+btSY_>?f~0Z z$B!uWv?ID=qlVjm$cjF#=O4Z#r+7zBWq+z~G#SV#^`m%o2>8~@T0rqmsq(ENR@KHJpqSRKm{yKOvUCSIjRB&#AZefl@4Ye|3VG2_l|w|t#EQ0jy~vS*_Ctei? zt`$4FtEC)rs?dp=ES~1A;g*7dfi0ir1%nNQ6&y=Cs&Azn(l@)ZZ_v>3E;+z&&EXc} cgF88f9C^KadTE}(Mu8}FU@u3vR?N}=4_|!i?f?J) delta 4132 zcmcIneOOf2nZM_L-+RtI2LuKr5k>@s1aXFe;ln6HFu|m0q)8z$LJBcxu|x-z`e^O5 zqr0LeHjWbW`jMnDq0O$fQ7dv3O#DheQgxBKjXbehlp2$Y60O1X!N{J$B-^!5_m6#c z=Z`yc&U@b9`+3jrolvMK)H5lZsHZKFBQ~sI9oN7xF8gI?tV<CsnOyWIgd;+=oib2UYP zgYl_8?n^kp={)HR{UiBK!CMi5V|*n4Z|yj(V7QHlohAqJ#~ThmG~Or~aZe2Glh{pm zgSE09Y#Xzg#w0pO2k2GW$GX`*_5!PC-(*>g3DtKg?|alph;~`>UWv{pf5JiQrn$<9 z)m~RK<-Ov>YclM8#^|M@y8c3sKg*MwpX2kxx>?4PfSxr*{g=S+^?9;+=CelTO_)<{ zR*XO|;e+^PT#xU!uh{R|o9!SRtTwx$w$V&O*Pv&nnZYmCm_IUyccAk7=06Q@q8qoX zIxzeN{zh{;OsO%`cuc){MKMsk^ptd2g2bRXI{LDFTz*qtEKifu&>-qUZ$V|yeCA%G z!WeI;`f2?a`f9ye&(o=PLEEo2Yjd<>EkW%^>rovFpm=MP1YHg0*vKP88!4J2+#DTP z15Trv&N~}Sw*u$GRyuFpY#l*xag)`8e9*N>2;W*{R+*>*w2+wwnIZELSQ0XmU~|ZH z>#~FXggT%-WX_P$7?H$e#Qn?7e|MA$Sp_3(dUfhJz0N+E5;py?G;Ahi8Tkg&FX$cm zdc96B(p}nR?TEHnYtW`@$?9k7QFSXigLb3Us2ceZfrDXlYO_0VQ_WDy8Fs-)p!b^Y;a~jRC9@f!yxWWR z56p)xb%rulg7hA<&Fy45g!GYPWII_y>d7M{lMvj8kFi3UY1LaL{A`bT35gi0V8KE= ziBIV@Cu=G`BHgvFIG(@yu{l;z#hc$O(zh0YSSRs?m(B3~XgO#eYdQoy*4t%Ty;h+G zw0pEz^_Kdv`a5;M`cpNmE>WLVA64_!6qP6gN{{kerB!)NS*6TZDwPKnkCLdU@)fyT zJ}hsSUzS7i9QoVwWI0_P0hM0s4O!hJjBZfjiq|51tKX{5ctGTJ8(Jl?OY8%7h;3)U zmb2&BldMF%NF$j_`{{@DmvkrHLYrs2`273)W1&@|8vS`%F4A(D&-B0Y_dp*m%yQl;c5HavFFsxo?&(^k6`=5HUg z4jZye1%@s2rN6aS*J|=cN!}4*`AIj)JJob}a1QVvmMAu)hmW<=a*6lA7m&HFdM+x@XnZ*DgrO zbf+ws*EoM>ZAzg#C9;BL8|-nspuqOa5IAe8{D=41Bb9OgjTihmxw*B^)-9}?JNvns zxs5gT^J{0;1?y_(E`&qhv9hAY;q1tb}3Mza}^4=i=6-K1-sI|Q?NfG_{^?M)Ddl!XsbBCjq=tcT!y~lple@pP_%UQ zYYr0Zvx8wi!h_cnVYLE+i-{nGN%$k#6=R%|?C;?{EX^8C#;|6x&Nzijc~StM(Z$&w z7<>@l%jKz9Ht%GXuPMhBVKA~l@KvK=tru8_(uguu)oClyxVi~(~ec0=|~oF#M@6UbpoIiPp3 z3wB(f8ZUVV$IEJhh#<0=1WOO#Rd-q&27Q0n8V!r96GzY@|w&+K>zp%#)*n{3mBKhgCogHx~Z~05wUcGlWMJ(LH`62_kV$1Ah4%qSf!F^##7YC zer{i4({Y?#LMB?j#6`{@?L%U;I&D8E#;Y2#mT5NNcW@oGoIRx1$)mgIN~@pLl4!Ee z9?72$ka>~RuR&UdT5gGcs%5D>_z-!?C5-%kw(E!pyvUl5HPQ1#qOkBQKe&TeQ7jT@OLEofRRs-Iu z?;%4b915A~B7YC7Ant(&c+^kGb$hp3eRJ9y!C+g^K|5otdPz4MC98?L?b%KSt}z=$ zAT=iV@1ZwTGE+)iSL}l13G^I{9Zv^&*1Z%Vl_g8RYK{&Zh6$PUa~PdPZwVro&WWuv z>w{{PE(USdBt}szuD>dlBM@&;*iocuJgSIZkGg%JTL_yXVuo)Yh+GQb8`8Uk{HVk} zXZ`FWc82{2J1XRy4Zp2$avt>xdGGBB_!_&+K4!h*M(bqIu~yLGWk%v{o$ZoXJ3Ga$ zb9acIq0R2VuVB|&`uIPxEG`J}F(YDGxc>K^4a;}n&Jl|wofSxIo$#xbZ4l!@f$%Mj z4UgwQ-^C_8dVprA?Bwmltvf*Px7kf8a>0rZKXZVN^CX&Tw?wx%e%xi(TZZw5K1uD? zDoCESSb)VL-hG|cB2}wE)5W?Rf$>**F|v6*7OoIx!>0Esf%6uN0guKE)tlqZZ4);( z`daG1Ld>GzRvxo7uP4u+$Jb+4i6E#j7pM-4hAUGeqC7F{^Dz#4=5JMBp=<65D=MkS3!?xU= zY^{)pR3fQyPNlZQeosGXW?IkFcDa+Jpw&`Ib97(~US2Yam6ykrOXZKlL=W2n@^qE} zkupHjH`o{$cZ;S%*v}Fak2f23&0((iY;R6>{_>K$ztVeLDwm>*i{X!tuv8e~V?kJU z3nxPL5G{*n#skR_0gu0nS@GE(F-I&fNxV~at6Qj^>@rf}U^-2QpXZ1r`h<>wFM4R{ zFo^2S&B=%2+03PR{rR2(zThu(7UG9LrB5MVHi=C~{KiD~1d1i2Bu!eLm++;D(o*3a zyy#^B4XLaU&i3ApCm!C(7G9k#j+DWd@6G0KO%@E|G`8S^Kw2jG;gjiFRv}YP!txbM7l1~ wc4r}&W$a^qy^Mtj59P8oa*QLSy4E7m`^dl8$E;YRQsdq-yiVgSwXD+qPe0YcTmS$7 diff --git a/server/requirements-dev.txt b/server/requirements-dev.txt new file mode 100644 index 0000000..7171e02 --- /dev/null +++ b/server/requirements-dev.txt @@ -0,0 +1,4 @@ +-r requirements.txt + +# FastAPI/Starlette TestClient transport used by the stdlib unittest smoke suite. +httpx==0.28.1 diff --git a/server/tests/__init__.py b/server/tests/__init__.py new file mode 100644 index 0000000..b314cf4 --- /dev/null +++ b/server/tests/__init__.py @@ -0,0 +1 @@ +"""Runtime and API contract tests for Multi Seal Mail.""" diff --git a/server/tests/test_api_smoke.py b/server/tests/test_api_smoke.py new file mode 100644 index 0000000..356016f --- /dev/null +++ b/server/tests/test_api_smoke.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +import io +import json +import os +import shutil +import tempfile +import unittest +import zipfile +from pathlib import Path + +# Settings are instantiated while importing the application. Configure an +# isolated test database and local storage before importing app modules. +_TEST_ROOT = Path(tempfile.mkdtemp(prefix="multi-seal-mail-tests-")) +os.environ["APP_ENV"] = "test" +os.environ["DATABASE_URL"] = f"sqlite:///{_TEST_ROOT / 'test.db'}" +os.environ["FILE_STORAGE_BACKEND"] = "local" +os.environ["FILE_STORAGE_LOCAL_ROOT"] = str(_TEST_ROOT / "files") +os.environ["MOCK_MAILBOX_DIR"] = str(_TEST_ROOT / "mock-mailbox") +os.environ["DEV_BOOTSTRAP_ENABLED"] = "false" + +from fastapi.testclient import TestClient + +from app.db.base import Base +from app.db.bootstrap import bootstrap_dev_data +from app.db.session import SessionLocal, engine +from app.main import app + + +class ApiSmokeTests(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.client = TestClient(app) + + @classmethod + def tearDownClass(cls) -> None: + cls.client.close() + engine.dispose() + shutil.rmtree(_TEST_ROOT, ignore_errors=True) + + def setUp(self) -> None: + Base.metadata.drop_all(bind=engine) + Base.metadata.create_all(bind=engine) + shutil.rmtree(_TEST_ROOT / "files", ignore_errors=True) + shutil.rmtree(_TEST_ROOT / "mock-mailbox", ignore_errors=True) + with SessionLocal() as session: + bootstrap_dev_data( + session, + api_key_secret="test-api-key", + user_password="test-admin", + ) + + def _login(self) -> tuple[dict[str, str], dict[str, object]]: + response = self.client.post( + "/api/v1/auth/login", + json={"email": "admin@example.local", "password": "test-admin"}, + ) + self.assertEqual(response.status_code, 200, response.text) + payload = response.json() + return {"Authorization": f"Bearer {payload['access_token']}"}, payload + + def test_managed_file_runtime_flow(self) -> None: + headers, login = self._login() + user_id = login["user"]["id"] + + spaces = self.client.get("/api/v1/files/spaces", headers=headers) + self.assertEqual(spaces.status_code, 200, spaces.text) + self.assertEqual(spaces.json()["spaces"][0]["owner_id"], user_id) + + folder = self.client.post( + "/api/v1/files/folders", + headers=headers, + json={"owner_type": "user", "owner_id": user_id, "path": "inbox"}, + ) + self.assertEqual(folder.status_code, 200, folder.text) + + first = self.client.post( + "/api/v1/files/upload", + headers=headers, + data={"owner_type": "user", "owner_id": user_id, "path": "inbox"}, + files=[("files", ("report.txt", b"first report", ""))], + ) + self.assertEqual(first.status_code, 200, first.text) + first_file = first.json()["files"][0] + self.assertEqual(first_file["display_path"], "inbox/report.txt") + self.assertEqual(first_file["content_type"], "text/plain") + + # Exercises request-model conversion to FileConflictResolution and the + # explicit rename path rather than silently overwriting an asset. + second = self.client.post( + "/api/v1/files/upload", + headers=headers, + data={ + "owner_type": "user", + "owner_id": user_id, + "path": "inbox", + "conflict_resolutions_json": json.dumps( + [ + { + "target_path": "inbox/report.txt", + "action": "rename", + "new_path": "inbox/report-copy.txt", + } + ] + ), + }, + files=[("files", ("report.txt", b"second report", "text/plain"))], + ) + self.assertEqual(second.status_code, 200, second.text) + second_file = second.json()["files"][0] + self.assertEqual(second_file["display_path"], "inbox/report-copy.txt") + + listing = self.client.get( + "/api/v1/files", + headers=headers, + params={"owner_type": "user", "owner_id": user_id, "path_prefix": "inbox"}, + ) + self.assertEqual(listing.status_code, 200, listing.text) + self.assertEqual(len(listing.json()["files"]), 2) + + resolved = self.client.post( + "/api/v1/files/resolve-patterns", + headers=headers, + json={ + "patterns": ["**/*.txt"], + "owner_type": "user", + "owner_id": user_id, + "include_unmatched": True, + }, + ) + self.assertEqual(resolved.status_code, 200, resolved.text) + self.assertEqual(len(resolved.json()["patterns"][0]["matches"]), 2) + self.assertEqual(resolved.json()["unmatched"], []) + + # Exercises RenamePlanItem construction without mutating persisted paths. + rename_preview = self.client.post( + "/api/v1/files/bulk-rename", + headers=headers, + json={ + "file_ids": [first_file["id"]], + "mode": "prefix", + "prefix": "archived-", + "dry_run": True, + }, + ) + self.assertEqual(rename_preview.status_code, 200, rename_preview.text) + self.assertEqual(rename_preview.json()["items"][0]["new_path"], "inbox/archived-report.txt") + + download = self.client.get(f"/api/v1/files/{first_file['id']}/download", headers=headers) + self.assertEqual(download.status_code, 200, download.text) + self.assertEqual(download.content, b"first report") + + archive = self.client.post( + "/api/v1/files/archive.zip", + headers=headers, + json={"file_ids": [first_file["id"], second_file["id"]], "filename": "reports.zip"}, + ) + self.assertEqual(archive.status_code, 200, archive.text) + with zipfile.ZipFile(io.BytesIO(archive.content)) as bundle: + self.assertEqual(sorted(bundle.namelist()), ["inbox/report-copy.txt", "inbox/report.txt"]) + self.assertEqual(bundle.read("inbox/report.txt"), b"first report") + + def test_campaign_create_validate_build_and_mock_send(self) -> None: + headers, _ = self._login() + campaign_json = { + "version": "1.0", + "campaign": {"id": "api-smoke", "name": "API smoke campaign", "mode": "test"}, + "fields": [{"name": "first_name", "type": "string", "required": True}], + "global_values": {}, + "server": { + "smtp": { + "host": "smtp.example.invalid", + "port": 587, + "username": "sender@example.org", + "password": "test-secret", + "security": "starttls", + }, + "imap": { + "enabled": True, + "host": "imap.example.invalid", + "port": 993, + "username": "sender@example.org", + "password": "test-secret", + "security": "tls", + }, + }, + "recipients": { + "from": {"email": "sender@example.org", "name": "Sender", "type": "to"}, + "allow_individual_to": True, + }, + "template": { + "subject": "Hello ${local::first_name}", + "text": "Hello ${local::first_name}, this is a smoke test.", + }, + "attachments": {"base_path": ".", "global": [], "allow_individual": False}, + "entries": { + "inline": [ + { + "id": "recipient-1", + "to": [{"email": "recipient@example.org", "name": "Recipient", "type": "to"}], + "fields": {"first_name": "Ada"}, + } + ] + }, + "validation_policy": {"missing_email": "block", "template_error": "block"}, + "delivery": {"imap_append_sent": {"enabled": True, "folder": "Sent"}}, + "status_tracking": {"enabled": True}, + } + + created = self.client.post( + "/api/v1/campaigns", + headers=headers, + json={"config": campaign_json}, + ) + self.assertEqual(created.status_code, 200, created.text) + created_payload = created.json() + campaign_id = created_payload["campaign"]["id"] + version_id = created_payload["version"]["id"] + + validated = self.client.post( + f"/api/v1/campaigns/versions/{version_id}/validate", + headers=headers, + json={"check_files": False}, + ) + self.assertEqual(validated.status_code, 200, validated.text) + self.assertTrue(validated.json()["ok"]) + + built = self.client.post( + f"/api/v1/campaigns/versions/{version_id}/build", + headers=headers, + json={"write_eml": False}, + ) + self.assertEqual(built.status_code, 200, built.text) + self.assertEqual(built.json()["built_count"], 1) + + mocked = self.client.post( + f"/api/v1/campaigns/{campaign_id}/mock-send", + headers=headers, + json={ + "version_id": version_id, + "send": True, + "append_sent": True, + "clear_mailbox": True, + "check_files": False, + }, + ) + self.assertEqual(mocked.status_code, 200, mocked.text) + result = mocked.json()["result"] + self.assertTrue(result["validation"]["ok"]) + self.assertEqual(result["build"]["built_count"], 1) + self.assertEqual(result["send"]["sent_count"], 1) + self.assertEqual(result["send"]["imap_appended_count"], 1) + self.assertEqual(result["send"]["imap_failed_count"], 0) + + +if __name__ == "__main__": + unittest.main()