Files
meubility-workbench/app/routing.py
2026-07-01 23:29:51 +02:00

912 lines
33 KiB
Python

from __future__ import annotations
import copy
import heapq
import json
import math
import threading
import time
from collections import OrderedDict
from dataclasses import dataclass
from sqlalchemy import func, select, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.config import settings
from app.models import Dataset, RoutingEdge, RoutingNode
from app.pipeline.routing_layer import active_routing_dataset
from app.serializers import feature_collection
# Speeds (m/s) that turn straight-line distance into the A* time heuristic.
WALK_HEURISTIC_MPS = 1.6
DRIVE_HEURISTIC_MPS = 36.0
# Default cap on how many graph nodes the Python A* search may expand.
DEFAULT_MAX_VISITED = 160_000
# Bounding-box paddings (km) tried in order by the pgRouting search, per mode.
PGR_WALK_BBOX_PADDING_KM = [0.5, 1.5, 4, 10, 25]
PGR_DRIVE_BBOX_PADDING_KM = [2, 8, 25, 75, 200]
# Transaction-local statement_timeout (ms) set before each pgRouting attempt.
PGR_WALK_STATEMENT_TIMEOUT_MS = 2_500
PGR_DRIVE_STATEMENT_TIMEOUT_MS = 7_500
# Route cache: lifetime of an entry and maximum number of entries kept.
ROUTE_CACHE_TTL_SECONDS = 15 * 60
ROUTE_CACHE_MAX_ENTRIES = 512
# Lock held while reading or mutating _route_cache.
_route_cache_lock = threading.RLock()
# cache key -> (time.monotonic() expiry, payload); least recently used first.
_route_cache: OrderedDict[tuple[object, ...], tuple[float, dict[str, object]]] = OrderedDict()
@dataclass(frozen=True)
class _GraphNode:
    """Routing-graph node that a query point was matched to."""

    # OSM node id; used as the vertex id in both routing engines.
    osm_node_id: int
    # Node position in degrees.
    lon: float
    lat: float
    # Metres between the node and the query point it was matched to
    # (0.0 when the database returned NULL for the distance).
    distance_m: float
@dataclass(frozen=True)
class _Traversal:
    """One routing edge as traversed in a specific direction."""

    # Primary key of the routing_edges row.
    edge_id: int
    # OSM node ids in travel order (not necessarily the stored source/target).
    from_node: int
    to_node: int
    # Coordinates of the travel-order endpoints, in degrees.
    from_lon: float
    from_lat: float
    to_lon: float
    to_lat: float
    # Cost in seconds for this direction of travel.
    cost_s: float
    # Edge length in metres.
    length_m: float
    highway: str | None
    name: str | None
    # GeoJSON geometry text in stored (source -> target) order.
    geometry_geojson: str
    # True when travelling target -> source, i.e. against the stored order.
    reversed: bool
def routing_status(db: Session) -> dict[str, object]:
    """Describe the active routing dataset and the routing engine in use.

    Returns a dict with the dataset id (or ``None``), node and edge counts,
    an ``available`` flag (true when there is at least one edge), the engine
    name, and whether the pgRouting extension is available / installed.
    The extension checks only run on PostgreSQL databases.
    """
    dataset = active_routing_dataset(db)
    dataset_id = int(dataset.id) if dataset is not None else None

    if dataset_id is None:
        node_count, edge_count = 0, 0
    else:
        node_count, edge_count = _routing_status_counts(db, dataset, dataset_id)

    pgrouting_available = False
    pgrouting_installed = False
    if settings.is_postgresql_database:
        available_row = db.execute(
            text("SELECT EXISTS (SELECT 1 FROM pg_available_extensions WHERE name = 'pgrouting')")
        )
        pgrouting_available = bool(available_row.scalar())
        installed_row = db.execute(
            text("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgrouting')")
        )
        pgrouting_installed = bool(installed_row.scalar())

    engine = "pgrouting" if pgrouting_installed else "python_astar"
    return {
        "dataset_id": dataset_id,
        "nodes": node_count,
        "edges": edge_count,
        "available": edge_count > 0,
        "engine": engine,
        "pgrouting_available": pgrouting_available,
        "pgrouting_installed": pgrouting_installed,
    }
def _routing_status_counts(db: Session, dataset: Dataset, dataset_id: int) -> tuple[int, int]:
    """Return ``(node_count, edge_count)`` for the routing status report.

    Sources are tried from cheapest to most expensive:

    1. the ``routing_layer`` entry of the dataset metadata, when it holds a
       non-zero ``nodes`` or ``edges`` value;
    2. on PostgreSQL, the planner's ``pg_class.reltuples`` estimates for the
       ``routing_nodes`` / ``routing_edges`` tables (whole-table figures, not
       filtered by dataset);
    3. exact ``COUNT(*)`` queries restricted to ``dataset_id``.
    """
    routing_layer = _metadata(dataset).get("routing_layer")
    if isinstance(routing_layer, dict):
        try:
            nodes = int(routing_layer.get("nodes") or 0)
            edges = int(routing_layer.get("edges") or 0)
        except (TypeError, ValueError):
            nodes = 0
            edges = 0
        if nodes or edges:
            return nodes, edges

    if settings.is_postgresql_database:
        rows = db.execute(
            text(
                """
                SELECT relname, COALESCE(reltuples, 0)::bigint AS estimate
                FROM pg_class
                WHERE oid IN ('routing_nodes'::regclass, 'routing_edges'::regclass)
                """
            )
        ).mappings()
        estimates = {str(row["relname"]): int(row["estimate"] or 0) for row in rows}
        node_estimate = estimates.get("routing_nodes", 0)
        edge_estimate = estimates.get("routing_edges", 0)
        # PostgreSQL 14+ stores reltuples = -1 for a table that has never been
        # vacuumed or analysed ("row count unknown"). Reporting that as-is
        # would yield negative counts, so fall through to the exact counts.
        if node_estimate >= 0 and edge_estimate >= 0:
            return node_estimate, edge_estimate

    node_count = int(
        db.scalar(select(func.count()).select_from(RoutingNode).where(RoutingNode.dataset_id == dataset_id)) or 0
    )
    edge_count = int(
        db.scalar(select(func.count()).select_from(RoutingEdge).where(RoutingEdge.dataset_id == dataset_id)) or 0
    )
    return node_count, edge_count
def _metadata(dataset: Dataset) -> dict[str, object]:
    """Decode ``dataset.metadata_json`` into a dict.

    Returns an empty dict when the field is empty, is not valid JSON, or
    decodes to anything other than a JSON object.
    """
    raw = dataset.metadata_json
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    if isinstance(parsed, dict):
        return parsed
    return {}
def route_between_points(
    db: Session,
    *,
    from_lon: float,
    from_lat: float,
    to_lon: float,
    to_lat: float,
    mode: str = "walk",
    dataset_id: int | None = None,
    max_visited: int = DEFAULT_MAX_VISITED,
) -> dict[str, object]:
    """Compute a route between two coordinates over the routing graph.

    Both points are matched to their nearest usable graph node. The route is
    then produced by pgRouting when it is installed on a PostgreSQL database,
    and otherwise (or when pgRouting fails or finds nothing) by an in-process
    A* search that loads outgoing edges node by node.

    Successful payloads are stored in the module-level route cache, and a
    cache hit is returned before any graph query is made.

    Args:
        db: Database session used for every graph query.
        from_lon, from_lat: Origin, in degrees.
        to_lon, to_lat: Destination, in degrees.
        mode: ``"walk"`` or ``"drive"``.
        dataset_id: Dataset to route on; the active routing dataset when None.
        max_visited: Upper bound on nodes expanded by the A* fallback
            (values below 1 are treated as 1).

    Returns:
        The route payload dict (see ``_route_payload``).

    Raises:
        ValueError: For an unknown mode, a missing dataset, no nearby graph
            node, or when the A* search ends without reaching the target.
    """
    if mode not in {"walk", "drive"}:
        raise ValueError("mode must be walk or drive")
    dataset = db.get(Dataset, dataset_id) if dataset_id is not None else active_routing_dataset(db)
    if dataset is None:
        raise ValueError("No routing dataset is available.")
    dataset_id = int(dataset.id)

    cache_key = _route_cache_key(dataset_id, mode, from_lon, from_lat, to_lon, to_lat)
    cached = _route_cache_get(cache_key)
    if cached is not None:
        return cached

    start = _nearest_node(db, dataset_id, from_lon, from_lat, mode)
    target = _nearest_node(db, dataset_id, to_lon, to_lat, mode)
    if start is None or target is None:
        raise ValueError("Routing graph has no nearby nodes for the requested mode.")

    if start.osm_node_id == target.osm_node_id:
        # Both points match the same node: there is no network leg to search.
        payload = _single_point_route(start, from_lon, from_lat, to_lon, to_lat, mode, dataset_id)
        _route_cache_put(cache_key, payload)
        return payload

    if settings.is_postgresql_database and _pgrouting_installed(db):
        try:
            payload = _route_with_pgrouting(
                db,
                dataset_id=dataset_id,
                mode=mode,
                start=start,
                target=target,
                from_lon=from_lon,
                from_lat=from_lat,
                to_lon=to_lon,
                to_lat=to_lat,
            )
        except ValueError:
            # pgRouting found nothing in its bounded area; try A* instead.
            pass
        except SQLAlchemyError:
            # e.g. statement timeout: reset the session before the fallback.
            db.rollback()
        else:
            _route_cache_put(cache_key, payload)
            return payload

    # A* fallback. Queue entries are (cost + heuristic, cost, node id).
    heuristic_mps = WALK_HEURISTIC_MPS if mode == "walk" else DRIVE_HEURISTIC_MPS
    visit_limit = max(1, max_visited)
    queue: list[tuple[float, float, int]] = [(0.0, 0.0, start.osm_node_id)]
    costs: dict[int, float] = {start.osm_node_id: 0.0}
    previous: dict[int, tuple[int, _Traversal]] = {}
    visited: set[int] = set()
    while queue and len(visited) < visit_limit:
        _, cost, node_id = heapq.heappop(queue)
        if node_id in visited:
            # Stale queue entry for a node that was already expanded.
            continue
        visited.add(node_id)
        if node_id == target.osm_node_id:
            payload = _route_payload(
                dataset_id=dataset_id,
                mode=mode,
                start=start,
                target=target,
                from_lon=from_lon,
                from_lat=from_lat,
                to_lon=to_lon,
                to_lat=to_lat,
                previous=previous,
                total_cost_s=cost,
                visited=len(visited),
            )
            _route_cache_put(cache_key, payload)
            return payload
        # Each node is expanded at most once, so its edges are fetched once.
        for edge in _outgoing_edges(db, dataset_id, node_id, mode):
            next_cost = cost + edge.cost_s
            if next_cost >= costs.get(edge.to_node, float("inf")):
                continue
            costs[edge.to_node] = next_cost
            previous[edge.to_node] = (node_id, edge)
            heuristic = _distance_m(edge.to_lat, edge.to_lon, target.lat, target.lon) / heuristic_mps
            heapq.heappush(queue, (next_cost + heuristic, next_cost, edge.to_node))
    raise ValueError(f"No {mode} route found within {max_visited:,} visited graph nodes.")
