diff --git a/docs/guide/handlers/batching.md b/docs/guide/handlers/batching.md index 14e6db2d1..ac74b8c6b 100644 --- a/docs/guide/handlers/batching.md +++ b/docs/guide/handlers/batching.md @@ -131,9 +131,64 @@ public async Task send_end_to_end_with_batch() Alright, with all that being said, here's a few more facts about the batch messaging support: 1. There is absolutely no need to create a specific message handler for the `Item` message, and in fact, you should - not do so + not do so -- *unless* you are running in `MultipleHandlerBehavior.Separated` mode and deliberately want both a + per-message handler and a batched handler (see [Combining a direct handler with batching](#combining-a-direct-handler-with-batching) below) 2. The message batching is able to group the message batches by tenant id *if* your Wolverine system uses multi-tenancy +## Combining a direct handler with batching + +By default Wolverine assumes the batch handler is the *only* consumer of the element type, so an incoming `Item` +is always routed straight to the batch. If you *also* declare a direct `Handle(Item)` handler alongside +`BatchMessagesOf()`, the direct handler wins and the batch is silently shadowed -- the batched handler never runs. + +The one exception is `MultipleHandlerBehavior.Separated`. Under that mode Wolverine treats the per-message handler and +the batched handler as two independent consumers of `Item`, so **both** run for every `Item`: + +```csharp +opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; +opts.BatchMessagesOf(); + +// Direct, per-message handler +public static class ItemAuditHandler +{ + public static void Handle(Item item) { /* runs once per message */ } +} + +// Batched handler +public static class ItemHandler +{ + public static void Handle(Item[] items) { /* runs once per assembled batch */ } +} +``` + +To make this work, Wolverine moves the batch onto its own dedicated local queue (the element type's queue name with a +`-batch` suffix) so it no longer collides with the direct handler's queue, and fans every `Item` out to both queues. +This applies to messages published in-process *and* to `Item` messages arriving from an external transport listener. + +### Multiple batched handlers + +`MultipleHandlerBehavior.Separated` also lets you register **more than one** batched handler for the same element type -- +for example one handler that publishes an integration event for the batch and another that archives it: + +```csharp +opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; +opts.BatchMessagesOf(); + +public static class ItemPublisher +{ + public static void Handle(Item[] items) { /* publish an integration event */ } +} + +public static class ItemArchiver +{ + public static void Handle(Item[] items) { /* archive the batch */ } +} +``` + +Under `Separated` mode each `Handle(Item[])` handler is given its own sticky queue, so Wolverine fans the assembled +batch out to every one of them and each runs independently. (Under the default `Classic` behavior the multiple +`Handle(Item[])` handlers are instead combined into a single logical handler that invokes each one in turn.) + ## What about durable messaging ("inbox")? The durable inbox behaves just a little bit differently for message batching. Wolverine will technically diff --git a/src/Testing/CoreTests/Acceptance/batching_with_separated_handlers.cs b/src/Testing/CoreTests/Acceptance/batching_with_separated_handlers.cs new file mode 100644 index 000000000..eb04e76ed --- /dev/null +++ b/src/Testing/CoreTests/Acceptance/batching_with_separated_handlers.cs @@ -0,0 +1,170 @@ +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.Runtime.Handlers; +using Wolverine.Tracking; +using Xunit; + +namespace CoreTests.Acceptance; + +// End-to-end batching behavior under MultipleHandlerBehavior.Separated, where a per-message direct +// handler and a batch handler are independent consumers of the same element type. +// +// - Direct + batch (LoadEvent): a direct Handle(LoadEvent) and a BatchMessagesOf() +// batch handler must BOTH run. Before the fix the direct handler silently shadowed the batch. +// - Multiple batch handlers (InvoiceEvent): a batched array type with more than one Handle(T[]) +// handler is split onto per-handler sticky queues; the produced batch must fan out to all of +// them (previously threw NoHandlerForEndpointException and none ran). +// +// The routing/executor-resolution mechanics behind these are asserted in +// CoreTests.Runtime.Routing.separated_batch_routing. +public class batching_with_separated_handlers +{ + private static IHostBuilder ConfigureMultipleBatchHost(bool withDirectHandler) + { + return Host.CreateDefaultBuilder().UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(InvoicePublisher)) + .IncludeType(typeof(InvoiceArchiver)); + + if (withDirectHandler) + { + opts.Discovery.IncludeType(typeof(InvoiceTelemetry)); + } + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(b => b.TriggerTime = 250.Milliseconds()); + }); + } + + private static async Task WaitForAsync(Func condition) + { + for (var i = 0; i < 50 && !condition(); i++) + { + await Task.Delay(100.Milliseconds()); + } + } + + [Fact] + public async Task separated_direct_and_batch_handler_both_run_on_local_publish() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(b => b.TriggerTime = 250.Milliseconds()); + }) + .StartAsync(); + + LoadPublisher.BatchCalls.Clear(); + LoadTelemetry.SingleCalls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new LoadEvent(1))); + + // The direct telemetry handler runs per-message... + LoadTelemetry.SingleCalls.Count.ShouldBe(1); + // ...and the batch publisher handler also runs on the batched array. + LoadPublisher.BatchCalls.Count.ShouldBe(1); + LoadPublisher.BatchCalls[0].Select(x => x.Id).ShouldBe([1]); + } + + [Fact] + public async Task separated_multiple_batch_handlers_all_run() + { + using var host = await ConfigureMultipleBatchHost(withDirectHandler: false).StartAsync(); + + InvoicePublisher.Calls.Clear(); + InvoiceArchiver.Calls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new InvoiceEvent(1))); + + // The array reception condition fires at the batch queue (the fan-out); give the two + // relayed sticky deliveries time to drain before asserting both handlers ran. + await WaitForAsync(() => InvoicePublisher.Calls.Count == 1 && InvoiceArchiver.Calls.Count == 1); + + InvoicePublisher.Calls.Count.ShouldBe(1); + InvoiceArchiver.Calls.Count.ShouldBe(1); + } + + [Fact] + public async Task separated_direct_handler_plus_multiple_batch_handlers_all_run() + { + using var host = await ConfigureMultipleBatchHost(withDirectHandler: true).StartAsync(); + var runtime = host.GetRuntime(); + + // The direct Handle(InvoiceEvent) collides with the batch element queue, so the batch was + // moved onto its dedicated -batch queue (see separated_batch_routing for the mechanics). + runtime.Options.BatchDefinitions.Single().LocalExecutionQueueName!.ShouldEndWith("-batch"); + + InvoicePublisher.Calls.Clear(); + InvoiceArchiver.Calls.Clear(); + InvoiceTelemetry.Calls.Clear(); + + await host.TrackActivity() + .Timeout(10.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync(c => c.PublishAsync(new InvoiceEvent(7))); + + await WaitForAsync(() => + InvoiceTelemetry.Calls.Count == 1 && InvoicePublisher.Calls.Count == 1 && + InvoiceArchiver.Calls.Count == 1); + + // The direct per-message handler AND both independent batch handlers all run. + InvoiceTelemetry.Calls.Count.ShouldBe(1); + InvoicePublisher.Calls.Count.ShouldBe(1); + InvoiceArchiver.Calls.Count.ShouldBe(1); + } +} + +public record LoadEvent(int Id); + +public static class LoadPublisher +{ + public static readonly List BatchCalls = new(); + + public static void Handle(LoadEvent[] messages) + { + BatchCalls.Add(messages); + } +} + +public static class LoadTelemetry +{ + public static readonly List SingleCalls = new(); + + public static void Handle(LoadEvent e) + { + SingleCalls.Add(e); + } +} + +public record InvoiceEvent(int Id); + +public static class InvoicePublisher +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent[] messages) => Calls.Add(messages); +} + +public static class InvoiceArchiver +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent[] messages) => Calls.Add(messages); +} + +public static class InvoiceTelemetry +{ + public static readonly List Calls = new(); + public static void Handle(InvoiceEvent e) => Calls.Add(e); +} diff --git a/src/Testing/CoreTests/Runtime/Routing/separated_batch_routing.cs b/src/Testing/CoreTests/Runtime/Routing/separated_batch_routing.cs new file mode 100644 index 000000000..eefbd0e95 --- /dev/null +++ b/src/Testing/CoreTests/Runtime/Routing/separated_batch_routing.cs @@ -0,0 +1,118 @@ +using CoreTests.Acceptance; +using JasperFx.Core.Reflection; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.ComplianceTests; +using Wolverine.Runtime; +using Wolverine.Runtime.Batching; +using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Routing; +using Wolverine.Tracking; +using Wolverine.Transports.Local; +using Xunit; + +namespace CoreTests.Runtime.Routing; + +// Routing and executor-resolution mechanics for the Separated-mode batching scenarios whose +// end-to-end behavior is covered in CoreTests.Acceptance.batching_with_separated_handlers. These +// assert how a conflicting element type is routed (fan-out to the dedicated -batch queue) and how +// the dedicated batch queue / external listener / produced-batch queue resolve to the right handler. +// +// Shares the handler + message types (LoadEvent/InvoiceEvent ...) with the batching suite. +public class separated_batch_routing +{ + [Fact] + public async Task batch_lives_on_its_own_queue_and_message_fans_out_to_both() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + + // The batch was moved off the element type's default queue onto a dedicated one. + var batch = runtime.Options.BatchDefinitions.Single(); + batch.LocalExecutionQueueName!.ShouldEndWith("-batch"); + var batchUri = new Uri("local://" + batch.LocalExecutionQueueName); + + // Routing a LoadEvent now fans out to BOTH the direct handler queue and the batch queue. + var destinations = runtime.RoutingFor(typeof(LoadEvent)) + .ShouldBeOfType>() + .Routes.OfType().Select(x => x.Sender.Destination).ToArray(); + destinations.ShouldContain(batchUri); + destinations.Length.ShouldBe(2); + + // The dedicated batch queue resolves to the batching processor (not the direct handler). + var batchQueue = runtime.Endpoints.EndpointFor(batchUri)!; + var batchHandler = runtime.As().BuildFor(typeof(LoadEvent), batchQueue) + .ShouldBeOfType().Handler; + batchHandler.ShouldBeOfType>(); + } + + [Fact] + public async Task separated_external_arrival_resolves_to_fanout_for_conflicting_element_type() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(LoadPublisher)) + .IncludeType(typeof(LoadTelemetry)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(); + + // A non-local (external) listener — a LoadEvent arriving here must still + // reach both the direct handler and the batch. + opts.ListenForMessagesFrom("stub://external"); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + var external = runtime.Options.Transports.AllEndpoints() + .First(e => e is not LocalQueue && e.Uri.Scheme == "stub"); + + // A LoadEvent arriving from an external listener relays to BOTH local queues + // (direct + batch) via a fanout handler, so both run independently. + var handler = runtime.As().BuildFor(typeof(LoadEvent), external) + .ShouldBeOfType().Handler; + handler.ShouldBeOfType>(); + } + + [Fact] + public async Task produced_array_fans_out_to_each_sticky_handler_queue() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(InvoicePublisher)) + .IncludeType(typeof(InvoiceArchiver)); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + opts.BatchMessagesOf(); + }) + .StartAsync(); + + var runtime = host.GetRuntime(); + + // The batch's execution queue receives the produced InvoiceEvent[]... + var batch = runtime.Options.BatchDefinitions.Single(); + var batchUri = new Uri("local://" + batch.LocalExecutionQueueName); + var batchQueue = runtime.Endpoints.EndpointFor(batchUri)!; + + // ...but the two Handle(InvoiceEvent[]) handlers were separated onto their own sticky queues, + // so the array must fan out from the batch queue to each of them. + var handler = runtime.As().BuildFor(typeof(InvoiceEvent[]), batchQueue) + .ShouldBeOfType().Handler; + handler.ShouldBeOfType>(); + } +} diff --git a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs index 8703d379f..a472253d5 100644 --- a/src/Wolverine/Runtime/Handlers/HandlerGraph.cs +++ b/src/Wolverine/Runtime/Handlers/HandlerGraph.cs @@ -299,8 +299,6 @@ public void AddRange(IEnumerable calls) // populate _handlers at HandlerGraph.Compile time so the steady-state // hot path is pure cache lookups. The pre-population work is tracked in // #2769 (CloseAndBuildAs elimination). - [UnconditionalSuppressMessage("AOT", "IL3050", - Justification = "FanoutMessageHandler<> closed over runtime messageType; AOT consumers pre-populate _handlers via TypeLoadMode.Static. See AOT guide / #2769.")] private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain chain) { if (_handlers.TryFind(messageType, out var cached) && cached != null) @@ -315,13 +313,26 @@ private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain c .Distinct() .ToArray(); - var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType); - var handler = (IMessageHandler)Activator.CreateInstance(handlerType, localUris, chain)!; + var handler = BuildFanoutHandler(messageType, chain, localUris); _handlers = _handlers.AddOrUpdate(messageType, handler); return handler; } + // Closes FanoutMessageHandler<> over the runtime-resolved messageType and + // Activator.CreateInstance's the result. Used both for sticky-handler fanout + // (getOrBuildFanoutHandler, cached in _handlers) and for relaying an externally- + // arriving element type to its direct + batch local queues (NOT cached — the + // _handlers[messageType] slot belongs to the direct handler). AOT-clean apps in + // TypeLoadMode.Static pre-populate handlers at Compile time. See #2769. + [UnconditionalSuppressMessage("AOT", "IL3050", + Justification = "FanoutMessageHandler<> closed over runtime messageType; AOT consumers pre-populate handlers via TypeLoadMode.Static. See AOT guide / #2769.")] + internal IMessageHandler BuildFanoutHandler(Type messageType, HandlerChain chain, Uri[] localQueueUris) + { + var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType); + return (IMessageHandler)Activator.CreateInstance(handlerType, localQueueUris, chain)!; + } + // Compile is the bootstrap-time handler-graph build. The remaining IL2026 // here comes from registerMessageTypes' MakeGenericType / GetInterfaces walks // (pre-population work tracked in #2769) and from the opt-in diff --git a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs index cc76a3eb4..12ca201db 100644 --- a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs +++ b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs @@ -4,6 +4,7 @@ using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Partitioning; +using Wolverine.Transports.Local; using Wolverine.Util; namespace Wolverine.Runtime; @@ -19,17 +20,23 @@ IExecutor IExecutorFactory.BuildFor(Type messageType) IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) { - IMessageHandler? handler = null; - if (Options.MessagePartitioning.TryFindTopology(messageType, out var topology)) + // Resolve any Separated-mode batch handler that the default HandlerFor lookup would otherwise + // miss or shadow, in priority order. See each helper for the specific case it covers. + var handler = + tryBuildDedicatedBatchQueueHandler(messageType, endpoint) + ?? tryBuildExternalArrivalFanoutHandler(messageType, endpoint) + ?? tryBuildMultipleBatchHandlerFanout(messageType, endpoint); + + if (handler == null && Options.MessagePartitioning.TryFindTopology(messageType, out var topology)) { if (!topology!.Slots.Contains(endpoint)) { handler = new PartitionedMessageReRouter(topology, messageType); } } - + handler ??= (IMessageHandler?)Handlers.HandlerFor(messageType, endpoint); - if (handler == null ) + if (handler == null) { var batching = Options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); if (batching != null) @@ -49,11 +56,91 @@ IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) var accumulator = MetricsAccumulator.FindAccumulator(messageType.ToMessageTypeName(), endpoint); tracker = new HybridMetricsPublishingMessageTracker(this, accumulator.EntryPoint); } - + var executor = handler == null ? new NoHandlerExecutor(messageType, this) : Executor.Build(this, ExecutionPool, Handlers, handler, tracker); return executor; } + + /// + /// If this endpoint is the dedicated batch queue for the element type, always use the batching + /// processor — even when the element type also has a direct Handle(T) handler (the Separated + /// direct-handler + batch case, where HandlerFor would otherwise return the direct handler and + /// shadow the batch). + /// + private IMessageHandler? tryBuildDedicatedBatchQueueHandler(Type messageType, Endpoint endpoint) + { + var batchForEndpoint = Options.BatchDefinitions.FirstOrDefault(x => + x.ElementType == messageType && + string.Equals(endpoint.EndpointName, x.LocalExecutionQueueName, StringComparison.OrdinalIgnoreCase)); + + return batchForEndpoint?.BuildHandler(this); + } + + /// + /// External-arrival parity (Separated): when an element type that has BOTH a direct handler and a + /// batch definition arrives on a non-local endpoint, relay it to both local queues (direct + + /// batch) so both run, mirroring the local fan-out routing. + /// + private IMessageHandler? tryBuildExternalArrivalFanoutHandler(Type messageType, Endpoint endpoint) + { + if (endpoint is LocalQueue || Options.MultipleHandlerBehavior != MultipleHandlerBehavior.Separated) + { + return null; + } + + var batchDefinition = Options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); + var directChain = Handlers.ChainFor(messageType); + if (batchDefinition?.LocalExecutionQueueName is { } batchQueueName && directChain != null) + { + var local = Options.Transports.GetOrCreate(); + var directQueueUri = local.FindQueueForMessageType(messageType).Uri; + var batchQueueUri = local.QueueFor(batchQueueName).Uri; + return Handlers.BuildFanoutHandler(messageType, directChain, [directQueueUri, batchQueueUri]); + } + + return null; + } + + /// + /// Multiple Separated batch handlers: when the produced batch-message type (T[] or a custom batch + /// type) has MORE THAN ONE Handle handler, Separated mode splits them onto per-handler sticky + /// queues. The BatchingProcessor re-enqueues a single produced batch onto the batch's own + /// execution queue — which is none of those sticky queues — so HandlerFor would throw. Relay the + /// produced batch from that queue to every sticky handler queue via a fan-out. + /// + private IMessageHandler? tryBuildMultipleBatchHandlerFanout(Type messageType, Endpoint endpoint) + { + if (endpoint is not LocalQueue || Options.MultipleHandlerBehavior != MultipleHandlerBehavior.Separated) + { + return null; + } + + var thisQueueProducesTheBatch = Options.BatchDefinitions.Any(x => + x.Batcher.BatchMessageType == messageType && + string.Equals(endpoint.EndpointName, x.LocalExecutionQueueName, StringComparison.OrdinalIgnoreCase)); + if (!thisQueueProducesTheBatch) + { + return null; + } + + var batchChain = Handlers.ChainFor(messageType); + if (batchChain == null || !batchChain.ByEndpoint.Any() || batchChain.HasDefaultNonStickyHandlers()) + { + return null; + } + + var stickyLocalUris = batchChain.ByEndpoint + .SelectMany(c => c.Endpoints) + .OfType() + .Select(e => e.Uri) + .Distinct() + .ToArray(); + + return stickyLocalUris.Length != 0 + ? Handlers.BuildFanoutHandler(messageType, batchChain, stickyLocalUris) + : null; + } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index cc691925c..411b602c3 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -98,6 +98,15 @@ public async Task StartAsync(CancellationToken cancellationToken) // Build up the message handlers Handlers.Compile(Options, _container); + // Under MultipleHandlerBehavior.Separated, a message type may have BOTH a direct + // Handle(T) handler AND a BatchMessagesOf() batch handler. By default the batch + // local queue is the element type's convention queue — the SAME queue the direct + // handler uses — so the two collide (a local queue resolves a single executor per + // message type) and the batch is silently shadowed. Move the batch onto a dedicated + // queue so both can run independently. Done before the messaging transports start so + // the new queue still receives the durable/local-queue endpoint policies. + reassignBatchQueuesThatCollideWithHandlers(); + // Pre-populate the message-type-name cache so the per-message ToMessageTypeName() // hot path inside Envelope construction never pays the first-occurrence reflection // cost (attribute reads, interface walks, generic-type pretty-printing). @@ -522,6 +531,49 @@ private async Task executeIdleSendingAgentCleanup() } } + // Suffix appended to the element type's convention queue name to host the batch processor + // when the same element type also has a direct handler under Separated mode. + internal const string BatchQueueSuffix = "-batch"; + + private void reassignBatchQueuesThatCollideWithHandlers() + { + if (Options.MultipleHandlerBehavior != MultipleHandlerBehavior.Separated) + { + return; + } + + if (Options.BatchDefinitions.Count == 0) + { + return; + } + + var local = Options.Transports.GetOrCreate(); + + foreach (var batch in Options.BatchDefinitions) + { + // No direct Handle(T) handler for the element type -> the batch owns the element + // type's queue and the existing fallback routing/executor behavior is correct. + if (Handlers.ChainFor(batch.ElementType) == null) + { + continue; + } + + // The user explicitly pointed the batch at a distinct queue already -> respect it. + var directQueue = local.FindQueueForMessageType(batch.ElementType); + if (!string.Equals(batch.LocalExecutionQueueName, directQueue.EndpointName, + StringComparison.OrdinalIgnoreCase)) + { + continue; + } + + // Move the batch onto a dedicated queue distinct from the direct handler's queue. + var batchQueueName = directQueue.EndpointName + BatchQueueSuffix; + var batchQueue = local.QueueFor(batchQueueName); + batchQueue.Mode = directQueue.Mode; + batch.LocalExecutionQueueName = batchQueue.EndpointName; + } + } + private void discoverListenersFromConventions() { // Let any registered routing conventions discover listener endpoints diff --git a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs index 2cba53d1b..741d444ab 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Routing.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Routing.cs @@ -128,7 +128,18 @@ public IEnumerable FindRoutes(Type messageType, IWolverineRuntime if (options.HandlerGraph.CanHandle(messageType)) { - var endpoints = options.LocalRouting.DiscoverSenders(messageType, runtime).ToArray(); + var endpoints = options.LocalRouting.DiscoverSenders(messageType, runtime).ToList(); + + // Under MultipleHandlerBehavior.Separated, an element type may have BOTH a direct + // handler (covered above) AND a BatchMessagesOf() batch handler living on its own + // dedicated queue. Fan the element type out to the batch queue as well so the batch + // handler runs independently of the direct handler. + var batchEndpoint = FindSeparatedBatchEndpoint(messageType, options); + if (batchEndpoint != null && endpoints.All(e => !Equals(e.Uri, batchEndpoint.Uri))) + { + endpoints.Add(batchEndpoint); + } + return endpoints.Select(e => MessageRoute.For(messageType, e, runtime)); } @@ -145,6 +156,29 @@ public IEnumerable FindRoutes(Type messageType, IWolverineRuntime } + /// + /// Under , an element type that has a direct + /// handler may ALSO have a BatchMessagesOf<T>() batch handler living on its own + /// dedicated local queue. Returns that batch queue endpoint (if any) so the element type can be + /// fanned out to it in addition to the direct handler's queue. Returns null when not in + /// Separated mode or when the element type has no batch definition. + /// + private static LocalQueue? FindSeparatedBatchEndpoint(Type messageType, WolverineOptions options) + { + if (options.MultipleHandlerBehavior != MultipleHandlerBehavior.Separated) + { + return null; + } + + var batchDefinition = options.BatchDefinitions.FirstOrDefault(x => x.ElementType == messageType); + if (batchDefinition?.LocalExecutionQueueName is { } batchQueueName) + { + return options.Transports.GetOrCreate().QueueFor(batchQueueName); + } + + return null; + } + public bool IsAdditive { get; set; } public RouteSourceDescriptor Describe(IWolverineRuntime runtime) => new()