diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQTransportConfiguration.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQTransportConfiguration.cs index c648f7794f6..8ff0cb1e28e 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQTransportConfiguration.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Configurations/RabbitMQTransportConfiguration.cs @@ -51,6 +51,13 @@ public RabbitMQTransportConfiguration() /// public List Bindings { get; set; } = []; + /// + /// Gets or sets a value indicating whether topology resources (queues, exchanges, bindings) + /// should be automatically provisioned on the broker. When null, defaults to true. + /// Individual resources can override this setting. + /// + public bool? AutoProvision { get; set; } + /// /// Gets or sets the bus-level defaults applied to all auto-provisioned queues and exchanges. /// diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Conventions/RabbitMQReceiveEndpointTopologyConvention.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Conventions/RabbitMQReceiveEndpointTopologyConvention.cs index 3d351a9021a..7259d57e7e9 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Conventions/RabbitMQReceiveEndpointTopologyConvention.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Conventions/RabbitMQReceiveEndpointTopologyConvention.cs @@ -32,7 +32,8 @@ public void DiscoverTopology( new RabbitMQQueueConfiguration { Name = configuration.QueueName, - AutoDelete = endpoint.Kind == ReceiveEndpointKind.Reply + AutoDelete = endpoint.Kind == ReceiveEndpointKind.Reply, + AutoProvision = configuration.AutoProvision }); } diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/IRabbitMQMessagingTransportDescriptor.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/IRabbitMQMessagingTransportDescriptor.cs index eb70bd7c0f5..7077a05e64e 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/IRabbitMQMessagingTransportDescriptor.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/IRabbitMQMessagingTransportDescriptor.cs @@ -70,6 +70,17 @@ IRabbitMQMessagingTransportDescriptor ConnectionProvider( /// A binding descriptor for further configuration. IRabbitMQBindingDescriptor DeclareBinding(string exchange, string queue); + /// + /// Sets whether topology resources should be automatically provisioned on the broker. + /// When disabled, queues, exchanges, and bindings must exist before the transport starts. + /// Individual resources can override this setting via their own AutoProvision method. + /// + /// + /// true to enable auto-provisioning (default); false to disable it globally. + /// + /// The descriptor for method chaining. + IRabbitMQMessagingTransportDescriptor AutoProvision(bool autoProvision = true); + /// new IRabbitMQMessagingTransportDescriptor Name(string name); diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/RabbitMQMessagingTransportDescriptor.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/RabbitMQMessagingTransportDescriptor.cs index 6fc435bbad4..6bacc61c643 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/RabbitMQMessagingTransportDescriptor.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Descriptors/RabbitMQMessagingTransportDescriptor.cs @@ -136,6 +136,13 @@ public RabbitMQMessagingTransportDescriptor(IMessagingSetupContext discoveryCont return this; } + /// + public IRabbitMQMessagingTransportDescriptor AutoProvision(bool autoProvision = true) + { + Configuration.AutoProvision = autoProvision; + return this; + } + /// public IRabbitMQMessagingTransportDescriptor ConnectionProvider( Func connectionProvider) diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQDispatchEndpoint.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQDispatchEndpoint.cs index bb9edbee7c4..c6a784c6bb2 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQDispatchEndpoint.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQDispatchEndpoint.cs @@ -145,12 +145,14 @@ private async ValueTask EnsureProvisionedAsync(IChannel channel, CancellationTok return; } - if (Queue is not null) + var autoProvision = ((RabbitMQMessagingTopology)transport.Topology).AutoProvision; + + if (Queue is not null && (Queue.AutoProvision ?? autoProvision)) { await Queue.ProvisionAsync(channel, cancellationToken); } - if (Exchange is not null) + if (Exchange is not null && (Exchange.AutoProvision ?? autoProvision)) { await Exchange.ProvisionAsync(channel, cancellationToken); } diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessagingTransport.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessagingTransport.cs index 6e2e80967f7..3a8a0174d0a 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessagingTransport.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/RabbitMQMessagingTransport.cs @@ -72,8 +72,11 @@ protected override void OnAfterInitialized(IMessagingSetupContext context) Port = Connection.Port, Path = Connection.VirtualHost }; - - _topology = new RabbitMQMessagingTopology(this, builder.Uri, configuration.Defaults); + _topology = new RabbitMQMessagingTopology( + this, + builder.Uri, + configuration.Defaults, + configuration.AutoProvision ?? true); foreach (var exchange in configuration.Exchanges) { @@ -110,20 +113,30 @@ private RabbitMQDispatcher CreateDispatcher(IMessagingSetupContext context) async Task ProvisionTopologyAsync(IConnection connection, CancellationToken ct) { await using var channel = await connection.CreateChannelAsync(cancellationToken: ct); + var autoProvision = _topology.AutoProvision; - foreach (var queue in _topology.Queues) + foreach (var exchange in _topology.Exchanges) { - await queue.ProvisionAsync(channel, ct); + if (exchange.AutoProvision ?? autoProvision) + { + await exchange.ProvisionAsync(channel, ct); + } } - foreach (var exchange in _topology.Exchanges) + foreach (var queue in _topology.Queues) { - await exchange.ProvisionAsync(channel, ct); + if (queue.AutoProvision ?? autoProvision) + { + await queue.ProvisionAsync(channel, ct); + } } foreach (var binding in _topology.Bindings) { - await binding.ProvisionAsync(channel, ct); + if (binding.AutoProvision ?? autoProvision) + { + await binding.ProvisionAsync(channel, ct); + } } } } @@ -137,6 +150,7 @@ public override TransportDescription Describe() var entities = new List(); var links = new List(); + var autoProvision = _topology.AutoProvision; foreach (var exchange in _topology.Exchanges) { @@ -151,7 +165,7 @@ public override TransportDescription Describe() ["type"] = exchange.Type, ["durable"] = exchange.Durable, ["autoDelete"] = exchange.AutoDelete, - ["autoProvision"] = exchange.AutoProvision + ["autoProvision"] = exchange.AutoProvision ?? autoProvision })); } @@ -168,7 +182,7 @@ public override TransportDescription Describe() ["durable"] = queue.Durable, ["exclusive"] = queue.Exclusive, ["autoDelete"] = queue.AutoDelete, - ["autoProvision"] = queue.AutoProvision + ["autoProvision"] = queue.AutoProvision ?? autoProvision })); } @@ -189,7 +203,7 @@ public override TransportDescription Describe() new Dictionary { ["routingKey"] = string.IsNullOrEmpty(binding.RoutingKey) ? null : binding.RoutingKey, - ["autoProvision"] = binding.AutoProvision + ["autoProvision"] = binding.AutoProvision ?? autoProvision })); } diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQBinding.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQBinding.cs index 8dfa58051be..d1247fbcb46 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQBinding.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQBinding.cs @@ -15,8 +15,9 @@ public abstract class RabbitMQBinding : TopologyResource /// Gets a value indicating whether this binding is automatically provisioned during topology setup. + /// When null, the transport-level default is used. /// - public bool AutoProvision { get; protected set; } + public bool? AutoProvision { get; protected set; } /// /// Gets the routing key pattern used to filter messages passing through this binding. @@ -55,7 +56,7 @@ protected override void OnInitialize(RabbitMQBindingConfiguration configuration) { RoutingKey = configuration.RoutingKey ?? string.Empty; Arguments = configuration.Arguments?.ToImmutableDictionary(kv => kv.Key, kv => (object?)kv.Value) ?? ImmutableDictionary.Empty; - AutoProvision = configuration.AutoProvision ?? true; + AutoProvision = configuration.AutoProvision; } protected override void OnComplete(RabbitMQBindingConfiguration configuration) @@ -96,7 +97,7 @@ protected override void OnInitialize(RabbitMQBindingConfiguration configuration) { RoutingKey = configuration.RoutingKey ?? string.Empty; Arguments = configuration.Arguments?.ToImmutableDictionary(kv => kv.Key, kv => (object?)kv.Value) ?? ImmutableDictionary.Empty; - AutoProvision = configuration.AutoProvision ?? true; + AutoProvision = configuration.AutoProvision; } protected override void OnComplete(RabbitMQBindingConfiguration configuration) diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQExchange.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQExchange.cs index 5780e767832..56f0aeed476 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQExchange.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQExchange.cs @@ -22,8 +22,9 @@ public sealed class RabbitMQExchange : TopologyResource /// Gets a value indicating whether this exchange is automatically provisioned during topology setup. + /// When null, the transport-level default is used. /// - public bool AutoProvision { get; private set; } + public bool? AutoProvision { get; private set; } /// /// Gets the exchange type (e.g., "direct", "fanout", "topic", "headers"). @@ -58,7 +59,7 @@ protected override void OnInitialize(RabbitMQExchangeConfiguration configuration Type = configuration.Type ?? "fanout"; AutoDelete = configuration.AutoDelete ?? false; Arguments = configuration.Arguments?.ToImmutableDictionary(kv => kv.Key, kv => (object?)kv.Value) ?? ImmutableDictionary.Empty; - AutoProvision = configuration.AutoProvision ?? true; + AutoProvision = configuration.AutoProvision; } protected override void OnComplete(RabbitMQExchangeConfiguration configuration) diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs index 685b3d968bf..630ae3d3bcb 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQMessagingTopology.cs @@ -7,7 +7,8 @@ namespace Mocha.Transport.RabbitMQ; public sealed class RabbitMQMessagingTopology( RabbitMQMessagingTransport transport, Uri baseAddress, - RabbitMQBusDefaults defaults) + RabbitMQBusDefaults defaults, + bool autoProvision) : MessagingTopology(transport, baseAddress) { private readonly object _lock = new(); @@ -15,6 +16,12 @@ public sealed class RabbitMQMessagingTopology( private readonly List _queues = []; private readonly List _bindings = []; + /// + /// Gets a value indicating whether topology resources should be auto-provisioned by default. + /// Individual resources may override this setting via their own AutoProvision property. + /// + public bool AutoProvision => autoProvision; + /// /// Gets the list of exchanges registered in this topology. /// diff --git a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQQueue.cs b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQQueue.cs index f4ebc3f7e9e..2ab3067f0bf 100644 --- a/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQQueue.cs +++ b/src/Mocha/src/Mocha.Transport.RabbitMQ/Topology/RabbitMQQueue.cs @@ -27,8 +27,9 @@ public sealed class RabbitMQQueue : TopologyResource /// /// Gets a value indicating whether this queue is automatically provisioned during topology setup. + /// When null, the transport-level default is used. /// - public bool AutoProvision { get; private set; } + public bool? AutoProvision { get; private set; } /// /// Gets a value indicating whether this queue survives broker restarts. @@ -58,7 +59,7 @@ protected override void OnInitialize(RabbitMQQueueConfiguration configuration) Exclusive = configuration.Exclusive ?? false; AutoDelete = configuration.AutoDelete ?? false; Arguments = configuration.Arguments?.ToImmutableDictionary(kv => kv.Key, kv => (object?)kv.Value) ?? ImmutableDictionary.Empty; - AutoProvision = configuration.AutoProvision ?? true; + AutoProvision = configuration.AutoProvision; } protected override void OnComplete(RabbitMQQueueConfiguration configuration) diff --git a/src/Mocha/src/Mocha/Endpoints/Configurations/ReceiveEndpointConfiguration.cs b/src/Mocha/src/Mocha/Endpoints/Configurations/ReceiveEndpointConfiguration.cs index 3cdea4aad8f..060b0b8d3af 100644 --- a/src/Mocha/src/Mocha/Endpoints/Configurations/ReceiveEndpointConfiguration.cs +++ b/src/Mocha/src/Mocha/Endpoints/Configurations/ReceiveEndpointConfiguration.cs @@ -38,7 +38,7 @@ public class ReceiveEndpointConfiguration : MessagingConfiguration /// /// Gets or sets whether the transport should automatically provision infrastructure for this endpoint. /// - public bool? AutoProvision { get; set; } = false; + public bool? AutoProvision { get; set; } /// /// Gets or sets the maximum number of messages that can be processed concurrently on this endpoint. diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/AutoProvisionTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/AutoProvisionTests.cs new file mode 100644 index 00000000000..6ec4400d323 --- /dev/null +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/AutoProvisionTests.cs @@ -0,0 +1,321 @@ +using Microsoft.Extensions.DependencyInjection; +using Mocha.Transport.RabbitMQ.Tests.Helpers; + +namespace Mocha.Transport.RabbitMQ.Tests; + +public class AutoProvisionTests +{ + [Fact] + public void AutoProvision_Should_BeTrue_When_NotConfigured() + { + // arrange & act + var (_, _, topology) = CreateTopology(_ => { }); + + // assert + Assert.True(topology.AutoProvision); + } + + [Fact] + public void AutoProvision_Should_BeFalse_When_AutoProvisionSetToFalse() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => t.AutoProvision(false)); + + // assert + Assert.False(topology.AutoProvision); + } + + [Fact] + public void AutoProvision_Should_BeTrue_When_AutoProvisionSetToTrue() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => t.AutoProvision(true)); + + // assert + Assert.True(topology.AutoProvision); + } + + [Fact] + public void Exchange_AutoProvision_Should_BeNull_When_NotSet() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => t.DeclareExchange("ex1")); + + // assert — null means it inherits the topology default + var exchange = topology.Exchanges.Single(e => e.Name == "ex1"); + Assert.Null(exchange.AutoProvision); + } + + [Fact] + public void Exchange_AutoProvision_Should_BeTrue_When_ExplicitlyEnabled() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.AutoProvision(false); + t.DeclareExchange("ex1").AutoProvision(true); + }); + + // assert — explicit true overrides topology false + var exchange = topology.Exchanges.Single(e => e.Name == "ex1"); + Assert.True(exchange.AutoProvision); + } + + [Fact] + public void Exchange_AutoProvision_Should_BeFalse_When_ExplicitlyDisabled() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.AutoProvision(true); + t.DeclareExchange("ex1").AutoProvision(false); + }); + + // assert — explicit false overrides topology true + var exchange = topology.Exchanges.Single(e => e.Name == "ex1"); + Assert.False(exchange.AutoProvision); + } + + [Fact] + public void Queue_AutoProvision_Should_BeNull_When_NotSet() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => t.DeclareQueue("q1")); + + // assert + var queue = topology.Queues.Single(q => q.Name == "q1"); + Assert.Null(queue.AutoProvision); + } + + [Fact] + public void Queue_AutoProvision_Should_BeTrue_When_ExplicitlyEnabled() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.AutoProvision(false); + t.DeclareQueue("q1").AutoProvision(true); + }); + + // assert + var queue = topology.Queues.Single(q => q.Name == "q1"); + Assert.True(queue.AutoProvision); + } + + [Fact] + public void Queue_AutoProvision_Should_BeFalse_When_ExplicitlyDisabled() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.AutoProvision(true); + t.DeclareQueue("q1").AutoProvision(false); + }); + + // assert + var queue = topology.Queues.Single(q => q.Name == "q1"); + Assert.False(queue.AutoProvision); + } + + [Fact] + public void Binding_AutoProvision_Should_BeNull_When_NotSet() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.DeclareExchange("ex1"); + t.DeclareQueue("q1"); + t.DeclareBinding("ex1", "q1"); + }); + + // assert + var binding = Assert.Single(topology.Bindings); + Assert.Null(binding.AutoProvision); + } + + [Fact] + public void Binding_AutoProvision_Should_BeTrue_When_ExplicitlyEnabled() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.AutoProvision(false); + t.DeclareExchange("ex1"); + t.DeclareQueue("q1"); + t.DeclareBinding("ex1", "q1").AutoProvision(true); + }); + + // assert + var binding = Assert.Single(topology.Bindings); + Assert.True(binding.AutoProvision); + } + + [Fact] + public void Binding_AutoProvision_Should_BeFalse_When_ExplicitlyDisabled() + { + // arrange & act + var (_, _, topology) = CreateTopology(t => + { + t.AutoProvision(true); + t.DeclareExchange("ex1"); + t.DeclareQueue("q1"); + t.DeclareBinding("ex1", "q1").AutoProvision(false); + }); + + // assert + var binding = Assert.Single(topology.Bindings); + Assert.False(binding.AutoProvision); + } + + [Fact] + public void Describe_Should_IncludeAutoProvisionProperty_When_TransportEnabledByDefault() + { + // arrange + var runtime = CreateRuntime(b => b.AddEventHandler()); + var transport = runtime.Transports.OfType().Single(); + + // act + var description = transport.Describe(); + + // assert — all entities should have autoProvision in their properties + Assert.NotNull(description.Topology); + foreach (var entity in description.Topology!.Entities) + { + Assert.True( + entity.Properties!.ContainsKey("autoProvision"), + $"Entity '{entity.Name}' should have autoProvision in properties"); + } + + // exchanges declared via convention get null AutoProvision → inherit topology default (true) + var exchanges = description.Topology.Entities.Where(e => e.Kind == "exchange"); + Assert.All(exchanges, e => + Assert.True((bool)e.Properties!["autoProvision"]!, + $"Exchange '{e.Name}' should report autoProvision=true")); + } + + [Fact] + public void Describe_Should_ReportAutoProvisionFalse_When_TopologyDisabled() + { + // arrange & act + var (_, transport, _) = CreateTopology(t => + { + t.AutoProvision(false); + t.DeclareExchange("ex1"); + t.DeclareQueue("q1"); + t.DeclareBinding("ex1", "q1"); + }); + + var description = transport.Describe(); + + // assert — all entities and links should have autoProvision = false + Assert.NotNull(description.Topology); + foreach (var entity in description.Topology!.Entities.Where(e => e.Name is "ex1" or "q1")) + { + Assert.False( + (bool)entity.Properties!["autoProvision"]!, + $"Entity '{entity.Name}' should report autoProvision=false"); + } + + foreach (var link in description.Topology.Links) + { + Assert.False( + (bool)link.Properties!["autoProvision"]!, + "Binding link should report autoProvision=false"); + } + } + + [Fact] + public void Describe_Should_ReportMixedAutoProvision_When_ResourceOverridesTopology() + { + // arrange + var (_, transport, _) = CreateTopology(t => + { + t.AutoProvision(false); + t.DeclareExchange("ex1").AutoProvision(true); + t.DeclareQueue("q1"); + t.DeclareBinding("ex1", "q1"); + }); + + // act + var description = transport.Describe(); + + // assert + Assert.NotNull(description.Topology); + + var exchangeEntity = description.Topology!.Entities.Single(e => e.Name == "ex1"); + Assert.True( + (bool)exchangeEntity.Properties!["autoProvision"]!, + "Exchange with explicit AutoProvision(true) should report true"); + + var queueEntity = description.Topology.Entities.Single(e => e.Name == "q1"); + Assert.False( + (bool)queueEntity.Properties!["autoProvision"]!, + "Queue with null AutoProvision should inherit topology false"); + } + + [Fact] + public void Convention_Should_PropagateAutoProvision_When_ReceiveEndpointCreated() + { + // arrange — the reply endpoint always sets AutoProvision = true + var runtime = CreateRuntime(b => + b.AddRequestHandler()); + var transport = runtime.Transports.OfType().Single(); + var topology = (RabbitMQMessagingTopology)transport.Topology; + + // act — find the reply queue (temporary, auto-provisioned) + var replyEndpoint = transport.ReceiveEndpoints + .FirstOrDefault(e => e.Kind == ReceiveEndpointKind.Reply); + + // assert + Assert.NotNull(replyEndpoint); + var replyQueueName = ((RabbitMQReceiveEndpoint)replyEndpoint).Queue?.Name; + Assert.NotNull(replyQueueName); + + var replyQueue = topology.Queues.Single(q => q.Name == replyQueueName); + Assert.True(replyQueue.AutoProvision); + } + + [Fact] + public void Descriptor_AutoProvision_Should_ReturnSelf_When_Chaining() + { + // arrange & act — just verify the builder compiles and chains correctly + var (_, transport, _) = CreateTopology(t => + t.AutoProvision(false) + .DeclareExchange("ex1") + .AutoProvision(true)); + + // assert — the last call wins + var exchange = ((RabbitMQMessagingTopology)transport.Topology) + .Exchanges.Single(e => e.Name == "ex1"); + Assert.True(exchange.AutoProvision); + } + + private static ( + MessagingRuntime Runtime, + RabbitMQMessagingTransport Transport, + RabbitMQMessagingTopology Topology) CreateTopology(Action configure) + { + var services = new ServiceCollection(); + var builder = services.AddMessageBus(); + var runtime = builder + .AddRabbitMQ(t => + { + t.ConnectionProvider(_ => new StubConnectionProvider()); + configure(t); + }) + .BuildRuntime(); + var transport = runtime.Transports.OfType().Single(); + var topology = (RabbitMQMessagingTopology)transport.Topology; + return (runtime, transport, topology); + } + + private static MessagingRuntime CreateRuntime(Action configure) + { + var services = new ServiceCollection(); + services.AddSingleton(new MessageRecorder()); + var builder = services.AddMessageBus(); + configure(builder); + var runtime = builder.AddRabbitMQ(t => t.ConnectionProvider(_ => new StubConnectionProvider())).BuildRuntime(); + return runtime; + } +} diff --git a/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/AutoProvisionIntegrationTests.cs b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/AutoProvisionIntegrationTests.cs new file mode 100644 index 00000000000..bb9aa2721a9 --- /dev/null +++ b/src/Mocha/test/Mocha.Transport.RabbitMQ.Tests/Behaviors/AutoProvisionIntegrationTests.cs @@ -0,0 +1,264 @@ +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using Mocha.Transport.RabbitMQ.Tests.Helpers; +using RabbitMQ.Client; + +namespace Mocha.Transport.RabbitMQ.Tests.Behaviors; + +[Collection("RabbitMQ")] +public class AutoProvisionIntegrationTests +{ + private static readonly TimeSpan s_timeout = TimeSpan.FromSeconds(30); + private readonly RabbitMQFixture _fixture; + + public AutoProvisionIntegrationTests(RabbitMQFixture fixture) + { + _fixture = fixture; + } + + [Fact] + public async Task PublishAsync_Should_Deliver_When_AutoProvisionEnabledByDefault() + { + // arrange — default auto-provision (true) + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddEventHandler() + .AddRabbitMQ() + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync(new OrderCreated { OrderId = "AP-1" }, CancellationToken.None); + + // assert — message is delivered because topology was auto-provisioned + Assert.True(await recorder.WaitAsync(s_timeout), "Handler did not receive the event"); + var order = Assert.IsType(Assert.Single(recorder.Messages)); + Assert.Equal("AP-1", order.OrderId); + } + + [Fact] + public async Task PublishAsync_Should_Deliver_When_AutoProvisionExplicitlyEnabled() + { + // arrange — explicit auto-provision true + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddEventHandler() + .AddRabbitMQ(t => t.AutoProvision(true)) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync(new OrderCreated { OrderId = "AP-2" }, CancellationToken.None); + + // assert + Assert.True(await recorder.WaitAsync(s_timeout), "Handler did not receive the event"); + var order = Assert.IsType(Assert.Single(recorder.Messages)); + Assert.Equal("AP-2", order.OrderId); + } + + [Fact] + public async Task SendAsync_Should_Deliver_When_AutoProvisionEnabledByDefault() + { + // arrange + var recorder = new MessageRecorder(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(recorder) + .AddMessageBus() + .AddRequestHandler() + .AddRabbitMQ() + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.SendAsync(new ProcessPayment { OrderId = "AP-3", Amount = 42.00m }, CancellationToken.None); + + // assert + Assert.True(await recorder.WaitAsync(s_timeout), "Handler did not receive the request"); + var payment = Assert.IsType(Assert.Single(recorder.Messages)); + Assert.Equal("AP-3", payment.OrderId); + } + + [Fact] + public async Task ExplicitTopology_Should_Deliver_When_AutoProvisionEnabledOnResources() + { + // arrange — transport auto-provision disabled, but individual resources enabled + var capture = new OrderCapture(); + await using var vhost = await _fixture.CreateVhostAsync(); + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(capture) + .AddMessageBus() + .AddConsumer() + .AddRabbitMQ(t => + { + t.AutoProvision(false); + t.BindHandlersExplicitly(); + t.DeclareExchange("ap-ex").AutoProvision(true); + t.DeclareQueue("ap-q").AutoProvision(true); + t.DeclareBinding("ap-ex", "ap-q").AutoProvision(true); + + t.Endpoint("ap-ep").Consumer().Queue("ap-q"); + t.DispatchEndpoint("ap-dispatch").ToExchange("ap-ex").Publish(); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync(new OrderCreated { OrderId = "AP-4" }, CancellationToken.None); + + // assert — resources were explicitly enabled, so message should be delivered + Assert.True(await capture.WaitAsync(s_timeout), "Consumer did not receive the message"); + var message = Assert.Single(capture.Messages); + Assert.Equal("AP-4", message.OrderId); + } + + [Fact] + public async Task ExplicitTopology_Should_Deliver_When_PreProvisionedAndAutoProvisionDisabled() + { + // arrange — pre-provision resources manually, then disable auto-provision + var capture = new OrderCapture(); + await using var vhost = await _fixture.CreateVhostAsync(); + + // Pre-provision resources on the broker + await using (var connection = await vhost.ConnectionFactory.CreateConnectionAsync()) + await using (var channel = await connection.CreateChannelAsync()) + { + await channel.ExchangeDeclareAsync("pre-ex", "fanout", durable: true); + await channel.QueueDeclareAsync("pre-q", durable: true, exclusive: false, autoDelete: false); + await channel.QueueBindAsync("pre-q", "pre-ex", ""); + } + + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(capture) + .AddMessageBus() + .AddConsumer() + .AddRabbitMQ(t => + { + t.AutoProvision(false); + t.BindHandlersExplicitly(); + t.DeclareExchange("pre-ex"); + t.DeclareQueue("pre-q"); + t.DeclareBinding("pre-ex", "pre-q"); + + t.Endpoint("pre-ep").Consumer().Queue("pre-q"); + t.DispatchEndpoint("pre-dispatch").ToExchange("pre-ex").Publish(); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync(new OrderCreated { OrderId = "AP-5" }, CancellationToken.None); + + // assert — pre-provisioned resources work with auto-provision disabled + Assert.True(await capture.WaitAsync(s_timeout), "Consumer did not receive the message"); + var message = Assert.Single(capture.Messages); + Assert.Equal("AP-5", message.OrderId); + } + + [Fact] + public async Task ExplicitTopology_Should_Deliver_When_MixedAutoProvision() + { + // arrange — transport enabled, some resources disabled but pre-provisioned + var capture = new OrderCapture(); + await using var vhost = await _fixture.CreateVhostAsync(); + + // Pre-provision only the exchange (with auto-provision disabled for it) + await using (var connection = await vhost.ConnectionFactory.CreateConnectionAsync()) + await using (var channel = await connection.CreateChannelAsync()) + { + await channel.ExchangeDeclareAsync("mixed-ex", "fanout", durable: true); + } + + await using var bus = await new ServiceCollection() + .AddSingleton(vhost.ConnectionFactory) + .AddSingleton(capture) + .AddMessageBus() + .AddConsumer() + .AddRabbitMQ(t => + { + t.AutoProvision(true); + t.BindHandlersExplicitly(); + t.DeclareExchange("mixed-ex").AutoProvision(false); // already exists + t.DeclareQueue("mixed-q"); // will be auto-provisioned (inherits true) + t.DeclareBinding("mixed-ex", "mixed-q"); // will be auto-provisioned + + t.Endpoint("mixed-ep").Consumer().Queue("mixed-q"); + t.DispatchEndpoint("mixed-dispatch").ToExchange("mixed-ex").Publish(); + }) + .BuildTestBusAsync(); + + using var scope = bus.Provider.CreateScope(); + var messageBus = scope.ServiceProvider.GetRequiredService(); + + // act + await messageBus.PublishAsync(new OrderCreated { OrderId = "AP-6" }, CancellationToken.None); + + // assert + Assert.True(await capture.WaitAsync(s_timeout), "Consumer did not receive the message"); + var message = Assert.Single(capture.Messages); + Assert.Equal("AP-6", message.OrderId); + } + + public sealed class ProcessPaymentHandler(MessageRecorder recorder) : IEventRequestHandler + { + public ValueTask HandleAsync(ProcessPayment request, CancellationToken cancellationToken) + { + recorder.Record(request); + return default; + } + } + + public sealed class OrderCapture + { + private readonly SemaphoreSlim _semaphore = new(0); + public ConcurrentBag Messages { get; } = []; + + public void Record(IConsumeContext context) + { + Messages.Add(context.Message); + _semaphore.Release(); + } + + public async Task WaitAsync(TimeSpan timeout, int expectedCount = 1) + { + for (var i = 0; i < expectedCount; i++) + { + if (!await _semaphore.WaitAsync(timeout)) + { + return false; + } + } + return true; + } + } + + public sealed class OrderSpyConsumer(OrderCapture capture) : IConsumer + { + public ValueTask ConsumeAsync(IConsumeContext context) + { + capture.Record(context); + return default; + } + } +} diff --git a/website/src/docs/mocha/v1/transports/rabbitmq.md b/website/src/docs/mocha/v1/transports/rabbitmq.md index 189364cc2b7..15837b95361 100644 --- a/website/src/docs/mocha/v1/transports/rabbitmq.md +++ b/website/src/docs/mocha/v1/transports/rabbitmq.md @@ -278,6 +278,116 @@ builder.Services All explicitly declared topology is provisioned when the transport starts, before receive endpoints begin consuming. +# Control auto-provisioning + +By default, the transport auto-provisions all topology resources (exchanges, queues, bindings) on the broker at startup. In production environments where infrastructure is managed externally - for example by Terraform, Ansible, or the [RabbitMQ Messaging Topology Operator](https://www.rabbitmq.com/kubernetes/operator/install-topology-operator) on Kubernetes - you can disable auto-provisioning so the transport expects resources to already exist. + +## Disable globally + +Turn off auto-provisioning for the entire transport: + +```csharp +builder.Services + .AddMessageBus() + .AddEventHandler() + .AddRabbitMQ(transport => + { + transport.AutoProvision(false); + }); +``` + +With auto-provisioning disabled, the transport will not create any exchanges, queues, or bindings. All resources must already exist on the broker before the transport starts. + +## Override per resource + +Individual resources can override the transport-level setting. This is useful when most topology is managed externally but a few resources need to be created dynamically: + +```csharp +builder.Services + .AddMessageBus() + .AddRabbitMQ(transport => + { + // Disable globally + transport.AutoProvision(false); + + // This exchange already exists on the broker - skip provisioning + transport.DeclareExchange("order-events"); + + // This queue should be created by the transport + transport.DeclareQueue("billing-orders") + .AutoProvision(true); + + // This binding should also be created + transport.DeclareBinding("order-events", "billing-orders") + .AutoProvision(true); + }); +``` + +The effective auto-provision value for each resource follows a cascading pattern: + +| Resource setting | Transport setting | Result | +| ---------------- | ----------------- | --------------- | +| `true` | any | Provisioned | +| `false` | any | Not provisioned | +| not set | `true` (default) | Provisioned | +| not set | `false` | Not provisioned | + +When a resource does not specify `AutoProvision`, it inherits the transport-level default. When the transport does not specify `AutoProvision`, it defaults to `true`. + +## Common patterns + +**Fully managed infrastructure:** Disable auto-provisioning globally and declare all resources without `AutoProvision`. The transport will use existing broker resources without attempting to create them. + +```csharp +transport.AutoProvision(false); +transport.DeclareExchange("order-events"); +transport.DeclareQueue("billing-orders"); +transport.DeclareBinding("order-events", "billing-orders"); +``` + +**Selective provisioning:** Disable globally but enable for specific resources that are owned by this service. + +```csharp +transport.AutoProvision(false); +transport.DeclareExchange("shared-events"); // managed externally +transport.DeclareQueue("my-service-queue") + .AutoProvision(true); // owned by this service +transport.DeclareBinding("shared-events", "my-service-queue") + .AutoProvision(true); // owned by this service +``` + +**Kubernetes with the Messaging Topology Operator:** When the [RabbitMQ Messaging Topology Operator](https://www.rabbitmq.com/kubernetes/operator/install-topology-operator) manages your exchanges, queues, and bindings as Kubernetes custom resources, disable auto-provisioning entirely. The operator declares topology through CRDs, and the transport simply uses the existing resources: + +```yaml +# Kubernetes CRD - managed by the Messaging Topology Operator +apiVersion: rabbitmq.com/v1beta1 +kind: Queue +metadata: + name: billing-orders +spec: + name: billing-orders + durable: true + rabbitmqClusterReference: + name: my-cluster +``` + +```csharp +// Application code - topology already exists on the broker +transport.AutoProvision(false); +transport.DeclareExchange("order-events"); +transport.DeclareQueue("billing-orders"); +transport.DeclareBinding("order-events", "billing-orders"); +``` + +**Opt-out individual resources:** Keep auto-provisioning enabled but skip specific resources that are managed elsewhere. + +```csharp +transport.DeclareExchange("platform-events") + .AutoProvision(false); // managed by platform team +transport.DeclareQueue("my-queue"); // auto-provisioned (default) +transport.DeclareBinding("platform-events", "my-queue"); // auto-provisioned (default) +``` + # Prefetch and concurrency Customize queue names, prefetch counts, and handler assignments on receive endpoints: