Support dedup status when starting orchestration#542
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds support for deduplication status configuration when starting orchestrations, allowing callers to specify which orchestration runtime statuses should prevent creation of a new instance with the same instance ID. This enables idempotent orchestration creation patterns.
Key changes:
- Adds a
DedupeStatusesproperty toStartOrchestrationOptionsfor specifying which terminal statuses should prevent instance replacement - Provides a type-safe
WithDedupeStatuses()extension method for configuring deduplication - Implements deduplication logic in both gRPC and shim client implementations by converting dedupe statuses to the protobuf
OrchestrationIdReusePolicy - Updates the server to handle the reuse policy and throw appropriate exceptions when deduplication conditions are met
- Adds comprehensive integration tests validating various deduplication scenarios
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| src/Abstractions/TaskOptions.cs | Adds DedupeStatuses property to StartOrchestrationOptions with XML documentation |
| src/Client/Core/StartOrchestrationOptionsExtensions.cs | New file providing WithDedupeStatuses() extension method and ValidDedupeStatuses constant |
| src/Client/Grpc/GrpcDurableTaskClient.cs | Implements dedupe status handling by converting to OrchestrationIdReusePolicy protobuf message |
| src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs | Implements dedupe status handling for the shim client by passing dedupeStatuses to core API |
| src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs | Server-side implementation that converts OrchestrationIdReusePolicy to dedupe statuses and catches OrchestrationAlreadyExistsException |
| test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs | Adds 5 integration tests covering various deduplication scenarios including successful deduplication, replacement, and edge cases |
src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Outdated
Show resolved
Hide resolved
test/Grpc.IntegrationTests/GrpcDurableTaskClientIntegrationTests.cs
Outdated
Show resolved
Hide resolved
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Outdated
Show resolved
Hide resolved
test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs
Outdated
Show resolved
Hide resolved
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
…skClientTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (2)
src/Client/Grpc/ProtoUtils.cs:94
- The new public methods
GetTerminalStatuses,ConvertDedupeStatusesToReusePolicy, andConvertReusePolicyToDedupeStatusesin ProtoUtils lack unit test coverage. These conversion methods contain important logic for the deduplication feature and should have dedicated unit tests intest/Client/Grpc.Tests/to verify edge cases such as empty dedupe status lists, all terminal statuses being dedupe statuses, and round-trip conversions.
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
{
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
return ImmutableArray.Create(
P.OrchestrationStatus.Completed,
P.OrchestrationStatus.Failed,
P.OrchestrationStatus.Terminated,
P.OrchestrationStatus.Canceled);
#pragma warning restore CS0618
}
/// <summary>
/// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy
/// with replaceable statuses (statuses that CAN be replaced).
/// </summary>
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced.
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
/// </remarks>
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
IEnumerable<P.OrchestrationStatus> dedupeStatuses)
{
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses.ToImmutableHashSet();
P.OrchestrationIdReusePolicy policy = new();
// Add terminal statuses that are NOT in dedupeStatuses as replaceable
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status)))
{
policy.ReplaceableStatus.Add(terminalStatus);
}
// Only return policy if we have replaceable statuses
return policy.ReplaceableStatus.Count > 0 ? policy : null;
}
/// <summary>
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
/// (statuses that should NOT be replaced).
/// </summary>
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
/// </remarks>
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
P.OrchestrationIdReusePolicy? policy)
{
if (policy == null || policy.ReplaceableStatus.Count == 0)
{
return null;
}
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();
// Calculate dedupe statuses = terminal statuses - replaceable statuses
P.OrchestrationStatus[] dedupeStatuses = terminalStatuses
.Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus))
.ToArray();
// Only return if there are dedupe statuses
return dedupeStatuses.Length > 0 ? dedupeStatuses : null;
}
#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
/// <summary>
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.
/// </summary>
src/Client/Grpc/ProtoUtils.cs:94
- The
GetTerminalStatuses,ConvertDedupeStatusesToReusePolicy, andConvertReusePolicyToDedupeStatusesmethods are now part of the public API (class changed from internal to public). Consider adding<param>documentation for the method parameters and potentially adding examples in the XML documentation to help API consumers understand how to use these conversion methods correctly.
/// <summary>
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
/// </summary>
/// <returns>An immutable array of terminal orchestration statuses.</returns>
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
{
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
return ImmutableArray.Create(
P.OrchestrationStatus.Completed,
P.OrchestrationStatus.Failed,
P.OrchestrationStatus.Terminated,
P.OrchestrationStatus.Canceled);
#pragma warning restore CS0618
}
/// <summary>
/// Converts dedupe statuses (statuses that should NOT be replaced) to an OrchestrationIdReusePolicy
/// with replaceable statuses (statuses that CAN be replaced).
/// </summary>
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced.
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
/// </remarks>
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
IEnumerable<P.OrchestrationStatus> dedupeStatuses)
{
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses.ToImmutableHashSet();
P.OrchestrationIdReusePolicy policy = new();
// Add terminal statuses that are NOT in dedupeStatuses as replaceable
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status)))
{
policy.ReplaceableStatus.Add(terminalStatus);
}
// Only return policy if we have replaceable statuses
return policy.ReplaceableStatus.Count > 0 ? policy : null;
}
/// <summary>
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
/// (statuses that should NOT be replaced).
/// </summary>
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
/// </remarks>
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
P.OrchestrationIdReusePolicy? policy)
{
if (policy == null || policy.ReplaceableStatus.Count == 0)
{
return null;
}
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();
// Calculate dedupe statuses = terminal statuses - replaceable statuses
P.OrchestrationStatus[] dedupeStatuses = terminalStatuses
.Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus))
.ToArray();
// Only return if there are dedupe statuses
return dedupeStatuses.Length > 0 ? dedupeStatuses : null;
}
#pragma warning disable 0618 // Referencing Obsolete member. This is intention as we are only converting it.
/// <summary>
/// Converts <see cref="OrchestrationRuntimeStatus" /> to <see cref="P.OrchestrationStatus" />.
/// </summary>
src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Outdated
Show resolved
Hide resolved
halspang
left a comment
There was a problem hiding this comment.
I feel like this is backwards to what I would intuitively have thought. I think, looking at the actual reuse policy makes more sense from a naming perspective.
I have an orchestration with an Id 1 and a status Completed. I want to be able to reuse that Id, so I have a policy with the status Completed in it.
In the case of this PR, it seems like that's actually the opposite of what we're doing. If I have an orchestration who's Id I want to reuse that is completed, I provide the values Failed, Running, and Terminated? Or, I could provide an empty array?
Is this following a paradigm from another portion of Durable Functions? If not, I would strongly consider switching the naming around. Or, if you feel really strongly about it, we can have that conversation and see how others feel about it since I could be the only one who feels this way :)
#183 this pr addresses this issue to support dedupstatus overloaded creation as defined in durabletask. without it, it is breaking customers as mentioned in the github issue. yes agree it feels confusing and inverse of replaceablestatuses but it mainly is needed to ensure backward compatibility from replaceablestatus to dedupstatus. |
Fix #183
This pull request introduces support for idempotent orchestration creation by allowing orchestration instances to be deduplicated based on their runtime status. The main changes add a new
DedupeStatusesoption for orchestration creation, implement type-safe configuration via extension methods, propagate this option through the client and server layers, and add comprehensive integration tests to verify the new behavior.Idempotent orchestration creation
DedupeStatusesproperty toStartOrchestrationOptions, enabling orchestration creation to throw an exception if an instance with the same ID exists in a specified status. This supports idempotent orchestration creation.StartOrchestrationOptionsExtensionswith a type-safeWithDedupeStatusesmethod and a list of valid deduplication statuses, improving API usability and safety.Client and server propagation
GrpcDurableTaskClient,ShimDurableTaskClient, andTaskHubGrpcServerto honor the new deduplication logic, mapping dedupe statuses to protocol fields and throwing anOrchestrationAlreadyExistsExceptionwhen appropriate. [1] [2] [3] [4]AlreadyExists.Integration tests