# Write lock for the application database.
#
# Writers are ordered inside one process by a ``threading.Lock`` and across
# processes by an advisory ``flock`` on a lock file under ``settings.data_dir``.
# Where ``fcntl`` cannot be imported, only the in-process lock is effective.
from __future__ import annotations

from contextlib import contextmanager
from dataclasses import asdict, dataclass
import json
import os
from pathlib import Path
import threading
import time
from typing import Iterator, TextIO

from app.config import settings

try:
    import fcntl
except ImportError:  # pragma: no cover - this app currently targets Linux/macOS dev hosts
    fcntl = None  # type: ignore[assignment]

# Seconds to sleep between non-blocking attempts on the lock file.
_POLL_INTERVAL_SECONDS = 0.05


class DatabaseWriteBusy(RuntimeError):
    """Raised when the write lock cannot be acquired before the deadline.

    Attributes:
        operation: Label of the write that was refused.
        active: Best-effort description of the write currently holding the
            lock; empty when nothing is known about it.
    """

    def __init__(self, operation: str, active: dict[str, object] | None = None) -> None:
        self.operation = operation
        self.active = active or {}
        detail = "Database is busy with another write operation"
        active_operation = self.active.get("operation")
        if active_operation:
            detail += f": {active_operation}"
        super().__init__(detail)


@dataclass(frozen=True)
class DatabaseWriteState:
    """Immutable snapshot of the write lock as seen by this process."""

    # True while this process is inside ``database_write_lock``.
    locked: bool
    # Label passed to ``database_write_lock`` by the current holder.
    operation: str | None = None
    # PID recorded when the lock was taken.
    pid: int | None = None
    # ``time.time()`` value recorded when the lock was taken.
    started_at: float | None = None

    @property
    def elapsed_seconds(self) -> float | None:
        """Return wall-clock seconds since ``started_at``, or ``None`` if unset.

        The result is clamped at zero so a backwards clock step never yields a
        negative duration.
        """
        if self.started_at is None:
            return None
        return max(0.0, time.time() - self.started_at)


_process_write_lock = threading.Lock()
_state_lock = threading.Lock()
_state = DatabaseWriteState(locked=False)


def is_sqlite_database() -> bool:
    """Return ``settings.is_sqlite_database``."""
    return settings.is_sqlite_database


@contextmanager
def database_write_lock(operation: str, timeout: float | None = None) -> Iterator[None]:
    """Serialize SQLite writes inside and across app processes.

    SQLite allows only one writer. This lock prevents mutating endpoints from
    competing until SQLite times out with a low-level "database is locked" error.

    When the configured database is not SQLite the context manager is a no-op.

    Args:
        operation: Label recorded in the in-process state and in the lock file
            so that a refused caller can report what is holding the lock.
        timeout: Seconds to wait for the in-process lock and the lock file
            combined. ``None`` falls back to
            ``settings.database_write_lock_timeout_seconds``; if that is also
            ``None`` the wait is unbounded. Negative values are treated as 0.

    Raises:
        DatabaseWriteBusy: If either lock could not be acquired in time.
    """
    if not is_sqlite_database():
        yield
        return

    effective_timeout = settings.database_write_lock_timeout_seconds if timeout is None else timeout
    deadline = None if effective_timeout is None else time.monotonic() + max(0.0, effective_timeout)

    if not _acquire_process_lock(deadline):
        # asdict() hands the exception its own copy instead of the live
        # attribute dict of the shared state object.
        raise DatabaseWriteBusy(operation, asdict(database_write_status()))

    handle = None
    file_locked = False
    try:
        lock_path = _lock_path()
        lock_path.parent.mkdir(parents=True, exist_ok=True)
        handle = _open_locked_handle(lock_path, deadline)
        if handle is None:
            raise DatabaseWriteBusy(operation, _read_lock_metadata(lock_path))
        file_locked = True
        _write_lock_metadata(handle, operation)
        _set_state(DatabaseWriteState(locked=True, operation=operation, pid=os.getpid(), started_at=time.time()))
        yield
    finally:
        try:
            _set_state(DatabaseWriteState(locked=False))
            if handle is not None:
                if file_locked:
                    _release_locked_handle(handle, lock_path)
                else:
                    handle.close()
        finally:
            # Must run even if cleanup above fails, otherwise every later
            # writer in this process would block on the in-process lock.
            _process_write_lock.release()


