Skip to content

Commit

Permalink
change signature of IExternalTaskSelector.SelectAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
TechnoBerry committed Mar 18, 2021
1 parent 59dcdf9 commit 3ff2111
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 28 deletions.
16 changes: 4 additions & 12 deletions src/Camunda.Worker/Execution/DefaultCamundaWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,18 @@ namespace Camunda.Worker.Execution
{
public sealed class DefaultCamundaWorker : ICamundaWorker
{
private readonly ITopicsProvider _topicsProvider;
private readonly IExternalTaskSelector _selector;
private readonly IContextFactory _contextFactory;
private readonly PipelineDescriptor _pipelineDescriptor;
private readonly ILogger<DefaultCamundaWorker> _logger;

public DefaultCamundaWorker(ITopicsProvider topicsProvider,
public DefaultCamundaWorker(
IExternalTaskSelector selector,
IContextFactory contextFactory,
PipelineDescriptor pipelineDescriptor,
ILogger<DefaultCamundaWorker>? logger = null)
ILogger<DefaultCamundaWorker>? logger = null
)
{
_topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider));
_selector = Guard.NotNull(selector, nameof(selector));
_contextFactory = Guard.NotNull(contextFactory, nameof(contextFactory));
_pipelineDescriptor = Guard.NotNull(pipelineDescriptor, nameof(pipelineDescriptor));
Expand All @@ -33,7 +32,7 @@ public async Task Run(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var externalTasks = await SelectExternalTasks(cancellationToken);
var externalTasks = await _selector.SelectAsync(cancellationToken);

var executableTasks = externalTasks
.Select(ProcessExternalTask)
Expand All @@ -43,13 +42,6 @@ public async Task Run(CancellationToken cancellationToken)
}
}

private Task<IReadOnlyCollection<ExternalTask>> SelectExternalTasks(CancellationToken cancellationToken)
{
var topics = _topicsProvider.GetTopics();
var selectedTasks = _selector.SelectAsync(topics, cancellationToken);
return selectedTasks;
}

private async Task ProcessExternalTask(ExternalTask externalTask)
{
using var context = _contextFactory.MakeContext(externalTask);
Expand Down
18 changes: 15 additions & 3 deletions src/Camunda.Worker/Execution/ExternalTaskSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ namespace Camunda.Worker.Execution
public sealed class ExternalTaskSelector : IExternalTaskSelector
{
private readonly IExternalTaskClient _client;
private readonly ITopicsProvider _topicsProvider;
private readonly CamundaWorkerOptions _options;
private readonly ILogger<ExternalTaskSelector> _logger;

public ExternalTaskSelector(
IExternalTaskClient client,
ITopicsProvider topicsProvider,
IOptions<CamundaWorkerOptions> options,
ILogger<ExternalTaskSelector>? logger = null
)
{
_client = Guard.NotNull(client, nameof(client));
_topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider));
_options = Guard.NotNull(options, nameof(options)).Value;
_logger = logger ?? NullLogger<ExternalTaskSelector>.Instance;
}
Expand All @@ -30,25 +33,34 @@ public async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
CancellationToken cancellationToken = default
)
{
return await SelectAsync(cancellationToken);
}

public async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
CancellationToken cancellationToken = default
)
{
try
{
_logger.LogDebug("Waiting for external task");
var fetchAndLockRequest = MakeRequestBody(topics);
var fetchAndLockRequest = MakeRequestBody();
var externalTasks = await PerformSelection(fetchAndLockRequest, cancellationToken);
_logger.LogDebug("Locked {Count} external tasks", externalTasks.Count);
return externalTasks;
}
catch (Exception e) when (!cancellationToken.IsCancellationRequested)
{
_logger.LogWarning(e,"Failed receiving of external tasks. Reason: \"{Reason}\"", e.Message);
_logger.LogWarning(e, "Failed receiving of external tasks. Reason: \"{Reason}\"", e.Message);
await DelayOnFailure(cancellationToken);
return Array.Empty<ExternalTask>();
}
}

