Skip to content

Commit 20b0054

Browse files
committed
durable: extract trace context from checkpoints and input payload
1 parent 42b2e54 commit 20b0054

2 files changed

Lines changed: 207 additions & 20 deletions

File tree

datadog_lambda/tracing.py

Lines changed: 141 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
DD_TRACE_JAVA_TRACE_ID_PADDING = "00000000"
6262
HIGHER_64_BITS = "HIGHER_64_BITS"
6363
LOWER_64_BITS = "LOWER_64_BITS"
64+
_TRACE_CHECKPOINT_PREFIX = "_datadog_"
6465

6566

6667
def _dsm_set_checkpoint(context_json, event_type, arn):
@@ -546,6 +547,121 @@ def extract_context_from_step_functions(event, lambda_context):
546547
return extract_context_from_lambda_context(lambda_context)
547548

548549

550+
def _durable_operations(event):
    """
    Return the operations recorded in a durable execution event as a list.

    Reads ``event["InitialExecutionState"]["Operations"]``. A list is returned
    unchanged. A dict is flattened into a list of its dict values: entries
    whose key converts with ``int()`` come first in ascending numeric order,
    followed by the remaining entries ordered by ``str(key)``. Values that are
    not dicts are dropped.

    Returns an empty list when the event, the execution state, or the
    operations container does not have the expected type.
    """
    if not isinstance(event, dict):
        return []

    # The key may be present but null (or some other non-dict value); chaining
    # ``.get`` on a default of ``{}`` only protects against a missing key.
    state = event.get("InitialExecutionState")
    if not isinstance(state, dict):
        return []

    operations = state.get("Operations")
    if isinstance(operations, list):
        return operations
    if not isinstance(operations, dict):
        return []

    numbered = []
    named = []
    for key, value in operations.items():
        if not isinstance(value, dict):
            continue
        try:
            numbered.append((int(key), value))
        except (TypeError, ValueError):
            named.append((str(key), value))

    # Sort on the key only so equal keys never fall back to comparing dicts.
    numbered.sort(key=lambda item: item[0])
    named.sort(key=lambda item: item[0])
    return [value for _, value in numbered + named]
573+
574+
575+
def _extract_context_from_durable_checkpoint(operation):
    """
    Extract a trace context from a durable checkpoint operation.

    Reads ``operation["StepDetails"]["Result"]``, which may be a dict or a
    JSON string that decodes to one, and passes it to ``propagator.extract``.

    Returns None when the operation, its step details, or the result does not
    have the expected type, or when a string result is not valid JSON.
    Otherwise returns whatever ``propagator.extract`` returns.
    """
    if not isinstance(operation, dict):
        return None

    step_details = operation.get("StepDetails")
    if not isinstance(step_details, dict):
        return None

    result = step_details.get("Result")
    if isinstance(result, str):
        try:
            result = json.loads(result)
        except (ValueError, RecursionError):
            # json.JSONDecodeError subclasses ValueError; RecursionError
            # covers pathologically nested documents. Anything else is a real
            # bug and should not be hidden.
            return None

    if not isinstance(result, dict):
        return None

    return propagator.extract(result)
594+
595+
596+
def _extract_context_from_durable_input_payload(operation):
    """
    Extract an upstream trace context from an operation's input payload.

    Reads ``operation["ExecutionDetails"]["InputPayload"]``, which may be a
    dict or a JSON string that decodes to one. The payload's ``"headers"``
    entry is tried first, then its ``"_datadog"`` entry; each dict-valued
    entry is passed to ``propagator.extract``.

    Returns the first extracted context that satisfies
    ``_is_context_complete``. If none does, returns the first context that was
    extracted at all, so callers see the same value as before for a payload
    whose only carrier is incomplete. Returns None when the structure is not
    as expected, a string payload is not valid JSON, or neither entry is a
    dict.
    """
    if not isinstance(operation, dict):
        return None

    execution_details = operation.get("ExecutionDetails")
    if not isinstance(execution_details, dict):
        return None

    input_payload = execution_details.get("InputPayload")
    if isinstance(input_payload, str):
        try:
            input_payload = json.loads(input_payload)
        except (ValueError, RecursionError):
            # json.JSONDecodeError subclasses ValueError; RecursionError
            # covers pathologically nested documents.
            return None

    if not isinstance(input_payload, dict):
        return None

    # A "headers" dict that carries no trace information must not hide a
    # usable "_datadog" carrier, so keep looking until a context is complete.
    fallback = None
    for carrier_key in ("headers", "_datadog"):
        carrier = input_payload.get(carrier_key)
        if not isinstance(carrier, dict):
            continue
        context = propagator.extract(carrier)
        if _is_context_complete(context):
            return context
        if fallback is None:
            fallback = context

    return fallback
