diff --git a/src/modules/Elsa.Workflows.Api/Endpoints/WorkflowDefinitions/Refresh/Endpoint.cs b/src/modules/Elsa.Workflows.Api/Endpoints/WorkflowDefinitions/Refresh/Endpoint.cs index b83a549310..509998b268 100644 --- a/src/modules/Elsa.Workflows.Api/Endpoints/WorkflowDefinitions/Refresh/Endpoint.cs +++ b/src/modules/Elsa.Workflows.Api/Endpoints/WorkflowDefinitions/Refresh/Endpoint.cs @@ -20,7 +20,14 @@ public override void Configure() public override async Task HandleAsync(Request request, CancellationToken cancellationToken) { var result = await RefreshWorkflowDefinitionsAsync(request.DefinitionIds, cancellationToken); - await Send.OkAsync(new Response(result.Refreshed, result.NotFound), cancellationToken); + if (result.Status == RefreshWorkflowDefinitionsStatus.Completed) + { + await Send.OkAsync(new Response(result.Refreshed, result.NotFound), cancellationToken); + } + else + { + await Send.AcceptedAtAsync(responseBody: new Response(result.Refreshed, result.NotFound), cancellation: cancellationToken); + } } private async Task RefreshWorkflowDefinitionsAsync(ICollection? definitionIds, CancellationToken cancellationToken) diff --git a/src/modules/Elsa.Workflows.Runtime.Distributed/Features/DistributedRuntimeFeature.cs b/src/modules/Elsa.Workflows.Runtime.Distributed/Features/DistributedRuntimeFeature.cs index e783dd43e5..a41bcf5752 100644 --- a/src/modules/Elsa.Workflows.Runtime.Distributed/Features/DistributedRuntimeFeature.cs +++ b/src/modules/Elsa.Workflows.Runtime.Distributed/Features/DistributedRuntimeFeature.cs @@ -29,6 +29,9 @@ public override void Apply() { Services .AddScoped() - .AddScoped(); + .AddScoped() + + .Decorate() + .Decorate(); } } \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowDefinitionsRefresher.cs b/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowDefinitionsRefresher.cs new file mode 100644 index 0000000000..5a7bb16ba8 --- /dev/null +++ b/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowDefinitionsRefresher.cs @@ -0,0 +1,46 @@ +using Elsa.Workflows.Runtime.Requests; +using Elsa.Workflows.Runtime.Responses; +using JetBrains.Annotations; +using Medallion.Threading; +using Microsoft.Extensions.Logging; + +namespace Elsa.Workflows.Runtime.Distributed; + +/// +/// Decorator class that adds distributed locking to the Workflow Definitions Refresher. +/// +[UsedImplicitly] +public class DistributedWorkflowDefinitionsRefresher(IWorkflowDefinitionsRefresher inner, + IDistributedLockProvider distributedLockProvider, + ILogger logger) : IWorkflowDefinitionsRefresher +{ + /// + /// This ensures that only one instance of the application can refresh a set of workflow definitions at a time, preventing potential conflicts and ensuring consistency across distributed environments. + /// + public async Task RefreshWorkflowDefinitionsAsync(RefreshWorkflowDefinitionsRequest request, CancellationToken cancellationToken = default) + { + var isRefreshingAll = request.DefinitionIds == null || request.DefinitionIds.Count == 0; + var lockKey = isRefreshingAll + ? "WorkflowDefinitionsRefresher:All" + : $"WorkflowDefinitionsRefresher:{string.Join(",", request.DefinitionIds!.OrderBy(x => x))}"; + + await using var distributedLock = await distributedLockProvider.TryAcquireLockAsync( + lockKey, + TimeSpan.Zero, + cancellationToken); + + if (distributedLock == null) + { + var logMessage = isRefreshingAll + ? "Could not acquire lock for refreshing all workflow definitions. Another instance is already refreshing all workflow definitions" + : "Could not acquire lock for refreshing workflow definitions. Another instance is already refreshing these workflow definitions"; + + logger.LogInformation(logMessage); + + var failedDefinitionIds = isRefreshingAll ? Array.Empty() : request.DefinitionIds!; + return new(Array.Empty(), failedDefinitionIds, RefreshWorkflowDefinitionsStatus.AlreadyInProgress); + } + + return await inner.RefreshWorkflowDefinitionsAsync(request, cancellationToken); + } +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowDefinitionsReloader.cs b/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowDefinitionsReloader.cs new file mode 100644 index 0000000000..40599f0340 --- /dev/null +++ b/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowDefinitionsReloader.cs @@ -0,0 +1,36 @@ +using JetBrains.Annotations; +using Medallion.Threading; +using Microsoft.Extensions.Logging; + +namespace Elsa.Workflows.Runtime.Distributed; + +/// +/// Decorator class that adds distributed locking to the Workflow Definitions Reloader. +/// +[UsedImplicitly] +public class DistributedWorkflowDefinitionsReloader( + IWorkflowDefinitionsReloader inner, + IDistributedLockProvider distributedLockProvider, + ILogger logger) : IWorkflowDefinitionsReloader +{ + private const string LockKey = "WorkflowDefinitionsReloader"; + + /// + /// This ensures that only one instance of the application can reload workflow definitions at a time, preventing potential conflicts and ensuring consistency across distributed environments. + /// + public async Task ReloadWorkflowDefinitionsAsync(CancellationToken cancellationToken = default) + { + await using var distributedLock = await distributedLockProvider.TryAcquireLockAsync( + LockKey, + TimeSpan.Zero, + cancellationToken); + + if (distributedLock == null) + { + logger.LogInformation("Could not acquire lock for workflow definitions reload. Another instance is already reloading workflow definitions"); + return; + } + + await inner.ReloadWorkflowDefinitionsAsync(cancellationToken); + } +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Enums/RefreshWorkflowDefinitionsStatus.cs b/src/modules/Elsa.Workflows.Runtime/Enums/RefreshWorkflowDefinitionsStatus.cs new file mode 100644 index 0000000000..d7b260756b --- /dev/null +++ b/src/modules/Elsa.Workflows.Runtime/Enums/RefreshWorkflowDefinitionsStatus.cs @@ -0,0 +1,7 @@ +namespace Elsa.Workflows.Runtime; + +public enum RefreshWorkflowDefinitionsStatus +{ + Completed, + AlreadyInProgress +} \ No newline at end of file diff --git a/src/modules/Elsa.Workflows.Runtime/Responses/RefreshWorkflowDefinitionsResponse.cs b/src/modules/Elsa.Workflows.Runtime/Responses/RefreshWorkflowDefinitionsResponse.cs index 6a9843f15b..be2619ca15 100644 --- a/src/modules/Elsa.Workflows.Runtime/Responses/RefreshWorkflowDefinitionsResponse.cs +++ b/src/modules/Elsa.Workflows.Runtime/Responses/RefreshWorkflowDefinitionsResponse.cs @@ -3,4 +3,8 @@ namespace Elsa.Workflows.Runtime.Responses; /// /// Represents a response to a request to refresh workflow definitions. /// -public record RefreshWorkflowDefinitionsResponse(ICollection Refreshed, ICollection NotFound); \ No newline at end of file +public record RefreshWorkflowDefinitionsResponse( + ICollection Refreshed, + ICollection NotFound, + RefreshWorkflowDefinitionsStatus Status = RefreshWorkflowDefinitionsStatus.Completed + ); \ No newline at end of file