Files
meubility-workbench/app/journey.py
2026-07-01 23:29:51 +02:00

5386 lines
206 KiB
Python

from __future__ import annotations
import json
import math
import re
import threading
import time
from dataclasses import dataclass
from datetime import date, datetime
from typing import Iterator, Optional
from shapely.geometry import LineString, MultiLineString, Point, mapping, shape
from shapely.ops import linemerge, substring
from sqlalchemy import and_, bindparam, case, exists, func, or_, select, text
from sqlalchemy.orm import Session, aliased
from app.address_search import (
address_by_token,
address_point_by_token,
address_point_token,
address_token,
coordinate_token,
is_coordinate_token,
is_address_point_token,
is_address_token,
is_location_token,
parse_coordinate_token,
)
from app.config import settings
from app.gtfs_storage import (
GTFS_STOP_TIME_COLUMNS,
SQLITE_IN_CHUNK_SIZE,
all_scheduled_stop_ids,
execute_sidecar_query,
has_scheduled_stop as storage_has_scheduled_stop,
scheduled_stop_ids as storage_scheduled_stop_ids,
stop_times_by_trip as storage_stop_times_by_trip,
stop_times_for_trip_range,
uses_sidecar_stop_times,
)
from app.models import (
CanonicalStop,
CanonicalStopLink,
Dataset,
GtfsCalendar,
GtfsCalendarDate,
GtfsRoute,
GtfsShape,
GtfsStop,
GtfsStopTime,
GtfsTrip,
OsmAddress,
OsmFeature,
RoutePattern,
Source,
)
from app.osm_storage import query_osm_features
from app.pipeline.route_layer import (
canonical_stop_for_gtfs_stop,
logical_stop_group_id,
route_pattern_for_trip,
)
from app.routing import route_between_points, snap_point_to_routing_graph
from app.serializers import feature_collection
# --- Journey search caps ------------------------------------------------------
# Upper bounds on how many rows/candidates individual search stages consider.
# The exact meaning of each is defined where it is used (mostly outside this view).
MAX_DIRECT_ROWS = 12000
MAX_TRANSFER_BOARDINGS = 350
MAX_TARGET_DESTINATION_ARRIVALS = 1400
MAX_TARGET_SECOND_LEGS_PER_STOP = 48
MAX_TARGET_TRANSFER_CANDIDATES = 4500
MAX_BACKWARD_SECOND_LEG_OPTIONS = 160
# Distances expressed in raw lon/lat degrees. NOTE(review): a degree of
# longitude shrinks with latitude, so these are not uniform in metres.
OSM_STOP_MATCH_RADIUS_DEG = 0.0012
LEG_GEOMETRY_MAX_STOP_DISTANCE_DEG = 0.08
# SQL row cap for search_scheduled_stops (tripled there when a bbox is given).
MAX_STOP_SEARCH_ROWS = 700
# Maximum member stops kept per logical stop group (_scheduled_stops_for_groups).
MAX_GROUP_STOP_IDS = 120
MAX_ROUTER_BOARDING_CANDIDATES = 2200
MAX_ROUTER_TRANSIT_LEGS = 6
MAX_JOURNEY_DATASET_PAIRS = 40
# --- Walking transfers between stops ------------------------------------------
# 111_320 is roughly the number of metres per degree of latitude, so the _DEG
# value is an approximate conversion of the metre radius.
WALKING_TRANSFER_RADIUS_M = 450
WALKING_TRANSFER_RADIUS_DEG = WALKING_TRANSFER_RADIUS_M / 111_320
WALKING_TRANSFER_SPEED_MPS = 1.25
MAX_WALKING_TRANSFER_SOURCE_STOPS = 80
MAX_WALKING_TRANSFER_NEIGHBORS_PER_STOP = 8
# Durations below are in seconds (written as minutes * 60).
ACCESS_TRANSFER_MAX_SECONDS = 45 * 60
MAX_ACCESS_TRANSFER_CANDIDATES = 4
PUBLIC_TRANSPORT_WALK_OPTION_MAX_SECONDS = 45 * 60
# --- Address/coordinate access legs -------------------------------------------
# Distances in metres, durations in seconds.
ADDRESS_ACCESS_RADIUS_M = 1800
ADDRESS_ACCESS_MAX_SECONDS = 30 * 60
ADDRESS_ACCESS_STOP_CANDIDATES = 4
ADDRESS_ACCESS_MAX_PAIR_CANDIDATES = 8
ADDRESS_ACCESS_MAX_DEEP_PAIR_CANDIDATES = 4
ADDRESS_ACCESS_SHORT_DIRECT_WALK_SECONDS = 20 * 60
ADDRESS_ACCESS_LONG_DISTANCE_HUB_THRESHOLD_M = 50_000
ADDRESS_ACCESS_MAJOR_HUB_RADIUS_M = 12_000
ADDRESS_ACCESS_MAJOR_HUB_CANDIDATES = 3
# Priority values stored on _AccessStopCandidate.priority. NOTE(review): the
# ordering direction (presumably lower = preferred) is defined by the caller.
ADDRESS_ACCESS_NORMAL_PRIORITY = 100
ADDRESS_ACCESS_MAJOR_HUB_PRIORITY = 10
# --- In-process geometry caches -----------------------------------------------
WALK_GEOMETRY_CACHE_TTL_SECONDS = 10 * 60
WALK_GEOMETRY_CACHE_MAX_ENTRIES = 1024
LEG_GEOMETRY_CACHE_TTL_SECONDS = 10 * 60
LEG_GEOMETRY_CACHE_MAX_ENTRIES = 2048
# Stop-token prefixes:
#   group:<dataset_id>:<group_id>          (_stop_group_token)
#   stop:<external GTFS stop_id>           (exact external id, _resolve_stop_selection)
#   place:<canonical_stop_id>:<dataset_id> (_stop_place_token)
STOP_GROUP_PREFIX = "group:"
STOP_EXACT_PREFIX = "stop:"
STOP_PLACE_PREFIX = "place:"
# Module-level caches, each guarded by its own re-entrant lock. Each cached value
# starts with a float, presumably the insertion time compared against the
# *_TTL_SECONDS constants above. TODO confirm where the caches are read.
_walk_geometry_cache_lock = threading.RLock()
_walk_geometry_cache: dict[tuple[float, float, float, float], tuple[float, tuple[dict | None, float, float | None]]] = {}
_leg_geometry_cache_lock = threading.RLock()
_leg_geometry_cache: dict[tuple[object, ...], tuple[float, dict | None, str, int | None]] = {}
@dataclass(frozen=True)
class StopSummary:
    """Immutable summary of a selectable journey endpoint.

    Besides GTFS stops, this also represents addresses (``_address_summary``)
    and map points (``_coordinate_summary``). For those, ``stop_id`` holds the
    location token instead of a GTFS stop_id.
    """
    id: int  # row id of the underlying record (address.id for addresses, 0 for map points)
    dataset_id: int  # owning dataset (0 for map points)
    stop_id: str  # GTFS stop_id, or an address/coordinate token
    name: str | None
    lat: float | None
    lon: float | None
@dataclass(frozen=True)
class StopSelection:
    """A resolved stop choice that may span several GTFS datasets.

    ``stop_ids_by_dataset`` maps dataset id -> GTFS ``stop_id`` values. Its
    first key is the primary dataset. ``display`` is the summary shown to the
    user. ``canonical_stop_id`` is set when the stops are linked to a
    canonical stop.
    """
    display: StopSummary
    stop_ids_by_dataset: dict[int, tuple[str, ...]]
    canonical_stop_id: int | None = None

    @property
    def dataset_id(self) -> int:
        """Primary dataset id: the first key of ``stop_ids_by_dataset``.

        Raises:
            ValueError: if the selection contains no datasets.
        """
        for dataset_id in self.stop_ids_by_dataset:
            return dataset_id
        # ``next(iter(...))`` would leak StopIteration here. Raised inside a
        # callback of map()/an iterator, that silently ends the outer loop
        # instead of reporting the empty selection.
        raise ValueError("stop selection has no datasets")

    @property
    def stop_ids(self) -> tuple[str, ...]:
        """GTFS stop_ids of the primary dataset (raises ValueError when empty)."""
        return self.stop_ids_by_dataset[self.dataset_id]

    @property
    def dataset_ids(self) -> tuple[int, ...]:
        """All dataset ids in insertion order."""
        return tuple(self.stop_ids_by_dataset)
@dataclass(frozen=True)
class _AccessStopCandidate:
    """A candidate stop for reaching or leaving an address/coordinate endpoint.

    NOTE(review): this type is used outside the visible code. The field notes
    below are inferred from names and the ADDRESS_ACCESS_* constants; confirm
    them against the callers.
    """
    token: str  # presumably the stop token later passed to stop resolution
    selection: StopSelection
    distance_m: float  # distance in metres (per the _m suffix)
    # ADDRESS_ACCESS_NORMAL_PRIORITY (100) by default; ADDRESS_ACCESS_MAJOR_HUB_PRIORITY (10)
    # is the other value defined in this module.
    priority: int = ADDRESS_ACCESS_NORMAL_PRIORITY
# Escape character used for LIKE patterns built from user input. This is the
# same character SQLAlchemy's ``autoescape`` uses. It avoids backslash, which
# PostgreSQL already treats as the default LIKE escape.
_STOP_SEARCH_LIKE_ESCAPE = "/"
# Internal ranking fields removed from search results before they are returned.
_STOP_SEARCH_PRIVATE_KEYS = ("_display_rank", "_match_rank", "_bbox_rank", "_bbox_distance_m", "_importance_rank")


def _escape_stop_search_like(value: str) -> str:
    """Escape LIKE wildcards in *value* so it matches literally.

    Use the result with ``ilike(..., escape=_STOP_SEARCH_LIKE_ESCAPE)``. The
    escape character is doubled first so that it cannot pair with the escapes
    added afterwards.
    """
    escape = _STOP_SEARCH_LIKE_ESCAPE
    return value.replace(escape, escape + escape).replace("%", escape + "%").replace("_", escape + "_")


def _stop_search_result_sort_key(item: dict) -> tuple:
    """Sort key for stop search results (lower sorts first).

    Order of criteria: bbox placement, station importance, name match quality,
    distance to the bbox, larger groups first, then name and stop id.
    """
    return (
        item["_bbox_rank"],
        item["_importance_rank"],
        item["_display_rank"],
        item["_match_rank"],
        item["_bbox_distance_m"],
        -int(item["grouped_stop_count"]),
        item["name"] or "",
        item["stop_id"],
    )


def search_scheduled_stops(
    db: Session,
    query: str | None = None,
    source_ids: list[int] | None = None,
    limit: int = 25,
    bbox: tuple[float, float, float, float] | None = None,
) -> list[dict]:
    """Return stops that have imported stop_times.

    The importer may intentionally cap stop_times for large feeds. Searching only
    scheduled stops prevents the UI from offering stops that cannot route yet.

    Matching stops are grouped per ``(dataset_id, logical stop group)``, resolved
    to a canonical stop where one is linked, ranked, and merged per canonical
    stop when more than one source is searched.

    Args:
        db: database session.
        query: free-text search. It is matched literally; ``%`` and ``_`` are
            not wildcards.
        source_ids: restrict to these sources; ``None`` searches all active ones.
        limit: maximum number of results, clamped to 1..100.
        bbox: ``(min_lon, min_lat, max_lon, max_lat)``. Stops inside it rank first.

    Returns:
        JSON-ready result dicts, best match first.
    """
    active_dataset_ids = _active_gtfs_dataset_ids(db, source_ids=source_ids)
    if not active_dataset_ids:
        return []
    stmt = (
        select(GtfsStop, Source.id, Source.name)
        .join(Dataset, Dataset.id == GtfsStop.dataset_id)
        .join(Source, Source.id == Dataset.source_id)
        .where(GtfsStop.dataset_id.in_(active_dataset_ids))
    )
    escape = _STOP_SEARCH_LIKE_ESCAPE
    q = (query or "").strip()
    order_by: list = []
    if q:
        literal = _escape_stop_search_like(q)
        pattern = f"%{literal}%"
        token_patterns = [
            f"%{_escape_stop_search_like(token)}%" for token in re.split(r"[\s,;/]+", q) if token
        ]
        token_filters = [
            or_(GtfsStop.name.ilike(token_pattern, escape=escape), GtfsStop.stop_id.ilike(token_pattern, escape=escape))
            for token_pattern in token_patterns
        ]
        where_parts = [GtfsStop.name.ilike(pattern, escape=escape), GtfsStop.stop_id.ilike(pattern, escape=escape)]
        if token_filters:
            where_parts.append(and_(*token_filters))
        stmt = stmt.where(or_(*where_parts))
        order_by.append(
            case(
                (GtfsStop.name.ilike(literal, escape=escape), 0),
                (GtfsStop.name.ilike(f"{literal}%", escape=escape), 1),
                (GtfsStop.name.ilike(pattern, escape=escape), 2),
                (GtfsStop.stop_id.ilike(f"{literal}%", escape=escape), 3),
                else_=4,
            )
        )
    if bbox is not None:
        order_by.extend(_bbox_order_expressions(GtfsStop, bbox))
    stmt = stmt.order_by(*order_by, GtfsStop.name, GtfsStop.id)
    stmt = stmt.limit(MAX_STOP_SEARCH_ROWS * (3 if bbox is not None else 1))

    # Group matching stops by logical stop group, keeping the best match rank.
    groups: dict[tuple[int, str], dict] = {}
    for stop, source_id, source_name in db.execute(stmt).all():
        group_id = logical_stop_group_id(stop)
        key = (stop.dataset_id, group_id)
        rank_value = _stop_match_rank(stop, q)
        group = groups.setdefault(
            key,
            {
                "dataset_id": stop.dataset_id,
                "group_id": group_id,
                "source_id": source_id,
                "source_name": source_name,
                "rank": rank_value,
                "matches": [],
            },
        )
        group["rank"] = min(int(group["rank"]), rank_value)
        group["matches"].append(stop)
    if not groups:
        return []

    parents = _parent_stops_for_groups(db, groups.keys())
    scheduled = _scheduled_stops_for_groups(db, groups.keys())
    results = []
    for key, group in groups.items():
        scheduled_stops = scheduled.get(key, [])
        if not scheduled_stops:
            # Groups without stop_times cannot be routed; hide them.
            continue
        parent = parents.get(key)
        display_stop = parent or _best_display_stop(str(group["group_id"]), group["matches"], scheduled_stops)
        canonical = _canonical_stop_for_group(db, scheduled_stops)
        name = canonical.name if canonical is not None else display_stop.name
        display_parts = _city_stop_display_parts(
            name,
            display_stop.name,
            parent.name if parent is not None else None,
            *(stop.name for stop in scheduled_stops),
        )
        display_name = display_parts["display_name"]
        result_id = (
            _stop_place_token(canonical.id, display_stop.dataset_id)
            if canonical is not None
            else _stop_group_token(display_stop.dataset_id, str(group["group_id"]))
        )
        display_rank = _stop_match_rank(display_stop, q)
        result_lat = canonical.lat if canonical is not None else display_stop.lat
        result_lon = canonical.lon if canonical is not None else display_stop.lon
        bbox_rank, bbox_distance_m = _bbox_rank(result_lat, result_lon, bbox)
        results.append(
            {
                "id": result_id,
                "canonical_stop_id": None if canonical is None else canonical.id,
                "dataset_id": display_stop.dataset_id,
                "stop_id": str(group["group_id"]),
                "name": name,
                "display_name": display_name,
                "city": display_parts["city"],
                "local_name": display_parts["local_name"],
                "lat": result_lat,
                "lon": result_lon,
                "source_id": group["source_id"],
                "source_name": group["source_name"],
                "scheduled": True,
                "grouped": True,
                "grouped_stop_count": len(scheduled_stops),
                "sample_stop_ids": [stop.stop_id for stop in scheduled_stops[:5]],
                "_display_rank": display_rank,
                "_match_rank": group["rank"],
                "_bbox_rank": bbox_rank,
                "_bbox_distance_m": bbox_distance_m,
                "_importance_rank": _station_importance_rank(
                    display_name,
                    name,
                    display_stop.name,
                    parent.name if parent is not None else None,
                    *(stop.name for stop in scheduled_stops),
                ),
            }
        )
    # Sort before merging: the merge keeps the first result of each canonical stop.
    results.sort(key=_stop_search_result_sort_key)
    if not source_ids or len(source_ids) > 1:
        results = _merge_canonical_stop_results(results)
    _enrich_canonical_stop_sources(db, results, active_dataset_ids)
    # Enrichment may raise grouped_stop_count, which is part of the sort key.
    results.sort(key=_stop_search_result_sort_key)
    selected = results[: max(1, min(limit, 100))]
    for item in selected:
        for private_key in _STOP_SEARCH_PRIVATE_KEYS:
            item.pop(private_key, None)
    return selected
def _nearest_scheduled_stops_postgresql(
    db: Session,
    *,
    lat: float,
    lon: float,
    dataset_ids: list[int],
    limit: int,
    radius_m: float,
) -> list[dict]:
    """PostGIS branch of nearest_scheduled_stops.

    Combines canonical stops whose own point lies within ``radius_m`` with
    canonical stops linked to a nearby OSM stop/station/terminal feature.
    Returns at most ``limit`` payloads, nearest first, one per canonical stop.
    """
    rows = _nearest_canonical_stop_rows_postgresql(
        db,
        lat=lat,
        lon=lon,
        dataset_ids=dataset_ids,
        limit=limit * 8,
        radius_m=radius_m,
    )
    rows.extend(
        _nearest_visual_stop_rows_postgresql(
            db,
            lat=lat,
            lon=lon,
            dataset_ids=dataset_ids,
            limit=limit * 8,
            radius_m=radius_m,
        )
    )
    rows.sort(key=lambda item: (float(item[2] or 0), int(item[0]), int(item[1])))
    results: list[dict] = []
    seen: set[int] = set()
    for canonical_stop_id, preferred_dataset_id, distance_m in rows:
        if int(canonical_stop_id) in seen:
            continue
        try:
            selection = _selection_for_canonical_stop(
                db,
                int(canonical_stop_id),
                dataset_ids=dataset_ids,
                preferred_dataset_id=int(preferred_dataset_id),
            )
        except ValueError:
            # Not resolvable for this dataset; a later row for the same
            # canonical stop (another dataset) may still succeed.
            continue
        seen.add(int(canonical_stop_id))
        source = _source_payload_for_dataset_id(db, selection.dataset_id) or {}
        payload = _stop_payload(selection.display)
        payload.update(
            {
                "id": _stop_place_token(int(canonical_stop_id), selection.dataset_id),
                "kind": "stop",
                "canonical_stop_id": int(canonical_stop_id),
                "display_name": selection.display.name,
                "source_id": source.get("id"),
                "source_name": source.get("name"),
                "scheduled": True,
                "grouped": True,
                "grouped_stop_count": sum(len(stop_ids) for stop_ids in selection.stop_ids_by_dataset.values()),
                "distance_m": float(distance_m or 0),
            }
        )
        results.append(payload)
        if len(results) >= limit:
            break
    return results


def nearest_scheduled_stops(
    db: Session,
    *,
    lat: float,
    lon: float,
    source_ids: list[int] | None = None,
    limit: int = 3,
    radius_m: float = 900,
) -> list[dict]:
    """Return scheduled stops within ``radius_m`` metres of ``(lat, lon)``, nearest first.

    ``limit`` is clamped to 1..25. Every payload carries ``kind="stop"`` and
    ``distance_m``. PostgreSQL uses PostGIS distance queries. Other databases
    fall back to a bbox-biased ``search_scheduled_stops`` followed by an
    in-Python distance filter.
    """
    active_dataset_ids = _active_gtfs_dataset_ids(db, source_ids=source_ids)
    if not active_dataset_ids:
        return []
    selected_limit = max(1, min(int(limit), 25))
    if settings.is_postgresql_database:
        return _nearest_scheduled_stops_postgresql(
            db,
            lat=lat,
            lon=lon,
            dataset_ids=active_dataset_ids,
            limit=selected_limit,
            radius_m=radius_m,
        )
    # 111_320 m is roughly one degree of latitude. A degree of longitude only
    # spans cos(lat) of that, so the lon half-width must be wider. Otherwise
    # stops to the east/west inside the radius fall outside the bbox. The clamp
    # keeps the value finite near the poles.
    lat_radius_deg = float(radius_m) / 111_320
    lon_radius_deg = lat_radius_deg / max(math.cos(math.radians(float(lat))), 0.01)
    bbox = (
        float(lon) - lon_radius_deg,
        float(lat) - lat_radius_deg,
        float(lon) + lon_radius_deg,
        float(lat) + lat_radius_deg,
    )
    # Inside the bbox, search_scheduled_stops orders by station importance, not
    # distance, and returns at most 100 results. It does the same work whatever
    # the limit, so ask for the maximum and let the distance filter below choose.
    candidates = search_scheduled_stops(db, source_ids=source_ids, bbox=bbox, limit=100)
    for item in candidates:
        if item.get("lat") is None or item.get("lon") is None:
            item["distance_m"] = float("inf")
        else:
            item["distance_m"] = _distance_m(float(lat), float(lon), float(item["lat"]), float(item["lon"]))
        item["kind"] = "stop"
    candidates = [item for item in candidates if float(item.get("distance_m") or 0) <= radius_m]
    candidates.sort(key=lambda item: (float(item.get("distance_m") or 0), item.get("display_name") or item.get("name") or ""))
    return candidates[:selected_limit]
def _nearest_canonical_stop_rows_postgresql(
    db: Session,
    *,
    lat: float,
    lon: float,
    dataset_ids: list[int],
    limit: int,
    radius_m: float,
) -> list[tuple[int, int, float]]:
    """Find canonical stops whose own geometry lies within ``radius_m`` metres.

    Only stops linked to a GTFS stop of an active GTFS dataset in
    ``dataset_ids`` are considered.

    Returns:
        Up to ``limit`` ``(canonical_stop_id, dataset_id, distance_m)`` tuples,
        ordered by planar distance to the point.
    """
    # ST_Expand pads both axes by the same number of degrees, but a degree of
    # longitude spans only cos(lat) * 111_320 m. Size the pad for the longitude
    # axis (the larger of the two) so the ``&&`` index prefilter never drops a
    # stop that ST_DWithin would accept. The clamp keeps the pad finite near
    # the poles.
    radius_deg = float(radius_m) / (111_320 * max(math.cos(math.radians(float(lat))), 0.01))
    stmt = text(
        """
        WITH point AS (
            SELECT ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) AS geom
        )
        SELECT
            canonical_stops.id AS canonical_stop_id,
            canonical_stop_links.dataset_id AS dataset_id,
            ST_DistanceSphere(canonical_stops.geom, point.geom) AS distance_m
        FROM canonical_stops
        JOIN canonical_stop_links
            ON canonical_stop_links.canonical_stop_id = canonical_stops.id
           AND canonical_stop_links.object_type = 'gtfs_stop'
        JOIN datasets
            ON datasets.id = canonical_stop_links.dataset_id
           AND datasets.kind = 'gtfs'
           AND datasets.is_active IS TRUE
        CROSS JOIN point
        WHERE canonical_stop_links.dataset_id IN :dataset_ids
          AND canonical_stops.geom IS NOT NULL
          AND canonical_stops.geom && ST_Expand(point.geom, :radius_deg)
          AND ST_DWithin(canonical_stops.geom::geography, point.geom::geography, :radius_m)
        GROUP BY canonical_stops.id, canonical_stop_links.dataset_id, canonical_stops.geom, point.geom
        ORDER BY canonical_stops.geom <-> point.geom, canonical_stops.id
        LIMIT :limit
        """
    ).bindparams(bindparam("dataset_ids", expanding=True))
    rows = db.execute(
        stmt,
        {
            "lat": float(lat),
            "lon": float(lon),
            "dataset_ids": tuple(dataset_ids),
            "radius_deg": radius_deg,
            "radius_m": float(radius_m),
            "limit": max(1, int(limit)),
        },
    ).all()
    return [(int(row[0]), int(row[1]), float(row[2] or 0)) for row in rows]
def _nearest_visual_stop_rows_postgresql(
    db: Session,
    *,
    lat: float,
    lon: float,
    dataset_ids: list[int],
    limit: int,
    radius_m: float,
) -> list[tuple[int, int, float]]:
    """Find canonical stops via linked OSM stop/station/terminal features near a point.

    A canonical stop qualifies when one of its linked ``osm_feature`` objects
    lies within ``radius_m`` metres and it also has a GTFS stop link in an
    active GTFS dataset from ``dataset_ids``.

    Returns:
        Up to ``limit`` ``(canonical_stop_id, dataset_id, distance_m)`` tuples,
        using the closest feature's distance, nearest first.
    """
    # Same padding rule as the canonical-stop query: ST_Expand works in degrees
    # on both axes, so pad for the (shorter) longitude degree. This keeps the
    # ``&&`` prefilter from discarding features that ST_DWithin would keep.
    radius_deg = float(radius_m) / (111_320 * max(math.cos(math.radians(float(lat))), 0.01))
    stmt = text(
        """
        WITH point AS (
            SELECT ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) AS geom
        ),
        visual_hits AS (
            SELECT
                osm_link.canonical_stop_id AS canonical_stop_id,
                gtfs_link.dataset_id AS dataset_id,
                ST_DistanceSphere(osm_features.geom, point.geom) AS distance_m
            FROM osm_features
            JOIN canonical_stop_links AS osm_link
                ON osm_link.object_type = 'osm_feature'
               AND osm_link.object_id = osm_features.id
            JOIN canonical_stop_links AS gtfs_link
                ON gtfs_link.canonical_stop_id = osm_link.canonical_stop_id
               AND gtfs_link.object_type = 'gtfs_stop'
            JOIN datasets
                ON datasets.id = gtfs_link.dataset_id
               AND datasets.kind = 'gtfs'
               AND datasets.is_active IS TRUE
            CROSS JOIN point
            WHERE gtfs_link.dataset_id IN :dataset_ids
              AND osm_features.kind IN ('stop', 'station', 'terminal')
              AND osm_features.geom IS NOT NULL
              AND osm_features.geom && ST_Expand(point.geom, :radius_deg)
              AND ST_DWithin(osm_features.geom::geography, point.geom::geography, :radius_m)
        )
        SELECT canonical_stop_id, dataset_id, MIN(distance_m) AS distance_m
        FROM visual_hits
        GROUP BY canonical_stop_id, dataset_id
        ORDER BY MIN(distance_m), canonical_stop_id
        LIMIT :limit
        """
    ).bindparams(bindparam("dataset_ids", expanding=True))
    rows = db.execute(
        stmt,
        {
            "lat": float(lat),
            "lon": float(lon),
            "dataset_ids": tuple(dataset_ids),
            "radius_deg": radius_deg,
            "radius_m": float(radius_m),
            "limit": max(1, int(limit)),
        },
    ).all()
    return [(int(row[0]), int(row[1]), float(row[2] or 0)) for row in rows]
def _enrich_canonical_stop_sources(db: Session, results: list[dict], active_dataset_ids: list[int]) -> None:
    """Attach per-source information to search results, in place.

    For results linked to a canonical stop, all active sources that provide a
    GTFS stop for it are collected. ``source_names``, a combined
    ``source_name`` label (three names plus ``+N``), ``source_id`` (only when
    there is exactly one source) and ``grouped_stop_count`` are then updated.
    Any other result just gets ``source_names`` defaulted from its own
    ``source_name``.
    """

    def default_source_names(entry: dict) -> None:
        entry.setdefault("source_names", [entry["source_name"]] if entry.get("source_name") else [])

    linked_ids = sorted(
        {int(entry["canonical_stop_id"]) for entry in results if entry.get("canonical_stop_id") is not None}
    )
    if not (linked_ids and active_dataset_ids):
        for entry in results:
            default_source_names(entry)
        return

    query = (
        select(CanonicalStopLink.canonical_stop_id, Source.id, Source.name, func.count(CanonicalStopLink.id))
        .join(Dataset, Dataset.id == CanonicalStopLink.dataset_id)
        .join(Source, Source.id == Dataset.source_id)
        .where(
            CanonicalStopLink.object_type == "gtfs_stop",
            CanonicalStopLink.canonical_stop_id.in_(linked_ids),
            CanonicalStopLink.dataset_id.in_(active_dataset_ids),
        )
        .group_by(CanonicalStopLink.canonical_stop_id, Source.id, Source.name)
        .order_by(CanonicalStopLink.canonical_stop_id, Source.name, Source.id)
    )
    per_stop: dict[int, dict] = {}
    for stop_key, src_id, src_name, link_count in db.execute(query).all():
        info = per_stop.setdefault(int(stop_key), {"source_ids": [], "source_names": [], "linked_stop_count": 0})
        info["source_ids"].append(int(src_id))
        info["source_names"].append(str(src_name))
        info["linked_stop_count"] += int(link_count or 0)

    for entry in results:
        raw_id = entry.get("canonical_stop_id")
        info = None if raw_id is None else per_stop.get(int(raw_id))
        if not info:
            default_source_names(entry)
            continue
        names = info["source_names"]
        entry["source_names"] = names
        label = ", ".join(names[:3])
        if len(names) > 3:
            label += f" +{len(names) - 3}"
        entry["source_name"] = label
        ids = info["source_ids"]
        entry["source_id"] = ids[0] if len(ids) == 1 else None
        entry["grouped_stop_count"] = max(int(entry.get("grouped_stop_count") or 0), int(info["linked_stop_count"]))
def _merge_canonical_stop_results(results: list[dict]) -> list[dict]:
    """Collapse search results that share a canonical stop into one entry.

    Results without a canonical stop are keyed by ``(dataset_id, stop_id)``
    instead. The first result seen for a key becomes the merged entry (as a
    shallow copy). Later ones lower its rank fields, add to
    ``grouped_stop_count``, extend ``sample_stop_ids``/``source_names``,
    rebuild the ``source_name`` label (three names plus ``+N``), and clear
    ``source_id`` once several sources contribute. The merged entries are
    returned sorted by rank.
    """
    # (field, fallback when missing; None means the field is required, converter),
    # merged in this order by taking the minimum.
    rank_fields = (
        ("_display_rank", None, int),
        ("_match_rank", None, int),
        ("_bbox_rank", 2, int),
        ("_bbox_distance_m", float("inf"), float),
        ("_importance_rank", 3, int),
    )
    by_key: dict[tuple[object, ...], dict] = {}
    for entry in results:
        canonical_id = entry.get("canonical_stop_id")
        if canonical_id is None:
            merge_key: tuple[object, ...] = ("group", entry.get("dataset_id"), entry.get("stop_id"))
        else:
            merge_key = ("canonical", canonical_id)
        incoming_names = [entry["source_name"]] if entry.get("source_name") else []
        existing = by_key.get(merge_key)
        if existing is None:
            fresh = dict(entry)
            fresh["source_names"] = incoming_names
            by_key[merge_key] = fresh
            continue
        for field, fallback, convert in rank_fields:
            if fallback is None:
                ours, theirs = existing[field], entry[field]
            else:
                ours, theirs = existing.get(field, fallback), entry.get(field, fallback)
            existing[field] = min(convert(ours), convert(theirs))
        existing["grouped_stop_count"] = int(existing.get("grouped_stop_count") or 0) + int(
            entry.get("grouped_stop_count") or 0
        )
        existing["sample_stop_ids"] = _merge_sample_stop_ids(
            existing.get("sample_stop_ids", []), entry.get("sample_stop_ids", [])
        )
        combined = _merge_source_names(existing.get("source_names", []), incoming_names)
        existing["source_names"] = combined
        label = ", ".join(combined[:3])
        if len(combined) > 3:
            label += f" +{len(combined) - 3}"
        existing["source_name"] = label
        if len(combined) > 1:
            existing["source_id"] = None

    def ordering(entry: dict) -> tuple:
        return (
            entry.get("_bbox_rank", 2),
            entry.get("_importance_rank", 3),
            entry["_display_rank"],
            entry["_match_rank"],
            entry.get("_bbox_distance_m", float("inf")),
            -int(entry["grouped_stop_count"]),
            entry["name"] or "",
            entry["stop_id"],
        )

    return sorted(by_key.values(), key=ordering)
def _merge_sample_stop_ids(left: list[str], right: list[str]) -> list[str]:
    """Concatenate two stop-id samples without duplicates, keeping at most 8 ids.

    The first occurrence of each id wins, and input order is preserved.
    """
    kept: dict[str, None] = {}
    for stop_id in (*left, *right):
        if len(kept) >= 8:
            break
        kept.setdefault(stop_id, None)
    return list(kept)
def _merge_source_names(left: list[str], right: list[str]) -> list[str]:
    """Concatenate two source-name lists, dropping empty names and duplicates.

    The first occurrence of each name wins, and input order is preserved.
    """
    return list(dict.fromkeys(name for name in (*left, *right) if name))
def _city_stop_display_parts(primary: str | None, *candidates: str | None) -> dict[str, str | None]:
    """Derive a ``"City, Stop"`` display name for a stop.

    The following are tried in order:
    1. a comma-separated ``primary`` name;
    2. a city in front of a main-station suffix in ``primary``;
    3. any candidate ``"City, Stop"`` name whose stop part matches ``primary``.

    Returns a dict with ``display_name``, ``city`` and ``local_name``. When no
    city can be found, ``city`` is ``None`` and both names are the cleaned
    ``primary``.
    """

    def with_city(city: str | None, local: str | None) -> dict[str, str | None]:
        return {"display_name": f"{city}, {local}", "city": city, "local_name": local}

    cleaned = _clean_stop_name(primary)
    if not cleaned:
        return {"display_name": None, "city": None, "local_name": None}
    if "," in cleaned:
        own_pairs = _candidate_city_stop_pairs(cleaned)
        if own_pairs:
            return with_city(*own_pairs[0])
    leading = _split_leading_city_stop_name(cleaned)
    if leading is not None:
        return with_city(*leading)
    for candidate in candidates:
        for city, stop_name in _candidate_city_stop_pairs(candidate):
            local = _local_stop_name(cleaned, city, stop_name)
            if not stop_name:
                continue
            if _stop_names_match(cleaned, stop_name) or _stop_names_match(local, stop_name):
                return with_city(city, local)
    return {"display_name": cleaned, "city": None, "local_name": cleaned}
