Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
eb8f4c4
first commit, will add e2e tests in the next one
Jan 16, 2026
939f980
Merge branch 'dev' into stevosyan/add-dedupe-statuses
Jan 16, 2026
28bd993
added the e2e tests:
Jan 17, 2026
56c9551
removing unnecessary changes
Jan 17, 2026
9145d44
made hellocities more succinct
Jan 17, 2026
c1a7805
missed instance ID assignment
Jan 17, 2026
7bfde01
PR comment updates
Jan 21, 2026
c22d39f
fixing whitespace
Jan 21, 2026
73e34ba
removed linked cancellation token use
Jan 26, 2026
a90d03e
updated the exception handling in the e2e test to only catch Orchestr…
Jan 26, 2026
5f07dea
Merge branch 'dev' into stevosyan/add-dedupe-statuses
Jan 26, 2026
ca2d617
removing question comments
Jan 26, 2026
ab99f05
removed unnecessary usings
Jan 26, 2026
4aaccc2
fixing a comment
Jan 26, 2026
6d5ee8c
updating the terminating poll to end if the orchestration reaches any…
Jan 26, 2026
ff08413
updated HTTP request to include cancellation token
Jan 27, 2026
ff30220
fixing the build bug
Jan 27, 2026
8b4c17a
fixed CreateTaskOrchestration overrides to ultimately call the same m…
Jan 27, 2026
aad6002
PR comments:
Feb 4, 2026
33d0f63
updating the termination poll logic to use WaitForOrchestration
Feb 5, 2026
ffc19f9
Merge branch 'dev' into stevosyan/add-dedupe-statuses
Feb 5, 2026
0f0f28d
Adding an ArgumentException for invalid dedupe statuses (any running …
Feb 11, 2026
dc29240
added support to terminate existing running instances for restart
Feb 11, 2026
142e172
Merge branch 'dev' into stevosyan/add-dedupe-statuses
Feb 11, 2026
7e53ac6
Update src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
sophiatev Feb 11, 2026
ac48ed7
Update test/e2e/Tests/Tests/DedupeStatusesTests.cs
sophiatev Feb 11, 2026
25d8932
Update test/e2e/Tests/Tests/RestartOrchestrationTests.cs
sophiatev Feb 11, 2026
b608832
fixing the typoe
Feb 11, 2026
583c22e
added log assertions to the tests, removed the huge RestartOrchestrat…
Feb 11, 2026
d530408
Update test/e2e/Apps/BasicJava/src/main/java/com/function/HelloCities…
sophiatev Feb 11, 2026
2ea3890
fixing the build warnings and addressing PR comments
Feb 20, 2026
4850850
Merge branch 'dev' into stevosyan/add-dedupe-statuses
Feb 20, 2026
cda3ae0
removing the version specified in one of the package references in VS…
Feb 20, 2026
5f1a596
further fixing the nuget build errors
Feb 20, 2026
2c0af1e
continuing the nuget attempts, addressing some copilot comments
Feb 20, 2026
8959fcf
fixed the failing test
Feb 20, 2026
513376d
updated .NET SDK dependencies
Feb 26, 2026
5c76266
Merge branch 'dev' into stevosyan/add-dedupe-statuses
Feb 26, 2026
d478db5
attempting to fix the e2e dts and mssql errors
Feb 27, 2026
fc60b13
Update test/e2e/Apps/BasicPowerShell/LargeOutputOrchestrator/run.ps1
sophiatev Feb 27, 2026
7d3ecfc
moving the placement of the log checks in the restart tests to try to…
Feb 27, 2026
371bc48
added conditional skip on the Suspended status for MSSQL since it doe…
Feb 27, 2026
ef4f298
changing the logic in the restart test to wait for the restart to com…
Feb 27, 2026
da873e5
Update test/e2e/Apps/BasicNode/src/functions/LargeOutputOrchestrator.ts
sophiatev Feb 27, 2026
e630439
more changes to try to fix the flakiness in the tests
Feb 27, 2026
5d54f01
missed a pending case in DedupeStatusesTests
Feb 27, 2026
d25c05c
fixing the ScheduledStartTime typo in the tests
Feb 28, 2026
b9a7453
updated the DTS and MSSQL dependencies which unblocked certain test c…
Mar 2, 2026
2fbc898
Merge branch 'dev' into stevosyan/add-dedupe-statuses
Mar 2, 2026
0bdca81
updated the mssql dependency to the working package
Mar 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="3.9.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="3.9.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Workspaces.Common" Version="3.9.0" />
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.20.1" />
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.20.1" />
<PackageVersion Include="Microsoft.DurableTask.Abstractions" Version="1.20.1" />
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.22.0" />
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.22.0" />
<PackageVersion Include="Microsoft.DurableTask.Abstractions" Version="1.22.0" />
<PackageVersion Include="Microsoft.DurableTask.Analyzers" Version="0.2.0" />
<PackageVersion Include="Microsoft.Extensions.Azure" Version="1.7.0" />
<PackageVersion Include="Microsoft.Extensions.Caching.Memory" Version="10.0.0" />
Expand Down Expand Up @@ -58,8 +58,8 @@
<PackageVersion Include="Microsoft.Azure.DurableTask.Redis" Version="0.1.9-alpha" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.2.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="2.0.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" Version="0.4.2-alpha" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.SqlServer" Version="1.5.2" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" Version="1.4.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.SqlServer" Version="1.5.4" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.0.2" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.5"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,16 +1140,6 @@ async Task<string> IDurableOrchestrationClient.RestartAsync(string instanceId, b
// GetOrchestrationInstanceStateAsync will throw ArgumentException if the provided instanceid is not found.
OrchestrationState state = await this.GetOrchestrationInstanceStateAsync(instanceId);

bool isInstanceNotCompleted = state.OrchestrationStatus == OrchestrationStatus.Running ||
state.OrchestrationStatus == OrchestrationStatus.Pending ||
state.OrchestrationStatus == OrchestrationStatus.Suspended;

if (isInstanceNotCompleted && !restartWithNewInstanceId)
{
throw new InvalidOperationException($"Instance '{instanceId}' cannot be restarted while it is in state '{state.OrchestrationStatus}'. " +
"Wait until it has completed, or restart with a new instance ID.");
}

JToken input = ParseToJToken(state.Input);

string newInstanceId = null;
Expand Down
150 changes: 143 additions & 7 deletions src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Entities;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using Microsoft.Azure.WebJobs.Host.Scale;
Expand Down Expand Up @@ -38,6 +40,7 @@ public class DurabilityProvider :
private readonly IOrchestrationServiceClient innerServiceClient;
private readonly IEntityOrchestrationService entityOrchestrationService;
private readonly string connectionName;
private readonly int orchestrationCreationRequestTimeoutInSeconds = 180;

/// <summary>
/// Creates the default <see cref="DurabilityProvider"/>.
Expand Down Expand Up @@ -406,10 +409,63 @@ public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage);
}