def direct_route_between_points(
    db: Session,
    *,
    from_lon: float,
    from_lat: float,
    to_lon: float,
    to_lat: float,
    mode: str = "walk",
    dataset_id: int | None = None,
    reason: str | None = None,
) -> dict[str, object]:
    """Build a straight-line route payload that bypasses the routing graph.

    The payload carries the resolved dataset id, or ``0`` when no dataset
    could be resolved. A non-empty ``reason`` is attached under ``"warning"``.

    Raises:
        ValueError: If ``mode`` is neither ``"walk"`` nor ``"drive"``.
    """
    if mode not in {"walk", "drive"}:
        raise ValueError("mode must be walk or drive")

    if dataset_id is None:
        dataset = active_routing_dataset(db)
    else:
        dataset = db.get(Dataset, dataset_id)
    resolved_id = int(dataset.id) if dataset is not None else 0

    payload = _direct_route_payload(
        dataset_id=resolved_id,
        mode=mode,
        from_lon=float(from_lon),
        from_lat=float(from_lat),
        to_lon=float(to_lon),
        to_lat=float(to_lat),
    )
    if reason:
        payload["warning"] = reason
    return payload
def snap_point_to_routing_graph(
    db: Session,
    *,
    lon: float,
    lat: float,
    mode: str = "walk",
    dataset_id: int | None = None,
    max_distance_m: float = 250,
) -> dict[str, object] | None:
    """Snap a coordinate onto the routing graph for ``mode``.

    On PostgreSQL the point is snapped to the closest routing edge. On other
    databases it is snapped to the nearest routing node, and only when that
    node lies within ``max_distance_m``.

    Returns:
        A dict describing the snapped location, or ``None`` when no dataset
        is available or nothing lies within range.

    Raises:
        ValueError: If ``mode`` is neither ``"walk"`` nor ``"drive"``.
    """
    if mode not in {"walk", "drive"}:
        raise ValueError("mode must be walk or drive")

    if dataset_id is None:
        dataset = active_routing_dataset(db)
    else:
        dataset = db.get(Dataset, dataset_id)
    if dataset is None:
        return None
    dataset_id = int(dataset.id)

    if settings.is_postgresql_database:
        return _snap_point_to_routing_edge_postgresql(
            db,
            dataset_id=dataset_id,
            lon=float(lon),
            lat=float(lat),
            mode=mode,
            max_distance_m=float(max_distance_m),
        )

    node = _nearest_node(db, dataset_id, float(lon), float(lat), mode)
    if node is None:
        return None
    if node.distance_m > max_distance_m:
        return None
    return {
        "dataset_id": dataset_id,
        "lon": node.lon,
        "lat": node.lat,
        "distance_m": round(node.distance_m, 1),
        "source": "routing_node",
        "osm_node_id": node.osm_node_id,
    }
