Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/guide/messaging/transports/rabbitmq/object-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,51 @@ return await app.RunJasperFxCommands(args);
Note that this stateful resource model is also available at the command line as well for deploy time
management.

## Externally-Owned Queues and Exchanges <Badge type="tip" text="6.6" />

Sometimes a queue or exchange your application uses is owned and managed by a *different* system,
and the identity your application connects with simply does not have the `configure` or `delete`
permissions to create or remove it. In that case you want Wolverine to *use* the queue or exchange,
but never try to declare it at startup (even with `AutoProvision()` turned on) and never delete it
during a resource teardown. Mark the endpoint as `ExternallyOwned()` to get exactly that behavior:

```csharp
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq()
// AutoProvision is on for the resources this app *does* own...
.AutoProvision();

// ...but this queue belongs to another team. Listen to it,
// but never declare it at startup or delete it on teardown,
// and don't try to set up or tear down its bindings.
opts.ListenToRabbitQueue("shared-orders")
.ExternallyOwned();

// Same idea for an exchange we publish to but do not own
opts.PublishMessage<OrderPlaced>()
.ToRabbitExchange("shared-events")
.ExternallyOwned();
}).StartAsync();
```

When an endpoint is marked `ExternallyOwned()`, Wolverine will:

* Skip declaring the queue or exchange at startup, regardless of `AutoProvision()`
* Skip deleting it during a `resources teardown` (or the Oakton/JasperFx resource commands)
* Skip setting up or tearing down any bindings for that resource

This applies to queue listeners, queue subscribers, and exchange subscribers alike.

::: tip
`ExternallyOwned()` is distinct from `DeclarePassive`. A `DeclarePassive` exchange still makes a
*passive* declaration against the broker at startup to verify that the resource already exists
(failing fast if it does not), whereas an externally-owned resource never touches the broker for
declaration at all. As of Wolverine 6.6, a `DeclarePassive` exchange is also left alone during
resource teardown — Wolverine will not delete a resource it only verified rather than created.
:::

## Exchange-to-Exchange Bindings

Wolverine supports [RabbitMQ exchange-to-exchange bindings](https://www.rabbitmq.com/docs/e2e), which allow you
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Client;
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine.RabbitMQ.Internal;
using Xunit;

namespace Wolverine.RabbitMQ.Tests;

// GH-3064: a queue or exchange marked ExternallyOwned() must never be declared (created) at startup
// nor deleted during resource teardown, even with AutoProvision on - the escape hatch for topology
// owned by another system where the calling identity lacks configure/delete permissions. Assertions
// hit the broker directly (passive declare on a separate admin connection) rather than relying on
// Wolverine's in-memory model.
public class externally_owned_rabbit_topology_is_skipped : IAsyncLifetime
{
private readonly string _externalQueue = "ext-queue-" + Guid.NewGuid().ToString("N");
private readonly string _externalExchange = "ext-exchange-" + Guid.NewGuid().ToString("N");
private readonly string _ownedQueue = "owned-queue-" + Guid.NewGuid().ToString("N");
private readonly string _passiveExchange = "passive-exchange-" + Guid.NewGuid().ToString("N");

public Task InitializeAsync() => Task.CompletedTask;

public async Task DisposeAsync()
{
try
{
await using var conn = await new ConnectionFactory { HostName = "localhost" }.CreateConnectionAsync();
await using var channel = await conn.CreateChannelAsync();
foreach (var q in new[] { _externalQueue, _ownedQueue })
{
try { await channel.QueueDeleteAsync(q, false, false); } catch { }
}
foreach (var e in new[] { _externalExchange, _passiveExchange })
{
try { await channel.ExchangeDeleteAsync(e); } catch { }
}
}
catch
{
// ignore cleanup failures
}
}

// --- broker-side existence checks via a separate admin connection ---

private static async Task<bool> QueueExistsAsync(string name)
{
await using var conn = await new ConnectionFactory { HostName = "localhost" }.CreateConnectionAsync();
await using var channel = await conn.CreateChannelAsync();
try { await channel.QueueDeclarePassiveAsync(name); return true; }
catch { return false; } // passive declare on a missing queue closes the channel with a 404
}

private static async Task<bool> ExchangeExistsAsync(string name)
{
await using var conn = await new ConnectionFactory { HostName = "localhost" }.CreateConnectionAsync();
await using var channel = await conn.CreateChannelAsync();
try { await channel.ExchangeDeclarePassiveAsync(name); return true; }
catch { return false; }
}

private static async Task PreCreateQueueAsync(string name)
{
await using var conn = await new ConnectionFactory { HostName = "localhost" }.CreateConnectionAsync();
await using var channel = await conn.CreateChannelAsync();
await channel.QueueDeclareAsync(name, durable: true, exclusive: false, autoDelete: false);
}

private static async Task PreCreateExchangeAsync(string name)
{
await using var conn = await new ConnectionFactory { HostName = "localhost" }.CreateConnectionAsync();
await using var channel = await conn.CreateChannelAsync();
await channel.ExchangeDeclareAsync(name, type: "fanout", durable: true, autoDelete: false);
}

// === Setup: externally-owned topology is never created ===

[Fact]
public async Task externally_owned_queue_is_not_created_at_startup()
{
using var host = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq().AutoProvision();
opts.PublishMessage<ExtMessage>().ToRabbitQueue(_externalQueue).ExternallyOwned();
});

(await QueueExistsAsync(_externalQueue)).ShouldBeFalse();
}

