395 lines
18 KiB
Python
395 lines
18 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import date, datetime, timezone
|
|
from typing import Any
|
|
|
|
from sqlalchemy import and_, func, select
|
|
from sqlalchemy.orm import Session, aliased
|
|
|
|
from app.data_management import dataset_row_counts
|
|
from app.models import (
|
|
CanonicalStopLink,
|
|
Dataset,
|
|
GtfsCalendar,
|
|
GtfsCalendarDate,
|
|
GtfsRoute,
|
|
GtfsStop,
|
|
GtfsStopTime,
|
|
GtfsTrip,
|
|
RouteMatch,
|
|
Source,
|
|
)
|
|
|
|
|
|
# Marker that identifies the line inside a source's free-text notes carrying
# GTFS QA review metadata; `_qa_review_payload` only parses lines that start
# with this prefix.
GTFS_QA_NOTE_PREFIX = "[GTFS QA]"
|
|
|
|
|
|
def gtfs_harmonization_inventory(session: Session) -> dict[str, Any]:
    """Build the QA inventory for every GTFS source.

    Returns a dict with two keys:

    * ``feeds`` - one inventory item per source, in the order produced by
      ``_gtfs_sources``.
    * ``summary`` - totals: number of sources, sources with an active
      dataset, datasets overall, and feeds per QA status.
    """
    feeds = []
    for source in _gtfs_sources(session):
        feeds.append(_feed_inventory_item(session, source))

    # Tally everything in a single pass over the feeds.
    active_total = 0
    dataset_total = 0
    status_totals = {"ready": 0, "needs_review": 0, "blocked": 0}
    for feed in feeds:
        if feed["active_dataset"] is not None:
            active_total += 1
        dataset_total += len(feed["datasets"])
        status = feed["qa_status"]
        if status in status_totals:
            status_totals[status] += 1

    summary = {
        "sources": len(feeds),
        "active_sources": active_total,
        "datasets": dataset_total,
        "ready": status_totals["ready"],
        "needs_review": status_totals["needs_review"],
        "blocked": status_totals["blocked"],
    }
    return {"summary": summary, "feeds": feeds}
|
|
|
|
|
|
def gtfs_harmonization_feed_detail(session: Session, source_id: int) -> dict[str, Any] | None:
    """Return the inventory item for one GTFS source plus its display sections.

    Returns ``None`` when no source has ``source_id`` or when the source's
    ``kind`` is not ``"gtfs"``.
    """
    source = session.get(Source, source_id)
    if source is None:
        return None
    if source.kind != "gtfs":
        return None

    feed = _feed_inventory_item(session, source)
    detail = dict(feed)
    detail["sections"] = _feed_sections(feed)
    return detail
|
|
|
|
|
|
def _gtfs_sources(session: Session) -> list[Source]:
    """Load all sources of kind ``"gtfs"``, ordered by country, priority, name and id."""
    ordering = (Source.country, Source.priority, Source.name, Source.id)
    query = select(Source).where(Source.kind == "gtfs").order_by(*ordering)
    return session.scalars(query).all()
|
|
|
|
|
|
def _feed_inventory_item(session: Session, source: Source) -> dict[str, Any]:
    """Assemble the full QA inventory record for one GTFS source.

    The record holds the source payload, the active dataset (or ``None``),
    every GTFS dataset of the source (active ones first, then by creation
    time and id), the active dataset's row counts, the validation / service
    horizon / overlap / license results, the merged issue list and the
    overall ``qa_status`` derived from those issues.
    """
    gtfs_datasets = [dataset for dataset in source.datasets if dataset.kind == "gtfs"]
    datasets = sorted(gtfs_datasets, key=lambda item: (not item.is_active, item.created_at, item.id))
    active_dataset = next((dataset for dataset in datasets if dataset.is_active), None)

    counts: dict[str, Any] = {}
    if active_dataset is not None:
        counts = dataset_row_counts(session, active_dataset.id, active_dataset.kind)

    validation = _validate_gtfs_dataset(session, source, active_dataset, counts)
    overlap = _overlap_summary(session, active_dataset)
    service = _service_horizon(session, active_dataset)
    issues = [*validation["issues"], *service["issues"], *overlap["issues"], *_license_issues(source)]
    qa_status = _qa_status(issues, active_dataset)

    def counts_for(dataset: Dataset) -> dict[str, Any]:
        # The active dataset's counts were already fetched above; reuse them
        # instead of running the same row-count queries a second time.
        if dataset is active_dataset:
            return counts
        return dataset_row_counts(session, dataset.id, dataset.kind)

    return {
        "source": _source_payload(source),
        "active_dataset": None if active_dataset is None else _dataset_payload(active_dataset, counts),
        "datasets": [_dataset_payload(dataset, counts_for(dataset)) for dataset in datasets],
        "counts": counts,
        "validation": validation,
        "service": service,
        "overlap": overlap,
        "license": _license_payload(source),
        "issues": issues,
        "qa_status": qa_status,
    }
