from __future__ import annotations
from sqlalchemy.orm import Session
from app.db.models import Group, UserGroupMembership
from app.storage.common import FileStorageError
def user_group_ids(session: Session, *, tenant_id: str, user_id: str, include_admin_groups: bool = False) -> list[str]:
    """Return the ids of the groups whose file spaces the user may see.

    Args:
        session: SQLAlchemy session used for the lookups.
        tenant_id: Tenant that every returned group must belong to.
        user_id: User whose memberships are listed. Ignored when
            ``include_admin_groups`` is true.
        include_admin_groups: When true, return every group of the tenant,
            ordered by group name, regardless of membership.

    Returns:
        Group ids. The membership branch applies no ORDER BY, so its order
        is whatever the database returns.
    """
    if not include_admin_groups:
        membership_query = session.query(UserGroupMembership).filter(
            UserGroupMembership.tenant_id == tenant_id,
            UserGroupMembership.user_id == user_id,
        )
        return [membership.group_id for membership in membership_query.all()]

    # NOTE(review): this branch never looks at user_id, so the flag is a full
    # tenant-wide grant. Callers must derive it from a verified admin role,
    # never from request input.
    tenant_groups = (
        session.query(Group)
        .filter(Group.tenant_id == tenant_id)
        .order_by(Group.name.asc())
        .all()
    )
    return [group.id for group in tenant_groups]
def ensure_group_access(session: Session, *, tenant_id: str, group_id: str, user_id: str, is_admin: bool = False) -> None:
    """Raise unless the user may use the given group's file space.

    Args:
        session: SQLAlchemy session used for the lookups.
        tenant_id: Tenant the caller is acting in.
        group_id: Primary key of the group being accessed.
        user_id: User requesting access.
        is_admin: When true, the membership check is skipped. The tenant
            check still applies.

    Raises:
        FileStorageError: If the group does not exist, belongs to another
            tenant, or the non-admin user has no membership row for it.
    """
    record = session.get(Group, group_id)
    # A missing group and a group in another tenant raise the same message,
    # so the error does not reveal whether a foreign group id exists.
    if not record or record.tenant_id != tenant_id:
        raise FileStorageError("Group not found")

    if not is_admin:
        # one_or_none() raises when more than one row matches, so duplicate
        # membership rows surface as an error instead of being read as access.
        lookup = session.query(UserGroupMembership).filter(
            UserGroupMembership.tenant_id == tenant_id,
            UserGroupMembership.user_id == user_id,
            UserGroupMembership.group_id == group_id,
        )
        if lookup.one_or_none() is None:
            raise FileStorageError("No access to this group file space")
def ensure_owner_access(session: Session, *, tenant_id: str, owner_type: str, owner_id: str, user_id: str, is_admin: bool = False) -> None:
    """Raise unless the user may use the file space of the given owner.

    Args:
        session: SQLAlchemy session, used only for group owners.
        tenant_id: Tenant the caller is acting in.
        owner_type: ``"user"`` or ``"group"``; case and surrounding
            whitespace are ignored.
        owner_id: Id of the user or group that owns the file space.
        user_id: User requesting access.
        is_admin: When true, the user may access another user's space and
            the group membership check is skipped.

    Raises:
        FileStorageError: If access is denied or ``owner_type`` is neither
            ``"user"`` nor ``"group"``.
    """
    kind = owner_type.lower().strip()

    if kind == "group":
        ensure_group_access(session, tenant_id=tenant_id, group_id=owner_id, user_id=user_id, is_admin=is_admin)
        return

    if kind != "user":
        raise FileStorageError("Files must be owned by a user or group")

    # NOTE(review): tenant_id is not consulted on the user branch. With
    # is_admin=True any owner_id passes, so nothing here ties owner_id to the
    # caller's tenant. Confirm that user file spaces are scoped by tenant
    # elsewhere (e.g. in the storage key) or add a tenant check on the owner.
    denied = owner_id != user_id and not is_admin
    if denied:
        raise FileStorageError("No access to this user file space")