Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Breaking change] Change selector API #24

Merged
merged 1 commit into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions src/Camunda.Worker/Execution/DefaultCamundaWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,18 @@ namespace Camunda.Worker.Execution
{
public sealed class DefaultCamundaWorker : ICamundaWorker
{
private readonly ITopicsProvider _topicsProvider;
private readonly IExternalTaskSelector _selector;
private readonly IContextFactory _contextFactory;
private readonly PipelineDescriptor _pipelineDescriptor;
private readonly ILogger<DefaultCamundaWorker> _logger;

public DefaultCamundaWorker(ITopicsProvider topicsProvider,
public DefaultCamundaWorker(
IExternalTaskSelector selector,
IContextFactory contextFactory,
PipelineDescriptor pipelineDescriptor,
ILogger<DefaultCamundaWorker>? logger = null)
ILogger<DefaultCamundaWorker>? logger = null
)
{
_topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider));
_selector = Guard.NotNull(selector, nameof(selector));
_contextFactory = Guard.NotNull(contextFactory, nameof(contextFactory));
_pipelineDescriptor = Guard.NotNull(pipelineDescriptor, nameof(pipelineDescriptor));
Expand All @@ -33,7 +32,7 @@ public async Task Run(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var externalTasks = await SelectExternalTasks(cancellationToken);
var externalTasks = await _selector.SelectAsync(cancellationToken);

var executableTasks = externalTasks
.Select(ProcessExternalTask)
Expand All @@ -43,13 +42,6 @@ public async Task Run(CancellationToken cancellationToken)
}
}

private Task<IReadOnlyCollection<ExternalTask>> SelectExternalTasks(CancellationToken cancellationToken)
{
var topics = _topicsProvider.GetTopics();
var selectedTasks = _selector.SelectAsync(topics, cancellationToken);
return selectedTasks;
}

private async Task ProcessExternalTask(ExternalTask externalTask)
{
using var context = _contextFactory.MakeContext(externalTask);
Expand Down
18 changes: 15 additions & 3 deletions src/Camunda.Worker/Execution/ExternalTaskSelector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ namespace Camunda.Worker.Execution
public sealed class ExternalTaskSelector : IExternalTaskSelector
{
private readonly IExternalTaskClient _client;
private readonly ITopicsProvider _topicsProvider;
private readonly CamundaWorkerOptions _options;
private readonly ILogger<ExternalTaskSelector> _logger;

public ExternalTaskSelector(
IExternalTaskClient client,
ITopicsProvider topicsProvider,
IOptions<CamundaWorkerOptions> options,
ILogger<ExternalTaskSelector>? logger = null
)
{
_client = Guard.NotNull(client, nameof(client));
_topicsProvider = Guard.NotNull(topicsProvider, nameof(topicsProvider));
_options = Guard.NotNull(options, nameof(options)).Value;
_logger = logger ?? NullLogger<ExternalTaskSelector>.Instance;
}
Expand All @@ -30,25 +33,34 @@ public async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
CancellationToken cancellationToken = default
)
{
return await SelectAsync(cancellationToken);
}

public async Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
CancellationToken cancellationToken = default
)
{
try
{
_logger.LogDebug("Waiting for external task");
var fetchAndLockRequest = MakeRequestBody(topics);
var fetchAndLockRequest = MakeRequestBody();
var externalTasks = await PerformSelection(fetchAndLockRequest, cancellationToken);
_logger.LogDebug("Locked {Count} external tasks", externalTasks.Count);
return externalTasks;
}
catch (Exception e) when (!cancellationToken.IsCancellationRequested)
{
_logger.LogWarning(e,"Failed receiving of external tasks. Reason: \"{Reason}\"", e.Message);
_logger.LogWarning(e, "Failed receiving of external tasks. Reason: \"{Reason}\"", e.Message);
await DelayOnFailure(cancellationToken);
return Array.Empty<ExternalTask>();
}
}