|
|
|
|
|
|
def _source_payload(source: Source) -> dict[str, Any]:
    """Serialize a source into a plain dict.

    Most attributes are copied as-is; ``last_run_at`` is converted with
    ``_iso`` and ``qa_review`` is parsed from the source's notes.
    """
    leading_fields = (
        "id",
        "name",
        "country",
        "license",
        "priority",
        "mode_scope",
        "source_basis",
        "status",
        "enabled",
        "last_error",
    )
    trailing_fields = ("url", "catalog_entry_id", "notes")

    # Built in stages so the key order matches the serialized layout.
    payload: dict[str, Any] = {field: getattr(source, field) for field in leading_fields}
    payload["last_run_at"] = _iso(source.last_run_at)
    for field in trailing_fields:
        payload[field] = getattr(source, field)
    payload["qa_review"] = _qa_review_payload(source.notes)
    return payload
|
|
|
|
|
|
def _dataset_payload(dataset: Dataset, counts: dict[str, Any]) -> dict[str, Any]:
    """Serialize a dataset into a plain dict, attaching the supplied row counts.

    ``created_at`` is converted with ``_iso``; ``counts`` is stored by
    reference under the ``"counts"`` key.
    """
    copied_fields = ("id", "kind", "is_active", "status", "sha256", "local_path")
    payload: dict[str, Any] = {field: getattr(dataset, field) for field in copied_fields}
    payload["created_at"] = _iso(dataset.created_at)
    payload["counts"] = counts
    return payload
