Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/Abstractions/TaskOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,17 @@
/// Gets the version to associate with the orchestration instance.
/// </summary>
public TaskVersion? Version { get; init; }

/// <summary>
/// Gets the orchestration runtime statuses that should be considered for deduplication.
/// If an orchestration instance with the same instance ID already exists and is in one of these statuses,
/// the creation will throw an <see cref="OrchestrationAlreadyExistsException"/> instead of creating a new instance.

Check warning on line 141 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'OrchestrationAlreadyExistsException' that could not be resolved

Check warning on line 141 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

XML comment has cref attribute 'OrchestrationAlreadyExistsException' that could not be resolved
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
/// This enables idempotent orchestration creation.
/// </summary>
/// <remarks>
/// The status names should match the values from <see cref="Microsoft.DurableTask.Client.OrchestrationRuntimeStatus"/> enum

Check warning on line 145 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'OrchestrationRuntimeStatus' that could not be resolved

Check warning on line 145 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

XML comment has cref attribute 'OrchestrationRuntimeStatus' that could not be resolved
/// (e.g., "Completed", "Failed", "Terminated", "Canceled").
/// For type-safe usage, use extension methods from <see cref="Microsoft.DurableTask.Client.StartOrchestrationOptionsExtensions"/>.

Check warning on line 147 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / build

XML comment has cref attribute 'StartOrchestrationOptionsExtensions' that could not be resolved

Check warning on line 147 in src/Abstractions/TaskOptions.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

XML comment has cref attribute 'StartOrchestrationOptionsExtensions' that could not be resolved
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
/// </remarks>
public IReadOnlyList<string>? DedupeStatuses { get; init; }
}
38 changes: 38 additions & 0 deletions src/Client/Core/StartOrchestrationOptionsExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

Comment thread
YunchuWang marked this conversation as resolved.
namespace Microsoft.DurableTask.Client;

/// <summary>
/// Extension methods for <see cref="StartOrchestrationOptions"/> to provide type-safe deduplication status configuration.
/// </summary>
public static class StartOrchestrationOptionsExtensions
{
/// <summary>
/// Gets the terminal orchestration runtime statuses that are valid for deduplication.
/// These are the statuses that can be used to prevent replacement of an existing orchestration instance.
/// </summary>
public static readonly OrchestrationRuntimeStatus[] ValidDedupeStatuses = new[]
Comment thread
YunchuWang marked this conversation as resolved.
{
OrchestrationRuntimeStatus.Completed,
OrchestrationRuntimeStatus.Failed,
OrchestrationRuntimeStatus.Terminated,
OrchestrationRuntimeStatus.Canceled,
};
Comment thread
YunchuWang marked this conversation as resolved.

/// <summary>
/// Creates a new <see cref="StartOrchestrationOptions"/> with the specified deduplication statuses.
/// </summary>
/// <param name="options">The base options to extend.</param>
/// <param name="dedupeStatuses">The orchestration runtime statuses that should be considered for deduplication.</param>
/// <returns>A new <see cref="StartOrchestrationOptions"/> instance with the deduplication statuses set.</returns>
public static StartOrchestrationOptions WithDedupeStatuses(
this StartOrchestrationOptions options,
params OrchestrationRuntimeStatus[] dedupeStatuses)
{
return options with
{
DedupeStatuses = dedupeStatuses.Select(s => s.ToString()).ToList(),
};
}
Comment thread
YunchuWang marked this conversation as resolved.
}
Comment thread
YunchuWang marked this conversation as resolved.
50 changes: 46 additions & 4 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Immutable;
using System.Diagnostics;
using System.Text;
using DurableTask.Core.History;
Expand Down Expand Up @@ -91,7 +92,7 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
version = this.options.DefaultVersion;
}

var request = new P.CreateInstanceRequest
P.CreateInstanceRequest request = new()
{
Name = orchestratorName.Name,
Version = version,
Expand Down Expand Up @@ -122,6 +123,47 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
}

// Set orchestration ID reuse policy for deduplication support
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
// Parse and validate all status strings to enum first
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
.Select(s =>
{
if (!System.Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out OrchestrationRuntimeStatus status))
{
string validStatuses = string.Join(", ", StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Select(ts => ts.ToString()));
throw new ArgumentException(
$"Invalid orchestration runtime status for deduplication: '{s}'. Valid statuses for deduplication are: {validStatuses}",
nameof(options.DedupeStatuses));
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
}
Comment thread
YunchuWang marked this conversation as resolved.
Comment thread
YunchuWang marked this conversation as resolved.

return status;
}).ToImmutableHashSet();

Comment thread
YunchuWang marked this conversation as resolved.
P.OrchestrationIdReusePolicy policy = new();

// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
// So we add terminal statuses that are NOT in dedupeStatuses to replaceableStatus
// This matches the logic in AAPT-DTMB ProtoUtils.Convert
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
foreach (OrchestrationRuntimeStatus terminalStatus in StartOrchestrationOptionsExtensions.ValidDedupeStatuses)
{
if (!dedupeStatuses.Contains(terminalStatus))
{
policy.ReplaceableStatus.Add(terminalStatus.ToGrpcStatus());
}
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
}
Comment thread
YunchuWang marked this conversation as resolved.
Fixed

// Only set if we have replaceable statuses
if (policy.ReplaceableStatus.Count > 0)
{
request.OrchestrationIdReusePolicy = policy;
}
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
}

using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request);

P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
Expand Down Expand Up @@ -405,7 +447,7 @@ public override async Task<string> RestartAsync(
Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

var request = new P.RestartInstanceRequest
P.RestartInstanceRequest request = new P.RestartInstanceRequest
{
InstanceId = instanceId,
RestartWithNewInstanceId = restartWithNewInstanceId,
Expand Down Expand Up @@ -441,7 +483,7 @@ public override async Task RewindInstanceAsync(
Check.NotNullOrEmpty(instanceId);
Check.NotEntity(this.options.EnableEntitySupport, instanceId);

var request = new P.RewindInstanceRequest
P.RewindInstanceRequest request = new P.RewindInstanceRequest
{
InstanceId = instanceId,
Reason = reason,
Expand Down Expand Up @@ -573,7 +615,7 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(

OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
{
var metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
OrchestrationMetadata metadata = new OrchestrationMetadata(state.Name, state.InstanceId)
{
CreatedAt = state.CreatedTimestamp.ToDateTimeOffset(),
LastUpdatedAt = state.LastUpdatedTimestamp.ToDateTimeOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using DurableTask.Core;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -192,7 +193,25 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
},
};

await this.Client.CreateTaskOrchestrationAsync(message);
Core.OrchestrationStatus[]? dedupeStatuses = null;
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
dedupeStatuses = options.DedupeStatuses
.Select(s =>
{
if (!Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out var status))
{
var validStatuses = string.Join(", ", StartOrchestrationOptionsExtensions.ValidDedupeStatuses.Select(ts => ts.ToString()));
throw new ArgumentException(
$"Invalid orchestration runtime status for deduplication: '{s}'. Valid statuses for deduplication are: {validStatuses}",
nameof(options.DedupeStatuses));
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
Comment thread
YunchuWang marked this conversation as resolved.
Outdated
}
Comment thread
YunchuWang marked this conversation as resolved.
Comment thread
YunchuWang marked this conversation as resolved.
return status.ConvertToCore();
})
.ToArray();
}

await this.Client.CreateTaskOrchestrationAsync(message, dedupeStatuses);
return instanceId;
}

Expand Down
40 changes: 38 additions & 2 deletions src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright (c) Microsoft Corporation.
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Google.Protobuf.WellKnownTypes;
Expand Down Expand Up @@ -202,6 +203,34 @@ async Task WaitForWorkItemClientConnection()

try
{
// Convert OrchestrationIdReusePolicy to dedupeStatuses
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
// So dedupeStatuses = all terminal statuses MINUS replaceableStatus
OrchestrationStatus[]? dedupeStatuses = null;
if (request.OrchestrationIdReusePolicy != null && request.OrchestrationIdReusePolicy.ReplaceableStatus.Count > 0)
{
var terminalStatuses = new HashSet<OrchestrationStatus>
{
OrchestrationStatus.Completed,
OrchestrationStatus.Failed,
OrchestrationStatus.Terminated,
OrchestrationStatus.Canceled,
};

// Remove replaceable statuses from terminal statuses to get dedupe statuses
foreach (P.OrchestrationStatus replaceableStatus in request.OrchestrationIdReusePolicy.ReplaceableStatus)
{
terminalStatuses.Remove((OrchestrationStatus)replaceableStatus);
}

// Only set dedupeStatuses if there are any statuses that should not be replaced
if (terminalStatuses.Count > 0)
{
dedupeStatuses = terminalStatuses.ToArray();
}
}
Comment thread
YunchuWang marked this conversation as resolved.

await this.client.CreateTaskOrchestrationAsync(
new TaskMessage
{
Expand All @@ -216,7 +245,14 @@ await this.client.CreateTaskOrchestrationAsync(
: null
},
OrchestrationInstance = instance,
});
},
dedupeStatuses);
}
catch (OrchestrationAlreadyExistsException e)
Comment thread
YunchuWang marked this conversation as resolved.
{
// Convert to gRPC exception
this.log.LogWarning(e, "Orchestration with ID {InstanceId} already exists", instance.InstanceId);
throw new RpcException(new Status(StatusCode.AlreadyExists, e.Message));
}
catch (Exception e)
{
Expand Down
Loading
Loading