212 lines
6.2 KiB
Python
212 lines
6.2 KiB
Python
from __future__ import annotations
|
|
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
import threading
|
|
import time
|
|
from typing import Iterator
|
|
|
|
from app.config import settings
|
|
|
|
try:
|
|
import fcntl
|
|
except ImportError: # pragma: no cover - this app currently targets Linux/macOS dev hosts
|
|
fcntl = None # type: ignore[assignment]
|
|
|
|
|
|
class DatabaseWriteBusy(RuntimeError):
    """Raised when a database write lock could not be obtained.

    Attributes:
        operation: Label of the write operation that was refused.
        active: Metadata describing the operation believed to hold the lock;
            an empty dict when nothing is known about it.
    """

    def __init__(self, operation: str, active: dict[str, object] | None = None) -> None:
        self.operation = operation
        self.active = active or {}
        # Name the competing operation in the message when the metadata has one.
        holder = self.active.get("operation")
        message = "Database is busy with another write operation"
        super().__init__(f"{message}: {holder}" if holder else message)
|
|
|
|
|
|
@dataclass(frozen=True)
class DatabaseWriteState:
    """Immutable snapshot of the write-lock state.

    Attributes:
        locked: Whether a write lock is recorded as held.
        operation: Label of the operation holding the lock, if any.
        pid: Process id recorded for the holder, if any.
        started_at: ``time.time()`` timestamp recorded when the lock was taken,
            if any.
    """

    locked: bool
    operation: str | None = None
    pid: int | None = None
    started_at: float | None = None

    @property
    def elapsed_seconds(self) -> float | None:
        """Seconds since ``started_at`` (never negative), or ``None`` if unset."""
        began = self.started_at
        if began is not None:
            delta = time.time() - began
            # A start time in the future (clock adjustment) is clamped to zero.
            return delta if delta > 0.0 else 0.0
        return None
|
|
|
|
|
|
# Serializes writers that share this process; acquired and released by
# database_write_lock() around each write section.
_process_write_lock = threading.Lock()
# Guards reads and writes of the module-level ``_state`` snapshot below.
_state_lock = threading.Lock()
# Most recently published write-lock state for this process; replaced
# wholesale by _set_state() and read by database_write_status().
_state = DatabaseWriteState(locked=False)
|
|
|
|
|
|
def is_sqlite_database() -> bool:
    """Return the value of ``settings.is_sqlite_database``."""
    uses_sqlite = settings.is_sqlite_database
    return uses_sqlite
|
|
|
|
|
|
@contextmanager
def database_write_lock(operation: str, timeout: float | None = None) -> Iterator[None]:
    """Serialize SQLite writes inside and across app processes.

    SQLite allows only one writer. This lock prevents mutating endpoints from
    competing until SQLite times out with a low-level "database is locked" error.

    Two locks are taken in order: the in-process ``_process_write_lock`` and
    then a lock on the file returned by ``_lock_path()``. When the configured
    database is not SQLite the context manager does nothing.

    Args:
        operation: Label recorded in the lock file and in the in-process state,
            so a refused caller can report what is currently running.
        timeout: Maximum number of seconds to wait for both locks combined.
            ``None`` falls back to
            ``settings.database_write_lock_timeout_seconds``; if that is also
            ``None`` the wait is unbounded. Negative values are treated as zero.

    Raises:
        DatabaseWriteBusy: If either lock could not be acquired in time.
    """
    if not is_sqlite_database():
        yield
        return

    effective_timeout = settings.database_write_lock_timeout_seconds if timeout is None else timeout
    deadline = None if effective_timeout is None else time.monotonic() + max(0.0, effective_timeout)
    if not _acquire_process_lock(deadline):
        raise DatabaseWriteBusy(operation, database_write_status().__dict__)

    handle = None
    lock_path = None
    file_locked = False
    try:
        lock_path = _lock_path()
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        handle = _open_locked_handle(lock_path, deadline)
        if handle is None:
            raise DatabaseWriteBusy(operation, _read_lock_metadata(lock_path))
        file_locked = True
        _write_lock_metadata(handle, operation)
        _set_state(DatabaseWriteState(locked=True, operation=operation, pid=os.getpid(), started_at=time.time()))
        yield
    finally:
        try:
            _set_state(DatabaseWriteState(locked=False))
            if handle is not None:
                # Release exactly the path that was locked rather than
                # re-deriving it from settings a second time.
                _release_lock_file(handle, lock_path, file_locked)
        finally:
            # Always free the in-process lock, even if closing the handle
            # raised; otherwise no later writer in this process could take it.
            _process_write_lock.release()


