Python: Improve checkpoint storage API - types, querying, and multi-instance safety #3529

@moonbox3

Summary

The CheckpointStorage protocol raises several concerns: return types, querying capabilities, and behavior when multiple workflow instances write to the same storage.

Files

File           | Location                | Concern
_checkpoint.py | list_checkpoint_ids     | Return type should use a type alias; IDs may not be consistent
_checkpoint.py | list_checkpoints        | Improve API: add .get_latest(), better error handling
_checkpoint.py | after delete_checkpoint | Parallel workflow instances writing to the same storage

Context

Return Type Consistency:

# TODO: change the return type (via type alias); can be str from a functional perspective
# Concrete type
# IDs may not be consistent (str -> tuple[str, str]) or something
# checkpoint_ids (not list)
async def list_checkpoint_ids(self, workflow_id: str | None = None) -> list[CheckpointId]:

The CheckpointId type may evolve (e.g., from str to tuple[str, str]). A type alias is needed to insulate users from such changes.

API Improvements:

# TODO: improve this API .get_latest(), error, human in the list
# allow users to specify a workflow_id when building?
# workflow_id should be an opaque type
async def list_checkpoints(self, workflow_id: str | None = None) -> list[WorkflowCheckpoint]:

The common use case of getting the latest checkpoint is not well supported. In addition, workflow_id should be an opaque type.

Multi-Instance Safety:

# TODO: when running two workflow instances in parallel, we could be writing to
# the same storage. How do we know if we are referencing the right checkpoints?

There is no isolation between concurrent workflow instances that share the same storage.
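One way to think about isolation, offered only as a sketch and not as the design this issue settles on, is to scope every read and write by a key that is unique to a running instance. ScopedCheckpointStore and its methods below are hypothetical and in-memory only.

```python
import uuid


class ScopedCheckpointStore:
    """Toy in-memory store that partitions checkpoints per run (hypothetical)."""

    def __init__(self) -> None:
        # Keyed by (workflow_id, run_id) so parallel runs never share a list.
        self._data: dict[tuple[str, str], list[str]] = {}

    def new_run(self, workflow_id: str) -> tuple[str, str]:
        """Create a scope key unique to one running instance of workflow_id."""
        scope = (workflow_id, uuid.uuid4().hex)
        self._data[scope] = []
        return scope

    def save(self, scope: tuple[str, str], checkpoint_id: str) -> None:
        self._data[scope].append(checkpoint_id)

    def list_ids(self, scope: tuple[str, str]) -> list[str]:
        # Only checkpoints written under this exact scope are visible.
        return list(self._data[scope])
```

With this shape, two instances of the same workflow each call new_run() and see only their own checkpoints. A real backend would also need to persist the run ID so a resumed instance can find its scope again, which this sketch does not address.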

Action Items

  • Introduce CheckpointId type alias that can evolve without breaking changes
  • Add get_latest_checkpoint(workflow_id) convenience method
  • Make workflow_id an opaque type
  • Design isolation mechanism for concurrent workflow instances
  • Document expected behavior for shared storage scenarios
  • Add documentation for specifying checkpointing via workflow.run()

Metadata

Labels

  • agent orchestration: Issues related to agent orchestration
  • python
  • v1.0: Features being tracked for the version 1.0 GA
  • workflows: Related to Workflows in agent-framework

Projects

Status

Done