[Fact]
public async Task externally_owned_exchange_is_not_created_at_startup()
{
using var host = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq().AutoProvision();
opts.PublishMessage<ExtMessage>().ToRabbitExchange(_externalExchange).ExternallyOwned();
});

(await ExchangeExistsAsync(_externalExchange)).ShouldBeFalse();
}

[Fact]
public async Task owned_topology_is_still_created_alongside_externally_owned()
{
using var host = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq().AutoProvision();
opts.PublishMessage<ExtMessage>().ToRabbitQueue(_externalQueue).ExternallyOwned();
opts.PublishMessage<OwnedMessage>().ToRabbitQueue(_ownedQueue); // owned -> should be created
});

(await QueueExistsAsync(_ownedQueue)).ShouldBeTrue();
(await QueueExistsAsync(_externalQueue)).ShouldBeFalse();
}

// === Teardown: externally-owned topology survives ===

[Fact]
public async Task teardown_leaves_an_externally_owned_listener_queue_alone()
{
await PreCreateQueueAsync(_externalQueue);

using var host = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq();
opts.ListenToRabbitQueue(_externalQueue).ExternallyOwned();
});

var queue = host.Get<WolverineOptions>().RabbitMqTransport().Queues[_externalQueue];
await queue.TeardownAsync(NullLogger.Instance);

(await QueueExistsAsync(_externalQueue)).ShouldBeTrue();
}

[Fact]
public async Task teardown_leaves_an_externally_owned_exchange_alone()
{
await PreCreateExchangeAsync(_externalExchange);

using var host = await WolverineHost.ForAsync(opts =>
{
opts.UseRabbitMq();
opts.PublishMessage<ExtMessage>().ToRabbitExchange(_externalExchange).ExternallyOwned();
});

var exchange = host.Get<WolverineOptions>().RabbitMqTransport().Exchanges[_externalExchange];
await exchange.TeardownAsync(NullLogger.Instance);

(await ExchangeExistsAsync(_externalExchange)).ShouldBeTrue();
}

// === Q3: DeclarePassive exchanges must also survive teardown (no longer deleted) ===

