Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/guide/messaging/transports/rabbitmq/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,44 @@ using var host = await Host.CreateDefaultBuilder()
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs#L124-L146' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_only_use_sending_connection_with_rabbitmq' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Connecting to a RabbitMQ cluster

If you run RabbitMQ in a high-availability cluster, declare each node via
`AddClusterNode`. Wolverine forwards the list to the RabbitMQ.NET client,
which selects a node and transparently fails over to another if the
chosen node becomes unreachable.

<!-- snippet: sample_rabbit_mq_cluster_nodes -->
<!-- endSnippet -->

`AddClusterNode(host, port)` copies the TLS settings configured on the
`ConnectionFactory` onto the new endpoint, so a homogeneous cluster only
needs `Ssl` configured once. To override per node — for example, with
distinct certificates — pass an
[`AmqpTcpEndpoint`](https://www.rabbitmq.com/client-libraries/dotnet-api-guide#endpoints)
directly:

```csharp
opts.UseRabbitMq(f => { f.UserName = "guest"; f.Password = "guest"; })
.AddClusterNode(new AmqpTcpEndpoint("rabbit-1.local", 5671, new SslOption
{
Enabled = true,
ServerName = "rabbit-1.local",
CertPath = "/etc/wolverine/rabbit-1.pem"
}));
```

Multi-tenant configurations that share a cluster (i.e. tenants separated
by virtual host via `AddTenant(tenantId, virtualHostName)`) inherit the
parent transport's cluster nodes automatically. Tenants configured via
`AddTenant(tenantId, Uri)` or `AddTenant(tenantId, Action<ConnectionFactory>)`
do **not** inherit the cluster — those overloads are intended for tenants
on separate brokers and bring their own connection settings. Put differently:
virtual-host tenants share the same broker and therefore the same cluster
topology; URI- and Action-based tenants are explicitly pointed at a
different broker, so inheriting cluster nodes from the parent would be
wrong.

## Aspire Integration

::: tip
Expand Down
24 changes: 24 additions & 0 deletions src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,30 @@ public static async Task use_sender_connection_only()
#endregion
}

public static async Task configure_rabbit_mq_cluster_nodes()
{
#region sample_rabbit_mq_cluster_nodes
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Configure the shared connection settings (credentials, TLS, etc.)
// first via UseRabbitMq, then declare each cluster node. The
// RabbitMQ.NET client picks one node and handles failover
// between them on connection loss.
opts.UseRabbitMq(f =>
{
f.UserName = "guest";
f.Password = "guest";
f.Ssl.Enabled = true;
f.Ssl.ServerName = "rabbit-cluster";
})
.AddClusterNode("rabbit-1.local")
.AddClusterNode("rabbit-2.local")
.AddClusterNode("rabbit-3.local");
}).StartAsync();
#endregion
}

