def _get_dsm_context_from_lambda(message):
    """
    Extract the Data Streams Monitoring context carried by one Lambda record.

    Lambda-specific message formats:
        - message.messageAttributes._datadog.stringValue (SQS -> lambda)
        - message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
        - message.messageAttributes._datadog.binaryValue.decode()
          (SNS -> SQS -> lambda, raw)
        - message.body.MessageAttributes._datadog.Value.decode()
          (SNS -> SQS -> lambda)
        - message.kinesis.data.decode()._datadog (Kinesis -> lambda)

    Args:
        message: a single record (dict) taken from the event's "Records" list.

    Returns:
        The parsed JSON value of the `_datadog` attribute (or, for Kinesis,
        the `_datadog` member of the decoded payload), or None when the record
        carries no context in a recognized format.

    Raises:
        ValueError, KeyError: when a `_datadog` message attribute is present
            but its value is missing or is not valid base64/JSON. Only the
            Kinesis payload and the record body are parsed on a best-effort
            basis; attribute decoding errors are left to the caller.
    """
    if "kinesis" in message:
        try:
            kinesis_data = json.loads(
                base64.b64decode(message["kinesis"]["data"]).decode()
            )
        except (ValueError, TypeError, KeyError):
            log.debug("Unable to parse kinesis data for lambda message")
            return None
        # Valid JSON that is not an object (list, number, ...) has no
        # `_datadog` member; treat it like an unparsable payload.
        if not isinstance(kinesis_data, dict):
            log.debug("Unable to parse kinesis data for lambda message")
            return None
        return kinesis_data.get("_datadog")

    # Places that may hold the message attributes, in lookup order.
    if "Sns" in message:
        candidates = [message["Sns"]]
    else:
        candidates = [message]
        body = message.get("body")
        if body:
            try:
                parsed_body = json.loads(body)
            except (ValueError, TypeError):
                log.debug(
                    "Unable to parse lambda message body as JSON, treat as non-json"
                )
            else:
                # An SNS envelope delivered through SQS keeps the attributes
                # inside the JSON body, so look there first. A plain JSON
                # payload (or a JSON scalar/list) has none, in which case the
                # record-level attributes must still be honored.
                if isinstance(parsed_body, dict):
                    candidates.insert(0, parsed_body)

    message_body = None
    message_attributes = None
    for candidate in candidates:
        attributes = candidate.get("MessageAttributes") or candidate.get(
            "messageAttributes"
        )
        if isinstance(attributes, dict) and "_datadog" in attributes:
            message_body = candidate
            message_attributes = attributes
            break

    if message_attributes is None:
        log.debug("DataStreams skipped lambda message: %r", message)
        return None

    datadog_attr = message_attributes["_datadog"]
    context_json = None

    if message_body.get("Type") == "Notification":
        # SNS -> lambda notification
        if datadog_attr.get("Type") == "Binary":
            context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
        else:
            log.debug("DataStreams did not handle lambda message: %r", message)
    elif "stringValue" in datadog_attr:
        # SQS -> lambda
        context_json = json.loads(datadog_attr["stringValue"])
    elif "binaryValue" in datadog_attr:
        # SNS -> SQS -> lambda, raw message delivery
        context_json = json.loads(
            base64.b64decode(datadog_attr["binaryValue"]).decode()
        )
    else:
        log.debug("DataStreams did not handle lambda message: %r", message)

    return context_json
