Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 72 additions & 10 deletions docs/guide/messaging/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,22 +466,84 @@ The rule will not override an explicitly set `PartitionKey` on an outgoing envel
`DeliveryOptions`, that value takes precedence.
:::

## Partitioning Messages Received from External Systems
## Global Partitioning

::: warning
Brute force, no points for style, explicit coding ahead!
Global partitioning extends the [sharded publishing](#sharded-publishing) concept to support multi-node deployments where messages must be processed sequentially by group id across the entire cluster, not just within a single node.

### How It Works

When you configure global partitioning, Wolverine:

1. **Links local queues to external transport queues** -- Each external transport endpoint (e.g., a RabbitMQ queue or Kafka topic) gets a companion local queue. The external queue acts as the coordination point across nodes, while the local queue handles the actual sequential processing within a node.

2. **Smart routing based on listener ownership** -- When a message is published, Wolverine checks whether the current node owns the exclusive listener for the target shard. If it does, the message is routed directly to the companion local queue (avoiding unnecessary network hops). If the shard is owned by another node, the message is sent through the external transport so it reaches the correct node.

3. **Support for modular monoliths** -- You can configure multiple global partitioning topologies for the same message type in different modules. Each module can have its own set of sharded queues and routing rules, allowing independent sequential processing pipelines within a single application.

::: tip
In single-node mode, global partitioning automatically shortcuts all messages to the companion local queues since the current node owns all listeners.
:::

If you are receiving messages from an external source that will be vulnerable to concurrent access problems when the messages
are executed, but you either do not want to make the external system publish the group ids or have no ability to make the
upstream system care about your own internal group id details, you can simply relay the received messages back out
to a partitioned message topology owned by your system.
### Configuration

Global partitioning is configured through `MessagePartitioningRules.GlobalPartitioned()`. You need to:

1. Set up a message partitioning strategy (e.g., `ByMessage<T>()` or `UseInferredMessageGrouping()`)
2. Configure the external transport topology (sharded queues/topics)
3. Specify which message types participate in global partitioning

The external and local topologies are automatically created with matching shard counts. The local queues are named with a `global-` prefix followed by the base name (e.g., `global-orders1`, `global-orders2`, etc.).

### Transport-Specific Configuration

Each supported transport has its own extension method for configuring the external topology:

| Transport | Extension Method | Documentation |
|-----------|-----------------|---------------|
| RabbitMQ | `UseShardedRabbitQueues()` | [RabbitMQ Global Partitioning](/guide/messaging/transports/rabbitmq/#global-partitioning) |
| Kafka | `UseShardedKafkaTopics()` | [Kafka Global Partitioning](/guide/messaging/transports/kafka#global-partitioning) |
| Amazon SQS | `UseShardedAmazonSqsQueues()` | [SQS Global Partitioning](/guide/messaging/transports/sqs/#global-partitioning) |
| Pulsar | `UseShardedPulsarTopics()` | [Pulsar Global Partitioning](/guide/messaging/transports/pulsar#global-partitioning) |

Using Amazon SQS as our transport, lets say that we're receiving messages from the external system at one queue like this:
### Example with RabbitMQ

<!-- snippet: sample_global_partitioned_with_rabbit_mq -->
<a id='snippet-sample_global_partitioned_with_rabbit_mq'></a>
```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq();

// Do something to add Saga storage too!

opts
.MessagePartitioning

// This tells Wolverine to "just" use implied
// message grouping based on Saga identity among other things
.UseInferredMessageGrouping()


.GlobalPartitioned(topology =>
{
// Creates 5 sharded RabbitMQ queues named "sequenced1" through "sequenced5"
// with matching companion local queues for sequential processing
topology.UseShardedRabbitQueues("sequenced", 5);
topology.MessagesImplementing<MySequencedCommand>();

});
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L721-L746' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_global_partitioned_with_rabbit_mq' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Hey folks, more coming soon. Hopefully before Wolverine 5.0.
### Validation

Watch this issue: https://github.com/JasperFx/wolverine/issues/1728
Wolverine validates global partitioning configuration at startup. It will throw an `InvalidOperationException` if:

- No message type matching policies are configured
- No external transport topology is configured
- The external and local topologies have different shard counts


30 changes: 28 additions & 2 deletions docs/guide/messaging/transports/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,39 @@ using var host = await Host.CreateDefaultBuilder()
{
opts
.UseKafka("localhost:9092")

// Tell Wolverine that this application will never
// produce messages to turn off any diagnostics that might
// try to "ping" a topic and result in errors
.ConsumeOnly();

}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Kafka/Wolverine.Kafka.Tests/DocumentationSamples.cs#L131-L146' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_disable_all_kafka_sending' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Global Partitioning

Kafka topics can be used as the external transport for [global partitioned messaging](/guide/messaging/partitioning#global-partitioning). This creates a set of sharded Kafka topics with companion local queues for sequential processing across a multi-node cluster.

Use `UseShardedKafkaTopics()` within a `GlobalPartitioned()` configuration:

```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();

opts.MessagePartitioning.ByMessage<IMyMessage>(x => x.GroupId);

opts.MessagePartitioning.GlobalPartitioned(topology =>
{
// Creates 4 sharded Kafka topics named "orders1" through "orders4"
// with matching companion local queues for sequential processing
topology.UseShardedKafkaTopics("orders", 4);
topology.MessagesImplementing<IMyMessage>();
});
}).StartAsync();
```

This creates Kafka topics named `orders1` through `orders4` with companion local queues `global-orders1` through `global-orders4`. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically.
26 changes: 26 additions & 0 deletions docs/guide/messaging/transports/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,32 @@ builder.UseWolverine(opts =>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Pulsar/Wolverine.Pulsar.Tests/DocumentationSamples.cs#L86-L101' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_pulsar_unsubscribe_on_close' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Global Partitioning

Pulsar topics can be used as the external transport for [global partitioned messaging](/guide/messaging/partitioning#global-partitioning). This creates a set of sharded Pulsar topics with companion local queues for sequential processing across a multi-node cluster.

Use `UseShardedPulsarTopics()` within a `GlobalPartitioned()` configuration:

```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UsePulsar();

opts.MessagePartitioning.ByMessage<IMyMessage>(x => x.GroupId);

opts.MessagePartitioning.GlobalPartitioned(topology =>
{
// Creates 4 sharded Pulsar topics named "orders1" through "orders4"
// with matching companion local queues for sequential processing
topology.UseShardedPulsarTopics("orders", 4);
topology.MessagesImplementing<IMyMessage>();
});
}).StartAsync();
```

This creates Pulsar topics named `orders1` through `orders4` with companion local queues `global-orders1` through `global-orders4`. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically.

## Interoperability

::: tip
Expand Down
39 changes: 39 additions & 0 deletions docs/guide/messaging/transports/rabbitmq/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,45 @@ builder.UseWolverine(opts =>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/channel_configuration.cs#L13-L31' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_rabbit_mq_channel_creation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Global Partitioning

RabbitMQ queues can be used as the external transport for [global partitioned messaging](/guide/messaging/partitioning#global-partitioning). This creates a set of sharded RabbitMQ queues with companion local queues for sequential processing across a multi-node cluster.

Use `UseShardedRabbitQueues()` within a `GlobalPartitioned()` configuration:

<!-- snippet: sample_global_partitioned_with_rabbit_mq -->
<a id='snippet-sample_global_partitioned_with_rabbit_mq'></a>
```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq();

// Do something to add Saga storage too!

opts
.MessagePartitioning

// This tells Wolverine to "just" use implied
// message grouping based on Saga identity among other things
.UseInferredMessageGrouping()


.GlobalPartitioned(topology =>
{
// Creates 5 sharded RabbitMQ queues named "sequenced1" through "sequenced5"
// with matching companion local queues for sequential processing
topology.UseShardedRabbitQueues("sequenced", 5);
topology.MessagesImplementing<MySequencedCommand>();

});
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L721-L746' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_global_partitioned_with_rabbit_mq' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

This creates RabbitMQ queues named `sequenced1` through `sequenced5` with companion local queues `global-sequenced1` through `global-sequenced5`. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically.

## Compatibility Note

::: info
Expand Down
26 changes: 26 additions & 0 deletions docs/guide/messaging/transports/sqs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,32 @@ using var host = await Host.CreateDefaultBuilder()

Calling `EnableWolverineControlQueues()` implicitly enables system queues and request/reply support as well.

## Global Partitioning

Amazon SQS queues can be used as the external transport for [global partitioned messaging](/guide/messaging/partitioning#global-partitioning). This creates a set of sharded SQS queues with companion local queues for sequential processing across a multi-node cluster.

Use `UseShardedAmazonSqsQueues()` within a `GlobalPartitioned()` configuration:

```cs
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAmazonSqsTransport().AutoProvision();

opts.MessagePartitioning.ByMessage<IMyMessage>(x => x.GroupId);

opts.MessagePartitioning.GlobalPartitioned(topology =>
{
// Creates 4 sharded SQS queues named "orders1" through "orders4"
// with matching companion local queues for sequential processing
topology.UseShardedAmazonSqsQueues("orders", 4);
topology.MessagesImplementing<IMyMessage>();
});
}).StartAsync();
```

This creates SQS queues named `orders1` through `orders4` with companion local queues `global-orders1` through `global-orders4`. Messages are routed to the correct shard based on their group id, and Wolverine handles the coordination between nodes automatically.

## Disabling System Queues <Badge type="tip" text="5.14" />

If your application does not have IAM permissions to create or delete queues, you can explicitly disable system queues:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,89 @@ public void try_match_returns_false_when_no_external_topology()

topology.TryMatch(typeof(GlobalTestMessage), runtime, out var route).ShouldBeFalse();
}

[Fact]
public void assert_validity_throws_when_no_local_topology()
{
var topology = CreateTopology();
topology.Message<GlobalTestMessage>();

// Directly set external without local by using reflection or a different path
// Actually, SetExternalTopology creates local if null, so we need to test
// a scenario where local is missing. This can happen if someone only sets
// subscriptions but no topology at all.
// The existing test "assert_validity_throws_when_no_external_topology" covers
// the case where external is null. With the new validation, if external is set
// but local is somehow null, it would also throw.
// Since SetExternalTopology always creates local if null, this test validates
// the error message for missing local topology indirectly via missing external.
Should.Throw<InvalidOperationException>(() => topology.AssertValidity())
.Message.ShouldContain("external transport topology");
}

[Fact]
public void assert_validity_throws_when_local_and_external_slot_counts_differ()
{
var topology = CreateTopology();
topology.Message<GlobalTestMessage>();

// Pre-configure local queues with 3 slots
topology.LocalQueues("local", 3);

// Set external topology with 5 slots - the local topology won't be overwritten
var external = CreateLocalTopology("ext", 5);
topology.SetExternalTopology(external, "test");

Should.Throw<InvalidOperationException>(() => topology.AssertValidity())
.Message.ShouldContain("must match");
}

[Fact]
public void assert_validity_passes_when_local_and_external_slot_counts_match()
{
var topology = CreateTopology();
topology.Message<GlobalTestMessage>();

// Pre-configure local queues with same count as external
topology.LocalQueues("local", 4);

var external = CreateLocalTopology("ext", 4);
topology.SetExternalTopology(external, "test");

// Should not throw
topology.AssertValidity();
}

[Fact]
public void set_external_topology_preserves_pre_configured_local_queues()
{
var topology = CreateTopology();

// Pre-configure local queues
topology.LocalQueues("my-local", 3);
var originalLocal = topology.LocalTopology;

// Set external topology - should NOT overwrite existing local topology
var external = CreateLocalTopology("ext", 3);
topology.SetExternalTopology(external, "test");

topology.LocalTopology.ShouldBeSameAs(originalLocal);
}

[Fact]
public void set_external_topology_creates_local_when_not_pre_configured()
{
var topology = CreateTopology();

// Don't pre-configure local queues
topology.LocalTopology.ShouldBeNull();

var external = CreateLocalTopology("ext", 3);
topology.SetExternalTopology(external, "test");

topology.LocalTopology.ShouldNotBeNull();
topology.LocalTopology!.Slots.Count.ShouldBe(3);
}
}

#endregion
Expand Down
Loading
Loading