From a1ea765c76163a89758346be6a59df01add7f5be Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Tue, 13 Jan 2026 15:58:38 -0800 Subject: [PATCH 1/4] Add integration tests --- .../agent_framework_durabletask/_executors.py | 9 +- python/packages/durabletask/pyproject.toml | 7 +- .../tests/integration_tests/.env.example | 17 ++ .../tests/integration_tests/README.md | 110 +++++++++ .../tests/integration_tests/__init__.py | 2 + .../tests/integration_tests/conftest.py | 231 ++++++++++++++++++ .../integration_tests/test_01_single_agent.py | 97 ++++++++ .../integration_tests/test_02_multi_agent.py | 112 +++++++++ .../test_03_single_agent_streaming.py | 225 +++++++++++++++++ ..._04_single_agent_orchestration_chaining.py | 117 +++++++++ ...5_multi_agent_orchestration_concurrency.py | 93 +++++++ ..._multi_agent_orchestration_conditionals.py | 107 ++++++++ ...test_07_single_agent_orchestration_hitl.py | 182 ++++++++++++++ .../tests/integration_tests/testutils.py | 165 +++++++++++++ .../function_app.py | 21 +- .../03_single_agent_streaming/worker.py | 4 +- .../worker.py | 4 +- .../worker.py | 17 +- python/uv.lock | 8 +- 19 files changed, 1503 insertions(+), 25 deletions(-) create mode 100644 python/packages/durabletask/tests/integration_tests/.env.example create mode 100644 python/packages/durabletask/tests/integration_tests/README.md create mode 100644 python/packages/durabletask/tests/integration_tests/__init__.py create mode 100644 python/packages/durabletask/tests/integration_tests/conftest.py create mode 100644 python/packages/durabletask/tests/integration_tests/test_01_single_agent.py create mode 100644 python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py create mode 100644 python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py create mode 100644 python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py create mode 100644 
python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py create mode 100644 python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py create mode 100644 python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py create mode 100644 python/packages/durabletask/tests/integration_tests/testutils.py diff --git a/python/packages/durabletask/agent_framework_durabletask/_executors.py b/python/packages/durabletask/agent_framework_durabletask/_executors.py index e3e3e4cfc6..c6cadc28bf 100644 --- a/python/packages/durabletask/agent_framework_durabletask/_executors.py +++ b/python/packages/durabletask/agent_framework_durabletask/_executors.py @@ -440,9 +440,8 @@ def generate_unique_id(self) -> str: def get_run_request( self, message: str, - response_format: type[BaseModel] | None, - enable_tool_calls: bool, - wait_for_response: bool = True, + *, + options: dict[str, Any] | None = None, ) -> RunRequest: """Get the current run request from the orchestration context. @@ -451,9 +450,7 @@ def get_run_request( """ request = super().get_run_request( message, - response_format, - enable_tool_calls, - wait_for_response, + options=options, ) request.orchestration_id = self._context.instance_id return request diff --git a/python/packages/durabletask/pyproject.toml b/python/packages/durabletask/pyproject.toml index 48964a589e..3937f24d4f 100644 --- a/python/packages/durabletask/pyproject.toml +++ b/python/packages/durabletask/pyproject.toml @@ -4,7 +4,7 @@ description = "Durable Task integration for Microsoft Agent Framework." 
authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] readme = "README.md" requires-python = ">=3.10" -version = "0.0.1" +version = "0.0.1b260113" license-files = ["LICENSE"] urls.homepage = "https://aka.ms/agent-framework" urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" @@ -53,6 +53,11 @@ filterwarnings = [ timeout = 120 markers = [ "integration: marks tests as integration tests", + "integration_test: marks tests as integration tests (alternative marker)", + "sample: marks tests as sample tests", + "requires_azure_openai: marks tests that require Azure OpenAI", + "requires_dts: marks tests that require Durable Task Scheduler", + "requires_redis: marks tests that require Redis" ] [tool.ruff] diff --git a/python/packages/durabletask/tests/integration_tests/.env.example b/python/packages/durabletask/tests/integration_tests/.env.example new file mode 100644 index 0000000000..a36cf771f8 --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/.env.example @@ -0,0 +1,17 @@ +# Azure OpenAI Configuration +AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/ +AZURE_OPENAI_CHAT_DEPLOYMENT_NAME=your-deployment-name +# Optional: Use Azure CLI authentication if not provided +# AZURE_OPENAI_API_KEY=your-api-key + +# Durable Task Scheduler Configuration +ENDPOINT=http://localhost:8080 +TASKHUB=default + +# Redis Configuration (for streaming tests) +REDIS_CONNECTION_STRING=redis://localhost:6379 +REDIS_STREAM_TTL_MINUTES=10 + +# Integration Test Control +# Set to 'true' to enable integration tests +RUN_INTEGRATION_TESTS=true diff --git a/python/packages/durabletask/tests/integration_tests/README.md b/python/packages/durabletask/tests/integration_tests/README.md new file mode 100644 index 0000000000..a35a8d58d5 --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/README.md @@ -0,0 +1,110 @@ +# Sample Integration Tests + +Integration tests that validate the Durable Agent Framework samples by 
running them against a Durable Task Scheduler (DTS) instance. + +## Setup + +### 1. Create `.env` file + +Copy `.env.example` to `.env` and fill in your Azure credentials: + +```bash +cp .env.example .env +``` + +Required variables: +- `AZURE_OPENAI_ENDPOINT` +- `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` +- `AZURE_OPENAI_API_KEY` (optional if using Azure CLI authentication) +- `RUN_INTEGRATION_TESTS` (set to `true`) +- `ENDPOINT` (default: http://localhost:8080) +- `TASKHUB` (default: default) + +Optional variables (for streaming tests): +- `REDIS_CONNECTION_STRING` (default: redis://localhost:6379) +- `REDIS_STREAM_TTL_MINUTES` (default: 10) + +### 2. Start required services + +**Durable Task Scheduler:** +```bash +docker run -d --name dts-emulator -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest +``` +- Port 8080: gRPC endpoint (used by tests) +- Port 8082: Web dashboard (optional, for monitoring) + +**Redis (for streaming tests):** +```bash +docker run -d --name redis -p 6379:6379 redis:latest +``` +- Port 6379: Redis server endpoint + +## Running Tests + +The tests automatically start and stop worker processes for each sample. + +### Run all sample tests +```bash +uv run pytest packages/durabletask/tests/integration_tests -v +``` + +### Run specific sample +```bash +uv run pytest packages/durabletask/tests/integration_tests/test_01_single_agent.py -v +``` + +### Run with verbose output +```bash +uv run pytest packages/durabletask/tests/integration_tests -sv +``` + +## How It Works + +Each test file uses pytest markers to automatically configure and start the worker process: + +```python +pytestmark = [ +    pytest.mark.sample("03_single_agent_streaming"), +    pytest.mark.integration_test, +    pytest.mark.requires_azure_openai, +    pytest.mark.requires_dts, +    pytest.mark.requires_redis, +] +``` +## Troubleshooting + +**Tests are skipped:** +Ensure `RUN_INTEGRATION_TESTS=true` is set in your `.env` file. 
+ +**DTS connection failed:** +Check that the DTS emulator container is running: `docker ps | grep dts-emulator` + +**Redis connection failed:** +Check that Redis is running: `docker ps | grep redis` + +**Missing environment variables:** +Ensure your `.env` file contains all required variables from `.env.example`. + +**Tests timeout:** +Check that Azure OpenAI credentials are valid and the service is accessible. + +If you see "DTS emulator is not available": +- Ensure Docker container is running: `docker ps | grep dts-emulator` +- Check port 8080 is not in use by another process +- Restart the container if needed + +### Azure OpenAI Errors + +If you see authentication or deployment errors: +- Verify your `AZURE_OPENAI_ENDPOINT` is correct +- Confirm `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME` matches your deployment +- If using API key, check `AZURE_OPENAI_API_KEY` is valid +- If using Azure CLI, ensure you're logged in: `az login` + +## CI/CD + +For automated testing in CI/CD pipelines: + +1. Use Docker Compose to start DTS emulator +2. Set environment variables via CI/CD secrets +3. Run tests with appropriate markers: `pytest -m integration_test` diff --git a/python/packages/durabletask/tests/integration_tests/__init__.py b/python/packages/durabletask/tests/integration_tests/__init__.py new file mode 100644 index 0000000000..c2e7b70bf2 --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft. All rights reserved. +"""Integration tests for agent-framework-durabletask package.""" diff --git a/python/packages/durabletask/tests/integration_tests/conftest.py b/python/packages/durabletask/tests/integration_tests/conftest.py new file mode 100644 index 0000000000..27c4468888 --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/conftest.py @@ -0,0 +1,231 @@ +# Copyright (c) Microsoft. All rights reserved. 
+"""Pytest configuration and fixtures for durabletask integration tests.""" + +import asyncio +import logging +import os +import subprocess +import sys +import time +import uuid +from collections.abc import Generator +from pathlib import Path +from typing import Any, cast + +import pytest +import redis.asyncio as aioredis +from dotenv import load_dotenv +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +# Load environment variables from .env file +load_dotenv() + +# Configure logging to reduce noise during tests +logging.basicConfig(level=logging.WARNING) + + +def _get_dts_endpoint() -> str: + """Get the DTS endpoint from environment or use default.""" + return os.getenv("ENDPOINT", "http://localhost:8080") + + +def _check_dts_available(endpoint: str | None = None) -> bool: + """Check if DTS emulator is available at the given endpoint.""" + try: + resolved_endpoint: str = _get_dts_endpoint() if endpoint is None else endpoint + DurableTaskSchedulerClient( + host_address=resolved_endpoint, + secure_channel=False, + taskhub="test", + token_credential=None, + ) + return True + except Exception: + return False + + +def _check_redis_available() -> bool: + """Check if Redis is available at the default connection string.""" + try: + + async def test_connection() -> bool: + redis_url = os.getenv("REDIS_CONNECTION_STRING", "redis://localhost:6379") + try: + client = aioredis.from_url(redis_url, socket_timeout=2) # type: ignore[reportUnknownMemberType] + await client.ping() # type: ignore[reportUnknownMemberType] + await client.aclose() # type: ignore[reportUnknownMemberType] + return True + except Exception: + return False + + return asyncio.run(test_connection()) + except Exception: + return False + + +def pytest_configure(config: pytest.Config) -> None: + """Register custom markers.""" + config.addinivalue_line("markers", "integration_test: mark test as integration test") + config.addinivalue_line("markers", "requires_dts: mark test as requiring DTS 
emulator") + config.addinivalue_line("markers", "requires_azure_openai: mark test as requiring Azure OpenAI") + config.addinivalue_line("markers", "requires_redis: mark test as requiring Redis") + config.addinivalue_line( + "markers", + "sample(path): specify the sample directory name for the test (e.g., @pytest.mark.sample('01_single_agent'))", + ) + + +def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None: + """Skip tests based on markers and environment availability.""" + run_integration = os.getenv("RUN_INTEGRATION_TESTS", "false").lower() == "true" + skip_integration = pytest.mark.skip(reason="RUN_INTEGRATION_TESTS not set to 'true'") + + # Check Azure OpenAI environment variables + azure_openai_vars = ["AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"] + azure_openai_available = all(os.getenv(var) for var in azure_openai_vars) + skip_azure_openai = pytest.mark.skip( + reason=f"Missing required environment variables: {', '.join(azure_openai_vars)}" + ) + + # Check DTS availability + dts_available = _check_dts_available() + skip_dts = pytest.mark.skip(reason=f"DTS emulator is not available at {_get_dts_endpoint()}") + + # Check Redis availability + redis_available = _check_redis_available() + skip_redis = pytest.mark.skip(reason="Redis is not available at redis://localhost:6379") + + for item in items: + if "integration_test" in item.keywords and not run_integration: + item.add_marker(skip_integration) + if "requires_azure_openai" in item.keywords and not azure_openai_available: + item.add_marker(skip_azure_openai) + if "requires_dts" in item.keywords and not dts_available: + item.add_marker(skip_dts) + if "requires_redis" in item.keywords and not redis_available: + item.add_marker(skip_redis) + + +@pytest.fixture(scope="session") +def dts_endpoint() -> str: + """Get the DTS endpoint from environment or use default.""" + return _get_dts_endpoint() + + +@pytest.fixture(scope="session") +def 
dts_available(dts_endpoint: str) -> bool: + """Check if DTS emulator is available and responding.""" + if _check_dts_available(dts_endpoint): + return True + pytest.skip(f"DTS emulator is not available at {dts_endpoint}") + return False + + +@pytest.fixture(scope="session") +def check_azure_openai_env() -> None: + """Verify Azure OpenAI environment variables are set.""" + required_vars = ["AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"] + missing = [var for var in required_vars if not os.getenv(var)] + + if missing: + pytest.skip(f"Missing required environment variables: {', '.join(missing)}") + + +@pytest.fixture(scope="module") +def unique_taskhub() -> str: + """Generate a unique task hub name for test isolation.""" + # Use a shorter UUID to avoid naming issues + return f"test-{uuid.uuid4().hex[:8]}" + + +@pytest.fixture(scope="module") +def worker_process( + dts_available: bool, + check_azure_openai_env: None, + dts_endpoint: str, + unique_taskhub: str, + request: pytest.FixtureRequest, +) -> Generator[dict[str, Any], None, None]: + """ + Start a worker process for the current test module by running the sample worker.py. + + This fixture: + 1. Determines which sample to run from @pytest.mark.sample() + 2. Starts the sample's worker.py as a subprocess + 3. Waits for the worker to be ready + 4. Tears down the worker after tests complete + + Usage: + @pytest.mark.sample("01_single_agent") + class TestSingleAgent: + ... 
+ """ + # Get sample path from marker + sample_marker = request.node.get_closest_marker("sample") # type: ignore[union-attr] + if not sample_marker: + pytest.fail("Test class must have @pytest.mark.sample() marker") + + sample_name: str = cast(str, sample_marker.args[0]) # type: ignore[union-attr] + sample_path: Path = Path(__file__).parents[4] / "samples" / "getting_started" / "durabletask" / sample_name + worker_file: Path = sample_path / "worker.py" + + if not worker_file.exists(): + pytest.fail(f"Sample worker not found: {worker_file}") + + # Set up environment for worker subprocess + env = os.environ.copy() + env["ENDPOINT"] = dts_endpoint + env["TASKHUB"] = unique_taskhub + + # Start worker subprocess + try: + # On Windows, use CREATE_NEW_PROCESS_GROUP to allow proper termination + # shell=True only on Windows to handle PATH resolution + if sys.platform == "win32": + process = subprocess.Popen( + [sys.executable, str(worker_file)], + cwd=str(sample_path), + creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, + shell=True, + env=env, + text=True, + ) + # On Unix, don't use shell=True to avoid shell wrapper issues + else: + process = subprocess.Popen( + [sys.executable, str(worker_file)], + cwd=str(sample_path), + env=env, + text=True, + ) + except Exception as e: + pytest.fail(f"Failed to start worker subprocess: {e}") + + # Wait for worker to initialize + time.sleep(2) + + # Check if process is still running + if process.poll() is not None: + stderr_output = process.stderr.read() if process.stderr else "" + pytest.fail(f"Worker process exited prematurely. 
stderr: {stderr_output}") + + # Provide worker info to tests + worker_info = { + "process": process, + "endpoint": dts_endpoint, + "taskhub": unique_taskhub, + } + + try: + yield worker_info + finally: + # Cleanup: terminate worker subprocess + try: + process.terminate() + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + process.kill() + process.wait() + except Exception as e: + logging.warning(f"Error during worker process cleanup: {e}") diff --git a/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py b/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py new file mode 100644 index 0000000000..c1147746c7 --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py @@ -0,0 +1,97 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Integration tests for single agent functionality. + +Tests basic agent operations including: +- Agent registration and retrieval +- Single agent interactions +- Conversation continuity across multiple messages +- Multi-threaded agent usage +- Empty thread ID handling +""" + +from typing import Any + +import pytest +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +from agent_framework_durabletask import DurableAIAgentClient + +# Module-level markers - applied to all tests in this module +pytestmark = [ + pytest.mark.sample("01_single_agent"), + pytest.mark.integration_test, + pytest.mark.requires_azure_openai, + pytest.mark.requires_dts, +] + + +class TestSingleAgent: + """Test suite for single agent functionality.""" + + @pytest.fixture(autouse=True) + def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: + """Setup test fixtures.""" + self.endpoint: str = dts_endpoint + self.taskhub: str = str(worker_process["taskhub"]) + + # Create agent client + dts_client = DurableTaskSchedulerClient( + host_address=self.endpoint, + secure_channel=False, + taskhub=self.taskhub, + 
token_credential=None, + ) + self.agent_client = DurableAIAgentClient(dts_client) + + def test_agent_registration(self) -> None: + """Test that the Joker agent is registered and accessible.""" + agent = self.agent_client.get_agent("Joker") + assert agent is not None + assert agent.name == "Joker" + + def test_single_interaction(self): + """Test a single interaction with the agent.""" + agent = self.agent_client.get_agent("Joker") + thread = agent.get_new_thread() + + response = agent.run("Tell me a short joke about programming.", thread=thread) + + assert response is not None + assert response.text is not None + assert len(response.text) > 0 + + def test_conversation_continuity(self): + """Test that conversation context is maintained across turns.""" + agent = self.agent_client.get_agent("Joker") + thread = agent.get_new_thread() + + # First turn: Ask for a joke about a specific topic + response1 = agent.run("Tell me a joke about cats.", thread=thread) + assert response1 is not None + assert len(response1.text) > 0 + + # Second turn: Ask a follow-up that requires context + response2 = agent.run("Can you make it funnier?", thread=thread) + assert response2 is not None + assert len(response2.text) > 0 + + # The agent should understand "it" refers to the previous joke + + def test_multiple_threads(self): + """Test that different threads maintain separate contexts.""" + agent = self.agent_client.get_agent("Joker") + + # Create two separate threads + thread1 = agent.get_new_thread() + thread2 = agent.get_new_thread() + + assert thread1.session_id != thread2.session_id + + # Send different messages to each thread + response1 = agent.run("Tell me a joke about dogs.", thread=thread1) + response2 = agent.run("Tell me a joke about birds.", thread=thread2) + + assert response1 is not None + assert response2 is not None + assert response1.text != response2.text diff --git a/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py 
b/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py new file mode 100644 index 0000000000..ab55e5ed4c --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py @@ -0,0 +1,112 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Integration tests for multi-agent functionality. + +Tests operations with multiple specialized agents: +- Multiple agent registration +- Agent-specific tool usage +- Independent thread management per agent +- Concurrent agent operations +- Agent isolation and tool routing +""" + +from typing import Any + +import pytest +from agent_framework import FunctionCallContent +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +from agent_framework_durabletask import DurableAIAgentClient + +# Agent names from the 02_multi_agent sample +WEATHER_AGENT_NAME: str = "WeatherAgent" +MATH_AGENT_NAME: str = "MathAgent" + +# Module-level markers - applied to all tests in this module +pytestmark = [ + pytest.mark.sample("02_multi_agent"), + pytest.mark.integration_test, + pytest.mark.requires_azure_openai, + pytest.mark.requires_dts, +] + + +class TestMultiAgent: + """Test suite for multi-agent functionality.""" + + @pytest.fixture(autouse=True) + def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: + """Setup test fixtures.""" + self.endpoint: str = dts_endpoint + self.taskhub: str = str(worker_process["taskhub"]) + + # Create agent client + dts_client = DurableTaskSchedulerClient( + host_address=self.endpoint, + secure_channel=False, + taskhub=self.taskhub, + token_credential=None, + ) + self.agent_client = DurableAIAgentClient(dts_client) + + def test_multiple_agents_registered(self) -> None: + """Test that both agents are registered and accessible.""" + weather_agent = self.agent_client.get_agent(WEATHER_AGENT_NAME) + math_agent = self.agent_client.get_agent(MATH_AGENT_NAME) + + assert weather_agent is not None + assert weather_agent.name == 
WEATHER_AGENT_NAME + assert math_agent is not None + assert math_agent.name == MATH_AGENT_NAME + + def test_weather_agent_with_tool(self): + """Test weather agent with weather tool execution.""" + agent = self.agent_client.get_agent(WEATHER_AGENT_NAME) + thread = agent.get_new_thread() + + response = agent.run("What's the weather in Seattle?", thread=thread) + + assert response is not None + assert response.text is not None + # Should contain weather information from the tool + assert len(response.text) > 0 + + # Verify that the get_weather tool was actually invoked + tool_calls = [ + content for msg in response.messages for content in msg.contents if isinstance(content, FunctionCallContent) + ] + assert len(tool_calls) > 0, "Expected at least one tool call" + assert any(call.name == "get_weather" for call in tool_calls), "Expected get_weather tool to be called" + + def test_math_agent_with_tool(self): + """Test math agent with calculation tool execution.""" + agent = self.agent_client.get_agent(MATH_AGENT_NAME) + thread = agent.get_new_thread() + + response = agent.run("Calculate a 20% tip on a $50 bill.", thread=thread) + + assert response is not None + assert response.text is not None + # Should contain calculation results from the tool + assert len(response.text) > 0 + + # Verify that the calculate_tip tool was actually invoked + tool_calls = [ + content for msg in response.messages for content in msg.contents if isinstance(content, FunctionCallContent) + ] + assert len(tool_calls) > 0, "Expected at least one tool call" + assert any(call.name == "calculate_tip" for call in tool_calls), "Expected calculate_tip tool to be called" + + def test_multiple_calls_to_same_agent(self): + """Test multiple sequential calls to the same agent.""" + agent = self.agent_client.get_agent(WEATHER_AGENT_NAME) + thread = agent.get_new_thread() + + # Multiple weather queries + response1 = agent.run("What's the weather in Chicago?", thread=thread) + response2 = agent.run("And what 
about Los Angeles?", thread=thread) + + assert response1 is not None + assert response2 is not None + assert len(response1.text) > 0 + assert len(response2.text) > 0 diff --git a/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py b/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py new file mode 100644 index 0000000000..8d93e1e581 --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py @@ -0,0 +1,225 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Integration Tests for Reliable Streaming Sample + +Tests the reliable streaming sample using Redis Streams for persistent message delivery. + +The worker process is automatically started by the test fixture. + +Prerequisites: +- Azure OpenAI credentials configured (see packages/durabletask/tests/integration_tests/.env.example) +- DTS emulator running (docker run -d -p 8080:8080 mcr.microsoft.com/durabletask/emulator:latest) +- Redis running (docker run -d --name redis -p 6379:6379 redis:latest) + +Usage: + uv run pytest packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py -v +""" + +import asyncio +import os +import sys +import time +from datetime import timedelta +from pathlib import Path +from typing import Any + +import pytest +import redis.asyncio as aioredis +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +from agent_framework_durabletask import DurableAIAgentClient + +from .testutils import OrchestrationHelper + +# Add sample directory to path to import RedisStreamResponseHandler +SAMPLE_DIR = Path(__file__).parents[4] / "samples" / "getting_started" / "durabletask" / "03_single_agent_streaming" +sys.path.insert(0, str(SAMPLE_DIR)) + +from redis_stream_response_handler import RedisStreamResponseHandler # type: ignore[reportMissingImports] # noqa: E402 + +# Module-level markers - applied to all tests in this file +pytestmark = [ + 
pytest.mark.sample("03_single_agent_streaming"), + pytest.mark.integration_test, + pytest.mark.requires_azure_openai, + pytest.mark.requires_dts, + pytest.mark.requires_redis, +] + + +class TestSampleReliableStreaming: + """Tests for 03_single_agent_streaming sample.""" + + @pytest.fixture(autouse=True) + def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: + """Setup test fixtures.""" + self.endpoint: str = dts_endpoint + self.taskhub: str = str(worker_process["taskhub"]) + + # Create agent client + dts_client = DurableTaskSchedulerClient( + host_address=self.endpoint, + secure_channel=False, + taskhub=self.taskhub, + token_credential=None, + ) + self.agent_client = DurableAIAgentClient(dts_client) + self.helper = OrchestrationHelper(dts_client) + + # Redis configuration + self.redis_connection_string = os.environ.get("REDIS_CONNECTION_STRING", "redis://localhost:6379") + self.redis_stream_ttl_minutes = int(os.environ.get("REDIS_STREAM_TTL_MINUTES", "10")) + + async def _get_stream_handler(self) -> RedisStreamResponseHandler: # type: ignore[reportMissingTypeStubs] + """Create a new Redis stream handler for each request.""" + redis_client = aioredis.from_url( # type: ignore[reportUnknownMemberType] + self.redis_connection_string, + encoding="utf-8", + decode_responses=False, + ) + return RedisStreamResponseHandler( # type: ignore[reportUnknownMemberType] + redis_client=redis_client, + stream_ttl=timedelta(minutes=self.redis_stream_ttl_minutes), + ) + + async def _stream_from_redis( + self, + thread_id: str, + cursor: str | None = None, + timeout: float = 30.0, + ) -> tuple[str, bool, str]: + """ + Stream responses from Redis using the sample's RedisStreamResponseHandler. 
+ + Args: + thread_id: The conversation/thread ID to stream from + cursor: Optional cursor to resume from + timeout: Maximum time to wait for stream completion + + Returns: + Tuple of (accumulated text, completion status, last entry_id) + """ + accumulated_text = "" + is_complete = False + last_entry_id = cursor if cursor else "0-0" + start_time = time.time() + + async with await self._get_stream_handler() as stream_handler: # type: ignore[reportUnknownMemberType] + try: + async for chunk in stream_handler.read_stream(thread_id, cursor): # type: ignore[reportUnknownMemberType] + if time.time() - start_time > timeout: + break + + last_entry_id = chunk.entry_id # type: ignore[reportUnknownMemberType] + + if chunk.error: # type: ignore[reportUnknownMemberType] + # Stream not found or timeout - this is expected if agent hasn't written yet + # Don't raise an error, just return what we have + break + + if chunk.is_done: # type: ignore[reportUnknownMemberType] + is_complete = True + break + + if chunk.text: # type: ignore[reportUnknownMemberType] + accumulated_text += chunk.text # type: ignore[reportUnknownMemberType] + + except Exception as ex: + # For test purposes, we catch exceptions and return what we have + if "timed out" not in str(ex).lower(): + raise + + return accumulated_text, is_complete, last_entry_id # type: ignore[reportReturnType] + + def test_agent_run_and_stream(self) -> None: + """Test agent execution with Redis streaming.""" + # Get the TravelPlanner agent + travel_planner = self.agent_client.get_agent("TravelPlanner") + assert travel_planner is not None + assert travel_planner.name == "TravelPlanner" + + # Create a new thread + thread = travel_planner.get_new_thread() + assert thread.session_id is not None + assert thread.session_id.key is not None + thread_id = str(thread.session_id.key) + + # Start agent run with wait_for_response=False for non-blocking execution + travel_planner.run( + "Plan a 1-day trip to Seattle in 1 sentence", thread=thread, 
options={"wait_for_response": False} + ) + + # Poll Redis stream with retries to handle race conditions + # The agent may take a few seconds to process and start writing to Redis + max_retries = 20 # Try for up to 40 seconds (20 * 2 seconds) + retry_count = 0 + text = "" + is_complete = False + + while retry_count < max_retries: + text, is_complete, _ = asyncio.run(self._stream_from_redis(thread_id, timeout=5.0)) + + if len(text) > 0 or is_complete: + # We got content or completion marker + break + + # Wait before retrying + time.sleep(3) + retry_count += 1 + + # Verify we got content + assert len(text) > 0, ( + f"Expected text content but got empty string for thread_id: {thread_id} after {retry_count} retries" + ) + assert "seattle" in text.lower(), f"Expected 'seattle' in response but got: {text}" + assert is_complete, "Expected stream to be complete" + + def test_stream_with_cursor_resumption(self) -> None: + """Test streaming with cursor-based resumption.""" + # Get the TravelPlanner agent + travel_planner = self.agent_client.get_agent("TravelPlanner") + thread = travel_planner.get_new_thread() + assert thread.session_id is not None + assert thread.session_id.key is not None + thread_id = str(thread.session_id.key) + + # Start agent run + travel_planner.run("What's the weather like?", thread=thread, options={"wait_for_response": False}) + + # Wait for agent to start writing + time.sleep(3) + + # Read partial stream to get a cursor + async def get_partial_stream() -> tuple[str, str]: + async with await self._get_stream_handler() as stream_handler: # type: ignore[reportUnknownMemberType] + accumulated_text = "" + last_entry_id = "0-0" + chunk_count = 0 + + # Read just first 2 chunks + async for chunk in stream_handler.read_stream(thread_id): # type: ignore[reportUnknownMemberType] + last_entry_id = chunk.entry_id # type: ignore[reportUnknownMemberType] + if chunk.text: # type: ignore[reportUnknownMemberType] + accumulated_text += chunk.text # type: 
ignore[reportUnknownMemberType] + chunk_count += 1 + if chunk_count >= 2: + break + + return accumulated_text, last_entry_id # type: ignore[reportReturnType] + + partial_text, cursor = asyncio.run(get_partial_stream()) + + # Resume from cursor + remaining_text, _, _ = asyncio.run(self._stream_from_redis(thread_id, cursor=cursor)) + + # Verify we got some initial content + assert len(partial_text) > 0 + + # Combined text should be coherent + full_text = partial_text + remaining_text + assert len(full_text) > 0 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py b/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py new file mode 100644 index 0000000000..6bfaa69c68 --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py @@ -0,0 +1,117 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Integration tests for single agent orchestration with chaining. 
+ +Tests orchestration patterns with sequential agent calls: +- Orchestration registration and execution +- Sequential agent calls on same thread +- Conversation continuity in orchestrations +- Thread context preservation +""" + +import json +import logging +from typing import Any + +import pytest +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.client import OrchestrationStatus + +from agent_framework_durabletask import DurableAIAgentClient + +from .testutils import OrchestrationHelper + +# Agent name from the 04_single_agent_orchestration_chaining sample +WRITER_AGENT_NAME: str = "WriterAgent" + +# Configure logging +logging.basicConfig(level=logging.WARNING) + +# Module-level markers - applied to all tests in this module +pytestmark = [ + pytest.mark.sample("04_single_agent_orchestration_chaining"), + pytest.mark.integration_test, + pytest.mark.requires_azure_openai, + pytest.mark.requires_dts, +] + + +class TestSingleAgentOrchestrationChaining: + """Test suite for single agent orchestration with chaining.""" + + @pytest.fixture(autouse=True) + def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: + """Setup test fixtures.""" + self.endpoint: str = dts_endpoint + self.taskhub: str = str(worker_process["taskhub"]) + + # Create DTS client for orchestrations + self.dts_client = DurableTaskSchedulerClient( + host_address=self.endpoint, + secure_channel=False, + taskhub=self.taskhub, + token_credential=None, + ) + + # Create agent client + self.agent_client = DurableAIAgentClient(self.dts_client) + + # Create orchestration helper + self.orch_helper = OrchestrationHelper(self.dts_client) + + def test_agent_registered(self): + """Test that the Writer agent is registered.""" + agent = self.agent_client.get_agent(WRITER_AGENT_NAME) + assert agent is not None + assert agent.name == WRITER_AGENT_NAME + + def test_chaining_context_preserved(self): + """Test that context is preserved across agent runs in 
orchestration.""" + # Start the orchestration + instance_id = self.dts_client.schedule_new_orchestration( + orchestrator="single_agent_chaining_orchestration", + input="", + ) + + # Wait for completion with output + metadata, output = self.orch_helper.wait_for_orchestration_with_output( + instance_id=instance_id, + timeout=120.0, + ) + + assert metadata is not None + assert output is not None + + # The final output should be a refined sentence + final_text = json.loads(output) + + # Should be a meaningful sentence (not empty or error message) + assert len(final_text) > 10 + assert not final_text.startswith("Error") + + def test_multiple_orchestration_instances(self): + """Test that multiple orchestration instances can run independently.""" + # Start two orchestrations + instance_id_1 = self.dts_client.schedule_new_orchestration( + orchestrator="single_agent_chaining_orchestration", + input="", + ) + instance_id_2 = self.dts_client.schedule_new_orchestration( + orchestrator="single_agent_chaining_orchestration", + input="", + ) + + assert instance_id_1 != instance_id_2 + + # Both should complete + metadata_1 = self.orch_helper.wait_for_orchestration( + instance_id=instance_id_1, + timeout=120.0, + ) + metadata_2 = self.orch_helper.wait_for_orchestration( + instance_id=instance_id_2, + timeout=120.0, + ) + + assert metadata_1.runtime_status == OrchestrationStatus.COMPLETED + assert metadata_2.runtime_status == OrchestrationStatus.COMPLETED diff --git a/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py b/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py new file mode 100644 index 0000000000..eaa354f00d --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py @@ -0,0 +1,93 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Integration tests for multi-agent orchestration with concurrency. 
+ +Tests concurrent execution patterns: +- Parallel agent execution +- Concurrent orchestration tasks +- Independent thread management in parallel +- Result aggregation from concurrent calls +""" + +import json +import logging +from typing import Any + +import pytest +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.client import OrchestrationStatus + +from agent_framework_durabletask import DurableAIAgentClient + +from .testutils import OrchestrationHelper + +# Agent names from the 05_multi_agent_orchestration_concurrency sample +PHYSICIST_AGENT_NAME: str = "PhysicistAgent" +CHEMIST_AGENT_NAME: str = "ChemistAgent" + +# Configure logging +logging.basicConfig(level=logging.WARNING) + +# Module-level markers +pytestmark = [ + pytest.mark.sample("05_multi_agent_orchestration_concurrency"), + pytest.mark.integration_test, + pytest.mark.requires_dts, +] + + +class TestMultiAgentOrchestrationConcurrency: + """Test suite for multi-agent orchestration with concurrency.""" + + @pytest.fixture(autouse=True) + def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: + """Setup test fixtures.""" + self.endpoint = dts_endpoint + self.taskhub = worker_process["taskhub"] + + # Create DTS client for orchestrations + self.dts_client = DurableTaskSchedulerClient( + host_address=self.endpoint, + secure_channel=False, + taskhub=self.taskhub, + token_credential=None, + ) + + # Create agent client + self.agent_client = DurableAIAgentClient(self.dts_client) + + # Create orchestration helper + self.orch_helper = OrchestrationHelper(self.dts_client) + + def test_agents_registered(self): + """Test that both agents are registered.""" + physicist = self.agent_client.get_agent(PHYSICIST_AGENT_NAME) + chemist = self.agent_client.get_agent(CHEMIST_AGENT_NAME) + + assert physicist is not None + assert physicist.name == PHYSICIST_AGENT_NAME + assert chemist is not None + assert chemist.name == CHEMIST_AGENT_NAME + + def 
test_different_prompts(self): + """Test concurrent orchestration with different prompts.""" + prompts = [ + "What is temperature?", + "Explain molecules.", + ] + + for prompt in prompts: + instance_id = self.dts_client.schedule_new_orchestration( + orchestrator="multi_agent_concurrent_orchestration", + input=prompt, + ) + + metadata, output = self.orch_helper.wait_for_orchestration_with_output( + instance_id=instance_id, + timeout=120.0, + ) + + assert metadata.runtime_status == OrchestrationStatus.COMPLETED + result = json.loads(output) + assert "physicist" in result + assert "chemist" in result diff --git a/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py b/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py new file mode 100644 index 0000000000..b31baf1faa --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py @@ -0,0 +1,107 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Integration tests for multi-agent orchestration with conditionals. 
+ +Tests conditional orchestration patterns: +- Conditional branching in orchestrations +- Agent-based decision making +- Activity function execution +- Structured output handling +- Conditional routing based on agent responses +""" + +import logging +from typing import Any + +import pytest +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.client import OrchestrationStatus + +from agent_framework_durabletask import DurableAIAgentClient + +from .testutils import OrchestrationHelper + +# Agent names from the 06_multi_agent_orchestration_conditionals sample +SPAM_AGENT_NAME: str = "SpamDetectionAgent" +EMAIL_AGENT_NAME: str = "EmailAssistantAgent" + +# Configure logging +logging.basicConfig(level=logging.WARNING) + +# Module-level markers +pytestmark = [ + pytest.mark.sample("06_multi_agent_orchestration_conditionals"), + pytest.mark.integration_test, + pytest.mark.requires_dts, +] + + +class TestMultiAgentOrchestrationConditionals: + """Test suite for multi-agent orchestration with conditionals.""" + + @pytest.fixture(autouse=True) + def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: + """Setup test fixtures.""" + self.endpoint: str = dts_endpoint + self.taskhub: str = str(worker_process["taskhub"]) + + # Create DTS client for orchestrations + self.dts_client = DurableTaskSchedulerClient( + host_address=self.endpoint, + secure_channel=False, + taskhub=self.taskhub, + token_credential=None, + ) + + # Create agent client + self.agent_client = DurableAIAgentClient(self.dts_client) + + # Create orchestration helper + self.orch_helper = OrchestrationHelper(self.dts_client) + + def test_agents_registered(self): + """Test that both agents are registered.""" + spam_agent = self.agent_client.get_agent(SPAM_AGENT_NAME) + email_agent = self.agent_client.get_agent(EMAIL_AGENT_NAME) + + assert spam_agent is not None + assert spam_agent.name == SPAM_AGENT_NAME + assert email_agent is not None + assert 
email_agent.name == EMAIL_AGENT_NAME + + def test_conditional_branching(self): + """Test that conditional branching works correctly.""" + # Test with obvious spam + spam_payload = { + "email_id": "spam-001", + "email_content": "Buy cheap medications online! No prescription needed! Limited time offer!", + } + + spam_instance_id = self.dts_client.schedule_new_orchestration( + orchestrator="spam_detection_orchestration", + input=spam_payload, + ) + + # Test with legitimate email + legit_payload = { + "email_id": "legit-001", + "email_content": "Hi team, please review the attached document before our meeting tomorrow.", + } + + legit_instance_id = self.dts_client.schedule_new_orchestration( + orchestrator="spam_detection_orchestration", + input=legit_payload, + ) + + # Both should complete successfully (different branches) + spam_metadata = self.orch_helper.wait_for_orchestration( + instance_id=spam_instance_id, + timeout=120.0, + ) + legit_metadata = self.orch_helper.wait_for_orchestration( + instance_id=legit_instance_id, + timeout=120.0, + ) + + assert spam_metadata.runtime_status == OrchestrationStatus.COMPLETED + assert legit_metadata.runtime_status == OrchestrationStatus.COMPLETED diff --git a/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py b/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py new file mode 100644 index 0000000000..a12cee021b --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py @@ -0,0 +1,182 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Integration tests for single agent orchestration with human-in-the-loop. 
+ +Tests human-in-the-loop (HITL) patterns: +- External event waiting and handling +- Timeout handling in orchestrations +- Iterative refinement with human feedback +- Activity function integration +- Approval workflow patterns +""" + +import logging +from typing import Any + +import pytest +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.client import OrchestrationStatus + +from agent_framework_durabletask import DurableAIAgentClient + +from .testutils import OrchestrationHelper + +# Constants from the 07_single_agent_orchestration_hitl sample +WRITER_AGENT_NAME: str = "WriterAgent" +HUMAN_APPROVAL_EVENT: str = "HumanApproval" + +# Configure logging +logging.basicConfig(level=logging.WARNING) + +# Module-level markers +pytestmark = [ + pytest.mark.sample("07_single_agent_orchestration_hitl"), + pytest.mark.integration_test, + pytest.mark.requires_dts, +] + + +class TestSingleAgentOrchestrationHITL: + """Test suite for single agent orchestration with human-in-the-loop.""" + + @pytest.fixture(autouse=True) + def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: + """Setup test fixtures.""" + self.endpoint: str = str(worker_process["endpoint"]) + self.taskhub: str = str(worker_process["taskhub"]) + + logging.info(f"Using taskhub: {self.taskhub} at endpoint: {self.endpoint}") + + # Create DTS client for orchestrations + self.dts_client = DurableTaskSchedulerClient( + host_address=self.endpoint, + secure_channel=False, + taskhub=self.taskhub, + token_credential=None, + ) + + # Create agent client + self.agent_client = DurableAIAgentClient(self.dts_client) + + # Create orchestration helper + self.orch_helper = OrchestrationHelper(self.dts_client) + + def test_agent_registered(self): + """Test that the Writer agent is registered.""" + agent = self.agent_client.get_agent(WRITER_AGENT_NAME) + assert agent is not None + assert agent.name == WRITER_AGENT_NAME + + def test_hitl_orchestration_with_approval(self): + 
"""Test HITL orchestration with immediate approval.""" + payload = { + "topic": "The benefits of continuous learning", + "max_review_attempts": 3, + "approval_timeout_seconds": 60, + } + + # Start the orchestration + instance_id = self.dts_client.schedule_new_orchestration( + orchestrator="content_generation_hitl_orchestration", + input=payload, + ) + + assert instance_id is not None + + # Wait for orchestration to reach notification point + notification_received = self.orch_helper.wait_for_notification(instance_id, timeout_seconds=90) + assert notification_received, "Failed to receive notification from orchestration" + + # Send approval event + approval_data = {"approved": True, "feedback": ""} + self.orch_helper.raise_event( + instance_id=instance_id, + event_name=HUMAN_APPROVAL_EVENT, + event_data=approval_data, + ) + + # Wait for completion + metadata = self.orch_helper.wait_for_orchestration( + instance_id=instance_id, + timeout=90.0, + ) + + assert metadata is not None + assert metadata.runtime_status == OrchestrationStatus.COMPLETED + + def test_hitl_orchestration_with_rejection_and_feedback(self): + """Test HITL orchestration with rejection and iterative refinement.""" + payload = { + "topic": "Artificial Intelligence in healthcare", + "max_review_attempts": 3, + "approval_timeout_seconds": 60, + } + + # Start the orchestration + instance_id = self.dts_client.schedule_new_orchestration( + orchestrator="content_generation_hitl_orchestration", + input=payload, + ) + + # Wait for orchestration to reach notification point + notification_received = self.orch_helper.wait_for_notification(instance_id, timeout_seconds=90) + assert notification_received, "Failed to receive notification from orchestration" + + # First rejection with feedback + rejection_data = { + "approved": False, + "feedback": "Please make it more concise and add specific examples.", + } + self.orch_helper.raise_event( + instance_id=instance_id, + event_name=HUMAN_APPROVAL_EVENT, + 
event_data=rejection_data, + ) + + # Wait for orchestration to refine and reach notification point again + notification_received = self.orch_helper.wait_for_notification(instance_id, timeout_seconds=90) + assert notification_received, "Failed to receive notification after refinement" + + # Second approval + approval_data = {"approved": True, "feedback": ""} + self.orch_helper.raise_event( + instance_id=instance_id, + event_name=HUMAN_APPROVAL_EVENT, + event_data=approval_data, + ) + + # Wait for completion + metadata = self.orch_helper.wait_for_orchestration( + instance_id=instance_id, + timeout=90.0, + ) + + assert metadata is not None + assert metadata.runtime_status == OrchestrationStatus.COMPLETED + + def test_hitl_orchestration_timeout(self): + """Test HITL orchestration timeout behavior.""" + payload = { + "topic": "Cloud computing fundamentals", + "max_review_attempts": 1, + "approval_timeout_seconds": 0.1, # Short timeout for testing + } + + # Start the orchestration + instance_id = self.dts_client.schedule_new_orchestration( + orchestrator="content_generation_hitl_orchestration", + input=payload, + ) + + # Don't send any approval - let it timeout + # The orchestration should fail due to timeout + try: + metadata = self.orch_helper.wait_for_orchestration( + instance_id=instance_id, + timeout=90.0, + ) + # If it completes, it should be failed status due to timeout + assert metadata.runtime_status == OrchestrationStatus.FAILED + except (RuntimeError, TimeoutError): + # Expected - orchestration should timeout and fail + pass diff --git a/python/packages/durabletask/tests/integration_tests/testutils.py b/python/packages/durabletask/tests/integration_tests/testutils.py new file mode 100644 index 0000000000..41a0e6f2c6 --- /dev/null +++ b/python/packages/durabletask/tests/integration_tests/testutils.py @@ -0,0 +1,165 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Test utilities for durabletask integration tests.""" + +import json +import time +from typing import Any + +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.client import OrchestrationStatus + + +class OrchestrationHelper: + """Helper class for orchestration-related test operations.""" + + def __init__(self, dts_client: DurableTaskSchedulerClient): + """ + Initialize the orchestration helper. + + Args: + dts_client: The DurableTaskSchedulerClient instance to use + """ + self.client = dts_client + + def wait_for_orchestration( + self, + instance_id: str, + timeout: float = 60.0, + poll_interval: float = 0.5, + ) -> Any: + """ + Wait for an orchestration to complete. + + Args: + instance_id: The orchestration instance ID + timeout: Maximum time to wait in seconds + poll_interval: Time between polling attempts in seconds + + Returns: + The final OrchestrationMetadata + + Raises: + TimeoutError: If the orchestration doesn't complete within timeout + RuntimeError: If the orchestration fails + """ + # Use the built-in wait_for_orchestration_completion method + metadata = self.client.wait_for_orchestration_completion( + instance_id=instance_id, + timeout=int(timeout), + ) + + if metadata is None: + raise TimeoutError(f"Orchestration {instance_id} did not complete within {timeout} seconds") + + # Check if failed or terminated + if metadata.runtime_status == OrchestrationStatus.FAILED: + raise RuntimeError(f"Orchestration {instance_id} failed: {metadata.serialized_custom_status}") + if metadata.runtime_status == OrchestrationStatus.TERMINATED: + raise RuntimeError(f"Orchestration {instance_id} was terminated") + + return metadata + + def wait_for_orchestration_with_output( + self, + instance_id: str, + timeout: float = 60.0, + poll_interval: float = 0.5, + ) -> tuple[Any, Any]: + """ + Wait for an orchestration to complete and return its output. 
+ + Args: + instance_id: The orchestration instance ID + timeout: Maximum time to wait in seconds + poll_interval: Time between polling attempts in seconds + + Returns: + A tuple of (OrchestrationMetadata, output) + + Raises: + TimeoutError: If the orchestration doesn't complete within timeout + RuntimeError: If the orchestration fails + """ + metadata = self.wait_for_orchestration(instance_id, timeout, poll_interval) + + # The output should be available in the metadata + return metadata, metadata.serialized_output + + def get_orchestration_status(self, instance_id: str) -> Any | None: + """ + Get the current status of an orchestration. + + Args: + instance_id: The orchestration instance ID + + Returns: + The OrchestrationMetadata or None if not found + """ + try: + # Try to wait with a short timeout to get current status + return self.client.wait_for_orchestration_completion( + instance_id=instance_id, + timeout=1, # Very short timeout, just checking status + ) + except Exception: + return None + + def raise_event( + self, + instance_id: str, + event_name: str, + event_data: Any = None, + ) -> None: + """ + Raise an external event to an orchestration. + + Args: + instance_id: The orchestration instance ID + event_name: The name of the event + event_data: The event data payload + """ + self.client.raise_orchestration_event(instance_id, event_name, data=event_data) + + def wait_for_notification(self, instance_id: str, timeout_seconds: int = 30) -> bool: + """Wait for the orchestration to reach a notification point. + + Polls the orchestration status until it appears to be waiting for approval. 
+ + Args: + instance_id: The orchestration instance ID + timeout_seconds: Maximum time to wait + + Returns: + True if notification detected, False if timeout + """ + start_time = time.time() + while time.time() - start_time < timeout_seconds: + try: + metadata = self.client.get_orchestration_state( + instance_id=instance_id, + ) + + if metadata: + # Check if we're waiting for approval by examining custom status + if metadata.serialized_custom_status: + try: + custom_status = json.loads(metadata.serialized_custom_status) + # Handle both string and dict custom status + status_str = custom_status if isinstance(custom_status, str) else str(custom_status) + if status_str.lower().startswith("requesting human feedback"): + return True + except (json.JSONDecodeError, AttributeError): + # If it's not JSON, treat as plain string + if metadata.serialized_custom_status.lower().startswith("requesting human feedback"): + return True + + # Check for terminal states + if metadata.runtime_status.name == "COMPLETED" or metadata.runtime_status.name == "FAILED": + return False + except Exception: + pass + + time.sleep(1) + + return False diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py index 2e394faea2..34b0937ed4 100644 --- a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py @@ -134,7 +134,14 @@ def content_generation_hitl_orchestration(context: DurableOrchestrationContext) ) return {"content": content.content} - context.set_custom_status("Content rejected by human reviewer. Incorporating feedback and regenerating...") + context.set_custom_status( + "Content rejected by human reviewer. Incorporating feedback and regenerating..." 
+ ) + + # Check if we've exhausted attempts + if attempt >= payload.max_review_attempts: + break + rewrite_prompt = ( "The content was rejected by a human reviewer. Please rewrite the article incorporating their feedback.\n\n" f"Human Feedback: {approval_payload.feedback or 'No feedback provided.'}" @@ -154,9 +161,15 @@ def content_generation_hitl_orchestration(context: DurableOrchestrationContext) context.set_custom_status( f"Human approval timed out after {payload.approval_timeout_hours} hour(s). Treating as rejection." ) - raise TimeoutError(f"Human approval timed out after {payload.approval_timeout_hours} hour(s).") - - raise RuntimeError(f"Content could not be approved after {payload.max_review_attempts} iteration(s).") + raise TimeoutError( + f"Human approval timed out after {payload.approval_timeout_hours} hour(s)." + ) + + # If we exit the loop without returning, max attempts were exhausted + context.set_custom_status("Max review attempts exhausted.") + raise RuntimeError( + f"Content could not be approved after {payload.max_review_attempts} iteration(s)." + ) # 5. HTTP endpoint that starts the human-in-the-loop orchestration. 
diff --git a/python/samples/getting_started/durabletask/03_single_agent_streaming/worker.py b/python/samples/getting_started/durabletask/03_single_agent_streaming/worker.py index 93af682efc..a82716e995 100644 --- a/python/samples/getting_started/durabletask/03_single_agent_streaming/worker.py +++ b/python/samples/getting_started/durabletask/03_single_agent_streaming/worker.py @@ -18,7 +18,7 @@ from datetime import timedelta import redis.asyncio as aioredis -from agent_framework import AgentRunResponseUpdate +from agent_framework import AgentResponseUpdate from agent_framework.azure import AzureOpenAIChatClient from agent_framework_durabletask import AgentCallbackContext, AgentResponseCallbackProtocol, DurableAIAgentWorker from azure.identity import AzureCliCredential, DefaultAzureCredential @@ -66,7 +66,7 @@ def __init__(self) -> None: async def on_streaming_response_update( self, - update: AgentRunResponseUpdate, + update: AgentResponseUpdate, context: AgentCallbackContext, ) -> None: """Write streaming update to Redis Stream. 
diff --git a/python/samples/getting_started/durabletask/04_single_agent_orchestration_chaining/worker.py b/python/samples/getting_started/durabletask/04_single_agent_orchestration_chaining/worker.py index f5cc8f33bf..3e7d2eca12 100644 --- a/python/samples/getting_started/durabletask/04_single_agent_orchestration_chaining/worker.py +++ b/python/samples/getting_started/durabletask/04_single_agent_orchestration_chaining/worker.py @@ -15,7 +15,7 @@ import logging import os -from agent_framework import AgentRunResponse +from agent_framework import AgentResponse from agent_framework.azure import AzureOpenAIChatClient from agent_framework_durabletask import DurableAIAgentOrchestrationContext, DurableAIAgentWorker from azure.identity import AzureCliCredential, DefaultAzureCredential @@ -61,7 +61,7 @@ def get_orchestration(): def single_agent_chaining_orchestration( context: OrchestrationContext, _: str -) -> Generator[Task[AgentRunResponse], AgentRunResponse, str]: +) -> Generator[Task[AgentResponse], AgentResponse, str]: """Orchestration that runs the writer agent twice on the same thread. This demonstrates chaining behavior where the output of the first agent run diff --git a/python/samples/getting_started/durabletask/07_single_agent_orchestration_hitl/worker.py b/python/samples/getting_started/durabletask/07_single_agent_orchestration_hitl/worker.py index 3626cedc4e..5b40769437 100644 --- a/python/samples/getting_started/durabletask/07_single_agent_orchestration_hitl/worker.py +++ b/python/samples/getting_started/durabletask/07_single_agent_orchestration_hitl/worker.py @@ -72,7 +72,7 @@ def create_writer_agent(): ) -def notify_user_for_approval(context: ActivityContext, content: dict[str, str]) -> None: +def notify_user_for_approval(context: ActivityContext, content: dict[str, str]) -> str: """Activity function to notify user for approval. 
Args: @@ -84,8 +84,9 @@ def notify_user_for_approval(context: ActivityContext, content: dict[str, str]) logger.info(f"Title: {model.title or '(untitled)'}") logger.info(f"Content: {model.content}") logger.info("Use the client to send approval or rejection.") + return "Notification sent to user for approval." -def publish_content(context: ActivityContext, content: dict[str, str]) -> None: +def publish_content(context: ActivityContext, content: dict[str, str]) -> str: """Activity function to publish approved content. Args: @@ -96,6 +97,7 @@ def publish_content(context: ActivityContext, content: dict[str, str]) -> None: logger.info("PUBLISHING: Content has been published successfully:") logger.info(f"Title: {model.title or '(untitled)'}") logger.info(f"Content: {model.content}") + return "Published content successfully." def content_generation_hitl_orchestration( @@ -230,6 +232,14 @@ def content_generation_hitl_orchestration( # Content rejected - incorporate feedback and regenerate logger.debug(f"[Orchestration] Content rejected. Feedback: {approval.feedback}") + + # Check if we've exhausted attempts + if attempt >= payload.max_review_attempts: + context.set_custom_status("Max review attempts exhausted.") + # Max attempts exhausted + logger.error(f"[Orchestration] Max attempts ({payload.max_review_attempts}) exhausted") + break + context.set_custom_status(f"Content rejected by human reviewer. Regenerating...") rewrite_prompt = ( @@ -262,7 +272,8 @@ def content_generation_hitl_orchestration( f"Human approval timed out after {payload.approval_timeout_seconds} second(s)." ) - # Max attempts exhausted + # If we exit the loop without returning, max attempts were exhausted + context.set_custom_status("Max review attempts exhausted.") raise RuntimeError( f"Content could not be approved after {payload.max_review_attempts} iteration(s)." 
) diff --git a/python/uv.lock b/python/uv.lock index be027c7845..2d7aaf0277 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -455,7 +455,7 @@ provides-extras = ["dev", "all"] [[package]] name = "agent-framework-durabletask" -version = "0.0.1" +version = "0.0.1b260113" source = { editable = "packages/durabletask" } dependencies = [ { name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, @@ -2362,7 +2362,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/32/6a/33d1702184d94106d3cdd7bfb788e19723206fce152e303473ca3b946c7b/greenlet-3.3.0-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:6f8496d434d5cb2dce025773ba5597f71f5410ae499d5dd9533e0653258cdb3d", size = 273658, upload-time = "2025-12-04T14:23:37.494Z" }, { url = "https://files.pythonhosted.org/packages/d6/b7/2b5805bbf1907c26e434f4e448cd8b696a0b71725204fa21a211ff0c04a7/greenlet-3.3.0-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b96dc7eef78fd404e022e165ec55327f935b9b52ff355b067eb4a0267fc1cffb", size = 574810, upload-time = "2025-12-04T14:50:04.154Z" }, { url = "https://files.pythonhosted.org/packages/94/38/343242ec12eddf3d8458c73f555c084359883d4ddc674240d9e61ec51fd6/greenlet-3.3.0-cp310-cp310-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:73631cd5cccbcfe63e3f9492aaa664d278fda0ce5c3d43aeda8e77317e38efbd", size = 586248, upload-time = "2025-12-04T14:57:39.35Z" }, - { url = "https://files.pythonhosted.org/packages/f0/d0/0ae86792fb212e4384041e0ef8e7bc66f59a54912ce407d26a966ed2914d/greenlet-3.3.0-cp310-cp310-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:b299a0cb979f5d7197442dccc3aee67fce53500cd88951b7e6c35575701c980b", size = 597403, upload-time = "2025-12-04T15:07:10.831Z" }, { url = 
"https://files.pythonhosted.org/packages/b6/a8/15d0aa26c0036a15d2659175af00954aaaa5d0d66ba538345bd88013b4d7/greenlet-3.3.0-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7dee147740789a4632cace364816046e43310b59ff8fb79833ab043aefa72fd5", size = 586910, upload-time = "2025-12-04T14:25:59.705Z" }, { url = "https://files.pythonhosted.org/packages/e1/9b/68d5e3b7ccaba3907e5532cf8b9bf16f9ef5056a008f195a367db0ff32db/greenlet-3.3.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:39b28e339fc3c348427560494e28d8a6f3561c8d2bcf7d706e1c624ed8d822b9", size = 1547206, upload-time = "2025-12-04T15:04:21.027Z" }, { url = "https://files.pythonhosted.org/packages/66/bd/e3086ccedc61e49f91e2cfb5ffad9d8d62e5dc85e512a6200f096875b60c/greenlet-3.3.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:b3c374782c2935cc63b2a27ba8708471de4ad1abaa862ffdb1ef45a643ddbb7d", size = 1613359, upload-time = "2025-12-04T14:27:26.548Z" }, @@ -2370,7 +2369,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1f/cb/48e964c452ca2b92175a9b2dca037a553036cb053ba69e284650ce755f13/greenlet-3.3.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:e29f3018580e8412d6aaf5641bb7745d38c85228dacf51a73bd4e26ddf2a6a8e", size = 274908, upload-time = "2025-12-04T14:23:26.435Z" }, { url = "https://files.pythonhosted.org/packages/28/da/38d7bff4d0277b594ec557f479d65272a893f1f2a716cad91efeb8680953/greenlet-3.3.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a687205fb22794e838f947e2194c0566d3812966b41c78709554aa883183fb62", size = 577113, upload-time = "2025-12-04T14:50:05.493Z" }, { url = "https://files.pythonhosted.org/packages/3c/f2/89c5eb0faddc3ff014f1c04467d67dee0d1d334ab81fadbf3744847f8a8a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:4243050a88ba61842186cb9e63c7dfa677ec146160b0efd73b855a3d9c7fcf32", size = 590338, upload-time = "2025-12-04T14:57:41.136Z" }, - { url = 
"https://files.pythonhosted.org/packages/80/d7/db0a5085035d05134f8c089643da2b44cc9b80647c39e93129c5ef170d8f/greenlet-3.3.0-cp311-cp311-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:670d0f94cd302d81796e37299bcd04b95d62403883b24225c6b5271466612f45", size = 601098, upload-time = "2025-12-04T15:07:11.898Z" }, { url = "https://files.pythonhosted.org/packages/dc/a6/e959a127b630a58e23529972dbc868c107f9d583b5a9f878fb858c46bc1a/greenlet-3.3.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6cb3a8ec3db4a3b0eb8a3c25436c2d49e3505821802074969db017b87bc6a948", size = 590206, upload-time = "2025-12-04T14:26:01.254Z" }, { url = "https://files.pythonhosted.org/packages/48/60/29035719feb91798693023608447283b266b12efc576ed013dd9442364bb/greenlet-3.3.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:2de5a0b09eab81fc6a382791b995b1ccf2b172a9fec934747a7a23d2ff291794", size = 1550668, upload-time = "2025-12-04T15:04:22.439Z" }, { url = "https://files.pythonhosted.org/packages/0a/5f/783a23754b691bfa86bd72c3033aa107490deac9b2ef190837b860996c9f/greenlet-3.3.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:4449a736606bd30f27f8e1ff4678ee193bc47f6ca810d705981cfffd6ce0d8c5", size = 1615483, upload-time = "2025-12-04T14:27:28.083Z" }, @@ -2378,7 +2376,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f8/0a/a3871375c7b9727edaeeea994bfff7c63ff7804c9829c19309ba2e058807/greenlet-3.3.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:b01548f6e0b9e9784a2c99c5651e5dc89ffcbe870bc5fb2e5ef864e9cc6b5dcb", size = 276379, upload-time = "2025-12-04T14:23:30.498Z" }, { url = "https://files.pythonhosted.org/packages/43/ab/7ebfe34dce8b87be0d11dae91acbf76f7b8246bf9d6b319c741f99fa59c6/greenlet-3.3.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:349345b770dc88f81506c6861d22a6ccd422207829d2c854ae2af8025af303e3", size = 597294, upload-time = "2025-12-04T14:50:06.847Z" }, { url = 
"https://files.pythonhosted.org/packages/a4/39/f1c8da50024feecd0793dbd5e08f526809b8ab5609224a2da40aad3a7641/greenlet-3.3.0-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e8e18ed6995e9e2c0b4ed264d2cf89260ab3ac7e13555b8032b25a74c6d18655", size = 607742, upload-time = "2025-12-04T14:57:42.349Z" }, - { url = "https://files.pythonhosted.org/packages/77/cb/43692bcd5f7a0da6ec0ec6d58ee7cddb606d055ce94a62ac9b1aa481e969/greenlet-3.3.0-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:c024b1e5696626890038e34f76140ed1daf858e37496d33f2af57f06189e70d7", size = 622297, upload-time = "2025-12-04T15:07:13.552Z" }, { url = "https://files.pythonhosted.org/packages/75/b0/6bde0b1011a60782108c01de5913c588cf51a839174538d266de15e4bf4d/greenlet-3.3.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:047ab3df20ede6a57c35c14bf5200fcf04039d50f908270d3f9a7a82064f543b", size = 609885, upload-time = "2025-12-04T14:26:02.368Z" }, { url = "https://files.pythonhosted.org/packages/49/0e/49b46ac39f931f59f987b7cd9f34bfec8ef81d2a1e6e00682f55be5de9f4/greenlet-3.3.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2d9ad37fc657b1102ec880e637cccf20191581f75c64087a549e66c57e1ceb53", size = 1567424, upload-time = "2025-12-04T15:04:23.757Z" }, { url = "https://files.pythonhosted.org/packages/05/f5/49a9ac2dff7f10091935def9165c90236d8f175afb27cbed38fb1d61ab6b/greenlet-3.3.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83cd0e36932e0e7f36a64b732a6f60c2fc2df28c351bae79fbaf4f8092fe7614", size = 1636017, upload-time = "2025-12-04T14:27:29.688Z" }, @@ -2386,7 +2383,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/2f/28592176381b9ab2cafa12829ba7b472d177f3acc35d8fbcf3673d966fff/greenlet-3.3.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:a1e41a81c7e2825822f4e068c48cb2196002362619e2d70b148f20a831c00739", size = 275140, upload-time = "2025-12-04T14:23:01.282Z" }, { url = 
"https://files.pythonhosted.org/packages/2c/80/fbe937bf81e9fca98c981fe499e59a3f45df2a04da0baa5c2be0dca0d329/greenlet-3.3.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9f515a47d02da4d30caaa85b69474cec77b7929b2e936ff7fb853d42f4bf8808", size = 599219, upload-time = "2025-12-04T14:50:08.309Z" }, { url = "https://files.pythonhosted.org/packages/c2/ff/7c985128f0514271b8268476af89aee6866df5eec04ac17dcfbc676213df/greenlet-3.3.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:7d2d9fd66bfadf230b385fdc90426fcd6eb64db54b40c495b72ac0feb5766c54", size = 610211, upload-time = "2025-12-04T14:57:43.968Z" }, - { url = "https://files.pythonhosted.org/packages/79/07/c47a82d881319ec18a4510bb30463ed6891f2ad2c1901ed5ec23d3de351f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:30a6e28487a790417d036088b3bcb3f3ac7d8babaa7d0139edbaddebf3af9492", size = 624311, upload-time = "2025-12-04T15:07:14.697Z" }, { url = "https://files.pythonhosted.org/packages/fd/8e/424b8c6e78bd9837d14ff7df01a9829fc883ba2ab4ea787d4f848435f23f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:087ea5e004437321508a8d6f20efc4cfec5e3c30118e1417ea96ed1d93950527", size = 612833, upload-time = "2025-12-04T14:26:03.669Z" }, { url = "https://files.pythonhosted.org/packages/b5/ba/56699ff9b7c76ca12f1cdc27a886d0f81f2189c3455ff9f65246780f713d/greenlet-3.3.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ab97cf74045343f6c60a39913fa59710e4bd26a536ce7ab2397adf8b27e67c39", size = 1567256, upload-time = "2025-12-04T15:04:25.276Z" }, { url = "https://files.pythonhosted.org/packages/1e/37/f31136132967982d698c71a281a8901daf1a8fbab935dce7c0cf15f942cc/greenlet-3.3.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5375d2e23184629112ca1ea89a53389dddbffcf417dad40125713d88eb5f96e8", size = 1636483, upload-time = "2025-12-04T14:27:30.804Z" }, @@ -2394,7 +2390,6 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/d7/7c/f0a6d0ede2c7bf092d00bc83ad5bafb7e6ec9b4aab2fbdfa6f134dc73327/greenlet-3.3.0-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:60c2ef0f578afb3c8d92ea07ad327f9a062547137afe91f38408f08aacab667f", size = 275671, upload-time = "2025-12-04T14:23:05.267Z" }, { url = "https://files.pythonhosted.org/packages/44/06/dac639ae1a50f5969d82d2e3dd9767d30d6dbdbab0e1a54010c8fe90263c/greenlet-3.3.0-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0a5d554d0712ba1de0a6c94c640f7aeba3f85b3a6e1f2899c11c2c0428da9365", size = 646360, upload-time = "2025-12-04T14:50:10.026Z" }, { url = "https://files.pythonhosted.org/packages/e0/94/0fb76fe6c5369fba9bf98529ada6f4c3a1adf19e406a47332245ef0eb357/greenlet-3.3.0-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:3a898b1e9c5f7307ebbde4102908e6cbfcb9ea16284a3abe15cab996bee8b9b3", size = 658160, upload-time = "2025-12-04T14:57:45.41Z" }, - { url = "https://files.pythonhosted.org/packages/93/79/d2c70cae6e823fac36c3bbc9077962105052b7ef81db2f01ec3b9bf17e2b/greenlet-3.3.0-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:dcd2bdbd444ff340e8d6bdf54d2f206ccddbb3ccfdcd3c25bf4afaa7b8f0cf45", size = 671388, upload-time = "2025-12-04T15:07:15.789Z" }, { url = "https://files.pythonhosted.org/packages/b8/14/bab308fc2c1b5228c3224ec2bf928ce2e4d21d8046c161e44a2012b5203e/greenlet-3.3.0-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5773edda4dc00e173820722711d043799d3adb4f01731f40619e07ea2750b955", size = 660166, upload-time = "2025-12-04T14:26:05.099Z" }, { url = "https://files.pythonhosted.org/packages/4b/d2/91465d39164eaa0085177f61983d80ffe746c5a1860f009811d498e7259c/greenlet-3.3.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ac0549373982b36d5fd5d30beb8a7a33ee541ff98d2b502714a09f1169f31b55", size = 1615193, upload-time = "2025-12-04T15:04:27.041Z" }, { url = 
"https://files.pythonhosted.org/packages/42/1b/83d110a37044b92423084d52d5d5a3b3a73cafb51b547e6d7366ff62eff1/greenlet-3.3.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:d198d2d977460358c3b3a4dc844f875d1adb33817f0613f663a656f463764ccc", size = 1683653, upload-time = "2025-12-04T14:27:32.366Z" }, @@ -2402,7 +2397,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a0/66/bd6317bc5932accf351fc19f177ffba53712a202f9df10587da8df257c7e/greenlet-3.3.0-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:d6ed6f85fae6cdfdb9ce04c9bf7a08d666cfcfb914e7d006f44f840b46741931", size = 282638, upload-time = "2025-12-04T14:25:20.941Z" }, { url = "https://files.pythonhosted.org/packages/30/cf/cc81cb030b40e738d6e69502ccbd0dd1bced0588e958f9e757945de24404/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d9125050fcf24554e69c4cacb086b87b3b55dc395a8b3ebe6487b045b2614388", size = 651145, upload-time = "2025-12-04T14:50:11.039Z" }, { url = "https://files.pythonhosted.org/packages/9c/ea/1020037b5ecfe95ca7df8d8549959baceb8186031da83d5ecceff8b08cd2/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:87e63ccfa13c0a0f6234ed0add552af24cc67dd886731f2261e46e241608bee3", size = 654236, upload-time = "2025-12-04T14:57:47.007Z" }, - { url = "https://files.pythonhosted.org/packages/69/cc/1e4bae2e45ca2fa55299f4e85854606a78ecc37fead20d69322f96000504/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:2662433acbca297c9153a4023fe2161c8dcfdcc91f10433171cf7e7d94ba2221", size = 662506, upload-time = "2025-12-04T15:07:16.906Z" }, { url = "https://files.pythonhosted.org/packages/57/b9/f8025d71a6085c441a7eaff0fd928bbb275a6633773667023d19179fe815/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3c6e9b9c1527a78520357de498b0e709fb9e2f49c3a513afd5a249007261911b", size = 653783, upload-time = "2025-12-04T14:26:06.225Z" }, { 
url = "https://files.pythonhosted.org/packages/f6/c7/876a8c7a7485d5d6b5c6821201d542ef28be645aa024cfe1145b35c120c1/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:286d093f95ec98fdd92fcb955003b8a3d054b4e2cab3e2707a5039e7b50520fd", size = 1614857, upload-time = "2025-12-04T15:04:28.484Z" }, { url = "https://files.pythonhosted.org/packages/4f/dc/041be1dff9f23dac5f48a43323cd0789cb798342011c19a248d9c9335536/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:6c10513330af5b8ae16f023e8ddbfb486ab355d04467c4679c5cfe4659975dd9", size = 1676034, upload-time = "2025-12-04T14:27:33.531Z" }, From a4ba140a768bb361dde205b6f4459a81f22789bf Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Tue, 20 Jan 2026 16:45:35 -0800 Subject: [PATCH 2/4] Fix flaky test --- .../tests/integration_tests/conftest.py | 3 ++ .../integration_tests/test_01_single_agent.py | 12 +---- .../integration_tests/test_02_multi_agent.py | 12 +---- .../test_03_single_agent_streaming.py | 45 ++++++++++--------- ..._04_single_agent_orchestration_chaining.py | 18 ++------ ...5_multi_agent_orchestration_concurrency.py | 18 ++------ ..._multi_agent_orchestration_conditionals.py | 18 ++------ ...test_07_single_agent_orchestration_hitl.py | 18 ++------ .../tests/integration_tests/testutils.py | 42 +++++++++++++++++ 9 files changed, 84 insertions(+), 102 deletions(-) diff --git a/python/packages/durabletask/tests/integration_tests/conftest.py b/python/packages/durabletask/tests/integration_tests/conftest.py index 27c4468888..387309282f 100644 --- a/python/packages/durabletask/tests/integration_tests/conftest.py +++ b/python/packages/durabletask/tests/integration_tests/conftest.py @@ -17,6 +17,9 @@ from dotenv import load_dotenv from durabletask.azuremanaged.client import DurableTaskSchedulerClient +# Add the integration_tests directory to the path so testutils can be imported +sys.path.insert(0, str(Path(__file__).parent)) + # Load environment variables from .env file load_dotenv() 
diff --git a/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py b/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py index c1147746c7..6b1c7310f3 100644 --- a/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py +++ b/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py @@ -13,9 +13,7 @@ from typing import Any import pytest -from durabletask.azuremanaged.client import DurableTaskSchedulerClient - -from agent_framework_durabletask import DurableAIAgentClient +from testutils import create_agent_client # Module-level markers - applied to all tests in this module pytestmark = [ @@ -36,13 +34,7 @@ def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: self.taskhub: str = str(worker_process["taskhub"]) # Create agent client - dts_client = DurableTaskSchedulerClient( - host_address=self.endpoint, - secure_channel=False, - taskhub=self.taskhub, - token_credential=None, - ) - self.agent_client = DurableAIAgentClient(dts_client) + _, self.agent_client = create_agent_client(self.endpoint, self.taskhub) def test_agent_registration(self) -> None: """Test that the Joker agent is registered and accessible.""" diff --git a/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py b/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py index ab55e5ed4c..0054a96d2a 100644 --- a/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py +++ b/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py @@ -14,9 +14,7 @@ import pytest from agent_framework import FunctionCallContent -from durabletask.azuremanaged.client import DurableTaskSchedulerClient - -from agent_framework_durabletask import DurableAIAgentClient +from testutils import create_agent_client # Agent names from the 02_multi_agent sample WEATHER_AGENT_NAME: str = "WeatherAgent" @@ -41,13 +39,7 @@ def setup(self, worker_process: dict[str, 
Any], dts_endpoint: str) -> None: self.taskhub: str = str(worker_process["taskhub"]) # Create agent client - dts_client = DurableTaskSchedulerClient( - host_address=self.endpoint, - secure_channel=False, - taskhub=self.taskhub, - token_credential=None, - ) - self.agent_client = DurableAIAgentClient(dts_client) + _, self.agent_client = create_agent_client(self.endpoint, self.taskhub) def test_multiple_agents_registered(self) -> None: """Test that both agents are registered and accessible.""" diff --git a/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py b/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py index 8d93e1e581..8cd40eb3ed 100644 --- a/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py +++ b/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py @@ -26,11 +26,7 @@ import pytest import redis.asyncio as aioredis -from durabletask.azuremanaged.client import DurableTaskSchedulerClient - -from agent_framework_durabletask import DurableAIAgentClient - -from .testutils import OrchestrationHelper +from testutils import OrchestrationHelper, create_agent_client # Add sample directory to path to import RedisStreamResponseHandler SAMPLE_DIR = Path(__file__).parents[4] / "samples" / "getting_started" / "durabletask" / "03_single_agent_streaming" @@ -58,13 +54,7 @@ def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: self.taskhub: str = str(worker_process["taskhub"]) # Create agent client - dts_client = DurableTaskSchedulerClient( - host_address=self.endpoint, - secure_channel=False, - taskhub=self.taskhub, - token_credential=None, - ) - self.agent_client = DurableAIAgentClient(dts_client) + dts_client, self.agent_client = create_agent_client(self.endpoint, self.taskhub) self.helper = OrchestrationHelper(dts_client) # Redis configuration @@ -152,27 +142,38 @@ def test_agent_run_and_stream(self) -> None: # Poll 
Redis stream with retries to handle race conditions # The agent may take a few seconds to process and start writing to Redis - max_retries = 20 # Try for up to 40 seconds (20 * 2 seconds) + # We use cursor-based resumption to continue reading from where we left off + max_retries = 20 retry_count = 0 - text = "" + accumulated_text = "" is_complete = False + cursor: str | None = None - while retry_count < max_retries: - text, is_complete, _ = asyncio.run(self._stream_from_redis(thread_id, timeout=5.0)) + while retry_count < max_retries and not is_complete: + text, is_complete, last_cursor = asyncio.run( + self._stream_from_redis(thread_id, cursor=cursor, timeout=10.0) + ) + accumulated_text += text + cursor = last_cursor # Resume from last position on next read - if len(text) > 0 or is_complete: - # We got content or completion marker + if is_complete: + # Stream completed successfully break - # Wait before retrying - time.sleep(3) + if len(accumulated_text) > 0: + # Got content but not completion marker yet - keep reading without delay + # The agent may still be streaming or about to write completion marker + continue + + # No content yet - wait before retrying + time.sleep(2) retry_count += 1 # Verify we got content - assert len(text) > 0, ( + assert len(accumulated_text) > 0, ( f"Expected text content but got empty string for thread_id: {thread_id} after {retry_count} retries" ) - assert "seattle" in text.lower(), f"Expected 'seattle' in response but got: {text}" + assert "seattle" in accumulated_text.lower(), f"Expected 'seattle' in response but got: {accumulated_text}" assert is_complete, "Expected stream to be complete" def test_stream_with_cursor_resumption(self) -> None: diff --git a/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py b/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py index 6bfaa69c68..b87e7e0b18 100644 --- 
a/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py +++ b/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py @@ -14,12 +14,8 @@ from typing import Any import pytest -from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.client import OrchestrationStatus - -from agent_framework_durabletask import DurableAIAgentClient - -from .testutils import OrchestrationHelper +from testutils import OrchestrationHelper, create_agent_client # Agent name from the 04_single_agent_orchestration_chaining sample WRITER_AGENT_NAME: str = "WriterAgent" @@ -45,16 +41,8 @@ def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: self.endpoint: str = dts_endpoint self.taskhub: str = str(worker_process["taskhub"]) - # Create DTS client for orchestrations - self.dts_client = DurableTaskSchedulerClient( - host_address=self.endpoint, - secure_channel=False, - taskhub=self.taskhub, - token_credential=None, - ) - - # Create agent client - self.agent_client = DurableAIAgentClient(self.dts_client) + # Create agent client and DTS client + self.dts_client, self.agent_client = create_agent_client(self.endpoint, self.taskhub) # Create orchestration helper self.orch_helper = OrchestrationHelper(self.dts_client) diff --git a/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py b/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py index eaa354f00d..0a9223c12f 100644 --- a/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py +++ b/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py @@ -14,12 +14,8 @@ from typing import Any import pytest -from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.client import OrchestrationStatus - -from 
agent_framework_durabletask import DurableAIAgentClient - -from .testutils import OrchestrationHelper +from testutils import OrchestrationHelper, create_agent_client # Agent names from the 05_multi_agent_orchestration_concurrency sample PHYSICIST_AGENT_NAME: str = "PhysicistAgent" @@ -45,16 +41,8 @@ def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: self.endpoint = dts_endpoint self.taskhub = worker_process["taskhub"] - # Create DTS client for orchestrations - self.dts_client = DurableTaskSchedulerClient( - host_address=self.endpoint, - secure_channel=False, - taskhub=self.taskhub, - token_credential=None, - ) - - # Create agent client - self.agent_client = DurableAIAgentClient(self.dts_client) + # Create agent client and DTS client + self.dts_client, self.agent_client = create_agent_client(self.endpoint, self.taskhub) # Create orchestration helper self.orch_helper = OrchestrationHelper(self.dts_client) diff --git a/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py b/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py index b31baf1faa..9a7c6706da 100644 --- a/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py +++ b/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py @@ -14,12 +14,8 @@ from typing import Any import pytest -from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.client import OrchestrationStatus - -from agent_framework_durabletask import DurableAIAgentClient - -from .testutils import OrchestrationHelper +from testutils import OrchestrationHelper, create_agent_client # Agent names from the 06_multi_agent_orchestration_conditionals sample SPAM_AGENT_NAME: str = "SpamDetectionAgent" @@ -45,16 +41,8 @@ def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: self.endpoint: str = 
dts_endpoint self.taskhub: str = str(worker_process["taskhub"]) - # Create DTS client for orchestrations - self.dts_client = DurableTaskSchedulerClient( - host_address=self.endpoint, - secure_channel=False, - taskhub=self.taskhub, - token_credential=None, - ) - - # Create agent client - self.agent_client = DurableAIAgentClient(self.dts_client) + # Create agent client and DTS client + self.dts_client, self.agent_client = create_agent_client(self.endpoint, self.taskhub) # Create orchestration helper self.orch_helper = OrchestrationHelper(self.dts_client) diff --git a/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py b/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py index a12cee021b..096f997ae1 100644 --- a/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py +++ b/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py @@ -14,12 +14,8 @@ from typing import Any import pytest -from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.client import OrchestrationStatus - -from agent_framework_durabletask import DurableAIAgentClient - -from .testutils import OrchestrationHelper +from testutils import OrchestrationHelper, create_agent_client # Constants from the 07_single_agent_orchestration_hitl sample WRITER_AGENT_NAME: str = "WriterAgent" @@ -47,16 +43,8 @@ def setup(self, worker_process: dict[str, Any], dts_endpoint: str) -> None: logging.info(f"Using taskhub: {self.taskhub} at endpoint: {self.endpoint}") - # Create DTS client for orchestrations - self.dts_client = DurableTaskSchedulerClient( - host_address=self.endpoint, - secure_channel=False, - taskhub=self.taskhub, - token_credential=None, - ) - - # Create agent client - self.agent_client = DurableAIAgentClient(self.dts_client) + # Create agent client and DTS client + self.dts_client, self.agent_client = 
create_agent_client(self.endpoint, self.taskhub) # Create orchestration helper self.orch_helper = OrchestrationHelper(self.dts_client) diff --git a/python/packages/durabletask/tests/integration_tests/testutils.py b/python/packages/durabletask/tests/integration_tests/testutils.py index 41a0e6f2c6..34ae07c49a 100644 --- a/python/packages/durabletask/tests/integration_tests/testutils.py +++ b/python/packages/durabletask/tests/integration_tests/testutils.py @@ -9,6 +9,48 @@ from durabletask.azuremanaged.client import DurableTaskSchedulerClient from durabletask.client import OrchestrationStatus +from agent_framework_durabletask import DurableAIAgentClient + + +def create_dts_client(endpoint: str, taskhub: str) -> DurableTaskSchedulerClient: + """ + Create a DurableTaskSchedulerClient with common configuration. + + Args: + endpoint: The DTS endpoint address + taskhub: The task hub name + + Returns: + A configured DurableTaskSchedulerClient instance + """ + return DurableTaskSchedulerClient( + host_address=endpoint, + secure_channel=False, + taskhub=taskhub, + token_credential=None, + ) + + +def create_agent_client( + endpoint: str, + taskhub: str, + max_poll_retries: int = 90, +) -> tuple[DurableTaskSchedulerClient, DurableAIAgentClient]: + """ + Create a DurableAIAgentClient with the underlying DTS client. 
+ + Args: + endpoint: The DTS endpoint address + taskhub: The task hub name + max_poll_retries: Max poll retries for the agent client + + Returns: + A tuple of (DurableTaskSchedulerClient, DurableAIAgentClient) + """ + dts_client = create_dts_client(endpoint, taskhub) + agent_client = DurableAIAgentClient(dts_client, max_poll_retries=max_poll_retries) + return dts_client, agent_client + class OrchestrationHelper: """Helper class for orchestration-related test operations.""" From f07f03a28f94a92d0a9c859436488eb0162b2ff2 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Tue, 20 Jan 2026 16:56:01 -0800 Subject: [PATCH 3/4] Fix env viz --- python/packages/durabletask/tests/integration_tests/__init__.py | 2 -- python/packages/durabletask/tests/integration_tests/conftest.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) delete mode 100644 python/packages/durabletask/tests/integration_tests/__init__.py diff --git a/python/packages/durabletask/tests/integration_tests/__init__.py b/python/packages/durabletask/tests/integration_tests/__init__.py deleted file mode 100644 index c2e7b70bf2..0000000000 --- a/python/packages/durabletask/tests/integration_tests/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. 
-"""Integration tests for agent-framework-durabletask package.""" diff --git a/python/packages/durabletask/tests/integration_tests/conftest.py b/python/packages/durabletask/tests/integration_tests/conftest.py index 387309282f..2cd045f291 100644 --- a/python/packages/durabletask/tests/integration_tests/conftest.py +++ b/python/packages/durabletask/tests/integration_tests/conftest.py @@ -21,7 +21,7 @@ sys.path.insert(0, str(Path(__file__).parent)) # Load environment variables from .env file -load_dotenv() +load_dotenv(Path(__file__).parent / ".env") # Configure logging to reduce noise during tests logging.basicConfig(level=logging.WARNING) From 04e395f753da1500ecac6c7feed390d39c80f4c8 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Wed, 21 Jan 2026 14:15:15 -0800 Subject: [PATCH 4/4] Fix tests and address feedback --- .../durabletask/tests/integration_tests/README.md | 3 ++- .../integration_tests/{testutils.py => dt_testutils.py} | 8 +++----- ...test_01_single_agent.py => test_01_dt_single_agent.py} | 2 +- .../{test_02_multi_agent.py => test_02_dt_multi_agent.py} | 2 +- ..._streaming.py => test_03_dt_single_agent_streaming.py} | 2 +- ... => test_04_dt_single_agent_orchestration_chaining.py} | 2 +- ...> test_05_dt_multi_agent_orchestration_concurrency.py} | 2 +- ... 
test_06_dt_multi_agent_orchestration_conditionals.py} | 2 +- ...l.py => test_07_dt_single_agent_orchestration_hitl.py} | 2 +- .../tests/{test_entities.py => test_durable_entities.py} | 0 10 files changed, 12 insertions(+), 13 deletions(-) rename python/packages/durabletask/tests/integration_tests/{testutils.py => dt_testutils.py} (96%) rename python/packages/durabletask/tests/integration_tests/{test_01_single_agent.py => test_01_dt_single_agent.py} (98%) rename python/packages/durabletask/tests/integration_tests/{test_02_multi_agent.py => test_02_dt_multi_agent.py} (98%) rename python/packages/durabletask/tests/integration_tests/{test_03_single_agent_streaming.py => test_03_dt_single_agent_streaming.py} (99%) rename python/packages/durabletask/tests/integration_tests/{test_04_single_agent_orchestration_chaining.py => test_04_dt_single_agent_orchestration_chaining.py} (98%) rename python/packages/durabletask/tests/integration_tests/{test_05_multi_agent_orchestration_concurrency.py => test_05_dt_multi_agent_orchestration_concurrency.py} (97%) rename python/packages/durabletask/tests/integration_tests/{test_06_multi_agent_orchestration_conditionals.py => test_06_dt_multi_agent_orchestration_conditionals.py} (98%) rename python/packages/durabletask/tests/integration_tests/{test_07_single_agent_orchestration_hitl.py => test_07_dt_single_agent_orchestration_hitl.py} (98%) rename python/packages/durabletask/tests/{test_entities.py => test_durable_entities.py} (100%) diff --git a/python/packages/durabletask/tests/integration_tests/README.md b/python/packages/durabletask/tests/integration_tests/README.md index a35a8d58d5..59da266460 100644 --- a/python/packages/durabletask/tests/integration_tests/README.md +++ b/python/packages/durabletask/tests/integration_tests/README.md @@ -71,7 +71,8 @@ pytestmark = [ pytest.mark.requires_redis, ] ``` -roubleshooting + +## Troubleshooting **Tests are skipped:** Ensure `RUN_INTEGRATION_TESTS=true` is set in your `.env` file. 
diff --git a/python/packages/durabletask/tests/integration_tests/testutils.py b/python/packages/durabletask/tests/integration_tests/dt_testutils.py similarity index 96% rename from python/packages/durabletask/tests/integration_tests/testutils.py rename to python/packages/durabletask/tests/integration_tests/dt_testutils.py index 34ae07c49a..34696b42ff 100644 --- a/python/packages/durabletask/tests/integration_tests/testutils.py +++ b/python/packages/durabletask/tests/integration_tests/dt_testutils.py @@ -68,7 +68,6 @@ def wait_for_orchestration( self, instance_id: str, timeout: float = 60.0, - poll_interval: float = 0.5, ) -> Any: """ Wait for an orchestration to complete. @@ -76,7 +75,6 @@ def wait_for_orchestration( Args: instance_id: The orchestration instance ID timeout: Maximum time to wait in seconds - poll_interval: Time between polling attempts in seconds Returns: The final OrchestrationMetadata @@ -106,7 +104,6 @@ def wait_for_orchestration_with_output( self, instance_id: str, timeout: float = 60.0, - poll_interval: float = 0.5, ) -> tuple[Any, Any]: """ Wait for an orchestration to complete and return its output. 
@@ -114,7 +111,6 @@ def wait_for_orchestration_with_output( Args: instance_id: The orchestration instance ID timeout: Maximum time to wait in seconds - poll_interval: Time between polling attempts in seconds Returns: A tuple of (OrchestrationMetadata, output) @@ -123,7 +119,7 @@ def wait_for_orchestration_with_output( TimeoutError: If the orchestration doesn't complete within timeout RuntimeError: If the orchestration fails """ - metadata = self.wait_for_orchestration(instance_id, timeout, poll_interval) + metadata = self.wait_for_orchestration(instance_id, timeout) # The output should be available in the metadata return metadata, metadata.serialized_output @@ -200,6 +196,8 @@ def wait_for_notification(self, instance_id: str, timeout_seconds: int = 30) -> if metadata.runtime_status.name == "COMPLETED" or metadata.runtime_status.name == "FAILED": return False except Exception: + # Silently ignore transient errors during polling (e.g., network issues, service unavailable). + # The loop will retry until timeout, allowing the service to recover. 
pass time.sleep(1) diff --git a/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py b/python/packages/durabletask/tests/integration_tests/test_01_dt_single_agent.py similarity index 98% rename from python/packages/durabletask/tests/integration_tests/test_01_single_agent.py rename to python/packages/durabletask/tests/integration_tests/test_01_dt_single_agent.py index 6b1c7310f3..38ca54050c 100644 --- a/python/packages/durabletask/tests/integration_tests/test_01_single_agent.py +++ b/python/packages/durabletask/tests/integration_tests/test_01_dt_single_agent.py @@ -13,7 +13,7 @@ from typing import Any import pytest -from testutils import create_agent_client +from dt_testutils import create_agent_client # Module-level markers - applied to all tests in this module pytestmark = [ diff --git a/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py b/python/packages/durabletask/tests/integration_tests/test_02_dt_multi_agent.py similarity index 98% rename from python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py rename to python/packages/durabletask/tests/integration_tests/test_02_dt_multi_agent.py index 0054a96d2a..140de6609f 100644 --- a/python/packages/durabletask/tests/integration_tests/test_02_multi_agent.py +++ b/python/packages/durabletask/tests/integration_tests/test_02_dt_multi_agent.py @@ -14,7 +14,7 @@ import pytest from agent_framework import FunctionCallContent -from testutils import create_agent_client +from dt_testutils import create_agent_client # Agent names from the 02_multi_agent sample WEATHER_AGENT_NAME: str = "WeatherAgent" diff --git a/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py b/python/packages/durabletask/tests/integration_tests/test_03_dt_single_agent_streaming.py similarity index 99% rename from python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py rename to 
python/packages/durabletask/tests/integration_tests/test_03_dt_single_agent_streaming.py index 8cd40eb3ed..d127a87356 100644 --- a/python/packages/durabletask/tests/integration_tests/test_03_single_agent_streaming.py +++ b/python/packages/durabletask/tests/integration_tests/test_03_dt_single_agent_streaming.py @@ -26,7 +26,7 @@ import pytest import redis.asyncio as aioredis -from testutils import OrchestrationHelper, create_agent_client +from dt_testutils import OrchestrationHelper, create_agent_client # Add sample directory to path to import RedisStreamResponseHandler SAMPLE_DIR = Path(__file__).parents[4] / "samples" / "getting_started" / "durabletask" / "03_single_agent_streaming" diff --git a/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py b/python/packages/durabletask/tests/integration_tests/test_04_dt_single_agent_orchestration_chaining.py similarity index 98% rename from python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py rename to python/packages/durabletask/tests/integration_tests/test_04_dt_single_agent_orchestration_chaining.py index b87e7e0b18..85cdde270e 100644 --- a/python/packages/durabletask/tests/integration_tests/test_04_single_agent_orchestration_chaining.py +++ b/python/packages/durabletask/tests/integration_tests/test_04_dt_single_agent_orchestration_chaining.py @@ -14,8 +14,8 @@ from typing import Any import pytest +from dt_testutils import OrchestrationHelper, create_agent_client from durabletask.client import OrchestrationStatus -from testutils import OrchestrationHelper, create_agent_client # Agent name from the 04_single_agent_orchestration_chaining sample WRITER_AGENT_NAME: str = "WriterAgent" diff --git a/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py b/python/packages/durabletask/tests/integration_tests/test_05_dt_multi_agent_orchestration_concurrency.py similarity index 97% rename from 
python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py rename to python/packages/durabletask/tests/integration_tests/test_05_dt_multi_agent_orchestration_concurrency.py index 0a9223c12f..367100ef0c 100644 --- a/python/packages/durabletask/tests/integration_tests/test_05_multi_agent_orchestration_concurrency.py +++ b/python/packages/durabletask/tests/integration_tests/test_05_dt_multi_agent_orchestration_concurrency.py @@ -14,8 +14,8 @@ from typing import Any import pytest +from dt_testutils import OrchestrationHelper, create_agent_client from durabletask.client import OrchestrationStatus -from testutils import OrchestrationHelper, create_agent_client # Agent names from the 05_multi_agent_orchestration_concurrency sample PHYSICIST_AGENT_NAME: str = "PhysicistAgent" diff --git a/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py b/python/packages/durabletask/tests/integration_tests/test_06_dt_multi_agent_orchestration_conditionals.py similarity index 98% rename from python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py rename to python/packages/durabletask/tests/integration_tests/test_06_dt_multi_agent_orchestration_conditionals.py index 9a7c6706da..9642cd3672 100644 --- a/python/packages/durabletask/tests/integration_tests/test_06_multi_agent_orchestration_conditionals.py +++ b/python/packages/durabletask/tests/integration_tests/test_06_dt_multi_agent_orchestration_conditionals.py @@ -14,8 +14,8 @@ from typing import Any import pytest +from dt_testutils import OrchestrationHelper, create_agent_client from durabletask.client import OrchestrationStatus -from testutils import OrchestrationHelper, create_agent_client # Agent names from the 06_multi_agent_orchestration_conditionals sample SPAM_AGENT_NAME: str = "SpamDetectionAgent" diff --git 
a/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py b/python/packages/durabletask/tests/integration_tests/test_07_dt_single_agent_orchestration_hitl.py similarity index 98% rename from python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py rename to python/packages/durabletask/tests/integration_tests/test_07_dt_single_agent_orchestration_hitl.py index 096f997ae1..2a668e9ede 100644 --- a/python/packages/durabletask/tests/integration_tests/test_07_single_agent_orchestration_hitl.py +++ b/python/packages/durabletask/tests/integration_tests/test_07_dt_single_agent_orchestration_hitl.py @@ -14,8 +14,8 @@ from typing import Any import pytest +from dt_testutils import OrchestrationHelper, create_agent_client from durabletask.client import OrchestrationStatus -from testutils import OrchestrationHelper, create_agent_client # Constants from the 07_single_agent_orchestration_hitl sample WRITER_AGENT_NAME: str = "WriterAgent" diff --git a/python/packages/durabletask/tests/test_entities.py b/python/packages/durabletask/tests/test_durable_entities.py similarity index 100% rename from python/packages/durabletask/tests/test_entities.py rename to python/packages/durabletask/tests/test_durable_entities.py