Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 69 additions & 2 deletions datadog_lambda/dsm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import json
import base64

from ddtrace.internal.logger import get_logger
from datadog_lambda import logger
from datadog_lambda.trigger import EventTypes

log = get_logger(__name__)


def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
Expand All @@ -24,7 +30,6 @@ def _dsm_set_context_helper(
from datadog_lambda.wrapper import format_err_with_traceback
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.botocore import get_datastreams_context

records = event.get("Records")
if records is None:
Expand All @@ -34,7 +39,7 @@ def _dsm_set_context_helper(
for record in records:
try:
arn = arn_extractor(record)
context_json = get_datastreams_context(record)
context_json = _get_dsm_context_from_lambda(record)
payload_size = payload_size_calculator(record, context_json)

ctx = DsmPathwayCodec.decode(context_json, processor)
Expand Down Expand Up @@ -71,3 +76,65 @@ def sqs_arn_extractor(record):
return record.get("eventSourceARN", "")

_dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator)


def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
- message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
- message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
"""
context_json = None
message_body = message

if "kinesis" in message:
try:
kinesis_data = json.loads(
base64.b64decode(message["kinesis"]["data"]).decode()
)
return kinesis_data.get("_datadog")
except (ValueError, TypeError, KeyError):
log.debug("Unable to parse kinesis data for lambda message")
return None
elif "Sns" in message:
message_body = message["Sns"]
else:
try:
body = message.get("body")
if body:
message_body = json.loads(body)
except (ValueError, TypeError):
log.debug("Unable to parse lambda message body as JSON, treat as non-json")

message_attributes = message_body.get("MessageAttributes") or message_body.get(
"messageAttributes"
)
if not message_attributes:
log.debug("DataStreams skipped lambda message: %r", message)
return None

if "_datadog" not in message_attributes:
log.debug("DataStreams skipped lambda message: %r", message)
return None

datadog_attr = message_attributes["_datadog"]

if message_body.get("Type") == "Notification":
# SNS -> lambda notification
if datadog_attr.get("Type") == "Binary":
context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
elif "stringValue" in datadog_attr:
# SQS -> lambda
context_json = json.loads(datadog_attr["stringValue"])
elif "binaryValue" in datadog_attr:
# SNS -> SQS -> lambda, raw message delivery
context_json = json.loads(
base64.b64decode(datadog_attr["binaryValue"]).decode()
)
else:
log.debug("DataStreams did not handle lambda message: %r", message)

return context_json
230 changes: 230 additions & 0 deletions tests/test_dsm.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import unittest
import json
import base64
from unittest.mock import patch, MagicMock

from datadog_lambda.dsm import (
set_dsm_context,
_dsm_set_sqs_context,
_dsm_set_sns_context,
_get_dsm_context_from_lambda,
)
from datadog_lambda.trigger import EventTypes, _EventSource

Expand Down Expand Up @@ -203,3 +206,230 @@ def test_sns_multiple_records_process_each_record(self):
self.assertIn(f"topic:{expected_arns[i]}", tags)
self.assertIn("type:sns", tags)
self.assertEqual(kwargs["payload_size"], 150)


class TestGetDSMContext(unittest.TestCase):
def test_sqs_to_lambda_string_value_format(self):
"""Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "789123456",
"x-datadog-parent-id": "321987654",
"dd-pathway-ctx": "test-pathway-ctx",
}

lambda_record = {
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185",
},
"messageAttributes": {
"_datadog": {
"stringValue": json.dumps(trace_context),
"stringListValues": [],
"binaryListValues": [],
"dataType": "String",
},
"myAttribute": {
"stringValue": "myValue",
"stringListValues": [],
"binaryListValues": [],
"dataType": "String",
},
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2",
}

