From 71d758d1ec1a6d9f3eebe295af867cb8d6b88cc0 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Sun, 7 Dec 2025 15:06:35 -0800 Subject: [PATCH 01/11] Support dedup status when starting orchestration --- src/Abstractions/TaskOptions.cs | 13 ++ .../StartOrchestrationOptionsExtensions.cs | 38 ++++ src/Client/Grpc/GrpcDurableTaskClient.cs | 50 ++++- .../ShimDurableTaskClient.cs | 21 ++- .../Sidecar/Grpc/TaskHubGrpcServer.cs | 40 +++- .../GrpcDurableTaskClientIntegrationTests.cs | 174 ++++++++++++++++++ 6 files changed, 329 insertions(+), 7 deletions(-) create mode 100644 src/Client/Core/StartOrchestrationOptionsExtensions.cs diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index f8e527fe..504cf8ac 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -134,4 +134,17 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse /// Gets the version to associate with the orchestration instance. /// public TaskVersion? Version { get; init; } + + /// + /// Gets the orchestration runtime statuses that should be considered for deduplication. + /// If an orchestration instance with the same instance ID already exists and is in one of these statuses, + /// the creation will throw an instead of creating a new instance. + /// This enables idempotent orchestration creation. + /// + /// + /// The status names should match the values from enum + /// (e.g., "Completed", "Failed", "Terminated", "Canceled"). + /// For type-safe usage, use extension methods from . + /// + public IReadOnlyList? DedupeStatuses { get; init; } } diff --git a/src/Client/Core/StartOrchestrationOptionsExtensions.cs b/src/Client/Core/StartOrchestrationOptionsExtensions.cs new file mode 100644 index 00000000..38add4ae --- /dev/null +++ b/src/Client/Core/StartOrchestrationOptionsExtensions.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Client; + +/// +/// Extension methods for to provide type-safe deduplication status configuration. +/// +public static class StartOrchestrationOptionsExtensions +{ + /// + /// Gets the terminal orchestration runtime statuses that are valid for deduplication. + /// These are the statuses that can be used to prevent replacement of an existing orchestration instance. + /// + public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[] + { + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated, + OrchestrationRuntimeStatus.Canceled, + }; + + /// + /// Creates a new with the specified deduplication statuses. + /// + /// The base options to extend. + /// The orchestration runtime statuses that should be considered for deduplication. + /// A new instance with the deduplication statuses set. + public static StartOrchestrationOptions WithDedupeStatuses( + this StartOrchestrationOptions options, + params OrchestrationRuntimeStatus[] dedupeStatuses) + { + return options with + { + DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(), + }; + } +} diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 76686892..f0b06902 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Immutable; using System.Diagnostics; using System.Text; using DurableTask.Core.History; @@ -91,7 +92,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( version = this.options.DefaultVersion; } - var request = new P.CreateInstanceRequest + P.CreateInstanceRequest request = new() { Name = orchestratorName.Name, Version = version, @@ -122,6 +123,47 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime()); } + // Set orchestration ID reuse policy for deduplication support + // Note: This requires the protobuf to support OrchestrationIdReusePolicy field + // If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated + if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0) + { + // Parse and validate all status strings to enum first + ImmutableHashSet dedupeStatuses = options.DedupeStatuses + .Select(s => + { + if (!System.Enum.TryParse(s, ignoreCase: true, out OrchestrationRuntimeStatus status)) + { + string validStatuses = string.Join(", ", StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Select(ts => ts.ToString())); + throw new ArgumentException( + $"Invalid orchestration runtime status for deduplication: '{s}'. Valid statuses for deduplication are: {validStatuses}", + nameof(options.DedupeStatuses)); + } + + return status; + }).ToImmutableHashSet(); + + P.OrchestrationIdReusePolicy policy = new(); + + // The policy uses "replaceableStatus" - these are statuses that CAN be replaced + // dedupeStatuses are statuses that should NOT be replaced (should throw exception) + // So we add terminal statuses that are NOT in dedupeStatuses to replaceableStatus + // This matches the logic in AAPT-DTMB ProtoUtils.Convert + foreach (OrchestrationRuntimeStatus terminalStatus in StartOrchestrationOptionsExtensions.ValidDedupeStatuses) + { + if (!dedupeStatuses.Contains(terminalStatus)) + { + policy.ReplaceableStatus.Add(terminalStatus.ToGrpcStatus()); + } + } + + // Only set if we have replaceable statuses + if (policy.ReplaceableStatus.Count > 0) + { + request.OrchestrationIdReusePolicy = policy; + } + } + using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request); P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync( @@ -405,7 +447,7 @@ public override async Task RestartAsync( Check.NotNullOrEmpty(instanceId); Check.NotEntity(this.options.EnableEntitySupport, instanceId); - var request = new P.RestartInstanceRequest + P.RestartInstanceRequest request = new P.RestartInstanceRequest { InstanceId = instanceId, RestartWithNewInstanceId = restartWithNewInstanceId, @@ -441,7 +483,7 @@ public override async Task RewindInstanceAsync( Check.NotNullOrEmpty(instanceId); Check.NotEntity(this.options.EnableEntitySupport, instanceId); - var request = new P.RewindInstanceRequest + P.RewindInstanceRequest request = new P.RewindInstanceRequest { InstanceId = instanceId, Reason = reason, @@ -573,7 +615,7 @@ async Task PurgeInstancesCoreAsync( OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs) { - var metadata = new OrchestrationMetadata(state.Name, state.InstanceId) + OrchestrationMetadata metadata = new OrchestrationMetadata(state.Name, state.InstanceId) { CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(), LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(), diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 4fbf828d..487c6b94 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -7,6 +7,7 @@ using DurableTask.Core; using DurableTask.Core.History; using DurableTask.Core.Query; +using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Client.Entities; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -192,7 +193,25 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( }, }; - await this.Client.CreateTaskOrchestrationAsync(message); + Core.OrchestrationStatus[]? dedupeStatuses = null; + if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0) + { + dedupeStatuses = options.DedupeStatuses + .Select(s => + { + if (!Enum.TryParse(s, ignoreCase: true, out var status)) + { + var validStatuses = string.Join(", ", StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Select(ts => ts.ToString())); + throw new ArgumentException( + $"Invalid orchestration runtime status for deduplication: '{s}'. Valid statuses for deduplication are: {validStatuses}", + nameof(options.DedupeStatuses)); + } + return status.ConvertToCore(); + }) + .ToArray(); + } + + await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses); return instanceId; } diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index 15f65dd8..0575b8d6 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -1,10 +1,11 @@ -// Copyright (c) Microsoft Corporation. +// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using System.Collections.Concurrent; using System.Diagnostics; using System.Globalization; using DurableTask.Core; +using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Query; using Google.Protobuf.WellKnownTypes; @@ -202,6 +203,34 @@ async Task WaitForWorkItemClientConnection() try { + // Convert OrchestrationIdReusePolicy to dedupeStatuses + // The policy uses "replaceableStatus" - these are statuses that CAN be replaced + // dedupeStatuses are statuses that should NOT be replaced (should throw exception) + // So dedupeStatuses = all terminal statuses MINUS replaceableStatus + OrchestrationStatus[]? dedupeStatuses = null; + if (request.OrchestrationIdReusePolicy != null && request.OrchestrationIdReusePolicy.ReplaceableStatus.Count > 0) + { + var terminalStatuses = new HashSet + { + OrchestrationStatus.Completed, + OrchestrationStatus.Failed, + OrchestrationStatus.Terminated, + OrchestrationStatus.Canceled, + }; + + // Remove replaceable statuses from terminal statuses to get dedupe statuses + foreach (P.OrchestrationStatus replaceableStatus in request.OrchestrationIdReusePolicy.ReplaceableStatus) + { + terminalStatuses.Remove((OrchestrationStatus)replaceableStatus); + } + + // Only set dedupeStatuses if there are any statuses that should not be replaced + if (terminalStatuses.Count > 0) + { + dedupeStatuses = terminalStatuses.ToArray(); + } + } + await this.client.CreateTaskOrchestrationAsync( new TaskMessage { @@ -216,7 +245,14 @@ await this.client.CreateTaskOrchestrationAsync( : null }, OrchestrationInstance = instance, - }); + }, + dedupeStatuses); + } + catch (OrchestrationAlreadyExistsException e) + { + // Convert to gRPC exception + this.log.LogWarning(e, "Orchestration with ID {InstanceId} already exists", instance.InstanceId); + throw new RpcException(new Status(StatusCode.AlreadyExists, e.Message)); } catch (Exception e) { diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 2d5df336..8cdbda7b 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -4,9 +4,12 @@ using System.Diagnostics.CodeAnalysis; using FluentAssertions; using FluentAssertions.Execution; +using Grpc.Core; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Worker; using Xunit.Abstractions; +using RpcException = Grpc.Core.RpcException; +using StatusCode = Grpc.Core.StatusCode; namespace Microsoft.DurableTask.Grpc.Tests; @@ -308,6 +311,177 @@ static async Task Orchestration(TaskOrchestrationContext context, bool s }); } + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_ThrowsWhenInstanceExists() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance"; + + // Create first orchestration instance + string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId)); + + // Wait for it to complete + await server.Client.WaitForInstanceStartAsync(firstInstanceId, default); + await server.Client.RaiseEventAsync(firstInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(firstInstanceId, default); + + // Verify it's completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Try to create another instance with the same ID and dedupe statuses including Completed + // This should throw an exception + Func createAction = () => server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses(OrchestrationRuntimeStatus.Completed)); + + await createAction.Should().ThrowAsync() + .Where(e => e.StatusCode == StatusCode.AlreadyExists); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_AllowsReplacementWhenStatusNotInDedupeList() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance-replace"; + + // Create first orchestration instance + string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId)); + + // Wait for it to complete + await server.Client.WaitForInstanceStartAsync(firstInstanceId, default); + await server.Client.RaiseEventAsync(firstInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(firstInstanceId, default); + + // Verify it's completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Try to create another instance with the same ID but dedupe statuses does NOT include Completed + // This should succeed (replace the existing instance) + string secondInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses( + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated)); + + secondInstanceId.Should().Be(instanceId); + + // Wait for the new instance to start running + await server.Client.WaitForInstanceStartAsync(instanceId, default); + + // Verify the new instance is running + OrchestrationMetadata? newMetadata = await server.Client.GetInstanceAsync(instanceId, false); + newMetadata.Should().NotBeNull(); + newMetadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_ThrowsWhenInstanceIsFailed() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance-failed"; + + // Create first orchestration instance that will fail + string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: true, // true means it will throw + new StartOrchestrationOptions(instanceId)); + + // Wait for it to fail + await server.Client.WaitForInstanceStartAsync(firstInstanceId, default); + await server.Client.RaiseEventAsync(firstInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(firstInstanceId, default); + + // Verify it's failed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Failed); + + // Try to create another instance with the same ID and dedupe statuses including Failed + // This should throw an exception + Func createAction = () => server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses(OrchestrationRuntimeStatus.Failed)); + + await createAction.Should().ThrowAsync() + .Where(e => e.StatusCode == StatusCode.AlreadyExists); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_AllowsCreationWhenInstanceDoesNotExist() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance-new"; + + // Create instance with dedupe statuses - should succeed since instance doesn't exist + string createdInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses( + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed)); + + createdInstanceId.Should().Be(instanceId); + + // Verify the instance was created + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Running); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_WithMultipleDedupeStatuses_ThrowsWhenAnyStatusMatches() + { + await using HostTestLifetime server = await this.StartAsync(); + + string instanceId = "dedup-test-instance-multiple"; + + // Create first orchestration instance + string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId)); + + // Wait for it to complete + await server.Client.WaitForInstanceStartAsync(firstInstanceId, default); + await server.Client.RaiseEventAsync(firstInstanceId, "event", default); + await server.Client.WaitForInstanceCompletionAsync(firstInstanceId, default); + + // Verify it's completed + OrchestrationMetadata? metadata = await server.Client.GetInstanceAsync(instanceId, false); + metadata.Should().NotBeNull(); + metadata!.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + + // Try to create another instance with multiple dedupe statuses including Completed + // This should throw an exception + Func createAction = () => server.Client.ScheduleNewOrchestrationInstanceAsync( + OrchestrationName, + input: false, + new StartOrchestrationOptions(instanceId).WithDedupeStatuses( + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated)); + + await createAction.Should().ThrowAsync() + .Where(e => e.StatusCode == StatusCode.AlreadyExists); + } + class DateTimeToleranceComparer : IEqualityComparer { public bool Equals(DateTimeOffset x, DateTimeOffset y) => (x - y).Duration() < TimeSpan.FromMilliseconds(100); From f433e54aeeceefa04f61d41395cf689e5dc528ea Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 8 Dec 2025 15:24:07 -0800 Subject: [PATCH 02/11] tests fix --- .../ShimDurableTaskClient.cs | 2 +- .../ShimDurableTaskClientTests.cs | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 487c6b94..8ffea529 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -322,7 +322,7 @@ public override async Task RestartAsync( }, }; - await this.Client.CreateTaskOrchestrationAsync(message); + await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses: null); return newInstanceId; } diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index fa65cc42..3cb10c74 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -350,8 +350,8 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) // Capture the TaskMessage for verification becasue we will create this message at RestartAsync. TaskMessage? capturedMessage = null; this.orchestrationClient - .Setup(x => x.CreateTaskOrchestrationAsync(It.IsAny())) - .Callback(msg => capturedMessage = msg) + .Setup(x => x.CreateTaskOrchestrationAsync(It.IsAny(), It.IsAny())) + .Callback((msg, _) => capturedMessage = msg) .Returns(Task.CompletedTask); string restartedInstanceId = await this.client.RestartAsync(originalInstanceId, restartWithNewInstanceId); @@ -367,7 +367,7 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) // Verify that CreateTaskOrchestrationAsync was called this.orchestrationClient.Verify( - x => x.CreateTaskOrchestrationAsync(It.IsAny()), + x => x.CreateTaskOrchestrationAsync(It.IsAny(), It.IsAny()), Times.Once); // Verify the captured message details @@ -534,7 +534,9 @@ async Task RunScheduleNewOrchestrationInstanceAsync( { // arrange this.orchestrationClient.Setup( - m => m.CreateTaskOrchestrationAsync(MatchStartExecutionMessage(name, input, options))) + m => m.CreateTaskOrchestrationAsync( + MatchStartExecutionMessage(name, input, options), + It.IsAny())) .Returns(Task.CompletedTask); // act @@ -542,7 +544,9 @@ async Task RunScheduleNewOrchestrationInstanceAsync( // assert this.orchestrationClient.Verify( - m => m.CreateTaskOrchestrationAsync(MatchStartExecutionMessage(name, input, options)), + m => m.CreateTaskOrchestrationAsync( + MatchStartExecutionMessage(name, input, options), + It.IsAny()), Times.Once()); if (options?.InstanceId is string str) From fe00211285ad6e7bc3e9472c64053f6217f57df1 Mon Sep 17 00:00:00 2001 From: wangbill Date: Mon, 8 Dec 2025 15:35:30 -0800 Subject: [PATCH 03/11] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 4 ++-- src/Client/Grpc/GrpcDurableTaskClient.cs | 7 ++----- .../GrpcDurableTaskClientIntegrationTests.cs | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index 504cf8ac..aadb5672 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -138,13 +138,13 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse /// /// Gets the orchestration runtime statuses that should be considered for deduplication. /// If an orchestration instance with the same instance ID already exists and is in one of these statuses, - /// the creation will throw an instead of creating a new instance. + /// the creation will throw an exception instead of creating a new instance. /// This enables idempotent orchestration creation. /// /// /// The status names should match the values from enum /// (e.g., "Completed", "Failed", "Terminated", "Canceled"). - /// For type-safe usage, use extension methods from . + /// For type-safe usage, use extension methods from . /// public IReadOnlyList? DedupeStatuses { get; init; } } diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index f0b06902..67429711 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -149,12 +149,9 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( // dedupeStatuses are statuses that should NOT be replaced (should throw exception) // So we add terminal statuses that are NOT in dedupeStatuses to replaceableStatus // This matches the logic in AAPT-DTMB ProtoUtils.Convert - foreach (OrchestrationRuntimeStatus terminalStatus in StartOrchestrationOptionsExtensions.ValidDedupeStatuses) + foreach (OrchestrationRuntimeStatus terminalStatus in StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Where(ts => !dedupeStatuses.Contains(ts))) { - if (!dedupeStatuses.Contains(terminalStatus)) - { - policy.ReplaceableStatus.Add(terminalStatus.ToGrpcStatus()); - } + policy.ReplaceableStatus.Add(terminalStatus.ToGrpcStatus()); } // Only set if we have replaceable statuses diff --git a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs index 8cdbda7b..e64865a9 100644 --- a/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs +++ b/test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs @@ -318,7 +318,7 @@ public async Task ScheduleNewOrchestrationInstance_WithDedupeStatuses_ThrowsWhen string instanceId = "dedup-test-instance"; - // Create first orchestration instance + // Schedule and complete first orchestration instance string firstInstanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync( OrchestrationName, input: false, From 9531d65a346b61914c0d0b264e24a82acd986969 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Mon, 8 Dec 2025 16:36:20 -0800 Subject: [PATCH 04/11] client side no status check --- src/Abstractions/TaskOptions.cs | 4 ---- src/Client/Core/StartOrchestrationOptionsExtensions.cs | 6 ++++-- src/Client/Grpc/GrpcDurableTaskClient.cs | 8 +++----- .../ShimDurableTaskClient.cs | 4 ++-- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index aadb5672..a579bfe9 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -137,13 +137,9 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse /// /// Gets the orchestration runtime statuses that should be considered for deduplication. - /// If an orchestration instance with the same instance ID already exists and is in one of these statuses, - /// the creation will throw an exception instead of creating a new instance. - /// This enables idempotent orchestration creation. /// /// /// The status names should match the values from enum - /// (e.g., "Completed", "Failed", "Terminated", "Canceled"). /// For type-safe usage, use extension methods from . /// public IReadOnlyList? DedupeStatuses { get; init; } diff --git a/src/Client/Core/StartOrchestrationOptionsExtensions.cs b/src/Client/Core/StartOrchestrationOptionsExtensions.cs index 38add4ae..eac1801a 100644 --- a/src/Client/Core/StartOrchestrationOptionsExtensions.cs +++ b/src/Client/Core/StartOrchestrationOptionsExtensions.cs @@ -9,8 +9,10 @@ namespace Microsoft.DurableTask.Client; public static class StartOrchestrationOptionsExtensions { /// - /// Gets the terminal orchestration runtime statuses that are valid for deduplication. - /// These are the statuses that can be used to prevent replacement of an existing orchestration instance. + /// Gets the terminal orchestration runtime statuses commonly used for deduplication. + /// These are typically the statuses used to prevent replacement of an existing orchestration instance. + /// Note: Any value can be used for deduplication; + /// this collection is provided for convenience and reference only. /// public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[] { diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 67429711..125354b3 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -134,9 +134,9 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( { if (!System.Enum.TryParse(s, ignoreCase: true, out OrchestrationRuntimeStatus status)) { - string validStatuses = string.Join(", ", StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Select(ts => ts.ToString())); + string validStatuses = string.Join(", ", System.Enum.GetNames(typeof(OrchestrationRuntimeStatus))); throw new ArgumentException( - $"Invalid orchestration runtime status for deduplication: '{s}'. Valid statuses for deduplication are: {validStatuses}", + $"Invalid orchestration runtime status: '{s}' for deduplication. Valid values are: {validStatuses}", nameof(options.DedupeStatuses)); } @@ -146,9 +146,7 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( P.OrchestrationIdReusePolicy policy = new(); // The policy uses "replaceableStatus" - these are statuses that CAN be replaced - // dedupeStatuses are statuses that should NOT be replaced (should throw exception) - // So we add terminal statuses that are NOT in dedupeStatuses to replaceableStatus - // This matches the logic in AAPT-DTMB ProtoUtils.Convert + // dedupeStatuses are statuses that should NOT be replaced foreach (OrchestrationRuntimeStatus terminalStatus in StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Where(ts => !dedupeStatuses.Contains(ts))) { policy.ReplaceableStatus.Add(terminalStatus.ToGrpcStatus()); diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index 8ffea529..b0553000 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -201,9 +201,9 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( { if (!Enum.TryParse(s, ignoreCase: true, out var status)) { - var validStatuses = string.Join(", ", StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Select(ts => ts.ToString())); + var validStatuses = string.Join(", ", Enum.GetNames(typeof(OrchestrationRuntimeStatus))); throw new ArgumentException( - $"Invalid orchestration runtime status for deduplication: '{s}'. Valid statuses for deduplication are: {validStatuses}", + $"Invalid orchestration runtime status: '{s}' for deduplication. Valid values are: {validStatuses}", nameof(options.DedupeStatuses)); } return status.ConvertToCore(); From 6182e724d4cf7921cc03e0d10b0b7c036a93a654 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 9 Dec 2025 09:15:24 -0800 Subject: [PATCH 05/11] more tests --- .../Abstractions.Tests.csproj | 1 + test/Abstractions.Tests/TaskOptionsTests.cs | 144 +++++++++++++++--- 2 files changed, 128 insertions(+), 17 deletions(-) diff --git a/test/Abstractions.Tests/Abstractions.Tests.csproj b/test/Abstractions.Tests/Abstractions.Tests.csproj index 9e3b1ad5..665f53f6 100644 --- a/test/Abstractions.Tests/Abstractions.Tests.csproj +++ b/test/Abstractions.Tests/Abstractions.Tests.csproj @@ -6,6 +6,7 @@ + diff --git a/test/Abstractions.Tests/TaskOptionsTests.cs b/test/Abstractions.Tests/TaskOptionsTests.cs index fe4a101e..3e805a9e 100644 --- a/test/Abstractions.Tests/TaskOptionsTests.cs +++ b/test/Abstractions.Tests/TaskOptionsTests.cs @@ -1,13 +1,15 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -namespace Microsoft.DurableTask.Tests; - -public class TaskOptionsTests -{ - [Fact] - public void Empty_Ctors_Okay() - { +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; + +namespace Microsoft.DurableTask.Tests; + +public class TaskOptionsTests +{ + [Fact] + public void Empty_Ctors_Okay() + { TaskOptions options = new(); options.Retry.Should().BeNull(); options.Tags.Should().BeNull(); @@ -21,21 +23,129 @@ public void Empty_Ctors_Okay() startOptions.Version.Should().BeNull(); startOptions.InstanceId.Should().BeNull(); startOptions.StartAt.Should().BeNull(); - startOptions.Tags.Should().BeEmpty(); + startOptions.Tags.Should().BeEmpty(); } - [Fact] - public void SubOrchestrationOptions_InstanceId_Correct() + [Fact] + public void SubOrchestrationOptions_InstanceId_Correct() { string instanceId = Guid.NewGuid().ToString(); SubOrchestrationOptions subOptions = new(new TaskOptions(), instanceId); instanceId.Equals(subOptions.InstanceId).Should().BeTrue(); string subInstanceId = Guid.NewGuid().ToString(); - subOptions = new(new SubOrchestrationOptions(instanceId: subInstanceId)); + subOptions = new(new SubOrchestrationOptions(instanceId: subInstanceId)); subInstanceId.Equals(subOptions.InstanceId).Should().BeTrue(); - subOptions = new(new SubOrchestrationOptions(instanceId: subInstanceId), instanceId); + subOptions = new(new SubOrchestrationOptions(instanceId: subInstanceId), instanceId); instanceId.Equals(subOptions.InstanceId).Should().BeTrue(); - } -} + } + + [Fact] + public void WithDedupeStatuses_SetsCorrectStringValues() + { + // Arrange + StartOrchestrationOptions options = new(); + OrchestrationRuntimeStatus[] statuses = new[] + { + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated, + }; + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses(statuses); + + // Assert + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().HaveCount(3); + result.DedupeStatuses.Should().Contain("Completed"); + result.DedupeStatuses.Should().Contain("Failed"); + result.DedupeStatuses.Should().Contain("Terminated"); + } + + [Fact] + public void WithDedupeStatuses_HandlesEmptyArray() + { + // Arrange + StartOrchestrationOptions options = new(); + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses(); + + // Assert + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().BeEmpty(); + } + + [Fact] + public void WithDedupeStatuses_HandlesEmptyArrayExplicit() + { + // Arrange + StartOrchestrationOptions options = new(); + OrchestrationRuntimeStatus[] statuses = Array.Empty(); + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses(statuses); + + // Assert + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().BeEmpty(); + } + + [Fact] + public void WithDedupeStatuses_PreservesOtherProperties() + { + // Arrange + string instanceId = Guid.NewGuid().ToString(); + DateTimeOffset startAt = DateTimeOffset.UtcNow.AddHours(1); + StartOrchestrationOptions options = new(instanceId, startAt); + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses( + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed); + + // Assert + result.InstanceId.Should().Be(instanceId); + result.StartAt.Should().Be(startAt); + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().HaveCount(2); + } + + [Fact] + public void ValidDedupeStatuses_ContainsExpectedTerminalStatuses() + { + // Act +#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility + OrchestrationRuntimeStatus[] validStatuses = StartOrchestrationOptionsExtensions.ValidDedupeStatuses; + + // Assert + validStatuses.Should().NotBeNull(); + validStatuses.Should().HaveCount(4); + validStatuses.Should().Contain(OrchestrationRuntimeStatus.Completed); + validStatuses.Should().Contain(OrchestrationRuntimeStatus.Failed); + validStatuses.Should().Contain(OrchestrationRuntimeStatus.Terminated); + validStatuses.Should().Contain(OrchestrationRuntimeStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void WithDedupeStatuses_ConvertsAllEnumValuesToStrings() + { + // Arrange + StartOrchestrationOptions options = new(); + OrchestrationRuntimeStatus[] allStatuses = Enum.GetValues(); + + // Act + StartOrchestrationOptions result = options.WithDedupeStatuses(allStatuses); + + // Assert + result.DedupeStatuses.Should().NotBeNull(); + result.DedupeStatuses.Should().HaveCount(allStatuses.Length); + foreach (OrchestrationRuntimeStatus status in allStatuses) + { + result.DedupeStatuses.Should().Contain(status.ToString()); + } + } +} From da2d895583075d29a722c57cefcc583f0269046a Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 9 Dec 2025 09:24:25 -0800 Subject: [PATCH 06/11] refactor --- src/Client/Grpc/GrpcDurableTaskClient.cs | 14 +-- src/Client/Grpc/ProtoUtils.cs | 87 ++++++++++++++++++- .../Sidecar/Grpc/TaskHubGrpcServer.cs | 26 ++---- 3 files changed, 96 insertions(+), 31 deletions(-) diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 125354b3..063f9cb8 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -143,17 +143,11 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( return status; }).ToImmutableHashSet(); - P.OrchestrationIdReusePolicy policy = new(); + // Convert dedupe statuses to protobuf statuses and create reuse policy + IEnumerable dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus()); + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto); - // The policy uses "replaceableStatus" - these are statuses that CAN be replaced - // dedupeStatuses are statuses that should NOT be replaced - foreach (OrchestrationRuntimeStatus terminalStatus in StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Where(ts => !dedupeStatuses.Contains(ts))) - { - policy.ReplaceableStatus.Add(terminalStatus.ToGrpcStatus()); - } - - // Only set if we have replaceable statuses - if (policy.ReplaceableStatus.Count > 0) + if (policy != null) { request.OrchestrationIdReusePolicy = policy; } diff --git a/src/Client/Grpc/ProtoUtils.cs b/src/Client/Grpc/ProtoUtils.cs index f5bc750d..ab1af588 100644 --- a/src/Client/Grpc/ProtoUtils.cs +++ b/src/Client/Grpc/ProtoUtils.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Immutable; using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.DurableTask.Client.Grpc; @@ -8,8 +9,92 @@ namespace Microsoft.DurableTask.Client.Grpc; /// /// Protobuf helpers and utilities. /// -static class ProtoUtils +public static class ProtoUtils { + /// + /// Gets the terminal orchestration statuses that are commonly used for deduplication. + /// These are the statuses that can be used in OrchestrationIdReusePolicy. + /// + /// An immutable array of terminal orchestration statuses. + public static ImmutableArray GetTerminalStatuses() + { +#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility + return ImmutableArray.Create( + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed, + P.OrchestrationStatus.Terminated, + P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + /// + /// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy + /// with replaceable statuses (statuses that CAN be replaced). + /// + /// The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists. + /// An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses. + /// + /// The policy uses "replaceableStatus" - these are statuses that CAN be replaced. + /// dedupeStatuses are statuses that should NOT be replaced. + /// So replaceableStatus = all terminal statuses MINUS dedupeStatuses. + /// + public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy( + IEnumerable dedupeStatuses) + { + ImmutableArray terminalStatuses = GetTerminalStatuses(); + ImmutableHashSet dedupeStatusSet = dedupeStatuses.ToImmutableHashSet(); + + P.OrchestrationIdReusePolicy policy = new(); + + // Add terminal statuses that are NOT in dedupeStatuses as replaceable + foreach (P.OrchestrationStatus terminalStatus in terminalStatuses) + { + if (!dedupeStatusSet.Contains(terminalStatus)) + { + policy.ReplaceableStatus.Add(terminalStatus); + } + } + + // Only return policy if we have replaceable statuses + return policy.ReplaceableStatus.Count > 0 ? policy : null; + } + + /// + /// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses + /// (statuses that should NOT be replaced). + /// + /// The OrchestrationIdReusePolicy containing replaceable statuses. + /// An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable. + /// + /// The policy uses "replaceableStatus" - these are statuses that CAN be replaced. + /// dedupeStatuses are statuses that should NOT be replaced (should throw exception). + /// So dedupeStatuses = all terminal statuses MINUS replaceableStatus. + /// + public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses( + P.OrchestrationIdReusePolicy? policy) + { + if (policy == null || policy.ReplaceableStatus.Count == 0) + { + return null; + } + + ImmutableArray terminalStatuses = GetTerminalStatuses(); + ImmutableHashSet replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet(); + + // Calculate dedupe statuses = terminal statuses - replaceable statuses + List dedupeStatuses = new(); + foreach (P.OrchestrationStatus terminalStatus in terminalStatuses) + { + if (!replaceableStatusSet.Contains(terminalStatus)) + { + dedupeStatuses.Add(terminalStatus); + } + } + + // Only return if there are dedupe statuses + return dedupeStatuses.Count > 0 ? dedupeStatuses.ToArray() : null; + } + #pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it. /// /// Converts to . diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index 0575b8d6..bbb162c3 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -4,12 +4,14 @@ using System.Collections.Concurrent; using System.Diagnostics; using System.Globalization; +using System.Linq; using DurableTask.Core; using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Query; using Google.Protobuf.WellKnownTypes; using Grpc.Core; +using Microsoft.DurableTask.Client.Grpc; using Microsoft.DurableTask.Testing.Sidecar.Dispatcher; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -208,27 +210,11 @@ async Task WaitForWorkItemClientConnection() // dedupeStatuses are statuses that should NOT be replaced (should throw exception) // So dedupeStatuses = all terminal statuses MINUS replaceableStatus OrchestrationStatus[]? dedupeStatuses = null; - if (request.OrchestrationIdReusePolicy != null && request.OrchestrationIdReusePolicy.ReplaceableStatus.Count > 0) + P.OrchestrationStatus[]? dedupeStatusesProto = ProtoUtils.ConvertReusePolicyToDedupeStatuses(request.OrchestrationIdReusePolicy); + if (dedupeStatusesProto != null) { - var terminalStatuses = new HashSet - { - OrchestrationStatus.Completed, - OrchestrationStatus.Failed, - OrchestrationStatus.Terminated, - OrchestrationStatus.Canceled, - }; - - // Remove replaceable statuses from terminal statuses to get dedupe statuses - foreach (P.OrchestrationStatus replaceableStatus in request.OrchestrationIdReusePolicy.ReplaceableStatus) - { - terminalStatuses.Remove((OrchestrationStatus)replaceableStatus); - } - - // Only set dedupeStatuses if there are any statuses that should not be replaced - if (terminalStatuses.Count > 0) - { - dedupeStatuses = terminalStatuses.ToArray(); - } + // Convert protobuf statuses to Core.OrchestrationStatus + dedupeStatuses = dedupeStatusesProto.Select(s => (OrchestrationStatus)s).ToArray(); } await this.client.CreateTaskOrchestrationAsync( From beb06f608f32bacf1264681e633a1c0aaef5e810 Mon Sep 17 00:00:00 2001 From: wangbill Date: Tue, 9 Dec 2025 09:57:06 -0800 Subject: [PATCH 07/11] Potential fix for pull request finding 'Missed opportunity to use Where' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- src/Client/Grpc/ProtoUtils.cs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Client/Grpc/ProtoUtils.cs b/src/Client/Grpc/ProtoUtils.cs index ab1af588..423b60b6 100644 --- a/src/Client/Grpc/ProtoUtils.cs +++ b/src/Client/Grpc/ProtoUtils.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.Collections.Immutable; +using System.Linq; using P = Microsoft.DurableTask.Protobuf; namespace Microsoft.DurableTask.Client.Grpc; @@ -47,12 +48,9 @@ public static class ProtoUtils P.OrchestrationIdReusePolicy policy = new(); // Add terminal statuses that are NOT in dedupeStatuses as replaceable - foreach (P.OrchestrationStatus terminalStatus in terminalStatuses) + foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status))) { - if (!dedupeStatusSet.Contains(terminalStatus)) - { - policy.ReplaceableStatus.Add(terminalStatus); - } + policy.ReplaceableStatus.Add(terminalStatus); } // Only return policy if we have replaceable statuses From 8bc9eb9df4bd2d783a9c4f4d7736ba176b9e5b71 Mon Sep 17 00:00:00 2001 From: wangbill Date: Tue, 9 Dec 2025 09:57:58 -0800 Subject: [PATCH 08/11] Update test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../ShimDurableTaskClientTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index 3cb10c74..6e67fe68 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -347,7 +347,7 @@ public async Task RestartAsync_EndToEnd(bool restartWithNewInstanceId) .Setup(x => x.GetOrchestrationStateAsync(originalInstanceId, false)) .ReturnsAsync(new List { originalState }); - // Capture the TaskMessage for verification becasue we will create this message at RestartAsync. + // Capture the TaskMessage for verification because we will create this message at RestartAsync. TaskMessage? capturedMessage = null; this.orchestrationClient .Setup(x => x.CreateTaskOrchestrationAsync(It.IsAny(), It.IsAny())) From 29aab391d9260f30f38e7b58689b8c0fd8db27db Mon Sep 17 00:00:00 2001 From: wangbill Date: Tue, 9 Dec 2025 09:58:29 -0800 Subject: [PATCH 09/11] Update src/Client/Grpc/ProtoUtils.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Client/Grpc/ProtoUtils.cs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/Client/Grpc/ProtoUtils.cs b/src/Client/Grpc/ProtoUtils.cs index 423b60b6..8174242f 100644 --- a/src/Client/Grpc/ProtoUtils.cs +++ b/src/Client/Grpc/ProtoUtils.cs @@ -80,17 +80,12 @@ public static class ProtoUtils ImmutableHashSet replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet(); // Calculate dedupe statuses = terminal statuses - replaceable statuses - List dedupeStatuses = new(); - foreach (P.OrchestrationStatus terminalStatus in terminalStatuses) - { - if (!replaceableStatusSet.Contains(terminalStatus)) - { - dedupeStatuses.Add(terminalStatus); - } - } + P.OrchestrationStatus[] dedupeStatuses = terminalStatuses + .Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus)) + .ToArray(); // Only return if there are dedupe statuses - return dedupeStatuses.Count > 0 ? dedupeStatuses.ToArray() : null; + return dedupeStatuses.Length > 0 ? dedupeStatuses : null; } #pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it. From 4dd9648e3a554b888d7b239328bc6d82f08871d1 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 9 Dec 2025 12:10:55 -0800 Subject: [PATCH 10/11] save --- src/Abstractions/TaskOptions.cs | 3 +-- src/Client/Core/StartOrchestrationOptionsExtensions.cs | 8 ++------ src/Client/Grpc/GrpcDurableTaskClient.cs | 4 +--- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index a579bfe9..c38144b6 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -139,8 +139,7 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse /// Gets the orchestration runtime statuses that should be considered for deduplication. /// /// - /// The status names should match the values from enum - /// For type-safe usage, use extension methods from . + /// For type-safe usage, use the WithDedupeStatuses extension method. /// public IReadOnlyList? DedupeStatuses { get; init; } } diff --git a/src/Client/Core/StartOrchestrationOptionsExtensions.cs b/src/Client/Core/StartOrchestrationOptionsExtensions.cs index eac1801a..4bfac52f 100644 --- a/src/Client/Core/StartOrchestrationOptionsExtensions.cs +++ b/src/Client/Core/StartOrchestrationOptionsExtensions.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Linq; + namespace Microsoft.DurableTask.Client; /// @@ -8,12 +10,6 @@ namespace Microsoft.DurableTask.Client; /// public static class StartOrchestrationOptionsExtensions { - /// - /// Gets the terminal orchestration runtime statuses commonly used for deduplication. - /// These are typically the statuses used to prevent replacement of an existing orchestration instance. - /// Note: Any value can be used for deduplication; - /// this collection is provided for convenience and reference only. - /// public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[] { OrchestrationRuntimeStatus.Completed, diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 063f9cb8..b6a6c72d 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -134,10 +134,8 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( { if (!System.Enum.TryParse(s, ignoreCase: true, out OrchestrationRuntimeStatus status)) { - string validStatuses = string.Join(", ", System.Enum.GetNames(typeof(OrchestrationRuntimeStatus))); throw new ArgumentException( - $"Invalid orchestration runtime status: '{s}' for deduplication. Valid values are: {validStatuses}", - nameof(options.DedupeStatuses)); + $"Invalid orchestration runtime status: '{s}' for deduplication."); } return status; From 9cfbb41019baa237e15f0bca878f6e75a6add636 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Tue, 9 Dec 2025 12:43:36 -0800 Subject: [PATCH 11/11] feedback --- src/Client/Grpc/ProtoUtils.cs | 4 +- .../ShimDurableTaskClient.cs | 4 +- .../Grpc.Tests/GrpcDurableTaskClientTests.cs | 131 +++++ test/Client/Grpc.Tests/ProtoUtilsTests.cs | 463 ++++++++++++++++++ .../ShimDurableTaskClientTests.cs | 189 +++++++ 5 files changed, 786 insertions(+), 5 deletions(-) create mode 100644 test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs create mode 100644 test/Client/Grpc.Tests/ProtoUtilsTests.cs diff --git a/src/Client/Grpc/ProtoUtils.cs b/src/Client/Grpc/ProtoUtils.cs index 8174242f..f307f43f 100644 --- a/src/Client/Grpc/ProtoUtils.cs +++ b/src/Client/Grpc/ProtoUtils.cs @@ -40,10 +40,10 @@ public static class ProtoUtils /// So replaceableStatus = all terminal statuses MINUS dedupeStatuses. /// public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy( - IEnumerable dedupeStatuses) + IEnumerable? dedupeStatuses) { ImmutableArray terminalStatuses = GetTerminalStatuses(); - ImmutableHashSet dedupeStatusSet = dedupeStatuses.ToImmutableHashSet(); + ImmutableHashSet dedupeStatusSet = dedupeStatuses?.ToImmutableHashSet() ?? ImmutableHashSet.Empty; P.OrchestrationIdReusePolicy policy = new(); diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index b0553000..f6b6140f 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -201,10 +201,8 @@ public override async Task ScheduleNewOrchestrationInstanceAsync( { if (!Enum.TryParse(s, ignoreCase: true, out var status)) { - var validStatuses = string.Join(", ", Enum.GetNames(typeof(OrchestrationRuntimeStatus))); throw new ArgumentException( - $"Invalid orchestration runtime status: '{s}' for deduplication. Valid values are: {validStatuses}", - nameof(options.DedupeStatuses)); + $"Invalid orchestration runtime status: '{s}' for deduplication."); } return status.ConvertToCore(); }) diff --git a/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs b/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs new file mode 100644 index 00000000..8d098106 --- /dev/null +++ b/test/Client/Grpc.Tests/GrpcDurableTaskClientTests.cs @@ -0,0 +1,131 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Grpc.Core; +using Microsoft.DurableTask.Client; +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask.Client.Grpc.Tests; + +public class GrpcDurableTaskClientTests +{ + readonly Mock loggerMock = new(); + + GrpcDurableTaskClient CreateClient() + { + var callInvoker = Mock.Of(); + var options = new GrpcDurableTaskClientOptions + { + CallInvoker = callInvoker, + }; + + return new GrpcDurableTaskClient("test", options, this.loggerMock.Object); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_InvalidDedupeStatus_ThrowsArgumentException() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "InvalidStatus", "AnotherInvalidStatus" }, + }; + + // Act & Assert + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Invalid orchestration runtime status: 'InvalidStatus' for deduplication."); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_InvalidDedupeStatus_ContainsInvalidStatusInMessage() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "NonExistentStatus" }, + }; + + // Act & Assert + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("'NonExistentStatus'"); + exception.Which.Message.Should().Contain("for deduplication"); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_MixedValidAndInvalidStatus_ThrowsArgumentException() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "Completed", "InvalidStatus", "Failed" }, + }; + + // Act & Assert + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Invalid orchestration runtime status: 'InvalidStatus' for deduplication."); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_CaseInsensitiveValidStatus_DoesNotThrowArgumentException() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "completed", "FAILED", "Terminated" }, + }; + + // Act & Assert - Case-insensitive parsing should work, so no ArgumentException should be thrown + // The call will fail at the gRPC level, but validation should pass + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + // Should not throw ArgumentException for invalid status (case-insensitive parsing works) + // It may throw other exceptions due to gRPC call failure, but not ArgumentException + var exception = await act.Should().ThrowAsync(); + exception.Which.Should().NotBeOfType(); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstanceAsync_ValidDedupeStatus_DoesNotThrowArgumentException() + { + // Arrange + var client = this.CreateClient(); + var startOptions = new StartOrchestrationOptions + { + DedupeStatuses = new[] { "Completed", "Failed" }, + }; + + // Act & Assert - Valid statuses should pass validation + // The call will fail at the gRPC level, but validation should pass + Func act = async () => await client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + startOptions); + + // Should not throw ArgumentException for invalid status since "Completed" and "Failed" are valid + var exception = await act.Should().ThrowAsync(); + exception.Which.Should().NotBeOfType(); + } +} + diff --git a/test/Client/Grpc.Tests/ProtoUtilsTests.cs b/test/Client/Grpc.Tests/ProtoUtilsTests.cs new file mode 100644 index 00000000..4db7a884 --- /dev/null +++ b/test/Client/Grpc.Tests/ProtoUtilsTests.cs @@ -0,0 +1,463 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Immutable; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Client.Grpc.Tests; + +public class ProtoUtilsTests +{ + [Fact] + public void GetTerminalStatuses_ReturnsExpectedStatuses() + { + // Act + ImmutableArray terminalStatuses = ProtoUtils.GetTerminalStatuses(); + + // Assert + terminalStatuses.Should().HaveCount(4); + terminalStatuses.Should().Contain(P.OrchestrationStatus.Completed); + terminalStatuses.Should().Contain(P.OrchestrationStatus.Failed); + terminalStatuses.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + terminalStatuses.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void GetTerminalStatuses_ReturnsImmutableArray() + { + // Act + ImmutableArray terminalStatuses = ProtoUtils.GetTerminalStatuses(); + + // Assert + terminalStatuses.IsDefault.Should().BeFalse(); + terminalStatuses.IsEmpty.Should().BeFalse(); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_EmptyArray_ReturnsPolicyWithAllTerminalStatuses() + { + // Arrange + var dedupeStatuses = Array.Empty(); + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + // Empty array means no dedupe statuses, so all terminal statuses are replaceable + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(4); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_AllTerminalStatuses_ReturnsNull() + { + // Arrange + ImmutableArray allTerminalStatuses = ProtoUtils.GetTerminalStatuses(); + var dedupeStatuses = allTerminalStatuses.ToArray(); + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().BeNull(); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_NoDedupeStatuses_ReturnsPolicyWithAllTerminalStatuses() + { + // Arrange + var dedupeStatuses = Array.Empty(); + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + // When no dedupe statuses, all terminal statuses should be replaceable + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(4); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Completed); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Failed); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_SingleDedupeStatus_ReturnsPolicyWithRemainingStatuses() + { + // Arrange + var dedupeStatuses = new[] { P.OrchestrationStatus.Completed }; + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(3); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Failed); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.ReplaceableStatus.Should().NotContain(P.OrchestrationStatus.Completed); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_MultipleDedupeStatuses_ReturnsPolicyWithRemainingStatuses() + { + // Arrange + var dedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed + }; + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(2); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.ReplaceableStatus.Should().NotContain(P.OrchestrationStatus.Completed); + result.ReplaceableStatus.Should().NotContain(P.OrchestrationStatus.Failed); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_DuplicateDedupeStatuses_HandlesDuplicates() + { + // Arrange + var dedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed + }; + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(2); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_NonTerminalStatus_IgnoresNonTerminalStatus() + { + // Arrange + var dedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Running, // Non-terminal status + P.OrchestrationStatus.Pending // Non-terminal status + }; + + // Act + P.OrchestrationIdReusePolicy? result = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + result.Should().NotBeNull(); + result!.ReplaceableStatus.Should().HaveCount(3); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Failed); + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.ReplaceableStatus.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.ReplaceableStatus.Should().NotContain(P.OrchestrationStatus.Completed); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_NullPolicy_ReturnsNull() + { + // Arrange + P.OrchestrationIdReusePolicy? policy = null; + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_EmptyPolicy_ReturnsNull() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_AllTerminalStatusesReplaceable_ReturnsNull() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + ImmutableArray terminalStatuses = ProtoUtils.GetTerminalStatuses(); + foreach (var status in terminalStatuses) + { + policy.ReplaceableStatus.Add(status); + } + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_SingleReplaceableStatus_ReturnsRemainingStatuses() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().NotBeNull(); + result!.Should().HaveCount(3); + result.Should().Contain(P.OrchestrationStatus.Failed); + result.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.Should().NotContain(P.OrchestrationStatus.Completed); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_MultipleReplaceableStatuses_ReturnsRemainingStatuses() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Failed); + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().NotBeNull(); + result!.Should().HaveCount(2); + result.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.Should().NotContain(P.OrchestrationStatus.Completed); + result.Should().NotContain(P.OrchestrationStatus.Failed); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_NonTerminalStatusInPolicy_IgnoresNonTerminalStatus() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Running); // Non-terminal status + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Pending); // Non-terminal status + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().NotBeNull(); + result!.Should().HaveCount(3); + result.Should().Contain(P.OrchestrationStatus.Failed); + result.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + result.Should().NotContain(P.OrchestrationStatus.Completed); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_DuplicateReplaceableStatuses_HandlesDuplicates() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); // Duplicate + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Failed); + + // Act + P.OrchestrationStatus[]? result = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + result.Should().NotBeNull(); + result!.Should().HaveCount(2); + result.Should().Contain(P.OrchestrationStatus.Terminated); +#pragma warning disable CS0618 // Type or member is obsolete + result.Should().Contain(P.OrchestrationStatus.Canceled); +#pragma warning restore CS0618 + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_ThenConvertBack_ReturnsOriginalDedupeStatuses() + { + // Arrange + var originalDedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed + }; + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(originalDedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + convertedBack.Should().NotBeNull(); + convertedBack!.Should().BeEquivalentTo(originalDedupeStatuses); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_ThenConvertBack_ReturnsOriginalPolicy() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Completed); + policy.ReplaceableStatus.Add(P.OrchestrationStatus.Failed); + + // Act + P.OrchestrationStatus[]? dedupeStatuses = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + P.OrchestrationIdReusePolicy? convertedBack = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + convertedBack.Should().NotBeNull(); + convertedBack!.ReplaceableStatus.Should().BeEquivalentTo(policy.ReplaceableStatus); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_AllStatuses_ThenConvertBack_ReturnsNull() + { + // Arrange + ImmutableArray allTerminalStatuses = ProtoUtils.GetTerminalStatuses(); + var dedupeStatuses = allTerminalStatuses.ToArray(); + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + policy.Should().BeNull(); + convertedBack.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_AllStatuses_ThenConvertBack_ReturnsPolicyWithAllStatuses() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + ImmutableArray terminalStatuses = ProtoUtils.GetTerminalStatuses(); + foreach (var status in terminalStatuses) + { + policy.ReplaceableStatus.Add(status); + } + + // Act + P.OrchestrationStatus[]? dedupeStatuses = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + P.OrchestrationIdReusePolicy? convertedBack = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + // Policy with all statuses -> no dedupe statuses -> null + // null dedupe statuses -> all are replaceable -> policy with all statuses + dedupeStatuses.Should().BeNull(); + convertedBack.Should().NotBeNull(); + convertedBack!.ReplaceableStatus.Should().HaveCount(4); + convertedBack.ReplaceableStatus.Should().BeEquivalentTo(policy.ReplaceableStatus); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_EmptyArray_ThenConvertBack_ReturnsNull() + { + // Arrange + var dedupeStatuses = Array.Empty(); + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + // Empty dedupe statuses -> all terminal statuses are replaceable -> policy with all statuses + // Policy with all statuses -> no dedupe statuses -> null + policy.Should().NotBeNull(); + convertedBack.Should().BeNull(); + } + + [Fact] + public void ConvertReusePolicyToDedupeStatuses_EmptyPolicy_ThenConvertBack_ReturnsPolicyWithAllStatuses() + { + // Arrange + var policy = new P.OrchestrationIdReusePolicy(); + + // Act + P.OrchestrationStatus[]? dedupeStatuses = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + P.OrchestrationIdReusePolicy? convertedBack = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + + // Assert + // Empty policy (no replaceable statuses) -> ConvertReusePolicyToDedupeStatuses returns null + // null dedupe statuses -> all terminal statuses are replaceable -> policy with all statuses + dedupeStatuses.Should().BeNull(); + convertedBack.Should().NotBeNull(); + convertedBack!.ReplaceableStatus.Should().HaveCount(4); + } + + [Theory] + [InlineData(P.OrchestrationStatus.Completed)] + [InlineData(P.OrchestrationStatus.Failed)] + [InlineData(P.OrchestrationStatus.Terminated)] + public void ConvertDedupeStatusesToReusePolicy_SingleStatus_ThenConvertBack_ReturnsOriginal( + P.OrchestrationStatus dedupeStatus) + { + // Arrange + var dedupeStatuses = new[] { dedupeStatus }; + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + convertedBack.Should().NotBeNull(); + convertedBack!.Should().ContainSingle(); + convertedBack.Should().Contain(dedupeStatus); + } + + [Fact] + public void ConvertDedupeStatusesToReusePolicy_ThreeOutOfFourStatuses_ThenConvertBack_ReturnsOriginal() + { + // Arrange + var dedupeStatuses = new[] + { + P.OrchestrationStatus.Completed, + P.OrchestrationStatus.Failed, + P.OrchestrationStatus.Terminated + }; + + // Act + P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatuses); + P.OrchestrationStatus[]? convertedBack = ProtoUtils.ConvertReusePolicyToDedupeStatuses(policy); + + // Assert + convertedBack.Should().NotBeNull(); + convertedBack!.Should().HaveCount(3); + convertedBack.Should().BeEquivalentTo(dedupeStatuses); + } +} + diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index 6e67fe68..9509d1e0 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -326,6 +326,195 @@ public async Task ScheduleNewOrchestrationInstance_IdProvided_TagsProvided() await this.RunScheduleNewOrchestrationInstanceAsync("test", "input", options); } + [Fact] + public async Task ScheduleNewOrchestrationInstance_InvalidDedupeStatus_ThrowsArgumentException() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "InvalidStatus" }, + }; + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Invalid orchestration runtime status: 'InvalidStatus' for deduplication."); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_InvalidDedupeStatus_ContainsInvalidStatusInMessage() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "NonExistentStatus" }, + }; + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("'NonExistentStatus'"); + exception.Which.Message.Should().Contain("for deduplication"); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_MultipleInvalidDedupeStatuses_ThrowsOnFirstInvalid() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "InvalidStatus1", "InvalidStatus2", "InvalidStatus3" }, + }; + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("'InvalidStatus1'"); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_MixedValidAndInvalidStatus_ThrowsArgumentException() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "Completed", "InvalidStatus", "Failed" }, + }; + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + var exception = await act.Should().ThrowAsync(); + exception.Which.Message.Should().Contain("Invalid orchestration runtime status: 'InvalidStatus' for deduplication."); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_ValidDedupeStatuses_DoesNotThrow() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "Completed", "Failed" }, + }; + + // Setup the mock to handle the call + this.orchestrationClient.Setup( + m => m.CreateTaskOrchestrationAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act & Assert - Should not throw ArgumentException for invalid status + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + await act.Should().NotThrowAsync(); + this.orchestrationClient.VerifyAll(); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_CaseInsensitiveValidStatus_DoesNotThrow() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new[] { "completed", "FAILED", "Terminated" }, + }; + + // Setup the mock to handle the call + this.orchestrationClient.Setup( + m => m.CreateTaskOrchestrationAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act & Assert - Case-insensitive parsing should work + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + await act.Should().NotThrowAsync(); + this.orchestrationClient.VerifyAll(); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_EmptyDedupeStatuses_DoesNotThrow() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = new List(), + }; + + // Setup the mock to handle the call + this.orchestrationClient.Setup( + m => m.CreateTaskOrchestrationAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + await act.Should().NotThrowAsync(); + this.orchestrationClient.VerifyAll(); + } + + [Fact] + public async Task ScheduleNewOrchestrationInstance_NullDedupeStatuses_DoesNotThrow() + { + // Arrange + StartOrchestrationOptions options = new() + { + DedupeStatuses = null, + }; + + // Setup the mock to handle the call + this.orchestrationClient.Setup( + m => m.CreateTaskOrchestrationAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act & Assert + Func act = async () => await this.client.ScheduleNewOrchestrationInstanceAsync( + new TaskName("TestOrchestration"), + input: null, + options, + default); + + await act.Should().NotThrowAsync(); + this.orchestrationClient.VerifyAll(); + } + [Theory] [InlineData(false)] [InlineData(true)]