private FetchAndLockRequest MakeRequestBody(IReadOnlyCollection<FetchAndLockRequest.Topic> topics)
private FetchAndLockRequest MakeRequestBody()
{
var topics = _topicsProvider.GetTopics();

var fetchAndLockRequest = new FetchAndLockRequest(_options.WorkerId)
{
UsePriority = true,
Expand Down
6 changes: 1 addition & 5 deletions src/Camunda.Worker/Execution/IExternalTaskSelector.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Camunda.Worker.Client;

namespace Camunda.Worker.Execution
{
public interface IExternalTaskSelector
{
Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
CancellationToken cancellationToken = default
);
Task<IReadOnlyCollection<ExternalTask>> SelectAsync(CancellationToken cancellationToken = default);
}
}
1 change: 0 additions & 1 deletion test/Camunda.Worker.Tests/CamundaWorkerBuilderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ private class TopicsProvider : ITopicsProvider
private class ExternalTaskSelector : IExternalTaskSelector
{
public Task<IReadOnlyCollection<ExternalTask>> SelectAsync(
IReadOnlyCollection<FetchAndLockRequest.Topic> topics,
CancellationToken cancellationToken = default
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,17 @@ namespace Camunda.Worker.Execution
public class DefaultCamundaWorkerTest
{
private readonly Mock<IExternalTaskRouter> _routerMock = new Mock<IExternalTaskRouter>();
private readonly Mock<ITopicsProvider> _topicsProviderMock = new Mock<ITopicsProvider>();
private readonly Mock<IExternalTaskSelector> _selectorMock = new Mock<IExternalTaskSelector>();
private readonly Mock<IContextFactory> _contextFactoryMock = new Mock<IContextFactory>();
private readonly DefaultCamundaWorker _worker;

public DefaultCamundaWorkerTest()
{
_topicsProviderMock.Setup(provider => provider.GetTopics())
.Returns(Array.Empty<FetchAndLockRequest.Topic>());

var contextMock = new Mock<IExternalTaskContext>();
_contextFactoryMock.Setup(factory => factory.MakeContext(It.IsAny<ExternalTask>()))
.Returns(contextMock.Object);

_worker = new DefaultCamundaWorker(
_topicsProviderMock.Object,
_selectorMock.Object,
_contextFactoryMock.Object,
new PipelineDescriptor(_routerMock.Object.RouteAsync)
Expand Down Expand Up @@ -78,7 +73,6 @@ private void ConfigureSelector(CancellationTokenSource cts, IReadOnlyCollection<
{
_selectorMock
.Setup(selector => selector.SelectAsync(
It.IsAny<IReadOnlyCollection<FetchAndLockRequest.Topic>>(),
It.IsAny<CancellationToken>()
))
.Callback(cts.Cancel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Camunda.Worker.Execution
public class ExternalTaskSelectorTest
{
private readonly Mock<IExternalTaskClient> _clientMock = new Mock<IExternalTaskClient>();
private readonly Mock<ITopicsProvider> _topicsProviderMock = new Mock<ITopicsProvider>();

private readonly IOptions<CamundaWorkerOptions> _options = Options.Create(new CamundaWorkerOptions
{
Expand All @@ -24,8 +25,12 @@ public class ExternalTaskSelectorTest

public ExternalTaskSelectorTest()
{
_topicsProviderMock.Setup(provider => provider.GetTopics())
.Returns(Array.Empty<FetchAndLockRequest.Topic>());

_selector = new ExternalTaskSelector(
_clientMock.Object,
_topicsProviderMock.Object,
_options
);
}
Expand All @@ -38,7 +43,7 @@ public async Task TestSuccessfullySelection()
client.FetchAndLockAsync(It.IsAny<FetchAndLockRequest>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new List<ExternalTask>());

var result = await _selector.SelectAsync(new FetchAndLockRequest.Topic[0]);
var result = await _selector.SelectAsync();

Assert.Empty(result);
_clientMock.VerifyAll();
Expand Down