from __future__ import annotations import shutil import time from pathlib import Path from urllib.parse import urlparse import requests from app.config import settings from app.models import Source from app.pipeline.utils import sha256_file def materialize_source(source: Source) -> Path: """Download/copy a source into the local cache and return the file path. Files are stored by content hash per source. Re-running an unchanged source reuses the existing cached file instead of creating another timestamped copy. """ source_dir = settings.data_dir / "sources" / f"source_{source.id}" source_dir.mkdir(parents=True, exist_ok=True) suffix = _guess_suffix(source.url, source.kind) parsed = urlparse(source.url) if parsed.scheme in {"http", "https"}: temp_path = _download_temp_path(source_dir, suffix) existing_size = temp_path.stat().st_size if temp_path.exists() else 0 headers = {"Range": f"bytes={existing_size}-"} if existing_size > 0 else None with requests.get(source.url, stream=True, timeout=120, headers=headers) as r: r.raise_for_status() mode = "ab" if existing_size > 0 and r.status_code == 206 else "wb" with temp_path.open(mode) as f: for chunk in r.iter_content(chunk_size=1024 * 1024): if chunk: f.write(chunk) return _store_or_reuse_cached_file(source_dir=source_dir, source_path=temp_path, suffix=suffix, move=True) if parsed.scheme == "file": source_path = Path(parsed.path) else: source_path = Path(source.url) if not source_path.exists(): raise FileNotFoundError(f"Source file does not exist: {source.url}") if _is_relative_to(source_path.resolve(), source_dir.resolve()): return source_path return _store_or_reuse_cached_file(source_dir=source_dir, source_path=source_path, suffix=suffix, move=False) def _download_temp_path(source_dir: Path, suffix: str) -> Path: candidates = sorted( source_dir.glob(f"*.download{suffix}"), key=lambda path: path.stat().st_mtime if path.exists() else 0, reverse=True, ) if candidates: return candidates[0] return source_dir / f"{int(time.time())}.download{suffix}" def _guess_suffix(url: str, kind: str) -> str: path = urlparse(url).path or url lower = path.lower() for suffix in (".zip", ".geojson", ".json", ".osm.pbf", ".pbf", ".osm", ".osm.xml", ".osc.gz", ".osc", ".csv"): if lower.endswith(suffix): return suffix if kind == "gtfs": return ".zip" if kind == "osm_geojson": return ".geojson" return ".dat" def _store_or_reuse_cached_file(source_dir: Path, source_path: Path, suffix: str, move: bool) -> Path: source_hash = sha256_file(source_path) target = source_dir / f"{source_hash[:16]}{suffix}" if target.exists() and sha256_file(target) == source_hash: if move and source_path != target: source_path.unlink(missing_ok=True) return target existing = _find_existing_cached_file(source_dir, source_hash, suffix, exclude=source_path) if existing is not None: if move and source_path != existing: source_path.unlink(missing_ok=True) return existing if move: source_path.replace(target) else: shutil.copyfile(source_path, target) return target def _find_existing_cached_file(source_dir: Path, source_hash: str, suffix: str, exclude: Path | None = None) -> Path | None: for candidate in sorted(source_dir.glob(f"*{suffix}")): if exclude is not None and candidate.resolve() == exclude.resolve(): continue if candidate.is_file() and sha256_file(candidate) == source_hash: return candidate return None def _is_relative_to(path: Path, parent: Path) -> bool: try: path.relative_to(parent) return True except ValueError: return False