private FetchAndLockRequest MakeRequestBody(IReadOnlyCollection<FetchAndLockRequest.Topic> topics)
private FetchAndLockRequest MakeRequestBody()
{
var topics = _topicsProvider.GetTopics();

var fetchAndLockRequest = new FetchAndLockRequest(_options.WorkerId)
{
UsePriority = true,
Expand Down
6 changes: 1 addition & 5 deletions src/Camunda.Worker/Execution/IExternalTaskSelector.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Camunda.Worker.Client;

namespace Camunda.Worker.Execution
{
public interface IExternalTaskSelector
{
Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
CancellationToken cancellationToken = default
);
Task<IReadOnlyCollection<ExternalTask>> SelectAsync(CancellationToken cancellationToken = default);
}
}
1 change: 0 additions & 1 deletion test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ private class TopicsProvider : ITopicsProvider
private class ExternalTaskSelector : IExternalTaskSelector
{
public Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
CancellationToken cancellationToken = default
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,17 @@ namespace Camunda.Worker.Execution
public class DefaultCamundaWorkerTest
{
private readonly Mock<IExternalTaskRouter> _routerMock = new Mock<IExternalTaskRouter>();
private readonly Mock<ITopicsProvider> _topicsProviderMock = new Mock<ITopicsProvider>();
private readonly Mock<IExternalTaskSelector> _selectorMock = new Mock<IExternalTaskSelector>();
private readonly Mock<IContextFactory> _contextFactoryMock = new Mock<IContextFactory>();
private readonly DefaultCamundaWorker _worker;

public DefaultCamundaWorkerTest()
{
_topicsProviderMock.Setup(provider => provider.GetTopics())
.Returns(Array.Empty<FetchAndLockRequest.Topic>());

var contextMock = new Mock<IExternalTaskContext>();
_contextFactoryMock.Setup(factory => factory.MakeContext(It.IsAny<ExternalTask>()))
.Returns(contextMock.Object);

_worker = new DefaultCamundaWorker(
_topicsProviderMock.Object,
_selectorMock.Object,
_contextFactoryMock.Object,
new PipelineDescriptor(_routerMock.Object.RouteAsync)
Expand Down Expand Up @@ -78,7 +73,6 @@ private void ConfigureSelector(CancellationTokenSource cts, IReadOnlyCollection<
{
_selectorMock
.Setup(selector => selector.SelectAsync(
It.IsAny<IReadOnlyCollection<FetchAndLockRequest.Topic>>(),
It.IsAny<CancellationToken>()
))
.Callback(cts.Cancel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Camunda.Worker.Execution
public class ExternalTaskSelectorTest
{
private readonly Mock<IExternalTaskClient> _clientMock = new Mock<IExternalTaskClient>();
private readonly Mock<ITopicsProvider> _topicsProviderMock = new Mock<ITopicsProvider>();

private readonly IOptions<CamundaWorkerOptions> _options = Options.Create(new CamundaWorkerOptions
{
Expand All @@ -24,8 +25,12 @@ public class ExternalTaskSelectorTest

public ExternalTaskSelectorTest()
{
_topicsProviderMock.Setup(provider => provider.GetTopics())
.Returns(Array.Empty<FetchAndLockRequest.Topic>());

_selector = new ExternalTaskSelector(
_clientMock.Object,
_topicsProviderMock.Object,
_options
);
}
Expand All @@ -38,7 +43,7 @@ public async Task TestSuccessfullySelection()
client.FetchAndLockAsync(It.IsAny<FetchAndLockRequest>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new List<ExternalTask>());

var result = await _selector.SelectAsync(new FetchAndLockRequest.Topic[0]);
var result = await _selector.SelectAsync();

Assert.Empty(result);
_clientMock.VerifyAll();
Expand Down

0 comments on commit 3ff2111

Please sign in to comment.