Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/Testing/BackPressureTests/BackPressureTests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.6.0"/>
<PackageReference Include="xunit" Version="2.9.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" PrivateAssets="All" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Persistence\Wolverine.Marten\Wolverine.Marten.csproj" />
<ProjectReference Include="..\..\Transports\RabbitMQ\Wolverine.RabbitMQ\Wolverine.RabbitMQ.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\..\Servers.cs">
<Link>Servers.cs</Link>
</Compile>
</ItemGroup>
</Project>
160 changes: 160 additions & 0 deletions src/Testing/BackPressureTests/Harness.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.Marten;
using Wolverine.RabbitMQ;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit;
using Xunit.Abstractions;


namespace BackPressureTests;

public class MassSender(IHost sender)
{
private readonly CancellationTokenSource _cancellation = new();
private Task _task;

public void Cancel()
{
_cancellation.Cancel();
}

public void StartPublishing(int maximum = 5000, TimeSpan? time = null)
{
if (time != null)
{
_cancellation.CancelAfter(time.Value);
}

var runtime = sender.GetRuntime();

_task = Task.Run(async () =>
{
for (int i = 0; i < maximum; i++)
{
if (_cancellation.IsCancellationRequested) return;
var bus = new MessageBus(runtime);
await bus.PublishAsync(new Message1(Guid.NewGuid()));
await bus.PublishAsync(new Message2(Guid.NewGuid()));
await bus.PublishAsync(new Message3(Guid.NewGuid()));
await bus.PublishAsync(new Message4(Guid.NewGuid()));
}
});
}
}

public class Harness : IAsyncLifetime, IWolverineActivator
{
private IHost _sender;
private XUnitObserver theObserver;
private IHost _receiver;
public static bool GoSlow { get; set; } = true;

public Harness(ITestOutputHelper output)
{
theObserver = new XUnitObserver(output);
}

void IWolverineActivator.Apply(IWolverineRuntime runtime)
{
runtime.Observer = theObserver;
}

public async Task InitializeAsync()
{
_sender = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup();
opts.Discovery.DisableConventionalDiscovery();
opts.PublishAllMessages().ToRabbitQueue("bp");
}).StartAsync();

_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// for callbacks
opts.Services.AddSingleton<IWolverineActivator>(this);

opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bp";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine();

opts.ListenToRabbitQueue("bp").UseDurableInbox();
opts.Policies.AutoApplyTransactions();
}).StartAsync();

_receiver.GetRuntime().Observer = theObserver;
}

public async Task DisposeAsync()
{
await _sender.StopAsync();
await _receiver.StopAsync();
}

[Fact]
public async Task lets_see_if_we_can_trip_off_back_pressure_and_see_it_lifted()
{
Harness.GoSlow = true;
var sender = new MassSender(_sender);
sender.StartPublishing(20000);

await theObserver.Triggered.Task.TimeoutAfterAsync(90000);

Harness.GoSlow = false;

sender.Cancel();

await theObserver.Lifted.Task.TimeoutAfterAsync(90000);
}
}

public record Message1(Guid Id);
public record Message2(Guid Id);
public record Message3(Guid Id);
public record Message4(Guid Id);

public static class MessageHandler
{
public static async Task HandleAsync(Message1 m)
{
if (Harness.GoSlow)
{
await Task.Delay(Random.Shared.Next(100, 500));
}
}

public static async Task HandleAsync(Message2 m)
{
if (Harness.GoSlow)
{
await Task.Delay(Random.Shared.Next(100, 500));
}
}

public static async Task HandleAsync(Message3 m)
{
if (Harness.GoSlow)
{
await Task.Delay(Random.Shared.Next(100, 500));
}
}

public static async Task HandleAsync(Message4 m)
{
if (Harness.GoSlow)
{
await Task.Delay(Random.Shared.Next(100, 500));
}
}
}
84 changes: 84 additions & 0 deletions src/Testing/BackPressureTests/XUnitObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using Wolverine.Configuration;
using Wolverine.Runtime.Agents;
using Wolverine.Runtime.Routing;
using Wolverine.Transports;
using Xunit.Abstractions;

namespace BackPressureTests;