[Fact]
public async Task teardown_leaves_a_declare_passive_exchange_alone()
{
await PreCreateExchangeAsync(_passiveExchange);

using var host = await WolverineHost.ForAsync(opts =>
{
opts.PublishMessage<ExtMessage>().ToRabbitExchange(_passiveExchange, ex => ex.DeclarePassive = true);
});

var exchange = host.Get<WolverineOptions>().RabbitMqTransport().Exchanges[_passiveExchange];
await exchange.TeardownAsync(NullLogger.Instance);

(await ExchangeExistsAsync(_passiveExchange)).ShouldBeTrue();
}
}

public record ExtMessage(string Name);

public record OwnedMessage(string Name);
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ internal RabbitMqEndpoint(Uri uri, EndpointRole role, RabbitMqTransport parent)

public string ExchangeName { get; protected set; } = string.Empty;

/// <summary>
/// When <c>true</c>, Wolverine treats this queue or exchange as owned by an external system: it
/// will not declare (create) it during startup or delete it during <c>resources teardown</c>, even
/// when <c>AutoProvision()</c> is enabled on the parent transport. Bindings owned by an
/// externally-owned queue/exchange are likewise left untouched. Use this when the calling identity
/// lacks the <c>configure</c>/<c>delete</c> permissions for the resource. Default is <c>false</c>.
/// See https://github.com/JasperFx/wolverine/issues/3064.
/// </summary>
public bool IsExternallyOwned { get; set; }

public abstract ValueTask<bool> CheckAsync();
public abstract ValueTask TeardownAsync(ILogger logger);
public abstract ValueTask SetupAsync(ILogger logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public override async ValueTask InitializeAsync(ILogger logger)
return;
}

if (_parent.AutoProvision && !DisableAutoProvision)
if (_parent.AutoProvision && !DisableAutoProvision && !IsExternallyOwned)
{
await _parent.WithAdminChannelAsync(model => DeclareAsync(model, logger));
}
Expand All @@ -142,7 +142,9 @@ internal override string RoutingKey()

