Skip to content

Commit 33ae740

Browse files
authored
Merge pull request #24 from AMalininHere/change-selector-api
2 parents 59dcdf9 + 3ff2111 commit 33ae740

File tree

6 files changed

+26
-28
lines changed

6 files changed

+26
-28
lines changed

src/Camunda.Worker/Execution/DefaultCamundaWorker.cs

+4-12
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,18 @@ namespace Camunda.Worker.Execution
1010
{
1111
public sealed class DefaultCamundaWorker : ICamundaWorker
1212
{
13-
private readonly ITopicsProvider _topicsProvider;
1413
private readonly IExternalTaskSelector _selector;
1514
private readonly IContextFactory _contextFactory;
1615
private readonly PipelineDescriptor _pipelineDescriptor;
1716
private readonly ILogger<DefaultCamundaWorker> _logger;
1817

19-
public DefaultCamundaWorker(ITopicsProvider topicsProvider,
18+
public DefaultCamundaWorker(
2019
IExternalTaskSelector selector,
2120
IContextFactory contextFactory,
2221
PipelineDescriptor pipelineDescriptor,
23-
ILogger<DefaultCamundaWorker>? logger = null)
22+
ILogger<DefaultCamundaWorker>? logger = null
23+
)
2424
{
25-
_topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider));
2625
_selector = Guard.NotNull(selector, nameof(selector));
2726
_contextFactory = Guard.NotNull(contextFactory, nameof(contextFactory));
2827
_pipelineDescriptor = Guard.NotNull(pipelineDescriptor, nameof(pipelineDescriptor));
@@ -33,7 +32,7 @@ public async Task Run(CancellationToken cancellationToken)
3332
{
3433
while (!cancellationToken.IsCancellationRequested)
3534
{
36-
var externalTasks = await SelectExternalTasks(cancellationToken);
35+
var externalTasks = await _selector.SelectAsync(cancellationToken);
3736

3837
var executableTasks = externalTasks
3938
.Select(ProcessExternalTask)
@@ -43,13 +42,6 @@ public async Task Run(CancellationToken cancellationToken)
4342
}
4443
}
4544

46-
private Task<IReadOnlyCollection<ExternalTask>> SelectExternalTasks(CancellationToken cancellationToken)
47-
{
48-
var topics = _topicsProvider.GetTopics();
49-
var selectedTasks = _selector.SelectAsync(topics, cancellationToken);
50-
return selectedTasks;
51-
}
52-
5345
private async Task ProcessExternalTask(ExternalTask externalTask)
5446
{
5547
using var context = _contextFactory.MakeContext(externalTask);

src/Camunda.Worker/Execution/ExternalTaskSelector.cs

+15-3
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,19 @@ namespace Camunda.Worker.Execution
1212
public sealed class ExternalTaskSelector : IExternalTaskSelector
1313
{
1414
private readonly IExternalTaskClient _client;
15+
private readonly ITopicsProvider _topicsProvider;
1516
private readonly CamundaWorkerOptions _options;
1617
private readonly ILogger<ExternalTaskSelector> _logger;
1718

1819
public ExternalTaskSelector(
1920
IExternalTaskClient client,
21+
ITopicsProvider topicsProvider,
2022
IOptions<CamundaWorkerOptions> options,
2123
ILogger<ExternalTaskSelector>? logger = null
2224
)
2325
{
2426
_client = Guard.NotNull(client, nameof(client));
27+
_topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider));
2528
_options = Guard.NotNull(options, nameof(options)).Value;
2629
_logger = logger ?? NullLogger<ExternalTaskSelector>.Instance;
2730
}
@@ -30,25 +33,34 @@ public async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
3033
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
3134
CancellationToken cancellationToken = default
3235
)
36+
{
37+
return await SelectAsync(cancellationToken);
38+
}
39+
40+
public async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
41+
CancellationToken cancellationToken = default
42+
)
3343
{
3444
try
3545
{
3646
_logger.LogDebug("Waiting for external task");
37-
var fetchAndLockRequest = MakeRequestBody(topics);
47+
var fetchAndLockRequest = MakeRequestBody();
3848
var externalTasks = await PerformSelection(fetchAndLockRequest, cancellationToken);
3949
_logger.LogDebug("Locked {Count} external tasks", externalTasks.Count);
4050
return externalTasks;
4151
}
4252
catch (Exception e) when (!cancellationToken.IsCancellationRequested)
4353
{
44-
_logger.LogWarning(e,"Failed receiving of external tasks. Reason: \"{Reason}\"", e.Message);
54+
_logger.LogWarning(e, "Failed receiving of external tasks. Reason: \"{Reason}\"", e.Message);
4555
await DelayOnFailure(cancellationToken);
4656
return Array.Empty<ExternalTask>();
4757
}
4858
}
4959

