164 lines
5.3 KiB
Python
164 lines
5.3 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from alembic import command
|
|
from alembic.config import Config
|
|
from alembic.runtime.migration import MigrationContext
|
|
from sqlalchemy import create_engine, inspect
|
|
|
|
from app.settings import settings
|
|
|
|
# Historic development databases could be created partly through Alembic and
# partly through Base.metadata.create_all(). In that state Alembic still
# reports the auth/RBAC revision ("2c...") while the file-storage tables of
# the 3d/4e revisions already exist, so a normal upgrade attempts to create
# file_blobs again. This reconciliation is kept deliberately narrow and only
# advances the marker when the complete expected schema for the skipped
# revisions is already present.
|
|
# Alembic revision identifiers the reconciliation compares against and stamps,
# listed in the order the reconciliation advances through them.
REVISION_AUTH_RBAC = "2c3d4e5f6a7b"  # marker a drifted database may still report
REVISION_FILE_STORAGE = "3d4e5f6a7b8c"  # stamped once the file-storage tables exist
REVISION_FILE_FOLDERS = "4e5f6a7b8c9d"  # stamped once file_folders exists as well
|
|
|
|
# Tables that must all be present before the file-storage revision is treated
# as already applied.
_FILE_STORAGE_TABLES = {
    "file_blobs",
    "file_assets",
    "file_versions",
    "file_shares",
    "campaign_attachment_uses",
}

# Tables that must be present before the file-folders revision is treated as
# already applied.
_FILE_FOLDER_TABLES = {"file_folders"}
|
|
|
|
# Columns each table must expose for its schema to count as complete. Only a
# subset check is applied, so additional columns in the database are accepted.
# Despite the name, the mapping also carries the entry for ``file_folders``.
_FILE_STORAGE_COLUMNS = {
    "file_blobs": {
        "id", "tenant_id", "storage_backend",
        "storage_key", "checksum_sha256", "size_bytes",
    },
    "file_assets": {
        "id", "tenant_id", "owner_type",
        "display_path", "filename", "current_version_id",
    },
    "file_versions": {
        "id", "file_asset_id", "blob_id",
        "version_number", "checksum_sha256",
    },
    "file_shares": {"id", "file_asset_id", "target_type", "target_id", "permission"},
    "campaign_attachment_uses": {
        "id", "campaign_id", "campaign_version_id",
        "file_asset_id", "file_version_id", "file_blob_id",
    },
    "file_folders": {"id", "tenant_id", "owner_type", "path"},
}
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class MigrationResult:
|
|
previous_revision: str | None
|
|
reconciled_revision: str | None
|
|
current_revision: str | None
|
|
|
|
|
|
def alembic_config(*, database_url: str | None = None) -> Config:
    """Build the Alembic ``Config`` used by the commands in this module.

    ``alembic.ini`` and the ``alembic`` script directory are looked up in the
    directory two levels above the one that contains this file.

    Args:
        database_url: URL to expose to Alembic through
            ``config.attributes["database_url"]``. Falls back to
            ``settings.database_url`` when omitted or empty.

    Returns:
        The configured ``Config`` instance.
    """
    root_dir = Path(__file__).resolve().parents[2]
    ini_file = root_dir / "alembic.ini"
    scripts_dir = root_dir / "alembic"

    cfg = Config(str(ini_file))
    cfg.set_main_option("script_location", str(scripts_dir))
    # Passed via ``attributes`` rather than an ini option; presumably read by
    # the project's Alembic env script — confirm there.
    cfg.attributes["database_url"] = database_url if database_url else settings.database_url
    return cfg
|
|
|
|
|
|
def database_revision(database_url: str | None = None) -> str | None:
    """Return the Alembic revision currently recorded in the database.

    Args:
        database_url: Database to inspect; ``settings.database_url`` is used
            when omitted or empty.

    Returns:
        Whatever ``MigrationContext.get_current_revision()`` reports for a
        fresh connection to that database.
    """
    engine = create_engine(database_url if database_url else settings.database_url)
    try:
        with engine.connect() as conn:
            context = MigrationContext.configure(conn)
            return context.get_current_revision()
    finally:
        # The engine exists only for this lookup, so release it right away.
        engine.dispose()
