diff --git a/.gitignore b/.gitignore index 73c3d9b..e3fed49 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,10 @@ log/ logs/ target/ +# python +/.venv/ +__pycache__/ + *.log pdp-gui/json dependency.tree diff --git a/src/test/java/pdp/PolicyHarnessTest.java b/src/test/java/pdp/PolicyHarnessTest.java index e049d01..95e4be0 100644 --- a/src/test/java/pdp/PolicyHarnessTest.java +++ b/src/test/java/pdp/PolicyHarnessTest.java @@ -67,8 +67,12 @@ protected void beforeEach() { @TestFactory Stream policyHarness() throws Exception { String policy = System.getProperty("policy"); - return Stream.of(Objects.requireNonNull(new ClassPathResource("test-harness").getFile().listFiles())) + return Stream.concat( + Stream.of(Objects.requireNonNull(new ClassPathResource("test-harness").getFile().listFiles())), + Stream.of(Objects.requireNonNull(new ClassPathResource("test-harness-generated").getFile().listFiles())) + ) .filter(File::isDirectory) + .filter(file -> (file.getName().matches("^[a-zA-Z].*"))) .filter(file -> policy == null || file.getName().equalsIgnoreCase(policy)) .map(directory -> DynamicTest.dynamicTest( "Policy harness: " + directory.getName(), diff --git a/src/test/resources/test-harness-generated/generate-tests.py b/src/test/resources/test-harness-generated/generate-tests.py new file mode 100644 index 0000000..5786a95 --- /dev/null +++ b/src/test/resources/test-harness-generated/generate-tests.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import os +from pathlib import Path +from pdp_harness import PDPPolicy, PDPRequest, PDPResponse, PDPDecision, PDPTest + +# very simple test; generate policy, requests and responses and write them to the screen +# use for debugging +def test(): + policy = PDPPolicy( + idp_entityids=["http://idp1"], + sp_entityids=["http://sp1"], + is_sp_negated=False, + attributes={ + "eduPersonAffiliation": ["member", "staff"], + "eduPersonPrincipalName": ["foo@example.com"] + }, + is_deny=False, + is_and=True + ) + print("Policy:") + print(policy.to_json()) + + request = PDPRequest( + idp_entityid="http://idp1", + sp_entityid="http://sp1", + attributes={"eduPersonAffiliation": ["member", "staff"]} + ) + print("Request:") + print(request.to_json()) + + response_permit = PDPResponse( + policy=policy, + decision=PDPDecision.Permit + ) + response_na = PDPResponse( + policy=policy, + decision=PDPDecision.NotApplicable + ) + response_deny = PDPResponse( + policy=policy, + decision=PDPDecision.Deny + ) + print("Response permit:") + print(response_permit.to_json()) + print("Response na:") + print(response_na.to_json()) + print("Response deny:") + print(response_deny.to_json()) + + +# generate some simple tests +def generate_tests(): + # write all tests to this directory + os.chdir(Path(__file__).parent) + + test1 = PDPTest( + name="simple_attr_allow", + policy=PDPPolicy( + idp_entityids=["http://idp1"], + sp_entityids=["http://sp1"], + is_sp_negated=False, + attributes={ + "eduPersonAffiliation": ["member", "staff"], + } + ), + request=PDPRequest( + idp_entityid="http://idp1", + sp_entityid="http://sp1", + attributes={"eduPersonAffiliation": ["member"]} + ), + decision=PDPDecision.Permit + ) + test1.write() + + # copy the test and adjust the attribute to give a Deny + test2 = test1.copy("simple_attr_deny") + test2.request.attributes = {"eduPersonAffiliation": ["notmember"]} + test2.response.decision = PDPDecision.Deny + test2.write() + + # copy the test and adjust the SP to give a NotApplicable + test3 = test1.copy("simple_attr_na") + test3.request.sp_entityid = "http://sp2" + test3.response.decision = PDPDecision.NotApplicable + test3.write() + + # copy the test and adjust the response to fail the test + test4 = test1.copy("simple_attr_fail") + test4.response.decision = PDPDecision.NotApplicable + test4.write() + + +def main(): + generate_tests() + + +if __name__ == '__main__': + main() diff --git a/src/test/resources/test-harness-generated/pdp_harness.py b/src/test/resources/test-harness-generated/pdp_harness.py new file mode 100644 index 0000000..62c27c1 --- /dev/null +++ b/src/test/resources/test-harness-generated/pdp_harness.py @@ -0,0 +1,276 @@ +from __future__ import annotations + +""" +This module provides a test harness for the PDP (Policy Decision Point) system. +It contains classes for constructing policies, requests and responses in a +consistent JSON format that can be used to test the PDP behavior. + +See generate-tests.py for examples of how to use these classes to +create test cases by: + +1. Creating a PDPPolicy with allow/deny rules, attributes and entity IDs +2. Creating a PDPRequest with attributes to test against the policy +3. Creating a PDPTest that combines policy and request with expected decision +4. Writing the test files (policy.json, request.json, response.json) to disk + +The generated test files can then be executed by the Java PolicyHarnessTest. +""" + +import copy +import json +import random +import datetime +from dataclasses import dataclass, field +from enum import StrEnum +from pathlib import Path +from zoneinfo import ZoneInfo + +from jinja2 import Template, StrictUndefined + + +MARTY_DATE = datetime.datetime(2015, 10, 21, 7, 28, 0, tzinfo=ZoneInfo("America/Los_Angeles")) + + +@dataclass +class PDPPolicy: + id: str | None = None + idp_entityids: list[str] = field(default_factory=list) + sp_entityids: list[str] = field(default_factory=list) + attributes: dict[str, list[str]] = field(default_factory=dict) + is_sp_negated: bool = False + is_deny: bool = False + is_and: bool = False + created: datetime.datetime = MARTY_DATE + + def __post_init__(self): + if self.id is None: + self.id = ''.join(random.choices("0123456789abcdef", k=16)) + + @property + def flat_attributes(self): + flat = [ + (name, value) + for name, values in self.attributes.items() + for value in values + ] + return flat + + def write_json(self, path: Path): + path.write_text(self.to_json()) + + def to_json(self) -> str: + """Render this policy as normalized JSON using a Jinja2 template.""" + policy_template = """ + { + "id": "{{ p.id }}", + "policyId": "urn:surfconext:xacml:policy:id:{{ p.id }}", + "name": "{{ p.id }}", + "description": "This is the description of the policy", + "serviceProviderIds": {{ p.sp_entityids | list | tojson }}, + "serviceProvidersNegated": {{ p.is_sp_negated | tojson }}, + "identityProviderIds": {{ p.idp_entityids | list | tojson }}, + "attributes": [ + {%- for attr_name, attr_value in p.flat_attributes -%} + {%- if not loop.first %},{% endif %} + { + "name": "urn:mace:dir:attribute-def:{{ attr_name }}", + "value": "{{ attr_value }}", + "negated": false, + {#- TODO: the groupID is required for more advanced attribute combinations + (see https://github.com/OpenConext/OpenConext-manage/issues/579) #} + "groupID": 0 + } + {%- endfor -%} + ], + "denyRule": {{ p.is_deny | tojson }}, + "allAttributesMustMatch": {{ p.is_and | tojson }}, + "created": "{{ p.created.isoformat() }}", + "denyAdvice": "NOT ALLOWED", + "denyAdviceNl": "MAG NIET", + "active": true, + "actionsAllowed": false, + "type": "reg", + "revisionNbr": 0, + "activatedSr": false + } + """ + template = Template(policy_template, undefined=StrictUndefined) + json_str = template.render(p=self) + + # check if the JSON is correct and reformat + return json.dumps(json.loads(json_str), indent=4) + + +@dataclass +class PDPRequest: + idp_entityid: str + sp_entityid: str + attributes: dict[str, list[str]] = field(default_factory=dict) + + @property + def flat_attributes(self): + flat = [ + (name, value) + for name, values in self.attributes.items() + for value in values + ] + return flat + + def write_json(self, path: Path): + path.write_text(self.to_json()) + + def to_json(self) -> str: + """Render this request as normalized JSON using a Jinja2 template.""" + + request_template = """ + { + "Request": { + "AccessSubject": { + "Attribute": [ + {%- for attr_name, value in r.flat_attributes -%} + {%- if not loop.first %},{% endif %} + { + "AttributeId": "urn:mace:dir:attribute-def:{{ attr_name }}", + "Value": "{{ value }}" + } + {%- endfor -%} + ] + }, + "Resource": { + "Attribute": [{ + "AttributeId": "SPentityID", + "Value": "{{ r.sp_entityid }}" + },{ + "AttributeId": "IDPentityID", + "Value": "{{ r.idp_entityid }}" + },{ + "AttributeId": "ClientID", + "Value": "EngineBlock" + }] + } + } + } + """ + template = Template(request_template, undefined=StrictUndefined) + json_str = template.render(r=self) + + # check if the JSON is correct and reformat + return json.dumps(json.loads(json_str), indent=4) + + +class PDPDecision(StrEnum): + Permit = "Permit" + Deny = "Deny" + NotApplicable = "NotApplicable" + + +@dataclass +class PDPResponse: + policy: PDPPolicy + decision: PDPDecision + + def write_json(self, path: Path): + path.write_text(self.to_json()) + + def to_json(self) -> str: + """Render this response as normalized JSON using a Jinja2 template.""" + response_template = """ + { + "Response": [{ + "Status": { + "StatusCode": { "Value": "urn:oasis:names:tc:xacml:1.0:status:ok" } + }, + {%- if r.decision == 'Deny' %} + "AssociatedAdvice": [ + { + "AttributeAssignment": [ + { + "Category": "urn:oasis:names:tc:xacml:3.0:attribute-category:resource", + "AttributeId": "DenyMessage:en", + "Value": "NOT ALLOWED", + "DataType": "http://www.w3.org/2001/XMLSchema#string" + }, + { + "Category": "urn:oasis:names:tc:xacml:3.0:attribute-category:resource", + "AttributeId": "DenyMessage:nl", + "Value": "MAG NIET", + "DataType": "http://www.w3.org/2001/XMLSchema#string" + }, + { + "Category": "urn:oasis:names:tc:xacml:3.0:attribute-category:resource", + "AttributeId": "IdPOnly", + "Value": true, + "DataType": "http://www.w3.org/2001/XMLSchema#boolean" + } + ], + "Id": "urn:surfconext:xacml:policy:id:{{ r.policy.id }}" + } + ], + {%- elif r.decision == 'Permit' %} + {#- TODO: this is a bit annoying, because typically you don't really want to + have to specify which rule matches exactly #} + {#- TODO: also unclear what whould be in here, exactly. Even for complex rules, + this still contains only 1 attribute #} + "Category" : [ { + "CategoryId" : "urn:mace:dir:attribute-def:{{ r.policy.flat_attributes[0][0] }}", + "Attribute" : [ { + "AttributeId" : "urn:mace:dir:attribute-def:{{ r.policy.flat_attributes[0][0] }}", + "Value" : "{{ r.policy.flat_attributes[0][1] }}", + "DataType" : "http://www.w3.org/2001/XMLSchema#string" + } ] + } ], + {%- endif %} + "PolicyIdentifier": { + "PolicySetIdReference": [{ + "Version": "1.0", + "Id": "urn:openconext:pdp:root:policyset" + }] + {%- if r.decision != 'NotApplicable' -%}, + "PolicyIdReference": [{ + "Version": "1", + "Id": "urn:surfconext:xacml:policy:id:{{ r.policy.id }}" + }] + {%- endif -%} + }, + "Decision": "{{ r.decision }}" + }] + } + """ + template = Template(response_template, undefined=StrictUndefined) + json_str = template.render(r=self) + + # check if the JSON is correct and reformat + return json.dumps(json.loads(json_str), indent=4) + + +@dataclass +class PDPTest: + name: str + policy: PDPPolicy + request: PDPRequest + # need to specify either one of these: + response: PDPResponse | None = None + decision: PDPDecision | None = None + + def __post_init__(self): + if self.response is None and self.decision is None: + raise ValueError("either response or decision must be specified") + if self.response is None: + self.response = PDPResponse(self.policy, self.decision) + + def copy(self, name: str) -> PDPTest: + new_test = copy.deepcopy(self) + new_test.name = name + return new_test + + def write(self, basedir: Path = Path('.')): + output_dir = basedir / self.name + print(f"writing to {output_dir.absolute()}") + + output_dir.mkdir(parents=True, exist_ok=True) + for f in output_dir.glob("*"): + f.unlink() + + self.policy.write_json(output_dir / "policy.json") + self.request.write_json(output_dir / "request.json") + self.response.write_json(output_dir / "response.json")