diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index da7301911..c384a876c 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -13,6 +13,8 @@ def set_dsm_context(event, event_source): _dsm_set_sqs_context(event) elif event_source.equals(EventTypes.SNS): _dsm_set_sns_context(event) + elif event_source.equals(EventTypes.KINESIS): + _dsm_set_kinesis_context(event) def _dsm_set_context_helper( @@ -78,6 +80,23 @@ def sqs_arn_extractor(record): _dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator) +def _dsm_set_kinesis_context(event): + from ddtrace.internal.datastreams.botocore import calculate_kinesis_payload_size + + def kinesis_payload_calculator(record, context_json): + return calculate_kinesis_payload_size(record) + + def kinesis_arn_extractor(record): + arn = record.get("eventSourceARN") + if arn is None: + return "" + return arn + + _dsm_set_context_helper( + event, "kinesis", kinesis_arn_extractor, kinesis_payload_calculator + ) + + def _get_dsm_context_from_lambda(message): """ Lambda-specific message formats: diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 73558625d..5ef4ab178 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -7,6 +7,7 @@ set_dsm_context, _dsm_set_sqs_context, _dsm_set_sns_context, + _dsm_set_kinesis_context, _get_dsm_context_from_lambda, ) from datadog_lambda.trigger import EventTypes, _EventSource @@ -22,6 +23,10 @@ def setUp(self): self.mock_dsm_set_sns_context = patcher.start() self.addCleanup(patcher.stop) + patcher = patch("datadog_lambda.dsm._dsm_set_kinesis_context") + self.mock_dsm_set_kinesis_context = patcher.start() + self.addCleanup(patcher.stop) + patcher = patch("ddtrace.internal.datastreams.data_streams_processor") self.mock_data_streams_processor = patcher.start() self.addCleanup(patcher.stop) @@ -45,6 +50,13 @@ def setUp(self): self.mock_calculate_sns_payload_size.return_value = 150 self.addCleanup(patcher.stop) + patcher = patch( + "ddtrace.internal.datastreams.botocore.calculate_kinesis_payload_size" + ) + self.mock_calculate_kinesis_payload_size = patcher.start() + self.mock_calculate_kinesis_payload_size.return_value = 200 + self.addCleanup(patcher.stop) + patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode") self.mock_dsm_pathway_codec_decode = patcher.start() self.addCleanup(patcher.stop) @@ -207,6 +219,88 @@ def test_sns_multiple_records_process_each_record(self): self.assertIn("type:sns", tags) self.assertEqual(kwargs["payload_size"], 150) + def test_kinesis_event_with_no_records_does_nothing(self): + """Test that events where Records is None don't trigger DSM processing""" + events_with_no_records = [ + {}, + {"Records": None}, + {"someOtherField": "value"}, + ] + + for event in events_with_no_records: + _dsm_set_kinesis_context(event) + self.mock_data_streams_processor.assert_not_called() + + def test_kinesis_event_triggers_dsm_kinesis_context(self): + """Test that Kinesis event sources trigger the Kinesis-specific DSM context function""" + kinesis_event = { + "Records": [ + { + "eventSource": "aws:kinesis", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream", + "kinesis": { + "data": "SGVsbG8gZnJvbSBLaW5lc2lzIQ==", + "partitionKey": "partition-key", + }, + } + ] + } + + event_source = _EventSource(EventTypes.KINESIS) + set_dsm_context(kinesis_event, event_source) + + self.mock_dsm_set_kinesis_context.assert_called_once_with(kinesis_event) + + def test_kinesis_multiple_records_process_each_record(self): + """Test that each record in a Kinesis event gets processed individually""" + multi_record_event = { + "Records": [ + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream1", + "kinesis": { + "data": "TWVzc2FnZSAx", + "partitionKey": "partition-1", + }, + }, + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream2", + "kinesis": { + "data": "TWVzc2FnZSAy", + "partitionKey": "partition-2", + }, + }, + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream3", + "kinesis": { + "data": "TWVzc2FnZSAz", + "partitionKey": "partition-3", + }, + }, + ] + } + + mock_context = MagicMock() + self.mock_dsm_pathway_codec_decode.return_value = mock_context + + _dsm_set_kinesis_context(multi_record_event) + + self.assertEqual(mock_context.set_checkpoint.call_count, 3) + + calls = mock_context.set_checkpoint.call_args_list + expected_arns = [ + "arn:aws:kinesis:us-east-1:123456789012:stream/stream1", + "arn:aws:kinesis:us-east-1:123456789012:stream/stream2", + "arn:aws:kinesis:us-east-1:123456789012:stream/stream3", + ] + + for i, call in enumerate(calls): + args, kwargs = call + tags = args[0] + self.assertIn("direction:in", tags) + self.assertIn(f"topic:{expected_arns[i]}", tags) + self.assertIn("type:kinesis", tags) + self.assertEqual(kwargs["payload_size"], 200) + class TestGetDSMContext(unittest.TestCase): def test_sqs_to_lambda_string_value_format(self):