Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,35 @@ public void AddStandardStrategies()
Add(new WorkflowExecutedWorkflowStrategy());
Add(new ActivityExecutingWorkflowStrategy());
Add(new ActivityExecutedWorkflowStrategy());

// Activity commit strategies.
Add(new CommitAlwaysActivityStrategy());
Add(new CommitNeverActivityStrategy());
Add(new ExecutingActivityStrategy());
Add(new ExecutedActivityStrategy());
}

public void Add(IWorkflowCommitStrategy strategy)
{
var registration = ObjectRegistrationFactory.Describe(strategy);
Add(registration);
}

public void Add(string displayName, IWorkflowCommitStrategy strategy)
{
var registration = ObjectRegistrationFactory.Describe(strategy);
registration.Metadata.DisplayName = displayName;
Add(registration);
}

public void Add(string displayName, string description, IWorkflowCommitStrategy strategy)
{
var registration = ObjectRegistrationFactory.Describe(strategy);
registration.Metadata.DisplayName = displayName;
registration.Metadata.Description = description;
Add(registration);
}

public void Add(string name, string displayName, string description, IWorkflowCommitStrategy strategy)
{
var registration = ObjectRegistrationFactory.Describe(strategy);
Expand All @@ -53,33 +53,33 @@ public void Add(string name, string displayName, string description, IWorkflowCo
registration.Metadata.Description = description;
Add(registration);
}

public void Add(WorkflowCommitStrategyRegistration registration)
{
Services.Configure<CommitStateOptions>(options => options.WorkflowCommitStrategies[registration.Metadata.Name] = registration);
}

public void Add(IActivityCommitStrategy strategy)
{
var registration = ObjectRegistrationFactory.Describe(strategy);
Add(registration);
}

public void Add(string displayName, IActivityCommitStrategy strategy)
{
var registration = ObjectRegistrationFactory.Describe(strategy);
registration.Metadata.DisplayName = displayName;
Add(registration);
}

public void Add(string displayName, string description, IActivityCommitStrategy strategy)
{
var registration = ObjectRegistrationFactory.Describe(strategy);
registration.Metadata.DisplayName = displayName;
registration.Metadata.Description = description;
Add(registration);
}

public void Add(string name, string displayName, string description, IActivityCommitStrategy strategy)
{
var registration = ObjectRegistrationFactory.Describe(strategy);
Expand All @@ -88,12 +88,32 @@ public void Add(string name, string displayName, string description, IActivityCo
registration.Metadata.Description = description;
Add(registration);
}

public void Add(ActivityCommitStrategyRegistration registration)
{
Services.Configure<CommitStateOptions>(options => options.ActivityCommitStrategies[registration.Metadata.Name] = registration);
}


/// <summary>
/// Sets the specified workflow commit strategy as the global default.
/// The strategy will not be added to the registry and will only serve as a fallback when workflows do not specify their own strategy.
/// </summary>
/// <param name="strategy">The workflow commit strategy instance to use as the default.</param>
public void SetDefaultWorkflowCommitStrategy(IWorkflowCommitStrategy strategy)
{
Services.Configure<CommitStateOptions>(options => options.DefaultWorkflowCommitStrategy = strategy);
}

/// <summary>
/// Sets the specified activity commit strategy as the global default.
/// The strategy will not be added to the registry and will only serve as a fallback when activities do not specify their own strategy.
/// </summary>
/// <param name="strategy">The activity commit strategy instance to use as the default.</param>
public void SetDefaultActivityCommitStrategy(IActivityCommitStrategy strategy)
{
Services.Configure<CommitStateOptions>(options => options.DefaultActivityCommitStrategy = strategy);
}

public override void Apply()
{
Services.AddSingleton<ICommitStrategyRegistry, DefaultCommitStrategyRegistry>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,46 @@
// ReSharper disable once CheckNamespace
namespace Elsa.Extensions;

/// <summary>
/// Provides extension methods for configuring commit strategies on <see cref="WorkflowsFeature"/>.
/// </summary>
public static class WorkflowsFeatureCommitStateExtensions
{
/// <summary>
/// Configures commit strategies for workflows.
/// </summary>
/// <param name="workflowsFeature">The workflows feature.</param>
/// <param name="configure">An optional configuration delegate for the commit strategies feature.</param>
/// <returns>The workflows feature for chaining.</returns>
public static WorkflowsFeature UseCommitStrategies(this WorkflowsFeature workflowsFeature, Action<CommitStrategiesFeature>? configure = null)
{
workflowsFeature.Module.Use(configure);
return workflowsFeature;
}


/// <summary>
/// Sets the specified workflow commit strategy as the global default for all workflows that do not specify their own strategy.
/// The strategy will not be added to the registry and serves only as a fallback.
/// </summary>
/// <param name="workflowsFeature">The workflows feature.</param>
/// <param name="strategy">The workflow commit strategy instance to use as the default.</param>
/// <returns>The workflows feature for chaining.</returns>
public static WorkflowsFeature WithDefaultWorkflowCommitStrategy(this WorkflowsFeature workflowsFeature, IWorkflowCommitStrategy strategy)
{
workflowsFeature.Module.Use<CommitStrategiesFeature>(feature => feature.SetDefaultWorkflowCommitStrategy(strategy));
return workflowsFeature;
}

/// <summary>
/// Sets the specified activity commit strategy as the global default for all activities that do not specify their own strategy.
/// The strategy will not be added to the registry and serves only as a fallback.
/// </summary>
/// <param name="workflowsFeature">The workflows feature.</param>
/// <param name="strategy">The activity commit strategy instance to use as the default.</param>
/// <returns>The workflows feature for chaining.</returns>
public static WorkflowsFeature WithDefaultActivityCommitStrategy(this WorkflowsFeature workflowsFeature, IActivityCommitStrategy strategy)
{
workflowsFeature.Module.Use<CommitStrategiesFeature>(feature => feature.SetDefaultActivityCommitStrategy(strategy));
return workflowsFeature;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,29 @@
namespace Elsa.Workflows.CommitStates;

/// <summary>
/// Configuration options for commit state strategies.
/// </summary>
public class CommitStateOptions
{
/// <summary>
/// Gets or sets the workflow commit strategies.
/// </summary>
public IDictionary<string, WorkflowCommitStrategyRegistration> WorkflowCommitStrategies { get; set; } = new Dictionary<string, WorkflowCommitStrategyRegistration>();

/// <summary>
/// Gets or sets the activity commit strategies.
/// </summary>
public IDictionary<string, ActivityCommitStrategyRegistration> ActivityCommitStrategies { get; set; } = new Dictionary<string, ActivityCommitStrategyRegistration>();

/// <summary>
/// Gets or sets the default workflow commit strategy instance to use when a workflow does not specify its own.
/// This strategy is not added to the registry and serves only as a fallback.
/// </summary>
public IWorkflowCommitStrategy? DefaultWorkflowCommitStrategy { get; set; }

/// <summary>
/// Gets or sets the default activity commit strategy instance to use when an activity does not specify its own.
/// This strategy is not added to the registry and serves only as a fallback.
/// </summary>
public IActivityCommitStrategy? DefaultActivityCommitStrategy { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Elsa.Workflows.CommitStates;
using Elsa.Workflows.Pipelines.ActivityExecution;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Elsa.Workflows.Middleware.Activities;

Expand All @@ -22,11 +23,11 @@ public static class ActivityInvokerMiddlewareExtensions
/// <summary>
/// A default activity execution middleware component that evaluates the current activity's properties, executes the activity and adds any produced bookmarks to the workflow execution context.
/// </summary>
public class DefaultActivityInvokerMiddleware(ActivityMiddlewareDelegate next, ICommitStrategyRegistry commitStrategyRegistry, ILogger<DefaultActivityInvokerMiddleware> logger)
public class DefaultActivityInvokerMiddleware(ActivityMiddlewareDelegate next, ICommitStrategyRegistry commitStrategyRegistry, IOptions<CommitStateOptions> commitStateOptions, ILogger<DefaultActivityInvokerMiddleware> logger)
: IActivityExecutionMiddleware
{
private static readonly MethodInfo ExecuteAsyncMethodInfo = typeof(IActivity).GetMethod(nameof(IActivity.ExecuteAsync))!;

/// <inheritdoc />
public async ValueTask InvokeAsync(ActivityExecutionContext context)
{
Expand Down Expand Up @@ -65,7 +66,7 @@ public async ValueTask InvokeAsync(ActivityExecutionContext context)

// Execute activity.
await ExecuteActivityAsync(context);

var currentActivityStatus = context.Status;
var activityDidComplete = previousActivityStatus != ActivityStatus.Completed && currentActivityStatus == ActivityStatus.Completed;

Expand All @@ -86,7 +87,7 @@ public async ValueTask InvokeAsync(ActivityExecutionContext context)

// Invoke next middleware.
await next(context);

// If the activity completed, send a notification.
if (activityDidComplete)
{
Expand All @@ -105,7 +106,7 @@ public async ValueTask InvokeAsync(ActivityExecutionContext context)
/// </summary>
protected virtual async ValueTask ExecuteActivityAsync(ActivityExecutionContext context)
{
var executeDelegate = context.WorkflowExecutionContext.ExecuteDelegate
var executeDelegate = context.WorkflowExecutionContext.ExecuteDelegate
?? (ExecuteActivityDelegate)Delegate.CreateDelegate(typeof(ExecuteActivityDelegate), context.Activity, ExecuteAsyncMethodInfo);

await executeDelegate(context);
Expand All @@ -129,7 +130,11 @@ private async Task EvaluateInputPropertiesAsync(ActivityExecutionContext context
private bool ShouldCommit(ActivityExecutionContext context, ActivityLifetimeEvent lifetimeEvent)
{
var strategyName = context.Activity.GetCommitStrategy();
var strategy = string.IsNullOrWhiteSpace(strategyName) ? null : commitStrategyRegistry.FindActivityStrategy(strategyName);

IActivityCommitStrategy? strategy = !string.IsNullOrWhiteSpace(strategyName)
? commitStrategyRegistry.FindActivityStrategy(strategyName)
: commitStateOptions.Value.DefaultActivityCommitStrategy;

var commitAction = CommitAction.Default;

if (strategy != null)
Expand All @@ -147,7 +152,10 @@ private bool ShouldCommit(ActivityExecutionContext context, ActivityLifetimeEven
case CommitAction.Default:
{
var workflowStrategyName = context.WorkflowExecutionContext.Workflow.Options.CommitStrategyName;
var workflowStrategy = string.IsNullOrWhiteSpace(workflowStrategyName) ? null : commitStrategyRegistry.FindWorkflowStrategy(workflowStrategyName);

IWorkflowCommitStrategy? workflowStrategy = !string.IsNullOrWhiteSpace(workflowStrategyName)
? commitStrategyRegistry.FindWorkflowStrategy(workflowStrategyName)
: commitStateOptions.Value.DefaultWorkflowCommitStrategy;

if (workflowStrategy == null)
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Elsa.Workflows.Models;
using Elsa.Workflows.Options;
using Elsa.Workflows.Pipelines.WorkflowExecution;
using Microsoft.Extensions.Options;

namespace Elsa.Workflows.Middleware.Workflows;

Expand All @@ -20,7 +21,7 @@ public static class UseActivitySchedulerMiddlewareExtensions
/// <summary>
/// A workflow execution middleware component that executes scheduled work items.
/// </summary>
public class DefaultActivitySchedulerMiddleware(WorkflowMiddlewareDelegate next, IActivityInvoker activityInvoker, ICommitStrategyRegistry commitStrategyRegistry) : WorkflowExecutionMiddleware(next)
public class DefaultActivitySchedulerMiddleware(WorkflowMiddlewareDelegate next, IActivityInvoker activityInvoker, ICommitStrategyRegistry commitStrategyRegistry, IOptions<CommitStateOptions> commitStateOptions) : WorkflowExecutionMiddleware(next)
{
/// <inheritdoc />
public override async ValueTask InvokeAsync(WorkflowExecutionContext context)
Expand All @@ -29,19 +30,19 @@ public override async ValueTask InvokeAsync(WorkflowExecutionContext context)

context.TransitionTo(WorkflowSubStatus.Executing);
await ConditionallyCommitStateAsync(context, WorkflowLifetimeEvent.WorkflowExecuting);

while (scheduler.HasAny)
{
// Do not start a workflow if cancellation has been requested.
if (context.CancellationToken.IsCancellationRequested)
break;

var currentWorkItem = scheduler.Take();
await ExecuteWorkItemAsync(context, currentWorkItem);
}

await Next(context);

if (context.Status == WorkflowStatus.Running)
context.TransitionTo(context.AllActivitiesCompleted() ? WorkflowSubStatus.Finished : WorkflowSubStatus.Suspended);
}
Expand All @@ -59,18 +60,20 @@ private async Task ExecuteWorkItemAsync(WorkflowExecutionContext context, Activi

await activityInvoker.InvokeAsync(context, workItem.Activity, options);
}

private async Task ConditionallyCommitStateAsync(WorkflowExecutionContext context, WorkflowLifetimeEvent lifetimeEvent)
{
var strategyName = context.Workflow.Options.CommitStrategyName;
var strategy = string.IsNullOrWhiteSpace(strategyName) ? null : commitStrategyRegistry.FindWorkflowStrategy(strategyName);

if(strategy == null)
IWorkflowCommitStrategy? strategy = !string.IsNullOrWhiteSpace(strategyName)
? commitStrategyRegistry.FindWorkflowStrategy(strategyName)
: commitStateOptions.Value.DefaultWorkflowCommitStrategy;

if (strategy == null)
return;

var strategyContext = new WorkflowCommitStateStrategyContext(context, lifetimeEvent);
var commitAction = strategy.ShouldCommit(strategyContext);

if (commitAction is CommitAction.Commit)
await context.CommitAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Elsa.Workflows.Runtime.Stimuli;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Elsa.Workflows.Runtime.Middleware.Activities;

Expand All @@ -24,8 +25,9 @@ public class BackgroundActivityInvokerMiddleware(
IIdentityGenerator identityGenerator,
IBackgroundActivityScheduler backgroundActivityScheduler,
ICommitStrategyRegistry commitStrategyRegistry,
IMediator mediator)
: DefaultActivityInvokerMiddleware(next, commitStrategyRegistry, logger)
IMediator mediator,
IOptions<CommitStateOptions> commitStateOptions)
: DefaultActivityInvokerMiddleware(next, commitStrategyRegistry, commitStateOptions, logger)
{
internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activityNodeId}";
internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{activityNodeId}";
Expand Down Expand Up @@ -129,7 +131,7 @@ private static bool GetTaskRunAsynchronously(ActivityExecutionContext context)
}

private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.ContainsKey(BackgroundActivityExecutionContextExtensions.IsBackgroundExecution);

/// <summary>
/// If the input contains captured output from the background activity invoker, apply that to the execution context.
/// </summary>
Expand Down Expand Up @@ -183,7 +185,7 @@ private void CaptureBookmarkData(ActivityExecutionContext context)

context.WorkflowExecutionContext.Properties.Remove(bookmarksKey);
}

private void CapturePropertiesIfAny(ActivityExecutionContext context)
{
var activity = context.Activity;
Expand All @@ -195,7 +197,7 @@ private void CapturePropertiesIfAny(ActivityExecutionContext context)
if (capturedProperties == null)
return;

foreach (var property in capturedProperties)
foreach (var property in capturedProperties)
context.Properties[property.Key] = property.Value;
}

Expand Down
Loading
Loading