from __future__ import annotations import copy import heapq import json import math import threading import time from collections import OrderedDict from dataclasses import dataclass from sqlalchemy import func, select, text from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Session from app.config import settings from app.models import Dataset, RoutingEdge, RoutingNode from app.pipeline.routing_layer import active_routing_dataset from app.serializers import feature_collection WALK_HEURISTIC_MPS = 1.6 DRIVE_HEURISTIC_MPS = 36.0 DEFAULT_MAX_VISITED = 160_000 PGR_WALK_BBOX_PADDING_KM = [0.5, 1.5, 4, 10, 25] PGR_DRIVE_BBOX_PADDING_KM = [2, 8, 25, 75, 200] PGR_WALK_STATEMENT_TIMEOUT_MS = 2_500 PGR_DRIVE_STATEMENT_TIMEOUT_MS = 7_500 ROUTE_CACHE_TTL_SECONDS = 15 * 60 ROUTE_CACHE_MAX_ENTRIES = 512 _route_cache_lock = threading.RLock() _route_cache: OrderedDict[tuple[object, ...], tuple[float, dict[str, object]]] = OrderedDict() @dataclass(frozen=True) class _GraphNode: osm_node_id: int lon: float lat: float distance_m: float @dataclass(frozen=True) class _Traversal: edge_id: int from_node: int to_node: int from_lon: float from_lat: float to_lon: float to_lat: float cost_s: float length_m: float highway: str | None name: str | None geometry_geojson: str reversed: bool def routing_status(db: Session) -> dict[str, object]: dataset = active_routing_dataset(db) dataset_id = None if dataset is None else int(dataset.id) node_count = 0 edge_count = 0 if dataset_id is not None: node_count, edge_count = _routing_status_counts(db, dataset, dataset_id) pgrouting_available = False pgrouting_installed = False if settings.is_postgresql_database: pgrouting_available = bool( db.execute(text("SELECT EXISTS (SELECT 1 FROM pg_available_extensions WHERE name = 'pgrouting')")).scalar() ) pgrouting_installed = bool( db.execute(text("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgrouting')")).scalar() ) return { "dataset_id": dataset_id, "nodes": node_count, "edges": edge_count, "available": edge_count > 0, "engine": "pgrouting" if pgrouting_installed else "python_astar", "pgrouting_available": pgrouting_available, "pgrouting_installed": pgrouting_installed, } def _routing_status_counts(db: Session, dataset: Dataset, dataset_id: int) -> tuple[int, int]: metadata = _metadata(dataset) routing_layer = metadata.get("routing_layer") if isinstance(routing_layer, dict): try: nodes = int(routing_layer.get("nodes") or 0) edges = int(routing_layer.get("edges") or 0) except (TypeError, ValueError): nodes = 0 edges = 0 if nodes or edges: return nodes, edges if settings.is_postgresql_database: rows = db.execute( text( """ SELECT relname, COALESCE(reltuples, 0)::bigint AS estimate FROM pg_class WHERE oid IN ('routing_nodes'::regclass, 'routing_edges'::regclass) """ ) ).mappings() estimates = {str(row["relname"]): int(row["estimate"] or 0) for row in rows} return estimates.get("routing_nodes", 0), estimates.get("routing_edges", 0) node_count = int(db.scalar(select(func.count()).select_from(RoutingNode).where(RoutingNode.dataset_id == dataset_id)) or 0) edge_count = int(db.scalar(select(func.count()).select_from(RoutingEdge).where(RoutingEdge.dataset_id == dataset_id)) or 0) return node_count, edge_count def _metadata(dataset: Dataset) -> dict[str, object]: if not dataset.metadata_json: return {} try: value = json.loads(dataset.metadata_json) except json.JSONDecodeError: return {} return value if isinstance(value, dict) else {} def route_between_points( db: Session, *, from_lon: float, from_lat: float, to_lon: float, to_lat: float, mode: str = "walk", dataset_id: int | None = None, max_visited: int = DEFAULT_MAX_VISITED, ) -> dict[str, object]: if mode not in {"walk", "drive"}: raise ValueError("mode must be walk or drive") dataset = db.get(Dataset, dataset_id) if dataset_id is not None else active_routing_dataset(db) if dataset is None: raise ValueError("No routing dataset is available.") dataset_id = int(dataset.id) cache_key = _route_cache_key(dataset_id, mode, from_lon, from_lat, to_lon, to_lat) cached = _route_cache_get(cache_key) if cached is not None: return cached start = _nearest_node(db, dataset_id, from_lon, from_lat, mode) target = _nearest_node(db, dataset_id, to_lon, to_lat, mode) if start is None or target is None: raise ValueError("Routing graph has no nearby nodes for the requested mode.") if start.osm_node_id == target.osm_node_id: payload = _single_point_route(start, from_lon, from_lat, to_lon, to_lat, mode, dataset_id) _route_cache_put(cache_key, payload) return payload if settings.is_postgresql_database and _pgrouting_installed(db): try: payload = _route_with_pgrouting( db, dataset_id=dataset_id, mode=mode, start=start, target=target, from_lon=from_lon, from_lat=from_lat, to_lon=to_lon, to_lat=to_lat, ) _route_cache_put(cache_key, payload) return payload except ValueError: pass except SQLAlchemyError: db.rollback() heuristic_mps = WALK_HEURISTIC_MPS if mode == "walk" else DRIVE_HEURISTIC_MPS queue: list[tuple[float, float, int]] = [] heapq.heappush(queue, (0.0, 0.0, start.osm_node_id)) costs: dict[int, float] = {start.osm_node_id: 0.0} coords: dict[int, tuple[float, float]] = {start.osm_node_id: (start.lon, start.lat), target.osm_node_id: (target.lon, target.lat)} previous: dict[int, tuple[int, _Traversal]] = {} adjacency_cache: dict[int, list[_Traversal]] = {} visited: set[int] = set() while queue and len(visited) < max(1, max_visited): _, cost, node_id = heapq.heappop(queue) if node_id in visited: continue visited.add(node_id) if node_id == target.osm_node_id: payload = _route_payload( dataset_id=dataset_id, mode=mode, start=start, target=target, from_lon=from_lon, from_lat=from_lat, to_lon=to_lon, to_lat=to_lat, previous=previous, total_cost_s=cost, visited=len(visited), ) _route_cache_put(cache_key, payload) return payload for edge in adjacency_cache.setdefault(node_id, _outgoing_edges(db, dataset_id, node_id, mode)): coords[edge.to_node] = (edge.to_lon, edge.to_lat) next_cost = cost + edge.cost_s if next_cost >= costs.get(edge.to_node, float("inf")): continue costs[edge.to_node] = next_cost previous[edge.to_node] = (node_id, edge) heuristic = _distance_m(edge.to_lat, edge.to_lon, target.lat, target.lon) / heuristic_mps heapq.heappush(queue, (next_cost + heuristic, next_cost, edge.to_node)) raise ValueError(f"No {mode} route found within {max_visited:,} visited graph nodes.") def direct_route_between_points( db: Session, *, from_lon: float, from_lat: float, to_lon: float, to_lat: float, mode: str = "walk", dataset_id: int | None = None, reason: str | None = None, ) -> dict[str, object]: if mode not in {"walk", "drive"}: raise ValueError("mode must be walk or drive") dataset = db.get(Dataset, dataset_id) if dataset_id is not None else active_routing_dataset(db) payload = _direct_route_payload( dataset_id=0 if dataset is None else int(dataset.id), mode=mode, from_lon=float(from_lon), from_lat=float(from_lat), to_lon=float(to_lon), to_lat=float(to_lat), ) if reason: payload["warning"] = reason return payload def snap_point_to_routing_graph( db: Session, *, lon: float, lat: float, mode: str = "walk", dataset_id: int | None = None, max_distance_m: float = 250, ) -> dict[str, object] | None: if mode not in {"walk", "drive"}: raise ValueError("mode must be walk or drive") dataset = db.get(Dataset, dataset_id) if dataset_id is not None else active_routing_dataset(db) if dataset is None: return None dataset_id = int(dataset.id) if settings.is_postgresql_database: return _snap_point_to_routing_edge_postgresql( db, dataset_id=dataset_id, lon=float(lon), lat=float(lat), mode=mode, max_distance_m=float(max_distance_m), ) node = _nearest_node(db, dataset_id, float(lon), float(lat), mode) if node is None or node.distance_m > max_distance_m: return None return { "dataset_id": dataset_id, "lon": node.lon, "lat": node.lat, "distance_m": round(node.distance_m, 1), "source": "routing_node", "osm_node_id": node.osm_node_id, } def _snap_point_to_routing_edge_postgresql( db: Session, *, dataset_id: int, lon: float, lat: float, mode: str, max_distance_m: float, ) -> dict[str, object] | None: cost_column = "walk_cost_s" if mode == "walk" else "drive_cost_s" reverse_cost_column = "reverse_walk_cost_s" if mode == "walk" else "reverse_drive_cost_s" radius_deg = max_distance_m / 111_320 row = db.execute( text( f""" WITH point AS ( SELECT ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) AS geom ), edges AS MATERIALIZED ( SELECT edge.id, edge.highway, edge.name, CASE WHEN edge.tags_json IS NULL OR edge.tags_json = '' THEN NULL ELSE edge.tags_json::jsonb ->> 'service' END AS service, edge.source_osm_node_id, edge.target_osm_node_id, ST_SetSRID( ST_MakeLine( ST_MakePoint(edge.source_lon, edge.source_lat), ST_MakePoint(edge.target_lon, edge.target_lat) ), 4326 ) AS edge_geom FROM routing_edges AS edge CROSS JOIN point WHERE edge.dataset_id = :dataset_id AND (edge.{cost_column} IS NOT NULL OR edge.{reverse_cost_column} IS NOT NULL) AND box(point(edge.max_lon, edge.max_lat), point(edge.min_lon, edge.min_lat)) && box( point(:lon + :radius_deg, :lat + :radius_deg), point(:lon - :radius_deg, :lat - :radius_deg) ) ), candidate AS ( SELECT edges.id, edges.highway, edges.name, edges.service, edges.source_osm_node_id, edges.target_osm_node_id, ST_ClosestPoint(edges.edge_geom, point.geom) AS snapped_geom, ST_DistanceSphere(edges.edge_geom, point.geom) AS distance_m, CASE WHEN edges.highway IN ('footway', 'pedestrian', 'steps') THEN 0 WHEN edges.highway IN ('path', 'cycleway', 'bridleway') THEN 1 WHEN edges.highway IN ('living_street', 'residential') THEN 2 WHEN edges.highway = 'service' THEN 3 ELSE 4 END AS highway_rank, CASE WHEN :mode != 'walk' THEN 0 WHEN edges.highway = 'service' THEN 20 WHEN edges.highway IN ('primary', 'primary_link', 'secondary', 'secondary_link') THEN 10 WHEN edges.highway IN ('tertiary', 'tertiary_link', 'unclassified', 'road') THEN 5 ELSE 0 END AS snap_penalty_m FROM edges CROSS JOIN point WHERE ST_DWithin(edges.edge_geom::geography, point.geom::geography, :max_distance_m) AND NOT ( :mode = 'walk' AND edges.highway = 'service' AND COALESCE(edges.service, '') IN ('driveway', 'parking_aisle', 'drive-through') ) ORDER BY ST_DistanceSphere(edges.edge_geom, point.geom) + CASE WHEN :mode != 'walk' THEN 0 WHEN edges.highway = 'service' THEN 20 WHEN edges.highway IN ('primary', 'primary_link', 'secondary', 'secondary_link') THEN 10 WHEN edges.highway IN ('tertiary', 'tertiary_link', 'unclassified', 'road') THEN 5 ELSE 0 END, ST_DistanceSphere(edges.edge_geom, point.geom), highway_rank, edges.id LIMIT 1 ) SELECT id, highway, name, source_osm_node_id, target_osm_node_id, ST_X(snapped_geom) AS lon, ST_Y(snapped_geom) AS lat, distance_m FROM candidate """ ), { "dataset_id": dataset_id, "lon": lon, "lat": lat, "radius_deg": radius_deg, "max_distance_m": max_distance_m, "mode": mode, }, ).mappings().first() if row is None: return None return { "dataset_id": dataset_id, "lon": float(row["lon"]), "lat": float(row["lat"]), "distance_m": round(float(row["distance_m"] or 0), 1), "source": "routing_edge", "edge_id": int(row["id"]), "highway": row["highway"], "name": row["name"], "source_osm_node_id": int(row["source_osm_node_id"]), "target_osm_node_id": int(row["target_osm_node_id"]), } def _route_cache_key(dataset_id: int, mode: str, from_lon: float, from_lat: float, to_lon: float, to_lat: float) -> tuple[object, ...]: return ( int(dataset_id), mode, round(float(from_lon), 6), round(float(from_lat), 6), round(float(to_lon), 6), round(float(to_lat), 6), ) def _route_cache_get(key: tuple[object, ...]) -> dict[str, object] | None: now = time.monotonic() with _route_cache_lock: cached = _route_cache.get(key) if cached is None: return None expires_at, payload = cached if expires_at <= now: _route_cache.pop(key, None) return None _route_cache.move_to_end(key) return copy.deepcopy(payload) def _route_cache_put(key: tuple[object, ...], payload: dict[str, object]) -> None: with _route_cache_lock: _route_cache[key] = (time.monotonic() + ROUTE_CACHE_TTL_SECONDS, copy.deepcopy(payload)) _route_cache.move_to_end(key) while len(_route_cache) > ROUTE_CACHE_MAX_ENTRIES: _route_cache.popitem(last=False) def _pgrouting_installed(db: Session) -> bool: return bool(db.execute(text("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgrouting')")).scalar()) def _route_with_pgrouting( db: Session, *, dataset_id: int, mode: str, start: _GraphNode, target: _GraphNode, from_lon: float, from_lat: float, to_lon: float, to_lat: float, ) -> dict[str, object]: cost_column = "walk_cost_s" if mode == "walk" else "drive_cost_s" reverse_cost_column = "reverse_walk_cost_s" if mode == "walk" else "reverse_drive_cost_s" routing_cost = _routing_cost_expression(cost_column, mode) reverse_routing_cost = _routing_cost_expression(reverse_cost_column, mode) for padding_km in PGR_WALK_BBOX_PADDING_KM if mode == "walk" else PGR_DRIVE_BBOX_PADDING_KM: _set_local_statement_timeout( db, PGR_WALK_STATEMENT_TIMEOUT_MS if mode == "walk" else PGR_DRIVE_STATEMENT_TIMEOUT_MS, ) bbox = _expanded_bbox( min(from_lon, to_lon, start.lon, target.lon), min(from_lat, to_lat, start.lat, target.lat), max(from_lon, to_lon, start.lon, target.lon), max(from_lat, to_lat, start.lat, target.lat), padding_km, ) edge_sql = f""" SELECT id, source_osm_node_id AS source, target_osm_node_id AS target, COALESCE({routing_cost}, -1)::float8 AS cost, COALESCE({reverse_routing_cost}, -1)::float8 AS reverse_cost FROM routing_edges WHERE dataset_id = {int(dataset_id)} AND ({cost_column} IS NOT NULL OR {reverse_cost_column} IS NOT NULL) AND box(point(max_lon, max_lat), point(min_lon, min_lat)) && box(point({bbox[2]:.8f}, {bbox[3]:.8f}), point({bbox[0]:.8f}, {bbox[1]:.8f})) """ rows = db.execute( text( f""" WITH route AS ( SELECT * FROM pgr_dijkstra(:edge_sql, :start_node, :target_node, directed := true) ), steps AS ( SELECT route.path_seq, route.node AS from_node, LEAD(route.node) OVER (ORDER BY route.path_seq) AS to_node, route.edge, route.cost FROM route ) SELECT steps.path_seq, steps.from_node, steps.to_node, steps.cost, edge.id, edge.source_osm_node_id, edge.target_osm_node_id, edge.source_lon, edge.source_lat, edge.target_lon, edge.target_lat, edge.length_m, edge.highway, edge.name, edge.geometry_geojson, CASE WHEN steps.from_node = edge.source_osm_node_id THEN edge.{cost_column} ELSE edge.{reverse_cost_column} END AS actual_cost_s FROM steps JOIN routing_edges AS edge ON edge.id = steps.edge WHERE steps.edge <> -1 ORDER BY steps.path_seq """ ), {"edge_sql": edge_sql, "start_node": start.osm_node_id, "target_node": target.osm_node_id}, ).all() if rows: return _pgrouting_payload( dataset_id=dataset_id, mode=mode, start=start, target=target, from_lon=from_lon, from_lat=from_lat, to_lon=to_lon, to_lat=to_lat, rows=rows, padding_km=padding_km, ) raise ValueError("pgRouting did not find a route in the bounded search area.") def _set_local_statement_timeout(db: Session, timeout_ms: int) -> None: db.execute(text("SELECT set_config('statement_timeout', :timeout, true)"), {"timeout": f"{int(timeout_ms)}ms"}) def _pgrouting_payload( *, dataset_id: int, mode: str, start: _GraphNode, target: _GraphNode, from_lon: float, from_lat: float, to_lon: float, to_lat: float, rows, padding_km: float, ) -> dict[str, object]: previous: dict[int, tuple[int, _Traversal]] = {} total_cost = 0.0 for row in rows: if row.to_node is None: continue from_node = int(row.from_node) to_node = int(row.to_node) source_node = int(row.source_osm_node_id) target_node = int(row.target_osm_node_id) actual_cost = float(row.actual_cost_s if row.actual_cost_s is not None else row.cost or 0) reversed_edge = from_node == target_node and to_node == source_node if reversed_edge: from_lon_edge, from_lat_edge = float(row.target_lon), float(row.target_lat) to_lon_edge, to_lat_edge = float(row.source_lon), float(row.source_lat) else: from_lon_edge, from_lat_edge = float(row.source_lon), float(row.source_lat) to_lon_edge, to_lat_edge = float(row.target_lon), float(row.target_lat) total_cost += actual_cost previous[to_node] = ( from_node, _Traversal( edge_id=int(row.id), from_node=from_node, to_node=to_node, from_lon=from_lon_edge, from_lat=from_lat_edge, to_lon=to_lon_edge, to_lat=to_lat_edge, cost_s=actual_cost, length_m=float(row.length_m), highway=row.highway, name=row.name, geometry_geojson=str(row.geometry_geojson), reversed=reversed_edge, ), ) payload = _route_payload( dataset_id=dataset_id, mode=mode, start=start, target=target, from_lon=from_lon, from_lat=from_lat, to_lon=to_lon, to_lat=to_lat, previous=previous, total_cost_s=total_cost, visited=len(rows), ) payload["engine"] = "pgrouting" payload["bbox_padding_km"] = padding_km return payload def _routing_cost_expression(column: str, mode: str) -> str: if mode != "walk": return column return f""" CASE WHEN {column} IS NULL THEN NULL ELSE {column} * CASE WHEN highway IN ('footway', 'pedestrian') THEN 0.70 WHEN highway = 'path' THEN 0.78 WHEN highway = 'steps' THEN 0.95 WHEN highway = 'cycleway' THEN 1.05 WHEN highway = 'bridleway' THEN 1.10 WHEN highway IN ('living_street', 'track') THEN 1.15 WHEN highway IN ('residential', 'service') THEN 1.35 WHEN highway IN ('unclassified', 'road') THEN 1.55 WHEN highway IN ('tertiary', 'tertiary_link') THEN 1.80 WHEN highway IN ('secondary', 'secondary_link') THEN 2.15 WHEN highway IN ('primary', 'primary_link') THEN 2.50 ELSE 1.30 END END """ def _nearest_node(db: Session, dataset_id: int, lon: float, lat: float, mode: str) -> _GraphNode | None: cost_column = "walk_cost_s" if mode == "walk" else "drive_cost_s" reverse_cost_column = "reverse_walk_cost_s" if mode == "walk" else "reverse_drive_cost_s" row = None for candidate_limit in (64, 512, 4096): row = db.execute( text( f""" WITH nearest AS MATERIALIZED ( SELECT node.osm_node_id, node.lon, node.lat, node.geom FROM routing_nodes AS node WHERE node.dataset_id = :dataset_id AND node.geom IS NOT NULL ORDER BY node.geom <-> ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) LIMIT :candidate_limit ), candidate AS ( SELECT nearest.osm_node_id, nearest.lon, nearest.lat, nearest.geom FROM nearest WHERE EXISTS ( SELECT 1 FROM routing_edges AS edge WHERE edge.dataset_id = :dataset_id AND ( (edge.source_osm_node_id = nearest.osm_node_id AND edge.{cost_column} IS NOT NULL) OR (edge.target_osm_node_id = nearest.osm_node_id AND edge.{reverse_cost_column} IS NOT NULL) ) LIMIT 1 ) ORDER BY nearest.geom <-> ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) LIMIT 1 ) SELECT osm_node_id, lon, lat, ST_DistanceSphere(geom, ST_SetSRID(ST_MakePoint(:lon, :lat), 4326)) AS distance_m FROM candidate """ ), {"dataset_id": dataset_id, "lon": lon, "lat": lat, "candidate_limit": candidate_limit}, ).first() if row is not None: break if row is None: return None return _GraphNode(osm_node_id=int(row.osm_node_id), lon=float(row.lon), lat=float(row.lat), distance_m=float(row.distance_m or 0)) def _outgoing_edges(db: Session, dataset_id: int, node_id: int, mode: str) -> list[_Traversal]: cost_column = "walk_cost_s" if mode == "walk" else "drive_cost_s" reverse_cost_column = "reverse_walk_cost_s" if mode == "walk" else "reverse_drive_cost_s" rows = db.execute( text( f""" SELECT id, source_osm_node_id, target_osm_node_id, source_lon, source_lat, target_lon, target_lat, length_m, highway, name, geometry_geojson, CASE WHEN source_osm_node_id = :node_id THEN {cost_column} ELSE {reverse_cost_column} END AS cost_s, target_osm_node_id != :node_id AS forward FROM routing_edges WHERE dataset_id = :dataset_id AND ( (source_osm_node_id = :node_id AND {cost_column} IS NOT NULL) OR (target_osm_node_id = :node_id AND {reverse_cost_column} IS NOT NULL) ) """ ), {"dataset_id": dataset_id, "node_id": node_id}, ).all() edges = [] for row in rows: forward = bool(row.forward) if forward: to_node = int(row.target_osm_node_id) from_lon, from_lat = float(row.source_lon), float(row.source_lat) to_lon, to_lat = float(row.target_lon), float(row.target_lat) else: to_node = int(row.source_osm_node_id) from_lon, from_lat = float(row.target_lon), float(row.target_lat) to_lon, to_lat = float(row.source_lon), float(row.source_lat) edges.append( _Traversal( edge_id=int(row.id), from_node=node_id, to_node=to_node, from_lon=from_lon, from_lat=from_lat, to_lon=to_lon, to_lat=to_lat, cost_s=float(row.cost_s), length_m=float(row.length_m), highway=row.highway, name=row.name, geometry_geojson=str(row.geometry_geojson), reversed=not forward, ) ) return edges def _route_payload( *, dataset_id: int, mode: str, start: _GraphNode, target: _GraphNode, from_lon: float, from_lat: float, to_lon: float, to_lat: float, previous: dict[int, tuple[int, _Traversal]], total_cost_s: float, visited: int, ) -> dict[str, object]: edges: list[_Traversal] = [] current = target.osm_node_id while current != start.osm_node_id: prior, edge = previous[current] edges.append(edge) current = prior edges.reverse() network_distance = sum(edge.length_m for edge in edges) access_distance = start.distance_m + target.distance_m features = [] if start.distance_m: features.append(_connector_feature("access", mode, [[from_lon, from_lat], [start.lon, start.lat]], start.distance_m)) for index, edge in enumerate(edges, start=1): geometry = json.loads(edge.geometry_geojson) if edge.reversed: geometry["coordinates"] = list(reversed(geometry.get("coordinates", []))) features.append( { "type": "Feature", "geometry": geometry, "properties": { "feature_type": "routing_edge", "sequence": index, "mode": mode, "edge_id": edge.edge_id, "highway": edge.highway, "name": edge.name, "length_m": edge.length_m, "cost_s": edge.cost_s, }, } ) if target.distance_m: features.append(_connector_feature("egress", mode, [[target.lon, target.lat], [to_lon, to_lat]], target.distance_m)) duration_seconds = total_cost_s + _connector_seconds(access_distance, mode) return { "dataset_id": dataset_id, "mode": mode, "engine": "python_astar", "distance_m": round(network_distance + access_distance, 1), "network_distance_m": round(network_distance, 1), "access_distance_m": round(access_distance, 1), "duration_seconds": round(duration_seconds, 1), "duration_minutes": _duration_minutes_ceil(duration_seconds), "duration_label": _duration_label(duration_seconds), "visited_nodes": visited, "start_node": {"osm_node_id": start.osm_node_id, "distance_m": round(start.distance_m, 1)}, "target_node": {"osm_node_id": target.osm_node_id, "distance_m": round(target.distance_m, 1)}, "features": feature_collection(features), } def _single_point_route(start: _GraphNode, from_lon: float, from_lat: float, to_lon: float, to_lat: float, mode: str, dataset_id: int) -> dict[str, object]: return _direct_route_payload( dataset_id=dataset_id, mode=mode, from_lon=from_lon, from_lat=from_lat, to_lon=to_lon, to_lat=to_lat, engine="python_astar", start_node={"osm_node_id": start.osm_node_id, "distance_m": round(start.distance_m, 1)}, target_node={"osm_node_id": start.osm_node_id, "distance_m": round(start.distance_m, 1)}, visited_nodes=1, ) def _direct_route_payload( *, dataset_id: int, mode: str, from_lon: float, from_lat: float, to_lon: float, to_lat: float, engine: str = "direct_fallback", start_node: dict[str, object] | None = None, target_node: dict[str, object] | None = None, visited_nodes: int = 0, ) -> dict[str, object]: distance = _distance_m(from_lat, from_lon, to_lat, to_lon) duration_seconds = _connector_seconds(distance, mode) return { "dataset_id": dataset_id, "mode": mode, "engine": engine, "distance_m": round(distance, 1), "network_distance_m": 0, "access_distance_m": round(distance, 1), "duration_seconds": round(duration_seconds, 1), "duration_minutes": _duration_minutes_ceil(duration_seconds), "duration_label": _duration_label(duration_seconds), "visited_nodes": visited_nodes, "start_node": start_node, "target_node": target_node, "features": feature_collection([_connector_feature("direct", mode, [[from_lon, from_lat], [to_lon, to_lat]], distance)]), } def _connector_feature(kind: str, mode: str, coordinates: list[list[float]], distance_m: float) -> dict: return { "type": "Feature", "geometry": {"type": "LineString", "coordinates": coordinates}, "properties": { "feature_type": "routing_connector", "connector": kind, "mode": mode, "length_m": distance_m, "cost_s": _connector_seconds(distance_m, mode), }, } def _connector_seconds(distance_m: float, mode: str) -> float: speed = 1.35 if mode == "walk" else 8.0 return float(distance_m) / speed def _duration_minutes_ceil(seconds: int | float | None) -> int | None: if seconds is None: return None return max(0, int(math.ceil(float(seconds) / 60))) def _duration_label(seconds: int | float | None) -> str | None: minutes_total = _duration_minutes_ceil(seconds) if minutes_total is None: return None days = minutes_total // (24 * 60) remaining = minutes_total % (24 * 60) hours = remaining // 60 minutes = remaining % 60 if days: return f"{days}d {hours:02d}:{minutes:02d}" if hours: return f"{hours}:{minutes:02d}" return f"{minutes} min" def _expanded_bbox(min_lon: float, min_lat: float, max_lon: float, max_lat: float, padding_km: float) -> tuple[float, float, float, float]: mid_lat = (min_lat + max_lat) / 2 lat_delta = padding_km / 111.0 lon_delta = padding_km / max(1.0, 111.0 * math.cos(math.radians(mid_lat))) return (min_lon - lon_delta, min_lat - lat_delta, max_lon + lon_delta, max_lat + lat_delta) def _distance_m(lat_a: float, lon_a: float, lat_b: float, lon_b: float) -> float: radius = 6_371_000.0 phi_a = math.radians(lat_a) phi_b = math.radians(lat_b) delta_phi = math.radians(lat_b - lat_a) delta_lambda = math.radians(lon_b - lon_a) hav = math.sin(delta_phi / 2) ** 2 + math.cos(phi_a) * math.cos(phi_b) * math.sin(delta_lambda / 2) ** 2 return radius * 2 * math.atan2(math.sqrt(hav), math.sqrt(1 - hav))