Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/tutorials/modular-monolith.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ you also get:
* The ability to mix and match [durable vs lighter weight "fire and forget"](/guide/runtime.html#endpoint-types) (`Buffered` in Wolverine parlance) semantics for different handlers
* Granular tracing and logging on the handlers

::: tip
When using `MultipleHandlerBehavior.Separated`, Wolverine automatically fans out messages arriving from external
broker endpoints (RabbitMQ, Azure Service Bus, Kafka, etc.) to all the separated local handler queues. This means you
don't need any special routing configuration -- a single message received from an external queue will be forwarded to
each local handler queue automatically, so every separated handler processes its own copy of the message independently.
:::


## Splitting Your System into Separate Assemblies

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Attributes;
using Wolverine.Tracking;
using Xunit;
using Xunit.Abstractions;

namespace Wolverine.RabbitMQ.Tests;

public class fanout_from_external_to_separated_local_handlers(ITestOutputHelper output)
{
[Fact]
public async Task should_fanout_to_all_local_handlers_from_external_endpoint()
{
var queueName = RabbitTesting.NextQueueName();

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq()
.AutoProvision()
.AutoPurgeOnStartup();

opts.ListenToRabbitQueue(queueName);
opts.PublishMessage<FanoutTestMessage>().ToRabbitQueue(queueName);

opts.Discovery.DisableConventionalDiscovery()
.IncludeType<FanoutHandlerOne>()
.IncludeType<FanoutHandlerTwo>()
.IncludeType<FanoutHandlerThree>();

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
})
.StartAsync();

var message = new FanoutTestMessage(Guid.NewGuid());

var tracked = await host.TrackActivity()
.IncludeExternalTransports()
.Timeout(30.Seconds())
.SendMessageAndWaitAsync(message);

var all = tracked.AllRecordsInOrder().ToArray();
foreach (var record in all)
{
output.WriteLine(record.ToString());
}

// 1 execution for the fanout handler at the RabbitMQ endpoint +
// 3 executions for each local handler
var executed = tracked.Executed.MessagesOf<FanoutTestMessage>().ToArray();
executed.Length.ShouldBe(4);
}
}

public record FanoutTestMessage(Guid Id);

[WolverineIgnore]
public class FanoutHandlerOne
{
public void Handle(FanoutTestMessage message)
{
// handled
}
}

[WolverineIgnore]
public class FanoutHandlerTwo
{
public void Handle(FanoutTestMessage message)
{
// handled
}
}

[WolverineIgnore]
public class FanoutHandlerThree
{
public void Handle(FanoutTestMessage message)
{
// handled
}
}
31 changes: 31 additions & 0 deletions src/Wolverine/Runtime/Handlers/FanoutMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace Wolverine.Runtime.Handlers;

internal class FanoutMessageHandler<T> : MessageHandler<T>
{
private readonly Uri[] _localQueueUris;

public FanoutMessageHandler(Uri[] localQueueUris, HandlerChain chain)
{
_localQueueUris = localQueueUris;
Chain = chain;
}

protected override async Task HandleAsync(T message, MessageContext context, CancellationToken cancellation)
{
var incoming = context.Envelope!;
DeliveryOptions? options = null;
if (incoming.Headers.Count > 0)
{
options = new DeliveryOptions();
foreach (var header in incoming.Headers)
{
options.WithHeader(header.Key, header.Value ?? string.Empty);
}
}

foreach (var uri in _localQueueUris)
{
await context.EndpointFor(uri).SendAsync(message, options);
}
}
}
33 changes: 32 additions & 1 deletion src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,19 @@ public void AddRange(IEnumerable<HandlerCall> calls)
{
if (!chain.HasDefaultNonStickyHandlers())
{
// If all sticky handlers target local queues, create a fanout handler
// to relay the message from the external endpoint to each local queue
var allLocal = chain.ByEndpoint.All(
c => c.Endpoints.All(e => e is LocalQueue));

if (allLocal && endpoint is not LocalQueue)
{
return getOrBuildFanoutHandler(messageType, chain);
}

throw new NoHandlerForEndpointException(messageType, endpoint.Uri);
}

return HandlerFor(messageType);
}

Expand Down Expand Up @@ -270,6 +280,27 @@ public void AddRange(IEnumerable<HandlerCall> calls)
return handler;
}

private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain chain)
{
if (_handlers.TryFind(messageType, out var cached) && cached != null)
{
return cached;
}

var localUris = chain.ByEndpoint
.SelectMany(c => c.Endpoints)
.OfType<LocalQueue>()
.Select(e => e.Uri)
.Distinct()
.ToArray();

var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType);
var handler = (IMessageHandler)Activator.CreateInstance(handlerType, localUris, chain)!;

_handlers = _handlers.AddOrUpdate(messageType, handler);
return handler;
}

internal void Compile(WolverineOptions options, IServiceContainer container)
{
if (_hasCompiled)
Expand Down
Loading