from __future__ import annotations import csv from enum import StrEnum from pathlib import Path from typing import Iterable from pydantic import BaseModel, ConfigDict, Field from .field_values import ignored_entry_field_overrides from .models import AttachmentConfig, CampaignConfig, EntryConfig, SourceType class Severity(StrEnum): INFO = "info" WARNING = "warning" ERROR = "error" class SemanticIssue(BaseModel): model_config = ConfigDict(extra="forbid") severity: Severity code: str message: str path: str | None = None class SemanticReport(BaseModel): model_config = ConfigDict(extra="forbid") campaign_id: str campaign_name: str issues: list[SemanticIssue] = Field(default_factory=list) entries_mode: str entries_count: int | None = None attachments_base_path: str rate_limit: str imap_append_enabled: bool @property def error_count(self) -> int: return sum(1 for issue in self.issues if issue.severity == Severity.ERROR) @property def warning_count(self) -> int: return sum(1 for issue in self.issues if issue.severity == Severity.WARNING) @property def ok(self) -> bool: return self.error_count == 0 def _issue(severity: Severity, code: str, message: str, path: str | None = None) -> SemanticIssue: return SemanticIssue(severity=severity, code=code, message=message, path=path) def _resolve(campaign_file: Path, raw_path: str) -> Path: path = Path(raw_path).expanduser() if path.is_absolute(): return path return (campaign_file.parent / path).resolve() def _mapping_target_field_name(target: str) -> str | None: if target.startswith("fields."): return target.split(".", 1)[1] return None def _mapping_target_known(target: str, field_names: set[str]) -> bool: direct_targets = { "id", "active", "last_sent", "combine_to", "combine_cc", "combine_bcc", "combine_reply_to", "combine_bounce_to", "combine_disposition_notification_to", "combine_attachments", } if target in direct_targets: return True if target.startswith("fields."): name = target.split(".", 1)[1] return not field_names or name in field_names if target.startswith("from."): return target in {"from.email", "from.name", "from.type"} for prefix in ["to", "cc", "bcc", "reply_to", "bounce_to", "disposition_notification_to"]: if target.startswith(prefix + "."): parts = target.split(".") return len(parts) == 3 and parts[1].isdigit() and parts[2] in {"email", "name", "type"} if target.startswith("attachments."): parts = target.split(".") # attachments.0.zip.filename_template etc. if len(parts) >= 3 and parts[1].isdigit(): if parts[2] in { "id", "label", "base_dir", "file_filter", "include_subdirs", "required", "allow_multiple", "missing_behavior", "ambiguous_behavior", }: return len(parts) == 3 if parts[2] == "zip" and len(parts) == 4: return parts[3] in {"enabled", "filename_template", "password_template", "method"} return False def _csv_header(path: Path, delimiter: str, encoding: str) -> list[str] | None: with path.open("r", encoding=encoding, newline="") as handle: reader = csv.reader(handle, delimiter=delimiter) try: return next(reader) except StopIteration: return [] def _iter_template_source_paths(config: CampaignConfig) -> Iterable[tuple[str, str]]: if not config.template.source: return [] source = config.template.source paths: list[tuple[str, str]] = [] if source.subject_path: paths.append(("/template/source/subject_path", source.subject_path)) if source.text_path: paths.append(("/template/source/text_path", source.text_path)) if source.html_path: paths.append(("/template/source/html_path", source.html_path)) return paths def _attachment_base_path_report_value(config: CampaignConfig) -> str: if config.attachments.base_paths: return ", ".join(f"{base_path.name}: {base_path.path}" for base_path in config.attachments.base_paths) return config.attachments.base_path def _iter_attachment_rules(config: CampaignConfig) -> Iterable[tuple[str, AttachmentConfig, bool]]: for index, attachment_config in enumerate(config.attachments.global_): yield f"/attachments/global/{index}", attachment_config, False inline_entries = config.entries.inline or [] if config.entries.is_inline else [] for entry_index, entry in enumerate(inline_entries): for attachment_index, attachment_config in enumerate(entry.attachments): yield f"/entries/inline/{entry_index}/attachments/{attachment_index}", attachment_config, True if config.entries.defaults: for attachment_index, attachment_config in enumerate(config.entries.defaults.attachments): yield f"/entries/defaults/attachments/{attachment_index}", attachment_config, True def _attachment_path_issues(config: CampaignConfig) -> list[SemanticIssue]: issues: list[SemanticIssue] = [] configured_paths = {base_path.path for base_path in config.attachments.base_paths} individual_paths = config.attachments.individual_base_path_values if config.attachments.base_paths: for index, base_path in enumerate(config.attachments.base_paths): if not base_path.name.strip(): issues.append(_issue(Severity.WARNING, "attachment_base_path_missing_name", "attachment base path has no display name", f"/attachments/base_paths/{index}/name")) if not base_path.path.strip(): issues.append(_issue(Severity.ERROR, "attachment_base_path_missing_path", "attachment base path has no path", f"/attachments/base_paths/{index}/path")) elif not config.attachments.base_path: issues.append(_issue(Severity.INFO, "missing_attachment_base_path", "Attachment base path is not configured yet.", "/attachments/base_path")) if configured_paths: for path, attachment_config, is_individual in _iter_attachment_rules(config): if attachment_config.base_dir and attachment_config.base_dir not in configured_paths: issues.append(_issue( Severity.WARNING, "unknown_attachment_base_path", f"attachment rule refers to base path {attachment_config.base_dir!r}, but it is not listed in attachments.base_paths", f"{path}/base_dir", )) if is_individual and individual_paths and attachment_config.base_dir not in individual_paths: issues.append(_issue( Severity.WARNING, "individual_attachment_base_path_not_allowed", f"individual attachment rule uses base path {attachment_config.base_dir!r}, but that base path does not allow individual attachments", f"{path}/base_dir", )) return issues def _ignored_override_issues(config: CampaignConfig, entry: EntryConfig, path_prefix: str) -> list[SemanticIssue]: return [ _issue( Severity.WARNING, "field_override_not_allowed", f"recipient value for field {field_name!r} will be ignored because the field does not allow overrides", f"{path_prefix}/fields/{field_name}", ) for field_name in ignored_entry_field_overrides(config, entry) ] def validate_campaign_config( config: CampaignConfig, *, campaign_file: str | Path | None = None, check_files: bool = False, ) -> SemanticReport: campaign_path = Path(campaign_file).resolve() if campaign_file else Path.cwd() / "campaign.json" issues: list[SemanticIssue] = [] field_names = config.field_names field_definitions = {field.name: field for field in config.fields} declared_names = set(field_definitions) for key in config.global_values: if declared_names and key not in declared_names: issues.append(_issue( Severity.WARNING, "unknown_global_value", f"global_values contains {key!r}, but it is not declared in fields", f"/global_values/{key}", )) issues.extend(_attachment_path_issues(config)) if config.server.imap and config.server.imap.enabled: missing = [name for name in ["host", "port", "username", "password"] if getattr(config.server.imap, name) in (None, "")] if missing: issues.append(_issue( Severity.ERROR, "incomplete_imap_config", "IMAP append is enabled, but these IMAP settings are missing: " + ", ".join(missing), "/server/imap", )) if config.delivery.imap_append_sent.enabled and not (config.server.imap and config.server.imap.enabled): issues.append(_issue( Severity.WARNING, "delivery_imap_enabled_without_server_imap", "delivery.imap_append_sent is enabled, but server.imap.enabled is not true", "/delivery/imap_append_sent/enabled", )) if config.campaign.mode == "send" and not config.server.smtp: issues.append(_issue( Severity.ERROR, "missing_smtp_config", "campaign mode is 'send', but no server.smtp configuration is present", "/server/smtp", )) if config.server.smtp: missing = [name for name in ["host", "port"] if getattr(config.server.smtp, name) in (None, "")] if missing: issues.append(_issue( Severity.WARNING, "incomplete_smtp_config", "SMTP settings are present, but these settings are missing: " + ", ".join(missing), "/server/smtp", )) if config.entries.is_inline: inline_entries = config.entries.inline or [] entries_count = len(inline_entries) entries_mode = "inline" if entries_count == 0: issues.append(_issue(Severity.WARNING, "no_inline_entries", "entries.inline is empty", "/entries/inline")) for index, entry in enumerate(inline_entries): issues.extend(_ignored_override_issues(config, entry, f"/entries/inline/{index}")) else: entries_count = None entries_mode = f"external:{config.entries.source.type.value if config.entries.source else 'unknown'}" mapping = config.entries.mapping or {} if not mapping: issues.append(_issue(Severity.ERROR, "empty_mapping", "external entries require a non-empty mapping", "/entries/mapping")) for target in mapping: if not _mapping_target_known(target, field_names): issues.append(_issue( Severity.WARNING, "unknown_mapping_target", f"mapping target {target!r} is not recognized by the current campaign model", f"/entries/mapping/{target}", )) field_name = _mapping_target_field_name(target) if field_name and field_name in field_definitions and not field_definitions[field_name].can_override: issues.append(_issue( Severity.WARNING, "mapping_target_not_overridable", f"mapping target {target!r} points to a field that does not allow recipient overrides; mapped values will be ignored", f"/entries/mapping/{target}", )) if config.entries.defaults: issues.extend(_ignored_override_issues(config, config.entries.defaults, "/entries/defaults")) if check_files and config.entries.source: source_path = _resolve(campaign_path, config.entries.source.path) if not source_path.exists(): issues.append(_issue( Severity.ERROR, "entries_source_not_found", f"entries source file does not exist: {source_path}", "/entries/source/path", )) elif config.entries.source.type == SourceType.CSV and config.entries.source.has_header: try: header = _csv_header(source_path, config.entries.source.delimiter, config.entries.source.encoding) header_set = set(header or []) missing_columns = sorted({source_name for source_name in mapping.values() if source_name not in header_set}) if missing_columns: issues.append(_issue( Severity.ERROR, "mapping_columns_missing", "CSV mapping refers to missing columns: " + ", ".join(missing_columns), "/entries/mapping", )) except OSError as exc: issues.append(_issue(Severity.ERROR, "entries_source_read_error", str(exc), "/entries/source/path")) if check_files: if config.attachments.base_paths: for index, base_path_config in enumerate(config.attachments.base_paths): attachments_base_path = _resolve(campaign_path, base_path_config.path) if not attachments_base_path.exists(): issues.append(_issue( Severity.WARNING, "attachments_base_path_not_found", f"attachment base path {base_path_config.name!r} does not exist: {attachments_base_path}", f"/attachments/base_paths/{index}/path", )) else: attachments_base_path = _resolve(campaign_path, config.attachments.base_path) if not attachments_base_path.exists(): issues.append(_issue( Severity.WARNING, "attachments_base_path_not_found", f"attachments.base_path does not exist: {attachments_base_path}", "/attachments/base_path", )) for schema_path, raw_path in _iter_template_source_paths(config): path = _resolve(campaign_path, raw_path) if not path.exists(): issues.append(_issue( Severity.ERROR, "template_source_not_found", f"template source file does not exist: {path}", schema_path, )) report = SemanticReport( campaign_id=config.campaign.id, campaign_name=config.campaign.name, issues=issues, entries_mode=entries_mode, entries_count=entries_count, attachments_base_path=_attachment_base_path_report_value(config), rate_limit=f"{config.delivery.rate_limit.messages_per_minute}/min, concurrency {config.delivery.rate_limit.concurrency}", imap_append_enabled=config.delivery.imap_append_sent.enabled, ) return report