# Source: meubility-workbench/tests/test_route_layer.py
# Snapshot: 2026-07-01 23:29:51 +02:00 (1005 lines, 38 KiB, Python)

from __future__ import annotations
import json
from shapely.geometry import LineString, Point, shape
from sqlalchemy import select
from app.db import reset_db, session_scope
from app.journey import find_journeys, search_scheduled_stops
from app.models import (
CanonicalStopLink,
Dataset,
GtfsCalendar,
GtfsCalendarDate,
GtfsRoute,
GtfsRoutePatternLink,
GtfsShape,
GtfsStop,
GtfsStopTime,
GtfsTrip,
MatchRule,
OsmFeature,
RouteMatch,
RoutePattern,
Source,
)
from app.pipeline.route_layer import (
_GtfsPatternSeed,
_OsmRouteCandidate,
_OsmRouteCandidateIndex,
_RouteLayerOverrides,
_choose_osm_candidate,
rebuild_route_layer,
)
from app.pipeline.utils import geometry_json_and_bbox, norm_ref
def test_directional_candidate_selection_prefers_matching_osm_geometry_orientation():
    """Each GTFS seed should select the OSM candidate drawn in its own direction.

    Two seeds of the same U5 route follow one straight line in opposite
    directions, and two OSM candidates carry exactly those two geometries.
    The test asserts that ``_choose_osm_candidate`` returns the candidate whose
    ``osm_id`` matches the seed's orientation, and that the reported
    ``projection_direction`` is ``"forward"`` for both selections.
    """
    u5_route = GtfsRoute(
        dataset_id=1,
        route_id="u5",
        short_name="U5",
        route_type=1,
        mode="subway",
        route_key=norm_ref("U5"),
    )

    # The same three-point line, once in each direction.
    fwd_geojson, fwd_bbox = geometry_json_and_bbox(LineString([(0, 0), (1, 0), (2, 0)]))
    rev_geojson, rev_bbox = geometry_json_and_bbox(LineString([(2, 0), (1, 0), (0, 0)]))

    seed_fwd = _GtfsPatternSeed(
        route=u5_route,
        shape_id="forward",
        trip_id="trip-forward",
        geometry_text=fwd_geojson,
        geometry_source="gtfs_shape",
        bbox=fwd_bbox,
        start_point=Point(0, 0),
        end_point=Point(2, 0),
        center_point=Point(1, 0),
    )
    seed_rev = _GtfsPatternSeed(
        route=u5_route,
        shape_id="reverse",
        trip_id="trip-reverse",
        geometry_text=rev_geojson,
        geometry_source="gtfs_shape",
        bbox=rev_bbox,
        start_point=Point(2, 0),
        end_point=Point(0, 0),
        center_point=Point(1, 0),
    )

    candidate_fwd = _route_candidate(
        feature_id=10,
        osm_id="forward",
        geometry_text=fwd_geojson or "",
        bbox=fwd_bbox,
    )
    candidate_rev = _route_candidate(
        feature_id=11,
        osm_id="reverse",
        geometry_text=rev_geojson or "",
        bbox=rev_bbox,
    )
    both_candidates = [candidate_fwd, candidate_rev]

    index = _OsmRouteCandidateIndex(
        by_ref_mode={(norm_ref("U5"), "subway"): both_candidates},
        by_id={candidate.feature.id: candidate for candidate in both_candidates},
    )
    # No manual accept/reject decisions are supplied.
    no_overrides = _RouteLayerOverrides(accepted_by_gtfs_route_id={}, rejected_by_gtfs_route_id={})

    picked_fwd, _, reasons_fwd = _choose_osm_candidate(seed_fwd, index, no_overrides)
    picked_rev, _, reasons_rev = _choose_osm_candidate(seed_rev, index, no_overrides)

    assert picked_fwd is not None
    assert picked_fwd.feature.osm_id == "forward"
    assert reasons_fwd["directional_match"]["projection_direction"] == "forward"

    assert picked_rev is not None
    assert picked_rev.feature.osm_id == "reverse"
    assert reasons_rev["directional_match"]["projection_direction"] == "forward"
