Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,7 @@ await host.StartAsync();
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L399-L444' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_routing_for_azure_service_bus' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Route to Topics and Subscriptions

::: info
This option was introduced in Wolverine 1.6.0.
:::
## Route to Topics and Subscriptions <Badge type="tip" text="1.6.0" />

You can also opt into conventional routing using topics and subscriptions named after the
message type names like this:
Expand Down Expand Up @@ -89,4 +85,10 @@ opts.UseAzureServiceBusTesting()
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/ConventionalRouting/Broadcasting/end_to_end_with_conventional_routing.cs#L32-L51' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_topic_and_subscription_conventional_routing_with_azure_service_bus' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Separated Handler Behavior <Badge type="tip" text="4.12" />

In the case of using the `MultipleHandlerBehavior.Separated` mode, this convention will create a subscription
for each separate handler using the handler type to derive the subscription name and the message type to derive
the topic name. Both the topic and subscription are declared by the transport if using the `AutoProvision()` setting.


Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ var receiver = WolverineHost.For(opts =>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L536-L574' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_routing_exchange_conventions' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Separated Handler Behavior <Badge type="tip" text="4.12" />

In the case of using the `MultipleHandlerBehavior.Separated` mode, this convention will create an exchange
for the message type, then a separate queue for each handler using the handler type to create the name *and* finally
a binding from that queue to the exchange.

TODO -- add content on filtering message types
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ public AmazonSqsMessageRoutingConvention QueueNameForSender(Func<Type, string> n
{
return IdentifierForSender(namingRule);
}

protected override (AmazonSqsListenerConfiguration, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(string identifier,
AmazonSqsTransport transport, Type messageType, Type handlerType)
{
throw new NotSupportedException(
"The AWS SQS transport does not (yet) support conventional routing to multiple handlers in the same application. You will have to resort to explicit routing.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using JasperFx.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Attributes;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit;
using Xunit.Abstractions;

namespace Wolverine.AzureServiceBus.Tests.Bugs;

public class Bug_1684_separated_handlers_and_conventional_routing(ITestOutputHelper Output)
{
[Fact]
public async Task try_it_and_send_to_multiple_topic_subscriptions()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseAzureServiceBusTesting()
.AutoProvision()
.AutoPurgeOnStartup()
.UseTopicAndSubscriptionConventionalRouting(x =>
{
x.IncludeTypes(type => type == typeof(Msg));
});

opts.Policies.DisableConventionalLocalRouting();

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
})
.ConfigureServices(services =>
{
//services.AddHostedService<BackgroundJob>();
})

.StartAsync();

var message = new Msg(Guid.NewGuid());
var tracked = await host.TrackActivity().IncludeExternalTransports().SendMessageAndWaitAsync(message);

var all = tracked.AllRecordsInOrder().ToArray();
foreach (var record in all)
{
Output.WriteLine(record.ToString());
}

var received = tracked.Received.MessagesOf<Msg>().ToArray();
received.Length.ShouldBe(2);
}
}

public class BackgroundJob(IWolverineRuntime runtime) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var bus = new MessageBus(runtime);
var message = new Msg(Guid.NewGuid());
await bus.PublishAsync(message);
}
}

public record Msg(Guid Id);

[StickyHandler(nameof(ConsumerOne))]
public class ConsumerOne : IWolverineHandler
{
public Task Consume(Msg message)
{
Console.ForegroundColor = ConsoleColor.Blue;
Console.WriteLine($"Consumed by One: {message.Id}");
return Task.CompletedTask;
}
}

