Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions python/packages/devui/agent_framework_devui/_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,15 @@ def invalidate_entity(self, entity_id: str) -> None:
Args:
entity_id: Entity identifier to invalidate
"""
# Check if entity is in-memory - these cannot be invalidated
entity_info = self._entities.get(entity_id)
if entity_info and entity_info.source == "in_memory":
logger.warning(
f"Attempted to invalidate in-memory entity {entity_id} - ignoring "
f"(in-memory entities cannot be reloaded)"
)
return

# Remove from loaded objects cache
if entity_id in self._loaded_objects:
del self._loaded_objects[entity_id]
Expand Down Expand Up @@ -366,6 +375,7 @@ async def create_entity_info_from_object(
description=description,
type=entity_type,
framework="agent_framework",
source=source, # IMPORTANT: Pass the source parameter
tools=[str(tool) for tool in (tools_list or [])],
instructions=instructions,
model_id=model,
Expand Down
5 changes: 4 additions & 1 deletion python/packages/devui/agent_framework_devui/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,11 @@ async def _execute_workflow(
except Exception as e:
logger.warning(f"Could not convert HIL responses to proper types: {e}")

# Step 2: Now send responses to the in-memory workflow
async for event in workflow.send_responses_streaming(hil_responses):
# Enrich new RequestInfoEvents that may come from subsequent HIL requests
if isinstance(event, RequestInfoEvent):
self._enrich_request_info_event_with_response_schema(event, workflow)

for trace_event in trace_collector.get_pending_events():
yield trace_event
yield event
Expand Down
71 changes: 20 additions & 51 deletions python/packages/devui/agent_framework_devui/_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
InputTokensDetails,
OpenAIResponse,
OutputTokensDetails,
ResponseCompletedEvent,
ResponseErrorEvent,
ResponseFunctionCallArgumentsDeltaEvent,
ResponseFunctionResultComplete,
Expand Down Expand Up @@ -186,6 +185,8 @@ async def convert_event(self, raw_event: Any, request: AgentFrameworkRequest) ->
if isinstance(raw_event, AgentRunUpdateEvent):
# Extract the AgentRunResponseUpdate from the event's data attribute
if raw_event.data and isinstance(raw_event.data, AgentRunResponseUpdate):
# Preserve executor_id in context for proper output routing
context["current_executor_id"] = raw_event.executor_id
return await self._convert_agent_update(raw_event.data, context)
# If no data, treat as generic workflow event
return await self._convert_workflow_event(raw_event, context)
Expand Down Expand Up @@ -502,8 +503,17 @@ async def _convert_agent_update(self, update: Any, context: dict[str, Any]) -> S
# Check if we're streaming text content
has_text_content = any(content.__class__.__name__ == "TextContent" for content in update.contents)

# If we have text content and haven't created a message yet, create one
if has_text_content and "current_message_id" not in context:
# Check if we're in an executor context with an existing item
executor_id = context.get("current_executor_id")
executor_item_key = f"exec_item_{executor_id}" if executor_id else None

# If we have an executor item, use it for deltas instead of creating a message
if has_text_content and executor_item_key and executor_item_key in context:
# Use the executor's item ID for this agent's output
context["current_message_id"] = context[executor_item_key]
# Note: We don't create a new message item here since the executor item already exists
# Otherwise, create a message item if we haven't yet (for non-executor contexts)
elif has_text_content and "current_message_id" not in context:
message_id = f"msg_{uuid4().hex[:8]}"
context["current_message_id"] = message_id
context["output_index"] = context.get("output_index", -1) + 1
Expand Down Expand Up @@ -671,25 +681,9 @@ async def _convert_agent_lifecycle_event(self, event: Any, context: dict[str, An
]

if isinstance(event, AgentCompletedEvent):
execution_id = context.get("execution_id", f"agent_{uuid4().hex[:12]}")

response_obj = Response(
id=f"resp_{execution_id}",
object="response",
created_at=float(time.time()),
model=model_name,
output=[],
status="completed",
parallel_tool_calls=False,
tool_choice="none",
tools=[],
)

return [
ResponseCompletedEvent(
type="response.completed", sequence_number=self._next_sequence(context), response=response_obj
)
]
# Don't emit response.completed here - the server will emit a proper one
# with usage data after aggregating all events
return []

if isinstance(event, AgentFailedEvent):
execution_id = context.get("execution_id", f"agent_{uuid4().hex[:12]}")
Expand Down Expand Up @@ -839,35 +833,10 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) ->
)
]

# Handle WorkflowCompletedEvent - emit response.completed
# Handle WorkflowCompletedEvent - Don't emit response.completed here
# The server will emit a proper one with usage data after aggregating all events
if event_class == "WorkflowCompletedEvent":
workflow_id = context.get("workflow_id", str(uuid4()))

# Import Response type for proper construction
from openai.types.responses import Response

# Get model name from request or use 'devui' as default
request_obj = context.get("request")
model_name = request_obj.model if request_obj and request_obj.model else "devui"

# Create a full Response object for completed state
response_obj = Response(
id=f"resp_{workflow_id}",
object="response",
created_at=float(time.time()),
model=model_name,
output=[], # Output items already sent via output_item.added events
status="completed",
parallel_tool_calls=False,
tool_choice="none",
tools=[],
)

return [
ResponseCompletedEvent(
type="response.completed", sequence_number=self._next_sequence(context), response=response_obj
)
]
return []

if event_class == "WorkflowFailedEvent":
workflow_id = context.get("workflow_id", str(uuid4()))
Expand Down Expand Up @@ -1103,7 +1072,7 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) ->
context[magentic_key] = message_id
context["output_index"] = context.get("output_index", -1) + 1

# Import required types
# Import required types for creating message containers
from openai.types.responses import ResponseOutputMessage, ResponseOutputText
from openai.types.responses.response_content_part_added_event import (
ResponseContentPartAddedEvent,
Expand Down
22 changes: 20 additions & 2 deletions python/packages/devui/agent_framework_devui/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def _ensure_executor(self) -> AgentFrameworkExecutor:
discovery = self.executor.entity_discovery
for entity in self._pending_entities:
try:
entity_info = await discovery.create_entity_info_from_object(entity, source="in-memory")
entity_info = await discovery.create_entity_info_from_object(entity, source="in_memory")
discovery.register_entity(entity_info.id, entity_info, entity)
logger.info(f"Registered in-memory entity: {entity_info.id}")
except Exception as e:
Expand Down Expand Up @@ -552,6 +552,14 @@ async def reload_entity(entity_id: str) -> dict[str, Any]:
if not entity_info:
raise HTTPException(status_code=404, detail=f"Entity {entity_id} not found")

# Check if entity is in-memory (cannot be reloaded)
if entity_info.source == "in_memory":
raise HTTPException(
status_code=400,
detail="In-memory entities cannot be reloaded. "
"They only exist in memory and have no source files to reload from.",
)

# Invalidate cache
executor.entity_discovery.invalidate_entity(entity_id)

Expand Down Expand Up @@ -1049,10 +1057,20 @@ async def _stream_execution(
from .models import ResponseCompletedEvent

final_response = await executor.message_mapper.aggregate_to_response(events, request)

# response.completed must carry the next sequence number after every event emitted so far.
# Walk the events backwards to the most recent one that actually has a sequence number
# and increment from it; if no event has one, the base stays 0 and the result is 1.
last_seq = 0
for event in reversed(events):
if hasattr(event, "sequence_number") and event.sequence_number is not None:
last_seq = event.sequence_number
break

completed_event = ResponseCompletedEvent(
type="response.completed",
response=final_response,
sequence_number=len(events),
sequence_number=last_seq + 1,
)
yield f"data: {completed_event.model_dump_json()}\n\n"

Expand Down

Large diffs are not rendered by default.

Loading
Loading