Files
meubility-workbench/app/harmonization.py
2026-07-01 23:29:51 +02:00

395 lines
18 KiB
Python

from __future__ import annotations
from datetime import date, datetime, timezone
from typing import Any
from sqlalchemy import and_, func, select
from sqlalchemy.orm import Session, aliased
from app.data_management import dataset_row_counts
from app.models import (
CanonicalStopLink,
Dataset,
GtfsCalendar,
GtfsCalendarDate,
GtfsRoute,
GtfsStop,
GtfsStopTime,
GtfsTrip,
RouteMatch,
Source,
)
# Prefix that marks a QA-review line inside a source's free-text notes.
# _qa_review_payload scans the notes for the first line starting with it.
GTFS_QA_NOTE_PREFIX = "[GTFS QA]"
def gtfs_harmonization_inventory(session: Session) -> dict[str, Any]:
    """Build the harmonization inventory across all GTFS sources.

    Returns a dict with two keys:

    * ``feeds``: one inventory item per GTFS source, in the order produced
      by ``_gtfs_sources``.
    * ``summary``: totals for sources, sources with an active dataset,
      datasets, and the number of feeds in each QA status.
    """
    feeds = []
    for source in _gtfs_sources(session):
        feeds.append(_feed_inventory_item(session, source))

    statuses = [entry["qa_status"] for entry in feeds]
    with_active = [entry for entry in feeds if entry["active_dataset"] is not None]
    dataset_total = 0
    for entry in feeds:
        dataset_total += len(entry["datasets"])

    return {
        "summary": {
            "sources": len(feeds),
            "active_sources": len(with_active),
            "datasets": dataset_total,
            "ready": statuses.count("ready"),
            "needs_review": statuses.count("needs_review"),
            "blocked": statuses.count("blocked"),
        },
        "feeds": feeds,
    }
def gtfs_harmonization_feed_detail(session: Session, source_id: int) -> dict[str, Any] | None:
    """Return the inventory item for one GTFS source plus its display sections.

    Returns ``None`` when no source has ``source_id`` or when the source's
    ``kind`` is not ``"gtfs"``. Otherwise the result is the feed inventory
    item with an extra ``sections`` key built by ``_feed_sections``.
    """
    source = session.get(Source, source_id)
    is_gtfs_source = source is not None and source.kind == "gtfs"
    if not is_gtfs_source:
        return None

    feed = _feed_inventory_item(session, source)
    return dict(feed, sections=_feed_sections(feed))
def _gtfs_sources(session: Session) -> list[Source]:
    """Load every source of kind ``"gtfs"``, ordered by country, priority, name, id."""
    ordering = (Source.country, Source.priority, Source.name, Source.id)
    stmt = select(Source).where(Source.kind == "gtfs").order_by(*ordering)
    return session.scalars(stmt).all()
def _feed_inventory_item(session: Session, source: Source) -> dict[str, Any]:
    """Assemble the harmonization inventory entry for one GTFS source.

    The entry bundles the source payload, the active dataset (if any), all
    GTFS datasets of the source, row counts of the active dataset, the
    validation / service-horizon / overlap / license results, the combined
    issue list and the QA status derived from it.

    Args:
        session: Database session used for all lookups.
        source: The source whose datasets are inspected.

    Returns:
        A dict with the keys ``source``, ``active_dataset``, ``datasets``,
        ``counts``, ``validation``, ``service``, ``overlap``, ``license``,
        ``issues`` and ``qa_status``.
    """
    # Active datasets sort first, then by creation time and id.
    datasets = sorted(
        (dataset for dataset in source.datasets if dataset.kind == "gtfs"),
        key=lambda item: (not item.is_active, item.created_at, item.id),
    )
    active_dataset = next((dataset for dataset in datasets if dataset.is_active), None)
    counts = dataset_row_counts(session, active_dataset.id, active_dataset.kind) if active_dataset is not None else {}

    validation = _validate_gtfs_dataset(session, source, active_dataset, counts)
    overlap = _overlap_summary(session, active_dataset)
    service = _service_horizon(session, active_dataset)
    issues = [*validation["issues"], *service["issues"], *overlap["issues"], *_license_issues(source)]
    qa_status = _qa_status(issues, active_dataset)

    active_payload = None if active_dataset is None else _dataset_payload(active_dataset, counts)

    # The active dataset's counts are already known; only query the others
    # instead of running the row-count lookup a second time for it.
    dataset_payloads = [
        _dataset_payload(
            dataset,
            counts if dataset is active_dataset else dataset_row_counts(session, dataset.id, dataset.kind),
        )
        for dataset in datasets
    ]

    return {
        "source": _source_payload(source),
        "active_dataset": active_payload,
        "datasets": dataset_payloads,
        "counts": counts,
        "validation": validation,
        "service": service,
        "overlap": overlap,
        "license": _license_payload(source),
        "issues": issues,
        "qa_status": qa_status,
    }
