Skip to content

Commit

Permalink
Merge pull request #41 from AMalininHere/worker-events
Browse files Browse the repository at this point in the history
Add ability to perform some logic during worker's lifecycle
  • Loading branch information
TechnoBerry authored Oct 10, 2021
2 parents a2fed08 + b510978 commit b64453c
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 16 deletions.
6 changes: 6 additions & 0 deletions src/Camunda.Worker/CamundaWorkerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,11 @@ public ICamundaWorkerBuilder ConfigurePipeline(Action<IPipelineBuilder> configur
});
return this;
}

public ICamundaWorkerBuilder ConfigureEvents(Action<WorkerEvents> configureAction)
{
Services.Configure(configureAction);
return this;
}
}
}
35 changes: 22 additions & 13 deletions src/Camunda.Worker/Execution/DefaultCamundaWorker.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Camunda.Worker.Client;
Expand All @@ -17,23 +16,26 @@ public sealed class DefaultCamundaWorker : ICamundaWorker
private readonly IExternalTaskClient _externalTaskClient;
private readonly ITopicsProvider _topicsProvider;
private readonly FetchAndLockOptions _fetchAndLockOptions;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly WorkerEvents _workerEvents;
private readonly IServiceProvider _serviceProvider;
private readonly WorkerHandlerDescriptor _workerHandlerDescriptor;
private readonly ILogger<DefaultCamundaWorker> _logger;

public DefaultCamundaWorker(
IExternalTaskClient externalTaskClient,
ITopicsProvider topicsProvider,
IOptions<FetchAndLockOptions> fetchAndLockOptions,
IServiceScopeFactory serviceScopeFactory,
IOptions<WorkerEvents> workerEvents,
IServiceProvider serviceProvider,
WorkerHandlerDescriptor workerHandlerDescriptor,
ILogger<DefaultCamundaWorker>? logger = null
)
{
_externalTaskClient = Guard.NotNull(externalTaskClient, nameof(externalTaskClient));
_topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider));
_fetchAndLockOptions = Guard.NotNull(fetchAndLockOptions, nameof(fetchAndLockOptions)).Value;
_serviceScopeFactory = Guard.NotNull(serviceScopeFactory, nameof(serviceScopeFactory));
_workerEvents = Guard.NotNull(workerEvents, nameof(workerEvents)).Value;
_serviceProvider = Guard.NotNull(serviceProvider, nameof(serviceProvider));
_workerHandlerDescriptor = Guard.NotNull(workerHandlerDescriptor, nameof(workerHandlerDescriptor));
_logger = logger ?? NullLogger<DefaultCamundaWorker>.Instance;
}
Expand All @@ -42,13 +44,23 @@ public async Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await _workerEvents.OnBeforeFetchAndLock(_serviceProvider, cancellationToken);

var externalTasks = await SelectAsync(cancellationToken);

var executableTasks = externalTasks
.Select(ProcessExternalTask)
.ToList();
if (externalTasks.Count != 0)
{
var tasks = new Task[externalTasks.Count];
var i = 0;
foreach (var externalTask in externalTasks)
{
tasks[i++] = Task.Run(() => ProcessExternalTask(externalTask), cancellationToken);
}

await Task.WhenAll(tasks);
}

await Task.WhenAll(executableTasks);
await _workerEvents.OnAfterProcessingAllTasks(_serviceProvider, cancellationToken);
}
}

Expand All @@ -65,7 +77,7 @@ private async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(CancellationTo
catch (Exception e) when (!cancellationToken.IsCancellationRequested)
{
Log.FailedLocking(_logger, e);
await DelayOnFailure(cancellationToken);
await _workerEvents.OnFailedFetchAndLock(_serviceProvider, cancellationToken);
return Array.Empty<ExternalTask>();
}
}
Expand All @@ -85,12 +97,9 @@ private FetchAndLockRequest MakeRequestBody()
return fetchAndLockRequest;
}

private static Task DelayOnFailure(CancellationToken cancellationToken) =>
Task.Delay(10_000, cancellationToken);

