from __future__ import annotations import json import math import re from dataclasses import dataclass from pathlib import Path from typing import Callable import osmium from sqlalchemy import delete, func, select, text from sqlalchemy.orm import Session from app.config import settings from app.models import Dataset, OsmAddress from app.pipeline.routing_layer import active_routing_dataset from app.spatial import analyze_postgresql_tables, refresh_postgis_geometries ProgressCallback = Callable[[str, str, int | None, int | None, dict[str, object] | None], None] ADDRESS_INDEX_VERSION = "osm_addresses_v2_nodes_ways_area_geometry" ADDRESS_TAGS = { "addr:housenumber", "addr:housename", "addr:street", "addr:place", "addr:postcode", "addr:city", "addr:country", "addr:unit", "addr:suburb", "addr:district", "addr:municipality", "entrance", "name", } @dataclass class AddressIndexResult: dataset_id: int input_path: str addresses: int node_addresses: int way_addresses: int skipped: int version: str = ADDRESS_INDEX_VERSION def as_dict(self) -> dict[str, object]: return { "version": self.version, "dataset_id": self.dataset_id, "input_path": self.input_path, "addresses": self.addresses, "node_addresses": self.node_addresses, "way_addresses": self.way_addresses, "skipped": self.skipped, } def rebuild_address_index( session: Session, *, dataset_id: int | None = None, input_path: str | Path | None = None, reset: bool = True, batch_size: int = 20_000, progress_callback: ProgressCallback | None = None, ) -> dict[str, object]: dataset = session.get(Dataset, dataset_id) if dataset_id is not None else active_routing_dataset(session) if dataset is None: raise ValueError("No OSM PBF dataset is available for address indexing.") path = Path(input_path or dataset.local_path) if not path.exists(): raise FileNotFoundError(f"Address index PBF does not exist: {path}") if reset: _emit(progress_callback, "address_index_clear_started", "Clearing existing OSM address index.", None, None, {"dataset_id": dataset.id}) _clear_address_rows(session, dataset_id=int(dataset.id)) session.commit() if settings.is_postgresql_database: _emit(progress_callback, "address_index_indexes_dropped", "Dropping address lookup indexes before bulk import.", None, None, {"dataset_id": dataset.id}) _drop_address_indexes(session) session.commit() _emit(progress_callback, "address_index_import_started", "Importing OSM address nodes and ways.", None, None, {"dataset_id": dataset.id, "path": str(path)}) handler = _AddressHandler( session=session, dataset_id=dataset.id, batch_size=batch_size, progress_callback=progress_callback, ) if hasattr(osmium, "FileProcessor"): _apply_address_file_processor(handler, path) else: handler.apply_file(str(path), locations=True) handler.flush() return finalize_address_index( session, dataset_id=dataset.id, input_path=path, node_addresses=handler.node_address_count, way_addresses=handler.way_address_count, skipped=handler.skipped_count, progress_callback=progress_callback, ) def finalize_address_index( session: Session, *, dataset_id: int, input_path: str | Path, node_addresses: int = 0, way_addresses: int = 0, skipped: int = 0, progress_callback: ProgressCallback | None = None, ) -> dict[str, object]: dataset = session.get(Dataset, dataset_id) if dataset is None: raise ValueError("Address index dataset does not exist.") if settings.is_postgresql_database: _emit(progress_callback, "address_index_geometry_started", "Refreshing address point geometries.", None, None, {"dataset_id": dataset.id}) refresh_postgis_geometries(session, dataset_id=dataset.id, tables=["osm_addresses"], only_missing=False) session.commit() _emit(progress_callback, "address_index_indexes_started", "Rebuilding address lookup indexes.", None, None, {"dataset_id": dataset.id}) _create_address_indexes(session) session.commit() analyze_postgresql_tables(session, ["osm_addresses"]) address_count = int(session.scalar(select(func.count()).select_from(OsmAddress).where(OsmAddress.dataset_id == dataset.id)) or 0) metadata = _metadata(dataset) metadata["address_index"] = { "version": ADDRESS_INDEX_VERSION, "addresses": address_count, "node_addresses": int(node_addresses), "way_addresses": int(way_addresses), "skipped": int(skipped), "input_path": str(input_path), } dataset.metadata_json = json.dumps(metadata, indent=2) session.commit() result = AddressIndexResult( dataset_id=dataset.id, input_path=str(input_path), addresses=address_count, node_addresses=node_addresses, way_addresses=way_addresses, skipped=skipped, ).as_dict() _emit(progress_callback, "address_index_import_completed", "OSM address index import completed.", address_count, address_count, result) return result def _clear_address_rows(session: Session, *, dataset_id: int) -> None: if settings.is_postgresql_database: other_dataset_count = int( session.scalar( select(func.count(func.distinct(OsmAddress.dataset_id))).where(OsmAddress.dataset_id != int(dataset_id)) ) or 0 ) if other_dataset_count == 0: session.execute(text("TRUNCATE TABLE osm_addresses RESTART IDENTITY")) return session.execute(delete(OsmAddress).where(OsmAddress.dataset_id == int(dataset_id))) def address_index_status(session: Session) -> dict[str, object]: dataset = active_routing_dataset(session) dataset_id = None if dataset is None else int(dataset.id) address_count = 0 metadata: dict[str, object] = {} if dataset is not None: metadata = _metadata(dataset).get("address_index") or {} if isinstance(metadata, dict): try: address_count = int(metadata.get("addresses") or 0) except (TypeError, ValueError): address_count = 0 if not address_count: address_count = int(session.scalar(select(func.count()).select_from(OsmAddress).where(OsmAddress.dataset_id == dataset.id)) or 0) installed_version = metadata.get("version") if isinstance(metadata, dict) else None return { "dataset_id": dataset_id, "addresses": address_count, "available": address_count > 0, "version": installed_version, "current_version": ADDRESS_INDEX_VERSION, "stale": bool(address_count and installed_version != ADDRESS_INDEX_VERSION), "input_path": metadata.get("input_path") if isinstance(metadata, dict) else None, } class _AddressHandler(osmium.SimpleHandler): def __init__( self, *, session: Session, dataset_id: int, batch_size: int, progress_callback: ProgressCallback | None, ) -> None: super().__init__() self.session = session self.dataset_id = int(dataset_id) self.batch_size = max(1_000, int(batch_size)) self.progress_callback = progress_callback self.rows: list[dict[str, object]] = [] self.address_count = 0 self.node_address_count = 0 self.way_address_count = 0 self.skipped_count = 0 self.processed_count = 0 def node(self, node) -> None: self.process_node(node) def way(self, way) -> None: self.process_way(way) def process_object(self, obj) -> None: if hasattr(obj, "nodes"): self.process_way(obj) elif hasattr(obj, "location"): self.process_node(obj) def process_node(self, node) -> None: self.processed_count += 1 tags = {tag.k: tag.v for tag in node.tags} if not _has_address(tags): return if not node.location.valid(): self.skipped_count += 1 return row = _address_row( dataset_id=self.dataset_id, osm_type="node", osm_id=str(node.id), tags=tags, lon=float(node.location.lon), lat=float(node.location.lat), bounds=(float(node.location.lon), float(node.location.lat), float(node.location.lon), float(node.location.lat)), geometry_geojson=None, ) if row is None: self.skipped_count += 1 return self.rows.append(row) self.node_address_count += 1 self._after_address() def process_way(self, way) -> None: self.processed_count += 1 tags = {tag.k: tag.v for tag in way.tags} if not _has_address(tags): return coords = [ (float(node.location.lon), float(node.location.lat)) for node in way.nodes if node.location.valid() ] if not coords: self.skipped_count += 1 return lon, lat = _centroid(coords) min_lon = min(coord[0] for coord in coords) max_lon = max(coord[0] for coord in coords) min_lat = min(coord[1] for coord in coords) max_lat = max(coord[1] for coord in coords) row = _address_row( dataset_id=self.dataset_id, osm_type="way", osm_id=str(way.id), tags=tags, lon=lon, lat=lat, bounds=(min_lon, min_lat, max_lon, max_lat), geometry_geojson=_address_area_geometry_geojson(coords, closed=_way_is_closed(way)), ) if row is None: self.skipped_count += 1 return self.rows.append(row) self.way_address_count += 1 self._after_address() def _after_address(self) -> None: self.address_count += 1 if len(self.rows) >= self.batch_size: self.flush() if self.address_count % 50_000 == 0: _emit( self.progress_callback, "address_index_import_batch", f"Imported {self.address_count:,} OSM addresses.", self.address_count, None, {"processed": self.processed_count, "skipped": self.skipped_count}, ) def flush(self) -> None: if not self.rows: return self.session.bulk_insert_mappings(OsmAddress, self.rows) self.session.commit() self.rows = [] def _apply_address_file_processor(handler: _AddressHandler, path: Path) -> None: processor = ( osmium.FileProcessor(str(path), osmium.osm.NODE | osmium.osm.WAY) .with_locations() .with_filter(osmium.filter.KeyFilter("addr:housenumber", "addr:housename")) ) for obj in processor: handler.process_object(obj) def _has_address(tags: dict[str, str]) -> bool: housenumber = _clean(tags.get("addr:housenumber") or tags.get("addr:housename")) if not housenumber: return False return any(_clean(tags.get(key)) for key in ("addr:street", "addr:place", "addr:city", "addr:postcode")) def _address_row( *, dataset_id: int, osm_type: str, osm_id: str, tags: dict[str, str], lon: float, lat: float, bounds: tuple[float, float, float, float], geometry_geojson: str | None = None, ) -> dict[str, object] | None: housenumber = _clean(tags.get("addr:housenumber") or tags.get("addr:housename")) street = _clean(tags.get("addr:street")) place = _clean(tags.get("addr:place")) postcode = _clean(tags.get("addr:postcode")) city = _clean(tags.get("addr:city") or tags.get("addr:municipality")) country = _clean(tags.get("addr:country")) unit = _clean(tags.get("addr:unit")) name = _clean(tags.get("name")) display_name = _display_name(housenumber=housenumber, street=street, place=place, postcode=postcode, city=city, name=name) if not display_name: return None search_text = _search_text(display_name, housenumber, street, place, postcode, city, country, unit, name) selected_tags = {key: tags[key] for key in sorted(ADDRESS_TAGS) if key in tags} min_lon, min_lat, max_lon, max_lat = bounds return { "dataset_id": dataset_id, "osm_type": osm_type, "osm_id": osm_id, "housenumber": housenumber, "street": street, "place": place, "postcode": postcode, "city": city, "country": country, "unit": unit, "name": name, "display_name": display_name, "search_text": search_text, "lon": lon, "lat": lat, "min_lon": min_lon, "min_lat": min_lat, "max_lon": max_lon, "max_lat": max_lat, "geometry_geojson": geometry_geojson, "tags_json": json.dumps(selected_tags, separators=(",", ":")) if selected_tags else None, } def _address_area_geometry_geojson(coords: list[tuple[float, float]], *, closed: bool | None = None) -> str | None: if closed is False: return None if len(coords) < 3: return None ring_coords = list(coords) first = ring_coords[0] last = ring_coords[-1] already_closed = abs(first[0] - last[0]) <= 1e-12 and abs(first[1] - last[1]) <= 1e-12 if not already_closed: if closed is not True: return None ring_coords.append(first) if len(ring_coords) < 4: return None ring = [[float(lon), float(lat)] for lon, lat in ring_coords] if len({(round(lon, 12), round(lat, 12)) for lon, lat in ring_coords[:-1]}) < 3: return None return json.dumps({"type": "Polygon", "coordinates": [ring]}, separators=(",", ":")) def _way_is_closed(way) -> bool: try: nodes = way.nodes return len(nodes) >= 3 and nodes[0].ref == nodes[-1].ref except (AttributeError, IndexError, TypeError): return False def _display_name( *, housenumber: str | None, street: str | None, place: str | None, postcode: str | None, city: str | None, name: str | None, ) -> str | None: road = street or place or name if road and housenumber: first = f"{road} {housenumber}" else: first = road or housenumber locality = " ".join(part for part in [postcode, city] if part) if first and locality: return f"{first}, {locality}" return first or locality def _search_text(*parts: str | None) -> str: return re.sub(r"\s+", " ", " ".join(part.casefold() for part in parts if part)).strip() def _clean(value: object) -> str | None: cleaned = re.sub(r"\s+", " ", str(value or "")).strip() return cleaned or None def _centroid(coords: list[tuple[float, float]]) -> tuple[float, float]: if len(coords) >= 4 and coords[0] == coords[-1]: area = 0.0 cx = 0.0 cy = 0.0 for (x1, y1), (x2, y2) in zip(coords, coords[1:]): cross = x1 * y2 - x2 * y1 area += cross cx += (x1 + x2) * cross cy += (y1 + y2) * cross if abs(area) > 1e-18: factor = 1 / (3 * area) return cx * factor, cy * factor return ( math.fsum(coord[0] for coord in coords) / len(coords), math.fsum(coord[1] for coord in coords) / len(coords), ) def _drop_address_indexes(session: Session) -> None: for name in [ "ix_osm_addresses_dataset_city_street", "ix_osm_addresses_dataset_postcode", "ix_osm_addresses_bbox", "ix_osm_addresses_geom_gist", "ix_osm_addresses_area_geom_gist", "ix_osm_addresses_search_trgm", "ix_osm_addresses_display_trgm", "ix_osm_addresses_street_key_house", "ix_osm_addresses_street_key_trgm", ]: session.execute(text(f"DROP INDEX IF EXISTS {name}")) def _create_address_indexes(session: Session) -> None: statements = [ "CREATE INDEX IF NOT EXISTS ix_osm_addresses_dataset_city_street ON osm_addresses (dataset_id, city, street, housenumber)", "CREATE INDEX IF NOT EXISTS ix_osm_addresses_dataset_postcode ON osm_addresses (dataset_id, postcode)", "CREATE INDEX IF NOT EXISTS ix_osm_addresses_bbox ON osm_addresses (dataset_id, min_lon, max_lon, min_lat, max_lat)", ] if settings.is_postgresql_database: statements.extend( [ "CREATE INDEX IF NOT EXISTS ix_osm_addresses_geom_gist ON osm_addresses USING GIST (geom)", "CREATE INDEX IF NOT EXISTS ix_osm_addresses_area_geom_gist ON osm_addresses USING GIST (area_geom)", "CREATE INDEX IF NOT EXISTS ix_osm_addresses_search_trgm ON osm_addresses USING GIN (LOWER(COALESCE(search_text, '')) gin_trgm_ops)", "CREATE INDEX IF NOT EXISTS ix_osm_addresses_display_trgm ON osm_addresses USING GIN (LOWER(COALESCE(display_name, '')) gin_trgm_ops)", "CREATE INDEX IF NOT EXISTS ix_osm_addresses_street_key_house ON osm_addresses (dataset_id, REPLACE(LOWER(COALESCE(NULLIF(street, ''), NULLIF(place, ''), '')), 'ß', 'ss'), housenumber)", "CREATE INDEX IF NOT EXISTS ix_osm_addresses_street_key_trgm ON osm_addresses USING GIN (REPLACE(LOWER(COALESCE(NULLIF(street, ''), NULLIF(place, ''), '')), 'ß', 'ss') gin_trgm_ops)", ] ) for statement in statements: session.execute(text(statement)) def _metadata(dataset: Dataset) -> dict[str, object]: try: value = json.loads(dataset.metadata_json or "{}") except json.JSONDecodeError: return {} return value if isinstance(value, dict) else {} def _emit( progress_callback: ProgressCallback | None, event_type: str, message: str, progress_current: int | None, progress_total: int | None, metadata: dict[str, object] | None = None, ) -> None: if progress_callback is not None: progress_callback(event_type, message, progress_current, progress_total, metadata)