Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 46 additions & 19 deletions samtranslator/model/role_utils/role_constructor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Any, Callable, Dict, List, Optional

from samtranslator.internal.managed_policies import get_bundled_managed_policy_map
from samtranslator.internal.types import GetManagedPolicyMap
Expand Down Expand Up @@ -60,6 +60,34 @@ def _get_managed_policy_arn(
return name


def _convert_intrinsic_if_values(
intrinsic_if: Dict[str, List[Any]], is_convertible: Callable[[Any], Any], convert: Callable[[Any], Any]
) -> Dict[str, List[Any]]:
"""
Convert the true and false value of the intrinsic if function according to
`convert` function.

:param intrinsic_if: A dict of the form {"Fn::If": [condition, value_if_true, value_if_false]}
:type intrinsic_if: Dict[str, List[Any]]
:param is_convertible: The function used to decide if the value must be converted
:type convert: Callable[[Any], Any]
:param convert: The function used to make the conversion
:type convert: Callable[[Any], Any]
:return: The input dict with values converted
:rtype: Dict[str, List[Any]]
"""
value_if_true = intrinsic_if["Fn::If"][1]
value_if_false = intrinsic_if["Fn::If"][2]

if is_convertible(value_if_true):
intrinsic_if["Fn::If"][1] = convert(value_if_true)

if is_convertible(value_if_false):
intrinsic_if["Fn::If"][2] = convert(value_if_false)

return intrinsic_if


def construct_role_for_resource( # type: ignore[no-untyped-def] # noqa: PLR0913
resource_logical_id,
attributes,
Expand Down Expand Up @@ -102,23 +130,16 @@ def construct_role_for_resource( # type: ignore[no-untyped-def] # noqa: PLR0913
for index, policy_entry in enumerate(resource_policies.get()):
if policy_entry.type is PolicyTypes.POLICY_STATEMENT:
if is_intrinsic_if(policy_entry.data):
intrinsic_if = policy_entry.data
then_statement = intrinsic_if["Fn::If"][1]
else_statement = intrinsic_if["Fn::If"][2]

if not is_intrinsic_no_value(then_statement):
then_statement = {
"PolicyName": execution_role.logical_id + "Policy" + str(index),
"PolicyDocument": then_statement,
}
intrinsic_if["Fn::If"][1] = then_statement

if not is_intrinsic_no_value(else_statement):
else_statement = {
"PolicyName": execution_role.logical_id + "Policy" + str(index),
"PolicyDocument": else_statement,
}
intrinsic_if["Fn::If"][2] = else_statement
intrinsic_if = _convert_intrinsic_if_values(
policy_entry.data,
lambda value: not is_intrinsic_no_value(value),
lambda value: (
{
"PolicyName": execution_role.logical_id + "Policy" + str(index), # noqa: B023
"PolicyDocument": value,
}
),
)

policy_documents.append(intrinsic_if)

Expand All @@ -134,7 +155,7 @@ def construct_role_for_resource( # type: ignore[no-untyped-def] # noqa: PLR0913
# There are three options:
# Managed Policy Name (string): Try to convert to Managed Policy ARN
# Managed Policy Arn (string): Insert it directly into the list
# Intrinsic Function (dict): Insert it directly into the list
# Intrinsic Function (dict): Try to convert each statement to Managed Policy Arn
#
# When you insert into managed_policy_arns list, de-dupe to prevent same ARN from showing up twice
#
Expand All @@ -146,6 +167,12 @@ def construct_role_for_resource( # type: ignore[no-untyped-def] # noqa: PLR0913
managed_policy_map,
get_managed_policy_map,
)
elif is_intrinsic_if(policy_arn):
policy_arn = _convert_intrinsic_if_values(
policy_arn,
lambda value: not is_intrinsic_no_value(value) and isinstance(value, str),
lambda value: _get_managed_policy_arn(value, managed_policy_map, get_managed_policy_map),
)

# De-Duplicate managed policy arns before inserting. Mainly useful
# when customer specifies a managed policy which is already inserted
Expand Down
73 changes: 73 additions & 0 deletions tests/model/test_sam_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -904,3 +904,76 @@ def test_capacity_provider_with_propagate_tags(self):
tags = resource.Tags
self.assertEqual(sorted([tag["Key"] for tag in tags]), ["Environment", "Project", "lambda:createdBy"])
self.assertEqual(sorted([tag["Value"] for tag in tags]), ["Production", "SAM", "ServerlessApp"])


class TestFunctionPolicy(TestCase):
kwargs = {
"intrinsics_resolver": IntrinsicsResolver({}),
"event_resources": [],
"managed_policy_map": {"foo": "bar"},
"resource_resolver": ResourceResolver({}),
}

@patch("boto3.session.Session.region_name", "ap-southeast-1")
def test_managed_policy_name(self):
function = SamFunction("Foo")
function.CodeUri = "s3://foobar/foo.zip"
function.Runtime = "foo"
function.Handler = "bar"
managedPolicyName = "foo"
function.Policies = [managedPolicyName]

cfnResources = function.to_cloudformation(**self.kwargs)
iamRoles = [x for x in cfnResources if isinstance(x, IAMRole)]
self.assertEqual(iamRoles[0].ManagedPolicyArns[1], self.kwargs["managed_policy_map"][managedPolicyName])

@patch("boto3.session.Session.region_name", "ap-southeast-1")
def test_unknown_policy_name(self):
function = SamFunction("Foo")
function.CodeUri = "s3://foobar/foo.zip"
function.Runtime = "foo"
function.Handler = "bar"
unknownPolicyName = "bar"
function.Policies = [unknownPolicyName]

cfnResources = function.to_cloudformation(**self.kwargs)
iamRoles = [x for x in cfnResources if isinstance(x, IAMRole)]
self.assertEqual(iamRoles[0].ManagedPolicyArns[1], unknownPolicyName)

@patch("boto3.session.Session.region_name", "ap-southeast-1")
def test_managed_policy_name_within_intrinsic_if_then(self):
function = SamFunction("Foo")
function.CodeUri = "s3://foobar/foo.zip"
function.Runtime = "foo"
function.Handler = "bar"
managedPolicyName = "foo"
function.Policies = [{"Fn::If": ["Condition", managedPolicyName, {"Fn::Ref": "AWS::NoValue"}]}]

cfnResources = function.to_cloudformation(**self.kwargs)
iamRoles = [x for x in cfnResources if isinstance(x, IAMRole)]

self.assertIn("Fn::If", iamRoles[0].ManagedPolicyArns[1])
self.assertEqual(iamRoles[0].ManagedPolicyArns[1]["Fn::If"][0], "Condition")
self.assertEqual(
iamRoles[0].ManagedPolicyArns[1]["Fn::If"][1], self.kwargs["managed_policy_map"][managedPolicyName]
)
self.assertDictEqual(iamRoles[0].ManagedPolicyArns[1]["Fn::If"][2], {"Fn::Ref": "AWS::NoValue"})

@patch("boto3.session.Session.region_name", "ap-southeast-1")
def test_managed_policy_name_within_intrinsic_if_else(self):
function = SamFunction("Foo")
function.CodeUri = "s3://foobar/foo.zip"
function.Runtime = "foo"
function.Handler = "bar"
managedPolicyName = "foo"
function.Policies = [{"Fn::If": ["Condition", {"Fn::Ref": "AWS::NoValue"}, managedPolicyName]}]

cfnResources = function.to_cloudformation(**self.kwargs)
iamRoles = [x for x in cfnResources if isinstance(x, IAMRole)]

self.assertIn("Fn::If", iamRoles[0].ManagedPolicyArns[1])
self.assertEqual(iamRoles[0].ManagedPolicyArns[1]["Fn::If"][0], "Condition")
self.assertDictEqual(iamRoles[0].ManagedPolicyArns[1]["Fn::If"][1], {"Fn::Ref": "AWS::NoValue"})
self.assertEqual(
iamRoles[0].ManagedPolicyArns[1]["Fn::If"][2], self.kwargs["managed_policy_map"][managedPolicyName]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Conditions:
IsTrue: true

Resources:
ExampleFunction:
Type: AWS::Serverless::Function
Properties:
Runtime: python3.12
Handler: handler
InlineCode: >
def handler():
pass
Policies:
- Fn::If:
- IsTrue
- CloudWatchLambdaInsightsExecutionRolePolicy
- !Ref AWS::NoValue
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"AWSTemplateFormatVersion": "2010-09-09",
"Conditions": {
"IsTrue": true
},
"Resources": {
"ExampleFunction": {
"Properties": {
"Code": {
"ZipFile": "def handler():\n pass\n"
},
"Handler": "handler",
"Role": {
"Fn::GetAtt": [
"ExampleFunctionRole",
"Arn"
]
},
"Runtime": "python3.12",
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
},
"Type": "AWS::Lambda::Function"
},
"ExampleFunctionRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
{
"Fn::If": [
"IsTrue",
"arn:aws-cn:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy",
{
"Ref": "AWS::NoValue"
}
]
}
],
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
},
"Type": "AWS::IAM::Role"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"AWSTemplateFormatVersion": "2010-09-09",
"Conditions": {
"IsTrue": true
},
"Resources": {
"ExampleFunction": {
"Properties": {
"Code": {
"ZipFile": "def handler():\n pass\n"
},
"Handler": "handler",
"Role": {
"Fn::GetAtt": [
"ExampleFunctionRole",
"Arn"
]
},
"Runtime": "python3.12",
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
},
"Type": "AWS::Lambda::Function"
},
"ExampleFunctionRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
"arn:aws-us-gov:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
{
"Fn::If": [
"IsTrue",
"arn:aws-us-gov:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy",
{
"Ref": "AWS::NoValue"
}
]
}
],
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
},
"Type": "AWS::IAM::Role"
}
}
}
Loading