Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion docs/guide/handlers/batching.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,64 @@ public async Task send_end_to_end_with_batch()
Alright, with all that being said, here's a few more facts about the batch messaging support:

1. There is absolutely no need to create a specific message handler for the `Item` message, and in fact, you should
not do so
not do so -- *unless* you are running in `MultipleHandlerBehavior.Separated` mode and deliberately want both a
per-message handler and a batched handler (see [Combining a direct handler with batching](#combining-a-direct-handler-with-batching) below)
2. The message batching is able to group the message batches by tenant id *if* your Wolverine system uses multi-tenancy

## Combining a direct handler with batching

By default Wolverine assumes the batch handler is the *only* consumer of the element type, so an incoming `Item`
is always routed straight to the batch. If you *also* declare a direct `Handle(Item)` handler alongside
`BatchMessagesOf<Item>()`, the direct handler wins and the batch is silently shadowed -- the batched handler never runs.

The one exception is `MultipleHandlerBehavior.Separated`. Under that mode Wolverine treats the per-message handler and
the batched handler as two independent consumers of `Item`, so **both** run for every `Item`:

```csharp
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<Item>();

// Direct, per-message handler
public static class ItemAuditHandler
{
public static void Handle(Item item) { /* runs once per message */ }
}

// Batched handler
public static class ItemHandler
{
public static void Handle(Item[] items) { /* runs once per assembled batch */ }
}
```

To make this work, Wolverine moves the batch onto its own dedicated local queue (the element type's queue name with a
`-batch` suffix) so it no longer collides with the direct handler's queue, and fans every `Item` out to both queues.
This applies to messages published in-process *and* to `Item` messages arriving from an external transport listener.

### Multiple batched handlers

`MultipleHandlerBehavior.Separated` also lets you register **more than one** batched handler for the same element type --
for example one handler that publishes an integration event for the batch and another that archives it:

```csharp
opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<Item>();

public static class ItemPublisher
{
public static void Handle(Item[] items) { /* publish an integration event */ }
}

public static class ItemArchiver
{
public static void Handle(Item[] items) { /* archive the batch */ }
}
```

Under `Separated` mode each `Handle(Item[])` handler is given its own sticky queue, so Wolverine fans the assembled
batch out to every one of them and each runs independently. (Under the default `Classic` behavior the multiple
`Handle(Item[])` handlers are instead combined into a single logical handler that invokes each one in turn.)

## What about durable messaging ("inbox")?

The durable inbox behaves just a little bit differently for message batching. Wolverine will technically
Expand Down
170 changes: 170 additions & 0 deletions src/Testing/CoreTests/Acceptance/batching_with_separated_handlers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.Runtime.Handlers;
using Wolverine.Tracking;
using Xunit;

namespace CoreTests.Acceptance;

// End-to-end batching behavior under MultipleHandlerBehavior.Separated, where a per-message direct
// handler and a batch handler are independent consumers of the same element type.
//
// - Direct + batch (LoadEvent): a direct Handle(LoadEvent) and a BatchMessagesOf<LoadEvent>()
// batch handler must BOTH run. Before the fix the direct handler silently shadowed the batch.
// - Multiple batch handlers (InvoiceEvent): a batched array type with more than one Handle(T[])
// handler is split onto per-handler sticky queues; the produced batch must fan out to all of
// them (previously threw NoHandlerForEndpointException and none ran).
//
// The routing/executor-resolution mechanics behind these are asserted in
// CoreTests.Runtime.Routing.separated_batch_routing.
public class batching_with_separated_handlers
{
private static IHostBuilder ConfigureMultipleBatchHost(bool withDirectHandler)
{
return Host.CreateDefaultBuilder().UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(InvoicePublisher))
.IncludeType(typeof(InvoiceArchiver));

if (withDirectHandler)
{
opts.Discovery.IncludeType(typeof(InvoiceTelemetry));
}

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<InvoiceEvent>(b => b.TriggerTime = 250.Milliseconds());
});
}

private static async Task WaitForAsync(Func<bool> condition)
{
for (var i = 0; i < 50 && !condition(); i++)
{
await Task.Delay(100.Milliseconds());
}
}

[Fact]
public async Task separated_direct_and_batch_handler_both_run_on_local_publish()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(LoadPublisher))
.IncludeType(typeof(LoadTelemetry));

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<LoadEvent>(b => b.TriggerTime = 250.Milliseconds());
})
.StartAsync();

LoadPublisher.BatchCalls.Clear();
LoadTelemetry.SingleCalls.Clear();

await host.TrackActivity()
.Timeout(10.Seconds())
.WaitForMessageToBeReceivedAt<LoadEvent[]>(host)
.ExecuteAndWaitAsync(c => c.PublishAsync(new LoadEvent(1)));

// The direct telemetry handler runs per-message...
LoadTelemetry.SingleCalls.Count.ShouldBe(1);
// ...and the batch publisher handler also runs on the batched array.
LoadPublisher.BatchCalls.Count.ShouldBe(1);
LoadPublisher.BatchCalls[0].Select(x => x.Id).ShouldBe([1]);
}

