Skip to content

Commit

Permalink
Merge pull request #45 from AMalininHere/fetch-and-lock-request-provider
Browse files Browse the repository at this point in the history
Add FetchAndLockRequestProvider
  • Loading branch information
TechnoBerry authored Nov 11, 2021
2 parents d073eb0 + 40c2253 commit 7f255ed
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 54 deletions.
14 changes: 14 additions & 0 deletions src/Camunda.Worker/CamundaWorkerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,20 @@ public ICamundaWorkerBuilder AddTopicsProvider<TProvider>() where TProvider : cl
return this;
}

public ICamundaWorkerBuilder AddFetchAndLockRequestProvider(
Func<WorkerServiceOptions, IServiceProvider, IFetchAndLockRequestProvider> factory
)
{
Services.AddSingleton(provider =>
{
var handlerDescriptors = provider.GetServices<HandlerDescriptor>();
var options = new WorkerServiceOptions(WorkerId, handlerDescriptors);
return factory(options, provider);
});

return this;
}

public ICamundaWorkerBuilder AddHandler(ExternalTaskDelegate handler, HandlerMetadata handlerMetadata)
{
Guard.NotNull(handler, nameof(handler));
Expand Down
26 changes: 4 additions & 22 deletions src/Camunda.Worker/Execution/DefaultCamundaWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,23 @@ namespace Camunda.Worker.Execution
public sealed class DefaultCamundaWorker : ICamundaWorker
{
private readonly IExternalTaskClient _externalTaskClient;
private readonly ITopicsProvider _topicsProvider;
private readonly FetchAndLockOptions _fetchAndLockOptions;
private readonly IFetchAndLockRequestProvider _fetchAndLockRequestProvider;
private readonly WorkerEvents _workerEvents;
private readonly IServiceProvider _serviceProvider;
private readonly WorkerHandlerDescriptor _workerHandlerDescriptor;
private readonly ILogger<DefaultCamundaWorker> _logger;

public DefaultCamundaWorker(
IExternalTaskClient externalTaskClient,
ITopicsProvider topicsProvider,
IOptions<FetchAndLockOptions> fetchAndLockOptions,
IFetchAndLockRequestProvider fetchAndLockRequestProvider,
IOptions<WorkerEvents> workerEvents,
IServiceProvider serviceProvider,
WorkerHandlerDescriptor workerHandlerDescriptor,
ILogger<DefaultCamundaWorker>? logger = null
)
{
_externalTaskClient = Guard.NotNull(externalTaskClient, nameof(externalTaskClient));
_topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider));
_fetchAndLockOptions = Guard.NotNull(fetchAndLockOptions, nameof(fetchAndLockOptions)).Value;
_fetchAndLockRequestProvider = Guard.NotNull(fetchAndLockRequestProvider, nameof(fetchAndLockRequestProvider));
_workerEvents = Guard.NotNull(workerEvents, nameof(workerEvents)).Value;
_serviceProvider = Guard.NotNull(serviceProvider, nameof(serviceProvider));
_workerHandlerDescriptor = Guard.NotNull(workerHandlerDescriptor, nameof(workerHandlerDescriptor));
Expand Down Expand Up @@ -69,7 +66,7 @@ private async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(CancellationTo
try
{
Log.Waiting(_logger);
var fetchAndLockRequest = MakeRequestBody();
var fetchAndLockRequest = _fetchAndLockRequestProvider.GetRequest();
var externalTasks = await _externalTaskClient.FetchAndLockAsync(fetchAndLockRequest, cancellationToken);
Log.Locked(_logger, externalTasks.Count);
return externalTasks;
Expand All @@ -82,21 +79,6 @@ private async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(CancellationTo
}
}

private FetchAndLockRequest MakeRequestBody()
{
var topics = _topicsProvider.GetTopics();

var fetchAndLockRequest =
new FetchAndLockRequest(_fetchAndLockOptions.WorkerId, _fetchAndLockOptions.MaxTasks)
{
UsePriority = _fetchAndLockOptions.UsePriority,
AsyncResponseTimeout = _fetchAndLockOptions.AsyncResponseTimeout,
Topics = topics
};

return fetchAndLockRequest;
}

private async Task ProcessExternalTask(ExternalTask externalTask)
{
using var scope = _serviceProvider.CreateScope();
Expand Down
3 changes: 3 additions & 0 deletions src/Camunda.Worker/Execution/FetchAndLockOptions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System;

namespace Camunda.Worker.Execution
{
public class FetchAndLockOptions
Expand All @@ -6,6 +8,7 @@ public class FetchAndLockOptions
private int _maxTasks = 1;
private int _asyncResponseTimeout = 10_000;

[Obsolete]
public string WorkerId
{
get => _workerId;
Expand Down
12 changes: 12 additions & 0 deletions src/Camunda.Worker/Execution/IFetchAndLockRequestProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Camunda.Worker.Client;

namespace Camunda.Worker.Execution
{
public interface IFetchAndLockRequestProvider
{
/// <summary>
/// This method is called in the worker before each "fetch and lock" operation
/// </summary>
FetchAndLockRequest GetRequest();
}
}
2 changes: 2 additions & 0 deletions src/Camunda.Worker/Execution/ITopicsProvider.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Generic;
using Camunda.Worker.Client;

namespace Camunda.Worker.Execution
{
[Obsolete("Use IFetchAndLockRequestProvider instead")]
public interface ITopicsProvider
{
IReadOnlyCollection<FetchAndLockRequest.Topic> GetTopics();
Expand Down
34 changes: 34 additions & 0 deletions src/Camunda.Worker/Execution/LegacyFetchAndLockRequestProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using Camunda.Worker.Client;
using Microsoft.Extensions.Options;

namespace Camunda.Worker.Execution
{
internal class LegacyFetchAndLockRequestProvider : IFetchAndLockRequestProvider
{
private readonly ITopicsProvider _topicsProvider;
private readonly FetchAndLockOptions _options;

internal LegacyFetchAndLockRequestProvider(
ITopicsProvider topicsProvider,
IOptions<FetchAndLockOptions> options
)
{
_topicsProvider = topicsProvider;
_options = options.Value;
}

public FetchAndLockRequest GetRequest()
{
var topics = _topicsProvider.GetTopics();

var fetchAndLockRequest = new FetchAndLockRequest(_options.WorkerId, _options.MaxTasks)
{
UsePriority = _options.UsePriority,
AsyncResponseTimeout = _options.AsyncResponseTimeout,
Topics = topics
};

return fetchAndLockRequest;
}
}
}
5 changes: 5 additions & 0 deletions src/Camunda.Worker/ICamundaWorkerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ public interface ICamundaWorkerBuilder

ICamundaWorkerBuilder AddEndpointProvider<TProvider>() where TProvider : class, IEndpointProvider;

[Obsolete("Use IFetchAndLockRequestProvider and AddFetchAndLockRequestProvider instead")]
ICamundaWorkerBuilder AddTopicsProvider<TProvider>() where TProvider : class, ITopicsProvider;

ICamundaWorkerBuilder AddFetchAndLockRequestProvider(
Func<WorkerServiceOptions, IServiceProvider, IFetchAndLockRequestProvider> factory
);

ICamundaWorkerBuilder AddHandler(ExternalTaskDelegate handler, HandlerMetadata handlerMetadata);

ICamundaWorkerBuilder ConfigurePipeline(Action<IPipelineBuilder> configureAction);
Expand Down
7 changes: 6 additions & 1 deletion src/Camunda.Worker/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Camunda.Worker.Execution;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;

namespace Camunda.Worker
{
Expand All @@ -24,7 +25,11 @@ public static ICamundaWorkerBuilder AddCamundaWorker(
services.TryAddSingleton(_ => new WorkerHandlerDescriptor(ExternalTaskRouter.RouteAsync));
services.AddHostedService(provider => new WorkerHostedService(provider, numberOfWorkers));

return new CamundaWorkerBuilder(services, workerId);
return new CamundaWorkerBuilder(services, workerId)
.AddFetchAndLockRequestProvider((options, provider) => new LegacyFetchAndLockRequestProvider(
provider.GetRequiredService<ITopicsProvider>(),
provider.GetRequiredService<IOptions<FetchAndLockOptions>>()
));
}
}
}
18 changes: 18 additions & 0 deletions src/Camunda.Worker/WorkerServiceOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.Collections.Generic;
using Camunda.Worker.Execution;

namespace Camunda.Worker
{
public class WorkerServiceOptions
{
internal WorkerServiceOptions(string workerId, IEnumerable<HandlerDescriptor> handlerDescriptors)
{
WorkerId = Guard.NotNull(workerId, nameof(workerId));
HandlerDescriptors = Guard.NotNull(handlerDescriptors, nameof(handlerDescriptors));
}

public string WorkerId { get; }

public IEnumerable<HandlerDescriptor> HandlerDescriptors { get; }
}
}
24 changes: 21 additions & 3 deletions test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,19 @@ public void TestAddTopicsProvider()
{
_builder.AddTopicsProvider<TopicsProvider>();

Assert.Contains(_services, d => d.Lifetime == ServiceLifetime.Transient &&
d.ServiceType == typeof(ITopicsProvider) &&
d.ImplementationType == typeof(TopicsProvider));
using var provider = _services.BuildServiceProvider();

Assert.IsType<TopicsProvider>(provider.GetService<ITopicsProvider>());
}

[Fact]
public void TestAddFetchAndLockRequestProvider()
{
_builder.AddFetchAndLockRequestProvider((_, _) => new FetchAndLockRequestProvider());

using var provider = _services.BuildServiceProvider();

Assert.IsType<FetchAndLockRequestProvider>(provider.GetService<IFetchAndLockRequestProvider>());
}

[Fact]
Expand Down Expand Up @@ -85,5 +95,13 @@ private class TopicsProvider : ITopicsProvider
throw new NotImplementedException();
}
}

private class FetchAndLockRequestProvider : IFetchAndLockRequestProvider
{
public FetchAndLockRequest GetRequest()
{
throw new NotImplementedException();
}
}
}
}
23 changes: 10 additions & 13 deletions test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,25 @@ public class DefaultCamundaWorkerTest : IDisposable
{
private readonly Mock<IHandler> _handlerMock = new();
private readonly Mock<IExternalTaskClient> _clientMock = new();
private readonly Mock<ITopicsProvider> _topicsProviderMock = new();
private readonly IOptions<FetchAndLockOptions> _fetchAndLockOptions = Options.Create(new FetchAndLockOptions
{
WorkerId = "testWorker",
AsyncResponseTimeout = 5_000
});
private readonly Mock<IFetchAndLockRequestProvider> _fetchAndLockRequestProviderMock = new();
private readonly Mock<IWorkerEvents> _workerEventsMock = new();
private readonly ServiceProvider _serviceProvider;
private readonly DefaultCamundaWorker _worker;

public DefaultCamundaWorkerTest()
{
_serviceProvider = new ServiceCollection().BuildServiceProvider();
var workerEventsOptions = Options.Create(new WorkerEvents
{
OnAfterProcessingAllTasks = _workerEventsMock.Object.OnAfterProcessingAllTasks
});

_fetchAndLockRequestProviderMock.Setup(provider => provider.GetRequest())
.Returns(new FetchAndLockRequest("test"));

_worker = new DefaultCamundaWorker(
_clientMock.Object,
_topicsProviderMock.Object,
_fetchAndLockOptions,
workerEventsOptions,
_fetchAndLockRequestProviderMock.Object,
Options.Create(new WorkerEvents
{
OnAfterProcessingAllTasks = _workerEventsMock.Object.OnAfterProcessingAllTasks
}),
_serviceProvider,
new WorkerHandlerDescriptor(_handlerMock.Object.HandleAsync)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Linq;
using Bogus;
using Camunda.Worker.Client;
using Microsoft.Extensions.Options;
using Moq;
using Xunit;

namespace Camunda.Worker.Execution
{
public class LegacyFetchAndLockRequestProviderTests
{
[Fact]
public void GetRequest_ShouldReturnsRequest()
{
// Arrange
var fetchAndLockOptions = new Faker<FetchAndLockOptions>()
.RuleFor(o => o.WorkerId, f => f.Lorem.Word())
.RuleFor(o => o.MaxTasks, f => f.Random.Int(1, 10))
.RuleFor(o => o.AsyncResponseTimeout, f => f.Random.Int(100, 10000))
.RuleFor(o => o.UsePriority, f => f.Random.Bool())
.Generate();

var topics = new Faker<FetchAndLockRequest.Topic>()
.CustomInstantiator(f => new FetchAndLockRequest.Topic(f.Random.Hash(), f.Random.Int(1000, 10000)))
.GenerateLazy(5)
.ToList();
var topicsProviderMock = new Mock<ITopicsProvider>();
topicsProviderMock.Setup(p => p.GetTopics()).Returns(topics);

var sut = new LegacyFetchAndLockRequestProvider(
topicsProviderMock.Object,
Options.Create(fetchAndLockOptions)
);

// Act
var request = sut.GetRequest();

// Assert
Assert.Same(topics, request.Topics);
Assert.Equal(fetchAndLockOptions.WorkerId, request.WorkerId);
Assert.Equal(fetchAndLockOptions.MaxTasks, request.MaxTasks);
Assert.Equal(fetchAndLockOptions.AsyncResponseTimeout, request.AsyncResponseTimeout);
Assert.Equal(fetchAndLockOptions.UsePriority, request.UsePriority);
}
}
}
26 changes: 11 additions & 15 deletions test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Camunda.Worker.Client;
using System;
using Camunda.Worker.Execution;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
Expand All @@ -19,21 +19,17 @@ public void TestAddCamundaWorker()

var fetchAndLockOptions = provider.GetRequiredService<IOptions<FetchAndLockOptions>>().Value;
Assert.Equal("testWorker", fetchAndLockOptions.WorkerId);
Assert.NotNull(provider.GetService<IOptions<WorkerEvents>>()?.Value);

var workerEvents = provider.GetRequiredService<IOptions<WorkerEvents>>().Value;
Assert.NotNull(workerEvents);

Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Singleton &&
d.ServiceType == typeof(IEndpointProvider));

Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Transient &&
d.ServiceType == typeof(ITopicsProvider));

Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Transient &&
d.ServiceType == typeof(ICamundaWorker));

Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Singleton &&
d.ServiceType == typeof(WorkerHandlerDescriptor));
Assert.Contains(services, IsRegistered(typeof(IEndpointProvider), ServiceLifetime.Singleton));
Assert.Contains(services, IsRegistered(typeof(ITopicsProvider), ServiceLifetime.Transient));
Assert.Contains(services, IsRegistered(typeof(ICamundaWorker), ServiceLifetime.Transient));
Assert.Contains(services, IsRegistered(typeof(WorkerHandlerDescriptor), ServiceLifetime.Singleton));
Assert.Contains(services, IsRegistered(typeof(IFetchAndLockRequestProvider), ServiceLifetime.Singleton));
}

private static Predicate<ServiceDescriptor> IsRegistered(Type serviceType, ServiceLifetime lifetime)
=> descriptor => descriptor.Lifetime == lifetime &&
descriptor.ServiceType == serviceType;
}
}

0 comments on commit 7f255ed

Please sign in to comment.