Files
2026-07-01 23:29:51 +02:00

394 lines
19 KiB
Python

from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.gtfs_storage import missing_sidecar_paths as missing_gtfs_sidecar_paths
from app.models import (
CanonicalStop,
CanonicalStopLink,
Dataset,
GtfsAgency,
GtfsCalendar,
GtfsCalendarDate,
GtfsRoute,
GtfsShape,
GtfsStop,
GtfsTrip,
Job,
OsmFeature,
RouteMatch,
RoutePattern,
RoutePatternStop,
Source,
SourceCatalogEntry,
)
from app.osm_storage import missing_sidecar_paths as missing_osm_sidecar_paths
from app.pipeline.osm_addresses import ADDRESS_INDEX_VERSION
from app.pipeline.routing_layer import active_routing_dataset
def qa_summary(session: Session) -> dict[str, Any]:
    """Assemble the display-ready QA summary payload.

    Issues a series of aggregate queries through ``session`` and returns a
    dict with four keys: ``generated_at`` (UTC timestamp in ISO format), a
    static ``decision`` block, the ordered ``sections`` (each holding a list
    of ``_item`` dicts) and a static ``next_actions`` list.
    """

    def active_datasets(kind: str):
        # Active datasets of the given kind, ordered by primary key.
        stmt = (
            select(Dataset)
            .where(Dataset.kind == kind, Dataset.is_active.is_(True))
            .order_by(Dataset.id)
        )
        return session.scalars(stmt).all()

    def graded(label: str, value: object, when_set: str, when_clear: str, description: str) -> dict[str, object]:
        # Item whose tone depends on whether the value is truthy (non-zero).
        return _item(label, value, when_set if value else when_clear, description)

    gtfs_datasets = active_datasets("gtfs")
    osm_datasets = active_datasets("osm_geojson")
    active_gtfs_ids = [int(entry.id) for entry in gtfs_datasets]
    active_osm_ids = [int(entry.id) for entry in osm_datasets]

    source_catalog_total = _count(session, SourceCatalogEntry)
    registered_sources = _count(session, Source)
    linked_stmt = select(func.count(func.distinct(Source.catalog_entry_id))).where(
        Source.catalog_entry_id.is_not(None)
    )
    linked_catalog_entries = int(session.scalar(linked_stmt) or 0)
    priority_backlog = _priority_catalog_backlog(session)
    failed_sources = _count(
        session,
        Source,
        (Source.last_error.is_not(None)) | Source.status.in_(["failed", "error"]),
    )

    job_counts = _job_status_counts(session)
    running_jobs = job_counts.get("running", 0)
    queued_jobs = job_counts.get("queued", 0)

    # A dataset counts as "missing" when the storage helper reports any path.
    missing_gtfs_sidecars = len([entry for entry in gtfs_datasets if missing_gtfs_sidecar_paths(entry)])
    missing_osm_sidecars = len([entry for entry in osm_datasets if missing_osm_sidecar_paths(entry)])

    gtfs_counts = _gtfs_validation_counts(session, active_gtfs_ids)
    link_counts = _link_quality_counts(session, active_gtfs_ids, active_osm_ids)
    route_counts = _route_quality_counts(session, active_gtfs_ids)
    address_is_stale = _lightweight_address_index_status(session).get("stale")
    license_unknown = _count(
        session,
        Source,
        Source.kind == "gtfs",
        (Source.license.is_(None)) | (func.lower(Source.license).in_(["", "unknown"])),
    )

    source_discovery = {
        "id": "source_discovery",
        "title": "Source Discovery",
        "items": [
            _item("Identified sources", source_catalog_total, "info", "Rows in the source catalog."),
            _item("Registered sources", registered_sources, "info", "Sources known to the importer."),
            graded("Catalog entries linked", linked_catalog_entries, "good", "warn", "Catalog rows connected to importer sources."),
            graded("Priority catalog backlog", priority_backlog, "warn", "good", "P0/P1 catalog rows without a registered source."),
        ],
    }
    import_health = {
        "id": "import_health",
        "title": "Import Health",
        "items": [
            graded("Active GTFS datasets", len(active_gtfs_ids), "good", "warn", "Feeds currently participating in harmonization."),
            graded("Active OSM datasets", len(active_osm_ids), "good", "warn", "Visual/spatial datasets currently active."),
            graded("Running jobs", running_jobs, "warn", "info", "Currently running queued work."),
            _item("Queued jobs", queued_jobs, "info", "Outstanding queued work."),
            graded("Failed sources", failed_sources, "bad", "good", "Sources with failed status or last_error."),
            graded("Missing GTFS sidecars", missing_gtfs_sidecars, "bad", "good", "Active GTFS datasets whose sidecar is unavailable."),
            graded("Missing OSM sidecars", missing_osm_sidecars, "bad", "good", "Active OSM datasets whose sidecar is unavailable."),
        ],
    }
    gtfs_validation = {
        "id": "gtfs_validation",
        "title": "GTFS Validation",
        "items": [
            _item("Agencies", gtfs_counts["agencies"], "info", "Imported agency.txt rows."),
            _item("Stops", gtfs_counts["stops"], "info", "Imported stops."),
            _item("Routes", gtfs_counts["routes"], "info", "Imported routes."),
            _item("Trips", gtfs_counts["trips"], "info", "Imported trips."),
            _item("Shapes", gtfs_counts["shapes"], "info", "Imported shape records."),
            graded("Stops without coordinates", gtfs_counts["stops_without_coordinates"], "bad", "good", "Stops that cannot be spatially linked or routed."),
            graded("Routes without geometry", gtfs_counts["routes_without_geometry"], "warn", "good", "Routes with no stored GTFS shape geometry."),
            graded("Routes without agency", gtfs_counts["routes_without_agency"], "warn", "good", "Routes missing agency/operator references."),
            _item("Calendar range", gtfs_counts["calendar_range"], "info", "Min/max imported service dates from calendars and exceptions."),
        ],
    }
    deduplication = {
        "id": "deduplication",
        "title": "Deduplication and Stop Links",
        "items": [
            _item("Canonical stops", link_counts["canonical_stops"], "info", "Current normalized stop/station records."),
            graded("GTFS stop links", link_counts["gtfs_stop_links"], "good", "warn", "Timetable stops linked into canonical stops."),
            graded("GTFS stops without canonical link", link_counts["gtfs_stops_without_canonical"], "bad", "good", "Imported active stops that still need deduplication/linking."),
            graded("OSM visual stop links", link_counts["osm_stop_links"], "good", "warn", "OSM stop/station features linked to canonical stops."),
            graded("OSM stops without canonical link", link_counts["osm_stops_without_canonical"], "warn", "good", "Visual stops that are not yet linked to GTFS/canonical stops."),
            _item("Multi-source stop groups", link_counts["multi_source_stop_groups"], "info", "Canonical stops that merge GTFS stops from multiple datasets."),
            graded("Long-distance OSM links", link_counts["long_distance_osm_links"], "warn", "good", "OSM stop links over 150m from the canonical stop."),
        ],
    }
    route_quality = {
        "id": "route_quality",
        "title": "Route Matching and Geometry",
        "items": [
            graded("Matched/accepted routes", route_counts["matched_or_accepted"], "good", "warn", "GTFS routes with accepted or automatic OSM matches."),
            graded("Probable matches", route_counts["probable"], "warn", "info", "Potential conflicts needing review."),
            graded("Weak matches", route_counts["weak"], "warn", "good", "Low-confidence route links."),
            graded("Missing route matches", route_counts["missing"], "bad", "good", "Routes with no visual match."),
            graded("Unreviewed GTFS routes", route_counts["routes_without_match"], "warn", "good", "Active GTFS routes without a RouteMatch row."),
            _item("Route patterns", route_counts["route_patterns"], "info", "Published visual route-layer patterns."),
            graded("Route patterns without stops", route_counts["route_patterns_without_stops"], "warn", "good", "Visual patterns missing canonical stop sequence evidence."),
        ],
    }
    publication_readiness = {
        "id": "publication_readiness",
        "title": "Publication Readiness",
        "items": [
            # The displayed value is a yes/no label, so the tone is chosen from the flag itself.
            _item(
                "Address index stale",
                "yes" if address_is_stale else "no",
                "warn" if address_is_stale else "good",
                "Address polygons/search index version status.",
            ),
            graded("GTFS licenses unknown", license_unknown, "warn", "good", "GTFS sources without explicit redistribution/license status."),
            _item("Canonical export", "draft", "warn", "Canonical Europe dataset export tables/API are not versioned yet."),
            _item("Third-party API", "later", "info", "Accounts, billing, quotas, and API backend are intentionally out of scope for this step."),
        ],
    }

    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "decision": {
            "deployment": "same_workbench_for_now",
            "database": "same_postgresql_database_for_now",
            "split_trigger": "Split when third-party API, accounts/billing, heavy export jobs, or independent scaling are needed.",
            "api_contract": "/api/qa/summary is intentionally display-ready but stable enough to become a harmonization-service summary endpoint.",
        },
        "sections": [
            source_discovery,
            import_health,
            gtfs_validation,
            deduplication,
            route_quality,
            publication_readiness,
        ],
        "next_actions": [
            "Add review queues for each non-zero bad/warn metric.",
            "Persist source authority and redistribution policy before publishing third-party exports.",
            "Create versioned canonical snapshots and export manifests.",
        ],
    }
