Files
meubility-workbench/tests/test_gtfs_import.py
2026-07-01 23:29:51 +02:00

73 lines
3.6 KiB
Python

from __future__ import annotations
import json
import zipfile
from sqlalchemy import func, select
from app.db import reset_db, session_scope
from app.gtfs_storage import sidecar_path, stop_time_count, stop_times_by_trip
from app.journey import find_journeys, search_scheduled_stops
from app.models import Dataset, GtfsCalendar, Source
from app.pipeline.run import run_source
def test_gtfs_import_uses_staging_bulk_loader_and_reports_chunks(tmp_path, monkeypatch):
    """Import a tiny GTFS feed end to end and verify the import results.

    The feed has three stops (Alpha, Beta, Gamma), one route, two trips and
    five stop_times rows. The staging batch size is patched down to 2 before
    the import runs. The test then checks the dataset metadata, the sidecar
    stop_times storage, the calendar/dataset row counts, a journey lookup
    from Alpha to Gamma, and the progress event types that were reported.
    """
    reset_db()

    stop_time_rows = [
        "trip_id,arrival_time,departure_time,stop_id,stop_sequence",
        "t1,08:00:00,08:00:00,A,1",
        "t1,08:05:00,08:05:00,B,2",
        "t1,08:10:00,08:10:00,C,3",
        "t2,09:00:00,09:00:00,A,1",
        "t2,09:10:00,09:10:00,C,2",
    ]
    # Insertion order of this dict is the order the members are written.
    feed_members = {
        "agency.txt": "agency_id,agency_name,agency_url,agency_timezone\nA,Agency,https://example.invalid,Europe/Berlin\n",
        "stops.txt": "stop_id,stop_name,stop_lat,stop_lon\nA,Alpha,52.0,13.0\nB,Beta,52.1,13.1\nC,Gamma,52.2,13.2\n",
        "routes.txt": "route_id,agency_id,route_short_name,route_long_name,route_type\nR,A,R1,Alpha - Gamma,3\n",
        "trips.txt": "route_id,service_id,trip_id,shape_id\nR,daily,t1,s1\nR,daily,t2,s1\n",
        "calendar.txt": "service_id,monday,tuesday,wednesday,thursday,friday,saturday,sunday,start_date,end_date\ndaily,1,1,1,1,1,1,1,20260101,20261231\n",
        "stop_times.txt": "\n".join(stop_time_rows) + "\n",
        "shapes.txt": "shape_id,shape_pt_lat,shape_pt_lon,shape_pt_sequence\ns1,52.0,13.0,1\ns1,52.2,13.2,2\n",
    }

    gtfs_path = tmp_path / "small.gtfs.zip"
    with zipfile.ZipFile(gtfs_path, "w") as archive:
        for member_name, member_text in feed_members.items():
            archive.writestr(member_name, member_text)

    # Shrink the staging batch size; presumably this makes the five
    # stop_times rows span several chunks so chunk events are emitted.
    monkeypatch.setattr("app.pipeline.gtfs.GTFS_STAGE_BATCH_SIZE", 2)

    events = []

    def record_progress(*args):
        # Keep the raw positional arguments; index 0 is read back below as
        # the event type.
        events.append(args)

    with session_scope() as session:
        source = Source(name="Small GTFS", kind="gtfs", url=str(gtfs_path))
        session.add(source)
        session.flush()

        dataset = run_source(session, source, progress_callback=record_progress)

        # Import metadata recorded on the dataset.
        metadata = json.loads(dataset.metadata_json or "{}")
        assert metadata["importer"] == "gtfs_import_v6_sidecar_stop_times"
        assert metadata["staging"] == "sqlite_promoted_to_sidecar"
        assert metadata["gtfs_storage"]["tables"]["gtfs_stop_times"] == "sidecar"
        assert metadata["stop_times_imported"] == 5

        # Sidecar file exists and holds all five stop_times rows.
        assert sidecar_path(dataset) is not None
        assert sidecar_path(dataset).exists()
        assert stop_time_count(session, dataset.id) == 5
        trip_stop_times = stop_times_by_trip(session, dataset.id, ["t1"])
        assert len(trip_stop_times["t1"]) == 3

        # Exactly one calendar row for this dataset and one active GTFS dataset.
        calendar_count_query = (
            select(func.count())
            .select_from(GtfsCalendar)
            .where(GtfsCalendar.dataset_id == dataset.id)
        )
        assert session.scalar(calendar_count_query) == 1
        active_gtfs_count_query = (
            select(func.count())
            .select_from(Dataset)
            .where(Dataset.kind == "gtfs", Dataset.is_active.is_(True))
        )
        assert session.scalar(active_gtfs_count_query) == 1

        # Journey lookup from Alpha to Gamma at 08:00 should return trip t1's times.
        alpha = search_scheduled_stops(session, "Alpha", limit=1)[0]
        gamma = search_scheduled_stops(session, "Gamma", limit=1)[0]
        journey = find_journeys(session, alpha["id"], gamma["id"], "08:00", limit=1)
        first_journey = journey["journeys"][0]
        assert first_journey["departure_time"] == "08:00:00"
        assert first_journey["arrival_time"] == "08:10:00"

    # These checks only read the locally collected events list.
    event_types = [event[0] for event in events]
    for expected_event in (
        "gtfs_staging_started",
        "gtfs_file_chunk",
        "gtfs_activation_sidecar_stop_times",
        "gtfs_activation_completed",
    ):
        assert expected_event in event_types