Files
2026-06-08 15:57:11 +02:00

352 lines
13 KiB
Python

from __future__ import annotations
import csv
import io
import math
from collections import Counter
from datetime import datetime, timezone
from typing import Any
from sqlalchemy.orm import Session
from app.db.models import (
Campaign,
CampaignIssue,
CampaignJob,
CampaignVersion,
ImapAppendAttempt,
SendAttempt,
)
from app.mailer.campaign.loader import load_campaign_config
from app.mailer.persistence.campaigns import _write_campaign_snapshot
class CampaignReportError(RuntimeError):
    """Raised when a campaign report cannot be produced (e.g. the campaign is not found for the tenant)."""
def _utcnow_iso() -> str:
    """Return the current time in UTC as an ISO-8601 string (with a ``+00:00`` offset)."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()
def _counter(values: list[str | None]) -> dict[str, int]:
    """Count occurrences of each value, bucketing falsy values (``None``, ``""``) as ``"unknown"``.

    Keys appear in first-seen order.
    """
    tally: Counter[str] = Counter()
    for raw in values:
        tally[raw if raw else "unknown"] += 1
    return dict(tally)
def _get_campaign(session: Session, *, tenant_id: str, campaign_id: str) -> Campaign:
    """Load the campaign ``campaign_id`` owned by ``tenant_id``.

    The lookup filters on both tenant and campaign id, so a campaign belonging
    to another tenant is treated exactly like a missing one.

    Raises:
        CampaignReportError: If no matching campaign exists for the tenant.
    """
    scoped = session.query(Campaign).filter(
        Campaign.tenant_id == tenant_id,
        Campaign.id == campaign_id,
    )
    found = scoped.one_or_none()
    if found:
        return found
    raise CampaignReportError(f"Campaign not found or not accessible: {campaign_id}")
def _current_version(session: Session, campaign: Campaign) -> CampaignVersion | None:
    """Return the campaign's current version, or ``None``.

    ``None`` is returned when no current version id is set, when the version
    row cannot be loaded, or when the loaded row belongs to a different
    campaign (a defensive consistency check).
    """
    version_id = campaign.current_version_id
    if not version_id:
        return None
    candidate = session.get(CampaignVersion, version_id)
    if not candidate or candidate.campaign_id != campaign.id:
        return None
    return candidate
def _version_info(version: CampaignVersion | None) -> dict[str, Any] | None:
    """Serialize a campaign version for the report, or return ``None`` when there is none.

    ``created_at`` is rendered via ``isoformat()`` (``None`` if unset); the
    validation/build summaries are passed through as stored on the version.
    """
    if not version:
        return None
    created = version.created_at
    created_iso = None if not created else created.isoformat()
    return {
        "id": version.id,
        "version_number": version.version_number,
        "schema_version": version.schema_version,
        "source_filename": version.source_filename,
        "created_at": created_iso,
        "validation_summary": version.validation_summary,
        "build_summary": version.build_summary,
    }
def _load_delivery_info(version: CampaignVersion | None, jobs: list[CampaignJob]) -> dict[str, Any]:
    """Extract rate-limit, IMAP and retry settings from the version JSON where possible.

    This stays best-effort so reports still work if the schema evolves or a
    partial/invalid campaign snapshot exists: any failure while writing/loading
    the snapshot *or* while reading the expected ``delivery`` settings from the
    loaded config returns the all-``None`` default payload, with the error text
    under ``"load_error"``, instead of raising.

    Args:
        version: The campaign's current version, or ``None`` (default payload).
        jobs: The campaign's jobs. Those whose ``send_status`` is ``queued``,
            ``failed_temporary`` or ``sending`` count as pending for the
            remaining-send estimate.

    Returns:
        A dict with ``rate_limit``, ``imap_append_sent``, ``retry``,
        ``estimated_remaining_send_seconds`` and
        ``estimated_remaining_send_human`` keys (plus ``load_error`` on failure).
    """
    default: dict[str, Any] = {
        "rate_limit": {"messages_per_minute": None, "concurrency": None},
        "imap_append_sent": {"enabled": None, "folder": None},
        "retry": {"max_attempts": None, "backoff_seconds": []},
        "estimated_remaining_send_seconds": None,
        "estimated_remaining_send_human": None,
    }
    if not version:
        return default
    try:
        snapshot_path = _write_campaign_snapshot(version)
        config = load_campaign_config(snapshot_path)
        # Read every setting inside the guard: a snapshot from an older/newer
        # schema may load successfully yet lack one of these attributes, and
        # that must not crash the whole report.
        delivery = config.delivery
        rate_limit = delivery.rate_limit
        imap_append_sent = delivery.imap_append_sent
        retry = delivery.retry
        messages_per_minute = rate_limit.messages_per_minute
        settings: dict[str, Any] = {
            "rate_limit": {
                "messages_per_minute": messages_per_minute,
                "concurrency": rate_limit.concurrency,
            },
            "imap_append_sent": {
                "enabled": imap_append_sent.enabled,
                "folder": imap_append_sent.folder,
            },
            "retry": {
                "max_attempts": retry.max_attempts,
                "backoff_seconds": retry.backoff_seconds,
            },
        }
    except Exception as exc:  # pragma: no cover - reporting should not fail hard here
        default["load_error"] = str(exc)
        return default

    # Send statuses that still count as "to be sent" for the estimate.
    pending_statuses = {"queued", "failed_temporary", "sending"}
    pending_count = sum(1 for job in jobs if job.send_status in pending_statuses)
    estimated_seconds = None
    # Only estimate from a usable positive numeric rate; a missing, zero,
    # negative or non-numeric rate leaves the estimate unset instead of
    # raising or producing a negative duration.
    if pending_count and isinstance(messages_per_minute, (int, float)) and messages_per_minute > 0:
        estimated_seconds = int(math.ceil((pending_count / messages_per_minute) * 60))
    return {
        **settings,
        "estimated_remaining_send_seconds": estimated_seconds,
        "estimated_remaining_send_human": _human_duration(estimated_seconds),
    }
def _human_duration(seconds: int | None) -> str | None:
    """Format a duration in seconds as a short human-readable string.

    Under a minute gives ``"Ns"``; under an hour gives ``"Mm"`` or ``"Mm Ss"``;
    otherwise ``"Hh"`` or ``"Hh Mm"`` (leftover seconds are dropped at that
    scale). ``None`` passes through unchanged.
    """
    if seconds is None:
        return None
    if seconds < 60:
        return f"{seconds}s"
    total_minutes, rem_seconds = divmod(seconds, 60)
    if total_minutes >= 60:
        hours, rem_minutes = divmod(total_minutes, 60)
        return f"{hours}h" if not rem_minutes else f"{hours}h {rem_minutes}m"
    return f"{total_minutes}m" if not rem_seconds else f"{total_minutes}m {rem_seconds}s"
def _issue_summary_from_jobs(jobs: list[CampaignJob]) -> dict[str, Any]:
    """Summarize the issue snapshots stored on each job.

    Only dict entries of ``issues_snapshot`` are counted. A missing/empty
    severity or code is bucketed as ``"unknown"``; issues without a truthy
    ``behavior`` are left out of ``by_behavior``.
    """
    issues = [
        issue
        for job in jobs
        for issue in (job.issues_snapshot or [])
        if isinstance(issue, dict)
    ]
    by_severity = Counter(issue.get("severity") or "unknown" for issue in issues)
    by_code = Counter(issue.get("code") or "unknown" for issue in issues)
    by_behavior = Counter(issue["behavior"] for issue in issues if issue.get("behavior"))
    return {
        "total": len(issues),
        "by_severity": dict(by_severity),
        "by_code": dict(by_code),
        "by_behavior": dict(by_behavior),
    }
def _attachment_summary(jobs: list[CampaignJob]) -> dict[str, Any]:
    """Aggregate the resolved attachment configs of all jobs.

    Only dict entries of each job's ``resolved_attachments`` are considered.
    A config without a status is counted as ``"unknown"``. Its ``matches``
    add to ``total_matched_files`` only when they are a list, and configs with
    a truthy ``zip_enabled`` are counted as zip-enabled.
    """
    configs = [
        config
        for job in jobs
        for config in (job.resolved_attachments or [])
        if isinstance(config, dict)
    ]
    by_status = Counter(config.get("status") or "unknown" for config in configs)
    by_behavior = Counter(config["behavior"] for config in configs if config.get("behavior"))
    match_lists = (config.get("matches") or [] for config in configs)
    matched_files = sum(len(found) for found in match_lists if isinstance(found, list))
    zipped = sum(1 for config in configs if config.get("zip_enabled"))
    return {
        "total_attachment_configs": len(configs),
        "total_matched_files": matched_files,
        "zip_enabled_configs": zipped,
        "missing_configs": by_status.get("missing", 0),
        "ambiguous_configs": by_status.get("ambiguous", 0),
        "by_status": dict(by_status),
        "by_behavior": dict(by_behavior),
    }
def _recent_failures(jobs: list[CampaignJob], *, limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` problem jobs, most recently touched first.

    A job counts as a failure when it has a ``last_error``, its ``send_status``
    starts with ``"failed"``, or its ``imap_status`` is ``"failed"``. Jobs are
    ordered by ``updated_at`` (falling back to ``created_at``) descending.
    Jobs with neither timestamp are placed last rather than making the sort
    raise ``TypeError`` (``None`` cannot be ordered against a ``datetime``).

    Args:
        jobs: Campaign jobs to scan.
        limit: Maximum number of rows to return.

    Returns:
        One dict per failing job, with timestamps rendered via ``isoformat()``.
    """

    def _newest_first_key(job: CampaignJob) -> tuple[bool, Any]:
        # (has_timestamp, timestamp): tuple comparison only reaches the second
        # element when the flags are equal, so None is never compared with a datetime.
        stamp = job.updated_at or job.created_at
        return (stamp is not None, stamp)

    failed = [
        job
        for job in jobs
        if job.last_error or str(job.send_status).startswith("failed") or job.imap_status == "failed"
    ]
    failed.sort(key=_newest_first_key, reverse=True)
    return [
        {
            "job_id": job.id,
            "entry_index": job.entry_index,
            "entry_id": job.entry_id,
            "recipient_email": job.recipient_email,
            "validation_status": job.validation_status,
            "send_status": job.send_status,
            "imap_status": job.imap_status,
            "attempt_count": job.attempt_count,
            "last_error": job.last_error,
            "updated_at": job.updated_at.isoformat() if job.updated_at else None,
        }
        for job in failed[:limit]
    ]
