Skip to content

Commit 81c90d3

Browse files
fixes
1 parent 2e3d70f commit 81c90d3

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

datadog_lambda/dsm.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ def _dsm_set_kinesis_context(event):
6868
if not context_json:
6969
logger.debug("DataStreams skipped lambda message: %r", record)
7070
return None
71-
7271
carrier_get = _create_carrier_get(context_json)
7372
set_consume_checkpoint("kinesis", arn, carrier_get)
7473

@@ -78,10 +77,21 @@ def _get_dsm_context_from_lambda(message):
7877
Lambda-specific message formats:
7978
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
8079
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
80+
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
8181
"""
8282
context_json = None
8383
message_body = message
8484

85+
if "kinesis" in message:
86+
try:
87+
kinesis_data = json.loads(
88+
base64.b64decode(message["kinesis"]["data"]).decode()
89+
)
90+
return kinesis_data.get("_datadog")
91+
except (ValueError, TypeError, KeyError):
92+
logger.debug("Unable to parse kinesis data for lambda message")
93+
return None
94+
8595
if "Sns" in message:
8696
message_body = message["Sns"]
8797

tests/test_dsm.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,45 @@ def test_sns_to_lambda_format(self):
442442
assert result["x-datadog-parent-id"] == "222222222"
443443
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
444444

445+
def test_kinesis_to_lambda_format(self):
446+
"""Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
447+
trace_context = {
448+
"x-datadog-trace-id": "555444333",
449+
"x-datadog-parent-id": "888777666",
450+
"dd-pathway-ctx": "test-pathway-ctx",
451+
}
452+
453+
# Create the kinesis data payload
454+
kinesis_payload = {
455+
"_datadog": trace_context,
456+
"actualData": "some business data",
457+
}
458+
encoded_kinesis_data = base64.b64encode(
459+
json.dumps(kinesis_payload).encode("utf-8")
460+
).decode("utf-8")
461+
462+
kinesis_lambda_record = {
463+
"eventSource": "aws:kinesis",
464+
"eventSourceARN": (
465+
"arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
466+
),
467+
"kinesis": {
468+
"data": encoded_kinesis_data,
469+
"partitionKey": "partition-key-1",
470+
"sequenceNumber": (
471+
"49590338271490256608559692538361571095921575989136588898"
472+
),
473+
},
474+
}
475+
476+
result = _get_dsm_context_from_lambda(kinesis_lambda_record)
477+
478+
assert result is not None
479+
assert result == trace_context
480+
assert result["x-datadog-trace-id"] == "555444333"
481+
assert result["x-datadog-parent-id"] == "888777666"
482+
assert result["dd-pathway-ctx"] == "test-pathway-ctx"
483+
445484
def test_no_message_attributes(self):
446485
"""Test message without MessageAttributes returns None."""
447486
message = {

0 commit comments

Comments
 (0)