# Source file: multi-seal-mail/server/app/mailer/attachments/resolver.py
# (461 lines, 16 KiB, Python)

from __future__ import annotations
import fnmatch
import re
from enum import StrEnum
from pathlib import Path
from typing import Any, Iterable
from pydantic import BaseModel, ConfigDict, Field
from app.mailer.campaign.entries import load_campaign_entries
from app.mailer.campaign.field_values import effective_entry_field_values
from app.mailer.campaign.models import AttachmentBasePathConfig, AttachmentConfig, Behavior, CampaignConfig, EntryConfig
class AttachmentScope(StrEnum):
GLOBAL = "global"
ENTRY = "entry"
class AttachmentMatchStatus(StrEnum):
OK = "ok"
MISSING = "missing"
AMBIGUOUS = "ambiguous"
class MessageAttachmentStatus(StrEnum):
READY = "ready"
WARNING = "warning"
NEEDS_REVIEW = "needs_review"
BLOCKED = "blocked"
EXCLUDED = "excluded"
INACTIVE = "inactive"
class ResolutionSeverity(StrEnum):
INFO = "info"
WARNING = "warning"
ERROR = "error"
# A single problem found while resolving an attachment rule.
# Unknown fields are rejected (extra="forbid").
class AttachmentIssue(BaseModel):
    model_config = ConfigDict(extra="forbid")

    # How serious the problem is.
    severity: ResolutionSeverity
    # Machine-readable identifier, e.g. "missing_required_attachment".
    code: str
    # Human-readable description of the problem.
    message: str
    # Behavior selected for this problem; None when no behavior applies.
    behavior: Behavior | None = None
# Result of resolving one attachment rule for one entry.
# Unknown fields are rejected (extra="forbid").
class ResolvedAttachment(BaseModel):
    model_config = ConfigDict(extra="forbid")

    # Origin of the rule and its position in the list it came from.
    scope: AttachmentScope
    index: int
    attachment_id: str | None = None
    label: str | None = None

    # Unrendered templates exactly as configured on the rule.
    base_dir_template: str
    file_filter_template: str

    # Name and path of the selected named base path, if one was selected.
    base_path_name: str | None = None
    base_path: str | None = None

    # Rendered base directory / file filter and the directory that was searched.
    base_dir: str
    file_filter: str
    directory: str

    # Rule options that were in effect during matching.
    include_subdirs: bool
    required: bool
    allow_multiple: bool
    zip_enabled: bool

    # Match outcome, the behavior chosen for a non-OK outcome, the matched
    # file paths (as strings) and the issues raised for this rule.
    status: AttachmentMatchStatus
    behavior: Behavior | None = None
    matches: list[str] = Field(default_factory=list)
    issues: list[AttachmentIssue] = Field(default_factory=list)
# Attachment resolution result for a single campaign entry.
# Unknown fields are rejected (extra="forbid").
class EntryAttachmentResolution(BaseModel):
    model_config = ConfigDict(extra="forbid")

    # Position and optional id of the entry within the campaign.
    entry_index: int
    entry_id: str | None = None
    # Whether the entry is active, and its overall attachment status.
    active: bool
    status: MessageAttachmentStatus
    # Per-rule results and the flattened list of their issues.
    attachments: list[ResolvedAttachment] = Field(default_factory=list)
    issues: list[AttachmentIssue] = Field(default_factory=list)

    @property
    def match_count(self) -> int:
        """Return the total number of matched files over all attachment rules."""
        total = 0
        for attachment in self.attachments:
            total += len(attachment.matches)
        return total
# Campaign-wide attachment resolution report: campaign metadata plus one
# EntryAttachmentResolution per entry. Unknown fields are rejected.
class AttachmentResolutionReport(BaseModel):
    model_config = ConfigDict(extra="forbid")

    campaign_id: str
    campaign_name: str
    # Campaign file path and the attachments base path, both as strings.
    campaign_file: str
    attachments_base_path: str
    entries_count: int
    entries: list[EntryAttachmentResolution] = Field(default_factory=list)

    @property
    def ready_count(self) -> int:
        """Return the number of entries whose status is READY."""
        return sum(item.status == MessageAttachmentStatus.READY for item in self.entries)

    @property
    def warning_count(self) -> int:
        """Return the number of entries whose status is WARNING."""
        return sum(item.status == MessageAttachmentStatus.WARNING for item in self.entries)

    @property
    def needs_review_count(self) -> int:
        """Return the number of entries whose status is NEEDS_REVIEW."""
        return sum(item.status == MessageAttachmentStatus.NEEDS_REVIEW for item in self.entries)

    @property
    def blocked_count(self) -> int:
        """Return the number of entries whose status is BLOCKED."""
        return sum(item.status == MessageAttachmentStatus.BLOCKED for item in self.entries)

    @property
    def excluded_count(self) -> int:
        """Return the number of entries whose status is EXCLUDED."""
        return sum(item.status == MessageAttachmentStatus.EXCLUDED for item in self.entries)

    @property
    def inactive_count(self) -> int:
        """Return the number of entries whose status is INACTIVE."""
        return sum(item.status == MessageAttachmentStatus.INACTIVE for item in self.entries)