def test_opposite_gtfs_shapes_share_osm_visual_route_and_reverse_journey_segment():
    """Outbound and inbound GTFS shapes should link to one OSM-backed pattern.

    The fixture holds a U5 route with two opposite shapes/trips over stops
    A-B-C, one OSM route relation drawn A->C, and a manually accepted
    ``RouteMatch``. After ``rebuild_route_layer`` the test expects a single
    ``RoutePattern`` of source kind ``"osm"``, two links pointing at it with
    directions ``forward``/``reverse``, and a C->A journey whose geometry
    starts at (2, 0) and ends at (0, 0).
    """
    reset_db()
    with session_scope() as session:
        gtfs_src = Source(name="Directional GTFS", kind="gtfs", url="./directional.zip")
        osm_src = Source(name="Directional OSM", kind="osm_geojson", url="./directional.geojson")
        session.add_all([gtfs_src, osm_src])
        session.flush()

        gtfs_ds = Dataset(
            source_id=gtfs_src.id,
            kind="gtfs",
            local_path="./directional.zip",
            sha256="gtfs",
            is_active=True,
            status="imported",
        )
        osm_ds = Dataset(
            source_id=osm_src.id,
            kind="osm_geojson",
            local_path="./directional.geojson",
            sha256="osm",
            is_active=True,
            status="imported",
        )
        session.add_all([gtfs_ds, osm_ds])
        session.flush()

        # Three stops on the line lat=0, spaced one degree of longitude apart.
        session.add_all(
            [
                GtfsStop(dataset_id=gtfs_ds.id, stop_id=stop_id, name=stop_name, lat=0.0, lon=stop_lon)
                for stop_id, stop_name, stop_lon in (("A", "Alpha", 0.0), ("B", "Beta", 1.0), ("C", "Gamma", 2.0))
            ]
        )

        u5 = GtfsRoute(
            dataset_id=gtfs_ds.id,
            route_id="u5",
            short_name="U5",
            long_name="Alpha - Gamma",
            route_type=1,
            mode="subway",
            operator_name="Example Transit",
            route_key=norm_ref("U5"),
        )
        session.add(u5)

        outbound_geojson, outbound_bbox = geometry_json_and_bbox(LineString([(0, 0), (1, 0), (2, 0)]))
        inbound_geojson, inbound_bbox = geometry_json_and_bbox(LineString([(2, 0), (1, 0), (0, 0)]))
        shapes_and_trips = [
            GtfsShape(
                dataset_id=gtfs_ds.id,
                shape_id="u5_outbound",
                geometry_geojson=outbound_geojson or "",
                min_lon=outbound_bbox[0],
                min_lat=outbound_bbox[1],
                max_lon=outbound_bbox[2],
                max_lat=outbound_bbox[3],
            ),
            GtfsShape(
                dataset_id=gtfs_ds.id,
                shape_id="u5_inbound",
                geometry_geojson=inbound_geojson or "",
                min_lon=inbound_bbox[0],
                min_lat=inbound_bbox[1],
                max_lon=inbound_bbox[2],
                max_lat=inbound_bbox[3],
            ),
            GtfsTrip(
                dataset_id=gtfs_ds.id,
                route_id="u5",
                trip_id="u5_out",
                service_id="daily",
                shape_id="u5_outbound",
            ),
            GtfsTrip(
                dataset_id=gtfs_ds.id,
                route_id="u5",
                trip_id="u5_back",
                service_id="daily",
                shape_id="u5_inbound",
            ),
        ]
        session.add_all(shapes_and_trips)

        # (trip_id, stop_id, stop_sequence, clock time, seconds since midnight)
        timetable = (
            ("u5_out", "A", 1, "08:00:00", 28800),
            ("u5_out", "B", 2, "08:05:00", 29100),
            ("u5_out", "C", 3, "08:10:00", 29400),
            ("u5_back", "C", 1, "08:30:00", 30600),
            ("u5_back", "B", 2, "08:35:00", 30900),
            ("u5_back", "A", 3, "08:40:00", 31200),
        )
        session.add_all([_stop_time(gtfs_ds.id, *row) for row in timetable])

        # A single OSM relation, drawn in the outbound direction only.
        osm_geojson, osm_bbox = geometry_json_and_bbox(LineString([(0, 0), (1, 0), (2, 0)]))
        osm_route = OsmFeature(
            dataset_id=osm_ds.id,
            osm_type="relation",
            osm_id="5005",
            kind="route",
            mode="subway",
            name="U5 Alpha - Gamma",
            ref="U5",
            operator="Example Transit",
            network="Example Network",
            geometry_geojson=osm_geojson,
            min_lon=osm_bbox[0],
            min_lat=osm_bbox[1],
            max_lon=osm_bbox[2],
            max_lat=osm_bbox[3],
            route_key=norm_ref("U5"),
            operator_key="example transit",
        )
        session.add(osm_route)
        session.flush()

        manual_match = RouteMatch(
            gtfs_route_id=u5.id,
            osm_feature_id=osm_route.id,
            confidence=100.0,
            status="accepted",
            rule_source="manual",
        )
        session.add(manual_match)
        session.flush()

        summary = rebuild_route_layer(session)
        assert summary["route_patterns"] == 1
        assert summary["route_pattern_links"] == 2

        route_patterns = session.scalars(select(RoutePattern)).all()
        assert len(route_patterns) == 1
        assert route_patterns[0].source_kind == "osm"

        pattern_links = session.scalars(
            select(GtfsRoutePatternLink).order_by(GtfsRoutePatternLink.shape_id)
        ).all()
        assert len({link.route_pattern_id for link in pattern_links}) == 1

        reasons_by_shape = {link.shape_id: json.loads(link.reasons_json or "{}") for link in pattern_links}
        direction_by_shape = {
            shape_id: reason["direction"]["direction"] for shape_id, reason in reasons_by_shape.items()
        }
        assert direction_by_shape == {"u5_inbound": "reverse", "u5_outbound": "forward"}
        assert {reason["manual"] for reason in reasons_by_shape.values()} == {"accepted_route_match"}

        response = find_journeys(
            session,
            from_stop_id="C",
            to_stop_id="A",
            departure="08:00",
            max_transfers=0,
            limit=1,
        )
        assert response["journeys"]
        first_leg = response["journeys"][0]["legs"][0]
        assert first_leg["trip_id"] == "u5_back"
        assert first_leg["route_pattern_id"] == route_patterns[0].id

        # The returned path must run C -> A even though the OSM line is drawn A -> C.
        path = response["journeys"][0]["features"]["features"][0]["geometry"]["coordinates"]
        assert tuple(path[0]) == (2.0, 0.0)
        assert tuple(path[-1]) == (0.0, 0.0)