def _normalize_city_stop_name(value: str) -> str:
    """Return ``"City, Stop"`` when *value* splits into both parts, else the cleaned name."""
    city, stop_name = _split_city_stop_name(value)
    if not (city and stop_name):
        return _clean_stop_name(value) or value
    return f"{city}, {stop_name}"
def _split_city_stop_name(value: str | None, primary_name: str | None = None) -> tuple[str | None, str | None]:
    """Split *value* into ``(city, stop_name)``.

    The first candidate pair wins. Without one, the result is
    ``(None, cleaned value)``.
    """
    first_pair = next(iter(_candidate_city_stop_pairs(value, primary_name=primary_name)), None)
    if first_pair is not None:
        return first_pair
    return (None, _clean_stop_name(value))
def _candidate_city_stop_pairs(value: str | None, primary_name: str | None = None) -> list[tuple[str, str]]:
    """List plausible ``(city, stop_name)`` readings of a ``"A, B"`` name.

    The name is split at its first comma outside parentheses. Readings that
    agree with ``primary_name`` come first. After them comes the heuristic
    reading: the side that looks like a station name is the stop, and when
    neither side does, left is the city. When both sides look like stops, no
    heuristic reading is added. Duplicates are removed, keeping the first.
    """
    halves = _split_first_comma_outside_parentheses(_clean_stop_name(value))
    if halves is None:
        return []
    left, right = (_clean_stop_name(half) for half in halves)
    if not left or not right:
        return []
    left_is_stop = _looks_like_stop_name(left)
    right_is_stop = _looks_like_stop_name(right)
    readings: list[tuple[str, str]] = []
    if primary_name:
        matches_left = _stop_names_match(primary_name, left)
        matches_right = _stop_names_match(primary_name, right)
        if matches_left and not matches_right and not right_is_stop:
            readings.append((right, left))
        if matches_right and not matches_left and not left_is_stop:
            readings.append((left, right))
    if left_is_stop != right_is_stop:
        readings.append((right, left) if left_is_stop else (left, right))
    elif not left_is_stop:
        readings.append((left, right))
    return list(dict.fromkeys(readings))
def _split_first_comma_outside_parentheses(value: str | None) -> tuple[str, str] | None:
    """Split *value* at its first comma that is not inside parentheses.

    The comma itself is dropped and whitespace around it is kept. A ``)``
    without a matching ``(`` is ignored. Returns ``None`` for empty input or
    when no such comma exists.
    """
    if not value:
        return None
    nesting = 0
    for position, char in enumerate(value):
        if char == ",":
            if nesting == 0:
                return value[:position], value[position + 1 :]
        elif char == "(":
            nesting += 1
        elif char == ")" and nesting:
            nesting -= 1
    return None
def _looks_like_stop_name(value: str) -> bool:
    """Heuristically decide whether *value* names a stop/station rather than a city.

    True when the normalized name has a standalone ``hbf``/``hbf.`` token or
    contains one of several station/stop markers (bahnhof, station, steig,
    tram, bus, zob, ...).
    """
    normalized = _normalize_stop_search(value)
    if re.search(r"(^|[\s,(/-])hbf\.?($|[\s,)/-])", normalized) is not None:
        return True
    markers = (
        "hauptbahnhof",
        "bahnhof",
        "station",
        "central station",
        "central train station",
        "steig",
        "tram",
        "bus",
        "zob",
        "ostseite",
        "westseite",
    )
    for marker in markers:
        if marker in normalized:
            return True
    return False
def _split_leading_city_stop_name(value: str) -> tuple[str, str] | None:
    """Split ``"<City> <main-station suffix>"`` into ``(city, suffix)``.

    Recognized suffixes include "central (train) station", "main station",
    "Hauptbahnhof ..." and "Hbf ..." (case-insensitive). Returns ``None``
    when there is no such suffix or when the city part itself looks like a
    stop name.
    """
    name = _clean_stop_name(value)
    if not name:
        return None
    found = re.match(
        r"^(.+?)\s+(central train station|central station|main station|hauptbahnhof(?:\s+.*)?|hbf\.?(?:\s+.*)?)$",
        name,
        flags=re.IGNORECASE,
    )
    if found is None:
        return None
    city, stop_name = (_clean_stop_name(found.group(index)) for index in (1, 2))
    if city and stop_name and not _looks_like_stop_name(city):
        return city, stop_name
    return None
def _local_stop_name(primary_name: str, city: str, candidate_stop_name: str | None) -> str:
    """Return the part of *primary_name* that names the stop within *city*.

    A leading ``"<city> "`` prefix is stripped when present. Otherwise
    *candidate_stop_name* is used when it is the same station after synonym
    normalization. Failing both, *primary_name* is returned unchanged.
    """
    city_prefix = f"{_normalize_stop_search(city)} "
    if _normalize_stop_search(primary_name).startswith(city_prefix):
        remainder = primary_name[len(city) :].strip(" ,")
        if remainder:
            return remainder
    same_station = bool(candidate_stop_name) and (
        _normalize_station_synonyms(primary_name) == _normalize_station_synonyms(candidate_stop_name)
    )
    return candidate_stop_name if same_station else primary_name
def _stop_names_match(left: str | None, right: str | None) -> bool:
    """Loosely compare two stop names.

    True when either normalized name contains the other, or when both reduce
    to the same string after main-station synonym normalization. Empty names
    never match.
    """
    first = _normalize_stop_search(left or "")
    second = _normalize_stop_search(right or "")
    if not (first and second):
        return False
    if first in second or second in first:
        return True
    return _normalize_station_synonyms(first) == _normalize_station_synonyms(second)
def _normalize_station_synonyms(value: str) -> str:
    """Reduce a stop name to a comparison key.

    Main-station spellings (central (train) station, main station,
    hauptbahnhof, hbf) become ``mainstation``, and everything outside
    ``[a-z0-9]`` is dropped.
    """
    folded = _normalize_stop_search(value)
    for phrase in (
        r"\bcentral train station\b",
        r"\bcentral station\b",
        r"\bmain station\b",
        r"\bhauptbahnhof\b",
    ):
        folded = re.sub(phrase, "mainstation", folded)
    folded = re.sub(r"(^|[\s,(/-])hbf($|[\s,)/-])", " mainstation ", folded)
    return re.sub(r"[^a-z0-9]+", "", folded)
def _clean_stop_name(value: str | None) -> str | None:
    """Stringify *value*, collapse whitespace runs and trim; return ``None`` if nothing is left."""
    raw = str(value or "")
    cleaned = re.sub(r"\s+", " ", raw).strip()
    if not cleaned:
        return None
    return cleaned
def _stop_group_token(dataset_id: int, group_id: str) -> str:
    """Build the ``group:<dataset_id>:<group_id>`` token for a logical stop group."""
    return "{}{}:{}".format(STOP_GROUP_PREFIX, dataset_id, group_id)
def _stop_place_token(canonical_stop_id: int, dataset_id: int) -> str:
    """Build the ``place:<canonical_stop_id>:<dataset_id>`` token for a canonical stop."""
    return "{}{}:{}".format(STOP_PLACE_PREFIX, canonical_stop_id, dataset_id)
def _canonical_stop_for_group(db: Session, stops: list[GtfsStop]) -> CanonicalStop | None:
    """Return the canonical stop linked to any of *stops*, or ``None``.

    When several GTFS-stop links exist, the one first by ``(role, id)`` wins.
    """
    stop_ids = [stop.id for stop in stops]
    if not stop_ids:
        return None
    link = db.scalar(
        select(CanonicalStopLink)
        .where(CanonicalStopLink.object_type == "gtfs_stop", CanonicalStopLink.object_id.in_(stop_ids))
        .order_by(CanonicalStopLink.role, CanonicalStopLink.id)
        # Only the first link is used. LIMIT lets the database stop there
        # instead of producing every matching row.
        .limit(1)
    )
    if link is None:
        return None
    return db.get(CanonicalStop, link.canonical_stop_id)
def _stop_match_rank(stop: GtfsStop, query: str) -> int:
    """Score how well *stop* matches *query*; lower is better.

    0: name or stop_id equals the query; 1: one starts with it; 2: one contains
    it; 3: every query token occurs in name + stop_id; 4: weaker match or
    empty query.
    """
    if not query:
        return 4
    needle = _normalize_stop_search(query)
    name = _normalize_stop_search(stop.name or "")
    external_id = _normalize_stop_search(stop.stop_id)
    fields = (name, external_id)
    if needle in fields:
        return 0
    if any(field.startswith(needle) for field in fields):
        return 1
    if any(needle in field for field in fields):
        return 2
    words = [word for word in re.split(r"[\s,;/]+", needle) if word]
    combined = f"{name} {external_id}"
    return 3 if words and all(word in combined for word in words) else 4
def _bbox_order_expressions(model, bbox: tuple[float, float, float, float]):
    """Build ORDER BY expressions that favour rows of *model* inside *bbox*.

    ``bbox`` is ``(min_lon, min_lat, max_lon, max_lat)``. The first expression
    is 0 for rows inside the box, 1 for rows outside and 2 for rows without
    coordinates. The second is the squared degree distance to the box centre.
    """
    west, south, east, north = bbox
    mid_lon = (west + east) / 2
    mid_lat = (south + north) / 2
    coords_missing = or_(model.lon.is_(None), model.lat.is_(None))
    within = and_(model.lon >= west, model.lon <= east, model.lat >= south, model.lat <= north)
    placement = case((coords_missing, 2), (within, 0), else_=1)
    delta_lon = func.coalesce(model.lon, mid_lon) - mid_lon
    delta_lat = func.coalesce(model.lat, mid_lat) - mid_lat
    return (placement, delta_lon * delta_lon + delta_lat * delta_lat)
def _bbox_rank(
    lat: float | None,
    lon: float | None,
    bbox: tuple[float, float, float, float] | None,
) -> tuple[int, float]:
    """Classify a point against ``bbox = (min_lon, min_lat, max_lon, max_lat)``.

    Returns ``(rank, distance_m)``:
    - ``(1, 0.0)`` when there is no bbox;
    - ``(2, inf)`` when a coordinate is missing;
    - ``(0, 0.0)`` when the point is inside the box;
    - ``(1, metres to the nearest box edge)`` otherwise.
    """
    if bbox is None:
        return (1, 0.0)
    if lat is None or lon is None:
        return (2, float("inf"))
    west, south, east, north = bbox
    inside = west <= lon <= east and south <= lat <= north
    if inside:
        return (0, 0.0)
    edge_lon = min(max(lon, west), east)
    edge_lat = min(max(lat, south), north)
    return (1, _distance_m(lat, lon, edge_lat, edge_lon))
def _station_importance_rank(*names: str | None) -> int:
    """Rank how important a stop looks from its names; lower is more important.

    0: main station (``Hbf``/``Hbf.``, ``Hauptbahnhof``, ``central (train) station``)
    1: other rail station (``Bf``/``Bf.``, ``Bahnhof``, ``station``)
    2: bus station (``ZOB``, ``Busbahnhof``)
    3: anything else, including when no names are given

    The best rank over all non-empty names wins.
    """
    normalized_names = [_normalize_stop_search(name) for name in names if name]
    # Patterns use a single backslash. The previous ``r"[\\s...]"`` matched a
    # literal backslash or "s" instead of whitespace, so "Berlin Hbf" never
    # matched.
    if any(
        re.search(r"(^|[\s,(/-])hbf\.?($|[\s,)/-])", name)
        or "hauptbahnhof" in name
        or "central station" in name
        or "central train station" in name
        for name in normalized_names
    ):
        return 0
    # Remove "busbahnhof" before looking for "bahnhof". Otherwise every bus
    # station would rank as a rail station and the rank-2 branch below would be
    # unreachable.
    rail_names = [name.replace("busbahnhof", " ") for name in normalized_names]
    if any(
        re.search(r"(^|[\s,(/-])bf\.?($|[\s,)/-])", name)
        or "bahnhof" in name
        or "station" in name
        for name in rail_names
    ):
        return 1
    if any("zob" in name or "busbahnhof" in name for name in normalized_names):
        return 2
    return 3


def _normalize_stop_search(value: str) -> str:
    """Casefold *value*, trim it, and collapse internal whitespace runs to one space."""
    return re.sub(r"\s+", " ", value.casefold().strip())
def _parent_stops_for_groups(db: Session, group_keys) -> dict[tuple[int, str], GtfsStop]:
    """Map each requested ``(dataset_id, group_id)`` to the stop whose stop_id equals the group id.

    Groups without such a stop are absent from the result.
    """
    wanted = set(group_keys)
    if not wanted:
        return {}
    query = select(GtfsStop).where(
        GtfsStop.dataset_id.in_({dataset_id for dataset_id, _ in wanted}),
        GtfsStop.stop_id.in_({group_id for _, group_id in wanted}),
    )
    parents: dict[tuple[int, str], GtfsStop] = {}
    for stop in db.scalars(query).all():
        key = (stop.dataset_id, stop.stop_id)
        if key in wanted:
            parents[key] = stop
    return parents
def _scheduled_stops_for_groups(db: Session, group_keys) -> dict[tuple[int, str], list[GtfsStop]]:
    """Return the scheduled member stops of each requested logical stop group.

    ``group_keys`` are ``(dataset_id, group_id)`` pairs. A stop is a candidate
    member when its stop_id or parent_station equals a group id, or when its
    stop_id starts with ``"<group_id>::"`` (exact, case-sensitive, on both
    backends). Membership is then confirmed with ``logical_stop_group_id``.
    Only stops with stop_times are kept: PostgreSQL checks this in SQL, other
    backends via ``all_scheduled_stop_ids``. Members are ordered by
    name/stop_id and capped at MAX_GROUP_STOP_IDS per group.
    """
    requested = set(group_keys)
    if not requested:
        return {}
    dataset_ids = {dataset_id for dataset_id, _ in requested}
    group_ids = {group_id for _, group_id in requested}
    is_postgresql = settings.is_postgresql_database
    if is_postgresql:
        inferred_group_id = func.split_part(GtfsStop.stop_id, "::", 1)
    else:
        # SQLite has no split_part, so take the text before the first "::"
        # (NULL when absent). This single IN test replaces a per-group
        # ``ILIKE '<group>::%'`` OR-chain. That chain grew with the number of
        # groups, risking SQLite's expression-depth limit, and treated ``%``/``_``
        # in stop ids as wildcards.
        separator_at = func.instr(GtfsStop.stop_id, "::")
        inferred_group_id = case(
            (separator_at > 0, func.substr(GtfsStop.stop_id, 1, separator_at - 1)),
            else_=None,
        )
    # NOTE(review): each IN list still binds one parameter per group id; very
    # large group sets may need chunking (cf. SQLITE_IN_CHUNK_SIZE) on SQLite.
    group_condition = or_(
        GtfsStop.stop_id.in_(group_ids),
        GtfsStop.parent_station.in_(group_ids),
        inferred_group_id.in_(group_ids),
    )
    rows = db.scalars(
        select(GtfsStop)
        .where(
            GtfsStop.dataset_id.in_(dataset_ids),
            group_condition,
            *(_scheduled_gtfs_stop_condition() if is_postgresql else ()),
        )
        .order_by(GtfsStop.name, GtfsStop.stop_id)
    ).all()
    scheduled_by_dataset = (
        {} if is_postgresql else {dataset_id: all_scheduled_stop_ids(db, dataset_id) for dataset_id in dataset_ids}
    )
    grouped: dict[tuple[int, str], list[GtfsStop]] = {}
    for stop in rows:
        if scheduled_by_dataset and stop.stop_id not in scheduled_by_dataset.get(stop.dataset_id, set()):
            continue
        key = (stop.dataset_id, logical_stop_group_id(stop))
        if key not in requested:
            # Candidate matched by a different group's id or belongs to another dataset.
            continue
        bucket = grouped.setdefault(key, [])
        if len(bucket) < MAX_GROUP_STOP_IDS:
            bucket.append(stop)
    return grouped
def _scheduled_gtfs_stop_condition():
    """Return a one-element tuple holding a correlated EXISTS filter for GtfsStop.

    The filter is true when the stop has at least one GtfsStopTime with the
    same dataset_id and stop_id. A tuple is returned so that callers can
    splat it into ``.where(...)``.
    """
    first_stop_time = (
        select(GtfsStopTime.id)
        .where(GtfsStopTime.dataset_id == GtfsStop.dataset_id)
        .where(GtfsStopTime.stop_id == GtfsStop.stop_id)
        .limit(1)
    )
    return (first_stop_time.exists(),)
def _best_display_stop(group_id: str, matches: list[GtfsStop], scheduled_stops: list[GtfsStop]) -> GtfsStop:
    """Pick the stop whose name/coordinates represent a group.

    Preference order: the group's own top-level stop (stop_id == group_id,
    no parent), then direct children of the group, then any child stop, then
    stops with coordinates. Name and stop_id break ties. Raises ValueError
    when both lists are empty.
    """

    def preference(stop: GtfsStop) -> tuple:
        is_top_level = stop.parent_station is None
        has_coordinates = stop.lat is not None and stop.lon is not None
        return (
            0 if (stop.stop_id == group_id and is_top_level) else 1,
            0 if stop.parent_station == group_id else 1,
            1 if is_top_level else 0,
            0 if has_coordinates else 1,
            stop.name or "",
            stop.stop_id,
        )

    return min([*matches, *scheduled_stops], key=preference)
def _resolve_stop_selection(db: Session, value: int | str, source_ids: list[int] | None = None) -> StopSelection:
    """Resolve a stop token to a StopSelection over the active GTFS datasets.

    Accepted forms:
    - ``place:`` canonical-stop tokens;
    - ``group:`` logical-group tokens (upgraded to the canonical stop when linked);
    - ``stop:<external id>`` for an exact external stop_id;
    - a bare external stop_id;
    - as a last resort for bare digits, a GtfsStop primary key.

    Raises:
        ValueError: for address/coordinate location tokens, or when no stop is found.
    """
    token = str(value).strip()
    if is_location_token(token):
        raise ValueError("selected location must be routed through location-aware search")
    active_dataset_ids = _active_gtfs_dataset_ids(db, source_ids=source_ids)

    if token.startswith(STOP_PLACE_PREFIX):
        canonical_stop_id, preferred_dataset_id = _parse_stop_place_token(token)
        return _selection_for_canonical_stop(
            db,
            canonical_stop_id,
            dataset_ids=active_dataset_ids,
            preferred_dataset_id=preferred_dataset_id,
        )

    if token.startswith(STOP_GROUP_PREFIX):
        group_dataset_id, group_id = _parse_stop_group_token(token)
        group_selection = _selection_for_group(db, group_dataset_id, group_id)
        if group_selection.canonical_stop_id is None:
            return group_selection
        return _selection_for_canonical_stop(
            db,
            group_selection.canonical_stop_id,
            dataset_ids=active_dataset_ids,
            preferred_dataset_id=group_dataset_id,
        )

    exact = token.startswith(STOP_EXACT_PREFIX)
    external_id = token[len(STOP_EXACT_PREFIX) :] if exact else token
    stop = _active_stop_by_external_stop_id(db, external_id, active_dataset_ids) if external_id else None
    if stop is None and not exact and external_id.isdigit():
        by_primary_key = db.get(GtfsStop, int(external_id))
        if by_primary_key is not None and (not active_dataset_ids or by_primary_key.dataset_id in active_dataset_ids):
            stop = by_primary_key
    if stop is None:
        raise ValueError("from_stop_id and to_stop_id must reference existing GTFS stops")
    return _selection_for_stop(db, stop, active_dataset_ids)
def resolve_location_summary(db: Session, value: int | str, source_ids: list[int] | None = None) -> StopSummary:
    """Return a StopSummary for any journey endpoint token.

    Coordinate, street-level address-point and address tokens are resolved
    directly. Anything else is treated as a stop token.
    """
    token = str(value).strip()
    if is_coordinate_token(token):
        lat, lon = parse_coordinate_token(token)
        return _coordinate_summary(db, lat, lon)
    if is_address_point_token(token):
        address, lat, lon = address_point_by_token(db, token)
        return _address_summary(address, db=db, lat=lat, lon=lon, street_level=True)
    if is_address_token(token):
        address = address_by_token(db, token)
        return _address_summary(address, db=db)
    selection = _resolve_stop_selection(db, token, source_ids=source_ids)
    return selection.display
def _address_summary(
    address: OsmAddress,
    *,
    db: Session | None = None,
    lat: float | None = None,
    lon: float | None = None,
    street_level: bool = False,
) -> StopSummary:
    """Build a ``StopSummary`` for an OSM address used as a journey endpoint.

    ``lat``/``lon`` override the address's own coordinates when given. The
    point is then snapped onto the walk routing graph when possible. With
    ``street_level`` (and known coordinates) the summary carries an
    address-point token and a "street, postcode city" name. Otherwise it
    carries the plain address token and the address display name.
    """
    point_lat = lat if lat is not None else address.lat
    point_lon = lon if lon is not None else address.lon
    snapped = _snap_walk_location(db, lat=point_lat, lon=point_lon)
    if snapped is not None:
        point_lat, point_lon = snapped
    use_point_token = street_level and point_lat is not None and point_lon is not None
    return StopSummary(
        id=address.id,
        dataset_id=address.dataset_id,
        stop_id=(
            address_point_token(address.id, float(point_lat), float(point_lon))
            if use_point_token
            else address_token(address.id)
        ),
        name=_street_address_name(address) if use_point_token else address.display_name,
        lat=point_lat,
        lon=point_lon,
    )
def _coordinate_summary(db: Session, lat: float, lon: float) -> StopSummary:
    """Build a ``StopSummary`` for a raw map coordinate.

    The token and label always reflect the coordinate as given. The summary's
    position is the walk-graph snapped point when snapping succeeds, otherwise
    the given coordinate.
    """
    point_token = coordinate_token(lat, lon)
    position = _snap_walk_location(db, lat=lat, lon=lon)
    if position is None:
        position = (float(lat), float(lon))
    return StopSummary(
        id=0,
        dataset_id=0,
        stop_id=point_token,
        name=f"Map point {lat:.5f}, {lon:.5f}",
        lat=position[0],
        lon=position[1],
    )
def _snap_walk_location(db: Session | None, *, lat: float | None, lon: float | None) -> tuple[float, float] | None:
if db is None or lat is None or lon is None:
return None
try:
snapped = snap_point_to_routing_graph(db, lon=float(lon), lat=float(lat), mode="walk", max_distance_m=250)
except Exception: # noqa: BLE001 - snapping must not break address/coordinate routing
return None
if snapped is None:
return None
return float(snapped["lat"]), float(snapped["lon"])
def _street_address_name(address: OsmAddress) -> str:
local_name = address.street or address.place or address.name or address.display_name or "Address"
locality = " ".join(str(part) for part in [address.postcode, address.city] if part).strip()
return f"{local_name}, {locality}" if locality else str(local_name)
def _active_stop_by_external_stop_id(db: Session, stop_id: str, active_dataset_ids: list[int]) -> GtfsStop | None:
    """Find a GTFS stop by its feed ``stop_id`` among active GTFS datasets.

    A non-empty ``active_dataset_ids`` further restricts the search. Among
    several matches, the lowest dataset id wins, then stops without a
    ``parent_station`` before child stops, then the lowest primary key.
    Returns ``None`` when nothing matches.
    """
    conditions = [
        Dataset.is_active.is_(True),
        Dataset.kind == "gtfs",
        GtfsStop.stop_id == stop_id,
    ]
    if active_dataset_ids:
        conditions.append(GtfsStop.dataset_id.in_(active_dataset_ids))
    parent_last = case((GtfsStop.parent_station.is_(None), 0), else_=1)
    query = (
        select(GtfsStop)
        .join(Dataset, Dataset.id == GtfsStop.dataset_id)
        .where(*conditions)
        .order_by(GtfsStop.dataset_id, parent_last, GtfsStop.id)
    )
    return db.scalar(query)
def _selection_for_stop(db: Session, stop: GtfsStop, active_dataset_ids: list[int]) -> StopSelection:
    """Build the selection for a concrete GTFS stop.

    If the stop has scheduled stop_times, it is promoted to its canonical stop
    place when it has one; otherwise it is selected on its own. If it has no
    scheduled stop_times, it is resolved through its stop group (keyed by
    ``parent_station`` or its own ``stop_id``). That group is promoted to its
    canonical stop place when linked to one.
    """
    if not _has_scheduled_stop(db, stop):
        group_selection = _selection_for_group(db, stop.dataset_id, stop.parent_station or stop.stop_id)
        if group_selection.canonical_stop_id is None:
            return group_selection
        canonical_stop_id = group_selection.canonical_stop_id
    else:
        canonical = canonical_stop_for_gtfs_stop(db, stop)
        if canonical is None:
            return StopSelection(
                display=_stop_summary(stop),
                stop_ids_by_dataset={stop.dataset_id: (stop.stop_id,)},
                canonical_stop_id=None,
            )
        canonical_stop_id = canonical.id
    return _selection_for_canonical_stop(
        db,
        canonical_stop_id,
        dataset_ids=active_dataset_ids,
        preferred_dataset_id=stop.dataset_id,
    )
def _parse_stop_group_token(token: str) -> tuple[int, str]:
    """Split a grouped-stop token into ``(dataset_id, group_id)``.

    The text after ``STOP_GROUP_PREFIX`` must be ``<dataset_id>:<group_id>``.
    Only the first colon separates, so ``group_id`` may itself contain colons.

    Raises:
        ValueError: if the separator is missing, the dataset id is not an
            integer, or the group id is empty.
    """
    dataset_text, separator, group_id = token[len(STOP_GROUP_PREFIX) :].partition(":")
    if not separator:
        raise ValueError("invalid grouped stop token")
    try:
        dataset_id = int(dataset_text)
    except ValueError as exc:
        raise ValueError("invalid grouped stop token") from exc
    if not group_id:
        raise ValueError("invalid grouped stop token")
    return dataset_id, group_id
def _parse_stop_place_token(token: str) -> tuple[int, int]:
    """Split a stop-place token into ``(canonical_stop_id, dataset_id)``.

    The text after ``STOP_PLACE_PREFIX`` must be ``<canonical_id>:<dataset_id>``,
    split at the first colon.

    Raises:
        ValueError: if the separator is missing or either part is not an integer.
    """
    canonical_text, separator, dataset_text = token[len(STOP_PLACE_PREFIX) :].partition(":")
    if not separator:
        raise ValueError("invalid canonical stop token")
    try:
        return int(canonical_text), int(dataset_text)
    except ValueError as exc:
        raise ValueError("invalid canonical stop token") from exc
def _selection_for_canonical_stop(
    db: Session,
    canonical_stop_id: int,
    dataset_ids: list[int] | None = None,
    preferred_dataset_id: int | None = None,
) -> StopSelection:
    """Build a selection covering every scheduled GTFS stop of a canonical stop place.

    Linked GTFS stops are collected per dataset (``dataset_ids`` defaults to
    ``_active_gtfs_dataset_ids(db)``). Each dataset keeps only its stops with
    scheduled stop_times. The result's datasets are ordered with
    ``preferred_dataset_id`` first, and the display dataset is the preferred
    one when it has scheduled stops, otherwise the first remaining dataset.

    Raises:
        ValueError: if the canonical stop does not exist, or none of its linked
            stops in scope has scheduled stop_times.
    """
    canonical = db.get(CanonicalStop, canonical_stop_id)
    if canonical is None:
        raise ValueError("selected stop place does not exist")
    scope = dataset_ids if dataset_ids is not None else _active_gtfs_dataset_ids(db)
    linked = _gtfs_stop_ids_for_canonical_stop_by_dataset(db, canonical_stop_id, scope)
    scheduled_by_dataset: dict[int, tuple[str, ...]] = {}
    for linked_dataset_id in _preferred_dataset_order(linked, preferred_dataset_id):
        stop_ids = _scheduled_stop_ids(db, linked_dataset_id, linked[linked_dataset_id])
        if stop_ids:
            scheduled_by_dataset[linked_dataset_id] = stop_ids
    if not scheduled_by_dataset:
        raise ValueError("selected stop place has no imported scheduled stop_times in the selected source scope")
    if preferred_dataset_id in scheduled_by_dataset:
        display_dataset_id = preferred_dataset_id
    else:
        display_dataset_id = next(iter(scheduled_by_dataset))
    display = StopSummary(
        id=canonical.id,
        dataset_id=display_dataset_id,
        stop_id=f"canonical:{canonical.id}",
        name=canonical.name,
        lat=canonical.lat,
        lon=canonical.lon,
    )
    return StopSelection(
        display=display,
        stop_ids_by_dataset=scheduled_by_dataset,
        canonical_stop_id=canonical.id,
    )
def _selection_for_group(db: Session, dataset_id: int, group_id: str) -> StopSelection:
    """Build a selection for a stop group within one dataset.

    The group's scheduled stops (at most ``MAX_GROUP_STOP_IDS``) become the
    selection. The display stop is the group's parent stop if known, else the
    best display stop among the scheduled ones. The canonical stop id comes
    from ``_canonical_stop_for_group`` when it finds one.

    Raises:
        ValueError: if the group has no scheduled stops.
    """
    key = (dataset_id, group_id)
    scheduled = _scheduled_stops_for_groups(db, [key]).get(key, [])
    if not scheduled:
        raise ValueError("selected stop group has no imported scheduled stop_times")
    display_stop = _parent_stops_for_groups(db, [key]).get(key) or _best_display_stop(group_id, [], scheduled)
    canonical = _canonical_stop_for_group(db, scheduled)
    capped_ids = tuple(stop.stop_id for stop in scheduled[:MAX_GROUP_STOP_IDS])
    return StopSelection(
        display=_stop_summary(display_stop),
        stop_ids_by_dataset={dataset_id: capped_ids},
        canonical_stop_id=canonical.id if canonical is not None else None,
    )
def _gtfs_stop_ids_for_canonical_stop_by_dataset(
    db: Session, canonical_stop_id: int, dataset_ids: list[int]
) -> dict[int, tuple[str, ...]]:
    """Return the GTFS ``stop_id``s linked to a canonical stop, grouped per dataset.

    Only ``gtfs_stop`` links in ``dataset_ids`` are considered. They are ordered
    by dataset, link role and external id, and capped at
    ``MAX_GROUP_STOP_IDS`` per dataset. An empty ``dataset_ids`` yields ``{}``
    without querying.
    """
    if not dataset_ids:
        return {}
    query = (
        select(CanonicalStopLink.dataset_id, CanonicalStopLink.external_id)
        .where(
            CanonicalStopLink.canonical_stop_id == canonical_stop_id,
            CanonicalStopLink.object_type == "gtfs_stop",
            CanonicalStopLink.dataset_id.in_(dataset_ids),
        )
        .order_by(CanonicalStopLink.dataset_id, CanonicalStopLink.role, CanonicalStopLink.external_id)
    )
    grouped: dict[int, list[str]] = {}
    for link_dataset_id, external_id in db.execute(query).all():
        ids = grouped.setdefault(int(link_dataset_id), [])
        if len(ids) < MAX_GROUP_STOP_IDS:
            ids.append(str(external_id))
    return {key: tuple(value) for key, value in grouped.items()}
def _preferred_dataset_order(stop_ids_by_dataset: dict[int, tuple[str, ...]], preferred_dataset_id: int | None) -> list[int]:
dataset_ids = sorted(stop_ids_by_dataset)
if preferred_dataset_id is None or preferred_dataset_id not in stop_ids_by_dataset:
return dataset_ids
return [preferred_dataset_id, *[dataset_id for dataset_id in dataset_ids if dataset_id != preferred_dataset_id]]
def _scheduled_stop_ids(db: Session, dataset_id: int, stop_ids: tuple[str, ...]) -> tuple[str, ...]:
    """Return ``storage_scheduled_stop_ids`` for ``stop_ids``, capped at ``MAX_GROUP_STOP_IDS`` entries."""
    scheduled = storage_scheduled_stop_ids(db, dataset_id, stop_ids)
    return scheduled[:MAX_GROUP_STOP_IDS]
def _has_scheduled_stop(db: Session, stop: GtfsStop) -> bool:
    """Ask the GTFS storage layer whether ``stop`` has scheduled stop_times."""
    dataset_id, stop_id = stop.dataset_id, stop.stop_id
    return storage_has_scheduled_stop(db, dataset_id, stop_id)
