diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto
index 8ef46a4a7..0c34d986d 100644
--- a/src/Grpc/orchestrator_service.proto
+++ b/src/Grpc/orchestrator_service.proto
@@ -822,6 +822,7 @@ message GetWorkItemsRequest {
int32 maxConcurrentEntityWorkItems = 3;
repeated WorkerCapability capabilities = 10;
+ WorkItemFilters workItemFilters = 11;
}
enum WorkerCapability {
@@ -844,6 +845,26 @@ enum WorkerCapability {
WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
}
+message WorkItemFilters {
+ repeated OrchestrationFilter orchestrations = 1;
+ repeated ActivityFilter activities = 2;
+ repeated EntityFilter entities = 3;
+}
+
+message OrchestrationFilter {
+ string name = 1;
+ repeated string versions = 2;
+}
+
+message ActivityFilter {
+ string name = 1;
+ repeated string versions = 2;
+}
+
+message EntityFilter {
+ string name = 1;
+}
+
message WorkItem {
oneof request {
OrchestratorRequest orchestratorRequest = 1;
diff --git a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs
index 3f349b710..66a8521bd 100644
--- a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs
+++ b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs
@@ -137,4 +137,32 @@ public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTask
builder.Services.AddSingleton(filter);
return builder;
}
+
+ ///
+ /// Adds to the specified .
+ ///
+ /// The builder to set the builder target for.
+ /// The instance of a to use.
+ /// The same instance, allowing for method chaining.
+ /// If this is called without specified filters, the filters will be constructed from the registered orchestrations, activities, and entities.
+ public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWorkerBuilder builder, DurableTaskWorkerWorkItemFilters? workItemFilters = null)
+ {
+ Check.NotNull(builder);
+ if (workItemFilters != null)
+ {
+ builder.Services.AddSingleton(workItemFilters);
+ }
+ else
+ {
+ // Auto-generate the filters from registered orchestrations, activities, and entities.
+ builder.Services.AddSingleton(provider =>
+ {
+ DurableTaskRegistry registry = provider.GetRequiredService>().Get(builder.Name);
+ DurableTaskWorkerOptions? options = provider.GetOptions(builder.Name);
+ return DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, options);
+ });
+ }
+
+ return builder;
+ }
}
diff --git a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs
new file mode 100644
index 000000000..57270f0e7
--- /dev/null
+++ b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs
@@ -0,0 +1,99 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Microsoft.DurableTask.Worker;
+
+///
+/// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend
+/// and only work items matching the filters will be processed by the worker. If no filters are provided,
+/// the worker will process all work items.
+///
+public class DurableTaskWorkerWorkItemFilters
+{
+ ///
+ /// Gets or initializes the orchestration filters.
+ ///
+ public IReadOnlyList Orchestrations { get; init; } = [];
+
+ ///
+ /// Gets or initializes the activity filters.
+ ///
+ public IReadOnlyList Activities { get; init; } = [];
+
+ ///
+ /// Gets or initializes the entity filters.
+ ///
+ public IReadOnlyList Entities { get; init; } = [];
+
+ ///
+ /// Creates a new instance of the class.
+ ///
+ /// to construct the filter from.
+ /// that optionally provides versioning information.
+ /// A new instance of constructed from the provided registry.
+ internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions)
+ {
+ // TODO: Support multiple versions per orchestration/activity. For now, grab the worker version from the options.
+ return new DurableTaskWorkerWorkItemFilters
+ {
+ Orchestrations = registry.Orchestrators.Select(orchestration => new OrchestrationFilter
+ {
+ Name = orchestration.Key,
+ Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [],
+ }).ToList(),
+ Activities = registry.Activities.Select(activity => new ActivityFilter
+ {
+ Name = activity.Key,
+ Versions = workerOptions?.Versioning != null ? [workerOptions.Versioning.DefaultVersion] : [],
+ }).ToList(),
+ Entities = registry.Entities.Select(entity => new EntityFilter
+ {
+ // Entity names are normalized to lowercase in the backend.
+ Name = entity.Key.ToString().ToLowerInvariant(),
+ }).ToList(),
+ };
+ }
+
+ ///
+ /// Struct specifying an orchestration filter.
+ ///
+ public struct OrchestrationFilter
+ {
+ ///
+ /// Gets or initializes the name of the orchestration to filter.
+ ///
+ public string Name { get; init; }
+
+ ///
+ /// Gets or initializes the versions of the orchestration to filter.
+ ///
+ public IReadOnlyList Versions { get; init; }
+ }
+
+ ///
+ /// Struct specifying an activity filter.
+ ///
+ public struct ActivityFilter
+ {
+ ///
+ /// Gets or initializes the name of the activity to filter.
+ ///
+ public string Name { get; init; }
+
+ ///
+ /// Gets or initializes the versions of the activity to filter.
+ ///
+ public IReadOnlyList Versions { get; init; }
+ }
+
+ ///
+ /// Struct specifying an entity filter.
+ ///
+ public struct EntityFilter
+ {
+ ///
+ /// Gets or initializes the name of the entity to filter.
+ ///
+ public string Name { get; init; }
+ }
+}
diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
index ffd38cf1f..102b35975 100644
--- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
+++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
@@ -10,6 +10,7 @@
using Microsoft.DurableTask.Abstractions;
using Microsoft.DurableTask.Entities;
using Microsoft.DurableTask.Tracing;
+using Microsoft.DurableTask.Worker.Grpc.Internal;
using Microsoft.DurableTask.Worker.Shims;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -255,6 +256,7 @@ async ValueTask BuildRuntimeStateAsync(
MaxConcurrentEntityWorkItems =
workerOptions.Concurrency.MaximumConcurrentEntityWorkItems,
Capabilities = { this.worker.grpcOptions.Capabilities },
+ WorkItemFilters = this.worker.workItemFilters?.ToGrpcWorkItemFilters(),
},
cancellationToken: cancellation);
}
diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.cs
index 93875961b..e2d875ae6 100644
--- a/src/Worker/Grpc/GrpcDurableTaskWorker.cs
+++ b/src/Worker/Grpc/GrpcDurableTaskWorker.cs
@@ -18,6 +18,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
readonly ILoggerFactory loggerFactory;
readonly ILogger logger;
readonly IOrchestrationFilter? orchestrationFilter;
+ readonly DurableTaskWorkerWorkItemFilters? workItemFilters;
///
/// Initializes a new instance of the class.
@@ -30,6 +31,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
/// The logger.
/// The optional used to filter orchestration execution.
/// The custom exception properties provider that help build failure details.
+ /// The optional used to filter work items in the backend.
public GrpcDurableTaskWorker(
string name,
IDurableTaskFactory factory,
@@ -38,7 +40,8 @@ public GrpcDurableTaskWorker(
IServiceProvider services,
ILoggerFactory loggerFactory,
IOrchestrationFilter? orchestrationFilter = null,
- IExceptionPropertiesProvider? exceptionPropertiesProvider = null)
+ IExceptionPropertiesProvider? exceptionPropertiesProvider = null,
+ DurableTaskWorkerWorkItemFilters? workItemFilters = null)
: base(name, factory)
{
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
@@ -48,6 +51,7 @@ public GrpcDurableTaskWorker(
this.logger = CreateLogger(loggerFactory, this.workerOptions);
this.orchestrationFilter = orchestrationFilter;
this.ExceptionPropertiesProvider = exceptionPropertiesProvider;
+ this.workItemFilters = workItemFilters;
}
///
diff --git a/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs b/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs
new file mode 100644
index 000000000..176d376c1
--- /dev/null
+++ b/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs
@@ -0,0 +1,53 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using P = Microsoft.DurableTask.Protobuf;
+
+namespace Microsoft.DurableTask.Worker.Grpc.Internal;
+
+///
+/// Extension for to convert to gRPC types.
+///
+public static class DurableTaskWorkerWorkItemFiltersExtension
+{
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The to convert.
+ /// A gRPC .
+ public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWorkItemFilters workItemFilter)
+ {
+ Check.NotNull(workItemFilter);
+ var grpcWorkItemFilters = new P.WorkItemFilters();
+ foreach (var orchestrationFilter in workItemFilter.Orchestrations)
+ {
+ var grpcOrchestrationFilter = new P.OrchestrationFilter
+ {
+ Name = orchestrationFilter.Name,
+ };
+ grpcOrchestrationFilter.Versions.AddRange(orchestrationFilter.Versions);
+ grpcWorkItemFilters.Orchestrations.Add(grpcOrchestrationFilter);
+ }
+
+ foreach (var activityFilter in workItemFilter.Activities)
+ {
+ var grpcActivityFilter = new P.ActivityFilter
+ {
+ Name = activityFilter.Name,
+ };
+ grpcActivityFilter.Versions.AddRange(activityFilter.Versions);
+ grpcWorkItemFilters.Activities.Add(grpcActivityFilter);
+ }
+
+ foreach (var entityFilter in workItemFilter.Entities)
+ {
+ var grpcEntityFilter = new P.EntityFilter
+ {
+ Name = entityFilter.Name,
+ };
+ grpcWorkItemFilters.Entities.Add(grpcEntityFilter);
+ }
+
+ return grpcWorkItemFilters;
+ }
+}
diff --git a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs
new file mode 100644
index 000000000..7404e8f2d
--- /dev/null
+++ b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs
@@ -0,0 +1,188 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Microsoft.DurableTask.Entities;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Microsoft.DurableTask.Worker.Tests;
+
+public class UseWorkItemFiltersTests
+{
+ [Fact]
+ public void UseWorkItemFilters_NullBuilder_Throws()
+ {
+ // Arrange
+ IDurableTaskWorkerBuilder builder = null!;
+
+ // Act
+ Action act = () => builder.UseWorkItemFilters();
+
+ // Assert
+ act.Should().ThrowExactly().WithParameterName("builder");
+ }
+
+ [Fact]
+ public void UseWorkItemFilters_WithExplicitFilters_RegistersFilters()
+ {
+ // Arrange
+ ServiceCollection services = new();
+ DefaultDurableTaskWorkerBuilder builder = new("test", services);
+ DurableTaskRegistry registry = new();
+ DurableTaskWorkerWorkItemFilters filters = DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, null);
+
+ // Act
+ builder.UseWorkItemFilters(filters);
+ ServiceProvider provider = services.BuildServiceProvider();
+ DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService();
+
+ // Assert
+ actual.Should().BeSameAs(filters);
+ }
+
+ [Fact]
+ public void UseWorkItemFilters_WithoutFilters_AutoGeneratesFromRegistry()
+ {
+ // Arrange
+ ServiceCollection services = new();
+ DefaultDurableTaskWorkerBuilder builder = new("test", services);
+ builder.AddTasks(registry =>
+ {
+ registry.AddOrchestrator();
+ registry.AddActivity();
+ });
+
+ // Act
+ builder.UseWorkItemFilters();
+ ServiceProvider provider = services.BuildServiceProvider();
+ DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService();
+
+ // Assert
+ actual.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator));
+ actual.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity));
+ }
+
+ [Fact]
+ public void UseWorkItemFilters_WithVersioning_IncludesVersionInFilters()
+ {
+ // Arrange
+ ServiceCollection services = new();
+ DefaultDurableTaskWorkerBuilder builder = new("test", services);
+ builder.AddTasks(registry =>
+ {
+ registry.AddOrchestrator();
+ registry.AddActivity();
+ });
+ builder.Configure(options =>
+ {
+ options.Versioning = new DurableTaskWorkerOptions.VersioningOptions
+ {
+ DefaultVersion = "1.0"
+ };
+ });
+
+ // Act
+ builder.UseWorkItemFilters();
+ ServiceProvider provider = services.BuildServiceProvider();
+ DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService();
+
+ // Assert
+ actual.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator) && o.Versions.Contains("1.0"));
+ actual.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity) && a.Versions.Contains("1.0"));
+ }
+
+ [Fact]
+ public void UseWorkItemFilters_WithEntity_IncludesEntityInFilters()
+ {
+ // Arrange
+ ServiceCollection services = new();
+ DefaultDurableTaskWorkerBuilder builder = new("test", services);
+ builder.AddTasks(registry =>
+ {
+ registry.AddEntity();
+ });
+
+ // Act
+ builder.UseWorkItemFilters();
+ ServiceProvider provider = services.BuildServiceProvider();
+ DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService();
+
+ // Assert
+ actual.Entities.Should().ContainSingle(e => e.Name == nameof(TestEntity).ToLowerInvariant());
+ }
+
+ [Fact]
+ public void UseWorkItemFilters_ReturnsBuilder_ForChaining()
+ {
+ // Arrange
+ ServiceCollection services = new();
+ DefaultDurableTaskWorkerBuilder builder = new("test", services);
+
+ // Act
+ IDurableTaskWorkerBuilder result = builder.UseWorkItemFilters();
+
+ // Assert
+ result.Should().BeSameAs(builder);
+ }
+
+ [Fact]
+ public void UseWorkItemFilters_EmptyRegistry_CreatesEmptyFilters()
+ {
+ // Arrange
+ ServiceCollection services = new();
+ DefaultDurableTaskWorkerBuilder builder = new("test", services);
+ builder.AddTasks(_ => { });
+
+ // Act
+ builder.UseWorkItemFilters();
+ ServiceProvider provider = services.BuildServiceProvider();
+ DurableTaskWorkerWorkItemFilters actual = provider.GetRequiredService();
+
+ // Assert
+ actual.Orchestrations.Should().BeEmpty();
+ actual.Activities.Should().BeEmpty();
+ actual.Entities.Should().BeEmpty();
+ }
+
+ [Fact]
+ public void UseWorkItemFilters_NamedBuilders_HaveUniqueFilters()
+ {
+ // Arrange
+ ServiceCollection services = new();
+ DefaultDurableTaskWorkerBuilder builder1 = new("worker1", services);
+ builder1.AddTasks(registry => registry.AddOrchestrator());
+ builder1.UseWorkItemFilters();
+
+ DefaultDurableTaskWorkerBuilder builder2 = new("worker2", services);
+ builder2.AddTasks(registry => registry.AddActivity());
+ builder2.UseWorkItemFilters();
+
+ // Act
+ ServiceProvider provider = services.BuildServiceProvider();
+ IEnumerable allFilters = provider.GetServices();
+
+ // Assert
+ allFilters.Should().HaveCount(2);
+ allFilters.Should().Contain(f => f.Orchestrations.Any(o => o.Name == nameof(TestOrchestrator)) && !f.Activities.Any());
+ allFilters.Should().Contain(f => f.Activities.Any(a => a.Name == nameof(TestActivity)) && !f.Orchestrations.Any());
+ }
+
+ class TestOrchestrator : TaskOrchestrator