Files
meubility-workbench/app/osm_storage.py
2026-07-01 23:29:51 +02:00

982 lines
37 KiB
Python

from __future__ import annotations
import json
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Sequence
from sqlalchemy import and_, func, insert, not_, or_, select, text
from sqlalchemy.dialects.postgresql import insert as postgresql_insert
from sqlalchemy.orm import Session
from app.config import settings
from app.models import Dataset, OsmFeature
from app.spatial import refresh_postgis_geometries
# Key inside the dataset metadata JSON whose value describes where OSM features are stored.
OSM_STORAGE_METADATA_KEY = "osm_storage"
# Storage mode names: features kept in the main database vs. in a per-dataset SQLite sidecar file.
OSM_STORAGE_MAIN = "main"
OSM_STORAGE_SIDECAR_FEATURES = "sidecar_features"
# NOTE(review): not referenced in the visible part of this module; presumably caps
# the number of values in a SQL IN (...) list — confirm against callers.
SQLITE_IN_CHUNK_SIZE = 800
# Sidecar indexes removed by drop_osm_sidecar_route_scope_indexes().
OSM_SIDECAR_ROUTE_SCOPE_INDEXES = ["ix_osm_sidecar_scope_bbox"]
# Feature columns in the order used for sidecar INSERT and SELECT statements
# (the "id" column is handled separately by those statements).
OSM_FEATURE_COLUMNS = [
    "dataset_id",
    "osm_type",
    "osm_id",
    "kind",
    "mode",
    "route_scope",
    "name",
    "ref",
    "operator",
    "network",
    "geometry_geojson",
    "min_lon",
    "min_lat",
    "max_lon",
    "max_lat",
    "tags_json",
    "route_key",
    "operator_key",
]
def effective_osm_feature_storage(value: str | None = None) -> str:
    """Resolve the storage mode that OSM features should use.

    An explicit ``value`` takes precedence over ``settings.osm_feature_storage``;
    when neither is set, the sidecar mode is used as the starting point. The
    choice is stripped and lower-cased before it is compared.

    Returns ``OSM_STORAGE_MAIN`` when the choice is one of the main-database
    aliases, or when the database is PostgreSQL and sidecars are not enabled
    for it; otherwise ``OSM_STORAGE_SIDECAR_FEATURES``.
    """
    raw_choice = value or settings.osm_feature_storage or OSM_STORAGE_SIDECAR_FEATURES
    normalized = str(raw_choice).strip().lower()
    main_aliases = {OSM_STORAGE_MAIN, "main", "main_db", "postgres", "postgresql"}
    if normalized in main_aliases:
        return OSM_STORAGE_MAIN
    postgres_without_sidecars = settings.is_postgresql_database and not settings.postgres_use_sidecars
    return OSM_STORAGE_MAIN if postgres_without_sidecars else OSM_STORAGE_SIDECAR_FEATURES
class MissingOsmSidecar(FileNotFoundError):
    """Raised when a dataset has no OSM sidecar reference or the sidecar file does not exist."""
def dataset_metadata(dataset: Dataset) -> dict:
    """Decode ``dataset.metadata_json`` into a dict.

    A missing/empty value is treated as ``"{}"``. Malformed JSON, or JSON that
    does not decode to an object, yields an empty dict.
    """
    raw = dataset.metadata_json or "{}"
    try:
        decoded = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    if isinstance(decoded, dict):
        return decoded
    return {}
def features_are_sidecar(dataset: Dataset | None) -> bool:
    """Tell whether the dataset's metadata marks its OSM features as sidecar-stored.

    The storage descriptor is read from the dataset metadata under
    ``OSM_STORAGE_METADATA_KEY``. When it carries a ``tables`` mapping, the
    ``osm_features`` entry decides; otherwise the descriptor's ``mode`` is
    compared against the sidecar mode. ``None`` datasets and non-dict
    descriptors are never sidecar-backed.
    """
    if dataset is None:
        return False
    descriptor = dataset_metadata(dataset).get(OSM_STORAGE_METADATA_KEY)
    if not isinstance(descriptor, dict):
        return False
    table_map = descriptor.get("tables")
    if not isinstance(table_map, dict):
        return descriptor.get("mode") == OSM_STORAGE_SIDECAR_FEATURES
    return table_map.get("osm_features") == "sidecar"
def sidecar_path(dataset: Dataset | None) -> Path | None:
    """Return the sidecar file path recorded in the dataset metadata, if any.

    Returns ``None`` for a ``None`` dataset, when the storage descriptor is not
    a dict, or when its ``sidecar_path`` entry is missing or empty.
    """
    if dataset is None:
        return None
    descriptor = dataset_metadata(dataset).get(OSM_STORAGE_METADATA_KEY)
    location = descriptor.get("sidecar_path") if isinstance(descriptor, dict) else None
    return Path(str(location)) if location else None
def dataset_sidecar_paths(dataset: Dataset) -> list[Path]:
    """Return the dataset's sidecar path as a list (empty when none is recorded)."""
    location = sidecar_path(dataset)
    if location is None:
        return []
    return [location]
def missing_sidecar_paths(dataset: Dataset | None) -> list[str]:
    """List the sidecar path of a sidecar-backed dataset when that file is absent.

    Returns an empty list when the dataset is not sidecar-backed, records no
    sidecar path, or the recorded file exists.
    """
    if not features_are_sidecar(dataset):
        return []
    location = sidecar_path(dataset)
    if location is not None and not location.exists():
        return [str(location)]
    return []
@contextmanager
def sidecar_connection(dataset: Dataset) -> Iterator[sqlite3.Connection]:
    """Open the dataset's OSM sidecar read-only and yield the connection.

    The connection uses ``sqlite3.Row`` as row factory and is closed when the
    context exits.

    Raises:
        MissingOsmSidecar: if the dataset records no sidecar path or the file
            does not exist.
    """
    path = sidecar_path(dataset)
    if path is None:
        raise MissingOsmSidecar(f"dataset #{dataset.id} does not reference an OSM sidecar")
    if not path.exists():
        raise MissingOsmSidecar(f"OSM sidecar does not exist: {path}")
    # Build the URI with Path.as_uri() so that "?", "#" and "%" inside the path are
    # percent-encoded instead of being parsed by SQLite as query/fragment/escape syntax.
    # as_uri() needs an absolute path, hence resolve().
    read_only_uri = f"{path.resolve().as_uri()}?mode=ro"
    connection = sqlite3.connect(read_only_uri, uri=True)
    connection.row_factory = sqlite3.Row
    try:
        yield connection
    finally:
        connection.close()
