Skip to content

Commit 14c6b88

Browse files
committed
Extract learnings
1 parent e871d59 commit 14c6b88

File tree

5 files changed

+197
-33
lines changed

5 files changed

+197
-33
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
"""Agent for extracting learnings from human interactions."""
2+
3+
from agents import Agent
4+
from datetime import timedelta
5+
6+
7+
def new_extract_learnings_agent() -> Agent:
8+
"""
9+
Create an agent that extracts 1-2 sentence learnings from human interactions.
10+
11+
This agent analyzes the full conversation context to understand how we got to
12+
the human interaction and what key insight or decision was made.
13+
14+
Returns:
15+
Agent configured to extract a concise learning
16+
"""
17+
instructions = """
18+
You are a learning extraction agent for a procurement system.
19+
20+
Your job is to analyze only the wait_for_human tool call and extract a concise 1-2 sentence learning that can be applied to future decisions.
21+
22+
The rest of the information is just context but the focus should be on understanding what the human wanted to do and why.
23+
24+
Please extract a 1-2 sentence learning from the wait_for_human tool call.
25+
"""
26+
27+
return Agent(
28+
name="Extract Learnings Agent",
29+
instructions=instructions,
30+
model="gpt-4o",
31+
tools=[], # No tools needed - just analysis
32+
)

examples/demos/procurement_agent/project/scripts/send_test_events.py

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -41,60 +41,92 @@ async def send_fake_events(workflow_id: str):
4141
handle = client.get_workflow_handle(workflow_id)
4242

