Skip to content

Commit

Permalink
Allow ExternalSDK to handle orchestration
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmrdavid committed Feb 3, 2022
1 parent 2fcbfae commit f43f350
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Commands
{
using System.Collections;
using System.Management.Automation;
using Microsoft.PowerShell.Commands;

/// <summary>
/// Set the orchestration context.
Expand Down
10 changes: 10 additions & 0 deletions src/DurableSDK/DurableTaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public void StopAndInitiateDurableTaskOrReplay(
}

completedHistoryEvent.IsProcessed = true;
context.IsReplaying = completedHistoryEvent.IsPlayed;

switch (completedHistoryEvent.EventType)
{
Expand All @@ -60,6 +61,13 @@ public void StopAndInitiateDurableTaskOrReplay(
output(eventResult);
}
break;
case HistoryEventType.EventRaised:
var eventRaisedResult = GetEventResult(completedHistoryEvent);
if (eventRaisedResult != null)
{
output(eventRaisedResult);
}
break;

case HistoryEventType.TaskFailed:
if (retryOptions == null)
Expand Down Expand Up @@ -129,6 +137,7 @@ public void WaitAll(
var allTasksCompleted = completedEvents.Count == tasksToWaitFor.Count;
if (allTasksCompleted)
{
context.IsReplaying = completedEvents[0].IsPlayed;
CurrentUtcDateTimeUpdater.UpdateCurrentUtcDateTime(context);

foreach (var completedHistoryEvent in completedEvents)
Expand Down Expand Up @@ -188,6 +197,7 @@ public void WaitAny(
var anyTaskCompleted = completedTasks.Count > 0;
if (anyTaskCompleted)
{
context.IsReplaying = context.History[firstCompletedHistoryEventIndex].IsPlayed;
CurrentUtcDateTimeUpdater.UpdateCurrentUtcDateTime(context);
// Return a reference to the first completed task
output(firstCompletedTask);
Expand Down
3 changes: 3 additions & 0 deletions src/DurableSDK/IOrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections;
using System.Management.Automation;

internal interface IOrchestrationInvoker
{
Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh);
void SetExternalInvoker(Action<PowerShell> externalInvoker);
}
}
1 change: 1 addition & 0 deletions src/DurableSDK/IPowerShellServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable

internal interface IPowerShellServices
{
PowerShell GetPowerShell();
void SetDurableClient(object durableClient);

void SetOrchestrationContext(OrchestrationContext orchestrationContext);
Expand Down
1 change: 1 addition & 0 deletions src/DurableSDK/OrchestrationActionCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
using System.Threading;

using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;
using Newtonsoft.Json;

internal class OrchestrationActionCollector
{
Expand Down
13 changes: 11 additions & 2 deletions src/DurableSDK/OrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ public class OrchestrationContext
public object Input { get; internal set; }

[DataMember]
internal string InstanceId { get; set; }
public string InstanceId { get; set; }

[DataMember]
internal string ParentInstanceId { get; set; }

[DataMember]
internal bool IsReplaying { get; set; }
public bool IsReplaying { get; set; }

[DataMember]
internal HistoryEvent[] History { get; set; }
Expand All @@ -35,6 +35,15 @@ public class OrchestrationContext

internal OrchestrationActionCollector OrchestrationActionCollector { get; } = new OrchestrationActionCollector();

internal object ExternalResult;
internal bool ExternalIsError;

internal void SetExternalResult(object result, bool isError)
{
this.ExternalResult = result;
this.ExternalIsError = isError;
}

internal object CustomStatus { get; set; }
}
}
30 changes: 24 additions & 6 deletions src/DurableSDK/OrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable

internal class OrchestrationInvoker : IOrchestrationInvoker
{
private Action<PowerShell> externalInvoker = null;

public Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh)
{
try
Expand All @@ -30,13 +32,23 @@ public Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowe

// Marks the first OrchestratorStarted event as processed
orchestrationStart.IsProcessed = true;

var useExternalSDK = externalInvoker != null;
if (useExternalSDK)
{
externalInvoker.Invoke(pwsh.GetPowerShell());
var result = orchestrationBindingInfo.Context.ExternalResult;
var isError = orchestrationBindingInfo.Context.ExternalIsError;
if (isError)
{
throw (Exception)result;
}
else
{
return (Hashtable)result;
}
}

// IDEA:
// This seems to be where the user-code is allowed to run.
// When using the new SDK, we'll want the user-code to send an `asyncResult`
// with a specific flag/signature that tells the worker to short-circuit
// its regular DF logic, and to return the value its been provided without further processing.
// All we need is to make the orchestrationBinding info viewable to the user-code. < This should be our next step
var asyncResult = pwsh.BeginInvoke(outputBuffer);

var (shouldStop, actions) =
Expand All @@ -46,6 +58,7 @@ public Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowe
{
// The orchestration function should be stopped and restarted
pwsh.StopInvoke();
// return (Hashtable)orchestrationBindingInfo.Context.OrchestrationActionCollector.output;
return CreateOrchestrationResult(isDone: false, actions, output: null, context.CustomStatus);
}
else
Expand Down Expand Up @@ -90,5 +103,10 @@ private static Hashtable CreateOrchestrationResult(
var orchestrationMessage = new OrchestrationMessage(isDone, actions, output, customStatus);
return new Hashtable { { "$return", orchestrationMessage } };
}

public void SetExternalInvoker(Action<PowerShell> externalInvoker)
{
this.externalInvoker = externalInvoker;
}
}
}
6 changes: 6 additions & 0 deletions src/DurableSDK/PowerShellServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ public PowerShellServices(PowerShell pwsh)
_pwsh = pwsh;
}

public PowerShell GetPowerShell()
{
return this._pwsh;
}

public void SetDurableClient(object durableClient)
{
_pwsh.AddCommand(SetFunctionInvocationContextCommand)
.AddParameter("DurableClient", durableClient)
.InvokeAndClearCommands();


_hasSetOrchestrationContext = true;
}

Expand Down
16 changes: 16 additions & 0 deletions src/DurableWorker/DurableController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ internal class DurableController
private readonly IPowerShellServices _powerShellServices;
private readonly IOrchestrationInvoker _orchestrationInvoker;
private OrchestrationBindingInfo _orchestrationBindingInfo;
private PowerShell pwsh;

public DurableController(
DurableFunctionInfo durableDurableFunctionInfo,
Expand All @@ -38,6 +39,7 @@ public DurableController(
new PowerShellServices(pwsh),
new OrchestrationInvoker())
{
this.pwsh = pwsh;
}

internal DurableController(
Expand Down Expand Up @@ -66,6 +68,20 @@ public void BeforeFunctionInvocation(IList<ParameterBinding> inputData)
{
_orchestrationBindingInfo = CreateOrchestrationBindingInfo(inputData);
_powerShellServices.SetOrchestrationContext(_orchestrationBindingInfo.Context);

// Bote: Cannot find the DurableSDK module here, somehow.
Collection<object> output2 = this.pwsh.AddCommand("Get-Module")
.InvokeAndClearCommands<object>();

var context = inputData[0];
Collection<Action<object>> output = this.pwsh.AddCommand("Set-BindingData")
.AddParameter("Input", context.Data.String)
.AddParameter("SetResult", (Action<object, bool>)_orchestrationBindingInfo.Context.SetExternalResult)
.InvokeAndClearCommands<Action<object>>();
if (output.Count() == 1)
{
this._orchestrationInvoker.SetExternalInvoker(output[0]);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

# Set aliases for cmdlets to export
Set-Alias -Name Wait-ActivityFunction -Value Wait-DurableTask
Set-Alias -Name Invoke-ActivityFunction -Value Invoke-DurableActivity
# Set-Alias -Name Invoke-ActivityFunction -Value Invoke-DurableActivity
Set-Alias -Name New-OrchestrationCheckStatusResponse -Value New-DurableOrchestrationCheckStatusResponse
Set-Alias -Name Start-NewOrchestration -Value Start-DurableOrchestration

Expand Down
2 changes: 1 addition & 1 deletion src/PowerShell/PowerShellManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ public Hashtable InvokeFunction(
FunctionInvocationPerformanceStopwatch stopwatch)
{
var outputBindings = FunctionMetadata.GetOutputBindingHashtable(_pwsh.Runspace.InstanceId);

var durableController = new DurableController(functionInfo.DurableFunctionInfo, _pwsh);

try
Expand All @@ -227,6 +226,7 @@ public Hashtable InvokeFunction(

try
{

return durableController.TryInvokeOrchestrationFunction(out var result)
? result
: InvokeNonOrchestrationFunction(durableController, outputBindings);
Expand Down

0 comments on commit f43f350

Please sign in to comment.