diff --git a/server/app/api/v1/campaigns.py b/server/app/api/v1/campaigns.py index d5ca98b..d731517 100644 --- a/server/app/api/v1/campaigns.py +++ b/server/app/api/v1/campaigns.py @@ -351,6 +351,7 @@ def validate_version( tenant_id=principal.tenant_id, version_id=version_id, check_files=payload.check_files if payload else False, + user_id=principal.user.id, ) audit_from_principal( session, @@ -536,7 +537,14 @@ def email_campaign_report( # Queue / delivery control ------------------------------------------------- -from app.api.v1.schemas import AppendSentRequest, CampaignActionResponse, QueueCampaignRequest, QueueCampaignResponse +from app.api.v1.schemas import ( + AppendSentRequest, + CampaignActionResponse, + QueueCampaignRequest, + QueueCampaignResponse, + SendCampaignNowRequest, + SendCampaignNowResponse, +) from app.mailer.sending.jobs import ( QueueingError, cancel_campaign_jobs, @@ -544,6 +552,7 @@ from app.mailer.sending.jobs import ( pause_campaign_jobs, queue_campaign_jobs, resume_campaign_jobs, + send_campaign_now, ) @@ -579,6 +588,78 @@ def queue_campaign( raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc +@router.post("/{campaign_id}/send-now", response_model=SendCampaignNowResponse) +def send_campaign_now_endpoint( + campaign_id: str, + payload: SendCampaignNowRequest | None = None, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("campaign:send")), +): + """Validate/build/queue and synchronously send a small campaign version. + + This endpoint is intentionally conservative and suitable for a first small + test campaign. Larger campaigns should use the queue/Celery flow. + """ + + payload = payload or SendCampaignNowRequest() + try: + campaign = _get_campaign_for_tenant(session, campaign_id, principal.tenant_id) + version_id = payload.version_id or campaign.current_version_id + if not version_id: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Campaign has no current version") + + validation_result: dict[str, object] | None = None + build_result: dict[str, object] | None = None + if payload.validate_before_send: + validation_result = validate_campaign_version( + session, + tenant_id=principal.tenant_id, + version_id=version_id, + check_files=payload.check_files, + user_id=principal.user.id, + ) + if not validation_result.get("ok"): + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail={"message": "Campaign validation failed", "validation": validation_result}) + + if payload.build_before_send: + build_result = build_campaign_version( + session, + tenant_id=principal.tenant_id, + version_id=version_id, + write_eml=True, + ) + + result = send_campaign_now( + session, + tenant_id=principal.tenant_id, + campaign_id=campaign_id, + version_id=version_id, + include_warnings=payload.include_warnings, + dry_run=payload.dry_run, + use_rate_limit=payload.use_rate_limit, + enqueue_imap_task=payload.enqueue_imap_task, + ).as_dict() + result["validation"] = validation_result + result["build"] = build_result + audit_from_principal( + session, + principal, + action="campaign.sent_now" if not payload.dry_run else "campaign.send_now_dry_run", + object_type="campaign", + object_id=campaign_id, + details=result, + commit=True, + ) + return SendCampaignNowResponse(result=result) + except HTTPException: + raise + except (CampaignPersistenceError, QueueingError) as exc: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc + except Exception as exc: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc + + + @router.post("/{campaign_id}/pause", response_model=CampaignActionResponse) def pause_campaign( campaign_id: str, diff --git a/server/app/api/v1/schemas.py b/server/app/api/v1/schemas.py index 7d5aad6..cf07a63 100644 --- a/server/app/api/v1/schemas.py +++ b/server/app/api/v1/schemas.py @@ -199,6 +199,23 @@ class QueueCampaignResponse(BaseModel): dry_run: bool = False +class SendCampaignNowRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + version_id: str | None = None + include_warnings: bool = True + check_files: bool = False + validate_before_send: bool = True + build_before_send: bool = True + dry_run: bool = False + use_rate_limit: bool = True + enqueue_imap_task: bool = False + + +class SendCampaignNowResponse(BaseModel): + result: dict[str, Any] + + class AppendSentRequest(BaseModel): model_config = ConfigDict(extra="forbid") diff --git a/server/app/mailer/attachments/resolver.py b/server/app/mailer/attachments/resolver.py index bf9960f..fe581d1 100644 --- a/server/app/mailer/attachments/resolver.py +++ b/server/app/mailer/attachments/resolver.py @@ -10,7 +10,7 @@ from pydantic import BaseModel, ConfigDict, Field from app.mailer.campaign.entries import load_campaign_entries from app.mailer.campaign.field_values import effective_entry_field_values -from app.mailer.campaign.models import AttachmentConfig, Behavior, CampaignConfig, EntryConfig +from app.mailer.campaign.models import AttachmentBasePathConfig, AttachmentConfig, Behavior, CampaignConfig, EntryConfig class AttachmentScope(StrEnum): @@ -57,6 +57,8 @@ class ResolvedAttachment(BaseModel): label: str | None = None base_dir_template: str file_filter_template: str + base_path_name: str | None = None + base_path: str | None = None base_dir: str file_filter: str directory: str @@ -192,15 +194,99 @@ def _template_values(config: CampaignConfig, entry: EntryConfig) -> dict[str, An return values -def _iter_effective_attachment_configs(config: CampaignConfig, entry: EntryConfig) -> Iterable[tuple[AttachmentScope, int, AttachmentConfig]]: +def _rendered_base_dir(config: AttachmentConfig, values: dict[str, Any]) -> str: + rendered = _render_template(config.base_dir, values).strip() + return rendered or "." + + +def _base_path_by_path(config: CampaignConfig, rendered_base_dir: str) -> AttachmentBasePathConfig | None: + for base_path in config.attachments.base_paths: + if base_path.path == rendered_base_dir: + return base_path + return None + + +def _default_base_path(config: CampaignConfig) -> AttachmentBasePathConfig: + return config.attachments.base_paths[0] + + +def _selected_base_path(config: CampaignConfig, rendered_base_dir: str) -> AttachmentBasePathConfig | None: + if config.attachments.base_paths: + if rendered_base_dir in {"", "."}: + return _default_base_path(config) + return _base_path_by_path(config, rendered_base_dir) + return None + + +def _rule_allows_multiple(config: AttachmentConfig, rendered_file_filter: str) -> bool: + """Return whether a rule may produce multiple attachments. + + New UI versions no longer expose allow_multiple. Treat wildcard patterns as + inherently multi-match-capable while keeping the legacy allow_multiple flag + for old campaign JSON. + """ + + return config.allow_multiple or any(char in rendered_file_filter for char in "*?[") + + +def _missing_behavior(campaign_config: CampaignConfig, config: AttachmentConfig) -> Behavior: + if config.missing_behavior is not None: + return config.missing_behavior + if config.required: + return campaign_config.validation_policy.missing_required_attachment + return campaign_config.validation_policy.missing_optional_attachment + + +def _ambiguous_behavior(campaign_config: CampaignConfig, config: AttachmentConfig) -> Behavior: + return config.ambiguous_behavior or campaign_config.validation_policy.ambiguous_attachment_match + + +def _entry_attachment_allowed(config: CampaignConfig, attachment_config: AttachmentConfig, values: dict[str, Any]) -> bool: + rendered_base_dir = _rendered_base_dir(attachment_config, values) + individual_paths = config.attachments.individual_base_path_values + if individual_paths: + return rendered_base_dir in individual_paths + return config.attachments.allow_individual + + +def _iter_effective_attachment_configs( + config: CampaignConfig, + entry: EntryConfig, + values: dict[str, Any], +) -> Iterable[tuple[AttachmentScope, int, AttachmentConfig]]: if entry.combine_attachments: for index, attachment_config in enumerate(config.attachments.global_): yield AttachmentScope.GLOBAL, index, attachment_config - if config.attachments.allow_individual: - for index, attachment_config in enumerate(entry.attachments): + for index, attachment_config in enumerate(entry.attachments): + if _entry_attachment_allowed(config, attachment_config, values): yield AttachmentScope.ENTRY, index, attachment_config +def _resolve_attachment_directory( + *, + campaign_file: str | Path, + campaign_config: CampaignConfig, + rendered_base_dir: str, +) -> tuple[Path, AttachmentBasePathConfig | None]: + """Resolve the directory for an attachment rule. + + Legacy campaigns used attachments.base_path as the root and base_dir as a + child directory. Current WebUI campaigns select one named base path directly + in base_dir. Prefer the new base_paths list when present to avoid resolving + e.g. attachments/base_path + base_dir twice. + """ + + selected_base_path = _selected_base_path(campaign_config, rendered_base_dir) + if selected_base_path is not None: + return _resolve_path(campaign_file, selected_base_path.path), selected_base_path + + if campaign_config.attachments.base_paths: + return _resolve_path(campaign_file, rendered_base_dir), None + + legacy_root = _resolve_path(campaign_file, campaign_config.attachments.base_path) + return (legacy_root / rendered_base_dir).resolve(), None + + def _match_files(directory: Path, file_filter: str, include_subdirs: bool) -> list[Path]: if not directory.exists() or not directory.is_dir(): return [] @@ -227,7 +313,7 @@ def _issue_for_ambiguous(config: AttachmentConfig, behavior: Behavior, match_cou return AttachmentIssue( severity=severity, code="ambiguous_attachment_match", - message=f"Attachment filter {config.file_filter!r} matched {match_count} files, but allow_multiple is false", + message=f"Attachment filter {config.file_filter!r} matched {match_count} files, but it is configured as a direct/single-file selection", behavior=behavior, ) @@ -235,27 +321,32 @@ def _issue_for_ambiguous(config: AttachmentConfig, behavior: Behavior, match_cou def _resolve_one_config( *, campaign_file: str | Path, - attachments_base_path: Path, + campaign_config: CampaignConfig, values: dict[str, Any], scope: AttachmentScope, index: int, config: AttachmentConfig, ) -> ResolvedAttachment: - rendered_base_dir = _render_template(config.base_dir, values) + rendered_base_dir = _rendered_base_dir(config, values) rendered_file_filter = _render_template(config.file_filter, values) - directory = (attachments_base_path / rendered_base_dir).resolve() + directory, selected_base_path = _resolve_attachment_directory( + campaign_file=campaign_file, + campaign_config=campaign_config, + rendered_base_dir=rendered_base_dir, + ) matches = _match_files(directory, rendered_file_filter, config.include_subdirs) + allow_multiple = _rule_allows_multiple(config, rendered_file_filter) issues: list[AttachmentIssue] = [] behavior: Behavior | None = None if not matches: status = AttachmentMatchStatus.MISSING - behavior = config.missing_behavior + behavior = _missing_behavior(campaign_config, config) issues.append(_issue_for_missing(config, behavior)) - elif len(matches) > 1 and not config.allow_multiple: + elif len(matches) > 1 and not allow_multiple: status = AttachmentMatchStatus.AMBIGUOUS - behavior = config.ambiguous_behavior + behavior = _ambiguous_behavior(campaign_config, config) issues.append(_issue_for_ambiguous(config, behavior, len(matches))) else: status = AttachmentMatchStatus.OK @@ -267,12 +358,14 @@ def _resolve_one_config( label=config.label, base_dir_template=config.base_dir, file_filter_template=config.file_filter, + base_path_name=selected_base_path.name if selected_base_path else None, + base_path=selected_base_path.path if selected_base_path else None, base_dir=rendered_base_dir, file_filter=rendered_file_filter, directory=str(directory), include_subdirs=config.include_subdirs, required=config.required, - allow_multiple=config.allow_multiple, + allow_multiple=allow_multiple, zip_enabled=config.zip.enabled, status=status, behavior=behavior, @@ -303,16 +396,15 @@ def resolve_entry_attachments( entry: EntryConfig, entry_index: int, ) -> EntryAttachmentResolution: - attachments_base_path = _resolve_path(campaign_file, config.attachments.base_path) values = _template_values(config, entry) resolved: list[ResolvedAttachment] = [] if entry.active: - for scope, index, attachment_config in _iter_effective_attachment_configs(config, entry): + for scope, index, attachment_config in _iter_effective_attachment_configs(config, entry, values): resolved.append( _resolve_one_config( campaign_file=campaign_file, - attachments_base_path=attachments_base_path, + campaign_config=config, values=values, scope=scope, index=index, @@ -333,7 +425,7 @@ def resolve_entry_attachments( def resolve_campaign_attachments(config: CampaignConfig, *, campaign_file: str | Path) -> AttachmentResolutionReport: entries = load_campaign_entries(config, campaign_file=campaign_file) - base_path = _resolve_path(campaign_file, config.attachments.base_path) + base_path = _resolve_path(campaign_file, config.attachments.base_paths[0].path if config.attachments.base_paths else config.attachments.base_path) resolved_entries = [ resolve_entry_attachments(config=config, campaign_file=campaign_file, entry=entry, entry_index=index) for index, entry in enumerate(entries, start=1) diff --git a/server/app/mailer/campaign/models.py b/server/app/mailer/campaign/models.py index 1ca0ade..3fa5f86 100644 --- a/server/app/mailer/campaign/models.py +++ b/server/app/mailer/campaign/models.py @@ -136,6 +136,14 @@ class RecipientConfig(StrictModel): class RecipientsConfig(StrictModel): from_: RecipientConfig | None = Field(default=None, alias="from") + + @field_validator("from_", mode="before") + @classmethod + def empty_from_object_means_unset(cls, value: Any) -> Any: + if isinstance(value, dict) and not any(value.values()): + return None + return value + allow_individual_from: bool = False to: list[RecipientConfig] = Field(default_factory=list) @@ -200,6 +208,17 @@ class ZipConfig(StrictModel): method: ZipMethod = ZipMethod.AES +class AttachmentBasePathConfig(StrictModel): + id: str | None = None + name: str + path: str = "." + allow_individual: bool = False + # Legacy UI builds briefly wrote a source value. Keep accepting it so older + # drafts do not become invalid merely because the current UI no longer shows + # or edits that column. + source: str | None = None + + class AttachmentConfig(StrictModel): id: str | None = None label: str | None = None @@ -208,26 +227,45 @@ class AttachmentConfig(StrictModel): include_subdirs: bool = False required: bool = True allow_multiple: bool = False - missing_behavior: Behavior = Behavior.ASK - ambiguous_behavior: Behavior = Behavior.ASK + # None means: inherit from validation_policy. Explicit values remain + # supported for backwards compatibility and per-rule overrides. + missing_behavior: Behavior | None = None + ambiguous_behavior: Behavior | None = None zip: ZipConfig = Field(default_factory=ZipConfig) class AttachmentsConfig(StrictModel): base_path: str = "." + base_paths: list[AttachmentBasePathConfig] = Field(default_factory=list) allow_individual: bool = False send_without_attachments: bool = True global_: list[AttachmentConfig] = Field(default_factory=list, alias="global") missing_behavior: Behavior = Behavior.ASK ambiguous_behavior: Behavior = Behavior.ASK + @property + def individual_base_path_values(self) -> set[str]: + return {base_path.path for base_path in self.base_paths if base_path.allow_individual} + class EntryConfig(StrictModel): id: str | None = None active: bool = True + # Compatibility fields written by older/current WebUI recipient rows. + # Address routing uses the explicit to/cc/bcc/reply_to/from fields below; + # these values are retained for round-tripping but are not used for sending. + name: str | None = None + email: str | None = None from_: RecipientConfig | None = Field(default=None, alias="from") + @field_validator("from_", mode="before") + @classmethod + def empty_from_object_means_unset(cls, value: Any) -> Any: + if isinstance(value, dict) and not any(value.values()): + return None + return value + to: list[RecipientConfig] = Field(default_factory=list) combine_to: bool = True @@ -270,8 +308,12 @@ class EntriesConfig(StrictModel): @model_validator(mode="after") def inline_or_external(self) -> "EntriesConfig": has_inline = self.inline is not None - has_external = self.source is not None or self.mapping is not None or self.defaults is not None - if has_inline and has_external: + has_external_source = self.source is not None or self.mapping is not None + # defaults are compatible with both inline and external entries. The + # WebUI stores the current per-entry combination defaults here even for + # inline campaigns, so treating defaults as an external-source marker + # made valid UI drafts fail backend validation. + if has_inline and has_external_source: raise ValueError("entries must be either inline or source-based, not both") if has_inline: return self @@ -292,6 +334,7 @@ class ValidationPolicy(StrictModel): missing_required_attachment: Behavior = Behavior.ASK missing_optional_attachment: Behavior = Behavior.WARN ambiguous_attachment_match: Behavior = Behavior.ASK + ignore_empty_fields: bool = False missing_email: MissingAddressBehavior = MissingAddressBehavior.BLOCK template_error: MissingAddressBehavior = MissingAddressBehavior.BLOCK inactive_entry: InactiveEntryBehavior = InactiveEntryBehavior.DROP diff --git a/server/app/mailer/campaign/validation.py b/server/app/mailer/campaign/validation.py index 4b5eaa8..16b0cbb 100644 --- a/server/app/mailer/campaign/validation.py +++ b/server/app/mailer/campaign/validation.py @@ -8,7 +8,7 @@ from typing import Iterable from pydantic import BaseModel, ConfigDict, Field from .field_values import ignored_entry_field_overrides -from .models import CampaignConfig, EntryConfig, SourceType +from .models import AttachmentConfig, CampaignConfig, EntryConfig, SourceType class Severity(StrEnum): @@ -136,6 +136,59 @@ def _iter_template_source_paths(config: CampaignConfig) -> Iterable[tuple[str, s return paths +def _attachment_base_path_report_value(config: CampaignConfig) -> str: + if config.attachments.base_paths: + return ", ".join(f"{base_path.name}: {base_path.path}" for base_path in config.attachments.base_paths) + return config.attachments.base_path + + +def _iter_attachment_rules(config: CampaignConfig) -> Iterable[tuple[str, AttachmentConfig, bool]]: + for index, attachment_config in enumerate(config.attachments.global_): + yield f"/attachments/global/{index}", attachment_config, False + + inline_entries = config.entries.inline or [] if config.entries.is_inline else [] + for entry_index, entry in enumerate(inline_entries): + for attachment_index, attachment_config in enumerate(entry.attachments): + yield f"/entries/inline/{entry_index}/attachments/{attachment_index}", attachment_config, True + + if config.entries.defaults: + for attachment_index, attachment_config in enumerate(config.entries.defaults.attachments): + yield f"/entries/defaults/attachments/{attachment_index}", attachment_config, True + + +def _attachment_path_issues(config: CampaignConfig) -> list[SemanticIssue]: + issues: list[SemanticIssue] = [] + configured_paths = {base_path.path for base_path in config.attachments.base_paths} + individual_paths = config.attachments.individual_base_path_values + + if config.attachments.base_paths: + for index, base_path in enumerate(config.attachments.base_paths): + if not base_path.name.strip(): + issues.append(_issue(Severity.WARNING, "attachment_base_path_missing_name", "attachment base path has no display name", f"/attachments/base_paths/{index}/name")) + if not base_path.path.strip(): + issues.append(_issue(Severity.ERROR, "attachment_base_path_missing_path", "attachment base path has no path", f"/attachments/base_paths/{index}/path")) + elif not config.attachments.base_path: + issues.append(_issue(Severity.INFO, "missing_attachment_base_path", "Attachment base path is not configured yet.", "/attachments/base_path")) + + if configured_paths: + for path, attachment_config, is_individual in _iter_attachment_rules(config): + if attachment_config.base_dir and attachment_config.base_dir not in configured_paths: + issues.append(_issue( + Severity.WARNING, + "unknown_attachment_base_path", + f"attachment rule refers to base path {attachment_config.base_dir!r}, but it is not listed in attachments.base_paths", + f"{path}/base_dir", + )) + if is_individual and individual_paths and attachment_config.base_dir not in individual_paths: + issues.append(_issue( + Severity.WARNING, + "individual_attachment_base_path_not_allowed", + f"individual attachment rule uses base path {attachment_config.base_dir!r}, but that base path does not allow individual attachments", + f"{path}/base_dir", + )) + return issues + + def _ignored_override_issues(config: CampaignConfig, entry: EntryConfig, path_prefix: str) -> list[SemanticIssue]: return [ _issue( @@ -170,6 +223,8 @@ def validate_campaign_config( f"/global_values/{key}", )) + issues.extend(_attachment_path_issues(config)) + if config.server.imap and config.server.imap.enabled: missing = [name for name in ["host", "port", "username", "password"] if getattr(config.server.imap, name) in (None, "")] if missing: @@ -263,14 +318,25 @@ def validate_campaign_config( issues.append(_issue(Severity.ERROR, "entries_source_read_error", str(exc), "/entries/source/path")) if check_files: - attachments_base_path = _resolve(campaign_path, config.attachments.base_path) - if not attachments_base_path.exists(): - issues.append(_issue( - Severity.WARNING, - "attachments_base_path_not_found", - f"attachments.base_path does not exist: {attachments_base_path}", - "/attachments/base_path", - )) + if config.attachments.base_paths: + for index, base_path_config in enumerate(config.attachments.base_paths): + attachments_base_path = _resolve(campaign_path, base_path_config.path) + if not attachments_base_path.exists(): + issues.append(_issue( + Severity.WARNING, + "attachments_base_path_not_found", + f"attachment base path {base_path_config.name!r} does not exist: {attachments_base_path}", + f"/attachments/base_paths/{index}/path", + )) + else: + attachments_base_path = _resolve(campaign_path, config.attachments.base_path) + if not attachments_base_path.exists(): + issues.append(_issue( + Severity.WARNING, + "attachments_base_path_not_found", + f"attachments.base_path does not exist: {attachments_base_path}", + "/attachments/base_path", + )) for schema_path, raw_path in _iter_template_source_paths(config): path = _resolve(campaign_path, raw_path) if not path.exists(): @@ -287,7 +353,7 @@ def validate_campaign_config( issues=issues, entries_mode=entries_mode, entries_count=entries_count, - attachments_base_path=config.attachments.base_path, + attachments_base_path=_attachment_base_path_report_value(config), rate_limit=f"{config.delivery.rate_limit.messages_per_minute}/min, concurrency {config.delivery.rate_limit.concurrency}", imap_append_enabled=config.delivery.imap_append_sent.enabled, ) diff --git a/server/app/mailer/messages/builder.py b/server/app/mailer/messages/builder.py index d013bb8..3f088c7 100644 --- a/server/app/mailer/messages/builder.py +++ b/server/app/mailer/messages/builder.py @@ -273,6 +273,8 @@ def _attachment_summaries(resolution: EntryAttachmentResolution) -> list[Message required=attachment.required, allow_multiple=attachment.allow_multiple, zip_enabled=attachment.zip_enabled, + base_path_name=attachment.base_path_name, + base_path=attachment.base_path, file_filter=attachment.file_filter, directory=attachment.directory, matches=attachment.matches, @@ -330,9 +332,8 @@ def _iter_attachment_configs_for_resolution(config: CampaignConfig, entry: Entry if entry.combine_attachments: for index, attachment_config in enumerate(config.attachments.global_): yield "global", index, attachment_config - if config.attachments.allow_individual: - for index, attachment_config in enumerate(entry.attachments): - yield "entry", index, attachment_config + for index, attachment_config in enumerate(entry.attachments): + yield "entry", index, attachment_config def _zip_config_for_attachment(config: CampaignConfig, entry: EntryConfig, resolved: ResolvedAttachment): @@ -457,9 +458,10 @@ def build_entry_message( subject_template, text_template, html_template = _load_template_parts(config, campaign_file) values = _template_values(config, entry) - subject = _render_template(subject_template, values) - text_body = _render_template(text_template or "", values) if text_template is not None else None - html_body = _render_template(html_template or "", values) if html_template is not None else None + keep_missing_placeholders = not config.validation_policy.ignore_empty_fields + subject = _render_template(subject_template, values, keep_missing=keep_missing_placeholders) + text_body = _render_template(text_template or "", values, keep_missing=keep_missing_placeholders) if text_template is not None else None + html_body = _render_template(html_template or "", values, keep_missing=keep_missing_placeholders) if html_template is not None else None unresolved = sorted( _find_unresolved_placeholders(subject) diff --git a/server/app/mailer/messages/models.py b/server/app/mailer/messages/models.py index b879950..cde16aa 100644 --- a/server/app/mailer/messages/models.py +++ b/server/app/mailer/messages/models.py @@ -52,6 +52,8 @@ class MessageAttachmentSummary(BaseModel): required: bool allow_multiple: bool zip_enabled: bool + base_path_name: str | None = None + base_path: str | None = None file_filter: str directory: str matches: list[str] = Field(default_factory=list) diff --git a/server/app/mailer/persistence/campaigns.py b/server/app/mailer/persistence/campaigns.py index b65291d..4d3e277 100644 --- a/server/app/mailer/persistence/campaigns.py +++ b/server/app/mailer/persistence/campaigns.py @@ -14,6 +14,7 @@ from app.db.models import ( CampaignJob, CampaignStatus, CampaignVersion, + CampaignVersionWorkflowState, JobBuildStatus, JobImapStatus, JobQueueStatus, @@ -154,7 +155,15 @@ def load_version_config(session: Session, version_id: str): return version, path, load_campaign_config(path) -def validate_campaign_version(session: Session, *, tenant_id: str, version_id: str, check_files: bool = False) -> dict[str, Any]: +def validate_campaign_version( + session: Session, + *, + tenant_id: str, + version_id: str, + check_files: bool = False, + user_id: str | None = None, + lock_on_success: bool = True, +) -> dict[str, Any]: version, snapshot_path, config = load_version_config(session, version_id) campaign = session.get(Campaign, version.campaign_id) if not campaign or campaign.tenant_id != tenant_id: @@ -186,8 +195,15 @@ def validate_campaign_version(session: Session, *, tenant_id: str, version_id: s campaign.status = CampaignStatus.VALIDATED.value if report.ok else CampaignStatus.NEEDS_REVIEW.value if report.ok: - version.workflow_state = "under_review" + version.workflow_state = CampaignVersionWorkflowState.APPROVED.value version.is_complete = True + if lock_on_success and version.locked_at is None: + from datetime import UTC, datetime + + version.locked_at = datetime.now(UTC) + version.locked_by_user_id = user_id + else: + version.workflow_state = CampaignVersionWorkflowState.EDITING.value session.add(version) session.add(campaign) session.commit() @@ -248,6 +264,15 @@ def build_campaign_version( campaign = session.get(Campaign, version.campaign_id) if not campaign or campaign.tenant_id != tenant_id: raise CampaignPersistenceError("Campaign version is not accessible for this tenant") + if version.workflow_state == CampaignVersionWorkflowState.COMPLETED.value: + raise CampaignPersistenceError("Sent campaign versions cannot be rebuilt") + validation_summary = version.validation_summary if isinstance(version.validation_summary, dict) else {} + if not validation_summary.get("ok"): + raise CampaignPersistenceError("Campaign version must be successfully validated before messages are built") + if version.locked_at is None: + from datetime import UTC, datetime + + version.locked_at = datetime.now(UTC) output_dir = BUILD_OUTPUT_DIR / campaign.id / version.id result = build_campaign_messages(config, campaign_file=snapshot_path, output_dir=output_dir, write_eml=write_eml) @@ -296,10 +321,10 @@ def build_campaign_version( if result.report.needs_review_count or result.report.blocked_count: campaign.status = CampaignStatus.NEEDS_REVIEW.value - version.workflow_state = "under_review" + version.workflow_state = CampaignVersionWorkflowState.APPROVED.value elif result.report.queueable_count > 0: campaign.status = CampaignStatus.READY_TO_QUEUE.value - version.workflow_state = "built" + version.workflow_state = CampaignVersionWorkflowState.BUILT.value else: campaign.status = CampaignStatus.VALIDATED.value diff --git a/server/app/mailer/persistence/versions.py b/server/app/mailer/persistence/versions.py index 28d5d4c..043dcfb 100644 --- a/server/app/mailer/persistence/versions.py +++ b/server/app/mailer/persistence/versions.py @@ -87,6 +87,14 @@ def minimal_campaign_json(*, external_id: str, name: str, description: str | Non }, "attachments": { "base_path": ".", + "base_paths": [ + { + "id": "default", + "name": "Campaign files", + "path": ".", + "allow_individual": True, + } + ], "allow_individual": True, "send_without_attachments": False, "global": [], @@ -111,6 +119,7 @@ def minimal_campaign_json(*, external_id: str, name: str, description: str | Non "missing_required_attachment": "ask", "missing_optional_attachment": "warn", "ambiguous_attachment_match": "ask", + "ignore_empty_fields": False, "missing_email": "block", "template_error": "block", }, @@ -195,6 +204,92 @@ def get_campaign_version_for_tenant( return version + + +LOCKED_WORKFLOW_STATES = { + CampaignVersionWorkflowState.APPROVED.value, + CampaignVersionWorkflowState.BUILT.value, + CampaignVersionWorkflowState.QUEUED.value, + CampaignVersionWorkflowState.SENDING.value, + CampaignVersionWorkflowState.COMPLETED.value, + CampaignVersionWorkflowState.CANCELLED.value, + CampaignVersionWorkflowState.ARCHIVED.value, +} + + +def is_version_locked(version: CampaignVersion) -> bool: + """Return True when a version is immutable and edits must fork.""" + + return bool(version.locked_at or version.workflow_state in LOCKED_WORKFLOW_STATES) + + +def _apply_campaign_metadata(campaign: Campaign, raw_json: dict[str, Any]) -> None: + campaign_meta = raw_json.get("campaign") if isinstance(raw_json.get("campaign"), dict) else {} + if campaign_meta: + campaign.name = campaign_meta.get("name") or campaign.name + campaign.description = campaign_meta.get("description", campaign.description) + campaign.external_id = campaign_meta.get("id") or campaign.external_id + + +def fork_campaign_version_for_edit( + session: Session, + *, + tenant_id: str, + campaign_id: str, + version_id: str, + raw_json: dict[str, Any] | None = None, + current_flow: str | None = None, + current_step: str | None = None, + editor_state: dict[str, Any] | None = None, + source_filename: str | None = None, + source_base_path: str | None = None, + autosave: bool = True, +) -> CampaignVersion: + """Create a new editable working version from a locked/validated version. + + This preserves the audit value of the validated/sent version while allowing + users to continue editing a campaign. New content starts with the supplied + raw_json when provided, otherwise with a clone of the source version. + """ + + source = get_campaign_version_for_tenant(session, tenant_id=tenant_id, campaign_id=campaign_id, version_id=version_id) + campaign = session.get(Campaign, campaign_id) + assert campaign is not None + + base_json = raw_json if raw_json is not None else copy.deepcopy(source.raw_json) + runtime_json = normalize_campaign_paths(base_json, source_base_path) if source_base_path else copy.deepcopy(base_json) + + new_version = CampaignVersion( + campaign_id=campaign.id, + version_number=_next_version_number(session, campaign.id), + raw_json=runtime_json, + schema_version=str(runtime_json.get("version", source.schema_version or "1.0")), + source_filename=source_filename if source_filename is not None else source.source_filename, + source_base_path=source_base_path if source_base_path is not None else source.source_base_path, + workflow_state=CampaignVersionWorkflowState.EDITING.value, + current_flow=current_flow if current_flow is not None else (source.current_flow or CampaignVersionFlow.MANUAL.value), + current_step=current_step if current_step is not None else source.current_step, + is_complete=False, + editor_state=editor_state if editor_state is not None else copy.deepcopy(source.editor_state or {}), + autosaved_at=datetime.now(UTC) if autosave else None, + ) + session.add(new_version) + session.flush() + + _apply_campaign_metadata(campaign, runtime_json) + campaign.current_version_id = new_version.id + campaign.status = CampaignStatus.DRAFT.value + session.add(campaign) + _write_campaign_snapshot(new_version) + session.commit() + return new_version + + +def lock_validated_version(version: CampaignVersion, *, user_id: str | None = None) -> None: + if version.locked_at is None: + version.locked_at = datetime.now(UTC) + version.locked_by_user_id = user_id + def update_campaign_version( session: Session, *, @@ -215,15 +310,28 @@ def update_campaign_version( campaign = session.get(Campaign, campaign_id) assert campaign is not None + if is_version_locked(version): + if raw_json is None: + raise CampaignPersistenceError("Campaign version is locked. Save campaign changes to create a new editable version.") + return fork_campaign_version_for_edit( + session, + tenant_id=tenant_id, + campaign_id=campaign_id, + version_id=version_id, + raw_json=raw_json, + current_flow=current_flow, + current_step=current_step, + editor_state=editor_state, + source_filename=source_filename, + source_base_path=source_base_path, + autosave=autosave, + ) + if raw_json is not None: runtime_json = normalize_campaign_paths(raw_json, source_base_path) if source_base_path else copy.deepcopy(raw_json) version.raw_json = runtime_json version.schema_version = str(runtime_json.get("version", version.schema_version or "1.0")) - campaign_meta = runtime_json.get("campaign") if isinstance(runtime_json.get("campaign"), dict) else {} - if campaign_meta: - campaign.name = campaign_meta.get("name") or campaign.name - campaign.description = campaign_meta.get("description", campaign.description) - campaign.external_id = campaign_meta.get("id") or campaign.external_id + _apply_campaign_metadata(campaign, runtime_json) if current_flow is not None: version.current_flow = current_flow @@ -246,6 +354,11 @@ def update_campaign_version( if raw_json is not None: version.validation_summary = None version.build_summary = None + version.locked_at = None + version.locked_by_user_id = None + if version.workflow_state != CampaignVersionWorkflowState.EDITING.value: + version.workflow_state = CampaignVersionWorkflowState.EDITING.value + campaign.status = CampaignStatus.DRAFT.value session.query(CampaignIssue).filter(CampaignIssue.campaign_version_id == version.id).delete(synchronize_session=False) session.add(version) @@ -323,7 +436,9 @@ def validate_campaign_partial(raw_json: dict[str, Any], *, section: str | None = issue("warning", "template", "template", "missing_template_body", "No text, HTML or file-based template body configured yet.") attachments = raw_json.get("attachments") if isinstance(raw_json.get("attachments"), dict) else {} - if not attachments.get("base_path"): + base_paths = attachments.get("base_paths") if isinstance(attachments.get("base_paths"), list) else [] + has_named_base_path = any(isinstance(item, dict) and item.get("path") for item in base_paths) + if not has_named_base_path and not attachments.get("base_path"): issue("info", "attachments", "attachments.base_path", "missing_attachment_base_path", "Attachment base path is not configured yet.") delivery = raw_json.get("delivery") if isinstance(raw_json.get("delivery"), dict) else {} diff --git a/server/app/mailer/schema/campaign.schema.json b/server/app/mailer/schema/campaign.schema.json index e25510c..fa52280 100644 --- a/server/app/mailer/schema/campaign.schema.json +++ b/server/app/mailer/schema/campaign.schema.json @@ -172,7 +172,18 @@ "type": "object", "properties": { "from": { - "$ref": "#/$defs/recipient" + "oneOf": [ + { + "$ref": "#/$defs/recipient" + }, + { + "type": "object", + "maxProperties": 0 + }, + { + "type": "null" + } + ] }, "allow_individual_from": { "type": "boolean", @@ -260,10 +271,16 @@ "type": "string" }, "text": { - "type": "string" + "type": [ + "string", + "null" + ] }, "html": { - "type": "string" + "type": [ + "string", + "null" + ] } }, "additionalProperties": false @@ -311,7 +328,15 @@ "base_path": { "type": "string", "default": ".", - "description": "Campaign-level base path. Global and entry attachment base_dir values are resolved relative to this path unless absolute." + "description": "Legacy single campaign-level attachment base path. Used when attachments.base_paths is absent." + }, + "base_paths": { + "type": "array", + "items": { + "$ref": "#/$defs/attachment_base_path" + }, + "default": [], + "description": "Named selectable attachment base paths. New WebUI campaigns use these instead of the legacy single base_path." }, "allow_individual": { "type": "boolean", @@ -355,6 +380,7 @@ "additionalProperties": false, "default": { "base_path": ".", + "base_paths": [], "global": [] } }, @@ -371,6 +397,9 @@ "items": { "$ref": "#/$defs/entry" } + }, + "defaults": { + "$ref": "#/$defs/entry" } }, "additionalProperties": false @@ -436,6 +465,11 @@ ], "default": "ask" }, + "ignore_empty_fields": { + "type": "boolean", + "default": false, + "description": "Render unresolved/empty template placeholders as an empty string instead of keeping them and raising a template error." + }, "missing_email": { "type": "string", "enum": [ @@ -463,7 +497,9 @@ } }, "additionalProperties": false, - "default": {} + "default": { + "ignore_empty_fields": false + } }, "delivery": { "type": "object", @@ -601,7 +637,7 @@ }, "base_dir": { "type": "string", - "description": "Directory relative to attachments.base_path unless absolute." + "description": "Selected attachment base path for current WebUI campaigns, or a directory relative to attachments.base_path for legacy campaigns." }, "file_filter": { "type": "string", @@ -617,29 +653,40 @@ }, "allow_multiple": { "type": "boolean", - "default": false + "default": false, + "description": "Legacy compatibility flag. Current UI treats wildcard file_filter patterns as multi-match-capable automatically." }, "missing_behavior": { - "type": "string", + "type": [ + "string", + "null" + ], "enum": [ "block", "ask", "drop", "continue", - "warn" + "warn", + null ], - "default": "ask" + "default": null, + "description": "Optional per-rule override. Null or omitted inherits from validation_policy." }, "ambiguous_behavior": { - "type": "string", + "type": [ + "string", + "null" + ], "enum": [ "block", "ask", "drop", "continue", - "warn" + "warn", + null ], - "default": "ask" + "default": null, + "description": "Optional per-rule override. Null or omitted inherits from validation_policy." }, "zip": { "type": "object", @@ -682,7 +729,18 @@ "default": true }, "from": { - "$ref": "#/$defs/recipient" + "oneOf": [ + { + "$ref": "#/$defs/recipient" + }, + { + "type": "object", + "maxProperties": 0 + }, + { + "type": "null" + } + ] }, "to": { "type": "array", @@ -769,6 +827,19 @@ "last_sent": { "type": "string", "format": "date-time" + }, + "name": { + "type": [ + "string", + "null" + ] + }, + "email": { + "type": [ + "string", + "null" + ], + "format": "email" } }, "additionalProperties": false @@ -804,6 +875,41 @@ } }, "additionalProperties": false + }, + "attachment_base_path": { + "type": "object", + "required": [ + "name", + "path" + ], + "properties": { + "id": { + "type": "string", + "description": "Optional stable ID for UI/status references." + }, + "name": { + "type": "string", + "description": "Display name for this selectable attachment base path." + }, + "path": { + "type": "string", + "default": ".", + "description": "Base path relative to the campaign file unless absolute." + }, + "allow_individual": { + "type": "boolean", + "default": false, + "description": "Whether recipient-level attachments may use this base path." + }, + "source": { + "type": [ + "string", + "null" + ], + "description": "Legacy UI compatibility value. Ignored by the backend." + } + }, + "additionalProperties": false } } } diff --git a/server/app/mailer/sending/jobs.py b/server/app/mailer/sending/jobs.py index 783d0d5..9840327 100644 --- a/server/app/mailer/sending/jobs.py +++ b/server/app/mailer/sending/jobs.py @@ -15,6 +15,7 @@ from app.db.models import ( CampaignJob, CampaignStatus, CampaignVersion, + CampaignVersionWorkflowState, JobBuildStatus, JobImapStatus, JobQueueStatus, @@ -61,6 +62,30 @@ class QueueCampaignResult: } +@dataclass(frozen=True, slots=True) +class SendCampaignNowResult: + campaign_id: str + version_id: str + attempted_count: int + sent_count: int + failed_count: int + skipped_count: int + dry_run: bool = False + results: list[dict[str, Any]] | None = None + + def as_dict(self) -> dict[str, Any]: + return { + "campaign_id": self.campaign_id, + "version_id": self.version_id, + "attempted_count": self.attempted_count, + "sent_count": self.sent_count, + "failed_count": self.failed_count, + "skipped_count": self.skipped_count, + "dry_run": self.dry_run, + "results": self.results or [], + } + + @dataclass(frozen=True, slots=True) class SendJobResult: job_id: str @@ -197,6 +222,10 @@ def queue_campaign_jobs( if not dry_run: if queued: campaign.status = CampaignStatus.QUEUED.value + version.workflow_state = CampaignVersionWorkflowState.QUEUED.value + if version.locked_at is None: + version.locked_at = _utcnow() + session.add(version) session.add(campaign) session.commit() @@ -217,6 +246,91 @@ def queue_campaign_jobs( ) +def send_campaign_now( + session: Session, + *, + tenant_id: str, + campaign_id: str, + version_id: str | None = None, + include_warnings: bool = True, + dry_run: bool = False, + use_rate_limit: bool = True, + enqueue_imap_task: bool = False, +) -> SendCampaignNowResult: + """Queue and send a small campaign synchronously. + + This is intended for WebUI test/small-campaign flows. Large campaigns can + still use queue_campaign_jobs with Celery workers. + """ + + campaign = _get_campaign_for_tenant(session, campaign_id=campaign_id, tenant_id=tenant_id) + version = _get_current_version(session, campaign, version_id=version_id) + + queue_result = queue_campaign_jobs( + session, + tenant_id=tenant_id, + campaign_id=campaign.id, + version_id=version.id, + include_warnings=include_warnings, + enqueue_celery=False, + dry_run=dry_run, + ) + if dry_run: + return SendCampaignNowResult( + campaign_id=campaign.id, + version_id=version.id, + attempted_count=0, + sent_count=0, + failed_count=0, + skipped_count=queue_result.skipped_count + queue_result.blocked_count, + dry_run=True, + results=[queue_result.as_dict()], + ) + + jobs = ( + session.query(CampaignJob) + .filter( + CampaignJob.tenant_id == tenant_id, + CampaignJob.campaign_version_id == version.id, + CampaignJob.queue_status == JobQueueStatus.QUEUED.value, + CampaignJob.send_status.in_([JobSendStatus.QUEUED.value, JobSendStatus.FAILED_TEMPORARY.value]), + ) + .order_by(CampaignJob.entry_index.asc()) + .all() + ) + + results: list[dict[str, Any]] = [] + sent_count = 0 + failed_count = 0 + for job in jobs: + try: + result = send_campaign_job( + session, + job_id=job.id, + dry_run=False, + use_rate_limit=use_rate_limit, + enqueue_imap_task=enqueue_imap_task, + ) + result_dict = result.as_dict() + results.append(result_dict) + if result.status in {"sent", "already_sent"}: + sent_count += 1 + except Exception as exc: # keep sending other jobs and return per-job details + failed_count += 1 + results.append({"job_id": job.id, "status": "failed", "message": str(exc)}) + + return SendCampaignNowResult( + campaign_id=campaign.id, + version_id=version.id, + attempted_count=len(jobs), + sent_count=sent_count, + failed_count=failed_count, + skipped_count=queue_result.skipped_count + queue_result.blocked_count, + dry_run=False, + results=results, + ) + + def enqueue_existing_queued_jobs(session: Session, *, tenant_id: str, campaign_id: str) -> int: campaign = _get_campaign_for_tenant(session, campaign_id=campaign_id, tenant_id=tenant_id) jobs = ( @@ -360,15 +474,18 @@ def _record_attempt_start(session: Session, job: CampaignJob) -> SendAttempt: return attempt -def _update_campaign_after_job(session: Session, campaign_id: str) -> None: +def _update_campaign_after_job(session: Session, campaign_id: str, version_id: str | None = None) -> None: session.flush() campaign = session.get(Campaign, campaign_id) if not campaign: return + base_filters = [CampaignJob.campaign_id == campaign_id] + if version_id: + base_filters.append(CampaignJob.campaign_version_id == version_id) remaining = ( session.query(CampaignJob) .filter( - CampaignJob.campaign_id == campaign_id, + *base_filters, CampaignJob.queue_status.in_([JobQueueStatus.QUEUED.value, JobQueueStatus.SENDING.value, JobQueueStatus.PAUSED.value]), ) .count() @@ -376,18 +493,25 @@ def _update_campaign_after_job(session: Session, campaign_id: str) -> None: failed = ( session.query(CampaignJob) .filter( - CampaignJob.campaign_id == campaign_id, + *base_filters, CampaignJob.send_status.in_([JobSendStatus.FAILED_TEMPORARY.value, JobSendStatus.FAILED_PERMANENT.value]), ) .count() ) - sent = session.query(CampaignJob).filter(CampaignJob.campaign_id == campaign_id, CampaignJob.send_status == JobSendStatus.SENT.value).count() + sent = session.query(CampaignJob).filter(*base_filters, CampaignJob.send_status == JobSendStatus.SENT.value).count() if remaining: campaign.status = CampaignStatus.QUEUED.value elif failed: campaign.status = CampaignStatus.FAILED.value if not sent else CampaignStatus.NEEDS_REVIEW.value elif sent: campaign.status = CampaignStatus.SENT.value + if version_id: + version = session.get(CampaignVersion, version_id) + if version: + version.workflow_state = CampaignVersionWorkflowState.COMPLETED.value + if version.locked_at is None: + version.locked_at = _utcnow() + session.add(version) session.add(campaign) @@ -452,7 +576,7 @@ def send_campaign_job(session: Session, *, job_id: str, dry_run: bool = False, u job.last_error = None session.add(attempt) session.add(job) - _update_campaign_after_job(session, job.campaign_id) + _update_campaign_after_job(session, job.campaign_id, job.campaign_version_id) session.commit() if enqueue_imap_task and job.imap_status == JobImapStatus.PENDING.value: _celery_enqueue_append_sent_job(job.id) @@ -473,7 +597,7 @@ def send_campaign_job(session: Session, *, job_id: str, dry_run: bool = False, u job.send_status = JobSendStatus.FAILED_PERMANENT.value session.add(attempt) session.add(job) - _update_campaign_after_job(session, job.campaign_id) + _update_campaign_after_job(session, job.campaign_id, job.campaign_version_id) session.commit() raise diff --git a/server/multimailer-dev.db b/server/multimailer-dev.db index 99ae282..48d973c 100644 Binary files a/server/multimailer-dev.db and b/server/multimailer-dev.db differ