@contextmanager
def writable_sidecar_connection(dataset: Dataset) -> Iterator[sqlite3.Connection]:
    """Open the dataset's existing OSM sidecar for writing and yield the connection.

    The connection uses ``sqlite3.Row`` rows, applies the configured busy
    timeout and ``synchronous=NORMAL``, and is closed when the context exits.

    Raises:
        MissingOsmSidecar: if the dataset records no sidecar path or the file
            does not exist.
    """
    location = sidecar_path(dataset)
    if location is None:
        raise MissingOsmSidecar(f"dataset #{dataset.id} does not reference an OSM sidecar")
    if not location.exists():
        raise MissingOsmSidecar(f"OSM sidecar does not exist: {location}")
    db = sqlite3.connect(location)
    db.row_factory = sqlite3.Row
    try:
        pragmas = (
            f"PRAGMA busy_timeout={int(settings.sqlite_busy_timeout_ms)}",
            "PRAGMA synchronous=NORMAL",
        )
        for pragma in pragmas:
            db.execute(pragma)
        yield db
    finally:
        db.close()
def create_osm_sidecar(dataset: Dataset, rows: Sequence[dict[str, object]], *, source_hash: str | None = None) -> dict:
    """Write ``rows`` into a fresh SQLite sidecar file and return its storage descriptor.

    Any existing file at the target path is removed first. Rows are
    de-duplicated, numbered from 1 in their de-duplicated order, inserted in
    batches of 5000, and the sidecar indexes are created afterwards. The
    database is written with journaling and synchronous writes switched off.

    Returns:
        A dict with the storage mode, the table mapping, the sidecar path, the
        number of inserted features, the number of skipped duplicates and a
        per-kind count.
    """
    target = _new_sidecar_path(dataset, source_hash or dataset.sha256)
    target.parent.mkdir(parents=True, exist_ok=True)
    if target.exists():
        target.unlink()

    column_list = ", ".join(["id", *OSM_FEATURE_COLUMNS])
    placeholder_list = ", ".join(["?"] * (len(OSM_FEATURE_COLUMNS) + 1))
    insert_sql = (
        "\nINSERT INTO osm_features\n"
        f"({column_list})\n"
        "VALUES\n"
        f"({placeholder_list})\n"
    )

    db = sqlite3.connect(target)
    try:
        db.execute("PRAGMA journal_mode=OFF")
        db.execute("PRAGMA synchronous=OFF")
        _create_schema(db)
        unique_rows, duplicate_count = dedupe_osm_feature_rows(rows)
        written = 0
        counts = {"route": 0, "stop": 0, "station": 0, "terminal": 0, "infra": 0, "feature": 0}
        pending = []
        for row_id, row in enumerate(unique_rows, start=1):
            kind = str(row.get("kind") or "feature")
            counts[kind] = counts.get(kind, 0) + 1
            pending.append((row_id, *[row.get(column) for column in OSM_FEATURE_COLUMNS]))
            if len(pending) < 5000:
                continue
            # Flush a full batch.
            db.executemany(insert_sql, pending)
            written += len(pending)
            pending.clear()
        if pending:
            # Flush the final, partial batch.
            db.executemany(insert_sql, pending)
            written += len(pending)
        db.commit()
        _create_indexes(db)
        db.commit()
    finally:
        db.close()

    return {
        "mode": OSM_STORAGE_SIDECAR_FEATURES,
        "tables": {"osm_features": "sidecar"},
        "sidecar_path": str(target),
        "features": written,
        "duplicate_features_skipped": duplicate_count,
        "counts": counts,
    }
def ensure_osm_sidecar_schema(connection: sqlite3.Connection) -> None:
    """Add the ``route_scope`` column to a sidecar's ``osm_features`` table if it is missing."""
    if "route_scope" in _sidecar_columns(connection):
        return
    connection.execute("ALTER TABLE osm_features ADD COLUMN route_scope TEXT")
    connection.commit()
def drop_osm_sidecar_route_scope_indexes(connection: sqlite3.Connection) -> None:
    """Drop every index named in ``OSM_SIDECAR_ROUTE_SCOPE_INDEXES`` if it exists.

    The statements are executed without committing.
    """
    drop_statements = [f"DROP INDEX IF EXISTS {name}" for name in OSM_SIDECAR_ROUTE_SCOPE_INDEXES]
    for statement in drop_statements:
        connection.execute(statement)
def rebuild_osm_sidecar_indexes(connection: sqlite3.Connection) -> None:
    """(Re)create the sidecar indexes; indexes that already exist are left alone."""
    _create_indexes(connection)
    return None
def osm_feature_count(session: Session, dataset_id: int, *, kind: str | Sequence[str] | None = None) -> int:
    """Count a dataset's OSM features, optionally restricted to one or more kinds.

    Sidecar-backed datasets are counted in their sidecar file (0 when the
    sidecar is missing); all other datasets are counted in the main database.
    """
    dataset = session.get(Dataset, dataset_id)
    wanted_kinds = _as_list(kind)

    if not features_are_sidecar(dataset):
        query = select(func.count()).select_from(OsmFeature).where(OsmFeature.dataset_id == dataset_id)
        if wanted_kinds:
            query = query.where(OsmFeature.kind.in_(wanted_kinds))
        return int(session.scalar(query) or 0)

    sql = "SELECT COUNT(*) FROM osm_features"
    bind_values: list[object] = []
    if wanted_kinds:
        placeholders = ", ".join("?" for _ in wanted_kinds)
        sql += f" WHERE kind IN ({placeholders})"
        bind_values.extend(wanted_kinds)
    try:
        with sidecar_connection(dataset) as db:
            return int(db.execute(sql, bind_values).fetchone()[0] or 0)
    except MissingOsmSidecar:
        return 0
