Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a0e6a4a
Distributed locks to prevent multiple pods executing Reload and Refresh
lukhipolito Feb 19, 2026
3cb85ca
Update src/modules/Elsa.Workflows.Runtime/Services/WorkflowDefinition…
lukhipolito-nexxbiz Feb 19, 2026
d6e28f0
Update src/modules/Elsa.Workflows.Runtime/Services/WorkflowDefinition…
lukhipolito-nexxbiz Feb 19, 2026
3aa143b
Update src/modules/Elsa.Workflows.Runtime/Services/WorkflowDefinition…
lukhipolito-nexxbiz Feb 19, 2026
fcb7b04
Moving distributed lock decorators to proper folder
lukhipolito Feb 19, 2026
5fea6f2
Adding logging for missing lock
lukhipolito Feb 19, 2026
0f648b6
Fixing logger use in distributed classes
lukhipolito Feb 19, 2026
789d8e8
Update src/modules/Elsa.Workflows.Runtime/Distributed/WorkflowDefinit…
lukhipolito-nexxbiz Feb 19, 2026
c8c6b48
Update src/modules/Elsa.Workflows.Runtime/Distributed/WorkflowDefinit…
lukhipolito-nexxbiz Feb 19, 2026
2521c3f
Update src/modules/Elsa.Workflows.Runtime/Distributed/WorkflowDefinit…
lukhipolito-nexxbiz Feb 19, 2026
d4bfe8a
Update src/modules/Elsa.Workflows.Runtime/Distributed/WorkflowDefinit…
lukhipolito-nexxbiz Feb 19, 2026
a8cae08
Update src/modules/Elsa.Workflows.Runtime/Distributed/WorkflowDefinit…
lukhipolito-nexxbiz Feb 19, 2026
444e248
Update src/modules/Elsa.Workflows.Runtime/Distributed/WorkflowDefinit…
lukhipolito-nexxbiz Feb 19, 2026
84d335d
Moving and renaming for clearer purpose
lukhipolito Feb 19, 2026
348559a
Fixing build after file moves
lukhipolito Feb 19, 2026
33cb21b
Addressing workflow refresher empty query leaking distributed lock logic
lukhipolito Feb 20, 2026
b0abe55
Merge branch 'release/3.6.0' into feat/multi-trigger-publish-update
lukhipolito Feb 20, 2026
c3d4e67
Merge branch 'release/3.6.0' into feat/multi-trigger-publish-update
lukhipolito Feb 23, 2026
7279619
Add status to WorkflowRefresherResponse model for in progress operation
lukhipolito Feb 23, 2026
37915ed
Merge branch 'release/3.6.0' into feat/multi-trigger-publish-update
lukhipolito Feb 23, 2026
82fd7e3
Removing typo, fixing build
lukhipolito Feb 23, 2026
0618f52
Update src/modules/Elsa.Workflows.Runtime.Distributed/Services/Distri…
lukhipolito-nexxbiz Feb 23, 2026
5bf7538
Refactor `RefreshWorkflowDefinitionsAsync` for lock acquisition logic…
sfmskywalker Feb 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ public override void Configure()
public override async Task HandleAsync(Request request, CancellationToken cancellationToken)
{
var result = await RefreshWorkflowDefinitionsAsync(request.DefinitionIds, cancellationToken);
await Send.OkAsync(new Response(result.Refreshed, result.NotFound), cancellationToken);
if (result.Status == RefreshWorkflowDefinitionsStatus.Completed)
{
await Send.OkAsync(new Response(result.Refreshed, result.NotFound), cancellationToken);
}
else
{
await Send.AcceptedAtAsync<Refresh>(responseBody: new Response(result.Refreshed, result.NotFound), cancellation: cancellationToken);
}
}

