Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions dojo/api_v2/prefetch/authorized_querysets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""
RBAC registry for the ``?prefetch=`` path.

The prefetch mixins resolve a query-string field name through ``getattr`` on a
model instance, find a serializer for the resolved related model, and return
the serialized representation. This module allows us to specify authorization
checks on the related objects when serializing.

``_Prefetcher`` filters every resolved related object through the registered
queryset before serializing it. If no policy is registered for a model, the
field is omitted from the response.
"""

from collections.abc import Callable

from django.db.models import Model, Q, QuerySet

from dojo.authorization.authorization import user_has_configuration_permission
from dojo.models import Engagement, Finding, Notes, Test

_REGISTRY: dict[type[Model], Callable[[object], QuerySet]] = {}


def discard_user(func):
"""
Adapter for auth helpers that don't accept a ``user`` parameter --
wraps them so they can be passed to ``register()`` like any other policy.
"""

def wrapper(*args, user, **kwargs):
return func(*args, **kwargs)

return wrapper


def register(model: type[Model], func: Callable, *args, **kwargs) -> None:
"""Register a policy for ``model``. At lookup, ``func`` is invoked as ``func(*args, user=<requesting user>, **kwargs)``."""

def policy(user):
return func(*args, user=user, **kwargs)

_REGISTRY[model] = policy


def get_authorized_queryset(model: type[Model], user) -> QuerySet | None:
"""
Return the queryset of ``model`` instances visible to ``user``.

Returns ``None`` when no policy has been registered. ``_Prefetcher``
treats ``None`` as "deny" and omits the field from the response.
"""
if policy := _REGISTRY.get(model):
return policy(user)
return None


def superuser_only(model: type[Model], user) -> QuerySet:
"""
Policy for models whose top-level ViewSet enforces ``IsSuperUser``
(strict ``request.user.is_superuser`` check). Only superusers pass.
"""
if user is not None and getattr(user, "is_superuser", False):
return model.objects.all()
return model.objects.none()


def django_view_perm(model: type[Model], user) -> QuerySet:
"""
Policy for models whose top-level ViewSet gates on DRF's ``DjangoModelPermissions``.

Passes all superusers and any user holding ``<app_label>.view_<model_name>``.
"""
if user is None:
return model.objects.none()
perm = f"{model._meta.app_label}.view_{model._meta.model_name}"
if user.has_perm(perm):
return model.objects.all()
return model.objects.none()


def dojo_view_perm(model: type[Model], user) -> QuerySet:
"""
Policy for models whose top-level ViewSet gates on a DefectDojo
``BaseDjangoModelPermission`` subclass that requires GET=view.

Passes all superusers and staff users and any user holding ``<app_label>.view_<model_name>``.
"""
if user is None:
return model.objects.none()
perm = f"{model._meta.app_label}.view_{model._meta.model_name}"
if user_has_configuration_permission(user, perm):
return model.objects.all()
return model.objects.none()


def authenticated_only(model: type[Model], user) -> QuerySet:
"""Policy for models whose top-level ViewSet is reachable by any authenticated user."""
if user is not None and getattr(user, "is_authenticated", False):
return model.objects.all()
return model.objects.none()


def children_via_parent(child_model, parent_model, parent_field, *, user) -> QuerySet:
"""
Authorize ``child_model`` by deferring to the policy registered for
``parent_model`` -- the child is visible iff the parent it points to via
``parent_field`` is visible. Used for models that don't have their own
``get_authorized_*`` helper but logically inherit authorization from a
parent (e.g. ``BurpRawRequestResponse`` -> ``Finding`` via ``finding``).
"""
if (parent_qs := get_authorized_queryset(parent_model, user)) is not None:
return child_model.objects.filter(**{f"{parent_field}__in": parent_qs})
return child_model.objects.none()


def notes_policy(user) -> QuerySet:
"""
Authorization for the ``Notes`` model.