def _job_row(job: CampaignJob) -> dict[str, Any]:
    """Flatten one job into a row for report tables and the CSV export.

    The derived counters count only dict entries of ``issues_snapshot`` and
    ``resolved_attachments``, and an attachment's ``matches`` contributes only
    when it is a list. This matches how ``_issue_summary_from_jobs`` and
    ``_attachment_summary`` count. Malformed entries are therefore ignored:
    they are not miscounted (``len`` of a string) and do not crash the row
    (``len`` of a non-sized value such as an int).

    Args:
        job: The campaign job to flatten.

    Returns:
        A dict with a fixed key order; datetimes are rendered via ``isoformat()``.
    """
    issues = [issue for issue in (job.issues_snapshot or []) if isinstance(issue, dict)]
    attachments = [item for item in (job.resolved_attachments or []) if isinstance(item, dict)]
    matched_files = 0
    for item in attachments:
        matches = item.get("matches") or []
        if isinstance(matches, list):
            matched_files += len(matches)
    return {
        "job_id": job.id,
        "entry_index": job.entry_index,
        "entry_id": job.entry_id,
        "recipient_email": job.recipient_email,
        "subject": job.subject,
        "build_status": job.build_status,
        "validation_status": job.validation_status,
        "queue_status": job.queue_status,
        "send_status": job.send_status,
        "imap_status": job.imap_status,
        "attempt_count": job.attempt_count,
        "queued_at": job.queued_at.isoformat() if job.queued_at else None,
        "sent_at": job.sent_at.isoformat() if job.sent_at else None,
        "last_error": job.last_error,
        "eml_size_bytes": job.eml_size_bytes,
        "issues_count": len(issues),
        "attachment_config_count": len(attachments),
        "matched_file_count": matched_files,
    }