def database_write_status() -> DatabaseWriteState:
    """Return the current in-process write-lock snapshot."""
    with _state_lock:
        return _state


def _deadline_passed(deadline: float | None) -> bool:
    """Return True once the monotonic ``deadline`` is reached (never for ``None``)."""
    return deadline is not None and time.monotonic() >= deadline


def _acquire_process_lock(deadline: float | None) -> bool:
    """Acquire the in-process write lock, waiting until ``deadline`` at most.

    Args:
        deadline: ``time.monotonic()`` value to give up at, or ``None`` to wait
            without limit.

    Returns:
        True if the lock is now held by the caller, False on timeout.
    """
    if deadline is None:
        return _process_write_lock.acquire()
    remaining = max(0.0, deadline - time.monotonic())
    # A zero timeout is a single non-blocking attempt; the clamp avoids the
    # OverflowError raised for timeouts above threading.TIMEOUT_MAX.
    return _process_write_lock.acquire(timeout=min(remaining, threading.TIMEOUT_MAX))


def _acquire_file_lock(handle: TextIO, deadline: float | None) -> bool:
    """Poll for an exclusive ``flock`` on ``handle`` until ``deadline``.

    Returns True immediately when ``fcntl`` is unavailable. Otherwise returns
    True once the lock is held, or False if the deadline passes first.
    """
    if fcntl is None:
        return True
    while True:
        try:
            fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            return True
        except BlockingIOError:
            if _deadline_passed(deadline):
                return False
            time.sleep(_POLL_INTERVAL_SECONDS)


def _handle_matches_path(handle: TextIO, lock_path: Path) -> bool:
    """Return True if ``lock_path`` still names the file opened as ``handle``.

    A lock on a file that has since been unlinked or replaced protects
    nothing, because other processes open whatever the path names now.
    """
    try:
        held = os.fstat(handle.fileno())
        current = os.stat(lock_path)
    except OSError:
        return False
    return os.path.samestat(held, current)


def _open_locked_handle(lock_path: Path, deadline: float | None) -> TextIO | None:
    """Open ``lock_path`` and take an exclusive lock on it.

    Retries until the lock is obtained or ``deadline`` passes. If the lock is
    held but the metadata in the file names a process that no longer exists,
    the file is removed and a fresh one is created on the next attempt.

    Args:
        lock_path: Location of the lock file; its parent is created if missing.
        deadline: ``time.monotonic()`` value to give up at, or ``None`` to keep
            trying without limit.

    Returns:
        The open, locked handle, or ``None`` on timeout or when a stale lock
        file could not be removed.
    """
    while True:
        try:
            lock_path.parent.mkdir(parents=True, exist_ok=True)
            handle = lock_path.open("a+", encoding="utf-8")
        except FileNotFoundError:
            # The directory disappeared between mkdir() and open(); try again.
            if _deadline_passed(deadline):
                return None
            time.sleep(_POLL_INTERVAL_SECONDS)
            continue

        try:
            locked = _try_file_lock(handle)
        except BaseException:
            # Do not leak the descriptor when flock fails unexpectedly.
            handle.close()
            raise

        if locked:
            if fcntl is None or _handle_matches_path(handle, lock_path):
                return handle
            # The previous holder unlinked the file between our open() and
            # flock(); the lock we got is on an orphaned file. Closing drops
            # it, and the next attempt opens whatever the path names now.
            handle.close()
            if _deadline_passed(deadline):
                return None
            continue

        metadata = _read_lock_metadata(lock_path)
        handle.close()
        if not _lock_metadata_is_stale(metadata):
            if _deadline_passed(deadline):
                return None
            time.sleep(_POLL_INTERVAL_SECONDS)
            continue

        # NOTE(review): two processes breaking the same stale lock at once can
        # still race each other here; confirm whether that case needs closing.
        try:
            lock_path.unlink()
        except FileNotFoundError:
            pass
        except OSError:
            return None
        if _deadline_passed(deadline):
            return None


