Files
meubility-workbench/app/jobs.py
2026-07-01 23:29:51 +02:00

1933 lines
71 KiB
Python

from __future__ import annotations
import json
import os
import threading
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from typing import Any, Iterator
from uuid import uuid4
from sqlalchemy import func, select, text
from sqlalchemy.orm import Session
from app.config import settings
from app.data_management import (
delete_dataset,
delete_source,
prune_inactive_datasets,
prune_unreferenced_cache_files,
unreferenced_cache_file_summary,
)
from app.db import SessionLocal, engine, init_db
from app.db_lock import DatabaseWriteBusy, database_write_lock
from app.gtfs_storage import missing_sidecar_paths as gtfs_missing_sidecar_paths
from app.models import Dataset, Job, JobEvent, Source, SourceUpdateCheck
from app.osm_storage import missing_sidecar_paths as osm_missing_sidecar_paths
from app.pipeline.gtfs import backfill_gtfs_shapes
from app.pipeline.matcher import run_route_matching
from app.pipeline.osm_addresses import rebuild_address_index
from app.pipeline.osm_labeling import relabel_osm_features
from app.pipeline.osm_pbf import run_osm_pbf_source_staged
from app.pipeline.route_layer import rebuild_route_layer
from app.pipeline.run import run_source
from app.pipeline.sample_data import clear_project_data, load_sample_project
from app.source_catalog import import_ingestable_sources, import_source_catalog, source_catalog_summary
# Job kind identifiers stored in ``Job.kind``; ``run_claimed_job`` dispatches on them.
ROUTE_MATCHING_JOB_KIND = "route_matching"
ROUTE_LAYER_JOB_KIND = "route_layer_rebuild"
ADDRESS_INDEX_JOB_KIND = "address_index_rebuild"
OSM_RELABEL_JOB_KIND = "osm_relabel"
SOURCE_IMPORT_JOB_KIND = "source_import"
SOURCE_DELETE_JOB_KIND = "source_delete"
DATASET_DELETE_JOB_KIND = "dataset_delete"
MAINTENANCE_JOB_KIND = "maintenance"
# Statuses of finished jobs; only these may be retried or dismissed.
TERMINAL_JOB_STATUSES = {"completed", "failed", "cancelled"}
# Statuses of jobs still in flight; used to find an existing job instead of queueing a duplicate.
ACTIVE_JOB_STATUSES = {"queued", "running", "paused"}
# Lease length applied on claim and renewal; the configured value is floored at 300.
LEASE_SECONDS = max(300, int(settings.queue_job_lease_seconds))
# Upper bound for the heartbeat period; ``_job_heartbeat_context`` clamps the effective interval.
HEARTBEAT_INTERVAL_SECONDS = 60
class JobPaused(Exception):
    """Signal that a running job stopped because it was paused.

    ``run_claimed_job`` catches this and returns without recording a failure
    or a cancellation.
    """
class JobCancelled(Exception):
    """Signal that a running job stopped because it was cancelled.

    ``run_claimed_job`` catches this and marks the job as cancelled.
    """
def create_route_layer_rebuild_job(session: Session, *, priority: int = 0) -> Job:
    """Queue a job that rebuilds the visual route layer.

    The new row is added and flushed before its initial ``queued`` event is
    created, because the event stores ``job.id``. Nothing is committed here.

    Args:
        session: Session the job and its event are added to.
        priority: Queue priority; coerced with ``int()``.

    Returns:
        The newly queued job.
    """
    total_steps = 4
    queued_job = Job(
        kind=ROUTE_LAYER_JOB_KIND,
        status="queued",
        description="Rebuild visual route layer from active GTFS and OSM datasets",
        progress_current=0,
        progress_total=total_steps,
        priority=int(priority),
    )
    session.add(queued_job)
    session.flush()
    add_job_event(
        session,
        queued_job,
        event_type="queued",
        message="Route-layer rebuild queued.",
        progress_current=0,
        progress_total=total_steps,
    )
    return queued_job
def create_address_index_rebuild_job(session: Session, *, priority: int = 0) -> Job:
    """Queue a job that rebuilds the OSM address index.

    The new row is added and flushed before its initial ``queued`` event is
    created, because the event stores ``job.id``. Nothing is committed here.

    Args:
        session: Session the job and its event are added to.
        priority: Queue priority; coerced with ``int()``.

    Returns:
        The newly queued job.
    """
    total_steps = 4
    queued_job = Job(
        kind=ADDRESS_INDEX_JOB_KIND,
        status="queued",
        description="Rebuild OSM address index from the active OSM PBF dataset",
        progress_current=0,
        progress_total=total_steps,
        priority=int(priority),
    )
    session.add(queued_job)
    session.flush()
    add_job_event(
        session,
        queued_job,
        event_type="queued",
        message="OSM address index rebuild queued.",
        progress_current=0,
        progress_total=total_steps,
    )
    return queued_job
def create_route_matching_job(session: Session, *, priority: int = 0) -> Job:
    """Queue a job that matches active GTFS routes against OSM route features.

    The progress total starts at 0 for this job kind. The row is added and
    flushed before its initial ``queued`` event is created, because the event
    stores ``job.id``. Nothing is committed here.

    Args:
        session: Session the job and its event are added to.
        priority: Queue priority; coerced with ``int()``.

    Returns:
        The newly queued job.
    """
    total_steps = 0
    queued_job = Job(
        kind=ROUTE_MATCHING_JOB_KIND,
        status="queued",
        description="Match active GTFS routes against active OSM route features",
        progress_current=0,
        progress_total=total_steps,
        priority=int(priority),
    )
    session.add(queued_job)
    session.flush()
    add_job_event(
        session,
        queued_job,
        event_type="queued",
        message="Route matching queued.",
        progress_current=0,
        progress_total=total_steps,
    )
    return queued_job
def create_osm_relabel_job(
    session: Session,
    *,
    dataset_id: int | None = None,
    build_route_layer: bool = True,
    force: bool = False,
    priority: int = 0,
) -> Job:
    """Queue a job that relabels OSM features.

    The options are stored as compact JSON in ``result_json``. The progress
    total is 2 when the route layer is rebuilt afterwards, otherwise 1.

    Args:
        session: Session the job and its event are added to.
        dataset_id: Dataset to relabel; ``None`` selects the generic
            "active OSM features" description.
        build_route_layer: Stored in the options and used for the step count.
        force: Stored in the options.
        priority: Queue priority; coerced with ``int()``.

    Returns:
        The newly queued job.
    """
    if dataset_id is None:
        description = "Relabel active OSM features"
    else:
        description = f"Relabel OSM features for dataset {dataset_id}"
    options = {
        "dataset_id": dataset_id,
        "build_route_layer": build_route_layer,
        "force": force,
    }
    queued_job = Job(
        kind=OSM_RELABEL_JOB_KIND,
        status="queued",
        description=description,
        progress_current=0,
        progress_total=2 if build_route_layer else 1,
        priority=int(priority),
        result_json=json.dumps(options, separators=(",", ":")),
    )
    session.add(queued_job)
    session.flush()
    add_job_event(
        session,
        queued_job,
        event_type="queued",
        message="OSM relabeling queued.",
        progress_current=0,
        progress_total=queued_job.progress_total,
    )
    return queued_job
def start_route_layer_rebuild_worker(job_id: int) -> None:
    """Do nothing; the argument is accepted and discarded.

    NOTE(review): presumably kept so existing callers keep working now that
    queued jobs are picked up by ``run_worker_loop`` - confirm.
    """
    del job_id
def create_source_import_job(
    session: Session,
    source: Source,
    *,
    run_match: bool = True,
    build_route_layer: bool = True,
    priority: int = 0,
    recovery_reason: str | None = None,
) -> Job:
    """Queue an import for ``source``, reusing an already active import job.

    If an active import job exists for the source it is returned unchanged.
    Otherwise a job is created, the source is marked ``queued`` with its last
    error cleared, and a ``queued`` event is recorded. Nothing is committed.

    Args:
        session: Session used for the lookup and the new rows.
        source: Source to import.
        run_match: Stored in the job options.
        build_route_layer: Stored in the job options; also selects 4 versus 3
            progress steps.
        priority: Queue priority; coerced with ``int()``.
        recovery_reason: When truthy, the job is described as a recovery and
            the reason is stored in the options and the event message.

    Returns:
        The existing active job or the newly queued one.
    """
    existing = active_source_import_job(session, source.id)
    if existing is not None:
        return existing

    is_recovery = bool(recovery_reason)
    if is_recovery:
        description = f"Recover source {source.id}: {source.name}"
    else:
        description = f"Import source {source.id}: {source.name}"

    options = {
        "source_id": source.id,
        "source_name": source.name,
        "run_match": run_match,
        "build_route_layer": build_route_layer,
        "queued_pid": os.getpid(),
    }
    if is_recovery:
        options["recovery_reason"] = recovery_reason

    queued_job = Job(
        kind=SOURCE_IMPORT_JOB_KIND,
        status="queued",
        description=description,
        progress_current=0,
        progress_total=4 if build_route_layer else 3,
        priority=int(priority),
        result_json=json.dumps(options, separators=(",", ":")),
    )
    session.add(queued_job)
    source.status = "queued"
    source.last_error = None
    session.flush()

    if is_recovery:
        message = f"Recovery import queued for {source.name}: {recovery_reason}"
    else:
        message = f"Source import queued for {source.name}."
    add_job_event(
        session,
        queued_job,
        event_type="queued",
        message=message,
        progress_current=0,
        progress_total=queued_job.progress_total,
    )
    return queued_job
def queue_missing_gtfs_sidecar_recovery_jobs(session: Session, *, priority: int = 20) -> int:
    """Queue recovery imports for active datasets whose sidecar files are missing.

    Despite the name, both active ``gtfs`` and ``osm_geojson`` datasets are
    checked. Every dataset with missing paths is marked ``missing_files``. A
    recovery import is queued at most once per source, and only when the
    source exists, is enabled, and has no active import or delete job (nor an
    active delete job for the dataset).

    Args:
        session: Session used for lookups and new rows.
        priority: Priority given to the recovery import jobs.

    Returns:
        Number of recovery jobs queued.
    """
    # Dataset kind -> (label used in the recovery reason, missing-path finder).
    finders = {
        "gtfs": ("GTFS", gtfs_missing_sidecar_paths),
        "osm_geojson": ("OSM", osm_missing_sidecar_paths),
    }
    handled_sources: set[int] = set()
    queued = 0
    active_datasets = session.scalars(
        select(Dataset).where(Dataset.kind.in_(["gtfs", "osm_geojson"]), Dataset.is_active.is_(True))
    ).all()
    for dataset in active_datasets:
        finder_entry = finders.get(dataset.kind)
        if finder_entry is None:
            continue
        storage_kind, find_missing = finder_entry
        missing_paths = find_missing(dataset)
        if not missing_paths:
            continue
        dataset.status = "missing_files"
        source = session.get(Source, dataset.source_id)
        if source is None or not source.enabled or source.id in handled_sources:
            continue
        handled_sources.add(source.id)
        # Checked one at a time so later lookups are skipped once a blocker is found.
        if active_source_import_job(session, source.id) is not None:
            continue
        if active_source_delete_job(session, source.id) is not None:
            continue
        if active_dataset_delete_job(session, dataset.id) is not None:
            continue
        reason = f"{storage_kind} sidecar missing for dataset #{dataset.id}: {', '.join(missing_paths)}"
        create_source_import_job(
            session,
            source,
            run_match=True,
            build_route_layer=True,
            priority=priority,
            recovery_reason=reason,
        )
        queued += 1
    if queued:
        session.flush()
    return queued