def _snap_point_to_routing_edge_postgresql(
    db: Session,
    *,
    dataset_id: int,
    lon: float,
    lat: float,
    mode: str,
    max_distance_m: float,
) -> dict[str, object] | None:
    """Snap a point to the best routing edge within ``max_distance_m``.

    Each edge is approximated by the straight segment between its source and
    target coordinates. Candidates are narrowed with a bounding-box overlap
    test, filtered exactly with ``ST_DWithin`` on geography, and ranked by
    distance plus a walk-only penalty that disfavours service roads and
    larger road classes. For walking, service edges tagged as driveway,
    parking aisle or drive-through are excluded.

    Returns:
        A dict with the snapped coordinate, its distance from the input point
        in metres and the chosen edge's attributes, or ``None`` when no edge
        qualifies.
    """
    cost_column = "walk_cost_s" if mode == "walk" else "drive_cost_s"
    reverse_cost_column = "reverse_walk_cost_s" if mode == "walk" else "reverse_drive_cost_s"
    # Size of the prefilter box in degrees. A degree of longitude shrinks with
    # cos(latitude), so the east-west radius is widened accordingly; using the
    # latitude radius for both axes would drop valid edges away from the
    # equator. The cosine is clamped to keep the box finite near the poles.
    lat_radius_deg = max_distance_m / 111_320
    lon_radius_deg = lat_radius_deg / max(0.01, math.cos(math.radians(lat)))
    row = db.execute(
        text(
            f"""
            WITH point AS (
                SELECT ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) AS geom
            ),
            edges AS MATERIALIZED (
                SELECT
                    edge.id,
                    edge.highway,
                    edge.name,
                    CASE
                        WHEN edge.tags_json IS NULL OR edge.tags_json = '' THEN NULL
                        ELSE edge.tags_json::jsonb ->> 'service'
                    END AS service,
                    edge.source_osm_node_id,
                    edge.target_osm_node_id,
                    ST_SetSRID(
                        ST_MakeLine(
                            ST_MakePoint(edge.source_lon, edge.source_lat),
                            ST_MakePoint(edge.target_lon, edge.target_lat)
                        ),
                        4326
                    ) AS edge_geom
                FROM routing_edges AS edge
                CROSS JOIN point
                WHERE edge.dataset_id = :dataset_id
                    AND (edge.{cost_column} IS NOT NULL OR edge.{reverse_cost_column} IS NOT NULL)
                    AND box(point(edge.max_lon, edge.max_lat), point(edge.min_lon, edge.min_lat))
                        && box(
                            point(:lon + :lon_radius_deg, :lat + :lat_radius_deg),
                            point(:lon - :lon_radius_deg, :lat - :lat_radius_deg)
                        )
            ),
            candidate AS (
                SELECT
                    edges.id,
                    edges.highway,
                    edges.name,
                    edges.service,
                    edges.source_osm_node_id,
                    edges.target_osm_node_id,
                    ST_ClosestPoint(edges.edge_geom, point.geom) AS snapped_geom,
                    ST_DistanceSphere(edges.edge_geom, point.geom) AS distance_m,
                    CASE
                        WHEN edges.highway IN ('footway', 'pedestrian', 'steps') THEN 0
                        WHEN edges.highway IN ('path', 'cycleway', 'bridleway') THEN 1
                        WHEN edges.highway IN ('living_street', 'residential') THEN 2
                        WHEN edges.highway = 'service' THEN 3
                        ELSE 4
                    END AS highway_rank,
                    CASE
                        WHEN :mode != 'walk' THEN 0
                        WHEN edges.highway = 'service' THEN 20
                        WHEN edges.highway IN ('primary', 'primary_link', 'secondary', 'secondary_link') THEN 10
                        WHEN edges.highway IN ('tertiary', 'tertiary_link', 'unclassified', 'road') THEN 5
                        ELSE 0
                    END AS snap_penalty_m
                FROM edges
                CROSS JOIN point
                WHERE ST_DWithin(edges.edge_geom::geography, point.geom::geography, :max_distance_m)
                    AND NOT (
                        :mode = 'walk'
                        AND edges.highway = 'service'
                        AND COALESCE(edges.service, '') IN ('driveway', 'parking_aisle', 'drive-through')
                    )
                ORDER BY
                    ST_DistanceSphere(edges.edge_geom, point.geom) + CASE
                        WHEN :mode != 'walk' THEN 0
                        WHEN edges.highway = 'service' THEN 20
                        WHEN edges.highway IN ('primary', 'primary_link', 'secondary', 'secondary_link') THEN 10
                        WHEN edges.highway IN ('tertiary', 'tertiary_link', 'unclassified', 'road') THEN 5
                        ELSE 0
                    END,
                    ST_DistanceSphere(edges.edge_geom, point.geom),
                    highway_rank,
                    edges.id
                LIMIT 1
            )
            SELECT
                id,
                highway,
                name,
                source_osm_node_id,
                target_osm_node_id,
                ST_X(snapped_geom) AS lon,
                ST_Y(snapped_geom) AS lat,
                distance_m
            FROM candidate
            """
        ),
        {
            "dataset_id": dataset_id,
            "lon": lon,
            "lat": lat,
            "lat_radius_deg": lat_radius_deg,
            "lon_radius_deg": lon_radius_deg,
            "max_distance_m": max_distance_m,
            "mode": mode,
        },
    ).mappings().first()
    if row is None:
        return None
    return {
        "dataset_id": dataset_id,
        "lon": float(row["lon"]),
        "lat": float(row["lat"]),
        "distance_m": round(float(row["distance_m"] or 0), 1),
        "source": "routing_edge",
        "edge_id": int(row["id"]),
        "highway": row["highway"],
        "name": row["name"],
        "source_osm_node_id": int(row["source_osm_node_id"]),
        "target_osm_node_id": int(row["target_osm_node_id"]),
    }
def _route_cache_key(dataset_id: int, mode: str, from_lon: float, from_lat: float, to_lon: float, to_lat: float) -> tuple[object, ...]:
    """Build the route-cache key for a request.

    Coordinates are rounded to six decimal places so requests that differ
    only beyond that precision share a cache entry.
    """
    dataset_key = int(dataset_id)
    rounded = [round(float(value), 6) for value in (from_lon, from_lat, to_lon, to_lat)]
    return (dataset_key, mode, *rounded)
def _route_cache_get(key: tuple[object, ...]) -> dict[str, object] | None:
    """Return a deep copy of the cached payload for ``key``, if still fresh.

    An expired entry is removed and treated as a miss. A hit moves the entry
    to the most-recently-used end of the cache.
    """
    now = time.monotonic()
    with _route_cache_lock:
        entry = _route_cache.get(key)
        if entry is None:
            return None
        expires_at, payload = entry
        if expires_at > now:
            _route_cache.move_to_end(key)
            # Copy so callers cannot mutate the cached payload.
            return copy.deepcopy(payload)
        _route_cache.pop(key, None)
        return None
def _route_cache_put(key: tuple[object, ...], payload: dict[str, object]) -> None:
    """Store a deep copy of ``payload`` under ``key`` and trim the cache.

    The entry expires ``ROUTE_CACHE_TTL_SECONDS`` from now. Least recently
    used entries are evicted until at most ``ROUTE_CACHE_MAX_ENTRIES`` remain.
    """
    with _route_cache_lock:
        expires_at = time.monotonic() + ROUTE_CACHE_TTL_SECONDS
        _route_cache[key] = (expires_at, copy.deepcopy(payload))
        _route_cache.move_to_end(key)
        overflow = len(_route_cache) - ROUTE_CACHE_MAX_ENTRIES
        for _ in range(max(0, overflow)):
            _route_cache.popitem(last=False)
def _pgrouting_installed(db: Session) -> bool:
    """Return True when ``pg_extension`` lists the ``pgrouting`` extension."""
    statement = text("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgrouting')")
    installed = db.execute(statement).scalar()
    return bool(installed)
