Skip to content

Commit

Permalink
Add support for distinguishing which Parallel/Pick parallelism branch…
Browse files Browse the repository at this point in the history
… an activity is executing in (#307)

This feature introduces a context.GetCurrentParallelId() queryable from an activity,
which should identify a "parallelism" branch on which the activity executes.

Only containers which schedule concurrently (Parallel, ParallelForEach, Pick)
would induce a change of the background parallel id.

The initial parallel id, before using the above containers, is null.
  • Loading branch information
gabriela-lungu-uip authored Mar 20, 2024
1 parent 3ce0de1 commit 681464d
Show file tree
Hide file tree
Showing 14 changed files with 462 additions and 5 deletions.
1 change: 1 addition & 0 deletions src/Test/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<Project>
<PropertyGroup>
<TargetFrameworks>net6.0;net6.0-windows</TargetFrameworks>
<LangVersion>latest</LangVersion>
</PropertyGroup>
<Import Project="$(SolutionDir)Directory.Build.props" />
<ItemGroup>
Expand Down
145 changes: 145 additions & 0 deletions src/Test/TestCases.Runtime/ParallelTrackingExtensionsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
using Shouldly;
using System;
using System.Activities;
using System.Activities.Statements;
using System.Collections.Generic;
using System.Linq;
using UiPath.Workflow.Runtime.ParallelTracking;
using WorkflowApplicationTestExtensions;
using Xunit;

namespace TestCases.Runtime
{
public class ParallelTrackingExtensionsTests
{
[Fact]
public void ParallelActivity()
{
string nesting1Id = null;
string nesting2Branch1 = null;
string nesting2Branch2 = null;

Run(Sequence
(
new ValidateParallelId(id => id.ShouldBeNull()), // branch ""
Parallel
(
Sequence
(
// branch "{Guid}"
new ValidateParallelId(id => ValidateId(nesting1Id = id, expectedNesting: 1)),
Parallel
(
// branches "{Guid}.{Guid}"
new ValidateParallelId(id => ValidateId(nesting2Branch1 = id, expectedNesting: 2, shouldStartWith: nesting1Id)),
new ValidateParallelId(id => ValidateId(nesting2Branch2 = id, expectedNesting: 2, shouldStartWith: nesting1Id))
),
new ValidateParallelId(id => id.ShouldBe(nesting1Id))
)
),
new ValidateParallelId(id => id.ShouldBeNull())
));

nesting2Branch1.ShouldNotBe(nesting2Branch2);
}

[Fact]
public void ParallelForEachActivity()
{
var nesting1Ids = new HashSet<string>();
var nesting2Ids = new HashSet<string>();
var nesting1IdsAfter = new HashSet<string>();

Run(ParallelForEach(2, Sequence
(
new ValidateParallelId(id => nesting1Ids.Add(id).ShouldBeTrue()),
ParallelForEach(2, new ValidateParallelId(id => nesting2Ids.Add(id).ShouldBeTrue())),
new ValidateParallelId(id => nesting1IdsAfter.Add(id).ShouldBeTrue())
)));

nesting1IdsAfter.ShouldBeEquivalentTo(nesting1Ids);

// Nesting 2 ids should start with nesting 1 ids (2 counts for each nesting 1 id)
var nesting2Prefixes = nesting2Ids
.Select(id => id.Split('.')[0])
.GroupBy(id => id);
nesting2Prefixes.Select(group => group.Key).ShouldBe(nesting1Ids);
nesting2Prefixes.ShouldAllBe(group => group.Count() == 2);
}

[Fact]
public void PickActivity()
{
string trigger1Id = null;
string trigger2Id = null;
Run(Pick
(
new PickBranch
{
Trigger = new ValidateParallelId(id => ValidateId(trigger1Id = id, expectedNesting: 1)),
// only one Action should execute; the one that executes shouldn't use a new ParallelId
Action = new ValidateParallelId(id => id.ShouldBeNull())
},
new PickBranch
{
Trigger = new ValidateParallelId(id => ValidateId(trigger2Id = id, expectedNesting: 1)),
Action = new ValidateParallelId(id => id.ShouldBeNull())
}
));
trigger1Id.ShouldNotBe(trigger2Id);
}

private static void Run(Activity activity) =>
new WorkflowApplication(activity).RunUntilCompletion();

private static void ValidateId(string id, int expectedNesting, string shouldStartWith = null)
{
if (shouldStartWith is not null)
{
id.ShouldStartWith(shouldStartWith);
}
var parts = id.Split('.');
parts.Length.ShouldBe(expectedNesting);
parts.All(part => Guid.TryParse(part, out _)).ShouldBeTrue();
}

private static Sequence Sequence(params Activity[] activities)
{
var sequence = new Sequence();
activities.ToList().ForEach(sequence.Activities.Add);
return sequence;
}

private static Parallel Parallel(params Activity[] branches)
{
var parallel = new Parallel();
branches.ToList().ForEach(parallel.Branches.Add);
return parallel;
}

private static Pick Pick(params PickBranch[] branches)
{
var pick = new Pick();
branches.ToList().ForEach(pick.Branches.Add);
return pick;
}

private static ParallelForEach<int> ParallelForEach(int iterations, Activity body) => new()
{
Values = new InArgument<IEnumerable<int>>(_ => Enumerable.Range(0, iterations).ToArray()),
Body = new ActivityAction<int> { Handler = body }
};
}

public class ValidateParallelId : Activity
{
public ValidateParallelId(Action<string> validator) =>
Implementation = () => new SuspendingWrapper(new ValidateParallelIdCore(validator));
}

public class ValidateParallelIdCore(Action<string> validator) : CodeActivity
{
protected override void Execute(CodeActivityContext context) =>
validator(context.GetCurrentParallelBranchId());
}
}
1 change: 1 addition & 0 deletions src/Test/TestCases.Runtime/TestCases.Runtime.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
<ItemGroup>
<ProjectReference Include="../JsonFileInstanceStore/JsonFileInstanceStore.csproj" />
<ProjectReference Include="../TestObjects/TestObjects.csproj" />
<ProjectReference Include="../WorkflowApplicationTestExtensions/WorkflowApplicationTestExtensions.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.Activities;
using System.Activities.Hosting;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace WorkflowApplicationTestExtensions
{
/// <summary>
/// Activity that induces Idle for a few milliseconds but not PersistableIdle.
/// This is similar to UiPath asynchronous in-process activities.
/// </summary>
public class NoPersistAsyncActivity : NativeActivity
{
private readonly Variable<NoPersistHandle> _noPersist = new();

protected override bool CanInduceIdle => true;

protected override void CacheMetadata(NativeActivityMetadata metadata)
{
metadata.AddImplementationVariable(_noPersist);
metadata.AddDefaultExtensionProvider(() => new BookmarkResumer());
base.CacheMetadata(metadata);
}

protected override void Execute(NativeActivityContext context)
{
_noPersist.Get(context).Enter(context);
context.GetExtension<BookmarkResumer>().ResumeSoon(context.CreateBookmark());
}
}

public class BookmarkResumer : IWorkflowInstanceExtension
{
private WorkflowInstanceProxy _instance;
public IEnumerable<object> GetAdditionalExtensions() => [];
public void SetInstance(WorkflowInstanceProxy instance) => _instance = instance;
public void ResumeSoon(Bookmark bookmark) => Task.Delay(10).ContinueWith(_ =>
{
_instance.BeginResumeBookmark(bookmark, null, null, null);
});
}
}
68 changes: 68 additions & 0 deletions src/Test/WorkflowApplicationTestExtensions/SuspendingWrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using System.Activities;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;

namespace WorkflowApplicationTestExtensions
{
/// <summary>
/// Wrapper over one/multiple sequential activities.
/// Between scheduling the activity/activities, it induces PersistableIdle
/// by creating bookmarks.
/// The idea is to induce unload/load as much as possible to test persistence
/// serialization/deserialization.
/// When using <see cref="WorkflowApplicationTestExtensions"/>, the bookmarks
/// can be automatically resumed and workflow continued transparently until
/// completion.
/// </summary>
public class SuspendingWrapper : NativeActivity
{
private readonly Variable<int> _nextIndexToExecute = new();
public List<Activity> Activities { get; }
protected override bool CanInduceIdle => true;

public SuspendingWrapper(IEnumerable<Activity> activities)
{
Activities = activities.ToList();
}

public SuspendingWrapper(Activity activity) : this([activity])
{
}

public SuspendingWrapper() : this([])
{
}

protected override void CacheMetadata(NativeActivityMetadata metadata)
{
metadata.AddImplementationVariable(_nextIndexToExecute);
base.CacheMetadata(metadata);
}

protected override void Execute(NativeActivityContext context) => ExecuteNext(context);

private void OnChildCompleted(NativeActivityContext context, ActivityInstance completedInstance) =>
ExecuteNext(context);

private void OnChildFaulted(NativeActivityFaultContext faultContext, Exception propagatedException, ActivityInstance propagatedFrom) =>
ExceptionDispatchInfo.Capture(propagatedException).Throw();

private void ExecuteNext(NativeActivityContext context) =>
context.CreateBookmark(
$"{WorkflowApplicationTestExtensions.AutoResumedBookmarkNamePrefix}{Guid.NewGuid()}",
AfterResume);

private void AfterResume(NativeActivityContext context, Bookmark bookmark, object value)
{
var nextIndex = _nextIndexToExecute.Get(context);
if (nextIndex == Activities.Count)
{
return;
}
_nextIndexToExecute.Set(context, nextIndex + 1);
context.ScheduleActivity(Activities[nextIndex], OnChildCompleted, OnChildFaulted);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using JsonFileInstanceStore;
using System;
using System.Activities;
using System.Diagnostics;
using System.Threading.Tasks;
using StringToObject = System.Collections.Generic.IDictionary<string, object>;

namespace WorkflowApplicationTestExtensions
{
public static class WorkflowApplicationTestExtensions
{
public const string AutoResumedBookmarkNamePrefix = "AutoResumedBookmark_";

public record WorkflowApplicationResult(StringToObject Outputs, int PersistenceCount);

/// <summary>
/// Simple API to wait for the workflow to complete or propagate to the caller any error.
/// Also, when PersistableIdle, will automatically Unload, Load, resume some bookmarks
/// (those named "AutoResumedBookmark_...") and continue execution.
/// </summary>
public static WorkflowApplicationResult RunUntilCompletion(this WorkflowApplication application)
{
var persistenceCount = 0;
var output = new TaskCompletionSource<WorkflowApplicationResult>();
application.Completed += (WorkflowApplicationCompletedEventArgs args) =>
{
if (args.TerminationException is { } ex)
{
output.TrySetException(ex);
}
if (args.CompletionState == ActivityInstanceState.Canceled)
{
throw new OperationCanceledException("Workflow canceled.");
}
output.TrySetResult(new(args.Outputs, persistenceCount));
};

application.Aborted += args => output.TrySetException(args.Reason);

application.InstanceStore = new FileInstanceStore(Environment.CurrentDirectory);
application.PersistableIdle += (WorkflowApplicationIdleEventArgs args) =>
{
Debug.WriteLine("PersistableIdle");
var bookmarks = args.Bookmarks;
Task.Delay(100).ContinueWith(_ =>
{
try
{
if (++persistenceCount > 100)
{
throw new Exception("Persisting too many times, aborting test.");
}
application = CloneWorkflowApplication(application);
application.Load(args.InstanceId);
foreach (var bookmark in bookmarks)
{
application.ResumeBookmark(new Bookmark(bookmark.BookmarkName), null);
}
}
catch (Exception ex)
{
output.TrySetException(ex);
}
});
return PersistableIdleAction.Unload;
};

application.BeginRun(null, null);

try
{
output.Task.Wait(TimeSpan.FromSeconds(15));
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
}
return output.Task.GetAwaiter().GetResult();
}

private static WorkflowApplication CloneWorkflowApplication(WorkflowApplication application)
{
var clone = new WorkflowApplication(application.WorkflowDefinition, application.DefinitionIdentity)
{
Aborted = application.Aborted,
Completed = application.Completed,
PersistableIdle = application.PersistableIdle,
InstanceStore = application.InstanceStore,
};
foreach (var extension in application.Extensions.GetAllSingletonExtensions())
{
clone.Extensions.Add(extension);
}
return clone;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\JsonFileInstanceStore\JsonFileInstanceStore.csproj" />
</ItemGroup>
</Project>
Loading

0 comments on commit 681464d

Please sign in to comment.