121 lines
4.5 KiB
Python
121 lines
4.5 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import requests
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models import Source
|
|
|
|
|
|
# Geofabrik download index, fetched by _geofabrik_rows() and parsed as JSON
# with a top-level "features" list.
# NOTE(review): the "nogeom" file name suggests this is the variant without
# region geometries — confirm against the Geofabrik documentation.
GEOFABRIK_INDEX_URL = "https://download.geofabrik.de/index-v1-nogeom.json"
|
|
# Module-level cache of the normalized index rows.
#   "rows":       list of normalized row dicts, or None before the first fetch.
#   "expires_at": timezone-aware datetime after which the rows are refetched,
#                 or None before the first fetch.
# Both slots are written by _geofabrik_rows() after a successful download.
_CACHE: dict[str, Any] = {"expires_at": None, "rows": None}
|
|
|
|
|
|
def geofabrik_catalog(q: str | None = None, limit: int = 80) -> list[dict[str, Any]]:
    """List Geofabrik extracts, optionally filtered by a search term.

    Args:
        q: Optional search text. It is stripped and case-folded, then matched
            as a substring against each row's id, name, parent and the
            space-joined country codes. Empty or ``None`` disables filtering.
        limit: Maximum number of rows to return; clamped to the range 1..500.

    Returns:
        Rows ordered by ``(parent or "", name)``, truncated to the clamped
        limit.
    """
    candidates = _geofabrik_rows()
    needle = (q or "").strip().casefold()

    def _matches(entry: dict[str, Any]) -> bool:
        # Checked in the same order as a short-circuiting ``or`` chain:
        # id, name, parent, then country codes.
        if needle in entry["id"].casefold():
            return True
        if needle in entry["name"].casefold():
            return True
        if needle in (entry.get("parent") or "").casefold():
            return True
        return needle in " ".join(entry.get("country_codes") or []).casefold()

    def _sort_key(entry: dict[str, Any]) -> tuple[Any, Any]:
        # Rows without a parent sort under the empty string.
        return (entry.get("parent") or "", entry["name"])

    if needle:
        candidates = [entry for entry in candidates if _matches(entry)]

    ordered = sorted(candidates, key=_sort_key)
    cap = max(1, min(limit, 500))
    return ordered[:cap]
|
|
|
|
|
|
def geofabrik_entry(geofabrik_id: str) -> dict[str, Any] | None:
    """Look up a single Geofabrik extract by id.

    Args:
        geofabrik_id: Extract id; surrounding whitespace is ignored and the
            comparison is case-insensitive (case-folded).

    Returns:
        The first row whose id matches, or ``None`` when there is no match.
    """
    wanted = geofabrik_id.strip().casefold()
    hits = (entry for entry in _geofabrik_rows() if entry["id"].casefold() == wanted)
    return next(hits, None)
