diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index 0b05c418..9a00fb4b 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -127,6 +127,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ActivityVersioningSample", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EntityWithVersionedOrchestrationSample", "samples\EntityWithVersionedOrchestrationSample\EntityWithVersionedOrchestrationSample.csproj", "{8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "UnversionedFallbackSample", "samples\UnversionedFallbackSample\UnversionedFallbackSample.csproj", "{1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -761,6 +763,18 @@ Global {8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2}.Release|x64.Build.0 = Release|Any CPU {8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2}.Release|x86.ActiveCfg = Release|Any CPU {8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2}.Release|x86.Build.0 = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|x64.ActiveCfg = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|x64.Build.0 = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|x86.ActiveCfg = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Debug|x86.Build.0 = Debug|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|Any CPU.Build.0 = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|x64.ActiveCfg = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|x64.Build.0 = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|x86.ActiveCfg = Release|Any CPU + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -824,6 +838,7 @@ Global {1E30F09F-1ADA-4375-81CC-F0FBC74D5621} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {3FBCFDBA-F547-4FD5-B8C6-0B645EF73E3A} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {8E0D27B3-2B5D-4B6F-A4E6-5C8E7B0F7DD2} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} + {1C0E65CE-1B36-48BB-B688-FA3AAFDE8A25} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/README.md b/README.md index 84686d37..8090c181 100644 --- a/README.md +++ b/README.md @@ -196,7 +196,7 @@ Durable Task Scheduler provides durable execution in Azure. Durable execution is This SDK can also be used with the Durable Task Scheduler directly, without any Durable Functions dependency. For getting started, you can find documentation and samples [here](https://learn.microsoft.com/en-us/azure/azure-functions/durable/what-is-durable-task). -For runnable DTS emulator examples that demonstrate versioning, see the [WorkerVersioningSample](samples/WorkerVersioningSample/README.md) (deployment-based versioning), the [EternalOrchestrationVersionMigrationSample](samples/EternalOrchestrationVersionMigrationSample/README.md) (multi-version routing with `[DurableTask(Version = "...")]`), the [ActivityVersioningSample](samples/ActivityVersioningSample/README.md) (activity versioning with inherited defaults and explicit override support), and the [EntityWithVersionedOrchestrationSample](samples/EntityWithVersionedOrchestrationSample/README.md) (a single instance migrating v1→v2 via `ContinueAsNew(NewVersion)` while preserving entity-held state). +For runnable DTS emulator examples that demonstrate versioning, see the [WorkerVersioningSample](samples/WorkerVersioningSample/README.md) (deployment-based versioning), the [EternalOrchestrationVersionMigrationSample](samples/EternalOrchestrationVersionMigrationSample/README.md) (multi-version routing with `[DurableTask(Version = "...")]`), the [ActivityVersioningSample](samples/ActivityVersioningSample/README.md) (activity versioning with inherited defaults and explicit override support), the [EntityWithVersionedOrchestrationSample](samples/EntityWithVersionedOrchestrationSample/README.md) (a single instance migrating v1→v2 via `ContinueAsNew(NewVersion)` while preserving entity-held state), and the [UnversionedFallbackSample](samples/UnversionedFallbackSample/README.md) (an unversioned catch-all for unmatched explicit versions). ## Obtaining the Protobuf definitions diff --git a/samples/UnversionedFallbackSample/Program.cs b/samples/UnversionedFallbackSample/Program.cs new file mode 100644 index 00000000..e15680af --- /dev/null +++ b/samples/UnversionedFallbackSample/Program.cs @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +// This sample demonstrates opt-in unversioned fallback for per-task versioning. +// A worker can register one explicit legacy implementation for a known version +// and an unversioned implementation as the catch-all for versions that do not +// have an explicit [DurableTask(Version = "...")] registration. + +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +HostApplicationBuilder builder = Host.CreateApplicationBuilder(args); + +string connectionString = builder.Configuration.GetValue("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? throw new InvalidOperationException( + "Set DURABLE_TASK_SCHEDULER_CONNECTION_STRING. " + + "For the local emulator: Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"); + +builder.Services.AddDurableTaskWorker(wb => +{ + wb.AddTasks(tasks => tasks.AddAllGeneratedTasks()); + wb.UseVersioning(new DurableTaskWorkerOptions.VersioningOptions + { + UnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch, + }); + wb.UseWorkItemFilters(); + wb.UseDurableTaskScheduler(connectionString); +}); + +builder.Services.AddDurableTaskClient(cb => cb.UseDurableTaskScheduler(connectionString)); + +IHost host = builder.Build(); +await host.StartAsync(); + +await using DurableTaskClient client = host.Services.GetRequiredService(); + +Console.WriteLine("=== Unversioned fallback for versioned task dispatch ==="); +Console.WriteLine(); + +SupportRequest request = new("Contoso", "BGP session down"); + +Console.WriteLine("Scheduling SupportWorkflow version 1.4.0 ..."); +string legacyId = await client.ScheduleNewOrchestrationInstanceAsync( + nameof(SupportWorkflow), + request, + new StartOrchestrationOptions + { + Version = new TaskVersion("1.4.0"), + }); +OrchestrationMetadata legacy = await client.WaitForInstanceCompletionAsync(legacyId, getInputsAndOutputs: true); +Console.WriteLine($" Result: {legacy.ReadOutputAs()}"); +Console.WriteLine(); + +Console.WriteLine("Scheduling SupportWorkflow version 1.0 ..."); +string fallbackId = await client.ScheduleNewOrchestrationInstanceAsync( + nameof(SupportWorkflow), + request, + new StartOrchestrationOptions + { + Version = new TaskVersion("1.0"), + }); +OrchestrationMetadata fallback = await client.WaitForInstanceCompletionAsync(fallbackId, getInputsAndOutputs: true); +Console.WriteLine($" Result: {fallback.ReadOutputAs()}"); +Console.WriteLine(); + +Console.WriteLine("Done! Version 1.4.0 used the explicit legacy class; version 1.0 used the unversioned fallback."); + +await host.StopAsync(); + +/// +/// The current implementation. With UnversionedFallback enabled, this unversioned registration handles every +/// requested SupportWorkflow version that does not have an exact explicit registration. +/// +[DurableTask(nameof(SupportWorkflow))] +public sealed class SupportWorkflow : TaskOrchestrator +{ + /// + public override Task RunAsync(TaskOrchestrationContext context, SupportRequest input) + { + return Task.FromResult( + $"Current SupportWorkflow handled version '{context.Version}' for {input.Customer}: {input.Issue}"); + } +} + +/// +/// A pinned legacy implementation for version 1.4.0. +/// +[DurableTask(nameof(SupportWorkflow), Version = "1.4.0")] +public sealed class SupportWorkflowLegacyV140 : TaskOrchestrator +{ + /// + public override Task RunAsync(TaskOrchestrationContext context, SupportRequest input) + { + return Task.FromResult( + $"Legacy SupportWorkflow 1.4.0 handled version '{context.Version}' for {input.Customer}: {input.Issue}"); + } +} + +/// +/// Request input for the support workflow. +/// +public sealed record SupportRequest(string Customer, string Issue); diff --git a/samples/UnversionedFallbackSample/README.md b/samples/UnversionedFallbackSample/README.md new file mode 100644 index 00000000..ad2dcbc8 --- /dev/null +++ b/samples/UnversionedFallbackSample/README.md @@ -0,0 +1,78 @@ +# Unversioned Fallback Sample + +This sample demonstrates opt-in unversioned fallback for per-task versioning. It shows how one explicit versioned class can coexist with an unversioned catch-all implementation for versions that do not have their own `[DurableTask(Version = "...")]` registration. + +## What it shows + +- `SupportWorkflowLegacyV140` is registered as `[DurableTask(nameof(SupportWorkflow), Version = "1.4.0")]`. +- `SupportWorkflow` is registered without a version and acts as the current catch-all implementation. +- The worker enables `DurableTaskWorkerOptions.VersioningOptions.UnversionedFallback = WhenNoExactMatch`. +- `UseWorkItemFilters()` is enabled, so the generated filter must allow unmatched versions to reach the worker. +- A version `1.4.0` request dispatches to the explicit legacy class. +- A version `1.0` request has no exact registration, so it dispatches to the unversioned fallback class. + +## Prerequisites + +- .NET 10.0 SDK +- [Docker](https://www.docker.com/get-started) + +## Running the Sample + +### 1. Start the DTS emulator + +```bash +docker run --name durabletask-emulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest +``` + +The emulator exposes the gRPC sidecar on port 8080 and the local dashboard on port 8082. After running the sample below, you can open the dashboard at to inspect the orchestrations and their versions. + +### 2. Set the connection string + +```bash +export DURABLE_TASK_SCHEDULER_CONNECTION_STRING="Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" +``` + +PowerShell: + +```powershell +$env:DURABLE_TASK_SCHEDULER_CONNECTION_STRING = "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None" +``` + +### 3. Run the sample + +```bash +dotnet run +``` + +Expected output: + +```text +=== Unversioned fallback for versioned task dispatch === + +Scheduling SupportWorkflow version 1.4.0 ... + Result: Legacy SupportWorkflow 1.4.0 handled version '1.4.0' for Contoso: BGP session down + +Scheduling SupportWorkflow version 1.0 ... + Result: Current SupportWorkflow handled version '1.0' for Contoso: BGP session down + +Done! Version 1.4.0 used the explicit legacy class; version 1.0 used the unversioned fallback. +``` + +### 4. Clean up + +```bash +docker rm -f durabletask-emulator +``` + +## Key takeaways + +- Exact version matches always win. A `1.4.0` request dispatches to the `1.4.0` class, not the unversioned class. +- Unversioned fallback is opt-in. Without `WhenNoExactMatch`, a mixed unversioned plus versioned registration remains a closed set and unknown versions fail rather than falling back. +- Use this mode only when the unversioned implementation is compatible with the versions it may receive. Replaying existing histories against a different implementation can cause non-determinism or deserialization failures. +- `UseWorkItemFilters()` composes with this mode by allowing unmatched versions for logical names that have an unversioned catch-all registration. + +## See also + +- [EternalOrchestrationVersionMigrationSample](../EternalOrchestrationVersionMigrationSample/README.md) — multi-version orchestration dispatch and `ContinueAsNew(NewVersion = "...")` migration. +- [ActivityVersioningSample](../ActivityVersioningSample/README.md) — activity versioning with inherited defaults and explicit overrides. +- [WorkerVersioningSample](../WorkerVersioningSample/README.md) — worker-level deployment versioning via `UseVersioning()`. diff --git a/samples/UnversionedFallbackSample/UnversionedFallbackSample.csproj b/samples/UnversionedFallbackSample/UnversionedFallbackSample.csproj new file mode 100644 index 00000000..2f40b471 --- /dev/null +++ b/samples/UnversionedFallbackSample/UnversionedFallbackSample.csproj @@ -0,0 +1,25 @@ + + + + Exe + net10.0 + enable + + + + + + + + + + + + + + + + + diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index a2f400f2..78f63a14 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -64,7 +64,7 @@ public TaskOptions(TaskOptions options) /// When non-null (including ), the task is scheduled with the /// specified version explicitly. The worker dispatches to the registered (name, version) exactly; /// when no exact match exists, it falls back to an unversioned registration only when the name has no - /// versioned registrations at all. + /// versioned registrations at all, unless unversioned fallback is explicitly enabled on the worker. /// /// public TaskVersion? Version { get; init; } diff --git a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs index 5e966992..28a024af 100644 --- a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs +++ b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs @@ -4,6 +4,7 @@ using Microsoft.DurableTask.Worker.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; namespace Microsoft.DurableTask.Worker; @@ -54,10 +55,20 @@ public IHostedService Build(IServiceProvider serviceProvider) Verify.NotNull(this.buildTarget, error); DurableTaskRegistry registry = serviceProvider.GetOptions(this.Name); + DurableTaskWorkerOptions workerOptions = serviceProvider.GetOptions(this.Name); + if (workerOptions.Versioning?.UnversionedFallback + == DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch) + { + ILoggerFactory? loggerFactory = serviceProvider.GetService(); + if (loggerFactory is not null) + { + Logs.CreateWorkerLogger(loggerFactory).UnversionedFallbackEnabled(this.Name); + } + } // Note: Modifying any logic in this section could introduce breaking changes. // Do not alter the input parameter. return (IHostedService)ActivatorUtilities.CreateInstance( - serviceProvider, this.buildTarget, this.Name, registry.BuildFactory()); + serviceProvider, this.buildTarget, this.Name, registry.BuildFactory(workerOptions)); } } diff --git a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs index a9274078..3ca86560 100644 --- a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs +++ b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs @@ -106,6 +106,7 @@ public static IDurableTaskWorkerBuilder UseVersioning(this IDurableTaskWorkerBui DefaultVersion = versionOptions.DefaultVersion, MatchStrategy = versionOptions.MatchStrategy, FailureStrategy = versionOptions.FailureStrategy, + UnversionedFallback = versionOptions.UnversionedFallback, }; }); return builder; diff --git a/src/Worker/Core/DurableTaskFactory.cs b/src/Worker/Core/DurableTaskFactory.cs index fcc97800..8ba2533d 100644 --- a/src/Worker/Core/DurableTaskFactory.cs +++ b/src/Worker/Core/DurableTaskFactory.cs @@ -16,6 +16,7 @@ sealed class DurableTaskFactory : IDurableTaskFactory2, IVersionedTaskFactory readonly IDictionary> entities; readonly HashSet versionedOrchestratorNames; readonly HashSet versionedActivityNames; + readonly bool useUnversionedFallback; /// /// Initializes a new instance of the class. @@ -23,19 +24,21 @@ sealed class DurableTaskFactory : IDurableTaskFactory2, IVersionedTaskFactory /// The activity factories. /// The orchestrator factories. /// The entity factories. + /// The unversioned fallback mode. internal DurableTaskFactory( IDictionary> activities, IDictionary> orchestrators, - IDictionary> entities) + IDictionary> entities, + DurableTaskWorkerOptions.UnversionedFallbackMode unversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.Never) { this.activities = Check.NotNull(activities); this.orchestrators = Check.NotNull(orchestrators); this.entities = Check.NotNull(entities); + this.useUnversionedFallback = unversionedFallback == DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch; - // Snapshot the set of logical names that have at least one versioned registration. Used to gate the - // unversioned-fallback path: when a logical name has any versioned registration, we refuse to fall - // back to its unversioned registration for an unmatched versioned request — that would silently - // route the call to a different implementation than the caller asked for. + // Snapshot the set of logical names that have at least one versioned registration. By default, this gates + // unversioned fallback so a mixed versioned/unversioned name remains a closed set. Workers can opt in to + // allowing the unversioned registration to handle unmatched versions. this.versionedOrchestratorNames = new HashSet( this.orchestrators.Keys .Where(k => !string.IsNullOrWhiteSpace(k.Version)) @@ -63,12 +66,11 @@ public bool TryCreateActivity( return true; } - // Unversioned registrations remain the compatibility fallback for a versioned request, but ONLY when - // no versioned registration exists for the same logical name. This mirrors the orchestrator rule: - // once a name has any versioned registration, an unmatched versioned request returns "not found" - // rather than silently routing to a catch-all the caller did not ask for. + // Unversioned registrations remain the compatibility fallback for a versioned request when no versioned + // registration exists for the same logical name. Workers can also opt in to treating the unversioned + // registration as a catch-all for unmatched versions. if (!string.IsNullOrWhiteSpace(version.Version) - && !this.versionedActivityNames.Contains(name.Name) + && (this.useUnversionedFallback || !this.versionedActivityNames.Contains(name.Name)) && this.activities.TryGetValue(new TaskVersionKey(name, default(TaskVersion)), out factory)) { activity = factory.Invoke(serviceProvider); @@ -99,12 +101,11 @@ public bool TryCreateOrchestrator( return true; } - // Unversioned registrations remain the compatibility fallback for a versioned request, but ONLY when - // no versioned registration exists for the same logical name. If any versioned registration is present - // (e.g., v1 and v2 are registered, request asks for v3), we refuse to silently route the call to a - // catch-all registration the caller did not ask for. + // Unversioned registrations remain the compatibility fallback for a versioned request when no versioned + // registration exists for the same logical name. Workers can also opt in to treating the unversioned + // registration as a catch-all for unmatched versions. if (!string.IsNullOrWhiteSpace(version.Version) - && !this.versionedOrchestratorNames.Contains(name.Name) + && (this.useUnversionedFallback || !this.versionedOrchestratorNames.Contains(name.Name)) && this.orchestrators.TryGetValue(new TaskVersionKey(name, default(TaskVersion)), out factory)) { orchestrator = factory.Invoke(serviceProvider); diff --git a/src/Worker/Core/DurableTaskRegistryExtensions.cs b/src/Worker/Core/DurableTaskRegistryExtensions.cs index 5d185394..799e0af4 100644 --- a/src/Worker/Core/DurableTaskRegistryExtensions.cs +++ b/src/Worker/Core/DurableTaskRegistryExtensions.cs @@ -14,8 +14,23 @@ static class DurableTaskRegistryExtensions /// The registry to build. /// The built factory. public static IDurableTaskFactory BuildFactory(this DurableTaskRegistry registry) + => registry.BuildFactory(null); + + /// + /// Builds a into a . + /// + /// The registry to build. + /// The worker options to use when building the factory. + /// The built factory. + public static IDurableTaskFactory BuildFactory(this DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions) { Check.NotNull(registry); - return new DurableTaskFactory(registry.ActivitiesByVersion, registry.OrchestratorsByVersion, registry.Entities); + DurableTaskWorkerOptions.UnversionedFallbackMode unversionedFallback = + workerOptions?.Versioning?.UnversionedFallback ?? DurableTaskWorkerOptions.UnversionedFallbackMode.Never; + return new DurableTaskFactory( + registry.ActivitiesByVersion, + registry.OrchestratorsByVersion, + registry.Entities, + unversionedFallback); } } diff --git a/src/Worker/Core/DurableTaskWorkerOptions.cs b/src/Worker/Core/DurableTaskWorkerOptions.cs index 3aa4eea2..6383eb1c 100644 --- a/src/Worker/Core/DurableTaskWorkerOptions.cs +++ b/src/Worker/Core/DurableTaskWorkerOptions.cs @@ -49,6 +49,32 @@ public enum VersionFailureStrategy Fail = 1, } + /// + /// Controls when an unversioned task registration is used to serve a versioned request that has no exact + /// match. Only affects task names that have both an unversioned registration and at least one versioned + /// registration; otherwise the dispatch rules are unchanged. + /// + public enum UnversionedFallbackMode + { + /// + /// Never fall back to an unversioned registration as a catch-all for unmatched versioned requests. This + /// is the default closed-set behavior: once a task name has at least one versioned registration, a + /// request for a version that has no exact match returns "not found" rather than dispatching to the + /// unversioned registration. Unversioned requests are still served by the unversioned registration, and + /// versioned requests still fall back to the unversioned registration when the task name has no + /// versioned registrations at all. + /// + Never = 0, + + /// + /// Fall back to the unversioned registration when no exact versioned match exists. An exact versioned + /// match still wins; only unmatched versioned requests dispatch to the unversioned registration as a + /// catch-all implementation. Use only when the unversioned implementation is replay-compatible with + /// every version it may receive. + /// + WhenNoExactMatch = 1, + } + /// /// Gets or sets the data converter. Default value is . /// @@ -176,7 +202,6 @@ public DataConverter DataConverter /// internal bool DataConverterExplicitlySet { get; private set; } - /// /// Applies these option values to another. /// @@ -190,7 +215,9 @@ internal void ApplyTo(DurableTaskWorkerOptions other) other.MaximumTimerInterval = this.MaximumTimerInterval; other.EnableEntitySupport = this.EnableEntitySupport; other.Versioning = this.Versioning; +#pragma warning disable CS0618 // Internal forwarding of the experimental OrchestrationFilter property. other.OrchestrationFilter = this.OrchestrationFilter; +#pragma warning restore CS0618 other.Logging.UseLegacyCategories = this.Logging.UseLegacyCategories; } } @@ -243,6 +270,27 @@ public class VersioningOptions /// If the version matching strategy is set to , this value has no effect. /// public VersionFailureStrategy FailureStrategy { get; set; } = VersionFailureStrategy.Reject; + + /// + /// Gets or sets whether unversioned task registrations can be used when no exact version match exists. + /// + /// + /// + /// The default value, , preserves the closed-set behavior for + /// mixed unversioned and versioned registrations. In this mode, a request for an unknown version does not + /// fall back to the unversioned registration once any versioned registration exists for the same task name. + /// + /// + /// When set to , an exact versioned registration still + /// wins, but unmatched versioned requests can use the unversioned registration as a catch-all implementation. + /// + /// + /// WARNING: Only enable this mode when the unversioned implementation is compatible with every version it may + /// receive. Replaying an existing orchestration or activity history against a different implementation can + /// cause non-determinism or deserialization failures. + /// + /// + public UnversionedFallbackMode UnversionedFallback { get; set; } = UnversionedFallbackMode.Never; } /// diff --git a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs index ccedc3b1..dc8bfccc 100644 --- a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs +++ b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs @@ -7,8 +7,8 @@ namespace Microsoft.DurableTask.Worker; /// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend /// and only work items matching the filters will be processed by the worker. If no filters are provided, /// the worker will process all work items. To opt-in to work item filtering, call -/// on the worker builder with either -/// explicit filters or auto-generated filters from the . +/// on the worker +/// builder with either explicit filters or auto-generated filters from the . /// public class DurableTaskWorkerWorkItemFilters { @@ -43,21 +43,23 @@ internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(Durable workerOptions?.Versioning?.MatchStrategy == DurableTaskWorkerOptions.VersionMatchStrategy.Strict ? [workerOptions.Versioning.Version ?? string.Empty] : null; + bool useUnversionedFallback = + workerOptions?.Versioning?.UnversionedFallback == DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch; // Orchestration filters group registrations by logical name and emit the concrete distinct // version set actually registered (treating null/unversioned as ""). Strict mode overrides - // this with the single configured worker version. For unversioned-only names (no versioned - // registration exists for the name), we emit an empty version list — the filter wildcard — - // so the backend can deliver versioned work items that the factory will then resolve via - // the documented unversioned fallback in DurableTaskFactory.TryCreateOrchestrator. When a - // name has at least one versioned registration, the factory refuses unversioned-fallback, - // so emitting the concrete version set prevents the backend from streaming work items the - // worker would then reject after the fact. + // this with the single configured worker version. When the factory can resolve unknown + // versions via an unversioned registration (unversioned-only names, or mixed names with + // opt-in unversioned fallback), we emit an empty version list — the filter wildcard — so the + // backend can deliver versioned work items the factory can handle. Otherwise, emitting the + // concrete version set prevents the backend from streaming work items the worker would then + // reject after the fact. List orchestrationFilters = registry.OrchestratorsByVersion .GroupBy(orchestration => orchestration.Key.Name, StringComparer.OrdinalIgnoreCase) .Select(group => { - IReadOnlyList versions = strictWorkerVersions ?? GetFilterVersions(group.Select(entry => entry.Key.Version)); + IReadOnlyList versions = + strictWorkerVersions ?? GetFilterVersions(group.Select(entry => entry.Key.Version), useUnversionedFallback); return new OrchestrationFilter { @@ -71,7 +73,8 @@ internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(Durable .GroupBy(activity => activity.Key.Name, StringComparer.OrdinalIgnoreCase) .Select(group => { - IReadOnlyList versions = strictWorkerVersions ?? GetFilterVersions(group.Select(entry => entry.Key.Version)); + IReadOnlyList versions = + strictWorkerVersions ?? GetFilterVersions(group.Select(entry => entry.Key.Version), useUnversionedFallback); return new ActivityFilter { @@ -92,7 +95,7 @@ internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(Durable }).ToList(), }; - static IReadOnlyList GetFilterVersions(IEnumerable versions) + static IReadOnlyList GetFilterVersions(IEnumerable versions, bool useUnversionedFallback) { // Normalize null to "" so an unversioned registration appears consistently. string[] normalized = versions @@ -105,7 +108,8 @@ static IReadOnlyList GetFilterVersions(IEnumerable versions) // versioned work items that the factory will resolve via unversioned fallback. Without // this, callers asking for a specific version would be filtered out at the backend even // though the worker can handle them. - if (normalized.Length == 1 && normalized[0].Length == 0) + if ((normalized.Length == 1 && normalized[0].Length == 0) + || (useUnversionedFallback && normalized.Contains(string.Empty, StringComparer.OrdinalIgnoreCase))) { return []; } diff --git a/src/Worker/Core/Logs.cs b/src/Worker/Core/Logs.cs index 81a65e25..6a38ea39 100644 --- a/src/Worker/Core/Logs.cs +++ b/src/Worker/Core/Logs.cs @@ -35,6 +35,12 @@ static partial class Logs [LoggerMessage(EventId = 605, Level = LogLevel.Information, Message = "'{Name}' activity of orchestration ID '{InstanceId}' failed.")] public static partial void ActivityFailed(this ILogger logger, Exception ex, string instanceId, string name); + [LoggerMessage( + EventId = 606, + Level = LogLevel.Warning, + Message = "Unversioned fallback mode is enabled for Durable Task worker '{workerName}'. Unmatched versioned orchestrations and activities may run on unversioned registrations; ensure those implementations are replay-compatible with every version they may receive. Replaying existing histories against a different implementation can cause non-determinism or deserialization failures.")] + public static partial void UnversionedFallbackEnabled(this ILogger logger, string workerName); + /// /// Creates a logger named "Microsoft.DurableTask.Worker" with an optional subcategory. /// diff --git a/test/Worker/Core.Tests/DependencyInjection/DefaultDurableTaskWorkerBuilderTests.cs b/test/Worker/Core.Tests/DependencyInjection/DefaultDurableTaskWorkerBuilderTests.cs index e510da34..97b52798 100644 --- a/test/Worker/Core.Tests/DependencyInjection/DefaultDurableTaskWorkerBuilderTests.cs +++ b/test/Worker/Core.Tests/DependencyInjection/DefaultDurableTaskWorkerBuilderTests.cs @@ -5,6 +5,7 @@ using Microsoft.DurableTask.Worker.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Microsoft.DurableTask.Worker.Tests; @@ -85,6 +86,36 @@ public void Build_Target_Built() target.Options.DataConverter.Should().BeSameAs(converter); } + [Fact] + public void Build_WithUnversionedFallback_LogsWarning() + { + // Arrange + CapturingLoggerFactory loggerFactory = new(); + ServiceCollection services = new(); + services.AddOptions(); + services.AddSingleton(loggerFactory); + DefaultDurableTaskWorkerBuilder builder = new("test", services) + { + BuildTarget = typeof(GoodBuildTarget), + }; + builder.UseVersioning(new DurableTaskWorkerOptions.VersioningOptions + { + UnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch, + }); + + // Act + builder.Build(services.BuildServiceProvider()); + + // Assert + loggerFactory.Logs.Should().Contain(log => + log.Level == LogLevel.Warning + && log.Message.Contains("unversioned", StringComparison.OrdinalIgnoreCase) + && log.Message.Contains("fallback", StringComparison.OrdinalIgnoreCase) + && log.Message.Contains("replay", StringComparison.OrdinalIgnoreCase) + && log.Message.Contains("non-determinism", StringComparison.OrdinalIgnoreCase) + && log.Message.Contains("deserialization", StringComparison.OrdinalIgnoreCase)); + } + class BadBuildTarget : BackgroundService { protected override Task ExecuteAsync(CancellationToken stoppingToken) @@ -130,4 +161,40 @@ class CustomDataConverter : DataConverter class GoodBuildTargetOptions : DurableTaskWorkerOptions { } + + sealed class CapturingLoggerFactory : ILoggerFactory + { + public List<(LogLevel Level, string Message)> Logs { get; } = []; + + public void AddProvider(ILoggerProvider provider) + { + } + + public ILogger CreateLogger(string categoryName) => new CapturingLogger(this.Logs); + + public void Dispose() + { + } + } + + sealed class CapturingLogger(List<(LogLevel Level, string Message)> logs) : ILogger + { + readonly List<(LogLevel Level, string Message)> logs = logs; + + public IDisposable? BeginScope(TState state) + where TState : notnull + => null; + + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + this.logs.Add((logLevel, formatter(state, exception))); + } + } } diff --git a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs index 6ad2eac8..a9eb0737 100644 --- a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs +++ b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs @@ -365,6 +365,40 @@ public void WorkItemFilters_UnversionedAndVersionedOrchestrators_EmitConcreteVer actual.Orchestrations[0].Versions.Should().BeEquivalentTo([string.Empty, "v2"]); } + [Fact] + public void WorkItemFilters_UnversionedFallbackWithMixedOrchestrators_EmitsWildcardVersionList() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddOrchestrator(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + UnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(); + actual.Orchestrations[0].Name.Should().Be("FilterWorkflow"); + actual.Orchestrations[0].Versions.Should().BeEmpty(); + } + [Fact] public void WorkItemFilters_VersionedActivities_GroupVersionsByLogicalName() { @@ -421,6 +455,79 @@ public void WorkItemFilters_UnversionedAndVersionedActivities_EmitConcreteVersio actual.Activities[0].Versions.Should().BeEquivalentTo([string.Empty, "v2"]); } + [Fact] + public void WorkItemFilters_UnversionedFallbackWithMixedActivities_EmitsWildcardVersionList() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddActivity(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + UnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Activities.Should().ContainSingle(); + actual.Activities[0].Name.Should().Be("FilterActivity"); + actual.Activities[0].Versions.Should().BeEmpty(); + } + + [Fact] + public void WorkItemFilters_UnversionedFallbackWithVersioningStrict_UsesConfiguredWorkerVersion() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddOrchestrator(); + registry.AddActivity(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + Version = "1.0", + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.Strict, + UnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch, + }; + }); + builder.UseWorkItemFilters(); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(); + actual.Orchestrations[0].Versions.Should().BeEquivalentTo(["1.0"]); + actual.Activities.Should().ContainSingle(); + actual.Activities[0].Versions.Should().BeEquivalentTo(["1.0"]); + } + [Fact] public void WorkItemFilters_DefaultEmptyRegistry_ProducesEmptyFilters() { diff --git a/test/Worker/Core.Tests/DurableTaskFactoryActivityVersioningTests.cs b/test/Worker/Core.Tests/DurableTaskFactoryActivityVersioningTests.cs index dbef0947..ad6baa8b 100644 --- a/test/Worker/Core.Tests/DurableTaskFactoryActivityVersioningTests.cs +++ b/test/Worker/Core.Tests/DurableTaskFactoryActivityVersioningTests.cs @@ -130,6 +130,34 @@ public void TryCreateActivity_WithMixedRegistrations_DoesNotFallBackToUnversione activity.Should().BeNull(); } + [Fact] + public void TryCreateActivity_WithMixedRegistrationsAndUnversionedFallback_UsesUnversionedRegistrationForUnknownVersion() + { + // Arrange + DurableTaskRegistry registry = new(); + registry.AddActivity(); + registry.AddActivity(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + UnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateActivity( + new TaskName("InvoiceActivity"), + new TaskVersion("v2"), + Mock.Of(), + out ITaskActivity? activity); + + // Assert + found.Should().BeTrue(); + activity.Should().BeOfType(); + } + [DurableTask("InvoiceActivity", Version = "v1")] sealed class InvoiceActivityV1 : TaskActivity { diff --git a/test/Worker/Core.Tests/DurableTaskFactoryVersioningTests.cs b/test/Worker/Core.Tests/DurableTaskFactoryVersioningTests.cs index 5244607e..35150fa8 100644 --- a/test/Worker/Core.Tests/DurableTaskFactoryVersioningTests.cs +++ b/test/Worker/Core.Tests/DurableTaskFactoryVersioningTests.cs @@ -112,6 +112,35 @@ public void TryCreateOrchestrator_WithMixedRegistrations_DoesNotFallBackForUnkno orchestrator.Should().BeNull(); } + [Fact] + public void TryCreateOrchestrator_WithMixedRegistrationsAndUnversionedFallback_UsesUnversionedRegistrationForUnknownVersion() + { + // Arrange + DurableTaskRegistry registry = new(); + registry.AddOrchestrator(); + registry.AddOrchestrator(); + registry.AddOrchestrator(); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + UnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch, + }, + }; + IDurableTaskFactory factory = registry.BuildFactory(workerOptions); + + // Act + bool found = ((IVersionedTaskFactory)factory).TryCreateOrchestrator( + new TaskName("InvoiceWorkflow"), + new TaskVersion("v3"), + Mock.Of(), + out ITaskOrchestrator? orchestrator); + + // Assert + found.Should().BeTrue(); + orchestrator.Should().BeOfType(); + } + [Fact] public void TryCreateOrchestrator_WithOnlyUnversionedRegistration_FallsBackForVersionedRequest() { diff --git a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerTests.cs b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerTests.cs index 4c78aac0..8c527c2e 100644 --- a/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerTests.cs +++ b/test/Worker/Grpc.Tests/GrpcDurableTaskWorkerTests.cs @@ -28,6 +28,9 @@ public class GrpcDurableTaskWorkerTests static readonly MethodInfo ProcessorConnectAsyncMethod = typeof(GrpcDurableTaskWorker) .GetNestedType("Processor", BindingFlags.NonPublic)! .GetMethod("ConnectAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; + static readonly MethodInfo ProcessorRunOrchestratorAsyncMethod = typeof(GrpcDurableTaskWorker) + .GetNestedType("Processor", BindingFlags.NonPublic)! + .GetMethod("OnRunOrchestratorAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; static readonly MethodInfo TryRecreateChannelAsyncMethod = typeof(GrpcDurableTaskWorker) .GetMethod("TryRecreateChannelAsync", BindingFlags.Instance | BindingFlags.NonPublic)!; @@ -514,6 +517,71 @@ public void Constructor_StrictWorkerVersioningWithoutRegistryContents_DoesNotThr act.Should().NotThrow(); } + [Fact] + public async Task OnRunOrchestratorAsync_StrictVersioningWithUnversionedFallback_RejectsMismatchBeforeFactoryDispatch() + { + // Arrange + const string orchestrationName = "StrictFallbackWorkflow"; + string completionToken = Guid.NewGuid().ToString("N"); + DurableTaskWorkerOptions workerOptions = new() + { + Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + Version = "1.0", + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.Strict, + FailureStrategy = DurableTaskWorkerOptions.VersionFailureStrategy.Reject, + UnversionedFallback = DurableTaskWorkerOptions.UnversionedFallbackMode.WhenNoExactMatch, + }, + Logging = { UseLegacyCategories = false }, + }; + Mock factoryMock = new(MockBehavior.Strict); + GrpcDurableTaskWorker worker = CreateWorker( + new GrpcDurableTaskWorkerOptions(), + workerOptions, + NullLoggerFactory.Instance, + factoryMock.Object); + Mock clientMock = new( + MockBehavior.Strict, + Mock.Of()); + TaskCompletionSource abandonRequest = new( + TaskCreationOptions.RunContinuationsAsynchronously); + clientMock + .Setup(c => c.AbandonTaskOrchestratorWorkItemAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns((P.AbandonOrchestrationTaskRequest request, Metadata _, DateTime? _, CancellationToken _) => + { + abandonRequest.SetResult(request); + return CreateUnaryCall(Task.FromResult(new P.AbandonOrchestrationTaskResponse())); + }); + object processor = CreateProcessor(worker, clientMock.Object); + P.OrchestratorRequest request = CreateOrchestratorRequest( + orchestrationName, + instanceVersion: "2.0"); + + // Act + await InvokeProcessorRunOrchestratorAsync(processor, request, completionToken); + + // Assert + P.AbandonOrchestrationTaskRequest actualRequest = await abandonRequest.Task.WaitAsync(TimeSpan.FromSeconds(5)); + actualRequest.CompletionToken.Should().Be(completionToken); + factoryMock.Verify( + f => f.TryCreateOrchestrator( + It.IsAny(), + It.IsAny(), + out It.Ref.IsAny), + Times.Never); + clientMock.Verify( + c => c.CompleteOrchestratorTaskAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny()), + Times.Never); + } + [Fact] public void Constructor_NoWorkerVersioningWithoutRegistryContents_DoesNotThrow() { @@ -606,6 +674,17 @@ static object CreateProcessor(GrpcDurableTaskWorker worker, P.TaskHubSidecarServ return (AsyncServerStreamingCall)task.GetType().GetProperty("Result")!.GetValue(task)!; } + static async Task InvokeProcessorRunOrchestratorAsync( + object processor, + P.OrchestratorRequest request, + string completionToken) + { + Task task = (Task)ProcessorRunOrchestratorAsyncMethod.Invoke( + processor, + new object?[] { request, completionToken, CancellationToken.None })!; + await task; + } + static async Task InvokeProcessorExecuteAsync(object processor, CancellationToken cancellationToken) { Task task = (Task)ProcessorExecuteAsyncMethod.Invoke(processor, new object?[] { cancellationToken })!; @@ -613,6 +692,36 @@ static async Task InvokeProcessorExecuteAsync(object proces return (ProcessorExitReason)task.GetType().GetProperty("Result")!.GetValue(task)!; } + static P.OrchestratorRequest CreateOrchestratorRequest(string orchestrationName, string instanceVersion) + { + string instanceId = Guid.NewGuid().ToString("N"); + string executionId = Guid.NewGuid().ToString("N"); + return new P.OrchestratorRequest + { + InstanceId = instanceId, + ExecutionId = executionId, + NewEvents = + { + new P.HistoryEvent + { + EventId = -1, + Timestamp = Timestamp.FromDateTime(DateTime.UtcNow), + ExecutionStarted = new P.ExecutionStartedEvent + { + Name = orchestrationName, + Version = instanceVersion, + Input = "\"input\"", + OrchestrationInstance = new P.OrchestrationInstance + { + InstanceId = instanceId, + ExecutionId = executionId, + }, + }, + }, + }, + }; + } + static void InvokeApplySuccessfulRecreate( GrpcDurableTaskWorker worker, object result, diff --git a/test/Worker/Grpc.Tests/WorkItemStreamConsumerTests.cs b/test/Worker/Grpc.Tests/WorkItemStreamConsumerTests.cs index 2464c0c7..67fb2b21 100644 --- a/test/Worker/Grpc.Tests/WorkItemStreamConsumerTests.cs +++ b/test/Worker/Grpc.Tests/WorkItemStreamConsumerTests.cs @@ -142,9 +142,18 @@ public async Task PerItem_HeartbeatReset_KeepsTimerAlive() // Feed one item, wait long enough that the original timer would have expired, then complete. // Synchronize on the first item actually being processed so the second delay is measured from // the consumer's timer reset instead of from the test thread's write timing. + // + // Timings are sized to leave ~500ms of slack on both sides of the assertion so the test is + // robust to thread-pool scheduling jitter on loaded CI runners: + // - first delay (1000ms) + second delay (1500ms) = 2500ms total before 2nd item is written. + // - Without the per-item timer reset, the 2000ms original timer would have fired at 2000ms, + // leaving a ~500ms margin before the 2nd item is written (proves the test exercises the reset). + // - With the per-item reset, the new timer fires at first-delay + jitter + 2000ms, + // leaving ~500ms margin between the 2nd item write and the new timer expiry. Channel channel = Channel.CreateUnbounded(); - TimeSpan timeout = TimeSpan.FromMilliseconds(500); + TimeSpan timeout = TimeSpan.FromMilliseconds(2000); TaskCompletionSource firstItemProcessed = new(TaskCreationOptions.RunContinuationsAsynchronously); + TaskCompletionSource secondItemProcessed = new(TaskCreationOptions.RunContinuationsAsynchronously); int itemCount = 0; Task consumeTask = WorkItemStreamConsumer.ConsumeAsync( @@ -152,21 +161,31 @@ public async Task PerItem_HeartbeatReset_KeepsTimerAlive() silentDisconnectTimeout: timeout, onItem: _ => { - if (Interlocked.Increment(ref itemCount) == 1) + int seen = Interlocked.Increment(ref itemCount); + if (seen == 1) { firstItemProcessed.TrySetResult(); } + else if (seen == 2) + { + secondItemProcessed.TrySetResult(); + } }, onFirstMessage: null, cancellation: CancellationToken.None); - await Task.Delay(TimeSpan.FromMilliseconds(150)); + await Task.Delay(TimeSpan.FromMilliseconds(1000)); await channel.Writer.WriteAsync(new P.WorkItem { HealthPing = new P.HealthPing() }); - await firstItemProcessed.Task.WaitAsync(TimeSpan.FromSeconds(5)); + await firstItemProcessed.Task.WaitAsync(TimeSpan.FromSeconds(10)); // Without the per-item reset, the original timer would fire before this second item arrives. - await Task.Delay(TimeSpan.FromMilliseconds(400)); + await Task.Delay(TimeSpan.FromMilliseconds(1500)); await channel.Writer.WriteAsync(new P.WorkItem { HealthPing = new P.HealthPing() }); + + // Wait for the consumer to actually dequeue and process the 2nd item (which re-arms the timer) + // before completing the channel. Without this barrier, the test could observe a SilentDisconnect + // if the timer fires after the test writes the 2nd item but before the consumer dequeues it. + await secondItemProcessed.Task.WaitAsync(TimeSpan.FromSeconds(10)); channel.Writer.Complete(); WorkItemStreamResult result = await consumeTask;