from __future__ import annotations

import imaplib
import re
import socket
import ssl
import time
from dataclasses import dataclass

from app.mailer.campaign.models import ImapConfig, TransportSecurity
from app.mailer.dev.mock_mailbox import (
    MOCK_IMAP_FOLDERS,
    consume_fail_next_imap,
    is_mock_imap_host,
    record_imap_append,
)

# Folder value meaning "no explicit folder; discover the Sent folder instead".
_AUTO_FOLDER = "auto"

# LIST response line: "(<flags>) <delimiter> <mailbox>".
_LIST_LINE_RE = re.compile(
    r'^\((?P<flags>[^)]*)\)\s+'
    r'(?P<delimiter>"(?:[^"\\]|\\.)*"|NIL|[^\s]+)\s+'
    r'(?P<mailbox>.+?)\s*$',
    re.IGNORECASE,
)

# Trailing "{<n>}" literal marker that imaplib leaves on the header part of a
# (header, literal) response tuple.
_LITERAL_MARKER_RE = re.compile(r"\s*\{\d+\+?\}\s*$")

# Folder names tried (case-insensitively, in this order) when no mailbox
# carries a Sent flag.
_COMMON_SENT_FOLDER_NAMES = (
    "Sent",
    "Sent Items",
    "Sent Messages",
    "Gesendet",
    "Gesendete Elemente",
    "INBOX.Sent",
    "INBOX/Sent",
)


class ImapConfigurationError(ValueError):
    """Raised when IMAP settings are incomplete or inconsistent."""


class ImapAppendError(RuntimeError):
    """Raised when APPENDing to Sent fails.

    temporary=True means retrying later may help.
    temporary=False means the configuration or mailbox choice probably needs
    user/admin attention.
    """

    def __init__(self, message: str, *, temporary: bool | None = None):
        super().__init__(message)
        self.temporary = temporary


@dataclass(frozen=True, slots=True)
class ImapLoginTestResult:
    """Outcome of a connection/login test against an IMAP server."""

    host: str
    port: int
    security: str
    authenticated: bool


@dataclass(frozen=True, slots=True)
class ImapMailboxInfo:
    """One mailbox reported by IMAP LIST, with its attribute flags."""

    name: str
    flags: list[str]


@dataclass(frozen=True, slots=True)
class ImapFolderListResult:
    """Folders visible on the server plus the detected Sent folder, if any."""

    host: str
    port: int
    security: str
    folders: list[ImapMailboxInfo]
    detected_sent_folder: str | None = None


@dataclass(frozen=True, slots=True)
class ImapAppendResult:
    """Outcome of a successful APPEND to a Sent folder."""

    host: str
    port: int
    security: str
    folder: str
    bytes_appended: int
    response: str | None = None


def _require_imap_config(config: ImapConfig) -> tuple[str, int]:
    """Validate *config* and return its ``(host, port)``.

    Raises:
        ImapConfigurationError: if IMAP is disabled, host or port is missing,
            or only one of username/password is set.
    """
    if not config.enabled:
        raise ImapConfigurationError("IMAP is disabled")
    if not config.host:
        raise ImapConfigurationError("IMAP host is required")
    if not config.port:
        raise ImapConfigurationError("IMAP port is required")
    if bool(config.username) != bool(config.password):
        raise ImapConfigurationError("IMAP username and password must be provided together, or both omitted")
    return config.host, config.port


def _safe_logout(client: imaplib.IMAP4) -> None:
    """Log out of *client*, ignoring any failure.

    Cleanup is best-effort on purpose: an error while closing the connection
    must not replace the result or exception of the actual operation.
    """
    try:
        client.logout()
    except Exception:
        pass


