diff --git a/docs/guide/messaging/exclusive-node-processing.md b/docs/guide/messaging/exclusive-node-processing.md new file mode 100644 index 000000000..751cfd3b6 --- /dev/null +++ b/docs/guide/messaging/exclusive-node-processing.md @@ -0,0 +1,201 @@ +# Exclusive Node Processing + +Sometimes you need to ensure that only one node in your cluster processes messages from a specific queue or topic, but you still want to take advantage of parallel processing for better throughput. This is different from strict ordering, which processes messages one at a time. + +## When to Use Exclusive Node Processing + +Use exclusive node processing when you need: + +- **Singleton processing**: Background jobs or scheduled tasks that should only run on one node +- **Resource constraints**: Operations that access limited resources that can't be shared across nodes +- **Stateful processing**: When maintaining in-memory state that shouldn't be distributed +- **Ordered event streams**: Processing events in order while still maintaining throughput + +## Basic Configuration + +### Exclusive Node with Parallelism + +Configure a listener to run exclusively on one node while processing multiple messages in parallel: + + +```cs +var builder = Host.CreateDefaultBuilder(); +builder.UseWolverine(opts => +{ + opts.ListenToRabbitQueue("important-jobs") + .ExclusiveNodeWithParallelism(maxParallelism: 5); +}); +``` + + +This configuration ensures: +- Only one node in the cluster will process this queue +- Up to 5 messages can be processed in parallel on that node +- If the exclusive node fails, another node will take over + +### Default Parallelism + +If you don't specify the parallelism level, it defaults to 10: + + +```cs +opts.ListenToRabbitQueue("background-tasks") + .ExclusiveNodeWithParallelism(); // Defaults to 10 parallel messages +``` + + +## Session-Based Ordering + +For scenarios where you need to maintain ordering within specific groups (like Azure Service Bus sessions), use exclusive node with session ordering: + + +```cs +opts.ListenToAzureServiceBusQueue("ordered-events") + .ExclusiveNodeWithSessionOrdering(maxParallelSessions: 5); +``` + + +This ensures: +- Only one node processes the queue +- Multiple sessions can be processed in parallel (up to 5 in this example) +- Messages within each session are processed in order +- Different sessions can be processed concurrently + +## Azure Service Bus Specific Configuration + +Azure Service Bus has special support for exclusive node processing with sessions: + + +```cs +opts.ListenToAzureServiceBusQueue("user-events") + .ExclusiveNodeWithSessions(maxParallelSessions: 8); +``` + + +This is a convenience method that: +1. Enables session support with the specified parallelism +2. Configures exclusive node processing +3. Ensures proper session handling + +For topic subscriptions without sessions: + + +```cs +opts.ListenToAzureServiceBusSubscription("notifications", "email-sender") + .ExclusiveNodeWithParallelism(maxParallelism: 3); +``` + + +## Combining with Other Options + +Exclusive node processing can be combined with other listener configurations: + + +```cs +opts.ListenToRabbitQueue("critical-tasks") + .ExclusiveNodeWithParallelism(maxParallelism: 5) + .UseDurableInbox() // Use durable inbox for reliability + .TelemetryEnabled(true) // Enable telemetry + .Named("CriticalTaskProcessor"); // Give it a friendly name +``` + + +## Comparison with Other Modes + +| Mode | Nodes | Parallelism | Ordering | Use Case | +|------|-------|-------------|----------|----------| +| **Default (Competing Consumers)** | All nodes | Configurable | No guarantee | High throughput, load balancing | +| **Sequential** | Current node | 1 | Yes (local) | Local ordering, single thread | +| **ListenWithStrictOrdering** | One (exclusive) | 1 | Yes (global) | Global ordering, single thread | +| **ExclusiveNodeWithParallelism** | One (exclusive) | Configurable | No | Singleton with throughput | +| **ExclusiveNodeWithSessionOrdering** | One (exclusive) | Configurable | Yes (per session) | Singleton with session ordering | + +## Implementation Notes + +### Leader Election + +When using exclusive node processing, Wolverine uses its leader election mechanism to ensure only one node claims the exclusive listener. This requires: + +1. A persistence layer (SQL Server, PostgreSQL, or RavenDB) +2. Node agent support enabled + + +```cs +opts.PersistMessagesWithSqlServer(connectionString) + .EnableNodeAgentSupport(); // Required for leader election + +opts.ListenToRabbitQueue("singleton-queue") + .ExclusiveNodeWithParallelism(5); +``` + + +### Failover Behavior + +If the node running an exclusive listener fails: + +1. Other nodes detect the failure through the persistence layer +2. A new node is elected to take over the exclusive listener +3. Processing resumes on the new node +4. Any in-flight messages are handled according to your durability settings + +### Local Queues + +Exclusive node processing is not supported for local queues since they are inherently single-node: + +```cs +// This will throw NotSupportedException +opts.LocalQueue("local") + .ExclusiveNodeWithParallelism(5); // ❌ Not supported +``` + +## Testing Exclusive Node Processing + +When testing exclusive node processing: + +1. **Unit Tests**: Test the configuration separately from the execution +2. **Integration Tests**: Use `DurabilityMode.Solo` to simplify testing +3. **Load Tests**: Verify that parallelism improves throughput as expected + + +```cs +// In tests, use Solo mode to avoid leader election complexity +opts.Durability.Mode = DurabilityMode.Solo; + +opts.ListenToRabbitQueue("test-queue") + .ExclusiveNodeWithParallelism(5); +``` + + +## Performance Considerations + +- **Parallelism Level**: Set based on your message processing time and resource constraints +- **Session Count**: For session-based ordering, balance between parallelism and memory usage +- **Failover Time**: Leader election typically takes a few seconds; plan accordingly +- **Message Distribution**: Ensure your message grouping (sessions) distributes evenly for best performance +- **Resource Implications**: Higher parallelism values increase memory usage and thread pool consumption. Each parallel message processor maintains its own execution context. For CPU-bound operations, setting parallelism higher than available CPU cores may decrease performance. For I/O-bound operations, higher values can improve throughput but monitor memory usage carefully. + +## Troubleshooting + +### Messages Not Processing + +If messages aren't being processed: +1. Check that node agents are enabled +2. Verify the persistence layer is configured +3. Look for leader election errors in logs +4. Ensure only one node is claiming the exclusive listener + +### Lower Than Expected Throughput + +If throughput is lower than expected: +1. Increase the parallelism level +2. Check for blocking operations in message handlers +3. Verify that sessions (if used) are well-distributed +4. Monitor CPU and memory usage on the exclusive node + +### Failover Not Working + +If failover isn't working properly: +1. Check network connectivity between nodes +2. Verify all nodes can access the persistence layer +3. Look for timeout or deadlock issues in logs +4. Ensure node agent support is enabled on all nodes \ No newline at end of file diff --git a/src/Testing/CoreTests/Configuration/ExclusiveNodeWithParallelismTests.cs b/src/Testing/CoreTests/Configuration/ExclusiveNodeWithParallelismTests.cs new file mode 100644 index 000000000..dece1133d --- /dev/null +++ b/src/Testing/CoreTests/Configuration/ExclusiveNodeWithParallelismTests.cs @@ -0,0 +1,158 @@ +using System; +using Shouldly; +using Wolverine.Configuration; +using Wolverine.Transports; +using Wolverine.Transports.Local; +using Wolverine.Transports.Tcp; +using Xunit; + +namespace CoreTests.Configuration; + +public class ExclusiveNodeWithParallelismTests +{ + [Fact] + public void exclusive_node_with_parallelism_sets_correct_options() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + config.ExclusiveNodeWithParallelism(5); + ((IDelayedEndpointConfiguration)config).Apply(); + + endpoint.ListenerScope.ShouldBe(ListenerScope.Exclusive); + endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(5); + endpoint.ExecutionOptions.EnsureOrdered.ShouldBe(false); + endpoint.ExecutionOptions.SingleProducerConstrained.ShouldBe(false); + endpoint.ListenerCount.ShouldBe(1); + endpoint.IsListener.ShouldBe(true); + } + + [Fact] + public void exclusive_node_with_parallelism_sets_endpoint_name() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + config.ExclusiveNodeWithParallelism(5, "special-endpoint"); + ((IDelayedEndpointConfiguration)config).Apply(); + + endpoint.EndpointName.ShouldBe("special-endpoint"); + } + + [Fact] + public void exclusive_node_with_parallelism_validates_max_parallelism() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + Should.Throw(() => config.ExclusiveNodeWithParallelism(0)) + .Message.ShouldContain("Maximum parallelism must be at least 1"); + + Should.Throw(() => config.ExclusiveNodeWithParallelism(-1)) + .Message.ShouldContain("Maximum parallelism must be at least 1"); + } + + [Fact] + public void exclusive_node_with_parallelism_throws_for_local_queue() + { + var endpoint = new LocalQueue("test"); + var config = new ListenerConfiguration(endpoint); + + Should.Throw(() => config.ExclusiveNodeWithParallelism(5)) + .Message.ShouldContain("cannot use the ExclusiveNodeWithParallelism option for local queues"); + } + + [Fact] + public void exclusive_node_with_session_ordering_sets_correct_options() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + config.ExclusiveNodeWithSessionOrdering(3); + ((IDelayedEndpointConfiguration)config).Apply(); + + endpoint.ListenerScope.ShouldBe(ListenerScope.Exclusive); + endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(3); + endpoint.ExecutionOptions.EnsureOrdered.ShouldBe(true); + endpoint.ListenerCount.ShouldBe(3); + endpoint.IsListener.ShouldBe(true); + } + + [Fact] + public void exclusive_node_with_session_ordering_sets_endpoint_name() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + config.ExclusiveNodeWithSessionOrdering(3, "session-endpoint"); + ((IDelayedEndpointConfiguration)config).Apply(); + + endpoint.EndpointName.ShouldBe("session-endpoint"); + } + + [Fact] + public void exclusive_node_with_session_ordering_validates_max_sessions() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + Should.Throw(() => config.ExclusiveNodeWithSessionOrdering(0)) + .Message.ShouldContain("Maximum parallel sessions must be at least 1"); + + Should.Throw(() => config.ExclusiveNodeWithSessionOrdering(-1)) + .Message.ShouldContain("Maximum parallel sessions must be at least 1"); + } + + [Fact] + public void exclusive_node_with_session_ordering_throws_for_local_queue() + { + var endpoint = new LocalQueue("test"); + var config = new ListenerConfiguration(endpoint); + + Should.Throw(() => config.ExclusiveNodeWithSessionOrdering(3)) + .Message.ShouldContain("cannot use the ExclusiveNodeWithSessionOrdering option for local queues"); + } + + [Fact] + public void can_chain_exclusive_node_with_other_configurations() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + config + .ExclusiveNodeWithParallelism(5) + .UseDurableInbox() + .TelemetryEnabled(false); + ((IDelayedEndpointConfiguration)config).Apply(); + + endpoint.ListenerScope.ShouldBe(ListenerScope.Exclusive); + endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(5); + endpoint.Mode.ShouldBe(EndpointMode.Durable); + endpoint.TelemetryEnabled.ShouldBe(false); + } + + [Fact] + public void default_parallelism_is_10() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + config.ExclusiveNodeWithParallelism(); + ((IDelayedEndpointConfiguration)config).Apply(); + + endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(10); + } + + [Fact] + public void default_parallel_sessions_is_10() + { + var endpoint = new TcpEndpoint("localhost", 5000); + var config = new ListenerConfiguration(endpoint); + + config.ExclusiveNodeWithSessionOrdering(); + ((IDelayedEndpointConfiguration)config).Apply(); + + endpoint.ExecutionOptions.MaxDegreeOfParallelism.ShouldBe(10); + endpoint.ListenerCount.ShouldBe(10); + } +} \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusListenerConfigurationExtensions.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusListenerConfigurationExtensions.cs new file mode 100644 index 000000000..d44fa39e7 --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusListenerConfigurationExtensions.cs @@ -0,0 +1,49 @@ +using Wolverine.Configuration; + +namespace Wolverine.AzureServiceBus; + +/// +/// Extension methods for Azure Service Bus specific exclusive node configurations +/// +public static class AzureServiceBusListenerConfigurationExtensions +{ + /// + /// Configure this Azure Service Bus queue to run exclusively on a single node + /// with session-based parallel processing. This ensures only one node processes + /// the queue while allowing multiple sessions to be processed in parallel. + /// + /// The Azure Service Bus listener configuration + /// Maximum number of sessions to process in parallel. Default is 10. + /// Optional endpoint name for identification + /// The configuration for method chaining + public static AzureServiceBusListenerConfiguration ExclusiveNodeWithSessions( + this AzureServiceBusListenerConfiguration configuration, + int maxParallelSessions = 10, + string? endpointName = null) + { + // First ensure sessions are required with the specified parallelism + configuration.RequireSessions(maxParallelSessions); + + // Then apply exclusive node configuration + configuration.ExclusiveNodeWithSessionOrdering(maxParallelSessions, endpointName); + + return configuration; + } + + /// + /// Configure this Azure Service Bus topic subscription to run exclusively on a single node + /// with parallel message processing. Useful for singleton consumers that still need throughput. + /// + /// The Azure Service Bus subscription configuration + /// Maximum number of messages to process in parallel. Default is 10. + /// Optional endpoint name for identification + /// The configuration for method chaining + public static AzureServiceBusSubscriptionListenerConfiguration ExclusiveNodeWithParallelism( + this AzureServiceBusSubscriptionListenerConfiguration configuration, + int maxParallelism = 10, + string? endpointName = null) + { + configuration.ExclusiveNodeWithParallelism(maxParallelism, endpointName); + return configuration; + } +} \ No newline at end of file diff --git a/src/Wolverine/Configuration/ListenerConfiguration.cs b/src/Wolverine/Configuration/ListenerConfiguration.cs index e963cd151..7232c4620 100644 --- a/src/Wolverine/Configuration/ListenerConfiguration.cs +++ b/src/Wolverine/Configuration/ListenerConfiguration.cs @@ -65,6 +65,99 @@ public TSelf ListenWithStrictOrdering(string? endpointName = null) return this.As(); } + /// + /// Configure this listener to run exclusively on a single node in the cluster, + /// but allow parallel message processing within that node. This is useful for + /// scenarios where you need single-node consistency but want to maintain throughput. + /// + /// Maximum number of messages to process in parallel on the exclusive node. Default is 10. + /// Optional endpoint name for identification + /// The configuration for method chaining + public TSelf ExclusiveNodeWithParallelism(int maxParallelism = 10, string? endpointName = null) + { + if (maxParallelism < 1) + { + throw new ArgumentException("Maximum parallelism must be at least 1", nameof(maxParallelism)); + } + + if (maxParallelism > 100) + { + // Warning for very high parallelism values that might indicate misunderstanding + // This is logged but doesn't prevent configuration + System.Diagnostics.Debug.WriteLine( + $"WARNING: Setting maxParallelism to {maxParallelism} is very high. " + + "This will consume significant memory and thread pool resources. " + + "Consider if this high level of parallelism is necessary for your use case."); + } + + if (_endpoint is LocalQueue) + throw new NotSupportedException( + $"Wolverine cannot use the {nameof(ExclusiveNodeWithParallelism)} option for local queues. Use an external message queue for exclusive node processing across a clustered application."); + + add(e => + { + e.IsListener = true; + e.ListenerScope = ListenerScope.Exclusive; + e.ExecutionOptions.MaxDegreeOfParallelism = maxParallelism; + e.ExecutionOptions.EnsureOrdered = false; // Allow parallel processing + e.ExecutionOptions.SingleProducerConstrained = false; // Allow multiple producers within the node + e.ListenerCount = 1; // Single listener instance for exclusive node + + if (endpointName.IsNotEmpty()) + { + e.EndpointName = endpointName; + } + }); + + return this.As(); + } + + /// + /// Configure this listener to run exclusively on a single node with parallel processing, + /// but maintain ordering within specific groups (sessions). This is particularly useful + /// for Azure Service Bus with sessions or similar scenarios. + /// + /// Maximum number of sessions/groups to process in parallel. Default is 10. + /// Optional endpoint name for identification + /// The configuration for method chaining + public TSelf ExclusiveNodeWithSessionOrdering(int maxParallelSessions = 10, string? endpointName = null) + { + if (maxParallelSessions < 1) + { + throw new ArgumentException("Maximum parallel sessions must be at least 1", nameof(maxParallelSessions)); + } + + if (maxParallelSessions > 100) + { + // Warning for very high session count that might indicate misunderstanding + // This is logged but doesn't prevent configuration + System.Diagnostics.Debug.WriteLine( + $"WARNING: Setting maxParallelSessions to {maxParallelSessions} is very high. " + + "This will consume significant memory and thread pool resources. " + + "Consider if this many concurrent sessions is necessary for your use case."); + } + + if (_endpoint is LocalQueue) + throw new NotSupportedException( + $"Wolverine cannot use the {nameof(ExclusiveNodeWithSessionOrdering)} option for local queues. Use an external message queue for exclusive node processing across a clustered application."); + + add(e => + { + e.IsListener = true; + e.ListenerScope = ListenerScope.Exclusive; + e.ExecutionOptions.MaxDegreeOfParallelism = maxParallelSessions; + e.ExecutionOptions.EnsureOrdered = true; // Maintain ordering within sessions + e.ListenerCount = maxParallelSessions; // Multiple listeners for different sessions + + if (endpointName.IsNotEmpty()) + { + e.EndpointName = endpointName; + } + }); + + return this.As(); + } + public TSelf TelemetryEnabled(bool isEnabled) { add(e => e.TelemetryEnabled = isEnabled);