361 lines
13 KiB
Python
361 lines
13 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.journey import duration_minutes_ceil, find_journeys, format_duration_label
|
|
from app.models import Itinerary, ItineraryLeg, TravelRequest
|
|
from app.routing import route_between_points
|
|
|
|
|
|
def generate_itineraries(
    db: Session,
    *,
    from_stop_id: str,
    to_stop_id: str,
    via_stop_id: str | None,
    departure: str,
    service_date: str | None,
    max_transfers: int,
    transfer_seconds: int,
    limit: int,
    source_ids: list[int] | None,
    preferences: dict[str, Any] | None = None,
) -> dict:
    """Store a travel request and the itineraries generated for it.

    A ``TravelRequest`` row is created first, then ``find_journeys`` is run
    and every returned journey becomes a public-transport ``Itinerary`` with
    one ``ItineraryLeg`` per journey leg. A car itinerary is added when
    ``_car_itinerary`` can build one, and placeholder itineraries are
    appended last (including a car placeholder only when no real car
    itinerary exists).

    Args:
        db: Session that receives the new rows; this function only flushes.
        from_stop_id: Origin stop identifier.
        to_stop_id: Destination stop identifier.
        via_stop_id: Optional intermediate stop; empty values are stored as None.
        departure: Departure time, stored and forwarded unchanged.
        service_date: Optional service date; empty values are stored as None.
        max_transfers: Transfer limit; negative values are clamped to 0.
        transfer_seconds: Transfer time; negative values are clamped to 0.
        limit: Forwarded unchanged to ``find_journeys``.
        source_ids: Optional source ids, stored as a comma-separated string.
        preferences: Optional preferences, stored as compact JSON.

    Returns:
        A dict with the keys ``"request"``, ``"journey_context"`` and
        ``"itineraries"``.
    """
    # Clamp once so the stored request and the journey search always agree.
    clamped_transfers = max(0, max_transfers)
    clamped_transfer_seconds = max(0, transfer_seconds)

    request = TravelRequest(
        origin_stop_id=from_stop_id,
        destination_stop_id=to_stop_id,
        via_stop_id=via_stop_id or None,
        departure_time=departure,
        service_date=service_date or None,
        max_transfers=clamped_transfers,
        transfer_seconds=clamped_transfer_seconds,
        source_filter=",".join(str(source_id) for source_id in source_ids or []) or None,
        preferences_json=json.dumps(preferences or {}, separators=(",", ":")),
    )
    db.add(request)
    # Flush before request.id is used as the parent key of the itineraries.
    db.flush()

    # NOTE(review): the search receives the raw via_stop_id / service_date,
    # while the stored request normalizes empty values to None — confirm that
    # find_journeys treats "" and None alike.
    journey_result = find_journeys(
        db=db,
        from_stop_id=from_stop_id,
        to_stop_id=to_stop_id,
        via_stop_id=via_stop_id,
        departure=departure,
        service_date=service_date,
        max_transfers=clamped_transfers,
        transfer_seconds=clamped_transfer_seconds,
        limit=limit,
        source_ids=source_ids,
    )
    from_stop = journey_result.get("from")
    to_stop = journey_result.get("to")

    itineraries: list[Itinerary] = []
    for index, journey in enumerate(journey_result.get("journeys", []), start=1):
        itinerary = _journey_itinerary(request.id, journey, index)
        db.add(itinerary)
        # The legs need itinerary.id, so each itinerary is flushed on its own.
        db.flush()
        _add_journey_legs(db, itinerary.id, journey)
        itineraries.append(itinerary)

    car_itinerary = _car_itinerary(db, request.id, from_stop, to_stop)
    if car_itinerary is not None:
        db.add(car_itinerary)
        db.flush()
        _add_routing_leg(db, car_itinerary.id, car_itinerary)
        itineraries.append(car_itinerary)

    placeholders = _placeholder_itineraries(
        request.id,
        from_stop,
        to_stop,
        service_date=service_date,
        include_car=car_itinerary is None,
    )
    # Placeholders have no legs, so nothing reads their ids before the final
    # flush below; one flush replaces a round trip per placeholder.
    db.add_all(placeholders)
    itineraries.extend(placeholders)

    db.flush()
    return {
        "request": travel_request_payload(request),
        "journey_context": {
            "from": from_stop,
            "to": to_stop,
            "via": journey_result.get("via"),
            "sources": journey_result.get("sources", []),
        },
        "itineraries": [itinerary_payload(db, itinerary) for itinerary in itineraries],
    }