def osm_feature_bbox(
    session: Session,
    dataset_ids: Sequence[int],
    *,
    kinds: Sequence[str] | None = None,
) -> tuple[float | None, float | None, float | None, float | None]:
    """Compute the combined bounding box of the features of the given datasets.

    Main-database datasets are aggregated in one query; each sidecar-backed
    dataset is aggregated in its own sidecar (missing sidecars are skipped).
    An extent containing a NULL is ignored.

    Returns:
        ``(min_lon, min_lat, max_lon, max_lat)``, or four ``None`` values when
        no dataset ids were given or no extent could be determined.
    """
    nothing = (None, None, None, None)
    if not dataset_ids:
        return nothing

    requested_ids = [int(value) for value in dataset_ids]
    datasets = {
        dataset.id: dataset
        for dataset in session.scalars(select(Dataset).where(Dataset.id.in_(requested_ids))).all()
    }
    extents: list[tuple[float, float, float, float]] = []

    main_ids = [dataset_id for dataset_id, dataset in datasets.items() if not features_are_sidecar(dataset)]
    if main_ids:
        aggregate = select(
            func.min(OsmFeature.min_lon),
            func.min(OsmFeature.min_lat),
            func.max(OsmFeature.max_lon),
            func.max(OsmFeature.max_lat),
        ).where(OsmFeature.dataset_id.in_(main_ids))
        if kinds:
            aggregate = aggregate.where(OsmFeature.kind.in_(list(kinds)))
        extent = session.execute(aggregate).one()
        if None not in extent:
            extents.append((float(extent[0]), float(extent[1]), float(extent[2]), float(extent[3])))

    # The sidecar statement is the same for every sidecar-backed dataset.
    sidecar_sql = "SELECT MIN(min_lon), MIN(min_lat), MAX(max_lon), MAX(max_lat) FROM osm_features"
    sidecar_params: list[object] = []
    if kinds:
        placeholders = ", ".join("?" for _ in range(len(kinds)))
        sidecar_sql += f" WHERE kind IN ({placeholders})"
        sidecar_params.extend(list(kinds))

    for dataset in datasets.values():
        if not features_are_sidecar(dataset):
            continue
        try:
            with sidecar_connection(dataset) as db:
                extent = db.execute(sidecar_sql, sidecar_params).fetchone()
                if extent is not None and None not in extent:
                    extents.append((float(extent[0]), float(extent[1]), float(extent[2]), float(extent[3])))
        except MissingOsmSidecar:
            continue

    if not extents:
        return nothing
    west, south, east, north = zip(*extents)
    return (min(west), min(south), max(east), max(north))
def query_osm_features(
    session: Session,
    dataset_ids: Sequence[int],
    *,
    kinds: Sequence[str] | None = None,
    modes: Sequence[str] | None = None,
    bbox: tuple[float, float, float, float] | None = None,
    geometry_required: bool | None = None,
    search: str | None = None,
    route_key: str | None = None,
    route_scopes: Sequence[str] | None = None,
    ref: str | None = None,
    osm_type: str | None = None,
    osm_id: str | None = None,
    limit: int | None = None,
    offset: int | None = None,
    prefer_materialized_ids: bool = True,
) -> list[OsmFeature]:
    """Query OSM features of the given datasets across main storage and sidecars.

    Datasets that are not sidecar-backed are queried together in the main
    database; every sidecar-backed dataset is queried in its own sidecar file.
    The rows of all sources are merged and sorted by
    ``(kind, mode, ref, name, id)`` with ``None`` values sorting as empty/zero.

    Paging: ``limit`` is clamped to at least 1, for each source and for the
    merged result. ``offset`` addresses the merged result. When rows can come
    from more than one source, each source is asked for its first
    ``offset + limit`` rows and the offset is applied after merging; applying
    it inside every source would skip ``offset`` rows per source. With a
    single source the offset is pushed down into that source's query.

    Args:
        session: database session used for the main database.
        dataset_ids: datasets to search; an empty sequence returns ``[]``.
        prefer_materialized_ids: when true, sidecar rows that also exist in
            the main database are returned with the main database id.
        (remaining keyword arguments are optional filters; falsy values are
        ignored.)

    Returns:
        The matching features as ``OsmFeature`` objects.
    """
    if not dataset_ids:
        return []
    datasets = {
        dataset.id: dataset
        for dataset in session.scalars(select(Dataset).where(Dataset.id.in_([int(value) for value in dataset_ids]))).all()
    }
    materialized_ids = _materialized_ids_by_identity(session, list(datasets)) if prefer_materialized_ids else {}
    main_dataset_ids = [dataset_id for dataset_id, dataset in datasets.items() if not features_are_sidecar(dataset)]
    sidecar_datasets = [dataset for dataset in datasets.values() if features_are_sidecar(dataset)]

    page_size = None if limit is None else max(1, int(limit))
    skip = max(0, int(offset)) if offset else 0
    source_count = len(sidecar_datasets) + (1 if main_dataset_ids else 0)
    # With several sources the offset can only be applied to the merged result.
    paginate_after_merge = skip > 0 and source_count > 1
    if paginate_after_merge:
        source_limit = None if page_size is None else page_size + skip
        source_offset = 0
    else:
        source_limit = page_size
        source_offset = skip

    rows: list[OsmFeature] = []
    if main_dataset_ids:
        stmt = select(OsmFeature).where(OsmFeature.dataset_id.in_(main_dataset_ids))
        stmt = _apply_main_filters(
            stmt,
            kinds=kinds,
            modes=modes,
            bbox=bbox,
            geometry_required=geometry_required,
            search=search,
            route_key=route_key,
            route_scopes=route_scopes,
            ref=ref,
            osm_type=osm_type,
            osm_id=osm_id,
        )
        stmt = stmt.order_by(OsmFeature.kind, OsmFeature.mode, OsmFeature.ref, OsmFeature.name, OsmFeature.id)
        if source_offset:
            stmt = stmt.offset(source_offset)
        if source_limit is not None:
            stmt = stmt.limit(source_limit)
        rows.extend(session.scalars(stmt).all())

    for dataset in sidecar_datasets:
        rows.extend(
            _query_sidecar_features(
                dataset,
                kinds=kinds,
                modes=modes,
                bbox=bbox,
                geometry_required=geometry_required,
                search=search,
                route_key=route_key,
                route_scopes=route_scopes,
                ref=ref,
                osm_type=osm_type,
                osm_id=osm_id,
                limit=source_limit,
                offset=source_offset,
                materialized_ids=materialized_ids,
            )
        )

    rows.sort(key=lambda row: (row.kind or "", row.mode or "", row.ref or "", row.name or "", int(row.id or 0)))
    if paginate_after_merge:
        rows = rows[skip:]
    if page_size is not None:
        rows = rows[:page_size]
    return rows
