From 3d8198c36a97132c390d72e4932c68ad2e6d2e85 Mon Sep 17 00:00:00 2001 From: Alexey Malinin Date: Tue, 9 Nov 2021 04:02:43 +0700 Subject: [PATCH 1/5] add `WorkerServiceOptions` class --- src/Camunda.Worker/WorkerServiceOptions.cs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 src/Camunda.Worker/WorkerServiceOptions.cs diff --git a/src/Camunda.Worker/WorkerServiceOptions.cs b/src/Camunda.Worker/WorkerServiceOptions.cs new file mode 100644 index 0000000..35226c1 --- /dev/null +++ b/src/Camunda.Worker/WorkerServiceOptions.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; +using Camunda.Worker.Execution; + +namespace Camunda.Worker +{ + public class WorkerServiceOptions + { + internal WorkerServiceOptions(string workerId, IEnumerable handlerDescriptors) + { + WorkerId = Guard.NotNull(workerId, nameof(workerId)); + HandlerDescriptors = Guard.NotNull(handlerDescriptors, nameof(handlerDescriptors)); + } + + public string WorkerId { get; } + + public IEnumerable HandlerDescriptors { get; } + } +} From 93309b492f2c0be40314363d650e4366130c1591 Mon Sep 17 00:00:00 2001 From: Alexey Malinin Date: Tue, 9 Nov 2021 04:21:54 +0700 Subject: [PATCH 2/5] add `IFetchAndLockRequestProvider` with implementation for current users --- .../Execution/IFetchAndLockRequestProvider.cs | 12 +++++ .../LegacyFetchAndLockRequestProvider.cs | 34 ++++++++++++++ .../LegacyFetchAndLockRequestProviderTests.cs | 46 +++++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 src/Camunda.Worker/Execution/IFetchAndLockRequestProvider.cs create mode 100644 src/Camunda.Worker/Execution/LegacyFetchAndLockRequestProvider.cs create mode 100644 test/Camunda.Worker.Tests/Execution/LegacyFetchAndLockRequestProviderTests.cs diff --git a/src/Camunda.Worker/Execution/IFetchAndLockRequestProvider.cs b/src/Camunda.Worker/Execution/IFetchAndLockRequestProvider.cs new file mode 100644 index 0000000..64387f4 --- /dev/null +++ b/src/Camunda.Worker/Execution/IFetchAndLockRequestProvider.cs @@ -0,0 +1,12 @@ +using Camunda.Worker.Client; + +namespace Camunda.Worker.Execution +{ + public interface IFetchAndLockRequestProvider + { + /// + /// This method is called in the worker before each "fetch and lock" operation + /// + FetchAndLockRequest GetRequest(); + } +} diff --git a/src/Camunda.Worker/Execution/LegacyFetchAndLockRequestProvider.cs b/src/Camunda.Worker/Execution/LegacyFetchAndLockRequestProvider.cs new file mode 100644 index 0000000..69c98ef --- /dev/null +++ b/src/Camunda.Worker/Execution/LegacyFetchAndLockRequestProvider.cs @@ -0,0 +1,34 @@ +using Camunda.Worker.Client; +using Microsoft.Extensions.Options; + +namespace Camunda.Worker.Execution +{ + internal class LegacyFetchAndLockRequestProvider : IFetchAndLockRequestProvider + { + private readonly ITopicsProvider _topicsProvider; + private readonly FetchAndLockOptions _options; + + internal LegacyFetchAndLockRequestProvider( + ITopicsProvider topicsProvider, + IOptions options + ) + { + _topicsProvider = topicsProvider; + _options = options.Value; + } + + public FetchAndLockRequest GetRequest() + { + var topics = _topicsProvider.GetTopics(); + + var fetchAndLockRequest = new FetchAndLockRequest(_options.WorkerId, _options.MaxTasks) + { + UsePriority = _options.UsePriority, + AsyncResponseTimeout = _options.AsyncResponseTimeout, + Topics = topics + }; + + return fetchAndLockRequest; + } + } +} diff --git a/test/Camunda.Worker.Tests/Execution/LegacyFetchAndLockRequestProviderTests.cs b/test/Camunda.Worker.Tests/Execution/LegacyFetchAndLockRequestProviderTests.cs new file mode 100644 index 0000000..e83c880 --- /dev/null +++ b/test/Camunda.Worker.Tests/Execution/LegacyFetchAndLockRequestProviderTests.cs @@ -0,0 +1,46 @@ +using System.Linq; +using Bogus; +using Camunda.Worker.Client; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; + +namespace Camunda.Worker.Execution +{ + public class LegacyFetchAndLockRequestProviderTests + { + [Fact] + public void GetRequest_ShouldReturnsRequest() + { + // Arrange + var fetchAndLockOptions = new Faker() + .RuleFor(o => o.WorkerId, f => f.Lorem.Word()) + .RuleFor(o => o.MaxTasks, f => f.Random.Int(1, 10)) + .RuleFor(o => o.AsyncResponseTimeout, f => f.Random.Int(100, 10000)) + .RuleFor(o => o.UsePriority, f => f.Random.Bool()) + .Generate(); + + var topics = new Faker() + .CustomInstantiator(f => new FetchAndLockRequest.Topic(f.Random.Hash(), f.Random.Int(1000, 10000))) + .GenerateLazy(5) + .ToList(); + var topicsProviderMock = new Mock(); + topicsProviderMock.Setup(p => p.GetTopics()).Returns(topics); + + var sut = new LegacyFetchAndLockRequestProvider( + topicsProviderMock.Object, + Options.Create(fetchAndLockOptions) + ); + + // Act + var request = sut.GetRequest(); + + // Assert + Assert.Same(topics, request.Topics); + Assert.Equal(fetchAndLockOptions.WorkerId, request.WorkerId); + Assert.Equal(fetchAndLockOptions.MaxTasks, request.MaxTasks); + Assert.Equal(fetchAndLockOptions.AsyncResponseTimeout, request.AsyncResponseTimeout); + Assert.Equal(fetchAndLockOptions.UsePriority, request.UsePriority); + } + } +} From aeef239bb8ec06dc3ee3339c913f8d755c7df519 Mon Sep 17 00:00:00 2001 From: Alexey Malinin Date: Wed, 10 Nov 2021 03:11:26 +0700 Subject: [PATCH 3/5] use new service in the worker and add method to register it --- src/Camunda.Worker/CamundaWorkerBuilder.cs | 14 ++++++++++ .../Execution/DefaultCamundaWorker.cs | 26 +++---------------- src/Camunda.Worker/ICamundaWorkerBuilder.cs | 4 +++ .../ServiceCollectionExtensions.cs | 7 ++++- .../CamundaWorkerBuilderTest.cs | 24 ++++++++++++++--- .../Execution/DefaultCamundaWorkerTest.cs | 23 +++++++--------- .../ServiceCollectionExtensionsTest.cs | 26 ++++++++----------- 7 files changed, 70 insertions(+), 54 deletions(-) diff --git a/src/Camunda.Worker/CamundaWorkerBuilder.cs b/src/Camunda.Worker/CamundaWorkerBuilder.cs index cefd66a..82bfcc0 100644 --- a/src/Camunda.Worker/CamundaWorkerBuilder.cs +++ b/src/Camunda.Worker/CamundaWorkerBuilder.cs @@ -29,6 +29,20 @@ public ICamundaWorkerBuilder AddTopicsProvider() where TProvider : cl return this; } + public ICamundaWorkerBuilder AddFetchAndLockRequestProvider( + Func factory + ) + { + Services.AddSingleton(provider => + { + var handlerDescriptors = provider.GetServices(); + var options = new WorkerServiceOptions(WorkerId, handlerDescriptors); + return factory(options, provider); + }); + + return this; + } + public ICamundaWorkerBuilder AddHandler(ExternalTaskDelegate handler, HandlerMetadata handlerMetadata) { Guard.NotNull(handler, nameof(handler)); diff --git a/src/Camunda.Worker/Execution/DefaultCamundaWorker.cs b/src/Camunda.Worker/Execution/DefaultCamundaWorker.cs index c498379..3fa2270 100644 --- a/src/Camunda.Worker/Execution/DefaultCamundaWorker.cs +++ b/src/Camunda.Worker/Execution/DefaultCamundaWorker.cs @@ -14,8 +14,7 @@ namespace Camunda.Worker.Execution public sealed class DefaultCamundaWorker : ICamundaWorker { private readonly IExternalTaskClient _externalTaskClient; - private readonly ITopicsProvider _topicsProvider; - private readonly FetchAndLockOptions _fetchAndLockOptions; + private readonly IFetchAndLockRequestProvider _fetchAndLockRequestProvider; private readonly WorkerEvents _workerEvents; private readonly IServiceProvider _serviceProvider; private readonly WorkerHandlerDescriptor _workerHandlerDescriptor; @@ -23,8 +22,7 @@ public sealed class DefaultCamundaWorker : ICamundaWorker public DefaultCamundaWorker( IExternalTaskClient externalTaskClient, - ITopicsProvider topicsProvider, - IOptions fetchAndLockOptions, + IFetchAndLockRequestProvider fetchAndLockRequestProvider, IOptions workerEvents, IServiceProvider serviceProvider, WorkerHandlerDescriptor workerHandlerDescriptor, @@ -32,8 +30,7 @@ public DefaultCamundaWorker( ) { _externalTaskClient = Guard.NotNull(externalTaskClient, nameof(externalTaskClient)); - _topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider)); - _fetchAndLockOptions = Guard.NotNull(fetchAndLockOptions, nameof(fetchAndLockOptions)).Value; + _fetchAndLockRequestProvider = Guard.NotNull(fetchAndLockRequestProvider, nameof(fetchAndLockRequestProvider)); _workerEvents = Guard.NotNull(workerEvents, nameof(workerEvents)).Value; _serviceProvider = Guard.NotNull(serviceProvider, nameof(serviceProvider)); _workerHandlerDescriptor = Guard.NotNull(workerHandlerDescriptor, nameof(workerHandlerDescriptor)); @@ -69,7 +66,7 @@ private async Task> SelectAsync(CancellationTo try { Log.Waiting(_logger); - var fetchAndLockRequest = MakeRequestBody(); + var fetchAndLockRequest = _fetchAndLockRequestProvider.GetRequest(); var externalTasks = await _externalTaskClient.FetchAndLockAsync(fetchAndLockRequest, cancellationToken); Log.Locked(_logger, externalTasks.Count); return externalTasks; @@ -82,21 +79,6 @@ private async Task> SelectAsync(CancellationTo } } - private FetchAndLockRequest MakeRequestBody() - { - var topics = _topicsProvider.GetTopics(); - - var fetchAndLockRequest = - new FetchAndLockRequest(_fetchAndLockOptions.WorkerId, _fetchAndLockOptions.MaxTasks) - { - UsePriority = _fetchAndLockOptions.UsePriority, - AsyncResponseTimeout = _fetchAndLockOptions.AsyncResponseTimeout, - Topics = topics - }; - - return fetchAndLockRequest; - } - private async Task ProcessExternalTask(ExternalTask externalTask) { using var scope = _serviceProvider.CreateScope(); diff --git a/src/Camunda.Worker/ICamundaWorkerBuilder.cs b/src/Camunda.Worker/ICamundaWorkerBuilder.cs index 1d1845b..581b9fd 100644 --- a/src/Camunda.Worker/ICamundaWorkerBuilder.cs +++ b/src/Camunda.Worker/ICamundaWorkerBuilder.cs @@ -14,6 +14,10 @@ public interface ICamundaWorkerBuilder ICamundaWorkerBuilder AddTopicsProvider() where TProvider : class, ITopicsProvider; + ICamundaWorkerBuilder AddFetchAndLockRequestProvider( + Func factory + ); + ICamundaWorkerBuilder AddHandler(ExternalTaskDelegate handler, HandlerMetadata handlerMetadata); ICamundaWorkerBuilder ConfigurePipeline(Action configureAction); diff --git a/src/Camunda.Worker/ServiceCollectionExtensions.cs b/src/Camunda.Worker/ServiceCollectionExtensions.cs index a486611..3d00ddf 100644 --- a/src/Camunda.Worker/ServiceCollectionExtensions.cs +++ b/src/Camunda.Worker/ServiceCollectionExtensions.cs @@ -2,6 +2,7 @@ using Camunda.Worker.Execution; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; namespace Camunda.Worker { @@ -24,7 +25,11 @@ public static ICamundaWorkerBuilder AddCamundaWorker( services.TryAddSingleton(_ => new WorkerHandlerDescriptor(ExternalTaskRouter.RouteAsync)); services.AddHostedService(provider => new WorkerHostedService(provider, numberOfWorkers)); - return new CamundaWorkerBuilder(services, workerId); + return new CamundaWorkerBuilder(services, workerId) + .AddFetchAndLockRequestProvider((options, provider) => new LegacyFetchAndLockRequestProvider( + provider.GetRequiredService(), + provider.GetRequiredService>() + )); } } } diff --git a/test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs b/test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs index e05999e..f962238 100644 --- a/test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs +++ b/test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs @@ -45,9 +45,19 @@ public void TestAddTopicsProvider() { _builder.AddTopicsProvider(); - Assert.Contains(_services, d => d.Lifetime == ServiceLifetime.Transient && - d.ServiceType == typeof(ITopicsProvider) && - d.ImplementationType == typeof(TopicsProvider)); + using var provider = _services.BuildServiceProvider(); + + Assert.IsType(provider.GetService()); + } + + [Fact] + public void TestAddFetchAndLockRequestProvider() + { + _builder.AddFetchAndLockRequestProvider((_, _) => new FetchAndLockRequestProvider()); + + using var provider = _services.BuildServiceProvider(); + + Assert.IsType(provider.GetService()); } [Fact] @@ -85,5 +95,13 @@ private class TopicsProvider : ITopicsProvider throw new NotImplementedException(); } } + + private class FetchAndLockRequestProvider : IFetchAndLockRequestProvider + { + public FetchAndLockRequest GetRequest() + { + throw new NotImplementedException(); + } + } } } diff --git a/test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs b/test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs index 883a2b9..b678ad4 100644 --- a/test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs +++ b/test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs @@ -16,12 +16,7 @@ public class DefaultCamundaWorkerTest : IDisposable { private readonly Mock _handlerMock = new(); private readonly Mock _clientMock = new(); - private readonly Mock _topicsProviderMock = new(); - private readonly IOptions _fetchAndLockOptions = Options.Create(new FetchAndLockOptions - { - WorkerId = "testWorker", - AsyncResponseTimeout = 5_000 - }); + private readonly Mock _fetchAndLockRequestProviderMock = new(); private readonly Mock _workerEventsMock = new(); private readonly ServiceProvider _serviceProvider; private readonly DefaultCamundaWorker _worker; @@ -29,15 +24,17 @@ public class DefaultCamundaWorkerTest : IDisposable public DefaultCamundaWorkerTest() { _serviceProvider = new ServiceCollection().BuildServiceProvider(); - var workerEventsOptions = Options.Create(new WorkerEvents - { - OnAfterProcessingAllTasks = _workerEventsMock.Object.OnAfterProcessingAllTasks - }); + + _fetchAndLockRequestProviderMock.Setup(provider => provider.GetRequest()) + .Returns(new FetchAndLockRequest("test")); + _worker = new DefaultCamundaWorker( _clientMock.Object, - _topicsProviderMock.Object, - _fetchAndLockOptions, - workerEventsOptions, + _fetchAndLockRequestProviderMock.Object, + Options.Create(new WorkerEvents + { + OnAfterProcessingAllTasks = _workerEventsMock.Object.OnAfterProcessingAllTasks + }), _serviceProvider, new WorkerHandlerDescriptor(_handlerMock.Object.HandleAsync) ); diff --git a/test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs b/test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs index 71a0ee1..4408415 100644 --- a/test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs +++ b/test/Camunda.Worker.Tests/ServiceCollectionExtensionsTest.cs @@ -1,4 +1,4 @@ -using Camunda.Worker.Client; +using System; using Camunda.Worker.Execution; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -19,21 +19,17 @@ public void TestAddCamundaWorker() var fetchAndLockOptions = provider.GetRequiredService>().Value; Assert.Equal("testWorker", fetchAndLockOptions.WorkerId); + Assert.NotNull(provider.GetService>()?.Value); - var workerEvents = provider.GetRequiredService>().Value; - Assert.NotNull(workerEvents); - - Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Singleton && - d.ServiceType == typeof(IEndpointProvider)); - - Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Transient && - d.ServiceType == typeof(ITopicsProvider)); - - Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Transient && - d.ServiceType == typeof(ICamundaWorker)); - - Assert.Contains(services, d => d.Lifetime == ServiceLifetime.Singleton && - d.ServiceType == typeof(WorkerHandlerDescriptor)); + Assert.Contains(services, IsRegistered(typeof(IEndpointProvider), ServiceLifetime.Singleton)); + Assert.Contains(services, IsRegistered(typeof(ITopicsProvider), ServiceLifetime.Transient)); + Assert.Contains(services, IsRegistered(typeof(ICamundaWorker), ServiceLifetime.Transient)); + Assert.Contains(services, IsRegistered(typeof(WorkerHandlerDescriptor), ServiceLifetime.Singleton)); + Assert.Contains(services, IsRegistered(typeof(IFetchAndLockRequestProvider), ServiceLifetime.Singleton)); } + + private static Predicate IsRegistered(Type serviceType, ServiceLifetime lifetime) + => descriptor => descriptor.Lifetime == lifetime && + descriptor.ServiceType == serviceType; } } From bd551840e555cb68ccc13355915e796904ba6a38 Mon Sep 17 00:00:00 2001 From: Alexey Malinin Date: Wed, 10 Nov 2021 03:31:21 +0700 Subject: [PATCH 4/5] mark interfaces and methods that will be removed as obsolete --- src/Camunda.Worker/Execution/FetchAndLockOptions.cs | 3 +++ src/Camunda.Worker/Execution/ITopicsProvider.cs | 2 ++ src/Camunda.Worker/ICamundaWorkerBuilder.cs | 1 + 3 files changed, 6 insertions(+) diff --git a/src/Camunda.Worker/Execution/FetchAndLockOptions.cs b/src/Camunda.Worker/Execution/FetchAndLockOptions.cs index ec2bcb2..482ee4d 100644 --- a/src/Camunda.Worker/Execution/FetchAndLockOptions.cs +++ b/src/Camunda.Worker/Execution/FetchAndLockOptions.cs @@ -1,5 +1,8 @@ +using System; + namespace Camunda.Worker.Execution { + [Obsolete("Use IFetchAndLockRequestProvider instead")] public class FetchAndLockOptions { private string _workerId = string.Empty; diff --git a/src/Camunda.Worker/Execution/ITopicsProvider.cs b/src/Camunda.Worker/Execution/ITopicsProvider.cs index ce642c3..44c9428 100644 --- a/src/Camunda.Worker/Execution/ITopicsProvider.cs +++ b/src/Camunda.Worker/Execution/ITopicsProvider.cs @@ -1,8 +1,10 @@ +using System; using System.Collections.Generic; using Camunda.Worker.Client; namespace Camunda.Worker.Execution { + [Obsolete("Use IFetchAndLockRequestProvider instead")] public interface ITopicsProvider { IReadOnlyCollection GetTopics(); diff --git a/src/Camunda.Worker/ICamundaWorkerBuilder.cs b/src/Camunda.Worker/ICamundaWorkerBuilder.cs index 581b9fd..5c1beec 100644 --- a/src/Camunda.Worker/ICamundaWorkerBuilder.cs +++ b/src/Camunda.Worker/ICamundaWorkerBuilder.cs @@ -12,6 +12,7 @@ public interface ICamundaWorkerBuilder ICamundaWorkerBuilder AddEndpointProvider() where TProvider : class, IEndpointProvider; + [Obsolete("Use IFetchAndLockRequestProvider and AddFetchAndLockRequestProvider instead")] ICamundaWorkerBuilder AddTopicsProvider() where TProvider : class, ITopicsProvider; ICamundaWorkerBuilder AddFetchAndLockRequestProvider( From 40c2253a5fd45fc575c2a6a8de60c211614bec79 Mon Sep 17 00:00:00 2001 From: Alexey Malinin Date: Fri, 12 Nov 2021 01:41:40 +0700 Subject: [PATCH 5/5] deprecate only `WorkerId` in `FetchAndLockOptions` --- src/Camunda.Worker/Execution/FetchAndLockOptions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Camunda.Worker/Execution/FetchAndLockOptions.cs b/src/Camunda.Worker/Execution/FetchAndLockOptions.cs index 482ee4d..b47d51d 100644 --- a/src/Camunda.Worker/Execution/FetchAndLockOptions.cs +++ b/src/Camunda.Worker/Execution/FetchAndLockOptions.cs @@ -2,13 +2,13 @@ namespace Camunda.Worker.Execution { - [Obsolete("Use IFetchAndLockRequestProvider instead")] public class FetchAndLockOptions { private string _workerId = string.Empty; private int _maxTasks = 1; private int _asyncResponseTimeout = 10_000; + [Obsolete] public string WorkerId { get => _workerId;