from __future__ import annotations

import fnmatch
import re
from enum import StrEnum
from pathlib import Path
from typing import Any, Iterable

from pydantic import BaseModel, ConfigDict, Field

from app.mailer.campaign.entries import load_campaign_entries
from app.mailer.campaign.field_values import effective_entry_field_values
from app.mailer.campaign.models import (
    AttachmentBasePathConfig,
    AttachmentConfig,
    Behavior,
    CampaignConfig,
    EntryConfig,
)


class AttachmentScope(StrEnum):
    """Origin of an attachment rule: the campaign-wide list or a single entry."""

    GLOBAL = "global"
    ENTRY = "entry"


class AttachmentMatchStatus(StrEnum):
    """Outcome of matching one attachment rule against the file system."""

    OK = "ok"
    MISSING = "missing"
    AMBIGUOUS = "ambiguous"


class MessageAttachmentStatus(StrEnum):
    """Overall attachment status of one entry (see ``_status_from_issues``)."""

    READY = "ready"
    WARNING = "warning"
    NEEDS_REVIEW = "needs_review"
    BLOCKED = "blocked"
    EXCLUDED = "excluded"
    INACTIVE = "inactive"


class ResolutionSeverity(StrEnum):
    """Severity attached to an ``AttachmentIssue``."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"


class AttachmentIssue(BaseModel):
    """A single problem found while resolving an attachment rule."""

    model_config = ConfigDict(extra="forbid")

    severity: ResolutionSeverity
    # Machine-readable identifier, e.g. "missing_required_attachment".
    code: str
    message: str
    # Configured reaction to the issue; drives the entry status.
    behavior: Behavior | None = None


class ResolvedAttachment(BaseModel):
    """Result of resolving one attachment rule for one entry."""

    model_config = ConfigDict(extra="forbid")

    scope: AttachmentScope
    # Position of the rule inside its (global or entry) rule list.
    index: int
    attachment_id: str | None = None
    label: str | None = None
    # Unrendered templates exactly as configured.
    base_dir_template: str
    file_filter_template: str
    # Name/path of the selected named base path, if one was selected.
    base_path_name: str | None = None
    base_path: str | None = None
    # Rendered values after template substitution.
    base_dir: str
    file_filter: str
    # Directory that was actually searched.
    directory: str
    include_subdirs: bool
    required: bool
    # Effective value: legacy flag OR wildcard in the rendered filter.
    allow_multiple: bool
    zip_enabled: bool
    status: AttachmentMatchStatus
    behavior: Behavior | None = None
    matches: list[str] = Field(default_factory=list)
    issues: list[AttachmentIssue] = Field(default_factory=list)


class EntryAttachmentResolution(BaseModel):
    """All resolved attachment rules for one campaign entry."""

    model_config = ConfigDict(extra="forbid")

    entry_index: int
    entry_id: str | None = None
    active: bool
    status: MessageAttachmentStatus
    attachments: list[ResolvedAttachment] = Field(default_factory=list)
    # Flattened copy of every issue found on ``attachments``.
    issues: list[AttachmentIssue] = Field(default_factory=list)

    @property
    def match_count(self) -> int:
        """Return the total number of matched files across all rules."""
        return sum(len(item.matches) for item in self.attachments)


class AttachmentResolutionReport(BaseModel):
    """Campaign-wide attachment resolution result with per-status counters."""

    model_config = ConfigDict(extra="forbid")

    campaign_id: str
    campaign_name: str
    campaign_file: str
    attachments_base_path: str
    entries_count: int
    entries: list[EntryAttachmentResolution] = Field(default_factory=list)

    @property
    def ready_count(self) -> int:
        """Return the number of entries with status READY."""
        return sum(1 for entry in self.entries if entry.status == MessageAttachmentStatus.READY)

    @property
    def warning_count(self) -> int:
        """Return the number of entries with status WARNING."""
        return sum(1 for entry in self.entries if entry.status == MessageAttachmentStatus.WARNING)

    @property
    def needs_review_count(self) -> int:
        """Return the number of entries with status NEEDS_REVIEW."""
        return sum(1 for entry in self.entries if entry.status == MessageAttachmentStatus.NEEDS_REVIEW)

    @property
    def blocked_count(self) -> int:
        """Return the number of entries with status BLOCKED."""
        return sum(1 for entry in self.entries if entry.status == MessageAttachmentStatus.BLOCKED)

    @property
    def excluded_count(self) -> int:
        """Return the number of entries with status EXCLUDED."""
        return sum(1 for entry in self.entries if entry.status == MessageAttachmentStatus.EXCLUDED)

    @property
    def inactive_count(self) -> int:
        """Return the number of entries with status INACTIVE."""
        return sum(1 for entry in self.entries if entry.status == MessageAttachmentStatus.INACTIVE)


def _resolve_path(campaign_file: str | Path, raw_path: str) -> Path:
    """Resolve *raw_path* relative to the directory of *campaign_file*.

    ``~`` is expanded first. An absolute path is returned as-is (not
    normalised); a relative path is joined to the campaign file's parent
    directory and resolved.
    """
    campaign_path = Path(campaign_file).resolve()
    path = Path(raw_path).expanduser()
    if path.is_absolute():
        return path
    return (campaign_path.parent / path).resolve()


# NOTE(review): the two patterns below and the signature of
# ``_normalize_template_key`` were truncated in the reviewed text (everything
# between ``re.compile(r"(?`` and ``str:`` was lost). They are reconstructed
# from their uses in ``_render_template``: group 1 is the key, and a leading
# backslash escapes a placeholder (``\${`` is un-escaped afterwards).
# Confirm the exact expressions against version control before merging.
_DOLLAR_FIELD_PATTERN = re.compile(r"(?<!\\)\$\{([^{}]+)\}")
_BRACE_FIELD_PATTERN = re.compile(r"(?<![\\$])\{([^{}]+)\}")


def _normalize_template_key(raw: str) -> str:
    """Map the accepted placeholder spellings onto the canonical key form.

    ``fields.x`` becomes ``x``; ``local.x`` / ``local:x`` become ``local::x``;
    ``global.x`` / ``global:x`` become ``global::x``. Keys already using the
    ``local::`` / ``global::`` form, and any other key, are returned stripped
    but otherwise unchanged.
    """
    key = raw.strip()
    if key.startswith("fields."):
        key = key.removeprefix("fields.")
    elif key.startswith("local."):
        key = "local::" + key.removeprefix("local.")
    elif key.startswith("global."):
        key = "global::" + key.removeprefix("global.")

    # Already canonical: must be tested before the single-colon forms,
    # because "local::" also starts with "local:".
    if key.startswith(("local::", "global::")):
        return key
    if key.startswith("local:"):
        return "local::" + key.removeprefix("local:")
    if key.startswith("global:"):
        return "global::" + key.removeprefix("global:")
    return key


def _render_template(template: str, values: dict[str, Any]) -> str:
    """Substitute known placeholders in *template* with entries of *values*.

    Placeholders whose normalised key is not in *values* are left untouched;
    a value of ``None`` renders as the empty string. After substitution the
    escape sequences ``\\${`` and ``\\}`` are reduced to ``${`` and ``}``.
    """

    def replace(match: re.Match[str]) -> str:
        key = _normalize_template_key(match.group(1))
        if key in values:
            value = values[key]
            return "" if value is None else str(value)
        return match.group(0)

    rendered = _DOLLAR_FIELD_PATTERN.sub(replace, template)
    rendered = _BRACE_FIELD_PATTERN.sub(replace, rendered)
    return rendered.replace(r"\${", "${").replace(r"\}", "}")


def _recipient_values(entry: EntryConfig) -> dict[str, str]:
    """Build ``local::<list>.<index>.<attr>`` template values for recipients.

    Covers every recipient list on the entry plus ``local::from.*`` when the
    entry has a sender. A missing recipient name is rendered as ``""``.
    """
    values: dict[str, str] = {}
    for list_name in ["to", "cc", "bcc", "reply_to", "bounce_to", "disposition_notification_to"]:
        recipients = getattr(entry, list_name)
        for index, recipient in enumerate(recipients):
            prefix = f"{list_name}.{index}"
            values[f"local::{prefix}.email"] = recipient.email
            values[f"local::{prefix}.name"] = recipient.name or ""
            values[f"local::{prefix}.type"] = recipient.recipient_type.value
    if entry.from_:
        values["local::from.email"] = entry.from_.email
        values["local::from.name"] = entry.from_.name or ""
        values["local::from.type"] = entry.from_.recipient_type.value
    return values


def _template_values(config: CampaignConfig, entry: EntryConfig) -> dict[str, Any]:
    """Collect every template value available while rendering for *entry*.

    Campaign globals are exposed as ``global::<key>``; the entry's effective
    field values, its id (when set), its ``active`` flag and its recipients
    are exposed as ``local::<key>``. Later sources overwrite earlier ones on
    key collision.
    """
    values: dict[str, Any] = {}
    for key, value in config.global_values.items():
        values[f"global::{key}"] = value
    for key, value in effective_entry_field_values(config, entry).items():
        values[f"local::{key}"] = value
    if entry.id:
        values["local::id"] = entry.id
    values["local::active"] = entry.active
    values.update(_recipient_values(entry))
    return values


def _rendered_base_dir(config: AttachmentConfig, values: dict[str, Any]) -> str:
    """Render the rule's ``base_dir`` template; an empty result becomes ``"."``."""
    rendered = _render_template(config.base_dir, values).strip()
    return rendered or "."


