inital commit

This commit is contained in:
2026-06-08 15:57:11 +02:00
parent aaf8729663
commit d9ca48addc
114 changed files with 12172 additions and 1 deletions

View File

View File

@@ -0,0 +1,210 @@
from __future__ import annotations
from dataclasses import dataclass, field
from enum import StrEnum
from pathlib import Path
from .fields import Field, FieldConfiguration, FieldContents
from .recipients import Recipient, RecipientList, RecipientType
from .template import MailTemplate
class TransportSecurity(StrEnum):
PLAIN = "plain"
TLS = "tls"
STARTTLS = "starttls"
@property
def standard_port(self) -> int:
return 465 if self == TransportSecurity.TLS else 587
@dataclass
class MailServerSettings:
server: str = ""
port: int | None = None
username: str = ""
password: str = ""
transport_security: TransportSecurity = TransportSecurity.PLAIN
def use_plain(self) -> "MailServerSettings":
self.transport_security = TransportSecurity.PLAIN
self.port = self.port or self.transport_security.standard_port
return self
def use_tls(self) -> "MailServerSettings":
self.transport_security = TransportSecurity.TLS
self.port = self.port or self.transport_security.standard_port
return self
def use_starttls(self) -> "MailServerSettings":
self.transport_security = TransportSecurity.STARTTLS
self.port = self.port or self.transport_security.standard_port
return self
def resolved_port(self) -> int:
return self.port or self.transport_security.standard_port
@dataclass(frozen=True, slots=True)
class MailAttachmentConfig:
base_dir: Path
file_filter: str = "*"
include_subdirs: bool = False
@dataclass
class MailEntry:
field_config: FieldConfiguration
is_active: bool = True
from_recipient: Recipient | None = None
to: RecipientList = field(default_factory=RecipientList)
cc: RecipientList = field(default_factory=RecipientList)
bcc: RecipientList = field(default_factory=RecipientList)
combine_to: bool = True
combine_cc: bool = True
combine_bcc: bool = True
attachment_configs: list[MailAttachmentConfig] = field(default_factory=list)
combine_attachments: bool = True
field_contents: FieldContents = field(init=False)
def __post_init__(self) -> None:
self.field_contents = FieldContents(self.field_config)
def add_to(self, recipient: Recipient) -> "MailEntry":
self.to.add_recipient(recipient)
return self
def add_cc(self, recipient: Recipient) -> "MailEntry":
self.cc.add_recipient(recipient)
return self
def add_bcc(self, recipient: Recipient) -> "MailEntry":
self.bcc.add_recipient(recipient)
return self
def no_combine_to(self) -> "MailEntry":
self.combine_to = False
return self
def combine_to_recipients(self) -> "MailEntry":
self.combine_to = True
return self
def no_combine_attachments(self) -> "MailEntry":
self.combine_attachments = False
return self
def combine_attachments_with_global(self) -> "MailEntry":
self.combine_attachments = True
return self
def add_mail_attachment_config(self, config: MailAttachmentConfig) -> "MailEntry":
self.attachment_configs.append(config)
return self
def set_field_content_for_name(self, name: str, value: Field | object) -> "MailEntry":
if not self.field_contents.set_field_content_for_name(name, value):
raise KeyError(f"unknown field: {name}")
return self
def get_field_content_from_name(self, name: str) -> Field:
return self.field_contents.get_field_content_from_name(name)
@dataclass
class MailCampaign:
mail_server_settings: MailServerSettings | None = None
global_from: Recipient | None = None
global_to: RecipientList = field(default_factory=RecipientList)
global_cc: RecipientList = field(default_factory=RecipientList)
global_bcc: RecipientList = field(default_factory=RecipientList)
individual_from: bool = False
individual_to: bool = False
individual_cc: bool = False
individual_bcc: bool = False
base_attachment_path: Path = Path(".")
global_attachment_configs: list[MailAttachmentConfig] = field(default_factory=list)
individual_attachments: bool = False
send_without_attachments: bool = True
field_config: FieldConfiguration = field(default_factory=FieldConfiguration)
field_contents: FieldContents = field(init=False)
subject_template: MailTemplate = field(default_factory=MailTemplate)
mail_template: MailTemplate = field(default_factory=MailTemplate)
mail_entries: list[MailEntry] = field(default_factory=list)
def __post_init__(self) -> None:
self.field_contents = FieldContents(self.field_config)
@classmethod
def with_server_settings(cls, settings: MailServerSettings) -> "MailCampaign":
return cls(mail_server_settings=settings)
def add_field(self, name: str, field_type) -> "MailCampaign":
from .fields import FieldDescription
self.field_config.add_field_at_end(FieldDescription(name, field_type))
self.field_contents.ensure_field(self.field_config.get_field_description(name)) # type: ignore[arg-type]
for entry in self.mail_entries:
entry.field_contents.ensure_field(self.field_config.get_field_description(name)) # type: ignore[arg-type]
return self
def set_from(self, recipient: Recipient) -> "MailCampaign":
self.global_from = recipient
return self
def add_to(self, recipient: Recipient) -> "MailCampaign":
self.global_to.add_recipient(recipient)
return self
def allow_individual_to(self) -> "MailCampaign":
self.individual_to = True
return self
def disallow_individual_to(self) -> "MailCampaign":
self.individual_to = False
return self
def allow_individual_attachments(self) -> "MailCampaign":
self.individual_attachments = True
return self
def disallow_individual_attachments(self) -> "MailCampaign":
self.individual_attachments = False
return self
def dont_send_without_attachments(self) -> "MailCampaign":
self.send_without_attachments = False
return self
def send_without_attachments_allowed(self) -> "MailCampaign":
self.send_without_attachments = True
return self
def add_new_mail_entry(self) -> MailEntry:
entry = MailEntry(self.field_config)
self.mail_entries.append(entry)
return entry
def set_field_content_for_name(self, name: str, value: Field | object) -> "MailCampaign":
if not self.field_contents.set_field_content_for_name(name, value):
raise KeyError(f"unknown field: {name}")
return self
def get_field_content_from_name(self, name: str) -> Field:
return self.field_contents.get_field_content_from_name(name)
def all_recipients_for(self, entry: MailEntry) -> list[Recipient]:
recipients: list[Recipient] = []
if not self.individual_to or entry.combine_to:
recipients.extend(self.global_to.recipients)
if not self.individual_cc or entry.combine_cc:
recipients.extend(self.global_cc.recipients)
if not self.individual_bcc or entry.combine_bcc:
recipients.extend(self.global_bcc.recipients)
if self.individual_to:
recipients.extend(entry.to.recipients)
if self.individual_cc:
recipients.extend(entry.cc.recipients)
if self.individual_bcc:
recipients.extend(entry.bcc.recipients)
return recipients

