Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace Mocha.Transport.RabbitMQ;

/// <summary>
/// Defines bus-level defaults that are applied to all auto-provisioned queues and exchanges
/// when they are created by topology conventions.
/// </summary>
public sealed class RabbitMQBusDefaults
{
/// <summary>
/// Gets or sets the default queue configuration that is applied to all auto-provisioned queues.
/// Individual queue settings will override these defaults.
/// </summary>
public RabbitMQDefaultQueueOptions Queue { get; set; } = new();

/// <summary>
/// Gets or sets the default exchange configuration that is applied to all auto-provisioned exchanges.
/// Individual exchange settings will override these defaults.
/// </summary>
public RabbitMQDefaultExchangeOptions Exchange { get; set; } = new();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
namespace Mocha.Transport.RabbitMQ;

/// <summary>
/// Default options for exchanges created by topology conventions.
/// </summary>
public sealed class RabbitMQDefaultExchangeOptions
{
/// <summary>
/// Gets or sets the default exchange type.
/// When set, all auto-provisioned exchanges will use this type unless explicitly overridden.
/// </summary>
public string? Type { get; set; }

/// <summary>
/// Gets or sets whether exchanges are durable by default.
/// Default is null (uses the RabbitMQ default of true).
/// </summary>
public bool? Durable { get; set; }

/// <summary>
/// Gets or sets whether exchanges are auto-deleted by default.
/// Default is null (uses the RabbitMQ default of false).
/// </summary>
public bool? AutoDelete { get; set; }

/// <summary>
/// Gets or sets additional default arguments applied to all auto-provisioned exchanges.
/// </summary>
public Dictionary<string, object> Arguments { get; set; } = new();

/// <summary>
/// Applies these defaults to an exchange configuration, without overriding explicitly set values.
/// </summary>
internal void ApplyTo(RabbitMQExchangeConfiguration configuration)
{
configuration.Type ??= Type;
configuration.Durable ??= Durable;
configuration.AutoDelete ??= AutoDelete;

if (Arguments.Count > 0)
{
configuration.Arguments ??= new Dictionary<string, object>();

foreach (var (key, value) in Arguments)
{
configuration.Arguments.TryAdd(key, value);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
namespace Mocha.Transport.RabbitMQ;

/// <summary>
/// Default options for queues created by topology conventions.
/// </summary>
public sealed class RabbitMQDefaultQueueOptions
{
/// <summary>
/// Gets or sets the default queue type (Classic, Quorum, or Stream).
/// When set, all auto-provisioned queues will use this queue type unless explicitly overridden.
/// </summary>
public string? QueueType { get; set; }

/// <summary>
/// Gets or sets whether queues are durable by default.
/// Default is null (uses the RabbitMQ default of true).
/// </summary>
public bool? Durable { get; set; }

/// <summary>
/// Gets or sets whether queues are auto-deleted by default.
/// Default is null (uses the RabbitMQ default of false).
/// </summary>
public bool? AutoDelete { get; set; }

/// <summary>
/// Gets or sets additional default arguments applied to all auto-provisioned queues.
/// </summary>
public Dictionary<string, object> Arguments { get; set; } = new();

/// <summary>
/// Applies these defaults to a queue configuration, without overriding explicitly set values.
/// Quorum and stream queue types are not applied to queues that have auto-delete or exclusive
/// set, since those properties are incompatible with these queue types. Default arguments are
/// also skipped for incompatible queues because they may contain queue-type-specific settings
/// (e.g. <c>x-delivery-limit</c> is only valid for quorum queues).
/// </summary>
internal void ApplyTo(RabbitMQQueueConfiguration configuration)
{
configuration.Durable ??= Durable;
configuration.AutoDelete ??= AutoDelete;

Comment thread
PascalSenn marked this conversation as resolved.
// Quorum and stream queues do not support auto-delete or exclusive properties.
// Skip applying queue type and default arguments for incompatible configurations
// (e.g. reply queues) since default arguments are often queue-type-specific.
var isIncompatibleWithQueueType =
configuration.AutoDelete is true || configuration.Exclusive is true;

if (isIncompatibleWithQueueType)
{
return;
}

if (Arguments.Count > 0 || QueueType is not null)
{
configuration.Arguments ??= new Dictionary<string, object>();

if (QueueType is not null)
{
configuration.Arguments.TryAdd("x-queue-type", QueueType);
}

foreach (var (key, value) in Arguments)
{
configuration.Arguments.TryAdd(key, value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public RabbitMQTransportConfiguration()
/// Gets or sets the explicitly declared bindings for this transport.
/// </summary>
public List<RabbitMQBindingConfiguration> Bindings { get; set; } = [];

/// <summary>
/// Gets or sets the bus-level defaults applied to all auto-provisioned queues and exchanges.
/// </summary>
public RabbitMQBusDefaults Defaults { get; set; } = new();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public interface IRabbitMQMessagingTransportDescriptor
IRabbitMQMessagingTransportDescriptor ConnectionProvider(
Func<IServiceProvider, IRabbitMQConnectionProvider> connectionFactory);

/// <summary>
/// Configures bus-level defaults that are applied to all auto-provisioned queues and exchanges.
/// </summary>
/// <param name="configure">A delegate that configures the bus defaults.</param>
/// <returns>The descriptor for method chaining.</returns>
IRabbitMQMessagingTransportDescriptor ConfigureDefaults(Action<RabbitMQBusDefaults> configure);

/// <summary>
/// Gets or creates a receive endpoint descriptor with the specified name.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ public IRabbitMQMessagingTransportDescriptor ConnectionProvider(
return this;
}

/// <inheritdoc />
public IRabbitMQMessagingTransportDescriptor ConfigureDefaults(Action<RabbitMQBusDefaults> configure)
{
configure(Configuration.Defaults);

return this;
}

/// <inheritdoc />
public IRabbitMQReceiveEndpointDescriptor Endpoint(string name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public MessageEnvelope Parse(BasicDeliverEventArgs eventArgs)
MessageType = props.Type ?? props.Headers?.GetString(RabbitMQMessageHeaders.MessageType),
SentAt = sentAt,
DeliverBy = ParseExpiration(props.Expiration, sentAt),
// TODO quorum queues can use x-delivery-count instead of redelivered!
DeliveryCount = eventArgs.Redelivered ? 1 : 0,
DeliveryCount = GetDeliveryCount(props.Headers, eventArgs.Redelivered),
Headers = BuildHeaders(props.Headers),
EnclosedMessageTypes = props.Headers?.GetStringArray(RabbitMQMessageHeaders.EnclosedMessageTypes) ?? [],
Body = eventArgs.Body
Expand All @@ -48,6 +47,22 @@ public MessageEnvelope Parse(BasicDeliverEventArgs eventArgs)
return envelope;
}

/// <summary>
/// Returns the delivery count from the quorum queue <c>x-delivery-count</c> header when
/// available; otherwise falls back to the classic queue <c>Redelivered</c> flag.
/// </summary>
private static int GetDeliveryCount(IDictionary<string, object?>? headers, bool redelivered)
{
if (headers is not null
&& headers.TryGetValue("x-delivery-count", out var value)
&& value is long count)
{
return count > int.MaxValue ? int.MaxValue : (int)count;
}

return redelivered ? 1 : 0;
}

private static DateTimeOffset? ParseExpiration(string? expiration, DateTimeOffset? sentAt)
{
if (string.IsNullOrEmpty(expiration) || !long.TryParse(expiration, out var ms))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ protected override void OnAfterInitialized(IMessagingSetupContext context)
Port = Connection.Port,
Path = Connection.VirtualHost
};
_topology = new RabbitMQMessagingTopology(this, builder.Uri);

_topology = new RabbitMQMessagingTopology(this, builder.Uri, configuration.Defaults);

foreach (var exchange in configuration.Exchanges)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public sealed class RabbitMQQueueConfiguration : TopologyConfiguration<RabbitMQM
/// When true, the queue is persisted to disk and will be restored after a broker restart.
/// Default is true.
/// </summary>
public bool? Durable { get; set; } = true;
public bool? Durable { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the queue can only be accessed by the connection that created it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ namespace Mocha.Transport.RabbitMQ;
/// Manages the RabbitMQ topology model (exchanges, queues, and bindings) for a transport instance,
/// providing thread-safe mutation and lookup of topology resources.
/// </summary>
public sealed class RabbitMQMessagingTopology(RabbitMQMessagingTransport transport, Uri baseAddress)
public sealed class RabbitMQMessagingTopology(
RabbitMQMessagingTransport transport,
Uri baseAddress,
RabbitMQBusDefaults defaults)
: MessagingTopology<RabbitMQMessagingTransport>(transport, baseAddress)
{
private readonly object _lock = new();
Expand All @@ -27,6 +30,11 @@ public sealed class RabbitMQMessagingTopology(RabbitMQMessagingTransport transpo
/// </summary>
public IReadOnlyList<RabbitMQBinding> Bindings => _bindings;

/// <summary>
/// Gets the bus-level defaults applied to all auto-provisioned queues and exchanges.
/// </summary>
public RabbitMQBusDefaults Defaults => defaults;

/// <summary>
/// Adds a new exchange to the topology, initializing it from the given configuration.
/// </summary>
Expand All @@ -46,6 +54,7 @@ public RabbitMQExchange AddExchange(RabbitMQExchangeConfiguration configuration)
exchange = new RabbitMQExchange();

configuration.Topology = this;
defaults.Exchange.ApplyTo(configuration);
exchange.Initialize(configuration);

_exchanges.Add(exchange);
Expand Down Expand Up @@ -75,6 +84,9 @@ public RabbitMQQueue AddQueue(RabbitMQQueueConfiguration configuration)
}

configuration.Topology = this;

defaults.Queue.ApplyTo(configuration);

queue = new RabbitMQQueue();
queue.Initialize(configuration);

Expand Down
Loading
Loading