From a31a22ba772a5c24c3ee96b5d351791e2306ba5a Mon Sep 17 00:00:00 2001 From: Andy Staples Date: Wed, 8 Apr 2026 13:21:18 -0600 Subject: [PATCH] Add samples for new features --- .../python/async-http-api/README.md | 28 +++ .../python/async-http-api/client.py | 59 +++-- .../python/large-payload/README.md | 219 +++++++++++++++++ .../python/large-payload/client.py | 77 ++++++ .../python/large-payload/docker-compose.yml | 21 ++ .../python/large-payload/requirements.txt | 2 + .../python/large-payload/worker.py | 100 ++++++++ .../python/orchestration-management/README.md | 222 ++++++++++++++++++ .../python/orchestration-management/client.py | 114 +++++++++ .../orchestration-management/requirements.txt | 2 + .../python/orchestration-management/worker.py | 70 ++++++ .../python/testing/README.md | 113 +++++++++ .../python/testing/requirements.txt | 2 + .../python/testing/test_workflows.py | 148 ++++++++++++ .../python/testing/workflows.py | 78 ++++++ .../python/work-item-filtering/README.md | 221 +++++++++++++++++ .../python/work-item-filtering/client.py | 66 ++++++ .../work-item-filtering/requirements.txt | 2 + .../python/work-item-filtering/worker_a.py | 69 ++++++ .../python/work-item-filtering/worker_b.py | 71 ++++++ 20 files changed, 1660 insertions(+), 24 deletions(-) create mode 100644 samples/durable-task-sdks/python/large-payload/README.md create mode 100644 samples/durable-task-sdks/python/large-payload/client.py create mode 100644 samples/durable-task-sdks/python/large-payload/docker-compose.yml create mode 100644 samples/durable-task-sdks/python/large-payload/requirements.txt create mode 100644 samples/durable-task-sdks/python/large-payload/worker.py create mode 100644 samples/durable-task-sdks/python/orchestration-management/README.md create mode 100644 samples/durable-task-sdks/python/orchestration-management/client.py create mode 100644 samples/durable-task-sdks/python/orchestration-management/requirements.txt create mode 100644 samples/durable-task-sdks/python/orchestration-management/worker.py create mode 100644 samples/durable-task-sdks/python/testing/README.md create mode 100644 samples/durable-task-sdks/python/testing/requirements.txt create mode 100644 samples/durable-task-sdks/python/testing/test_workflows.py create mode 100644 samples/durable-task-sdks/python/testing/workflows.py create mode 100644 samples/durable-task-sdks/python/work-item-filtering/README.md create mode 100644 samples/durable-task-sdks/python/work-item-filtering/client.py create mode 100644 samples/durable-task-sdks/python/work-item-filtering/requirements.txt create mode 100644 samples/durable-task-sdks/python/work-item-filtering/worker_a.py create mode 100644 samples/durable-task-sdks/python/work-item-filtering/worker_b.py diff --git a/samples/durable-task-sdks/python/async-http-api/README.md b/samples/durable-task-sdks/python/async-http-api/README.md index 0c3509ec..d1a14c2a 100644 --- a/samples/durable-task-sdks/python/async-http-api/README.md +++ b/samples/durable-task-sdks/python/async-http-api/README.md @@ -243,3 +243,31 @@ To access the Durable Task Scheduler Dashboard and review your orchestration: 5. Review the execution details The dashboard helps you understand how the async HTTP API pattern works behind the scenes, showing how the durable orchestration provides the backend processing for the asynchronous API endpoints. + +## Async Client + +This sample uses `AsyncDurableTaskSchedulerClient` — the native async/await client — instead of the synchronous `DurableTaskSchedulerClient`. This is the recommended approach for FastAPI and other async web frameworks because it avoids blocking the event loop when communicating with the scheduler. + +Key differences from the synchronous client: + +```python +from azure.identity.aio import DefaultAzureCredential # Async credential +from durabletask.azuremanaged.client import AsyncDurableTaskSchedulerClient + +# Create the async client +client = AsyncDurableTaskSchedulerClient( + host_address=endpoint, + secure_channel=True, + taskhub=taskhub, + token_credential=DefaultAzureCredential(), +) + +# All client methods are awaitable +instance_id = await client.schedule_new_orchestration("my_orchestrator", input=data) +state = await client.get_orchestration_state(instance_id) + +# Close the client when done +await client.close() +``` + +The async client supports the same operations as the sync client: scheduling, querying, raising events, terminating, suspending, resuming, restarting, and purging orchestrations. diff --git a/samples/durable-task-sdks/python/async-http-api/client.py b/samples/durable-task-sdks/python/async-http-api/client.py index db84ea99..4b9216b9 100644 --- a/samples/durable-task-sdks/python/async-http-api/client.py +++ b/samples/durable-task-sdks/python/async-http-api/client.py @@ -2,19 +2,17 @@ import logging import uuid import os +from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel -from azure.identity import DefaultAzureCredential +from azure.identity.aio import DefaultAzureCredential from durabletask import client as durable_client -from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.client import AsyncDurableTaskSchedulerClient # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# Set up FastAPI app -app = FastAPI(title="Durable Task Async HTTP API Sample") - # Models for request and response class OperationRequest(BaseModel): processing_time: int = 5 # Default processing time in seconds @@ -23,9 +21,6 @@ class OperationResponse(BaseModel): operation_id: str status_url: str -# Dictionary to store client references -client_cache = {} - # Get environment variables for taskhub and endpoint with defaults TASKHUB = os.getenv("TASKHUB", "default") ENDPOINT = os.getenv("ENDPOINT", "http://localhost:8080") @@ -33,18 +28,34 @@ class OperationResponse(BaseModel): print(f"Using taskhub: {TASKHUB}") print(f"Using endpoint: {ENDPOINT}") -async def get_client(): - """Get or create a Durable Task client.""" - if "client" not in client_cache: - # Set credential to None for emulator, or DefaultAzureCredential for Azure - credential = None if ENDPOINT == "http://localhost:8080" else DefaultAzureCredential() - client_cache["client"] = DurableTaskSchedulerClient( - host_address=ENDPOINT, - secure_channel=ENDPOINT != "http://localhost:8080", - taskhub=TASKHUB, - token_credential=credential - ) - return client_cache["client"] +# Shared async client instance (managed by the app lifespan) +_async_client: AsyncDurableTaskSchedulerClient | None = None + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage the async client lifecycle with the FastAPI app.""" + global _async_client + credential = None if ENDPOINT == "http://localhost:8080" else DefaultAzureCredential() + _async_client = AsyncDurableTaskSchedulerClient( + host_address=ENDPOINT, + secure_channel=ENDPOINT != "http://localhost:8080", + taskhub=TASKHUB, + token_credential=credential, + ) + yield + await _async_client.close() + _async_client = None + + +# Set up FastAPI app with lifespan +app = FastAPI(title="Durable Task Async HTTP API Sample", lifespan=lifespan) + + +async def get_client() -> AsyncDurableTaskSchedulerClient: + """Get the async Durable Task client.""" + assert _async_client is not None, "Client not initialized — app not started" + return _async_client @app.post("/api/start-operation", response_model=OperationResponse) async def start_operation(request: OperationRequest): @@ -65,8 +76,8 @@ async def start_operation(request: OperationRequest): "processing_time": request.processing_time } - # Schedule the orchestration - instance_id = client.schedule_new_orchestration( + # Schedule the orchestration using the async client + instance_id = await client.schedule_new_orchestration( "async_http_api_orchestrator", input=input_data, instance_id=operation_id # Use operation_id as instance_id for simplicity @@ -91,8 +102,8 @@ async def get_operation_status(operation_id: str): # Get client client = await get_client() - # Get the orchestration status - status = client.get_orchestration_state(operation_id) + # Get the orchestration status using the async client + status = await client.get_orchestration_state(operation_id) if not status: raise HTTPException(status_code=404, detail=f"Operation {operation_id} not found") diff --git a/samples/durable-task-sdks/python/large-payload/README.md b/samples/durable-task-sdks/python/large-payload/README.md new file mode 100644 index 00000000..20d0d20e --- /dev/null +++ b/samples/durable-task-sdks/python/large-payload/README.md @@ -0,0 +1,219 @@ +# Large Payload Externalization + +Python | Durable Task SDK + +## Description of the Sample + +This sample demonstrates large payload externalization with the Azure Durable Task Scheduler using the Python SDK. When orchestration inputs, activity outputs, or event data exceed a configurable size threshold, the SDK automatically offloads them to Azure Blob Storage and replaces them with compact reference tokens in gRPC messages. + +In this sample: +1. A `generate_data` activity produces payloads of configurable size +2. A `process_data` activity receives and summarizes the data +3. The orchestrator runs twice — once with a small payload (stays inline) and once with a large payload (externalized to blob storage) +4. Externalization is completely transparent to the orchestration logic + +This pattern is useful for: +- Workflows that process large documents, images, or datasets +- Avoiding gRPC message size limits when passing data between activities +- Keeping orchestration history compact while allowing large intermediate results + +## Prerequisites + +1. [Python 3.10+](https://www.python.org/downloads/) +2. [Docker](https://www.docker.com/products/docker-desktop/) (for running the emulator and Azurite) +3. [Azure CLI](https://docs.microsoft.com/cli/azure/install-azure-cli) (if using a deployed Durable Task Scheduler) + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +This sample provides a `docker-compose.yml` that starts both the DTS emulator and Azurite (Azure Storage emulator) together: + +```bash +docker compose up -d +``` + +This starts: +- **DTS emulator** on ports 8080 (gRPC) and 8082 (dashboard) +- **Azurite** on ports 10000 (blob), 10001 (queue), and 10002 (table) + +Wait a few seconds for both containers to be ready. To stop the services later: + +```bash +docker compose down +``` + +Note: The example code automatically uses the default emulator settings (endpoint: `http://localhost:8080`, taskhub: `default`) and the Azurite connection string (`UseDevelopmentStorage=true`). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +2. Create a resource group in a region where the Durable Task Scheduler is available: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + ```bash + az group create --name my-resource-group --location + ``` + +3. Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +4. Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +5. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample + +Once you have set up either the emulator or deployed scheduler, follow these steps to run the sample: + +1. First, activate your Python virtual environment (if you're using one): + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate + ``` + +2. If you're using a deployed scheduler, set environment variables: + ```bash + export ENDPOINT=$(az durabletask scheduler show \ + --resource-group my-resource-group \ + --name my-scheduler \ + --query "properties.endpoint" \ + --output tsv) + + export TASKHUB="my-taskhub" + export STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=..." + ``` + +3. Install the required packages: + ```bash + pip install -r requirements.txt + ``` + +4. Start the worker in a terminal: + ```bash + python worker.py + ``` + You should see output indicating the worker has started and registered the orchestration and activities. + +5. In a new terminal (with the virtual environment activated if applicable), run the client: + > **Note:** Remember to set the environment variables again if you're using a deployed scheduler. + + ```bash + python client.py + ``` + +## Expected Output + +### Worker Output +``` +Using taskhub: default +Using endpoint: http://localhost:8080 +INFO:__main__:Starting Large Payload pattern worker... +INFO:__main__:Generating data with 10 records +INFO:__main__:Processed 10 records (70 bytes) +INFO:__main__:Generating data with 10000 records +INFO:__main__:Processed 10000 records (70000 bytes) +``` + +### Client Output +``` +Using taskhub: default +Using endpoint: http://localhost:8080 + +--- Small payload (stays inline) --- +Result: "Processed 10 records (70 bytes)" + +--- Large payload (externalized to blob storage) --- +Result: "Processed 10000 records (70000 bytes)" + +Done! +``` + +Both orchestrations produce the same type of result. The difference is invisible to the application code — the SDK transparently externalizes the 70 KB payload to blob storage and retrieves it when needed. + +## Code Walkthrough + +### Payload Store Configuration + +The `BlobPayloadStore` is configured with a `BlobPayloadStoreOptions` object: + +```python +store = BlobPayloadStore(BlobPayloadStoreOptions( + connection_string=storage_conn_str, + threshold_bytes=1_024, # Externalize payloads larger than 1 KB +)) +``` + +Key options: +- **`connection_string`**: Azure Storage connection string (or `UseDevelopmentStorage=true` for Azurite) +- **`threshold_bytes`**: Payloads larger than this are externalized (default: 900 KB) +- **`max_stored_payload_bytes`**: Maximum payload size that can be stored (default: 10 MB) +- **`enable_compression`**: Whether to compress payloads with GZip before storing (default: `True`) +- **`container_name`**: Blob container name (default: `durabletask-payloads`) + +### Passing the Store to Worker and Client + +Both the worker and client must be configured with the same payload store: + +```python +# Worker +with DurableTaskSchedulerWorker(..., payload_store=store) as worker: + ... + +# Client +client = DurableTaskSchedulerClient(..., payload_store=store) +``` + +## Viewing in the Dashboard + +- **Emulator:** Navigate to http://localhost:8082 → select the "default" task hub +- **Azure:** Navigate to your Scheduler resource in the Azure Portal → Task Hub → Dashboard URL + +## Related Samples + +- [Function Chaining](../function-chaining/) - Basic sequential workflow pattern +- [Fan-Out/Fan-In](../fan-out-fan-in/) - Parallel processing pattern +- [Large Payload (.NET)](../../dotnet/LargePayload/) - Same pattern in .NET + +## Learn More + +- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/develop-with-durable-task-scheduler) diff --git a/samples/durable-task-sdks/python/large-payload/client.py b/samples/durable-task-sdks/python/large-payload/client.py new file mode 100644 index 00000000..ba90d3bc --- /dev/null +++ b/samples/durable-task-sdks/python/large-payload/client.py @@ -0,0 +1,77 @@ +import asyncio +import logging +import os +from azure.identity import DefaultAzureCredential +from durabletask import client as durable_client +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.extensions.azure_blob_payloads import BlobPayloadStore, BlobPayloadStoreOptions + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + """Main entry point for the client application.""" + logger.info("Starting Large Payload pattern client...") + + # Get environment variables for taskhub and endpoint with defaults + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + # Azure Storage connection string (defaults to Azurite) + storage_conn_str = os.getenv( + "STORAGE_CONNECTION_STRING", + "UseDevelopmentStorage=true", + ) + + print(f"Using taskhub: {taskhub_name}") + print(f"Using endpoint: {endpoint}") + + # Configure the blob payload store — must match the worker configuration + store = BlobPayloadStore(BlobPayloadStoreOptions( + connection_string=storage_conn_str, + threshold_bytes=1_024, + )) + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + client = DurableTaskSchedulerClient( + host_address=endpoint, + secure_channel=endpoint != "http://localhost:8080", + taskhub=taskhub_name, + token_credential=credential, + payload_store=store, + ) + + # --- Small payload (stays inline) --- + print("\n--- Small payload (stays inline) ---") + instance_id = client.schedule_new_orchestration( + "large_payload_orchestrator", input=10 + ) + logger.info(f"Scheduled orchestration with ID: {instance_id}") + + state = client.wait_for_orchestration_completion(instance_id, timeout=60) + if state and state.runtime_status == durable_client.OrchestrationStatus.COMPLETED: + print(f"Result: {state.serialized_output}") + elif state: + print(f"Orchestration failed: {state.failure_details}") + + # --- Large payload (externalized to blob storage) --- + print("\n--- Large payload (externalized to blob storage) ---") + instance_id = client.schedule_new_orchestration( + "large_payload_orchestrator", input=10_000 + ) + logger.info(f"Scheduled orchestration with ID: {instance_id}") + + state = client.wait_for_orchestration_completion(instance_id, timeout=60) + if state and state.runtime_status == durable_client.OrchestrationStatus.COMPLETED: + print(f"Result: {state.serialized_output}") + elif state: + print(f"Orchestration failed: {state.failure_details}") + + print("\nDone!") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/samples/durable-task-sdks/python/large-payload/docker-compose.yml b/samples/durable-task-sdks/python/large-payload/docker-compose.yml new file mode 100644 index 00000000..cad4c496 --- /dev/null +++ b/samples/durable-task-sdks/python/large-payload/docker-compose.yml @@ -0,0 +1,21 @@ +services: + azurite: + image: mcr.microsoft.com/azure-storage/azurite:latest + command: azurite --blobHost 0.0.0.0 --queueHost 0.0.0.0 --tableHost 0.0.0.0 --skipApiVersionCheck + ports: + - "10000:10000" + - "10001:10001" + - "10002:10002" + volumes: + - azurite-data:/data + + dts-emulator: + image: mcr.microsoft.com/dts/dts-emulator:latest + ports: + - "8080:8080" + - "8082:8082" + depends_on: + - azurite + +volumes: + azurite-data: diff --git a/samples/durable-task-sdks/python/large-payload/requirements.txt b/samples/durable-task-sdks/python/large-payload/requirements.txt new file mode 100644 index 00000000..4f2ead35 --- /dev/null +++ b/samples/durable-task-sdks/python/large-payload/requirements.txt @@ -0,0 +1,2 @@ +durabletask-azuremanaged[azure-blob-payloads] +azure-identity diff --git a/samples/durable-task-sdks/python/large-payload/worker.py b/samples/durable-task-sdks/python/large-payload/worker.py new file mode 100644 index 00000000..0374e5f4 --- /dev/null +++ b/samples/durable-task-sdks/python/large-payload/worker.py @@ -0,0 +1,100 @@ +import asyncio +import logging +import os +from azure.identity import DefaultAzureCredential +from durabletask import task +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker +from durabletask.extensions.azure_blob_payloads import BlobPayloadStore, BlobPayloadStoreOptions + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Activity functions +def generate_data(ctx: task.ActivityContext, num_records: int) -> str: + """Activity that generates a payload of configurable size. + + For a small *num_records* value, the payload stays below the + externalization threshold and is sent inline. For a large value, + the SDK automatically offloads the payload to Azure Blob Storage. + """ + logger.info(f"Generating data with {num_records} records") + return "RECORD|" * num_records + + +def process_data(ctx: task.ActivityContext, data: str) -> str: + """Activity that summarizes the received data.""" + record_count = data.count("RECORD|") + summary = f"Processed {record_count} records ({len(data)} bytes)" + logger.info(summary) + return summary + + +# Orchestrator function +def large_payload_orchestrator(ctx: task.OrchestrationContext, num_records: int): + """Orchestrator that generates data and then processes it. + + Both the activity output (data) and the orchestration result are + transparently externalized to blob storage when they exceed the + configured threshold. + """ + data = yield ctx.call_activity(generate_data, input=num_records) + summary = yield ctx.call_activity(process_data, input=data) + return summary + + +async def main(): + """Main entry point for the worker process.""" + logger.info("Starting Large Payload pattern worker...") + + # Get environment variables for taskhub and endpoint with defaults + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + # Azure Storage connection string (defaults to Azurite) + storage_conn_str = os.getenv( + "STORAGE_CONNECTION_STRING", + "UseDevelopmentStorage=true", + ) + + print(f"Using taskhub: {taskhub_name}") + print(f"Using endpoint: {endpoint}") + + # Configure the blob payload store with a low threshold for demo purposes + store = BlobPayloadStore(BlobPayloadStoreOptions( + connection_string=storage_conn_str, + # Use a low threshold so that externalization is visible in the demo + threshold_bytes=1_024, + )) + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + with DurableTaskSchedulerWorker( + host_address=endpoint, + secure_channel=endpoint != "http://localhost:8080", + taskhub=taskhub_name, + token_credential=credential, + payload_store=store, + ) as worker: + + # Register activities and orchestrator + worker.add_activity(generate_data) + worker.add_activity(process_data) + worker.add_orchestrator(large_payload_orchestrator) + + # Start the worker + worker.start() + + try: + # Keep the worker running + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + logger.info("Worker shutdown initiated") + + logger.info("Worker stopped") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/samples/durable-task-sdks/python/orchestration-management/README.md b/samples/durable-task-sdks/python/orchestration-management/README.md new file mode 100644 index 00000000..db0a293e --- /dev/null +++ b/samples/durable-task-sdks/python/orchestration-management/README.md @@ -0,0 +1,222 @@ +# Orchestration Management + +Python | Durable Task SDK + +## Description of the Sample + +This sample demonstrates orchestration lifecycle management operations with the Azure Durable Task Scheduler using the Python SDK. It covers restarting completed orchestrations, querying orchestration instances by filter, and batch purging old orchestrations. + +In this sample: +1. Three orchestrations are scheduled and run to completion +2. A completed orchestration is restarted with the same instance ID +3. Another is restarted with a new instance ID +4. Orchestrations are queried by creation time and status +5. Completed orchestrations are batch-purged + +This pattern is useful for: +- Re-running failed or completed workflows with the same original input +- Cleaning up old orchestration history to manage storage +- Querying orchestration status for monitoring dashboards +- Implementing retry-from-scratch logic in management tools + +## Prerequisites + +1. [Python 3.10+](https://www.python.org/downloads/) +2. [Docker](https://www.docker.com/products/docker-desktop/) (for running the emulator) +3. [Azure CLI](https://docs.microsoft.com/cli/azure/install-azure-cli) (if using a deployed Durable Task Scheduler) + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +2. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` + +Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: `http://localhost:8080`, taskhub: `default`). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +2. Create a resource group in a region where the Durable Task Scheduler is available: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + ```bash + az group create --name my-resource-group --location + ``` + +3. Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +4. Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +5. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample + +Once you have set up either the emulator or deployed scheduler, follow these steps to run the sample: + +1. First, activate your Python virtual environment (if you're using one): + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate + ``` + +2. If you're using a deployed scheduler, set environment variables: + ```bash + export ENDPOINT=$(az durabletask scheduler show \ + --resource-group my-resource-group \ + --name my-scheduler \ + --query "properties.endpoint" \ + --output tsv) + + export TASKHUB="my-taskhub" + ``` + +3. Install the required packages: + ```bash + pip install -r requirements.txt + ``` + +4. Start the worker in a terminal: + ```bash + python worker.py + ``` + +5. In a new terminal (with the virtual environment activated if applicable), run the client: + > **Note:** Remember to set the environment variables again if you're using a deployed scheduler. + + ```bash + python client.py + ``` + +## Expected Output + +### Client Output +``` +=== Step 1: Schedule orchestrations === + Completed: -> {"batch_id": "batch-1", "items_processed": 10, "status": "success"} + Completed: -> {"batch_id": "batch-2", "items_processed": 20, "status": "success"} + Completed: -> {"batch_id": "batch-3", "items_processed": 30, "status": "success"} + +=== Step 2: Restart orchestration (same instance ID) === + Restarted -> new execution ID: + Restarted orchestration completed: {"batch_id": "batch-1", "items_processed": 10, "status": "success"} + +=== Step 3: Restart orchestration (new instance ID) === + Restarted with new ID: + New orchestration completed: {"batch_id": "batch-2", "items_processed": 20, "status": "success"} + +=== Step 4: Query orchestrations === + Found 5 completed orchestration(s) since 2025-01-01T00:00:00+00:00 + +=== Step 5: Batch purge completed orchestrations === + Purged 5 orchestration(s) + Remaining completed orchestrations: 0 + +Done! +``` + +## Code Walkthrough + +### Restarting Orchestrations + +Restart re-runs a completed orchestration with its original input: + +```python +# Restart with the same instance ID (replaces the old execution) +restarted_id = client.restart_orchestration(instance_id) + +# Restart with a new instance ID (keeps the old execution) +new_id = client.restart_orchestration(instance_id, restart_with_new_instance_id=True) +``` + +- **Same ID:** The restarted orchestration reuses the original instance ID. Useful for retrying a workflow in-place. +- **New ID:** A new instance ID is generated. Useful when you want to keep the history of the original execution. + +### Querying Orchestrations + +Query instances by creation time and status: + +```python +query = durable_client.OrchestrationQuery( + created_time_from=start_time, + runtime_status=[durable_client.OrchestrationStatus.COMPLETED], +) +states = client.get_all_orchestration_states(query) +``` + +### Batch Purging + +Remove completed orchestrations by filter criteria: + +```python +result = client.purge_orchestrations_by( + created_time_from=start_time, + runtime_status=[durable_client.OrchestrationStatus.COMPLETED], +) +print(f"Purged {result.deleted_instance_count} orchestration(s)") +``` + +## Viewing in the Dashboard + +- **Emulator:** Navigate to http://localhost:8082 → select the "default" task hub +- **Azure:** Navigate to your Scheduler resource in the Azure Portal → Task Hub → Dashboard URL + +## Related Samples + +- [Function Chaining](../function-chaining/) - Basic sequential workflow pattern +- [Async HTTP API](../async-http-api/) - RESTful API with orchestration backend + +## Learn More + +- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/develop-with-durable-task-scheduler) diff --git a/samples/durable-task-sdks/python/orchestration-management/client.py b/samples/durable-task-sdks/python/orchestration-management/client.py new file mode 100644 index 00000000..e67bc472 --- /dev/null +++ b/samples/durable-task-sdks/python/orchestration-management/client.py @@ -0,0 +1,114 @@ +import asyncio +import logging +import os +from datetime import datetime, timezone +from azure.identity import DefaultAzureCredential +from durabletask import client as durable_client +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + """Main entry point demonstrating orchestration management operations.""" + logger.info("Starting Orchestration Management client...") + + # Get environment variables for taskhub and endpoint with defaults + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + print(f"Using taskhub: {taskhub_name}") + print(f"Using endpoint: {endpoint}") + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + client = DurableTaskSchedulerClient( + host_address=endpoint, + secure_channel=endpoint != "http://localhost:8080", + taskhub=taskhub_name, + token_credential=credential, + ) + + # Record the start time for batch purge later + start_time = datetime.now(timezone.utc) + + # ========================================================================= + # 1. Schedule and complete orchestrations + # ========================================================================= + print("\n=== Step 1: Schedule orchestrations ===") + instance_ids = [] + for i in range(3): + instance_id = client.schedule_new_orchestration( + "data_processing_orchestrator", + input={"batch_id": f"batch-{i + 1}", "item_count": (i + 1) * 10}, + ) + instance_ids.append(instance_id) + logger.info(f"Scheduled orchestration {i + 1} with ID: {instance_id}") + + # Wait for all orchestrations to complete + for instance_id in instance_ids: + state = client.wait_for_orchestration_completion(instance_id, timeout=60) + if state and state.runtime_status == durable_client.OrchestrationStatus.COMPLETED: + print(f" Completed: {instance_id} -> {state.serialized_output}") + elif state: + print(f" Failed: {instance_id} -> {state.failure_details}") + + # ========================================================================= + # 2. Restart an orchestration (reuses the same instance ID) + # ========================================================================= + print("\n=== Step 2: Restart orchestration (same instance ID) ===") + original_id = instance_ids[0] + restarted_id = client.restart_orchestration(original_id) + print(f" Restarted {original_id} -> new execution ID: {restarted_id}") + + state = client.wait_for_orchestration_completion(restarted_id, timeout=60) + if state and state.runtime_status == durable_client.OrchestrationStatus.COMPLETED: + print(f" Restarted orchestration completed: {state.serialized_output}") + + # ========================================================================= + # 3. Restart with a new instance ID + # ========================================================================= + print("\n=== Step 3: Restart orchestration (new instance ID) ===") + new_id = client.restart_orchestration( + instance_ids[1], restart_with_new_instance_id=True + ) + print(f" Restarted {instance_ids[1]} with new ID: {new_id}") + + state = client.wait_for_orchestration_completion(new_id, timeout=60) + if state and state.runtime_status == durable_client.OrchestrationStatus.COMPLETED: + print(f" New orchestration completed: {state.serialized_output}") + + # ========================================================================= + # 4. Query orchestration instances + # ========================================================================= + print("\n=== Step 4: Query orchestrations ===") + query = durable_client.OrchestrationQuery( + created_time_from=start_time, + runtime_status=[durable_client.OrchestrationStatus.COMPLETED], + ) + states = client.get_all_orchestration_states(query) + print(f" Found {len(states)} completed orchestration(s) since {start_time.isoformat()}") + for s in states: + print(f" - {s.instance_id}: {s.name}") + + # ========================================================================= + # 5. Batch purge completed orchestrations + # ========================================================================= + print("\n=== Step 5: Batch purge completed orchestrations ===") + result = client.purge_orchestrations_by( + created_time_from=start_time, + runtime_status=[durable_client.OrchestrationStatus.COMPLETED], + ) + print(f" Purged {result.deleted_instance_count} orchestration(s)") + + # Verify purge worked + states = client.get_all_orchestration_states(query) + print(f" Remaining completed orchestrations: {len(states)}") + + print("\nDone!") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/samples/durable-task-sdks/python/orchestration-management/requirements.txt b/samples/durable-task-sdks/python/orchestration-management/requirements.txt new file mode 100644 index 00000000..1bc468dd --- /dev/null +++ b/samples/durable-task-sdks/python/orchestration-management/requirements.txt @@ -0,0 +1,2 @@ +durabletask-azuremanaged +azure-identity diff --git a/samples/durable-task-sdks/python/orchestration-management/worker.py b/samples/durable-task-sdks/python/orchestration-management/worker.py new file mode 100644 index 00000000..b82d803d --- /dev/null +++ b/samples/durable-task-sdks/python/orchestration-management/worker.py @@ -0,0 +1,70 @@ +import asyncio +import logging +import os +from azure.identity import DefaultAzureCredential +from durabletask import task +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Activity functions +def process_batch(ctx: task.ActivityContext, data: dict) -> dict: + """Activity that simulates processing a batch of data.""" + batch_id = data["batch_id"] + item_count = data["item_count"] + logger.info(f"Processing batch {batch_id} with {item_count} items") + return { + "batch_id": batch_id, + "items_processed": item_count, + "status": "success", + } + + +# Orchestrator function +def data_processing_orchestrator(ctx: task.OrchestrationContext, data: dict): + """Orchestrator that processes a batch of data.""" + result = yield ctx.call_activity(process_batch, input=data) + return result + + +async def main(): + """Main entry point for the worker process.""" + logger.info("Starting Orchestration Management pattern worker...") + + # Get environment variables for taskhub and endpoint with defaults + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + print(f"Using taskhub: {taskhub_name}") + print(f"Using endpoint: {endpoint}") + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + with DurableTaskSchedulerWorker( + host_address=endpoint, + secure_channel=endpoint != "http://localhost:8080", + taskhub=taskhub_name, + token_credential=credential, + ) as worker: + + # Register activities and orchestrator + worker.add_activity(process_batch) + worker.add_orchestrator(data_processing_orchestrator) + + # Start the worker + worker.start() + + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + logger.info("Worker shutdown initiated") + + logger.info("Worker stopped") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/samples/durable-task-sdks/python/testing/README.md b/samples/durable-task-sdks/python/testing/README.md new file mode 100644 index 00000000..f563290f --- /dev/null +++ b/samples/durable-task-sdks/python/testing/README.md @@ -0,0 +1,113 @@ +# In-Memory Testing + +Python | Durable Task SDK + +## Description of the Sample + +This sample demonstrates how to unit test Durable Task orchestrations and activities using the in-memory backend (`InMemoryOrchestrationBackend`) — no Docker, no emulator, and no Azure resources required. + +In this sample: +1. `workflows.py` defines a pure `order_processing_orchestrator` with `validate_order`, `charge_payment`, and `ship_order` activities +2. `test_workflows.py` uses `create_test_backend()` from `durabletask.testing` to run orchestrations entirely in-process +3. Tests verify both happy paths (successful orders) and failure paths (validation errors) + +This pattern is useful for: +- Fast feedback loops during development — tests run in seconds +- CI/CD pipelines where Docker is unavailable or adds overhead +- Verifying orchestration logic in isolation before deploying to Azure +- Test-driven development of durable workflows + +## Prerequisites + +1. [Python 3.10+](https://www.python.org/downloads/) +2. No Docker or emulator required! + +## How to Run the Sample + +1. First, activate your Python virtual environment (if you're using one): + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate + ``` + +2. Install the required packages: + ```bash + pip install -r requirements.txt + ``` + +3. Run the tests: + ```bash + pytest test_workflows.py -v + ``` + +## Expected Output + +``` +test_workflows.py::TestOrderProcessing::test_single_item_order PASSED +test_workflows.py::TestOrderProcessing::test_multi_item_order PASSED +test_workflows.py::TestOrderValidationFailures::test_empty_items_fails PASSED +test_workflows.py::TestOrderValidationFailures::test_invalid_quantity_fails PASSED + +========================= 4 passed ========================= +``` + +## Code Walkthrough + +### Separating Workflow Logic + +Keep orchestrators and activities in a separate module (`workflows.py`) with no infrastructure dependencies: + +```python +from durabletask import task + +def validate_order(ctx: task.ActivityContext, order) -> None: + if not order.items: + raise ValueError("Order must contain at least one item") + +def order_processing_orchestrator(ctx: task.OrchestrationContext, order): + yield ctx.call_activity(validate_order, input=order) + # ... more activities +``` + +### Writing Tests with the In-Memory Backend + +Use `create_test_backend()` to spin up a lightweight gRPC server, then connect a standard `TaskHubGrpcWorker` and `TaskHubGrpcClient`: + +```python +from durabletask import client, worker +from durabletask.testing import create_test_backend + +@pytest.fixture(autouse=True) +def backend(): + b = create_test_backend(port=50061) + yield b + b.stop() + b.reset() + +def test_my_orchestration(backend): + w = worker.TaskHubGrpcWorker(host_address="localhost:50061") + w.add_orchestrator(my_orchestrator) + w.add_activity(my_activity) + + with w: + w.start() + c = client.TaskHubGrpcClient(host_address="localhost:50061") + instance_id = c.schedule_new_orchestration(my_orchestrator, input="data") + state = c.wait_for_orchestration_completion(instance_id, timeout=30) + + assert state.runtime_status == client.OrchestrationStatus.COMPLETED +``` + +Key points: +- Use `TaskHubGrpcWorker` and `TaskHubGrpcClient` (not the DTS-specific classes) since the in-memory backend implements the same gRPC interface +- Call `backend.stop()` and `backend.reset()` after each test to clean up state +- The `timeout` parameter on `wait_for_orchestration_completion` prevents tests from hanging + +## Related Samples + +- [Function Chaining](../function-chaining/) - Basic sequential workflow pattern +- [Human Interaction](../human-interaction/) - Approval workflows with external events + +## Learn More + +- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/develop-with-durable-task-scheduler) diff --git a/samples/durable-task-sdks/python/testing/requirements.txt b/samples/durable-task-sdks/python/testing/requirements.txt new file mode 100644 index 00000000..721d2f6f --- /dev/null +++ b/samples/durable-task-sdks/python/testing/requirements.txt @@ -0,0 +1,2 @@ +durabletask +pytest diff --git a/samples/durable-task-sdks/python/testing/test_workflows.py b/samples/durable-task-sdks/python/testing/test_workflows.py new file mode 100644 index 00000000..2f67522d --- /dev/null +++ b/samples/durable-task-sdks/python/testing/test_workflows.py @@ -0,0 +1,148 @@ +""" +Unit tests for the order-processing workflows. + +These tests use the in-memory backend so they run entirely in-process +with no external dependencies — no sidecar, no emulator, no Azure. + +Run with: + pytest test_workflows.py +""" + +import json + +import pytest + +from durabletask import client, worker +from durabletask.testing import create_test_backend + +from workflows import ( + charge_payment, + order_processing_orchestrator, + ship_order, + validate_order, +) + +HOST = "localhost:50061" + + +@pytest.fixture(autouse=True) +def backend(): + """Start and stop the in-memory backend for each test.""" + b = create_test_backend(port=50061) + yield b + b.stop() + b.reset() + + +def _create_worker() -> worker.TaskHubGrpcWorker: + """Create a worker with all orchestrators and activities registered.""" + w = worker.TaskHubGrpcWorker(host_address=HOST) + w.add_orchestrator(order_processing_orchestrator) + w.add_activity(validate_order) + w.add_activity(charge_payment) + w.add_activity(ship_order) + return w + + +# --------------------------------------------------------------------------- +# Happy path tests +# --------------------------------------------------------------------------- + +class TestOrderProcessing: + """Tests for the order_processing_orchestrator.""" + + def test_single_item_order(self): + """A single-item order should complete with the correct total.""" + order = { + "customer": "Alice", + "items": [{"name": "Widget", "quantity": 2, "unit_price": 10.00}], + } + + with _create_worker() as w: + w.start() + c = client.TaskHubGrpcClient(host_address=HOST) + instance_id = c.schedule_new_orchestration( + order_processing_orchestrator, input=order + ) + state = c.wait_for_orchestration_completion(instance_id, timeout=30) + + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + assert state.serialized_output is not None + + result = json.loads(state.serialized_output) + assert result["total"] == 20.0 + assert result["status"] == "completed" + assert result["payment_id"] == "PAY-2000" + assert result["tracking_id"] == "TRACK-ALICE-1" + + def test_multi_item_order(self): + """An order with multiple items should calculate the correct total.""" + order = { + "customer": "Bob", + "items": [ + {"name": "Widget", "quantity": 3, "unit_price": 25.00}, + {"name": "Gadget", "quantity": 1, "unit_price": 99.99}, + ], + } + + with _create_worker() as w: + w.start() + c = client.TaskHubGrpcClient(host_address=HOST) + instance_id = c.schedule_new_orchestration( + order_processing_orchestrator, input=order + ) + state = c.wait_for_orchestration_completion(instance_id, timeout=30) + + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.COMPLETED + + result = json.loads(state.serialized_output) + expected_total = 3 * 25.00 + 1 * 99.99 # 174.99 + assert result["total"] == expected_total + assert result["tracking_id"] == "TRACK-BOB-2" + + +# --------------------------------------------------------------------------- +# Failure path tests +# --------------------------------------------------------------------------- + +class TestOrderValidationFailures: + """Tests that verify validation errors are surfaced correctly.""" + + def test_empty_items_fails(self): + """An order with no items should fail validation.""" + order = {"customer": "Eve", "items": []} + + with _create_worker() as w: + w.start() + c = client.TaskHubGrpcClient(host_address=HOST) + instance_id = c.schedule_new_orchestration( + order_processing_orchestrator, input=order + ) + state = c.wait_for_orchestration_completion(instance_id, timeout=30) + + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.FAILED + assert state.failure_details is not None + assert "at least one item" in state.failure_details.message + + def test_invalid_quantity_fails(self): + """An item with zero quantity should fail validation.""" + order = { + "customer": "Mallory", + "items": [{"name": "Widget", "quantity": 0, "unit_price": 10.00}], + } + + with _create_worker() as w: + w.start() + c = client.TaskHubGrpcClient(host_address=HOST) + instance_id = c.schedule_new_orchestration( + order_processing_orchestrator, input=order + ) + state = c.wait_for_orchestration_completion(instance_id, timeout=30) + + assert state is not None + assert state.runtime_status == client.OrchestrationStatus.FAILED + assert state.failure_details is not None + assert "Invalid quantity" in state.failure_details.message diff --git a/samples/durable-task-sdks/python/testing/workflows.py b/samples/durable-task-sdks/python/testing/workflows.py new file mode 100644 index 00000000..f78700c8 --- /dev/null +++ b/samples/durable-task-sdks/python/testing/workflows.py @@ -0,0 +1,78 @@ +""" +Orchestrators and activities for a simple order-processing workflow. + +This module defines pure workflow logic with no infrastructure dependencies, +making it easy to test with the in-memory backend. +""" + +from durabletask import task + + +# --------------------------------------------------------------------------- +# Activities +# --------------------------------------------------------------------------- + +def validate_order(ctx: task.ActivityContext, order) -> None: + """Validate that the order has items and a customer name. + + Raises ``ValueError`` on invalid input. + """ + if not order.customer: + raise ValueError("Order must have a customer name") + if not order.items: + raise ValueError("Order must contain at least one item") + for item in order.items: + if item["quantity"] <= 0: + raise ValueError(f"Invalid quantity for '{item['name']}': {item['quantity']}") + + +def charge_payment(ctx: task.ActivityContext, amount: float) -> str: + """Process a payment and return a confirmation ID. + + Raises ``ValueError`` if the amount is not positive. + """ + if amount <= 0: + raise ValueError("Payment amount must be positive") + # In a real app this would call a payment gateway + return f"PAY-{int(amount * 100)}" + + +def ship_order(ctx: task.ActivityContext, data: dict) -> str: + """Ship an order and return a tracking ID.""" + customer = data["customer"] + item_count = data["item_count"] + # In a real app this would call a shipping service + return f"TRACK-{customer.upper()}-{item_count}" + + +# --------------------------------------------------------------------------- +# Orchestrator +# --------------------------------------------------------------------------- + +def order_processing_orchestrator(ctx: task.OrchestrationContext, order): + """Process an order: validate, calculate total, charge, and ship. + + Demonstrates a sequential activity chain that is easy to unit test + with the in-memory backend. + """ + # 1. Validate the order + yield ctx.call_activity(validate_order, input=order) + + # 2. Calculate total + total = sum(item["quantity"] * item["unit_price"] for item in order.items) + + # 3. Charge payment + payment_id = yield ctx.call_activity(charge_payment, input=total) + + # 4. Ship the order + tracking_id = yield ctx.call_activity(ship_order, input={ + "customer": order.customer, + "item_count": len(order.items), + }) + + return { + "payment_id": payment_id, + "tracking_id": tracking_id, + "total": total, + "status": "completed", + } diff --git a/samples/durable-task-sdks/python/work-item-filtering/README.md b/samples/durable-task-sdks/python/work-item-filtering/README.md new file mode 100644 index 00000000..6a527e0c --- /dev/null +++ b/samples/durable-task-sdks/python/work-item-filtering/README.md @@ -0,0 +1,221 @@ +# Work Item Filtering + +Python | Durable Task SDK + +## Description of the Sample + +This sample demonstrates work item filtering with the Azure Durable Task Scheduler using the Python SDK. Work item filtering allows you to run multiple specialized workers where each worker only processes specific orchestrations and activities, enabling workload isolation and independent scaling. + +In this sample: +1. **Worker A** registers a `greeting_orchestrator` with a `say_hello` activity and enables auto-generated filters +2. **Worker B** registers a `math_orchestrator` with an `add_numbers` activity and enables auto-generated filters +3. The client schedules both types of orchestrations +4. Each orchestration is routed only to the worker that registered the matching orchestrator and activities + +This pattern is useful for: +- Running specialized workers that handle different workload types +- Scaling workers independently based on workload characteristics (e.g., CPU-intensive vs. I/O-bound) +- Isolating workloads for reliability — a failure in one worker type doesn't affect others +- Deploying updates to specific orchestration types without affecting the rest + +## Prerequisites + +1. [Python 3.10+](https://www.python.org/downloads/) +2. [Docker](https://www.docker.com/products/docker-desktop/) (for running the emulator) +3. [Azure CLI](https://docs.microsoft.com/cli/azure/install-azure-cli) (if using a deployed Durable Task Scheduler) + +## Configuring Durable Task Scheduler + +There are two ways to run this sample locally: + +### Using the Emulator (Recommended) + +The emulator simulates a scheduler and taskhub in a Docker container, making it ideal for development and learning. + +1. Pull the Docker Image for the Emulator: + ```bash + docker pull mcr.microsoft.com/dts/dts-emulator:latest + ``` + +2. Run the Emulator: + ```bash + docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` + +Wait a few seconds for the container to be ready. + +Note: The example code automatically uses the default emulator settings (endpoint: `http://localhost:8080`, taskhub: `default`). You don't need to set any environment variables. + +### Using a Deployed Scheduler and Taskhub in Azure + +Local development with a deployed scheduler: + +1. Install the durable task scheduler CLI extension: + + ```bash + az upgrade + az extension add --name durabletask --allow-preview true + ``` + +2. Create a resource group in a region where the Durable Task Scheduler is available: + + ```bash + az provider show --namespace Microsoft.DurableTask --query "resourceTypes[?resourceType=='schedulers'].locations | [0]" --out table + ``` + + ```bash + az group create --name my-resource-group --location + ``` + +3. Create a durable task scheduler resource: + + ```bash + az durabletask scheduler create \ + --resource-group my-resource-group \ + --name my-scheduler \ + --ip-allowlist '["0.0.0.0/0"]' \ + --sku-name "Dedicated" \ + --sku-capacity 1 \ + --tags "{'myattribute':'myvalue'}" + ``` + +4. Create a task hub within the scheduler resource: + + ```bash + az durabletask taskhub create \ + --resource-group my-resource-group \ + --scheduler-name my-scheduler \ + --name "my-taskhub" + ``` + +5. Grant the current user permission to connect to the `my-taskhub` task hub: + + ```bash + subscriptionId=$(az account show --query "id" -o tsv) + loggedInUser=$(az account show --query "user.name" -o tsv) + + az role assignment create \ + --assignee $loggedInUser \ + --role "Durable Task Data Contributor" \ + --scope "/subscriptions/$subscriptionId/resourceGroups/my-resource-group/providers/Microsoft.DurableTask/schedulers/my-scheduler/taskHubs/my-taskhub" + ``` + +## How to Run the Sample + +Once you have set up either the emulator or deployed scheduler, follow these steps to run the sample: + +1. First, activate your Python virtual environment (if you're using one): + ```bash + python -m venv venv + source venv/bin/activate # On Windows, use: venv\Scripts\activate + ``` + +2. If you're using a deployed scheduler, set environment variables: + ```bash + export ENDPOINT=$(az durabletask scheduler show \ + --resource-group my-resource-group \ + --name my-scheduler \ + --query "properties.endpoint" \ + --output tsv) + + export TASKHUB="my-taskhub" + ``` + +3. Install the required packages: + ```bash + pip install -r requirements.txt + ``` + +4. Start Worker A (greeting worker) in a terminal: + ```bash + python worker_a.py + ``` + +5. Start Worker B (math worker) in a second terminal: + ```bash + python worker_b.py + ``` + +6. In a third terminal, run the client: + > **Note:** Remember to set the environment variables again if you're using a deployed scheduler. + + ```bash + python client.py + ``` + +## Expected Output + +### Worker A Output +``` +[Worker A] Using taskhub: default +[Worker A] Using endpoint: http://localhost:8080 +INFO:__main__:[Worker A] Ready — processing only greeting orchestrations +INFO:__main__:[Worker A] say_hello called with name: World +``` + +### Worker B Output +``` +[Worker B] Using taskhub: default +[Worker B] Using endpoint: http://localhost:8080 +INFO:__main__:[Worker B] Ready — processing only math orchestrations +INFO:__main__:[Worker B] add_numbers called with a=40, b=2 +``` + +### Client Output +``` +--- Scheduling greeting orchestration (Worker A) --- +--- Scheduling math orchestration (Worker B) --- + +Waiting for orchestrations to complete... +Greeting result: "Hello, World!" +Math result: 42 + +Done! Check the worker terminal outputs to see which worker handled each orchestration. +``` + +Notice that Worker A only processed the greeting orchestration and Worker B only processed the math orchestration. + +## Code Walkthrough + +### Auto-Generated Filters + +The simplest way to enable filtering is to call `use_work_item_filters()` with no arguments after registering your orchestrators and activities: + +```python +worker.add_orchestrator(greeting_orchestrator) +worker.add_activity(say_hello) +worker.use_work_item_filters() # Auto-generate from registry +``` + +The SDK automatically builds filters from everything registered with the worker. + +### Explicit Filters + +For more control, you can pass explicit `WorkItemFilters` with optional version constraints: + +```python +from durabletask import worker + +worker.use_work_item_filters(worker.WorkItemFilters( + orchestrations=[ + worker.OrchestrationWorkItemFilter(name="greeting_orchestrator", versions=["1.0"]), + ], + activities=[ + worker.ActivityWorkItemFilter(name="say_hello"), + ], +)) +``` + +## Viewing in the Dashboard + +- **Emulator:** Navigate to http://localhost:8082 → select the "default" task hub +- **Azure:** Navigate to your Scheduler resource in the Azure Portal → Task Hub → Dashboard URL + +## Related Samples + +- [Function Chaining](../function-chaining/) - Basic sequential workflow pattern +- [Orchestration Versioning](../versioning/) - Safe orchestration evolution with version constraints + +## Learn More + +- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/develop-with-durable-task-scheduler) diff --git a/samples/durable-task-sdks/python/work-item-filtering/client.py b/samples/durable-task-sdks/python/work-item-filtering/client.py new file mode 100644 index 00000000..21f6ecb7 --- /dev/null +++ b/samples/durable-task-sdks/python/work-item-filtering/client.py @@ -0,0 +1,66 @@ +import asyncio +import logging +import os +from azure.identity import DefaultAzureCredential +from durabletask import client as durable_client +from durabletask.azuremanaged.client import DurableTaskSchedulerClient + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def main(): + """Main entry point for the client application.""" + logger.info("Starting Work Item Filtering client...") + + # Get environment variables for taskhub and endpoint with defaults + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + print(f"Using taskhub: {taskhub_name}") + print(f"Using endpoint: {endpoint}") + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + client = DurableTaskSchedulerClient( + host_address=endpoint, + secure_channel=endpoint != "http://localhost:8080", + taskhub=taskhub_name, + token_credential=credential, + ) + + # --- Schedule a greeting orchestration (handled by Worker A) --- + print("\n--- Scheduling greeting orchestration (Worker A) ---") + greeting_id = client.schedule_new_orchestration( + "greeting_orchestrator", input="World" + ) + logger.info(f"Greeting orchestration scheduled with ID: {greeting_id}") + + # --- Schedule a math orchestration (handled by Worker B) --- + print("\n--- Scheduling math orchestration (Worker B) ---") + math_id = client.schedule_new_orchestration( + "math_orchestrator", input={"a": 40, "b": 2} + ) + logger.info(f"Math orchestration scheduled with ID: {math_id}") + + # --- Wait for both to complete --- + print("\nWaiting for orchestrations to complete...") + + greeting_state = client.wait_for_orchestration_completion(greeting_id, timeout=60) + if greeting_state and greeting_state.runtime_status == durable_client.OrchestrationStatus.COMPLETED: + print(f"Greeting result: {greeting_state.serialized_output}") + elif greeting_state: + print(f"Greeting orchestration failed: {greeting_state.failure_details}") + + math_state = client.wait_for_orchestration_completion(math_id, timeout=60) + if math_state and math_state.runtime_status == durable_client.OrchestrationStatus.COMPLETED: + print(f"Math result: {math_state.serialized_output}") + elif math_state: + print(f"Math orchestration failed: {math_state.failure_details}") + + print("\nDone! Check the worker terminal outputs to see which worker handled each orchestration.") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/samples/durable-task-sdks/python/work-item-filtering/requirements.txt b/samples/durable-task-sdks/python/work-item-filtering/requirements.txt new file mode 100644 index 00000000..1bc468dd --- /dev/null +++ b/samples/durable-task-sdks/python/work-item-filtering/requirements.txt @@ -0,0 +1,2 @@ +durabletask-azuremanaged +azure-identity diff --git a/samples/durable-task-sdks/python/work-item-filtering/worker_a.py b/samples/durable-task-sdks/python/work-item-filtering/worker_a.py new file mode 100644 index 00000000..8b8dc866 --- /dev/null +++ b/samples/durable-task-sdks/python/work-item-filtering/worker_a.py @@ -0,0 +1,69 @@ +import asyncio +import logging +import os +from azure.identity import DefaultAzureCredential +from durabletask import task +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Activity functions +def say_hello(ctx: task.ActivityContext, name: str) -> str: + """Activity that returns a greeting.""" + logger.info(f"[Worker A] say_hello called with name: {name}") + return f"Hello, {name}!" + + +# Orchestrator function +def greeting_orchestrator(ctx: task.OrchestrationContext, name: str): + """Orchestrator that calls the say_hello activity.""" + result = yield ctx.call_activity(say_hello, input=name) + return result + + +async def main(): + """Main entry point for Worker A — handles greeting orchestrations only.""" + logger.info("Starting Worker A (greeting worker)...") + + # Get environment variables for taskhub and endpoint with defaults + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + print(f"[Worker A] Using taskhub: {taskhub_name}") + print(f"[Worker A] Using endpoint: {endpoint}") + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + with DurableTaskSchedulerWorker( + host_address=endpoint, + secure_channel=endpoint != "http://localhost:8080", + taskhub=taskhub_name, + token_credential=credential, + ) as worker: + + # Register only the greeting orchestrator and activity + worker.add_orchestrator(greeting_orchestrator) + worker.add_activity(say_hello) + + # Enable work item filtering — the worker will only receive work items + # for the orchestrators and activities registered above + worker.use_work_item_filters() + + # Start the worker + worker.start() + logger.info("[Worker A] Ready — processing only greeting orchestrations") + + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + logger.info("[Worker A] Shutdown initiated") + + logger.info("[Worker A] Stopped") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/samples/durable-task-sdks/python/work-item-filtering/worker_b.py b/samples/durable-task-sdks/python/work-item-filtering/worker_b.py new file mode 100644 index 00000000..ef1684bb --- /dev/null +++ b/samples/durable-task-sdks/python/work-item-filtering/worker_b.py @@ -0,0 +1,71 @@ +import asyncio +import logging +import os +from azure.identity import DefaultAzureCredential +from durabletask import task +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +# Activity functions +def add_numbers(ctx: task.ActivityContext, data: dict) -> int: + """Activity that adds two numbers together.""" + a = data["a"] + b = data["b"] + logger.info(f"[Worker B] add_numbers called with a={a}, b={b}") + return a + b + + +# Orchestrator function +def math_orchestrator(ctx: task.OrchestrationContext, data: dict): + """Orchestrator that calls the add_numbers activity.""" + result = yield ctx.call_activity(add_numbers, input=data) + return result + + +async def main(): + """Main entry point for Worker B — handles math orchestrations only.""" + logger.info("Starting Worker B (math worker)...") + + # Get environment variables for taskhub and endpoint with defaults + taskhub_name = os.getenv("TASKHUB", "default") + endpoint = os.getenv("ENDPOINT", "http://localhost:8080") + + print(f"[Worker B] Using taskhub: {taskhub_name}") + print(f"[Worker B] Using endpoint: {endpoint}") + + # Set credential to None for emulator, or DefaultAzureCredential for Azure + credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() + + with DurableTaskSchedulerWorker( + host_address=endpoint, + secure_channel=endpoint != "http://localhost:8080", + taskhub=taskhub_name, + token_credential=credential, + ) as worker: + + # Register only the math orchestrator and activity + worker.add_orchestrator(math_orchestrator) + worker.add_activity(add_numbers) + + # Enable work item filtering — the worker will only receive work items + # for the orchestrators and activities registered above + worker.use_work_item_filters() + + # Start the worker + worker.start() + logger.info("[Worker B] Ready — processing only math orchestrations") + + try: + while True: + await asyncio.sleep(1) + except KeyboardInterrupt: + logger.info("[Worker B] Shutdown initiated") + + logger.info("[Worker B] Stopped") + +if __name__ == "__main__": + asyncio.run(main())