def test_journey_geometry_rejects_remote_route_pattern_and_uses_trip_shape():
    """A linked pattern that starts elsewhere must not be used for the leg geometry.

    The trip Dresden -> Leipzig has its own GTFS shape, while the route
    pattern linked to that shape begins at (8.68, 50.11). The test expects
    the journey feature to report ``geometry_source == "gtfs_shape"``, no
    ``route_pattern_id``, and coordinates running from stop A to stop B.
    """
    reset_db()
    with session_scope() as session:
        rail_source = Source(name="Rail GTFS", kind="gtfs", url="./rail.zip")
        session.add(rail_source)
        session.flush()

        rail_ds = Dataset(
            source_id=rail_source.id,
            kind="gtfs",
            local_path="./rail.zip",
            sha256="rail",
            is_active=True,
            status="imported",
        )
        session.add(rail_ds)
        session.flush()

        dresden = GtfsStop(dataset_id=rail_ds.id, stop_id="A", name="Dresden", lat=51.05, lon=13.74)
        leipzig = GtfsStop(dataset_id=rail_ds.id, stop_id="B", name="Leipzig", lat=51.34, lon=12.38)
        session.add_all([dresden, leipzig])

        ice50 = GtfsRoute(
            dataset_id=rail_ds.id,
            route_id="ice50",
            short_name="ICE 50",
            long_name="Dresden - Leipzig",
            route_type=2,
            mode="train",
            route_key=norm_ref("ICE 50"),
        )
        session.add(ice50)

        # "trip" geometry matches the leg; "remote" geometry only shares the Leipzig end.
        trip_geojson, trip_bbox = geometry_json_and_bbox(LineString([(13.74, 51.05), (13.0, 51.2), (12.38, 51.34)]))
        remote_geojson, remote_bbox = geometry_json_and_bbox(LineString([(8.68, 50.11), (9.9, 50.4), (12.38, 51.34)]))

        trip_shape = GtfsShape(
            dataset_id=rail_ds.id,
            shape_id="actual_shape",
            geometry_geojson=trip_geojson or "",
            min_lon=trip_bbox[0],
            min_lat=trip_bbox[1],
            max_lon=trip_bbox[2],
            max_lat=trip_bbox[3],
        )
        trip = GtfsTrip(
            dataset_id=rail_ds.id,
            route_id="ice50",
            trip_id="ice50_dresden_leipzig",
            service_id="daily",
            shape_id="actual_shape",
        )
        session.add_all(
            [
                trip_shape,
                trip,
                _stop_time(rail_ds.id, "ice50_dresden_leipzig", "A", 1, "08:00:00", 28800),
                _stop_time(rail_ds.id, "ice50_dresden_leipzig", "B", 2, "09:00:00", 32400),
            ]
        )
        session.flush()

        far_pattern = RoutePattern(
            pattern_key="osm:remote",
            route_ref="ICE 50",
            route_name="Leipzig - Frankfurt",
            mode="train",
            source_kind="osm",
            status="active",
            gtfs_route_id=ice50.id,
            geometry_geojson=remote_geojson or "",
            min_lon=remote_bbox[0],
            min_lat=remote_bbox[1],
            max_lon=remote_bbox[2],
            max_lat=remote_bbox[3],
            confidence=95.0,
        )
        session.add(far_pattern)
        session.flush()

        # Link the remote pattern to the very shape the trip uses.
        far_link = GtfsRoutePatternLink(
            dataset_id=rail_ds.id,
            gtfs_route_id=ice50.id,
            route_id=ice50.route_id,
            shape_id="actual_shape",
            route_pattern_id=far_pattern.id,
            confidence=95.0,
            status="linked",
            source_kind="osm",
        )
        session.add(far_link)
        session.flush()

        response = find_journeys(
            session,
            from_stop_id="A",
            to_stop_id="B",
            departure="07:55",
            max_transfers=0,
            limit=1,
        )
        assert response["journeys"]

        leg_feature = response["journeys"][0]["features"]["features"][0]
        assert leg_feature["properties"]["geometry_source"] == "gtfs_shape"
        assert leg_feature["properties"]["route_pattern_id"] is None

        path = leg_feature["geometry"]["coordinates"]
        assert tuple(path[0]) == (13.74, 51.05)
        assert tuple(path[-1]) == (12.38, 51.34)
def test_journey_geometry_uses_alternate_route_pattern_that_covers_leg_stops():
    """An unlinked pattern of the same route should be used when it covers the leg.

    The trip has no GTFS shape. The only ``GtfsRoutePatternLink`` (shape id
    ``"__route__"``) points at a pattern starting at (8.68, 50.11), while a
    second pattern on the same GTFS route runs Dresden -> Leipzig. The test
    expects ``geometry_source == "route_layer:osm:alternate"`` and the id of
    the second pattern on the journey feature.
    """
    reset_db()
    with session_scope() as session:
        rail_source = Source(name="Rail GTFS", kind="gtfs", url="./rail.zip")
        session.add(rail_source)
        session.flush()

        rail_ds = Dataset(
            source_id=rail_source.id,
            kind="gtfs",
            local_path="./rail.zip",
            sha256="rail",
            is_active=True,
            status="imported",
        )
        session.add(rail_ds)
        session.flush()

        dresden = GtfsStop(dataset_id=rail_ds.id, stop_id="A", name="Dresden", lat=51.05, lon=13.74)
        leipzig = GtfsStop(dataset_id=rail_ds.id, stop_id="B", name="Leipzig", lat=51.34, lon=12.38)
        session.add_all([dresden, leipzig])

        ice50 = GtfsRoute(
            dataset_id=rail_ds.id,
            route_id="ice50",
            short_name="ICE 50",
            long_name="Dresden - Leipzig",
            route_type=2,
            mode="train",
            route_key=norm_ref("ICE 50"),
        )
        session.add(ice50)

        # Note: the trip is created without a shape_id.
        session.add_all(
            [
                GtfsTrip(dataset_id=rail_ds.id, route_id="ice50", trip_id="ice50_dresden_leipzig", service_id="daily"),
                _stop_time(rail_ds.id, "ice50_dresden_leipzig", "A", 1, "08:00:00", 28800),
                _stop_time(rail_ds.id, "ice50_dresden_leipzig", "B", 2, "09:00:00", 32400),
            ]
        )
        session.flush()

        remote_geojson, remote_bbox = geometry_json_and_bbox(LineString([(8.68, 50.11), (9.9, 50.4), (12.38, 51.34)]))
        covering_geojson, covering_bbox = geometry_json_and_bbox(LineString([(13.74, 51.05), (13.0, 51.2), (12.38, 51.34)]))

        far_pattern = RoutePattern(
            pattern_key="osm:remote",
            route_ref="ICE 50",
            route_name="Leipzig - Frankfurt",
            mode="train",
            source_kind="osm",
            status="active",
            gtfs_route_id=ice50.id,
            geometry_geojson=remote_geojson or "",
            min_lon=remote_bbox[0],
            min_lat=remote_bbox[1],
            max_lon=remote_bbox[2],
            max_lat=remote_bbox[3],
            confidence=95.0,
        )
        covering_pattern = RoutePattern(
            pattern_key="osm:valid",
            route_ref="ICE 50",
            route_name="Dresden - Leipzig",
            mode="train",
            source_kind="osm",
            status="active",
            gtfs_route_id=ice50.id,
            geometry_geojson=covering_geojson or "",
            min_lon=covering_bbox[0],
            min_lat=covering_bbox[1],
            max_lon=covering_bbox[2],
            max_lat=covering_bbox[3],
            confidence=80.0,
        )
        session.add_all([far_pattern, covering_pattern])
        session.flush()

        # Only the remote pattern is linked; the covering one has no link row.
        route_level_link = GtfsRoutePatternLink(
            dataset_id=rail_ds.id,
            gtfs_route_id=ice50.id,
            route_id=ice50.route_id,
            shape_id="__route__",
            route_pattern_id=far_pattern.id,
            confidence=95.0,
            status="linked",
            source_kind="osm",
        )
        session.add(route_level_link)
        session.flush()

        response = find_journeys(
            session,
            from_stop_id="A",
            to_stop_id="B",
            departure="07:55",
            max_transfers=0,
            limit=1,
        )
        leg_feature = response["journeys"][0]["features"]["features"][0]
        assert leg_feature["properties"]["geometry_source"] == "route_layer:osm:alternate"
        assert leg_feature["properties"]["route_pattern_id"] == covering_pattern.id

        path = leg_feature["geometry"]["coordinates"]
        assert tuple(path[0]) == (13.74, 51.05)
        assert tuple(path[-1]) == (12.38, 51.34)
