from __future__ import annotations import hashlib import hmac import secrets from dataclasses import dataclass from datetime import timedelta from sqlalchemy.orm import Session from app.db.models import AuthSession, Group, GroupRoleAssignment, Role, User, UserGroupMembership, UserRoleAssignment from app.security.time import ensure_aware_utc, utc_now SESSION_RANDOM_BYTES = 32 DEFAULT_SESSION_HOURS = 12 @dataclass(slots=True) class CreatedSession: model: AuthSession token: str def generate_session_token() -> str: return "ms_" + secrets.token_urlsafe(SESSION_RANDOM_BYTES) def hash_session_token(token: str) -> str: return hashlib.sha256(token.encode("utf-8")).hexdigest() def verify_session_token(token: str, expected_hash: str) -> bool: return hmac.compare_digest(hash_session_token(token), expected_hash) def create_auth_session( session: Session, *, user: User, hours: int = DEFAULT_SESSION_HOURS, user_agent: str | None = None, ip_address: str | None = None, ) -> CreatedSession: token = generate_session_token() now = utc_now() model = AuthSession( tenant_id=user.tenant_id, user_id=user.id, token_hash=hash_session_token(token), expires_at=now + timedelta(hours=hours), last_seen_at=now, user_agent=user_agent, ip_address=ip_address, ) user.last_login_at = now session.add(model) session.add(user) session.flush() return CreatedSession(model=model, token=token) def authenticate_session_token(session: Session, token: str) -> AuthSession | None: token_hash = hash_session_token(token) model = session.query(AuthSession).filter(AuthSession.token_hash == token_hash, AuthSession.revoked_at.is_(None)).one_or_none() if not model: return None now = utc_now() expires_at = ensure_aware_utc(model.expires_at) if expires_at is None or expires_at < now: return None model.last_seen_at = now session.add(model) return model def revoke_auth_session(session: Session, token: str) -> bool: model = authenticate_session_token(session, token) if not model: return False model.revoked_at = utc_now() session.add(model) return True def collect_user_roles(session: Session, user: User) -> list[Role]: roles_by_id: dict[str, Role] = {} direct = ( session.query(Role) .join(UserRoleAssignment, UserRoleAssignment.role_id == Role.id) .filter(UserRoleAssignment.user_id == user.id, UserRoleAssignment.tenant_id == user.tenant_id) .all() ) for role in direct: roles_by_id[role.id] = role group_roles = ( session.query(Role) .join(GroupRoleAssignment, GroupRoleAssignment.role_id == Role.id) .join(UserGroupMembership, UserGroupMembership.group_id == GroupRoleAssignment.group_id) .filter(UserGroupMembership.user_id == user.id, UserGroupMembership.tenant_id == user.tenant_id) .all() ) for role in group_roles: roles_by_id[role.id] = role return list(roles_by_id.values()) def collect_user_groups(session: Session, user: User) -> list[Group]: return ( session.query(Group) .join(UserGroupMembership, UserGroupMembership.group_id == Group.id) .filter(UserGroupMembership.user_id == user.id, UserGroupMembership.tenant_id == user.tenant_id) .all() ) def collect_user_scopes(session: Session, user: User) -> list[str]: if user.is_tenant_admin: return ["*"] scopes: set[str] = set() for role in collect_user_roles(session, user): scopes.update(role.permissions or []) return sorted(scopes)