Files
2026-06-08 15:57:11 +02:00

211 lines
7.2 KiB
Python

from __future__ import annotations
import json
from dataclasses import dataclass
from email.message import EmailMessage
from email.utils import formataddr
from typing import Any
from sqlalchemy.orm import Session
from app.db.models import Campaign, CampaignVersion
from app.mailer.campaign.loader import load_campaign_config
from app.mailer.campaign.models import CampaignConfig, SmtpConfig
from app.mailer.persistence.campaigns import _write_campaign_snapshot
from app.mailer.reports.campaigns import CampaignReportError, generate_campaign_report, generate_jobs_csv
from app.mailer.sending.smtp import SmtpConfigurationError, SmtpSendResult, send_email_message
class CampaignReportEmailError(RuntimeError):
pass
@dataclass(slots=True)
class CampaignReportEmailResult:
campaign_id: str
to: list[str]
subject: str
dry_run: bool
sent: bool
attached_jobs_csv: bool
attached_report_json: bool
smtp_host: str | None = None
smtp_port: int | None = None
accepted_count: int | None = None
def as_dict(self) -> dict[str, Any]:
return {
"campaign_id": self.campaign_id,
"to": self.to,
"subject": self.subject,
"dry_run": self.dry_run,
"sent": self.sent,
"attached_jobs_csv": self.attached_jobs_csv,
"attached_report_json": self.attached_report_json,
"smtp_host": self.smtp_host,
"smtp_port": self.smtp_port,
"accepted_count": self.accepted_count,
}
def _current_version(session: Session, campaign: Campaign) -> CampaignVersion:
version = session.get(CampaignVersion, campaign.current_version_id) if campaign.current_version_id else None
if version is None:
version = (
session.query(CampaignVersion)
.filter(CampaignVersion.campaign_id == campaign.id)
.order_by(CampaignVersion.version_number.desc())
.first()
)
if version is None:
raise CampaignReportEmailError("Campaign has no version")
return version
def _load_config(version: CampaignVersion) -> CampaignConfig:
snapshot_path = _write_campaign_snapshot(version)
return load_campaign_config(snapshot_path)
def _effective_from(config: CampaignConfig) -> tuple[str, str | None]:
if config.recipients.from_:
return config.recipients.from_.email, config.recipients.from_.name
if config.server.smtp and config.server.smtp.username and "@" in config.server.smtp.username:
return config.server.smtp.username, None
raise SmtpConfigurationError("Report email requires recipients.from.email or an SMTP username that is an email address")
def _text_summary(report: dict[str, Any]) -> str:
campaign = report["campaign"]
cards = report["cards"]
status = report["status_counts"]
delivery = report.get("delivery", {})
lines = [
f"Campaign report: {campaign['name']}",
"",
f"Campaign ID: {campaign['id']}",
f"External ID: {campaign['external_id']}",
f"Status: {campaign['status']}",
"",
"Overview",
f"- Jobs total: {cards['jobs_total']}",
f"- Queueable: {cards['queueable']}",
f"- Needs attention: {cards['needs_attention']}",
f"- Sent: {cards['sent']}",
f"- Failed: {cards['failed']}",
f"- IMAP appended: {cards['imap_appended']}",
f"- IMAP failed: {cards['imap_failed']}",
"",
f"Build status: {status.get('build', {})}",
f"Validation status: {status.get('validation', {})}",
f"Queue status: {status.get('queue', {})}",
f"Send status: {status.get('send', {})}",
f"IMAP status: {status.get('imap', {})}",
]
if delivery.get("estimated_remaining_send_human"):
lines.extend(["", f"Estimated remaining send time: {delivery['estimated_remaining_send_human']}"])
lines.extend(["", "This report was generated by MultiMailer."])
return "\n".join(lines)
def build_report_message(
*,
campaign: Campaign,
config: CampaignConfig,
report: dict[str, Any],
to: list[str],
jobs_csv: str | None = None,
report_json: dict[str, Any] | None = None,
) -> EmailMessage:
from_email, from_name = _effective_from(config)
subject = f"MultiMailer report: {campaign.name}"
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = formataddr((from_name or from_email, from_email))
msg["To"] = ", ".join(to)
msg["X-MultiMailer-Report"] = "campaign"
msg.set_content(_text_summary(report))
if jobs_csv is not None:
filename = f"multimailer-{campaign.external_id}-jobs.csv"
msg.add_attachment(jobs_csv.encode("utf-8"), maintype="text", subtype="csv", filename=filename)
if report_json is not None:
filename = f"multimailer-{campaign.external_id}-report.json"
msg.add_attachment(
json.dumps(report_json, indent=2, ensure_ascii=False, default=str).encode("utf-8"),
maintype="application",
subtype="json",
filename=filename,
)
return msg
def send_campaign_report_email(
session: Session,
*,
tenant_id: str,
campaign_id: str,
to: list[str],
include_jobs: bool = False,
attach_jobs_csv: bool = True,
attach_report_json: bool = False,
dry_run: bool = False,
) -> CampaignReportEmailResult:
campaign = session.get(Campaign, campaign_id)
if not campaign or campaign.tenant_id != tenant_id:
raise CampaignReportError("Campaign not found")
if not to:
raise CampaignReportEmailError("At least one report recipient is required")
version = _current_version(session, campaign)
config = _load_config(version)
smtp_config: SmtpConfig | None = config.server.smtp
if smtp_config is None:
raise SmtpConfigurationError("Campaign has no SMTP configuration")
report = generate_campaign_report(session, tenant_id=tenant_id, campaign_id=campaign_id, include_jobs=include_jobs)
jobs_csv = generate_jobs_csv(session, tenant_id=tenant_id, campaign_id=campaign_id) if attach_jobs_csv else None
report_json = report if attach_report_json else None
message = build_report_message(
campaign=campaign,
config=config,
report=report,
to=to,
jobs_csv=jobs_csv,
report_json=report_json,
)
envelope_from, _ = _effective_from(config)
if dry_run:
return CampaignReportEmailResult(
campaign_id=campaign.id,
to=to,
subject=str(message["Subject"]),
dry_run=True,
sent=False,
attached_jobs_csv=jobs_csv is not None,
attached_report_json=report_json is not None,
smtp_host=smtp_config.host,
smtp_port=smtp_config.port,
)
result: SmtpSendResult = send_email_message(
message,
smtp_config=smtp_config,
envelope_from=envelope_from,
envelope_recipients=to,
)
return CampaignReportEmailResult(
campaign_id=campaign.id,
to=to,
subject=str(message["Subject"]),
dry_run=False,
sent=True,
attached_jobs_csv=jobs_csv is not None,
attached_report_json=report_json is not None,
smtp_host=result.host,
smtp_port=result.port,
accepted_count=result.accepted_count,
)