Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions src/Testing/CoreTests/Transports/BackPressureAgentTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using NSubstitute;
using Wolverine.Configuration;
using Wolverine.Runtime.Agents;
using Wolverine.Transports;
using Wolverine.Transports.Tcp;
using Xunit;
Expand All @@ -11,14 +12,16 @@ public class BackPressureAgentTests
private readonly BackPressureAgent theBackPressureAgent;
private readonly Endpoint theEndpoint = new TcpEndpoint(5555);
private readonly IListeningAgent theListeningAgent = Substitute.For<IListeningAgent>();
private readonly IWolverineObserver theObserver;

public BackPressureAgentTests()
{
theBackPressureAgent = new BackPressureAgent(theListeningAgent, theEndpoint);
theObserver = Substitute.For<IWolverineObserver>();
theBackPressureAgent = new BackPressureAgent(theListeningAgent, theEndpoint, theObserver);
}

[Fact]
public void do_nothing_when_accepting_and_under_the_threshold()
public async Task do_nothing_when_accepting_and_under_the_threshold()
{
theListeningAgent.Status
.Returns(ListeningStatus.Accepting);
Expand All @@ -29,70 +32,74 @@ public void do_nothing_when_accepting_and_under_the_threshold()
// based on the current queued item count, the current status
// of the listening agent, and the configured buffering limits
// for the endpoint
theBackPressureAgent.CheckNowAsync();
await theBackPressureAgent.CheckNowAsync();

// Should decide NOT to do anything in this particular case
theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
theListeningAgent.DidNotReceive().StartAsync();
await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
await theListeningAgent.DidNotReceive().StartAsync();
}

[Fact]
public void do_nothing_when_accepting_at_the_threshold()
public async Task do_nothing_when_accepting_at_the_threshold()
{
theListeningAgent.Status.Returns(ListeningStatus.Accepting);
theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Maximum);

theBackPressureAgent.CheckNowAsync();
await theBackPressureAgent.CheckNowAsync();

theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
theListeningAgent.DidNotReceive().StartAsync();
await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
await theListeningAgent.DidNotReceive().StartAsync();
}

[Fact]
public void stop_receiving_accepting_over_the_threshold()
public async Task stop_receiving_accepting_over_the_threshold()
{
theListeningAgent.Status.Returns(ListeningStatus.Accepting);
theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Maximum + 1);

theBackPressureAgent.CheckNowAsync();
await theBackPressureAgent.CheckNowAsync();

theListeningAgent.Received().MarkAsTooBusyAndStopReceivingAsync();
theListeningAgent.DidNotReceive().StartAsync();
await theListeningAgent.Received().MarkAsTooBusyAndStopReceivingAsync();
await theListeningAgent.DidNotReceive().StartAsync();

await theObserver.Received().BackPressureTriggered(theEndpoint, theListeningAgent);
}

[Fact]
public void do_nothing_when_too_busy_and_over_the_restart_limit()
public async Task do_nothing_when_too_busy_and_over_the_restart_limit()
{
theListeningAgent.Status.Returns(ListeningStatus.TooBusy);
theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Restart + 1);

theBackPressureAgent.CheckNowAsync();
await theBackPressureAgent.CheckNowAsync();

theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
theListeningAgent.DidNotReceive().StartAsync();
await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
await theListeningAgent.DidNotReceive().StartAsync();

await theObserver.DidNotReceive().BackPressureTriggered(theEndpoint, theListeningAgent);
}

[Fact]
public void restart_when_too_busy_but_reached_the_restart_threshold()
public async Task restart_when_too_busy_but_reached_the_restart_threshold()
{
theListeningAgent.Status.Returns(ListeningStatus.TooBusy);
theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Restart);

theBackPressureAgent.CheckNowAsync();
await theBackPressureAgent.CheckNowAsync();

theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
theListeningAgent.Received().StartAsync();
await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
await theListeningAgent.Received().StartAsync();
}

[Fact]
public void restart_when_too_busy_but_below_the_restart_threshold()
public async Task restart_when_too_busy_but_below_the_restart_threshold()
{
theListeningAgent.Status.Returns(ListeningStatus.TooBusy);
theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Restart - 1);

theBackPressureAgent.CheckNowAsync();
await theBackPressureAgent.CheckNowAsync();

theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
theListeningAgent.Received().StartAsync();
await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync();
await theListeningAgent.Received().StartAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public async Task<AgentCommands> ExecuteAsync(IWolverineRuntime runtime,
if (pageSize == 0)
{
_logger.LogInformation(
"Unable to recover inbox messages to destination {Destination}. Listener has status {Status} and queued count {QueuedCount}",
_count.Destination, _circuit.Status, _circuit.QueueCount);
"Unable to recover inbox messages to destination {Destination}. Listener has status {Status}, queued count {QueuedCount}, and BufferingLimits {BufferedLimits}",
_count.Destination, _circuit.Status, _circuit.QueueCount, _circuit.Endpoint.BufferingLimits);
return AgentCommands.Empty;
}

Expand Down
14 changes: 14 additions & 0 deletions src/Wolverine/Runtime/Agents/IWolverineObserver.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Wolverine.Configuration;
using Wolverine.Runtime.Routing;
using Wolverine.Transports;

namespace Wolverine.Runtime.Agents;

Expand All @@ -19,6 +20,9 @@ public interface IWolverineObserver
Task RuntimeIsFullyStarted();
void EndpointAdded(Endpoint endpoint);
void MessageRouted(Type messageType, IMessageRouter router);

Task BackPressureTriggered(Endpoint endpoint, IListeningAgent agent);
Task BackPressureLifted(Endpoint endpoint);
}

internal class PersistenceWolverineObserver : IWolverineObserver
Expand All @@ -30,6 +34,16 @@ public PersistenceWolverineObserver(IWolverineRuntime runtime)
_runtime = runtime;
}

public Task BackPressureTriggered(Endpoint endpoint, IListeningAgent agent)
{
return Task.CompletedTask;
}

public Task BackPressureLifted(Endpoint endpoint)
{
return Task.CompletedTask;
}

public async Task AssumedLeadership()
{
await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options,
Expand Down
15 changes: 9 additions & 6 deletions src/Wolverine/Transports/BackPressureAgent.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Timers;
using Wolverine.Configuration;
using Wolverine.Runtime.Agents;
using Timer = System.Timers.Timer;

namespace Wolverine.Transports;
Expand All @@ -8,12 +9,14 @@ internal class BackPressureAgent : IDisposable
{
private readonly IListeningAgent _agent;
private readonly Endpoint _endpoint;
private readonly IWolverineObserver _observer;
private Timer? _timer;

public BackPressureAgent(IListeningAgent agent, Endpoint endpoint)
public BackPressureAgent(IListeningAgent agent, Endpoint endpoint, IWolverineObserver observer)
{
_agent = agent;
_endpoint = endpoint;
_observer = observer;
}

public void Dispose()
Expand All @@ -40,23 +43,23 @@ private void TimerOnElapsed(object? sender, ElapsedEventArgs e)
#pragma warning restore CS4014
}

public ValueTask CheckNowAsync()
public async ValueTask CheckNowAsync()
{
if (_agent.Status is ListeningStatus.Accepting or ListeningStatus.Unknown)
{
if (_agent.QueueCount > _endpoint.BufferingLimits.Maximum)
{
return _agent.MarkAsTooBusyAndStopReceivingAsync();
await _observer.BackPressureTriggered(_endpoint, _agent);
await _agent.MarkAsTooBusyAndStopReceivingAsync();
}
}
else if (_agent.Status == ListeningStatus.TooBusy)
{
if (_agent.QueueCount <= _endpoint.BufferingLimits.Restart)
{
return _agent.StartAsync();
await _agent.StartAsync();
await _observer.BackPressureLifted(_endpoint);
}
}

return ValueTask.CompletedTask;
}
}
4 changes: 2 additions & 2 deletions src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public ListeningAgent(Endpoint endpoint, WolverineRuntime runtime)

if (endpoint.ShouldEnforceBackPressure())
{
_backPressureAgent = new BackPressureAgent(this, endpoint);
_backPressureAgent = new BackPressureAgent(this, endpoint, runtime.Observer);
_backPressureAgent.Start();
}
}
Expand Down Expand Up @@ -264,7 +264,7 @@ public async ValueTask MarkAsTooBusyAndStopReceivingAsync()
Status = ListeningStatus.TooBusy;
_runtime.Tracker.Publish(new ListenerState(Uri, Endpoint.EndpointName, Status));

_logger.LogInformation("Marked listener at {Uri} as too busy and stopped receiving", Uri);
_logger.LogInformation("Marked listener at {Uri} as too busy and stopped receiving. The current local message count is {LocalCount}, and the BufferingLimits are set to {BufferingLimits}. You may want to increase the buffering limits", Uri, QueueCount, Endpoint.BufferingLimits);
}
finally
{
Expand Down
Loading