diff --git a/docs/guide/messaging/transports/rabbitmq/object-management.md b/docs/guide/messaging/transports/rabbitmq/object-management.md index 768dc4c58..a24410c15 100644 --- a/docs/guide/messaging/transports/rabbitmq/object-management.md +++ b/docs/guide/messaging/transports/rabbitmq/object-management.md @@ -143,6 +143,36 @@ return await app.RunJasperFxCommands(args); Note that this stateful resource model is also available at the command line as well for deploy time management. +## Exchange-to-Exchange Bindings + +Wolverine supports [RabbitMQ exchange-to-exchange bindings](https://www.rabbitmq.com/docs/e2e), which allow you +to route messages between exchanges before they reach a queue. This is useful for building message routing +topologies where a source exchange fans out to multiple destination exchanges, each with their own queue bindings. + +You can declare exchange-to-exchange bindings using the fluent `BindExchange().ToExchange()` syntax: + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq() + .AutoProvision() + + // Bind source exchange to destination exchange with a routing key + .BindExchange("source-exchange").ToExchange("destination-exchange", "routing.key") + + // The destination exchange still needs a queue binding for consumers + .BindExchange("destination-exchange").ToQueue("my-queue", "routing.key"); + + opts.PublishAllMessages().ToRabbitExchange("source-exchange"); + opts.ListenToRabbitQueue("my-queue"); + }).StartAsync(); +``` + +When `AutoProvision()` is enabled, Wolverine will automatically declare the exchanges and create the +exchange-to-exchange bindings at startup. Both the source and destination exchanges are created +if they don't already exist. + ## Runtime Declaration From a user request, there are some extension methods in the WolverineFx.RabbitMQ Nuget off of `IWolverineRuntime` that will enable you to diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqExchangeBindingTests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqExchangeBindingTests.cs new file mode 100644 index 000000000..091ed7f26 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/RabbitMqExchangeBindingTests.cs @@ -0,0 +1,182 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using RabbitMQ.Client; +using Shouldly; +using Wolverine.RabbitMQ.Internal; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +public class RabbitMqExchangeBindingTests +{ + [Fact] + public async Task declare_calls_exchange_bind() + { + var channel = Substitute.For(); + var binding = new RabbitMqExchangeBinding("source", "destination", "routing.key"); + + await binding.DeclareAsync(channel, NullLogger.Instance); + + await channel.Received().ExchangeBindAsync("destination", "source", "routing.key", binding.Arguments); + binding.HasDeclared.ShouldBeTrue(); + } + + [Fact] + public async Task teardown_calls_exchange_unbind() + { + var channel = Substitute.For(); + var binding = new RabbitMqExchangeBinding("source", "destination", "routing.key"); + + await binding.TeardownAsync(channel); + + await channel.Received().ExchangeUnbindAsync("destination", "source", "routing.key", binding.Arguments); + } + + public class when_adding_exchange_to_exchange_bindings + { + private readonly RabbitMqTransport theTransport = new(); + + public when_adding_exchange_to_exchange_bindings() + { + new RabbitMqTransportExpression(theTransport, new WolverineOptions()) + .BindExchange("source-exchange").ToExchange("destination-exchange", "routing.key"); + } + + [Fact] + public void should_add_the_exchange_binding() + { + var destExchange = theTransport.Exchanges["destination-exchange"]; + var binding = destExchange.ExchangeBindings().Single(); + binding.SourceExchangeName.ShouldBe("source-exchange"); + binding.DestinationExchangeName.ShouldBe("destination-exchange"); + binding.BindingKey.ShouldBe("routing.key"); + } + + [Fact] + public void should_declare_the_source_exchange() + { + theTransport.Exchanges.Contains("source-exchange").ShouldBeTrue(); + } + + [Fact] + public void should_declare_the_destination_exchange() + { + theTransport.Exchanges.Contains("destination-exchange").ShouldBeTrue(); + } + + [Fact] + public void add_binding_without_routing_key() + { + new RabbitMqTransportExpression(theTransport, new WolverineOptions()) + .BindExchange("source2").ToExchange("destination2"); + + var destExchange = theTransport.Exchanges["destination2"]; + destExchange.ExchangeBindings() + .ShouldContain(x => x.BindingKey == "source2_destination2"); + } + } + + + public class exchange_binding_via_declare_exchange + { + private readonly RabbitMqTransport theTransport = new(); + + [Fact] + public void bind_exchange_via_declare_exchange_configuration() + { + new RabbitMqTransportExpression(theTransport, new WolverineOptions()) + .DeclareExchange("destination", exchange => + { + exchange.ExchangeType = ExchangeType.Topic; + exchange.BindExchange("source", "routing.*"); + }); + + var destExchange = theTransport.Exchanges["destination"]; + destExchange.ExchangeType.ShouldBe(ExchangeType.Topic); + + var binding = destExchange.ExchangeBindings().Single(); + binding.SourceExchangeName.ShouldBe("source"); + binding.BindingKey.ShouldBe("routing.*"); + } + + [Fact] + public void bind_exchange_deduplicates() + { + var exchange = theTransport.Exchanges["dest"]; + exchange.BindExchange("source", "key"); + exchange.BindExchange("source", "key"); + + exchange.ExchangeBindings().Count().ShouldBe(1); + } + + [Fact] + public void bind_exchange_different_keys_are_separate() + { + var exchange = theTransport.Exchanges["dest"]; + exchange.BindExchange("source", "key1"); + exchange.BindExchange("source", "key2"); + + exchange.ExchangeBindings().Count().ShouldBe(2); + } + + [Fact] + public void bind_exchange_with_arguments() + { + var exchange = theTransport.Exchanges["dest"]; + var args = new Dictionary { { "x-match", "any" } }; + var binding = exchange.BindExchange("source", "key", args); + + binding.Arguments.ShouldContainKeyAndValue("x-match", "any"); + } + + [Fact] + public void bind_exchange_ensures_source_exchange_exists() + { + var exchange = theTransport.Exchanges["dest"]; + exchange.BindExchange("auto-created-source", "key"); + + theTransport.Exchanges.Contains("auto-created-source").ShouldBeTrue(); + } + + [Fact] + public void has_exchange_bindings_is_false_by_default() + { + var exchange = theTransport.Exchanges["empty"]; + exchange.HasExchangeBindings.ShouldBeFalse(); + } + + [Fact] + public void has_exchange_bindings_is_true_after_adding() + { + var exchange = theTransport.Exchanges["dest"]; + exchange.BindExchange("source", "key"); + exchange.HasExchangeBindings.ShouldBeTrue(); + } + + [Fact] + public void bind_exchange_throws_on_null_source() + { + var exchange = theTransport.Exchanges["dest"]; + Should.Throw(() => exchange.BindExchange(null!)); + } + } + + public class exchange_declare_with_exchange_bindings + { + [Fact] + public async Task declare_async_also_declares_exchange_bindings() + { + var channel = Substitute.For(); + var transport = new RabbitMqTransport(); + var exchange = transport.Exchanges["dest"]; + exchange.ExchangeType = ExchangeType.Topic; + exchange.BindExchange("source", "routing.key"); + + await exchange.DeclareAsync(channel, NullLogger.Instance); + + await channel.Received().ExchangeDeclareAsync("dest", "topic", true, false, exchange.Arguments); + await channel.Received().ExchangeBindAsync("dest", "source", "routing.key", + Arg.Any>()); + } + } +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs index 5eca04a66..526db5eee 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs @@ -758,6 +758,80 @@ public async Task use_direct_exchange() } + [Fact] + public async Task use_exchange_to_exchange_binding() + { + var sourceExchange = RabbitTesting.NextExchangeName(); + var destinationExchange = RabbitTesting.NextExchangeName(); + var queueName = RabbitTesting.NextQueueName(); + + using var publisher = await WolverineHost.ForAsync(opts => + { + opts.UseRabbitMq().AutoProvision() + // Bind source exchange to destination exchange + .BindExchange(sourceExchange).ToExchange(destinationExchange, "e2e.key") + // Bind destination exchange to a queue so we can consume + .BindExchange(destinationExchange).ToQueue(queueName, "e2e.key"); + + opts.PublishAllMessages().ToRabbitExchange(sourceExchange); + }); + + using var receiver = await WolverineHost.ForAsync(opts => + { + opts.UseRabbitMq(); + + opts.ListenToRabbitQueue(queueName); + opts.Services.AddSingleton(); + }); + + var session = await publisher + .TrackActivity() + .Timeout(30.Seconds()) + .AlsoTrack(receiver) + .WaitForMessageToBeReceivedAt(receiver) + .SendMessageAndWaitAsync(new ColorChosen { Name = "Blue" }); + + receiver.Get().Name.ShouldBe("Blue"); + } + + [Fact] + public async Task use_exchange_to_exchange_binding_via_declare_exchange() + { + var sourceExchange = RabbitTesting.NextExchangeName(); + var destinationExchange = RabbitTesting.NextExchangeName(); + var queueName = RabbitTesting.NextQueueName(); + + using var publisher = await WolverineHost.ForAsync(opts => + { + opts.UseRabbitMq().AutoProvision() + .DeclareExchange(destinationExchange, exchange => + { + exchange.ExchangeType = ExchangeType.Fanout; + exchange.BindExchange(sourceExchange); + exchange.BindQueue(queueName); + }); + + opts.PublishAllMessages().ToRabbitExchange(sourceExchange); + }); + + using var receiver = await WolverineHost.ForAsync(opts => + { + opts.UseRabbitMq(); + + opts.ListenToRabbitQueue(queueName); + opts.Services.AddSingleton(); + }); + + var session = await publisher + .TrackActivity() + .Timeout(30.Seconds()) + .AlsoTrack(receiver) + .WaitForMessageToBeReceivedAt(receiver) + .SendMessageAndWaitAsync(new ColorChosen { Name = "Green" }); + + receiver.Get().Name.ShouldBe("Green"); + } + [Fact] public async Task request_reply_from_within_handler() { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exchange_queue_binding_model_setup_and_teardown_smoke_tests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exchange_queue_binding_model_setup_and_teardown_smoke_tests.cs index 5c59e58c8..dca0df7ae 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exchange_queue_binding_model_setup_and_teardown_smoke_tests.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exchange_queue_binding_model_setup_and_teardown_smoke_tests.cs @@ -39,6 +39,11 @@ public exchange_queue_binding_model_setup_and_teardown_smoke_tests() .BindExchange("fan1") .ToQueue("xqueue2", "key2"); + // Exchange-to-exchange binding + expression + .BindExchange("direct1") + .ToExchange("fan1", "e2e_key"); + var wolverineRuntime = new MockWolverineRuntime(); theTransport.TryBuildStatefulResource(wolverineRuntime, out var resource); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/DynamicObjectCreationExtensions.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/DynamicObjectCreationExtensions.cs index e73765200..b8ac2552a 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/DynamicObjectCreationExtensions.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/DynamicObjectCreationExtensions.cs @@ -44,6 +44,19 @@ public static async Task UnBindRabbitMqQueue(this IWolverineRuntime runtime, str var transport = runtime.Options.Transports.GetOrCreate(); await transport.WithAdminChannelAsync(model => model.QueueUnbindAsync(queueName, exchangeName, routingKey)); } + + /// + /// Will un-bind an exchange-to-exchange binding + /// + /// + /// The destination exchange name + /// The source exchange name + /// Binding key name + public static async Task UnBindRabbitMqExchange(this IWolverineRuntime runtime, string destinationExchangeName, string sourceExchangeName, string routingKey) + { + var transport = runtime.Options.Transports.GetOrCreate(); + await transport.WithAdminChannelAsync(model => model.ExchangeUnbindAsync(destinationExchangeName, sourceExchangeName, routingKey)); + } } public class RabbitMqObjects @@ -101,7 +114,7 @@ await _transport.WithAdminChannelAsync(async model => foreach (var queue in _queues) { await queue.DeclareAsync(model, _logger); - + foreach (var binding in queue.Bindings()) { await binding.DeclareAsync(model, _logger); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/IRabbitMqBindableExchange.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/IRabbitMqBindableExchange.cs index 91b47a35a..4507a30d8 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/IRabbitMqBindableExchange.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/IRabbitMqBindableExchange.cs @@ -17,5 +17,14 @@ public interface IRabbitMqBindableExchange : IRabbitMqExchange /// /// public RabbitMqBinding BindQueue(string queueName, string? bindingKey = null); - + + /// + /// Bind a source exchange to this exchange (this exchange is the destination). + /// Messages published to the source exchange will be routed to this exchange. + /// + /// The exchange that receives published messages + /// Optional routing/binding key + /// + public RabbitMqExchangeBinding BindExchange(string sourceExchangeName, string? bindingKey = null); + } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs index 47045517e..740c2baff 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs @@ -10,6 +10,7 @@ namespace Wolverine.RabbitMQ.Internal; public class RabbitMqExchange : RabbitMqEndpoint, IRabbitMqExchange { private readonly RabbitMqTransport _parent; + private readonly List _exchangeBindings = []; private bool _initialized; @@ -58,6 +59,46 @@ public override bool AutoStartSendingAgent() public IDictionary Arguments { get; } = new Dictionary(); + internal bool HasExchangeBindings => _exchangeBindings.Count > 0; + + /// + /// Bind a source exchange to this exchange (this exchange is the destination). + /// Messages published to the source exchange will be routed to this exchange. + /// + /// The exchange that receives published messages + /// Optional routing/binding key + /// Optional binding arguments + /// + public RabbitMqExchangeBinding BindExchange(string sourceExchangeName, string? bindingKey = null, Dictionary? arguments = null) + { + if (sourceExchangeName == null) + { + throw new ArgumentNullException(nameof(sourceExchangeName)); + } + + var existing = _exchangeBindings.FirstOrDefault(x => x.SourceExchangeName == sourceExchangeName && x.BindingKey == bindingKey); + if (existing != null) return existing; + + // Ensure the source exchange exists so resource setup works correctly + _parent.Exchanges.FillDefault(sourceExchangeName); + + var binding = new RabbitMqExchangeBinding(sourceExchangeName, Name, bindingKey); + if (arguments is not null) + { + foreach (var argument in arguments) + { + binding.Arguments.Add(argument); + } + } + _exchangeBindings.Add(binding); + return binding; + } + + public IEnumerable ExchangeBindings() + { + return _exchangeBindings; + } + // this is meh public string? DirectRoutingKey { get; set; } @@ -106,6 +147,19 @@ internal async Task DeclareAsync(IChannel channel, ILogger logger) DeclaredName, exchangeTypeName, IsDurable, AutoDelete); HasDeclared = true; + + foreach (var binding in _exchangeBindings) + { + // Ensure the source exchange is declared before creating the binding, + // since ExchangeBindAsync requires both exchanges to exist in RabbitMQ + var sourceExchange = _parent.Exchanges[binding.SourceExchangeName]; + if (!sourceExchange.HasDeclared) + { + await sourceExchange.DeclareAsync(channel, logger); + } + + await binding.DeclareAsync(channel, logger); + } } public override async ValueTask CheckAsync() @@ -126,6 +180,11 @@ public override async ValueTask TeardownAsync(ILogger logger) { await _parent.WithAdminChannelAsync(async channel => { + foreach (var binding in _exchangeBindings) + { + await binding.TeardownAsync(channel); + } + if (DeclaredName == string.Empty) { } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs index cbfaab45b..055b165bc 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs @@ -309,6 +309,35 @@ public RabbitMqTransportExpression ToQueue(string queueName, string bindingKey, return _parent; } + + /// + /// Bind the named exchange to a destination exchange. The binding key will be + /// [source exchange name]_[destination exchange name] + /// + /// + /// Optional configuration of the destination Rabbit MQ exchange + public RabbitMqTransportExpression ToExchange(string destinationExchangeName, + Action? configure = null) + { + return ToExchange(destinationExchangeName, $"{_exchangeName}_{destinationExchangeName}", configure); + } + + /// + /// Bind the named exchange to a destination exchange with a user supplied binding key + /// + /// + /// + /// Optional configuration of the destination Rabbit MQ exchange + /// Optional configuration for arguments to the Rabbit MQ binding + public RabbitMqTransportExpression ToExchange(string destinationExchangeName, string bindingKey, + Action? configure = null, Dictionary? arguments = null) + { + _parent.DeclareExchange(destinationExchangeName, configure); + var destinationExchange = _parent.Transport.Exchanges[destinationExchangeName]; + destinationExchange.BindExchange(_exchangeName, bindingKey, arguments); + + return _parent; + } } /// diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqExchangeBinding.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqExchangeBinding.cs new file mode 100644 index 000000000..d2314ab48 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqExchangeBinding.cs @@ -0,0 +1,74 @@ +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; + +namespace Wolverine.RabbitMQ; + +public class RabbitMqExchangeBinding +{ + public RabbitMqExchangeBinding(string sourceExchangeName, string destinationExchangeName, string? bindingKey = null) + { + SourceExchangeName = sourceExchangeName ?? throw new ArgumentNullException(nameof(sourceExchangeName)); + DestinationExchangeName = destinationExchangeName ?? throw new ArgumentNullException(nameof(destinationExchangeName)); + BindingKey = bindingKey ?? $"{sourceExchangeName}_{destinationExchangeName}"; + } + + public string BindingKey { get; } + public string SourceExchangeName { get; } + public string DestinationExchangeName { get; } + + public IDictionary Arguments { get; } = new Dictionary(); + public bool HasDeclared { get; private set; } + + internal async Task DeclareAsync(IChannel channel, ILogger logger) + { + await channel.ExchangeBindAsync(DestinationExchangeName, SourceExchangeName, BindingKey, Arguments); + logger.LogInformation( + "Declared a Rabbit Mq exchange binding '{Key}' from exchange {Source} to exchange {Destination}", + BindingKey, SourceExchangeName, DestinationExchangeName); + + HasDeclared = true; + } + + public async Task TeardownAsync(IChannel channel) + { + await channel.ExchangeUnbindAsync(DestinationExchangeName, SourceExchangeName, BindingKey, Arguments); + } + + protected bool Equals(RabbitMqExchangeBinding other) + { + return BindingKey == other.BindingKey + && SourceExchangeName == other.SourceExchangeName + && DestinationExchangeName == other.DestinationExchangeName; + } + + public override bool Equals(object? obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((RabbitMqExchangeBinding)obj); + } + + public override int GetHashCode() + { + return HashCode.Combine(BindingKey, SourceExchangeName, DestinationExchangeName); + } + + public override string ToString() + { + return + $"{nameof(BindingKey)}: {BindingKey}, {nameof(SourceExchangeName)}: {SourceExchangeName}, {nameof(DestinationExchangeName)}: {DestinationExchangeName}"; + } +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqExchangeConfigurationExpression.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqExchangeConfigurationExpression.cs index 1c6728cd9..e4a7fb960 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqExchangeConfigurationExpression.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqExchangeConfigurationExpression.cs @@ -41,4 +41,9 @@ public RabbitMqBinding BindQueue(string queueName, string? bindingKey = null) var queue = _transport.Queues[queueName]; return queue.BindExchange(Name, bindingKey); } + + public RabbitMqExchangeBinding BindExchange(string sourceExchangeName, string? bindingKey = null) + { + return _exchange.BindExchange(sourceExchangeName, bindingKey); + } } \ No newline at end of file