Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 201 additions & 0 deletions docs/guide/messaging/exclusive-node-processing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# Exclusive Node Processing

Sometimes you need to ensure that only one node in your cluster processes messages from a specific queue or topic, but you still want to take advantage of parallel processing for better throughput. This is different from strict ordering, which processes messages one at a time.

## When to Use Exclusive Node Processing

Use exclusive node processing when you need:

- **Singleton processing**: Background jobs or scheduled tasks that should only run on one node
- **Resource constraints**: Operations that access limited resources that can't be shared across nodes
- **Stateful processing**: When maintaining in-memory state that shouldn't be distributed
- **Ordered event streams**: Processing events in order while still maintaining throughput

## Basic Configuration

### Exclusive Node with Parallelism

Configure a listener to run exclusively on one node while processing multiple messages in parallel:

<!-- snippet: ExclusiveNodeWithParallelism -->
```cs
var builder = Host.CreateDefaultBuilder();
builder.UseWolverine(opts =>
{
opts.ListenToRabbitQueue("important-jobs")
.ExclusiveNodeWithParallelism(maxParallelism: 5);
});
```
<!-- endSnippet -->

This configuration ensures:
- Only one node in the cluster will process this queue
- Up to 5 messages can be processed in parallel on that node
- If the exclusive node fails, another node will take over

### Default Parallelism

If you don't specify the parallelism level, it defaults to 10:

<!-- snippet: ExclusiveNodeDefaultParallelism -->
```cs
opts.ListenToRabbitQueue("background-tasks")
.ExclusiveNodeWithParallelism(); // Defaults to 10 parallel messages
```
<!-- endSnippet -->

## Session-Based Ordering

For scenarios where you need to maintain ordering within specific groups (like Azure Service Bus sessions), use exclusive node with session ordering:

<!-- snippet: ExclusiveNodeWithSessionOrdering -->
```cs
opts.ListenToAzureServiceBusQueue("ordered-events")
.ExclusiveNodeWithSessionOrdering(maxParallelSessions: 5);
```
<!-- endSnippet -->

This ensures:
- Only one node processes the queue
- Multiple sessions can be processed in parallel (up to 5 in this example)
- Messages within each session are processed in order
- Different sessions can be processed concurrently

## Azure Service Bus Specific Configuration

Azure Service Bus has special support for exclusive node processing with sessions:

<!-- snippet: AzureServiceBusExclusiveNode -->
```cs
opts.ListenToAzureServiceBusQueue("user-events")
.ExclusiveNodeWithSessions(maxParallelSessions: 8);
```
<!-- endSnippet -->

This is a convenience method that:
1. Enables session support with the specified parallelism
2. Configures exclusive node processing
3. Ensures proper session handling

For topic subscriptions without sessions:

<!-- snippet: AzureServiceBusSubscriptionExclusive -->
```cs
opts.ListenToAzureServiceBusSubscription("notifications", "email-sender")
.ExclusiveNodeWithParallelism(maxParallelism: 3);
```
<!-- endSnippet -->

## Combining with Other Options

Exclusive node processing can be combined with other listener configurations:

<!-- snippet: ExclusiveNodeCombined -->
```cs
opts.ListenToRabbitQueue("critical-tasks")
.ExclusiveNodeWithParallelism(maxParallelism: 5)
.UseDurableInbox() // Use durable inbox for reliability
.TelemetryEnabled(true) // Enable telemetry
.Named("CriticalTaskProcessor"); // Give it a friendly name
```
<!-- endSnippet -->

## Comparison with Other Modes

| Mode | Nodes | Parallelism | Ordering | Use Case |
|------|-------|-------------|----------|----------|
| **Default (Competing Consumers)** | All nodes | Configurable | No guarantee | High throughput, load balancing |
| **Sequential** | Current node | 1 | Yes (local) | Local ordering, single thread |
| **ListenWithStrictOrdering** | One (exclusive) | 1 | Yes (global) | Global ordering, single thread |
| **ExclusiveNodeWithParallelism** | One (exclusive) | Configurable | No | Singleton with throughput |
| **ExclusiveNodeWithSessionOrdering** | One (exclusive) | Configurable | Yes (per session) | Singleton with session ordering |

