Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 100 additions & 9 deletions python/packages/devui/agent_framework_devui/_conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,12 @@ async def list_items(
pass

@abstractmethod
def get_item(self, conversation_id: str, item_id: str) -> ConversationItem | None:
async def get_item(self, conversation_id: str, item_id: str) -> ConversationItem | None:
"""Get a specific conversation item by ID.

Supports checkpoint items - will load full checkpoint state from storage.
For checkpoints, the full state is included in metadata.full_checkpoint.

Args:
conversation_id: Conversation ID
item_id: Item ID
Expand All @@ -162,7 +165,7 @@ def get_thread(self, conversation_id: str) -> AgentThread | None:
pass

@abstractmethod
def list_conversations_by_metadata(self, metadata_filter: dict[str, str]) -> list[Conversation]:
async def list_conversations_by_metadata(self, metadata_filter: dict[str, str]) -> list[Conversation]:
"""Filter conversations by metadata (e.g., agent_id).

Args:
Expand Down Expand Up @@ -444,14 +447,31 @@ async def list_items(
# Get all checkpoints for this conversation
checkpoints = await checkpoint_storage.list_checkpoints()
for checkpoint in checkpoints:
# Create a conversation item for each checkpoint
# Create a conversation item for each checkpoint with summary metadata
# Full checkpoint state is NOT included here (too large for list view)
# Use get_item() to retrieve full checkpoint details
# Calculate approximate size of checkpoint
import json

checkpoint_json = json.dumps(checkpoint.to_dict())
checkpoint_size = len(checkpoint_json.encode("utf-8"))

checkpoint_item = {
"id": f"checkpoint_{checkpoint.checkpoint_id}",
"type": "checkpoint",
"checkpoint_id": checkpoint.checkpoint_id,
"workflow_id": checkpoint.workflow_id,
"timestamp": checkpoint.timestamp,
"status": "completed",
"metadata": {
# Summary metrics for list view
"iteration_count": checkpoint.iteration_count,
"pending_hil_count": len(checkpoint.pending_request_info_events),
"has_pending_hil": len(checkpoint.pending_request_info_events) > 0,
"message_count": sum(len(msgs) for msgs in checkpoint.messages.values()),
"size_bytes": checkpoint_size,
"version": checkpoint.version,
},
}
items.append(cast(ConversationItem, checkpoint_item))

Expand All @@ -472,24 +492,91 @@ async def list_items(

return paginated_items, has_more

def get_item(self, conversation_id: str, item_id: str) -> ConversationItem | None:
"""Get a specific conversation item by ID."""
# Use the item index for O(1) lookup
async def get_item(self, conversation_id: str, item_id: str) -> ConversationItem | None:
    """Get a specific conversation item by ID.

    Supports checkpoint items - will load full checkpoint state from storage.
    For checkpoints, the full state is included in metadata.full_checkpoint.

    Args:
        conversation_id: Conversation ID
        item_id: Item ID

    Returns:
        The conversation item, or None if not found.
    """
    # Fast path: messages, function calls, etc. live in the item index (O(1) lookup).
    conv_items = self._item_index.get(conversation_id, {})
    item = conv_items.get(item_id)
    if item:
        return item

    # Slow path: checkpoint items are not indexed; lazily load from checkpoint storage.
    if item_id.startswith("checkpoint_"):
        checkpoint_id = item_id[len("checkpoint_") :]  # Strip the "checkpoint_" prefix
        conv_data = self._conversations.get(conversation_id)
        if not conv_data:
            return None

        checkpoint_storage = conv_data.get("checkpoint_storage")
        if not checkpoint_storage:
            return None

        # Load full checkpoint from storage.
        checkpoint = await checkpoint_storage.load_checkpoint(checkpoint_id)
        if not checkpoint:
            return None

        import json

        # Serialize once and reuse: the original computed checkpoint.to_dict()
        # twice (for size_bytes and again for full_checkpoint), doubling the
        # serialization cost for large checkpoints.
        checkpoint_dict = checkpoint.to_dict()
        checkpoint_size = len(json.dumps(checkpoint_dict).encode("utf-8"))

        # Build checkpoint item with the FULL state in metadata.
        checkpoint_item = {
            "id": item_id,
            "type": "checkpoint",
            "checkpoint_id": checkpoint.checkpoint_id,
            "workflow_id": checkpoint.workflow_id,
            "timestamp": checkpoint.timestamp,
            "status": "completed",
            "metadata": {
                # Summary metrics (same shape as the list view)
                "iteration_count": checkpoint.iteration_count,
                "pending_hil_count": len(checkpoint.pending_request_info_events),
                "has_pending_hil": len(checkpoint.pending_request_info_events) > 0,
                "message_count": sum(len(msgs) for msgs in checkpoint.messages.values()),
                "size_bytes": checkpoint_size,
                "version": checkpoint.version,
                # Full checkpoint state (lazy loaded here; omitted from list view)
                "full_checkpoint": checkpoint_dict,
            },
        }

        return cast(ConversationItem, checkpoint_item)

    return None

def get_thread(self, conversation_id: str) -> AgentThread | None:
    """Look up the AgentThread backing a conversation - CRITICAL for agent.run_stream()."""
    record = self._conversations.get(conversation_id)
    if not record:
        return None
    return record["thread"]

def list_conversations_by_metadata(self, metadata_filter: dict[str, str]) -> list[Conversation]:
async def list_conversations_by_metadata(self, metadata_filter: dict[str, str]) -> list[Conversation]:
"""Filter conversations by metadata (e.g., agent_id)."""
results = []
for conv_data in self._conversations.values():
conv_meta = conv_data.get("metadata", {})
conv_meta = conv_data.get("metadata", {}).copy() # Copy to avoid mutating original

# Check if all filter items match
if all(conv_meta.get(k) == v for k, v in metadata_filter.items()):
# Enrich workflow sessions with checkpoint summary
if conv_meta.get("type") == "workflow_session":
checkpoint_storage = conv_data.get("checkpoint_storage")
if checkpoint_storage:
checkpoints = await checkpoint_storage.list_checkpoints()
latest = checkpoints[0] if checkpoints else None
conv_meta["checkpoint_summary"] = {
"count": len(checkpoints),
"latest_iteration": latest.iteration_count if latest else 0,
"has_pending_hil": len(latest.pending_request_info_events) > 0 if latest else False,
"pending_hil_count": len(latest.pending_request_info_events) if latest else 0,
}

results.append(
Conversation(
id=conv_data["id"],
Expand All @@ -498,6 +585,10 @@ def list_conversations_by_metadata(self, metadata_filter: dict[str, str]) -> lis
metadata=conv_meta,
)
)

# Sort by created_at descending (most recent first)
results.sort(key=lambda c: c.created_at, reverse=True)

return results


Expand Down
33 changes: 32 additions & 1 deletion python/packages/devui/agent_framework_devui/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,20 @@ def _extract_user_message_fallback(self, input_data: Any) -> str:
return json.dumps(input_data)
return str(input_data)

def _is_openai_multimodal_format(self, input_data: Any) -> bool:
"""Check if input is OpenAI ResponseInputParam format (list with message items).

Args:
input_data: Input data to check

Returns:
True if input is OpenAI multimodal format
"""
if not isinstance(input_data, list) or not input_data:
return False
first_item = input_data[0]
return isinstance(first_item, dict) and first_item.get("type") == "message"

async def _parse_workflow_input(self, workflow: Any, raw_input: Any) -> Any:
"""Parse input based on workflow's expected input type.

Expand All @@ -733,9 +747,26 @@ async def _parse_workflow_input(self, workflow: Any, raw_input: Any) -> Any:
Parsed input appropriate for the workflow
"""
try:
# Handle structured input
# Handle JSON string input (from frontend api.ts JSON.stringify)
if isinstance(raw_input, str):
try:
parsed = json.loads(raw_input)
raw_input = parsed
except (json.JSONDecodeError, TypeError):
# Plain text string, continue with string handling
pass

# Check for OpenAI multimodal format (list with type: "message")
# This handles ChatMessage inputs with images, files, etc.
if self._is_openai_multimodal_format(raw_input):
logger.debug("Detected OpenAI multimodal format, converting to ChatMessage")
return self._convert_input_to_chat_message(raw_input)

# Handle structured input (dict)
if isinstance(raw_input, dict):
return self._parse_structured_workflow_input(workflow, raw_input)

# Handle string input
return self._parse_raw_workflow_input(workflow, str(raw_input))

except Exception as e:
Expand Down
48 changes: 47 additions & 1 deletion python/packages/devui/agent_framework_devui/_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,10 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) ->
context[f"exec_item_{executor_id}"] = item_id
context["output_index"] = context.get("output_index", -1) + 1

# Track current executor for routing Magentic agent events
# This allows MagenticAgentDeltaEvent to route to the executor's item
context["current_executor_id"] = executor_id

# Create ExecutorActionItem with proper type
executor_item = ExecutorActionItem(
type="executor_action",
Expand All @@ -908,6 +912,10 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) ->
executor_id = getattr(event, "executor_id", "unknown")
item_id = context.get(f"exec_item_{executor_id}", f"exec_{executor_id}_unknown")

# Clear current executor tracking when executor completes
if context.get("current_executor_id") == executor_id:
context.pop("current_executor_id", None)

# Create ExecutorActionItem with completed status
# ExecutorCompletedEvent uses 'data' field, not 'result'
executor_item = ExecutorActionItem(
Expand Down Expand Up @@ -1059,6 +1067,30 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) ->
text = getattr(event, "text", None)

if text:
# Check if we're inside an executor - route to executor's item
# This prevents duplicate timeline entries (executor + inner agent)
current_executor_id = context.get("current_executor_id")
executor_item_key = f"exec_item_{current_executor_id}" if current_executor_id else None

if executor_item_key and executor_item_key in context:
# Route delta to the executor's item instead of creating a new message item
item_id = context[executor_item_key]

# Emit text delta event routed to the executor's item
return [
ResponseTextDeltaEvent(
type="response.output_text.delta",
output_index=context.get("output_index", 0),
content_index=0,
item_id=item_id,
delta=text,
logprobs=[],
sequence_number=self._next_sequence(context),
)
]

# Fallback: No executor context - create separate message item (original behavior)
# This handles cases where MagenticAgentDeltaEvent is emitted outside an executor
events = []

# Track Magentic agent messages separately from regular messages
Expand Down Expand Up @@ -1181,7 +1213,21 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) ->
agent_id = getattr(event, "agent_id", "unknown_agent")
message = getattr(event, "message", None)

# Track Magentic agent messages
# Check if we're inside an executor - if so, deltas were already routed there
# We don't need to emit a separate message completion event
current_executor_id = context.get("current_executor_id")
executor_item_key = f"exec_item_{current_executor_id}" if current_executor_id else None

if executor_item_key and executor_item_key in context:
# Deltas were routed to executor item - no separate message item to complete
# The executor's output_item.done will mark completion
logger.debug(
f"MagenticAgentMessageEvent from {agent_id} - "
f"deltas routed to executor {current_executor_id}, skipping"
)
return []

# Fallback: Handle case where we created a separate message item (no executor context)
magentic_key = f"magentic_message_{agent_id}"

# Check if we were streaming for this agent
Expand Down
Loading
Loading