Skip to content

Commit da93c9b

Browse files
set dsm checkpoint for all records in array
1 parent 664124e commit da93c9b

File tree

6 files changed

+1211
-1027
lines changed

6 files changed

+1211
-1027
lines changed

datadog_lambda/dsm.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import logging
2+
from datadog_lambda.trigger import EventTypes, _EventSource
3+
from datadog_lambda.tracing import (
4+
extract_datadog_context_from_kinesis_event,
5+
extract_datadog_context_from_sqs_or_sns_event,
6+
)
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
def set_dsm_context(event, event_source: _EventSource):
12+
if (
13+
not event_source.equals(EventTypes.KINESIS)
14+
and not event_source.equals(EventTypes.SNS)
15+
and not event_source.equals(EventTypes.SQS)
16+
):
17+
return
18+
19+
for record in event.get("Records", []):
20+
source_arn = (
21+
record.get("Sns", {}).get("TopicArn")
22+
if event_source.equals(EventTypes.SNS)
23+
else record.get("eventSourceARN")
24+
)
25+
26+
if not source_arn:
27+
logger.debug(
28+
f"DSM:No source arn found, not setting checkpoint for record: {record}"
29+
)
30+
continue
31+
try:
32+
from ddtrace.data_streams import set_consume_checkpoint
33+
34+
# Allowed to be None, DSM checkpoint will still be set
35+
context_json = None
36+
try:
37+
context_json = (
38+
extract_datadog_context_from_kinesis_event(
39+
record.get("kinesis", {})
40+
)
41+
if event_source.equals(EventTypes.KINESIS)
42+
else extract_datadog_context_from_sqs_or_sns_event(record)
43+
)
44+
except Exception as e:
45+
logger.debug(
46+
f"DSM:Failed to extract context from {source_arn} with error: {e}. "
47+
f"Will still attempt to set checkpoint."
48+
)
49+
50+
carrier_get = lambda k: context_json and context_json.get(k) # noqa: E731
51+
set_consume_checkpoint(
52+
event_source.to_string(),
53+
source_arn,
54+
carrier_get,
55+
manual_checkpoint=False,
56+
)
57+
except Exception as e:
58+
logger.debug(
59+
f"DSM:Failed to set consume checkpoint for {source_arn} with error: {e}"
60+
)

datadog_lambda/tracing.py

Lines changed: 69 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,6 @@
6767
LOWER_64_BITS = "LOWER_64_BITS"
6868

6969

70-
def _dsm_set_checkpoint(context_json, event_type, arn):
71-
if not config.data_streams_enabled:
72-
return
73-
74-
if not arn:
75-
return
76-
77-
try:
78-
from ddtrace.data_streams import set_consume_checkpoint
79-
80-
carrier_get = lambda k: context_json and context_json.get(k) # noqa: E731
81-
set_consume_checkpoint(event_type, arn, carrier_get, manual_checkpoint=False)
82-
except Exception as e:
83-
logger.debug(
84-
f"DSM:Failed to set consume checkpoint for {event_type} {arn}: {e}"
85-
)
86-
87-
8870
def _convert_xray_trace_id(xray_trace_id):
8971
"""
9072
Convert X-Ray trace id (hex)'s last 63 bits to a Datadog trace id (int).
@@ -234,10 +216,7 @@ def extract_context_from_sqs_or_sns_event_or_context(
234216
Lambda Context.
235217
236218
Falls back to lambda context if no trace data is found in the SQS message attributes.
237-
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
238219
"""
239-
source_arn = ""
240-
event_type = "sqs" if event_source.equals(EventTypes.SQS) else "sns"
241220

