Skip to content

Commit

Permalink
Introduce option
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Sep 26, 2024
1 parent 748ec8c commit 6d7e576
Show file tree
Hide file tree
Showing 22 changed files with 90 additions and 69 deletions.
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static class Constants
/// <summary>
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
/// to set this value for every channel created on a connection,
/// and <see cref="IConnection.CreateChannelAsync(bool, bool, ushort?, System.Threading.CancellationToken)" />
/// and <see cref="IConnection.CreateChannelAsync(CreateChannelOptions?, System.Threading.CancellationToken)" />
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;
Expand Down
33 changes: 33 additions & 0 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace RabbitMQ.Client
{
/// <summary>
/// Channel creation options.
/// </summary>
public sealed class CreateChannelOptions
{
/// <summary>
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
/// </summary>
public bool PublisherConfirmationsEnabled { get; set; } = false;

/// <summary>
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
/// </summary>
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;

/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
public ushort? ConsumerDispatchConcurrency { get; set; } = null;

/// <summary>
/// The default channel options.
/// </summary>
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
}
}
22 changes: 3 additions & 19 deletions projects/RabbitMQ.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,26 +240,10 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
/// <param name="publisherConfirmationsEnabled">
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
/// </param>
/// <param name="publisherConfirmationTrackingEnabled">
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
/// </param>
/// <param name="consumerDispatchConcurrency">
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// <param name="options">
/// The channel creation options.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default);
Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default, CancellationToken cancellationToken = default);
}
}
12 changes: 6 additions & 6 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,21 +256,21 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
ushort? consumerDispatchConcurrency = null,
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();

ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
options ??= CreateChannelOptions.Default;

ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);

RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cdc, cancellationToken)
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken)
.ConfigureAwait(false);

var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled);
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled);
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return autorecoveringChannel;
Expand Down
10 changes: 5 additions & 5 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,17 @@ await CloseAsync(ea, true,
}
}

public async Task<IChannel> CreateChannelAsync(bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
ushort? consumerDispatchConcurrency = null,
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();

options ??= CreateChannelOptions.Default;
ISession session = CreateSession();

// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
var channel = new Channel(_config, session, consumerDispatchConcurrency);
IChannel ch = await channel.OpenAsync(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken)
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken)
.ConfigureAwait(false);
return ch;
}
Expand Down
11 changes: 10 additions & 1 deletion projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
RabbitMQ.Client.CreateChannelOptions
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.set -> void
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason, string! prefix) -> void
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void
RabbitMQ.Client.IConnection.CreateChannelAsync(bool publisherConfirmationsEnabled = false, bool publisherConfirmationTrackingEnabled = false, ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!
3 changes: 1 addition & 2 deletions projects/Test/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer

publishTasks.Add(Task.Run(async () =>
{
using IChannel publishChannel = await publishConnection.CreateChannelAsync(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ static async Task PublishMessagesIndividuallyAsync()
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");

await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand All @@ -83,8 +82,7 @@ static async Task PublishMessagesInBatchAsync()
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");

await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand Down Expand Up @@ -136,8 +134,7 @@ async Task HandlePublishConfirmsAsynchronously()

// NOTE: setting trackConfirmations to false because this program
// is tracking them itself.
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: false);
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = false });

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand Down
3 changes: 1 addition & 2 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ public virtual async Task InitializeAsync()

if (_openChannel)
{
_channel = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
_channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
}

if (IsVerbose)
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Common/TestConnectionRecoveryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
{
using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync())
{
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
for (ushort i = 0; i < TotalMessageCount; i++)
{
Expand Down Expand Up @@ -342,7 +342,7 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag,

protected static async Task<bool> SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey)
{
using (IChannel ch = await conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
consumer.ReceivedAsync += MessageReceived;
await _channel.BasicConsumeAsync(queueName, true, consumer);

await using (IChannel pubCh = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
await pubCh.CloseAsync();
Expand All @@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,

await CloseAndWaitForRecoveryAsync();

await using (IChannel pubCh = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body);
await pubCh.CloseAsync();
Expand Down
6 changes: 3 additions & 3 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
});
return Task.CompletedTask;
};
await using (IChannel publishChannel = await publishConn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
await using (IChannel publishChannel = await publishConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
{
AddCallbackExceptionHandlers(publishConn, publishChannel);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel,
Expand Down Expand Up @@ -646,7 +646,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
var consumer1 = new AsyncEventingBasicConsumer(_channel);
consumer1.ReceivedAsync += async (sender, args) =>
{
await using IChannel innerChannel = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
await using IChannel innerChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
mandatory: true,
body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650)));
Expand Down Expand Up @@ -708,7 +708,7 @@ private async Task ValidateConsumerDispatchConcurrency()
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
await using IChannel ch = await _conn.CreateChannelAsync(
consumerDispatchConcurrency: expectedConsumerDispatchConcurrency);
new CreateChannelOptions { ConsumerDispatchConcurrency = expectedConsumerDispatchConcurrency});
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038()
await _channel.BasicConsumeAsync(queueName, false, consumer);

//publisher
await using IChannel publisherChannel = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
Expand Down
Loading

0 comments on commit 6d7e576

Please sign in to comment.