diff --git a/docs/guide/messaging/transports/rabbitmq/index.md b/docs/guide/messaging/transports/rabbitmq/index.md index 1459c0b4d..2a37dae82 100644 --- a/docs/guide/messaging/transports/rabbitmq/index.md +++ b/docs/guide/messaging/transports/rabbitmq/index.md @@ -112,6 +112,44 @@ using var host = await Host.CreateDefaultBuilder() snippet source | anchor +## Connecting to a RabbitMQ cluster + +If you run RabbitMQ in a high-availability cluster, declare each node via +`AddClusterNode`. Wolverine forwards the list to the RabbitMQ.NET client, +which selects a node and transparently fails over to another if the +chosen node becomes unreachable. + + + + +`AddClusterNode(host, port)` copies the TLS settings configured on the +`ConnectionFactory` onto the new endpoint, so a homogeneous cluster only +needs `Ssl` configured once. To override per node — for example, with +distinct certificates — pass an +[`AmqpTcpEndpoint`](https://www.rabbitmq.com/client-libraries/dotnet-api-guide#endpoints) +directly: + +```csharp +opts.UseRabbitMq(f => { f.UserName = "guest"; f.Password = "guest"; }) + .AddClusterNode(new AmqpTcpEndpoint("rabbit-1.local", 5671, new SslOption + { + Enabled = true, + ServerName = "rabbit-1.local", + CertPath = "/etc/wolverine/rabbit-1.pem" + })); +``` + +Multi-tenant configurations that share a cluster (i.e. tenants separated +by virtual host via `AddTenant(tenantId, virtualHostName)`) inherit the +parent transport's cluster nodes automatically. Tenants configured via +`AddTenant(tenantId, Uri)` or `AddTenant(tenantId, Action)` +do **not** inherit the cluster — those overloads are intended for tenants +on separate brokers and bring their own connection settings. Put differently: +virtual-host tenants share the same broker and therefore the same cluster +topology; URI- and Action-based tenants are explicitly pointed at a +different broker, so inheriting cluster nodes from the parent would be +wrong. + ## Aspire Integration ::: tip diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs index d0bdbce72..9dfd132e3 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Samples.cs @@ -146,6 +146,30 @@ public static async Task use_sender_connection_only() #endregion } + public static async Task configure_rabbit_mq_cluster_nodes() + { + #region sample_rabbit_mq_cluster_nodes + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Configure the shared connection settings (credentials, TLS, etc.) + // first via UseRabbitMq, then declare each cluster node. The + // RabbitMQ.NET client picks one node and handles failover + // between them on connection loss. + opts.UseRabbitMq(f => + { + f.UserName = "guest"; + f.Password = "guest"; + f.Ssl.Enabled = true; + f.Ssl.ServerName = "rabbit-cluster"; + }) + .AddClusterNode("rabbit-1.local") + .AddClusterNode("rabbit-2.local") + .AddClusterNode("rabbit-3.local"); + }).StartAsync(); + #endregion + } + public static async Task listen_to_queue() { #region sample_listening_to_rabbitmq_queue diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/cluster_endpoints.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/cluster_endpoints.cs new file mode 100644 index 000000000..da82ec5e6 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/cluster_endpoints.cs @@ -0,0 +1,228 @@ +using JasperFx; +using JasperFx.Core; +using JasperFx.Descriptors; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using RabbitMQ.Client; +using Shouldly; +using Wolverine; +using Wolverine.RabbitMQ.Internal; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +public class cluster_endpoints +{ + [Fact] + public void description_has_no_cluster_nodes_when_list_is_empty() + { + var factory = new ConnectionFactory { HostName = "localhost" }; + var description = new RabbitMqConnectionDescription(factory, Array.Empty()); + + var rendered = description.ToDescription(); + + rendered.Properties.ShouldNotContain(p => p.Name.StartsWith("ClusterNodes")); + } + + [Fact] + public void description_renders_cluster_nodes_as_indexed_entries() + { + var factory = new ConnectionFactory { HostName = "primary" }; + var nodes = new[] + { + new AmqpTcpEndpoint("rabbit-1", 5672), + new AmqpTcpEndpoint("rabbit-2", 5673) + }; + + var rendered = new RabbitMqConnectionDescription(factory, nodes).ToDescription(); + + rendered.Properties.Where(p => p.Name == "ClusterNodes[0]").ShouldHaveSingleItem() + .Value.ShouldBe("rabbit-1:5672"); + rendered.Properties.Where(p => p.Name == "ClusterNodes[1]").ShouldHaveSingleItem() + .Value.ShouldBe("rabbit-2:5673"); + } + + [Fact] + public void add_cluster_node_throws_when_transport_was_not_initialised() + { + var options = new WolverineOptions(); + var transport = new RabbitMqTransport(); + // Deliberately not calling ConfigureFactory so ConnectionFactory stays null. + var expression = new RabbitMqTransportExpression(transport, options); + + Should.Throw(() => expression.AddClusterNode("rabbit-1")); + } + + [Fact] + public void add_cluster_node_appends_in_order() + { + var options = new WolverineOptions(); + var expression = options.UseRabbitMq(_ => { }); + + expression + .AddClusterNode("rabbit-1", 5672) + .AddClusterNode("rabbit-2", 5672) + .AddClusterNode("rabbit-3", 5672); + + var endpoints = options.RabbitMqTransport().AmqpTcpEndpoints; + endpoints.Count.ShouldBe(3); + endpoints[0].HostName.ShouldBe("rabbit-1"); + endpoints[1].HostName.ShouldBe("rabbit-2"); + endpoints[2].HostName.ShouldBe("rabbit-3"); + } + + [Fact] + public void add_cluster_node_copies_ssl_settings_as_fresh_instance() + { + var options = new WolverineOptions(); + var expression = options.UseRabbitMq(f => + { + f.Ssl.Enabled = true; + f.Ssl.ServerName = "rabbit-cluster"; + }); + + expression.AddClusterNode("rabbit-1", 5671); + + var transport = options.RabbitMqTransport(); + var endpoint = transport.AmqpTcpEndpoints.ShouldHaveSingleItem(); + + endpoint.Ssl.Enabled.ShouldBeTrue(); + endpoint.Ssl.ServerName.ShouldBe("rabbit-cluster"); + // Distinct instance — mutating the factory afterwards must not leak in. + endpoint.Ssl.ShouldNotBeSameAs(transport.ConnectionFactory!.Ssl); + } + + [Fact] + public void add_cluster_node_with_endpoint_stores_supplied_object_by_reference() + { + var options = new WolverineOptions(); + var expression = options.UseRabbitMq(_ => { }); + var supplied = new AmqpTcpEndpoint("custom-host", 1234, new SslOption { Enabled = true, ServerName = "custom-tls" }); + + expression.AddClusterNode(supplied); + + var stored = options.RabbitMqTransport().AmqpTcpEndpoints.ShouldHaveSingleItem(); + stored.ShouldBeSameAs(supplied); + } + + [Fact] + public void add_cluster_node_with_default_port_resolves_to_amqp_default() + { + var options = new WolverineOptions(); + var expression = options.UseRabbitMq(_ => { }); + + expression.AddClusterNode("rabbit-1"); + + var endpoint = options.RabbitMqTransport().AmqpTcpEndpoints.ShouldHaveSingleItem(); + endpoint.Port.ShouldBe(5672); + } + + [Fact] + public void add_cluster_node_endpoint_overload_throws_when_transport_was_not_initialised() + { + var options = new WolverineOptions(); + var transport = new RabbitMqTransport(); + var expression = new RabbitMqTransportExpression(transport, options); + var endpoint = new AmqpTcpEndpoint("rabbit-1", 5672); + + Should.Throw(() => expression.AddClusterNode(endpoint)); + } + + [Fact] + public void virtual_host_tenant_inherits_parent_cluster_nodes() + { + var options = new WolverineOptions(); + var parent = options + .UseRabbitMq(f => { f.HostName = "primary"; f.UserName = "guest"; }) + .AddClusterNode("rabbit-1", 5672) + .AddClusterNode("rabbit-2", 5672); + + parent.AddTenant("acme", "vh-acme"); + + var parentTransport = options.RabbitMqTransport(); + var tenant = parentTransport.Tenants["acme"]; + tenant.Compile(parentTransport); + + tenant.Transport.AmqpTcpEndpoints.Count.ShouldBe(2); + tenant.Transport.AmqpTcpEndpoints[0].HostName.ShouldBe("rabbit-1"); + tenant.Transport.AmqpTcpEndpoints[1].HostName.ShouldBe("rabbit-2"); + } + + // Regression guard for the documented limitation: + // tenants configured via AddTenant(tenantId, Uri) bring their own transport + // and must not inherit cluster endpoints from the parent. If a future change + // accidentally moves the endpoint-copy loop outside the VirtualHostName branch, + // this test will start failing. + [Fact] + public void uri_tenant_does_not_inherit_parent_cluster_nodes() + { + var options = new WolverineOptions(); + var parent = options + .UseRabbitMq(f => { f.HostName = "primary"; }) + .AddClusterNode("rabbit-1", 5672); + + parent.AddTenant("acme", new Uri("amqp://other-host:5672/vh-acme")); + + var parentTransport = options.RabbitMqTransport(); + var tenant = parentTransport.Tenants["acme"]; + tenant.Compile(parentTransport); + + tenant.Transport.AmqpTcpEndpoints.ShouldBeEmpty(); + } + + [Fact] + public void compiling_virtual_host_tenant_twice_does_not_duplicate_cluster_nodes() + { + var options = new WolverineOptions(); + var parent = options + .UseRabbitMq(f => { f.HostName = "primary"; }) + .AddClusterNode("rabbit-1", 5672) + .AddClusterNode("rabbit-2", 5672); + + parent.AddTenant("acme", "vh-acme"); + + var parentTransport = options.RabbitMqTransport(); + var tenant = parentTransport.Tenants["acme"]; + tenant.Compile(parentTransport); + tenant.Compile(parentTransport); + + tenant.Transport.AmqpTcpEndpoints.Count.ShouldBe(2); + } + + [Fact, Trait("Category", "Flaky")] + public async Task can_publish_and_receive_through_cluster_code_path() + { + var queueName = RabbitTesting.NextQueueName(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq(f => { f.UserName = "guest"; f.Password = "guest"; }) + .AutoProvision() + .AutoPurgeOnStartup() + .AddClusterNode("localhost", 5672); + + opts.PublishMessage().ToRabbitQueue(queueName); + opts.ListenToRabbitQueue(queueName); + + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + }).StartAsync(); + + var session = await host + .TrackActivity() + .IncludeExternalTransports() + .Timeout(30.Seconds()) + .PublishMessageAndWaitAsync(new ClusterPing("hello")); + + session.Received.SingleMessage().Text.ShouldBe("hello"); + } +} + +public record ClusterPing(string Text); + +public class ClusterPingHandler +{ + public void Handle(ClusterPing _) { } +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqConnectionDescription.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqConnectionDescription.cs index a42d509a9..dbdec31f7 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqConnectionDescription.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqConnectionDescription.cs @@ -6,16 +6,23 @@ namespace Wolverine.RabbitMQ.Internal; /// /// Secret-safe description of a for use with /// . Exposes non-secret connection fields -/// (host, port, vhost, username, SSL, heartbeat) and deliberately omits the -/// password and any credential-carrying properties. +/// (host, port, vhost, username, SSL, heartbeat, cluster nodes) and +/// deliberately omits the password and any credential-carrying properties. /// public sealed class RabbitMqConnectionDescription : IDescribeMyself { private readonly ConnectionFactory _factory; + private readonly IReadOnlyList _clusterNodes; public RabbitMqConnectionDescription(ConnectionFactory factory) + : this(factory, Array.Empty()) + { + } + + public RabbitMqConnectionDescription(ConnectionFactory factory, IReadOnlyList clusterNodes) { _factory = factory; + _clusterNodes = clusterNodes ?? Array.Empty(); } public OptionsDescription ToDescription() @@ -48,6 +55,12 @@ public OptionsDescription ToDescription() } } + for (var i = 0; i < _clusterNodes.Count; i++) + { + var ep = _clusterNodes[i]; + description.AddValue($"ClusterNodes[{i}]", $"{ep.HostName}:{ep.Port}"); + } + return description; } } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTenant.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTenant.cs index 907c716be..3d774f177 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTenant.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTenant.cs @@ -40,6 +40,18 @@ public RabbitMqTransport Compile(RabbitMqTransport parent) f.VirtualHost = VirtualHostName; }); + + // Inherit the parent's cluster endpoints so virtual-host tenants + // share the broker topology declared on the parent transport. + // Guard against duplicate appends if Compile() runs more than once + // on the same tenant instance. + if (Transport.AmqpTcpEndpoints.Count == 0) + { + foreach (var ep in parent.AmqpTcpEndpoints) + { + Transport.AmqpTcpEndpoints.Add(ep); + } + } } CloneDeadLetterQueue(parent); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs index 5591a2f2b..cc408415c 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs @@ -114,7 +114,9 @@ private void configureDefaults(ConnectionFactory factory) /// [ChildDescription] public RabbitMqConnectionDescription? ConnectionDescription => - ConnectionFactory == null ? null : new RabbitMqConnectionDescription(ConnectionFactory); + ConnectionFactory == null + ? null + : new RabbitMqConnectionDescription(ConnectionFactory, AmqpTcpEndpoints.ToList()); internal void ConfigureFactory(Action configure) { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs index 7752a2609..cc3671110 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransportExpression.cs @@ -113,6 +113,71 @@ public RabbitMqTransportExpression ConfigureChannelCreation(Action + /// Add a RabbitMQ cluster node. Wolverine passes all configured nodes to the + /// RabbitMQ client, which selects one and handles failover between them. If + /// TLS is configured on the ConnectionFactory, the same SslOption values + /// are copied onto the new endpoint as a fresh SslOption instance, so TLS + /// applies to all cluster nodes by default. To control TLS or any other + /// AmqpTcpEndpoint setting per node, use the + /// overload instead. + /// + /// Hostname of the broker node. + /// Port. Defaults to -1, which the AmqpTcpEndpoint constructor + /// resolves to 5672 (or 5671 when TLS is enabled). + public RabbitMqTransportExpression AddClusterNode(string hostName, int port = -1) + { + if (Transport.ConnectionFactory == null) + { + throw new InvalidOperationException( + "Call UseRabbitMq(...) or UseRabbitMqUsingNamedConnection(...) before adding cluster nodes so that connection settings (TLS, credentials) can be inherited."); + } + + var ssl = CloneSslOption(Transport.ConnectionFactory.Ssl); + Transport.AmqpTcpEndpoints.Add(new AmqpTcpEndpoint(hostName, port, ssl)); + return this; + } + + /// + /// Add a RabbitMQ cluster node with full per-node control (e.g. per-node TLS + /// or non-default port). The supplied AmqpTcpEndpoint is used as-is — no + /// values are inherited from the ConnectionFactory. + /// + public RabbitMqTransportExpression AddClusterNode(AmqpTcpEndpoint endpoint) + { + if (endpoint == null) + { + throw new ArgumentNullException(nameof(endpoint)); + } + + if (Transport.ConnectionFactory == null) + { + throw new InvalidOperationException( + "Call UseRabbitMq(...) or UseRabbitMqUsingNamedConnection(...) before adding cluster nodes so that connection settings (TLS, credentials) can be inherited."); + } + + Transport.AmqpTcpEndpoints.Add(endpoint); + return this; + } + + private static SslOption CloneSslOption(SslOption? source) + { + if (source == null) return new SslOption(); + + return new SslOption + { + Enabled = source.Enabled, + ServerName = source.ServerName, + CertPath = source.CertPath, + CertPassphrase = source.CertPassphrase, + AcceptablePolicyErrors = source.AcceptablePolicyErrors, + Version = source.Version, + CheckCertificateRevocation = source.CheckCertificateRevocation, + CertificateValidationCallback = source.CertificateValidationCallback, + CertificateSelectionCallback = source.CertificateSelectionCallback + }; + } + protected override RabbitMqListenerConfiguration createListenerExpression(RabbitMqQueue listenerEndpoint) { return new RabbitMqListenerConfiguration(listenerEndpoint, Transport);