Files
multi-seal-mail/server/app/storage/folders.py

174 lines
6.6 KiB
Python

from __future__ import annotations
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.db.models import FileAsset, FileFolder
from app.storage.access import ensure_owner_access
from app.storage.common import FileStorageError, utcnow
from app.storage.files import _asset_query_for_owner
from app.storage.paths import normalize_folder
def _owner_filter(query, owner_type: str, owner_id: str):
if owner_type == "user":
return query.filter(FileFolder.owner_user_id == owner_id)
if owner_type == "group":
return query.filter(FileFolder.owner_group_id == owner_id)
raise FileStorageError("Unsupported owner type")
def create_folder(
session: Session,
*,
tenant_id: str,
owner_type: str,
owner_id: str,
user_id: str,
path: str,
is_admin: bool = False,
) -> FileFolder:
owner_type = owner_type.lower().strip()
ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin)
normalized = normalize_folder(path)
if not normalized:
raise FileStorageError("Folder path is required")
query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.path == normalized)
query = _owner_filter(query, owner_type, owner_id)
active_existing = query.filter(FileFolder.deleted_at.is_(None)).first()
if active_existing is not None:
raise FileStorageError(f"Folder already exists: {normalized}")
deleted_existing = query.filter(FileFolder.deleted_at.is_not(None)).first()
if deleted_existing is not None:
deleted_existing.deleted_at = None
session.add(deleted_existing)
session.flush()
return deleted_existing
folder = FileFolder(
tenant_id=tenant_id,
owner_type=owner_type,
owner_user_id=owner_id if owner_type == "user" else None,
owner_group_id=owner_id if owner_type == "group" else None,
path=normalized,
created_by_user_id=user_id,
metadata_={},
)
session.add(folder)
session.flush()
return folder
def list_folders_for_user(
session: Session,
*,
tenant_id: str,
user_id: str,
owner_type: str,
owner_id: str,
include_deleted: bool = False,
is_admin: bool = False,
) -> list[FileFolder]:
owner_type = owner_type.lower().strip()
ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin)
query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type)
query = _owner_filter(query, owner_type, owner_id)
if not include_deleted:
query = query.filter(FileFolder.deleted_at.is_(None))
return query.order_by(FileFolder.path.asc()).all()
def soft_delete_folder(
session: Session,
*,
tenant_id: str,
owner_type: str,
owner_id: str,
user_id: str,
path: str,
recursive: bool = True,
is_admin: bool = False,
) -> tuple[int, int]:
owner_type = owner_type.lower().strip()
ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin)
normalized = normalize_folder(path)
if not normalized:
raise FileStorageError("Folder path is required")
prefix = f"{normalized}/"
now = utcnow()
folder_query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.deleted_at.is_(None))
folder_query = _owner_filter(folder_query, owner_type, owner_id)
if recursive:
folder_query = folder_query.filter(or_(FileFolder.path == normalized, FileFolder.path.like(f"{prefix}%")))
else:
child_exists = folder_query.filter(FileFolder.path.like(f"{prefix}%")).first() is not None
file_exists = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.display_path.like(f"{prefix}%")).first() is not None
if child_exists or file_exists:
raise FileStorageError("Folder is not empty")
folder_query = folder_query.filter(FileFolder.path == normalized)
folders = folder_query.all()
for folder in folders:
folder.deleted_at = now
session.add(folder)
file_query = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{prefix}%"))
assets = file_query.all() if recursive else []
for asset in assets:
asset.deleted_at = now
session.add(asset)
return len(folders), len(assets)
def _active_folder_exists(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, path: str, exclude_paths: set[str] | None = None) -> bool:
normalized = normalize_folder(path)
if not normalized:
return False
query = session.query(FileFolder).filter(
FileFolder.tenant_id == tenant_id,
FileFolder.owner_type == owner_type,
FileFolder.deleted_at.is_(None),
FileFolder.path == normalized,
)
query = _owner_filter(query, owner_type, owner_id)
if exclude_paths:
query = query.filter(FileFolder.path.notin_(exclude_paths))
return query.first() is not None
def _folder_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str):
query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type)
return _owner_filter(query, owner_type, owner_id)
def _ensure_target_folder_hierarchy(
session: Session,
*,
tenant_id: str,
owner_type: str,
owner_id: str,
user_id: str,
path: str,
) -> None:
parts = normalize_folder(path).split("/") if normalize_folder(path) else []
for index in range(1, len(parts) + 1):
partial = "/".join(parts[:index])
query = _folder_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileFolder.path == partial)
existing = query.one_or_none()
if existing:
if existing.deleted_at is not None:
existing.deleted_at = None
session.add(existing)
continue
folder = FileFolder(
tenant_id=tenant_id,
owner_type=owner_type,
owner_user_id=owner_id if owner_type == "user" else None,
owner_group_id=owner_id if owner_type == "group" else None,
path=partial,
created_by_user_id=user_id,
metadata_={},
)
session.add(folder)