|
|
|
|
|
|
def _validate_gtfs_dataset(session: Session, source: Source, dataset: Dataset | None, counts: dict[str, Any]) -> dict[str, Any]:
    """Run structural QA checks against the active GTFS dataset.

    Args:
        session: Database session used for the per-dataset count queries.
        source: Owning source. It is accepted but not read by this function.
        dataset: Active dataset, or ``None`` when the source has none.
        counts: Row counts for ``dataset``; looked up by key with ``.get``.

    Returns:
        A dict with ``status`` (from ``_qa_status``), ``items`` (display
        metrics) and ``issues`` (problems found). Without a dataset the
        status is ``"blocked"`` with a single ``missing_active_dataset``
        issue and no items.
    """
    if dataset is None:
        blocker = _issue("missing_active_dataset", "bad", "No active GTFS dataset", "Import this source before harmonization.")
        return {"status": "blocked", "items": [], "issues": [blocker]}

    # (counts key, metric label, issue title) for every required GTFS table.
    required_tables = (
        ("agencies", "Agencies", "No agencies imported"),
        ("stops", "Stops", "No stops imported"),
        ("routes", "Routes", "No routes imported"),
        ("trips", "Trips", "No trips imported"),
        ("stop_times", "Stop times", "No stop_times imported"),
    )

    def tone_for(flagged: Any, flagged_tone: str) -> str:
        return flagged_tone if flagged else "good"

    items = [
        _metric(label, counts.get(key, 0), tone_for(not counts.get(key, 0), "bad"))
        for key, label, _title in required_tables
    ]
    # Shapes are optional, so their absence is only a warning.
    items.append(_metric("Shapes", counts.get("shapes", 0), tone_for(not counts.get("shapes", 0), "warn")))

    dataset_id = dataset.id
    missing_coords = _count(session, GtfsStop, dataset_id, (GtfsStop.lat.is_(None) | GtfsStop.lon.is_(None)))
    out_of_range = (GtfsStop.lat < -90) | (GtfsStop.lat > 90) | (GtfsStop.lon < -180) | (GtfsStop.lon > 180)
    invalid_coords = _count(session, GtfsStop, dataset_id, out_of_range)
    routes_without_trips = _routes_without_trips(session, dataset_id)
    trips_without_stop_times = _trips_without_stop_times(session, dataset_id)
    stop_times_without_seconds = _stop_times_without_seconds(session, dataset_id)
    route_geometry_missing = _count(session, GtfsRoute, dataset_id, GtfsRoute.geometry_geojson.is_(None))
    canonical_links = _count(session, CanonicalStopLink, dataset_id, CanonicalStopLink.object_type == "gtfs_stop")

    raw_match_counts = counts.get("match_counts")
    match_counts = raw_match_counts if isinstance(raw_match_counts, dict) else {}

    # NOTE(review): "Routes without trips" is shown with a "bad" tone here,
    # but the matching issue below is only "warn" - confirm which is intended.
    items += [
        _metric("Stops missing coordinates", missing_coords, tone_for(missing_coords, "bad")),
        _metric("Stops with invalid coordinates", invalid_coords, tone_for(invalid_coords, "bad")),
        _metric("Routes without trips", routes_without_trips, tone_for(routes_without_trips, "bad")),
        _metric("Trips without stop_times", trips_without_stop_times, tone_for(trips_without_stop_times, "bad")),
        _metric("Stop times without parsed seconds", stop_times_without_seconds, tone_for(stop_times_without_seconds, "warn")),
        _metric("Routes without geometry", route_geometry_missing, tone_for(route_geometry_missing, "warn")),
        _metric("Canonical stop links", canonical_links, tone_for(counts.get("stops", 0) and canonical_links == 0, "warn")),
        _metric("Route matches", counts.get("matches", 0), tone_for(counts.get("routes", 0) and not counts.get("matches", 0), "warn")),
    ]

    issues: list[dict[str, str]] = []
    if counts.get("missing_sidecar"):
        issues.append(_issue("missing_sidecar", "bad", "GTFS sidecar is missing", "Queue a recovery import for this dataset."))

    issues.extend(
        _issue(f"missing_{key}", "bad", title, "Required GTFS content is absent or failed to import.")
        for key, _label, title in required_tables
        if not counts.get(key, 0)
    )

    # (trigger, issue id, severity, title, detail), evaluated in display order.
    has_routes = counts.get("routes", 0)
    conditional_issues = (
        (missing_coords, "missing_stop_coordinates", "bad", f"{missing_coords:,} stops have no coordinates", "Stop coordinates are required for deduplication and routing access."),
        (invalid_coords, "invalid_stop_coordinates", "bad", f"{invalid_coords:,} stops have invalid coordinates", "Fix or exclude invalid stop coordinates before publication."),
        (routes_without_trips, "routes_without_trips", "warn", f"{routes_without_trips:,} routes have no trips", "These routes cannot contribute timetable service."),
        (trips_without_stop_times, "trips_without_stop_times", "bad", f"{trips_without_stop_times:,} trips have no stop_times", "These trips cannot be routed."),
        (route_geometry_missing, "route_geometry_missing", "warn", f"{route_geometry_missing:,} routes have no geometry", "Use GTFS shapes, route-layer matching, or stop-by-stop fallback."),
        (has_routes and not counts.get("shapes", 0), "missing_shapes", "warn", "No GTFS shapes imported", "OSM route matching or generated geometry will be needed."),
        (has_routes and not match_counts, "no_route_matching", "warn", "No route-match rows", "Run route matching before route-layer publication QA."),
    )
    issues.extend(
        _issue(issue_id, severity, title, detail)
        for triggered, issue_id, severity, title, detail in conditional_issues
        if triggered
    )

    return {"status": _qa_status(issues, dataset), "items": items, "issues": issues}
