Files

582 lines
22 KiB
Python

from __future__ import annotations
import json
from io import BytesIO
from typing import Literal
from fastapi import APIRouter, Depends, File as FastAPIFile, Form, HTTPException, UploadFile, status
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.auth.dependencies import ApiPrincipal, require_scope
from app.api.v1.file_schemas import (
ArchiveRequest,
BulkDeleteRequest,
BulkDeleteResponse,
ConflictResolutionRequest,
FileAssetResponse,
FileFolderCreateRequest,
FileFolderDeleteRequest,
FileFolderDeleteResponse,
FileFolderResponse,
FileFoldersResponse,
FileListResponse,
FileShareRequest,
FileShareResponse,
FileSpaceResponse,
FileSpacesResponse,
FileUploadResponse,
PatternMatchResponse,
PatternResolveRequest,
PatternResolveResponse,
RenamePreviewItem,
RenameRequest,
RenameResponse,
TransferRequest,
TransferResponse,
_conflict_resolutions,
)
from app.db.models import Campaign, FileAsset, FileFolder, FileShare, Group
from app.db.session import get_session
from app.storage.paths import UnsafeFilePathError, filename_from_path, normalize_folder, normalize_logical_path
from app.storage.access import ensure_group_access, user_group_ids
from app.storage.archives import create_zip_bytes, extract_zip_upload
from app.storage.common import FileStorageError
from app.storage.files import (
asset_is_audit_relevant,
create_file_asset,
current_version_and_blob,
get_asset_for_user,
list_assets_for_user,
read_asset_bytes,
share_file,
soft_delete_assets,
)
from app.storage.folders import create_folder, list_folders_for_user, soft_delete_folder
from app.storage.search import resolve_patterns
from app.storage.transfers import rename_selection, transfer_selection
router = APIRouter(prefix="/files", tags=["files"])
def _is_admin(principal: ApiPrincipal) -> bool:
return principal.user.is_tenant_admin or "*" in set(principal.scopes or [])
def _http_error(exc: Exception, *, not_found: bool = False) -> HTTPException:
code = status.HTTP_404_NOT_FOUND if not_found else status.HTTP_400_BAD_REQUEST
return HTTPException(status_code=code, detail=str(exc))
def _owner_id(asset: FileAsset) -> str:
return asset.owner_user_id if asset.owner_type == "user" else asset.owner_group_id # type: ignore[return-value]
def _asset_response(session: Session, asset: FileAsset, *, include_shares: bool = False) -> FileAssetResponse:
version, blob = current_version_and_blob(session, asset)
shares: list[FileShareResponse] = []
if include_shares:
rows = session.query(FileShare).filter(FileShare.file_asset_id == asset.id).order_by(FileShare.created_at.desc()).all()
shares = [
FileShareResponse(
id=row.id,
target_type=row.target_type,
target_id=row.target_id,
permission=row.permission,
created_at=row.created_at.isoformat(),
revoked_at=row.revoked_at.isoformat() if row.revoked_at else None,
)
for row in rows
]
return FileAssetResponse(
id=asset.id,
tenant_id=asset.tenant_id,
owner_type=asset.owner_type,
owner_id=_owner_id(asset),
display_path=asset.display_path,
filename=asset.filename,
description=asset.description,
size_bytes=blob.size_bytes,
content_type=blob.content_type,
checksum_sha256=blob.checksum_sha256,
version_id=version.id,
created_at=asset.created_at.isoformat(),
updated_at=asset.updated_at.isoformat(),
deleted_at=asset.deleted_at.isoformat() if asset.deleted_at else None,
audit_relevant=asset_is_audit_relevant(session, asset),
metadata=asset.metadata_ or {},
shares=shares,
)
def _folder_owner_id(folder: FileFolder) -> str:
return folder.owner_user_id if folder.owner_type == "user" else folder.owner_group_id # type: ignore[return-value]
def _folder_response(folder: FileFolder) -> FileFolderResponse:
return FileFolderResponse(
id=folder.id,
tenant_id=folder.tenant_id,
owner_type=folder.owner_type,
owner_id=_folder_owner_id(folder),
path=folder.path,
created_at=folder.created_at.isoformat(),
updated_at=folder.updated_at.isoformat(),
deleted_at=folder.deleted_at.isoformat() if folder.deleted_at else None,
)
def _ensure_list_owner_access(session: Session, principal: ApiPrincipal, owner_type: str | None, owner_id: str | None) -> None:
if not owner_type:
return
if owner_type == "user" and owner_id and owner_id != principal.user.id and not _is_admin(principal):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No access to this user file space")
if owner_type == "group" and owner_id:
try:
ensure_group_access(
session,
tenant_id=principal.tenant_id,
group_id=owner_id,
user_id=principal.user.id,
is_admin=_is_admin(principal),
)
except FileStorageError as exc:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc
@router.get("/spaces", response_model=FileSpacesResponse)
def list_file_spaces(
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:read")),
):
spaces = [
FileSpaceResponse(
id=f"user:{principal.user.id}",
label="My files",
owner_type="user",
owner_id=principal.user.id,
description="Files owned by your user account.",
)
]
group_ids = user_group_ids(session, tenant_id=principal.tenant_id, user_id=principal.user.id, include_admin_groups=_is_admin(principal))
if group_ids:
groups = session.query(Group).filter(Group.tenant_id == principal.tenant_id, Group.id.in_(group_ids)).order_by(Group.name.asc()).all()
spaces.extend(
FileSpaceResponse(
id=f"group:{group.id}",
label=f"{group.name} files",
owner_type="group",
owner_id=group.id,
description="Files owned by this group.",
)
for group in groups
)
return FileSpacesResponse(spaces=spaces)
@router.get("/folders", response_model=FileFoldersResponse)
def list_file_folders(
owner_type: Literal["user", "group"],
owner_id: str,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:read")),
):
try:
folders = list_folders_for_user(
session,
tenant_id=principal.tenant_id,
user_id=principal.user.id,
owner_type=owner_type,
owner_id=owner_id,
is_admin=_is_admin(principal),
)
return FileFoldersResponse(folders=[_folder_response(folder) for folder in folders])
except FileStorageError as exc:
raise _http_error(exc) from exc
@router.post("/folders", response_model=FileFolderResponse)
def create_file_folder(
payload: FileFolderCreateRequest,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
try:
folder = create_folder(
session,
tenant_id=principal.tenant_id,
owner_type=payload.owner_type,
owner_id=payload.owner_id,
user_id=principal.user.id,
path=payload.path,
is_admin=_is_admin(principal),
)
session.commit()
return _folder_response(folder)
except (FileStorageError, UnsafeFilePathError, ValueError) as exc:
session.rollback()
raise _http_error(exc) from exc
@router.post("/folders/delete", response_model=FileFolderDeleteResponse)
def delete_file_folder(
payload: FileFolderDeleteRequest,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
try:
deleted_folders, deleted_files = soft_delete_folder(
session,
tenant_id=principal.tenant_id,
owner_type=payload.owner_type,
owner_id=payload.owner_id,
user_id=principal.user.id,
path=payload.path,
recursive=payload.recursive,
is_admin=_is_admin(principal),
)
session.commit()
return FileFolderDeleteResponse(deleted_folders=deleted_folders, deleted_files=deleted_files)
except (FileStorageError, UnsafeFilePathError, ValueError) as exc:
session.rollback()
raise _http_error(exc) from exc
@router.get("", response_model=FileListResponse)
def list_files(
owner_type: Literal["user", "group"] | None = None,
owner_id: str | None = None,
campaign_id: str | None = None,
path_prefix: str | None = None,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:read")),
):
_ensure_list_owner_access(session, principal, owner_type, owner_id)
assets = list_assets_for_user(
session,
tenant_id=principal.tenant_id,
user_id=principal.user.id,
owner_type=owner_type,
owner_id=owner_id,
campaign_id=campaign_id,
path_prefix=path_prefix,
is_admin=_is_admin(principal),
)
return FileListResponse(files=[_asset_response(session, asset, include_shares=True) for asset in assets])
@router.post("/upload", response_model=FileUploadResponse)
async def upload_files(
files: list[UploadFile] = FastAPIFile(...),
owner_type: Literal["user", "group"] = Form(default="user"),
owner_id: str | None = Form(default=None),
path: str = Form(default=""),
campaign_id: str | None = Form(default=None),
unpack_zip: bool = Form(default=False),
conflict_strategy: Literal["reject", "overwrite", "rename"] = Form(default="reject"),
conflict_resolutions_json: str | None = Form(default=None),
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
target_owner = owner_id or principal.user.id
uploaded_assets: list[FileAsset] = []
try:
raw_resolutions = json.loads(conflict_resolutions_json) if conflict_resolutions_json else []
upload_resolutions = _conflict_resolutions([ConflictResolutionRequest(**item) for item in raw_resolutions])
for upload in files:
data = await upload.read()
filename = upload.filename or "file"
content_type = upload.content_type or None
if unpack_zip and filename.lower().endswith(".zip"):
extracted = extract_zip_upload(
session,
tenant_id=principal.tenant_id,
owner_type=owner_type,
owner_id=target_owner,
user_id=principal.user.id,
zip_data=data,
folder=path,
campaign_id=campaign_id,
conflict_strategy=conflict_strategy,
conflict_resolutions=upload_resolutions,
is_admin=_is_admin(principal),
)
uploaded_assets.extend(item.asset for item in extracted)
continue
stored = create_file_asset(
session,
tenant_id=principal.tenant_id,
owner_type=owner_type,
owner_id=target_owner,
user_id=principal.user.id,
filename=filename,
data=data,
folder=path,
content_type=content_type,
campaign_id=campaign_id,
conflict_strategy=conflict_strategy,
conflict_resolutions=upload_resolutions,
is_admin=_is_admin(principal),
)
uploaded_assets.append(stored.asset)
session.commit()
except (FileStorageError, UnsafeFilePathError, ValueError) as exc:
session.rollback()
raise _http_error(exc) from exc
return FileUploadResponse(files=[_asset_response(session, asset, include_shares=True) for asset in uploaded_assets])
@router.post("/upload-zip", response_model=FileUploadResponse)
async def upload_zip(
file: UploadFile = FastAPIFile(...),
owner_type: Literal["user", "group"] = Form(default="user"),
owner_id: str | None = Form(default=None),
path: str = Form(default=""),
campaign_id: str | None = Form(default=None),
conflict_strategy: Literal["reject", "overwrite", "rename"] = Form(default="reject"),
conflict_resolutions_json: str | None = Form(default=None),
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
data = await file.read()
target_owner = owner_id or principal.user.id
try:
raw_resolutions = json.loads(conflict_resolutions_json) if conflict_resolutions_json else []
upload_resolutions = _conflict_resolutions([ConflictResolutionRequest(**item) for item in raw_resolutions])
extracted = extract_zip_upload(
session,
tenant_id=principal.tenant_id,
owner_type=owner_type,
owner_id=target_owner,
user_id=principal.user.id,
zip_data=data,
folder=path,
campaign_id=campaign_id,
conflict_strategy=conflict_strategy,
conflict_resolutions=upload_resolutions,
is_admin=_is_admin(principal),
)
session.commit()
except (FileStorageError, UnsafeFilePathError, ValueError) as exc:
session.rollback()
raise _http_error(exc) from exc
return FileUploadResponse(files=[_asset_response(session, item.asset, include_shares=True) for item in extracted])
@router.get("/{file_id}", response_model=FileAssetResponse)
def get_file(
file_id: str,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:read")),
):
try:
asset = get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, is_admin=_is_admin(principal))
return _asset_response(session, asset, include_shares=True)
except FileStorageError as exc:
raise _http_error(exc, not_found=True) from exc
@router.get("/{file_id}/download")
def download_file(
file_id: str,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:read")),
):
try:
asset = get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, is_admin=_is_admin(principal))
data, _, blob = read_asset_bytes(session, asset)
except FileStorageError as exc:
raise _http_error(exc, not_found=True) from exc
headers = {"Content-Disposition": f'attachment; filename="{asset.filename}"'}
return StreamingResponse(BytesIO(data), media_type=blob.content_type or "application/octet-stream", headers=headers)
@router.delete("/{file_id}", response_model=BulkDeleteResponse)
def delete_file(
file_id: str,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
try:
asset = get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal))
count = soft_delete_assets(session, [asset])
session.commit()
return BulkDeleteResponse(deleted_count=count)
except FileStorageError as exc:
session.rollback()
raise _http_error(exc, not_found=True) from exc
@router.post("/bulk-delete", response_model=BulkDeleteResponse)
def bulk_delete_files(
payload: BulkDeleteRequest,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
try:
assets = [
get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal))
for file_id in payload.file_ids
]
count = soft_delete_assets(session, assets)
session.commit()
return BulkDeleteResponse(deleted_count=count)
except FileStorageError as exc:
session.rollback()
raise _http_error(exc) from exc
@router.post("/{file_id}/shares", response_model=FileShareResponse)
def create_share(
file_id: str,
payload: FileShareRequest,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
try:
asset = get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal))
share = share_file(
session,
tenant_id=principal.tenant_id,
asset=asset,
target_type=payload.target_type,
target_id=payload.target_id,
permission=payload.permission,
user_id=principal.user.id,
)
session.commit()
return FileShareResponse(
id=share.id,
target_type=share.target_type,
target_id=share.target_id,
permission=share.permission,
created_at=share.created_at.isoformat(),
revoked_at=share.revoked_at.isoformat() if share.revoked_at else None,
)
except FileStorageError as exc:
session.rollback()
raise _http_error(exc) from exc
@router.post("/bulk-rename", response_model=RenameResponse)
def bulk_rename(
payload: RenameRequest,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
try:
plan = rename_selection(
session,
tenant_id=principal.tenant_id,
user_id=principal.user.id,
file_ids=payload.file_ids,
folder_paths=payload.folder_paths,
owner_type=payload.owner_type,
owner_id=payload.owner_id,
mode=payload.mode,
new_name=payload.new_name,
find=payload.find,
replacement=payload.replacement,
prefix=payload.prefix,
suffix=payload.suffix,
recursive=payload.recursive,
dry_run=payload.dry_run,
is_admin=_is_admin(principal),
)
if not payload.dry_run:
session.commit()
return RenameResponse(
dry_run=payload.dry_run,
items=[
RenamePreviewItem(
kind=item.kind,
id=item.id,
file_id=item.id if item.kind == "file" else None,
folder_path=item.old_path if item.kind == "folder" else None,
old_path=item.old_path,
new_path=item.new_path,
)
for item in plan
],
)
except (FileStorageError, UnsafeFilePathError, ValueError) as exc:
session.rollback()
raise _http_error(exc) from exc
@router.post("/transfer", response_model=TransferResponse)
def transfer_files(
payload: TransferRequest,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:write")),
):
try:
files, folders = transfer_selection(
session,
tenant_id=principal.tenant_id,
user_id=principal.user.id,
operation=payload.operation,
file_ids=payload.file_ids,
folder_paths=payload.folder_paths,
source_owner_type=payload.source_owner_type,
source_owner_id=payload.source_owner_id,
target_owner_type=payload.target_owner_type,
target_owner_id=payload.target_owner_id,
target_folder=payload.target_folder,
conflict_strategy=payload.conflict_strategy,
conflict_resolutions=_conflict_resolutions(payload.conflict_resolutions),
is_admin=_is_admin(principal),
)
session.commit()
return TransferResponse(operation=payload.operation, files=files, folders=folders)
except (FileStorageError, UnsafeFilePathError, ValueError) as exc:
session.rollback()
raise _http_error(exc) from exc
@router.post("/archive.zip")
def download_archive(
payload: ArchiveRequest,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:read")),
):
try:
assets = [
get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, is_admin=_is_admin(principal))
for file_id in payload.file_ids
]
data = create_zip_bytes(session, assets)
except FileStorageError as exc:
raise _http_error(exc) from exc
filename = filename_from_path(normalize_logical_path(payload.filename, fallback_filename="files.zip"))
headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
return StreamingResponse(BytesIO(data), media_type="application/zip", headers=headers)
@router.post("/resolve-patterns", response_model=PatternResolveResponse)
def resolve_file_patterns(
payload: PatternResolveRequest,
session: Session = Depends(get_session),
principal: ApiPrincipal = Depends(require_scope("attachments:read")),
):
_ensure_list_owner_access(session, principal, payload.owner_type, payload.owner_id)
try:
assets = list_assets_for_user(
session,
tenant_id=principal.tenant_id,
user_id=principal.user.id,
owner_type=payload.owner_type,
owner_id=payload.owner_id,
campaign_id=payload.campaign_id,
path_prefix=payload.path_prefix,
is_admin=_is_admin(principal),
)
resolved, unmatched = resolve_patterns(assets, payload.patterns, base_path=payload.path_prefix, case_sensitive=payload.case_sensitive)
return PatternResolveResponse(
patterns=[PatternMatchResponse(pattern=item.pattern, matches=[_asset_response(session, asset) for asset in item.matches]) for item in resolved],
unmatched=[_asset_response(session, asset) for asset in unmatched] if payload.include_unmatched else [],
)
except (FileStorageError, UnsafeFilePathError, ValueError) as exc:
raise _http_error(exc) from exc