"""Helpers for following an OpenStreetMap replication feed.

Reads a feed's ``state.txt``, maps sequence numbers to ``.osc.gz`` diff URLs,
downloads diffs into a local directory, and applies them to a base file with
``osmium apply-changes``.
"""

from __future__ import annotations

import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urljoin, urlparse

import requests


@dataclass(frozen=True)
class ReplicationState:
    """Parsed contents of a replication ``state.txt`` file."""

    # Value of the ``sequenceNumber`` key.
    sequence_number: int
    # Unescaped ``timestamp`` value, or None when the key is absent.
    timestamp: str | None
    # Every key/value pair read from the file, with values unescaped.
    raw: dict[str, str]


def fetch_replication_state(updates_url: str, *, timeout: float = 30) -> ReplicationState:
    """Download and parse the ``state.txt`` of a replication feed.

    Args:
        updates_url: Base URL of the replication directory (trailing slash optional).
        timeout: Seconds passed to ``requests.get``.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        ValueError: If the state file lacks a valid ``sequenceNumber``.
    """
    response = requests.get(_state_url(updates_url), timeout=timeout)
    response.raise_for_status()
    return parse_replication_state_text(response.text)


def parse_replication_state_text(text: str) -> ReplicationState:
    """Parse the ``key=value`` lines of a ``state.txt`` document.

    Blank lines, ``#``/``!`` comment lines and lines without ``=`` are ignored.
    Values are unescaped with :func:`_unescape_state_value`.

    Raises:
        ValueError: If ``sequenceNumber`` is missing or not an integer.
    """
    values: dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, value = line.split("=", 1)
        values[key.strip()] = _unescape_state_value(value.strip())

    sequence = values.get("sequenceNumber")
    if sequence is None:
        raise ValueError("replication state is missing sequenceNumber")
    try:
        sequence_number = int(sequence)
    except ValueError as exc:
        raise ValueError(f"invalid replication sequenceNumber: {sequence}") from exc

    return ReplicationState(
        sequence_number=sequence_number,
        timestamp=values.get("timestamp"),
        raw=values,
    )


def diff_url_for_sequence(updates_url: str, sequence_number: int) -> str:
    """Return the URL of the ``.osc.gz`` diff for ``sequence_number``.

    The number is zero-padded to at least nine digits (more, in multiples of
    three, for larger numbers) and split into three-digit path segments,
    e.g. 1234 -> ``<updates_url>/000/001/234.osc.gz``.

    Raises:
        ValueError: If ``sequence_number`` is negative.
    """
    return urljoin(_directory_url(updates_url), _diff_relative_path(sequence_number))


def download_diff(
    updates_url: str,
    sequence_number: int,
    output_dir: Path,
    *,
    timeout: float = 120,
) -> Path:
    """Download the diff for ``sequence_number`` into ``output_dir`` and return its path.

    The file is stored under the same relative path it has on the server
    (e.g. ``output_dir/000/001/234.osc.gz``), so every sequence number maps to
    a distinct local file. If that file already exists, it is returned without
    any network request.

    The body is streamed into a temporary ``<sequence>.download`` file next to
    the target. That file is renamed into place only after the transfer
    finishes and is deleted if anything fails, so a truncated ``.osc.gz`` is
    never left behind.

    Raises:
        ValueError: If ``sequence_number`` is negative.
        requests.HTTPError: If the server answers with an error status.
    """
    url = diff_url_for_sequence(updates_url, sequence_number)
    # Mirror the full server layout. Using only the last path component
    # (e.g. "234.osc.gz") would make sequences 1234, 2234, ... share one file.
    output_path = output_dir.joinpath(*_diff_relative_path(sequence_number).split("/"))
    if output_path.exists():
        return output_path

    output_path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so the final replace() is a plain rename.
    temp_path = output_path.parent / f"{sequence_number}.download"
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with temp_path.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        handle.write(chunk)
        temp_path.replace(output_path)
    except BaseException:
        temp_path.unlink(missing_ok=True)
        raise
    return output_path


def apply_osm_changes(
    base_path: Path,
    diff_paths: list[Path],
    output_path: Path,
    host_tool_path: Path,
) -> subprocess.CompletedProcess[str]:
    """Run ``osmium apply-changes`` to merge ``diff_paths`` into ``base_path``.

    ``host_tool_path`` is executed with ``osmium`` as its first argument.
    It is presumably a wrapper that dispatches to the osmium binary; confirm
    against the caller. ``--overwrite`` is passed, so an existing
    ``output_path`` is replaced. Its parent directory is created if needed.

    Returns:
        The completed process, with stdout/stderr captured as text.

    Raises:
        ValueError: If ``diff_paths`` is empty.
        subprocess.CalledProcessError: If the command exits non-zero.
    """
    if not diff_paths:
        raise ValueError("no OSM change files supplied")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    command = [
        str(host_tool_path),
        "osmium",
        "apply-changes",
        "--output",
        str(output_path),
        "--overwrite",
        str(base_path),
        *[str(path) for path in diff_paths],
    ]
    return subprocess.run(command, check=True, capture_output=True, text=True)


def _state_url(updates_url: str) -> str:
    """Return the URL of ``state.txt`` inside the replication directory."""
    return urljoin(_directory_url(updates_url), "state.txt")


def _directory_url(url: str) -> str:
    """Ensure ``url`` ends with ``/`` so ``urljoin`` appends rather than replaces."""
    return url if url.endswith("/") else f"{url}/"


def _diff_relative_path(sequence_number: int) -> str:
    """Return the feed-relative path of a diff, e.g. ``000/001/234.osc.gz``."""
    if sequence_number < 0:
        raise ValueError(
            f"replication sequence number must be non-negative: {sequence_number}"
        )
    digits = str(sequence_number)
    # Pad to a whole number of 3-digit groups, never fewer than three groups.
    width = max(9, ((len(digits) + 2) // 3) * 3)
    padded = digits.zfill(width)
    groups = [padded[index : index + 3] for index in range(0, width, 3)]
    return "/".join(groups) + ".osc.gz"


# A backslash followed by either "uXXXX" or any single character.
_STATE_ESCAPE = re.compile(r"\\(u[0-9a-fA-F]{4}|.)")
_STATE_SIMPLE_ESCAPES = {"t": "\t", "n": "\n", "r": "\r", "f": "\f"}


def _unescape_state_value(value: str) -> str:
    r"""Decode Java-properties style backslash escapes in a state value.

    Handles ``\uXXXX`` code points and ``\t``/``\n``/``\r``/``\f``. Any other
    escaped character (``\:``, ``\=``, ``\ ``, ``\\``, ...) stands for itself.
    The string is scanned once, so an escaped backslash is never re-read as
    the start of another escape. A trailing lone backslash is kept as-is.
    """

    def _decode(match: re.Match[str]) -> str:
        escaped = match.group(1)
        if len(escaped) == 5:  # "uXXXX"
            return chr(int(escaped[1:], 16))
        return _STATE_SIMPLE_ESCAPES.get(escaped, escaped)

    return _STATE_ESCAPE.sub(_decode, value)