def _item(label: str, value: object, tone: str, description: str) -> dict[str, object]:
return {"label": label, "value": value, "tone": tone, "description": description}
def _lightweight_address_index_status(session: Session) -> dict[str, object]:
    """Report whether the active routing dataset's address index is outdated.

    Reads the ``address_index.version`` entry from the dataset's
    ``metadata_json`` and compares it with ``ADDRESS_INDEX_VERSION``.
    Returns ``stale``, ``version`` and ``current_version``. ``stale`` is only
    true when an ``address_index`` entry exists and its version differs;
    a missing dataset, missing metadata or unparsable JSON is not stale.
    """
    routing_dataset = active_routing_dataset(session)
    raw_metadata = routing_dataset.metadata_json if routing_dataset is not None else None
    if not raw_metadata:
        return {"stale": False, "version": None, "current_version": ADDRESS_INDEX_VERSION}

    try:
        parsed = json.loads(raw_metadata)
    except json.JSONDecodeError:
        # Malformed metadata is treated the same as having no address index.
        parsed = {}

    index_info = parsed.get("address_index") if isinstance(parsed, dict) else None
    if not isinstance(index_info, dict):
        index_info = {}

    recorded_version = index_info.get("version")
    is_stale = False
    if index_info:
        is_stale = bool(recorded_version != ADDRESS_INDEX_VERSION)
    return {
        "stale": is_stale,
        "version": recorded_version,
        "current_version": ADDRESS_INDEX_VERSION,
    }
