From 86f5b72e0d88fd7ea2302c0f9cf7a2b93c1bd82b Mon Sep 17 00:00:00 2001 From: Hal Spang Date: Wed, 14 Jan 2026 17:11:39 -0800 Subject: [PATCH 1/2] Introduce WorkItemFilters into worker flow This change adds WorkItemFilters into the grpc worker. This includes builder methods to specify them and the connection into the GetWorkItems flow inside the worker processor. Signed-off-by: Hal Spang --- src/Grpc/orchestrator_service.proto | 21 ++ .../DurableTaskWorkerBuilderExtensions.cs | 28 +++ .../Core/DurableTaskWorkerWorkItemFilters.cs | 126 ++++++++++++ .../Grpc/GrpcDurableTaskWorker.Processor.cs | 2 + src/Worker/Grpc/GrpcDurableTaskWorker.cs | 6 +- ...rableTaskWorkerWorkItemFiltersExtension.cs | 52 +++++ .../UseWorkItemFiltersTests.cs | 189 ++++++++++++++++++ 7 files changed, 423 insertions(+), 1 deletion(-) create mode 100644 src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs create mode 100644 src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs create mode 100644 test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 8ef46a4a7..0c34d986d 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -822,6 +822,7 @@ message GetWorkItemsRequest { int32 maxConcurrentEntityWorkItems = 3; repeated WorkerCapability capabilities = 10; + WorkItemFilters workItemFilters = 11; } enum WorkerCapability { @@ -844,6 +845,26 @@ enum WorkerCapability { WORKER_CAPABILITY_LARGE_PAYLOADS = 3; } +message WorkItemFilters { + repeated OrchestrationFilter orchestrations = 1; + repeated ActivityFilter activities = 2; + repeated EntityFilter entities = 3; +} + +message OrchestrationFilter { + string name = 1; + repeated string versions = 2; +} + +message ActivityFilter { + string name = 1; + repeated string versions = 2; +} + +message EntityFilter { + string name = 1; +} + message WorkItem { oneof request { OrchestratorRequest orchestratorRequest = 1; diff --git a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs index 3f349b710..0a83e9d52 100644 --- a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs +++ b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs @@ -137,4 +137,32 @@ public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTask builder.Services.AddSingleton(filter); return builder; } + + /// + /// Adds to the specified . + /// + /// The builder to set the builder target for. + /// The instance of a to use. + /// The same instance, allowing for method chaining. + /// If this is called without specified filters, the filters will be constructed from the registered orchestrations, activities, and entities. + public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWorkerBuilder builder, DurableTaskWorkerWorkItemFilters? workItemFilters = null) + { + Check.NotNull(builder); + if (workItemFilters != null) + { + builder.Services.AddSingleton(workItemFilters); + } + else + { + // Auto-generate the filters from registered orchestrations, activities, and entitites. + builder.Services.AddSingleton(provider => + { + DurableTaskRegistry registry = provider.GetRequiredService>().Get(builder.Name); + DurableTaskWorkerOptions? options = provider.GetOptions(builder.Name); + return new DurableTaskWorkerWorkItemFilters(registry, options); + }); + } + + return builder; + } } diff --git a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs new file mode 100644 index 000000000..25fc14254 --- /dev/null +++ b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs @@ -0,0 +1,126 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Worker; + +/// +/// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend +/// and only work items matching the filters will be processed by the worker. If no filters are provided, +/// the worker will process all work items. +/// +public class DurableTaskWorkerWorkItemFilters +{ + /// + /// Initializes a new instance of the class. + /// + public DurableTaskWorkerWorkItemFilters() + { + this.Orchestrations = []; + this.Activities = []; + this.Entities = []; + } + + /// + /// Initializes a new instance of the class. + /// + /// to construct the filter from. + /// that optionally provides versioning information. + internal DurableTaskWorkerWorkItemFilters(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions) + { + List orchestrationActions = new(); + foreach (var orchestration in registry.Orchestrators) + { + orchestrationActions.Add(new OrchestrationFilter + { + Name = orchestration.Key, + + // TODO: Support multiple orchestration versions, for now, utilize the Worker's version. + Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [], + }); + } + + this.Orchestrations = orchestrationActions; + List activityActions = new(); + foreach (var activity in registry.Activities) + { + activityActions.Add(new ActivityFilter + { + Name = activity.Key, + + // TODO: Support multiple activity versions, for now, utilize the Worker's version. + Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [], + }); + } + + this.Activities = activityActions; + List entityActions = new(); + foreach (var entity in registry.Entities) + { + entityActions.Add(new EntityFilter + { + // Entity names are normalized to lowercase in the backend. + Name = entity.Key.ToString().ToLowerInvariant(), + }); + } + + this.Entities = entityActions; + } + + /// + /// Gets or initializes the orchestration filters. + /// + public IReadOnlyList Orchestrations { get; init; } + + /// + /// Gets or initializes the activity filters. + /// + public IReadOnlyList Activities { get; init; } + + /// + /// Gets or initializes the entity filters. + /// + public IReadOnlyList Entities { get; init; } + + /// + /// Struct specifying an orchestration filter. + /// + public struct OrchestrationFilter + { + /// + /// Gets or initializes the name of the orchestration to filter. + /// + public string Name { get; init; } + + /// + /// Gets or initializes the versions of the orchestration to filter. + /// + public List Versions { get; init; } + } + + /// + /// Struct specifying an activity filter. + /// + public struct ActivityFilter + { + /// + /// Gets or initializes the name of the activity to filter. + /// + public string Name { get; init; } + + /// + /// Gets or initializes the versions of the activity to filter. + /// + public List Versions { get; init; } + } + + /// + /// Struct specifying an entity filter. + /// + public struct EntityFilter + { + /// + /// Gets or initializes the name of the entity to filter. + /// + public string Name { get; init; } + } +} diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index ffd38cf1f..1c6fb63ab 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -10,6 +10,7 @@ using Microsoft.DurableTask.Abstractions; using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Tracing; +using Microsoft.DurableTask.Worker.Grpc.Internal; using Microsoft.DurableTask.Worker.Shims; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -255,6 +256,7 @@ async ValueTask BuildRuntimeStateAsync( MaxConcurrentEntityWorkItems = workerOptions.Concurrency.MaximumConcurrentEntityWorkItems, Capabilities = { this.worker.grpcOptions.Capabilities }, + WorkItemFilters = this.worker?.workItemFilters?.ToGrpcWorkItemFilters(), }, cancellationToken: cancellation); } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.cs index 93875961b..e2d875ae6 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.cs @@ -18,6 +18,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker readonly ILoggerFactory loggerFactory; readonly ILogger logger; readonly IOrchestrationFilter? orchestrationFilter; + readonly DurableTaskWorkerWorkItemFilters? workItemFilters; /// /// Initializes a new instance of the class. @@ -30,6 +31,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker /// The logger. /// The optional used to filter orchestration execution. /// The custom exception properties provider that help build failure details. + /// The optional used to filter work items in the backend. public GrpcDurableTaskWorker( string name, IDurableTaskFactory factory, @@ -38,7 +40,8 @@ public GrpcDurableTaskWorker( IServiceProvider services, ILoggerFactory loggerFactory, IOrchestrationFilter? orchestrationFilter = null, - IExceptionPropertiesProvider? exceptionPropertiesProvider = null) + IExceptionPropertiesProvider? exceptionPropertiesProvider = null, + DurableTaskWorkerWorkItemFilters? workItemFilters = null) : base(name, factory) { this.grpcOptions = Check.NotNull(grpcOptions).Get(name); @@ -48,6 +51,7 @@ public GrpcDurableTaskWorker( this.logger = CreateLogger(loggerFactory, this.workerOptions); this.orchestrationFilter = orchestrationFilter; this.ExceptionPropertiesProvider = exceptionPropertiesProvider; + this.workItemFilters = workItemFilters; } /// diff --git a/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs b/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs new file mode 100644 index 000000000..75bc47ac6 --- /dev/null +++ b/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs @@ -0,0 +1,52 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Worker.Grpc.Internal; + +/// +/// Extension for to convert to gRPC types. +/// +public static class DurableTaskWorkerWorkItemFiltersExtensions +{ + /// + /// Converts a to a gRPC . + /// + /// The to convert. + /// A gRPC . + public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWorkItemFilters workItemFilter) + { + var grpcWorkItemFilters = new P.WorkItemFilters(); + foreach (var orchestrationFilter in workItemFilter.Orchestrations) + { + var grpcOrchestrationFilter = new P.OrchestrationFilter + { + Name = orchestrationFilter.Name, + }; + grpcOrchestrationFilter.Versions.AddRange(orchestrationFilter.Versions); + grpcWorkItemFilters.Orchestrations.Add(grpcOrchestrationFilter); + } + + foreach (var activityFilter in workItemFilter.Activities) + { + var grpcActivityAction = new P.ActivityFilter + { + Name = activityFilter.Name, + }; + grpcActivityAction.Versions.AddRange(activityFilter.Versions); + grpcWorkItemFilters.Activities.Add(grpcActivityAction); + } + + foreach (var entityFilter in workItemFilter.Entities) + { + var grpcEntityAction = new P.EntityFilter + { + Name = entityFilter.Name, + }; + grpcWorkItemFilters.Entities.Add(grpcEntityAction); + } + + return grpcWorkItemFilters; + } +} diff --git a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs new file mode 100644 index 000000000..59c68a1f1 --- /dev/null +++ b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs @@ -0,0 +1,189 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Entities; +using Microsoft.DurableTask.Worker.Hosting; +using Microsoft.Extensions.DependencyInjection; + +namespace Microsoft.DurableTask.Worker.Tests; + +public class UseWorkItemFiltersTests +{ + [Fact] + public void UseWorkItemFilters_NullBuilder_Throws() + { + // Arrange + IDurableTaskWorkerBuilder builder = null!; + + // Act + Action act = () => builder.UseWorkItemFilters(); + + // Assert + act.Should().ThrowExactly().WithParameterName("builder"); + } + + [Fact] + public void UseWorkItemFilters_WithExplicitFilters_RegistersFilters() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new("test", services); + DurableTaskRegistry registry = new(); + DurableTaskWorkerWorkItemFilters filters = new(registry, null); + + // Act + builder.UseWorkItemFilters(filters); + ServiceProvider provider = services.BuildServiceProvider(); + DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService(); + + // Assert + actual.Should().BeSameAs(filters); + } + + [Fact] + public void UseWorkItemFilters_WithoutFilters_AutoGeneratesFromRegistry() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new("test", services); + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + + // Act + builder.UseWorkItemFilters(); + ServiceProvider provider = services.BuildServiceProvider(); + DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService(); + + // Assert + actual.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator)); + actual.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity)); + } + + [Fact] + public void UseWorkItemFilters_WithVersioning_IncludesVersionInFilters() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new("test", services); + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + DefaultVersion = "1.0" + }; + }); + + // Act + builder.UseWorkItemFilters(); + ServiceProvider provider = services.BuildServiceProvider(); + DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService(); + + // Assert + actual.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator) && o.Versions.Contains("1.0")); + actual.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity) && a.Versions.Contains("1.0")); + } + + [Fact] + public void UseWorkItemFilters_WithEntity_IncludesEntityInFilters() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new("test", services); + builder.AddTasks(registry => + { + registry.AddEntity(); + }); + + // Act + builder.UseWorkItemFilters(); + ServiceProvider provider = services.BuildServiceProvider(); + DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService(); + + // Assert + actual.Entities.Should().ContainSingle(e => e.Name == nameof(TestEntity).ToLowerInvariant()); + } + + [Fact] + public void UseWorkItemFilters_ReturnsBuilder_ForChaining() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new("test", services); + + // Act + IDurableTaskWorkerBuilder result = builder.UseWorkItemFilters(); + + // Assert + result.Should().BeSameAs(builder); + } + + [Fact] + public void UseWorkItemFilters_EmptyRegistry_CreatesEmptyFilters() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new("test", services); + builder.AddTasks(_ => { }); + + // Act + builder.UseWorkItemFilters(); + ServiceProvider provider = services.BuildServiceProvider(); + DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService(); + + // Assert + actual.Orchestrations.Should().BeEmpty(); + actual.Activities.Should().BeEmpty(); + actual.Entities.Should().BeEmpty(); + } + + [Fact] + public void UseWorkItemFilters_NamedBuilders_HaveUniqueFilters() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder1 = new("worker1", services); + builder1.AddTasks(registry => registry.AddOrchestrator()); + builder1.UseWorkItemFilters(); + + DefaultDurableTaskWorkerBuilder builder2 = new("worker2", services); + builder2.AddTasks(registry => registry.AddActivity()); + builder2.UseWorkItemFilters(); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IEnumerable allFilters = provider.GetServices(); + + // Assert + allFilters.Should().HaveCount(2); + allFilters.Should().Contain(f => f.Orchestrations.Any(o => o.Name == nameof(TestOrchestrator)) && !f.Activities.Any()); + allFilters.Should().Contain(f => f.Activities.Any(a => a.Name == nameof(TestActivity)) && !f.Orchestrations.Any()); + } + + class TestOrchestrator : TaskOrchestrator + { + public override Task RunAsync(TaskOrchestrationContext context, object input) + { + throw new NotImplementedException(); + } + } + + class TestActivity : TaskActivity + { + public override Task RunAsync(TaskActivityContext context, object input) + { + throw new NotImplementedException(); + } + } + + class TestEntity : TaskEntity + { + } +} From 787ecb230c74c094717cabd689fe3cb847fc912b Mon Sep 17 00:00:00 2001 From: Hal Spang Date: Thu, 15 Jan 2026 13:25:41 -0800 Subject: [PATCH 2/2] Add tests, update filter construction Signed-off-by: Hal Spang --- .../DurableTaskWorkerBuilderExtensions.cs | 4 +- .../Core/DurableTaskWorkerWorkItemFilters.cs | 79 ++---- .../Grpc/GrpcDurableTaskWorker.Processor.cs | 2 +- ...rableTaskWorkerWorkItemFiltersExtension.cs | 13 +- .../UseWorkItemFiltersTests.cs | 3 +- ...TaskWorkerWorkItemFiltersExtensionTests.cs | 263 ++++++++++++++++++ 6 files changed, 300 insertions(+), 64 deletions(-) create mode 100644 test/Worker/Grpc.Tests/DurableTaskWorkerWorkItemFiltersExtensionTests.cs diff --git a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs index 0a83e9d52..66a8521bd 100644 --- a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs +++ b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs @@ -154,12 +154,12 @@ public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWork } else { - // Auto-generate the filters from registered orchestrations, activities, and entitites. + // Auto-generate the filters from registered orchestrations, activities, and entities. builder.Services.AddSingleton(provider => { DurableTaskRegistry registry = provider.GetRequiredService>().Get(builder.Name); DurableTaskWorkerOptions? options = provider.GetOptions(builder.Name); - return new DurableTaskWorkerWorkItemFilters(registry, options); + return DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, options); }); } diff --git a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs index 25fc14254..57270f0e7 100644 --- a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs +++ b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs @@ -11,76 +11,49 @@ namespace Microsoft.DurableTask.Worker; public class DurableTaskWorkerWorkItemFilters { /// - /// Initializes a new instance of the class. + /// Gets or initializes the orchestration filters. /// - public DurableTaskWorkerWorkItemFilters() - { - this.Orchestrations = []; - this.Activities = []; - this.Entities = []; - } + public IReadOnlyList Orchestrations { get; init; } = []; + + /// + /// Gets or initializes the activity filters. + /// + public IReadOnlyList Activities { get; init; } = []; /// - /// Initializes a new instance of the class. + /// Gets or initializes the entity filters. + /// + public IReadOnlyList Entities { get; init; } = []; + + /// + /// Creates a new instance of the class. /// /// to construct the filter from. /// that optionally provides versioning information. - internal DurableTaskWorkerWorkItemFilters(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions) + /// A new instance of constructed from the provided registry. + internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions) { - List orchestrationActions = new(); - foreach (var orchestration in registry.Orchestrators) + // TODO: Support multiple versions per orchestration/activity. For now, grab the worker version from the options. + return new DurableTaskWorkerWorkItemFilters { - orchestrationActions.Add(new OrchestrationFilter + Orchestrations = registry.Orchestrators.Select(orchestration => new OrchestrationFilter { Name = orchestration.Key, - - // TODO: Support multiple orchestration versions, for now, utilize the Worker's version. Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [], - }); - } - - this.Orchestrations = orchestrationActions; - List activityActions = new(); - foreach (var activity in registry.Activities) - { - activityActions.Add(new ActivityFilter + }).ToList(), + Activities = registry.Activities.Select(activity => new ActivityFilter { Name = activity.Key, - - // TODO: Support multiple activity versions, for now, utilize the Worker's version. Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [], - }); - } - - this.Activities = activityActions; - List entityActions = new(); - foreach (var entity in registry.Entities) - { - entityActions.Add(new EntityFilter + }).ToList(), + Entities = registry.Entities.Select(entity => new EntityFilter { // Entity names are normalized to lowercase in the backend. Name = entity.Key.ToString().ToLowerInvariant(), - }); - } - - this.Entities = entityActions; + }).ToList(), + }; } - /// - /// Gets or initializes the orchestration filters. - /// - public IReadOnlyList Orchestrations { get; init; } - - /// - /// Gets or initializes the activity filters. - /// - public IReadOnlyList Activities { get; init; } - - /// - /// Gets or initializes the entity filters. - /// - public IReadOnlyList Entities { get; init; } - /// /// Struct specifying an orchestration filter. /// @@ -94,7 +67,7 @@ public struct OrchestrationFilter /// /// Gets or initializes the versions of the orchestration to filter. /// - public List Versions { get; init; } + public IReadOnlyList Versions { get; init; } } /// @@ -110,7 +83,7 @@ public struct ActivityFilter /// /// Gets or initializes the versions of the activity to filter. /// - public List Versions { get; init; } + public IReadOnlyList Versions { get; init; } } /// diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 1c6fb63ab..102b35975 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -256,7 +256,7 @@ async ValueTask BuildRuntimeStateAsync( MaxConcurrentEntityWorkItems = workerOptions.Concurrency.MaximumConcurrentEntityWorkItems, Capabilities = { this.worker.grpcOptions.Capabilities }, - WorkItemFilters = this.worker?.workItemFilters?.ToGrpcWorkItemFilters(), + WorkItemFilters = this.worker.workItemFilters?.ToGrpcWorkItemFilters(), }, cancellationToken: cancellation); } diff --git a/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs b/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs index 75bc47ac6..176d376c1 100644 --- a/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs +++ b/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs @@ -8,7 +8,7 @@ namespace Microsoft.DurableTask.Worker.Grpc.Internal; /// /// Extension for to convert to gRPC types. /// -public static class DurableTaskWorkerWorkItemFiltersExtensions +public static class DurableTaskWorkerWorkItemFiltersExtension { /// /// Converts a to a gRPC . @@ -17,6 +17,7 @@ public static class DurableTaskWorkerWorkItemFiltersExtensions /// A gRPC . public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWorkItemFilters workItemFilter) { + Check.NotNull(workItemFilter); var grpcWorkItemFilters = new P.WorkItemFilters(); foreach (var orchestrationFilter in workItemFilter.Orchestrations) { @@ -30,21 +31,21 @@ public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWork foreach (var activityFilter in workItemFilter.Activities) { - var grpcActivityAction = new P.ActivityFilter + var grpcActivityFilter = new P.ActivityFilter { Name = activityFilter.Name, }; - grpcActivityAction.Versions.AddRange(activityFilter.Versions); - grpcWorkItemFilters.Activities.Add(grpcActivityAction); + grpcActivityFilter.Versions.AddRange(activityFilter.Versions); + grpcWorkItemFilters.Activities.Add(grpcActivityFilter); } foreach (var entityFilter in workItemFilter.Entities) { - var grpcEntityAction = new P.EntityFilter + var grpcEntityFilter = new P.EntityFilter { Name = entityFilter.Name, }; - grpcWorkItemFilters.Entities.Add(grpcEntityAction); + grpcWorkItemFilters.Entities.Add(grpcEntityFilter); } return grpcWorkItemFilters; diff --git a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs index 59c68a1f1..7404e8f2d 100644 --- a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs +++ b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. using Microsoft.DurableTask.Entities; -using Microsoft.DurableTask.Worker.Hosting; using Microsoft.Extensions.DependencyInjection; namespace Microsoft.DurableTask.Worker.Tests; @@ -29,7 +28,7 @@ public void UseWorkItemFilters_WithExplicitFilters_RegistersFilters() ServiceCollection services = new(); DefaultDurableTaskWorkerBuilder builder = new("test", services); DurableTaskRegistry registry = new(); - DurableTaskWorkerWorkItemFilters filters = new(registry, null); + DurableTaskWorkerWorkItemFilters filters = DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, null); // Act builder.UseWorkItemFilters(filters); diff --git a/test/Worker/Grpc.Tests/DurableTaskWorkerWorkItemFiltersExtensionTests.cs b/test/Worker/Grpc.Tests/DurableTaskWorkerWorkItemFiltersExtensionTests.cs new file mode 100644 index 000000000..fa669df7e --- /dev/null +++ b/test/Worker/Grpc.Tests/DurableTaskWorkerWorkItemFiltersExtensionTests.cs @@ -0,0 +1,263 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Worker.Grpc.Internal; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Worker.Grpc.Tests; + +public class DurableTaskWorkerWorkItemFiltersExtensionTests +{ + [Fact] + public void ToGrpcWorkItemFilters_EmptyFilters_ReturnsEmptyGrpcFilters() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = [], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().BeEmpty(); + result.Activities.Should().BeEmpty(); + result.Entities.Should().BeEmpty(); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithOrchestration_ConvertsName() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter + { + Name = "TestOrchestrator", + Versions = [], + }, + ], + Activities = [], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().ContainSingle(); + result.Orchestrations[0].Name.Should().Be("TestOrchestrator"); + result.Orchestrations[0].Versions.Should().BeEmpty(); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithOrchestrationVersions_ConvertsVersions() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter + { + Name = "TestOrchestrator", + Versions = ["1.0", "2.0"], + }, + ], + Activities = [], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().ContainSingle(); + result.Orchestrations[0].Versions.Should().BeEquivalentTo(["1.0", "2.0"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithActivity_ConvertsName() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter + { + Name = "TestActivity", + Versions = [], + }, + ], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Activities.Should().ContainSingle(); + result.Activities[0].Name.Should().Be("TestActivity"); + result.Activities[0].Versions.Should().BeEmpty(); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithActivityVersions_ConvertsVersions() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter + { + Name = "TestActivity", + Versions = ["v1", "v2", "v3"], + }, + ], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Activities.Should().ContainSingle(); + result.Activities[0].Versions.Should().BeEquivalentTo(["v1", "v2", "v3"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithEntity_ConvertsName() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = [], + Entities = + [ + new DurableTaskWorkerWorkItemFilters.EntityFilter + { + Name = "testentity", + }, + ], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Entities.Should().ContainSingle(); + result.Entities[0].Name.Should().Be("testentity"); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithMultipleOrchestrations_ConvertsAll() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "Orch1", Versions = ["1.0"] }, + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "Orch2", Versions = ["2.0"] }, + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "Orch3", Versions = [] }, + ], + Activities = [], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().HaveCount(3); + result.Orchestrations.Select(o => o.Name).Should().BeEquivalentTo(["Orch1", "Orch2", "Orch3"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithMultipleActivities_ConvertsAll() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter { Name = "Activity1", Versions = [] }, + new DurableTaskWorkerWorkItemFilters.ActivityFilter { Name = "Activity2", Versions = [] }, + ], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Activities.Should().HaveCount(2); + result.Activities.Select(a => a.Name).Should().BeEquivalentTo(["Activity1", "Activity2"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithMultipleEntities_ConvertsAll() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = [], + Entities = + [ + new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "entity1" }, + new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "entity2" }, + new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "entity3" }, + ], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Entities.Should().HaveCount(3); + result.Entities.Select(e => e.Name).Should().BeEquivalentTo(["entity1", "entity2", "entity3"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithMixedFilters_ConvertsAll() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "MyOrchestrator", Versions = ["1.0"] }, + ], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter { Name = "MyActivity", Versions = ["1.0", "2.0"] }, + ], + Entities = + [ + new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "myentity" }, + ], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().ContainSingle().Which.Name.Should().Be("MyOrchestrator"); + result.Orchestrations[0].Versions.Should().BeEquivalentTo(["1.0"]); + result.Activities.Should().ContainSingle().Which.Name.Should().Be("MyActivity"); + result.Activities[0].Versions.Should().BeEquivalentTo(["1.0", "2.0"]); + result.Entities.Should().ContainSingle().Which.Name.Should().Be("myentity"); + } +}