Files
meubility-workbench/app/address_search.py
2026-07-01 23:29:51 +02:00

1273 lines
45 KiB
Python

from __future__ import annotations
import re
import math
from typing import Any
from sqlalchemy import select, text
from sqlalchemy.orm import Session
from app.config import settings
from app.models import OsmAddress
from app.pipeline.routing_layer import active_routing_dataset
ADDRESS_PREFIX = "address:"
ADDRESS_POINT_PREFIX = "address-point:"
COORDINATE_PREFIX = "coord:"
MAX_ADDRESS_SEARCH_ROWS = 250
def address_token(address_id: int) -> str:
return f"{ADDRESS_PREFIX}{int(address_id)}"
def address_point_token(address_id: int, lat: float, lon: float) -> str:
return f"{ADDRESS_POINT_PREFIX}{int(address_id)}:{float(lat):.7f}:{float(lon):.7f}"
def coordinate_token(lat: float, lon: float) -> str:
return f"{COORDINATE_PREFIX}{float(lat):.7f}:{float(lon):.7f}"
def is_address_token(value: object) -> bool:
token = str(value or "").strip()
return token.startswith(ADDRESS_PREFIX) or token.startswith(ADDRESS_POINT_PREFIX)
def is_address_point_token(value: object) -> bool:
return str(value or "").strip().startswith(ADDRESS_POINT_PREFIX)
def is_coordinate_token(value: object) -> bool:
return str(value or "").strip().startswith(COORDINATE_PREFIX)
def is_location_token(value: object) -> bool:
return is_address_token(value) or is_coordinate_token(value)
def parse_address_token(value: object) -> int:
token = str(value or "").strip()
if not token.startswith(ADDRESS_PREFIX):
raise ValueError("invalid address token")
try:
address_id = int(token[len(ADDRESS_PREFIX) :])
except ValueError as exc:
raise ValueError("invalid address token") from exc
if address_id <= 0:
raise ValueError("invalid address token")
return address_id
def parse_address_point_token(value: object) -> tuple[int, float, float]:
token = str(value or "").strip()
if not token.startswith(ADDRESS_POINT_PREFIX):
raise ValueError("invalid address point token")
parts = token[len(ADDRESS_POINT_PREFIX) :].split(":")
if len(parts) != 3:
raise ValueError("invalid address point token")
try:
address_id = int(parts[0])
lat = float(parts[1])
lon = float(parts[2])
except ValueError as exc:
raise ValueError("invalid address point token") from exc
if address_id <= 0 or not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
raise ValueError("invalid address point token")
return address_id, lat, lon
def parse_coordinate_token(value: object) -> tuple[float, float]:
token = str(value or "").strip()
if not token.startswith(COORDINATE_PREFIX):
raise ValueError("invalid coordinate token")
parts = token[len(COORDINATE_PREFIX) :].split(":")
if len(parts) != 2:
raise ValueError("invalid coordinate token")
try:
lat = float(parts[0])
lon = float(parts[1])
except ValueError as exc:
raise ValueError("invalid coordinate token") from exc
if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
raise ValueError("invalid coordinate token")
return lat, lon
def search_addresses(
db: Session,
query: str | None = None,
*,
limit: int = 25,
bbox: tuple[float, float, float, float] | None = None,
) -> list[dict[str, Any]]:
dataset = active_routing_dataset(db)
if dataset is None:
return []
q = _normalize_query(query)
selected_limit = max(1, min(int(limit), 100))
if settings.is_postgresql_database:
if q and not _query_has_number(q):
payloads = _search_folded_addresses_postgresql(db, int(dataset.id), q, selected_limit, bbox)
return payloads[:selected_limit]
rows = (
_search_numbered_addresses_postgresql(db, int(dataset.id), q, selected_limit, bbox)
if q and _query_has_number(q)
else _search_addresses_postgresql(db, int(dataset.id), q, selected_limit, bbox)
)
else:
rows = _search_addresses_sqlite(db, int(dataset.id), q, selected_limit, bbox)
payloads = [_address_payload(row) for row in rows]
if not _query_has_number(q):
payloads = _fold_street_payloads(payloads)
return payloads[:selected_limit]
def _search_folded_addresses_postgresql(
db: Session,
dataset_id: int,
query: str,
limit: int,
bbox: tuple[float, float, float, float] | None,
) -> list[dict[str, Any]]:
combined: list[dict[str, Any]] = []
seen: set[tuple[str, str, str]] = set()
for street_query, locality_query in _folded_query_candidates(query):
query_specs: list[tuple[int, tuple[float, float, float, float] | None]] = []
if bbox is not None and locality_query is None:
query_specs.append((limit, bbox))
query_specs.append((max(limit * 3, limit), None))
else:
query_specs.append((limit, None))
for query_limit, bbox_filter in query_specs:
for payload in _search_folded_addresses_postgresql_query(
db,
dataset_id,
street_query,
query_limit,
bbox,
bbox_filter=bbox_filter,
locality_query=locality_query,
):
key = _folded_payload_key(payload)
if key in seen:
continue
seen.add(key)
combined.append(payload)
if len(combined) >= limit:
return combined[:limit]
return combined[:limit]
def _search_folded_addresses_postgresql_query(
db: Session,
dataset_id: int,
query: str,
limit: int,
bbox: tuple[float, float, float, float] | None,
*,
bbox_filter: tuple[float, float, float, float] | None,
locality_query: str | None,
) -> list[dict[str, Any]]:
params: dict[str, Any] = {
"dataset_id": dataset_id,
"query": query,
"prefix": f"{query}%",
"limit": limit,
}
bbox_filter_sql = ""
if bbox_filter is not None:
min_lon, min_lat, max_lon, max_lat = bbox_filter
params.update(
{
"filter_min_lon": min_lon,
"filter_min_lat": min_lat,
"filter_max_lon": max_lon,
"filter_max_lat": max_lat,
}
)
bbox_filter_sql = """
AND geom && ST_MakeEnvelope(:filter_min_lon, :filter_min_lat, :filter_max_lon, :filter_max_lat, 4326)
"""
locality_filter_sql = ""
locality_rank_sql = "0"
if locality_query:
params["locality_query"] = locality_query
params["locality_prefix"] = f"{locality_query}%"
locality_filter_sql = """
AND (
LOWER(COALESCE(city, '')) = :locality_query
OR LOWER(COALESCE(city, '')) LIKE :locality_prefix
OR LOWER(COALESCE(postcode, '')) = :locality_query
)
"""
locality_rank_sql = """
CASE
WHEN LOWER(COALESCE(city, '')) = :locality_query THEN 0
WHEN LOWER(COALESCE(postcode, '')) = :locality_query THEN 1
WHEN LOWER(COALESCE(city, '')) LIKE :locality_prefix THEN 2
ELSE 3
END
"""
bbox_rank_sql, bbox_distance_sql = _postgresql_bbox_rank_sql_for_alias("grouped", bbox, params)
street_key_sql = _street_key_sql()
rows = db.execute(
text(
f"""
WITH grouped AS (
SELECT
MIN(id) AS id,
MIN(dataset_id) AS dataset_id,
COALESCE(NULLIF(street, ''), NULLIF(place, '')) AS street_label,
MIN(street) AS street,
MIN(place) AS place,
postcode,
city,
MIN(country) AS country,
AVG(lat) AS lat,
AVG(lon) AS lon,
COUNT(*) AS folded_address_count,
{locality_rank_sql} AS locality_rank,
CASE
WHEN {street_key_sql} = :query THEN 0
WHEN {street_key_sql} LIKE :prefix THEN 1
ELSE 2
END AS match_rank
FROM osm_addresses
WHERE dataset_id = :dataset_id
AND {street_key_sql} <> ''
AND ({street_key_sql} = :query OR {street_key_sql} LIKE :prefix)
{bbox_filter_sql}
{locality_filter_sql}
GROUP BY COALESCE(NULLIF(street, ''), NULLIF(place, '')), postcode, city, locality_rank, match_rank
)
SELECT
id,
dataset_id,
street_label,
street,
place,
postcode,
city,
country,
lat,
lon,
folded_address_count,
locality_rank,
match_rank,
{bbox_rank_sql} AS bbox_rank,
{bbox_distance_sql} AS bbox_distance_m
FROM grouped
ORDER BY locality_rank, bbox_rank, match_rank, bbox_distance_m, street_label, postcode, city, id
LIMIT :limit
"""
),
params,
).mappings()
return [_folded_address_payload(dict(row)) for row in rows]
def _search_numbered_addresses_postgresql(
db: Session,
dataset_id: int,
query: str,
limit: int,
bbox: tuple[float, float, float, float] | None,
) -> list[dict[str, Any]]:
candidates = _numbered_query_candidates(query)
if not candidates:
return _search_addresses_postgresql(db, dataset_id, query, limit, bbox)
result_by_id: dict[int, dict[str, Any]] = {}
for street_query, housenumber_query, locality_query in candidates:
for row in _execute_numbered_addresses_postgresql(
db,
dataset_id=dataset_id,
street_query=street_query,
housenumber_query=housenumber_query,
locality_query=locality_query,
limit=limit,
bbox=bbox,
):
result_by_id.setdefault(int(row["id"]), row)
if len(result_by_id) >= limit:
return list(result_by_id.values())[:limit]
result = list(result_by_id.values())
if result:
return result
for street_query, housenumber_query, locality_query in candidates:
for row in _execute_numbered_street_fallback_postgresql(
db,
dataset_id=dataset_id,
street_query=street_query,
housenumber_query=housenumber_query,
locality_query=locality_query,
limit=limit,
bbox=bbox,
):
result_by_id.setdefault(int(row["id"]), row)
if len(result_by_id) >= limit:
return list(result_by_id.values())[:limit]
result = list(result_by_id.values())
return result or _search_addresses_postgresql(db, dataset_id, query, limit, bbox)
def _execute_numbered_addresses_postgresql(
db: Session,
*,
dataset_id: int,
street_query: str,
housenumber_query: str,
locality_query: str | None,
limit: int,
bbox: tuple[float, float, float, float] | None,
) -> list[dict[str, Any]]:
params: dict[str, Any] = {
"dataset_id": dataset_id,
"street_query": street_query,
"street_prefix": f"{street_query}%",
"housenumber_query": housenumber_query,
"housenumber_prefix": f"{housenumber_query}%",
"limit": limit,
}
locality_filter_sql = ""
locality_rank_sql = "0"
if locality_query:
params["locality_query"] = locality_query
params["locality_prefix"] = f"{locality_query}%"
locality_filter_sql = """
AND (
LOWER(COALESCE(city, '')) = :locality_query
OR LOWER(COALESCE(city, '')) LIKE :locality_prefix
OR LOWER(COALESCE(postcode, '')) = :locality_query
)
"""
locality_rank_sql = """
CASE
WHEN LOWER(COALESCE(city, '')) = :locality_query THEN 0
WHEN LOWER(COALESCE(postcode, '')) = :locality_query THEN 1
WHEN LOWER(COALESCE(city, '')) LIKE :locality_prefix THEN 2
ELSE 3
END
"""
bbox_rank_sql, bbox_distance_sql = _postgresql_bbox_rank_sql(bbox, params)
street_key_sql = _street_key_sql()
rows = db.execute(
text(
f"""
SELECT
id,
dataset_id,
housenumber,
street,
place,
postcode,
city,
country,
unit,
name,
display_name,
search_text,
lon,
lat,
{bbox_rank_sql} AS bbox_rank,
{bbox_distance_sql} AS bbox_distance_m,
{locality_rank_sql} AS locality_rank,
CASE
WHEN {street_key_sql} = :street_query AND LOWER(COALESCE(housenumber, '')) = :housenumber_query THEN 0
WHEN {street_key_sql} = :street_query AND LOWER(COALESCE(housenumber, '')) LIKE :housenumber_prefix THEN 1
WHEN {street_key_sql} LIKE :street_prefix AND LOWER(COALESCE(housenumber, '')) LIKE :housenumber_prefix THEN 2
ELSE 3
END AS match_rank,
1.0 AS similarity_rank
FROM osm_addresses
WHERE dataset_id = :dataset_id
AND {street_key_sql} <> ''
AND ({street_key_sql} = :street_query OR {street_key_sql} LIKE :street_prefix)
AND LOWER(COALESCE(housenumber, '')) LIKE :housenumber_prefix
{locality_filter_sql}
ORDER BY locality_rank, bbox_rank, match_rank, bbox_distance_m, display_name, id
LIMIT :limit
"""
),
params,
).mappings()
return [dict(row) for row in rows]
def _execute_numbered_street_fallback_postgresql(
db: Session,
*,
dataset_id: int,
street_query: str,
housenumber_query: str,
locality_query: str | None,
limit: int,
bbox: tuple[float, float, float, float] | None,
) -> list[dict[str, Any]]:
params: dict[str, Any] = {
"dataset_id": dataset_id,
"street_query": street_query,
"street_prefix": f"{street_query}%",
"housenumber_query": housenumber_query,
"housenumber_prefix": f"{housenumber_query}%",
"housenumber_number": _leading_number(housenumber_query),
"limit": limit,
}
locality_filter_sql, locality_rank_sql = _postgresql_locality_sql(locality_query, params, indent=" ")
bbox_rank_sql, bbox_distance_sql = _postgresql_bbox_rank_sql(bbox, params)
street_key_sql = _street_key_sql()
rows = db.execute(
text(
f"""
SELECT
id,
dataset_id,
housenumber,
street,
place,
postcode,
city,
country,
unit,
name,
display_name,
search_text,
lon,
lat,
{bbox_rank_sql} AS bbox_rank,
{bbox_distance_sql} AS bbox_distance_m,
{locality_rank_sql} AS locality_rank,
CASE
WHEN LOWER(COALESCE(housenumber, '')) = :housenumber_query THEN 0
WHEN LOWER(COALESCE(housenumber, '')) LIKE :housenumber_prefix THEN 1
ELSE 2
END AS match_rank,
CASE
WHEN :housenumber_number IS NULL THEN 999999
WHEN substring(COALESCE(housenumber, '') from '^[0-9]+') = '' THEN 999999
ELSE abs(CAST(substring(COALESCE(housenumber, '') from '^[0-9]+') AS INTEGER) - :housenumber_number)
END AS house_distance
FROM osm_addresses
WHERE dataset_id = :dataset_id
AND {street_key_sql} <> ''
AND ({street_key_sql} = :street_query OR {street_key_sql} LIKE :street_prefix)
{locality_filter_sql}
ORDER BY locality_rank, bbox_rank, match_rank, house_distance, bbox_distance_m, display_name, id
LIMIT :limit
"""
),
params,
).mappings()
return [dict(row) for row in rows]
def _postgresql_locality_sql(locality_query: str | None, params: dict[str, Any], *, indent: str = "") -> tuple[str, str]:
if not locality_query:
return "", "0"
params["locality_query"] = locality_query
params["locality_prefix"] = f"{locality_query}%"
filter_sql = f"""
{indent}AND (
{indent} LOWER(COALESCE(city, '')) = :locality_query
{indent} OR LOWER(COALESCE(city, '')) LIKE :locality_prefix
{indent} OR LOWER(COALESCE(postcode, '')) = :locality_query
{indent})
"""
rank_sql = """
CASE
WHEN LOWER(COALESCE(city, '')) = :locality_query THEN 0
WHEN LOWER(COALESCE(postcode, '')) = :locality_query THEN 1
WHEN LOWER(COALESCE(city, '')) LIKE :locality_prefix THEN 2
ELSE 3
END
"""
return filter_sql, rank_sql
def address_by_token(db: Session, value: object) -> OsmAddress:
address_id = parse_address_token(value)
address = db.get(OsmAddress, address_id)
if address is None:
raise ValueError("selected address does not exist")
return address
def address_point_by_token(db: Session, value: object) -> tuple[OsmAddress, float, float]:
address_id, lat, lon = parse_address_point_token(value)
address = db.get(OsmAddress, address_id)
if address is None:
raise ValueError("selected address does not exist")
return address, lat, lon
def nearest_addresses(
db: Session,
*,
lat: float,
lon: float,
limit: int = 3,
radius_m: float = 150,
) -> list[dict[str, Any]]:
dataset = active_routing_dataset(db)
if dataset is None:
return []
selected_limit = max(1, min(int(limit), 25))
if not settings.is_postgresql_database:
radius_deg = float(radius_m) / 111_320
rows = db.scalars(
select(OsmAddress)
.where(
OsmAddress.dataset_id == dataset.id,
OsmAddress.lat >= lat - radius_deg,
OsmAddress.lat <= lat + radius_deg,
OsmAddress.lon >= lon - radius_deg,
OsmAddress.lon <= lon + radius_deg,
)
.limit(250)
).all()
payloads = []
for row in rows:
payload = _address_payload(row)
payload["distance_m"] = _distance_m(lat, lon, float(row.lat), float(row.lon))
if payload["distance_m"] <= radius_m:
payloads.append(payload)
payloads.sort(key=lambda item: (float(item.get("distance_m") or 0), item.get("display_name") or ""))
return payloads[:selected_limit]
radius_deg = float(radius_m) / 111_320
rows = db.execute(
text(
"""
WITH point AS (
SELECT ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) AS geom
)
SELECT
id,
dataset_id,
housenumber,
street,
place,
postcode,
city,
country,
unit,
name,
display_name,
search_text,
lon,
lat,
ST_DistanceSphere(osm_addresses.geom, point.geom) AS distance_m
FROM osm_addresses
CROSS JOIN point
WHERE dataset_id = :dataset_id
AND osm_addresses.geom IS NOT NULL
AND osm_addresses.geom && ST_Expand(point.geom, :radius_deg)
AND ST_DWithin(osm_addresses.geom::geography, point.geom::geography, :radius_m)
ORDER BY osm_addresses.geom <-> point.geom, id
LIMIT :limit
"""
),
{
"dataset_id": int(dataset.id),
"lat": float(lat),
"lon": float(lon),
"radius_deg": radius_deg,
"radius_m": float(radius_m),
"limit": selected_limit,
},
).mappings()
payloads = []
for row in rows:
payload = _address_payload(dict(row))
payload["distance_m"] = float(row["distance_m"] or 0)
payloads.append(payload)
return payloads
def address_at_point(
db: Session,
*,
lat: float,
lon: float,
max_size_m: float = 250,
node_radius_m: float = 12,
) -> dict[str, Any] | None:
dataset = active_routing_dataset(db)
if dataset is None:
return None
lat_span = float(max_size_m) / 111_320
lon_span = float(max_size_m) / (111_320 * max(0.2, abs(math.cos(math.radians(float(lat))))))
if not settings.is_postgresql_database:
row = db.scalar(
select(OsmAddress)
.where(
OsmAddress.dataset_id == dataset.id,
OsmAddress.osm_type == "way",
OsmAddress.min_lon <= lon,
OsmAddress.max_lon >= lon,
OsmAddress.min_lat <= lat,
OsmAddress.max_lat >= lat,
(OsmAddress.max_lon - OsmAddress.min_lon) <= lon_span,
(OsmAddress.max_lat - OsmAddress.min_lat) <= lat_span,
)
.order_by((OsmAddress.max_lon - OsmAddress.min_lon) * (OsmAddress.max_lat - OsmAddress.min_lat), OsmAddress.id)
)
if row is None:
return None
payload = _address_payload(row)
payload["distance_m"] = _distance_m(lat, lon, float(row.lat), float(row.lon))
payload["selection_reason"] = "address_bbox"
return payload
candidate_radius_m = max(float(max_size_m), float(node_radius_m), 20.0)
candidate_radius_deg = candidate_radius_m / 111_320
row = db.execute(
text(
"""
WITH point AS (
SELECT ST_SetSRID(ST_MakePoint(:lon, :lat), 4326) AS geom
),
polygon_hit AS (
SELECT
id,
dataset_id,
housenumber,
street,
place,
postcode,
city,
country,
unit,
name,
display_name,
search_text,
lon,
lat,
ST_DistanceSphere(osm_addresses.geom, point.geom) AS distance_m,
'address_polygon' AS selection_reason
FROM osm_addresses
CROSS JOIN point
WHERE dataset_id = :dataset_id
AND osm_type = 'way'
AND area_geom IS NOT NULL
AND area_geom && point.geom
AND ST_Covers(area_geom, point.geom)
ORDER BY ST_Area(area_geom::geography), ST_DistanceSphere(osm_addresses.geom, point.geom), id
LIMIT 1
),
nearby_candidates AS MATERIALIZED (
SELECT
id,
dataset_id,
osm_type,
housenumber,
street,
place,
postcode,
city,
country,
unit,
name,
display_name,
search_text,
lon,
lat,
min_lon,
min_lat,
max_lon,
max_lat,
osm_addresses.geom AS geom,
ST_DistanceSphere(osm_addresses.geom, point.geom) AS distance_m
FROM osm_addresses
CROSS JOIN point
WHERE dataset_id = :dataset_id
AND osm_addresses.geom IS NOT NULL
AND osm_addresses.geom && ST_Expand(point.geom, :candidate_radius_deg)
ORDER BY osm_addresses.geom <-> point.geom, id
LIMIT 200
),
bbox_hit AS (
SELECT
id,
dataset_id,
housenumber,
street,
place,
postcode,
city,
country,
unit,
name,
display_name,
search_text,
lon,
lat,
distance_m,
'address_bbox' AS selection_reason
FROM nearby_candidates
WHERE dataset_id = :dataset_id
AND osm_type = 'way'
AND min_lon <= :lon
AND max_lon >= :lon
AND min_lat <= :lat
AND max_lat >= :lat
AND (max_lon - min_lon) <= :lon_span
AND (max_lat - min_lat) <= :lat_span
AND NOT EXISTS (SELECT 1 FROM polygon_hit)
ORDER BY ABS((max_lon - min_lon) * (max_lat - min_lat)), distance_m, id
LIMIT 1
),
node_hit AS (
SELECT
id,
dataset_id,
housenumber,
street,
place,
postcode,
city,
country,
unit,
name,
display_name,
search_text,
lon,
lat,
distance_m,
'address_node' AS selection_reason
FROM nearby_candidates
WHERE osm_type = 'node'
AND distance_m <= :node_radius_m
AND NOT EXISTS (SELECT 1 FROM polygon_hit)
AND NOT EXISTS (SELECT 1 FROM bbox_hit)
ORDER BY distance_m, id
LIMIT 1
)
SELECT * FROM polygon_hit
UNION ALL
SELECT * FROM bbox_hit
UNION ALL
SELECT * FROM node_hit
LIMIT 1
"""
),
{
"dataset_id": int(dataset.id),
"lat": float(lat),
"lon": float(lon),
"lat_span": lat_span,
"lon_span": lon_span,
"candidate_radius_deg": candidate_radius_deg,
"node_radius_m": max(0.0, float(node_radius_m)),
},
).mappings().first()
if row is None:
return None
payload = _address_payload(dict(row))
payload["distance_m"] = float(row["distance_m"] or 0)
payload["selection_reason"] = row["selection_reason"]
return payload
def _search_addresses_postgresql(
db: Session,
dataset_id: int,
query: str,
limit: int,
bbox: tuple[float, float, float, float] | None,
) -> list[dict[str, Any]]:
params: dict[str, Any] = {"dataset_id": dataset_id, "limit": _raw_address_limit(query, limit)}
where = ["dataset_id = :dataset_id"]
tokens = [token for token in re.split(r"[\s,;/]+", query) if token]
long_tokens = [token for token in tokens if len(token) >= 3]
if query:
params["query"] = query
params["pattern"] = f"%{query}%"
token_clauses = []
for index, token in enumerate(long_tokens[:6]):
key = f"token_{index}"
params[key] = f"%{token}%"
token_clauses.append(f"LOWER(COALESCE(search_text, '')) LIKE :{key}")
token_sql = " AND ".join(token_clauses)
where.append(
"("
"LOWER(COALESCE(search_text, '')) % :query "
"OR LOWER(COALESCE(search_text, '')) LIKE :pattern "
+ (f"OR ({token_sql})" if token_sql else "")
+ ")"
)
bbox_rank_sql, bbox_distance_sql = _postgresql_bbox_rank_sql(bbox, params)
rank_sql = (
"""
CASE
WHEN :query = '' THEN 4
WHEN LOWER(COALESCE(display_name, '')) = :query THEN 0
WHEN LOWER(COALESCE(display_name, '')) LIKE (:query || '%') THEN 1
WHEN LOWER(COALESCE(search_text, '')) LIKE :pattern THEN 2
ELSE 3
END
"""
if query
else "4"
)
if not query:
params["query"] = ""
params["pattern"] = "%"
rows = db.execute(
text(
f"""
SELECT
id,
dataset_id,
housenumber,
street,
place,
postcode,
city,
country,
unit,
name,
display_name,
search_text,
lon,
lat,
{bbox_rank_sql} AS bbox_rank,
{bbox_distance_sql} AS bbox_distance_m,
{rank_sql} AS match_rank,
CASE
WHEN :query = '' THEN 0
ELSE similarity(LOWER(COALESCE(search_text, '')), :query)
END AS similarity_rank
FROM osm_addresses
WHERE {" AND ".join(where)}
ORDER BY bbox_rank, match_rank, similarity_rank DESC, display_name, id
LIMIT :limit
"""
),
params,
).mappings()
return [dict(row) for row in rows]
def _search_addresses_sqlite(
db: Session,
dataset_id: int,
query: str,
limit: int,
bbox: tuple[float, float, float, float] | None,
) -> list[OsmAddress]:
stmt = select(OsmAddress).where(OsmAddress.dataset_id == dataset_id)
if query:
tokens = [token for token in re.split(r"[\s,;/]+", query) if token]
for token in tokens[:6]:
stmt = stmt.where(OsmAddress.search_text.ilike(f"%{token}%"))
stmt = stmt.limit(MAX_ADDRESS_SEARCH_ROWS)
rows = list(db.scalars(stmt).all())
rows.sort(key=lambda row: (_bbox_rank(row.lat, row.lon, bbox), _address_match_rank(row, query), row.display_name, row.id))
return rows[: _raw_address_limit(query, limit)]
def _postgresql_bbox_rank_sql(
bbox: tuple[float, float, float, float] | None,
params: dict[str, Any],
) -> tuple[str, str]:
if bbox is None:
return "1", "0.0"
min_lon, min_lat, max_lon, max_lat = bbox
center_lon = (min_lon + max_lon) / 2
center_lat = (min_lat + max_lat) / 2
params.update(
{
"bbox_min_lon": min_lon,
"bbox_min_lat": min_lat,
"bbox_max_lon": max_lon,
"bbox_max_lat": max_lat,
"bbox_center_lon": center_lon,
"bbox_center_lat": center_lat,
}
)
bbox_rank_sql = """
CASE
WHEN lon IS NULL OR lat IS NULL THEN 2
WHEN lon BETWEEN :bbox_min_lon AND :bbox_max_lon
AND lat BETWEEN :bbox_min_lat AND :bbox_max_lat THEN 0
ELSE 1
END
"""
bbox_distance_sql = """
sqrt(
power((lon - :bbox_center_lon) * 111320.0 * cos(radians(:bbox_center_lat)), 2)
+ power((lat - :bbox_center_lat) * 111320.0, 2)
)
"""
return bbox_rank_sql, bbox_distance_sql
def _postgresql_bbox_rank_sql_for_alias(
alias: str,
bbox: tuple[float, float, float, float] | None,
params: dict[str, Any],
) -> tuple[str, str]:
if bbox is None:
return "1", "0.0"
min_lon, min_lat, max_lon, max_lat = bbox
center_lon = (min_lon + max_lon) / 2
center_lat = (min_lat + max_lat) / 2
params.update(
{
"bbox_min_lon": min_lon,
"bbox_min_lat": min_lat,
"bbox_max_lon": max_lon,
"bbox_max_lat": max_lat,
"bbox_center_lon": center_lon,
"bbox_center_lat": center_lat,
}
)
bbox_rank_sql = f"""
CASE
WHEN {alias}.lon IS NULL OR {alias}.lat IS NULL THEN 2
WHEN {alias}.lon BETWEEN :bbox_min_lon AND :bbox_max_lon
AND {alias}.lat BETWEEN :bbox_min_lat AND :bbox_max_lat THEN 0
ELSE 1
END
"""
bbox_distance_sql = f"""
sqrt(
power(({alias}.lon - :bbox_center_lon) * 111320.0 * cos(radians(:bbox_center_lat)), 2)
+ power(({alias}.lat - :bbox_center_lat) * 111320.0, 2)
)
"""
return bbox_rank_sql, bbox_distance_sql
def _address_payload(row: OsmAddress | dict[str, Any]) -> dict[str, Any]:
get = row.get if isinstance(row, dict) else lambda key, default=None: getattr(row, key, default)
address_id = int(get("id"))
street = get("street")
place = get("place")
housenumber = get("housenumber")
city = get("city")
local_name = " ".join(str(part) for part in [street or place, housenumber] if part).strip() or get("display_name")
return {
"id": address_token(address_id),
"address_id": address_id,
"kind": "address",
"dataset_id": get("dataset_id"),
"stop_id": address_token(address_id),
"name": get("display_name"),
"display_name": get("display_name"),
"city": city,
"local_name": local_name,
"street": street,
"place": place,
"housenumber": housenumber,
"postcode": get("postcode"),
"lat": get("lat"),
"lon": get("lon"),
"source_id": None,
"source_name": "OSM address",
"scheduled": False,
"grouped": False,
"grouped_stop_count": 1,
"folded_address_count": 1,
"approximate": False,
}
def _folded_address_payload(row: dict[str, Any]) -> dict[str, Any]:
address_id = int(row["id"])
lat = row.get("lat")
lon = row.get("lon")
street_label = row.get("street_label") or row.get("street") or row.get("place")
locality = " ".join(str(part) for part in [row.get("postcode"), row.get("city")] if part).strip()
display_name = f"{street_label}, {locality}" if locality else str(street_label or "Address")
token = address_point_token(address_id, float(lat), float(lon)) if lat is not None and lon is not None else address_token(address_id)
return {
"id": token,
"address_id": address_id,
"representative_address_id": address_id,
"kind": "address",
"dataset_id": row.get("dataset_id"),
"stop_id": token,
"name": display_name,
"display_name": display_name,
"city": row.get("city"),
"local_name": str(street_label or display_name),
"street": row.get("street") or street_label,
"place": row.get("place"),
"housenumber": None,
"postcode": row.get("postcode"),
"lat": lat,
"lon": lon,
"source_id": None,
"source_name": "OSM street address",
"scheduled": False,
"grouped": False,
"grouped_stop_count": 1,
"folded_address_count": int(row.get("folded_address_count") or 1),
"approximate": True,
}
def _folded_payload_key(payload: dict[str, Any]) -> tuple[str, str, str]:
return (
str(payload.get("street") or payload.get("place") or payload.get("display_name") or "").casefold(),
str(payload.get("postcode") or "").casefold(),
str(payload.get("city") or "").casefold(),
)
def _fold_street_payloads(payloads: list[dict[str, Any]]) -> list[dict[str, Any]]:
folded: dict[tuple[str, str, str], dict[str, Any]] = {}
singles: list[dict[str, Any]] = []
for payload in payloads:
street = str(payload.get("street") or payload.get("place") or "").casefold().strip()
city = str(payload.get("city") or "").casefold().strip()
postcode = str(payload.get("postcode") or "").casefold().strip()
if not street:
singles.append(payload)
continue
key = (street, city, postcode)
current = folded.get(key)
if current is None:
current = dict(payload)
current["_representatives"] = [payload]
current["folded_address_count"] = 1
current["approximate"] = True
current["housenumber"] = None
local_name = str(payload.get("street") or payload.get("place") or "")
locality = " ".join(part for part in [payload.get("postcode"), payload.get("city")] if part)
current["local_name"] = local_name
current["display_name"] = f"{local_name}, {locality}" if locality else local_name
current["name"] = current["display_name"]
folded[key] = current
continue
current["folded_address_count"] = int(current.get("folded_address_count") or 1) + 1
current["_representatives"].append(payload)
result = list(folded.values())
for payload in result:
representatives = payload.pop("_representatives", [])
coords = [
(float(item["lat"]), float(item["lon"]))
for item in representatives
if item.get("lat") is not None and item.get("lon") is not None
]
if coords:
payload["lat"] = sum(item[0] for item in coords) / len(coords)
payload["lon"] = sum(item[1] for item in coords) / len(coords)
token = address_point_token(int(payload["address_id"]), float(payload["lat"]), float(payload["lon"]))
payload["id"] = token
payload["stop_id"] = token
payload["representative_address_id"] = payload["address_id"]
payload["source_name"] = "OSM street address"
result.extend(singles)
return result
def _address_match_rank(row: OsmAddress, query: str) -> int:
if not query:
return 4
haystack = row.search_text.casefold()
if row.display_name.casefold() == query:
return 0
if row.display_name.casefold().startswith(query):
return 1
if query in haystack:
return 2
tokens = [token for token in re.split(r"[\s,;/]+", query) if token]
return 3 if tokens and all(token in haystack for token in tokens) else 4
def _bbox_rank(lat: float | None, lon: float | None, bbox: tuple[float, float, float, float] | None) -> tuple[int, float]:
if bbox is None:
return (1, 0.0)
if lat is None or lon is None:
return (2, float("inf"))
min_lon, min_lat, max_lon, max_lat = bbox
if min_lon <= lon <= max_lon and min_lat <= lat <= max_lat:
return (0, 0.0)
center_lon = (min_lon + max_lon) / 2
center_lat = (min_lat + max_lat) / 2
return (1, (lon - center_lon) * (lon - center_lon) + (lat - center_lat) * (lat - center_lat))
def _distance_m(lat_a: float, lon_a: float, lat_b: float, lon_b: float) -> float:
return (((float(lon_b) - float(lon_a)) ** 2 + (float(lat_b) - float(lat_a)) ** 2) ** 0.5) * 111_320
def _normalize_query(query: str | None) -> str:
return re.sub(r"\s+", " ", str(query or "").casefold().strip())
def _query_has_number(query: str) -> bool:
return bool(re.search(r"\d", query or ""))
def _split_numbered_query(query: str) -> tuple[str, str, str | None] | None:
candidates = _numbered_query_candidates(query)
return candidates[0] if candidates else None
def _numbered_query_candidates(query: str) -> list[tuple[str, str, str | None]]:
normalized = _normalize_query(query)
if "," in normalized:
left, right = [part.strip() for part in normalized.split(",", 1)]
left_has_number = _query_has_number(left)
right_has_number = _query_has_number(right)
if left_has_number and not right_has_number:
return _numbered_query_candidates_from_parts(left, right)
if left_has_number and _looks_like_locality(right):
return _numbered_query_candidates_from_parts(left, right)
if right_has_number:
candidates: list[tuple[str, str, str | None]] = []
for candidate in _numbered_query_candidates_from_parts(right, left):
if candidate not in candidates:
candidates.append(candidate)
for candidate in _numbered_query_candidates_from_parts(left, right):
if candidate not in candidates:
candidates.append(candidate)
return candidates
street_part, locality_query = _split_locality_query(query)
return _numbered_query_candidates_from_parts(street_part, locality_query)
def _numbered_query_candidates_from_parts(street_part: str, locality_query: str | None) -> list[tuple[str, str, str | None]]:
match = re.search(r"\b(\d+[a-zäöüß]?)\b", street_part or "", flags=re.IGNORECASE)
if match is None:
return []
housenumber = match.group(1).casefold()
street = re.sub(r"\b" + re.escape(match.group(1)) + r"\b", " ", street_part, count=1, flags=re.IGNORECASE)
street = _normalize_query(street)
if len(street) < 3 or not housenumber:
return []
candidates: list[tuple[str, str, str | None]] = []
for locality in _locality_candidates(locality_query):
_append_numbered_candidate(candidates, street, housenumber, locality)
if locality_query is None:
tokens = [token for token in street.split(" ") if token]
for index in range(len(tokens) - 1, 0, -1):
leading_locality = " ".join(tokens[:index])
trailing_street = " ".join(tokens[index:])
_append_numbered_candidate(candidates, trailing_street, housenumber, leading_locality)
for index in range(1, len(tokens)):
leading_street = " ".join(tokens[:index])
trailing_locality = " ".join(tokens[index:])
_append_numbered_candidate(candidates, leading_street, housenumber, trailing_locality)
return candidates
def _append_numbered_candidate(
candidates: list[tuple[str, str, str | None]],
street: str,
housenumber: str,
locality: str | None,
) -> None:
normalized_street = _normalize_query(street)
normalized_locality = _normalize_query(locality) if locality else None
if len(normalized_street) < 3 or not housenumber:
return
if normalized_locality is not None and len(normalized_locality) < 2:
normalized_locality = None
candidate = (normalized_street, housenumber, normalized_locality)
if candidate not in candidates:
candidates.append(candidate)
def _locality_candidates(locality: str | None) -> list[str | None]:
normalized = _normalize_query(locality)
if not normalized:
return [None]
candidates: list[str | None] = []
_append_locality_candidate(candidates, normalized)
match = re.match(r"^(\d{4,5})\s+(.+)$", normalized)
if match:
_append_locality_candidate(candidates, match.group(2))
_append_locality_candidate(candidates, match.group(1))
match = re.match(r"^(.+)\s+(\d{4,5})$", normalized)
if match:
_append_locality_candidate(candidates, match.group(1))
_append_locality_candidate(candidates, match.group(2))
return candidates
def _append_locality_candidate(candidates: list[str | None], value: str | None) -> None:
normalized = _normalize_query(value)
candidate = normalized if normalized else None
if candidate not in candidates:
candidates.append(candidate)
def _looks_like_locality(value: str) -> bool:
normalized = _normalize_query(value)
return bool(re.match(r"^\d{4,5}(\s+|$)", normalized)) or not _query_has_number(normalized)
def _leading_number(value: str | None) -> int | None:
match = re.match(r"\s*(\d+)", str(value or ""))
return None if match is None else int(match.group(1))
def _split_locality_query(query: str) -> tuple[str, str | None]:
normalized = _normalize_query(query)
if "," not in normalized:
return normalized, None
locality, remainder = normalized.split(",", 1)
locality = locality.strip()
remainder = remainder.strip()
if len(locality) < 2 or len(remainder) < 2:
return normalized, None
return remainder, locality
def _folded_query_candidates(query: str) -> list[tuple[str, str | None]]:
normalized = _normalize_query(query)
if not normalized:
return []
street_query, locality_query = _split_locality_query(normalized)
if locality_query:
candidates: list[tuple[str, str | None]] = []
for locality in _locality_candidates(locality_query):
_append_folded_candidate(candidates, street_query, locality)
return candidates
candidates = [(normalized, None)]
tokens = [token for token in normalized.split(" ") if token]
if len(tokens) < 2:
return candidates
for index in range(1, len(tokens)):
leading_locality = " ".join(tokens[:index])
trailing_street = " ".join(tokens[index:])
_append_folded_candidate(candidates, trailing_street, leading_locality)
for index in range(1, len(tokens)):
leading_street = " ".join(tokens[:index])
trailing_locality = " ".join(tokens[index:])
_append_folded_candidate(candidates, leading_street, trailing_locality)
return candidates
def _append_folded_candidate(
candidates: list[tuple[str, str | None]],
street: str,
locality: str | None,
) -> None:
normalized_street = _normalize_query(street)
if len(normalized_street) < 3:
return
for locality_candidate in _locality_candidates(locality):
if locality_candidate is not None and len(locality_candidate) < 2:
locality_candidate = None
candidate = (normalized_street, locality_candidate)
if candidate not in candidates:
candidates.append(candidate)
def _street_key_sql() -> str:
return "REPLACE(LOWER(COALESCE(NULLIF(street, ''), NULLIF(place, ''), '')), 'ß', 'ss')"
def _raw_address_limit(query: str, limit: int) -> int:
multiplier = 30 if query and not _query_has_number(query) else 6
return min(MAX_ADDRESS_SEARCH_ROWS, max(limit * multiplier, limit))