Allows note viewership as follows:
* superuser: every note
* anyone else: a note is visible iff
(its attached Finding / Test / Engagement is visible to ``user``)
AND (the note is non-private OR ``user`` authored it).
"""
if user is None:
return Notes.objects.none()
if getattr(user, "is_superuser", False):
return Notes.objects.all()

# Helper method to avoid unnecessary queryset fetching
def _qs_or_none(model, u):
qs = get_authorized_queryset(model, u)
return model.objects.none() if qs is None else qs

finding_qs = _qs_or_none(Finding, user)
test_qs = _qs_or_none(Test, user)
engagement_qs = _qs_or_none(Engagement, user)

parent_visible = Q(finding__in=finding_qs) | Q(test__in=test_qs) | Q(engagement__in=engagement_qs)
return Notes.objects.filter(
parent_visible,
Q(private=False) | Q(author=user),
).distinct()
4 changes: 2 additions & 2 deletions dojo/api_v2/prefetch/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def list(self, request, *args, **kwargs):
prefetch_params = request.GET.get("prefetch", "")
prefetch_params = prefetch_params.split(",") if "," in prefetch_params else request.GET.getlist("prefetch")

prefetcher = _Prefetcher()
prefetcher = _Prefetcher(request=request)

# Apply the same operations as the standard list method defined in the
# django rest framework
Expand All @@ -35,7 +35,7 @@ def retrieve(self, request, *args, **kwargs):
prefetch_params = request.GET.get("prefetch", "")
prefetch_params = prefetch_params.split(",") if "," in prefetch_params else request.GET.getlist("prefetch")

prefetcher = _Prefetcher()
prefetcher = _Prefetcher(request=request)

entry = self.get_object()
serializer = self.get_serializer()
Expand Down
52 changes: 43 additions & 9 deletions dojo/api_v2/prefetch/prefetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
from django.conf import settings
from rest_framework.serializers import ModelSerializer

from dojo.api_v2.prefetch import (
registrations as _registrations, # noqa: F401 -- side-effect import populates the RBAC registry
)
from dojo.api_v2.prefetch import utils
from dojo.api_v2.prefetch.authorized_querysets import get_authorized_queryset
from dojo.location.api.serializers import LocationFindingReferenceSerializer, LocationSerializer
from dojo.location.models import Location, LocationFindingReference
from dojo.models import FileUpload, Finding
Expand Down Expand Up @@ -36,7 +40,8 @@ def _is_model_serializer(obj):
# We process all the serializers found in the module SERIALIZER_DEFS_MODULE. We restrict the scope to avoid
# processing all the classes in the symbol table
available_serializers = inspect.getmembers(
sys.modules[SERIALIZER_DEFS_MODULE], _is_model_serializer,
sys.modules[SERIALIZER_DEFS_MODULE],
_is_model_serializer,
)

for _, serializer in available_serializers:
Expand All @@ -51,9 +56,10 @@ def _is_model_serializer(obj):

return serializers

def __init__(self):
def __init__(self, request=None):
self._serializers = _Prefetcher._build_serializers()
self._prefetch_data = {}
self._request = request

def _find_serializer(self, field_type):
"""
Expand Down Expand Up @@ -136,6 +142,8 @@ def _prefetch(self, entry, fields_to_fetch):
field_to_fetch (list[string]): fields to prefetch

"""
user = getattr(self._request, "user", None) if self._request else None

for field_to_fetch in fields_to_fetch:
# Get the field from the instance
field_value, many = self.get_field_value(entry, field_to_fetch)
Expand All @@ -148,16 +156,42 @@ def _prefetch(self, entry, fields_to_fetch):
if extra_serializer is None:
continue

field_data = extra_serializer(many=many).to_representation(
# Authorization gate: only serialize objects from the requester's
# authorized queryset for this model. Deny-by-default -- models
# with no registered policy are omitted from the prefetch payload.
authorized_qs = get_authorized_queryset(model_type, user)
if authorized_qs is None:
continue

# Match the legacy contract: the field key always appears in the
# prefetch payload (possibly empty) once we have a policy. Tests
# and clients can rely on the key being present whenever the
# primary field is non-null on the entry.
self._prefetch_data.setdefault(field_to_fetch, {})

# Check related object authorizations
if many:
related_qs = field_value.all() if hasattr(field_value, "all") else field_value
related_ids = list(related_qs.values_list("pk", flat=True))
if not related_ids:
continue
# Set `field_value` (what will be serialized later) to the set of objects the user has permission to
field_value = authorized_qs.filter(pk__in=related_ids)
if not field_value.exists():
continue
# Only a single related item; check it's in the set of authorized objects. If not, skip it.
elif not authorized_qs.filter(pk=field_value.pk).exists():
continue

serializer_kwargs = {"many": many}
if self._request is not None:
# Add in the request for the serializers to use if they want
serializer_kwargs["context"] = {"request": self._request}
field_data = extra_serializer(**serializer_kwargs).to_representation(
field_value,
)
# For convenience in processing we store the field data in a list
field_data_list = (
field_data if isinstance(field_data, list) else [field_data]
)

if field_to_fetch not in self._prefetch_data:
self._prefetch_data[field_to_fetch] = {}
field_data_list = field_data if isinstance(field_data, list) else [field_data]

# Should not fail as django always generate an id field
for data in field_data_list:
Expand Down
Loading
Loading