def test_journey_without_route_geometry_falls_back_to_intermediate_stop_sequence():
    """Without any shape or pattern, the leg geometry should follow the stops.

    The fixture contains only stops, a route, one trip and its stop times.
    The test expects ``geometry_source == "stop_sequence_fallback"`` and a
    path through A, the intermediate stop B, and C as (lon, lat) pairs.
    """
    reset_db()
    with session_scope() as session:
        feed_source = Source(name="Stop Sequence GTFS", kind="gtfs", url="./stop-sequence.zip")
        session.add(feed_source)
        session.flush()

        feed_ds = Dataset(
            source_id=feed_source.id,
            kind="gtfs",
            local_path="./stop-sequence.zip",
            sha256="stop-sequence",
            is_active=True,
            status="imported",
        )
        session.add(feed_ds)
        session.flush()

        trip_id = "trip_with_middle_stop"
        fixture_rows = [
            GtfsStop(dataset_id=feed_ds.id, stop_id="A", name="Alpha", lat=0.0, lon=0.0),
            GtfsStop(dataset_id=feed_ds.id, stop_id="B", name="Beta", lat=1.0, lon=1.0),
            GtfsStop(dataset_id=feed_ds.id, stop_id="C", name="Gamma", lat=0.0, lon=2.0),
            GtfsRoute(
                dataset_id=feed_ds.id,
                route_id="r1",
                short_name="R1",
                long_name="Alpha - Gamma",
                route_type=2,
                mode="train",
                route_key=norm_ref("R1"),
            ),
            GtfsTrip(dataset_id=feed_ds.id, route_id="r1", trip_id=trip_id, service_id="daily"),
            _stop_time(feed_ds.id, trip_id, "A", 1, "08:00:00", 28800),
            _stop_time(feed_ds.id, trip_id, "B", 2, "08:10:00", 29400),
            _stop_time(feed_ds.id, trip_id, "C", 3, "08:20:00", 30000),
        ]
        session.add_all(fixture_rows)
        session.flush()

        response = find_journeys(
            session,
            from_stop_id="A",
            to_stop_id="C",
            departure="07:55",
            max_transfers=0,
            limit=1,
        )
        leg_feature = response["journeys"][0]["features"]["features"][0]
        assert leg_feature["properties"]["geometry_source"] == "stop_sequence_fallback"

        path = [tuple(coord) for coord in leg_feature["geometry"]["coordinates"]]
        assert path == [(0.0, 0.0), (1.0, 1.0), (2.0, 0.0)]