def _source_payload(source: Source) -> dict[str, Any]:
    """Serialize a source into a plain dict.

    Most fields are copied verbatim from the source's attributes.
    ``last_run_at`` is converted with ``_iso`` and ``qa_review`` is parsed
    from the source's notes by ``_qa_review_payload``.
    """
    leading_fields = (
        "id",
        "name",
        "country",
        "license",
        "priority",
        "mode_scope",
        "source_basis",
        "status",
        "enabled",
        "last_error",
    )
    trailing_fields = ("url", "catalog_entry_id", "notes")

    # Built in stages so the key order stays the same as the field listing.
    payload: dict[str, Any] = {field: getattr(source, field) for field in leading_fields}
    payload["last_run_at"] = _iso(source.last_run_at)
    for field in trailing_fields:
        payload[field] = getattr(source, field)
    payload["qa_review"] = _qa_review_payload(source.notes)
    return payload
def _dataset_payload(dataset: Dataset, counts: dict[str, Any]) -> dict[str, Any]:
    """Serialize a dataset into a plain dict and attach the given ``counts``.

    ``created_at`` is converted with ``_iso``; the other dataset fields are
    copied as-is.
    """
    copied_fields = ("id", "kind", "is_active", "status", "sha256", "local_path")
    payload: dict[str, Any] = {field: getattr(dataset, field) for field in copied_fields}
    payload["created_at"] = _iso(dataset.created_at)
    payload["counts"] = counts
    return payload
