diff --git a/src/Camunda.Worker/CamundaWorkerBuilder.cs b/src/Camunda.Worker/CamundaWorkerBuilder.cs index cefd66a..82bfcc0 100644 --- a/src/Camunda.Worker/CamundaWorkerBuilder.cs +++ b/src/Camunda.Worker/CamundaWorkerBuilder.cs @@ -29,6 +29,20 @@ public ICamundaWorkerBuilder AddTopicsProvider() where TProvider : cl return this; } + public ICamundaWorkerBuilder AddFetchAndLockRequestProvider( + Func factory + ) + { + Services.AddSingleton(provider => + { + var handlerDescriptors = provider.GetServices(); + var options = new WorkerServiceOptions(WorkerId, handlerDescriptors); + return factory(options, provider); + }); + + return this; + } + public ICamundaWorkerBuilder AddHandler(ExternalTaskDelegate handler, HandlerMetadata handlerMetadata) { Guard.NotNull(handler, nameof(handler)); diff --git a/src/Camunda.Worker/Execution/DefaultCamundaWorker.cs b/src/Camunda.Worker/Execution/DefaultCamundaWorker.cs index c498379..3fa2270 100644 --- a/src/Camunda.Worker/Execution/DefaultCamundaWorker.cs +++ b/src/Camunda.Worker/Execution/DefaultCamundaWorker.cs @@ -14,8 +14,7 @@ namespace Camunda.Worker.Execution public sealed class DefaultCamundaWorker : ICamundaWorker { private readonly IExternalTaskClient _externalTaskClient; - private readonly ITopicsProvider _topicsProvider; - private readonly FetchAndLockOptions _fetchAndLockOptions; + private readonly IFetchAndLockRequestProvider _fetchAndLockRequestProvider; private readonly WorkerEvents _workerEvents; private readonly IServiceProvider _serviceProvider; private readonly WorkerHandlerDescriptor _workerHandlerDescriptor; @@ -23,8 +22,7 @@ public sealed class DefaultCamundaWorker : ICamundaWorker public DefaultCamundaWorker( IExternalTaskClient externalTaskClient, - ITopicsProvider topicsProvider, - IOptions fetchAndLockOptions, + IFetchAndLockRequestProvider fetchAndLockRequestProvider, IOptions workerEvents, IServiceProvider serviceProvider, WorkerHandlerDescriptor workerHandlerDescriptor, @@ -32,8 +30,7 @@ public DefaultCamundaWorker( ) { _externalTaskClient = Guard.NotNull(externalTaskClient, nameof(externalTaskClient)); - _topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider)); - _fetchAndLockOptions = Guard.NotNull(fetchAndLockOptions, nameof(fetchAndLockOptions)).Value; + _fetchAndLockRequestProvider = Guard.NotNull(fetchAndLockRequestProvider, nameof(fetchAndLockRequestProvider)); _workerEvents = Guard.NotNull(workerEvents, nameof(workerEvents)).Value; _serviceProvider = Guard.NotNull(serviceProvider, nameof(serviceProvider)); _workerHandlerDescriptor = Guard.NotNull(workerHandlerDescriptor, nameof(workerHandlerDescriptor)); @@ -69,7 +66,7 @@ private async Task> SelectAsync(CancellationTo try { Log.Waiting(_logger); - var fetchAndLockRequest = MakeRequestBody(); + var fetchAndLockRequest = _fetchAndLockRequestProvider.GetRequest(); var externalTasks = await _externalTaskClient.FetchAndLockAsync(fetchAndLockRequest, cancellationToken); Log.Locked(_logger, externalTasks.Count); return externalTasks; @@ -82,21 +79,6 @@ private async Task> SelectAsync(CancellationTo } } - private FetchAndLockRequest MakeRequestBody() - { - var topics = _topicsProvider.GetTopics(); - - var fetchAndLockRequest = - new FetchAndLockRequest(_fetchAndLockOptions.WorkerId, _fetchAndLockOptions.MaxTasks) - { - UsePriority = _fetchAndLockOptions.UsePriority, - AsyncResponseTimeout = _fetchAndLockOptions.AsyncResponseTimeout, - Topics = topics - }; - - return fetchAndLockRequest; - } - private async Task ProcessExternalTask(ExternalTask externalTask) { using var scope = _serviceProvider.CreateScope(); diff --git a/src/Camunda.Worker/Execution/FetchAndLockOptions.cs b/src/Camunda.Worker/Execution/FetchAndLockOptions.cs index ec2bcb2..b47d51d 100644 --- a/src/Camunda.Worker/Execution/FetchAndLockOptions.cs +++ b/src/Camunda.Worker/Execution/FetchAndLockOptions.cs @@ -1,3 +1,5 @@ +using System; + namespace Camunda.Worker.Execution { public class FetchAndLockOptions @@ -6,6 +8,7 @@ public class FetchAndLockOptions private int _maxTasks = 1; private int _asyncResponseTimeout = 10_000; + [Obsolete] public string WorkerId { get => _workerId; diff --git a/src/Camunda.Worker/Execution/IFetchAndLockRequestProvider.cs b/src/Camunda.Worker/Execution/IFetchAndLockRequestProvider.cs new file mode 100644 index 0000000..64387f4 --- /dev/null +++ b/src/Camunda.Worker/Execution/IFetchAndLockRequestProvider.cs @@ -0,0 +1,12 @@ +using Camunda.Worker.Client; + +namespace Camunda.Worker.Execution +{ + public interface IFetchAndLockRequestProvider + { + /// + /// This method is called in the worker before each "fetch and lock" operation + /// + FetchAndLockRequest GetRequest(); + } +} diff --git a/src/Camunda.Worker/Execution/ITopicsProvider.cs b/src/Camunda.Worker/Execution/ITopicsProvider.cs index ce642c3..44c9428 100644 --- a/src/Camunda.Worker/Execution/ITopicsProvider.cs +++ b/src/Camunda.Worker/Execution/ITopicsProvider.cs @@ -1,8 +1,10 @@ +using System; using System.Collections.Generic; using Camunda.Worker.Client; namespace Camunda.Worker.Execution { + [Obsolete("Use IFetchAndLockRequestProvider instead")] public interface ITopicsProvider { IReadOnlyCollection GetTopics(); diff --git a/src/Camunda.Worker/Execution/LegacyFetchAndLockRequestProvider.cs b/src/Camunda.Worker/Execution/LegacyFetchAndLockRequestProvider.cs new file mode 100644 index 0000000..69c98ef --- /dev/null +++ b/src/Camunda.Worker/Execution/LegacyFetchAndLockRequestProvider.cs @@ -0,0 +1,34 @@ +using Camunda.Worker.Client; +using Microsoft.Extensions.Options; + +namespace Camunda.Worker.Execution +{ + internal class LegacyFetchAndLockRequestProvider : IFetchAndLockRequestProvider + { + private readonly ITopicsProvider _topicsProvider; + private readonly FetchAndLockOptions _options; + + internal LegacyFetchAndLockRequestProvider( + ITopicsProvider topicsProvider, + IOptions options + ) + { + _topicsProvider = topicsProvider; + _options = options.Value; + } + + public FetchAndLockRequest GetRequest() + { + var topics = _topicsProvider.GetTopics(); + + var fetchAndLockRequest = new FetchAndLockRequest(_options.WorkerId, _options.MaxTasks) + { + UsePriority = _options.UsePriority, + AsyncResponseTimeout = _options.AsyncResponseTimeout, + Topics = topics + }; + + return fetchAndLockRequest; + } + } +} diff --git a/src/Camunda.Worker/ICamundaWorkerBuilder.cs b/src/Camunda.Worker/ICamundaWorkerBuilder.cs index 1d1845b..5c1beec 100644 --- a/src/Camunda.Worker/ICamundaWorkerBuilder.cs +++ b/src/Camunda.Worker/ICamundaWorkerBuilder.cs @@ -12,8 +12,13 @@ public interface ICamundaWorkerBuilder ICamundaWorkerBuilder AddEndpointProvider() where TProvider : class, IEndpointProvider; + [Obsolete("Use IFetchAndLockRequestProvider and AddFetchAndLockRequestProvider instead")] ICamundaWorkerBuilder AddTopicsProvider() where TProvider : class, ITopicsProvider; + ICamundaWorkerBuilder AddFetchAndLockRequestProvider( + Func factory + ); + ICamundaWorkerBuilder AddHandler(ExternalTaskDelegate handler, HandlerMetadata handlerMetadata); ICamundaWorkerBuilder ConfigurePipeline(Action configureAction); diff --git a/src/Camunda.Worker/ServiceCollectionExtensions.cs b/src/Camunda.Worker/ServiceCollectionExtensions.cs index a486611..3d00ddf 100644 --- a/src/Camunda.Worker/ServiceCollectionExtensions.cs +++ b/src/Camunda.Worker/ServiceCollectionExtensions.cs @@ -2,6 +2,7 @@ using Camunda.Worker.Execution; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; namespace Camunda.Worker { @@ -24,7 +25,11 @@ public static ICamundaWorkerBuilder AddCamundaWorker( services.TryAddSingleton(_ => new WorkerHandlerDescriptor(ExternalTaskRouter.RouteAsync)); services.AddHostedService(provider => new WorkerHostedService(provider, numberOfWorkers)); - return new CamundaWorkerBuilder(services, workerId); + return new CamundaWorkerBuilder(services, workerId) + .AddFetchAndLockRequestProvider((options, provider) => new LegacyFetchAndLockRequestProvider( + provider.GetRequiredService(), + provider.GetRequiredService>() + )); } } } diff --git a/src/Camunda.Worker/WorkerServiceOptions.cs b/src/Camunda.Worker/WorkerServiceOptions.cs new file mode 100644 index 0000000..35226c1 --- /dev/null +++ b/src/Camunda.Worker/WorkerServiceOptions.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; +using Camunda.Worker.Execution; + +namespace Camunda.Worker +{ + public class WorkerServiceOptions + { + internal WorkerServiceOptions(string workerId, IEnumerable handlerDescriptors) + { + WorkerId = Guard.NotNull(workerId, nameof(workerId)); + HandlerDescriptors = Guard.NotNull(handlerDescriptors, nameof(handlerDescriptors)); + } + + public string WorkerId { get; } + + public IEnumerable HandlerDescriptors { get; } + } +} diff --git a/test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs b/test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs index e05999e..f962238 100644 --- a/test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs +++ b/test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs @@ -45,9 +45,19 @@ public void TestAddTopicsProvider() { _builder.AddTopicsProvider(); - Assert.Contains(_services, d => d.Lifetime == ServiceLifetime.Transient && - d.ServiceType == typeof(ITopicsProvider) && - d.ImplementationType == typeof(TopicsProvider)); + using var provider = _services.BuildServiceProvider(); + + Assert.IsType(provider.GetService()); + } + + [Fact] + public void TestAddFetchAndLockRequestProvider() + { + _builder.AddFetchAndLockRequestProvider((_, _) => new FetchAndLockRequestProvider()); + + using var provider = _services.BuildServiceProvider(); + + Assert.IsType(provider.GetService()); } [Fact] @@ -85,5 +95,13 @@ private class TopicsProvider : ITopicsProvider throw new NotImplementedException(); } } + + private class FetchAndLockRequestProvider : IFetchAndLockRequestProvider + { + public FetchAndLockRequest GetRequest() + { + throw new NotImplementedException(); + } + } } } diff --git a/test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs b/test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs index 883a2b9..b678ad4 100644 --- a/test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs +++ b/test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs @@ -16,12 +16,7 @@ public class DefaultCamundaWorkerTest : IDisposable { private readonly Mock _handlerMock = new(); private readonly Mock _clientMock = new(); - private readonly Mock _topicsProviderMock = new(); - private readonly IOptions _fetchAndLockOptions = Options.Create(new FetchAndLockOptions - { - WorkerId = "testWorker", - AsyncResponseTimeout = 5_000 - }); + private readonly Mock _fetchAndLockRequestProviderMock = new(); private readonly Mock _workerEventsMock = new(); private readonly ServiceProvider _serviceProvider; private readonly DefaultCamundaWorker _worker; @@ -29,15 +24,17 @@ public class DefaultCamundaWorkerTest : IDisposable public DefaultCamundaWorkerTest() { _serviceProvider = new ServiceCollection().BuildServiceProvider(); - var workerEventsOptions = Options.Create(new WorkerEvents - { - OnAfterProcessingAllTasks = _workerEventsMock.Object.OnAfterProcessingAllTasks - }); + + _fetchAndLockRequestProviderMock.Setup(provider => provider.GetRequest()) + .Returns(new FetchAndLockRequest("test")); + _worker = new DefaultCamundaWorker( _clientMock.Object, - _topicsProviderMock.Object, - _fetchAndLockOptions, - workerEventsOptions, + _fetchAndLockRequestProviderMock.Object, + Options.Create(new WorkerEvents + { + OnAfterProcessingAllTasks = _workerEventsMock.Object.OnAfterProcessingAllTasks + }), _serviceProvider, new WorkerHandlerDescriptor(_handlerMock.Object.HandleAsync) ); diff --git a/test/Camunda.Worker.Tests/Execution/LegacyFetchAndLockRequestProviderTests.cs b/test/Camunda.Worker.Tests/Execution/LegacyFetchAndLockRequestProviderTests.cs new file mode 100644 index 0000000..e83c880 --- /dev/null +++ b/test/Camunda.Worker.Tests/Execution/LegacyFetchAndLockRequestProviderTests.cs @@ -0,0 +1,46 @@ +using System.Linq; +using Bogus; +using Camunda.Worker.Client; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; + +namespace Camunda.Worker.Execution +{ + public class LegacyFetchAndLockRequestProviderTests + { + [Fact] + public void GetRequest_ShouldReturnsRequest() + { + // Arrange + var fetchAndLockOptions = new Faker() + .RuleFor(o => o.WorkerId, f => f.Lorem.Word()) + .RuleFor(o => o.MaxTasks, f => f.Random.Int(1, 10)) + .RuleFor(o => o.AsyncResponseTimeout, f => f.Random.Int(100, 10000)) + .RuleFor(o => o.UsePriority, f => f.Random.Bool()) + .Generate(); + + var topics = new Faker() + .CustomInstantiator(f => new FetchAndLockRequest.Topic(f.Random.Hash(), f.Random.Int(1000, 10000))) + .GenerateLazy(5) + .ToList(); + var topicsProviderMock = new Mock(); + topicsProviderMock.Setup(p => p.GetTopics()).Returns(topics); + + var sut = new LegacyFetchAndLockRequestProvider( + topicsProviderMock.Object, + Options.Create(fetchAndLockOptions) + ); + + // Act + var request = sut.GetRequest(); + + // Assert + Assert.Same(topics, request.Topics); + Assert.Equal(fetchAndLockOptions.WorkerId, request.WorkerId); + Assert.Equal(fetchAndLockOptions.MaxTasks, request.MaxTasks); + Assert.Equal(fetchAndLockOptions.AsyncResponseTimeout, request.AsyncResponseTimeout); + Assert.Equal(fetchAndLockOptions.UsePriority, request.UsePriority); + } + } +} diff --git a/test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs b/test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs index 71a0ee1..4408415 100644 --- a/test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs +++ b/test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs @@ -1,4 +1,4 @@ -using Camunda.Worker.Client; +using System; using Camunda.Worker.Execution; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -19,21 +19,17 @@ public void TestAddCamundaWorker() var fetchAndLockOptions = provider.GetRequiredService>().Value; Assert.Equal("testWorker", fetchAndLockOptions.WorkerId); + Assert.NotNull(provider.GetService>()?.Value); - var workerEvents = provider.GetRequiredService>().Value; - Assert.NotNull(workerEvents); - - Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Singleton && - d.ServiceType == typeof(IEndpointProvider)); - - Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Transient && - d.ServiceType == typeof(ITopicsProvider)); - - Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Transient && - d.ServiceType == typeof(ICamundaWorker)); - - Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Singleton && - d.ServiceType == typeof(WorkerHandlerDescriptor)); + Assert.Contains(services, IsRegistered(typeof(IEndpointProvider), ServiceLifetime.Singleton)); + Assert.Contains(services, IsRegistered(typeof(ITopicsProvider), ServiceLifetime.Transient)); + Assert.Contains(services, IsRegistered(typeof(ICamundaWorker), ServiceLifetime.Transient)); + Assert.Contains(services, IsRegistered(typeof(WorkerHandlerDescriptor), ServiceLifetime.Singleton)); + Assert.Contains(services, IsRegistered(typeof(IFetchAndLockRequestProvider), ServiceLifetime.Singleton)); } + + private static Predicate IsRegistered(Type serviceType, ServiceLifetime lifetime) + => descriptor => descriptor.Lifetime == lifetime && + descriptor.ServiceType == serviceType; } }