def _route_with_pgrouting(
    db: Session,
    *,
    dataset_id: int,
    mode: str,
    start: _GraphNode,
    target: _GraphNode,
    from_lon: float,
    from_lat: float,
    to_lon: float,
    to_lat: float,
) -> dict[str, object]:
    """Route from ``start`` to ``target`` with ``pgr_dijkstra``.

    The edge set handed to pgRouting is limited to a bounding box around the
    request and matched nodes. The box is retried with each padding from the
    mode's padding list until a path is returned. A transaction-local
    statement timeout is set before every attempt.

    Raises:
        ValueError: If no padding yields a route. Database errors are not
            caught here.
    """
    walking = mode == "walk"
    cost_column = "walk_cost_s" if walking else "drive_cost_s"
    reverse_cost_column = "reverse_walk_cost_s" if walking else "reverse_drive_cost_s"
    routing_cost = _routing_cost_expression(cost_column, mode)
    reverse_routing_cost = _routing_cost_expression(reverse_cost_column, mode)
    paddings_km = PGR_WALK_BBOX_PADDING_KM if walking else PGR_DRIVE_BBOX_PADDING_KM
    timeout_ms = PGR_WALK_STATEMENT_TIMEOUT_MS if walking else PGR_DRIVE_STATEMENT_TIMEOUT_MS

    # Tight box around the requested points and the nodes they matched.
    lons = (from_lon, to_lon, start.lon, target.lon)
    lats = (from_lat, to_lat, start.lat, target.lat)
    west, south, east, north = min(lons), min(lats), max(lons), max(lats)

    # Runs pgr_dijkstra on the inner edge query, then joins each step back to
    # its edge row. actual_cost_s is the stored cost for the travel direction.
    route_statement = text(
        f"""
        WITH route AS (
            SELECT *
            FROM pgr_dijkstra(:edge_sql, :start_node, :target_node, directed := true)
        ),
        steps AS (
            SELECT
                route.path_seq,
                route.node AS from_node,
                LEAD(route.node) OVER (ORDER BY route.path_seq) AS to_node,
                route.edge,
                route.cost
            FROM route
        )
        SELECT
            steps.path_seq,
            steps.from_node,
            steps.to_node,
            steps.cost,
            edge.id,
            edge.source_osm_node_id,
            edge.target_osm_node_id,
            edge.source_lon,
            edge.source_lat,
            edge.target_lon,
            edge.target_lat,
            edge.length_m,
            edge.highway,
            edge.name,
            edge.geometry_geojson,
            CASE
                WHEN steps.from_node = edge.source_osm_node_id THEN edge.{cost_column}
                ELSE edge.{reverse_cost_column}
            END AS actual_cost_s
        FROM steps
        JOIN routing_edges AS edge ON edge.id = steps.edge
        WHERE steps.edge <> -1
        ORDER BY steps.path_seq
        """
    )
    endpoints = {"start_node": start.osm_node_id, "target_node": target.osm_node_id}

    for padding_km in paddings_km:
        _set_local_statement_timeout(db, timeout_ms)
        box_west, box_south, box_east, box_north = _expanded_bbox(west, south, east, north, padding_km)
        # pgRouting takes the edge query as text, so the values are inlined;
        # they are an int and formatted floats, never free-form input.
        edge_sql = f"""
            SELECT
                id,
                source_osm_node_id AS source,
                target_osm_node_id AS target,
                COALESCE({routing_cost}, -1)::float8 AS cost,
                COALESCE({reverse_routing_cost}, -1)::float8 AS reverse_cost
            FROM routing_edges
            WHERE dataset_id = {int(dataset_id)}
                AND ({cost_column} IS NOT NULL OR {reverse_cost_column} IS NOT NULL)
                AND box(point(max_lon, max_lat), point(min_lon, min_lat))
                    && box(point({box_east:.8f}, {box_north:.8f}), point({box_west:.8f}, {box_south:.8f}))
        """
        rows = db.execute(route_statement, {"edge_sql": edge_sql, **endpoints}).all()
        if rows:
            return _pgrouting_payload(
                dataset_id=dataset_id,
                mode=mode,
                start=start,
                target=target,
                from_lon=from_lon,
                from_lat=from_lat,
                to_lon=to_lon,
                to_lat=to_lat,
                rows=rows,
                padding_km=padding_km,
            )
    raise ValueError("pgRouting did not find a route in the bounded search area.")
def _set_local_statement_timeout(db: Session, timeout_ms: int) -> None:
    """Set ``statement_timeout`` via ``set_config`` with its local flag on."""
    timeout_setting = f"{int(timeout_ms)}ms"
    statement = text("SELECT set_config('statement_timeout', :timeout, true)")
    db.execute(statement, {"timeout": timeout_setting})
def _pgrouting_payload(
    *,
    dataset_id: int,
    mode: str,
    start: _GraphNode,
    target: _GraphNode,
    from_lon: float,
    from_lat: float,
    to_lon: float,
    to_lat: float,
    rows,
    padding_km: float,
) -> dict[str, object]:
    """Convert pgRouting result rows into a route payload.

    Each row becomes a ``_Traversal`` keyed by the node it arrives at, so the
    shared ``_route_payload`` builder can assemble the result. The payload is
    then tagged with the ``pgrouting`` engine and the bbox padding that
    produced the route.
    """
    previous: dict[int, tuple[int, _Traversal]] = {}
    total_cost = 0.0
    for row in rows:
        if row.to_node is None:
            continue
        from_node, to_node = int(row.from_node), int(row.to_node)
        edge_source = int(row.source_osm_node_id)
        edge_target = int(row.target_osm_node_id)

        # Prefer the stored directional cost; otherwise use pgRouting's cost.
        if row.actual_cost_s is not None:
            raw_cost = row.actual_cost_s
        else:
            raw_cost = row.cost or 0
        actual_cost = float(raw_cost)

        # Reversed means the step runs from the edge's target to its source.
        reversed_edge = from_node == edge_target and to_node == edge_source
        source_xy = (float(row.source_lon), float(row.source_lat))
        target_xy = (float(row.target_lon), float(row.target_lat))
        origin_xy, destination_xy = (target_xy, source_xy) if reversed_edge else (source_xy, target_xy)

        total_cost += actual_cost
        step = _Traversal(
            edge_id=int(row.id),
            from_node=from_node,
            to_node=to_node,
            from_lon=origin_xy[0],
            from_lat=origin_xy[1],
            to_lon=destination_xy[0],
            to_lat=destination_xy[1],
            cost_s=actual_cost,
            length_m=float(row.length_m),
            highway=row.highway,
            name=row.name,
            geometry_geojson=str(row.geometry_geojson),
            reversed=reversed_edge,
        )
        previous[to_node] = (from_node, step)

    payload = _route_payload(
        dataset_id=dataset_id,
        mode=mode,
        start=start,
        target=target,
        from_lon=from_lon,
        from_lat=from_lat,
        to_lon=to_lon,
        to_lat=to_lat,
        previous=previous,
        total_cost_s=total_cost,
        visited=len(rows),
    )
    payload["engine"] = "pgrouting"
    payload["bbox_padding_km"] = padding_km
    return payload