def find_journeys(
    db: Session,
    from_stop_id: int | str,
    to_stop_id: int | str,
    departure: str,
    max_transfers: int = 0,
    limit: int = 5,
    transfer_seconds: int = 120,
    source_ids: list[int] | None = None,
    via_stop_id: int | str | None = None,
    service_date: str | date | None = None,
    _allow_access_transfer: bool = True,
    _allow_address_access: bool = True,
) -> dict:
    """Search public-transport journeys between two endpoints.

    Dispatch:
      * a non-blank ``via_stop_id`` delegates to ``_find_journeys_via``;
      * when either endpoint is a location token and ``_allow_address_access``
        is set, this delegates to ``_find_journeys_with_address_access``.

    Otherwise both endpoints are resolved to stop selections. The search
    combines:

      * direct trips;
      * one-transfer trips (``max_transfers > 0``);
      * round-based trips (``max_transfers > 1``);
      * a walk-only option.

    Results are de-duplicated by leg signature, keeping the best-sorted
    journey. A diverse subset of at most ``min(limit, 10)`` (at least 1)
    journeys is returned. If nothing is found while ``_allow_access_transfer``
    is set and ``max_transfers > 0``, an access-transfer search is used as a
    fallback.

    Args:
        departure: earliest departure as ``HH:MM`` or ``HH:MM:SS``.
        service_date: optional service day, parsed by ``parse_service_date``.
        _allow_access_transfer, _allow_address_access: internal recursion
            switches used by the address-access search.

    Returns:
        Payload dict with ``from``/``to`` stop payloads, source and dataset
        metadata, the echoed departure/service date, ``max_transfers`` and
        ``journeys``.

    Raises:
        ValueError: for unresolvable endpoints or a malformed ``departure``
            (plus whatever the delegated searches raise).
    """
    if via_stop_id is not None and str(via_stop_id).strip():
        return _find_journeys_via(
            db=db,
            from_stop_id=from_stop_id,
            via_stop_id=via_stop_id,
            to_stop_id=to_stop_id,
            departure=departure,
            max_transfers=max_transfers,
            transfer_seconds=transfer_seconds,
            limit=limit,
            source_ids=source_ids,
            service_date=service_date,
        )
    if _allow_address_access and (is_location_token(from_stop_id) or is_location_token(to_stop_id)):
        return _find_journeys_with_address_access(
            db=db,
            from_stop_id=from_stop_id,
            to_stop_id=to_stop_id,
            departure=departure,
            max_transfers=max_transfers,
            transfer_seconds=transfer_seconds,
            limit=limit,
            source_ids=source_ids,
            service_date=service_date,
        )
    from_selection = _resolve_stop_selection(db, from_stop_id, source_ids=source_ids)
    to_selection = _resolve_stop_selection(db, to_stop_id, source_ids=source_ids)
    departure_seconds = parse_gtfs_time(departure)
    if departure_seconds is None:
        raise ValueError("departure must be HH:MM or HH:MM:SS")
    parsed_service_date = parse_service_date(service_date)
    # Pre-resolve display summaries for every endpoint stop. Only look up keys
    # not cached yet: the same stop can appear on both sides, and
    # ``dict.setdefault`` would evaluate the DB lookup even for cached keys.
    stop_cache: dict[tuple[int, str], StopSummary] = {}
    for selection in (from_selection, to_selection):
        for dataset_id, stop_ids in selection.stop_ids_by_dataset.items():
            for stop_id in stop_ids:
                cache_key = (dataset_id, stop_id)
                if cache_key not in stop_cache:
                    stop_cache[cache_key] = _stop_summary_for_stop_id(db, dataset_id, stop_id)
    osm_stop_cache: dict[tuple[int, str], dict] = {}
    max_journeys = max(1, min(limit, 10))
    from_dataset_ids = set(from_selection.stop_ids_by_dataset)
    to_dataset_ids = set(to_selection.stop_ids_by_dataset)
    common_dataset_ids = sorted(from_dataset_ids & to_dataset_ids)
    searched_dataset_ids = sorted(from_dataset_ids | to_dataset_ids)
    service_ids_by_dataset = _service_ids_by_dataset(db, searched_dataset_ids, parsed_service_date)
    # An empty service-id set for a dataset makes every search below skip it.
    direct: list[dict] = []
    for dataset_id in common_dataset_ids:
        service_ids = service_ids_by_dataset.get(dataset_id)
        if service_ids == set():
            continue
        direct.extend(
            _find_direct_journeys(
                db=db,
                dataset_id=dataset_id,
                service_ids=service_ids,
                from_stop_ids=from_selection.stop_ids_by_dataset[dataset_id],
                to_stop_ids=to_selection.stop_ids_by_dataset[dataset_id],
                earliest_departure=departure_seconds,
                limit=max_journeys,
                stop_cache=stop_cache,
                osm_stop_cache=osm_stop_cache,
            )
        )
    direct = sorted(direct, key=_journey_sort_key)[:max_journeys]
    if max_transfers > 0:
        direct_arrival = direct[0]["arrival_seconds"] if direct else None
        transfer_journeys: list[dict] = []
        for first_dataset_id, second_dataset_id in _journey_dataset_pairs(from_selection, to_selection):
            first_service_ids = service_ids_by_dataset.get(first_dataset_id)
            second_service_ids = service_ids_by_dataset.get(second_dataset_id)
            if first_service_ids == set() or second_service_ids == set():
                continue
            transfer_journeys.extend(
                _find_one_transfer_journeys(
                    db=db,
                    first_dataset_id=first_dataset_id,
                    second_dataset_id=second_dataset_id,
                    first_service_ids=first_service_ids,
                    second_service_ids=second_service_ids,
                    from_stop_ids=from_selection.stop_ids_by_dataset[first_dataset_id],
                    to_stop_ids=to_selection.stop_ids_by_dataset[second_dataset_id],
                    origin_canonical_stop_id=from_selection.canonical_stop_id,
                    target_canonical_stop_id=to_selection.canonical_stop_id,
                    earliest_departure=departure_seconds,
                    latest_arrival=direct_arrival,
                    transfer_seconds=max(0, transfer_seconds),
                    limit=max_journeys,
                    stop_cache=stop_cache,
                    osm_stop_cache=osm_stop_cache,
                )
            )
        transfer_journeys = sorted(
            transfer_journeys,
            key=_journey_sort_key,
        )[: max_journeys * 3]
        if max_transfers > 1:
            # Bound the round-based search by the earliest arrival found so far.
            best_known_arrival = min(
                (
                    int(journey["arrival_seconds"])
                    for journey in [*direct, *transfer_journeys]
                    if journey.get("arrival_seconds") is not None
                ),
                default=None,
            )
            round_journeys: list[dict] = []
            for dataset_id in common_dataset_ids:
                service_ids = service_ids_by_dataset.get(dataset_id)
                if service_ids == set():
                    continue
                round_journeys.extend(
                    _find_round_journeys(
                        db=db,
                        dataset_id=dataset_id,
                        service_ids=service_ids,
                        from_selection=from_selection,
                        to_selection=to_selection,
                        earliest_departure=departure_seconds,
                        max_transfers=max(0, max_transfers),
                        transfer_seconds=max(0, transfer_seconds),
                        latest_arrival=best_known_arrival,
                        limit=max_journeys,
                        stop_cache=stop_cache,
                        osm_stop_cache=osm_stop_cache,
                    )
                )
            transfer_journeys = sorted(
                [*transfer_journeys, *round_journeys],
                key=_journey_sort_key,
            )[: max_journeys * 3]
    else:
        transfer_journeys = []
    walk_journey = _find_walk_only_journey(
        db,
        from_selection=from_selection,
        to_selection=to_selection,
        departure_seconds=departure_seconds,
    )
    walk_journeys = [] if walk_journey is None else [walk_journey]
    journeys = _filter_reasonable_journeys([*walk_journeys, *transfer_journeys, *direct])
    # Keep the best-sorted journey for each distinct leg signature.
    unique: dict[tuple[str, ...], dict] = {}
    for journey in sorted(journeys, key=_journey_sort_key):
        key = tuple(_journey_leg_signature(leg) for leg in journey["legs"])
        unique.setdefault(key, journey)
    selected = _select_diverse_journeys(unique.values(), limit=max_journeys)
    if not selected and _allow_access_transfer and max_transfers > 0:
        access_journeys = _find_access_transfer_journeys(
            db=db,
            from_selection=from_selection,
            to_stop_id=to_stop_id,
            earliest_departure=departure_seconds,
            max_transfers=max_transfers,
            transfer_seconds=max(0, transfer_seconds),
            limit=max_journeys,
            source_ids=source_ids,
            service_date=parsed_service_date,
            stop_cache=stop_cache,
            osm_stop_cache=osm_stop_cache,
        )
        # Same rule as above: keep the FIRST (best-sorted) journey per leg
        # signature. A dict comprehension would silently keep the last one.
        access_unique: dict[tuple[str, ...], dict] = {}
        for journey in sorted(access_journeys, key=_journey_sort_key):
            access_unique.setdefault(tuple(_journey_leg_signature(leg) for leg in journey["legs"]), journey)
        selected = list(access_unique.values())[:max_journeys]
    selected_dataset_ids = sorted(
        {
            int(leg["dataset_id"])
            for journey in selected
            for leg in journey.get("legs", [])
            if leg.get("dataset_id") is not None
        }
    )
    source_payloads = _source_payloads_for_dataset_ids(db, selected_dataset_ids or searched_dataset_ids)
    return {
        "from": _stop_payload(from_selection.display),
        "to": _stop_payload(to_selection.display),
        "source": source_payloads[0] if len(source_payloads) == 1 else None,
        "sources": source_payloads,
        "dataset_id": selected_dataset_ids[0] if len(selected_dataset_ids) == 1 else None,
        "dataset_ids": selected_dataset_ids or searched_dataset_ids,
        "departure_time": format_gtfs_time(departure_seconds),
        "departure_time_label": format_gtfs_time_label(departure_seconds),
        "service_date": None if parsed_service_date is None else parsed_service_date.isoformat(),
        "max_transfers": max(0, max_transfers),
        "journeys": selected,
    }
def _find_journeys_with_address_access(
    db: Session,
    from_stop_id: int | str,
    to_stop_id: int | str,
    departure: str,
    max_transfers: int,
    transfer_seconds: int,
    limit: int,
    source_ids: list[int] | None,
    service_date: str | date | None,
) -> dict:
    """Search journeys where at least one endpoint is a location token.

    Location tokens are addresses, address points or coordinates. Both
    endpoints are resolved with ``resolve_location_summary`` and a walk-only
    journey between them is tried first.

    When exactly one endpoint is a location and that walk takes at most
    ``ADDRESS_ACCESS_SHORT_DIRECT_WALK_SECONDS``, no stop candidates are looked
    up at all. Otherwise nearby stop candidates are paired up. For each pair:

      * a walk to the origin stop is added if the origin is a location;
      * ``find_journeys`` runs between the two stops, departing on arrival at
        the origin stop;
      * a walk from the destination stop is added if the destination is a
        location.

    Major-station candidates are included when both endpoints are locations at
    least ``ADDRESS_ACCESS_LONG_DISTANCE_HUB_THRESHOLD_M`` apart. In that mode
    the pair loop stops once ``max_journeys`` journeys have been collected.

    Returns:
        The same payload shape as ``find_journeys`` plus ``diagnostics``.

    Raises:
        ValueError: for a malformed ``departure`` or unresolvable endpoints.
    """
    departure_seconds = parse_gtfs_time(departure)
    if departure_seconds is None:
        raise ValueError("departure must be HH:MM or HH:MM:SS")
    parsed_service_date = parse_service_date(service_date)
    active_dataset_ids = _active_gtfs_dataset_ids(db, source_ids=source_ids)
    from_location = resolve_location_summary(db, from_stop_id, source_ids=source_ids)
    to_location = resolve_location_summary(db, to_stop_id, source_ids=source_ids)
    max_journeys = max(1, min(limit, 10))
    max_pairs = ADDRESS_ACCESS_MAX_DEEP_PAIR_CANDIDATES if max_transfers > 1 else ADDRESS_ACCESS_MAX_PAIR_CANDIDATES
    journeys: list[dict] = []
    direct_walk = _walk_only_journey_between_summaries(
        db,
        from_location=from_location,
        to_location=to_location,
        departure_seconds=departure_seconds,
        dataset_id=(active_dataset_ids[0] if active_dataset_ids else from_location.dataset_id),
        route_geometry=True,
    )
    if direct_walk is not None:
        journeys.append(direct_walk)
    origin_is_address = is_location_token(from_stop_id)
    destination_is_address = is_location_token(to_stop_id)
    short_direct_walk_only = (
        direct_walk is not None
        and origin_is_address != destination_is_address
        and int(direct_walk.get("duration_seconds") or 0) <= ADDRESS_ACCESS_SHORT_DIRECT_WALK_SECONDS
    )
    access_distance_m = (
        _distance_m(float(from_location.lat), float(from_location.lon), float(to_location.lat), float(to_location.lon))
        if from_location.lat is not None
        and from_location.lon is not None
        and to_location.lat is not None
        and to_location.lon is not None
        else 0
    )
    include_major_hubs = (
        origin_is_address
        and destination_is_address
        and access_distance_m >= ADDRESS_ACCESS_LONG_DISTANCE_HUB_THRESHOLD_M
    )
    origin_candidates: list[_AccessStopCandidate] = []
    destination_candidates: list[_AccessStopCandidate] = []
    candidate_pairs: list[tuple[_AccessStopCandidate, _AccessStopCandidate]] = []
    if not short_direct_walk_only:
        # Only pay for the nearby/hub stop lookups (several DB queries) when
        # their results are actually used. A short address<->stop walk is the
        # answer on its own.
        origin_candidates = _location_stop_candidates(
            db,
            from_stop_id,
            from_location,
            active_dataset_ids,
            source_ids=source_ids,
            include_major_hubs=include_major_hubs,
        )
        destination_candidates = _location_stop_candidates(
            db,
            to_stop_id,
            to_location,
            active_dataset_ids,
            source_ids=source_ids,
            include_major_hubs=include_major_hubs,
        )
        candidate_pairs = _address_access_candidate_pairs(
            origin_candidates,
            destination_candidates,
            origin_is_address=origin_is_address,
            destination_is_address=destination_is_address,
            max_pairs=max_pairs,
        )
    # Per origin token: the access walk leg (or None) and the resulting transit
    # departure time (None means the origin stop is not reachable on foot).
    access_leg_cache: dict[str, dict | None] = {}
    transit_departure_cache: dict[str, int | None] = {}
    for origin, destination in candidate_pairs:
        access_leg = access_leg_cache.get(origin.token)
        transit_departure_seconds = transit_departure_cache.get(origin.token)
        if origin.token not in transit_departure_cache:
            access_leg = None
            transit_departure_seconds = departure_seconds
            if origin_is_address:
                access_leg = _walk_leg_between_summaries(
                    db,
                    from_stop=from_location,
                    to_stop=origin.selection.display,
                    departure_seconds=departure_seconds,
                    dataset_id=origin.selection.dataset_id,
                    max_duration_seconds=ADDRESS_ACCESS_MAX_SECONDS,
                    route_geometry=True,
                )
                if access_leg is None:
                    transit_departure_seconds = None
                else:
                    transit_departure_seconds = int(access_leg["arrival_seconds"])
            access_leg_cache[origin.token] = access_leg
            transit_departure_cache[origin.token] = transit_departure_seconds
        if transit_departure_seconds is None:
            continue
        transit_departure = format_gtfs_time(transit_departure_seconds)
        if transit_departure is None:
            continue
        try:
            transit = find_journeys(
                db=db,
                from_stop_id=origin.token,
                to_stop_id=destination.token,
                departure=transit_departure,
                max_transfers=max_transfers,
                transfer_seconds=transfer_seconds,
                limit=max(max_journeys, 6),
                source_ids=source_ids,
                service_date=parsed_service_date,
                _allow_access_transfer=include_major_hubs,
                _allow_address_access=False,
            )
        except ValueError:
            continue
        for transit_journey in transit.get("journeys", [])[: max_journeys * 2]:
            egress_leg = None
            if destination_is_address:
                arrival_seconds = transit_journey.get("arrival_seconds")
                if arrival_seconds is None:
                    continue
                egress_leg = _walk_leg_between_summaries(
                    db,
                    from_stop=destination.selection.display,
                    to_stop=to_location,
                    departure_seconds=int(arrival_seconds),
                    dataset_id=destination.selection.dataset_id,
                    max_duration_seconds=ADDRESS_ACCESS_MAX_SECONDS,
                    route_geometry=True,
                )
                if egress_leg is None:
                    continue
            combined = _compose_address_access_journey(
                transit_journey,
                access_leg=access_leg,
                egress_leg=egress_leg,
            )
            if combined is not None:
                journeys.append(combined)
            if include_major_hubs and len(journeys) >= max_journeys:
                break
        if include_major_hubs and len(journeys) >= max_journeys:
            break
    # Keep the best-sorted journey for each distinct leg signature.
    unique: dict[tuple[str, ...], dict] = {}
    for journey in sorted(_filter_reasonable_journeys(journeys), key=_journey_sort_key):
        key = tuple(_journey_leg_signature(leg) for leg in journey["legs"])
        unique.setdefault(key, journey)
    selected = _select_diverse_journeys(unique.values(), limit=max_journeys)
    selected_dataset_ids = sorted(
        {
            int(leg["dataset_id"])
            for journey in selected
            for leg in journey.get("legs", [])
            if leg.get("dataset_id") is not None
        }
    )
    searched_dataset_ids = sorted(active_dataset_ids)
    source_payloads = _source_payloads_for_dataset_ids(db, selected_dataset_ids or searched_dataset_ids)
    diagnostics = {
        "address_access": {
            "origin_candidates": len(origin_candidates),
            "destination_candidates": len(destination_candidates),
            "searched_pairs": len(candidate_pairs),
            "max_pairs": max_pairs,
            "major_hubs": include_major_hubs,
        }
    }
    return {
        "from": _stop_payload(from_location),
        "to": _stop_payload(to_location),
        "source": source_payloads[0] if len(source_payloads) == 1 else None,
        "sources": source_payloads,
        "dataset_id": selected_dataset_ids[0] if len(selected_dataset_ids) == 1 else None,
        "dataset_ids": selected_dataset_ids or searched_dataset_ids,
        "departure_time": format_gtfs_time(departure_seconds),
        "departure_time_label": format_gtfs_time_label(departure_seconds),
        "service_date": None if parsed_service_date is None else parsed_service_date.isoformat(),
        "max_transfers": max(0, max_transfers),
        "diagnostics": diagnostics,
        "journeys": selected,
    }
