Files

83 lines
2.2 KiB
Python

from __future__ import annotations

import hashlib
import hmac
import secrets
from dataclasses import dataclass, field
from datetime import datetime

from sqlalchemy.orm import Session

from app.db.models import ApiKey, User
from app.security.time import ensure_aware_utc, utc_now
# Number of leading characters of a secret that api_key_prefix() keeps as the
# lookup prefix. For generated secrets this count includes the "mm_" tag.
API_KEY_PREFIX_LENGTH = 12
# Byte count handed to secrets.token_urlsafe() when generating a new secret.
API_KEY_RANDOM_BYTES = 32
@dataclass(slots=True)
class CreatedApiKey:
model: ApiKey
secret: str
def hash_api_key(secret: str) -> str:
    """Return the SHA-256 digest of *secret* as a hexadecimal string.

    The secret is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(secret.encode("utf-8"))
    return hasher.hexdigest()
def verify_api_key(secret: str, expected_hash: str) -> bool:
    """Report whether *secret* hashes to *expected_hash*.

    The presented secret is hashed with ``hash_api_key`` and the two digests
    are compared with ``hmac.compare_digest`` rather than ``==``.
    """
    presented_hash = hash_api_key(secret)
    return hmac.compare_digest(presented_hash, expected_hash)
def generate_api_key_secret() -> str:
    """Return a new random API key secret tagged with the ``mm_`` marker.

    The random part comes from ``secrets.token_urlsafe`` using
    ``API_KEY_RANDOM_BYTES`` bytes of randomness.
    """
    random_part = secrets.token_urlsafe(API_KEY_RANDOM_BYTES)
    return f"mm_{random_part}"
def api_key_prefix(secret: str) -> str:
    """Return the leading ``API_KEY_PREFIX_LENGTH`` characters of *secret*.

    A secret shorter than that length is returned unchanged.
    """
    # Prefix is only a lookup helper and must not be enough to authenticate.
    prefix_end = API_KEY_PREFIX_LENGTH
    return secret[0:prefix_end]
def create_api_key(
    session: Session,
    *,
    user: User,
    name: str,
    scopes: list[str],
    secret: str | None = None,
    expires_at: datetime | None = None,
) -> CreatedApiKey:
    """Create an ``ApiKey`` row for *user* and return it with its secret.

    Args:
        session: Session the new row is added to. It is flushed here; no
            commit is issued by this function.
        user: Owner of the key; supplies ``tenant_id`` and ``id``.
        name: Name stored on the key.
        scopes: Scopes stored on the key.
        secret: Plaintext secret to use. When missing or empty, a new one is
            generated with ``generate_api_key_secret``.
        expires_at: Optional expiry stored on the key.

    Returns:
        A ``CreatedApiKey`` holding the new row and the plaintext secret.
        The row is built from the secret's prefix and hash only.
    """
    if not secret:
        secret = generate_api_key_secret()

    column_values = {
        "tenant_id": user.tenant_id,
        "user_id": user.id,
        "name": name,
        "prefix": api_key_prefix(secret),
        "key_hash": hash_api_key(secret),
        "scopes": scopes,
        "expires_at": expires_at,
    }
    api_key = ApiKey(**column_values)

    session.add(api_key)
    session.flush()
    return CreatedApiKey(model=api_key, secret=secret)
def authenticate_api_key(session: Session, secret: str) -> ApiKey | None:
    """Return the ``ApiKey`` matching *secret*, or ``None`` if there is none.

    Candidate rows are looked up by the secret's prefix and limited to keys
    whose ``revoked_at`` is NULL. A candidate is accepted only if it has not
    expired and its stored hash verifies against the full secret. On success
    the key's ``last_used_at`` is set to the current time and the key is
    added to the session; this function does not flush or commit.
    """
    same_prefix = ApiKey.prefix == api_key_prefix(secret)
    not_revoked = ApiKey.revoked_at.is_(None)
    matches = session.query(ApiKey).filter(same_prefix, not_revoked).all()

    now = utc_now()
    for key in matches:
        # NOTE(review): relies on ensure_aware_utc returning a falsy value
        # for keys without an expiry -- confirm against that helper.
        expiry = ensure_aware_utc(key.expires_at)
        expired = bool(expiry) and expiry < now
        if expired or not verify_api_key(secret, key.key_hash):
            continue

        key.last_used_at = now
        session.add(key)
        return key

    return None
def has_scope(api_key: ApiKey, required_scope: str) -> bool:
    """Report whether *api_key* grants *required_scope*.

    The wildcard scope ``"*"`` grants every scope. A key whose ``scopes``
    is empty or ``None`` grants nothing.
    """
    raw_scopes = api_key.scopes
    granted = set(raw_scopes) if raw_scopes else set()
    if "*" in granted:
        return True
    return required_scope in granted