def test_journey_stitches_partial_route_pattern_to_remaining_stop_sequence():
    """A pattern covering only the start of a leg should be extended via stops.

    The linked pattern runs from stop A to (0.8, -0.65) and stops short of
    B and C. The test expects ``geometry_source == "route_layer:osm:stitched"``,
    the partial pattern's id, the pattern's three vertices at the start of the
    path, and the positions of stops B and C as the final two points.
    """
    reset_db()
    with session_scope() as session:
        feed_source = Source(name="Partial Rail GTFS", kind="gtfs", url="./partial.zip")
        session.add(feed_source)
        session.flush()

        feed_ds = Dataset(
            source_id=feed_source.id,
            kind="gtfs",
            local_path="./partial.zip",
            sha256="partial",
            is_active=True,
            status="imported",
        )
        session.add(feed_ds)
        session.flush()

        session.add_all(
            [
                GtfsStop(dataset_id=feed_ds.id, stop_id="A", name="Berlin", lat=0.0, lon=0.0),
                GtfsStop(dataset_id=feed_ds.id, stop_id="B", name="Intermediate", lat=-0.9, lon=1.4),
                GtfsStop(dataset_id=feed_ds.id, stop_id="C", name="Leipzig", lat=-1.0, lon=2.0),
            ]
        )

        ice15 = GtfsRoute(
            dataset_id=feed_ds.id,
            route_id="ice15",
            short_name="ICE 15",
            long_name="Berlin - Leipzig",
            route_type=2,
            mode="train",
            route_key=norm_ref("ICE 15"),
        )
        session.add(ice15)

        trip_id = "ice15_partial"
        session.add_all(
            [
                GtfsTrip(dataset_id=feed_ds.id, route_id="ice15", trip_id=trip_id, service_id="daily"),
                _stop_time(feed_ds.id, trip_id, "A", 1, "08:00:00", 28800),
                _stop_time(feed_ds.id, trip_id, "B", 2, "08:30:00", 30600),
                _stop_time(feed_ds.id, trip_id, "C", 3, "09:00:00", 32400),
            ]
        )

        # Known geometry exists only for the first part of the trip.
        head_geojson, head_bbox = geometry_json_and_bbox(LineString([(0.0, 0.0), (0.4, -0.35), (0.8, -0.65)]))
        head_pattern = RoutePattern(
            pattern_key="osm:partial",
            route_ref="ICE 15",
            route_name="Berlin partial",
            mode="train",
            source_kind="osm",
            status="active",
            gtfs_route_id=ice15.id,
            geometry_geojson=head_geojson or "",
            min_lon=head_bbox[0],
            min_lat=head_bbox[1],
            max_lon=head_bbox[2],
            max_lat=head_bbox[3],
            confidence=80.0,
        )
        session.add(head_pattern)
        session.flush()

        route_level_link = GtfsRoutePatternLink(
            dataset_id=feed_ds.id,
            gtfs_route_id=ice15.id,
            route_id=ice15.route_id,
            shape_id="__route__",
            route_pattern_id=head_pattern.id,
            confidence=80.0,
            status="linked",
            source_kind="osm",
        )
        session.add(route_level_link)
        session.flush()

        response = find_journeys(
            session,
            from_stop_id="A",
            to_stop_id="C",
            departure="07:55",
            max_transfers=0,
            limit=1,
        )
        leg_feature = response["journeys"][0]["features"]["features"][0]
        assert leg_feature["properties"]["geometry_source"] == "route_layer:osm:stitched"
        assert leg_feature["properties"]["route_pattern_id"] == head_pattern.id

        path = [tuple(coord) for coord in leg_feature["geometry"]["coordinates"]]
        assert path[:3] == [(0.0, 0.0), (0.4, -0.35), (0.8, -0.65)]
        assert path[-2:] == [(1.4, -0.9), (2.0, -1.0)]
def test_partial_route_layer_stitching_precedes_legacy_gtfs_route_geometry():
    """A stitched partial pattern should win over geometry stored on the GTFS route.

    The ``GtfsRoute`` itself carries a straight A -> C line, and the linked
    pattern covers only the approach to C. The test expects
    ``geometry_source == "route_layer:osm:stitched"`` and a path made of
    stops A and B followed by the three pattern vertices.
    """
    reset_db()
    with session_scope() as session:
        feed_source = Source(name="Partial Destination GTFS", kind="gtfs", url="./partial-destination.zip")
        session.add(feed_source)
        session.flush()

        feed_ds = Dataset(
            source_id=feed_source.id,
            kind="gtfs",
            local_path="./partial-destination.zip",
            sha256="partial-destination",
            is_active=True,
            status="imported",
        )
        session.add(feed_ds)
        session.flush()

        session.add_all(
            [
                GtfsStop(dataset_id=feed_ds.id, stop_id="A", name="Origin", lat=0.0, lon=0.0),
                GtfsStop(dataset_id=feed_ds.id, stop_id="B", name="Before known path", lat=0.2, lon=1.0),
                GtfsStop(dataset_id=feed_ds.id, stop_id="C", name="Destination", lat=0.0, lon=2.0),
            ]
        )

        # Geometry stored directly on the route: a straight line that skips stop B.
        straight_geojson, straight_bbox = geometry_json_and_bbox(LineString([(0.0, 0.0), (2.0, 0.0)]))
        ice15 = GtfsRoute(
            dataset_id=feed_ds.id,
            route_id="ice15",
            short_name="ICE 15",
            long_name="Origin - Destination",
            route_type=2,
            mode="train",
            geometry_geojson=straight_geojson,
            min_lon=straight_bbox[0],
            min_lat=straight_bbox[1],
            max_lon=straight_bbox[2],
            max_lat=straight_bbox[3],
            route_key=norm_ref("ICE 15"),
        )
        session.add(ice15)

        trip_id = "ice15_partial_destination"
        session.add_all(
            [
                GtfsTrip(dataset_id=feed_ds.id, route_id="ice15", trip_id=trip_id, service_id="daily"),
                _stop_time(feed_ds.id, trip_id, "A", 1, "08:00:00", 28800),
                _stop_time(feed_ds.id, trip_id, "B", 2, "08:30:00", 30600),
                _stop_time(feed_ds.id, trip_id, "C", 3, "09:00:00", 32400),
            ]
        )

        # Known geometry exists only for the final approach into stop C.
        tail_geojson, tail_bbox = geometry_json_and_bbox(LineString([(1.3, 0.1), (1.7, 0.05), (2.0, 0.0)]))
        tail_pattern = RoutePattern(
            pattern_key="osm:partial-destination",
            route_ref="ICE 15",
            route_name="Known destination approach",
            mode="train",
            source_kind="osm",
            status="active",
            gtfs_route_id=ice15.id,
            geometry_geojson=tail_geojson or "",
            min_lon=tail_bbox[0],
            min_lat=tail_bbox[1],
            max_lon=tail_bbox[2],
            max_lat=tail_bbox[3],
            confidence=80.0,
        )
        session.add(tail_pattern)
        session.flush()

        route_level_link = GtfsRoutePatternLink(
            dataset_id=feed_ds.id,
            gtfs_route_id=ice15.id,
            route_id=ice15.route_id,
            shape_id="__route__",
            route_pattern_id=tail_pattern.id,
            confidence=80.0,
            status="linked",
            source_kind="osm",
        )
        session.add(route_level_link)
        session.flush()

        response = find_journeys(
            session,
            from_stop_id="A",
            to_stop_id="C",
            departure="07:55",
            max_transfers=0,
            limit=1,
        )
        leg_feature = response["journeys"][0]["features"]["features"][0]
        assert leg_feature["properties"]["geometry_source"] == "route_layer:osm:stitched"
        assert leg_feature["properties"]["route_pattern_id"] == tail_pattern.id

        expected_path = [
            (0.0, 0.0),
            (1.0, 0.2),
            (1.3, 0.1),
            (1.7, 0.05),
            (2.0, 0.0),
        ]
        assert [tuple(coord) for coord in leg_feature["geometry"]["coordinates"]] == expected_path