|
|
|
|
|
|
def _has_columns(inspector, table_name: str, required: set[str]) -> bool:
    """Return whether ``table_name`` exposes every column named in ``required``.

    Args:
        inspector: Object offering ``get_columns(table_name)`` that yields
            mappings with a ``"name"`` key (the callers in this module pass
            the result of ``sqlalchemy.inspect``).
        table_name: Table whose columns are looked up.
        required: Column names that must all be present. Extra columns in the
            table are ignored.

    Returns:
        ``True`` when all required columns are present. ``False`` when any is
        missing, or when the column lookup itself fails.
    """
    import logging

    try:
        present = {column["name"] for column in inspector.get_columns(table_name)}
    except Exception:
        # Deliberately broad and best-effort: a failed lookup is treated as
        # "schema not complete" so the caller skips reconciliation. The error
        # is logged instead of being silently discarded so that connection or
        # permission problems remain diagnosable.
        logging.getLogger(__name__).warning(
            "Could not inspect columns of table %r; treating it as incomplete",
            table_name,
            exc_info=True,
        )
        return False
    return required.issubset(present)
|
|
|
|
|
|
def reconcile_legacy_create_all_schema(database_url: str | None = None) -> str | None:
    """Stamp the Alembic marker forward when create_all already built the schema.

    Only the revision marker is stamped; no table data is modified. When the
    schema is partial or the recorded revision is not one of the known drift
    cases, nothing is stamped so that Alembic can fail visibly instead of
    guessing.

    Args:
        database_url: Database to reconcile; ``settings.database_url`` is used
            when omitted or empty.

    Returns:
        The revision that was stamped, or ``None`` when no repair was needed.
    """
    url = database_url if database_url else settings.database_url

    engine = create_engine(url)
    try:
        with engine.connect() as connection:
            current = MigrationContext.configure(connection).get_current_revision()
            inspector = inspect(connection)
            existing_tables = set(inspector.get_table_names())

            # A revision counts as "already applied" only if every one of its
            # tables exists and each exposes the expected columns.
            storage_present = _FILE_STORAGE_TABLES <= existing_tables and all(
                _has_columns(inspector, name, _FILE_STORAGE_COLUMNS[name])
                for name in _FILE_STORAGE_TABLES
            )
            folders_present = _FILE_FOLDER_TABLES <= existing_tables and _has_columns(
                inspector,
                "file_folders",
                _FILE_STORAGE_COLUMNS["file_folders"],
            )
    finally:
        engine.dispose()

    target: str | None = None
    if current == REVISION_AUTH_RBAC:
        # Marker is stuck before the file revisions: advance as far as the
        # existing schema justifies, but never past a missing storage schema.
        if storage_present:
            target = REVISION_FILE_FOLDERS if folders_present else REVISION_FILE_STORAGE
    elif current == REVISION_FILE_STORAGE:
        if folders_present:
            target = REVISION_FILE_FOLDERS
    elif current is None:
        # No revision recorded at all: the other create_all-only development
        # shape. Stamped only when both column checks above passed.
        if storage_present and folders_present:
            target = REVISION_FILE_FOLDERS

    if target is None:
        return None

    command.stamp(alembic_config(database_url=url), target)
    return target
|
|
|
|
|
|
def migrate_database(
    *,
    database_url: str | None = None,
    reconcile_legacy_schema: bool = True,
) -> MigrationResult:
    """Upgrade the database to ``head``, optionally reconciling legacy drift first.

    Args:
        database_url: Database to migrate; ``settings.database_url`` is used
            when omitted or empty.
        reconcile_legacy_schema: When true, run
            ``reconcile_legacy_create_all_schema`` before the upgrade.

    Returns:
        A ``MigrationResult`` holding the revision before the run, the revision
        stamped by reconciliation (if any), and the revision after the upgrade.
    """
    url = database_url if database_url else settings.database_url

    # Capture the starting point before reconciliation can move the marker.
    before = database_revision(url)

    stamped: str | None = None
    if reconcile_legacy_schema:
        stamped = reconcile_legacy_create_all_schema(url)

    command.upgrade(alembic_config(database_url=url), "head")
    after = database_revision(url)

    return MigrationResult(
        previous_revision=before,
        reconciled_revision=stamped,
        current_revision=after,
    )
|