/// <inheritdoc />
public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
{
return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses);
/// <summary>
/// Creates a new task orchestration instance using the specified creation message and dedupe statuses.
/// </summary>
/// <param name="creationMessage">The creation message for the orchestration.</param>
/// <param name="dedupeStatuses">An array of orchestration statuses used for "dedupping":
/// If an orchestration with the same instance ID already exists, and its status is in this array, then a
/// <see cref="OrchestrationAlreadyExistsException"/> will be thrown.
/// If the array contains all of the running statuses (<see cref="OrchestrationStatus.Pending"/>, <see cref="OrchestrationStatus.Running"/>,
/// and <see cref="OrchestrationStatus.Suspended"/>), then only terminal statuses can be reused.
/// If at least one of these statuses is not included in the array, then if an instance with that status is found, it will first be terminated
/// before a new orchestration is created. If the existing instance does not reach a terminal state within 3 minutes, the operation will be cancelled.
/// </param>
/// <returns>A task that completes when the creation message for the task orchestration instance is enqueued.</returns>
/// <exception cref="OrchestrationAlreadyExistsException">Thrown if an orchestration with the same instance ID already exists and its status
/// is in <paramref name="dedupeStatuses"/>.</exception>
/// <exception cref="OperationCanceledException">Thrown if an existing running instance does not reach a terminal state within 3 minutes.</exception>
/// <exception cref="ArgumentException">
/// Thrown if <paramref name="dedupeStatuses"/> contains <see cref="OrchestrationStatus.Terminated"/> but also allows at least one running status
/// to be reusable. In this case, an existing orchestration with that running status would be terminated, but the creation of the new orchestration
/// would immediately fail due to the existing orchestration now having status <see cref="OrchestrationStatus.Terminated"/>.
/// </exception>
public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
{
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(this.orchestrationCreationRequestTimeoutInSeconds));
await this.CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses, timeoutCts.Token);
}