|
|
|
|
|
|
def create_geofabrik_source(session: Session, geofabrik_id: str, *, import_updates: bool = False) -> Source:
    """Return the ``osm_pbf`` Source for a Geofabrik extract, creating it if needed.

    Args:
        session: SQLAlchemy session used for the lookup and for adding rows.
            This function flushes the new base source but does not commit.
        geofabrik_id: Id of the extract in the Geofabrik index.
        import_updates: When true and the extract advertises an updates URL,
            an additional ``osm_diff`` Source is added next to the new base
            source.

    Returns:
        The existing ``osm_pbf`` Source whose URL equals the extract's PBF URL
        if there is one, otherwise the newly created Source.

    Raises:
        ValueError: If the extract id is unknown or the extract has no PBF URL.

    Note:
        When an existing source is found it is returned immediately; no
        ``osm_diff`` source is created in that case, even if
        ``import_updates`` is true.
    """
    extract = geofabrik_entry(geofabrik_id)
    if extract is None:
        raise ValueError(f"Geofabrik extract not found: {geofabrik_id}")

    pbf_url = extract.get("pbf_url")
    if not pbf_url:
        raise ValueError(f"Geofabrik extract has no PBF URL: {geofabrik_id}")

    # Reuse a previously registered source for the same download URL.
    lookup = select(Source).where(Source.kind == "osm_pbf", Source.url == pbf_url)
    already_registered = session.scalar(lookup)
    if already_registered is not None:
        return already_registered

    base = Source(
        name=f"Geofabrik {extract['name']}",
        kind="osm_pbf",
        url=pbf_url,
        # Comma-joined country codes, cut to 8 characters; empty becomes None.
        country=",".join(extract.get("country_codes") or [])[:8] or None,
        license="ODbL / Geofabrik extract terms",
        priority="P0 fallback",
        mode_scope="public transport OSM routes, stops, and infrastructure",
        source_basis="OpenStreetMap / Geofabrik extracts",
        notes=_geofabrik_notes(extract, import_updates=import_updates),
    )
    session.add(base)
    session.flush()

    if not import_updates:
        return base

    diff_url = extract.get("updates_url")
    if not diff_url:
        return base

    # Companion diff source; it is added to the session but not flushed here.
    diff_source = Source(
        name=f"Geofabrik {extract['name']} updates",
        kind="osm_diff",
        url=diff_url,
        country=base.country,
        license=base.license,
        priority=base.priority,
        mode_scope=base.mode_scope,
        source_basis="OpenStreetMap / Geofabrik replication diffs",
        notes=f"Diff base for Geofabrik extract {extract['id']}; applying diffs to a local base extract is not implemented yet.",
    )
    session.add(diff_source)
    return base
|
|
|
|
|
|
def _geofabrik_rows() -> list[dict[str, Any]]:
    """Return the normalized Geofabrik index rows, using the module cache.

    Cached rows are served while their expiry timestamp is in the future.
    Otherwise the index is downloaded, each feature is normalized, rows
    lacking an id or a PBF URL are dropped, and the cache is refreshed with a
    12-hour lifetime.

    Returns:
        A new list on every call; the row dicts inside it are the cached
        objects themselves (shallow copy).

    Raises:
        requests.HTTPError: If the index request returns an error status
            (via ``raise_for_status``).
    """
    checked_at = datetime.now(timezone.utc)
    cached_rows = _CACHE.get("rows")
    deadline = _CACHE.get("expires_at")
    still_fresh = (
        cached_rows is not None
        and isinstance(deadline, datetime)
        and deadline > checked_at
    )
    if still_fresh:
        return list(cached_rows)

    reply = requests.get(GEOFABRIK_INDEX_URL, timeout=45)
    reply.raise_for_status()
    index = reply.json()

    normalized = map(_normalize_feature, index.get("features", []))
    # Keep only extracts that are addressable (id) and downloadable (PBF URL).
    usable = [row for row in normalized if row.get("id") and row.get("pbf_url")]

    _CACHE["rows"] = usable
    _CACHE["expires_at"] = checked_at + timedelta(hours=12)
    return list(usable)
|
|
|
|
|
|
def _normalize_feature(feature: dict[str, Any]) -> dict[str, Any]:
|
|
props = feature.get("properties") or {}
|
|
urls = props.get("urls") or {}
|
|
country_codes = props.get("iso3166-1:alpha2") or []
|
|
if isinstance(country_codes, str):
|
|
country_codes = [country_codes]
|
|
return {
|
|
"id": str(props.get("id") or ""),
|
|
"name": str(props.get("name") or props.get("id") or ""),
|
|
"parent": props.get("parent"),
|
|
"country_codes": country_codes,
|
|
"pbf_url": urls.get("pbf"),
|
|
"updates_url": urls.get("updates"),
|
|
"taginfo_url": urls.get("taginfo"),
|
|
"urls": urls,
|
|
}
|
|
|
|
|
|
def _geofabrik_notes(entry: dict[str, Any], *, import_updates: bool) -> str:
|
|
parts = [
|
|
f"geofabrik_id={entry['id']}",
|
|
f"parent={entry.get('parent') or 'root'}",
|
|
f"updates_url={entry.get('updates_url') or ''}",
|
|
"diff_source_requested=true" if import_updates else "diff_source_requested=false",
|
|
"Overlap dedupe is handled by OSM object identity in the route layer; source-specific map layers may still show both extracts.",
|
|
]
|
|
return "; ".join(parts)
|