def generate_campaign_report(
    session: Session,
    *,
    tenant_id: str,
    campaign_id: str,
    include_jobs: bool = False,
    include_recent_failures: bool = True,
) -> dict[str, Any]:
    """Assemble the dashboard/report payload for a single campaign.

    The payload is shaped for a web UI:
    - headline counters for cards and per-stage status histograms;
    - issue and attachment summaries for review panels;
    - attempt totals and delivery settings;
    - optionally, recent failures and one row per job for tables/export.

    Args:
        session: SQLAlchemy session used for every lookup.
        tenant_id: Tenant that must own the campaign. It also scopes the job
            and campaign-issue queries.
        campaign_id: Campaign to report on.
        include_jobs: When true, add a ``"jobs"`` list with one row per job.
        include_recent_failures: When true (the default), add a
            ``"recent_failures"`` list.

    Returns:
        The report dict.

    Raises:
        CampaignReportError: If the campaign does not exist for ``tenant_id``.
    """

    def iso(value: Any) -> str | None:
        # Render an optional datetime as ISO-8601, keeping falsy values as None.
        return value.isoformat() if value else None

    campaign = _get_campaign(session, tenant_id=tenant_id, campaign_id=campaign_id)
    version = _current_version(session, campaign)

    job_query = session.query(CampaignJob).filter(
        CampaignJob.tenant_id == tenant_id,
        CampaignJob.campaign_id == campaign.id,
    )
    jobs = job_query.order_by(CampaignJob.entry_index.asc()).all()

    # Attempts are counted through the campaign's job ids; with no jobs the
    # IN (...) queries are skipped entirely and both totals stay 0.
    job_ids = [job.id for job in jobs]
    send_attempts = 0
    imap_attempts = 0
    if job_ids:
        send_attempts = session.query(SendAttempt).filter(SendAttempt.job_id.in_(job_ids)).count()
        imap_attempts = (
            session.query(ImapAppendAttempt).filter(ImapAppendAttempt.job_id.in_(job_ids)).count()
        )
    persisted_issues = (
        session.query(CampaignIssue)
        .filter(CampaignIssue.tenant_id == tenant_id, CampaignIssue.campaign_id == campaign.id)
        .count()
    )

    # Per-stage histograms; missing/empty statuses land in "unknown".
    build_counts = _counter([job.build_status for job in jobs])
    validation_counts = _counter([job.validation_status for job in jobs])
    queue_counts = _counter([job.queue_status for job in jobs])
    send_counts = _counter([job.send_status for job in jobs])
    imap_counts = _counter([job.imap_status for job in jobs])

    queueable = len(
        [job for job in jobs if job.validation_status in {"ready", "warning"} and job.build_status == "built"]
    )
    needs_attention = len(
        [
            job
            for job in jobs
            if job.validation_status in {"needs_review", "blocked"}
            or job.send_status in {"failed_temporary", "failed_permanent"}
            or job.imap_status == "failed"
        ]
    )
    sent = send_counts.get("sent", 0)
    failed = sum(send_counts.get(status, 0) for status in ("failed_temporary", "failed_permanent"))

    generated_at = _utcnow_iso()
    campaign_summary = {
        "id": campaign.id,
        "external_id": campaign.external_id,
        "name": campaign.name,
        "description": campaign.description,
        "status": campaign.status,
        "created_at": iso(campaign.created_at),
        "updated_at": iso(campaign.updated_at),
    }
    cards = {
        "jobs_total": len(jobs),
        "queueable": queueable,
        "needs_attention": needs_attention,
        "sent": sent,
        "failed": failed,
        "imap_appended": imap_counts.get("appended", 0),
        "imap_failed": imap_counts.get("failed", 0),
    }
    report: dict[str, Any] = {
        "generated_at": generated_at,
        "campaign": campaign_summary,
        "current_version": _version_info(version),
        "cards": cards,
        "status_counts": {
            "build": build_counts,
            "validation": validation_counts,
            "queue": queue_counts,
            "send": send_counts,
            "imap": imap_counts,
        },
        "issues": {
            **_issue_summary_from_jobs(jobs),
            "persisted_campaign_issue_count": persisted_issues,
        },
        "attachments": _attachment_summary(jobs),
        "attempts": {
            "send_attempts": int(send_attempts),
            "imap_append_attempts": int(imap_attempts),
        },
        "delivery": _load_delivery_info(version, jobs),
    }
    if include_recent_failures:
        report["recent_failures"] = _recent_failures(jobs)
    if include_jobs:
        report["jobs"] = [_job_row(job) for job in jobs]
    return report
