73 lines
3.6 KiB
Python
73 lines
3.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import zipfile
|
|
|
|
from sqlalchemy import func, select
|
|
|
|
from app.db import reset_db, session_scope
|
|
from app.gtfs_storage import sidecar_path, stop_time_count, stop_times_by_trip
|
|
from app.journey import find_journeys, search_scheduled_stops
|
|
from app.models import Dataset, GtfsCalendar, Source
|
|
from app.pipeline.run import run_source
|
|
|
|
|
|
def test_gtfs_import_uses_staging_bulk_loader_and_reports_chunks(tmp_path, monkeypatch):
|
|
reset_db()
|
|
gtfs_path = tmp_path / "small.gtfs.zip"
|
|
with zipfile.ZipFile(gtfs_path, "w") as zf:
|
|
zf.writestr("agency.txt", "agency_id,agency_name,agency_url,agency_timezone\nA,Agency,https://example.invalid,Europe/Berlin\n")
|
|
zf.writestr(
|
|
"stops.txt",
|
|
"stop_id,stop_name,stop_lat,stop_lon\nA,Alpha,52.0,13.0\nB,Beta,52.1,13.1\nC,Gamma,52.2,13.2\n",
|
|
)
|
|
zf.writestr("routes.txt", "route_id,agency_id,route_short_name,route_long_name,route_type\nR,A,R1,Alpha - Gamma,3\n")
|
|
zf.writestr("trips.txt", "route_id,service_id,trip_id,shape_id\nR,daily,t1,s1\nR,daily,t2,s1\n")
|
|
zf.writestr("calendar.txt", "service_id,monday,tuesday,wednesday,thursday,friday,saturday,sunday,start_date,end_date\ndaily,1,1,1,1,1,1,1,20260101,20261231\n")
|
|
zf.writestr(
|
|
"stop_times.txt",
|
|
"\n".join(
|
|
[
|
|
"trip_id,arrival_time,departure_time,stop_id,stop_sequence",
|
|
"t1,08:00:00,08:00:00,A,1",
|
|
"t1,08:05:00,08:05:00,B,2",
|
|
"t1,08:10:00,08:10:00,C,3",
|
|
"t2,09:00:00,09:00:00,A,1",
|
|
"t2,09:10:00,09:10:00,C,2",
|
|
]
|
|
)
|
|
+ "\n",
|
|
)
|
|
zf.writestr("shapes.txt", "shape_id,shape_pt_lat,shape_pt_lon,shape_pt_sequence\ns1,52.0,13.0,1\ns1,52.2,13.2,2\n")
|
|
|
|
monkeypatch.setattr("app.pipeline.gtfs.GTFS_STAGE_BATCH_SIZE", 2)
|
|
events = []
|
|
with session_scope() as session:
|
|
source = Source(name="Small GTFS", kind="gtfs", url=str(gtfs_path))
|
|
session.add(source)
|
|
session.flush()
|
|
dataset = run_source(session, source, progress_callback=lambda *args: events.append(args))
|
|
|
|
metadata = json.loads(dataset.metadata_json or "{}")
|
|
assert metadata["importer"] == "gtfs_import_v6_sidecar_stop_times"
|
|
assert metadata["staging"] == "sqlite_promoted_to_sidecar"
|
|
assert metadata["gtfs_storage"]["tables"]["gtfs_stop_times"] == "sidecar"
|
|
assert metadata["stop_times_imported"] == 5
|
|
assert sidecar_path(dataset) is not None
|
|
assert sidecar_path(dataset).exists()
|
|
assert stop_time_count(session, dataset.id) == 5
|
|
assert len(stop_times_by_trip(session, dataset.id, ["t1"])["t1"]) == 3
|
|
assert session.scalar(select(func.count()).select_from(GtfsCalendar).where(GtfsCalendar.dataset_id == dataset.id)) == 1
|
|
assert session.scalar(select(func.count()).select_from(Dataset).where(Dataset.kind == "gtfs", Dataset.is_active.is_(True))) == 1
|
|
alpha = search_scheduled_stops(session, "Alpha", limit=1)[0]
|
|
gamma = search_scheduled_stops(session, "Gamma", limit=1)[0]
|
|
journey = find_journeys(session, alpha["id"], gamma["id"], "08:00", limit=1)
|
|
assert journey["journeys"][0]["departure_time"] == "08:00:00"
|
|
assert journey["journeys"][0]["arrival_time"] == "08:10:00"
|
|
|
|
event_types = [event[0] for event in events]
|
|
assert "gtfs_staging_started" in event_types
|
|
assert "gtfs_file_chunk" in event_types
|
|
assert "gtfs_activation_sidecar_stop_times" in event_types
|
|
assert "gtfs_activation_completed" in event_types
|