Files

433 lines
18 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import copy
from datetime import UTC, datetime
from uuid import uuid4
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.db.models import (
Campaign,
CampaignIssue,
CampaignJob,
CampaignStatus,
CampaignVersion,
CampaignVersionWorkflowState,
JobBuildStatus,
JobImapStatus,
JobQueueStatus,
JobSendStatus,
JobValidationStatus,
)
from app.mailer.campaign.loader import load_campaign_config
from app.mailer.campaign.validation import Severity, validate_campaign_config
from app.mailer.messages.builder import build_campaign_messages
from app.mailer.messages.models import MessageDraft
from app.storage.services import record_campaign_attachment_uses_for_job
from app.storage.campaign_attachments import (
annotate_built_messages_with_managed_files,
prepared_campaign_snapshot,
public_attachment_summary_payload,
)
RUNTIME_DIR = Path(__file__).resolve().parents[3] / "runtime"
CAMPAIGN_SNAPSHOT_DIR = RUNTIME_DIR / "campaign_snapshots"
BUILD_OUTPUT_DIR = RUNTIME_DIR / "generated_eml"
class CampaignPersistenceError(RuntimeError):
pass
def _ensure_dirs() -> None:
CAMPAIGN_SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
BUILD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
def _write_campaign_snapshot(version: CampaignVersion) -> Path:
_ensure_dirs()
path = CAMPAIGN_SNAPSHOT_DIR / f"{version.id}.json"
path.write_text(json.dumps(version.raw_json, ensure_ascii=False, indent=2), encoding="utf-8")
return path
def _next_version_number(session: Session, campaign_id: str) -> int:
current = session.query(func.max(CampaignVersion.version_number)).filter(CampaignVersion.campaign_id == campaign_id).scalar()
return int(current or 0) + 1
def _resolve_runtime_path(base_path: Path | None, value: str | None) -> str | None:
if not value or base_path is None:
return value
path = Path(value).expanduser()
if path.is_absolute():
return str(path)
return str((base_path / path).resolve())
def normalize_campaign_paths(raw_json: dict[str, Any], source_base_path: str | Path | None) -> dict[str, Any]:
"""Return a DB/runtime-safe campaign JSON snapshot.
The CLI naturally resolves relative paths against the campaign.json file.
Once the campaign is stored in the database, the JSON snapshot lives in
app/mailer/runtime/campaign_snapshots. To keep existing file-based
campaigns working, relative file paths are normalized to absolute paths at
import time when a source_base_path is known.
"""
base = Path(source_base_path).expanduser().resolve() if source_base_path else None
data = copy.deepcopy(raw_json)
template_source = data.get("template", {}).get("source") if isinstance(data.get("template"), dict) else None
if isinstance(template_source, dict):
for key in ("subject_path", "text_path", "html_path"):
template_source[key] = _resolve_runtime_path(base, template_source.get(key))
entries_source = data.get("entries", {}).get("source") if isinstance(data.get("entries"), dict) else None
if isinstance(entries_source, dict):
entries_source["path"] = _resolve_runtime_path(base, entries_source.get("path"))
attachments = data.get("attachments")
if isinstance(attachments, dict):
attachments["base_path"] = _resolve_runtime_path(base, attachments.get("base_path")) or "."
return data
def create_campaign_version_from_json(
session: Session,
*,
tenant_id: str,
user_id: str | None,
raw_json: dict[str, Any],
source_filename: str | None = None,
source_base_path: str | None = None,
) -> tuple[Campaign, CampaignVersion]:
if source_base_path is None and source_filename:
source_path = Path(source_filename).expanduser()
source_base_path = str(source_path.parent if source_path.suffix else source_path)
runtime_json = normalize_campaign_paths(raw_json, source_base_path)
# load_campaign_config is file-oriented. Use a temporary snapshot for schema/Pydantic validation.
_ensure_dirs()
tmp_path = CAMPAIGN_SNAPSHOT_DIR / "_incoming_campaign.json"
tmp_path.write_text(json.dumps(runtime_json, ensure_ascii=False, indent=2), encoding="utf-8")
config = load_campaign_config(tmp_path)
campaign = (
session.query(Campaign)
.filter(Campaign.tenant_id == tenant_id, Campaign.external_id == config.campaign.id)
.one_or_none()
)
if campaign is None:
campaign = Campaign(
tenant_id=tenant_id,
created_by_user_id=user_id,
external_id=config.campaign.id,
name=config.campaign.name,
description=config.campaign.description,
status=CampaignStatus.DRAFT.value,
)
session.add(campaign)
session.flush()
else:
current = session.get(CampaignVersion, campaign.current_version_id) if campaign.current_version_id else None
if current and not _version_is_audit_safe_snapshot(current):
raise CampaignPersistenceError(
f"Campaign already has active working version #{current.version_number}. "
"Continue editing or unlock that version instead of importing a parallel draft."
)
campaign.name = config.campaign.name
campaign.description = config.campaign.description
version = CampaignVersion(
campaign_id=campaign.id,
version_number=_next_version_number(session, campaign.id),
raw_json=runtime_json,
schema_version=raw_json.get("version", "1.0"),
source_filename=source_filename,
source_base_path=source_base_path,
)
session.add(version)
session.flush()
campaign.current_version_id = version.id
session.add(campaign)
_write_campaign_snapshot(version)
session.commit()
return campaign, version
def _version_user_lock_state(version: CampaignVersion) -> str | None:
state = getattr(version, "user_lock_state", None)
if state in {"temporary", "permanent"}:
return state
return "permanent" if version.published_at else None
def _version_is_user_locked(version: CampaignVersion) -> bool:
return _version_user_lock_state(version) is not None
def _version_is_audit_safe_snapshot(version: CampaignVersion) -> bool:
return _version_user_lock_state(version) == "permanent" or version.workflow_state in {
CampaignVersionWorkflowState.QUEUED.value,
CampaignVersionWorkflowState.SENDING.value,
CampaignVersionWorkflowState.COMPLETED.value,
CampaignVersionWorkflowState.CANCELLED.value,
CampaignVersionWorkflowState.ARCHIVED.value,
}
def _ensure_current_campaign_version(campaign: Campaign, version: CampaignVersion, *, action: str) -> None:
if campaign.current_version_id != version.id:
raise CampaignPersistenceError(
f"Historical campaign versions are read-only and cannot be used to {action}. "
"Open the current working version instead."
)
def _version_is_validated_and_locked(version: CampaignVersion) -> bool:
validation_summary = version.validation_summary if isinstance(version.validation_summary, dict) else {}
return bool(version.locked_at and validation_summary.get("ok") is True and not _version_is_user_locked(version))
def _ensure_version_validated_and_locked(version: CampaignVersion) -> None:
state = _version_user_lock_state(version)
if state == "temporary":
raise CampaignPersistenceError("This version has a temporary user lock. Unlock it before building, queueing, dry-run or sending.")
if state == "permanent":
raise CampaignPersistenceError("This version is permanently user-locked. Create an editable copy instead.")
if not _version_is_validated_and_locked(version):
raise CampaignPersistenceError("Campaign version must be validated and locked before building, queueing, dry-run or sending.")
def load_version_config(session: Session, version_id: str):
version = session.get(CampaignVersion, version_id)
if not version:
raise CampaignPersistenceError(f"Campaign version not found: {version_id}")
path = _write_campaign_snapshot(version)
return version, path, load_campaign_config(path)
def validate_campaign_version(
session: Session,
*,
tenant_id: str,
version_id: str,
check_files: bool = False,
user_id: str | None = None,
lock_on_success: bool = True,
) -> dict[str, Any]:
version, snapshot_path, config = load_version_config(session, version_id)
campaign = session.get(Campaign, version.campaign_id)
if not campaign or campaign.tenant_id != tenant_id:
raise CampaignPersistenceError("Campaign version is not accessible for this tenant")
_ensure_current_campaign_version(campaign, version, action="validate")
if _version_is_user_locked(version) or version.workflow_state in {
CampaignVersionWorkflowState.QUEUED.value,
CampaignVersionWorkflowState.SENDING.value,
CampaignVersionWorkflowState.COMPLETED.value,
CampaignVersionWorkflowState.CANCELLED.value,
CampaignVersionWorkflowState.ARCHIVED.value,
}:
lock_label = "temporarily user-locked" if _version_user_lock_state(version) == "temporary" else "permanently locked/final"
raise CampaignPersistenceError(f"{lock_label.capitalize()} campaign versions cannot be validated. Unlock or create an editable copy instead.")
if check_files:
with prepared_campaign_snapshot(
session,
tenant_id=tenant_id,
campaign_id=campaign.id,
raw_json=version.raw_json if isinstance(version.raw_json, dict) else {},
include_bytes=False,
prefix="multimailer-managed-validate-",
) as prepared:
managed_config = load_campaign_config(prepared.path)
report = validate_campaign_config(managed_config, campaign_file=prepared.path, check_files=True)
else:
report = validate_campaign_config(config, campaign_file=snapshot_path, check_files=False)
report_json = report.model_dump(mode="json")
report_json.update({"ok": report.ok, "error_count": report.error_count, "warning_count": report.warning_count})
version.validation_summary = report_json
# Replace version-level semantic issues from previous validations.
(
session.query(CampaignIssue)
.filter(CampaignIssue.campaign_version_id == version.id, CampaignIssue.job_id.is_(None))
.delete(synchronize_session=False)
)
for issue in report.issues:
session.add(
CampaignIssue(
tenant_id=tenant_id,
campaign_id=campaign.id,
campaign_version_id=version.id,
severity=issue.severity.value,
code=issue.code,
message=issue.message,
source=issue.path,
)
)
campaign.status = CampaignStatus.VALIDATED.value if report.ok else CampaignStatus.NEEDS_REVIEW.value
if report.ok:
version.workflow_state = CampaignVersionWorkflowState.APPROVED.value
version.is_complete = True
if lock_on_success and version.locked_at is None:
from datetime import UTC, datetime
version.locked_at = datetime.now(UTC)
version.locked_by_user_id = user_id
else:
version.workflow_state = CampaignVersionWorkflowState.EDITING.value
session.add(version)
session.add(campaign)
session.commit()
return report_json
def _job_validation_status(value: str) -> str:
allowed = {item.value for item in JobValidationStatus}
return value if value in allowed else JobValidationStatus.NEEDS_REVIEW.value
def _job_from_message(
*,
tenant_id: str,
campaign_id: str,
version_id: str,
message: MessageDraft,
) -> CampaignJob:
recipient_email = message.to[0].email if message.to else None
return CampaignJob(
tenant_id=tenant_id,
campaign_id=campaign_id,
campaign_version_id=version_id,
entry_index=message.entry_index,
entry_id=message.entry_id,
recipient_email=recipient_email,
subject=message.subject,
eml_local_path=message.eml_path,
eml_size_bytes=message.eml_size_bytes,
build_status=message.build_status.value if hasattr(message.build_status, "value") else str(message.build_status),
validation_status=_job_validation_status(message.validation_status.value),
queue_status=JobQueueStatus.DRAFT.value,
send_status=JobSendStatus.NOT_QUEUED.value,
imap_status=message.imap_status.value if hasattr(message.imap_status, "value") else JobImapStatus.NOT_REQUESTED.value,
resolved_recipients={
"from": message.from_.model_dump(mode="json") if message.from_ else None,
"to": [item.model_dump(mode="json") for item in message.to],
"cc": [item.model_dump(mode="json") for item in message.cc],
"bcc": [item.model_dump(mode="json") for item in message.bcc],
"reply_to": [item.model_dump(mode="json") for item in message.reply_to],
"bounce_to": [item.model_dump(mode="json") for item in message.bounce_to],
"disposition_notification_to": [item.model_dump(mode="json") for item in message.disposition_notification_to],
},
resolved_attachments=[public_attachment_summary_payload(item) for item in message.attachments],
issues_snapshot=[item.model_dump(mode="json") for item in message.issues],
last_error="; ".join(issue.message for issue in message.issues if issue.severity == "error") or None,
)
def build_campaign_version(
session: Session,
*,
tenant_id: str,
version_id: str,
write_eml: bool = True,
) -> dict[str, Any]:
version, snapshot_path, config = load_version_config(session, version_id)
campaign = session.get(Campaign, version.campaign_id)
if not campaign or campaign.tenant_id != tenant_id:
raise CampaignPersistenceError("Campaign version is not accessible for this tenant")
_ensure_current_campaign_version(campaign, version, action="build")
if version.workflow_state == CampaignVersionWorkflowState.COMPLETED.value:
raise CampaignPersistenceError("Sent campaign versions cannot be rebuilt")
validation_summary = version.validation_summary if isinstance(version.validation_summary, dict) else {}
if not validation_summary.get("ok"):
raise CampaignPersistenceError("Campaign version must be successfully validated before messages are built")
_ensure_version_validated_and_locked(version)
output_dir = BUILD_OUTPUT_DIR / campaign.id / version.id
with prepared_campaign_snapshot(
session,
tenant_id=tenant_id,
campaign_id=campaign.id,
raw_json=version.raw_json if isinstance(version.raw_json, dict) else {},
include_bytes=True,
prefix="multimailer-managed-build-",
) as prepared:
managed_config = load_campaign_config(prepared.path)
result = build_campaign_messages(managed_config, campaign_file=prepared.path, output_dir=output_dir, write_eml=write_eml)
annotate_built_messages_with_managed_files(result.built_messages, prepared.managed_files_by_local_path)
report_json = result.report.model_dump(mode="json", by_alias=True)
for message_payload, message in zip(report_json.get("messages", []), result.report.messages, strict=False):
if isinstance(message_payload, dict):
message_payload["attachments"] = [public_attachment_summary_payload(item) for item in message.attachments]
report_json["built_at"] = datetime.now(UTC).isoformat()
report_json["build_token"] = uuid4().hex
report_json.update({
"built_count": result.report.built_count,
"build_failed_count": result.report.build_failed_count,
"ready_count": result.report.ready_count,
"warning_count": result.report.warning_count,
"needs_review_count": result.report.needs_review_count,
"blocked_count": result.report.blocked_count,
"excluded_count": result.report.excluded_count,
"inactive_count": result.report.inactive_count,
"queueable_count": result.report.queueable_count,
})
version.build_summary = report_json
editor_state = copy.deepcopy(version.editor_state or {})
editor_state.pop("review_send", None)
version.editor_state = editor_state
# Rebuild jobs for the current version. Later, protect sent jobs from destructive rebuilds.
session.query(CampaignIssue).filter(CampaignIssue.campaign_version_id == version.id, CampaignIssue.job_id.is_not(None)).delete(synchronize_session=False)
session.query(CampaignJob).filter(CampaignJob.campaign_version_id == version.id).delete(synchronize_session=False)
session.flush()
for built in result.built_messages:
job = _job_from_message(
tenant_id=tenant_id,
campaign_id=campaign.id,
version_id=version.id,
message=built.draft,
)
session.add(job)
session.flush()
record_campaign_attachment_uses_for_job(session, job, stage="built")
for issue in built.draft.issues:
session.add(
CampaignIssue(
tenant_id=tenant_id,
campaign_id=campaign.id,
campaign_version_id=version.id,
job_id=job.id,
severity=issue.severity,
code=issue.code,
message=issue.message,
source=issue.source,
behavior=issue.behavior,
)
)
if result.report.needs_review_count or result.report.blocked_count:
campaign.status = CampaignStatus.NEEDS_REVIEW.value
version.workflow_state = CampaignVersionWorkflowState.APPROVED.value
elif result.report.queueable_count > 0:
campaign.status = CampaignStatus.READY_TO_QUEUE.value
version.workflow_state = CampaignVersionWorkflowState.BUILT.value
else:
campaign.status = CampaignStatus.VALIDATED.value
session.add(version)
session.add(campaign)
session.commit()
return report_json