def _resolve_path(campaign_file: str | Path, raw_path: str) -> Path:
campaign_path = Path(campaign_file).resolve()
path = Path(raw_path).expanduser()
if path.is_absolute():
return path
return (campaign_path.parent / path).resolve()
_DOLLAR_FIELD_PATTERN = re.compile(r"(?<!\\)\$\{(.*?)(?<!\\)\}")
_BRACE_FIELD_PATTERN = re.compile(r"(?<!\\)\{\{\s*(.*?)\s*\}\}")
def _normalize_template_key(raw: str) -> str:
key = raw.strip()
if key.startswith("fields."):
key = key.removeprefix("fields.")
elif key.startswith("local."):
key = "local::" + key.removeprefix("local.")
elif key.startswith("global."):
key = "global::" + key.removeprefix("global.")
if key.startswith("local::") or key.startswith("global::"):
return key
if key.startswith("local:"):
return "local::" + key.removeprefix("local:")
if key.startswith("global:"):
return "global::" + key.removeprefix("global:")
return key
def _render_template(template: str, values: dict[str, Any]) -> str:
def replace(match: re.Match[str]) -> str:
key = _normalize_template_key(match.group(1))
if key in values:
value = values[key]
return "" if value is None else str(value)
return match.group(0)
rendered = _DOLLAR_FIELD_PATTERN.sub(replace, template)
rendered = _BRACE_FIELD_PATTERN.sub(replace, rendered)
return rendered.replace(r"\${", "${").replace(r"\}", "}")
def _recipient_values(entry: EntryConfig) -> dict[str, str]:
values: dict[str, str] = {}
for list_name in ["to", "cc", "bcc", "reply_to", "bounce_to", "disposition_notification_to"]:
recipients = getattr(entry, list_name)
for index, recipient in enumerate(recipients):
prefix = f"{list_name}.{index}"
values[f"local::{prefix}.email"] = recipient.email
values[f"local::{prefix}.name"] = recipient.name or ""
values[f"local::{prefix}.type"] = recipient.recipient_type.value
if entry.from_:
values["local::from.email"] = entry.from_.email
values["local::from.name"] = entry.from_.name or ""
values["local::from.type"] = entry.from_.recipient_type.value
return values
def _template_values(config: CampaignConfig, entry: EntryConfig) -> dict[str, Any]:
    """Assemble the lookup table used to render templates for *entry*.

    Later steps override earlier ones:

    1. every declared field defaults to ``""`` under its bare name and under
       the ``global::`` and ``local::`` prefixes;
    2. campaign global values are stored under ``global::<key>``;
    3. the entry's effective field values are stored under the bare key and
       under ``local::<key>``;
    4. ``local::id`` (only when the entry has an id) and ``local::active``;
    5. the recipient values from ``_recipient_values``.
    """
    table: dict[str, Any] = {}
    for declared in config.fields:
        for key in (declared.name, f"global::{declared.name}", f"local::{declared.name}"):
            table.setdefault(key, "")
    table.update({f"global::{name}": value for name, value in config.global_values.items()})
    for name, value in effective_entry_field_values(config, entry).items():
        table[name] = value
        table[f"local::{name}"] = value
    if entry.id:
        table["local::id"] = entry.id
    table["local::active"] = entry.active
    table.update(_recipient_values(entry))
    return table
def _rendered_base_dir(config: AttachmentConfig, values: dict[str, Any]) -> str:
    """Render the rule's ``base_dir`` template, falling back to ``"."``.

    The rendered text is stripped of surrounding whitespace; an empty result
    is replaced by ``"."``.
    """
    base_dir = _render_template(config.base_dir, values).strip()
    if not base_dir:
        return "."
    return base_dir
