Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions datadog_lambda/dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ def set_dsm_context(event, event_source):
_dsm_set_sqs_context(event)
elif event_source.equals(EventTypes.SNS):
_dsm_set_sns_context(event)
elif event_source.equals(EventTypes.KINESIS):
_dsm_set_kinesis_context(event)


def _dsm_set_context_helper(
Expand Down Expand Up @@ -78,6 +80,23 @@ def sqs_arn_extractor(record):
_dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator)


def _dsm_set_kinesis_context(event):
from ddtrace.internal.datastreams.botocore import calculate_kinesis_payload_size

def kinesis_payload_calculator(record, context_json):
return calculate_kinesis_payload_size(record)

def kinesis_arn_extractor(record):
arn = record.get("eventSourceARN")
if arn is None:
return ""
return arn

_dsm_set_context_helper(
event, "kinesis", kinesis_arn_extractor, kinesis_payload_calculator
)


def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
Expand Down
94 changes: 94 additions & 0 deletions tests/test_dsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
set_dsm_context,
_dsm_set_sqs_context,
_dsm_set_sns_context,
_dsm_set_kinesis_context,
_get_dsm_context_from_lambda,
)
from datadog_lambda.trigger import EventTypes, _EventSource
Expand All @@ -22,6 +23,10 @@ def setUp(self):
self.mock_dsm_set_sns_context = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("datadog_lambda.dsm._dsm_set_kinesis_context")
self.mock_dsm_set_kinesis_context = patcher.start()
self.addCleanup(patcher.stop)

patcher = patch("ddtrace.internal.datastreams.data_streams_processor")
self.mock_data_streams_processor = patcher.start()
self.addCleanup(patcher.stop)
Expand All @@ -45,6 +50,13 @@ def setUp(self):
self.mock_calculate_sns_payload_size.return_value = 150
self.addCleanup(patcher.stop)

patcher = patch(
"ddtrace.internal.datastreams.botocore.calculate_kinesis_payload_size"
)
self.mock_calculate_kinesis_payload_size = patcher.start()
self.mock_calculate_kinesis_payload_size.return_value = 200
self.addCleanup(patcher.stop)

patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode")
self.mock_dsm_pathway_codec_decode = patcher.start()
self.addCleanup(patcher.stop)
Expand Down Expand Up @@ -207,6 +219,88 @@ def test_sns_multiple_records_process_each_record(self):
self.assertIn("type:sns", tags)
self.assertEqual(kwargs["payload_size"], 150)

def test_kinesis_event_with_no_records_does_nothing(self):
"""Test that events where Records is None don't trigger DSM processing"""
events_with_no_records = [
{},
{"Records": None},
{"someOtherField": "value"},
]

for event in events_with_no_records:
_dsm_set_kinesis_context(event)
self.mock_data_streams_processor.assert_not_called()

def test_kinesis_event_triggers_dsm_kinesis_context(self):
"""Test that Kinesis event sources trigger the Kinesis-specific DSM context function"""
kinesis_event = {
"Records": [
{
"eventSource": "aws:kinesis",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
"kinesis": {
"data": "SGVsbG8gZnJvbSBLaW5lc2lzIQ==",
"partitionKey": "partition-key",
},
}
]
}

event_source = _EventSource(EventTypes.KINESIS)
set_dsm_context(kinesis_event, event_source)

self.mock_dsm_set_kinesis_context.assert_called_once_with(kinesis_event)

def test_kinesis_multiple_records_process_each_record(self):
"""Test that each record in a Kinesis event gets processed individually"""
multi_record_event = {
"Records": [
{
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
"kinesis": {
"data": "TWVzc2FnZSAx",
"partitionKey": "partition-1",
},
},
{
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
"kinesis": {
"data": "TWVzc2FnZSAy",
"partitionKey": "partition-2",
},
},
{
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
"kinesis": {
"data": "TWVzc2FnZSAz",
"partitionKey": "partition-3",
},
},
]
}

mock_context = MagicMock()
self.mock_dsm_pathway_codec_decode.return_value = mock_context

_dsm_set_kinesis_context(multi_record_event)

self.assertEqual(mock_context.set_checkpoint.call_count, 3)

calls = mock_context.set_checkpoint.call_args_list
expected_arns = [
"arn:aws:kinesis:us-east-1:123456789012:stream/stream1",
"arn:aws:kinesis:us-east-1:123456789012:stream/stream2",
"arn:aws:kinesis:us-east-1:123456789012:stream/stream3",
]

for i, call in enumerate(calls):
args, kwargs = call
tags = args[0]
self.assertIn("direction:in", tags)
self.assertIn(f"topic:{expected_arns[i]}", tags)
self.assertIn("type:kinesis", tags)
self.assertEqual(kwargs["payload_size"], 200)


class TestGetDSMContext(unittest.TestCase):
def test_sqs_to_lambda_string_value_format(self):
Expand Down