def _release_lock_file(handle, lock_path: Path, file_locked: bool) -> None:
    """Drop the file lock held through *handle* and remove the lock file (best effort)."""
    # With flock available, remove the path while the lock is still held so the
    # file is never unlinked after another process has already locked it.
    # Without fcntl the handle is closed first, since an open file may not be
    # removable on such platforms.
    unlink_first = file_locked and fcntl is not None
    try:
        if unlink_first:
            _remove_lock_file(lock_path)
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
            except OSError:
                pass  # closing the handle below drops the lock as well
        handle.close()
    finally:
        if file_locked and not unlink_first:
            _remove_lock_file(lock_path)


def _remove_lock_file(lock_path: Path) -> None:
    """Delete *lock_path*, ignoring a missing file or any other OS-level failure."""
    try:
        lock_path.unlink()
    except OSError:
        # Best effort: FileNotFoundError (already gone) is an OSError too.
        pass
|
|
|
|
|
|
def database_write_status() -> DatabaseWriteState:
    """Return the current module-level write-lock snapshot, read under ``_state_lock``."""
    with _state_lock:
        snapshot = _state
    return snapshot
|
|
|
|
|
|
def _acquire_process_lock(deadline: float | None) -> bool:
    """Acquire the in-process write lock, waiting at most until *deadline*.

    Args:
        deadline: ``time.monotonic()`` value after which to give up, or ``None``
            to wait without limit.

    Returns:
        ``True`` if the lock was acquired, ``False`` if the deadline passed first.
    """
    if deadline is None:
        return _process_write_lock.acquire()
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        # Deadline already reached: still make one non-blocking attempt.
        return _process_write_lock.acquire(blocking=False)
    # Let the lock do the waiting instead of polling: the waiter wakes as soon
    # as the lock is released and never sleeps past the deadline.
    return _process_write_lock.acquire(timeout=min(remaining, threading.TIMEOUT_MAX))
|
|
|
|
|
|
def _acquire_file_lock(handle, deadline: float | None) -> bool:
    """Take an exclusive flock on *handle*, polling every 50 ms until *deadline*.

    Returns ``True`` once the lock is held (immediately when ``fcntl`` is
    unavailable) and ``False`` if *deadline*, a ``time.monotonic()`` value,
    passes first. ``None`` means poll without limit.
    """
    if fcntl is None:
        return True
    flags = fcntl.LOCK_EX | fcntl.LOCK_NB
    acquired = False
    while not acquired:
        try:
            fcntl.flock(handle.fileno(), flags)
        except BlockingIOError:
            # Held elsewhere: give up at the deadline, otherwise back off.
            if deadline is not None and time.monotonic() >= deadline:
                break
            time.sleep(0.05)
        else:
            acquired = True
    return acquired
|
|
|
|
|
|
def _open_locked_handle(lock_path: Path, deadline: float | None):
    """Open *lock_path* and lock it exclusively, retrying until *deadline*.

    Args:
        lock_path: Location of the lock file; it and its parent directory are
            created as needed.
        deadline: ``time.monotonic()`` value after which to give up, or ``None``
            to retry without limit.

    Returns:
        The open, locked file handle, or ``None`` if the deadline passed or a
        lock file judged stale could not be removed.
    """

    def expired() -> bool:
        return deadline is not None and time.monotonic() >= deadline

    while True:
        try:
            lock_path.parent.mkdir(parents=True, exist_ok=True)
            handle = lock_path.open("a+", encoding="utf-8")
        except FileNotFoundError:
            # The directory vanished between mkdir() and open(); try again.
            if expired():
                return None
            time.sleep(0.05)
            continue

        try:
            locked = _try_file_lock(handle)
            # flock() locks the open file, not the path. If the file was
            # unlinked or replaced between open() and flock(), this lock
            # protects nothing, so it must not be treated as acquired.
            usable = locked and (fcntl is None or _handle_matches_path(handle, lock_path))
        except BaseException:
            handle.close()  # do not leak the descriptor on unexpected errors
            raise
        if usable:
            return handle
        if locked:
            # Locked a file the path no longer refers to: drop it and reopen.
            handle.close()
            if expired():
                return None
            continue

        metadata = _read_lock_metadata(lock_path)
        handle.close()
        if not _lock_metadata_is_stale(metadata):
            if expired():
                return None
            time.sleep(0.05)
            continue
        # The recorded holder is gone: remove the leftover file and retry.
        try:
            lock_path.unlink()
        except FileNotFoundError:
            pass
        except OSError:
            return None
        if expired():
            return None