def _count(session: Session, model, *where) -> int:
    """Count rows of ``model``, optionally restricted by ``where`` clauses.

    A ``None`` scalar result is reported as ``0``.
    """
    query = select(func.count()).select_from(model)
    filtered = query.where(*where) if where else query
    total = session.scalar(filtered)
    return int(total or 0)
def _priority_catalog_backlog(session: Session) -> int:
    """Count P0 / P0 fallback / P1 catalog entries that no ``Source`` row references."""
    # Correlated EXISTS: is there any source pointing at this catalog entry?
    has_registered_source = (
        select(Source.id)
        .where(Source.catalog_entry_id == SourceCatalogEntry.id)
        .exists()
    )
    return _count(
        session,
        SourceCatalogEntry,
        SourceCatalogEntry.priority.in_(["P0", "P0 fallback", "P1"]),
        ~has_registered_source,
    )
def _job_status_counts(session: Session) -> dict[str, int]:
    """Return a ``status -> count`` map for non-dismissed jobs.

    Only jobs whose status is queued, running, paused or failed are counted;
    statuses with no matching rows are absent from the result.
    """
    tracked_statuses = ["queued", "running", "paused", "failed"]
    stmt = (
        select(Job.status, func.count())
        .where(Job.dismissed_at.is_(None), Job.status.in_(tracked_statuses))
        .group_by(Job.status)
    )
    counts: dict[str, int] = {}
    for status, total in session.execute(stmt).all():
        counts[str(status)] = int(total)
    return counts