623+
624+
625+
def _durable_checkpoint_number(name, prefix):
    """
    Return the checkpoint index encoded in an operation name, or None.

    A name qualifies only when it is a string made of ``prefix`` followed by
    one or more ASCII digits. Forms that ``int()`` would otherwise accept
    (a sign, surrounding whitespace, underscores) are rejected.
    """
    if not isinstance(name, str) or not name.startswith(prefix):
        return None
    suffix = name[len(prefix) :]
    if not suffix or not all(ch in "0123456789" for ch in suffix):
        return None
    return int(suffix)


def extract_context_from_durable_execution(event):
    """
    Extract a trace context from a durable execution event.

    The event must be a dict whose ``"DurableExecutionArn"`` is a string.
    Operations named ``_TRACE_CHECKPOINT_PREFIX`` plus a number are treated as
    trace checkpoints; among those whose extracted context satisfies
    ``_is_context_complete``, the one with the highest number wins (the later
    operation wins a tie). When no checkpoint qualifies, the input payload of
    the first operation is tried instead.

    Returns the selected context, or None when the event is not a durable
    execution event, has no operations, or yields no complete context.
    """
    if not isinstance(event, dict):
        return None
    if not isinstance(event.get("DurableExecutionArn"), str):
        return None

    operations = _durable_operations(event)
    if not operations:
        return None

    best_context = None
    best_number = -1
    for operation in operations:
        if not isinstance(operation, dict):
            continue
        number = _durable_checkpoint_number(
            operation.get("Name"), _TRACE_CHECKPOINT_PREFIX
        )
        # Skip non-checkpoints and anything older than the best one so far.
        if number is None or number < best_number:
            continue
        context = _extract_context_from_durable_checkpoint(operation)
        if _is_context_complete(context):
            best_context = context
            best_number = number

    if best_context is not None:
        return best_context

    # No usable checkpoint: fall back to the first operation's input payload.
    upstream_context = _extract_context_from_durable_input_payload(operations[0])
    if _is_context_complete(upstream_context):
        return upstream_context

    return None