[StickyHandler(nameof(ConsumerOne))]
public class ConsumerTwo : IWolverineHandler
{
public void Consume(Msg message)
{
Console.ForegroundColor = ConsoleColor.Magenta;
Console.WriteLine($"Consumed by Two: {message.Id}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageReference Include="xunit" Version="2.9.0"/>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" PrivateAssets="All" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2" PrivateAssets="All"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,11 @@ public AzureServiceBusMessageRoutingConvention QueueNameForSender(Func<Type, str
{
return IdentifierForSender(namingRule);
}

protected override (AzureServiceBusQueueListenerConfiguration, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(
string identifier, AzureServiceBusTransport transport, Type messageType, Type handlerType)
{
throw new NotSupportedException(
"The Azure Service Bus conventional routing by queues can not support conventional routing to multiple handlers in the same application. You will have to resort to explicit routing.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ protected override (AzureServiceBusSubscriptionListenerConfiguration, Endpoint)
return (new AzureServiceBusSubscriptionListenerConfiguration(subscription), subscription);
}

protected override (AzureServiceBusSubscriptionListenerConfiguration, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(
string topicName, AzureServiceBusTransport transport, Type messageType, Type handlerType)
{
var topic = transport.Topics[topicName];

var subscriptionName = _subscriptionNameSource == null ? transport.MaybeCorrectName(handlerType.FullName) : _subscriptionNameSource(handlerType);

var subscription =
transport.Subscriptions.FirstOrDefault(x =>
x.Topic.TopicName == topicName && x.SubscriptionName == subscriptionName);

if (subscription == null)
{
subscription = new AzureServiceBusSubscription(transport, topic, subscriptionName);
transport.Subscriptions.Add(subscription);
}

return (new AzureServiceBusSubscriptionListenerConfiguration(subscription), subscription);
}

protected override (AzureServiceBusTopicSubscriberConfiguration, Endpoint) FindOrCreateSubscriber(string identifier,
AzureServiceBusTransport transport)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ protected override (PubsubTopicSubscriberConfiguration, Endpoint) FindOrCreateSu
return (new PubsubTopicSubscriberConfiguration(topic), topic);
}

protected override (PubsubTopicListenerConfiguration, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(string identifier,
PubsubTransport transport, Type messageType, Type handlerType)
{
throw new NotSupportedException(
"The Google Pubsub transport does not (yet) support conventional routing to multiple handlers in the same application. You will have to resort to explicit routing.");
}

/// <summary>
/// Alternative syntax to specify the name for the queue that each message type will be sent
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Attributes;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit;
using Xunit.Abstractions;

namespace Wolverine.RabbitMQ.Tests.Bugs;

public class Bug_1684_separated_handlers_and_conventional_routing(ITestOutputHelper Output)
{
[Fact]
public async Task try_it_and_send_to_multiple_topic_subscriptions()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq()
.AutoProvision()
.AutoPurgeOnStartup()
.UseConventionalRouting(x =>
{
x.IncludeTypes(type => type == typeof(Msg));
});

opts.Policies.DisableConventionalLocalRouting();

opts.MultipleHandlerBehavior = MultipleHandlerBehavior.Separated;
})
.ConfigureServices(services =>
{
//services.AddHostedService<BackgroundJob>();
})

.StartAsync();

var message = new Msg(Guid.NewGuid());
var tracked = await host.TrackActivity().IncludeExternalTransports().SendMessageAndWaitAsync(message);

var all = tracked.AllRecordsInOrder().ToArray();
foreach (var record in all)
{
Output.WriteLine(record.ToString());
}

var received = tracked.Received.MessagesOf<Msg>().ToArray();
received.Length.ShouldBe(2);
}
}

public class BackgroundJob(IWolverineRuntime runtime) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var bus = new MessageBus(runtime);
var message = new Msg(Guid.NewGuid());
await bus.PublishAsync(message);
}
}

public record Msg(Guid Id);

[StickyHandler(nameof(ConsumerOne))]
public class ConsumerOne : IWolverineHandler
{
public Task Consume(Msg message)
{
Console.ForegroundColor = ConsoleColor.Blue;
Console.WriteLine($"Consumed by One: {message.Id}");
return Task.CompletedTask;
}
}

[StickyHandler(nameof(ConsumerOne))]
public class ConsumerTwo : IWolverineHandler
{
public void Consume(Msg message)
{
Console.ForegroundColor = ConsoleColor.Magenta;
Console.WriteLine($"Consumed by Two: {message.Id}");
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Core.Reflection;
using Wolverine.Configuration;
using Wolverine.RabbitMQ.Internal;
using Wolverine.Transports;
Expand Down Expand Up @@ -50,4 +51,15 @@ public RabbitMqMessageRoutingConvention ExchangeNameForSending(Func<Type, string
{
return IdentifierForSender(nameForExchange);
}

protected override (RabbitMqConventionalListenerConfiguration, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(
string identifier, RabbitMqTransport transport, Type messageType, Type handlerType)
{
var exchange = transport.Exchanges[identifier];
var queueName = transport.MaybeCorrectName(handlerType.FullNameInCode());
var queue = transport.Queues[queueName];

queue.BindExchange(exchange.Name, $"{exchange.Name}-{queue.QueueName}");
return (new RabbitMqConventionalListenerConfiguration(queue, transport, _identifierForSender), queue);
}
}
32 changes: 31 additions & 1 deletion src/Wolverine/Transports/MessageRoutingConvention.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea
foreach (var handlerChain in chain.ByEndpoint)
{
var handlerType = handlerChain.Handlers.First().HandlerType;
var endpoint = maybeCreateListenerForMessageOrHandlerType(transport, handlerType, runtime);
var endpoint = maybeCreateListenerForMessageAndSeparatedHandlerType(transport, messageType, handlerType, runtime);
if (endpoint != null)
{
handlerChain.RegisterEndpoint(endpoint);
Expand All @@ -63,6 +63,33 @@ void IMessageRoutingConvention.DiscoverListeners(IWolverineRuntime runtime, IRea
}
}

private Endpoint? maybeCreateListenerForMessageAndSeparatedHandlerType(TTransport transport, Type messageType, Type handlerType, IWolverineRuntime runtime)
{
// Can be null, so bail out if there's no queue
var topicName = _queueNameForListener(messageType);
if (topicName.IsEmpty())
{
return null;
}

var corrected = transport.MaybeCorrectName(topicName);

var (configuration, endpoint) = FindOrCreateListenerForIdentifierUsingSeparatedHandler(corrected, transport, messageType, handlerType);
//endpoint.EndpointName = queueName;

endpoint.IsListener = true;

var context = new MessageRoutingContext(messageType, runtime);

_configureListener(configuration, context);

configuration!.As<IDelayedEndpointConfiguration>().Apply();

ApplyListenerRoutingDefaults(endpoint.EndpointName, transport, messageType);

return endpoint;
}

private Endpoint? maybeCreateListenerForMessageOrHandlerType(TTransport transport, Type messageOrHandlerType, IWolverineRuntime runtime)
{
// Can be null, so bail out if there's no queue
Expand Down Expand Up @@ -168,6 +195,9 @@ public TSelf ExcludeTypes(Func<Type, bool> filter)

protected abstract (TListener, Endpoint) FindOrCreateListenerForIdentifier(string identifier,
TTransport transport, Type messageType);

protected abstract (TListener, Endpoint) FindOrCreateListenerForIdentifierUsingSeparatedHandler(string identifier,
TTransport transport, Type messageType, Type handlerType);

protected abstract (TSubscriber, Endpoint) FindOrCreateSubscriber(string identifier, TTransport transport);

Expand Down
Loading