def _routing_cost_expression(column: str, mode: str) -> str:
    """Return the SQL cost expression pgRouting should use for ``column``.

    For any mode other than ``"walk"`` the column name is returned unchanged.
    For walking, the column is multiplied by a factor chosen from the edge's
    ``highway`` value (1.30 for unlisted values), and NULL costs stay NULL.
    """
    if mode != "walk":
        return column

    # (highway values, multiplier) in the order the CASE evaluates them.
    weights = (
        (("footway", "pedestrian"), "0.70"),
        (("path",), "0.78"),
        (("steps",), "0.95"),
        (("cycleway",), "1.05"),
        (("bridleway",), "1.10"),
        (("living_street", "track"), "1.15"),
        (("residential", "service"), "1.35"),
        (("unclassified", "road"), "1.55"),
        (("tertiary", "tertiary_link"), "1.80"),
        (("secondary", "secondary_link"), "2.15"),
        (("primary", "primary_link"), "2.50"),
    )
    branches = []
    for highways, factor in weights:
        if len(highways) == 1:
            predicate = f"highway = '{highways[0]}'"
        else:
            quoted = ", ".join(f"'{highway}'" for highway in highways)
            predicate = f"highway IN ({quoted})"
        branches.append(f"WHEN {predicate} THEN {factor}")

    lines = [
        "",
        "CASE",
        f"WHEN {column} IS NULL THEN NULL",
        f"ELSE {column} * CASE",
        *branches,
        "ELSE 1.30",
        "END",
        "END",
        "",
    ]
    return "\n".join(lines)
def _nearest_node(db: Session, dataset_id: int, lon: float, lat: float, mode: str) -> _GraphNode | None:
    """Find the closest routing node that has a usable edge for ``mode``.

    The nearest 64 nodes are examined first, then 512, then 4096. Within each
    batch the closest node touching an edge with a non-NULL cost in the
    relevant direction wins. Returns ``None`` when no batch has such a node.

    NOTE(review): the query uses PostGIS functions and the ``<->`` operator,
    yet ``snap_point_to_routing_graph`` calls this on its non-PostgreSQL
    path -- confirm that path is actually supported.
    """
    cost_column = "walk_cost_s" if mode == "walk" else "drive_cost_s"
    reverse_cost_column = "reverse_walk_cost_s" if mode == "walk" else "reverse_drive_cost_s"
    statement = text(
        f"""
        WITH nearest AS MATERIALIZED (
            SELECT node.osm_node_id, node.lon, node.lat, node.geom
            FROM routing_nodes AS node
            WHERE node.dataset_id = :dataset_id
                AND node.geom IS NOT NULL
            ORDER BY node.geom <-> ST_SetSRID(ST_MakePoint(:lon, :lat), 4326)
            LIMIT :candidate_limit
        ),
        candidate AS (
            SELECT nearest.osm_node_id, nearest.lon, nearest.lat, nearest.geom
            FROM nearest
            WHERE EXISTS (
                SELECT 1
                FROM routing_edges AS edge
                WHERE edge.dataset_id = :dataset_id
                    AND (
                        (edge.source_osm_node_id = nearest.osm_node_id AND edge.{cost_column} IS NOT NULL)
                        OR (edge.target_osm_node_id = nearest.osm_node_id AND edge.{reverse_cost_column} IS NOT NULL)
                    )
                LIMIT 1
            )
            ORDER BY nearest.geom <-> ST_SetSRID(ST_MakePoint(:lon, :lat), 4326)
            LIMIT 1
        )
        SELECT osm_node_id, lon, lat, ST_DistanceSphere(geom, ST_SetSRID(ST_MakePoint(:lon, :lat), 4326)) AS distance_m
        FROM candidate
        """
    )
    # Widen the candidate batch only when the smaller one had no usable node.
    for candidate_limit in (64, 512, 4096):
        params = {"dataset_id": dataset_id, "lon": lon, "lat": lat, "candidate_limit": candidate_limit}
        row = db.execute(statement, params).first()
        if row is not None:
            return _GraphNode(
                osm_node_id=int(row.osm_node_id),
                lon=float(row.lon),
                lat=float(row.lat),
                distance_m=float(row.distance_m or 0),
            )
    return None