def _gtfs_validation_counts(session: Session, dataset_ids: list[int]) -> dict[str, object]:
if not dataset_ids:
return {
"agencies": 0,
"stops": 0,
"routes": 0,
"trips": 0,
"shapes": 0,
"stops_without_coordinates": 0,
"routes_without_geometry": 0,
"routes_without_agency": 0,
"calendar_range": "none",
}
calendar_min, calendar_max = session.execute(
select(func.min(GtfsCalendar.start_date), func.max(GtfsCalendar.end_date)).where(GtfsCalendar.dataset_id.in_(dataset_ids))
).one()
exception_min, exception_max = session.execute(
select(func.min(GtfsCalendarDate.date), func.max(GtfsCalendarDate.date)).where(GtfsCalendarDate.dataset_id.in_(dataset_ids))
).one()
min_date = min(value for value in [calendar_min, exception_min] if value is not None) if (calendar_min or exception_min) else None
max_date = max(value for value in [calendar_max, exception_max] if value is not None) if (calendar_max or exception_max) else None
return {
"agencies": _count(session, GtfsAgency, GtfsAgency.dataset_id.in_(dataset_ids)),
"stops": _count(session, GtfsStop, GtfsStop.dataset_id.in_(dataset_ids)),
"routes": _count(session, GtfsRoute, GtfsRoute.dataset_id.in_(dataset_ids)),
"trips": _count(session, GtfsTrip, GtfsTrip.dataset_id.in_(dataset_ids)),
"shapes": _count(session, GtfsShape, GtfsShape.dataset_id.in_(dataset_ids)),
"stops_without_coordinates": _count(
session,
GtfsStop,
GtfsStop.dataset_id.in_(dataset_ids),
(GtfsStop.lat.is_(None)) | (GtfsStop.lon.is_(None)),
),
"routes_without_geometry": _count(
session,
GtfsRoute,
GtfsRoute.dataset_id.in_(dataset_ids),
(GtfsRoute.geometry_geojson.is_(None)) | (GtfsRoute.geometry_geojson == ""),
),
"routes_without_agency": _count(
session,
GtfsRoute,
GtfsRoute.dataset_id.in_(dataset_ids),
(GtfsRoute.agency_id.is_(None)) | (GtfsRoute.agency_id == ""),
),
"calendar_range": f"{min_date or 'unknown'} -> {max_date or 'unknown'}",
}
def _link_quality_counts(session: Session, gtfs_dataset_ids: list[int], osm_dataset_ids: list[int]) -> dict[str, int]:
    """Summarise how GTFS stops and OSM features are linked to canonical stops.

    GTFS metrics are only queried when ``gtfs_dataset_ids`` is non-empty and
    OSM metrics only when ``osm_dataset_ids`` is non-empty; otherwise they
    stay at ``0``. The total number of canonical stops is always queried.
    """
    gtfs_unlinked = 0
    gtfs_links = 0
    multi_source_groups = 0
    if gtfs_dataset_ids:
        # Correlated EXISTS: does this GTFS stop have a canonical link row?
        stop_has_link = (
            select(CanonicalStopLink.id)
            .where(
                CanonicalStopLink.object_type == "gtfs_stop",
                CanonicalStopLink.dataset_id == GtfsStop.dataset_id,
                CanonicalStopLink.object_id == GtfsStop.id,
            )
            .exists()
        )
        gtfs_unlinked = _count(
            session,
            GtfsStop,
            GtfsStop.dataset_id.in_(gtfs_dataset_ids),
            ~stop_has_link,
        )
        gtfs_links = _count(
            session,
            CanonicalStopLink,
            CanonicalStopLink.object_type == "gtfs_stop",
            CanonicalStopLink.dataset_id.in_(gtfs_dataset_ids),
        )
        # Canonical stops whose GTFS links come from more than one dataset.
        shared_groups = (
            select(CanonicalStopLink.canonical_stop_id)
            .where(
                CanonicalStopLink.object_type == "gtfs_stop",
                CanonicalStopLink.dataset_id.in_(gtfs_dataset_ids),
            )
            .group_by(CanonicalStopLink.canonical_stop_id)
            .having(func.count(func.distinct(CanonicalStopLink.dataset_id)) > 1)
            .subquery()
        )
        group_total = session.scalar(select(func.count()).select_from(shared_groups))
        multi_source_groups = int(group_total or 0)

    osm_unlinked = 0
    osm_links = 0
    distant_osm_links = 0
    if osm_dataset_ids:
        # Correlated EXISTS: does this OSM feature have a canonical link row?
        feature_has_link = (
            select(CanonicalStopLink.id)
            .where(
                CanonicalStopLink.object_type == "osm_feature",
                CanonicalStopLink.dataset_id == OsmFeature.dataset_id,
                CanonicalStopLink.object_id == OsmFeature.id,
            )
            .exists()
        )
        osm_unlinked = _count(
            session,
            OsmFeature,
            OsmFeature.dataset_id.in_(osm_dataset_ids),
            OsmFeature.kind.in_(["stop", "station", "terminal"]),
            ~feature_has_link,
        )
        osm_links = _count(
            session,
            CanonicalStopLink,
            CanonicalStopLink.object_type == "osm_feature",
            CanonicalStopLink.dataset_id.in_(osm_dataset_ids),
        )
        distant_osm_links = _count(
            session,
            CanonicalStopLink,
            CanonicalStopLink.object_type == "osm_feature",
            CanonicalStopLink.dataset_id.in_(osm_dataset_ids),
            CanonicalStopLink.distance_m > 150,
        )

    canonical_total = _count(session, CanonicalStop)
    return {
        "canonical_stops": canonical_total,
        "gtfs_stop_links": gtfs_links,
        "gtfs_stops_without_canonical": gtfs_unlinked,
        "osm_stop_links": osm_links,
        "osm_stops_without_canonical": osm_unlinked,
        "multi_source_stop_groups": multi_source_groups,
        "long_distance_osm_links": distant_osm_links,
    }
