352 lines
13 KiB
Python
352 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import math
|
|
from collections import Counter
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.db.models import (
|
|
Campaign,
|
|
CampaignIssue,
|
|
CampaignJob,
|
|
CampaignVersion,
|
|
ImapAppendAttempt,
|
|
SendAttempt,
|
|
)
|
|
from app.mailer.campaign.loader import load_campaign_config
|
|
from app.mailer.persistence.campaigns import _write_campaign_snapshot
|
|
|
|
|
|
class CampaignReportError(RuntimeError):
|
|
pass
|
|
|
|
|
|
def _utcnow_iso() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _counter(values: list[str | None]) -> dict[str, int]:
|
|
return dict(Counter(value or "unknown" for value in values))
|
|
|
|
|
|
def _get_campaign(session: Session, *, tenant_id: str, campaign_id: str) -> Campaign:
|
|
campaign = session.query(Campaign).filter(Campaign.tenant_id == tenant_id, Campaign.id == campaign_id).one_or_none()
|
|
if not campaign:
|
|
raise CampaignReportError(f"Campaign not found or not accessible: {campaign_id}")
|
|
return campaign
|
|
|
|
|
|
def _current_version(session: Session, campaign: Campaign) -> CampaignVersion | None:
|
|
if not campaign.current_version_id:
|
|
return None
|
|
version = session.get(CampaignVersion, campaign.current_version_id)
|
|
if version and version.campaign_id == campaign.id:
|
|
return version
|
|
return None
|
|
|
|
|
|
def _version_info(version: CampaignVersion | None) -> dict[str, Any] | None:
|
|
if not version:
|
|
return None
|
|
return {
|
|
"id": version.id,
|
|
"version_number": version.version_number,
|
|
"schema_version": version.schema_version,
|
|
"source_filename": version.source_filename,
|
|
"created_at": version.created_at.isoformat() if version.created_at else None,
|
|
"validation_summary": version.validation_summary,
|
|
"build_summary": version.build_summary,
|
|
}
|
|
|
|
|
|
def _load_delivery_info(version: CampaignVersion | None, jobs: list[CampaignJob]) -> dict[str, Any]:
|
|
"""Extract rate-limit and IMAP settings from the version JSON where possible.
|
|
|
|
This stays best-effort so reports still work if the schema evolves or a
|
|
partial/invalid campaign snapshot exists.
|
|
"""
|
|
|
|
default = {
|
|
"rate_limit": {"messages_per_minute": None, "concurrency": None},
|
|
"imap_append_sent": {"enabled": None, "folder": None},
|
|
"retry": {"max_attempts": None, "backoff_seconds": []},
|
|
"estimated_remaining_send_seconds": None,
|
|
"estimated_remaining_send_human": None,
|
|
}
|
|
if not version:
|
|
return default
|
|
try:
|
|
snapshot_path = _write_campaign_snapshot(version)
|
|
config = load_campaign_config(snapshot_path)
|
|
except Exception as exc: # pragma: no cover - reporting should not fail hard here
|
|
default["load_error"] = str(exc)
|
|
return default
|
|
|
|
messages_per_minute = config.delivery.rate_limit.messages_per_minute
|
|
pending = [job for job in jobs if job.send_status in {"queued", "failed_temporary", "sending"}]
|
|
estimated_seconds = None
|
|
if messages_per_minute and pending:
|
|
estimated_seconds = int(math.ceil((len(pending) / messages_per_minute) * 60))
|
|
|
|
return {
|
|
"rate_limit": {
|
|
"messages_per_minute": messages_per_minute,
|
|
"concurrency": config.delivery.rate_limit.concurrency,
|
|
},
|
|
"imap_append_sent": {
|
|
"enabled": config.delivery.imap_append_sent.enabled,
|
|
"folder": config.delivery.imap_append_sent.folder,
|
|
},
|
|
"retry": {
|
|
"max_attempts": config.delivery.retry.max_attempts,
|
|
"backoff_seconds": config.delivery.retry.backoff_seconds,
|
|
},
|
|
"estimated_remaining_send_seconds": estimated_seconds,
|
|
"estimated_remaining_send_human": _human_duration(estimated_seconds),
|
|
}
|
|
|
|
|
|
def _human_duration(seconds: int | None) -> str | None:
|
|
if seconds is None:
|
|
return None
|
|
if seconds < 60:
|
|
return f"{seconds}s"
|
|
minutes, sec = divmod(seconds, 60)
|
|
if minutes < 60:
|
|
return f"{minutes}m {sec}s" if sec else f"{minutes}m"
|
|
hours, minute = divmod(minutes, 60)
|
|
return f"{hours}h {minute}m" if minute else f"{hours}h"
|
|
|
|
|
|
def _issue_summary_from_jobs(jobs: list[CampaignJob]) -> dict[str, Any]:
|
|
severity_counter: Counter[str] = Counter()
|
|
code_counter: Counter[str] = Counter()
|
|
behavior_counter: Counter[str] = Counter()
|
|
total = 0
|
|
for job in jobs:
|
|
for issue in job.issues_snapshot or []:
|
|
if not isinstance(issue, dict):
|
|
continue
|
|
total += 1
|
|
severity_counter[issue.get("severity") or "unknown"] += 1
|
|
code_counter[issue.get("code") or "unknown"] += 1
|
|
if issue.get("behavior"):
|
|
behavior_counter[issue["behavior"]] += 1
|
|
return {
|
|
"total": total,
|
|
"by_severity": dict(severity_counter),
|
|
"by_code": dict(code_counter),
|
|
"by_behavior": dict(behavior_counter),
|
|
}
|
|
|
|
|
|
def _attachment_summary(jobs: list[CampaignJob]) -> dict[str, Any]:
|
|
status_counter: Counter[str] = Counter()
|
|
behavior_counter: Counter[str] = Counter()
|
|
total_configs = 0
|
|
total_matched_files = 0
|
|
zip_enabled = 0
|
|
missing = 0
|
|
ambiguous = 0
|
|
for job in jobs:
|
|
for attachment in job.resolved_attachments or []:
|
|
if not isinstance(attachment, dict):
|
|
continue
|
|
total_configs += 1
|
|
status = attachment.get("status") or "unknown"
|
|
status_counter[status] += 1
|
|
if attachment.get("behavior"):
|
|
behavior_counter[attachment["behavior"]] += 1
|
|
matches = attachment.get("matches") or []
|
|
if isinstance(matches, list):
|
|
total_matched_files += len(matches)
|
|
if attachment.get("zip_enabled"):
|
|
zip_enabled += 1
|
|
if status == "missing":
|
|
missing += 1
|
|
if status == "ambiguous":
|
|
ambiguous += 1
|
|
return {
|
|
"total_attachment_configs": total_configs,
|
|
"total_matched_files": total_matched_files,
|
|
"zip_enabled_configs": zip_enabled,
|
|
"missing_configs": missing,
|
|
"ambiguous_configs": ambiguous,
|
|
"by_status": dict(status_counter),
|
|
"by_behavior": dict(behavior_counter),
|
|
}
|
|
|
|
|
|
def _recent_failures(jobs: list[CampaignJob], *, limit: int = 20) -> list[dict[str, Any]]:
|
|
failed = [job for job in jobs if job.last_error or str(job.send_status).startswith("failed") or job.imap_status == "failed"]
|
|
failed.sort(key=lambda job: job.updated_at or job.created_at, reverse=True)
|
|
return [
|
|
{
|
|
"job_id": job.id,
|
|
"entry_index": job.entry_index,
|
|
"entry_id": job.entry_id,
|
|
"recipient_email": job.recipient_email,
|
|
"validation_status": job.validation_status,
|
|
"send_status": job.send_status,
|
|
"imap_status": job.imap_status,
|
|
"attempt_count": job.attempt_count,
|
|
"last_error": job.last_error,
|
|
"updated_at": job.updated_at.isoformat() if job.updated_at else None,
|
|
}
|
|
for job in failed[:limit]
|
|
]
|
|
|
|
|
|
def _job_row(job: CampaignJob) -> dict[str, Any]:
|
|
return {
|
|
"job_id": job.id,
|
|
"entry_index": job.entry_index,
|
|
"entry_id": job.entry_id,
|
|
"recipient_email": job.recipient_email,
|
|
"subject": job.subject,
|
|
"build_status": job.build_status,
|
|
"validation_status": job.validation_status,
|
|
"queue_status": job.queue_status,
|
|
"send_status": job.send_status,
|
|
"imap_status": job.imap_status,
|
|
"attempt_count": job.attempt_count,
|
|
"queued_at": job.queued_at.isoformat() if job.queued_at else None,
|
|
"sent_at": job.sent_at.isoformat() if job.sent_at else None,
|
|
"last_error": job.last_error,
|
|
"eml_size_bytes": job.eml_size_bytes,
|
|
"issues_count": len(job.issues_snapshot or []),
|
|
"attachment_config_count": len(job.resolved_attachments or []),
|
|
"matched_file_count": sum(len(item.get("matches") or []) for item in (job.resolved_attachments or []) if isinstance(item, dict)),
|
|
}
|
|
|
|
|
|
def generate_campaign_report(
|
|
session: Session,
|
|
*,
|
|
tenant_id: str,
|
|
campaign_id: str,
|
|
include_jobs: bool = False,
|
|
include_recent_failures: bool = True,
|
|
) -> dict[str, Any]:
|
|
"""Generate a dashboard/report payload for one campaign.
|
|
|
|
The shape is intentionally web-UI friendly: status counters for cards,
|
|
issue/attachment summaries for review panels, and optional job rows for
|
|
tables/export.
|
|
"""
|
|
|
|
campaign = _get_campaign(session, tenant_id=tenant_id, campaign_id=campaign_id)
|
|
version = _current_version(session, campaign)
|
|
jobs = (
|
|
session.query(CampaignJob)
|
|
.filter(CampaignJob.tenant_id == tenant_id, CampaignJob.campaign_id == campaign.id)
|
|
.order_by(CampaignJob.entry_index.asc())
|
|
.all()
|
|
)
|
|
job_ids = [job.id for job in jobs]
|
|
send_attempts = session.query(SendAttempt).filter(SendAttempt.job_id.in_(job_ids)).count() if job_ids else 0
|
|
imap_attempts = session.query(ImapAppendAttempt).filter(ImapAppendAttempt.job_id.in_(job_ids)).count() if job_ids else 0
|
|
persisted_issues = session.query(CampaignIssue).filter(CampaignIssue.tenant_id == tenant_id, CampaignIssue.campaign_id == campaign.id).count()
|
|
|
|
validation_counts = _counter([job.validation_status for job in jobs])
|
|
queue_counts = _counter([job.queue_status for job in jobs])
|
|
send_counts = _counter([job.send_status for job in jobs])
|
|
imap_counts = _counter([job.imap_status for job in jobs])
|
|
build_counts = _counter([job.build_status for job in jobs])
|
|
|
|
queueable = sum(1 for job in jobs if job.validation_status in {"ready", "warning"} and job.build_status == "built")
|
|
needs_attention = sum(
|
|
1
|
|
for job in jobs
|
|
if job.validation_status in {"needs_review", "blocked"}
|
|
or job.send_status in {"failed_temporary", "failed_permanent"}
|
|
or job.imap_status == "failed"
|
|
)
|
|
sent = send_counts.get("sent", 0)
|
|
failed = send_counts.get("failed_temporary", 0) + send_counts.get("failed_permanent", 0)
|
|
|
|
report: dict[str, Any] = {
|
|
"generated_at": _utcnow_iso(),
|
|
"campaign": {
|
|
"id": campaign.id,
|
|
"external_id": campaign.external_id,
|
|
"name": campaign.name,
|
|
"description": campaign.description,
|
|
"status": campaign.status,
|
|
"created_at": campaign.created_at.isoformat() if campaign.created_at else None,
|
|
"updated_at": campaign.updated_at.isoformat() if campaign.updated_at else None,
|
|
},
|
|
"current_version": _version_info(version),
|
|
"cards": {
|
|
"jobs_total": len(jobs),
|
|
"queueable": queueable,
|
|
"needs_attention": needs_attention,
|
|
"sent": sent,
|
|
"failed": failed,
|
|
"imap_appended": imap_counts.get("appended", 0),
|
|
"imap_failed": imap_counts.get("failed", 0),
|
|
},
|
|
"status_counts": {
|
|
"build": build_counts,
|
|
"validation": validation_counts,
|
|
"queue": queue_counts,
|
|
"send": send_counts,
|
|
"imap": imap_counts,
|
|
},
|
|
"issues": {
|
|
**_issue_summary_from_jobs(jobs),
|
|
"persisted_campaign_issue_count": persisted_issues,
|
|
},
|
|
"attachments": _attachment_summary(jobs),
|
|
"attempts": {
|
|
"send_attempts": int(send_attempts),
|
|
"imap_append_attempts": int(imap_attempts),
|
|
},
|
|
"delivery": _load_delivery_info(version, jobs),
|
|
}
|
|
if include_recent_failures:
|
|
report["recent_failures"] = _recent_failures(jobs)
|
|
if include_jobs:
|
|
report["jobs"] = [_job_row(job) for job in jobs]
|
|
return report
|
|
|
|
|
|
def generate_jobs_csv(session: Session, *, tenant_id: str, campaign_id: str) -> str:
|
|
campaign = _get_campaign(session, tenant_id=tenant_id, campaign_id=campaign_id)
|
|
jobs = (
|
|
session.query(CampaignJob)
|
|
.filter(CampaignJob.tenant_id == tenant_id, CampaignJob.campaign_id == campaign.id)
|
|
.order_by(CampaignJob.entry_index.asc())
|
|
.all()
|
|
)
|
|
rows = [_job_row(job) for job in jobs]
|
|
fieldnames = [
|
|
"job_id",
|
|
"entry_index",
|
|
"entry_id",
|
|
"recipient_email",
|
|
"subject",
|
|
"build_status",
|
|
"validation_status",
|
|
"queue_status",
|
|
"send_status",
|
|
"imap_status",
|
|
"attempt_count",
|
|
"queued_at",
|
|
"sent_at",
|
|
"last_error",
|
|
"eml_size_bytes",
|
|
"issues_count",
|
|
"attachment_config_count",
|
|
"matched_file_count",
|
|
]
|
|
buffer = io.StringIO()
|
|
writer = csv.DictWriter(buffer, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
writer.writerows(rows)
|
|
return buffer.getvalue()
|