Files
meubility-workbench/app/itineraries.py
2026-07-01 23:29:51 +02:00

361 lines
13 KiB
Python

from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.journey import duration_minutes_ceil, find_journeys, format_duration_label
from app.models import Itinerary, ItineraryLeg, TravelRequest
from app.routing import route_between_points
def generate_itineraries(
db: Session,
*,
from_stop_id: str,
to_stop_id: str,
via_stop_id: str | None,
departure: str,
service_date: str | None,
max_transfers: int,
transfer_seconds: int,
limit: int,
source_ids: list[int] | None,
preferences: dict[str, Any] | None = None,
) -> dict:
request = TravelRequest(
origin_stop_id=from_stop_id,
destination_stop_id=to_stop_id,
via_stop_id=via_stop_id or None,
departure_time=departure,
service_date=service_date or None,
max_transfers=max(0, max_transfers),
transfer_seconds=max(0, transfer_seconds),
source_filter=",".join(str(source_id) for source_id in source_ids or []) or None,
preferences_json=json.dumps(preferences or {}, separators=(",", ":")),
)
db.add(request)
db.flush()
journey_result = find_journeys(
db=db,
from_stop_id=from_stop_id,
to_stop_id=to_stop_id,
via_stop_id=via_stop_id,
departure=departure,
service_date=service_date,
max_transfers=max(0, max_transfers),
transfer_seconds=max(0, transfer_seconds),
limit=limit,
source_ids=source_ids,
)
itineraries: list[Itinerary] = []
for index, journey in enumerate(journey_result.get("journeys", []), start=1):
itinerary = _journey_itinerary(request.id, journey, index)
db.add(itinerary)
db.flush()
_add_journey_legs(db, itinerary.id, journey)
itineraries.append(itinerary)
car_itinerary = _car_itinerary(db, request.id, journey_result.get("from"), journey_result.get("to"))
if car_itinerary is not None:
db.add(car_itinerary)
db.flush()
_add_routing_leg(db, car_itinerary.id, car_itinerary)
itineraries.append(car_itinerary)
placeholders = _placeholder_itineraries(
request.id,
journey_result.get("from"),
journey_result.get("to"),
service_date=service_date,
include_car=car_itinerary is None,
)
for itinerary in placeholders:
db.add(itinerary)
db.flush()
itineraries.append(itinerary)
db.flush()
return {
"request": travel_request_payload(request),
"journey_context": {
"from": journey_result.get("from"),
"to": journey_result.get("to"),
"via": journey_result.get("via"),
"sources": journey_result.get("sources", []),
},
"itineraries": [itinerary_payload(db, itinerary) for itinerary in itineraries],
}
def travel_request_payload(request: TravelRequest) -> dict[str, Any]:
return {
"id": request.id,
"origin_stop_id": request.origin_stop_id,
"destination_stop_id": request.destination_stop_id,
"via_stop_id": request.via_stop_id,
"departure_time": request.departure_time,
"service_date": request.service_date,
"max_transfers": request.max_transfers,
"transfer_seconds": request.transfer_seconds,
"source_filter": request.source_filter,
"preferences": _json_dict(request.preferences_json),
"created_at": request.created_at.isoformat() if request.created_at else None,
}
def itinerary_payload(db: Session, itinerary: Itinerary) -> dict[str, Any]:
legs = db.scalars(
select(ItineraryLeg)
.where(ItineraryLeg.itinerary_id == itinerary.id)
.order_by(ItineraryLeg.sequence)
).all()
return {
"id": itinerary.id,
"request_id": itinerary.request_id,
"title": itinerary.title,
"family": itinerary.family,
"status": itinerary.status,
"saved": itinerary.saved,
"summary": _json_dict(itinerary.summary_json),
"score": _json_dict(itinerary.score_json),
"payload": _json_dict(itinerary.payload_json),
"legs": [itinerary_leg_payload(leg) for leg in legs],
"created_at": itinerary.created_at.isoformat() if itinerary.created_at else None,
"updated_at": itinerary.updated_at.isoformat() if itinerary.updated_at else None,
}
def itinerary_leg_payload(leg: ItineraryLeg) -> dict[str, Any]:
return {
"id": leg.id,
"itinerary_id": leg.itinerary_id,
"sequence": leg.sequence,
"mode": leg.mode,
"route_ref": leg.route_ref,
"route_name": leg.route_name,
"from_name": leg.from_name,
"to_name": leg.to_name,
"departure_time": leg.departure_time,
"arrival_time": leg.arrival_time,
"locked": leg.locked,
"payload": _json_dict(leg.payload_json),
}
def set_itinerary_saved(db: Session, itinerary: Itinerary, saved: bool) -> dict[str, Any]:
itinerary.saved = saved
itinerary.status = "saved" if saved else "candidate"
itinerary.updated_at = datetime.now(timezone.utc)
db.flush()
return itinerary_payload(db, itinerary)
def set_leg_locked(db: Session, leg: ItineraryLeg, locked: bool) -> dict[str, Any]:
leg.locked = locked
itinerary = db.get(Itinerary, leg.itinerary_id)
if itinerary is not None:
itinerary.updated_at = datetime.now(timezone.utc)
db.flush()
return itinerary_leg_payload(leg)
def recent_itineraries(db: Session, *, saved_only: bool = False, limit: int = 30) -> list[dict[str, Any]]:
stmt = select(Itinerary).order_by(Itinerary.updated_at.desc(), Itinerary.id.desc())
if saved_only:
stmt = stmt.where(Itinerary.saved.is_(True))
rows = db.scalars(stmt.limit(max(1, min(limit, 100)))).all()
return [itinerary_payload(db, itinerary) for itinerary in rows]
def _journey_itinerary(request_id: int, journey: dict, index: int) -> Itinerary:
score = _journey_score(journey)
summary = {
"departure_time": journey.get("departure_time"),
"arrival_time": journey.get("arrival_time"),
"duration_minutes": journey.get("duration_minutes"),
"duration_label": journey.get("duration_label"),
"transfers": journey.get("transfers"),
"leg_count": len(journey.get("legs", [])),
"route_refs": [leg.get("route_ref") or leg.get("route_id") for leg in journey.get("legs", [])],
}
return Itinerary(
request_id=request_id,
title=f"Public transport option {index}",
family="public_transport",
status="candidate",
saved=False,
summary_json=json.dumps(summary, separators=(",", ":")),
score_json=json.dumps(score, separators=(",", ":")),
payload_json=json.dumps({"journey": journey}, separators=(",", ":")),
)
def _add_journey_legs(db: Session, itinerary_id: int, journey: dict) -> None:
for index, leg in enumerate(journey.get("legs", []), start=1):
db.add(
ItineraryLeg(
itinerary_id=itinerary_id,
sequence=index,
mode=leg.get("mode"),
route_ref=leg.get("route_ref"),
route_name=leg.get("route_name"),
from_name=(leg.get("from") or {}).get("name") or (leg.get("from") or {}).get("stop_id"),
to_name=(leg.get("to") or {}).get("name") or (leg.get("to") or {}).get("stop_id"),
departure_time=leg.get("departure_time"),
arrival_time=leg.get("arrival_time"),
locked=False,
payload_json=json.dumps({"journey_leg": leg}, separators=(",", ":")),
)
)
def _car_itinerary(db: Session, request_id: int, from_stop: dict | None, to_stop: dict | None) -> Itinerary | None:
from_lon = _float_or_none((from_stop or {}).get("lon"))
from_lat = _float_or_none((from_stop or {}).get("lat"))
to_lon = _float_or_none((to_stop or {}).get("lon"))
to_lat = _float_or_none((to_stop or {}).get("lat"))
if None in {from_lon, from_lat, to_lon, to_lat}:
return None
try:
route = route_between_points(
db,
from_lon=from_lon,
from_lat=from_lat,
to_lon=to_lon,
to_lat=to_lat,
mode="drive",
max_visited=300_000,
)
except Exception: # noqa: BLE001 - car comparison is optional
return None
duration_seconds = _float_or_none(route.get("duration_seconds"))
duration_minutes = duration_minutes_ceil(duration_seconds)
distance_m = _float_or_none(route.get("distance_m"))
summary = {
"from": (from_stop or {}).get("name") or (from_stop or {}).get("stop_id") or "origin",
"to": (to_stop or {}).get("name") or (to_stop or {}).get("stop_id") or "destination",
"duration_minutes": duration_minutes,
"duration_label": format_duration_label(duration_seconds),
"distance_km": None if distance_m is None else round(distance_m / 1000, 1),
"transfers": 0,
"engine": route.get("engine"),
}
score = {
"duration_minutes": duration_minutes,
"transfers": 0,
"complexity": 1,
"emissions": "high",
"estimated_cost": None,
}
return Itinerary(
request_id=request_id,
title="Car only",
family="car",
status="candidate",
saved=False,
summary_json=json.dumps(summary, separators=(",", ":")),
score_json=json.dumps(score, separators=(",", ":")),
payload_json=json.dumps({"routing": route}, separators=(",", ":")),
)
def _add_routing_leg(db: Session, itinerary_id: int, itinerary: Itinerary) -> None:
payload = _json_dict(itinerary.payload_json)
route = payload.get("routing") if isinstance(payload, dict) else None
if not isinstance(route, dict):
return
db.add(
ItineraryLeg(
itinerary_id=itinerary_id,
sequence=1,
mode=str(route.get("mode") or "drive"),
route_ref=None,
route_name="Road route",
from_name=str((route.get("start_node") or {}).get("osm_node_id") or "origin"),
to_name=str((route.get("target_node") or {}).get("osm_node_id") or "destination"),
departure_time=None,
arrival_time=None,
locked=False,
payload_json=json.dumps({"routing_leg": route}, separators=(",", ":")),
)
)
def _placeholder_itineraries(
request_id: int,
from_stop: dict | None,
to_stop: dict | None,
*,
service_date: str | None,
include_car: bool = True,
) -> list[Itinerary]:
from_name = (from_stop or {}).get("name") or (from_stop or {}).get("stop_id") or "origin"
to_name = (to_stop or {}).get("name") or (to_stop or {}).get("stop_id") or "destination"
placeholders = [
("car_ferry", "Car + ferry", "Needs ferry-port candidate graph", {"complexity": 3, "emissions": "medium_high"}),
("flight_access", "Flight + airport access", "Needs airport/flight schedule connector", {"complexity": 4, "emissions": "high"}),
("rail_long_stay", "Rail with adjustable city stop", "Use via stop and leg locking to refine", {"complexity": 3, "emissions": "low"}),
]
if include_car:
placeholders.insert(0, ("car", "Car only", "Needs road-routing connector", {"complexity": 1, "emissions": "high"}))
rows = []
for family, title, note, score in placeholders:
summary = {
"from": from_name,
"to": to_name,
"service_date": service_date,
"note": note,
"duration_minutes": None,
"transfers": None,
}
rows.append(
Itinerary(
request_id=request_id,
title=title,
family=family,
status="placeholder",
saved=False,
summary_json=json.dumps(summary, separators=(",", ":")),
score_json=json.dumps(score, separators=(",", ":")),
payload_json=json.dumps({"placeholder": True, "note": note}, separators=(",", ":")),
)
)
return rows
def _float_or_none(value: object) -> float | None:
try:
return None if value is None else float(value)
except (TypeError, ValueError):
return None
def _journey_score(journey: dict) -> dict[str, Any]:
modes = [leg.get("mode") for leg in journey.get("legs", [])]
duration = journey.get("duration_minutes")
transfers = int(journey.get("transfers") or 0)
railish = sum(1 for mode in modes if mode in {"train", "subway", "tram", "light_rail"})
busish = sum(1 for mode in modes if mode in {"bus", "coach", "trolleybus"})
emissions_hint = "low" if railish >= busish else "medium"
return {
"duration_minutes": duration,
"transfers": transfers,
"complexity": transfers + len(modes),
"emissions": emissions_hint,
"overnight": False,
"estimated_cost": None,
}
def _json_dict(value: str | None) -> dict[str, Any]:
try:
data = json.loads(value or "{}")
except json.JSONDecodeError:
return {}
return data if isinstance(data, dict) else {}