50-
private FetchAndLockRequest MakeRequestBody(IReadOnlyCollection<FetchAndLockRequest.Topic> topics)
60+
private FetchAndLockRequest MakeRequestBody()
5161
{
62+
var topics = _topicsProvider.GetTopics();
63+
5264
var fetchAndLockRequest = new FetchAndLockRequest(_options.WorkerId)
5365
{
5466
UsePriority = true,
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
using System.Collections.Generic;
22
using System.Threading;
33
using System.Threading.Tasks;
4-
using Camunda.Worker.Client;
54

65
namespace Camunda.Worker.Execution
76
{
87
public interface IExternalTaskSelector
98
{
10-
Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
11-
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
12-
CancellationToken cancellationToken = default
13-
);
9+
Task<IReadOnlyCollection<ExternalTask>> SelectAsync(CancellationToken cancellationToken = default);
1410
}
1511
}

test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs

-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ private class TopicsProvider : ITopicsProvider
105105
private class ExternalTaskSelector : IExternalTaskSelector
106106
{
107107
public Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
108-
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
109108
CancellationToken cancellationToken = default
110109
)
111110
{

test/Camunda.Worker.Tests/Execution/DefaultCamundaWorkerTest.cs

-6
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,17 @@ namespace Camunda.Worker.Execution
1111
public class DefaultCamundaWorkerTest
1212
{
1313
private readonly Mock<IExternalTaskRouter> _routerMock = new Mock<IExternalTaskRouter>();
14-
private readonly Mock<ITopicsProvider> _topicsProviderMock = new Mock<ITopicsProvider>();
1514
private readonly Mock<IExternalTaskSelector> _selectorMock = new Mock<IExternalTaskSelector>();
1615
private readonly Mock<IContextFactory> _contextFactoryMock = new Mock<IContextFactory>();
1716
private readonly DefaultCamundaWorker _worker;
1817

1918
public DefaultCamundaWorkerTest()
2019
{
21-
_topicsProviderMock.Setup(provider => provider.GetTopics())
22-
.Returns(Array.Empty<FetchAndLockRequest.Topic>());
23-
2420
var contextMock = new Mock<IExternalTaskContext>();
2521
_contextFactoryMock.Setup(factory => factory.MakeContext(It.IsAny<ExternalTask>()))
2622
.Returns(contextMock.Object);
2723

2824
_worker = new DefaultCamundaWorker(
29-
_topicsProviderMock.Object,
3025
_selectorMock.Object,
3126
_contextFactoryMock.Object,
3227
new PipelineDescriptor(_routerMock.Object.RouteAsync)
@@ -78,7 +73,6 @@ private void ConfigureSelector(CancellationTokenSource cts, IReadOnlyCollection<
7873
{
7974
_selectorMock
8075
.Setup(selector => selector.SelectAsync(
81-
It.IsAny<IReadOnlyCollection<FetchAndLockRequest.Topic>>(),
8276
It.IsAny<CancellationToken>()
8377
))
8478
.Callback(cts.Cancel)

test/Camunda.Worker.Tests/Execution/ExternalTaskSelectorTest.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ namespace Camunda.Worker.Execution
1212
public class ExternalTaskSelectorTest
1313
{
1414
private readonly Mock<IExternalTaskClient> _clientMock = new Mock<IExternalTaskClient>();
15+
private readonly Mock<ITopicsProvider> _topicsProviderMock = new Mock<ITopicsProvider>();
1516

1617
private readonly IOptions<CamundaWorkerOptions> _options = Options.Create(new CamundaWorkerOptions
1718
{
@@ -24,8 +25,12 @@ public class ExternalTaskSelectorTest
2425

2526
public ExternalTaskSelectorTest()
2627
{
28+
_topicsProviderMock.Setup(provider => provider.GetTopics())
29+
.Returns(Array.Empty<FetchAndLockRequest.Topic>());
30+
2731
_selector = new ExternalTaskSelector(
2832
_clientMock.Object,
33+
_topicsProviderMock.Object,
2934
_options
3035
);
3136
}
@@ -38,7 +43,7 @@ public async Task TestSuccessfullySelection()
3843
client.FetchAndLockAsync(It.IsAny<FetchAndLockRequest>(), It.IsAny<CancellationToken>()))
3944
.ReturnsAsync(new List<ExternalTask>());
4045

41-
var result = await _selector.SelectAsync(new FetchAndLockRequest.Topic[0]);
46+
var result = await _selector.SelectAsync();
4247

4348
Assert.Empty(result);
4449
_clientMock.VerifyAll();

0 commit comments

Comments
 (0)