View File

@@ -0,0 +1,125 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import date, datetime
from enum import StrEnum
from typing import Any
class FieldType(StrEnum):
STRING = "string"
INTEGER = "integer"
DOUBLE = "double"
DATE = "date"
PASSWORD = "password"
@dataclass(slots=True)
class FieldDescription:
name: str
type: FieldType = FieldType.STRING
@dataclass(slots=True)
class Field:
content: Any
@classmethod
def with_content(cls, content: Any) -> "Field":
if content is None:
raise ValueError("content must not be None")
return cls(content=content)
@property
def type(self) -> FieldType:
if isinstance(self.content, bool):
return FieldType.STRING
if isinstance(self.content, int):
return FieldType.INTEGER
if isinstance(self.content, float):
return FieldType.DOUBLE
if isinstance(self.content, (date, datetime)):
return FieldType.DATE
if isinstance(self.content, (bytes, bytearray)):
return FieldType.PASSWORD
return FieldType.STRING
def as_string(self) -> str:
if isinstance(self.content, (bytes, bytearray)):
return self.content.decode("utf-8")
if isinstance(self.content, (date, datetime)):
return self.content.isoformat()
return str(self.content)
@dataclass
class FieldConfiguration:
fields: list[FieldDescription] = field(default_factory=list)
def add_field_at_end(self, field_description: FieldDescription) -> "FieldConfiguration":
return self.add_field_at_position(len(self.fields), field_description)
def add_field_at_start(self, field_description: FieldDescription) -> "FieldConfiguration":
return self.add_field_at_position(0, field_description)
def add_field_at_position(self, position: int, field_description: FieldDescription) -> "FieldConfiguration":
if self.has_field(field_description.name):
raise ValueError(f"field already exists: {field_description.name}")
position = max(0, min(position, len(self.fields)))
self.fields.insert(position, field_description)
return self
def has_field(self, name: str) -> bool:
return any(f.name == name for f in self.fields)
def get_field_description(self, name: str) -> FieldDescription | None:
return next((f for f in self.fields if f.name == name), None)
def get_field_names(self) -> list[str]:
return [f.name for f in self.fields]
@dataclass
class FieldContents:
field_config: FieldConfiguration
field_map: dict[str, Field] = field(default_factory=dict)
def __post_init__(self) -> None:
for field_description in self.field_config.fields:
self.ensure_field(field_description)
def ensure_field(self, field_description: FieldDescription) -> None:
if field_description.name in self.field_map:
return
match field_description.type:
case FieldType.INTEGER:
value = 0
case FieldType.DOUBLE:
value = 0.0
case FieldType.DATE:
value = date.today()
case FieldType.PASSWORD:
value = b""
case _:
value = ""
self.field_map[field_description.name] = Field.with_content(value)
def get_field_content_from_name(self, name: str) -> Field:
try:
return self.field_map[name]
except KeyError as exc:
raise KeyError(f"unknown field: {name}") from exc
def set_field_content_for_name(self, name: str, value: Field | Any) -> bool:
if name not in self.field_map:
return False
if not isinstance(value, Field):
value = Field.with_content(value)
expected = self.field_map[name].type
if expected != value.type and expected != FieldType.PASSWORD:
raise TypeError(f"field {name!r} expects {expected}, got {value.type}")
self.field_map[name] = value
return True
def as_value_map(self, prefix: str) -> dict[str, str]:
return {f"{prefix}::{name}": field.as_string() for name, field in self.field_map.items()}

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
from dataclasses import dataclass, field
from email.message import EmailMessage
from typing import Iterator
@dataclass
class MailQueue:
messages: list[EmailMessage] = field(default_factory=list)
def add_mail(self, message: EmailMessage) -> None:
self.messages.append(message)
def remove_mail(self, message: EmailMessage) -> bool:
if message in self.messages:
self.messages.remove(message)
return True
return False
@property
def mail_count(self) -> int:
return len(self.messages)
def is_empty(self) -> bool:
return not self.messages
def __iter__(self) -> Iterator[EmailMessage]:
return iter(self.messages)

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from dataclasses import dataclass, field
from email.utils import formataddr
from enum import StrEnum
class RecipientType(StrEnum):
TO = "to"
CC = "cc"
BCC = "bcc"
@dataclass(frozen=True, slots=True)
class Recipient:
address: str
name: str | None = None
type: RecipientType = RecipientType.TO
def formatted(self) -> str:
return formataddr((self.name or self.address, self.address))
@dataclass
class RecipientList:
recipients: list[Recipient] = field(default_factory=list)
def add_recipient(self, recipient: Recipient) -> "RecipientList":
if recipient not in self.recipients:
self.recipients.append(recipient)
return self
def add_multiple_recipients(self, recipients: list[Recipient] | tuple[Recipient, ...]) -> "RecipientList":
for recipient in recipients:
self.add_recipient(recipient)
return self
def clear_all_recipients(self) -> "RecipientList":
self.recipients.clear()
return self
def by_type(self, recipient_type: RecipientType) -> list[Recipient]:
return [r for r in self.recipients if r.type == recipient_type]

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
import re
from dataclasses import dataclass
_FIELD_PATTERN = re.compile(r"(?<!\\)\$\{(.*?)(?<!\\)\}")
@dataclass
class MailTemplate:
template_string: str = ""
def set_template_string(self, template: str) -> "MailTemplate":
self.template_string = template
return self
def get_used_fields(self) -> set[str]:
return set(_FIELD_PATTERN.findall(self.template_string))
def apply_values(self, values: dict[str, str], *, keep_missing: bool = True) -> str:
def replace(match: re.Match[str]) -> str:
key = match.group(1)
if key in values:
return values[key]
return match.group(0) if keep_missing else ""
rendered = _FIELD_PATTERN.sub(replace, self.template_string)
return rendered.replace(r"\${", "${").replace(r"\}", "}")