diff --git a/Directory.Packages.props b/Directory.Packages.props index e643fcd1a..faa4ca470 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -27,9 +27,9 @@ - - - + + + @@ -58,8 +58,8 @@ - - + + diff --git a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs index 40d2c660c..815679400 100644 --- a/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs +++ b/src/WebJobs.Extensions.DurableTask/ContextImplementations/DurableClient.cs @@ -1140,16 +1140,6 @@ async Task IDurableOrchestrationClient.RestartAsync(string instanceId, b // GetOrchestrationInstanceStateAsync will throw ArgumentException if the provided instanceid is not found. OrchestrationState state = await this.GetOrchestrationInstanceStateAsync(instanceId); - bool isInstanceNotCompleted = state.OrchestrationStatus == OrchestrationStatus.Running || - state.OrchestrationStatus == OrchestrationStatus.Pending || - state.OrchestrationStatus == OrchestrationStatus.Suspended; - - if (isInstanceNotCompleted && !restartWithNewInstanceId) - { - throw new InvalidOperationException($"Instance '{instanceId}' cannot be restarted while it is in state '{state.OrchestrationStatus}'. " + - "Wait until it has completed, or restart with a new instance ID."); - } - JToken input = ParseToJToken(state.Input); string newInstanceId = null; diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs index ba501daf7..5d27d59d8 100644 --- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs +++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs @@ -1,12 +1,14 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See LICENSE in the project root for license information. using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using DurableTask.Core; using DurableTask.Core.Entities; +using DurableTask.Core.Exceptions; using DurableTask.Core.History; using DurableTask.Core.Query; using Microsoft.Azure.WebJobs.Host.Scale; @@ -38,6 +40,7 @@ public class DurabilityProvider : private readonly IOrchestrationServiceClient innerServiceClient; private readonly IEntityOrchestrationService entityOrchestrationService; private readonly string connectionName; + private readonly int orchestrationCreationRequestTimeoutInSeconds = 180; /// /// Creates the default . @@ -406,10 +409,63 @@ public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage) return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage); } - /// - public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses) - { - return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses); + /// + /// Creates a new task orchestration instance using the specified creation message and dedupe statuses. + /// + /// The creation message for the orchestration. + /// An array of orchestration statuses used for "dedupping": + /// If an orchestration with the same instance ID already exists, and its status is in this array, then a + /// will be thrown. + /// If the array contains all of the running statuses (, , + /// and ), then only terminal statuses can be reused. + /// If at least one of these statuses is not included in the array, then if an instance with that status is found, it will first be terminated + /// before a new orchestration is created. If the existing instance does not reach a terminal state within 3 minutes, the operation will be cancelled. + /// + /// A task that completes when the creation message for the task orchestration instance is enqueued. + /// Thrown if an orchestration with the same instance ID already exists and its status + /// is in . + /// Thrown if an existing running instance does not reach a terminal state within 3 minutes. + /// + /// Thrown if contains but also allows at least one running status + /// to be reusable. In this case, an existing orchestration with that running status would be terminated, but the creation of the new orchestration + /// would immediately fail due to the existing orchestration now having status . + /// + public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses) + { + using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(this.orchestrationCreationRequestTimeoutInSeconds)); + await this.CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses, timeoutCts.Token); + } + + /// + /// Creates a new task orchestration instance using the specified creation message and dedupe statuses. + /// + /// The creation message for the orchestration. + /// An array of orchestration statuses used for "dedupping": + /// If an orchestration with the same instance ID already exists, and its status is in this array, then a + /// will be thrown. + /// If the array contains all of the running statuses (, , + /// and ), then only terminal statuses can be reused. + /// If at least one of these statuses is not included in the array, then if an instance with that status is found, it will first be terminated + /// before a new orchestration is created. This method will wait for the instance to reach a terminal status for a maximum of one hour or + /// until the is invoked, whichever occurs first. + /// The cancellation token used to cancel waiting for an existing instance to terminate in the case that a + /// non-terminal instance is found whose runtime status is not included in . + /// A task that completes when the creation message for the task orchestration instance is enqueued. + /// Thrown if an orchestration with the same instance ID already exists and its status + /// is in . + /// Thrown if the operation is cancelled via . + /// + /// Thrown if contains but also allows at least one running status + /// to be reusable. In this case, an existing orchestration with that running status would be terminated, but the creation of the new orchestration + /// would immediately fail due to the existing orchestration now having status . + /// + public async virtual Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses, CancellationToken cancellationToken) + { + await this.TerminateTaskOrchestrationWithReusableRunningStatusAndWaitAsync( + creationMessage.OrchestrationInstance.InstanceId, + dedupeStatuses, + cancellationToken); + await this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses); } /// @@ -614,10 +670,90 @@ public virtual Task> StreamOrchestrationHistoryAs /// /// Attempts to modify the durability service's UseSeparateQueueForEntityWorkItems property. /// - /// The value to set + /// The value to set. public virtual void SetUseSeparateQueueForEntityWorkItems(bool newValue) { throw this.GetNotImplementedException(nameof(this.SetUseSeparateQueueForEntityWorkItems)); } + + /// + /// If an orchestration exists with a status that is not in and has a running status (one of + /// , , or ), + /// then this method terminates the specified orchestration instance and waits until: + /// - The orchestration's status changes to , + /// - or the orchestration is deleted, + /// - or the operation is cancelled via the . + /// + /// The instance ID of the orchestration. + /// The dedupe statuses of the orchestration. + /// The cancellation token. + /// A task that completes when any of the above conditions are reached. + /// Thrown if the operation is cancelled via the . + /// Thrown if an orchestration already exists with status in . + /// Thrown if contains but allows + /// at least one running status to be reusable. + private async Task TerminateTaskOrchestrationWithReusableRunningStatusAndWaitAsync( + string instanceId, + OrchestrationStatus[] dedupeStatuses, + CancellationToken cancellationToken) + { + var runningStatuses = new List() + { + OrchestrationStatus.Running, + OrchestrationStatus.Pending, + OrchestrationStatus.Suspended, + }; + + if (dedupeStatuses != null && runningStatuses.Any( + status => !dedupeStatuses.Contains(status)) && dedupeStatuses.Contains(OrchestrationStatus.Terminated)) + { + throw new ArgumentException( + "Invalid dedupe statuses: cannot include 'Terminated' while also allowing reuse of running instances, " + + "because the running instance would be terminated and then immediately conflict with the dedupe check."); + } + + bool IsRunning(OrchestrationStatus status) => + runningStatuses.Contains(status); + + // At least one running status is reusable, so determine if an orchestration already exists with this status and terminate it if so + if (dedupeStatuses == null || runningStatuses.Any(status => !dedupeStatuses.Contains(status))) + { + OrchestrationState orchestrationState = await this.GetOrchestrationStateAsync(instanceId, executionId: null); + + if (orchestrationState != null) + { + if (dedupeStatuses?.Contains(orchestrationState.OrchestrationStatus) == true) + { + throw new OrchestrationAlreadyExistsException($"An orchestration with instance ID '{instanceId}' and status " + + $"'{orchestrationState.OrchestrationStatus}' already exists"); + } + + if (IsRunning(orchestrationState.OrchestrationStatus)) + { + // Check for cancellation before attempting to terminate the orchestration + cancellationToken.ThrowIfCancellationRequested(); + + string dedupeStatusesDescription = dedupeStatuses == null + ? "null (all statuses reusable)" + : dedupeStatuses.Length == 0 + ? "[] (all statuses reusable)" + : $"[{string.Join(", ", dedupeStatuses)}]"; + + string terminationReason = $"A new instance creation request has been issued for instance {instanceId} which " + + $"currently has status {orchestrationState.OrchestrationStatus}. Since the dedupe statuses of the creation request, " + + $"{dedupeStatusesDescription}, do not contain the orchestration's status, the orchestration has been terminated " + + $"and a new instance with the same instance ID will be created."; + + await this.ForceTerminateTaskOrchestrationAsync(instanceId, terminationReason); + + await this.WaitForOrchestrationAsync( + instanceId, + orchestrationState.OrchestrationInstance.ExecutionId, + TimeSpan.FromHours(1), + cancellationToken); + } + } + } + } } -} +} \ No newline at end of file diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs index 42b2873a3..ab799a484 100644 --- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs +++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs @@ -1480,7 +1480,7 @@ Task IAsyncConverter WaitForCompletionOrCreateCheckStatusRes } } - public async Task HandleRequestAsync(HttpRequestMessage request) + public async Task HandleRequestAsync(HttpRequestMessage request, CancellationToken cancellationToken) { try { @@ -309,7 +310,7 @@ public async Task HandleRequestAsync(HttpRequestMessage req string instanceId = (string)routeValues[InstanceIdRouteParameter]; if (request.Method == HttpMethod.Post) { - return await this.HandleStartOrchestratorRequestAsync(request, functionName, instanceId); + return await this.HandleStartOrchestratorRequestAsync(request, functionName, instanceId, cancellationToken); } else { @@ -906,7 +907,8 @@ private async Task HandleResumeInstanceRequestAsync( private async Task HandleStartOrchestratorRequestAsync( HttpRequestMessage request, string functionName, - string instanceId) + string instanceId, + CancellationToken cancellationToken) { try { @@ -965,7 +967,8 @@ await durableClient.DurabilityProvider.CreateTaskOrchestrationAsync( Event = executionStartedEvent, OrchestrationInstance = instance, }, - this.config.Options.OverridableExistingInstanceStates.ToDedupeStatuses()); + this.config.Options.OverridableExistingInstanceStates.ToDedupeStatuses(), + cancellationToken); } else { @@ -994,6 +997,14 @@ await durableClient.DurabilityProvider.CreateTaskOrchestrationAsync( { return request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid JSON content", e); } + catch (OrchestrationAlreadyExistsException e) + { + return request.CreateErrorResponse(HttpStatusCode.Conflict, e.Message); + } + catch (ArgumentException e) + { + return request.CreateErrorResponse(HttpStatusCode.BadRequest, e.Message); + } } private static string GetHeaderValueFromHeaders(string header, HttpRequestHeaders headers) @@ -1068,6 +1079,10 @@ private async Task HandleRestartInstanceRequestAsync( { return request.CreateErrorResponse(HttpStatusCode.BadRequest, "InstanceId does not match a valid orchestration instance.", e); } + catch (OrchestrationAlreadyExistsException e) + { + return request.CreateErrorResponse(HttpStatusCode.Conflict, "A non-terminal instance with this instance ID already exists.", e); + } catch (JsonReaderException e) { return request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid JSON content", e); diff --git a/src/WebJobs.Extensions.DurableTask/LocalHttpListener.cs b/src/WebJobs.Extensions.DurableTask/LocalHttpListener.cs index 2fddd3c7b..cad4233bf 100644 --- a/src/WebJobs.Extensions.DurableTask/LocalHttpListener.cs +++ b/src/WebJobs.Extensions.DurableTask/LocalHttpListener.cs @@ -32,7 +32,7 @@ internal class LocalHttpListener : IDisposable private const int MinPort = 30000; private const int MaxPort = 31000; - private readonly Func> handler; + private readonly Func> handler; private readonly EndToEndTraceHelper traceHelper; private readonly DurableTaskOptions durableTaskOptions; private readonly Random portGenerator; @@ -47,7 +47,7 @@ internal class LocalHttpListener : IDisposable public LocalHttpListener( EndToEndTraceHelper traceHelper, DurableTaskOptions durableTaskOptions, - Func> handler) + Func> handler) { this.traceHelper = traceHelper ?? throw new ArgumentNullException(nameof(traceHelper)); this.handler = handler ?? throw new ArgumentNullException(nameof(handler)); @@ -182,7 +182,7 @@ private async Task HandleRequestAsync(HttpContext context) try { HttpRequestMessage request = GetRequest(context); - HttpResponseMessage response = await this.handler(request); + HttpResponseMessage response = await this.handler(request, context.RequestAborted); await SetResponseAsync(context, response); } catch (Exception e) diff --git a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs index 47a721030..aa2807c60 100644 --- a/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs +++ b/src/WebJobs.Extensions.DurableTask/TaskHubGrpcServer.cs @@ -57,7 +57,23 @@ public override Task Hello(Empty request, ServerCallContext context) public async override Task StartInstance(P.CreateInstanceRequest request, ServerCallContext context) { try - { + { + List allStatuses = System.Enum + .GetValues() + .ToList(); + + // Not all clients are necessarily configured to set the OrchestrationIdReusePolicy field of the request. + // If it is null, we assume that they do not support per-request-dedupe statuses, and default to using just + // the OverridableExistingInstanceStates setting instead. + List reusableStatuses = request.OrchestrationIdReusePolicy is null + ? allStatuses + : request.OrchestrationIdReusePolicy.ReplaceableStatus.Select(status => (OrchestrationStatus)status).ToList(); + + OrchestrationStatus[] dedupeStatuses = allStatuses + .Except(reusableStatuses) + .Union(this.extension.Options.OverridableExistingInstanceStates.ToDedupeStatuses()) + .ToArray(); + // Create the orchestration instance var instance = new OrchestrationInstance { @@ -93,7 +109,8 @@ await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( Event = executionStartedEvent, OrchestrationInstance = instance, }, - this.GetStatusesNotToOverride()); + dedupeStatuses, + context.CancellationToken); return new P.CreateInstanceResponse { @@ -108,6 +125,10 @@ await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( { throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists.")); } + catch (ArgumentException ex) + { + throw new RpcException(new Status(StatusCode.InvalidArgument, $"Invalid argument for start instance request for instance ID {request.InstanceId}: {ex.Message}")); + } catch (Exception ex) { this.extension.TraceHelper.ExtensionWarningEvent( @@ -119,12 +140,6 @@ await this.GetDurabilityProvider(context).CreateTaskOrchestrationAsync( } } - private OrchestrationStatus[] GetStatusesNotToOverride() - { - OverridableStates overridableStates = this.extension.Options.OverridableExistingInstanceStates; - return overridableStates.ToDedupeStatuses(); - } - public async override Task RaiseEvent(P.RaiseEventRequest request, ServerCallContext context) { bool throwStatusExceptionsOnRaiseEvent = this.extension.Options.ThrowStatusExceptionsOnRaiseEvent ?? this.extension.DefaultDurabilityProvider.CheckStatusBeforeRaiseEvent; @@ -523,9 +538,9 @@ private OrchestrationStatus[] GetStatusesNotToOverride() // Thrown when th instanceId is not found. throw new RpcException(new Status(StatusCode.NotFound, $"ArgumentException: {ex.Message}")); } - catch (InvalidOperationException ex) + catch (OrchestrationAlreadyExistsException ex) { - throw new RpcException(new Status(StatusCode.FailedPrecondition, $"InvalidOperationException: {ex.Message}")); + throw new RpcException(new Status(StatusCode.FailedPrecondition, $"Non-terminal instance with this instance ID already exists: {ex.Message}")); } catch (Exception ex) { diff --git a/test/Common/DurableTaskEndToEndTests.cs b/test/Common/DurableTaskEndToEndTests.cs index 25e88188c..5158ae836 100644 --- a/test/Common/DurableTaskEndToEndTests.cs +++ b/test/Common/DurableTaskEndToEndTests.cs @@ -15,6 +15,7 @@ using System.Threading.Tasks; using Azure.Storage.Blobs; using DurableTask.Core; +using DurableTask.Core.Exceptions; using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations; using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options; using Microsoft.Azure.WebJobs.Host; @@ -4745,6 +4746,38 @@ await Assert.ThrowsAsync(async () => } } + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [MemberData(nameof(GetBooleanAndFullFeaturedStorageProviderOptionsWithOverridableStatesAndSuspend))] + public async Task OverridableStates_RunningStatusesCorrectlyDeduped_ForRestart( + bool extendedSessions, + string storageProvider, + bool anyStateOverridable, + bool suspend) + { + await this.OverridableStates_RunningStatusesCorrectlyDeduped( + extendedSessions, + storageProvider, + anyStateOverridable, + suspend, + restart: true); + } + + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [MemberData(nameof(GetBooleanAndFullFeaturedStorageProviderOptionsWithOverridableStates))] + public async Task OverridableStates_TerminalStatusesAlwaysReusable_ForRestart( + bool extendedSessions, + string storageProvider, + bool anyStateOverridable) + { + await this.OverridableStates_TerminalStatusesAlwaysReusable( + extendedSessions, + storageProvider, + anyStateOverridable, + restart: true); + } + [Theory] [Trait("Category", PlatformSpecificHelpers.TestCategory)] [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))] @@ -5278,48 +5311,55 @@ await Assert.ThrowsAsync(async () => } } - [Theory] - [Trait("Category", PlatformSpecificHelpers.TestCategory)] - [MemberData(nameof(TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions), MemberType = typeof(TestDataGenerator))] - public async Task DedupeStates_AnyState(bool extendedSessions, string storageProvider) + // This method returns an array of [bool extendedSessions, string storageProvider, bool anyStateOverridable, bool suspend] + // It combines the GetBooleanAndFullFeaturedStorageProviderOptionsWithOverridableStates with both true and false for suspend. + public static IEnumerable GetBooleanAndFullFeaturedStorageProviderOptionsWithOverridableStatesAndSuspend() { - DurableTaskOptions options = new DurableTaskOptions(); - options.OverridableExistingInstanceStates = OverridableStates.AnyState; - - var instanceId = "OverridableStatesAnyStateTest_" + Guid.NewGuid().ToString("N"); - - using (ITestHost host = TestHelpers.GetJobHost( - this.loggerProvider, - nameof(this.DedupeStates_AnyState), - extendedSessions, - storageProviderType: storageProvider, - options: options)) + foreach (object[] data in GetBooleanAndFullFeaturedStorageProviderOptionsWithOverridableStates()) { - await host.StartAsync(); - - int initialValue = 0; - - var client = await host.StartOrchestratorAsync(nameof(TestOrchestrations.Counter), initialValue, this.output, instanceId: instanceId); - - // Wait for the instance to go into the Running state. This is necessary to ensure log validation consistency. - await client.WaitForStartupAsync(this.output); - - TimeSpan waitTimeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 300 : 10); - - // Perform some operations - await client.RaiseEventAsync("operation", "incr", this.output); - await client.WaitForCustomStatusAsync(waitTimeout, this.output, 1); + yield return new object[] { data[0], data[1], data[2], true }; + yield return new object[] { data[0], data[1], data[2], false }; + } + } - // Make sure it's still running and didn't complete early (or fail). - var status = await client.GetStatusAsync(); - Assert.True( - status?.RuntimeStatus == OrchestrationRuntimeStatus.Running || - status?.RuntimeStatus == OrchestrationRuntimeStatus.ContinuedAsNew); + // This method returns an array of [bool extendedSessions, string storageProvider, bool anyStateOverridable] + // It combines the TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions with both true and false for anyStateOverridable. + public static IEnumerable GetBooleanAndFullFeaturedStorageProviderOptionsWithOverridableStates() + { + foreach (object[] data in TestDataGenerator.GetBooleanAndFullFeaturedStorageProviderOptions()) + { + yield return new object[] { data[0], data[1], true }; + yield return new object[] { data[0], data[1], false }; + } + } - await host.StartOrchestratorAsync(nameof(TestOrchestrations.Counter), initialValue, this.output, instanceId: instanceId); + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [MemberData(nameof(GetBooleanAndFullFeaturedStorageProviderOptionsWithOverridableStatesAndSuspend))] + public async Task OverridableStates_RunningStatusesCorrectlyDeduped_ForStartNew( + bool extendedSessions, + string storageProvider, + bool anyStateOverridable, + bool suspend) + { + await this.OverridableStates_RunningStatusesCorrectlyDeduped( + extendedSessions, + storageProvider, + anyStateOverridable, + suspend, + restart: false); + } - await host.StopAsync(); - } + [Theory] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + [MemberData(nameof(GetBooleanAndFullFeaturedStorageProviderOptionsWithOverridableStates))] + public async Task OverridableStates_TerminalStatusesAlwaysReusable_ForStartNew(bool extendedSessions, string storageProvider, bool anyStateOverridable) + { + await this.OverridableStates_TerminalStatusesAlwaysReusable( + extendedSessions, + storageProvider, + anyStateOverridable, + restart: false); } [Fact] @@ -6001,6 +6041,180 @@ private static void ValidateHttpManagementPayload(HttpManagementPayload httpMana httpManagementPayload.RestartPostUri); } + private async Task OverridableStates_RunningStatusesCorrectlyDeduped( + bool extendedSessions, + string storageProvider, + bool anyStateOverridable, + bool suspend, + bool restart) + { + DurableTaskOptions options = new () + { + OverridableExistingInstanceStates = anyStateOverridable ? OverridableStates.AnyState : OverridableStates.NonRunningStates, + }; + + string instanceId = Guid.NewGuid().ToString("N"); + + using ITestHost host = TestHelpers.GetJobHost( + this.loggerProvider, + restart ? nameof(this.OverridableStates_RunningStatusesCorrectlyDeduped_ForRestart) + : nameof(this.OverridableStates_RunningStatusesCorrectlyDeduped_ForStartNew), + extendedSessions, + storageProviderType: storageProvider, + options: options); + + await host.StartAsync(); + + int initialValue = 0; + + TestDurableClient client = await host.StartOrchestratorAsync( + nameof(TestOrchestrations.Counter), + initialValue, + this.output, + instanceId: instanceId); + + // Wait for the instance to go into the Running state. This is necessary to ensure log validation consistency. + await client.WaitForStartupAsync(this.output); + + var waitTimeout = TimeSpan.FromSeconds(Debugger.IsAttached ? 300 : 10); + + // Perform some operations + await client.RaiseEventAsync("operation", "incr", this.output); + await client.WaitForCustomStatusAsync(waitTimeout, this.output, 1); + + // Make sure it's still running and didn't complete early (or fail). + DurableOrchestrationStatus status = await client.GetStatusAsync(); + Assert.Equal(OrchestrationRuntimeStatus.Running, status?.RuntimeStatus); + + if (suspend) + { + await client.SuspendAsync("suspend for test"); + DurableOrchestrationStatus suspendedStatus = await client.WaitForStatusChange(this.output, OrchestrationRuntimeStatus.Suspended); + Assert.Equal(OrchestrationRuntimeStatus.Suspended, suspendedStatus?.RuntimeStatus); + } + + Exception exception = null; + try + { + if (restart) + { + await client.InnerClient.RestartAsync(instanceId, restartWithNewInstanceId: false); + } + else + { + await host.StartOrchestratorAsync(nameof(TestOrchestrations.Counter), initialValue, this.output, instanceId: instanceId); + } + } + catch (Exception caughtException) + { + exception = caughtException; + } + + await host.StopAsync(); + + // If any state is reusable, confirm that there is evidence the existing orchestration was terminated before the new one was created + if (anyStateOverridable && this.useTestLogger) + { + IReadOnlyCollection durableTaskCoreLogs = + this.loggerProvider.CreatedLoggers.Single(l => l.Category == "DurableTask.Core").LogMessages; + Assert.Contains(durableTaskCoreLogs, log => log.ToString().StartsWith($"{instanceId}: Orchestration completed with a 'Terminated' status")); + } + + // Otherwise confirm that an exception was thrown when trying to create a new orchestration when one with a nonterminal status already exists + else if (!anyStateOverridable) + { + Assert.NotNull(exception); + if (restart) + { + Assert.IsType(exception); + } + else + { + Assert.IsType(exception); + var functionInvocationException = (FunctionInvocationException)exception; + Assert.NotNull(functionInvocationException.InnerException); + Assert.IsType(functionInvocationException.InnerException); + } + } + } + + private async Task OverridableStates_TerminalStatusesAlwaysReusable( + bool extendedSessions, + string storageProvider, + bool anyStateOverridable, + bool restart) + { + DurableTaskOptions options = new () + { + OverridableExistingInstanceStates = anyStateOverridable ? OverridableStates.AnyState : OverridableStates.NonRunningStates, + }; + + string instanceIdBase = Guid.NewGuid().ToString("N"); + + using ITestHost host = TestHelpers.GetJobHost( + this.loggerProvider, + restart ? nameof(this.OverridableStates_TerminalStatusesAlwaysReusable_ForRestart) + : nameof(this.OverridableStates_TerminalStatusesAlwaysReusable_ForStartNew), + extendedSessions, + storageProviderType: storageProvider, + options: options); + await host.StartAsync(); + + int initialValue = 0; + + // Test for all terminal statuses: Completed, Failed, Terminated + foreach (OrchestrationRuntimeStatus terminalStatus in new[] + { + OrchestrationRuntimeStatus.Completed, + OrchestrationRuntimeStatus.Failed, + OrchestrationRuntimeStatus.Terminated, + }) + { + string instanceId = instanceIdBase + "_" + terminalStatus; + + TestDurableClient client; + client = await host.StartOrchestratorAsync( + terminalStatus == OrchestrationRuntimeStatus.Failed + ? nameof(TestOrchestrations.ThrowOrchestrator) : nameof(TestOrchestrations.Counter), + terminalStatus == OrchestrationRuntimeStatus.Failed ? string.Empty : initialValue, + this.output, + instanceId: instanceId); + + await client.WaitForStartupAsync(this.output); + DurableOrchestrationStatus status = null; + + if (terminalStatus == OrchestrationRuntimeStatus.Completed) + { + await client.RaiseEventAsync("operation", "end", this.output); + } + else if (terminalStatus == OrchestrationRuntimeStatus.Terminated) + { + await client.TerminateAsync("test terminate"); + } + + status = await client.WaitForCompletionAsync(this.output); + Assert.NotNull(status); + Assert.Equal(terminalStatus, status.RuntimeStatus); + + // Should always be able to start a new orchestration with the same instanceId + if (restart) + { + await client.InnerClient.RestartAsync(instanceId, restartWithNewInstanceId: false); + } + else + { + await host.StartOrchestratorAsync( + terminalStatus == OrchestrationRuntimeStatus.Failed + ? nameof(TestOrchestrations.ThrowOrchestrator) : nameof(TestOrchestrations.Counter), + terminalStatus == OrchestrationRuntimeStatus.Failed ? string.Empty : initialValue, + this.output, + instanceId: instanceId); + } + } + + await host.StopAsync(); + } + [DataContract] internal class ComplexType { diff --git a/test/Common/HttpApiHandlerTests.cs b/test/Common/HttpApiHandlerTests.cs index 70128ca02..586f297ec 100644 --- a/test/Common/HttpApiHandlerTests.cs +++ b/test/Common/HttpApiHandlerTests.cs @@ -11,6 +11,7 @@ using System.Threading; using System.Threading.Tasks; using DurableTask.Core; +using DurableTask.Core.Exceptions; using DurableTask.Core.History; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -490,7 +491,8 @@ public async Task HandleGetStatusRequestAsync_Failed_Orchestration_Config_Respon { Method = HttpMethod.Get, RequestUri = getStatusRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); Assert.Equal(statusCode, responseMessage.StatusCode); } @@ -532,7 +534,8 @@ public async Task GetAllStatus_is_Success() { Method = HttpMethod.Get, RequestUri = getStatusRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); Assert.Equal(HttpStatusCode.OK, responseMessage.StatusCode); Assert.Equal(string.Empty, responseMessage.Headers.GetValues("x-ms-continuation-token").FirstOrDefault()); var actual = JsonConvert.DeserializeObject>(await responseMessage.Content.ReadAsStringAsync()); @@ -596,7 +599,8 @@ public async Task GetQueryStatus_is_Success() { Method = HttpMethod.Get, RequestUri = getStatusRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); Assert.Equal(HttpStatusCode.OK, responseMessage.StatusCode); var actual = JsonConvert.DeserializeObject>(await responseMessage.Content.ReadAsStringAsync()); clientMock.Verify(x => x.ListInstancesAsync(It.IsAny(), It.IsAny())); @@ -672,7 +676,7 @@ public async Task GetQueryStatusWithPaging_is_Success() }; requestMessage.Headers.Add("x-ms-continuation-token", "XXXX-XXXXXXXX-XXXXXXXXXXXX"); - var responseMessage = await httpApiHandler.HandleRequestAsync(requestMessage); + var responseMessage = await httpApiHandler.HandleRequestAsync(requestMessage, CancellationToken.None); Assert.Equal(HttpStatusCode.OK, responseMessage.StatusCode); Assert.Equal("YYYY-YYYYYYYY-YYYYYYYYYYYY", responseMessage.Headers.GetValues("x-ms-continuation-token").FirstOrDefault()); var actual = JsonConvert.DeserializeObject>(await responseMessage.Content.ReadAsStringAsync()); @@ -738,7 +742,8 @@ public async Task GetQueryMultipleRuntimeStatus_is_Success() { Method = HttpMethod.Get, RequestUri = getStatusRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); Assert.Equal(HttpStatusCode.OK, responseMessage.StatusCode); var actual = JsonConvert.DeserializeObject>(await responseMessage.Content.ReadAsStringAsync()); clientMock.Verify(x => x.ListInstancesAsync(It.IsAny(), It.IsAny())); @@ -796,7 +801,8 @@ public async Task GetQueryWithoutRuntimeStatus_is_Success() { Method = HttpMethod.Get, RequestUri = getStatusRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); Assert.Equal(HttpStatusCode.OK, responseMessage.StatusCode); var actual = JsonConvert.DeserializeObject>(await responseMessage.Content.ReadAsStringAsync()); clientMock.Verify(x => x.ListInstancesAsync(It.IsAny(), It.IsAny())); @@ -837,7 +843,8 @@ public async Task HandleGetStatusRequestAsync_Correctly_Parses_InstanceId_With_S { Method = HttpMethod.Get, RequestUri = getStatusRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); var actual = JsonConvert.DeserializeObject(await responseMessage.Content.ReadAsStringAsync()); Assert.Equal(HttpStatusCode.OK, responseMessage.StatusCode); @@ -883,7 +890,8 @@ await httpApiHandler.HandleRequestAsync( { Method = HttpMethod.Post, RequestUri = terminateRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); Assert.Equal(testInstanceId, actualInstanceId); Assert.Equal(testReason, actualReason); @@ -928,7 +936,8 @@ await httpApiHandler.HandleRequestAsync( { Method = HttpMethod.Post, RequestUri = suspendRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); Assert.Equal(testInstanceId, actualInstanceId); Assert.Equal(testReason, actualReason); @@ -973,7 +982,8 @@ await httpApiHandler.HandleRequestAsync( { Method = HttpMethod.Post, RequestUri = resumeRequestUriBuilder.Uri, - }); + }, + CancellationToken.None); Assert.Equal(testInstanceId, actualInstanceId); Assert.Equal(testReason, actualReason); @@ -1029,7 +1039,7 @@ public async Task RestartInstance_Is_Success(bool restartWithNewInstanceId) .Returns(testResponse); var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); Assert.Equal(HttpStatusCode.Accepted, actualResponse.StatusCode); var content = await actualResponse.Content.ReadAsStringAsync(); @@ -1094,7 +1104,7 @@ public async Task RestartInstanceAndWaitToComplete_Is_Success(bool restartWithNe .Returns(Task.FromResult(testResponse)); var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); Assert.Equal(HttpStatusCode.Accepted, actualResponse.StatusCode); var content = await actualResponse.Content.ReadAsStringAsync(); @@ -1130,7 +1140,7 @@ public async Task RestartInstance_Returns_HTTP_400_On_Invalid_InstanceId() .Throws(new ArgumentException()); var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); Assert.Equal(HttpStatusCode.BadRequest, actualResponse.StatusCode); var content = await actualResponse.Content.ReadAsStringAsync(); @@ -1138,6 +1148,35 @@ public async Task RestartInstance_Returns_HTTP_400_On_Invalid_InstanceId() Assert.Equal("InstanceId does not match a valid orchestration instance.", error["Message"].ToString()); } + [Fact] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + public async Task RestartInstance_Returns_HTTP_409_On_Invalid_Existing_Instance() + { + string testBadInstanceId = Guid.NewGuid().ToString("N"); + + var startRequestUriBuilder = new UriBuilder(TestConstants.NotificationUrl); + startRequestUriBuilder.Path += $"/Instances/{testBadInstanceId}/restart"; + + var testRequest = new HttpRequestMessage + { + Method = HttpMethod.Post, + RequestUri = startRequestUriBuilder.Uri, + }; + + var clientMock = new Mock(); + clientMock + .Setup(x => x.RestartAsync(It.IsAny(), It.IsAny())) + .Throws(new OrchestrationAlreadyExistsException()); + + var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); + + Assert.Equal(HttpStatusCode.Conflict, actualResponse.StatusCode); + var content = await actualResponse.Content.ReadAsStringAsync(); + var error = JsonConvert.DeserializeObject(content); + Assert.Equal("A non-terminal instance with this instance ID already exists.", error["Message"].ToString()); + } + [Theory] [InlineData(null, false)] [InlineData(null, true)] @@ -1192,7 +1231,7 @@ public async Task StartNewInstance_Is_Success(string instanceId, bool hasContent .Returns(testResponse); var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); Assert.Equal(HttpStatusCode.Accepted, actualResponse.StatusCode); var content = await actualResponse.Content.ReadAsStringAsync(); @@ -1262,7 +1301,7 @@ public async Task StartNewInstanceAndWaitToComplete_Is_Success(string instanceId .Returns(Task.FromResult(testResponse)); var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); Assert.Equal(HttpStatusCode.Accepted, actualResponse.StatusCode); var content = await actualResponse.Content.ReadAsStringAsync(); @@ -1304,7 +1343,7 @@ public async Task StartNewInstance_Returns_HTTP_400_On_Bad_JSON() .Throws(new JsonReaderException()); var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); Assert.Equal(HttpStatusCode.BadRequest, actualResponse.StatusCode); var content = await actualResponse.Content.ReadAsStringAsync(); @@ -1336,13 +1375,45 @@ public async Task StartNewInstance_Returns_HTTP_400_On_Missing_Function() .Throws(new ArgumentException(exceptionMessage)); var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); Assert.Equal(HttpStatusCode.BadRequest, actualResponse.StatusCode); var content = await actualResponse.Content.ReadAsStringAsync(); var error = JsonConvert.DeserializeObject(content); - Assert.Equal("One or more of the arguments submitted is incorrect", error["Message"].ToString()); - Assert.Equal(exceptionMessage, error["ExceptionMessage"].ToString()); + Assert.Equal(exceptionMessage, error["Message"].ToString()); + } + + [Fact] + [Trait("Category", PlatformSpecificHelpers.TestCategory)] + public async Task StartNewInstance_Returns_HTTP_409_On_Existing_Orchestration() + { + string testInstanceId = Guid.NewGuid().ToString("N"); + string testFunctionName = "TestOrchestrator"; + + var startRequestUriBuilder = new UriBuilder(TestConstants.NotificationUrl); + startRequestUriBuilder.Path += $"/Orchestrators/{testFunctionName}"; + + var testRequest = new HttpRequestMessage + { + Method = HttpMethod.Post, + RequestUri = startRequestUriBuilder.Uri, + Content = new StringContent("\"TestContent\"", Encoding.UTF8, "application/json"), + }; + + string exceptionMessage = $"An orchestration with instance ID '{testInstanceId}' and status " + + $"'{OrchestrationStatus.Running}' already exists"; + var clientMock = new Mock(); + clientMock + .Setup(x => x.StartNewAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Throws(new OrchestrationAlreadyExistsException(exceptionMessage)); + + var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); + + Assert.Equal(HttpStatusCode.Conflict, actualResponse.StatusCode); + var content = await actualResponse.Content.ReadAsStringAsync(); + var error = JsonConvert.DeserializeObject(content); + Assert.Equal(exceptionMessage, error["Message"].ToString()); } [Theory] @@ -1379,7 +1450,7 @@ public async Task GetEntity_Returns_State_Or_HTTP_404(bool hasKey, bool exists) .Returns(Task.FromResult(result)); var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); if (exists) { @@ -1483,7 +1554,7 @@ public async Task Entities_Query_Calls_ListEntitiesAsync(bool useNameFilter, boo // Test HttpApiHandler response var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - HttpResponseMessage responseMessage = await httpApiHandler.HandleRequestAsync(requestMessage); + HttpResponseMessage responseMessage = await httpApiHandler.HandleRequestAsync(requestMessage, CancellationToken.None); Assert.Equal(HttpStatusCode.OK, responseMessage.StatusCode); clientMock.Verify(x => x.ListEntitiesAsync(It.IsAny(), It.IsAny())); @@ -1588,7 +1659,7 @@ public async Task SignalEntity_Is_Success(bool hasKey, bool hasOp, bool hasConte } var httpApiHandler = new ExtendedHttpApiHandler(clientMock.Object); - var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest); + var actualResponse = await httpApiHandler.HandleRequestAsync(testRequest, CancellationToken.None); Assert.Equal(HttpStatusCode.Accepted, actualResponse.StatusCode); } @@ -1723,7 +1794,7 @@ public async Task StartNewInstance_Calls_CreateTaskOrchestrationAsync_With_Corre var handler = new HttpApiHandler(customExtension, NullLogger.Instance); var request = new HttpRequestMessage(HttpMethod.Post, requestUri); - var response = await handler.HandleRequestAsync(request); + var response = await handler.HandleRequestAsync(request, CancellationToken.None); // Verify mock interactions orchestrationServiceClientMock.Verify( diff --git a/test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs b/test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs index 70b490646..f04af2ab8 100644 --- a/test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs +++ b/test/e2e/Apps/BasicDotNetIsolated/HelloCities.cs @@ -1,6 +1,8 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Net; +using DurableTask.Core.Exceptions; using Microsoft.Azure.Functions.Worker; using Microsoft.Azure.Functions.Worker.Http; using Microsoft.DurableTask; @@ -41,12 +43,13 @@ public static async Task StartOrchestration( [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, [DurableClient] DurableTaskClient client, FunctionContext executionContext, - string orchestrationName) + string orchestrationName, + string? instanceId) { ILogger logger = executionContext.GetLogger(nameof(StartOrchestration)); - // Function input comes from the request content. - string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestrationName); + // Function input comes from the request content. + instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, new StartOrchestrationOptions(InstanceId: instanceId)); logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId); @@ -60,18 +63,77 @@ public static async Task HttpStartScheduled( [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, [DurableClient] DurableTaskClient client, FunctionContext executionContext, - DateTime scheduledStartTime) + DateTime scheduledStartTime, + string? instanceId) { ILogger logger = executionContext.GetLogger("HelloCities_HttpStart"); - var startOptions = new StartOrchestrationOptions(StartAt: scheduledStartTime); + var startOptions = new StartOrchestrationOptions(StartAt: scheduledStartTime, InstanceId: instanceId); // Function input comes from the request content. - string instanceId = await client.ScheduleNewOrchestrationInstanceAsync( + instanceId = await client.ScheduleNewOrchestrationInstanceAsync( nameof(HelloCities), startOptions); logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId); + // Returns an HTTP 202 response with an instance management payload. + // See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration + return await client.CreateCheckStatusResponseAsync(req, instanceId); + } + + [Function(nameof(StartOrchestration_DedupeStatuses))] + public static async Task StartOrchestration_DedupeStatuses( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req, + [DurableClient] DurableTaskClient client, + FunctionContext executionContext, + string orchestrationName, + string instanceId, + string[] dedupeStatuses, + DateTime? scheduledStartTime) + { + ILogger logger = executionContext.GetLogger(nameof(StartOrchestration_DedupeStatuses)); + + StartOrchestrationOptions startOptions = new(InstanceId: instanceId); + + var parsedStatuses = new OrchestrationRuntimeStatus[dedupeStatuses.Length]; + for (int i = 0; i < dedupeStatuses.Length; i++) + { + string statusStr = dedupeStatuses[i]; + if (!Enum.TryParse(statusStr, ignoreCase: true, out var status)) + { + throw new ArgumentException($"Invalid OrchestrationRuntimeStatus value: '{statusStr}'", nameof(dedupeStatuses)); + } + parsedStatuses[i] = status; + } + startOptions = startOptions.WithDedupeStatuses(parsedStatuses); + + if (scheduledStartTime is not null) + { + startOptions = startOptions with { StartAt = scheduledStartTime }; + } + + // Function input comes from the request content. + try + { + await client.ScheduleNewOrchestrationInstanceAsync(orchestrationName, startOptions); + } + catch (OrchestrationAlreadyExistsException ex) + { + // Tests expect Conflict (409) for orchestration dedupe scenarios. + HttpResponseData response = req.CreateResponse(HttpStatusCode.Conflict); + await response.WriteStringAsync(ex.Message); + return response; + } + catch (ArgumentException ex) + { + // Tests expect BadRequest for invalid dedupe statuses + HttpResponseData response = req.CreateResponse(HttpStatusCode.BadRequest); + await response.WriteStringAsync(ex.Message); + return response; + } + + logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId); + // Returns an HTTP 202 response with an instance management payload. // See https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-http-api#start-orchestration return await client.CreateCheckStatusResponseAsync(req, instanceId); diff --git a/test/e2e/Apps/BasicDotNetIsolated/LargeOutputOrchestrator.cs b/test/e2e/Apps/BasicDotNetIsolated/LargeOutputOrchestrator.cs index d8c927948..a4ec889f7 100644 --- a/test/e2e/Apps/BasicDotNetIsolated/LargeOutputOrchestrator.cs +++ b/test/e2e/Apps/BasicDotNetIsolated/LargeOutputOrchestrator.cs @@ -19,6 +19,10 @@ public static async Task> RunOrchestrator( { ILogger logger = context.CreateReplaySafeLogger(nameof(LargeOutputOrchestrator)); int sizeInKB = context.GetInput(); + if (sizeInKB <= 0) + { + throw new ArgumentOutOfRangeException(nameof(sizeInKB)); + } logger.LogInformation("Saying hello."); var outputs = new List(); diff --git a/test/e2e/Apps/BasicDotNetIsolated/host.json b/test/e2e/Apps/BasicDotNetIsolated/host.json index 036495c64..58c3e7e5b 100644 --- a/test/e2e/Apps/BasicDotNetIsolated/host.json +++ b/test/e2e/Apps/BasicDotNetIsolated/host.json @@ -11,7 +11,8 @@ "versionMatchStrategy": "CurrentOrOlder", "versionFailureStrategy": "Fail", "extendedSessionsEnabled": true, - "extendedSessionIdleTimeoutInSeconds": 30 + "extendedSessionIdleTimeoutInSeconds": 30, + "overridableExistingInstanceStates": "AnyState" } } } \ No newline at end of file diff --git a/test/e2e/Apps/BasicJava/host.json b/test/e2e/Apps/BasicJava/host.json index de73a2331..23bbaaa21 100644 --- a/test/e2e/Apps/BasicJava/host.json +++ b/test/e2e/Apps/BasicJava/host.json @@ -9,7 +9,8 @@ }, "defaultVersion": "2.0", "versionMatchStrategy": "CurrentOrOlder", - "versionFailureStrategy": "Fail" + "versionFailureStrategy": "Fail", + "overridableExistingInstanceStates": "AnyState" } } } diff --git a/test/e2e/Apps/BasicJava/src/main/java/com/function/HelloCities.java b/test/e2e/Apps/BasicJava/src/main/java/com/function/HelloCities.java index cf46da1ef..9c1bfc5eb 100644 --- a/test/e2e/Apps/BasicJava/src/main/java/com/function/HelloCities.java +++ b/test/e2e/Apps/BasicJava/src/main/java/com/function/HelloCities.java @@ -52,7 +52,9 @@ public HttpResponseMessage startOrchestration( final ExecutionContext context) { DurableTaskClient client = durableContext.getClient(); String orchestrationName = request.getQueryParameters().get("orchestrationName"); - String instanceId = client.scheduleNewOrchestrationInstance(orchestrationName); + NewOrchestrationInstanceOptions startOptions = new NewOrchestrationInstanceOptions(); + startOptions.setInstanceId(request.getQueryParameters().get("instanceId")); + String instanceId = client.scheduleNewOrchestrationInstance(orchestrationName, startOptions); context.getLogger().info("Started orchestration with ID = '" + instanceId + "'."); return durableContext.createCheckStatusResponse(request, instanceId); } @@ -75,6 +77,7 @@ public HttpResponseMessage httpStartScheduled( } NewOrchestrationInstanceOptions startOptions = new NewOrchestrationInstanceOptions(); startOptions.setStartTime(scheduledStartTime); + startOptions.setInstanceId(request.getQueryParameters().get("instanceId")); String instanceId = client.scheduleNewOrchestrationInstance("HelloCities", startOptions); context.getLogger().info("Started orchestration with ID = '" + instanceId + "'."); return durableContext.createCheckStatusResponse(request, instanceId); diff --git a/test/e2e/Apps/BasicJava/src/main/java/com/function/LargeOutputOrchestrator.java b/test/e2e/Apps/BasicJava/src/main/java/com/function/LargeOutputOrchestrator.java index ec81fc47e..1931cc256 100644 --- a/test/e2e/Apps/BasicJava/src/main/java/com/function/LargeOutputOrchestrator.java +++ b/test/e2e/Apps/BasicJava/src/main/java/com/function/LargeOutputOrchestrator.java @@ -24,6 +24,10 @@ public List runOrchestrator( final ExecutionContext context) { int sizeInKB = ctx.getInput(Integer.class); + if (sizeInKB <= 0) { + throw new IllegalArgumentException("sizeInKB must be a positive integer."); + } + context.getLogger().info("Saying hello."); List outputs = new ArrayList<>(); diff --git a/test/e2e/Apps/BasicNode/host.json b/test/e2e/Apps/BasicNode/host.json index eb83303bc..0a6ad5997 100644 --- a/test/e2e/Apps/BasicNode/host.json +++ b/test/e2e/Apps/BasicNode/host.json @@ -9,7 +9,8 @@ }, "defaultVersion": "2.0", "versionMatchStrategy": "CurrentOrOlder", - "versionFailureStrategy": "Fail" + "versionFailureStrategy": "Fail", + "overridableExistingInstanceStates": "AnyState" } } } \ No newline at end of file diff --git a/test/e2e/Apps/BasicNode/src/functions/HelloCities.ts b/test/e2e/Apps/BasicNode/src/functions/HelloCities.ts index 5fc2b4aad..1199d4d54 100644 --- a/test/e2e/Apps/BasicNode/src/functions/HelloCities.ts +++ b/test/e2e/Apps/BasicNode/src/functions/HelloCities.ts @@ -32,6 +32,7 @@ df.app.activity(activityName, { handler: HelloCitiesActivity }); const HelloCitiesHttpStartScheduled: HttpHandler = async (request: HttpRequest, context: InvocationContext): Promise => { const client = df.getClient(context); const body: unknown = await request.text(); + const instanceId: string = await client.startNew("HelloCities", { input: request.params.ScheduledStartTime }); context.log(`Started orchestration with ID = '${instanceId}'.`); @@ -48,7 +49,8 @@ app.http('HelloCities_HttpStart_Scheduled', { const StartOrchestration: HttpHandler = async (request: HttpRequest, context: InvocationContext): Promise => { const client = df.getClient(context); - const instanceId: string = await client.startNew(request.params.orchestrationName); + + const instanceId = await client.startNew(request.params.orchestrationName, { instanceId: request.params.instanceId }); context.log(`Started orchestration with ID = '${instanceId}'.`); diff --git a/test/e2e/Apps/BasicNode/src/functions/LargeOutputOrchestrator.ts b/test/e2e/Apps/BasicNode/src/functions/LargeOutputOrchestrator.ts index bec01eba9..4b6fd4a72 100644 --- a/test/e2e/Apps/BasicNode/src/functions/LargeOutputOrchestrator.ts +++ b/test/e2e/Apps/BasicNode/src/functions/LargeOutputOrchestrator.ts @@ -13,6 +13,9 @@ function generateLargeString(sizeInKB: number): string { // Orchestration const LargeOutputOrchestrator: OrchestrationHandler = function* (context: OrchestrationContext) { const sizeInKB = context.df.getInput(); + if (sizeInKB == null || sizeInKB <= 0) { + throw new Error('sizeInKB must be a positive integer.'); + } context.log('Saying hello.'); const outputs: any[] = []; const r_1 = yield context.df.callActivity('large_output_say_hello', 'Tokyo'); diff --git a/test/e2e/Apps/BasicPowerShell/LargeOutputOrchestrator/run.ps1 b/test/e2e/Apps/BasicPowerShell/LargeOutputOrchestrator/run.ps1 index abf7cd1ef..3aab9b64e 100644 --- a/test/e2e/Apps/BasicPowerShell/LargeOutputOrchestrator/run.ps1 +++ b/test/e2e/Apps/BasicPowerShell/LargeOutputOrchestrator/run.ps1 @@ -7,6 +7,10 @@ param($Context) $sizeInKB = [int]$Context.Input +if ($sizeInKB -le 0) { + throw [System.ArgumentOutOfRangeException]::new("sizeInKB") +} + Write-Information "Saying hello." $outputs = @() diff --git a/test/e2e/Apps/BasicPowerShell/StartOrchestration/run.ps1 b/test/e2e/Apps/BasicPowerShell/StartOrchestration/run.ps1 index b9313ff98..d39130fe3 100644 --- a/test/e2e/Apps/BasicPowerShell/StartOrchestration/run.ps1 +++ b/test/e2e/Apps/BasicPowerShell/StartOrchestration/run.ps1 @@ -8,7 +8,8 @@ using namespace System.Net param($Request, $TriggerMetadata) $orchestrationName = $Request.Query.orchestrationName -$InstanceId = Start-DurableOrchestration -FunctionName $orchestrationName +$InstanceId = $Request.Query.instanceId +$InstanceId = Start-DurableOrchestration -FunctionName $orchestrationName -InstanceId $InstanceId Write-Host "Started orchestration with ID = '$InstanceId'" $Response = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId diff --git a/test/e2e/Apps/BasicPowerShell/host.json b/test/e2e/Apps/BasicPowerShell/host.json index ecb594e30..ffe42bdd2 100644 --- a/test/e2e/Apps/BasicPowerShell/host.json +++ b/test/e2e/Apps/BasicPowerShell/host.json @@ -2,7 +2,8 @@ "version": "2.0", "extensions": { "durableTask": { - "hubName": "PowerShellE2ETaskHub" + "hubName": "PowerShellE2ETaskHub", + "overridableExistingInstanceStates": "AnyState" } }, "managedDependency": { diff --git a/test/e2e/Apps/BasicPython/hello_cities.py b/test/e2e/Apps/BasicPython/hello_cities.py index d01e065cd..395afd186 100644 --- a/test/e2e/Apps/BasicPython/hello_cities.py +++ b/test/e2e/Apps/BasicPython/hello_cities.py @@ -15,7 +15,7 @@ @bp.route(route="StartOrchestration") @bp.durable_client_input(client_name="client") async def http_start(req: func.HttpRequest, client): - instance_id = await client.start_new(req.params.get('orchestrationName')) + instance_id = await client.start_new(req.params.get('orchestrationName'), req.params.get('instanceId')) logging.info(f"Started orchestration with ID = '{instance_id}'.") return client.create_check_status_response(req, instance_id) diff --git a/test/e2e/Apps/BasicPython/host.json b/test/e2e/Apps/BasicPython/host.json index 334ed1737..a7ea8cfc0 100644 --- a/test/e2e/Apps/BasicPython/host.json +++ b/test/e2e/Apps/BasicPython/host.json @@ -6,7 +6,8 @@ "tracing": { "DistributedTracingEnabled": true, "Version": "V2" - } + }, + "overridableExistingInstanceStates": "AnyState" } } } \ No newline at end of file diff --git a/test/e2e/Apps/BasicPython/large_output_orchestrator.py b/test/e2e/Apps/BasicPython/large_output_orchestrator.py index 3388df00f..d39d2d7c6 100644 --- a/test/e2e/Apps/BasicPython/large_output_orchestrator.py +++ b/test/e2e/Apps/BasicPython/large_output_orchestrator.py @@ -19,6 +19,8 @@ def generate_large_string(size_in_kb: int) -> str: @bp.orchestration_trigger(context_name="context", orchestration="LargeOutputOrchestrator") def large_output_orchestrator(context: df.DurableOrchestrationContext): size_in_kb = context.get_input() + if (size_in_kb is None or size_in_kb<= 0): + raise ValueError("size_in_kb must be a positive integer.") logging.info("Saying hello.") outputs = [] r_1 = yield context.call_activity("large_output_say_hello", "Tokyo") diff --git a/test/e2e/Tests/Tests/DedupeStatusesTests.cs b/test/e2e/Tests/Tests/DedupeStatusesTests.cs new file mode 100644 index 000000000..42400b9ba --- /dev/null +++ b/test/e2e/Tests/Tests/DedupeStatusesTests.cs @@ -0,0 +1,305 @@ +using System.Net; +using System.Text.Json; +using Microsoft.Azure.WebJobs.Extensions.DurableTask; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.Azure.Durable.Tests.DotnetIsolatedE2E; + +[Collection(Constants.FunctionAppCollectionSequentialName)] +public class DedupeStatusesTests +{ + private readonly FunctionAppFixture fixture; + private readonly ITestOutputHelper output; + + public DedupeStatusesTests(FunctionAppFixture fixture, ITestOutputHelper testOutputHelper) + { + this.fixture = fixture; + this.fixture.TestLogs.UseTestLogger(testOutputHelper); + this.output = testOutputHelper; + } + + [Fact] + public async Task CanStartOrchestration_WithSameId_ForAllStatuses_ForEmptyDedupeStatuses() + { + HttpResponseMessage terminateResponse; + + // Completed + string completedInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startCompletedResponseFirstAttempt = await StartAndWaitForState( + "HelloCities", completedInstanceId, "Completed"); + using HttpResponseMessage startCompletedResponseSecondAttempt = await StartAndWaitForState( + "HelloCities", completedInstanceId, "Completed"); + + // Failed + // This invocation will fail because the "LargeOutputOrchestrator" expects a non-zero input, but we provide none + string failedInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startFailedResponseFirstAttempt = await StartAndWaitForState( + "LargeOutputOrchestrator", failedInstanceId, "Failed"); + using HttpResponseMessage startFailedResponseSecondAttempt = await StartAndWaitForState( + "LargeOutputOrchestrator", failedInstanceId, "Failed"); + + // Terminated + if (this.fixture.functionLanguageLocalizer.GetLanguageType() != LanguageType.Java + || this.fixture.GetDurabilityProvider() != FunctionAppFixture.ConfiguredDurabilityProviderType.MSSQL) // Bug: https://github.com/microsoft/durabletask-java/issues/237 + { + string terminatedInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startTerminatedResponseFirstAttempt = await StartAndWaitForState( + "LongRunningOrchestrator", terminatedInstanceId, "Running"); + await TerminateAndWaitForState(terminatedInstanceId, startTerminatedResponseFirstAttempt); + using HttpResponseMessage startTerminatedResponseSecondAttempt = await StartAndWaitForState( + "LongRunningOrchestrator", terminatedInstanceId, "Running"); + // Clean-up + terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={terminatedInstanceId}"); + Assert.Equal(HttpStatusCode.OK, terminateResponse.StatusCode); + terminateResponse.Dispose(); + } + + // Pending + // Scheduled start times are currently only implemented in Java and .NET isolated, which is the only true way + // to get an orchestration in a "Pending" state + if (this.fixture.functionLanguageLocalizer.GetLanguageType() == LanguageType.DotnetIsolated + || this.fixture.functionLanguageLocalizer.GetLanguageType() == LanguageType.Java) + { + string pendingInstanceId = Guid.NewGuid().ToString(); + DateTime scheduledStartTime = DateTime.UtcNow.AddMinutes(10); + using HttpResponseMessage startPendingResponseFirstAttempt = await StartAndWaitForState( + "HelloCities", pendingInstanceId, "Pending", scheduledStartTime: scheduledStartTime); + using HttpResponseMessage startPendingResponseSecondAttempt = await StartAndWaitForState( + "HelloCities", pendingInstanceId, "Completed"); + } + + // Running + string runningInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startRunningResponseFirstAttempt = await StartAndWaitForState( + "LongRunningOrchestrator", runningInstanceId, "Running"); + using HttpResponseMessage startRunningResponseSecondAttempt = await StartAndWaitForState( + "LongRunningOrchestrator", runningInstanceId, "Running"); + // Clean-up + terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={runningInstanceId}"); + Assert.Equal(HttpStatusCode.OK, terminateResponse.StatusCode); + terminateResponse.Dispose(); + + // Suspended + string suspendedInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startSuspendedResponseFirstAttempt = await StartAndWaitForState( + "LongRunningOrchestrator", suspendedInstanceId, "Running"); + await SuspendAndWaitForState(suspendedInstanceId, startSuspendedResponseFirstAttempt); + using HttpResponseMessage startSuspendedResponseSecondAttempt = await StartAndWaitForState( + "LongRunningOrchestrator", suspendedInstanceId, "Running"); + // Clean-up + terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={suspendedInstanceId}"); + Assert.Equal(HttpStatusCode.OK, terminateResponse.StatusCode); + terminateResponse.Dispose(); + } + + [Theory] + [Trait("PowerShell", "Skip")] // Dedupe statuses not implemented in PowerShell + [Trait("Python", "Skip")] // Dedupe statuses not implemented in Python + [Trait("Node", "Skip")] // Dedupe statuses not implemented in Node + [Trait("Java", "Skip")] // Dedupe statuses not implemented in Java + [InlineData([])] + [InlineData("Pending", "Failed")] + public async Task StartOrchestration_WithSameId_FailsIfExistingStatus_InDedupeStatuses(params string[] dedupeStatuses) + { + HttpResponseMessage terminateResponse; + + // Completed + string completedInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startCompletedResponseFirstAttempt = await StartAndWaitForStateWithDedupeStatuses( + "HelloCities", completedInstanceId, "Completed", dedupeStatuses); + using HttpResponseMessage startCompletedResponseSecondAttempt = await StartAndWaitForStateWithDedupeStatuses( + "HelloCities", + completedInstanceId, + "Completed", + dedupeStatuses, + expectedCode: dedupeStatuses.Contains("Completed") ? HttpStatusCode.Conflict : HttpStatusCode.Accepted); + + // Terminated + string terminatedInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startTerminatedResponseFirstAttempt = await StartAndWaitForStateWithDedupeStatuses( + "LongRunningOrchestrator", terminatedInstanceId, "Running", dedupeStatuses); + await TerminateAndWaitForState(terminatedInstanceId, startTerminatedResponseFirstAttempt); + using HttpResponseMessage startTerminatedResponseSecondAttempt = await StartAndWaitForStateWithDedupeStatuses( + "LongRunningOrchestrator", + terminatedInstanceId, + "Running", + dedupeStatuses, + expectedCode: dedupeStatuses.Contains("Terminated") ? HttpStatusCode.Conflict : HttpStatusCode.Accepted); + // Clean-up + if (!dedupeStatuses.Contains("Terminated")) + { + terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={terminatedInstanceId}"); + Assert.Equal(HttpStatusCode.OK, terminateResponse.StatusCode); + terminateResponse.Dispose(); + } + + // Failed + // This invocation will fail because the "LargeOutputOrchestrator" expects a non-zero input, but we provide none + string failedInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startFailedResponseFirstAttempt = await StartAndWaitForStateWithDedupeStatuses( + "LargeOutputOrchestrator", failedInstanceId, "Failed", dedupeStatuses); + using HttpResponseMessage startFailedResponseSecondAttempt = await StartAndWaitForStateWithDedupeStatuses( + "LargeOutputOrchestrator", + failedInstanceId, + "Failed", + dedupeStatuses, + expectedCode: dedupeStatuses.Contains("Failed") ? HttpStatusCode.Conflict : HttpStatusCode.Accepted); + + // Pending + string pendingInstanceId = Guid.NewGuid().ToString(); + DateTime scheduledStartTime = DateTime.UtcNow.AddMinutes(10); + using HttpResponseMessage startPendingResponseFirstAttempt = await StartAndWaitForStateWithDedupeStatuses( + "HelloCities", pendingInstanceId, "Pending", dedupeStatuses, scheduledStartTime: scheduledStartTime); + using HttpResponseMessage startPendingResponseSecondAttempt = await StartAndWaitForStateWithDedupeStatuses( + "HelloCities", + pendingInstanceId, + "Completed", + dedupeStatuses, + expectedCode: dedupeStatuses.Contains("Pending") ? HttpStatusCode.Conflict : HttpStatusCode.Accepted); + + // Running + string runningInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startRunningResponseFirstAttempt = await StartAndWaitForStateWithDedupeStatuses( + "LongRunningOrchestrator", runningInstanceId, "Running", dedupeStatuses); + using HttpResponseMessage startRunningResponseSecondAttempt = await StartAndWaitForStateWithDedupeStatuses( + "LongRunningOrchestrator", + runningInstanceId, + expectedState: "Running", + dedupeStatuses, + expectedCode: dedupeStatuses.Contains("Running") ? HttpStatusCode.Conflict : HttpStatusCode.Accepted); + // Clean-up + terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={runningInstanceId}"); + Assert.Equal(HttpStatusCode.OK, terminateResponse.StatusCode); + terminateResponse.Dispose(); + + // Suspended + string suspendedInstanceId = Guid.NewGuid().ToString(); + using HttpResponseMessage startSuspendedResponseFirstAttempt = await StartAndWaitForStateWithDedupeStatuses( + "LongRunningOrchestrator", suspendedInstanceId, "Running", dedupeStatuses); + await SuspendAndWaitForState(suspendedInstanceId, startSuspendedResponseFirstAttempt); + using HttpResponseMessage startSuspendedResponseSecondAttempt = await StartAndWaitForStateWithDedupeStatuses( + "LongRunningOrchestrator", + suspendedInstanceId, + "Running", + dedupeStatuses, + expectedCode: dedupeStatuses.Contains("Suspended") ? HttpStatusCode.Conflict : HttpStatusCode.Accepted); + // Clean-up + terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={suspendedInstanceId}"); + Assert.Equal(HttpStatusCode.OK, terminateResponse.StatusCode); + terminateResponse.Dispose(); + } + + [Theory] + [Trait("PowerShell", "Skip")] // Dedupe statuses not implemented in PowerShell + [Trait("Python", "Skip")] // Dedupe statuses not implemented in Python + [Trait("Node", "Skip")] // Dedupe statuses not implemented in Node + [Trait("Java", "Skip")] // Dedupe statuses not implemented in Java + [InlineData("Pending", "Failed", "Terminated")] + [InlineData("Running", "Failed", "Terminated")] + [InlineData("Suspended", "Failed", "Terminated")] + public async Task StartOrchestration_WithInvalidDedupeStatuses_ThrowsArgumentException(params string[] dedupeStatuses) + { + // Dedupe statuses cannot have both "Terminated" and a running status + // We do not provide an expected state since we expect the request to fail + using HttpResponseMessage failedRequest = await StartAndWaitForStateWithDedupeStatuses( + "HelloCities", Guid.NewGuid().ToString(), expectedState: string.Empty, dedupeStatuses, expectedCode: HttpStatusCode.BadRequest); + } + + private async Task StartAndWaitForState( + string orchestrationName, + string instanceId, + string expectedState, + DateTime? scheduledStartTime = null) + { + string functionName = "StartOrchestration"; + string queryString = $"?orchestrationName={orchestrationName}&instanceId={instanceId}"; + + if (scheduledStartTime is not null) + { + queryString += $"&ScheduledStartTime={scheduledStartTime:o}"; + functionName = "HelloCities_HttpStart_Scheduled"; + } + + HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger(functionName, queryString); + Assert.Equal(HttpStatusCode.Accepted, response.StatusCode); + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, expectedState, 60); + + if (expectedState != "Pending") + { + ClientOperationLogHelpers.AssertClientOperationLogExists( + () => this.fixture.TestLogs.CoreToolsLogs, + "StartOrchestration", + instanceId, + this.fixture.functionLanguageLocalizer.GetLanguageType()); + } + + return response; + } + + private async Task StartAndWaitForStateWithDedupeStatuses( + string orchestrationName, + string instanceId, + string expectedState, + string[] dedupeStatuses, + DateTime? scheduledStartTime = null, + HttpStatusCode expectedCode = HttpStatusCode.Accepted) + { + string queryString = $"?orchestrationName={orchestrationName}&instanceId={instanceId}" + + $"&dedupeStatuses={JsonSerializer.Serialize(dedupeStatuses)}"; + + if (scheduledStartTime is not null) + { + queryString += $"&scheduledStartTime={scheduledStartTime:o}"; + } + + HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("StartOrchestration_DedupeStatuses", queryString); + Assert.Equal(expectedCode, response.StatusCode); + if (expectedCode != HttpStatusCode.Accepted) + { + return response; + } + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, expectedState, 60); + + if (expectedState != "Pending") + { + ClientOperationLogHelpers.AssertClientOperationLogExists( + () => this.fixture.TestLogs.CoreToolsLogs, + "StartOrchestration", + instanceId, + this.fixture.functionLanguageLocalizer.GetLanguageType()); + } + + return response; + } + + private async Task TerminateAndWaitForState(string instanceId, HttpResponseMessage startOrchestrationResponse) + { + using HttpResponseMessage terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={instanceId}"); + Assert.Equal(HttpStatusCode.OK, terminateResponse.StatusCode); + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(startOrchestrationResponse); + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Terminated", 60); + + ClientOperationLogHelpers.AssertClientOperationLogExists( + () => this.fixture.TestLogs.CoreToolsLogs, + "Terminate", + instanceId, + this.fixture.functionLanguageLocalizer.GetLanguageType()); + } + + private async Task SuspendAndWaitForState(string instanceId, HttpResponseMessage startOrchestrationResponse) + { + using HttpResponseMessage suspendResponse = await HttpHelpers.InvokeHttpTrigger("SuspendInstance", $"?instanceId={instanceId}"); + Assert.Equal(HttpStatusCode.OK, suspendResponse.StatusCode); + string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(startOrchestrationResponse); + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Suspended", 60); + + ClientOperationLogHelpers.AssertClientOperationLogExists( + () => this.fixture.TestLogs.CoreToolsLogs, + "Suspend", + instanceId, + this.fixture.functionLanguageLocalizer.GetLanguageType()); + } +} diff --git a/test/e2e/Tests/Tests/DistributedTracingEntitiesTests.cs b/test/e2e/Tests/Tests/DistributedTracingEntitiesTests.cs index 9f577800b..a29b030a7 100644 --- a/test/e2e/Tests/Tests/DistributedTracingEntitiesTests.cs +++ b/test/e2e/Tests/Tests/DistributedTracingEntitiesTests.cs @@ -57,8 +57,7 @@ public async Task DistributedTracingEntitiesTest() var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri); // Sanitize the JSON string to remove unwanted characters so we can easily parse it into a list - var output = orchestrationDetails.Output.Replace("\r", "").Replace("\n", "").Replace("\"", "").Replace("[", "").Replace("]", "").Replace(" ", ""); - var ids = new List(output.Split(",")); + var ids = JsonSerializer.Deserialize>(orchestrationDetails.Output); // The execution is as follows: // Orchestration A signals entity A which signals entity B. Then orchestration A calls entities A and B. Finally orchestration A signals entity C. @@ -67,6 +66,7 @@ public async Task DistributedTracingEntitiesTest() // Orchestration A and B return this list of Activities as part of their output. In order to access the output of orchestration B, we need to return its // instance ID as part of the output of orchestration A. It will be the last item in the list returned by A, so we will remove it from the list and use it // to get the output of orchestration B (which will have the final two Activities, that for orchestration B and its call to entity A). + Assert.NotNull(ids); Assert.Equal(7, ids.Count); var orchestrationId = ids[ids.Count - 1]; ids.RemoveAt(ids.Count - 1); diff --git a/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs b/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs index c57a85b56..9af26196d 100644 --- a/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs +++ b/test/e2e/Tests/Tests/GetOrchestrationHistoryTests.cs @@ -44,8 +44,6 @@ public GetOrchestrationHistoryTests(FunctionAppFixture fixture, ITestOutputHelpe public async Task GetOrchestrationHistory_FailedOrchestration() { bool isNotMSSQL = this.fixture.GetDurabilityProvider() != FunctionAppFixture.ConfiguredDurabilityProviderType.MSSQL; - // The other backends currently do not serialize tags when sending the history, or the failure details of an ExecutionCompletedEvent - bool checkTagsAndFailureDetails = this.fixture.GetDurabilityProvider() == FunctionAppFixture.ConfiguredDurabilityProviderType.AzureStorage; string subOrchestrationInstanceId = Guid.NewGuid().ToString(); using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger( @@ -79,7 +77,8 @@ public async Task GetOrchestrationHistory_FailedOrchestration() Assert.Equal("ParentOrchestration", parentExecutionStartedEvent.Name); Assert.Equal(new ComplexInput("fail", subOrchestrationInstanceId, OutputSize, isNotMSSQL, this.tags), JsonConvert.DeserializeObject(parentExecutionStartedEvent.Input)); - if (checkTagsAndFailureDetails) + // MSSQL does not include tags in history events + if (isNotMSSQL) { Assert.NotNull(parentExecutionStartedEvent.Tags); Assert.Contains(TagsKey, parentExecutionStartedEvent.Tags.Keys); @@ -114,17 +113,14 @@ public async Task GetOrchestrationHistory_FailedOrchestration() Assert.Equal("System.Exception", subOrchestrationFailureDetails.InnerFailure.ErrorType); Assert.Equal("Failure!", subOrchestrationFailureDetails.InnerFailure.ErrorMessage); - if (checkTagsAndFailureDetails) - { - Assert.NotNull(parentFailureDetails); - Assert.Equal("Microsoft.DurableTask.TaskFailedException", parentFailureDetails.ErrorType); - Assert.NotNull(parentFailureDetails.InnerFailure); - Assert.Equal("Microsoft.DurableTask.TaskFailedException", parentFailureDetails.InnerFailure.ErrorType); - Assert.Equal(subOrchestrationFailureDetails.ErrorMessage, parentFailureDetails.InnerFailure.ErrorMessage); - // Finally, the doubly nested inner failure of the execution completed event will correspond to the Activity failing - Assert.NotNull(parentFailureDetails.InnerFailure.InnerFailure); - Assert.Equal("Failure!", parentFailureDetails.InnerFailure.InnerFailure.ErrorMessage); - } + Assert.NotNull(parentFailureDetails); + Assert.Equal("Microsoft.DurableTask.TaskFailedException", parentFailureDetails.ErrorType); + Assert.NotNull(parentFailureDetails.InnerFailure); + Assert.Equal("Microsoft.DurableTask.TaskFailedException", parentFailureDetails.InnerFailure.ErrorType); + Assert.Equal(subOrchestrationFailureDetails.ErrorMessage, parentFailureDetails.InnerFailure.ErrorMessage); + // Finally, the doubly nested inner failure of the execution completed event will correspond to the Activity failing + Assert.NotNull(parentFailureDetails.InnerFailure.InnerFailure); + Assert.Equal("Failure!", parentFailureDetails.InnerFailure.InnerFailure.ErrorMessage); using HttpResponseMessage getSubOrchestrationHistoryResponse = await HttpHelpers.InvokeHttpTrigger("GetInstanceHistory", $"?instanceId={subOrchestrationInstanceId}"); Assert.Equal(HttpStatusCode.OK, getSubOrchestrationHistoryResponse.StatusCode); @@ -156,7 +152,8 @@ public async Task GetOrchestrationHistory_FailedOrchestration() Assert.Equal("ParentOrchestration", subOrchestrationExecutionStartedEvent.ParentInstance.Name); Assert.Equal(parentExecutionStartedEvent.OrchestrationInstance.ExecutionId, subOrchestrationExecutionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId); } - if (checkTagsAndFailureDetails) + // MSSQL does not include tags in history events + if (isNotMSSQL) { Assert.NotNull(subOrchestrationExecutionStartedEvent.Tags); Assert.Contains(TagsKey, subOrchestrationExecutionStartedEvent.Tags.Keys); @@ -165,7 +162,8 @@ public async Task GetOrchestrationHistory_FailedOrchestration() Assert.Equal(EventType.TaskScheduled, subOrchestrationHistoryEvents[2].EventType); var taskScheduledEvent = (TaskScheduledEvent)subOrchestrationHistoryEvents[2]; Assert.Equal("ThrowExceptionActivity", taskScheduledEvent.Name); - if (checkTagsAndFailureDetails) + // MSSQL does not include tags in history events + if (isNotMSSQL) { Assert.NotNull(taskScheduledEvent.Tags); Assert.Contains(TagsKey, taskScheduledEvent.Tags.Keys); @@ -189,15 +187,12 @@ public async Task GetOrchestrationHistory_FailedOrchestration() Assert.Equal("System.Exception", taskFailureDetails.ErrorType); Assert.Equal("Failure!", taskFailureDetails.ErrorMessage); - if (checkTagsAndFailureDetails) - { - Assert.NotNull(subOrchestrationFailureDetails); - Assert.Equal("Microsoft.DurableTask.TaskFailedException", subOrchestrationFailureDetails.ErrorType); - Assert.NotNull(subOrchestrationFailureDetails.InnerFailure); - // The inner failure for the suborchestration failed event will be the actual exception thrown by the Activity - Assert.Equal(taskFailureDetails.ErrorType, subOrchestrationFailureDetails.InnerFailure.ErrorType); - Assert.Equal(taskFailureDetails.ErrorMessage, subOrchestrationFailureDetails.InnerFailure.ErrorMessage); - } + Assert.NotNull(subOrchestrationFailureDetails); + Assert.Equal("Microsoft.DurableTask.TaskFailedException", subOrchestrationFailureDetails.ErrorType); + Assert.NotNull(subOrchestrationFailureDetails.InnerFailure); + // The inner failure for the suborchestration failed event will be the actual exception thrown by the Activity + Assert.Equal(taskFailureDetails.ErrorType, subOrchestrationFailureDetails.InnerFailure.ErrorType); + Assert.Equal(taskFailureDetails.ErrorMessage, subOrchestrationFailureDetails.InnerFailure.ErrorMessage); // Verify that the ClientOperationReceived logs were emitted with a FunctionInvocationId ClientOperationLogHelpers.AssertClientOperationLogExists( @@ -222,8 +217,6 @@ public async Task GetOrchestrationHistory_FailedOrchestration() public async Task GetOrchestrationHistory_LargeHistory() { bool isNotMSSQL = this.fixture.GetDurabilityProvider() != FunctionAppFixture.ConfiguredDurabilityProviderType.MSSQL; - // The other backends currently do not serialize tags when sending the history, or the failure details of an ExecutionCompletedEvent - bool checkTagsAndFailureDetails = this.fixture.GetDurabilityProvider() == FunctionAppFixture.ConfiguredDurabilityProviderType.AzureStorage; string subOrchestrationInstanceId = Guid.NewGuid().ToString(); using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger( @@ -257,7 +250,8 @@ public async Task GetOrchestrationHistory_LargeHistory() Assert.Equal("ParentOrchestration", parentExecutionStartedEvent.Name); Assert.Equal(new ComplexInput("succeed", subOrchestrationInstanceId, OutputSize, isNotMSSQL, this.tags), JsonConvert.DeserializeObject(parentExecutionStartedEvent.Input)); - if (checkTagsAndFailureDetails) + // MSSQL does not include tags in history events + if (isNotMSSQL) { Assert.NotNull(parentExecutionStartedEvent.Tags); Assert.Contains(TagsKey, parentExecutionStartedEvent.Tags.Keys); diff --git a/test/e2e/Tests/Tests/PurgeInstancesTests.cs b/test/e2e/Tests/Tests/PurgeInstancesTests.cs index f7a7e3ad9..a6055a35c 100644 --- a/test/e2e/Tests/Tests/PurgeInstancesTests.cs +++ b/test/e2e/Tests/Tests/PurgeInstancesTests.cs @@ -173,7 +173,7 @@ void AssertFailedPurgeResponseStatusCode(HttpResponseMessage purgeHttpResponse) // Terminated orchestration, should succeed if (this.fixture.functionLanguageLocalizer.GetLanguageType() != LanguageType.Java - && this.fixture.GetDurabilityProvider() != FunctionAppFixture.ConfiguredDurabilityProviderType.MSSQL) // Bug: https://github.com/microsoft/durabletask-java/issues/237 + || this.fixture.GetDurabilityProvider() != FunctionAppFixture.ConfiguredDurabilityProviderType.MSSQL) // Bug: https://github.com/microsoft/durabletask-java/issues/237 { using HttpResponseMessage startTerminatedOrchestrationResponse = await HttpHelpers.InvokeHttpTrigger( "StartOrchestration", @@ -219,16 +219,21 @@ void AssertFailedPurgeResponseStatusCode(HttpResponseMessage purgeHttpResponse) AssertFailedPurgeResponseStatusCode(purgeRunning); // Pending orchestration, should fail - DateTime scheduledStartTime = DateTime.UtcNow + TimeSpan.FromMinutes(1); - using HttpResponseMessage startPendingOrchestrationResponse = await HttpHelpers.InvokeHttpTrigger( - "HelloCities_HttpStart_Scheduled", - $"?ScheduledStartTime={scheduledStartTime:o}"); - Assert.Equal(HttpStatusCode.Accepted, startPendingOrchestrationResponse.StatusCode); - string pendingInstanceId = await DurableHelpers.ParseInstanceIdAsync(startPendingOrchestrationResponse); - string pendingStatusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(startPendingOrchestrationResponse); - await DurableHelpers.WaitForOrchestrationStateAsync(pendingStatusQueryGetUri, "Pending", 30); - using HttpResponseMessage purgePending = await HttpHelpers.InvokeHttpTrigger("PurgeOrchestrationHistory", $"?instanceId={pendingInstanceId}"); - AssertFailedPurgeResponseStatusCode(purgePending); + // Scheduled start times are currently only implemented in Java and .NET isolated, which is the only true way to get an orchestration in a "Pending" state + if (this.fixture.functionLanguageLocalizer.GetLanguageType() == LanguageType.DotnetIsolated + || this.fixture.functionLanguageLocalizer.GetLanguageType() == LanguageType.Java) + { + DateTime scheduledStartTime = DateTime.UtcNow + TimeSpan.FromMinutes(1); + using HttpResponseMessage startPendingOrchestrationResponse = await HttpHelpers.InvokeHttpTrigger( + "HelloCities_HttpStart_Scheduled", + $"?ScheduledStartTime={scheduledStartTime:o}"); + Assert.Equal(HttpStatusCode.Accepted, startPendingOrchestrationResponse.StatusCode); + string pendingInstanceId = await DurableHelpers.ParseInstanceIdAsync(startPendingOrchestrationResponse); + string pendingStatusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(startPendingOrchestrationResponse); + await DurableHelpers.WaitForOrchestrationStateAsync(pendingStatusQueryGetUri, "Pending", 30); + using HttpResponseMessage purgePending = await HttpHelpers.InvokeHttpTrigger("PurgeOrchestrationHistory", $"?instanceId={pendingInstanceId}"); + AssertFailedPurgeResponseStatusCode(purgePending); + } // Suspended orchestration, should fail using HttpResponseMessage startSuspendedOrchestrationResponse = await HttpHelpers.InvokeHttpTrigger( diff --git a/test/e2e/Tests/Tests/RestartOrchestrationTests.cs b/test/e2e/Tests/Tests/RestartOrchestrationTests.cs index 357e1356d..dabdbd353 100644 --- a/test/e2e/Tests/Tests/RestartOrchestrationTests.cs +++ b/test/e2e/Tests/Tests/RestartOrchestrationTests.cs @@ -40,7 +40,15 @@ public async Task RestartOrchestration_CreatedTimeAndOutputChange(bool restartWi string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); string instanceId = await DurableHelpers.ParseInstanceIdAsync(response); - await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 30); + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Completed", 30); + + // Verify that the ClientOperationReceived log was emitted with a FunctionInvocationId + ClientOperationLogHelpers.AssertClientOperationLogExists( + () => this.fixture.TestLogs.CoreToolsLogs, + "StartOrchestration", + instanceId, + this.fixture.functionLanguageLocalizer.GetLanguageType()); + var orchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri); string output1 = orchestrationDetails.Output; DateTime createdTime1 = orchestrationDetails.CreatedTime; @@ -62,7 +70,15 @@ public async Task RestartOrchestration_CreatedTimeAndOutputChange(bool restartWi string restartStatusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(restartResponse); string restartInstanceId = await DurableHelpers.ParseInstanceIdAsync(restartResponse); - await DurableHelpers.WaitForOrchestrationStateAsync(restartStatusQueryGetUri, "Completed", 30); + await DurableHelpers.WaitForOrchestrationStateAsync(restartStatusQueryGetUri, "Completed", 30); + + // Verify that the ClientOperationReceived log was emitted with a FunctionInvocationId + ClientOperationLogHelpers.AssertClientOperationLogExists( + () => this.fixture.TestLogs.CoreToolsLogs, + "Restart", + instanceId, + this.fixture.functionLanguageLocalizer.GetLanguageType()); + var restartOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(restartStatusQueryGetUri); string output2 = restartOrchestrationDetails.Output; DateTime createdTime2 = restartOrchestrationDetails.CreatedTime; @@ -125,18 +141,26 @@ public async Task RestartOrchestration_NonExistentInstanceId_ShouldReturnNotFoun [Trait("Java", "Skip")] // RestartAsync not yet implemented in Java [Trait("Python", "Skip")] // RestartAsync not supported in Python [Trait("Node", "Skip")] // RestartAsync not supported in Node - // Test that if we restart a instance that doesn't reach to completed state, - // If RestartWithNewInstanceId is set to false, a InvalidOperationException error will be thrown. - public async Task RestartOrchestration_NotCompletedOrchestrationWithRestartFalse_ShouldReturnFailedPrecondition() + public async Task RestartOrchestration_NotCompletedOrchestrationWithRestartWithNewInstanceIdFalse_ShouldSucceed() { // Start a long-running orchestration using HttpResponseMessage response = await HttpHelpers.InvokeHttpTrigger("RestarttOrchestration_HttpStart/LongOrchestrator"); Assert.Equal(HttpStatusCode.Accepted, response.StatusCode); string instanceId = await DurableHelpers.ParseInstanceIdAsync(response); string statusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(response); + DurableHelpers.OrchestrationStatusDetails orchestrationDetails + = await DurableHelpers.GetRunningOrchestrationDetailsAsync(statusQueryGetUri); + DateTime createdTime1 = orchestrationDetails.CreatedTime; // Wait for the orchestration to be running - await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Running", 30); + await DurableHelpers.WaitForOrchestrationStateAsync(statusQueryGetUri, "Running", 30); + + // Verify that the ClientOperationReceived log was emitted with a FunctionInvocationId + ClientOperationLogHelpers.AssertClientOperationLogExists( + () => this.fixture.TestLogs.CoreToolsLogs, + "StartOrchestration", + instanceId, + this.fixture.functionLanguageLocalizer.GetLanguageType()); // Try to restart the running orchestration with restartWithNewInstanceId = false var restartPayload = new @@ -147,15 +171,37 @@ public async Task RestartOrchestration_NotCompletedOrchestrationWithRestartFalse string jsonBody = JsonSerializer.Serialize(restartPayload); + // Restart the orchestrator with the same instance id + // This is necessary to ensure that the created times for the two orchestration instances are different. + // The created time returned by the orchestration status API has a resolution only up to seconds, not milliseconds. + Thread.Sleep(TimeSpan.FromSeconds(1)); using HttpResponseMessage restartResponse = await HttpHelpers.InvokeHttpTriggerWithBody( - "RestartOrchestration_HttpRestartWithErrorHandling", jsonBody, "application/json"); + "RestartOrchestration_HttpRestart", jsonBody, "application/json"); + Assert.Equal(HttpStatusCode.Accepted, restartResponse.StatusCode); - Assert.Equal(HttpStatusCode.BadRequest, restartResponse.StatusCode); - - string responseContent = await restartResponse.Content.ReadAsStringAsync(); - - // Verify the returned exception contains the correct information. - Assert.Contains(fixture.functionLanguageLocalizer.GetLocalizedStringValue("RestartRunningInstance.ErrorMessage", instanceId), responseContent); + string restartStatusQueryGetUri = await DurableHelpers.ParseStatusQueryGetUriAsync(restartResponse); + string restartInstanceId = await DurableHelpers.ParseInstanceIdAsync(restartResponse); + + // Wait for the "created time" to change to verify that the orchestration was restarted + DurableHelpers.OrchestrationStatusDetails restartOrchestrationDetails + = await DurableHelpers.GetRunningOrchestrationDetailsAsync(restartStatusQueryGetUri); + DateTime createdTime2 = restartOrchestrationDetails.CreatedTime; + var waitForRestartTimeout = TimeSpan.FromSeconds(30); + using CancellationTokenSource cts = new(waitForRestartTimeout); + while (!cts.IsCancellationRequested && createdTime2 == createdTime1) + { + restartOrchestrationDetails = await DurableHelpers.GetRunningOrchestrationDetailsAsync(restartStatusQueryGetUri); + createdTime2 = restartOrchestrationDetails.CreatedTime; + } + Assert.NotEqual(createdTime1, createdTime2); + Assert.Equal(instanceId, restartInstanceId); + + // Verify that the ClientOperationReceived log was emitted with a FunctionInvocationId + ClientOperationLogHelpers.AssertClientOperationLogExists( + () => this.fixture.TestLogs.CoreToolsLogs, + "Restart", + instanceId, + this.fixture.functionLanguageLocalizer.GetLanguageType()); // Clean up: terminate the long-running orchestration using HttpResponseMessage terminateResponse = await HttpHelpers.InvokeHttpTrigger("TerminateInstance", $"?instanceId={instanceId}");