Skip to content

Commit ee40632

Browse files
committed
durable: add first-invocation execution_init inferred span
1 parent 20b0054 commit ee40632

5 files changed

Lines changed: 204 additions & 3 deletions

File tree

datadog_lambda/durable.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ def extract_durable_function_tags(event):
4444
execution_name, execution_id = parsed
4545
# Use the number of operations to determine if it's the first invocation. This is
4646
# what the durable execution SDK does to determine the replay status.
47-
operations = event.get("InitialExecutionState", {}).get("Operations", [])
48-
is_first_invocation = len(operations) == 1
47+
operations = event.get("InitialExecutionState", {}).get("Operations")
48+
operation_count = (
49+
len(operations) if isinstance(operations, (list, dict)) else 0
50+
)
51+
is_first_invocation = operation_count == 1
4952
return {
5053
"aws_lambda.durable_function.execution_name": execution_name,
5154
"aws_lambda.durable_function.execution_id": execution_id,

datadog_lambda/tracing.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -919,12 +919,93 @@ def set_dd_trace_py_root(trace_context_source, merge_xray_traces):
919919
)
920920

921921

922+
def _durable_execution_start_ns(event):
923+
operations = _durable_operations(event)
924+
if not operations:
925+
return None
926+
927+
first_operation = operations[0]
928+
if not isinstance(first_operation, dict):
929+
return None
930+
931+
start_timestamp = first_operation.get("StartTimestamp")
932+
if isinstance(start_timestamp, str):
933+
start_timestamp = start_timestamp.strip()
934+
935+
try:
936+
start_ms = int(start_timestamp)
937+
except (TypeError, ValueError):
938+
try:
939+
start_ms = int(float(start_timestamp))
940+
except (TypeError, ValueError):
941+
return None
942+
943+
return start_ms * 1000000
944+
945+
946+
def create_inferred_span_from_durable_execution_event(
947+
event, context, durable_function_tags
948+
):
949+
if not durable_function_tags:
950+
return None
951+
if durable_function_tags.get("aws_lambda.durable_function.first_invocation") != "true":
952+
return None
953+
954+
inferred_span_start_ns = _durable_execution_start_ns(event)
955+
if inferred_span_start_ns is None:
956+
return None
957+
958+
service_name = os.environ.get(
959+
"DD_DURABLE_EXECUTION_SERVICE", "aws.durable-execution"
960+
)
961+
execution_name = durable_function_tags.get(
962+
"aws_lambda.durable_function.execution_name"
963+
)
964+
execution_id = durable_function_tags.get("aws_lambda.durable_function.execution_id")
965+
durable_execution_arn = event.get("DurableExecutionArn")
966+
967+
tags = {
968+
"operation_name": "aws.durable.execution_init",
969+
"resource_names": execution_name,
970+
"request_id": context.aws_request_id if context else None,
971+
"service": service_name,
972+
"service.name": service_name,
973+
"span.type": "serverless",
974+
"resource.name": execution_name,
975+
"span.kind": "server",
976+
"durable.execution_arn": durable_execution_arn,
977+
"durable.execution_name": execution_name,
978+
"durable.execution_id": execution_id,
979+
}
980+
InferredSpanInfo.set_tags(tags, tag_source="self", synchronicity="async")
981+
982+
tracer.set_tags(_dd_origin)
983+
span = tracer.trace(
984+
"aws.durable.execution_init",
985+
service=service_name,
986+
resource=execution_name,
987+
span_type="serverless",
988+
)
989+
if span:
990+
span.set_tags(tags)
991+
span.set_metric(InferredSpanInfo.METRIC, 1.0)
992+
span.start_ns = inferred_span_start_ns
993+
return span
994+
995+
922996
def create_inferred_span(
923997
event,
924998
context,
925999
event_source: _EventSource = None,
9261000
decode_authorizer_context: bool = True,
1001+
durable_function_tags=None,
9271002
):
1003+
if durable_function_tags:
1004+
logger.debug("Durable execution event detected. Inferring a span")
1005+
return create_inferred_span_from_durable_execution_event(
1006+
event, context, durable_function_tags
1007+
)
1008+
9281009
if event_source is None:
9291010
event_source = parse_event_source(event)
9301011
try:

datadog_lambda/wrapper.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,11 @@ def _before(self, event, context):
271271
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
272272
if config.make_inferred_span:
273273
self.inferred_span = create_inferred_span(
274-
event, context, event_source, config.decode_authorizer_context
274+
event,
275+
context,
276+
event_source,
277+
config.decode_authorizer_context,
278+
self.durable_function_tags,
275279
)
276280

277281
if config.appsec_enabled:

tests/test_durable.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,27 @@ def test_sets_first_invocation_false_when_multiple_operations(self):
8585
},
8686
)
8787

