Skip to content

Commit 1267a24

Browse files
joeyzhao2018claude
andcommitted
feat(durable): extract and resume trace context across invocations
Introduce a two-tier extraction priority for AWS Lambda durable execution events in ``datadog_lambda.tracing.extract_context_from_durable_execution``: 1. Highest-numbered ``_datadog_{N}`` STEP checkpoint (written by the ``aws_durable_execution_sdk_python`` integration on a prior invocation). 2. Datadog headers found in the event's original ``InputPayload`` — handles upstream-instrumented invokers that embed propagation headers under ``_datadog`` or ``headers``. If neither yields a context the function returns ``None`` and the existing extraction chain continues; on the first invocation of a fresh durable execution the tracer mints a new trace, and subsequent invocations recover that same trace via the priority-1 checkpoint. ``create_durable_execution_root_span`` continues to emit ``aws.durable-execution`` as the root span on the very first invocation only; the span id is left as whatever the tracer mints, since the dd-trace-py integration discovers it via a grandparent walk in the live span tree. The wrapper finishes the durable root span after ``aws.lambda`` so the suspend-path checkpoint write in dd-trace-py captures the fully-resolved trace context. Strips a handful of leftover ``[DD-DURABLE]`` ``print()`` debug lines that would have spammed production Lambda logs. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 55d8234 commit 1267a24

4 files changed

Lines changed: 287 additions & 106 deletions

File tree

datadog_lambda/patch.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,20 @@ def patch_all():
3131

3232
if config.trace_enabled:
3333
patch_all_dd()
34-
# Todo: remove this for PR. This is just a testing helper.
35-
# Call the aws_durable_execution_sdk_python integration's patch()
36-
# directly because PyPI ddtrace's _monkey.py doesn't know about it yet,
37-
# so ddtrace.patch(aws_durable_execution_sdk_python=True) would be a no-op.
34+
# AIDEV-NOTE: Until the aws_durable_execution_sdk_python integration
35+
# ships in a stable ddtrace release, this branch wires it up directly.
36+
# ddtrace.patch(aws_durable_execution_sdk_python=True) becomes a no-op
37+
# against PyPI ddtrace because _monkey.py doesn't know the name yet.
38+
# Remove once the integration is GA.
3839
try:
39-
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import patch as _patch_ade
40+
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (
41+
patch as _patch_ade,
42+
)
43+
4044
_patch_ade()
41-
print("[DD-DURABLE] aws_durable_execution_sdk_python integration patched")
45+
logger.debug("aws_durable_execution_sdk_python integration patched")
4246
except Exception as e:
43-
print(f"[DD-DURABLE] Failed to patch aws_durable_execution_sdk_python: {e}")
47+
logger.debug("Failed to patch aws_durable_execution_sdk_python: %s", e)
4448
else:
4549
_patch_http()
4650
_ensure_patch_requests()

datadog_lambda/tracing.py

Lines changed: 118 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -649,117 +649,143 @@ def is_durable_execution_replay(event):
649649
# The SDK always includes the EXECUTION operation itself (1 operation on first invocation).
650650
# A replay has >1 operations (the EXECUTION + previously completed operations).
651651
# This aligns with the SDK's ReplayStatus logic in execution.py.
652-
is_replay = len(operations) > 1
652+
return len(operations) > 1
653653

654-
if is_replay:
655-
print(f"[DD-DURABLE] Detected replay invocation with {len(operations)} existing operations")
656-
else:
657-
print(f"[DD-DURABLE] Detected first invocation ({len(operations)} operations)")
658-
659-
return is_replay
660654

655+
_TRACE_CHECKPOINT_PREFIX = "_datadog_"
661656

662-
_TRACE_CHECKPOINT_PREFIX = "_dd_trace_context_"
663657

