156 lines
4.3 KiB
Python
156 lines
4.3 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from app.config import settings
|
|
|
|
|
|
@dataclass
class WorkerHandle:
    """Record describing one queue-worker slot handled by this module.

    Instances are created by ``start_queue_workers`` and consulted by
    ``stop_queue_workers`` to decide which processes to signal.
    """

    # Zero-based slot number; the worker id is built from ``index + 1``.
    index: int
    # Identifier of the form ``server-worker-<n>``.
    worker_id: str
    # Process id of the worker, or None when no PID is known.
    pid: int | None
    # ``"already_running"`` when a live PID was found in the PID file,
    # ``"started"`` when the worker was spawned by ``start_queue_workers``.
    status: str
    # File in which the worker's PID is persisted.
    pid_file: Path
    # File that receives the worker's stdout and stderr.
    log_file: Path
    # True only for workers spawned by this process; only those are
    # terminated by ``stop_queue_workers``.
    started_by_server: bool = False
|
|
|
|
|
|
# Handles recorded by the most recent start_queue_workers() call;
# stop_queue_workers() reads this list to find the workers to signal.
_handles: list[WorkerHandle] = []
|
|
|
|
|
|
def start_queue_workers() -> list[WorkerHandle]:
    """Ensure the configured number of queue worker processes is running.

    Returns an empty list without touching anything when
    ``settings.queue_worker_autostart`` is false. Otherwise every worker slot
    (``server-worker-1`` .. ``server-worker-N``) is inspected:

    * if the slot's PID file names a live process, that process is adopted
      with status ``"already_running"``;
    * otherwise the stale PID file is removed, a new worker is spawned with
      status ``"started"`` and its PID is written to the PID file.

    An adopted worker keeps ``started_by_server=True`` when the module
    registry shows that this same process spawned that exact PID in an
    earlier call, so calling this function twice does not make
    ``stop_queue_workers`` forget the workers it owns.

    The module-level ``_handles`` registry is replaced with the handles
    gathered by this call even when spawning fails part-way, so workers that
    did start can still be terminated on shutdown.

    Returns:
        A new list with one handle per configured worker slot.

    Raises:
        Whatever ``_spawn_worker`` or the PID-file write raises; the
        exception propagates after the registry has been updated.
    """
    if not settings.queue_worker_autostart:
        return []

    worker_count = max(0, int(settings.queue_worker_count))
    worker_dir = settings.data_dir / "workers"
    worker_dir.mkdir(parents=True, exist_ok=True)

    # PIDs spawned by this process in a previous call, keyed by worker id.
    owned_pids = {
        handle.worker_id: handle.pid
        for handle in _handles
        if handle.started_by_server and handle.pid is not None
    }

    handles: list[WorkerHandle] = []
    try:
        for index in range(worker_count):
            worker_id = f"server-worker-{index + 1}"
            pid_file = worker_dir / f"{worker_id}.pid"
            log_file = worker_dir / f"{worker_id}.log"

            existing_pid = _read_pid(pid_file)
            if existing_pid is not None and _pid_running(existing_pid):
                still_owned = owned_pids.get(worker_id) == existing_pid
                handles.append(
                    WorkerHandle(
                        index=index,
                        worker_id=worker_id,
                        pid=existing_pid,
                        status="already_running",
                        pid_file=pid_file,
                        log_file=log_file,
                        started_by_server=still_owned,
                    )
                )
                continue

            # Missing, unreadable or stale PID file: start a fresh worker.
            pid_file.unlink(missing_ok=True)
            process = _spawn_worker(worker_id, log_file)
            # Record the handle before writing the PID file so the new
            # process stays tracked even if the write fails.
            handles.append(
                WorkerHandle(
                    index=index,
                    worker_id=worker_id,
                    pid=process.pid,
                    status="started",
                    pid_file=pid_file,
                    log_file=log_file,
                    started_by_server=True,
                )
            )
            pid_file.write_text(str(process.pid), encoding="utf-8")
    finally:
        # Publish whatever was gathered, including after a failure.
        _handles[:] = handles
    return list(_handles)
|
|
|
|
|
|
def stop_queue_workers() -> None:
    """Send SIGTERM to every worker that this server process spawned.

    Does nothing unless ``settings.queue_worker_stop_on_shutdown`` is true.
    Handles that were adopted rather than spawned (``started_by_server`` is
    false) or that carry no PID are left untouched. For each signalled worker
    the PID file is removed and the handle is dropped from ``_handles``, so a
    repeated call cannot signal a stale PID that the operating system may
    meanwhile have assigned to an unrelated process.

    The function only requests termination; it does not wait for the workers
    to exit.
    """
    if not settings.queue_worker_stop_on_shutdown:
        return

    # Iterate over a copy because the registry is edited inside the loop.
    for handle in list(_handles):
        if not handle.started_by_server or handle.pid is None:
            continue
        _terminate_pid(handle.pid)
        handle.pid_file.unlink(missing_ok=True)
        # Forget the handle right away (by identity) so that an error on a
        # later worker still leaves the registry free of signalled PIDs.
        _handles[:] = [kept for kept in _handles if kept is not handle]