|
|
|
|
|
|
def travel_request_payload(request: TravelRequest) -> dict[str, Any]:
    """Convert a ``TravelRequest`` row into a plain dictionary.

    Column values are copied as-is, ``preferences_json`` is decoded through
    ``_json_dict`` and ``created_at`` is rendered with ``isoformat()`` (or
    None when it is unset).
    """
    copied_columns = (
        "id",
        "origin_stop_id",
        "destination_stop_id",
        "via_stop_id",
        "departure_time",
        "service_date",
        "max_transfers",
        "transfer_seconds",
        "source_filter",
    )
    payload: dict[str, Any] = {column: getattr(request, column) for column in copied_columns}
    payload["preferences"] = _json_dict(request.preferences_json)
    created = request.created_at
    payload["created_at"] = created.isoformat() if created else None
    return payload
|
|
|
|
|
|
def itinerary_payload(db: Session, itinerary: Itinerary) -> dict[str, Any]:
    """Convert an ``Itinerary`` row and its legs into a plain dictionary.

    The legs are loaded with a separate query ordered by ``sequence``. The
    three JSON text columns are decoded through ``_json_dict`` and both
    timestamps are rendered with ``isoformat()`` (or None when unset).
    """
    leg_query = (
        select(ItineraryLeg)
        .where(ItineraryLeg.itinerary_id == itinerary.id)
        .order_by(ItineraryLeg.sequence)
    )
    leg_rows = db.scalars(leg_query).all()

    copied_columns = ("id", "request_id", "title", "family", "status", "saved")
    payload: dict[str, Any] = {column: getattr(itinerary, column) for column in copied_columns}
    payload["summary"] = _json_dict(itinerary.summary_json)
    payload["score"] = _json_dict(itinerary.score_json)
    payload["payload"] = _json_dict(itinerary.payload_json)
    payload["legs"] = [itinerary_leg_payload(row) for row in leg_rows]

    created = itinerary.created_at
    payload["created_at"] = created.isoformat() if created else None
    updated = itinerary.updated_at
    payload["updated_at"] = updated.isoformat() if updated else None
    return payload
|
|
|
|
|
|
def itinerary_leg_payload(leg: ItineraryLeg) -> dict[str, Any]:
    """Convert an ``ItineraryLeg`` row into a plain dictionary.

    Column values are copied as-is; ``payload_json`` is decoded through
    ``_json_dict`` and exposed under the ``"payload"`` key.
    """
    copied_columns = (
        "id",
        "itinerary_id",
        "sequence",
        "mode",
        "route_ref",
        "route_name",
        "from_name",
        "to_name",
        "departure_time",
        "arrival_time",
        "locked",
    )
    payload: dict[str, Any] = {column: getattr(leg, column) for column in copied_columns}
    payload["payload"] = _json_dict(leg.payload_json)
    return payload
|
|
|
|
|
|
def set_itinerary_saved(db: Session, itinerary: Itinerary, saved: bool) -> dict[str, Any]:
    """Set the saved flag on an itinerary and return its refreshed payload.

    ``status`` follows the flag ("saved" when set, "candidate" otherwise) and
    ``updated_at`` is set to the current UTC time before the session is
    flushed.
    """
    if saved:
        new_status = "saved"
    else:
        new_status = "candidate"

    itinerary.saved = saved
    itinerary.status = new_status
    itinerary.updated_at = datetime.now(timezone.utc)
    db.flush()
    return itinerary_payload(db, itinerary)
|
|
|
|
|
|
def set_leg_locked(db: Session, leg: ItineraryLeg, locked: bool) -> dict[str, Any]:
    """Set the locked flag on a leg and return the leg's payload.

    When the owning itinerary can be loaded, its ``updated_at`` is set to the
    current UTC time. The session is flushed in either case.
    """
    leg.locked = locked

    owner = db.get(Itinerary, leg.itinerary_id)
    if owner is not None:
        owner.updated_at = datetime.now(timezone.utc)

    db.flush()
    return itinerary_leg_payload(leg)
|
|
|
|
|
|
def recent_itineraries(db: Session, *, saved_only: bool = False, limit: int = 30) -> list[dict[str, Any]]:
    """Return payloads for the most recently updated itineraries.

    Rows are ordered by ``updated_at`` descending, then ``id`` descending.
    ``saved_only`` restricts the result to saved itineraries, and ``limit``
    is clamped to the range 1..100.
    """
    query = select(Itinerary).order_by(Itinerary.updated_at.desc(), Itinerary.id.desc())
    if saved_only:
        query = query.where(Itinerary.saved.is_(True))

    page_size = max(1, min(limit, 100))
    found = db.scalars(query.limit(page_size)).all()
    return [itinerary_payload(db, row) for row in found]