def _handle_matches_path(handle, lock_path: Path) -> bool:
    """Return whether *lock_path* still refers to the file opened as *handle*."""
    try:
        return os.path.samestat(os.fstat(handle.fileno()), lock_path.stat())
    except FileNotFoundError:
        return False
|
|
|
|
|
|
def _try_file_lock(handle) -> bool:
    """Make one non-blocking attempt at an exclusive flock on *handle*.

    Returns ``False`` only when the lock is currently held elsewhere. When
    ``fcntl`` is unavailable no locking is attempted and ``True`` is returned.
    """
    if fcntl is not None:
        try:
            fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError:
            return False
    return True
|
|
|
|
|
|
def _lock_metadata_is_stale(metadata: dict[str, object]) -> bool:
    """Report whether *metadata* names a holder process that no longer exists.

    Args:
        metadata: Parsed lock-file contents; only the ``"pid"`` key is read.

    Returns:
        ``True`` only when ``"pid"`` converts to a positive integer, is not this
        process's own pid, and ``_pid_exists`` reports no such process. Any pid
        that is missing or unusable yields ``False``.
    """
    try:
        pid = int(metadata.get("pid"))  # type: ignore[arg-type]
    except (TypeError, ValueError, OverflowError):
        # Missing, non-numeric, NaN (ValueError) or infinite (OverflowError)
        # pid: staleness cannot be shown, so treat the lock as live.
        return False
    if pid <= 0 or pid == os.getpid():
        return False
    return not _pid_exists(pid)
|
|
|
|
|
|
def _pid_exists(pid: int) -> bool:
    """Return whether a process with id *pid* currently exists.

    Probes with ``os.kill(pid, 0)``. A ``PermissionError`` means the process
    exists but belongs to someone else, so it counts as existing.
    """
    # NOTE(review): the signal-0 probe is a POSIX idiom; confirm behaviour
    # before relying on this on any non-POSIX platform.
    try:
        os.kill(pid, 0)
    except (ProcessLookupError, OverflowError):
        # OverflowError: the value does not fit the platform's pid type (for
        # example a corrupt lock file), so no such process can exist.
        return False
    except PermissionError:
        return True
    return True
|
|
|
|
|
|
def _set_state(state: DatabaseWriteState) -> None:
    """Replace the module-level write-lock snapshot with *state* under ``_state_lock``."""
    global _state
    with _state_lock:
        # Rebind the global rather than mutate: the snapshot type is frozen.
        _state = state
|
|
|
|
|
|
def _lock_path() -> Path:
    """Return the write-lock file location: ``workbench.write.lock`` under ``settings.data_dir``."""
    data_dir = settings.data_dir
    return data_dir / "workbench.write.lock"
|
|
|
|
|
|
def _write_lock_metadata(handle, operation: str) -> None:
    """Replace the contents of *handle* with compact JSON describing the holder.

    The record holds ``operation``, this process's pid and the current
    ``time.time()``. It is flushed and fsynced before returning.
    """
    # Discard whatever an earlier holder left in the file.
    handle.seek(0)
    handle.truncate()
    record = {"operation": operation, "pid": os.getpid(), "started_at": time.time()}
    handle.write(json.dumps(record, separators=(",", ":")))
    handle.flush()
    os.fsync(handle.fileno())
|
|
|
|
|
|
def _read_lock_metadata(path: Path) -> dict[str, object]:
    """Read the JSON metadata stored in the lock file at *path*.

    Returns:
        The decoded JSON object, or an empty dict when the file is missing,
        unreadable, empty, not valid UTF-8, not valid JSON, or holds a JSON
        value other than an object.
    """
    try:
        text = path.read_text(encoding="utf-8").strip()
        payload = json.loads(text) if text else {}
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers use .get() on the result, so never hand back a list, None, etc.
    return payload if isinstance(payload, dict) else {}
|