|
|
|
|
|
|
def queue_worker_status() -> list[dict[str, object]]:
    """Describe every configured worker slot based on its PID file.

    Returns an empty list when ``settings.queue_worker_autostart`` is false.
    Otherwise the result holds one dictionary per slot with the keys
    ``index``, ``worker_id``, ``pid``, ``running``, ``pid_file`` and
    ``log_file``; ``pid`` is ``None`` when the PID file yields no PID, and
    the two paths are rendered as strings.
    """
    if not settings.queue_worker_autostart:
        return []

    base_dir = settings.data_dir / "workers"
    slot_count = max(0, int(settings.queue_worker_count))

    def describe(slot: int) -> dict[str, object]:
        # Build the status entry for a single worker slot.
        name = f"server-worker-{slot + 1}"
        pid_path = base_dir / f"{name}.pid"
        log_path = base_dir / f"{name}.log"
        recorded_pid = _read_pid(pid_path)
        if recorded_pid is None:
            alive = False
        else:
            alive = _pid_running(recorded_pid)
        return {
            "index": slot,
            "worker_id": name,
            "pid": recorded_pid,
            "running": alive,
            "pid_file": str(pid_path),
            "log_file": str(log_path),
        }

    return [describe(slot) for slot in range(slot_count)]
|
|
|
|
|
|
def _spawn_worker(worker_id: str, log_file: Path) -> subprocess.Popen:
    """Launch ``python -m app.cli worker`` as a detached child process.

    The child runs with the current interpreter, uses the parent of this
    file's directory as working directory, receives a copy of the current
    environment plus ``MOBILITY_SUPERVISED_WORKER=1``, has stdin connected to
    the null device and appends stdout and stderr to ``log_file``. It is
    started in a new session (``start_new_session=True``).

    Args:
        worker_id: Value passed to the CLI's ``--worker-id`` option.
        log_file: File the worker's output is appended to; its parent
            directory is created when missing.

    Returns:
        The ``subprocess.Popen`` object of the started worker.
    """
    project_root = Path(__file__).resolve().parents[1]
    poll_interval = str(settings.queue_worker_poll_interval_seconds)
    argv = [
        sys.executable,
        "-m",
        "app.cli",
        "worker",
        "--worker-id",
        worker_id,
        "--poll-interval",
        poll_interval,
    ]
    child_env = dict(os.environ, MOBILITY_SUPERVISED_WORKER="1")

    log_file.parent.mkdir(parents=True, exist_ok=True)
    # This process closes its own log handle once Popen has returned
    # (or raised).
    with log_file.open("ab", buffering=0) as log_sink:
        return subprocess.Popen(
            argv,
            cwd=str(project_root),
            env=child_env,
            stdin=subprocess.DEVNULL,
            stdout=log_sink,
            stderr=subprocess.STDOUT,
            start_new_session=True,
        )
|
|
|
|
|
|
def _read_pid(path: Path) -> int | None:
    """Return the process id stored in ``path``, or ``None`` if unusable.

    ``None`` is returned when the file is missing or unreadable, when its
    content is not an integer, and when the integer is zero or negative.

    Args:
        path: PID file holding a decimal process id as UTF-8 text.

    Returns:
        A positive process id, or ``None``.
    """
    try:
        pid = int(path.read_text(encoding="utf-8").strip())
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers
        # non-numeric text and undecodable bytes (UnicodeDecodeError).
        return None
    # Zero and negative values are not process ids: os.kill() would treat
    # them as process-group or broadcast targets, so reject them here.
    return pid if pid > 0 else None
|
|
|
|
|
|
def _pid_running(pid: int) -> bool:
    """Return whether a process with id ``pid`` currently exists.

    The check sends signal ``0`` with ``os.kill``, which performs the
    existence and permission checks without delivering a signal.

    Args:
        pid: Process id to probe.

    Returns:
        ``True`` when the process exists, including when it exists but may
        not be signalled by this user (``PermissionError``). ``False`` when
        no such process exists or ``pid`` cannot be a single process id
        (zero, negative, or too large for the platform).
    """
    if pid <= 0:
        # Non-positive values address process groups or every process
        # rather than one process, so they never identify a worker.
        return False
    try:
        os.kill(pid, 0)
    except (ProcessLookupError, OverflowError):
        # No such process, or the number does not fit the platform's pid type.
        return False
    except PermissionError:
        # The process exists but belongs to someone else.
        return True
    return True
|
|
|
|
|
|
def _terminate_pid(pid: int) -> None:
    """Send SIGTERM to ``pid``, ignoring a process that no longer exists.

    Only ``ProcessLookupError`` is suppressed; any other error raised by
    ``os.kill`` propagates to the caller.
    """
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # Nothing to stop: the process was gone before the signal was sent.
        pass
|