Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Collections.Concurrent;
using JasperFx.CodeGeneration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.Runtime.Handlers;
using Xunit;

namespace CoreTests.Runtime.Handlers;

public class concurrent_saga_chain_compilation
{
[Fact]
public async Task concurrent_first_time_resolution_of_a_saga_handler_does_not_throw()
{
const int iterations = 20;
const int concurrency = 64;

for (var i = 0; i < iterations; i++)
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery().IncludeType<RaceSaga>();
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.CodeGeneration.TypeLoadMode = TypeLoadMode.Dynamic;
}).StartAsync();

var graph = host.Services.GetRequiredService<HandlerGraph>();

var start = new ManualResetEventSlim(false);
var failures = new ConcurrentBag<Exception>();
var threads = new Thread[concurrency];

for (var t = 0; t < concurrency; t++)
{
threads[t] = new Thread(() =>
{
start.Wait();
try
{
graph.HandlerFor(typeof(StartRace));
}
catch (Exception ex)
{
failures.Add(ex);
}
});
threads[t].Start();
}

start.Set();

foreach (var thread in threads) thread.Join();

failures.ShouldBeEmpty();
}
}
}

public record StartRace(Guid Id);

public class RaceSaga : Wolverine.Saga
{
public Guid Id { get; set; }

public static RaceSaga Start(StartRace cmd) => new() { Id = cmd.Id };
}
32 changes: 17 additions & 15 deletions src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,30 +272,32 @@ public void AddRange(IEnumerable<HandlerCall> calls)
{
handler = chain.Handler;
}
else if (!chain.HasDefaultNonStickyHandlers())
{
throw new NoHandlerForEndpointException(messageType);
}
else
{
// SagaChain.DetermineFrames clears Handlers during codegen, so the
// HasDefaultNonStickyHandlers check has to happen inside the lock
lock (_compilingLock)
{
// TODO -- put this logic in JasperFx
var logger = Container?.Services.GetService<ILoggerFactory>()?.CreateLogger<HandlerGraph>() ?? new Logger<HandlerGraph>(new LoggerFactory([new DebugLoggerProvider()]));

logger.LogDebug("Starting to compile chain {MessageType}", chain.MessageType.NameInCode());

if (chain.Handler == null)
if (chain.Handler != null)
{
chain.InitializeSynchronously(Rules, this, Container!.Services);
handler = chain.CreateHandler(Container!);
handler = chain.Handler;
}
else
else if (!chain.HasDefaultNonStickyHandlers())
{
handler = chain.Handler;
throw new NoHandlerForEndpointException(messageType);
}
else
{
// TODO -- put this logic in JasperFx
var logger = Container?.Services.GetService<ILoggerFactory>()?.CreateLogger<HandlerGraph>() ?? new Logger<HandlerGraph>(new LoggerFactory([new DebugLoggerProvider()]));

logger.LogDebug("Finished building the chain {MessageType}", chain.MessageType.NameInCode());
logger.LogDebug("Starting to compile chain {MessageType}", chain.MessageType.NameInCode());

chain.InitializeSynchronously(Rules, this, Container!.Services);
handler = chain.CreateHandler(Container!);

logger.LogDebug("Finished building the chain {MessageType}", chain.MessageType.NameInCode());
}
}
}

Expand Down
Loading