Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Abstractions/TaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,12 @@ public record StartOrchestrationOptions(string? InstanceId = null, DateTimeOffse
/// Gets the version to associate with the orchestration instance.
/// </summary>
public TaskVersion? Version { get; init; }

/// <summary>
/// Gets the orchestration runtime statuses that should be considered for deduplication.
/// </summary>
/// <remarks>
/// For type-safe usage, use the WithDedupeStatuses extension method.
/// </remarks>
public IReadOnlyList<string>? DedupeStatuses { get; init; }
}
36 changes: 36 additions & 0 deletions src/Client/Core/StartOrchestrationOptionsExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

Comment thread
YunchuWang marked this conversation as resolved.
using System.Linq;

namespace Microsoft.DurableTask.Client;

/// <summary>
/// Extension methods for <see cref="StartOrchestrationOptions"/> to provide type-safe deduplication status configuration.
/// </summary>
public static class StartOrchestrationOptionsExtensions
{
public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[]

Check warning on line 13 in src/Client/Core/StartOrchestrationOptionsExtensions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Check warning on line 13 in src/Client/Core/StartOrchestrationOptionsExtensions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Missing XML comment for publicly visible type or member 'StartOrchestrationOptionsExtensions.ValidDedupeStatuses'
Comment thread
YunchuWang marked this conversation as resolved.
{
OrchestrationRuntimeStatus.Completed,
OrchestrationRuntimeStatus.Failed,
OrchestrationRuntimeStatus.Terminated,
OrchestrationRuntimeStatus.Canceled,

Check warning on line 18 in src/Client/Core/StartOrchestrationOptionsExtensions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

'OrchestrationRuntimeStatus.Canceled' is obsolete: 'The Canceled status is not currently used and exists only for compatibility reasons.'

Check warning on line 18 in src/Client/Core/StartOrchestrationOptionsExtensions.cs

View workflow job for this annotation

GitHub Actions / build

'OrchestrationRuntimeStatus.Canceled' is obsolete: 'The Canceled status is not currently used and exists only for compatibility reasons.'
};
Comment thread
YunchuWang marked this conversation as resolved.

/// <summary>
/// Creates a new <see cref="StartOrchestrationOptions"/> with the specified deduplication statuses.
/// </summary>
/// <param name="options">The base options to extend.</param>
/// <param name="dedupeStatuses">The orchestration runtime statuses that should be considered for deduplication.</param>
/// <returns>A new <see cref="StartOrchestrationOptions"/> instance with the deduplication statuses set.</returns>
public static StartOrchestrationOptions WithDedupeStatuses(
this StartOrchestrationOptions options,
params OrchestrationRuntimeStatus[] dedupeStatuses)
{
return options with
{
DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(),
};
}
Comment thread
YunchuWang marked this conversation as resolved.
}
Comment thread
YunchuWang marked this conversation as resolved.
37 changes: 33 additions & 4 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Immutable;
using System.Diagnostics;
using System.Text;
using DurableTask.Core.History;
Expand Down Expand Up @@ -91,7 +92,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
version = this.options.DefaultVersion;
}

var request = new P.CreateInstanceRequest
P.CreateInstanceRequest request = new()
{
Name = orchestratorName.Name,
Version = version,
Expand Down Expand Up @@ -122,6 +123,34 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
}

// Set orchestration ID reuse policy for deduplication support
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
// Parse and validate all status strings to enum first
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
.Select(s =>
{
if (!System.Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out OrchestrationRuntimeStatus status))
{
throw new ArgumentException(
$"Invalid orchestration runtime status: '{s}' for deduplication.");
}
Comment thread
YunchuWang marked this conversation as resolved.
Comment thread
YunchuWang marked this conversation as resolved.

return status;
}).ToImmutableHashSet();

Comment thread
YunchuWang marked this conversation as resolved.
// Convert dedupe statuses to protobuf statuses and create reuse policy
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);

if (policy != null)
{
request.OrchestrationIdReusePolicy = policy;
}
}

using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request);