private async Task ProcessExternalTask(ExternalTask externalTask)
{
using var scope = _serviceScopeFactory.CreateScope();
using var scope = _serviceProvider.CreateScope();
var context = new ExternalTaskContext(externalTask, _externalTaskClient, scope.ServiceProvider);

try
Expand Down
29 changes: 29 additions & 0 deletions src/Camunda.Worker/Execution/WorkerEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;

namespace Camunda.Worker.Execution
{
[ExcludeFromCodeCoverage]
public class WorkerEvents
{
public Func<IServiceProvider, CancellationToken, Task> OnBeforeFetchAndLock { get; set; } =
DefaultOnBeforeFetchAndLock;

public Func<IServiceProvider, CancellationToken, Task> OnFailedFetchAndLock { get; set; } =
DefaultOnFailedFetchAndLock;

public Func<IServiceProvider, CancellationToken, Task> OnAfterProcessingAllTasks { get; set; } =
DefaultOnAfterProcessingAllTasks;

private static Task DefaultOnBeforeFetchAndLock(IServiceProvider provider, CancellationToken ct) =>
Task.CompletedTask;

private static Task DefaultOnFailedFetchAndLock(IServiceProvider provider, CancellationToken ct) =>
Task.Delay(10_000, ct);

private static Task DefaultOnAfterProcessingAllTasks(IServiceProvider provider, CancellationToken ct) =>
Task.CompletedTask;
}
}
2 changes: 2 additions & 0 deletions src/Camunda.Worker/ICamundaWorkerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@ public interface ICamundaWorkerBuilder
ICamundaWorkerBuilder AddHandler(ExternalTaskDelegate handler, HandlerMetadata handlerMetadata);

ICamundaWorkerBuilder ConfigurePipeline(Action<IPipelineBuilder> configureAction);

ICamundaWorkerBuilder ConfigureEvents(Action<WorkerEvents> configureAction);
}
}
1 change: 1 addition & 0 deletions src/Camunda.Worker/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static ICamundaWorkerBuilder AddCamundaWorker(
Guard.GreaterThanOrEqual(numberOfWorkers, Constants.MinimumParallelExecutors, nameof(numberOfWorkers));

services.AddOptions<FetchAndLockOptions>().Configure(options => { options.WorkerId = workerId; });
services.AddOptions<WorkerEvents>();
services.TryAddTransient<ITopicsProvider, StaticTopicsProvider>();
services.TryAddTransient<ICamundaWorker, DefaultCamundaWorker>();
services.TryAddSingleton<IEndpointProvider, TopicBasedEndpointProvider>();
Expand Down
13 changes: 12 additions & 1 deletion test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Camunda.Worker.Client;
using Camunda.Worker.Execution;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Xunit;

namespace Camunda.Worker
Expand Down Expand Up @@ -59,6 +59,17 @@ public void TestConfigurePipeline()
d.ServiceType == typeof(WorkerHandlerDescriptor));
}

[Fact]
public void TestConfigureEvents()
{
_builder.ConfigureEvents(events =>
{
events.OnBeforeFetchAndLock = (_, _) => Task.CompletedTask;
});

Assert.Contains(_services, d => d.ServiceType == typeof(IConfigureOptions<WorkerEvents>));
}

private class EndpointProvider : IEndpointProvider
{
public ExternalTaskDelegate GetEndpointDelegate(ExternalTask externalTask)
Expand Down
19 changes: 17 additions & 2 deletions test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@ public class DefaultCamundaWorkerTest : IDisposable
WorkerId = "testWorker",
AsyncResponseTimeout = 5_000
});
private readonly Mock<IWorkerEvents> _workerEventsMock = new();
private readonly ServiceProvider _serviceProvider;
private readonly DefaultCamundaWorker _worker;

public DefaultCamundaWorkerTest()
{
_serviceProvider = new ServiceCollection().BuildServiceProvider();
var workerEventsOptions = Options.Create(new WorkerEvents
{
OnAfterProcessingAllTasks = _workerEventsMock.Object.OnAfterProcessingAllTasks
});
_worker = new DefaultCamundaWorker(
_clientMock.Object,
_topicsProviderMock.Object,
_fetchAndLockOptions,
_serviceProvider.GetRequiredService<IServiceScopeFactory>(),
workerEventsOptions,
_serviceProvider,
new WorkerHandlerDescriptor(_handlerMock.Object.HandleAsync)
);
}
Expand Down Expand Up @@ -60,9 +66,13 @@ public async Task TestRun(int numberOfExternalTasks)

var cts = new CancellationTokenSource();

_workerEventsMock
.Setup(e => e.OnAfterProcessingAllTasks(It.IsAny<IServiceProvider>(), It.IsAny<CancellationToken>()))
.Callback(cts.Cancel)
.Returns(Task.CompletedTask);

_clientMock
.Setup(client => client.FetchAndLockAsync(It.IsAny<FetchAndLockRequest>(), It.IsAny<CancellationToken>()))
.Callback(cts.Cancel)
.ReturnsAsync(externalTasks)
.Verifiable();

Expand Down Expand Up @@ -105,5 +115,10 @@ public interface IHandler
{
Task HandleAsync(IExternalTaskContext context);
}

public interface IWorkerEvents
{
public Task OnAfterProcessingAllTasks(IServiceProvider provider, CancellationToken ct);
}
}
}
3 changes: 3 additions & 0 deletions test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public void TestAddCamundaWorker()
var fetchAndLockOptions = provider.GetRequiredService<IOptions<FetchAndLockOptions>>().Value;
Assert.Equal("testWorker", fetchAndLockOptions.WorkerId);

var workerEvents = provider.GetRequiredService<IOptions<WorkerEvents>>().Value;
Assert.NotNull(workerEvents);

Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Singleton &&
d.ServiceType == typeof(IEndpointProvider));

Expand Down

0 comments on commit b64453c

Please sign in to comment.