def generate_jobs_csv(session: Session, *, tenant_id: str, campaign_id: str) -> str:
    """Export one CSV row per campaign job, ordered by ``entry_index``.

    The header is always written, even when the campaign has no jobs. Row
    values come from ``_job_row``.

    NOTE(review): values such as subject, recipient and last_error are written
    verbatim. If this export is opened in spreadsheet software, cells starting
    with ``=``, ``+``, ``-`` or ``@`` may be interpreted as formulas; consider
    escaping them if the data can come from untrusted sources.

    Raises:
        CampaignReportError: If the campaign does not exist for ``tenant_id``.
    """
    campaign = _get_campaign(session, tenant_id=tenant_id, campaign_id=campaign_id)
    job_query = session.query(CampaignJob).filter(
        CampaignJob.tenant_id == tenant_id,
        CampaignJob.campaign_id == campaign.id,
    )
    ordered_jobs = job_query.order_by(CampaignJob.entry_index.asc()).all()

    columns = [
        "job_id",
        "entry_index",
        "entry_id",
        "recipient_email",
        "subject",
        "build_status",
        "validation_status",
        "queue_status",
        "send_status",
        "imap_status",
        "attempt_count",
        "queued_at",
        "sent_at",
        "last_error",
        "eml_size_bytes",
        "issues_count",
        "attachment_config_count",
        "matched_file_count",
    ]
    out = io.StringIO()
    csv_writer = csv.DictWriter(out, fieldnames=columns)
    csv_writer.writeheader()
    csv_writer.writerows(_job_row(job) for job in ordered_jobs)
    return out.getvalue()