diff --git a/server/app/api/v1/file_schemas.py b/server/app/api/v1/file_schemas.py new file mode 100644 index 0000000..ea592c3 --- /dev/null +++ b/server/app/api/v1/file_schemas.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +from typing import Any, Literal + +from pydantic import BaseModel, Field + +from app.storage.common import FileConflictResolution + + +class FileSpaceResponse(BaseModel): + id: str + label: str + owner_type: Literal["user", "group"] + owner_id: str + description: str | None = None + + +class FileSpacesResponse(BaseModel): + spaces: list[FileSpaceResponse] + + +class FileShareResponse(BaseModel): + id: str + target_type: str + target_id: str + permission: str + created_at: str + revoked_at: str | None = None + + +class FileAssetResponse(BaseModel): + id: str + tenant_id: str + owner_type: str + owner_id: str + display_path: str + filename: str + description: str | None = None + size_bytes: int + content_type: str | None = None + checksum_sha256: str + version_id: str + created_at: str + updated_at: str + deleted_at: str | None = None + audit_relevant: bool = False + metadata: dict[str, Any] | None = None + shares: list[FileShareResponse] = Field(default_factory=list) + + +class FileFolderResponse(BaseModel): + id: str + tenant_id: str + owner_type: str + owner_id: str + path: str + created_at: str + updated_at: str + deleted_at: str | None = None + + +class FileFoldersResponse(BaseModel): + folders: list[FileFolderResponse] + + +class FileFolderCreateRequest(BaseModel): + owner_type: Literal["user", "group"] + owner_id: str + path: str + + +class FileFolderDeleteRequest(BaseModel): + owner_type: Literal["user", "group"] + owner_id: str + path: str + recursive: bool = True + + +class FileFolderDeleteResponse(BaseModel): + deleted_folders: int + deleted_files: int + + +class FileListResponse(BaseModel): + files: list[FileAssetResponse] + + +class FileUploadResponse(BaseModel): + files: list[FileAssetResponse] + + +class 
BulkDeleteRequest(BaseModel): + file_ids: list[str] + + +class BulkDeleteResponse(BaseModel): + deleted_count: int + + +class ConflictResolutionRequest(BaseModel): + target_path: str + action: Literal["overwrite", "rename", "skip"] + new_path: str | None = None + + +def _conflict_resolutions(items: list[ConflictResolutionRequest] | None) -> list[FileConflictResolution]: + return [FileConflictResolution(target_path=item.target_path, action=item.action, new_path=item.new_path) for item in items or []] + + +class FileShareRequest(BaseModel): + target_type: Literal["user", "group", "campaign", "tenant"] + target_id: str + permission: Literal["read", "write", "manage"] = "read" + + +class RenameRequest(BaseModel): + file_ids: list[str] = Field(default_factory=list) + folder_paths: list[str] = Field(default_factory=list) + owner_type: Literal["user", "group"] | None = None + owner_id: str | None = None + mode: Literal["direct", "prefix", "suffix", "replace"] + new_name: str | None = None + find: str | None = None + replacement: str = "" + prefix: str = "" + suffix: str = "" + recursive: bool = False + dry_run: bool = True + + +class RenamePreviewItem(BaseModel): + kind: Literal["file", "folder"] + id: str + file_id: str | None = None + folder_path: str | None = None + old_path: str + new_path: str + + +class RenameResponse(BaseModel): + dry_run: bool + items: list[RenamePreviewItem] + + +class TransferRequest(BaseModel): + operation: Literal["move", "copy"] + file_ids: list[str] = Field(default_factory=list) + folder_paths: list[str] = Field(default_factory=list) + source_owner_type: Literal["user", "group"] + source_owner_id: str + target_owner_type: Literal["user", "group"] + target_owner_id: str + target_folder: str = "" + conflict_strategy: Literal["reject", "overwrite", "rename"] = "reject" + conflict_resolutions: list[ConflictResolutionRequest] = Field(default_factory=list) + + +class TransferResponse(BaseModel): + operation: str + files: int + folders: int + + 
+class ArchiveRequest(BaseModel): + file_ids: list[str] + filename: str = "files.zip" + + +class PatternResolveRequest(BaseModel): + patterns: list[str] + owner_type: Literal["user", "group"] | None = None + owner_id: str | None = None + campaign_id: str | None = None + path_prefix: str | None = None + include_unmatched: bool = True + case_sensitive: bool = False + + +class PatternMatchResponse(BaseModel): + pattern: str + matches: list[FileAssetResponse] + + +class PatternResolveResponse(BaseModel): + patterns: list[PatternMatchResponse] + unmatched: list[FileAssetResponse] = Field(default_factory=list) diff --git a/server/app/api/v1/files.py b/server/app/api/v1/files.py index b70b0bb..8d6460d 100644 --- a/server/app/api/v1/files.py +++ b/server/app/api/v1/files.py @@ -1,180 +1,63 @@ from __future__ import annotations +import json from io import BytesIO -from typing import Any, Literal - +from typing import Literal from fastapi import APIRouter, Depends, File as FastAPIFile, Form, HTTPException, UploadFile, status from fastapi.responses import StreamingResponse -from pydantic import BaseModel, Field from sqlalchemy.orm import Session from app.auth.dependencies import ApiPrincipal, require_scope +from app.api.v1.file_schemas import ( + ArchiveRequest, + BulkDeleteRequest, + BulkDeleteResponse, + ConflictResolutionRequest, + FileAssetResponse, + FileFolderCreateRequest, + FileFolderDeleteRequest, + FileFolderDeleteResponse, + FileFolderResponse, + FileFoldersResponse, + FileListResponse, + FileShareRequest, + FileShareResponse, + FileSpaceResponse, + FileSpacesResponse, + FileUploadResponse, + PatternMatchResponse, + PatternResolveRequest, + PatternResolveResponse, + RenamePreviewItem, + RenameRequest, + RenameResponse, + TransferRequest, + TransferResponse, + _conflict_resolutions, +) from app.db.models import Campaign, FileAsset, FileFolder, FileShare, Group from app.db.session import get_session from app.storage.paths import UnsafeFilePathError, 
filename_from_path, normalize_folder, normalize_logical_path -from app.storage.services import ( - FileStorageError, +from app.storage.access import ensure_group_access, user_group_ids +from app.storage.archives import create_zip_bytes, extract_zip_upload +from app.storage.common import FileStorageError +from app.storage.files import ( asset_is_audit_relevant, - build_rename_preview, create_file_asset, - create_folder, - create_zip_bytes, current_version_and_blob, - extract_zip_upload, get_asset_for_user, list_assets_for_user, - list_folders_for_user, - rename_asset, - resolve_patterns, + read_asset_bytes, share_file, soft_delete_assets, - soft_delete_folder, - user_group_ids, - read_asset_bytes, ) +from app.storage.folders import create_folder, list_folders_for_user, soft_delete_folder +from app.storage.search import resolve_patterns +from app.storage.transfers import rename_selection, transfer_selection router = APIRouter(prefix="/files", tags=["files"]) -class FileSpaceResponse(BaseModel): - id: str - label: str - owner_type: Literal["user", "group"] - owner_id: str - description: str | None = None - - -class FileSpacesResponse(BaseModel): - spaces: list[FileSpaceResponse] - - -class FileShareResponse(BaseModel): - id: str - target_type: str - target_id: str - permission: str - created_at: str - revoked_at: str | None = None - - -class FileAssetResponse(BaseModel): - id: str - tenant_id: str - owner_type: str - owner_id: str - display_path: str - filename: str - description: str | None = None - size_bytes: int - content_type: str | None = None - checksum_sha256: str - version_id: str - created_at: str - updated_at: str - deleted_at: str | None = None - audit_relevant: bool = False - metadata: dict[str, Any] | None = None - shares: list[FileShareResponse] = Field(default_factory=list) - - -class FileFolderResponse(BaseModel): - id: str - tenant_id: str - owner_type: str - owner_id: str - path: str - created_at: str - updated_at: str - deleted_at: str | None = 
None - - -class FileFoldersResponse(BaseModel): - folders: list[FileFolderResponse] - - -class FileFolderCreateRequest(BaseModel): - owner_type: Literal["user", "group"] - owner_id: str - path: str - - -class FileFolderDeleteRequest(BaseModel): - owner_type: Literal["user", "group"] - owner_id: str - path: str - recursive: bool = True - - -class FileFolderDeleteResponse(BaseModel): - deleted_folders: int - deleted_files: int - - -class FileListResponse(BaseModel): - files: list[FileAssetResponse] - - -class FileUploadResponse(BaseModel): - files: list[FileAssetResponse] - - -class BulkDeleteRequest(BaseModel): - file_ids: list[str] - - -class BulkDeleteResponse(BaseModel): - deleted_count: int - - -class FileShareRequest(BaseModel): - target_type: Literal["user", "group", "campaign", "tenant"] - target_id: str - permission: Literal["read", "write", "manage"] = "read" - - -class RenameRequest(BaseModel): - file_ids: list[str] - mode: Literal["prefix", "suffix", "replace"] - find: str | None = None - replacement: str = "" - prefix: str = "" - suffix: str = "" - dry_run: bool = True - - -class RenamePreviewItem(BaseModel): - file_id: str - old_path: str - new_path: str - - -class RenameResponse(BaseModel): - dry_run: bool - items: list[RenamePreviewItem] - - -class ArchiveRequest(BaseModel): - file_ids: list[str] - filename: str = "files.zip" - - -class PatternResolveRequest(BaseModel): - patterns: list[str] - owner_type: Literal["user", "group"] | None = None - owner_id: str | None = None - campaign_id: str | None = None - path_prefix: str | None = None - include_unmatched: bool = True - - -class PatternMatchResponse(BaseModel): - pattern: str - matches: list[FileAssetResponse] - - -class PatternResolveResponse(BaseModel): - patterns: list[PatternMatchResponse] - unmatched: list[FileAssetResponse] = Field(default_factory=list) def _is_admin(principal: ApiPrincipal) -> bool: @@ -251,8 +134,6 @@ def _ensure_list_owner_access(session: Session, principal: ApiPrincipal, 
owner_t raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No access to this user file space") if owner_type == "group" and owner_id: try: - from app.storage.services import ensure_group_access - ensure_group_access( session, tenant_id=principal.tenant_id, @@ -393,12 +274,16 @@ async def upload_files( path: str = Form(default=""), campaign_id: str | None = Form(default=None), unpack_zip: bool = Form(default=False), + conflict_strategy: Literal["reject", "overwrite", "rename"] = Form(default="reject"), + conflict_resolutions_json: str | None = Form(default=None), session: Session = Depends(get_session), principal: ApiPrincipal = Depends(require_scope("attachments:write")), ): target_owner = owner_id or principal.user.id uploaded_assets: list[FileAsset] = [] try: + raw_resolutions = json.loads(conflict_resolutions_json) if conflict_resolutions_json else [] + upload_resolutions = _conflict_resolutions([ConflictResolutionRequest(**item) for item in raw_resolutions]) for upload in files: data = await upload.read() filename = upload.filename or "file" @@ -413,6 +298,8 @@ async def upload_files( zip_data=data, folder=path, campaign_id=campaign_id, + conflict_strategy=conflict_strategy, + conflict_resolutions=upload_resolutions, is_admin=_is_admin(principal), ) uploaded_assets.extend(item.asset for item in extracted) @@ -428,6 +315,8 @@ async def upload_files( folder=path, content_type=content_type, campaign_id=campaign_id, + conflict_strategy=conflict_strategy, + conflict_resolutions=upload_resolutions, is_admin=_is_admin(principal), ) uploaded_assets.append(stored.asset) @@ -445,12 +334,16 @@ async def upload_zip( owner_id: str | None = Form(default=None), path: str = Form(default=""), campaign_id: str | None = Form(default=None), + conflict_strategy: Literal["reject", "overwrite", "rename"] = Form(default="reject"), + conflict_resolutions_json: str | None = Form(default=None), session: Session = Depends(get_session), principal: ApiPrincipal = 
Depends(require_scope("attachments:write")), ): data = await file.read() target_owner = owner_id or principal.user.id try: + raw_resolutions = json.loads(conflict_resolutions_json) if conflict_resolutions_json else [] + upload_resolutions = _conflict_resolutions([ConflictResolutionRequest(**item) for item in raw_resolutions]) extracted = extract_zip_upload( session, tenant_id=principal.tenant_id, @@ -460,6 +353,8 @@ async def upload_zip( zip_data=data, folder=path, campaign_id=campaign_id, + conflict_strategy=conflict_strategy, + conflict_resolutions=upload_resolutions, is_admin=_is_admin(principal), ) session.commit() @@ -571,25 +466,70 @@ def bulk_rename( principal: ApiPrincipal = Depends(require_scope("attachments:write")), ): try: - assets = [ - get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal)) - for file_id in payload.file_ids - ] - previews = [ - RenamePreviewItem( - file_id=asset.id, - old_path=asset.display_path, - new_path=normalize_logical_path(build_rename_preview(asset, mode=payload.mode, find=payload.find, replacement=payload.replacement, prefix=payload.prefix, suffix=payload.suffix)), - ) - for asset in assets - ] + plan = rename_selection( + session, + tenant_id=principal.tenant_id, + user_id=principal.user.id, + file_ids=payload.file_ids, + folder_paths=payload.folder_paths, + owner_type=payload.owner_type, + owner_id=payload.owner_id, + mode=payload.mode, + new_name=payload.new_name, + find=payload.find, + replacement=payload.replacement, + prefix=payload.prefix, + suffix=payload.suffix, + recursive=payload.recursive, + dry_run=payload.dry_run, + is_admin=_is_admin(principal), + ) if not payload.dry_run: - by_id = {asset.id: asset for asset in assets} - for item in previews: - rename_asset(by_id[item.file_id], new_path=item.new_path) - session.add(by_id[item.file_id]) session.commit() - return RenameResponse(dry_run=payload.dry_run, 
items=previews) + return RenameResponse( + dry_run=payload.dry_run, + items=[ + RenamePreviewItem( + kind=item.kind, + id=item.id, + file_id=item.id if item.kind == "file" else None, + folder_path=item.old_path if item.kind == "folder" else None, + old_path=item.old_path, + new_path=item.new_path, + ) + for item in plan + ], + ) + except (FileStorageError, UnsafeFilePathError, ValueError) as exc: + session.rollback() + raise _http_error(exc) from exc + + +@router.post("/transfer", response_model=TransferResponse) +def transfer_files( + payload: TransferRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + try: + files, folders = transfer_selection( + session, + tenant_id=principal.tenant_id, + user_id=principal.user.id, + operation=payload.operation, + file_ids=payload.file_ids, + folder_paths=payload.folder_paths, + source_owner_type=payload.source_owner_type, + source_owner_id=payload.source_owner_id, + target_owner_type=payload.target_owner_type, + target_owner_id=payload.target_owner_id, + target_folder=payload.target_folder, + conflict_strategy=payload.conflict_strategy, + conflict_resolutions=_conflict_resolutions(payload.conflict_resolutions), + is_admin=_is_admin(principal), + ) + session.commit() + return TransferResponse(operation=payload.operation, files=files, folders=folders) except (FileStorageError, UnsafeFilePathError, ValueError) as exc: session.rollback() raise _http_error(exc) from exc @@ -632,7 +572,7 @@ def resolve_file_patterns( path_prefix=payload.path_prefix, is_admin=_is_admin(principal), ) - resolved, unmatched = resolve_patterns(assets, payload.patterns, base_path=payload.path_prefix) + resolved, unmatched = resolve_patterns(assets, payload.patterns, base_path=payload.path_prefix, case_sensitive=payload.case_sensitive) return PatternResolveResponse( patterns=[PatternMatchResponse(pattern=item.pattern, matches=[_asset_response(session, asset) for asset in 
item.matches]) for item in resolved], unmatched=[_asset_response(session, asset) for asset in unmatched] if payload.include_unmatched else [], diff --git a/server/app/storage/access.py b/server/app/storage/access.py new file mode 100644 index 0000000..c0e8f84 --- /dev/null +++ b/server/app/storage/access.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from sqlalchemy.orm import Session + +from app.db.models import Group, UserGroupMembership +from app.storage.common import FileStorageError + + +def user_group_ids(session: Session, *, tenant_id: str, user_id: str, include_admin_groups: bool = False) -> list[str]: + if include_admin_groups: + return [row.id for row in session.query(Group).filter(Group.tenant_id == tenant_id).order_by(Group.name.asc()).all()] + return [ + row.group_id + for row in session.query(UserGroupMembership) + .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id) + .all() + ] + + +def ensure_group_access(session: Session, *, tenant_id: str, group_id: str, user_id: str, is_admin: bool = False) -> None: + group = session.get(Group, group_id) + if not group or group.tenant_id != tenant_id: + raise FileStorageError("Group not found") + if is_admin: + return + membership = ( + session.query(UserGroupMembership) + .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id, UserGroupMembership.group_id == group_id) + .one_or_none() + ) + if membership is None: + raise FileStorageError("No access to this group file space") + + +def ensure_owner_access(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, user_id: str, is_admin: bool = False) -> None: + owner_type = owner_type.lower().strip() + if owner_type == "user": + if owner_id != user_id and not is_admin: + raise FileStorageError("No access to this user file space") + return + if owner_type == "group": + ensure_group_access(session, tenant_id=tenant_id, group_id=owner_id, user_id=user_id, 
is_admin=is_admin) + return + raise FileStorageError("Files must be owned by a user or group") diff --git a/server/app/storage/archives.py b/server/app/storage/archives.py new file mode 100644 index 0000000..aaf97a0 --- /dev/null +++ b/server/app/storage/archives.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import mimetypes +import zipfile +from io import BytesIO +from typing import Iterable + +from sqlalchemy.orm import Session + +from app.db.models import FileAsset +from app.storage.common import FileConflictResolution, FileStorageError, UploadedStoredFile +from app.storage.files import create_file_asset, read_asset_bytes +from app.storage.paths import filename_from_path, normalize_folder, normalize_logical_path + + +def create_zip_bytes(session: Session, assets: Iterable[FileAsset]) -> bytes: + buffer = BytesIO() + with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: + for asset in assets: + data, _, _ = read_asset_bytes(session, asset) + archive.writestr(asset.display_path, data) + buffer.seek(0) + return buffer.getvalue() + + +def extract_zip_upload( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + zip_data: bytes, + folder: str | None, + campaign_id: str | None, + conflict_strategy: str = "reject", + conflict_resolutions: Iterable[FileConflictResolution] | None = None, + is_admin: bool = False, + max_files: int = 1000, + max_total_bytes: int = 250 * 1024 * 1024, +) -> list[UploadedStoredFile]: + uploaded: list[UploadedStoredFile] = [] + total = 0 + base_folder = normalize_folder(folder) + with zipfile.ZipFile(BytesIO(zip_data)) as archive: + infos = [info for info in archive.infolist() if not info.is_dir()] + if len(infos) > max_files: + raise FileStorageError(f"ZIP contains too many files (limit {max_files})") + for info in infos: + if info.file_size < 0: + raise FileStorageError("Invalid ZIP member") + total += info.file_size + if total > max_total_bytes: + raise FileStorageError("ZIP is
too large after extraction") + inner_path = normalize_logical_path(info.filename) + target_path = f"{base_folder}/{inner_path}" if base_folder else inner_path + data = archive.read(info) + uploaded.append( + create_file_asset( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + user_id=user_id, + filename=filename_from_path(inner_path), + data=data, + display_path=target_path, + content_type=mimetypes.guess_type(inner_path)[0] or "application/octet-stream", + campaign_id=campaign_id, + conflict_strategy=conflict_strategy, + conflict_resolutions=conflict_resolutions, + is_admin=is_admin, + ) + ) + return uploaded diff --git a/server/app/storage/campaign_usage.py b/server/app/storage/campaign_usage.py new file mode 100644 index 0000000..825007a --- /dev/null +++ b/server/app/storage/campaign_usage.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +from pathlib import PurePosixPath + +from sqlalchemy.orm import Session + +from app.db.models import CampaignAttachmentUse, CampaignJob, FileAsset +from app.storage.common import utcnow +from app.storage.files import current_version_and_blob, list_assets_for_user + + +def _candidate_match_keys(raw_match: str) -> set[str]: + cleaned = raw_match.replace("\\", "/").strip().strip("/") + result = {cleaned} + if cleaned: + result.add(PurePosixPath(cleaned).name) + return {item for item in result if item} + + +def record_campaign_attachment_uses_for_job(session: Session, job: CampaignJob, *, stage: str = "built") -> None: + """Create best-effort immutable file-use records for matched managed files. + + Existing attachment resolution is still filesystem/path based. This bridge + records uses when a resolved attachment match can be tied to a managed file + by logical path or filename among files shared with the campaign. 
+ """ + + attachments = job.resolved_attachments or [] + if not isinstance(attachments, list): + return + assets = list_assets_for_user( + session, + tenant_id=job.tenant_id, + user_id="", + campaign_id=job.campaign_id, + is_admin=True, + ) + by_key: dict[str, FileAsset] = {} + for asset in assets: + by_key[asset.display_path.strip("/")] = asset + by_key[asset.filename] = asset + for attachment in attachments: + if not isinstance(attachment, dict): + continue + matches = attachment.get("matches") if isinstance(attachment.get("matches"), list) else [] + for raw in matches: + if not isinstance(raw, str): + continue + asset = next((by_key[key] for key in _candidate_match_keys(raw) if key in by_key), None) + if not asset: + continue + version, blob = current_version_and_blob(session, asset) + exists = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.file_version_id == version.id, + CampaignAttachmentUse.filename_used == asset.filename, + CampaignAttachmentUse.use_stage == stage, + ) + .one_or_none() + ) + if exists: + continue + session.add( + CampaignAttachmentUse( + tenant_id=job.tenant_id, + campaign_id=job.campaign_id, + campaign_version_id=job.campaign_version_id, + campaign_job_id=job.id, + entry_index=job.entry_index, + entry_id=job.entry_id, + file_asset_id=asset.id, + file_version_id=version.id, + file_blob_id=blob.id, + filename_used=asset.filename, + checksum_sha256=blob.checksum_sha256, + size_bytes=blob.size_bytes, + content_type=blob.content_type, + use_stage=stage, + ) + ) + + +def mark_job_attachment_uses_sent(session: Session, job: CampaignJob) -> None: + record_campaign_attachment_uses_for_job(session, job, stage="built") + now = utcnow() + uses = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.tenant_id == job.tenant_id, + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.use_stage == "built", + ) + .all() + ) + for use in 
uses: + sent = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.file_version_id == use.file_version_id, + CampaignAttachmentUse.use_stage == "sent", + ) + .one_or_none() + ) + if sent: + continue + session.add( + CampaignAttachmentUse( + tenant_id=use.tenant_id, + campaign_id=use.campaign_id, + campaign_version_id=use.campaign_version_id, + campaign_job_id=use.campaign_job_id, + entry_index=use.entry_index, + entry_id=use.entry_id, + file_asset_id=use.file_asset_id, + file_version_id=use.file_version_id, + file_blob_id=use.file_blob_id, + filename_used=use.filename_used, + checksum_sha256=use.checksum_sha256, + size_bytes=use.size_bytes, + content_type=use.content_type, + use_stage="sent", + used_at=now, + ) + ) diff --git a/server/app/storage/common.py b/server/app/storage/common.py new file mode 100644 index 0000000..271ba50 --- /dev/null +++ b/server/app/storage/common.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone + +from app.db.models import FileAsset, FileBlob, FileVersion + + +class FileStorageError(RuntimeError): + pass + + +@dataclass(slots=True) +class UploadedStoredFile: + asset: FileAsset + version: FileVersion + blob: FileBlob + + +@dataclass(slots=True) +class ResolvedPattern: + pattern: str + matches: list[FileAsset] + + +@dataclass(slots=True) +class FileConflictResolution: + target_path: str + action: str + new_path: str | None = None + + +@dataclass(slots=True) +class RenamePlanItem: + kind: str + id: str + old_path: str + new_path: str + + +def utcnow() -> datetime: + return datetime.now(timezone.utc) diff --git a/server/app/storage/files.py b/server/app/storage/files.py new file mode 100644 index 0000000..b2a55e5 --- /dev/null +++ b/server/app/storage/files.py @@ -0,0 +1,484 @@ +from __future__ import annotations + +import hashlib +import mimetypes +from pathlib import PurePosixPath 
+from typing import Any, Iterable +from uuid import uuid4 + +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from app.db.models import Campaign, CampaignAttachmentUse, FileAsset, FileBlob, FileShare, FileVersion +from app.settings import settings +from app.storage.access import ensure_owner_access, user_group_ids +from app.storage.backends import get_storage_backend +from app.storage.common import FileConflictResolution, FileStorageError, UploadedStoredFile, utcnow +from app.storage.paths import filename_from_path, join_folder_filename, normalize_folder, normalize_logical_path, safe_storage_component + + +def _asset_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str): + query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id, FileAsset.owner_type == owner_type) + if owner_type == "user": + return query.filter(FileAsset.owner_user_id == owner_id) + if owner_type == "group": + return query.filter(FileAsset.owner_group_id == owner_id) + raise FileStorageError("Unsupported owner type") + + +def _storage_bucket_name() -> str: + return settings.file_storage_s3_bucket or settings.s3_bucket + + +def _storage_backend_name() -> str: + return settings.file_storage_backend.lower().strip() + + +def _storage_key(*, tenant_id: str, checksum: str, filename: str) -> str: + return f"tenants/{tenant_id}/files/{checksum[:2]}/{uuid4().hex}-{safe_storage_component(filename)}" + + +def _get_or_create_blob( + session: Session, + *, + tenant_id: str, + data: bytes, + filename: str, + content_type: str | None, +) -> FileBlob: + checksum = hashlib.sha256(data).hexdigest() + size = len(data) + blob = ( + session.query(FileBlob) + .filter(FileBlob.tenant_id == tenant_id, FileBlob.checksum_sha256 == checksum, FileBlob.size_bytes == size) + .one_or_none() + ) + if blob: + blob.ref_count += 1 + session.add(blob) + return blob + + storage_key = _storage_key(tenant_id=tenant_id, checksum=checksum, filename=filename) + backend = 
get_storage_backend() + backend.put_bytes(storage_key, data, content_type=content_type) + blob = FileBlob( + tenant_id=tenant_id, + storage_backend=_storage_backend_name(), + storage_bucket=_storage_bucket_name(), + storage_key=storage_key, + checksum_sha256=checksum, + size_bytes=size, + content_type=content_type, + ref_count=1, + ) + session.add(blob) + session.flush() + return blob + + +def create_file_asset( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + filename: str, + data: bytes, + folder: str | None = None, + display_path: str | None = None, + content_type: str | None = None, + description: str | None = None, + metadata: dict[str, Any] | None = None, + campaign_id: str | None = None, + conflict_strategy: str = "reject", + conflict_resolutions: Iterable[FileConflictResolution] | None = None, + is_admin: bool = False, +) -> UploadedStoredFile: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + + safe_filename = filename_from_path(normalize_logical_path(filename, fallback_filename="file")) + logical_path = normalize_logical_path(display_path) if display_path else join_folder_filename(folder, safe_filename) + if not content_type: + content_type = mimetypes.guess_type(safe_filename)[0] or "application/octet-stream" + + conflict_strategy = _normalize_conflict_strategy(conflict_strategy) + resolutions = _resolution_by_path(conflict_resolutions) + resolution = resolutions.get(logical_path) + action = resolution.action if resolution else conflict_strategy + if resolution and resolution.new_path and action == "rename": + logical_path = normalize_logical_path(resolution.new_path) + elif _active_asset_exists(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, path=logical_path): + if action == "reject": + raise FileStorageError(f"Target file already exists: {logical_path}") + if 
action == "overwrite": + _soft_delete_conflicting_asset(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, path=logical_path) + elif action == "rename": + logical_path = _next_available_logical_path(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, desired_path=logical_path) + elif action == "skip": + raise FileStorageError(f"Skipped upload target: {logical_path}") + elif action == "rename": + logical_path = _next_available_logical_path(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, desired_path=logical_path) + + blob = _get_or_create_blob(session, tenant_id=tenant_id, data=data, filename=safe_filename, content_type=content_type) + asset = FileAsset( + tenant_id=tenant_id, + owner_type=owner_type, + owner_user_id=owner_id if owner_type == "user" else None, + owner_group_id=owner_id if owner_type == "group" else None, + display_path=logical_path, + filename=filename_from_path(logical_path), + description=description, + created_by_user_id=user_id, + metadata_=metadata or {}, + ) + session.add(asset) + session.flush() + version = FileVersion( + tenant_id=tenant_id, + file_asset_id=asset.id, + blob_id=blob.id, + version_number=1, + filename_at_upload=safe_filename, + display_path_at_upload=logical_path, + content_type=content_type, + size_bytes=blob.size_bytes, + checksum_sha256=blob.checksum_sha256, + created_by_user_id=user_id, + ) + session.add(version) + session.flush() + asset.current_version_id = version.id + session.add(asset) + if campaign_id: + share_file(session, tenant_id=tenant_id, asset=asset, target_type="campaign", target_id=campaign_id, permission="read", user_id=user_id) + return UploadedStoredFile(asset=asset, version=version, blob=blob) + + +def get_asset_for_user(session: Session, *, tenant_id: str, user_id: str, asset_id: str, require_write: bool = False, is_admin: bool = False) -> FileAsset: + asset = session.get(FileAsset, asset_id) + if not asset or asset.tenant_id != tenant_id 
or asset.deleted_at is not None: + raise FileStorageError("File not found") + if is_admin: + return asset + group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) + owns = (asset.owner_type == "user" and asset.owner_user_id == user_id) or (asset.owner_type == "group" and asset.owner_group_id in group_ids) + if owns: + return asset + permission_values = ["read", "write", "manage"] if not require_write else ["write", "manage"] + share = ( + session.query(FileShare) + .filter( + FileShare.tenant_id == tenant_id, + FileShare.file_asset_id == asset.id, + FileShare.revoked_at.is_(None), + FileShare.permission.in_(permission_values), + or_( + (FileShare.target_type == "user") & (FileShare.target_id == user_id), + (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), + (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), + ), + ) + .first() + ) + if not share: + raise FileStorageError("No access to this file") + return asset + + +def list_assets_for_user( + session: Session, + *, + tenant_id: str, + user_id: str, + owner_type: str | None = None, + owner_id: str | None = None, + campaign_id: str | None = None, + path_prefix: str | None = None, + include_deleted: bool = False, + is_admin: bool = False, +) -> list[FileAsset]: + query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id) + if not include_deleted: + query = query.filter(FileAsset.deleted_at.is_(None)) + if owner_type: + query = query.filter(FileAsset.owner_type == owner_type) + if owner_type == "user" and owner_id: + query = query.filter(FileAsset.owner_user_id == owner_id) + if owner_type == "group" and owner_id: + query = query.filter(FileAsset.owner_group_id == owner_id) + if campaign_id: + query = query.join(FileShare, FileShare.file_asset_id == FileAsset.id).filter( + FileShare.tenant_id == tenant_id, + FileShare.target_type == "campaign", + FileShare.target_id == campaign_id, + FileShare.revoked_at.is_(None), + ) + elif not 
is_admin and not owner_type: + group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) + query = query.outerjoin(FileShare, FileShare.file_asset_id == FileAsset.id).filter( + or_( + (FileAsset.owner_type == "user") & (FileAsset.owner_user_id == user_id), + (FileAsset.owner_type == "group") & (FileAsset.owner_group_id.in_(group_ids)), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "user") & (FileShare.target_id == user_id), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), + ) + ) + if path_prefix: + prefix = normalize_folder(path_prefix) + if prefix: + query = query.filter(FileAsset.display_path.like(f"{prefix}/%")) + return query.order_by(FileAsset.display_path.asc(), FileAsset.updated_at.desc()).all() + + +def current_version_and_blob(session: Session, asset: FileAsset) -> tuple[FileVersion, FileBlob]: + if not asset.current_version_id: + raise FileStorageError("File has no current version") + version = session.get(FileVersion, asset.current_version_id) + if not version: + raise FileStorageError("File version not found") + blob = session.get(FileBlob, version.blob_id) + if not blob: + raise FileStorageError("File blob not found") + return version, blob + + +def read_asset_bytes(session: Session, asset: FileAsset) -> tuple[bytes, FileVersion, FileBlob]: + version, blob = current_version_and_blob(session, asset) + backend = get_storage_backend() + return backend.get_bytes(blob.storage_key), version, blob + + +def share_file( + session: Session, + *, + tenant_id: str, + asset: FileAsset, + target_type: str, + target_id: str, + permission: str, + user_id: str, +) -> FileShare: + target_type = target_type.lower().strip() + permission = permission.lower().strip() + if target_type not in {"user", "group", "campaign", "tenant"}: + raise FileStorageError("Unsupported 
share target") + if permission not in {"read", "write", "manage"}: + raise FileStorageError("Unsupported file permission") + if target_type == "campaign": + campaign = session.get(Campaign, target_id) + if not campaign or campaign.tenant_id != tenant_id: + raise FileStorageError("Campaign not found") + existing = ( + session.query(FileShare) + .filter( + FileShare.tenant_id == tenant_id, + FileShare.file_asset_id == asset.id, + FileShare.target_type == target_type, + FileShare.target_id == target_id, + FileShare.revoked_at.is_(None), + ) + .one_or_none() + ) + if existing: + existing.permission = permission + session.add(existing) + return existing + share = FileShare( + tenant_id=tenant_id, + file_asset_id=asset.id, + target_type=target_type, + target_id=target_id, + permission=permission, + created_by_user_id=user_id, + ) + session.add(share) + return share + + +def soft_delete_assets(session: Session, assets: Iterable[FileAsset]) -> int: + count = 0 + now = utcnow() + for asset in assets: + if asset.deleted_at is None: + asset.deleted_at = now + session.add(asset) + count += 1 + return count + + +def asset_is_audit_relevant(session: Session, asset: FileAsset) -> bool: + return ( + session.query(CampaignAttachmentUse) + .filter(CampaignAttachmentUse.file_asset_id == asset.id, CampaignAttachmentUse.use_stage == "sent") + .first() + is not None + ) + + +def _asset_owner_id(asset: FileAsset) -> str: + if asset.owner_type == "user" and asset.owner_user_id: + return asset.owner_user_id + if asset.owner_type == "group" and asset.owner_group_id: + return asset.owner_group_id + raise FileStorageError("File has no valid owner") + + +def _active_asset_exists(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> bool: + return _active_asset_at_path( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + path=path, + exclude_asset_id=exclude_asset_id, + ) is not None + + +def 
_active_asset_at_path(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> FileAsset | None: + query = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter( + FileAsset.deleted_at.is_(None), + FileAsset.display_path == normalize_logical_path(path), + ) + if exclude_asset_id: + query = query.filter(FileAsset.id != exclude_asset_id) + return query.first() + + +def _soft_delete_conflicting_asset(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_asset_id: str | None = None) -> None: + asset = _active_asset_at_path( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + path=path, + exclude_asset_id=exclude_asset_id, + ) + if asset is not None: + asset.deleted_at = utcnow() + session.add(asset) + + +def _split_logical_path(path: str) -> tuple[str, str, str, str]: + normalized = normalize_logical_path(path) + logical = PurePosixPath(normalized) + folder = "" if str(logical.parent) == "." 
else str(logical.parent) + filename = logical.name + suffixes = "".join(PurePosixPath(filename).suffixes) + stem = filename[: -len(suffixes)] if suffixes else filename + return folder, filename, stem, suffixes + + +def _candidate_renamed_path(path: str, counter: int) -> str: + folder, _filename, stem, suffixes = _split_logical_path(path) + suffix = " copy" if counter == 1 else f" copy {counter}" + next_name = f"{stem}{suffix}{suffixes}" + return normalize_logical_path(f"{folder}/{next_name}" if folder else next_name) + + +def _next_available_logical_path( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + desired_path: str, + reserved_paths: set[str] | None = None, + exclude_asset_id: str | None = None, +) -> str: + reserved = reserved_paths or set() + desired = normalize_logical_path(desired_path) + if desired not in reserved and not _active_asset_exists( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + path=desired, + exclude_asset_id=exclude_asset_id, + ): + return desired + counter = 1 + while True: + candidate = _candidate_renamed_path(desired, counter) + if candidate not in reserved and not _active_asset_exists( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + path=candidate, + exclude_asset_id=exclude_asset_id, + ): + return candidate + counter += 1 + + +def _resolution_by_path(conflict_resolutions: Iterable[FileConflictResolution] | None) -> dict[str, FileConflictResolution]: + result: dict[str, FileConflictResolution] = {} + for item in conflict_resolutions or []: + target_path = normalize_logical_path(item.target_path) + action = item.action.lower().strip() + if action not in {"overwrite", "rename", "skip"}: + raise FileStorageError("Unsupported conflict resolution") + result[target_path] = FileConflictResolution(target_path=target_path, action=action, new_path=item.new_path) + return result + + +def _normalize_conflict_strategy(strategy: str | None) -> str: + 
normalized = (strategy or "reject").lower().strip() + if normalized not in {"reject", "overwrite", "rename"}: + raise FileStorageError("Unsupported conflict strategy") + return normalized + + +def _copy_asset_to_path( + session: Session, + asset: FileAsset, + *, + tenant_id: str, + target_owner_type: str, + target_owner_id: str, + target_path: str, + user_id: str, +) -> FileAsset: + version, blob = current_version_and_blob(session, asset) + blob.ref_count += 1 + session.add(blob) + normalized_path = normalize_logical_path(target_path) + copied = FileAsset( + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_user_id=target_owner_id if target_owner_type == "user" else None, + owner_group_id=target_owner_id if target_owner_type == "group" else None, + display_path=normalized_path, + filename=filename_from_path(normalized_path), + description=asset.description, + created_by_user_id=user_id, + metadata_=dict(asset.metadata_ or {}), + ) + session.add(copied) + session.flush() + copied_version = FileVersion( + tenant_id=tenant_id, + file_asset_id=copied.id, + blob_id=blob.id, + version_number=1, + filename_at_upload=version.filename_at_upload, + display_path_at_upload=normalized_path, + content_type=version.content_type, + size_bytes=blob.size_bytes, + checksum_sha256=blob.checksum_sha256, + created_by_user_id=user_id, + ) + session.add(copied_version) + session.flush() + copied.current_version_id = copied_version.id + session.add(copied) + return copied + + +def rename_asset(asset: FileAsset, *, new_path: str) -> None: + normalized = normalize_logical_path(new_path) + asset.display_path = normalized + asset.filename = filename_from_path(normalized) diff --git a/server/app/storage/folders.py b/server/app/storage/folders.py new file mode 100644 index 0000000..1e47a01 --- /dev/null +++ b/server/app/storage/folders.py @@ -0,0 +1,173 @@ +from __future__ import annotations + +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from app.db.models import 
FileAsset, FileFolder +from app.storage.access import ensure_owner_access +from app.storage.common import FileStorageError, utcnow +from app.storage.files import _asset_query_for_owner +from app.storage.paths import normalize_folder + + +def _owner_filter(query, owner_type: str, owner_id: str): + if owner_type == "user": + return query.filter(FileFolder.owner_user_id == owner_id) + if owner_type == "group": + return query.filter(FileFolder.owner_group_id == owner_id) + raise FileStorageError("Unsupported owner type") + + +def create_folder( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + path: str, + is_admin: bool = False, +) -> FileFolder: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + normalized = normalize_folder(path) + if not normalized: + raise FileStorageError("Folder path is required") + query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.path == normalized) + query = _owner_filter(query, owner_type, owner_id) + active_existing = query.filter(FileFolder.deleted_at.is_(None)).first() + if active_existing is not None: + raise FileStorageError(f"Folder already exists: {normalized}") + deleted_existing = query.filter(FileFolder.deleted_at.is_not(None)).first() + if deleted_existing is not None: + deleted_existing.deleted_at = None + session.add(deleted_existing) + session.flush() + return deleted_existing + folder = FileFolder( + tenant_id=tenant_id, + owner_type=owner_type, + owner_user_id=owner_id if owner_type == "user" else None, + owner_group_id=owner_id if owner_type == "group" else None, + path=normalized, + created_by_user_id=user_id, + metadata_={}, + ) + session.add(folder) + session.flush() + return folder + + +def list_folders_for_user( + session: Session, + *, + tenant_id: str, + user_id: str, + 
owner_type: str, + owner_id: str, + include_deleted: bool = False, + is_admin: bool = False, +) -> list[FileFolder]: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type) + query = _owner_filter(query, owner_type, owner_id) + if not include_deleted: + query = query.filter(FileFolder.deleted_at.is_(None)) + return query.order_by(FileFolder.path.asc()).all() + + +def soft_delete_folder( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + path: str, + recursive: bool = True, + is_admin: bool = False, +) -> tuple[int, int]: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + normalized = normalize_folder(path) + if not normalized: + raise FileStorageError("Folder path is required") + prefix = f"{normalized}/" + now = utcnow() + + folder_query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.deleted_at.is_(None)) + folder_query = _owner_filter(folder_query, owner_type, owner_id) + if recursive: + folder_query = folder_query.filter(or_(FileFolder.path == normalized, FileFolder.path.like(f"{prefix}%"))) + else: + child_exists = folder_query.filter(FileFolder.path.like(f"{prefix}%")).first() is not None + file_exists = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.display_path.like(f"{prefix}%")).first() is not None + if child_exists or file_exists: + raise FileStorageError("Folder is not empty") + folder_query = folder_query.filter(FileFolder.path == normalized) + + folders = folder_query.all() + for folder in folders: + folder.deleted_at = now + 
session.add(folder) + + file_query = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{prefix}%")) + assets = file_query.all() if recursive else [] + for asset in assets: + asset.deleted_at = now + session.add(asset) + + return len(folders), len(assets) + + +def _active_folder_exists(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_paths: set[str] | None = None) -> bool: + normalized = normalize_folder(path) + if not normalized: + return False + query = session.query(FileFolder).filter( + FileFolder.tenant_id == tenant_id, + FileFolder.owner_type == owner_type, + FileFolder.deleted_at.is_(None), + FileFolder.path == normalized, + ) + query = _owner_filter(query, owner_type, owner_id) + if exclude_paths: + query = query.filter(FileFolder.path.notin_(exclude_paths)) + return query.first() is not None + + +def _folder_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str): + query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type) + return _owner_filter(query, owner_type, owner_id) + + +def _ensure_target_folder_hierarchy( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + path: str, +) -> None: + parts = normalize_folder(path).split("/") if normalize_folder(path) else [] + for index in range(1, len(parts) + 1): + partial = "/".join(parts[:index]) + query = _folder_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileFolder.path == partial) + existing = query.one_or_none() + if existing: + if existing.deleted_at is not None: + existing.deleted_at = None + session.add(existing) + continue + folder = FileFolder( + tenant_id=tenant_id, + owner_type=owner_type, + owner_user_id=owner_id if owner_type == "user" else None, + owner_group_id=owner_id 
if owner_type == "group" else None, + path=partial, + created_by_user_id=user_id, + metadata_={}, + ) + session.add(folder) diff --git a/server/app/storage/search.py b/server/app/storage/search.py new file mode 100644 index 0000000..202797a --- /dev/null +++ b/server/app/storage/search.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import re +from typing import Iterable + +from app.db.models import FileAsset +from app.storage.common import ResolvedPattern +from app.storage.paths import normalize_folder, normalize_logical_path + + +def _normalize_pattern(pattern: str) -> str: + if pattern.strip() in {"", "*"}: + return "*" + return normalize_logical_path(pattern, fallback_filename="*") + + +def _logical_glob_regex(pattern: str, *, case_sensitive: bool = False) -> re.Pattern[str]: + """Compile Multi Seal Mail logical globs. + + `*` and `?` stay within one folder segment. `**` crosses folder + boundaries, and `**/` also matches the current folder so `**/*.pdf` + returns direct and nested PDF files. 
+ """ + + pattern = _normalize_pattern(pattern) + pieces = ["^"] + index = 0 + while index < len(pattern): + char = pattern[index] + if char == "*": + if index + 1 < len(pattern) and pattern[index + 1] == "*": + index += 2 + if index < len(pattern) and pattern[index] == "/": + pieces.append("(?:.*/)?") + index += 1 + else: + pieces.append(".*") + continue + pieces.append("[^/]*") + elif char == "?": + pieces.append("[^/]") + else: + pieces.append(re.escape(char)) + index += 1 + pieces.append("$") + flags = 0 if case_sensitive else re.IGNORECASE + return re.compile("".join(pieces), flags) + + +def _relative_display_path(asset: FileAsset, base_path: str | None) -> str: + path = normalize_logical_path(asset.display_path) + base = normalize_folder(base_path) + if not base: + return path + prefix = f"{base}/" + if path.startswith(prefix): + return path[len(prefix) :] + return path + + +def match_assets(assets: Iterable[FileAsset], pattern: str, *, base_path: str | None = None, case_sensitive: bool = False) -> list[FileAsset]: + regex = _logical_glob_regex(pattern, case_sensitive=case_sensitive) + normalized_pattern = _normalize_pattern(pattern) + has_path_context = base_path is not None or "/" in normalized_pattern or "**" in normalized_pattern + matches: list[FileAsset] = [] + for asset in assets: + candidates = [_relative_display_path(asset, base_path)] if has_path_context else [asset.display_path, asset.filename] + if any(regex.match(candidate) for candidate in candidates): + matches.append(asset) + return matches + + +def resolve_patterns(assets: list[FileAsset], patterns: list[str], *, base_path: str | None = None, case_sensitive: bool = False) -> tuple[list[ResolvedPattern], list[FileAsset]]: + resolved = [ResolvedPattern(pattern=pattern, matches=match_assets(assets, pattern, base_path=base_path, case_sensitive=case_sensitive)) for pattern in patterns] + matched_ids = {asset.id for item in resolved for asset in item.matches} + unmatched = [asset for asset in 
assets if asset.id not in matched_ids] + return resolved, unmatched diff --git a/server/app/storage/services.py b/server/app/storage/services.py index 9ac9b64..e991d8e 100644 --- a/server/app/storage/services.py +++ b/server/app/storage/services.py @@ -1,750 +1,86 @@ from __future__ import annotations -import hashlib -import mimetypes -import re -import zipfile -from dataclasses import dataclass -from datetime import datetime, timezone -from io import BytesIO -from pathlib import PurePosixPath -from typing import Any, Iterable -from uuid import uuid4 +# Compatibility facade for existing storage imports. New storage code should +# prefer importing from the focused modules directly. -from sqlalchemy import or_ -from sqlalchemy.orm import Session - -from app.db.models import ( - Campaign, - CampaignAttachmentUse, - CampaignJob, - FileAsset, - FileBlob, - FileFolder, - FileShare, - FileVersion, - Group, - UserGroupMembership, +from app.storage.access import ensure_group_access, ensure_owner_access, user_group_ids +from app.storage.archives import create_zip_bytes, extract_zip_upload +from app.storage.campaign_usage import mark_job_attachment_uses_sent, record_campaign_attachment_uses_for_job +from app.storage.common import ( + FileConflictResolution, + FileStorageError, + RenamePlanItem, + ResolvedPattern, + UploadedStoredFile, + utcnow, ) -from app.settings import settings -from app.storage.backends import get_storage_backend -from app.storage.paths import filename_from_path, join_folder_filename, normalize_folder, normalize_logical_path, safe_storage_component - - -class FileStorageError(RuntimeError): - pass - - -@dataclass(slots=True) -class UploadedStoredFile: - asset: FileAsset - version: FileVersion - blob: FileBlob - - -@dataclass(slots=True) -class ResolvedPattern: - pattern: str - matches: list[FileAsset] - - -def utcnow() -> datetime: - return datetime.now(timezone.utc) - - -def user_group_ids(session: Session, *, tenant_id: str, user_id: str, 
include_admin_groups: bool = False) -> list[str]: - if include_admin_groups: - return [row.id for row in session.query(Group).filter(Group.tenant_id == tenant_id).order_by(Group.name.asc()).all()] - return [ - row.group_id - for row in session.query(UserGroupMembership) - .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id) - .all() - ] - - -def ensure_group_access(session: Session, *, tenant_id: str, group_id: str, user_id: str, is_admin: bool = False) -> None: - group = session.get(Group, group_id) - if not group or group.tenant_id != tenant_id: - raise FileStorageError("Group not found") - if is_admin: - return - membership = ( - session.query(UserGroupMembership) - .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id, UserGroupMembership.group_id == group_id) - .one_or_none() - ) - if membership is None: - raise FileStorageError("No access to this group file space") - - - - -def _owner_filter(query, owner_type: str, owner_id: str): - if owner_type == "user": - return query.filter(FileFolder.owner_user_id == owner_id) - if owner_type == "group": - return query.filter(FileFolder.owner_group_id == owner_id) - raise FileStorageError("Unsupported owner type") - - -def ensure_owner_access(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, user_id: str, is_admin: bool = False) -> None: - owner_type = owner_type.lower().strip() - if owner_type == "user": - if owner_id != user_id and not is_admin: - raise FileStorageError("No access to this user file space") - return - if owner_type == "group": - ensure_group_access(session, tenant_id=tenant_id, group_id=owner_id, user_id=user_id, is_admin=is_admin) - return - raise FileStorageError("Files must be owned by a user or group") - - -def create_folder( - session: Session, - *, - tenant_id: str, - owner_type: str, - owner_id: str, - user_id: str, - path: str, - is_admin: bool = False, -) -> FileFolder: - owner_type = 
owner_type.lower().strip() - ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) - normalized = normalize_folder(path) - if not normalized: - raise FileStorageError("Folder path is required") - query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.path == normalized) - query = _owner_filter(query, owner_type, owner_id) - existing = query.order_by(FileFolder.deleted_at.asc()).first() - if existing: - if existing.deleted_at is not None: - existing.deleted_at = None - session.add(existing) - return existing - folder = FileFolder( - tenant_id=tenant_id, - owner_type=owner_type, - owner_user_id=owner_id if owner_type == "user" else None, - owner_group_id=owner_id if owner_type == "group" else None, - path=normalized, - created_by_user_id=user_id, - metadata_={}, - ) - session.add(folder) - session.flush() - return folder - - -def list_folders_for_user( - session: Session, - *, - tenant_id: str, - user_id: str, - owner_type: str, - owner_id: str, - include_deleted: bool = False, - is_admin: bool = False, -) -> list[FileFolder]: - owner_type = owner_type.lower().strip() - ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) - query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type) - query = _owner_filter(query, owner_type, owner_id) - if not include_deleted: - query = query.filter(FileFolder.deleted_at.is_(None)) - return query.order_by(FileFolder.path.asc()).all() - - -def soft_delete_folder( - session: Session, - *, - tenant_id: str, - owner_type: str, - owner_id: str, - user_id: str, - path: str, - recursive: bool = True, - is_admin: bool = False, -) -> tuple[int, int]: - owner_type = owner_type.lower().strip() - ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, 
owner_id=owner_id, user_id=user_id, is_admin=is_admin) - normalized = normalize_folder(path) - if not normalized: - raise FileStorageError("Folder path is required") - prefix = f"{normalized}/" - now = utcnow() - - folder_query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.deleted_at.is_(None)) - folder_query = _owner_filter(folder_query, owner_type, owner_id) - if recursive: - folder_query = folder_query.filter(or_(FileFolder.path == normalized, FileFolder.path.like(f"{prefix}%"))) - else: - child_exists = folder_query.filter(FileFolder.path.like(f"{prefix}%")).first() is not None - file_exists = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.display_path.like(f"{prefix}%")).first() is not None - if child_exists or file_exists: - raise FileStorageError("Folder is not empty") - folder_query = folder_query.filter(FileFolder.path == normalized) - - folders = folder_query.all() - for folder in folders: - folder.deleted_at = now - session.add(folder) - - file_query = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{prefix}%")) - assets = file_query.all() if recursive else [] - for asset in assets: - asset.deleted_at = now - session.add(asset) - - return len(folders), len(assets) - - -def _asset_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str): - query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id, FileAsset.owner_type == owner_type) - if owner_type == "user": - return query.filter(FileAsset.owner_user_id == owner_id) - if owner_type == "group": - return query.filter(FileAsset.owner_group_id == owner_id) - raise FileStorageError("Unsupported owner type") - -def _storage_bucket_name() -> str: - return settings.file_storage_s3_bucket or settings.s3_bucket - - -def 
_storage_backend_name() -> str: - return settings.file_storage_backend.lower().strip() - - -def _storage_key(*, tenant_id: str, checksum: str, filename: str) -> str: - return f"tenants/{tenant_id}/files/{checksum[:2]}/{uuid4().hex}-{safe_storage_component(filename)}" - - -def _get_or_create_blob( - session: Session, - *, - tenant_id: str, - data: bytes, - filename: str, - content_type: str | None, -) -> FileBlob: - checksum = hashlib.sha256(data).hexdigest() - size = len(data) - blob = ( - session.query(FileBlob) - .filter(FileBlob.tenant_id == tenant_id, FileBlob.checksum_sha256 == checksum, FileBlob.size_bytes == size) - .one_or_none() - ) - if blob: - blob.ref_count += 1 - session.add(blob) - return blob - - storage_key = _storage_key(tenant_id=tenant_id, checksum=checksum, filename=filename) - backend = get_storage_backend() - backend.put_bytes(storage_key, data, content_type=content_type) - blob = FileBlob( - tenant_id=tenant_id, - storage_backend=_storage_backend_name(), - storage_bucket=_storage_bucket_name(), - storage_key=storage_key, - checksum_sha256=checksum, - size_bytes=size, - content_type=content_type, - ref_count=1, - ) - session.add(blob) - session.flush() - return blob - - -def create_file_asset( - session: Session, - *, - tenant_id: str, - owner_type: str, - owner_id: str, - user_id: str, - filename: str, - data: bytes, - folder: str | None = None, - display_path: str | None = None, - content_type: str | None = None, - description: str | None = None, - metadata: dict[str, Any] | None = None, - campaign_id: str | None = None, - is_admin: bool = False, -) -> UploadedStoredFile: - owner_type = owner_type.lower().strip() - ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) - - safe_filename = filename_from_path(normalize_logical_path(filename, fallback_filename="file")) - logical_path = normalize_logical_path(display_path) if display_path else join_folder_filename(folder, 
safe_filename) - if not content_type: - content_type = mimetypes.guess_type(safe_filename)[0] or "application/octet-stream" - - blob = _get_or_create_blob(session, tenant_id=tenant_id, data=data, filename=safe_filename, content_type=content_type) - asset = FileAsset( - tenant_id=tenant_id, - owner_type=owner_type, - owner_user_id=owner_id if owner_type == "user" else None, - owner_group_id=owner_id if owner_type == "group" else None, - display_path=logical_path, - filename=filename_from_path(logical_path), - description=description, - created_by_user_id=user_id, - metadata_=metadata or {}, - ) - session.add(asset) - session.flush() - version = FileVersion( - tenant_id=tenant_id, - file_asset_id=asset.id, - blob_id=blob.id, - version_number=1, - filename_at_upload=safe_filename, - display_path_at_upload=logical_path, - content_type=content_type, - size_bytes=blob.size_bytes, - checksum_sha256=blob.checksum_sha256, - created_by_user_id=user_id, - ) - session.add(version) - session.flush() - asset.current_version_id = version.id - session.add(asset) - if campaign_id: - share_file(session, tenant_id=tenant_id, asset=asset, target_type="campaign", target_id=campaign_id, permission="read", user_id=user_id) - return UploadedStoredFile(asset=asset, version=version, blob=blob) - - -def get_asset_for_user(session: Session, *, tenant_id: str, user_id: str, asset_id: str, require_write: bool = False, is_admin: bool = False) -> FileAsset: - asset = session.get(FileAsset, asset_id) - if not asset or asset.tenant_id != tenant_id or asset.deleted_at is not None: - raise FileStorageError("File not found") - if is_admin: - return asset - group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) - owns = (asset.owner_type == "user" and asset.owner_user_id == user_id) or (asset.owner_type == "group" and asset.owner_group_id in group_ids) - if owns: - return asset - permission_values = ["read", "write", "manage"] if not require_write else ["write", "manage"] - share = ( 
- session.query(FileShare) - .filter( - FileShare.tenant_id == tenant_id, - FileShare.file_asset_id == asset.id, - FileShare.revoked_at.is_(None), - FileShare.permission.in_(permission_values), - or_( - (FileShare.target_type == "user") & (FileShare.target_id == user_id), - (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), - (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), - ), - ) - .first() - ) - if not share: - raise FileStorageError("No access to this file") - return asset - - -def list_assets_for_user( - session: Session, - *, - tenant_id: str, - user_id: str, - owner_type: str | None = None, - owner_id: str | None = None, - campaign_id: str | None = None, - path_prefix: str | None = None, - include_deleted: bool = False, - is_admin: bool = False, -) -> list[FileAsset]: - query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id) - if not include_deleted: - query = query.filter(FileAsset.deleted_at.is_(None)) - if owner_type: - query = query.filter(FileAsset.owner_type == owner_type) - if owner_type == "user" and owner_id: - query = query.filter(FileAsset.owner_user_id == owner_id) - if owner_type == "group" and owner_id: - query = query.filter(FileAsset.owner_group_id == owner_id) - if campaign_id: - query = query.join(FileShare, FileShare.file_asset_id == FileAsset.id).filter( - FileShare.tenant_id == tenant_id, - FileShare.target_type == "campaign", - FileShare.target_id == campaign_id, - FileShare.revoked_at.is_(None), - ) - elif not is_admin and not owner_type: - group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) - query = query.outerjoin(FileShare, FileShare.file_asset_id == FileAsset.id).filter( - or_( - (FileAsset.owner_type == "user") & (FileAsset.owner_user_id == user_id), - (FileAsset.owner_type == "group") & (FileAsset.owner_group_id.in_(group_ids)), - (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "user") & (FileShare.target_id == user_id), - 
(FileShare.revoked_at.is_(None)) & (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), - (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), - ) - ) - if path_prefix: - prefix = normalize_folder(path_prefix) - if prefix: - query = query.filter(FileAsset.display_path.like(f"{prefix}/%")) - return query.order_by(FileAsset.display_path.asc(), FileAsset.updated_at.desc()).all() - - -def current_version_and_blob(session: Session, asset: FileAsset) -> tuple[FileVersion, FileBlob]: - if not asset.current_version_id: - raise FileStorageError("File has no current version") - version = session.get(FileVersion, asset.current_version_id) - if not version: - raise FileStorageError("File version not found") - blob = session.get(FileBlob, version.blob_id) - if not blob: - raise FileStorageError("File blob not found") - return version, blob - - -def read_asset_bytes(session: Session, asset: FileAsset) -> tuple[bytes, FileVersion, FileBlob]: - version, blob = current_version_and_blob(session, asset) - backend = get_storage_backend() - return backend.get_bytes(blob.storage_key), version, blob - - -def share_file( - session: Session, - *, - tenant_id: str, - asset: FileAsset, - target_type: str, - target_id: str, - permission: str, - user_id: str, -) -> FileShare: - target_type = target_type.lower().strip() - permission = permission.lower().strip() - if target_type not in {"user", "group", "campaign", "tenant"}: - raise FileStorageError("Unsupported share target") - if permission not in {"read", "write", "manage"}: - raise FileStorageError("Unsupported file permission") - if target_type == "campaign": - campaign = session.get(Campaign, target_id) - if not campaign or campaign.tenant_id != tenant_id: - raise FileStorageError("Campaign not found") - existing = ( - session.query(FileShare) - .filter( - FileShare.tenant_id == tenant_id, - FileShare.file_asset_id == asset.id, - FileShare.target_type == 
target_type, - FileShare.target_id == target_id, - FileShare.revoked_at.is_(None), - ) - .one_or_none() - ) - if existing: - existing.permission = permission - session.add(existing) - return existing - share = FileShare( - tenant_id=tenant_id, - file_asset_id=asset.id, - target_type=target_type, - target_id=target_id, - permission=permission, - created_by_user_id=user_id, - ) - session.add(share) - return share - - -def soft_delete_assets(session: Session, assets: Iterable[FileAsset]) -> int: - count = 0 - now = utcnow() - for asset in assets: - if asset.deleted_at is None: - asset.deleted_at = now - session.add(asset) - count += 1 - return count - - -def asset_is_audit_relevant(session: Session, asset: FileAsset) -> bool: - return ( - session.query(CampaignAttachmentUse) - .filter(CampaignAttachmentUse.file_asset_id == asset.id, CampaignAttachmentUse.use_stage == "sent") - .first() - is not None - ) - - -def _normalize_pattern(pattern: str) -> str: - if pattern.strip() in {"", "*"}: - return "*" - return normalize_logical_path(pattern, fallback_filename="*") - - -def _logical_glob_regex(pattern: str) -> re.Pattern[str]: - """Compile Multi Seal Mail logical globs. - - `*` and `?` stay within one folder segment. `**` crosses folder - boundaries, and `**/` also matches the current folder so `**/*.pdf` - returns direct and nested PDF files. 
- """ - - pattern = _normalize_pattern(pattern) - pieces = ["^"] - index = 0 - while index < len(pattern): - char = pattern[index] - if char == "*": - if index + 1 < len(pattern) and pattern[index + 1] == "*": - index += 2 - if index < len(pattern) and pattern[index] == "/": - pieces.append("(?:.*/)?") - index += 1 - else: - pieces.append(".*") - continue - pieces.append("[^/]*") - elif char == "?": - pieces.append("[^/]") - else: - pieces.append(re.escape(char)) - index += 1 - pieces.append("$") - return re.compile("".join(pieces)) - - -def _relative_display_path(asset: FileAsset, base_path: str | None) -> str: - path = normalize_logical_path(asset.display_path) - base = normalize_folder(base_path) - if not base: - return path - prefix = f"{base}/" - if path.startswith(prefix): - return path[len(prefix) :] - return path - - -def match_assets(assets: Iterable[FileAsset], pattern: str, *, base_path: str | None = None) -> list[FileAsset]: - regex = _logical_glob_regex(pattern) - normalized_pattern = _normalize_pattern(pattern) - has_path_context = base_path is not None or "/" in normalized_pattern or "**" in normalized_pattern - matches: list[FileAsset] = [] - for asset in assets: - candidates = [_relative_display_path(asset, base_path)] if has_path_context else [asset.display_path, asset.filename] - if any(regex.match(candidate) for candidate in candidates): - matches.append(asset) - return matches - - -def resolve_patterns(assets: list[FileAsset], patterns: list[str], *, base_path: str | None = None) -> tuple[list[ResolvedPattern], list[FileAsset]]: - resolved = [ResolvedPattern(pattern=pattern, matches=match_assets(assets, pattern, base_path=base_path)) for pattern in patterns] - matched_ids = {asset.id for item in resolved for asset in item.matches} - unmatched = [asset for asset in assets if asset.id not in matched_ids] - return resolved, unmatched - - -def rename_asset(asset: FileAsset, *, new_path: str) -> None: - normalized = normalize_logical_path(new_path) 
- asset.display_path = normalized - asset.filename = filename_from_path(normalized) - - -def build_rename_preview(asset: FileAsset, *, mode: str, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: - path = PurePosixPath(asset.display_path) - folder = "" if str(path.parent) == "." else str(path.parent) - name = path.name - stem = PurePosixPath(name).stem - ext = "".join(PurePosixPath(name).suffixes) - if mode == "prefix": - next_name = prefix + name - elif mode == "suffix": - next_name = f"{stem}{suffix}{ext}" - elif mode == "replace": - if not find: - next_name = name - else: - next_name = name.replace(find, replacement) - else: - raise FileStorageError("Unsupported rename mode") - return f"{folder}/{next_name}" if folder else next_name - - -def create_zip_bytes(session: Session, assets: Iterable[FileAsset]) -> bytes: - buffer = BytesIO() - with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: - for asset in assets: - data, _, _ = read_asset_bytes(session, asset) - archive.writestr(asset.display_path, data) - buffer.seek(0) - return buffer.getvalue() - - -def extract_zip_upload( - session: Session, - *, - tenant_id: str, - owner_type: str, - owner_id: str, - user_id: str, - zip_data: bytes, - folder: str | None, - campaign_id: str | None, - is_admin: bool = False, - max_files: int = 1000, - max_total_bytes: int = 250 * 1024 * 1024, -) -> list[UploadedStoredFile]: - uploaded: list[UploadedStoredFile] = [] - total = 0 - base_folder = normalize_folder(folder) - with zipfile.ZipFile(BytesIO(zip_data)) as archive: - infos = [info for info in archive.infolist() if not info.is_dir()] - if len(infos) > max_files: - raise FileStorageError(f"ZIP contains too many files (limit {max_files})") - for info in infos: - if info.file_size < 0: - raise FileStorageError("Invalid ZIP member") - total += info.file_size - if total > max_total_bytes: - raise FileStorageError("ZIP is too large after extraction") - 
inner_path = normalize_logical_path(info.filename) - target_path = f"{base_folder}/{inner_path}" if base_folder else inner_path - data = archive.read(info) - uploaded.append( - create_file_asset( - session, - tenant_id=tenant_id, - owner_type=owner_type, - owner_id=owner_id, - user_id=user_id, - filename=filename_from_path(inner_path), - data=data, - display_path=target_path, - content_type=mimetypes.guess_type(inner_path)[0] or "application/octet-stream", - campaign_id=campaign_id, - is_admin=is_admin, - ) - ) - return uploaded - - -def _candidate_match_keys(raw_match: str) -> set[str]: - cleaned = raw_match.replace("\\", "/").strip().strip("/") - result = {cleaned} - if cleaned: - result.add(PurePosixPath(cleaned).name) - return {item for item in result if item} - - -def record_campaign_attachment_uses_for_job(session: Session, job: CampaignJob, *, stage: str = "built") -> None: - """Create best-effort immutable file-use records for matched managed files. - - Existing attachment resolution is still filesystem/path based. This bridge - records uses when a resolved attachment match can be tied to a managed file - by logical path or filename among files shared with the campaign. 
- """ - - attachments = job.resolved_attachments or [] - if not isinstance(attachments, list): - return - assets = list_assets_for_user( - session, - tenant_id=job.tenant_id, - user_id="", - campaign_id=job.campaign_id, - is_admin=True, - ) - by_key: dict[str, FileAsset] = {} - for asset in assets: - by_key[asset.display_path.strip("/")] = asset - by_key[asset.filename] = asset - for attachment in attachments: - if not isinstance(attachment, dict): - continue - matches = attachment.get("matches") if isinstance(attachment.get("matches"), list) else [] - for raw in matches: - if not isinstance(raw, str): - continue - asset = next((by_key[key] for key in _candidate_match_keys(raw) if key in by_key), None) - if not asset: - continue - version, blob = current_version_and_blob(session, asset) - exists = ( - session.query(CampaignAttachmentUse) - .filter( - CampaignAttachmentUse.campaign_job_id == job.id, - CampaignAttachmentUse.file_version_id == version.id, - CampaignAttachmentUse.filename_used == asset.filename, - CampaignAttachmentUse.use_stage == stage, - ) - .one_or_none() - ) - if exists: - continue - session.add( - CampaignAttachmentUse( - tenant_id=job.tenant_id, - campaign_id=job.campaign_id, - campaign_version_id=job.campaign_version_id, - campaign_job_id=job.id, - entry_index=job.entry_index, - entry_id=job.entry_id, - file_asset_id=asset.id, - file_version_id=version.id, - file_blob_id=blob.id, - filename_used=asset.filename, - checksum_sha256=blob.checksum_sha256, - size_bytes=blob.size_bytes, - content_type=blob.content_type, - use_stage=stage, - ) - ) - - -def mark_job_attachment_uses_sent(session: Session, job: CampaignJob) -> None: - record_campaign_attachment_uses_for_job(session, job, stage="built") - now = utcnow() - uses = ( - session.query(CampaignAttachmentUse) - .filter( - CampaignAttachmentUse.tenant_id == job.tenant_id, - CampaignAttachmentUse.campaign_job_id == job.id, - CampaignAttachmentUse.use_stage == "built", - ) - .all() - ) - for use in 
uses: - sent = ( - session.query(CampaignAttachmentUse) - .filter( - CampaignAttachmentUse.campaign_job_id == job.id, - CampaignAttachmentUse.file_version_id == use.file_version_id, - CampaignAttachmentUse.use_stage == "sent", - ) - .one_or_none() - ) - if sent: - continue - session.add( - CampaignAttachmentUse( - tenant_id=use.tenant_id, - campaign_id=use.campaign_id, - campaign_version_id=use.campaign_version_id, - campaign_job_id=use.campaign_job_id, - entry_index=use.entry_index, - entry_id=use.entry_id, - file_asset_id=use.file_asset_id, - file_version_id=use.file_version_id, - file_blob_id=use.file_blob_id, - filename_used=use.filename_used, - checksum_sha256=use.checksum_sha256, - size_bytes=use.size_bytes, - content_type=use.content_type, - use_stage="sent", - used_at=now, - ) - ) +from app.storage.files import ( + _active_asset_at_path, + _active_asset_exists, + _asset_owner_id, + _asset_query_for_owner, + _candidate_renamed_path, + _copy_asset_to_path, + _get_or_create_blob, + _next_available_logical_path, + _normalize_conflict_strategy, + _resolution_by_path, + _soft_delete_conflicting_asset, + _split_logical_path, + _storage_backend_name, + _storage_bucket_name, + _storage_key, + asset_is_audit_relevant, + create_file_asset, + current_version_and_blob, + get_asset_for_user, + list_assets_for_user, + read_asset_bytes, + rename_asset, + share_file, + soft_delete_assets, +) +from app.storage.folders import ( + _active_folder_exists, + _ensure_target_folder_hierarchy, + _folder_query_for_owner, + _owner_filter, + create_folder, + list_folders_for_user, + soft_delete_folder, +) +from app.storage.search import match_assets, resolve_patterns +from app.storage.transfers import build_rename_preview, rename_selection, transfer_selection + +__all__ = [ + "FileConflictResolution", + "FileStorageError", + "RenamePlanItem", + "ResolvedPattern", + "UploadedStoredFile", + "asset_is_audit_relevant", + "build_rename_preview", + "create_file_asset", + "create_folder", + 
"create_zip_bytes", + "current_version_and_blob", + "ensure_group_access", + "ensure_owner_access", + "extract_zip_upload", + "get_asset_for_user", + "list_assets_for_user", + "list_folders_for_user", + "mark_job_attachment_uses_sent", + "match_assets", + "read_asset_bytes", + "record_campaign_attachment_uses_for_job", + "rename_asset", + "rename_selection", + "resolve_patterns", + "share_file", + "soft_delete_assets", + "soft_delete_folder", + "transfer_selection", + "user_group_ids", + "utcnow", +] diff --git a/server/app/storage/transfers.py b/server/app/storage/transfers.py new file mode 100644 index 0000000..cc278bd --- /dev/null +++ b/server/app/storage/transfers.py @@ -0,0 +1,447 @@ +from __future__ import annotations + +from pathlib import PurePosixPath +from typing import Iterable + +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from app.db.models import FileAsset, FileFolder +from app.storage.access import ensure_owner_access +from app.storage.common import FileConflictResolution, FileStorageError, RenamePlanItem, utcnow +from app.storage.files import ( + _active_asset_exists, + _asset_owner_id, + _asset_query_for_owner, + _candidate_renamed_path, + _copy_asset_to_path, + _next_available_logical_path, + _normalize_conflict_strategy, + _resolution_by_path, + _soft_delete_conflicting_asset, + current_version_and_blob, + get_asset_for_user, + rename_asset, +) +from app.storage.folders import _active_folder_exists, _ensure_target_folder_hierarchy, _folder_query_for_owner +from app.storage.paths import filename_from_path, join_folder_filename, normalize_folder, normalize_logical_path + + +def transfer_selection( + session: Session, + *, + tenant_id: str, + user_id: str, + operation: str, + file_ids: list[str], + folder_paths: list[str], + source_owner_type: str, + source_owner_id: str, + target_owner_type: str, + target_owner_id: str, + target_folder: str, + conflict_strategy: str = "reject", + conflict_resolutions: 
Iterable[FileConflictResolution] | None = None, + is_admin: bool = False, +) -> tuple[int, int]: + """Move or copy files/folders between user/group file spaces. + + Folder transfers preserve the selected folder's basename below the target + folder. File transfers place files directly in the target folder. Existing + active target paths are handled by the requested conflict strategy. Copies + create new file assets/versions that reference the existing immutable blob. + """ + + operation = operation.lower().strip() + if operation not in {"move", "copy"}: + raise FileStorageError("Unsupported transfer operation") + source_owner_type = source_owner_type.lower().strip() + target_owner_type = target_owner_type.lower().strip() + source_folder_paths = [normalize_folder(path) for path in folder_paths if normalize_folder(path)] + target_folder = normalize_folder(target_folder) + conflict_strategy = _normalize_conflict_strategy(conflict_strategy) + conflict_resolution_map = _resolution_by_path(conflict_resolutions) + + ensure_owner_access(session, tenant_id=tenant_id, owner_type=source_owner_type, owner_id=source_owner_id, user_id=user_id, is_admin=is_admin) + ensure_owner_access(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, user_id=user_id, is_admin=is_admin) + + if operation == "move" and source_owner_type == target_owner_type and source_owner_id == target_owner_id: + for folder_path in source_folder_paths: + if target_folder == folder_path or target_folder.startswith(f"{folder_path}/"): + raise FileStorageError("Cannot move a folder into itself or one of its child folders") + + assets_by_id: dict[str, FileAsset] = {} + for file_id in file_ids: + asset = get_asset_for_user( + session, + tenant_id=tenant_id, + user_id=user_id, + asset_id=file_id, + require_write=operation == "move", + is_admin=is_admin, + ) + assets_by_id[asset.id] = asset + + folder_asset_targets: dict[str, str] = {} + folder_target_paths: dict[str, str] = {} + for 
folder_path in source_folder_paths: + folder_basename = PurePosixPath(folder_path).name + target_prefix = normalize_folder(f"{target_folder}/{folder_basename}" if target_folder else folder_basename) + folder_target_paths[folder_path] = target_prefix + if operation == "copy" or not (source_owner_type == target_owner_type and source_owner_id == target_owner_id and target_prefix == folder_path): + if _active_folder_exists(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, path=target_prefix): + resolution = conflict_resolution_map.get(target_prefix) + action = resolution.action if resolution else conflict_strategy + if action == "skip": + folder_target_paths.pop(folder_path, None) + continue + if action == "rename": + # Rename the selected folder root while preserving its contents below it. + counter = 1 + candidate = target_prefix + while _active_folder_exists(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, path=candidate): + candidate = _candidate_renamed_path(target_prefix, counter) + counter += 1 + target_prefix = normalize_folder(candidate) + folder_target_paths[folder_path] = target_prefix + elif action == "reject": + raise FileStorageError(f"Target folder already exists: {target_prefix}") + # overwrite on folders means merge into the existing folder; conflicting files are still handled below. 
+ source_assets = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=source_owner_type, owner_id=source_owner_id).filter( + FileAsset.deleted_at.is_(None), + FileAsset.display_path.like(f"{folder_path}/%"), + ).all() + for asset in source_assets: + relative = asset.display_path[len(folder_path) + 1 :] + folder_asset_targets[asset.id] = normalize_logical_path(f"{target_prefix}/{relative}") + assets_by_id[asset.id] = asset + + direct_file_targets: dict[str, str] = {} + for file_id, asset in assets_by_id.items(): + if file_id in folder_asset_targets: + continue + direct_file_targets[file_id] = normalize_logical_path(join_folder_filename(target_folder, filename_from_path(asset.display_path))) + + all_targets = {**folder_asset_targets, **direct_file_targets} + if not all_targets and not source_folder_paths: + return 0, 0 + + resolved_targets: dict[str, str] = {} + skipped_asset_ids: set[str] = set() + reserved_targets: set[str] = set() + for asset_id, target_path in all_targets.items(): + source_asset = assets_by_id[asset_id] + same_owner = (source_asset.owner_type == target_owner_type and _asset_owner_id(source_asset) == target_owner_id) + exclude = source_asset.id if operation == "move" and same_owner else None + normalized_target = normalize_logical_path(target_path) + resolution = conflict_resolution_map.get(normalized_target) + action = resolution.action if resolution else conflict_strategy + if resolution and resolution.new_path and action == "rename": + normalized_target = normalize_logical_path(resolution.new_path) + conflict = _active_asset_exists( + session, + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_id=target_owner_id, + path=normalized_target, + exclude_asset_id=exclude, + ) or normalized_target in reserved_targets + if conflict: + if action == "reject": + raise FileStorageError(f"Target file already exists: {normalized_target}") + if action == "skip": + skipped_asset_ids.add(asset_id) + continue + if action == "overwrite": + 
_soft_delete_conflicting_asset( + session, + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_id=target_owner_id, + path=normalized_target, + exclude_asset_id=exclude, + ) + elif action == "rename": + normalized_target = _next_available_logical_path( + session, + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_id=target_owner_id, + desired_path=normalized_target, + reserved_paths=reserved_targets, + exclude_asset_id=exclude, + ) + elif action == "rename": + normalized_target = _next_available_logical_path( + session, + tenant_id=tenant_id, + owner_type=target_owner_type, + owner_id=target_owner_id, + desired_path=normalized_target, + reserved_paths=reserved_targets, + exclude_asset_id=exclude, + ) + reserved_targets.add(normalized_target) + resolved_targets[asset_id] = normalized_target + all_targets = resolved_targets + + copied_or_moved_files = 0 + copied_or_moved_folders = 0 + + # Create target folder hierarchy first, including selected folder roots and + # any nested folders/files parents. + same_owner_transfer = source_owner_type == target_owner_type and source_owner_id == target_owner_id + for target_path in folder_target_paths.values(): + path_to_create = target_path + if operation == "move" and same_owner_transfer: + parent = str(PurePosixPath(target_path).parent) + path_to_create = "" if parent == "." 
else parent + if path_to_create: + _ensure_target_folder_hierarchy(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, user_id=user_id, path=path_to_create) + copied_or_moved_folders += 1 + for target_path in all_targets.values(): + parent = str(PurePosixPath(target_path).parent) + if parent and parent != ".": + _ensure_target_folder_hierarchy(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, user_id=user_id, path=parent) + + if operation == "copy": + for asset_id, target_path in all_targets.items(): + _copy_asset_to_path(session, assets_by_id[asset_id], tenant_id=tenant_id, target_owner_type=target_owner_type, target_owner_id=target_owner_id, target_path=target_path, user_id=user_id) + copied_or_moved_files += 1 + return copied_or_moved_files, copied_or_moved_folders + + now = utcnow() + for asset_id, target_path in all_targets.items(): + asset = assets_by_id[asset_id] + same_owner = asset.owner_type == target_owner_type and _asset_owner_id(asset) == target_owner_id + if same_owner: + if normalize_logical_path(asset.display_path) == target_path: + continue + rename_asset(asset, new_path=target_path) + session.add(asset) + else: + _copy_asset_to_path(session, asset, tenant_id=tenant_id, target_owner_type=target_owner_type, target_owner_id=target_owner_id, target_path=target_path, user_id=user_id) + asset.deleted_at = now + session.add(asset) + copied_or_moved_files += 1 + + # Move/copy persisted folder records after files. For cross-owner moves, create + # target records and soft-delete source records. For same-owner moves, rename + # them in place. 
+ for source_path, target_prefix in folder_target_paths.items(): + folder_rows = _folder_query_for_owner(session, tenant_id=tenant_id, owner_type=source_owner_type, owner_id=source_owner_id).filter( + FileFolder.deleted_at.is_(None), + or_(FileFolder.path == source_path, FileFolder.path.like(f"{source_path}/%")), + ).all() + for folder in folder_rows: + relative = "" if folder.path == source_path else folder.path[len(source_path) + 1 :] + new_path = normalize_folder(f"{target_prefix}/{relative}" if relative else target_prefix) + if operation == "move" and source_owner_type == target_owner_type and source_owner_id == target_owner_id: + folder.path = new_path + session.add(folder) + else: + _ensure_target_folder_hierarchy(session, tenant_id=tenant_id, owner_type=target_owner_type, owner_id=target_owner_id, user_id=user_id, path=new_path) + if operation == "move": + folder.deleted_at = now + session.add(folder) + return copied_or_moved_files, copied_or_moved_folders + + +def _rename_basename(name: str, *, mode: str, new_name: str | None = None, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + if mode == "direct": + cleaned = normalize_logical_path(new_name or "", fallback_filename="item") + if "/" in cleaned: + raise FileStorageError("Rename expects a name, not a path") + return cleaned + stem_path = PurePosixPath(name) + suffixes = "".join(stem_path.suffixes) + stem = name[: -len(suffixes)] if suffixes else name + if mode == "prefix": + return prefix + name + if mode == "suffix": + return f"{stem}{suffix}{suffixes}" + if mode == "replace": + return name.replace(find or "", replacement) if find else name + raise FileStorageError("Unsupported rename mode") + + +def _rename_path(path: str, *, mode: str, new_name: str | None = None, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + normalized = normalize_logical_path(path) + logical = PurePosixPath(normalized) + folder = "" if 
str(logical.parent) == "." else str(logical.parent) + next_name = _rename_basename(logical.name, mode=mode, new_name=new_name, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + return normalize_logical_path(f"{folder}/{next_name}" if folder else next_name) + + +def _rename_relative_recursive(relative_path: str, *, mode: str, new_name: str | None = None, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + parts = normalize_logical_path(relative_path).split("/") + return normalize_logical_path( + "/".join( + _rename_basename(part, mode=mode, new_name=new_name if len(parts) == 1 else None, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + for part in parts + if part + ) + ) + + +def build_rename_preview(asset: FileAsset, *, mode: str, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + return _rename_path(asset.display_path, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + + +def _collapse_folder_roots(folder_paths: list[str]) -> list[str]: + roots = sorted({normalize_folder(path) for path in folder_paths if normalize_folder(path)}, key=lambda item: (item.count("/"), item)) + collapsed: list[str] = [] + for path in roots: + if any(path == root or path.startswith(f"{root}/") for root in collapsed): + continue + collapsed.append(path) + return collapsed + + +def _path_under_root(path: str, root: str) -> bool: + normalized = normalize_logical_path(path) + return normalized == root or normalized.startswith(f"{root}/") + + +def _folder_new_path_for_root(path: str, root: str, new_root: str, *, recursive: bool, mode: str, find: str | None, replacement: str, prefix: str, suffix: str) -> str: + if path == root: + return normalize_folder(new_root) + relative = path[len(root) + 1 :] + if recursive: + relative = _rename_relative_recursive(relative, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + return 
normalize_folder(f"{new_root}/{relative}") + + +def _file_new_path_for_root(path: str, root: str, new_root: str, *, recursive: bool, mode: str, find: str | None, replacement: str, prefix: str, suffix: str) -> str: + relative = path[len(root) + 1 :] + if recursive: + relative = _rename_relative_recursive(relative, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + return normalize_logical_path(f"{new_root}/{relative}") + + +def rename_selection( + session: Session, + *, + tenant_id: str, + user_id: str, + file_ids: list[str], + folder_paths: list[str], + owner_type: str | None, + owner_id: str | None, + mode: str, + new_name: str | None = None, + find: str | None = None, + replacement: str = "", + prefix: str = "", + suffix: str = "", + recursive: bool = False, + dry_run: bool = True, + is_admin: bool = False, +) -> list[RenamePlanItem]: + mode = mode.lower().strip() + if mode not in {"direct", "prefix", "suffix", "replace"}: + raise FileStorageError("Unsupported rename mode") + selected_file_ids = list(dict.fromkeys(file_ids)) + selected_folder_roots = _collapse_folder_roots(folder_paths) + if not selected_file_ids and not selected_folder_roots: + raise FileStorageError("No files or folders selected") + if selected_folder_roots and (not owner_type or not owner_id): + raise FileStorageError("Folder rename requires an owner file space") + if mode == "direct" and (len(selected_file_ids) + len(selected_folder_roots)) != 1: + raise FileStorageError("Direct rename requires exactly one selected item") + + assets: dict[str, FileAsset] = {} + for file_id in selected_file_ids: + asset = get_asset_for_user(session, tenant_id=tenant_id, user_id=user_id, asset_id=file_id, require_write=True, is_admin=is_admin) + assets[asset.id] = asset + + owner_type_norm = owner_type.lower().strip() if owner_type else None + owner_id_norm = owner_id + folder_rows_by_path: dict[str, FileFolder] = {} + affected_folder_paths: set[str] = set() + if 
selected_folder_roots and owner_type_norm and owner_id_norm: + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm, user_id=user_id, is_admin=is_admin) + for root in selected_folder_roots: + rows = _folder_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm).filter( + FileFolder.deleted_at.is_(None), + or_(FileFolder.path == root, FileFolder.path.like(f"{root}/%")), + ).all() + if not rows and not _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{root}/%")).first(): + raise FileStorageError(f"Folder not found: {root}") + for row in rows: + folder_rows_by_path[row.path] = row + affected_folder_paths.add(row.path) + for asset in _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{root}/%")).all(): + assets[asset.id] = asset + + folder_plan: dict[str, str] = {} + root_target_paths: dict[str, str] = {} + for root in selected_folder_roots: + if mode == "direct": + root_parent = "" if str(PurePosixPath(root).parent) == "." 
else str(PurePosixPath(root).parent) + root_new_name = _rename_basename(PurePosixPath(root).name, mode=mode, new_name=new_name) + new_root = normalize_folder(f"{root_parent}/{root_new_name}" if root_parent else root_new_name) + else: + new_root = normalize_folder(_rename_path(root, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix)) + root_target_paths[root] = new_root + if root in affected_folder_paths: + folder_plan[root] = new_root + for path in sorted(affected_folder_paths, key=lambda item: item.count("/")): + if path != root and _path_under_root(path, root): + folder_plan[path] = _folder_new_path_for_root(path, root, new_root, recursive=recursive, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + + file_plan: dict[str, str] = {} + for asset in assets.values(): + matched_root = next((root for root in sorted(selected_folder_roots, key=len, reverse=True) if _path_under_root(asset.display_path, root)), None) + if matched_root: + root_new = root_target_paths.get(matched_root) + if not root_new: + continue + file_plan[asset.id] = _file_new_path_for_root(asset.display_path, matched_root, root_new, recursive=recursive, mode=mode, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + else: + file_plan[asset.id] = _rename_path(asset.display_path, mode=mode, new_name=new_name, find=find, replacement=replacement, prefix=prefix, suffix=suffix) + + # Detect duplicate targets and existing active target conflicts before applying. 
+ target_files: set[str] = set() + target_folders: set[str] = set() + affected_file_ids = set(file_plan) + for asset_id, target in file_plan.items(): + normalized = normalize_logical_path(target) + if normalized in target_files: + raise FileStorageError(f"Rename would create duplicate file path: {normalized}") + target_files.add(normalized) + asset = assets[asset_id] + if _active_asset_exists(session, tenant_id=tenant_id, owner_type=asset.owner_type, owner_id=_asset_owner_id(asset), path=normalized, exclude_asset_id=asset.id): + raise FileStorageError(f"Target file already exists: {normalized}") + if _active_folder_exists(session, tenant_id=tenant_id, owner_type=asset.owner_type, owner_id=_asset_owner_id(asset), path=normalized): + raise FileStorageError(f"Target path is already a folder: {normalized}") + if selected_folder_roots and owner_type_norm and owner_id_norm: + for old_path, target in folder_plan.items(): + normalized = normalize_folder(target) + if normalized in target_folders: + raise FileStorageError(f"Rename would create duplicate folder path: {normalized}") + target_folders.add(normalized) + if _active_folder_exists(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm, path=normalized, exclude_paths=set(folder_plan.keys())): + raise FileStorageError(f"Target folder already exists: {normalized}") + if _active_asset_exists(session, tenant_id=tenant_id, owner_type=owner_type_norm, owner_id=owner_id_norm, path=normalized): + raise FileStorageError(f"Target path is already a file: {normalized}") + + plan: list[RenamePlanItem] = [] + for old_path, new_path in sorted(folder_plan.items()): + plan.append(RenamePlanItem(kind="folder", id=old_path, old_path=old_path, new_path=new_path)) + for asset_id, new_path in file_plan.items(): + asset = assets[asset_id] + plan.append(RenamePlanItem(kind="file", id=asset.id, old_path=asset.display_path, new_path=new_path)) + + if dry_run: + return plan + + for old_path, new_path in 
folder_plan.items(): + folder = folder_rows_by_path.get(old_path) + if folder: + folder.path = normalize_folder(new_path) + session.add(folder) + for asset_id, new_path in file_plan.items(): + rename_asset(assets[asset_id], new_path=new_path) + session.add(assets[asset_id]) + return plan diff --git a/server/multimailer-dev.db b/server/multimailer-dev.db index 332338a..2fda82b 100644 Binary files a/server/multimailer-dev.db and b/server/multimailer-dev.db differ diff --git a/server/requirements-dev.txt b/server/requirements-dev.txt new file mode 100644 index 0000000..7171e02 --- /dev/null +++ b/server/requirements-dev.txt @@ -0,0 +1,4 @@ +-r requirements.txt + +# FastAPI/Starlette TestClient transport used by the stdlib unittest smoke suite. +httpx==0.28.1 diff --git a/server/tests/__init__.py b/server/tests/__init__.py new file mode 100644 index 0000000..b314cf4 --- /dev/null +++ b/server/tests/__init__.py @@ -0,0 +1 @@ +"""Runtime and API contract tests for Multi Seal Mail.""" diff --git a/server/tests/test_api_smoke.py b/server/tests/test_api_smoke.py new file mode 100644 index 0000000..356016f --- /dev/null +++ b/server/tests/test_api_smoke.py @@ -0,0 +1,257 @@ +from __future__ import annotations + +import io +import json +import os +import shutil +import tempfile +import unittest +import zipfile +from pathlib import Path + +# Settings are instantiated while importing the application. Configure an +# isolated test database and local storage before importing app modules. 
# Every piece of mutable test state lives under one throwaway directory. The
# environment is populated *before* the application modules are imported
# below, so the imports see the isolated database, file storage and mailbox.
_TEST_ROOT = Path(tempfile.mkdtemp(prefix="multi-seal-mail-tests-"))
_FILES_DIR = _TEST_ROOT / "files"
_MAILBOX_DIR = _TEST_ROOT / "mock-mailbox"

for _env_name, _env_value in {
    "APP_ENV": "test",
    "DATABASE_URL": f"sqlite:///{_TEST_ROOT / 'test.db'}",
    "FILE_STORAGE_BACKEND": "local",
    "FILE_STORAGE_LOCAL_ROOT": str(_FILES_DIR),
    "MOCK_MAILBOX_DIR": str(_MAILBOX_DIR),
    "DEV_BOOTSTRAP_ENABLED": "false",
}.items():
    os.environ[_env_name] = _env_value

# These imports intentionally come after the environment setup above.
from fastapi.testclient import TestClient

from app.db.base import Base
from app.db.bootstrap import bootstrap_dev_data
from app.db.session import SessionLocal, engine
from app.main import app


class ApiSmokeTests(unittest.TestCase):
    """End-to-end smoke tests that drive the HTTP API through a TestClient.

    One client is shared by the whole class; the database schema and the
    on-disk scratch directories are rebuilt before every test.
    """

    @classmethod
    def setUpClass(cls) -> None:
        cls.client = TestClient(app)

    @classmethod
    def tearDownClass(cls) -> None:
        # Close the client and the engine first, then remove the temp tree.
        cls.client.close()
        engine.dispose()
        shutil.rmtree(_TEST_ROOT, ignore_errors=True)

    def setUp(self) -> None:
        # Recreate every table and wipe both scratch directories.
        schema = Base.metadata
        schema.drop_all(bind=engine)
        schema.create_all(bind=engine)
        for scratch_dir in (_FILES_DIR, _MAILBOX_DIR):
            shutil.rmtree(scratch_dir, ignore_errors=True)
        with SessionLocal() as db:
            bootstrap_dev_data(
                db,
                api_key_secret="test-api-key",
                user_password="test-admin",
            )

    def _assert_ok(self, reply) -> None:
        """Fail with the response body as the message unless status is 200."""
        self.assertEqual(reply.status_code, 200, reply.text)

    def _login(self) -> tuple[dict[str, str], dict[str, object]]:
        """Log in as the bootstrapped admin.

        Returns the bearer-token headers together with the decoded login
        payload.
        """
        credentials = {"email": "admin@example.local", "password": "test-admin"}
        reply = self.client.post("/api/v1/auth/login", json=credentials)
        self._assert_ok(reply)
        body = reply.json()
        token = body["access_token"]
        return {"Authorization": f"Bearer {token}"}, body

    @staticmethod
    def _campaign_config() -> dict[str, object]:
        """Build the single-recipient campaign document used by the campaign test."""
        smtp = {
            "host": "smtp.example.invalid",
            "port": 587,
            "username": "sender@example.org",
            "password": "test-secret",
            "security": "starttls",
        }
        imap = {
            "enabled": True,
            "host": "imap.example.invalid",
            "port": 993,
            "username": "sender@example.org",
            "password": "test-secret",
            "security": "tls",
        }
        recipient = {
            "id": "recipient-1",
            "to": [{"email": "recipient@example.org", "name": "Recipient", "type": "to"}],
            "fields": {"first_name": "Ada"},
        }
        return {
            "version": "1.0",
            "campaign": {"id": "api-smoke", "name": "API smoke campaign", "mode": "test"},
            "fields": [{"name": "first_name", "type": "string", "required": True}],
            "global_values": {},
            "server": {"smtp": smtp, "imap": imap},
            "recipients": {
                "from": {"email": "sender@example.org", "name": "Sender", "type": "to"},
                "allow_individual_to": True,
            },
            "template": {
                "subject": "Hello ${local::first_name}",
                "text": "Hello ${local::first_name}, this is a smoke test.",
            },
            "attachments": {"base_path": ".", "global": [], "allow_individual": False},
            "entries": {"inline": [recipient]},
            "validation_policy": {"missing_email": "block", "template_error": "block"},
            "delivery": {"imap_append_sent": {"enabled": True, "folder": "Sent"}},
            "status_tracking": {"enabled": True},
        }

    def test_managed_file_runtime_flow(self) -> None:
        # Covers, in order: spaces, folder creation, two uploads (the second
        # with an explicit rename resolution), listing, pattern resolution,
        # a dry-run bulk rename, download and zip export.
        auth, session_info = self._login()
        user_id = session_info["user"]["id"]
        owner = {"owner_type": "user", "owner_id": user_id}

        spaces_reply = self.client.get("/api/v1/files/spaces", headers=auth)
        self._assert_ok(spaces_reply)
        self.assertEqual(spaces_reply.json()["spaces"][0]["owner_id"], user_id)

        folder_reply = self.client.post(
            "/api/v1/files/folders",
            headers=auth,
            json={**owner, "path": "inbox"},
        )
        self._assert_ok(folder_reply)

        # The part is sent with an empty content type; the response is still
        # expected to report text/plain.
        first_upload = self.client.post(
            "/api/v1/files/upload",
            headers=auth,
            data={**owner, "path": "inbox"},
            files=[("files", ("report.txt", b"first report", ""))],
        )
        self._assert_ok(first_upload)
        first_file = first_upload.json()["files"][0]
        self.assertEqual(first_file["display_path"], "inbox/report.txt")
        self.assertEqual(first_file["content_type"], "text/plain")

        # Same filename again: the request carries an explicit "rename"
        # resolution, so the second asset must land on the new path instead
        # of overwriting the first one.
        resolutions = [
            {
                "target_path": "inbox/report.txt",
                "action": "rename",
                "new_path": "inbox/report-copy.txt",
            }
        ]
        second_upload = self.client.post(
            "/api/v1/files/upload",
            headers=auth,
            data={
                **owner,
                "path": "inbox",
                "conflict_resolutions_json": json.dumps(resolutions),
            },
            files=[("files", ("report.txt", b"second report", "text/plain"))],
        )
        self._assert_ok(second_upload)
        second_file = second_upload.json()["files"][0]
        self.assertEqual(second_file["display_path"], "inbox/report-copy.txt")

        listing_reply = self.client.get(
            "/api/v1/files",
            headers=auth,
            params={**owner, "path_prefix": "inbox"},
        )
        self._assert_ok(listing_reply)
        self.assertEqual(len(listing_reply.json()["files"]), 2)

        resolve_reply = self.client.post(
            "/api/v1/files/resolve-patterns",
            headers=auth,
            json={"patterns": ["**/*.txt"], **owner, "include_unmatched": True},
        )
        self._assert_ok(resolve_reply)
        resolution = resolve_reply.json()
        self.assertEqual(len(resolution["patterns"][0]["matches"]), 2)
        self.assertEqual(resolution["unmatched"], [])

        # Dry run only: the preview reports the prefixed name, while the
        # archive check further down still expects the original paths.
        preview_reply = self.client.post(
            "/api/v1/files/bulk-rename",
            headers=auth,
            json={
                "file_ids": [first_file["id"]],
                "mode": "prefix",
                "prefix": "archived-",
                "dry_run": True,
            },
        )
        self._assert_ok(preview_reply)
        preview_item = preview_reply.json()["items"][0]
        self.assertEqual(preview_item["new_path"], "inbox/archived-report.txt")

        download_reply = self.client.get(
            f"/api/v1/files/{first_file['id']}/download",
            headers=auth,
        )
        self._assert_ok(download_reply)
        self.assertEqual(download_reply.content, b"first report")

        archive_reply = self.client.post(
            "/api/v1/files/archive.zip",
            headers=auth,
            json={
                "file_ids": [first_file["id"], second_file["id"]],
                "filename": "reports.zip",
            },
        )
        self._assert_ok(archive_reply)
        with zipfile.ZipFile(io.BytesIO(archive_reply.content)) as bundle:
            member_names = sorted(bundle.namelist())
            self.assertEqual(member_names, ["inbox/report-copy.txt", "inbox/report.txt"])
            self.assertEqual(bundle.read("inbox/report.txt"), b"first report")

    def test_campaign_create_validate_build_and_mock_send(self) -> None:
        # Creates a campaign, then validates, builds and mock-sends the
        # version returned by the create call.
        auth, _ = self._login()

        create_reply = self.client.post(
            "/api/v1/campaigns",
            headers=auth,
            json={"config": self._campaign_config()},
        )
        self._assert_ok(create_reply)
        creation = create_reply.json()
        campaign_id = creation["campaign"]["id"]
        version_id = creation["version"]["id"]
        version_url = f"/api/v1/campaigns/versions/{version_id}"

        validate_reply = self.client.post(
            f"{version_url}/validate",
            headers=auth,
            json={"check_files": False},
        )
        self._assert_ok(validate_reply)
        self.assertTrue(validate_reply.json()["ok"])

        build_reply = self.client.post(
            f"{version_url}/build",
            headers=auth,
            json={"write_eml": False},
        )
        self._assert_ok(build_reply)
        self.assertEqual(build_reply.json()["built_count"], 1)

        mock_reply = self.client.post(
            f"/api/v1/campaigns/{campaign_id}/mock-send",
            headers=auth,
            json={
                "version_id": version_id,
                "send": True,
                "append_sent": True,
                "clear_mailbox": True,
                "check_files": False,
            },
        )
        self._assert_ok(mock_reply)
        outcome = mock_reply.json()["result"]
        send_stats = outcome["send"]
        self.assertTrue(outcome["validation"]["ok"])
        self.assertEqual(outcome["build"]["built_count"], 1)
        self.assertEqual(send_stats["sent_count"], 1)
        self.assertEqual(send_stats["imap_appended_count"], 1)
        self.assertEqual(send_stats["imap_failed_count"], 0)


if __name__ == "__main__":
    unittest.main()