def _outgoing_edges(db: Session, dataset_id: int, node_id: int, mode: str) -> list[_Traversal]:
    """List the edges that can be travelled away from ``node_id``.

    An edge qualifies when it starts at the node with a non-NULL forward
    cost, or ends at the node with a non-NULL reverse cost. Each result is
    oriented so that ``from_node`` is ``node_id``.
    """
    cost_column = "walk_cost_s" if mode == "walk" else "drive_cost_s"
    reverse_cost_column = "reverse_walk_cost_s" if mode == "walk" else "reverse_drive_cost_s"
    rows = db.execute(
        text(
            f"""
            SELECT
                id, source_osm_node_id, target_osm_node_id,
                source_lon, source_lat, target_lon, target_lat,
                length_m, highway, name, geometry_geojson,
                CASE
                    WHEN source_osm_node_id = :node_id THEN {cost_column}
                    ELSE {reverse_cost_column}
                END AS cost_s,
                target_osm_node_id != :node_id AS forward
            FROM routing_edges
            WHERE dataset_id = :dataset_id
                AND (
                    (source_osm_node_id = :node_id AND {cost_column} IS NOT NULL)
                    OR (target_osm_node_id = :node_id AND {reverse_cost_column} IS NOT NULL)
                )
            """
        ),
        {"dataset_id": dataset_id, "node_id": node_id},
    ).all()
    edges: list[_Traversal] = []
    for row in rows:
        if row.cost_s is None:
            # Only a self-loop (source == target == node_id) with a NULL
            # forward cost and a set reverse cost gets here: the WHERE clause
            # matches on the reverse cost while the CASE picks the forward
            # one. A self-loop never shortens a route, so skip it rather than
            # fail on float(None).
            continue
        forward = bool(row.forward)
        if forward:
            to_node = int(row.target_osm_node_id)
            from_lon, from_lat = float(row.source_lon), float(row.source_lat)
            to_lon, to_lat = float(row.target_lon), float(row.target_lat)
        else:
            to_node = int(row.source_osm_node_id)
            from_lon, from_lat = float(row.target_lon), float(row.target_lat)
            to_lon, to_lat = float(row.source_lon), float(row.source_lat)
        edges.append(
            _Traversal(
                edge_id=int(row.id),
                from_node=node_id,
                to_node=to_node,
                from_lon=from_lon,
                from_lat=from_lat,
                to_lon=to_lon,
                to_lat=to_lat,
                cost_s=float(row.cost_s),
                length_m=float(row.length_m),
                highway=row.highway,
                name=row.name,
                geometry_geojson=str(row.geometry_geojson),
                reversed=not forward,
            )
        )
    return edges
def _route_payload(
    *,
    dataset_id: int,
    mode: str,
    start: _GraphNode,
    target: _GraphNode,
    from_lon: float,
    from_lat: float,
    to_lon: float,
    to_lat: float,
    previous: dict[int, tuple[int, _Traversal]],
    total_cost_s: float,
    visited: int,
) -> dict[str, object]:
    """Assemble the route response from a predecessor map.

    ``previous`` maps each node to ``(prior node, traversal used to reach
    it)``. The chain is followed from ``target`` back to ``start`` to recover
    the ordered edges. Connector features link the requested coordinates to
    the matched nodes when those distances are non-zero, and their travel
    time is added to ``total_cost_s``.
    """
    # Recover the path by walking predecessors backwards, then flip it.
    path: list[_Traversal] = []
    node = target.osm_node_id
    while node != start.osm_node_id:
        node, step = previous[node]
        path.append(step)
    path.reverse()

    network_distance = sum(step.length_m for step in path)
    access_distance = start.distance_m + target.distance_m

    features = []
    if start.distance_m:
        access_line = [[from_lon, from_lat], [start.lon, start.lat]]
        features.append(_connector_feature("access", mode, access_line, start.distance_m))
    for sequence, step in enumerate(path, start=1):
        geometry = json.loads(step.geometry_geojson)
        if step.reversed:
            # Stored geometry runs source -> target; flip it to travel order.
            geometry["coordinates"] = list(reversed(geometry.get("coordinates", [])))
        properties = {
            "feature_type": "routing_edge",
            "sequence": sequence,
            "mode": mode,
            "edge_id": step.edge_id,
            "highway": step.highway,
            "name": step.name,
            "length_m": step.length_m,
            "cost_s": step.cost_s,
        }
        features.append({"type": "Feature", "geometry": geometry, "properties": properties})
    if target.distance_m:
        egress_line = [[target.lon, target.lat], [to_lon, to_lat]]
        features.append(_connector_feature("egress", mode, egress_line, target.distance_m))

    duration_seconds = total_cost_s + _connector_seconds(access_distance, mode)
    start_info = {"osm_node_id": start.osm_node_id, "distance_m": round(start.distance_m, 1)}
    target_info = {"osm_node_id": target.osm_node_id, "distance_m": round(target.distance_m, 1)}
    return {
        "dataset_id": dataset_id,
        "mode": mode,
        "engine": "python_astar",
        "distance_m": round(network_distance + access_distance, 1),
        "network_distance_m": round(network_distance, 1),
        "access_distance_m": round(access_distance, 1),
        "duration_seconds": round(duration_seconds, 1),
        "duration_minutes": _duration_minutes_ceil(duration_seconds),
        "duration_label": _duration_label(duration_seconds),
        "visited_nodes": visited,
        "start_node": start_info,
        "target_node": target_info,
        "features": feature_collection(features),
    }