def _base_path_by_path(config: CampaignConfig, rendered_base_dir: str) -> AttachmentBasePathConfig | None:
for base_path in config.attachments.base_paths:
if base_path.path == rendered_base_dir:
return base_path
return None
def _base_path_by_id(config: CampaignConfig, base_path_id: str | None) -> AttachmentBasePathConfig | None:
if not base_path_id:
return None
return next((base_path for base_path in config.attachments.base_paths if base_path.id == base_path_id), None)
def _default_base_path(config: CampaignConfig) -> AttachmentBasePathConfig:
return config.attachments.base_paths[0]
def _selected_base_path(
    config: CampaignConfig,
    attachment_config: AttachmentConfig,
    rendered_base_dir: str,
) -> AttachmentBasePathConfig | None:
    """Pick the named base path that applies to an attachment rule.

    Lookup order: no base paths configured -> ``None``; a base path matching
    the rule's ``base_path_id``; the default (first) base path when the
    rendered base dir is ``""`` or ``"."``; otherwise the base path whose
    ``path`` equals the rendered base dir (``None`` if there is none).
    """
    if not config.attachments.base_paths:
        return None

    explicit = _base_path_by_id(config, attachment_config.base_path_id)
    if explicit is not None:
        return explicit

    wants_default = rendered_base_dir in ("", ".")
    if wants_default:
        return _default_base_path(config)
    return _base_path_by_path(config, rendered_base_dir)
def _rule_allows_multiple(config: AttachmentConfig, rendered_file_filter: str) -> bool:
"""Return whether a rule may produce multiple attachments.
New UI versions no longer expose allow_multiple. Treat wildcard patterns as
inherently multi-match-capable while keeping the legacy allow_multiple flag
for old campaign JSON.
"""
return config.allow_multiple or any(char in rendered_file_filter for char in "*?[")
def _missing_behavior(campaign_config: CampaignConfig, config: AttachmentConfig) -> Behavior:
if config.missing_behavior is not None:
return config.missing_behavior
if config.required:
return campaign_config.validation_policy.missing_required_attachment
return campaign_config.validation_policy.missing_optional_attachment
def _ambiguous_behavior(campaign_config: CampaignConfig, config: AttachmentConfig) -> Behavior:
return config.ambiguous_behavior or campaign_config.validation_policy.ambiguous_attachment_match
def _entry_attachment_allowed(config: CampaignConfig, attachment_config: AttachmentConfig, values: dict[str, Any]) -> bool:
    """Decide whether an entry-level attachment rule may be used.

    Without named base paths the campaign-wide ``allow_individual`` flag
    decides. With named base paths the rule is allowed only when a base path
    is selected for it and that base path has ``allow_individual`` set.
    """
    base_dir = _rendered_base_dir(attachment_config, values)
    chosen = _selected_base_path(config, attachment_config, base_dir)
    attachments = config.attachments
    if not attachments.base_paths:
        return attachments.allow_individual
    return bool(chosen and chosen.allow_individual)
def _iter_effective_attachment_configs(
    config: CampaignConfig,
    entry: EntryConfig,
    values: dict[str, Any],
) -> Iterable[tuple[AttachmentScope, int, AttachmentConfig]]:
    """Yield ``(scope, index, rule)`` for every rule that applies to *entry*.

    Campaign-wide rules come first and are included only when the entry has
    ``combine_attachments`` set. Entry-level rules follow, skipping those
    rejected by ``_entry_attachment_allowed``. ``index`` is the rule's
    position in its own list.
    """
    if entry.combine_attachments:
        yield from (
            (AttachmentScope.GLOBAL, position, rule)
            for position, rule in enumerate(config.attachments.global_)
        )
    for position, rule in enumerate(entry.attachments):
        if not _entry_attachment_allowed(config, rule, values):
            continue
        yield AttachmentScope.ENTRY, position, rule