P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
Expand Down Expand Up @@ -405,7 +434,7 @@ public override async Task<string> RestartAsync(
Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

var request = new P.RestartInstanceRequest
P.RestartInstanceRequest request = new P.RestartInstanceRequest
{
InstanceId = instanceId,
RestartWithNewInstanceId = restartWithNewInstanceId,
Expand Down Expand Up @@ -441,7 +470,7 @@ public override async Task RewindInstanceAsync(
Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

var request = new P.RewindInstanceRequest
P.RewindInstanceRequest request = new P.RewindInstanceRequest
{
InstanceId = instanceId,
Reason = reason,
Expand Down Expand Up @@ -573,7 +602,7 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(

OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
{
var metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
OrchestrationMetadata metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
{
CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(),
LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(),
Expand Down
80 changes: 79 additions & 1 deletion src/Client/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,93 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Immutable;
using System.Linq;
using P = Microsoft.DurableTask.Protobuf;

namespace Microsoft.DurableTask.Client.Grpc;

/// <summary>
/// Protobuf helpers and utilities.
/// </summary>
static class ProtoUtils
public static class ProtoUtils
Comment thread
YunchuWang marked this conversation as resolved.
{
/// <summary>
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
/// </summary>
/// <returns>An immutable array of terminal orchestration statuses.</returns>
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
{
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
return ImmutableArray.Create(
P.OrchestrationStatus.Completed,
P.OrchestrationStatus.Failed,
P.OrchestrationStatus.Terminated,
P.OrchestrationStatus.Canceled);
#pragma warning restore CS0618
}

/// <summary>
/// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy
/// with replaceable statuses (statuses that CAN be replaced).
/// </summary>
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced.
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
/// </remarks>
Comment thread
YunchuWang marked this conversation as resolved.
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
IEnumerable<P.OrchestrationStatus>? dedupeStatuses)
{
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses?.ToImmutableHashSet() ?? ImmutableHashSet<P.OrchestrationStatus>.Empty;

P.OrchestrationIdReusePolicy policy = new();

// Add terminal statuses that are NOT in dedupeStatuses as replaceable
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status)))
{
policy.ReplaceableStatus.Add(terminalStatus);
}

// Only return policy if we have replaceable statuses
return policy.ReplaceableStatus.Count > 0 ? policy : null;
}

/// <summary>
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
/// (statuses that should NOT be replaced).
/// </summary>
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
/// </remarks>
Comment thread
YunchuWang marked this conversation as resolved.
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
P.OrchestrationIdReusePolicy? policy)
{
if (policy == null || policy.ReplaceableStatus.Count == 0)
{
return null;
}

ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();

// Calculate dedupe statuses = terminal statuses - replaceable statuses
P.OrchestrationStatus[] dedupeStatuses = terminalStatuses
.Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus))
.ToArray();

// Only return if there are dedupe statuses
return dedupeStatuses.Length > 0 ? dedupeStatuses : null;
}
Comment thread
YunchuWang marked this conversation as resolved.

Comment thread
YunchuWang marked this conversation as resolved.
#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
/// <summary>
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.
Expand Down
21 changes: 19 additions & 2 deletions src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using DurableTask.Core;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -192,7 +193,23 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
},
};

await this.Client.CreateTaskOrchestrationAsync(message);
Core.OrchestrationStatus[]? dedupeStatuses = null;
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
dedupeStatuses = options.DedupeStatuses
.Select(s =>
{
if (!Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out var status))
{
throw new ArgumentException(
$"Invalid orchestration runtime status: '{s}' for deduplication.");
}
Comment thread
YunchuWang marked this conversation as resolved.
Comment thread
YunchuWang marked this conversation as resolved.
return status.ConvertToCore();
})
.ToArray();
}

await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses);
return instanceId;
}

Expand Down Expand Up @@ -303,7 +320,7 @@ public override async Task<string> RestartAsync(
},
};

await this.Client.CreateTaskOrchestrationAsync(message);
await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses: null);
return newInstanceId;
}

Expand Down
26 changes: 24 additions & 2 deletions src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.DurableTask.Client.Grpc;
using Microsoft.DurableTask.Testing.Sidecar.Dispatcher;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -202,6 +205,18 @@ async Task WaitForWorkItemClientConnection()

try
{
// Convert OrchestrationIdReusePolicy to dedupeStatuses
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
// So dedupeStatuses = all terminal statuses MINUS replaceableStatus
OrchestrationStatus[]? dedupeStatuses = null;
P.OrchestrationStatus[]? dedupeStatusesProto = ProtoUtils.ConvertReusePolicyToDedupeStatuses(request.OrchestrationIdReusePolicy);
if (dedupeStatusesProto != null)
{
// Convert protobuf statuses to Core.OrchestrationStatus
dedupeStatuses = dedupeStatusesProto.Select(s => (OrchestrationStatus)s).ToArray();
}
Comment thread
YunchuWang marked this conversation as resolved.

await this.client.CreateTaskOrchestrationAsync(
new TaskMessage
{
Expand All @@ -216,7 +231,14 @@ await this.client.CreateTaskOrchestrationAsync(
: null
},
OrchestrationInstance = instance,
});
},
dedupeStatuses);
}
catch (OrchestrationAlreadyExistsException e)
Comment thread
YunchuWang marked this conversation as resolved.
{
// Convert to gRPC exception
this.log.LogWarning(e, "Orchestration with ID {InstanceId} already exists", instance.InstanceId);
throw new RpcException(new Status(StatusCode.AlreadyExists, e.Message));
}
catch (Exception e)
{
Expand Down
1 change: 1 addition & 0 deletions test/Abstractions.Tests/Abstractions.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

<ItemGroup>
<ProjectReference Include="$(SrcRoot)Abstractions/Abstractions.csproj" />
<ProjectReference Include="$(SrcRoot)Client/Core/Client.csproj" />
</ItemGroup>

</Project>
Loading
Loading