public class XUnitObserver(ITestOutputHelper Output) : IWolverineObserver
{
public TaskCompletionSource Triggered { get; set; } = new();
public TaskCompletionSource Lifted { get; set; } = new();

public void Reset()
{
Triggered = new();
Lifted = new();
}

public Task AssumedLeadership()
{
return Task.CompletedTask;
}

public Task NodeStarted()
{
return Task.CompletedTask;
}

public Task NodeStopped()
{
return Task.CompletedTask;
}

public Task AgentStarted(Uri agentUri)
{
return Task.CompletedTask;
}

public Task AgentStopped(Uri agentUri)
{
return Task.CompletedTask;
}

public Task AssignmentsChanged(AssignmentGrid grid, AgentCommands commands)
{
return Task.CompletedTask;
}

public Task StaleNodes(IReadOnlyList<WolverineNode> staleNodes)
{
return Task.CompletedTask;
}

public Task RuntimeIsFullyStarted()
{
Output.WriteLine("The WolverineRuntime is fully started");
return Task.CompletedTask;
}

public void EndpointAdded(Endpoint endpoint)
{

}

public void MessageRouted(Type messageType, IMessageRouter router)
{
}

public Task BackPressureTriggered(Endpoint endpoint, IListeningAgent agent)
{
Output.WriteLine("Back Pressure was Triggerd!");
Triggered?.TrySetResult();
return Task.CompletedTask;

}

public Task BackPressureLifted(Endpoint endpoint)
{
Output.WriteLine("Back Pressure was Lifted!");
Lifted?.TrySetResult();
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public ValueTask DisposeAsync()
_complete.SafeDispose();
_defer.SafeDispose();
_deadLetter.SafeDispose();
return _receiver.DisposeAsync();
return new ValueTask();
}

public Uri Address => _endpoint.Uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ public async ValueTask StopAsync()

public override async ValueTask DisposeAsync()
{
_receiver.Dispose();
await base.DisposeAsync();

if (_sender.IsValueCreated && _sender.Value is IAsyncDisposable ad)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public ValueTask DeferAsync(Envelope envelope)
return _receiver.PostAsync(envelope);
}

public async ValueTask DisposeAsync()
public ValueTask DisposeAsync()
{
await _receiver.DisposeAsync();
return new ValueTask();
}

public Uri Address => Uri;
Expand Down
56 changes: 28 additions & 28 deletions wolverine.sln
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OtelWebApiWolverineMarten",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.MySql", "src\Persistence\MySql\Wolverine.MySql\Wolverine.MySql.csproj", "{738DB46A-B1B5-4843-A536-A5918918DEB5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MySqlTests", "src\Persistence\MySql\MySqlTests\MySqlTests.csproj", "{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MySqlTests", "src\Persistence\MySql\MySqlTests\MySqlTests.csproj", "{382BD656-89CD-4899-A30F-1589578B639F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MySqlTests", "src\Persistence\MySql\MySqlTests\MySqlTests.csproj", "{162630BD-6192-4888-BD52-D257C75F3E52}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BackPressureTests", "src\Testing\BackPressureTests\BackPressureTests.csproj", "{02F5459A-A96B-42AB-9E4E-CF6B067238FB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -1813,30 +1813,30 @@ Global
{738DB46A-B1B5-4843-A536-A5918918DEB5}.Release|x64.Build.0 = Release|Any CPU
{738DB46A-B1B5-4843-A536-A5918918DEB5}.Release|x86.ActiveCfg = Release|Any CPU
{738DB46A-B1B5-4843-A536-A5918918DEB5}.Release|x86.Build.0 = Release|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|x64.ActiveCfg = Debug|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|x64.Build.0 = Debug|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|x86.ActiveCfg = Debug|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|x86.Build.0 = Debug|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|Any CPU.Build.0 = Release|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|x64.ActiveCfg = Release|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|x64.Build.0 = Release|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|x86.ActiveCfg = Release|Any CPU
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|x86.Build.0 = Release|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Debug|Any CPU.Build.0 = Debug|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Debug|x64.ActiveCfg = Debug|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Debug|x64.Build.0 = Debug|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Debug|x86.ActiveCfg = Debug|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Debug|x86.Build.0 = Debug|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Release|Any CPU.ActiveCfg = Release|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Release|Any CPU.Build.0 = Release|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Release|x64.ActiveCfg = Release|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Release|x64.Build.0 = Release|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Release|x86.ActiveCfg = Release|Any CPU
{162630BD-6192-4888-BD52-D257C75F3E52}.Release|x86.Build.0 = Release|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Debug|x64.ActiveCfg = Debug|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Debug|x64.Build.0 = Debug|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Debug|x86.ActiveCfg = Debug|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Debug|x86.Build.0 = Debug|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Release|Any CPU.Build.0 = Release|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Release|x64.ActiveCfg = Release|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Release|x64.Build.0 = Release|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Release|x86.ActiveCfg = Release|Any CPU
{382BD656-89CD-4899-A30F-1589578B639F}.Release|x86.Build.0 = Release|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|x64.ActiveCfg = Debug|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|x64.Build.0 = Debug|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|x86.ActiveCfg = Debug|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|x86.Build.0 = Debug|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|Any CPU.Build.0 = Release|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|x64.ActiveCfg = Release|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|x64.Build.0 = Release|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|x86.ActiveCfg = Release|Any CPU
{02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1986,8 +1986,8 @@ Global
{0FD02607-BF12-4201-90F9-3FA88BFCDFBC} = {F429686D-BB41-4E1C-A84E-518F8A289AEF}
{AC643465-CD1E-4E9E-9860-DDAAF956A3DC} = {96119B5E-B5F0-400A-9580-B342EBE26212}
{738DB46A-B1B5-4843-A536-A5918918DEB5} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7} = {96119B5E-B5F0-400A-9580-B342EBE26212}
{162630BD-6192-4888-BD52-D257C75F3E52} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{382BD656-89CD-4899-A30F-1589578B639F} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{02F5459A-A96B-42AB-9E4E-CF6B067238FB} = {96119B5E-B5F0-400A-9580-B342EBE26212}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {30422362-0D90-4DBE-8C97-DD2B5B962768}
Expand Down
Loading