def _resolve_attachment_directory(
    *,
    campaign_file: str | Path,
    campaign_config: CampaignConfig,
    attachment_config: AttachmentConfig,
    rendered_base_dir: str,
) -> tuple[Path, AttachmentBasePathConfig | None]:
    """Work out which directory an attachment rule should be searched in.

    Legacy campaigns treat ``attachments.base_path`` as the root and
    ``base_dir`` as a child directory of it. Current WebUI campaigns select
    one named base path directly through ``base_dir``. The ``base_paths``
    list takes precedence when it is present, so that the base path and
    ``base_dir`` are not combined twice.

    Returns the directory together with the selected named base path, or
    ``None`` in its place when no named base path was selected.
    """
    chosen = _selected_base_path(campaign_config, attachment_config, rendered_base_dir)
    if chosen is not None:
        # A named base path was selected: search that path itself.
        directory = _resolve_path(campaign_file, chosen.path)
    elif campaign_config.attachments.base_paths:
        # Named base paths exist but none matched: use base_dir on its own.
        directory = _resolve_path(campaign_file, rendered_base_dir)
    else:
        # Legacy layout: base_dir is a child of attachments.base_path.
        legacy_root = _resolve_path(campaign_file, campaign_config.attachments.base_path)
        directory = (legacy_root / rendered_base_dir).resolve()
    return directory, chosen
def _match_files(directory: Path, file_filter: str, include_subdirs: bool) -> list[Path]:
if not directory.exists() or not directory.is_dir():
return []
if include_subdirs:
# pathlib.rglob accepts glob patterns, but fnmatch keeps behavior predictable
# when file_filter is supplied as the Java-style filter portion only.
return sorted(path for path in directory.rglob("*") if path.is_file() and fnmatch.fnmatch(path.name, file_filter))
return sorted(path for path in directory.glob(file_filter) if path.is_file())
def _issue_for_missing(config: AttachmentConfig, behavior: Behavior) -> AttachmentIssue:
    """Build the issue reported when a rule matched no file.

    The code distinguishes required from optional rules. The severity is
    ERROR only for a required rule whose behavior is BLOCK; every other
    combination is a WARNING. The message quotes the rule's configured
    (unrendered) file filter.
    """
    if config.required:
        code = "missing_required_attachment"
        is_error = behavior == Behavior.BLOCK
    else:
        code = "missing_optional_attachment"
        is_error = False
    severity = ResolutionSeverity.ERROR if is_error else ResolutionSeverity.WARNING
    message = f"No file matched attachment filter {config.file_filter!r}"
    return AttachmentIssue(severity=severity, code=code, message=message, behavior=behavior)
def _issue_for_ambiguous(config: AttachmentConfig, behavior: Behavior, match_count: int) -> AttachmentIssue:
    """Build the issue reported when a single-file rule matched several files.

    The severity is ERROR when *behavior* is BLOCK and WARNING otherwise.
    The message quotes the rule's configured (unrendered) file filter and
    the number of matches.
    """
    if behavior == Behavior.BLOCK:
        severity = ResolutionSeverity.ERROR
    else:
        severity = ResolutionSeverity.WARNING
    message = f"Attachment filter {config.file_filter!r} matched {match_count} files, but it is configured as a direct/single-file selection"
    return AttachmentIssue(
        severity=severity,
        code="ambiguous_attachment_match",
        message=message,
        behavior=behavior,
    )
def _resolve_one_config(
    *,
    campaign_file: str | Path,
    campaign_config: CampaignConfig,
    values: dict[str, Any],
    scope: AttachmentScope,
    index: int,
    config: AttachmentConfig,
) -> ResolvedAttachment:
    """Resolve one attachment rule into a ``ResolvedAttachment``.

    The rule's templates are rendered with *values*, the search directory is
    determined, and matching files are collected. The outcome is classified
    as MISSING (no match), AMBIGUOUS (several matches for a rule that only
    allows one) or OK. Non-OK outcomes carry the selected behavior and one
    issue describing the problem.
    """
    base_dir = _rendered_base_dir(config, values)
    file_filter = _render_template(config.file_filter, values)
    directory, base_path = _resolve_attachment_directory(
        campaign_file=campaign_file,
        campaign_config=campaign_config,
        attachment_config=config,
        rendered_base_dir=base_dir,
    )
    found = _match_files(directory, file_filter, config.include_subdirs)
    multiple_ok = _rule_allows_multiple(config, file_filter)

    # Start from the success case and downgrade when the matches are unusable.
    status = AttachmentMatchStatus.OK
    behavior: Behavior | None = None
    issues: list[AttachmentIssue] = []
    if not found:
        status = AttachmentMatchStatus.MISSING
        behavior = _missing_behavior(campaign_config, config)
        issues.append(_issue_for_missing(config, behavior))
    elif len(found) > 1 and not multiple_ok:
        status = AttachmentMatchStatus.AMBIGUOUS
        behavior = _ambiguous_behavior(campaign_config, config)
        issues.append(_issue_for_ambiguous(config, behavior, len(found)))

    if base_path:
        base_path_name, base_path_value = base_path.name, base_path.path
    else:
        base_path_name, base_path_value = None, None

    return ResolvedAttachment(
        scope=scope,
        index=index,
        attachment_id=config.id,
        label=config.label,
        base_dir_template=config.base_dir,
        file_filter_template=config.file_filter,
        base_path_name=base_path_name,
        base_path=base_path_value,
        base_dir=base_dir,
        file_filter=file_filter,
        directory=str(directory),
        include_subdirs=config.include_subdirs,
        required=config.required,
        allow_multiple=multiple_ok,
        zip_enabled=config.zip.enabled,
        status=status,
        behavior=behavior,
        matches=[str(match) for match in found],
        issues=issues,
    )