def test_journey_can_transfer_between_gtfs_sources_at_shared_canonical_stop():
    """A journey should change between two GTFS feeds at a shared canonical stop.

    A local feed (U1, Zoo -> Hauptbahnhof) and a coach feed (FLX, Berlin ->
    Leipzig) each describe the Berlin main station with slightly different
    coordinates and names. After ``rebuild_route_layer`` the test expects
    three canonical stops, one search hit for "Berlin Hauptbahnhof" listing
    both source names, and a two-leg journey U1 -> FLX whose first leg is
    ``u1_late`` (arrives 08:14, coach leaves 08:15, ``transfer_seconds=0``).
    """
    reset_db()
    with session_scope() as session:
        local_src = Source(name="Local Berlin GTFS", kind="gtfs", url="./local.zip")
        coach_src = Source(name="Coach Europe GTFS", kind="gtfs", url="./coach.zip")
        session.add_all([local_src, coach_src])
        session.flush()

        local_ds = Dataset(
            source_id=local_src.id,
            kind="gtfs",
            local_path="./local.zip",
            sha256="local",
            is_active=True,
            status="imported",
        )
        coach_ds = Dataset(
            source_id=coach_src.id,
            kind="gtfs",
            local_path="./coach.zip",
            sha256="coach",
            is_active=True,
            status="imported",
        )
        session.add_all([local_ds, coach_ds])
        session.flush()

        stop_rows = [
            GtfsStop(dataset_id=local_ds.id, stop_id="zoo", name="Zoologischer Garten", lat=52.5069, lon=13.3320),
            GtfsStop(dataset_id=local_ds.id, stop_id="hbf", name="Berlin Hauptbahnhof", lat=52.5251, lon=13.3696),
            GtfsStop(dataset_id=coach_ds.id, stop_id="berlin_hbf", name="Berlin Central Station (FlixTrain)", lat=52.5252, lon=13.3697),
            GtfsStop(dataset_id=coach_ds.id, stop_id="leipzig", name="Leipzig Hauptbahnhof", lat=51.3450, lon=12.3822),
        ]
        route_rows = [
            GtfsRoute(
                dataset_id=local_ds.id,
                route_id="u1",
                short_name="U1",
                long_name="Zoo - Hauptbahnhof",
                route_type=1,
                mode="subway",
                operator_name="Local Transit",
                route_key=norm_ref("U1"),
            ),
            GtfsRoute(
                dataset_id=coach_ds.id,
                route_id="flix1",
                short_name="FLX",
                long_name="Berlin - Leipzig",
                route_type=3,
                mode="coach",
                operator_name="Coach Europe",
                route_key=norm_ref("FLX"),
            ),
        ]
        trip_rows = [
            GtfsTrip(dataset_id=local_ds.id, route_id="u1", trip_id="u1_early", service_id="daily"),
            GtfsTrip(dataset_id=local_ds.id, route_id="u1", trip_id="u1_late", service_id="daily"),
            GtfsTrip(dataset_id=coach_ds.id, route_id="flix1", trip_id="flix_trip", service_id="daily"),
        ]
        stop_time_rows = [
            _stop_time(local_ds.id, "u1_early", "zoo", 1, "08:00:00", 28800),
            _stop_time(local_ds.id, "u1_early", "hbf", 2, "08:10:00", 29400),
            _stop_time(local_ds.id, "u1_late", "zoo", 1, "08:12:00", 29520),
            _stop_time(local_ds.id, "u1_late", "hbf", 2, "08:14:00", 29640),
            _stop_time(coach_ds.id, "flix_trip", "berlin_hbf", 1, "08:15:00", 29700),
            _stop_time(coach_ds.id, "flix_trip", "leipzig", 2, "09:30:00", 34200),
        ]
        # One add_all call, in the order stops, routes, trips, stop times.
        session.add_all(stop_rows + route_rows + trip_rows + stop_time_rows)
        session.flush()

        summary = rebuild_route_layer(session)
        assert summary["canonical_stops"] == 3

        main_station_hits = [
            stop for stop in search_scheduled_stops(session, "Hauptbahnhof") if stop["name"] == "Berlin Hauptbahnhof"
        ]
        assert len(main_station_hits) == 1
        assert set(main_station_hits[0]["source_names"]) == {"Local Berlin GTFS", "Coach Europe GTFS"}

        response = find_journeys(
            session,
            from_stop_id="zoo",
            to_stop_id="leipzig",
            departure="08:00",
            max_transfers=1,
            transfer_seconds=0,
            limit=1,
        )
        assert response["journeys"]

        legs = response["journeys"][0]["legs"]
        assert [leg["route_ref"] for leg in legs] == ["U1", "FLX"]
        assert [leg["dataset_id"] for leg in legs] == [local_ds.id, coach_ds.id]
        assert {leg["source_name"] for leg in legs} == {"Local Berlin GTFS", "Coach Europe GTFS"}
        assert legs[0]["trip_id"] == "u1_late"
        assert response["journeys"][0]["departure_time"] == "08:12:00"