def _single_point_route(start: _GraphNode, from_lon: float, from_lat: float, to_lon: float, to_lat: float, mode: str, dataset_id: int) -> dict[str, object]:
    """Build the payload for a request whose endpoints match the same node.

    The result is a straight-line route between the two coordinates, labelled
    with the ``python_astar`` engine, with ``start`` reported as both the
    start and the target node.
    """
    node_info = {"osm_node_id": start.osm_node_id, "distance_m": round(start.distance_m, 1)}
    return _direct_route_payload(
        dataset_id=dataset_id,
        mode=mode,
        from_lon=from_lon,
        from_lat=from_lat,
        to_lon=to_lon,
        to_lat=to_lat,
        engine="python_astar",
        start_node=node_info,
        # Separate dict so the two payload entries stay independent objects.
        target_node=dict(node_info),
        visited_nodes=1,
    )
def _direct_route_payload(
    *,
    dataset_id: int,
    mode: str,
    from_lon: float,
    from_lat: float,
    to_lon: float,
    to_lat: float,
    engine: str = "direct_fallback",
    start_node: dict[str, object] | None = None,
    target_node: dict[str, object] | None = None,
    visited_nodes: int = 0,
) -> dict[str, object]:
    """Build a route payload made of one straight connector between two points.

    The whole distance counts as access distance (network distance is 0) and
    the duration comes from the connector speed for ``mode``.
    """
    distance = _distance_m(from_lat, from_lon, to_lat, to_lon)
    duration_seconds = _connector_seconds(distance, mode)
    rounded_distance = round(distance, 1)
    direct_line = [[from_lon, from_lat], [to_lon, to_lat]]
    connector = _connector_feature("direct", mode, direct_line, distance)
    return {
        "dataset_id": dataset_id,
        "mode": mode,
        "engine": engine,
        "distance_m": rounded_distance,
        "network_distance_m": 0,
        "access_distance_m": rounded_distance,
        "duration_seconds": round(duration_seconds, 1),
        "duration_minutes": _duration_minutes_ceil(duration_seconds),
        "duration_label": _duration_label(duration_seconds),
        "visited_nodes": visited_nodes,
        "start_node": start_node,
        "target_node": target_node,
        "features": feature_collection([connector]),
    }
def _connector_feature(kind: str, mode: str, coordinates: list[list[float]], distance_m: float) -> dict:
    """Return a GeoJSON LineString feature for an off-network connector.

    ``kind`` is stored under the ``connector`` property. The feature's cost is
    the connector travel time for ``distance_m`` in the given mode.
    """
    geometry = {"type": "LineString", "coordinates": coordinates}
    properties = {
        "feature_type": "routing_connector",
        "connector": kind,
        "mode": mode,
        "length_m": distance_m,
        "cost_s": _connector_seconds(distance_m, mode),
    }
    return {"type": "Feature", "geometry": geometry, "properties": properties}
def _connector_seconds(distance_m: float, mode: str) -> float:
    """Return the seconds needed to cover ``distance_m`` off the network.

    Walking uses 1.35 m/s; every other mode uses 8.0 m/s.
    """
    if mode == "walk":
        speed = 1.35
    else:
        speed = 8.0
    return float(distance_m) / speed
def _duration_minutes_ceil(seconds: int | float | None) -> int | None:
    """Convert seconds to whole minutes, rounding up and never below zero.

    ``None`` is passed through unchanged.
    """
    if seconds is None:
        return None
    minutes = int(math.ceil(float(seconds) / 60))
    return minutes if minutes > 0 else 0
def _duration_label(seconds: int | float | None) -> str | None:
    """Format a duration as text, based on minutes rounded up.

    Produces ``"<d>d HH:MM"`` when at least a day long, ``"H:MM"`` when at
    least an hour long, and ``"<m> min"`` otherwise. ``None`` gives ``None``.
    """
    minutes_total = _duration_minutes_ceil(seconds)
    if minutes_total is None:
        return None
    days, remainder = divmod(minutes_total, 24 * 60)
    hours, minutes = divmod(remainder, 60)
    if days:
        return f"{days}d {hours:02d}:{minutes:02d}"
    if hours:
        return f"{hours}:{minutes:02d}"
    return f"{minutes} min"
def _expanded_bbox(min_lon: float, min_lat: float, max_lon: float, max_lat: float, padding_km: float) -> tuple[float, float, float, float]:
    """Grow a lon/lat box by ``padding_km`` on every side.

    Uses 111 km per degree of latitude. Longitude is scaled by the cosine of
    the box's middle latitude, with a floor of 1 km per degree so the padding
    stays bounded near the poles.

    Returns:
        ``(min_lon, min_lat, max_lon, max_lat)`` of the enlarged box.
    """
    middle_latitude = (min_lat + max_lat) / 2
    km_per_lon_degree = max(1.0, 111.0 * math.cos(math.radians(middle_latitude)))
    lat_delta = padding_km / 111.0
    lon_delta = padding_km / km_per_lon_degree
    west, east = min_lon - lon_delta, max_lon + lon_delta
    south, north = min_lat - lat_delta, max_lat + lat_delta
    return (west, south, east, north)
def _distance_m(lat_a: float, lon_a: float, lat_b: float, lon_b: float) -> float:
    """Return the great-circle distance in metres between two points.

    Haversine formula on a sphere of radius 6,371,000 m. Arguments are in
    degrees and latitude comes first.
    """
    radius = 6_371_000.0
    phi_a = math.radians(lat_a)
    phi_b = math.radians(lat_b)
    delta_phi = math.radians(lat_b - lat_a)
    delta_lambda = math.radians(lon_b - lon_a)
    hav = math.sin(delta_phi / 2) ** 2 + math.cos(phi_a) * math.cos(phi_b) * math.sin(delta_lambda / 2) ** 2
    # Rounding can push hav marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - hav) raise ValueError.
    hav = min(1.0, max(0.0, hav))
    return radius * 2 * math.atan2(math.sqrt(hav), math.sqrt(1 - hav))