# Files
# meubility-workbench/app/feed_discovery.py
# 2026-07-01 23:29:51 +02:00
#
# 924 lines
# 35 KiB
# Python

from __future__ import annotations

import csv
import hashlib
import io
import json
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from html import unescape
from html.parser import HTMLParser
from pathlib import Path
from typing import Iterable
from urllib.parse import parse_qs, urljoin, urlparse

import requests
# Mobility Database catalog export read by fetch_mobility_database_candidates().
MOBILITY_DATABASE_FEEDS_URL = "https://files.mobilitydatabase.org/feeds_v2.csv"
# Feed list from the MobilityData gtfs-validator acceptance tests; used as a
# secondary, smoke-test-oriented source.
MOBILITY_DATABASE_ACCEPTANCE_TEST_URL = (
    "https://raw.githubusercontent.com/MobilityData/gtfs-validator/master/"
    "scripts/mobility-database-harvester/acceptance_test_feed_list.csv"
)
# PTNA GTFS index page (used to discover country codes) and per-country listing page.
PTNA_GTFS_INDEX_URL = "https://ptna.openstreetmap.de/gtfs/index.html"
PTNA_COUNTRY_URL_TEMPLATE = "https://ptna.openstreetmap.de/gtfs/{country}/index.php"
# Country filter applied when the caller passes no explicit country selection.
DEFAULT_DISCOVERY_COUNTRIES = ["DE", "AT", "CH", "NL", "DK", "FR", "BE", "LU", "NO", "SE", "FI", "IE", "GB"]
# Country order used to spread the test-run selection (earlier = preferred).
CURATED_TEST_COUNTRIES = ["DE", "CH", "AT", "NL", "DK", "FI", "NO", "SE", "IE", "GB", "FR", "BE", "LU"]
# Columns of the ingestable / test-run CSVs (the keys of FeedCandidate.ingestable_row()).
DIRECT_INGEST_HEADERS = ["name", "kind", "url", "country", "license", "mode_scope", "source_basis", "priority", "notes"]
# Columns of the full candidates CSV. Every entry except "candidate_id" is read
# as a FeedCandidate attribute of the same name (see FeedCandidate.row()).
CANONICAL_HEADERS = [
    "candidate_id",
    "discovery_source",
    "country",
    "subdivision",
    "provider",
    "feed_name",
    "stable_id",
    "ptna_feed_id",
    "data_type",
    "status",
    "is_official",
    "selected_url",
    "direct_download_url",
    "latest_url",
    "original_release_url",
    "license_url",
    "license_text",
    "osm_license_text",
    "details_url",
    "routes_url",
    "valid_from",
    "valid_to",
    "release_date",
    "feed_version",
    "bbox",
    "features",
    "priority",
    "availability_status",
    "http_status",
    "content_type",
    "content_length",
    "final_url",
    "source_basis",
    "notes",
]
@dataclass
class FeedCandidate:
    """A single GTFS feed candidate produced by one of the discovery sources.

    Almost every attribute is a plain string so the candidate can be written
    straight into CSV rows; ``evidence_sources`` is the only list-valued field.
    """

    discovery_source: str
    country: str = ""
    subdivision: str = ""
    provider: str = ""
    feed_name: str = ""
    stable_id: str = ""
    ptna_feed_id: str = ""
    data_type: str = "gtfs"
    status: str = ""
    is_official: str = ""
    selected_url: str = ""
    direct_download_url: str = ""
    latest_url: str = ""
    original_release_url: str = ""
    license_url: str = ""
    license_text: str = ""
    osm_license_text: str = ""
    details_url: str = ""
    routes_url: str = ""
    valid_from: str = ""
    valid_to: str = ""
    release_date: str = ""
    feed_version: str = ""
    bbox: str = ""
    features: str = ""
    priority: str = ""
    availability_status: str = "unchecked"
    http_status: str = ""
    content_type: str = ""
    content_length: str = ""
    final_url: str = ""
    source_basis: str = ""
    notes: str = ""
    evidence_sources: list[str] = field(default_factory=list)

    def key(self) -> str:
        """Return the primary identity key used for de-duplication.

        The first available identifier wins, in this order: the stable id, the
        normalized selected URL, the PTNA feed id. When none is set, a SHA-256
        of the full CSV row is used.
        """
        if self.stable_id:
            return f"stable:{self.stable_id}"
        if self.selected_url:
            return f"url:{_normalize_url_key(self.selected_url)}"
        if self.ptna_feed_id:
            return f"ptna:{self.ptna_feed_id}"
        serialized_row = json.dumps(self.row(), sort_keys=True)
        digest = hashlib.sha256(serialized_row.encode("utf-8")).hexdigest()
        return "hash:" + digest

    def candidate_id(self) -> str:
        """Return a 16-hex-character id derived from the source, country, ids, URL and names."""
        identity_parts = (
            self.discovery_source,
            self.country,
            self.stable_id,
            self.ptna_feed_id,
            self.selected_url,
            self.provider,
            self.feed_name,
        )
        fingerprint = hashlib.sha256("|".join(identity_parts).encode("utf-8"))
        return fingerprint.hexdigest()[:16]

    def row(self) -> dict[str, str]:
        """Return this candidate as a CSV row keyed by CANONICAL_HEADERS (``candidate_id`` added last)."""
        record: dict[str, str] = {}
        for column in CANONICAL_HEADERS:
            if column == "candidate_id":
                continue
            record[column] = _string(getattr(self, column, ""))
        record["candidate_id"] = self.candidate_id()
        return record

    def ingestable_row(self) -> dict[str, str]:
        """Return the compact DIRECT_INGEST_HEADERS row used by the ingest pipeline.

        The license falls back to a "see <license_url>" pointer. Source basis
        collects detail and release URLs. Notes gain the Mobility Database mirror
        URL and the OSM permission text when those exist. Every text field is
        truncated to a fixed length.
        """
        display_name = _feed_source_name(self.country, self.provider or self.feed_name)

        if self.license_text:
            license_value = self.license_text
        elif self.license_url:
            license_value = f"see {self.license_url}"
        else:
            license_value = ""

        basis = [self.source_basis or self.discovery_source]
        if self.details_url:
            basis.append(f"details: {self.details_url}")
        if self.original_release_url and self.original_release_url != self.selected_url:
            basis.append(f"release: {self.original_release_url}")

        combined_notes = self.notes or ""
        if self.latest_url and self.latest_url != self.selected_url:
            combined_notes = _join_notes(combined_notes, f"Mobility Database mirror: {self.latest_url}")
        if self.osm_license_text:
            osm_note = _truncate(self.osm_license_text, 240)
            combined_notes = _join_notes(combined_notes, f"OSM permission note: {osm_note}")

        joined_basis = "; ".join(filter(None, basis))
        return {
            "name": _truncate(display_name, 240),
            "kind": "gtfs",
            "url": self.selected_url,
            "country": self.country,
            "license": _truncate(license_value, 240),
            "mode_scope": _mode_scope_from_features(self.features),
            "source_basis": _truncate(joined_basis, 500),
            "priority": self.priority or _candidate_priority(self),
            "notes": _truncate(combined_notes, 1200),
        }