def _open_imap(config: ImapConfig) -> imaplib.IMAP4:
    """Connect to the configured server and log in if credentials are set.

    Uses implicit TLS for ``TransportSecurity.TLS``; otherwise opens a plain
    connection and upgrades it when security is ``STARTTLS``. On any failure
    an already-created connection is logged out and the original exception
    is re-raised unchanged.

    Raises:
        ImapConfigurationError: if *config* is incomplete.
        ImapAppendError: if STARTTLS or LOGIN returns a non-OK status.
    """
    host, port = _require_imap_config(config)
    context = ssl.create_default_context()
    client: imaplib.IMAP4 | None = None
    try:
        if config.security == TransportSecurity.TLS:
            client = imaplib.IMAP4_SSL(host=host, port=port, timeout=config.timeout_seconds, ssl_context=context)
        else:
            client = imaplib.IMAP4(host=host, port=port, timeout=config.timeout_seconds)
            if config.security == TransportSecurity.STARTTLS:
                typ, data = client.starttls(ssl_context=context)
                if typ != "OK":
                    raise ImapAppendError(f"IMAP STARTTLS failed: {data!r}", temporary=True)
        if config.username and config.password:
            typ, data = client.login(config.username, config.password)
            if typ != "OK":
                raise ImapAppendError(f"IMAP login failed: {data!r}", temporary=False)
        return client
    except Exception:
        # client is still None when the connection itself could not be made.
        if client is not None:
            _safe_logout(client)
        raise


def _decode_item(item: bytes | str | None) -> str:
    """Return *item* as text; ``None`` becomes ``""``, bytes decode as UTF-8."""
    if item is None:
        return ""
    if isinstance(item, bytes):
        return item.decode("utf-8", errors="replace")
    return item


def _unquote_imap_token(value: str) -> str:
    """Strip surrounding double quotes from *value* and undo its escapes.

    Tokens that are not wrapped in double quotes are returned stripped of
    surrounding whitespace but otherwise unchanged.
    """
    value = value.strip()
    if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
        value = value[1:-1]
        value = value.replace('\\"', '"').replace('\\\\', '\\')
    return value


def _quote_imap_string(value: str) -> str:
    """Return *value* as an IMAP quoted string (escaping ``\\`` and ``"``)."""
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _quote_mailbox(name: str) -> str:
    """Return *name* in the form sent on the wire as a command argument.

    imaplib sends command arguments verbatim, so a name containing a space
    (e.g. ``Sent Items``) must be quoted by the caller. A name that is already
    wrapped in double quotes is returned unchanged so configurations that
    pre-quote the folder keep working.
    """
    if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
        return name
    return _quote_imap_string(name)


def _extract_mailbox_name(
    list_response_line: bytes | str | tuple[bytes | str | None, ...],
) -> tuple[str, set[str]] | None:
    r"""Best-effort parser for IMAP LIST response lines.

    RFC 3501 LIST responses contain attributes, hierarchy delimiter, then
    mailbox name. Some servers quote both delimiter and mailbox::

        (\HasNoChildren \Sent) "/" "Sent"

    Others quote only the delimiter and leave the mailbox as an atom::

        (\HasNoChildren \Sent) "/" Sent

    The parser must therefore parse the delimiter token separately instead of
    blindly taking the last quoted value.

    A mailbox name sent as a literal arrives from imaplib as a
    ``(header, literal)`` tuple; it is handled by re-assembling an equivalent
    quoted line.

    Returns:
        ``(mailbox_name, lowercased_flags)``, or ``None`` when no mailbox name
        can be extracted.
    """
    if isinstance(list_response_line, tuple):
        if len(list_response_line) < 2:
            return None
        header = _LITERAL_MARKER_RE.sub("", _decode_item(list_response_line[0]))
        literal = _decode_item(list_response_line[1])
        line = f"{header} {_quote_imap_string(literal)}".strip()
    else:
        line = _decode_item(list_response_line).strip()

    match = _LIST_LINE_RE.match(line)
    if match:
        flags = {part.lower() for part in match.group("flags").split()}
        mailbox = _unquote_imap_token(match.group("mailbox"))
        if mailbox:
            return mailbox, flags
        return None

    # Fallback for non-standard server lines: prefer the final token.
    parts = line.split(maxsplit=2)
    if len(parts) >= 3:
        flags_text = parts[0].strip("()")
        flags = {part.lower() for part in flags_text.split()}
        mailbox = _unquote_imap_token(parts[2])
        if mailbox:
            return mailbox, flags
    return None


