Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/Abstractions/TaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,13 @@
/// Gets the version to associate with the orchestration instance.
/// </summary>
public TaskVersion? Version { get; init; }

/// <summary>
/// Gets the orchestration runtime statuses that should be considered for deduplication.
/// </summary>
/// <remarks>
/// The status names should match the values from <see cref="Microsoft.DurableTask.Client.OrchestrationRuntimeStatus"/> enum

Check warning on line 142 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

XML comment has cref attribute 'OrchestrationRuntimeStatus' that could not be resolved

Check warning on line 142 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'OrchestrationRuntimeStatus' that could not be resolved
/// For type-safe usage, use extension methods from <see cref="StartOrchestrationOptionsExtensions"/>.

Check warning on line 143 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

XML comment has cref attribute 'StartOrchestrationOptionsExtensions' that could not be resolved

Check warning on line 143 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'StartOrchestrationOptionsExtensions' that could not be resolved
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
/// </remarks>
public IReadOnlyList<string>? DedupeStatuses { get; init; }
}
40 changes: 40 additions & 0 deletions src/Client/Core/StartOrchestrationOptionsExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

Comment thread
YunchuWang marked this conversation as resolved.
namespace Microsoft.DurableTask.Client;

/// <summary>
/// Extension methods for <see cref="StartOrchestrationOptions"/> to provide type-safe deduplication status configuration.
/// </summary>
public static class StartOrchestrationOptionsExtensions
{
/// <summary>
/// Gets the terminal orchestration runtime statuses commonly used for deduplication.
/// These are typically the statuses used to prevent replacement of an existing orchestration instance.
/// Note: Any <see cref="OrchestrationRuntimeStatus"/> value can be used for deduplication;
/// this collection is provided for convenience and reference only.
/// </summary>
public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[]
Comment thread
YunchuWang marked this conversation as resolved.
{
OrchestrationRuntimeStatus.Completed,
OrchestrationRuntimeStatus.Failed,
OrchestrationRuntimeStatus.Terminated,
OrchestrationRuntimeStatus.Canceled,

Check warning on line 22 in src/Client/Core/StartOrchestrationOptionsExtensions.cs

View workflow job for this annotation

GitHub Actions / build

'OrchestrationRuntimeStatus.Canceled' is obsolete: 'The Canceled status is not currently used and exists only for compatibility reasons.'
};
Comment thread
YunchuWang marked this conversation as resolved.

/// <summary>
/// Creates a new <see cref="StartOrchestrationOptions"/> with the specified deduplication statuses.
/// </summary>
/// <param name="options">The base options to extend.</param>
/// <param name="dedupeStatuses">The orchestration runtime statuses that should be considered for deduplication.</param>
/// <returns>A new <see cref="StartOrchestrationOptions"/> instance with the deduplication statuses set.</returns>
public static StartOrchestrationOptions WithDedupeStatuses(
this StartOrchestrationOptions options,
params OrchestrationRuntimeStatus[] dedupeStatuses)
{
return options with
{
DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(),
};
}
Comment thread
YunchuWang marked this conversation as resolved.
}
Comment thread
YunchuWang marked this conversation as resolved.
39 changes: 35 additions & 4 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Immutable;
using System.Diagnostics;
using System.Text;
using DurableTask.Core.History;
Expand Down Expand Up @@ -91,7 +92,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
version = this.options.DefaultVersion;
}

var request = new P.CreateInstanceRequest
P.CreateInstanceRequest request = new()
{
Name = orchestratorName.Name,
Version = version,
Expand Down Expand Up @@ -122,6 +123,36 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
}

// Set orchestration ID reuse policy for deduplication support
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
// Parse and validate all status strings to enum first
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
.Select(s =>
{
if (!System.Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out OrchestrationRuntimeStatus status))
{
string validStatuses = string.Join(", ", System.Enum.GetNames(typeof(OrchestrationRuntimeStatus)));
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
throw new ArgumentException(
$"Invalid orchestration runtime status: '{s}' for deduplication. Valid values are: {validStatuses}",
nameof(options.DedupeStatuses));
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
}
Comment thread
YunchuWang marked this conversation as resolved.
Comment thread
YunchuWang marked this conversation as resolved.

return status;
}).ToImmutableHashSet();

Comment thread
YunchuWang marked this conversation as resolved.
// Convert dedupe statuses to protobuf statuses and create reuse policy
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);

if (policy != null)
{
request.OrchestrationIdReusePolicy = policy;
}
}

using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request);