def _release_locked_handle(handle: TextIO, lock_path: Path) -> None:
    """Remove the lock file, release the lock on ``handle`` and close it.

    Removal is best-effort: failures to unlink or unlock are ignored. The file
    is unlinked while the lock is still held so that no other process can lock
    the old file in the gap between unlock and unlink.
    """
    try:
        if fcntl is None:
            # No advisory lock exists; close first because some platforms
            # (e.g. Windows) refuse to unlink a file that is still open.
            handle.close()
            _unlink_quietly(lock_path)
            return
        # Only remove the file if the path still refers to the one we locked.
        if _handle_matches_path(handle, lock_path):
            _unlink_quietly(lock_path)
        try:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
        except OSError:
            pass
    finally:
        handle.close()


def _unlink_quietly(path: Path) -> None:
    """Delete ``path``, ignoring any ``OSError`` (including a missing file)."""
    try:
        path.unlink()
    except OSError:
        pass


def _try_file_lock(handle: TextIO) -> bool:
    """Make one non-blocking attempt at an exclusive ``flock`` on ``handle``.

    Returns True when the lock was obtained or ``fcntl`` is unavailable, and
    False when the lock is currently held elsewhere.
    """
    if fcntl is None:
        return True
    try:
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        return False
    return True


def _lock_metadata_is_stale(metadata: dict[str, object]) -> bool:
    """Return True if ``metadata`` names an owner process that no longer exists.

    Missing, malformed, non-positive or out-of-range pids, and the current
    process's own pid, are never reported as stale.
    """
    pid = metadata.get("pid")
    try:
        pid_int = int(pid)  # type: ignore[arg-type]
    except (TypeError, ValueError, OverflowError):
        # OverflowError: JSON "Infinity" parses to a float int() cannot convert.
        return False
    if pid_int <= 0 or pid_int == os.getpid():
        return False
    try:
        return not _pid_exists(pid_int)
    except OverflowError:
        # A pid too large for the platform means corrupt metadata, which is
        # not evidence that the real owner has gone away.
        return False


def _pid_exists(pid: int) -> bool:
    """Return True if a process with ``pid`` exists, probing with signal 0."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but we are not allowed to signal it.
        return True
    return True


def _set_state(state: DatabaseWriteState) -> None:
    """Replace the module-level write-lock snapshot under ``_state_lock``."""
    global _state
    with _state_lock:
        _state = state


def _lock_path() -> Path:
    """Return the lock file location inside ``settings.data_dir``."""
    return settings.data_dir / "workbench.write.lock"


def _write_lock_metadata(handle: TextIO, operation: str) -> None:
    """Overwrite the lock file with JSON describing the current holder.

    The payload holds ``operation``, the current pid and the current
    ``time.time()``; it is flushed and fsynced before returning.
    """
    handle.seek(0)
    handle.truncate()
    json.dump({"operation": operation, "pid": os.getpid(), "started_at": time.time()}, handle, separators=(",", ":"))
    handle.flush()
    os.fsync(handle.fileno())


def _read_lock_metadata(path: Path) -> dict[str, object]:
    """Return the JSON object stored at ``path``, or ``{}`` if unusable.

    An unreadable, empty, undecodable or malformed file, or one whose JSON is
    not an object, yields an empty dict.
    """
    try:
        text = path.read_text(encoding="utf-8").strip()
        payload = json.loads(text) if text else {}
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return payload if isinstance(payload, dict) else {}