diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 80c3a2bc79..2a590fd30e 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1626,7 +1626,7 @@ def create_processing_span( links.append(trace.Link(span_context)) return workflow_tracer().start_as_current_span( - OtelAttr.EXECUTOR_PROCESS_SPAN, + f"{OtelAttr.EXECUTOR_PROCESS_SPAN} {executor_id}", kind=trace.SpanKind.INTERNAL, attributes={ OtelAttr.EXECUTOR_ID: executor_id, @@ -1699,7 +1699,7 @@ def create_edge_group_processing_span( pass return workflow_tracer().start_as_current_span( - OtelAttr.EDGE_GROUP_PROCESS_SPAN, + f"{OtelAttr.EDGE_GROUP_PROCESS_SPAN} {edge_group_type}", kind=trace.SpanKind.INTERNAL, attributes=attributes, links=links, diff --git a/python/packages/core/tests/workflow/test_edge.py b/python/packages/core/tests/workflow/test_edge.py index 38d7348440..316cae7a39 100644 --- a/python/packages/core/tests/workflow/test_edge.py +++ b/python/packages/core/tests/workflow/test_edge.py @@ -321,12 +321,13 @@ async def test_single_edge_group_tracing_success(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process SingleEdgeGroup" assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -365,12 +366,13 @@ async def test_single_edge_group_tracing_condition_failure(span_exporter) -> Non assert success is True # Returns True but condition failed spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process SingleEdgeGroup" assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_CONDITION_FALSE.value @@ -399,12 +401,13 @@ async def test_single_edge_group_tracing_type_mismatch(span_exporter) -> None: assert success is False spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process SingleEdgeGroup" assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TYPE_MISMATCH.value @@ -432,12 +435,13 @@ async def test_single_edge_group_tracing_target_mismatch(span_exporter) -> None: assert success is False spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process SingleEdgeGroup" assert span.attributes.get("edge_group.type") == "SingleEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TARGET_MISMATCH.value @@ -790,12 +794,13 @@ async def test_fan_out_edge_group_tracing_success(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process FanOutEdgeGroup" assert span.attributes.get("edge_group.type") == "FanOutEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -845,12 +850,13 @@ async def test_fan_out_edge_group_tracing_with_target(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process FanOutEdgeGroup" assert span.attributes.get("edge_group.type") == "FanOutEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -1012,12 +1018,13 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process FanInEdgeGroup" assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.BUFFERED.value @@ -1043,12 +1050,13 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None: assert success is True spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process FanInEdgeGroup" assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is True assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value @@ -1088,12 +1096,13 @@ async def test_fan_in_edge_group_tracing_type_mismatch(span_exporter) -> None: assert success is False spans = span_exporter.get_finished_spans() - edge_group_spans = [s for s in spans if s.name == "edge_group.process"] + edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None] assert len(edge_group_spans) == 1 span = edge_group_spans[0] assert span.attributes is not None + assert span.name == "edge_group.process FanInEdgeGroup" assert span.attributes.get("edge_group.type") == "FanInEdgeGroup" assert span.attributes.get("edge_group.delivered") is False assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TYPE_MISMATCH.value diff --git a/python/packages/core/tests/workflow/test_workflow_observability.py b/python/packages/core/tests/workflow/test_workflow_observability.py index 5856a80035..1760361f1a 100644 --- a/python/packages/core/tests/workflow/test_workflow_observability.py +++ b/python/packages/core/tests/workflow/test_workflow_observability.py @@ -151,8 +151,8 @@ async def test_span_creation_and_attributes(span_exporter: InMemorySpanExporter) event_names = [event.name for event in workflow_span.events] assert "workflow.started" in event_names - # Check processing span - processing_span = next(s for s in spans if s.name == "executor.process") + # Check processing span - span name uses format "executor.process {executor_id}" + processing_span = next(s for s in spans if s.name == "executor.process executor-456") assert processing_span.kind == trace.SpanKind.INTERNAL assert processing_span.attributes is not None assert processing_span.attributes.get("executor.id") == "executor-456" @@ -210,7 +210,8 @@ async def test_trace_context_handling(span_exporter: InMemorySpanExporter) -> No # Check that spans were created with proper attributes spans = span_exporter.get_finished_spans() - processing_spans = [s for s in spans if s.name == "executor.process"] + # Processing spans now use executor_id as the span name + processing_spans = [s for s in spans if s.attributes and s.attributes.get("executor.id") == "test-executor"] sending_spans = [s for s in spans if s.name == "message.send"] assert len(processing_spans) >= 1 @@ -218,6 +219,9 @@ async def test_trace_context_handling(span_exporter: InMemorySpanExporter) -> No # Verify processing span attributes processing_span = processing_spans[0] + assert ( + processing_span.name == "executor.process test-executor" + ) # Span name uses format "executor.process {executor_id}" assert processing_span.attributes is not None assert processing_span.attributes.get("executor.id") == "test-executor" assert processing_span.attributes.get("executor.type") == "MockExecutor" @@ -329,8 +333,9 @@ async def test_end_to_end_workflow_tracing(span_exporter: InMemorySpanExporter) spans = span_exporter.get_finished_spans() # Should have workflow span, processing spans, and sending spans + # Processing spans now use executor_id as the span name, filter by executor.id attribute workflow_spans = [s for s in spans if s.name == "workflow.run"] - processing_spans = [s for s in spans if s.name == "executor.process"] + processing_spans = [s for s in spans if s.attributes and s.attributes.get("executor.id") is not None] sending_spans = [s for s in spans if s.name == "message.send"] build_spans_after_run = [s for s in spans if s.name == "workflow.build"]