diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index 8ef46a4a7..0c34d986d 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -822,6 +822,7 @@ message GetWorkItemsRequest { int32 maxConcurrentEntityWorkItems = 3; repeated WorkerCapability capabilities = 10; + WorkItemFilters workItemFilters = 11; } enum WorkerCapability { @@ -844,6 +845,26 @@ enum WorkerCapability { WORKER_CAPABILITY_LARGE_PAYLOADS = 3; } +message WorkItemFilters { + repeated OrchestrationFilter orchestrations = 1; + repeated ActivityFilter activities = 2; + repeated EntityFilter entities = 3; +} + +message OrchestrationFilter { + string name = 1; + repeated string versions = 2; +} + +message ActivityFilter { + string name = 1; + repeated string versions = 2; +} + +message EntityFilter { + string name = 1; +} + message WorkItem { oneof request { OrchestratorRequest orchestratorRequest = 1; diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index b13392fe5..743f3f8bd 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2026-01-13 00:01:21 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/026329c53fe6363985655857b9ca848ec7238bd2/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2026-02-24 00:01:28 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/1caadbd7ecfdf5f2309acbeac28a3e36d16aa156/protos/orchestrator_service.proto diff --git a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs index 3f349b710..d42845f60 100644 --- a/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs +++ b/src/Worker/Core/DependencyInjection/DurableTaskWorkerBuilderExtensions.cs @@ -137,4 +137,39 @@ public static IDurableTaskWorkerBuilder UseOrchestrationFilter(this IDurableTask builder.Services.AddSingleton(filter); return builder; } + + /// + /// Adds to the specified . + /// + /// The builder to set the builder target for. + /// The instance of a to use. + /// If null, the auto-generated default filters will be cleared. + /// The same instance, allowing for method chaining. + /// Work item filters are auto-generated from the registry by default. + /// Use this method with explicit filters to override the defaults, or with null to opt out of filtering entirely. + public static IDurableTaskWorkerBuilder UseWorkItemFilters(this IDurableTaskWorkerBuilder builder, DurableTaskWorkerWorkItemFilters? workItemFilters) + { + Check.NotNull(builder); + + // Use PostConfigure to ensure provided filters override the auto-generated defaults. + // When null is passed, the filters are cleared to opt out of filtering entirely. + builder.Services.AddOptions(builder.Name) + .PostConfigure(opts => + { + if (workItemFilters is null) + { + opts.Orchestrations = []; + opts.Activities = []; + opts.Entities = []; + } + else + { + opts.Orchestrations = workItemFilters.Orchestrations; + opts.Activities = workItemFilters.Activities; + opts.Entities = workItemFilters.Entities; + } + }); + + return builder; + } } diff --git a/src/Worker/Core/DependencyInjection/ServiceCollectionExtensions.cs b/src/Worker/Core/DependencyInjection/ServiceCollectionExtensions.cs index e68c550cf..9e8a93c6c 100644 --- a/src/Worker/Core/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Worker/Core/DependencyInjection/ServiceCollectionExtensions.cs @@ -86,6 +86,21 @@ static IServiceCollection ConfigureDurableOptions(IServiceCollection services, s } }); + // Auto-generate work item filters from the registry by default. + // Users can override these by calling UseWorkItemFilters(customFilters) on the builder. + services.AddOptions(name) + .Configure, IOptionsMonitor>( + (opts, registryMonitor, workerOptionsMonitor) => + { + DurableTaskRegistry registry = registryMonitor.Get(name); + DurableTaskWorkerOptions workerOptions = workerOptionsMonitor.Get(name); + DurableTaskWorkerWorkItemFilters generated = + DurableTaskWorkerWorkItemFilters.FromDurableTaskRegistry(registry, workerOptions); + opts.Orchestrations = generated.Orchestrations; + opts.Activities = generated.Activities; + opts.Entities = generated.Entities; + }); + return services; } diff --git a/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs new file mode 100644 index 000000000..c29ff95ca --- /dev/null +++ b/src/Worker/Core/DurableTaskWorkerWorkItemFilters.cs @@ -0,0 +1,137 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Worker; + +/// +/// A class that represents work item filters for a Durable Task Worker. These filters are passed to the backend +/// and only work items matching the filters will be processed by the worker. If no filters are provided, +/// the worker will process all work items. By default, these are auto-generated from the registered orchestrations, +/// activities, and entities in the . To opt-out of filters, provide a null +/// value to the method when configuring the worker. +/// +public class DurableTaskWorkerWorkItemFilters +{ + /// + /// Gets or sets the orchestration filters. + /// + public IReadOnlyList Orchestrations { get; set; } = []; + + /// + /// Gets or sets the activity filters. + /// + public IReadOnlyList Activities { get; set; } = []; + + /// + /// Gets or sets the entity filters. + /// + public IReadOnlyList Entities { get; set; } = []; + + /// + /// Creates a new instance of the class. + /// + /// to construct the filter from. + /// that optionally provides versioning information. + /// A new instance of constructed from the provided registry. + internal static DurableTaskWorkerWorkItemFilters FromDurableTaskRegistry(DurableTaskRegistry registry, DurableTaskWorkerOptions? workerOptions) + { + // TODO: Support multiple versions per orchestration/activity. + // For now, fetch the version based on the versioning match strategy if defined. If undefined, default to null (all versions match). + IReadOnlyList versions = []; + if (workerOptions?.Versioning?.MatchStrategy == DurableTaskWorkerOptions.VersionMatchStrategy.Strict) + { + versions = [workerOptions.Versioning.Version]; + } + + return new DurableTaskWorkerWorkItemFilters + { + Orchestrations = registry.Orchestrators.Select(orchestration => new OrchestrationFilter + { + Name = orchestration.Key, + Versions = versions, + }).ToList(), + Activities = registry.Activities.Select(activity => new ActivityFilter + { + Name = activity.Key, + Versions = versions, + }).ToList(), + Entities = registry.Entities.Select(entity => new EntityFilter + { + // Entity names are normalized to lowercase in the backend. + Name = entity.Key.ToString(), + }).ToList(), + }; + } + + /// + /// Specifies an orchestration filter. + /// + /// The name of the orchestration. + /// The optional versions of the orchestration. + public readonly struct OrchestrationFilter(string name, IReadOnlyList? versions) + { + /// + /// Initializes a new instance of the struct with default values. + /// + public OrchestrationFilter() + : this(string.Empty, []) + { + } + + /// + /// Gets or initializes the name of the orchestration to filter. + /// + public string Name { get; init; } = name; + + /// + /// Gets or initializes the versions of the orchestration to filter. + /// + public IReadOnlyList Versions { get; init; } = versions ?? []; + } + + /// + /// Specifies an activity filter. + /// + /// The name of the activity. + /// The optional versions of the activity. + public readonly struct ActivityFilter(string name, IReadOnlyList? versions) + { + /// + /// Initializes a new instance of the struct with default values. + /// + public ActivityFilter() + : this(string.Empty, []) + { + } + + /// + /// Gets or initializes the name of the activity to filter. + /// + public string Name { get; init; } = name; + + /// + /// Gets or initializes the versions of the activity to filter. + /// + public IReadOnlyList Versions { get; init; } = versions ?? []; + } + + /// + /// Specifies an entity filter. + /// + /// The name of the entity. + public readonly struct EntityFilter(string name) + { + /// + /// Initializes a new instance of the struct with default values. + /// + public EntityFilter() + : this(string.Empty) + { + } + + /// + /// Gets or initializes the name of the entity to filter. + /// + public string Name { get; init; } = name; + } +} diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 0049bb0af..a3aa3dab0 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -12,6 +12,7 @@ using Microsoft.DurableTask.Abstractions; using Microsoft.DurableTask.Entities; using Microsoft.DurableTask.Tracing; +using Microsoft.DurableTask.Worker.Grpc.Internal; using Microsoft.DurableTask.Worker.Shims; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -257,6 +258,7 @@ async ValueTask BuildRuntimeStateAsync( MaxConcurrentEntityWorkItems = workerOptions.Concurrency.MaximumConcurrentEntityWorkItems, Capabilities = { this.worker.grpcOptions.Capabilities }, + WorkItemFilters = this.worker.workItemFilters?.ToGrpcWorkItemFilters(), }, cancellationToken: cancellation); } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.cs index 93875961b..1d03d96ae 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.cs @@ -18,6 +18,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker readonly ILoggerFactory loggerFactory; readonly ILogger logger; readonly IOrchestrationFilter? orchestrationFilter; + readonly DurableTaskWorkerWorkItemFilters? workItemFilters; /// /// Initializes a new instance of the class. @@ -30,6 +31,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker /// The logger. /// The optional used to filter orchestration execution. /// The custom exception properties provider that help build failure details. + /// The optional used to filter work items in the backend. public GrpcDurableTaskWorker( string name, IDurableTaskFactory factory, @@ -38,7 +40,8 @@ public GrpcDurableTaskWorker( IServiceProvider services, ILoggerFactory loggerFactory, IOrchestrationFilter? orchestrationFilter = null, - IExceptionPropertiesProvider? exceptionPropertiesProvider = null) + IExceptionPropertiesProvider? exceptionPropertiesProvider = null, + IOptionsMonitor? workItemFiltersMonitor = null) : base(name, factory) { this.grpcOptions = Check.NotNull(grpcOptions).Get(name); @@ -48,6 +51,7 @@ public GrpcDurableTaskWorker( this.logger = CreateLogger(loggerFactory, this.workerOptions); this.orchestrationFilter = orchestrationFilter; this.ExceptionPropertiesProvider = exceptionPropertiesProvider; + this.workItemFilters = workItemFiltersMonitor?.Get(name); } /// diff --git a/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs b/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs new file mode 100644 index 000000000..176d376c1 --- /dev/null +++ b/src/Worker/Grpc/Internal/DurableTaskWorkerWorkItemFiltersExtension.cs @@ -0,0 +1,53 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Worker.Grpc.Internal; + +/// +/// Extension for to convert to gRPC types. +/// +public static class DurableTaskWorkerWorkItemFiltersExtension +{ + /// + /// Converts a to a gRPC . + /// + /// The to convert. + /// A gRPC . + public static P.WorkItemFilters ToGrpcWorkItemFilters(this DurableTaskWorkerWorkItemFilters workItemFilter) + { + Check.NotNull(workItemFilter); + var grpcWorkItemFilters = new P.WorkItemFilters(); + foreach (var orchestrationFilter in workItemFilter.Orchestrations) + { + var grpcOrchestrationFilter = new P.OrchestrationFilter + { + Name = orchestrationFilter.Name, + }; + grpcOrchestrationFilter.Versions.AddRange(orchestrationFilter.Versions); + grpcWorkItemFilters.Orchestrations.Add(grpcOrchestrationFilter); + } + + foreach (var activityFilter in workItemFilter.Activities) + { + var grpcActivityFilter = new P.ActivityFilter + { + Name = activityFilter.Name, + }; + grpcActivityFilter.Versions.AddRange(activityFilter.Versions); + grpcWorkItemFilters.Activities.Add(grpcActivityFilter); + } + + foreach (var entityFilter in workItemFilter.Entities) + { + var grpcEntityFilter = new P.EntityFilter + { + Name = entityFilter.Name, + }; + grpcWorkItemFilters.Entities.Add(grpcEntityFilter); + } + + return grpcWorkItemFilters; + } +} diff --git a/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs new file mode 100644 index 000000000..9e325fe7b --- /dev/null +++ b/test/Worker/Core.Tests/DependencyInjection/UseWorkItemFiltersTests.cs @@ -0,0 +1,380 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask.Worker.Tests; + +public class UseWorkItemFiltersTests +{ + [Fact] + public void UseWorkItemFilters_NullBuilder_Throws() + { + // Arrange + IDurableTaskWorkerBuilder builder = null!; + + // Act + Action act = () => builder.UseWorkItemFilters(null); + + // Assert + act.Should().ThrowExactly().WithParameterName("builder"); + } + + [Fact] + public void UseWorkItemFilters_WithExplicitFilters_RegistersFilters() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new("test", services); + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "MyOrch", Versions = ["1.0"] }], + Activities = [new DurableTaskWorkerWorkItemFilters.ActivityFilter { Name = "MyActivity", Versions = [] }], + Entities = [new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "myentity" }], + }; + + // Act + builder.UseWorkItemFilters(filters); + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().BeEquivalentTo(filters.Orchestrations); + actual.Activities.Should().BeEquivalentTo(filters.Activities); + actual.Entities.Should().BeEquivalentTo(filters.Entities); + } + + [Fact] + public void UseWorkItemFilters_ReturnsBuilder_ForChaining() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new("test", services); + + // Act + IDurableTaskWorkerBuilder result = builder.UseWorkItemFilters(null); + + // Assert + result.Should().BeSameAs(builder); + } + + [Fact] + public void WorkItemFilters_DefaultFromRegistry_WhenNoExplicitFiltersConfigured() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator)); + actual.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity)); + } + + [Fact] + public void WorkItemFilters_DefaultWithEntity_WhenNoExplicitFiltersConfigured() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddEntity(); + }); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Entities.Should().ContainSingle(e => e.Name == nameof(TestEntity)); + } + + [Fact] + public void WorkItemFilters_DefaultNullWithVersioningCurrentOrOlder_WhenNoExplicitFiltersConfigured() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + Version = "1.0", + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.CurrentOrOlder, + }; + }); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator) && o.Versions.Count == 0); + actual.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity) && a.Versions.Count == 0); + } + + [Fact] + public void WorkItemFilters_DefaultNullWithVersioningNone_WhenNoExplicitFiltersConfigured() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + Version = "1.0", + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.None, + }; + }); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator) && o.Versions.Count == 0); + actual.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity) && a.Versions.Count == 0); + } + + [Fact] + public void WorkItemFilters_DefaultVersionWithVersioningStrict_WhenNoExplicitFiltersConfigured() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.Configure(options => + { + options.Versioning = new DurableTaskWorkerOptions.VersioningOptions + { + Version = "1.0", + MatchStrategy = DurableTaskWorkerOptions.VersionMatchStrategy.Strict, + }; + }); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator) && o.Versions.Contains("1.0")); + actual.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity) && a.Versions.Contains("1.0")); + } + + [Fact] + public void WorkItemFilters_DefaultEmptyRegistry_ProducesEmptyFilters() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(_ => { }); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().BeEmpty(); + actual.Activities.Should().BeEmpty(); + actual.Entities.Should().BeEmpty(); + } + + [Fact] + public void WorkItemFilters_ExplicitFiltersOverrideDefaults() + { + // Arrange + ServiceCollection services = new(); + DurableTaskWorkerWorkItemFilters customFilters = new() + { + Orchestrations = [new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "CustomOrch", Versions = ["2.0"] }], + Activities = [], + Entities = [], + }; + + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.UseWorkItemFilters(customFilters); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().ContainSingle(o => o.Name == "CustomOrch" && o.Versions.Contains("2.0")); + actual.Activities.Should().BeEmpty(); + actual.Entities.Should().BeEmpty(); + } + + [Fact] + public void WorkItemFilters_NullOverwritesDefaults() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.UseWorkItemFilters(null); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().BeEmpty(); + actual.Activities.Should().BeEmpty(); + actual.Entities.Should().BeEmpty(); + } + + [Fact] + public void WorkItemFilters_EmptyFiltersOverrideDefaults() + { + // Arrange + ServiceCollection services = new(); + DurableTaskWorkerWorkItemFilters emptyFilters = new() + { + Orchestrations = [], + Activities = [], + Entities = [], + }; + + services.AddDurableTaskWorker("test", builder => + { + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.UseWorkItemFilters(emptyFilters); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + DurableTaskWorkerWorkItemFilters actual = filtersMonitor.Get("test"); + + // Assert + actual.Orchestrations.Should().BeEmpty(); + actual.Activities.Should().BeEmpty(); + actual.Entities.Should().BeEmpty(); + } + + [Fact] + public void WorkItemFilters_NamedBuilders_HaveUniqueDefaultFilters() + { + // Arrange + ServiceCollection services = new(); + services.AddDurableTaskWorker("worker1", builder => + { + builder.AddTasks(registry => registry.AddOrchestrator()); + }); + services.AddDurableTaskWorker("worker2", builder => + { + builder.AddTasks(registry => registry.AddActivity()); + }); + + // Act + ServiceProvider provider = services.BuildServiceProvider(); + IOptionsMonitor filtersMonitor = + provider.GetRequiredService>(); + + DurableTaskWorkerWorkItemFilters worker1Filters = filtersMonitor.Get("worker1"); + DurableTaskWorkerWorkItemFilters worker2Filters = filtersMonitor.Get("worker2"); + + // Assert + worker1Filters.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator)); + worker1Filters.Activities.Should().BeEmpty(); + + worker2Filters.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity)); + worker2Filters.Orchestrations.Should().BeEmpty(); + + worker1Filters.Should().NotBeSameAs(worker2Filters); + } + + sealed class TestOrchestrator : TaskOrchestrator + { + public override Task RunAsync(TaskOrchestrationContext context, object input) + { + throw new NotImplementedException(); + } + } + + sealed class TestActivity : TaskActivity + { + public override Task RunAsync(TaskActivityContext context, object input) + { + throw new NotImplementedException(); + } + } + + sealed class TestEntity : TaskEntity + { + } +} \ No newline at end of file diff --git a/test/Worker/Grpc.Tests/DurableTaskWorkerWorkItemFiltersExtensionTests.cs b/test/Worker/Grpc.Tests/DurableTaskWorkerWorkItemFiltersExtensionTests.cs new file mode 100644 index 000000000..5921b4dee --- /dev/null +++ b/test/Worker/Grpc.Tests/DurableTaskWorkerWorkItemFiltersExtensionTests.cs @@ -0,0 +1,424 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Worker.Grpc.Internal; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Worker.Grpc.Tests; + +public class DurableTaskWorkerWorkItemFiltersExtensionTests +{ + [Fact] + public void ToGrpcWorkItemFilters_EmptyFilters_ReturnsEmptyGrpcFilters() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = [], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().BeEmpty(); + result.Activities.Should().BeEmpty(); + result.Entities.Should().BeEmpty(); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithOrchestration_ConvertsName() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter + { + Name = "TestOrchestrator", + Versions = [], + }, + ], + Activities = [], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().ContainSingle(); + result.Orchestrations[0].Name.Should().Be("TestOrchestrator"); + result.Orchestrations[0].Versions.Should().BeEmpty(); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithOrchestrationVersions_ConvertsVersions() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter + { + Name = "TestOrchestrator", + Versions = ["1.0", "2.0"], + }, + ], + Activities = [], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().ContainSingle(); + result.Orchestrations[0].Versions.Should().BeEquivalentTo(["1.0", "2.0"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithActivity_ConvertsName() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter + { + Name = "TestActivity", + Versions = [], + }, + ], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Activities.Should().ContainSingle(); + result.Activities[0].Name.Should().Be("TestActivity"); + result.Activities[0].Versions.Should().BeEmpty(); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithActivityVersions_ConvertsVersions() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter + { + Name = "TestActivity", + Versions = ["v1", "v2", "v3"], + }, + ], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Activities.Should().ContainSingle(); + result.Activities[0].Versions.Should().BeEquivalentTo(["v1", "v2", "v3"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithEntity_ConvertsName() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = [], + Entities = + [ + new DurableTaskWorkerWorkItemFilters.EntityFilter + { + Name = "testentity", + }, + ], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Entities.Should().ContainSingle(); + result.Entities[0].Name.Should().Be("testentity"); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithNullVersions_ConvertsWithoutError() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter + { + Name = "TestOrchestrator", + }, + ], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter + { + Name = "TestActivity", + }, + ], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().ContainSingle(); + result.Orchestrations[0].Name.Should().Be("TestOrchestrator"); + result.Orchestrations[0].Versions.Should().BeEmpty(); + result.Activities.Should().ContainSingle(); + result.Activities[0].Name.Should().Be("TestActivity"); + result.Activities[0].Versions.Should().BeEmpty(); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithMultipleOrchestrations_ConvertsAll() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "Orch1", Versions = ["1.0"] }, + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "Orch2", Versions = ["2.0"] }, + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "Orch3", Versions = [] }, + ], + Activities = [], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().HaveCount(3); + result.Orchestrations.Select(o => o.Name).Should().BeEquivalentTo(["Orch1", "Orch2", "Orch3"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithMultipleActivities_ConvertsAll() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter { Name = "Activity1", Versions = [] }, + new DurableTaskWorkerWorkItemFilters.ActivityFilter { Name = "Activity2", Versions = [] }, + ], + Entities = [], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Activities.Should().HaveCount(2); + result.Activities.Select(a => a.Name).Should().BeEquivalentTo(["Activity1", "Activity2"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithMultipleEntities_ConvertsAll() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = [], + Activities = [], + Entities = + [ + new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "entity1" }, + new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "entity2" }, + new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "entity3" }, + ], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Entities.Should().HaveCount(3); + result.Entities.Select(e => e.Name).Should().BeEquivalentTo(["entity1", "entity2", "entity3"]); + } + + [Fact] + public void ToGrpcWorkItemFilters_WithMixedFilters_ConvertsAll() + { + // Arrange + DurableTaskWorkerWorkItemFilters filters = new() + { + Orchestrations = + [ + new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "MyOrchestrator", Versions = ["1.0"] }, + ], + Activities = + [ + new DurableTaskWorkerWorkItemFilters.ActivityFilter { Name = "MyActivity", Versions = ["1.0", "2.0"] }, + ], + Entities = + [ + new DurableTaskWorkerWorkItemFilters.EntityFilter { Name = "myentity" }, + ], + }; + + // Act + P.WorkItemFilters result = filters.ToGrpcWorkItemFilters(); + + // Assert + result.Orchestrations.Should().ContainSingle().Which.Name.Should().Be("MyOrchestrator"); + result.Orchestrations[0].Versions.Should().BeEquivalentTo(["1.0"]); + result.Activities.Should().ContainSingle().Which.Name.Should().Be("MyActivity"); + result.Activities[0].Versions.Should().BeEquivalentTo(["1.0", "2.0"]); + result.Entities.Should().ContainSingle().Which.Name.Should().Be("myentity"); + } + + [Fact] + public void WorkerConstruction_DefaultFilters_FlowToWorker() + { + // Arrange + ServiceCollection services = new(); + services.AddSingleton(new Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory()); + + services.AddDurableTaskWorker(builder => + { + builder.UseGrpc(); + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + }); + + // Act + using ServiceProvider provider = services.BuildServiceProvider(); + IHostedService hosted = Assert.Single(provider.GetServices()); + Assert.IsType(hosted); + + DurableTaskWorkerWorkItemFilters? filters = (DurableTaskWorkerWorkItemFilters?)typeof(GrpcDurableTaskWorker) + .GetField("workItemFilters", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)! + .GetValue(hosted); + + // Assert + filters.Should().NotBeNull(); + filters!.Orchestrations.Should().ContainSingle(o => o.Name == nameof(TestOrchestrator)); + filters.Activities.Should().ContainSingle(a => a.Name == nameof(TestActivity)); + } + + [Fact] + public void WorkerConstruction_ExplicitFilters_FlowToWorker() + { + // Arrange + ServiceCollection services = new(); + services.AddSingleton(new Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory()); + + DurableTaskWorkerWorkItemFilters customFilters = new() + { + Orchestrations = [new DurableTaskWorkerWorkItemFilters.OrchestrationFilter { Name = "CustomOrch", Versions = ["2.0"] }], + Activities = [], + Entities = [], + }; + + services.AddDurableTaskWorker(builder => + { + builder.UseGrpc(); + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.UseWorkItemFilters(customFilters); + }); + + // Act + using ServiceProvider provider = services.BuildServiceProvider(); + IHostedService hosted = Assert.Single(provider.GetServices()); + Assert.IsType(hosted); + + DurableTaskWorkerWorkItemFilters? filters = (DurableTaskWorkerWorkItemFilters?)typeof(GrpcDurableTaskWorker) + .GetField("workItemFilters", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)! + .GetValue(hosted); + + // Assert + filters.Should().NotBeNull(); + filters!.Orchestrations.Should().ContainSingle(o => o.Name == "CustomOrch" && o.Versions.Contains("2.0")); + filters.Activities.Should().BeEmpty(); + filters.Entities.Should().BeEmpty(); + } + + [Fact] + public void WorkerConstruction_NullFilters_ClearsDefaultsOnWorker() + { + // Arrange + ServiceCollection services = new(); + services.AddSingleton(new Microsoft.Extensions.Logging.Abstractions.NullLoggerFactory()); + + services.AddDurableTaskWorker(builder => + { + builder.UseGrpc(); + builder.AddTasks(registry => + { + registry.AddOrchestrator(); + registry.AddActivity(); + }); + builder.UseWorkItemFilters(null); + }); + + // Act + using ServiceProvider provider = services.BuildServiceProvider(); + IHostedService hosted = Assert.Single(provider.GetServices()); + Assert.IsType(hosted); + + DurableTaskWorkerWorkItemFilters? filters = (DurableTaskWorkerWorkItemFilters?)typeof(GrpcDurableTaskWorker) + .GetField("workItemFilters", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)! + .GetValue(hosted); + + // Assert + filters.Should().NotBeNull(); + filters!.Orchestrations.Should().BeEmpty(); + filters.Activities.Should().BeEmpty(); + filters.Entities.Should().BeEmpty(); + } + + sealed class TestOrchestrator : TaskOrchestrator + { + public override Task RunAsync(TaskOrchestrationContext context, object input) + { + throw new NotImplementedException(); + } + } + + sealed class TestActivity : TaskActivity + { + public override Task RunAsync(TaskActivityContext context, object input) + { + throw new NotImplementedException(); + } + } +}