Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/guide/messaging/transports/rabbitmq/object-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,36 @@ return await app.RunJasperFxCommands(args);
Note that this stateful resource model is also available at the command line as well for deploy time
management.

## Exchange-to-Exchange Bindings

Wolverine supports [RabbitMQ exchange-to-exchange bindings](https://www.rabbitmq.com/docs/e2e), which allow you
to route messages between exchanges before they reach a queue. This is useful for building message routing
topologies where a source exchange fans out to multiple destination exchanges, each with their own queue bindings.

You can declare exchange-to-exchange bindings using the fluent `BindExchange().ToExchange()` syntax:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq()
.AutoProvision()

// Bind source exchange to destination exchange with a routing key
.BindExchange("source-exchange").ToExchange("destination-exchange", "routing.key")

// The destination exchange still needs a queue binding for consumers
.BindExchange("destination-exchange").ToQueue("my-queue", "routing.key");

opts.PublishAllMessages().ToRabbitExchange("source-exchange");
opts.ListenToRabbitQueue("my-queue");
}).StartAsync();
```

When `AutoProvision()` is enabled, Wolverine will automatically declare the exchanges and create the
exchange-to-exchange bindings at startup. Both the source and destination exchanges are created
if they don't already exist.

## Runtime Declaration

From a user request, there are some extension methods in the WolverineFx.RabbitMQ Nuget off of `IWolverineRuntime` that will enable you to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using RabbitMQ.Client;
using Shouldly;
using Wolverine.RabbitMQ.Internal;
using Xunit;

namespace Wolverine.RabbitMQ.Tests;

public class RabbitMqExchangeBindingTests
{
[Fact]
public async Task declare_calls_exchange_bind()
{
var channel = Substitute.For<IChannel>();
var binding = new RabbitMqExchangeBinding("source", "destination", "routing.key");

await binding.DeclareAsync(channel, NullLogger.Instance);

await channel.Received().ExchangeBindAsync("destination", "source", "routing.key", binding.Arguments);
binding.HasDeclared.ShouldBeTrue();
}

[Fact]
public async Task teardown_calls_exchange_unbind()
{
var channel = Substitute.For<IChannel>();
var binding = new RabbitMqExchangeBinding("source", "destination", "routing.key");

await binding.TeardownAsync(channel);

await channel.Received().ExchangeUnbindAsync("destination", "source", "routing.key", binding.Arguments);
}

public class when_adding_exchange_to_exchange_bindings
{
private readonly RabbitMqTransport theTransport = new();

public when_adding_exchange_to_exchange_bindings()
{
new RabbitMqTransportExpression(theTransport, new WolverineOptions())
.BindExchange("source-exchange").ToExchange("destination-exchange", "routing.key");
}

[Fact]
public void should_add_the_exchange_binding()
{
var destExchange = theTransport.Exchanges["destination-exchange"];
var binding = destExchange.ExchangeBindings().Single();
binding.SourceExchangeName.ShouldBe("source-exchange");
binding.DestinationExchangeName.ShouldBe("destination-exchange");
binding.BindingKey.ShouldBe("routing.key");
}

[Fact]
public void should_declare_the_source_exchange()
{
theTransport.Exchanges.Contains("source-exchange").ShouldBeTrue();
}

[Fact]
public void should_declare_the_destination_exchange()
{
theTransport.Exchanges.Contains("destination-exchange").ShouldBeTrue();
}

[Fact]
public void add_binding_without_routing_key()
{
new RabbitMqTransportExpression(theTransport, new WolverineOptions())
.BindExchange("source2").ToExchange("destination2");

var destExchange = theTransport.Exchanges["destination2"];
destExchange.ExchangeBindings()
.ShouldContain(x => x.BindingKey == "source2_destination2");
}
}


public class exchange_binding_via_declare_exchange
{
private readonly RabbitMqTransport theTransport = new();

[Fact]
public void bind_exchange_via_declare_exchange_configuration()
{
new RabbitMqTransportExpression(theTransport, new WolverineOptions())
.DeclareExchange("destination", exchange =>
{
exchange.ExchangeType = ExchangeType.Topic;
exchange.BindExchange("source", "routing.*");
});

var destExchange = theTransport.Exchanges["destination"];
destExchange.ExchangeType.ShouldBe(ExchangeType.Topic);

var binding = destExchange.ExchangeBindings().Single();
binding.SourceExchangeName.ShouldBe("source");
binding.BindingKey.ShouldBe("routing.*");
}

[Fact]
public void bind_exchange_deduplicates()
{
var exchange = theTransport.Exchanges["dest"];
exchange.BindExchange("source", "key");
exchange.BindExchange("source", "key");

exchange.ExchangeBindings().Count().ShouldBe(1);
}

[Fact]
public void bind_exchange_different_keys_are_separate()
{
var exchange = theTransport.Exchanges["dest"];
exchange.BindExchange("source", "key1");
exchange.BindExchange("source", "key2");

exchange.ExchangeBindings().Count().ShouldBe(2);
}

[Fact]
public void bind_exchange_with_arguments()
{
var exchange = theTransport.Exchanges["dest"];
var args = new Dictionary<string, object> { { "x-match", "any" } };
var binding = exchange.BindExchange("source", "key", args);

binding.Arguments.ShouldContainKeyAndValue("x-match", "any");
}

[Fact]
public void bind_exchange_ensures_source_exchange_exists()
{
var exchange = theTransport.Exchanges["dest"];
exchange.BindExchange("auto-created-source", "key");

theTransport.Exchanges.Contains("auto-created-source").ShouldBeTrue();
}

[Fact]
public void has_exchange_bindings_is_false_by_default()
{
var exchange = theTransport.Exchanges["empty"];
exchange.HasExchangeBindings.ShouldBeFalse();
}

[Fact]
public void has_exchange_bindings_is_true_after_adding()
{
var exchange = theTransport.Exchanges["dest"];
exchange.BindExchange("source", "key");
exchange.HasExchangeBindings.ShouldBeTrue();
}

[Fact]
public void bind_exchange_throws_on_null_source()
{
var exchange = theTransport.Exchanges["dest"];
Should.Throw<ArgumentNullException>(() => exchange.BindExchange(null!));
}
}

public class exchange_declare_with_exchange_bindings
{
[Fact]
public async Task declare_async_also_declares_exchange_bindings()
{
var channel = Substitute.For<IChannel>();
var transport = new RabbitMqTransport();
var exchange = transport.Exchanges["dest"];
exchange.ExchangeType = ExchangeType.Topic;
exchange.BindExchange("source", "routing.key");

await exchange.DeclareAsync(channel, NullLogger.Instance);

await channel.Received().ExchangeDeclareAsync("dest", "topic", true, false, exchange.Arguments);
await channel.Received().ExchangeBindAsync("dest", "source", "routing.key",
Arg.Any<IDictionary<string, object?>>());
}
}
}
74 changes: 74 additions & 0 deletions src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,80 @@ public async Task use_direct_exchange()
}


[Fact]
public async Task use_exchange_to_exchange_binding()
{
var sourceExchange = RabbitTesting.NextExchangeName();
var destinationExchange = RabbitTesting.NextExchangeName();
var queueName = RabbitTesting.NextQueueName();

using var publisher = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq().AutoProvision()
// Bind source exchange to destination exchange
.BindExchange(sourceExchange).ToExchange(destinationExchange, "e2e.key")
// Bind destination exchange to a queue so we can consume
.BindExchange(destinationExchange).ToQueue(queueName, "e2e.key");

opts.PublishAllMessages().ToRabbitExchange(sourceExchange);
});

using var receiver = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq();

opts.ListenToRabbitQueue(queueName);
opts.Services.AddSingleton<ColorHistory>();
});

var session = await publisher
.TrackActivity()
.Timeout(30.Seconds())
.AlsoTrack(receiver)
.WaitForMessageToBeReceivedAt<ColorChosen>(receiver)
.SendMessageAndWaitAsync(new ColorChosen { Name = "Blue" });

receiver.Get<ColorHistory>().Name.ShouldBe("Blue");
}

[Fact]
public async Task use_exchange_to_exchange_binding_via_declare_exchange()
{
var sourceExchange = RabbitTesting.NextExchangeName();
var destinationExchange = RabbitTesting.NextExchangeName();
var queueName = RabbitTesting.NextQueueName();

using var publisher = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq().AutoProvision()
.DeclareExchange(destinationExchange, exchange =>
{
exchange.ExchangeType = ExchangeType.Fanout;
exchange.BindExchange(sourceExchange);
exchange.BindQueue(queueName);
});

opts.PublishAllMessages().ToRabbitExchange(sourceExchange);
});

using var receiver = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq();

opts.ListenToRabbitQueue(queueName);
opts.Services.AddSingleton<ColorHistory>();
});

var session = await publisher
.TrackActivity()
.Timeout(30.Seconds())
.AlsoTrack(receiver)
.WaitForMessageToBeReceivedAt<ColorChosen>(receiver)
.SendMessageAndWaitAsync(new ColorChosen { Name = "Green" });

receiver.Get<ColorHistory>().Name.ShouldBe("Green");
}

[Fact]
public async Task request_reply_from_within_handler()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public exchange_queue_binding_model_setup_and_teardown_smoke_tests()
.BindExchange("fan1")
.ToQueue("xqueue2", "key2");

// Exchange-to-exchange binding
expression
.BindExchange("direct1")
.ToExchange("fan1", "e2e_key");

var wolverineRuntime = new MockWolverineRuntime();
theTransport.TryBuildStatefulResource(wolverineRuntime, out var resource);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ public static async Task UnBindRabbitMqQueue(this IWolverineRuntime runtime, str
var transport = runtime.Options.Transports.GetOrCreate<RabbitMqTransport>();
await transport.WithAdminChannelAsync(model => model.QueueUnbindAsync(queueName, exchangeName, routingKey));
}

/// <summary>
/// Will un-bind an exchange-to-exchange binding
/// </summary>
/// <param name="runtime"></param>
/// <param name="destinationExchangeName">The destination exchange name</param>
/// <param name="sourceExchangeName">The source exchange name</param>
/// <param name="routingKey">Binding key name</param>
public static async Task UnBindRabbitMqExchange(this IWolverineRuntime runtime, string destinationExchangeName, string sourceExchangeName, string routingKey)
{
var transport = runtime.Options.Transports.GetOrCreate<RabbitMqTransport>();
await transport.WithAdminChannelAsync(model => model.ExchangeUnbindAsync(destinationExchangeName, sourceExchangeName, routingKey));
}
}

public class RabbitMqObjects
Expand Down Expand Up @@ -101,7 +114,7 @@ await _transport.WithAdminChannelAsync(async model =>
foreach (var queue in _queues)
{
await queue.DeclareAsync(model, _logger);

foreach (var binding in queue.Bindings())
{
await binding.DeclareAsync(model, _logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,14 @@ public interface IRabbitMqBindableExchange : IRabbitMqExchange
/// <param name="bindingKey"></param>
/// <returns></returns>
public RabbitMqBinding BindQueue(string queueName, string? bindingKey = null);


/// <summary>
/// Bind a source exchange to this exchange (this exchange is the destination).
/// Messages published to the source exchange will be routed to this exchange.
/// </summary>
/// <param name="sourceExchangeName">The exchange that receives published messages</param>
/// <param name="bindingKey">Optional routing/binding key</param>
/// <returns></returns>
public RabbitMqExchangeBinding BindExchange(string sourceExchangeName, string? bindingKey = null);

}
Loading
Loading