def _route_quality_counts(session: Session, gtfs_dataset_ids: list[int]) -> dict[str, int]:
    """Summarise route-match statuses and route-pattern coverage.

    Route-pattern totals are always queried. Match-status counts are only
    queried when ``gtfs_dataset_ids`` is non-empty and are restricted to
    routes of those datasets; otherwise they are reported as ``0``.
    """
    # Correlated EXISTS: does this pattern have at least one stop row?
    pattern_has_stops = (
        select(RoutePatternStop.id)
        .where(RoutePatternStop.route_pattern_id == RoutePattern.id)
        .exists()
    )
    pattern_metrics = {
        "route_patterns": _count(session, RoutePattern),
        "route_patterns_without_stops": _count(session, RoutePattern, ~pattern_has_stops),
    }

    if not gtfs_dataset_ids:
        empty_matches = {
            "matched_or_accepted": 0,
            "probable": 0,
            "weak": 0,
            "missing": 0,
            "routes_without_match": 0,
        }
        return {**empty_matches, **pattern_metrics}

    status_stmt = (
        select(RouteMatch.status, func.count())
        .join(GtfsRoute, GtfsRoute.id == RouteMatch.gtfs_route_id)
        .where(GtfsRoute.dataset_id.in_(gtfs_dataset_ids))
        .group_by(RouteMatch.status)
    )
    by_status: dict[str, int] = {}
    for status, total in session.execute(status_stmt).all():
        by_status[str(status)] = int(total)

    # Routes of the selected datasets that have no RouteMatch row at all.
    route_has_match = select(RouteMatch.id).where(RouteMatch.gtfs_route_id == GtfsRoute.id).exists()
    unmatched_routes = _count(
        session,
        GtfsRoute,
        GtfsRoute.dataset_id.in_(gtfs_dataset_ids),
        ~route_has_match,
    )

    match_metrics = {
        "matched_or_accepted": by_status.get("matched", 0) + by_status.get("accepted", 0),
        "probable": by_status.get("probable", 0),
        "weak": by_status.get("weak", 0),
        "missing": by_status.get("missing", 0),
        "routes_without_match": unmatched_routes,
    }
    return {**match_metrics, **pattern_metrics}