from __future__ import annotations

from datetime import datetime, timezone
from typing import Callable, Any

from sqlalchemy.orm import Session

from app.models import Source
from app.pipeline.gtfs import run_gtfs_source
from app.pipeline.osm_diff import run_osm_diff_source
from app.pipeline.osm_geojson import run_osm_geojson_source
from app.pipeline.osm_pbf import run_osm_pbf_source

# Shape of the optional progress hook forwarded to the runners that accept one.
# The meaning of each positional argument is defined by those runners, not here.
ProgressCallback = Callable[[str, str, int | None, int | None, dict[str, Any] | None], None]


def run_source(session: Session, source: Source, progress_callback: ProgressCallback | None = None):
    """Run the pipeline matching ``source.kind`` and record the outcome on ``source``.

    Before dispatching, the source is marked ``"running"``, ``last_run_at`` is
    set to the current UTC time, ``last_error`` is cleared, and the session is
    flushed. Afterwards ``source.status`` is ``"ok"`` or ``"error"``.

    The final status is only assigned on the object; this function does not
    flush or commit it. NOTE(review): presumably the caller does - confirm.

    Args:
        session: SQLAlchemy session passed through to the selected runner.
        source: The source to run; its ``kind`` selects the runner.
        progress_callback: Optional progress hook. It is forwarded only to the
            ``"gtfs"`` and ``"osm_pbf"`` runners; the other runners are called
            without it.

    Returns:
        Whatever the selected runner returns.

    Raises:
        ValueError: If ``source.kind`` is not one of the supported kinds.
        Exception: Any exception raised by the runner is re-raised unchanged
            after being recorded on ``source``.
    """
    source.status = "running"
    source.last_run_at = datetime.now(timezone.utc)
    source.last_error = None
    session.flush()

    try:
        if source.kind == "gtfs":
            dataset = run_gtfs_source(session, source, progress_callback=progress_callback)
        elif source.kind == "osm_geojson":
            dataset = run_osm_geojson_source(session, source)
        elif source.kind == "osm_pbf":
            dataset = run_osm_pbf_source(session, source, progress_callback=progress_callback)
        elif source.kind == "osm_diff":
            dataset = run_osm_diff_source(session, source)
        else:
            # Raised inside the try so an unknown kind is recorded like any other failure.
            raise ValueError(f"Unsupported source kind: {source.kind}")

        source.status = "ok"
        source.last_error = None
        return dataset
    except Exception as exc:  # noqa: BLE001 - persist pipeline error for UI
        source.status = "error"
        # str() of an exception raised without a message is "", which would
        # leave a failed source with a blank error; fall back to repr() then.
        source.last_error = str(exc) or repr(exc)
        raise