from __future__ import annotations import json import re import shutil from dataclasses import dataclass from datetime import datetime, timezone from email import policy from email.message import EmailMessage from email.parser import BytesParser from pathlib import Path from typing import Any from uuid import uuid4 from app.settings import settings MOCK_SMTP_HOSTS = {"mock", "mock.smtp", "mock.smtp.local", "mock-mail", "__mock_smtp__"} MOCK_IMAP_HOSTS = {"mock", "mock.imap", "mock.imap.local", "mock-mail", "__mock_imap__"} MOCK_IMAP_FOLDERS = [ {"name": "INBOX", "flags": []}, {"name": "Sent", "flags": ["\\Sent"]}, {"name": "Drafts", "flags": ["\\Drafts"]}, {"name": "Trash", "flags": ["\\Trash"]}, {"name": "Archive", "flags": ["\\Archive"]}, ] @dataclass(frozen=True, slots=True) class MockDeliveryRecord: id: str kind: str created_at: str envelope_from: str | None envelope_recipients: list[str] subject: str | None from_header: str | None to_header: str | None cc_header: str | None bcc_header: str | None message_id: str | None size_bytes: int body_preview: str | None attachment_count: int folder: str | None = None smtp_host: str | None = None imap_host: str | None = None raw_filename: str | None = None headers: dict[str, str] | None = None attachments: list[dict[str, Any]] | None = None def as_dict(self, *, include_raw: bool = False) -> dict[str, Any]: data = { "id": self.id, "kind": self.kind, "created_at": self.created_at, "envelope_from": self.envelope_from, "envelope_recipients": self.envelope_recipients, "subject": self.subject, "from_header": self.from_header, "to_header": self.to_header, "cc_header": self.cc_header, "bcc_header": self.bcc_header, "message_id": self.message_id, "size_bytes": self.size_bytes, "body_preview": self.body_preview, "attachment_count": self.attachment_count, "folder": self.folder, "smtp_host": self.smtp_host, "imap_host": self.imap_host, "raw_filename": self.raw_filename, "headers": self.headers or {}, "attachments": self.attachments or [], } if include_raw: data["raw_eml"] = read_raw_message(self.id) return data def _base_dir() -> Path: path = Path(settings.mock_mailbox_dir) path.mkdir(parents=True, exist_ok=True) (path / "messages").mkdir(parents=True, exist_ok=True) return path def _message_dir() -> Path: return _base_dir() / "messages" def _failure_path() -> Path: return _base_dir() / "failures.json" def normalize_mock_host(host: str | None) -> str: return (host or "").strip().lower() def is_mock_smtp_host(host: str | None) -> bool: return normalize_mock_host(host) in MOCK_SMTP_HOSTS def is_mock_imap_host(host: str | None) -> bool: return normalize_mock_host(host) in MOCK_IMAP_HOSTS def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _message_to_bytes(message: EmailMessage | bytes) -> bytes: if isinstance(message, bytes): return message return message.as_bytes(policy=policy.SMTP) def _parse_message(raw: bytes) -> EmailMessage: return BytesParser(policy=policy.default).parsebytes(raw) def _header_text(message: EmailMessage, name: str) -> str | None: value = message.get(name) return str(value) if value is not None else None def _body_preview(message: EmailMessage) -> str | None: body = None try: if message.is_multipart(): body = message.get_body(preferencelist=("plain", "html")) else: body = message if body is None: return None content = body.get_content() except Exception: return None text = re.sub(r"\s+", " ", str(content)).strip() if not text: return None return text[:600] def _attachment_summaries(message: EmailMessage) -> list[dict[str, Any]]: attachments: list[dict[str, Any]] = [] for part in message.iter_attachments(): payload = part.get_payload(decode=True) or b"" attachments.append( { "filename": part.get_filename(), "content_type": part.get_content_type(), "size_bytes": len(payload), } ) return attachments def _headers(message: EmailMessage) -> dict[str, str]: return {str(key): str(value) for key, value in message.items()} def _record_from_raw( raw: bytes, *, kind: str, envelope_from: str | None = None, envelope_recipients: list[str] | None = None, folder: str | None = None, smtp_host: str | None = None, imap_host: str | None = None, ) -> MockDeliveryRecord: message_id = uuid4().hex raw_filename = f"{message_id}.eml" json_filename = f"{message_id}.json" message = _parse_message(raw) attachments = _attachment_summaries(message) record = MockDeliveryRecord( id=message_id, kind=kind, created_at=_now_iso(), envelope_from=envelope_from, envelope_recipients=list(envelope_recipients or []), subject=_header_text(message, "Subject"), from_header=_header_text(message, "From"), to_header=_header_text(message, "To"), cc_header=_header_text(message, "Cc"), bcc_header=_header_text(message, "Bcc"), message_id=_header_text(message, "Message-ID"), size_bytes=len(raw), body_preview=_body_preview(message), attachment_count=len(attachments), folder=folder, smtp_host=smtp_host, imap_host=imap_host, raw_filename=raw_filename, headers=_headers(message), attachments=attachments, ) message_dir = _message_dir() (message_dir / raw_filename).write_bytes(raw) (message_dir / json_filename).write_text(json.dumps(record.as_dict(), indent=2, ensure_ascii=False), encoding="utf-8") return record def record_smtp_delivery( message: EmailMessage, *, envelope_from: str, envelope_recipients: list[str], smtp_host: str | None = None, ) -> MockDeliveryRecord: return _record_from_raw( _message_to_bytes(message), kind="smtp", envelope_from=envelope_from, envelope_recipients=envelope_recipients, smtp_host=smtp_host, ) def record_imap_append( message_bytes: bytes, *, folder: str, imap_host: str | None = None, ) -> MockDeliveryRecord: return _record_from_raw(message_bytes, kind="imap_append", folder=folder, imap_host=imap_host) def _load_record(record_id: str) -> dict[str, Any] | None: path = _message_dir() / f"{record_id}.json" if not path.exists(): return None try: return json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError: return None def list_records(*, kind: str | None = None, limit: int = 100) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] for path in _message_dir().glob("*.json"): try: record = json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError: continue if kind and record.get("kind") != kind: continue records.append(record) records.sort(key=lambda item: str(item.get("created_at") or ""), reverse=True) return records[: max(1, min(limit, 500))] def get_record(record_id: str, *, include_raw: bool = True) -> dict[str, Any] | None: record = _load_record(record_id) if record and include_raw: record["raw_eml"] = read_raw_message(record_id) return record def read_raw_message(record_id: str) -> str | None: record = _load_record(record_id) if not record: return None raw_filename = record.get("raw_filename") if not raw_filename: return None raw_path = _message_dir() / str(raw_filename) if not raw_path.exists(): return None return raw_path.read_text(encoding="utf-8", errors="replace") def clear_records() -> int: message_dir = _message_dir() count = len(list(message_dir.glob("*.json"))) if message_dir.exists(): shutil.rmtree(message_dir) message_dir.mkdir(parents=True, exist_ok=True) return count def get_failures() -> dict[str, Any]: path = _failure_path() if not path.exists(): return { "fail_next_smtp": False, "fail_next_imap": False, "smtp_reject_recipients_containing": None, } try: data = json.loads(path.read_text(encoding="utf-8")) except json.JSONDecodeError: data = {} return { "fail_next_smtp": bool(data.get("fail_next_smtp")), "fail_next_imap": bool(data.get("fail_next_imap")), "smtp_reject_recipients_containing": data.get("smtp_reject_recipients_containing") or None, } def set_failures(*, fail_next_smtp: bool | None = None, fail_next_imap: bool | None = None, smtp_reject_recipients_containing: str | None = None) -> dict[str, Any]: current = get_failures() if fail_next_smtp is not None: current["fail_next_smtp"] = fail_next_smtp if fail_next_imap is not None: current["fail_next_imap"] = fail_next_imap current["smtp_reject_recipients_containing"] = smtp_reject_recipients_containing or None _failure_path().write_text(json.dumps(current, indent=2), encoding="utf-8") return current def consume_fail_next_smtp() -> bool: current = get_failures() if current.get("fail_next_smtp"): current["fail_next_smtp"] = False _failure_path().write_text(json.dumps(current, indent=2), encoding="utf-8") return True return False def consume_fail_next_imap() -> bool: current = get_failures() if current.get("fail_next_imap"): current["fail_next_imap"] = False _failure_path().write_text(json.dumps(current, indent=2), encoding="utf-8") return True return False