from __future__ import annotations import mimetypes import re import tempfile from dataclasses import dataclass from email.message import EmailMessage from email.utils import formataddr, make_msgid, formatdate from pathlib import Path from typing import Any, Iterable from app.mailer.attachments.resolver import ( AttachmentMatchStatus, EntryAttachmentResolution, MessageAttachmentStatus, ResolvedAttachment, resolve_entry_attachments, ) from app.mailer.campaign.entries import load_campaign_entries from app.mailer.campaign.field_values import effective_entry_field_values, ignored_entry_field_overrides from app.mailer.campaign.models import ( Behavior, BuildStatus, CampaignConfig, EntryConfig, MissingAddressBehavior, RecipientConfig, SendStatus, ) from app.mailer.services.zip_service import create_encrypted_zip from .models import ( CampaignBuildReport, ImapStatus, MessageAddress, MessageAttachmentSummary, MessageDraft, MessageIssue, MessageValidationStatus, ) _DOLLAR_FIELD_PATTERN = re.compile(r"(? str: key = raw.strip() if key.startswith("fields."): key = key.removeprefix("fields.") elif key.startswith("local."): key = "local::" + key.removeprefix("local.") elif key.startswith("global."): key = "global::" + key.removeprefix("global.") if key.startswith("local::") or key.startswith("global::"): return key if key.startswith("local:"): return "local::" + key.removeprefix("local:") if key.startswith("global:"): return "global::" + key.removeprefix("global:") return key @dataclass(slots=True) class BuiltMessage: draft: MessageDraft mime: EmailMessage | None @dataclass(slots=True) class CampaignBuildResult: report: CampaignBuildReport built_messages: list[BuiltMessage] def _resolve(campaign_file: str | Path, raw_path: str) -> Path: campaign_path = Path(campaign_file).resolve() path = Path(raw_path).expanduser() if path.is_absolute(): return path return (campaign_path.parent / path).resolve() def _read_text(campaign_file: str | Path, raw_path: str | None, encoding: str = "utf-8") -> str | None: if not raw_path: return None path = _resolve(campaign_file, raw_path) return path.read_text(encoding=encoding) def _render_template(template: str, values: dict[str, Any], *, keep_missing: bool = True) -> str: def replace(match: re.Match[str]) -> str: key = _normalize_template_key(match.group(1)) if key in values: value = values[key] return "" if value is None else str(value) return match.group(0) if keep_missing else "" rendered = _DOLLAR_FIELD_PATTERN.sub(replace, template) rendered = _BRACE_FIELD_PATTERN.sub(replace, rendered) return rendered.replace(r"\${", "${").replace(r"\}", "}") def _find_unresolved_placeholders(text: str | None) -> set[str]: if not text: return set() return { _normalize_template_key(match.group(1)) for pattern in (_DOLLAR_FIELD_PATTERN, _BRACE_FIELD_PATTERN) for match in pattern.finditer(text) } def _recipient_values(entry: EntryConfig) -> dict[str, str]: values: dict[str, str] = {} for list_name in ["to", "cc", "bcc", "reply_to", "bounce_to", "disposition_notification_to"]: recipients = getattr(entry, list_name) for index, recipient in enumerate(recipients): prefix = f"{list_name}.{index}" values[f"local::{prefix}.email"] = recipient.email values[f"local::{prefix}.name"] = recipient.name or "" values[f"local::{prefix}.type"] = recipient.recipient_type.value if entry.from_: values["local::from.email"] = entry.from_.email values["local::from.name"] = entry.from_.name or "" values["local::from.type"] = entry.from_.recipient_type.value return values def _template_values(config: CampaignConfig, entry: EntryConfig) -> dict[str, Any]: values: dict[str, Any] = {} for field in config.fields: values.setdefault(field.name, "") values.setdefault(f"global::{field.name}", "") values.setdefault(f"local::{field.name}", "") for key, value in config.global_values.items(): values[f"global::{key}"] = value for key, value in effective_entry_field_values(config, entry).items(): values[key] = value values[f"local::{key}"] = value if entry.id: values["local::id"] = entry.id values["local::active"] = entry.active values.update(_recipient_values(entry)) return values def _message_address(recipient: RecipientConfig | None) -> MessageAddress | None: if recipient is None: return None return MessageAddress(email=recipient.email, name=recipient.name) def _message_addresses(recipients: Iterable[RecipientConfig]) -> list[MessageAddress]: return [MessageAddress(email=recipient.email, name=recipient.name) for recipient in recipients] def _format_recipient(recipient: RecipientConfig) -> str: return formataddr((recipient.name or recipient.email, recipient.email)) def _format_recipient_header(recipients: Iterable[RecipientConfig]) -> str: return ", ".join(_format_recipient(recipient) for recipient in recipients) def _effective_sender(config: CampaignConfig, entry: EntryConfig) -> RecipientConfig | None: if config.recipients.allow_individual_from and entry.from_: return entry.from_ return config.recipients.from_ def _combine_recipients( *, allow_individual: bool, combine: bool, global_recipients: list[RecipientConfig], entry_recipients: list[RecipientConfig], ) -> list[RecipientConfig]: recipients: list[RecipientConfig] = [] if not allow_individual or combine: recipients.extend(global_recipients) if allow_individual: recipients.extend(entry_recipients) # keep order while avoiding exact duplicate email/type pairs seen: set[tuple[str, str]] = set() unique: list[RecipientConfig] = [] for recipient in recipients: key = (recipient.email.lower(), recipient.recipient_type.value) if key in seen: continue seen.add(key) unique.append(recipient) return unique def _effective_recipients(config: CampaignConfig, entry: EntryConfig) -> dict[str, list[RecipientConfig]]: return { "to": _combine_recipients( allow_individual=config.recipients.allow_individual_to, combine=entry.combine_to, global_recipients=config.recipients.to, entry_recipients=entry.to, ), "cc": _combine_recipients( allow_individual=config.recipients.allow_individual_cc, combine=entry.combine_cc, global_recipients=config.recipients.cc, entry_recipients=entry.cc, ), "bcc": _combine_recipients( allow_individual=config.recipients.allow_individual_bcc, combine=entry.combine_bcc, global_recipients=config.recipients.bcc, entry_recipients=entry.bcc, ), "reply_to": _combine_recipients( allow_individual=config.recipients.allow_individual_reply_to, combine=entry.combine_reply_to, global_recipients=config.recipients.reply_to, entry_recipients=entry.reply_to, ), "bounce_to": _combine_recipients( allow_individual=config.recipients.allow_individual_bounce_to, combine=entry.combine_bounce_to, global_recipients=config.recipients.bounce_to, entry_recipients=entry.bounce_to, ), "disposition_notification_to": _combine_recipients( allow_individual=config.recipients.allow_individual_disposition_notification_to, combine=entry.combine_disposition_notification_to, global_recipients=config.recipients.disposition_notification_to, entry_recipients=entry.disposition_notification_to, ), } def _load_template_parts(config: CampaignConfig, campaign_file: str | Path) -> tuple[str, str | None, str | None]: template = config.template if template.source: subject = _read_text(campaign_file, template.source.subject_path, template.source.encoding) text = _read_text(campaign_file, template.source.text_path, template.source.encoding) html = _read_text(campaign_file, template.source.html_path, template.source.encoding) return subject or "", text, html return template.subject or "", template.text, template.html def _issue_from_behavior(*, code: str, message: str, behavior: str, source: str) -> MessageIssue: severity = "error" if behavior == "block" else "warning" return MessageIssue(severity=severity, code=code, message=message, behavior=behavior, source=source) def _apply_behavior(current: MessageValidationStatus, behavior: str) -> MessageValidationStatus: if behavior == Behavior.BLOCK.value: return MessageValidationStatus.BLOCKED if behavior == Behavior.DROP.value: return MessageValidationStatus.EXCLUDED if behavior == Behavior.ASK.value: if current not in {MessageValidationStatus.BLOCKED, MessageValidationStatus.EXCLUDED}: return MessageValidationStatus.NEEDS_REVIEW if behavior == Behavior.WARN.value: if current == MessageValidationStatus.READY: return MessageValidationStatus.WARNING # continue leaves status as-is return current def _validation_status_from_attachment_status(status: MessageAttachmentStatus) -> MessageValidationStatus: return MessageValidationStatus(status.value) def _attachment_summaries(resolution: EntryAttachmentResolution) -> list[MessageAttachmentSummary]: return [ MessageAttachmentSummary( attachment_id=attachment.attachment_id, label=attachment.label, status=attachment.status.value, behavior=attachment.behavior.value if attachment.behavior else None, required=attachment.required, allow_multiple=attachment.allow_multiple, zip_enabled=attachment.zip_enabled, base_path_name=attachment.base_path_name, base_path=attachment.base_path, file_filter=attachment.file_filter, directory=attachment.directory, matches=attachment.matches, ) for attachment in resolution.attachments ] def _message_issues_from_attachment_resolution(resolution: EntryAttachmentResolution) -> list[MessageIssue]: return [ MessageIssue( severity=issue.severity.value, code=issue.code, message=issue.message, behavior=issue.behavior.value if issue.behavior else None, source="attachments", ) for issue in resolution.issues ] def _safe_filename(value: str | None, fallback: str) -> str: raw = value or fallback safe = re.sub(r"[^A-Za-z0-9_.-]+", "_", raw).strip("._") return safe or fallback def _attachment_bytes(path: Path) -> tuple[bytes, str, str]: data = path.read_bytes() mime_type, _ = mimetypes.guess_type(str(path)) if not mime_type: return data, "application", "octet-stream" maintype, subtype = mime_type.split("/", 1) return data, maintype, subtype def _render_zip_filename( *, attachment: ResolvedAttachment, values: dict[str, Any], entry: EntryConfig, default_index: int, ) -> str: template = attachment.attachment_id or attachment.label or f"attachments-{default_index}" # The resolver summary does not carry the full ZipConfig, so the build step receives # filename/password through the resolved attachment's original config by re-resolving # via a private companion in _zip_config_for_attachment. rendered = _render_template(template, values, keep_missing=False) if not rendered.lower().endswith(".zip"): rendered += ".zip" return _safe_filename(rendered, f"entry-{entry.id or default_index}.zip") def _iter_attachment_configs_for_resolution(config: CampaignConfig, entry: EntryConfig): if entry.combine_attachments: for index, attachment_config in enumerate(config.attachments.global_): yield "global", index, attachment_config for index, attachment_config in enumerate(entry.attachments): yield "entry", index, attachment_config def _zip_config_for_attachment(config: CampaignConfig, entry: EntryConfig, resolved: ResolvedAttachment): for scope, index, attachment_config in _iter_attachment_configs_for_resolution(config, entry): if scope == resolved.scope.value and index == resolved.index: return attachment_config.zip return None def _attach_files( *, message: EmailMessage, config: CampaignConfig, entry: EntryConfig, resolution: EntryAttachmentResolution, values: dict[str, Any], work_dir: Path, ) -> int: attached_count = 0 zip_dir = work_dir / "_zip" zip_dir.mkdir(parents=True, exist_ok=True) for index, attachment in enumerate(resolution.attachments, start=1): # Missing/ambiguous configs still keep the message draft. They simply do not add files. if attachment.status != AttachmentMatchStatus.OK: continue match_paths = [Path(match) for match in attachment.matches] if not match_paths: continue zip_config = _zip_config_for_attachment(config, entry, attachment) if attachment.zip_enabled: filename_template = zip_config.filename_template if zip_config else None if filename_template: filename = _safe_filename(_render_template(filename_template, values, keep_missing=False), f"entry-{entry.entry_id if hasattr(entry, 'entry_id') else index}.zip") if not filename.lower().endswith(".zip"): filename += ".zip" else: filename = _render_zip_filename(attachment=attachment, values=values, entry=entry, default_index=index) password = _render_template(zip_config.password_template or "", values, keep_missing=False) if zip_config else "" zip_path = create_encrypted_zip(zip_dir / filename, match_paths, password) files_to_attach = [zip_path] else: files_to_attach = match_paths for path in files_to_attach: data, maintype, subtype = _attachment_bytes(path) message.add_attachment(data, maintype=maintype, subtype=subtype, filename=path.name) attached_count += 1 return attached_count def _imap_initial_status(config: CampaignConfig) -> ImapStatus: if config.delivery.imap_append_sent.enabled: return ImapStatus.PENDING return ImapStatus.NOT_REQUESTED def _write_eml(message: EmailMessage, output_dir: Path, entry: EntryConfig, entry_index: int) -> tuple[str, int]: output_dir.mkdir(parents=True, exist_ok=True) filename = _safe_filename(entry.id, f"entry-{entry_index:04d}") + ".eml" path = output_dir / filename path.write_bytes(bytes(message)) return str(path), path.stat().st_size def build_entry_message( *, config: CampaignConfig, campaign_file: str | Path, entry: EntryConfig, entry_index: int, output_dir: Path | None = None, write_eml: bool = False, work_dir: Path | None = None, ) -> BuiltMessage: resolution = resolve_entry_attachments(config=config, campaign_file=campaign_file, entry=entry, entry_index=entry_index) recipients = _effective_recipients(config, entry) sender = _effective_sender(config, entry) issues = _message_issues_from_attachment_resolution(resolution) validation_status = _validation_status_from_attachment_status(resolution.status) ignored_field_overrides = ignored_entry_field_overrides(config, entry) if ignored_field_overrides: issues.append( MessageIssue( severity="warning", code="field_override_not_allowed", message="Recipient field value(s) ignored because the campaign field does not allow overrides: " + ", ".join(ignored_field_overrides), behavior="warn", source="fields", ) ) if validation_status == MessageValidationStatus.READY: validation_status = MessageValidationStatus.WARNING if not entry.active: draft = MessageDraft( entry_index=entry_index, entry_id=entry.id, active=False, build_status=BuildStatus.BUILD_FAILED, validation_status=MessageValidationStatus.INACTIVE, send_status=SendStatus.DRAFT, imap_status=ImapStatus.SKIPPED, from_=_message_address(sender), to=_message_addresses(recipients["to"]), cc=_message_addresses(recipients["cc"]), bcc=_message_addresses(recipients["bcc"]), reply_to=_message_addresses(recipients["reply_to"]), bounce_to=_message_addresses(recipients["bounce_to"]), disposition_notification_to=_message_addresses(recipients["disposition_notification_to"]), attachments=_attachment_summaries(resolution), issues=[MessageIssue(severity="info", code="inactive_entry", message="Entry is inactive", behavior=config.validation_policy.inactive_entry.value, source="entry")], ) return BuiltMessage(draft=draft, mime=None) if not recipients["to"]: behavior = config.validation_policy.missing_email.value issues.append(_issue_from_behavior(code="missing_email", message="No effective To recipient is configured", behavior=behavior, source="recipients")) validation_status = MessageValidationStatus.BLOCKED if behavior == MissingAddressBehavior.BLOCK.value else MessageValidationStatus.EXCLUDED subject_template, text_template, html_template = _load_template_parts(config, campaign_file) values = _template_values(config, entry) keep_missing_placeholders = not config.validation_policy.ignore_empty_fields subject = _render_template(subject_template, values, keep_missing=keep_missing_placeholders) text_body = _render_template(text_template or "", values, keep_missing=keep_missing_placeholders) if text_template is not None else None html_body = _render_template(html_template or "", values, keep_missing=keep_missing_placeholders) if html_template is not None else None unresolved = sorted( _find_unresolved_placeholders(subject) | _find_unresolved_placeholders(text_body) | _find_unresolved_placeholders(html_body) ) if unresolved: behavior = config.validation_policy.template_error.value issues.append( _issue_from_behavior( code="template_error", message="Unresolved template placeholder(s): " + ", ".join(unresolved), behavior=behavior, source="template", ) ) validation_status = MessageValidationStatus.BLOCKED if behavior == MissingAddressBehavior.BLOCK.value else MessageValidationStatus.EXCLUDED message = EmailMessage() try: message["Date"] = formatdate(localtime=True) message["Message-ID"] = make_msgid() if sender: message["From"] = _format_recipient(sender) if recipients["to"]: message["To"] = _format_recipient_header(recipients["to"]) if recipients["cc"]: message["Cc"] = _format_recipient_header(recipients["cc"]) # Bcc deliberately remains envelope-only and is tracked in MessageDraft. if recipients["reply_to"]: message["Reply-To"] = _format_recipient_header(recipients["reply_to"]) if recipients["disposition_notification_to"]: message["Disposition-Notification-To"] = _format_recipient_header(recipients["disposition_notification_to"]) # bounce_to is tracked but not emitted as Return-Path. That should be the SMTP envelope sender. message["Subject"] = subject if html_body is not None: message.set_content(text_body or "") message.add_alternative(html_body, subtype="html") else: message.set_content(text_body or "") if work_dir is None: work_dir = output_dir or Path(tempfile.mkdtemp(prefix="multimailer-build-")) attachment_count = _attach_files( message=message, config=config, entry=entry, resolution=resolution, values=values, work_dir=work_dir, ) build_status = BuildStatus.BUILT except Exception as exc: issues.append(MessageIssue(severity="error", code="build_failed", message=str(exc), behavior="block", source="builder")) validation_status = MessageValidationStatus.BLOCKED build_status = BuildStatus.BUILD_FAILED attachment_count = 0 message = None # type: ignore[assignment] eml_path: str | None = None eml_size: int | None = None if write_eml and output_dir is not None and message is not None: eml_path, eml_size = _write_eml(message, output_dir, entry, entry_index) draft = MessageDraft( entry_index=entry_index, entry_id=entry.id, active=entry.active, build_status=build_status, validation_status=validation_status, send_status=SendStatus.DRAFT, imap_status=_imap_initial_status(config) if build_status == BuildStatus.BUILT else ImapStatus.SKIPPED, subject=subject, from_=_message_address(sender), to=_message_addresses(recipients["to"]), cc=_message_addresses(recipients["cc"]), bcc=_message_addresses(recipients["bcc"]), reply_to=_message_addresses(recipients["reply_to"]), bounce_to=_message_addresses(recipients["bounce_to"]), disposition_notification_to=_message_addresses(recipients["disposition_notification_to"]), attachment_count=attachment_count, attachments=_attachment_summaries(resolution), issues=issues, eml_path=eml_path, eml_size_bytes=eml_size, ) return BuiltMessage(draft=draft, mime=message) def _unsent_attachment_issues( *, config: CampaignConfig, campaign_file: str | Path, built_messages: list[BuiltMessage], ) -> list[MessageIssue]: behavior = config.validation_policy.unsent_attachment_files.value if behavior == Behavior.CONTINUE.value: return [] matched_files = { Path(match).resolve() for built in built_messages for attachment in built.draft.attachments for match in attachment.matches } issues: list[MessageIssue] = [] for base_path in config.attachments.base_paths: if not base_path.unsent_warning: continue directory = _resolve(campaign_file, base_path.path) if not directory.exists() or not directory.is_dir(): continue all_files = sorted(path.resolve() for path in directory.rglob("*") if path.is_file()) unsent = [path for path in all_files if path not in matched_files] if not unsent: continue shown = ", ".join(str(path.relative_to(directory)) for path in unsent[:10]) if len(unsent) > 10: shown += f", … (+{len(unsent) - 10} more)" issues.append( _issue_from_behavior( code="unsent_attachment_files", message=f"{len(unsent)} file(s) in attachment source {base_path.name!r} are not used by any message: {shown}", behavior=behavior, source=f"attachments:{base_path.name}", ) ) return issues def _apply_campaign_level_issues(built_messages: list[BuiltMessage], issues: list[MessageIssue]) -> None: if not issues: return for built in built_messages: if not built.draft.active: continue built.draft.issues.extend(issues) status = built.draft.validation_status for issue in issues: if issue.behavior: status = _apply_behavior(status, issue.behavior) built.draft.validation_status = status def build_campaign_messages( config: CampaignConfig, *, campaign_file: str | Path, output_dir: str | Path | None = None, write_eml: bool = False, ) -> CampaignBuildResult: campaign_path = Path(campaign_file).resolve() entries = load_campaign_entries(config, campaign_file=campaign_path) output_path = Path(output_dir).resolve() if output_dir is not None else None with tempfile.TemporaryDirectory(prefix="multimailer-build-") as tmp: work_dir = output_path or Path(tmp) built_messages = [ build_entry_message( config=config, campaign_file=campaign_path, entry=entry, entry_index=index, output_dir=output_path, write_eml=write_eml, work_dir=work_dir, ) for index, entry in enumerate(entries, start=1) ] _apply_campaign_level_issues( built_messages, _unsent_attachment_issues(config=config, campaign_file=campaign_path, built_messages=built_messages), ) report = CampaignBuildReport( campaign_id=config.campaign.id, campaign_name=config.campaign.name, campaign_file=str(campaign_path), entries_count=len(entries), messages=[built.draft for built in built_messages], ) return CampaignBuildResult(report=report, built_messages=built_messages)