"""Tests for OSM replication-state parsing and incremental diff application."""

from __future__ import annotations

import subprocess

from sqlalchemy import select

from app.db import reset_db, session_scope
from app.models import Dataset, OsmDiffState, Source
from app.pipeline.osm_pbf import _try_prepare_raw_from_diffs
from app.pipeline.osm_replication import (
    ReplicationState,
    diff_url_for_sequence,
    parse_replication_state_text,
)

# Replication endpoint shared by the source notes, the seeded diff state and the URL test.
_BERLIN_UPDATES_URL = "https://download.geofabrik.de/europe/germany/berlin-updates"


def test_parse_replication_state_text_and_diff_url():
    """Parse a ``state.txt`` payload and derive the diff URL for a sequence number."""
    # The colons in the timestamp are written escaped as ``\:``; the parser is
    # expected to return them unescaped.
    state = parse_replication_state_text(
        """
#Sat Jun 27 21:21:03 UTC 2026
sequenceNumber=1234
timestamp=2026-06-27T21\\:21\\:02Z
"""
    )
    assert state.sequence_number == 1234
    assert state.timestamp == "2026-06-27T21:21:02Z"
    # Sequence 1234 is zero-padded to nine digits and split into three path segments.
    assert diff_url_for_sequence(_BERLIN_UPDATES_URL, 1234).endswith("/000/001/234.osc.gz")


def test_osm_diff_application_records_new_raw_dataset_and_state(tmp_path, monkeypatch):
    """Applying pending diffs creates a new raw dataset and supersedes the old diff state.

    The source starts at replication sequence 1 and the (faked) upstream head is
    sequence 3, so exactly diffs 2 and 3 must be downloaded and applied.
    """
    reset_db()
    base_path = tmp_path / "base.osm.pbf"
    base_path.write_bytes(b"base")
    diff_paths = []
    downloaded_sequences = []

    def fake_fetch(_updates_url, timeout=30):
        # Upstream head is sequence 3.
        return ReplicationState(
            sequence_number=3,
            timestamp="2026-06-27T21:21:02Z",
            raw={"sequenceNumber": "3"},
        )

    def fake_download(_updates_url, sequence_number, output_dir, timeout=120):
        path = output_dir / f"{sequence_number}.osc.gz"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(f"diff-{sequence_number}".encode())
        downloaded_sequences.append(sequence_number)
        diff_paths.append(path)
        return path

    def fake_apply(base, diffs, output, host_tool):
        # Stand-in for osmium: concatenate base and diff bytes into the output file.
        output.write_bytes(base.read_bytes() + b"+" + b"+".join(path.read_bytes() for path in diffs))
        return subprocess.CompletedProcess(args=["osmium"], returncode=0, stdout="applied", stderr="")

    monkeypatch.setattr("app.pipeline.osm_pbf.fetch_replication_state", fake_fetch)
    monkeypatch.setattr("app.pipeline.osm_pbf.download_diff", fake_download)
    monkeypatch.setattr("app.pipeline.osm_pbf.apply_osm_changes", fake_apply)

    with session_scope() as session:
        source = Source(
            name="Berlin OSM",
            kind="osm_pbf",
            url="https://download.geofabrik.de/europe/germany/berlin-latest.osm.pbf",
            notes=(
                "geofabrik_id=berlin; "
                "updates_url=https://download.geofabrik.de/europe/germany/berlin-updates"
            ),
        )
        session.add(source)
        session.flush()

        base_dataset = Dataset(
            source_id=source.id,
            kind="osm_pbf_raw",
            local_path=str(base_path),
            sha256="b" * 64,
            is_active=False,
            status="committed",
        )
        session.add(base_dataset)
        session.flush()

        # The base dataset is already caught up to replication sequence 1.
        session.add(
            OsmDiffState(
                source_id=source.id,
                raw_dataset_id=base_dataset.id,
                updates_url=_BERLIN_UPDATES_URL,
                sequence_number=1,
                timestamp="2026-06-26T21:21:02Z",
                status="active",
            )
        )
        session.flush()

        new_dataset = _try_prepare_raw_from_diffs(session, source)

        assert new_dataset is not None
        assert new_dataset.id != base_dataset.id
        assert new_dataset.kind == "osm_pbf_raw"
        # Only the diffs after the recorded sequence, up to the upstream head, are fetched;
        # checking the numbers (not just the count) catches off-by-one ranges.
        assert sorted(downloaded_sequences) == [2, 3]
        assert len(diff_paths) == 2

        states = session.scalars(
            select(OsmDiffState)
            .where(OsmDiffState.source_id == source.id)
            .order_by(OsmDiffState.sequence_number)
        ).all()
        assert [state.sequence_number for state in states] == [1, 3]
        assert [state.status for state in states] == ["superseded", "active"]
        # The old state keeps pointing at the base dataset; the new one at the rebuilt dataset.
        assert [state.raw_dataset_id for state in states] == [base_dataset.id, new_dataset.id]