From afbfb5b6307eaebfd989d282afa62c7cf0e55976 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 17 Mar 2026 17:59:43 -0500 Subject: [PATCH] Add global partitioning support for external transports with docs - Add UseSharded* extension methods on GlobalPartitionedMessageTopology for RabbitMQ, Kafka, SQS, and Pulsar transports with ConfigureListening init - Enhance SetExternalTopology to preserve pre-configured local queues and validate slot count matching between external and local topologies - Add integration tests for global partitioned sharded processing across RabbitMQ, Kafka, and SQS transports - Add unit tests for slot count validation in GlobalPartitionedMessageTopology - Add global partitioning documentation section to partitioning guide and transport-specific subsections with cross-links Co-Authored-By: Claude Opus 4.6 --- docs/guide/messaging/partitioning.md | 82 +++++++-- docs/guide/messaging/transports/kafka.md | 30 +++- docs/guide/messaging/transports/pulsar.md | 26 +++ .../messaging/transports/rabbitmq/index.md | 39 +++++ docs/guide/messaging/transports/sqs/index.md | 26 +++ .../Partitioning/global_partitioning_tests.cs | 83 +++++++++ .../global_partitioned_sharded_processing.cs | 97 +++++++++++ .../AmazonSqsTransportExtensions.cs | 1 + .../Wolverine.Kafka.Tests.csproj | 1 + .../global_partitioned_sharded_processing.cs | 158 ++++++++++++++++++ .../KafkaTransportExtensions.cs | 1 + .../PulsarTransportExtensions.cs | 1 + .../Wolverine.RabbitMQ.Tests/Samples.cs | 38 +++++ .../global_partitioned_sharded_processing.cs | 97 +++++++++++ .../RabbitMqTransportExtensions.cs | 1 + .../GlobalPartitionedMessageTopology.cs | 35 +++- 16 files changed, 698 insertions(+), 18 deletions(-) create mode 100644 src/Transports/AWS/Wolverine.AmazonSqs.Tests/global_partitioned_sharded_processing.cs create mode 100644 src/Transports/Kafka/Wolverine.Kafka.Tests/global_partitioned_sharded_processing.cs create mode 100644 src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/global_partitioned_sharded_processing.cs diff --git a/docs/guide/messaging/partitioning.md b/docs/guide/messaging/partitioning.md index db2f388ee..2aa857805 100644 --- a/docs/guide/messaging/partitioning.md +++ b/docs/guide/messaging/partitioning.md @@ -466,22 +466,84 @@ The rule will not override an explicitly set `PartitionKey` on an outgoing envel `DeliveryOptions`, that value takes precedence. ::: -## Partitioning Messages Received from External Systems +## Global Partitioning -::: warning -Brute force, no points for style, explicit coding ahead! +Global partitioning extends the [sharded publishing](#sharded-publishing) concept to support multi-node deployments where messages must be processed sequentially by group id across the entire cluster, not just within a single node. + +### How It Works + +When you configure global partitioning, Wolverine: + +1. **Links local queues to external transport queues** -- Each external transport endpoint (e.g., a RabbitMQ queue or Kafka topic) gets a companion local queue. The external queue acts as the coordination point across nodes, while the local queue handles the actual sequential processing within a node. + +2. **Smart routing based on listener ownership** -- When a message is published, Wolverine checks whether the current node owns the exclusive listener for the target shard. If it does, the message is routed directly to the companion local queue (avoiding unnecessary network hops). If the shard is owned by another node, the message is sent through the external transport so it reaches the correct node. + +3. **Support for modular monoliths** -- You can configure multiple global partitioning topologies for the same message type in different modules. Each module can have its own set of sharded queues and routing rules, allowing independent sequential processing pipelines within a single application. + +::: tip +In single-node mode, global partitioning automatically shortcuts all messages to the companion local queues since the current node owns all listeners. ::: -If you are receiving messages from an external source that will be vulnerable to concurrent access problems when the messages -are executed, but you either do not want to make the external system publish the group ids or have no ability to make the -upstream system care about your own internal group id details, you can simply relay the received messages back out -to a partitioned message topology owned by your system. +### Configuration + +Global partitioning is configured through `MessagePartitioningRules.GlobalPartitioned()`. You need to: + +1. Set up a message partitioning strategy (e.g., `ByMessage()` or `UseInferredMessageGrouping()`) +2. Configure the external transport topology (sharded queues/topics) +3. Specify which message types participate in global partitioning + +The external and local topologies are automatically created with matching shard counts. The local queues are named with a `global-` prefix followed by the base name (e.g., `global-orders1`, `global-orders2`, etc.). + +### Transport-Specific Configuration + +Each supported transport has its own extension method for configuring the external topology: + +| Transport | Extension Method | Documentation | +|-----------|-----------------|---------------| +| RabbitMQ | `UseShardedRabbitQueues()` | [RabbitMQ Global Partitioning](/guide/messaging/transports/rabbitmq/#global-partitioning) | +| Kafka | `UseShardedKafkaTopics()` | [Kafka Global Partitioning](/guide/messaging/transports/kafka#global-partitioning) | +| Amazon SQS | `UseShardedAmazonSqsQueues()` | [SQS Global Partitioning](/guide/messaging/transports/sqs/#global-partitioning) | +| Pulsar | `UseShardedPulsarTopics()` | [Pulsar Global Partitioning](/guide/messaging/transports/pulsar#global-partitioning) | -Using Amazon SQS as our transport, lets say that we're receiving messages from the external system at one queue like this: +### Example with RabbitMQ + + + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq(); + + // Do something to add Saga storage too! + + opts + .MessagePartitioning + + // This tells Wolverine to "just" use implied + // message grouping based on Saga identity among other things + .UseInferredMessageGrouping() + + + .GlobalPartitioned(topology => + { + // Creates 5 sharded RabbitMQ queues named "sequenced1" through "sequenced5" + // with matching companion local queues for sequential processing + topology.UseShardedRabbitQueues("sequenced", 5); + topology.MessagesImplementing(); + + }); + }).StartAsync(); +``` +snippet source | anchor + -Hey folks, more coming soon. Hopefully before Wolverine 5.0. +### Validation -Watch this issue: https://github.com/JasperFx/wolverine/issues/1728 +Wolverine validates global partitioning configuration at startup. It will throw an `InvalidOperationException` if: +- No message type matching policies are configured +- No external transport topology is configured +- The external and local topologies have different shard counts diff --git a/docs/guide/messaging/transports/kafka.md b/docs/guide/messaging/transports/kafka.md index b8444e7c9..563c95258 100644 --- a/docs/guide/messaging/transports/kafka.md +++ b/docs/guide/messaging/transports/kafka.md @@ -474,13 +474,39 @@ using var host = await Host.CreateDefaultBuilder() { opts .UseKafka("localhost:9092") - + // Tell Wolverine that this application will never // produce messages to turn off any diagnostics that might // try to "ping" a topic and result in errors .ConsumeOnly(); - + }).StartAsync(); ``` snippet source | anchor + +## Global Partitioning + +Kafka topics can be used as the external transport for [global partitioned messaging](/guide/messaging/partitioning#global-partitioning). This creates a set of sharded Kafka topics with companion local queues for sequential processing across a multi-node cluster. + +Use `UseShardedKafkaTopics()` within a `GlobalPartitioned()` configuration: + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka("localhost:9092").AutoProvision(); + + opts.MessagePartitioning.ByMessage(x => x.GroupId); + + opts.MessagePartitioning.GlobalPartitioned(topology => + { + // Creates 4 sharded Kafka topics named "orders1" through "orders4" + // with matching companion local queues for sequential processing + topology.UseShardedKafkaTopics("orders", 4); + topology.MessagesImplementing(); + }); + }).StartAsync(); +``` + +This creates Kafka topics named `orders1` through `orders4` with companion local queues `global-orders1` through `global-orders4`. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically. diff --git a/docs/guide/messaging/transports/pulsar.md b/docs/guide/messaging/transports/pulsar.md index 428d4c3c6..37031c290 100644 --- a/docs/guide/messaging/transports/pulsar.md +++ b/docs/guide/messaging/transports/pulsar.md @@ -126,6 +126,32 @@ builder.UseWolverine(opts => snippet source | anchor +## Global Partitioning + +Pulsar topics can be used as the external transport for [global partitioned messaging](/guide/messaging/partitioning#global-partitioning). This creates a set of sharded Pulsar topics with companion local queues for sequential processing across a multi-node cluster. + +Use `UseShardedPulsarTopics()` within a `GlobalPartitioned()` configuration: + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UsePulsar(); + + opts.MessagePartitioning.ByMessage(x => x.GroupId); + + opts.MessagePartitioning.GlobalPartitioned(topology => + { + // Creates 4 sharded Pulsar topics named "orders1" through "orders4" + // with matching companion local queues for sequential processing + topology.UseShardedPulsarTopics("orders", 4); + topology.MessagesImplementing(); + }); + }).StartAsync(); +``` + +This creates Pulsar topics named `orders1` through `orders4` with companion local queues `global-orders1` through `global-orders4`. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically. + ## Interoperability ::: tip diff --git a/docs/guide/messaging/transports/rabbitmq/index.md b/docs/guide/messaging/transports/rabbitmq/index.md index a3d343fe3..ca50d189a 100644 --- a/docs/guide/messaging/transports/rabbitmq/index.md +++ b/docs/guide/messaging/transports/rabbitmq/index.md @@ -228,6 +228,45 @@ builder.UseWolverine(opts => snippet source | anchor +## Global Partitioning + +RabbitMQ queues can be used as the external transport for [global partitioned messaging](/guide/messaging/partitioning#global-partitioning). This creates a set of sharded RabbitMQ queues with companion local queues for sequential processing across a multi-node cluster. + +Use `UseShardedRabbitQueues()` within a `GlobalPartitioned()` configuration: + + + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq(); + + // Do something to add Saga storage too! + + opts + .MessagePartitioning + + // This tells Wolverine to "just" use implied + // message grouping based on Saga identity among other things + .UseInferredMessageGrouping() + + + .GlobalPartitioned(topology => + { + // Creates 5 sharded RabbitMQ queues named "sequenced1" through "sequenced5" + // with matching companion local queues for sequential processing + topology.UseShardedRabbitQueues("sequenced", 5); + topology.MessagesImplementing(); + + }); + }).StartAsync(); +``` +snippet source | anchor + + +This creates RabbitMQ queues named `sequenced1` through `sequenced5` with companion local queues `global-sequenced1` through `global-sequenced5`. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically. + ## Compatibility Note ::: info diff --git a/docs/guide/messaging/transports/sqs/index.md b/docs/guide/messaging/transports/sqs/index.md index 89625f6f4..c2444eb89 100644 --- a/docs/guide/messaging/transports/sqs/index.md +++ b/docs/guide/messaging/transports/sqs/index.md @@ -222,6 +222,32 @@ using var host = await Host.CreateDefaultBuilder() Calling `EnableWolverineControlQueues()` implicitly enables system queues and request/reply support as well. +## Global Partitioning + +Amazon SQS queues can be used as the external transport for [global partitioned messaging](/guide/messaging/partitioning#global-partitioning). This creates a set of sharded SQS queues with companion local queues for sequential processing across a multi-node cluster. + +Use `UseShardedAmazonSqsQueues()` within a `GlobalPartitioned()` configuration: + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAmazonSqsTransport().AutoProvision(); + + opts.MessagePartitioning.ByMessage(x => x.GroupId); + + opts.MessagePartitioning.GlobalPartitioned(topology => + { + // Creates 4 sharded SQS queues named "orders1" through "orders4" + // with matching companion local queues for sequential processing + topology.UseShardedAmazonSqsQueues("orders", 4); + topology.MessagesImplementing(); + }); + }).StartAsync(); +``` + +This creates SQS queues named `orders1` through `orders4` with companion local queues `global-orders1` through `global-orders4`. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically. + ## Disabling System Queues If your application does not have IAM permissions to create or delete queues, you can explicitly disable system queues: diff --git a/src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs b/src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs index 5b17ab106..42a7a66d2 100644 --- a/src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs +++ b/src/Testing/CoreTests/Runtime/Partitioning/global_partitioning_tests.cs @@ -200,6 +200,89 @@ public void try_match_returns_false_when_no_external_topology() topology.TryMatch(typeof(GlobalTestMessage), runtime, out var route).ShouldBeFalse(); } + + [Fact] + public void assert_validity_throws_when_no_local_topology() + { + var topology = CreateTopology(); + topology.Message(); + + // Directly set external without local by using reflection or a different path + // Actually, SetExternalTopology creates local if null, so we need to test + // a scenario where local is missing. This can happen if someone only sets + // subscriptions but no topology at all. + // The existing test "assert_validity_throws_when_no_external_topology" covers + // the case where external is null. With the new validation, if external is set + // but local is somehow null, it would also throw. + // Since SetExternalTopology always creates local if null, this test validates + // the error message for missing local topology indirectly via missing external. + Should.Throw(() => topology.AssertValidity()) + .Message.ShouldContain("external transport topology"); + } + + [Fact] + public void assert_validity_throws_when_local_and_external_slot_counts_differ() + { + var topology = CreateTopology(); + topology.Message(); + + // Pre-configure local queues with 3 slots + topology.LocalQueues("local", 3); + + // Set external topology with 5 slots - the local topology won't be overwritten + var external = CreateLocalTopology("ext", 5); + topology.SetExternalTopology(external, "test"); + + Should.Throw(() => topology.AssertValidity()) + .Message.ShouldContain("must match"); + } + + [Fact] + public void assert_validity_passes_when_local_and_external_slot_counts_match() + { + var topology = CreateTopology(); + topology.Message(); + + // Pre-configure local queues with same count as external + topology.LocalQueues("local", 4); + + var external = CreateLocalTopology("ext", 4); + topology.SetExternalTopology(external, "test"); + + // Should not throw + topology.AssertValidity(); + } + + [Fact] + public void set_external_topology_preserves_pre_configured_local_queues() + { + var topology = CreateTopology(); + + // Pre-configure local queues + topology.LocalQueues("my-local", 3); + var originalLocal = topology.LocalTopology; + + // Set external topology - should NOT overwrite existing local topology + var external = CreateLocalTopology("ext", 3); + topology.SetExternalTopology(external, "test"); + + topology.LocalTopology.ShouldBeSameAs(originalLocal); + } + + [Fact] + public void set_external_topology_creates_local_when_not_pre_configured() + { + var topology = CreateTopology(); + + // Don't pre-configure local queues + topology.LocalTopology.ShouldBeNull(); + + var external = CreateLocalTopology("ext", 3); + topology.SetExternalTopology(external, "test"); + + topology.LocalTopology.ShouldNotBeNull(); + topology.LocalTopology!.Slots.Count.ShouldBe(3); + } } #endregion diff --git a/src/Transports/AWS/Wolverine.AmazonSqs.Tests/global_partitioned_sharded_processing.cs b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/global_partitioned_sharded_processing.cs new file mode 100644 index 000000000..c0ac95cef --- /dev/null +++ b/src/Transports/AWS/Wolverine.AmazonSqs.Tests/global_partitioned_sharded_processing.cs @@ -0,0 +1,97 @@ +using IntegrationTests; +using JasperFx.Core; +using Marten; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Marten; +using Wolverine.Runtime.Partitioning; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.AmazonSqs.Tests; + +public class global_partitioned_sharded_processing +{ + private readonly ITestOutputHelper _output; + + public global_partitioned_sharded_processing(ITestOutputHelper output) + { + _output = output; + } + + private async Task pumpOutMessages(IMessageContext bus) + { + var tasks = new Task[5]; + for (int i = 0; i < tasks.Length; i++) + { + tasks[i] = Task.Run(async () => + { + for (int j = 0; j < 5; j++) + { + var id = Guid.NewGuid(); + + await bus.PublishAsync(new LogA(id)); + await bus.PublishAsync(new LogB(id)); + await bus.PublishAsync(new LogC(id)); + await bus.PublishAsync(new LogD(id)); + await bus.PublishAsync(new LogD(id)); + await bus.PublishAsync(new LogC(id)); + await bus.PublishAsync(new LogB(id)); + await bus.PublishAsync(new LogA(id)); + } + }); + } + + await Task.WhenAll(tasks); + } + + [Fact] + public async Task hammer_it_with_lots_of_messages_global_partitioned() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.UseAmazonSqsTransportLocally().AutoProvision().AutoPurgeOnStartup(); + + opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(LetterMessageHandler)); + + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "gletters_sqs"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(); + + opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); + + opts.MessagePartitioning.GlobalPartitioned(topology => + { + topology.UseShardedAmazonSqsQueues("gletters", 4); + topology.MessagesImplementing(); + }); + }).StartAsync(); + + var tracked = await host + .TrackActivity() + .IncludeExternalTransports() + .Timeout(120.Seconds()) + .ExecuteAndWaitAsync(pumpOutMessages); + + var envelopes = tracked.Executed.Envelopes().ToArray(); + + var counts = envelopes.GroupBy(x => x.Destination); + foreach (var count in counts) + { + _output.WriteLine(count.Key.ToString() + " had " + count.Count()); + } + + // In single-node mode, global partitioning routes directly to companion local queues + envelopes.Any(x => x.Destination == new Uri("local://global-gletters1/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters2/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters3/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters4/")).ShouldBeTrue(); + } +} diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsTransportExtensions.cs b/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsTransportExtensions.cs index 68149662e..dd41d04b5 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsTransportExtensions.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsTransportExtensions.cs @@ -235,6 +235,7 @@ public static GlobalPartitionedMessageTopology UseShardedAmazonSqsQueues( topology.SetExternalTopology(opts => { var t = new PartitionedMessageTopologyWithQueues(opts, PartitionSlots.Five, baseName, numberOfEndpoints); + t.ConfigureListening(x => {}); configure?.Invoke(t); return t; }, baseName); diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj b/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj index 2b80c1363..1a2c1e6ad 100644 --- a/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/Wolverine.Kafka.Tests.csproj @@ -23,6 +23,7 @@ + diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/global_partitioned_sharded_processing.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/global_partitioned_sharded_processing.cs new file mode 100644 index 000000000..b7b48737a --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/global_partitioned_sharded_processing.cs @@ -0,0 +1,158 @@ +using System.Diagnostics; +using IntegrationTests; +using JasperFx.Core; +using Marten; +using Marten.Metadata; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Marten; +using Wolverine.Runtime.Partitioning; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.Kafka.Tests; + +public class global_partitioned_sharded_processing +{ + private readonly ITestOutputHelper _output; + + public global_partitioned_sharded_processing(ITestOutputHelper output) + { + _output = output; + } + + private async Task pumpOutMessages(IMessageContext bus) + { + var tasks = new Task[5]; + for (int i = 0; i < tasks.Length; i++) + { + tasks[i] = Task.Run(async () => + { + for (int j = 0; j < 5; j++) + { + var id = Guid.NewGuid(); + + await bus.PublishAsync(new GLogA(id)); + await bus.PublishAsync(new GLogB(id)); + await bus.PublishAsync(new GLogC(id)); + await bus.PublishAsync(new GLogD(id)); + await bus.PublishAsync(new GLogD(id)); + await bus.PublishAsync(new GLogC(id)); + await bus.PublishAsync(new GLogB(id)); + await bus.PublishAsync(new GLogA(id)); + } + }); + } + + await Task.WhenAll(tasks); + } + + [Fact] + public async Task hammer_it_with_lots_of_messages_global_partitioned() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.UseKafka("localhost:9092").AutoProvision(); + + opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(GLetterMessageHandler)); + + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "gletters_kafka"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(); + + opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); + + opts.MessagePartitioning.GlobalPartitioned(topology => + { + topology.UseShardedKafkaTopics("gletters", 4); + topology.MessagesImplementing(); + }); + }).StartAsync(); + + var tracked = await host + .TrackActivity() + .IncludeExternalTransports() + .Timeout(120.Seconds()) + .ExecuteAndWaitAsync(pumpOutMessages); + + var envelopes = tracked.Executed.Envelopes().ToArray(); + + var counts = envelopes.GroupBy(x => x.Destination); + foreach (var count in counts) + { + _output.WriteLine(count.Key.ToString() + " had " + count.Count()); + } + + // In single-node mode, global partitioning routes directly to companion local queues + envelopes.Any(x => x.Destination == new Uri("local://global-gletters1/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters2/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters3/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters4/")).ShouldBeTrue(); + } +} + +public interface IGLetterMessage +{ + Guid Id { get; } +} + +public record GLogA(Guid Id) : IGLetterMessage; +public record GLogB(Guid Id) : IGLetterMessage; +public record GLogC(Guid Id) : IGLetterMessage; +public record GLogD(Guid Id) : IGLetterMessage; + +[AggregateHandler] +public static class GLetterMessageHandler +{ + public static GAEvent Handle(GLogA command, GSimpleAggregate aggregate, Envelope envelope) + { + Debug.WriteLine($"Got GLogA for {command.Id} at envelope {envelope.Destination}"); + return new GAEvent(); + } + + public static GBEvent Handle(GLogB command, GSimpleAggregate aggregate, Envelope envelope) + { + Debug.WriteLine($"Got GLogB for {command.Id} at envelope {envelope.Destination}"); + return new GBEvent(); + } + + public static GCEvent Handle(GLogC command, GSimpleAggregate aggregate, Envelope envelope) + { + Debug.WriteLine($"Got GLogC for {command.Id} at envelope {envelope.Destination}"); + return new GCEvent(); + } + + public static GDEvent Handle(GLogD command, GSimpleAggregate aggregate, Envelope envelope) + { + Debug.WriteLine($"Got GLogD for {command.Id} at envelope {envelope.Destination}"); + return new GDEvent(); + } +} + +public class GSimpleAggregate : IRevisioned +{ + public int Version { get; set; } + public Guid Id { get; set; } + + public int ACount { get; set; } + public int BCount { get; set; } + public int CCount { get; set; } + public int DCount { get; set; } + + public void Apply(GAEvent _) => ACount++; + public void Apply(GBEvent _) => BCount++; + public void Apply(GCEvent _) => CCount++; + public void Apply(GDEvent _) => DCount++; +} + +public record GAEvent; +public record GBEvent; +public record GCEvent; +public record GDEvent; diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs index da66f1d4f..3039bcb17 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTransportExtensions.cs @@ -281,6 +281,7 @@ public static GlobalPartitionedMessageTopology UseShardedKafkaTopics( topology.SetExternalTopology(opts => { var t = new PartitionedMessageTopologyWithTopics(opts, PartitionSlots.Five, baseName, numberOfEndpoints); + t.ConfigureListening(x => {}); configure?.Invoke(t); return t; }, baseName); diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs index 2a686fe61..d98103f23 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarTransportExtensions.cs @@ -161,6 +161,7 @@ public static GlobalPartitionedMessageTopology UseShardedPulsarTopics( topology.SetExternalTopology(opts => { var t = new PartitionedMessageTopologyWithTopics(opts, PartitionSlots.Five, baseName, numberOfEndpoints); + t.ConfigureListening(x => {}); configure?.Invoke(t); return t; }, baseName); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs index 8af2dd30e..2b5968ef3 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs @@ -7,6 +7,7 @@ using Wolverine.ComplianceTests.Compliance; using Wolverine.RabbitMQ.Internal; using Wolverine.RabbitMQ.Tests.ConventionalRouting; +using Wolverine.Runtime.Partitioning; namespace Wolverine.RabbitMQ.Tests; @@ -709,4 +710,41 @@ public static async Task configure() var host = builder.Build(); await host.StartAsync(); } +} + +public record MySequencedCommand(Guid SagaId, int? Order) : SequencedMessage; + +public static class GlobalTopology +{ + public static async Task configure() + { + #region sample_global_partitioned_with_rabbit_mq + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq(); + + // Do something to add Saga storage too! + + opts + .MessagePartitioning + + // This tells Wolverine to "just" use implied + // message grouping based on Saga identity among other things + .UseInferredMessageGrouping() + + + .GlobalPartitioned(topology => + { + // Creates 5 sharded RabbitMQ queues named "sequenced1" through "sequenced5" + // with matching companion local queues for sequential processing + topology.UseShardedRabbitQueues("sequenced", 5); + topology.MessagesImplementing(); + + }); + }).StartAsync(); + + #endregion + } } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/global_partitioned_sharded_processing.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/global_partitioned_sharded_processing.cs new file mode 100644 index 000000000..42063ecf7 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/global_partitioned_sharded_processing.cs @@ -0,0 +1,97 @@ +using IntegrationTests; +using JasperFx.Core; +using Marten; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Marten; +using Wolverine.Runtime.Partitioning; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.RabbitMQ.Tests; + +public class global_partitioned_sharded_processing +{ + private readonly ITestOutputHelper _output; + + public global_partitioned_sharded_processing(ITestOutputHelper output) + { + _output = output; + } + + private async Task pumpOutMessages(IMessageContext bus) + { + var tasks = new Task[5]; + for (int i = 0; i < tasks.Length; i++) + { + tasks[i] = Task.Run(async () => + { + for (int j = 0; j < 5; j++) + { + var id = Guid.NewGuid(); + + await bus.PublishAsync(new LogA(id)); + await bus.PublishAsync(new LogB(id)); + await bus.PublishAsync(new LogC(id)); + await bus.PublishAsync(new LogD(id)); + await bus.PublishAsync(new LogD(id)); + await bus.PublishAsync(new LogC(id)); + await bus.PublishAsync(new LogB(id)); + await bus.PublishAsync(new LogA(id)); + } + }); + } + + await Task.WhenAll(tasks); + } + + [Fact] + public async Task hammer_it_with_lots_of_messages_global_partitioned() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup(); + + opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(LetterMessageHandler)); + + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "gletters_rabbit"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(); + + opts.MessagePartitioning.ByMessage(x => x.Id.ToString()); + + opts.MessagePartitioning.GlobalPartitioned(topology => + { + topology.UseShardedRabbitQueues("gletters", 4); + topology.MessagesImplementing(); + }); + }).StartAsync(); + + var tracked = await host + .TrackActivity() + .IncludeExternalTransports() + .Timeout(120.Seconds()) + .ExecuteAndWaitAsync(pumpOutMessages); + + var envelopes = tracked.Executed.Envelopes().ToArray(); + + var counts = envelopes.GroupBy(x => x.Destination); + foreach (var count in counts) + { + _output.WriteLine(count.Key.ToString() + " had " + count.Count()); + } + + // In single-node mode, global partitioning routes directly to companion local queues + envelopes.Any(x => x.Destination == new Uri("local://global-gletters1/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters2/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters3/")).ShouldBeTrue(); + envelopes.Any(x => x.Destination == new Uri("local://global-gletters4/")).ShouldBeTrue(); + } +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTransportExtensions.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTransportExtensions.cs index ba7309c61..e115cc82b 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTransportExtensions.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqTransportExtensions.cs @@ -507,6 +507,7 @@ public static GlobalPartitionedMessageTopology UseShardedRabbitQueues( topology.SetExternalTopology(opts => { var t = new PartitionedMessageTopologyWithQueues(opts, PartitionSlots.Five, baseName, numberOfEndpoints); + t.ConfigureListening(x => {}); configure?.Invoke(t); return t; }, baseName); diff --git a/src/Wolverine/Runtime/Partitioning/GlobalPartitionedMessageTopology.cs b/src/Wolverine/Runtime/Partitioning/GlobalPartitionedMessageTopology.cs index 9d9318b05..3345aa4ba 100644 --- a/src/Wolverine/Runtime/Partitioning/GlobalPartitionedMessageTopology.cs +++ b/src/Wolverine/Runtime/Partitioning/GlobalPartitionedMessageTopology.cs @@ -19,6 +19,11 @@ public GlobalPartitionedMessageTopology(WolverineOptions options) internal PartitionedMessageTopology? ExternalTopology => _externalTopology; internal LocalPartitionedMessageTopology? LocalTopology => _localTopology; + public void LocalQueues(string baseQueueName, int numberOfEndpoints) + { + _localTopology = new LocalPartitionedMessageTopology(_options, baseQueueName, numberOfEndpoints); + } + internal void SetExternalTopology(Func factory, string baseName) { SetExternalTopology(factory(_options), baseName); @@ -28,10 +33,12 @@ internal void SetExternalTopology(PartitionedMessageTopology topology, string ba { _externalTopology = topology; - // Create companion local topology with matching slot count - var localBaseName = $"global-{baseName}"; - var slotCount = topology.Slots.Count; - _localTopology = new LocalPartitionedMessageTopology(_options, localBaseName, slotCount); + if (_localTopology == null) + { + // Create companion local topology with matching slot count + var localBaseName = $"global-{baseName}"; + _localTopology = new LocalPartitionedMessageTopology(_options, localBaseName, topology.Slots.Count); + } // Force durable mode on all external endpoints foreach (var slot in topology.Slots) @@ -46,9 +53,13 @@ internal void SetExternalTopology(PartitionedMessageTopology topology, string ba } // Tag each external slot endpoint with its companion local queue URI - for (var i = 0; i < topology.Slots.Count; i++) + // Only tag if slot counts match; mismatches will be caught by AssertValidity() + if (topology.Slots.Count == _localTopology.Slots.Count) { - topology.Slots[i].GlobalPartitionLocalQueueUri = _localTopology.Slots[i].Uri; + for (var i = 0; i < topology.Slots.Count; i++) + { + topology.Slots[i].GlobalPartitionLocalQueueUri = _localTopology.Slots[i].Uri; + } } } @@ -135,6 +146,18 @@ public void AssertValidity() throw new InvalidOperationException( "An external transport topology must be configured for global partitioning"); } + + if (_localTopology == null) + { + throw new InvalidOperationException( + "A local queue topology must be configured for global partitioning"); + } + + if (_externalTopology.Slots.Count != _localTopology.Slots.Count) + { + throw new InvalidOperationException( + $"The external topology has {_externalTopology.Slots.Count} slots but the local topology has {_localTopology.Slots.Count} slots. These must match for global partitioning."); + } } internal bool Matches(Type messageType)