"""Campaign reporting helpers: dashboard payload and per-job CSV export."""

from __future__ import annotations

import csv
import io
import math
from collections import Counter
from datetime import datetime, timezone
from typing import Any

from sqlalchemy.orm import Session

from app.db.models import (
    Campaign,
    CampaignIssue,
    CampaignJob,
    CampaignVersion,
    ImapAppendAttempt,
    SendAttempt,
)
from app.mailer.campaign.loader import load_campaign_config
from app.mailer.persistence.campaigns import _write_campaign_snapshot

# Column order of the CSV export; must list every key produced by ``_job_row``
# because ``csv.DictWriter`` rejects rows containing unknown keys.
_JOB_CSV_FIELDS = [
    "job_id",
    "entry_index",
    "entry_id",
    "recipient_email",
    "subject",
    "build_status",
    "validation_status",
    "queue_status",
    "send_status",
    "imap_status",
    "attempt_count",
    "queued_at",
    "sent_at",
    "last_error",
    "eml_size_bytes",
    "issues_count",
    "attachment_config_count",
    "matched_file_count",
]


class CampaignReportError(RuntimeError):
    """Raised when a report cannot be produced (e.g. the campaign is not found)."""


def _utcnow_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _counter(values: list[str | None]) -> dict[str, int]:
    """Count occurrences of each value; falsy values are grouped as ``"unknown"``."""
    return dict(Counter(value or "unknown" for value in values))


def _get_campaign(session: Session, *, tenant_id: str, campaign_id: str) -> Campaign:
    """Fetch the campaign with ``campaign_id`` belonging to ``tenant_id``.

    Raises:
        CampaignReportError: if no campaign matches both the tenant and the id.
    """
    campaign = (
        session.query(Campaign)
        .filter(Campaign.tenant_id == tenant_id, Campaign.id == campaign_id)
        .one_or_none()
    )
    if campaign is None:
        raise CampaignReportError(f"Campaign not found or not accessible: {campaign_id}")
    return campaign


def _query_jobs(session: Session, *, tenant_id: str, campaign_id: str) -> list[CampaignJob]:
    """Return all jobs of one campaign for one tenant, ordered by ``entry_index``."""
    return (
        session.query(CampaignJob)
        .filter(CampaignJob.tenant_id == tenant_id, CampaignJob.campaign_id == campaign_id)
        .order_by(CampaignJob.entry_index.asc())
        .all()
    )


def _current_version(session: Session, campaign: Campaign) -> CampaignVersion | None:
    """Return the campaign's current version, or ``None``.

    ``None`` is returned when the campaign has no ``current_version_id``, when
    the referenced version does not exist, or when it belongs to a different
    campaign.
    """
    if not campaign.current_version_id:
        return None
    version = session.get(CampaignVersion, campaign.current_version_id)
    if version is not None and version.campaign_id == campaign.id:
        return version
    return None


def _version_info(version: CampaignVersion | None) -> dict[str, Any] | None:
    """Serialise the reportable fields of ``version``; ``None`` passes through."""
    if version is None:
        return None
    return {
        "id": version.id,
        "version_number": version.version_number,
        "schema_version": version.schema_version,
        "source_filename": version.source_filename,
        "created_at": version.created_at.isoformat() if version.created_at else None,
        "validation_summary": version.validation_summary,
        "build_summary": version.build_summary,
    }