|
|
|
|
|
|
def _journey_itinerary(request_id: int, journey: dict, index: int) -> Itinerary:
    """Build an unsaved public-transport ``Itinerary`` from one journey dict.

    The summary copies the headline fields of the journey and adds the leg
    count and each leg's route reference (falling back to ``route_id``). The
    full journey is kept under ``"journey"`` in the payload JSON.
    """

    def compact(data: dict) -> str:
        # Every JSON column in this module uses the same compact separators.
        return json.dumps(data, separators=(",", ":"))

    score = _journey_score(journey)

    copied_keys = (
        "departure_time",
        "arrival_time",
        "duration_minutes",
        "duration_label",
        "transfers",
    )
    summary = {key: journey.get(key) for key in copied_keys}
    summary["leg_count"] = len(journey.get("legs", []))
    summary["route_refs"] = [
        leg.get("route_ref") or leg.get("route_id") for leg in journey.get("legs", [])
    ]

    return Itinerary(
        request_id=request_id,
        title=f"Public transport option {index}",
        family="public_transport",
        status="candidate",
        saved=False,
        summary_json=compact(summary),
        score_json=compact(score),
        payload_json=compact({"journey": journey}),
    )
|
|
|
|
|
|
def _add_journey_legs(db: Session, itinerary_id: int, journey: dict) -> None:
    """Add one unlocked ``ItineraryLeg`` per journey leg to the session.

    Sequence numbers start at 1. Endpoint names use the stop's ``name`` and
    fall back to its ``stop_id``. The raw leg is kept under
    ``"journey_leg"`` in the payload JSON. Nothing is flushed here.
    """
    for position, leg in enumerate(journey.get("legs", []), start=1):
        origin = leg.get("from") or {}
        target = leg.get("to") or {}
        row = ItineraryLeg(
            itinerary_id=itinerary_id,
            sequence=position,
            mode=leg.get("mode"),
            route_ref=leg.get("route_ref"),
            route_name=leg.get("route_name"),
            from_name=origin.get("name") or origin.get("stop_id"),
            to_name=target.get("name") or target.get("stop_id"),
            departure_time=leg.get("departure_time"),
            arrival_time=leg.get("arrival_time"),
            locked=False,
            payload_json=json.dumps({"journey_leg": leg}, separators=(",", ":")),
        )
        db.add(row)
|
|
|
|
|
|
def _car_itinerary(db: Session, request_id: int, from_stop: dict | None, to_stop: dict | None) -> Itinerary | None:
    """Build a drive-only comparison itinerary between two stops, if possible.

    Args:
        db: Session handed to ``route_between_points``.
        request_id: Id of the owning travel request.
        from_stop: Origin stop dict; ``lon``/``lat`` are read for routing and
            ``name``/``stop_id`` for the summary label.
        to_stop: Destination stop dict, read the same way.

    Returns:
        An unsaved ``Itinerary`` with family ``"car"``, or None when a
        coordinate is missing, routing raises, or routing returns something
        other than a dict. The car option is a best-effort extra and never
        fails the caller.
    """
    origin = from_stop or {}
    destination = to_stop or {}

    from_lon = _float_or_none(origin.get("lon"))
    from_lat = _float_or_none(origin.get("lat"))
    to_lon = _float_or_none(destination.get("lon"))
    to_lat = _float_or_none(destination.get("lat"))
    if any(coordinate is None for coordinate in (from_lon, from_lat, to_lon, to_lat)):
        return None

    try:
        route = route_between_points(
            db,
            from_lon=from_lon,
            from_lat=from_lat,
            to_lon=to_lon,
            to_lat=to_lat,
            mode="drive",
            max_visited=300_000,
        )
    except Exception:  # noqa: BLE001 - car comparison is optional
        return None
    # The result is read with .get() and JSON-encoded below; any other shape
    # would raise outside the try, so treat it as "no car option", matching
    # the isinstance guard in _add_routing_leg.
    if not isinstance(route, dict):
        return None

    duration_seconds = _float_or_none(route.get("duration_seconds"))
    duration_minutes = duration_minutes_ceil(duration_seconds)
    distance_m = _float_or_none(route.get("distance_m"))

    summary = {
        "from": origin.get("name") or origin.get("stop_id") or "origin",
        "to": destination.get("name") or destination.get("stop_id") or "destination",
        "duration_minutes": duration_minutes,
        "duration_label": format_duration_label(duration_seconds),
        "distance_km": None if distance_m is None else round(distance_m / 1000, 1),
        "transfers": 0,
        "engine": route.get("engine"),
    }
    score = {
        "duration_minutes": duration_minutes,
        "transfers": 0,
        "complexity": 1,
        "emissions": "high",
        "estimated_cost": None,
    }
    return Itinerary(
        request_id=request_id,
        title="Car only",
        family="car",
        status="candidate",
        saved=False,
        summary_json=json.dumps(summary, separators=(",", ":")),
        score_json=json.dumps(score, separators=(",", ":")),
        payload_json=json.dumps({"routing": route}, separators=(",", ":")),
    )