|
|
|
|
|
|
def _service_horizon(session: Session, dataset: Dataset | None) -> dict[str, Any]:
    """Summarize the date range covered by a dataset's service calendars.

    The range spans the earliest and latest dates found across the calendar
    rows (start/end dates) and the calendar-date rows of the dataset.
    ``days_until_end`` is measured from today's UTC date.

    Returns a dict with ISO ``start_date`` / ``end_date`` (or ``None``),
    ``days_until_end``, display ``items`` and ``issues``. A missing or
    already-expired horizon is "bad"; one ending in under 30 days is "warn".
    Without a dataset, all values are empty.
    """
    if dataset is None:
        return {"start_date": None, "end_date": None, "days_until_end": None, "items": [], "issues": []}

    calendar_range = select(func.min(GtfsCalendar.start_date), func.max(GtfsCalendar.end_date)).where(
        GtfsCalendar.dataset_id == dataset.id
    )
    cal_first, cal_last = session.execute(calendar_range).one()

    calendar_date_range = select(func.min(GtfsCalendarDate.date), func.max(GtfsCalendarDate.date)).where(
        GtfsCalendarDate.dataset_id == dataset.id
    )
    exc_first, exc_last = session.execute(calendar_date_range).one()

    earliest = _min_int(cal_first, exc_first)
    latest = _max_int(cal_last, exc_last)
    start_date = _gtfs_date(earliest)
    end_date = _gtfs_date(latest)

    issues: list[dict[str, str]] = []
    if end_date is None:
        days_until_end = None
        end_tone = "bad"
        issues.append(_issue("service_horizon_missing", "bad", "No service calendar horizon", "calendar.txt or calendar_dates.txt is required for reliable routing."))
    else:
        today = datetime.now(timezone.utc).date()
        days_until_end = (end_date - today).days
        if days_until_end < 0:
            end_tone = "bad"
            issues.append(_issue("service_horizon_expired", "bad", f"Service expired {abs(days_until_end):,} days ago", "Update or exclude this feed."))
        elif days_until_end < 30:
            end_tone = "warn"
            issues.append(_issue("service_horizon_short", "warn", f"Service ends in {days_until_end:,} days", "Update cadence is too close for publication confidence."))
        else:
            end_tone = "good"

    start_iso = start_date.isoformat() if start_date is not None else None
    end_iso = end_date.isoformat() if end_date is not None else None
    return {
        "start_date": start_iso,
        "end_date": end_iso,
        "days_until_end": days_until_end,
        "items": [
            _metric("Service starts", start_iso if start_date else "n/a", "info"),
            _metric("Service ends", end_iso if end_date else "n/a", end_tone),
        ],
        "issues": issues,
    }
|
|
|
|
|
|
def _overlap_summary(session: Session, dataset: Dataset | None) -> dict[str, Any]:
    """Report how much of a dataset overlaps with other active GTFS feeds.

    Two overlaps are measured: route keys and canonical stops shared with
    another active feed. Each non-zero overlap yields a "warn" metric and a
    "warn" issue. Without a dataset, both lists are empty.
    """
    if dataset is None:
        return {"items": [], "issues": []}

    shared_routes = _shared_route_keys(session, dataset.id)
    shared_stops = _shared_canonical_stops(session, dataset.id)

    items = [
        _metric("Shared route keys", shared_routes, "warn" if shared_routes else "good"),
        _metric("Shared canonical stops", shared_stops, "warn" if shared_stops else "good"),
    ]

    issues: list[dict[str, str]] = []
    if shared_routes:
        issues.append(_issue("shared_route_keys", "warn", f"{shared_routes:,} route keys also exist in another active feed", "Deduplicate or rank source authority for overlapping routes."))
    if shared_stops:
        issues.append(_issue("shared_canonical_stops", "warn", f"{shared_stops:,} canonical stops are shared with another active feed", "This is useful linking evidence, but conflicts need review."))

    return {"items": items, "issues": issues}
|
|
|
|
|
|
def _license_payload(source: Source) -> dict[str, Any]:
    """Classify a source's license text for display.

    An empty license, or one containing "unknown" (case-insensitive), is
    reported with redistribution status ``"unknown"`` and tone ``"warn"``.
    Any other text is reported as ``"review_required"`` with tone ``"info"``.
    The label is the stripped license text, or ``"unknown"`` when empty.
    """
    label = (source.license or "").strip()
    if label and "unknown" not in label.lower():
        status, tone = "review_required", "info"
    else:
        status, tone = "unknown", "warn"
    return {
        "label": label if label else "unknown",
        "redistribution_status": status,
        "tone": tone,
    }
|
|
|
|
|
|
def _license_issues(source: Source) -> list[dict[str, str]]:
    """Return a single "warn" issue when the source's redistribution status is unknown, else an empty list."""
    status = _license_payload(source)["redistribution_status"]
    if status != "unknown":
        return []
    warning = _issue(
        "license_unknown",
        "warn",
        "License/redistribution status is unknown",
        "Publication needs explicit import, derivation, redistribution, and attribution flags.",
    )
    return [warning]