def test_manual_canonical_stop_link_replays_against_refreshed_target_dataset():
    """A manual stop link should still apply after the target feed is re-imported.

    Steps performed by the test:

    1. Build a local dataset (v1) and a coach dataset whose Berlin stops have
       different names and coordinates, then rebuild the route layer.
    2. Store a ``link_canonical_stop`` ``MatchRule`` that points the coach stop
       at the canonical stop currently holding the local v1 ``hbf`` stop.
    3. Deactivate v1, add an active v2 dataset for the same source containing
       ``hbf`` again, and rebuild.

    It then expects the v2 ``hbf`` link and the coach link to share one
    canonical stop whose ``stop_key`` is ``gtfs:<v2 dataset id>:hbf``.
    """
    reset_db()
    with session_scope() as session:
        local_src = Source(name="Local GTFS", kind="gtfs", url="./local.zip")
        coach_src = Source(name="Coach GTFS", kind="gtfs", url="./coach.zip")
        session.add_all([local_src, coach_src])
        session.flush()

        local_v1 = Dataset(
            source_id=local_src.id,
            kind="gtfs",
            local_path="./local-v1.zip",
            sha256="local-v1",
            is_active=True,
            status="imported",
        )
        coach_ds = Dataset(
            source_id=coach_src.id,
            kind="gtfs",
            local_path="./coach.zip",
            sha256="coach",
            is_active=True,
            status="imported",
        )
        session.add_all([local_v1, coach_ds])
        session.flush()

        session.add_all(
            [
                GtfsStop(dataset_id=local_v1.id, stop_id="hbf", name="Berlin Hauptbahnhof", lat=52.5251, lon=13.3696),
                GtfsStop(dataset_id=coach_ds.id, stop_id="berlin_hbf", name="Berlin Central Station", lat=52.40, lon=13.20),
                _stop_time(local_v1.id, "local-trip", "hbf", 1, "08:00:00", 28800),
                _stop_time(coach_ds.id, "coach-trip", "berlin_hbf", 1, "08:05:00", 29100),
            ]
        )
        session.flush()
        rebuild_route_layer(session)

        v1_link = session.scalar(
            select(CanonicalStopLink).where(
                CanonicalStopLink.object_type == "gtfs_stop",
                CanonicalStopLink.dataset_id == local_v1.id,
                CanonicalStopLink.external_id == "hbf",
            )
        )
        assert v1_link is not None
        canonical_target = v1_link.canonical_stop

        # The rule identifies the coach stop and describes the target canonical stop.
        rule_selector = {
            "object_type": "gtfs_stop",
            "source_id": coach_src.id,
            "dataset_id": coach_ds.id,
            "external_id": "berlin_hbf",
        }
        rule_action = {
            "target_stop_key": canonical_target.stop_key,
            "target_name": canonical_target.name,
            "target_lat": canonical_target.lat,
            "target_lon": canonical_target.lon,
            "target_gtfs_stops": [
                {
                    "source_id": local_src.id,
                    "dataset_id": local_v1.id,
                    "external_id": "hbf",
                }
            ],
        }
        session.add(
            MatchRule(
                rule_type="link_canonical_stop",
                selector_json=json.dumps(rule_selector, separators=(",", ":")),
                action_json=json.dumps(rule_action, separators=(",", ":")),
            )
        )

        # Replace the local dataset with a fresh import of the same source.
        local_v1.is_active = False
        local_v2 = Dataset(
            source_id=local_src.id,
            kind="gtfs",
            local_path="./local-v2.zip",
            sha256="local-v2",
            is_active=True,
            status="imported",
        )
        session.add(local_v2)
        session.flush()

        session.add_all(
            [
                GtfsStop(dataset_id=local_v2.id, stop_id="hbf", name="Berlin Hauptbahnhof", lat=52.5252, lon=13.3697),
                _stop_time(local_v2.id, "local-trip-v2", "hbf", 1, "08:00:00", 28800),
            ]
        )
        session.flush()
        rebuild_route_layer(session)

        v2_link = session.scalar(
            select(CanonicalStopLink).where(
                CanonicalStopLink.object_type == "gtfs_stop",
                CanonicalStopLink.dataset_id == local_v2.id,
                CanonicalStopLink.external_id == "hbf",
            )
        )
        coach_link = session.scalar(
            select(CanonicalStopLink).where(
                CanonicalStopLink.object_type == "gtfs_stop",
                CanonicalStopLink.dataset_id == coach_ds.id,
                CanonicalStopLink.external_id == "berlin_hbf",
            )
        )
        assert v2_link is not None
        assert coach_link is not None
        assert v2_link.canonical_stop_id == coach_link.canonical_stop_id
        assert v2_link.canonical_stop.stop_key == f"gtfs:{local_v2.id}:hbf"