4343
# Define the procurement event flow for Steel Beams
44-
item = "Steel Beams"
45-
46-
fake_events = [
44+
steel_beams_events = [
4745
SubmitalApprovalEvent(
4846
event_type=EventType.SUBMITTAL_APPROVED,
49-
item=item
47+
item="Steel Beams"
5048
),
5149
ShipmentDepartedFactoryEvent(
5250
event_type=EventType.SHIPMENT_DEPARTED_FACTORY,
53-
item=item,
51+
item="Steel Beams",
5452
eta=datetime(2026, 2, 15)
5553
),
5654
ShipmentArrivedSiteEvent(
5755
event_type=EventType.SHIPMENT_ARRIVED_SITE,
58-
item=item,
56+
item="Steel Beams",
5957
date_arrived=datetime(2026, 2, 15)
6058
),
6159
InspectionFailedEvent(
6260
event_type=EventType.INSPECTION_FAILED,
63-
item=item,
61+
item="Steel Beams",
6462
inspection_date=datetime(2026, 2, 16)
6563
)
6664
]
6765

66+
# Define the procurement event flow for HVAC Units
67+
hvac_events = [
68+
SubmitalApprovalEvent(
69+
event_type=EventType.SUBMITTAL_APPROVED,
70+
item="HVAC Units"
71+
),
72+
ShipmentDepartedFactoryEvent(
73+
event_type=EventType.SHIPMENT_DEPARTED_FACTORY,
74+
item="HVAC Units",
75+
eta=datetime(2026, 3, 10)
76+
),
77+
ShipmentArrivedSiteEvent(
78+
event_type=EventType.SHIPMENT_ARRIVED_SITE,
79+
item="HVAC Units",
80+
date_arrived=datetime(2026, 3, 10)
81+
),
82+
InspectionFailedEvent(
83+
event_type=EventType.INSPECTION_FAILED,
84+
item="HVAC Units",
85+
inspection_date=datetime(2026, 3, 11)
86+
)
87+
]
88+
89+
# Combine all events
90+
all_events = [
91+
("Steel Beams", steel_beams_events),
92+
("HVAC Units", hvac_events)
93+
]
94+
6895
print(f"Connected to workflow: {workflow_id}")
6996
print("=" * 60)
70-
print("Sending procurement events for Steel Beams...")
97+
print("Sending procurement events...")
7198
print("=" * 60)
7299

73-
for i, event in enumerate(fake_events, 1):
74-
print(f"\n[Event {i}] Sending: {event.event_type.value}")
75-
print(f" Item: {event.item}")
76-
77-
# Show additional details based on event type
78-
if hasattr(event, 'eta'):
79-
print(f" ETA: {event.eta}")
80-
if hasattr(event, 'date_arrived'):
81-
print(f" Date Arrived: {event.date_arrived}")
82-
if hasattr(event, 'inspection_date'):
83-
print(f" Inspection Date: {event.inspection_date}")
84-
85-
try:
86-
# Send the event using the send_event signal
87-
# Convert event to JSON string
88-
event_data = event.model_dump_json()
89-
await handle.signal("send_event", event_data)
90-
print(f"✓ Event sent successfully!")
91-
92-
# Wait a bit between events so you can see them being processed
93-
await asyncio.sleep(2)
94-
95-
except Exception as e:
96-
print(f"✗ Error sending event: {e}")
97-
logger.error(f"Failed to send event: {e}")
100+
for item_name, events in all_events:
101+
print(f"\n{'=' * 60}")
102+
print(f"Processing: {item_name}")
103+
print("=" * 60)
104+
105+
for i, event in enumerate(events, 1):
106+
print(f"\n[Event {i}] Sending: {event.event_type.value}")
107+
print(f" Item: {event.item}")
108+
109+
# Show additional details based on event type
110+
if hasattr(event, 'eta'):
111+
print(f" ETA: {event.eta}")
112+
if hasattr(event, 'date_arrived'):
113+
print(f" Date Arrived: {event.date_arrived}")
114+
if hasattr(event, 'inspection_date'):
115+
print(f" Inspection Date: {event.inspection_date}")
116+
117+
try:
118+
# Send the event using the send_event signal
119+
# Convert event to JSON string
120+
event_data = event.model_dump_json()
121+
await handle.signal("send_event", event_data)
122+
print(f"✓ Event sent successfully!")
123+
124+
# Wait a bit between events so you can see them being processed
125+
await asyncio.sleep(2)
126+
127+
except Exception as e:
128+
print(f"✗ Error sending event: {e}")
129+
logger.error(f"Failed to send event: {e}")
98130

99131
print("\n" + "=" * 60)
100132
print("All events have been sent!")
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Utility functions for the procurement agent."""
2+
3+
from project.utils.learning_extraction import get_new_wait_for_human_context
4+
5+
__all__ = ["get_new_wait_for_human_context"]
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""Utility for extracting new context from human interactions using a "going backwards" approach.
2+
3+
This module prevents re-processing old wait_for_human calls by:
4+
1. Iterating backwards through the conversation
5+
2. Stopping when we hit a previously-processed wait_for_human call
6+
3. Returning only the NEW portion of the conversation
7+
"""
8+
9+
from typing import List, Dict, Any, Set, Optional, Tuple
10+
from agentex.lib.utils.logging import make_logger
11+
12+
logger = make_logger(__name__)
13+
14+
15+
def get_new_wait_for_human_context(
16+
full_conversation: List[Dict[str, Any]],
17+
extracted_learning_call_ids: Set[str],
18+
) -> Optional[Tuple[List[Dict[str, Any]], str]]:
19+
"""
20+
Extract NEW context since the last processed wait_for_human call.
21+
22+
Similar to OpenCode's filterCompacted() pattern, this function:
23+
- Iterates backwards through the full conversation history
24+
- Stops when it finds a wait_for_human call we've already processed
25+
- Returns only the NEW context
26+
27+
Args:
28+
full_conversation: The complete conversation history (self._state.input_list)
29+
extracted_learning_call_ids: Set of call_ids we've already extracted learnings from
30+
31+
Returns:
32+
Tuple of (new_context_messages, call_id) if a new wait_for_human was found, None otherwise
33+
"""
34+
# Go backwards through the conversation to find new wait_for_human calls
35+
new_context = []
36+
found_new_wait_for_human = False
37+
new_wait_for_human_call_id = None
38+
39+
for item in reversed(full_conversation):
40+
# Always collect items as we go backwards
41+
new_context.append(item)
42+
43+
# Check if this is a wait_for_human function call
44+
if isinstance(item, dict) and item.get("type") == "function_call":
45+
if item.get("name") == "wait_for_human":
46+
call_id = item.get("call_id")
47+
48+
# If we've already extracted learning for this call_id, STOP
49+
if call_id in extracted_learning_call_ids:
50+
logger.info(f"Found already-processed wait_for_human call_id: {call_id}, stopping")
51+
break
52+
53+
# This is a NEW wait_for_human call
54+
if not found_new_wait_for_human:
55+
found_new_wait_for_human = True
56+
new_wait_for_human_call_id = call_id
57+
logger.info(f"Found NEW wait_for_human call_id: {call_id}")
58+
59+
# If we found a new wait_for_human call, return the new context
60+
if found_new_wait_for_human:
61+
# Reverse back to chronological order
62+
new_context.reverse()
63+
logger.info(f"Extracted {len(new_context)} messages of new context")
64+
return (new_context, new_wait_for_human_call_id)
65+
else:
66+
logger.info("No new wait_for_human calls found")
67+
return None

examples/demos/procurement_agent/project/workflow.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks
2020

2121
from project.agents.procurement_agent import new_procurement_agent
22+
from project.agents.extract_learnings_agent import new_extract_learnings_agent
23+
from project.utils.learning_extraction import get_new_wait_for_human_context
2224
from project.activities.activities import create_master_construction_schedule, get_master_construction_schedule
2325
from project.models.events import (
2426
EventType,
@@ -65,6 +67,7 @@ def __init__(self):
6567
self.human_queue: asyncio.Queue = asyncio.Queue() # Human input
6668
self.event_log: list = []
6769
self.human_input_learnings: list = []
70+
self.extracted_learning_call_ids: set = set() # Track which wait_for_human calls we've extracted learnings from
6871

6972
# Define activity retry policy with exponential backoff
7073
# Based on Temporal best practices from blog post
@@ -227,6 +230,31 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
227230
self._state.input_list = result.to_input_list()
228231
logger.info(f"Successfully processed event at turn {self._state.turn_number}")
229232

233+
# Extract learnings from NEW wait_for_human calls only (using going backwards approach)
234+
try:
235+
result_context = get_new_wait_for_human_context(
236+
full_conversation=self._state.input_list,
237+
extracted_learning_call_ids=self.extracted_learning_call_ids,
238+
)
239+
240+
if result_context is not None:
241+
new_context, call_id = result_context
242+
logger.info("Found new wait_for_human call, extracting learning...")
243+
244+
# Create extraction agent and run with only the NEW context
245+
extract_agent = new_extract_learnings_agent()
246+
extraction_result = await Runner.run(extract_agent, new_context, hooks=hooks)
247+
248+
# Append the learning and track the call_id
249+
learning = extraction_result.final_output
250+
if learning:
251+
self.human_input_learnings.append(learning)
252+
self.extracted_learning_call_ids.add(call_id)
253+
logger.info(f"Extracted learning: {learning}")
254+
255+
except Exception as e:
256+
logger.error(f"Failed to extract learning: {e}")
257+
230258
except Exception as e:
231259
# Agent execution failed - graceful degradation
232260
logger.error(f"Agent execution failed processing event: {e}")

0 commit comments

Comments
 (0)