From 53f016b05c6c2d8f30016f822cca12aa629a486f Mon Sep 17 00:00:00 2001 From: Geoffrey MARC Date: Thu, 11 Jun 2026 16:42:51 +0200 Subject: [PATCH 1/2] fix(batching): run both a direct and a batch handler under Separated mode When a message type has both a direct Handle(T) handler and a BatchMessagesOf() configuration, the direct handler silently shadowed the batch: routing (LocalRouting.FindRoutes) and executor resolution (Executor.Build / ExecutorFactory.BuildFor) only consulted the batch definitions when there was no direct handler for the element type, so the batch handler never ran. Under MultipleHandlerBehavior.Separated, treat the per-message handler and the batch handler as independent consumers of the element type: move the batch onto a dedicated '-batch' local queue so it no longer collides with the direct handler's queue, fan the element type out to both queues for in-process publishes, and relay to both local queues for messages arriving from an external transport listener. --- docs/guide/handlers/batching.md | 33 ++++- .../Bug_separated_batch_and_single_handler.cs | 140 ++++++++++++++++++ .../Runtime/Handlers/HandlerGraph.cs | 19 ++- .../Runtime/Wolverine.ExecutorFactory.cs | 35 ++++- .../Runtime/WolverineRuntime.HostService.cs | 52 +++++++ .../Runtime/WolverineRuntime.Routing.cs | 20 ++- 6 files changed, 291 insertions(+), 8 deletions(-) create mode 100644 src/Testing/CoreTests/Bugs/Bug_separated_batch_and_single_handler.cs diff --git a/docs/guide/handlers/batching.md b/docs/guide/handlers/batching.md index 14e6db2d1..4a66f4aec 100644 --- a/docs/guide/handlers/batching.md +++ b/docs/guide/handlers/batching.md @@ -131,9 +131,40 @@ public async Task send_end_to_end_with_batch() Alright, with all that being said, here's a few more facts about the batch messaging support: 1. There is absolutely no need to create a specific message handler for the `Item` message, and in fact, you should - not do so + not do so -- *unless* you are running in `MultipleHandlerBehavior.Separated` mode and deliberately want both a + per-message handler and a batched handler (see [Combining a direct handler with batching](#combining-a-direct-handler-with-batching) below) 2. The message batching is able to group the message batches by tenant id *if* your Wolverine system uses multi-tenancy +## Combining a direct handler with batching + +By default Wolverine assumes the batch handler is the *only* consumer of the element type, so an incoming `Item` +is always routed straight to the batch. If you *also* declare a direct `Handle(Item)` handler alongside +`BatchMessagesOf()`, the direct handler wins and the batch is silently shadowed -- the batched handler never runs. + +The one exception is `MultipleHandlerBehavior.Separated`. Under that mode Wolverine treats the per-message handler and +the batched handler as two independent consumers of `Item`, so **both** run for every `Item`: + +```csharp +opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; +opts.BatchMessagesOf(); + +// Direct, per-message handler +public static class ItemAuditHandler +{ + public static void Handle(Item item) { /* runs once per message */ } +} + +// Batched handler +public static class ItemHandler +{ + public static void Handle(Item[] items) { /* runs once per assembled batch */ } +} +``` + +To make this work, Wolverine moves the batch onto its own dedicated local queue (the element type's queue name with a +`-batch` suffix) so it no longer collides with the direct handler's queue, and fans every `Item` out to both queues. +This applies to messages published in-process *and* to `Item` messages arriving from an external transport listener. + ## What about durable messaging ("inbox")? The durable inbox behaves just a little bit differently for message batching. Wolverine will technically diff --git a/src/Testing/CoreTests/Bugs/Bug_separated_batch_and_single_handler.cs b/src/Testing/CoreTests/Bugs/Bug_separated_batch_and_single_handler.cs new file mode 100644 index 000000000..32c32bd64 --- /dev/null +++ b/src/Testing/CoreTests/Bugs/Bug_separated_batch_and_single_handler.cs @@ -0,0 +1,140 @@ +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Microsoft.Extensions.Hosting; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Routing; +using Wolverine.Tracking; +using Wolverine.Transports.Local; +using Xunit; + +namespace CoreTests.Bugs; + +// Reproduction of the dom-order-api Funding scenario: +// - MultipleHandlerBehavior.Separated +// - BatchMessagesOf() +// - LoadPublisher: batch handler Handle(LoadEvent[] messages) +// - LoadTelemetry: single handler Handle(LoadEvent e) +// Under Separated mode, a LoadEvent must invoke BOTH the direct handler AND the batch +// handler. Before the fix the direct handler silently shadowed the batch. +public class Bug_separated_batch_and_single_handler +{ + [Fact] + public async Task separated_direct_and_batch_handler_both_run_on_local_publish() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(b => b.TriggerTime = 250.Milliseconds()); + }) + .StartAsync(); + + LoadPublisher.BatchCalls.Clear(); + LoadTelemetry.SingleCalls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new LoadEvent(1))); + + // The direct telemetry handler runs per-message... + LoadTelemetry.SingleCalls.Count.ShouldBe(1); + // ...and the batch publisher handler also runs on the batched array. + LoadPublisher.BatchCalls.Count.ShouldBe(1); + LoadPublisher.BatchCalls[0].Select(x => x.Id).ShouldBe([1]); + } + + [Fact] + public async Task batch_lives_on_its_own_queue_and_message_fans_out_to_both() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + + // The batch was moved off the element type's default queue onto a dedicated one. + var batch = runtime.Options.BatchDefinitions.Single(); + batch.LocalExecutionQueueName!.ShouldEndWith("-batch"); + var batchUri = new Uri("local://" + batch.LocalExecutionQueueName); + + // Routing a LoadEvent now fans out to BOTH the direct handler queue and the batch queue. + var destinations = runtime.RoutingFor(typeof(LoadEvent)) + .ShouldBeOfType>() + .Routes.OfType().Select(x => x.Sender.Destination).ToArray(); + destinations.ShouldContain(batchUri); + destinations.Length.ShouldBe(2); + + // The dedicated batch queue resolves to the batching processor (not the direct handler). + var batchQueue = runtime.Endpoints.EndpointFor(batchUri)!; + var batchHandler = runtime.As().BuildFor(typeof(LoadEvent), batchQueue) + .ShouldBeOfType().Handler; + batchHandler.ShouldBeOfType>(); + } + + [Fact] + public async Task separated_external_arrival_resolves_to_fanout_for_conflicting_element_type() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(); + + // A non-local (external) listener — a LoadEvent arriving here must still + // reach both the direct handler and the batch. + opts.ListenForMessagesFrom("stub://external"); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var external = runtime.Options.Transports.AllEndpoints() + .First(e => e is not LocalQueue && e.Uri.Scheme == "stub"); + + // A LoadEvent arriving from an external listener relays to BOTH local queues + // (direct + batch) via a fanout handler, so both run independently. + var handler = runtime.As().BuildFor(typeof(LoadEvent), external) + .ShouldBeOfType().Handler; + handler.ShouldBeOfType>(); + } +} + +public record LoadEvent(int Id); + +public static class LoadPublisher +{ + public static readonly List BatchCalls = new(); + + public static void Handle(LoadEvent[] messages) + { + BatchCalls.Add(messages); + } +} + +public static class LoadTelemetry +{ + public static readonly List SingleCalls = new(); + + public static void Handle(LoadEvent e) + { + SingleCalls.Add(e); + } +} diff --git a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs index 8703d379f..a472253d5 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs @@ -299,8 +299,6 @@ public void AddRange(IEnumerable calls) // populate _handlers at HandlerGraph.Compile time so the steady-state // hot path is pure cache lookups. The pre-population work is tracked in // #2769 (CloseAndBuildAs elimination). - [UnconditionalSuppressMessage("AOT", "IL3050", - Justification = "FanoutMessageHandler<> closed over runtime messageType; AOT consumers pre-populate _handlers via TypeLoadMode.Static. See AOT guide / #2769.")] private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain chain) { if (_handlers.TryFind(messageType, out var cached) && cached != null) @@ -315,13 +313,26 @@ private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain c .Distinct() .ToArray(); - var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType); - var handler = (IMessageHandler)Activator.CreateInstance(handlerType, localUris, chain)!; + var handler = BuildFanoutHandler(messageType, chain, localUris); _handlers = _handlers.AddOrUpdate(messageType, handler); return handler; } + // Closes FanoutMessageHandler<> over the runtime-resolved messageType and + // Activator.CreateInstance's the result. Used both for sticky-handler fanout + // (getOrBuildFanoutHandler, cached in _handlers) and for relaying an externally- + // arriving element type to its direct + batch local queues (NOT cached — the + // _handlers[messageType] slot belongs to the direct handler). AOT-clean apps in + // TypeLoadMode.Static pre-populate handlers at Compile time. See #2769. + [UnconditionalSuppressMessage("AOT", "IL3050", + Justification = "FanoutMessageHandler<> closed over runtime messageType; AOT consumers pre-populate handlers via TypeLoadMode.Static. See AOT guide / #2769.")] + internal IMessageHandler BuildFanoutHandler(Type messageType, HandlerChain chain, Uri[] localQueueUris) + { + var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType); + return (IMessageHandler)Activator.CreateInstance(handlerType, localQueueUris, chain)!; + } + // Compile is the bootstrap-time handler-graph build. The remaining IL2026 // here comes from registerMessageTypes' MakeGenericType / GetInterfaces walks // (pre-population work tracked in #2769) and from the opt-in diff --git a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs index cc76a3eb4..9e558585d 100644 --- a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs +++ b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs @@ -4,6 +4,7 @@ using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Partitioning; +using Wolverine.Transports.Local; using Wolverine.Util; namespace Wolverine.Runtime; @@ -20,14 +21,44 @@ IExecutor IExecutorFactory.BuildFor(Type messageType) IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) { IMessageHandler? handler = null; - if (Options.MessagePartitioning.TryFindTopology(messageType, out var topology)) + + // If this endpoint is the dedicated batch queue for the element type, always use the + // batching processor — even when the element type also has a direct Handle(T) handler + // (the Separated direct-handler + batch case, where HandlerFor would otherwise return + // the direct handler and shadow the batch). + var batchForEndpoint = Options.BatchDefinitions.FirstOrDefault(x => + x.ElementType == messageType && + string.Equals(endpoint.EndpointName, x.LocalExecutionQueueName, StringComparison.OrdinalIgnoreCase)); + if (batchForEndpoint != null) + { + handler = batchForEndpoint.BuildHandler(this); + } + + // External-arrival parity (Separated): when an element type that has BOTH a direct + // handler and a batch definition arrives on a non-local endpoint, relay it to both + // local queues (direct + batch) so both run, mirroring the local fan-out routing. + if (handler == null && endpoint is not LocalQueue && + Options.MultipleHandlerBehavior == MultipleHandlerBehavior.Separated) + { + var batchDefinition = Options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); + var directChain = Handlers.ChainFor(messageType); + if (batchDefinition?.LocalExecutionQueueName is { } batchQueueName && directChain != null) + { + var local = Options.Transports.GetOrCreate(); + var directQueueUri = local.FindQueueForMessageType(messageType).Uri; + var batchQueueUri = local.QueueFor(batchQueueName).Uri; + handler = Handlers.BuildFanoutHandler(messageType, directChain, [directQueueUri, batchQueueUri]); + } + } + + if (handler == null && Options.MessagePartitioning.TryFindTopology(messageType, out var topology)) { if (!topology!.Slots.Contains(endpoint)) { handler = new PartitionedMessageReRouter(topology, messageType); } } - + handler ??= (IMessageHandler?)Handlers.HandlerFor(messageType, endpoint); if (handler == null ) { diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index cc691925c..411b602c3 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -98,6 +98,15 @@ public async Task StartAsync(CancellationToken cancellationToken) // Build up the message handlers Handlers.Compile(Options, _container); + // Under MultipleHandlerBehavior.Separated, a message type may have BOTH a direct + // Handle(T) handler AND a BatchMessagesOf() batch handler. By default the batch + // local queue is the element type's convention queue — the SAME queue the direct + // handler uses — so the two collide (a local queue resolves a single executor per + // message type) and the batch is silently shadowed. Move the batch onto a dedicated + // queue so both can run independently. Done before the messaging transports start so + // the new queue still receives the durable/local-queue endpoint policies. + reassignBatchQueuesThatCollideWithHandlers(); + // Pre-populate the message-type-name cache so the per-message ToMessageTypeName() // hot path inside Envelope construction never pays the first-occurrence reflection // cost (attribute reads, interface walks, generic-type pretty-printing). @@ -522,6 +531,49 @@ private async Task executeIdleSendingAgentCleanup() } } + // Suffix appended to the element type's convention queue name to host the batch processor + // when the same element type also has a direct handler under Separated mode. + internal const string BatchQueueSuffix = "-batch"; + + private void reassignBatchQueuesThatCollideWithHandlers() + { + if (Options.MultipleHandlerBehavior != MultipleHandlerBehavior.Separated) + { + return; + } + + if (Options.BatchDefinitions.Count == 0) + { + return; + } + + var local = Options.Transports.GetOrCreate(); + + foreach (var batch in Options.BatchDefinitions) + { + // No direct Handle(T) handler for the element type -> the batch owns the element + // type's queue and the existing fallback routing/executor behavior is correct. + if (Handlers.ChainFor(batch.ElementType) == null) + { + continue; + } + + // The user explicitly pointed the batch at a distinct queue already -> respect it. + var directQueue = local.FindQueueForMessageType(batch.ElementType); + if (!string.Equals(batch.LocalExecutionQueueName, directQueue.EndpointName, + StringComparison.OrdinalIgnoreCase)) + { + continue; + } + + // Move the batch onto a dedicated queue distinct from the direct handler's queue. + var batchQueueName = directQueue.EndpointName + BatchQueueSuffix; + var batchQueue = local.QueueFor(batchQueueName); + batchQueue.Mode = directQueue.Mode; + batch.LocalExecutionQueueName = batchQueue.EndpointName; + } + } + private void discoverListenersFromConventions() { // Let any registered routing conventions discover listener endpoints diff --git a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs index 2cba53d1b..5d12b26c8 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs @@ -128,7 +128,25 @@ public IEnumerable FindRoutes(Type messageType, IWolverineRuntime if (options.HandlerGraph.CanHandle(messageType)) { - var endpoints = options.LocalRouting.DiscoverSenders(messageType, runtime).ToArray(); + var endpoints = options.LocalRouting.DiscoverSenders(messageType, runtime).ToList(); + + // Under MultipleHandlerBehavior.Separated, an element type may have BOTH a direct + // handler (covered above) AND a BatchMessagesOf() batch handler living on its own + // dedicated queue. Fan the element type out to the batch queue as well so the batch + // handler runs independently of the direct handler. + if (options.MultipleHandlerBehavior == MultipleHandlerBehavior.Separated) + { + var batchDefinition = options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); + if (batchDefinition?.LocalExecutionQueueName is { } batchQueueName) + { + var batchEndpoint = options.Transports.GetOrCreate().QueueFor(batchQueueName); + if (endpoints.All(e => !Equals(e.Uri, batchEndpoint.Uri))) + { + endpoints.Add(batchEndpoint); + } + } + } + return endpoints.Select(e => MessageRoute.For(messageType, e, runtime)); } From 309adefb259d6fef46336950c0188765d64e4865 Mon Sep 17 00:00:00 2001 From: Geoffrey MARC Date: Thu, 11 Jun 2026 17:13:16 +0200 Subject: [PATCH 2/2] fix(batching): fan out to multiple Handle(T[]) handlers under Separated mode Under MultipleHandlerBehavior.Separated, when the batched array type has more than one Handle(T[]) handler, Wolverine splits them onto per-handler sticky queues. The BatchingProcessor re-enqueues a single produced batch onto the batch's own execution queue, which is none of those sticky queues, so executor resolution threw NoHandlerForEndpointException and no batch handler ran. Resolve the batch's execution queue to a fan-out handler that relays the produced batch (T[] or a custom batch message type) to every sticky Handle(T[]) queue, so each batch handler runs independently. Works both with and without a direct Handle(T) handler (the dedicated -batch queue case). Classic mode is unchanged: multiple Handle(T[]) handlers are still combined into one chain. --- docs/guide/handlers/batching.md | 24 ++++ .../Bug_separated_multiple_batch_handlers.cs | 132 ++++++++++++++++++ .../Runtime/Wolverine.ExecutorFactory.cs | 34 ++++- 3 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 src/Testing/CoreTests/Bugs/Bug_separated_multiple_batch_handlers.cs diff --git a/docs/guide/handlers/batching.md b/docs/guide/handlers/batching.md index 4a66f4aec..ac74b8c6b 100644 --- a/docs/guide/handlers/batching.md +++ b/docs/guide/handlers/batching.md @@ -165,6 +165,30 @@ To make this work, Wolverine moves the batch onto its own dedicated local queue `-batch` suffix) so it no longer collides with the direct handler's queue, and fans every `Item` out to both queues. This applies to messages published in-process *and* to `Item` messages arriving from an external transport listener. +### Multiple batched handlers + +`MultipleHandlerBehavior.Separated` also lets you register **more than one** batched handler for the same element type -- +for example one handler that publishes an integration event for the batch and another that archives it: + +```csharp +opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; +opts.BatchMessagesOf(); + +public static class ItemPublisher +{ + public static void Handle(Item[] items) { /* publish an integration event */ } +} + +public static class ItemArchiver +{ + public static void Handle(Item[] items) { /* archive the batch */ } +} +``` + +Under `Separated` mode each `Handle(Item[])` handler is given its own sticky queue, so Wolverine fans the assembled +batch out to every one of them and each runs independently. (Under the default `Classic` behavior the multiple +`Handle(Item[])` handlers are instead combined into a single logical handler that invokes each one in turn.) + ## What about durable messaging ("inbox")? The durable inbox behaves just a little bit differently for message batching. Wolverine will technically diff --git a/src/Testing/CoreTests/Bugs/Bug_separated_multiple_batch_handlers.cs b/src/Testing/CoreTests/Bugs/Bug_separated_multiple_batch_handlers.cs new file mode 100644 index 000000000..81ff10743 --- /dev/null +++ b/src/Testing/CoreTests/Bugs/Bug_separated_multiple_batch_handlers.cs @@ -0,0 +1,132 @@ +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Microsoft.Extensions.Hosting; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Runtime.Handlers; +using Wolverine.Tracking; +using Xunit; + +namespace CoreTests.Bugs; + +// Companion to Bug_separated_batch_and_single_handler: under MultipleHandlerBehavior.Separated, +// when the batched array type T[] has MULTIPLE Handle(T[]) handlers, Wolverine splits them onto +// per-handler sticky queues. The BatchingProcessor re-enqueues a single produced T[] onto the +// batch's own execution queue, which is none of those sticky queues. Before the fan-out fix this +// threw NoHandlerForEndpointException and no batch handler ran. +public class Bug_separated_multiple_batch_handlers +{ + private static IHostBuilder ConfigureHost(bool withDirectHandler) + { + return Host.CreateDefaultBuilder().UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(InvoicePublisher)) + .IncludeType(typeof(InvoiceArchiver)); + + if (withDirectHandler) + { + opts.Discovery.IncludeType(typeof(InvoiceTelemetry)); + } + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(b => b.TriggerTime = 250.Milliseconds()); + }); + } + + private static async Task WaitForAsync(Func condition) + { + for (var i = 0; i < 50 && !condition(); i++) + { + await Task.Delay(100.Milliseconds()); + } + } + + [Fact] + public async Task produced_array_fans_out_to_each_sticky_handler_queue() + { + using var host = await ConfigureHost(withDirectHandler: false).StartAsync(); + var runtime = host.GetRuntime(); + + // The batch's execution queue receives the produced InvoiceEvent[]... + var batch = runtime.Options.BatchDefinitions.Single(); + var batchUri = new Uri("local://" + batch.LocalExecutionQueueName); + var batchQueue = runtime.Endpoints.EndpointFor(batchUri)!; + + // ...but the two Handle(InvoiceEvent[]) handlers were separated onto their own sticky queues, + // so the array must fan out from the batch queue to each of them. + var handler = runtime.As().BuildFor(typeof(InvoiceEvent[]), batchQueue) + .ShouldBeOfType().Handler; + handler.ShouldBeOfType>(); + } + + [Fact] + public async Task separated_multiple_batch_handlers_all_run() + { + using var host = await ConfigureHost(withDirectHandler: false).StartAsync(); + + InvoicePublisher.Calls.Clear(); + InvoiceArchiver.Calls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new InvoiceEvent(1))); + + // The array reception condition fires at the batch queue (the fan-out); give the two + // relayed sticky deliveries time to drain before asserting both handlers ran. + await WaitForAsync(() => InvoicePublisher.Calls.Count == 1 && InvoiceArchiver.Calls.Count == 1); + + InvoicePublisher.Calls.Count.ShouldBe(1); + InvoiceArchiver.Calls.Count.ShouldBe(1); + } + + [Fact] + public async Task separated_direct_handler_plus_multiple_batch_handlers_all_run() + { + using var host = await ConfigureHost(withDirectHandler: true).StartAsync(); + var runtime = host.GetRuntime(); + + // The direct Handle(InvoiceEvent) collides with the batch element queue, so the batch was + // moved onto its dedicated -batch queue (Bug_separated_batch_and_single_handler behavior). + runtime.Options.BatchDefinitions.Single().LocalExecutionQueueName!.ShouldEndWith("-batch"); + + InvoicePublisher.Calls.Clear(); + InvoiceArchiver.Calls.Clear(); + InvoiceTelemetry.Calls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new InvoiceEvent(7))); + + await WaitForAsync(() => + InvoiceTelemetry.Calls.Count == 1 && InvoicePublisher.Calls.Count == 1 && + InvoiceArchiver.Calls.Count == 1); + + // The direct per-message handler AND both independent batch handlers all run. + InvoiceTelemetry.Calls.Count.ShouldBe(1); + InvoicePublisher.Calls.Count.ShouldBe(1); + InvoiceArchiver.Calls.Count.ShouldBe(1); + } +} + +public record InvoiceEvent(int Id); + +public static class InvoicePublisher +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent[] messages) => Calls.Add(messages); +} + +public static class InvoiceArchiver +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent[] messages) => Calls.Add(messages); +} + +public static class InvoiceTelemetry +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent e) => Calls.Add(e); +} diff --git a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs index 9e558585d..7db2e6dfd 100644 --- a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs +++ b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs @@ -51,6 +51,36 @@ IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) } } + // Multiple Separated batch handlers: when the produced batch-message type (T[] or a custom + // batch type) has MORE THAN ONE Handle handler, Separated mode splits them onto per-handler + // sticky queues. The BatchingProcessor re-enqueues a single produced batch onto the batch's + // own execution queue — which is none of those sticky queues — so HandlerFor would throw. + // Relay the produced batch from that queue to every sticky handler queue via a fan-out. + if (handler == null && endpoint is LocalQueue && + Options.MultipleHandlerBehavior == MultipleHandlerBehavior.Separated) + { + var thisQueueProducesTheBatch = Options.BatchDefinitions.Any(x => + x.Batcher.BatchMessageType == messageType && + string.Equals(endpoint.EndpointName, x.LocalExecutionQueueName, StringComparison.OrdinalIgnoreCase)); + if (thisQueueProducesTheBatch) + { + var batchChain = Handlers.ChainFor(messageType); + if (batchChain != null && batchChain.ByEndpoint.Any() && !batchChain.HasDefaultNonStickyHandlers()) + { + var stickyLocalUris = batchChain.ByEndpoint + .SelectMany(c => c.Endpoints) + .OfType() + .Select(e => e.Uri) + .Distinct() + .ToArray(); + if (stickyLocalUris.Length != 0) + { + handler = Handlers.BuildFanoutHandler(messageType, batchChain, stickyLocalUris); + } + } + } + } + if (handler == null && Options.MessagePartitioning.TryFindTopology(messageType, out var topology)) { if (!topology!.Slots.Contains(endpoint)) @@ -60,7 +90,7 @@ IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) } handler ??= (IMessageHandler?)Handlers.HandlerFor(messageType, endpoint); - if (handler == null ) + if (handler == null) { var batching = Options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); if (batching != null) @@ -80,7 +110,7 @@ IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) var accumulator = MetricsAccumulator.FindAccumulator(messageType.ToMessageTypeName(), endpoint); tracker = new HybridMetricsPublishingMessageTracker(this, accumulator.EntryPoint); } - + var executor = handler == null ? new NoHandlerExecutor(messageType, this) : Executor.Build(this, ExecutionPool, Handlers, handler, tracker);