From 0ce1d8230f00ecc463068d891a9853cea75e4b47 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 20 Nov 2025 08:06:38 +0900 Subject: [PATCH 1/7] Handoff sample --- .../checkpoint/HANDOFF_CHECKPOINT_README.md | 120 +++++++ .../checkpoint/handoff_checkpoint_resume.py | 330 ++++++++++++++++++ python/test_checkpoint_encoding.py | 96 +++++ python/test_checkpoint_exact_flow.py | 126 +++++++ 4 files changed, 672 insertions(+) create mode 100644 python/samples/getting_started/workflows/checkpoint/HANDOFF_CHECKPOINT_README.md create mode 100644 python/samples/getting_started/workflows/checkpoint/handoff_checkpoint_resume.py create mode 100644 python/test_checkpoint_encoding.py create mode 100644 python/test_checkpoint_exact_flow.py diff --git a/python/samples/getting_started/workflows/checkpoint/HANDOFF_CHECKPOINT_README.md b/python/samples/getting_started/workflows/checkpoint/HANDOFF_CHECKPOINT_README.md new file mode 100644 index 0000000000..9748e7e151 --- /dev/null +++ b/python/samples/getting_started/workflows/checkpoint/HANDOFF_CHECKPOINT_README.md @@ -0,0 +1,120 @@ +# Handoff Pattern with Checkpointing: Pause/Resume Support + +## Customer Question + +> "When `HandoffUserInputRequest` is generated I need to pause the workflow, send the request to the user interface, and when I get the user message I should resume the workflow sending the response. In order to suspend and resume the workflow I was looking at the checkpoint concept. +> +> Question: How should I resume the workflow providing the user response (for the `HandoffUserInputRequest`) back to the specific workflow instance? +> +> The problem is that `workflow.send_responses_streaming` doesn't have a `checkpoint` param, only `workflow.run_stream` can accept a checkpoint." + +## Answer: YES, This Pattern is Supported + +The Handoff pattern **does support** checkpoint-based pause/resume with user input requests. However, it requires a **two-step pattern** because, as the customer correctly observed, `send_responses_streaming()` doesn't accept a `checkpoint_id` parameter. + +## The Two-Step Resume Pattern + +```python +# Step 1: Restore the checkpoint to load pending requests into workflow state +async for event in workflow.run_stream(checkpoint_id="checkpoint_123"): + if isinstance(event, RequestInfoEvent): + pending_request_ids.append(event.request_id) + break # Stop after checkpoint restoration + +# Step 2: Reset workflow's internal running flags (required) +if hasattr(workflow, "_is_running"): + workflow._is_running = False +if hasattr(workflow, "_runner") and hasattr(workflow._runner, "_running"): + workflow._runner._running = False + +# Step 3: Send user responses +responses = {req_id: user_response for req_id in pending_request_ids} +async for event in workflow.send_responses_streaming(responses): + # Process events... +``` + +## Why This Pattern Works + +1. **Checkpoint Restoration**: `run_stream(checkpoint_id=...)` restores the workflow state including pending `HandoffUserInputRequest` events +2. **In-Memory State**: The checkpoint loads pending requests into the workflow's in-memory state +3. **Response Delivery**: `send_responses_streaming(responses)` sends responses to those restored pending requests +4. **Stateless HTTP Compatible**: This pattern works for stateless HTTP scenarios where the workflow instance doesn't persist between requests + +## Complete Working Sample + +See: `handoff_checkpoint_resume.py` + +This sample demonstrates: +- Starting a handoff workflow +- Receiving a `HandoffUserInputRequest` +- Pausing (checkpoint saved automatically) +- **Simulating process restart** (creating new workflow instance) +- Resuming from checkpoint with user response (two-step pattern) +- Continuing the conversation + +## Key Architectural Points + +### Why Not a Single `send_responses_streaming(responses, checkpoint_id)` Call? + +The current architecture separates concerns: +- `run_stream(checkpoint_id)` - State restoration (loading checkpoints) +- `send_responses_streaming(responses)` - Response delivery (workflow execution) + +This separation actually enables the pattern to work correctly because: +1. Checkpoint restoration must happen first to populate pending requests +2. Response validation occurs against the restored pending requests +3. The workflow must be in a specific internal state before accepting responses + +### Comparison to Other Checkpoint Samples + +Unlike `checkpoint_with_human_in_the_loop.py` which uses a simple request/response executor, the Handoff pattern: +- Uses `HandoffUserInputRequest` (instead of custom request types) +- Manages conversation state automatically +- Handles multi-agent routing +- Requires the two-step pattern for stateless scenarios + +## Implementation Note from DevUI + +The DevUI package uses this exact pattern for stateless HTTP scenarios: + +```python +# From agent_framework_devui/_executor.py +# Step 1: Restore checkpoint +async for _event in workflow.run_stream(checkpoint_id=checkpoint_id, checkpoint_storage=storage): + restored = True + break # Stop immediately after restoration + +# Step 2: Reset flags +if hasattr(workflow, "_is_running"): + workflow._is_running = False +if hasattr(workflow, "_runner") and hasattr(workflow._runner, "_running"): + workflow._runner._running = False + +# Step 3: Send responses +async for event in workflow.send_responses_streaming(responses): + # Process events... +``` + +## Future Enhancement Consideration + +A potential framework enhancement could provide: +```python +# Hypothetical future API (not currently supported) +async for event in workflow.run_stream( + checkpoint_id="checkpoint_123", + responses={"request_id": "user response"} +): + # Combined checkpoint restoration + response delivery +``` + +However, the current two-step pattern is the supported and working approach. + +## Summary + +**YES** - The Handoff pattern supports checkpoint-based pause/resume with `HandoffUserInputRequest`. + +**Pattern**: Use the two-step approach: +1. `workflow.run_stream(checkpoint_id=...)` +2. `workflow.send_responses_streaming(responses)` + +This is the documented and supported pattern for stateless scenarios where workflow instances don't persist between requests. diff --git a/python/samples/getting_started/workflows/checkpoint/handoff_checkpoint_resume.py b/python/samples/getting_started/workflows/checkpoint/handoff_checkpoint_resume.py new file mode 100644 index 0000000000..02b1ae49ba --- /dev/null +++ b/python/samples/getting_started/workflows/checkpoint/handoff_checkpoint_resume.py @@ -0,0 +1,330 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Sample: Handoff Workflow with Checkpoint-Based Pause/Resume + +Demonstrates the two-step pattern for resuming a handoff workflow from a checkpoint +and providing user responses to HandoffUserInputRequest. + +Scenario: +1. User starts a conversation with the workflow +2. Workflow requests user input (HandoffUserInputRequest is emitted) +3. Workflow pauses and saves a checkpoint +4. Process can exit/restart +5. On resume: Load checkpoint + provide user response +6. Workflow continues from where it left off + +Pattern: +- Step 1: workflow.run_stream(checkpoint_id=...) to restore checkpoint +- Step 2: workflow.send_responses_streaming(responses) to provide user input +- This two-step approach is required because send_responses_streaming doesn't accept checkpoint_id + +Prerequisites: +- Azure CLI authentication (az login) +- Environment variables configured for AzureOpenAIChatClient +""" + +import asyncio +import logging +from pathlib import Path +from typing import cast + +from agent_framework import ( + ChatAgent, + ChatMessage, + FileCheckpointStorage, + HandoffBuilder, + HandoffUserInputRequest, + RequestInfoEvent, + Workflow, + WorkflowOutputEvent, + WorkflowStatusEvent, +) +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential + +CHECKPOINT_DIR = Path(__file__).parent / "tmp" / "handoff_checkpoints" +CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True) + + +def create_agents(client: AzureOpenAIChatClient) -> tuple[ChatAgent, ChatAgent, ChatAgent]: + """Create a simple handoff scenario: triage, refund, and order specialists.""" + + triage = client.create_agent( + name="triage_agent", + instructions=( + "You are a customer service triage agent. Listen to customer issues and determine " + "if they need refund help or order tracking. Use handoff_to_refund_agent or " + "handoff_to_order_agent to transfer them." + ), + ) + + refund = client.create_agent( + name="refund_agent", + instructions=( + "You are a refund specialist. Help customers with refund requests. " + "Be empathetic and ask for order numbers if not provided." + ), + ) + + order = client.create_agent( + name="order_agent", + instructions=( + "You are an order tracking specialist. Help customers track their orders. " + "Ask for order numbers and provide shipping updates." + ), + ) + + return triage, refund, order + + +def create_workflow(checkpoint_storage: FileCheckpointStorage) -> tuple[Workflow, ChatAgent, ChatAgent, ChatAgent]: + """Build the handoff workflow with checkpointing enabled.""" + + client = AzureOpenAIChatClient(credential=AzureCliCredential()) + triage, refund, order = create_agents(client) + + workflow = ( + HandoffBuilder( + name="checkpoint_handoff_demo", + participants=[triage, refund, order], + ) + .set_coordinator("triage_agent") + .with_checkpointing(checkpoint_storage) + .with_termination_condition( + # Terminate after 5 user messages for this demo + lambda conv: sum(1 for msg in conv if msg.role.value == "user") >= 5 + ) + .build() + ) + + return workflow, triage, refund, order + + +async def run_until_user_input_needed( + workflow: Workflow, + initial_message: str | None = None, + checkpoint_id: str | None = None, +) -> tuple[list[RequestInfoEvent], str | None]: + """ + Run the workflow until it needs user input or completes. + + Returns: + Tuple of (pending_requests, checkpoint_id_to_use_for_resume) + """ + pending_requests: list[RequestInfoEvent] = [] + latest_checkpoint_id: str | None = checkpoint_id + + if initial_message: + print(f"\nStarting workflow with: {initial_message}\n") + event_stream = workflow.run_stream(message=initial_message) # type: ignore[attr-defined] + elif checkpoint_id: + print(f"\nResuming workflow from checkpoint: {checkpoint_id}\n") + event_stream = workflow.run_stream(checkpoint_id=checkpoint_id) # type: ignore[attr-defined] + else: + raise ValueError("Must provide either initial_message or checkpoint_id") + + async for event in event_stream: + if isinstance(event, WorkflowStatusEvent): + print(f"[Status] {event.state}") + + elif isinstance(event, RequestInfoEvent): + if isinstance(event.data, HandoffUserInputRequest): + print(f"\n{'=' * 60}") + print("WORKFLOW PAUSED - User input needed") + print(f"Request ID: {event.request_id}") + print(f"Awaiting agent: {event.data.awaiting_agent_id}") + print(f"Prompt: {event.data.prompt}") + + # Print conversation history + print("\nConversation so far:") + for msg in event.data.conversation[-3:]: # Show last 3 messages + author = msg.author_name or msg.role.value + print(f" {author}: {msg.text[:80]}...") + + print(f"{'=' * 60}\n") + pending_requests.append(event) + + elif isinstance(event, WorkflowOutputEvent): + print("\n[Workflow Completed]") + if event.data: + print(f"Final conversation length: {len(event.data)} messages") + return [], None + + # Workflow paused with pending requests + # The latest checkpoint was created at the end of the last superstep + # We'll use the checkpoint storage to find it + return pending_requests, latest_checkpoint_id + + +async def resume_with_response( + workflow: Workflow, + checkpoint_storage: FileCheckpointStorage, + user_response: str, +) -> tuple[list[RequestInfoEvent], str | None]: + """ + Two-step resume pattern (answers customer's question): + + Step 1: Restore checkpoint to load pending requests into workflow state + Step 2: Send user responses using send_responses_streaming + + This is the current pattern required because send_responses_streaming + doesn't accept a checkpoint_id parameter. + """ + print(f"\n{'=' * 60}") + print("RESUMING WORKFLOW WITH USER RESPONSE") + print(f"User says: {user_response}") + print(f"{'=' * 60}\n") + + # Get the latest checkpoint + checkpoints = await checkpoint_storage.list_checkpoints() + if not checkpoints: + raise RuntimeError("No checkpoints found to resume from") + + # Sort by timestamp to get latest + checkpoints.sort(key=lambda cp: cp.timestamp, reverse=True) + latest_checkpoint = checkpoints[0] + + print(f"Step 1: Restoring checkpoint {latest_checkpoint.checkpoint_id}") + + # Step 1: Restore the checkpoint to load pending requests into memory + # The checkpoint restoration re-emits pending RequestInfoEvents + pending_request_ids: list[str] = [] + async for event in workflow.run_stream(checkpoint_id=latest_checkpoint.checkpoint_id): # type: ignore[attr-defined] + if isinstance(event, RequestInfoEvent) and isinstance(event.data, HandoffUserInputRequest): + pending_request_ids.append(event.request_id) + print(f"Found pending request: {event.request_id}") + + if not pending_request_ids: + raise RuntimeError("No pending requests found after checkpoint restoration") + + print(f"Step 2: Sending user response for {len(pending_request_ids)} request(s)") + + # Step 2: Send the user's response + responses = {req_id: user_response for req_id in pending_request_ids} + + new_pending_requests: list[RequestInfoEvent] = [] + + async for event in workflow.send_responses_streaming(responses): + if isinstance(event, WorkflowStatusEvent): + print(f"[Status] {event.state}") + + elif isinstance(event, WorkflowOutputEvent): + # Workflow completed or paused - show the conversation + print("\n[Workflow Output Event - Conversation Update]") + if event.data and isinstance(event.data, list): + # Cast event.data to list[ChatMessage] for type checking + conversation = cast(list[ChatMessage], event.data) # type: ignore + for msg in conversation[-3:]: # Show last 3 messages + if isinstance(msg, ChatMessage): + author = msg.author_name or msg.role.value + text = msg.text[:100] + "..." if len(msg.text) > 100 else msg.text + print(f" {author}: {text}") + + elif isinstance(event, RequestInfoEvent): + if isinstance(event.data, HandoffUserInputRequest): + print(f"\n{'=' * 60}") + print("WORKFLOW PAUSED AGAIN - User input needed") + print(f"Request ID: {event.request_id}") + print(f"Awaiting agent: {event.data.awaiting_agent_id}") + + # Show recent agent responses (last 3 messages excluding initial user message) + print("\nRecent conversation:") + recent_msgs = [m for m in event.data.conversation[-4:] if m.role.value != "user"][-2:] + for msg in recent_msgs: + author = msg.author_name or msg.role.value + print(f"\n [{author}]:") + print(f" {msg.text}") + + print(f"{'=' * 60}") + new_pending_requests.append(event) + + elif isinstance(event, WorkflowOutputEvent): + print("\n[Workflow Completed]") + print(f"Final conversation length: {len(event.data)} messages") # type: ignore[arg-type] + return [], None + + return new_pending_requests, latest_checkpoint.checkpoint_id + + +async def main() -> None: + """ + Demonstrate the checkpoint-based pause/resume pattern for handoff workflows. + + This sample shows: + 1. Starting a workflow and getting a HandoffUserInputRequest + 2. Pausing (checkpoint is saved automatically) + 3. Resuming from checkpoint with a user response (two-step pattern) + 4. Continuing the conversation until completion + """ + + # Enable INFO logging to see workflow progress + logging.basicConfig( + level=logging.INFO, + format="[%(levelname)s] %(name)s: %(message)s", + ) + + # Clean up old checkpoints + for file in CHECKPOINT_DIR.glob("*.json"): + file.unlink() + + storage = FileCheckpointStorage(storage_path=CHECKPOINT_DIR) + workflow, _, _, _ = create_workflow(checkpoint_storage=storage) + + print("=" * 60) + print("HANDOFF WORKFLOW CHECKPOINT DEMO") + print("=" * 60) + + # Scenario: User needs help with a damaged order + initial_request = "Hi, my order 12345 arrived damaged. I need help." + + # Phase 1: Initial run - workflow will pause when it needs user input + pending_requests, _ = await run_until_user_input_needed( + workflow, + initial_message=initial_request, + ) + + if not pending_requests: + print("Workflow completed without needing user input") + return + + print("\n>>> Workflow paused. You could exit the process here.") + print(f">>> Checkpoint was saved. Pending requests: {len(pending_requests)}") + print("\n>>> Simulating process restart...\n") + + # Simulate process restart - create fresh workflow instance + workflow2, _, _, _ = create_workflow(checkpoint_storage=storage) + + # Phase 2: Resume with user response + user_response_1 = "Yes, I'd like a replacement or refund. The packaging was completely destroyed." + + pending_requests, _ = await resume_with_response( + workflow2, + storage, + user_response_1, + ) + + if not pending_requests: + print("\nWorkflow completed!") + return + + # Phase 3: Continue conversation - can repeat the pattern + print("\n>>> Workflow paused again. Another checkpoint saved.") + print(">>> Simulating another resume...\n") + + workflow3, _, _, _ = create_workflow(checkpoint_storage=storage) + user_response_2 = "A replacement would be great. Can you ship it to the same address?" + + await resume_with_response( + workflow3, + storage, + user_response_2, + ) + + print("\n" + "=" * 60) + print("DEMO COMPLETE") + print("=" * 60) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/test_checkpoint_encoding.py b/python/test_checkpoint_encoding.py new file mode 100644 index 0000000000..2dbbef1f90 --- /dev/null +++ b/python/test_checkpoint_encoding.py @@ -0,0 +1,96 @@ +"""Debug script to test checkpoint encoding with ChatMessage.""" + +from dataclasses import asdict, dataclass + +from agent_framework import ChatMessage +from agent_framework._workflows._checkpoint_encoding import encode_checkpoint_value + + +@dataclass +class HandoffUserInputRequest: + conversation: list[ChatMessage] + awaiting_agent_id: str + prompt: str + source_executor_id: str + + +# Create a sample request +msg1 = ChatMessage(role="user", text="Hello") +msg2 = ChatMessage(role="assistant", text="Hi there") +request = HandoffUserInputRequest( + conversation=[msg1, msg2], awaiting_agent_id="agent1", prompt="test", source_executor_id="source1" +) + +print("=" * 60) +print("Test 1: encode_checkpoint_value on HandoffUserInputRequest") +print("=" * 60) +encoded = encode_checkpoint_value(request) +print(f"Type of encoded: {type(encoded)}") +print(f"Keys: {encoded.keys() if isinstance(encoded, dict) else 'N/A'}") +if isinstance(encoded, dict) and "value" in encoded: + value = encoded["value"] + print(f"\nType of encoded['value']: {type(value)}") + print(f"Keys in value: {value.keys() if isinstance(value, dict) else 'N/A'}") + if isinstance(value, dict) and "conversation" in value: + conv = value["conversation"] + print(f"\nType of conversation: {type(conv)}") + print(f"Length: {len(conv) if isinstance(conv, list) else 'N/A'}") + if isinstance(conv, list) and len(conv) > 0: + first = conv[0] + print(f"Type of first item: {type(first)}") + print(f"Is it a ChatMessage? {isinstance(first, ChatMessage)}") + if isinstance(first, dict): + print(f"Keys in first item: {first.keys()}") + +print("\n" + "=" * 60) +print("Test 2: Try json.dumps on encoded value") +print("=" * 60) +import json + +try: + json_str = json.dumps(encoded, indent=2) + print("SUCCESS! Encoded value is JSON serializable") + print(f"JSON length: {len(json_str)} chars") +except Exception as e: + print(f"FAILED: {e}") + +print("\n" + "=" * 60) +print("Test 3: Create WorkflowCheckpoint with encoded request") +print("=" * 60) +from agent_framework._workflows._checkpoint import WorkflowCheckpoint + +checkpoint = WorkflowCheckpoint(workflow_id="test", pending_request_info_events={"req1": {"data": encoded}}) + +print("Checkpoint created") +print(f"Type of checkpoint.pending_request_info_events: {type(checkpoint.pending_request_info_events)}") + +checkpoint_dict = asdict(checkpoint) +print(f"\nType after asdict: {type(checkpoint_dict)}") + +# Check what's in pending_request_info_events after asdict +pending = checkpoint_dict.get("pending_request_info_events", {}) +if "req1" in pending: + req1_data = pending["req1"]["data"] + print(f"Type of checkpoint_dict['pending_request_info_events']['req1']['data']: {type(req1_data)}") + if isinstance(req1_data, dict) and "value" in req1_data: + value = req1_data["value"] + if isinstance(value, dict) and "conversation" in value: + conv = value["conversation"] + print(f"Type of conversation after asdict: {type(conv)}") + if isinstance(conv, list) and len(conv) > 0: + first = conv[0] + print(f"Type of first item after asdict: {type(first)}") + print(f"Is it a ChatMessage? {isinstance(first, ChatMessage)}") + +print("\n" + "=" * 60) +print("Test 4: Try json.dumps on asdict(checkpoint)") +print("=" * 60) +try: + json_str = json.dumps(checkpoint_dict, indent=2) + print("SUCCESS! checkpoint_dict is JSON serializable") + print(f"JSON length: {len(json_str)} chars") +except Exception as e: + print(f"FAILED: {e}") + import traceback + + traceback.print_exc() diff --git a/python/test_checkpoint_exact_flow.py b/python/test_checkpoint_exact_flow.py new file mode 100644 index 0000000000..7f67134718 --- /dev/null +++ b/python/test_checkpoint_exact_flow.py @@ -0,0 +1,126 @@ +"""Debug script to replicate the exact checkpoint creation flow.""" + +import asyncio +import json +from dataclasses import asdict + +from agent_framework import ChatMessage +from agent_framework._workflows._checkpoint import WorkflowCheckpoint +from agent_framework._workflows._checkpoint_encoding import encode_checkpoint_value +from agent_framework._workflows._events import RequestInfoEvent +from agent_framework._workflows._handoff import HandoffUserInputRequest + + +async def main(): + # Simulate the exact flow in _runner_context.py _get_serialized_workflow_state + + # Step 1: Create messages like in the workflow + messages = { + "gateway": [ + ChatMessage(role="user", text="Hello"), + ChatMessage(role="assistant", text="Hi there"), + ] + } + + # Step 2: Serialize messages like line 462-463 of _runner_context.py + serialized_messages = {} + for source_id, message_list in messages.items(): + serialized_messages[source_id] = [msg.to_dict() for msg in message_list] + + print("Messages serialized successfully") + print(f"Type of serialized_messages['gateway'][0]: {type(serialized_messages['gateway'][0])}") + + # Step 3: Create HandoffUserInputRequest like in _handoff.py line 538 + request_data = HandoffUserInputRequest( + conversation=[ChatMessage(role="user", text="Test")], + awaiting_agent_id="agent1", + prompt="test", + source_executor_id="source1", + ) + + # Step 4: Create RequestInfoEvent like in _workflow_context.py line 374 + request_info_event = RequestInfoEvent( + request_id="req-123", + source_executor_id="gateway", + request_data=request_data, + response_type=object, + ) + + # Step 5: Serialize RequestInfoEvent like line 465 of _runner_context.py + serialized_pending_request_info_events = {"req-123": request_info_event.to_dict()} + + print("Pending requests serialized successfully") + + # Step 6: Create state dict like line 468-473 of _runner_context.py + state = { + "messages": serialized_messages, + "shared_state": encode_checkpoint_value({}), + "iteration_count": 1, + "pending_request_info_events": serialized_pending_request_info_events, + } + + # Step 7: Create WorkflowCheckpoint like line 382-388 of _runner_context.py + checkpoint = WorkflowCheckpoint( + workflow_id="test-workflow", + messages=state["messages"], + shared_state=state["shared_state"], + pending_request_info_events=state["pending_request_info_events"], + iteration_count=state["iteration_count"], + metadata={}, + ) + + print("WorkflowCheckpoint created successfully") + + # Step 8: Convert to dict like line 141 of _checkpoint.py + checkpoint_dict = asdict(checkpoint) + + print("asdict(checkpoint) completed") + + # Step 9: Check types + print("\nInspecting checkpoint_dict...") + print(f"Type of checkpoint_dict['messages']: {type(checkpoint_dict['messages'])}") + if "gateway" in checkpoint_dict["messages"]: + msgs = checkpoint_dict["messages"]["gateway"] + print(f"Type of first message: {type(msgs[0])}") + if not isinstance(msgs[0], dict): + print(f"ERROR: Message is not a dict, it's: {msgs[0]}") + + print( + f"\nType of checkpoint_dict['pending_request_info_events']: {type(checkpoint_dict['pending_request_info_events'])}" + ) + if "req-123" in checkpoint_dict["pending_request_info_events"]: + req = checkpoint_dict["pending_request_info_events"]["req-123"] + print(f"Type of request: {type(req)}") + if "data" in req: + print(f"Type of request['data']: {type(req['data'])}") + + # Step 10: Try json.dump like line 147 of _checkpoint.py + print("\nAttempting json.dump...") + try: + json_str = json.dumps(checkpoint_dict, indent=2) + print(f"SUCCESS! JSON length: {len(json_str)} chars") + except Exception as e: + print(f"FAILED: {e}") + import traceback + + traceback.print_exc() + + # Find the problematic object + print("\n\nSearching for ChatMessage objects...") + + def find_chat_messages(obj, path="root"): + if isinstance(obj, ChatMessage): + print(f"Found ChatMessage at: {path}") + return + if isinstance(obj, dict): + for k, v in obj.items(): + find_chat_messages(v, f"{path}.{k}") + elif isinstance(obj, list): + for i, item in enumerate(obj): + find_chat_messages(item, f"{path}[{i}]") + + find_chat_messages(checkpoint_dict) + + +if __name__ == "__main__": + asyncio.run(main()) From 8815d4b87d38adad14d05728aa72d5ee5d7b4269 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 20 Nov 2025 19:23:01 +0900 Subject: [PATCH 2/7] handoff with tool approval --- ...f_with_tool_approval_checkpoint_resume.py} | 227 +++++++++++------- 1 file changed, 145 insertions(+), 82 deletions(-) rename python/samples/getting_started/workflows/checkpoint/{handoff_checkpoint_resume.py => handoff_with_tool_approval_checkpoint_resume.py} (57%) diff --git a/python/samples/getting_started/workflows/checkpoint/handoff_checkpoint_resume.py b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py similarity index 57% rename from python/samples/getting_started/workflows/checkpoint/handoff_checkpoint_resume.py rename to python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py index 02b1ae49ba..7289d2aaa7 100644 --- a/python/samples/getting_started/workflows/checkpoint/handoff_checkpoint_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py @@ -25,6 +25,7 @@ """ import asyncio +import json import logging from pathlib import Path from typing import cast @@ -33,12 +34,14 @@ ChatAgent, ChatMessage, FileCheckpointStorage, + FunctionApprovalRequestContent, HandoffBuilder, HandoffUserInputRequest, RequestInfoEvent, Workflow, WorkflowOutputEvent, WorkflowStatusEvent, + ai_function, ) from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential @@ -47,6 +50,13 @@ CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True) +@ai_function(approval_mode="always_require") +def submit_refund(refund_description: str, amount: str, order_id: str) -> str: + """Capture a refund request for manual review before processing.""" + summary = f"refund recorded for order {order_id} (amount: {amount}) with details: {refund_description}" + return summary + + def create_agents(client: AzureOpenAIChatClient) -> tuple[ChatAgent, ChatAgent, ChatAgent]: """Create a simple handoff scenario: triage, refund, and order specialists.""" @@ -63,8 +73,11 @@ def create_agents(client: AzureOpenAIChatClient) -> tuple[ChatAgent, ChatAgent, name="refund_agent", instructions=( "You are a refund specialist. Help customers with refund requests. " - "Be empathetic and ask for order numbers if not provided." + "Be empathetic and ask for order numbers if not provided. " + "When the user confirms they want a refund and supplies order details, call submit_refund " + "to record the request before continuing." ), + tools=[submit_refund], ) order = client.create_agent( @@ -101,13 +114,63 @@ def create_workflow(checkpoint_storage: FileCheckpointStorage) -> tuple[Workflow return workflow, triage, refund, order +def _print_handoff_request(request: HandoffUserInputRequest, request_id: str) -> None: + """Log pending handoff request details for debugging.""" + print(f"\n{'=' * 60}") + print("WORKFLOW PAUSED - User input needed") + print(f"Request ID: {request_id}") + print(f"Awaiting agent: {request.awaiting_agent_id}") + print(f"Prompt: {request.prompt}") + + print("\nConversation so far:") + for msg in request.conversation[-3:]: + author = msg.author_name or msg.role.value + snippet = msg.text[:120] + "..." if len(msg.text) > 120 else msg.text + print(f" {author}: {snippet}") + + print(f"{'=' * 60}\n") + + +def _print_function_approval_request(request: FunctionApprovalRequestContent, request_id: str) -> None: + """Log pending tool approval details for debugging.""" + args = request.function_call.parse_arguments() or {} + print(f"\n{'=' * 60}") + print("WORKFLOW PAUSED - Tool approval required") + print(f"Request ID: {request_id}") + print(f"Function: {request.function_call.name}") + print(f"Arguments:\n{json.dumps(args, indent=2)}") + print(f"{'=' * 60}\n") + + +def _build_responses_for_requests( + pending_requests: list[RequestInfoEvent], + *, + user_response: str | None, + approve_tools: bool | None, +) -> dict[str, object]: + """Create response payloads for each pending request.""" + responses: dict[str, object] = {} + for request in pending_requests: + if isinstance(request.data, HandoffUserInputRequest): + if user_response is None: + raise ValueError("User response is required for HandoffUserInputRequest") + responses[request.request_id] = user_response + elif isinstance(request.data, FunctionApprovalRequestContent): + if approve_tools is None: + raise ValueError("Approval decision is required for FunctionApprovalRequestContent") + responses[request.request_id] = request.data.create_response(approved=approve_tools) + else: + raise ValueError(f"Unsupported request type: {type(request.data)}") + return responses + + async def run_until_user_input_needed( workflow: Workflow, initial_message: str | None = None, checkpoint_id: str | None = None, ) -> tuple[list[RequestInfoEvent], str | None]: """ - Run the workflow until it needs user input or completes. + Run the workflow until it needs user input or approval, or completes. Returns: Tuple of (pending_requests, checkpoint_id_to_use_for_resume) @@ -129,21 +192,11 @@ async def run_until_user_input_needed( print(f"[Status] {event.state}") elif isinstance(event, RequestInfoEvent): + pending_requests.append(event) if isinstance(event.data, HandoffUserInputRequest): - print(f"\n{'=' * 60}") - print("WORKFLOW PAUSED - User input needed") - print(f"Request ID: {event.request_id}") - print(f"Awaiting agent: {event.data.awaiting_agent_id}") - print(f"Prompt: {event.data.prompt}") - - # Print conversation history - print("\nConversation so far:") - for msg in event.data.conversation[-3:]: # Show last 3 messages - author = msg.author_name or msg.role.value - print(f" {author}: {msg.text[:80]}...") - - print(f"{'=' * 60}\n") - pending_requests.append(event) + _print_handoff_request(event.data, event.request_id) + elif isinstance(event.data, FunctionApprovalRequestContent): + _print_function_approval_request(event.data, event.request_id) elif isinstance(event, WorkflowOutputEvent): print("\n[Workflow Completed]") @@ -157,13 +210,14 @@ async def run_until_user_input_needed( return pending_requests, latest_checkpoint_id -async def resume_with_response( +async def resume_with_responses( workflow: Workflow, checkpoint_storage: FileCheckpointStorage, - user_response: str, + user_response: str | None = None, + approve_tools: bool | None = None, ) -> tuple[list[RequestInfoEvent], str | None]: """ - Two-step resume pattern (answers customer's question): + Two-step resume pattern (answers customer questions and tool approvals): Step 1: Restore checkpoint to load pending requests into workflow state Step 2: Send user responses using send_responses_streaming @@ -172,8 +226,11 @@ async def resume_with_response( doesn't accept a checkpoint_id parameter. """ print(f"\n{'=' * 60}") - print("RESUMING WORKFLOW WITH USER RESPONSE") - print(f"User says: {user_response}") + print("RESUMING WORKFLOW WITH HUMAN INPUT") + if user_response is not None: + print(f"User says: {user_response}") + if approve_tools is not None: + print(f"Approve tools: {approve_tools}") print(f"{'=' * 60}\n") # Get the latest checkpoint @@ -189,19 +246,24 @@ async def resume_with_response( # Step 1: Restore the checkpoint to load pending requests into memory # The checkpoint restoration re-emits pending RequestInfoEvents - pending_request_ids: list[str] = [] + restored_requests: list[RequestInfoEvent] = [] async for event in workflow.run_stream(checkpoint_id=latest_checkpoint.checkpoint_id): # type: ignore[attr-defined] - if isinstance(event, RequestInfoEvent) and isinstance(event.data, HandoffUserInputRequest): - pending_request_ids.append(event.request_id) - print(f"Found pending request: {event.request_id}") + if isinstance(event, RequestInfoEvent): + restored_requests.append(event) + if isinstance(event.data, HandoffUserInputRequest): + _print_handoff_request(event.data, event.request_id) + elif isinstance(event.data, FunctionApprovalRequestContent): + _print_function_approval_request(event.data, event.request_id) - if not pending_request_ids: + if not restored_requests: raise RuntimeError("No pending requests found after checkpoint restoration") - print(f"Step 2: Sending user response for {len(pending_request_ids)} request(s)") - - # Step 2: Send the user's response - responses = {req_id: user_response for req_id in pending_request_ids} + responses = _build_responses_for_requests( + restored_requests, + user_response=user_response, + approve_tools=approve_tools, + ) + print(f"Step 2: Sending responses for {len(responses)} request(s)") new_pending_requests: list[RequestInfoEvent] = [] @@ -210,7 +272,6 @@ async def resume_with_response( print(f"[Status] {event.state}") elif isinstance(event, WorkflowOutputEvent): - # Workflow completed or paused - show the conversation print("\n[Workflow Output Event - Conversation Update]") if event.data and isinstance(event.data, list): # Cast event.data to list[ChatMessage] for type checking @@ -222,27 +283,11 @@ async def resume_with_response( print(f" {author}: {text}") elif isinstance(event, RequestInfoEvent): + new_pending_requests.append(event) if isinstance(event.data, HandoffUserInputRequest): - print(f"\n{'=' * 60}") - print("WORKFLOW PAUSED AGAIN - User input needed") - print(f"Request ID: {event.request_id}") - print(f"Awaiting agent: {event.data.awaiting_agent_id}") - - # Show recent agent responses (last 3 messages excluding initial user message) - print("\nRecent conversation:") - recent_msgs = [m for m in event.data.conversation[-4:] if m.role.value != "user"][-2:] - for msg in recent_msgs: - author = msg.author_name or msg.role.value - print(f"\n [{author}]:") - print(f" {msg.text}") - - print(f"{'=' * 60}") - new_pending_requests.append(event) - - elif isinstance(event, WorkflowOutputEvent): - print("\n[Workflow Completed]") - print(f"Final conversation length: {len(event.data)} messages") # type: ignore[arg-type] - return [], None + _print_handoff_request(event.data, event.request_id) + elif isinstance(event.data, FunctionApprovalRequestContent): + _print_function_approval_request(event.data, event.request_id) return new_pending_requests, latest_checkpoint.checkpoint_id @@ -254,7 +299,7 @@ async def main() -> None: This sample shows: 1. Starting a workflow and getting a HandoffUserInputRequest 2. Pausing (checkpoint is saved automatically) - 3. Resuming from checkpoint with a user response (two-step pattern) + 3. Resuming from checkpoint with a user response or tool approval (two-step pattern) 4. Continuing the conversation until completion """ @@ -267,6 +312,8 @@ async def main() -> None: # Clean up old checkpoints for file in CHECKPOINT_DIR.glob("*.json"): file.unlink() + for file in CHECKPOINT_DIR.glob("*.json.tmp"): + file.unlink() storage = FileCheckpointStorage(storage_path=CHECKPOINT_DIR) workflow, _, _, _ = create_workflow(checkpoint_storage=storage) @@ -276,7 +323,7 @@ async def main() -> None: print("=" * 60) # Scenario: User needs help with a damaged order - initial_request = "Hi, my order 12345 arrived damaged. I need help." + initial_request = "Hi, my order 12345 arrived damaged. I need a refund." # Phase 1: Initial run - workflow will pause when it needs user input pending_requests, _ = await run_until_user_input_needed( @@ -290,36 +337,52 @@ async def main() -> None: print("\n>>> Workflow paused. You could exit the process here.") print(f">>> Checkpoint was saved. Pending requests: {len(pending_requests)}") - print("\n>>> Simulating process restart...\n") - - # Simulate process restart - create fresh workflow instance - workflow2, _, _, _ = create_workflow(checkpoint_storage=storage) - - # Phase 2: Resume with user response - user_response_1 = "Yes, I'd like a replacement or refund. The packaging was completely destroyed." - - pending_requests, _ = await resume_with_response( - workflow2, - storage, - user_response_1, - ) - - if not pending_requests: - print("\nWorkflow completed!") - return - - # Phase 3: Continue conversation - can repeat the pattern - print("\n>>> Workflow paused again. Another checkpoint saved.") - print(">>> Simulating another resume...\n") - workflow3, _, _, _ = create_workflow(checkpoint_storage=storage) - user_response_2 = "A replacement would be great. Can you ship it to the same address?" - - await resume_with_response( - workflow3, - storage, - user_response_2, - ) + # Scripted human input for demo purposes + handoff_responses = [ + ( + "The headphones in order 12345 arrived cracked. " + "Please submit the refund for $89.99 and send a replacement to my original address." + ), + "Yes, that covers the damage and refund request.", + "That's everything I needed for the refund.", + "Thanks for handling the refund.", + ] + approval_decisions = [True, True, True] + handoff_index = 0 + approval_index = 0 + + while pending_requests: + print("\n>>> Simulating process restart...\n") + workflow_step, _, _, _ = create_workflow(checkpoint_storage=storage) + + needs_user_input = any(isinstance(req.data, HandoffUserInputRequest) for req in pending_requests) + needs_tool_approval = any(isinstance(req.data, FunctionApprovalRequestContent) for req in pending_requests) + + user_response = None + if needs_user_input: + if handoff_index < len(handoff_responses): + user_response = handoff_responses[handoff_index] + handoff_index += 1 + else: + user_response = handoff_responses[-1] + print(f">>> Responding to handoff request with: {user_response}") + + approval_response = None + if needs_tool_approval: + if approval_index < len(approval_decisions): + approval_response = approval_decisions[approval_index] + approval_index += 1 + else: + approval_response = approval_decisions[-1] + print(">>> Approving pending tool calls from the agent.") + + pending_requests, _ = await resume_with_responses( + workflow_step, + storage, + user_response=user_response, + approve_tools=approval_response, + ) print("\n" + "=" * 60) print("DEMO COMPLETE") From d77f3b6f021ee954b0e8389fb0e7a268c27401b3 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 20 Nov 2025 19:51:30 +0900 Subject: [PATCH 3/7] add Hanoff sample with tool approval and checkpointing --- .../checkpoint/HANDOFF_CHECKPOINT_README.md | 120 ------------------ ...ff_with_tool_approval_checkpoint_resume.py | 52 ++++---- 2 files changed, 26 insertions(+), 146 deletions(-) delete mode 100644 python/samples/getting_started/workflows/checkpoint/HANDOFF_CHECKPOINT_README.md diff --git a/python/samples/getting_started/workflows/checkpoint/HANDOFF_CHECKPOINT_README.md b/python/samples/getting_started/workflows/checkpoint/HANDOFF_CHECKPOINT_README.md deleted file mode 100644 index 9748e7e151..0000000000 --- a/python/samples/getting_started/workflows/checkpoint/HANDOFF_CHECKPOINT_README.md +++ /dev/null @@ -1,120 +0,0 @@ -# Handoff Pattern with Checkpointing: Pause/Resume Support - -## Customer Question - -> "When `HandoffUserInputRequest` is generated I need to pause the workflow, send the request to the user interface, and when I get the user message I should resume the workflow sending the response. In order to suspend and resume the workflow I was looking at the checkpoint concept. -> -> Question: How should I resume the workflow providing the user response (for the `HandoffUserInputRequest`) back to the specific workflow instance? -> -> The problem is that `workflow.send_responses_streaming` doesn't have a `checkpoint` param, only `workflow.run_stream` can accept a checkpoint." - -## Answer: YES, This Pattern is Supported - -The Handoff pattern **does support** checkpoint-based pause/resume with user input requests. However, it requires a **two-step pattern** because, as the customer correctly observed, `send_responses_streaming()` doesn't accept a `checkpoint_id` parameter. - -## The Two-Step Resume Pattern - -```python -# Step 1: Restore the checkpoint to load pending requests into workflow state -async for event in workflow.run_stream(checkpoint_id="checkpoint_123"): - if isinstance(event, RequestInfoEvent): - pending_request_ids.append(event.request_id) - break # Stop after checkpoint restoration - -# Step 2: Reset workflow's internal running flags (required) -if hasattr(workflow, "_is_running"): - workflow._is_running = False -if hasattr(workflow, "_runner") and hasattr(workflow._runner, "_running"): - workflow._runner._running = False - -# Step 3: Send user responses -responses = {req_id: user_response for req_id in pending_request_ids} -async for event in workflow.send_responses_streaming(responses): - # Process events... -``` - -## Why This Pattern Works - -1. **Checkpoint Restoration**: `run_stream(checkpoint_id=...)` restores the workflow state including pending `HandoffUserInputRequest` events -2. **In-Memory State**: The checkpoint loads pending requests into the workflow's in-memory state -3. **Response Delivery**: `send_responses_streaming(responses)` sends responses to those restored pending requests -4. **Stateless HTTP Compatible**: This pattern works for stateless HTTP scenarios where the workflow instance doesn't persist between requests - -## Complete Working Sample - -See: `handoff_checkpoint_resume.py` - -This sample demonstrates: -- Starting a handoff workflow -- Receiving a `HandoffUserInputRequest` -- Pausing (checkpoint saved automatically) -- **Simulating process restart** (creating new workflow instance) -- Resuming from checkpoint with user response (two-step pattern) -- Continuing the conversation - -## Key Architectural Points - -### Why Not a Single `send_responses_streaming(responses, checkpoint_id)` Call? - -The current architecture separates concerns: -- `run_stream(checkpoint_id)` - State restoration (loading checkpoints) -- `send_responses_streaming(responses)` - Response delivery (workflow execution) - -This separation actually enables the pattern to work correctly because: -1. Checkpoint restoration must happen first to populate pending requests -2. Response validation occurs against the restored pending requests -3. The workflow must be in a specific internal state before accepting responses - -### Comparison to Other Checkpoint Samples - -Unlike `checkpoint_with_human_in_the_loop.py` which uses a simple request/response executor, the Handoff pattern: -- Uses `HandoffUserInputRequest` (instead of custom request types) -- Manages conversation state automatically -- Handles multi-agent routing -- Requires the two-step pattern for stateless scenarios - -## Implementation Note from DevUI - -The DevUI package uses this exact pattern for stateless HTTP scenarios: - -```python -# From agent_framework_devui/_executor.py -# Step 1: Restore checkpoint -async for _event in workflow.run_stream(checkpoint_id=checkpoint_id, checkpoint_storage=storage): - restored = True - break # Stop immediately after restoration - -# Step 2: Reset flags -if hasattr(workflow, "_is_running"): - workflow._is_running = False -if hasattr(workflow, "_runner") and hasattr(workflow._runner, "_running"): - workflow._runner._running = False - -# Step 3: Send responses -async for event in workflow.send_responses_streaming(responses): - # Process events... -``` - -## Future Enhancement Consideration - -A potential framework enhancement could provide: -```python -# Hypothetical future API (not currently supported) -async for event in workflow.run_stream( - checkpoint_id="checkpoint_123", - responses={"request_id": "user response"} -): - # Combined checkpoint restoration + response delivery -``` - -However, the current two-step pattern is the supported and working approach. - -## Summary - -**YES** - The Handoff pattern supports checkpoint-based pause/resume with `HandoffUserInputRequest`. - -**Pattern**: Use the two-step approach: -1. `workflow.run_stream(checkpoint_id=...)` -2. `workflow.send_responses_streaming(responses)` - -This is the documented and supported pattern for stateless scenarios where workflow instances don't persist between requests. diff --git a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py index 7289d2aaa7..196b7e5010 100644 --- a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py @@ -1,29 +1,5 @@ # Copyright (c) Microsoft. All rights reserved. -""" -Sample: Handoff Workflow with Checkpoint-Based Pause/Resume - -Demonstrates the two-step pattern for resuming a handoff workflow from a checkpoint -and providing user responses to HandoffUserInputRequest. - -Scenario: -1. User starts a conversation with the workflow -2. Workflow requests user input (HandoffUserInputRequest is emitted) -3. Workflow pauses and saves a checkpoint -4. Process can exit/restart -5. On resume: Load checkpoint + provide user response -6. Workflow continues from where it left off - -Pattern: -- Step 1: workflow.run_stream(checkpoint_id=...) to restore checkpoint -- Step 2: workflow.send_responses_streaming(responses) to provide user input -- This two-step approach is required because send_responses_streaming doesn't accept checkpoint_id - -Prerequisites: -- Azure CLI authentication (az login) -- Environment variables configured for AzureOpenAIChatClient -""" - import asyncio import json import logging @@ -46,6 +22,31 @@ from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential +""" +Sample: Handoff Workflow with Tool Approvals + Checkpoint Resume + +Demonstrates the two-step pattern for resuming a handoff workflow from a checkpoint +while handling both HandoffUserInputRequest prompts and FunctionApprovalRequestContent +for tool calls (e.g., submit_refund). + +Scenario: +1. User starts a conversation with the workflow. +2. Agents may emit user input requests or tool approval requests. +3. Workflow pauses and writes a checkpoint capturing pending requests. +4. Process can exit/restart. +5. On resume: Load the checkpoint, surface pending approvals/user prompts, and provide responses. +6. Workflow continues from the saved state. + +Pattern: +- Step 1: workflow.run_stream(checkpoint_id=...) to restore checkpoint and pending requests. +- Step 2: workflow.send_responses_streaming(responses) to supply human replies and approvals. +- Two-step approach is required because send_responses_streaming does not accept checkpoint_id. + +Prerequisites: +- Azure CLI authentication (az login). +- Environment variables configured for AzureOpenAIChatClient. +""" + CHECKPOINT_DIR = Path(__file__).parent / "tmp" / "handoff_checkpoints" CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True) @@ -53,8 +54,7 @@ @ai_function(approval_mode="always_require") def submit_refund(refund_description: str, amount: str, order_id: str) -> str: """Capture a refund request for manual review before processing.""" - summary = f"refund recorded for order {order_id} (amount: {amount}) with details: {refund_description}" - return summary + return f"refund recorded for order {order_id} (amount: {amount}) with details: {refund_description}" def create_agents(client: AzureOpenAIChatClient) -> tuple[ChatAgent, ChatAgent, ChatAgent]: From 0fd078497bdff6f348dd33d374891b00784639aa Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 20 Nov 2025 19:52:39 +0900 Subject: [PATCH 4/7] Add to README --- python/samples/getting_started/workflows/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index ffbeedd751..f9c6eeaf7b 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -52,6 +52,7 @@ Once comfortable with these, explore the rest of the samples below. | Checkpoint & Resume | [checkpoint/checkpoint_with_resume.py](./checkpoint/checkpoint_with_resume.py) | Create checkpoints, inspect them, and resume execution | | Checkpoint & HITL Resume | [checkpoint/checkpoint_with_human_in_the_loop.py](./checkpoint/checkpoint_with_human_in_the_loop.py) | Combine checkpointing with human approvals and resume pending HITL requests | | Checkpointed Sub-Workflow | [checkpoint/sub_workflow_checkpoint.py](./checkpoint/sub_workflow_checkpoint.py) | Save and resume a sub-workflow that pauses for human approval | +| Handoff + Tool Approval Resume | [checkpoint/handoff_with_tool_approval_checkpoint_resume.py](./checkpoint/handoff_with_tool_approval_checkpoint_resume.py) | Handoff workflow that captures tool-call approvals in checkpoints and resumes with human decisions | ### composition From 919cdbe83ea4756fa4e097decb68eeae4e0c2b44 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 20 Nov 2025 19:54:06 +0900 Subject: [PATCH 5/7] Remove unnecessary files --- python/test_checkpoint_encoding.py | 96 -------------------- python/test_checkpoint_exact_flow.py | 126 --------------------------- 2 files changed, 222 deletions(-) delete mode 100644 python/test_checkpoint_encoding.py delete mode 100644 python/test_checkpoint_exact_flow.py diff --git a/python/test_checkpoint_encoding.py b/python/test_checkpoint_encoding.py deleted file mode 100644 index 2dbbef1f90..0000000000 --- a/python/test_checkpoint_encoding.py +++ /dev/null @@ -1,96 +0,0 @@ -"""Debug script to test checkpoint encoding with ChatMessage.""" - -from dataclasses import asdict, dataclass - -from agent_framework import ChatMessage -from agent_framework._workflows._checkpoint_encoding import encode_checkpoint_value - - -@dataclass -class HandoffUserInputRequest: - conversation: list[ChatMessage] - awaiting_agent_id: str - prompt: str - source_executor_id: str - - -# Create a sample request -msg1 = ChatMessage(role="user", text="Hello") -msg2 = ChatMessage(role="assistant", text="Hi there") -request = HandoffUserInputRequest( - conversation=[msg1, msg2], awaiting_agent_id="agent1", prompt="test", source_executor_id="source1" -) - -print("=" * 60) -print("Test 1: encode_checkpoint_value on HandoffUserInputRequest") -print("=" * 60) -encoded = encode_checkpoint_value(request) -print(f"Type of encoded: {type(encoded)}") -print(f"Keys: {encoded.keys() if isinstance(encoded, dict) else 'N/A'}") -if isinstance(encoded, dict) and "value" in encoded: - value = encoded["value"] - print(f"\nType of encoded['value']: {type(value)}") - print(f"Keys in value: {value.keys() if isinstance(value, dict) else 'N/A'}") - if isinstance(value, dict) and "conversation" in value: - conv = value["conversation"] - print(f"\nType of conversation: {type(conv)}") - print(f"Length: {len(conv) if isinstance(conv, list) else 'N/A'}") - if isinstance(conv, list) and len(conv) > 0: - first = conv[0] - print(f"Type of first item: {type(first)}") - print(f"Is it a ChatMessage? {isinstance(first, ChatMessage)}") - if isinstance(first, dict): - print(f"Keys in first item: {first.keys()}") - -print("\n" + "=" * 60) -print("Test 2: Try json.dumps on encoded value") -print("=" * 60) -import json - -try: - json_str = json.dumps(encoded, indent=2) - print("SUCCESS! Encoded value is JSON serializable") - print(f"JSON length: {len(json_str)} chars") -except Exception as e: - print(f"FAILED: {e}") - -print("\n" + "=" * 60) -print("Test 3: Create WorkflowCheckpoint with encoded request") -print("=" * 60) -from agent_framework._workflows._checkpoint import WorkflowCheckpoint - -checkpoint = WorkflowCheckpoint(workflow_id="test", pending_request_info_events={"req1": {"data": encoded}}) - -print("Checkpoint created") -print(f"Type of checkpoint.pending_request_info_events: {type(checkpoint.pending_request_info_events)}") - -checkpoint_dict = asdict(checkpoint) -print(f"\nType after asdict: {type(checkpoint_dict)}") - -# Check what's in pending_request_info_events after asdict -pending = checkpoint_dict.get("pending_request_info_events", {}) -if "req1" in pending: - req1_data = pending["req1"]["data"] - print(f"Type of checkpoint_dict['pending_request_info_events']['req1']['data']: {type(req1_data)}") - if isinstance(req1_data, dict) and "value" in req1_data: - value = req1_data["value"] - if isinstance(value, dict) and "conversation" in value: - conv = value["conversation"] - print(f"Type of conversation after asdict: {type(conv)}") - if isinstance(conv, list) and len(conv) > 0: - first = conv[0] - print(f"Type of first item after asdict: {type(first)}") - print(f"Is it a ChatMessage? {isinstance(first, ChatMessage)}") - -print("\n" + "=" * 60) -print("Test 4: Try json.dumps on asdict(checkpoint)") -print("=" * 60) -try: - json_str = json.dumps(checkpoint_dict, indent=2) - print("SUCCESS! checkpoint_dict is JSON serializable") - print(f"JSON length: {len(json_str)} chars") -except Exception as e: - print(f"FAILED: {e}") - import traceback - - traceback.print_exc() diff --git a/python/test_checkpoint_exact_flow.py b/python/test_checkpoint_exact_flow.py deleted file mode 100644 index 7f67134718..0000000000 --- a/python/test_checkpoint_exact_flow.py +++ /dev/null @@ -1,126 +0,0 @@ -"""Debug script to replicate the exact checkpoint creation flow.""" - -import asyncio -import json -from dataclasses import asdict - -from agent_framework import ChatMessage -from agent_framework._workflows._checkpoint import WorkflowCheckpoint -from agent_framework._workflows._checkpoint_encoding import encode_checkpoint_value -from agent_framework._workflows._events import RequestInfoEvent -from agent_framework._workflows._handoff import HandoffUserInputRequest - - -async def main(): - # Simulate the exact flow in _runner_context.py _get_serialized_workflow_state - - # Step 1: Create messages like in the workflow - messages = { - "gateway": [ - ChatMessage(role="user", text="Hello"), - ChatMessage(role="assistant", text="Hi there"), - ] - } - - # Step 2: Serialize messages like line 462-463 of _runner_context.py - serialized_messages = {} - for source_id, message_list in messages.items(): - serialized_messages[source_id] = [msg.to_dict() for msg in message_list] - - print("Messages serialized successfully") - print(f"Type of serialized_messages['gateway'][0]: {type(serialized_messages['gateway'][0])}") - - # Step 3: Create HandoffUserInputRequest like in _handoff.py line 538 - request_data = HandoffUserInputRequest( - conversation=[ChatMessage(role="user", text="Test")], - awaiting_agent_id="agent1", - prompt="test", - source_executor_id="source1", - ) - - # Step 4: Create RequestInfoEvent like in _workflow_context.py line 374 - request_info_event = RequestInfoEvent( - request_id="req-123", - source_executor_id="gateway", - request_data=request_data, - response_type=object, - ) - - # Step 5: Serialize RequestInfoEvent like line 465 of _runner_context.py - serialized_pending_request_info_events = {"req-123": request_info_event.to_dict()} - - print("Pending requests serialized successfully") - - # Step 6: Create state dict like line 468-473 of _runner_context.py - state = { - "messages": serialized_messages, - "shared_state": encode_checkpoint_value({}), - "iteration_count": 1, - "pending_request_info_events": serialized_pending_request_info_events, - } - - # Step 7: Create WorkflowCheckpoint like line 382-388 of _runner_context.py - checkpoint = WorkflowCheckpoint( - workflow_id="test-workflow", - messages=state["messages"], - shared_state=state["shared_state"], - pending_request_info_events=state["pending_request_info_events"], - iteration_count=state["iteration_count"], - metadata={}, - ) - - print("WorkflowCheckpoint created successfully") - - # Step 8: Convert to dict like line 141 of _checkpoint.py - checkpoint_dict = asdict(checkpoint) - - print("asdict(checkpoint) completed") - - # Step 9: Check types - print("\nInspecting checkpoint_dict...") - print(f"Type of checkpoint_dict['messages']: {type(checkpoint_dict['messages'])}") - if "gateway" in checkpoint_dict["messages"]: - msgs = checkpoint_dict["messages"]["gateway"] - print(f"Type of first message: {type(msgs[0])}") - if not isinstance(msgs[0], dict): - print(f"ERROR: Message is not a dict, it's: {msgs[0]}") - - print( - f"\nType of checkpoint_dict['pending_request_info_events']: {type(checkpoint_dict['pending_request_info_events'])}" - ) - if "req-123" in checkpoint_dict["pending_request_info_events"]: - req = checkpoint_dict["pending_request_info_events"]["req-123"] - print(f"Type of request: {type(req)}") - if "data" in req: - print(f"Type of request['data']: {type(req['data'])}") - - # Step 10: Try json.dump like line 147 of _checkpoint.py - print("\nAttempting json.dump...") - try: - json_str = json.dumps(checkpoint_dict, indent=2) - print(f"SUCCESS! JSON length: {len(json_str)} chars") - except Exception as e: - print(f"FAILED: {e}") - import traceback - - traceback.print_exc() - - # Find the problematic object - print("\n\nSearching for ChatMessage objects...") - - def find_chat_messages(obj, path="root"): - if isinstance(obj, ChatMessage): - print(f"Found ChatMessage at: {path}") - return - if isinstance(obj, dict): - for k, v in obj.items(): - find_chat_messages(v, f"{path}.{k}") - elif isinstance(obj, list): - for i, item in enumerate(obj): - find_chat_messages(item, f"{path}[{i}]") - - find_chat_messages(checkpoint_dict) - - -if __name__ == "__main__": - asyncio.run(main()) From 0d9c6d1db08d2ed44d1de2477cf86444a67cdf7f Mon Sep 17 00:00:00 2001 From: Evan Mattson <35585003+moonbox3@users.noreply.github.com> Date: Fri, 21 Nov 2025 07:53:38 +0900 Subject: [PATCH 6/7] Update python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ...doff_with_tool_approval_checkpoint_resume.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py index 196b7e5010..c1d2dc57cc 100644 --- a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py @@ -273,14 +273,17 @@ async def resume_with_responses( elif isinstance(event, WorkflowOutputEvent): print("\n[Workflow Output Event - Conversation Update]") - if event.data and isinstance(event.data, list): - # Cast event.data to list[ChatMessage] for type checking - conversation = cast(list[ChatMessage], event.data) # type: ignore + if ( + event.data + and isinstance(event.data, list) + and all(isinstance(msg, ChatMessage) for msg in event.data) + ): + # Now safe to cast event.data to list[ChatMessage] + conversation = cast(list[ChatMessage], event.data) for msg in conversation[-3:]: # Show last 3 messages - if isinstance(msg, ChatMessage): - author = msg.author_name or msg.role.value - text = msg.text[:100] + "..." if len(msg.text) > 100 else msg.text - print(f" {author}: {text}") + author = msg.author_name or msg.role.value + text = msg.text[:100] + "..." if len(msg.text) > 100 else msg.text + print(f" {author}: {text}") elif isinstance(event, RequestInfoEvent): new_pending_requests.append(event) From 77c168881ecaaccaab53ef5daa8d6667a231523c Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 21 Nov 2025 08:11:32 +0900 Subject: [PATCH 7/7] Fix comment --- .../checkpoint/handoff_with_tool_approval_checkpoint_resume.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py index 196b7e5010..b443e391d9 100644 --- a/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py @@ -32,7 +32,7 @@ Scenario: 1. User starts a conversation with the workflow. 2. Agents may emit user input requests or tool approval requests. -3. Workflow pauses and writes a checkpoint capturing pending requests. +3. Workflow writes a checkpoint capturing pending requests and pauses. 4. Process can exit/restart. 5. On resume: Load the checkpoint, surface pending approvals/user prompts, and provide responses. 6. Workflow continues from the saved state.