From e5a234037748a9ee676accb81c2007e05b24f8ab Mon Sep 17 00:00:00 2001 From: dogboat Date: Tue, 2 Jun 2026 15:31:09 -0400 Subject: [PATCH] update and optimize prefetcher --- dojo/api_v2/prefetch/authorized_querysets.py | 144 +++++++++ dojo/api_v2/prefetch/mixins.py | 4 +- dojo/api_v2/prefetch/prefetcher.py | 52 +++- dojo/api_v2/prefetch/registrations.py | 238 +++++++++++++++ unittests/test_apiv2_prefetch_rbac.py | 298 +++++++++++++++++++ 5 files changed, 725 insertions(+), 11 deletions(-) create mode 100644 dojo/api_v2/prefetch/authorized_querysets.py create mode 100644 dojo/api_v2/prefetch/registrations.py create mode 100644 unittests/test_apiv2_prefetch_rbac.py diff --git a/dojo/api_v2/prefetch/authorized_querysets.py b/dojo/api_v2/prefetch/authorized_querysets.py new file mode 100644 index 00000000000..eeafe772a51 --- /dev/null +++ b/dojo/api_v2/prefetch/authorized_querysets.py @@ -0,0 +1,144 @@ +""" +RBAC registry for the ``?prefetch=`` path. + +The prefetch mixins resolve a query-string field name through ``getattr`` on a +model instance, find a serializer for the resolved related model, and return +the serialized representation. This module allows us to specify authorization +checks on the related objects when serializing. + +``_Prefetcher`` filters every resolved related object through the registered +queryset before serializing it. If no policy is registered for a model, the +field is omitted from the response. +""" + +from collections.abc import Callable + +from django.db.models import Model, Q, QuerySet + +from dojo.authorization.authorization import user_has_configuration_permission +from dojo.models import Engagement, Finding, Notes, Test + +_REGISTRY: dict[type[Model], Callable[[object], QuerySet]] = {} + + +def discard_user(func): + """ + Adapter for auth helpers that don't accept a ``user`` parameter -- + wraps them so they can be passed to ``register()`` like any other policy. + """ + + def wrapper(*args, user, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +def register(model: type[Model], func: Callable, *args, **kwargs) -> None: + """Register a policy for ``model``. At lookup, ``func`` is invoked as ``func(*args, user=, **kwargs)``.""" + + def policy(user): + return func(*args, user=user, **kwargs) + + _REGISTRY[model] = policy + + +def get_authorized_queryset(model: type[Model], user) -> QuerySet | None: + """ + Return the queryset of ``model`` instances visible to ``user``. + + Returns ``None`` when no policy has been registered. ``_Prefetcher`` + treats ``None`` as "deny" and omits the field from the response. + """ + if policy := _REGISTRY.get(model): + return policy(user) + return None + + +def superuser_only(model: type[Model], user) -> QuerySet: + """ + Policy for models whose top-level ViewSet enforces ``IsSuperUser`` + (strict ``request.user.is_superuser`` check). Only superusers pass. + """ + if user is not None and getattr(user, "is_superuser", False): + return model.objects.all() + return model.objects.none() + + +def django_view_perm(model: type[Model], user) -> QuerySet: + """ + Policy for models whose top-level ViewSet gates on DRF's ``DjangoModelPermissions``. + + Passes all superusers and any user holding ``.view_``. + """ + if user is None: + return model.objects.none() + perm = f"{model._meta.app_label}.view_{model._meta.model_name}" + if user.has_perm(perm): + return model.objects.all() + return model.objects.none() + + +def dojo_view_perm(model: type[Model], user) -> QuerySet: + """ + Policy for models whose top-level ViewSet gates on a DefectDojo + ``BaseDjangoModelPermission`` subclass that requires GET=view. + + Passes all superusers and staff users and any user holding ``.view_``. + """ + if user is None: + return model.objects.none() + perm = f"{model._meta.app_label}.view_{model._meta.model_name}" + if user_has_configuration_permission(user, perm): + return model.objects.all() + return model.objects.none() + + +def authenticated_only(model: type[Model], user) -> QuerySet: + """Policy for models whose top-level ViewSet is reachable by any authenticated user.""" + if user is not None and getattr(user, "is_authenticated", False): + return model.objects.all() + return model.objects.none() + + +def children_via_parent(child_model, parent_model, parent_field, *, user) -> QuerySet: + """ + Authorize ``child_model`` by deferring to the policy registered for + ``parent_model`` -- the child is visible iff the parent it points to via + ``parent_field`` is visible. Used for models that don't have their own + ``get_authorized_*`` helper but logically inherit authorization from a + parent (e.g. ``BurpRawRequestResponse`` -> ``Finding`` via ``finding``). + """ + if (parent_qs := get_authorized_queryset(parent_model, user)) is not None: + return child_model.objects.filter(**{f"{parent_field}__in": parent_qs}) + return child_model.objects.none() + + +def notes_policy(user) -> QuerySet: + """ + Authorization for the ``Notes`` model. + + Allows note viewership as follows: + * superuser: every note + * anyone else: a note is visible iff + (its attached Finding / Test / Engagement is visible to ``user``) + AND (the note is non-private OR ``user`` authored it). + """ + if user is None: + return Notes.objects.none() + if getattr(user, "is_superuser", False): + return Notes.objects.all() + + # Helper method to avoid unnecessary queryset fetching + def _qs_or_none(model, u): + qs = get_authorized_queryset(model, u) + return model.objects.none() if qs is None else qs + + finding_qs = _qs_or_none(Finding, user) + test_qs = _qs_or_none(Test, user) + engagement_qs = _qs_or_none(Engagement, user) + + parent_visible = Q(finding__in=finding_qs) | Q(test__in=test_qs) | Q(engagement__in=engagement_qs) + return Notes.objects.filter( + parent_visible, + Q(private=False) | Q(author=user), + ).distinct() diff --git a/dojo/api_v2/prefetch/mixins.py b/dojo/api_v2/prefetch/mixins.py index b77fc90dab1..949ce69f1f0 100644 --- a/dojo/api_v2/prefetch/mixins.py +++ b/dojo/api_v2/prefetch/mixins.py @@ -9,7 +9,7 @@ def list(self, request, *args, **kwargs): prefetch_params = request.GET.get("prefetch", "") prefetch_params = prefetch_params.split(",") if "," in prefetch_params else request.GET.getlist("prefetch") - prefetcher = _Prefetcher() + prefetcher = _Prefetcher(request=request) # Apply the same operations as the standard list method defined in the # django rest framework @@ -35,7 +35,7 @@ def retrieve(self, request, *args, **kwargs): prefetch_params = request.GET.get("prefetch", "") prefetch_params = prefetch_params.split(",") if "," in prefetch_params else request.GET.getlist("prefetch") - prefetcher = _Prefetcher() + prefetcher = _Prefetcher(request=request) entry = self.get_object() serializer = self.get_serializer() diff --git a/dojo/api_v2/prefetch/prefetcher.py b/dojo/api_v2/prefetch/prefetcher.py index 6d290b6b06b..5219f576fcd 100644 --- a/dojo/api_v2/prefetch/prefetcher.py +++ b/dojo/api_v2/prefetch/prefetcher.py @@ -4,7 +4,11 @@ from django.conf import settings from rest_framework.serializers import ModelSerializer +from dojo.api_v2.prefetch import ( + registrations as _registrations, # noqa: F401 -- side-effect import populates the RBAC registry +) from dojo.api_v2.prefetch import utils +from dojo.api_v2.prefetch.authorized_querysets import get_authorized_queryset from dojo.location.api.serializers import LocationFindingReferenceSerializer, LocationSerializer from dojo.location.models import Location, LocationFindingReference from dojo.models import FileUpload, Finding @@ -36,7 +40,8 @@ def _is_model_serializer(obj): # We process all the serializers found in the module SERIALIZER_DEFS_MODULE. We restrict the scope to avoid # processing all the classes in the symbol table available_serializers = inspect.getmembers( - sys.modules[SERIALIZER_DEFS_MODULE], _is_model_serializer, + sys.modules[SERIALIZER_DEFS_MODULE], + _is_model_serializer, ) for _, serializer in available_serializers: @@ -51,9 +56,10 @@ def _is_model_serializer(obj): return serializers - def __init__(self): + def __init__(self, request=None): self._serializers = _Prefetcher._build_serializers() self._prefetch_data = {} + self._request = request def _find_serializer(self, field_type): """ @@ -136,6 +142,8 @@ def _prefetch(self, entry, fields_to_fetch): field_to_fetch (list[string]): fields to prefetch """ + user = getattr(self._request, "user", None) if self._request else None + for field_to_fetch in fields_to_fetch: # Get the field from the instance field_value, many = self.get_field_value(entry, field_to_fetch) @@ -148,16 +156,42 @@ def _prefetch(self, entry, fields_to_fetch): if extra_serializer is None: continue - field_data = extra_serializer(many=many).to_representation( + # Authorization gate: only serialize objects from the requester's + # authorized queryset for this model. Deny-by-default -- models + # with no registered policy are omitted from the prefetch payload. + authorized_qs = get_authorized_queryset(model_type, user) + if authorized_qs is None: + continue + + # Match the legacy contract: the field key always appears in the + # prefetch payload (possibly empty) once we have a policy. Tests + # and clients can rely on the key being present whenever the + # primary field is non-null on the entry. + self._prefetch_data.setdefault(field_to_fetch, {}) + + # Check related object authorizations + if many: + related_qs = field_value.all() if hasattr(field_value, "all") else field_value + related_ids = list(related_qs.values_list("pk", flat=True)) + if not related_ids: + continue + # Set `field_value` (what will be serialized later) to the set of objects the user has permission to + field_value = authorized_qs.filter(pk__in=related_ids) + if not field_value.exists(): + continue + # Only a single related item; check it's in the set of authorized objects. If not, skip it. + elif not authorized_qs.filter(pk=field_value.pk).exists(): + continue + + serializer_kwargs = {"many": many} + if self._request is not None: + # Add in the request for the serializers to use if they want + serializer_kwargs["context"] = {"request": self._request} + field_data = extra_serializer(**serializer_kwargs).to_representation( field_value, ) # For convenience in processing we store the field data in a list - field_data_list = ( - field_data if isinstance(field_data, list) else [field_data] - ) - - if field_to_fetch not in self._prefetch_data: - self._prefetch_data[field_to_fetch] = {} + field_data_list = field_data if isinstance(field_data, list) else [field_data] # Should not fail as django always generate an id field for data in field_data_list: diff --git a/dojo/api_v2/prefetch/registrations.py b/dojo/api_v2/prefetch/registrations.py new file mode 100644 index 00000000000..44ecdc62bb7 --- /dev/null +++ b/dojo/api_v2/prefetch/registrations.py @@ -0,0 +1,238 @@ +""" +Prefetch RBAC policy registrations. + +Each register() call maps a model class to a callable ``(user) -> QuerySet`` of +model instances visible to that user. Policies are chosen to mirror the +authorization enforced by the model's top-level ViewSet: + +* ``superuser_only`` for models behind ``IsSuperUser`` +* ``authenticated_only`` for models behind plain ``IsAuthenticated`` +* ``django_view_perm`` for models behind a ``DjangoModelPermissions`` subclass +* ``dojo_view_perm`` for models behind a ``BaseDjangoModelPermission`` subclass with a GET permission map entry +* ``children_via_parent`` for models where authorization is determined by the related parent FK, not the class itself +* Delegation to the matching ``get_authorized_*`` helper for object-permission +* Custom policies where necessary (so far, only Notes) + +Models that are not registered here are denied by ``_Prefetcher``; a +newly added FK from a prefetch-enabled ViewSet will silently disappear from +the response until someone explicitly sets a policy for it. +""" + +from django.contrib.auth.models import User + +from dojo.api_v2.prefetch.authorized_querysets import ( + authenticated_only, + children_via_parent, + discard_user, + django_view_perm, + dojo_view_perm, + notes_policy, + register, + superuser_only, +) +from dojo.endpoint.queries import ( + get_authorized_endpoint_status, + get_authorized_endpoints, +) +from dojo.engagement.queries import get_authorized_engagements +from dojo.finding.queries import ( + get_authorized_findings, + get_authorized_vulnerability_ids, +) +from dojo.finding_group.queries import get_authorized_finding_groups +from dojo.github.models import GITHUB_Issue, GITHUB_PKey +from dojo.jira.models import JIRA_Instance, JIRA_Issue, JIRA_Project +from dojo.jira.queries import ( + get_authorized_jira_issues, + get_authorized_jira_projects, +) +from dojo.location.models import ( + Location, + LocationFindingReference, + LocationProductReference, +) +from dojo.location.queries import ( + get_authorized_location_finding_reference, + get_authorized_location_product_reference, + get_authorized_locations, +) +from dojo.models import ( + App_Analysis, + Benchmark_Product, + Benchmark_Product_Summary, + BurpRawRequestResponse, + Check_List, + Development_Environment, + Dojo_User, + DojoMeta, + Endpoint, + Endpoint_Params, + Endpoint_Status, + Engagement, + Engagement_Presets, + FileUpload, + Finding, + Finding_Group, + Language_Type, + Languages, + Network_Locations, + Notes, + Objects_Product, + Product, + Product_API_Scan_Configuration, + Product_Type, + Regulation, + Risk_Acceptance, + SLA_Configuration, + Sonarqube_Issue, + Test, + Test_Import, + Test_Import_Finding_Action, + Test_Type, + Tool_Configuration, + Tool_Product_History, + Tool_Product_Settings, + Tool_Type, + UserContactInfo, + Vulnerability_Id, +) +from dojo.notifications.models import Notification_Webhooks, Notifications +from dojo.product.queries import ( + get_authorized_app_analysis, + get_authorized_dojo_meta, + get_authorized_engagement_presets, + get_authorized_languages, + get_authorized_product_api_scan_configurations, + get_authorized_products, +) +from dojo.product_type.queries import get_authorized_product_types +from dojo.risk_acceptance.queries import get_authorized_risk_acceptances +from dojo.test.queries import get_authorized_test_imports, get_authorized_tests +from dojo.tool_product.queries import get_authorized_tool_product_settings +from dojo.url.models import URL + +######## +# Models backed by ViewSets (api_v2.views) from which we can derive the required permission check. +######## + + +# Superusers only +for model in ( + UserContactInfo, # UserContactInfoViewSet + Sonarqube_Issue, # SonarqubeIssueViewSet + Notifications, # NotificationsViewSet + Notification_Webhooks, # NotificationWebhooksViewSet + URL, # URLViewSet +): + register(model, superuser_only, model) + + +# Models where we need to check whether the user has "view" permissions. +for model in ( + Dojo_User, # UsersViewSet + Tool_Configuration, # ToolConfigurationsViewSet + Tool_Type, # ToolTypesViewSet + JIRA_Instance, # JiraInstanceViewSet + Language_Type, # LanguageTypeViewSet +): + register(model, django_view_perm, model) + + +# Models where we need to check "view" config permissions. Basically the same as above but includes staff viewership. +for model in ( + SLA_Configuration, # SLAConfigurationViewset (UserHasSLAPermission) +): + register(model, dojo_view_perm, model) + + +# Custom policy checks. +# Currently, only Notes: prefetchable through e.g. findings endpoint, but the set of Notes a user can prefetch depends +# on extra lookup logic. Notes _are_ backed by a ViewSet (NotesViewSet), but it restricts to superusers only, which +# isn't what we really want for prefetching -- users should be able to see their own notes! +register(Notes, notes_policy) + + +# Authentication is all that's required. These respective ViewSets have empty/non-existent GET entries for their +# perms_map, so are generally viewable for authenticated users. +for model in ( + Test_Type, # TestTypesViewSet + Development_Environment, # DevelopmentEnvironmentViewSet + Regulation, # RegulationsViewSet + Network_Locations, # NetworkLocationsViewset +): + register(model, authenticated_only, model) + + +# Models where we can simply fall back to a `get_authorized_*` method to check auth +for model, helper in ( + (Endpoint, get_authorized_endpoints), # EndPointViewSet + (Endpoint_Status, get_authorized_endpoint_status), # EndpointStatusViewSet + (Engagement, get_authorized_engagements), # EngagementViewSet + (Finding, get_authorized_findings), # FindingViewSet + (Product, get_authorized_products), # ProductViewSet + (Product_Type, get_authorized_product_types), # ProductTypeViewSet + (Test, get_authorized_tests), # TestsViewSet + (Test_Import, get_authorized_test_imports), # TestImportViewSet + (Risk_Acceptance, get_authorized_risk_acceptances), # RiskAcceptanceViewSet + (DojoMeta, get_authorized_dojo_meta), # DojoMetaViewSet + (App_Analysis, get_authorized_app_analysis), # AppAnalysisViewSet + (Languages, get_authorized_languages), # LanguageViewSet + (Engagement_Presets, get_authorized_engagement_presets), # EngagementPresetsViewset + ( + Product_API_Scan_Configuration, + get_authorized_product_api_scan_configurations, + ), # ProductAPIScanConfigurationViewSet + (Tool_Product_Settings, get_authorized_tool_product_settings), # ToolProductSettingsViewSet + (JIRA_Project, get_authorized_jira_projects), # JiraProjectViewSet + (JIRA_Issue, get_authorized_jira_issues), # JiraIssuesViewSet + (Location, get_authorized_locations), # LocationViewSet + (LocationFindingReference, get_authorized_location_finding_reference), # LocationFindingReferenceViewSet + (LocationProductReference, get_authorized_location_product_reference), # LocationProductReferenceViewSet +): + register(model, discard_user(helper), "view") + + +# Models where authorization is inherited from the parent the FK points to. +for child, parent, field in ( + (BurpRawRequestResponse, Finding, "finding"), # BurpRawRequestResponseViewSet +): + register(child, children_via_parent, child, parent, field) + + +######## +# Models *NOT* backed by ViewSets (api_v2.views) for authorization reference. +######## + + +# Defaulting to superuser required. Can be loosened if necessary, just playing it safe. +for model in ( + Endpoint_Params, # m2m from Endpoint.endpoint_params + FileUpload, # m2m from Finding/Test/Engagement.files +): + register(model, superuser_only, model) + + +# Models where we can simply fall back to a `get_authorized_*` method to check auth +for model, helper in ( + (Finding_Group, get_authorized_finding_groups), + (Vulnerability_Id, get_authorized_vulnerability_ids), +): + register(model, discard_user(helper), "view") + + +# Models where authorization is inherited from the parent the FK points to. +for child, parent, field in ( + (GITHUB_Issue, Finding, "finding"), + (Test_Import_Finding_Action, Test_Import, "test_import"), + (Check_List, Engagement, "engagement"), + (Benchmark_Product, Product, "product"), + (Benchmark_Product_Summary, Product, "product"), + (Objects_Product, Product, "product"), + (GITHUB_PKey, Product, "product"), + (Tool_Product_History, Tool_Product_Settings, "product"), +): + register(child, children_via_parent, child, parent, field) + + +# Playing it safe: the raw User model isn't exposed via ViewSet or serializer usage, but clamp it down just in case. +register(User, django_view_perm, User) diff --git a/unittests/test_apiv2_prefetch_rbac.py b/unittests/test_apiv2_prefetch_rbac.py new file mode 100644 index 00000000000..c86adf08a30 --- /dev/null +++ b/unittests/test_apiv2_prefetch_rbac.py @@ -0,0 +1,298 @@ +""" +Regression tests for the prefetch RBAC gate. + +The ``?prefetch=`` query parameter on viewsets that inherit +``PrefetchDojoModelViewSet`` used to bypass the authorization of the +related viewset entirely (see security report sub-vectors 4a/4b/4c/4e). +These tests pin the corrected behaviour: a non-superuser making the same +request must not see related objects whose top-level viewset is +superuser-only, while a superuser still receives the same payload as +before. +""" + +from django.contrib.auth.models import Permission +from django.contrib.auth.models import User as DjangoUser +from rest_framework.authtoken.models import Token +from rest_framework.test import APIClient + +from dojo.api_v2.prefetch import authorized_querysets +from dojo.models import ( + Dojo_User, + Engagement, + Finding, + Notes, + Product, + Product_Member, + Test, + Test_Type, + Tool_Configuration, + Tool_Product_Settings, + Tool_Type, +) +from unittests.dojo_test_case import DojoAPITestCase, versioned_fixtures + + +@versioned_fixtures +class PrefetchRBACTest(DojoAPITestCase): + + """Verify that the prefetch path enforces authorization on related objects.""" + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + # A regular (non-superuser) user with Owner role on product 1 -- the + # bypass under test would have allowed this account to enumerate + # users, tool configurations, and notes despite the superuser-only + # guard on those viewsets. + self.reader = Dojo_User.objects.get(username="user2") + self.reader.is_superuser = False + self.reader.is_staff = False + self.reader.save() + self.reader_token, _ = Token.objects.get_or_create(user=self.reader) + + self.admin = Dojo_User.objects.get(username="admin") + self.admin_token, _ = Token.objects.get_or_create(user=self.admin) + + self.product = Product.objects.get(pk=1) + # OSS authorization keys off the legacy ``authorized_users`` M2M + # (Pro replaces this with Product_Member through the auth-filter + # plugin -- see dojo.authorization.query_registrations). + self.product.authorized_users.add(self.reader) + Product_Member.objects.get_or_create( + product=self.product, + user=self.reader, + defaults={"role_id": 4}, + ) + + engagement = Engagement.objects.filter(product=self.product).first() + if engagement is None: + engagement = Engagement.objects.create( + product=self.product, + name="prefetch-rbac-eng", + target_start="2026-01-01", + target_end="2026-01-02", + ) + + test_type, _ = Test_Type.objects.get_or_create(name="prefetch-rbac-tt") + test = Test.objects.filter(engagement=engagement).first() + if test is None: + test = Test.objects.create( + engagement=engagement, + test_type=test_type, + target_start="2026-01-01", + target_end="2026-01-02", + lead=self.admin, + ) + + self.finding = Finding.objects.filter(test=test).first() + if self.finding is None: + self.finding = Finding.objects.create( + title="prefetch-rbac-finding", + test=test, + reporter=self.admin, + severity="Info", + numerical_severity="S4", + ) + + # A private note attached to the finding. The leak in sub-vector 4e + # is most acute for these. + self.private_note = Notes.objects.create( + entry="INTERNAL: prefetch-rbac private note", + author=self.admin, + private=True, + ) + self.finding.notes.add(self.private_note) + + # A Tool_Configuration linked to the product through Tool_Product_Settings + # is the exact shape exploited in sub-vector 4b. + tool_type, _ = Tool_Type.objects.get_or_create(name="prefetch-rbac-tt") + self.tool_config = Tool_Configuration.objects.create( + name="Internal-Tool-prefetch-rbac", + url="https://internal.example.invalid", + username="svc-account-prefetch-rbac", + authentication_type="API", + api_key="should-not-leak", + tool_type=tool_type, + ) + self.tool_product_settings = Tool_Product_Settings.objects.create( + name="prefetch-rbac-tps", + product=self.product, + tool_configuration=self.tool_config, + url="https://internal.example.invalid", + ) + + # ---- 4a: user enumeration via Finding.reporter ----------------------- + + def _client(self, token): + client = APIClient() + client.credentials(HTTP_AUTHORIZATION=f"Token {token.key}") + return client + + def test_admin_can_prefetch_reporter(self): + """Superuser baseline -- prefetched reporter is still returned.""" + resp = self._client(self.admin_token).get( + f"/api/v2/findings/{self.finding.pk}/?prefetch=reporter", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + self.assertIn("reporter", prefetch) + self.assertIn(str(self.admin.pk), prefetch["reporter"]) + + def test_reader_cannot_prefetch_reporter(self): + """Sub-vector 4a -- a non-superuser must not receive user data via prefetch.""" + resp = self._client(self.reader_token).get( + f"/api/v2/findings/{self.finding.pk}/?prefetch=reporter", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + # Either the key is absent or it is present but empty -- in both + # cases no user data has been disclosed. + self.assertFalse(prefetch.get("reporter")) + + def test_user_with_view_perm_can_prefetch_reporter(self): + """ + ``django_view_perm`` lets a non-superuser with an explicit + ``dojo.view_dojo_user`` grant prefetch reporter -- matching what + ``UsersViewSet`` (gated by DjangoModelPermissions) already allows + them to do via the top-level endpoint. + """ + view_user = Permission.objects.get( + content_type__app_label="dojo", + codename="view_dojo_user", + ) + self.reader.user_permissions.add(view_user) + # has_perm caches per instance -- reload to pick up the new perm. + self.reader = Dojo_User.objects.get(pk=self.reader.pk) + self.reader_token, _ = Token.objects.get_or_create(user=self.reader) + + resp = self._client(self.reader_token).get( + f"/api/v2/findings/{self.finding.pk}/?prefetch=reporter", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + self.assertIn("reporter", prefetch) + self.assertIn(str(self.admin.pk), prefetch["reporter"]) + + # ---- 4b: tool configuration disclosure ------------------------------- + + def test_admin_can_prefetch_tool_configuration(self): + resp = self._client(self.admin_token).get( + f"/api/v2/tool_product_settings/{self.tool_product_settings.pk}/?prefetch=tool_configuration", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + self.assertIn("tool_configuration", prefetch) + self.assertIn(str(self.tool_config.pk), prefetch["tool_configuration"]) + + def test_reader_cannot_prefetch_tool_configuration(self): + """ + Sub-vector 4b -- prefetching tool_configuration must not leak the + URL, service-account username, or extras field to a non-superuser. + """ + resp = self._client(self.reader_token).get( + f"/api/v2/tool_product_settings/{self.tool_product_settings.pk}/?prefetch=tool_configuration", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + leaked = prefetch.get("tool_configuration", {}) + self.assertFalse( + leaked, + f"tool_configuration disclosed via prefetch to non-superuser: {leaked!r}", + ) + + # ---- 4e: private notes disclosure ------------------------------------ + + def test_admin_can_prefetch_notes(self): + resp = self._client(self.admin_token).get( + f"/api/v2/findings/{self.finding.pk}/?prefetch=notes", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + self.assertIn("notes", prefetch) + self.assertIn(str(self.private_note.pk), prefetch["notes"]) + + def test_reader_cannot_prefetch_private_note_from_other_author(self): + """ + Sub-vector 4e -- a private note written by someone else must not be + returned to a non-superuser via prefetch (matches the existing UI + behaviour where ``notes.filter(private=False)`` hides them). + """ + resp = self._client(self.reader_token).get( + f"/api/v2/findings/{self.finding.pk}/?prefetch=notes", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + leaked = prefetch.get("notes", {}) + self.assertNotIn(str(self.private_note.pk), leaked) + for note in leaked.values(): + self.assertNotIn( + "INTERNAL: prefetch-rbac private note", + note.get("entry", ""), + ) + + def test_reader_can_prefetch_public_notes(self): + """ + ``notes_policy`` lets a non-superuser see non-private notes on + findings they have parent-product access to. + """ + public_note = Notes.objects.create( + entry="public note visible to readers", + author=self.admin, + private=False, + ) + self.finding.notes.add(public_note) + + resp = self._client(self.reader_token).get( + f"/api/v2/findings/{self.finding.pk}/?prefetch=notes", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + self.assertIn(str(public_note.pk), prefetch.get("notes", {})) + # The private note authored by admin must still be hidden. + self.assertNotIn(str(self.private_note.pk), prefetch.get("notes", {})) + + def test_reader_can_prefetch_own_private_notes(self): + """ + ``notes_policy`` lets a non-superuser see their own private notes + even on findings where they're not the author of every note. + """ + own_private = Notes.objects.create( + entry="reader's own private note", + author=self.reader, + private=True, + ) + self.finding.notes.add(own_private) + + resp = self._client(self.reader_token).get( + f"/api/v2/findings/{self.finding.pk}/?prefetch=notes", + ) + self.assertEqual(200, resp.status_code, resp.content[:500]) + prefetch = resp.json().get("prefetch", {}) + self.assertIn(str(own_private.pk), prefetch.get("notes", {})) + # admin's private note must still be hidden. + self.assertNotIn(str(self.private_note.pk), prefetch.get("notes", {})) + + # ---- defense in depth: unregistered models are denied ---------------- + + def test_unregistered_model_is_denied_by_default(self): + """ + An attempt to prefetch a field whose related model has no + registered policy must return an empty prefetch payload, not the + unfiltered serialized object. + """ + # Pretend Dojo_User has no registered policy. The deny-by-default + # path must kick in and the field must not appear in the response. + original_dojo_user = authorized_querysets._REGISTRY.pop(Dojo_User, None) + original_user = authorized_querysets._REGISTRY.pop(DjangoUser, None) + try: + resp = self._client(self.admin_token).get( + f"/api/v2/findings/{self.finding.pk}/?prefetch=reporter", + ) + self.assertEqual(200, resp.status_code) + prefetch = resp.json().get("prefetch", {}) + self.assertFalse(prefetch.get("reporter")) + finally: + if original_dojo_user is not None: + authorized_querysets._REGISTRY[Dojo_User] = original_dojo_user + if original_user is not None: + authorized_querysets._REGISTRY[DjangoUser] = original_user