## Implementation Notes

### Leader Election

When using exclusive node processing, Wolverine uses its leader election mechanism to ensure only one node claims the exclusive listener. This requires:

1. A persistence layer (SQL Server, PostgreSQL, or RavenDB)
2. Node agent support enabled

<!-- snippet: ExclusiveNodeWithPersistence -->
```cs
opts.PersistMessagesWithSqlServer(connectionString)
.EnableNodeAgentSupport(); // Required for leader election

opts.ListenToRabbitQueue("singleton-queue")
.ExclusiveNodeWithParallelism(5);
```
<!-- endSnippet -->

### Failover Behavior

If the node running an exclusive listener fails:

1. Other nodes detect the failure through the persistence layer
2. A new node is elected to take over the exclusive listener
3. Processing resumes on the new node
4. Any in-flight messages are handled according to your durability settings

### Local Queues

Exclusive node processing is not supported for local queues since they are inherently single-node:

```cs
// This will throw NotSupportedException
opts.LocalQueue("local")
.ExclusiveNodeWithParallelism(5); // ❌ Not supported
```

## Testing Exclusive Node Processing

When testing exclusive node processing:

1. **Unit Tests**: Test the configuration separately from the execution
2. **Integration Tests**: Use `DurabilityMode.Solo` to simplify testing
3. **Load Tests**: Verify that parallelism improves throughput as expected

<!-- snippet: TestingExclusiveNode -->
```cs
// In tests, use Solo mode to avoid leader election complexity
opts.Durability.Mode = DurabilityMode.Solo;

opts.ListenToRabbitQueue("test-queue")
.ExclusiveNodeWithParallelism(5);
```
<!-- endSnippet -->

## Performance Considerations

- **Parallelism Level**: Set based on your message processing time and resource constraints
- **Session Count**: For session-based ordering, balance between parallelism and memory usage
- **Failover Time**: Leader election typically takes a few seconds; plan accordingly
- **Message Distribution**: Ensure your message grouping (sessions) distributes evenly for best performance
- **Resource Implications**: Higher parallelism values increase memory usage and thread pool consumption. Each parallel message processor maintains its own execution context. For CPU-bound operations, setting parallelism higher than available CPU cores may decrease performance. For I/O-bound operations, higher values can improve throughput but monitor memory usage carefully.

## Troubleshooting

### Messages Not Processing

If messages aren't being processed:
1. Check that node agents are enabled
2. Verify the persistence layer is configured
3. Look for leader election errors in logs
4. Ensure only one node is claiming the exclusive listener

### Lower Than Expected Throughput

If throughput is lower than expected:
1. Increase the parallelism level
2. Check for blocking operations in message handlers
3. Verify that sessions (if used) are well-distributed
4. Monitor CPU and memory usage on the exclusive node

### Failover Not Working

If failover isn't working properly:
1. Check network connectivity between nodes
2. Verify all nodes can access the persistence layer
3. Look for timeout or deadlock issues in logs
4. Ensure node agent support is enabled on all nodes
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
using System;
using Shouldly;
using Wolverine.Configuration;
using Wolverine.Transports;
using Wolverine.Transports.Local;
using Wolverine.Transports.Tcp;
using Xunit;

namespace CoreTests.Configuration;

public class ExclusiveNodeWithParallelismTests
{
[Fact]
public void exclusive_node_with_parallelism_sets_correct_options()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

config.ExclusiveNodeWithParallelism(5);
((IDelayedEndpointConfiguration)config).Apply();

endpoint.ListenerScope.ShouldBe(ListenerScope.Exclusive);
endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(5);
endpoint.ExecutionOptions.EnsureOrdered.ShouldBe(false);
endpoint.ExecutionOptions.SingleProducerConstrained.ShouldBe(false);
endpoint.ListenerCount.ShouldBe(1);
endpoint.IsListener.ShouldBe(true);
}

