diff --git a/docs/guide/handlers/batching.md b/docs/guide/handlers/batching.md index 14e6db2d1..ac74b8c6b 100644 --- a/docs/guide/handlers/batching.md +++ b/docs/guide/handlers/batching.md @@ -131,9 +131,64 @@ public async Task send_end_to_end_with_batch() Alright, with all that being said, here's a few more facts about the batch messaging support: 1. There is absolutely no need to create a specific message handler for the `Item` message, and in fact, you should - not do so + not do so -- *unless* you are running in `MultipleHandlerBehavior.Separated` mode and deliberately want both a + per-message handler and a batched handler (see [Combining a direct handler with batching](#combining-a-direct-handler-with-batching) below) 2. The message batching is able to group the message batches by tenant id *if* your Wolverine system uses multi-tenancy +## Combining a direct handler with batching + +By default Wolverine assumes the batch handler is the *only* consumer of the element type, so an incoming `Item` +is always routed straight to the batch. If you *also* declare a direct `Handle(Item)` handler alongside +`BatchMessagesOf()`, the direct handler wins and the batch is silently shadowed -- the batched handler never runs. + +The one exception is `MultipleHandlerBehavior.Separated`. Under that mode Wolverine treats the per-message handler and +the batched handler as two independent consumers of `Item`, so **both** run for every `Item`: + +```csharp +opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; +opts.BatchMessagesOf(); + +// Direct, per-message handler +public static class ItemAuditHandler +{ + public static void Handle(Item item) { /* runs once per message */ } +} + +// Batched handler +public static class ItemHandler +{ + public static void Handle(Item[] items) { /* runs once per assembled batch */ } +} +``` + +To make this work, Wolverine moves the batch onto its own dedicated local queue (the element type's queue name with a +`-batch` suffix) so it no longer collides with the direct handler's queue, and fans every `Item` out to both queues. +This applies to messages published in-process *and* to `Item` messages arriving from an external transport listener. + +### Multiple batched handlers + +`MultipleHandlerBehavior.Separated` also lets you register **more than one** batched handler for the same element type -- +for example one handler that publishes an integration event for the batch and another that archives it: + +```csharp +opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; +opts.BatchMessagesOf(); + +public static class ItemPublisher +{ + public static void Handle(Item[] items) { /* publish an integration event */ } +} + +public static class ItemArchiver +{ + public static void Handle(Item[] items) { /* archive the batch */ } +} +``` + +Under `Separated` mode each `Handle(Item[])` handler is given its own sticky queue, so Wolverine fans the assembled +batch out to every one of them and each runs independently. (Under the default `Classic` behavior the multiple +`Handle(Item[])` handlers are instead combined into a single logical handler that invokes each one in turn.) + ## What about durable messaging ("inbox")? The durable inbox behaves just a little bit differently for message batching. Wolverine will technically diff --git a/src/Testing/CoreTests/Bugs/Bug_separated_batch_and_single_handler.cs b/src/Testing/CoreTests/Bugs/Bug_separated_batch_and_single_handler.cs new file mode 100644 index 000000000..32c32bd64 --- /dev/null +++ b/src/Testing/CoreTests/Bugs/Bug_separated_batch_and_single_handler.cs @@ -0,0 +1,140 @@ +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Microsoft.Extensions.Hosting; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Routing; +using Wolverine.Tracking; +using Wolverine.Transports.Local; +using Xunit; + +namespace CoreTests.Bugs; + +// Reproduction of the dom-order-api Funding scenario: +// - MultipleHandlerBehavior.Separated +// - BatchMessagesOf() +// - LoadPublisher: batch handler Handle(LoadEvent[] messages) +// - LoadTelemetry: single handler Handle(LoadEvent e) +// Under Separated mode, a LoadEvent must invoke BOTH the direct handler AND the batch +// handler. Before the fix the direct handler silently shadowed the batch. +public class Bug_separated_batch_and_single_handler +{ + [Fact] + public async Task separated_direct_and_batch_handler_both_run_on_local_publish() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(b => b.TriggerTime = 250.Milliseconds()); + }) + .StartAsync(); + + LoadPublisher.BatchCalls.Clear(); + LoadTelemetry.SingleCalls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new LoadEvent(1))); + + // The direct telemetry handler runs per-message... + LoadTelemetry.SingleCalls.Count.ShouldBe(1); + // ...and the batch publisher handler also runs on the batched array. + LoadPublisher.BatchCalls.Count.ShouldBe(1); + LoadPublisher.BatchCalls[0].Select(x => x.Id).ShouldBe([1]); + } + + [Fact] + public async Task batch_lives_on_its_own_queue_and_message_fans_out_to_both() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + + // The batch was moved off the element type's default queue onto a dedicated one. + var batch = runtime.Options.BatchDefinitions.Single(); + batch.LocalExecutionQueueName!.ShouldEndWith("-batch"); + var batchUri = new Uri("local://" + batch.LocalExecutionQueueName); + + // Routing a LoadEvent now fans out to BOTH the direct handler queue and the batch queue. + var destinations = runtime.RoutingFor(typeof(LoadEvent)) + .ShouldBeOfType>() + .Routes.OfType().Select(x => x.Sender.Destination).ToArray(); + destinations.ShouldContain(batchUri); + destinations.Length.ShouldBe(2); + + // The dedicated batch queue resolves to the batching processor (not the direct handler). + var batchQueue = runtime.Endpoints.EndpointFor(batchUri)!; + var batchHandler = runtime.As().BuildFor(typeof(LoadEvent), batchQueue) + .ShouldBeOfType().Handler; + batchHandler.ShouldBeOfType>(); + } + + [Fact] + public async Task separated_external_arrival_resolves_to_fanout_for_conflicting_element_type() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(); + + // A non-local (external) listener — a LoadEvent arriving here must still + // reach both the direct handler and the batch. + opts.ListenForMessagesFrom("stub://external"); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var external = runtime.Options.Transports.AllEndpoints() + .First(e => e is not LocalQueue && e.Uri.Scheme == "stub"); + + // A LoadEvent arriving from an external listener relays to BOTH local queues + // (direct + batch) via a fanout handler, so both run independently. + var handler = runtime.As().BuildFor(typeof(LoadEvent), external) + .ShouldBeOfType().Handler; + handler.ShouldBeOfType>(); + } +} + +public record LoadEvent(int Id); + +public static class LoadPublisher +{ + public static readonly List BatchCalls = new(); + + public static void Handle(LoadEvent[] messages) + { + BatchCalls.Add(messages); + } +} + +public static class LoadTelemetry +{ + public static readonly List SingleCalls = new(); + + public static void Handle(LoadEvent e) + { + SingleCalls.Add(e); + } +} diff --git a/src/Testing/CoreTests/Bugs/Bug_separated_multiple_batch_handlers.cs b/src/Testing/CoreTests/Bugs/Bug_separated_multiple_batch_handlers.cs new file mode 100644 index 000000000..81ff10743 --- /dev/null +++ b/src/Testing/CoreTests/Bugs/Bug_separated_multiple_batch_handlers.cs @@ -0,0 +1,132 @@ +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Microsoft.Extensions.Hosting; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Runtime.Handlers; +using Wolverine.Tracking; +using Xunit; + +namespace CoreTests.Bugs; + +// Companion to Bug_separated_batch_and_single_handler: under MultipleHandlerBehavior.Separated, +// when the batched array type T[] has MULTIPLE Handle(T[]) handlers, Wolverine splits them onto +// per-handler sticky queues. The BatchingProcessor re-enqueues a single produced T[] onto the +// batch's own execution queue, which is none of those sticky queues. Before the fan-out fix this +// threw NoHandlerForEndpointException and no batch handler ran. +public class Bug_separated_multiple_batch_handlers +{ + private static IHostBuilder ConfigureHost(bool withDirectHandler) + { + return Host.CreateDefaultBuilder().UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(InvoicePublisher)) + .IncludeType(typeof(InvoiceArchiver)); + + if (withDirectHandler) + { + opts.Discovery.IncludeType(typeof(InvoiceTelemetry)); + } + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(b => b.TriggerTime = 250.Milliseconds()); + }); + } + + private static async Task WaitForAsync(Func condition) + { + for (var i = 0; i < 50 && !condition(); i++) + { + await Task.Delay(100.Milliseconds()); + } + } + + [Fact] + public async Task produced_array_fans_out_to_each_sticky_handler_queue() + { + using var host = await ConfigureHost(withDirectHandler: false).StartAsync(); + var runtime = host.GetRuntime(); + + // The batch's execution queue receives the produced InvoiceEvent[]... + var batch = runtime.Options.BatchDefinitions.Single(); + var batchUri = new Uri("local://" + batch.LocalExecutionQueueName); + var batchQueue = runtime.Endpoints.EndpointFor(batchUri)!; + + // ...but the two Handle(InvoiceEvent[]) handlers were separated onto their own sticky queues, + // so the array must fan out from the batch queue to each of them. + var handler = runtime.As().BuildFor(typeof(InvoiceEvent[]), batchQueue) + .ShouldBeOfType().Handler; + handler.ShouldBeOfType>(); + } + + [Fact] + public async Task separated_multiple_batch_handlers_all_run() + { + using var host = await ConfigureHost(withDirectHandler: false).StartAsync(); + + InvoicePublisher.Calls.Clear(); + InvoiceArchiver.Calls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new InvoiceEvent(1))); + + // The array reception condition fires at the batch queue (the fan-out); give the two + // relayed sticky deliveries time to drain before asserting both handlers ran. + await WaitForAsync(() => InvoicePublisher.Calls.Count == 1 && InvoiceArchiver.Calls.Count == 1); + + InvoicePublisher.Calls.Count.ShouldBe(1); + InvoiceArchiver.Calls.Count.ShouldBe(1); + } + + [Fact] + public async Task separated_direct_handler_plus_multiple_batch_handlers_all_run() + { + using var host = await ConfigureHost(withDirectHandler: true).StartAsync(); + var runtime = host.GetRuntime(); + + // The direct Handle(InvoiceEvent) collides with the batch element queue, so the batch was + // moved onto its dedicated -batch queue (Bug_separated_batch_and_single_handler behavior). + runtime.Options.BatchDefinitions.Single().LocalExecutionQueueName!.ShouldEndWith("-batch"); + + InvoicePublisher.Calls.Clear(); + InvoiceArchiver.Calls.Clear(); + InvoiceTelemetry.Calls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new InvoiceEvent(7))); + + await WaitForAsync(() => + InvoiceTelemetry.Calls.Count == 1 && InvoicePublisher.Calls.Count == 1 && + InvoiceArchiver.Calls.Count == 1); + + // The direct per-message handler AND both independent batch handlers all run. + InvoiceTelemetry.Calls.Count.ShouldBe(1); + InvoicePublisher.Calls.Count.ShouldBe(1); + InvoiceArchiver.Calls.Count.ShouldBe(1); + } +} + +public record InvoiceEvent(int Id); + +public static class InvoicePublisher +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent[] messages) => Calls.Add(messages); +} + +public static class InvoiceArchiver +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent[] messages) => Calls.Add(messages); +} + +public static class InvoiceTelemetry +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent e) => Calls.Add(e); +} diff --git a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs index 8703d379f..a472253d5 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs @@ -299,8 +299,6 @@ public void AddRange(IEnumerable calls) // populate _handlers at HandlerGraph.Compile time so the steady-state // hot path is pure cache lookups. The pre-population work is tracked in // #2769 (CloseAndBuildAs elimination). - [UnconditionalSuppressMessage("AOT", "IL3050", - Justification = "FanoutMessageHandler<> closed over runtime messageType; AOT consumers pre-populate _handlers via TypeLoadMode.Static. See AOT guide / #2769.")] private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain chain) { if (_handlers.TryFind(messageType, out var cached) && cached != null) @@ -315,13 +313,26 @@ private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain c .Distinct() .ToArray(); - var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType); - var handler = (IMessageHandler)Activator.CreateInstance(handlerType, localUris, chain)!; + var handler = BuildFanoutHandler(messageType, chain, localUris); _handlers = _handlers.AddOrUpdate(messageType, handler); return handler; } + // Closes FanoutMessageHandler<> over the runtime-resolved messageType and + // Activator.CreateInstance's the result. Used both for sticky-handler fanout + // (getOrBuildFanoutHandler, cached in _handlers) and for relaying an externally- + // arriving element type to its direct + batch local queues (NOT cached — the + // _handlers[messageType] slot belongs to the direct handler). AOT-clean apps in + // TypeLoadMode.Static pre-populate handlers at Compile time. See #2769. + [UnconditionalSuppressMessage("AOT", "IL3050", + Justification = "FanoutMessageHandler<> closed over runtime messageType; AOT consumers pre-populate handlers via TypeLoadMode.Static. See AOT guide / #2769.")] + internal IMessageHandler BuildFanoutHandler(Type messageType, HandlerChain chain, Uri[] localQueueUris) + { + var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType); + return (IMessageHandler)Activator.CreateInstance(handlerType, localQueueUris, chain)!; + } + // Compile is the bootstrap-time handler-graph build. The remaining IL2026 // here comes from registerMessageTypes' MakeGenericType / GetInterfaces walks // (pre-population work tracked in #2769) and from the opt-in diff --git a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs index cc76a3eb4..7db2e6dfd 100644 --- a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs +++ b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs @@ -4,6 +4,7 @@ using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Partitioning; +using Wolverine.Transports.Local; using Wolverine.Util; namespace Wolverine.Runtime; @@ -20,16 +21,76 @@ IExecutor IExecutorFactory.BuildFor(Type messageType) IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) { IMessageHandler? handler = null; - if (Options.MessagePartitioning.TryFindTopology(messageType, out var topology)) + + // If this endpoint is the dedicated batch queue for the element type, always use the + // batching processor — even when the element type also has a direct Handle(T) handler + // (the Separated direct-handler + batch case, where HandlerFor would otherwise return + // the direct handler and shadow the batch). + var batchForEndpoint = Options.BatchDefinitions.FirstOrDefault(x => + x.ElementType == messageType && + string.Equals(endpoint.EndpointName, x.LocalExecutionQueueName, StringComparison.OrdinalIgnoreCase)); + if (batchForEndpoint != null) + { + handler = batchForEndpoint.BuildHandler(this); + } + + // External-arrival parity (Separated): when an element type that has BOTH a direct + // handler and a batch definition arrives on a non-local endpoint, relay it to both + // local queues (direct + batch) so both run, mirroring the local fan-out routing. + if (handler == null && endpoint is not LocalQueue && + Options.MultipleHandlerBehavior == MultipleHandlerBehavior.Separated) + { + var batchDefinition = Options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); + var directChain = Handlers.ChainFor(messageType); + if (batchDefinition?.LocalExecutionQueueName is { } batchQueueName && directChain != null) + { + var local = Options.Transports.GetOrCreate(); + var directQueueUri = local.FindQueueForMessageType(messageType).Uri; + var batchQueueUri = local.QueueFor(batchQueueName).Uri; + handler = Handlers.BuildFanoutHandler(messageType, directChain, [directQueueUri, batchQueueUri]); + } + } + + // Multiple Separated batch handlers: when the produced batch-message type (T[] or a custom + // batch type) has MORE THAN ONE Handle handler, Separated mode splits them onto per-handler + // sticky queues. The BatchingProcessor re-enqueues a single produced batch onto the batch's + // own execution queue — which is none of those sticky queues — so HandlerFor would throw. + // Relay the produced batch from that queue to every sticky handler queue via a fan-out. + if (handler == null && endpoint is LocalQueue && + Options.MultipleHandlerBehavior == MultipleHandlerBehavior.Separated) + { + var thisQueueProducesTheBatch = Options.BatchDefinitions.Any(x => + x.Batcher.BatchMessageType == messageType && + string.Equals(endpoint.EndpointName, x.LocalExecutionQueueName, StringComparison.OrdinalIgnoreCase)); + if (thisQueueProducesTheBatch) + { + var batchChain = Handlers.ChainFor(messageType); + if (batchChain != null && batchChain.ByEndpoint.Any() && !batchChain.HasDefaultNonStickyHandlers()) + { + var stickyLocalUris = batchChain.ByEndpoint + .SelectMany(c => c.Endpoints) + .OfType() + .Select(e => e.Uri) + .Distinct() + .ToArray(); + if (stickyLocalUris.Length != 0) + { + handler = Handlers.BuildFanoutHandler(messageType, batchChain, stickyLocalUris); + } + } + } + } + + if (handler == null && Options.MessagePartitioning.TryFindTopology(messageType, out var topology)) { if (!topology!.Slots.Contains(endpoint)) { handler = new PartitionedMessageReRouter(topology, messageType); } } - + handler ??= (IMessageHandler?)Handlers.HandlerFor(messageType, endpoint); - if (handler == null ) + if (handler == null) { var batching = Options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); if (batching != null) @@ -49,7 +110,7 @@ IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) var accumulator = MetricsAccumulator.FindAccumulator(messageType.ToMessageTypeName(), endpoint); tracker = new HybridMetricsPublishingMessageTracker(this, accumulator.EntryPoint); } - + var executor = handler == null ? new NoHandlerExecutor(messageType, this) : Executor.Build(this, ExecutionPool, Handlers, handler, tracker); diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index cc691925c..411b602c3 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -98,6 +98,15 @@ public async Task StartAsync(CancellationToken cancellationToken) // Build up the message handlers Handlers.Compile(Options, _container); + // Under MultipleHandlerBehavior.Separated, a message type may have BOTH a direct + // Handle(T) handler AND a BatchMessagesOf() batch handler. By default the batch + // local queue is the element type's convention queue — the SAME queue the direct + // handler uses — so the two collide (a local queue resolves a single executor per + // message type) and the batch is silently shadowed. Move the batch onto a dedicated + // queue so both can run independently. Done before the messaging transports start so + // the new queue still receives the durable/local-queue endpoint policies. + reassignBatchQueuesThatCollideWithHandlers(); + // Pre-populate the message-type-name cache so the per-message ToMessageTypeName() // hot path inside Envelope construction never pays the first-occurrence reflection // cost (attribute reads, interface walks, generic-type pretty-printing). @@ -522,6 +531,49 @@ private async Task executeIdleSendingAgentCleanup() } } + // Suffix appended to the element type's convention queue name to host the batch processor + // when the same element type also has a direct handler under Separated mode. + internal const string BatchQueueSuffix = "-batch"; + + private void reassignBatchQueuesThatCollideWithHandlers() + { + if (Options.MultipleHandlerBehavior != MultipleHandlerBehavior.Separated) + { + return; + } + + if (Options.BatchDefinitions.Count == 0) + { + return; + } + + var local = Options.Transports.GetOrCreate(); + + foreach (var batch in Options.BatchDefinitions) + { + // No direct Handle(T) handler for the element type -> the batch owns the element + // type's queue and the existing fallback routing/executor behavior is correct. + if (Handlers.ChainFor(batch.ElementType) == null) + { + continue; + } + + // The user explicitly pointed the batch at a distinct queue already -> respect it. + var directQueue = local.FindQueueForMessageType(batch.ElementType); + if (!string.Equals(batch.LocalExecutionQueueName, directQueue.EndpointName, + StringComparison.OrdinalIgnoreCase)) + { + continue; + } + + // Move the batch onto a dedicated queue distinct from the direct handler's queue. + var batchQueueName = directQueue.EndpointName + BatchQueueSuffix; + var batchQueue = local.QueueFor(batchQueueName); + batchQueue.Mode = directQueue.Mode; + batch.LocalExecutionQueueName = batchQueue.EndpointName; + } + } + private void discoverListenersFromConventions() { // Let any registered routing conventions discover listener endpoints diff --git a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs index 2cba53d1b..5d12b26c8 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs @@ -128,7 +128,25 @@ public IEnumerable FindRoutes(Type messageType, IWolverineRuntime if (options.HandlerGraph.CanHandle(messageType)) { - var endpoints = options.LocalRouting.DiscoverSenders(messageType, runtime).ToArray(); + var endpoints = options.LocalRouting.DiscoverSenders(messageType, runtime).ToList(); + + // Under MultipleHandlerBehavior.Separated, an element type may have BOTH a direct + // handler (covered above) AND a BatchMessagesOf() batch handler living on its own + // dedicated queue. Fan the element type out to the batch queue as well so the batch + // handler runs independently of the direct handler. + if (options.MultipleHandlerBehavior == MultipleHandlerBehavior.Separated) + { + var batchDefinition = options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); + if (batchDefinition?.LocalExecutionQueueName is { } batchQueueName) + { + var batchEndpoint = options.Transports.GetOrCreate().QueueFor(batchQueueName); + if (endpoints.All(e => !Equals(e.Uri, batchEndpoint.Uri))) + { + endpoints.Add(batchEndpoint); + } + } + } + return endpoints.Select(e => MessageRoute.For(messageType, e, runtime)); }