def create_source_delete_job(session: Session, source: Source, *, priority: int = 50) -> Job:
    """Queue deletion of ``source``, reusing an already active delete job.

    If an active delete job exists for the source it is returned unchanged.
    Otherwise a job is created, the source is marked ``queued`` with its last
    error cleared, and a ``queued`` event is recorded. Nothing is committed.

    Args:
        session: Session used for the lookup and the new rows.
        source: Source to delete.
        priority: Queue priority; coerced with ``int()``.

    Returns:
        The existing active job or the newly queued one.
    """
    existing = active_source_delete_job(session, source.id)
    if existing is not None:
        return existing

    options = {
        "source_id": source.id,
        "source_name": source.name,
        "queued_pid": os.getpid(),
    }
    queued_job = Job(
        kind=SOURCE_DELETE_JOB_KIND,
        status="queued",
        description=f"Delete source {source.id}: {source.name}",
        progress_current=0,
        progress_total=3,
        priority=int(priority),
        result_json=json.dumps(options, separators=(",", ":")),
    )
    session.add(queued_job)
    source.status = "queued"
    source.last_error = None
    session.flush()
    add_job_event(
        session,
        queued_job,
        event_type="queued",
        message=f"Source deletion queued for {source.name}.",
        progress_current=0,
        progress_total=queued_job.progress_total,
    )
    return queued_job
def create_dataset_delete_job(session: Session, dataset: Dataset, *, priority: int = 50) -> Job:
    """Queue deletion of ``dataset``, reusing an already active delete job.

    The dataset's current status is stored in the job options as
    ``dataset_status`` before the dataset is marked ``queued``. Nothing is
    committed here.

    Args:
        session: Session used for the lookup and the new rows.
        dataset: Dataset to delete.
        priority: Queue priority; coerced with ``int()``.

    Returns:
        The existing active job or the newly queued one.
    """
    existing = active_dataset_delete_job(session, dataset.id)
    if existing is not None:
        return existing

    # Built before the status is overwritten so the previous status is captured.
    options = {
        "dataset_id": dataset.id,
        "dataset_kind": dataset.kind,
        "dataset_status": dataset.status,
        "source_id": dataset.source_id,
        "queued_pid": os.getpid(),
    }
    queued_job = Job(
        kind=DATASET_DELETE_JOB_KIND,
        status="queued",
        description=f"Delete dataset {dataset.id}: {dataset.kind}",
        progress_current=0,
        progress_total=3,
        priority=int(priority),
        result_json=json.dumps(options, separators=(",", ":")),
    )
    session.add(queued_job)
    dataset.status = "queued"
    session.flush()
    add_job_event(
        session,
        queued_job,
        event_type="queued",
        message=f"Dataset deletion queued for dataset #{dataset.id}.",
        progress_current=0,
        progress_total=queued_job.progress_total,
    )
    return queued_job
def create_maintenance_job(
    session: Session,
    action: str,
    payload: dict[str, Any] | None = None,
    *,
    priority: int = 0,
) -> Job:
    """Queue a maintenance job, reusing an active one with the same action and payload.

    The payload is normalized first; the normalized form is used for the
    duplicate lookup and is what gets stored in the job options. Nothing is
    committed here.

    Args:
        session: Session used for the lookup and the new rows.
        action: Maintenance action name stored in the options.
        payload: Optional action arguments.
        priority: Queue priority; coerced with ``int()``.

    Returns:
        The existing active job or the newly queued one.
    """
    normalized_payload = _normalize_job_payload(payload)
    existing = active_maintenance_job(session, action, normalized_payload)
    if existing is not None:
        return existing

    options = {
        "action": action,
        "payload": normalized_payload,
        "queued_pid": os.getpid(),
    }
    queued_job = Job(
        kind=MAINTENANCE_JOB_KIND,
        status="queued",
        description=_maintenance_description(action, normalized_payload),
        progress_current=0,
        progress_total=_maintenance_progress_total(action),
        priority=int(priority),
        result_json=json.dumps(options, separators=(",", ":")),
    )
    session.add(queued_job)
    session.flush()
    queued_message = f"{_maintenance_description(action, normalized_payload)} queued."
    add_job_event(
        session,
        queued_job,
        event_type="queued",
        message=queued_message,
        progress_current=0,
        progress_total=queued_job.progress_total,
    )
    return queued_job
def active_source_import_job(session: Session, source_id: int) -> Job | None:
    """Return the newest active import job for ``source_id``, or ``None``.

    Active import jobs are loaded newest first; the source id is read from
    each job's stored options.
    """
    stmt = (
        select(Job)
        .where(Job.kind == SOURCE_IMPORT_JOB_KIND, Job.status.in_(ACTIVE_JOB_STATUSES))
        .order_by(Job.created_at.desc(), Job.id.desc())
    )
    candidates = session.scalars(stmt).all()
    return next((job for job in candidates if source_id_from_job(job) == source_id), None)
def active_source_delete_job(session: Session, source_id: int) -> Job | None:
    """Return the newest active delete job for ``source_id``, or ``None``.

    Active source-delete jobs are loaded newest first; the source id is read
    from each job's stored options.
    """
    stmt = (
        select(Job)
        .where(Job.kind == SOURCE_DELETE_JOB_KIND, Job.status.in_(ACTIVE_JOB_STATUSES))
        .order_by(Job.created_at.desc(), Job.id.desc())
    )
    candidates = session.scalars(stmt).all()
    return next((job for job in candidates if source_id_from_job(job) == source_id), None)
def active_dataset_delete_job(session: Session, dataset_id: int) -> Job | None:
    """Return the newest active delete job for ``dataset_id``, or ``None``."""
    jobs_by_dataset = active_dataset_delete_jobs(session)
    return jobs_by_dataset.get(dataset_id)
def active_dataset_delete_jobs(session: Session) -> dict[int, Job]:
    """Map dataset id to its newest active dataset-delete job.

    Jobs whose options carry no usable dataset id are left out.
    """
    stmt = (
        select(Job)
        .where(Job.kind == DATASET_DELETE_JOB_KIND, Job.status.in_(ACTIVE_JOB_STATUSES))
        .order_by(Job.created_at.desc(), Job.id.desc())
    )
    newest_by_dataset: dict[int, Job] = {}
    for job in session.scalars(stmt).all():
        dataset_id = dataset_id_from_job(job)
        if dataset_id is None:
            continue
        # Rows arrive newest first, so the first job seen per dataset wins.
        newest_by_dataset.setdefault(dataset_id, job)
    return newest_by_dataset
def active_maintenance_job(session: Session, action: str, payload: dict[str, Any] | None = None) -> Job | None:
    """Return the newest active maintenance job with the same action and payload.

    Both the requested payload and each job's stored payload are normalized
    before they are compared.
    """
    wanted_payload = _normalize_job_payload(payload)
    stmt = (
        select(Job)
        .where(Job.kind == MAINTENANCE_JOB_KIND, Job.status.in_(ACTIVE_JOB_STATUSES))
        .order_by(Job.created_at.desc(), Job.id.desc())
    )
    for candidate in session.scalars(stmt).all():
        stored = _json_object(candidate.result_json)
        if stored.get("action") != action:
            continue
        if _normalize_job_payload(stored.get("payload")) == wanted_payload:
            return candidate
    return None
def active_source_import_jobs(session: Session) -> dict[int, Job]:
    """Map source id to its newest active import job.

    Jobs whose options carry no usable source id are left out.
    """
    stmt = (
        select(Job)
        .where(Job.kind == SOURCE_IMPORT_JOB_KIND, Job.status.in_(ACTIVE_JOB_STATUSES))
        .order_by(Job.created_at.desc(), Job.id.desc())
    )
    newest_by_source: dict[int, Job] = {}
    for job in session.scalars(stmt).all():
        source_id = source_id_from_job(job)
        if source_id is None:
            continue
        # Rows arrive newest first, so the first job seen per source wins.
        newest_by_source.setdefault(source_id, job)
    return newest_by_source
def active_source_workflow_jobs(session: Session) -> dict[int, Job]:
    """Map source id to its newest active import, source-delete or dataset-delete job.

    Jobs whose options carry no usable source id are left out.
    """
    workflow_kinds = [SOURCE_IMPORT_JOB_KIND, SOURCE_DELETE_JOB_KIND, DATASET_DELETE_JOB_KIND]
    stmt = (
        select(Job)
        .where(Job.kind.in_(workflow_kinds), Job.status.in_(ACTIVE_JOB_STATUSES))
        .order_by(Job.created_at.desc(), Job.id.desc())
    )
    newest_by_source: dict[int, Job] = {}
    for job in session.scalars(stmt).all():
        source_id = source_id_from_job(job)
        if source_id is None:
            continue
        # Rows arrive newest first, so the first job seen per source wins.
        newest_by_source.setdefault(source_id, job)
    return newest_by_source
def active_address_index_rebuild_job(session: Session) -> Job | None:
    """Return the newest active address-index rebuild job, or ``None``."""
    newest_active = (
        select(Job)
        .where(Job.kind == ADDRESS_INDEX_JOB_KIND, Job.status.in_(ACTIVE_JOB_STATUSES))
        .order_by(Job.created_at.desc(), Job.id.desc())
        .limit(1)
    )
    return session.scalar(newest_active)
def reconcile_interrupted_jobs(session: Session) -> int:
    """Requeue interrupted jobs and repair stale active rows with terminal markers.

    For every ``queued`` or ``running`` job:

    * if it already carries a terminal marker, it is moved to that terminal
      status and counted as recovered;
    * a ``running`` job whose recorded worker pid is alive keeps running, and
      an expired lease is renewed;
    * a ``running`` job with no recorded pid and an unexpired lease is left
      alone;
    * any other ``running`` job goes back to ``queued``, its worker markers
      are cleared and a ``lease_expired`` event is recorded.

    Source workflow state is reconciled afterwards. Nothing is committed here.

    Args:
        session: Session used for all reads and writes.

    Returns:
        Number of jobs moved to a terminal status or returned to the queue.
    """
    recovered = 0
    now = datetime.now(timezone.utc)
    jobs = session.scalars(
        select(Job).where(Job.status.in_(["queued", "running"]))
    ).all()
    for job in jobs:
        if _reconcile_terminal_marker(session, job, now):
            recovered += 1
            continue
        if job.status != "running":
            continue
        worker_pid = _worker_pid_from_job(job)
        worker_alive = worker_pid is not None and _pid_running(worker_pid)
        lease_expires_at = _as_utc(job.lease_expires_at)
        lease_expired = lease_expires_at is None or lease_expires_at < now
        if worker_alive:
            if lease_expired:
                _renew_expired_live_worker_lease(session, job, worker_pid, now)
            continue
        if worker_pid is None and not lease_expired:
            # No pid recorded yet; trust the lease until it runs out.
            continue
        reason = "worker_process_exited" if worker_pid is not None else "worker_lease_expired"
        job.status = "queued"
        job.requested_action = None
        job.lease_owner = None
        job.lease_expires_at = None
        job.paused_at = None
        job.updated_at = now
        job.error = None
        options = _json_object(job.result_json)
        options.pop("worker_pid", None)
        options.pop("worker_id", None)
        # Always write the stripped options back, as retry_job does. Skipping
        # the write when nothing else is left would keep the dead worker's pid
        # on the row, and the next claim could then be mistaken for a job
        # whose worker has exited.
        job.result_json = json.dumps(options, separators=(",", ":")) if options else None
        add_job_event(
            session,
            job,
            event_type="lease_expired",
            message="Worker is no longer active; job returned to the queue.",
            progress_current=job.progress_current,
            progress_total=job.progress_total,
            metadata={"reason": reason, "worker_pid": worker_pid},
        )
        source = _job_source(session, job)
        if source is not None:
            source.status = "queued"
            source.last_error = None
        recovered += 1
    if recovered:
        session.flush()
    reconcile_source_workflow_state(session)
    return recovered