def _status_from_issues(active: bool, issues: list[AttachmentIssue]) -> MessageAttachmentStatus:
    """Collapse an entry's issues into a single message status.

    Inactive entries are always INACTIVE. Otherwise the behaviors found on
    the issues are checked in order of precedence: BLOCK -> BLOCKED,
    DROP -> EXCLUDED, ASK -> NEEDS_REVIEW, WARN -> WARNING. When none of
    them is present the entry is READY.
    """
    if not active:
        return MessageAttachmentStatus.INACTIVE

    seen = {issue.behavior for issue in issues if issue.behavior is not None}
    precedence = (
        (Behavior.BLOCK, MessageAttachmentStatus.BLOCKED),
        (Behavior.DROP, MessageAttachmentStatus.EXCLUDED),
        (Behavior.ASK, MessageAttachmentStatus.NEEDS_REVIEW),
        (Behavior.WARN, MessageAttachmentStatus.WARNING),
    )
    for behavior, status in precedence:
        if behavior in seen:
            return status
    return MessageAttachmentStatus.READY
def resolve_entry_attachments(
    *,
    config: CampaignConfig,
    campaign_file: str | Path,
    entry: EntryConfig,
    entry_index: int,
) -> EntryAttachmentResolution:
    """Resolve every applicable attachment rule for one campaign entry.

    Inactive entries are not resolved: they get no attachments, no issues
    and the INACTIVE status. For active entries each effective rule is
    resolved, the issues of all rules are flattened into one list, and the
    entry status is derived from those issues.
    """
    values = _template_values(config, entry)
    if entry.active:
        resolved = [
            _resolve_one_config(
                campaign_file=campaign_file,
                campaign_config=config,
                values=values,
                scope=scope,
                index=index,
                config=rule,
            )
            for scope, index, rule in _iter_effective_attachment_configs(config, entry, values)
        ]
    else:
        resolved = []

    collected_issues: list[AttachmentIssue] = []
    for attachment in resolved:
        collected_issues.extend(attachment.issues)

    return EntryAttachmentResolution(
        entry_index=entry_index,
        entry_id=entry.id,
        active=entry.active,
        status=_status_from_issues(entry.active, collected_issues),
        attachments=resolved,
        issues=collected_issues,
    )
def resolve_campaign_attachments(config: CampaignConfig, *, campaign_file: str | Path) -> AttachmentResolutionReport:
    """Resolve attachments for every entry of a campaign and build the report.

    Entries are loaded through ``load_campaign_entries`` and numbered from 1.
    The reported attachments base path is the first named base path when
    named base paths are configured, otherwise the legacy
    ``attachments.base_path``; it is resolved relative to *campaign_file*.
    """
    entries = load_campaign_entries(config, campaign_file=campaign_file)

    attachments = config.attachments
    raw_base_path = attachments.base_paths[0].path if attachments.base_paths else attachments.base_path
    base_path = _resolve_path(campaign_file, raw_base_path)

    resolved_entries = []
    for number, entry in enumerate(entries, start=1):
        resolved_entries.append(
            resolve_entry_attachments(
                config=config,
                campaign_file=campaign_file,
                entry=entry,
                entry_index=number,
            )
        )

    return AttachmentResolutionReport(
        campaign_id=config.campaign.id,
        campaign_name=config.campaign.name,
        campaign_file=str(Path(campaign_file).resolve()),
        attachments_base_path=str(base_path),
        entries_count=len(entries),
        entries=resolved_entries,
    )