diff --git a/.env.example b/.env.example index dbd768b..b265eed 100644 --- a/.env.example +++ b/.env.example @@ -33,6 +33,16 @@ S3_SECRET_ACCESS_KEY=multimailer-dev-secret-change-me GARAGE_S3_PORT=3900 GARAGE_ADMIN_PORT=3903 +# Managed file storage. Development uses the local filesystem; production can +# use FILE_STORAGE_BACKEND=s3 with Garage-compatible credentials. +FILE_STORAGE_BACKEND=local +FILE_STORAGE_LOCAL_ROOT=runtime/files +FILE_STORAGE_S3_ENDPOINT_URL=http://garage:3900 +FILE_STORAGE_S3_REGION=garage +FILE_STORAGE_S3_BUCKET=files +FILE_STORAGE_S3_ACCESS_KEY_ID=GKmultimailerdev0000000000000000 +FILE_STORAGE_S3_SECRET_ACCESS_KEY=multimailer-dev-secret-change-me + # Crypto: required before storing real SMTP/IMAP credentials. # Generate: # python -c "import os,base64; print(base64.b64encode(os.urandom(32)).decode())" diff --git a/server/alembic/versions/3d4e5f6a7b8c_file_storage_backend.py b/server/alembic/versions/3d4e5f6a7b8c_file_storage_backend.py new file mode 100644 index 0000000..862da74 --- /dev/null +++ b/server/alembic/versions/3d4e5f6a7b8c_file_storage_backend.py @@ -0,0 +1,152 @@ +"""file storage backend + +Revision ID: 3d4e5f6a7b8c +Revises: 2c3d4e5f6a7b +Create Date: 2026-06-12 00:00:00.000000 +""" +from __future__ import annotations + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "3d4e5f6a7b8c" +down_revision: Union[str, None] = "2c3d4e5f6a7b" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "file_blobs", + sa.Column("id", sa.String(length=36), nullable=False), + sa.Column("tenant_id", sa.String(length=36), nullable=False), + sa.Column("storage_backend", sa.String(length=50), nullable=False), + sa.Column("storage_bucket", sa.String(length=255), nullable=True), + sa.Column("storage_key", sa.String(length=1000), nullable=False), + sa.Column("checksum_sha256", sa.String(length=64), nullable=False), + sa.Column("size_bytes", sa.Integer(), nullable=False), + sa.Column("content_type", sa.String(length=255), nullable=True), + sa.Column("ref_count", sa.Integer(), nullable=False, server_default="1"), + sa.Column("retained_until", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("tenant_id", "checksum_sha256", "size_bytes", name="uq_file_blobs_tenant_checksum_size"), + ) + op.create_index(op.f("ix_file_blobs_tenant_id"), "file_blobs", ["tenant_id"]) + op.create_index(op.f("ix_file_blobs_checksum_sha256"), "file_blobs", ["checksum_sha256"]) + + op.create_table( + "file_assets", + sa.Column("id", sa.String(length=36), nullable=False), + sa.Column("tenant_id", sa.String(length=36), nullable=False), + sa.Column("owner_type", sa.String(length=20), nullable=False), + sa.Column("owner_user_id", sa.String(length=36), nullable=True), + sa.Column("owner_group_id", sa.String(length=36), nullable=True), + sa.Column("current_version_id", sa.String(length=36), nullable=True), + sa.Column("display_path", sa.String(length=1000), nullable=False), + sa.Column("filename", sa.String(length=500), nullable=False), + sa.Column("description", sa.Text(), nullable=True), + sa.Column("created_by_user_id", sa.String(length=36), nullable=True), + sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("metadata", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(["created_by_user_id"], ["users.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["owner_group_id"], ["groups.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["owner_user_id"], ["users.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + for col in ["tenant_id", "owner_type", "owner_user_id", "owner_group_id", "current_version_id", "display_path", "filename", "created_by_user_id", "deleted_at"]: + op.create_index(op.f(f"ix_file_assets_{col}"), "file_assets", [col]) + + op.create_table( + "file_versions", + sa.Column("id", sa.String(length=36), nullable=False), + sa.Column("tenant_id", sa.String(length=36), nullable=False), + sa.Column("file_asset_id", sa.String(length=36), nullable=False), + sa.Column("blob_id", sa.String(length=36), nullable=False), + sa.Column("version_number", sa.Integer(), nullable=False), + sa.Column("filename_at_upload", sa.String(length=500), nullable=False), + sa.Column("display_path_at_upload", sa.String(length=1000), nullable=False), + sa.Column("content_type", sa.String(length=255), nullable=True), + sa.Column("size_bytes", sa.Integer(), nullable=False), + sa.Column("checksum_sha256", sa.String(length=64), nullable=False), + sa.Column("created_by_user_id", sa.String(length=36), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(["blob_id"], ["file_blobs.id"], ondelete="RESTRICT"), + sa.ForeignKeyConstraint(["created_by_user_id"], ["users.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["file_asset_id"], ["file_assets.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("file_asset_id", "version_number", name="uq_file_versions_asset_number"), + ) + for col in ["tenant_id", "file_asset_id", "blob_id", "checksum_sha256", "created_by_user_id"]: + op.create_index(op.f(f"ix_file_versions_{col}"), "file_versions", [col]) + + op.create_table( + "file_shares", + sa.Column("id", sa.String(length=36), nullable=False), + sa.Column("tenant_id", sa.String(length=36), nullable=False), + sa.Column("file_asset_id", sa.String(length=36), nullable=False), + sa.Column("target_type", sa.String(length=20), nullable=False), + sa.Column("target_id", sa.String(length=36), nullable=False), + sa.Column("permission", sa.String(length=20), nullable=False, server_default="read"), + sa.Column("created_by_user_id", sa.String(length=36), nullable=True), + sa.Column("revoked_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(["created_by_user_id"], ["users.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["file_asset_id"], ["file_assets.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("file_asset_id", "target_type", "target_id", "revoked_at", name="uq_file_shares_active_target"), + ) + for col in ["tenant_id", "file_asset_id", "target_type", "target_id", "created_by_user_id", "revoked_at"]: + op.create_index(op.f(f"ix_file_shares_{col}"), "file_shares", [col]) + + op.create_table( + "campaign_attachment_uses", + sa.Column("id", sa.String(length=36), nullable=False), + sa.Column("tenant_id", sa.String(length=36), nullable=False), + sa.Column("campaign_id", sa.String(length=36), nullable=False), + sa.Column("campaign_version_id", sa.String(length=36), nullable=False), + sa.Column("campaign_job_id", sa.String(length=36), nullable=True), + sa.Column("entry_index", sa.Integer(), nullable=True), + sa.Column("entry_id", sa.String(length=255), nullable=True), + sa.Column("file_asset_id", sa.String(length=36), nullable=False), + sa.Column("file_version_id", sa.String(length=36), nullable=False), + sa.Column("file_blob_id", sa.String(length=36), nullable=False), + sa.Column("filename_used", sa.String(length=500), nullable=False), + sa.Column("checksum_sha256", sa.String(length=64), nullable=False), + sa.Column("size_bytes", sa.Integer(), nullable=False), + sa.Column("content_type", sa.String(length=255), nullable=True), + sa.Column("use_stage", sa.String(length=20), nullable=False, server_default="built"), + sa.Column("used_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(["campaign_id"], ["campaigns.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["campaign_job_id"], ["campaign_jobs.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["campaign_version_id"], ["campaign_versions.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["file_asset_id"], ["file_assets.id"], ondelete="RESTRICT"), + sa.ForeignKeyConstraint(["file_blob_id"], ["file_blobs.id"], ondelete="RESTRICT"), + sa.ForeignKeyConstraint(["file_version_id"], ["file_versions.id"], ondelete="RESTRICT"), + sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("campaign_job_id", "file_version_id", "filename_used", "use_stage", name="uq_campaign_attachment_uses_job_file_stage"), + ) + for col in ["tenant_id", "campaign_id", "campaign_version_id", "campaign_job_id", "entry_id", "file_asset_id", "file_version_id", "file_blob_id", "use_stage", "used_at"]: + op.create_index(op.f(f"ix_campaign_attachment_uses_{col}"), "campaign_attachment_uses", [col]) + + +def downgrade() -> None: + op.drop_table("campaign_attachment_uses") + op.drop_table("file_shares") + op.drop_table("file_versions") + op.drop_table("file_assets") + op.drop_table("file_blobs") diff --git a/server/alembic/versions/4e5f6a7b8c9d_file_folders.py b/server/alembic/versions/4e5f6a7b8c9d_file_folders.py new file mode 100644 index 0000000..afed0f3 --- /dev/null +++ b/server/alembic/versions/4e5f6a7b8c9d_file_folders.py @@ -0,0 +1,45 @@ +"""file folders + +Revision ID: 4e5f6a7b8c9d +Revises: 3d4e5f6a7b8c +Create Date: 2026-06-12 00:30:00.000000 +""" +from __future__ import annotations + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "4e5f6a7b8c9d" +down_revision: Union[str, None] = "3d4e5f6a7b8c" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "file_folders", + sa.Column("id", sa.String(length=36), nullable=False), + sa.Column("tenant_id", sa.String(length=36), nullable=False), + sa.Column("owner_type", sa.String(length=20), nullable=False), + sa.Column("owner_user_id", sa.String(length=36), nullable=True), + sa.Column("owner_group_id", sa.String(length=36), nullable=True), + sa.Column("path", sa.String(length=1000), nullable=False), + sa.Column("created_by_user_id", sa.String(length=36), nullable=True), + sa.Column("deleted_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("metadata", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.ForeignKeyConstraint(["created_by_user_id"], ["users.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["owner_group_id"], ["groups.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["owner_user_id"], ["users.id"], ondelete="SET NULL"), + sa.ForeignKeyConstraint(["tenant_id"], ["tenants.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + for col in ["tenant_id", "owner_type", "owner_user_id", "owner_group_id", "path", "created_by_user_id", "deleted_at"]: + op.create_index(op.f(f"ix_file_folders_{col}"), "file_folders", [col]) + + +def downgrade() -> None: + op.drop_table("file_folders") diff --git a/server/app/api/v1/__init__.py b/server/app/api/v1/__init__.py index b0cec4e..573e085 100644 --- a/server/app/api/v1/__init__.py +++ b/server/app/api/v1/__init__.py @@ -6,6 +6,8 @@ from .campaigns import router as campaigns_router from .audit import router as audit_router from .system import router as system_router from .mail import router as mail_router +from .files import router as files_router +from .dev_mail import router as dev_mail_router router = APIRouter(prefix="/api/v1") router.include_router(auth_router) @@ -14,3 +16,5 @@ router.include_router(admin_router) router.include_router(audit_router) router.include_router(system_router) router.include_router(mail_router) +router.include_router(files_router) +router.include_router(dev_mail_router) diff --git a/server/app/api/v1/campaigns.py b/server/app/api/v1/campaigns.py index b258208..c1f2ace 100644 --- a/server/app/api/v1/campaigns.py +++ b/server/app/api/v1/campaigns.py @@ -1,7 +1,11 @@ from __future__ import annotations +import copy +import re + from fastapi import APIRouter, Depends, HTTPException, Response, status from sqlalchemy.orm import Session +from pydantic import BaseModel, Field from app.api.v1.schemas import ( BuildCampaignRequest, @@ -34,12 +38,14 @@ from app.mailer.persistence.campaigns import ( create_campaign_version_from_json, validate_campaign_version, ) +from app.storage.services import list_assets_for_user, resolve_patterns from app.mailer.persistence.versions import ( LockedCampaignVersionError, create_minimal_campaign, fork_campaign_version_for_edit, is_version_final_locked, is_user_locked_version, + is_version_locked, get_campaign_version_for_tenant, publish_campaign_version, unlock_validated_campaign_version, @@ -67,6 +73,37 @@ def _get_version_for_tenant(session: Session, version_id: str, tenant_id: str) - return version + + +def _sync_campaign_metadata_to_current_version(session: Session, campaign: Campaign) -> None: + """Keep editable version JSON aligned with version-independent campaign metadata. + + Campaign metadata can be edited from the overview while individual campaign + sections save the current version JSON later. Without this sync, a later + version save can re-apply stale `campaign.name` / `campaign.id` values from + raw_json and make the old overview metadata appear to come back. Audit-safe + or validation-locked versions are left untouched. + """ + + if not campaign.current_version_id: + return + + version = session.get(CampaignVersion, campaign.current_version_id) + if not version or version.campaign_id != campaign.id or is_version_locked(version): + return + + raw_json = copy.deepcopy(version.raw_json if isinstance(version.raw_json, dict) else {}) + campaign_section = raw_json.get("campaign") if isinstance(raw_json.get("campaign"), dict) else {} + raw_json["campaign"] = { + **campaign_section, + "id": campaign.external_id, + "name": campaign.name, + "description": campaign.description or "", + } + version.raw_json = raw_json + session.add(version) + + @router.post("", response_model=CampaignCreateResponse) def create_campaign( payload: CampaignCreateRequest, @@ -187,6 +224,8 @@ def update_campaign_metadata_endpoint( campaign.status = payload.status if payload.description is not None: campaign.description = payload.description + + _sync_campaign_metadata_to_current_version(session, campaign) session.add(campaign) session.commit() session.refresh(campaign) @@ -690,7 +729,10 @@ from app.api.v1.schemas import ( QueueCampaignResponse, SendCampaignNowRequest, SendCampaignNowResponse, + MockCampaignSendRequest, + MockCampaignSendResponse, ) +from app.mailer.dev.mock_campaign import MockCampaignSendError, run_mock_campaign_send from app.mailer.sending.jobs import ( QueueingError, cancel_campaign_jobs, @@ -734,6 +776,55 @@ def queue_campaign( raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc +@router.post("/{campaign_id}/mock-send", response_model=MockCampaignSendResponse) +def mock_send_campaign( + campaign_id: str, + payload: MockCampaignSendRequest | None = None, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("campaign:send")), +): + """Run a fully visible mock delivery flow without mutating campaign state. + + The route validates and builds the selected version, then optionally records + mock SMTP deliveries and mock IMAP appends. It never talks to the configured + real SMTP/IMAP servers and it does not mark the version sent/final. + """ + + payload = payload or MockCampaignSendRequest() + try: + result = run_mock_campaign_send( + session, + tenant_id=principal.tenant_id, + campaign_id=campaign_id, + version_id=payload.version_id, + send=payload.send, + include_warnings=payload.include_warnings, + include_needs_review=payload.include_needs_review, + append_sent=payload.append_sent, + clear_mailbox=payload.clear_mailbox, + check_files=payload.check_files, + ) + audit_from_principal( + session, + principal, + action="campaign.mock_send" if payload.send else "campaign.mock_send_review", + object_type="campaign", + object_id=campaign_id, + details={ + "version_id": result.get("version_id"), + "send_requested": payload.send, + "sent_count": result.get("send", {}).get("sent_count"), + "failed_count": result.get("send", {}).get("failed_count"), + }, + commit=True, + ) + return MockCampaignSendResponse(result=result) + except MockCampaignSendError as exc: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc + except Exception as exc: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc + + @router.post("/{campaign_id}/send-now", response_model=SendCampaignNowResponse) def send_campaign_now_endpoint( campaign_id: str, @@ -756,7 +847,7 @@ def send_campaign_now_endpoint( version = _get_version_for_tenant(session, version_id, principal.tenant_id) validation_result: dict[str, object] | None = version.validation_summary if isinstance(version.validation_summary, dict) else None - build_result: dict[str, object] | None = None + build_result: dict[str, object] | None = version.build_summary if isinstance(version.build_summary, dict) else None if is_user_locked_version(version): raise HTTPException( status_code=status.HTTP_409_CONFLICT, @@ -767,7 +858,7 @@ def send_campaign_now_endpoint( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Campaign version must be validated and locked before dry-run or sending.", ) - if not version.build_summary: + if not build_result: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Campaign version must be built before dry-run or sending.", @@ -874,3 +965,129 @@ def append_sent( return CampaignActionResponse(result=result) except QueueingError as exc: raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)) from exc + + +class CampaignAttachmentPreviewRequest(BaseModel): + include_unmatched: bool = True + + +class CampaignAttachmentPreviewResponse(BaseModel): + campaign_id: str + version_id: str + shared_file_count: int + rules: list[dict[str, object]] = Field(default_factory=list) + unused_shared_files: list[dict[str, object]] = Field(default_factory=list) + + +_BRACE_PLACEHOLDER_RE = re.compile(r"(? str: + def value_for(raw_key: str) -> str: + key = raw_key.strip() + if key.startswith("local:"): + value = entry_fields.get(key.removeprefix("local:"), "") + elif key.startswith("local."): + value = entry_fields.get(key.removeprefix("local."), "") + elif key.startswith("global:"): + value = global_values.get(key.removeprefix("global:"), "") + elif key.startswith("global."): + value = global_values.get(key.removeprefix("global."), "") + else: + value = entry_fields.get(key, global_values.get(key, "")) + return "" if value is None else str(value) + + rendered = _BRACE_PLACEHOLDER_RE.sub(lambda match: value_for(match.group(1)), template) + return _DOLLAR_PLACEHOLDER_RE.sub(lambda match: value_for(match.group(1)), rendered) + + +def _rule_pattern(rule: dict[str, object], base_path_names: set[str], *, global_values: dict[str, object], entry_fields: dict[str, object]) -> str: + base_dir = _preview_render_template(str(rule.get("base_dir") or "."), global_values=global_values, entry_fields=entry_fields).strip().strip("/") + file_filter = _preview_render_template(str(rule.get("file_filter") or "*"), global_values=global_values, entry_fields=entry_fields).strip() or "*" + if not base_dir or base_dir == "." or base_dir in base_path_names: + return file_filter + return f"{base_dir}/{file_filter}" + + +def _file_preview(asset) -> dict[str, object]: + return { + "id": asset.id, + "display_path": asset.display_path, + "filename": asset.filename, + "owner_type": asset.owner_type, + "owner_id": asset.owner_user_id if asset.owner_type == "user" else asset.owner_group_id, + } + + +@router.post("/{campaign_id}/versions/{version_id}/attachments/preview", response_model=CampaignAttachmentPreviewResponse) +def preview_campaign_attachments( + campaign_id: str, + version_id: str, + payload: CampaignAttachmentPreviewRequest | None = None, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:read")), +): + campaign = _get_campaign_for_tenant(session, campaign_id, principal.tenant_id) + version = _get_version_for_tenant(session, version_id, principal.tenant_id) + if version.campaign_id != campaign.id: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Campaign version not found") + + raw = version.raw_json if isinstance(version.raw_json, dict) else {} + attachments = raw.get("attachments") if isinstance(raw.get("attachments"), dict) else {} + base_paths = attachments.get("base_paths") if isinstance(attachments.get("base_paths"), list) else [] + base_path_names = {str(item.get("name")) for item in base_paths if isinstance(item, dict) and item.get("name")} + global_values = raw.get("global_values") if isinstance(raw.get("global_values"), dict) else {} + rules: list[dict[str, object]] = [] + + global_rules = attachments.get("global") if isinstance(attachments.get("global"), list) else [] + for index, rule in enumerate(global_rules): + if not isinstance(rule, dict): + continue + rules.append({ + "source": "global", + "index": index, + "label": rule.get("label"), + "required": bool(rule.get("required", True)), + "pattern": _rule_pattern(rule, base_path_names, global_values=global_values, entry_fields={}), + }) + + entries = raw.get("entries") if isinstance(raw.get("entries"), dict) else {} + inline_entries = entries.get("inline") if isinstance(entries.get("inline"), list) else [] + for entry_index, entry in enumerate(inline_entries, start=1): + if not isinstance(entry, dict): + continue + entry_fields = entry.get("fields") if isinstance(entry.get("fields"), dict) else {} + entry_rules = entry.get("attachments") if isinstance(entry.get("attachments"), list) else [] + for rule_index, rule in enumerate(entry_rules): + if not isinstance(rule, dict): + continue + rules.append({ + "source": "entry", + "entry_index": entry_index, + "entry_id": entry.get("id"), + "index": rule_index, + "label": rule.get("label"), + "required": bool(rule.get("required", True)), + "pattern": _rule_pattern(rule, base_path_names, global_values=global_values, entry_fields=entry_fields), + }) + + shared_assets = list_assets_for_user( + session, + tenant_id=principal.tenant_id, + user_id=principal.user.id, + campaign_id=campaign.id, + is_admin=principal.user.is_tenant_admin or "*" in set(principal.scopes or []), + ) + resolved, unmatched = resolve_patterns(shared_assets, [str(rule["pattern"]) for rule in rules]) + for rule, result in zip(rules, resolved, strict=False): + rule["matches"] = [_file_preview(asset) for asset in result.matches] + rule["match_count"] = len(result.matches) + + return CampaignAttachmentPreviewResponse( + campaign_id=campaign.id, + version_id=version.id, + shared_file_count=len(shared_assets), + rules=rules, + unused_shared_files=[_file_preview(asset) for asset in unmatched] if payload is None or payload.include_unmatched else [], + ) diff --git a/server/app/api/v1/dev_mail.py b/server/app/api/v1/dev_mail.py new file mode 100644 index 0000000..2469a64 --- /dev/null +++ b/server/app/api/v1/dev_mail.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel, ConfigDict, Field + +from app.auth.dependencies import ApiPrincipal, require_scope +from app.mailer.dev.mock_mailbox import ( + clear_records, + get_failures, + get_record, + list_records, + set_failures, +) + +router = APIRouter(prefix="/dev/mailbox", tags=["dev-mailbox"]) + + +class MockMailboxListResponse(BaseModel): + messages: list[dict[str, Any]] + + +class MockMailboxMessageResponse(BaseModel): + message: dict[str, Any] + + +class MockMailboxClearResponse(BaseModel): + deleted_count: int + + +class MockMailboxFailureConfig(BaseModel): + model_config = ConfigDict(extra="forbid") + + fail_next_smtp: bool | None = None + fail_next_imap: bool | None = None + smtp_reject_recipients_containing: str | None = Field(default=None, max_length=255) + + +class MockMailboxFailureResponse(BaseModel): + config: dict[str, Any] + + +@router.get("/messages", response_model=MockMailboxListResponse) +def list_mock_mailbox_messages( + kind: str | None = None, + limit: int = 100, + principal: ApiPrincipal = Depends(require_scope("campaign:read")), +): + """List messages captured by the integrated development mail sandbox.""" + + del principal + return MockMailboxListResponse(messages=list_records(kind=kind, limit=limit)) + + +@router.get("/messages/{message_id}", response_model=MockMailboxMessageResponse) +def get_mock_mailbox_message( + message_id: str, + principal: ApiPrincipal = Depends(require_scope("campaign:read")), +): + del principal + message = get_record(message_id, include_raw=True) + if not message: + raise HTTPException(status_code=404, detail="Mock mailbox message not found") + return MockMailboxMessageResponse(message=message) + + +@router.delete("/messages", response_model=MockMailboxClearResponse) +def clear_mock_mailbox_messages( + principal: ApiPrincipal = Depends(require_scope("campaign:write")), +): + del principal + return MockMailboxClearResponse(deleted_count=clear_records()) + + +@router.get("/failures", response_model=MockMailboxFailureResponse) +def get_mock_failure_config( + principal: ApiPrincipal = Depends(require_scope("campaign:read")), +): + del principal + return MockMailboxFailureResponse(config=get_failures()) + + +@router.post("/failures", response_model=MockMailboxFailureResponse) +def update_mock_failure_config( + payload: MockMailboxFailureConfig, + principal: ApiPrincipal = Depends(require_scope("campaign:write")), +): + del principal + config = set_failures( + fail_next_smtp=payload.fail_next_smtp, + fail_next_imap=payload.fail_next_imap, + smtp_reject_recipients_containing=payload.smtp_reject_recipients_containing, + ) + return MockMailboxFailureResponse(config=config) diff --git a/server/app/api/v1/files.py b/server/app/api/v1/files.py new file mode 100644 index 0000000..b70b0bb --- /dev/null +++ b/server/app/api/v1/files.py @@ -0,0 +1,641 @@ +from __future__ import annotations + +from io import BytesIO +from typing import Any, Literal + +from fastapi import APIRouter, Depends, File as FastAPIFile, Form, HTTPException, UploadFile, status +from fastapi.responses import StreamingResponse +from pydantic import BaseModel, Field +from sqlalchemy.orm import Session + +from app.auth.dependencies import ApiPrincipal, require_scope +from app.db.models import Campaign, FileAsset, FileFolder, FileShare, Group +from app.db.session import get_session +from app.storage.paths import UnsafeFilePathError, filename_from_path, normalize_folder, normalize_logical_path +from app.storage.services import ( + FileStorageError, + asset_is_audit_relevant, + build_rename_preview, + create_file_asset, + create_folder, + create_zip_bytes, + current_version_and_blob, + extract_zip_upload, + get_asset_for_user, + list_assets_for_user, + list_folders_for_user, + rename_asset, + resolve_patterns, + share_file, + soft_delete_assets, + soft_delete_folder, + user_group_ids, + read_asset_bytes, +) + +router = APIRouter(prefix="/files", tags=["files"]) + + +class FileSpaceResponse(BaseModel): + id: str + label: str + owner_type: Literal["user", "group"] + owner_id: str + description: str | None = None + + +class FileSpacesResponse(BaseModel): + spaces: list[FileSpaceResponse] + + +class FileShareResponse(BaseModel): + id: str + target_type: str + target_id: str + permission: str + created_at: str + revoked_at: str | None = None + + +class FileAssetResponse(BaseModel): + id: str + tenant_id: str + owner_type: str + owner_id: str + display_path: str + filename: str + description: str | None = None + size_bytes: int + content_type: str | None = None + checksum_sha256: str + version_id: str + created_at: str + updated_at: str + deleted_at: str | None = None + audit_relevant: bool = False + metadata: dict[str, Any] | None = None + shares: list[FileShareResponse] = Field(default_factory=list) + + +class FileFolderResponse(BaseModel): + id: str + tenant_id: str + owner_type: str + owner_id: str + path: str + created_at: str + updated_at: str + deleted_at: str | None = None + + +class FileFoldersResponse(BaseModel): + folders: list[FileFolderResponse] + + +class FileFolderCreateRequest(BaseModel): + owner_type: Literal["user", "group"] + owner_id: str + path: str + + +class FileFolderDeleteRequest(BaseModel): + owner_type: Literal["user", "group"] + owner_id: str + path: str + recursive: bool = True + + +class FileFolderDeleteResponse(BaseModel): + deleted_folders: int + deleted_files: int + + +class FileListResponse(BaseModel): + files: list[FileAssetResponse] + + +class FileUploadResponse(BaseModel): + files: list[FileAssetResponse] + + +class BulkDeleteRequest(BaseModel): + file_ids: list[str] + + +class BulkDeleteResponse(BaseModel): + deleted_count: int + + +class FileShareRequest(BaseModel): + target_type: Literal["user", "group", "campaign", "tenant"] + target_id: str + permission: Literal["read", "write", "manage"] = "read" + + +class RenameRequest(BaseModel): + file_ids: list[str] + mode: Literal["prefix", "suffix", "replace"] + find: str | None = None + replacement: str = "" + prefix: str = "" + suffix: str = "" + dry_run: bool = True + + +class RenamePreviewItem(BaseModel): + file_id: str + old_path: str + new_path: str + + +class RenameResponse(BaseModel): + dry_run: bool + items: list[RenamePreviewItem] + + +class ArchiveRequest(BaseModel): + file_ids: list[str] + filename: str = "files.zip" + + +class PatternResolveRequest(BaseModel): + patterns: list[str] + owner_type: Literal["user", "group"] | None = None + owner_id: str | None = None + campaign_id: str | None = None + path_prefix: str | None = None + include_unmatched: bool = True + + +class PatternMatchResponse(BaseModel): + pattern: str + matches: list[FileAssetResponse] + + +class PatternResolveResponse(BaseModel): + patterns: list[PatternMatchResponse] + unmatched: list[FileAssetResponse] = Field(default_factory=list) + + +def _is_admin(principal: ApiPrincipal) -> bool: + return principal.user.is_tenant_admin or "*" in set(principal.scopes or []) + + +def _http_error(exc: Exception, *, not_found: bool = False) -> HTTPException: + code = status.HTTP_404_NOT_FOUND if not_found else status.HTTP_400_BAD_REQUEST + return HTTPException(status_code=code, detail=str(exc)) + + +def _owner_id(asset: FileAsset) -> str: + return asset.owner_user_id if asset.owner_type == "user" else asset.owner_group_id # type: ignore[return-value] + + +def _asset_response(session: Session, asset: FileAsset, *, include_shares: bool = False) -> FileAssetResponse: + version, blob = current_version_and_blob(session, asset) + shares: list[FileShareResponse] = [] + if include_shares: + rows = session.query(FileShare).filter(FileShare.file_asset_id == asset.id).order_by(FileShare.created_at.desc()).all() + shares = [ + FileShareResponse( + id=row.id, + target_type=row.target_type, + target_id=row.target_id, + permission=row.permission, + created_at=row.created_at.isoformat(), + revoked_at=row.revoked_at.isoformat() if row.revoked_at else None, + ) + for row in rows + ] + return FileAssetResponse( + id=asset.id, + tenant_id=asset.tenant_id, + owner_type=asset.owner_type, + owner_id=_owner_id(asset), + display_path=asset.display_path, + filename=asset.filename, + description=asset.description, + size_bytes=blob.size_bytes, + content_type=blob.content_type, + checksum_sha256=blob.checksum_sha256, + version_id=version.id, + created_at=asset.created_at.isoformat(), + updated_at=asset.updated_at.isoformat(), + deleted_at=asset.deleted_at.isoformat() if asset.deleted_at else None, + audit_relevant=asset_is_audit_relevant(session, asset), + metadata=asset.metadata_ or {}, + shares=shares, + ) + + +def _folder_owner_id(folder: FileFolder) -> str: + return folder.owner_user_id if folder.owner_type == "user" else folder.owner_group_id # type: ignore[return-value] + + +def _folder_response(folder: FileFolder) -> FileFolderResponse: + return FileFolderResponse( + id=folder.id, + tenant_id=folder.tenant_id, + owner_type=folder.owner_type, + owner_id=_folder_owner_id(folder), + path=folder.path, + created_at=folder.created_at.isoformat(), + updated_at=folder.updated_at.isoformat(), + deleted_at=folder.deleted_at.isoformat() if folder.deleted_at else None, + ) + + +def _ensure_list_owner_access(session: Session, principal: ApiPrincipal, owner_type: str | None, owner_id: str | None) -> None: + if not owner_type: + return + if owner_type == "user" and owner_id and owner_id != principal.user.id and not _is_admin(principal): + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No access to this user file space") + if owner_type == "group" and owner_id: + try: + from app.storage.services import ensure_group_access + + ensure_group_access( + session, + tenant_id=principal.tenant_id, + group_id=owner_id, + user_id=principal.user.id, + is_admin=_is_admin(principal), + ) + except FileStorageError as exc: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(exc)) from exc + + +@router.get("/spaces", response_model=FileSpacesResponse) +def list_file_spaces( + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:read")), +): + spaces = [ + FileSpaceResponse( + id=f"user:{principal.user.id}", + label="My files", + owner_type="user", + owner_id=principal.user.id, + description="Files owned by your user account.", + ) + ] + group_ids = user_group_ids(session, tenant_id=principal.tenant_id, user_id=principal.user.id, include_admin_groups=_is_admin(principal)) + if group_ids: + groups = session.query(Group).filter(Group.tenant_id == principal.tenant_id, Group.id.in_(group_ids)).order_by(Group.name.asc()).all() + spaces.extend( + FileSpaceResponse( + id=f"group:{group.id}", + label=f"{group.name} files", + owner_type="group", + owner_id=group.id, + description="Files owned by this group.", + ) + for group in groups + ) + return FileSpacesResponse(spaces=spaces) + + +@router.get("/folders", response_model=FileFoldersResponse) +def list_file_folders( + owner_type: Literal["user", "group"], + owner_id: str, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:read")), +): + try: + folders = list_folders_for_user( + session, + tenant_id=principal.tenant_id, + user_id=principal.user.id, + owner_type=owner_type, + owner_id=owner_id, + is_admin=_is_admin(principal), + ) + return FileFoldersResponse(folders=[_folder_response(folder) for folder in folders]) + except FileStorageError as exc: + raise _http_error(exc) from exc + + +@router.post("/folders", response_model=FileFolderResponse) +def create_file_folder( + payload: FileFolderCreateRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + try: + folder = create_folder( + session, + tenant_id=principal.tenant_id, + owner_type=payload.owner_type, + owner_id=payload.owner_id, + user_id=principal.user.id, + path=payload.path, + is_admin=_is_admin(principal), + ) + session.commit() + return _folder_response(folder) + except (FileStorageError, UnsafeFilePathError, ValueError) as exc: + session.rollback() + raise _http_error(exc) from exc + + +@router.post("/folders/delete", response_model=FileFolderDeleteResponse) +def delete_file_folder( + payload: FileFolderDeleteRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + try: + deleted_folders, deleted_files = soft_delete_folder( + session, + tenant_id=principal.tenant_id, + owner_type=payload.owner_type, + owner_id=payload.owner_id, + user_id=principal.user.id, + path=payload.path, + recursive=payload.recursive, + is_admin=_is_admin(principal), + ) + session.commit() + return FileFolderDeleteResponse(deleted_folders=deleted_folders, deleted_files=deleted_files) + except (FileStorageError, UnsafeFilePathError, ValueError) as exc: + session.rollback() + raise _http_error(exc) from exc + + +@router.get("", response_model=FileListResponse) +def list_files( + owner_type: Literal["user", "group"] | None = None, + owner_id: str | None = None, + campaign_id: str | None = None, + path_prefix: str | None = None, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:read")), +): + _ensure_list_owner_access(session, principal, owner_type, owner_id) + assets = list_assets_for_user( + session, + tenant_id=principal.tenant_id, + user_id=principal.user.id, + owner_type=owner_type, + owner_id=owner_id, + campaign_id=campaign_id, + path_prefix=path_prefix, + is_admin=_is_admin(principal), + ) + return FileListResponse(files=[_asset_response(session, asset, include_shares=True) for asset in assets]) + + +@router.post("/upload", response_model=FileUploadResponse) +async def upload_files( + files: list[UploadFile] = FastAPIFile(...), + owner_type: Literal["user", "group"] = Form(default="user"), + owner_id: str | None = Form(default=None), + path: str = Form(default=""), + campaign_id: str | None = Form(default=None), + unpack_zip: bool = Form(default=False), + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + target_owner = owner_id or principal.user.id + uploaded_assets: list[FileAsset] = [] + try: + for upload in files: + data = await upload.read() + filename = upload.filename or "file" + content_type = upload.content_type or None + if unpack_zip and filename.lower().endswith(".zip"): + extracted = extract_zip_upload( + session, + tenant_id=principal.tenant_id, + owner_type=owner_type, + owner_id=target_owner, + user_id=principal.user.id, + zip_data=data, + folder=path, + campaign_id=campaign_id, + is_admin=_is_admin(principal), + ) + uploaded_assets.extend(item.asset for item in extracted) + continue + stored = create_file_asset( + session, + tenant_id=principal.tenant_id, + owner_type=owner_type, + owner_id=target_owner, + user_id=principal.user.id, + filename=filename, + data=data, + folder=path, + content_type=content_type, + campaign_id=campaign_id, + is_admin=_is_admin(principal), + ) + uploaded_assets.append(stored.asset) + session.commit() + except (FileStorageError, UnsafeFilePathError, ValueError) as exc: + session.rollback() + raise _http_error(exc) from exc + return FileUploadResponse(files=[_asset_response(session, asset, include_shares=True) for asset in uploaded_assets]) + + +@router.post("/upload-zip", response_model=FileUploadResponse) +async def upload_zip( + file: UploadFile = FastAPIFile(...), + owner_type: Literal["user", "group"] = Form(default="user"), + owner_id: str | None = Form(default=None), + path: str = Form(default=""), + campaign_id: str | None = Form(default=None), + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + data = await file.read() + target_owner = owner_id or principal.user.id + try: + extracted = extract_zip_upload( + session, + tenant_id=principal.tenant_id, + owner_type=owner_type, + owner_id=target_owner, + user_id=principal.user.id, + zip_data=data, + folder=path, + campaign_id=campaign_id, + is_admin=_is_admin(principal), + ) + session.commit() + except (FileStorageError, UnsafeFilePathError, ValueError) as exc: + session.rollback() + raise _http_error(exc) from exc + return FileUploadResponse(files=[_asset_response(session, item.asset, include_shares=True) for item in extracted]) + + +@router.get("/{file_id}", response_model=FileAssetResponse) +def get_file( + file_id: str, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:read")), +): + try: + asset = get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, is_admin=_is_admin(principal)) + return _asset_response(session, asset, include_shares=True) + except FileStorageError as exc: + raise _http_error(exc, not_found=True) from exc + + +@router.get("/{file_id}/download") +def download_file( + file_id: str, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:read")), +): + try: + asset = get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, is_admin=_is_admin(principal)) + data, _, blob = read_asset_bytes(session, asset) + except FileStorageError as exc: + raise _http_error(exc, not_found=True) from exc + headers = {"Content-Disposition": f'attachment; filename="{asset.filename}"'} + return StreamingResponse(BytesIO(data), media_type=blob.content_type or "application/octet-stream", headers=headers) + + +@router.delete("/{file_id}", response_model=BulkDeleteResponse) +def delete_file( + file_id: str, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + try: + asset = get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal)) + count = soft_delete_assets(session, [asset]) + session.commit() + return BulkDeleteResponse(deleted_count=count) + except FileStorageError as exc: + session.rollback() + raise _http_error(exc, not_found=True) from exc + + +@router.post("/bulk-delete", response_model=BulkDeleteResponse) +def bulk_delete_files( + payload: BulkDeleteRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + try: + assets = [ + get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal)) + for file_id in payload.file_ids + ] + count = soft_delete_assets(session, assets) + session.commit() + return BulkDeleteResponse(deleted_count=count) + except FileStorageError as exc: + session.rollback() + raise _http_error(exc) from exc + + +@router.post("/{file_id}/shares", response_model=FileShareResponse) +def create_share( + file_id: str, + payload: FileShareRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + try: + asset = get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal)) + share = share_file( + session, + tenant_id=principal.tenant_id, + asset=asset, + target_type=payload.target_type, + target_id=payload.target_id, + permission=payload.permission, + user_id=principal.user.id, + ) + session.commit() + return FileShareResponse( + id=share.id, + target_type=share.target_type, + target_id=share.target_id, + permission=share.permission, + created_at=share.created_at.isoformat(), + revoked_at=share.revoked_at.isoformat() if share.revoked_at else None, + ) + except FileStorageError as exc: + session.rollback() + raise _http_error(exc) from exc + + +@router.post("/bulk-rename", response_model=RenameResponse) +def bulk_rename( + payload: RenameRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:write")), +): + try: + assets = [ + get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, require_write=True, is_admin=_is_admin(principal)) + for file_id in payload.file_ids + ] + previews = [ + RenamePreviewItem( + file_id=asset.id, + old_path=asset.display_path, + new_path=normalize_logical_path(build_rename_preview(asset, mode=payload.mode, find=payload.find, replacement=payload.replacement, prefix=payload.prefix, suffix=payload.suffix)), + ) + for asset in assets + ] + if not payload.dry_run: + by_id = {asset.id: asset for asset in assets} + for item in previews: + rename_asset(by_id[item.file_id], new_path=item.new_path) + session.add(by_id[item.file_id]) + session.commit() + return RenameResponse(dry_run=payload.dry_run, items=previews) + except (FileStorageError, UnsafeFilePathError, ValueError) as exc: + session.rollback() + raise _http_error(exc) from exc + + +@router.post("/archive.zip") +def download_archive( + payload: ArchiveRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:read")), +): + try: + assets = [ + get_asset_for_user(session, tenant_id=principal.tenant_id, user_id=principal.user.id, asset_id=file_id, is_admin=_is_admin(principal)) + for file_id in payload.file_ids + ] + data = create_zip_bytes(session, assets) + except FileStorageError as exc: + raise _http_error(exc) from exc + filename = filename_from_path(normalize_logical_path(payload.filename, fallback_filename="files.zip")) + headers = {"Content-Disposition": f'attachment; filename="{filename}"'} + return StreamingResponse(BytesIO(data), media_type="application/zip", headers=headers) + + +@router.post("/resolve-patterns", response_model=PatternResolveResponse) +def resolve_file_patterns( + payload: PatternResolveRequest, + session: Session = Depends(get_session), + principal: ApiPrincipal = Depends(require_scope("attachments:read")), +): + _ensure_list_owner_access(session, principal, payload.owner_type, payload.owner_id) + try: + assets = list_assets_for_user( + session, + tenant_id=principal.tenant_id, + user_id=principal.user.id, + owner_type=payload.owner_type, + owner_id=payload.owner_id, + campaign_id=payload.campaign_id, + path_prefix=payload.path_prefix, + is_admin=_is_admin(principal), + ) + resolved, unmatched = resolve_patterns(assets, payload.patterns, base_path=payload.path_prefix) + return PatternResolveResponse( + patterns=[PatternMatchResponse(pattern=item.pattern, matches=[_asset_response(session, asset) for asset in item.matches]) for item in resolved], + unmatched=[_asset_response(session, asset) for asset in unmatched] if payload.include_unmatched else [], + ) + except (FileStorageError, UnsafeFilePathError, ValueError) as exc: + raise _http_error(exc) from exc diff --git a/server/app/api/v1/schemas.py b/server/app/api/v1/schemas.py index b7cc698..7ec3d9a 100644 --- a/server/app/api/v1/schemas.py +++ b/server/app/api/v1/schemas.py @@ -227,6 +227,22 @@ class SendCampaignNowResponse(BaseModel): result: dict[str, Any] +class MockCampaignSendRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + version_id: str | None = None + send: bool = False + include_warnings: bool = True + include_needs_review: bool = False + append_sent: bool = True + clear_mailbox: bool = False + check_files: bool = False + + +class MockCampaignSendResponse(BaseModel): + result: dict[str, Any] + + class AppendSentRequest(BaseModel): model_config = ConfigDict(extra="forbid") diff --git a/server/app/db/models.py b/server/app/db/models.py index 1d41040..3e4cc05 100644 --- a/server/app/db/models.py +++ b/server/app/db/models.py @@ -351,6 +351,107 @@ class AttachmentInstance(Base, TimestampMixin): metadata_: Mapped[dict[str, Any] | None] = mapped_column("metadata", JSON, nullable=True) +class FileBlob(Base, TimestampMixin): + __tablename__ = "file_blobs" + __table_args__ = (UniqueConstraint("tenant_id", "checksum_sha256", "size_bytes", name="uq_file_blobs_tenant_checksum_size"),) + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=new_uuid) + tenant_id: Mapped[str] = mapped_column(ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False, index=True) + storage_backend: Mapped[str] = mapped_column(String(50), nullable=False) + storage_bucket: Mapped[str | None] = mapped_column(String(255)) + storage_key: Mapped[str] = mapped_column(String(1000), nullable=False) + checksum_sha256: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + size_bytes: Mapped[int] = mapped_column(Integer, nullable=False) + content_type: Mapped[str | None] = mapped_column(String(255)) + ref_count: Mapped[int] = mapped_column(Integer, default=1, nullable=False) + retained_until: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + + +class FileFolder(Base, TimestampMixin): + __tablename__ = "file_folders" + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=new_uuid) + tenant_id: Mapped[str] = mapped_column(ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False, index=True) + owner_type: Mapped[str] = mapped_column(String(20), nullable=False, index=True) + owner_user_id: Mapped[str | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True) + owner_group_id: Mapped[str | None] = mapped_column(ForeignKey("groups.id", ondelete="SET NULL"), nullable=True, index=True) + path: Mapped[str] = mapped_column(String(1000), nullable=False, index=True) + created_by_user_id: Mapped[str | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True) + deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True) + metadata_: Mapped[dict[str, Any] | None] = mapped_column("metadata", JSON, nullable=True) + + +class FileAsset(Base, TimestampMixin): + __tablename__ = "file_assets" + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=new_uuid) + tenant_id: Mapped[str] = mapped_column(ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False, index=True) + owner_type: Mapped[str] = mapped_column(String(20), nullable=False, index=True) + owner_user_id: Mapped[str | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True) + owner_group_id: Mapped[str | None] = mapped_column(ForeignKey("groups.id", ondelete="SET NULL"), nullable=True, index=True) + current_version_id: Mapped[str | None] = mapped_column(String(36), nullable=True, index=True) + display_path: Mapped[str] = mapped_column(String(1000), nullable=False, index=True) + filename: Mapped[str] = mapped_column(String(500), nullable=False, index=True) + description: Mapped[str | None] = mapped_column(Text) + created_by_user_id: Mapped[str | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True) + deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True) + metadata_: Mapped[dict[str, Any] | None] = mapped_column("metadata", JSON, nullable=True) + + +class FileVersion(Base, TimestampMixin): + __tablename__ = "file_versions" + __table_args__ = (UniqueConstraint("file_asset_id", "version_number", name="uq_file_versions_asset_number"),) + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=new_uuid) + tenant_id: Mapped[str] = mapped_column(ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False, index=True) + file_asset_id: Mapped[str] = mapped_column(ForeignKey("file_assets.id", ondelete="CASCADE"), nullable=False, index=True) + blob_id: Mapped[str] = mapped_column(ForeignKey("file_blobs.id", ondelete="RESTRICT"), nullable=False, index=True) + version_number: Mapped[int] = mapped_column(Integer, nullable=False) + filename_at_upload: Mapped[str] = mapped_column(String(500), nullable=False) + display_path_at_upload: Mapped[str] = mapped_column(String(1000), nullable=False) + content_type: Mapped[str | None] = mapped_column(String(255)) + size_bytes: Mapped[int] = mapped_column(Integer, nullable=False) + checksum_sha256: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + created_by_user_id: Mapped[str | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True) + + +class FileShare(Base, TimestampMixin): + __tablename__ = "file_shares" + __table_args__ = (UniqueConstraint("file_asset_id", "target_type", "target_id", "revoked_at", name="uq_file_shares_active_target"),) + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=new_uuid) + tenant_id: Mapped[str] = mapped_column(ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False, index=True) + file_asset_id: Mapped[str] = mapped_column(ForeignKey("file_assets.id", ondelete="CASCADE"), nullable=False, index=True) + target_type: Mapped[str] = mapped_column(String(20), nullable=False, index=True) + target_id: Mapped[str] = mapped_column(String(36), nullable=False, index=True) + permission: Mapped[str] = mapped_column(String(20), default="read", nullable=False) + created_by_user_id: Mapped[str | None] = mapped_column(ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True) + revoked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True) + + +class CampaignAttachmentUse(Base, TimestampMixin): + __tablename__ = "campaign_attachment_uses" + __table_args__ = (UniqueConstraint("campaign_job_id", "file_version_id", "filename_used", "use_stage", name="uq_campaign_attachment_uses_job_file_stage"),) + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=new_uuid) + tenant_id: Mapped[str] = mapped_column(ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False, index=True) + campaign_id: Mapped[str] = mapped_column(ForeignKey("campaigns.id", ondelete="CASCADE"), nullable=False, index=True) + campaign_version_id: Mapped[str] = mapped_column(ForeignKey("campaign_versions.id", ondelete="CASCADE"), nullable=False, index=True) + campaign_job_id: Mapped[str | None] = mapped_column(ForeignKey("campaign_jobs.id", ondelete="SET NULL"), nullable=True, index=True) + entry_index: Mapped[int | None] = mapped_column(Integer) + entry_id: Mapped[str | None] = mapped_column(String(255), index=True) + file_asset_id: Mapped[str] = mapped_column(ForeignKey("file_assets.id", ondelete="RESTRICT"), nullable=False, index=True) + file_version_id: Mapped[str] = mapped_column(ForeignKey("file_versions.id", ondelete="RESTRICT"), nullable=False, index=True) + file_blob_id: Mapped[str] = mapped_column(ForeignKey("file_blobs.id", ondelete="RESTRICT"), nullable=False, index=True) + filename_used: Mapped[str] = mapped_column(String(500), nullable=False) + checksum_sha256: Mapped[str] = mapped_column(String(64), nullable=False) + size_bytes: Mapped[int] = mapped_column(Integer, nullable=False) + content_type: Mapped[str | None] = mapped_column(String(255)) + use_stage: Mapped[str] = mapped_column(String(20), default="built", nullable=False, index=True) + used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True) + + + class SendAttempt(Base, TimestampMixin): __tablename__ = "send_attempts" diff --git a/server/app/mailer/attachments/resolver.py b/server/app/mailer/attachments/resolver.py index fe581d1..87b7aa2 100644 --- a/server/app/mailer/attachments/resolver.py +++ b/server/app/mailer/attachments/resolver.py @@ -183,9 +183,14 @@ def _recipient_values(entry: EntryConfig) -> dict[str, str]: def _template_values(config: CampaignConfig, entry: EntryConfig) -> dict[str, Any]: values: dict[str, Any] = {} + for field in config.fields: + values.setdefault(field.name, "") + values.setdefault(f"global::{field.name}", "") + values.setdefault(f"local::{field.name}", "") for key, value in config.global_values.items(): values[f"global::{key}"] = value for key, value in effective_entry_field_values(config, entry).items(): + values[key] = value values[f"local::{key}"] = value if entry.id: values["local::id"] = entry.id diff --git a/server/app/mailer/campaign/models.py b/server/app/mailer/campaign/models.py index 3fa5f86..de5dccc 100644 --- a/server/app/mailer/campaign/models.py +++ b/server/app/mailer/campaign/models.py @@ -213,6 +213,7 @@ class AttachmentBasePathConfig(StrictModel): name: str path: str = "." allow_individual: bool = False + unsent_warning: bool = False # Legacy UI builds briefly wrote a source value. Keep accepting it so older # drafts do not become invalid merely because the current UI no longer shows # or edits that column. @@ -222,11 +223,24 @@ class AttachmentBasePathConfig(StrictModel): class AttachmentConfig(StrictModel): id: str | None = None label: str | None = None + # Legacy UI helper. Current attachment resolution ignores this value and + # treats direct files as plain file_filter patterns without wildcards. + # Keep accepting it so existing drafts with {"type": ""}, "direct" + # or "pattern" remain valid. + type_: str | None = Field(default=None, alias="type") base_dir: str file_filter: str include_subdirs: bool = False required: bool = True allow_multiple: bool = False + + @field_validator("type_", mode="before") + @classmethod + def empty_type_means_unset(cls, value: Any) -> Any: + if value == "": + return None + return value + # None means: inherit from validation_policy. Explicit values remain # supported for backwards compatibility and per-rule overrides. missing_behavior: Behavior | None = None @@ -335,6 +349,7 @@ class ValidationPolicy(StrictModel): missing_optional_attachment: Behavior = Behavior.WARN ambiguous_attachment_match: Behavior = Behavior.ASK ignore_empty_fields: bool = False + unsent_attachment_files: Behavior = Behavior.WARN missing_email: MissingAddressBehavior = MissingAddressBehavior.BLOCK template_error: MissingAddressBehavior = MissingAddressBehavior.BLOCK inactive_entry: InactiveEntryBehavior = InactiveEntryBehavior.DROP diff --git a/server/app/mailer/dev/__init__.py b/server/app/mailer/dev/__init__.py new file mode 100644 index 0000000..ceebce9 --- /dev/null +++ b/server/app/mailer/dev/__init__.py @@ -0,0 +1 @@ +"""Development-only mail sandbox helpers.""" diff --git a/server/app/mailer/dev/mock_campaign.py b/server/app/mailer/dev/mock_campaign.py new file mode 100644 index 0000000..1bdb495 --- /dev/null +++ b/server/app/mailer/dev/mock_campaign.py @@ -0,0 +1,280 @@ +from __future__ import annotations + +import json +import tempfile +from email import policy +from email.message import EmailMessage +from pathlib import Path +from typing import Any + +from sqlalchemy.orm import Session + +from app.db.models import Campaign, CampaignVersion +from app.mailer.campaign.loader import load_campaign_config +from app.mailer.campaign.validation import validate_campaign_config +from app.mailer.messages.builder import build_campaign_messages +from app.mailer.messages.models import MessageAddress, MessageDraft, MessageValidationStatus +from app.mailer.dev.mock_mailbox import ( + clear_records, + consume_fail_next_imap, + consume_fail_next_smtp, + get_failures, + list_records, + record_imap_append, + record_smtp_delivery, +) + + +class MockCampaignSendError(RuntimeError): + pass + + +def _message_address_payload(address: MessageAddress | None) -> dict[str, Any] | None: + if address is None: + return None + return {"email": address.email, "name": address.name} + + +def _message_addresses_payload(addresses: list[MessageAddress]) -> list[dict[str, Any]]: + return [{"email": item.email, "name": item.name} for item in addresses] + + +def _issue_payloads(message: MessageDraft) -> list[dict[str, Any]]: + return [issue.model_dump(mode="json") for issue in message.issues] + + +def _attachment_payloads(message: MessageDraft) -> list[dict[str, Any]]: + return [attachment.model_dump(mode="json") for attachment in message.attachments] + + +def _message_payload(message: MessageDraft) -> dict[str, Any]: + return { + "entry_index": message.entry_index, + "entry_id": message.entry_id, + "active": message.active, + "subject": message.subject, + "from": _message_address_payload(message.from_), + "to": _message_addresses_payload(message.to), + "cc": _message_addresses_payload(message.cc), + "bcc": _message_addresses_payload(message.bcc), + "reply_to": _message_addresses_payload(message.reply_to), + "build_status": str(message.build_status.value if hasattr(message.build_status, "value") else message.build_status), + "validation_status": message.validation_status.value, + "send_status": str(message.send_status.value if hasattr(message.send_status, "value") else message.send_status), + "imap_status": message.imap_status.value, + "attachment_count": message.attachment_count, + "attachments": _attachment_payloads(message), + "issues": _issue_payloads(message), + "eml_size_bytes": message.eml_size_bytes, + "queueable": message.is_queueable, + } + + +def _recipient_emails(message: MessageDraft) -> list[str]: + values = [item.email for item in message.to + message.cc + message.bcc if item.email] + return list(dict.fromkeys(values)) + + +def _envelope_from(message: MessageDraft, *, fallback: str = "mock-sender@mock.local") -> str: + if message.bounce_to: + return message.bounce_to[0].email + if message.from_ and message.from_.email: + return message.from_.email + return fallback + + +def _raw_message_bytes(message: EmailMessage) -> bytes: + return message.as_bytes(policy=policy.SMTP) + + +def _smtp_rejection_matches(recipients: list[str]) -> list[str]: + needle = (get_failures().get("smtp_reject_recipients_containing") or "").strip().lower() + if not needle: + return [] + return [recipient for recipient in recipients if needle in recipient.lower()] + + +def _can_mock_send( + message: MessageDraft, + *, + include_warnings: bool, + include_needs_review: bool, +) -> tuple[bool, str | None]: + if not message.active: + return False, "Recipient is inactive" + if str(message.build_status.value if hasattr(message.build_status, "value") else message.build_status) != "built": + return False, f"Message is not built ({message.build_status})" + if message.validation_status == MessageValidationStatus.READY: + return True, None + if message.validation_status == MessageValidationStatus.WARNING and include_warnings: + return True, None + if message.validation_status == MessageValidationStatus.NEEDS_REVIEW and include_needs_review: + return True, None + return False, f"Validation status is {message.validation_status.value}" + + +def run_mock_campaign_send( + session: Session, + *, + tenant_id: str, + campaign_id: str, + version_id: str | None = None, + send: bool = False, + include_warnings: bool = True, + include_needs_review: bool = False, + append_sent: bool = True, + clear_mailbox: bool = False, + check_files: bool = False, +) -> dict[str, Any]: + """Validate, build and optionally mock-send a version without mutating it. + + This is a dev/test route. It does not change campaign/version status, does + not queue real jobs and does not use the configured SMTP/IMAP servers. It + records mock SMTP deliveries and mock IMAP appends in the integrated mock + mailbox only when send=True. + """ + + campaign = session.query(Campaign).filter(Campaign.id == campaign_id, Campaign.tenant_id == tenant_id).one_or_none() + if not campaign: + raise MockCampaignSendError("Campaign not found or not accessible") + wanted_version_id = version_id or campaign.current_version_id + if not wanted_version_id: + raise MockCampaignSendError("Campaign has no current version") + version = session.get(CampaignVersion, wanted_version_id) + if not version or version.campaign_id != campaign.id: + raise MockCampaignSendError("Campaign version not found or not part of campaign") + + if clear_mailbox: + clear_records() + + with tempfile.TemporaryDirectory(prefix="multimailer-mock-send-") as temp_dir: + temp_path = Path(temp_dir) + campaign_path = temp_path / f"campaign-{version.id}.json" + campaign_path.write_text(json.dumps(version.raw_json, ensure_ascii=False, indent=2), encoding="utf-8") + config = load_campaign_config(campaign_path) + validation_report = validate_campaign_config(config, campaign_file=campaign_path, check_files=check_files) + build_result = build_campaign_messages(config, campaign_file=campaign_path, write_eml=False) + + send_results: list[dict[str, Any]] = [] + sent_count = 0 + failed_count = 0 + skipped_count = 0 + imap_appended_count = 0 + imap_failed_count = 0 + + for built in build_result.built_messages: + draft = built.draft + can_send, skip_reason = _can_mock_send(draft, include_warnings=include_warnings, include_needs_review=include_needs_review) + row: dict[str, Any] = { + "entry_index": draft.entry_index, + "entry_id": draft.entry_id, + "subject": draft.subject, + "validation_status": draft.validation_status.value, + "build_status": str(draft.build_status.value if hasattr(draft.build_status, "value") else draft.build_status), + "to": _message_addresses_payload(draft.to), + "attachments": _attachment_payloads(draft), + "issues": _issue_payloads(draft), + } + + if not can_send or built.mime is None: + skipped_count += 1 + row.update({"status": "skipped", "message": skip_reason or "Message has no MIME output"}) + send_results.append(row) + continue + + recipients = _recipient_emails(draft) + envelope_from = _envelope_from(draft) + if not recipients: + skipped_count += 1 + row.update({"status": "skipped", "message": "No envelope recipients"}) + send_results.append(row) + continue + + if not send: + row.update({"status": "ready", "message": f"Would send to {len(recipients)} recipient(s)", "envelope_from": envelope_from, "envelope_recipients": recipients}) + send_results.append(row) + continue + + try: + if consume_fail_next_smtp(): + raise MockCampaignSendError("Configured mock failure: next SMTP delivery fails") + rejected = _smtp_rejection_matches(recipients) + if rejected and len(rejected) == len(recipients): + raise MockCampaignSendError(f"Configured mock failure: all recipients rejected ({', '.join(rejected)})") + + accepted = [recipient for recipient in recipients if recipient not in rejected] + smtp_record = record_smtp_delivery(built.mime, envelope_from=envelope_from, envelope_recipients=accepted, smtp_host="mock.smtp.local") + sent_count += 1 + row.update({ + "status": "sent", + "message": f"Mock SMTP captured as {smtp_record.id}", + "smtp_message_id": smtp_record.id, + "envelope_from": envelope_from, + "envelope_recipients": accepted, + "refused_recipients": rejected, + }) + + if append_sent: + try: + if consume_fail_next_imap(): + raise MockCampaignSendError("Configured mock failure: next IMAP append fails") + folder = "Sent" + if config.delivery.imap_append_sent.folder and config.delivery.imap_append_sent.folder != "auto": + folder = config.delivery.imap_append_sent.folder + elif config.server.imap and config.server.imap.sent_folder and config.server.imap.sent_folder != "auto": + folder = config.server.imap.sent_folder + imap_record = record_imap_append(_raw_message_bytes(built.mime), folder=folder, imap_host="mock.imap.local") + imap_appended_count += 1 + row.update({"imap_status": "appended", "imap_message_id": imap_record.id, "imap_folder": folder}) + except Exception as exc: + imap_failed_count += 1 + row.update({"imap_status": "failed", "imap_error": str(exc)}) + + except Exception as exc: + failed_count += 1 + row.update({"status": "failed", "message": str(exc), "envelope_from": envelope_from, "envelope_recipients": recipients}) + + send_results.append(row) + + validation_json = validation_report.model_dump(mode="json") + validation_json.update({"ok": validation_report.ok, "error_count": validation_report.error_count, "warning_count": validation_report.warning_count}) + build_report = build_result.report + build_json = build_report.model_dump(mode="json") + build_json.update({ + "built_count": build_report.built_count, + "queueable_count": build_report.queueable_count, + "needs_review_count": build_report.needs_review_count, + "blocked_count": build_report.blocked_count, + "warning_count": build_report.warning_count, + "ready_count": build_report.ready_count, + "messages": [_message_payload(message) for message in build_report.messages], + }) + + attempted_count = sum(1 for row in send_results if row.get("status") in {"sent", "failed"}) + return { + "campaign_id": campaign.id, + "version_id": version.id, + "version_number": version.version_number, + "send_requested": send, + "include_warnings": include_warnings, + "include_needs_review": include_needs_review, + "append_sent": append_sent, + "steps": [ + {"key": "validate", "label": "Validate campaign JSON", "status": "ok" if validation_report.ok else "needs_review", "summary": validation_json}, + {"key": "build", "label": "Build messages", "status": "ok" if build_report.queueable_count else "needs_review", "summary": {"built": build_report.built_count, "queueable": build_report.queueable_count, "needs_review": build_report.needs_review_count, "blocked": build_report.blocked_count}}, + {"key": "send", "label": "Mock SMTP delivery", "status": "skipped" if not send else ("ok" if failed_count == 0 else "needs_review"), "summary": {"attempted": attempted_count, "sent": sent_count, "failed": failed_count, "skipped": skipped_count}}, + {"key": "imap", "label": "Mock IMAP Sent append", "status": "skipped" if not send or not append_sent else ("ok" if imap_failed_count == 0 else "needs_review"), "summary": {"appended": imap_appended_count, "failed": imap_failed_count}}, + ], + "validation": validation_json, + "build": build_json, + "send": { + "attempted_count": attempted_count, + "sent_count": sent_count, + "failed_count": failed_count, + "skipped_count": skipped_count, + "imap_appended_count": imap_appended_count, + "imap_failed_count": imap_failed_count, + "results": send_results, + }, + "mailbox": {"messages": list_records(limit=200)}, + } diff --git a/server/app/mailer/dev/mock_mailbox.py b/server/app/mailer/dev/mock_mailbox.py new file mode 100644 index 0000000..1182d58 --- /dev/null +++ b/server/app/mailer/dev/mock_mailbox.py @@ -0,0 +1,327 @@ +from __future__ import annotations + +import json +import re +import shutil +from dataclasses import dataclass +from datetime import datetime, timezone +from email import policy +from email.message import EmailMessage +from email.parser import BytesParser +from pathlib import Path +from typing import Any +from uuid import uuid4 + +from app.settings import settings + +MOCK_SMTP_HOSTS = {"mock", "mock.smtp", "mock.smtp.local", "mock-mail", "__mock_smtp__"} +MOCK_IMAP_HOSTS = {"mock", "mock.imap", "mock.imap.local", "mock-mail", "__mock_imap__"} +MOCK_IMAP_FOLDERS = [ + {"name": "INBOX", "flags": []}, + {"name": "Sent", "flags": ["\\Sent"]}, + {"name": "Drafts", "flags": ["\\Drafts"]}, + {"name": "Trash", "flags": ["\\Trash"]}, + {"name": "Archive", "flags": ["\\Archive"]}, +] + + +@dataclass(frozen=True, slots=True) +class MockDeliveryRecord: + id: str + kind: str + created_at: str + envelope_from: str | None + envelope_recipients: list[str] + subject: str | None + from_header: str | None + to_header: str | None + cc_header: str | None + bcc_header: str | None + message_id: str | None + size_bytes: int + body_preview: str | None + attachment_count: int + folder: str | None = None + smtp_host: str | None = None + imap_host: str | None = None + raw_filename: str | None = None + headers: dict[str, str] | None = None + attachments: list[dict[str, Any]] | None = None + + def as_dict(self, *, include_raw: bool = False) -> dict[str, Any]: + data = { + "id": self.id, + "kind": self.kind, + "created_at": self.created_at, + "envelope_from": self.envelope_from, + "envelope_recipients": self.envelope_recipients, + "subject": self.subject, + "from_header": self.from_header, + "to_header": self.to_header, + "cc_header": self.cc_header, + "bcc_header": self.bcc_header, + "message_id": self.message_id, + "size_bytes": self.size_bytes, + "body_preview": self.body_preview, + "attachment_count": self.attachment_count, + "folder": self.folder, + "smtp_host": self.smtp_host, + "imap_host": self.imap_host, + "raw_filename": self.raw_filename, + "headers": self.headers or {}, + "attachments": self.attachments or [], + } + if include_raw: + data["raw_eml"] = read_raw_message(self.id) + return data + + +def _base_dir() -> Path: + path = Path(settings.mock_mailbox_dir) + path.mkdir(parents=True, exist_ok=True) + (path / "messages").mkdir(parents=True, exist_ok=True) + return path + + +def _message_dir() -> Path: + return _base_dir() / "messages" + + +def _failure_path() -> Path: + return _base_dir() / "failures.json" + + +def normalize_mock_host(host: str | None) -> str: + return (host or "").strip().lower() + + +def is_mock_smtp_host(host: str | None) -> bool: + return normalize_mock_host(host) in MOCK_SMTP_HOSTS + + +def is_mock_imap_host(host: str | None) -> bool: + return normalize_mock_host(host) in MOCK_IMAP_HOSTS + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _message_to_bytes(message: EmailMessage | bytes) -> bytes: + if isinstance(message, bytes): + return message + return message.as_bytes(policy=policy.SMTP) + + +def _parse_message(raw: bytes) -> EmailMessage: + return BytesParser(policy=policy.default).parsebytes(raw) + + +def _header_text(message: EmailMessage, name: str) -> str | None: + value = message.get(name) + return str(value) if value is not None else None + + +def _body_preview(message: EmailMessage) -> str | None: + body = None + try: + if message.is_multipart(): + body = message.get_body(preferencelist=("plain", "html")) + else: + body = message + if body is None: + return None + content = body.get_content() + except Exception: + return None + text = re.sub(r"\s+", " ", str(content)).strip() + if not text: + return None + return text[:600] + + +def _attachment_summaries(message: EmailMessage) -> list[dict[str, Any]]: + attachments: list[dict[str, Any]] = [] + for part in message.iter_attachments(): + payload = part.get_payload(decode=True) or b"" + attachments.append( + { + "filename": part.get_filename(), + "content_type": part.get_content_type(), + "size_bytes": len(payload), + } + ) + return attachments + + +def _headers(message: EmailMessage) -> dict[str, str]: + return {str(key): str(value) for key, value in message.items()} + + +def _record_from_raw( + raw: bytes, + *, + kind: str, + envelope_from: str | None = None, + envelope_recipients: list[str] | None = None, + folder: str | None = None, + smtp_host: str | None = None, + imap_host: str | None = None, +) -> MockDeliveryRecord: + message_id = uuid4().hex + raw_filename = f"{message_id}.eml" + json_filename = f"{message_id}.json" + message = _parse_message(raw) + attachments = _attachment_summaries(message) + record = MockDeliveryRecord( + id=message_id, + kind=kind, + created_at=_now_iso(), + envelope_from=envelope_from, + envelope_recipients=list(envelope_recipients or []), + subject=_header_text(message, "Subject"), + from_header=_header_text(message, "From"), + to_header=_header_text(message, "To"), + cc_header=_header_text(message, "Cc"), + bcc_header=_header_text(message, "Bcc"), + message_id=_header_text(message, "Message-ID"), + size_bytes=len(raw), + body_preview=_body_preview(message), + attachment_count=len(attachments), + folder=folder, + smtp_host=smtp_host, + imap_host=imap_host, + raw_filename=raw_filename, + headers=_headers(message), + attachments=attachments, + ) + message_dir = _message_dir() + (message_dir / raw_filename).write_bytes(raw) + (message_dir / json_filename).write_text(json.dumps(record.as_dict(), indent=2, ensure_ascii=False), encoding="utf-8") + return record + + +def record_smtp_delivery( + message: EmailMessage, + *, + envelope_from: str, + envelope_recipients: list[str], + smtp_host: str | None = None, +) -> MockDeliveryRecord: + return _record_from_raw( + _message_to_bytes(message), + kind="smtp", + envelope_from=envelope_from, + envelope_recipients=envelope_recipients, + smtp_host=smtp_host, + ) + + +def record_imap_append( + message_bytes: bytes, + *, + folder: str, + imap_host: str | None = None, +) -> MockDeliveryRecord: + return _record_from_raw(message_bytes, kind="imap_append", folder=folder, imap_host=imap_host) + + +def _load_record(record_id: str) -> dict[str, Any] | None: + path = _message_dir() / f"{record_id}.json" + if not path.exists(): + return None + try: + return json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return None + + +def list_records(*, kind: str | None = None, limit: int = 100) -> list[dict[str, Any]]: + records: list[dict[str, Any]] = [] + for path in _message_dir().glob("*.json"): + try: + record = json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + continue + if kind and record.get("kind") != kind: + continue + records.append(record) + records.sort(key=lambda item: str(item.get("created_at") or ""), reverse=True) + return records[: max(1, min(limit, 500))] + + +def get_record(record_id: str, *, include_raw: bool = True) -> dict[str, Any] | None: + record = _load_record(record_id) + if record and include_raw: + record["raw_eml"] = read_raw_message(record_id) + return record + + +def read_raw_message(record_id: str) -> str | None: + record = _load_record(record_id) + if not record: + return None + raw_filename = record.get("raw_filename") + if not raw_filename: + return None + raw_path = _message_dir() / str(raw_filename) + if not raw_path.exists(): + return None + return raw_path.read_text(encoding="utf-8", errors="replace") + + +def clear_records() -> int: + message_dir = _message_dir() + count = len(list(message_dir.glob("*.json"))) + if message_dir.exists(): + shutil.rmtree(message_dir) + message_dir.mkdir(parents=True, exist_ok=True) + return count + + +def get_failures() -> dict[str, Any]: + path = _failure_path() + if not path.exists(): + return { + "fail_next_smtp": False, + "fail_next_imap": False, + "smtp_reject_recipients_containing": None, + } + try: + data = json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + data = {} + return { + "fail_next_smtp": bool(data.get("fail_next_smtp")), + "fail_next_imap": bool(data.get("fail_next_imap")), + "smtp_reject_recipients_containing": data.get("smtp_reject_recipients_containing") or None, + } + + +def set_failures(*, fail_next_smtp: bool | None = None, fail_next_imap: bool | None = None, smtp_reject_recipients_containing: str | None = None) -> dict[str, Any]: + current = get_failures() + if fail_next_smtp is not None: + current["fail_next_smtp"] = fail_next_smtp + if fail_next_imap is not None: + current["fail_next_imap"] = fail_next_imap + current["smtp_reject_recipients_containing"] = smtp_reject_recipients_containing or None + _failure_path().write_text(json.dumps(current, indent=2), encoding="utf-8") + return current + + +def consume_fail_next_smtp() -> bool: + current = get_failures() + if current.get("fail_next_smtp"): + current["fail_next_smtp"] = False + _failure_path().write_text(json.dumps(current, indent=2), encoding="utf-8") + return True + return False + + +def consume_fail_next_imap() -> bool: + current = get_failures() + if current.get("fail_next_imap"): + current["fail_next_imap"] = False + _failure_path().write_text(json.dumps(current, indent=2), encoding="utf-8") + return True + return False diff --git a/server/app/mailer/messages/builder.py b/server/app/mailer/messages/builder.py index 3f088c7..13fd6b3 100644 --- a/server/app/mailer/messages/builder.py +++ b/server/app/mailer/messages/builder.py @@ -129,9 +129,14 @@ def _recipient_values(entry: EntryConfig) -> dict[str, str]: def _template_values(config: CampaignConfig, entry: EntryConfig) -> dict[str, Any]: values: dict[str, Any] = {} + for field in config.fields: + values.setdefault(field.name, "") + values.setdefault(f"global::{field.name}", "") + values.setdefault(f"local::{field.name}", "") for key, value in config.global_values.items(): values[f"global::{key}"] = value for key, value in effective_entry_field_values(config, entry).items(): + values[key] = value values[f"local::{key}"] = value if entry.id: values["local::id"] = entry.id @@ -552,6 +557,62 @@ def build_entry_message( return BuiltMessage(draft=draft, mime=message) + +def _unsent_attachment_issues( + *, + config: CampaignConfig, + campaign_file: str | Path, + built_messages: list[BuiltMessage], +) -> list[MessageIssue]: + behavior = config.validation_policy.unsent_attachment_files.value + if behavior == Behavior.CONTINUE.value: + return [] + + matched_files = { + Path(match).resolve() + for built in built_messages + for attachment in built.draft.attachments + for match in attachment.matches + } + + issues: list[MessageIssue] = [] + for base_path in config.attachments.base_paths: + if not base_path.unsent_warning: + continue + directory = _resolve(campaign_file, base_path.path) + if not directory.exists() or not directory.is_dir(): + continue + all_files = sorted(path.resolve() for path in directory.rglob("*") if path.is_file()) + unsent = [path for path in all_files if path not in matched_files] + if not unsent: + continue + shown = ", ".join(str(path.relative_to(directory)) for path in unsent[:10]) + if len(unsent) > 10: + shown += f", … (+{len(unsent) - 10} more)" + issues.append( + _issue_from_behavior( + code="unsent_attachment_files", + message=f"{len(unsent)} file(s) in attachment source {base_path.name!r} are not used by any message: {shown}", + behavior=behavior, + source=f"attachments:{base_path.name}", + ) + ) + return issues + + +def _apply_campaign_level_issues(built_messages: list[BuiltMessage], issues: list[MessageIssue]) -> None: + if not issues: + return + for built in built_messages: + if not built.draft.active: + continue + built.draft.issues.extend(issues) + status = built.draft.validation_status + for issue in issues: + if issue.behavior: + status = _apply_behavior(status, issue.behavior) + built.draft.validation_status = status + def build_campaign_messages( config: CampaignConfig, *, @@ -577,6 +638,10 @@ def build_campaign_messages( ) for index, entry in enumerate(entries, start=1) ] + _apply_campaign_level_issues( + built_messages, + _unsent_attachment_issues(config=config, campaign_file=campaign_path, built_messages=built_messages), + ) report = CampaignBuildReport( campaign_id=config.campaign.id, diff --git a/server/app/mailer/persistence/campaigns.py b/server/app/mailer/persistence/campaigns.py index 37dcb47..073d5cf 100644 --- a/server/app/mailer/persistence/campaigns.py +++ b/server/app/mailer/persistence/campaigns.py @@ -25,6 +25,7 @@ from app.mailer.campaign.loader import load_campaign_config from app.mailer.campaign.validation import Severity, validate_campaign_config from app.mailer.messages.builder import build_campaign_messages from app.mailer.messages.models import MessageDraft +from app.storage.services import record_campaign_attachment_uses_for_job RUNTIME_DIR = Path(__file__).resolve().parents[3] / "runtime" CAMPAIGN_SNAPSHOT_DIR = RUNTIME_DIR / "campaign_snapshots" @@ -326,6 +327,7 @@ def build_campaign_version( ) session.add(job) session.flush() + record_campaign_attachment_uses_for_job(session, job, stage="built") for issue in built.draft.issues: session.add( CampaignIssue( diff --git a/server/app/mailer/persistence/versions.py b/server/app/mailer/persistence/versions.py index b464fc9..0c1c315 100644 --- a/server/app/mailer/persistence/versions.py +++ b/server/app/mailer/persistence/versions.py @@ -100,6 +100,7 @@ def minimal_campaign_json(*, external_id: str, name: str, description: str | Non "name": "Campaign files", "path": ".", "allow_individual": True, + "unsent_warning": False, } ], "allow_individual": True, @@ -126,6 +127,7 @@ def minimal_campaign_json(*, external_id: str, name: str, description: str | Non "missing_required_attachment": "ask", "missing_optional_attachment": "warn", "ambiguous_attachment_match": "ask", + "unsent_attachment_files": "warn", "ignore_empty_fields": False, "missing_email": "block", "template_error": "block", diff --git a/server/app/mailer/schema/campaign.schema.json b/server/app/mailer/schema/campaign.schema.json index fa52280..995b56f 100644 --- a/server/app/mailer/schema/campaign.schema.json +++ b/server/app/mailer/schema/campaign.schema.json @@ -494,11 +494,24 @@ "warn" ], "default": "drop" + }, + "unsent_attachment_files": { + "type": "string", + "enum": [ + "block", + "ask", + "drop", + "continue", + "warn" + ], + "default": "warn", + "description": "Behavior when a base path with unsent_warning contains files that are not attached to any message." } }, "additionalProperties": false, "default": { - "ignore_empty_fields": false + "ignore_empty_fields": false, + "unsent_attachment_files": "warn" } }, "delivery": { @@ -714,6 +727,13 @@ "default": { "enabled": false } + }, + "type": { + "description": "Legacy UI helper; ignored by backend. Direct files are represented as plain file_filter patterns.", + "type": [ + "string", + "null" + ] } }, "additionalProperties": false @@ -907,6 +927,11 @@ "null" ], "description": "Legacy UI compatibility value. Ignored by the backend." + }, + "unsent_warning": { + "type": "boolean", + "default": false, + "description": "Warn according to validation_policy.unsent_attachment_files if files in this source are not attached to any built message." } }, "additionalProperties": false diff --git a/server/app/mailer/sending/imap.py b/server/app/mailer/sending/imap.py index a5aa251..1820ee0 100644 --- a/server/app/mailer/sending/imap.py +++ b/server/app/mailer/sending/imap.py @@ -8,6 +8,12 @@ import time from dataclasses import dataclass from app.mailer.campaign.models import ImapConfig, TransportSecurity +from app.mailer.dev.mock_mailbox import ( + MOCK_IMAP_FOLDERS, + consume_fail_next_imap, + is_mock_imap_host, + record_imap_append, +) class ImapConfigurationError(ValueError): @@ -210,6 +216,14 @@ def test_imap_login(*, imap_config: ImapConfig) -> ImapLoginTestResult: """ host, port = _require_imap_config(imap_config) + if is_mock_imap_host(imap_config.host): + return ImapLoginTestResult( + host=host, + port=port, + security=imap_config.security.value, + authenticated=bool(imap_config.username and imap_config.password), + ) + client = _open_imap(imap_config) try: return ImapLoginTestResult( @@ -229,6 +243,16 @@ def list_imap_folders(*, imap_config: ImapConfig) -> ImapFolderListResult: """Return folders visible through IMAP LIST and the best sent-folder guess.""" host, port = _require_imap_config(imap_config) + if is_mock_imap_host(imap_config.host): + folders = [ImapMailboxInfo(name=str(item["name"]), flags=list(item.get("flags") or [])) for item in MOCK_IMAP_FOLDERS] + return ImapFolderListResult( + host=host, + port=port, + security=imap_config.security.value, + folders=folders, + detected_sent_folder="Sent", + ) + client = _open_imap(imap_config) try: typ, data = client.list() @@ -272,6 +296,20 @@ def append_message_to_sent( """ host, port = _require_imap_config(imap_config) + if is_mock_imap_host(imap_config.host): + if consume_fail_next_imap(): + raise ImapAppendError("Mock IMAP configured to fail the next append", temporary=False) + target_folder = folder or (imap_config.sent_folder if imap_config.sent_folder and imap_config.sent_folder != "auto" else "Sent") + record = record_imap_append(message_bytes, folder=target_folder, imap_host=imap_config.host) + return ImapAppendResult( + host=host, + port=port, + security=imap_config.security.value, + folder=target_folder, + bytes_appended=len(message_bytes), + response=f"mock append stored as {record.id}", + ) + client: imaplib.IMAP4 | None = None try: client = _open_imap(imap_config) diff --git a/server/app/mailer/sending/jobs.py b/server/app/mailer/sending/jobs.py index e3ac4cd..78fc13a 100644 --- a/server/app/mailer/sending/jobs.py +++ b/server/app/mailer/sending/jobs.py @@ -30,6 +30,7 @@ from app.mailer.persistence.campaigns import _write_campaign_snapshot from app.mailer.sending.rate_limit import wait_for_rate_limit from app.mailer.sending.smtp import SmtpConfigurationError, SmtpSendError, send_email_message from app.mailer.sending.imap import ImapAppendError, ImapConfigurationError, append_message_to_sent +from app.storage.services import mark_job_attachment_uses_sent class QueueingError(RuntimeError): @@ -591,6 +592,7 @@ def send_campaign_job(session: Session, *, job_id: str, dry_run: bool = False, u else: job.imap_status = JobImapStatus.NOT_REQUESTED.value job.last_error = None + mark_job_attachment_uses_sent(session, job) session.add(attempt) session.add(job) _update_campaign_after_job(session, job.campaign_id, job.campaign_version_id) @@ -702,6 +704,7 @@ def append_sent_for_job(session: Session, *, job_id: str, dry_run: bool = False) attempt.folder = result.folder job.imap_status = JobImapStatus.APPENDED.value job.last_error = None + mark_job_attachment_uses_sent(session, job) session.add(attempt) session.add(job) session.commit() diff --git a/server/app/mailer/sending/smtp.py b/server/app/mailer/sending/smtp.py index 1ea498e..854710b 100644 --- a/server/app/mailer/sending/smtp.py +++ b/server/app/mailer/sending/smtp.py @@ -8,6 +8,12 @@ from email.message import EmailMessage from email.utils import formataddr from app.mailer.campaign.models import SmtpConfig, TransportSecurity +from app.mailer.dev.mock_mailbox import ( + consume_fail_next_smtp, + get_failures, + is_mock_smtp_host, + record_smtp_delivery, +) class SmtpConfigurationError(ValueError): @@ -98,6 +104,15 @@ def test_smtp_login(*, smtp_config: SmtpConfig) -> SmtpLoginTestResult: """ host, port = _require_smtp_config(smtp_config) + if is_mock_smtp_host(smtp_config.host): + host, port = _require_smtp_config(smtp_config) + return SmtpLoginTestResult( + host=host, + port=port, + security=smtp_config.security.value, + authenticated=bool(smtp_config.username and smtp_config.password), + ) + smtp = _open_smtp(smtp_config) try: return SmtpLoginTestResult( @@ -165,6 +180,37 @@ def send_email_message( if not envelope_recipients: raise SmtpConfigurationError("at least one SMTP envelope recipient is required") + if is_mock_smtp_host(smtp_config.host): + if consume_fail_next_smtp(): + raise SmtpSendError("Mock SMTP configured to fail the next send") + failures = get_failures() + reject_text = str(failures.get("smtp_reject_recipients_containing") or "").strip().lower() + refused: dict[str, tuple[int, bytes]] = {} + accepted = list(envelope_recipients) + if reject_text: + refused = { + recipient: (550, b"mock recipient rejected") + for recipient in envelope_recipients + if reject_text in recipient.lower() + } + accepted = [recipient for recipient in envelope_recipients if recipient not in refused] + if not accepted: + raise SmtpSendError(f"all mock SMTP recipients were refused: {_decode_refused(refused)}") + record_smtp_delivery( + message, + envelope_from=envelope_from, + envelope_recipients=accepted, + smtp_host=smtp_config.host, + ) + return SmtpSendResult( + host=host, + port=port, + security=smtp_config.security.value, + envelope_from=envelope_from, + envelope_recipients=list(envelope_recipients), + refused_recipients=_decode_refused(refused), + ) + try: with _open_smtp(smtp_config) as smtp: refused = smtp.send_message( diff --git a/server/app/main.py b/server/app/main.py index 9365731..dcf43da 100644 --- a/server/app/main.py +++ b/server/app/main.py @@ -40,8 +40,10 @@ def health(): "env": settings.app_env, "api": {"version": "v1", "auth": "api-key-or-session"}, "storage": { - "endpoint": settings.s3_endpoint_url, - "bucket": settings.s3_bucket, - "region": settings.s3_region, + "backend": settings.file_storage_backend, + "local_root": settings.file_storage_local_root if settings.file_storage_backend == "local" else None, + "endpoint": settings.file_storage_s3_endpoint_url or settings.s3_endpoint_url, + "bucket": settings.file_storage_s3_bucket or settings.s3_bucket, + "region": settings.file_storage_s3_region or settings.s3_region, }, } diff --git a/server/app/settings.py b/server/app/settings.py index e0ef349..a556b12 100644 --- a/server/app/settings.py +++ b/server/app/settings.py @@ -19,8 +19,19 @@ class Settings(BaseSettings): s3_secret_access_key: str = Field(default="multimailer-dev-secret-change-me", alias="S3_SECRET_ACCESS_KEY") s3_bucket: str = Field(default="attachments", alias="S3_BUCKET") + # Managed file storage. Development defaults to local filesystem storage; + # production can switch to Garage/S3 without changing API contracts. + file_storage_backend: str = Field(default="local", alias="FILE_STORAGE_BACKEND") + file_storage_local_root: str = Field(default="runtime/files", alias="FILE_STORAGE_LOCAL_ROOT") + file_storage_s3_endpoint_url: str | None = Field(default=None, alias="FILE_STORAGE_S3_ENDPOINT_URL") + file_storage_s3_region: str | None = Field(default=None, alias="FILE_STORAGE_S3_REGION") + file_storage_s3_access_key_id: str | None = Field(default=None, alias="FILE_STORAGE_S3_ACCESS_KEY_ID") + file_storage_s3_secret_access_key: str | None = Field(default=None, alias="FILE_STORAGE_S3_SECRET_ACCESS_KEY") + file_storage_s3_bucket: str | None = Field(default="files", alias="FILE_STORAGE_S3_BUCKET") + master_key_b64: str | None = Field(default=None, alias="MASTER_KEY_B64") celery_queues: str = Field(default="send_email,append_sent,default", alias="CELERY_QUEUES") + mock_mailbox_dir: str = Field(default="runtime/mock-mailbox", alias="MOCK_MAILBOX_DIR") # Development bootstrap only. Do not use this in production. dev_bootstrap_api_key: str | None = Field(default="dev-multimailer-api-key", alias="DEV_BOOTSTRAP_API_KEY") diff --git a/server/app/storage/__init__.py b/server/app/storage/__init__.py new file mode 100644 index 0000000..784797a --- /dev/null +++ b/server/app/storage/__init__.py @@ -0,0 +1 @@ +"""Managed file storage services for Multi Seal Mail.""" diff --git a/server/app/storage/backends.py b/server/app/storage/backends.py new file mode 100644 index 0000000..35a1769 --- /dev/null +++ b/server/app/storage/backends.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Protocol + +import boto3 + +from app.settings import settings + + +class StorageBackendError(RuntimeError): + pass + + +class StorageBackend(Protocol): + name: str + + def put_bytes(self, key: str, data: bytes, *, content_type: str | None = None) -> None: ... + def get_bytes(self, key: str) -> bytes: ... + def delete(self, key: str) -> None: ... + def exists(self, key: str) -> bool: ... + + +@dataclass(slots=True) +class LocalFilesystemStorageBackend: + root: Path + name: str = "local" + + def __post_init__(self) -> None: + self.root = self.root.expanduser().resolve() + self.root.mkdir(parents=True, exist_ok=True) + + def _path(self, key: str) -> Path: + path = (self.root / key).resolve() + if not path.is_relative_to(self.root): + raise StorageBackendError("Storage key escapes local storage root") + return path + + def put_bytes(self, key: str, data: bytes, *, content_type: str | None = None) -> None: + path = self._path(key) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(data) + + def get_bytes(self, key: str) -> bytes: + path = self._path(key) + if not path.exists() or not path.is_file(): + raise StorageBackendError("Stored object does not exist") + return path.read_bytes() + + def delete(self, key: str) -> None: + path = self._path(key) + if path.exists() and path.is_file(): + path.unlink() + + def exists(self, key: str) -> bool: + path = self._path(key) + return path.exists() and path.is_file() + + +@dataclass(slots=True) +class S3StorageBackend: + bucket: str + endpoint_url: str + region_name: str + access_key_id: str + secret_access_key: str + name: str = "s3" + + @property + def client(self): + return boto3.client( + "s3", + endpoint_url=self.endpoint_url, + region_name=self.region_name, + aws_access_key_id=self.access_key_id, + aws_secret_access_key=self.secret_access_key, + ) + + def put_bytes(self, key: str, data: bytes, *, content_type: str | None = None) -> None: + kwargs = {"Bucket": self.bucket, "Key": key, "Body": data} + if content_type: + kwargs["ContentType"] = content_type + self.client.put_object(**kwargs) + + def get_bytes(self, key: str) -> bytes: + try: + obj = self.client.get_object(Bucket=self.bucket, Key=key) + return obj["Body"].read() + except Exception as exc: # pragma: no cover - depends on S3 backend + raise StorageBackendError(str(exc)) from exc + + def delete(self, key: str) -> None: + self.client.delete_object(Bucket=self.bucket, Key=key) + + def exists(self, key: str) -> bool: + try: + self.client.head_object(Bucket=self.bucket, Key=key) + return True + except Exception: + return False + + +def get_storage_backend() -> StorageBackend: + backend = settings.file_storage_backend.lower().strip() + if backend in {"local", "filesystem", "fs"}: + return LocalFilesystemStorageBackend(Path(settings.file_storage_local_root)) + if backend in {"s3", "garage"}: + return S3StorageBackend( + bucket=settings.file_storage_s3_bucket or settings.s3_bucket, + endpoint_url=settings.file_storage_s3_endpoint_url or settings.s3_endpoint_url, + region_name=settings.file_storage_s3_region or settings.s3_region, + access_key_id=settings.file_storage_s3_access_key_id or settings.s3_access_key_id, + secret_access_key=settings.file_storage_s3_secret_access_key or settings.s3_secret_access_key, + ) + raise StorageBackendError(f"Unsupported file storage backend: {settings.file_storage_backend}") diff --git a/server/app/storage/paths.py b/server/app/storage/paths.py new file mode 100644 index 0000000..0c9c4e9 --- /dev/null +++ b/server/app/storage/paths.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import re +from pathlib import PurePosixPath +from uuid import uuid4 + +_SAFE_NAME_RE = re.compile(r"[^A-Za-z0-9_.@ -]+") + + +class UnsafeFilePathError(ValueError): + pass + + +def normalize_logical_path(path: str | None, *, fallback_filename: str | None = None) -> str: + """Return a safe tenant-relative logical path using POSIX separators. + + The logical path is metadata, not a filesystem path. It never starts with a + slash and cannot contain path traversal components. It is used for browsing, + wildcard matching and attachment rules. + """ + + raw = (path or "").replace("\\", "/").strip() + if not raw and fallback_filename: + raw = fallback_filename + if not raw: + raise UnsafeFilePathError("File path is empty") + if raw.startswith("/"): + raw = raw.lstrip("/") + parts: list[str] = [] + for part in raw.split("/"): + clean = part.strip() + if not clean or clean == ".": + continue + if clean == "..": + raise UnsafeFilePathError("Path traversal is not allowed") + parts.append(clean) + if not parts: + raise UnsafeFilePathError("File path is empty") + return "/".join(parts) + + +def normalize_folder(path: str | None) -> str: + raw = (path or "").replace("\\", "/").strip().strip("/") + if not raw: + return "" + normalized = normalize_logical_path(raw) + return "" if normalized == "." else normalized + + +def filename_from_path(path: str) -> str: + name = PurePosixPath(path).name + if not name or name in {".", ".."}: + raise UnsafeFilePathError("Invalid filename") + return name + + +def join_folder_filename(folder: str | None, filename: str) -> str: + safe_name = sanitize_filename(filename) + safe_folder = normalize_folder(folder) + return f"{safe_folder}/{safe_name}" if safe_folder else safe_name + + +def sanitize_filename(filename: str | None) -> str: + raw = (filename or "file").replace("\\", "/").split("/")[-1].strip() + raw = raw.strip(".") or "file" + safe = _SAFE_NAME_RE.sub("_", raw) + safe = re.sub(r"\s+", " ", safe).strip() + return safe or f"file-{uuid4().hex}" + + +def safe_storage_component(value: str | None, fallback: str = "file") -> str: + safe = sanitize_filename(value or fallback) + return safe.replace(" ", "_")[:180] diff --git a/server/app/storage/services.py b/server/app/storage/services.py new file mode 100644 index 0000000..9ac9b64 --- /dev/null +++ b/server/app/storage/services.py @@ -0,0 +1,750 @@ +from __future__ import annotations + +import hashlib +import mimetypes +import re +import zipfile +from dataclasses import dataclass +from datetime import datetime, timezone +from io import BytesIO +from pathlib import PurePosixPath +from typing import Any, Iterable +from uuid import uuid4 + +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from app.db.models import ( + Campaign, + CampaignAttachmentUse, + CampaignJob, + FileAsset, + FileBlob, + FileFolder, + FileShare, + FileVersion, + Group, + UserGroupMembership, +) +from app.settings import settings +from app.storage.backends import get_storage_backend +from app.storage.paths import filename_from_path, join_folder_filename, normalize_folder, normalize_logical_path, safe_storage_component + + +class FileStorageError(RuntimeError): + pass + + +@dataclass(slots=True) +class UploadedStoredFile: + asset: FileAsset + version: FileVersion + blob: FileBlob + + +@dataclass(slots=True) +class ResolvedPattern: + pattern: str + matches: list[FileAsset] + + +def utcnow() -> datetime: + return datetime.now(timezone.utc) + + +def user_group_ids(session: Session, *, tenant_id: str, user_id: str, include_admin_groups: bool = False) -> list[str]: + if include_admin_groups: + return [row.id for row in session.query(Group).filter(Group.tenant_id == tenant_id).order_by(Group.name.asc()).all()] + return [ + row.group_id + for row in session.query(UserGroupMembership) + .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id) + .all() + ] + + +def ensure_group_access(session: Session, *, tenant_id: str, group_id: str, user_id: str, is_admin: bool = False) -> None: + group = session.get(Group, group_id) + if not group or group.tenant_id != tenant_id: + raise FileStorageError("Group not found") + if is_admin: + return + membership = ( + session.query(UserGroupMembership) + .filter(UserGroupMembership.tenant_id == tenant_id, UserGroupMembership.user_id == user_id, UserGroupMembership.group_id == group_id) + .one_or_none() + ) + if membership is None: + raise FileStorageError("No access to this group file space") + + + + +def _owner_filter(query, owner_type: str, owner_id: str): + if owner_type == "user": + return query.filter(FileFolder.owner_user_id == owner_id) + if owner_type == "group": + return query.filter(FileFolder.owner_group_id == owner_id) + raise FileStorageError("Unsupported owner type") + + +def ensure_owner_access(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, user_id: str, is_admin: bool = False) -> None: + owner_type = owner_type.lower().strip() + if owner_type == "user": + if owner_id != user_id and not is_admin: + raise FileStorageError("No access to this user file space") + return + if owner_type == "group": + ensure_group_access(session, tenant_id=tenant_id, group_id=owner_id, user_id=user_id, is_admin=is_admin) + return + raise FileStorageError("Files must be owned by a user or group") + + +def create_folder( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + path: str, + is_admin: bool = False, +) -> FileFolder: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + normalized = normalize_folder(path) + if not normalized: + raise FileStorageError("Folder path is required") + query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.path == normalized) + query = _owner_filter(query, owner_type, owner_id) + existing = query.order_by(FileFolder.deleted_at.asc()).first() + if existing: + if existing.deleted_at is not None: + existing.deleted_at = None + session.add(existing) + return existing + folder = FileFolder( + tenant_id=tenant_id, + owner_type=owner_type, + owner_user_id=owner_id if owner_type == "user" else None, + owner_group_id=owner_id if owner_type == "group" else None, + path=normalized, + created_by_user_id=user_id, + metadata_={}, + ) + session.add(folder) + session.flush() + return folder + + +def list_folders_for_user( + session: Session, + *, + tenant_id: str, + user_id: str, + owner_type: str, + owner_id: str, + include_deleted: bool = False, + is_admin: bool = False, +) -> list[FileFolder]: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type) + query = _owner_filter(query, owner_type, owner_id) + if not include_deleted: + query = query.filter(FileFolder.deleted_at.is_(None)) + return query.order_by(FileFolder.path.asc()).all() + + +def soft_delete_folder( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + path: str, + recursive: bool = True, + is_admin: bool = False, +) -> tuple[int, int]: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + normalized = normalize_folder(path) + if not normalized: + raise FileStorageError("Folder path is required") + prefix = f"{normalized}/" + now = utcnow() + + folder_query = session.query(FileFolder).filter(FileFolder.tenant_id == tenant_id, FileFolder.owner_type == owner_type, FileFolder.deleted_at.is_(None)) + folder_query = _owner_filter(folder_query, owner_type, owner_id) + if recursive: + folder_query = folder_query.filter(or_(FileFolder.path == normalized, FileFolder.path.like(f"{prefix}%"))) + else: + child_exists = folder_query.filter(FileFolder.path.like(f"{prefix}%")).first() is not None + file_exists = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.display_path.like(f"{prefix}%")).first() is not None + if child_exists or file_exists: + raise FileStorageError("Folder is not empty") + folder_query = folder_query.filter(FileFolder.path == normalized) + + folders = folder_query.all() + for folder in folders: + folder.deleted_at = now + session.add(folder) + + file_query = _asset_query_for_owner(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id).filter(FileAsset.deleted_at.is_(None), FileAsset.display_path.like(f"{prefix}%")) + assets = file_query.all() if recursive else [] + for asset in assets: + asset.deleted_at = now + session.add(asset) + + return len(folders), len(assets) + + +def _asset_query_for_owner(session: Session, *, tenant_id: str, owner_type: str, owner_id: str): + query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id, FileAsset.owner_type == owner_type) + if owner_type == "user": + return query.filter(FileAsset.owner_user_id == owner_id) + if owner_type == "group": + return query.filter(FileAsset.owner_group_id == owner_id) + raise FileStorageError("Unsupported owner type") + +def _storage_bucket_name() -> str: + return settings.file_storage_s3_bucket or settings.s3_bucket + + +def _storage_backend_name() -> str: + return settings.file_storage_backend.lower().strip() + + +def _storage_key(*, tenant_id: str, checksum: str, filename: str) -> str: + return f"tenants/{tenant_id}/files/{checksum[:2]}/{uuid4().hex}-{safe_storage_component(filename)}" + + +def _get_or_create_blob( + session: Session, + *, + tenant_id: str, + data: bytes, + filename: str, + content_type: str | None, +) -> FileBlob: + checksum = hashlib.sha256(data).hexdigest() + size = len(data) + blob = ( + session.query(FileBlob) + .filter(FileBlob.tenant_id == tenant_id, FileBlob.checksum_sha256 == checksum, FileBlob.size_bytes == size) + .one_or_none() + ) + if blob: + blob.ref_count += 1 + session.add(blob) + return blob + + storage_key = _storage_key(tenant_id=tenant_id, checksum=checksum, filename=filename) + backend = get_storage_backend() + backend.put_bytes(storage_key, data, content_type=content_type) + blob = FileBlob( + tenant_id=tenant_id, + storage_backend=_storage_backend_name(), + storage_bucket=_storage_bucket_name(), + storage_key=storage_key, + checksum_sha256=checksum, + size_bytes=size, + content_type=content_type, + ref_count=1, + ) + session.add(blob) + session.flush() + return blob + + +def create_file_asset( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + filename: str, + data: bytes, + folder: str | None = None, + display_path: str | None = None, + content_type: str | None = None, + description: str | None = None, + metadata: dict[str, Any] | None = None, + campaign_id: str | None = None, + is_admin: bool = False, +) -> UploadedStoredFile: + owner_type = owner_type.lower().strip() + ensure_owner_access(session, tenant_id=tenant_id, owner_type=owner_type, owner_id=owner_id, user_id=user_id, is_admin=is_admin) + + safe_filename = filename_from_path(normalize_logical_path(filename, fallback_filename="file")) + logical_path = normalize_logical_path(display_path) if display_path else join_folder_filename(folder, safe_filename) + if not content_type: + content_type = mimetypes.guess_type(safe_filename)[0] or "application/octet-stream" + + blob = _get_or_create_blob(session, tenant_id=tenant_id, data=data, filename=safe_filename, content_type=content_type) + asset = FileAsset( + tenant_id=tenant_id, + owner_type=owner_type, + owner_user_id=owner_id if owner_type == "user" else None, + owner_group_id=owner_id if owner_type == "group" else None, + display_path=logical_path, + filename=filename_from_path(logical_path), + description=description, + created_by_user_id=user_id, + metadata_=metadata or {}, + ) + session.add(asset) + session.flush() + version = FileVersion( + tenant_id=tenant_id, + file_asset_id=asset.id, + blob_id=blob.id, + version_number=1, + filename_at_upload=safe_filename, + display_path_at_upload=logical_path, + content_type=content_type, + size_bytes=blob.size_bytes, + checksum_sha256=blob.checksum_sha256, + created_by_user_id=user_id, + ) + session.add(version) + session.flush() + asset.current_version_id = version.id + session.add(asset) + if campaign_id: + share_file(session, tenant_id=tenant_id, asset=asset, target_type="campaign", target_id=campaign_id, permission="read", user_id=user_id) + return UploadedStoredFile(asset=asset, version=version, blob=blob) + + +def get_asset_for_user(session: Session, *, tenant_id: str, user_id: str, asset_id: str, require_write: bool = False, is_admin: bool = False) -> FileAsset: + asset = session.get(FileAsset, asset_id) + if not asset or asset.tenant_id != tenant_id or asset.deleted_at is not None: + raise FileStorageError("File not found") + if is_admin: + return asset + group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) + owns = (asset.owner_type == "user" and asset.owner_user_id == user_id) or (asset.owner_type == "group" and asset.owner_group_id in group_ids) + if owns: + return asset + permission_values = ["read", "write", "manage"] if not require_write else ["write", "manage"] + share = ( + session.query(FileShare) + .filter( + FileShare.tenant_id == tenant_id, + FileShare.file_asset_id == asset.id, + FileShare.revoked_at.is_(None), + FileShare.permission.in_(permission_values), + or_( + (FileShare.target_type == "user") & (FileShare.target_id == user_id), + (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), + (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), + ), + ) + .first() + ) + if not share: + raise FileStorageError("No access to this file") + return asset + + +def list_assets_for_user( + session: Session, + *, + tenant_id: str, + user_id: str, + owner_type: str | None = None, + owner_id: str | None = None, + campaign_id: str | None = None, + path_prefix: str | None = None, + include_deleted: bool = False, + is_admin: bool = False, +) -> list[FileAsset]: + query = session.query(FileAsset).filter(FileAsset.tenant_id == tenant_id) + if not include_deleted: + query = query.filter(FileAsset.deleted_at.is_(None)) + if owner_type: + query = query.filter(FileAsset.owner_type == owner_type) + if owner_type == "user" and owner_id: + query = query.filter(FileAsset.owner_user_id == owner_id) + if owner_type == "group" and owner_id: + query = query.filter(FileAsset.owner_group_id == owner_id) + if campaign_id: + query = query.join(FileShare, FileShare.file_asset_id == FileAsset.id).filter( + FileShare.tenant_id == tenant_id, + FileShare.target_type == "campaign", + FileShare.target_id == campaign_id, + FileShare.revoked_at.is_(None), + ) + elif not is_admin and not owner_type: + group_ids = user_group_ids(session, tenant_id=tenant_id, user_id=user_id) + query = query.outerjoin(FileShare, FileShare.file_asset_id == FileAsset.id).filter( + or_( + (FileAsset.owner_type == "user") & (FileAsset.owner_user_id == user_id), + (FileAsset.owner_type == "group") & (FileAsset.owner_group_id.in_(group_ids)), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "user") & (FileShare.target_id == user_id), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "group") & (FileShare.target_id.in_(group_ids)), + (FileShare.revoked_at.is_(None)) & (FileShare.target_type == "tenant") & (FileShare.target_id == tenant_id), + ) + ) + if path_prefix: + prefix = normalize_folder(path_prefix) + if prefix: + query = query.filter(FileAsset.display_path.like(f"{prefix}/%")) + return query.order_by(FileAsset.display_path.asc(), FileAsset.updated_at.desc()).all() + + +def current_version_and_blob(session: Session, asset: FileAsset) -> tuple[FileVersion, FileBlob]: + if not asset.current_version_id: + raise FileStorageError("File has no current version") + version = session.get(FileVersion, asset.current_version_id) + if not version: + raise FileStorageError("File version not found") + blob = session.get(FileBlob, version.blob_id) + if not blob: + raise FileStorageError("File blob not found") + return version, blob + + +def read_asset_bytes(session: Session, asset: FileAsset) -> tuple[bytes, FileVersion, FileBlob]: + version, blob = current_version_and_blob(session, asset) + backend = get_storage_backend() + return backend.get_bytes(blob.storage_key), version, blob + + +def share_file( + session: Session, + *, + tenant_id: str, + asset: FileAsset, + target_type: str, + target_id: str, + permission: str, + user_id: str, +) -> FileShare: + target_type = target_type.lower().strip() + permission = permission.lower().strip() + if target_type not in {"user", "group", "campaign", "tenant"}: + raise FileStorageError("Unsupported share target") + if permission not in {"read", "write", "manage"}: + raise FileStorageError("Unsupported file permission") + if target_type == "campaign": + campaign = session.get(Campaign, target_id) + if not campaign or campaign.tenant_id != tenant_id: + raise FileStorageError("Campaign not found") + existing = ( + session.query(FileShare) + .filter( + FileShare.tenant_id == tenant_id, + FileShare.file_asset_id == asset.id, + FileShare.target_type == target_type, + FileShare.target_id == target_id, + FileShare.revoked_at.is_(None), + ) + .one_or_none() + ) + if existing: + existing.permission = permission + session.add(existing) + return existing + share = FileShare( + tenant_id=tenant_id, + file_asset_id=asset.id, + target_type=target_type, + target_id=target_id, + permission=permission, + created_by_user_id=user_id, + ) + session.add(share) + return share + + +def soft_delete_assets(session: Session, assets: Iterable[FileAsset]) -> int: + count = 0 + now = utcnow() + for asset in assets: + if asset.deleted_at is None: + asset.deleted_at = now + session.add(asset) + count += 1 + return count + + +def asset_is_audit_relevant(session: Session, asset: FileAsset) -> bool: + return ( + session.query(CampaignAttachmentUse) + .filter(CampaignAttachmentUse.file_asset_id == asset.id, CampaignAttachmentUse.use_stage == "sent") + .first() + is not None + ) + + +def _normalize_pattern(pattern: str) -> str: + if pattern.strip() in {"", "*"}: + return "*" + return normalize_logical_path(pattern, fallback_filename="*") + + +def _logical_glob_regex(pattern: str) -> re.Pattern[str]: + """Compile Multi Seal Mail logical globs. + + `*` and `?` stay within one folder segment. `**` crosses folder + boundaries, and `**/` also matches the current folder so `**/*.pdf` + returns direct and nested PDF files. + """ + + pattern = _normalize_pattern(pattern) + pieces = ["^"] + index = 0 + while index < len(pattern): + char = pattern[index] + if char == "*": + if index + 1 < len(pattern) and pattern[index + 1] == "*": + index += 2 + if index < len(pattern) and pattern[index] == "/": + pieces.append("(?:.*/)?") + index += 1 + else: + pieces.append(".*") + continue + pieces.append("[^/]*") + elif char == "?": + pieces.append("[^/]") + else: + pieces.append(re.escape(char)) + index += 1 + pieces.append("$") + return re.compile("".join(pieces)) + + +def _relative_display_path(asset: FileAsset, base_path: str | None) -> str: + path = normalize_logical_path(asset.display_path) + base = normalize_folder(base_path) + if not base: + return path + prefix = f"{base}/" + if path.startswith(prefix): + return path[len(prefix) :] + return path + + +def match_assets(assets: Iterable[FileAsset], pattern: str, *, base_path: str | None = None) -> list[FileAsset]: + regex = _logical_glob_regex(pattern) + normalized_pattern = _normalize_pattern(pattern) + has_path_context = base_path is not None or "/" in normalized_pattern or "**" in normalized_pattern + matches: list[FileAsset] = [] + for asset in assets: + candidates = [_relative_display_path(asset, base_path)] if has_path_context else [asset.display_path, asset.filename] + if any(regex.match(candidate) for candidate in candidates): + matches.append(asset) + return matches + + +def resolve_patterns(assets: list[FileAsset], patterns: list[str], *, base_path: str | None = None) -> tuple[list[ResolvedPattern], list[FileAsset]]: + resolved = [ResolvedPattern(pattern=pattern, matches=match_assets(assets, pattern, base_path=base_path)) for pattern in patterns] + matched_ids = {asset.id for item in resolved for asset in item.matches} + unmatched = [asset for asset in assets if asset.id not in matched_ids] + return resolved, unmatched + + +def rename_asset(asset: FileAsset, *, new_path: str) -> None: + normalized = normalize_logical_path(new_path) + asset.display_path = normalized + asset.filename = filename_from_path(normalized) + + +def build_rename_preview(asset: FileAsset, *, mode: str, find: str | None = None, replacement: str = "", prefix: str = "", suffix: str = "") -> str: + path = PurePosixPath(asset.display_path) + folder = "" if str(path.parent) == "." else str(path.parent) + name = path.name + stem = PurePosixPath(name).stem + ext = "".join(PurePosixPath(name).suffixes) + if mode == "prefix": + next_name = prefix + name + elif mode == "suffix": + next_name = f"{stem}{suffix}{ext}" + elif mode == "replace": + if not find: + next_name = name + else: + next_name = name.replace(find, replacement) + else: + raise FileStorageError("Unsupported rename mode") + return f"{folder}/{next_name}" if folder else next_name + + +def create_zip_bytes(session: Session, assets: Iterable[FileAsset]) -> bytes: + buffer = BytesIO() + with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: + for asset in assets: + data, _, _ = read_asset_bytes(session, asset) + archive.writestr(asset.display_path, data) + buffer.seek(0) + return buffer.getvalue() + + +def extract_zip_upload( + session: Session, + *, + tenant_id: str, + owner_type: str, + owner_id: str, + user_id: str, + zip_data: bytes, + folder: str | None, + campaign_id: str | None, + is_admin: bool = False, + max_files: int = 1000, + max_total_bytes: int = 250 * 1024 * 1024, +) -> list[UploadedStoredFile]: + uploaded: list[UploadedStoredFile] = [] + total = 0 + base_folder = normalize_folder(folder) + with zipfile.ZipFile(BytesIO(zip_data)) as archive: + infos = [info for info in archive.infolist() if not info.is_dir()] + if len(infos) > max_files: + raise FileStorageError(f"ZIP contains too many files (limit {max_files})") + for info in infos: + if info.file_size < 0: + raise FileStorageError("Invalid ZIP member") + total += info.file_size + if total > max_total_bytes: + raise FileStorageError("ZIP is too large after extraction") + inner_path = normalize_logical_path(info.filename) + target_path = f"{base_folder}/{inner_path}" if base_folder else inner_path + data = archive.read(info) + uploaded.append( + create_file_asset( + session, + tenant_id=tenant_id, + owner_type=owner_type, + owner_id=owner_id, + user_id=user_id, + filename=filename_from_path(inner_path), + data=data, + display_path=target_path, + content_type=mimetypes.guess_type(inner_path)[0] or "application/octet-stream", + campaign_id=campaign_id, + is_admin=is_admin, + ) + ) + return uploaded + + +def _candidate_match_keys(raw_match: str) -> set[str]: + cleaned = raw_match.replace("\\", "/").strip().strip("/") + result = {cleaned} + if cleaned: + result.add(PurePosixPath(cleaned).name) + return {item for item in result if item} + + +def record_campaign_attachment_uses_for_job(session: Session, job: CampaignJob, *, stage: str = "built") -> None: + """Create best-effort immutable file-use records for matched managed files. + + Existing attachment resolution is still filesystem/path based. This bridge + records uses when a resolved attachment match can be tied to a managed file + by logical path or filename among files shared with the campaign. + """ + + attachments = job.resolved_attachments or [] + if not isinstance(attachments, list): + return + assets = list_assets_for_user( + session, + tenant_id=job.tenant_id, + user_id="", + campaign_id=job.campaign_id, + is_admin=True, + ) + by_key: dict[str, FileAsset] = {} + for asset in assets: + by_key[asset.display_path.strip("/")] = asset + by_key[asset.filename] = asset + for attachment in attachments: + if not isinstance(attachment, dict): + continue + matches = attachment.get("matches") if isinstance(attachment.get("matches"), list) else [] + for raw in matches: + if not isinstance(raw, str): + continue + asset = next((by_key[key] for key in _candidate_match_keys(raw) if key in by_key), None) + if not asset: + continue + version, blob = current_version_and_blob(session, asset) + exists = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.file_version_id == version.id, + CampaignAttachmentUse.filename_used == asset.filename, + CampaignAttachmentUse.use_stage == stage, + ) + .one_or_none() + ) + if exists: + continue + session.add( + CampaignAttachmentUse( + tenant_id=job.tenant_id, + campaign_id=job.campaign_id, + campaign_version_id=job.campaign_version_id, + campaign_job_id=job.id, + entry_index=job.entry_index, + entry_id=job.entry_id, + file_asset_id=asset.id, + file_version_id=version.id, + file_blob_id=blob.id, + filename_used=asset.filename, + checksum_sha256=blob.checksum_sha256, + size_bytes=blob.size_bytes, + content_type=blob.content_type, + use_stage=stage, + ) + ) + + +def mark_job_attachment_uses_sent(session: Session, job: CampaignJob) -> None: + record_campaign_attachment_uses_for_job(session, job, stage="built") + now = utcnow() + uses = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.tenant_id == job.tenant_id, + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.use_stage == "built", + ) + .all() + ) + for use in uses: + sent = ( + session.query(CampaignAttachmentUse) + .filter( + CampaignAttachmentUse.campaign_job_id == job.id, + CampaignAttachmentUse.file_version_id == use.file_version_id, + CampaignAttachmentUse.use_stage == "sent", + ) + .one_or_none() + ) + if sent: + continue + session.add( + CampaignAttachmentUse( + tenant_id=use.tenant_id, + campaign_id=use.campaign_id, + campaign_version_id=use.campaign_version_id, + campaign_job_id=use.campaign_job_id, + entry_index=use.entry_index, + entry_id=use.entry_id, + file_asset_id=use.file_asset_id, + file_version_id=use.file_version_id, + file_blob_id=use.file_blob_id, + filename_used=use.filename_used, + checksum_sha256=use.checksum_sha256, + size_bytes=use.size_bytes, + content_type=use.content_type, + use_stage="sent", + used_at=now, + ) + ) diff --git a/server/multimailer-dev.db b/server/multimailer-dev.db index 39a31f6..332338a 100644 Binary files a/server/multimailer-dev.db and b/server/multimailer-dev.db differ