def _reconcile_terminal_marker(session: Session, job: Job, now: datetime) -> bool:
    """Move a job that already reached a terminal state out of the active set.

    When a terminal marker is found, the job takes that status, its lease and
    control fields are cleared, any pending control request is removed and a
    ``terminal_reconciled`` event is recorded. The related source and dataset
    rows, if any, are updated to match.

    Args:
        session: Session used for lookups and the new event.
        job: Job currently stored as queued or running.
        now: Timestamp applied to ``updated_at`` and a missing ``finished_at``.

    Returns:
        ``True`` when the job was moved to a terminal status, else ``False``.
    """
    terminal_status = _terminal_status_from_marker(session, job)
    if terminal_status is None:
        return False

    stale_status = job.status
    finished_ok = terminal_status == "completed"

    job.status = terminal_status
    job.requested_action = None
    job.lease_owner = None
    job.lease_expires_at = None
    job.paused_at = None
    job.updated_at = now
    if job.finished_at is None:
        job.finished_at = now
    if finished_ok:
        job.error = None
        if job.progress_total > 0:
            job.progress_current = job.progress_total
    _clear_job_control_request(job.id)
    add_job_event(
        session,
        job,
        event_type="terminal_reconciled",
        message=f"Stale {stale_status} job had already reached {terminal_status}; kept it out of the queue.",
        progress_current=job.progress_current,
        progress_total=job.progress_total,
        metadata={"previous_status": stale_status, "terminal_status": terminal_status},
    )

    source = _job_source(session, job)
    if source is not None:
        if terminal_status == "failed":
            source.status = "error"
            source.last_error = job.error
        elif terminal_status in ("completed", "cancelled"):
            # The status is derived before last_error is cleared, because the
            # derivation reads last_error.
            source.status = _source_status_without_active_job(session, source)
            if finished_ok:
                source.last_error = None
                source.last_run_at = job.finished_at

    dataset = _job_dataset(session, job)
    if dataset is not None and terminal_status in {"completed", "cancelled"}:
        # Restore the status recorded when the job was queued, else "imported".
        stored_options = _json_object(job.result_json)
        dataset.status = str(stored_options.get("dataset_status") or "imported")
    return True
def _terminal_status_from_marker(session: Session, job: Job) -> str | None:
    """Return the terminal status a job has already reached, if any.

    Markers are checked in this order:

    1. the newest ``completed``/``failed``/``cancelled`` event of the job;
    2. a set ``finished_at`` (``failed`` when ``error`` is truthy, otherwise
       ``completed``);
    3. a stored result that already holds every expected output.

    Args:
        session: Session used for the event lookup.
        job: Job to inspect.

    Returns:
        ``"completed"``, ``"failed"``, ``"cancelled"`` or ``None``.
    """
    latest_terminal_status = _latest_terminal_event_status(session, job.id)
    if latest_terminal_status is not None:
        return latest_terminal_status
    if job.finished_at is not None:
        return "failed" if job.error else "completed"
    if _job_has_completed_result_marker(job):
        return "completed"
    # Step 1 found no terminal event at all, so the newest event of any type
    # cannot be terminal either; looking it up again would always give None.
    return None
def _latest_terminal_event_status(session: Session, job_id: int) -> str | None:
    """Return the status named by the job's newest terminal event, or ``None``.

    Only ``completed``, ``failed`` and ``cancelled`` events are considered.
    """
    newest_terminal = (
        select(JobEvent)
        .where(JobEvent.job_id == job_id, JobEvent.event_type.in_(["completed", "failed", "cancelled"]))
        .order_by(JobEvent.created_at.desc(), JobEvent.id.desc())
        .limit(1)
    )
    event = session.scalar(newest_terminal)
    return None if event is None else _status_from_terminal_event(event.event_type)
def _status_from_terminal_event(event_type: str) -> str | None:
    """Translate a terminal event type into the matching job status.

    Returns ``None`` for every event type that is not ``completed``,
    ``failed`` or ``cancelled``.
    """
    for status in ("completed", "failed", "cancelled"):
        if event_type == status:
            return status
    return None
def _job_has_completed_result_marker(job: Job) -> bool:
    """Tell whether the stored result already holds every expected output.

    Only source-import and OSM-relabel jobs can qualify:

    * source import needs ``dataset_id``, plus ``match_result`` when
      ``run_match`` is truthy and ``route_layer_result`` when
      ``build_route_layer`` is truthy;
    * OSM relabel needs ``relabel_result``, plus ``route_layer_result`` when
      ``build_route_layer`` is truthy.

    Every other job kind returns ``False``.
    """
    options = _json_object(job.result_json)
    if job.kind == SOURCE_IMPORT_JOB_KIND:
        required = ["dataset_id"]
        if options.get("run_match"):
            required.append("match_result")
    elif job.kind == OSM_RELABEL_JOB_KIND:
        required = ["relabel_result"]
    else:
        return False
    if options.get("build_route_layer"):
        required.append("route_layer_result")
    return all(key in options for key in required)
def reconcile_source_workflow_state(session: Session) -> int:
    """Align workflow statuses of sources with their active jobs.

    Only sources currently marked ``queued``, ``running`` or ``paused`` are
    examined. A source with an active workflow job takes that job's status.
    A source without one gets the status derived from its datasets and update
    checks, and the generic "interrupted" error text is cleared unless the
    derived status is ``error``.

    Args:
        session: Session used for all reads and writes.

    Returns:
        Number of field changes applied.
    """
    interrupted_message = "Job was interrupted before completion."
    jobs_by_source = active_source_workflow_jobs(session)
    in_flight_sources = session.scalars(
        select(Source).where(Source.status.in_(["queued", "running", "paused"]))
    ).all()
    changed = 0
    for source in in_flight_sources:
        active_job = jobs_by_source.get(source.id)
        if active_job is not None:
            expected = active_job.status
            if source.status != expected:
                source.status = expected
                changed += 1
            continue
        replacement = _source_status_without_active_job(session, source)
        if source.status != replacement:
            source.status = replacement
            changed += 1
        if replacement != "error" and source.last_error == interrupted_message:
            source.last_error = None
            changed += 1
    if changed:
        session.flush()
    return changed
def source_id_from_job(job: Job) -> int | None:
    """Return the ``source_id`` stored in the job options as an int.

    Returns ``None`` when the key is absent, null, or not convertible.
    """
    raw = _json_object(job.result_json).get("source_id")
    if raw is None:
        return None
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None
def dataset_id_from_job(job: Job) -> int | None:
    """Return the ``dataset_id`` stored in the job options as an int.

    Returns ``None`` when the key is absent, null, or not convertible.
    """
    raw = _json_object(job.result_json).get("dataset_id")
    if raw is None:
        return None
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None
def _worker_pid_from_job(job: Job) -> int | None:
    """Return the positive ``worker_pid`` stored in the job options.

    Returns ``None`` when the key is absent, not convertible to int, or not
    greater than zero.
    """
    raw = _json_object(job.result_json).get("worker_pid")
    try:
        pid = int(raw)
    except (TypeError, ValueError):
        return None
    if pid <= 0:
        return None
    return pid
def _pid_running(pid: int) -> bool:
    """Return whether a process with ``pid`` appears to exist on this host.

    The check sends signal 0 with ``os.kill``. On POSIX this only performs
    the existence and permission checks and delivers nothing.

    NOTE(review): on Windows, ``os.kill`` with signal 0 terminates the target
    process instead of probing it - confirm workers only run on POSIX.

    Args:
        pid: Process id to probe.

    Returns:
        ``True`` if the process exists (even when owned by another user),
        ``False`` if it does not exist or ``pid`` is not a valid single
        process id.
    """
    if pid <= 0:
        # Zero and negative values address process groups, not one process,
        # so the probe would succeed without proving a worker is alive.
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    except OverflowError:
        # The value does not fit the platform pid type; no such process.
        return False
    return True
def _renew_expired_live_worker_lease(session: Session, job: Job, worker_pid: int, now: datetime) -> None:
    """Extend the lease of a job whose worker process is still alive.

    The lease is moved ``LEASE_SECONDS`` past ``now``, the related source (if
    any) is marked ``running`` with its last error cleared, and a
    ``lease_renewed`` event is recorded.

    Args:
        session: Session used for lookups and the new event.
        job: Running job with an expired lease.
        worker_pid: Pid of the live worker, stored in the event metadata.
        now: Reference timestamp for the new lease.
    """
    renewed_until = now + timedelta(seconds=LEASE_SECONDS)
    job.lease_expires_at = renewed_until
    job.updated_at = now
    source = _job_source(session, job)
    if source is not None:
        source.status = "running"
        source.last_error = None
    add_job_event(
        session,
        job,
        event_type="lease_renewed",
        message="Worker process is still alive; renewed expired lease.",
        progress_current=job.progress_current,
        progress_total=job.progress_total,
        metadata={"worker_pid": worker_pid},
    )
def _source_status_without_active_job(session: Session, source: Source) -> str:
    """Derive the status a source should show when no job is working on it.

    Returns:
        * ``"error"`` or ``"new"`` when the source has no active dataset in
          status ``imported`` (``"error"`` if ``last_error`` is truthy);
        * ``"update_available"`` or ``"up_to_date"`` when the newest update
          check has status ``checked``;
        * ``"ok"`` otherwise.
    """
    dataset_stmt = (
        select(Dataset)
        .where(Dataset.source_id == source.id, Dataset.is_active.is_(True), Dataset.status == "imported")
        .order_by(Dataset.created_at.desc(), Dataset.id.desc())
        .limit(1)
    )
    if session.scalar(dataset_stmt) is None:
        return "error" if source.last_error else "new"

    check_stmt = (
        select(SourceUpdateCheck)
        .where(SourceUpdateCheck.source_id == source.id)
        .order_by(SourceUpdateCheck.checked_at.desc(), SourceUpdateCheck.id.desc())
        .limit(1)
    )
    latest_check = session.scalar(check_stmt)
    if latest_check is None or latest_check.status != "checked":
        return "ok"
    if latest_check.update_available:
        return "update_available"
    return "up_to_date"
def start_source_import_worker(job_id: int) -> None:
    """Do nothing; the argument is accepted and discarded.

    NOTE(review): presumably kept so existing callers keep working now that
    queued jobs are picked up by ``run_worker_loop`` - confirm.
    """
    del job_id
def job_payload(job: Job) -> dict[str, Any]:
    """Serialize a job row into a plain dict.

    Timestamps are passed through ``_iso``, the stored result is decoded into
    an object, and ``terminal`` tells whether the status is a terminal one.
    """
    return dict(
        id=job.id,
        kind=job.kind,
        status=job.status,
        description=job.description,
        progress_current=job.progress_current,
        progress_total=job.progress_total,
        priority=job.priority,
        requested_action=job.requested_action,
        lease_owner=job.lease_owner,
        lease_expires_at=_iso(job.lease_expires_at),
        paused_at=_iso(job.paused_at),
        result=_json_object(job.result_json),
        error=job.error,
        dismissed_at=_iso(job.dismissed_at),
        created_at=_iso(job.created_at),
        started_at=_iso(job.started_at),
        updated_at=_iso(job.updated_at),
        finished_at=_iso(job.finished_at),
        terminal=job.status in TERMINAL_JOB_STATUSES,
    )
