62 lines
1.6 KiB
Python
62 lines
1.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import time
|
|
from contextlib import contextmanager
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
from app.config import settings
|
|
|
|
|
|
@contextmanager
def measure_pipeline_phase(
    phase: str,
    *,
    source_id: int | None = None,
    dataset_id: int | None = None,
    metadata: dict[str, object] | None = None,
) -> Iterator[dict[str, object]]:
    """Time the enclosed ``with`` body and record it as a pipeline metric.

    A shallow copy of ``metadata`` is yielded, so entries the caller adds to
    it inside the ``with`` body are included in the recorded row while the
    dict that was passed in is left untouched. Recording happens in a
    ``finally`` clause, so the metric is written whether the body completes
    or raises.

    Args:
        phase: Name of the pipeline phase being measured.
        source_id: Optional source identifier stored on the metric row.
        dataset_id: Optional dataset identifier stored on the metric row.
        metadata: Optional initial entries for the yielded dict.

    Yields:
        The mutable dict that is passed on as the metric's metadata;
        ``duration_seconds`` is added to it when the block exits.
    """
    started_at = time.perf_counter()
    # Copy so the caller's dict is never mutated by the bookkeeping below.
    details: dict[str, object] = dict(metadata) if metadata else {}
    try:
        yield details
    finally:
        # Elapsed wall time, rounded to millisecond precision.
        elapsed = round(time.perf_counter() - started_at, 3)
        details["duration_seconds"] = elapsed
        record_pipeline_metric(
            phase,
            source_id=source_id,
            dataset_id=dataset_id,
            duration_seconds=elapsed,
            metadata=details,
        )
|
|
|
|
|
|
def record_pipeline_metric(
    phase: str,
    *,
    source_id: int | None = None,
    dataset_id: int | None = None,
    duration_seconds: float | None = None,
    metadata: dict[str, object] | None = None,
) -> None:
    """Append one metric row as a compact JSON line to the metrics file.

    The parent directory of the metrics file is created if it is missing.
    Each row carries a timezone-aware UTC timestamp in ISO 8601 form. Values
    that ``json`` cannot serialize natively are converted with ``str``.

    Args:
        phase: Name of the pipeline phase the row describes.
        source_id: Optional source identifier stored on the row.
        dataset_id: Optional dataset identifier stored on the row.
        duration_seconds: Optional duration stored on the row.
        metadata: Optional extra entries; an empty dict is stored when this
            is ``None`` or empty.
    """
    target = _metric_path()
    target.parent.mkdir(parents=True, exist_ok=True)

    recorded_at = datetime.now(timezone.utc).isoformat()
    entry = {
        "timestamp": recorded_at,
        "phase": phase,
        "source_id": source_id,
        "dataset_id": dataset_id,
        "duration_seconds": duration_seconds,
        "metadata": metadata if metadata else {},
    }
    # Compact separators keep each row on one short line (JSONL).
    encoded = json.dumps(entry, separators=(",", ":"), default=str)

    with target.open("a", encoding="utf-8") as sink:
        sink.write(encoded)
        sink.write("\n")
|
|
|
|
|
|
def _metric_path() -> Path:
    """Return the path of the pipeline metrics JSONL file under ``settings.data_dir``."""
    metrics_dir = settings.data_dir / "metrics"
    return metrics_dir / "pipeline_metrics.jsonl"
|