def _validate_gtfs_dataset(session: Session, source: Source, dataset: Dataset | None, counts: dict[str, Any]) -> dict[str, Any]:
    """Run the structural QA checks for a source's active GTFS dataset.

    Args:
        session: Database session used for the per-dataset count queries.
        source: Accepted for interface symmetry; not read by this function.
        dataset: The active dataset, or ``None`` when the source has none.
        counts: Row counts for the dataset; looked up by key with a default
            of 0 for missing keys.

    Returns:
        A dict with ``status`` (from ``_qa_status``), ``items`` (metric dicts)
        and ``issues`` (issue dicts). Without a dataset the result is
        ``blocked`` with a single ``missing_active_dataset`` issue.
    """
    issues: list[dict[str, str]] = []

    def flag(issue_id: str, severity: str, title: str, detail: str) -> None:
        issues.append(_issue(issue_id, severity, title, detail))

    if dataset is None:
        flag("missing_active_dataset", "bad", "No active GTFS dataset", "Import this source before harmonization.")
        return {"status": "blocked", "items": [], "issues": issues}

    def tally(key: str) -> Any:
        return counts.get(key, 0)

    def tone(problem: Any, failing: str) -> str:
        return failing if problem else "good"

    # Table-level metrics: an empty table is "bad", except shapes ("warn").
    items = [
        _metric(label, tally(key), tone(not tally(key), failing))
        for label, key, failing in (
            ("Agencies", "agencies", "bad"),
            ("Stops", "stops", "bad"),
            ("Routes", "routes", "bad"),
            ("Trips", "trips", "bad"),
            ("Stop times", "stop_times", "bad"),
            ("Shapes", "shapes", "warn"),
        )
    ]

    # Row-level checks, each a separate count query against this dataset.
    dataset_id = dataset.id
    no_coordinates = GtfsStop.lat.is_(None) | GtfsStop.lon.is_(None)
    out_of_range = (GtfsStop.lat < -90) | (GtfsStop.lat > 90) | (GtfsStop.lon < -180) | (GtfsStop.lon > 180)
    missing_coords = _count(session, GtfsStop, dataset_id, no_coordinates)
    invalid_coords = _count(session, GtfsStop, dataset_id, out_of_range)
    routes_without_trips = _routes_without_trips(session, dataset_id)
    trips_without_stop_times = _trips_without_stop_times(session, dataset_id)
    stop_times_without_seconds = _stop_times_without_seconds(session, dataset_id)
    route_geometry_missing = _count(session, GtfsRoute, dataset_id, GtfsRoute.geometry_geojson.is_(None))
    canonical_links = _count(session, CanonicalStopLink, dataset_id, CanonicalStopLink.object_type == "gtfs_stop")

    raw_match_counts = counts.get("match_counts")
    match_counts = raw_match_counts if isinstance(raw_match_counts, dict) else {}

    items += [
        _metric("Stops missing coordinates", missing_coords, tone(missing_coords, "bad")),
        _metric("Stops with invalid coordinates", invalid_coords, tone(invalid_coords, "bad")),
        _metric("Routes without trips", routes_without_trips, tone(routes_without_trips, "bad")),
        _metric("Trips without stop_times", trips_without_stop_times, tone(trips_without_stop_times, "bad")),
        _metric("Stop times without parsed seconds", stop_times_without_seconds, tone(stop_times_without_seconds, "warn")),
        _metric("Routes without geometry", route_geometry_missing, tone(route_geometry_missing, "warn")),
        _metric("Canonical stop links", canonical_links, tone(tally("stops") and canonical_links == 0, "warn")),
        _metric("Route matches", tally("matches"), tone(tally("routes") and not tally("matches"), "warn")),
    ]

    if counts.get("missing_sidecar"):
        flag("missing_sidecar", "bad", "GTFS sidecar is missing", "Queue a recovery import for this dataset.")

    required_tables = (
        ("agencies", "No agencies imported"),
        ("stops", "No stops imported"),
        ("routes", "No routes imported"),
        ("trips", "No trips imported"),
        ("stop_times", "No stop_times imported"),
    )
    for key, title in required_tables:
        if not tally(key):
            flag(f"missing_{key}", "bad", title, "Required GTFS content is absent or failed to import.")

    if missing_coords:
        flag("missing_stop_coordinates", "bad", f"{missing_coords:,} stops have no coordinates", "Stop coordinates are required for deduplication and routing access.")
    if invalid_coords:
        flag("invalid_stop_coordinates", "bad", f"{invalid_coords:,} stops have invalid coordinates", "Fix or exclude invalid stop coordinates before publication.")
    if routes_without_trips:
        flag("routes_without_trips", "warn", f"{routes_without_trips:,} routes have no trips", "These routes cannot contribute timetable service.")
    if trips_without_stop_times:
        flag("trips_without_stop_times", "bad", f"{trips_without_stop_times:,} trips have no stop_times", "These trips cannot be routed.")
    if route_geometry_missing:
        flag("route_geometry_missing", "warn", f"{route_geometry_missing:,} routes have no geometry", "Use GTFS shapes, route-layer matching, or stop-by-stop fallback.")
    if tally("routes") and not tally("shapes"):
        flag("missing_shapes", "warn", "No GTFS shapes imported", "OSM route matching or generated geometry will be needed.")
    if tally("routes") and not match_counts:
        flag("no_route_matching", "warn", "No route-match rows", "Run route matching before route-layer publication QA.")

    return {"status": _qa_status(issues, dataset), "items": items, "issues": issues}
