From ee40632ba6694aa2ab698fdf7a1dfd250c6545a7 Mon Sep 17 00:00:00 2001
From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com>
Date: Wed, 6 May 2026 15:25:26 -0400
Subject: [PATCH] durable: add first-invocation execution_init inferred span

Create an "aws.durable.execution_init" inferred span when the durable
function tags mark the invocation as the first one
(aws_lambda.durable_function.first_invocation == "true"). The span start
time is taken from the StartTimestamp of the first operation returned by
_durable_operations(event), interpreted as milliseconds and converted to
nanoseconds. Any other durable invocation gets no inferred span.

* durable.py: count Operations only when it is a list or a dict; any
  other value (including a missing key) counts as zero operations.
* tracing.py: add _durable_execution_start_ns() and
  create_inferred_span_from_durable_execution_event(); route
  create_inferred_span() to the latter when durable_function_tags is
  set. The service name comes from DD_DURABLE_EXECUTION_SERVICE and
  defaults to "aws.durable-execution". A StartTimestamp that cannot be
  converted to an integer, including non-finite values that make int()
  raise OverflowError, yields no span instead of an exception.
* wrapper.py: pass self.durable_function_tags to create_inferred_span().
---
Notes (not part of the commit message):
Line breaks and indentation were reconstructed from a whitespace-collapsed
copy of this patch. Indentation of unchanged context lines is inferred; if
a hunk does not match, apply with `git apply --ignore-whitespace`.

 datadog_lambda/durable.py |  7 ++-
 datadog_lambda/tracing.py | 81 ++++++++++++++++++++++++++++++++++
 datadog_lambda/wrapper.py |  6 ++-
 tests/test_durable.py     | 21 +++++++++
 tests/test_tracing.py     | 92 +++++++++++++++++++++++++++++++++++++++
 5 files changed, 204 insertions(+), 3 deletions(-)

diff --git a/datadog_lambda/durable.py b/datadog_lambda/durable.py
index 9a28b36b..321b7052 100644
--- a/datadog_lambda/durable.py
+++ b/datadog_lambda/durable.py
@@ -44,8 +44,11 @@ def extract_durable_function_tags(event):
     execution_name, execution_id = parsed
     # Use the number of operations to determine if it's the first invocation. This is
     # what the durable execution SDK does to determine the replay status.
-    operations = event.get("InitialExecutionState", {}).get("Operations", [])
-    is_first_invocation = len(operations) == 1
+    operations = event.get("InitialExecutionState", {}).get("Operations")
+    operation_count = (
+        len(operations) if isinstance(operations, (list, dict)) else 0
+    )
+    is_first_invocation = operation_count == 1
     return {
         "aws_lambda.durable_function.execution_name": execution_name,
         "aws_lambda.durable_function.execution_id": execution_id,
diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py
index c9b268db..dafc8519 100644
--- a/datadog_lambda/tracing.py
+++ b/datadog_lambda/tracing.py
@@ -919,12 +919,93 @@ def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
         )
 
 
+def _durable_execution_start_ns(event):
+    operations = _durable_operations(event)
+    if not operations:
+        return None
+
+    first_operation = operations[0]
+    if not isinstance(first_operation, dict):
+        return None
+
+    start_timestamp = first_operation.get("StartTimestamp")
+    if isinstance(start_timestamp, str):
+        start_timestamp = start_timestamp.strip()
+
+    try:
+        start_ms = int(start_timestamp)
+    except (TypeError, ValueError, OverflowError):
+        try:
+            start_ms = int(float(start_timestamp))
+        except (TypeError, ValueError, OverflowError):
+            return None
+
+    return start_ms * 1000000
+
+
+def create_inferred_span_from_durable_execution_event(
+    event, context, durable_function_tags
+):
+    if not durable_function_tags:
+        return None
+    if durable_function_tags.get("aws_lambda.durable_function.first_invocation") != "true":
+        return None
+
+    inferred_span_start_ns = _durable_execution_start_ns(event)
+    if inferred_span_start_ns is None:
+        return None
+
+    service_name = os.environ.get(
+        "DD_DURABLE_EXECUTION_SERVICE", "aws.durable-execution"
+    )
+    execution_name = durable_function_tags.get(
+        "aws_lambda.durable_function.execution_name"
+    )
+    execution_id = durable_function_tags.get("aws_lambda.durable_function.execution_id")
+    durable_execution_arn = event.get("DurableExecutionArn")
+
+    tags = {
+        "operation_name": "aws.durable.execution_init",
+        "resource_names": execution_name,
+        "request_id": context.aws_request_id if context else None,
+        "service": service_name,
+        "service.name": service_name,
+        "span.type": "serverless",
+        "resource.name": execution_name,
+        "span.kind": "server",
+        "durable.execution_arn": durable_execution_arn,
+        "durable.execution_name": execution_name,
+        "durable.execution_id": execution_id,
+    }
+    InferredSpanInfo.set_tags(tags, tag_source="self", synchronicity="async")
+
+    tracer.set_tags(_dd_origin)
+    span = tracer.trace(
+        "aws.durable.execution_init",
+        service=service_name,
+        resource=execution_name,
+        span_type="serverless",
+    )
+    if span:
+        span.set_tags(tags)
+        span.set_metric(InferredSpanInfo.METRIC, 1.0)
+        span.start_ns = inferred_span_start_ns
+    return span
+
+
 def create_inferred_span(
     event,
     context,
     event_source: _EventSource = None,
     decode_authorizer_context: bool = True,
+    durable_function_tags=None,
 ):
