from __future__ import annotations from datetime import datetime, timedelta, timezone from typing import Any import requests from sqlalchemy import select from sqlalchemy.orm import Session from app.models import Source GEOFABRIK_INDEX_URL = "https://download.geofabrik.de/index-v1-nogeom.json" _CACHE: dict[str, Any] = {"expires_at": None, "rows": None} def geofabrik_catalog(q: str | None = None, limit: int = 80) -> list[dict[str, Any]]: rows = _geofabrik_rows() query = (q or "").strip().casefold() if query: rows = [ row for row in rows if query in row["id"].casefold() or query in row["name"].casefold() or query in (row.get("parent") or "").casefold() or query in " ".join(row.get("country_codes") or []).casefold() ] rows.sort(key=lambda row: (row.get("parent") or "", row["name"])) return rows[: max(1, min(limit, 500))] def geofabrik_entry(geofabrik_id: str) -> dict[str, Any] | None: target = geofabrik_id.strip().casefold() for row in _geofabrik_rows(): if row["id"].casefold() == target: return row return None def create_geofabrik_source(session: Session, geofabrik_id: str, *, import_updates: bool = False) -> Source: entry = geofabrik_entry(geofabrik_id) if entry is None: raise ValueError(f"Geofabrik extract not found: {geofabrik_id}") if not entry.get("pbf_url"): raise ValueError(f"Geofabrik extract has no PBF URL: {geofabrik_id}") existing = session.scalar(select(Source).where(Source.kind == "osm_pbf", Source.url == entry["pbf_url"])) if existing is not None: return existing source = Source( name=f"Geofabrik {entry['name']}", kind="osm_pbf", url=entry["pbf_url"], country=",".join(entry.get("country_codes") or [])[:8] or None, license="ODbL / Geofabrik extract terms", priority="P0 fallback", mode_scope="public transport OSM routes, stops, and infrastructure", source_basis="OpenStreetMap / Geofabrik extracts", notes=_geofabrik_notes(entry, import_updates=import_updates), ) session.add(source) session.flush() if import_updates and entry.get("updates_url"): update_source = Source( name=f"Geofabrik {entry['name']} updates", kind="osm_diff", url=entry["updates_url"], country=source.country, license=source.license, priority=source.priority, mode_scope=source.mode_scope, source_basis="OpenStreetMap / Geofabrik replication diffs", notes=f"Diff base for Geofabrik extract {entry['id']}; applying diffs to a local base extract is not implemented yet.", ) session.add(update_source) return source def _geofabrik_rows() -> list[dict[str, Any]]: now = datetime.now(timezone.utc) expires_at = _CACHE.get("expires_at") if _CACHE.get("rows") is not None and isinstance(expires_at, datetime) and expires_at > now: return list(_CACHE["rows"]) response = requests.get(GEOFABRIK_INDEX_URL, timeout=45) response.raise_for_status() payload = response.json() rows = [_normalize_feature(feature) for feature in payload.get("features", [])] rows = [row for row in rows if row.get("id") and row.get("pbf_url")] _CACHE["rows"] = rows _CACHE["expires_at"] = now + timedelta(hours=12) return list(rows) def _normalize_feature(feature: dict[str, Any]) -> dict[str, Any]: props = feature.get("properties") or {} urls = props.get("urls") or {} country_codes = props.get("iso3166-1:alpha2") or [] if isinstance(country_codes, str): country_codes = [country_codes] return { "id": str(props.get("id") or ""), "name": str(props.get("name") or props.get("id") or ""), "parent": props.get("parent"), "country_codes": country_codes, "pbf_url": urls.get("pbf"), "updates_url": urls.get("updates"), "taginfo_url": urls.get("taginfo"), "urls": urls, } def _geofabrik_notes(entry: dict[str, Any], *, import_updates: bool) -> str: parts = [ f"geofabrik_id={entry['id']}", f"parent={entry.get('parent') or 'root'}", f"updates_url={entry.get('updates_url') or ''}", "diff_source_requested=true" if import_updates else "diff_source_requested=false", "Overlap dedupe is handled by OSM object identity in the route layer; source-specific map layers may still show both extracts.", ] return "; ".join(parts)