/// <summary>
/// Creates a new task orchestration instance using the specified creation message and dedupe statuses.
/// </summary>
/// <param name="creationMessage">The creation message for the orchestration.</param>
/// <param name="dedupeStatuses">An array of orchestration statuses used for "dedupping":
/// If an orchestration with the same instance ID already exists, and its status is in this array, then a
/// <see cref="OrchestrationAlreadyExistsException"/> will be thrown.
/// If the array contains all of the running statuses (<see cref="OrchestrationStatus.Pending"/>, <see cref="OrchestrationStatus.Running"/>,
/// and <see cref="OrchestrationStatus.Suspended"/>), then only terminal statuses can be reused.
/// If at least one of these statuses is not included in the array, then if an instance with that status is found, it will first be terminated
/// before a new orchestration is created. This method will wait for the instance to reach a terminal status for a maximum of one hour or
/// until the <paramref name="cancellationToken"/> is invoked, whichever occurs first.</param>
/// <param name="cancellationToken">The cancellation token used to cancel waiting for an existing instance to terminate in the case that a
/// non-terminal instance is found whose runtime status is not included in <paramref name="dedupeStatuses"/>.</param>
/// <returns>A task that completes when the creation message for the task orchestration instance is enqueued.</returns>
/// <exception cref="OrchestrationAlreadyExistsException">Thrown if an orchestration with the same instance ID already exists and its status
/// is in <paramref name="dedupeStatuses"/>.</exception>
/// <exception cref="OperationCanceledException">Thrown if the operation is cancelled via <paramref name="cancellationToken"/>.</exception>
/// <exception cref="ArgumentException">
/// Thrown if <paramref name="dedupeStatuses"/> contains <see cref="OrchestrationStatus.Terminated"/> but also allows at least one running status
/// to be reusable. In this case, an existing orchestration with that running status would be terminated, but the creation of the new orchestration
/// would immediately fail due to the existing orchestration now having status <see cref="OrchestrationStatus.Terminated"/>.
/// </exception>
public async virtual Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses, CancellationToken cancellationToken)
{
await this.TerminateTaskOrchestrationWithReusableRunningStatusAndWaitAsync(
creationMessage.OrchestrationInstance.InstanceId,
dedupeStatuses,
cancellationToken);
await this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses);
}

/// <inheritdoc />
Expand Down Expand Up @@ -614,10 +670,90 @@ public virtual Task<IAsyncEnumerable<HistoryEvent>> StreamOrchestrationHistoryAs
/// <summary>
/// Attempts to modify the durability service's UseSeparateQueueForEntityWorkItems property.
/// </summary>
/// <param name="newValue">The value to set</param>
/// <param name="newValue">The value to set.</param>
public virtual void SetUseSeparateQueueForEntityWorkItems(bool newValue)
{
throw this.GetNotImplementedException(nameof(this.SetUseSeparateQueueForEntityWorkItems));
}

