from __future__ import annotations

import json
from urllib.parse import urlparse

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.config import settings
from app.models import Dataset, Source
from app.pipeline.download import materialize_source
from app.pipeline.osm_pbf import _raw_format
from app.pipeline.osm_replication import fetch_replication_state
from app.pipeline.utils import sha256_file


def run_osm_diff_source(session: Session, source: Source) -> Dataset:
    """Commit an OSM change file as a raw update artifact.

    Applying the diff to an authoritative OSM base extract is a separate step;
    this importer deliberately records the file without treating it as a
    complete visual route layer.

    If ``source.url`` looks like a replication "-updates" directory, the
    directory's current replication state is recorded instead (see
    ``_commit_update_directory_state``).

    Args:
        session: Session used for the look-up and for adding the new row.
            The row is flushed but no commit is issued here.
        source: The source whose URL is materialized and recorded.

    Returns:
        The newest existing ``osm_diff_raw`` dataset for this source with the
        same SHA-256, or a freshly added one when none exists.
    """
    if _looks_like_update_directory(source.url):
        return _commit_update_directory_state(session, source)

    raw_path = materialize_source(source)
    raw_hash = sha256_file(raw_path)

    existing = _find_dataset(session, source, kind="osm_diff_raw", sha256=raw_hash)
    if existing is not None:
        return existing

    # Metadata is only built when a new row is actually needed, so
    # _raw_format() is not called for content that was already recorded.
    return _add_dataset(
        session,
        source,
        kind="osm_diff_raw",
        local_path=str(raw_path),
        sha256=raw_hash,
        metadata={
            "stage": "raw_osm_diff",
            "raw_format": _raw_format(raw_path),
            "source_url": source.url,
        },
    )


def _commit_update_directory_state(session: Session, source: Source) -> Dataset:
    """Record the current replication state of an "-updates" directory.

    The state is fetched from ``source.url`` and written as sorted
    ``key=value`` lines to ``state_<sequence_number>.txt`` under
    ``<data_dir>/sources/source_<source.id>/``. The file's SHA-256 is then
    used to detect a state that was already recorded.

    Args:
        session: Session used for the look-up and for adding the new row.
            The row is flushed but no commit is issued here.
        source: The source whose URL is the replication directory.

    Returns:
        The newest existing ``osm_diff_state`` dataset for this source with
        the same SHA-256, or a freshly added one when none exists.
    """
    state = fetch_replication_state(
        source.url,
        timeout=settings.osm_diff_state_timeout_seconds,
    )

    source_dir = settings.data_dir / "sources" / f"source_{source.id}"
    source_dir.mkdir(parents=True, exist_ok=True)

    # Keys are sorted so identical state always yields an identical file and
    # therefore an identical hash.
    state_path = source_dir / f"state_{state.sequence_number}.txt"
    state_path.write_text(
        "\n".join(f"{key}={value}" for key, value in sorted(state.raw.items())) + "\n",
        encoding="utf-8",
    )
    state_hash = sha256_file(state_path)

    existing = _find_dataset(session, source, kind="osm_diff_state", sha256=state_hash)
    if existing is not None:
        return existing

    return _add_dataset(
        session,
        source,
        kind="osm_diff_state",
        local_path=str(state_path),
        sha256=state_hash,
        metadata={
            "stage": "osm_diff_state",
            "updates_url": source.url,
            "sequence_number": state.sequence_number,
            "timestamp": state.timestamp,
            "state": state.raw,
        },
    )


def _find_dataset(
    session: Session,
    source: Source,
    *,
    kind: str,
    sha256: str,
) -> Dataset | None:
    """Return the newest dataset of *kind* for *source* with this hash, or None."""
    return session.scalar(
        select(Dataset)
        .where(
            Dataset.source_id == source.id,
            Dataset.kind == kind,
            Dataset.sha256 == sha256,
        )
        .order_by(Dataset.id.desc())
        # Only the first row is consumed by Session.scalar(); ask for no more.
        .limit(1)
    )


def _add_dataset(
    session: Session,
    source: Source,
    *,
    kind: str,
    local_path: str,
    sha256: str,
    metadata: dict[str, object],
) -> Dataset:
    """Add an inactive, "committed" dataset row for *source* and flush it."""
    dataset = Dataset(
        source_id=source.id,
        kind=kind,
        local_path=local_path,
        sha256=sha256,
        is_active=False,
        status="committed",
        metadata_json=json.dumps(metadata, indent=2),
    )
    session.add(dataset)
    session.flush()
    return dataset


def _looks_like_update_directory(url: str) -> bool:
    """Return True when the URL path ends in "-updates", with or without a trailing slash.

    The comparison is case-insensitive and looks only at the path component,
    so query strings and fragments are ignored.
    """
    lower_path = urlparse(url).path.lower()
    return lower_path.endswith(("-updates", "-updates/"))