result = _get_dsm_context_from_lambda(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "789123456"
assert result["x-datadog-parent-id"] == "321987654"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_lambda_format(self):
"""Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "111111111",
"x-datadog-parent-id": "222222222",
"dd-pathway-ctx": "test-pathway-ctx",
}
binary_data = base64.b64encode(
json.dumps(trace_context).encode("utf-8")
).decode("utf-8")

sns_lambda_record = {
"EventSource": "aws:sns",
"EventSubscriptionArn": (
"arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012"
),
"Sns": {
"Type": "Notification",
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
"TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic",
"Subject": "Test Subject",
"Message": "Hello from SNS!",
"Timestamp": "2023-01-01T12:00:00.000Z",
"MessageAttributes": {
"_datadog": {"Type": "Binary", "Value": binary_data}
},
},
}

result = _get_dsm_context_from_lambda(sns_lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "111111111"
assert result["x-datadog-parent-id"] == "222222222"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_sqs_to_lambda_binary_value_format(self):
"""Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)"""
trace_context = {
"x-datadog-trace-id": "777666555",
"x-datadog-parent-id": "444333222",
"dd-pathway-ctx": "test-pathway-ctx",
}
binary_data = base64.b64encode(
json.dumps(trace_context).encode("utf-8")
).decode("utf-8")

lambda_record = {
"messageId": "test-message-id",
"receiptHandle": "test-receipt-handle",
"body": "Test message body",
"messageAttributes": {
"_datadog": {"binaryValue": binary_data, "dataType": "Binary"}
},
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
}

result = _get_dsm_context_from_lambda(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "777666555"
assert result["x-datadog-parent-id"] == "444333222"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_sns_to_sqs_to_lambda_body_format(self):
"""Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)"""
trace_context = {
"x-datadog-trace-id": "123987456",
"x-datadog-parent-id": "654321987",
"x-datadog-sampling-priority": "1",
"dd-pathway-ctx": "test-pathway-ctx",
}

message_body = {
"Type": "Notification",
"MessageId": "test-message-id",
"Message": "Test message from SNS",
"MessageAttributes": {
"_datadog": {
"Type": "Binary",
"Value": base64.b64encode(
json.dumps(trace_context).encode("utf-8")
).decode("utf-8"),
}
},
}

lambda_record = {
"messageId": "lambda-message-id",
"body": json.dumps(message_body),
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
}

result = _get_dsm_context_from_lambda(lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "123987456"
assert result["x-datadog-parent-id"] == "654321987"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_kinesis_to_lambda_format(self):
"""Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
trace_context = {
"x-datadog-trace-id": "555444333",
"x-datadog-parent-id": "888777666",
"dd-pathway-ctx": "test-pathway-ctx",
}

# Create the kinesis data payload
kinesis_payload = {
"_datadog": trace_context,
"actualData": "some business data",
}
encoded_kinesis_data = base64.b64encode(
json.dumps(kinesis_payload).encode("utf-8")
).decode("utf-8")

kinesis_lambda_record = {
"eventSource": "aws:kinesis",
"eventSourceARN": (
"arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
),
"kinesis": {
"data": encoded_kinesis_data,
"partitionKey": "partition-key-1",
"sequenceNumber": (
"49590338271490256608559692538361571095921575989136588898"
),
},
}

result = _get_dsm_context_from_lambda(kinesis_lambda_record)

assert result is not None
assert result == trace_context
assert result["x-datadog-trace-id"] == "555444333"
assert result["x-datadog-parent-id"] == "888777666"
assert result["dd-pathway-ctx"] == "test-pathway-ctx"

def test_no_message_attributes(self):
"""Test message without MessageAttributes returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message without attributes",
}

result = _get_dsm_context_from_lambda(message)

assert result is None

def test_no_datadog_attribute(self):
"""Test message with MessageAttributes but no _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"body": "Test message",
"messageAttributes": {
"customAttribute": {"stringValue": "custom-value", "dataType": "String"}
},
}

result = _get_dsm_context_from_lambda(message)
assert result is None

def test_empty_datadog_attribute(self):
"""Test message with empty _datadog attribute returns None."""
message = {
"messageId": "test-message-id",
"messageAttributes": {"_datadog": {}},
}

result = _get_dsm_context_from_lambda(message)

assert result is None