"""Helpers for hashing dependencies and for creating, finishing, querying and serializing ``PipelineRun`` rows."""

from __future__ import annotations

from datetime import datetime, timezone
import hashlib
import json
from typing import Any

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.models import PipelineRun

# Stage name constants.
# NOTE(review): presumably these are the values passed as ``stage`` to the
# helpers below -- confirm against callers.
STAGE_ACQUIRE_RAW = "acquire_raw"
STAGE_FILTER_TRANSPORT = "filter_transport"
STAGE_EXTRACT_GEOMETRY = "extract_geometry"
STAGE_LABEL_FEATURES = "label_features"
STAGE_BUILD_INDEXES = "build_indexes"
STAGE_MATCH_ROUTES = "match_routes"
STAGE_BUILD_ROUTE_LAYER = "build_route_layer"


def stable_json(value: Any) -> str:
    """Serialize *value* to compact JSON with sorted object keys.

    Objects the JSON encoder does not support natively are converted with
    ``str`` (``default=str``).
    """
    return json.dumps(
        value,
        default=str,
        separators=(",", ":"),
        sort_keys=True,
    )


def dependency_hash(value: Any) -> str:
    """Return the hex SHA-256 digest of the ``stable_json`` encoding of *value*."""
    encoded = stable_json(value).encode("utf-8")
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()


def latest_completed_run(
    session: Session,
    *,
    stage: str,
    version: str,
    dependency_hash_value: str,
    source_id: int | None = None,
    dataset_id: int | None = None,
) -> PipelineRun | None:
    """Look up the newest ``"completed"`` run for the given stage key.

    Rows must match ``stage``, ``version`` and ``dependency_hash_value``
    exactly. For ``source_id`` and ``dataset_id`` a value of ``None`` matches
    only rows whose column is NULL; any other value must match exactly.

    Returns:
        The matching run with the greatest ``finished_at`` (ties broken by the
        greatest ``id``), or ``None`` when the query yields no row.
    """
    # ``column == None`` is spelled out as ``IS NULL`` so that an absent id
    # selects only rows without one.
    source_criterion = (
        PipelineRun.source_id.is_(None)
        if source_id is None
        else PipelineRun.source_id == source_id
    )
    dataset_criterion = (
        PipelineRun.dataset_id.is_(None)
        if dataset_id is None
        else PipelineRun.dataset_id == dataset_id
    )
    criteria = [
        PipelineRun.stage == stage,
        PipelineRun.version == version,
        PipelineRun.dependency_hash == dependency_hash_value,
        PipelineRun.status == "completed",
        source_criterion,
        dataset_criterion,
    ]
    query = (
        select(PipelineRun)
        .where(*criteria)
        .order_by(PipelineRun.finished_at.desc(), PipelineRun.id.desc())
        .limit(1)
    )
    return session.scalar(query)


def start_pipeline_run(
    session: Session,
    *,
    stage: str,
    version: str,
    dependency_hash_value: str,
    source_id: int | None = None,
    dataset_id: int | None = None,
    job_id: int | None = None,
    inputs: dict[str, Any] | None = None,
) -> PipelineRun:
    """Create a ``PipelineRun`` in status ``"running"`` and flush it.

    ``started_at`` and ``updated_at`` are both set to the same current UTC
    timestamp. ``inputs`` is stored as ``stable_json`` text, or as ``None``
    when no inputs are given. The run is added to *session* and flushed; this
    function does not commit.

    Returns:
        The newly created run.
    """
    timestamp = datetime.now(timezone.utc)
    serialized_inputs = stable_json(inputs) if inputs is not None else None
    new_run = PipelineRun(
        stage=stage,
        version=version,
        dependency_hash=dependency_hash_value,
        status="running",
        source_id=source_id,
        dataset_id=dataset_id,
        job_id=job_id,
        input_json=serialized_inputs,
        started_at=timestamp,
        updated_at=timestamp,
    )
    session.add(new_run)
    session.flush()
    return new_run


def finish_pipeline_run(
    session: Session,
    run: PipelineRun,
    *,
    status: str = "completed",
    outputs: dict[str, Any] | None = None,
    error: str | None = None,
) -> PipelineRun:
    """Record the final state of *run* and flush the session.

    ``status``, ``output_json`` and ``error`` are always overwritten, so
    omitting ``outputs`` or ``error`` stores ``None`` for that field.
    ``updated_at`` and ``finished_at`` are set to the same current UTC
    timestamp. This function does not commit.

    Returns:
        The same *run* object that was passed in.
    """
    timestamp = datetime.now(timezone.utc)
    serialized_outputs = stable_json(outputs) if outputs is not None else None

    run.status = status
    run.output_json = serialized_outputs
    run.error = error
    run.updated_at = timestamp
    run.finished_at = timestamp

    session.flush()
    return run


def _isoformat_or_none(moment: datetime | None) -> str | None:
    """Return ``moment.isoformat()``, or ``None`` when *moment* is falsy."""
    if moment:
        return moment.isoformat()
    return None


def pipeline_run_payload(run: PipelineRun) -> dict[str, Any]:
    """Build a plain-dict representation of *run*.

    The stored input/output JSON text is decoded into dicts (an empty dict
    when missing, invalid, or not a JSON object), and timestamps are rendered
    with ``isoformat()`` or left as ``None`` when unset.
    """
    payload: dict[str, Any] = {
        "id": run.id,
        "stage": run.stage,
        "version": run.version,
        "dependency_hash": run.dependency_hash,
        "status": run.status,
        "source_id": run.source_id,
        "dataset_id": run.dataset_id,
        "job_id": run.job_id,
        "input": _json_object(run.input_json),
        "output": _json_object(run.output_json),
        "error": run.error,
        "started_at": _isoformat_or_none(run.started_at),
        "updated_at": _isoformat_or_none(run.updated_at),
        "finished_at": _isoformat_or_none(run.finished_at),
    }
    return payload


def _json_object(text: str | None) -> dict[str, Any]:
    """Decode *text* as a JSON object, falling back to an empty dict.

    An empty dict is returned when *text* is ``None`` or empty, when it is not
    valid JSON, or when the decoded value is not a dict.
    """
    if not text:
        return {}
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError:
        return {}
    if isinstance(decoded, dict):
        return decoded
    return {}