|
|
|
|
|
|
def _add_routing_leg(db: Session, itinerary_id: int, itinerary: Itinerary) -> None:
    """Add a single road-route leg for a routed itinerary to the session.

    The route is read from the ``"routing"`` entry of the itinerary's payload
    JSON; nothing is added when that entry is missing or not a dict. Endpoint
    names are the ``osm_node_id`` of the route's start and target nodes, with
    "origin" / "destination" as fallbacks.
    """
    decoded = _json_dict(itinerary.payload_json)
    route = decoded.get("routing") if isinstance(decoded, dict) else None
    if not isinstance(route, dict):
        return

    travel_mode = str(route.get("mode") or "drive")
    start_node = route.get("start_node") or {}
    target_node = route.get("target_node") or {}

    road_leg = ItineraryLeg(
        itinerary_id=itinerary_id,
        sequence=1,
        mode=travel_mode,
        route_ref=None,
        route_name="Road route",
        from_name=str(start_node.get("osm_node_id") or "origin"),
        to_name=str(target_node.get("osm_node_id") or "destination"),
        departure_time=None,
        arrival_time=None,
        locked=False,
        payload_json=json.dumps({"routing_leg": route}, separators=(",", ":")),
    )
    db.add(road_leg)
|
|
|
|
|
|
def _placeholder_itineraries(
    request_id: int,
    from_stop: dict | None,
    to_stop: dict | None,
    *,
    service_date: str | None,
    include_car: bool = True,
) -> list[Itinerary]:
    """Build unsaved placeholder itineraries for not-yet-supported families.

    Each placeholder has status ``"placeholder"``, a fixed score and a note
    saying what is still needed. The car placeholder comes first and is only
    included when ``include_car`` is true. The rows are returned without
    being added to a session.
    """
    origin = from_stop or {}
    destination = to_stop or {}
    origin_label = origin.get("name") or origin.get("stop_id") or "origin"
    destination_label = destination.get("name") or destination.get("stop_id") or "destination"

    # (family, title, note, score) for every placeholder, in output order.
    specs = [
        ("car_ferry", "Car + ferry", "Needs ferry-port candidate graph", {"complexity": 3, "emissions": "medium_high"}),
        ("flight_access", "Flight + airport access", "Needs airport/flight schedule connector", {"complexity": 4, "emissions": "high"}),
        ("rail_long_stay", "Rail with adjustable city stop", "Use via stop and leg locking to refine", {"complexity": 3, "emissions": "low"}),
    ]
    if include_car:
        car_spec = ("car", "Car only", "Needs road-routing connector", {"complexity": 1, "emissions": "high"})
        specs = [car_spec, *specs]

    def compact(data: dict) -> str:
        return json.dumps(data, separators=(",", ":"))

    return [
        Itinerary(
            request_id=request_id,
            title=title,
            family=family,
            status="placeholder",
            saved=False,
            summary_json=compact(
                {
                    "from": origin_label,
                    "to": destination_label,
                    "service_date": service_date,
                    "note": note,
                    "duration_minutes": None,
                    "transfers": None,
                }
            ),
            score_json=compact(score),
            payload_json=compact({"placeholder": True, "note": note}),
        )
        for family, title, note, score in specs
    ]
|
|
|
|
|
|
def _float_or_none(value: object) -> float | None:
|
|
try:
|
|
return None if value is None else float(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _journey_score(journey: dict) -> dict[str, Any]:
|
|
modes = [leg.get("mode") for leg in journey.get("legs", [])]
|
|
duration = journey.get("duration_minutes")
|
|
transfers = int(journey.get("transfers") or 0)
|
|
railish = sum(1 for mode in modes if mode in {"train", "subway", "tram", "light_rail"})
|
|
busish = sum(1 for mode in modes if mode in {"bus", "coach", "trolleybus"})
|
|
emissions_hint = "low" if railish >= busish else "medium"
|
|
return {
|
|
"duration_minutes": duration,
|
|
"transfers": transfers,
|
|
"complexity": transfers + len(modes),
|
|
"emissions": emissions_hint,
|
|
"overnight": False,
|
|
"estimated_cost": None,
|
|
}
|
|
|
|
|
|
def _json_dict(value: str | None) -> dict[str, Any]:
|
|
try:
|
|
data = json.loads(value or "{}")
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return data if isinstance(data, dict) else {}
|