242221
# EventBridge => SQS
243222
try:
@@ -249,59 +228,17 @@ def extract_context_from_sqs_or_sns_event_or_context(
249228

250229
try:
251230
first_record = event.get("Records")[0]
252-
source_arn = first_record.get("eventSourceARN", "")
253-
254-
# logic to deal with SNS => SQS event
255-
if "body" in first_record:
256-
body_str = first_record.get("body")
257-
try:
258-
body = json.loads(body_str)
259-
if body.get("Type", "") == "Notification" and "TopicArn" in body:
260-
logger.debug("Found SNS message inside SQS event")
261-
first_record = get_first_record(create_sns_event(body))
262-
except Exception:
263-
pass
264-
265-
msg_attributes = first_record.get("messageAttributes")
266-
if msg_attributes is None:
267-
sns_record = first_record.get("Sns") or {}
268-
# SNS->SQS event would extract SNS arn without this check
269-
if event_source.equals(EventTypes.SNS):
270-
source_arn = sns_record.get("TopicArn", "")
271-
msg_attributes = sns_record.get("MessageAttributes") or {}
272-
dd_payload = msg_attributes.get("_datadog")
273-
if dd_payload:
274-
# SQS uses dataType and binaryValue/stringValue
275-
# SNS uses Type and Value
276-
dd_json_data = None
277-
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
278-
if dd_json_data_type == "Binary":
279-
import base64
280-
281-
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
282-
if dd_json_data:
283-
dd_json_data = base64.b64decode(dd_json_data)
284-
elif dd_json_data_type == "String":
285-
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
286-
else:
287-
logger.debug(
288-
"Datadog Lambda Python only supports extracting trace"
289-
"context from String or Binary SQS/SNS message attributes"
290-
)
291-
292-
if dd_json_data:
293-
dd_data = json.loads(dd_json_data)
231+
dd_data = extract_datadog_context_from_sqs_or_sns_event(first_record)
232+
if dd_data:
233+
if is_step_function_event(dd_data):
234+
try:
235+
return extract_context_from_step_functions(dd_data, None)
236+
except Exception:
237+
logger.debug(
238+
"Failed to extract Step Functions context from SQS/SNS event."
239+
)
240+
return propagator.extract(dd_data)
294241

295-
if is_step_function_event(dd_data):
296-
try:
297-
return extract_context_from_step_functions(dd_data, None)
298-
except Exception:
299-
logger.debug(
300-
"Failed to extract Step Functions context from SQS/SNS event."
301-
)
302-
context = propagator.extract(dd_data)
303-
_dsm_set_checkpoint(dd_data, event_type, source_arn)
304-
return context
305242
else:
306243
# Handle case where trace context is injected into attributes.AWSTraceHeader
307244
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
@@ -324,16 +261,53 @@ def extract_context_from_sqs_or_sns_event_or_context(
324261
span_id=int(x_ray_context["parent_id"], 16),
325262
sampling_priority=float(x_ray_context["sampled"]),
326263
)
327-
# Still want to set a DSM checkpoint even if DSM context not propagated
328-
_dsm_set_checkpoint(None, event_type, source_arn)
329264
return extract_context_from_lambda_context(lambda_context)
330265
except Exception as e:
331266
logger.debug("The trace extractor returned with error %s", e)
332-
# Still want to set a DSM checkpoint even if DSM context not propagated
333-
_dsm_set_checkpoint(None, event_type, source_arn)
334267
return extract_context_from_lambda_context(lambda_context)
335268

336269

270+
def extract_datadog_context_from_sqs_or_sns_event(record):
271+
if "body" in record:
272+
body_str = record.get("body")
273+
try:
274+
body = json.loads(body_str)
275+
if body.get("Type", "") == "Notification" and "TopicArn" in body:
276+
logger.debug("Found SNS message inside SQS event")
277+
record = get_first_record(create_sns_event(body))
278+
except Exception:
279+
pass
280+
281+
msg_attributes = record.get("messageAttributes")
282+
if msg_attributes is None:
283+
sns_record = record.get("Sns") or {}
284+
msg_attributes = sns_record.get("MessageAttributes") or {}
285+
dd_payload = msg_attributes.get("_datadog")
286+
if dd_payload:
287+
# SQS uses dataType and binaryValue/stringValue
288+
# SNS uses Type and Value
289+
dd_json_data = None
290+
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
291+
if dd_json_data_type == "Binary":
292+
import base64
293+
294+
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
295+
if dd_json_data:
296+
dd_json_data = base64.b64decode(dd_json_data)
297+
elif dd_json_data_type == "String":
298+
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
299+
else:
300+
logger.debug(
301+
"Datadog Lambda Python only supports extracting trace"
302+
"context from String or Binary SQS/SNS message attributes"
303+
)
304+
305+
if dd_json_data:
306+
dd_data = json.loads(dd_json_data)
307+
return dd_data
308+
return None
309+
310+
337311
def _extract_context_from_eventbridge_sqs_event(event):
338312
"""
339313
Extracts Datadog trace context from an SQS event triggered by
@@ -389,35 +363,35 @@ def extract_context_from_eventbridge_event(event, lambda_context):
389363
def extract_context_from_kinesis_event(event, lambda_context):
390364
"""
391365
Extract datadog trace context from a Kinesis Stream's base64 encoded data string
392-
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
393366
"""
394-
source_arn = ""
395367
try:
396368
record = get_first_record(event)
397-
source_arn = record.get("eventSourceARN", "")
398369
kinesis = record.get("kinesis")
399370
if not kinesis:
400371
return extract_context_from_lambda_context(lambda_context)
401-
data = kinesis.get("data")
402-
if data:
403-
import base64
404-
405-
b64_bytes = data.encode("ascii")
406-
str_bytes = base64.b64decode(b64_bytes)
407-
data_str = str_bytes.decode("ascii")
408-
data_obj = json.loads(data_str)
409-
dd_ctx = data_obj.get("_datadog")
410-
if dd_ctx:
411-
context = propagator.extract(dd_ctx)
412-
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
413-
return context
372+
dd_ctx = extract_datadog_context_from_kinesis_event(kinesis)
373+
if dd_ctx:
374+
return propagator.extract(dd_ctx)
414375
except Exception as e:
415376
logger.debug("The trace extractor returned with error %s", e)
416-
# Still want to set a DSM checkpoint even if DSM context not propagated
417-
_dsm_set_checkpoint(None, "kinesis", source_arn)
418377
return extract_context_from_lambda_context(lambda_context)
419378

420379

380+
def extract_datadog_context_from_kinesis_event(record_kinesis_field):
381+
data = record_kinesis_field.get("data")
382+
if data:
383+
import base64
384+
385+
b64_bytes = data.encode("ascii")
386+
str_bytes = base64.b64decode(b64_bytes)
387+
data_str = str_bytes.decode("ascii")
388+
data_obj = json.loads(data_str)
389+
dd_ctx = data_obj.get("_datadog")
390+
if dd_ctx:
391+
return dd_ctx
392+
return None
393+
394+
421395
def _deterministic_sha256_hash(s: str, part: str) -> int:
422396
import hashlib
423397

datadog_lambda/wrapper.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from time import time_ns
1111

1212
from datadog_lambda.asm import asm_set_context, asm_start_response, asm_start_request
13+
from datadog_lambda.dsm import set_dsm_context
1314
from datadog_lambda.extension import should_use_extension, flush_extension
1415
from datadog_lambda.cold_start import (
1516
set_cold_start,
@@ -240,6 +241,9 @@ def _before(self, event, context):
240241
if config.appsec_enabled:
241242
asm_set_context(event_source)
242243

244+
if config.data_streams_enabled:
245+
set_dsm_context(event, event_source)
246+
243247
self.span = create_function_execution_span(
244248
context=context,
245249
function_name=config.function_name,

0 commit comments

Comments
 (0)