def get_osm_feature(session: Session, feature_id: int) -> OsmFeature | None:
    """Look up an ``OsmFeature`` in the main database by primary key via ``session.get``."""
    feature = session.get(OsmFeature, feature_id)
    return feature
def osm_feature_identity_key(feature: OsmFeature) -> str:
    """Build the ``"<dataset_id>|<osm_type>|<osm_id>"`` identity key of a feature."""
    return "{}|{}|{}".format(feature.dataset_id, feature.osm_type, feature.osm_id)
def osm_feature_public_id(feature: OsmFeature) -> int | str | None:
    """Return the id to expose for a feature.

    Features flagged with ``_osm_sidecar_source`` are identified by their
    identity key; all others by their ``id`` attribute.
    """
    from_sidecar = getattr(feature, "_osm_sidecar_source", False)
    return osm_feature_identity_key(feature) if from_sidecar else feature.id
def resolve_osm_feature(session: Session, value: int | str) -> OsmFeature | None:
    """Resolve a public feature id (numeric id or identity key) to a feature.

    Resolution order:
      1. ``value`` as a main-database primary key;
      2. ``value`` as an identity key matching a main-database row;
      3. ``value`` as an identity key matching a row in the dataset's sidecar.

    Returns ``None`` when nothing matches, the identity key is malformed, the
    dataset is not sidecar-backed, or its sidecar is missing.
    """
    numeric_id = _safe_int(value)
    if numeric_id is not None:
        by_primary_key = session.get(OsmFeature, numeric_id)
        if by_primary_key is not None:
            return by_primary_key

    identity = parse_osm_feature_identity_key(str(value))
    if identity is None:
        return None
    dataset_id, osm_type, osm_id = identity

    materialized = session.scalar(
        select(OsmFeature).where(
            OsmFeature.dataset_id == dataset_id,
            OsmFeature.osm_type == osm_type,
            OsmFeature.osm_id == osm_id,
        )
    )
    if materialized is not None:
        return materialized

    dataset = session.get(Dataset, dataset_id)
    if not features_are_sidecar(dataset):
        return None
    try:
        with sidecar_connection(dataset) as db:
            projection = ", ".join(_sidecar_select_columns(_sidecar_columns(db)))
            lookup_sql = (
                "\n"
                f"SELECT id, {projection}\n"
                "FROM osm_features\n"
                "WHERE dataset_id = ?\n"
                "AND osm_type = ?\n"
                "AND osm_id = ?\n"
            )
            match = db.execute(lookup_sql, (dataset_id, osm_type, osm_id)).fetchone()
    except MissingOsmSidecar:
        return None
    return None if match is None else _feature_from_row(match, {})
def parse_osm_feature_identity_key(value: str) -> tuple[int, str, str] | None:
    """Split a ``"<dataset_id>|<osm_type>|<osm_id>"`` key into its parts.

    Only the first two ``|`` separate fields, so the OSM id may itself contain
    ``|``. Returns ``None`` when there are fewer than three fields, the dataset
    id is not an integer, or the type/id is blank after stripping.
    """
    pieces = value.split("|", 2)
    if len(pieces) != 3:
        return None
    raw_dataset_id, raw_type, raw_id = pieces
    dataset_id = _safe_int(raw_dataset_id)
    if dataset_id is None:
        return None
    osm_type, osm_id = raw_type.strip(), raw_id.strip()
    if not (osm_type and osm_id):
        return None
    return dataset_id, osm_type, osm_id
def ensure_main_osm_feature(session: Session, feature: OsmFeature) -> OsmFeature:
    """Return the main-database row for ``feature``, inserting it first if needed.

    The row is identified by ``(dataset_id, osm_type, osm_id)``. The insert
    ignores conflicts (``ON CONFLICT DO NOTHING`` on PostgreSQL, ``OR IGNORE``
    otherwise). After the insert the session is flushed, the PostGIS
    geometries of the dataset's ``osm_features`` are refreshed, and the row is
    read back.

    Raises:
        RuntimeError: if the row still cannot be found after the insert.
    """
    identity_query = select(OsmFeature).where(
        OsmFeature.dataset_id == feature.dataset_id,
        OsmFeature.osm_type == feature.osm_type,
        OsmFeature.osm_id == feature.osm_id,
    )
    found = session.scalar(identity_query)
    if found is not None:
        return found

    # OSM_FEATURE_COLUMNS lists exactly the attributes copied into the new row.
    values = {column: getattr(feature, column) for column in OSM_FEATURE_COLUMNS}
    if settings.is_postgresql_database:
        statement = (
            postgresql_insert(OsmFeature)
            .values(**values)
            .on_conflict_do_nothing(index_elements=["dataset_id", "osm_type", "osm_id"])
        )
    else:
        statement = insert(OsmFeature).values(**values).prefix_with("OR IGNORE")
    session.execute(statement)
    session.flush()
    refresh_postgis_geometries(session, dataset_id=feature.dataset_id, tables=["osm_features"])

    found = session.scalar(identity_query)
    if found is None:
        raise RuntimeError(f"Could not materialize OSM feature {feature.dataset_id}:{feature.osm_type}:{feature.osm_id}")
    return found
def materialize_osm_features(session: Session, features: Sequence[OsmFeature]) -> list[OsmFeature]:
    """Ensure each feature exists in the main database; return the main rows in input order."""
    materialized: list[OsmFeature] = []
    for feature in features:
        materialized.append(ensure_main_osm_feature(session, feature))
    return materialized
def _new_sidecar_path(dataset: Dataset, source_hash: str | None) -> Path:
    """Build the sidecar file path for a dataset below ``settings.data_dir``.

    The file name ends with the first 12 characters of ``source_hash``,
    falling back to the dataset's ``sha256`` and then to its id.
    """
    token = (source_hash or dataset.sha256 or str(dataset.id))[:12]
    directory = settings.data_dir / "sidecars" / f"source_{dataset.source_id}"
    return directory / f"osm_dataset_{dataset.id}_{token}.sqlite"
