From 6e39df06a2e5a864de7df276796f7e12015d52c6 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 10 Sep 2024 10:08:10 -0700 Subject: [PATCH] Require `IChannel` for `AsyncDefaultBasicConsumer` --- .../Networking/Networking_BasicDeliver_Commons.cs | 4 ++-- projects/RabbitMQ.Client/PublicAPI.Shipped.txt | 4 ---- .../client/api/AsyncDefaultBasicConsumer.cs | 13 +++---------- projects/Test/Integration/TestAsyncConsumer.cs | 13 ++++++++----- .../TestAsyncConsumerOperationDispatch.cs | 4 ++-- 5 files changed, 15 insertions(+), 23 deletions(-) diff --git a/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs b/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs index 717156d7c..d8e35ec56 100644 --- a/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs +++ b/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs @@ -14,7 +14,7 @@ public static async Task Publish_Hello_World(IConnection connection, uint messag using (IChannel channel = await connection.CreateChannelAsync()) { QueueDeclareOk queue = await channel.QueueDeclareAsync(); - var consumer = new CountingConsumer(messageCount); + var consumer = new CountingConsumer(channel, messageCount); await channel.BasicConsumeAsync(queue.QueueName, true, consumer); for (int i = 0; i < messageCount; i++) @@ -35,7 +35,7 @@ internal sealed class CountingConsumer : AsyncDefaultBasicConsumer public Task CompletedTask => _tcs.Task; - public CountingConsumer(uint messageCount) + public CountingConsumer(IChannel channel, uint messageCount) : base(channel) { _remainingCount = (int)messageCount; _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 36617b407..ee07572f5 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -96,15 +96,11 @@ RabbitMQ.Client.AmqpTimestamp.AmqpTimestamp() -> void RabbitMQ.Client.AmqpTimestamp.AmqpTimestamp(long unixTime) -> void RabbitMQ.Client.AmqpTimestamp.Equals(RabbitMQ.Client.AmqpTimestamp other) -> bool RabbitMQ.Client.AsyncDefaultBasicConsumer -RabbitMQ.Client.AsyncDefaultBasicConsumer.AsyncDefaultBasicConsumer() -> void RabbitMQ.Client.AsyncDefaultBasicConsumer.AsyncDefaultBasicConsumer(RabbitMQ.Client.IChannel channel) -> void RabbitMQ.Client.AsyncDefaultBasicConsumer.Channel.get -> RabbitMQ.Client.IChannel -RabbitMQ.Client.AsyncDefaultBasicConsumer.Channel.set -> void RabbitMQ.Client.AsyncDefaultBasicConsumer.ConsumerTags.get -> string[] RabbitMQ.Client.AsyncDefaultBasicConsumer.IsRunning.get -> bool -RabbitMQ.Client.AsyncDefaultBasicConsumer.IsRunning.set -> void RabbitMQ.Client.AsyncDefaultBasicConsumer.ShutdownReason.get -> RabbitMQ.Client.ShutdownEventArgs -RabbitMQ.Client.AsyncDefaultBasicConsumer.ShutdownReason.set -> void RabbitMQ.Client.BasicGetResult RabbitMQ.Client.BasicGetResult.BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey, uint messageCount, RabbitMQ.Client.IReadOnlyBasicProperties basicProperties, System.ReadOnlyMemory body) -> void RabbitMQ.Client.BasicProperties diff --git a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs index b48ed713b..7035754dd 100644 --- a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs @@ -9,13 +9,6 @@ public class AsyncDefaultBasicConsumer : IAsyncBasicConsumer { private readonly HashSet _consumerTags = new HashSet(); - /// - /// Creates a new instance of an . - /// - public AsyncDefaultBasicConsumer() - { - } - /// /// Constructor which sets the Channel property to the given value. /// @@ -40,19 +33,19 @@ public string[] ConsumerTags /// /// Returns true while the consumer is registered and expecting deliveries from the broker. /// - public bool IsRunning { get; protected set; } + public bool IsRunning { get; private set; } /// /// If our shuts down, this property will contain a description of the reason for the /// shutdown. Otherwise it will contain null. See . /// - public ShutdownEventArgs? ShutdownReason { get; protected set; } + public ShutdownEventArgs? ShutdownReason { get; private set; } /// /// Retrieve the this consumer is associated with, /// for use in acknowledging received messages, for instance. /// - public IChannel? Channel { get; set; } + public IChannel Channel { get; private set; } /// /// Called when the consumer is cancelled for reasons other than by a basicCancel: diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 8cab7d1ed..d07a21b27 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -54,7 +54,7 @@ public TestAsyncConsumer(ITestOutputHelper output) public async Task TestBasicRoundtripConcurrent() { AddCallbackExceptionHandlers(); - _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); + _channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output); QueueDeclareOk q = await _channel.QueueDeclareAsync(); @@ -147,7 +147,7 @@ public async Task TestBasicRoundtripConcurrent() public async Task TestBasicRoundtripConcurrentManyMessages() { AddCallbackExceptionHandlers(); - _channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output); + _channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output); const int publish_total = 4096; const int length = 512; @@ -205,7 +205,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() using (IChannel publishChannel = await publishConn.CreateChannelAsync()) { AddCallbackExceptionHandlers(publishConn, publishChannel); - publishChannel.DefaultConsumer = new DefaultAsyncConsumer("publishChannel,", _output); + publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel, + "publishChannel,", _output); publishChannel.ChannelShutdown += (o, ea) => { HandleChannelShutdown(publishChannel, ea, (args) => @@ -247,7 +248,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages() using (IChannel consumeChannel = await consumeConn.CreateChannelAsync()) { AddCallbackExceptionHandlers(consumeConn, consumeChannel); - consumeChannel.DefaultConsumer = new DefaultAsyncConsumer("consumeChannel,", _output); + consumeChannel.DefaultConsumer = new DefaultAsyncConsumer(consumeChannel, + "consumeChannel,", _output); consumeChannel.ChannelShutdown += (o, ea) => { HandleChannelShutdown(consumeChannel, ea, (args) => @@ -722,7 +724,8 @@ private class DefaultAsyncConsumer : AsyncDefaultBasicConsumer private readonly string _logPrefix; private readonly ITestOutputHelper _output; - public DefaultAsyncConsumer(string logPrefix, ITestOutputHelper output) + public DefaultAsyncConsumer(IChannel channel, string logPrefix, ITestOutputHelper output) + : base(channel) { _logPrefix = logPrefix; _output = output; diff --git a/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs index 1d5795e57..86cbb1d46 100644 --- a/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs @@ -182,7 +182,7 @@ public async Task TestChannelShutdownDoesNotShutDownDispatcher() private class ShutdownLatchConsumer : AsyncDefaultBasicConsumer { - public ShutdownLatchConsumer() + public ShutdownLatchConsumer(IChannel channel) : base(channel) { Latch = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); DuplicateLatch = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -211,7 +211,7 @@ public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArg public async Task TestChannelShutdownHandler() { string q = await _channel.QueueDeclareAsync(); - var consumer = new ShutdownLatchConsumer(); + var consumer = new ShutdownLatchConsumer(_channel); await _channel.BasicConsumeAsync(q, true, consumer); await _channel.CloseAsync();