Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/packages/core/agent_framework/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,7 @@ def create_processing_span(
links.append(trace.Link(span_context))

return workflow_tracer().start_as_current_span(
OtelAttr.EXECUTOR_PROCESS_SPAN,
f"{OtelAttr.EXECUTOR_PROCESS_SPAN} {executor_id}",
kind=trace.SpanKind.INTERNAL,
attributes={
OtelAttr.EXECUTOR_ID: executor_id,
Expand Down Expand Up @@ -1699,7 +1699,7 @@ def create_edge_group_processing_span(
pass

return workflow_tracer().start_as_current_span(
OtelAttr.EDGE_GROUP_PROCESS_SPAN,
f"{OtelAttr.EDGE_GROUP_PROCESS_SPAN} {edge_group_type}",
kind=trace.SpanKind.INTERNAL,
attributes=attributes,
links=links,
Expand Down
27 changes: 18 additions & 9 deletions python/packages/core/tests/workflow/test_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,13 @@ async def test_single_edge_group_tracing_success(span_exporter) -> None:
assert success is True

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process SingleEdgeGroup"
assert span.attributes.get("edge_group.type") == "SingleEdgeGroup"
assert span.attributes.get("edge_group.delivered") is True
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value
Expand Down Expand Up @@ -365,12 +366,13 @@ async def test_single_edge_group_tracing_condition_failure(span_exporter) -> Non
assert success is True # Returns True but condition failed

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process SingleEdgeGroup"
assert span.attributes.get("edge_group.type") == "SingleEdgeGroup"
assert span.attributes.get("edge_group.delivered") is False
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_CONDITION_FALSE.value
Expand Down Expand Up @@ -399,12 +401,13 @@ async def test_single_edge_group_tracing_type_mismatch(span_exporter) -> None:
assert success is False

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process SingleEdgeGroup"
assert span.attributes.get("edge_group.type") == "SingleEdgeGroup"
assert span.attributes.get("edge_group.delivered") is False
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TYPE_MISMATCH.value
Expand Down Expand Up @@ -432,12 +435,13 @@ async def test_single_edge_group_tracing_target_mismatch(span_exporter) -> None:
assert success is False

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process SingleEdgeGroup"
assert span.attributes.get("edge_group.type") == "SingleEdgeGroup"
assert span.attributes.get("edge_group.delivered") is False
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TARGET_MISMATCH.value
Expand Down Expand Up @@ -790,12 +794,13 @@ async def test_fan_out_edge_group_tracing_success(span_exporter) -> None:
assert success is True

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process FanOutEdgeGroup"
assert span.attributes.get("edge_group.type") == "FanOutEdgeGroup"
assert span.attributes.get("edge_group.delivered") is True
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value
Expand Down Expand Up @@ -845,12 +850,13 @@ async def test_fan_out_edge_group_tracing_with_target(span_exporter) -> None:
assert success is True

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process FanOutEdgeGroup"
assert span.attributes.get("edge_group.type") == "FanOutEdgeGroup"
assert span.attributes.get("edge_group.delivered") is True
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value
Expand Down Expand Up @@ -1012,12 +1018,13 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None:
assert success is True

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process FanInEdgeGroup"
assert span.attributes.get("edge_group.type") == "FanInEdgeGroup"
assert span.attributes.get("edge_group.delivered") is True
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.BUFFERED.value
Expand All @@ -1043,12 +1050,13 @@ async def test_fan_in_edge_group_tracing_buffered(span_exporter) -> None:
assert success is True

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process FanInEdgeGroup"
assert span.attributes.get("edge_group.type") == "FanInEdgeGroup"
assert span.attributes.get("edge_group.delivered") is True
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DELIVERED.value
Expand Down Expand Up @@ -1088,12 +1096,13 @@ async def test_fan_in_edge_group_tracing_type_mismatch(span_exporter) -> None:
assert success is False

spans = span_exporter.get_finished_spans()
edge_group_spans = [s for s in spans if s.name == "edge_group.process"]
edge_group_spans = [s for s in spans if s.attributes and s.attributes.get("edge_group.type") is not None]

assert len(edge_group_spans) == 1

span = edge_group_spans[0]
assert span.attributes is not None
assert span.name == "edge_group.process FanInEdgeGroup"
assert span.attributes.get("edge_group.type") == "FanInEdgeGroup"
assert span.attributes.get("edge_group.delivered") is False
assert span.attributes.get("edge_group.delivery_status") == EdgeGroupDeliveryStatus.DROPPED_TYPE_MISMATCH.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ async def test_span_creation_and_attributes(span_exporter: InMemorySpanExporter)
event_names = [event.name for event in workflow_span.events]
assert "workflow.started" in event_names

# Check processing span
processing_span = next(s for s in spans if s.name == "executor.process")
# Check processing span - span name uses format "executor.process {executor_id}"
processing_span = next(s for s in spans if s.name == "executor.process executor-456")
assert processing_span.kind == trace.SpanKind.INTERNAL
assert processing_span.attributes is not None
assert processing_span.attributes.get("executor.id") == "executor-456"
Expand Down Expand Up @@ -210,14 +210,18 @@ async def test_trace_context_handling(span_exporter: InMemorySpanExporter) -> No

# Check that spans were created with proper attributes
spans = span_exporter.get_finished_spans()
processing_spans = [s for s in spans if s.name == "executor.process"]
# Processing spans now use executor_id as the span name
processing_spans = [s for s in spans if s.attributes and s.attributes.get("executor.id") == "test-executor"]
sending_spans = [s for s in spans if s.name == "message.send"]

assert len(processing_spans) >= 1
assert len(sending_spans) >= 1

# Verify processing span attributes
processing_span = processing_spans[0]
assert (
processing_span.name == "executor.process test-executor"
) # Span name uses format "executor.process {executor_id}"
assert processing_span.attributes is not None
assert processing_span.attributes.get("executor.id") == "test-executor"
assert processing_span.attributes.get("executor.type") == "MockExecutor"
Expand Down Expand Up @@ -329,8 +333,9 @@ async def test_end_to_end_workflow_tracing(span_exporter: InMemorySpanExporter)
spans = span_exporter.get_finished_spans()

# Should have workflow span, processing spans, and sending spans
# Processing spans now use executor_id as the span name, filter by executor.id attribute
workflow_spans = [s for s in spans if s.name == "workflow.run"]
processing_spans = [s for s in spans if s.name == "executor.process"]
processing_spans = [s for s in spans if s.attributes and s.attributes.get("executor.id") is not None]
sending_spans = [s for s in spans if s.name == "message.send"]
build_spans_after_run = [s for s in spans if s.name == "workflow.build"]

Expand Down
Loading