[Fact]
public async Task separated_multiple_batch_handlers_all_run()
{
using var host = await ConfigureMultipleBatchHost(withDirectHandler: false).StartAsync();

InvoicePublisher.Calls.Clear();
InvoiceArchiver.Calls.Clear();

await host.TrackActivity()
.Timeout(10.Seconds())
.WaitForMessageToBeReceivedAt<InvoiceEvent[]>(host)
.ExecuteAndWaitAsync(c => c.PublishAsync(new InvoiceEvent(1)));

// The array reception condition fires at the batch queue (the fan-out); give the two
// relayed sticky deliveries time to drain before asserting both handlers ran.
await WaitForAsync(() => InvoicePublisher.Calls.Count == 1 && InvoiceArchiver.Calls.Count == 1);

InvoicePublisher.Calls.Count.ShouldBe(1);
InvoiceArchiver.Calls.Count.ShouldBe(1);
}

[Fact]
public async Task separated_direct_handler_plus_multiple_batch_handlers_all_run()
{
using var host = await ConfigureMultipleBatchHost(withDirectHandler: true).StartAsync();
var runtime = host.GetRuntime();

// The direct Handle(InvoiceEvent) collides with the batch element queue, so the batch was
// moved onto its dedicated -batch queue (see separated_batch_routing for the mechanics).
runtime.Options.BatchDefinitions.Single().LocalExecutionQueueName!.ShouldEndWith("-batch");

InvoicePublisher.Calls.Clear();
InvoiceArchiver.Calls.Clear();
InvoiceTelemetry.Calls.Clear();

await host.TrackActivity()
.Timeout(10.Seconds())
.WaitForMessageToBeReceivedAt<InvoiceEvent[]>(host)
.ExecuteAndWaitAsync(c => c.PublishAsync(new InvoiceEvent(7)));

await WaitForAsync(() =>
InvoiceTelemetry.Calls.Count == 1 && InvoicePublisher.Calls.Count == 1 &&
InvoiceArchiver.Calls.Count == 1);

// The direct per-message handler AND both independent batch handlers all run.
InvoiceTelemetry.Calls.Count.ShouldBe(1);
InvoicePublisher.Calls.Count.ShouldBe(1);
InvoiceArchiver.Calls.Count.ShouldBe(1);
}
}

public record LoadEvent(int Id);

public static class LoadPublisher
{
public static readonly List<LoadEvent[]> BatchCalls = new();

public static void Handle(LoadEvent[] messages)
{
BatchCalls.Add(messages);
}
}

public static class LoadTelemetry
{
public static readonly List<LoadEvent> SingleCalls = new();

public static void Handle(LoadEvent e)
{
SingleCalls.Add(e);
}
}

public record InvoiceEvent(int Id);

public static class InvoicePublisher
{
public static readonly List<InvoiceEvent[]> Calls = new();
public static void Handle(InvoiceEvent[] messages) => Calls.Add(messages);
}

public static class InvoiceArchiver
{
public static readonly List<InvoiceEvent[]> Calls = new();
public static void Handle(InvoiceEvent[] messages) => Calls.Add(messages);
}

public static class InvoiceTelemetry
{
public static readonly List<InvoiceEvent> Calls = new();
public static void Handle(InvoiceEvent e) => Calls.Add(e);
}
118 changes: 118 additions & 0 deletions src/Testing/CoreTests/Runtime/Routing/separated_batch_routing.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using CoreTests.Acceptance;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Hosting;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.Runtime;
using Wolverine.Runtime.Batching;
using Wolverine.Runtime.Handlers;
using Wolverine.Runtime.Routing;
using Wolverine.Tracking;
using Wolverine.Transports.Local;
using Xunit;

namespace CoreTests.Runtime.Routing;

// Routing and executor-resolution mechanics for the Separated-mode batching scenarios whose
// end-to-end behavior is covered in CoreTests.Acceptance.batching_with_separated_handlers. These
// assert how a conflicting element type is routed (fan-out to the dedicated -batch queue) and how
// the dedicated batch queue / external listener / produced-batch queue resolve to the right handler.
//
// Shares the handler + message types (LoadEvent/InvoiceEvent ...) with the batching suite.
public class separated_batch_routing
{
[Fact]
public async Task batch_lives_on_its_own_queue_and_message_fans_out_to_both()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(LoadPublisher))
.IncludeType(typeof(LoadTelemetry));

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<LoadEvent>();
})
.StartAsync();

var runtime = host.GetRuntime();

// The batch was moved off the element type's default queue onto a dedicated one.
var batch = runtime.Options.BatchDefinitions.Single();
batch.LocalExecutionQueueName!.ShouldEndWith("-batch");
var batchUri = new Uri("local://" + batch.LocalExecutionQueueName);

