Files
meubility-workbench/tests/test_osm_replication.py
2026-07-01 23:29:51 +02:00

93 lines
3.5 KiB
Python

from __future__ import annotations
import subprocess
from sqlalchemy import select
from app.db import reset_db, session_scope
from app.models import Dataset, OsmDiffState, Source
from app.pipeline.osm_pbf import _try_prepare_raw_from_diffs
from app.pipeline.osm_replication import ReplicationState, diff_url_for_sequence, parse_replication_state_text
def test_parse_replication_state_text_and_diff_url():
    """Check state-text parsing and the diff URL built for a sequence number.

    The sample text holds a comment line, ``sequenceNumber=1234`` and a
    timestamp whose colons are backslash-escaped.  The parsed state must
    expose the sequence number as an integer and the timestamp without the
    backslashes, and the diff URL for sequence 1234 must end with
    ``/000/001/234.osc.gz``.
    """
    # Spelled out line by line so the exact text handed to the parser is explicit.
    state_text = (
        "\n"
        "#Sat Jun 27 21:21:03 UTC 2026\n"
        "sequenceNumber=1234\n"
        "timestamp=2026-06-27T21\\:21\\:02Z\n"
    )
    parsed = parse_replication_state_text(state_text)
    assert parsed.sequence_number == 1234
    assert parsed.timestamp == "2026-06-27T21:21:02Z"

    updates_url = "https://download.geofabrik.de/europe/germany/berlin-updates"
    diff_url = diff_url_for_sequence(updates_url, 1234)
    assert diff_url.endswith("/000/001/234.osc.gz")
def test_osm_diff_application_records_new_raw_dataset_and_state(tmp_path, monkeypatch):
    """Applying pending diffs yields a new raw dataset and rotates the diff state.

    The network and tool entry points looked up on ``app.pipeline.osm_pbf``
    (state fetch, diff download, change application) are replaced by local
    stubs, so the test only touches ``tmp_path`` and the database.  A source
    with a committed raw dataset and an active diff state at sequence 1 is
    seeded, while the stubbed remote state reports sequence 3.

    Expected outcome: a different ``osm_pbf_raw`` dataset is returned, two
    diffs were downloaded, and the source ends up with two diff states,
    sequence 1 marked ``superseded`` and sequence 3 marked ``active``.
    """
    reset_db()
    updates_url = "https://download.geofabrik.de/europe/germany/berlin-updates"

    base_path = tmp_path / "base.osm.pbf"
    base_path.write_bytes(b"base")

    # Every path produced by the download stub, in call order.
    downloaded = []

    def stub_fetch_state(_updates_url, timeout=30):
        """Report a fixed remote replication state at sequence 3."""
        return ReplicationState(
            sequence_number=3,
            timestamp="2026-06-27T21:21:02Z",
            raw={"sequenceNumber": "3"},
        )

    def stub_download(_updates_url, sequence_number, output_dir, timeout=120):
        """Write a small placeholder diff file and remember where it went."""
        target = output_dir / f"{sequence_number}.osc.gz"
        target.parent.mkdir(parents=True, exist_ok=True)
        payload = f"diff-{sequence_number}".encode()
        target.write_bytes(payload)
        downloaded.append(target)
        return target

    def stub_apply(base, diffs, output, host_tool):
        """Concatenate base and diff contents with ``+`` instead of running a tool."""
        base_bytes = base.read_bytes()
        diff_bytes = b"+".join(diff.read_bytes() for diff in diffs)
        output.write_bytes(base_bytes + b"+" + diff_bytes)
        return subprocess.CompletedProcess(
            args=["osmium"],
            returncode=0,
            stdout="applied",
            stderr="",
        )

    monkeypatch.setattr("app.pipeline.osm_pbf.fetch_replication_state", stub_fetch_state)
    monkeypatch.setattr("app.pipeline.osm_pbf.download_diff", stub_download)
    monkeypatch.setattr("app.pipeline.osm_pbf.apply_osm_changes", stub_apply)

    with session_scope() as session:
        source = Source(
            name="Berlin OSM",
            kind="osm_pbf",
            url="https://download.geofabrik.de/europe/germany/berlin-latest.osm.pbf",
            notes="geofabrik_id=berlin; updates_url=https://download.geofabrik.de/europe/germany/berlin-updates",
        )
        session.add(source)
        # Flush before reading source.id for the dataset's foreign key.
        session.flush()

        base_dataset = Dataset(
            source_id=source.id,
            kind="osm_pbf_raw",
            local_path=str(base_path),
            sha256="b" * 64,
            is_active=False,
            status="committed",
        )
        session.add(base_dataset)
        # Flush before reading base_dataset.id for the diff state below.
        session.flush()

        initial_state = OsmDiffState(
            source_id=source.id,
            raw_dataset_id=base_dataset.id,
            updates_url=updates_url,
            sequence_number=1,
            timestamp="2026-06-26T21:21:02Z",
            status="active",
        )
        session.add(initial_state)
        session.flush()

        new_dataset = _try_prepare_raw_from_diffs(session, source)

        assert new_dataset is not None
        assert new_dataset.id != base_dataset.id
        assert new_dataset.kind == "osm_pbf_raw"
        # Stored state is at 1 and the remote at 3 -- presumably sequences 2 and 3.
        assert len(downloaded) == 2

        states_query = (
            select(OsmDiffState)
            .where(OsmDiffState.source_id == source.id)
            .order_by(OsmDiffState.sequence_number)
        )
        recorded = session.scalars(states_query).all()
        assert [row.sequence_number for row in recorded] == [1, 3]
        assert [row.status for row in recorded] == ["superseded", "active"]