class TestGetDSMContext(unittest.TestCase):
    """Exercise `_get_dsm_context_from_lambda` with each Lambda record shape.

    Uses `unittest` assertion methods rather than bare `assert` statements so
    the checks still run under `python -O` and report readable diffs.
    """

    @staticmethod
    def _b64_json(payload):
        """Return `payload` serialized as JSON and base64-encoded as text."""
        return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")

    def test_sqs_to_lambda_string_value_format(self):
        """Format: message.messageAttributes._datadog.stringValue (SQS -> lambda)."""
        trace_context = {
            "x-datadog-trace-id": "789123456",
            "x-datadog-parent-id": "321987654",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        lambda_record = {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185",
            },
            "messageAttributes": {
                "_datadog": {
                    "stringValue": json.dumps(trace_context),
                    "stringListValues": [],
                    "binaryListValues": [],
                    "dataType": "String",
                },
                "myAttribute": {
                    "stringValue": "myValue",
                    "stringListValues": [],
                    "binaryListValues": [],
                    "dataType": "String",
                },
            },
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        self.assertEqual(result, trace_context)

    def test_sns_to_lambda_format(self):
        """Format: message.Sns.MessageAttributes._datadog.Value (SNS -> lambda)."""
        trace_context = {
            "x-datadog-trace-id": "111111111",
            "x-datadog-parent-id": "222222222",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        sns_lambda_record = {
            "EventSource": "aws:sns",
            "EventSubscriptionArn": (
                "arn:aws:sns:us-east-1:123456789012:sns-topic:"
                "12345678-1234-1234-1234-123456789012"
            ),
            "Sns": {
                "Type": "Notification",
                "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
                "TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic",
                "Subject": "Test Subject",
                "Message": "Hello from SNS!",
                "Timestamp": "2023-01-01T12:00:00.000Z",
                "MessageAttributes": {
                    "_datadog": {
                        "Type": "Binary",
                        "Value": self._b64_json(trace_context),
                    }
                },
            },
        }

        result = _get_dsm_context_from_lambda(sns_lambda_record)

        self.assertEqual(result, trace_context)

    def test_sns_to_sqs_to_lambda_binary_value_format(self):
        """Format: message.messageAttributes._datadog.binaryValue.

        SNS -> SQS -> lambda with raw message delivery.
        """
        trace_context = {
            "x-datadog-trace-id": "777666555",
            "x-datadog-parent-id": "444333222",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        lambda_record = {
            "messageId": "test-message-id",
            "receiptHandle": "test-receipt-handle",
            "body": "Test message body",
            "messageAttributes": {
                "_datadog": {
                    "binaryValue": self._b64_json(trace_context),
                    "dataType": "Binary",
                }
            },
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        self.assertEqual(result, trace_context)

    def test_sns_to_sqs_to_lambda_body_format(self):
        """Format: message.body.MessageAttributes._datadog.Value.

        SNS -> SQS -> lambda, with the SNS envelope serialized in the body.
        """
        trace_context = {
            "x-datadog-trace-id": "123987456",
            "x-datadog-parent-id": "654321987",
            "x-datadog-sampling-priority": "1",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        message_body = {
            "Type": "Notification",
            "MessageId": "test-message-id",
            "Message": "Test message from SNS",
            "MessageAttributes": {
                "_datadog": {
                    "Type": "Binary",
                    "Value": self._b64_json(trace_context),
                }
            },
        }

        lambda_record = {
            "messageId": "lambda-message-id",
            "body": json.dumps(message_body),
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        self.assertEqual(result, trace_context)

    def test_kinesis_to_lambda_format(self):
        """Format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)."""
        trace_context = {
            "x-datadog-trace-id": "555444333",
            "x-datadog-parent-id": "888777666",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        # The context travels inside the base64-encoded JSON payload itself.
        kinesis_payload = {
            "_datadog": trace_context,
            "actualData": "some business data",
        }

        kinesis_lambda_record = {
            "eventSource": "aws:kinesis",
            "eventSourceARN": (
                "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            ),
            "kinesis": {
                "data": self._b64_json(kinesis_payload),
                "partitionKey": "partition-key-1",
                "sequenceNumber": (
                    "49590338271490256608559692538361571095921575989136588898"
                ),
            },
        }

        result = _get_dsm_context_from_lambda(kinesis_lambda_record)

        self.assertEqual(result, trace_context)

    def test_no_message_attributes(self):
        """A message without MessageAttributes returns None."""
        message = {
            "messageId": "test-message-id",
            "body": "Test message without attributes",
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)

    def test_no_datadog_attribute(self):
        """A message with MessageAttributes but no _datadog entry returns None."""
        message = {
            "messageId": "test-message-id",
            "body": "Test message",
            "messageAttributes": {
                "customAttribute": {"stringValue": "custom-value", "dataType": "String"}
            },
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)

    def test_empty_datadog_attribute(self):
        """A message whose _datadog attribute is empty returns None."""
        message = {
            "messageId": "test-message-id",
            "messageAttributes": {"_datadog": {}},
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)