def _service_horizon(session: Session, dataset: Dataset | None) -> dict[str, Any]:
    """Summarize the date range covered by a dataset's service calendar.

    The range is the earliest and latest value found across the dataset's
    ``GtfsCalendar`` start/end dates and its ``GtfsCalendarDate`` dates.
    Remaining days are measured against today's date in UTC.

    Returns:
        A dict with ``start_date`` / ``end_date`` (ISO strings or ``None``),
        ``days_until_end``, two metric ``items`` and a list of ``issues``:
        a missing end date or an expired service is "bad", fewer than 30
        remaining days is "warn". Without a dataset everything is empty.
    """
    if dataset is None:
        return {"start_date": None, "end_date": None, "days_until_end": None, "items": [], "issues": []}

    calendar_span = select(func.min(GtfsCalendar.start_date), func.max(GtfsCalendar.end_date)).where(
        GtfsCalendar.dataset_id == dataset.id
    )
    cal_first, cal_last = session.execute(calendar_span).one()
    exception_span = select(func.min(GtfsCalendarDate.date), func.max(GtfsCalendarDate.date)).where(
        GtfsCalendarDate.dataset_id == dataset.id
    )
    exc_first, exc_last = session.execute(exception_span).one()

    first_day = _gtfs_date(_min_int(cal_first, exc_first))
    last_day = _gtfs_date(_max_int(cal_last, exc_last))
    today = datetime.now(timezone.utc).date()

    # The end-date metric tone and the raised issue follow the same branches.
    issues: list[dict[str, str]] = []
    if last_day is None:
        remaining = None
        end_tone = "bad"
        issues.append(_issue("service_horizon_missing", "bad", "No service calendar horizon", "calendar.txt or calendar_dates.txt is required for reliable routing."))
    else:
        remaining = (last_day - today).days
        if remaining < 0:
            end_tone = "bad"
            issues.append(_issue("service_horizon_expired", "bad", f"Service expired {abs(remaining):,} days ago", "Update or exclude this feed."))
        elif remaining < 30:
            end_tone = "warn"
            issues.append(_issue("service_horizon_short", "warn", f"Service ends in {remaining:,} days", "Update cadence is too close for publication confidence."))
        else:
            end_tone = "good"

    start_iso = None if first_day is None else first_day.isoformat()
    end_iso = None if last_day is None else last_day.isoformat()

    return {
        "start_date": start_iso,
        "end_date": end_iso,
        "days_until_end": remaining,
        "items": [
            _metric("Service starts", "n/a" if start_iso is None else start_iso, "info"),
            _metric("Service ends", "n/a" if end_iso is None else end_iso, end_tone),
        ],
        "issues": issues,
    }
def _overlap_summary(session: Session, dataset: Dataset | None) -> dict[str, Any]:
    """Report how much a dataset overlaps with other active GTFS feeds.

    Counts route keys and canonical stops that this dataset shares with
    other active GTFS datasets. Each non-zero count produces a "warn" metric
    and a "warn" issue. Without a dataset, both lists are empty.
    """
    if dataset is None:
        return {"items": [], "issues": []}

    shared_routes = _shared_route_keys(session, dataset.id)
    shared_stops = _shared_canonical_stops(session, dataset.id)

    items = [
        _metric("Shared route keys", shared_routes, "warn" if shared_routes else "good"),
        _metric("Shared canonical stops", shared_stops, "warn" if shared_stops else "good"),
    ]

    issues: list[dict[str, str]] = []
    if shared_routes:
        issues.append(_issue("shared_route_keys", "warn", f"{shared_routes:,} route keys also exist in another active feed", "Deduplicate or rank source authority for overlapping routes."))
    if shared_stops:
        issues.append(_issue("shared_canonical_stops", "warn", f"{shared_stops:,} canonical stops are shared with another active feed", "This is useful linking evidence, but conflicts need review."))

    return {"items": items, "issues": issues}
def _license_payload(source: Source) -> dict[str, Any]:
    """Classify a source's license text for redistribution review.

    A blank license, or one containing "unknown" (case-insensitive), is
    reported as ``unknown`` with a "warn" tone. Any other text is reported
    as ``review_required`` with an "info" tone. ``label`` is the stripped
    license text, or "unknown" when it is blank.
    """
    label = (source.license or "").strip()
    is_known = bool(label) and "unknown" not in label.lower()
    if is_known:
        status, tone = "review_required", "info"
    else:
        status, tone = "unknown", "warn"
    return {
        "label": label if label else "unknown",
        "redistribution_status": status,
        "tone": tone,
    }
def _license_issues(source: Source) -> list[dict[str, str]]:
    """Return a single "warn" issue when the source's redistribution status is unknown, else an empty list."""
    status = _license_payload(source)["redistribution_status"]
    if status != "unknown":
        return []
    return [
        _issue(
            "license_unknown",
            "warn",
            "License/redistribution status is unknown",
            "Publication needs explicit import, derivation, redistribution, and attribution flags.",
        )
    ]