[Fact]
public void exclusive_node_with_parallelism_sets_endpoint_name()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

config.ExclusiveNodeWithParallelism(5, "special-endpoint");
((IDelayedEndpointConfiguration)config).Apply();

endpoint.EndpointName.ShouldBe("special-endpoint");
}

[Fact]
public void exclusive_node_with_parallelism_validates_max_parallelism()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

Should.Throw<ArgumentException>(() => config.ExclusiveNodeWithParallelism(0))
.Message.ShouldContain("Maximum parallelism must be at least 1");

Should.Throw<ArgumentException>(() => config.ExclusiveNodeWithParallelism(-1))
.Message.ShouldContain("Maximum parallelism must be at least 1");
}

[Fact]
public void exclusive_node_with_parallelism_throws_for_local_queue()
{
var endpoint = new LocalQueue("test");
var config = new ListenerConfiguration<IListenerConfiguration, LocalQueue>(endpoint);

Should.Throw<NotSupportedException>(() => config.ExclusiveNodeWithParallelism(5))
.Message.ShouldContain("cannot use the ExclusiveNodeWithParallelism option for local queues");
}

[Fact]
public void exclusive_node_with_session_ordering_sets_correct_options()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

config.ExclusiveNodeWithSessionOrdering(3);
((IDelayedEndpointConfiguration)config).Apply();

endpoint.ListenerScope.ShouldBe(ListenerScope.Exclusive);
endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(3);
endpoint.ExecutionOptions.EnsureOrdered.ShouldBe(true);
endpoint.ListenerCount.ShouldBe(3);
endpoint.IsListener.ShouldBe(true);
}

[Fact]
public void exclusive_node_with_session_ordering_sets_endpoint_name()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

config.ExclusiveNodeWithSessionOrdering(3, "session-endpoint");
((IDelayedEndpointConfiguration)config).Apply();

endpoint.EndpointName.ShouldBe("session-endpoint");
}

[Fact]
public void exclusive_node_with_session_ordering_validates_max_sessions()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

Should.Throw<ArgumentException>(() => config.ExclusiveNodeWithSessionOrdering(0))
.Message.ShouldContain("Maximum parallel sessions must be at least 1");

Should.Throw<ArgumentException>(() => config.ExclusiveNodeWithSessionOrdering(-1))
.Message.ShouldContain("Maximum parallel sessions must be at least 1");
}

[Fact]
public void exclusive_node_with_session_ordering_throws_for_local_queue()
{
var endpoint = new LocalQueue("test");
var config = new ListenerConfiguration<IListenerConfiguration, LocalQueue>(endpoint);

Should.Throw<NotSupportedException>(() => config.ExclusiveNodeWithSessionOrdering(3))
.Message.ShouldContain("cannot use the ExclusiveNodeWithSessionOrdering option for local queues");
}

[Fact]
public void can_chain_exclusive_node_with_other_configurations()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

config
.ExclusiveNodeWithParallelism(5)
.UseDurableInbox()
.TelemetryEnabled(false);
((IDelayedEndpointConfiguration)config).Apply();

endpoint.ListenerScope.ShouldBe(ListenerScope.Exclusive);
endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(5);
endpoint.Mode.ShouldBe(EndpointMode.Durable);
endpoint.TelemetryEnabled.ShouldBe(false);
}

[Fact]
public void default_parallelism_is_10()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

config.ExclusiveNodeWithParallelism();
((IDelayedEndpointConfiguration)config).Apply();

endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(10);
}

[Fact]
public void default_parallel_sessions_is_10()
{
var endpoint = new TcpEndpoint("localhost", 5000);
var config = new ListenerConfiguration(endpoint);

config.ExclusiveNodeWithSessionOrdering();
((IDelayedEndpointConfiguration)config).Apply();

endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(10);
endpoint.ListenerCount.ShouldBe(10);
}
}
Loading
Loading