663+
664+
549665
def extract_context_custom_extractor(extractor, event, lambda_context):
550666
"""
551667
Extract Datadog trace context using a custom trace extractor function
@@ -633,29 +749,34 @@ def extract_dd_trace_context(
633749
global dd_trace_context
634750
trace_context_source = None
635751
event_source = parse_event_source(event)
752+
context = None
636753

637754
if extractor is not None:
638755
context = extract_context_custom_extractor(extractor, event, lambda_context)
639-
elif isinstance(event, (set, dict)) and "request" in event:
640-
context = extract_context_from_request_header_or_context(
641-
event, lambda_context, event_source
642-
)
643-
elif isinstance(event, (set, dict)) and "headers" in event:
644-
context = extract_context_from_http_event_or_context(
645-
event, lambda_context, event_source, decode_authorizer_context
646-
)
647-
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
648-
context = extract_context_from_sqs_or_sns_event_or_context(
649-
event, lambda_context, event_source
650-
)
651-
elif event_source.equals(EventTypes.EVENTBRIDGE):
652-
context = extract_context_from_eventbridge_event(event, lambda_context)
653-
elif event_source.equals(EventTypes.KINESIS):
654-
context = extract_context_from_kinesis_event(event, lambda_context)
655-
elif event_source.equals(EventTypes.STEPFUNCTIONS):
656-
context = extract_context_from_step_functions(event, lambda_context)
657-
else:
658-
context = extract_context_from_lambda_context(lambda_context)
756+
elif isinstance(event, (set, dict)) and "DurableExecutionArn" in event:
757+
context = extract_context_from_durable_execution(event)
758+
759+
if context is None:
760+
if isinstance(event, (set, dict)) and "request" in event:
761+
context = extract_context_from_request_header_or_context(
762+
event, lambda_context, event_source
763+
)
764+
elif isinstance(event, (set, dict)) and "headers" in event:
765+
context = extract_context_from_http_event_or_context(
766+
event, lambda_context, event_source, decode_authorizer_context
767+
)
768+
elif event_source.equals(EventTypes.SNS) or event_source.equals(EventTypes.SQS):
769+
context = extract_context_from_sqs_or_sns_event_or_context(
770+
event, lambda_context, event_source
771+
)
772+
elif event_source.equals(EventTypes.EVENTBRIDGE):
773+
context = extract_context_from_eventbridge_event(event, lambda_context)
774+
elif event_source.equals(EventTypes.KINESIS):
775+
context = extract_context_from_kinesis_event(event, lambda_context)
776+
elif event_source.equals(EventTypes.STEPFUNCTIONS):
777+
context = extract_context_from_step_functions(event, lambda_context)
778+
else:
779+
context = extract_context_from_lambda_context(lambda_context)
659780

660781
if _is_context_complete(context):
661782
logger.debug("Extracted Datadog trace context from event or context")

tests/test_tracing.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,72 @@ def test_with_w3c_trace_headers(self):
394394
headers, {"headers": headers}
395395
)
396396

397+
@with_trace_propagation_style("datadog")
def test_extracts_durable_trace_context_from_latest_checkpoint_operation_map(self):
    """
    With ``Operations`` given as a map, the context comes from the
    ``_datadog_1`` checkpoint rather than the partial ``_datadog_0`` one.
    """
    complete_headers = {
        TraceHeader.TRACE_ID: "123",
        TraceHeader.PARENT_ID: "321",
        TraceHeader.SAMPLING_PRIORITY: "1",
    }
    # Execution entry whose input payload carries no trace headers.
    execution_operation = {
        "Type": "EXECUTION",
        "ExecutionDetails": {"InputPayload": {"order_id": "ORD-300"}},
    }
    # Older checkpoint holding only a trace id.
    partial_checkpoint = {
        "Name": "_datadog_0",
        "StepDetails": {"Result": {TraceHeader.TRACE_ID: "999"}},
    }
    latest_checkpoint = {
        "Name": "_datadog_1",
        "StepDetails": {"Result": complete_headers},
    }
    operations = {
        "0": execution_operation,
        "1": partial_checkpoint,
        "2": latest_checkpoint,
    }
    event = {
        "DurableExecutionArn": "arn:aws:lambda:us-east-2:123456789012:function:demo:1/durable-execution/demo/abc",
        "CheckpointToken": "token",
        "InitialExecutionState": {"Operations": operations},
    }

    ctx, source, _ = extract_dd_trace_context(event, get_mock_context())

    self.assertEqual(source, "event")
    expected = Context(trace_id=123, span_id=321, sampling_priority=1)
    self.assertEqual(ctx, expected)
433+
434+
@with_trace_propagation_style("datadog")
def test_extracts_durable_trace_context_from_input_payload_when_no_checkpoint(self):
    """
    With no checkpoint operations, the context is read from the ``headers``
    of the JSON-encoded input payload on the first operation.
    """
    upstream_headers = {
        TraceHeader.TRACE_ID: "777",
        TraceHeader.PARENT_ID: "888",
        TraceHeader.SAMPLING_PRIORITY: "1",
    }
    # The payload is supplied as a JSON string, not a dict.
    encoded_payload = json.dumps({"headers": upstream_headers})
    execution_operation = {
        "Type": "EXECUTION",
        "ExecutionDetails": {"InputPayload": encoded_payload},
    }
    event = {
        "DurableExecutionArn": "arn:aws:lambda:us-east-2:123456789012:function:demo:1/durable-execution/demo/first",
        "CheckpointToken": "token",
        "InitialExecutionState": {"Operations": {"0": execution_operation}},
    }

    ctx, source, _ = extract_dd_trace_context(event, get_mock_context())

    self.assertEqual(source, "event")
    expected = Context(trace_id=777, span_id=888, sampling_priority=1)
    self.assertEqual(ctx, expected)
462+
397463
@with_trace_propagation_style("datadog")
398464
def test_with_extractor_function(self):
399465
def extractor_foo(event, context):

0 commit comments

Comments
 (0)