-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prepare Worker for External DF SDK support (and a few DF bug fixes) #746
Changes from 37 commits
4423d13
9d73110
2fcbfae
f43f350
092e333
506e4f7
7a38ec8
9b2cf42
47caee9
1010f97
d916d58
946ea37
64b6c49
9e7cf96
92f113e
e5843ce
2ee6b96
ae28f6e
94f995d
96f6ee7
1cb30b8
ff5102e
b8c6a6a
baae3cc
6feb7fe
94d83ae
d11a561
0e0c077
1c4a7ae
dc0ed38
3bd961c
caa5176
932e124
3d1a7b4
adec8fa
8d5d9bf
8351653
7232e58
d9b3baf
edf3011
aca4c0b
e5ca2ba
f69b146
4387799
a9a0a6e
12ccdee
5f862e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
// | ||
// Copyright (c) Microsoft. All rights reserved. | ||
// Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
// | ||
|
||
#pragma warning disable 1591 // Missing XML comment for publicly visible type or member 'member' | ||
|
||
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Commands | ||
{ | ||
using System.Collections; | ||
using System.Management.Automation; | ||
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks; | ||
|
||
[Cmdlet("Get", "DurableTaskResult")] | ||
public class GetDurableTaskResultCommand : PSCmdlet | ||
{ | ||
[Parameter(Mandatory = true)] | ||
[ValidateNotNull] | ||
public DurableTask[] Task { get; set; } | ||
|
||
private readonly DurableTaskHandler _durableTaskHandler = new DurableTaskHandler(); | ||
|
||
protected override void EndProcessing() | ||
{ | ||
var privateData = (Hashtable)MyInvocation.MyCommand.Module.PrivateData; | ||
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey]; | ||
|
||
_durableTaskHandler.GetTaskResult(Task, context, WriteObject); | ||
} | ||
|
||
protected override void StopProcessing() | ||
{ | ||
_durableTaskHandler.Stop(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,10 +43,8 @@ protected override void EndProcessing() | |
{ | ||
var privateData = (Hashtable)MyInvocation.MyCommand.Module.PrivateData; | ||
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey]; | ||
var loadedFunctions = FunctionLoader.GetLoadedFunctions(); | ||
|
||
var task = new ActivityInvocationTask(FunctionName, Input, RetryOptions); | ||
ActivityInvocationTask.ValidateTask(task, loadedFunctions); | ||
Comment on lines
-46
to
-49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two were removed since they're incompatible with the external SDK. In addition, other DF SDKs don't perform this validation steps. If we want them, they ought to exist outside the DF SDKs. I do agree that these are useful validations to have though, but if we perform these checks in the DF SDK components, then we're creating a strong dependency with worker-specific utilities. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The intent of |
||
|
||
_durableTaskHandler.StopAndInitiateDurableTaskOrReplay( | ||
task, context, NoWait.IsPresent, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,10 +6,12 @@ | |
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable | ||
{ | ||
using System; | ||
using System.Collections; | ||
using System.Collections.Generic; | ||
using System.Management.Automation; | ||
using System.Threading; | ||
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks; | ||
using Utility; | ||
using Microsoft.PowerShell.Commands; | ||
|
||
internal class DurableTaskHandler | ||
{ | ||
|
@@ -47,6 +49,7 @@ public void StopAndInitiateDurableTaskOrReplay( | |
} | ||
|
||
completedHistoryEvent.IsProcessed = true; | ||
context.IsReplaying = completedHistoryEvent.IsPlayed; | ||
|
||
switch (completedHistoryEvent.EventType) | ||
{ | ||
|
@@ -57,6 +60,13 @@ public void StopAndInitiateDurableTaskOrReplay( | |
output(eventResult); | ||
} | ||
break; | ||
case HistoryEventType.EventRaised: | ||
var eventRaisedResult = GetEventResult(completedHistoryEvent); | ||
if (eventRaisedResult != null) | ||
{ | ||
output(eventRaisedResult); | ||
} | ||
break; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This allows |
||
|
||
case HistoryEventType.TaskFailed: | ||
if (retryOptions == null) | ||
|
@@ -76,7 +86,7 @@ public void StopAndInitiateDurableTaskOrReplay( | |
retryOptions.MaxNumberOfAttempts, | ||
onSuccess: | ||
result => { | ||
output(TypeExtensions.ConvertFromJson(result)); | ||
output(ConvertFromJson(result)); | ||
}, | ||
onFailure); | ||
|
||
|
@@ -126,6 +136,7 @@ public void WaitAll( | |
var allTasksCompleted = completedEvents.Count == tasksToWaitFor.Count; | ||
if (allTasksCompleted) | ||
{ | ||
context.IsReplaying = completedEvents.Count == 0 ? false : completedEvents[0].IsPlayed; | ||
CurrentUtcDateTimeUpdater.UpdateCurrentUtcDateTime(context); | ||
|
||
foreach (var completedHistoryEvent in completedEvents) | ||
|
@@ -164,6 +175,7 @@ public void WaitAny( | |
if (scheduledHistoryEvent != null) | ||
{ | ||
scheduledHistoryEvent.IsProcessed = true; | ||
scheduledHistoryEvent.IsPlayed = true; | ||
} | ||
|
||
if (completedHistoryEvent != null) | ||
|
@@ -179,12 +191,14 @@ public void WaitAny( | |
} | ||
|
||
completedHistoryEvent.IsProcessed = true; | ||
completedHistoryEvent.IsPlayed = true; | ||
} | ||
} | ||
|
||
var anyTaskCompleted = completedTasks.Count > 0; | ||
if (anyTaskCompleted) | ||
{ | ||
context.IsReplaying = context.History[firstCompletedHistoryEventIndex].IsPlayed; | ||
CurrentUtcDateTimeUpdater.UpdateCurrentUtcDateTime(context); | ||
// Return a reference to the first completed task | ||
output(firstCompletedTask); | ||
|
@@ -195,6 +209,21 @@ public void WaitAny( | |
} | ||
} | ||
|
||
public void GetTaskResult( | ||
IReadOnlyCollection<DurableTask> tasksToQueryResultFor, | ||
OrchestrationContext context, | ||
Action<object> output) | ||
{ | ||
foreach (var task in tasksToQueryResultFor) { | ||
var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context, true); | ||
var processedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent, true); | ||
if (processedHistoryEvent != null) | ||
{ | ||
output(GetEventResult(processedHistoryEvent)); | ||
} | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Part of the community-contributed logic for "Get-TaskResult". Again, this could use some caching, but I'd rather optimize that later since this is feature is urgently missing. |
||
public void Stop() | ||
{ | ||
_waitForStop.Set(); | ||
|
@@ -206,15 +235,41 @@ private static object GetEventResult(HistoryEvent historyEvent) | |
|
||
if (historyEvent.EventType == HistoryEventType.TaskCompleted) | ||
{ | ||
return TypeExtensions.ConvertFromJson(historyEvent.Result); | ||
return ConvertFromJson(historyEvent.Result); | ||
} | ||
else if (historyEvent.EventType == HistoryEventType.EventRaised) | ||
{ | ||
return TypeExtensions.ConvertFromJson(historyEvent.Input); | ||
return ConvertFromJson(historyEvent.Input); | ||
} | ||
return null; | ||
} | ||
|
||
public static object ConvertFromJson(string json) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This used to exist in |
||
{ | ||
object retObj = JsonObject.ConvertFromJson(json, returnHashtable: true, error: out _); | ||
|
||
if (retObj is PSObject psObj) | ||
{ | ||
retObj = psObj.BaseObject; | ||
} | ||
|
||
if (retObj is Hashtable hashtable) | ||
{ | ||
try | ||
{ | ||
// ConvertFromJson returns case-sensitive Hashtable by design -- JSON may contain keys that only differ in case. | ||
// We try casting the Hashtable to a case-insensitive one, but if that fails, we keep using the original one. | ||
retObj = new Hashtable(hashtable, StringComparer.OrdinalIgnoreCase); | ||
} | ||
catch | ||
{ | ||
retObj = hashtable; | ||
} | ||
} | ||
|
||
return retObj; | ||
} | ||
|
||
private void InitiateAndWaitForStop(OrchestrationContext context) | ||
{ | ||
context.OrchestrationActionCollector.Stop(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the community-contributed CmdLet that allows users to get the result of an already-completed Task. In other PLs, users can easily do this by re-await/re-yielding that Task, but the PS programming model doesn't have that. The implementation of this CmdLet isn't as efficient as it could be, but I don't think that's something we can't address after merging it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This addition seems to be independent from the main purpose of the PR. If this is not very difficult, I recommend separating this into its own PR. Unless this is indeed too difficult.