// Routing a LoadEvent now fans out to BOTH the direct handler queue and the batch queue.
var destinations = runtime.RoutingFor(typeof(LoadEvent))
.ShouldBeOfType<MessageRouter<LoadEvent>>()
.Routes.OfType<MessageRoute>().Select(x => x.Sender.Destination).ToArray();
destinations.ShouldContain(batchUri);
destinations.Length.ShouldBe(2);

// The dedicated batch queue resolves to the batching processor (not the direct handler).
var batchQueue = runtime.Endpoints.EndpointFor(batchUri)!;
var batchHandler = runtime.As<IExecutorFactory>().BuildFor(typeof(LoadEvent), batchQueue)
.ShouldBeOfType<Executor>().Handler;
batchHandler.ShouldBeOfType<BatchingProcessor<LoadEvent>>();
}

[Fact]
public async Task separated_external_arrival_resolves_to_fanout_for_conflicting_element_type()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(LoadPublisher))
.IncludeType(typeof(LoadTelemetry));

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<LoadEvent>();

// A non-local (external) listener — a LoadEvent arriving here must still
// reach both the direct handler and the batch.
opts.ListenForMessagesFrom("stub://external");
})
.StartAsync();

var runtime = host.GetRuntime();
var external = runtime.Options.Transports.AllEndpoints()
.First(e => e is not LocalQueue && e.Uri.Scheme == "stub");

// A LoadEvent arriving from an external listener relays to BOTH local queues
// (direct + batch) via a fanout handler, so both run independently.
var handler = runtime.As<IExecutorFactory>().BuildFor(typeof(LoadEvent), external)
.ShouldBeOfType<Executor>().Handler;
handler.ShouldBeOfType<FanoutMessageHandler<LoadEvent>>();
}

[Fact]
public async Task produced_array_fans_out_to_each_sticky_handler_queue()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(InvoicePublisher))
.IncludeType(typeof(InvoiceArchiver));

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
opts.BatchMessagesOf<InvoiceEvent>();
})
.StartAsync();

var runtime = host.GetRuntime();

// The batch's execution queue receives the produced InvoiceEvent[]...
var batch = runtime.Options.BatchDefinitions.Single();
var batchUri = new Uri("local://" + batch.LocalExecutionQueueName);
var batchQueue = runtime.Endpoints.EndpointFor(batchUri)!;

// ...but the two Handle(InvoiceEvent[]) handlers were separated onto their own sticky queues,
// so the array must fan out from the batch queue to each of them.
var handler = runtime.As<IExecutorFactory>().BuildFor(typeof(InvoiceEvent[]), batchQueue)
.ShouldBeOfType<Executor>().Handler;
handler.ShouldBeOfType<FanoutMessageHandler<InvoiceEvent[]>>();
}
}
19 changes: 15 additions & 4 deletions src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,6 @@ public void AddRange(IEnumerable<HandlerCall> calls)
// populate _handlers at HandlerGraph.Compile time so the steady-state
// hot path is pure cache lookups. The pre-population work is tracked in
// #2769 (CloseAndBuildAs elimination).
[UnconditionalSuppressMessage("AOT", "IL3050",
Justification = "FanoutMessageHandler<> closed over runtime messageType; AOT consumers pre-populate _handlers via TypeLoadMode.Static. See AOT guide / #2769.")]
private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain chain)
{
if (_handlers.TryFind(messageType, out var cached) && cached != null)
Expand All @@ -315,13 +313,26 @@ private IMessageHandler getOrBuildFanoutHandler(Type messageType, HandlerChain c
.Distinct()
.ToArray();

var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType);
var handler = (IMessageHandler)Activator.CreateInstance(handlerType, localUris, chain)!;
var handler = BuildFanoutHandler(messageType, chain, localUris);

_handlers = _handlers.AddOrUpdate(messageType, handler);
return handler;
}

// Closes FanoutMessageHandler<> over the runtime-resolved messageType and
// Activator.CreateInstance's the result. Used both for sticky-handler fanout
// (getOrBuildFanoutHandler, cached in _handlers) and for relaying an externally-
// arriving element type to its direct + batch local queues (NOT cached — the
// _handlers[messageType] slot belongs to the direct handler). AOT-clean apps in
// TypeLoadMode.Static pre-populate handlers at Compile time. See #2769.
[UnconditionalSuppressMessage("AOT", "IL3050",
Justification = "FanoutMessageHandler<> closed over runtime messageType; AOT consumers pre-populate handlers via TypeLoadMode.Static. See AOT guide / #2769.")]
internal IMessageHandler BuildFanoutHandler(Type messageType, HandlerChain chain, Uri[] localQueueUris)
{
var handlerType = typeof(FanoutMessageHandler<>).MakeGenericType(messageType);
return (IMessageHandler)Activator.CreateInstance(handlerType, localQueueUris, chain)!;
}

// Compile is the bootstrap-time handler-graph build. The remaining IL2026
// here comes from registerMessageTypes' MakeGenericType / GetInterfaces walks
// (pre-population work tracked in #2769) and from the opt-in
Expand Down
Loading
Loading