diff --git a/python/packages/devui/agent_framework_devui/_mapper.py b/python/packages/devui/agent_framework_devui/_mapper.py index 9d9e0e5ccd..f68fea9d01 100644 --- a/python/packages/devui/agent_framework_devui/_mapper.py +++ b/python/packages/devui/agent_framework_devui/_mapper.py @@ -918,12 +918,16 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) -> # Create ExecutorActionItem with completed status # ExecutorCompletedEvent uses 'data' field, not 'result' + # Serialize the result data to ensure it's JSON-serializable + # (AgentExecutorResponse contains AgentRunResponse/ChatMessage which are SerializationMixin) + raw_result = getattr(event, "data", None) + serialized_result = self._serialize_value(raw_result) if raw_result is not None else None executor_item = ExecutorActionItem( type="executor_action", id=item_id, executor_id=executor_id, status="completed", - result=getattr(event, "data", None), + result=serialized_result, ) # Use our custom event type @@ -939,7 +943,9 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) -> if event_class == "ExecutorFailedEvent": executor_id = getattr(event, "executor_id", "unknown") item_id = context.get(f"exec_item_{executor_id}", f"exec_{executor_id}_unknown") - error_info = getattr(event, "error", None) + # ExecutorFailedEvent uses 'details' field (WorkflowErrorDetails), not 'error' + details = getattr(event, "details", None) + err_msg: str | None = str(getattr(details, "message", details)) if details else None # Create ExecutorActionItem with failed status executor_item = ExecutorActionItem( @@ -947,7 +953,7 @@ async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) -> id=item_id, executor_id=executor_id, status="failed", - error={"message": str(error_info)} if error_info else None, + error={"message": err_msg} if err_msg else None, ) # Use our custom event type diff --git a/python/packages/devui/agent_framework_devui/_server.py b/python/packages/devui/agent_framework_devui/_server.py index 284164cefb..31f5ee62c4 100644 --- a/python/packages/devui/agent_framework_devui/_server.py +++ b/python/packages/devui/agent_framework_devui/_server.py @@ -1121,7 +1121,7 @@ async def _stream_execution( yield "data: [DONE]\n\n" except Exception as e: - logger.error(f"Error in streaming execution: {e}") + logger.error(f"Error in streaming execution: {e}", exc_info=True) error_event = {"id": "error", "object": "error", "error": {"message": str(e), "type": "execution_error"}} yield f"data: {json.dumps(error_event)}\n\n" diff --git a/python/packages/devui/pyproject.toml b/python/packages/devui/pyproject.toml index 5dec2ce52c..981b425328 100644 --- a/python/packages/devui/pyproject.toml +++ b/python/packages/devui/pyproject.toml @@ -49,6 +49,7 @@ fallback-version = "0.0.0" [tool.pytest.ini_options] testpaths = 'tests' +pythonpath = ["tests"] addopts = "-ra -q -r fEX" asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "function" diff --git a/python/packages/devui/tests/__init__.py b/python/packages/devui/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/packages/devui/tests/test_cleanup_hooks.py b/python/packages/devui/tests/test_cleanup_hooks.py index 241a241f8b..f065c1e0c6 100644 --- a/python/packages/devui/tests/test_cleanup_hooks.py +++ b/python/packages/devui/tests/test_cleanup_hooks.py @@ -36,8 +36,7 @@ def __init__(self, name: str = "TestAgent"): async def run_stream(self, messages=None, *, thread=None, **kwargs): """Mock streaming run method.""" yield AgentRunResponse( - messages=[ChatMessage(role=Role.ASSISTANT, content=[TextContent(text="Test response")])], - inner_messages=[], + messages=[ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Test response")])], ) diff --git a/python/packages/devui/tests/test_discovery.py b/python/packages/devui/tests/test_discovery.py index d063347a88..dbd1ce1074 100644 --- a/python/packages/devui/tests/test_discovery.py +++ b/python/packages/devui/tests/test_discovery.py @@ -21,7 +21,6 @@ def test_entities_dir(): return str(samples_dir.resolve()) -@pytest.mark.skip("Skipping while we fix discovery") async def test_discover_agents(test_entities_dir): """Test that agent discovery works and returns valid agent entities.""" discovery = EntityDiscovery(test_entities_dir) diff --git a/python/packages/devui/tests/test_execution.py b/python/packages/devui/tests/test_execution.py index bdcf2288ae..5d276e79d1 100644 --- a/python/packages/devui/tests/test_execution.py +++ b/python/packages/devui/tests/test_execution.py @@ -1,38 +1,57 @@ # Copyright (c) Microsoft. All rights reserved. -"""Focused tests for execution flow functionality.""" +"""Focused tests for execution flow functionality. + +Tests include: +- Entity discovery and info retrieval +- Agent execution (sync and streaming) using real ChatAgent with mock LLM +- Workflow execution using real WorkflowBuilder with FunctionExecutor +- Edge cases like non-streaming agents +""" import asyncio -import os import tempfile from pathlib import Path +from typing import Any import pytest +import pytest_asyncio +from agent_framework import AgentExecutor, ChatAgent, FunctionExecutor, WorkflowBuilder + +# Import test utilities +from test_helpers import ( + MockBaseChatClient, + create_concurrent_workflow, + create_executor_with_real_agent, + create_sequential_workflow, +) from agent_framework_devui._discovery import EntityDiscovery from agent_framework_devui._executor import AgentFrameworkExecutor, EntityNotFoundError from agent_framework_devui._mapper import MessageMapper from agent_framework_devui.models._openai_custom import AgentFrameworkRequest +# ============================================================================= +# Local Fixtures (async factory-based) +# ============================================================================= -class _DummyStartExecutor: - """Minimal executor stub exposing handler metadata for tests.""" - def __init__(self, *, input_types=None, handlers=None): - if input_types is not None: - self.input_types = list(input_types) - if handlers is not None: - self._handlers = dict(handlers) +@pytest_asyncio.fixture +async def executor_with_real_agent(): + """Create an executor with a REAL ChatAgent using mock chat client.""" + return await create_executor_with_real_agent() -class _DummyWorkflow: - """Simple workflow stub returning configured start executor.""" +@pytest_asyncio.fixture +async def sequential_workflow_fixture(): + """Create a realistic sequential workflow (Writer -> Reviewer).""" + return await create_sequential_workflow() - def __init__(self, start_executor): - self._start_executor = start_executor - def get_start_executor(self): - return self._start_executor +@pytest_asyncio.fixture +async def concurrent_workflow_fixture(): + """Create a realistic concurrent workflow (Researcher | Analyst | Summarizer).""" + return await create_concurrent_workflow() @pytest.fixture @@ -91,18 +110,23 @@ async def test_executor_get_entity_info(executor): assert entity_info.type in ["agent", "workflow", "unknown"] -@pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="requires OpenAI API key") -async def test_executor_sync_execution(executor): - """Test synchronous execution.""" - entities = await executor.discover_entities() - # Find an agent entity to test with - agents = [e for e in entities if e.type == "agent"] - assert len(agents) > 0, "No agent entities found for testing" - agent_id = agents[0].id +# ============================================================================= +# Agent Execution Tests (using real ChatAgent with mock LLM) +# ============================================================================= + + +async def test_agent_sync_execution(executor_with_real_agent): + """Test synchronous agent execution with REAL ChatAgent (mock LLM). + + This tests the full execution pipeline without needing an API key: + - Real ChatAgent class with middleware + - Real message normalization + - Mock chat client for LLM calls + """ + executor, entity_id, mock_client = executor_with_real_agent - # Use metadata.entity_id for routing request = AgentFrameworkRequest( - metadata={"entity_id": agent_id}, + metadata={"entity_id": entity_id}, input="test data", stream=False, ) @@ -113,21 +137,17 @@ async def test_executor_sync_execution(executor): assert response.model == "devui" assert response.object == "response" assert len(response.output) > 0 - assert response.usage.total_tokens > 0 + # Verify mock client was called + assert mock_client.call_count == 1 -@pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="requires OpenAI API key") -async def test_executor_sync_execution_with_model(executor): - """Test synchronous execution with model field specified.""" - entities = await executor.discover_entities() - # Find an agent entity to test with - agents = [e for e in entities if e.type == "agent"] - assert len(agents) > 0, "No agent entities found for testing" - agent_id = agents[0].id - # Use metadata.entity_id for routing AND specify a model +async def test_agent_sync_execution_respects_model_field(executor_with_real_agent): + """Test synchronous execution respects the model field in the response.""" + executor, entity_id, mock_client = executor_with_real_agent + request = AgentFrameworkRequest( - metadata={"entity_id": agent_id}, + metadata={"entity_id": entity_id}, model="custom-model-name", input="test data", stream=False, @@ -139,49 +159,254 @@ async def test_executor_sync_execution_with_model(executor): assert response.model == "custom-model-name" assert response.object == "response" assert len(response.output) > 0 - assert response.usage.total_tokens > 0 -@pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="requires OpenAI API key") -@pytest.mark.skip("Skipping while we fix discovery") -async def test_executor_streaming_execution(executor): - """Test streaming execution.""" - entities = await executor.discover_entities() - # Find an agent entity to test with - agents = [e for e in entities if e.type == "agent"] - assert len(agents) > 0, "No agent entities found for testing" - agent_id = agents[0].id +async def test_chat_client_receives_correct_messages(executor_with_real_agent): + """Verify the mock chat client receives properly formatted messages. + + This tests that the REAL ChatAgent properly: + - Normalizes input messages + - Formats messages for the chat client + """ + executor, entity_id, mock_client = executor_with_real_agent + + request = AgentFrameworkRequest( + metadata={"entity_id": entity_id}, + input="What is 2+2?", + stream=False, + ) + + await executor.execute_sync(request) + + # Verify chat client was called + assert mock_client.call_count == 1 + + # Verify messages were received + assert len(mock_client.received_messages) == 1 + messages = mock_client.received_messages[0] + + # Should have at least one message + assert len(messages) >= 1, f"Expected messages, got: {messages}" + + # Verify the input text is present in the messages + all_text = " ".join(m.text or "" for m in messages) + assert "2+2" in all_text, f"Expected '2+2' in messages, got text: '{all_text}'" + + +# ============================================================================= +# Workflow Execution Tests (using real WorkflowBuilder with FunctionExecutor) +# ============================================================================= + - # Use metadata.entity_id for routing +async def test_workflow_streaming_execution(): + """Test workflow streaming execution with REAL WorkflowBuilder and FunctionExecutor. + + This tests the full workflow execution pipeline without needing an API key. + Uses a simple function-based workflow that processes input. + """ + + # Create a simple workflow using real agent_framework classes + def process_input(input_data: str) -> str: + return f"Processed: {input_data}" + + builder = WorkflowBuilder(name="Test Workflow", description="Test workflow for execution") + start_executor = FunctionExecutor(id="process", func=process_input) + builder.set_start_executor(start_executor) + workflow = builder.build() + + # Create executor and register workflow + discovery = EntityDiscovery(None) + mapper = MessageMapper() + executor = AgentFrameworkExecutor(discovery, mapper) + + entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test") + discovery.register_entity(entity_info.id, entity_info, workflow) + + # Execute workflow request = AgentFrameworkRequest( - metadata={"entity_id": agent_id}, - input="streaming test", + metadata={"entity_id": entity_info.id}, + input="hello workflow", stream=True, ) - event_count = 0 - text_events = [] + events = [] + async for event in executor.execute_streaming(request): + events.append(event) + + # Should get events from workflow execution + assert len(events) > 0, "Should receive events from workflow" + + # Check for workflow-specific events or completion + event_types = [getattr(e, "type", None) for e in events] + assert any(t is not None for t in event_types), f"Should have typed events, got: {event_types}" + + +async def test_workflow_sync_execution(): + """Test synchronous workflow execution.""" + + def echo(text: str) -> str: + return f"Echo: {text}" + + builder = WorkflowBuilder(name="Echo Workflow", description="Simple echo workflow") + start_executor = FunctionExecutor(id="echo", func=echo) + builder.set_start_executor(start_executor) + workflow = builder.build() + + # Create executor and register workflow + discovery = EntityDiscovery(None) + mapper = MessageMapper() + executor = AgentFrameworkExecutor(discovery, mapper) + + entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test") + discovery.register_entity(entity_info.id, entity_info, workflow) + + # Execute workflow synchronously + request = AgentFrameworkRequest( + metadata={"entity_id": entity_info.id}, + input="test input", + stream=False, + ) + + response = await executor.execute_sync(request) + + # Should get a valid response + assert response.object == "response" + assert len(response.output) > 0 + + +# ============================================================================= +# Full Pipeline Serialization Tests (Run + Map + JSON) +# ============================================================================= + + +async def test_full_pipeline_agent_events_are_json_serializable(executor_with_real_agent): + """CRITICAL TEST: Verify ALL events from agent execution can be JSON serialized. + + This tests the exact code path that the server uses: + 1. Execute agent via executor.execute_streaming() + 2. Each event is converted by the mapper + 3. Server calls model_dump_json() on each event for SSE + + If any event contains non-serializable objects (like AgentRunResponse), + this test will fail - catching the bug before it hits production. + """ + executor, entity_id, mock_client = executor_with_real_agent + + request = AgentFrameworkRequest( + metadata={"entity_id": entity_id}, + input="Test message for serialization", + stream=True, + ) + + events = [] + serialization_errors = [] async for event in executor.execute_streaming(request): - event_count += 1 - if hasattr(event, "type") and event.type == "response.output_text.delta": - text_events.append(event.delta) + events.append(event) - if event_count > 10: # Limit for testing - break + # This is EXACTLY what the server does before sending SSE + try: + if hasattr(event, "model_dump_json"): + json_str = event.model_dump_json() + assert json_str is not None + assert len(json_str) > 0 + except Exception as e: + serialization_errors.append(f"Event type={getattr(event, 'type', 'unknown')}: {e}") + + # Should have received events + assert len(events) > 0, "Should receive events from agent execution" + + # NO serialization errors allowed + assert len(serialization_errors) == 0, f"Found {len(serialization_errors)} serialization errors:\n" + "\n".join( + serialization_errors + ) - assert event_count > 0 - assert len(text_events) > 0 +async def test_full_pipeline_workflow_events_are_json_serializable(): + """CRITICAL TEST: Verify ALL events from workflow execution can be JSON serialized. + + This is particularly important for workflows with AgentExecutor because: + - AgentExecutor produces ExecutorCompletedEvent with AgentExecutorResponse + - AgentExecutorResponse contains AgentRunResponse and ChatMessage objects + - These are SerializationMixin objects, not Pydantic, which caused the original bug + + This test ensures the ENTIRE streaming pipeline works end-to-end. + """ + # Create a workflow with AgentExecutor (the problematic case) + mock_client = MockBaseChatClient() + agent = ChatAgent( + id="serialization_test_agent", + name="Serialization Test Agent", + description="Agent for testing serialization", + chat_client=mock_client, + system_message="You are a test assistant.", + ) + + builder = WorkflowBuilder(name="Serialization Test Workflow", description="Test workflow") + agent_executor = AgentExecutor(id="agent_node", agent=agent) + builder.set_start_executor(agent_executor) + workflow = builder.build() + + # Create executor and register + discovery = EntityDiscovery(None) + mapper = MessageMapper() + executor = AgentFrameworkExecutor(discovery, mapper) -async def test_executor_invalid_entity_id(executor): - """Test execution with invalid entity ID.""" + entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test") + discovery.register_entity(entity_info.id, entity_info, workflow) + + request = AgentFrameworkRequest( + metadata={"entity_id": entity_info.id}, + input="Test workflow serialization", + stream=True, + ) + + events = [] + serialization_errors = [] + event_types_seen = [] + + async for event in executor.execute_streaming(request): + events.append(event) + event_type = getattr(event, "type", "unknown") + event_types_seen.append(event_type) + + # This is EXACTLY what the server does before sending SSE + try: + if hasattr(event, "model_dump_json"): + json_str = event.model_dump_json() + assert json_str is not None + assert len(json_str) > 0 + except Exception as e: + serialization_errors.append(f"Event type={event_type}: {e}") + + # Should have received events + assert len(events) > 0, "Should receive events from workflow execution" + + # Verify we got workflow events (not just generic ones) + assert any("output_item" in str(t) for t in event_types_seen), ( + f"Should see output_item events, got: {event_types_seen}" + ) + + # NO serialization errors allowed - this is the critical assertion + assert len(serialization_errors) == 0, ( + f"Found {len(serialization_errors)} serialization errors:\n" + + "\n".join(serialization_errors) + + f"\n\nEvent types seen: {event_types_seen}" + ) + + # Also verify aggregate_to_response works (server calls this after streaming) + final_response = await mapper.aggregate_to_response(events, request) + assert final_response is not None + + +async def test_get_entity_info_raises_for_invalid_id(executor): + """Test that get_entity_info raises EntityNotFoundError for invalid ID.""" with pytest.raises(EntityNotFoundError): executor.get_entity_info("nonexistent_agent") -async def test_executor_missing_entity_id(executor): - """Test get_entity_id returns metadata.entity_id.""" +async def test_request_extracts_entity_id_from_metadata(executor): + """Test that AgentFrameworkRequest extracts entity_id from metadata.""" request = AgentFrameworkRequest( metadata={"entity_id": "my_agent"}, input="test", @@ -193,15 +418,16 @@ async def test_executor_missing_entity_id(executor): assert entity_id == "my_agent" -def test_executor_get_start_executor_message_types_uses_handlers(): - """Ensure handler metadata is surfaced when input_types missing.""" - executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper()) - start_executor = _DummyStartExecutor(handlers={str: lambda *_: None}) - workflow = _DummyWorkflow(start_executor) +@pytest.mark.asyncio +async def test_executor_get_start_executor_message_types(sequential_workflow_fixture): + """Test _get_start_executor_message_types with real workflow.""" + executor, _entity_id, _mock_client, workflow = sequential_workflow_fixture - start, message_types = executor._get_start_executor_message_types(workflow) + start_exec, message_types = executor._get_start_executor_message_types(workflow) - assert start is start_executor + assert start_exec is not None + assert len(message_types) > 0 + # Real sequential workflows accept str input assert str in message_types @@ -216,39 +442,95 @@ def test_executor_select_primary_input_prefers_string(): assert chosen is str -def test_executor_parse_structured_prefers_input_field(): - """Structured payloads map to string when agent start requires text.""" +@pytest.mark.asyncio +async def test_executor_parse_structured_extracts_input_for_string_workflow(): + """Structured payloads extract 'input' field when workflow expects str.""" + from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler + + class StringInputExecutor(Executor): + """Executor that accepts string input directly.""" + + @handler + async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None: + await ctx.yield_output(f"Got: {text}") + + workflow = ( + WorkflowBuilder(name="String Workflow", description="Accepts string") + .set_start_executor(StringInputExecutor(id="str_exec")) + .build() + ) + executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper()) - start_executor = _DummyStartExecutor(handlers={type("Req", (), {}): None, str: lambda *_: None}) - workflow = _DummyWorkflow(start_executor) + # When workflow expects str and receives {"input": "hello"}, extract "hello" parsed = executor._parse_structured_workflow_input(workflow, {"input": "hello"}) - assert parsed == "hello" -def test_executor_parse_raw_falls_back_to_string(): - """Raw inputs remain untouched when start executor expects text.""" +@pytest.mark.asyncio +async def test_executor_parse_raw_string_for_string_workflow(): + """Raw string inputs pass through for string-accepting workflows.""" + from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler + + class StringInputExecutor(Executor): + """Executor that accepts string input directly.""" + + @handler + async def process(self, text: str, ctx: WorkflowContext[Any, Any]) -> None: + await ctx.yield_output(f"Got: {text}") + + workflow = ( + WorkflowBuilder(name="String Workflow", description="Accepts string") + .set_start_executor(StringInputExecutor(id="str_exec")) + .build() + ) + executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper()) - start_executor = _DummyStartExecutor(handlers={str: lambda *_: None}) - workflow = _DummyWorkflow(start_executor) + # Raw string should pass through unchanged parsed = executor._parse_raw_workflow_input(workflow, "hi there") - assert parsed == "hi there" -def test_executor_parse_stringified_json_workflow_input(): - """Stringified JSON workflow input (from frontend JSON.stringify) is correctly parsed.""" +@pytest.mark.asyncio +async def test_executor_parse_converts_to_chat_message_for_sequential_workflow(sequential_workflow_fixture): + """Sequential workflows convert string input to ChatMessage.""" + from agent_framework import ChatMessage + + executor, _entity_id, _mock_client, workflow = sequential_workflow_fixture + + # Sequential workflows expect ChatMessage, so raw string becomes ChatMessage + parsed = executor._parse_raw_workflow_input(workflow, "hello") + + assert isinstance(parsed, ChatMessage) + assert parsed.text == "hello" + + +@pytest.mark.asyncio +async def test_executor_parse_stringified_json_workflow_input(): + """Stringified JSON workflow input is parsed when workflow expects Pydantic model.""" + from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler from pydantic import BaseModel class WorkflowInput(BaseModel): input: str metadata: dict | None = None + class PydanticInputExecutor(Executor): + """Executor that accepts a Pydantic model input.""" + + @handler + async def process(self, data: WorkflowInput, ctx: WorkflowContext[Any, Any]) -> None: + await ctx.yield_output(f"Got: {data.input}") + + # Build workflow with Pydantic input type + workflow = ( + WorkflowBuilder(name="Pydantic Workflow", description="Accepts Pydantic input") + .set_start_executor(PydanticInputExecutor(id="pydantic_exec")) + .build() + ) + executor = AgentFrameworkExecutor(EntityDiscovery(None), MessageMapper()) - start_executor = _DummyStartExecutor(handlers={WorkflowInput: lambda *_: None}) - workflow = _DummyWorkflow(start_executor) # Simulate frontend sending JSON.stringify({"input": "testing!", "metadata": {"key": "value"}}) stringified_json = '{"input": "testing!", "metadata": {"key": "value"}}' @@ -333,6 +615,151 @@ def get_new_thread(self, **kwargs): assert "Processed: hello" in text_events[0].delta +# ============================================================================= +# Full Pipeline Tests for SequentialBuilder +# ============================================================================= + + +@pytest.mark.asyncio +async def test_full_pipeline_sequential_workflow(sequential_workflow_fixture): + """Test SequentialBuilder workflow full pipeline with JSON serialization. + + Uses the shared sequential_workflow_fixture (Writer → Reviewer) from conftest. + Tests that all events can be JSON serialized for SSE streaming. + """ + executor, entity_id, mock_client, _workflow = sequential_workflow_fixture + + request = AgentFrameworkRequest( + metadata={"entity_id": entity_id}, + input="Write about testing best practices", + stream=True, + ) + + events = [] + serialization_errors = [] + + async for event in executor.execute_streaming(request): + events.append(event) + event_type = getattr(event, "type", "unknown") + + # Verify JSON serialization (exactly what server does for SSE) + try: + if hasattr(event, "model_dump_json"): + json_str = event.model_dump_json() + assert json_str is not None + except Exception as e: + serialization_errors.append(f"Event type={event_type}: {e}") + + assert len(events) > 0, "Should receive events from sequential workflow" + assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}" + assert mock_client.call_count >= 2, f"Expected both agents called, got {mock_client.call_count}" + + +@pytest.mark.asyncio +async def test_full_pipeline_concurrent_workflow(concurrent_workflow_fixture): + """Test ConcurrentBuilder workflow full pipeline with JSON serialization. + + Uses the shared concurrent_workflow_fixture (Researcher | Analyst | Summarizer) from conftest. + Tests fan-out/fan-in pattern with parallel agent execution. + """ + executor, entity_id, mock_client, _workflow = concurrent_workflow_fixture + + request = AgentFrameworkRequest( + metadata={"entity_id": entity_id}, + input="Analyze market trends for Q4", + stream=True, + ) + + events = [] + serialization_errors = [] + + async for event in executor.execute_streaming(request): + events.append(event) + event_type = getattr(event, "type", "unknown") + + # Verify JSON serialization + try: + if hasattr(event, "model_dump_json"): + json_str = event.model_dump_json() + assert json_str is not None + except Exception as e: + serialization_errors.append(f"Event type={event_type}: {e}") + + assert len(events) > 0, "Should receive events from concurrent workflow" + assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}" + assert mock_client.call_count >= 3, f"Expected all 3 agents called, got {mock_client.call_count}" + + +# ============================================================================= +# Full Pipeline Test for Workflow with Output Events +# ============================================================================= + + +@pytest.mark.asyncio +async def test_full_pipeline_workflow_output_event_serialization(): + """Test that WorkflowOutputEvent from ctx.yield_output() serializes correctly. + + This tests the pattern where executors yield output via ctx.yield_output(), + which emits WorkflowOutputEvent that DevUI must serialize for SSE. + """ + from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler + + class OutputtingExecutor(Executor): + """Executor that yields multiple outputs.""" + + @handler + async def process(self, input_text: str, ctx: WorkflowContext[Any, Any]) -> None: + await ctx.yield_output(f"First output: {input_text}") + await ctx.yield_output("Second output: processed") + await ctx.yield_output({"final": "result", "data": [1, 2, 3]}) + + # Build workflow + workflow = ( + WorkflowBuilder(name="Output Workflow", description="Tests yield_output") + .set_start_executor(OutputtingExecutor(id="outputter")) + .build() + ) + + # Create DevUI executor and register workflow + discovery = EntityDiscovery(None) + mapper = MessageMapper() + executor = AgentFrameworkExecutor(discovery, mapper) + + entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test") + discovery.register_entity(entity_info.id, entity_info, workflow) + + # Execute with streaming + request = AgentFrameworkRequest( + metadata={"entity_id": entity_info.id}, + input="Test output events", + stream=True, + ) + + events = [] + output_events = [] + serialization_errors = [] + + async for event in executor.execute_streaming(request): + events.append(event) + event_type = getattr(event, "type", "") + + # Track output item events + if "output_item" in event_type: + output_events.append(event) + + try: + if hasattr(event, "model_dump_json"): + event.model_dump_json() + except Exception as e: + serialization_errors.append(f"Event type={event_type}: {e}") + + assert len(events) > 0, "Should receive events" + assert len(serialization_errors) == 0, f"Serialization errors: {serialization_errors}" + + # Should have received output events for the yield_output calls + assert len(output_events) >= 3, f"Expected 3+ output events for yield_output calls, got {len(output_events)}" + + if __name__ == "__main__": # Simple test runner async def run_tests(): diff --git a/python/packages/devui/tests/test_helpers.py b/python/packages/devui/tests/test_helpers.py new file mode 100644 index 0000000000..ebb03c4c15 --- /dev/null +++ b/python/packages/devui/tests/test_helpers.py @@ -0,0 +1,467 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Shared test utilities for DevUI tests. + +This module provides reusable test helpers including: +- Mock chat clients that don't require API keys +- Real workflow event classes from agent_framework +- Test agents and executors for workflow testing +- Factory functions for test data + +These follow the patterns established in other agent_framework packages +(like a2a, ag-ui) which use explicit imports instead of conftest.py +to avoid pytest plugin conflicts when running tests across packages. +""" + +from collections.abc import AsyncIterable, MutableSequence +from typing import Any + +from agent_framework import ( + AgentRunResponse, + AgentRunResponseUpdate, + AgentThread, + BaseAgent, + BaseChatClient, + ChatAgent, + ChatMessage, + ChatOptions, + ChatResponse, + ChatResponseUpdate, + ConcurrentBuilder, + FunctionCallContent, + FunctionResultContent, + Role, + SequentialBuilder, + TextContent, + use_chat_middleware, +) +from agent_framework._workflows._agent_executor import AgentExecutorResponse + +# Import real workflow event classes - NOT mocks! +from agent_framework._workflows._events import ( + ExecutorCompletedEvent, + ExecutorFailedEvent, + ExecutorInvokedEvent, + WorkflowErrorDetails, +) + +from agent_framework_devui._discovery import EntityDiscovery +from agent_framework_devui._executor import AgentFrameworkExecutor +from agent_framework_devui._mapper import MessageMapper +from agent_framework_devui.models._openai_custom import AgentFrameworkRequest + +# ============================================================================= +# Mock Chat Clients (from core tests pattern) +# ============================================================================= + + +class MockChatClient: + """Simple mock chat client that doesn't require API keys. + + Configure responses by setting `responses` or `streaming_responses` lists. + """ + + def __init__(self) -> None: + self.additional_properties: dict[str, Any] = {} + self.call_count: int = 0 + self.responses: list[ChatResponse] = [] + self.streaming_responses: list[list[ChatResponseUpdate]] = [] + + async def get_response( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage], + **kwargs: Any, + ) -> ChatResponse: + self.call_count += 1 + if self.responses: + return self.responses.pop(0) + return ChatResponse(messages=ChatMessage(role="assistant", text="test response")) + + async def get_streaming_response( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage], + **kwargs: Any, + ) -> AsyncIterable[ChatResponseUpdate]: + self.call_count += 1 + if self.streaming_responses: + for update in self.streaming_responses.pop(0): + yield update + else: + yield ChatResponseUpdate(text=TextContent(text="test streaming response"), role="assistant") + + +@use_chat_middleware +class MockBaseChatClient(BaseChatClient): + """Full BaseChatClient mock with middleware support. + + Use this when testing features that require the full BaseChatClient interface. + This goes through all the middleware, message normalization, etc. - only the + actual LLM call is mocked. + """ + + def __init__(self, **kwargs: Any): + super().__init__(**kwargs) + self.run_responses: list[ChatResponse] = [] + self.streaming_responses: list[list[ChatResponseUpdate]] = [] + self.call_count: int = 0 + self.received_messages: list[list[ChatMessage]] = [] + + async def _inner_get_response( + self, + *, + messages: MutableSequence[ChatMessage], + chat_options: ChatOptions, + **kwargs: Any, + ) -> ChatResponse: + self.call_count += 1 + self.received_messages.append(list(messages)) + if self.run_responses: + return self.run_responses.pop(0) + return ChatResponse(messages=ChatMessage(role="assistant", text="Mock response from ChatAgent")) + + async def _inner_get_streaming_response( + self, + *, + messages: MutableSequence[ChatMessage], + chat_options: ChatOptions, + **kwargs: Any, + ) -> AsyncIterable[ChatResponseUpdate]: + self.call_count += 1 + self.received_messages.append(list(messages)) + if self.streaming_responses: + for update in self.streaming_responses.pop(0): + yield update + else: + # Simulate realistic streaming chunks + yield ChatResponseUpdate(text=TextContent(text="Mock "), role="assistant") + yield ChatResponseUpdate(text=TextContent(text="streaming "), role="assistant") + yield ChatResponseUpdate(text=TextContent(text="response "), role="assistant") + yield ChatResponseUpdate(text=TextContent(text="from ChatAgent"), role="assistant") + + +# ============================================================================= +# Mock Agents (for workflow testing without API keys) +# ============================================================================= + + +class MockAgent(BaseAgent): + """Mock agent that returns configurable responses without needing a chat client.""" + + def __init__( + self, + response_text: str = "Mock agent response", + streaming_chunks: list[str] | None = None, + **kwargs: Any, + ): + super().__init__(**kwargs) + self.response_text = response_text + self.streaming_chunks = streaming_chunks or [response_text] + self.call_count = 0 + + async def run( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AgentRunResponse: + self.call_count += 1 + return AgentRunResponse( + messages=[ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=self.response_text)])] + ) + + async def run_stream( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AsyncIterable[AgentRunResponseUpdate]: + self.call_count += 1 + for chunk in self.streaming_chunks: + yield AgentRunResponseUpdate(contents=[TextContent(text=chunk)], role=Role.ASSISTANT) + + +class MockToolCallingAgent(BaseAgent): + """Mock agent that simulates tool calls and results in streaming mode.""" + + def __init__(self, **kwargs: Any): + super().__init__(**kwargs) + self.call_count = 0 + + async def run( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AgentRunResponse: + self.call_count += 1 + return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, text="done")]) + + async def run_stream( + self, + messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AsyncIterable[AgentRunResponseUpdate]: + self.call_count += 1 + # First: text + yield AgentRunResponseUpdate( + contents=[TextContent(text="Let me search for that...")], + role=Role.ASSISTANT, + ) + # Second: tool call + yield AgentRunResponseUpdate( + contents=[ + FunctionCallContent( + call_id="call_123", + name="search", + arguments={"query": "weather"}, + ) + ], + role=Role.ASSISTANT, + ) + # Third: tool result + yield AgentRunResponseUpdate( + contents=[ + FunctionResultContent( + call_id="call_123", + result={"temperature": 72, "condition": "sunny"}, + ) + ], + role=Role.TOOL, + ) + # Fourth: final text + yield AgentRunResponseUpdate( + contents=[TextContent(text="The weather is sunny, 72°F.")], + role=Role.ASSISTANT, + ) + + +# ============================================================================= +# Factory Functions for Test Data +# ============================================================================= + + +def create_mapper() -> MessageMapper: + """Create a fresh MessageMapper.""" + return MessageMapper() + + +def create_test_request( + entity_id: str = "test_agent", + input_text: str = "Test input", + stream: bool = True, +) -> AgentFrameworkRequest: + """Create a standard test request.""" + return AgentFrameworkRequest( + metadata={"entity_id": entity_id}, + input=input_text, + stream=stream, + ) + + +def create_mock_chat_client() -> MockChatClient: + """Create a mock chat client.""" + return MockChatClient() + + +def create_mock_base_chat_client() -> MockBaseChatClient: + """Create a mock BaseChatClient.""" + return MockBaseChatClient() + + +def create_mock_agent( + id: str = "test_agent", + name: str = "TestAgent", + response_text: str = "Mock agent response", +) -> MockAgent: + """Create a mock agent.""" + return MockAgent(id=id, name=name, response_text=response_text) + + +def create_mock_tool_agent(id: str = "tool_agent", name: str = "ToolAgent") -> MockToolCallingAgent: + """Create a mock agent that simulates tool calls.""" + return MockToolCallingAgent(id=id, name=name) + + +def create_agent_run_response(text: str = "Test response") -> AgentRunResponse: + """Create an AgentRunResponse with the given text.""" + return AgentRunResponse(messages=[ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=text)])]) + + +def create_agent_executor_response( + executor_id: str = "test_executor", + response_text: str = "Executor response", +) -> AgentExecutorResponse: + """Create an AgentExecutorResponse - the type that's nested in ExecutorCompletedEvent.data.""" + agent_response = create_agent_run_response(response_text) + return AgentExecutorResponse( + executor_id=executor_id, + agent_run_response=agent_response, + full_conversation=[ + ChatMessage(role=Role.USER, contents=[TextContent(text="User input")]), + ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)]), + ], + ) + + +def create_executor_completed_event( + executor_id: str = "test_executor", + with_agent_response: bool = True, +) -> ExecutorCompletedEvent: + """Create an ExecutorCompletedEvent with realistic nested data. + + This creates the exact data structure that caused the serialization bug: + ExecutorCompletedEvent.data contains AgentExecutorResponse which contains + AgentRunResponse and ChatMessage objects (SerializationMixin, not Pydantic). + """ + data = create_agent_executor_response(executor_id) if with_agent_response else {"simple": "dict"} + return ExecutorCompletedEvent(executor_id=executor_id, data=data) + + +def create_executor_invoked_event(executor_id: str = "test_executor") -> ExecutorInvokedEvent: + """Create an ExecutorInvokedEvent.""" + return ExecutorInvokedEvent(executor_id=executor_id) + + +def create_executor_failed_event( + executor_id: str = "test_executor", + error_message: str = "Test error", +) -> ExecutorFailedEvent: + """Create an ExecutorFailedEvent.""" + details = WorkflowErrorDetails(error_type="TestError", message=error_message) + return ExecutorFailedEvent(executor_id=executor_id, details=details) + + +# ============================================================================= +# Workflow Setup Helpers (async factory functions) +# ============================================================================= + + +async def create_executor_with_real_agent() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient]: + """Create an executor with a REAL ChatAgent using mock chat client. + + This tests the full execution pipeline: + - Real ChatAgent class + - Real message handling and normalization + - Real middleware pipeline + - Only the LLM call is mocked + + Returns tuple of (executor, entity_id, mock_client) so tests can access all components. + """ + mock_client = MockBaseChatClient() + discovery = EntityDiscovery(None) + mapper = MessageMapper() + executor = AgentFrameworkExecutor(discovery, mapper) + + # Create a REAL ChatAgent with mock client + agent = ChatAgent( + id="test_chat_agent", + name="Test Chat Agent", + description="A real ChatAgent for testing execution flow", + chat_client=mock_client, + system_message="You are a helpful test assistant.", + ) + + # Register the real agent + entity_info = await discovery.create_entity_info_from_object(agent, source="test") + discovery.register_entity(entity_info.id, entity_info, agent) + + return executor, entity_info.id, mock_client + + +async def create_sequential_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]: + """Create a realistic sequential workflow (Writer -> Reviewer). + + This provides a reusable multi-agent workflow that: + - Chains 2 ChatAgents sequentially + - Writer generates content, Reviewer provides feedback + - Pre-configures mock responses for both agents + + Returns tuple of (executor, entity_id, mock_client, workflow) for test access. + """ + mock_client = MockBaseChatClient() + mock_client.run_responses = [ + ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Here's the draft content about the topic.")), + ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Review: Content is clear and well-structured.")), + ] + + writer = ChatAgent( + id="writer", + name="Writer", + description="Content writer agent", + chat_client=mock_client, + system_message="You are a content writer. Create clear, engaging content.", + ) + reviewer = ChatAgent( + id="reviewer", + name="Reviewer", + description="Content reviewer agent", + chat_client=mock_client, + system_message="You are a reviewer. Provide constructive feedback.", + ) + + workflow = SequentialBuilder().participants([writer, reviewer]).build() + + discovery = EntityDiscovery(None) + mapper = MessageMapper() + executor = AgentFrameworkExecutor(discovery, mapper) + + entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test") + discovery.register_entity(entity_info.id, entity_info, workflow) + + return executor, entity_info.id, mock_client, workflow + + +async def create_concurrent_workflow() -> tuple[AgentFrameworkExecutor, str, MockBaseChatClient, Any]: + """Create a realistic concurrent workflow (Researcher | Analyst | Summarizer). + + This provides a reusable fan-out/fan-in workflow that: + - Runs 3 ChatAgents in parallel + - Each agent processes the same input independently + - Pre-configures mock responses for all agents + + Returns tuple of (executor, entity_id, mock_client, workflow) for test access. + """ + mock_client = MockBaseChatClient() + mock_client.run_responses = [ + ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Research findings: Key data points identified.")), + ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Analysis: Trends indicate positive growth.")), + ChatResponse(messages=ChatMessage(role=Role.ASSISTANT, text="Summary: Overall outlook is favorable.")), + ] + + researcher = ChatAgent( + id="researcher", + name="Researcher", + description="Research agent", + chat_client=mock_client, + system_message="You are a researcher. Find key data and insights.", + ) + analyst = ChatAgent( + id="analyst", + name="Analyst", + description="Analysis agent", + chat_client=mock_client, + system_message="You are an analyst. Identify trends and patterns.", + ) + summarizer = ChatAgent( + id="summarizer", + name="Summarizer", + description="Summary agent", + chat_client=mock_client, + system_message="You are a summarizer. Provide concise summaries.", + ) + + workflow = ConcurrentBuilder().participants([researcher, analyst, summarizer]).build() + + discovery = EntityDiscovery(None) + mapper = MessageMapper() + executor = AgentFrameworkExecutor(discovery, mapper) + + entity_info = await discovery.create_entity_info_from_object(workflow, entity_type="workflow", source="test") + discovery.register_entity(entity_info.id, entity_info, workflow) + + return executor, entity_info.id, mock_client, workflow diff --git a/python/packages/devui/tests/test_mapper.py b/python/packages/devui/tests/test_mapper.py index 7c677495f0..eaaec77313 100644 --- a/python/packages/devui/tests/test_mapper.py +++ b/python/packages/devui/tests/test_mapper.py @@ -1,18 +1,17 @@ # Copyright (c) Microsoft. All rights reserved. -"""Clean focused tests for message mapping functionality.""" +"""Tests for message mapping functionality. + +This module tests the MessageMapper which converts Agent Framework events +to OpenAI-compatible streaming events. Tests use REAL classes from +agent_framework, not mocks, to ensure proper serialization. +""" -import asyncio -import sys -from pathlib import Path from typing import Any import pytest -# Add the main agent_framework package for real types -sys.path.insert(0, str(Path(__file__).parent.parent.parent / "main")) - -# Import Agent Framework types (assuming they are always available) +# Import Agent Framework types from agent_framework._types import ( AgentRunResponseUpdate, ErrorContent, @@ -22,8 +21,51 @@ TextContent, ) +# Import real workflow event classes - NOT mocks! +from agent_framework._workflows._events import ( + ExecutorCompletedEvent, + WorkflowStartedEvent, + WorkflowStatusEvent, +) + +# Import test utilities +from test_helpers import ( + create_agent_run_response, + create_executor_completed_event, + create_executor_failed_event, + create_executor_invoked_event, + create_mapper, + create_test_request, +) + from agent_framework_devui._mapper import MessageMapper -from agent_framework_devui.models._openai_custom import AgentFrameworkRequest +from agent_framework_devui.models._openai_custom import ( + AgentCompletedEvent, + AgentFailedEvent, + AgentFrameworkRequest, + AgentStartedEvent, +) + +# ============================================================================= +# Local Fixtures (to replace conftest.py fixtures) +# ============================================================================= + + +@pytest.fixture +def mapper() -> MessageMapper: + """Create a fresh MessageMapper for each test.""" + return create_mapper() + + +@pytest.fixture +def test_request() -> AgentFrameworkRequest: + """Create a standard test request.""" + return create_test_request() + + +# ============================================================================= +# Test Helpers +# ============================================================================= def create_test_content(content_type: str, **kwargs: Any) -> Any: @@ -41,37 +83,26 @@ def create_test_content(content_type: str, **kwargs: Any) -> Any: raise ValueError(f"Unknown content type: {content_type}") -def create_test_agent_update(contents: list[Any]) -> Any: - """Create test AgentRunResponseUpdate - NO fake attributes!""" +def create_test_agent_update(contents: list[Any]) -> AgentRunResponseUpdate: + """Create test AgentRunResponseUpdate.""" return AgentRunResponseUpdate( contents=contents, role=Role.ASSISTANT, message_id="test_msg", response_id="test_resp" ) -@pytest.fixture -def mapper() -> MessageMapper: - return MessageMapper() - - -@pytest.fixture -def test_request() -> AgentFrameworkRequest: - # Use metadata.entity_id for routing - return AgentFrameworkRequest( - metadata={"entity_id": "test_agent"}, - input="Test input", - stream=True, - ) +# ============================================================================= +# Basic Content Mapping Tests +# ============================================================================= async def test_critical_isinstance_bug_detection(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: """CRITICAL: Test that would have caught the isinstance vs hasattr bug.""" - content = create_test_content("text", text="Bug detection test") update = create_test_agent_update([content]) # Key assertions that would have caught the bug - assert hasattr(update, "contents") # Real attribute ✅ - assert not hasattr(update, "response") # Fake attribute should not exist ✅ + assert hasattr(update, "contents") # Real attribute + assert not hasattr(update, "response") # Fake attribute should not exist # Test isinstance works with real types assert isinstance(update, AgentRunResponseUpdate) @@ -81,7 +112,6 @@ async def test_critical_isinstance_bug_detection(mapper: MessageMapper, test_req assert len(events) > 0 assert all(hasattr(event, "type") for event in events) - # Should never get unknown events with proper types assert all(event.type != "unknown" for event in events) @@ -136,13 +166,12 @@ async def test_function_result_content_with_string_result( """Test FunctionResultContent with plain string result (regular tools).""" content = FunctionResultContent( call_id="test_call_123", - result="Hello, World!", # Plain string like regular Python function tools + result="Hello, World!", ) update = create_test_agent_update([content]) events = await mapper.convert_event(update, test_request) - # Should produce response.function_result.complete event assert len(events) >= 1 result_events = [e for e in events if e.type == "response.function_result.complete"] assert len(result_events) == 1 @@ -154,59 +183,22 @@ async def test_function_result_content_with_string_result( async def test_function_result_content_with_nested_content_objects( mapper: MessageMapper, test_request: AgentFrameworkRequest ) -> None: - """Test FunctionResultContent with nested Content objects (MCP tools case). - - This tests the issue from GitHub #1476 where MCP tools return FunctionResultContent - with nested TextContent objects that fail to serialize properly. - """ - # This is what MCP tools return - result contains nested Content objects + """Test FunctionResultContent with nested Content objects (MCP tools case).""" content = FunctionResultContent( call_id="mcp_call_456", - result=[TextContent(text="Hello from MCP!")], # List containing TextContent object + result=[TextContent(text="Hello from MCP!")], ) update = create_test_agent_update([content]) events = await mapper.convert_event(update, test_request) - # Should successfully serialize the nested Content object assert len(events) >= 1 result_events = [e for e in events if e.type == "response.function_result.complete"] assert len(result_events) == 1 - - # The output should contain the text from the nested TextContent - # Should not have TypeError or empty output - assert result_events[0].output != "" assert "Hello from MCP!" in result_events[0].output assert result_events[0].call_id == "mcp_call_456" -async def test_function_result_content_with_multiple_nested_content_objects( - mapper: MessageMapper, test_request: AgentFrameworkRequest -) -> None: - """Test FunctionResultContent with multiple nested Content objects.""" - # MCP tools can return multiple Content objects - content = FunctionResultContent( - call_id="mcp_call_789", - result=[ - TextContent(text="First result"), - TextContent(text="Second result"), - ], - ) - update = create_test_agent_update([content]) - - events = await mapper.convert_event(update, test_request) - - assert len(events) >= 1 - result_events = [e for e in events if e.type == "response.function_result.complete"] - assert len(result_events) == 1 - - # Should serialize all nested Content objects - output = result_events[0].output - assert output != "" - assert "First result" in output - assert "Second result" in output - - async def test_error_content_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: """Test ErrorContent mapping.""" content = create_test_content("error", message="Test error", code="test_code") @@ -232,75 +224,32 @@ async def test_mixed_content_types(mapper: MessageMapper, test_request: AgentFra events = await mapper.convert_event(update, test_request) assert len(events) >= 3 - - # Should have both types of events event_types = {event.type for event in events} assert "response.output_text.delta" in event_types assert "response.function_call_arguments.delta" in event_types -async def test_unknown_content_fallback(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: - """Test graceful handling of unknown content types.""" - # Test the fallback path directly since we can't create invalid AgentRunResponseUpdate - # due to Pydantic validation. Instead, test the content mapper's unknown content handling. - - class MockUnknownContent: - def __init__(self): - self.__class__.__name__ = "WeirdUnknownContent" # Not in content_mappers - - # Test the content mapper directly - context = mapper._get_or_create_context(test_request) - unknown_content = MockUnknownContent() - - # This should trigger the unknown content fallback in _convert_agent_update - event = await mapper._create_unknown_content_event(unknown_content, context) - - assert event.type == "response.output_text.delta" - assert "Unknown content type" in event.delta - assert "WeirdUnknownContent" in event.delta - - -async def test_agent_run_response_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: - """Test that mapper handles complete AgentRunResponse (non-streaming).""" - from agent_framework import AgentRunResponse, ChatMessage, Role, TextContent - - # Create a complete response like agent.run() would return - message = ChatMessage( - role=Role.ASSISTANT, - contents=[TextContent(text="Complete response from run()")], - ) - response = AgentRunResponse(messages=[message], response_id="test_resp_123") - - # Mapper should convert it to streaming events - events = await mapper.convert_event(response, test_request) - - assert len(events) > 0 - # Should produce text delta events - text_events = [e for e in events if e.type == "response.output_text.delta"] - assert len(text_events) > 0 - assert text_events[0].delta == "Complete response from run()" +# ============================================================================= +# Agent Lifecycle Event Tests +# ============================================================================= async def test_agent_lifecycle_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: """Test that agent lifecycle events are properly converted to OpenAI format.""" - from agent_framework_devui.models._openai_custom import AgentCompletedEvent, AgentFailedEvent, AgentStartedEvent - # Test AgentStartedEvent start_event = AgentStartedEvent() events = await mapper.convert_event(start_event, test_request) - assert len(events) == 2 # Should emit response.created and response.in_progress + assert len(events) == 2 # response.created and response.in_progress assert events[0].type == "response.created" assert events[1].type == "response.in_progress" - assert events[0].response.model == "devui" # Should use 'devui' when model not specified in request + assert events[0].response.model == "devui" assert events[0].response.status == "in_progress" # Test AgentCompletedEvent complete_event = AgentCompletedEvent() events = await mapper.convert_event(complete_event, test_request) - # AgentCompletedEvent no longer emits response.completed to avoid duplicates - # The server will emit the final response.completed with usage data assert len(events) == 0 # Test AgentFailedEvent @@ -312,154 +261,218 @@ async def test_agent_lifecycle_events(mapper: MessageMapper, test_request: Agent assert events[0].type == "response.failed" assert events[0].response.status == "failed" assert events[0].response.error.message == "Test error" - assert events[0].response.error.code == "server_error" -@pytest.mark.skip(reason="Workflow events need real classes from agent_framework.workflows") -async def test_workflow_lifecycle_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: - """Test that workflow lifecycle events are properly converted to OpenAI format.""" +async def test_agent_run_response_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test that mapper handles complete AgentRunResponse (non-streaming).""" + response = create_agent_run_response("Complete response from run()") - # Create mock workflow events (since we don't have access to the real ones in tests) - class WorkflowStartedEvent: # noqa: B903 - def __init__(self, workflow_id: str): - self.workflow_id = workflow_id + events = await mapper.convert_event(response, test_request) - class WorkflowCompletedEvent: # noqa: B903 - def __init__(self, workflow_id: str): - self.workflow_id = workflow_id + assert len(events) > 0 + text_events = [e for e in events if e.type == "response.output_text.delta"] + assert len(text_events) > 0 + assert text_events[0].delta == "Complete response from run()" - class WorkflowFailedEvent: # noqa: B903 - def __init__(self, workflow_id: str, error_info: dict | None = None): - self.workflow_id = workflow_id - self.error_info = error_info - # Test WorkflowStartedEvent - start_event = WorkflowStartedEvent(workflow_id="test_workflow_123") - events = await mapper.convert_event(start_event, test_request) +# ============================================================================= +# Workflow Executor Event Tests (using REAL classes, not mocks!) +# ============================================================================= - assert len(events) == 2 # Should emit response.created and response.in_progress - assert events[0].type == "response.created" - assert events[1].type == "response.in_progress" - assert events[0].response.model == "test_agent" # Should use model from request - assert events[0].response.status == "in_progress" - # Test WorkflowCompletedEvent - complete_event = WorkflowCompletedEvent(workflow_id="test_workflow_123") - events = await mapper.convert_event(complete_event, test_request) +async def test_executor_invoked_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test ExecutorInvokedEvent using the REAL class from agent_framework.""" + # Use real class, not mock! + event = create_executor_invoked_event(executor_id="exec_123") - # WorkflowCompletedEvent no longer emits response.completed to avoid duplicates - # The server will emit the final response.completed with usage data - assert len(events) == 0 + events = await mapper.convert_event(event, test_request) - # Test WorkflowFailedEvent with error info - failed_event = WorkflowFailedEvent(workflow_id="test_workflow_123", error_info={"message": "Workflow failed"}) - events = await mapper.convert_event(failed_event, test_request) + assert len(events) == 1 + assert events[0].type == "response.output_item.added" + # Access as dict since item might be ExecutorActionItem + item = events[0].item if isinstance(events[0].item, dict) else events[0].item.model_dump() + assert item["type"] == "executor_action" + assert item["executor_id"] == "exec_123" + assert item["status"] == "in_progress" + + +async def test_executor_completed_event_simple_data(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test ExecutorCompletedEvent with simple dict data.""" + # Create event with simple data + event = ExecutorCompletedEvent(executor_id="exec_123", data={"simple": "result"}) + + # First need to invoke the executor to set up context + invoke_event = create_executor_invoked_event(executor_id="exec_123") + await mapper.convert_event(invoke_event, test_request) + + # Now complete it + events = await mapper.convert_event(event, test_request) assert len(events) == 1 - assert events[0].type == "response.failed" - assert events[0].response.status == "failed" - assert events[0].response.error.message == "{'message': 'Workflow failed'}" - assert events[0].response.error.code == "server_error" + assert events[0].type == "response.output_item.done" + item = events[0].item if isinstance(events[0].item, dict) else events[0].item.model_dump() + assert item["type"] == "executor_action" + assert item["executor_id"] == "exec_123" + assert item["status"] == "completed" + # Result should be serialized + assert item["result"] == {"simple": "result"} -@pytest.mark.skip(reason="Executor events need real classes from agent_framework.workflows") -async def test_executor_action_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: - """Test that workflow executor events are properly converted to custom output item events.""" +async def test_executor_completed_event_with_agent_response( + mapper: MessageMapper, test_request: AgentFrameworkRequest +) -> None: + """Test ExecutorCompletedEvent with nested AgentExecutorResponse. - # Create mock executor events (since we don't have access to the real ones in tests) - class ExecutorInvokedEvent: # noqa: B903 - def __init__(self, executor_id: str, executor_type: str = "test"): - self.executor_id = executor_id - self.executor_type = executor_type + This is a REGRESSION TEST for the serialization bug where + ExecutorCompletedEvent.data contained AgentExecutorResponse with nested + AgentRunResponse and ChatMessage objects (SerializationMixin) that + Pydantic couldn't serialize. + """ + # Create event with realistic nested data - the exact structure that caused the bug + event = create_executor_completed_event(executor_id="exec_agent", with_agent_response=True) - class ExecutorCompletedEvent: # noqa: B903 - def __init__(self, executor_id: str, result: Any = None): - self.executor_id = executor_id - self.result = result + # Verify the data has the problematic structure + assert hasattr(event.data, "agent_run_response") + assert hasattr(event.data, "full_conversation") - class ExecutorFailedEvent: # noqa: B903 - def __init__(self, executor_id: str, error: Exception | None = None): - self.executor_id = executor_id - self.error = error + # First invoke the executor + invoke_event = create_executor_invoked_event(executor_id="exec_agent") + await mapper.convert_event(invoke_event, test_request) - # Test ExecutorInvokedEvent - invoked_event = ExecutorInvokedEvent(executor_id="exec_123", executor_type="test_executor") - events = await mapper.convert_event(invoked_event, test_request) + # Now complete - this should NOT raise serialization errors + events = await mapper.convert_event(event, test_request) assert len(events) == 1 - assert events[0].type == "response.output_item.added" - assert events[0].item["type"] == "executor_action" - assert events[0].item["executor_id"] == "exec_123" - assert events[0].item["status"] == "in_progress" + assert events[0].type == "response.output_item.done" - # Test ExecutorCompletedEvent - complete_event = ExecutorCompletedEvent(executor_id="exec_123", result={"data": "success"}) - events = await mapper.convert_event(complete_event, test_request) + # Get the item (might be Pydantic model or dict) + item = events[0].item if isinstance(events[0].item, dict) else events[0].item.model_dump() + assert item["type"] == "executor_action" + assert item["executor_id"] == "exec_agent" + assert item["status"] == "completed" + + # The result should be serialized (converted to dict) + result = item["result"] + assert result is not None + # Should be a dict or list, not the original object + assert isinstance(result, (dict, list)) + + +async def test_executor_completed_event_serialization_to_json( + mapper: MessageMapper, test_request: AgentFrameworkRequest +) -> None: + """REGRESSION TEST: Verify the full JSON serialization works. + + This tests the exact failure mode from the bug: calling model_dump_json() + on the event containing nested SerializationMixin objects. + """ + # Create the problematic event + event = create_executor_completed_event(executor_id="exec_json_test", with_agent_response=True) + + # Invoke first + invoke_event = create_executor_invoked_event(executor_id="exec_json_test") + await mapper.convert_event(invoke_event, test_request) + + # Complete + events = await mapper.convert_event(event, test_request) assert len(events) == 1 - assert events[0].type == "response.output_item.done" - assert events[0].item["type"] == "executor_action" - assert events[0].item["executor_id"] == "exec_123" - assert events[0].item["status"] == "completed" - assert events[0].item["result"] == {"data": "success"} + done_event = events[0] - # Test ExecutorFailedEvent - failed_event = ExecutorFailedEvent(executor_id="exec_123", error=Exception("Executor failed")) - events = await mapper.convert_event(failed_event, test_request) + # This is the critical test - model_dump_json() should NOT raise + # "Unable to serialize unknown type: " + try: + json_str = done_event.model_dump_json() + assert json_str is not None + assert len(json_str) > 0 + # Verify it's valid JSON by checking it contains expected fields + assert "executor_action" in json_str + assert "exec_json_test" in json_str + assert "completed" in json_str + except Exception as e: + pytest.fail(f"model_dump_json() raised an exception: {e}") + + +async def test_executor_failed_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test ExecutorFailedEvent using the REAL class.""" + # First invoke the executor + invoke_event = create_executor_invoked_event(executor_id="exec_fail") + await mapper.convert_event(invoke_event, test_request) + + # Now fail it + event = create_executor_failed_event(executor_id="exec_fail", error_message="Executor failed") + events = await mapper.convert_event(event, test_request) assert len(events) == 1 assert events[0].type == "response.output_item.done" - assert events[0].item["type"] == "executor_action" - assert events[0].item["executor_id"] == "exec_123" - assert events[0].item["status"] == "failed" - assert "Executor failed" in str(events[0].item["error"]["message"]) + item = events[0].item if isinstance(events[0].item, dict) else events[0].item.model_dump() + assert item["type"] == "executor_action" + assert item["executor_id"] == "exec_fail" + assert item["status"] == "failed" + assert "Executor failed" in str(item["error"]) + + +# ============================================================================= +# Workflow Lifecycle Event Tests +# ============================================================================= + + +async def test_workflow_started_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test WorkflowStartedEvent using the REAL class.""" + + event = WorkflowStartedEvent(data=None) + events = await mapper.convert_event(event, test_request) + + # WorkflowStartedEvent should emit response.created and response.in_progress + assert len(events) == 2 + assert events[0].type == "response.created" + assert events[1].type == "response.in_progress" + + +async def test_workflow_status_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test WorkflowStatusEvent using the REAL class.""" + from agent_framework._workflows._events import WorkflowRunState + + event = WorkflowStatusEvent(state=WorkflowRunState.IN_PROGRESS) + events = await mapper.convert_event(event, test_request) + + # Should emit some status-related event + assert len(events) >= 0 # May emit events or may be filtered + + +# ============================================================================= +# MagenticAgentDeltaEvent Tests +# ============================================================================= async def test_magentic_agent_delta_creates_message_container( mapper: MessageMapper, test_request: AgentFrameworkRequest ) -> None: - """Test that MagenticAgentDeltaEvent creates message containers when no executor context (fallback).""" - - # Create mock MagenticAgentDeltaEvent that mimics the real class + """Test that MagenticAgentDeltaEvent creates message containers when no executor context.""" from dataclasses import dataclass - try: - from agent_framework import WorkflowEvent - - @dataclass - class MagenticAgentDeltaEvent(WorkflowEvent): # Inherit from WorkflowEvent - agent_id: str - text: str | None = None + from agent_framework import WorkflowEvent - except ImportError: - # Fallback if WorkflowEvent is not available - @dataclass - class MagenticAgentDeltaEvent: # Use the expected name directly - agent_id: str - text: str | None = None + @dataclass + class MagenticAgentDeltaEvent(WorkflowEvent): + agent_id: str + text: str | None = None - # First delta should create message container (no executor context = fallback behavior) + # First delta should create message container first_delta = MagenticAgentDeltaEvent(agent_id="test_agent", text="Hello ") events = await mapper.convert_event(first_delta, test_request) # Should emit 3 events: message container, content part, and text delta assert len(events) == 3 assert events[0].type == "response.output_item.added" - assert events[0].item.type == "message" # Message, not executor_action! + assert events[0].item.type == "message" assert events[0].item.metadata["agent_id"] == "test_agent" - assert events[0].item.metadata["source"] == "magentic" message_id = events[0].item.id - # Check text delta references the message ID - assert events[2].type == "response.output_text.delta" - assert events[2].item_id == message_id - assert events[2].delta == "Hello " - # Second delta should NOT create new container second_delta = MagenticAgentDeltaEvent(agent_id="test_agent", text="world!") events = await mapper.convert_event(second_delta, test_request) - # Only text delta, no new container assert len(events) == 1 assert events[0].type == "response.output_text.delta" assert events[0].item_id == message_id @@ -469,78 +482,223 @@ async def test_magentic_agent_delta_routes_to_executor_item( mapper: MessageMapper, test_request: AgentFrameworkRequest ) -> None: """Test that MagenticAgentDeltaEvent routes to executor item when executor context is present.""" - from dataclasses import dataclass - try: - from agent_framework import WorkflowEvent - - @dataclass - class ExecutorInvokedEvent(WorkflowEvent): - executor_id: str - - @dataclass - class MagenticAgentDeltaEvent(WorkflowEvent): - agent_id: str - text: str | None = None + from agent_framework import WorkflowEvent - except ImportError: + @dataclass + class MagenticAgentDeltaEvent(WorkflowEvent): + agent_id: str + text: str | None = None - @dataclass - class ExecutorInvokedEvent: - executor_id: str - - @dataclass - class MagenticAgentDeltaEvent: - agent_id: str - text: str | None = None - - # First, simulate executor being invoked (sets current_executor_id in context) - executor_event = ExecutorInvokedEvent(executor_id="agent_writer") + # First, invoke an executor (sets current_executor_id in context) + executor_event = create_executor_invoked_event(executor_id="agent_writer") executor_events = await mapper.convert_event(executor_event, test_request) - # Should create executor item assert len(executor_events) == 1 assert executor_events[0].type == "response.output_item.added" - assert executor_events[0].item.type == "executor_action" executor_item_id = executor_events[0].item.id - # Now send Magentic delta - should route to executor's item, NOT create new message + # Now send Magentic delta - should route to executor's item delta = MagenticAgentDeltaEvent(agent_id="writer", text="Hello world") delta_events = await mapper.convert_event(delta, test_request) # Should only emit 1 event: text delta routed to executor's item assert len(delta_events) == 1 assert delta_events[0].type == "response.output_text.delta" - assert delta_events[0].item_id == executor_item_id # Routed to executor's item! + assert delta_events[0].item_id == executor_item_id assert delta_events[0].delta == "Hello world" -if __name__ == "__main__": - # Simple test runner - async def run_all_tests() -> None: - mapper = MessageMapper() - test_request = AgentFrameworkRequest( - metadata={"entity_id": "test"}, - input="Test", - stream=True, - ) +# ============================================================================= +# Unknown Content Fallback Tests +# ============================================================================= + + +async def test_unknown_content_fallback(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test graceful handling of unknown content types.""" + + class MockUnknownContent: + def __init__(self): + self.__class__.__name__ = "WeirdUnknownContent" + + context = mapper._get_or_create_context(test_request) + unknown_content = MockUnknownContent() + + event = await mapper._create_unknown_content_event(unknown_content, context) + + assert event.type == "response.output_text.delta" + assert "Unknown content type" in event.delta + assert "WeirdUnknownContent" in event.delta + + +# ============================================================================= +# WorkflowOutputEvent Tests +# ============================================================================= + + +async def test_workflow_output_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test WorkflowOutputEvent is converted to output_item.added.""" + from agent_framework._workflows._events import WorkflowOutputEvent + + event = WorkflowOutputEvent(data="Final workflow output", source_executor_id="final_executor") + events = await mapper.convert_event(event, test_request) + + # WorkflowOutputEvent should emit output_item.added + assert len(events) == 1 + assert events[0].type == "response.output_item.added" + # Check item contains the output text + item = events[0].item + assert item.type == "message" + assert any("Final workflow output" in str(c) for c in item.content) + + +async def test_workflow_output_event_with_list_data(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test WorkflowOutputEvent with list data (common for sequential/concurrent workflows).""" + from agent_framework import ChatMessage, Role, TextContent + from agent_framework._workflows._events import WorkflowOutputEvent + + # Sequential/Concurrent workflows often output list[ChatMessage] + messages = [ + ChatMessage(role=Role.USER, contents=[TextContent(text="Hello")]), + ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="World")]), + ] + event = WorkflowOutputEvent(data=messages, source_executor_id="complete") + events = await mapper.convert_event(event, test_request) + + assert len(events) == 1 + assert events[0].type == "response.output_item.added" + + +# ============================================================================= +# WorkflowFailedEvent Tests +# ============================================================================= + + +async def test_workflow_failed_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test WorkflowFailedEvent is converted to response.failed.""" + from agent_framework._workflows._events import WorkflowErrorDetails, WorkflowFailedEvent + + details = WorkflowErrorDetails( + error_type="TestError", + message="Workflow failed due to test error", + executor_id="failing_executor", + ) + event = WorkflowFailedEvent(details=details) + events = await mapper.convert_event(event, test_request) + + # WorkflowFailedEvent should emit response.failed + assert len(events) >= 1 + # Find the failed event + failed_events = [e for e in events if getattr(e, "type", "") == "response.failed"] + assert len(failed_events) == 1, f"Expected response.failed, got types: {[getattr(e, 'type', '') for e in events]}" + # Check response contains error info + response = failed_events[0].response + assert response.status == "failed" + assert response.error is not None + + +async def test_workflow_failed_event_with_traceback(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test WorkflowFailedEvent includes traceback when available.""" + from agent_framework._workflows._events import WorkflowErrorDetails, WorkflowFailedEvent + + details = WorkflowErrorDetails( + error_type="ValueError", + message="Invalid input provided", + traceback="Traceback (most recent call last):\n File ...\nValueError: Invalid input", + executor_id="validation_executor", + ) + event = WorkflowFailedEvent(details=details) + events = await mapper.convert_event(event, test_request) + + assert len(events) == 1 + assert events[0].type == "response.failed" + + +# ============================================================================= +# WorkflowWarningEvent and WorkflowErrorEvent Tests +# ============================================================================= + + +async def test_workflow_warning_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test WorkflowWarningEvent is converted to trace event.""" + from agent_framework._workflows._events import WorkflowWarningEvent + + event = WorkflowWarningEvent(data="This is a warning message") + events = await mapper.convert_event(event, test_request) + + # WorkflowWarningEvent should emit a trace event + assert len(events) == 1 + assert events[0].type == "response.trace.completed" + assert events[0].data["event_type"] == "WorkflowWarningEvent" + + +async def test_workflow_error_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test WorkflowErrorEvent is converted to trace event.""" + from agent_framework._workflows._events import WorkflowErrorEvent + + event = WorkflowErrorEvent(data=ValueError("Something went wrong")) + events = await mapper.convert_event(event, test_request) + + # WorkflowErrorEvent should emit a trace event + assert len(events) == 1 + assert events[0].type == "response.trace.completed" + assert events[0].data["event_type"] == "WorkflowErrorEvent" + + +# ============================================================================= +# RequestInfoEvent Tests (Human-in-the-Loop) +# ============================================================================= + + +async def test_request_info_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test RequestInfoEvent is converted to HIL request event.""" + from agent_framework._workflows._events import RequestInfoEvent + + event = RequestInfoEvent( + request_id="req_123", + source_executor_id="approval_executor", + request_data={"action": "approve", "details": "Please approve this action"}, + response_type=str, + ) + events = await mapper.convert_event(event, test_request) + + # RequestInfoEvent should emit response.request_info.requested + assert len(events) >= 1 + # Check that request info is captured + has_hil_event = any(getattr(e, "type", "") == "response.request_info.requested" for e in events) + assert has_hil_event, f"Expected response.request_info.requested, got: {[getattr(e, 'type', '') for e in events]}" + + # Verify the event contains the expected data + hil_event = [e for e in events if getattr(e, "type", "") == "response.request_info.requested"][0] + assert hil_event.request_id == "req_123" + assert hil_event.source_executor_id == "approval_executor" + + +# ============================================================================= +# SuperStep Event Tests +# ============================================================================= + + +async def test_superstep_started_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test SuperStepStartedEvent is handled gracefully.""" + from agent_framework._workflows._events import SuperStepStartedEvent + + event = SuperStepStartedEvent(iteration=1) + events = await mapper.convert_event(event, test_request) + + # SuperStepStartedEvent may not emit events (internal workflow signal) + # Just ensure it doesn't crash + assert isinstance(events, list) + + +async def test_superstep_completed_event(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None: + """Test SuperStepCompletedEvent is handled gracefully.""" + from agent_framework._workflows._events import SuperStepCompletedEvent + + event = SuperStepCompletedEvent(iteration=1) + events = await mapper.convert_event(event, test_request) - tests = [ - ("Critical isinstance bug detection", test_critical_isinstance_bug_detection), - ("Text content mapping", test_text_content_mapping), - ("Function call mapping", test_function_call_mapping), - ("Error content mapping", test_error_content_mapping), - ("Mixed content types", test_mixed_content_types), - ("Unknown content fallback", test_unknown_content_fallback), - ] - - passed = 0 - for _test_name, test_func in tests: - try: - await test_func(mapper, test_request) - passed += 1 - except Exception: - pass - - asyncio.run(run_all_tests()) + # SuperStepCompletedEvent may not emit events (internal workflow signal) + # Just ensure it doesn't crash + assert isinstance(events, list) diff --git a/python/packages/devui/tests/test_server.py b/python/packages/devui/tests/test_server.py index 8a3dbd3579..ac835bdfb5 100644 --- a/python/packages/devui/tests/test_server.py +++ b/python/packages/devui/tests/test_server.py @@ -44,7 +44,6 @@ async def test_server_health_endpoint(test_entities_dir): # Framework name is now hardcoded since we simplified to single framework -@pytest.mark.skip("Skipping while we fix discovery") async def test_server_entities_endpoint(test_entities_dir): """Test /v1/entities endpoint.""" server = DevServer(entities_dir=test_entities_dir) @@ -52,11 +51,13 @@ async def test_server_entities_endpoint(test_entities_dir): entities = await executor.discover_entities() assert len(entities) >= 1 - # Should find at least the weather agent + # Should find at least one agent agent_entities = [e for e in entities if e.type == "agent"] - assert len(agent_entities) >= 1 - agent_names = [e.name for e in agent_entities] - assert "WeatherAgent" in agent_names + assert len(agent_entities) >= 1, "Should discover at least one agent" + # Verify agents have required properties + for agent in agent_entities: + assert agent.id, "Agent should have an ID" + assert agent.name, "Agent should have a name" async def test_server_execution_sync(test_entities_dir):