def dedupe_osm_feature_rows(rows: Sequence[dict[str, object]]) -> tuple[list[dict[str, object]], int]:
    """Keep one row per ``(dataset_id, osm_type, osm_id)`` identity.

    Among duplicates the row with the smallest ``_feature_row_preference``
    wins; on a tie the earlier row is kept. Kept rows are shallow copies and
    appear in order of each identity's first occurrence.

    Returns:
        ``(kept_rows, number_of_rows_dropped)``.
    """
    best: dict[tuple[int, str, str], dict[str, object]] = {}
    for candidate in rows:
        identity = (int(candidate["dataset_id"]), str(candidate["osm_type"]), str(candidate["osm_id"]))
        incumbent = best.get(identity)
        if incumbent is not None and _feature_row_preference(candidate) >= _feature_row_preference(incumbent):
            continue
        best[identity] = dict(candidate)
    dropped = max(0, len(rows) - len(best))
    return list(best.values()), dropped
def _feature_row_preference(row: dict[str, object]) -> tuple[int, int, int]:
kind_rank = {
"route": 0,
"station": 1,
"terminal": 2,
"stop": 3,
"infra": 4,
"feature": 5,
}.get(str(row.get("kind") or "feature"), 6)
has_geometry = 0 if row.get("geometry_geojson") else 1
geometry_size = -len(str(row.get("geometry_geojson") or ""))
return (kind_rank, has_geometry, geometry_size)
def _create_schema(connection: sqlite3.Connection) -> None:
connection.execute(
"""
CREATE TABLE osm_features (
id INTEGER PRIMARY KEY,
dataset_id INTEGER NOT NULL,
osm_type TEXT NOT NULL,
osm_id TEXT NOT NULL,
kind TEXT NOT NULL,
mode TEXT,
route_scope TEXT,
name TEXT,
ref TEXT,
operator TEXT,
network TEXT,
geometry_geojson TEXT,
min_lon REAL,
min_lat REAL,
max_lon REAL,
max_lat REAL,
tags_json TEXT,
route_key TEXT,
operator_key TEXT,
UNIQUE(dataset_id, osm_type, osm_id)
)
"""
)
def _create_indexes(connection: sqlite3.Connection) -> None:
statements = [
"CREATE INDEX IF NOT EXISTS ix_osm_sidecar_kind_mode_bbox ON osm_features (kind, mode, min_lon, max_lon, min_lat, max_lat)",
"CREATE INDEX IF NOT EXISTS ix_osm_sidecar_scope_bbox ON osm_features (kind, mode, route_scope, min_lon, max_lon, min_lat, max_lat)",
"CREATE INDEX IF NOT EXISTS ix_osm_sidecar_route_key ON osm_features (route_key)",
"CREATE INDEX IF NOT EXISTS ix_osm_sidecar_ref ON osm_features (ref)",
"CREATE INDEX IF NOT EXISTS ix_osm_sidecar_identity ON osm_features (dataset_id, osm_type, osm_id)",
"CREATE INDEX IF NOT EXISTS ix_osm_sidecar_kind_ref_mode ON osm_features (kind, ref, mode)",
]
for statement in statements:
connection.execute(statement)
def _apply_main_filters(stmt, *, kinds, modes, bbox, geometry_required, search, route_key, route_scopes, ref, osm_type, osm_id):
    """Narrow a main-database ``OsmFeature`` select by the optional filters.

    Falsy filter values are skipped. ``bbox`` is ``(min_lon, min_lat, max_lon,
    max_lat)``; on PostgreSQL it is matched against ``geom`` with a fallback to
    the stored extent columns when ``geom`` is NULL, otherwise against the
    extent columns only. ``geometry_required`` filters on the presence
    (``True``) or absence (``False``) of ``geometry_geojson``. ``search`` is a
    case-insensitive substring match on ref, name and tags.

    Returns the statement with all applicable conditions added.
    """
    if kinds:
        stmt = stmt.where(OsmFeature.kind.in_(list(kinds)))
    if modes:
        stmt = stmt.where(OsmFeature.mode.in_(list(modes)))
    if route_scopes:
        stmt = stmt.where(_main_route_scope_condition([str(scope) for scope in route_scopes]))

    if bbox:
        min_lon, min_lat, max_lon, max_lat = bbox
        if settings.is_postgresql_database:
            envelope_sql = "\n".join(
                (
                    "",
                    "(",
                    "osm_features.geom && ST_MakeEnvelope(:bbox_min_lon, :bbox_min_lat, :bbox_max_lon, :bbox_max_lat, 4326)",
                    "OR (",
                    "osm_features.geom IS NULL",
                    "AND osm_features.min_lon <= :bbox_max_lon",
                    "AND osm_features.max_lon >= :bbox_min_lon",
                    "AND osm_features.min_lat <= :bbox_max_lat",
                    "AND osm_features.max_lat >= :bbox_min_lat",
                    ")",
                    ")",
                    "",
                )
            )
            stmt = stmt.where(text(envelope_sql)).params(
                bbox_min_lon=min_lon,
                bbox_min_lat=min_lat,
                bbox_max_lon=max_lon,
                bbox_max_lat=max_lat,
            )
        else:
            # Two boxes overlap when each one starts before the other ends, on both axes.
            stmt = stmt.where(
                OsmFeature.min_lon <= max_lon,
                OsmFeature.max_lon >= min_lon,
                OsmFeature.min_lat <= max_lat,
                OsmFeature.max_lat >= min_lat,
            )

    if geometry_required is True:
        stmt = stmt.where(OsmFeature.geometry_geojson.is_not(None))
    elif geometry_required is False:
        stmt = stmt.where(OsmFeature.geometry_geojson.is_(None))

    if search:
        if settings.is_postgresql_database:
            search_sql = "\n".join(
                (
                    "",
                    "(",
                    "LOWER(COALESCE(osm_features.ref, '')) LIKE :search_pattern",
                    "OR LOWER(COALESCE(osm_features.name, '')) LIKE :search_pattern",
                    "OR LOWER(COALESCE(osm_features.tags_json, '')) LIKE :search_pattern",
                    ")",
                    "",
                )
            )
            stmt = stmt.where(text(search_sql)).params(search_pattern=f"%{search.lower()}%")
        else:
            pattern = f"%{search}%"
            stmt = stmt.where(
                (OsmFeature.ref.ilike(pattern))
                | (OsmFeature.name.ilike(pattern))
                | (OsmFeature.tags_json.ilike(pattern))
            )

    # Plain equality filters.
    equality_filters = (
        (OsmFeature.route_key, route_key),
        (OsmFeature.ref, ref),
        (OsmFeature.osm_type, osm_type),
        (OsmFeature.osm_id, osm_id),
    )
    for column, wanted in equality_filters:
        if wanted:
            stmt = stmt.where(column == wanted)
    return stmt