+    if durable_function_tags:
+        logger.debug("Durable execution event detected. Inferring a span")
+        return create_inferred_span_from_durable_execution_event(
+            event, context, durable_function_tags
+        )
+
     if event_source is None:
         event_source = parse_event_source(event)
     try:
diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 767816a5..0e2d6668 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -271,7 +271,11 @@ def _before(self, event, context):
                 set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
                 if config.make_inferred_span:
                     self.inferred_span = create_inferred_span(
-                        event, context, event_source, config.decode_authorizer_context
+                        event,
+                        context,
+                        event_source,
+                        config.decode_authorizer_context,
+                        self.durable_function_tags,
                     )
 
                 if config.appsec_enabled:
diff --git a/tests/test_durable.py b/tests/test_durable.py
index 36a3e8c5..3d52e881 100644
--- a/tests/test_durable.py
+++ b/tests/test_durable.py
@@ -85,6 +85,27 @@ def test_sets_first_invocation_false_when_multiple_operations(self):
             },
         )
 
+    def test_sets_first_invocation_false_when_operations_is_a_map(self):
+        event = {
+            "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
+            "CheckpointToken": "some-token",
+            "InitialExecutionState": {
+                "Operations": {
+                    "0": {"Type": "EXECUTION"},
+                    "1": {"Type": "STEP"},
+                }
+            },
+        }
+        result = extract_durable_function_tags(event)
+        self.assertEqual(
+            result,
+            {
+                "aws_lambda.durable_function.execution_name": "my-execution",
+                "aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
+                "aws_lambda.durable_function.first_invocation": "false",
+            },
+        )
+
     def test_returns_empty_dict_for_regular_lambda_event(self):
         event = {
             "body": '{"key": "value"}',
diff --git a/tests/test_tracing.py b/tests/test_tracing.py
index f7945118..cd7ac9df 100644
--- a/tests/test_tracing.py
+++ b/tests/test_tracing.py
@@ -2536,6 +2536,98 @@ def test_authorizer_span_no_negative_duration_when_clock_skew(mock_span_finish):
     )
 
 
+class TestDurableExecutionInferredSpan(unittest.TestCase):
+    def test_creates_execution_init_span_for_first_invocation(self):
+        event = {
+            "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
+            "InitialExecutionState": {
+                "Operations": [{"StartTimestamp": "1778088546775"}]
+            },
+        }
+        durable_function_tags = {
+            "aws_lambda.durable_function.execution_name": "my-execution",
+            "aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
+            "aws_lambda.durable_function.first_invocation": "true",
+        }
+        ctx = get_mock_context(aws_request_id="abc-123")
+
+        with patch.dict(
+            os.environ, {"DD_DURABLE_EXECUTION_SERVICE": "durable-svc"}, clear=False
+        ):
+            span = create_inferred_span(
+                event, ctx, durable_function_tags=durable_function_tags
+            )
+
+        self.assertIsNotNone(span)
+        self.assertEqual(span.name, "aws.durable.execution_init")
+        self.assertEqual(span.service, "durable-svc")
+        self.assertEqual(span.resource, "my-execution")
+        self.assertEqual(span.start_ns, 1778088546775000000)
+        self.assertEqual(
+            span.get_tag("operation_name"), "aws.durable.execution_init"
+        )
+        self.assertEqual(
+            span.get_tag("durable.execution_arn"), event["DurableExecutionArn"]
+        )
+        self.assertEqual(span.get_tag("durable.execution_name"), "my-execution")
+        self.assertEqual(
+            span.get_tag("durable.execution_id"),
+            "550e8400-e29b-41d4-a716-446655440004",
+        )
+        span.finish()
+
+    def test_does_not_create_execution_init_span_for_replay_invocation(self):
+        event = {
+            "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
+            "InitialExecutionState": {
+                "Operations": [{"StartTimestamp": "1778088546775"}]
+            },
+        }
+        durable_function_tags = {
+            "aws_lambda.durable_function.execution_name": "my-execution",
+            "aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
+            "aws_lambda.durable_function.first_invocation": "false",
+        }
+        ctx = get_mock_context()
+
+        span = create_inferred_span(event, ctx, durable_function_tags=durable_function_tags)
+        self.assertIsNone(span)
+
+    def test_parents_lambda_span_to_execution_init_span(self):
+        event = {
+            "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
+            "InitialExecutionState": {
+                "Operations": [{"StartTimestamp": "1778088546775"}]
+            },
+        }
+        durable_function_tags = {
+            "aws_lambda.durable_function.execution_name": "my-execution",
+            "aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
+            "aws_lambda.durable_function.first_invocation": "true",
+        }
+        ctx = get_mock_context()
+
+        inferred_span = create_inferred_span(
+            event, ctx, durable_function_tags=durable_function_tags
+        )
+        lambda_span = create_function_execution_span(
+            context=ctx,
+            function_name="Function",
+            is_cold_start=False,
+            is_proactive_init=False,
+            trace_context_source={"source": ""},
+            merge_xray_traces=False,
+            trigger_tags={},
+            durable_function_tags=durable_function_tags,
+            parent_span=inferred_span,
+            span_pointers=None,
+        )
+
+        self.assertEqual(lambda_span.parent_id, inferred_span.span_id)
+        lambda_span.finish()
+        inferred_span.finish()
+
+
 class TestInferredSpans(unittest.TestCase):
     @patch("datadog_lambda.tracing.submit_errors_metric")
     def test_mark_trace_as_error_for_5xx_responses_getting_400_response_code(