From 256a421a1e3ef77ca7b5831155018429bf7ada67 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Mon, 1 Dec 2025 16:37:08 +0900 Subject: [PATCH 1/3] Use executor_id and edge_group_id as span names for meaningful observability traces --- .../core/agent_framework/observability.py | 5 +++-- .../packages/core/tests/workflow/test_edge.py | 18 +++++++++--------- .../workflow/test_workflow_observability.py | 11 +++++++---- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 1543b53251..9db7fdecd9 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1626,7 +1626,7 @@ def create_processing_span( links.append(trace.Link(span_context)) return workflow_tracer().start_as_current_span( - OtelAttr.EXECUTOR_PROCESS_SPAN, + executor_id, kind=trace.SpanKind.INTERNAL, attributes={ OtelAttr.EXECUTOR_ID: executor_id, @@ -1698,8 +1698,9 @@ def create_edge_group_processing_span( # If linking fails, continue without link (graceful degradation) pass + span_name = edge_group_id if edge_group_id else edge_group_type return workflow_tracer().start_as_current_span( - OtelAttr.EDGE_GROUP_PROCESS_SPAN, + span_name, kind=trace.SpanKind.INTERNAL, attributes=attributes, links=links, diff --git a/python/packages/core/tests/workflow/test_edge.py b/python/packages/core/tests/workflow/test_edge.py index 38d7348440..098ea3c4f8 100644 --- a/python/packages/core/tests/workflow/test_edge.py +++ b/python/packages/core/tests/workflow/test_edge.py @@ -321,7 +321,7 @@ async def test_single_edge_group_tracing_success(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 @@ -365,7 +365,7 @@ async def test_single_edge_group_tracing_condition_failure(span_exporter) -> Non assert success is True # Returns True but condition failed spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 @@ -399,7 +399,7 @@ async def test_single_edge_group_tracing_type_mismatch(span_exporter) -> None: assert success is False spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 @@ -432,7 +432,7 @@ async def test_single_edge_group_tracing_target_mismatch(span_exporter) -> None: assert success is False spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 @@ -790,7 +790,7 @@ async def test_fan_out_edge_group_tracing_success(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 @@ -845,7 +845,7 @@ async def test_fan_out_edge_group_tracing_with_target(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 @@ -1012,7 +1012,7 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 @@ -1043,7 +1043,7 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 @@ -1088,7 +1088,7 @@ async def test_fan_in_edge_group_tracing_type_mismatch(span_exporter) -> None: assert success is False spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 diff --git a/python/packages/core/tests/workflow/test_workflow_observability.py b/python/packages/core/tests/workflow/test_workflow_observability.py index 5856a80035..f3903e3bbc 100644 --- a/python/packages/core/tests/workflow/test_workflow_observability.py +++ b/python/packages/core/tests/workflow/test_workflow_observability.py @@ -151,8 +151,8 @@ async def test_span_creation_and_attributes(span_exporter: InMemorySpanExporter) event_names = [event.name for event in workflow_span.events] assert "workflow.started" in event_names - # Check processing span - processing_span = next(s for s in spans if s.name == "executor.process") + # Check processing span - span name is now the executor_id + processing_span = next(s for s in spans if s.name == "executor-456") assert processing_span.kind == trace.SpanKind.INTERNAL assert processing_span.attributes is not None assert processing_span.attributes.get("executor.id") == "executor-456" @@ -210,7 +210,8 @@ async def test_trace_context_handling(span_exporter: InMemorySpanExporter) -> No # Check that spans were created with proper attributes spans = span_exporter.get_finished_spans() - processing_spans = [s for s in spans if s.name == "executor.process"] + # Processing spans now use executor_id as the span name + processing_spans = [s for s in spans if s.attributes and s.attributes.get("executor.id") == "test-executor"] sending_spans = [s for s in spans if s.name == "message.send"] assert len(processing_spans) >= 1 @@ -218,6 +219,7 @@ async def test_trace_context_handling(span_exporter: InMemorySpanExporter) -> No # Verify processing span attributes processing_span = processing_spans[0] + assert processing_span.name == "test-executor" # Span name is now executor_id assert processing_span.attributes is not None assert processing_span.attributes.get("executor.id") == "test-executor" assert processing_span.attributes.get("executor.type") == "MockExecutor" @@ -329,8 +331,9 @@ async def test_end_to_end_workflow_tracing(span_exporter: InMemorySpanExporter) spans = span_exporter.get_finished_spans() # Should have workflow span, processing spans, and sending spans + # Processing spans now use executor_id as the span name, filter by executor.id attribute workflow_spans = [s for s in spans if s.name == "workflow.run"] - processing_spans = [s for s in spans if s.name == "executor.process"] + processing_spans = [s for s in spans if s.attributes and s.attributes.get("executor.id") is not None] sending_spans = [s for s in spans if s.name == "message.send"] build_spans_after_run = [s for s in spans if s.name == "workflow.build"] From 858a9825419d0395e0f0f3ed7877cb961be661f3 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 2 Dec 2025 10:08:27 +0900 Subject: [PATCH 2/3] Update tests --- python/packages/core/tests/workflow/test_edge.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/packages/core/tests/workflow/test_edge.py b/python/packages/core/tests/workflow/test_edge.py index 098ea3c4f8..7df49d80be 100644 --- a/python/packages/core/tests/workflow/test_edge.py +++ b/python/packages/core/tests/workflow/test_edge.py @@ -327,6 +327,7 @@ async def test_single_edge_group_tracing_success(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -371,6 +372,7 @@ async def test_single_edge_group_tracing_condition_failure(span_exporter) -> Non span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_CONDITION_FALSE.value @@ -405,6 +407,7 @@ async def test_single_edge_group_tracing_type_mismatch(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TYPE_MISMATCH.value @@ -438,6 +441,7 @@ async def test_single_edge_group_tracing_target_mismatch(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TARGET_MISMATCH.value @@ -796,6 +800,7 @@ async def test_fan_out_edge_group_tracing_success(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "FanOutEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -851,6 +856,7 @@ async def test_fan_out_edge_group_tracing_with_target(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "FanOutEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -1018,6 +1024,7 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.BUFFERED.value @@ -1049,6 +1056,7 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -1094,6 +1102,7 @@ async def test_fan_in_edge_group_tracing_type_mismatch(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None + assert span.name == span.attributes.get("edge_group.id") assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TYPE_MISMATCH.value From 53a3c716ae3685e98c5d822038bf5b43744a0d89 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 4 Dec 2025 09:55:30 +0900 Subject: [PATCH 3/3] Adjust naming --- .../core/agent_framework/observability.py | 5 ++--- .../packages/core/tests/workflow/test_edge.py | 18 +++++++++--------- .../workflow/test_workflow_observability.py | 8 +++++--- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 831d0694e9..2a590fd30e 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1626,7 +1626,7 @@ def create_processing_span( links.append(trace.Link(span_context)) return workflow_tracer().start_as_current_span( - executor_id, + f"{OtelAttr.EXECUTOR_PROCESS_SPAN} {executor_id}", kind=trace.SpanKind.INTERNAL, attributes={ OtelAttr.EXECUTOR_ID: executor_id, @@ -1698,9 +1698,8 @@ def create_edge_group_processing_span( # If linking fails, continue without link (graceful degradation) pass - span_name = edge_group_id if edge_group_id else edge_group_type return workflow_tracer().start_as_current_span( - span_name, + f"{OtelAttr.EDGE_GROUP_PROCESS_SPAN} {edge_group_type}", kind=trace.SpanKind.INTERNAL, attributes=attributes, links=links, diff --git a/python/packages/core/tests/workflow/test_edge.py b/python/packages/core/tests/workflow/test_edge.py index 7df49d80be..316cae7a39 100644 --- a/python/packages/core/tests/workflow/test_edge.py +++ b/python/packages/core/tests/workflow/test_edge.py @@ -327,7 +327,7 @@ async def test_single_edge_group_tracing_success(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process SingleEdgeGroup" assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -372,7 +372,7 @@ async def test_single_edge_group_tracing_condition_failure(span_exporter) -> Non span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process SingleEdgeGroup" assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_CONDITION_FALSE.value @@ -407,7 +407,7 @@ async def test_single_edge_group_tracing_type_mismatch(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process SingleEdgeGroup" assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TYPE_MISMATCH.value @@ -441,7 +441,7 @@ async def test_single_edge_group_tracing_target_mismatch(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process SingleEdgeGroup" assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TARGET_MISMATCH.value @@ -800,7 +800,7 @@ async def test_fan_out_edge_group_tracing_success(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process FanOutEdgeGroup" assert span.attributes.get("edge_group.type") == "FanOutEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -856,7 +856,7 @@ async def test_fan_out_edge_group_tracing_with_target(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process FanOutEdgeGroup" assert span.attributes.get("edge_group.type") == "FanOutEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -1024,7 +1024,7 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process FanInEdgeGroup" assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.BUFFERED.value @@ -1056,7 +1056,7 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process FanInEdgeGroup" assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -1102,7 +1102,7 @@ async def test_fan_in_edge_group_tracing_type_mismatch(span_exporter) -> None: span = edge_group_spans[0] assert span.attributes is not None - assert span.name == span.attributes.get("edge_group.id") + assert span.name == "edge_group.process FanInEdgeGroup" assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TYPE_MISMATCH.value diff --git a/python/packages/core/tests/workflow/test_workflow_observability.py b/python/packages/core/tests/workflow/test_workflow_observability.py index f3903e3bbc..1760361f1a 100644 --- a/python/packages/core/tests/workflow/test_workflow_observability.py +++ b/python/packages/core/tests/workflow/test_workflow_observability.py @@ -151,8 +151,8 @@ async def test_span_creation_and_attributes(span_exporter: InMemorySpanExporter) event_names = [event.name for event in workflow_span.events] assert "workflow.started" in event_names - # Check processing span - span name is now the executor_id - processing_span = next(s for s in spans if s.name == "executor-456") + # Check processing span - span name uses format "executor.process {executor_id}" + processing_span = next(s for s in spans if s.name == "executor.process executor-456") assert processing_span.kind == trace.SpanKind.INTERNAL assert processing_span.attributes is not None assert processing_span.attributes.get("executor.id") == "executor-456" @@ -219,7 +219,9 @@ async def test_trace_context_handling(span_exporter: InMemorySpanExporter) -> No # Verify processing span attributes processing_span = processing_spans[0] - assert processing_span.name == "test-executor" # Span name is now executor_id + assert ( + processing_span.name == "executor.process test-executor" + ) # Span name uses format "executor.process {executor_id}" assert processing_span.attributes is not None assert processing_span.attributes.get("executor.id") == "test-executor" assert processing_span.attributes.get("executor.type") == "MockExecutor"