from __future__ import annotations import json from pathlib import Path from typing import Any import copy from datetime import UTC, datetime from uuid import uuid4 from sqlalchemy import func from sqlalchemy.orm import Session from app.db.models import ( Campaign, CampaignIssue, CampaignJob, CampaignStatus, CampaignVersion, CampaignVersionWorkflowState, JobBuildStatus, JobImapStatus, JobQueueStatus, JobSendStatus, JobValidationStatus, ) from app.mailer.campaign.loader import load_campaign_config from app.mailer.campaign.validation import Severity, validate_campaign_config from app.mailer.messages.builder import build_campaign_messages from app.mailer.messages.models import MessageDraft from app.storage.services import record_campaign_attachment_uses_for_job from app.storage.campaign_attachments import ( annotate_built_messages_with_managed_files, prepared_campaign_snapshot, public_attachment_summary_payload, ) RUNTIME_DIR = Path(__file__).resolve().parents[3] / "runtime" CAMPAIGN_SNAPSHOT_DIR = RUNTIME_DIR / "campaign_snapshots" BUILD_OUTPUT_DIR = RUNTIME_DIR / "generated_eml" class CampaignPersistenceError(RuntimeError): pass def _ensure_dirs() -> None: CAMPAIGN_SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True) BUILD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True) def _write_campaign_snapshot(version: CampaignVersion) -> Path: _ensure_dirs() path = CAMPAIGN_SNAPSHOT_DIR / f"{version.id}.json" path.write_text(json.dumps(version.raw_json, ensure_ascii=False, indent=2), encoding="utf-8") return path def _next_version_number(session: Session, campaign_id: str) -> int: current = session.query(func.max(CampaignVersion.version_number)).filter(CampaignVersion.campaign_id == campaign_id).scalar() return int(current or 0) + 1 def _resolve_runtime_path(base_path: Path | None, value: str | None) -> str | None: if not value or base_path is None: return value path = Path(value).expanduser() if path.is_absolute(): return str(path) return str((base_path / path).resolve()) def normalize_campaign_paths(raw_json: dict[str, Any], source_base_path: str | Path | None) -> dict[str, Any]: """Return a DB/runtime-safe campaign JSON snapshot. The CLI naturally resolves relative paths against the campaign.json file. Once the campaign is stored in the database, the JSON snapshot lives in app/mailer/runtime/campaign_snapshots. To keep existing file-based campaigns working, relative file paths are normalized to absolute paths at import time when a source_base_path is known. """ base = Path(source_base_path).expanduser().resolve() if source_base_path else None data = copy.deepcopy(raw_json) template_source = data.get("template", {}).get("source") if isinstance(data.get("template"), dict) else None if isinstance(template_source, dict): for key in ("subject_path", "text_path", "html_path"): template_source[key] = _resolve_runtime_path(base, template_source.get(key)) entries_source = data.get("entries", {}).get("source") if isinstance(data.get("entries"), dict) else None if isinstance(entries_source, dict): entries_source["path"] = _resolve_runtime_path(base, entries_source.get("path")) attachments = data.get("attachments") if isinstance(attachments, dict): attachments["base_path"] = _resolve_runtime_path(base, attachments.get("base_path")) or "." return data def create_campaign_version_from_json( session: Session, *, tenant_id: str, user_id: str | None, raw_json: dict[str, Any], source_filename: str | None = None, source_base_path: str | None = None, ) -> tuple[Campaign, CampaignVersion]: if source_base_path is None and source_filename: source_path = Path(source_filename).expanduser() source_base_path = str(source_path.parent if source_path.suffix else source_path) runtime_json = normalize_campaign_paths(raw_json, source_base_path) # load_campaign_config is file-oriented. Use a temporary snapshot for schema/Pydantic validation. _ensure_dirs() tmp_path = CAMPAIGN_SNAPSHOT_DIR / "_incoming_campaign.json" tmp_path.write_text(json.dumps(runtime_json, ensure_ascii=False, indent=2), encoding="utf-8") config = load_campaign_config(tmp_path) campaign = ( session.query(Campaign) .filter(Campaign.tenant_id == tenant_id, Campaign.external_id == config.campaign.id) .one_or_none() ) if campaign is None: campaign = Campaign( tenant_id=tenant_id, created_by_user_id=user_id, external_id=config.campaign.id, name=config.campaign.name, description=config.campaign.description, status=CampaignStatus.DRAFT.value, ) session.add(campaign) session.flush() else: current = session.get(CampaignVersion, campaign.current_version_id) if campaign.current_version_id else None if current and not _version_is_audit_safe_snapshot(current): raise CampaignPersistenceError( f"Campaign already has active working version #{current.version_number}. " "Continue editing or unlock that version instead of importing a parallel draft." ) campaign.name = config.campaign.name campaign.description = config.campaign.description version = CampaignVersion( campaign_id=campaign.id, version_number=_next_version_number(session, campaign.id), raw_json=runtime_json, schema_version=raw_json.get("version", "1.0"), source_filename=source_filename, source_base_path=source_base_path, ) session.add(version) session.flush() campaign.current_version_id = version.id session.add(campaign) _write_campaign_snapshot(version) session.commit() return campaign, version def _version_user_lock_state(version: CampaignVersion) -> str | None: state = getattr(version, "user_lock_state", None) if state in {"temporary", "permanent"}: return state return "permanent" if version.published_at else None def _version_is_user_locked(version: CampaignVersion) -> bool: return _version_user_lock_state(version) is not None def _version_is_audit_safe_snapshot(version: CampaignVersion) -> bool: return _version_user_lock_state(version) == "permanent" or version.workflow_state in { CampaignVersionWorkflowState.QUEUED.value, CampaignVersionWorkflowState.SENDING.value, CampaignVersionWorkflowState.COMPLETED.value, CampaignVersionWorkflowState.CANCELLED.value, CampaignVersionWorkflowState.ARCHIVED.value, } def _ensure_current_campaign_version(campaign: Campaign, version: CampaignVersion, *, action: str) -> None: if campaign.current_version_id != version.id: raise CampaignPersistenceError( f"Historical campaign versions are read-only and cannot be used to {action}. " "Open the current working version instead." ) def _version_is_validated_and_locked(version: CampaignVersion) -> bool: validation_summary = version.validation_summary if isinstance(version.validation_summary, dict) else {} return bool(version.locked_at and validation_summary.get("ok") is True and not _version_is_user_locked(version)) def _ensure_version_validated_and_locked(version: CampaignVersion) -> None: state = _version_user_lock_state(version) if state == "temporary": raise CampaignPersistenceError("This version has a temporary user lock. Unlock it before building, queueing, dry-run or sending.") if state == "permanent": raise CampaignPersistenceError("This version is permanently user-locked. Create an editable copy instead.") if not _version_is_validated_and_locked(version): raise CampaignPersistenceError("Campaign version must be validated and locked before building, queueing, dry-run or sending.") def load_version_config(session: Session, version_id: str): version = session.get(CampaignVersion, version_id) if not version: raise CampaignPersistenceError(f"Campaign version not found: {version_id}") path = _write_campaign_snapshot(version) return version, path, load_campaign_config(path) def validate_campaign_version( session: Session, *, tenant_id: str, version_id: str, check_files: bool = False, user_id: str | None = None, lock_on_success: bool = True, ) -> dict[str, Any]: version, snapshot_path, config = load_version_config(session, version_id) campaign = session.get(Campaign, version.campaign_id) if not campaign or campaign.tenant_id != tenant_id: raise CampaignPersistenceError("Campaign version is not accessible for this tenant") _ensure_current_campaign_version(campaign, version, action="validate") if _version_is_user_locked(version) or version.workflow_state in { CampaignVersionWorkflowState.QUEUED.value, CampaignVersionWorkflowState.SENDING.value, CampaignVersionWorkflowState.COMPLETED.value, CampaignVersionWorkflowState.CANCELLED.value, CampaignVersionWorkflowState.ARCHIVED.value, }: lock_label = "temporarily user-locked" if _version_user_lock_state(version) == "temporary" else "permanently locked/final" raise CampaignPersistenceError(f"{lock_label.capitalize()} campaign versions cannot be validated. Unlock or create an editable copy instead.") if check_files: with prepared_campaign_snapshot( session, tenant_id=tenant_id, campaign_id=campaign.id, raw_json=version.raw_json if isinstance(version.raw_json, dict) else {}, include_bytes=False, prefix="multimailer-managed-validate-", ) as prepared: managed_config = load_campaign_config(prepared.path) report = validate_campaign_config(managed_config, campaign_file=prepared.path, check_files=True) else: report = validate_campaign_config(config, campaign_file=snapshot_path, check_files=False) report_json = report.model_dump(mode="json") report_json.update({"ok": report.ok, "error_count": report.error_count, "warning_count": report.warning_count}) version.validation_summary = report_json # Replace version-level semantic issues from previous validations. ( session.query(CampaignIssue) .filter(CampaignIssue.campaign_version_id == version.id, CampaignIssue.job_id.is_(None)) .delete(synchronize_session=False) ) for issue in report.issues: session.add( CampaignIssue( tenant_id=tenant_id, campaign_id=campaign.id, campaign_version_id=version.id, severity=issue.severity.value, code=issue.code, message=issue.message, source=issue.path, ) ) campaign.status = CampaignStatus.VALIDATED.value if report.ok else CampaignStatus.NEEDS_REVIEW.value if report.ok: version.workflow_state = CampaignVersionWorkflowState.APPROVED.value version.is_complete = True if lock_on_success and version.locked_at is None: from datetime import UTC, datetime version.locked_at = datetime.now(UTC) version.locked_by_user_id = user_id else: version.workflow_state = CampaignVersionWorkflowState.EDITING.value session.add(version) session.add(campaign) session.commit() return report_json def _job_validation_status(value: str) -> str: allowed = {item.value for item in JobValidationStatus} return value if value in allowed else JobValidationStatus.NEEDS_REVIEW.value def _job_from_message( *, tenant_id: str, campaign_id: str, version_id: str, message: MessageDraft, ) -> CampaignJob: recipient_email = message.to[0].email if message.to else None return CampaignJob( tenant_id=tenant_id, campaign_id=campaign_id, campaign_version_id=version_id, entry_index=message.entry_index, entry_id=message.entry_id, recipient_email=recipient_email, subject=message.subject, eml_local_path=message.eml_path, eml_size_bytes=message.eml_size_bytes, build_status=message.build_status.value if hasattr(message.build_status, "value") else str(message.build_status), validation_status=_job_validation_status(message.validation_status.value), queue_status=JobQueueStatus.DRAFT.value, send_status=JobSendStatus.NOT_QUEUED.value, imap_status=message.imap_status.value if hasattr(message.imap_status, "value") else JobImapStatus.NOT_REQUESTED.value, resolved_recipients={ "from": message.from_.model_dump(mode="json") if message.from_ else None, "to": [item.model_dump(mode="json") for item in message.to], "cc": [item.model_dump(mode="json") for item in message.cc], "bcc": [item.model_dump(mode="json") for item in message.bcc], "reply_to": [item.model_dump(mode="json") for item in message.reply_to], "bounce_to": [item.model_dump(mode="json") for item in message.bounce_to], "disposition_notification_to": [item.model_dump(mode="json") for item in message.disposition_notification_to], }, resolved_attachments=[public_attachment_summary_payload(item) for item in message.attachments], issues_snapshot=[item.model_dump(mode="json") for item in message.issues], last_error="; ".join(issue.message for issue in message.issues if issue.severity == "error") or None, ) def build_campaign_version( session: Session, *, tenant_id: str, version_id: str, write_eml: bool = True, ) -> dict[str, Any]: version, snapshot_path, config = load_version_config(session, version_id) campaign = session.get(Campaign, version.campaign_id) if not campaign or campaign.tenant_id != tenant_id: raise CampaignPersistenceError("Campaign version is not accessible for this tenant") _ensure_current_campaign_version(campaign, version, action="build") if version.workflow_state == CampaignVersionWorkflowState.COMPLETED.value: raise CampaignPersistenceError("Sent campaign versions cannot be rebuilt") validation_summary = version.validation_summary if isinstance(version.validation_summary, dict) else {} if not validation_summary.get("ok"): raise CampaignPersistenceError("Campaign version must be successfully validated before messages are built") _ensure_version_validated_and_locked(version) output_dir = BUILD_OUTPUT_DIR / campaign.id / version.id with prepared_campaign_snapshot( session, tenant_id=tenant_id, campaign_id=campaign.id, raw_json=version.raw_json if isinstance(version.raw_json, dict) else {}, include_bytes=True, prefix="multimailer-managed-build-", ) as prepared: managed_config = load_campaign_config(prepared.path) result = build_campaign_messages(managed_config, campaign_file=prepared.path, output_dir=output_dir, write_eml=write_eml) annotate_built_messages_with_managed_files(result.built_messages, prepared.managed_files_by_local_path) report_json = result.report.model_dump(mode="json", by_alias=True) for message_payload, message in zip(report_json.get("messages", []), result.report.messages, strict=False): if isinstance(message_payload, dict): message_payload["attachments"] = [public_attachment_summary_payload(item) for item in message.attachments] report_json["built_at"] = datetime.now(UTC).isoformat() report_json["build_token"] = uuid4().hex report_json.update({ "built_count": result.report.built_count, "build_failed_count": result.report.build_failed_count, "ready_count": result.report.ready_count, "warning_count": result.report.warning_count, "needs_review_count": result.report.needs_review_count, "blocked_count": result.report.blocked_count, "excluded_count": result.report.excluded_count, "inactive_count": result.report.inactive_count, "queueable_count": result.report.queueable_count, }) version.build_summary = report_json editor_state = copy.deepcopy(version.editor_state or {}) editor_state.pop("review_send", None) version.editor_state = editor_state # Rebuild jobs for the current version. Later, protect sent jobs from destructive rebuilds. session.query(CampaignIssue).filter(CampaignIssue.campaign_version_id == version.id, CampaignIssue.job_id.is_not(None)).delete(synchronize_session=False) session.query(CampaignJob).filter(CampaignJob.campaign_version_id == version.id).delete(synchronize_session=False) session.flush() for built in result.built_messages: job = _job_from_message( tenant_id=tenant_id, campaign_id=campaign.id, version_id=version.id, message=built.draft, ) session.add(job) session.flush() record_campaign_attachment_uses_for_job(session, job, stage="built") for issue in built.draft.issues: session.add( CampaignIssue( tenant_id=tenant_id, campaign_id=campaign.id, campaign_version_id=version.id, job_id=job.id, severity=issue.severity, code=issue.code, message=issue.message, source=issue.source, behavior=issue.behavior, ) ) if result.report.needs_review_count or result.report.blocked_count: campaign.status = CampaignStatus.NEEDS_REVIEW.value version.workflow_state = CampaignVersionWorkflowState.APPROVED.value elif result.report.queueable_count > 0: campaign.status = CampaignStatus.READY_TO_QUEUE.value version.workflow_state = CampaignVersionWorkflowState.BUILT.value else: campaign.status = CampaignStatus.VALIDATED.value session.add(version) session.add(campaign) session.commit() return report_json