def _base_path_by_path(config: CampaignConfig, rendered_base_dir: str) -> AttachmentBasePathConfig | None:
    """Return the named base path whose ``path`` equals *rendered_base_dir*, if any."""
    for base_path in config.attachments.base_paths:
        if base_path.path == rendered_base_dir:
            return base_path
    return None


def _default_base_path(config: CampaignConfig) -> AttachmentBasePathConfig:
    """Return the first configured base path.

    Callers must ensure ``config.attachments.base_paths`` is non-empty.
    """
    return config.attachments.base_paths[0]


def _selected_base_path(config: CampaignConfig, rendered_base_dir: str) -> AttachmentBasePathConfig | None:
    """Pick the named base path a rendered ``base_dir`` refers to.

    Returns ``None`` when the campaign defines no named base paths, or when
    *rendered_base_dir* matches none of them. An empty or ``"."`` base dir
    selects the first (default) base path.
    """
    if config.attachments.base_paths:
        if rendered_base_dir in {"", "."}:
            return _default_base_path(config)
        return _base_path_by_path(config, rendered_base_dir)
    return None


def _rule_allows_multiple(config: AttachmentConfig, rendered_file_filter: str) -> bool:
    """Return whether a rule may produce multiple attachments.

    New UI versions no longer expose allow_multiple. Treat wildcard patterns as
    inherently multi-match-capable while keeping the legacy allow_multiple flag
    for old campaign JSON.
    """
    return config.allow_multiple or any(char in rendered_file_filter for char in "*?[")