public static async Task listen_to_queue()
{
#region sample_listening_to_rabbitmq_queue
Expand Down
228 changes: 228 additions & 0 deletions src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/cluster_endpoints.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
using JasperFx;
using JasperFx.Core;
using JasperFx.Descriptors;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using Shouldly;
using Wolverine;
using Wolverine.RabbitMQ.Internal;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.RabbitMQ.Tests;

public class cluster_endpoints
{
[Fact]
public void description_has_no_cluster_nodes_when_list_is_empty()
{
var factory = new ConnectionFactory { HostName = "localhost" };
var description = new RabbitMqConnectionDescription(factory, Array.Empty<AmqpTcpEndpoint>());

var rendered = description.ToDescription();

rendered.Properties.ShouldNotContain(p => p.Name.StartsWith("ClusterNodes"));
}

[Fact]
public void description_renders_cluster_nodes_as_indexed_entries()
{
var factory = new ConnectionFactory { HostName = "primary" };
var nodes = new[]
{
new AmqpTcpEndpoint("rabbit-1", 5672),
new AmqpTcpEndpoint("rabbit-2", 5673)
};

var rendered = new RabbitMqConnectionDescription(factory, nodes).ToDescription();

rendered.Properties.Where(p => p.Name == "ClusterNodes[0]").ShouldHaveSingleItem()
.Value.ShouldBe("rabbit-1:5672");
rendered.Properties.Where(p => p.Name == "ClusterNodes[1]").ShouldHaveSingleItem()
.Value.ShouldBe("rabbit-2:5673");
}

[Fact]
public void add_cluster_node_throws_when_transport_was_not_initialised()
{
var options = new WolverineOptions();
var transport = new RabbitMqTransport();
// Deliberately not calling ConfigureFactory so ConnectionFactory stays null.
var expression = new RabbitMqTransportExpression(transport, options);

Should.Throw<InvalidOperationException>(() => expression.AddClusterNode("rabbit-1"));
}

[Fact]
public void add_cluster_node_appends_in_order()
{
var options = new WolverineOptions();
var expression = options.UseRabbitMq(_ => { });

expression
.AddClusterNode("rabbit-1", 5672)
.AddClusterNode("rabbit-2", 5672)
.AddClusterNode("rabbit-3", 5672);

var endpoints = options.RabbitMqTransport().AmqpTcpEndpoints;
endpoints.Count.ShouldBe(3);
endpoints[0].HostName.ShouldBe("rabbit-1");
endpoints[1].HostName.ShouldBe("rabbit-2");
endpoints[2].HostName.ShouldBe("rabbit-3");
}

[Fact]
public void add_cluster_node_copies_ssl_settings_as_fresh_instance()
{
var options = new WolverineOptions();
var expression = options.UseRabbitMq(f =>
{
f.Ssl.Enabled = true;
f.Ssl.ServerName = "rabbit-cluster";
});

expression.AddClusterNode("rabbit-1", 5671);

var transport = options.RabbitMqTransport();
var endpoint = transport.AmqpTcpEndpoints.ShouldHaveSingleItem();

endpoint.Ssl.Enabled.ShouldBeTrue();
endpoint.Ssl.ServerName.ShouldBe("rabbit-cluster");
// Distinct instance — mutating the factory afterwards must not leak in.
endpoint.Ssl.ShouldNotBeSameAs(transport.ConnectionFactory!.Ssl);
}

[Fact]
public void add_cluster_node_with_endpoint_stores_supplied_object_by_reference()
{
var options = new WolverineOptions();
var expression = options.UseRabbitMq(_ => { });
var supplied = new AmqpTcpEndpoint("custom-host", 1234, new SslOption { Enabled = true, ServerName = "custom-tls" });

expression.AddClusterNode(supplied);

var stored = options.RabbitMqTransport().AmqpTcpEndpoints.ShouldHaveSingleItem();
stored.ShouldBeSameAs(supplied);
}

[Fact]
public void add_cluster_node_with_default_port_resolves_to_amqp_default()
{
var options = new WolverineOptions();
var expression = options.UseRabbitMq(_ => { });

expression.AddClusterNode("rabbit-1");

var endpoint = options.RabbitMqTransport().AmqpTcpEndpoints.ShouldHaveSingleItem();
endpoint.Port.ShouldBe(5672);
}

[Fact]
public void add_cluster_node_endpoint_overload_throws_when_transport_was_not_initialised()
{
var options = new WolverineOptions();
var transport = new RabbitMqTransport();
var expression = new RabbitMqTransportExpression(transport, options);
var endpoint = new AmqpTcpEndpoint("rabbit-1", 5672);

Should.Throw<InvalidOperationException>(() => expression.AddClusterNode(endpoint));
}

[Fact]
public void virtual_host_tenant_inherits_parent_cluster_nodes()
{
var options = new WolverineOptions();
var parent = options
.UseRabbitMq(f => { f.HostName = "primary"; f.UserName = "guest"; })
.AddClusterNode("rabbit-1", 5672)
.AddClusterNode("rabbit-2", 5672);

parent.AddTenant("acme", "vh-acme");

var parentTransport = options.RabbitMqTransport();
var tenant = parentTransport.Tenants["acme"];
tenant.Compile(parentTransport);

tenant.Transport.AmqpTcpEndpoints.Count.ShouldBe(2);
tenant.Transport.AmqpTcpEndpoints[0].HostName.ShouldBe("rabbit-1");
tenant.Transport.AmqpTcpEndpoints[1].HostName.ShouldBe("rabbit-2");
}

// Regression guard for the documented limitation:
// tenants configured via AddTenant(tenantId, Uri) bring their own transport
// and must not inherit cluster endpoints from the parent. If a future change
// accidentally moves the endpoint-copy loop outside the VirtualHostName branch,
// this test will start failing.
[Fact]
public void uri_tenant_does_not_inherit_parent_cluster_nodes()
{
var options = new WolverineOptions();
var parent = options
.UseRabbitMq(f => { f.HostName = "primary"; })
.AddClusterNode("rabbit-1", 5672);

parent.AddTenant("acme", new Uri("amqp://other-host:5672/vh-acme"));

var parentTransport = options.RabbitMqTransport();
var tenant = parentTransport.Tenants["acme"];
tenant.Compile(parentTransport);

tenant.Transport.AmqpTcpEndpoints.ShouldBeEmpty();
}

[Fact]
public void compiling_virtual_host_tenant_twice_does_not_duplicate_cluster_nodes()
{
var options = new WolverineOptions();
var parent = options
.UseRabbitMq(f => { f.HostName = "primary"; })
.AddClusterNode("rabbit-1", 5672)
.AddClusterNode("rabbit-2", 5672);

parent.AddTenant("acme", "vh-acme");

var parentTransport = options.RabbitMqTransport();
var tenant = parentTransport.Tenants["acme"];
tenant.Compile(parentTransport);
tenant.Compile(parentTransport);

tenant.Transport.AmqpTcpEndpoints.Count.ShouldBe(2);
}

[Fact, Trait("Category", "Flaky")]
public async Task can_publish_and_receive_through_cluster_code_path()
{
var queueName = RabbitTesting.NextQueueName();

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq(f => { f.UserName = "guest"; f.Password = "guest"; })
.AutoProvision()
.AutoPurgeOnStartup()
.AddClusterNode("localhost", 5672);

opts.PublishMessage<ClusterPing>().ToRabbitQueue(queueName);
opts.ListenToRabbitQueue(queueName);

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

var session = await host
.TrackActivity()
.IncludeExternalTransports()
.Timeout(30.Seconds())
.PublishMessageAndWaitAsync(new ClusterPing("hello"));

session.Received.SingleMessage<ClusterPing>().Text.ShouldBe("hello");
}
}

public record ClusterPing(string Text);

public class ClusterPingHandler
{
public void Handle(ClusterPing _) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@ namespace Wolverine.RabbitMQ.Internal;
/// <summary>
/// Secret-safe description of a <see cref="ConnectionFactory"/> for use with
/// <see cref="OptionsDescription"/>. Exposes non-secret connection fields
/// (host, port, vhost, username, SSL, heartbeat) and deliberately omits the
/// password and any credential-carrying properties.
/// (host, port, vhost, username, SSL, heartbeat, cluster nodes) and
/// deliberately omits the password and any credential-carrying properties.
/// </summary>
public sealed class RabbitMqConnectionDescription : IDescribeMyself
{
private readonly ConnectionFactory _factory;
private readonly IReadOnlyList<AmqpTcpEndpoint> _clusterNodes;

public RabbitMqConnectionDescription(ConnectionFactory factory)
: this(factory, Array.Empty<AmqpTcpEndpoint>())
{
}

public RabbitMqConnectionDescription(ConnectionFactory factory, IReadOnlyList<AmqpTcpEndpoint> clusterNodes)
{
_factory = factory;
_clusterNodes = clusterNodes ?? Array.Empty<AmqpTcpEndpoint>();
}

public OptionsDescription ToDescription()
Expand Down Expand Up @@ -48,6 +55,12 @@ public OptionsDescription ToDescription()
}
}

for (var i = 0; i < _clusterNodes.Count; i++)
{
var ep = _clusterNodes[i];
description.AddValue($"ClusterNodes[{i}]", $"{ep.HostName}:{ep.Port}");
}

return description;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ public RabbitMqTransport Compile(RabbitMqTransport parent)

f.VirtualHost = VirtualHostName;
});

// Inherit the parent's cluster endpoints so virtual-host tenants
// share the broker topology declared on the parent transport.
// Guard against duplicate appends if Compile() runs more than once
// on the same tenant instance.
if (Transport.AmqpTcpEndpoints.Count == 0)
{
foreach (var ep in parent.AmqpTcpEndpoints)
{
Transport.AmqpTcpEndpoints.Add(ep);
}
}
}

CloneDeadLetterQueue(parent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ private void configureDefaults(ConnectionFactory factory)
/// </summary>
[ChildDescription]
public RabbitMqConnectionDescription? ConnectionDescription =>
ConnectionFactory == null ? null : new RabbitMqConnectionDescription(ConnectionFactory);
ConnectionFactory == null
? null
: new RabbitMqConnectionDescription(ConnectionFactory, AmqpTcpEndpoints.ToList());

internal void ConfigureFactory(Action<ConnectionFactory> configure)
{
Expand Down
Loading
Loading