Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare Worker for External DF SDK support (and a few DF bug fixes) #746

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
4423d13
separate DF SDK classes from DF worker classes
davidmrdavid Jan 29, 2022
9d73110
fix typo
davidmrdavid Jan 29, 2022
2fcbfae
DurableSDK now compiles by itself
davidmrdavid Jan 29, 2022
f43f350
Allow ExternalSDK to handle orchestration
davidmrdavid Feb 3, 2022
092e333
document next steps
davidmrdavid Feb 3, 2022
506e4f7
allow external SDK to set the user-code's input. Still need to refact…
davidmrdavid Feb 4, 2022
7a38ec8
add import module
davidmrdavid Feb 16, 2022
9b2cf42
supress traces
davidmrdavid Feb 16, 2022
47caee9
avoid nullptr
davidmrdavid Feb 16, 2022
1010f97
pass tests
davidmrdavid Feb 17, 2022
d916d58
fix E2E tests
davidmrdavid Feb 17, 2022
946ea37
develop E2E tests
davidmrdavid Feb 18, 2022
64b6c49
Enabled external durable client (#765)
davidmrdavid Feb 18, 2022
9e7cf96
bindings work
davidmrdavid Feb 18, 2022
92f113e
conditional binding intialization
davidmrdavid Feb 18, 2022
e5843ce
conditional import
davidmrdavid Feb 18, 2022
2ee6b96
Added exception handling logic
michaelpeng36 Feb 25, 2022
ae28f6e
Ensure unit tests are functioning properly
michaelpeng36 Feb 28, 2022
94f995d
Corrected unit test names
michaelpeng36 Feb 28, 2022
96f6ee7
Turned repeated variables in unit tests into static members
michaelpeng36 Feb 28, 2022
1cb30b8
Revert durableController name to durableFunctionsUtils
michaelpeng36 Mar 1, 2022
ff5102e
Merge remote-tracking branch 'origin/michaelpeng/get-durable-tests-wo…
davidmrdavid Mar 1, 2022
b8c6a6a
Fixed issue with building the worker
michaelpeng36 Mar 8, 2022
baae3cc
Fix E2E test
michaelpeng36 Mar 8, 2022
6feb7fe
Fixed unit test setup
michaelpeng36 Mar 8, 2022
94d83ae
Fixed another unit test setup
michaelpeng36 Mar 8, 2022
d11a561
Remove string representation of booleans
michaelpeng36 Mar 8, 2022
0e0c077
patch e2e test
davidmrdavid Mar 9, 2022
1c4a7ae
remove typo in toString
davidmrdavid Mar 9, 2022
dc0ed38
Merge branch 'dev' of https://github.com/Azure/azure-functions-powers…
davidmrdavid Mar 9, 2022
3bd961c
Return results from Start-DurableExternalEventListener (#685) (#753)
davidmrdavid Mar 9, 2022
caa5176
add external contrib
davidmrdavid Mar 9, 2022
932e124
add e2e test for GetTaskResult
davidmrdavid Mar 9, 2022
3d1a7b4
parametrize test
davidmrdavid Mar 9, 2022
adec8fa
patch new e2e test
davidmrdavid Mar 9, 2022
8d5d9bf
patch external contrib
davidmrdavid Mar 9, 2022
8351653
fix typo in test
davidmrdavid Mar 9, 2022
7232e58
comment changes
davidmrdavid Mar 14, 2022
d9b3baf
Adds IExternalInvoker (#776)
michaelpeng36 Mar 16, 2022
edf3011
rename hasOrchestrationContext to hasInitializedDurableFunction
davidmrdavid Mar 16, 2022
aca4c0b
remove outdated TODO comment
davidmrdavid Mar 16, 2022
e5ca2ba
remove now unused function - CreateOrchestrationBindingInfo
davidmrdavid Mar 16, 2022
f69b146
Allow worker to read results directly from the external SDK (#777)
michaelpeng36 Mar 17, 2022
4387799
comment out external SDK path
davidmrdavid Mar 17, 2022
a9a0a6e
change serialization
davidmrdavid Jul 12, 2022
12ccdee
Merge branch 'dev' of https://github.com/Azure/azure-functions-powers…
davidmrdavid Jul 12, 2022
5f862e6
restore orchestrationInvoker and powershellServices
davidmrdavid Jul 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/DurableSDK/Commands/InvokeDurableActivityCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ protected override void EndProcessing()
{
var privateData = (Hashtable)MyInvocation.MyCommand.Module.PrivateData;
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey];
var loadedFunctions = FunctionLoader.GetLoadedFunctions();

var task = new ActivityInvocationTask(FunctionName, Input, RetryOptions);
ActivityInvocationTask.ValidateTask(task, loadedFunctions);
Comment on lines -46 to -49
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two were removed since they're incompatible with the external SDK. In addition, other DF SDKs don't perform this validation steps. If we want them, they ought to exist outside the DF SDKs.

I do agree that these are useful validations to have though, but if we perform these checks in the DF SDK components, then we're creating a strong dependency with worker-specific utilities.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent of ActivityInvocationTask.ValidateTask is to provide user-friendly error messages when the specified activity function does not exits (for example, because of a typo in the name). Without this code, the error messages are really confusing and unhelpful. If you remove it, we need an adequate replacement.


_durableTaskHandler.StopAndInitiateDurableTaskOrReplay(
task, context, NoWait.IsPresent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Commands
{
using System.Collections;
using System.Management.Automation;
using Microsoft.PowerShell.Commands;

/// <summary>
/// Set the orchestration context.
Expand Down
36 changes: 32 additions & 4 deletions src/DurableSDK/DurableTaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Management.Automation;
using System.Threading;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks;
using Utility;
using Microsoft.PowerShell.Commands;

internal class DurableTaskHandler
{
Expand Down Expand Up @@ -83,7 +85,7 @@ public void StopAndInitiateDurableTaskOrReplay(
retryOptions.MaxNumberOfAttempts,
onSuccess:
result => {
output(TypeExtensions.ConvertFromJson(result));
output(ConvertFromJson(result));
},
onFailure);

Expand Down Expand Up @@ -232,15 +234,41 @@ private static object GetEventResult(HistoryEvent historyEvent)

if (historyEvent.EventType == HistoryEventType.TaskCompleted)
{
return TypeExtensions.ConvertFromJson(historyEvent.Result);
return ConvertFromJson(historyEvent.Result);
}
else if (historyEvent.EventType == HistoryEventType.EventRaised)
{
return TypeExtensions.ConvertFromJson(historyEvent.Input);
return ConvertFromJson(historyEvent.Input);
}
return null;
}

public static object ConvertFromJson(string json)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to exist in TypeExtensions, creating a tight-dependency with worker utilities. It's been copied here to allow this folder to be readily copied out into an external SDK

{
object retObj = JsonObject.ConvertFromJson(json, returnHashtable: true, error: out _);

if (retObj is PSObject psObj)
{
retObj = psObj.BaseObject;
}

if (retObj is Hashtable hashtable)
{
try
{
// ConvertFromJson returns case-sensitive Hashtable by design -- JSON may contain keys that only differ in case.
// We try casting the Hashtable to a case-insensitive one, but if that fails, we keep using the original one.
retObj = new Hashtable(hashtable, StringComparer.OrdinalIgnoreCase);
}
catch
{
retObj = hashtable;
}
}

return retObj;
}

private void InitiateAndWaitForStop(OrchestrationContext context)
{
context.OrchestrationActionCollector.Stop();
Expand Down
26 changes: 26 additions & 0 deletions src/DurableSDK/ExternalInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections;
using System.Management.Automation;

internal class ExternalInvoker : IExternalInvoker
{
private readonly Func<PowerShell, object> _externalSDKInvokerFunction;

public ExternalInvoker(Func<PowerShell, object> invokerFunction)
{
_externalSDKInvokerFunction = invokerFunction;
}

public Hashtable Invoke(IPowerShellServices powerShellServices)
{
return (Hashtable)_externalSDKInvokerFunction.Invoke(powerShellServices.GetPowerShell());
}
}
}
16 changes: 16 additions & 0 deletions src/DurableSDK/IExternalInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System.Collections;

// Represents a contract for the
internal interface IExternalInvoker
{
// Method to invoke an orchestration using the external Durable SDK
Hashtable Invoke(IPowerShellServices powerShellServices);
}
}
1 change: 1 addition & 0 deletions src/DurableSDK/IOrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
internal interface IOrchestrationInvoker
{
Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh);
void SetExternalInvoker(IExternalInvoker externalInvoker);
}
}
11 changes: 10 additions & 1 deletion src/DurableSDK/IPowerShellServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,26 @@

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
using System;
using System.Management.Automation;

internal interface IPowerShellServices
{
PowerShell GetPowerShell();

bool UseExternalDurableSDK();

void SetDurableClient(object durableClient);

void SetOrchestrationContext(OrchestrationContext orchestrationContext);
OrchestrationBindingInfo SetOrchestrationContext(ParameterBinding context, out IExternalInvoker externalInvoker);

void ClearOrchestrationContext();

void TracePipelineObject();

void AddParameter(string name, object value);

IAsyncResult BeginInvoke(PSDataCollection<object> output);

void EndInvoke(IAsyncResult asyncResult);
Expand Down
1 change: 1 addition & 0 deletions src/DurableSDK/OrchestrationActionCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
using System.Threading;

using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;
using Newtonsoft.Json;

internal class OrchestrationActionCollector
{
Expand Down
115 changes: 80 additions & 35 deletions src/DurableSDK/OrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,58 +11,98 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
using System.Linq;
using System.Management.Automation;

using PowerShellWorker.Utility;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;

internal class OrchestrationInvoker : IOrchestrationInvoker
{
public Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh)
private IExternalInvoker _externalInvoker;
internal static string isOrchestrationFailureKey = "IsOrchestrationFailure";

public Hashtable Invoke(
OrchestrationBindingInfo orchestrationBindingInfo,
IPowerShellServices powerShellServices)
{
try
{
var outputBuffer = new PSDataCollection<object>();
var context = orchestrationBindingInfo.Context;
if (powerShellServices.UseExternalDurableSDK())
{
return InvokeExternalDurableSDK(powerShellServices);
}
return InvokeInternalDurableSDK(orchestrationBindingInfo, powerShellServices);
}
catch (Exception ex)
{
ex.Data.Add(isOrchestrationFailureKey, true);
throw;
}
finally
{
powerShellServices.ClearStreamsAndCommands();
}
}

public Hashtable InvokeExternalDurableSDK(IPowerShellServices powerShellServices)
{
return _externalInvoker.Invoke(powerShellServices);
}

public Hashtable InvokeInternalDurableSDK(
OrchestrationBindingInfo orchestrationBindingInfo,
IPowerShellServices powerShellServices)
{
var outputBuffer = new PSDataCollection<object>();
var context = orchestrationBindingInfo.Context;

// context.History should never be null when initializing CurrentUtcDateTime
var orchestrationStart = context.History.First(
e => e.EventType == HistoryEventType.OrchestratorStarted);
context.CurrentUtcDateTime = orchestrationStart.Timestamp.ToUniversalTime();

// context.History should never be null when initializing CurrentUtcDateTime
var orchestrationStart = context.History.First(
e => e.EventType == HistoryEventType.OrchestratorStarted);
context.CurrentUtcDateTime = orchestrationStart.Timestamp.ToUniversalTime();
// Marks the first OrchestratorStarted event as processed
orchestrationStart.IsProcessed = true;

// Marks the first OrchestratorStarted event as processed
orchestrationStart.IsProcessed = true;

var asyncResult = pwsh.BeginInvoke(outputBuffer);
// Finish initializing the Function invocation
powerShellServices.AddParameter(orchestrationBindingInfo.ParameterName, context);
powerShellServices.TracePipelineObject();

var (shouldStop, actions) =
orchestrationBindingInfo.Context.OrchestrationActionCollector.WaitForActions(asyncResult.AsyncWaitHandle);
var asyncResult = powerShellServices.BeginInvoke(outputBuffer);

if (shouldStop)
var (shouldStop, actions) =
orchestrationBindingInfo.Context.OrchestrationActionCollector.WaitForActions(asyncResult.AsyncWaitHandle);

if (shouldStop)
{
// The orchestration function should be stopped and restarted
powerShellServices.StopInvoke();
// return (Hashtable)orchestrationBindingInfo.Context.OrchestrationActionCollector.output;
return CreateOrchestrationResult(isDone: false, actions, output: null, context.CustomStatus);
}
else
{
try
{
// The orchestration function should be stopped and restarted
pwsh.StopInvoke();
return CreateOrchestrationResult(isDone: false, actions, output: null, context.CustomStatus);
// The orchestration function completed
powerShellServices.EndInvoke(asyncResult);
var result = CreateReturnValueFromFunctionOutput(outputBuffer);
return CreateOrchestrationResult(isDone: true, actions, output: result, context.CustomStatus);
}
else
catch (Exception e)
{
try
{
// The orchestration function completed
pwsh.EndInvoke(asyncResult);
var result = FunctionReturnValueBuilder.CreateReturnValueFromFunctionOutput(outputBuffer);
return CreateOrchestrationResult(isDone: true, actions, output: result, context.CustomStatus);
}
catch (Exception e)
{
// The orchestrator code has thrown an unhandled exception:
// this should be treated as an entire orchestration failure
throw new OrchestrationFailureException(actions, context.CustomStatus, e);
}
// The orchestrator code has thrown an unhandled exception:
// this should be treated as an entire orchestration failure
throw new OrchestrationFailureException(actions, context.CustomStatus, e);
}
}
finally
}

public static object CreateReturnValueFromFunctionOutput(IList<object> pipelineItems)
{
if (pipelineItems == null || pipelineItems.Count <= 0)
{
pwsh.ClearStreamsAndCommands();
return null;
}

return pipelineItems.Count == 1 ? pipelineItems[0] : pipelineItems.ToArray();
}

private static Hashtable CreateOrchestrationResult(
Expand All @@ -72,7 +112,12 @@ private static Hashtable CreateOrchestrationResult(
object customStatus)
{
var orchestrationMessage = new OrchestrationMessage(isDone, actions, output, customStatus);
return new Hashtable { { AzFunctionInfo.DollarReturn, orchestrationMessage } };
return new Hashtable { { "$return", orchestrationMessage } };
}

public void SetExternalInvoker(IExternalInvoker externalInvoker)
{
_externalInvoker = externalInvoker;
}
}
}
69 changes: 69 additions & 0 deletions src/DurableSDK/PowerShellExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

using System.Collections;
using System.Collections.ObjectModel;

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System.Management.Automation;

internal static class PowerShellExtensions
{
public static void InvokeAndClearCommands(this PowerShell pwsh)
{
try
{
pwsh.Invoke();
}
finally
{
pwsh.Streams.ClearStreams();
pwsh.Commands.Clear();
}
}

public static void InvokeAndClearCommands(this PowerShell pwsh, IEnumerable input)
{
try
{
pwsh.Invoke(input);
}
finally
{
pwsh.Streams.ClearStreams();
pwsh.Commands.Clear();
}
}

public static Collection<T> InvokeAndClearCommands<T>(this PowerShell pwsh)
{
try
{
var result = pwsh.Invoke<T>();
return result;
}
finally
{
pwsh.Streams.ClearStreams();
pwsh.Commands.Clear();
}
}

public static Collection<T> InvokeAndClearCommands<T>(this PowerShell pwsh, IEnumerable input)
{
try
{
var result = pwsh.Invoke<T>(input);
return result;
}
finally
{
pwsh.Streams.ClearStreams();
pwsh.Commands.Clear();
}
}
}
}
Loading