def _load_delivery_info(version: CampaignVersion | None, jobs: list[CampaignJob]) -> dict[str, Any]:
    """Extract rate-limit and IMAP settings from the version JSON where possible.

    This stays best-effort so reports still work if the schema evolves or a
    partial/invalid campaign snapshot exists: any failure while loading the
    configuration *or* while reading its delivery settings yields the default
    payload (all values ``None``/empty) plus a ``"load_error"`` message.

    The remaining-send estimate is derived from the number of jobs whose
    ``send_status`` is ``queued``, ``failed_temporary`` or ``sending`` and the
    configured ``messages_per_minute``; it is ``None`` when there is nothing
    pending or no positive rate is configured.
    """
    default: dict[str, Any] = {
        "rate_limit": {"messages_per_minute": None, "concurrency": None},
        "imap_append_sent": {"enabled": None, "folder": None},
        "retry": {"max_attempts": None, "backoff_seconds": []},
        "estimated_remaining_send_seconds": None,
        "estimated_remaining_send_human": None,
    }
    if version is None:
        return default

    try:
        snapshot_path = _write_campaign_snapshot(version)
        config = load_campaign_config(snapshot_path)
        # Read every setting inside the guarded section: a renamed or missing
        # attribute must degrade to the default payload, not crash the report.
        delivery = config.delivery
        messages_per_minute = delivery.rate_limit.messages_per_minute
        rate_limit = {
            "messages_per_minute": messages_per_minute,
            "concurrency": delivery.rate_limit.concurrency,
        }
        imap_append_sent = {
            "enabled": delivery.imap_append_sent.enabled,
            "folder": delivery.imap_append_sent.folder,
        }
        retry = {
            "max_attempts": delivery.retry.max_attempts,
            "backoff_seconds": delivery.retry.backoff_seconds,
        }
    except Exception as exc:  # pragma: no cover - reporting should not fail hard here
        default["load_error"] = str(exc)
        return default

    pending = [job for job in jobs if job.send_status in {"queued", "failed_temporary", "sending"}]
    estimated_seconds = None
    # Require a positive rate so a misconfigured value cannot produce a
    # negative (or division-by-zero) estimate.
    if messages_per_minute and messages_per_minute > 0 and pending:
        estimated_seconds = int(math.ceil((len(pending) / messages_per_minute) * 60))

    return {
        "rate_limit": rate_limit,
        "imap_append_sent": imap_append_sent,
        "retry": retry,
        "estimated_remaining_send_seconds": estimated_seconds,
        "estimated_remaining_send_human": _human_duration(estimated_seconds),
    }


def _human_duration(seconds: int | None) -> str | None:
    """Format a duration in seconds as ``"42s"``, ``"3m 5s"`` or ``"2h 10m"``.

    Zero-valued trailing units are omitted (``"3m"``, ``"2h"``); seconds are
    dropped entirely once the duration reaches one hour. ``None`` passes through.
    """
    if seconds is None:
        return None
    if seconds < 60:
        return f"{seconds}s"
    minutes, sec = divmod(seconds, 60)
    if minutes < 60:
        return f"{minutes}m {sec}s" if sec else f"{minutes}m"
    hours, minute = divmod(minutes, 60)
    return f"{hours}h {minute}m" if minute else f"{hours}h"


def _issue_summary_from_jobs(jobs: list[CampaignJob]) -> dict[str, Any]:
    """Aggregate the issues stored in each job's ``issues_snapshot``.

    Only dict entries are counted. Missing ``severity``/``code`` values are
    grouped under ``"unknown"``; issues without a ``behavior`` are left out of
    the behavior breakdown.
    """
    severity_counter: Counter[str] = Counter()
    code_counter: Counter[str] = Counter()
    behavior_counter: Counter[str] = Counter()
    total = 0
    for job in jobs:
        for issue in job.issues_snapshot or []:
            if not isinstance(issue, dict):
                continue
            total += 1
            severity_counter[issue.get("severity") or "unknown"] += 1
            code_counter[issue.get("code") or "unknown"] += 1
            if issue.get("behavior"):
                behavior_counter[issue["behavior"]] += 1
    return {
        "total": total,
        "by_severity": dict(severity_counter),
        "by_code": dict(code_counter),
        "by_behavior": dict(behavior_counter),
    }


def _attachment_summary(jobs: list[CampaignJob]) -> dict[str, Any]:
    """Aggregate the entries of each job's ``resolved_attachments``.

    Only dict entries are counted. Matched files are counted only when
    ``matches`` is a list; a missing ``status`` is grouped under ``"unknown"``.
    """
    status_counter: Counter[str] = Counter()
    behavior_counter: Counter[str] = Counter()
    total_configs = 0
    total_matched_files = 0
    zip_enabled = 0
    for job in jobs:
        for attachment in job.resolved_attachments or []:
            if not isinstance(attachment, dict):
                continue
            total_configs += 1
            status_counter[attachment.get("status") or "unknown"] += 1
            if attachment.get("behavior"):
                behavior_counter[attachment["behavior"]] += 1
            matches = attachment.get("matches") or []
            if isinstance(matches, list):
                total_matched_files += len(matches)
            if attachment.get("zip_enabled"):
                zip_enabled += 1
    return {
        "total_attachment_configs": total_configs,
        "total_matched_files": total_matched_files,
        "zip_enabled_configs": zip_enabled,
        # The dedicated counters are simply two entries of the status breakdown.
        "missing_configs": status_counter.get("missing", 0),
        "ambiguous_configs": status_counter.get("ambiguous", 0),
        "by_status": dict(status_counter),
        "by_behavior": dict(behavior_counter),
    }