internal async Task DeclareAsync(IChannel channel, ILogger logger)
{
if (DeclaredName == string.Empty)
// Externally-owned exchanges are declared/managed by another system; don't touch the broker
// here at all (neither the exchange nor its source-exchange bindings below). GH-3064.
if (DeclaredName == string.Empty || IsExternallyOwned)
{
return;
}
Expand Down Expand Up @@ -195,6 +197,15 @@ public override async ValueTask<bool> CheckAsync()

public override async ValueTask TeardownAsync(ILogger logger)
{
// Don't delete an exchange we don't own. IsExternallyOwned is the explicit flag for that;
// DeclarePassive is honored here too — it means "only verify existence, never create" at setup,
// so deleting on teardown would be asymmetric and would destroy a resource the caller chose not
// to create. Bindings are likewise left alone. GH-3064 (DeclarePassive teardown fix included).
if (IsExternallyOwned || DeclarePassive)
{
return;
}

await _parent.WithAdminChannelAsync(async channel =>
{
foreach (var binding in _exchangeBindings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ public override async ValueTask<bool> CheckAsync()

public override async ValueTask TeardownAsync(ILogger logger)
{
// This is a reply uri owned by another node, so get out of here
if (isSystemQueue() || AutoDelete)
// This is a reply uri owned by another node, so get out of here. Externally-owned queues
// belong to another system and must not be deleted (nor their bindings torn down). GH-3064.
if (isSystemQueue() || AutoDelete || IsExternallyOwned)
{
return;
}
Expand All @@ -133,7 +134,8 @@ await _parent.WithAdminChannelAsync(async channel =>

public override async ValueTask SetupAsync(ILogger logger)
{
if (isSystemQueue())
// Externally-owned queues are declared/managed by another system; don't try to create them. GH-3064.
if (isSystemQueue() || IsExternallyOwned)
{
return;
}
Expand Down Expand Up @@ -270,7 +272,9 @@ internal async ValueTask InitializeAsync(IChannel channel, ILogger logger)

if (_parent.AutoProvision || _parent.AutoPurgeAllQueues || PurgeOnStartup)
{
if (_parent.AutoProvision)
// Externally-owned queues (and their bindings) are managed by another system; skip the
// declare even when AutoProvision is on so startup doesn't fail without configure ACLs. GH-3064.
if (_parent.AutoProvision && !IsExternallyOwned)
{
await DeclareAsync(channel, logger);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ public bool DeclarePassive
get => _exchange.DeclarePassive;
set => _exchange.DeclarePassive = value;
}

/// <summary>
/// When <c>true</c>, Wolverine treats this exchange as owned by an external system: it will not
/// declare (create) it at startup or delete it during <c>resources teardown</c>, even when
/// <c>AutoProvision()</c> is enabled, and will not set up or tear down its bindings. Use this when
/// the calling identity lacks the <c>configure</c>/<c>delete</c> permissions for the exchange.
/// Distinct from <see cref="DeclarePassive"/>, which still touches the broker to verify existence
/// at startup. See https://github.com/JasperFx/wolverine/issues/3064.
/// </summary>
public bool IsExternallyOwned
{
get => _exchange.IsExternallyOwned;
set => _exchange.IsExternallyOwned = value;
}

public IDictionary<string, object?> Arguments => _exchange.Arguments;
public TopicBindingExchange BindTopic(string topicPattern)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ public RabbitMqListenerConfiguration PreFetchCount(ushort count)
return this;
}

/// <summary>
/// Mark this queue as owned by an external system. Wolverine will not declare (create) the queue
/// at startup or delete it during <c>resources teardown</c>, even when <c>AutoProvision()</c> is
/// enabled, and will not set up or tear down its bindings. Use this when the calling identity lacks
/// the <c>configure</c>/<c>delete</c> permissions for the queue. See https://github.com/JasperFx/wolverine/issues/3064.
/// </summary>
public RabbitMqListenerConfiguration ExternallyOwned()
{
add(e => e.IsExternallyOwned = true);
return this;
}

/// <summary>
/// Use a custom interoperability strategy to map Wolverine messages to an upstream
/// system's protocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@ public RabbitMqSubscriberConfiguration DisableDeadLetterQueueing()

return this;
}

/// <summary>
/// Mark the target queue or exchange as owned by an external system. Wolverine will not declare
/// (create) it at startup or delete it during <c>resources teardown</c>, even when
/// <c>AutoProvision()</c> is enabled, and will not set up or tear down its bindings. Use this when
/// the calling identity lacks the <c>configure</c>/<c>delete</c> permissions for the resource.
/// See https://github.com/JasperFx/wolverine/issues/3064.
/// </summary>
public RabbitMqSubscriberConfiguration ExternallyOwned()
{
add(e => e.IsExternallyOwned = true);
return this;
}
}

public class RabbitMqExchangeConfiguration : InteroperableSubscriberConfiguration<RabbitMqExchangeConfiguration, RabbitMqExchange, IRabbitMqEnvelopeMapper, RabbitMqEnvelopeMapper>
Expand Down Expand Up @@ -101,6 +114,23 @@ public RabbitMqExchangeConfiguration UseNServiceBusInterop()
return this;
}

/// <summary>
/// Modify the exchange type, the default is fan out
/// </summary>
/// <param name="exchangeType"></param>
/// <returns></returns>
/// <summary>
/// Mark this exchange as owned by an external system. Wolverine will not declare (create) it at
/// startup or delete it during <c>resources teardown</c>, even when <c>AutoProvision()</c> is
/// enabled, and will not set up or tear down its bindings. Use this when the calling identity lacks
/// the <c>configure</c>/<c>delete</c> permissions for the exchange. See https://github.com/JasperFx/wolverine/issues/3064.
/// </summary>
public RabbitMqExchangeConfiguration ExternallyOwned()
{
add(e => e.IsExternallyOwned = true);
return this;
}

/// <summary>
/// Modify the exchange type, the default is fan out
/// </summary>
Expand Down
Loading