diff --git a/src/Testing/CoreTests/Transports/BackPressureAgentTests.cs b/src/Testing/CoreTests/Transports/BackPressureAgentTests.cs index 3e7fda3f2..5445433f4 100644 --- a/src/Testing/CoreTests/Transports/BackPressureAgentTests.cs +++ b/src/Testing/CoreTests/Transports/BackPressureAgentTests.cs @@ -1,5 +1,6 @@ using NSubstitute; using Wolverine.Configuration; +using Wolverine.Runtime.Agents; using Wolverine.Transports; using Wolverine.Transports.Tcp; using Xunit; @@ -11,14 +12,16 @@ public class BackPressureAgentTests private readonly BackPressureAgent theBackPressureAgent; private readonly Endpoint theEndpoint = new TcpEndpoint(5555); private readonly IListeningAgent theListeningAgent = Substitute.For(); + private readonly IWolverineObserver theObserver; public BackPressureAgentTests() { - theBackPressureAgent = new BackPressureAgent(theListeningAgent, theEndpoint); + theObserver = Substitute.For(); + theBackPressureAgent = new BackPressureAgent(theListeningAgent, theEndpoint, theObserver); } [Fact] - public void do_nothing_when_accepting_and_under_the_threshold() + public async Task do_nothing_when_accepting_and_under_the_threshold() { theListeningAgent.Status .Returns(ListeningStatus.Accepting); @@ -29,70 +32,74 @@ public void do_nothing_when_accepting_and_under_the_threshold() // based on the current queued item count, the current status // of the listening agent, and the configured buffering limits // for the endpoint - theBackPressureAgent.CheckNowAsync(); + await theBackPressureAgent.CheckNowAsync(); // Should decide NOT to do anything in this particular case - theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); - theListeningAgent.DidNotReceive().StartAsync(); + await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); + await theListeningAgent.DidNotReceive().StartAsync(); } [Fact] - public void do_nothing_when_accepting_at_the_threshold() + public async Task do_nothing_when_accepting_at_the_threshold() { theListeningAgent.Status.Returns(ListeningStatus.Accepting); theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Maximum); - theBackPressureAgent.CheckNowAsync(); + await theBackPressureAgent.CheckNowAsync(); - theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); - theListeningAgent.DidNotReceive().StartAsync(); + await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); + await theListeningAgent.DidNotReceive().StartAsync(); } [Fact] - public void stop_receiving_accepting_over_the_threshold() + public async Task stop_receiving_accepting_over_the_threshold() { theListeningAgent.Status.Returns(ListeningStatus.Accepting); theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Maximum + 1); - theBackPressureAgent.CheckNowAsync(); + await theBackPressureAgent.CheckNowAsync(); - theListeningAgent.Received().MarkAsTooBusyAndStopReceivingAsync(); - theListeningAgent.DidNotReceive().StartAsync(); + await theListeningAgent.Received().MarkAsTooBusyAndStopReceivingAsync(); + await theListeningAgent.DidNotReceive().StartAsync(); + + await theObserver.Received().BackPressureTriggered(theEndpoint, theListeningAgent); } [Fact] - public void do_nothing_when_too_busy_and_over_the_restart_limit() + public async Task do_nothing_when_too_busy_and_over_the_restart_limit() { theListeningAgent.Status.Returns(ListeningStatus.TooBusy); theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Restart + 1); - theBackPressureAgent.CheckNowAsync(); + await theBackPressureAgent.CheckNowAsync(); - theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); - theListeningAgent.DidNotReceive().StartAsync(); + await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); + await theListeningAgent.DidNotReceive().StartAsync(); + + await theObserver.DidNotReceive().BackPressureTriggered(theEndpoint, theListeningAgent); } [Fact] - public void restart_when_too_busy_but_reached_the_restart_threshold() + public async Task restart_when_too_busy_but_reached_the_restart_threshold() { theListeningAgent.Status.Returns(ListeningStatus.TooBusy); theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Restart); - theBackPressureAgent.CheckNowAsync(); + await theBackPressureAgent.CheckNowAsync(); - theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); - theListeningAgent.Received().StartAsync(); + await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); + await theListeningAgent.Received().StartAsync(); } [Fact] - public void restart_when_too_busy_but_below_the_restart_threshold() + public async Task restart_when_too_busy_but_below_the_restart_threshold() { theListeningAgent.Status.Returns(ListeningStatus.TooBusy); theListeningAgent.QueueCount.Returns(theEndpoint.BufferingLimits.Restart - 1); - theBackPressureAgent.CheckNowAsync(); + await theBackPressureAgent.CheckNowAsync(); - theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); - theListeningAgent.Received().StartAsync(); + await theListeningAgent.DidNotReceive().MarkAsTooBusyAndStopReceivingAsync(); + await theListeningAgent.Received().StartAsync(); } } \ No newline at end of file diff --git a/src/Wolverine/Persistence/Durability/RecoverIncomingMessagesCommand.cs b/src/Wolverine/Persistence/Durability/RecoverIncomingMessagesCommand.cs index 12c1b7e7d..03635ab6e 100644 --- a/src/Wolverine/Persistence/Durability/RecoverIncomingMessagesCommand.cs +++ b/src/Wolverine/Persistence/Durability/RecoverIncomingMessagesCommand.cs @@ -31,8 +31,8 @@ public async Task ExecuteAsync(IWolverineRuntime runtime, if (pageSize == 0) { _logger.LogInformation( - "Unable to recover inbox messages to destination {Destination}. Listener has status {Status} and queued count {QueuedCount}", - _count.Destination, _circuit.Status, _circuit.QueueCount); + "Unable to recover inbox messages to destination {Destination}. Listener has status {Status}, queued count {QueuedCount}, and BufferingLimits {BufferedLimits}", + _count.Destination, _circuit.Status, _circuit.QueueCount, _circuit.Endpoint.BufferingLimits); return AgentCommands.Empty; } diff --git a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs index 20dbf06fc..813d10de1 100644 --- a/src/Wolverine/Runtime/Agents/IWolverineObserver.cs +++ b/src/Wolverine/Runtime/Agents/IWolverineObserver.cs @@ -1,5 +1,6 @@ using Wolverine.Configuration; using Wolverine.Runtime.Routing; +using Wolverine.Transports; namespace Wolverine.Runtime.Agents; @@ -19,6 +20,9 @@ public interface IWolverineObserver Task RuntimeIsFullyStarted(); void EndpointAdded(Endpoint endpoint); void MessageRouted(Type messageType, IMessageRouter router); + + Task BackPressureTriggered(Endpoint endpoint, IListeningAgent agent); + Task BackPressureLifted(Endpoint endpoint); } internal class PersistenceWolverineObserver : IWolverineObserver @@ -30,6 +34,16 @@ public PersistenceWolverineObserver(IWolverineRuntime runtime) _runtime = runtime; } + public Task BackPressureTriggered(Endpoint endpoint, IListeningAgent agent) + { + return Task.CompletedTask; + } + + public Task BackPressureLifted(Endpoint endpoint) + { + return Task.CompletedTask; + } + public async Task AssumedLeadership() { await _runtime.Storage.Nodes.LogRecordsAsync(NodeRecord.For(_runtime.Options, diff --git a/src/Wolverine/Transports/BackPressureAgent.cs b/src/Wolverine/Transports/BackPressureAgent.cs index edc239fd3..3628577c5 100644 --- a/src/Wolverine/Transports/BackPressureAgent.cs +++ b/src/Wolverine/Transports/BackPressureAgent.cs @@ -1,5 +1,6 @@ using System.Timers; using Wolverine.Configuration; +using Wolverine.Runtime.Agents; using Timer = System.Timers.Timer; namespace Wolverine.Transports; @@ -8,12 +9,14 @@ internal class BackPressureAgent : IDisposable { private readonly IListeningAgent _agent; private readonly Endpoint _endpoint; + private readonly IWolverineObserver _observer; private Timer? _timer; - public BackPressureAgent(IListeningAgent agent, Endpoint endpoint) + public BackPressureAgent(IListeningAgent agent, Endpoint endpoint, IWolverineObserver observer) { _agent = agent; _endpoint = endpoint; + _observer = observer; } public void Dispose() @@ -40,23 +43,23 @@ private void TimerOnElapsed(object? sender, ElapsedEventArgs e) #pragma warning restore CS4014 } - public ValueTask CheckNowAsync() + public async ValueTask CheckNowAsync() { if (_agent.Status is ListeningStatus.Accepting or ListeningStatus.Unknown) { if (_agent.QueueCount > _endpoint.BufferingLimits.Maximum) { - return _agent.MarkAsTooBusyAndStopReceivingAsync(); + await _observer.BackPressureTriggered(_endpoint, _agent); + await _agent.MarkAsTooBusyAndStopReceivingAsync(); } } else if (_agent.Status == ListeningStatus.TooBusy) { if (_agent.QueueCount <= _endpoint.BufferingLimits.Restart) { - return _agent.StartAsync(); + await _agent.StartAsync(); + await _observer.BackPressureLifted(_endpoint); } } - - return ValueTask.CompletedTask; } } \ No newline at end of file diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 556f58db9..cd479a6ce 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -65,7 +65,7 @@ public ListeningAgent(Endpoint endpoint, WolverineRuntime runtime) if (endpoint.ShouldEnforceBackPressure()) { - _backPressureAgent = new BackPressureAgent(this, endpoint); + _backPressureAgent = new BackPressureAgent(this, endpoint, runtime.Observer); _backPressureAgent.Start(); } } @@ -264,7 +264,7 @@ public async ValueTask MarkAsTooBusyAndStopReceivingAsync() Status = ListeningStatus.TooBusy; _runtime.Tracker.Publish(new ListenerState(Uri, Endpoint.EndpointName, Status)); - _logger.LogInformation("Marked listener at {Uri} as too busy and stopped receiving", Uri); + _logger.LogInformation("Marked listener at {Uri} as too busy and stopped receiving. The current local message count is {LocalCount}, and the BufferingLimits are set to {BufferingLimits}. You may want to increase the buffering limits", Uri, QueueCount, Endpoint.BufferingLimits); } finally {