Files

124 lines
3.6 KiB
Python

from __future__ import annotations
import hashlib
import hmac
import secrets
from dataclasses import dataclass
from datetime import timedelta
from sqlalchemy.orm import Session
from app.db.models import AuthSession, Group, GroupRoleAssignment, Role, User, UserGroupMembership, UserRoleAssignment
from app.security.time import ensure_aware_utc, utc_now
# Number of random bytes passed to secrets.token_urlsafe() when a session
# token is generated (see generate_session_token).
SESSION_RANDOM_BYTES = 32
# Default session lifetime in hours; used as the default for the `hours`
# argument of create_auth_session.
DEFAULT_SESSION_HOURS = 12
@dataclass(slots=True)
class CreatedSession:
model: AuthSession
token: str
def generate_session_token() -> str:
    """Return a fresh random session token.

    The token is the literal prefix ``ms_`` followed by a URL-safe text
    string produced by ``secrets.token_urlsafe`` from
    ``SESSION_RANDOM_BYTES`` random bytes.
    """
    random_part = secrets.token_urlsafe(SESSION_RANDOM_BYTES)
    return f"ms_{random_part}"
def hash_session_token(token: str) -> str:
    """Return the lowercase hex SHA-256 digest of *token*.

    The token is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(token.encode("utf-8"))
    return hasher.hexdigest()
def verify_session_token(token: str, expected_hash: str) -> bool:
    """Return True when *token* hashes to *expected_hash*.

    The token is hashed with ``hash_session_token`` and the two hex strings
    are compared with ``hmac.compare_digest``.
    """
    candidate_hash = hash_session_token(token)
    # compare_digest is used instead of ``==`` so the comparison time does
    # not depend on where the two strings first differ.
    return hmac.compare_digest(candidate_hash, expected_hash)
def create_auth_session(
    session: Session,
    *,
    user: User,
    hours: int = DEFAULT_SESSION_HOURS,
    user_agent: str | None = None,
    ip_address: str | None = None,
) -> CreatedSession:
    """Create an ``AuthSession`` row for *user* and return it with its token.

    A new token is generated and only its hash is passed to the row. The
    row's ``expires_at`` is set ``hours`` hours after the current time, and
    both the row's ``last_seen_at`` and ``user.last_login_at`` are set to
    the current time. The row and the user are added to *session*, which is
    then flushed; this function does not commit.

    Args:
        session: SQLAlchemy session the new row is added to.
        user: Owner of the session; supplies ``tenant_id`` and ``id``.
        hours: Lifetime of the session in hours.
        user_agent: Optional user-agent string stored on the row.
        ip_address: Optional IP address stored on the row.

    Returns:
        A ``CreatedSession`` holding the new row and the plaintext token.
    """
    issued_at = utc_now()
    plaintext_token = generate_session_token()
    lifetime = timedelta(hours=hours)

    auth_session = AuthSession(
        tenant_id=user.tenant_id,
        user_id=user.id,
        token_hash=hash_session_token(plaintext_token),
        expires_at=issued_at + lifetime,
        last_seen_at=issued_at,
        user_agent=user_agent,
        ip_address=ip_address,
    )
    user.last_login_at = issued_at

    session.add(auth_session)
    session.add(user)
    session.flush()

    return CreatedSession(model=auth_session, token=plaintext_token)
def authenticate_session_token(session: Session, token: str) -> AuthSession | None:
    """Look up the live ``AuthSession`` matching *token*.

    The token is hashed and matched against ``AuthSession.token_hash`` among
    rows whose ``revoked_at`` is NULL. A matching row is rejected when its
    ``expires_at`` (after ``ensure_aware_utc``) is ``None`` or earlier than
    the current time. On success the row's ``last_seen_at`` is set to the
    current time and the row is added to *session*; this function does not
    flush or commit.

    Args:
        session: SQLAlchemy session used for the lookup.
        token: Plaintext session token presented by the client.

    Returns:
        The matching ``AuthSession``, or ``None`` when the token is empty or
        missing, unknown, revoked, or expired.
    """
    # An empty or missing token can never match a stored hash; bail out
    # before hashing it or touching the database.
    if not token:
        return None

    model = (
        session.query(AuthSession)
        .filter(
            AuthSession.token_hash == hash_session_token(token),
            AuthSession.revoked_at.is_(None),
        )
        .one_or_none()
    )
    if model is None:
        return None

    now = utc_now()
    expires_at = ensure_aware_utc(model.expires_at)
    if expires_at is None or expires_at < now:
        return None

    model.last_seen_at = now
    session.add(model)
    return model
def revoke_auth_session(session: Session, token: str) -> bool:
    """Revoke the session identified by *token*.

    The session is resolved with ``authenticate_session_token``, so only a
    token that currently authenticates can be revoked. On success the row's
    ``revoked_at`` is set to the current time and the row is added to
    *session*.

    Returns:
        True when a session was found and marked revoked, False otherwise.
    """
    active = authenticate_session_token(session, token)
    if active:
        active.revoked_at = utc_now()
        session.add(active)
        return True
    return False
def collect_user_roles(session: Session, user: User) -> list[Role]:
    """Return the roles held by *user*, directly or through groups.

    Two queries are issued: roles joined through ``UserRoleAssignment`` for
    this user and tenant, then roles joined through ``GroupRoleAssignment``
    for every group the user belongs to (``UserGroupMembership`` filtered by
    this user and tenant). Results are de-duplicated by ``Role.id``;
    directly assigned roles come first in the returned list.
    """
    directly_assigned = (
        session.query(Role)
        .join(UserRoleAssignment, UserRoleAssignment.role_id == Role.id)
        .filter(
            UserRoleAssignment.user_id == user.id,
            UserRoleAssignment.tenant_id == user.tenant_id,
        )
        .all()
    )
    unique_roles: dict[str, Role] = {role.id: role for role in directly_assigned}

    inherited_from_groups = (
        session.query(Role)
        .join(GroupRoleAssignment, GroupRoleAssignment.role_id == Role.id)
        .join(
            UserGroupMembership,
            UserGroupMembership.group_id == GroupRoleAssignment.group_id,
        )
        .filter(
            UserGroupMembership.user_id == user.id,
            UserGroupMembership.tenant_id == user.tenant_id,
        )
        .all()
    )
    # A role already present keeps its position; its value is replaced by
    # the group-derived instance, as in a plain keyed assignment.
    unique_roles.update((role.id, role) for role in inherited_from_groups)

    return list(unique_roles.values())
def collect_user_groups(session: Session, user: User) -> list[Group]:
    """Return every ``Group`` that *user* is a member of.

    Groups are joined through ``UserGroupMembership`` and filtered by the
    user's id and tenant id.
    """
    membership_query = session.query(Group).join(
        UserGroupMembership,
        UserGroupMembership.group_id == Group.id,
    )
    scoped_to_user = membership_query.filter(
        UserGroupMembership.user_id == user.id,
        UserGroupMembership.tenant_id == user.tenant_id,
    )
    return scoped_to_user.all()
def collect_user_scopes(session: Session, user: User) -> list[str]:
    """Return the permission scopes granted to *user*.

    A user flagged ``is_tenant_admin`` gets the single wildcard scope
    ``"*"`` without any role lookup. Otherwise the ``permissions`` of every
    role from ``collect_user_roles`` are merged, de-duplicated and returned
    in sorted order; a role whose ``permissions`` is falsy contributes
    nothing.
    """
    if user.is_tenant_admin:
        return ["*"]

    granted = {
        permission
        for role in collect_user_roles(session, user)
        for permission in (role.permissions or [])
    }
    return sorted(granted)