def default_generated_dir() -> Path:
    """Return the ``docs/generated`` directory located one level above this module's package."""
    project_root = Path(__file__).resolve().parent.parent
    return project_root.joinpath("docs", "generated")
def build_gtfs_discovery_manifests(
    *,
    output_dir: Path | str | None = None,
    countries: Iterable[str] | None = None,
    include_mobility_database: bool = True,
    include_acceptance_test_list: bool = True,
    include_ptna: bool = True,
    max_ptna_details: int = 80,
    test_limit: int = 24,
    check_urls: bool = False,
    timeout: float = 30.0,
) -> dict[str, object]:
    """Discover GTFS feeds, merge them and write the discovery manifests.

    Writes four files into ``output_dir`` (default: ``default_generated_dir()``):
    ``gtfs_feed_candidates.csv`` (every merged candidate),
    ``gtfs_ingestable_sources.csv`` (GTFS candidates with a URL),
    ``gtfs_test_run_sources.csv`` (a small subset for a test run) and
    ``gtfs_discovery_report.json``. The report dict is also returned.

    When ``check_urls`` is true, each ingestable URL is probed over HTTP with a
    timeout capped at 12 seconds.
    """
    selected_countries = _normalize_countries(countries)
    out_dir = default_generated_dir() if output_dir is None else Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Curated seed rows are gathered first, ahead of all remote sources.
    collected: list[FeedCandidate] = list(load_curated_ingestable_seed(countries=selected_countries))
    if include_mobility_database:
        collected += fetch_mobility_database_candidates(countries=selected_countries, timeout=timeout)
    if include_acceptance_test_list:
        collected += fetch_mobility_acceptance_candidates(countries=selected_countries, timeout=timeout)
    if include_ptna:
        collected += fetch_ptna_candidates(countries=selected_countries, max_details=max_ptna_details, timeout=timeout)

    merged = merge_candidates(collected)
    ingestable = [item for item in merged if item.selected_url and item.data_type == "gtfs"]
    if check_urls:
        probe_timeout = min(timeout, 12.0)
        for item in ingestable:
            annotate_url_availability(item, timeout=probe_timeout)
    test_run = select_test_run_candidates(ingestable, limit=test_limit)

    csv_paths = {
        "candidates": out_dir / "gtfs_feed_candidates.csv",
        "ingestable": out_dir / "gtfs_ingestable_sources.csv",
        "test_run": out_dir / "gtfs_test_run_sources.csv",
    }
    report_path = out_dir / "gtfs_discovery_report.json"

    _write_csv(csv_paths["candidates"], CANONICAL_HEADERS, [item.row() for item in merged])
    _write_csv(csv_paths["ingestable"], DIRECT_INGEST_HEADERS, [item.ingestable_row() for item in ingestable])
    _write_csv(csv_paths["test_run"], DIRECT_INGEST_HEADERS, [item.ingestable_row() for item in test_run])

    source_counts = _count_by(merged, lambda item: item.discovery_source)
    country_counts = _count_by(ingestable, lambda item: item.country or "unknown")
    report = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "countries": selected_countries or "all",
        "sources": {
            "mobility_database": MOBILITY_DATABASE_FEEDS_URL if include_mobility_database else None,
            "mobility_acceptance_test_list": MOBILITY_DATABASE_ACCEPTANCE_TEST_URL if include_acceptance_test_list else None,
            "ptna": PTNA_GTFS_INDEX_URL if include_ptna else None,
        },
        "counts": {
            "candidates": len(merged),
            "ingestable": len(ingestable),
            "test_run": len(test_run),
            "by_source": source_counts,
            "ingestable_by_country": country_counts,
        },
        "files": {label: str(location) for label, location in csv_paths.items()},
    }
    report_path.write_text(json.dumps(report, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    return report
def fetch_mobility_database_candidates(
    *,
    countries: list[str] | None = None,
    timeout: float = 30.0,
    url: str = MOBILITY_DATABASE_FEEDS_URL,
) -> list[FeedCandidate]:
    """Download the Mobility Database catalog CSV and turn its GTFS rows into candidates.

    Only rows whose ``data_type`` is "gtfs" are kept. When ``countries`` is
    non-empty, rows whose ``location.country_code`` is not in it are skipped.
    The direct download URL is preferred over the ``urls.latest`` mirror.
    Download errors from ``_fetch_text`` (``requests.RequestException``) propagate.
    """
    text = _fetch_text(url, timeout=timeout)
    # Read through a newline="" text stream rather than str.splitlines(). This
    # keeps line breaks inside quoted fields (e.g. multi-line notes) and avoids
    # splitting records on Unicode separators such as \x85 or \u2028.
    rows = csv.DictReader(io.StringIO(text, newline=""))
    candidates: list[FeedCandidate] = []
    for row in rows:
        if _value(row, "data_type").lower() != "gtfs":
            continue
        country = _value(row, "location.country_code").upper()
        if countries and country not in countries:
            continue
        direct_url = _normalize_feed_url(_value(row, "urls.direct_download"))
        latest_url = _normalize_feed_url(_value(row, "urls.latest"))
        selected_url = _choose_feed_url(direct_url, latest_url)
        candidate = FeedCandidate(
            discovery_source="mobility_database",
            country=country,
            subdivision=_value(row, "location.subdivision_name"),
            provider=_value(row, "provider"),
            feed_name=_value(row, "name"),
            stable_id=_value(row, "id"),
            data_type="gtfs",
            status=_value(row, "status"),
            is_official=_value(row, "is_official"),
            selected_url=selected_url,
            direct_download_url=direct_url,
            latest_url=latest_url,
            license_url=_value(row, "urls.license"),
            bbox=_bbox_from_mobility_row(row),
            features=_value(row, "features"),
            source_basis="Mobility Database feed catalog",
            notes=_value(row, "note"),
        )
        normalize_candidate_geography(candidate)
        apply_known_download_overrides(candidate)
        candidate.priority = _candidate_priority(candidate)
        candidates.append(candidate)
    return candidates
def fetch_mobility_acceptance_candidates(
    *,
    countries: list[str] | None = None,
    timeout: float = 30.0,
    url: str = MOBILITY_DATABASE_ACCEPTANCE_TEST_URL,
) -> list[FeedCandidate]:
    """Download the validator acceptance-test feed list and turn its rows into candidates.

    Rows without a ``urls.latest`` value are skipped. When ``countries`` is
    non-empty, rows whose ``country_code`` is not in it are skipped. Every
    candidate is marked status "acceptance_test" with priority "P3".
    Download errors from ``_fetch_text`` (``requests.RequestException``) propagate.
    """
    text = _fetch_text(url, timeout=timeout)
    # Read through a newline="" stream so quoted multi-line fields stay intact
    # (str.splitlines() would drop or split on embedded line breaks).
    rows = csv.DictReader(io.StringIO(text, newline=""))
    candidates: list[FeedCandidate] = []
    for row in rows:
        country = _value(row, "country_code").upper()
        if countries and country not in countries:
            continue
        latest_url = _normalize_feed_url(_value(row, "urls.latest"))
        if not latest_url:
            continue
        candidate = FeedCandidate(
            discovery_source="mobility_validator_acceptance",
            country=country,
            subdivision=_value(row, "subdivision_name"),
            provider=_value(row, "provider"),
            feed_name=_value(row, "provider"),
            stable_id=_value(row, "stable_id"),
            status="acceptance_test",
            selected_url=latest_url,
            latest_url=latest_url,
            source_basis="MobilityData validator acceptance-test feed list",
            notes="Useful smoke-test feed list; prefer Mobility Database feeds_v2 metadata for production source review.",
            priority="P3",
        )
        normalize_candidate_geography(candidate)
        apply_known_download_overrides(candidate)
        candidates.append(candidate)
    return candidates
def fetch_ptna_candidates(
    *,
    countries: list[str] | None = None,
    max_details: int = 80,
    timeout: float = 30.0,
) -> list[FeedCandidate]:
    """Scrape PTNA country listing pages, plus a bounded number of feed detail pages.

    Args:
        countries: Country codes whose PTNA pages are read. When empty or None,
            the codes are discovered from the PTNA GTFS index. If discovery fails
            or finds nothing, DEFAULT_DISCOVERY_COUNTRIES is used instead.
        max_details: Maximum number of detail-page fetch *attempts* across all
            countries. Failed attempts count too, so an unreachable host cannot
            cause one timed-out request per feed.
        timeout: Per-request timeout in seconds.

    Unreachable country pages are skipped. Unreachable detail pages add a note
    to the candidate instead of raising.
    """
    if countries:
        country_codes = list(countries)
    else:
        # Previously `countries or DEFAULT_DISCOVERY_COUNTRIES` made this
        # discovery path unreachable, so an "all countries" run never consulted
        # the PTNA index.
        try:
            country_codes = discover_ptna_country_codes(timeout=timeout)
        except requests.RequestException:
            country_codes = []
        if not country_codes:
            country_codes = list(DEFAULT_DISCOVERY_COUNTRIES)

    candidates: list[FeedCandidate] = []
    detail_fetches = 0
    for country in country_codes:
        country_url = PTNA_COUNTRY_URL_TEMPLATE.format(country=country)
        try:
            html = _fetch_text(country_url, timeout=timeout)
        except requests.RequestException:
            continue
        for candidate in parse_ptna_country_page(html, country=country, page_url=country_url):
            if candidate.details_url and detail_fetches < max_details:
                detail_fetches += 1  # budget counts attempts, not just successes
                try:
                    detail_html = _fetch_text(candidate.details_url, timeout=timeout)
                except requests.RequestException:
                    candidate.notes = _join_notes(candidate.notes, "PTNA detail page could not be fetched during discovery.")
                else:
                    enrich_ptna_candidate_from_details(candidate, detail_html, candidate.details_url)
            candidate.priority = _candidate_priority(candidate)
            candidates.append(candidate)
    return candidates
def discover_ptna_country_codes(*, timeout: float = 30.0) -> list[str]:
    """Return the two-letter country codes linked from the PTNA GTFS index, in page order, without duplicates."""
    index_html = _fetch_text(PTNA_GTFS_INDEX_URL, timeout=timeout)
    country_page = re.compile(r"/gtfs/([A-Z]{2})/index\.php$")
    ordered_codes: dict[str, None] = {}
    for href in _all_links(index_html, PTNA_GTFS_INDEX_URL):
        hit = country_page.search(urlparse(href).path)
        if hit is not None:
            ordered_codes.setdefault(hit.group(1), None)
    return list(ordered_codes)
def parse_ptna_country_page(html: str, *, country: str, page_url: str) -> list[FeedCandidate]:
    """Build one candidate per PTNA table row that links to a routes or details page.

    Columns are read by position: 1 feed name, 2 provider, 3 valid-from,
    4 valid-to, 5 feed version, 6 release date. The first external link in
    column 6 is taken as the release URL. That URL becomes the selected URL only
    if it looks like a direct download.
    """

    def column(values: list[str], index: int, fallback: str = "") -> str:
        # Positional cell text, or *fallback* when the row is too short.
        return values[index] if len(values) > index else fallback

    found: list[FeedCandidate] = []
    for table_row in _parse_table_rows(html, page_url):
        row_links = [link for cell in table_row.cells for link in cell.links]
        routes_url = _first_link_matching(row_links, "routes.php?feed=")
        details_url = _first_link_matching(row_links, "gtfs-details.php?feed=")
        if not (routes_url or details_url):
            continue
        feed_id = _feed_id_from_url(routes_url or details_url)
        if not feed_id:
            continue

        texts = [cell.text for cell in table_row.cells]
        raw_release = table_row.cells[6].first_external_link if len(table_row.cells) > 6 else ""
        release_link = _normalize_feed_url(raw_release)
        direct_url = release_link if _looks_like_download_url(release_link) else ""

        candidate = FeedCandidate(
            discovery_source="ptna",
            country=country,
            provider=column(texts, 2),
            feed_name=column(texts, 1, feed_id),
            ptna_feed_id=feed_id,
            selected_url=direct_url,
            direct_download_url=direct_url,
            original_release_url=release_link,
            details_url=details_url,
            routes_url=routes_url,
            valid_from=column(texts, 3),
            valid_to=column(texts, 4),
            feed_version=column(texts, 5),
            release_date=column(texts, 6),
            source_basis="PTNA GTFS analysis",
            notes="PTNA candidate; use original publisher URL where available.",
        )
        normalize_candidate_geography(candidate)
        apply_known_download_overrides(candidate)
        found.append(candidate)
    return found
def enrich_ptna_candidate_from_details(candidate: FeedCandidate, html: str, page_url: str) -> None:
    """Fill release, license and validity metadata on *candidate* from a PTNA detail page.

    Values found on the page replace the candidate's existing values; missing or
    empty page values keep the current ones. If the candidate still has no
    selected URL and the release URL looks like a download, that URL is used.
    """
    fields = parse_ptna_detail_fields(html, page_url)

    release = fields.get("release url href") or fields.get("release url") or candidate.original_release_url
    candidate.original_release_url = _normalize_feed_url(release)

    # (attribute on candidate, lower-cased label on the detail page)
    label_map = (
        ("license_url", "publisher's license href"),
        ("license_text", "publisher's license"),
        ("osm_license_text", "license given for use in osm"),
        ("valid_from", "feed start date"),
        ("valid_to", "feed end date"),
        ("feed_version", "feed version"),
        ("release_date", "release date"),
    )
    for attribute, label in label_map:
        setattr(candidate, attribute, fields.get(label) or getattr(candidate, attribute))

    guid = fields.get('"network:guid"')
    if guid:
        candidate.notes = _join_notes(candidate.notes, f"PTNA network:guid={guid}")

    if not candidate.selected_url and _looks_like_download_url(candidate.original_release_url):
        download = _normalize_feed_url(candidate.original_release_url)
        candidate.selected_url = download
        candidate.direct_download_url = download

    normalize_candidate_geography(candidate)
def parse_ptna_detail_fields(html: str, page_url: str) -> dict[str, str]:
    """Read a two-column label/value table into ``{lower-cased label: value}``.

    When a value cell links to an external (non-PTNA) page, that link is also
    stored under ``"<label> href"``. Rows with fewer than two cells or an empty
    label are ignored; a repeated label keeps its last value.
    """
    fields: dict[str, str] = {}
    for table_row in _parse_table_rows(html, page_url):
        if len(table_row.cells) < 2:
            continue
        label_cell, value_cell = table_row.cells[0], table_row.cells[1]
        label = _clean_text(label_cell.text).lower()
        if not label:
            continue
        fields[label] = _clean_text(value_cell.text)
        external = value_cell.first_external_link
        if external:
            fields[f"{label} href"] = external
    return fields
def load_curated_ingestable_seed(
    *,
    countries: list[str] | None = None,
    path: Path | str | None = None,
) -> list[FeedCandidate]:
    """Load hand-curated GTFS sources from ``docs/ingestable_sources_seed.csv`` (or *path*).

    Returns an empty list when the file does not exist. Rows whose ``kind`` is
    not "gtfs" are skipped. The country filter is applied only when
    ``countries`` is non-empty, and "EU" rows always pass it.
    """
    if path is None:
        seed_path = Path(__file__).resolve().parents[1] / "docs" / "ingestable_sources_seed.csv"
    else:
        seed_path = Path(path)
    if not seed_path.exists():
        return []

    seeded: list[FeedCandidate] = []
    # utf-8-sig strips a leading BOM that spreadsheet exports often add.
    with seed_path.open("r", encoding="utf-8-sig", newline="") as handle:
        for record in csv.DictReader(handle):
            if _value(record, "kind").lower() != "gtfs":
                continue
            country = _value(record, "country").upper()
            if countries and country != "EU" and country not in countries:
                continue
            name = _value(record, "name")
            feed_url = _normalize_feed_url(_value(record, "url"))
            candidate = FeedCandidate(
                discovery_source="curated_seed",
                country=country,
                provider=name.removesuffix(" GTFS"),
                feed_name=name,
                selected_url=feed_url,
                direct_download_url=feed_url,
                license_text=_value(record, "license"),
                features=_value(record, "mode_scope"),
                priority=_value(record, "priority"),
                source_basis=_value(record, "source_basis") or "curated seed",
                notes=_value(record, "notes"),
            )
            normalize_candidate_geography(candidate)
            apply_known_download_overrides(candidate)
            seeded.append(candidate)
    return seeded
def merge_candidates(candidates: Iterable[FeedCandidate]) -> list[FeedCandidate]:
    """Collapse candidates that share any alias key, then sort the survivors.

    The first candidate seen for a key becomes the canonical record, and later
    matches are folded into it via ``_merge_candidate``. The canonical record
    object is modified in place. The result is sorted by priority, country,
    provider and feed name (the last two case-insensitively).
    """
    canonical: dict[str, FeedCandidate] = {}
    owner_of_alias: dict[str, str] = {}
    for incoming in candidates:
        aliases = _candidate_alias_keys(incoming)
        matched_key = None
        for alias in aliases:
            if alias in owner_of_alias:
                matched_key = owner_of_alias[alias]
                break
        target = canonical.get(matched_key) if matched_key is not None else None
        if target is None:
            home_key = aliases[0]
            canonical[home_key] = incoming
        else:
            _merge_candidate(target, incoming)
            home_key = matched_key
        for alias in aliases:
            owner_of_alias[alias] = home_key

    def ordering(item: FeedCandidate) -> tuple[int, str, str, str]:
        return (_priority_sort_key(item.priority), item.country, item.provider.lower(), item.feed_name.lower())

    return sorted(canonical.values(), key=ordering)
def select_test_run_candidates(candidates: Iterable[FeedCandidate], *, limit: int = 24) -> list[FeedCandidate]:
    """Pick a small, country-diverse subset of ingestable feeds for a test run.

    Acceptance-list candidates and candidates rejected by
    ``_test_candidate_eligible`` are dropped. The rest are sorted with
    ``_test_candidate_sort_key`` and selected in three passes:

    1. candidates whose provider/name/basis/URL text contains one of the
       preferred tokens below ("forced"; "DE" may take up to 7 slots in this
       pass, every other country up to 3);
    2. candidates per country in CURATED_TEST_COUNTRIES order, capped at 3 per
       country including earlier picks;
    3. any remaining candidates, under the same cap of 3 per country.

    URLs already selected (compared via ``_normalize_url_key``) are skipped, and
    at most ``limit`` candidates are returned, in selection order.
    """
    sorted_candidates = sorted(
        [
            candidate
            for candidate in candidates
            if candidate.discovery_source != "mobility_validator_acceptance" and _test_candidate_eligible(candidate)
        ],
        key=_test_candidate_sort_key,
    )
    selected: list[FeedCandidate] = []
    seen_urls: set[str] = set()
    per_country: dict[str, int] = {}

    def add(candidate: FeedCandidate, *, force: bool = False) -> None:
        # Append *candidate* unless the overall limit is reached, its URL is a
        # duplicate, or its country is already at the per-country cap.
        if len(selected) >= limit:
            return
        url_key = _normalize_url_key(candidate.selected_url)
        if not candidate.selected_url or url_key in seen_urls:
            return
        country = candidate.country or "unknown"
        # The larger cap applies only to forced (preferred-token) picks for DE.
        country_limit = 7 if force and country == "DE" else 3
        if per_country.get(country, 0) >= country_limit:
            return
        selected.append(candidate)
        seen_urls.add(url_key)
        per_country[country] = per_country.get(country, 0) + 1

    # Substrings of known-good publishers / URLs that should be tested first.
    preferred_tokens = [
        "opendata-oepnv.de",
        "download.gtfs.de/germany/",
        "vbb.de/vbbgtfs",
        "rnv-online.de",
        "vrn.de",
        "gtfs.geops.ch",
        "wienerlinien.at",
        "gtfs.openov.nl",
        "gtfs.ovapi.nl",
        "rejseplanen.info",
        "dev.hsl.fi/gtfs",
        "hsldev.com/gtfs",
        "rb_norway-aggregated-gtfs",
        "data.bus-data.dft.gov.uk",
        "transportforireland",
        "gtfs.irail.be/de-lijn",
    ]
    # Pass 1: preferred publishers.
    for candidate in sorted_candidates:
        text = " ".join([candidate.provider, candidate.feed_name, candidate.source_basis, candidate.selected_url]).lower()
        if any(token in text for token in preferred_tokens):
            add(candidate, force=True)
    # Pass 2: spread across countries in curated order.
    for country in CURATED_TEST_COUNTRIES:
        for candidate in sorted_candidates:
            if candidate.country == country:
                add(candidate)
                if len(selected) >= limit:
                    break
        if len(selected) >= limit:
            break
    # Pass 3: fill any remaining slots from the whole sorted list.
    for candidate in sorted_candidates:
        add(candidate)
        if len(selected) >= limit:
            break
    return selected
def _test_candidate_eligible(candidate: FeedCandidate) -> bool:
if not candidate.selected_url:
return False
if _priority_sort_key(candidate.priority) > 2:
return False
text = " ".join([candidate.status, candidate.selected_url, candidate.provider, candidate.feed_name, candidate.notes]).lower()
if "deprecated" in text or "inactive" in text or "{apikey}" in text:
return False
if "registration required" in text or "authentication" in text:
return False
return True
def annotate_url_availability(candidate: FeedCandidate, *, timeout: float = 10.0) -> FeedCandidate:
    """Probe ``candidate.selected_url`` over HTTP and record the result on the candidate.

    Sets ``http_status``, ``content_type``, ``content_length``, ``final_url`` and
    ``availability_status`` ("ok" for status < 400, otherwise "error"). A HEAD
    request is tried first. On 403, 405 or any 5xx, a one-byte ranged GET is
    sent instead. Network errors mark the candidate "error" and are appended to
    ``notes`` rather than raised. Candidates without a URL are marked "missing_url".

    Returns the same (mutated) candidate.
    """
    if not candidate.selected_url:
        candidate.availability_status = "missing_url"
        return candidate
    headers = {"User-Agent": "meubility-workbench-feed-discovery/0.1"}
    try:
        response = requests.head(candidate.selected_url, allow_redirects=True, timeout=timeout, headers=headers)
        if response.status_code in {405, 403} or response.status_code >= 500:
            # Some hosts refuse HEAD, so retry with a one-byte ranged GET.
            # Release the HEAD response first so its pooled connection is not leaked.
            response.close()
            response = requests.get(
                candidate.selected_url,
                allow_redirects=True,
                timeout=timeout,
                headers={**headers, "Range": "bytes=0-0"},
                stream=True,
            )
        try:
            candidate.http_status = str(response.status_code)
            candidate.content_type = response.headers.get("content-type", "")
            content_length = response.headers.get("content-length", "")
            if response.status_code == 206:
                # A satisfied range request reports the length of the one-byte
                # slice. The full size follows the "/" in Content-Range
                # ("bytes 0-0/12345"); keep Content-Length if the total is unknown ("*").
                total = response.headers.get("content-range", "").rpartition("/")[2].strip()
                if total.isdigit():
                    content_length = total
            candidate.content_length = content_length
            candidate.final_url = response.url
            candidate.availability_status = "ok" if response.status_code < 400 else "error"
        finally:
            response.close()
    except requests.RequestException as exc:
        candidate.availability_status = "error"
        candidate.notes = _join_notes(candidate.notes, f"Availability check failed: {exc}")
    return candidate
def normalize_candidate_geography(candidate: FeedCandidate) -> None:
    """Override ``candidate.country`` for a few feeds whose URLs/names reveal the country.

    The URLs, provider, feed name and source basis are searched
    case-insensitively, and the first matching rule wins. Candidates that match
    no rule are left unchanged.
    """
    haystack = " ".join(
        (
            candidate.selected_url,
            candidate.direct_download_url,
            candidate.latest_url,
            candidate.original_release_url,
            candidate.provider,
            candidate.feed_name,
            candidate.source_basis,
        )
    ).lower()
    rules = (
        (("download.gtfs.de/germany/", "gtfs for germany"), "DE"),
        (("storage.googleapis.com/marduk-production/outbound/gtfs/rb_norway",), "NO"),
        (("gtfs.ovapi.nl", "openov.nl"), "NL"),
        (("www.nvbw.de/fileadmin/user_upload/service/open_data/",), "DE"),
    )
    for markers, country_code in rules:
        if any(marker in haystack for marker in markers):
            candidate.country = country_code
            return
def apply_known_download_overrides(candidate: FeedCandidate) -> None:
    """Switch known-stale Mobility Database feeds to their ``latest_url`` mirror.

    Applies only to the stable ids listed below, and only when a latest URL is
    available. A note explaining the switch is appended.
    """
    known_stale = {"mdb-684", "mdb-777"}
    if candidate.stable_id not in known_stale or not candidate.latest_url:
        return
    candidate.selected_url = candidate.latest_url
    explanation = "Selected Mobility Database latest.zip mirror because the catalog direct URL is known to be stale."
    candidate.notes = _join_notes(candidate.notes, explanation)
@dataclass
class _HtmlCell:
text: str = ""
links: list[str] = field(default_factory=list)
@property
def first_external_link(self) -> str:
for link in self.links:
parsed = urlparse(link)
if parsed.scheme in {"http", "https"} and "ptna.openstreetmap.de" not in parsed.netloc:
return link
return ""
@dataclass
class _HtmlRow:
cells: list[_HtmlCell] = field(default_factory=list)
class _TableParser(HTMLParser):
    """Collect the rows of the HTML tables in a document as ``_HtmlRow`` objects.

    Cell text is cleaned with ``_clean_text``, and ``<a href>`` targets inside a
    cell are resolved against ``base_url``. HTML allows the ``</td>``, ``</th>``
    and ``</tr>`` end tags to be omitted. Opening a new cell or row, or closing
    the table, therefore finishes whatever cell or row is still open, instead of
    discarding it or leaking its text into the next row. Rows without cells are
    dropped. Nested tables are not modelled separately.
    """

    def __init__(self, base_url: str):
        super().__init__(convert_charrefs=True)
        self.base_url = base_url
        self.rows: list[_HtmlRow] = []
        self._row: _HtmlRow | None = None
        self._cell: _HtmlCell | None = None

    def _finish_cell(self) -> None:
        """Attach the open cell (if any) to the open row, then clear the cell state."""
        if self._cell is not None and self._row is not None:
            self._cell.text = _clean_text(self._cell.text)
            self._row.cells.append(self._cell)
        self._cell = None

    def _finish_row(self) -> None:
        """Finish the open cell, then keep the open row if it has at least one cell."""
        self._finish_cell()
        if self._row is not None and self._row.cells:
            self.rows.append(self._row)
        self._row = None

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag == "tr":
            # A previous row without </tr> ends implicitly where the next one starts.
            self._finish_row()
            self._row = _HtmlRow()
        elif tag in {"td", "th"} and self._row is not None:
            # Likewise for a cell without </td> or </th>.
            self._finish_cell()
            self._cell = _HtmlCell()
        elif tag == "a" and self._cell is not None:
            href = dict(attrs).get("href") or ""
            if href:
                self._cell.links.append(urljoin(self.base_url, href))

    def handle_endtag(self, tag: str) -> None:
        if tag in {"td", "th"}:
            self._finish_cell()
        elif tag in {"tr", "table"}:
            self._finish_row()

    def handle_data(self, data: str) -> None:
        if self._cell is not None:
            self._cell.text += data
class _LinkParser(HTMLParser):
def __init__(self, base_url: str):
super().__init__(convert_charrefs=True)
self.base_url = base_url
self.links: list[str] = []
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
if tag != "a":
return
for key, value in attrs:
if key == "href" and value:
self.links.append(urljoin(self.base_url, value))
def _parse_table_rows(html: str, base_url: str) -> list[_HtmlRow]:
    """Return the non-empty table rows of *html*, with links resolved against *base_url*."""
    table_parser = _TableParser(base_url)
    table_parser.feed(html)
    collected_rows = table_parser.rows
    return collected_rows
def _all_links(html: str, base_url: str) -> list[str]:
    """Return every ``<a href>`` target in *html* as an absolute URL, in document order."""
    link_parser = _LinkParser(base_url)
    link_parser.feed(html)
    found_links = link_parser.links
    return found_links
def _fetch_text(url: str, *, timeout: float) -> str:
    """GET *url* and return the body as text.

    Raises ``requests.RequestException`` on network errors, and
    ``requests.HTTPError`` (a subclass) on 4xx/5xx responses.

    When the server declares no charset, ``requests`` falls back to ISO-8859-1
    for ``text/*`` responses, which garbles UTF-8 catalogs (e.g. non-ASCII
    provider names). In that case the body is decoded as UTF-8 first, with any
    byte-order mark dropped, and ``requests``' own decoding is used only if that
    fails. A leading BOM is also removed from declared-charset bodies so it
    cannot leak into the first CSV header name.
    """
    response = requests.get(url, timeout=timeout, headers={"User-Agent": "meubility-workbench-feed-discovery/0.1"})
    response.raise_for_status()
    content_type = response.headers.get("content-type", "")
    if "charset=" not in content_type.lower():
        try:
            return response.content.decode("utf-8-sig")
        except UnicodeDecodeError:
            pass
    return response.text.removeprefix("\ufeff")
def _first_link_matching(links: Iterable[str], needle: str) -> str:
for link in links:
if needle in link:
return link
return ""
def _feed_id_from_url(url: str) -> str:
query = parse_qs(urlparse(url).query)
return (query.get("feed") or [""])[0]
def _looks_like_download_url(url: str) -> bool:
if not url:
return False
parsed = urlparse(url)
lower_path = parsed.path.lower()
lower_url = url.lower()
if lower_path.endswith(".zip"):
return True
if "exportformat=gtfs" in lower_url or "google_transit" in lower_url:
return True
if lower_path.rstrip("/").endswith(("current_gtfs", "gtfs")):
return True
if "gtfs.ovapi.nl" in parsed.netloc.lower() and "gtfs" in lower_path:
return True
return False
def _normalize_feed_url(url: str) -> str:
    """Clean *url* and add ``https://`` when it looks like a bare ``host/path``.

    Empty input yields "". A URL that already has a scheme, or whose first path
    segment contains no dot, is returned cleaned but otherwise unchanged.
    """
    text = _clean_text(url)
    if not text or urlparse(text).scheme:
        return text
    host_part = text.split("/", 1)[0]
    return f"https://{text}" if "." in host_part else text
def _choose_feed_url(direct_url: str, latest_url: str) -> str:
if direct_url:
return direct_url
return latest_url
def _candidate_priority(candidate: FeedCandidate) -> str:
status = candidate.status.lower()
official = candidate.is_official.lower() == "true"
if candidate.discovery_source == "curated_seed":
return candidate.priority or "P1"
if status == "active" and official and candidate.direct_download_url:
return "P0"
if status == "active" and candidate.direct_download_url:
return "P1"
if status == "active" and candidate.latest_url:
return "P2"
if candidate.discovery_source == "ptna":
return "P2" if candidate.selected_url else "P4"
return "P3"
def _test_candidate_sort_key(candidate: FeedCandidate) -> tuple[int, int, str, str]:
    """Sort key for test-run selection.

    Orders by priority, then by country rank in CURATED_TEST_COUNTRIES (with a
    one-step bonus for curated seeds), then by country and lower-cased provider.
    """
    if candidate.country in CURATED_TEST_COUNTRIES:
        country_rank = CURATED_TEST_COUNTRIES.index(candidate.country)
    else:
        country_rank = 99
    seed_penalty = int(candidate.discovery_source != "curated_seed")
    return (
        _priority_sort_key(candidate.priority),
        seed_penalty + country_rank,
        candidate.country,
        candidate.provider.lower(),
    )
def _priority_sort_key(priority: str) -> int:
match = re.match(r"P(\d+)", priority or "")
return int(match.group(1)) if match else 9
def _candidate_alias_keys(candidate: FeedCandidate) -> list[str]:
keys = [candidate.key()]
if candidate.stable_id:
keys.append(f"stable:{candidate.stable_id}")
for url in [candidate.selected_url, candidate.direct_download_url, candidate.latest_url]:
if url:
keys.append(f"url:{_normalize_url_key(url)}")
if candidate.ptna_feed_id:
keys.append(f"ptna:{candidate.ptna_feed_id}")
deduped: list[str] = []
for key in keys:
if key not in deduped:
deduped.append(key)
return deduped
def _merge_candidate(existing: FeedCandidate, incoming: FeedCandidate) -> None:
    """Fold *incoming* into *existing* in place.

    Curated-seed rows overwrite the descriptive fields listed below when they
    have values. After that, empty canonical fields on *existing* are filled
    from *incoming*. The best priority is kept, and discovery source, source
    basis and notes are unioned.
    """
    if incoming.discovery_source == "curated_seed":
        curated_fields = ("country", "provider", "feed_name", "license_text", "features", "source_basis", "notes")
        for name in curated_fields:
            override = getattr(incoming, name, "")
            if override:
                setattr(existing, name, override)

    existing.discovery_source = _join_unique(existing.discovery_source, incoming.discovery_source)

    for name in CANONICAL_HEADERS:
        if name == "candidate_id":
            continue
        if getattr(existing, name, "") or not getattr(incoming, name, ""):
            continue
        setattr(existing, name, getattr(incoming, name, ""))

    existing.priority = _better_priority(existing.priority, incoming.priority)
    existing.source_basis = _join_unique(existing.source_basis, incoming.source_basis)
    existing.notes = _join_notes(existing.notes, incoming.notes)
def _better_priority(left: str, right: str) -> str:
    """Return whichever priority label ranks higher (lower number); ties keep *left*."""
    if _priority_sort_key(right) < _priority_sort_key(left):
        return right
    return left
def _join_unique(left: str, right: str) -> str:
parts: list[str] = []
for value in [left, right]:
for part in value.split(";"):
cleaned = part.strip()
if cleaned and cleaned not in parts:
parts.append(cleaned)
return "; ".join(parts)
def _join_notes(left: str, right: str) -> str:
    """Combine two ``;``-separated note strings, keeping first-seen order and dropping duplicates.

    Notes follow exactly the same merge rule as other ``;`` lists.
    """
    combined = _join_unique(left, right)
    return combined
def _compact_name(value: str) -> str:
    """Return *value* with HTML entities decoded and whitespace collapsed and stripped.

    ``_clean_text`` already collapses every whitespace run to one space and
    strips both ends, so its result needs no further processing.
    """
    return _clean_text(value)
def _feed_source_name(country: str, value: str) -> str:
    """Build a display name such as "DE VBB GTFS" from a country code and a provider/feed name.

    The country prefix is added unless the name already starts with it, and
    " GTFS" is appended unless "gtfs" already appears (case-insensitively).
    """
    label = _compact_name(value) or "GTFS feed"
    code = country.upper()
    if code and not label.upper().startswith(code + " "):
        label = f"{code} {label}"
    if "gtfs" not in label.lower():
        label += " GTFS"
    return label
def _clean_text(value: str) -> str:
cleaned = unescape(value or "").replace("\xa0", " ")
cleaned = re.sub(r"\s+", " ", cleaned)
return cleaned.strip()
def _mode_scope_from_features(features: str) -> str:
lower = features.lower()
modes = []
if "rail" in lower or "train" in lower:
modes.append("rail")
if "tram" in lower or "light_rail" in lower:
modes.append("tram")
if "subway" in lower or "metro" in lower:
modes.append("metro")
if "bus" in lower or not modes:
modes.append("bus")
if "ferry" in lower:
modes.append("ferry")
return ",".join(dict.fromkeys(modes))
def _bbox_from_mobility_row(row: dict[str, str]) -> str:
    """Return "min_lon,min_lat,max_lon,max_lat" from Mobility Database bounding-box columns.

    Returns "" unless all four values are present.
    """
    prefix = "location.bounding_box."
    min_lat, max_lat, min_lon, max_lon = (
        _value(row, prefix + suffix)
        for suffix in ("minimum_latitude", "maximum_latitude", "minimum_longitude", "maximum_longitude")
    )
    if min_lat and max_lat and min_lon and max_lon:
        return f"{min_lon},{min_lat},{max_lon},{max_lat}"
    return ""
def _normalize_countries(countries: Iterable[str] | None) -> list[str] | None:
if countries is None:
return DEFAULT_DISCOVERY_COUNTRIES
normalized = [country.strip().upper() for country in countries if country and country.strip()]
if any(country == "ALL" for country in normalized):
return None
return normalized
def _normalize_url_key(url: str) -> str:
parsed = urlparse(url.strip())
scheme = parsed.scheme.lower()
netloc = parsed.netloc.lower()
path = parsed.path.rstrip("/")
query = parsed.query
return f"{scheme}://{netloc}{path}" + (f"?{query}" if query else "")
def _write_csv(path: Path, headers: list[str], rows: list[dict[str, str]]) -> None:
with path.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=headers, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
def _count_by(items: Iterable[FeedCandidate], key_fn) -> dict[str, int]:
counts: dict[str, int] = {}
for item in items:
key = key_fn(item)
counts[key] = counts.get(key, 0) + 1
return dict(sorted(counts.items()))
def _value(row: dict[str, str], key: str) -> str:
    """Return ``row[key]`` passed through ``_clean_text`` ("" when the key is absent or None)."""
    raw = row.get(key, "")
    return _clean_text(raw)
def _string(value: object) -> str:
return "" if value is None else str(value)
def _truncate(value: str, length: int) -> str:
return value[:length] if value else ""