/// <summary>
/// If an orchestration exists with a status that is not in <paramref name="dedupeStatuses"/> and has a running status (one of
/// <see cref="OrchestrationStatus.Pending"/>, <see cref="OrchestrationStatus.Running"/>, or <see cref="OrchestrationStatus.Suspended"/>),
/// then this method terminates the specified orchestration instance and waits until:
/// - The orchestration's status changes to <see cref="OrchestrationStatus.Terminated"/>,
/// - or the orchestration is deleted,
/// - or the operation is cancelled via the <paramref name="cancellationToken"/>.
/// </summary>
/// <param name="instanceId">The instance ID of the orchestration.</param>
/// <param name="dedupeStatuses">The dedupe statuses of the orchestration.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes when any of the above conditions are reached.</returns>
/// <exception cref="OperationCanceledException">Thrown if the operation is cancelled via the <paramref name="cancellationToken"/>.</exception>
/// <exception cref="OrchestrationAlreadyExistsException">Thrown if an orchestration already exists with status in <paramref name="dedupeStatuses"/>.</exception>
/// <exception cref="ArgumentException">Thrown if <paramref name="dedupeStatuses"/> contains <see cref="OrchestrationStatus.Terminated"/> but allows
/// at least one running status to be reusable.</exception>
private async Task TerminateTaskOrchestrationWithReusableRunningStatusAndWaitAsync(
string instanceId,
OrchestrationStatus[] dedupeStatuses,
CancellationToken cancellationToken)
{
var runningStatuses = new List<OrchestrationStatus>()
{
OrchestrationStatus.Running,
OrchestrationStatus.Pending,
OrchestrationStatus.Suspended,
};

if (dedupeStatuses != null && runningStatuses.Any(
status => !dedupeStatuses.Contains(status)) && dedupeStatuses.Contains(OrchestrationStatus.Terminated))
{
throw new ArgumentException(
"Invalid dedupe statuses: cannot include 'Terminated' while also allowing reuse of running instances, " +
"because the running instance would be terminated and then immediately conflict with the dedupe check.");
}

bool IsRunning(OrchestrationStatus status) =>
runningStatuses.Contains(status);

// At least one running status is reusable, so determine if an orchestration already exists with this status and terminate it if so
if (dedupeStatuses == null || runningStatuses.Any(status => !dedupeStatuses.Contains(status)))
{
OrchestrationState orchestrationState = await this.GetOrchestrationStateAsync(instanceId, executionId: null);

if (orchestrationState != null)
{
if (dedupeStatuses?.Contains(orchestrationState.OrchestrationStatus) == true)
{
throw new OrchestrationAlreadyExistsException($"An orchestration with instance ID '{instanceId}' and status " +
$"'{orchestrationState.OrchestrationStatus}' already exists");
}

if (IsRunning(orchestrationState.OrchestrationStatus))
{
// Check for cancellation before attempting to terminate the orchestration
cancellationToken.ThrowIfCancellationRequested();

string dedupeStatusesDescription = dedupeStatuses == null
? "null (all statuses reusable)"
: dedupeStatuses.Length == 0
? "[] (all statuses reusable)"
: $"[{string.Join(", ", dedupeStatuses)}]";

string terminationReason = $"A new instance creation request has been issued for instance {instanceId} which " +
$"currently has status {orchestrationState.OrchestrationStatus}. Since the dedupe statuses of the creation request, " +
$"{dedupeStatusesDescription}, do not contain the orchestration's status, the orchestration has been terminated " +
$"and a new instance with the same instance ID will be created.";

await this.ForceTerminateTaskOrchestrationAsync(instanceId, terminationReason);

await this.WaitForOrchestrationAsync(
instanceId,
orchestrationState.OrchestrationInstance.ExecutionId,
TimeSpan.FromHours(1),
cancellationToken);
}
}
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,7 @@ Task<HttpResponseMessage> IAsyncConverter<HttpRequestMessage, HttpResponseMessag
HttpRequestMessage request,
CancellationToken cancellationToken)
{
return this.HttpApiHandler.HandleRequestAsync(request);
return this.HttpApiHandler.HandleRequestAsync(request, cancellationToken);
}

internal static string ValidatePayloadSize(string payload)
Expand Down
23 changes: 19 additions & 4 deletions src/WebJobs.Extensions.DurableTask/HttpApiHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Routing.Template;
Expand Down Expand Up @@ -285,7 +286,7 @@ internal async Task<HttpResponseMessage> WaitForCompletionOrCreateCheckStatusRes
}
}

public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage request)
public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
try
{
Expand All @@ -309,7 +310,7 @@ public async Task<HttpResponseMessage> HandleRequestAsync(HttpRequestMessage req
string instanceId = (string)routeValues[InstanceIdRouteParameter];
if (request.Method == HttpMethod.Post)
{
return await this.HandleStartOrchestratorRequestAsync(request, functionName, instanceId);
return await this.HandleStartOrchestratorRequestAsync(request, functionName, instanceId, cancellationToken);
}
else
{
Expand Down Expand Up @@ -906,7 +907,8 @@ private async Task<HttpResponseMessage> HandleResumeInstanceRequestAsync(
private async Task<HttpResponseMessage> HandleStartOrchestratorRequestAsync(
HttpRequestMessage request,
string functionName,
string instanceId)
string instanceId,
CancellationToken cancellationToken)
{
try
{
Expand Down Expand Up @@ -965,7 +967,8 @@ await durableClient.DurabilityProvider.CreateTaskOrchestrationAsync(
Event = executionStartedEvent,
OrchestrationInstance = instance,
},
this.config.Options.OverridableExistingInstanceStates.ToDedupeStatuses());
this.config.Options.OverridableExistingInstanceStates.ToDedupeStatuses(),
cancellationToken);
}
else
{
Expand Down Expand Up @@ -994,6 +997,14 @@ await durableClient.DurabilityProvider.CreateTaskOrchestrationAsync(
{
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid JSON content", e);
}
catch (OrchestrationAlreadyExistsException e)
{
return request.CreateErrorResponse(HttpStatusCode.Conflict, e.Message);
}
catch (ArgumentException e)
{
return request.CreateErrorResponse(HttpStatusCode.BadRequest, e.Message);
}
}

private static string GetHeaderValueFromHeaders(string header, HttpRequestHeaders headers)
Expand Down Expand Up @@ -1068,6 +1079,10 @@ private async Task<HttpResponseMessage> HandleRestartInstanceRequestAsync(
{
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "InstanceId does not match a valid orchestration instance.", e);
}
catch (OrchestrationAlreadyExistsException e)
{
return request.CreateErrorResponse(HttpStatusCode.Conflict, "A non-terminal instance with this instance ID already exists.", e);
}
catch (JsonReaderException e)
{
return request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid JSON content", e);
Expand Down
Loading
Loading