def _missing_behavior(campaign_config: CampaignConfig, config: AttachmentConfig) -> Behavior:
    """Return the behavior to apply when a rule matches no file.

    A rule-level ``missing_behavior`` wins; otherwise the campaign validation
    policy for required or optional attachments is used.
    """
    if config.missing_behavior is not None:
        return config.missing_behavior
    if config.required:
        return campaign_config.validation_policy.missing_required_attachment
    return campaign_config.validation_policy.missing_optional_attachment


def _ambiguous_behavior(campaign_config: CampaignConfig, config: AttachmentConfig) -> Behavior:
    """Return the rule-level ambiguous behavior, falling back to campaign policy."""
    return config.ambiguous_behavior or campaign_config.validation_policy.ambiguous_attachment_match


def _entry_attachment_allowed(
    config: CampaignConfig,
    attachment_config: AttachmentConfig,
    values: dict[str, Any],
) -> bool:
    """Return whether an entry-level attachment rule may be used.

    When ``individual_base_path_values`` is non-empty, the rule's rendered
    base dir must be one of those values; otherwise the campaign-wide
    ``allow_individual`` flag decides.
    """
    rendered_base_dir = _rendered_base_dir(attachment_config, values)
    individual_paths = config.attachments.individual_base_path_values
    if individual_paths:
        return rendered_base_dir in individual_paths
    return config.attachments.allow_individual


def _iter_effective_attachment_configs(
    config: CampaignConfig,
    entry: EntryConfig,
    values: dict[str, Any],
) -> Iterable[tuple[AttachmentScope, int, AttachmentConfig]]:
    """Yield ``(scope, index, rule)`` for every rule that applies to *entry*.

    Global rules are included only when the entry combines attachments;
    entry rules are included only when ``_entry_attachment_allowed`` accepts
    them. The index is the rule's position in its own list.
    """
    if entry.combine_attachments:
        for index, attachment_config in enumerate(config.attachments.global_):
            yield AttachmentScope.GLOBAL, index, attachment_config
    for index, attachment_config in enumerate(entry.attachments):
        if _entry_attachment_allowed(config, attachment_config, values):
            yield AttachmentScope.ENTRY, index, attachment_config