|
|
|
|
|
|
def _qa_review_payload(notes: str | None) -> dict[str, Any]:
    """Extract the QA review state stored in a source's notes.

    Only the first line starting with ``GTFS_QA_NOTE_PREFIX`` is parsed. The
    remainder of that line is read as ``key=value`` pairs separated by ``;``;
    parts without ``=`` are ignored.

    Returns a dict with ``status`` (default ``"unreviewed"``), ``note``
    (default ``""``) and ``updated_at`` (default ``None``).
    """
    fallback: dict[str, Any] = {"status": "unreviewed", "note": "", "updated_at": None}
    if not notes:
        return fallback

    review_line = next(
        (candidate for candidate in str(notes).splitlines() if candidate.startswith(GTFS_QA_NOTE_PREFIX)),
        None,
    )
    if review_line is None:
        return fallback

    fields: dict[str, str] = {}
    body = review_line[len(GTFS_QA_NOTE_PREFIX) :].strip()
    for chunk in body.split(";"):
        key, separator, value = chunk.partition("=")
        if separator:
            fields[key.strip()] = value.strip()

    return {
        "status": fields.get("status") or "unreviewed",
        "note": fields.get("note") or "",
        "updated_at": fields.get("updated_at"),
    }
|
|
|
|
|
|
def _routes_without_trips(session: Session, dataset_id: int) -> int:
    """Count routes in the dataset for which no trip with the same ``route_id`` exists in that dataset."""
    has_trip = (
        select(GtfsTrip.id)
        .where(GtfsTrip.dataset_id == dataset_id, GtfsTrip.route_id == GtfsRoute.route_id)
        .exists()
    )
    query = select(func.count()).select_from(GtfsRoute).where(GtfsRoute.dataset_id == dataset_id, ~has_trip)
    total = session.scalar(query)
    return int(total or 0)
|
|
|
|
|
|
def _trips_without_stop_times(session: Session, dataset_id: int) -> int:
    """Count trips in the dataset for which no stop_time with the same ``trip_id`` exists in that dataset."""
    has_stop_time = (
        select(GtfsStopTime.id)
        .where(GtfsStopTime.dataset_id == dataset_id, GtfsStopTime.trip_id == GtfsTrip.trip_id)
        .exists()
    )
    query = select(func.count()).select_from(GtfsTrip).where(GtfsTrip.dataset_id == dataset_id, ~has_stop_time)
    total = session.scalar(query)
    return int(total or 0)
|
|
|
|
|
|
def _stop_times_without_seconds(session: Session, dataset_id: int) -> int:
    """Count stop_times in the dataset where both ``arrival_seconds`` and ``departure_seconds`` are NULL."""
    conditions = (
        GtfsStopTime.dataset_id == dataset_id,
        GtfsStopTime.arrival_seconds.is_(None),
        GtfsStopTime.departure_seconds.is_(None),
    )
    query = select(func.count()).select_from(GtfsStopTime).where(*conditions)
    total = session.scalar(query)
    return int(total or 0)
|
|
|
|
|
|
def _shared_route_keys(session: Session, dataset_id: int) -> int:
    """Count distinct non-empty route keys of this dataset that also occur in a different, active GTFS dataset."""
    mine = aliased(GtfsRoute)
    theirs = aliased(GtfsRoute)
    their_dataset = aliased(Dataset)

    # Same key, but it must come from another dataset.
    same_key_elsewhere = and_(theirs.route_key == mine.route_key, theirs.dataset_id != mine.dataset_id)
    filters = (
        mine.dataset_id == dataset_id,
        mine.route_key.is_not(None),
        mine.route_key != "",
        their_dataset.kind == "gtfs",
        their_dataset.is_active.is_(True),
    )
    query = (
        select(func.count(func.distinct(mine.route_key)))
        .select_from(mine)
        .join(theirs, same_key_elsewhere)
        .join(their_dataset, their_dataset.id == theirs.dataset_id)
        .where(*filters)
    )
    total = session.scalar(query)
    return int(total or 0)
|
|
|
|
|
|
def _shared_canonical_stops(session: Session, dataset_id: int) -> int:
    """Count distinct canonical stops linked from this dataset that are also linked from a different, active GTFS dataset.

    Only links whose ``object_type`` is ``"gtfs_stop"`` are considered on
    both sides.
    """
    mine = aliased(CanonicalStopLink)
    theirs = aliased(CanonicalStopLink)
    their_dataset = aliased(Dataset)

    # Same canonical stop, but linked from another dataset.
    same_stop_elsewhere = and_(
        theirs.canonical_stop_id == mine.canonical_stop_id,
        theirs.dataset_id != mine.dataset_id,
    )
    filters = (
        mine.dataset_id == dataset_id,
        mine.object_type == "gtfs_stop",
        theirs.object_type == "gtfs_stop",
        their_dataset.kind == "gtfs",
        their_dataset.is_active.is_(True),
    )
    query = (
        select(func.count(func.distinct(mine.canonical_stop_id)))
        .select_from(mine)
        .join(theirs, same_stop_elsewhere)
        .join(their_dataset, their_dataset.id == theirs.dataset_id)
        .where(*filters)
    )
    total = session.scalar(query)
    return int(total or 0)
