From 5795b13edf9cbe7fd80a24b4128316a93450aac5 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 6 Feb 2026 14:56:14 +0900 Subject: [PATCH 1/3] Streamline workflow run api with send responses in one method --- .../core/agent_framework/_workflows/_agent.py | 46 +-- .../agent_framework/_workflows/_workflow.py | 292 +++++++++--------- .../_workflows/_workflow_executor.py | 2 +- .../test_agent_executor_tool_calls.py | 8 +- .../test_request_info_and_response.py | 12 +- .../core/tests/workflow/test_sub_workflow.py | 8 +- .../core/tests/workflow/test_workflow.py | 18 +- .../_workflows/_executors_agents.py | 2 +- .../devui/agent_framework_devui/_executor.py | 53 +--- .../_magentic.py | 2 +- .../orchestrations/tests/test_group_chat.py | 2 +- .../orchestrations/tests/test_handoff.py | 8 +- .../orchestrations/tests/test_magentic.py | 12 +- .../orchestrations/03_swarm.py | 2 +- .../handoff_participant_factory.py | 6 +- .../orchestrations/handoff_simple.py | 4 +- .../handoff_with_code_interpreter_file.py | 2 +- .../orchestrations/magentic_checkpoint.py | 4 +- .../magentic_human_plan_review.py | 4 +- ...re_chat_agents_tool_calls_with_feedback.py | 2 +- .../checkpoint_with_human_in_the_loop.py | 2 +- ...ff_with_tool_approval_checkpoint_resume.py | 10 +- .../checkpoint/sub_workflow_checkpoint.py | 2 +- .../sub_workflow_parallel_requests.py | 2 +- .../declarative/customer_support/main.py | 2 +- .../declarative/function_tools/main.py | 2 +- .../human-in-the-loop/agents_with_HITL.py | 4 +- .../agents_with_approval_requests.py | 4 +- .../concurrent_request_info.py | 4 +- .../group_chat_request_info.py | 4 +- .../guessing_game_with_human_input.py | 10 +- .../sequential_request_info.py | 6 +- .../magentic_human_plan_review.py | 2 +- .../concurrent_builder_tool_approval.py | 4 +- .../group_chat_builder_tool_approval.py | 4 +- .../sequential_builder_tool_approval.py | 6 +- .../orchestrations/handoff.py | 2 +- 37 files changed, 252 insertions(+), 307 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index 70b385c06d..ebdee9b613 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -331,60 +331,46 @@ async def _run_core( Yields: WorkflowEvent objects from the workflow execution. """ - # Determine the execution mode based on state + # Determine the execution mode based on state. + # The streaming flag controls the workflow's internal streaming mode, + # which affects executor behavior (e.g. AgentExecutor emits different event + # types in streaming vs non-streaming mode). if bool(self.pending_requests): - # This is a continuation - send function responses back function_responses = self._process_pending_requests(input_messages) - if streaming: - async for event in self.workflow.send_responses_streaming(function_responses): + async for event in self.workflow.run(responses=function_responses, stream=True, **kwargs): yield event else: - workflow_result = await self.workflow.send_responses(function_responses) - for event in workflow_result: + for event in await self.workflow.run(responses=function_responses, **kwargs): yield event elif checkpoint_id is not None: - # Resume from checkpoint - don't prepend thread history since workflow state - # is being restored from the checkpoint if streaming: async for event in self.workflow.run( - message=None, - stream=True, - checkpoint_id=checkpoint_id, - checkpoint_storage=checkpoint_storage, - **kwargs, + stream=True, checkpoint_id=checkpoint_id, + checkpoint_storage=checkpoint_storage, **kwargs, ): yield event else: - workflow_result = await self.workflow.run( - message=None, + for event in await self.workflow.run( checkpoint_id=checkpoint_id, - checkpoint_storage=checkpoint_storage, - **kwargs, - ) - for event in workflow_result: + checkpoint_storage=checkpoint_storage, **kwargs, + ): yield event else: - # Initial run - build conversation from thread history conversation_messages = await self._build_conversation_messages(thread, input_messages) - if streaming: async for event in self.workflow.run( - message=conversation_messages, - stream=True, - checkpoint_storage=checkpoint_storage, - **kwargs, + message=conversation_messages, stream=True, + checkpoint_storage=checkpoint_storage, **kwargs, ): yield event else: - workflow_result = await self.workflow.run( + for event in await self.workflow.run( message=conversation_messages, - checkpoint_storage=checkpoint_storage, - **kwargs, - ) - for event in workflow_result: + checkpoint_storage=checkpoint_storage, **kwargs, + ): yield event # endregion Run Methods diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index 665e6541f3..952bf41658 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -10,6 +10,7 @@ from collections.abc import AsyncIterable, Awaitable, Callable from typing import Any, Literal, overload +from .._types import ResponseStream from ..observability import OtelAttr, capture_exception, create_workflow_span from ._agent import WorkflowAgent from ._checkpoint import CheckpointStorage @@ -147,7 +148,7 @@ class Workflow(DictConvertible): 2. Executor implements `response_handler()` to process the response 3. Requests are emitted as RequestInfoEvent instances in the event stream 4. Workflow enters IDLE_WITH_PENDING_REQUESTS state - 5. Caller handles requests and provides responses via the `send_responses` or `send_responses_streaming` methods + 5. Caller handles requests and provides responses via `run(responses=...)` or `run(responses=..., stream=True)` 6. Responses are routed to the requesting executors and response handlers are invoked ## Checkpointing @@ -453,186 +454,143 @@ def run( message: Any | None = None, *, stream: Literal[True], + responses: dict[str, Any] | None = None, checkpoint_id: str | None = None, checkpoint_storage: CheckpointStorage | None = None, **kwargs: Any, - ) -> AsyncIterable[WorkflowEvent]: ... + ) -> ResponseStream[WorkflowEvent, WorkflowRunResult]: ... @overload - async def run( + def run( self, message: Any | None = None, *, stream: Literal[False] = ..., + responses: dict[str, Any] | None = None, checkpoint_id: str | None = None, checkpoint_storage: CheckpointStorage | None = None, include_status_events: bool = False, **kwargs: Any, - ) -> WorkflowRunResult: ... + ) -> Awaitable[WorkflowRunResult]: ... def run( self, message: Any | None = None, *, stream: bool = False, + responses: dict[str, Any] | None = None, checkpoint_id: str | None = None, checkpoint_storage: CheckpointStorage | None = None, include_status_events: bool = False, **kwargs: Any, - ) -> AsyncIterable[WorkflowEvent] | Awaitable[WorkflowRunResult]: + ) -> ResponseStream[WorkflowEvent, WorkflowRunResult] | Awaitable[WorkflowRunResult]: """Run the workflow, optionally streaming events. - Unified interface supporting initial runs and checkpoint restoration. + Unified interface supporting initial runs, checkpoint restoration, and + sending responses to pending requests. Args: - message: Initial message for the start executor. Required for new workflow runs, - should be None when resuming from checkpoint. - stream: If True, returns an async iterable of events. If False (default), - returns an awaitable WorkflowRunResult. - checkpoint_id: ID of checkpoint to restore from. If provided, the workflow resumes - from this checkpoint instead of starting fresh. + message: Initial message for the start executor. Required for new workflow runs. + Mutually exclusive with responses. + stream: If True, returns a ResponseStream of events with + ``get_final_response()`` for the final WorkflowRunResult. If False + (default), returns an awaitable WorkflowRunResult. + responses: Responses to send for pending request info events, where keys are + request IDs and values are the corresponding response data. Mutually + exclusive with message. Can be combined with checkpoint_id to restore + a checkpoint and send responses in a single call. + checkpoint_id: ID of checkpoint to restore from. Can be used alone (resume + from checkpoint), with message (not allowed), or with responses + (restore then send responses). checkpoint_storage: Runtime checkpoint storage. include_status_events: Whether to include WorkflowStatusEvent instances (non-streaming only). **kwargs: Additional keyword arguments to pass through to agent invocations. Returns: - When stream=True: An AsyncIterable[WorkflowEvent] for streaming events. + When stream=True: A ResponseStream[WorkflowEvent, WorkflowRunResult] for + streaming events. Iterate for events, call get_final_response() for result. When stream=False: An Awaitable[WorkflowRunResult] with all events. Raises: - ValueError: If both message and checkpoint_id are provided, or if neither is provided. + ValueError: If parameter combination is invalid. """ - if stream: - return self._run_streaming( + # Validate parameters and set running flag eagerly (before any async work) + self._validate_run_params(message, responses, checkpoint_id) + self._ensure_not_running() + + response_stream = ResponseStream[WorkflowEvent, WorkflowRunResult]( + self._run_core( message=message, + responses=responses, checkpoint_id=checkpoint_id, checkpoint_storage=checkpoint_storage, + streaming=stream, **kwargs, - ) - return self._run_non_streaming( - message=message, - checkpoint_id=checkpoint_id, - checkpoint_storage=checkpoint_storage, - include_status_events=include_status_events, - **kwargs, + ), + finalizer=functools.partial(self._finalize_events, include_status_events=include_status_events), + cleanup_hooks=[ + functools.partial(self._run_cleanup, checkpoint_storage), + ], ) - async def _run_streaming( + if stream: + return response_stream + return response_stream.get_final_response() + + async def _run_core( self, message: Any | None = None, *, + responses: dict[str, Any] | None = None, checkpoint_id: str | None = None, checkpoint_storage: CheckpointStorage | None = None, + streaming: bool = False, **kwargs: Any, ) -> AsyncIterable[WorkflowEvent]: - """Internal streaming implementation.""" - # Validate mutually exclusive parameters BEFORE setting running flag - if message is not None and checkpoint_id is not None: - raise ValueError("Cannot provide both 'message' and 'checkpoint_id'. Use one or the other.") - - if message is None and checkpoint_id is None: - raise ValueError("Must provide either 'message' (new run) or 'checkpoint_id' (resume).") - - self._ensure_not_running() + """Single core execution path for both streaming and non-streaming modes. + Yields: + WorkflowEvent: The events generated during the workflow execution. + """ # Enable runtime checkpointing if storage provided - # Two cases: - # 1. checkpoint_storage + checkpoint_id: Load checkpoint from this storage and resume - # 2. checkpoint_storage without checkpoint_id: Enable checkpointing for this run if checkpoint_storage is not None: self._runner.context.set_runtime_checkpoint_storage(checkpoint_storage) - try: - # Reset context only for new runs (not checkpoint restoration) - reset_context = message is not None and checkpoint_id is None - - async for event in self._run_workflow_with_tracing( - initial_executor_fn=functools.partial( - self._execute_with_message_or_checkpoint, message, checkpoint_id, checkpoint_storage - ), - reset_context=reset_context, - streaming=True, - run_kwargs=kwargs if kwargs else None, - ): - if isinstance(event, WorkflowOutputEvent) and not self._should_yield_output_event(event): - continue - yield event - finally: - if checkpoint_storage is not None: - self._runner.context.clear_runtime_checkpoint_storage() - self._reset_running_flag() - - async def send_responses_streaming(self, responses: dict[str, Any]) -> AsyncIterable[WorkflowEvent]: - """Send responses back to the workflow and stream the events generated by the workflow. + initial_executor_fn, reset_context = self._resolve_execution_mode( + message, responses, checkpoint_id, checkpoint_storage + ) - Args: - responses: The responses to be sent back to the workflow, where keys are request IDs - and values are the corresponding response data. + async for event in self._run_workflow_with_tracing( + initial_executor_fn=initial_executor_fn, + reset_context=reset_context, + streaming=streaming, + run_kwargs=kwargs if kwargs else None, + ): + if isinstance(event, WorkflowOutputEvent) and not self._should_yield_output_event(event): + continue + yield event - Yields: - WorkflowEvent: The events generated during the workflow execution after sending the responses. - """ - self._ensure_not_running() - try: - async for event in self._run_workflow_with_tracing( - initial_executor_fn=functools.partial(self._send_responses_internal, responses), - reset_context=False, # Don't reset context when sending responses - streaming=True, - ): - if isinstance(event, WorkflowOutputEvent) and not self._should_yield_output_event(event): - continue - yield event - finally: - self._reset_running_flag() + async def _run_cleanup(self, checkpoint_storage: CheckpointStorage | None) -> None: + """Cleanup hook called after stream consumption.""" + if checkpoint_storage is not None: + self._runner.context.clear_runtime_checkpoint_storage() + self._reset_running_flag() - async def _run_non_streaming( - self, - message: Any | None = None, + @staticmethod + def _finalize_events( + events: list[WorkflowEvent], *, - checkpoint_id: str | None = None, - checkpoint_storage: CheckpointStorage | None = None, include_status_events: bool = False, - **kwargs: Any, ) -> WorkflowRunResult: - """Internal non-streaming implementation.""" - # Validate mutually exclusive parameters BEFORE setting running flag - if message is not None and checkpoint_id is not None: - raise ValueError("Cannot provide both 'message' and 'checkpoint_id'. Use one or the other.") + """Convert collected workflow events into a WorkflowRunResult. - if message is None and checkpoint_id is None: - raise ValueError("Must provide either 'message' (new run) or 'checkpoint_id' (resume).") - - self._ensure_not_running() - - # Enable runtime checkpointing if storage provided - if checkpoint_storage is not None: - self._runner.context.set_runtime_checkpoint_storage(checkpoint_storage) - - try: - # Reset context only for new runs (not checkpoint restoration) - reset_context = message is not None and checkpoint_id is None - - raw_events = [ - event - async for event in self._run_workflow_with_tracing( - initial_executor_fn=functools.partial( - self._execute_with_message_or_checkpoint, message, checkpoint_id, checkpoint_storage - ), - reset_context=reset_context, - run_kwargs=kwargs if kwargs else None, - ) - ] - finally: - if checkpoint_storage is not None: - self._runner.context.clear_runtime_checkpoint_storage() - self._reset_running_flag() - - # Filter events for non-streaming mode + Filters out internal events for non-streaming callers. + """ filtered: list[WorkflowEvent] = [] status_events: list[WorkflowStatusEvent] = [] - for ev in raw_events: - # Omit WorkflowStartedEvent from non-streaming (telemetry-only) + for ev in events: + # Omit WorkflowStartedEvent from result (telemetry-only) if isinstance(ev, WorkflowStartedEvent): continue # Track status; include inline only if explicitly requested @@ -641,41 +599,89 @@ async def _run_non_streaming( if include_status_events: filtered.append(ev) continue - if isinstance(ev, WorkflowOutputEvent) and not self._should_yield_output_event(ev): - continue filtered.append(ev) return WorkflowRunResult(filtered, status_events) - async def send_responses(self, responses: dict[str, Any]) -> WorkflowRunResult: - """Send responses back to the workflow. + @staticmethod + def _validate_run_params( + message: Any | None, + responses: dict[str, Any] | None, + checkpoint_id: str | None, + ) -> None: + """Validate parameter combinations for run(). - Args: - responses: A dictionary where keys are request IDs and values are the corresponding response data. + Rules: + - message and responses are mutually exclusive + - message and checkpoint_id are mutually exclusive + - At least one of message, responses, or checkpoint_id must be provided + - responses + checkpoint_id is allowed (restore then send) + """ + if message is not None and responses is not None: + raise ValueError("Cannot provide both 'message' and 'responses'. Use one or the other.") + + if message is not None and checkpoint_id is not None: + raise ValueError("Cannot provide both 'message' and 'checkpoint_id'. Use one or the other.") + + if message is None and responses is None and checkpoint_id is None: + raise ValueError( + "Must provide at least one of: 'message' (new run), 'responses' (send responses), " + "or 'checkpoint_id' (resume from checkpoint)." + ) + + def _resolve_execution_mode( + self, + message: Any | None, + responses: dict[str, Any] | None, + checkpoint_id: str | None, + checkpoint_storage: CheckpointStorage | None, + ) -> tuple[Callable[[], Awaitable[None]], bool]: + """Determine the initial executor function and reset_context flag based on parameters. Returns: - A WorkflowRunResult instance containing a list of events generated during the workflow execution. + A tuple of (initial_executor_fn, reset_context). """ - self._ensure_not_running() - try: - events = [ - event - async for event in self._run_workflow_with_tracing( - initial_executor_fn=functools.partial(self._send_responses_internal, responses), - reset_context=False, # Don't reset context when sending responses + if responses is not None: + if checkpoint_id is not None: + # Combined: restore checkpoint then send responses + initial_executor_fn = functools.partial( + self._restore_and_send_responses, checkpoint_id, checkpoint_storage, responses ) - ] - status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)] - filtered_events: list[WorkflowEvent] = [] - for e in events: - if isinstance(e, WorkflowOutputEvent) and not self._should_yield_output_event(e): - continue - if isinstance(e, (WorkflowStatusEvent, WorkflowStartedEvent)): - continue - filtered_events.append(e) - return WorkflowRunResult(filtered_events, status_events) - finally: - self._reset_running_flag() + else: + # Send responses only (requires pending requests in workflow state) + initial_executor_fn = functools.partial(self._send_responses_internal, responses) + return initial_executor_fn, False + else: + # Regular run or checkpoint restoration + initial_executor_fn = functools.partial( + self._execute_with_message_or_checkpoint, message, checkpoint_id, checkpoint_storage + ) + reset_context = message is not None and checkpoint_id is None + return initial_executor_fn, reset_context + + async def _restore_and_send_responses( + self, + checkpoint_id: str, + checkpoint_storage: CheckpointStorage | None, + responses: dict[str, Any], + ) -> None: + """Restore from a checkpoint then send responses to pending requests. + + Args: + checkpoint_id: ID of checkpoint to restore from. + checkpoint_storage: Runtime checkpoint storage. + responses: Responses to send after restoration. + """ + has_checkpointing = self._runner.context.has_checkpointing() + + if not has_checkpointing and checkpoint_storage is None: + raise ValueError( + "Cannot restore from checkpoint: either provide checkpoint_storage parameter " + "or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)." + ) + + await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage) + await self._send_responses_internal(responses) async def _send_responses_internal(self, responses: dict[str, Any]) -> None: """Internal method to validate and send responses to the executors.""" diff --git a/python/packages/core/agent_framework/_workflows/_workflow_executor.py b/python/packages/core/agent_framework/_workflows/_workflow_executor.py index 029e89e000..4f1c625f33 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_executor.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_executor.py @@ -651,7 +651,7 @@ async def _handle_response( try: # Resume the sub-workflow with all collected responses - result = await self.workflow.send_responses(responses_to_send) + result = await self.workflow.run(responses=responses_to_send) # Remove handled requests from result. The result may contain the original # RequestInfoEvents that were already handled. This is due to checkpointing # and rehydration of the workflow that re-adds the RequestInfoEvents to the diff --git a/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py b/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py index 7f2e4931e5..f4dc465af7 100644 --- a/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py +++ b/python/packages/core/tests/workflow/test_agent_executor_tool_calls.py @@ -268,7 +268,7 @@ async def test_agent_executor_tool_call_with_approval() -> None: assert approval_request.data.function_call.arguments == '{"query": "test"}' # Act - events = await workflow.send_responses({ + events = await workflow.run(responses={ approval_request.request_id: approval_request.data.to_function_approval_response(True) }) @@ -304,7 +304,7 @@ async def test_agent_executor_tool_call_with_approval_streaming() -> None: # Act output: str | None = None - async for event in workflow.send_responses_streaming({ + async for event in workflow.run(stream=True, responses={ approval_request.request_id: approval_request.data.to_function_approval_response(True) }): if isinstance(event, WorkflowOutputEvent): @@ -347,7 +347,7 @@ async def test_agent_executor_parallel_tool_call_with_approval() -> None: approval_request.request_id: approval_request.data.to_function_approval_response(True) # type: ignore for approval_request in events.get_request_info_events() } - events = await workflow.send_responses(responses) + events = await workflow.run(responses=responses) # Assert final_response = events.get_outputs() @@ -386,7 +386,7 @@ async def test_agent_executor_parallel_tool_call_with_approval_streaming() -> No } output: str | None = None - async for event in workflow.send_responses_streaming(responses): + async for event in workflow.run(stream=True, responses=responses): if isinstance(event, WorkflowOutputEvent): output = event.data diff --git a/python/packages/core/tests/workflow/test_request_info_and_response.py b/python/packages/core/tests/workflow/test_request_info_and_response.py index 210cebd340..6cb2a49de5 100644 --- a/python/packages/core/tests/workflow/test_request_info_and_response.py +++ b/python/packages/core/tests/workflow/test_request_info_and_response.py @@ -193,7 +193,7 @@ async def test_approval_workflow(self): # Send response and continue workflow completed = False - async for event in workflow.send_responses_streaming({request_info_event.request_id: True}): + async for event in workflow.run(stream=True, responses={request_info_event.request_id: True}): if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: completed = True @@ -220,7 +220,7 @@ async def test_calculation_workflow(self): # Send response with calculated result calculated_result = 31.0 completed = False - async for event in workflow.send_responses_streaming({request_info_event.request_id: calculated_result}): + async for event in workflow.run(stream=True, responses={request_info_event.request_id: calculated_result}): if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: completed = True @@ -255,7 +255,7 @@ async def test_multiple_requests_workflow(self): # Send responses for both requests responses = {approval_event.request_id: True, calc_event.request_id: 50.0} completed = False - async for event in workflow.send_responses_streaming(responses): + async for event in workflow.run(stream=True, responses=responses): if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: completed = True @@ -277,7 +277,7 @@ async def test_denied_approval_workflow(self): # Deny the request completed = False - async for event in workflow.send_responses_streaming({request_info_event.request_id: False}): + async for event in workflow.run(stream=True, responses={request_info_event.request_id: False}): if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: completed = True @@ -304,7 +304,7 @@ async def test_workflow_state_with_pending_requests(self): # Continue with response completed = False - async for event in workflow.send_responses_streaming({request_info_event.request_id: True}): + async for event in workflow.run(stream=True, responses={request_info_event.request_id: True}): if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: completed = True @@ -399,7 +399,7 @@ async def test_checkpoint_with_pending_request_info_events(self): # Step 6: Provide response to the restored request and complete the workflow final_completed = False - async for event in restored_workflow.send_responses_streaming({ + async for event in restored_workflow.run(stream=True, responses={ request_info_event.request_id: True # Approve the request }): if isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: diff --git a/python/packages/core/tests/workflow/test_sub_workflow.py b/python/packages/core/tests/workflow/test_sub_workflow.py index 33333d2906..ce05041f15 100644 --- a/python/packages/core/tests/workflow/test_sub_workflow.py +++ b/python/packages/core/tests/workflow/test_sub_workflow.py @@ -201,7 +201,7 @@ async def test_basic_sub_workflow() -> None: assert request_events[0].data.domain == "example.com" # Send response through the main workflow - await main_workflow.send_responses({ + await main_workflow.run(responses={ request_events[0].request_id: True # Domain is approved }) @@ -245,7 +245,7 @@ async def test_sub_workflow_with_interception(): assert request_events[0].data.domain == "unknown.com" # Send external response - await main_workflow.send_responses({ + await main_workflow.run(responses={ request_events[0].request_id: False # Domain not approved }) assert parent.result is not None @@ -447,7 +447,7 @@ async def collect_result(self, result: ValidationResult, ctx: WorkflowContext) - # Send responses for all requests (approve all domains) responses = {event.request_id: True for event in request_events} - await main_workflow.send_responses(responses) + await main_workflow.run(responses=responses) # All results should be collected assert len(processor.results) == len(emails) @@ -613,7 +613,7 @@ async def test_sub_workflow_checkpoint_restore_no_duplicate_requests() -> None: assert resumed_first_request_id == first_request_id request_events: list[RequestInfoEvent] = [] - async for event in workflow2.send_responses_streaming({resumed_first_request_id: "first_answer"}): + async for event in workflow2.run(stream=True, responses={resumed_first_request_id: "first_answer"}): if isinstance(event, RequestInfoEvent): request_events.append(event) diff --git a/python/packages/core/tests/workflow/test_workflow.py b/python/packages/core/tests/workflow/test_workflow.py index 314fad89a0..6d4a420686 100644 --- a/python/packages/core/tests/workflow/test_workflow.py +++ b/python/packages/core/tests/workflow/test_workflow.py @@ -383,7 +383,7 @@ async def test_workflow_run_stream_from_checkpoint_with_external_storage( try: events: list[WorkflowEvent] = [] async for event in workflow_without_checkpointing.run( - checkpoint_id=checkpoint_id, checkpoint_storage=storage + checkpoint_id=checkpoint_id, checkpoint_storage=storage, stream=True ): events.append(event) if len(events) >= 2: # Limit to avoid infinite loops @@ -956,11 +956,11 @@ async def test_workflow_run_parameter_validation(simple_executor: Executor) -> N pass # Invalid: none of message or checkpoint_id - with pytest.raises(ValueError, match="Must provide either"): + with pytest.raises(ValueError, match="Must provide at least one of"): await workflow.run() # Invalid: none of message or checkpoint_id (streaming) - with pytest.raises(ValueError, match="Must provide either"): + with pytest.raises(ValueError, match="Must provide at least one of"): async for _ in workflow.run(stream=True): pass @@ -1178,8 +1178,8 @@ async def handle(self, message: NumberMessage, ctx: WorkflowContext[NumberMessag assert outputs[0] == 40 -async def test_output_executors_filtering_with_send_responses() -> None: - """Test output filtering works correctly with send_responses method.""" +async def test_output_executors_filtering_with_run_responses() -> None: + """Test output filtering works correctly with run(responses=...) method.""" executor = MockExecutorRequestApproval(id="approval_executor") workflow = WorkflowBuilder().set_start_executor(executor).with_output_from([executor]).build() @@ -1193,7 +1193,7 @@ async def test_output_executors_filtering_with_send_responses() -> None: # Send approval response responses = {request_events[0].request_id: ApprovalMessage(approved=True)} - response_result = await workflow.send_responses(responses) + response_result = await workflow.run(responses=responses) outputs = response_result.get_outputs() # Output should be yielded since approval_executor is in output_executors @@ -1201,8 +1201,8 @@ async def test_output_executors_filtering_with_send_responses() -> None: assert outputs[0] == 42 -async def test_output_executors_filtering_with_send_responses_streaming() -> None: - """Test output filtering works correctly with send_responses_streaming method.""" +async def test_output_executors_filtering_with_run_responses_streaming() -> None: + """Test output filtering works correctly with run(responses=..., stream=True) method.""" executor = MockExecutorRequestApproval(id="approval_executor") workflow = WorkflowBuilder().set_start_executor(executor).build() @@ -1222,7 +1222,7 @@ async def test_output_executors_filtering_with_send_responses_streaming() -> Non # Send approval response via streaming responses = {request_events[0].request_id: ApprovalMessage(approved=True)} output_events: list[WorkflowOutputEvent] = [] - async for event in workflow.send_responses_streaming(responses): + async for event in workflow.run(responses=responses, stream=True): if isinstance(event, WorkflowOutputEvent): output_events.append(event) diff --git a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_agents.py b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_agents.py index 51904f665d..d4300a9909 100644 --- a/python/packages/declarative/agent_framework_declarative/_workflows/_executors_agents.py +++ b/python/packages/declarative/agent_framework_declarative/_workflows/_executors_agents.py @@ -755,7 +755,7 @@ async def handle_action( When externalLoop.when is configured and evaluates to true after agent response, this method emits an ExternalInputRequest via ctx.request_info() and returns. The workflow will yield, and when the caller provides a response via - send_responses_streaming(), the handle_external_input_response handler + run(responses=..., stream=True), the handle_external_input_response handler will continue the loop. """ state = await self._ensure_state_initialized(ctx, trigger) diff --git a/python/packages/devui/agent_framework_devui/_executor.py b/python/packages/devui/agent_framework_devui/_executor.py index ca06a6a951..ba3af3ce45 100644 --- a/python/packages/devui/agent_framework_devui/_executor.py +++ b/python/packages/devui/agent_framework_devui/_executor.py @@ -451,8 +451,6 @@ async def _execute_workflow( logger.info(f"Resuming workflow with HIL responses for {len(hil_responses)} request(s)") # Unwrap primitive responses if they're wrapped in {response: value} format - from ._utils import parse_input_for_type - unwrapped_responses = {} for request_id, response_value in hil_responses.items(): if isinstance(response_value, dict) and "response" in response_value: @@ -461,60 +459,15 @@ async def _execute_workflow( hil_responses = unwrapped_responses - # NOTE: Two-step approach for stateless HTTP (framework limitation): - # 1. Restore checkpoint to load pending requests into workflow's in-memory state - # 2. Then send responses using send_responses_streaming - # Future: Framework should support run(stream=True, checkpoint_id, responses) in single call - # (checkpoint_id is guaranteed to exist due to earlier validation) - logger.debug(f"Restoring checkpoint {checkpoint_id} then sending HIL responses") + logger.debug(f"Restoring checkpoint {checkpoint_id} and sending HIL responses") try: - # Step 1: Restore checkpoint to populate workflow's in-memory pending requests - restored = False - async for _event in workflow.run( + async for event in workflow.run( stream=True, + responses=hil_responses, checkpoint_id=checkpoint_id, checkpoint_storage=checkpoint_storage, ): - restored = True - break # Stop immediately after restoration, don't process events - - if not restored: - raise RuntimeError("Checkpoint restoration did not yield any events") - - # Reset running flags so we can call send_responses_streaming - if hasattr(workflow, "_is_running"): - workflow._is_running = False - if hasattr(workflow, "_runner") and hasattr(workflow._runner, "_running"): - workflow._runner._running = False - - # Extract response types from restored workflow and convert responses to proper types - try: - if hasattr(workflow, "_runner") and hasattr(workflow._runner, "context"): - runner_context = workflow._runner.context - pending_requests_dict = await runner_context.get_pending_request_info_events() - - converted_responses = {} - for request_id, response_value in hil_responses.items(): - if request_id in pending_requests_dict: - pending_request = pending_requests_dict[request_id] - if hasattr(pending_request, "response_type"): - response_type = pending_request.response_type - try: - response_value = parse_input_for_type(response_value, response_type) - logger.debug( - f"Converted HIL response for {request_id} to {type(response_value)}" - ) - except Exception as e: - logger.warning(f"Failed to convert HIL response for {request_id}: {e}") - - converted_responses[request_id] = response_value - - hil_responses = converted_responses - except Exception as e: - logger.warning(f"Could not convert HIL responses to proper types: {e}") - - async for event in workflow.send_responses_streaming(hil_responses): # Enrich new RequestInfoEvents that may come from subsequent HIL requests if isinstance(event, RequestInfoEvent): self._enrich_request_info_event_with_response_schema(event, workflow) diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_magentic.py b/python/packages/orchestrations/agent_framework_orchestrations/_magentic.py index 51996f09a0..5abaa9a4c0 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_magentic.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_magentic.py @@ -1521,7 +1521,7 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": if request.kind == MagenticHumanInterventionKind.PLAN_REVIEW: # Review plan and respond reply = MagenticHumanInterventionReply(decision=MagenticHumanInterventionDecision.APPROVE) - await workflow.send_responses({event.request_id: reply}) + await workflow.run(responses={event.request_id: reply}) See Also: - :class:`MagenticHumanInterventionRequest`: Event emitted for review diff --git a/python/packages/orchestrations/tests/test_group_chat.py b/python/packages/orchestrations/tests/test_group_chat.py index 77e707d6f7..17c298d0cd 100644 --- a/python/packages/orchestrations/tests/test_group_chat.py +++ b/python/packages/orchestrations/tests/test_group_chat.py @@ -786,7 +786,7 @@ async def selector(state: GroupChatState) -> str: # Continue the workflow with a response outputs: list[WorkflowOutputEvent] = [] - async for event in workflow.send_responses_streaming({ + async for event in workflow.run(stream=True, responses={ request_event.request_id: AgentRequestInfoResponse.approve() }): if isinstance(event, WorkflowOutputEvent): diff --git a/python/packages/orchestrations/tests/test_handoff.py b/python/packages/orchestrations/tests/test_handoff.py index 2242508aa7..12f4536c4a 100644 --- a/python/packages/orchestrations/tests/test_handoff.py +++ b/python/packages/orchestrations/tests/test_handoff.py @@ -257,7 +257,7 @@ async def async_termination(conv: list[ChatMessage]) -> bool: assert requests events = await _drain( - workflow.send_responses_streaming({ + workflow.run(stream=True, responses={ requests[-1].request_id: [ChatMessage(role="user", text="Second user message")] }) ) @@ -510,7 +510,7 @@ def create_specialist() -> MockHandoffAgent: # Follow-up message events = await _drain( - workflow.send_responses_streaming({requests[-1].request_id: [ChatMessage(role="user", text="More details")]}) + workflow.run(stream=True, responses={requests[-1].request_id: [ChatMessage(role="user", text="More details")]}) ) outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] assert outputs @@ -584,7 +584,7 @@ def create_specialist_b() -> MockHandoffAgent: # Second user message - specialist_a hands off to specialist_b events = await _drain( - workflow.send_responses_streaming({requests[-1].request_id: [ChatMessage(role="user", text="Need escalation")]}) + workflow.run(stream=True, responses={requests[-1].request_id: [ChatMessage(role="user", text="Need escalation")]}) ) requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] assert requests @@ -619,7 +619,7 @@ def create_specialist() -> MockHandoffAgent: assert requests events = await _drain( - workflow.send_responses_streaming({requests[-1].request_id: [ChatMessage(role="user", text="follow up")]}) + workflow.run(stream=True, responses={requests[-1].request_id: [ChatMessage(role="user", text="follow up")]}) ) outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] assert outputs, "Should have workflow output after termination condition is met" diff --git a/python/packages/orchestrations/tests/test_magentic.py b/python/packages/orchestrations/tests/test_magentic.py index 58943cdad4..581da7f81e 100644 --- a/python/packages/orchestrations/tests/test_magentic.py +++ b/python/packages/orchestrations/tests/test_magentic.py @@ -255,7 +255,7 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): completed = False output: list[ChatMessage] | None = None - async for ev in wf.send_responses_streaming(responses={req_event.request_id: req_event.data.approve()}): + async for ev in wf.run(stream=True, responses={req_event.request_id: req_event.data.approve()}): if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: completed = True elif isinstance(ev, WorkflowOutputEvent): @@ -301,16 +301,16 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ # Send a revise response saw_second_review = False completed = False - async for ev in wf.send_responses_streaming( - responses={req_event.request_id: req_event.data.revise("Looks good; consider Z")} + async for ev in wf.run( + stream=True, responses={req_event.request_id: req_event.data.revise("Looks good; consider Z")} ): if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: saw_second_review = True req_event = ev # Approve the second review - async for ev in wf.send_responses_streaming( - responses={req_event.request_id: req_event.data.approve()} # type: ignore[union-attr] + async for ev in wf.run( + stream=True, responses={req_event.request_id: req_event.data.approve()} # type: ignore[union-attr] ): if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: completed = True @@ -401,7 +401,7 @@ async def test_magentic_checkpoint_resume_round_trip(): assert isinstance(req_event.data, MagenticPlanReviewRequest) responses = {req_event.request_id: req_event.data.approve()} - async for event in wf_resume.send_responses_streaming(responses=responses): + async for event in wf_resume.run(stream=True, responses=responses): if isinstance(event, WorkflowOutputEvent): completed = event assert completed is not None diff --git a/python/samples/autogen-migration/orchestrations/03_swarm.py b/python/samples/autogen-migration/orchestrations/03_swarm.py index df398a96ea..f5cf25d468 100644 --- a/python/samples/autogen-migration/orchestrations/03_swarm.py +++ b/python/samples/autogen-migration/orchestrations/03_swarm.py @@ -193,7 +193,7 @@ async def run_agent_framework() -> None: current_executor = None stream_line_open = False - async for event in workflow.send_responses_streaming(responses): + async for event in workflow.run(stream=True, responses=responses): if isinstance(event, WorkflowOutputEvent) and isinstance(event.data, AgentResponseUpdate): # Print executor name header when switching to a new agent if current_executor != event.executor_id: diff --git a/python/samples/getting_started/orchestrations/handoff_participant_factory.py b/python/samples/getting_started/orchestrations/handoff_participant_factory.py index ee5c8830bc..2a40564220 100644 --- a/python/samples/getting_started/orchestrations/handoff_participant_factory.py +++ b/python/samples/getting_started/orchestrations/handoff_participant_factory.py @@ -211,9 +211,9 @@ async def _run_workflow(workflow: Workflow, user_inputs: list[str]) -> None: responses = {req.request_id: HandoffAgentUserRequest.terminate() for req in pending_requests} # Send responses and get new events - # We use send_responses_streaming() to get events as they occur, allowing us to - # display agent responses in real-time and handle new requests as they arrive - workflow_result = await workflow.send_responses(responses) + # We use run(responses=...) to get events, allowing us to + # display agent responses and handle new requests as they arrive + workflow_result = await workflow.run(responses=responses) pending_requests = _handle_events(workflow_result) diff --git a/python/samples/getting_started/orchestrations/handoff_simple.py b/python/samples/getting_started/orchestrations/handoff_simple.py index d439d5a719..f7532e2d9b 100644 --- a/python/samples/getting_started/orchestrations/handoff_simple.py +++ b/python/samples/getting_started/orchestrations/handoff_simple.py @@ -260,9 +260,9 @@ async def main() -> None: } # Send responses and get new events - # We use send_responses() to get events from the workflow, allowing us to + # We use run(responses=...) to get events from the workflow, allowing us to # display agent responses and handle new requests as they arrive - events = await workflow.send_responses(responses) + events = await workflow.run(responses=responses) pending_requests = _handle_events(events) """ diff --git a/python/samples/getting_started/orchestrations/handoff_with_code_interpreter_file.py b/python/samples/getting_started/orchestrations/handoff_with_code_interpreter_file.py index d6b335e15c..1547b7dc93 100644 --- a/python/samples/getting_started/orchestrations/handoff_with_code_interpreter_file.py +++ b/python/samples/getting_started/orchestrations/handoff_with_code_interpreter_file.py @@ -200,7 +200,7 @@ async def main() -> None: print(f"\nUser: {user_input}") responses = {request.request_id: HandoffAgentUserRequest.create_response(user_input)} - events = await _drain(workflow.send_responses_streaming(responses)) + events = await _drain(workflow.run(stream=True, responses=responses)) requests, file_ids = _handle_events(events) all_file_ids.extend(file_ids) input_index += 1 diff --git a/python/samples/getting_started/orchestrations/magentic_checkpoint.py b/python/samples/getting_started/orchestrations/magentic_checkpoint.py index 08b233661b..67fc13187f 100644 --- a/python/samples/getting_started/orchestrations/magentic_checkpoint.py +++ b/python/samples/getting_started/orchestrations/magentic_checkpoint.py @@ -31,7 +31,7 @@ must keep stable IDs so the checkpoint state aligns when we rebuild the graph. 2. **Executor snapshotting** - checkpoints capture the pending plan-review request map, at superstep boundaries. -3. **Resume with responses** - `Workflow.send_responses_streaming` accepts a +3. **Resume with responses** - `Workflow.run(responses=...)` accepts a `responses` mapping so we can inject the stored human reply during restoration. Prerequisites: @@ -159,7 +159,7 @@ async def main() -> None: # Supply the approval and continue to run to completion. final_event: WorkflowOutputEvent | None = None - async for event in resumed_workflow.send_responses_streaming({request_info_event.request_id: approval}): + async for event in resumed_workflow.run(stream=True, responses={request_info_event.request_id: approval}): if isinstance(event, WorkflowOutputEvent): final_event = event diff --git a/python/samples/getting_started/orchestrations/magentic_human_plan_review.py b/python/samples/getting_started/orchestrations/magentic_human_plan_review.py index 9af07ae13f..6fcf05136f 100644 --- a/python/samples/getting_started/orchestrations/magentic_human_plan_review.py +++ b/python/samples/getting_started/orchestrations/magentic_human_plan_review.py @@ -141,14 +141,14 @@ async def main() -> None: print("=" * 60) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run(task, stream=True) pending_responses = await process_event_stream(stream) while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index 4b7eabf9ba..8f7de91419 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -283,7 +283,7 @@ async def main() -> None: ) initial_run = False elif pending_responses is not None: - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = None else: break diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py index 1f7f5659af..1342587091 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py @@ -245,7 +245,7 @@ async def run_interactive_session( while True: if responses: - event_stream = workflow.send_responses_streaming(responses) + event_stream = workflow.run(stream=True, responses=responses) requests.clear() responses = None else: diff --git a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py index 5ab80e37ee..9126fad4e8 100644 --- a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py @@ -40,8 +40,8 @@ Pattern: - Step 1: workflow.run(checkpoint_id=..., stream=True) to restore checkpoint and pending requests. -- Step 2: workflow.send_responses_streaming(responses) to supply human replies and approvals. -- Two-step approach is required because send_responses_streaming does not accept checkpoint_id. +- Step 2: workflow.run(stream=True, responses=responses) to supply human replies and approvals. +- Two-step approach is required because run(responses=...) does not accept checkpoint_id. Prerequisites: - Azure CLI authentication (az login). @@ -230,9 +230,9 @@ async def resume_with_responses( Two-step resume pattern (answers customer questions and tool approvals): Step 1: Restore checkpoint to load pending requests into workflow state - Step 2: Send user responses using send_responses_streaming + Step 2: Send user responses using run(stream=True, responses=responses) - This is the current pattern required because send_responses_streaming + This is the current pattern required because run(responses=...) doesn't accept a checkpoint_id parameter. """ print(f"\n{'=' * 60}") @@ -277,7 +277,7 @@ async def resume_with_responses( new_pending_requests: list[RequestInfoEvent] = [] - async for event in workflow.send_responses_streaming(responses): + async for event in workflow.run(stream=True, responses=responses): if isinstance(event, WorkflowStatusEvent): print(f"[Status] {event.state}") diff --git a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py index 6f8567d02c..792c8fe9e6 100644 --- a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py +++ b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py @@ -376,7 +376,7 @@ async def main() -> None: approval_response = "approve" output_event: WorkflowOutputEvent | None = None - async for event in workflow2.send_responses_streaming({request_info_event.request_id: approval_response}): + async for event in workflow2.run(stream=True, responses={request_info_event.request_id: approval_response}): if isinstance(event, WorkflowOutputEvent): output_event = event diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py index 0959f591f0..b68f078825 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py @@ -347,7 +347,7 @@ async def main() -> None: else: print(f"Unknown request info event data type: {type(event.data)}") - run_result = await main_workflow.send_responses(responses) + run_result = await main_workflow.run(responses=responses) outputs = run_result.get_outputs() if outputs: diff --git a/python/samples/getting_started/workflows/declarative/customer_support/main.py b/python/samples/getting_started/workflows/declarative/customer_support/main.py index 685ff905d5..79a37a1f9d 100644 --- a/python/samples/getting_started/workflows/declarative/customer_support/main.py +++ b/python/samples/getting_started/workflows/declarative/customer_support/main.py @@ -252,7 +252,7 @@ async def main() -> None: # Continue workflow with user response print(f"\n{YELLOW}WORKFLOW:{RESET} Restore\n") response = AgentExternalInputResponse(user_input=user_input) - stream = workflow.send_responses_streaming({pending_request_id: response}) + stream = workflow.run(stream=True, responses={pending_request_id: response}) pending_request_id = None else: # Start workflow diff --git a/python/samples/getting_started/workflows/declarative/function_tools/main.py b/python/samples/getting_started/workflows/declarative/function_tools/main.py index 0fd8dce643..e481d358b8 100644 --- a/python/samples/getting_started/workflows/declarative/function_tools/main.py +++ b/python/samples/getting_started/workflows/declarative/function_tools/main.py @@ -90,7 +90,7 @@ async def main(): while True: if pending_request_id: response = ExternalInputResponse(user_input=user_input) - stream = workflow.send_responses_streaming({pending_request_id: response}) + stream = workflow.run(stream=True, responses={pending_request_id: response}) else: stream = workflow.run({"userInput": user_input}, stream=True) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_HITL.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_HITL.py index 39b4d72086..15ce31c02d 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_HITL.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_HITL.py @@ -203,7 +203,7 @@ async def main() -> None: ) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run( "Create a short launch blurb for the LumenX desk lamp. Emphasize adjustability and warm lighting.", stream=True, @@ -213,7 +213,7 @@ async def main() -> None: while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) print("\nWorkflow complete.") diff --git a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py index 8f73b26438..e06ef98392 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/agents_with_approval_requests.py @@ -249,7 +249,7 @@ async def main() -> None: ) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. events = await workflow.run(incoming_email) request_info_events = events.get_request_info_events() @@ -276,7 +276,7 @@ async def main() -> None: print("Performing automatic approval for demo purposes...") responses[request_info_event.request_id] = data.to_function_approval_response(approved=True) - events = await workflow.send_responses(responses) + events = await workflow.run(responses=responses) request_info_events = events.get_request_info_events() # The output should only come from conclude_workflow executor and it's a single string diff --git a/python/samples/getting_started/workflows/human-in-the-loop/concurrent_request_info.py b/python/samples/getting_started/workflows/human-in-the-loop/concurrent_request_info.py index 178fe028a5..ec717c17aa 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/concurrent_request_info.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/concurrent_request_info.py @@ -186,14 +186,14 @@ async def main() -> None: ) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run("Analyze the impact of large language models on software development.", stream=True) pending_responses = await process_event_stream(stream) while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py b/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py index fb51c5b530..b2a9f2b85c 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py @@ -149,7 +149,7 @@ async def main() -> None: ) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run( "Discuss how our team should approach adopting AI tools for productivity. " "Consider benefits, risks, and implementation strategies.", @@ -160,7 +160,7 @@ async def main() -> None: while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) diff --git a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py index ef03d7bd05..26c4cf527d 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py @@ -31,7 +31,7 @@ Purpose: Show how to integrate a human step in the middle of an LLM workflow by using -`request_info` and `send_responses_streaming`. +`request_info` and `run(responses=..., stream=True)`. Demonstrate: - Alternating turns between an AgentExecutor and a human, driven by events. @@ -44,11 +44,11 @@ - Basic familiarity with WorkflowBuilder, executors, edges, events, and streaming runs. """ -# How human-in-the-loop is achieved via `request_info` and `send_responses_streaming`: +# How human-in-the-loop is achieved via `request_info` and `run(responses=..., stream=True)`: # - An executor (TurnManager) calls `ctx.request_info` with a payload (HumanFeedbackRequest). # - The workflow run pauses and emits a RequestInfoEvent with the payload and the request_id. # - The application captures the event, prompts the user, and collects replies. -# - The application calls `send_responses_streaming` with a map of request_ids to replies. +# - The application calls `run(stream=True, responses=...)` with a map of request_ids to replies. # - The workflow resumes, and the response is delivered to the executor method decorated with @response_handler. # - The executor can then continue the workflow, e.g., by sending a new message to the agent. @@ -205,14 +205,14 @@ async def main() -> None: ).build() # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run("start", stream=True) pending_responses = await process_event_stream(stream) while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) """ diff --git a/python/samples/getting_started/workflows/human-in-the-loop/sequential_request_info.py b/python/samples/getting_started/workflows/human-in-the-loop/sequential_request_info.py index bc9eff94f9..eb6dee31f7 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/sequential_request_info.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/sequential_request_info.py @@ -14,7 +14,7 @@ Demonstrate: - Configuring request info with `.with_request_info()` - Handling RequestInfoEvent with AgentInputRequest data -- Injecting responses back into the workflow via send_responses_streaming +- Injecting responses back into the workflow via run(responses=..., stream=True) Prerequisites: - Azure OpenAI configured for AzureOpenAIChatClient with required environment variables @@ -124,14 +124,14 @@ async def main() -> None: ) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True) pending_responses = await process_event_stream(stream) while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py index aa7b9b5f8c..cf1702e9b3 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py @@ -85,7 +85,7 @@ async def main() -> None: while not output_event: if pending_responses is not None: - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) else: stream = workflow.run(task, stream=True) diff --git a/python/samples/getting_started/workflows/tool-approval/concurrent_builder_tool_approval.py b/python/samples/getting_started/workflows/tool-approval/concurrent_builder_tool_approval.py index cfb425ae7e..a6b32ae6fd 100644 --- a/python/samples/getting_started/workflows/tool-approval/concurrent_builder_tool_approval.py +++ b/python/samples/getting_started/workflows/tool-approval/concurrent_builder_tool_approval.py @@ -157,7 +157,7 @@ async def main() -> None: print("-" * 60) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run( "Manage my portfolio. Use a max of 5000 dollars to adjust my position using " "your best judgment based on market sentiment. No need to confirm trades with me.", @@ -168,7 +168,7 @@ async def main() -> None: while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) """ diff --git a/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py b/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py index eeee1abfb2..1fe9f7dc4e 100644 --- a/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py +++ b/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py @@ -168,7 +168,7 @@ async def main() -> None: print("-" * 60) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run( "We need to deploy version 2.4.0 to production. Please coordinate the deployment.", stream=True ) @@ -177,7 +177,7 @@ async def main() -> None: while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) """ diff --git a/python/samples/getting_started/workflows/tool-approval/sequential_builder_tool_approval.py b/python/samples/getting_started/workflows/tool-approval/sequential_builder_tool_approval.py index d0e234e1db..2e328c0e75 100644 --- a/python/samples/getting_started/workflows/tool-approval/sequential_builder_tool_approval.py +++ b/python/samples/getting_started/workflows/tool-approval/sequential_builder_tool_approval.py @@ -37,7 +37,7 @@ Demonstrate: - Using @tool(approval_mode="always_require") for sensitive operations. - Handling RequestInfoEvent with function_approval_request Content in sequential workflows. -- Resuming workflow execution after approval via send_responses_streaming. +- Resuming workflow execution after approval via run(responses=..., stream=True). Prerequisites: - OpenAI or Azure OpenAI configured with the required environment variables. @@ -118,7 +118,7 @@ async def main() -> None: print("-" * 60) # Initiate the first run of the workflow. - # Runs are not isolated; state is preserved across multiple calls to run or send_responses_streaming. + # Runs are not isolated; state is preserved across multiple calls to run. stream = workflow.run( "Check the schema and then update all orders with status 'pending' to 'processing'", stream=True ) @@ -127,7 +127,7 @@ async def main() -> None: while pending_responses is not None: # Run the workflow until there is no more human feedback to provide, # in which case this workflow completes. - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run(stream=True, responses=pending_responses) pending_responses = await process_event_stream(stream) """ diff --git a/python/samples/semantic-kernel-migration/orchestrations/handoff.py b/python/samples/semantic-kernel-migration/orchestrations/handoff.py index f2333c0fb5..a248c3610a 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/handoff.py +++ b/python/samples/semantic-kernel-migration/orchestrations/handoff.py @@ -255,7 +255,7 @@ async def run_agent_framework_example(initial_task: str, scripted_responses: Seq except StopIteration: user_reply = "Thanks, that's all." responses = {request.request_id: user_reply for request in pending} - final_events = await _drain_events(workflow.send_responses_streaming(responses)) + final_events = await _drain_events(workflow.run(stream=True, responses=responses)) pending = _collect_handoff_requests(final_events) conversation = _extract_final_conversation(final_events) From 0e960d2a488a478da439c5de14d33a962921f629 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 6 Feb 2026 18:50:35 +0900 Subject: [PATCH 2/3] Fixes --- .../_workflows/_typing_utils.py | 48 +++++++++++++++++++ .../agent_framework/_workflows/_workflow.py | 10 ++-- ...ff_with_tool_approval_checkpoint_resume.py | 31 +++++------- 3 files changed, 68 insertions(+), 21 deletions(-) diff --git a/python/packages/core/agent_framework/_workflows/_typing_utils.py b/python/packages/core/agent_framework/_workflows/_typing_utils.py index ca1e358546..5bff0900b6 100644 --- a/python/packages/core/agent_framework/_workflows/_typing_utils.py +++ b/python/packages/core/agent_framework/_workflows/_typing_utils.py @@ -177,6 +177,54 @@ def is_instance_of(data: Any, target_type: type | UnionType | Any) -> bool: return isinstance(data, target_type) +def try_coerce_to_type(data: Any, target_type: type | UnionType | Any) -> Any: + """Try to coerce data to the target type. + + Attempts lightweight type coercion for common cases where raw data + (e.g., from JSON deserialization) needs to be converted to the expected type. + + Returns the coerced value if successful, or the original value if coercion + is not needed or not possible. + + Args: + data: The data to coerce. + target_type: The type to coerce to. + + Returns: + The coerced value, or the original value if coercion fails. + """ + # If already the right type, return as-is + if is_instance_of(data, target_type): + return data + + # Can't coerce to non-concrete targets (Union, generic, etc.) + if not isinstance(target_type, type): + return data + + # int -> float (JSON integers for float fields) + if isinstance(data, int) and target_type is float: + return float(data) + + # dict -> dataclass + if isinstance(data, dict): + from dataclasses import is_dataclass + + if is_dataclass(target_type): + try: + return target_type(**data) + except (TypeError, ValueError): + return data + + # dict -> Pydantic model + if hasattr(target_type, "model_validate"): + try: + return target_type.model_validate(data) + except Exception: + return data + + return data + + def serialize_type(t: type) -> str: """Serialize a type to a string. diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index ff690aedaa..e8d7f6f155 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -32,7 +32,7 @@ from ._runner import Runner from ._runner_context import RunnerContext from ._state import State -from ._typing_utils import is_instance_of +from ._typing_utils import is_instance_of, try_coerce_to_type logger = logging.getLogger(__name__) @@ -685,20 +685,24 @@ async def _send_responses_internal(self, responses: dict[str, Any]) -> None: if not pending_requests: raise RuntimeError("No pending requests found in workflow context.") - # Validate responses against pending requests + # Validate and coerce responses against pending requests + coerced_responses: dict[str, Any] = {} for request_id, response in responses.items(): if request_id not in pending_requests: raise ValueError(f"Response provided for unknown request ID: {request_id}") pending_request = pending_requests[request_id] + # Try to coerce raw values (e.g., dicts from JSON) to the expected type + response = try_coerce_to_type(response, pending_request.response_type) if not is_instance_of(response, pending_request.response_type): raise ValueError( f"Response type mismatch for request ID {request_id}: " f"expected {pending_request.response_type}, got {type(response)}" ) + coerced_responses[request_id] = response await asyncio.gather(*[ self._runner_context.send_request_info_response(request_id, response) - for request_id, response in responses.items() + for request_id, response in coerced_responses.items() ]) def _get_executor_by_id(self, executor_id: str) -> Executor: diff --git a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py index e7dc68596e..88d766e2da 100644 --- a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py @@ -23,22 +23,21 @@ """ Sample: Handoff Workflow with Tool Approvals + Checkpoint Resume -Demonstrates the two-step pattern for resuming a handoff workflow from a checkpoint -while handling both HandoffAgentUserRequest prompts and function approval request Content -for tool calls (e.g., submit_refund). +Demonstrates resuming a handoff workflow from a checkpoint while handling both +HandoffAgentUserRequest prompts and function approval request Content for tool calls +(e.g., submit_refund). Scenario: 1. User starts a conversation with the workflow. 2. Agents may emit user input requests or tool approval requests. 3. Workflow writes a checkpoint capturing pending requests and pauses. 4. Process can exit/restart. -5. On resume: Load the checkpoint, surface pending approvals/user prompts, and provide responses. +5. On resume: Restore checkpoint and provide responses in a single call. 6. Workflow continues from the saved state. Pattern: -- Step 1: workflow.run(checkpoint_id=..., stream=True) to restore checkpoint and pending requests. -- Step 2: workflow.run(stream=True, responses=responses) to supply human replies and approvals. -- Two-step approach is required because run(responses=...) does not accept checkpoint_id. +- workflow.run(stream=True, checkpoint_id=..., responses=responses) restores the checkpoint + and sends human replies/approvals in a single call. Prerequisites: - Azure CLI authentication (az login). @@ -228,13 +227,10 @@ async def resume_with_responses( approve_tools: bool | None = None, ) -> tuple[list[WorkflowEvent], str | None]: """ - Two-step resume pattern (answers customer questions and tool approvals): + Resume from checkpoint and send responses in a single call. - Step 1: Restore checkpoint to load pending requests into workflow state - Step 2: Send user responses using run(stream=True, responses=responses) - - This is the current pattern required because run(responses=...) - doesn't accept a checkpoint_id parameter. + Uses workflow.run(stream=True, checkpoint_id=..., responses=...) to restore + the checkpoint and deliver human replies/approvals atomically. """ print(f"\n{'=' * 60}") print("RESUMING WORKFLOW WITH HUMAN INPUT") @@ -253,10 +249,9 @@ async def resume_with_responses( checkpoints.sort(key=lambda cp: cp.timestamp, reverse=True) latest_checkpoint = checkpoints[0] - print(f"Step 1: Restoring checkpoint {latest_checkpoint.checkpoint_id}") + print(f"Restoring checkpoint {latest_checkpoint.checkpoint_id}") - # Step 1: Restore the checkpoint to load pending requests into memory - # The checkpoint restoration re-emits pending request_info events + # First, restore checkpoint to discover pending requests restored_requests: list[WorkflowEvent] = [] async for event in workflow.run(checkpoint_id=latest_checkpoint.checkpoint_id, stream=True): # type: ignore[attr-defined] if event.type == "request_info": @@ -274,7 +269,7 @@ async def resume_with_responses( user_response=user_response, approve_tools=approve_tools, ) - print(f"Step 2: Sending responses for {len(responses)} request(s)") + print(f"Sending responses for {len(responses)} request(s)") new_pending_requests: list[WorkflowEvent] = [] @@ -309,7 +304,7 @@ async def main() -> None: This sample shows: 1. Starting a workflow and getting a HandoffAgentUserRequest 2. Pausing (checkpoint is saved automatically) - 3. Resuming from checkpoint with a user response or tool approval (two-step pattern) + 3. Resuming from checkpoint with a user response or tool approval 4. Continuing the conversation until completion """ From c7c0ae268ba8907633630b93e33a08246301ac4f Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 6 Feb 2026 19:04:08 +0900 Subject: [PATCH 3/3] Address copilot feedback --- .../core/tests/workflow/test_typing_utils.py | 70 +++++++++++++++++++ ...ff_with_tool_approval_checkpoint_resume.py | 18 +++-- 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/python/packages/core/tests/workflow/test_typing_utils.py b/python/packages/core/tests/workflow/test_typing_utils.py index 19973276f5..ab483e05e9 100644 --- a/python/packages/core/tests/workflow/test_typing_utils.py +++ b/python/packages/core/tests/workflow/test_typing_utils.py @@ -13,6 +13,7 @@ normalize_type_to_list, resolve_type_annotation, serialize_type, + try_coerce_to_type, ) # region: normalize_type_to_list tests @@ -420,3 +421,72 @@ class Message: # Incompatible nested structure incompatible_target = list[dict[Union[str, bytes], int]] assert not is_type_compatible(source, incompatible_target) + + +# region: try_coerce_to_type tests + + +def test_coerce_already_correct_type() -> None: + """Values already matching the target type are returned as-is.""" + assert try_coerce_to_type(42, int) == 42 + assert try_coerce_to_type("hello", str) == "hello" + assert try_coerce_to_type(True, bool) is True + + +def test_coerce_int_to_float() -> None: + """JSON integers should be coercible to float.""" + result = try_coerce_to_type(1, float) + assert result == 1.0 + assert isinstance(result, float) + + +def test_coerce_dict_to_dataclass() -> None: + """Dicts (from JSON) should be coercible to dataclasses.""" + + @dataclass + class Point: + x: int + y: int + + result = try_coerce_to_type({"x": 1, "y": 2}, Point) + assert isinstance(result, Point) + assert result.x == 1 + assert result.y == 2 + + +def test_coerce_dict_to_dataclass_bad_keys_returns_original() -> None: + """Dicts with wrong keys should return the original dict, not raise.""" + + @dataclass + class Point: + x: int + y: int + + original = {"a": 1, "b": 2} + result = try_coerce_to_type(original, Point) + assert result is original + + +def test_coerce_non_concrete_target_returns_original() -> None: + """Union and other non-concrete types should return the original value.""" + result = try_coerce_to_type(42, int | str) + assert result == 42 + + result = try_coerce_to_type({"x": 1}, Union[str, int]) + assert result == {"x": 1} + + +def test_coerce_unrelated_types_returns_original() -> None: + """Coercion between unrelated types should return the original value.""" + assert try_coerce_to_type("hello", int) == "hello" + assert try_coerce_to_type(3.14, str) == 3.14 + assert try_coerce_to_type([1, 2], dict) == [1, 2] + + +def test_coerce_any_returns_original() -> None: + """Any target type should accept any value without coercion.""" + assert try_coerce_to_type(42, Any) == 42 + assert try_coerce_to_type({"k": "v"}, Any) == {"k": "v"} + + +# endregion: try_coerce_to_type tests diff --git a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py index 88d766e2da..a89a848257 100644 --- a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py @@ -32,12 +32,15 @@ 2. Agents may emit user input requests or tool approval requests. 3. Workflow writes a checkpoint capturing pending requests and pauses. 4. Process can exit/restart. -5. On resume: Restore checkpoint and provide responses in a single call. +5. On resume: Restore checkpoint, inspect pending requests, then provide responses. 6. Workflow continues from the saved state. Pattern: -- workflow.run(stream=True, checkpoint_id=..., responses=responses) restores the checkpoint - and sends human replies/approvals in a single call. +- workflow.run(checkpoint_id=..., stream=True) to restore checkpoint and discover pending requests. +- workflow.run(stream=True, responses=responses) to supply human replies and approvals. + (Two steps are needed here because the sample must inspect request types before building responses. + When response payloads are already known, use the single-call form: + workflow.run(stream=True, checkpoint_id=..., responses=responses).) Prerequisites: - Azure CLI authentication (az login). @@ -227,10 +230,13 @@ async def resume_with_responses( approve_tools: bool | None = None, ) -> tuple[list[WorkflowEvent], str | None]: """ - Resume from checkpoint and send responses in a single call. + Resume from checkpoint and send responses. - Uses workflow.run(stream=True, checkpoint_id=..., responses=...) to restore - the checkpoint and deliver human replies/approvals atomically. + Step 1: Restore checkpoint to discover pending request types. + Step 2: Build typed responses and send via workflow.run(responses=...). + + When response payloads are already known, these can be combined into a single + workflow.run(stream=True, checkpoint_id=..., responses=...) call. """ print(f"\n{'=' * 60}") print("RESUMING WORKFLOW WITH HUMAN INPUT")