def _parse_list_response(data: list | None) -> list[tuple[str, set[str]]]:
    """Parse every usable entry of a LIST response, skipping the rest."""
    return [
        extracted
        for item in data or []
        if (extracted := _extract_mailbox_name(item))
    ]


def _detect_sent_folder(parsed: list[tuple[str, set[str]]]) -> str | None:
    r"""Pick the Sent folder from parsed LIST entries.

    A mailbox flagged ``\Sent`` or ``\SentMail`` wins; otherwise the first
    well-known Sent folder name present on the server (compared
    case-insensitively) is returned with the server's own spelling.
    """
    for name, flags in parsed:
        if "\\sent" in flags or "\\sentmail" in flags:
            return name

    names = {name.lower(): name for name, _ in parsed}
    for candidate in _COMMON_SENT_FOLDER_NAMES:
        if candidate.lower() in names:
            return names[candidate.lower()]
    return None


def discover_sent_folder(client: imaplib.IMAP4) -> str | None:
    """Run LIST on *client* and return the detected Sent folder, if any.

    Returns ``None`` when LIST does not answer OK, returns no data, or no
    folder looks like a Sent folder.
    """
    typ, data = client.list()
    if typ != "OK" or not data:
        return None
    return _detect_sent_folder(_parse_list_response(data))


def _effective_sent_folder(*, config: ImapConfig, requested_folder: str | None, client: imaplib.IMAP4) -> str:
    """Resolve the folder to APPEND to.

    Precedence: an explicit *requested_folder*, then ``config.sent_folder``,
    then discovery via LIST. The value ``"auto"`` counts as "not set" in both
    places.

    Raises:
        ImapConfigurationError: if nothing is configured and discovery finds
            no Sent folder.
    """
    if requested_folder and requested_folder != _AUTO_FOLDER:
        return requested_folder
    if config.sent_folder and config.sent_folder != _AUTO_FOLDER:
        return config.sent_folder
    discovered = discover_sent_folder(client)
    if discovered:
        return discovered
    raise ImapConfigurationError("Could not discover Sent folder; configure delivery.imap_append_sent.folder or server.imap.sent_folder")


def test_imap_login(*, imap_config: ImapConfig) -> ImapLoginTestResult:
    """Open an IMAP connection and authenticate if credentials are configured.

    This is a side-effect-free connection test for the WebUI. It does not
    select a mailbox and does not append any message. For a mock IMAP host no
    connection is opened at all.

    Raises:
        ImapConfigurationError: if *imap_config* is incomplete.
        ImapAppendError: if STARTTLS or LOGIN returns a non-OK status.
    """
    host, port = _require_imap_config(imap_config)
    result = ImapLoginTestResult(
        host=host,
        port=port,
        security=imap_config.security.value,
        authenticated=bool(imap_config.username and imap_config.password),
    )
    if is_mock_imap_host(imap_config.host):
        return result

    client = _open_imap(imap_config)
    _safe_logout(client)
    return result