def _main_route_scope_condition(route_scopes: list[str]):
    """Build the main-database condition matching features in any of ``route_scopes``.

    A feature matches when its stored ``route_scope`` is requested or when the
    heuristic fallback classifies it into a requested scope. When ``"local"``
    is requested, a stored scope is not trusted for bus/trolleybus features
    that the heuristic classifies as long-distance or regional.
    """
    derived_match = _main_route_scope_fallback_condition(route_scopes)
    stored_match = OsmFeature.route_scope.in_(route_scopes)
    if "local" not in route_scopes:
        return or_(stored_match, derived_match)
    heuristically_non_local = _main_route_scope_fallback_condition(["long_distance", "regional"])
    is_bus = OsmFeature.mode.in_(["bus", "trolleybus"])
    trusted_stored_match = and_(stored_match, not_(and_(is_bus, heuristically_non_local)))
    return or_(trusted_stored_match, derived_match)
def _main_route_scope_fallback_condition(route_scopes: list[str]):
    """Build a heuristic route-scope condition from mode, ref, name, network and tags.

    Scopes are derived as follows:
      * ``long_distance``: coaches, trains with long-distance refs/names/tags,
        buses with long-distance names/networks/tags;
      * ``regional``: trains (not long-distance) with regional refs/names/tags,
        buses (not long-distance) with regional names/networks/tags;
      * ``local``: tram-like modes, buses that are neither long-distance nor
        regional, and trains with S-Bahn/commuter markers;
      * ``unknown``: trains matching none of the above.

    Ref/name/network are compared upper-cased, tags lower-cased. The result is
    the OR of the requested scopes; when no known scope is requested it is
    ``route_scope IS NULL``.
    """
    ref = func.upper(func.coalesce(OsmFeature.ref, ""))
    name = func.upper(func.coalesce(OsmFeature.name, ""))
    network = func.upper(func.coalesce(OsmFeature.network, ""))
    tags = func.lower(func.coalesce(OsmFeature.tags_json, ""))

    def likes(column, *patterns):
        # One LIKE clause per pattern, in the given order.
        return [column.like(pattern) for pattern in patterns]

    def train_mode():
        return OsmFeature.mode == "train"

    def bus_mode():
        return OsmFeature.mode.in_(["bus", "trolleybus"])

    train_long_distance = and_(
        train_mode(),
        or_(
            *likes(ref, "ICE%", "IC%", "EC%", "ECE%", "EN%", "NJ%", "RJ%", "RJX%", "TGV%", "THA%", "FLX%"),
            *likes(name, "%INTERCITY%", "%EUROCITY%", "%NIGHTJET%", "%FLIXTRAIN%"),
            *likes(
                tags,
                '%"service":"long_distance"%',
                '%"train":"long_distance"%',
                '%"train":"high_speed"%',
                '%"train":"intercity"%',
            ),
        ),
    )
    bus_long_distance = and_(
        bus_mode(),
        or_(
            name.like("%FLIXBUS%"),
            network.like("%FLIXBUS%"),
            name.like("%EUROLINES%"),
            network.like("%EUROLINES%"),
            *likes(name, "%INTERCITYBUS%", "%IC BUS%", "%FERNBUS%"),
            *likes(
                tags,
                '%"service":"long_distance"%',
                '%"bus":"long_distance"%',
                '%"bus":"intercity"%',
                '%"network:type":"long_distance"%',
            ),
        ),
    )
    long_distance = or_(OsmFeature.mode == "coach", train_long_distance, bus_long_distance)

    bus_regional = and_(
        bus_mode(),
        not_(bus_long_distance),
        or_(
            *likes(name, "%REGIONALBUS%", "%REGIOBUS%", "%REGIONAL BUS%", "%REGIONALVERKEHR%"),
            *likes(network, "%REGIONALBUS%", "%REGIOBUS%", "%REGIONALVERKEHR%"),
            *likes(
                tags,
                '%"service":"regional"%',
                '%"bus":"regional"%',
                '%"bus":"interurban"%',
                '%"network:type":"regional"%',
            ),
        ),
    )
    local = or_(
        OsmFeature.mode.in_(["tram", "light_rail", "subway", "ferry", "funicular", "aerialway", "monorail"]),
        and_(bus_mode(), not_(or_(bus_long_distance, bus_regional))),
        and_(
            train_mode(),
            or_(ref.like("S%"), name.like("%S-BAHN%"), network.like("%S-BAHN%"), tags.like('%"train":"commuter"%')),
        ),
    )
    train_regional = and_(
        train_mode(),
        not_(train_long_distance),
        or_(
            *likes(ref, "IRE%", "RE%", "RB%", "RER%", "TER%", "REX%", "MEX%", "ALX%", "WFB%", "R%"),
            *likes(name, "%REGIONAL%", "%REGIO%"),
            *likes(tags, '%"service":"regional"%', '%"train":"regional"%'),
        ),
    )
    regional = or_(train_regional, bus_regional)
    unknown = and_(train_mode(), not_(or_(long_distance, regional, local)))

    by_scope = (
        ("long_distance", long_distance),
        ("regional", regional),
        ("local", local),
        ("unknown", unknown),
    )
    selected = [condition for scope, condition in by_scope if scope in route_scopes]
    if not selected:
        return OsmFeature.route_scope.is_(None)
    return or_(*selected)