def _address_access_candidate_pairs(
    origins: list[_AccessStopCandidate],
    destinations: list[_AccessStopCandidate],
    *,
    origin_is_address: bool,
    destination_is_address: bool,
    max_pairs: int,
) -> list[tuple[_AccessStopCandidate, _AccessStopCandidate]]:
    """Choose which (origin stop, destination stop) pairs to run transit searches for.

    Every pair is scored by the access distance on the sides whose endpoint is
    a location (a stop endpoint contributes 0). Pairs are sorted by that score,
    then origin distance, destination distance and the two tokens.

    If either endpoint is not a location, ALL sorted pairs are returned and
    ``max_pairs`` is not applied. When both endpoints are locations, at most
    ``max_pairs`` pairs are returned, picked in this order:

    1. "priority" pairs, whose destination ``priority`` is below
       ``ADDRESS_ACCESS_NORMAL_PRIORITY`` (lowest priority value first), up to
       ``max_pairs - closest_count`` of them;
    2. the ``closest_count = max(2, max_pairs // 2)`` best-scored pairs;
    3. further priority pairs, then further best-scored pairs, until
       ``max_pairs`` is reached.

    A given (origin token, destination token) pair is included only once.
    """
    # (score, origin, destination); score only counts the location-side walks.
    pairs = [
        (
            (origin.distance_m if origin_is_address else 0) + (destination.distance_m if destination_is_address else 0),
            origin,
            destination,
        )
        for origin in origins
        for destination in destinations
    ]
    pairs.sort(key=lambda item: (item[0], item[1].distance_m, item[2].distance_m, item[1].token, item[2].token))
    if not origin_is_address or not destination_is_address:
        # One side is a fixed stop: return every pair, uncapped.
        return [(origin, destination) for _, origin, destination in pairs]
    closest_count = max(2, max_pairs // 2)
    selected: list[tuple[float, _AccessStopCandidate, _AccessStopCandidate]] = []
    seen: set[tuple[str, str]] = set()
    # NOTE(review): only the DESTINATION candidate's priority is considered
    # here; origin-side priority candidates are reached only through the
    # distance-ordered fill below — confirm this is intended.
    priority_pairs = sorted(
        (
            item
            for item in pairs
            if item[2].priority < ADDRESS_ACCESS_NORMAL_PRIORITY
        ),
        key=lambda item: (
            item[2].priority,
            item[1].distance_m,
            item[2].distance_m,
            item[0],
            item[1].token,
            item[2].token,
        ),
    )
    def append_item(item: tuple[float, _AccessStopCandidate, _AccessStopCandidate]) -> bool:
        # Add the pair unless this (origin, destination) token pair was already taken.
        _, origin, destination = item
        key = (origin.token, destination.token)
        if key in seen:
            return False
        seen.add(key)
        selected.append(item)
        return True
    # Phase 1: reserve room for priority pairs, leaving closest_count slots.
    priority_budget = max(0, max_pairs - closest_count)
    if priority_budget > 0:
        for item in priority_pairs:
            append_item(item)
            if len(selected) >= priority_budget:
                break
    # Phase 2: the closest pairs by score (no cap check; the final slice enforces max_pairs).
    for item in pairs[:closest_count]:
        append_item(item)
    # Phase 3: top up with remaining priority pairs, then remaining pairs by score.
    for item in priority_pairs:
        append_item(item)
        if len(selected) >= max_pairs:
            break
    for item in pairs:
        append_item(item)
        if len(selected) >= max_pairs:
            break
    return [(origin, destination) for _, origin, destination in selected[:max_pairs]]
def _location_stop_candidates(
    db: Session,
    token: int | str,
    location: StopSummary,
    active_dataset_ids: list[int],
    *,
    source_ids: list[int] | None,
    include_major_hubs: bool = False,
) -> list[_AccessStopCandidate]:
    """Return transit stop candidates for starting/ending a journey at ``token``.

    Stop token (not a location token): the resolved stop is the only candidate,
    at distance 0. It is keyed by a stop-place token when it resolves to a
    canonical stop, otherwise by the original token.

    Location token: nearby canonical stops in the active datasets are resolved
    in backend order, each at most once. Stop places without scheduled service
    in scope are skipped, and at most ``ADDRESS_ACCESS_STOP_CANDIDATES`` are
    kept. With ``include_major_hubs``, major-station candidates are merged in
    after them.
    """
    if not is_location_token(token):
        resolved = _resolve_stop_selection(db, token, source_ids=source_ids)
        if resolved.canonical_stop_id is None:
            candidate_token = str(token)
        else:
            candidate_token = _stop_place_token(resolved.canonical_stop_id, resolved.dataset_id)
        return [_AccessStopCandidate(token=candidate_token, selection=resolved, distance_m=0)]
    if location.lon is None or location.lat is None or not active_dataset_ids:
        return []
    if settings.is_postgresql_database:
        nearby_rows = _nearby_canonical_stops_postgresql(db, location, active_dataset_ids)
    else:
        nearby_rows = _nearby_canonical_stops_sqlite(db, location, active_dataset_ids)
    candidates: list[_AccessStopCandidate] = []
    visited: set[int] = set()
    for raw_stop_id, raw_dataset_id, raw_distance in nearby_rows:
        place_id = int(raw_stop_id)
        if place_id in visited:
            continue
        visited.add(place_id)
        try:
            resolved = _selection_for_canonical_stop(
                db,
                place_id,
                dataset_ids=active_dataset_ids,
                preferred_dataset_id=int(raw_dataset_id),
            )
        except ValueError:
            # No scheduled stop_times in scope for this stop place.
            continue
        candidates.append(
            _AccessStopCandidate(
                token=_stop_place_token(place_id, resolved.dataset_id),
                selection=resolved,
                distance_m=float(raw_distance or 0),
            )
        )
        if len(candidates) >= ADDRESS_ACCESS_STOP_CANDIDATES:
            break
    if not include_major_hubs:
        return candidates
    hubs = _location_major_hub_stop_candidates(db, token, location, active_dataset_ids)
    return _merge_access_stop_candidates(candidates, hubs)
def _merge_access_stop_candidates(
primary: list[_AccessStopCandidate],
extra: list[_AccessStopCandidate],
) -> list[_AccessStopCandidate]:
merged = list(primary)
seen_tokens = {candidate.token for candidate in merged}
seen_canonical_ids = {
candidate.selection.canonical_stop_id
for candidate in merged
if candidate.selection.canonical_stop_id is not None
}
for candidate in extra:
canonical_stop_id = candidate.selection.canonical_stop_id
if candidate.token in seen_tokens or (canonical_stop_id is not None and canonical_stop_id in seen_canonical_ids):
continue
merged.append(candidate)
seen_tokens.add(candidate.token)
if canonical_stop_id is not None:
seen_canonical_ids.add(canonical_stop_id)
return merged
def _location_major_hub_stop_candidates(
    db: Session,
    token: int | str,
    location: StopSummary,
    active_dataset_ids: list[int],
) -> list[_AccessStopCandidate]:
    """Return major-station candidates (Hbf / central station) around a location.

    The address's normalized city is passed to the backend query, which ranks
    hubs whose name contains it first. Each canonical stop is resolved at most
    once, and stop places without scheduled service in scope are skipped. At
    most ``ADDRESS_ACCESS_MAJOR_HUB_CANDIDATES`` candidates are returned, each
    tagged with ``ADDRESS_ACCESS_MAJOR_HUB_PRIORITY``.
    """
    if location.lon is None or location.lat is None or not active_dataset_ids:
        return []
    city = _address_city_for_token(db, token)
    fetch_hubs = (
        _major_hub_canonical_stops_postgresql
        if settings.is_postgresql_database
        else _major_hub_canonical_stops_sqlite
    )
    hub_rows = fetch_hubs(db, location, active_dataset_ids, locality=city)
    hubs: list[_AccessStopCandidate] = []
    visited: set[int] = set()
    for raw_stop_id, raw_dataset_id, raw_distance in hub_rows:
        place_id = int(raw_stop_id)
        if place_id in visited:
            continue
        visited.add(place_id)
        try:
            hub_selection = _selection_for_canonical_stop(
                db,
                place_id,
                dataset_ids=active_dataset_ids,
                preferred_dataset_id=int(raw_dataset_id),
            )
        except ValueError:
            continue
        hubs.append(
            _AccessStopCandidate(
                token=_stop_place_token(place_id, hub_selection.dataset_id),
                selection=hub_selection,
                distance_m=float(raw_distance or 0),
                priority=ADDRESS_ACCESS_MAJOR_HUB_PRIORITY,
            )
        )
        if len(hubs) >= ADDRESS_ACCESS_MAJOR_HUB_CANDIDATES:
            break
    return hubs
def _address_city_for_token(db: Session, token: int | str) -> str:
    """Return the normalized city of an address / address-point token, else "".

    Coordinate tokens, non-address tokens and lookups raising ``ValueError``
    all yield the empty string.
    """
    try:
        if is_coordinate_token(token):
            address = None
        elif is_address_point_token(token):
            address, _lat, _lon = address_point_by_token(db, token)
        elif is_address_token(token):
            address = address_by_token(db, token)
        else:
            address = None
    except ValueError:
        return ""
    if address is None:
        return ""
    return _normalize_stop_search(address.city or "")
def _is_major_station_name(value: str | None) -> bool:
    """Heuristically detect main-station names in the normalized ``value``.

    Matches "hbf" as a standalone word (bounded by start/end, whitespace or one
    of ``, ( ) / -``), or the substrings "hauptbahnhof", "central station" or
    "central train station".
    """
    normalized_name = _normalize_stop_search(value or "")
    if re.search(r"(^|[\s,(/-])hbf($|[\s,)/-])", normalized_name):
        return True
    markers = ("hauptbahnhof", "central station", "central train station")
    return any(marker in normalized_name for marker in markers)
def _major_hub_canonical_stops_postgresql(
    db: Session,
    location: StopSummary,
    active_dataset_ids: list[int],
    *,
    locality: str,
) -> list[tuple[int, int, float]]:
    """Return ``(canonical_stop_id, dataset_id, distance_m)`` for major stations near ``location``.

    Only canonical stops linked to active GTFS datasets in ``active_dataset_ids``
    within ``ADDRESS_ACCESS_MAJOR_HUB_RADIUS_M`` metres are returned, and only
    when their lower-cased name looks like a main station (standalone "hbf",
    "hauptbahnhof", "central station", "central train station"). Rows whose
    name contains ``locality`` rank first, then by distance and id. At most
    ``ADDRESS_ACCESS_MAJOR_HUB_CANDIDATES * 6`` rows are returned; a stop can
    appear once per linked dataset.
    """
    # The ``&&``/ST_Expand prefilter works in degrees (SRID 4326). A degree of
    # longitude spans only cos(lat) * 111.32 km, so a box sized in latitude
    # degrees would cut off stops east/west of the point that ST_DWithin would
    # accept. Expand by the (larger) longitude extent, clamped near the poles;
    # ST_DWithin still performs the exact metre check.
    cos_lat = max(math.cos(math.radians(float(location.lat))), 0.01)
    radius_deg = ADDRESS_ACCESS_MAJOR_HUB_RADIUS_M / (111_320 * cos_lat)
    stmt = text(
        """
        WITH point AS (
            SELECT ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) AS geom
        ),
        hub_rows AS (
            SELECT
                canonical_stops.id AS canonical_stop_id,
                canonical_stop_links.dataset_id AS dataset_id,
                ST_DistanceSphere(canonical_stops.geom, point.geom) AS distance_m,
                MIN(
                    CASE
                        WHEN :locality = '' THEN 1
                        WHEN LOWER(COALESCE(canonical_stops.name, '')) LIKE :locality_pattern THEN 0
                        ELSE 1
                    END
                ) AS locality_rank
            FROM canonical_stops
            JOIN canonical_stop_links
              ON canonical_stop_links.canonical_stop_id = canonical_stops.id
             AND canonical_stop_links.object_type = 'gtfs_stop'
            JOIN datasets
              ON datasets.id = canonical_stop_links.dataset_id
             AND datasets.kind = 'gtfs'
             AND datasets.is_active IS TRUE
            CROSS JOIN point
            WHERE canonical_stop_links.dataset_id IN :dataset_ids
              AND canonical_stops.geom IS NOT NULL
              AND canonical_stops.geom && ST_Expand(point.geom, :radius_deg)
              AND ST_DWithin(canonical_stops.geom::geography, point.geom::geography, :radius_m)
              AND (
                LOWER(COALESCE(canonical_stops.name, '')) ~ '(^|[[:space:],(/-])hbf($|[[:space:],)/-])'
                OR LOWER(COALESCE(canonical_stops.name, '')) LIKE '%hauptbahnhof%'
                OR LOWER(COALESCE(canonical_stops.name, '')) LIKE '%central station%'
                OR LOWER(COALESCE(canonical_stops.name, '')) LIKE '%central train station%'
              )
            GROUP BY canonical_stops.id, canonical_stop_links.dataset_id, canonical_stops.geom, point.geom
        )
        SELECT canonical_stop_id, dataset_id, distance_m
        FROM hub_rows
        ORDER BY locality_rank, distance_m, canonical_stop_id
        LIMIT :limit
        """
    ).bindparams(bindparam("dataset_ids", expanding=True))
    rows = db.execute(
        stmt,
        {
            "lon": float(location.lon),
            "lat": float(location.lat),
            "dataset_ids": tuple(active_dataset_ids),
            "radius_deg": radius_deg,
            "radius_m": ADDRESS_ACCESS_MAJOR_HUB_RADIUS_M,
            "locality": locality,
            "locality_pattern": f"%{locality}%" if locality else "",
            "limit": ADDRESS_ACCESS_MAJOR_HUB_CANDIDATES * 6,
        },
    ).all()
    return [(int(row[0]), int(row[1]), float(row[2] or 0)) for row in rows]
def _major_hub_canonical_stops_sqlite(
    db: Session,
    location: StopSummary,
    active_dataset_ids: list[int],
    *,
    locality: str,
) -> list[tuple[int, int, float]]:
    """SQLite fallback for ``_major_hub_canonical_stops_postgresql``.

    Candidate stops are fetched from a lat/lon bounding box around
    ``location``. They are then filtered in Python by ``_is_major_station_name``
    and by true distance (``_distance_m`` <= ``ADDRESS_ACCESS_MAJOR_HUB_RADIUS_M``).
    Each canonical stop appears once. Rows whose normalized name contains
    ``locality`` come first, then rows are ordered by distance and id.

    Returns:
        ``(canonical_stop_id, dataset_id, distance_m)`` tuples.
    """
    lon = float(location.lon)
    lat = float(location.lat)
    lat_radius_deg = ADDRESS_ACCESS_MAJOR_HUB_RADIUS_M / 111_320
    # A degree of longitude spans only cos(lat) * 111.32 km. Widen the box
    # east/west so it really covers the radius (clamped near the poles).
    lon_radius_deg = lat_radius_deg / max(math.cos(math.radians(lat)), 0.01)
    # NOTE(review): the row cap below is applied before the Python name filter
    # and without ORDER BY. In dense areas some hubs may be dropped — confirm
    # the cap is large enough for the data this fallback serves.
    rows = db.execute(
        select(
            CanonicalStop.id,
            CanonicalStopLink.dataset_id,
            CanonicalStop.name,
            CanonicalStop.lat,
            CanonicalStop.lon,
        )
        .join(CanonicalStopLink, CanonicalStopLink.canonical_stop_id == CanonicalStop.id)
        .where(
            CanonicalStopLink.object_type == "gtfs_stop",
            CanonicalStopLink.dataset_id.in_(active_dataset_ids),
            CanonicalStop.lat.is_not(None),
            CanonicalStop.lon.is_not(None),
            CanonicalStop.lat >= lat - lat_radius_deg,
            CanonicalStop.lat <= lat + lat_radius_deg,
            CanonicalStop.lon >= lon - lon_radius_deg,
            CanonicalStop.lon <= lon + lon_radius_deg,
        )
        .limit(ADDRESS_ACCESS_MAJOR_HUB_CANDIDATES * 100)
    ).all()
    result: list[tuple[int, int, float, int]] = []
    seen: set[int] = set()
    for canonical_stop_id, dataset_id, canonical_name, stop_lat, stop_lon in rows:
        if not _is_major_station_name(canonical_name):
            continue
        distance_m = _distance_m(lat, lon, float(stop_lat), float(stop_lon))
        if distance_m > ADDRESS_ACCESS_MAJOR_HUB_RADIUS_M:
            continue
        locality_rank = (
            0
            if locality
            and locality in _normalize_stop_search(canonical_name or "")
            else 1
        )
        if int(canonical_stop_id) in seen:
            continue
        seen.add(int(canonical_stop_id))
        result.append((int(canonical_stop_id), int(dataset_id), distance_m, locality_rank))
    result.sort(key=lambda item: (item[3], item[2], item[0]))
    return [(canonical_stop_id, dataset_id, distance_m) for canonical_stop_id, dataset_id, distance_m, _ in result]
def _nearby_canonical_stops_postgresql(
    db: Session,
    location: StopSummary,
    active_dataset_ids: list[int],
) -> list[tuple[int, int, float]]:
    """Return ``(canonical_stop_id, dataset_id, distance_m)`` for stops near ``location``.

    Covers canonical stops linked to active GTFS datasets in
    ``active_dataset_ids`` within ``ADDRESS_ACCESS_RADIUS_M`` metres. Rows are
    ordered nearest-first by metric distance (callers take them in order), and
    at most ``ADDRESS_ACCESS_STOP_CANDIDATES * 8`` rows are returned. A stop
    can appear once per linked dataset.
    """
    # The ``&&``/ST_Expand prefilter works in degrees (SRID 4326). A degree of
    # longitude spans only cos(lat) * 111.32 km, so expand by the (larger)
    # longitude extent, clamped near the poles, to keep in-radius stops to the
    # east/west. ST_DWithin does the exact metre check.
    cos_lat = max(math.cos(math.radians(float(location.lat))), 0.01)
    radius_deg = ADDRESS_ACCESS_RADIUS_M / (111_320 * cos_lat)
    # Order by spherical metres rather than degree-space ``<->``. The latter
    # makes east/west stops look farther than they are. This also matches the
    # SQLite fallback, which sorts by metres.
    stmt = text(
        """
        WITH point AS (
            SELECT ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) AS geom
        )
        SELECT
            canonical_stops.id AS canonical_stop_id,
            canonical_stop_links.dataset_id AS dataset_id,
            ST_DistanceSphere(canonical_stops.geom, point.geom) AS distance_m
        FROM canonical_stops
        JOIN canonical_stop_links
          ON canonical_stop_links.canonical_stop_id = canonical_stops.id
         AND canonical_stop_links.object_type = 'gtfs_stop'
        JOIN datasets
          ON datasets.id = canonical_stop_links.dataset_id
         AND datasets.kind = 'gtfs'
         AND datasets.is_active IS TRUE
        CROSS JOIN point
        WHERE canonical_stop_links.dataset_id IN :dataset_ids
          AND canonical_stops.geom IS NOT NULL
          AND canonical_stops.geom && ST_Expand(point.geom, :radius_deg)
          AND ST_DWithin(canonical_stops.geom::geography, point.geom::geography, :radius_m)
        GROUP BY canonical_stops.id, canonical_stop_links.dataset_id, canonical_stops.geom, point.geom
        ORDER BY distance_m, canonical_stops.id, canonical_stop_links.dataset_id
        LIMIT :limit
        """
    ).bindparams(bindparam("dataset_ids", expanding=True))
    rows = db.execute(
        stmt,
        {
            "lon": float(location.lon),
            "lat": float(location.lat),
            "dataset_ids": tuple(active_dataset_ids),
            "radius_deg": radius_deg,
            "radius_m": ADDRESS_ACCESS_RADIUS_M,
            "limit": ADDRESS_ACCESS_STOP_CANDIDATES * 8,
        },
    ).all()
    return [(int(row[0]), int(row[1]), float(row[2] or 0)) for row in rows]
def _nearby_canonical_stops_sqlite(
    db: Session,
    location: StopSummary,
    active_dataset_ids: list[int],
) -> list[tuple[int, int, float]]:
    """SQLite counterpart of the PostGIS nearby-stop lookup.

    Fetches the ``ADDRESS_ACCESS_STOP_CANDIDATES * 8`` GTFS-linked canonical
    stops of *active_dataset_ids* closest to *location* by squared degree
    distance. Keeps those whose ``_distance_m`` is within ``ADDRESS_ACCESS_RADIUS_M``.

    Returns:
        ``(canonical_stop_id, dataset_id, distance_m)`` tuples sorted by distance.
    """
    origin_lon = float(location.lon)
    origin_lat = float(location.lat)
    squared_degrees = (CanonicalStop.lon - origin_lon) * (CanonicalStop.lon - origin_lon) + (
        CanonicalStop.lat - origin_lat
    ) * (CanonicalStop.lat - origin_lat)
    query = (
        select(CanonicalStop.id, CanonicalStopLink.dataset_id, CanonicalStop.lat, CanonicalStop.lon)
        .join(CanonicalStopLink, CanonicalStopLink.canonical_stop_id == CanonicalStop.id)
        .where(
            CanonicalStopLink.object_type == "gtfs_stop",
            CanonicalStopLink.dataset_id.in_(active_dataset_ids),
            CanonicalStop.lat.is_not(None),
            CanonicalStop.lon.is_not(None),
        )
        .order_by(squared_degrees)
        .limit(ADDRESS_ACCESS_STOP_CANDIDATES * 8)
    )
    candidates: list[tuple[int, int, float]] = []
    for stop_id, link_dataset_id, stop_lat, stop_lon in db.execute(query).all():
        metres = _distance_m(origin_lat, origin_lon, float(stop_lat), float(stop_lon))
        if metres > ADDRESS_ACCESS_RADIUS_M:
            continue
        candidates.append((int(stop_id), int(link_dataset_id), metres))
    return sorted(candidates, key=lambda entry: entry[2])
def _walk_only_journey_between_summaries(
    db: Session,
    *,
    from_location: StopSummary,
    to_location: StopSummary,
    departure_seconds: int,
    dataset_id: int,
    route_geometry: bool = True,
) -> dict | None:
    """Build a one-leg "Walk only" journey between two locations, if walkable.

    Returns ``None`` in any of these cases:
    - either location lacks coordinates;
    - the straight-line distance exceeds what can be covered in
      ``PUBLIC_TRANSPORT_WALK_OPTION_MAX_SECONDS`` at 1.35 m/s;
    - ``_walk_leg_between_summaries`` rejects the leg.
    """
    coordinates = (from_location.lon, from_location.lat, to_location.lon, to_location.lat)
    if any(value is None for value in coordinates):
        return None
    straight_line_m = _distance_m(
        float(from_location.lat),
        float(from_location.lon),
        float(to_location.lat),
        float(to_location.lon),
    )
    walkable_limit_m = PUBLIC_TRANSPORT_WALK_OPTION_MAX_SECONDS * 1.35
    if straight_line_m > walkable_limit_m:
        return None
    walk_leg = _walk_leg_between_summaries(
        db,
        from_stop=from_location,
        to_stop=to_location,
        departure_seconds=departure_seconds,
        dataset_id=dataset_id,
        max_duration_seconds=PUBLIC_TRANSPORT_WALK_OPTION_MAX_SECONDS,
        route_geometry=route_geometry,
    )
    if walk_leg is None:
        return None
    walk_leg["route_name"] = "Walk only"
    return _journey_payload([walk_leg])
def _walk_leg_between_summaries(
    db: Session,
    *,
    from_stop: StopSummary,
    to_stop: StopSummary,
    departure_seconds: int,
    dataset_id: int,
    max_duration_seconds: int,
    route_geometry: bool = True,
) -> dict | None:
    """Build a walk-leg payload between two stop summaries, bounded by duration.

    A straight-line estimate (distance / 1.35 m/s) may exceed
    *max_duration_seconds* by up to 50 % before the leg is rejected early.
    The leg payload built by ``_walk_leg_payload`` carries its own
    ``duration_seconds``, which is then checked against the hard limit.

    Returns ``None`` when a coordinate is missing or either check fails.
    """
    if any(value is None for value in (from_stop.lon, from_stop.lat, to_stop.lon, to_stop.lat)):
        return None
    straight_m = _distance_m(float(from_stop.lat), float(from_stop.lon), float(to_stop.lat), float(to_stop.lon))
    estimate_s = int(math.ceil(straight_m / 1.35))
    if estimate_s > max_duration_seconds * 1.5:
        return None
    # Synthetic start label: the leg is not part of a router search, so the
    # canonical stop id is a placeholder.
    start_label = _RouterLabel(canonical_stop_id=0, arrival_seconds=departure_seconds)
    backlink = _RouterWalkBacklink(
        previous_label=start_label,
        from_stop=from_stop,
        to_stop=to_stop,
        distance_m=straight_m,
        departure_seconds=departure_seconds,
        arrival_seconds=departure_seconds + estimate_s,
    )
    leg = _walk_leg_payload(db, backlink, dataset_id, route_geometry=route_geometry)
    return None if int(leg.get("duration_seconds") or 0) > max_duration_seconds else leg
def _compose_address_access_journey(
    journey: dict,
    *,
    access_leg: dict | None,
    egress_leg: dict | None,
) -> dict | None:
    """Wrap a stop-to-stop *journey* with an optional access leg before it and egress leg after it.

    The GeoJSON features of each part have their ``leg`` index shifted, so they
    line up with the combined ``legs`` list. Departure comes from the access leg
    when present (else from *journey*); arrival comes from the egress leg when
    present (else from *journey*). ``transfers`` counts non-walk legs minus one.

    Returns ``None`` when there are no legs at all, or when the departure or
    arrival time is unknown.
    """
    core_legs = journey.get("legs") or []
    public_legs: list[dict] = []
    features: list[dict] = []

    if access_leg is not None:
        public_legs.append(_leg_public_payload(access_leg))
        features += _offset_feature_legs(_feature_items(_journey_payload([access_leg])), 0)

    core_offset = len(public_legs)
    public_legs.extend(core_legs)
    features += _offset_feature_legs(_feature_items(journey), core_offset)

    if egress_leg is not None:
        egress_offset = len(public_legs)
        public_legs.append(_leg_public_payload(egress_leg))
        features += _offset_feature_legs(_feature_items(_journey_payload([egress_leg])), egress_offset)

    if not public_legs:
        return None
    start = journey.get("departure_seconds") if access_leg is None else access_leg["departure_seconds"]
    end = journey.get("arrival_seconds") if egress_leg is None else egress_leg["arrival_seconds"]
    if start is None or end is None:
        return None
    start, end = int(start), int(end)
    transit_count = sum(1 for leg in public_legs if leg.get("mode") != "walk")
    duration = max(0, end - start)
    return {
        "transfers": max(0, transit_count - 1),
        "departure_seconds": start,
        "arrival_seconds": end,
        "departure_time": format_gtfs_time(start),
        "arrival_time": format_gtfs_time(end),
        "departure_time_label": format_gtfs_time_label(start),
        "arrival_time_label": format_gtfs_time_label(end),
        "duration_seconds": duration,
        "duration_minutes": duration_minutes_ceil(duration),
        "duration_label": format_duration_label(duration),
        "legs": public_legs,
        "features": feature_collection(features),
    }
def _feature_items(payload: dict) -> list[dict]:
features = payload.get("features") if isinstance(payload, dict) else None
if isinstance(features, dict):
items = features.get("features")
else:
items = None
return list(items or []) if isinstance(items, list) else []
def _offset_feature_legs(features: list[dict], offset: int) -> list[dict]:
if offset <= 0:
return json.loads(json.dumps(features))
copied = json.loads(json.dumps(features))
for feature in copied:
props = feature.get("properties") if isinstance(feature, dict) else None
if isinstance(props, dict) and isinstance(props.get("leg"), int):
props["leg"] = int(props["leg"]) + offset
return copied
def _select_diverse_journeys(journeys, *, limit: int) -> list[dict]:
    """Pick up to *limit* journeys, best-ranked first, favouring variety.

    Journeys are ranked with ``_journey_sort_key`` (on shallow copies).
    Exact duplicates (identical leg signatures) are always dropped. Once
    ``max(3, limit // 2)`` journeys are chosen, further journeys sharing a
    ``_journey_diversity_key`` with an earlier pick are skipped. Finally
    ``_ensure_walk_only_option`` makes room for a walk-only journey if one exists.

    A non-positive *limit* returns ``[]``.
    """
    if limit <= 0:
        return []
    ranked = sorted((dict(journey) for journey in journeys), key=_journey_sort_key)
    selected: list[dict] = []
    seen_exact: set[str] = set()
    seen_diversity: set[tuple[object, ...]] = set()
    # Near-duplicates are only skipped after this many picks, so the top of the
    # list is always the plain best-ranked journeys. Because of this threshold
    # (>= 3), ending with fewer than min(limit, 3) picks can only mean the input
    # ran out of distinct journeys; no second "fill-up" pass is needed.
    diversity_threshold = max(3, limit // 2)
    for journey in ranked:
        exact = "||".join(_journey_leg_signature(leg) for leg in journey.get("legs") or [])
        if exact in seen_exact:
            continue
        seen_exact.add(exact)
        diversity_key = _journey_diversity_key(journey)
        if diversity_key in seen_diversity and len(selected) >= diversity_threshold:
            continue
        seen_diversity.add(diversity_key)
        selected.append(journey)
        if len(selected) >= limit:
            break
    return _ensure_walk_only_option(selected, ranked, limit=limit)
def _ensure_walk_only_option(selected: list[dict], ranked: list[dict], *, limit: int) -> list[dict]:
    """Guarantee a walk-only journey is offered when *ranked* contains one.

    The first walk-only journey in *ranked* is appended (as a new list) when
    there is room under *limit*. Otherwise it overwrites the last entry of
    *selected* in place. *selected* is returned as-is when it already has a
    walk-only journey or *ranked* has none.
    """
    if any(map(_journey_is_walk_only, selected)):
        return selected
    for candidate in ranked:
        if _journey_is_walk_only(candidate):
            break
    else:
        return selected
    if len(selected) < limit:
        return [*selected, candidate]
    if selected:
        selected[-1] = candidate
    return selected
def _journey_is_walk_only(journey: dict) -> bool:
legs = journey.get("legs") or []
return bool(legs) and all(leg.get("mode") == "walk" for leg in legs)
def _journey_diversity_key(journey: dict) -> tuple[object, ...]:
route_signature = tuple(
str(leg.get("route_ref") or leg.get("route_id") or leg.get("mode") or "")
for leg in journey.get("legs") or []
if leg.get("mode") != "walk"
)
departure = journey.get("departure_seconds")
time_band = None if departure is None else int(departure) // (30 * 60)
return (int(journey.get("transfers") or 0), route_signature, time_band)
def _find_journeys_via(
    db: Session,
    from_stop_id: int | str,
    via_stop_id: int | str,
    to_stop_id: int | str,
    departure: str,
    max_transfers: int,
    transfer_seconds: int,
    limit: int,
    source_ids: list[int] | None,
    service_date: str | date | None,
) -> dict:
    """Plan journeys that are forced to pass through *via_stop_id*.

    How the search works:
    - ``find_journeys`` runs from the origin to the via stop.
    - For every first-half journey with a known arrival, a second search runs
      from the via stop. It departs ``transfer_seconds`` (floored at 0) after
      that arrival.
    - Each search, and the final list, is capped at ``min(limit, 10)``
      journeys (at least one).
    - *max_transfers* is applied to each half separately.
    - Combined journeys are ranked with ``_journey_sort_key`` and
      de-duplicated by leg signature.

    Returns:
        A response dict with ``from``/``to``/``via``, source and dataset
        metadata, the search parameters and the selected ``journeys``.
    """
    cap = max(1, min(limit, 10))
    via_dwell = max(0, transfer_seconds)
    shared_kwargs = {
        "db": db,
        "max_transfers": max_transfers,
        "transfer_seconds": transfer_seconds,
        "limit": cap,
        "source_ids": source_ids,
        "via_stop_id": None,
        "service_date": service_date,
    }
    first_result = find_journeys(
        from_stop_id=from_stop_id,
        to_stop_id=via_stop_id,
        departure=departure,
        **shared_kwargs,
    )

    combined: list[dict] = []
    for first in first_result.get("journeys", [])[:cap]:
        via_arrival = first.get("arrival_seconds")
        if via_arrival is None:
            continue
        onward_departure = format_gtfs_time(int(via_arrival) + via_dwell)
        second_result = find_journeys(
            from_stop_id=via_stop_id,
            to_stop_id=to_stop_id,
            departure=onward_departure or departure,
            **shared_kwargs,
        )
        for second in second_result.get("journeys", [])[:cap]:
            combined.append(_combine_via_journey(first, second))

    # Best-ranked journey wins for each distinct leg signature.
    deduplicated: dict[tuple[str, ...], dict] = {}
    for journey in sorted(combined, key=_journey_sort_key):
        signature = tuple(_journey_leg_signature(leg) for leg in journey["legs"])
        deduplicated.setdefault(signature, journey)
    selected = list(deduplicated.values())[:cap]

    used_ids = sorted(
        {
            int(leg["dataset_id"])
            for journey in selected
            for leg in journey.get("legs", [])
            if leg.get("dataset_id") is not None
        }
    )
    searched_ids = sorted(set(first_result.get("dataset_ids") or []) | set(used_ids))
    reported_ids = used_ids or searched_ids
    return {
        "from": first_result.get("from"),
        "to": selected[0]["legs"][-1]["to"] if selected else None,
        "via": first_result.get("to"),
        "source": None,
        "sources": _source_payloads_for_dataset_ids(db, reported_ids),
        "dataset_id": used_ids[0] if len(used_ids) == 1 else None,
        "dataset_ids": reported_ids,
        "departure_time": first_result.get("departure_time"),
        "service_date": first_result.get("service_date"),
        "max_transfers": max(0, max_transfers),
        "via_transfer_seconds": via_dwell,
        "journeys": selected,
    }
def _combine_via_journey(first: dict, second: dict) -> dict:
    """Concatenate a journey to the via stop with a journey onward from it.

    Departure comes from *first* and arrival from *second*. GeoJSON features
    are merged with the second part's leg indices shifted past the first
    part's legs.

    ``transfers`` is the number of transit (non-walk) legs minus one, the same
    way ``_compose_address_access_journey`` counts it. Walking connections in
    either half are therefore not reported, or penalised by
    ``_journey_sort_key``, as transfers.
    """
    first_legs = list(first.get("legs") or [])
    second_legs = list(second.get("legs") or [])
    legs = [*first_legs, *second_legs]
    departure = first.get("departure_seconds")
    arrival = second.get("arrival_seconds")
    duration_seconds = None if departure is None or arrival is None else max(0, int(arrival) - int(departure))
    features = _combine_via_features(
        first.get("features") or {},
        second.get("features") or {},
        first_leg_count=len(first_legs),
    )
    transit_leg_count = sum(1 for leg in legs if leg.get("mode") != "walk")
    return {
        "transfers": max(0, transit_leg_count - 1),
        "departure_seconds": departure,
        "arrival_seconds": arrival,
        "departure_time": format_gtfs_time(departure),
        "arrival_time": format_gtfs_time(arrival),
        "departure_time_label": format_gtfs_time_label(departure),
        "arrival_time_label": format_gtfs_time_label(arrival),
        "duration_seconds": duration_seconds,
        "duration_minutes": duration_minutes_ceil(duration_seconds),
        "duration_label": format_duration_label(duration_seconds),
        "legs": legs,
        "features": feature_collection(features),
        "via_forced": True,
    }
def _combine_via_features(first_features: dict, second_features: dict, first_leg_count: int) -> list[dict]:
    """Concatenate the GeoJSON features of both halves of a via journey.

    Each feature is copied with ``_copy_via_feature``. Second-half features
    have their ``leg`` index shifted by *first_leg_count*. A collection that is
    not a dict, or has no ``features``, contributes nothing.
    """
    def members(collection) -> list:
        found = collection.get("features") if isinstance(collection, dict) else None
        return found or []

    head = [_copy_via_feature(feature, leg_offset=0, first_part=True) for feature in members(first_features)]
    tail = [
        _copy_via_feature(feature, leg_offset=first_leg_count, first_part=False)
        for feature in members(second_features)
    ]
    return head + tail
def _copy_via_feature(feature: dict, *, leg_offset: int, first_part: bool) -> dict:
copied = json.loads(json.dumps(feature))
props = copied.setdefault("properties", {})
if isinstance(props.get("leg"), int):
props["leg"] = int(props["leg"]) + leg_offset
if props.get("feature_type") == "journey_stop":
if first_part and props.get("role") == "end":
props["role"] = "transfer"
elif not first_part and props.get("role") == "start":
props["role"] = "transfer"
return copied
def _journey_dataset_pairs(from_selection: StopSelection, to_selection: StopSelection) -> list[tuple[int, int]]:
    """Enumerate the (origin dataset, destination dataset) pairs to search.

    Same-dataset pairs come first, then the rest ordered by dataset ids. At
    most ``MAX_JOURNEY_DATASET_PAIRS`` pairs are returned.
    """
    candidates = [
        (origin_dataset, target_dataset)
        for origin_dataset in from_selection.stop_ids_by_dataset
        for target_dataset in to_selection.stop_ids_by_dataset
    ]
    ranked = sorted(candidates, key=lambda pair: (pair[0] != pair[1], pair[0], pair[1]))
    return ranked[:MAX_JOURNEY_DATASET_PAIRS]
def _source_payloads_for_dataset_ids(db: Session, dataset_ids: list[int]) -> list[dict]:
if not dataset_ids:
return []
rows = db.execute(
select(Dataset.id, Source.id, Source.name)
.join(Source, Source.id == Dataset.source_id)
.where(Dataset.id.in_(dataset_ids))
.order_by(Source.name, Source.id)
).all()
payloads = []
seen = set()
for dataset_id, source_id, source_name in rows:
if source_id in seen:
continue
seen.add(source_id)
payloads.append({"id": source_id, "name": source_name, "dataset_id": dataset_id})
return payloads
def _journey_sort_key(journey: dict) -> tuple[float, float, float, int, int]:
arrival = journey.get("arrival_seconds")
departure = journey.get("departure_seconds")
transfers = int(journey.get("transfers") or 0)
walking_seconds = sum(
float(leg.get("distance_m") or 0) / 1.35
for leg in journey.get("legs") or []
if leg.get("mode") == "walk"
)
recommended_arrival = None if arrival is None else float(arrival) + transfers * 600 + walking_seconds
transit_legs = sum(1 for leg in journey.get("legs") or [] if leg.get("mode") != "walk")
return (
float("inf") if recommended_arrival is None else recommended_arrival,
float("inf") if arrival is None else float(arrival),
float("inf") if departure is None else -float(departure),
transfers,
1 if transit_legs == 0 else 0,
)
def _filter_reasonable_journeys(journeys: list[dict]) -> list[dict]:
    """Keep only journeys accepted by ``_journey_is_reasonable``, preserving order."""
    return list(filter(_journey_is_reasonable, journeys))
def _journey_is_reasonable(journey: dict) -> bool:
    """Reject journeys whose stop path loops back on itself.

    The canonical stop ids of all legs are concatenated. Consecutive repeats
    (e.g. a leg ending where the next one starts) count once. The journey is
    reasonable only if no canonical stop appears again after that.
    """
    path = [
        stop_id
        for leg in journey.get("legs") or []
        for stop_id in _leg_endpoint_canonical_ids(leg)
    ]
    visited: set[int] = set()
    previous = None
    for stop_id in path:
        if stop_id == previous:
            continue
        if stop_id in visited:
            return False
        visited.add(stop_id)
        previous = stop_id
    return True
def _leg_endpoint_canonical_ids(leg: dict) -> tuple[int, ...]:
ids: list[int] = []
stops = leg.get("stops") or []
for stop in stops:
canonical_id = (stop.get("canonical_stop") or {}).get("id") or stop.get("canonical_stop_id")
if canonical_id is None:
continue
try:
ids.append(int(canonical_id))
except (TypeError, ValueError):
continue
return tuple(ids)
def parse_service_date(value: str | date | None) -> date | None:
if value is None or value == "":
return None
if isinstance(value, date) and not isinstance(value, datetime):
return value
text = str(value).strip()
try:
return date.fromisoformat(text)
except ValueError as exc:
raise ValueError("service_date must be YYYY-MM-DD") from exc
def _service_ids_by_dataset(db: Session, dataset_ids: list[int], service_date: date | None) -> dict[int, set[str] | None]:
if service_date is None or not dataset_ids:
return {dataset_id: None for dataset_id in dataset_ids}
return {dataset_id: _active_service_ids(db, dataset_id, service_date) for dataset_id in dataset_ids}
def _active_service_ids(db: Session, dataset_id: int, service_date: date) -> set[str] | None:
    """Resolve the GTFS service ids running on *service_date* for one dataset.

    Resolution steps:
    1. Take ``calendar`` rows whose ``start_date``..``end_date`` (integer
       ``YYYYMMDD``) covers the day and whose weekday column is true.
    2. Apply ``calendar_dates`` exceptions for that day: type 1 adds the
       service id (as ``str``), type 2 removes it.

    Returns ``None`` when the dataset has neither calendar table populated,
    meaning callers should not filter by service.
    """
    def has_rows(model) -> bool:
        return bool(db.scalar(select(exists().where(model.dataset_id == dataset_id))))

    has_calendar = has_rows(GtfsCalendar)
    has_calendar_dates = has_rows(GtfsCalendarDate)
    if not (has_calendar or has_calendar_dates):
        return None

    day_key = int(service_date.strftime("%Y%m%d"))
    weekday_flags = (
        GtfsCalendar.monday,
        GtfsCalendar.tuesday,
        GtfsCalendar.wednesday,
        GtfsCalendar.thursday,
        GtfsCalendar.friday,
        GtfsCalendar.saturday,
        GtfsCalendar.sunday,
    )
    runs_today = weekday_flags[service_date.weekday()]
    regular_query = select(GtfsCalendar.service_id).where(
        GtfsCalendar.dataset_id == dataset_id,
        GtfsCalendar.start_date <= day_key,
        GtfsCalendar.end_date >= day_key,
        runs_today.is_(True),
    )
    active = set(db.scalars(regular_query).all())

    override_query = select(GtfsCalendarDate.service_id, GtfsCalendarDate.exception_type).where(
        GtfsCalendarDate.dataset_id == dataset_id,
        GtfsCalendarDate.date == day_key,
    )
    for service_id, exception_type in db.execute(override_query).all():
        kind = int(exception_type or 0)
        if kind == 1:
            active.add(str(service_id))
        elif kind == 2:
            active.discard(str(service_id))
    return active
def _where_trip_service_active(stmt, trip_model, service_ids: set[str] | None):
if service_ids is None:
return stmt
return stmt.where(trip_model.service_id.in_(service_ids))
def _sidecar_service_filter(service_ids: set[str] | None, alias: str = "trips") -> tuple[str, list[object]]:
if service_ids is None:
return "", []
if not service_ids:
return " AND 0", []
service_list = sorted(str(service_id) for service_id in service_ids)
placeholders = ", ".join(["?"] * len(service_list))
return f" AND {alias}.service_id IN ({placeholders})", list(service_list)
def _sidecar_stop_time_columns(alias: str, prefix: str) -> str:
    """Render ``<alias>.<col> AS <prefix>_<col>`` for every stop-times column, comma-separated."""
    aliased_columns = [f"{alias}.{column} AS {prefix}_{column}" for column in GTFS_STOP_TIME_COLUMNS]
    return ", ".join(aliased_columns)
def _sidecar_stop_time_from_row(dataset_id: int, row, prefix: str) -> GtfsStopTime:
    """Build a ``GtfsStopTime`` instance from the ``<prefix>_*`` columns of a sidecar row.

    The instance is only constructed here, not added to any session.
    ``trip_id``/``stop_id`` are coerced to ``str`` and ``stop_sequence`` to
    ``int``; the time fields are copied as-is.
    """
    def column(name: str):
        return row[f"{prefix}_{name}"]

    return GtfsStopTime(
        dataset_id=dataset_id,
        trip_id=str(column("trip_id")),
        stop_id=str(column("stop_id")),
        stop_sequence=int(column("stop_sequence")),
        arrival_time=column("arrival_time"),
        departure_time=column("departure_time"),
        arrival_seconds=column("arrival_seconds"),
        departure_seconds=column("departure_seconds"),
    )
def _trip_route_lookup(
db: Session,
dataset_id: int,
trip_ids: list[str],
service_ids: set[str] | None = None,
) -> dict[str, tuple[GtfsTrip, GtfsRoute]]:
if service_ids == set() or not trip_ids:
return {}
service_filter = None if service_ids is None else {str(service_id) for service_id in service_ids}
lookup: dict[str, tuple[GtfsTrip, GtfsRoute]] = {}
for chunk in _chunks(sorted(set(trip_ids)), SQLITE_IN_CHUNK_SIZE):
stmt = (
select(GtfsTrip, GtfsRoute)
.join(GtfsRoute, and_(GtfsRoute.dataset_id == GtfsTrip.dataset_id, GtfsRoute.route_id == GtfsTrip.route_id))
.where(GtfsTrip.dataset_id == dataset_id, GtfsTrip.trip_id.in_(chunk))
)
for trip, route in db.execute(stmt).all():
if service_filter is not None and str(trip.service_id) not in service_filter:
continue
lookup.setdefault(trip.trip_id, (trip, route))
return lookup
def _sidecar_direct_leg_rows(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    from_stop_ids: tuple[str, ...],
    to_stop_ids: tuple[str, ...],
    earliest_departure: int,
    limit: int,
) -> list[tuple[GtfsStopTime, GtfsStopTime, GtfsTrip, GtfsRoute]]:
    """Find single-trip rides from any of *from_stop_ids* to any of *to_stop_ids* in the sidecar store.

    Query behaviour:
    - Self-joins ``gtfs_stop_times`` on the same trip, with the destination call
      after the origin call (higher ``stop_sequence``).
    - Keeps origin calls departing at or after *earliest_departure*, or with
      no ``departure_seconds``.
    - Applies the optional service filter.
    - Returns at most *limit* rows, ordered by origin departure, then
      destination arrival, then trip id.

    Returns:
        ``(origin_call, dest_call, trip, route)`` tuples. Rows whose trip is not
        returned by ``_trip_route_lookup`` (including trips it filters out by
        service) are skipped, so fewer than *limit* tuples may come back.
    """
    service_sql, service_params = _sidecar_service_filter(service_ids)
    origin_columns = _sidecar_stop_time_columns("origin", "origin")
    dest_columns = _sidecar_stop_time_columns("dest", "dest")
    from_placeholders = ", ".join(["?"] * len(from_stop_ids))
    to_placeholders = ", ".join(["?"] * len(to_stop_ids))
    # Parameter order below must match the order of ``?`` markers in the SQL:
    # origin stops, destination stops, earliest departure, service ids, limit.
    rows = execute_sidecar_query(
        db,
        dataset_id,
        f"""
SELECT {origin_columns}, {dest_columns}, trips.trip_id AS lookup_trip_id
FROM gtfs_stop_times AS origin
JOIN gtfs_stop_times AS dest
ON dest.trip_id = origin.trip_id
AND dest.stop_sequence > origin.stop_sequence
JOIN gtfs_trips AS trips
ON trips.trip_id = origin.trip_id
WHERE origin.stop_id IN ({from_placeholders})
AND dest.stop_id IN ({to_placeholders})
AND (origin.departure_seconds IS NULL OR origin.departure_seconds >= ?)
{service_sql}
ORDER BY origin.departure_seconds, origin.departure_time, dest.arrival_seconds, dest.arrival_time, origin.trip_id
LIMIT ?
""",
        [*from_stop_ids, *to_stop_ids, earliest_departure, *service_params, limit],
    )
    # Trip/route ORM objects come from the main database, looked up in batch.
    trip_lookup = _trip_route_lookup(db, dataset_id, [str(row["lookup_trip_id"]) for row in rows], service_ids)
    results = []
    for row in rows:
        trip_route = trip_lookup.get(str(row["lookup_trip_id"]))
        if trip_route is None:
            continue
        trip, route = trip_route
        results.append(
            (
                _sidecar_stop_time_from_row(dataset_id, row, "origin"),
                _sidecar_stop_time_from_row(dataset_id, row, "dest"),
                trip,
                route,
            )
        )
    return results
def _sidecar_latest_direct_leg_rows(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    from_stop_ids: tuple[str, ...],
    to_stop_ids: tuple[str, ...],
    earliest_departure: int,
    latest_arrival: int,
    excluded_trip_id: str | None,
) -> list[tuple[GtfsStopTime, GtfsStopTime, GtfsTrip, GtfsRoute]]:
    """Find the latest single-trip rides between two stop sets that still arrive by *latest_arrival*.

    Same self-join as the direct-leg query, with these differences:
    - destination arrival must be ``<= latest_arrival`` (or unknown);
    - *excluded_trip_id*, when given, is skipped;
    - rows are ordered newest-departure first;
    - the result is capped at a hard-coded 120 rows.

    Returns:
        ``(origin_call, dest_call, trip, route)`` tuples. Rows without a
        ``_trip_route_lookup`` match are skipped.
    """
    service_sql, service_params = _sidecar_service_filter(service_ids)
    excluded_sql = " AND origin.trip_id != ?" if excluded_trip_id else ""
    origin_columns = _sidecar_stop_time_columns("origin", "origin")
    dest_columns = _sidecar_stop_time_columns("dest", "dest")
    from_placeholders = ", ".join(["?"] * len(from_stop_ids))
    to_placeholders = ", ".join(["?"] * len(to_stop_ids))
    # Parameter order must follow the ``?`` markers in the SQL: origin stops,
    # destination stops, earliest departure, latest arrival, service ids,
    # optional excluded trip, then the LIMIT.
    params: list[object] = [*from_stop_ids, *to_stop_ids, earliest_departure, latest_arrival, *service_params]
    if excluded_trip_id:
        params.append(excluded_trip_id)
    # NOTE(review): row cap is a magic number here (not one of the module's
    # MAX_* constants); confirm 120 is intended.
    params.append(120)
    rows = execute_sidecar_query(
        db,
        dataset_id,
        f"""
SELECT {origin_columns}, {dest_columns}, trips.trip_id AS lookup_trip_id
FROM gtfs_stop_times AS origin
JOIN gtfs_stop_times AS dest
ON dest.trip_id = origin.trip_id
AND dest.stop_sequence > origin.stop_sequence
JOIN gtfs_trips AS trips
ON trips.trip_id = origin.trip_id
WHERE origin.stop_id IN ({from_placeholders})
AND dest.stop_id IN ({to_placeholders})
AND (origin.departure_seconds IS NULL OR origin.departure_seconds >= ?)
AND (dest.arrival_seconds IS NULL OR dest.arrival_seconds <= ?)
{service_sql}
{excluded_sql}
ORDER BY origin.departure_seconds DESC, origin.departure_time DESC, dest.arrival_seconds DESC, dest.arrival_time DESC, origin.trip_id
LIMIT ?
""",
        params,
    )
    trip_lookup = _trip_route_lookup(db, dataset_id, [str(row["lookup_trip_id"]) for row in rows], service_ids)
    results = []
    for row in rows:
        trip_route = trip_lookup.get(str(row["lookup_trip_id"]))
        if trip_route is None:
            continue
        trip, route = trip_route
        results.append(
            (
                _sidecar_stop_time_from_row(dataset_id, row, "origin"),
                _sidecar_stop_time_from_row(dataset_id, row, "dest"),
                trip,
                route,
            )
        )
    return results
def _sidecar_destination_arrival_rows(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    stop_ids: tuple[str, ...],
    earliest_departure: int,
    latest_arrival: int | None,
) -> list[tuple[GtfsStopTime, GtfsTrip, GtfsRoute]]:
    """List trip calls arriving at any of *stop_ids* within a time window (sidecar store).

    Selection rules:
    - Arrival must be ``>= earliest_departure`` and, when given,
      ``<= latest_arrival``. Calls with unknown ``arrival_seconds`` pass both
      bounds.
    - The optional service filter is applied.
    - Rows are ordered by arrival and capped at
      ``MAX_TARGET_DESTINATION_ARRIVALS``.

    Returns:
        ``(call, trip, route)`` tuples. Rows without a ``_trip_route_lookup``
        match are skipped.
    """
    service_sql, service_params = _sidecar_service_filter(service_ids)
    latest_sql = " AND (call.arrival_seconds IS NULL OR call.arrival_seconds <= ?)" if latest_arrival is not None else ""
    call_columns = _sidecar_stop_time_columns("call", "call")
    stop_placeholders = ", ".join(["?"] * len(stop_ids))
    # Parameter order follows the SQL: stops, earliest bound, optional latest
    # bound, service ids, limit.
    params: list[object] = [*stop_ids, earliest_departure]
    if latest_arrival is not None:
        params.append(latest_arrival)
    params.extend(service_params)
    params.append(MAX_TARGET_DESTINATION_ARRIVALS)
    rows = execute_sidecar_query(
        db,
        dataset_id,
        f"""
SELECT {call_columns}, trips.trip_id AS lookup_trip_id
FROM gtfs_stop_times AS call
JOIN gtfs_trips AS trips
ON trips.trip_id = call.trip_id
WHERE call.stop_id IN ({stop_placeholders})
AND (call.arrival_seconds IS NULL OR call.arrival_seconds >= ?)
{latest_sql}
{service_sql}
ORDER BY call.arrival_seconds, call.arrival_time, call.trip_id
LIMIT ?
""",
        params,
    )
    trip_lookup = _trip_route_lookup(db, dataset_id, [str(row["lookup_trip_id"]) for row in rows], service_ids)
    results = []
    for row in rows:
        trip_route = trip_lookup.get(str(row["lookup_trip_id"]))
        if trip_route is None:
            continue
        trip, route = trip_route
        results.append((_sidecar_stop_time_from_row(dataset_id, row, "call"), trip, route))
    return results
def _sidecar_boarding_rows(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    stop_ids: tuple[str, ...],
    earliest_departure: int,
    limit: int,
    latest_departure: int | None = None,
) -> list[tuple[GtfsStopTime, GtfsTrip, GtfsRoute]]:
    """List trip calls departing from any of *stop_ids* from *earliest_departure* on (sidecar store).

    Selection rules:
    - Departure must be ``>= earliest_departure`` and, when given, strictly
      ``< latest_departure``. Calls with unknown ``departure_seconds`` pass both
      bounds.
    - The optional service filter is applied.
    - Rows are ordered by departure and capped at *limit*.

    Returns:
        ``(call, trip, route)`` tuples. Rows without a ``_trip_route_lookup``
        match are skipped.
    """
    service_sql, service_params = _sidecar_service_filter(service_ids)
    latest_sql = " AND (call.departure_seconds IS NULL OR call.departure_seconds < ?)" if latest_departure is not None else ""
    call_columns = _sidecar_stop_time_columns("call", "call")
    stop_placeholders = ", ".join(["?"] * len(stop_ids))
    # Parameter order follows the SQL: stops, earliest bound, optional latest
    # bound, service ids, limit.
    params: list[object] = [*stop_ids, earliest_departure]
    if latest_departure is not None:
        params.append(latest_departure)
    params.extend(service_params)
    params.append(limit)
    rows = execute_sidecar_query(
        db,
        dataset_id,
        f"""
SELECT {call_columns}, trips.trip_id AS lookup_trip_id
FROM gtfs_stop_times AS call
JOIN gtfs_trips AS trips
ON trips.trip_id = call.trip_id
WHERE call.stop_id IN ({stop_placeholders})
AND (call.departure_seconds IS NULL OR call.departure_seconds >= ?)
{latest_sql}
{service_sql}
ORDER BY call.departure_seconds, call.departure_time, call.trip_id
LIMIT ?
""",
        params,
    )
    trip_lookup = _trip_route_lookup(db, dataset_id, [str(row["lookup_trip_id"]) for row in rows], service_ids)
    results = []
    for row in rows:
        trip_route = trip_lookup.get(str(row["lookup_trip_id"]))
        if trip_route is None:
            continue
        trip, route = trip_route
        results.append((_sidecar_stop_time_from_row(dataset_id, row, "call"), trip, route))
    return results
def _chunks[T](items: list[T], size: int) -> Iterator[list[T]]:
for index in range(0, len(items), size):
yield items[index : index + size]
def parse_gtfs_time(value: str | None) -> int | None:
if not value:
return None
parts = value.strip().split(":")
if len(parts) == 2:
parts.append("0")
if len(parts) != 3:
return None
try:
hours, minutes, seconds = [int(part) for part in parts]
except ValueError:
return None
if hours < 0 or minutes < 0 or minutes > 59 or seconds < 0 or seconds > 59:
return None
return hours * 3600 + minutes * 60 + seconds
def format_gtfs_time(seconds: int | None) -> str | None:
if seconds is None:
return None
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def format_gtfs_time_label(seconds: int | None) -> str | None:
if seconds is None:
return None
service_day = seconds // 86_400
seconds_in_day = seconds % 86_400
hours = seconds_in_day // 3600
minutes = (seconds_in_day % 3600) // 60
secs = seconds_in_day % 60
clock = f"{hours:02d}:{minutes:02d}" if secs == 0 else f"{hours:02d}:{minutes:02d}:{secs:02d}"
return clock if service_day == 0 else f"+{service_day}d {clock}"
def duration_minutes_ceil(seconds: int | float | None) -> int | None:
if seconds is None:
return None
return max(0, int(math.ceil(float(seconds) / 60)))
def format_duration_label(seconds: int | float | None) -> str | None:
    """Render a duration, rounded up to whole minutes, as ``"N min"``, ``"H:MM"`` or ``"Dd HH:MM"``."""
    total_minutes = duration_minutes_ceil(seconds)
    if total_minutes is None:
        return None
    days, minutes_in_day = divmod(total_minutes, 24 * 60)
    hours, minutes = divmod(minutes_in_day, 60)
    if days:
        return f"{days}d {hours:02d}:{minutes:02d}"
    if hours:
        return f"{hours}:{minutes:02d}"
    return f"{minutes} min"
@dataclass
class _RouterLabel:
    """Best-known arrival at a canonical stop during a round-based router search.

    Labels form a chain back to the start through ``previous``. The search
    origin is created with ``previous=None``. ``_walk_leg_between_summaries``
    also builds a synthetic start label with ``canonical_stop_id=0``. Unlike
    the backlink classes, this dataclass is not frozen.
    """

    # Canonical stop this label refers to.
    canonical_stop_id: int
    # Arrival time at the stop in seconds, compared directly against GTFS
    # stop-time seconds by the router.
    arrival_seconds: int
    # How the stop was reached: a transit-leg backlink, a walk backlink, or
    # None for the starting label.
    previous: "_RouterLegBacklink | _RouterWalkBacklink | None" = None
@dataclass(frozen=True)
class _RouterLegBacklink:
    """Transit-leg step of a router label chain: ride *trip* from ``origin`` to ``dest``.

    NOTE(review): ``frozen=True`` only blocks attribute reassignment. Hashing an
    instance would raise, because ``_RouterLabel`` (eq=True, not frozen) is
    unhashable.
    """

    # Label at the boarding stop (where the rider was before this leg).
    previous_label: _RouterLabel
    route: GtfsRoute
    trip: GtfsTrip
    # Stop-time at which the trip is boarded.
    origin: GtfsStopTime
    # Stop-time at which the trip is left.
    dest: GtfsStopTime
@dataclass(frozen=True)
class _RouterWalkBacklink:
    """Walking step of a router label chain between two stop summaries."""

    # Label the walk starts from.
    previous_label: _RouterLabel
    from_stop: StopSummary
    to_stop: StopSummary
    # Distance in metres (as computed by the caller, e.g. via _distance_m or
    # the walking-transfer queries).
    distance_m: float
    # Time the walk starts; callers set it to the previous label's arrival.
    departure_seconds: int
    # Time the walk ends at ``to_stop``.
    arrival_seconds: int
@dataclass(frozen=True)
class _RouterBoarding:
    """A candidate boarding of *trip* at a canonical stop during a router round."""

    # Canonical stop where the trip is boarded.
    canonical_stop_id: int
    # The trip's stop-time at the boarding stop; later calls of the trip are
    # scanned from here.
    call: GtfsStopTime
    trip: GtfsTrip
    route: GtfsRoute
    # Earliest time the rider can board; arrivals earlier than this are
    # ignored by _find_round_journeys. Presumably the stop's board_ready value
    # — confirm in _router_boardings_for_marked_stops.
    ready_seconds: int
def _find_round_journeys(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    from_selection: StopSelection,
    to_selection: StopSelection,
    earliest_departure: int,
    max_transfers: int,
    transfer_seconds: int,
    latest_arrival: int | None,
    limit: int,
    stop_cache: dict[tuple[int, str], StopSummary],
    osm_stop_cache: dict[tuple[int, str], dict],
) -> list[dict]:
    """Round-based earliest-arrival search between two canonical stops of one dataset.

    Each round does the following:
    1. Relax walking transfers from the stops improved in the previous round.
    2. Board trips at those stops, after ``transfer_seconds``, except at the
       starting label.
    3. Scan the later calls of each boarded trip, improving ``best`` arrivals.

    Round ``k`` therefore yields journeys with at most ``k + 1`` transit legs.
    At most ``max_transfers + 1`` rounds run, capped by
    ``MAX_ROUTER_TRANSIT_LEGS`` and at least one. Arrivals at or after
    *latest_arrival* are ignored.

    Returns:
        Up to *limit* journey payloads sorted with ``_journey_sort_key``. Returns
        ``[]`` when either selection has no canonical stop.
    """
    if from_selection.canonical_stop_id is None or to_selection.canonical_stop_id is None:
        return []
    origin_id = from_selection.canonical_stop_id
    target_id = to_selection.canonical_stop_id
    best: dict[int, _RouterLabel] = {origin_id: _RouterLabel(origin_id, earliest_departure)}
    marked = {origin_id}
    solutions: list[_RouterLabel] = []
    max_legs = max(1, min(max_transfers + 1, MAX_ROUTER_TRANSIT_LEGS))
    for round_index in range(max_legs):
        if not marked:
            break
        # Labels this round may board from: the stops improved last round.
        boarding_labels = {
            stop_id: label
            for stop_id in marked
            if (label := best.get(stop_id)) is not None
        }
        # Footpaths from those stops. An improved walking arrival replaces the
        # stop's label; otherwise the stop's existing best label becomes
        # boardable this round.
        walking_labels = _walking_transfer_labels(
            db,
            dataset_id=dataset_id,
            source_labels=boarding_labels,
            latest_arrival=latest_arrival,
        )
        for stop_id, label in walking_labels.items():
            current = best.get(stop_id)
            accepted = current is None or label.arrival_seconds < current.arrival_seconds
            if accepted:
                best[stop_id] = label
                boarding_labels[stop_id] = label
                if stop_id == target_id:
                    solutions.append(label)
            elif stop_id not in boarding_labels:
                boarding_labels[stop_id] = current
        # Earliest boarding time per stop: arrival plus the transfer buffer,
        # except for the starting label, which has no previous leg.
        board_ready: dict[int, int] = {}
        for stop_id, label in boarding_labels.items():
            ready_seconds = label.arrival_seconds + (0 if label.previous is None else transfer_seconds)
            if latest_arrival is None or ready_seconds < latest_arrival:
                board_ready[stop_id] = ready_seconds
        if not board_ready:
            break
        boardings = _router_boardings_for_marked_stops(
            db=db,
            dataset_id=dataset_id,
            service_ids=service_ids,
            board_ready=board_ready,
            latest_arrival=latest_arrival,
        )
        if not boardings:
            break
        next_marked: set[int] = set()
        calls_by_trip = _stop_times_by_trip(db, dataset_id, sorted({boarding.trip.trip_id for boarding in boardings}))
        stop_to_canonical = _canonical_ids_for_trip_calls(db, dataset_id, calls_by_trip)
        for boarding in boardings:
            # Chain onto the label the boarding time was derived from, not the
            # live ``best`` entry. ``best`` may already have been improved in
            # this same round by another trip, and linking to that label would
            # add an extra transit leg beyond this round's budget (e.g. a
            # two-leg journey when max_transfers == 0).
            previous_label = boarding_labels.get(boarding.canonical_stop_id)
            if previous_label is None:
                previous_label = best.get(boarding.canonical_stop_id)
            if previous_label is None:
                continue
            for call in calls_by_trip.get(boarding.trip.trip_id, []):
                if call.stop_sequence <= boarding.call.stop_sequence:
                    continue
                canonical_stop_id = stop_to_canonical.get(call.stop_id)
                if canonical_stop_id is None:
                    continue
                arrival = _arrival_seconds(call)
                if arrival is None or arrival < boarding.ready_seconds:
                    continue
                if latest_arrival is not None and arrival >= latest_arrival:
                    continue
                current = best.get(canonical_stop_id)
                if current is not None and current.arrival_seconds <= arrival:
                    continue
                label = _RouterLabel(
                    canonical_stop_id=canonical_stop_id,
                    arrival_seconds=arrival,
                    previous=_RouterLegBacklink(
                        previous_label=previous_label,
                        route=boarding.route,
                        trip=boarding.trip,
                        origin=boarding.call,
                        dest=call,
                    ),
                )
                best[canonical_stop_id] = label
                next_marked.add(canonical_stop_id)
                if canonical_stop_id == target_id:
                    solutions.append(label)
        marked = next_marked
        if len(solutions) >= limit and round_index > 0:
            break
    journeys = []
    # Materialise a few more candidates than requested, since the final
    # ranking (transfers, walking) may differ from pure arrival order.
    for label in sorted(solutions, key=lambda item: item.arrival_seconds)[: max(limit * 2, limit)]:
        legs = _router_label_legs(db, dataset_id, label, stop_cache, osm_stop_cache)
        if legs:
            journeys.append(_journey_payload(legs))
    return sorted(journeys, key=_journey_sort_key)[:limit]
def _walking_transfer_labels(
    db: Session,
    dataset_id: int,
    source_labels: dict[int, _RouterLabel],
    latest_arrival: int | None,
) -> dict[int, _RouterLabel]:
    """Relax walking transfers from the given labels to nearby canonical stops.

    Behaviour:
    - At most ``MAX_WALKING_TRANSFER_SOURCE_STOPS`` sources are expanded,
      earliest arrival first, ties broken by stop id.
    - For every reachable target, only the earliest walking arrival is kept,
      as a label with a ``_RouterWalkBacklink``.
    - Arrivals at or after *latest_arrival* are dropped.
    - The result is not merged with any existing best labels; the caller
      does that.
    """
    if not source_labels:
        return {}
    earliest_first = sorted(source_labels.items(), key=lambda item: (item[1].arrival_seconds, item[0]))
    sources = dict(earliest_first[:MAX_WALKING_TRANSFER_SOURCE_STOPS])
    source_ids = tuple(sources)
    if settings.is_postgresql_database:
        nearby_rows = _walking_transfer_rows_postgres(db, dataset_id, source_ids)
    else:
        nearby_rows = _walking_transfer_rows_sqlite(db, dataset_id, source_ids)

    involved_ids: set[int] = set()
    for row in nearby_rows:
        involved_ids.add(int(row[0]))
        involved_ids.add(int(row[1]))
    summaries = _canonical_stop_summaries(db, dataset_id, involved_ids)

    reached: dict[int, _RouterLabel] = {}
    for source_id, target_id, distance_m in nearby_rows:
        origin_label = sources.get(source_id)
        if origin_label is None:
            continue
        from_stop = summaries.get(int(source_id))
        to_stop = summaries.get(int(target_id))
        if from_stop is None or to_stop is None:
            continue
        arrival = origin_label.arrival_seconds + _walking_transfer_seconds(distance_m)
        if latest_arrival is not None and arrival >= latest_arrival:
            continue
        incumbent = reached.get(target_id)
        if incumbent is not None and incumbent.arrival_seconds <= arrival:
            continue
        reached[target_id] = _RouterLabel(
            canonical_stop_id=target_id,
            arrival_seconds=arrival,
            previous=_RouterWalkBacklink(
                previous_label=origin_label,
                from_stop=from_stop,
                to_stop=to_stop,
                distance_m=float(distance_m or 0),
                departure_seconds=origin_label.arrival_seconds,
                arrival_seconds=arrival,
            ),
        )
    return reached
def _canonical_stop_summaries(db: Session, dataset_id: int, canonical_stop_ids: set[int]) -> dict[int, StopSummary]:
    """Load canonical stops by id and wrap each one as a ``StopSummary``.

    Each summary is tagged with ``dataset_id`` and a synthetic
    ``canonical:<id>`` stop id. Ids with no matching row are simply absent
    from the result. An empty id set returns ``{}`` without querying.
    """
    summaries: dict[int, StopSummary] = {}
    if not canonical_stop_ids:
        return summaries
    query = select(CanonicalStop).where(CanonicalStop.id.in_(canonical_stop_ids))
    for stop in db.scalars(query).all():
        summaries[stop.id] = StopSummary(
            id=stop.id,
            dataset_id=dataset_id,
            stop_id=f"canonical:{stop.id}",
            name=stop.name,
            lat=stop.lat,
            lon=stop.lon,
        )
    return summaries
def _walking_transfer_rows_postgres(
    db: Session,
    dataset_id: int,
    source_ids: tuple[int, ...],
) -> list[tuple[int, int, float]]:
    """Find nearby GTFS-linked canonical stops for each source stop using PostGIS.

    A destination qualifies when it is a different canonical stop, both stops
    have a geometry, it lies within WALKING_TRANSFER_RADIUS_DEG of the source
    (``ST_DWithin`` in the geometry's own units, presumably degrees for a
    lon/lat SRID; TODO confirm), and it has at least one ``gtfs_stop`` link in
    ``dataset_id``. Distances come from ``ST_DistanceSphere`` and are in metres.

    NOTE(review): the SQLite counterpart filters by WALKING_TRANSFER_RADIUS_M
    (metres) instead of a degree radius, so the two backends may not select
    exactly the same neighbours.

    Args:
        db: Active SQLAlchemy session on a PostgreSQL/PostGIS database.
        dataset_id: Dataset whose GTFS stop links qualify a destination.
        source_ids: Canonical stop ids to search around.

    Returns:
        ``(source_id, target_id, distance_m)`` tuples, at most
        MAX_WALKING_TRANSFER_NEIGHBORS_PER_STOP per source, ordered by source,
        distance, then target id. Empty when ``source_ids`` is empty.
    """
    if not source_ids:
        return []
    # ``dest.geom && ST_Expand(...)`` is a bounding-box prefilter ahead of the
    # exact ST_DWithin test (presumably so a spatial index can be used).
    # row_number() then keeps only the nearest neighbours per source. Source
    # stops themselves are not required to be linked to ``dataset_id``.
    stmt = text(
        """
        WITH nearby AS (
            SELECT
                src.id AS source_id,
                dest.id AS target_id,
                ST_DistanceSphere(src.geom, dest.geom) AS distance_m,
                row_number() OVER (
                    PARTITION BY src.id
                    ORDER BY ST_DistanceSphere(src.geom, dest.geom), dest.id
                ) AS rn
            FROM canonical_stops AS src
            JOIN canonical_stops AS dest
              ON dest.id != src.id
             AND src.geom IS NOT NULL
             AND dest.geom IS NOT NULL
             AND dest.geom && ST_Expand(src.geom, :radius_deg)
             AND ST_DWithin(src.geom, dest.geom, :radius_deg)
            WHERE src.id IN :source_ids
              AND EXISTS (
                  SELECT 1
                  FROM canonical_stop_links AS link
                  WHERE link.canonical_stop_id = dest.id
                    AND link.dataset_id = :dataset_id
                    AND link.object_type = 'gtfs_stop'
              )
        )
        SELECT source_id, target_id, distance_m
        FROM nearby
        WHERE rn <= :neighbor_limit
        ORDER BY source_id, distance_m, target_id
        """
    ).bindparams(bindparam("source_ids", expanding=True))
    # ``expanding=True`` lets ``IN :source_ids`` accept the whole tuple of ids.
    rows = db.execute(
        stmt,
        {
            "dataset_id": dataset_id,
            "source_ids": source_ids,
            "radius_deg": WALKING_TRANSFER_RADIUS_DEG,
            "neighbor_limit": MAX_WALKING_TRANSFER_NEIGHBORS_PER_STOP,
        },
    ).all()
    # NULL distances (if any) are coerced to 0.0 metres.
    return [(int(source_id), int(target_id), float(distance_m or 0)) for source_id, target_id, distance_m in rows]
def _walking_transfer_rows_sqlite(
    db: Session,
    dataset_id: int,
    source_ids: tuple[int, ...],
) -> list[tuple[int, int, float]]:
    """Find nearby GTFS-linked canonical stops for each source stop (non-PostGIS path).

    A lat/lon bounding box around all source stops pre-filters destinations
    linked to a ``gtfs_stop`` in ``dataset_id``. ``_distance_m`` then keeps
    only those within WALKING_TRANSFER_RADIUS_M metres.

    Args:
        db: Active SQLAlchemy session.
        dataset_id: Only destinations linked to a ``gtfs_stop`` in this dataset qualify.
        source_ids: Canonical stop ids to search around. Ids without
            coordinates are skipped.

    Returns:
        ``(source_id, target_id, distance_m)`` rows, grouped by source. Within
        each source they are ordered by distance then target id and capped at
        MAX_WALKING_TRANSFER_NEIGHBORS_PER_STOP. A source never matches itself.
    """
    if not source_ids:
        return []
    source_rows = db.execute(
        select(CanonicalStop.id, CanonicalStop.lat, CanonicalStop.lon).where(CanonicalStop.id.in_(source_ids))
    ).all()
    sources = {
        int(stop_id): (float(lat), float(lon))
        for stop_id, lat, lon in source_rows
        if lat is not None and lon is not None
    }
    if not sources:
        return []
    # Metres -> degrees of latitude (about 111 320 m per degree everywhere).
    lat_delta = WALKING_TRANSFER_RADIUS_M / 111_320
    min_lat = min(lat for lat, _ in sources.values()) - lat_delta
    max_lat = max(lat for lat, _ in sources.values()) + lat_delta
    # A degree of longitude spans only 111 320 * cos(latitude) metres, so the
    # longitude margin must be widened by 1 / cos(latitude). Reusing lat_delta
    # here would drop stops that lie within the radius east or west of a source
    # at non-equatorial latitudes. Use the most poleward latitude of the box
    # (clamped short of the pole) so the margin is never too small for the
    # cos(mean latitude) projection used by _distance_m.
    widest_abs_lat = min(89.0, max(abs(min_lat), abs(max_lat)))
    lon_delta = lat_delta / math.cos(math.radians(widest_abs_lat))
    min_lon = min(lon for _, lon in sources.values()) - lon_delta
    max_lon = max(lon for _, lon in sources.values()) + lon_delta
    dest_rows = db.execute(
        select(CanonicalStop.id, CanonicalStop.lat, CanonicalStop.lon)
        .join(CanonicalStopLink, CanonicalStopLink.canonical_stop_id == CanonicalStop.id)
        .where(
            CanonicalStopLink.dataset_id == dataset_id,
            CanonicalStopLink.object_type == "gtfs_stop",
            CanonicalStop.lat >= min_lat,
            CanonicalStop.lat <= max_lat,
            CanonicalStop.lon >= min_lon,
            CanonicalStop.lon <= max_lon,
        )
        .distinct()
    ).all()
    rows: list[tuple[int, int, float]] = []
    for source_id, (source_lat, source_lon) in sources.items():
        candidates = []
        for target_id, target_lat, target_lon in dest_rows:
            if int(target_id) == source_id or target_lat is None or target_lon is None:
                continue
            distance_m = _distance_m(source_lat, source_lon, float(target_lat), float(target_lon))
            if distance_m <= WALKING_TRANSFER_RADIUS_M:
                candidates.append((source_id, int(target_id), distance_m))
        rows.extend(
            sorted(candidates, key=lambda item: (item[2], item[1]))[:MAX_WALKING_TRANSFER_NEIGHBORS_PER_STOP]
        )
    return rows
def _walking_transfer_seconds(distance_m: float) -> int:
    """Return the walking time for ``distance_m`` metres in whole seconds, never below 30.

    A ``None`` or zero distance counts as 0 m. The time is rounded up at
    WALKING_TRANSFER_SPEED_MPS.
    """
    metres = float(distance_m or 0)
    seconds = int(math.ceil(metres / WALKING_TRANSFER_SPEED_MPS))
    return seconds if seconds > 30 else 30
def _distance_m(lat_a: float, lon_a: float, lat_b: float, lon_b: float) -> float:
    """Approximate the distance in metres between two lat/lon points.

    Uses an equirectangular projection around the mean latitude, at
    111 320 m per degree. This is adequate for the short walking hops it is
    used for here.
    """
    metres_per_degree = 111_320
    lon_scale = metres_per_degree * math.cos(math.radians((lat_a + lat_b) / 2))
    east_west = (lon_b - lon_a) * lon_scale
    north_south = (lat_b - lat_a) * metres_per_degree
    return math.hypot(east_west, north_south)
def _router_boardings_for_marked_stops(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    board_ready: dict[int, int],
    latest_arrival: int | None = None,
) -> list[_RouterBoarding]:
    """Collect one catchable boarding per trip from the marked canonical stops.

    Args:
        db: Active SQLAlchemy session.
        dataset_id: Dataset to search.
        service_ids: Active service ids. ``None`` means no service filter, and
            an empty set yields no rows (see ``_router_boarding_rows``).
        board_ready: Canonical stop id -> earliest second a rider can board there.
        latest_arrival: Optional cutoff, forwarded as the latest boarding departure.

    Returns:
        At most MAX_ROUTER_BOARDING_CANDIDATES boardings, one per trip_id (the
        first catchable call in the order rows are returned), sorted by
        departure seconds and then trip_id.
    """
    if not board_ready:
        return []
    stop_ids_by_canonical = _gtfs_stop_ids_for_canonical_ids(db, dataset_id, set(board_ready))
    stop_to_canonical = {
        stop_id: canonical_stop_id
        for canonical_stop_id, stop_ids in stop_ids_by_canonical.items()
        for stop_id in stop_ids
    }
    stop_ids = tuple(stop_to_canonical)
    if not stop_ids:
        return []
    # (departure, trip_id, boarding) triples. The departure is resolved and non-None here.
    ordered: list[tuple[int, str, _RouterBoarding]] = []
    seen: set[str] = set()
    earliest = min(board_ready.values())
    for call, trip, route in _router_boarding_rows(db, dataset_id, service_ids, stop_ids, earliest, latest_arrival):
        canonical_stop_id = stop_to_canonical.get(call.stop_id)
        if canonical_stop_id is None:
            continue
        ready = board_ready.get(canonical_stop_id)
        departure = _departure_seconds(call)
        # Skip calls that leave before a rider can be at this particular stop.
        if ready is None or departure is None or departure < ready:
            continue
        if trip.trip_id in seen:
            continue
        seen.add(trip.trip_id)
        ordered.append(
            (
                departure,
                trip.trip_id,
                _RouterBoarding(
                    canonical_stop_id=canonical_stop_id,
                    call=call,
                    trip=trip,
                    route=route,
                    ready_seconds=ready,
                ),
            )
        )
        if len(ordered) >= MAX_ROUTER_BOARDING_CANDIDATES:
            break
    # Sort on the resolved departure itself. A `departure or 10**9` fallback
    # would push trips departing at exactly 0 s (00:00:00) to the very end.
    ordered.sort(key=lambda entry: (entry[0], entry[1]))
    return [boarding for _, _, boarding in ordered]
def _router_boarding_rows(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    stop_ids: tuple[str, ...],
    earliest: int,
    latest_departure: int | None = None,
) -> list[tuple[GtfsStopTime, GtfsTrip, GtfsRoute]]:
    """Fetch candidate boarding stop-times (with their trip and route) at ``stop_ids``.

    Rows depart at or after ``earliest`` and, when ``latest_departure`` is
    given, strictly before it. Rows whose ``departure_seconds`` is NULL are
    kept for the caller to resolve. At most
    ``MAX_ROUTER_BOARDING_CANDIDATES * 2`` rows are returned, ordered by
    departure. An explicitly empty ``service_ids`` set returns no rows.
    Datasets that keep stop times in a sidecar store are delegated to
    ``_sidecar_boarding_rows``.
    """
    if service_ids == set():
        return []
    row_cap = MAX_ROUTER_BOARDING_CANDIDATES * 2
    if uses_sidecar_stop_times(db, dataset_id):
        return _sidecar_boarding_rows(
            db=db,
            dataset_id=dataset_id,
            service_ids=service_ids,
            stop_ids=stop_ids,
            earliest_departure=earliest,
            latest_departure=latest_departure,
            limit=row_cap,
        )
    trip_join = and_(GtfsTrip.dataset_id == GtfsStopTime.dataset_id, GtfsTrip.trip_id == GtfsStopTime.trip_id)
    route_join = and_(GtfsRoute.dataset_id == GtfsTrip.dataset_id, GtfsRoute.route_id == GtfsTrip.route_id)
    query = (
        select(GtfsStopTime, GtfsTrip, GtfsRoute)
        .join(GtfsTrip, trip_join)
        .join(GtfsRoute, route_join)
        .where(
            GtfsStopTime.dataset_id == dataset_id,
            GtfsStopTime.stop_id.in_(stop_ids),
            or_(GtfsStopTime.departure_seconds.is_(None), GtfsStopTime.departure_seconds >= earliest),
        )
        .order_by(GtfsStopTime.departure_seconds, GtfsStopTime.departure_time, GtfsStopTime.trip_id)
        .limit(row_cap)
    )
    query = _where_trip_service_active(query, GtfsTrip, service_ids)
    if latest_departure is not None:
        query = query.where(
            or_(GtfsStopTime.departure_seconds.is_(None), GtfsStopTime.departure_seconds < latest_departure)
        )
    return db.execute(query).all()
def _gtfs_stop_ids_for_canonical_ids(
    db: Session,
    dataset_id: int,
    canonical_stop_ids: set[int],
) -> dict[int, tuple[str, ...]]:
    """Map canonical stop ids to the GTFS stop ids linked to them in ``dataset_id``.

    Each tuple is ordered by external (GTFS) stop id. Canonical ids with no
    link are absent from the result. An empty input returns ``{}``.
    """
    if not canonical_stop_ids:
        return {}
    link_query = (
        select(CanonicalStopLink.canonical_stop_id, CanonicalStopLink.external_id)
        .where(
            CanonicalStopLink.object_type == "gtfs_stop",
            CanonicalStopLink.dataset_id == dataset_id,
            CanonicalStopLink.canonical_stop_id.in_(canonical_stop_ids),
        )
        .order_by(CanonicalStopLink.canonical_stop_id, CanonicalStopLink.external_id)
    )
    buckets: dict[int, list[str]] = {}
    for canonical_id, external_id in db.execute(link_query).all():
        key = int(canonical_id)
        if key not in buckets:
            buckets[key] = []
        buckets[key].append(str(external_id))
    return {key: tuple(values) for key, values in buckets.items()}
def _canonical_ids_for_trip_calls(
    db: Session,
    dataset_id: int,
    calls_by_trip: dict[str, list[GtfsStopTime]],
) -> dict[str, int]:
    """Map every GTFS stop id visited by the given trip calls to its canonical stop id.

    This delegates to ``_canonical_ids_for_stop_ids`` rather than repeating
    the same ``canonical_stop_links`` query, so both lookups stay consistent.

    Args:
        db: Active SQLAlchemy session.
        dataset_id: Dataset whose ``gtfs_stop`` links are consulted.
        calls_by_trip: Trip id -> stop-time calls. Only ``call.stop_id`` is read.

    Returns:
        GTFS stop id -> canonical stop id. Stops without a link are absent,
        and the result is empty when there are no calls.
    """
    stop_ids = {call.stop_id for calls in calls_by_trip.values() for call in calls}
    return _canonical_ids_for_stop_ids(db, dataset_id, stop_ids)
def _router_label_legs(
    db: Session,
    dataset_id: int,
    label: _RouterLabel,
    stop_cache: dict[tuple[int, str], StopSummary],
    osm_stop_cache: dict[tuple[int, str], dict],
) -> list[dict]:
    """Rebuild the leg payloads for ``label`` by following its backlink chain.

    The chain is walked from the final label back to the root, then emitted
    in travel order. Walk backlinks become walk legs. All other backlinks
    become transit legs via ``_leg_payload``.
    """
    chain: list[_RouterLegBacklink | _RouterWalkBacklink] = []
    node = label
    while node.previous is not None:
        chain.append(node.previous)
        node = node.previous.previous_label
    legs: list[dict] = []
    for backlink in reversed(chain):
        if isinstance(backlink, _RouterWalkBacklink):
            leg = _walk_leg_payload(db, backlink, dataset_id)
        else:
            leg = _leg_payload(
                db=db,
                dataset_id=dataset_id,
                route=backlink.route,
                trip=backlink.trip,
                origin=backlink.origin,
                dest=backlink.dest,
                stop_cache=stop_cache,
                osm_stop_cache=osm_stop_cache,
            )
        legs.append(leg)
    return legs
def _find_walk_only_journey(
    db: Session,
    *,
    from_selection: StopSelection,
    to_selection: StopSelection,
    departure_seconds: int,
) -> dict | None:
    """Build a single-leg "Walk only" journey between two canonical stops when it is short enough.

    Returns ``None`` in any of these cases:

    - either selection lacks a canonical stop or coordinates;
    - both selections are the same canonical stop;
    - the straight-line distance already exceeds
      ``PUBLIC_TRANSPORT_WALK_OPTION_MAX_SECONDS * 1.35`` (the 1.35 factor acts
      as an assumed walking speed in m/s);
    - the walking router raises;
    - the routed duration is non-positive or above the maximum.
    """
    start = from_selection.display
    end = to_selection.display
    start_id = from_selection.canonical_stop_id
    end_id = to_selection.canonical_stop_id
    if start_id is None or end_id is None or start_id == end_id:
        return None
    if any(value is None for value in (start.lon, start.lat, end.lon, end.lat)):
        return None
    start_lat, start_lon = float(start.lat), float(start.lon)
    end_lat, end_lon = float(end.lat), float(end.lon)
    # Cheap pre-screen before asking the router for a real walking path.
    if _distance_m(start_lat, start_lon, end_lat, end_lon) > PUBLIC_TRANSPORT_WALK_OPTION_MAX_SECONDS * 1.35:
        return None
    try:
        walk_route = route_between_points(
            db,
            from_lon=start_lon,
            from_lat=start_lat,
            to_lon=end_lon,
            to_lat=end_lat,
            mode="walk",
            max_visited=80_000,
        )
    except Exception:  # noqa: BLE001 - walking comparison is optional
        return None
    walk_seconds = float(walk_route.get("duration_seconds") or 0)
    if walk_seconds <= 0 or walk_seconds > PUBLIC_TRANSPORT_WALK_OPTION_MAX_SECONDS:
        return None
    arrival = departure_seconds + int(math.ceil(walk_seconds))
    backlink = _RouterWalkBacklink(
        previous_label=_RouterLabel(
            canonical_stop_id=start_id,
            arrival_seconds=departure_seconds,
        ),
        from_stop=start,
        to_stop=end,
        distance_m=float(walk_route.get("distance_m") or 0),
        departure_seconds=departure_seconds,
        arrival_seconds=arrival,
    )
    leg = _walk_leg_payload(db, backlink, start.dataset_id)
    leg["route_name"] = "Walk only"
    leg["duration_seconds"] = walk_seconds
    return _journey_payload([leg])
def _find_direct_journeys(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    from_stop_ids: tuple[str, ...],
    to_stop_ids: tuple[str, ...],
    earliest_departure: int,
    limit: int,
    stop_cache: dict[tuple[int, str], StopSummary],
    osm_stop_cache: dict[tuple[int, str], dict],
) -> list[dict]:
    """Return up to ``limit`` zero-transfer journeys, best first per ``_journey_sort_key``.

    Up to ``max(limit * 4, limit)`` direct legs are gathered so that sorting
    has some slack. Each leg is then wrapped as a one-leg journey.
    """
    direct_legs = _find_direct_legs(
        db,
        dataset_id,
        service_ids,
        from_stop_ids,
        to_stop_ids,
        earliest_departure,
        stop_cache,
        osm_stop_cache,
        max_legs=max(limit * 4, limit),
    )
    ranked = sorted((_journey_payload([leg]) for leg in direct_legs), key=_journey_sort_key)
    return ranked[:limit]
def _find_direct_legs(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    from_stop_ids: tuple[str, ...],
    to_stop_ids: tuple[str, ...],
    earliest_departure: int,
    stop_cache: dict[tuple[int, str], StopSummary],
    osm_stop_cache: dict[tuple[int, str], dict],
    max_legs: int = 20,
) -> list[dict]:
    """Collect single-vehicle legs from ``from_stop_ids`` to a later call at ``to_stop_ids``.

    Stop-time pairs come from the sidecar store when the dataset uses one, and
    otherwise from a self-join of ``GtfsStopTime`` on the same trip. Either way
    at most MAX_DIRECT_ROWS pairs are scanned. Pairs are skipped when a time
    cannot be resolved, the departure is before ``earliest_departure``, or the
    arrival precedes the departure. Duplicates (same route, stops and times)
    are dropped. Scanning stops once ``max(1, max_legs)`` legs are collected.

    Returns:
        Leg payloads sorted by arrival, then by latest departure. Empty when
        either stop list is empty or ``service_ids`` is an empty set.
    """
    if not from_stop_ids or not to_stop_ids or service_ids == set():
        return []
    if uses_sidecar_stop_times(db, dataset_id):
        pairs = _sidecar_direct_leg_rows(
            db=db,
            dataset_id=dataset_id,
            service_ids=service_ids,
            from_stop_ids=from_stop_ids,
            to_stop_ids=to_stop_ids,
            earliest_departure=earliest_departure,
            limit=MAX_DIRECT_ROWS,
        )
    else:
        board = aliased(GtfsStopTime)
        alight = aliased(GtfsStopTime)
        query = (
            select(board, alight, GtfsTrip, GtfsRoute)
            .join(
                alight,
                and_(
                    alight.dataset_id == board.dataset_id,
                    alight.trip_id == board.trip_id,
                    alight.stop_sequence > board.stop_sequence,
                ),
            )
            .join(GtfsTrip, and_(GtfsTrip.dataset_id == board.dataset_id, GtfsTrip.trip_id == board.trip_id))
            .join(GtfsRoute, and_(GtfsRoute.dataset_id == GtfsTrip.dataset_id, GtfsRoute.route_id == GtfsTrip.route_id))
            .where(board.dataset_id == dataset_id, board.stop_id.in_(from_stop_ids), alight.stop_id.in_(to_stop_ids))
            .where(or_(board.departure_seconds.is_(None), board.departure_seconds >= earliest_departure))
            .order_by(board.departure_seconds, board.departure_time, alight.arrival_seconds, alight.arrival_time, board.trip_id)
            .limit(MAX_DIRECT_ROWS)
        )
        query = _where_trip_service_active(query, GtfsTrip, service_ids)
        pairs = db.execute(query).all()
    leg_cap = max(1, max_legs)
    legs: list[dict] = []
    seen_keys: set[tuple[object, ...]] = set()
    for origin, dest, trip, route in pairs:
        departs = _departure_seconds(origin)
        arrives = _arrival_seconds(dest)
        if departs is None or arrives is None:
            continue
        if departs < earliest_departure or arrives < departs:
            continue
        dedupe_key = (route.route_id, route.short_name, origin.stop_id, dest.stop_id, departs, arrives)
        if dedupe_key in seen_keys:
            continue
        seen_keys.add(dedupe_key)
        legs.append(_leg_payload(db, dataset_id, route, trip, origin, dest, stop_cache, osm_stop_cache))
        if len(legs) >= leg_cap:
            break
    return sorted(legs, key=lambda leg: (leg["arrival_seconds"], -(leg["departure_seconds"] or -1)))
@dataclass(frozen=True)
class _FirstLegOption:
    """One ride on a single trip from a boarding call to a later alighting call.

    Used both for feeder (first) legs of one-transfer journeys and for access
    legs. Times are whatever ``_departure_seconds`` / ``_arrival_seconds``
    return (presumably seconds since the service-day start; TODO confirm).
    """

    # Resolved departure time at ``origin``.
    departure_seconds: int
    # Resolved arrival time at ``dest``.
    arrival_seconds: int
    # Boarding stop-time.
    origin: GtfsStopTime
    # Alighting stop-time, on the same trip with a higher stop_sequence.
    dest: GtfsStopTime
    trip: GtfsTrip
    route: GtfsRoute
@dataclass(frozen=True)
class _SecondLegOption:
    """Candidate onward (second) leg of a one-transfer journey.

    Instances are built by ``_targeted_second_leg_options`` (not visible here).
    """

    # Presumably the canonical transfer stop where this leg is boarded.
    # It is part of the dedupe key in _latest_feeder_by_onward_leg. TODO confirm.
    canonical_stop_id: int
    # Departure at ``origin`` and arrival at ``dest``, as resolved by the builder.
    departure_seconds: int
    arrival_seconds: int
    # Boarding and alighting stop-times on ``trip``.
    origin: GtfsStopTime
    dest: GtfsStopTime
    trip: GtfsTrip
    route: GtfsRoute
@dataclass(frozen=True)
class _OneTransferCandidate:
    """A feeder leg, an onward leg and an optional final walk forming one journey candidate."""

    # Arrival at the destination. This includes the final walk when present.
    arrival_seconds: int
    # Departure of the first (feeder) leg.
    departure_seconds: int
    # Feeder leg details.
    first_route: GtfsRoute
    first_trip: GtfsTrip
    first_origin: GtfsStopTime
    first_dest: GtfsStopTime
    # Onward leg, boarded after the transfer.
    second: _SecondLegOption
    # Optional walk from the second leg's alighting stop to the target stop.
    final_walk: _RouterWalkBacklink | None = None
@dataclass(frozen=True)
class _AccessTransferCandidate:
    """A short access ride from the origin to a canonical stop used as a search restart point."""

    # Canonical stop reached by the access ride.
    canonical_stop_id: int
    # The ride itself.
    option: _FirstLegOption
    # Result of _station_importance_rank. Only ranks <= 1 are kept, and lower sorts first.
    rank: int
def _find_one_transfer_journeys(
    db: Session,
    first_dataset_id: int,
    second_dataset_id: int,
    first_service_ids: set[str] | None,
    second_service_ids: set[str] | None,
    from_stop_ids: tuple[str, ...],
    to_stop_ids: tuple[str, ...],
    origin_canonical_stop_id: int | None,
    target_canonical_stop_id: int | None,
    earliest_departure: int,
    latest_arrival: int | None,
    transfer_seconds: int,
    limit: int,
    stop_cache: dict[tuple[int, str], StopSummary],
    osm_stop_cache: dict[tuple[int, str], dict],
) -> list[dict]:
    """Search journeys with exactly one vehicle-to-vehicle transfer, working back from the destination.

    The search runs in three steps:

    1. Collect candidate second legs that alight at ``to_stop_ids``. When
       ``target_canonical_stop_id`` is given, also collect legs alighting at
       nearby canonical stops from which a final walk reaches the target.
    2. For each second leg, in order of earliest arrival, pick the
       latest-departing first leg from ``from_stop_ids``. That leg must reach
       the second leg's transfer canonical stop at least ``transfer_seconds``
       before the second leg departs.
    3. Keep the latest feeder per onward leg, build leg payloads and sort.

    Args:
        db: Active SQLAlchemy session.
        first_dataset_id: Dataset of the first leg.
        second_dataset_id: Dataset of the second leg.
        first_service_ids: Active service ids for the first leg's dataset.
            An empty set yields no results.
        second_service_ids: Active service ids for the second leg's dataset.
            An empty set yields no results.
        from_stop_ids: Origin GTFS stop ids.
        to_stop_ids: Destination GTFS stop ids.
        origin_canonical_stop_id: Origin canonical stop. Transfers or final
            walks starting there are rejected.
        target_canonical_stop_id: Target canonical stop, used for final-walk groups.
        earliest_departure: No leg may depart before this time.
        latest_arrival: Optional exclusive bound for final-walk arrivals.
            Also forwarded to the second-leg search.
        transfer_seconds: Minimum change time between the two legs.
        limit: Maximum number of journeys returned.
        stop_cache: Caller-owned cache passed through to ``_leg_payload``.
        osm_stop_cache: Caller-owned cache passed through to ``_leg_payload``.

    Returns:
        Up to ``limit`` journey payloads sorted by ``_journey_sort_key``.
    """
    if first_service_ids == set() or second_service_ids == set():
        return []
    if latest_arrival is not None and latest_arrival <= earliest_departure:
        return []
    # Group 0 is the plain target stops. Extra groups are nearby stops, each
    # paired with a template walk to the target (times are filled in later).
    destination_groups = _destination_stop_groups_with_final_walks(
        db,
        dataset_id=second_dataset_id,
        to_stop_ids=to_stop_ids,
        target_canonical_stop_id=target_canonical_stop_id,
    )
    second_legs: dict[int, list[_SecondLegOption]] = {}
    final_walk_by_canonical: dict[int, _RouterWalkBacklink] = {}
    for destination_stop_ids, group_walks in destination_groups:
        group_second_legs = _targeted_second_leg_options(
            db,
            second_dataset_id,
            second_service_ids,
            destination_stop_ids,
            earliest_departure,
            latest_arrival,
        )
        for canonical_stop_id, options in group_second_legs.items():
            second_legs.setdefault(canonical_stop_id, []).extend(options)
        final_walk_by_canonical.update(group_walks)
    if not second_legs:
        return []
    # Alighting GTFS stop id -> canonical id. Used to find the final-walk template.
    second_dest_canonical = _canonical_ids_for_stop_ids(
        db,
        second_dataset_id,
        {option.dest.stop_id for options in second_legs.values() for option in options},
    )
    # Transfer canonical stops expressed as GTFS stop ids of the first leg's dataset.
    transfer_stop_ids_by_canonical = _gtfs_stop_ids_for_canonical_ids(db, first_dataset_id, set(second_legs))
    candidates: list[_OneTransferCandidate] = []
    seen: set[tuple[object, ...]] = set()
    # Earliest arrival first; for equal arrivals, the latest departure first.
    second_leg_options = sorted(
        [
            (canonical_stop_id, option)
            for canonical_stop_id, options in second_legs.items()
            for option in options
        ],
        key=lambda item: (item[1].arrival_seconds, -item[1].departure_seconds),
    )
    # Upper bound for any first-leg arrival: the latest second-leg departure
    # minus the transfer buffer, falling back to earliest_departure.
    latest_first_arrival_limit = max(
        (
            option.departure_seconds - transfer_seconds
            for _, option in second_leg_options
            if option.departure_seconds - transfer_seconds >= earliest_departure
        ),
        default=earliest_departure,
    )
    # Fetched once for all transfer stops. Each list is ordered latest departure first.
    first_options_by_canonical = _first_leg_options_to_transfer_stops(
        db=db,
        dataset_id=first_dataset_id,
        service_ids=first_service_ids,
        from_stop_ids=from_stop_ids,
        transfer_stop_ids_by_canonical=transfer_stop_ids_by_canonical,
        earliest_departure=earliest_departure,
        latest_arrival=latest_first_arrival_limit,
    )
    searched_second_legs = 0
    best_candidate_arrival: int | None = None
    for canonical_stop_id, second in second_leg_options:
        # Bound the search once at least one candidate exists.
        if searched_second_legs >= MAX_BACKWARD_SECOND_LEG_OPTIONS and candidates:
            break
        # Options are sorted by arrival and candidate arrivals are >= their second
        # leg's arrival, so nothing later than the best candidate can beat it.
        if best_candidate_arrival is not None and candidates and second.arrival_seconds > best_candidate_arrival:
            break
        searched_second_legs += 1
        transfer_stop_ids = transfer_stop_ids_by_canonical.get(canonical_stop_id)
        if not transfer_stop_ids:
            continue
        latest_first_arrival = second.departure_seconds - transfer_seconds
        if latest_first_arrival < earliest_departure:
            continue
        # Within a single dataset, never "transfer" onto the same trip.
        excluded_trip_id = second.trip.trip_id if first_dataset_id == second_dataset_id else None
        first = _best_first_leg_for_second(
            first_options_by_canonical.get(canonical_stop_id, []),
            latest_arrival=latest_first_arrival,
            excluded_trip_id=excluded_trip_id,
        )
        if first is None:
            continue
        # Transferring at the origin itself is not a real transfer journey.
        if origin_canonical_stop_id is not None and canonical_stop_id == origin_canonical_stop_id:
            continue
        final_walk_template = final_walk_by_canonical.get(second_dest_canonical.get(second.dest.stop_id))
        final_walk = None
        candidate_arrival = second.arrival_seconds
        if final_walk_template is not None:
            if origin_canonical_stop_id is not None and final_walk_template.from_stop.id == origin_canonical_stop_id:
                continue
            candidate_arrival = second.arrival_seconds + _walking_transfer_seconds(final_walk_template.distance_m)
            if latest_arrival is not None and candidate_arrival >= latest_arrival:
                continue
            # Re-time the template walk so it starts when the second leg arrives.
            final_walk = _RouterWalkBacklink(
                previous_label=final_walk_template.previous_label,
                from_stop=final_walk_template.from_stop,
                to_stop=final_walk_template.to_stop,
                distance_m=final_walk_template.distance_m,
                departure_seconds=second.arrival_seconds,
                arrival_seconds=candidate_arrival,
            )
        key = (
            first_dataset_id,
            first.trip.trip_id,
            first.origin.stop_sequence,
            first.dest.stop_id,
            second_dataset_id,
            second.trip.trip_id,
            second.origin.stop_sequence,
            second.dest.stop_sequence,
            None if final_walk is None else final_walk.to_stop.stop_id,
        )
        if key in seen:
            continue
        seen.add(key)
        best_candidate_arrival = candidate_arrival if best_candidate_arrival is None else min(best_candidate_arrival, candidate_arrival)
        candidates.append(
            _OneTransferCandidate(
                arrival_seconds=candidate_arrival,
                departure_seconds=first.departure_seconds,
                first_route=first.route,
                first_trip=first.trip,
                first_origin=first.origin,
                first_dest=first.dest,
                second=second,
                final_walk=final_walk,
            )
        )
        if len(candidates) >= MAX_TARGET_TRANSFER_CANDIDATES:
            break
    # Keep a single (latest) feeder per distinct onward leg.
    tightened_candidates = _latest_feeder_by_onward_leg(candidates)
    journeys: list[dict] = []
    # Build payloads for a few more than `limit` candidates (up to 4x limit); the final sort trims the list.
    for candidate in sorted(tightened_candidates, key=_one_transfer_candidate_sort_key)[
        : max(limit * 4, limit)
    ]:
        first_leg = _leg_payload(
            db,
            first_dataset_id,
            candidate.first_route,
            candidate.first_trip,
            candidate.first_origin,
            candidate.first_dest,
            stop_cache,
            osm_stop_cache,
        )
        second_leg = _leg_payload(
            db,
            second_dataset_id,
            candidate.second.route,
            candidate.second.trip,
            candidate.second.origin,
            candidate.second.dest,
            stop_cache,
            osm_stop_cache,
        )
        legs = [first_leg, second_leg]
        if candidate.final_walk is not None:
            legs.append(_walk_leg_payload(db, candidate.final_walk, second_dataset_id))
        journeys.append(_journey_payload(legs))
    return sorted(journeys, key=_journey_sort_key)[:limit]
def _destination_stop_groups_with_final_walks(
    db: Session,
    dataset_id: int,
    to_stop_ids: tuple[str, ...],
    target_canonical_stop_id: int | None,
) -> list[tuple[tuple[str, ...], dict[int, _RouterWalkBacklink]]]:
    """Split destination search into the target stops plus walkable nearby stops.

    The first group is always ``(to_stop_ids, {})``. For every nearby
    canonical stop that has GTFS stop ids in ``dataset_id``, one extra group is
    added. Each extra group holds up to MAX_GROUP_STOP_IDS of those ids and a
    template walk backlink from that stop to the target. The template's times
    are zero and are meant to be re-timed by the caller. Without a target
    canonical stop, a target summary, or any neighbours, only the first group
    is returned.
    """
    if target_canonical_stop_id is None:
        return [(to_stop_ids, {})]
    target = _canonical_stop_summaries(db, dataset_id, {target_canonical_stop_id}).get(target_canonical_stop_id)
    if target is None:
        return [(to_stop_ids, {})]
    seed = (target_canonical_stop_id,)
    if settings.is_postgresql_database:
        neighbours = _walking_transfer_rows_postgres(db, dataset_id, seed)
    else:
        neighbours = _walking_transfer_rows_sqlite(db, dataset_id, seed)
    if not neighbours:
        return [(to_stop_ids, {})]
    summaries = _canonical_stop_summaries(db, dataset_id, {int(row[1]) for row in neighbours})
    walks: dict[int, _RouterWalkBacklink] = {}
    for _, raw_id, metres in neighbours:
        neighbour_id = int(raw_id)
        origin_summary = summaries.get(neighbour_id)
        if origin_summary is None:
            continue
        walks[neighbour_id] = _RouterWalkBacklink(
            previous_label=_RouterLabel(neighbour_id, 0),
            from_stop=origin_summary,
            to_stop=target,
            distance_m=float(metres or 0),
            departure_seconds=0,
            arrival_seconds=0,
        )
    groups: list[tuple[tuple[str, ...], dict[int, _RouterWalkBacklink]]] = [(to_stop_ids, {})]
    for canonical_id, gtfs_ids in _gtfs_stop_ids_for_canonical_ids(db, dataset_id, set(walks)).items():
        walk = walks.get(canonical_id)
        if gtfs_ids and walk is not None:
            groups.append((gtfs_ids[:MAX_GROUP_STOP_IDS], {canonical_id: walk}))
    return groups
def _canonical_ids_for_stop_ids(db: Session, dataset_id: int, stop_ids: set[str]) -> dict[str, int]:
    """Map GTFS stop ids of ``dataset_id`` to their linked canonical stop ids.

    Unlinked stop ids are absent from the result. An empty input returns
    ``{}`` without querying.
    """
    if not stop_ids:
        return {}
    link_query = select(CanonicalStopLink.external_id, CanonicalStopLink.canonical_stop_id).where(
        CanonicalStopLink.object_type == "gtfs_stop",
        CanonicalStopLink.dataset_id == dataset_id,
        CanonicalStopLink.external_id.in_(stop_ids),
    )
    lookup: dict[str, int] = {}
    for external_id, canonical_id in db.execute(link_query).all():
        lookup[str(external_id)] = int(canonical_id)
    return lookup
def _find_access_transfer_journeys(
    db: Session,
    from_selection: StopSelection,
    to_stop_id: int | str,
    earliest_departure: int,
    max_transfers: int,
    transfer_seconds: int,
    limit: int,
    source_ids: list[int] | None,
    service_date: date | None,
    stop_cache: dict[tuple[int, str], StopSummary],
    osm_stop_cache: dict[tuple[int, str], dict],
) -> list[dict]:
    """Compose journeys from a short access ride followed by a fresh search from where it ends.

    For each dataset of the origin selection, ``_access_transfer_candidates``
    proposes rides to nearby stops with an importance rank <= 1.
    ``find_journeys`` is then re-run from each reached stop, departing after
    ``transfer_seconds`` and with one fewer allowed transfer. Each onward
    journey gets the access leg prepended.

    Returns:
        Up to ``limit`` composed journeys sorted by ``_journey_sort_key``.
    """
    journeys: list[dict] = []
    for dataset_id, from_stop_ids in from_selection.stop_ids_by_dataset.items():
        service_ids = _service_ids_by_dataset(db, [dataset_id], service_date).get(dataset_id)
        # An empty set means no service runs in this dataset on that date.
        if service_ids == set():
            continue
        candidates = _access_transfer_candidates(
            db=db,
            dataset_id=dataset_id,
            service_ids=service_ids,
            from_selection=from_selection,
            from_stop_ids=from_stop_ids,
            earliest_departure=earliest_departure,
        )
        for candidate in candidates:
            access_leg = _leg_payload(
                db=db,
                dataset_id=dataset_id,
                route=candidate.option.route,
                trip=candidate.option.trip,
                origin=candidate.option.origin,
                dest=candidate.option.dest,
                stop_cache=stop_cache,
                osm_stop_cache=osm_stop_cache,
            )
            onward_departure = format_gtfs_time(candidate.option.arrival_seconds + transfer_seconds)
            if onward_departure is None:
                continue
            try:
                # The access ride consumes one transfer. _allow_access_transfer=False
                # presumably stops this composition from recursing again.
                onward = find_journeys(
                    db=db,
                    from_stop_id=_stop_place_token(candidate.canonical_stop_id, dataset_id),
                    to_stop_id=to_stop_id,
                    departure=onward_departure,
                    max_transfers=max(0, max_transfers - 1),
                    limit=limit,
                    transfer_seconds=transfer_seconds,
                    source_ids=source_ids,
                    service_date=service_date,
                    _allow_access_transfer=False,
                )
            except ValueError:
                # An onward search that rejects its inputs just drops this candidate.
                continue
            for onward_journey in onward.get("journeys", [])[:limit]:
                journeys.append(_prepend_access_leg_to_journey(access_leg, onward_journey))
                if len(journeys) >= limit * 3:
                    break
            # Stop expanding candidates once 3x limit journeys are collected.
            # NOTE(review): the dataset loop does not re-check this cap, so
            # later datasets still run their onward searches.
            if len(journeys) >= limit * 3:
                break
    return sorted(journeys, key=_journey_sort_key)[:limit]
def _access_transfer_candidates(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    from_selection: StopSelection,
    from_stop_ids: tuple[str, ...],
    earliest_departure: int,
) -> list[_AccessTransferCandidate]:
    """Propose short rides from the origin to important stations for access-transfer composition.

    Boardings at ``from_stop_ids`` within ACCESS_TRANSFER_MAX_SECONDS of
    ``earliest_departure`` are expanded along their trips. Every later call
    that arrives within the same window, maps to a canonical stop other than
    the origin, and has an importance rank <= 1 becomes a candidate. Per
    canonical stop, the best candidate by ``_access_transfer_sort_key`` is kept.

    Stop names are memoized per GTFS stop id. The original per-call lookup ran
    one query for every (boarding, downstream call) pair, even when many trips
    pass the same stops.

    Returns:
        Up to MAX_ACCESS_TRANSFER_CANDIDATES candidates, best first.
    """
    boardings = _origin_boardings(
        db=db,
        dataset_id=dataset_id,
        service_ids=service_ids,
        stop_ids=from_stop_ids,
        earliest_departure=earliest_departure,
        latest_departure=earliest_departure + ACCESS_TRANSFER_MAX_SECONDS,
    )
    if not boardings:
        return []
    calls_by_trip = _stop_times_by_trip(db, dataset_id, sorted({boarding.trip.trip_id for boarding in boardings}))
    stop_to_canonical = _canonical_ids_for_trip_calls(db, dataset_id, calls_by_trip)
    canonical_ids = sorted(set(stop_to_canonical.values()))
    canonical_names = {
        int(canonical.id): canonical.name
        for canonical in db.scalars(select(CanonicalStop).where(CanonicalStop.id.in_(canonical_ids))).all()
    }
    # GTFS stop id -> stop name, filled lazily so each stop is queried at most once.
    stop_names: dict[str, str | None] = {}
    candidates: dict[int, _AccessTransferCandidate] = {}
    for boarding in boardings:
        departure = _departure_seconds(boarding.call)
        if departure is None or departure < earliest_departure:
            continue
        for call in calls_by_trip.get(boarding.trip.trip_id, []):
            if call.stop_sequence <= boarding.call.stop_sequence:
                continue
            arrival = _arrival_seconds(call)
            if arrival is None or arrival < departure:
                continue
            # Calls are visited in stop order, so once one is too late the rest are too.
            if arrival - earliest_departure > ACCESS_TRANSFER_MAX_SECONDS:
                break
            canonical_stop_id = stop_to_canonical.get(call.stop_id)
            if canonical_stop_id is None or canonical_stop_id == from_selection.canonical_stop_id:
                continue
            if call.stop_id not in stop_names:
                stop_names[call.stop_id] = _stop_name_for_stop_id(db, dataset_id, call.stop_id)
            rank = _station_importance_rank(canonical_names.get(canonical_stop_id), stop_names[call.stop_id])
            if rank > 1:
                continue
            option = _FirstLegOption(
                departure_seconds=departure,
                arrival_seconds=arrival,
                origin=boarding.call,
                dest=call,
                trip=boarding.trip,
                route=boarding.route,
            )
            current = candidates.get(canonical_stop_id)
            candidate = _AccessTransferCandidate(canonical_stop_id=canonical_stop_id, option=option, rank=rank)
            if current is None or _access_transfer_sort_key(candidate) < _access_transfer_sort_key(current):
                candidates[canonical_stop_id] = candidate
    return sorted(candidates.values(), key=_access_transfer_sort_key)[:MAX_ACCESS_TRANSFER_CANDIDATES]
def _access_transfer_sort_key(candidate: _AccessTransferCandidate) -> tuple[int, int, int, str]:
    """Order access candidates by rank, then earliest arrival, then shortest ride, then stop id."""
    ride = candidate.option
    ride_seconds = ride.arrival_seconds - ride.departure_seconds
    return candidate.rank, ride.arrival_seconds, ride_seconds, ride.dest.stop_id
def _stop_name_for_stop_id(db: Session, dataset_id: int, stop_id: str) -> str | None:
    """Return the ``name`` of GTFS stop ``stop_id`` in ``dataset_id``, or ``None`` if it does not exist."""
    query = select(GtfsStop).where(GtfsStop.dataset_id == dataset_id, GtfsStop.stop_id == stop_id)
    stop = db.scalar(query)
    if stop is None:
        return None
    return stop.name
def _prepend_access_leg_to_journey(access_leg: dict, onward_journey: dict) -> dict:
    """Return a new journey payload made of ``access_leg`` followed by ``onward_journey``'s legs.

    Features are merged via ``_combine_via_features``. Departure is taken from
    the access leg and arrival from the onward journey. The transfer count is
    the number of non-walk legs minus one, floored at 0. The payload is marked
    with ``access_transfer_composed``.
    """
    access_journey = _journey_payload([access_leg])
    merged_features = _combine_via_features(
        access_journey.get("features") or {},
        onward_journey.get("features") or {},
        first_leg_count=1,
    )
    all_legs = [access_journey["legs"][0]]
    all_legs.extend(onward_journey.get("legs") or [])
    start = access_leg.get("departure_seconds")
    end = onward_journey.get("arrival_seconds")
    transit_count = sum(1 for leg in all_legs if leg.get("mode") != "walk")
    if start is None or end is None:
        elapsed = None
    else:
        elapsed = max(0, int(end) - int(start))
    return {
        "transfers": max(0, transit_count - 1),
        "departure_seconds": start,
        "arrival_seconds": end,
        "departure_time": format_gtfs_time(start),
        "arrival_time": format_gtfs_time(end),
        "departure_time_label": format_gtfs_time_label(start),
        "arrival_time_label": format_gtfs_time_label(end),
        "duration_seconds": elapsed,
        "duration_minutes": duration_minutes_ceil(elapsed),
        "duration_label": format_duration_label(elapsed),
        "legs": all_legs,
        "features": feature_collection(merged_features),
        "access_transfer_composed": True,
    }
def _first_leg_options_to_transfer_stops(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    from_stop_ids: tuple[str, ...],
    transfer_stop_ids_by_canonical: dict[int, tuple[str, ...]],
    earliest_departure: int,
    latest_arrival: int,
) -> dict[int, list[_FirstLegOption]]:
    """Enumerate first-leg rides from ``from_stop_ids`` to each candidate transfer stop.

    Boardings departing in ``[earliest_departure, latest_arrival]`` are
    followed along their trips. Every later call at a transfer stop that
    arrives no earlier than the departure and no later than ``latest_arrival``
    becomes an option. Options are deduplicated per (canonical stop, trip,
    board sequence, alight sequence).

    Returns:
        Canonical stop id -> options sorted by (departure, arrival) descending
        (latest departure first), each list capped at MAX_TRANSFER_BOARDINGS.
    """
    if not transfer_stop_ids_by_canonical:
        return {}
    canonical_by_stop: dict[str, int] = {}
    for canonical_id, gtfs_ids in transfer_stop_ids_by_canonical.items():
        for gtfs_id in gtfs_ids:
            canonical_by_stop[gtfs_id] = canonical_id
    if not canonical_by_stop:
        return {}
    boardings = _origin_boardings(
        db=db,
        dataset_id=dataset_id,
        service_ids=service_ids,
        stop_ids=from_stop_ids,
        earliest_departure=earliest_departure,
        latest_departure=latest_arrival,
    )
    if not boardings:
        return {}
    trip_ids = sorted({boarding.trip.trip_id for boarding in boardings})
    calls_by_trip = _stop_times_by_trip(db, dataset_id, trip_ids)
    by_canonical: dict[int, list[_FirstLegOption]] = {}
    emitted: set[tuple[object, ...]] = set()
    for boarding in boardings:
        board_time = _departure_seconds(boarding.call)
        if board_time is None or board_time < earliest_departure:
            continue
        board_seq = boarding.call.stop_sequence
        for call in calls_by_trip.get(boarding.trip.trip_id, []):
            if call.stop_sequence <= board_seq:
                continue
            canonical_id = canonical_by_stop.get(call.stop_id)
            if canonical_id is None:
                continue
            alight_time = _arrival_seconds(call)
            if alight_time is None or alight_time < board_time or alight_time > latest_arrival:
                continue
            dedupe_key = (canonical_id, boarding.trip.trip_id, board_seq, call.stop_sequence)
            if dedupe_key in emitted:
                continue
            emitted.add(dedupe_key)
            by_canonical.setdefault(canonical_id, []).append(
                _FirstLegOption(
                    departure_seconds=board_time,
                    arrival_seconds=alight_time,
                    origin=boarding.call,
                    dest=call,
                    trip=boarding.trip,
                    route=boarding.route,
                )
            )
    return {
        canonical_id: sorted(
            options,
            key=lambda option: (option.departure_seconds, option.arrival_seconds),
            reverse=True,
        )[:MAX_TRANSFER_BOARDINGS]
        for canonical_id, options in by_canonical.items()
    }
def _best_first_leg_for_second(
    options: list[_FirstLegOption],
    latest_arrival: int,
    excluded_trip_id: str | None,
) -> _FirstLegOption | None:
    """Return the first option, in list order, that arrives by ``latest_arrival`` and is not on the excluded trip.

    Callers pass options already ranked by preference. The visible caller uses
    lists sorted latest-departure-first. A falsy ``excluded_trip_id`` disables
    the exclusion. Returns ``None`` when no option qualifies.
    """
    return next(
        (
            option
            for option in options
            if not (excluded_trip_id and option.trip.trip_id == excluded_trip_id)
            and option.arrival_seconds <= latest_arrival
        ),
        None,
    )
def _latest_feeder_by_onward_leg(candidates: list[_OneTransferCandidate]) -> list[_OneTransferCandidate]:
    """Keep one candidate per distinct onward (second) leg: the one whose feeder ranks highest.

    The feeder rank comes from ``_one_transfer_feeder_rank``. On ties the
    earlier candidate is kept. Results follow the order in which each onward
    leg first appeared.
    """
    chosen: dict[tuple[object, ...], _OneTransferCandidate] = {}
    for candidate in candidates:
        onward = candidate.second
        onward_key = (
            onward.canonical_stop_id,
            onward.trip.dataset_id,
            onward.trip.trip_id,
            onward.origin.stop_sequence,
            onward.dest.stop_sequence,
            onward.departure_seconds,
            onward.arrival_seconds,
        )
        incumbent = chosen.get(onward_key)
        if incumbent is not None and _one_transfer_feeder_rank(candidate) <= _one_transfer_feeder_rank(incumbent):
            continue
        chosen[onward_key] = candidate
    return list(chosen.values())
def _one_transfer_feeder_rank(candidate: _OneTransferCandidate) -> tuple[int, int]:
    """Rank a feeder leg; higher is preferred (later departure, then later first-leg arrival).

    A missing first-leg arrival ranks as -1. This uses an explicit ``is None``
    check because ``value or -1`` would also turn a real 0 s (00:00:00)
    arrival into -1.
    """
    first_arrival = _arrival_seconds(candidate.first_dest)
    return (candidate.departure_seconds, -1 if first_arrival is None else first_arrival)
def _one_transfer_candidate_sort_key(candidate: _OneTransferCandidate) -> tuple[float, float, int]:
    """Order one-transfer candidates by earliest arrival, then latest departure.

    The trailing constant ``1`` is kept for the tuple's shape. Its purpose is
    not visible here (presumably parity with another sort key).
    """
    arrival = float(candidate.arrival_seconds)
    departure = float(candidate.departure_seconds)
    return arrival, -departure, 1
def _latest_direct_leg_to_stops(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    from_stop_ids: tuple[str, ...],
    to_stop_ids: tuple[str, ...],
    earliest_departure: int,
    latest_arrival: int,
    excluded_trip_id: str | None = None,
) -> _FirstLegOption | None:
    """Find the latest-departing single-trip ride from any origin stop to any target stop.

    A ride qualifies when it departs one of ``from_stop_ids`` at or after
    ``earliest_departure`` and reaches one of ``to_stop_ids`` later on the same
    trip, no later than ``latest_arrival`` (inclusive). When
    ``excluded_trip_id`` is given, that trip is skipped.

    The SQL path orders candidates by departure descending and inspects at most
    120 of them. The sidecar helper is presumably ordered the same way; confirm
    in ``_sidecar_latest_direct_leg_rows``. Returns ``None`` when no candidate
    fits, when either stop tuple is empty, or when ``service_ids`` is an empty
    set.
    """
    if not from_stop_ids or not to_stop_ids:
        return None
    if service_ids == set():
        # An empty service set short-circuits to "no result" without querying.
        return None
    if uses_sidecar_stop_times(db, dataset_id):
        rows = _sidecar_latest_direct_leg_rows(
            db=db,
            dataset_id=dataset_id,
            service_ids=service_ids,
            from_stop_ids=from_stop_ids,
            to_stop_ids=to_stop_ids,
            earliest_departure=earliest_departure,
            latest_arrival=latest_arrival,
            excluded_trip_id=excluded_trip_id,
        )
        return _first_feasible_direct_leg_row(rows, earliest_departure, latest_arrival)
    Origin = aliased(GtfsStopTime)
    Dest = aliased(GtfsStopTime)
    stmt = (
        select(Origin, Dest, GtfsTrip, GtfsRoute)
        .join(
            Dest,
            and_(
                Dest.dataset_id == Origin.dataset_id,
                Dest.trip_id == Origin.trip_id,
                Dest.stop_sequence > Origin.stop_sequence,
            ),
        )
        .join(GtfsTrip, and_(GtfsTrip.dataset_id == Origin.dataset_id, GtfsTrip.trip_id == Origin.trip_id))
        .join(GtfsRoute, and_(GtfsRoute.dataset_id == GtfsTrip.dataset_id, GtfsRoute.route_id == GtfsTrip.route_id))
        .where(
            Origin.dataset_id == dataset_id,
            Origin.stop_id.in_(from_stop_ids),
            Dest.stop_id.in_(to_stop_ids),
            # NULL *_seconds rows are kept so their times can be resolved in Python.
            or_(Origin.departure_seconds.is_(None), Origin.departure_seconds >= earliest_departure),
            or_(Dest.arrival_seconds.is_(None), Dest.arrival_seconds <= latest_arrival),
        )
        .order_by(
            Origin.departure_seconds.desc(),
            Origin.departure_time.desc(),
            Dest.arrival_seconds.desc(),
            Dest.arrival_time.desc(),
            Origin.trip_id,
        )
        .limit(120)
    )
    stmt = _where_trip_service_active(stmt, GtfsTrip, service_ids)
    if excluded_trip_id:
        stmt = stmt.where(GtfsTrip.trip_id != excluded_trip_id)
    return _first_feasible_direct_leg_row(db.execute(stmt).all(), earliest_departure, latest_arrival)
def _first_feasible_direct_leg_row(rows, earliest_departure: int, latest_arrival: int) -> _FirstLegOption | None:
    """Return the first ``(origin, dest, trip, route)`` row that fits the time window.

    A row fits when both of its times resolve and
    ``earliest_departure <= departure <= arrival <= latest_arrival``.
    Rows are consumed in the order given, so the caller controls preference.
    """
    for origin, dest, trip, route in rows:
        departure = _departure_seconds(origin)
        arrival = _arrival_seconds(dest)
        if departure is None or arrival is None:
            continue
        if departure < earliest_departure or arrival > latest_arrival or arrival < departure:
            continue
        return _FirstLegOption(
            departure_seconds=departure,
            arrival_seconds=arrival,
            origin=origin,
            dest=dest,
            trip=trip,
            route=route,
        )
    return None
def _targeted_second_leg_options(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    to_stop_ids: tuple[str, ...],
    earliest_departure: int,
    latest_arrival: int | None,
) -> dict[int, list[_SecondLegOption]]:
    """Find onward (second) legs that reach any of ``to_stop_ids``, grouped by boarding stop.

    Starts from the arrivals at the destination stops (``_destination_arrivals``,
    which treats ``latest_arrival`` as an exclusive bound). It then walks each
    arriving trip's earlier calls to find where that trip could be boarded at or
    after ``earliest_departure``. Calls at the destination stops themselves, and
    calls whose stop has no canonical stop, are skipped.

    Returns a dict keyed by canonical stop id. Each list is sorted by
    (departure, arrival) ascending and capped at ``MAX_TARGET_SECOND_LEGS_PER_STOP``.
    """
    if not to_stop_ids:
        return {}
    if service_ids == set():
        return {}
    destination_rows = _destination_arrivals(db, dataset_id, service_ids, to_stop_ids, earliest_departure, latest_arrival)
    if not destination_rows:
        return {}
    calls_by_trip = _stop_times_by_trip(db, dataset_id, sorted({trip.trip_id for _, trip, _ in destination_rows}))
    stop_to_canonical = _canonical_ids_for_trip_calls(db, dataset_id, calls_by_trip)
    grouped: dict[int, list[_SecondLegOption]] = {}
    # Dedup key: (boarding canonical stop, trip, boarding sequence, alighting sequence).
    seen: set[tuple[object, ...]] = set()
    to_stop_id_set = set(to_stop_ids)
    for dest, trip, route in destination_rows:
        dest_arrival = _arrival_seconds(dest)
        if dest_arrival is None:
            continue
        for call in calls_by_trip.get(trip.trip_id, []):
            # Only calls before the destination call can be boarding points. This
            # break assumes calls are ordered by stop_sequence (presumably
            # guaranteed by the storage layer; confirm).
            if call.stop_sequence >= dest.stop_sequence:
                break
            # Boarding at a destination stop would not be a real onward leg.
            if call.stop_id in to_stop_id_set:
                continue
            departure = _departure_seconds(call)
            # Also drop calls departing after the arrival (inconsistent times).
            if departure is None or departure < earliest_departure or departure > dest_arrival:
                continue
            canonical_stop_id = stop_to_canonical.get(call.stop_id)
            if canonical_stop_id is None:
                continue
            key = (canonical_stop_id, trip.trip_id, call.stop_sequence, dest.stop_sequence)
            if key in seen:
                continue
            seen.add(key)
            grouped.setdefault(canonical_stop_id, []).append(
                _SecondLegOption(
                    canonical_stop_id=canonical_stop_id,
                    departure_seconds=departure,
                    arrival_seconds=dest_arrival,
                    origin=call,
                    dest=dest,
                    trip=trip,
                    route=route,
                )
            )
    capped: dict[int, list[_SecondLegOption]] = {}
    for canonical_stop_id, options in grouped.items():
        # Earliest boardings first; keep at most MAX_TARGET_SECOND_LEGS_PER_STOP per stop.
        selected = sorted(options, key=lambda item: (item.departure_seconds, item.arrival_seconds))[
            :MAX_TARGET_SECOND_LEGS_PER_STOP
        ]
        if selected:
            capped[canonical_stop_id] = selected
    return capped
def _destination_arrivals(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    stop_ids: tuple[str, ...],
    earliest_departure: int,
    latest_arrival: int | None,
) -> list[tuple[GtfsStopTime, GtfsTrip, GtfsRoute]]:
    """Return calls arriving at ``stop_ids`` within ``[earliest_departure, latest_arrival)``.

    ``latest_arrival`` is an exclusive upper bound; ``None`` means unbounded.
    Calls whose arrival cannot be resolved are dropped. The SQL path returns
    rows in arrival order and inspects at most
    ``MAX_TARGET_DESTINATION_ARRIVALS`` of them. An empty ``service_ids`` set
    yields no rows.
    """
    if service_ids == set():
        return []
    if uses_sidecar_stop_times(db, dataset_id):
        rows = _sidecar_destination_arrival_rows(
            db=db,
            dataset_id=dataset_id,
            service_ids=service_ids,
            stop_ids=stop_ids,
            earliest_departure=earliest_departure,
            latest_arrival=latest_arrival,
        )
    else:
        stmt = (
            select(GtfsStopTime, GtfsTrip, GtfsRoute)
            .join(GtfsTrip, and_(GtfsTrip.dataset_id == GtfsStopTime.dataset_id, GtfsTrip.trip_id == GtfsStopTime.trip_id))
            .join(GtfsRoute, and_(GtfsRoute.dataset_id == GtfsTrip.dataset_id, GtfsRoute.route_id == GtfsTrip.route_id))
            .where(GtfsStopTime.dataset_id == dataset_id, GtfsStopTime.stop_id.in_(stop_ids))
            .where(or_(GtfsStopTime.arrival_seconds.is_(None), GtfsStopTime.arrival_seconds >= earliest_departure))
            .order_by(GtfsStopTime.arrival_seconds, GtfsStopTime.arrival_time, GtfsStopTime.trip_id)
            .limit(MAX_TARGET_DESTINATION_ARRIVALS)
        )
        stmt = _where_trip_service_active(stmt, GtfsTrip, service_ids)
        if latest_arrival is not None:
            # Exclusive bound, matching the Python filter below. With `<=`, rows
            # arriving exactly at the bound were always discarded afterwards yet
            # still consumed slots under the LIMIT.
            stmt = stmt.where(or_(GtfsStopTime.arrival_seconds.is_(None), GtfsStopTime.arrival_seconds < latest_arrival))
        rows = db.execute(stmt).all()
    selected = []
    for stop_time, trip, route in rows:
        # Re-check in Python: NULL *_seconds rows pass the SQL filter and are
        # resolved here.
        arrival = _arrival_seconds(stop_time)
        if arrival is None or arrival < earliest_departure:
            continue
        if latest_arrival is not None and arrival >= latest_arrival:
            continue
        selected.append((stop_time, trip, route))
    return selected
@dataclass(frozen=True)
class _Boarding:
    """An immutable boarding candidate: one trip's call at an origin stop, with its trip and route rows."""

    # Stop-time row at which the passenger boards.
    call: GtfsStopTime
    # Trip that makes this call.
    trip: GtfsTrip
    # Route the trip belongs to.
    route: GtfsRoute
def _origin_boardings(
    db: Session,
    dataset_id: int,
    service_ids: set[str] | None,
    stop_ids: tuple[str, ...],
    earliest_departure: int,
    latest_departure: int | None = None,
) -> list[_Boarding]:
    """Collect up to ``MAX_TRANSFER_BOARDINGS`` departures from ``stop_ids``.

    Departures fall within ``[earliest_departure, latest_departure)``; when
    ``latest_departure`` is ``None`` there is no upper bound. They come back in
    the order the backing query yields them; the SQL path orders by departure
    time ascending. Calls whose departure cannot be resolved are skipped.
    Returns ``[]`` for empty ``stop_ids`` or an empty ``service_ids`` set.
    """
    if not stop_ids:
        return []
    if service_ids == set():
        return []
    if uses_sidecar_stop_times(db, dataset_id):
        # Passed through unmaterialized, so the helper stops pulling rows once
        # the cap is reached.
        rows = _sidecar_boarding_rows(
            db=db,
            dataset_id=dataset_id,
            service_ids=service_ids,
            stop_ids=stop_ids,
            earliest_departure=earliest_departure,
            latest_departure=latest_departure,
            limit=MAX_DIRECT_ROWS,
        )
    else:
        stmt = (
            select(GtfsStopTime, GtfsTrip, GtfsRoute)
            .join(GtfsTrip, and_(GtfsTrip.dataset_id == GtfsStopTime.dataset_id, GtfsTrip.trip_id == GtfsStopTime.trip_id))
            .join(GtfsRoute, and_(GtfsRoute.dataset_id == GtfsTrip.dataset_id, GtfsRoute.route_id == GtfsTrip.route_id))
            .where(GtfsStopTime.dataset_id == dataset_id, GtfsStopTime.stop_id.in_(stop_ids))
            .where(or_(GtfsStopTime.departure_seconds.is_(None), GtfsStopTime.departure_seconds >= earliest_departure))
            .order_by(GtfsStopTime.departure_seconds, GtfsStopTime.departure_time, GtfsStopTime.trip_id)
            .limit(MAX_DIRECT_ROWS)
        )
        stmt = _where_trip_service_active(stmt, GtfsTrip, service_ids)
        if latest_departure is not None:
            stmt = stmt.where(or_(GtfsStopTime.departure_seconds.is_(None), GtfsStopTime.departure_seconds < latest_departure))
        rows = db.execute(stmt).all()
    return _boardings_within_window(rows, earliest_departure, latest_departure)
def _boardings_within_window(rows, earliest_departure: int, latest_departure: int | None) -> list[_Boarding]:
    """Wrap ``(call, trip, route)`` rows whose resolved departure is in the window, capped at ``MAX_TRANSFER_BOARDINGS``."""
    boardings: list[_Boarding] = []
    for call, trip, route in rows:
        departure = _departure_seconds(call)
        if departure is None or departure < earliest_departure:
            continue
        if latest_departure is not None and departure >= latest_departure:
            continue
        boardings.append(_Boarding(call=call, trip=trip, route=route))
        if len(boardings) >= MAX_TRANSFER_BOARDINGS:
            break
    return boardings
def _stop_times_by_trip(db: Session, dataset_id: int, trip_ids: list[str]) -> dict[str, list[GtfsStopTime]]:
    """Return stop times grouped by trip id; delegates to ``app.gtfs_storage.stop_times_by_trip``.

    NOTE(review): ``_targeted_second_leg_options`` relies on each list being in
    ``stop_sequence`` order (it breaks at the destination call). Presumably the
    storage layer guarantees this; confirm.
    """
    return storage_stop_times_by_trip(db, dataset_id, trip_ids)
def _leg_payload(
    db: Session,
    dataset_id: int,
    route: GtfsRoute,
    trip: GtfsTrip,
    origin: GtfsStopTime,
    dest: GtfsStopTime,
    stop_cache: dict[tuple[int, str], StopSummary],
    osm_stop_cache: dict[tuple[int, str], dict],
) -> dict:
    """Build the internal payload for one transit leg riding ``trip`` from ``origin`` to ``dest``.

    Resolves both end stops (through ``stop_cache``), the stop list between the
    two calls, the leg geometry, and the dataset's source. ``geometry`` and the
    raw ``*_seconds`` fields are meant for internal use; ``_journey_payload``
    strips them via ``_leg_public_payload``.
    """
    from_stop = _stop_for_id(db, dataset_id, origin.stop_id, stop_cache)
    to_stop = _stop_for_id(db, dataset_id, dest.stop_id, stop_cache)
    departure_seconds = _departure_seconds(origin)
    arrival_seconds = _arrival_seconds(dest)
    linked_route_pattern = route_pattern_for_trip(db, route, trip)
    stops = _leg_stop_payloads(
        db=db,
        dataset_id=dataset_id,
        trip_id=trip.trip_id,
        start_sequence=origin.stop_sequence,
        end_sequence=dest.stop_sequence,
        stop_cache=stop_cache,
        osm_stop_cache=osm_stop_cache,
    )
    # _leg_geometry may fall back to an alternate pattern. The payload reports the
    # pattern that actually supplied the geometry, not necessarily linked_route_pattern.
    geometry, geometry_source, route_pattern = _leg_geometry(db, linked_route_pattern, route, trip, from_stop, to_stop, stops)
    source = _source_payload_for_dataset_id(db, dataset_id)
    stop_count = len(stops)
    return {
        "dataset_id": dataset_id,
        "source_id": None if source is None else source["id"],
        "source_name": None if source is None else source["name"],
        "route_db_id": route.id,
        "route_id": route.route_id,
        "route_ref": route.short_name,
        "route_name": route.long_name,
        "mode": route.mode,
        "operator": route.operator_name,
        "trip_id": trip.trip_id,
        "route_pattern_id": None if route_pattern is None else route_pattern.id,
        "route_pattern_source": None if route_pattern is None else route_pattern.source_kind,
        "route_pattern_status": None if route_pattern is None else route_pattern.status,
        "from": _stop_payload(from_stop),
        "to": _stop_payload(to_stop),
        "departure_seconds": departure_seconds,
        "arrival_seconds": arrival_seconds,
        "departure_time": format_gtfs_time(departure_seconds),
        "arrival_time": format_gtfs_time(arrival_seconds),
        "departure_time_label": format_gtfs_time_label(departure_seconds),
        "arrival_time_label": format_gtfs_time_label(arrival_seconds),
        "stop_count": stop_count,
        # Assumes `stops` includes both end stops (presumably true; confirm in _leg_stop_payloads).
        "intermediate_stop_count": max(0, stop_count - 2),
        "geometry": geometry,
        "geometry_source": geometry_source,
        "stops": stops,
    }
def _journey_payload(legs: list[dict]) -> dict:
    """Summarize an ordered list of internal leg payloads as one journey.

    The journey departs with the first leg and arrives with the last. Its
    duration is never negative and is ``None`` when either end time is
    unknown. ``transfers`` is the number of non-walk legs minus one, floored at
    zero. ``features`` contains one line feature per leg that has geometry,
    numbered by the leg's 1-based position, followed by the stop features from
    ``_journey_stop_features``. Legs are exposed through ``_leg_public_payload``.
    """
    departure = legs[0]["departure_seconds"]
    arrival = legs[-1]["arrival_seconds"]
    if departure is None or arrival is None:
        duration_seconds = None
    else:
        duration_seconds = max(0, int(arrival) - int(departure))
    transit_leg_count = sum(1 for leg in legs if leg.get("mode") != "walk")
    leg_features = [
        {
            "type": "Feature",
            "geometry": leg["geometry"],
            "properties": {
                "leg": leg_number,
                "route_id": leg["route_id"],
                "route_ref": leg["route_ref"],
                "mode": leg["mode"],
                "trip_id": leg["trip_id"],
                "route_pattern_id": leg.get("route_pattern_id"),
                "route_pattern_source": leg.get("route_pattern_source"),
                "route_pattern_status": leg.get("route_pattern_status"),
                "geometry_source": leg["geometry_source"],
            },
        }
        for leg_number, leg in enumerate(legs, start=1)
        if leg["geometry"] is not None
    ]
    all_features = [*leg_features, *_journey_stop_features(legs)]
    return {
        "transfers": max(0, transit_leg_count - 1),
        "departure_seconds": departure,
        "arrival_seconds": arrival,
        "departure_time": format_gtfs_time(departure),
        "arrival_time": format_gtfs_time(arrival),
        "departure_time_label": format_gtfs_time_label(departure),
        "arrival_time_label": format_gtfs_time_label(arrival),
        "duration_seconds": duration_seconds,
        "duration_minutes": duration_minutes_ceil(duration_seconds),
        "duration_label": format_duration_label(duration_seconds),
        "legs": [_leg_public_payload(leg) for leg in legs],
        "features": feature_collection(all_features),
    }
def _leg_public_payload(leg: dict) -> dict:
return {key: value for key, value in leg.items() if key not in {"geometry", "departure_seconds", "arrival_seconds"}}
def _walk_leg_payload(db: Session, backlink: _RouterWalkBacklink, dataset_id: int, *, route_geometry: bool = True) -> dict:
    """Build a journey-leg payload for a walking transfer between two stops.

    Starts from the router's estimate on ``backlink``: distance, departure and
    arrival. When both stops have coordinates and ``route_geometry`` is true,
    the walk is routed over the routing graph. A successful route replaces the
    geometry. It also replaces the distance, unless the routed distance is
    zero or missing, and the duration and arrival time when it reports a
    duration. Without a routed geometry, a straight line between the stops is
    used when both have coordinates; otherwise ``geometry`` is ``None``.
    Distances are rounded to 0.1 m.
    """
    geometry = None
    geometry_source = "walking_transfer"
    distance_m = round(float(backlink.distance_m or 0), 1)
    duration_seconds = max(0, int(backlink.arrival_seconds) - int(backlink.departure_seconds))
    arrival_seconds = backlink.arrival_seconds
    if (
        backlink.from_stop.lon is not None
        and backlink.from_stop.lat is not None
        and backlink.to_stop.lon is not None
        and backlink.to_stop.lat is not None
    ):
        if route_geometry:
            routed_geometry, routed_distance, routed_duration_seconds = _walk_geometry_from_routing(db, backlink.from_stop, backlink.to_stop)
        else:
            routed_geometry, routed_distance, routed_duration_seconds = None, 0.0, None
        if routed_geometry is not None:
            geometry = routed_geometry
            geometry_source = "routing_layer:walk"
            if routed_distance:
                # Same 0.1 m precision as the router estimate above. A zero/missing
                # routed distance keeps that estimate rather than reporting 0 m.
                distance_m = round(float(routed_distance), 1)
            if routed_duration_seconds is not None:
                duration_seconds = max(0, int(math.ceil(routed_duration_seconds)))
                arrival_seconds = backlink.departure_seconds + duration_seconds
        if geometry is None:
            geometry = {
                "type": "LineString",
                "coordinates": [
                    [backlink.from_stop.lon, backlink.from_stop.lat],
                    [backlink.to_stop.lon, backlink.to_stop.lat],
                ],
            }
    return {
        "dataset_id": dataset_id,
        "source_id": None,
        "source_name": None,
        "route_db_id": None,
        "route_id": "walk",
        "route_ref": "Walk",
        "route_name": "Walking transfer",
        "mode": "walk",
        "operator": None,
        "trip_id": None,
        "route_pattern_id": None,
        "route_pattern_source": None,
        "route_pattern_status": None,
        "from": _stop_payload(backlink.from_stop),
        "to": _stop_payload(backlink.to_stop),
        "departure_seconds": backlink.departure_seconds,
        "arrival_seconds": arrival_seconds,
        "departure_time": format_gtfs_time(backlink.departure_seconds),
        "arrival_time": format_gtfs_time(arrival_seconds),
        "departure_time_label": format_gtfs_time_label(backlink.departure_seconds),
        "arrival_time_label": format_gtfs_time_label(arrival_seconds),
        "distance_m": distance_m,
        "duration_seconds": duration_seconds,
        "geometry": geometry,
        "geometry_source": geometry_source,
        "stops": [
            _canonical_walk_stop_payload(backlink.from_stop, 1),
            _canonical_walk_stop_payload(backlink.to_stop, 2),
        ],
    }
def _walk_geometry_from_routing(db: Session, from_stop: StopSummary, to_stop: StopSummary) -> tuple[dict | None, float, float | None]:
    """Route a walk between two stops over the routing graph.

    Returns ``(geometry, distance_m, duration_seconds)``. ``geometry`` is a
    GeoJSON LineString, or a MultiLineString when the route comes back as
    several line features. ``(None, 0.0, None)`` means no usable route:
    missing coordinates, a routing error, or no line geometry.
    ``duration_seconds`` is ``None`` when the routing response has no usable
    duration, so callers can keep their own estimate. Successful results are
    cached, keyed on coordinates rounded to 6 decimals, and every returned
    geometry is a fresh copy.
    """
    if from_stop.lon is None or from_stop.lat is None or to_stop.lon is None or to_stop.lat is None:
        return None, 0.0, None
    cache_key = (
        round(float(from_stop.lon), 6),
        round(float(from_stop.lat), 6),
        round(float(to_stop.lon), 6),
        round(float(to_stop.lat), 6),
    )
    cached = _walk_geometry_cache_get(cache_key)
    if cached is not None:
        return cached
    try:
        route = route_between_points(
            db,
            from_lon=float(from_stop.lon),
            from_lat=float(from_stop.lat),
            to_lon=float(to_stop.lon),
            to_lat=float(to_stop.lat),
            mode="walk",
            max_visited=5_000,
        )
    except Exception:  # noqa: BLE001 - routing graph may be unavailable during import
        return None, 0.0, None
    features = (route.get("features") or {}).get("features") if isinstance(route, dict) else None
    if not isinstance(features, list):
        return None, 0.0, None
    lines = [
        feature.get("geometry")
        for feature in features
        if isinstance(feature, dict) and (feature.get("geometry") or {}).get("type") == "LineString"
    ]
    coordinates = [
        geometry.get("coordinates")
        for geometry in lines
        if isinstance(geometry, dict) and len(geometry.get("coordinates") or []) >= 2
    ]
    if not coordinates:
        return None, 0.0, None
    if len(coordinates) == 1:
        geometry = {"type": "LineString", "coordinates": coordinates[0]}
    else:
        geometry = {"type": "MultiLineString", "coordinates": coordinates}
    distance_m = float(route.get("distance_m") or 0)
    # Keep "no duration reported" distinct from a zero-second walk. With `or 0`,
    # a missing duration became 0.0, so callers replaced their estimate with an
    # instant walk.
    duration_seconds = _float_or_none(route.get("duration_seconds"))
    result = (geometry, distance_m, duration_seconds)
    _walk_geometry_cache_put(cache_key, result)
    return _copy_walk_geometry_cache_value(result)
def _walk_geometry_cache_get(key: tuple[float, float, float, float]) -> tuple[dict | None, float, float | None] | None:
    """Look up a cached walk-routing result.

    Returns a defensive copy of the stored value, or ``None`` when the key is
    absent or its entry has expired. Expired entries are evicted during the
    lookup.
    """
    now = time.monotonic()
    with _walk_geometry_cache_lock:
        entry = _walk_geometry_cache.get(key)
        if entry is not None:
            expires_at, stored = entry
            if expires_at > now:
                return _copy_walk_geometry_cache_value(stored)
            _walk_geometry_cache.pop(key, None)
    return None
def _walk_geometry_cache_put(key: tuple[float, float, float, float], value: tuple[dict | None, float, float | None]) -> None:
    """Store a private copy of ``value`` under ``key`` with a fresh TTL.

    When the cache grows past ``WALK_GEOMETRY_CACHE_MAX_ENTRIES``, the entries
    with the earliest expiry are evicted until it is back at the limit.
    """
    with _walk_geometry_cache_lock:
        _walk_geometry_cache[key] = (
            time.monotonic() + WALK_GEOMETRY_CACHE_TTL_SECONDS,
            _copy_walk_geometry_cache_value(value),
        )
        overflow = len(_walk_geometry_cache) - WALK_GEOMETRY_CACHE_MAX_ENTRIES
        if overflow <= 0:
            return
        by_expiry = sorted(_walk_geometry_cache.items(), key=lambda item: item[1][0])
        for stale_key, _entry in by_expiry[:overflow]:
            _walk_geometry_cache.pop(stale_key, None)
def _copy_walk_geometry_cache_value(value: tuple[dict | None, float, float | None]) -> tuple[dict | None, float, float | None]:
geometry, distance_m, duration_seconds = value
copied_geometry = None if geometry is None else json.loads(json.dumps(geometry))
return copied_geometry, distance_m, duration_seconds
def _canonical_walk_stop_payload(stop: StopSummary, sequence: int) -> dict:
    """Build the stop entry shown at one end of a walking leg.

    Extends ``_stop_payload(stop)`` with the stop's position in the walk and
    its visual coordinates. Locations identified by ``is_location_token`` are
    labelled ``"address"`` and carry no canonical-stop reference. Everything
    else is labelled ``"canonical_stop"`` with its id and name. ``osm`` is
    always ``None``.
    """
    payload = _stop_payload(stop)
    external_location = is_location_token(stop.stop_id)
    canonical_ref = None if external_location else {"id": stop.id, "name": stop.name}
    payload.update(
        {
            "stop_sequence": sequence,
            "visual_source": "address" if external_location else "canonical_stop",
            "visual_lon": stop.lon,
            "visual_lat": stop.lat,
            "osm": None,
            "canonical_stop": canonical_ref,
        }
    )
    return payload
def _leg_geometry(
    db: Session,
    linked_route_pattern: RoutePattern | None,
    route: GtfsRoute,
    trip: GtfsTrip,
    from_stop: StopSummary,
    to_stop: StopSummary,
    fallback_stops: list[dict],
) -> tuple[dict | None, str, RoutePattern | None]:
    """Pick the display geometry for a transit leg and report where it came from.

    Returns ``(geometry, geometry_source, route_pattern)``. Every outcome,
    including "no geometry", is stored in the leg geometry cache. Candidates
    are tried in this order, and the first that works wins:

    1. Clipping between the two stops (``_validated_leg_geometry``): the
       linked route pattern, alternate patterns for the same route ref, then
       the trip's GTFS shape.
    2. Route-layer patterns stitched onto the stop sequence
       (``<source>:stitched``).
    3. Clipping of the legacy ``GtfsRoute`` geometry.
    4. The GTFS shape stitched onto the stop sequence.
    5. A straight polyline through the stop coordinates.

    If none works, ``(None, "none", None)`` is returned. ``route_pattern`` is
    the pattern that supplied the geometry, if any.
    """
    cache_key = _leg_geometry_cache_key(route, trip, linked_route_pattern, from_stop, to_stop)
    cached = _leg_geometry_cache_get(db, cache_key)
    if cached is not None:
        return cached
    route_layer_candidates: list[tuple[str, str | None, RoutePattern | None]] = []
    gtfs_shape_candidates: list[tuple[str, str | None, RoutePattern | None]] = []
    legacy_candidates: list[tuple[str, str | None, RoutePattern | None]] = []
    if linked_route_pattern is not None:
        route_layer_candidates.append((f"route_layer:{linked_route_pattern.source_kind}", linked_route_pattern.geometry_geojson, linked_route_pattern))
    route_layer_candidates.extend(_alternate_route_pattern_geometry_candidates(db, route, linked_route_pattern))
    if trip.shape_id:
        shape_row = db.scalar(
            select(GtfsShape).where(
                GtfsShape.dataset_id == trip.dataset_id,
                GtfsShape.shape_id == trip.shape_id,
            )
        )
        if shape_row is not None:
            gtfs_shape_candidates.append(("gtfs_shape", shape_row.geometry_geojson, None))
    legacy_candidates.append(("legacy_gtfs_route", route.geometry_geojson, None))
    # Step 1: full clipping over route-layer candidates, then the GTFS shape.
    full_geometry_candidates = [*route_layer_candidates, *gtfs_shape_candidates]
    usable_route_layer_candidates = _usable_geometry_candidates(route_layer_candidates)
    for geometry_source, geometry_text, candidate_pattern in _usable_geometry_candidates(full_geometry_candidates):
        geometry = _validated_leg_geometry(geometry_text, from_stop, to_stop)
        if geometry is not None:
            return _leg_geometry_cache_put(cache_key, geometry, geometry_source, candidate_pattern)
    # Step 2: stitch partial route-layer coverage onto the stop polyline.
    stop_coords = _stop_sequence_coords(fallback_stops, from_stop, to_stop)
    for geometry_source, geometry_text, candidate_pattern in usable_route_layer_candidates:
        stitched = _stitched_partial_geometry(geometry_text, stop_coords)
        if stitched is not None:
            return _leg_geometry_cache_put(cache_key, stitched, f"{geometry_source}:stitched", candidate_pattern)
    # Step 3: full clipping of the legacy per-route geometry.
    for geometry_source, geometry_text, candidate_pattern in _usable_geometry_candidates(legacy_candidates):
        geometry = _validated_leg_geometry(geometry_text, from_stop, to_stop)
        if geometry is not None:
            return _leg_geometry_cache_put(cache_key, geometry, geometry_source, candidate_pattern)
    # Step 4: stitch partial GTFS shape coverage.
    for geometry_source, geometry_text, candidate_pattern in _usable_geometry_candidates(gtfs_shape_candidates):
        stitched = _stitched_partial_geometry(geometry_text, stop_coords)
        if stitched is not None:
            return _leg_geometry_cache_put(cache_key, stitched, f"{geometry_source}:stitched", candidate_pattern)
    # Step 5: straight lines through the stops.
    fallback_geometry, fallback_source = _stop_sequence_fallback_geometry(stop_coords)
    if fallback_geometry is not None:
        return _leg_geometry_cache_put(cache_key, fallback_geometry, fallback_source, None)
    # Cache the miss as well, so the candidate search is not repeated until TTL expiry.
    return _leg_geometry_cache_put(cache_key, None, "none", None)
def _leg_geometry_cache_key(
    route: GtfsRoute,
    trip: GtfsTrip,
    linked_route_pattern: RoutePattern | None,
    from_stop: StopSummary,
    to_stop: StopSummary,
) -> tuple[object, ...]:
    """Build the cache key for a leg's geometry.

    The key combines:
    - the route's identity and a fingerprint of its stored geometry;
    - the trip's shape id;
    - the linked pattern's id and geometry fingerprint;
    - both end stops.

    Alternate patterns, the GTFS shape's geometry text and intermediate stops
    are not part of the key.
    """
    if linked_route_pattern is None:
        pattern_id, pattern_geometry = None, None
    else:
        pattern_id, pattern_geometry = linked_route_pattern.id, linked_route_pattern.geometry_geojson
    route_part = (route.dataset_id, route.route_id, route.id, _geometry_text_fingerprint(route.geometry_geojson))
    pattern_part = (trip.shape_id or "", pattern_id, _geometry_text_fingerprint(pattern_geometry))
    stop_part = (from_stop.id, from_stop.stop_id, to_stop.id, to_stop.stop_id)
    return route_part + pattern_part + stop_part
def _geometry_text_fingerprint(value: str | None) -> tuple[int, str, str]:
if not value:
return (0, "", "")
text_value = str(value)
return (len(text_value), text_value[:96], text_value[-96:])
def _leg_geometry_cache_get(
    db: Session,
    cache_key: tuple[object, ...],
) -> tuple[dict | None, str, RoutePattern | None] | None:
    """Return the cached ``(geometry, geometry_source, route_pattern)`` for ``cache_key``.

    ``None`` means a miss or an expired entry; expired entries are evicted. The
    geometry is a fresh copy. The route pattern is re-loaded through ``db``
    from its stored id, so it belongs to the caller's session. ``db.get``
    yields ``None`` if that pattern no longer exists.
    """
    now = time.monotonic()
    with _leg_geometry_cache_lock:
        cached = _leg_geometry_cache.get(cache_key)
        if cached is None:
            return None
        expires_at, geometry, geometry_source, route_pattern_id = cached
        if expires_at <= now:
            _leg_geometry_cache.pop(cache_key, None)
            return None
    # The DB lookup and deep copy run after releasing the module-wide lock, so a
    # slow query does not block every other leg lookup. This is safe because
    # puts store a private geometry copy in an immutable tuple and replace
    # entries wholesale rather than mutating them.
    pattern = db.get(RoutePattern, route_pattern_id) if route_pattern_id is not None else None
    copied_geometry = json.loads(json.dumps(geometry)) if geometry is not None else None
    return copied_geometry, geometry_source, pattern
def _leg_geometry_cache_put(
    cache_key: tuple[object, ...],
    geometry: dict | None,
    geometry_source: str,
    route_pattern: RoutePattern | None,
) -> tuple[dict | None, str, RoutePattern | None]:
    """Cache a leg-geometry result and hand the same result back.

    A private JSON copy of ``geometry`` is stored together with the source
    label. Only the route pattern's id is kept; ``_leg_geometry_cache_get``
    re-loads the pattern. When the cache exceeds
    ``LEG_GEOMETRY_CACHE_MAX_ENTRIES``, the entries expiring soonest are
    evicted. Returns ``(geometry, geometry_source, route_pattern)`` unchanged,
    so call sites can ``return _leg_geometry_cache_put(...)``.
    """
    stored_geometry = None if geometry is None else json.loads(json.dumps(geometry))
    with _leg_geometry_cache_lock:
        _leg_geometry_cache[cache_key] = (
            time.monotonic() + LEG_GEOMETRY_CACHE_TTL_SECONDS,
            stored_geometry,
            geometry_source,
            int(route_pattern.id) if route_pattern is not None else None,
        )
        excess = len(_leg_geometry_cache) - LEG_GEOMETRY_CACHE_MAX_ENTRIES
        if excess > 0:
            soonest_first = sorted(_leg_geometry_cache, key=lambda key: _leg_geometry_cache[key][0])
            for stale_key in soonest_first[:excess]:
                _leg_geometry_cache.pop(stale_key, None)
    return geometry, geometry_source, route_pattern
def _usable_geometry_candidates(
candidates: list[tuple[str, str | None, RoutePattern | None]]
) -> list[tuple[str, str, RoutePattern | None]]:
seen_geometry: set[str] = set()
usable: list[tuple[str, str, RoutePattern | None]] = []
for geometry_source, geometry_text, candidate_pattern in candidates:
if not geometry_text or geometry_text in seen_geometry:
continue
seen_geometry.add(geometry_text)
usable.append((geometry_source, geometry_text, candidate_pattern))
return usable
def _alternate_route_pattern_geometry_candidates(
    db: Session,
    route: GtfsRoute,
    linked_route_pattern: RoutePattern | None,
) -> list[tuple[str, str | None, RoutePattern | None]]:
    """List other route patterns whose geometry may stand in for this route's.

    Patterns match when their ``route_ref`` equals the route's short name or
    route id. When the route has a mode, only patterns with that mode or no
    mode are kept. ``linked_route_pattern`` is excluded. Ordering is
    OSM-sourced first, then higher confidence, then lower id, with at most 40
    results. Each entry is labelled ``route_layer:<source_kind>:alternate``.
    """
    route_refs = [ref for ref in (route.short_name, route.route_id) if ref]
    if not route_refs:
        return []
    conditions = [RoutePattern.route_ref.in_(route_refs)]
    if route.mode:
        conditions.append(or_(RoutePattern.mode == route.mode, RoutePattern.mode.is_(None)))
    if linked_route_pattern is not None:
        conditions.append(RoutePattern.id != linked_route_pattern.id)
    osm_first = case((RoutePattern.source_kind == "osm", 0), else_=1)
    stmt = (
        select(RoutePattern)
        .where(*conditions)
        .order_by(osm_first, RoutePattern.confidence.desc(), RoutePattern.id)
        .limit(40)
    )
    patterns = db.scalars(stmt).all()
    return [(f"route_layer:{pattern.source_kind}:alternate", pattern.geometry_geojson, pattern) for pattern in patterns]
def _validated_leg_geometry(geometry_text: str, from_stop: StopSummary, to_stop: StopSummary) -> dict | None:
full_geometry = json.loads(geometry_text)
if from_stop.lon is None or from_stop.lat is None or to_stop.lon is None or to_stop.lat is None:
return full_geometry
try:
segment = _segment_between_stops(shape(full_geometry), from_stop, to_stop)
if segment is None or segment.is_empty or segment.length == 0:
return None
return mapping(segment)
except Exception: # noqa: BLE001 - route geometry clipping should not break journey search
return None
def _stop_sequence_fallback_geometry(
coords: list[tuple[float, float]],
) -> tuple[dict | None, str]:
if len(coords) < 2:
return None, "none"
source = "stop_sequence_fallback" if len(coords) > 2 else "stop_straight_line_fallback"
return mapping(LineString(coords)), source
def _stop_sequence_coords(
    stops: list[dict],
    from_stop: StopSummary,
    to_stop: StopSummary,
) -> list[tuple[float, float]]:
    """Collect a leg's polyline points from its stop payloads.

    Each payload contributes its display coordinate. ``visual_*`` wins, and
    ``lon``/``lat`` are used only when the visual key is absent. Consecutive
    duplicates are skipped. The end ``StopSummary`` coordinates fill in when
    the first or last payload has no usable coordinate, when there are no
    payloads, or when fewer than two points were collected.
    """
    coords: list[tuple[float, float]] = []
    for stop in stops:
        point = _stop_payload_coord(stop)
        if point is not None:
            _append_coord(coords, point[0], point[1])
    if stops:
        if _stop_payload_coord(stops[0]) is None:
            _prepend_coord(coords, from_stop.lon, from_stop.lat)
        if _stop_payload_coord(stops[-1]) is None:
            _append_coord(coords, to_stop.lon, to_stop.lat)
    else:
        _append_coord(coords, from_stop.lon, from_stop.lat)
        _append_coord(coords, to_stop.lon, to_stop.lat)
    if len(coords) < 2:
        _prepend_coord(coords, from_stop.lon, from_stop.lat)
        _append_coord(coords, to_stop.lon, to_stop.lat)
    return coords
def _stitched_partial_geometry(geometry_text: str, stop_coords: list[tuple[float, float]]) -> dict | None:
    """Splice the part of a route geometry that covers some stops into the stop polyline.

    This is for geometries that only partly cover a leg. The stretch of line
    chosen by ``_partial_line_measure_range`` replaces the matched stops,
    oriented along the stop order. Stop coordinates before the start index and
    after the end index are kept as straight segments on either side.

    Returns a GeoJSON LineString, or ``None`` when:
    - the text is not valid geometry;
    - no stop lies near the line, or no usable range exists;
    - the result would just be the plain stop polyline.
    """
    if len(stop_coords) < 2:
        return None
    try:
        geom = shape(json.loads(geometry_text))
    except Exception:  # noqa: BLE001 - invalid geometry should not break routing
        return None
    line = _stitchable_line_for_geometry(geom, stop_coords)
    if line is None or line.length == 0:
        return None
    matches = _stop_projection_matches(line, stop_coords)
    if not matches:
        return None
    start_stop_index, start_measure, end_stop_index, end_measure = _partial_line_measure_range(line, stop_coords, matches)
    if start_stop_index is None or end_stop_index is None or start_measure is None or end_measure is None:
        return None
    if abs(end_measure - start_measure) <= 1e-12:
        return None
    route_segment = substring(line, min(start_measure, end_measure), max(start_measure, end_measure))
    if route_segment.is_empty or route_segment.length == 0 or not isinstance(route_segment, LineString):
        return None
    if start_measure > end_measure:
        # The stops run against the line's direction; flip so the segment follows stop order.
        route_segment = LineString(list(route_segment.coords)[::-1])
    coords: list[tuple[float, float]] = []
    for coord in stop_coords[:start_stop_index]:
        _append_coord(coords, coord[0], coord[1])
    for coord in route_segment.coords:
        _append_coord(coords, float(coord[0]), float(coord[1]))
    for coord in stop_coords[end_stop_index + 1 :]:
        _append_coord(coords, coord[0], coord[1])
    if len(coords) < 2:
        return None
    # Nothing was gained from the route geometry; let the plain fallback label it instead.
    if len(coords) == len(stop_coords) and all(_coords_equal(left, right) for left, right in zip(coords, stop_coords)):
        return None
    return mapping(LineString(coords))
def _stitchable_line_for_geometry(geom, stop_coords: list[tuple[float, float]]) -> LineString | None:
    """Reduce ``geom`` to a single LineString to stitch stop coordinates onto.

    A LineString is returned as-is. A MultiLineString is merged with
    ``linemerge``. If the merge leaves it disconnected, the part with the most
    stops within ``LEG_GEOMETRY_MAX_STOP_DISTANCE_DEG`` wins; ties go to the
    smaller total stop distance, then the longer part. Returns ``None`` for any
    other geometry type, or when no part has a single stop nearby.
    """
    if isinstance(geom, LineString):
        return geom
    if not isinstance(geom, MultiLineString):
        return None
    merged = linemerge(geom)
    if isinstance(merged, LineString):
        return merged
    if not isinstance(merged, MultiLineString):
        return None
    stop_points = [Point(coord) for coord in stop_coords]
    def rank(part: LineString) -> tuple[int, float, float]:
        gaps = [part.distance(stop_point) for stop_point in stop_points]
        nearby = sum(1 for gap in gaps if gap <= LEG_GEOMETRY_MAX_STOP_DISTANCE_DEG)
        return nearby, -sum(gaps), part.length
    winner = max(merged.geoms, key=rank, default=None)
    if winner is None:
        return None
    return winner if rank(winner)[0] > 0 else None
def _stop_projection_matches(line: LineString, stop_coords: list[tuple[float, float]]) -> list[tuple[int, float, float]]:
    """Project every stop that lies near ``line`` onto it.

    Returns ``(stop_index, measure_along_line, distance_to_line)`` for each
    stop within ``LEG_GEOMETRY_MAX_STOP_DISTANCE_DEG``, in stop order.
    """
    found: list[tuple[int, float, float]] = []
    for stop_index, coord in enumerate(stop_coords):
        stop_point = Point(coord)
        gap = line.distance(stop_point)
        if gap <= LEG_GEOMETRY_MAX_STOP_DISTANCE_DEG:
            measure = line.project(stop_point)
            found.append((stop_index, measure, gap))
    return found
def _partial_line_measure_range(
    line: LineString,
    stop_coords: list[tuple[float, float]],
    matches: list[tuple[int, float, float]],
) -> tuple[int | None, float | None, int | None, float | None]:
    """Choose the stretch of ``line`` to splice in when it only partly covers the stops.

    ``matches`` are ``(stop_index, measure, distance)`` tuples in stop order, as
    produced by ``_stop_projection_matches``. Returns
    ``(start_stop_index, start_measure, end_stop_index, end_measure)``, or four
    ``None`` values when no usable range exists. ``start_measure`` may exceed
    ``end_measure`` when the stops run against the line's direction; the
    caller orients the result.

    - No direction (a single match, or first and last matches projecting to
      the same measure): only the first match is used. The range runs from it
      toward whichever line end is closer to the next stop, or, for the last
      stop, to the previous stop. Both indices are that stop's index.
    - Otherwise the range runs from the first to the last match. Each end is
      pushed out to the line's start or end when that end point is closer to
      the neighbouring unmatched stop.
    """
    first_match = matches[0]
    last_match = matches[-1]
    direction = _projection_direction(matches)
    start_index = first_match[0]
    end_index = last_match[0]
    start_measure = first_match[1]
    end_measure = last_match[1]
    if direction is None:
        only_index, only_measure, _ = first_match
        if only_index < len(stop_coords) - 1:
            # Extend toward the next stop; the range starts at the matched stop.
            endpoint = _line_endpoint_toward(line, only_measure, stop_coords[only_index + 1])
            if endpoint is None:
                return None, None, None, None
            start_index = only_index
            end_index = only_index
            start_measure = only_measure
            end_measure = endpoint
        elif only_index > 0:
            # Last stop matched: extend back toward the previous stop; the range ends at it.
            endpoint = _line_endpoint_toward(line, only_measure, stop_coords[only_index - 1])
            if endpoint is None:
                return None, None, None, None
            start_index = only_index
            end_index = only_index
            start_measure = endpoint
            end_measure = only_measure
        else:
            return None, None, None, None
    elif direction > 0:
        # Stops follow the line: the start may move to measure 0, the end to the line's length.
        if start_index > 0:
            endpoint = _line_endpoint_toward(line, start_measure, stop_coords[start_index - 1], preferred="before")
            if endpoint is not None:
                start_measure = endpoint
        if end_index < len(stop_coords) - 1:
            endpoint = _line_endpoint_toward(line, end_measure, stop_coords[end_index + 1], preferred="after")
            if endpoint is not None:
                end_measure = endpoint
    else:
        # Stops run against the line: mirror image of the forward case.
        if end_index < len(stop_coords) - 1:
            endpoint = _line_endpoint_toward(line, end_measure, stop_coords[end_index + 1], preferred="before")
            if endpoint is not None:
                end_measure = endpoint
        if start_index > 0:
            endpoint = _line_endpoint_toward(line, start_measure, stop_coords[start_index - 1], preferred="after")
            if endpoint is not None:
                start_measure = endpoint
    return start_index, start_measure, end_index, end_measure
def _projection_direction(matches: list[tuple[int, float, float]]) -> int | None:
if len(matches) < 2:
return None
first = matches[0][1]
last = matches[-1][1]
if abs(last - first) <= 1e-12:
return None
return 1 if last > first else -1
def _line_endpoint_toward(
    line: LineString,
    from_measure: float,
    target_coord: tuple[float, float],
    preferred: str | None = None,
) -> float | None:
    """Pick a line end (measure 0 or the full length) that gets closer to ``target_coord``.

    ``preferred`` restricts the choice: ``"before"`` allows only the start,
    ``"after"`` only the end, and ``None`` both. An end is eligible only if it
    lies strictly beyond ``from_measure`` in its direction. Returns the
    eligible end nearest the target, or ``None`` when none is eligible or that
    end is not strictly closer to the target than the point at ``from_measure``.
    """
    target = Point(target_coord)
    eligible: list[float] = []
    if preferred in {None, "before"} and from_measure > 1e-12:
        eligible.append(0.0)
    if preferred in {None, "after"} and from_measure < line.length - 1e-12:
        eligible.append(float(line.length))
    if not eligible:
        return None
    def gap_at(measure: float) -> float:
        return line.interpolate(measure).distance(target)
    baseline = gap_at(from_measure)
    nearest_end = min(eligible, key=gap_at)
    return None if gap_at(nearest_end) >= baseline else nearest_end
def _coords_equal(left: tuple[float, float], right: tuple[float, float]) -> bool:
return abs(left[0] - right[0]) < 1e-12 and abs(left[1] - right[1]) < 1e-12
def _append_coord(coords: list[tuple[float, float]], lon: float | None, lat: float | None) -> None:
if lon is None or lat is None:
return
coord = (float(lon), float(lat))
if coords and abs(coords[-1][0] - coord[0]) < 1e-12 and abs(coords[-1][1] - coord[1]) < 1e-12:
return
coords.append(coord)
def _prepend_coord(coords: list[tuple[float, float]], lon: float | None, lat: float | None) -> None:
if lon is None or lat is None:
return
coord = (float(lon), float(lat))
if coords and abs(coords[0][0] - coord[0]) < 1e-12 and abs(coords[0][1] - coord[1]) < 1e-12:
return
coords.insert(0, coord)
def _stop_payload_coord(stop: dict) -> tuple[float, float] | None:
    """Extract a float ``(lon, lat)`` pair from a stop payload dict.

    The ``visual_lon``/``visual_lat`` keys win when present, with ``lon``/``lat`` used
    only when the visual key is absent. Returns None if either value is missing or cannot
    be converted to float.
    """
    raw_lon = stop.get("visual_lon", stop.get("lon"))
    raw_lat = stop.get("visual_lat", stop.get("lat"))
    lon, lat = _float_or_none(raw_lon), _float_or_none(raw_lat)
    if lon is not None and lat is not None:
        return (lon, lat)
    return None
def _float_or_none(value) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _segment_between_stops(geom, from_stop: StopSummary, to_stop: StopSummary) -> LineString | None:
start_point = Point(from_stop.lon, from_stop.lat)
end_point = Point(to_stop.lon, to_stop.lat)
if geom.distance(start_point) > LEG_GEOMETRY_MAX_STOP_DISTANCE_DEG:
return None
if geom.distance(end_point) > LEG_GEOMETRY_MAX_STOP_DISTANCE_DEG:
return None
if isinstance(geom, LineString):
return _substring_for_points(geom, start_point, end_point)
if isinstance(geom, MultiLineString):
merged = linemerge(geom)
if isinstance(merged, LineString):
return _substring_for_points(merged, start_point, end_point)
if isinstance(merged, MultiLineString):
path = _network_path_for_points(merged, start_point, end_point)
if path is not None:
return path
line = _best_line_for_points(merged, start_point, end_point)
if line is not None:
return _substring_for_points(line, start_point, end_point)
return None
def _substring_for_points(line: LineString, start_point: Point, end_point: Point) -> LineString | None:
    """Cut *line* between the projections of two points, oriented from start to end.

    Returns None for a zero-length line, when both points project to the same measure
    (within 1e-12), or when the cut is empty, has zero length, or is not a LineString.
    If the start projects after the end, the cut's coordinates are reversed so the result
    still runs from *start_point* toward *end_point*.
    """
    if line.length == 0:
        return None
    from_measure = line.project(start_point)
    to_measure = line.project(end_point)
    if abs(from_measure - to_measure) <= 1e-12:
        return None
    piece = substring(line, min(from_measure, to_measure), max(from_measure, to_measure))
    if piece.is_empty or piece.length == 0:
        return None
    if not isinstance(piece, LineString):
        return None
    if from_measure > to_measure:
        piece = LineString(list(piece.coords)[::-1])
    return piece
def _network_path_for_points(geom: MultiLineString, start_point: Point, end_point: Point) -> LineString | None:
    """Route between two points along the segments of a MultiLineString.

    Each vertex becomes a graph node keyed by its coordinates rounded to six decimals, so
    vertices that (nearly) coincide across parts are joined. The first exact coordinate
    seen for a key is kept as that node's position. Consecutive vertices of a part are
    linked in both directions, weighted by their planar distance; zero-length links are
    dropped. Both points are snapped to their nearest node, a shortest node path is found,
    and the result runs from *start_point* through the path to *end_point*. Consecutive
    vertices no more than 1e-10 apart are merged. Returns None when no usable path exists.
    """
    vertex_for_key: dict[tuple[float, float], tuple[float, float]] = {}
    edges: dict[tuple[float, float], list[tuple[tuple[float, float], float]]] = {}

    def register(coord) -> tuple[float, float]:
        snapped = (round(float(coord[0]), 6), round(float(coord[1]), 6))
        vertex_for_key.setdefault(snapped, (float(coord[0]), float(coord[1])))
        edges.setdefault(snapped, [])
        return snapped

    for part in geom.geoms:
        vertices = list(part.coords)
        for head, tail in zip(vertices, vertices[1:]):
            head_key = register(head)
            tail_key = register(tail)
            length = Point(vertex_for_key[head_key]).distance(Point(vertex_for_key[tail_key]))
            if length != 0:
                edges[head_key].append((tail_key, length))
                edges[tail_key].append((head_key, length))

    if not vertex_for_key:
        return None
    source = _nearest_graph_node(vertex_for_key, start_point)
    target = _nearest_graph_node(vertex_for_key, end_point)
    if source is None or target is None:
        return None
    route = _shortest_path(edges, source, target)
    if not route:
        return None

    ordered = [
        (start_point.x, start_point.y),
        *(vertex_for_key[node] for node in route),
        (end_point.x, end_point.y),
    ]
    line_coords: list[tuple[float, float]] = []
    for coord in ordered:
        # "not >" (rather than "<=") keeps NaN distances behaving as in the original.
        if line_coords and not Point(line_coords[-1]).distance(Point(coord)) > 1e-10:
            continue
        line_coords.append(coord)
    return LineString(line_coords) if len(line_coords) >= 2 else None
def _nearest_graph_node(nodes: dict[tuple[float, float], tuple[float, float]], point: Point) -> tuple[float, float] | None:
if not nodes:
return None
return min(nodes, key=lambda node: Point(nodes[node]).distance(point))
def _shortest_path(
graph: dict[tuple[float, float], list[tuple[tuple[float, float], float]]],
start: tuple[float, float],
end: tuple[float, float],
) -> list[tuple[float, float]] | None:
unvisited = {start}
distances = {start: 0.0}
previous: dict[tuple[float, float], tuple[float, float]] = {}
visited: set[tuple[float, float]] = set()
while unvisited:
current = min(unvisited, key=lambda node: distances.get(node, float("inf")))
unvisited.remove(current)
if current == end:
break
visited.add(current)
for neighbor, weight in graph.get(current, []):
if neighbor in visited:
continue
candidate = distances[current] + weight
if candidate < distances.get(neighbor, float("inf")):
distances[neighbor] = candidate
previous[neighbor] = current
unvisited.add(neighbor)
if end not in distances:
return None
path = [end]
while path[-1] != start:
parent = previous.get(path[-1])
if parent is None:
return None
path.append(parent)
path.reverse()
return path
def _best_line_for_points(geom: MultiLineString, start: Point, end: Point) -> LineString | None:
return min(geom.geoms, key=lambda line: line.distance(start) + line.distance(end), default=None)
def _leg_stop_payloads(
    db: Session,
    dataset_id: int,
    trip_id: str,
    start_sequence: int,
    end_sequence: int,
    stop_cache: dict[tuple[int, str], StopSummary],
    osm_stop_cache: dict[tuple[int, str], dict],
) -> list[dict]:
    """Build visual stop payloads for the stop times of *trip_id* in a sequence range.

    The bounds go straight to ``stop_times_for_trip_range``; whether they are inclusive
    depends on that helper. Stop summaries are resolved through *stop_cache* and OSM
    matches through *osm_stop_cache*. Payloads keep the row order returned by the helper.
    """
    return [
        _visual_stop_payload(
            db,
            _stop_for_id(db, dataset_id, stop_time.stop_id, stop_cache),
            stop_time.stop_sequence,
            osm_stop_cache,
        )
        for stop_time in stop_times_for_trip_range(db, dataset_id, trip_id, start_sequence, end_sequence)
    ]
def _visual_stop_payload(db: Session, stop: StopSummary, stop_sequence: int, osm_stop_cache: dict[tuple[int, str], dict]) -> dict:
    """Build a stop payload with the best available display position.

    Position precedence:
    1. a linked canonical stop with coordinates (whose non-empty name also replaces the
       stop name);
    2. the nearest OSM stop feature, looked up once per (dataset_id, stop_id) and memoised
       in *osm_stop_cache*, where an empty dict records "no match";
    3. the GTFS stop coordinates.

    ``visual_source`` records which source was used.
    """
    payload = _stop_payload(stop)
    payload.update(
        stop_sequence=stop_sequence,
        visual_source="gtfs",
        visual_lon=stop.lon,
        visual_lat=stop.lat,
        osm=None,
        canonical_stop=None,
    )

    canonical = _canonical_visual_stop(db, stop)
    if canonical is not None:
        payload.update(
            visual_source="canonical_stop",
            visual_lon=canonical["lon"],
            visual_lat=canonical["lat"],
            canonical_stop={"id": canonical["id"], "name": canonical["name"]},
        )
        if canonical["name"]:
            payload["name"] = canonical["name"]
        return payload

    cache_key = (stop.dataset_id, stop.stop_id)
    if cache_key not in osm_stop_cache:
        osm_stop_cache[cache_key] = _nearest_osm_stop(db, stop) or {}
    osm = osm_stop_cache[cache_key]
    if not osm:
        return payload

    payload.update(
        visual_source="osm",
        visual_lon=osm["lon"],
        visual_lat=osm["lat"],
        osm={field: osm[field] for field in ("id", "dataset_id", "osm_type", "osm_id", "name", "distance_m")},
    )
    return payload
def _canonical_visual_stop(db: Session, stop: StopSummary) -> dict | None:
    """Return id/name/lon/lat of the canonical stop linked to a GTFS stop row, if usable.

    Uses the ``gtfs_stop`` CanonicalStopLink with the lowest link id. Returns None when
    the stop has no row id (a falsy id such as the 0 placeholder), no link exists, the
    canonical stop is missing, or it lacks coordinates.
    """
    if not stop.id:
        return None
    link_query = (
        select(CanonicalStopLink)
        .where(CanonicalStopLink.object_type == "gtfs_stop", CanonicalStopLink.object_id == stop.id)
        .order_by(CanonicalStopLink.id)
    )
    link = db.scalar(link_query)
    canonical = None if link is None else db.get(CanonicalStop, link.canonical_stop_id)
    if canonical is None or canonical.lon is None or canonical.lat is None:
        return None
    return dict(id=canonical.id, name=canonical.name, lon=canonical.lon, lat=canonical.lat)
def _nearest_osm_stop(db: Session, stop: StopSummary) -> dict | None:
    """Find the closest stop/station/terminal feature in the active OSM datasets.

    Candidates come from a square bounding box of +/- OSM_STOP_MATCH_RADIUS_DEG around the
    stop (at most 80 features). Each candidate is measured by planar degree distance from
    its point geometry, or from a representative point for other geometries. Features with
    missing or unparsable geometry are skipped. Returns None when the stop has no
    coordinates, no OSM dataset is active, or the best match is farther than the radius.

    NOTE(review): ``distance_m`` is ``distance_deg * 111_320`` on a raw lon/lat Euclidean
    distance. Longitude degrees are not scaled by cos(latitude), so away from the equator
    east-west offsets are overstated.
    """
    if stop.lon is None or stop.lat is None:
        return None
    dataset_query = select(Dataset.id).where(Dataset.is_active.is_(True), Dataset.kind == "osm_geojson")
    osm_dataset_ids = [dataset_id for (dataset_id,) in db.execute(dataset_query).all()]
    if not osm_dataset_ids:
        return None

    radius = OSM_STOP_MATCH_RADIUS_DEG
    search_box = (stop.lon - radius, stop.lat - radius, stop.lon + radius, stop.lat + radius)
    candidates = query_osm_features(
        db,
        osm_dataset_ids,
        kinds=["stop", "station", "terminal"],
        bbox=search_box,
        limit=80,
    )

    stop_point = Point(stop.lon, stop.lat)
    closest = None
    for feature in candidates:
        if not feature.geometry_geojson:
            continue
        try:
            geometry = shape(json.loads(feature.geometry_geojson))
        except Exception:  # noqa: BLE001 - ignore malformed feature geometry in visual stop matching
            continue
        anchor = geometry if isinstance(geometry, Point) else geometry.representative_point()
        distance_deg = anchor.distance(stop_point)
        # Only a strictly closer feature replaces the current best (first one wins ties).
        if closest is not None and not distance_deg < closest["distance_deg"]:
            continue
        closest = {
            "id": feature.id,
            "dataset_id": feature.dataset_id,
            "osm_type": feature.osm_type,
            "osm_id": feature.osm_id,
            "name": feature.name,
            "lon": anchor.x,
            "lat": anchor.y,
            "distance_deg": distance_deg,
            "distance_m": round(distance_deg * 111_320, 1),
        }

    if closest is None or closest["distance_deg"] > radius:
        return None
    del closest["distance_deg"]
    return closest
def _journey_stop_features(legs: list[dict]) -> list[dict]:
features_by_key: dict[str, dict] = {}
for leg_index, leg in enumerate(legs, start=1):
stops = leg.get("stops", [])
for stop_index, stop in enumerate(stops):
lon = stop.get("visual_lon")
lat = stop.get("visual_lat")
if lon is None or lat is None:
continue
role = "passed"
if leg_index == 1 and stop_index == 0:
role = "start"
elif leg_index == len(legs) and stop_index == len(stops) - 1:
role = "end"
elif (stop_index == len(stops) - 1 and leg_index < len(legs)) or (stop_index == 0 and leg_index > 1):
role = "transfer"
key = f"{stop['dataset_id']}:{stop['stop_id']}:{round(float(lon), 6)}:{round(float(lat), 6)}"
current = features_by_key.get(key)
if current is not None and _stop_role_rank(current["properties"]["role"]) >= _stop_role_rank(role):
continue
features_by_key[key] = {
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [lon, lat]},
"properties": {
"feature_type": "journey_stop",
"role": role,
"leg": leg_index,
"route_ref": leg.get("route_ref"),
"mode": leg.get("mode"),
"stop_id": stop.get("stop_id"),
"name": stop.get("name"),
"visual_source": stop.get("visual_source"),
"canonical_stop_id": (stop.get("canonical_stop") or {}).get("id"),
"osm_id": (stop.get("osm") or {}).get("osm_id"),
},
}
return list(features_by_key.values())
def _stop_role_rank(role: str) -> int:
return {"passed": 0, "transfer": 1, "start": 2, "end": 2}.get(role, 0)
def _arrival_seconds(stop_time: GtfsStopTime) -> int | None:
return stop_time.arrival_seconds if stop_time.arrival_seconds is not None else parse_gtfs_time(stop_time.arrival_time or stop_time.departure_time)
def _departure_seconds(stop_time: GtfsStopTime) -> int | None:
return stop_time.departure_seconds if stop_time.departure_seconds is not None else parse_gtfs_time(stop_time.departure_time or stop_time.arrival_time)
def _stop_for_id(db: Session, dataset_id: int, stop_id: str, stop_cache: dict[tuple[int, str], StopSummary]) -> StopSummary:
key = (dataset_id, stop_id)
if key in stop_cache:
return stop_cache[key]
summary = _stop_summary_for_stop_id(db, dataset_id, stop_id)
stop_cache[key] = summary
return summary
def _source_payload_for_dataset_id(db: Session, dataset_id: int) -> dict | None:
    """Return ``{"id", "name", "dataset_id"}`` for the Source owning a dataset, or None."""
    query = (
        select(Source.id, Source.name)
        .join(Dataset, Dataset.source_id == Source.id)
        .where(Dataset.id == dataset_id)
    )
    match = db.execute(query).first()
    if match is None:
        return None
    source_id, source_name = match
    return dict(id=source_id, name=source_name, dataset_id=dataset_id)
def _stop_summary_for_stop_id(db: Session, dataset_id: int, stop_id: str) -> StopSummary:
    """Load a GTFS stop and summarise it.

    If the stop id is not found, returns a placeholder summary instead: id 0, the stop id
    used as name, and no coordinates.
    """
    row = db.scalar(select(GtfsStop).where(GtfsStop.dataset_id == dataset_id, GtfsStop.stop_id == stop_id))
    if row is not None:
        return _stop_summary(row)
    return StopSummary(id=0, dataset_id=dataset_id, stop_id=stop_id, name=stop_id, lat=None, lon=None)
def _stop_summary(stop: GtfsStop) -> StopSummary:
    """Copy a GTFS stop row's identity, name and coordinates into a StopSummary."""
    copied_fields = ("id", "dataset_id", "stop_id", "name", "lat", "lon")
    return StopSummary(**{field: getattr(stop, field) for field in copied_fields})
def _stop_payload(stop: StopSummary) -> dict:
return {
"id": stop.id,
"dataset_id": stop.dataset_id,
"stop_id": stop.stop_id,
"name": stop.name,
"lat": stop.lat,
"lon": stop.lon,
}
def _active_gtfs_dataset_ids(db: Session, source_ids: Optional[list[int]] = None) -> list[int]:
    """Return the ids of active GTFS datasets.

    When *source_ids* is non-empty, only datasets from those sources are returned. None or
    an empty list applies no source filter.
    """
    conditions = [Dataset.is_active.is_(True), Dataset.kind == "gtfs"]
    if source_ids:
        conditions.append(Dataset.source_id.in_(source_ids))
    rows = db.execute(select(Dataset.id).where(*conditions)).all()
    return [dataset_id for (dataset_id,) in rows]
def _journey_leg_signature(leg: dict) -> str:
return "|".join(
str(part or "")
for part in [
leg.get("dataset_id"),
leg.get("route_id"),
leg.get("route_ref"),
leg.get("from", {}).get("name") or leg.get("from", {}).get("stop_id"),
leg.get("to", {}).get("name") or leg.get("to", {}).get("stop_id"),
leg.get("departure_seconds") or leg.get("departure_time"),
leg.get("arrival_seconds") or leg.get("arrival_time"),
]
)