78 lines
3.0 KiB
Python
78 lines
3.0 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Iterable
|
|
|
|
from app.db.models import FileAsset
|
|
from app.storage.common import ResolvedPattern
|
|
from app.storage.paths import normalize_folder, normalize_logical_path
|
|
|
|
|
|
def _normalize_pattern(pattern: str) -> str:
|
|
if pattern.strip() in {"", "*"}:
|
|
return "*"
|
|
return normalize_logical_path(pattern, fallback_filename="*")
|
|
|
|
|
|
def _logical_glob_regex(pattern: str, *, case_sensitive: bool = False) -> re.Pattern[str]:
    """Compile Multi Seal Mail logical globs.

    `*` and `?` stay within one folder segment. `**` crosses folder
    boundaries, and `**/` also matches the current folder so `**/*.pdf`
    returns direct and nested PDF files.

    The pattern is first passed through ``_normalize_pattern``. Every
    character that is not a wildcard is matched literally.

    Args:
        pattern: Logical glob pattern to compile.
        case_sensitive: When false (the default) the regex is compiled
            with ``re.IGNORECASE``.

    Returns:
        A compiled regex anchored at both ends of the candidate string.
    """
    pattern = _normalize_pattern(pattern)
    pieces: list[str] = []
    index = 0
    while index < len(pattern):
        if pattern.startswith("**/", index):
            # Optional "<anything>/" prefix: zero or more leading folders.
            pieces.append("(?:.*/)?")
            index += 3
        elif pattern.startswith("**", index):
            pieces.append(".*")
            index += 2
        elif pattern[index] == "*":
            pieces.append("[^/]*")
            index += 1
        elif pattern[index] == "?":
            pieces.append("[^/]")
            index += 1
        else:
            pieces.append(re.escape(pattern[index]))
            index += 1
    # DOTALL lets `**` span newline characters, consistent with `*`
    # (`[^/]*` already matches them).
    flags = re.DOTALL if case_sensitive else re.DOTALL | re.IGNORECASE
    # `\Z` anchors at the true end of the string; `$` would also accept a
    # candidate that carries one trailing newline (e.g. "a.pdf\n").
    return re.compile("^" + "".join(pieces) + r"\Z", flags)
|
|
|
|
|
|
def _relative_display_path(asset: FileAsset, base_path: str | None) -> str:
    """Return the asset's normalized display path, relative to *base_path*.

    The display path goes through ``normalize_logical_path`` and the base
    through ``normalize_folder``. If the normalized base is non-empty and
    the path starts with ``"<base>/"``, that prefix is removed; in every
    other case the full normalized path is returned unchanged.
    """
    full_path = normalize_logical_path(asset.display_path)
    folder = normalize_folder(base_path)
    if folder:
        folder_prefix = f"{folder}/"
        if full_path.startswith(folder_prefix):
            return full_path[len(folder_prefix):]
    # No usable base, or the asset is not under it.
    return full_path
|
|
|
|
|
|
def match_assets(
    assets: Iterable[FileAsset],
    pattern: str,
    *,
    base_path: str | None = None,
    case_sensitive: bool = False,
) -> list[FileAsset]:
    """Return the assets whose path matches the logical glob *pattern*.

    The pattern is path-aware when *base_path* is given or when the
    normalized pattern contains ``/`` or ``**``. A path-aware pattern is
    tested against each asset's display path relative to *base_path*.
    Any other pattern is tested against both the asset's raw
    ``display_path`` and its ``filename``; matching either one is enough.

    Input order is preserved in the returned list.
    """
    compiled = _logical_glob_regex(pattern, case_sensitive=case_sensitive)
    glob = _normalize_pattern(pattern)
    path_aware = not (base_path is None and "/" not in glob and "**" not in glob)

    def _candidates(asset: FileAsset) -> tuple[str, ...]:
        # Strings the compiled glob is tried against for one asset.
        if path_aware:
            return (_relative_display_path(asset, base_path),)
        return (asset.display_path, asset.filename)

    return [
        asset
        for asset in assets
        if any(compiled.match(text) for text in _candidates(asset))
    ]
|
|
|
|
|
|
def resolve_patterns(
    assets: list[FileAsset],
    patterns: list[str],
    *,
    base_path: str | None = None,
    case_sensitive: bool = False,
) -> tuple[list[ResolvedPattern], list[FileAsset]]:
    """Match every pattern against *assets* and report what was left over.

    Returns:
        A pair ``(resolved, unmatched)``. ``resolved`` holds one
        ``ResolvedPattern`` per input pattern, in order, carrying that
        pattern's matches from ``match_assets``. ``unmatched`` lists the
        assets, in input order, whose ``id`` appears in no pattern's matches.
    """
    resolved = []
    for pattern in patterns:
        hits = match_assets(
            assets,
            pattern,
            base_path=base_path,
            case_sensitive=case_sensitive,
        )
        resolved.append(ResolvedPattern(pattern=pattern, matches=hits))

    # Collect the ids claimed by at least one pattern.
    matched_ids = set()
    for entry in resolved:
        matched_ids.update(asset.id for asset in entry.matches)

    unmatched = [asset for asset in assets if asset.id not in matched_ids]
    return resolved, unmatched
|