def _query_sidecar_features(
    dataset: Dataset,
    *,
    kinds: Sequence[str] | None,
    modes: Sequence[str] | None,
    bbox: tuple[float, float, float, float] | None,
    geometry_required: bool | None,
    search: str | None,
    route_key: str | None,
    route_scopes: Sequence[str] | None,
    ref: str | None,
    osm_type: str | None,
    osm_id: str | None,
    limit: int | None,
    offset: int | None,
    materialized_ids: dict[tuple[int, str, str], int],
) -> list[OsmFeature]:
    """Query one dataset's sidecar file with the optional filters.

    Falsy filters are skipped. Rows are ordered by ``kind, mode, ref, name, id``;
    ``limit`` is clamped to at least 1 and ``offset`` to at least 0. Columns
    that the sidecar table lacks are selected as NULL. Returns ``[]`` when the
    sidecar is missing.
    """
    clauses: list[str] = []
    bind: list[object] = []

    def require(clause: str, values: Sequence[object] = ()) -> None:
        # Keep each clause and its parameters appended together so positions line up.
        clauses.append(clause)
        bind.extend(values)

    def require_in(column: str, options: Sequence[str]) -> None:
        placeholders = ", ".join(["?"] * len(options))
        require(f"{column} IN ({placeholders})", list(options))

    try:
        with sidecar_connection(dataset) as db:
            columns = _sidecar_columns(db)
            if kinds:
                require_in("kind", kinds)
            if modes:
                require_in("mode", modes)
            if bbox:
                min_lon, min_lat, max_lon, max_lat = bbox
                require("min_lon <= ?", [max_lon])
                require("max_lon >= ?", [min_lon])
                require("min_lat <= ?", [max_lat])
                require("max_lat >= ?", [min_lat])
            if geometry_required is True:
                require("geometry_geojson IS NOT NULL")
            elif geometry_required is False:
                require("geometry_geojson IS NULL")
            if search:
                pattern = f"%{search.lower()}%"
                require(
                    "(LOWER(COALESCE(ref, '')) LIKE ? OR LOWER(COALESCE(name, '')) LIKE ? OR LOWER(COALESCE(tags_json, '')) LIKE ?)",
                    [pattern, pattern, pattern],
                )
            if route_key:
                require("route_key = ?", [route_key])
            if route_scopes:
                scope_sql, scope_params = _sidecar_route_scope_condition(
                    [str(scope) for scope in route_scopes],
                    has_route_scope="route_scope" in columns,
                )
                require(scope_sql, scope_params)
            for column, wanted in (("ref", ref), ("osm_type", osm_type), ("osm_id", osm_id)):
                if wanted:
                    require(f"{column} = ?", [wanted])

            projection = ", ".join(_sidecar_select_columns(columns))
            sql = f"SELECT id, {projection} FROM osm_features"
            if clauses:
                sql += " WHERE " + " AND ".join(clauses)
            sql += " ORDER BY kind, mode, ref, name, id"
            if limit is not None:
                sql += " LIMIT ?"
                bind.append(max(1, int(limit)))
            if offset:
                if limit is None:
                    # SQLite only accepts OFFSET after a LIMIT; -1 means "no limit".
                    sql += " LIMIT -1"
                sql += " OFFSET ?"
                bind.append(max(0, int(offset)))
            fetched = db.execute(sql, bind).fetchall()
            return [_feature_from_row(record, materialized_ids) for record in fetched]
    except MissingOsmSidecar:
        return []
def _sidecar_columns(connection: sqlite3.Connection) -> set[str]:
return {str(row["name"]) for row in connection.execute("PRAGMA table_info(osm_features)").fetchall()}
def _sidecar_select_columns(available_columns: set[str]) -> list[str]:
    """Build the SELECT list for ``OSM_FEATURE_COLUMNS``.

    Columns missing from ``available_columns`` are emitted as
    ``NULL AS <column>`` so result rows always expose every feature column.
    """
    projection: list[str] = []
    for column in OSM_FEATURE_COLUMNS:
        if column in available_columns:
            projection.append(column)
        else:
            projection.append(f"NULL AS {column}")
    return projection
def _sidecar_route_scope_condition(route_scopes: list[str], *, has_route_scope: bool) -> tuple[str, list[object]]:
    """Build the sidecar SQL condition (and parameters) matching ``route_scopes``.

    Without a ``route_scope`` column only the heuristic fallback is used. With
    it, a feature matches when its stored scope is requested or the fallback
    classifies it into a requested scope; when ``"local"`` is requested, the
    stored scope is not trusted for bus/trolleybus rows that the fallback
    classifies as long-distance or regional.
    """
    derived_sql, derived_params = _sidecar_route_scope_fallback_condition(route_scopes)
    if not has_route_scope:
        return derived_sql, derived_params

    placeholders = ", ".join(["?"] * len(route_scopes))
    stored_sql = f"route_scope IN ({placeholders})"
    stored_params: list[object] = list(route_scopes)
    if "local" in route_scopes:
        non_local_sql, non_local_params = _sidecar_route_scope_fallback_condition(["long_distance", "regional"])
        stored_sql = f"({stored_sql} AND NOT (mode IN ('bus', 'trolleybus') AND {non_local_sql}))"
        stored_params.extend(non_local_params)
    return f"({stored_sql} OR {derived_sql})", [*stored_params, *derived_params]