def list_imap_folders(*, imap_config: ImapConfig) -> ImapFolderListResult:
    """Return folders visible through IMAP LIST and the best sent-folder guess.

    Raises:
        ImapConfigurationError: if *imap_config* is incomplete.
        ImapAppendError: if connecting fails at STARTTLS/LOGIN or LIST returns
            a non-OK status.
    """
    host, port = _require_imap_config(imap_config)
    if is_mock_imap_host(imap_config.host):
        folders = [
            ImapMailboxInfo(name=str(item["name"]), flags=list(item.get("flags") or []))
            for item in MOCK_IMAP_FOLDERS
        ]
        return ImapFolderListResult(
            host=host,
            port=port,
            security=imap_config.security.value,
            folders=folders,
            detected_sent_folder="Sent",
        )

    client = _open_imap(imap_config)
    try:
        typ, data = client.list()
        if typ != "OK":
            raise ImapAppendError(f"IMAP folder listing failed: {data!r}", temporary=True)
        parsed = _parse_list_response(data)
        return ImapFolderListResult(
            host=host,
            port=port,
            security=imap_config.security.value,
            folders=[ImapMailboxInfo(name=name, flags=sorted(flags)) for name, flags in parsed],
            detected_sent_folder=_detect_sent_folder(parsed),
        )
    finally:
        _safe_logout(client)


def append_message_to_sent(
    message_bytes: bytes,
    *,
    imap_config: ImapConfig,
    folder: str | None = None,
) -> ImapAppendResult:
    """Append a sent MIME message to the configured IMAP Sent folder.

    The SMTP send remains authoritative. APPEND is a separate best-effort step
    and should not be used to decide whether an email was sent.

    Args:
        message_bytes: The raw message to store.
        imap_config: Server settings; a mock host records the message locally
            instead of contacting a server.
        folder: Target folder. ``None`` or ``"auto"`` falls back to
            ``imap_config.sent_folder`` and then to discovery via LIST.

    Returns:
        Details of the append, with ``folder`` holding the unquoted name.

    Raises:
        ImapConfigurationError: if *imap_config* is incomplete or no Sent
            folder can be determined.
        ImapAppendError: if the append fails; ``temporary`` tells whether a
            retry may help.
    """
    host, port = _require_imap_config(imap_config)

    if is_mock_imap_host(imap_config.host):
        if consume_fail_next_imap():
            raise ImapAppendError("Mock IMAP configured to fail the next append", temporary=False)
        # Treat "auto" as unset, the same way _effective_sent_folder does.
        requested = folder if folder and folder != _AUTO_FOLDER else None
        configured = (
            imap_config.sent_folder
            if imap_config.sent_folder and imap_config.sent_folder != _AUTO_FOLDER
            else None
        )
        target_folder = requested or configured or "Sent"
        record = record_imap_append(message_bytes, folder=target_folder, imap_host=imap_config.host)
        return ImapAppendResult(
            host=host,
            port=port,
            security=imap_config.security.value,
            folder=target_folder,
            bytes_appended=len(message_bytes),
            response=f"mock append stored as {record.id}",
        )

    client: imaplib.IMAP4 | None = None
    try:
        client = _open_imap(imap_config)
        target_folder = _effective_sent_folder(config=imap_config, requested_folder=folder, client=client)
        internal_date = imaplib.Time2Internaldate(time.time())
        # imaplib does not quote arguments, so quote the mailbox name here;
        # otherwise a name with a space yields a malformed APPEND command.
        typ, data = client.append(_quote_mailbox(target_folder), "\\Seen", internal_date, message_bytes)
        if typ != "OK":
            raise ImapAppendError(f"IMAP APPEND failed for folder {target_folder!r}: {data!r}", temporary=False)
        response = "; ".join(_decode_item(item) for item in (data or [])) or None
        return ImapAppendResult(
            host=host,
            port=port,
            security=imap_config.security.value,
            folder=target_folder,
            bytes_appended=len(message_bytes),
            response=response,
        )
    except ImapAppendError:
        raise
    # IMAP4.abort subclasses IMAP4.error, so this clause must come first.
    except (OSError, socket.timeout, imaplib.IMAP4.abort) as exc:
        raise ImapAppendError(f"IMAP append failed: {exc}", temporary=True) from exc
    except imaplib.IMAP4.error as exc:
        raise ImapAppendError(f"IMAP append failed: {exc}", temporary=False) from exc
    finally:
        if client is not None:
            _safe_logout(client)