def _qa_review_payload(notes: str | None) -> dict[str, Any]:
    """Extract the QA review record embedded in a source's notes.

    Only the first line that starts with ``GTFS_QA_NOTE_PREFIX`` is used.
    The rest of that line is read as ``key=value`` pairs separated by ``;``;
    parts without ``=`` are ignored and keys/values are stripped.

    Returns:
        A dict with ``status`` (default "unreviewed"), ``note`` (default "")
        and ``updated_at`` (default ``None``). The defaults are also returned
        when ``notes`` is empty or has no prefixed line.
    """
    if not notes:
        return {"status": "unreviewed", "note": "", "updated_at": None}

    tagged_line = next(
        (line for line in str(notes).splitlines() if line.startswith(GTFS_QA_NOTE_PREFIX)),
        None,
    )
    if tagged_line is None:
        return {"status": "unreviewed", "note": "", "updated_at": None}

    fields: dict[str, str] = {}
    body = tagged_line[len(GTFS_QA_NOTE_PREFIX) :].strip()
    for chunk in body.split(";"):
        # Split on the first "=" only, so values may themselves contain "=".
        key, separator, value = chunk.partition("=")
        if separator:
            fields[key.strip()] = value.strip()

    return {
        "status": fields.get("status") or "unreviewed",
        "note": fields.get("note") or "",
        "updated_at": fields.get("updated_at"),
    }
def _routes_without_trips(session: Session, dataset_id: int) -> int:
    """Count the dataset's routes for which no trip with the same ``route_id`` exists in the dataset."""
    has_trip = (
        select(GtfsTrip.id)
        .where(GtfsTrip.dataset_id == dataset_id, GtfsTrip.route_id == GtfsRoute.route_id)
        .exists()
    )
    # _count adds the dataset filter on GtfsRoute and coerces the result to int.
    return _count(session, GtfsRoute, dataset_id, ~has_trip)
def _trips_without_stop_times(session: Session, dataset_id: int) -> int:
    """Count the dataset's trips for which no stop_time with the same ``trip_id`` exists in the dataset."""
    has_stop_time = (
        select(GtfsStopTime.id)
        .where(GtfsStopTime.dataset_id == dataset_id, GtfsStopTime.trip_id == GtfsTrip.trip_id)
        .exists()
    )
    # _count adds the dataset filter on GtfsTrip and coerces the result to int.
    return _count(session, GtfsTrip, dataset_id, ~has_stop_time)
def _stop_times_without_seconds(session: Session, dataset_id: int) -> int:
    """Count the dataset's stop_times where both ``arrival_seconds`` and ``departure_seconds`` are NULL."""
    arrival_missing = GtfsStopTime.arrival_seconds.is_(None)
    departure_missing = GtfsStopTime.departure_seconds.is_(None)
    # Separate criteria are AND-ed together: a row counts only if both are missing.
    return _count(session, GtfsStopTime, dataset_id, arrival_missing, departure_missing)
def _shared_route_keys(session: Session, dataset_id: int) -> int:
    """Count distinct route keys of this dataset that also occur in another active GTFS dataset.

    NULL and empty route keys of this dataset are ignored.
    """
    mine = aliased(GtfsRoute)
    theirs = aliased(GtfsRoute)
    their_dataset = aliased(Dataset)

    # Same key, but on a route that belongs to a different dataset.
    same_key_elsewhere = and_(theirs.route_key == mine.route_key, theirs.dataset_id != mine.dataset_id)

    stmt = (
        select(func.count(func.distinct(mine.route_key)))
        .select_from(mine)
        .join(theirs, same_key_elsewhere)
        .join(their_dataset, their_dataset.id == theirs.dataset_id)
        .where(
            mine.dataset_id == dataset_id,
            mine.route_key.is_not(None),
            mine.route_key != "",
            their_dataset.kind == "gtfs",
            their_dataset.is_active.is_(True),
        )
    )
    total = session.scalar(stmt)
    return int(total or 0)
def _shared_canonical_stops(session: Session, dataset_id: int) -> int:
    """Count distinct canonical stops linked from this dataset and from another active GTFS dataset.

    Only links whose ``object_type`` is ``"gtfs_stop"`` are considered on
    both sides.
    """
    mine = aliased(CanonicalStopLink)
    theirs = aliased(CanonicalStopLink)
    their_dataset = aliased(Dataset)

    # Same canonical stop, but linked from a different dataset.
    same_stop_elsewhere = and_(
        theirs.canonical_stop_id == mine.canonical_stop_id,
        theirs.dataset_id != mine.dataset_id,
    )

    stmt = (
        select(func.count(func.distinct(mine.canonical_stop_id)))
        .select_from(mine)
        .join(theirs, same_stop_elsewhere)
        .join(their_dataset, their_dataset.id == theirs.dataset_id)
        .where(
            mine.dataset_id == dataset_id,
            mine.object_type == "gtfs_stop",
            theirs.object_type == "gtfs_stop",
            their_dataset.kind == "gtfs",
            their_dataset.is_active.is_(True),
        )
    )
    total = session.scalar(stmt)
    return int(total or 0)