private async Task<RefreshWorkflowDefinitionsResponse> RefreshWorkflowDefinitionsAsync(ICollection<string>? definitionIds, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public override void Apply()
{
Services
.AddScoped<DistributedWorkflowRuntime>()
.AddScoped<DistributedBookmarkQueueWorker>();
.AddScoped<DistributedBookmarkQueueWorker>()

.Decorate<IWorkflowDefinitionsRefresher, DistributedWorkflowDefinitionsRefresher>()
.Decorate<IWorkflowDefinitionsReloader, DistributedWorkflowDefinitionsReloader>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using Elsa.Workflows.Runtime.Requests;
using Elsa.Workflows.Runtime.Responses;
using JetBrains.Annotations;
using Medallion.Threading;
using Microsoft.Extensions.Logging;

namespace Elsa.Workflows.Runtime.Distributed;

/// <summary>
/// Decorator class that adds distributed locking to the Workflow Definitions Refresher.
/// </summary>
[UsedImplicitly]
public class DistributedWorkflowDefinitionsRefresher(IWorkflowDefinitionsRefresher inner,
IDistributedLockProvider distributedLockProvider,
ILogger<DistributedWorkflowDefinitionsRefresher> logger) : IWorkflowDefinitionsRefresher
{
/// <summary>
/// This ensures that only one instance of the application can refresh a set of workflow definitions at a time, preventing potential conflicts and ensuring consistency across distributed environments.
/// </summary>
public async Task<RefreshWorkflowDefinitionsResponse> RefreshWorkflowDefinitionsAsync(RefreshWorkflowDefinitionsRequest request, CancellationToken cancellationToken = default)
{
var isRefreshingAll = request.DefinitionIds == null || request.DefinitionIds.Count == 0;
var lockKey = isRefreshingAll
? "WorkflowDefinitionsRefresher:All"
: $"WorkflowDefinitionsRefresher:{string.Join(",", request.DefinitionIds!.OrderBy(x => x))}";

await using var distributedLock = await distributedLockProvider.TryAcquireLockAsync(
lockKey,
TimeSpan.Zero,
cancellationToken);

if (distributedLock == null)
{
var logMessage = isRefreshingAll
? "Could not acquire lock for refreshing all workflow definitions. Another instance is already refreshing all workflow definitions"
: "Could not acquire lock for refreshing workflow definitions. Another instance is already refreshing these workflow definitions";

logger.LogInformation(logMessage);

var failedDefinitionIds = isRefreshingAll ? Array.Empty<string>() : request.DefinitionIds!;
return new(Array.Empty<string>(), failedDefinitionIds, RefreshWorkflowDefinitionsStatus.AlreadyInProgress);
}

return await inner.RefreshWorkflowDefinitionsAsync(request, cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using JetBrains.Annotations;
using Medallion.Threading;
using Microsoft.Extensions.Logging;

namespace Elsa.Workflows.Runtime.Distributed;

/// <summary>
/// Decorator class that adds distributed locking to the Workflow Definitions Reloader.
/// </summary>
[UsedImplicitly]
public class DistributedWorkflowDefinitionsReloader(
IWorkflowDefinitionsReloader inner,
IDistributedLockProvider distributedLockProvider,
ILogger<DistributedWorkflowDefinitionsReloader> logger) : IWorkflowDefinitionsReloader
{
private const string LockKey = "WorkflowDefinitionsReloader";

/// <summary>
/// This ensures that only one instance of the application can reload workflow definitions at a time, preventing potential conflicts and ensuring consistency across distributed environments.
/// </summary>
public async Task ReloadWorkflowDefinitionsAsync(CancellationToken cancellationToken = default)
{
await using var distributedLock = await distributedLockProvider.TryAcquireLockAsync(
LockKey,
TimeSpan.Zero,
cancellationToken);

if (distributedLock == null)
{
logger.LogInformation("Could not acquire lock for workflow definitions reload. Another instance is already reloading workflow definitions");
return;
}

await inner.ReloadWorkflowDefinitionsAsync(cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Elsa.Workflows.Runtime;

public enum RefreshWorkflowDefinitionsStatus
{
Completed,
AlreadyInProgress
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ namespace Elsa.Workflows.Runtime.Responses;
/// <summary>
/// Represents a response to a request to refresh workflow definitions.
/// </summary>
public record RefreshWorkflowDefinitionsResponse(ICollection<string> Refreshed, ICollection<string> NotFound);
public record RefreshWorkflowDefinitionsResponse(
ICollection<string> Refreshed,
ICollection<string> NotFound,
RefreshWorkflowDefinitionsStatus Status = RefreshWorkflowDefinitionsStatus.Completed
);
Loading