-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Closed
Copy link
Labels
agent orchestrationIssues related to agent orchestrationIssues related to agent orchestrationpythonv1.0Features being tracked for the version 1.0 GAFeatures being tracked for the version 1.0 GAworkflowsRelated to Workflows in agent-frameworkRelated to Workflows in agent-framework
Description
Summary
Multiple concerns about the CheckpointStorage protocol including return types, querying capabilities, and behavior when multiple workflow instances write to the same storage.
Files
| File | Line | Concern |
|---|---|---|
_checkpoint.py |
list_checkpoint_ids |
Return type should use type alias, IDs may not be consistent |
_checkpoint.py |
list_checkpoints |
Improve API: add .get_latest(), better error handling |
_checkpoint.py |
after delete_checkpoint |
Parallel workflow instances writing to same storage |
Context
Return Type Consistency:
# TODO: change the return type (via type alias) can be str from functional perpective
# Concrete type
# IDs may not be consitent (str -> tuple[str, str]) or something
# checkpoint_ids (not list)
async def list_checkpoint_ids(self, workflow_id: str | None = None) -> list[CheckpointId]:The CheckpointId type may evolve (e.g., from str to tuple[str, str]). Need a type alias to insulate users from changes.
API Improvements:
# TODO: improve this API .get_latest(), error, human in the list
# allow users to specify a workflow_id when building?
# workflow_id should be an opaque type
async def list_checkpoints(self, workflow_id: str | None = None) -> list[WorkflowCheckpoint]:Common use case of "get the latest checkpoint" is not well-supported. Also, workflow_id should be an opaque type.
Multi-Instance Safety:
# TODO: when running two workflow instances in parallel, we could be writing to
# the same storage. How do we know if we are referencing the right checkpoints?No isolation between concurrent workflow instances sharing the same storage.
Action Items
- Introduce
CheckpointIdtype alias that can evolve without breaking changes - Add
get_latest_checkpoint(workflow_id)convenience method - Make
workflow_idan opaque type - Design isolation mechanism for concurrent workflow instances
- Document expected behavior for shared storage scenarios
- Add documentation for specifying checkpointing via
workflow.run()
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
agent orchestrationIssues related to agent orchestrationIssues related to agent orchestrationpythonv1.0Features being tracked for the version 1.0 GAFeatures being tracked for the version 1.0 GAworkflowsRelated to Workflows in agent-frameworkRelated to Workflows in agent-framework
Type
Projects
Status
Done