From de6aa355d7703e25cb50bbf3734c29962001bf60 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 16 Feb 2026 10:44:03 -0600 Subject: [PATCH 1/2] Automatic local fanout for separated handlers from external endpoints (GH-2198) When using MultipleHandlerBehavior.Separated with external brokers, messages now automatically fan out to all local handler queues instead of throwing NoHandlerForEndpointException. Co-Authored-By: Claude Opus 4.6 --- ...om_external_to_separated_local_handlers.cs | 84 +++++++++++++++++++ .../Runtime/Handlers/FanoutMessageHandler.cs | 31 +++++++ .../Runtime/Handlers/HandlerGraph.cs | 33 +++++++- 3 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/fanout_from_external_to_separated_local_handlers.cs create mode 100644 src/Wolverine/Runtime/Handlers/FanoutMessageHandler.cs diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/fanout_from_external_to_separated_local_handlers.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/fanout_from_external_to_separated_local_handlers.cs new file mode 100644 index 000000000..63a85c4a2 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/fanout_from_external_to_separated_local_handlers.cs @@ -0,0 +1,84 @@ +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Attributes; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.RabbitMQ.Tests; + +public class fanout_from_external_to_separated_local_handlers(ITestOutputHelper output) +{ + [Fact] + public async Task should_fanout_to_all_local_handlers_from_external_endpoint() + { + var queueName = RabbitTesting.NextQueueName(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq() + .AutoProvision() + .AutoPurgeOnStartup(); + + opts.ListenToRabbitQueue(queueName); + opts.PublishMessage().ToRabbitQueue(queueName); + + opts.Discovery.DisableConventionalDiscovery() + .IncludeType() + .IncludeType() + .IncludeType(); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + }) + .StartAsync(); + + var message = new FanoutTestMessage(Guid.NewGuid()); + + var tracked = await host.TrackActivity() + .IncludeExternalTransports() + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(message); + + var all = tracked.AllRecordsInOrder().ToArray(); + foreach (var record in all) + { + output.WriteLine(record.ToString()); + } + + // 1 execution for the fanout handler at the RabbitMQ endpoint + + // 3 executions for each local handler + var executed = tracked.Executed.MessagesOf().ToArray(); + executed.Length.ShouldBe(4); + } +} + +public record FanoutTestMessage(Guid Id); + +[WolverineIgnore] +public class FanoutHandlerOne +{ + public void Handle(FanoutTestMessage message) + { + // handled + } +} + +[WolverineIgnore] +public class FanoutHandlerTwo +{ + public void Handle(FanoutTestMessage message) + { + // handled + } +} + +[WolverineIgnore] +public class FanoutHandlerThree +{ + public void Handle(FanoutTestMessage message) + { + // handled + } +} diff --git a/src/Wolverine/Runtime/Handlers/FanoutMessageHandler.cs b/src/Wolverine/Runtime/Handlers/FanoutMessageHandler.cs new file mode 100644 index 000000000..b680829f9 --- /dev/null +++ b/src/Wolverine/Runtime/Handlers/FanoutMessageHandler.cs @@ -0,0 +1,31 @@ +namespace Wolverine.Runtime.Handlers; + +internal class FanoutMessageHandler : MessageHandler +{ + private readonly Uri[] _localQueueUris; + + public FanoutMessageHandler(Uri[] localQueueUris, HandlerChain chain) + { + _localQueueUris = localQueueUris; + Chain = chain; + } + + protected override async Task HandleAsync(T message, MessageContext context, CancellationToken cancellation) + { + var incoming = context.Envelope!; + DeliveryOptions? options = null; + if (incoming.Headers.Count > 0) + { + options = new DeliveryOptions(); + foreach (var header in incoming.Headers) + { + options.WithHeader(header.Key, header.Value ?? string.Empty); + } + } + + foreach (var uri in _localQueueUris) + { + await context.EndpointFor(uri).SendAsync(message, options); + } + } +} diff --git a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs index 402b51f30..de86db149 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs @@ -183,9 +183,19 @@ public void AddRange(IEnumerable calls) { if (!chain.HasDefaultNonStickyHandlers()) { + // If all sticky handlers target local queues, create a fanout handler + // to relay the message from the external endpoint to each local queue + var allLocal = chain.ByEndpoint.All( + c => c.Endpoints.All(e => e is LocalQueue)); + + if (allLocal && endpoint is not LocalQueue) + { + return getOrBuildFanoutHandler(messageType, chain); + } + throw new NoHandlerForEndpointException(messageType, endpoint.Uri); } - + return HandlerFor(messageType); } @@ -270,6 +280,27 @@ public void AddRange(IEnumerable calls) return handler; } + private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain chain) + { + if (_handlers.TryFind(messageType, out var cached) && cached != null) + { + return cached; + } + + var localUris = chain.ByEndpoint + .SelectMany(c => c.Endpoints) + .OfType() + .Select(e => e.Uri) + .Distinct() + .ToArray(); + + var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType); + var handler = (IMessageHandler)Activator.CreateInstance(handlerType, localUris, chain)!; + + _handlers = _handlers.AddOrUpdate(messageType, handler); + return handler; + } + internal void Compile(WolverineOptions options, IServiceContainer container) { if (_hasCompiled) From 6497c6e4c1fc9b5f9a35ce6f59ed5f08ba1bcdd0 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 16 Feb 2026 10:46:35 -0600 Subject: [PATCH 2/2] Document automatic fanout behavior for separated handlers in modular monolith tutorial Co-Authored-By: Claude Opus 4.6 --- docs/tutorials/modular-monolith.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/tutorials/modular-monolith.md b/docs/tutorials/modular-monolith.md index 93c63b221..940558f4b 100644 --- a/docs/tutorials/modular-monolith.md +++ b/docs/tutorials/modular-monolith.md @@ -128,6 +128,13 @@ you also get: * The ability to mix and match [durable vs lighter weight "fire and forget"](/guide/runtime.html#endpoint-types) (`Buffered` in Wolverine parlance) semantics for different handlers * Granular tracing and logging on the handlers +::: tip +When using `MultipleHandlerBehavior.Separated`, Wolverine automatically fans out messages arriving from external +broker endpoints (RabbitMQ, Azure Service Bus, Kafka, etc.) to all the separated local handler queues. This means you +don't need any special routing configuration -- a single message received from an external queue will be forwarded to +each local handler queue automatically, so every separated handler processes its own copy of the message independently. +::: + ## Splitting Your System into Separate Assemblies