P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
Expand Down Expand Up @@ -405,7 +436,7 @@ public override async Task<string> RestartAsync(
Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

var request = new P.RestartInstanceRequest
P.RestartInstanceRequest request = new P.RestartInstanceRequest
{
InstanceId = instanceId,
RestartWithNewInstanceId = restartWithNewInstanceId,
Expand Down Expand Up @@ -441,7 +472,7 @@ public override async Task RewindInstanceAsync(
Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

var request = new P.RewindInstanceRequest
P.RewindInstanceRequest request = new P.RewindInstanceRequest
{
InstanceId = instanceId,
Reason = reason,
Expand Down Expand Up @@ -573,7 +604,7 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(

OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
{
var metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
OrchestrationMetadata metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
{
CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(),
LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(),
Expand Down
87 changes: 86 additions & 1 deletion src/Client/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,100 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Immutable;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.DurableTask.Client.Grpc;

/// <summary>
/// Protobuf helpers and utilities.
/// </summary>
static class ProtoUtils
public static class ProtoUtils
Comment thread
YunchuWang marked this conversation as resolved.
{
/// <summary>
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
/// </summary>
/// <returns>An immutable array of terminal orchestration statuses.</returns>
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
{
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
return ImmutableArray.Create(
P.OrchestrationStatus.Completed,
P.OrchestrationStatus.Failed,
P.OrchestrationStatus.Terminated,
P.OrchestrationStatus.Canceled);
#pragma warning restore CS0618
}

/// <summary>
/// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy
/// with replaceable statuses (statuses that CAN be replaced).
/// </summary>
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced.
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
/// </remarks>
Comment thread
YunchuWang marked this conversation as resolved.
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
IEnumerable<P.OrchestrationStatus> dedupeStatuses)
{
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses.ToImmutableHashSet();

P.OrchestrationIdReusePolicy policy = new();

// Add terminal statuses that are NOT in dedupeStatuses as replaceable
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses)
{
if (!dedupeStatusSet.Contains(terminalStatus))
{
policy.ReplaceableStatus.Add(terminalStatus);
}
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
}
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed

// Only return policy if we have replaceable statuses
return policy.ReplaceableStatus.Count > 0 ? policy : null;
}

/// <summary>
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
/// (statuses that should NOT be replaced).
/// </summary>
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
/// </remarks>
Comment thread
YunchuWang marked this conversation as resolved.
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
P.OrchestrationIdReusePolicy? policy)
{
if (policy == null || policy.ReplaceableStatus.Count == 0)
{
return null;
}

ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();

// Calculate dedupe statuses = terminal statuses - replaceable statuses
List<P.OrchestrationStatus> dedupeStatuses = new();
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses)
{
if (!replaceableStatusSet.Contains(terminalStatus))
{
dedupeStatuses.Add(terminalStatus);
}
}
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed

// Only return if there are dedupe statuses
return dedupeStatuses.Count > 0 ? dedupeStatuses.ToArray() : null;
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
}
Comment thread
YunchuWang marked this conversation as resolved.

Comment thread
YunchuWang marked this conversation as resolved.
#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
/// <summary>
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.
Expand Down
23 changes: 21 additions & 2 deletions src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using DurableTask.Core;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -192,7 +193,25 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
},
};

await this.Client.CreateTaskOrchestrationAsync(message);
Core.OrchestrationStatus[]? dedupeStatuses = null;
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
dedupeStatuses = options.DedupeStatuses
.Select(s =>
{
if (!Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out var status))
{
var validStatuses = string.Join(", ", Enum.GetNames(typeof(OrchestrationRuntimeStatus)));
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
throw new ArgumentException(
$"Invalid orchestration runtime status: '{s}' for deduplication. Valid values are: {validStatuses}",
nameof(options.DedupeStatuses));
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
}
Comment thread
YunchuWang marked this conversation as resolved.
Comment thread
YunchuWang marked this conversation as resolved.
return status.ConvertToCore();
})
.ToArray();
}

await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses);
return instanceId;
}

Expand Down Expand Up @@ -303,7 +322,7 @@ public override async Task<string> RestartAsync(
},
};

await this.Client.CreateTaskOrchestrationAsync(message);
await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses: null);
return newInstanceId;
}

Expand Down
26 changes: 24 additions & 2 deletions src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.DurableTask.Testing.Sidecar.Dispatcher;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -202,6 +205,18 @@ async Task WaitForWorkItemClientConnection()

try
{
// Convert OrchestrationIdReusePolicy to dedupeStatuses
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
// So dedupeStatuses = all terminal statuses MINUS replaceableStatus
OrchestrationStatus[]? dedupeStatuses = null;
P.OrchestrationStatus[]? dedupeStatusesProto = ProtoUtils.ConvertReusePolicyToDedupeStatuses(request.OrchestrationIdReusePolicy);
if (dedupeStatusesProto != null)
{
// Convert protobuf statuses to Core.OrchestrationStatus
dedupeStatuses = dedupeStatusesProto.Select(s => (OrchestrationStatus)s).ToArray();
}
Comment thread
YunchuWang marked this conversation as resolved.

await this.client.CreateTaskOrchestrationAsync(
new TaskMessage
{
Expand All @@ -216,7 +231,14 @@ await this.client.CreateTaskOrchestrationAsync(
: null
},
OrchestrationInstance = instance,
});
},
dedupeStatuses);
}
catch (OrchestrationAlreadyExistsException e)
Comment thread
YunchuWang marked this conversation as resolved.
{
// Convert to gRPC exception
this.log.LogWarning(e, "Orchestration with ID {InstanceId} already exists", instance.InstanceId);
throw new RpcException(new Status(StatusCode.AlreadyExists, e.Message));
}
catch (Exception e)
{
Expand Down
1 change: 1 addition & 0 deletions test/Abstractions.Tests/Abstractions.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

<ItemGroup>
<ProjectReference Include="$(SrcRoot)Abstractions/Abstractions.csproj" />
<ProjectReference Include="$(SrcRoot)Client/Core/Client.csproj" />
</ItemGroup>

</Project>
Loading
Loading