From 319fcfd3c211b2b80890a3680d6af60c030a68cd Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Sun, 14 Sep 2025 19:07:06 -0500 Subject: [PATCH] Ability to use separated handlers as part of conventional routing for both Rabbit MQ and the Azure Service Bus model. Closes GH-1684 --- .../azureservicebus/conventional-routing.md | 12 +-- .../rabbitmq/conventional-routing.md | 5 +- .../AmazonSqsMessageRoutingConvention.cs | 7 ++ ...rated_handlers_and_conventional_routing.cs | 85 +++++++++++++++++++ .../Wolverine.AzureServiceBus.Tests.csproj | 2 +- ...AzureServiceBusMessageRoutingConvention.cs | 7 ++ ...ceBusTopicBroadcastingRoutingConvention.cs | 20 +++++ .../PubsubMessageRoutingConvention.cs | 7 ++ ...rated_handlers_and_conventional_routing.cs | 83 ++++++++++++++++++ .../RabbitMqMessageRoutingConvention.cs | 12 +++ .../Transports/MessageRoutingConvention.cs | 32 ++++++- 11 files changed, 264 insertions(+), 8 deletions(-) create mode 100644 src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_1684_separated_handlers_and_conventional_routing.cs create mode 100644 src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1684_separated_handlers_and_conventional_routing.cs diff --git a/docs/guide/messaging/transports/azureservicebus/conventional-routing.md b/docs/guide/messaging/transports/azureservicebus/conventional-routing.md index 628cb1df2..141b79829 100644 --- a/docs/guide/messaging/transports/azureservicebus/conventional-routing.md +++ b/docs/guide/messaging/transports/azureservicebus/conventional-routing.md @@ -57,11 +57,7 @@ await host.StartAsync(); snippet source | anchor -## Route to Topics and Subscriptions - -::: info -This option was introduced in Wolverine 1.6.0. -::: +## Route to Topics and Subscriptions You can also opt into conventional routing using topics and subscriptions named after the message type names like this: @@ -89,4 +85,10 @@ opts.UseAzureServiceBusTesting() snippet source | anchor +## Separated Handler Behavior + +In the case of using the `MultipleHandlerBehavior.Separated` mode, this convention will create a subscription +for each separate handler using the handler type to derive the subscription name and the message type to derive +the topic name. Both the topic and subscription are declared by the transport if using the `AutoProvision()` setting. + diff --git a/docs/guide/messaging/transports/rabbitmq/conventional-routing.md b/docs/guide/messaging/transports/rabbitmq/conventional-routing.md index 773063b89..aa10aa524 100644 --- a/docs/guide/messaging/transports/rabbitmq/conventional-routing.md +++ b/docs/guide/messaging/transports/rabbitmq/conventional-routing.md @@ -125,6 +125,9 @@ var receiver = WolverineHost.For(opts => snippet source | anchor +## Separated Handler Behavior +In the case of using the `MultipleHandlerBehavior.Separated` mode, this convention will create an exchange +for the message type, then a separate queue for each handler using the handler type to create the name *and* finally +a binding from that queue to the exchange. -TODO -- add content on filtering message types diff --git a/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsMessageRoutingConvention.cs b/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsMessageRoutingConvention.cs index ebe97e386..b47e54895 100644 --- a/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsMessageRoutingConvention.cs +++ b/src/Transports/AWS/Wolverine.AmazonSqs/AmazonSqsMessageRoutingConvention.cs @@ -30,4 +30,11 @@ public AmazonSqsMessageRoutingConvention QueueNameForSender(Func n { return IdentifierForSender(namingRule); } + + protected override (AmazonSqsListenerConfiguration, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(string identifier, + AmazonSqsTransport transport, Type messageType, Type handlerType) + { + throw new NotSupportedException( + "The AWS SQS transport does not (yet) support conventional routing to multiple handlers in the same application. You will have to resort to explicit routing."); + } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_1684_separated_handlers_and_conventional_routing.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_1684_separated_handlers_and_conventional_routing.cs new file mode 100644 index 000000000..e95cad994 --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Bugs/Bug_1684_separated_handlers_and_conventional_routing.cs @@ -0,0 +1,85 @@ +using JasperFx.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Attributes; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.AzureServiceBus.Tests.Bugs; + +public class Bug_1684_separated_handlers_and_conventional_routing(ITestOutputHelper Output) +{ + [Fact] + public async Task try_it_and_send_to_multiple_topic_subscriptions() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseAzureServiceBusTesting() + .AutoProvision() + .AutoPurgeOnStartup() + .UseTopicAndSubscriptionConventionalRouting(x => + { + x.IncludeTypes(type => type == typeof(Msg)); + }); + + opts.Policies.DisableConventionalLocalRouting(); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + }) + .ConfigureServices(services => + { + //services.AddHostedService(); + }) + + .StartAsync(); + + var message = new Msg(Guid.NewGuid()); + var tracked = await host.TrackActivity().IncludeExternalTransports().SendMessageAndWaitAsync(message); + + var all = tracked.AllRecordsInOrder().ToArray(); + foreach (var record in all) + { + Output.WriteLine(record.ToString()); + } + + var received = tracked.Received.MessagesOf().ToArray(); + received.Length.ShouldBe(2); + } +} + +public class BackgroundJob(IWolverineRuntime runtime) : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var bus = new MessageBus(runtime); + var message = new Msg(Guid.NewGuid()); + await bus.PublishAsync(message); + } +} + +public record Msg(Guid Id); + +[StickyHandler(nameof(ConsumerOne))] +public class ConsumerOne : IWolverineHandler +{ + public Task Consume(Msg message) + { + Console.ForegroundColor = ConsoleColor.Blue; + Console.WriteLine($"Consumed by One: {message.Id}"); + return Task.CompletedTask; + } +} + +[StickyHandler(nameof(ConsumerOne))] +public class ConsumerTwo : IWolverineHandler +{ + public void Consume(Msg message) + { + Console.ForegroundColor = ConsoleColor.Magenta; + Console.WriteLine($"Consumed by Two: {message.Id}"); + } +} \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Wolverine.AzureServiceBus.Tests.csproj b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Wolverine.AzureServiceBus.Tests.csproj index 2e32937a7..54ff6b55b 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Wolverine.AzureServiceBus.Tests.csproj +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/Wolverine.AzureServiceBus.Tests.csproj @@ -5,7 +5,7 @@ - + diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusMessageRoutingConvention.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusMessageRoutingConvention.cs index d15cb4005..99e5c37a7 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusMessageRoutingConvention.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusMessageRoutingConvention.cs @@ -31,4 +31,11 @@ public AzureServiceBusMessageRoutingConvention QueueNameForSender(Func + x.Topic.TopicName == topicName && x.SubscriptionName == subscriptionName); + + if (subscription == null) + { + subscription = new AzureServiceBusSubscription(transport, topic, subscriptionName); + transport.Subscriptions.Add(subscription); + } + + return (new AzureServiceBusSubscriptionListenerConfiguration(subscription), subscription); + } + protected override (AzureServiceBusTopicSubscriberConfiguration, Endpoint) FindOrCreateSubscriber(string identifier, AzureServiceBusTransport transport) { diff --git a/src/Transports/GCP/Wolverine.Pubsub/PubsubMessageRoutingConvention.cs b/src/Transports/GCP/Wolverine.Pubsub/PubsubMessageRoutingConvention.cs index 9236f174a..f7e161193 100644 --- a/src/Transports/GCP/Wolverine.Pubsub/PubsubMessageRoutingConvention.cs +++ b/src/Transports/GCP/Wolverine.Pubsub/PubsubMessageRoutingConvention.cs @@ -26,6 +26,13 @@ protected override (PubsubTopicSubscriberConfiguration, Endpoint) FindOrCreateSu return (new PubsubTopicSubscriberConfiguration(topic), topic); } + protected override (PubsubTopicListenerConfiguration, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(string identifier, + PubsubTransport transport, Type messageType, Type handlerType) + { + throw new NotSupportedException( + "The Google Pubsub transport does not (yet) support conventional routing to multiple handlers in the same application. You will have to resort to explicit routing."); + } + /// /// Alternative syntax to specify the name for the queue that each message type will be sent /// diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1684_separated_handlers_and_conventional_routing.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1684_separated_handlers_and_conventional_routing.cs new file mode 100644 index 000000000..253872eb3 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1684_separated_handlers_and_conventional_routing.cs @@ -0,0 +1,83 @@ +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Attributes; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.RabbitMQ.Tests.Bugs; + +public class Bug_1684_separated_handlers_and_conventional_routing(ITestOutputHelper Output) +{ + [Fact] + public async Task try_it_and_send_to_multiple_topic_subscriptions() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq() + .AutoProvision() + .AutoPurgeOnStartup() + .UseConventionalRouting(x => + { + x.IncludeTypes(type => type == typeof(Msg)); + }); + + opts.Policies.DisableConventionalLocalRouting(); + + opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated; + }) + .ConfigureServices(services => + { + //services.AddHostedService(); + }) + + .StartAsync(); + + var message = new Msg(Guid.NewGuid()); + var tracked = await host.TrackActivity().IncludeExternalTransports().SendMessageAndWaitAsync(message); + + var all = tracked.AllRecordsInOrder().ToArray(); + foreach (var record in all) + { + Output.WriteLine(record.ToString()); + } + + var received = tracked.Received.MessagesOf().ToArray(); + received.Length.ShouldBe(2); + } +} + +public class BackgroundJob(IWolverineRuntime runtime) : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var bus = new MessageBus(runtime); + var message = new Msg(Guid.NewGuid()); + await bus.PublishAsync(message); + } +} + +public record Msg(Guid Id); + +[StickyHandler(nameof(ConsumerOne))] +public class ConsumerOne : IWolverineHandler +{ + public Task Consume(Msg message) + { + Console.ForegroundColor = ConsoleColor.Blue; + Console.WriteLine($"Consumed by One: {message.Id}"); + return Task.CompletedTask; + } +} + +[StickyHandler(nameof(ConsumerOne))] +public class ConsumerTwo : IWolverineHandler +{ + public void Consume(Msg message) + { + Console.ForegroundColor = ConsoleColor.Magenta; + Console.WriteLine($"Consumed by Two: {message.Id}"); + } +} \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqMessageRoutingConvention.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqMessageRoutingConvention.cs index f3d5d9a1d..e46a85562 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqMessageRoutingConvention.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqMessageRoutingConvention.cs @@ -1,3 +1,4 @@ +using JasperFx.Core.Reflection; using Wolverine.Configuration; using Wolverine.RabbitMQ.Internal; using Wolverine.Transports; @@ -50,4 +51,15 @@ public RabbitMqMessageRoutingConvention ExchangeNameForSending(Func().Apply(); + + ApplyListenerRoutingDefaults(endpoint.EndpointName, transport, messageType); + + return endpoint; + } + private Endpoint? maybeCreateListenerForMessageOrHandlerType(TTransport transport, Type messageOrHandlerType, IWolverineRuntime runtime) { // Can be null, so bail out if there's no queue @@ -168,6 +195,9 @@ public TSelf ExcludeTypes(Func filter) protected abstract (TListener, Endpoint) FindOrCreateListenerForIdentifier(string identifier, TTransport transport, Type messageType); + + protected abstract (TListener, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(string identifier, + TTransport transport, Type messageType, Type handlerType); protected abstract (TSubscriber, Endpoint) FindOrCreateSubscriber(string identifier, TTransport transport);