From 564174af3c5c16717fdb1784dad9c02829bfd7b0 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 17 Apr 2026 13:00:21 -0400 Subject: [PATCH 1/4] initial checkin --- datadog_lambda/tracing.py | 184 ++++++++++++++++++++++++++++++++++++++ datadog_lambda/wrapper.py | 78 +++++++++++++++- 2 files changed, 261 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 3c7d9f11..29bc8064 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -621,6 +621,180 @@ def get_injected_authorizer_data(event, is_http_api) -> dict: logger.debug("Failed to check if invocated by an authorizer. error %s", e) +def is_durable_execution_replay(event): + """ + Check if this Lambda invocation is a durable execution replay. + + A replay occurs when there are existing operations in InitialExecutionState, + meaning this invocation is resuming from a previous checkpoint rather than + starting fresh. + + For replay invocations, we should skip creating inferred spans because: + - The trace context is being continued from the checkpoint + - Creating an inferred span would create a duplicate + + Returns: + True if this is a replay invocation (should skip inferred span) + False if this is first invocation or not a durable execution + """ + if not isinstance(event, dict): + return False + + if "DurableExecutionArn" not in event: + return False + + initial_state = event.get("InitialExecutionState", {}) + operations = initial_state.get("Operations", []) + + # The SDK always includes the EXECUTION operation itself (1 operation on first invocation). + # A replay has >1 operations (the EXECUTION + previously completed operations). + # This aligns with the SDK's ReplayStatus logic in execution.py. + is_replay = len(operations) > 1 + + if is_replay: + print(f"[DD-DURABLE] Detected replay invocation with {len(operations)} existing operations") + else: + print(f"[DD-DURABLE] Detected first invocation ({len(operations)} operations)") + + return is_replay + + +def extract_context_from_durable_execution(event, lambda_context): + """ + Extract Datadog trace context from AWS Lambda Durable Execution event. + + Looks for extra trace context checkpoints created by the dd-trace plugin. + These are STEP operations with Name="_dd_trace_context" that store trace + headers in their StepDetails.Result payload. Customer operation payloads + are never read or modified. + + Scans operations in reverse to find the LAST trace checkpoint, which + corresponds to the most recently completed customer operation. This gives + proper parent chaining: each invocation's root span is parented to the + last operation span from the previous invocation. + """ + try: + if not isinstance(event, dict): + return None + + if "DurableExecutionArn" not in event or "InitialExecutionState" not in event: + return None + + print("[DD-DURABLE] Detected AWS Lambda Durable Execution event") + + initial_state = event.get("InitialExecutionState", {}) + operations = initial_state.get("Operations", []) + + print(f"[DD-DURABLE] Found {len(operations)} operations in InitialExecutionState") + + # Scan in reverse to find the LAST trace context checkpoint + # (corresponds to the most recently completed customer operation) + for idx in range(len(operations) - 1, -1, -1): + operation = operations[idx] + op_name = operation.get("Name") + + if op_name != "_dd_trace_context": + continue + + operation_id = operation.get("Id") + print(f"[DD-DURABLE] Found trace checkpoint: id={operation_id}, index={idx}") + + # Trace context is in StepDetails.Result (standard STEP format) + step_details = operation.get("StepDetails", {}) + payload_str = step_details.get("Result") + + if not payload_str: + print(f"[DD-DURABLE] Trace checkpoint {operation_id} has no Result, skipping") + continue + + try: + payload = json.loads(payload_str) + if not isinstance(payload, dict): + print(f"[DD-DURABLE] Trace checkpoint payload is not a dict: {type(payload)}") + continue + + trace_id = payload.get("x-datadog-trace-id") + span_id = payload.get("x-datadog-parent-id") + + if trace_id and span_id: + # Use HTTPPropagator to restore full context including + # baggage, _dd.p.* tags, origin, and sampling priority + context = propagator.extract(payload) + if context and context.trace_id: + print(f"[DD-DURABLE] Extracted trace context from trace checkpoint {operation_id}") + print(f"[DD-DURABLE] trace_id={trace_id}, span_id={span_id}, headers={list(payload.keys())}") + logger.debug( + "Extracted Datadog trace context from trace checkpoint %s: %s", + operation_id, + context, + ) + return context + except (json.JSONDecodeError, TypeError, ValueError) as e: + print(f"[DD-DURABLE] Failed to parse trace checkpoint payload: {e}") + logger.debug("Failed to parse trace checkpoint payload: %s", e) + continue + + print("[DD-DURABLE] No trace context checkpoints found in operations") + except Exception as e: + logger.debug("Failed to extract trace context from durable execution: %s", e) + + return None + + +def create_durable_execution_root_span(event): + """ + Create the durable execution root span on the FIRST invocation only. + + Component 1 & 4 of extracheckpoint trace propagation: + - First invocation (no checkpoint context): creates root span, returns it + - Subsequent invocations (checkpoint context found): returns None + (context already activated by extract_context_from_durable_execution, + no need to recreate root span) + + Returns the root span (caller must call span.finish() when invocation ends), + or None if not a durable execution or if this is a replay. + """ + print(f"[DD-DURABLE] create_durable_execution_root_span called, event type={type(event).__name__}") + try: + if not isinstance(event, dict): + return None + + execution_arn = event.get("DurableExecutionArn") + has_initial_state = "InitialExecutionState" in event + if not execution_arn or not has_initial_state: + return None + + # Component 4: On replay, context is already activated from checkpoint. + # Don't recreate root span — it was already emitted in a prior invocation. + if is_durable_execution_replay(event): + print("[DD-DURABLE] Replay invocation — skipping root span creation (context from checkpoint)") + return None + + # Component 1: First invocation — create new root span + service_name = os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution" + resource = execution_arn.split(":")[-1] if ":" in execution_arn else execution_arn + + span = tracer.trace( + "aws.durable-execution", + service=service_name, + resource=resource, + span_type="serverless", + ) + + if span: + span.set_tag("durable.execution_arn", execution_arn) + print(f"[DD-DURABLE] Created root span: trace_id={span.trace_id}, span_id={span.span_id}, resource={resource}") + else: + print("[DD-DURABLE] tracer.trace() returned None") + + return span + + except Exception as e: + logger.debug("Failed to create durable execution root span: %s", e) + print(f"[DD-DURABLE] Failed to create root span: {e}") + return None + + def extract_dd_trace_context( event, lambda_context, extractor=None, decode_authorizer_context: bool = True ): @@ -634,6 +808,16 @@ def extract_dd_trace_context( trace_context_source = None event_source = parse_event_source(event) + # Check for AWS Lambda Durable Execution events first (before other checks) + # This ensures trace context is properly continued across durable invocations + durable_context = extract_context_from_durable_execution(event, lambda_context) + if _is_context_complete(durable_context): + logger.debug("Extracted Datadog trace context from durable execution") + dd_trace_context = durable_context + trace_context_source = TraceContextSource.EVENT + logger.debug("extracted dd trace context from durable execution: %s", dd_trace_context) + return dd_trace_context, trace_context_source, event_source + if extractor is not None: context = extract_context_custom_extractor(extractor, event, lambda_context) elif isinstance(event, (set, dict)) and "request" in event: diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 767816a5..292fa25c 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -39,6 +39,8 @@ create_inferred_span, InferredSpanInfo, is_authorizer_response, + is_durable_execution_replay, + create_durable_execution_root_span, tracer, propagator, ) @@ -149,6 +151,8 @@ def __init__(self, func): self.inferred_span = None self.response = None self.blocking_response = None + self.durable_root_span = None + self.is_durable = False if config.profiling_enabled and profiler: self.prof = profiler.Profiler(env=config.env, service=config.service) @@ -269,7 +273,9 @@ def _before(self, event, context): if config.trace_enabled: set_dd_trace_py_root(trace_context_source, config.merge_xray_traces) - if config.make_inferred_span: + # Skip inferred span for durable execution replays to avoid duplicates + # For replays, trace context comes from checkpoint, not from event trigger + if config.make_inferred_span and not is_durable_execution_replay(event): self.inferred_span = create_inferred_span( event, context, event_source, config.decode_authorizer_context ) @@ -277,6 +283,40 @@ def _before(self, event, context): if config.appsec_enabled: asm_set_context(event_source) + # For durable executions: create root span BEFORE aws.lambda span + # so aws.lambda becomes a child of the root durable execution span. + self.is_durable = isinstance(event, dict) and "DurableExecutionArn" in event + if self.is_durable: + # Set _reactivate on the active context so it persists after + # all spans close. This prevents ddtrace from purging the + # context when the last span (aws.lambda or root) finishes. + active_ctx = tracer.context_provider.active() + if active_ctx and hasattr(active_ctx, '_reactivate'): + active_ctx._reactivate = True + print(f"[DD-DURABLE] Set _reactivate=True on active context") + + # For replay: copy _meta from extracted context for _dd.p.* tag propagation + # set_dd_trace_py_root only copies trace_id/span_id/sampling_priority, + # so propagation tags from the checkpoint would be lost without this. + if dd_context and hasattr(dd_context, '_meta') and active_ctx and hasattr(active_ctx, '_meta'): + for k, v in dd_context._meta.items(): + if k not in active_ctx._meta: + active_ctx._meta[k] = v + + # Component 1: Create root span (first invocation only) + # Component 4: On replays, returns None (context from checkpoint) + self.durable_root_span = create_durable_execution_root_span(event) + + # Store root span reference for Component 2 (checkpoint save) + if self.durable_root_span: + try: + from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import set_durable_root_span + set_durable_root_span(self.durable_root_span) + except Exception: + pass + + # Create aws.lambda span — child of root durable span (first invocation) + # or child of checkpoint context (replay), or normal parent otherwise self.span = create_function_execution_span( context=context, function_name=config.function_name, @@ -289,6 +329,7 @@ def _before(self, event, context): parent_span=self.inferred_span, span_pointers=calculate_span_pointers(event_source, event), ) + if config.appsec_enabled: asm_start_request(self.span, event, event_source, self.trigger_tags) self.blocking_response = get_asm_blocked_response(self.event_source) @@ -352,6 +393,41 @@ def _after(self, event, context): self.span.finish() + # Component 3: After aws.lambda closes but BEFORE root span closes, + # check if trace context was enriched during this invocation. + # The context is still alive because _reactivate=True on the parent context. + # This is best-effort for the end-of-invocation case; the piggyback in + # _patched_create_checkpoint handles saves during handler execution. + if self.is_durable: + try: + from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import ( + _has_context_changed, save_updated_trace_checkpoint, + _get_current_execution_state, _inject_current_context, + ) + state = _get_current_execution_state() + if state: + before_headers = getattr(state, "_dd_before_trace_headers", None) + if before_headers is not None and _has_context_changed(before_headers): + print("[DD-DURABLE] Context changed at end of invocation, saving updated checkpoint") + state._dd_saving_trace_checkpoint = True + try: + save_updated_trace_checkpoint(state) + finally: + state._dd_saving_trace_checkpoint = False + current = _inject_current_context() + if current: + state._dd_before_trace_headers = current + except Exception as e: + print(f"[DD-DURABLE] Best-effort context change check in _after: {e}") + + # Finish durable execution root span LAST (Component 1) + # Component 2 (root checkpoint) is handled in _patched_create_checkpoint + # piggybacked on the first operation's checkpoint call. + if self.durable_root_span: + self.durable_root_span.finish() + print(f"[DD-DURABLE] Finished root span: trace_id={self.durable_root_span.trace_id}, span_id={self.durable_root_span.span_id}") + self.durable_root_span = None + if status_code: self.trigger_tags["http.status_code"] = status_code mark_trace_as_error_for_5xx_responses(context, status_code, self.span) From 4465fc331580d655225653401754849c27839c0a Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 17 Apr 2026 13:32:27 -0400 Subject: [PATCH 2/4] testing helper --- datadog_lambda/patch.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datadog_lambda/patch.py b/datadog_lambda/patch.py index 6d2af0dc..134e42d8 100644 --- a/datadog_lambda/patch.py +++ b/datadog_lambda/patch.py @@ -31,6 +31,14 @@ def patch_all(): if config.trace_enabled: patch_all_dd() + # Todo: remove this for PR. This is just a testing helper + # Manually patch the durable execution integration since it may not + # be registered in the PyPI ddtrace's _monkey.py yet. + try: + from ddtrace import patch as _patch_dd + _patch_dd(aws_durable_execution_sdk_python=True) + except Exception: + pass else: _patch_http() _ensure_patch_requests() From 55d823494b91015a2c8c6a2e39301e05ee1dec11 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 23 Apr 2026 07:27:50 -0400 Subject: [PATCH 3/4] initial commit --- datadog_lambda/patch.py | 16 ++--- datadog_lambda/tracing.py | 124 ++++++++++++++++++++------------------ datadog_lambda/wrapper.py | 54 ++++------------- 3 files changed, 88 insertions(+), 106 deletions(-) diff --git a/datadog_lambda/patch.py b/datadog_lambda/patch.py index 134e42d8..105a712b 100644 --- a/datadog_lambda/patch.py +++ b/datadog_lambda/patch.py @@ -31,14 +31,16 @@ def patch_all(): if config.trace_enabled: patch_all_dd() - # Todo: remove this for PR. This is just a testing helper - # Manually patch the durable execution integration since it may not - # be registered in the PyPI ddtrace's _monkey.py yet. + # Todo: remove this for PR. This is just a testing helper. + # Call the aws_durable_execution_sdk_python integration's patch() + # directly because PyPI ddtrace's _monkey.py doesn't know about it yet, + # so ddtrace.patch(aws_durable_execution_sdk_python=True) would be a no-op. try: - from ddtrace import patch as _patch_dd - _patch_dd(aws_durable_execution_sdk_python=True) - except Exception: - pass + from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import patch as _patch_ade + _patch_ade() + print("[DD-DURABLE] aws_durable_execution_sdk_python integration patched") + except Exception as e: + print(f"[DD-DURABLE] Failed to patch aws_durable_execution_sdk_python: {e}") else: _patch_http() _ensure_patch_requests() diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 29bc8064..001495ef 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -659,19 +659,19 @@ def is_durable_execution_replay(event): return is_replay +_TRACE_CHECKPOINT_PREFIX = "_dd_trace_context_" + + def extract_context_from_durable_execution(event, lambda_context): """ Extract Datadog trace context from AWS Lambda Durable Execution event. - Looks for extra trace context checkpoints created by the dd-trace plugin. - These are STEP operations with Name="_dd_trace_context" that store trace - headers in their StepDetails.Result payload. Customer operation payloads - are never read or modified. - - Scans operations in reverse to find the LAST trace checkpoint, which - corresponds to the most recently completed customer operation. This gives - proper parent chaining: each invocation's root span is parented to the - last operation span from the previous invocation. + Looks for trace-context checkpoints created by the dd-trace-py integration + on previous invocations. Checkpoints are STEP operations named + ``_dd_trace_context_{N}`` with trace headers stored as their StepDetails.Result + payload. The one with the highest ``{N}`` wins — it corresponds to the + latest trace-context state from the previous invocation. Customer + operation payloads are never read or modified. """ try: if not isinstance(event, dict): @@ -687,54 +687,59 @@ def extract_context_from_durable_execution(event, lambda_context): print(f"[DD-DURABLE] Found {len(operations)} operations in InitialExecutionState") - # Scan in reverse to find the LAST trace context checkpoint - # (corresponds to the most recently completed customer operation) - for idx in range(len(operations) - 1, -1, -1): - operation = operations[idx] + # Collect all _dd_trace_context_{N} checkpoints, then pick the highest N + candidates = [] # list of (number, operation) + for operation in operations: op_name = operation.get("Name") - - if op_name != "_dd_trace_context": + if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX): continue + suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX):] + try: + number = int(suffix) + except ValueError: + continue + candidates.append((number, operation)) - operation_id = operation.get("Id") - print(f"[DD-DURABLE] Found trace checkpoint: id={operation_id}, index={idx}") + if not candidates: + print("[DD-DURABLE] No trace context checkpoints found in operations") + return None - # Trace context is in StepDetails.Result (standard STEP format) - step_details = operation.get("StepDetails", {}) - payload_str = step_details.get("Result") + candidates.sort(key=lambda t: t[0]) + number, operation = candidates[-1] + operation_id = operation.get("Id") + op_name = operation.get("Name") + print(f"[DD-DURABLE] Using latest trace checkpoint: name={op_name}, id={operation_id}") - if not payload_str: - print(f"[DD-DURABLE] Trace checkpoint {operation_id} has no Result, skipping") - continue + step_details = operation.get("StepDetails", {}) + payload_str = step_details.get("Result") + if not payload_str: + print(f"[DD-DURABLE] Trace checkpoint {op_name} has no Result, skipping") + return None - try: - payload = json.loads(payload_str) - if not isinstance(payload, dict): - print(f"[DD-DURABLE] Trace checkpoint payload is not a dict: {type(payload)}") - continue - - trace_id = payload.get("x-datadog-trace-id") - span_id = payload.get("x-datadog-parent-id") - - if trace_id and span_id: - # Use HTTPPropagator to restore full context including - # baggage, _dd.p.* tags, origin, and sampling priority - context = propagator.extract(payload) - if context and context.trace_id: - print(f"[DD-DURABLE] Extracted trace context from trace checkpoint {operation_id}") - print(f"[DD-DURABLE] trace_id={trace_id}, span_id={span_id}, headers={list(payload.keys())}") - logger.debug( - "Extracted Datadog trace context from trace checkpoint %s: %s", - operation_id, - context, - ) - return context - except (json.JSONDecodeError, TypeError, ValueError) as e: - print(f"[DD-DURABLE] Failed to parse trace checkpoint payload: {e}") - logger.debug("Failed to parse trace checkpoint payload: %s", e) - continue + try: + payload = json.loads(payload_str) + except (json.JSONDecodeError, TypeError, ValueError) as e: + print(f"[DD-DURABLE] Failed to parse trace checkpoint payload: {e}") + logger.debug("Failed to parse trace checkpoint payload: %s", e) + return None + + if not isinstance(payload, dict): + print(f"[DD-DURABLE] Trace checkpoint payload is not a dict: {type(payload)}") + return None - print("[DD-DURABLE] No trace context checkpoints found in operations") + context = propagator.extract(payload) + if context and context.trace_id: + print( + f"[DD-DURABLE] Extracted trace context from {op_name}: " + f"trace_id={context.trace_id}, span_id={context.span_id}, " + f"headers={list(payload.keys())}" + ) + logger.debug( + "Extracted Datadog trace context from trace checkpoint %s: %s", + op_name, + context, + ) + return context except Exception as e: logger.debug("Failed to extract trace context from durable execution: %s", e) @@ -1161,12 +1166,16 @@ def process_injected_data(event, request_time_epoch_ms, args, tags): start_time_ns = int( injected_authorizer_data.get(Headers.Parent_Span_Finish_Time) ) - integration_latency = int( - event["requestContext"]["authorizer"].get("integrationLatency", 0) - ) - finish_time_ns = max( - start_time_ns, (request_time_epoch_ms + integration_latency) * 1e6 - ) + finish_time_ns = ( + request_time_epoch_ms + + ( + int( + event["requestContext"]["authorizer"].get( + "integrationLatency", 0 + ) + ) + ) + ) * 1e6 upstream_authorizer_span = insert_upstream_authorizer_span( args, tags, start_time_ns, finish_time_ns ) @@ -1629,9 +1638,9 @@ def create_function_execution_span( trace_context_source, merge_xray_traces, trigger_tags, - durable_function_tags=None, parent_span=None, span_pointers=None, + durable_function_tags=None, ): tags = None if context: @@ -1640,7 +1649,6 @@ def create_function_execution_span( function_arn = ":".join(tk[0:7]) if len(tk) > 7 else function_arn function_version = tk[7] if len(tk) > 7 else "$LATEST" tags = { - "span.kind": "server", "cold_start": str(is_cold_start).lower(), "function_arn": function_arn, "function_version": function_version, diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 292fa25c..718473e6 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -153,6 +153,7 @@ def __init__(self, func): self.blocking_response = None self.durable_root_span = None self.is_durable = False + self.durable_status = None if config.profiling_enabled and profiler: self.prof = profiler.Profiler(env=config.env, service=config.service) @@ -307,14 +308,6 @@ def _before(self, event, context): # Component 4: On replays, returns None (context from checkpoint) self.durable_root_span = create_durable_execution_root_span(event) - # Store root span reference for Component 2 (checkpoint save) - if self.durable_root_span: - try: - from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import set_durable_root_span - set_durable_root_span(self.durable_root_span) - except Exception: - pass - # Create aws.lambda span — child of root durable span (first invocation) # or child of checkpoint context (replay), or normal parent otherwise self.span = create_function_execution_span( @@ -333,6 +326,7 @@ def _before(self, event, context): if config.appsec_enabled: asm_start_request(self.span, event, event_source, self.trigger_tags) self.blocking_response = get_asm_blocked_response(self.event_source) + else: set_correlation_ids() if config.profiling_enabled and profiler and is_new_sandbox(): @@ -359,6 +353,11 @@ def _after(self, event, context): if should_trace_cold_start: trace_ctx = tracer.current_trace_context() + if self.is_durable: + self.durable_status = extract_durable_execution_status( + self.response, event + ) + if self.span: if config.appsec_enabled and not self.blocking_response: asm_start_response( @@ -384,45 +383,18 @@ def _after(self, event, context): if status_code: self.span.set_tag("http.status_code", status_code) - durable_status = extract_durable_execution_status(self.response, event) - if durable_status: + if self.durable_status: self.span.set_tag( "aws_lambda.durable_function.execution_status", - durable_status, + self.durable_status, ) self.span.finish() - # Component 3: After aws.lambda closes but BEFORE root span closes, - # check if trace context was enriched during this invocation. - # The context is still alive because _reactivate=True on the parent context. - # This is best-effort for the end-of-invocation case; the piggyback in - # _patched_create_checkpoint handles saves during handler execution. - if self.is_durable: - try: - from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import ( - _has_context_changed, save_updated_trace_checkpoint, - _get_current_execution_state, _inject_current_context, - ) - state = _get_current_execution_state() - if state: - before_headers = getattr(state, "_dd_before_trace_headers", None) - if before_headers is not None and _has_context_changed(before_headers): - print("[DD-DURABLE] Context changed at end of invocation, saving updated checkpoint") - state._dd_saving_trace_checkpoint = True - try: - save_updated_trace_checkpoint(state) - finally: - state._dd_saving_trace_checkpoint = False - current = _inject_current_context() - if current: - state._dd_before_trace_headers = current - except Exception as e: - print(f"[DD-DURABLE] Best-effort context change check in _after: {e}") - - # Finish durable execution root span LAST (Component 1) - # Component 2 (root checkpoint) is handled in _patched_create_checkpoint - # piggybacked on the first operation's checkpoint call. + # Finish durable execution root span LAST, after aws.lambda. + # The trace-context checkpoint (for cross-invocation continuity) + # is saved by dd-trace-py's aws_durable_execution_sdk_python + # integration when the aws.durable_execution.execute span closes. if self.durable_root_span: self.durable_root_span.finish() print(f"[DD-DURABLE] Finished root span: trace_id={self.durable_root_span.trace_id}, span_id={self.durable_root_span.span_id}") From 1267a241f3e1f26af09bddbe6e2b5be20452c45d Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 00:29:36 -0400 Subject: [PATCH 4/4] feat(durable): extract and resume trace context across invocations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a two-tier extraction priority for AWS Lambda durable execution events in ``datadog_lambda.tracing.extract_context_from_durable_execution``: 1. Highest-numbered ``_datadog_{N}`` STEP checkpoint (written by the ``aws_durable_execution_sdk_python`` integration on a prior invocation). 2. Datadog headers found in the event's original ``InputPayload`` — handles upstream-instrumented invokers that embed propagation headers under ``_datadog`` or ``headers``. If neither yields a context the function returns ``None`` and the existing extraction chain continues; on the first invocation of a fresh durable execution the tracer mints a new trace, and subsequent invocations recover that same trace via the priority-1 checkpoint. ``create_durable_execution_root_span`` continues to emit ``aws.durable-execution`` as the root span on the very first invocation only; the span id is left as whatever the tracer mints, since the dd-trace-py integration discovers it via a grandparent walk in the live span tree. The wrapper finishes the durable root span after ``aws.lambda`` so the suspend-path checkpoint write in dd-trace-py captures the fully-resolved trace context. Strips a handful of leftover ``[DD-DURABLE]`` ``print()`` debug lines that would have spammed production Lambda logs. Co-Authored-By: Claude Opus 4.7 --- datadog_lambda/patch.py | 18 ++-- datadog_lambda/tracing.py | 212 +++++++++++++++++++++----------------- datadog_lambda/wrapper.py | 20 +++- tests/test_durable.py | 143 +++++++++++++++++++++++++ 4 files changed, 287 insertions(+), 106 deletions(-) diff --git a/datadog_lambda/patch.py b/datadog_lambda/patch.py index 105a712b..879f2bc3 100644 --- a/datadog_lambda/patch.py +++ b/datadog_lambda/patch.py @@ -31,16 +31,20 @@ def patch_all(): if config.trace_enabled: patch_all_dd() - # Todo: remove this for PR. This is just a testing helper. - # Call the aws_durable_execution_sdk_python integration's patch() - # directly because PyPI ddtrace's _monkey.py doesn't know about it yet, - # so ddtrace.patch(aws_durable_execution_sdk_python=True) would be a no-op. + # AIDEV-NOTE: Until the aws_durable_execution_sdk_python integration + # ships in a stable ddtrace release, this branch wires it up directly. + # ddtrace.patch(aws_durable_execution_sdk_python=True) becomes a no-op + # against PyPI ddtrace because _monkey.py doesn't know the name yet. + # Remove once the integration is GA. try: - from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import patch as _patch_ade + from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import ( + patch as _patch_ade, + ) + _patch_ade() - print("[DD-DURABLE] aws_durable_execution_sdk_python integration patched") + logger.debug("aws_durable_execution_sdk_python integration patched") except Exception as e: - print(f"[DD-DURABLE] Failed to patch aws_durable_execution_sdk_python: {e}") + logger.debug("Failed to patch aws_durable_execution_sdk_python: %s", e) else: _patch_http() _ensure_patch_requests() diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 001495ef..871aea69 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -649,117 +649,143 @@ def is_durable_execution_replay(event): # The SDK always includes the EXECUTION operation itself (1 operation on first invocation). # A replay has >1 operations (the EXECUTION + previously completed operations). # This aligns with the SDK's ReplayStatus logic in execution.py. - is_replay = len(operations) > 1 + return len(operations) > 1 - if is_replay: - print(f"[DD-DURABLE] Detected replay invocation with {len(operations)} existing operations") - else: - print(f"[DD-DURABLE] Detected first invocation ({len(operations)} operations)") - - return is_replay +_TRACE_CHECKPOINT_PREFIX = "_datadog_" -_TRACE_CHECKPOINT_PREFIX = "_dd_trace_context_" +def _extract_from_datadog_checkpoint(operations): + """Priority 1: highest-numbered ``_datadog_{N}`` STEP operation. -def extract_context_from_durable_execution(event, lambda_context): + Returns a Context, or None if no usable checkpoint is present. """ - Extract Datadog trace context from AWS Lambda Durable Execution event. + candidates = [] + for operation in operations: + op_name = operation.get("Name") + if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX): + continue + suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX) :] + try: + number = int(suffix) + except ValueError: + continue + candidates.append((number, operation)) + + if not candidates: + return None + + candidates.sort(key=lambda t: t[0]) + _, operation = candidates[-1] + + payload_str = (operation.get("StepDetails") or {}).get("Result") + if not payload_str: + return None - Looks for trace-context checkpoints created by the dd-trace-py integration - on previous invocations. Checkpoints are STEP operations named - ``_dd_trace_context_{N}`` with trace headers stored as their StepDetails.Result - payload. The one with the highest ``{N}`` wins — it corresponds to the - latest trace-context state from the previous invocation. Customer - operation payloads are never read or modified. - """ try: - if not isinstance(event, dict): - return None + payload = json.loads(payload_str) + except (ValueError, TypeError): + return None - if "DurableExecutionArn" not in event or "InitialExecutionState" not in event: - return None + if not isinstance(payload, dict): + return None - print("[DD-DURABLE] Detected AWS Lambda Durable Execution event") + context = propagator.extract(payload) + if context and context.trace_id: + return context + return None - initial_state = event.get("InitialExecutionState", {}) - operations = initial_state.get("Operations", []) - print(f"[DD-DURABLE] Found {len(operations)} operations in InitialExecutionState") +def _extract_from_input_payload(operations): + """Priority 2: Datadog headers in the original event's InputPayload. - # Collect all _dd_trace_context_{N} checkpoints, then pick the highest N - candidates = [] # list of (number, operation) - for operation in operations: - op_name = operation.get("Name") - if not op_name or not op_name.startswith(_TRACE_CHECKPOINT_PREFIX): - continue - suffix = op_name[len(_TRACE_CHECKPOINT_PREFIX):] - try: - number = int(suffix) - except ValueError: - continue - candidates.append((number, operation)) + The first operation in ``InitialExecutionState.Operations`` is the EXECUTION + operation; its ``ExecutionDetails.InputPayload`` is the original event the + durable function was invoked with — immutable across replays. If the caller + embedded Datadog headers (typical for API Gateway, direct invoke from + another instrumented service, or a chained durable invoke), use them. + """ + if not operations: + return None - if not candidates: - print("[DD-DURABLE] No trace context checkpoints found in operations") - return None + first_op = operations[0] + payload_str = (first_op.get("ExecutionDetails") or {}).get("InputPayload") + if not payload_str: + return None - candidates.sort(key=lambda t: t[0]) - number, operation = candidates[-1] - operation_id = operation.get("Id") - op_name = operation.get("Name") - print(f"[DD-DURABLE] Using latest trace checkpoint: name={op_name}, id={operation_id}") + try: + payload = json.loads(payload_str) + except (ValueError, TypeError): + return None - step_details = operation.get("StepDetails", {}) - payload_str = step_details.get("Result") - if not payload_str: - print(f"[DD-DURABLE] Trace checkpoint {op_name} has no Result, skipping") - return None + if not isinstance(payload, dict): + return None - try: - payload = json.loads(payload_str) - except (json.JSONDecodeError, TypeError, ValueError) as e: - print(f"[DD-DURABLE] Failed to parse trace checkpoint payload: {e}") - logger.debug("Failed to parse trace checkpoint payload: %s", e) + # Try the wrapping conventions used by other instrumented sources. + for carrier in ( + payload.get("_datadog"), + payload.get("headers"), + payload, + ): + if not isinstance(carrier, dict): + continue + context = propagator.extract(carrier) + if context and context.trace_id: + return context + return None + + +def extract_context_from_durable_execution(event, lambda_context): + """ + Extract Datadog trace context from AWS Lambda Durable Execution event. + + Two-tier priority: + + 1. Highest-numbered ``_datadog_{N}`` STEP checkpoint (set by the + ``aws_durable_execution_sdk_python`` integration on a prior invocation). + 2. Datadog headers found in the original event's ``InputPayload`` — + carries upstream context when an instrumented service invoked us. + + If neither yields a context, returns ``None`` and the caller falls through + to the rest of the extraction chain (Lambda context, X-Ray, etc.). On the + very first invocation of a durable execution with no upstream context, the + tracer will simply mint a fresh trace; subsequent invocations recover that + same trace via the priority-1 checkpoint. + """ + try: + if not isinstance(event, dict): return None - if not isinstance(payload, dict): - print(f"[DD-DURABLE] Trace checkpoint payload is not a dict: {type(payload)}") + if "DurableExecutionArn" not in event or "InitialExecutionState" not in event: return None - context = propagator.extract(payload) - if context and context.trace_id: - print( - f"[DD-DURABLE] Extracted trace context from {op_name}: " - f"trace_id={context.trace_id}, span_id={context.span_id}, " - f"headers={list(payload.keys())}" - ) - logger.debug( - "Extracted Datadog trace context from trace checkpoint %s: %s", - op_name, - context, - ) - return context + operations = event.get("InitialExecutionState", {}).get("Operations", []) + + ctx = _extract_from_datadog_checkpoint(operations) + if ctx is not None: + return ctx + + return _extract_from_input_payload(operations) + except Exception as e: logger.debug("Failed to extract trace context from durable execution: %s", e) - - return None + return None def create_durable_execution_root_span(event): """ Create the durable execution root span on the FIRST invocation only. - Component 1 & 4 of extracheckpoint trace propagation: - - First invocation (no checkpoint context): creates root span, returns it - - Subsequent invocations (checkpoint context found): returns None - (context already activated by extract_context_from_durable_execution, - no need to recreate root span) + - First invocation (no prior operations): creates the root span and returns it. + The span gets a fresh random ``span_id`` from the tracer; dd-trace-py's + checkpoint writer reads that id back via the live span tree (grandparent + walk) and persists it so subsequent invocations parent off the same root. + - Replay invocations: returns ``None`` — trace context is restored from the + ``_datadog_{N}`` checkpoint by ``extract_context_from_durable_execution``. - Returns the root span (caller must call span.finish() when invocation ends), - or None if not a durable execution or if this is a replay. + Returns the root span (caller must call ``span.finish()`` when the invocation + ends), or ``None`` if not a durable execution or if this is a replay. """ - print(f"[DD-DURABLE] create_durable_execution_root_span called, event type={type(event).__name__}") try: if not isinstance(event, dict): return None @@ -769,15 +795,15 @@ def create_durable_execution_root_span(event): if not execution_arn or not has_initial_state: return None - # Component 4: On replay, context is already activated from checkpoint. - # Don't recreate root span — it was already emitted in a prior invocation. if is_durable_execution_replay(event): - print("[DD-DURABLE] Replay invocation — skipping root span creation (context from checkpoint)") return None - # Component 1: First invocation — create new root span - service_name = os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution" - resource = execution_arn.split(":")[-1] if ":" in execution_arn else execution_arn + service_name = ( + os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution" + ) + resource = ( + execution_arn.split(":")[-1] if ":" in execution_arn else execution_arn + ) span = tracer.trace( "aws.durable-execution", @@ -785,18 +811,14 @@ def create_durable_execution_root_span(event): resource=resource, span_type="serverless", ) + if span is None: + return None - if span: - span.set_tag("durable.execution_arn", execution_arn) - print(f"[DD-DURABLE] Created root span: trace_id={span.trace_id}, span_id={span.span_id}, resource={resource}") - else: - print("[DD-DURABLE] tracer.trace() returned None") - + span.set_tag("durable.execution_arn", execution_arn) return span except Exception as e: logger.debug("Failed to create durable execution root span: %s", e) - print(f"[DD-DURABLE] Failed to create root span: {e}") return None @@ -820,7 +842,9 @@ def extract_dd_trace_context( logger.debug("Extracted Datadog trace context from durable execution") dd_trace_context = durable_context trace_context_source = TraceContextSource.EVENT - logger.debug("extracted dd trace context from durable execution: %s", dd_trace_context) + logger.debug( + "extracted dd trace context from durable execution: %s", dd_trace_context + ) return dd_trace_context, trace_context_source, event_source if extractor is not None: diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py index 718473e6..83df4163 100644 --- a/datadog_lambda/wrapper.py +++ b/datadog_lambda/wrapper.py @@ -286,20 +286,31 @@ def _before(self, event, context): # For durable executions: create root span BEFORE aws.lambda span # so aws.lambda becomes a child of the root durable execution span. - self.is_durable = isinstance(event, dict) and "DurableExecutionArn" in event + self.is_durable = ( + isinstance(event, dict) and "DurableExecutionArn" in event + ) if self.is_durable: # Set _reactivate on the active context so it persists after # all spans close. This prevents ddtrace from purging the # context when the last span (aws.lambda or root) finishes. + # AIDEV-NOTE: _reactivate=True keeps the active Context alive + # after all spans on it close. Without it, ddtrace purges the + # context when the last span (aws.lambda or the durable root) + # finishes, and any later trace-checkpoint write would lose + # the propagation parent. Python-only fix; no JS analog. active_ctx = tracer.context_provider.active() - if active_ctx and hasattr(active_ctx, '_reactivate'): + if active_ctx and hasattr(active_ctx, "_reactivate"): active_ctx._reactivate = True - print(f"[DD-DURABLE] Set _reactivate=True on active context") # For replay: copy _meta from extracted context for _dd.p.* tag propagation # set_dd_trace_py_root only copies trace_id/span_id/sampling_priority, # so propagation tags from the checkpoint would be lost without this. - if dd_context and hasattr(dd_context, '_meta') and active_ctx and hasattr(active_ctx, '_meta'): + if ( + dd_context + and hasattr(dd_context, "_meta") + and active_ctx + and hasattr(active_ctx, "_meta") + ): for k, v in dd_context._meta.items(): if k not in active_ctx._meta: active_ctx._meta[k] = v @@ -397,7 +408,6 @@ def _after(self, event, context): # integration when the aws.durable_execution.execute span closes. if self.durable_root_span: self.durable_root_span.finish() - print(f"[DD-DURABLE] Finished root span: trace_id={self.durable_root_span.trace_id}, span_id={self.durable_root_span.span_id}") self.durable_root_span = None if status_code: diff --git a/tests/test_durable.py b/tests/test_durable.py index 36a3e8c5..d06a3cc0 100644 --- a/tests/test_durable.py +++ b/tests/test_durable.py @@ -176,3 +176,146 @@ def test_returns_none_for_none_response(self): "DurableExecutionArn": "arn:aws:lambda:us-east-1:123:function:f:1/durable-execution/n/id" } self.assertIsNone(extract_durable_execution_status(None, event)) + + +import json + +from datadog_lambda.tracing import ( + create_durable_execution_root_span, + extract_context_from_durable_execution, + is_durable_execution_replay, +) + + +_TEST_ARN = "arn:aws:lambda:us-east-2:1:function:f:1" "/durable-execution/wf/abc-123" + + +def _event(operations): + return { + "DurableExecutionArn": _TEST_ARN, + "InitialExecutionState": {"Operations": operations}, + } + + +def _execution_op(input_payload=None): + op = {"OperationType": "EXECUTION", "Name": "execution"} + if input_payload is not None: + op["ExecutionDetails"] = {"InputPayload": input_payload} + return op + + +def _trace_checkpoint_op(n, headers): + return { + "OperationType": "STEP", + "Id": f"id-{n}", + "Name": f"_datadog_{n}", + "StepDetails": {"Result": json.dumps(headers)}, + } + + +class TestExtractContextPriorityOne(unittest.TestCase): + """Highest-numbered ``_datadog_{N}`` STEP wins.""" + + def test_returns_context_from_latest_checkpoint(self): + ev = _event( + [ + _execution_op(), + _trace_checkpoint_op( + 0, + { + "x-datadog-trace-id": "111", + "x-datadog-parent-id": "222", + "x-datadog-sampling-priority": "1", + }, + ), + _trace_checkpoint_op( + 1, + { + "x-datadog-trace-id": "111", + "x-datadog-parent-id": "333", + "x-datadog-sampling-priority": "1", + }, + ), + ] + ) + ctx = extract_context_from_durable_execution(ev, None) + self.assertEqual(ctx.trace_id, 111) + # Latest checkpoint (N=1) wins. + self.assertEqual(ctx.span_id, 333) + + +class TestExtractContextPriorityTwo(unittest.TestCase): + """When no checkpoint exists, fall back to the original event payload.""" + + def test_extracts_from_input_payload_headers_field(self): + upstream_headers = { + "x-datadog-trace-id": "777", + "x-datadog-parent-id": "888", + "x-datadog-sampling-priority": "1", + } + input_payload = json.dumps({"headers": upstream_headers, "body": "..."}) + ev = _event([_execution_op(input_payload)]) + ctx = extract_context_from_durable_execution(ev, None) + self.assertEqual(ctx.trace_id, 777) + self.assertEqual(ctx.span_id, 888) + + def test_extracts_from_input_payload_underscore_datadog_field(self): + upstream_headers = { + "x-datadog-trace-id": "999", + "x-datadog-parent-id": "111", + "x-datadog-sampling-priority": "1", + } + input_payload = json.dumps({"_datadog": upstream_headers}) + ev = _event([_execution_op(input_payload)]) + ctx = extract_context_from_durable_execution(ev, None) + self.assertEqual(ctx.trace_id, 999) + + +class TestExtractContextReturnsNoneWhenNoUpstream(unittest.TestCase): + """No checkpoint and no upstream headers → return None and let the rest + of the extraction chain run. The tracer mints a fresh trace on the first + invocation; subsequent invocations recover it via the priority-1 checkpoint. + """ + + def test_returns_none_when_only_execution_op(self): + ev = _event([_execution_op()]) + self.assertIsNone(extract_context_from_durable_execution(ev, None)) + + def test_returns_none_when_input_payload_has_no_dd_headers(self): + ev = _event([_execution_op(json.dumps({"some": "user-event"}))]) + self.assertIsNone(extract_context_from_durable_execution(ev, None)) + + +class TestIsDurableExecutionReplay(unittest.TestCase): + def test_first_invocation_is_not_replay(self): + self.assertFalse(is_durable_execution_replay(_event([_execution_op()]))) + + def test_second_invocation_is_replay(self): + ev = _event([_execution_op(), _trace_checkpoint_op(0, {})]) + self.assertTrue(is_durable_execution_replay(ev)) + + def test_non_durable_event_is_not_replay(self): + self.assertFalse(is_durable_execution_replay({"body": "..."})) + + +class TestCreateDurableExecutionRootSpan(unittest.TestCase): + def test_returns_none_on_replay(self): + ev = _event([_execution_op(), _trace_checkpoint_op(0, {})]) + self.assertIsNone(create_durable_execution_root_span(ev)) + + def test_returns_none_for_non_durable_event(self): + self.assertIsNone(create_durable_execution_root_span({"body": "..."})) + + def test_first_invocation_returns_a_span(self): + ev = _event([_execution_op()]) + span = create_durable_execution_root_span(ev) + try: + self.assertIsNotNone(span) + # The span_id is whatever the tracer minted; dd-trace-py reads it + # back via a grandparent walk from the in-process span tree, so we + # don't assert any deterministic relationship to the ARN here. + self.assertGreater(span.span_id, 0) + self.assertEqual(span.get_tag("durable.execution_arn"), _TEST_ARN) + finally: + if span is not None: + span.finish()