def _failure_sort_key(job: CampaignJob) -> tuple[bool, datetime]:
    """Sort key ordering jobs by last activity, tolerant of missing timestamps.

    The leading flag keeps jobs without any timestamp in their own group, so a
    real timestamp is never compared with ``None`` (which raises ``TypeError``)
    nor with the placeholder value.
    """
    stamp = job.updated_at or job.created_at
    if stamp is None:
        return (False, datetime.min)
    return (True, stamp)


def _recent_failures(jobs: list[CampaignJob], *, limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` failed jobs, most recently updated first.

    A job counts as failed when it has a ``last_error``, when its
    ``send_status`` starts with ``"failed"``, or when its ``imap_status`` is
    ``"failed"``. Jobs with neither ``updated_at`` nor ``created_at`` sort
    last. A negative ``limit`` is treated as 0.
    """
    failed = [
        job
        for job in jobs
        if job.last_error or str(job.send_status).startswith("failed") or job.imap_status == "failed"
    ]
    failed.sort(key=_failure_sort_key, reverse=True)
    return [
        {
            "job_id": job.id,
            "entry_index": job.entry_index,
            "entry_id": job.entry_id,
            "recipient_email": job.recipient_email,
            "validation_status": job.validation_status,
            "send_status": job.send_status,
            "imap_status": job.imap_status,
            "attempt_count": job.attempt_count,
            "last_error": job.last_error,
            "updated_at": job.updated_at.isoformat() if job.updated_at else None,
        }
        for job in failed[: max(limit, 0)]
    ]


def _matched_file_count(attachments: list[Any] | None) -> int:
    """Count matched files across attachment entries.

    Mirrors ``_attachment_summary``: only dict entries whose ``matches`` value
    is a list contribute, so malformed snapshots cannot raise or inflate the
    count (e.g. by counting the characters of a string).
    """
    total = 0
    for item in attachments or []:
        if not isinstance(item, dict):
            continue
        matches = item.get("matches") or []
        if isinstance(matches, list):
            total += len(matches)
    return total


def _job_row(job: CampaignJob) -> dict[str, Any]:
    """Flatten one job into a row for tables and CSV export.

    Timestamps are rendered as ISO-8601 strings (``None`` when unset).
    """
    return {
        "job_id": job.id,
        "entry_index": job.entry_index,
        "entry_id": job.entry_id,
        "recipient_email": job.recipient_email,
        "subject": job.subject,
        "build_status": job.build_status,
        "validation_status": job.validation_status,
        "queue_status": job.queue_status,
        "send_status": job.send_status,
        "imap_status": job.imap_status,
        "attempt_count": job.attempt_count,
        "queued_at": job.queued_at.isoformat() if job.queued_at else None,
        "sent_at": job.sent_at.isoformat() if job.sent_at else None,
        "last_error": job.last_error,
        "eml_size_bytes": job.eml_size_bytes,
        "issues_count": len(job.issues_snapshot or []),
        "attachment_config_count": len(job.resolved_attachments or []),
        "matched_file_count": _matched_file_count(job.resolved_attachments),
    }


def generate_campaign_report(
    session: Session,
    *,
    tenant_id: str,
    campaign_id: str,
    include_jobs: bool = False,
    include_recent_failures: bool = True,
) -> dict[str, Any]:
    """Generate a dashboard/report payload for one campaign.

    The shape is intentionally web-UI friendly: status counters for cards,
    issue/attachment summaries for review panels, and optional job rows for
    tables/export.

    Args:
        session: Database session used for all queries.
        tenant_id: Tenant that must own the campaign and its jobs.
        campaign_id: Identifier of the campaign to report on.
        include_jobs: When true, add one ``_job_row`` per job under ``"jobs"``.
        include_recent_failures: When true, add ``"recent_failures"``.

    Raises:
        CampaignReportError: if the campaign does not exist for this tenant.
    """
    campaign = _get_campaign(session, tenant_id=tenant_id, campaign_id=campaign_id)
    version = _current_version(session, campaign)
    jobs = _query_jobs(session, tenant_id=tenant_id, campaign_id=campaign.id)

    job_ids = [job.id for job in jobs]
    # NOTE(review): the IN list grows with the number of jobs; very large
    # campaigns may run into backend bind-parameter limits - confirm.
    send_attempts = (
        session.query(SendAttempt).filter(SendAttempt.job_id.in_(job_ids)).count() if job_ids else 0
    )
    imap_attempts = (
        session.query(ImapAppendAttempt).filter(ImapAppendAttempt.job_id.in_(job_ids)).count()
        if job_ids
        else 0
    )
    persisted_issues = (
        session.query(CampaignIssue)
        .filter(CampaignIssue.tenant_id == tenant_id, CampaignIssue.campaign_id == campaign.id)
        .count()
    )

    validation_counts = _counter([job.validation_status for job in jobs])
    queue_counts = _counter([job.queue_status for job in jobs])
    send_counts = _counter([job.send_status for job in jobs])
    imap_counts = _counter([job.imap_status for job in jobs])
    build_counts = _counter([job.build_status for job in jobs])

    queueable = sum(
        1 for job in jobs if job.validation_status in {"ready", "warning"} and job.build_status == "built"
    )
    needs_attention = sum(
        1
        for job in jobs
        if job.validation_status in {"needs_review", "blocked"}
        or job.send_status in {"failed_temporary", "failed_permanent"}
        or job.imap_status == "failed"
    )
    sent = send_counts.get("sent", 0)
    failed = send_counts.get("failed_temporary", 0) + send_counts.get("failed_permanent", 0)

    report: dict[str, Any] = {
        "generated_at": _utcnow_iso(),
        "campaign": {
            "id": campaign.id,
            "external_id": campaign.external_id,
            "name": campaign.name,
            "description": campaign.description,
            "status": campaign.status,
            "created_at": campaign.created_at.isoformat() if campaign.created_at else None,
            "updated_at": campaign.updated_at.isoformat() if campaign.updated_at else None,
        },
        "current_version": _version_info(version),
        "cards": {
            "jobs_total": len(jobs),
            "queueable": queueable,
            "needs_attention": needs_attention,
            "sent": sent,
            "failed": failed,
            "imap_appended": imap_counts.get("appended", 0),
            "imap_failed": imap_counts.get("failed", 0),
        },
        "status_counts": {
            "build": build_counts,
            "validation": validation_counts,
            "queue": queue_counts,
            "send": send_counts,
            "imap": imap_counts,
        },
        "issues": {
            **_issue_summary_from_jobs(jobs),
            "persisted_campaign_issue_count": persisted_issues,
        },
        "attachments": _attachment_summary(jobs),
        "attempts": {
            "send_attempts": int(send_attempts),
            "imap_append_attempts": int(imap_attempts),
        },
        "delivery": _load_delivery_info(version, jobs),
    }
    if include_recent_failures:
        report["recent_failures"] = _recent_failures(jobs)
    if include_jobs:
        report["jobs"] = [_job_row(job) for job in jobs]
    return report


def generate_jobs_csv(session: Session, *, tenant_id: str, campaign_id: str) -> str:
    """Return all jobs of a campaign as CSV text, header row included.

    Rows are ordered by ``entry_index`` and use the columns of ``_job_row``.

    Raises:
        CampaignReportError: if the campaign does not exist for this tenant.
    """
    campaign = _get_campaign(session, tenant_id=tenant_id, campaign_id=campaign_id)
    jobs = _query_jobs(session, tenant_id=tenant_id, campaign_id=campaign.id)

    # NOTE(review): values such as subject, recipient_email and last_error are
    # written verbatim. If this export is opened in a spreadsheet, cells that
    # start with "=", "+", "-" or "@" may be evaluated as formulas - consider
    # neutralising them if the data can be attacker-influenced.
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=_JOB_CSV_FIELDS)
    writer.writeheader()
    writer.writerows(_job_row(job) for job in jobs)
    return buffer.getvalue()