def _count(session: Session, model: Any, dataset_id: int, *criteria: Any) -> int:
    """Count rows of ``model`` in the given dataset that satisfy every extra criterion.

    ``model`` must expose a ``dataset_id`` column. A missing scalar result
    is reported as 0.
    """
    conditions = [model.dataset_id == dataset_id, *criteria]
    stmt = select(func.count()).select_from(model).where(*conditions)
    total = session.scalar(stmt)
    return int(total or 0)
def _metric(label: str, value: Any, tone: str = "info", description: str = "") -> dict[str, Any]:
    """Package one display metric as a dict with label, value, tone and description."""
    return dict(label=label, value=value, tone=tone, description=description)
def _issue(issue_id: str, severity: str, title: str, detail: str) -> dict[str, str]:
    """Package one QA issue as a dict with id, severity, title and detail."""
    return dict(id=issue_id, severity=severity, title=title, detail=detail)
def _qa_status(issues: list[dict[str, str]], dataset: Dataset | None) -> str:
    """Derive the overall QA status from the issue severities.

    Returns "blocked" when there is no dataset or any issue is "bad",
    "needs_review" when any issue is "warn", and "ready" otherwise.
    """
    if dataset is None:
        return "blocked"
    severities = [issue.get("severity") for issue in issues]
    if "bad" in severities:
        return "blocked"
    return "needs_review" if "warn" in severities else "ready"
def _feed_sections(feed: dict[str, Any]) -> list[dict[str, Any]]:
    """Arrange a feed inventory item into titled sections of metric items.

    The validation, service and overlap sections reuse the ``items`` lists
    already present on the feed. The license section is built here from the
    feed's license payload.
    """
    sections = [
        {"id": section_id, "title": title, "items": feed[section_id]["items"]}
        for section_id, title in (
            ("validation", "GTFS Validation"),
            ("service", "Service Horizon"),
            ("overlap", "Overlap and Deduplication"),
        )
    ]

    license_info = feed["license"]
    license_items = [
        _metric("Redistribution", license_info["redistribution_status"], license_info["tone"]),
        _metric("License", license_info["label"], license_info["tone"]),
    ]
    sections.append({"id": "license", "title": "License", "items": license_items})
    return sections
def _gtfs_date(value: int | None) -> date | None:
    """Convert a GTFS ``YYYYMMDD`` value into a ``date``.

    Args:
        value: The date as an integer (or anything ``int()`` accepts),
            or ``None``.

    Returns:
        The parsed date, or ``None`` when ``value`` is ``None``, is not
        numeric, is not exactly eight digits, or is not a real calendar date.
    """
    if value is None:
        return None
    try:
        number = int(value)
    except (TypeError, ValueError):
        return None
    # Require exactly eight digits. strptime("%Y%m%d") also accepts
    # non-zero-padded fields, so a truncated value such as 2024011 would
    # otherwise be read silently as 2024-01-01.
    if not 10_000_000 <= number <= 99_999_999:
        return None
    year, month_and_day = divmod(number, 10_000)
    month, day = divmod(month_and_day, 100)
    try:
        return date(year, month, day)
    except ValueError:
        # Out-of-range month/day, e.g. 20230229 or 20241301.
        return None
def _min_int(*values: int | None) -> int | None:
    """Return the smallest of the given values as an int, skipping ``None``; ``None`` if nothing is left."""
    return min((int(candidate) for candidate in values if candidate is not None), default=None)
def _max_int(*values: int | None) -> int | None:
    """Return the largest of the given values as an int, skipping ``None``; ``None`` if nothing is left."""
    return max((int(candidate) for candidate in values if candidate is not None), default=None)
def _iso(value: datetime | None) -> str | None:
    """Format a datetime as an ISO 8601 string, passing ``None`` through unchanged."""
    if value is None:
        return None
    return value.isoformat()