def job_event_payload(event: JobEvent) -> dict[str, Any]:
    """Serialize a job event row into a plain dict.

    The stored metadata is decoded into an object and the creation time is
    passed through ``_iso``.
    """
    return dict(
        id=event.id,
        job_id=event.job_id,
        level=event.level,
        event_type=event.event_type,
        message=event.message,
        progress_current=event.progress_current,
        progress_total=event.progress_total,
        metadata=_json_object(event.metadata_json),
        created_at=_iso(event.created_at),
    )
def add_job_event(
    session: Session,
    job: Job,
    *,
    event_type: str,
    message: str,
    level: str = "info",
    progress_current: int | None = None,
    progress_total: int | None = None,
    metadata: dict[str, Any] | None = None,
) -> JobEvent:
    """Create an event for ``job`` and add it to the session.

    The event is neither flushed nor committed here.

    Args:
        session: Session the event is added to.
        job: Job the event belongs to; its ``id`` is stored on the event.
        event_type: Machine-readable event name.
        message: Human-readable text.
        level: Event level, ``"info"`` by default.
        progress_current: Optional progress snapshot.
        progress_total: Optional progress snapshot.
        metadata: Optional details, stored as compact JSON.

    Returns:
        The new event.
    """
    if metadata is None:
        encoded_metadata = None
    else:
        encoded_metadata = json.dumps(metadata, separators=(",", ":"))
    new_event = JobEvent(
        job_id=job.id,
        level=level,
        event_type=event_type,
        message=message,
        progress_current=progress_current,
        progress_total=progress_total,
        metadata_json=encoded_metadata,
    )
    session.add(new_event)
    return new_event
def latest_jobs(session: Session, *, limit: int = 20, kind: str | None = None, include_dismissed: bool = False) -> list[Job]:
    """Return the newest jobs, newest first.

    Args:
        session: Session used for the query.
        limit: Maximum number of rows; clamped to the range 1..100.
        kind: When truthy, only jobs of this kind are returned.
        include_dismissed: When false, dismissed jobs are left out.
    """
    conditions = []
    if kind:
        conditions.append(Job.kind == kind)
    if not include_dismissed:
        conditions.append(Job.dismissed_at.is_(None))
    row_cap = max(1, min(limit, 100))
    stmt = select(Job)
    if conditions:
        stmt = stmt.where(*conditions)
    stmt = stmt.order_by(Job.created_at.desc(), Job.id.desc()).limit(row_cap)
    return session.scalars(stmt).all()
def job_queue_revision(session: Session, *, include_dismissed: bool = False) -> dict[str, Any]:
    """Summarize the job queue as counters plus a revision string.

    The revision joins job/event counts, maximum ids and maximum timestamps
    with ``|``. Dismissed jobs and their events are left out of every
    aggregate unless ``include_dismissed`` is true.

    Args:
        session: Session used for the two aggregate queries.
        include_dismissed: Whether dismissed jobs and their events count.

    Returns:
        Dict with ``revision``, the counters, and ISO timestamps.
    """
    not_dismissed = Job.dismissed_at.is_(None)

    job_stmt = select(
        func.count(Job.id),
        func.coalesce(func.max(Job.id), 0),
        func.max(Job.created_at),
        func.max(Job.updated_at),
        func.max(Job.finished_at),
        func.max(Job.dismissed_at),
        func.count(Job.id).filter(Job.status.in_(ACTIVE_JOB_STATUSES)),
        func.count(Job.id).filter(Job.status == "queued"),
        func.count(Job.id).filter(Job.status == "running"),
        func.count(Job.id).filter(Job.status == "paused"),
    )
    if not include_dismissed:
        job_stmt = job_stmt.where(not_dismissed)
    (
        raw_job_count,
        raw_latest_job_id,
        job_created_max,
        job_updated_max,
        job_finished_max,
        job_dismissed_max,
        raw_active,
        raw_queued,
        raw_running,
        raw_paused,
    ) = session.execute(job_stmt).one()

    event_stmt = select(func.coalesce(func.max(JobEvent.id), 0), func.max(JobEvent.created_at)).select_from(JobEvent)
    if not include_dismissed:
        event_stmt = event_stmt.join(Job, Job.id == JobEvent.job_id).where(not_dismissed)
    raw_latest_event_id, event_created_max = session.execute(event_stmt).one()

    # Aggregates may come back as None; normalize them to ints once.
    job_count = int(raw_job_count or 0)
    latest_job_id = int(raw_latest_job_id or 0)
    latest_event_id = int(raw_latest_event_id or 0)
    active_count = int(raw_active or 0)
    queued_count = int(raw_queued or 0)
    running_count = int(raw_running or 0)
    paused_count = int(raw_paused or 0)

    revision_parts = [
        job_count,
        latest_job_id,
        _revision_datetime(job_created_max),
        _revision_datetime(job_updated_max),
        _revision_datetime(job_finished_max),
        _revision_datetime(job_dismissed_max),
        latest_event_id,
        _revision_datetime(event_created_max),
        active_count,
        queued_count,
        running_count,
        paused_count,
    ]
    return {
        "revision": "|".join(map(str, revision_parts)),
        "job_count": job_count,
        "latest_job_id": latest_job_id,
        "latest_event_id": latest_event_id,
        "active_count": active_count,
        "queued_count": queued_count,
        "running_count": running_count,
        "paused_count": paused_count,
        "latest_job_created_at": _iso(job_created_max),
        "latest_job_updated_at": _iso(job_updated_max),
        "latest_job_finished_at": _iso(job_finished_max),
        "latest_event_created_at": _iso(event_created_max),
    }
def job_events(session: Session, job_id: int, *, limit: int = 100) -> list[JobEvent]:
    """Return events of a job, oldest first.

    Args:
        session: Session used for the query.
        job_id: Job whose events are listed.
        limit: Maximum number of rows; clamped to the range 1..500.
    """
    row_cap = max(1, min(limit, 500))
    stmt = (
        select(JobEvent)
        .where(JobEvent.job_id == job_id)
        .order_by(JobEvent.created_at, JobEvent.id)
        .limit(row_cap)
    )
    return session.scalars(stmt).all()
def request_job_control(job_id: int, action: str) -> dict[str, Any]:
    """Record a pause or cancel request for a job without using a session.

    The request is handed to ``_write_job_control_request``. The returned
    dict always reports status ``running``; the job row is not read here.

    Args:
        job_id: Id of the job to control.
        action: ``"pause"`` or ``"cancel"``.

    Returns:
        Dict describing the queued control request.

    Raises:
        ValueError: If ``action`` is not ``"pause"`` or ``"cancel"``.
    """
    supported_actions = {"pause", "cancel"}
    if action not in supported_actions:
        raise ValueError(f"unsupported job control action: {action}")
    requested_at = datetime.now(timezone.utc)
    control_request = {
        "job_id": int(job_id),
        "requested_action": action,
        "requested_at": requested_at.isoformat(),
        "request_pid": os.getpid(),
    }
    _write_job_control_request(job_id, control_request)
    response: dict[str, Any] = {
        "id": int(job_id),
        "status": "running",
        "requested_action": action,
        "control_request_queued": True,
        "terminal": False,
        "updated_at": requested_at.isoformat(),
        "result": {},
    }
    return response
def run_worker_loop(
    *,
    worker_id: str | None = None,
    poll_interval: float = 2.0,
    max_jobs: int | None = None,
    once: bool = False,
) -> dict[str, int | str]:
    """Claim and run queued jobs until told to stop.

    Each iteration reconciles interrupted jobs, then tries to claim one job.
    When nothing could be claimed (empty queue or busy write lock) the loop
    returns if ``once`` is set, otherwise sleeps and tries again.

    Args:
        worker_id: Worker name; generated from the pid and a random suffix
            when falsy.
        poll_interval: Idle sleep; at least 0.2 is always used.
        max_jobs: Stop after this many processed jobs; ``None`` means no cap.
        once: Return instead of sleeping when nothing could be claimed.

    Returns:
        Dict with ``worker_id`` and the ``processed`` job count.
    """
    init_db()
    worker = worker_id or f"worker-{os.getpid()}-{uuid4().hex[:8]}"
    processed = 0
    while True:
        with SessionLocal() as session:
            reconcile_interrupted_jobs(session)
            session.commit()
        try:
            job_id = claim_next_job(worker)
        except DatabaseWriteBusy:
            # A busy write lock is treated like an empty queue.
            job_id = None
        if job_id is None:
            if once:
                break
            time.sleep(max(0.2, float(poll_interval)))
            continue
        run_claimed_job(job_id, worker)
        processed += 1
        if max_jobs is not None and processed >= max_jobs:
            break
    return {"worker_id": worker, "processed": processed}
def run_worker_once(*, worker_id: str | None = None) -> dict[str, int | str]:
    """Run the worker loop for at most one job, without waiting for work.

    Returns:
        The summary dict produced by ``run_worker_loop``.
    """
    summary = run_worker_loop(worker_id=worker_id, max_jobs=1, once=True)
    return summary
def claim_next_job(worker_id: str, *, lease_seconds: int = LEASE_SECONDS) -> int | None:
    """Claim the next queued job for ``worker_id`` and return its id.

    Runs under the ``job:claim`` database write lock. Interrupted jobs are
    reconciled first; then the queued job with the highest priority (oldest
    first within a priority) is switched to ``running`` and leased to the
    worker. The related source and dataset rows, if any, are marked
    ``running`` too. The session is committed in every case.

    NOTE(review): ``run_worker_loop`` catches ``DatabaseWriteBusy`` around
    this call; presumably ``database_write_lock`` raises it - confirm.

    Args:
        worker_id: Name stored as the lease owner.
        lease_seconds: Lease length counted from now.

    Returns:
        Id of the claimed job, or ``None`` when the queue is empty.
    """
    with database_write_lock("job:claim", timeout=30), SessionLocal() as session:
        reconcile_interrupted_jobs(session)
        next_in_queue = (
            select(Job)
            .where(Job.status == "queued")
            .order_by(Job.priority.desc(), Job.created_at, Job.id)
            .limit(1)
        )
        job = session.scalar(next_in_queue)
        if job is None:
            session.commit()
            return None

        claimed_at = datetime.now(timezone.utc)
        job.status = "running"
        job.requested_action = None
        job.lease_owner = worker_id
        job.lease_expires_at = claimed_at + timedelta(seconds=lease_seconds)
        job.paused_at = None
        job.error = None
        if job.started_at is None:
            job.started_at = claimed_at
        job.updated_at = claimed_at
        add_job_event(
            session,
            job,
            event_type="claimed",
            message=f"Job claimed by {worker_id}.",
            progress_current=job.progress_current,
            progress_total=job.progress_total,
            metadata={"worker_id": worker_id},
        )

        source = _job_source(session, job)
        if source is not None:
            source.status = "running"
            source.last_error = None
        dataset = _job_dataset(session, job)
        if dataset is not None:
            dataset.status = "running"

        session.commit()
        return int(job.id)