def _resolve_attachment_directory(
    *,
    campaign_file: str | Path,
    campaign_config: CampaignConfig,
    rendered_base_dir: str,
) -> tuple[Path, AttachmentBasePathConfig | None]:
    """Resolve the directory for an attachment rule.

    Legacy campaigns used attachments.base_path as the root and base_dir as a
    child directory. Current WebUI campaigns select one named base path directly
    in base_dir. Prefer the new base_paths list when present to avoid resolving
    e.g. attachments/base_path + base_dir twice.

    Returns the directory to search and the selected named base path (``None``
    when no named base path was selected).
    """
    selected_base_path = _selected_base_path(campaign_config, rendered_base_dir)
    if selected_base_path is not None:
        return _resolve_path(campaign_file, selected_base_path.path), selected_base_path
    if campaign_config.attachments.base_paths:
        # Named base paths exist but none matched: use base_dir as given.
        return _resolve_path(campaign_file, rendered_base_dir), None
    legacy_root = _resolve_path(campaign_file, campaign_config.attachments.base_path)
    return (legacy_root / rendered_base_dir).resolve(), None


def _match_files(directory: Path, file_filter: str, include_subdirs: bool) -> list[Path]:
    """Return the sorted regular files in *directory* matching *file_filter*.

    With *include_subdirs* the tree is walked recursively and only the file
    name is compared against the filter; otherwise the filter is used as a
    glob pattern relative to *directory*. A missing directory, an empty
    filter, or a filter ``Path.glob`` rejects yields an empty list.
    """
    if not directory.is_dir():
        return []
    if not file_filter:
        # A template field that rendered to "" must read as "no match";
        # Path.glob("") would raise ValueError and abort the whole resolution.
        return []
    if include_subdirs:
        # pathlib.rglob accepts glob patterns, but fnmatch keeps behavior predictable
        # when file_filter is supplied as the Java-style filter portion only.
        return sorted(
            path
            for path in directory.rglob("*")
            if path.is_file() and fnmatch.fnmatch(path.name, file_filter)
        )
    try:
        # list() forces evaluation: on older Pythons glob() is lazy and only
        # raises once iterated.
        candidates = list(directory.glob(file_filter))
    except (ValueError, NotImplementedError):
        # Unacceptable or non-relative (absolute) pattern: nothing can match.
        return []
    return sorted(path for path in candidates if path.is_file())


def _issue_for_missing(config: AttachmentConfig, behavior: Behavior) -> AttachmentIssue:
    """Build the issue reported when a rule matched no file.

    Severity is ERROR only for a required rule whose behavior is BLOCK;
    every other combination is a WARNING.
    """
    code = "missing_required_attachment" if config.required else "missing_optional_attachment"
    if config.required and behavior == Behavior.BLOCK:
        severity = ResolutionSeverity.ERROR
    else:
        severity = ResolutionSeverity.WARNING
    return AttachmentIssue(
        severity=severity,
        code=code,
        message=f"No file matched attachment filter {config.file_filter!r}",
        behavior=behavior,
    )


def _issue_for_ambiguous(config: AttachmentConfig, behavior: Behavior, match_count: int) -> AttachmentIssue:
    """Build the issue reported when a single-file rule matched several files."""
    severity = ResolutionSeverity.ERROR if behavior == Behavior.BLOCK else ResolutionSeverity.WARNING
    return AttachmentIssue(
        severity=severity,
        code="ambiguous_attachment_match",
        message=(
            f"Attachment filter {config.file_filter!r} matched {match_count} files, "
            "but it is configured as a direct/single-file selection"
        ),
        behavior=behavior,
    )


