diff --git a/.env.example b/.env.example index b265eed..c8d3f66 100644 --- a/.env.example +++ b/.env.example @@ -61,6 +61,8 @@ CELERY_PREFETCH_MULTIPLIER=1 CELERY_MAX_TASKS_PER_CHILD=200 CELERY_LOGLEVEL=INFO +RUN_DB_MIGRATIONS=true + # Existing Traefik/proxy network example EXTERNAL_PROXY_NETWORK=proxy TRAEFIK_ENTRYPOINT=websecure diff --git a/server/alembic/env.py b/server/alembic/env.py index 9d1137f..af6bf9d 100644 --- a/server/alembic/env.py +++ b/server/alembic/env.py @@ -10,7 +10,8 @@ from app.db import models # noqa: F401 - ensure models are imported from app.settings import settings config = context.config -config.set_main_option("sqlalchemy.url", settings.database_url) +database_url = config.attributes.get("database_url") or settings.database_url +config.set_main_option("sqlalchemy.url", database_url) if config.config_file_name is not None: fileConfig(config.config_file_name) diff --git a/server/alembic/versions/5f6a7b8c9d0e_campaign_version_user_locks.py b/server/alembic/versions/5f6a7b8c9d0e_campaign_version_user_locks.py new file mode 100644 index 0000000..55ec532 --- /dev/null +++ b/server/alembic/versions/5f6a7b8c9d0e_campaign_version_user_locks.py @@ -0,0 +1,55 @@ +"""explicit temporary and permanent user locks + +Revision ID: 5f6a7b8c9d0e +Revises: 4e5f6a7b8c9d +Create Date: 2026-06-13 18:00:00.000000 +""" +from __future__ import annotations + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "5f6a7b8c9d0e" +down_revision: Union[str, None] = "4e5f6a7b8c9d" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + with op.batch_alter_table("campaign_versions") as batch_op: + batch_op.add_column(sa.Column("user_lock_state", sa.String(length=20), nullable=True)) + batch_op.add_column(sa.Column("user_locked_at", sa.DateTime(timezone=True), nullable=True)) + batch_op.add_column(sa.Column("user_locked_by_user_id", sa.String(length=36), nullable=True)) + batch_op.create_foreign_key( + "fk_campaign_versions_user_locked_by_user_id_users", + "users", + ["user_locked_by_user_id"], + ["id"], + ondelete="SET NULL", + ) + batch_op.create_index("ix_campaign_versions_user_lock_state", ["user_lock_state"]) + batch_op.create_index("ix_campaign_versions_user_locked_by_user_id", ["user_locked_by_user_id"]) + + # Existing published snapshots were the former irreversible user lock. + op.execute( + """ + UPDATE campaign_versions + SET user_lock_state = 'permanent', + user_locked_at = published_at, + user_locked_by_user_id = NULL + WHERE published_at IS NOT NULL + AND user_lock_state IS NULL + """ + ) + + +def downgrade() -> None: + with op.batch_alter_table("campaign_versions") as batch_op: + batch_op.drop_index("ix_campaign_versions_user_locked_by_user_id") + batch_op.drop_index("ix_campaign_versions_user_lock_state") + batch_op.drop_constraint("fk_campaign_versions_user_locked_by_user_id_users", type_="foreignkey") + batch_op.drop_column("user_locked_by_user_id") + batch_op.drop_column("user_locked_at") + batch_op.drop_column("user_lock_state") diff --git a/server/app/api/v1/campaigns.py b/server/app/api/v1/campaigns.py index f95028c..6749ccd 100644 --- a/server/app/api/v1/campaigns.py +++ b/server/app/api/v1/campaigns.py @@ -18,6 +18,7 @@ from app.api.v1.schemas import ( CampaignVersionDetailResponse, CampaignVersionResponse, CampaignVersionSetStepRequest, + CampaignReviewStateRequest, CampaignVersionUpdateRequest, CampaignPartialValidationRequest, CampaignPartialValidationResponse, @@ -49,9 +50,13 @@ from app.mailer.persistence.versions import ( is_user_locked_version, is_version_locked, get_campaign_version_for_tenant, + lock_campaign_version_temporarily, + permanently_lock_campaign_version, publish_campaign_version, + unlock_user_locked_campaign_version, unlock_validated_campaign_version, update_campaign_version, + update_campaign_review_state, validate_campaign_partial, ) @@ -359,6 +364,98 @@ def unlock_version_validation( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc +@router.post("/{campaign_id}/versions/{version_id}/lock-temporarily", response_model=CampaignVersionDetailResponse) +def lock_version_temporarily( + campaign_id: str, + version_id: str, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("campaign:write")), +): + try: + version = lock_campaign_version_temporarily( + session, + tenant_id=principal.tenant_id, + campaign_id=campaign_id, + version_id=version_id, + user_id=principal.user.id, + ) + audit_from_principal( + session, + principal, + action="campaign.version_user_locked_temporarily", + object_type="campaign_version", + object_id=version.id, + details={"campaign_id": campaign_id}, + commit=True, + ) + return CampaignVersionDetailResponse.model_validate(version) + except LockedCampaignVersionError as exc: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc + except CampaignPersistenceError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc + + +@router.post("/{campaign_id}/versions/{version_id}/unlock-user-lock", response_model=CampaignVersionDetailResponse) +def unlock_version_user_lock( + campaign_id: str, + version_id: str, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("campaign:write")), +): + try: + version = unlock_user_locked_campaign_version( + session, + tenant_id=principal.tenant_id, + campaign_id=campaign_id, + version_id=version_id, + ) + audit_from_principal( + session, + principal, + action="campaign.version_user_lock_removed", + object_type="campaign_version", + object_id=version.id, + details={"campaign_id": campaign_id}, + commit=True, + ) + return CampaignVersionDetailResponse.model_validate(version) + except LockedCampaignVersionError as exc: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc + except CampaignPersistenceError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc + + +@router.post("/{campaign_id}/versions/{version_id}/lock-permanently", response_model=CampaignVersionDetailResponse) +def lock_version_permanently( + campaign_id: str, + version_id: str, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("campaign:write")), +): + try: + version = permanently_lock_campaign_version( + session, + tenant_id=principal.tenant_id, + campaign_id=campaign_id, + version_id=version_id, + user_id=principal.user.id, + ) + audit_from_principal( + session, + principal, + action="campaign.version_user_locked_permanently", + object_type="campaign_version", + object_id=version.id, + details={"campaign_id": campaign_id}, + commit=True, + ) + return CampaignVersionDetailResponse.model_validate(version) + except LockedCampaignVersionError as exc: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc + except CampaignPersistenceError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc + + @router.put("/{campaign_id}/versions/{version_id}", response_model=CampaignVersionDetailResponse) def update_version_detail( campaign_id: str, @@ -468,6 +565,44 @@ def set_version_step( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc +@router.post("/{campaign_id}/versions/{version_id}/review-state", response_model=CampaignVersionDetailResponse) +def set_version_review_state( + campaign_id: str, + version_id: str, + payload: CampaignReviewStateRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("campaign:write")), +): + try: + version = update_campaign_review_state( + session, + tenant_id=principal.tenant_id, + campaign_id=campaign_id, + version_id=version_id, + inspection_complete=payload.inspection_complete, + reviewed_message_keys=payload.reviewed_message_keys, + user_id=principal.user.id, + ) + audit_from_principal( + session, + principal, + action="campaign.message_review_updated", + object_type="campaign_version", + object_id=version.id, + details={ + "campaign_id": campaign_id, + "inspection_complete": payload.inspection_complete, + "reviewed_message_count": len(payload.reviewed_message_keys), + }, + commit=True, + ) + return CampaignVersionDetailResponse.model_validate(version) + except LockedCampaignVersionError as exc: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc + except CampaignPersistenceError as exc: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc + + @router.post("/{campaign_id}/versions/{version_id}/validate-partial", response_model=CampaignPartialValidationResponse) def validate_version_partial( campaign_id: str, @@ -504,17 +639,25 @@ def publish_version( principal: ApiPrincipal = Depends(require_scope("campaign:write")), ): try: - version = publish_campaign_version(session, tenant_id=principal.tenant_id, campaign_id=campaign_id, version_id=version_id) + version = publish_campaign_version( + session, + tenant_id=principal.tenant_id, + campaign_id=campaign_id, + version_id=version_id, + user_id=principal.user.id, + ) audit_from_principal( session, principal, - action="campaign.version_published", + action="campaign.version_user_locked_permanently", object_type="campaign_version", object_id=version.id, details={"campaign_id": campaign_id}, commit=True, ) return CampaignVersionDetailResponse.model_validate(version) + except LockedCampaignVersionError as exc: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc except CampaignPersistenceError as exc: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc @@ -531,7 +674,7 @@ def validate_version( if is_user_locked_version(version) or is_version_final_locked(version): raise HTTPException( status_code=status.HTTP_409_CONFLICT, - detail="This version is audit-safe/final and cannot be validated again. Create an editable copy instead.", + detail="This version has a user lock or final delivery lock and cannot be validated. Remove a temporary lock or create an editable copy.", ) result = validate_campaign_version( session, @@ -550,6 +693,8 @@ def validate_version( commit=True, ) return result + except HTTPException: + raise except CampaignPersistenceError as exc: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc except Exception as exc: @@ -589,13 +734,19 @@ def build_version( @router.get("/{campaign_id}/jobs", response_model=CampaignJobsResponse) def list_jobs( campaign_id: str, + version_id: str | None = None, session: Session = Depends(get_session), principal: ApiPrincipal = Depends(require_scope("campaign:read")), ): campaign = _get_campaign_for_tenant(session, campaign_id, principal.tenant_id) + query = session.query(CampaignJob).filter(CampaignJob.campaign_id == campaign.id) + if version_id: + version = _get_version_for_tenant(session, version_id, principal.tenant_id) + if version.campaign_id != campaign.id: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Campaign version not found") + query = query.filter(CampaignJob.campaign_version_id == version.id) jobs = ( - session.query(CampaignJob) - .filter(CampaignJob.campaign_id == campaign.id) + query .order_by(CampaignJob.entry_index.asc()) .all() ) @@ -603,6 +754,7 @@ def list_jobs( jobs=[ { "id": job.id, + "campaign_version_id": job.campaign_version_id, "entry_index": job.entry_index, "entry_id": job.entry_id, "recipient_email": job.recipient_email, @@ -620,6 +772,7 @@ def list_jobs( "sent_at": job.sent_at, "issues": job.issues_snapshot, "attachments": job.resolved_attachments, + "resolved_recipients": job.resolved_recipients, } for job in jobs ] diff --git a/server/app/api/v1/schemas.py b/server/app/api/v1/schemas.py index 7ec3d9a..d101682 100644 --- a/server/app/api/v1/schemas.py +++ b/server/app/api/v1/schemas.py @@ -57,6 +57,13 @@ class CampaignVersionSetStepRequest(BaseModel): current_step: str +class CampaignReviewStateRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + inspection_complete: bool = False + reviewed_message_keys: list[str] = Field(default_factory=list) + + class CampaignPartialValidationRequest(BaseModel): model_config = ConfigDict(extra="forbid") @@ -82,6 +89,9 @@ class CampaignVersionResponse(BaseModel): published_at: datetime | None = None locked_at: datetime | None = None locked_by_user_id: str | None = None + user_lock_state: Literal["temporary", "permanent"] | None = None + user_locked_at: datetime | None = None + user_locked_by_user_id: str | None = None created_at: datetime updated_at: datetime validation_summary: dict[str, Any] | None = None diff --git a/server/app/db/migrate.py b/server/app/db/migrate.py new file mode 100644 index 0000000..f836e88 --- /dev/null +++ b/server/app/db/migrate.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +import argparse + +from app.db.migrations import migrate_database + + +def main() -> None: + parser = argparse.ArgumentParser(description="Upgrade the Multi Seal Mail database schema") + parser.add_argument( + "--no-reconcile-legacy-schema", + action="store_true", + help="Disable repair of the known create_all/Alembic development schema drift", + ) + args = parser.parse_args() + + result = migrate_database(reconcile_legacy_schema=not args.no_reconcile_legacy_schema) + if result.reconciled_revision: + print( + "Reconciled legacy database marker " + f"from {result.previous_revision or 'unversioned'} to {result.reconciled_revision}." + ) + print(f"Database schema is at revision {result.current_revision or 'unknown'}.") + + +if __name__ == "__main__": + main() diff --git a/server/app/db/migrations.py b/server/app/db/migrations.py new file mode 100644 index 0000000..d016c46 --- /dev/null +++ b/server/app/db/migrations.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +from alembic import command +from alembic.config import Config +from alembic.runtime.migration import MigrationContext +from sqlalchemy import create_engine, inspect + +from app.settings import settings + +# Historic development databases could be created partly through Alembic and +# partly through Base.metadata.create_all(). In that state Alembic still says +# "2c..." while the 3d/4e file-storage tables already exist, so a normal +# upgrade attempts to create file_blobs again. This reconciliation is kept +# deliberately narrow and only advances the marker when the complete expected +# schema for the skipped revisions is already present. +REVISION_AUTH_RBAC = "2c3d4e5f6a7b" +REVISION_FILE_STORAGE = "3d4e5f6a7b8c" +REVISION_FILE_FOLDERS = "4e5f6a7b8c9d" + +_FILE_STORAGE_TABLES = { + "file_blobs", + "file_assets", + "file_versions", + "file_shares", + "campaign_attachment_uses", +} +_FILE_FOLDER_TABLES = {"file_folders"} + +_FILE_STORAGE_COLUMNS = { + "file_blobs": { + "id", + "tenant_id", + "storage_backend", + "storage_key", + "checksum_sha256", + "size_bytes", + }, + "file_assets": { + "id", + "tenant_id", + "owner_type", + "display_path", + "filename", + "current_version_id", + }, + "file_versions": { + "id", + "file_asset_id", + "blob_id", + "version_number", + "checksum_sha256", + }, + "file_shares": {"id", "file_asset_id", "target_type", "target_id", "permission"}, + "campaign_attachment_uses": { + "id", + "campaign_id", + "campaign_version_id", + "file_asset_id", + "file_version_id", + "file_blob_id", + }, + "file_folders": {"id", "tenant_id", "owner_type", "path"}, +} + + +@dataclass(frozen=True, slots=True) +class MigrationResult: + previous_revision: str | None + reconciled_revision: str | None + current_revision: str | None + + +def alembic_config(*, database_url: str | None = None) -> Config: + server_root = Path(__file__).resolve().parents[2] + config = Config(str(server_root / "alembic.ini")) + config.set_main_option("script_location", str(server_root / "alembic")) + config.attributes["database_url"] = database_url or settings.database_url + return config + + +def database_revision(database_url: str | None = None) -> str | None: + url = database_url or settings.database_url + engine = create_engine(url) + try: + with engine.connect() as connection: + return MigrationContext.configure(connection).get_current_revision() + finally: + engine.dispose() + + +def _has_columns(inspector, table_name: str, required: set[str]) -> bool: + try: + actual = {column["name"] for column in inspector.get_columns(table_name)} + except Exception: + return False + return required.issubset(actual) + + +def reconcile_legacy_create_all_schema(database_url: str | None = None) -> str | None: + """Repair the known Alembic/create_all drift without modifying table data. + + Returns the revision stamped during reconciliation, or ``None`` when no + repair was necessary. A partial/unknown schema is intentionally left alone + so Alembic can fail visibly instead of guessing. + """ + + url = database_url or settings.database_url + engine = create_engine(url) + try: + with engine.connect() as connection: + current = MigrationContext.configure(connection).get_current_revision() + schema = inspect(connection) + tables = set(schema.get_table_names()) + + has_file_storage = _FILE_STORAGE_TABLES.issubset(tables) and all( + _has_columns(schema, table, _FILE_STORAGE_COLUMNS[table]) + for table in _FILE_STORAGE_TABLES + ) + has_file_folders = _FILE_FOLDER_TABLES.issubset(tables) and _has_columns( + schema, + "file_folders", + _FILE_STORAGE_COLUMNS["file_folders"], + ) + finally: + engine.dispose() + + target: str | None = None + if current == REVISION_AUTH_RBAC and has_file_storage and has_file_folders: + target = REVISION_FILE_FOLDERS + elif current == REVISION_AUTH_RBAC and has_file_storage: + target = REVISION_FILE_STORAGE + elif current == REVISION_FILE_STORAGE and has_file_folders: + target = REVISION_FILE_FOLDERS + elif current is None and has_file_storage and has_file_folders: + # This is the other create_all-only development shape. The strict + # column checks above ensure that we only stamp a complete known schema. + target = REVISION_FILE_FOLDERS + + if target is None: + return None + + command.stamp(alembic_config(database_url=url), target) + return target + + +def migrate_database( + *, + database_url: str | None = None, + reconcile_legacy_schema: bool = True, +) -> MigrationResult: + url = database_url or settings.database_url + previous = database_revision(url) + reconciled = reconcile_legacy_create_all_schema(url) if reconcile_legacy_schema else None + command.upgrade(alembic_config(database_url=url), "head") + current = database_revision(url) + return MigrationResult( + previous_revision=previous, + reconciled_revision=reconciled, + current_revision=current, + ) diff --git a/server/app/db/models.py b/server/app/db/models.py index 3e4cc05..a61044d 100644 --- a/server/app/db/models.py +++ b/server/app/db/models.py @@ -267,6 +267,14 @@ class CampaignVersion(Base, TimestampMixin): locked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) locked_by_user_id: Mapped[str | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True) + # Explicit user-requested lock. This is deliberately separate from + # locked_at, which represents the reversible validation lock used by the + # build/send workflow. Temporary user locks may later receive a dedicated + # RBAC permission for unlocking; permanent locks never unlock in place. + user_lock_state: Mapped[str | None] = mapped_column(String(20), nullable=True, index=True) + user_locked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + user_locked_by_user_id: Mapped[str | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True) + validation_summary: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True) build_summary: Mapped[dict[str, Any] | None] = mapped_column(JSON, nullable=True) diff --git a/server/app/mailer/commands/init_db.py b/server/app/mailer/commands/init_db.py index ec70584..af24287 100644 --- a/server/app/mailer/commands/init_db.py +++ b/server/app/mailer/commands/init_db.py @@ -2,7 +2,8 @@ from __future__ import annotations import argparse -from app.db.bootstrap import bootstrap_dev_data, create_all_tables +from app.db.bootstrap import bootstrap_dev_data +from app.db.migrations import migrate_database from app.db.session import SessionLocal from app.settings import settings @@ -13,8 +14,13 @@ def main() -> None: parser.add_argument("--dev-api-key", default=settings.dev_bootstrap_api_key, help="Development API key secret to create") args = parser.parse_args() - create_all_tables() - print("Database tables ensured.") + migration = migrate_database() + if migration.reconciled_revision: + print( + "Reconciled legacy database marker " + f"to {migration.reconciled_revision}." + ) + print(f"Database schema upgraded to {migration.current_revision}.") if args.with_dev_data: with SessionLocal() as session: diff --git a/server/app/mailer/dev/mock_campaign.py b/server/app/mailer/dev/mock_campaign.py index 3266747..f3a0314 100644 --- a/server/app/mailer/dev/mock_campaign.py +++ b/server/app/mailer/dev/mock_campaign.py @@ -15,6 +15,7 @@ from app.mailer.messages.models import MessageAddress, MessageDraft, MessageVali from app.storage.campaign_attachments import ( annotate_built_messages_with_managed_files, prepared_campaign_snapshot, + public_attachment_summary_payload, ) from app.mailer.dev.mock_mailbox import ( clear_records, @@ -46,7 +47,7 @@ def _issue_payloads(message: MessageDraft) -> list[dict[str, Any]]: def _attachment_payloads(message: MessageDraft) -> list[dict[str, Any]]: - return [attachment.model_dump(mode="json") for attachment in message.attachments] + return [public_attachment_summary_payload(attachment) for attachment in message.attachments] def _message_payload(message: MessageDraft) -> dict[str, Any]: diff --git a/server/app/mailer/persistence/campaigns.py b/server/app/mailer/persistence/campaigns.py index efc87fc..95d54ee 100644 --- a/server/app/mailer/persistence/campaigns.py +++ b/server/app/mailer/persistence/campaigns.py @@ -4,6 +4,8 @@ import json from pathlib import Path from typing import Any import copy +from datetime import UTC, datetime +from uuid import uuid4 from sqlalchemy import func from sqlalchemy.orm import Session @@ -29,6 +31,7 @@ from app.storage.services import record_campaign_attachment_uses_for_job from app.storage.campaign_attachments import ( annotate_built_messages_with_managed_files, prepared_campaign_snapshot, + public_attachment_summary_payload, ) RUNTIME_DIR = Path(__file__).resolve().parents[3] / "runtime" @@ -154,8 +157,15 @@ def create_campaign_version_from_json( +def _version_user_lock_state(version: CampaignVersion) -> str | None: + state = getattr(version, "user_lock_state", None) + if state in {"temporary", "permanent"}: + return state + return "permanent" if version.published_at else None + + def _version_is_user_locked(version: CampaignVersion) -> bool: - return bool(version.published_at) + return _version_user_lock_state(version) is not None def _version_is_validated_and_locked(version: CampaignVersion) -> bool: @@ -164,8 +174,11 @@ def _version_is_validated_and_locked(version: CampaignVersion) -> bool: def _ensure_version_validated_and_locked(version: CampaignVersion) -> None: - if _version_is_user_locked(version): - raise CampaignPersistenceError("User-locked audit-safe versions cannot be built, queued, dry-run or sent. Create an editable copy instead.") + state = _version_user_lock_state(version) + if state == "temporary": + raise CampaignPersistenceError("This version has a temporary user lock. Unlock it before building, queueing, dry-run or sending.") + if state == "permanent": + raise CampaignPersistenceError("This version is permanently user-locked. Create an editable copy instead.") if not _version_is_validated_and_locked(version): raise CampaignPersistenceError("Campaign version must be validated and locked before building, queueing, dry-run or sending.") @@ -190,14 +203,15 @@ def validate_campaign_version( campaign = session.get(Campaign, version.campaign_id) if not campaign or campaign.tenant_id != tenant_id: raise CampaignPersistenceError("Campaign version is not accessible for this tenant") - if version.published_at or version.workflow_state in { + if _version_is_user_locked(version) or version.workflow_state in { CampaignVersionWorkflowState.QUEUED.value, CampaignVersionWorkflowState.SENDING.value, CampaignVersionWorkflowState.COMPLETED.value, CampaignVersionWorkflowState.CANCELLED.value, CampaignVersionWorkflowState.ARCHIVED.value, }: - raise CampaignPersistenceError("Audit-safe/final campaign versions cannot be validated. Create an editable copy instead.") + lock_label = "temporarily user-locked" if _version_user_lock_state(version) == "temporary" else "permanently locked/final" + raise CampaignPersistenceError(f"{lock_label.capitalize()} campaign versions cannot be validated. Unlock or create an editable copy instead.") if check_files: with prepared_campaign_snapshot( @@ -289,7 +303,7 @@ def _job_from_message( "bounce_to": [item.model_dump(mode="json") for item in message.bounce_to], "disposition_notification_to": [item.model_dump(mode="json") for item in message.disposition_notification_to], }, - resolved_attachments=[item.model_dump(mode="json") for item in message.attachments], + resolved_attachments=[public_attachment_summary_payload(item) for item in message.attachments], issues_snapshot=[item.model_dump(mode="json") for item in message.issues], last_error="; ".join(issue.message for issue in message.issues if issue.severity == "error") or None, ) @@ -326,6 +340,11 @@ def build_campaign_version( result = build_campaign_messages(managed_config, campaign_file=prepared.path, output_dir=output_dir, write_eml=write_eml) annotate_built_messages_with_managed_files(result.built_messages, prepared.managed_files_by_local_path) report_json = result.report.model_dump(mode="json", by_alias=True) + for message_payload, message in zip(report_json.get("messages", []), result.report.messages, strict=False): + if isinstance(message_payload, dict): + message_payload["attachments"] = [public_attachment_summary_payload(item) for item in message.attachments] + report_json["built_at"] = datetime.now(UTC).isoformat() + report_json["build_token"] = uuid4().hex report_json.update({ "built_count": result.report.built_count, "build_failed_count": result.report.build_failed_count, @@ -338,6 +357,9 @@ def build_campaign_version( "queueable_count": result.report.queueable_count, }) version.build_summary = report_json + editor_state = copy.deepcopy(version.editor_state or {}) + editor_state.pop("review_send", None) + version.editor_state = editor_state # Rebuild jobs for the current version. Later, protect sent jobs from destructive rebuilds. session.query(CampaignIssue).filter(CampaignIssue.campaign_version_id == version.id, CampaignIssue.job_id.is_not(None)).delete(synchronize_session=False) diff --git a/server/app/mailer/persistence/versions.py b/server/app/mailer/persistence/versions.py index 0c1c315..0672ec0 100644 --- a/server/app/mailer/persistence/versions.py +++ b/server/app/mailer/persistence/versions.py @@ -4,6 +4,7 @@ import copy from datetime import UTC, datetime from pathlib import Path from typing import Any +from uuid import uuid4 from sqlalchemy import func from sqlalchemy.orm import Session @@ -33,6 +34,26 @@ class LockedCampaignVersionError(CampaignPersistenceError): """Raised when a caller tries to edit an immutable campaign version.""" +USER_LOCK_TEMPORARY = "temporary" +USER_LOCK_PERMANENT = "permanent" +USER_LOCK_STATES = {USER_LOCK_TEMPORARY, USER_LOCK_PERMANENT} + + +def campaign_version_user_lock_state(version: CampaignVersion) -> str | None: + """Return the explicit user-lock state with backwards compatibility. + + Older databases represented a permanent user lock only through + published_at. Treat those rows as permanent until the migration has + backfilled the explicit state. + """ + + state = getattr(version, "user_lock_state", None) + if state in USER_LOCK_STATES: + return state + if version.published_at: + return USER_LOCK_PERMANENT + return None + def minimal_campaign_json(*, external_id: str, name: str, description: str | None = None) -> dict[str, Any]: """Return a WebUI-friendly starter campaign JSON. @@ -227,9 +248,13 @@ LOCKED_WORKFLOW_STATES = { def is_version_locked(version: CampaignVersion) -> bool: - """Return True when a version is immutable and edits must fork.""" + """Return True when a version is immutable and edits must fork/unlock.""" - return bool(version.locked_at or version.workflow_state in LOCKED_WORKFLOW_STATES) + return bool( + version.locked_at + or campaign_version_user_lock_state(version) + or version.workflow_state in LOCKED_WORKFLOW_STATES + ) def _apply_campaign_metadata(campaign: Campaign, raw_json: dict[str, Any]) -> None: @@ -312,16 +337,24 @@ def is_version_final_locked(version: CampaignVersion) -> bool: } -def is_user_locked_version(version: CampaignVersion) -> bool: - """Return True when a user explicitly locked a version as an audit-safe snapshot.""" +def is_temporary_user_locked_version(version: CampaignVersion) -> bool: + return campaign_version_user_lock_state(version) == USER_LOCK_TEMPORARY - return bool(version.published_at) + +def is_permanent_user_locked_version(version: CampaignVersion) -> bool: + return campaign_version_user_lock_state(version) == USER_LOCK_PERMANENT + + +def is_user_locked_version(version: CampaignVersion) -> bool: + """Return True for either reversible or permanent user-requested locks.""" + + return campaign_version_user_lock_state(version) is not None def is_audit_safe_version(version: CampaignVersion) -> bool: """Return True when a version is immutable and cannot be unlocked.""" - return is_user_locked_version(version) or is_version_final_locked(version) + return is_permanent_user_locked_version(version) or is_version_final_locked(version) def is_version_validated_and_locked(version: CampaignVersion) -> bool: @@ -349,8 +382,10 @@ def unlock_validated_campaign_version( campaign = session.get(Campaign, campaign_id) assert campaign is not None - if is_user_locked_version(version): - raise LockedCampaignVersionError("This version was locked as an audit-safe snapshot and cannot be unlocked. Create an editable copy instead.") + if is_temporary_user_locked_version(version): + raise LockedCampaignVersionError("This version has a temporary user lock. Remove that lock before unlocking validation.") + if is_permanent_user_locked_version(version): + raise LockedCampaignVersionError("This version is permanently locked and cannot be unlocked. Create an editable copy instead.") if is_version_final_locked(version): raise LockedCampaignVersionError("This version is already queued/sent/final and cannot be unlocked. Create an editable copy instead.") @@ -370,6 +405,9 @@ def unlock_validated_campaign_version( version.locked_by_user_id = None version.validation_summary = None version.build_summary = None + editor_state = copy.deepcopy(version.editor_state or {}) + editor_state.pop("review_send", None) + version.editor_state = editor_state version.workflow_state = CampaignVersionWorkflowState.EDITING.value version.is_complete = False @@ -450,29 +488,176 @@ def update_campaign_version( return version -def publish_campaign_version( +def update_campaign_review_state( + session: Session, + *, + tenant_id: str, + campaign_id: str, + version_id: str, + inspection_complete: bool, + reviewed_message_keys: list[str], + user_id: str | None, +) -> CampaignVersion: + """Persist review acknowledgement without mutating the locked campaign data. + + Validation locks make the campaign JSON immutable, but review metadata is + operational state attached to a specific build. It is therefore stored in + editor_state and tied to the current build token so a rebuild invalidates it. + """ + + version = get_campaign_version_for_tenant( + session, + tenant_id=tenant_id, + campaign_id=campaign_id, + version_id=version_id, + ) + if is_version_final_locked(version): + raise LockedCampaignVersionError("Delivery has started; message review state can no longer be changed.") + build_summary = version.build_summary if isinstance(version.build_summary, dict) else {} + if not build_summary: + raise CampaignPersistenceError("Build messages before recording review state.") + build_token = str(build_summary.get("build_token") or build_summary.get("built_at") or "").strip() + if not build_token: + # Backwards-compatible upgrade for build summaries created before + # review-state tokens were introduced. + build_token = uuid4().hex + build_summary = copy.deepcopy(build_summary) + build_summary["build_token"] = build_token + version.build_summary = build_summary + + editor_state = copy.deepcopy(version.editor_state or {}) + editor_state["review_send"] = { + "build_token": build_token, + "inspection_complete": bool(inspection_complete), + "reviewed_message_keys": list(dict.fromkeys(str(value) for value in reviewed_message_keys if str(value).strip())), + "updated_at": datetime.now(UTC).isoformat(), + "updated_by_user_id": user_id, + } + version.editor_state = editor_state + session.add(version) + session.commit() + return version + + +def lock_campaign_version_temporarily( + session: Session, + *, + tenant_id: str, + campaign_id: str, + version_id: str, + user_id: str | None, +) -> CampaignVersion: + """Apply a reversible user-requested lock without changing workflow state.""" + + version = get_campaign_version_for_tenant( + session, + tenant_id=tenant_id, + campaign_id=campaign_id, + version_id=version_id, + ) + if is_version_final_locked(version): + raise LockedCampaignVersionError("Delivery/final versions are permanently locked and cannot receive a temporary user lock.") + if is_permanent_user_locked_version(version): + raise LockedCampaignVersionError("This version is already permanently locked.") + if is_temporary_user_locked_version(version): + return version + if version.locked_at: + raise LockedCampaignVersionError("This version is already temporarily locked by validation. Unlock validation before applying a user lock.") + + version.user_lock_state = USER_LOCK_TEMPORARY + version.user_locked_at = datetime.now(UTC) + version.user_locked_by_user_id = user_id + session.add(version) + session.commit() + return version + + +def unlock_user_locked_campaign_version( session: Session, *, tenant_id: str, campaign_id: str, version_id: str, ) -> CampaignVersion: - version = get_campaign_version_for_tenant(session, tenant_id=tenant_id, campaign_id=campaign_id, version_id=version_id) - campaign = session.get(Campaign, campaign_id) - assert campaign is not None - now = datetime.now(UTC) - version.workflow_state = CampaignVersionWorkflowState.ARCHIVED.value - version.published_at = now - if version.locked_at is None: - version.locked_at = now - campaign.current_version_id = version.id - campaign.status = CampaignStatus.ARCHIVED.value + """Remove a reversible user lock without invalidating campaign data.""" + + version = get_campaign_version_for_tenant( + session, + tenant_id=tenant_id, + campaign_id=campaign_id, + version_id=version_id, + ) + state = campaign_version_user_lock_state(version) + if state == USER_LOCK_PERMANENT: + raise LockedCampaignVersionError("Permanently locked versions cannot be unlocked. Create an editable copy instead.") + if state != USER_LOCK_TEMPORARY: + raise LockedCampaignVersionError("This version does not have a temporary user lock.") + if is_version_final_locked(version): + raise LockedCampaignVersionError("Delivery/final versions cannot be unlocked. Create an editable copy instead.") + + version.user_lock_state = None + version.user_locked_at = None + version.user_locked_by_user_id = None session.add(version) - session.add(campaign) session.commit() return version +def permanently_lock_campaign_version( + session: Session, + *, + tenant_id: str, + campaign_id: str, + version_id: str, + user_id: str | None, +) -> CampaignVersion: + """Apply an irreversible user lock. + + The version remains in its current workflow state so the campaign itself is + not silently archived. Future changes must be made in an editable copy. + """ + + version = get_campaign_version_for_tenant( + session, + tenant_id=tenant_id, + campaign_id=campaign_id, + version_id=version_id, + ) + if is_version_final_locked(version): + raise LockedCampaignVersionError("This version is already permanently locked by its delivery/final state.") + if is_permanent_user_locked_version(version): + return version + + now = datetime.now(UTC) + version.user_lock_state = USER_LOCK_PERMANENT + version.user_locked_at = now + version.user_locked_by_user_id = user_id + # Retain published_at as a compatibility marker for existing integrations. + version.published_at = version.published_at or now + session.add(version) + session.commit() + return version + + +def publish_campaign_version( + session: Session, + *, + tenant_id: str, + campaign_id: str, + version_id: str, + user_id: str | None = None, +) -> CampaignVersion: + """Backwards-compatible alias for the permanent user lock.""" + + return permanently_lock_campaign_version( + session, + tenant_id=tenant_id, + campaign_id=campaign_id, + version_id=version_id, + user_id=user_id, + ) + + def validate_campaign_partial(raw_json: dict[str, Any], *, section: str | None = None) -> dict[str, Any]: """Lightweight UI-facing validation for incomplete campaign working copies. diff --git a/server/app/mailer/sending/jobs.py b/server/app/mailer/sending/jobs.py index 78fc13a..0da7458 100644 --- a/server/app/mailer/sending/jobs.py +++ b/server/app/mailer/sending/jobs.py @@ -131,8 +131,15 @@ QUEUEABLE_VALIDATION_STATUSES = { } +def _version_user_lock_state(version: CampaignVersion) -> str | None: + state = getattr(version, "user_lock_state", None) + if state in {"temporary", "permanent"}: + return state + return "permanent" if version.published_at else None + + def _version_is_user_locked(version: CampaignVersion) -> bool: - return bool(version.published_at) + return _version_user_lock_state(version) is not None def _version_is_validated_and_locked(version: CampaignVersion) -> bool: @@ -141,8 +148,11 @@ def _version_is_validated_and_locked(version: CampaignVersion) -> bool: def _ensure_version_validated_and_locked(version: CampaignVersion) -> None: - if _version_is_user_locked(version): - raise QueueingError("User-locked audit-safe versions cannot be queued, dry-run or sent. Create an editable copy instead.") + state = _version_user_lock_state(version) + if state == "temporary": + raise QueueingError("This version has a temporary user lock. Unlock it before queueing, dry-run or sending.") + if state == "permanent": + raise QueueingError("This version is permanently user-locked. Create an editable copy instead.") if not _version_is_validated_and_locked(version): raise QueueingError("Campaign version must be validated and locked before building, queueing, dry-run or sending.") diff --git a/server/app/storage/campaign_attachments.py b/server/app/storage/campaign_attachments.py index d8b31c0..a0fb8bb 100644 --- a/server/app/storage/campaign_attachments.py +++ b/server/app/storage/campaign_attachments.py @@ -280,6 +280,42 @@ def managed_match_payloads( return payloads +def public_attachment_summary_payload(value: Any) -> dict[str, Any]: + """Return an attachment summary without temporary materialization paths. + + Managed builds use isolated local directories internally. Queue, review and + audit payloads must expose the stable managed paths and immutable IDs + instead of those temporary paths. Legacy filesystem attachments remain + unchanged for backwards compatibility. + """ + + if hasattr(value, "model_dump"): + payload = value.model_dump(mode="json") + elif isinstance(value, dict): + payload = copy.deepcopy(value) + else: + return {} + + managed_matches = payload.get("managed_matches") + if not isinstance(managed_matches, list) or not managed_matches: + return payload + + logical_matches: list[str] = [] + for item in managed_matches: + if not isinstance(item, dict): + continue + display_path = str(item.get("display_path") or item.get("relative_path") or item.get("filename") or "").strip() + if display_path: + logical_matches.append(display_path) + + payload["matches"] = logical_matches + # These values point into a deleted temporary materialization directory. + # The named source plus managed match metadata are the stable references. + payload["base_path"] = None + payload["directory"] = payload.get("base_path_name") or "managed" + return payload + + def annotate_built_messages_with_managed_files( built_messages: list[Any], manifest: dict[str, ManagedAttachmentFile], diff --git a/server/entrypoint.sh b/server/entrypoint.sh index f04a8c4..e9d274b 100644 --- a/server/entrypoint.sh +++ b/server/entrypoint.sh @@ -4,6 +4,10 @@ set -e ROLE="${APP_ROLE:-api}" if [ "$ROLE" = "api" ]; then + if [ "${RUN_DB_MIGRATIONS:-true}" = "true" ]; then + python -m app.db.migrate + fi + exec uvicorn app.main:app \ --host "${APP_HOST:-0.0.0.0}" \ --port "${APP_PORT:-8000}" \ diff --git a/server/multimailer-dev.db b/server/multimailer-dev.db index c9d8e4a..0370f63 100644 Binary files a/server/multimailer-dev.db and b/server/multimailer-dev.db differ diff --git a/server/tests/test_api_smoke.py b/server/tests/test_api_smoke.py index e83b65d..de63eea 100644 --- a/server/tests/test_api_smoke.py +++ b/server/tests/test_api_smoke.py @@ -160,6 +160,67 @@ class ApiSmokeTests(unittest.TestCase): self.assertEqual(sorted(bundle.namelist()), ["inbox/report-copy.txt", "inbox/report.txt"]) self.assertEqual(bundle.read("inbox/report.txt"), b"first report") + def test_temporary_and_permanent_user_lock_lifecycle(self) -> None: + headers, _ = self._login() + created = self.client.post( + "/api/v1/campaigns/new", + headers=headers, + json={"external_id": "lock-lifecycle", "name": "Lock lifecycle"}, + ) + self.assertEqual(created.status_code, 200, created.text) + campaign_id = created.json()["campaign"]["id"] + version_id = created.json()["version"]["id"] + + temporary = self.client.post( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}/lock-temporarily", + headers=headers, + ) + self.assertEqual(temporary.status_code, 200, temporary.text) + self.assertEqual(temporary.json()["user_lock_state"], "temporary") + self.assertTrue(temporary.json()["user_locked_at"]) + + blocked_update = self.client.put( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}", + headers=headers, + json={"current_step": "fields"}, + ) + self.assertEqual(blocked_update.status_code, 409, blocked_update.text) + + unlocked = self.client.post( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}/unlock-user-lock", + headers=headers, + ) + self.assertEqual(unlocked.status_code, 200, unlocked.text) + self.assertIsNone(unlocked.json()["user_lock_state"]) + + updated = self.client.put( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}", + headers=headers, + json={"current_step": "fields"}, + ) + self.assertEqual(updated.status_code, 200, updated.text) + self.assertEqual(updated.json()["current_step"], "fields") + + relocked = self.client.post( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}/lock-temporarily", + headers=headers, + ) + self.assertEqual(relocked.status_code, 200, relocked.text) + + permanent = self.client.post( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}/lock-permanently", + headers=headers, + ) + self.assertEqual(permanent.status_code, 200, permanent.text) + self.assertEqual(permanent.json()["user_lock_state"], "permanent") + self.assertTrue(permanent.json()["published_at"]) + + refused_unlock = self.client.post( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}/unlock-user-lock", + headers=headers, + ) + self.assertEqual(refused_unlock.status_code, 409, refused_unlock.text) + def test_campaign_create_validate_build_and_mock_send(self) -> None: headers, _ = self._login() campaign_json = { @@ -377,10 +438,59 @@ class ApiSmokeTests(unittest.TestCase): self.assertEqual(built.status_code, 200, built.text) self.assertEqual(built.json()["built_count"], 1) self.assertEqual(built.json()["messages"][0]["attachment_count"], 2) + self.assertTrue(built.json().get("built_at")) + self.assertTrue(built.json().get("build_token")) self.assertEqual( sum(len(item["managed_matches"]) for item in built.json()["messages"][0]["attachments"]), 2, ) + resolved_paths = { + match + for attachment in built.json()["messages"][0]["attachments"] + for match in attachment["matches"] + } + self.assertEqual(resolved_paths, { + "invoices/archive/202605-010001-report.XLSX", + "invoices/202605-010001-90100010-9601741.XLSX", + }) + self.assertFalse(any("multimailer-managed-build" in value for value in resolved_paths)) + + jobs = self.client.get( + f"/api/v1/campaigns/{campaign_id}/jobs", + headers=headers, + params={"version_id": version_id}, + ) + self.assertEqual(jobs.status_code, 200, jobs.text) + self.assertEqual(len(jobs.json()["jobs"]), 1) + job = jobs.json()["jobs"][0] + self.assertEqual(job["campaign_version_id"], version_id) + self.assertEqual(job["resolved_recipients"]["to"][0]["email"], "recipient@example.org") + self.assertEqual( + { + match + for attachment in job["attachments"] + for match in attachment["matches"] + }, + resolved_paths, + ) + + review_state = self.client.post( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}/review-state", + headers=headers, + json={"inspection_complete": True, "reviewed_message_keys": ["recipient-1"]}, + ) + self.assertEqual(review_state.status_code, 200, review_state.text) + stored_review = review_state.json()["editor_state"]["review_send"] + self.assertTrue(stored_review["inspection_complete"]) + self.assertEqual(stored_review["reviewed_message_keys"], ["recipient-1"]) + self.assertEqual(stored_review["build_token"], built.json()["build_token"]) + + reloaded_version = self.client.get( + f"/api/v1/campaigns/{campaign_id}/versions/{version_id}", + headers=headers, + ) + self.assertEqual(reloaded_version.status_code, 200, reloaded_version.text) + self.assertTrue(reloaded_version.json()["editor_state"]["review_send"]["inspection_complete"]) mocked = self.client.post( f"/api/v1/campaigns/{campaign_id}/mock-send", diff --git a/server/tests/test_database_migrations.py b/server/tests/test_database_migrations.py new file mode 100644 index 0000000..fe5fbd8 --- /dev/null +++ b/server/tests/test_database_migrations.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path + +from alembic import command +from alembic.runtime.migration import MigrationContext +from sqlalchemy import create_engine, inspect + +from app.db.base import Base +from app.db.migrations import ( + REVISION_AUTH_RBAC, + REVISION_FILE_FOLDERS, + alembic_config, + migrate_database, +) + + +class DatabaseMigrationTests(unittest.TestCase): + def test_repairs_create_all_schema_drift_and_upgrades_to_head(self) -> None: + with tempfile.TemporaryDirectory(prefix="msm-migration-test-") as directory: + database = Path(directory) / "legacy.db" + url = f"sqlite:///{database}" + + # Reproduce the historical development database: Alembic was run + # through auth/RBAC, then create_all() created later file tables + # without advancing alembic_version and without altering the + # already-existing campaign_versions table. + command.upgrade(alembic_config(database_url=url), REVISION_AUTH_RBAC) + engine = create_engine(url) + try: + Base.metadata.create_all(bind=engine) + with engine.connect() as connection: + self.assertEqual( + MigrationContext.configure(connection).get_current_revision(), + REVISION_AUTH_RBAC, + ) + self.assertIn("file_blobs", inspect(connection).get_table_names()) + self.assertNotIn( + "user_lock_state", + {column["name"] for column in inspect(connection).get_columns("campaign_versions")}, + ) + finally: + engine.dispose() + + result = migrate_database(database_url=url) + self.assertEqual(result.previous_revision, REVISION_AUTH_RBAC) + self.assertEqual(result.reconciled_revision, REVISION_FILE_FOLDERS) + + engine = create_engine(url) + try: + with engine.connect() as connection: + current = MigrationContext.configure(connection).get_current_revision() + columns = { + column["name"] + for column in inspect(connection).get_columns("campaign_versions") + } + self.assertEqual(current, result.current_revision) + self.assertIn("user_lock_state", columns) + self.assertIn("user_locked_at", columns) + self.assertIn("user_locked_by_user_id", columns) + finally: + engine.dispose()