88+
def test_sets_first_invocation_false_when_operations_is_a_map(self):
89+
event = {
90+
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
91+
"CheckpointToken": "some-token",
92+
"InitialExecutionState": {
93+
"Operations": {
94+
"0": {"Type": "EXECUTION"},
95+
"1": {"Type": "STEP"},
96+
}
97+
},
98+
}
99+
result = extract_durable_function_tags(event)
100+
self.assertEqual(
101+
result,
102+
{
103+
"aws_lambda.durable_function.execution_name": "my-execution",
104+
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
105+
"aws_lambda.durable_function.first_invocation": "false",
106+
},
107+
)
108+
88109
def test_returns_empty_dict_for_regular_lambda_event(self):
89110
event = {
90111
"body": '{"key": "value"}',

tests/test_tracing.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2536,6 +2536,98 @@ def test_authorizer_span_no_negative_duration_when_clock_skew(mock_span_finish):
25362536
)
25372537

25382538

2539+
class TestDurableExecutionInferredSpan(unittest.TestCase):
2540+
def test_creates_execution_init_span_for_first_invocation(self):
2541+
event = {
2542+
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
2543+
"InitialExecutionState": {
2544+
"Operations": [{"StartTimestamp": "1778088546775"}]
2545+
},
2546+
}
2547+
durable_function_tags = {
2548+
"aws_lambda.durable_function.execution_name": "my-execution",
2549+
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
2550+
"aws_lambda.durable_function.first_invocation": "true",
2551+
}
2552+
ctx = get_mock_context(aws_request_id="abc-123")
2553+
2554+
with patch.dict(
2555+
os.environ, {"DD_DURABLE_EXECUTION_SERVICE": "durable-svc"}, clear=False
2556+
):
2557+
span = create_inferred_span(
2558+
event, ctx, durable_function_tags=durable_function_tags
2559+
)
2560+
2561+
self.assertIsNotNone(span)
2562+
self.assertEqual(span.name, "aws.durable.execution_init")
2563+
self.assertEqual(span.service, "durable-svc")
2564+
self.assertEqual(span.resource, "my-execution")
2565+
self.assertEqual(span.start_ns, 1778088546775000000)
2566+
self.assertEqual(
2567+
span.get_tag("operation_name"), "aws.durable.execution_init"
2568+
)
2569+
self.assertEqual(
2570+
span.get_tag("durable.execution_arn"), event["DurableExecutionArn"]
2571+
)
2572+
self.assertEqual(span.get_tag("durable.execution_name"), "my-execution")
2573+
self.assertEqual(
2574+
span.get_tag("durable.execution_id"),
2575+
"550e8400-e29b-41d4-a716-446655440004",
2576+
)
2577+
span.finish()
2578+
2579+
def test_does_not_create_execution_init_span_for_replay_invocation(self):
2580+
event = {
2581+
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
2582+
"InitialExecutionState": {
2583+
"Operations": [{"StartTimestamp": "1778088546775"}]
2584+
},
2585+
}
2586+
durable_function_tags = {
2587+
"aws_lambda.durable_function.execution_name": "my-execution",
2588+
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
2589+
"aws_lambda.durable_function.first_invocation": "false",
2590+
}
2591+
ctx = get_mock_context()
2592+
2593+
span = create_inferred_span(event, ctx, durable_function_tags=durable_function_tags)
2594+
self.assertIsNone(span)
2595+
2596+
def test_parents_lambda_span_to_execution_init_span(self):
2597+
event = {
2598+
"DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004",
2599+
"InitialExecutionState": {
2600+
"Operations": [{"StartTimestamp": "1778088546775"}]
2601+
},
2602+
}
2603+
durable_function_tags = {
2604+
"aws_lambda.durable_function.execution_name": "my-execution",
2605+
"aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004",
2606+
"aws_lambda.durable_function.first_invocation": "true",
2607+
}
2608+
ctx = get_mock_context()
2609+
2610+
inferred_span = create_inferred_span(
2611+
event, ctx, durable_function_tags=durable_function_tags
2612+
)
2613+
lambda_span = create_function_execution_span(
2614+
context=ctx,
2615+
function_name="Function",
2616+
is_cold_start=False,
2617+
is_proactive_init=False,
2618+
trace_context_source={"source": ""},
2619+
merge_xray_traces=False,
2620+
trigger_tags={},
2621+
durable_function_tags=durable_function_tags,
2622+
parent_span=inferred_span,
2623+
span_pointers=None,
2624+
)
2625+
2626+
self.assertEqual(lambda_span.parent_id, inferred_span.span_id)
2627+
lambda_span.finish()
2628+
inferred_span.finish()
2629+
2630+
25392631
class TestInferredSpans(unittest.TestCase):
25402632
@patch("datadog_lambda.tracing.submit_errors_metric")
25412633
def test_mark_trace_as_error_for_5xx_responses_getting_400_response_code(

0 commit comments

Comments
 (0)