|
|
|
|
|
|
def _count(session: Session, model: Any, dataset_id: int, *criteria: Any) -> int:
    """Count rows of ``model`` belonging to ``dataset_id`` that also satisfy every extra criterion.

    ``model`` must expose a ``dataset_id`` column. A missing scalar result
    is reported as 0.
    """
    conditions = [model.dataset_id == dataset_id, *criteria]
    query = select(func.count()).select_from(model).where(*conditions)
    total = session.scalar(query)
    return int(total or 0)
|
|
|
|
|
|
def _metric(label: str, value: Any, tone: str = "info", description: str = "") -> dict[str, Any]:
    """Bundle one display metric (label, value, tone, description) into a dict."""
    return dict(label=label, value=value, tone=tone, description=description)
|
|
|
|
|
|
def _issue(issue_id: str, severity: str, title: str, detail: str) -> dict[str, str]:
    """Bundle one QA issue (id, severity, title, detail) into a dict."""
    return dict(id=issue_id, severity=severity, title=title, detail=detail)
|
|
|
|
|
|
def _qa_status(issues: list[dict[str, str]], dataset: Dataset | None) -> str:
    """Reduce a list of issues to one overall QA status.

    Returns ``"blocked"`` when there is no dataset or any issue has severity
    ``"bad"``; ``"needs_review"`` when any issue has severity ``"warn"``;
    otherwise ``"ready"``.
    """
    if dataset is None:
        return "blocked"

    status = "ready"
    for issue in issues:
        severity = issue.get("severity")
        if severity == "bad":
            # A single blocking issue decides the outcome.
            return "blocked"
        if severity == "warn":
            status = "needs_review"
    return status
|
|
|
|
|
|
def _feed_sections(feed: dict[str, Any]) -> list[dict[str, Any]]:
    """Arrange a feed inventory item into titled display sections.

    The validation, service and overlap sections reuse the ``items`` lists
    already present on the feed. The license section is built from the
    feed's license payload.
    """
    item_backed_sections = (
        ("validation", "GTFS Validation"),
        ("service", "Service Horizon"),
        ("overlap", "Overlap and Deduplication"),
    )
    sections: list[dict[str, Any]] = [
        {"id": section_id, "title": title, "items": feed[section_id]["items"]}
        for section_id, title in item_backed_sections
    ]

    license_info = feed["license"]
    license_items = [
        _metric("Redistribution", license_info["redistribution_status"], license_info["tone"]),
        _metric("License", license_info["label"], license_info["tone"]),
    ]
    sections.append({"id": "license", "title": "License", "items": license_items})
    return sections
|
|
|
|
|
|
def _gtfs_date(value: int | None) -> date | None:
    """Convert a GTFS ``YYYYMMDD`` value into a ``date``.

    Args:
        value: The date as an integer (or anything ``int()`` accepts), or
            ``None``.

    Returns:
        The corresponding ``date``, or ``None`` when ``value`` is ``None``,
        is not numeric, is not exactly eight digits, or does not name a real
        calendar day.
    """
    if value is None:
        return None
    try:
        number = int(value)
    except ValueError:
        return None

    # Require exactly eight digits. Parsing with strptime("%Y%m%d") would
    # accept shorter values such as 202411, because %m and %d also match a
    # single digit, and would silently return a wrong date.
    if not 10_000_000 <= number <= 99_999_999:
        return None

    year, month_and_day = divmod(number, 10_000)
    month, day = divmod(month_and_day, 100)
    try:
        return date(year, month, day)
    except ValueError:
        # Month or day out of range (e.g. 20230229).
        return None
|
|
|
|
|
|
def _min_int(*values: int | None) -> int | None:
    """Return the smallest of the given values as an ``int``, ignoring ``None``; ``None`` when nothing remains."""
    return min((int(value) for value in values if value is not None), default=None)
|
|
|
|
|
|
def _max_int(*values: int | None) -> int | None:
    """Return the largest of the given values as an ``int``, ignoring ``None``; ``None`` when nothing remains."""
    return max((int(value) for value in values if value is not None), default=None)
|
|
|
|
|
|
def _iso(value: datetime | None) -> str | None:
    """Return ``value.isoformat()``, passing ``None`` through unchanged."""
    if value is None:
        return None
    return value.isoformat()
|