From 016470fc006db4dadf22f9c1419a8a8f25e21f6f Mon Sep 17 00:00:00 2001 From: wangbill Date: Wed, 18 Feb 2026 12:24:05 -0800 Subject: [PATCH 1/3] Large Payload Samples --- .../dotnet/LargePayload/.gitignore | 28 ++ .../dotnet/LargePayload/LargePayload.csproj | 28 ++ .../LargePayload/LargePayloadOrchestration.cs | 156 ++++++++++ .../dotnet/LargePayload/Program.cs | 14 + .../dotnet/LargePayload/README.md | 282 ++++++++++++++++++ .../dotnet/LargePayload/host.json | 29 ++ .../classes/com/example/Functions.class | Bin 0 -> 5044 bytes .../python/large-payload/.gitignore | 46 +++ .../python/large-payload/README.md | 269 +++++++++++++++++ .../python/large-payload/function_app.py | 105 +++++++ .../python/large-payload/host.json | 29 ++ .../python/large-payload/requirements.txt | 6 + 12 files changed, 992 insertions(+) create mode 100644 samples/durable-functions/dotnet/LargePayload/.gitignore create mode 100644 samples/durable-functions/dotnet/LargePayload/LargePayload.csproj create mode 100644 samples/durable-functions/dotnet/LargePayload/LargePayloadOrchestration.cs create mode 100644 samples/durable-functions/dotnet/LargePayload/Program.cs create mode 100644 samples/durable-functions/dotnet/LargePayload/README.md create mode 100644 samples/durable-functions/dotnet/LargePayload/host.json create mode 100644 samples/durable-functions/java/HelloCities/target/classes/com/example/Functions.class create mode 100644 samples/durable-functions/python/large-payload/.gitignore create mode 100644 samples/durable-functions/python/large-payload/README.md create mode 100644 samples/durable-functions/python/large-payload/function_app.py create mode 100644 samples/durable-functions/python/large-payload/host.json create mode 100644 samples/durable-functions/python/large-payload/requirements.txt diff --git a/samples/durable-functions/dotnet/LargePayload/.gitignore b/samples/durable-functions/dotnet/LargePayload/.gitignore new file mode 100644 index 0000000..5dd4f37 --- /dev/null +++ b/samples/durable-functions/dotnet/LargePayload/.gitignore @@ -0,0 +1,28 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. + +# Azure Functions localsettings file +local.settings.json + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo diff --git a/samples/durable-functions/dotnet/LargePayload/LargePayload.csproj b/samples/durable-functions/dotnet/LargePayload/LargePayload.csproj new file mode 100644 index 0000000..3ea9934 --- /dev/null +++ b/samples/durable-functions/dotnet/LargePayload/LargePayload.csproj @@ -0,0 +1,28 @@ + + + net8.0 + v4 + Exe + enable + enable + + + + + + + + + + + + + + PreserveNewest + + + PreserveNewest + Never + + + diff --git a/samples/durable-functions/dotnet/LargePayload/LargePayloadOrchestration.cs b/samples/durable-functions/dotnet/LargePayload/LargePayloadOrchestration.cs new file mode 100644 index 0000000..67e1824 --- /dev/null +++ b/samples/durable-functions/dotnet/LargePayload/LargePayloadOrchestration.cs @@ -0,0 +1,156 @@ +// Large Payload Sample - .NET Isolated Durable Functions with Durable Task Scheduler +// +// Demonstrates how to use the large payload storage feature to handle payloads +// that exceed the Durable Task Scheduler's message size limit. 
When enabled, +// payloads larger than the configured threshold are automatically offloaded to +// Azure Blob Storage (compressed via gzip), keeping orchestration history lean +// while supporting arbitrarily large data. +// +// This sample uses a fan-out/fan-in pattern: the orchestrator fans out to multiple +// activity functions, each of which generates a large payload (configurable size). +// The orchestrator then aggregates the results. + +using System.Text.Json; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Azure.Functions.Worker.Http; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.Extensions.Logging; + +namespace LargePayload; + +public static class LargePayloadOrchestration +{ + // Default payload size in KB (override via PAYLOAD_SIZE_KB app setting) + private const int DefaultPayloadSizeKb = 100; + + // Default number of parallel activities (override via ACTIVITY_COUNT app setting) + private const int DefaultActivityCount = 5; + + // ----------------------------------------------------------------------- + // HTTP Trigger – starts the orchestration + // ----------------------------------------------------------------------- + [Function("StartLargePayload")] + public static async Task HttpStart( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext) + { + ILogger logger = executionContext.GetLogger("StartLargePayload"); + + // Read configuration from environment variables and pass as orchestration input + // (environment variable access must not happen inside the orchestrator). + int activityCount = int.TryParse( + Environment.GetEnvironmentVariable("ACTIVITY_COUNT"), out int ac) ? ac : DefaultActivityCount; + int payloadSizeKb = int.TryParse( + Environment.GetEnvironmentVariable("PAYLOAD_SIZE_KB"), out int ps) ? ps : DefaultPayloadSizeKb; + + var config = new OrchestratorConfig(activityCount, payloadSizeKb); + + string instanceId = await client.ScheduleNewOrchestrationInstanceAsync( + nameof(LargePayloadFanOutFanIn), config); + + logger.LogInformation("Started orchestration with ID = '{InstanceId}'.", instanceId); + + return await client.CreateCheckStatusResponseAsync(req, instanceId); + } + + // ----------------------------------------------------------------------- + // Orchestrator – fans out to N parallel activities, each producing a large payload + // ----------------------------------------------------------------------- + [Function(nameof(LargePayloadFanOutFanIn))] + public static async Task LargePayloadFanOutFanIn( + [OrchestrationTrigger] TaskOrchestrationContext context) + { + ILogger logger = context.CreateReplaySafeLogger(nameof(LargePayloadFanOutFanIn)); + + // Read config from orchestration input (set by the HTTP trigger) + // to avoid non-deterministic environment variable access in the orchestrator. + OrchestratorConfig config = context.GetInput() ?? new OrchestratorConfig(); + int activityCount = config.ActivityCount > 0 ? config.ActivityCount : DefaultActivityCount; + int payloadSizeKb = config.PayloadSizeKb > 0 ? 
config.PayloadSizeKb : DefaultPayloadSizeKb; + + logger.LogInformation( + "Starting fan-out: {Count} activities, each generating {SizeKb} KB payloads.", + activityCount, payloadSizeKb); + + // Fan-out: schedule N activities in parallel + var tasks = new List>(); + for (int i = 0; i < activityCount; i++) + { + tasks.Add(context.CallActivityAsync( + nameof(ProcessLargeData), + new ActivityInput(i, payloadSizeKb))); + } + + // Fan-in: wait for all activities to complete + ActivityResult[] results = await Task.WhenAll(tasks); + + // Aggregate results + var summary = new PayloadSummary( + ItemsProcessed: results.Length, + TotalSizeKb: results.Sum(r => r.SizeKb), + IndividualSizes: results.Select(r => r.SizeKb).ToArray()); + + logger.LogInformation( + "Fan-in complete: {Count} items, {TotalKb} KB total.", + summary.ItemsProcessed, summary.TotalSizeKb); + + return summary; + } + + // ----------------------------------------------------------------------- + // Activity – generates and returns a large payload + // ----------------------------------------------------------------------- + [Function(nameof(ProcessLargeData))] + public static ActivityResult ProcessLargeData( + [ActivityTrigger] ActivityInput input, + FunctionContext executionContext) + { + ILogger logger = executionContext.GetLogger(nameof(ProcessLargeData)); + + logger.LogInformation( + "Task {TaskId}: generating {SizeKb} KB payload...", + input.TaskId, input.PayloadSizeKb); + + string payload = GenerateLargePayload(input.PayloadSizeKb); + + logger.LogInformation( + "Task {TaskId}: payload size = {Bytes} bytes.", + input.TaskId, payload.Length); + + return new ActivityResult(input.TaskId, input.PayloadSizeKb, payload); + } + + // ----------------------------------------------------------------------- + // Health-check endpoint + // ----------------------------------------------------------------------- + [Function("Hello")] + public static HttpResponseData Hello( + [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestData req) + { + var response = req.CreateResponse(System.Net.HttpStatusCode.OK); + response.WriteString("Hello from Large Payload Sample!"); + return response; + } + + // ----------------------------------------------------------------------- + // Helper: generate a JSON payload of approximately the specified size + // ----------------------------------------------------------------------- + private static string GenerateLargePayload(int sizeKb) + { + int targetBytes = sizeKb * 1024; + // Reserve space for JSON envelope + string filler = new('x', Math.Max(0, targetBytes - 100)); + var payload = new { size_kb = sizeKb, data = filler }; + return JsonSerializer.Serialize(payload); + } +} + +// ----------------------------------------------------------------------- +// DTOs +// ----------------------------------------------------------------------- +public record OrchestratorConfig(int ActivityCount = 5, int PayloadSizeKb = 100); +public record ActivityInput(int TaskId, int PayloadSizeKb); +public record ActivityResult(int TaskId, int SizeKb, string Payload); +public record PayloadSummary(int ItemsProcessed, int TotalSizeKb, int[] IndividualSizes); diff --git a/samples/durable-functions/dotnet/LargePayload/Program.cs b/samples/durable-functions/dotnet/LargePayload/Program.cs new file mode 100644 index 0000000..4b07a32 --- /dev/null +++ b/samples/durable-functions/dotnet/LargePayload/Program.cs @@ -0,0 +1,14 @@ +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Hosting; +using 
Microsoft.Extensions.DependencyInjection; + +var host = new HostBuilder() + .ConfigureFunctionsWebApplication() + .ConfigureServices(services => + { + services.AddApplicationInsightsTelemetryWorkerService(); + services.ConfigureFunctionsApplicationInsights(); + }) + .Build(); + +host.Run(); diff --git a/samples/durable-functions/dotnet/LargePayload/README.md b/samples/durable-functions/dotnet/LargePayload/README.md new file mode 100644 index 0000000..739698a --- /dev/null +++ b/samples/durable-functions/dotnet/LargePayload/README.md @@ -0,0 +1,282 @@ + + +# Large Payload Support - .NET Isolated Durable Functions + +This sample demonstrates how to use the **large payload storage** feature with .NET isolated Durable Functions and the [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler). When enabled, payloads that exceed a configurable threshold are automatically offloaded to Azure Blob Storage (compressed via gzip), keeping orchestration history lean while supporting arbitrarily large data. + +The sample uses a **fan-out/fan-in** pattern: the orchestrator fans out to multiple activity functions, each generating a configurable-size payload (default 100 KB). The orchestrator then aggregates the results. + +## How large payload storage works + +The Durable Task Scheduler has a per-message size limit. When `largePayloadStorageEnabled` is set to `true` in `host.json`, any orchestration input, output, or activity result that exceeds `largePayloadStorageThresholdBytes` is: + +1. Compressed with gzip +2. Uploaded to a blob container (`durabletask-payloads` by default) in the storage account configured via `AzureWebJobsStorage` +3. Replaced in the orchestration history with a small reference pointer + +This happens transparently — no code changes are required. 
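The sketch below is a conceptual illustration of those three steps, not the extension's actual code — the real blob naming scheme, reference format, and container handling are internal to the `AzureManaged` package. It assumes the `Azure.Storage.Blobs` client library and an illustrative helper name.

```csharp
// Conceptual illustration only: this is roughly what the extension does on your
// behalf when a payload crosses the configured threshold. Names are illustrative.
using System;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;

static async Task<string> ExternalizePayloadAsync(string payload, BlobContainerClient container)
{
    // 1. Compress the payload with gzip.
    using var compressed = new MemoryStream();
    await using (var gzip = new GZipStream(compressed, CompressionMode.Compress, leaveOpen: true))
    {
        await gzip.WriteAsync(Encoding.UTF8.GetBytes(payload));
    }
    compressed.Position = 0;

    // 2. Upload the compressed bytes to the payload container.
    string blobName = Guid.NewGuid().ToString("N");
    await container.CreateIfNotExistsAsync();
    await container.GetBlobClient(blobName).UploadAsync(compressed, overwrite: true);

    // 3. Return a small reference pointer that takes the payload's place in history.
    return $"blob:{container.Name}/{blobName}";
}
```

The orchestration history then carries only the small pointer; the full payload is read back from blob storage when the orchestration or an activity needs it.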
+ +## Prerequisites + +- [.NET 8 SDK](https://dotnet.microsoft.com/download/dotnet/8.0) +- [Azure Functions Core Tools v4](https://learn.microsoft.com/azure/azure-functions/functions-run-local) +- [Azure CLI](https://learn.microsoft.com/cli/azure/install-azure-cli) +- A [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) resource with a task hub +- An Azure Storage account (for large payload blob storage) +- [Docker](https://docs.docker.com/engine/install/) (optional, for local emulator) + +## Project structure + +``` +LargePayload/ +├── LargePayload.csproj # Project file with NuGet package references +├── LargePayloadOrchestration.cs # Orchestrator, activity, and HTTP trigger +├── Program.cs # Host builder setup +├── host.json # Host configuration with large payload settings +└── README.md +``` + +## Key configuration in host.json + +```json +"durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING", + "largePayloadStorageEnabled": true, + "largePayloadStorageThresholdBytes": 10240 + }, + "hubName": "%TASKHUB_NAME%" +} +``` + +| Setting | Description | Default | +|---|---|---| +| `largePayloadStorageEnabled` | Enables large payload externalization to blob storage | `false` | +| `largePayloadStorageThresholdBytes` | Payloads larger than this (in bytes) are externalized | `10240` (10 KB) | + +## NuGet packages + +This sample uses the following published packages: + +| Package | Version | +|---|---| +| `Microsoft.Azure.Functions.Worker.Extensions.DurableTask` | 1.14.1 | +| `Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged` | 1.3.0 | + +The `AzureManaged` package includes `Microsoft.DurableTask.Extensions.AzureBlobPayloads` as a transitive dependency, which provides the blob externalization capability. + +## Running locally with the emulator + +1. Pull and run the Durable Task Scheduler emulator: + + ```bash + docker run -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` + + Port `8080` exposes the gRPC endpoint and `8082` exposes the monitoring dashboard. + +2. Start Azurite (local storage emulator): + + ```bash + azurite start + ``` + +3. Create a `local.settings.json` file: + + ```json + { + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated", + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;Authentication=None", + "TASKHUB_NAME": "default", + "PAYLOAD_SIZE_KB": "100", + "ACTIVITY_COUNT": "5" + } + } + ``` + +4. Build and run the function app: + + ```bash + func start + ``` + + Expected output: + ``` + Hello: [GET] http://localhost:7071/api/Hello + LargePayloadFanOutFanIn: orchestrationTrigger + ProcessLargeData: activityTrigger + StartLargePayload: [GET,POST] http://localhost:7071/api/StartLargePayload + ``` + +5. Trigger the orchestration: + + ```bash + curl -X POST http://localhost:7071/api/StartLargePayload + ``` + + The response includes a `statusQueryGetUri` you can poll to check orchestration progress. + +## Deploy to Azure + +### 1. 
Create Azure resources + +If you don't already have them, create the required Azure resources: + +```bash +# Create a resource group +az group create --name my-rg --location + +# Create a Durable Task Scheduler and task hub +az durabletask scheduler create --name my-scheduler --resource-group my-rg --location --sku free +az durabletask taskhub create --scheduler-name my-scheduler --resource-group my-rg --name my-taskhub + +# Create a storage account +az storage account create --name mystorageaccount --resource-group my-rg --location --sku Standard_LRS + +# Create a function app (.NET 8 isolated, Linux) +az functionapp create \ + --name my-func-app \ + --resource-group my-rg \ + --storage-account mystorageaccount \ + --consumption-plan-location \ + --runtime dotnet-isolated \ + --runtime-version 8.0 \ + --os-type Linux \ + --functions-version 4 +``` + +### 2. Configure identity-based authentication + +The Durable Task Scheduler **requires** identity-based authentication (managed identity). You can use either system-assigned or user-assigned managed identity. + +#### Option A: System-assigned managed identity + +```bash +# Enable system-assigned managed identity +az functionapp identity assign --name my-func-app --resource-group my-rg + +# Get the principal ID +PRINCIPAL_ID=$(az functionapp identity show --name my-func-app --resource-group my-rg --query principalId -o tsv) + +# Grant "Durable Task Data Contributor" role on the scheduler +SCHEDULER_ID=$(az durabletask scheduler show --name my-scheduler --resource-group my-rg --query id -o tsv) +az role assignment create --assignee $PRINCIPAL_ID --role "Durable Task Data Contributor" --scope $SCHEDULER_ID + +# Grant "Storage Blob Data Contributor" role on the storage account (for large payload blobs) +STORAGE_ID=$(az storage account show --name mystorageaccount --resource-group my-rg --query id -o tsv) +az role assignment create --assignee $PRINCIPAL_ID --role "Storage Blob Data Contributor" --scope $STORAGE_ID +``` + +Configure app settings for system-assigned identity: + +```bash +SCHEDULER_ENDPOINT=$(az durabletask scheduler show --name my-scheduler --resource-group my-rg --query endpoint -o tsv) + +az functionapp config appsettings set --name my-func-app --resource-group my-rg --settings \ + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING=Endpoint=${SCHEDULER_ENDPOINT};TaskHub=my-taskhub;Authentication=ManagedIdentity" \ + "AzureWebJobsStorage__accountName=mystorageaccount" \ + "FUNCTIONS_WORKER_RUNTIME=dotnet-isolated" \ + "TASKHUB_NAME=my-taskhub" +``` + +> **Note:** For system-assigned identity, you only need `AzureWebJobsStorage__accountName`. No `__credential` or `__clientId` is required — the SDK uses `DefaultAzureCredential` automatically. 
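For context, identity-based access with `DefaultAzureCredential` is roughly equivalent to constructing a blob client as in the sketch below. This is illustrative only — the Functions host builds its own clients from the `AzureWebJobsStorage__accountName` setting, and the snippet assumes the `Azure.Identity` and `Azure.Storage.Blobs` packages plus an illustrative account URI.

```csharp
using System;
using Azure.Identity;
using Azure.Storage.Blobs;

// Illustrative only: the host performs this wiring for you from AzureWebJobsStorage__accountName.
var blobService = new BlobServiceClient(
    new Uri("https://mystorageaccount.blob.core.windows.net"),
    new DefaultAzureCredential());
```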
+ +#### Option B: User-assigned managed identity + +```bash +# Create a user-assigned identity +az identity create --name my-identity --resource-group my-rg + +IDENTITY_CLIENT_ID=$(az identity show --name my-identity --resource-group my-rg --query clientId -o tsv) +IDENTITY_PRINCIPAL_ID=$(az identity show --name my-identity --resource-group my-rg --query principalId -o tsv) +IDENTITY_ID=$(az identity show --name my-identity --resource-group my-rg --query id -o tsv) + +# Assign the identity to the function app +az functionapp identity assign --name my-func-app --resource-group my-rg --identities $IDENTITY_ID + +# Grant roles (same as above, using IDENTITY_PRINCIPAL_ID) +az role assignment create --assignee $IDENTITY_PRINCIPAL_ID --role "Durable Task Data Contributor" --scope $SCHEDULER_ID +az role assignment create --assignee $IDENTITY_PRINCIPAL_ID --role "Storage Blob Data Contributor" --scope $STORAGE_ID +``` + +Configure app settings for user-assigned identity: + +```bash +az functionapp config appsettings set --name my-func-app --resource-group my-rg --settings \ + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING=Endpoint=${SCHEDULER_ENDPOINT};TaskHub=my-taskhub;Authentication=ManagedIdentity;ClientId=${IDENTITY_CLIENT_ID}" \ + "AzureWebJobsStorage__accountName=mystorageaccount" \ + "AzureWebJobsStorage__credential=managedidentity" \ + "AzureWebJobsStorage__clientId=${IDENTITY_CLIENT_ID}" \ + "FUNCTIONS_WORKER_RUNTIME=dotnet-isolated" \ + "TASKHUB_NAME=my-taskhub" +``` + +### 3. Deploy the function app + +```bash +func azure functionapp publish my-func-app +``` + +### 4. Test the deployment + +```bash +curl -X POST https://my-func-app.azurewebsites.net/api/StartLargePayload +``` + +Poll the `statusQueryGetUri` from the response to check completion. A successful result looks like: + +```json +{ + "runtimeStatus": "Completed", + "output": { + "ItemsProcessed": 5, + "TotalSizeKb": 500, + "IndividualSizes": [100, 100, 100, 100, 100] + } +} +``` + +### 5. Verify payload externalization + +Check that payloads were externalized to blob storage: + +```bash +az storage blob list \ + --account-name mystorageaccount \ + --container-name durabletask-payloads \ + --auth-mode login \ + --output table +``` + +You should see compressed blobs (typically ~450 bytes each for 100 KB payloads due to gzip compression of repetitive data). 
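If you want to sanity-check that ratio locally, a small stand-alone snippet like the following (illustrative, not part of the sample) reproduces it — gzip collapses the mostly-repeated `'x'` filler generated by the sample to a few hundred bytes:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Compress a 100 KB payload of repeated 'x' characters, mirroring the sample's filler data.
byte[] payload = Encoding.UTF8.GetBytes(new string('x', 100 * 1024));
using var output = new MemoryStream();
using (var gzip = new GZipStream(output, CompressionMode.Compress, leaveOpen: true))
{
    gzip.Write(payload);
}
Console.WriteLine($"{payload.Length:N0} bytes -> {output.Length:N0} bytes after gzip");
```

Exact sizes vary with the gzip implementation and payload contents, but the order of magnitude matches what you see in the container.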
+ +## Configuration options + +| App Setting | Description | Example | +|---|---|---| +| `PAYLOAD_SIZE_KB` | Size of each generated payload in KB | `100` | +| `ACTIVITY_COUNT` | Number of parallel activity invocations | `5` | + +## Next steps + +- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview) +- [Configure identity-based authentication](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/develop-with-durable-task-scheduler) diff --git a/samples/durable-functions/dotnet/LargePayload/host.json b/samples/durable-functions/dotnet/LargePayload/host.json new file mode 100644 index 0000000..0ef0311 --- /dev/null +++ b/samples/durable-functions/dotnet/LargePayload/host.json @@ -0,0 +1,29 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + }, + "enableLiveMetricsFilters": true, + "logLevel": { + "Host.Triggers.DurableTask": "Information" + } + }, + "logLevel": { + "Function": "Information" + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING", + "largePayloadStorageEnabled": true, + "largePayloadStorageThresholdBytes": 10240 + }, + "hubName": "%TASKHUB_NAME%" + } + } +} diff --git a/samples/durable-functions/java/HelloCities/target/classes/com/example/Functions.class b/samples/durable-functions/java/HelloCities/target/classes/com/example/Functions.class new file mode 100644 index 0000000000000000000000000000000000000000..1d8aa10d53ebc17c753a6c09a25d5a1b5186e1d7 GIT binary patch literal 5044 zcmc&%`*##)8GgQGcV|f!h)V-3RvIW^LPA$5Ld{|@mxiQnHjzz(3spPpek4OCGwIB1 zAb6?R+Ip=w)LOM#FU24A7bCgTT9{&^Z`DSLbvzH|F$T^;z%+Abvf8XbQ-*5i; zpZo6vIDXpDa4Tw*qt{lS*WgDkG^!bNDjmEg7!B zzISgttRYzv8*8I&*dEV*;`BGK7-i&lz zeNpC`Bq~)$dEn{pav)t7o(M?erXekF+_pSR2B(O9%}kIo7F z?ZZ2LHeb*Af^AfCBmbW;G8d|rXROEt!!?++QOmMD-A9JNnQaoIZ!mA*si3b&RyDq+ zn^h)eXR)!mha0WU=??il=`@%<{qSZL80H;gaZx(78ndX4h;`&k0vZ)B*%gBC_%`~K zk{O>_>6n`>&a=9D)mzHTHE9ZbW*hlP8b&E)JH|~vgT6Rb?iel17D_X-&lg*DJuFtI zt<|bWUAsbo5W8J?5=S*0p<>lw^{#nXy`i2Ij^X3RymRhYPHK_-@X%T5HbU z!%Y>9*j1xV;N&)_YcmH0&NT@|OXfSU%_!vDFZ;7m;KUZHT53gPb{SzkrYmX}v}9jk zZ{vbkU#%^Pi6&F+QxzTgZKkJMaN!ndfQi6Vb3LP3Q3!6}uWyAmAxt)T8r9T3zYV(U zqpGv)-AH0g07*;;B#Zj`lr&8{iAhe+d3$-?)-ctunFADe<2mFte2xIwyq7i$Duo#o z1XA47<|z07n&GVrTx`=mk@^KEf+)hrP2F`{)@C#p%7I)L=5Rs7=LHUK?P>}|aOTGK z>$*YYKgP*Z&wXe<&6CVR3K#JOo+eiGWyv~~b?>6#E%8oG0*|BqnXNXCxfyR!s)hLpL37QnzNS>N;)l6^Bp5 zt0dSy`y)IJ03zvXUA&1j`M==2>KSG>Z@6m7TzVvmqCFZWeuP`XV4=D94L6Bp=4MPf zreSGV5!mAw-FIZvarAZNO&3h;=*B7>4P5HG>MV+yf(z!gQ?D>-z0ul-s2RAVWYT>%VY8&W z1-|po%7js(Y z0U&EP*`)34HX?1SG(ts3S`^iLa-*Ev_e5L$!NIV+JfpAr-w7ICVRhVa1`3RB;dJ{k zTbpLJ37h7p%1_j|;0dCuXjh%GoHW!|%C3zspQltjO#hf|doE8Z;Th2F!Y}Yk4R109 z^@t5?&0dz-An;)HOzLGqXAp3qKSfpOIK;Wy$vF-rc*oC3UNt^t`O8g+`0@9U{3~xd&=oQP z-TVVm0Rw+`@`+`{Zp6^z^L_+sIdt>w9{xY(q8EGlqQ8Sr`)F{#iwq82hLHFu9>>Qd z4&n*Y`uI=UA>W5vWQ5AhfwPg67lel#gX zKgWN1{8``Qx2aZTb1pqPGc=eUPd}SJe-G0!+{Nq%IDa3{0|BAbM7UfdywF7Wk|L;J zg94~9fy1=&B=dHJK^;YbUka)qeSz(bpy%k-m+4+7j~Fj9K$U!bt0G-_1Vt)uRiu;j z{%I!s)FUpE#MKSxEjAac(!N;Y7M)2lZMT#E!1+>V`q^SBmM#@b@$?JDQX;)nENSU$ z#hJguEWHP-FjN}W-ann_JRQG-zKqrzKRFyb!5U{`Va4vj?!d)YC8`TX8QUU*Wc^7bfZ-eXmf0Q z_yZh(7Nsbn#$Ck@g4t33rhBFFJ#QegFUf literal 0 HcmV?d00001 diff --git 
a/samples/durable-functions/python/large-payload/.gitignore b/samples/durable-functions/python/large-payload/.gitignore new file mode 100644 index 0000000..19a37b2 --- /dev/null +++ b/samples/durable-functions/python/large-payload/.gitignore @@ -0,0 +1,46 @@ +## Ignore Visual Studio temporary files, build results, and +## files generated by popular Visual Studio add-ons. + +.DS_Store + +# Azure Functions localsettings file +local.settings.json + +# User-specific files +*.suo +*.user +*.userosscache +*.sln.docstates + +# Build results +[Dd]ebug/ +[Dd]ebugPublic/ +[Rr]elease/ +[Rr]eleases/ +x64/ +x86/ +bld/ +[Bb]in/ +[Oo]bj/ + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +.eggs/ +dist/ +build/ +*.egg +.venv/ +venv/ +.env + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Azure Functions +.python_packages/ diff --git a/samples/durable-functions/python/large-payload/README.md b/samples/durable-functions/python/large-payload/README.md new file mode 100644 index 0000000..ae0039b --- /dev/null +++ b/samples/durable-functions/python/large-payload/README.md @@ -0,0 +1,269 @@ + + +# Large Payload Support - Python Durable Functions + +This sample demonstrates how to use the **large payload storage** feature with Python Durable Functions and the [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler). When enabled, payloads that exceed a configurable threshold are automatically offloaded to Azure Blob Storage (compressed via gzip), keeping orchestration history lean while supporting arbitrarily large data. + +The sample uses a **fan-out/fan-in** pattern: the orchestrator fans out to multiple activity functions, each generating a configurable-size payload (default 100 KB). The orchestrator then aggregates the results. + +## How large payload storage works + +The Durable Task Scheduler has a per-message size limit. When `largePayloadStorageEnabled` is set to `true` in `host.json`, any orchestration input, output, or activity result that exceeds `largePayloadStorageThresholdBytes` is: + +1. Compressed with gzip +2. Uploaded to a blob container (`durabletask-payloads` by default) in the storage account configured via `AzureWebJobsStorage` +3. Replaced in the orchestration history with a small reference pointer + +This happens transparently — no code changes are required. 
+ +## Prerequisites + +- [Python 3.9+](https://www.python.org/downloads/) +- [Azure Functions Core Tools v4](https://learn.microsoft.com/azure/azure-functions/functions-run-local) +- [Azure CLI](https://learn.microsoft.com/cli/azure/install-azure-cli) +- A [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) resource with a task hub +- An Azure Storage account (for large payload blob storage) +- [Docker](https://docs.docker.com/engine/install/) (optional, for local emulator) + +## Project structure + +``` +large-payload/ +├── function_app.py # Function definitions (orchestrator, activity, HTTP trigger) +├── host.json # Host configuration with large payload storage settings +├── requirements.txt # Python dependencies +└── README.md +``` + +## Key configuration in host.json + +```json +"durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING", + "largePayloadStorageEnabled": true, + "largePayloadStorageThresholdBytes": 10240 + }, + "hubName": "%TASKHUB_NAME%" +} +``` + +| Setting | Description | Default | +|---|---|---| +| `largePayloadStorageEnabled` | Enables large payload externalization to blob storage | `false` | +| `largePayloadStorageThresholdBytes` | Payloads larger than this (in bytes) are externalized | `10240` (10 KB) | + +## Running locally with the emulator + +1. Pull and run the Durable Task Scheduler emulator: + + ```bash + docker run -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + ``` + +2. Start Azurite (local storage emulator): + + ```bash + azurite start + ``` + +3. Create a `local.settings.json` file: + + ```json + { + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "AzureWebJobsFeatureFlags": "EnableWorkerIndexing", + "FUNCTIONS_WORKER_RUNTIME": "python", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;Authentication=None", + "TASKHUB_NAME": "default", + "PAYLOAD_SIZE_KB": "100", + "ACTIVITY_COUNT": "5" + } + } + ``` + +4. Install Python dependencies: + + ```bash + pip install -r requirements.txt + ``` + +5. Start the function app: + + ```bash + func start + ``` + +6. Trigger the orchestration: + + ```bash + curl -X POST http://localhost:7071/api/startlargepayload + ``` + + The response includes a `statusQueryGetUri` you can poll to check orchestration progress. + +## Deploy to Azure + +### 1. Create Azure resources + +If you don't already have them, create the required Azure resources: + +```bash +# Create a resource group +az group create --name my-rg --location + +# Create a Durable Task Scheduler and task hub +az durabletask scheduler create --name my-scheduler --resource-group my-rg --location --sku free +az durabletask taskhub create --scheduler-name my-scheduler --resource-group my-rg --name my-taskhub + +# Create a storage account +az storage account create --name mystorageaccount --resource-group my-rg --location --sku Standard_LRS + +# Create a function app (Linux, Python 3.11) +az functionapp create \ + --name my-func-app \ + --resource-group my-rg \ + --storage-account mystorageaccount \ + --consumption-plan-location \ + --runtime python \ + --runtime-version 3.11 \ + --os-type Linux \ + --functions-version 4 +``` + +### 2. Configure identity-based authentication + +The Durable Task Scheduler **requires** identity-based authentication (managed identity). 
You can use either system-assigned or user-assigned managed identity. + +#### Option A: System-assigned managed identity + +```bash +# Enable system-assigned managed identity +az functionapp identity assign --name my-func-app --resource-group my-rg + +# Get the principal ID +PRINCIPAL_ID=$(az functionapp identity show --name my-func-app --resource-group my-rg --query principalId -o tsv) + +# Grant "Durable Task Data Contributor" role on the scheduler +SCHEDULER_ID=$(az durabletask scheduler show --name my-scheduler --resource-group my-rg --query id -o tsv) +az role assignment create --assignee $PRINCIPAL_ID --role "Durable Task Data Contributor" --scope $SCHEDULER_ID + +# Grant "Storage Blob Data Contributor" role on the storage account (for large payload blobs) +STORAGE_ID=$(az storage account show --name mystorageaccount --resource-group my-rg --query id -o tsv) +az role assignment create --assignee $PRINCIPAL_ID --role "Storage Blob Data Contributor" --scope $STORAGE_ID +``` + +Configure app settings for system-assigned identity: + +```bash +SCHEDULER_ENDPOINT=$(az durabletask scheduler show --name my-scheduler --resource-group my-rg --query endpoint -o tsv) + +az functionapp config appsettings set --name my-func-app --resource-group my-rg --settings \ + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING=Endpoint=${SCHEDULER_ENDPOINT};TaskHub=my-taskhub;Authentication=ManagedIdentity" \ + "AzureWebJobsStorage__accountName=mystorageaccount" \ + "AzureWebJobsFeatureFlags=EnableWorkerIndexing" \ + "FUNCTIONS_WORKER_RUNTIME=python" \ + "TASKHUB_NAME=my-taskhub" +``` + +> **Note:** For system-assigned identity, you only need `AzureWebJobsStorage__accountName`. No `__credential` or `__clientId` is required — the SDK uses `DefaultAzureCredential` automatically. + +#### Option B: User-assigned managed identity + +```bash +# Create a user-assigned identity +az identity create --name my-identity --resource-group my-rg + +IDENTITY_CLIENT_ID=$(az identity show --name my-identity --resource-group my-rg --query clientId -o tsv) +IDENTITY_PRINCIPAL_ID=$(az identity show --name my-identity --resource-group my-rg --query principalId -o tsv) +IDENTITY_ID=$(az identity show --name my-identity --resource-group my-rg --query id -o tsv) + +# Assign the identity to the function app +az functionapp identity assign --name my-func-app --resource-group my-rg --identities $IDENTITY_ID + +# Grant roles (same as above, using IDENTITY_PRINCIPAL_ID) +az role assignment create --assignee $IDENTITY_PRINCIPAL_ID --role "Durable Task Data Contributor" --scope $SCHEDULER_ID +az role assignment create --assignee $IDENTITY_PRINCIPAL_ID --role "Storage Blob Data Contributor" --scope $STORAGE_ID +``` + +Configure app settings for user-assigned identity: + +```bash +az functionapp config appsettings set --name my-func-app --resource-group my-rg --settings \ + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING=Endpoint=${SCHEDULER_ENDPOINT};TaskHub=my-taskhub;Authentication=ManagedIdentity;ClientId=${IDENTITY_CLIENT_ID}" \ + "AzureWebJobsStorage__accountName=mystorageaccount" \ + "AzureWebJobsStorage__credential=managedidentity" \ + "AzureWebJobsStorage__clientId=${IDENTITY_CLIENT_ID}" \ + "AzureWebJobsFeatureFlags=EnableWorkerIndexing" \ + "FUNCTIONS_WORKER_RUNTIME=python" \ + "TASKHUB_NAME=my-taskhub" +``` + +### 3. Deploy the function app + +```bash +func azure functionapp publish my-func-app +``` + +### 4. 
Test the deployment + +```bash +curl -X POST https://my-func-app.azurewebsites.net/api/startlargepayload +``` + +Poll the `statusQueryGetUri` from the response to check completion. A successful result looks like: + +```json +{ + "runtimeStatus": "Completed", + "output": { + "items_processed": 5, + "total_size_kb": 500, + "individual_sizes": [100, 100, 100, 100, 100] + } +} +``` + +### 5. Verify payload externalization + +Check that payloads were externalized to blob storage: + +```bash +az storage blob list \ + --account-name mystorageaccount \ + --container-name durabletask-payloads \ + --auth-mode login \ + --output table +``` + +You should see compressed blobs (typically ~450 bytes each for 100 KB payloads due to gzip compression of repetitive data). + +## Configuration options + +| App Setting | Description | Example | +|---|---|---| +| `PAYLOAD_SIZE_KB` | Size of each generated payload in KB | `100` | +| `ACTIVITY_COUNT` | Number of parallel activity invocations | `5` | + +## Next steps + +- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) +- [Durable Functions overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview) +- [Configure identity-based authentication](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/develop-with-durable-task-scheduler) diff --git a/samples/durable-functions/python/large-payload/function_app.py b/samples/durable-functions/python/large-payload/function_app.py new file mode 100644 index 0000000..7e2e32e --- /dev/null +++ b/samples/durable-functions/python/large-payload/function_app.py @@ -0,0 +1,105 @@ +""" +Large Payload Sample - Python Durable Functions with Durable Task Scheduler + +Demonstrates how to use the large payload storage feature to handle payloads +that exceed the Durable Task Scheduler's message size limit. When enabled, +payloads larger than the configured threshold are automatically offloaded to +Azure Blob Storage (compressed via gzip), keeping orchestration history lean +while supporting arbitrarily large data. + +This sample uses a fan-out/fan-in pattern: the orchestrator fans out to multiple +activity functions, each of which generates a large payload (configurable size). +The orchestrator then aggregates the results. 
+""" + +import json +import logging +import os +import azure.functions as func +import azure.durable_functions as df + +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +# Default payload size in KB (override via PAYLOAD_SIZE_KB app setting) +DEFAULT_PAYLOAD_SIZE_KB = 100 + +# Default number of parallel activities (override via ACTIVITY_COUNT app setting) +DEFAULT_ACTIVITY_COUNT = 5 + + +def generate_large_payload(size_kb: int) -> str: + """Generate a JSON payload of approximately the specified size in KB.""" + # Each character in the string is roughly 1 byte + target_bytes = size_kb * 1024 + filler = "x" * (target_bytes - 50) # Reserve space for JSON envelope + return json.dumps({"size_kb": size_kb, "data": filler}) + + +# --------------------------------------------------------------------------- +# HTTP Trigger – starts the orchestration +# --------------------------------------------------------------------------- +@app.route(route="startlargepayload", methods=["POST", "GET"]) +@app.durable_client_input(client_name="client") +async def start_large_payload(req: func.HttpRequest, client): + """HTTP trigger that starts the large-payload orchestration.""" + instance_id = await client.start_new("large_payload_orchestrator") + logging.info("Started orchestration with ID = '%s'.", instance_id) + return client.create_check_status_response(req, instance_id) + + +# --------------------------------------------------------------------------- +# Orchestrator – fans out to N parallel activities, each producing a large payload +# --------------------------------------------------------------------------- +@app.orchestration_trigger(context_name="context") +def large_payload_orchestrator(context: df.DurableOrchestrationContext): + """Fan-out/fan-in orchestrator that exercises large payload externalization.""" + activity_count = int(os.environ.get("ACTIVITY_COUNT", DEFAULT_ACTIVITY_COUNT)) + payload_size_kb = int(os.environ.get("PAYLOAD_SIZE_KB", DEFAULT_PAYLOAD_SIZE_KB)) + + # Fan-out: schedule N activities in parallel + tasks = [] + for i in range(activity_count): + tasks.append( + context.call_activity( + "process_large_data", + {"task_id": i, "payload_size_kb": payload_size_kb}, + ) + ) + + # Fan-in: wait for all activities to complete + results = yield context.task_all(tasks) + + # Aggregate results + total_size = sum(r["size_kb"] for r in results) + summary = { + "items_processed": len(results), + "total_size_kb": total_size, + "individual_sizes": [r["size_kb"] for r in results], + } + return summary + + +# --------------------------------------------------------------------------- +# Activity – generates and returns a large payload +# --------------------------------------------------------------------------- +@app.activity_trigger(input_name="input") +def process_large_data(input: dict) -> dict: + """Activity that generates a large payload of configurable size.""" + task_id = input["task_id"] + payload_size_kb = input["payload_size_kb"] + + logging.info("Task %d: generating %d KB payload...", task_id, payload_size_kb) + payload = generate_large_payload(payload_size_kb) + actual_size = len(payload.encode("utf-8")) + logging.info("Task %d: payload size = %d bytes", task_id, actual_size) + + return {"task_id": task_id, "size_kb": payload_size_kb, "payload": payload} + + +# --------------------------------------------------------------------------- +# Health-check endpoint +# --------------------------------------------------------------------------- +@app.route(route="hello", 
methods=["GET"]) +def hello(req: func.HttpRequest) -> func.HttpResponse: + """Simple health-check endpoint.""" + return func.HttpResponse("Hello from Large Payload Sample!") diff --git a/samples/durable-functions/python/large-payload/host.json b/samples/durable-functions/python/large-payload/host.json new file mode 100644 index 0000000..129a1ce --- /dev/null +++ b/samples/durable-functions/python/large-payload/host.json @@ -0,0 +1,29 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + }, + "logLevel": { + "Host.Triggers.DurableTask": "Information" + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING", + "largePayloadStorageEnabled": true, + "largePayloadStorageThresholdBytes": 10240 + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.29.0, 5.0.0)" + } +} diff --git a/samples/durable-functions/python/large-payload/requirements.txt b/samples/durable-functions/python/large-payload/requirements.txt new file mode 100644 index 0000000..58ba02b --- /dev/null +++ b/samples/durable-functions/python/large-payload/requirements.txt @@ -0,0 +1,6 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues + +azure-functions +azure-functions-durable From 578da2be574273e7b3bbfd0893586a3634f3d1bd Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:48:09 -0800 Subject: [PATCH 2/3] feedback --- .../LargePayload/LargePayloadOrchestration.cs | 8 ++--- .../dotnet/LargePayload/README.md | 2 +- .../java/HelloCities/.gitignore | 10 +++++++ .../classes/com/example/Functions.class | Bin 5044 -> 0 bytes .../python/large-payload/README.md | 2 +- .../python/large-payload/function_app.py | 28 +++++++++++++++--- .../python/large-payload/host.json | 2 +- 7 files changed, 41 insertions(+), 11 deletions(-) create mode 100644 samples/durable-functions/java/HelloCities/.gitignore delete mode 100644 samples/durable-functions/java/HelloCities/target/classes/com/example/Functions.class diff --git a/samples/durable-functions/dotnet/LargePayload/LargePayloadOrchestration.cs b/samples/durable-functions/dotnet/LargePayload/LargePayloadOrchestration.cs index 67e1824..b0f0028 100644 --- a/samples/durable-functions/dotnet/LargePayload/LargePayloadOrchestration.cs +++ b/samples/durable-functions/dotnet/LargePayload/LargePayloadOrchestration.cs @@ -45,7 +45,7 @@ public static async Task HttpStart( int payloadSizeKb = int.TryParse( Environment.GetEnvironmentVariable("PAYLOAD_SIZE_KB"), out int ps) ? 
ps : DefaultPayloadSizeKb; - var config = new OrchestratorConfig(activityCount, payloadSizeKb); + OrchestratorConfig config = new OrchestratorConfig(activityCount, payloadSizeKb); string instanceId = await client.ScheduleNewOrchestrationInstanceAsync( nameof(LargePayloadFanOutFanIn), config); @@ -75,7 +75,7 @@ public static async Task LargePayloadFanOutFanIn( activityCount, payloadSizeKb); // Fan-out: schedule N activities in parallel - var tasks = new List>(); + List> tasks = new List>(); for (int i = 0; i < activityCount; i++) { tasks.Add(context.CallActivityAsync( @@ -87,7 +87,7 @@ public static async Task LargePayloadFanOutFanIn( ActivityResult[] results = await Task.WhenAll(tasks); // Aggregate results - var summary = new PayloadSummary( + PayloadSummary summary = new PayloadSummary( ItemsProcessed: results.Length, TotalSizeKb: results.Sum(r => r.SizeKb), IndividualSizes: results.Select(r => r.SizeKb).ToArray()); @@ -129,7 +129,7 @@ public static ActivityResult ProcessLargeData( public static HttpResponseData Hello( [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestData req) { - var response = req.CreateResponse(System.Net.HttpStatusCode.OK); + HttpResponseData response = req.CreateResponse(System.Net.HttpStatusCode.OK); response.WriteString("Hello from Large Payload Sample!"); return response; } diff --git a/samples/durable-functions/dotnet/LargePayload/README.md b/samples/durable-functions/dotnet/LargePayload/README.md index 739698a..17812ef 100644 --- a/samples/durable-functions/dotnet/LargePayload/README.md +++ b/samples/durable-functions/dotnet/LargePayload/README.md @@ -145,7 +145,7 @@ If you don't already have them, create the required Azure resources: az group create --name my-rg --location # Create a Durable Task Scheduler and task hub -az durabletask scheduler create --name my-scheduler --resource-group my-rg --location --sku free +az durabletask scheduler create --name my-scheduler --resource-group my-rg --location --sku-name az durabletask taskhub create --scheduler-name my-scheduler --resource-group my-rg --name my-taskhub # Create a storage account diff --git a/samples/durable-functions/java/HelloCities/.gitignore b/samples/durable-functions/java/HelloCities/.gitignore new file mode 100644 index 0000000..90a4a03 --- /dev/null +++ b/samples/durable-functions/java/HelloCities/.gitignore @@ -0,0 +1,10 @@ +# Build output +target/ + +# IDE +.idea/ +*.iml +.vscode/ + +# OS +.DS_Store diff --git a/samples/durable-functions/java/HelloCities/target/classes/com/example/Functions.class b/samples/durable-functions/java/HelloCities/target/classes/com/example/Functions.class deleted file mode 100644 index 1d8aa10d53ebc17c753a6c09a25d5a1b5186e1d7..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 5044 zcmc&%`*##)8GgQGcV|f!h)V-3RvIW^LPA$5Ld{|@mxiQnHjzz(3spPpek4OCGwIB1 zAb6?R+Ip=w)LOM#FU24A7bCgTT9{&^Z`DSLbvzH|F$T^;z%+Abvf8XbQ-*5i; zpZo6vIDXpDa4Tw*qt{lS*WgDkG^!bNDjmEg7!B zzISgttRYzv8*8I&*dEV*;`BGK7-i&lz zeNpC`Bq~)$dEn{pav)t7o(M?erXekF+_pSR2B(O9%}kIo7F z?ZZ2LHeb*Af^AfCBmbW;G8d|rXROEt!!?++QOmMD-A9JNnQaoIZ!mA*si3b&RyDq+ zn^h)eXR)!mha0WU=??il=`@%<{qSZL80H;gaZx(78ndX4h;`&k0vZ)B*%gBC_%`~K zk{O>_>6n`>&a=9D)mzHTHE9ZbW*hlP8b&E)JH|~vgT6Rb?iel17D_X-&lg*DJuFtI zt<|bWUAsbo5W8J?5=S*0p<>lw^{#nXy`i2Ij^X3RymRhYPHK_-@X%T5HbU z!%Y>9*j1xV;N&)_YcmH0&NT@|OXfSU%_!vDFZ;7m;KUZHT53gPb{SzkrYmX}v}9jk zZ{vbkU#%^Pi6&F+QxzTgZKkJMaN!ndfQi6Vb3LP3Q3!6}uWyAmAxt)T8r9T3zYV(U zqpGv)-AH0g07*;;B#Zj`lr&8{iAhe+d3$-?)-ctunFADe<2mFte2xIwyq7i$Duo#o 
z1XA47<|z07n&GVrTx`=mk@^KEf+)hrP2F`{)@C#p%7I)L=5Rs7=LHUK?P>}|aOTGK z>$*YYKgP*Z&wXe<&6CVR3K#JOo+eiGWyv~~b?>6#E%8oG0*|BqnXNXCxfyR!s)hLpL37QnzNS>N;)l6^Bp5 zt0dSy`y)IJ03zvXUA&1j`M==2>KSG>Z@6m7TzVvmqCFZWeuP`XV4=D94L6Bp=4MPf zreSGV5!mAw-FIZvarAZNO&3h;=*B7>4P5HG>MV+yf(z!gQ?D>-z0ul-s2RAVWYT>%VY8&W z1-|po%7js(Y z0U&EP*`)34HX?1SG(ts3S`^iLa-*Ev_e5L$!NIV+JfpAr-w7ICVRhVa1`3RB;dJ{k zTbpLJ37h7p%1_j|;0dCuXjh%GoHW!|%C3zspQltjO#hf|doE8Z;Th2F!Y}Yk4R109 z^@t5?&0dz-An;)HOzLGqXAp3qKSfpOIK;Wy$vF-rc*oC3UNt^t`O8g+`0@9U{3~xd&=oQP z-TVVm0Rw+`@`+`{Zp6^z^L_+sIdt>w9{xY(q8EGlqQ8Sr`)F{#iwq82hLHFu9>>Qd z4&n*Y`uI=UA>W5vWQ5AhfwPg67lel#gX zKgWN1{8``Qx2aZTb1pqPGc=eUPd}SJe-G0!+{Nq%IDa3{0|BAbM7UfdywF7Wk|L;J zg94~9fy1=&B=dHJK^;YbUka)qeSz(bpy%k-m+4+7j~Fj9K$U!bt0G-_1Vt)uRiu;j z{%I!s)FUpE#MKSxEjAac(!N;Y7M)2lZMT#E!1+>V`q^SBmM#@b@$?JDQX;)nENSU$ z#hJguEWHP-FjN}W-ann_JRQG-zKqrzKRFyb!5U{`Va4vj?!d)YC8`TX8QUU*Wc^7bfZ-eXmf0Q z_yZh(7Nsbn#$Ck@g4t33rhBFFJ#QegFUf diff --git a/samples/durable-functions/python/large-payload/README.md b/samples/durable-functions/python/large-payload/README.md index ae0039b..03194f9 100644 --- a/samples/durable-functions/python/large-payload/README.md +++ b/samples/durable-functions/python/large-payload/README.md @@ -130,7 +130,7 @@ If you don't already have them, create the required Azure resources: az group create --name my-rg --location # Create a Durable Task Scheduler and task hub -az durabletask scheduler create --name my-scheduler --resource-group my-rg --location --sku free +az durabletask scheduler create --name my-scheduler --resource-group my-rg --location --sku-name az durabletask taskhub create --scheduler-name my-scheduler --resource-group my-rg --name my-taskhub # Create a storage account diff --git a/samples/durable-functions/python/large-payload/function_app.py b/samples/durable-functions/python/large-payload/function_app.py index 7e2e32e..bf4ce7e 100644 --- a/samples/durable-functions/python/large-payload/function_app.py +++ b/samples/durable-functions/python/large-payload/function_app.py @@ -31,7 +31,7 @@ def generate_large_payload(size_kb: int) -> str: """Generate a JSON payload of approximately the specified size in KB.""" # Each character in the string is roughly 1 byte target_bytes = size_kb * 1024 - filler = "x" * (target_bytes - 50) # Reserve space for JSON envelope + filler = "x" * max(0, target_bytes - 50) # Reserve space for JSON envelope return json.dumps({"size_kb": size_kb, "data": filler}) @@ -42,7 +42,24 @@ def generate_large_payload(size_kb: int) -> str: @app.durable_client_input(client_name="client") async def start_large_payload(req: func.HttpRequest, client): """HTTP trigger that starts the large-payload orchestration.""" - instance_id = await client.start_new("large_payload_orchestrator") + try: + body = req.get_json() + except ValueError: + body = {} + + activity_count = int( + req.params.get("activity_count") + or body.get("activity_count") + or DEFAULT_ACTIVITY_COUNT + ) + payload_size_kb = int( + req.params.get("payload_size_kb") + or body.get("payload_size_kb") + or DEFAULT_PAYLOAD_SIZE_KB + ) + + config = {"activity_count": activity_count, "payload_size_kb": payload_size_kb} + instance_id = await client.start_new("large_payload_orchestrator", client_input=config) logging.info("Started orchestration with ID = '%s'.", instance_id) return client.create_check_status_response(req, instance_id) @@ -53,8 +70,11 @@ async def start_large_payload(req: func.HttpRequest, client): @app.orchestration_trigger(context_name="context") def large_payload_orchestrator(context: 
df.DurableOrchestrationContext): """Fan-out/fan-in orchestrator that exercises large payload externalization.""" - activity_count = int(os.environ.get("ACTIVITY_COUNT", DEFAULT_ACTIVITY_COUNT)) - payload_size_kb = int(os.environ.get("PAYLOAD_SIZE_KB", DEFAULT_PAYLOAD_SIZE_KB)) + # Read config from orchestration input (set by the HTTP trigger) + # to avoid non-deterministic environment variable access in the orchestrator. + config = context.get_input() or {} + activity_count = config.get("activity_count", DEFAULT_ACTIVITY_COUNT) + payload_size_kb = config.get("payload_size_kb", DEFAULT_PAYLOAD_SIZE_KB) # Fan-out: schedule N activities in parallel tasks = [] diff --git a/samples/durable-functions/python/large-payload/host.json b/samples/durable-functions/python/large-payload/host.json index 129a1ce..f3ffbfc 100644 --- a/samples/durable-functions/python/large-payload/host.json +++ b/samples/durable-functions/python/large-payload/host.json @@ -23,7 +23,7 @@ } }, "extensionBundle": { - "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "id": "Microsoft.Azure.Functions.ExtensionBundle", "version": "[4.29.0, 5.0.0)" } } From 6437b724daf9bb0cbe8611a1796bd3e81321ae7c Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:51:18 -0800 Subject: [PATCH 3/3] remove python sample --- .../python/large-payload/.gitignore | 46 --- .../python/large-payload/README.md | 269 ------------------ .../python/large-payload/function_app.py | 125 -------- .../python/large-payload/host.json | 29 -- .../python/large-payload/requirements.txt | 6 - 5 files changed, 475 deletions(-) delete mode 100644 samples/durable-functions/python/large-payload/.gitignore delete mode 100644 samples/durable-functions/python/large-payload/README.md delete mode 100644 samples/durable-functions/python/large-payload/function_app.py delete mode 100644 samples/durable-functions/python/large-payload/host.json delete mode 100644 samples/durable-functions/python/large-payload/requirements.txt diff --git a/samples/durable-functions/python/large-payload/.gitignore b/samples/durable-functions/python/large-payload/.gitignore deleted file mode 100644 index 19a37b2..0000000 --- a/samples/durable-functions/python/large-payload/.gitignore +++ /dev/null @@ -1,46 +0,0 @@ -## Ignore Visual Studio temporary files, build results, and -## files generated by popular Visual Studio add-ons. - -.DS_Store - -# Azure Functions localsettings file -local.settings.json - -# User-specific files -*.suo -*.user -*.userosscache -*.sln.docstates - -# Build results -[Dd]ebug/ -[Dd]ebugPublic/ -[Rr]elease/ -[Rr]eleases/ -x64/ -x86/ -bld/ -[Bb]in/ -[Oo]bj/ - -# Python -__pycache__/ -*.py[cod] -*$py.class -*.egg-info/ -.eggs/ -dist/ -build/ -*.egg -.venv/ -venv/ -.env - -# IDE -.idea/ -.vscode/ -*.swp -*.swo - -# Azure Functions -.python_packages/ diff --git a/samples/durable-functions/python/large-payload/README.md b/samples/durable-functions/python/large-payload/README.md deleted file mode 100644 index 03194f9..0000000 --- a/samples/durable-functions/python/large-payload/README.md +++ /dev/null @@ -1,269 +0,0 @@ - - -# Large Payload Support - Python Durable Functions - -This sample demonstrates how to use the **large payload storage** feature with Python Durable Functions and the [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler). 
When enabled, payloads that exceed a configurable threshold are automatically offloaded to Azure Blob Storage (compressed via gzip), keeping orchestration history lean while supporting arbitrarily large data. - -The sample uses a **fan-out/fan-in** pattern: the orchestrator fans out to multiple activity functions, each generating a configurable-size payload (default 100 KB). The orchestrator then aggregates the results. - -## How large payload storage works - -The Durable Task Scheduler has a per-message size limit. When `largePayloadStorageEnabled` is set to `true` in `host.json`, any orchestration input, output, or activity result that exceeds `largePayloadStorageThresholdBytes` is: - -1. Compressed with gzip -2. Uploaded to a blob container (`durabletask-payloads` by default) in the storage account configured via `AzureWebJobsStorage` -3. Replaced in the orchestration history with a small reference pointer - -This happens transparently — no code changes are required. - -## Prerequisites - -- [Python 3.9+](https://www.python.org/downloads/) -- [Azure Functions Core Tools v4](https://learn.microsoft.com/azure/azure-functions/functions-run-local) -- [Azure CLI](https://learn.microsoft.com/cli/azure/install-azure-cli) -- A [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) resource with a task hub -- An Azure Storage account (for large payload blob storage) -- [Docker](https://docs.docker.com/engine/install/) (optional, for local emulator) - -## Project structure - -``` -large-payload/ -├── function_app.py # Function definitions (orchestrator, activity, HTTP trigger) -├── host.json # Host configuration with large payload storage settings -├── requirements.txt # Python dependencies -└── README.md -``` - -## Key configuration in host.json - -```json -"durableTask": { - "storageProvider": { - "type": "azureManaged", - "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING", - "largePayloadStorageEnabled": true, - "largePayloadStorageThresholdBytes": 10240 - }, - "hubName": "%TASKHUB_NAME%" -} -``` - -| Setting | Description | Default | -|---|---|---| -| `largePayloadStorageEnabled` | Enables large payload externalization to blob storage | `false` | -| `largePayloadStorageThresholdBytes` | Payloads larger than this (in bytes) are externalized | `10240` (10 KB) | - -## Running locally with the emulator - -1. Pull and run the Durable Task Scheduler emulator: - - ```bash - docker run -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest - ``` - -2. Start Azurite (local storage emulator): - - ```bash - azurite start - ``` - -3. Create a `local.settings.json` file: - - ```json - { - "IsEncrypted": false, - "Values": { - "AzureWebJobsStorage": "UseDevelopmentStorage=true", - "AzureWebJobsFeatureFlags": "EnableWorkerIndexing", - "FUNCTIONS_WORKER_RUNTIME": "python", - "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;Authentication=None", - "TASKHUB_NAME": "default", - "PAYLOAD_SIZE_KB": "100", - "ACTIVITY_COUNT": "5" - } - } - ``` - -4. Install Python dependencies: - - ```bash - pip install -r requirements.txt - ``` - -5. Start the function app: - - ```bash - func start - ``` - -6. Trigger the orchestration: - - ```bash - curl -X POST http://localhost:7071/api/startlargepayload - ``` - - The response includes a `statusQueryGetUri` you can poll to check orchestration progress. - -## Deploy to Azure - -### 1. 
Create Azure resources - -If you don't already have them, create the required Azure resources: - -```bash -# Create a resource group -az group create --name my-rg --location - -# Create a Durable Task Scheduler and task hub -az durabletask scheduler create --name my-scheduler --resource-group my-rg --location --sku-name -az durabletask taskhub create --scheduler-name my-scheduler --resource-group my-rg --name my-taskhub - -# Create a storage account -az storage account create --name mystorageaccount --resource-group my-rg --location --sku Standard_LRS - -# Create a function app (Linux, Python 3.11) -az functionapp create \ - --name my-func-app \ - --resource-group my-rg \ - --storage-account mystorageaccount \ - --consumption-plan-location \ - --runtime python \ - --runtime-version 3.11 \ - --os-type Linux \ - --functions-version 4 -``` - -### 2. Configure identity-based authentication - -The Durable Task Scheduler **requires** identity-based authentication (managed identity). You can use either system-assigned or user-assigned managed identity. - -#### Option A: System-assigned managed identity - -```bash -# Enable system-assigned managed identity -az functionapp identity assign --name my-func-app --resource-group my-rg - -# Get the principal ID -PRINCIPAL_ID=$(az functionapp identity show --name my-func-app --resource-group my-rg --query principalId -o tsv) - -# Grant "Durable Task Data Contributor" role on the scheduler -SCHEDULER_ID=$(az durabletask scheduler show --name my-scheduler --resource-group my-rg --query id -o tsv) -az role assignment create --assignee $PRINCIPAL_ID --role "Durable Task Data Contributor" --scope $SCHEDULER_ID - -# Grant "Storage Blob Data Contributor" role on the storage account (for large payload blobs) -STORAGE_ID=$(az storage account show --name mystorageaccount --resource-group my-rg --query id -o tsv) -az role assignment create --assignee $PRINCIPAL_ID --role "Storage Blob Data Contributor" --scope $STORAGE_ID -``` - -Configure app settings for system-assigned identity: - -```bash -SCHEDULER_ENDPOINT=$(az durabletask scheduler show --name my-scheduler --resource-group my-rg --query endpoint -o tsv) - -az functionapp config appsettings set --name my-func-app --resource-group my-rg --settings \ - "DURABLE_TASK_SCHEDULER_CONNECTION_STRING=Endpoint=${SCHEDULER_ENDPOINT};TaskHub=my-taskhub;Authentication=ManagedIdentity" \ - "AzureWebJobsStorage__accountName=mystorageaccount" \ - "AzureWebJobsFeatureFlags=EnableWorkerIndexing" \ - "FUNCTIONS_WORKER_RUNTIME=python" \ - "TASKHUB_NAME=my-taskhub" -``` - -> **Note:** For system-assigned identity, you only need `AzureWebJobsStorage__accountName`. No `__credential` or `__clientId` is required — the SDK uses `DefaultAzureCredential` automatically. 
- -#### Option B: User-assigned managed identity - -```bash -# Create a user-assigned identity -az identity create --name my-identity --resource-group my-rg - -IDENTITY_CLIENT_ID=$(az identity show --name my-identity --resource-group my-rg --query clientId -o tsv) -IDENTITY_PRINCIPAL_ID=$(az identity show --name my-identity --resource-group my-rg --query principalId -o tsv) -IDENTITY_ID=$(az identity show --name my-identity --resource-group my-rg --query id -o tsv) - -# Assign the identity to the function app -az functionapp identity assign --name my-func-app --resource-group my-rg --identities $IDENTITY_ID - -# Grant roles (same as above, using IDENTITY_PRINCIPAL_ID) -az role assignment create --assignee $IDENTITY_PRINCIPAL_ID --role "Durable Task Data Contributor" --scope $SCHEDULER_ID -az role assignment create --assignee $IDENTITY_PRINCIPAL_ID --role "Storage Blob Data Contributor" --scope $STORAGE_ID -``` - -Configure app settings for user-assigned identity: - -```bash -az functionapp config appsettings set --name my-func-app --resource-group my-rg --settings \ - "DURABLE_TASK_SCHEDULER_CONNECTION_STRING=Endpoint=${SCHEDULER_ENDPOINT};TaskHub=my-taskhub;Authentication=ManagedIdentity;ClientId=${IDENTITY_CLIENT_ID}" \ - "AzureWebJobsStorage__accountName=mystorageaccount" \ - "AzureWebJobsStorage__credential=managedidentity" \ - "AzureWebJobsStorage__clientId=${IDENTITY_CLIENT_ID}" \ - "AzureWebJobsFeatureFlags=EnableWorkerIndexing" \ - "FUNCTIONS_WORKER_RUNTIME=python" \ - "TASKHUB_NAME=my-taskhub" -``` - -### 3. Deploy the function app - -```bash -func azure functionapp publish my-func-app -``` - -### 4. Test the deployment - -```bash -curl -X POST https://my-func-app.azurewebsites.net/api/startlargepayload -``` - -Poll the `statusQueryGetUri` from the response to check completion. A successful result looks like: - -```json -{ - "runtimeStatus": "Completed", - "output": { - "items_processed": 5, - "total_size_kb": 500, - "individual_sizes": [100, 100, 100, 100, 100] - } -} -``` - -### 5. Verify payload externalization - -Check that payloads were externalized to blob storage: - -```bash -az storage blob list \ - --account-name mystorageaccount \ - --container-name durabletask-payloads \ - --auth-mode login \ - --output table -``` - -You should see compressed blobs (typically ~450 bytes each for 100 KB payloads due to gzip compression of repetitive data). 
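Beyond listing the blobs, you can also spot-check the compression ratio. The sketch below is illustrative only and assumes the payload blobs are plain gzip streams (the actual blob layout is an implementation detail of the extension); it reuses the storage account and container names from the steps above.

```python
# Illustrative check (not part of the sample): compare stored vs. decompressed sizes
# of externalized payload blobs. Requires azure-identity and azure-storage-blob.
import gzip

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient

account_url = "https://mystorageaccount.blob.core.windows.net"  # storage account from the steps above
container_name = "durabletask-payloads"  # default container for externalized payloads

service = BlobServiceClient(account_url, credential=DefaultAzureCredential())
container = service.get_container_client(container_name)

for blob in container.list_blobs():
    data = container.download_blob(blob.name).readall()
    try:
        decompressed = len(gzip.decompress(data))  # assumes the blob is a raw gzip stream
    except OSError:
        decompressed = len(data)  # not gzip, or a different internal framing
    print(f"{blob.name}: {len(data)} bytes stored, ~{decompressed} bytes decompressed")
```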
- -## Configuration options - -| App Setting | Description | Example | -|---|---|---| -| `PAYLOAD_SIZE_KB` | Size of each generated payload in KB | `100` | -| `ACTIVITY_COUNT` | Number of parallel activity invocations | `5` | - -## Next steps - -- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/durable-task-scheduler) -- [Durable Functions overview](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview) -- [Configure identity-based authentication](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler/develop-with-durable-task-scheduler) diff --git a/samples/durable-functions/python/large-payload/function_app.py b/samples/durable-functions/python/large-payload/function_app.py deleted file mode 100644 index bf4ce7e..0000000 --- a/samples/durable-functions/python/large-payload/function_app.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -Large Payload Sample - Python Durable Functions with Durable Task Scheduler - -Demonstrates how to use the large payload storage feature to handle payloads -that exceed the Durable Task Scheduler's message size limit. When enabled, -payloads larger than the configured threshold are automatically offloaded to -Azure Blob Storage (compressed via gzip), keeping orchestration history lean -while supporting arbitrarily large data. - -This sample uses a fan-out/fan-in pattern: the orchestrator fans out to multiple -activity functions, each of which generates a large payload (configurable size). -The orchestrator then aggregates the results. -""" - -import json -import logging -import os -import azure.functions as func -import azure.durable_functions as df - -app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) - -# Default payload size in KB (override via PAYLOAD_SIZE_KB app setting) -DEFAULT_PAYLOAD_SIZE_KB = 100 - -# Default number of parallel activities (override via ACTIVITY_COUNT app setting) -DEFAULT_ACTIVITY_COUNT = 5 - - -def generate_large_payload(size_kb: int) -> str: - """Generate a JSON payload of approximately the specified size in KB.""" - # Each character in the string is roughly 1 byte - target_bytes = size_kb * 1024 - filler = "x" * max(0, target_bytes - 50) # Reserve space for JSON envelope - return json.dumps({"size_kb": size_kb, "data": filler}) - - -# --------------------------------------------------------------------------- -# HTTP Trigger – starts the orchestration -# --------------------------------------------------------------------------- -@app.route(route="startlargepayload", methods=["POST", "GET"]) -@app.durable_client_input(client_name="client") -async def start_large_payload(req: func.HttpRequest, client): - """HTTP trigger that starts the large-payload orchestration.""" - try: - body = req.get_json() - except ValueError: - body = {} - - activity_count = int( - req.params.get("activity_count") - or body.get("activity_count") - or DEFAULT_ACTIVITY_COUNT - ) - payload_size_kb = int( - req.params.get("payload_size_kb") - or body.get("payload_size_kb") - or DEFAULT_PAYLOAD_SIZE_KB - ) - - config = {"activity_count": activity_count, "payload_size_kb": payload_size_kb} - instance_id = await client.start_new("large_payload_orchestrator", client_input=config) - logging.info("Started orchestration with ID = '%s'.", instance_id) - return client.create_check_status_response(req, instance_id) - - -# --------------------------------------------------------------------------- -# Orchestrator – fans out to N parallel 
activities, each producing a large payload -# --------------------------------------------------------------------------- -@app.orchestration_trigger(context_name="context") -def large_payload_orchestrator(context: df.DurableOrchestrationContext): - """Fan-out/fan-in orchestrator that exercises large payload externalization.""" - # Read config from orchestration input (set by the HTTP trigger) - # to avoid non-deterministic environment variable access in the orchestrator. - config = context.get_input() or {} - activity_count = config.get("activity_count", DEFAULT_ACTIVITY_COUNT) - payload_size_kb = config.get("payload_size_kb", DEFAULT_PAYLOAD_SIZE_KB) - - # Fan-out: schedule N activities in parallel - tasks = [] - for i in range(activity_count): - tasks.append( - context.call_activity( - "process_large_data", - {"task_id": i, "payload_size_kb": payload_size_kb}, - ) - ) - - # Fan-in: wait for all activities to complete - results = yield context.task_all(tasks) - - # Aggregate results - total_size = sum(r["size_kb"] for r in results) - summary = { - "items_processed": len(results), - "total_size_kb": total_size, - "individual_sizes": [r["size_kb"] for r in results], - } - return summary - - -# --------------------------------------------------------------------------- -# Activity – generates and returns a large payload -# --------------------------------------------------------------------------- -@app.activity_trigger(input_name="input") -def process_large_data(input: dict) -> dict: - """Activity that generates a large payload of configurable size.""" - task_id = input["task_id"] - payload_size_kb = input["payload_size_kb"] - - logging.info("Task %d: generating %d KB payload...", task_id, payload_size_kb) - payload = generate_large_payload(payload_size_kb) - actual_size = len(payload.encode("utf-8")) - logging.info("Task %d: payload size = %d bytes", task_id, actual_size) - - return {"task_id": task_id, "size_kb": payload_size_kb, "payload": payload} - - -# --------------------------------------------------------------------------- -# Health-check endpoint -# --------------------------------------------------------------------------- -@app.route(route="hello", methods=["GET"]) -def hello(req: func.HttpRequest) -> func.HttpResponse: - """Simple health-check endpoint.""" - return func.HttpResponse("Hello from Large Payload Sample!") diff --git a/samples/durable-functions/python/large-payload/host.json b/samples/durable-functions/python/large-payload/host.json deleted file mode 100644 index f3ffbfc..0000000 --- a/samples/durable-functions/python/large-payload/host.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "version": "2.0", - "logging": { - "applicationInsights": { - "samplingSettings": { - "isEnabled": true, - "excludedTypes": "Request" - } - }, - "logLevel": { - "Host.Triggers.DurableTask": "Information" - } - }, - "extensions": { - "durableTask": { - "storageProvider": { - "type": "azureManaged", - "connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING", - "largePayloadStorageEnabled": true, - "largePayloadStorageThresholdBytes": 10240 - }, - "hubName": "%TASKHUB_NAME%" - } - }, - "extensionBundle": { - "id": "Microsoft.Azure.Functions.ExtensionBundle", - "version": "[4.29.0, 5.0.0)" - } -} diff --git a/samples/durable-functions/python/large-payload/requirements.txt b/samples/durable-functions/python/large-payload/requirements.txt deleted file mode 100644 index 58ba02b..0000000 --- a/samples/durable-functions/python/large-payload/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -# 
DO NOT include azure-functions-worker in this file -# The Python Worker is managed by Azure Functions platform -# Manually managing azure-functions-worker may cause unexpected issues - -azure-functions -azure-functions-durable
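For completeness, here is a small client-side sketch for exercising the HTTP trigger defined in `function_app.py`. It is not part of the patch; the base URL assumes a local `func start` session, and the `activity_count`/`payload_size_kb` body fields are the per-request overrides the trigger accepts in place of its defaults.

```python
# Hypothetical client (not part of the patch): start the orchestration with overrides
# and poll the status endpoint until it reaches a terminal state. Uses only the stdlib.
import json
import time
import urllib.request

BASE_URL = "http://localhost:7071"  # assumes `func start` is running locally

body = json.dumps({"activity_count": 3, "payload_size_kb": 256}).encode("utf-8")
request = urllib.request.Request(
    f"{BASE_URL}/api/startlargepayload",
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(request) as response:
    check_status = json.load(response)

# statusQueryGetUri is part of the standard check-status payload returned by the trigger.
while True:
    with urllib.request.urlopen(check_status["statusQueryGetUri"]) as response:
        status = json.load(response)
    if status["runtimeStatus"] in ("Completed", "Failed", "Terminated"):
        break
    time.sleep(2)

print(status["runtimeStatus"])
print(json.dumps(status.get("output"), indent=2))
```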