diff --git a/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs b/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs index d65e30b82..b690d288c 100644 --- a/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs +++ b/src/Extensions/AzureBlobPayloads/DependencyInjection/DurableTaskWorkerBuilderExtensions.AzureBlobPayloads.cs @@ -8,6 +8,7 @@ using Microsoft.DurableTask.Worker.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; +using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.DurableTask; @@ -77,6 +78,8 @@ static IDurableTaskWorkerBuilder UseExternalizedPayloadsCore(IDurableTaskWorkerB throw new ArgumentException( "Channel or CallInvoker must be provided to use Azure Blob Payload Externalization feature"); } + + opt.Capabilities.Add(P.WorkerCapability.LargePayloads); }); return builder; diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index d59bca3ca..1a86c0a27 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -692,14 +692,14 @@ message AbandonEntityTaskResponse { } message SkipGracefulOrchestrationTerminationsRequest { - InstanceBatch instanceBatch = 1; - google.protobuf.StringValue reason = 2; + InstanceBatch instanceBatch = 1; + google.protobuf.StringValue reason = 2; } message SkipGracefulOrchestrationTerminationsResponse { // Those instances which could not be terminated because they had locked entities at the time of this termination call, // are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged) - repeated string unterminatedInstanceIds = 1; + repeated string unterminatedInstanceIds = 1; } service TaskHubSidecarService { @@ -797,6 +797,16 @@ enum WorkerCapability { // When set, the service may return work items without any history events as an optimization. // It is strongly recommended that all SDKs support this capability. WORKER_CAPABILITY_HISTORY_STREAMING = 1; + + // Indicates that the worker supports scheduled tasks. + // The service may send schedule-triggered orchestration work items, + // and the worker must handle them, including the scheduledTime field. + WORKER_CAPABILITY_SCHEDULED_TASKS = 2; + + // Signals that the worker can handle large payloads stored externally (e.g., Blob Storage). + // Work items may contain URI references instead of inline data, and the worker must fetch them. + // This avoids message size limits and reduces network overhead. + WORKER_CAPABILITY_LARGE_PAYLOADS = 3; } message WorkItem { diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index f3ec814a8..69b075cd3 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-11-04 23:19:51 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/8c0d166673593700cfa9d0b123cd55e025b2846e/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-11-14 16:36:47 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/9f762f1301b91e3e7c736b9c5a29c2e09f2a850e/protos/orchestrator_service.proto diff --git a/src/ScheduledTasks/Extension/DurableTaskWorkerBuilderExtensions.cs b/src/ScheduledTasks/Extension/DurableTaskWorkerBuilderExtensions.cs index 893e35f28..7e8257668 100644 --- a/src/ScheduledTasks/Extension/DurableTaskWorkerBuilderExtensions.cs +++ b/src/ScheduledTasks/Extension/DurableTaskWorkerBuilderExtensions.cs @@ -1,6 +1,9 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.Grpc; +using Microsoft.Extensions.DependencyInjection; +using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.DurableTask.ScheduledTasks; @@ -20,5 +23,13 @@ public static void UseScheduledTasks(this IDurableTaskWorkerBuilder builder) r.AddEntity(); r.AddOrchestrator(); }); + + // Register the capability for gRPC workers + builder.Services + .AddOptions(builder.Name) + .PostConfigure(opt => + { + opt.Capabilities.Add(P.WorkerCapability.ScheduledTasks); + }); } } diff --git a/src/ScheduledTasks/ScheduledTasks.csproj b/src/ScheduledTasks/ScheduledTasks.csproj index a9951609a..7283fb12e 100644 --- a/src/ScheduledTasks/ScheduledTasks.csproj +++ b/src/ScheduledTasks/ScheduledTasks.csproj @@ -11,6 +11,7 @@ + diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 4dfa70f7f..ffd38cf1f 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -254,7 +254,7 @@ async ValueTask BuildRuntimeStateAsync( workerOptions.Concurrency.MaximumConcurrentOrchestrationWorkItems, MaxConcurrentEntityWorkItems = workerOptions.Concurrency.MaximumConcurrentEntityWorkItems, - Capabilities = { P.WorkerCapability.HistoryStreaming }, + Capabilities = { this.worker.grpcOptions.Capabilities }, }, cancellationToken: cancellation); } diff --git a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs index be277a3a9..78d520878 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using P = Microsoft.DurableTask.Protobuf; + namespace Microsoft.DurableTask.Worker.Grpc; /// @@ -23,6 +25,12 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions /// public CallInvoker? CallInvoker { get; set; } + /// + /// Gets the collection of capabilities enabled on this worker. + /// Capabilities are announced to the backend on connection. + /// + public HashSet Capabilities { get; } = new() { P.WorkerCapability.HistoryStreaming }; + /// /// Gets the internal protocol options. These are used to control backend-dependent features. /// diff --git a/test/Worker/Grpc.Tests/Worker.Grpc.Tests.csproj b/test/Worker/Grpc.Tests/Worker.Grpc.Tests.csproj index cda2aa55a..b3c7f9c13 100644 --- a/test/Worker/Grpc.Tests/Worker.Grpc.Tests.csproj +++ b/test/Worker/Grpc.Tests/Worker.Grpc.Tests.csproj @@ -12,4 +12,9 @@ + + + + + diff --git a/test/Worker/Grpc.Tests/WorkerCapabilitiesTests.cs b/test/Worker/Grpc.Tests/WorkerCapabilitiesTests.cs new file mode 100644 index 000000000..6dd3adc75 --- /dev/null +++ b/test/Worker/Grpc.Tests/WorkerCapabilitiesTests.cs @@ -0,0 +1,289 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.Extensions.DependencyInjection; +using P = Microsoft.DurableTask.Protobuf; +#if NET6_0_OR_GREATER +using Microsoft.DurableTask; +using Microsoft.DurableTask.ScheduledTasks; +#endif + +namespace Microsoft.DurableTask.Worker.Grpc.Tests; + +/// +/// Tests for worker capabilities configuration. +/// +public class WorkerCapabilitiesTests +{ + [Fact] + public void DefaultCapabilities_IncludesHistoryStreaming() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseGrpc(); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options.Capabilities.Should().HaveCount(1); + } + + [Fact] + public void Capabilities_CanBeManuallyAdded() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseGrpc(opt => + { + opt.Capabilities.Add(P.WorkerCapability.ScheduledTasks); + opt.Capabilities.Add(P.WorkerCapability.LargePayloads); + }); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options.Capabilities.Should().Contain(P.WorkerCapability.ScheduledTasks); + options.Capabilities.Should().Contain(P.WorkerCapability.LargePayloads); + options.Capabilities.Should().HaveCount(3); + } + + [Fact] + public void Capabilities_CanBeAddedViaPostConfigure() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseGrpc(); + + // Add capability via PostConfigure + services.Configure(builder.Name, opt => + { + opt.Capabilities.Add(P.WorkerCapability.ScheduledTasks); + }); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options.Capabilities.Should().Contain(P.WorkerCapability.ScheduledTasks); + options.Capabilities.Should().HaveCount(2); + } + + [Fact] + public void Capabilities_AddingDuplicateDoesNotCreateDuplicates() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseGrpc(opt => + { + // HistoryStreaming is already in the default set + opt.Capabilities.Add(P.WorkerCapability.HistoryStreaming); + opt.Capabilities.Add(P.WorkerCapability.HistoryStreaming); + }); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + // HashSet prevents duplicates + options.Capabilities.Should().HaveCount(1); + } + + [Fact] + public void Capabilities_CanBeClearedAndReset() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseGrpc(opt => + { + opt.Capabilities.Clear(); + opt.Capabilities.Add(P.WorkerCapability.ScheduledTasks); + }); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().NotContain(P.WorkerCapability.HistoryStreaming); + options.Capabilities.Should().Contain(P.WorkerCapability.ScheduledTasks); + options.Capabilities.Should().HaveCount(1); + } + + [Fact] + public void Capabilities_AreIndependentPerBuilder() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder1 = new("worker1", services); + DefaultDurableTaskWorkerBuilder builder2 = new("worker2", services); + + builder1.UseGrpc(opt => opt.Capabilities.Add(P.WorkerCapability.ScheduledTasks)); + builder2.UseGrpc(opt => opt.Capabilities.Add(P.WorkerCapability.LargePayloads)); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options1 = provider.GetOptions("worker1"); + GrpcDurableTaskWorkerOptions options2 = provider.GetOptions("worker2"); + + // Assert + options1.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options1.Capabilities.Should().Contain(P.WorkerCapability.ScheduledTasks); + options1.Capabilities.Should().NotContain(P.WorkerCapability.LargePayloads); + + options2.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options2.Capabilities.Should().Contain(P.WorkerCapability.LargePayloads); + options2.Capabilities.Should().NotContain(P.WorkerCapability.ScheduledTasks); + } + +#if NET6_0_OR_GREATER + [Fact] + public void UseScheduledTasks_AddsScheduledTasksCapability() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseGrpc(); + builder.UseScheduledTasks(); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options.Capabilities.Should().Contain(P.WorkerCapability.ScheduledTasks); + options.Capabilities.Should().HaveCount(2); + } + + [Fact] + public void UseScheduledTasks_WithExistingCapabilities_PreservesOtherCapabilities() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + builder.UseGrpc(opt => + { + opt.Capabilities.Add(P.WorkerCapability.LargePayloads); + }); + builder.UseScheduledTasks(); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options.Capabilities.Should().Contain(P.WorkerCapability.ScheduledTasks); + options.Capabilities.Should().Contain(P.WorkerCapability.LargePayloads); + options.Capabilities.Should().HaveCount(3); + } + + [Fact] + public void UseExternalizedPayloads_AddsLargePayloadsCapability() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + GrpcChannel channel = GetChannel(); + builder.UseGrpc(channel); + + // Register required dependencies for UseExternalizedPayloads + services.AddSingleton(new TestPayloadStore()); + services.Configure(builder.Name, opt => + { + opt.ContainerName = "test"; + opt.ConnectionString = "UseDevelopmentStorage=true"; + }); + + builder.UseExternalizedPayloads(); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options.Capabilities.Should().Contain(P.WorkerCapability.LargePayloads); + options.Capabilities.Should().HaveCount(2); + } + + [Fact] + public void UseExternalizedPayloads_WithExistingCapabilities_PreservesOtherCapabilities() + { + // Arrange + ServiceCollection services = new(); + DefaultDurableTaskWorkerBuilder builder = new(null, services); + GrpcChannel channel = GetChannel(); + builder.UseGrpc(opt => + { + opt.Channel = channel; + opt.Capabilities.Add(P.WorkerCapability.ScheduledTasks); + }); + + // Register required dependencies for UseExternalizedPayloads + services.AddSingleton(new TestPayloadStore()); + services.Configure(builder.Name, opt => + { + opt.ContainerName = "test"; + opt.ConnectionString = "UseDevelopmentStorage=true"; + }); + + builder.UseExternalizedPayloads(); + + // Act + IServiceProvider provider = services.BuildServiceProvider(); + GrpcDurableTaskWorkerOptions options = provider.GetOptions(); + + // Assert + options.Capabilities.Should().NotBeNull(); + options.Capabilities.Should().Contain(P.WorkerCapability.HistoryStreaming); + options.Capabilities.Should().Contain(P.WorkerCapability.ScheduledTasks); + options.Capabilities.Should().Contain(P.WorkerCapability.LargePayloads); + options.Capabilities.Should().HaveCount(3); + } + + static GrpcChannel GetChannel() => GrpcChannel.ForAddress("http://localhost:9001"); + + sealed class TestPayloadStore : PayloadStore + { + public override Task UploadAsync(string payLoad, CancellationToken cancellationToken) + { + return Task.FromResult($"token:{Guid.NewGuid()}"); + } + + public override Task DownloadAsync(string token, CancellationToken cancellationToken) + { + return Task.FromResult(string.Empty); + } + + public override bool IsKnownPayloadToken(string value) + { + return value.StartsWith("token:", StringComparison.Ordinal); + } + } +#endif +} +