def run_claimed_job(job_id: int, worker_id: str) -> None:
    """Run a job that ``worker_id`` has already claimed.

    A heartbeat thread runs for the duration of the job. The job row is read
    in a short-lived session only to verify the lease and to learn its kind;
    that session is closed before the handler starts, so no session or open
    transaction is held while the job runs. Handlers receive only the job id
    and worker id.

    Outcomes:
        * job row missing: returns silently;
        * ``JobPaused``: returns without touching the job;
        * ``JobCancelled``: the job is marked cancelled;
        * any other exception (including a lease owned by someone else or an
          unknown kind): the job is marked failed.

    Args:
        job_id: Id of the claimed job.
        worker_id: Worker expected to own the lease.
    """
    init_db()
    try:
        with _job_heartbeat_context(job_id, worker_id):
            with SessionLocal() as session:
                job = session.get(Job, job_id)
                if job is None:
                    return
                if job.lease_owner != worker_id:
                    raise RuntimeError(f"job #{job_id} is not leased by this worker")
                # Copy the value out so dispatch does not need the session.
                job_kind = job.kind
            handlers = {
                ROUTE_MATCHING_JOB_KIND: _run_route_matching_job,
                ROUTE_LAYER_JOB_KIND: _run_route_layer_rebuild_job,
                ADDRESS_INDEX_JOB_KIND: _run_address_index_rebuild_job,
                OSM_RELABEL_JOB_KIND: _run_osm_relabel_job,
                SOURCE_IMPORT_JOB_KIND: _run_source_import_job,
                SOURCE_DELETE_JOB_KIND: _run_source_delete_job,
                DATASET_DELETE_JOB_KIND: _run_dataset_delete_job,
                MAINTENANCE_JOB_KIND: _run_maintenance_job,
            }
            handler = handlers.get(job_kind)
            if handler is None:
                raise ValueError(f"unsupported job kind: {job_kind}")
            handler(job_id, worker_id)
    except JobPaused:
        return
    except JobCancelled:
        _mark_job_cancelled(job_id)
    except Exception as exc:  # noqa: BLE001 - surfaced through job status UI
        _mark_job_failed(job_id, exc)
@contextmanager
def _job_heartbeat_context(job_id: int, worker_id: str) -> Iterator[None]:
    """Run a background heartbeat thread while the ``with`` body executes.

    The heartbeat period is the smaller of ``HEARTBEAT_INTERVAL_SECONDS`` and
    a third of ``LEASE_SECONDS``, but never below 10 seconds. On exit the
    thread is asked to stop and joined for at most 5 seconds; it is a daemon
    thread.
    """
    stop_event = threading.Event()
    capped = min(float(HEARTBEAT_INTERVAL_SECONDS), float(LEASE_SECONDS) / 3)
    interval = max(10.0, capped)
    heartbeat = threading.Thread(
        target=_job_heartbeat_loop,
        args=(job_id, worker_id, stop_event, interval),
        name=f"job-heartbeat-{job_id}",
        daemon=True,
    )
    heartbeat.start()
    try:
        yield
    finally:
        stop_event.set()
        heartbeat.join(timeout=5)
def _job_heartbeat_loop(job_id: int, worker_id: str, stop_event: threading.Event, interval: float) -> None:
    """Refresh the job's heartbeat every ``interval`` until ``stop_event`` is set.

    A refresh happens only while the job exists, is ``running`` and is leased
    to ``worker_id``. Errors are swallowed on purpose: this is a best-effort
    liveness signal.
    """
    while not stop_event.wait(interval):
        try:
            with SessionLocal() as session:
                job = session.get(Job, job_id)
                still_ours = (
                    job is not None
                    and job.status == "running"
                    and job.lease_owner == worker_id
                )
                if still_ours:
                    _heartbeat_job(job, worker_id)
                    session.commit()
        except Exception:
            # Best-effort liveness refresh. A normal progress callback or the
            # next loop will renew the lease if this short write collides.
            continue
def pause_job(session: Session, job_id: int) -> Job:
    """Pause a job or ask its worker to pause it.

    A queued job is paused immediately. For a running job a pause request is
    recorded on the row, written as a control request and logged as an event.
    An already paused job is left unchanged.

    Args:
        session: Session used for the lookup and updates.
        job_id: Id of the job to pause.

    Returns:
        The job.

    Raises:
        ValueError: If the job is in a terminal status.
    """
    job = _get_job_or_raise(session, job_id)
    current_status = job.status
    if current_status == "queued":
        _mark_job_paused(session, job, "Job paused before it was claimed.")
    elif current_status == "running":
        job.requested_action = "pause"
        job.updated_at = datetime.now(timezone.utc)
        control_request = {
            "job_id": job.id,
            "requested_action": "pause",
            "requested_at": job.updated_at.isoformat(),
            "request_pid": os.getpid(),
        }
        _write_job_control_request(job.id, control_request)
        add_job_event(session, job, event_type="pause_requested", message="Pause requested.")
    elif current_status != "paused":
        raise ValueError(f"cannot pause job in status {job.status}")
    session.flush()
    return job
def resume_job(session: Session, job_id: int) -> Job:
    """Return a paused job to the queue.

    Lease and control fields are cleared, any pending control request is
    removed, a ``resumed`` event is recorded, and the related source and
    dataset rows (if any) are marked ``queued``.

    Args:
        session: Session used for the lookup and updates.
        job_id: Id of the job to resume.

    Returns:
        The job.

    Raises:
        ValueError: If the job is not paused.
    """
    job = _get_job_or_raise(session, job_id)
    if job.status != "paused":
        raise ValueError(f"cannot resume job in status {job.status}")

    job.status = "queued"
    job.requested_action = None
    job.lease_owner = None
    job.lease_expires_at = None
    job.paused_at = None
    job.updated_at = datetime.now(timezone.utc)
    _clear_job_control_request(job.id)
    add_job_event(session, job, event_type="resumed", message="Job returned to the queue.")

    related_source = _job_source(session, job)
    if related_source is not None:
        related_source.status = "queued"
        related_source.last_error = None
    related_dataset = _job_dataset(session, job)
    if related_dataset is not None:
        related_dataset.status = "queued"

    session.flush()
    return job
def retry_job(session: Session, job_id: int) -> Job:
    """Return a finished job to the queue for another run.

    The job is reset to ``queued`` with progress 0. Lease, error, dismissal
    and start/finish timestamps are cleared, worker markers are stripped from
    the stored options, and any pending control request is removed. The
    related source and dataset rows (if any) are marked ``queued``.

    Args:
        session: Session used for the lookup and updates.
        job_id: Id of the job to retry.

    Returns:
        The job.

    Raises:
        ValueError: If the job is not in a terminal status.
    """
    job = _get_job_or_raise(session, job_id)
    if job.status not in TERMINAL_JOB_STATUSES:
        raise ValueError(f"cannot retry job in status {job.status}")

    retried_at = datetime.now(timezone.utc)
    job.status = "queued"
    job.requested_action = None
    job.lease_owner = None
    job.lease_expires_at = None
    job.paused_at = None
    job.error = None
    job.dismissed_at = None
    job.started_at = None
    job.finished_at = None
    job.progress_current = 0
    job.updated_at = retried_at

    # Drop the previous worker's markers; store None when nothing is left.
    options = _json_object(job.result_json)
    for worker_key in ("worker_pid", "worker_id"):
        options.pop(worker_key, None)
    if options:
        job.result_json = json.dumps(options, separators=(",", ":"))
    else:
        job.result_json = None

    _clear_job_control_request(job.id)
    add_job_event(
        session,
        job,
        event_type="retried",
        message="Job returned to the queue for retry.",
        progress_current=job.progress_current,
        progress_total=job.progress_total,
    )

    related_source = _job_source(session, job)
    if related_source is not None:
        related_source.status = "queued"
        related_source.last_error = None
    related_dataset = _job_dataset(session, job)
    if related_dataset is not None:
        related_dataset.status = "queued"

    session.flush()
    return job
def cancel_job(session: Session, job_id: int) -> Job:
    """Cancel a job or ask its worker to cancel it.

    A job already in a terminal status is returned unchanged. For a running
    job a cancel request is recorded on the row, written as a control request
    and logged as an event. Any other job is finished as cancelled right away.

    Args:
        session: Session used for the lookup and updates.
        job_id: Id of the job to cancel.

    Returns:
        The job.
    """
    job = _get_job_or_raise(session, job_id)
    if job.status in TERMINAL_JOB_STATUSES:
        return job
    if job.status != "running":
        _finish_job_cancelled(session, job)
    else:
        job.requested_action = "cancel"
        job.updated_at = datetime.now(timezone.utc)
        control_request = {
            "job_id": job.id,
            "requested_action": "cancel",
            "requested_at": job.updated_at.isoformat(),
            "request_pid": os.getpid(),
        }
        _write_job_control_request(job.id, control_request)
        add_job_event(session, job, event_type="cancel_requested", message="Stop requested.")
    session.flush()
    return job
def dismiss_job(session: Session, job_id: int) -> Job:
    """Hide a finished job from the default jobs view.

    Raises ``ValueError`` when the job is not in a terminal status. A job
    that is already dismissed is returned unchanged.
    """
    job = _get_job_or_raise(session, job_id)
    if job.status not in TERMINAL_JOB_STATUSES:
        raise ValueError(f"cannot dismiss job in status {job.status}")
    if job.dismissed_at is not None:
        return job

    dismissed_at = datetime.now(timezone.utc)
    job.dismissed_at = dismissed_at
    job.updated_at = dismissed_at
    add_job_event(
        session,
        job,
        event_type="dismissed",
        message="Job dismissed from the default jobs view.",
    )
    session.flush()
    return job
def dismiss_terminal_jobs(session: Session) -> int:
    """Dismiss every finished job that has not been dismissed yet.

    Returns the number of jobs that were dismissed by this call.
    """
    dismissed_at = datetime.now(timezone.utc)
    undismissed_query = (
        select(Job)
        .where(Job.status.in_(TERMINAL_JOB_STATUSES), Job.dismissed_at.is_(None))
        .order_by(Job.created_at.desc(), Job.id.desc())
    )
    finished_jobs = session.scalars(undismissed_query).all()
    if not finished_jobs:
        return 0

    for finished in finished_jobs:
        finished.dismissed_at = dismissed_at
        finished.updated_at = dismissed_at
        add_job_event(
            session,
            finished,
            event_type="dismissed",
            message="Job dismissed from the default jobs view.",
        )
    session.flush()
    return len(finished_jobs)
def set_job_priority(session: Session, job_id: int, priority: int) -> Job:
    """Store a new priority on the job and record a ``priority_changed`` event."""
    job = _get_job_or_raise(session, job_id)
    job.priority = int(priority)
    job.updated_at = datetime.now(timezone.utc)
    add_job_event(
        session,
        job,
        event_type="priority_changed",
        message=f"Priority changed to {job.priority}.",
        metadata={"priority": job.priority},
    )
    session.flush()
    return job
def _run_route_matching_job(job_id: int, worker_id: str) -> None:
    """Worker entry point for a route-matching job.

    Marks the job as running, runs ``run_route_matching`` with a progress
    callback that also updates the job's progress counters, then records
    the result through ``_complete_job`` and commits.
    """
    init_db()
    with database_write_lock(f"job:{ROUTE_MATCHING_JOB_KIND}:{job_id}", timeout=3600):
        # First session: announce the start. _job_running commits itself.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _job_running(session, job, worker_id, "started", "Route matching started.", 0)
        # Second session: do the work and finish the job.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            # Honour a pause/cancel request before starting the matcher.
            _check_job_control(session, job)
            progress_callback = _job_progress_callback(session, job, worker_id, update_job_progress=True)
            result = run_route_matching(session, progress_callback=progress_callback)
            # Re-validate the lease before writing the final state.
            job = _job_for_worker(session, job_id, worker_id)
            _complete_job(session, job, "Route matching completed.", result)
            session.commit()