def _resolve_one_config(
    *,
    campaign_file: str | Path,
    campaign_config: CampaignConfig,
    values: dict[str, Any],
    scope: AttachmentScope,
    index: int,
    config: AttachmentConfig,
) -> ResolvedAttachment:
    """Resolve one attachment rule to its matching files and status.

    Renders the rule's templates, locates the search directory, matches
    files, and classifies the result as OK, MISSING (no match) or AMBIGUOUS
    (several matches where only one is allowed), attaching the corresponding
    issue and behavior.
    """
    rendered_base_dir = _rendered_base_dir(config, values)
    rendered_file_filter = _render_template(config.file_filter, values)
    directory, selected_base_path = _resolve_attachment_directory(
        campaign_file=campaign_file,
        campaign_config=campaign_config,
        rendered_base_dir=rendered_base_dir,
    )
    matches = _match_files(directory, rendered_file_filter, config.include_subdirs)
    allow_multiple = _rule_allows_multiple(config, rendered_file_filter)

    issues: list[AttachmentIssue] = []
    behavior: Behavior | None = None
    if not matches:
        status = AttachmentMatchStatus.MISSING
        behavior = _missing_behavior(campaign_config, config)
        issues.append(_issue_for_missing(config, behavior))
    elif len(matches) > 1 and not allow_multiple:
        status = AttachmentMatchStatus.AMBIGUOUS
        behavior = _ambiguous_behavior(campaign_config, config)
        issues.append(_issue_for_ambiguous(config, behavior, len(matches)))
    else:
        status = AttachmentMatchStatus.OK

    return ResolvedAttachment(
        scope=scope,
        index=index,
        attachment_id=config.id,
        label=config.label,
        base_dir_template=config.base_dir,
        file_filter_template=config.file_filter,
        base_path_name=selected_base_path.name if selected_base_path else None,
        base_path=selected_base_path.path if selected_base_path else None,
        base_dir=rendered_base_dir,
        file_filter=rendered_file_filter,
        directory=str(directory),
        include_subdirs=config.include_subdirs,
        required=config.required,
        allow_multiple=allow_multiple,
        zip_enabled=config.zip.enabled,
        status=status,
        behavior=behavior,
        matches=[str(path) for path in matches],
        issues=issues,
    )


def _status_from_issues(active: bool, issues: list[AttachmentIssue]) -> MessageAttachmentStatus:
    """Derive the entry status from its issues.

    Inactive entries are always INACTIVE. Otherwise the most severe behavior
    present wins, in the order BLOCK, DROP, ASK, WARN; with none of these
    the entry is READY.
    """
    if not active:
        return MessageAttachmentStatus.INACTIVE
    behaviors = {issue.behavior for issue in issues if issue.behavior is not None}
    if Behavior.BLOCK in behaviors:
        return MessageAttachmentStatus.BLOCKED
    if Behavior.DROP in behaviors:
        return MessageAttachmentStatus.EXCLUDED
    if Behavior.ASK in behaviors:
        return MessageAttachmentStatus.NEEDS_REVIEW
    if Behavior.WARN in behaviors:
        return MessageAttachmentStatus.WARNING
    return MessageAttachmentStatus.READY


def resolve_entry_attachments(
    *,
    config: CampaignConfig,
    campaign_file: str | Path,
    entry: EntryConfig,
    entry_index: int,
) -> EntryAttachmentResolution:
    """Resolve every applicable attachment rule for one entry.

    Inactive entries are not resolved at all: they get an empty attachment
    list and status INACTIVE.
    """
    values = _template_values(config, entry)
    resolved: list[ResolvedAttachment] = []
    if entry.active:
        for scope, index, attachment_config in _iter_effective_attachment_configs(config, entry, values):
            resolved.append(
                _resolve_one_config(
                    campaign_file=campaign_file,
                    campaign_config=config,
                    values=values,
                    scope=scope,
                    index=index,
                    config=attachment_config,
                )
            )
    issues = [issue for item in resolved for issue in item.issues]
    return EntryAttachmentResolution(
        entry_index=entry_index,
        entry_id=entry.id,
        active=entry.active,
        status=_status_from_issues(entry.active, issues),
        attachments=resolved,
        issues=issues,
    )


def resolve_campaign_attachments(config: CampaignConfig, *, campaign_file: str | Path) -> AttachmentResolutionReport:
    """Resolve attachments for every entry of the campaign.

    Entries are loaded via ``load_campaign_entries`` and numbered from 1.
    The reported base path is the first named base path when any exist,
    otherwise the legacy ``attachments.base_path``.
    """
    entries = load_campaign_entries(config, campaign_file=campaign_file)
    if config.attachments.base_paths:
        raw_base_path = config.attachments.base_paths[0].path
    else:
        raw_base_path = config.attachments.base_path
    base_path = _resolve_path(campaign_file, raw_base_path)
    resolved_entries = [
        resolve_entry_attachments(config=config, campaign_file=campaign_file, entry=entry, entry_index=index)
        for index, entry in enumerate(entries, start=1)
    ]
    return AttachmentResolutionReport(
        campaign_id=config.campaign.id,
        campaign_name=config.campaign.name,
        campaign_file=str(Path(campaign_file).resolve()),
        attachments_base_path=str(base_path),
        entries_count=len(entries),
        entries=resolved_entries,
    )