Python: fix(core): prevent WorkflowExecutor from re-sending answered requests after checkpoint restore#3293
Python: fix(core): prevent WorkflowExecutor from re-sending answered requests after checkpoint restore#3293moonbox3 wants to merge 1 commit intomicrosoft:mainfrom
Conversation
… after checkpoint restore
Python Test Coverage Report •
Python Unit Test Overview
|
||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Pull request overview
This PR fixes a bug where WorkflowExecutor would re-send already-answered RequestInfoEvents after checkpoint restore, causing duplicate requests and incorrect response counts that led to hanging workflows.
Changes:
- Added
_responded_request_idsset to track which requests have been answered - Updated checkpoint save/restore to persist this tracking state
- Added filtering logic in
_process_workflow_result()to skip duplicate requests - Added comprehensive regression tests for checkpoint restore scenarios
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| python/packages/core/agent_framework/_workflows/_workflow_executor.py | Implements the fix by tracking responded request IDs and filtering duplicates after checkpoint restore |
| python/packages/core/tests/workflow/test_sub_workflow.py | Adds comprehensive regression tests that verify the fix and full workflow completion after checkpoint restore |
There was a problem hiding this comment.
I am not sure if this is needed and what the root cause actually is.
When a request is fulfilled, it's taken off record immediately and won't be available in the next checkpoint. My understanding is that a workflow emits a request, the executor captures it and send it out as an event or a message, followed by a checkpoint (A), which will have all the pending requests. When a respond comes back, the request will be taken off record, and processing will begin, followed by the next checkpoint (B).
If the workflow is resumed from checkpoint A, the request will be reemitted.
There was a problem hiding this comment.
The fix is needed because on_checkpoint_restore() re-adds pending requests to the sub-workflow's event queue. When a response arrives, we remove the request from WorkflowExecutor's tracking, but it's still in the sub-workflow's event stream.
So when the sub-workflow continues and makes another request_info() call, result.get_request_info_events() returns both the old (answered) request and the new one, causing duplicate SubWorkflowRequestMessages and incorrect expected_response_count.
_responded_request_ids filters these out. The proper fix would be sub-workflow-level checkpoint tracking (Issue #1614), but this works until then.
There was a problem hiding this comment.
It shouldn't be in the sub workflow's event queue because the event has been emitted. When a checkpoint is created, the event queue should be empty. This is guaranteed by the runner.
There was a problem hiding this comment.
You're right that the event queue is empty at checkpoint time. The issue is on_checkpoint_restore() (lines 519-527) which explicitly re-adds pending requests to the sub-workflow's event queue:
await asyncio.gather(*[
self.workflow._runner_context.add_request_info_event(event)
for event in request_info_events
])When _handle_response() later calls send_responses() on the sub-workflow, run_until_convergence() drains these pre-loop events (lines 88-92 in _runner.py) and they end up in the WorkflowRunResult.
The rehydration is intentional (marked as "temporary solution" with TODO #1614), so we need _responded_request_ids to filter them out.
There was a problem hiding this comment.
I don't see an issue with the flow you're describing here. If the checkpoint contains the pending requests, they should be re-emitted when the checkpoint is loaded.
|
We will handle this in a different manner. |
Motivation and Context
When a sub-workflow managed by
WorkflowExecutorwas resumed from a checkpoint and responded to a pending request, any subsequentrequest_info()calls would cause the already-answered request to be re-sent to the parent workflow alongside the new request. This resulted in duplicate requests and incorrectexpected_response_count, causing workflows to hang or throw errors.WorkflowExecutorre-sent already-answeredRequestInfoEvents after checkpoint restore_responded_request_idsset to track which requests have been answered_process_workflow_result()before sending to parent workflowDescription
Contribution Checklist