def _run_route_layer_rebuild_job(job_id: int, worker_id: str) -> None:
    """Worker entry point for a route-layer rebuild job.

    Emits a ``started`` and a ``rebuilding`` progress step, runs
    ``rebuild_route_layer`` and then completes the job with its result.
    """
    init_db()
    with database_write_lock(f"job:{ROUTE_LAYER_JOB_KIND}:{job_id}", timeout=3600):
        # Each _job_running call commits its own progress step.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _job_running(session, job, worker_id, "started", "Route-layer rebuild started.", 1)
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _job_running(session, job, worker_id, "rebuilding", "Extracting canonical stops and route patterns.", 2)
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            # Honour a pause/cancel request before the rebuild starts.
            _check_job_control(session, job)
            progress_callback = _job_progress_callback(session, job, worker_id)
            result = rebuild_route_layer(session, progress_callback=progress_callback)
            # Re-validate the lease before writing the final state.
            job = _job_for_worker(session, job_id, worker_id)
            _complete_job(session, job, "Route-layer rebuild completed.", result)
            session.commit()
def _run_address_index_rebuild_job(job_id: int, worker_id: str) -> None:
    """Worker entry point for an OSM address index rebuild job.

    Emits a ``started`` and a ``rebuilding`` progress step, runs
    ``rebuild_address_index`` and then completes the job with its result.
    """
    init_db()
    with database_write_lock(f"job:{ADDRESS_INDEX_JOB_KIND}:{job_id}", timeout=3600):
        # Each _job_running call commits its own progress step.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _job_running(session, job, worker_id, "started", "OSM address index rebuild started.", 1)
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _job_running(session, job, worker_id, "rebuilding", "Extracting searchable addresses from OSM.", 2)
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            # Honour a pause/cancel request before the rebuild starts.
            _check_job_control(session, job)
            progress_callback = _job_progress_callback(session, job, worker_id)
            result = rebuild_address_index(session, progress_callback=progress_callback)
            # Re-validate the lease before writing the final state.
            job = _job_for_worker(session, job_id, worker_id)
            _complete_job(session, job, "OSM address index rebuild completed.", result)
            session.commit()
def _run_osm_relabel_job(job_id: int, worker_id: str) -> None:
    """Worker entry point for an OSM relabel job.

    Job options are read from ``job.result_json``: ``dataset_id`` and
    ``force`` are passed to ``relabel_osm_features``; a truthy
    ``build_route_layer`` triggers a route-layer rebuild afterwards.
    Intermediate results are merged back into ``job.result_json`` and the
    final merged object becomes the job result.
    """
    init_db()
    with database_write_lock(f"job:{OSM_RELABEL_JOB_KIND}:{job_id}", timeout=3600):
        # Phase 1: relabel OSM features.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            _job_running(session, job, worker_id, "started", "OSM feature relabeling started.", 0)
            progress_callback = _job_progress_callback(session, job, worker_id, update_job_progress=False)
            relabel_result = relabel_osm_features(
                session,
                dataset_id=_optional_int(options.get("dataset_id")),
                force=bool(options.get("force")),
                progress_callback=progress_callback,
                job_id=job.id,
            )
            # Re-read the job and its options before storing the phase result.
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            options["relabel_result"] = relabel_result
            job.result_json = json.dumps(options, separators=(",", ":"))
            job.progress_current = 1
            _heartbeat_job(job, worker_id)
            add_job_event(
                session,
                job,
                event_type="osm_relabel_completed",
                message="OSM feature relabeling completed.",
                progress_current=1,
                progress_total=job.progress_total,
                metadata=relabel_result,
            )
            # Pause/cancel checkpoint between phases.
            _check_job_control(session, job)
            session.commit()
        # Phase 2 (optional): rebuild the route layer.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            if options.get("build_route_layer"):
                _job_running(session, job, worker_id, "rebuilding_route_layer", "Rebuilding route layer after OSM relabeling.", 1)
                progress_callback = _job_progress_callback(session, job, worker_id)
                route_layer_result = rebuild_route_layer(session, progress_callback=progress_callback)
                job = _job_for_worker(session, job_id, worker_id)
                options = _json_object(job.result_json)
                options["route_layer_result"] = route_layer_result
                job.result_json = json.dumps(options, separators=(",", ":"))
                _heartbeat_job(job, worker_id)
                add_job_event(
                    session,
                    job,
                    event_type="route_layer_rebuilt",
                    message="Route layer rebuilt after OSM relabeling.",
                    progress_current=job.progress_total,
                    progress_total=job.progress_total,
                    metadata=route_layer_result,
                )
                _check_job_control(session, job)
                session.commit()
        # Phase 3: mark the job completed with the accumulated result object.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _complete_job(session, job, "OSM relabel job completed.", _json_object(job.result_json))
            session.commit()
def _run_source_import_job(job_id: int, worker_id: str) -> None:
    """Worker entry point for a source import job.

    Job options are read from ``job.result_json``: ``source_id`` selects
    the source, and truthy ``run_match`` / ``build_route_layer`` enable the
    follow-up route matching and route-layer rebuild phases. Each phase
    merges its result into ``job.result_json``.

    Raises:
        ValueError: if the source referenced by the job does not exist.
    """
    init_db()
    # Phase 1: import the source into a dataset.
    with SessionLocal() as session:
        job = _job_for_worker(session, job_id, worker_id)
        options = _json_object(job.result_json)
        source = session.get(Source, int(options.get("source_id") or 0))
        if source is None:
            raise ValueError("source not found for import job")
        source.status = "running"
        source.last_error = None
        source.last_run_at = datetime.now(timezone.utc)
        # Record which worker process picked the job up.
        options["worker_pid"] = os.getpid()
        options["worker_id"] = worker_id
        job.result_json = json.dumps(options, separators=(",", ":"))
        _job_running(session, job, worker_id, "started", f"Importing source {source.name}.", 1)
        progress_callback = _job_progress_callback(session, job, worker_id)
        # OSM PBF sources use the staged importer, which is given only the
        # source id; every other kind runs through run_source in this session.
        if source.kind == "osm_pbf":
            dataset = run_osm_pbf_source_staged(source.id, progress_callback=progress_callback)
        else:
            dataset = run_source(session, source, progress_callback=progress_callback)
        result = {**options, "dataset_id": dataset.id, "dataset_kind": dataset.kind, "dataset_status": dataset.status}
        job = _job_for_worker(session, job_id, worker_id)
        job.result_json = json.dumps(result, separators=(",", ":"))
        job.progress_current = 2
        _heartbeat_job(job, worker_id)
        add_job_event(
            session,
            job,
            event_type="source_imported",
            message=f"Imported dataset #{dataset.id} from {source.name}.",
            progress_current=2,
            progress_total=job.progress_total,
            metadata=result,
        )
        # Pause/cancel checkpoint between phases.
        _check_job_control(session, job)
        session.commit()
    # Phase 2 (optional): run the route matcher.
    with SessionLocal() as session:
        job = _job_for_worker(session, job_id, worker_id)
        options = _json_object(job.result_json)
        if options.get("run_match"):
            _job_running(session, job, worker_id, "matching", "Running route matcher after import.", 3)
            progress_callback = _job_progress_callback(session, job, worker_id)
            match_result = run_route_matching(session, progress_callback=progress_callback)
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            options["match_result"] = match_result
            job.result_json = json.dumps(options, separators=(",", ":"))
            _heartbeat_job(job, worker_id)
            add_job_event(
                session,
                job,
                event_type="matched",
                message="Route matcher completed.",
                progress_current=3,
                progress_total=job.progress_total,
                metadata=match_result,
            )
            _check_job_control(session, job)
            session.commit()
    # Phase 3 (optional): rebuild the route layer.
    with SessionLocal() as session:
        job = _job_for_worker(session, job_id, worker_id)
        options = _json_object(job.result_json)
        if options.get("build_route_layer"):
            _job_running(session, job, worker_id, "rebuilding_route_layer", "Rebuilding route layer after source import.", job.progress_total - 1)
            progress_callback = _job_progress_callback(session, job, worker_id)
            route_layer_result = rebuild_route_layer(session, progress_callback=progress_callback)
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            options["route_layer_result"] = route_layer_result
            job.result_json = json.dumps(options, separators=(",", ":"))
            _heartbeat_job(job, worker_id)
            add_job_event(
                session,
                job,
                event_type="route_layer_rebuilt",
                message="Route layer rebuilt after import.",
                progress_current=job.progress_total - 1,
                progress_total=job.progress_total,
                metadata=route_layer_result,
            )
            _check_job_control(session, job)
            session.commit()
    # Final phase: mark the source healthy and complete the job.
    with SessionLocal() as session:
        job = _job_for_worker(session, job_id, worker_id)
        source = _job_source(session, job)
        if source is not None:
            source.status = "ok"
            source.last_error = None
            source.last_run_at = datetime.now(timezone.utc)
        _complete_job(session, job, "Source import job completed.", _json_object(job.result_json))
        session.commit()
def _run_source_delete_job(job_id: int, worker_id: str) -> None:
    """Worker entry point for a source delete job.

    Deletes the source named by ``source_id`` in the job options, then
    prunes unreferenced cache files. If the source row no longer exists the
    job is completed immediately with ``deleted: False``.

    Raises:
        ValueError: if the job options carry no usable ``source_id``.
    """
    init_db()
    delete_result: dict[str, Any] = {}
    with database_write_lock(f"job:{SOURCE_DELETE_JOB_KIND}:{job_id}", timeout=3600):
        # Phase 1: validate the target and announce the start.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            source_id = _optional_int(options.get("source_id"))
            if source_id is None:
                raise ValueError("source not found for delete job")
            source = session.get(Source, source_id)
            if source is None:
                # Nothing left to delete: finish the job right away.
                result = {**options, "delete_result": {"deleted": False, "reason": "source not found", "source_id": source_id}}
                _complete_job(session, job, "Source delete job completed; source was already absent.", result)
                session.commit()
                return
            source.status = "running"
            source.last_error = None
            # Record which worker process picked the job up.
            options["worker_pid"] = os.getpid()
            options["worker_id"] = worker_id
            job.result_json = json.dumps(options, separators=(",", ":"))
            _job_running(session, job, worker_id, "started", f"Deleting source {source.name}.", 1)
        # Phase 2: delete the source.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            source_id = int(options["source_id"])
            delete_result = delete_source(session, source_id)
            job.progress_current = 2
            _heartbeat_job(job, worker_id)
            add_job_event(
                session,
                job,
                event_type="source_deleted",
                message="Source rows and datasets deleted.",
                progress_current=2,
                progress_total=job.progress_total,
                metadata=delete_result,
            )
            # Pause/cancel checkpoint between phases.
            _check_job_control(session, job)
            session.commit()
        # Phase 3: prune cache files and complete the job.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _job_running(session, job, worker_id, "pruning_cache", "Pruning unreferenced cache files.", 3)
            cache = prune_unreferenced_cache_files(session)
            result = {**_json_object(job.result_json), "delete_result": delete_result, "cache_pruned": cache}
            _complete_job(session, job, "Source delete job completed.", result)
            session.commit()