def _sidecar_route_scope_fallback_condition(route_scopes: list[str]) -> tuple[str, list[object]]:
train_long_distance = """(
mode = 'train'
AND (
UPPER(COALESCE(ref, '')) LIKE 'ICE%'
OR UPPER(COALESCE(ref, '')) LIKE 'IC%'
OR UPPER(COALESCE(ref, '')) LIKE 'EC%'
OR UPPER(COALESCE(ref, '')) LIKE 'ECE%'
OR UPPER(COALESCE(ref, '')) LIKE 'EN%'
OR UPPER(COALESCE(ref, '')) LIKE 'NJ%'
OR UPPER(COALESCE(ref, '')) LIKE 'RJ%'
OR UPPER(COALESCE(ref, '')) LIKE 'RJX%'
OR UPPER(COALESCE(ref, '')) LIKE 'TGV%'
OR UPPER(COALESCE(ref, '')) LIKE 'THA%'
OR UPPER(COALESCE(ref, '')) LIKE 'FLX%'
OR UPPER(COALESCE(name, '')) LIKE '%INTERCITY%'
OR UPPER(COALESCE(name, '')) LIKE '%EUROCITY%'
OR UPPER(COALESCE(name, '')) LIKE '%NIGHTJET%'
OR UPPER(COALESCE(name, '')) LIKE '%FLIXTRAIN%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"service":"long_distance"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"train":"long_distance"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"train":"high_speed"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"train":"intercity"%'
)
)"""
bus_long_distance = """(
mode IN ('bus', 'trolleybus')
AND (
UPPER(COALESCE(name, '')) LIKE '%FLIXBUS%'
OR UPPER(COALESCE(network, '')) LIKE '%FLIXBUS%'
OR UPPER(COALESCE(name, '')) LIKE '%EUROLINES%'
OR UPPER(COALESCE(network, '')) LIKE '%EUROLINES%'
OR UPPER(COALESCE(name, '')) LIKE '%INTERCITYBUS%'
OR UPPER(COALESCE(name, '')) LIKE '%IC BUS%'
OR UPPER(COALESCE(name, '')) LIKE '%FERNBUS%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"service":"long_distance"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"bus":"long_distance"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"bus":"intercity"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"network:type":"long_distance"%'
)
)"""
long_distance = f"(mode = 'coach' OR {train_long_distance} OR {bus_long_distance})"
bus_regional = f"""(
mode IN ('bus', 'trolleybus')
AND NOT {bus_long_distance}
AND (
UPPER(COALESCE(name, '')) LIKE '%REGIONALBUS%'
OR UPPER(COALESCE(name, '')) LIKE '%REGIOBUS%'
OR UPPER(COALESCE(name, '')) LIKE '%REGIONAL BUS%'
OR UPPER(COALESCE(name, '')) LIKE '%REGIONALVERKEHR%'
OR UPPER(COALESCE(network, '')) LIKE '%REGIONALBUS%'
OR UPPER(COALESCE(network, '')) LIKE '%REGIOBUS%'
OR UPPER(COALESCE(network, '')) LIKE '%REGIONALVERKEHR%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"service":"regional"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"bus":"regional"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"bus":"interurban"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"network:type":"regional"%'
)
)"""
train_regional = f"""(
mode = 'train'
AND NOT {train_long_distance}
AND (
UPPER(COALESCE(ref, '')) LIKE 'IRE%'
OR UPPER(COALESCE(ref, '')) LIKE 'RE%'
OR UPPER(COALESCE(ref, '')) LIKE 'RB%'
OR UPPER(COALESCE(ref, '')) LIKE 'RER%'
OR UPPER(COALESCE(ref, '')) LIKE 'TER%'
OR UPPER(COALESCE(ref, '')) LIKE 'REX%'
OR UPPER(COALESCE(ref, '')) LIKE 'MEX%'
OR UPPER(COALESCE(ref, '')) LIKE 'ALX%'
OR UPPER(COALESCE(ref, '')) LIKE 'WFB%'
OR UPPER(COALESCE(ref, '')) LIKE 'R%'
OR UPPER(COALESCE(name, '')) LIKE '%REGIONAL%'
OR UPPER(COALESCE(name, '')) LIKE '%REGIO%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"service":"regional"%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"train":"regional"%'
)
)"""
regional = f"({train_regional} OR {bus_regional})"
local = f"""(
mode IN ('tram', 'light_rail', 'subway', 'ferry', 'funicular', 'aerialway', 'monorail')
OR (mode IN ('bus', 'trolleybus') AND NOT ({bus_long_distance} OR {bus_regional}))
OR (
mode = 'train'
AND (
UPPER(COALESCE(ref, '')) LIKE 'S%'
OR UPPER(COALESCE(name, '')) LIKE '%S-BAHN%'
OR UPPER(COALESCE(network, '')) LIKE '%S-BAHN%'
OR LOWER(COALESCE(tags_json, '')) LIKE '%"train":"commuter"%'
)
)
)"""
parts = []
if "long_distance" in route_scopes:
parts.append(long_distance)
if "regional" in route_scopes:
parts.append(regional)
if "local" in route_scopes:
parts.append(local)
if "unknown" in route_scopes:
parts.append(f"(mode = 'train' AND NOT ({long_distance} OR {regional} OR {local}))")
return "(" + " OR ".join(parts or ["0"]) + ")", []
def _feature_from_row(row: sqlite3.Row, materialized_ids: dict[tuple[int, str, str], int]) -> OsmFeature:
    """Build an ``OsmFeature`` object from a sidecar row.

    The feature's ``id`` is the main-database id from ``materialized_ids`` when
    the row's identity is listed there, otherwise the sidecar row id. The
    object is tagged with ``_osm_sidecar_source = True`` and the sidecar row
    id in ``_osm_sidecar_row_id``.
    """
    dataset_id = int(row["dataset_id"])
    osm_type = str(row["osm_type"])
    osm_id = str(row["osm_id"])
    sidecar_row_id = int(row["id"])

    # Columns copied from the row unchanged.
    passthrough = (
        "mode",
        "route_scope",
        "name",
        "ref",
        "operator",
        "network",
        "geometry_geojson",
        "min_lon",
        "min_lat",
        "max_lon",
        "max_lat",
        "tags_json",
        "route_key",
        "operator_key",
    )
    feature = OsmFeature(
        id=materialized_ids.get((dataset_id, osm_type, osm_id), sidecar_row_id),
        dataset_id=dataset_id,
        osm_type=osm_type,
        osm_id=osm_id,
        kind=str(row["kind"]),
        **{column: row[column] for column in passthrough},
    )
    feature._osm_sidecar_source = True
    feature._osm_sidecar_row_id = sidecar_row_id
    return feature
def _materialized_ids_by_identity(session: Session, dataset_ids: Sequence[int]) -> dict[tuple[int, str, str], int]:
if not dataset_ids:
return {}
rows = session.execute(
select(OsmFeature.dataset_id, OsmFeature.osm_type, OsmFeature.osm_id, OsmFeature.id).where(OsmFeature.dataset_id.in_(dataset_ids))
).all()
return {(int(dataset_id), str(osm_type), str(osm_id)): int(feature_id) for dataset_id, osm_type, osm_id, feature_id in rows}
def _as_list(value: str | Sequence[str] | None) -> list[str]:
if value is None:
return []
if isinstance(value, str):
return [value]
return [str(item) for item in value]
def _safe_int(value: object) -> int | None:
try:
return int(value) # type: ignore[arg-type]
except (TypeError, ValueError):
return None