diff --git a/samtranslator/model/role_utils/role_constructor.py b/samtranslator/model/role_utils/role_constructor.py index 4217caff91..825bd55b3f 100644 --- a/samtranslator/model/role_utils/role_constructor.py +++ b/samtranslator/model/role_utils/role_constructor.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional +from typing import Any, Callable, Dict, List, Optional from samtranslator.internal.managed_policies import get_bundled_managed_policy_map from samtranslator.internal.types import GetManagedPolicyMap @@ -60,6 +60,34 @@ def _get_managed_policy_arn( return name +def _convert_intrinsic_if_values( + intrinsic_if: Dict[str, List[Any]], is_convertible: Callable[[Any], Any], convert: Callable[[Any], Any] +) -> Dict[str, List[Any]]: + """ + Convert the true and false value of the intrinsic if function according to + `convert` function. + + :param intrinsic_if: A dict of the form {"Fn::If": [condition, value_if_true, value_if_false]} + :type intrinsic_if: Dict[str, List[Any]] + :param is_convertible: The function used to decide if the value must be converted + :type convert: Callable[[Any], Any] + :param convert: The function used to make the conversion + :type convert: Callable[[Any], Any] + :return: The input dict with values converted + :rtype: Dict[str, List[Any]] + """ + value_if_true = intrinsic_if["Fn::If"][1] + value_if_false = intrinsic_if["Fn::If"][2] + + if is_convertible(value_if_true): + intrinsic_if["Fn::If"][1] = convert(value_if_true) + + if is_convertible(value_if_false): + intrinsic_if["Fn::If"][2] = convert(value_if_false) + + return intrinsic_if + + def construct_role_for_resource( # type: ignore[no-untyped-def] # noqa: PLR0913 resource_logical_id, attributes, @@ -102,23 +130,16 @@ def construct_role_for_resource( # type: ignore[no-untyped-def] # noqa: PLR0913 for index, policy_entry in enumerate(resource_policies.get()): if policy_entry.type is PolicyTypes.POLICY_STATEMENT: if is_intrinsic_if(policy_entry.data): - intrinsic_if = policy_entry.data - then_statement = intrinsic_if["Fn::If"][1] - else_statement = intrinsic_if["Fn::If"][2] - - if not is_intrinsic_no_value(then_statement): - then_statement = { - "PolicyName": execution_role.logical_id + "Policy" + str(index), - "PolicyDocument": then_statement, - } - intrinsic_if["Fn::If"][1] = then_statement - - if not is_intrinsic_no_value(else_statement): - else_statement = { - "PolicyName": execution_role.logical_id + "Policy" + str(index), - "PolicyDocument": else_statement, - } - intrinsic_if["Fn::If"][2] = else_statement + intrinsic_if = _convert_intrinsic_if_values( + policy_entry.data, + lambda value: not is_intrinsic_no_value(value), + lambda value: ( + { + "PolicyName": execution_role.logical_id + "Policy" + str(index), # noqa: B023 + "PolicyDocument": value, + } + ), + ) policy_documents.append(intrinsic_if) @@ -134,7 +155,7 @@ def construct_role_for_resource( # type: ignore[no-untyped-def] # noqa: PLR0913 # There are three options: # Managed Policy Name (string): Try to convert to Managed Policy ARN # Managed Policy Arn (string): Insert it directly into the list - # Intrinsic Function (dict): Insert it directly into the list + # Intrinsic Function (dict): Try to convert each statement to Managed Policy Arn # # When you insert into managed_policy_arns list, de-dupe to prevent same ARN from showing up twice # @@ -146,6 +167,12 @@ def construct_role_for_resource( # type: ignore[no-untyped-def] # noqa: PLR0913 managed_policy_map, get_managed_policy_map, ) + elif is_intrinsic_if(policy_arn): + policy_arn = _convert_intrinsic_if_values( + policy_arn, + lambda value: not is_intrinsic_no_value(value) and isinstance(value, str), + lambda value: _get_managed_policy_arn(value, managed_policy_map, get_managed_policy_map), + ) # De-Duplicate managed policy arns before inserting. Mainly useful # when customer specifies a managed policy which is already inserted diff --git a/tests/model/test_sam_resources.py b/tests/model/test_sam_resources.py index cbc8412dab..a77698810f 100644 --- a/tests/model/test_sam_resources.py +++ b/tests/model/test_sam_resources.py @@ -904,3 +904,76 @@ def test_capacity_provider_with_propagate_tags(self): tags = resource.Tags self.assertEqual(sorted([tag["Key"] for tag in tags]), ["Environment", "Project", "lambda:createdBy"]) self.assertEqual(sorted([tag["Value"] for tag in tags]), ["Production", "SAM", "ServerlessApp"]) + + +class TestFunctionPolicy(TestCase): + kwargs = { + "intrinsics_resolver": IntrinsicsResolver({}), + "event_resources": [], + "managed_policy_map": {"foo": "bar"}, + "resource_resolver": ResourceResolver({}), + } + + @patch("boto3.session.Session.region_name", "ap-southeast-1") + def test_managed_policy_name(self): + function = SamFunction("Foo") + function.CodeUri = "s3://foobar/foo.zip" + function.Runtime = "foo" + function.Handler = "bar" + managedPolicyName = "foo" + function.Policies = [managedPolicyName] + + cfnResources = function.to_cloudformation(**self.kwargs) + iamRoles = [x for x in cfnResources if isinstance(x, IAMRole)] + self.assertEqual(iamRoles[0].ManagedPolicyArns[1], self.kwargs["managed_policy_map"][managedPolicyName]) + + @patch("boto3.session.Session.region_name", "ap-southeast-1") + def test_unknown_policy_name(self): + function = SamFunction("Foo") + function.CodeUri = "s3://foobar/foo.zip" + function.Runtime = "foo" + function.Handler = "bar" + unknownPolicyName = "bar" + function.Policies = [unknownPolicyName] + + cfnResources = function.to_cloudformation(**self.kwargs) + iamRoles = [x for x in cfnResources if isinstance(x, IAMRole)] + self.assertEqual(iamRoles[0].ManagedPolicyArns[1], unknownPolicyName) + + @patch("boto3.session.Session.region_name", "ap-southeast-1") + def test_managed_policy_name_within_intrinsic_if_then(self): + function = SamFunction("Foo") + function.CodeUri = "s3://foobar/foo.zip" + function.Runtime = "foo" + function.Handler = "bar" + managedPolicyName = "foo" + function.Policies = [{"Fn::If": ["Condition", managedPolicyName, {"Fn::Ref": "AWS::NoValue"}]}] + + cfnResources = function.to_cloudformation(**self.kwargs) + iamRoles = [x for x in cfnResources if isinstance(x, IAMRole)] + + self.assertIn("Fn::If", iamRoles[0].ManagedPolicyArns[1]) + self.assertEqual(iamRoles[0].ManagedPolicyArns[1]["Fn::If"][0], "Condition") + self.assertEqual( + iamRoles[0].ManagedPolicyArns[1]["Fn::If"][1], self.kwargs["managed_policy_map"][managedPolicyName] + ) + self.assertDictEqual(iamRoles[0].ManagedPolicyArns[1]["Fn::If"][2], {"Fn::Ref": "AWS::NoValue"}) + + @patch("boto3.session.Session.region_name", "ap-southeast-1") + def test_managed_policy_name_within_intrinsic_if_else(self): + function = SamFunction("Foo") + function.CodeUri = "s3://foobar/foo.zip" + function.Runtime = "foo" + function.Handler = "bar" + managedPolicyName = "foo" + function.Policies = [{"Fn::If": ["Condition", {"Fn::Ref": "AWS::NoValue"}, managedPolicyName]}] + + cfnResources = function.to_cloudformation(**self.kwargs) + iamRoles = [x for x in cfnResources if isinstance(x, IAMRole)] + + self.assertIn("Fn::If", iamRoles[0].ManagedPolicyArns[1]) + self.assertEqual(iamRoles[0].ManagedPolicyArns[1]["Fn::If"][0], "Condition") + self.assertDictEqual(iamRoles[0].ManagedPolicyArns[1]["Fn::If"][1], {"Fn::Ref": "AWS::NoValue"}) + self.assertEqual( + iamRoles[0].ManagedPolicyArns[1]["Fn::If"][2], self.kwargs["managed_policy_map"][managedPolicyName] + ) diff --git a/tests/translator/input/function_policy_with_conditional_managed_policy_name.yaml b/tests/translator/input/function_policy_with_conditional_managed_policy_name.yaml new file mode 100644 index 0000000000..5d79409c28 --- /dev/null +++ b/tests/translator/input/function_policy_with_conditional_managed_policy_name.yaml @@ -0,0 +1,20 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 + +Conditions: + IsTrue: true + +Resources: + ExampleFunction: + Type: AWS::Serverless::Function + Properties: + Runtime: python3.12 + Handler: handler + InlineCode: > + def handler(): + pass + Policies: + - Fn::If: + - IsTrue + - CloudWatchLambdaInsightsExecutionRolePolicy + - !Ref AWS::NoValue diff --git a/tests/translator/output/aws-cn/function_policy_with_conditional_managed_policy_name.json b/tests/translator/output/aws-cn/function_policy_with_conditional_managed_policy_name.json new file mode 100644 index 0000000000..8eb508b0f7 --- /dev/null +++ b/tests/translator/output/aws-cn/function_policy_with_conditional_managed_policy_name.json @@ -0,0 +1,69 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Conditions": { + "IsTrue": true + }, + "Resources": { + "ExampleFunction": { + "Properties": { + "Code": { + "ZipFile": "def handler():\n pass\n" + }, + "Handler": "handler", + "Role": { + "Fn::GetAtt": [ + "ExampleFunctionRole", + "Arn" + ] + }, + "Runtime": "python3.12", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "ExampleFunctionRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + { + "Fn::If": [ + "IsTrue", + "arn:aws-cn:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy", + { + "Ref": "AWS::NoValue" + } + ] + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +} diff --git a/tests/translator/output/aws-us-gov/function_policy_with_conditional_managed_policy_name.json b/tests/translator/output/aws-us-gov/function_policy_with_conditional_managed_policy_name.json new file mode 100644 index 0000000000..63643769f2 --- /dev/null +++ b/tests/translator/output/aws-us-gov/function_policy_with_conditional_managed_policy_name.json @@ -0,0 +1,69 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Conditions": { + "IsTrue": true + }, + "Resources": { + "ExampleFunction": { + "Properties": { + "Code": { + "ZipFile": "def handler():\n pass\n" + }, + "Handler": "handler", + "Role": { + "Fn::GetAtt": [ + "ExampleFunctionRole", + "Arn" + ] + }, + "Runtime": "python3.12", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "ExampleFunctionRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + { + "Fn::If": [ + "IsTrue", + "arn:aws-us-gov:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy", + { + "Ref": "AWS::NoValue" + } + ] + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +} diff --git a/tests/translator/output/function_policy_with_conditional_managed_policy_name.json b/tests/translator/output/function_policy_with_conditional_managed_policy_name.json new file mode 100644 index 0000000000..00a4573c8c --- /dev/null +++ b/tests/translator/output/function_policy_with_conditional_managed_policy_name.json @@ -0,0 +1,69 @@ +{ + "AWSTemplateFormatVersion": "2010-09-09", + "Conditions": { + "IsTrue": true + }, + "Resources": { + "ExampleFunction": { + "Properties": { + "Code": { + "ZipFile": "def handler():\n pass\n" + }, + "Handler": "handler", + "Role": { + "Fn::GetAtt": [ + "ExampleFunctionRole", + "Arn" + ] + }, + "Runtime": "python3.12", + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::Lambda::Function" + }, + "ExampleFunctionRole": { + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": [ + "sts:AssumeRole" + ], + "Effect": "Allow", + "Principal": { + "Service": [ + "lambda.amazonaws.com" + ] + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + { + "Fn::If": [ + "IsTrue", + "arn:aws:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy", + { + "Ref": "AWS::NoValue" + } + ] + } + ], + "Tags": [ + { + "Key": "lambda:createdBy", + "Value": "SAM" + } + ] + }, + "Type": "AWS::IAM::Role" + } + } +}