def _run_dataset_delete_job(job_id: int, worker_id: str) -> None:
    """Worker entry point for a dataset delete job.

    Deletes the dataset named by ``dataset_id`` in the job options,
    recomputes the status of the optional ``source_id`` source, then prunes
    unreferenced cache files. If the dataset row no longer exists the job
    is completed immediately with ``deleted: False``.

    Raises:
        ValueError: if the job options carry no usable ``dataset_id``.
    """
    init_db()
    delete_result: dict[str, Any] = {}
    with database_write_lock(f"job:{DATASET_DELETE_JOB_KIND}:{job_id}", timeout=3600):
        # Phase 1: validate the target and announce the start.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            dataset_id = _optional_int(options.get("dataset_id"))
            if dataset_id is None:
                raise ValueError("dataset not found for delete job")
            dataset = session.get(Dataset, dataset_id)
            if dataset is None:
                # Nothing left to delete: finish the job right away.
                result = {**options, "delete_result": {"deleted": False, "reason": "dataset not found", "dataset_id": dataset_id}}
                _complete_job(session, job, "Dataset delete job completed; dataset was already absent.", result)
                session.commit()
                return
            dataset.status = "running"
            # Record which worker process picked the job up.
            options["worker_pid"] = os.getpid()
            options["worker_id"] = worker_id
            job.result_json = json.dumps(options, separators=(",", ":"))
            _job_running(session, job, worker_id, "started", f"Deleting dataset #{dataset.id}.", 1)
        # Phase 2: delete the dataset and refresh the owning source's status.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            dataset_id = int(options["dataset_id"])
            source_id = _optional_int(options.get("source_id"))
            delete_result = delete_dataset(session, dataset_id)
            if source_id is not None:
                source = session.get(Source, source_id)
                if source is not None:
                    source.status = _source_status_without_active_job(session, source)
            job.progress_current = 2
            _heartbeat_job(job, worker_id)
            add_job_event(
                session,
                job,
                event_type="dataset_deleted",
                message="Dataset rows and files deleted.",
                progress_current=2,
                progress_total=job.progress_total,
                metadata=delete_result,
            )
            # Pause/cancel checkpoint between phases.
            _check_job_control(session, job)
            session.commit()
        # Phase 3: prune cache files and complete the job.
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _job_running(session, job, worker_id, "pruning_cache", "Pruning unreferenced cache files.", 3)
            cache = prune_unreferenced_cache_files(session)
            result = {**_json_object(job.result_json), "delete_result": delete_result, "cache_pruned": cache}
            _complete_job(session, job, "Dataset delete job completed.", result)
            session.commit()
def _run_maintenance_job(job_id: int, worker_id: str) -> None:
    """Worker entry point for a maintenance job.

    Reads ``action`` and ``payload`` from the job options, announces the
    start, delegates the work to ``_run_maintenance_action`` and completes
    the job with the returned result.

    Raises:
        ValueError: if the job options carry no ``action``.
    """
    init_db()
    with database_write_lock(f"job:{MAINTENANCE_JOB_KIND}:{job_id}", timeout=3600):
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            options = _json_object(job.result_json)
            action = str(options.get("action") or "")
            payload = _normalize_job_payload(options.get("payload"))
            if not action:
                raise ValueError("maintenance action is missing")
            # Record which worker process picked the job up.
            options["worker_pid"] = os.getpid()
            options["worker_id"] = worker_id
            job.result_json = json.dumps(options, separators=(",", ":"))
            _job_running(session, job, worker_id, "started", f"{_maintenance_description(action, payload)} started.", 1)
        # The action opens whatever sessions/connections it needs itself.
        result = _run_maintenance_action(job_id, worker_id, action, payload)
        with SessionLocal() as session:
            job = _job_for_worker(session, job_id, worker_id)
            _complete_job(session, job, f"{_maintenance_description(action, payload)} completed.", result)
            session.commit()
