"""FastAPI dependencies that resolve and authorize the caller of an API request."""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass

from fastapi import Depends, Header, HTTPException, status
from sqlalchemy.orm import Session

from app.db.models import ApiKey, AuthSession, User
from app.db.session import get_session
from app.security.api_keys import authenticate_api_key, has_scope as api_key_has_scope
from app.security.sessions import authenticate_session_token, collect_user_scopes

# Case-insensitive scheme prefix expected in the ``Authorization`` header.
_BEARER_PREFIX = "bearer "


@dataclass(slots=True)
class ApiPrincipal:
    """Identity resolved for a request.

    Holds the user, tenant and scopes, plus whichever credential
    authenticated the request: ``api_key`` or ``auth_session``. The other
    one stays ``None``.
    """

    user: User
    tenant_id: str
    scopes: list[str]
    api_key: ApiKey | None = None
    auth_session: AuthSession | None = None


def _extract_token(authorization: str | None, x_api_key: str | None) -> tuple[str | None, str]:
    """Pick the credential string out of the request headers.

    ``X-API-Key`` takes precedence over ``Authorization: Bearer ...``.

    Args:
        authorization: Raw ``Authorization`` header value, if any.
        x_api_key: Raw ``X-API-Key`` header value, if any.

    Returns:
        ``(token, source)`` where ``source`` is ``"api_key"``, ``"bearer"``
        or ``"none"``. ``token`` is ``None`` (or empty for a bare
        ``Bearer`` header) when no usable credential was supplied.
    """
    # Strip before testing so a whitespace-only X-API-Key header does not
    # shadow a valid bearer token.
    api_key_token = (x_api_key or "").strip()
    if api_key_token:
        return api_key_token, "api_key"
    if authorization and authorization.lower().startswith(_BEARER_PREFIX):
        return authorization[len(_BEARER_PREFIX):].strip(), "bearer"
    return None, "none"


def _load_active_user(session: Session, user_id: object) -> User:
    """Return the user with ``user_id``, or raise 401 if missing or inactive."""
    user = session.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Inactive user")
    return user


def get_api_principal(
    session: Session = Depends(get_session),
    authorization: str | None = Header(default=None),
    x_api_key: str | None = Header(default=None, alias="X-API-Key"),
) -> ApiPrincipal:
    """Authenticate the request and return its principal.

    The token is first tried as an API key and, failing that, as a session
    token, regardless of which header carried it.

    Args:
        session: Database session injected by FastAPI.
        authorization: ``Authorization`` header value.
        x_api_key: ``X-API-Key`` header value.

    Returns:
        The authenticated ``ApiPrincipal``.

    Raises:
        HTTPException: 401 when no token is supplied, the token matches
            neither an API key nor a session, or the owning user is missing
            or inactive.
    """
    token, _source = _extract_token(authorization, x_api_key)
    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing API key or session token",
        )

    # API keys remain supported for CLI/automation. Browser login uses session tokens.
    api_key = authenticate_api_key(session, token)
    if api_key:
        user = _load_active_user(session, api_key.user_id)
        # NOTE(review): the commit presumably persists bookkeeping performed
        # by authenticate_api_key — confirm against that helper.
        session.commit()
        return ApiPrincipal(
            api_key=api_key,
            user=user,
            tenant_id=api_key.tenant_id,
            scopes=api_key.scopes or [],
        )

    auth_session = authenticate_session_token(session, token)
    if auth_session:
        user = _load_active_user(session, auth_session.user_id)
        scopes = collect_user_scopes(session, user)
        session.commit()
        return ApiPrincipal(
            auth_session=auth_session,
            user=user,
            tenant_id=user.tenant_id,
            scopes=scopes,
        )

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid API key or session token",
    )


def has_scope(principal: ApiPrincipal, required_scope: str) -> bool:
    """Report whether ``principal`` is granted ``required_scope``.

    API-key principals delegate to the API-key scope check. Session
    principals match on the exact scope or the ``"*"`` wildcard.
    """
    if principal.api_key:
        return api_key_has_scope(principal.api_key, required_scope)
    # Built only on the session path; the API-key path never reads it.
    granted = set(principal.scopes or [])
    return "*" in granted or required_scope in granted


def require_scope(required_scope: str) -> Callable[..., ApiPrincipal]:
    """Build a FastAPI dependency that enforces ``required_scope``.

    The returned dependency resolves the principal via
    ``get_api_principal`` and returns it, raising a 403 ``HTTPException``
    when the scope is missing.
    """

    def dependency(principal: ApiPrincipal = Depends(get_api_principal)) -> ApiPrincipal:
        if not has_scope(principal, required_scope):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing scope: {required_scope}",
            )
        return principal

    return dependency