from __future__ import annotations

import hashlib
import json
import re
from pathlib import Path
from typing import Iterable, Optional

from shapely.geometry import shape

# Matches every run of characters that is not an ASCII lowercase letter or digit.
_NON_ALNUM_RE = re.compile(r"[^a-z0-9]+")

# Read size used when hashing, so a file is never loaded into memory whole.
_HASH_CHUNK_BYTES = 1024 * 1024


def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and consumed in fixed-size chunks.
    Errors from opening or reading the file propagate to the caller.
    """
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(_HASH_CHUNK_BYTES), b""):
            h.update(chunk)
    return h.hexdigest()


def norm_text(value: object) -> str:
    """Normalize *value* to lowercase ASCII words separated by single spaces.

    ``None`` yields ``""``. Any other value is converted with ``str()``,
    lowercased and stripped, ``"ß"`` is rewritten as ``"ss"``, and every run
    of characters outside ``[a-z0-9]`` becomes one space.

    NOTE(review): other non-ASCII letters (for example ``"ä"``) are not
    transliterated; they are treated as separators like punctuation.
    """
    if value is None:
        return ""
    text = str(value).lower().strip().replace("ß", "ss")
    # Each separator run (whitespace included) collapses to exactly one space,
    # so no further whitespace squeezing is needed; only the edges are trimmed.
    return _NON_ALNUM_RE.sub(" ", text).strip()


def norm_ref(value: object) -> str:
    """Normalize *value* to a compact key of ASCII lowercase letters and digits.

    ``None`` yields ``""``. Any other value is converted with ``str()``,
    lowercased, and stripped of every character outside ``[a-z0-9]``.
    """
    if value is None:
        return ""
    return _NON_ALNUM_RE.sub("", str(value).lower())


def first_nonempty(*values: object) -> str:
    """Return the first argument whose stripped ``str()`` form is non-empty.

    ``None`` arguments are skipped. The returned text is the stripped string.
    Returns ``""`` when no argument qualifies.
    """
    for value in values:
        if value is None:
            continue
        text = str(value).strip()
        if text:
            return text
    return ""


def geometry_json_and_bbox(
    geometry: object,
) -> tuple[Optional[str], tuple[Optional[float], Optional[float], Optional[float], Optional[float]]]:
    """Serialize a geometry to compact GeoJSON text and compute its bounds.

    Args:
        geometry: ``None``, an object that already exposes ``is_empty``
            (treated as a ready-made geometry and used as is), or anything
            ``shapely.geometry.shape`` can convert, such as a GeoJSON-like
            mapping. Non-``dict`` mappings are converted too; an exact
            ``dict`` type is no longer required.

    Returns:
        ``(geojson, (min_x, min_y, max_x, max_y))`` where ``geojson`` is the
        geometry's ``__geo_interface__`` dumped without whitespace and the
        bounds are taken from ``geom.bounds`` (the code treats x as longitude
        and y as latitude). ``(None, (None, None, None, None))`` is returned
        for ``None``, for empty geometries, and for anything that cannot be
        converted or serialized.
    """
    nothing = (None, (None, None, None, None))
    if geometry is None:
        return nothing
    try:
        # Only objects that already behave like a geometry skip conversion;
        # everything else goes through shape() instead of being assumed to be
        # a geometry and silently discarded by the handler below.
        geom = geometry if hasattr(geometry, "is_empty") else shape(geometry)
        if geom.is_empty:
            return nothing
        min_lon, min_lat, max_lon, max_lat = geom.bounds
        geojson = json.dumps(geom.__geo_interface__, separators=(",", ":"))
        return geojson, (min_lon, min_lat, max_lon, max_lat)
    except Exception:
        # Deliberate best effort: an unusable geometry yields the "no
        # geometry" result rather than raising.
        return nothing


def bbox_overlap(
    a: tuple[float | None, float | None, float | None, float | None],
    b: tuple[float | None, float | None, float | None, float | None],
) -> bool:
    """Return whether two ``(min_x, min_y, max_x, max_y)`` boxes intersect.

    Boxes that merely touch along an edge or at a corner count as
    overlapping. If any coordinate of either box is ``None`` the result is
    ``False``.
    """
    if any(v is None for v in (*a, *b)):
        return False
    aminx, aminy, amaxx, amaxy = a  # type: ignore[misc]
    bminx, bminy, bmaxx, bmaxy = b  # type: ignore[misc]
    # The boxes are disjoint exactly when one lies strictly beyond the other
    # on some axis.
    return not (amaxx < bminx or bmaxx < aminx or amaxy < bminy or bmaxy < aminy)


def bbox_center(
    b: tuple[float | None, float | None, float | None, float | None],
) -> Optional[tuple[float, float]]:
    """Return the midpoint ``(x, y)`` of a ``(min_x, min_y, max_x, max_y)`` box.

    Returns ``None`` if any coordinate is ``None``.
    """
    if any(v is None for v in b):
        return None
    minx, miny, maxx, maxy = b  # type: ignore[misc]
    return ((minx + maxx) / 2, (miny + maxy) / 2)


def approx_bbox_center_distance_deg(
    a: tuple[float | None, float | None, float | None, float | None],
    b: tuple[float | None, float | None, float | None, float | None],
) -> Optional[float]:
    """Return the straight-line distance between the centers of two boxes.

    The distance is plain Euclidean distance in the boxes' own coordinate
    units (presumably degrees, going by the function name). Returns ``None``
    if either box has a ``None`` coordinate.
    """
    ca = bbox_center(a)
    cb = bbox_center(b)
    if ca is None or cb is None:
        return None
    return ((ca[0] - cb[0]) ** 2 + (ca[1] - cb[1]) ** 2) ** 0.5


def batched(iterable: Iterable[dict], batch_size: int = 1000) -> Iterable[list[dict]]:
    """Yield consecutive lists of up to *batch_size* items from *iterable*.

    Every batch except possibly the last holds exactly *batch_size* items.
    Nothing is yielded for an empty iterable. A *batch_size* below 1 results
    in one-item batches, because the size check passes after every append.
    """
    batch: list[dict] = []
    for item in iterable:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            # Start a fresh list so the batch already handed out is not mutated.
            batch = []
    if batch:
        yield batch