def _run_maintenance_action(job_id: int, worker_id: str, action: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Execute one maintenance action and return its result envelope.

    The returned dict always has the keys ``action``, ``payload`` and
    ``result``.

    Raises:
        ValueError: if ``action`` is not one of the supported actions.
    """
    # init-db and vacuum-db do not use an ORM session and return early.
    if action == "init-db":
        init_db()
        return {"action": action, "payload": payload, "result": {"status": "initialized"}}
    if action == "vacuum-db":
        # Runs on a connection configured with AUTOCOMMIT isolation.
        with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection:
            connection.execute(text("VACUUM"))
            connection.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
        return {"action": action, "payload": payload, "result": {"status": "vacuumed"}}
    with SessionLocal() as session:
        job = _job_for_worker(session, job_id, worker_id)
        # Honour a pause/cancel request before doing any work.
        _check_job_control(session, job)
        if action == "sample-reset":
            result = load_sample_project(session, preserve_job_id=job_id)
        elif action == "reset-db":
            clear_project_data(session, preserve_job_id=job_id, preserve_catalog=False)
            result = {"status": "reset", "preserved_job_id": job_id}
        elif action == "backfill-gtfs-shapes":
            result = backfill_gtfs_shapes(session, dataset_id=_optional_int(payload.get("dataset_id")))
        elif action == "prune-cache":
            # dry_run defaults to True, so pruning only happens when it is
            # explicitly disabled in the payload.
            dry_run = bool(payload.get("dry_run", True))
            if dry_run:
                result = {"dry_run": True, **unreferenced_cache_file_summary(session)}
            else:
                result = {"dry_run": False, **prune_unreferenced_cache_files(session)}
        elif action == "prune-inactive-datasets":
            result = prune_inactive_datasets(session, dry_run=bool(payload.get("dry_run", True)))
        elif action == "source-catalog-import":
            imported = import_source_catalog(session, payload.get("csv_path"), update_existing=bool(payload.get("update_existing", True)))
            result = {**imported, "summary": source_catalog_summary(session)}
        elif action == "source-catalog-import-ingestable":
            imported = import_ingestable_sources(session, payload.get("csv_path"), update_existing=bool(payload.get("update_existing", True)))
            result = {**imported, "summary": source_catalog_summary(session)}
        else:
            raise ValueError(f"unsupported maintenance action: {action}")
        # Re-validate the lease and check for control requests before committing.
        job = _job_for_worker(session, job_id, worker_id)
        _check_job_control(session, job)
        session.commit()
    return {"action": action, "payload": payload, "result": result}
def _job_for_worker(session: Session, job_id: int, worker_id: str) -> Job:
    """Load a job on behalf of a worker and extend its lease.

    Raises:
        ValueError: if no job with ``job_id`` exists.
        RuntimeError: if the job is leased by someone else or not running.
    """
    leased_job = session.get(Job, job_id)
    if leased_job is None:
        raise ValueError(f"job not found: {job_id}")

    # The lease check comes first, then the status check.
    problem = None
    if leased_job.lease_owner != worker_id:
        problem = "is not leased by this worker"
    elif leased_job.status != "running":
        problem = "is not running"
    if problem is not None:
        raise RuntimeError(f"job #{job_id} {problem}")

    _heartbeat_job(leased_job, worker_id)
    return leased_job
def _job_running(session: Session, job: Job, worker_id: str, event_type: str, message: str, progress_current: int) -> None:
    """Record a progress step for a running job and commit it.

    Sets the status to ``running``, stamps ``started_at`` on first use,
    renews the lease, stores ``progress_current``, adds a job event and
    then runs the pause/cancel checkpoint before committing.
    """
    stamp = datetime.now(timezone.utc)
    job.status = "running"
    if job.started_at is None:
        job.started_at = stamp
    _heartbeat_job(job, worker_id, now=stamp)
    job.progress_current = progress_current

    event_fields = {
        "event_type": event_type,
        "message": message,
        "progress_current": progress_current,
        "progress_total": job.progress_total,
    }
    add_job_event(session, job, **event_fields)
    _check_job_control(session, job)
    session.commit()
def _job_progress_callback(session: Session, job: Job, worker_id: str, *, update_job_progress: bool = False):
    """Build the progress callback handed to long-running pipeline steps.

    Each invocation reloads the job, verifies it is still running under
    ``worker_id`` (raising ``RuntimeError`` otherwise), renews the lease,
    records a job event, runs the pause/cancel checkpoint and commits.
    The job's progress counters are only overwritten when
    ``update_job_progress`` is true.
    """

    def _report(
        event_type: str,
        message: str,
        progress_current: int | None = None,
        progress_total: int | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        live_job = session.get(Job, job.id)
        if live_job is None:
            return
        session.refresh(live_job)

        still_ours = live_job.status == "running" and live_job.lease_owner == worker_id
        if not still_ours:
            raise RuntimeError(f"job #{job.id} is no longer running under worker {worker_id}")

        _heartbeat_job(live_job, worker_id)
        if update_job_progress:
            if progress_current is not None:
                live_job.progress_current = progress_current
            if progress_total is not None:
                live_job.progress_total = progress_total

        add_job_event(
            session,
            live_job,
            event_type=event_type,
            message=message,
            progress_current=progress_current,
            progress_total=progress_total,
            metadata=metadata,
        )
        _check_job_control(session, live_job)
        session.commit()

    return _report
def _heartbeat_job(job: Job, worker_id: str, *, now: datetime | None = None) -> None:
    """Assign the lease to ``worker_id`` and push its expiry forward.

    The lease expires ``LEASE_SECONDS`` after ``now`` (or after the current
    UTC time when ``now`` is not given); ``updated_at`` gets the same stamp.
    """
    stamp = now if now else datetime.now(timezone.utc)
    lease_length = timedelta(seconds=LEASE_SECONDS)
    job.lease_owner = worker_id
    job.lease_expires_at = stamp + lease_length
    job.updated_at = stamp
def _check_job_control(session: Session, job: Job) -> None:
    """Cooperative checkpoint: act on a pending pause or cancel request.

    The request is taken from ``job.requested_action`` after a flush and
    refresh, falling back to the job-control file.

    Raises:
        JobCancelled: when a cancel was requested.
        JobPaused: when a pause was requested (the paused state is
            committed before raising).
    """
    session.flush()
    session.refresh(job)
    pending = job.requested_action or _job_control_request_action(job)
    if pending not in ("cancel", "pause"):
        return

    _clear_job_control_request(job.id)
    if pending == "cancel":
        raise JobCancelled()

    _mark_job_paused(session, job, "Job paused at a cooperative checkpoint.")
    session.commit()
    raise JobPaused()
def _mark_job_paused(session: Session, job: Job, message: str) -> None:
    """Move a job into the ``paused`` state and release its lease.

    Also records a ``paused`` event, marks the linked source/dataset (when
    present) as paused and removes the job-control file. Does not commit.
    """
    paused_at = datetime.now(timezone.utc)
    job.status = "paused"
    job.paused_at = paused_at
    job.updated_at = paused_at
    # The pending request is consumed and the worker gives up the lease.
    for released_field in ("requested_action", "lease_owner", "lease_expires_at"):
        setattr(job, released_field, None)

    add_job_event(
        session,
        job,
        event_type="paused",
        message=message,
        progress_current=job.progress_current,
        progress_total=job.progress_total,
    )

    linked_source = _job_source(session, job)
    if linked_source is not None:
        linked_source.status = "paused"
        linked_source.last_error = None
    linked_dataset = _job_dataset(session, job)
    if linked_dataset is not None:
        linked_dataset.status = "paused"

    _clear_job_control_request(job.id)
def _complete_job(session: Session, job: Job, message: str, result: dict[str, Any]) -> None:
    """Mark a job as completed and store ``result`` as its JSON result.

    Releases the lease, sets progress to its total, stamps the finish
    time, removes the job-control file and records a ``completed`` event.
    Does not commit.
    """
    job.status = "completed"
    for released_field in ("requested_action", "lease_owner", "lease_expires_at", "paused_at"):
        setattr(job, released_field, None)
    job.progress_current = job.progress_total
    job.result_json = json.dumps(result, separators=(",", ":"))

    finished_at = datetime.now(timezone.utc)
    job.updated_at = finished_at
    job.finished_at = finished_at

    _clear_job_control_request(job.id)
    add_job_event(
        session,
        job,
        event_type="completed",
        message=message,
        progress_current=job.progress_current,
        progress_total=job.progress_total,
        metadata=result,
    )
def _finish_job_cancelled(session: Session, job: Job) -> None:
    """Mark a job as cancelled and restore linked source/dataset statuses.

    Releases the lease, clears the error, removes the job-control file and
    records a ``cancelled`` event. Does not commit.
    """
    cancelled_at = datetime.now(timezone.utc)
    job.status = "cancelled"
    for released_field in ("requested_action", "lease_owner", "lease_expires_at", "paused_at"):
        setattr(job, released_field, None)
    job.updated_at = cancelled_at
    job.finished_at = cancelled_at
    job.error = None

    _clear_job_control_request(job.id)
    add_job_event(
        session,
        job,
        event_type="cancelled",
        message="Job stopped.",
        progress_current=job.progress_current,
        progress_total=job.progress_total,
    )

    linked_source = _job_source(session, job)
    if linked_source is not None:
        restored_status = _source_status_without_active_job(session, linked_source)
        # An "error" status without a recorded error falls back to "new".
        if restored_status == "error" and linked_source.last_error is None:
            restored_status = "new"
        linked_source.status = restored_status

    linked_dataset = _job_dataset(session, job)
    if linked_dataset is not None:
        stored_status = _json_object(job.result_json).get("dataset_status")
        linked_dataset.status = str(stored_status or "imported")
def _mark_job_cancelled(job_id: int) -> None:
    """Mark the job as cancelled in a fresh session under the write lock.

    Does nothing when the job no longer exists.
    """
    lock_label = f"job:{job_id}:cancelled"
    with database_write_lock(lock_label, timeout=10), SessionLocal() as session:
        job = session.get(Job, job_id)
        if job is not None:
            _finish_job_cancelled(session, job)
            session.commit()
def _get_job_or_raise(session: Session, job_id: int) -> Job:
    """Return the job with ``job_id`` or raise ``ValueError`` if it is missing."""
    found = session.get(Job, job_id)
    if found is not None:
        return found
    raise ValueError(f"job not found: {job_id}")
def _mark_job_failed(job_id: int, exc: Exception) -> None:
    """Record ``exc`` as the failure of the job, whatever its kind.

    Marks the job as ``failed``, releases its lease, flags the linked
    source/dataset (when present) as errored and records a ``failed``
    event. A job that is already terminal and holds no lease is left
    untouched, as is a job that no longer exists.

    This is best-effort: if the database write lock cannot be obtained
    within the timeout, the failure is not recorded.
    """
    error_text = str(exc)
    try:
        # The label is kind-neutral: this helper serves every job kind, so
        # it must not claim to be a route-layer rebuild.
        with database_write_lock(f"job:{job_id}:failed", timeout=10):
            with SessionLocal() as session:
                job = session.get(Job, job_id)
                if job is None:
                    return
                if job.status in TERMINAL_JOB_STATUSES and job.lease_owner is None:
                    return
                job.status = "failed"
                job.requested_action = None
                job.lease_owner = None
                job.lease_expires_at = None
                job.paused_at = None
                job.error = error_text
                job.updated_at = datetime.now(timezone.utc)
                job.finished_at = job.updated_at
                source = _job_source(session, job)
                if source is not None:
                    source.status = "error"
                    source.last_error = error_text
                    source.last_run_at = job.finished_at
                dataset = _job_dataset(session, job)
                if dataset is not None:
                    dataset.status = "error"
                add_job_event(
                    session,
                    job,
                    level="error",
                    event_type="failed",
                    message=error_text,
                    progress_current=job.progress_current,
                    progress_total=job.progress_total,
                    metadata={"exception_type": exc.__class__.__name__},
                )
                session.commit()
    except DatabaseWriteBusy:
        # NOTE(review): deliberately swallowed so that failure bookkeeping
        # never raises out of the error path — confirm this is intended.
        pass
def _job_source(session: Session, job: Job) -> Source | None:
    """Return the source referenced by the job, or ``None`` if there is none."""
    linked_id = source_id_from_job(job)
    return None if linked_id is None else session.get(Source, linked_id)
def _job_dataset(session: Session, job: Job) -> Dataset | None:
    """Return the dataset referenced by the job, or ``None`` if there is none."""
    linked_id = dataset_id_from_job(job)
    return None if linked_id is None else session.get(Dataset, linked_id)
def _job_control_dir():
    """Return the ``job-control`` directory below the configured data dir."""
    data_root = settings.data_dir
    return data_root / "job-control"
def _job_control_path(job_id: int):
    """Return the path of the control-request file for ``job_id``."""
    control_dir = _job_control_dir()
    return control_dir / f"job-{int(job_id)}.json"
def _write_job_control_request(job_id: int, payload: dict[str, Any]) -> None:
    """Write the control request for ``job_id`` as compact JSON.

    The payload is written to a temporary file named after the current
    process and thread, then moved over the final path with ``os.replace``.
    """
    control_dir = _job_control_dir()
    control_dir.mkdir(parents=True, exist_ok=True)
    final_path = _job_control_path(job_id)
    scratch_name = f".job-{int(job_id)}-{os.getpid()}-{threading.get_ident()}.tmp"
    scratch_path = control_dir / scratch_name
    serialized = json.dumps(payload, separators=(",", ":"))
    scratch_path.write_text(serialized, encoding="utf-8")
    os.replace(scratch_path, final_path)
def _read_job_control_request(job_id: int) -> dict[str, Any]:
    """Return the control request stored for ``job_id``.

    Returns an empty dict when the file is missing, unreadable, not valid
    UTF-8, not valid JSON, or does not contain a JSON object.
    """
    control_path = _job_control_path(job_id)
    try:
        data = json.loads(control_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and also UnicodeDecodeError,
        # which read_text raises for a file that is not valid UTF-8.
        return {}
    return data if isinstance(data, dict) else {}
def _clear_job_control_request(job_id: int) -> None:
    """Delete the control-request file for ``job_id``, ignoring OS errors."""
    try:
        _job_control_path(job_id).unlink()
    except OSError:
        # Best-effort removal; FileNotFoundError is an OSError subclass, so
        # a missing file is covered here as well.
        pass
def _job_control_request_action(job: Job) -> str | None:
    """Return the pending ``pause``/``cancel`` action from the control file.

    Returns ``None`` when no such action is stored. A request stamped
    before the job's ``created_at`` is deleted and ignored.
    """
    request = _read_job_control_request(job.id)
    wanted = request.get("requested_action")
    if wanted not in {"pause", "cancel"}:
        return None

    requested_at = _datetime_from_iso(request.get("requested_at"))
    has_both_stamps = requested_at is not None and job.created_at is not None
    if has_both_stamps and requested_at < _as_utc(job.created_at):
        _clear_job_control_request(job.id)
        return None
    return str(wanted)
def _json_object(value: str | None) -> dict[str, Any]:
    """Decode ``value`` as a JSON object.

    Returns an empty dict for empty input, invalid JSON, or JSON whose
    top-level value is not an object.
    """
    if not value:
        return {}
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        return {}
    if isinstance(decoded, dict):
        return decoded
    return {}
def _normalize_job_payload(value: object) -> dict[str, Any]:
    """Return a cleaned copy of a job payload dict.

    Non-dict input yields an empty dict. Keys are visited in sorted order
    and converted to strings, ``None`` values are dropped, nested dicts
    are normalized recursively, lists and str/int/float/bool values are
    kept as they are, and anything else is converted with ``str``.
    """
    if not isinstance(value, dict):
        return {}

    cleaned: dict[str, Any] = {}
    for raw_key in sorted(value):
        entry = value[raw_key]
        if entry is None:
            continue
        if isinstance(entry, dict):
            cleaned[str(raw_key)] = _normalize_job_payload(entry)
        elif isinstance(entry, (list, str, int, float, bool)):
            cleaned[str(raw_key)] = entry
        else:
            cleaned[str(raw_key)] = str(entry)
    return cleaned
def _maintenance_description(action: str, payload: dict[str, Any] | None = None) -> str:
    """Return the human-readable label for a maintenance action.

    For the prune actions the label depends on the payload's ``dry_run``
    flag (treated as true when absent); for GTFS shape backfill it names
    the dataset when the payload carries a ``dataset_id``. Unknown actions
    get a generic ``Maintenance action <action>`` label.
    """
    normalized = _normalize_job_payload(payload)

    if action == "backfill-gtfs-shapes":
        dataset_id = normalized.get("dataset_id")
        if dataset_id is not None:
            return f"Backfill GTFS shapes for dataset {dataset_id}"
        return "Backfill GTFS shapes"
    if action == "prune-cache":
        if normalized.get("dry_run", True):
            return "Check unreferenced cache files"
        return "Prune unreferenced cache files"
    if action == "prune-inactive-datasets":
        if normalized.get("dry_run", True):
            return "Check inactive datasets"
        return "Prune inactive datasets"

    # Actions whose label does not depend on the payload.
    fixed_labels = (
        ("init-db", "Initialize database schema"),
        ("sample-reset", "Reset sample data"),
        ("reset-db", "Reset database contents"),
        ("vacuum-db", "Vacuum database"),
        ("source-catalog-import", "Import source catalog"),
        ("source-catalog-import-ingestable", "Import ingestable source seeds"),
    )
    for known_action, label in fixed_labels:
        if action == known_action:
            return label
    return f"Maintenance action {action}"
def _maintenance_progress_total(action: str) -> int:
    """Return the number of progress steps for a maintenance action.

    The two reset actions use four steps; every other action uses one.
    """
    reset_actions = frozenset(("sample-reset", "reset-db"))
    return 4 if action in reset_actions else 1
def _optional_int(value: object) -> int | None:
    """Convert ``value`` to ``int``, returning ``None`` when that is impossible.

    ``None`` maps to ``None``. Values that ``int()`` rejects also map to
    ``None``; this includes infinite floats, for which ``int()`` raises
    ``OverflowError`` rather than ``ValueError``.
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None
def _datetime_from_iso(value: object) -> datetime | None:
    """Parse an ISO-8601 string into a UTC datetime.

    Returns ``None`` for non-string or empty input and for strings that
    ``datetime.fromisoformat`` rejects.
    """
    if not (isinstance(value, str) and value):
        return None
    try:
        moment = datetime.fromisoformat(value)
    except ValueError:
        return None
    return _as_utc(moment)
def _iso(value: datetime | None) -> str | None:
    """Return ``value`` in ISO-8601 form, or ``None`` when it is falsy."""
    if not value:
        return None
    return value.isoformat()
def _revision_datetime(value: datetime | None) -> str:
    """Return ``value`` in ISO-8601 form, or an empty string for ``None``."""
    if value is None:
        return ""
    return value.isoformat()
def _as_utc(value: datetime | None) -> datetime | None:
    """Return ``value`` as a timezone-aware UTC datetime.

    ``None`` is passed through. A naive datetime is labelled as UTC
    without shifting its clock fields; an aware one is converted to UTC.
    """
    if value is None:
        return None
    is_naive = value.tzinfo is None
    return value.replace(tzinfo=timezone.utc) if is_naive else value.astimezone(timezone.utc)