Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions python/packages/core/agent_framework/_workflows/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,9 @@ async def _run_workflow_with_tracing(
span.add_event(OtelAttr.WORKFLOW_STARTED)
# Emit explicit start/status events to the stream
with _framework_event_origin():
started = WorkflowEvent.started()
yield started
yield WorkflowEvent.started()
with _framework_event_origin():
in_progress = WorkflowEvent.status(WorkflowRunState.IN_PROGRESS)
yield in_progress
yield WorkflowEvent.status(WorkflowRunState.IN_PROGRESS)

# Reset context for a new run if supported
if reset_context:
Expand Down Expand Up @@ -372,17 +370,15 @@ async def _run_workflow_with_tracing(
if event.type == "request_info" and not emitted_in_progress_pending:
emitted_in_progress_pending = True
with _framework_event_origin():
pending_status = WorkflowEvent.status(WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS)
yield pending_status
yield WorkflowEvent.status(WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS)

# Workflow runs until idle - emit final status based on whether requests are pending
if saw_request:
with _framework_event_origin():
terminal_status = WorkflowEvent.status(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS)
yield terminal_status
yield WorkflowEvent.status(WorkflowRunState.IDLE_WITH_PENDING_REQUESTS)
else:
with _framework_event_origin():
terminal_status = WorkflowEvent.status(WorkflowRunState.IDLE)
yield terminal_status
yield WorkflowEvent.status(WorkflowRunState.IDLE)

span.add_event(OtelAttr.WORKFLOW_COMPLETED)
except Exception as exc:
Expand All @@ -393,11 +389,9 @@ async def _run_workflow_with_tracing(
# Surface structured failure details before propagating exception
details = WorkflowErrorDetails.from_exception(exc)
with _framework_event_origin():
failed_event = WorkflowEvent.failed(details)
yield failed_event
yield WorkflowEvent.failed(details)
with _framework_event_origin():
failed_status = WorkflowEvent.status(WorkflowRunState.FAILED)
yield failed_status
yield WorkflowEvent.status(WorkflowRunState.FAILED)
span.add_event(
name=OtelAttr.WORKFLOW_ERROR,
attributes={
Expand Down
69 changes: 69 additions & 0 deletions python/packages/declarative/tests/test_workflow_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,75 @@ async def test_execute_if_workflow(self):
_text_outputs = [str(o) for o in outputs if isinstance(o, str) or hasattr(o, "data")] # noqa: F841
assert any("Condition was true" in str(o) for o in outputs)

@pytest.mark.asyncio
async def test_entry_join_executor_initializes_workflow_inputs(self):
"""Regression test for #3948: Entry JoinExecutor must initialize Workflow.Inputs.

When workflow.run() is called with a dict input, the Entry node (JoinExecutor
with kind: 'Entry') must call _ensure_state_initialized so that Workflow.Inputs
is populated. Without this, expressions like =inputs.age resolve to blank and
conditions like =Local.age < 13 always evaluate as true (blank treated as 0).
"""
factory = WorkflowFactory()
workflow = factory.create_workflow_from_yaml("""
name: entry-inputs-test
actions:
- kind: SetValue
id: get_age
path: Local.age
value: =inputs.age
- kind: If
id: check_age
condition: =Local.age < 13
then:
- kind: SendActivity
activity:
text: child
else:
- kind: SendActivity
activity:
text: adult
""")

# age=8 -> child branch
result_child = await workflow.run({"age": 8})
outputs_child = result_child.get_outputs()
assert any("child" in str(o) for o in outputs_child), f"Expected 'child' for age=8 but got: {outputs_child}"
assert not any("adult" in str(o) for o in outputs_child), (
f"Did not expect 'adult' for age=8 but got: {outputs_child}"
)

# age=25 -> adult branch (bug: blank treated as 0 made this always go to child)
result_adult = await workflow.run({"age": 25})
outputs_adult = result_adult.get_outputs()
assert any("adult" in str(o) for o in outputs_adult), f"Expected 'adult' for age=25 but got: {outputs_adult}"
assert not any("child" in str(o) for o in outputs_adult), (
f"Did not expect 'child' for age=25 but got: {outputs_adult}"
)

@pytest.mark.asyncio
async def test_entry_join_executor_initializes_workflow_inputs_string(self):
"""Regression test for #3948: Entry JoinExecutor must initialize Workflow.Inputs for string input.

When workflow.run() is called with a string input, Workflow.Inputs.input and
System.LastMessage.Text should be set correctly.
"""
factory = WorkflowFactory()
workflow = factory.create_workflow_from_yaml("""
name: entry-string-inputs-test
actions:
- kind: SetValue
path: Local.msg
value: =inputs.input
- kind: SendActivity
activity:
text: =Local.msg
""")

result = await workflow.run("hello-world")
outputs = result.get_outputs()
assert any("hello-world" in str(o) for o in outputs), f"Expected 'hello-world' in outputs but got: {outputs}"


class TestWorkflowFactoryAgentRegistration:
"""Tests for agent registration."""
Expand Down
Loading