658+
def _extract_from_datadog_checkpoint(operations):
659+
"""Priority 1: highest-numbered ``_datadog_{N}`` STEP operation.
664660
665-
def extract_context_from_durable_execution(event, lambda_context):
661+
Returns a Context, or None if no usable checkpoint is present.
666662
"""
667-
Extract Datadog trace context from AWS Lambda Durable Execution event.
663+
candidates = []
664+
for operation in operations:
665+
op_name = operation.get("Name")
666+
if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX):
667+
continue
668+
suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX) :]
669+
try:
670+
number = int(suffix)
671+
except ValueError:
672+
continue
673+
candidates.append((number, operation))
674+
675+
if not candidates:
676+
return None
677+
678+
candidates.sort(key=lambda t: t[0])
679+
_, operation = candidates[-1]
680+
681+
payload_str = (operation.get("StepDetails") or {}).get("Result")
682+
if not payload_str:
683+
return None
668684

669-
Looks for trace-context checkpoints created by the dd-trace-py integration
670-
on previous invocations. Checkpoints are STEP operations named
671-
``_dd_trace_context_{N}`` with trace headers stored as their StepDetails.Result
672-
payload. The one with the highest ``{N}`` wins — it corresponds to the
673-
latest trace-context state from the previous invocation. Customer
674-
operation payloads are never read or modified.
675-
"""
676685
try:
677-
if not isinstance(event, dict):
678-
return None
686+
payload = json.loads(payload_str)
687+
except (ValueError, TypeError):
688+
return None
679689

680-
if "DurableExecutionArn" not in event or "InitialExecutionState" not in event:
681-
return None
690+
if not isinstance(payload, dict):
691+
return None
682692

683-
print("[DD-DURABLE] Detected AWS Lambda Durable Execution event")
693+
context = propagator.extract(payload)
694+
if context and context.trace_id:
695+
return context
696+
return None
684697

685-
initial_state = event.get("InitialExecutionState", {})
686-
operations = initial_state.get("Operations", [])
687698

688-
print(f"[DD-DURABLE] Found {len(operations)} operations in InitialExecutionState")
699+
def _extract_from_input_payload(operations):
700+
"""Priority 2: Datadog headers in the original event's InputPayload.
689701
690-
# Collect all _dd_trace_context_{N} checkpoints, then pick the highest N
691-
candidates = [] # list of (number, operation)
692-
for operation in operations:
693-
op_name = operation.get("Name")
694-
if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX):
695-
continue
696-
suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX):]
697-
try:
698-
number = int(suffix)
699-
except ValueError:
700-
continue
701-
candidates.append((number, operation))
702+
The first operation in ``InitialExecutionState.Operations`` is the EXECUTION
703+
operation; its ``ExecutionDetails.InputPayload`` is the original event the
704+
durable function was invoked with — immutable across replays. If the caller
705+
embedded Datadog headers (typical for API Gateway, direct invoke from
706+
another instrumented service, or a chained durable invoke), use them.
707+
"""
708+
if not operations:
709+
return None
702710

703-
if not candidates:
704-
print("[DD-DURABLE] No trace context checkpoints found in operations")
705-
return None
711+
first_op = operations[0]
712+
payload_str = (first_op.get("ExecutionDetails") or {}).get("InputPayload")
713+
if not payload_str:
714+
return None
706715

707-
candidates.sort(key=lambda t: t[0])
708-
number, operation = candidates[-1]
709-
operation_id = operation.get("Id")
710-
op_name = operation.get("Name")
711-
print(f"[DD-DURABLE] Using latest trace checkpoint: name={op_name}, id={operation_id}")
716+
try:
717+
payload = json.loads(payload_str)
718+
except (ValueError, TypeError):
719+
return None
712720

713-
step_details = operation.get("StepDetails", {})
714-
payload_str = step_details.get("Result")
715-
if not payload_str:
716-
print(f"[DD-DURABLE] Trace checkpoint {op_name} has no Result, skipping")
717-
return None
721+
if not isinstance(payload, dict):
722+
return None
718723

719-
try:
720-
payload = json.loads(payload_str)
721-
except (json.JSONDecodeError, TypeError, ValueError) as e:
722-
print(f"[DD-DURABLE] Failed to parse trace checkpoint payload: {e}")
723-
logger.debug("Failed to parse trace checkpoint payload: %s", e)
724+
# Try the wrapping conventions used by other instrumented sources.
725+
for carrier in (
726+
payload.get("_datadog"),
727+
payload.get("headers"),
728+
payload,
729+
):
730+
if not isinstance(carrier, dict):
731+
continue
732+
context = propagator.extract(carrier)
733+
if context and context.trace_id:
734+
return context
735+
return None
736+
737+
738+
def extract_context_from_durable_execution(event, lambda_context):
739+
"""
740+
Extract Datadog trace context from AWS Lambda Durable Execution event.
741+
742+
Two-tier priority:
743+
744+
1. Highest-numbered ``_datadog_{N}`` STEP checkpoint (set by the
745+
``aws_durable_execution_sdk_python`` integration on a prior invocation).
746+
2. Datadog headers found in the original event's ``InputPayload`` —
747+
carries upstream context when an instrumented service invoked us.
748+
749+
If neither yields a context, returns ``None`` and the caller falls through
750+
to the rest of the extraction chain (Lambda context, X-Ray, etc.). On the
751+
very first invocation of a durable execution with no upstream context, the
752+
tracer will simply mint a fresh trace; subsequent invocations recover that
753+
same trace via the priority-1 checkpoint.
754+
"""
755+
try:
756+
if not isinstance(event, dict):
724757
return None
725758

726-
if not isinstance(payload, dict):
727-
print(f"[DD-DURABLE] Trace checkpoint payload is not a dict: {type(payload)}")
759+
if "DurableExecutionArn" not in event or "InitialExecutionState" not in event:
728760
return None
729761

730-
context = propagator.extract(payload)
731-
if context and context.trace_id:
732-
print(
733-
f"[DD-DURABLE] Extracted trace context from {op_name}: "
734-
f"trace_id={context.trace_id}, span_id={context.span_id}, "
735-
f"headers={list(payload.keys())}"
736-
)
737-
logger.debug(
738-
"Extracted Datadog trace context from trace checkpoint %s: %s",
739-
op_name,
740-
context,
741-
)
742-
return context
762+
operations = event.get("InitialExecutionState", {}).get("Operations", [])
763+
764+
ctx = _extract_from_datadog_checkpoint(operations)
765+
if ctx is not None:
766+
return ctx
767+
768+
return _extract_from_input_payload(operations)
769+
743770
except Exception as e:
744771
logger.debug("Failed to extract trace context from durable execution: %s", e)
745-
746-
return None
772+
return None
747773

748774

749775
def create_durable_execution_root_span(event):
750776
"""
751777
Create the durable execution root span on the FIRST invocation only.
752778
753-
Component 1 & 4 of extracheckpoint trace propagation:
754-
- First invocation (no checkpoint context): creates root span, returns it
755-
- Subsequent invocations (checkpoint context found): returns None
756-
(context already activated by extract_context_from_durable_execution,
757-
no need to recreate root span)
779+
- First invocation (no prior operations): creates the root span and returns it.
780+
The span gets a fresh random ``span_id`` from the tracer; dd-trace-py's
781+
checkpoint writer reads that id back via the live span tree (grandparent
782+
walk) and persists it so subsequent invocations parent off the same root.
783+
- Replay invocations: returns ``None`` — trace context is restored from the
784+
``_datadog_{N}`` checkpoint by ``extract_context_from_durable_execution``.
758785
759-
Returns the root span (caller must call span.finish() when invocation ends),
760-
or None if not a durable execution or if this is a replay.
786+
Returns the root span (caller must call ``span.finish()`` when the invocation
787+
ends), or ``None`` if not a durable execution or if this is a replay.
761788
"""
762-
print(f"[DD-DURABLE] create_durable_execution_root_span called, event type={type(event).__name__}")
763789
try:
764790
if not isinstance(event, dict):
765791
return None
@@ -769,34 +795,30 @@ def create_durable_execution_root_span(event):
769795
if not execution_arn or not has_initial_state:
770796
return None
771797

772-
# Component 4: On replay, context is already activated from checkpoint.
773-
# Don't recreate root span — it was already emitted in a prior invocation.
774798
if is_durable_execution_replay(event):
775-
print("[DD-DURABLE] Replay invocation — skipping root span creation (context from checkpoint)")
776799
return None
777800

778-
# Component 1: First invocation — create new root span
779-
service_name = os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
780-
resource = execution_arn.split(":")[-1] if ":" in execution_arn else execution_arn
801+
service_name = (
802+
os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
803+
)
804+
resource = (
805+
execution_arn.split(":")[-1] if ":" in execution_arn else execution_arn
806+
)
781807

782808
span = tracer.trace(
783809
"aws.durable-execution",
784810
service=service_name,
785811
resource=resource,
786812
span_type="serverless",
787813
)
814+
if span is None:
815+
return None
788816

789-
if span:
790-
span.set_tag("durable.execution_arn", execution_arn)
791-
print(f"[DD-DURABLE] Created root span: trace_id={span.trace_id}, span_id={span.span_id}, resource={resource}")
792-
else:
793-
print("[DD-DURABLE] tracer.trace() returned None")
794-
817+
span.set_tag("durable.execution_arn", execution_arn)
795818
return span
796819

797820
except Exception as e:
798821
logger.debug("Failed to create durable execution root span: %s", e)
799-
print(f"[DD-DURABLE] Failed to create root span: {e}")
800822
return None
801823

802824

@@ -820,7 +842,9 @@ def extract_dd_trace_context(
820842
logger.debug("Extracted Datadog trace context from durable execution")
821843
dd_trace_context = durable_context
822844
trace_context_source = TraceContextSource.EVENT
823-
logger.debug("extracted dd trace context from durable execution: %s", dd_trace_context)
845+
logger.debug(
846+
"extracted dd trace context from durable execution: %s", dd_trace_context
847+
)
824848
return dd_trace_context, trace_context_source, event_source
825849

826850
if extractor is not None:

datadog_lambda/wrapper.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -286,20 +286,31 @@ def _before(self, event, context):
286286

287287
# For durable executions: create root span BEFORE aws.lambda span
288288
# so aws.lambda becomes a child of the root durable execution span.
289-
self.is_durable = isinstance(event, dict) and "DurableExecutionArn" in event
289+
self.is_durable = (
290+
isinstance(event, dict) and "DurableExecutionArn" in event
291+
)
290292
if self.is_durable:
291293
# Set _reactivate on the active context so it persists after
292294
# all spans close. This prevents ddtrace from purging the
293295
# context when the last span (aws.lambda or root) finishes.
296+
# AIDEV-NOTE: _reactivate=True keeps the active Context alive
297+
# after all spans on it close. Without it, ddtrace purges the
298+
# context when the last span (aws.lambda or the durable root)
299+
# finishes, and any later trace-checkpoint write would lose
300+
# the propagation parent. Python-only fix; no JS analog.
294301
active_ctx = tracer.context_provider.active()
295-
if active_ctx and hasattr(active_ctx, '_reactivate'):
302+
if active_ctx and hasattr(active_ctx, "_reactivate"):
296303
active_ctx._reactivate = True
297-
print(f"[DD-DURABLE] Set _reactivate=True on active context")
298304

299305
# For replay: copy _meta from extracted context for _dd.p.* tag propagation
300306
# set_dd_trace_py_root only copies trace_id/span_id/sampling_priority,
301307
# so propagation tags from the checkpoint would be lost without this.
302-
if dd_context and hasattr(dd_context, '_meta') and active_ctx and hasattr(active_ctx, '_meta'):
308+
if (
309+
dd_context
310+
and hasattr(dd_context, "_meta")
311+
and active_ctx
312+
and hasattr(active_ctx, "_meta")
313+
):
303314
for k, v in dd_context._meta.items():
304315
if k not in active_ctx._meta:
305316
active_ctx._meta[k] = v
@@ -397,7 +408,6 @@ def _after(self, event, context):
397408
# integration when the aws.durable_execution.execute span closes.
398409
if self.durable_root_span:
399410
self.durable_root_span.finish()
400-
print(f"[DD-DURABLE] Finished root span: trace_id={self.durable_root_span.trace_id}, span_id={self.durable_root_span.span_id}")
401411
self.durable_root_span = None
402412

403413
if status_code:

0 commit comments

Comments
 (0)