def test_journey_service_date_filters_duplicate_clock_time_trips():
    """Trips departing at the same clock time should be filtered by service date.

    Two ICE 28 trips leave Berlin at 09:37: one on a Monday-Friday calendar,
    one on a Saturday-only calendar, both valid 2026-06-01 to 2026-06-30. A
    calendar date row with ``exception_type=2`` also exists for the weekday
    service on 2026-06-27. The test expects only ``saturday_trip`` for
    service date 2026-06-27 and only ``weekday_trip`` for 2026-06-29.
    """
    reset_db()
    with session_scope() as session:
        feed_source = Source(name="Calendar GTFS", kind="gtfs", url="./calendar.zip")
        session.add(feed_source)
        session.flush()

        feed_ds = Dataset(
            source_id=feed_source.id,
            kind="gtfs",
            local_path="./calendar.zip",
            sha256="calendar",
            is_active=True,
            status="imported",
        )
        session.add(feed_ds)
        session.flush()

        stop_rows = [
            GtfsStop(dataset_id=feed_ds.id, stop_id="berlin", name="Berlin Hbf", lat=52.525, lon=13.369),
            GtfsStop(dataset_id=feed_ds.id, stop_id="hamburg", name="Hamburg Hbf", lat=53.553, lon=10.006),
        ]
        ice28 = GtfsRoute(
            dataset_id=feed_ds.id,
            route_id="ice28",
            short_name="ICE 28",
            long_name="Berlin - Hamburg",
            route_type=2,
            mode="train",
            operator_name="DB",
            route_key=norm_ref("ICE 28"),
        )
        trip_rows = [
            GtfsTrip(dataset_id=feed_ds.id, route_id="ice28", trip_id="weekday_trip", service_id="weekday"),
            GtfsTrip(dataset_id=feed_ds.id, route_id="ice28", trip_id="saturday_trip", service_id="saturday"),
        ]
        calendar_rows = [
            GtfsCalendar(
                dataset_id=feed_ds.id,
                service_id="weekday",
                monday=True,
                tuesday=True,
                wednesday=True,
                thursday=True,
                friday=True,
                saturday=False,
                sunday=False,
                start_date=20260601,
                end_date=20260630,
            ),
            GtfsCalendar(
                dataset_id=feed_ds.id,
                service_id="saturday",
                monday=False,
                tuesday=False,
                wednesday=False,
                thursday=False,
                friday=False,
                saturday=True,
                sunday=False,
                start_date=20260601,
                end_date=20260630,
            ),
        ]
        weekday_exception = GtfsCalendarDate(
            dataset_id=feed_ds.id,
            service_id="weekday",
            date=20260627,
            exception_type=2,
        )
        # Both trips leave Berlin at the same clock time; only the arrival differs.
        stop_time_rows = [
            _stop_time(feed_ds.id, "weekday_trip", "berlin", 1, "09:37:00", 34620),
            _stop_time(feed_ds.id, "weekday_trip", "hamburg", 2, "11:20:00", 40800),
            _stop_time(feed_ds.id, "saturday_trip", "berlin", 1, "09:37:00", 34620),
            _stop_time(feed_ds.id, "saturday_trip", "hamburg", 2, "11:24:00", 41040),
        ]
        session.add_all(
            [*stop_rows, ice28, *trip_rows, *calendar_rows, weekday_exception, *stop_time_rows]
        )
        session.flush()
        rebuild_route_layer(session)

        saturday = find_journeys(
            session,
            from_stop_id="berlin",
            to_stop_id="hamburg",
            departure="09:00",
            max_transfers=0,
            limit=5,
            service_date="2026-06-27",
        )
        monday = find_journeys(
            session,
            from_stop_id="berlin",
            to_stop_id="hamburg",
            departure="09:00",
            max_transfers=0,
            limit=5,
            service_date="2026-06-29",
        )

        saturday_trip_ids = [journey["legs"][0]["trip_id"] for journey in saturday["journeys"]]
        monday_trip_ids = [journey["legs"][0]["trip_id"] for journey in monday["journeys"]]
        assert saturday_trip_ids == ["saturday_trip"]
        assert monday_trip_ids == ["weekday_trip"]
def _route_candidate(
    feature_id: int,
    osm_id: str,
    geometry_text: str,
    bbox: tuple[float | None, float | None, float | None, float | None],
) -> _OsmRouteCandidate:
    """Build an in-memory U5 subway route candidate for selection tests.

    Args:
        feature_id: Value assigned to the ``OsmFeature.id`` attribute.
        osm_id: OSM identifier; also appended to the feature name.
        geometry_text: GeoJSON geometry string, stored on the feature and
            parsed with ``json.loads`` + ``shapely.geometry.shape``.
        bbox: Four values assigned in order to ``min_lon``, ``min_lat``,
            ``max_lon`` and ``max_lat``.

    Returns:
        An ``_OsmRouteCandidate`` wrapping a new ``OsmFeature`` that is not
        added to any session here.
    """
    bounds = {
        "min_lon": bbox[0],
        "min_lat": bbox[1],
        "max_lon": bbox[2],
        "max_lat": bbox[3],
    }
    osm_route = OsmFeature(
        id=feature_id,
        dataset_id=1,
        osm_type="relation",
        osm_id=osm_id,
        kind="route",
        mode="subway",
        name=f"U5 {osm_id}",
        ref="U5",
        geometry_geojson=geometry_text,
        route_key=norm_ref("U5"),
        **bounds,
    )
    parsed_geometry = shape(json.loads(geometry_text))
    return _OsmRouteCandidate(
        feature=osm_route,
        geom=parsed_geometry,
        geometry_text=geometry_text,
        bbox=bbox,
        ref_key=norm_ref("U5"),
        mode="subway",
    )
def _stop_time(dataset_id: int, trip_id: str, stop_id: str, sequence: int, time: str, seconds: int) -> GtfsStopTime:
    """Create a ``GtfsStopTime`` whose arrival and departure are identical.

    Args:
        dataset_id: Dataset the stop time belongs to.
        trip_id: Trip identifier.
        stop_id: Stop identifier.
        sequence: Value stored as ``stop_sequence``.
        time: Clock time string used for both ``arrival_time`` and
            ``departure_time``.
        seconds: Value used for both ``arrival_seconds`` and
            ``departure_seconds``.

    Returns:
        A new ``GtfsStopTime`` instance; it is not added to a session here.
    """
    # Arrival and departure deliberately share the same values.
    clock_fields = {"arrival_time": time, "departure_time": time}
    second_fields = {"arrival_seconds": seconds, "departure_seconds": seconds}
    return GtfsStopTime(
        dataset_id=dataset_id,
        trip_id=trip_id,
        stop_id=stop_id,
        stop_sequence=sequence,
        **clock_fields,
        **second_fields,
    )