From c9bc2b159142728db9d278c6a3a8455198dd3d18 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 19:50:42 +0000 Subject: [PATCH 01/12] Event Args --- .../client/events/AsyncEventArgs.cs | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs b/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs new file mode 100644 index 000000000..2573c5cb8 --- /dev/null +++ b/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs @@ -0,0 +1,75 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. +//--------------------------------------------------------------------------- + +using System.Threading; + +namespace RabbitMQ.Client.Events +{ + /// + /// Provides data for + /// events that can be invoked asynchronously. + /// + public class AsyncEventArgs + { + /// + /// Initializes a new instance of the + /// class. + /// + /// + /// A cancellation token related to the original operation that raised + /// the event. It's important for your handler to pass this token + /// along to any asynchronous or long-running synchronous operations + /// that take a token so cancellation will correctly propagate. The + /// default value is . + /// + public AsyncEventArgs(CancellationToken cancellationToken = default) + { + CancellationToken = cancellationToken; + } + + /// + /// Gets a cancellation token related to the original operation that + /// raised the event. It's important for your handler to pass this + /// token along to any asynchronous or long-running synchronous + /// operations that take a token so cancellation (via something like + /// + /// new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token + /// + /// for example) will correctly propagate. + /// + public CancellationToken CancellationToken { get; } + + /// + /// Provides a value to use with events that do not have event data. + /// + public static readonly AsyncEventArgs Empty = new AsyncEventArgs(); + } +} From 17d2903769b6ad54a0eebb61b8369a7c7af70bef Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 19:51:38 +0000 Subject: [PATCH 02/12] Restrict AsyncEventHandler arguments --- projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs b/projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs index 014d035c6..c01d73b3b 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventHandler.cs @@ -33,5 +33,5 @@ namespace RabbitMQ.Client.Events { - public delegate Task AsyncEventHandler(object sender, TEvent @event); + public delegate Task AsyncEventHandler(object sender, TEvent @event) where TEvent : AsyncEventArgs; } From a0b3a030f74601dd7935749b8505a265b3d7ebd4 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 20:07:26 +0000 Subject: [PATCH 03/12] Adjust event args and declaration --- .../RabbitMQ.Client/client/api/IConnection.cs | 4 ++-- .../RabbitMQ.Client/client/api/IRecoverable.cs | 2 +- .../client/api/ShutdownEventArgs.cs | 16 ++++++++++------ .../client/events/AsyncEventArgs.cs | 1 + .../client/events/BasicAckEventArgs.cs | 7 ++++--- .../client/events/BasicDeliverEventArgs.cs | 6 ++++-- .../client/events/BasicNackEventArgs.cs | 7 ++++--- .../client/events/BasicReturnEventArgs.cs | 6 ++++-- .../client/events/CallbackExceptionEventArgs.cs | 10 ++++++---- .../client/events/ConnectionBlockedEventArgs.cs | 6 ++++-- .../events/ConnectionRecoveryErrorEventArgs.cs | 6 ++++-- .../client/events/ConsumerEventArgs.cs | 6 ++++-- .../ConsumerTagChangedAfterRecoveryEventArgs.cs | 7 +++++-- .../client/events/FlowControlEventArgs.cs | 6 ++++-- .../QueueNameChangedAfterRecoveryEventArgs.cs | 7 +++++-- .../client/events/RecoveringConsumerEventArgs.cs | 7 +++++-- .../client/impl/AsyncEventingWrapper.cs | 2 +- .../client/impl/AutorecoveringChannel.cs | 2 +- .../impl/AutorecoveringConnection.Recovery.cs | 2 +- .../client/impl/AutorecoveringConnection.cs | 8 ++++---- .../RabbitMQ.Client/client/impl/ChannelBase.cs | 8 ++++---- .../client/impl/Connection.Commands.cs | 2 +- .../RabbitMQ.Client/client/impl/Connection.cs | 8 ++++---- 23 files changed, 83 insertions(+), 53 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index 3ea28ac77..0fc6a4c11 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -161,7 +161,7 @@ public interface IConnection : INetworkConnection, IDisposable /// /// This event will never fire for connections that disable automatic recovery. /// - event AsyncEventHandler RecoverySucceededAsync; + event AsyncEventHandler RecoverySucceededAsync; /// /// Raised when the connection recovery fails, e.g. because reconnection or topology @@ -212,7 +212,7 @@ public interface IConnection : INetworkConnection, IDisposable /// /// Raised when a connection is unblocked by the AMQP broker. /// - event AsyncEventHandler ConnectionUnblockedAsync; + event AsyncEventHandler ConnectionUnblockedAsync; /// /// This method updates the secret used to authenticate this connection. diff --git a/projects/RabbitMQ.Client/client/api/IRecoverable.cs b/projects/RabbitMQ.Client/client/api/IRecoverable.cs index c8ce3edc4..82a8cd80d 100644 --- a/projects/RabbitMQ.Client/client/api/IRecoverable.cs +++ b/projects/RabbitMQ.Client/client/api/IRecoverable.cs @@ -39,6 +39,6 @@ namespace RabbitMQ.Client /// public interface IRecoverable { - event AsyncEventHandler RecoveryAsync; + event AsyncEventHandler RecoveryAsync; } } diff --git a/projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs b/projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs index 2d940ebb6..9f84b2347 100644 --- a/projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs +++ b/projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs @@ -30,6 +30,8 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; +using RabbitMQ.Client.Events; namespace RabbitMQ.Client { @@ -39,7 +41,8 @@ namespace RabbitMQ.Client /// /// The and properties should be used to determine the originator of the shutdown event. /// - public class ShutdownEventArgs : EventArgs + /// TODO: Should this be moved to the events folder and the namespace be adjusted? + public class ShutdownEventArgs : AsyncEventArgs { private readonly Exception? _exception; @@ -48,8 +51,8 @@ public class ShutdownEventArgs : EventArgs /// 0 for and . /// public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, - object? cause = null) - : this(initiator, replyCode, replyText, 0, 0, cause) + object? cause = null, CancellationToken cancellationToken = default) + : this(initiator, replyCode, replyText, 0, 0, cause, cancellationToken: cancellationToken) { } @@ -57,7 +60,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r /// Construct a with the given parameters. /// public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, - ushort classId, ushort methodId, object? cause = null) + ushort classId, ushort methodId, object? cause = null, CancellationToken cancellationToken = default) + : base(cancellationToken) { Initiator = initiator; ReplyCode = replyCode; @@ -70,8 +74,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r /// /// Construct a with the given parameters. /// - public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception) - : this(initiator, replyCode, replyText, 0, 0) + public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default) + : this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken) { _exception = exception ?? throw new ArgumentNullException(nameof(exception)); } diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs b/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs index 2573c5cb8..e6ac9272b 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs @@ -51,6 +51,7 @@ public class AsyncEventArgs /// default value is . /// public AsyncEventArgs(CancellationToken cancellationToken = default) + : base() { CancellationToken = cancellationToken; } diff --git a/projects/RabbitMQ.Client/client/events/BasicAckEventArgs.cs b/projects/RabbitMQ.Client/client/events/BasicAckEventArgs.cs index 6919a2622..09b3b3986 100644 --- a/projects/RabbitMQ.Client/client/events/BasicAckEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/BasicAckEventArgs.cs @@ -30,15 +30,16 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { ///Contains all the information about a message acknowledged ///from an AMQP broker within the Basic content-class. - public class BasicAckEventArgs : EventArgs + public class BasicAckEventArgs : AsyncEventArgs { - public BasicAckEventArgs(ulong deliveryTag, bool multiple) - : base() + public BasicAckEventArgs(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default) + : base(cancellationToken) { DeliveryTag = deliveryTag; Multiple = multiple; diff --git a/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs b/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs index c35c069c6..d862872c7 100644 --- a/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs @@ -30,12 +30,13 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { ///Contains all the information about a message delivered ///from an AMQP broker within the Basic content-class. - public class BasicDeliverEventArgs : EventArgs + public class BasicDeliverEventArgs : AsyncEventArgs { ///Constructor that fills the event's properties from ///its arguments. @@ -45,7 +46,8 @@ public BasicDeliverEventArgs(string consumerTag, string exchange, string routingKey, IReadOnlyBasicProperties properties, - ReadOnlyMemory body) : base() + ReadOnlyMemory body, + CancellationToken cancellationToken = default) : base(cancellationToken) { ConsumerTag = consumerTag; DeliveryTag = deliveryTag; diff --git a/projects/RabbitMQ.Client/client/events/BasicNackEventArgs.cs b/projects/RabbitMQ.Client/client/events/BasicNackEventArgs.cs index 292e42cfb..6d63318e5 100644 --- a/projects/RabbitMQ.Client/client/events/BasicNackEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/BasicNackEventArgs.cs @@ -30,15 +30,16 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { ///Contains all the information about a message nack'd ///from an AMQP broker within the Basic content-class. - public class BasicNackEventArgs : EventArgs + public class BasicNackEventArgs : AsyncEventArgs { - public BasicNackEventArgs(ulong deliveryTag, bool multiple, bool requeue) - : base() + public BasicNackEventArgs(ulong deliveryTag, bool multiple, bool requeue, CancellationToken cancellationToken = default) + : base(cancellationToken) { DeliveryTag = deliveryTag; Multiple = multiple; diff --git a/projects/RabbitMQ.Client/client/events/BasicReturnEventArgs.cs b/projects/RabbitMQ.Client/client/events/BasicReturnEventArgs.cs index 9598ea4f0..dd4c4e199 100644 --- a/projects/RabbitMQ.Client/client/events/BasicReturnEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/BasicReturnEventArgs.cs @@ -30,12 +30,13 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { ///Contains all the information about a message returned ///from an AMQP broker within the Basic content-class. - public class BasicReturnEventArgs : EventArgs + public class BasicReturnEventArgs : AsyncEventArgs { public BasicReturnEventArgs( ushort replyCode, @@ -43,7 +44,8 @@ public BasicReturnEventArgs( string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, - ReadOnlyMemory body) : base() + ReadOnlyMemory body, + CancellationToken cancellationToken = default) : base(cancellationToken) { ReplyCode = replyCode; ReplyText = replyText; diff --git a/projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs b/projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs index f0c04b44f..f7a5160bf 100644 --- a/projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs @@ -31,13 +31,15 @@ using System; using System.Collections.Generic; +using System.Threading; namespace RabbitMQ.Client.Events { - public abstract class BaseExceptionEventArgs : EventArgs + public abstract class BaseExceptionEventArgs : AsyncEventArgs { ///Wrap an exception thrown by a callback. - protected BaseExceptionEventArgs(IDictionary detail, Exception exception) + protected BaseExceptionEventArgs(IDictionary detail, Exception exception, CancellationToken cancellationToken = default) + : base(cancellationToken) { Detail = detail; Exception = exception; @@ -76,8 +78,8 @@ public class CallbackExceptionEventArgs : BaseExceptionEventArgs private const string ContextString = "context"; private const string ConsumerString = "consumer"; - public CallbackExceptionEventArgs(IDictionary detail, Exception exception) - : base(detail, exception) + public CallbackExceptionEventArgs(IDictionary detail, Exception exception, CancellationToken cancellationToken = default) + : base(detail, exception, cancellationToken) { } diff --git a/projects/RabbitMQ.Client/client/events/ConnectionBlockedEventArgs.cs b/projects/RabbitMQ.Client/client/events/ConnectionBlockedEventArgs.cs index bbb0a28a4..0a9b14c39 100644 --- a/projects/RabbitMQ.Client/client/events/ConnectionBlockedEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/ConnectionBlockedEventArgs.cs @@ -30,15 +30,17 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { /// /// Event relating to connection being blocked. /// - public class ConnectionBlockedEventArgs : EventArgs + public class ConnectionBlockedEventArgs : AsyncEventArgs { - public ConnectionBlockedEventArgs(string reason) + public ConnectionBlockedEventArgs(string reason, CancellationToken cancellationToken = default) + : base(cancellationToken) { Reason = reason; } diff --git a/projects/RabbitMQ.Client/client/events/ConnectionRecoveryErrorEventArgs.cs b/projects/RabbitMQ.Client/client/events/ConnectionRecoveryErrorEventArgs.cs index 39559641c..f956ec731 100644 --- a/projects/RabbitMQ.Client/client/events/ConnectionRecoveryErrorEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/ConnectionRecoveryErrorEventArgs.cs @@ -30,12 +30,14 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { - public sealed class ConnectionRecoveryErrorEventArgs : EventArgs + public sealed class ConnectionRecoveryErrorEventArgs : AsyncEventArgs { - public ConnectionRecoveryErrorEventArgs(Exception ex) + public ConnectionRecoveryErrorEventArgs(Exception ex, CancellationToken cancellationToken = default) + : base(cancellationToken) { Exception = ex; } diff --git a/projects/RabbitMQ.Client/client/events/ConsumerEventArgs.cs b/projects/RabbitMQ.Client/client/events/ConsumerEventArgs.cs index 4274be57c..cc972915c 100644 --- a/projects/RabbitMQ.Client/client/events/ConsumerEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/ConsumerEventArgs.cs @@ -30,16 +30,18 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { ///Event relating to a successful consumer registration ///or cancellation. - public class ConsumerEventArgs : EventArgs + public class ConsumerEventArgs : AsyncEventArgs { ///Construct an event containing the consumer-tags of ///the consumer the event relates to. - public ConsumerEventArgs(string[] consumerTags) + public ConsumerEventArgs(string[] consumerTags, CancellationToken cancellationToken = default) + : base(cancellationToken) { ConsumerTags = consumerTags; } diff --git a/projects/RabbitMQ.Client/client/events/ConsumerTagChangedAfterRecoveryEventArgs.cs b/projects/RabbitMQ.Client/client/events/ConsumerTagChangedAfterRecoveryEventArgs.cs index 2c2259335..80bdf6242 100644 --- a/projects/RabbitMQ.Client/client/events/ConsumerTagChangedAfterRecoveryEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/ConsumerTagChangedAfterRecoveryEventArgs.cs @@ -30,17 +30,20 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { - public sealed class ConsumerTagChangedAfterRecoveryEventArgs : EventArgs + public sealed class ConsumerTagChangedAfterRecoveryEventArgs : AsyncEventArgs { /// /// Initializes a new instance of the class. /// /// The tag before. /// The tag after. - public ConsumerTagChangedAfterRecoveryEventArgs(string tagBefore, string tagAfter) + /// The cancellation token. + public ConsumerTagChangedAfterRecoveryEventArgs(string tagBefore, string tagAfter, CancellationToken cancellationToken = default) + : base(cancellationToken) { TagBefore = tagBefore; TagAfter = tagAfter; diff --git a/projects/RabbitMQ.Client/client/events/FlowControlEventArgs.cs b/projects/RabbitMQ.Client/client/events/FlowControlEventArgs.cs index 5c246ab19..347f0e512 100644 --- a/projects/RabbitMQ.Client/client/events/FlowControlEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/FlowControlEventArgs.cs @@ -30,15 +30,17 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { /// /// Event relating to flow control. /// - public class FlowControlEventArgs : EventArgs + public class FlowControlEventArgs : AsyncEventArgs { - public FlowControlEventArgs(bool active) + public FlowControlEventArgs(bool active, CancellationToken cancellationToken = default) + : base(cancellationToken) { Active = active; } diff --git a/projects/RabbitMQ.Client/client/events/QueueNameChangedAfterRecoveryEventArgs.cs b/projects/RabbitMQ.Client/client/events/QueueNameChangedAfterRecoveryEventArgs.cs index bdeabcc77..d052d681e 100644 --- a/projects/RabbitMQ.Client/client/events/QueueNameChangedAfterRecoveryEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/QueueNameChangedAfterRecoveryEventArgs.cs @@ -30,17 +30,20 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; namespace RabbitMQ.Client.Events { - public sealed class QueueNameChangedAfterRecoveryEventArgs : EventArgs + public sealed class QueueNameChangedAfterRecoveryEventArgs : AsyncEventArgs { /// /// Initializes a new instance of the class. /// /// The name before. /// The name after. - public QueueNameChangedAfterRecoveryEventArgs(string nameBefore, string nameAfter) + /// The cancellation token. + public QueueNameChangedAfterRecoveryEventArgs(string nameBefore, string nameAfter, CancellationToken cancellationToken = default) + : base(cancellationToken) { NameBefore = nameBefore; NameAfter = nameAfter; diff --git a/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs b/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs index 7da24d90c..f2702cc37 100644 --- a/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs @@ -30,13 +30,14 @@ //--------------------------------------------------------------------------- using System.Collections.Generic; +using System.Threading; namespace RabbitMQ.Client.Events { /// /// Event related to consumer recovery, during automatic recovery. /// - public class RecoveringConsumerEventArgs + public class RecoveringConsumerEventArgs : AsyncEventArgs { /// /// Constructs an event containing the consumer arguments and consumer @@ -44,7 +45,9 @@ public class RecoveringConsumerEventArgs /// /// Consumer arguments of the consumer for this event /// Consumer tag of the consumer for this event - public RecoveringConsumerEventArgs(string consumerTag, IDictionary? consumerArguments) + /// The cancellation token. + public RecoveringConsumerEventArgs(string consumerTag, IDictionary? consumerArguments, CancellationToken cancellationToken = default) + : base(cancellationToken) { ConsumerTag = consumerTag; ConsumerArguments = consumerArguments; diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs index beecf229a..0c85ddb4c 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs @@ -4,7 +4,7 @@ namespace RabbitMQ.Client.Impl { - internal struct AsyncEventingWrapper + internal struct AsyncEventingWrapper where T : AsyncEventArgs { private event AsyncEventHandler? _event; private Delegate[]? _handlers; diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 3ce67ae3c..19aaf51fb 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -115,7 +115,7 @@ public event AsyncEventHandler ChannelShutdownAsync remove => InnerChannel.ChannelShutdownAsync -= value; } - public event AsyncEventHandler RecoveryAsync + public event AsyncEventHandler RecoveryAsync { add { InnerChannel.RecoveryAsync += value; } remove { InnerChannel.RecoveryAsync -= value; } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index 2a896065f..bb17fcb0b 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -206,7 +206,7 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c ESLog.Info("Connection recovery completed"); ThrowIfDisposed(); - await _recoverySucceededAsyncWrapper.InvokeAsync(this, EventArgs.Empty) + await _recoverySucceededAsyncWrapper.InvokeAsync(this, AsyncEventArgs.Empty) .ConfigureAwait(false); return true; diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index c7315eaf4..9ee7ce24f 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -69,7 +69,7 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end ConnectionShutdownAsync += HandleConnectionShutdownAsync; _recoverySucceededAsyncWrapper = - new AsyncEventingWrapper("OnConnectionRecovery", onExceptionAsync); + new AsyncEventingWrapper("OnConnectionRecovery", onExceptionAsync); _connectionRecoveryErrorAsyncWrapper = new AsyncEventingWrapper("OnConnectionRecoveryError", onExceptionAsync); @@ -99,12 +99,12 @@ await innerConnection.OpenAsync(cancellationToken) return connection; } - public event AsyncEventHandler RecoverySucceededAsync + public event AsyncEventHandler RecoverySucceededAsync { add => _recoverySucceededAsyncWrapper.AddHandler(value); remove => _recoverySucceededAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _recoverySucceededAsyncWrapper; + private AsyncEventingWrapper _recoverySucceededAsyncWrapper; public event AsyncEventHandler ConnectionRecoveryErrorAsync { @@ -131,7 +131,7 @@ public event AsyncEventHandler ConnectionShutdownAsync remove => InnerConnection.ConnectionShutdownAsync -= value; } - public event AsyncEventHandler ConnectionUnblockedAsync + public event AsyncEventHandler ConnectionUnblockedAsync { add => InnerConnection.ConnectionUnblockedAsync += value; remove => InnerConnection.ConnectionUnblockedAsync -= value; diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 3d94112ea..60e92bee0 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -88,7 +88,7 @@ protected ChannelBase(ConnectionConfig config, ISession session, new AsyncEventingWrapper(string.Empty, (exception, context) => Task.CompletedTask); _flowControlAsyncWrapper = new AsyncEventingWrapper("OnFlowControl", onExceptionAsync); _channelShutdownAsyncWrapper = new AsyncEventingWrapper("OnChannelShutdownAsync", onExceptionAsync); - _recoveryAsyncWrapper = new AsyncEventingWrapper("OnChannelRecovery", onExceptionAsync); + _recoveryAsyncWrapper = new AsyncEventingWrapper("OnChannelRecovery", onExceptionAsync); session.CommandReceived = HandleCommandAsync; session.SessionShutdownAsync += OnSessionShutdownAsync; Session = session; @@ -155,17 +155,17 @@ public event AsyncEventHandler ChannelShutdownAsync private AsyncEventingWrapper _channelShutdownAsyncWrapper; - public event AsyncEventHandler RecoveryAsync + public event AsyncEventHandler RecoveryAsync { add => _recoveryAsyncWrapper.AddHandler(value); remove => _recoveryAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _recoveryAsyncWrapper; + private AsyncEventingWrapper _recoveryAsyncWrapper; internal Task RunRecoveryEventHandlers(object sender) { - return _recoveryAsyncWrapper.InvokeAsync(sender, EventArgs.Empty); + return _recoveryAsyncWrapper.InvokeAsync(sender, AsyncEventArgs.Empty); } public int ChannelNumber => ((Session)Session).ChannelNumber; diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index 0a954b89e..40f68630d 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -67,7 +67,7 @@ internal Task HandleConnectionUnblockedAsync() { if (!_connectionUnblockedAsyncWrapper.IsEmpty) { - return _connectionUnblockedAsyncWrapper.InvokeAsync(this, EventArgs.Empty); + return _connectionUnblockedAsyncWrapper.InvokeAsync(this, AsyncEventArgs.Empty); } return Task.CompletedTask; } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 77aaed419..302c3df19 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) new AsyncEventingWrapper("OnConnectionBlocked", onExceptionAsync); _connectionUnblockedAsyncWrapper = - new AsyncEventingWrapper("OnConnectionUnblocked", onExceptionAsync); + new AsyncEventingWrapper("OnConnectionUnblocked", onExceptionAsync); _connectionShutdownAsyncWrapper = new AsyncEventingWrapper("OnShutdown", onExceptionAsync); @@ -142,12 +142,12 @@ public event AsyncEventHandler ConnectionBlockedAsyn } private AsyncEventingWrapper _connectionBlockedAsyncWrapper; - public event AsyncEventHandler ConnectionUnblockedAsync + public event AsyncEventHandler ConnectionUnblockedAsync { add => _connectionUnblockedAsyncWrapper.AddHandler(value); remove => _connectionUnblockedAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _connectionUnblockedAsyncWrapper; + private AsyncEventingWrapper _connectionUnblockedAsyncWrapper; public event AsyncEventHandler RecoveringConsumerAsync { @@ -182,7 +182,7 @@ public event AsyncEventHandler ConnectionShutdownAsync /// /// This event is never fired by non-recovering connections but it is a part of the interface. /// - public event AsyncEventHandler RecoverySucceededAsync + public event AsyncEventHandler RecoverySucceededAsync { add { } remove { } From fa1dc4b4da94111fabbedc4f94ca2fdc765ff0bf Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 20:09:54 +0000 Subject: [PATCH 04/12] Propagate token into wrapper --- .../client/events/CallbackExceptionEventArgs.cs | 9 +++++---- .../client/impl/AsyncEventingWrapper.cs | 13 +++++++------ .../client/impl/AutorecoveringConnection.cs | 4 ++-- projects/RabbitMQ.Client/client/impl/ChannelBase.cs | 6 +++--- projects/RabbitMQ.Client/client/impl/Connection.cs | 6 +++--- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs b/projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs index f7a5160bf..3d5b16a82 100644 --- a/projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/CallbackExceptionEventArgs.cs @@ -78,28 +78,29 @@ public class CallbackExceptionEventArgs : BaseExceptionEventArgs private const string ContextString = "context"; private const string ConsumerString = "consumer"; + // TODO Why is this public when there is a build method? public CallbackExceptionEventArgs(IDictionary detail, Exception exception, CancellationToken cancellationToken = default) : base(detail, exception, cancellationToken) { } - public static CallbackExceptionEventArgs Build(Exception e, string context) + public static CallbackExceptionEventArgs Build(Exception e, string context, CancellationToken cancellationToken = default) { var details = new Dictionary(1) { {ContextString, context} }; - return new CallbackExceptionEventArgs(details, e); + return new CallbackExceptionEventArgs(details, e, cancellationToken); } - public static CallbackExceptionEventArgs Build(Exception e, string context, object consumer) + public static CallbackExceptionEventArgs Build(Exception e, string context, object consumer, CancellationToken cancellationToken = default) { var details = new Dictionary(2) { {ContextString, context}, {ConsumerString, consumer} }; - return new CallbackExceptionEventArgs(details, e); + return new CallbackExceptionEventArgs(details, e, cancellationToken); } } } diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs index 0c85ddb4c..b0eb9de63 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Events; @@ -9,11 +10,11 @@ internal struct AsyncEventingWrapper where T : AsyncEventArgs private event AsyncEventHandler? _event; private Delegate[]? _handlers; private string? _context; - private Func? _onException; + private Func? _onException; public readonly bool IsEmpty => _event is null; - public AsyncEventingWrapper(string context, Func onException) + public AsyncEventingWrapper(string context, Func onException) { _event = null; _handlers = null; @@ -34,7 +35,7 @@ public void RemoveHandler(AsyncEventHandler? handler) } // Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied) - public Task InvokeAsync(object sender, T parameter) + public Task InvokeAsync(object sender, T parameter, CancellationToken cancellationToken = default) { Delegate[]? handlers = _handlers; if (handlers is null) @@ -48,10 +49,10 @@ public Task InvokeAsync(object sender, T parameter) _handlers = handlers; } - return InternalInvoke(handlers, sender, parameter); + return InternalInvoke(handlers, sender, parameter, cancellationToken); } - private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter) + private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter, CancellationToken cancellationToken) { foreach (AsyncEventHandler action in handlers) { @@ -64,7 +65,7 @@ await action(sender, parameter) { if (_onException != null) { - await _onException(exception, _context!) + await _onException(exception, _context!, cancellationToken) .ConfigureAwait(false); } else diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 9ee7ce24f..19ee70efe 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -83,8 +83,8 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end _recoveringConsumerAsyncWrapper = new AsyncEventingWrapper("OnRecoveringConsumer", onExceptionAsync); - Task onExceptionAsync(Exception exception, string context) => - _innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context)); + Task onExceptionAsync(Exception exception, string context, CancellationToken cancellationToken) => + _innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken)); } internal static async ValueTask CreateAsync(ConnectionConfig config, IEndpointResolver endpoints, diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 60e92bee0..447a2ec8b 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -79,13 +79,13 @@ protected ChannelBase(ConnectionConfig config, ISession session, ContinuationTimeout = config.ContinuationTimeout; ConsumerDispatcher = new AsyncConsumerDispatcher(this, perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency)); - Func onExceptionAsync = (exception, context) => - OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context)); + Func onExceptionAsync = (exception, context, cancellationToken) => + OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken)); _basicAcksAsyncWrapper = new AsyncEventingWrapper("OnBasicAck", onExceptionAsync); _basicNacksAsyncWrapper = new AsyncEventingWrapper("OnBasicNack", onExceptionAsync); _basicReturnAsyncWrapper = new AsyncEventingWrapper("OnBasicReturn", onExceptionAsync); _callbackExceptionAsyncWrapper = - new AsyncEventingWrapper(string.Empty, (exception, context) => Task.CompletedTask); + new AsyncEventingWrapper(string.Empty, (exception, context, cancellationToken) => Task.CompletedTask); _flowControlAsyncWrapper = new AsyncEventingWrapper("OnFlowControl", onExceptionAsync); _channelShutdownAsyncWrapper = new AsyncEventingWrapper("OnChannelShutdownAsync", onExceptionAsync); _recoveryAsyncWrapper = new AsyncEventingWrapper("OnChannelRecovery", onExceptionAsync); diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 302c3df19..f87184742 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -66,7 +66,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _callbackExceptionAsyncWrapper = new AsyncEventingWrapper(string.Empty, - (exception, context) => Task.CompletedTask); + (exception, context, cancellationToken) => Task.CompletedTask); _connectionBlockedAsyncWrapper = new AsyncEventingWrapper("OnConnectionBlocked", onExceptionAsync); @@ -89,8 +89,8 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _mainLoopTask = Task.CompletedTask; - Task onExceptionAsync(Exception exception, string context) => - OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context)); + Task onExceptionAsync(Exception exception, string context, CancellationToken cancellationToken) => + OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context, cancellationToken)); } public Guid Id => _id; From b77319645987edd371fcf354867bbb17d1053c0f Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 20:30:21 +0000 Subject: [PATCH 05/12] Propagate token into consumer --- .../client/api/AsyncDefaultBasicConsumer.cs | 24 ++++++++++++------- .../client/api/IAsyncBasicConsumer.cs | 15 ++++++++---- .../events/AsyncEventingBasicConsumer.cs | 21 ++++++++-------- .../ConsumerDispatching/FallbackConsumer.cs | 9 +++---- .../Test/Integration/TestAsyncConsumer.cs | 4 ++-- 5 files changed, 43 insertions(+), 30 deletions(-) diff --git a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs index 7035754dd..e9bc07b0f 100644 --- a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Client @@ -45,7 +46,7 @@ public string[] ConsumerTags /// Retrieve the this consumer is associated with, /// for use in acknowledging received messages, for instance. /// - public IChannel Channel { get; private set; } + public IChannel Channel { get; } /// /// Called when the consumer is cancelled for reasons other than by a basicCancel: @@ -53,25 +54,28 @@ public string[] ConsumerTags /// See for notification of consumer cancellation due to basicCancel /// /// Consumer tag this consumer is registered. - public virtual Task HandleBasicCancelAsync(string consumerTag) + /// The cancellation token. + public virtual Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default) { - return OnCancel(consumerTag); + return OnCancelAsync(new []{ consumerTag }, cancellationToken); } /// /// Called upon successful deregistration of the consumer from the broker. /// /// Consumer tag this consumer is registered. - public virtual Task HandleBasicCancelOkAsync(string consumerTag) + /// The cancellation token. + public virtual Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default) { - return OnCancel(consumerTag); + return OnCancelAsync(new []{ consumerTag }, cancellationToken); } /// /// Called upon successful registration of the consumer with the broker. /// /// Consumer tag this consumer is registered. - public virtual Task HandleBasicConsumeOkAsync(string consumerTag) + /// The cancellation token. + public virtual Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default) { _consumerTags.Add(consumerTag); IsRunning = true; @@ -94,7 +98,8 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag, string exchange, string routingKey, IReadOnlyBasicProperties properties, - ReadOnlyMemory body) + ReadOnlyMemory body, + CancellationToken cancellationToken = default) { // Nothing to do here. return Task.CompletedTask; @@ -108,16 +113,17 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag, public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) { ShutdownReason = reason; - return OnCancel(_consumerTags.ToArray()); + return OnCancelAsync(ConsumerTags, reason.CancellationToken); } /// /// Default implementation - overridable in subclasses. /// The set of consumer tags that where cancelled + /// The cancellation token. /// /// This default implementation simply sets the property to false, and takes no further action. /// - public virtual Task OnCancel(params string[] consumerTags) + protected virtual Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) { IsRunning = false; foreach (string consumerTag in consumerTags) diff --git a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs index e1bf604ff..eb1d006d3 100644 --- a/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Client @@ -20,19 +21,22 @@ public interface IAsyncBasicConsumer /// See for notification of consumer cancellation due to basicCancel /// /// Consumer tag this consumer is registered. - Task HandleBasicCancelAsync(string consumerTag); + /// The cancellation token. + Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default); /// /// Called upon successful deregistration of the consumer from the broker. /// /// Consumer tag this consumer is registered. - Task HandleBasicCancelOkAsync(string consumerTag); + /// The cancellation token. + Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default); /// /// Called upon successful registration of the consumer with the broker. /// /// Consumer tag this consumer is registered. - Task HandleBasicConsumeOkAsync(string consumerTag); + /// The cancellation token. + Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default); /// /// Called each time a message arrives for this consumer. @@ -44,7 +48,7 @@ public interface IAsyncBasicConsumer /// /// /// NOTE: Using the body outside of - /// + /// /// requires that it be copied! /// /// @@ -55,7 +59,8 @@ Task HandleBasicDeliverAsync(string consumerTag, string exchange, string routingKey, IReadOnlyBasicProperties properties, - ReadOnlyMemory body); + ReadOnlyMemory body, + CancellationToken cancellationToken = default); /// /// Called when the channel shuts down. diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index 9b88c51f9..949467faf 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Impl; @@ -51,37 +52,37 @@ public event AsyncEventHandler Unregistered private AsyncEventingWrapper _unregisteredWrapper; ///Fires when the server confirms successful consumer cancellation. - public override async Task OnCancel(params string[] consumerTags) + protected override async Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) { - await base.OnCancel(consumerTags) + await base.OnCancelAsync(consumerTags, cancellationToken) .ConfigureAwait(false); if (!_unregisteredWrapper.IsEmpty) { - await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags)) + await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags, cancellationToken), cancellationToken) .ConfigureAwait(false); } } ///Fires when the server confirms successful consumer registration. - public override async Task HandleBasicConsumeOkAsync(string consumerTag) + public override async Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default) { - await base.HandleBasicConsumeOkAsync(consumerTag) + await base.HandleBasicConsumeOkAsync(consumerTag, cancellationToken) .ConfigureAwait(false); if (!_registeredWrapper.IsEmpty) { - await _registeredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag })) + await _registeredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag }, cancellationToken), cancellationToken) .ConfigureAwait(false); } } ///Fires the Received event. public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) + IReadOnlyBasicProperties properties, ReadOnlyMemory body, CancellationToken cancellationToken = default) { - var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body, cancellationToken); // No need to call base, it's empty. - return _receivedWrapper.InvokeAsync(this, deliverEventArgs); + return _receivedWrapper.InvokeAsync(this, deliverEventArgs, cancellationToken); } ///Fires the Shutdown event. @@ -91,7 +92,7 @@ await base.HandleChannelShutdownAsync(channel, reason) .ConfigureAwait(false); if (!_shutdownWrapper.IsEmpty) { - await _shutdownWrapper.InvokeAsync(this, reason) + await _shutdownWrapper.InvokeAsync(this, reason, reason.CancellationToken) .ConfigureAwait(false); } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs index ab5f7c203..061be4d5b 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/FallbackConsumer.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Logging; @@ -8,26 +9,26 @@ internal sealed class FallbackConsumer : IAsyncBasicConsumer { public IChannel? Channel { get; } = null; - Task IAsyncBasicConsumer.HandleBasicCancelAsync(string consumerTag) + Task IAsyncBasicConsumer.HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken) { ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicCancelAsync)} for tag {consumerTag}"); return Task.CompletedTask; } - Task IAsyncBasicConsumer.HandleBasicCancelOkAsync(string consumerTag) + Task IAsyncBasicConsumer.HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken) { ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicCancelOkAsync)} for tag {consumerTag}"); return Task.CompletedTask; } - Task IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag) + Task IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken) { ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicConsumeOkAsync)} for tag {consumerTag}"); return Task.CompletedTask; } Task IAsyncBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) + IReadOnlyBasicProperties properties, ReadOnlyMemory body, CancellationToken cancellationToken) { ESLog.Info($"Unhandled {nameof(IAsyncBasicConsumer.HandleBasicDeliverAsync)} for tag {consumerTag}"); return Task.CompletedTask; diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index f5a45364c..25dcf5801 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -811,10 +811,10 @@ public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArg return base.HandleChannelShutdownAsync(channel, reason); } - public override Task OnCancel(params string[] consumerTags) + public override Task OnCancelAsync(params string[] consumerTags) { _output.WriteLine("[ERROR] {0} OnCancel {1}", _logPrefix, consumerTags[0]); - return base.OnCancel(consumerTags); + return base.OnCancelAsync(consumerTags); } } } From c2f4c19c14c3d6c2cd8c96f554eca1746ddb5cb6 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 20:36:02 +0000 Subject: [PATCH 06/12] Simplify wrapper to take token from arg --- .../events/AsyncEventingBasicConsumer.cs | 8 +++---- .../client/impl/AsyncEventingWrapper.cs | 22 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index 949467faf..9c714fd9f 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -58,7 +58,7 @@ await base.OnCancelAsync(consumerTags, cancellationToken) .ConfigureAwait(false); if (!_unregisteredWrapper.IsEmpty) { - await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags, cancellationToken), cancellationToken) + await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags, cancellationToken)) .ConfigureAwait(false); } } @@ -70,7 +70,7 @@ await base.HandleBasicConsumeOkAsync(consumerTag, cancellationToken) .ConfigureAwait(false); if (!_registeredWrapper.IsEmpty) { - await _registeredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag }, cancellationToken), cancellationToken) + await _registeredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag }, cancellationToken)) .ConfigureAwait(false); } } @@ -82,7 +82,7 @@ public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryT var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body, cancellationToken); // No need to call base, it's empty. - return _receivedWrapper.InvokeAsync(this, deliverEventArgs, cancellationToken); + return _receivedWrapper.InvokeAsync(this, deliverEventArgs); } ///Fires the Shutdown event. @@ -92,7 +92,7 @@ await base.HandleChannelShutdownAsync(channel, reason) .ConfigureAwait(false); if (!_shutdownWrapper.IsEmpty) { - await _shutdownWrapper.InvokeAsync(this, reason, reason.CancellationToken) + await _shutdownWrapper.InvokeAsync(this, reason) .ConfigureAwait(false); } } diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs index b0eb9de63..c66afd00d 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs @@ -5,9 +5,9 @@ namespace RabbitMQ.Client.Impl { - internal struct AsyncEventingWrapper where T : AsyncEventArgs + internal struct AsyncEventingWrapper where TEvent : AsyncEventArgs { - private event AsyncEventHandler? _event; + private event AsyncEventHandler? _event; private Delegate[]? _handlers; private string? _context; private Func? _onException; @@ -22,20 +22,20 @@ public AsyncEventingWrapper(string context, Func? handler) + public void AddHandler(AsyncEventHandler? handler) { _event += handler; _handlers = null; } - public void RemoveHandler(AsyncEventHandler? handler) + public void RemoveHandler(AsyncEventHandler? handler) { _event -= handler; _handlers = null; } // Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied) - public Task InvokeAsync(object sender, T parameter, CancellationToken cancellationToken = default) + public Task InvokeAsync(object sender, TEvent parameter) { Delegate[]? handlers = _handlers; if (handlers is null) @@ -49,23 +49,23 @@ public Task InvokeAsync(object sender, T parameter, CancellationToken cancellati _handlers = handlers; } - return InternalInvoke(handlers, sender, parameter, cancellationToken); + return InternalInvoke(handlers, sender, parameter); } - private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter, CancellationToken cancellationToken) + private readonly async Task InternalInvoke(Delegate[] handlers, object sender, TEvent @event) { - foreach (AsyncEventHandler action in handlers) + foreach (AsyncEventHandler action in handlers) { try { - await action(sender, parameter) + await action(sender, @event) .ConfigureAwait(false); } catch (Exception exception) { if (_onException != null) { - await _onException(exception, _context!, cancellationToken) + await _onException(exception, _context!, @event.CancellationToken) .ConfigureAwait(false); } else @@ -76,7 +76,7 @@ await _onException(exception, _context!, cancellationToken) } } - public void Takeover(in AsyncEventingWrapper other) + public void Takeover(in AsyncEventingWrapper other) { _event = other._event; _handlers = other._handlers; From 75e87e483ef3afa4704aed379afc14014d8ed00f Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 20:51:43 +0000 Subject: [PATCH 07/12] Pass through token into args on declaration --- .../client/events/AsyncEventArgs.cs | 10 ++++++++++ .../RabbitMQ.Client/client/framing/Channel.cs | 2 +- .../client/impl/AutorecoveringChannel.cs | 2 +- .../impl/AutorecoveringConnection.Recovery.cs | 10 +++++----- .../client/impl/ChannelBase.cs | 20 +++++++++---------- .../client/impl/Connection.Commands.cs | 8 ++++---- 6 files changed, 31 insertions(+), 21 deletions(-) diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs b/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs index e6ac9272b..1426731dc 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs @@ -68,6 +68,16 @@ public AsyncEventArgs(CancellationToken cancellationToken = default) /// public CancellationToken CancellationToken { get; } + public static AsyncEventArgs CreateOrDefault(CancellationToken cancellationToken) + { + if (cancellationToken.CanBeCanceled) + { + return new AsyncEventArgs(cancellationToken); + } + + return Empty; + } + /// /// Provides a value to use with events that do not have event data. /// diff --git a/projects/RabbitMQ.Client/client/framing/Channel.cs b/projects/RabbitMQ.Client/client/framing/Channel.cs index 7456f876a..31aa1b56b 100644 --- a/projects/RabbitMQ.Client/client/framing/Channel.cs +++ b/projects/RabbitMQ.Client/client/framing/Channel.cs @@ -98,7 +98,7 @@ protected override Task DispatchCommandAsync(IncomingCommand cmd, Cancella case ProtocolCommandId.BasicReturn: { // Note: always returns true - return HandleBasicReturn(cmd); + return HandleBasicReturn(cmd, cancellationToken); } case ProtocolCommandId.ChannelClose: { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 19aaf51fb..ce88dd71a 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -213,7 +213,7 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph .ConfigureAwait(false); } - await _innerChannel.RunRecoveryEventHandlers(this) + await _innerChannel.RunRecoveryEventHandlers(this, cancellationToken) .ConfigureAwait(false); return true; diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index bb17fcb0b..d3e086ae7 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -206,7 +206,7 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c ESLog.Info("Connection recovery completed"); ThrowIfDisposed(); - await _recoverySucceededAsyncWrapper.InvokeAsync(this, AsyncEventArgs.Empty) + await _recoverySucceededAsyncWrapper.InvokeAsync(this, AsyncEventArgs.CreateOrDefault(cancellationToken)) .ConfigureAwait(false); return true; @@ -272,7 +272,7 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken) if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty) { // Note: recordedEntities semaphore is _NOT_ held at this point - await _connectionRecoveryErrorAsyncWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e)) + await _connectionRecoveryErrorAsyncWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e, cancellationToken)) .ConfigureAwait(false); } @@ -386,7 +386,7 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue), try { _recordedEntitiesSemaphore.Release(); - await _queueNameChangedAfterRecoveryAsyncWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName)) + await _queueNameChangedAfterRecoveryAsyncWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName, cancellationToken)) .ConfigureAwait(false); } finally @@ -520,7 +520,7 @@ internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRe try { _recordedEntitiesSemaphore.Release(); - await _recoveringConsumerAsyncWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments)) + await _recoveringConsumerAsyncWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments, cancellationToken)) .ConfigureAwait(false); } finally @@ -542,7 +542,7 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken) try { _recordedEntitiesSemaphore.Release(); - await _consumerTagChangeAfterRecoveryAsyncWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag)) + await _consumerTagChangeAfterRecoveryAsyncWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag, cancellationToken)) .ConfigureAwait(false); } finally diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 447a2ec8b..6907a93fb 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -163,9 +163,9 @@ public event AsyncEventHandler RecoveryAsync private AsyncEventingWrapper _recoveryAsyncWrapper; - internal Task RunRecoveryEventHandlers(object sender) + internal Task RunRecoveryEventHandlers(object sender, CancellationToken cancellationToken) { - return _recoveryAsyncWrapper.InvokeAsync(sender, AsyncEventArgs.Empty); + return _recoveryAsyncWrapper.InvokeAsync(sender, AsyncEventArgs.CreateOrDefault(cancellationToken)); } public int ChannelNumber => ((Session)Session).ChannelNumber; @@ -494,7 +494,7 @@ await _channelShutdownAsyncWrapper.InvokeAsync(this, reason) if (ConfirmsAreEnabled) { - await _confirmSemaphore.WaitAsync() + await _confirmSemaphore.WaitAsync(reason.CancellationToken) .ConfigureAwait(false); try { @@ -582,7 +582,7 @@ protected async Task HandleBasicAck(IncomingCommand cmd, CancellationToken var ack = new BasicAck(cmd.MethodSpan); if (!_basicAcksAsyncWrapper.IsEmpty) { - var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple); + var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple, cancellationToken); await _basicAcksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } @@ -598,7 +598,7 @@ protected async Task HandleBasicNack(IncomingCommand cmd, CancellationToke if (!_basicNacksAsyncWrapper.IsEmpty) { var args = new BasicNackEventArgs( - nack._deliveryTag, nack._multiple, nack._requeue); + nack._deliveryTag, nack._multiple, nack._requeue, cancellationToken); await _basicNacksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } @@ -641,14 +641,14 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag) return deliveryTag; } - protected async Task HandleBasicReturn(IncomingCommand cmd) + protected async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken) { if (!_basicReturnAsyncWrapper.IsEmpty) { var basicReturn = new BasicReturn(cmd.MethodSpan); var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText, basicReturn._exchange, basicReturn._routingKey, - new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory); + new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory, cancellationToken); await _basicReturnAsyncWrapper.InvokeAsync(this, e) .ConfigureAwait(false); } @@ -713,7 +713,7 @@ await ModelSendAsync(in method, cancellationToken). if (!_flowControlAsyncWrapper.IsEmpty) { - await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active)) + await _flowControlAsyncWrapper.InvokeAsync(this, new FlowControlEventArgs(active, cancellationToken)) .ConfigureAwait(false); } @@ -723,7 +723,7 @@ await ModelSendAsync(in method, cancellationToken). protected async Task HandleConnectionBlockedAsync(IncomingCommand cmd, CancellationToken cancellationToken) { string reason = new ConnectionBlocked(cmd.MethodSpan)._reason; - await Session.Connection.HandleConnectionBlockedAsync(reason) + await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken) .ConfigureAwait(false); return true; } @@ -801,7 +801,7 @@ await k.HandleCommandAsync(cmd) protected async Task HandleConnectionUnblockedAsync(CancellationToken cancellationToken) { - await Session.Connection.HandleConnectionUnblockedAsync() + await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken) .ConfigureAwait(false); return true; } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs index 40f68630d..08bbfdbae 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Commands.cs @@ -54,20 +54,20 @@ internal void NotifyReceivedCloseOk() _closed = true; } - internal Task HandleConnectionBlockedAsync(string reason) + internal Task HandleConnectionBlockedAsync(string reason, CancellationToken cancellationToken) { if (!_connectionBlockedAsyncWrapper.IsEmpty) { - return _connectionBlockedAsyncWrapper.InvokeAsync(this, new ConnectionBlockedEventArgs(reason)); + return _connectionBlockedAsyncWrapper.InvokeAsync(this, new ConnectionBlockedEventArgs(reason, cancellationToken)); } return Task.CompletedTask; } - internal Task HandleConnectionUnblockedAsync() + internal Task HandleConnectionUnblockedAsync(CancellationToken cancellationToken) { if (!_connectionUnblockedAsyncWrapper.IsEmpty) { - return _connectionUnblockedAsyncWrapper.InvokeAsync(this, AsyncEventArgs.Empty); + return _connectionUnblockedAsyncWrapper.InvokeAsync(this, AsyncEventArgs.CreateOrDefault(cancellationToken)); } return Task.CompletedTask; } From 66acbd5dc38d97adfdb1f3f482a3a80d8a34d05b Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 20:52:11 +0000 Subject: [PATCH 08/12] Address compiler warning --- projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs index c66afd00d..f7564dc77 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs @@ -12,7 +12,7 @@ internal struct AsyncEventingWrapper where TEvent : AsyncEventArgs private string? _context; private Func? _onException; - public readonly bool IsEmpty => _event is null; + public bool IsEmpty => _event is null; public AsyncEventingWrapper(string context, Func onException) { From 9b8c8e801f66f46c01baa0a1001cc341f8dea8ed Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 18 Sep 2024 20:54:24 +0000 Subject: [PATCH 09/12] Rename --- .../client/impl/AsyncEventingWrapper.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs index f7564dc77..7599129ac 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncEventingWrapper.cs @@ -7,16 +7,16 @@ namespace RabbitMQ.Client.Impl { internal struct AsyncEventingWrapper where TEvent : AsyncEventArgs { - private event AsyncEventHandler? _event; + private event AsyncEventHandler? _eventHandler; private Delegate[]? _handlers; private string? _context; private Func? _onException; - public bool IsEmpty => _event is null; + public bool IsEmpty => _eventHandler is null; public AsyncEventingWrapper(string context, Func onException) { - _event = null; + _eventHandler = null; _handlers = null; _context = context; _onException = onException; @@ -24,13 +24,13 @@ public AsyncEventingWrapper(string context, Func? handler) { - _event += handler; + _eventHandler += handler; _handlers = null; } public void RemoveHandler(AsyncEventHandler? handler) { - _event -= handler; + _eventHandler -= handler; _handlers = null; } @@ -40,7 +40,7 @@ public Task InvokeAsync(object sender, TEvent parameter) Delegate[]? handlers = _handlers; if (handlers is null) { - handlers = _event?.GetInvocationList(); + handlers = _eventHandler?.GetInvocationList(); if (handlers is null) { return Task.CompletedTask; @@ -78,7 +78,7 @@ await _onException(exception, _context!, @event.CancellationToken) public void Takeover(in AsyncEventingWrapper other) { - _event = other._event; + _eventHandler = other._eventHandler; _handlers = other._handlers; _context = other._context; _onException = other._onException; From 4be03673c0e0a52405a7755afc2aaabe92b0ddf7 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Wed, 18 Sep 2024 21:05:13 -0700 Subject: [PATCH 10/12] Ensure that PublicAPI.Shipped.txt contains new signatures and method names. Ensure code compiles. --- .../AsyncBasicConsumerFake.cs | 13 ++-- .../Networking_BasicDeliver_Commons.cs | 4 +- .../RabbitMQ.Client/PublicAPI.Shipped.txt | 71 ++++++++++--------- .../client/api/AsyncDefaultBasicConsumer.cs | 9 +-- .../Test/Common/TestConnectionRecoveryBase.cs | 23 +++--- .../Test/Integration/TestAsyncConsumer.cs | 27 ++++--- .../TestAsyncConsumerCancelNotify.cs | 6 +- .../TestAsyncConsumerExceptions.cs | 12 ++-- .../TestAsyncConsumerOperationDispatch.cs | 6 +- projects/Test/Integration/TestMainLoop.cs | 4 +- .../TestConnectionBlocked.cs | 2 +- .../TestConnectionBlockedChannelLeak.cs | 2 +- 12 files changed, 106 insertions(+), 73 deletions(-) diff --git a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs index caf408be0..31a0e83bf 100644 --- a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs +++ b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs @@ -17,8 +17,9 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent) _autoResetEvent = autoResetEvent; } - public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) + public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, + string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) { if (Interlocked.Increment(ref _current) == Count) { @@ -28,14 +29,16 @@ public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool return Task.CompletedTask; } - public Task HandleBasicCancelAsync(string consumerTag) => Task.CompletedTask; + public Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask; - public Task HandleBasicCancelOkAsync(string consumerTag) => Task.CompletedTask; + public Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask; - public Task HandleBasicConsumeOkAsync(string consumerTag) => Task.CompletedTask; + public Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask; public Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) => Task.CompletedTask; + public Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) => Task.CompletedTask; + public IChannel Channel { get; } } } diff --git a/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs b/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs index d8e35ec56..3f41a5a02 100644 --- a/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs +++ b/projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs @@ -42,7 +42,9 @@ public CountingConsumer(IChannel channel, uint messageCount) : base(channel) } /// - public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) + public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, + string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) { if (Interlocked.Decrement(ref _remainingCount) == 0) { diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index c923a00ec..be90e34c8 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -60,9 +60,6 @@ override RabbitMQ.Client.AmqpTcpEndpoint.ToString() -> string override RabbitMQ.Client.AmqpTimestamp.Equals(object obj) -> bool override RabbitMQ.Client.AmqpTimestamp.GetHashCode() -> int override RabbitMQ.Client.AmqpTimestamp.ToString() -> string -override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.OnCancel(params string[] consumerTags) -> System.Threading.Tasks.Task -override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag) -> System.Threading.Tasks.Task -override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleChannelShutdownAsync(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task override RabbitMQ.Client.Exceptions.MalformedFrameException.ReplyCode.get -> ushort override RabbitMQ.Client.Exceptions.SyntaxErrorException.ReplyCode.get -> ushort @@ -254,33 +251,20 @@ RabbitMQ.Client.Events.AsyncEventingBasicConsumer.Registered -> RabbitMQ.Client. RabbitMQ.Client.Events.AsyncEventingBasicConsumer.Shutdown -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.Events.AsyncEventingBasicConsumer.Unregistered -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.Events.BaseExceptionEventArgs -RabbitMQ.Client.Events.BaseExceptionEventArgs.BaseExceptionEventArgs(System.Collections.Generic.IDictionary detail, System.Exception exception) -> void RabbitMQ.Client.Events.BasicAckEventArgs -RabbitMQ.Client.Events.BasicAckEventArgs.BasicAckEventArgs(ulong deliveryTag, bool multiple) -> void RabbitMQ.Client.Events.BasicDeliverEventArgs -RabbitMQ.Client.Events.BasicDeliverEventArgs.BasicDeliverEventArgs(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> void RabbitMQ.Client.Events.BasicNackEventArgs -RabbitMQ.Client.Events.BasicNackEventArgs.BasicNackEventArgs(ulong deliveryTag, bool multiple, bool requeue) -> void RabbitMQ.Client.Events.BasicReturnEventArgs -RabbitMQ.Client.Events.BasicReturnEventArgs.BasicReturnEventArgs(ushort replyCode, string replyText, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties basicProperties, System.ReadOnlyMemory body) -> void RabbitMQ.Client.Events.CallbackExceptionEventArgs -RabbitMQ.Client.Events.CallbackExceptionEventArgs.CallbackExceptionEventArgs(System.Collections.Generic.IDictionary detail, System.Exception exception) -> void RabbitMQ.Client.Events.ConnectionBlockedEventArgs -RabbitMQ.Client.Events.ConnectionBlockedEventArgs.ConnectionBlockedEventArgs(string reason) -> void RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs -RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs.ConnectionRecoveryErrorEventArgs(System.Exception ex) -> void RabbitMQ.Client.Events.ConsumerEventArgs -RabbitMQ.Client.Events.ConsumerEventArgs.ConsumerEventArgs(string[] consumerTags) -> void RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs -RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs.ConsumerTagChangedAfterRecoveryEventArgs(string tagBefore, string tagAfter) -> void RabbitMQ.Client.Events.FlowControlEventArgs -RabbitMQ.Client.Events.FlowControlEventArgs.FlowControlEventArgs(bool active) -> void RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs -RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs.QueueNameChangedAfterRecoveryEventArgs(string nameBefore, string nameAfter) -> void RabbitMQ.Client.Events.RecoveringConsumerEventArgs RabbitMQ.Client.Events.RecoveringConsumerEventArgs.ConsumerArguments.get -> System.Collections.Generic.IDictionary RabbitMQ.Client.Events.RecoveringConsumerEventArgs.ConsumerTag.get -> string -RabbitMQ.Client.Events.RecoveringConsumerEventArgs.RecoveringConsumerEventArgs(string consumerTag, System.Collections.Generic.IDictionary consumerArguments) -> void RabbitMQ.Client.Exceptions.AlreadyClosedException RabbitMQ.Client.Exceptions.AlreadyClosedException.AlreadyClosedException(RabbitMQ.Client.ShutdownEventArgs reason) -> void RabbitMQ.Client.Exceptions.AuthenticationFailureException @@ -368,10 +352,6 @@ RabbitMQ.Client.IAmqpWriteable.GetRequiredBufferSize() -> int RabbitMQ.Client.IAmqpWriteable.WriteTo(System.Span span) -> int RabbitMQ.Client.IAsyncBasicConsumer RabbitMQ.Client.IAsyncBasicConsumer.Channel.get -> RabbitMQ.Client.IChannel -RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicCancelAsync(string consumerTag) -> System.Threading.Tasks.Task -RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicCancelOkAsync(string consumerTag) -> System.Threading.Tasks.Task -RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag) -> System.Threading.Tasks.Task -RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task RabbitMQ.Client.IAsyncBasicConsumer.HandleChannelShutdownAsync(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task RabbitMQ.Client.IAuthMechanism RabbitMQ.Client.IAuthMechanismFactory @@ -550,7 +530,6 @@ RabbitMQ.Client.IRecordedQueue.Exclusive.get -> bool RabbitMQ.Client.IRecordedQueue.IsServerNamed.get -> bool RabbitMQ.Client.IRecordedQueue.Name.get -> string RabbitMQ.Client.IRecoverable -RabbitMQ.Client.IRecoverable.RecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.ITcpClient RabbitMQ.Client.ITcpClient.Client.get -> System.Net.Sockets.Socket RabbitMQ.Client.ITcpClient.Close() -> void @@ -618,9 +597,6 @@ RabbitMQ.Client.ShutdownEventArgs.Initiator.get -> RabbitMQ.Client.ShutdownIniti RabbitMQ.Client.ShutdownEventArgs.MethodId.get -> ushort RabbitMQ.Client.ShutdownEventArgs.ReplyCode.get -> ushort RabbitMQ.Client.ShutdownEventArgs.ReplyText.get -> string -RabbitMQ.Client.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string replyText, object cause = null) -> void -RabbitMQ.Client.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string replyText, System.Exception exception) -> void -RabbitMQ.Client.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string replyText, ushort classId, ushort methodId, object cause = null) -> void RabbitMQ.Client.ShutdownInitiator RabbitMQ.Client.ShutdownInitiator.Application = 0 -> RabbitMQ.Client.ShutdownInitiator RabbitMQ.Client.ShutdownInitiator.Library = 1 -> RabbitMQ.Client.ShutdownInitiator @@ -749,8 +725,6 @@ static RabbitMQ.Client.ConnectionFactory.DefaultAmqpUriSslProtocols.get -> Syste static RabbitMQ.Client.ConnectionFactory.DefaultAmqpUriSslProtocols.set -> void static RabbitMQ.Client.ConnectionFactoryBase.DefaultSocketFactory(System.Net.Sockets.AddressFamily addressFamily) -> RabbitMQ.Client.ITcpClient static RabbitMQ.Client.EndpointResolverExtensions.SelectOneAsync(this RabbitMQ.Client.IEndpointResolver resolver, System.Func> selector, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task -static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception e, string context) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs -static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception e, string context, object consumer) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs static RabbitMQ.Client.ExchangeType.All() -> System.Collections.Generic.ICollection static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool @@ -765,12 +739,7 @@ static readonly RabbitMQ.Client.ConnectionFactory.DefaultHeartbeat -> System.Tim static readonly RabbitMQ.Client.Protocols.AMQP_0_9_1 -> RabbitMQ.Client.IProtocol static readonly RabbitMQ.Client.Protocols.DefaultProtocol -> RabbitMQ.Client.IProtocol static readonly RabbitMQ.Client.PublicationAddress.PSEUDO_URI_PARSER -> System.Text.RegularExpressions.Regex -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancelAsync(string consumerTag) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancelOkAsync(string consumerTag) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicConsumeOkAsync(string consumerTag) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IReadOnlyBasicProperties properties, System.ReadOnlyMemory body) -> System.Threading.Tasks.Task virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleChannelShutdownAsync(object channel, RabbitMQ.Client.ShutdownEventArgs reason) -> System.Threading.Tasks.Task -virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.OnCancel(params string[] consumerTags) -> System.Threading.Tasks.Task virtual RabbitMQ.Client.Exceptions.ProtocolException.ShutdownReason.get -> RabbitMQ.Client.ShutdownEventArgs virtual RabbitMQ.Client.TcpClientAdapter.Client.get -> System.Net.Sockets.Socket virtual RabbitMQ.Client.TcpClientAdapter.Close() -> void @@ -889,9 +858,45 @@ RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurren RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler! -RabbitMQ.Client.IConnection.RecoverySucceededAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.QueueNameChangedAfterRecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler! RabbitMQ.Client.IConnection.RecoveringConsumerAsync -> RabbitMQ.Client.Events.AsyncEventHandler! -RabbitMQ.Client.IConnection.ConnectionUnblockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicConsumeOkAsync(string! consumerTag, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.HandleBasicDeliverAsync(string! consumerTag, ulong deliveryTag, bool redelivered, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! properties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +override RabbitMQ.Client.Events.AsyncEventingBasicConsumer.OnCancelAsync(string![]! consumerTags, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.Events.AsyncEventArgs +RabbitMQ.Client.Events.AsyncEventArgs.AsyncEventArgs(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.AsyncEventArgs.CancellationToken.get -> System.Threading.CancellationToken +RabbitMQ.Client.Events.BaseExceptionEventArgs.BaseExceptionEventArgs(System.Collections.Generic.IDictionary! detail, System.Exception! exception, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.BasicAckEventArgs.BasicAckEventArgs(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.BasicDeliverEventArgs.BasicDeliverEventArgs(string! consumerTag, ulong deliveryTag, bool redelivered, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! properties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.BasicNackEventArgs.BasicNackEventArgs(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.BasicReturnEventArgs.BasicReturnEventArgs(ushort replyCode, string! replyText, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.CallbackExceptionEventArgs.CallbackExceptionEventArgs(System.Collections.Generic.IDictionary! detail, System.Exception! exception, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.ConnectionBlockedEventArgs.ConnectionBlockedEventArgs(string! reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs.ConnectionRecoveryErrorEventArgs(System.Exception! ex, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.ConsumerEventArgs.ConsumerEventArgs(string![]! consumerTags, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs.ConsumerTagChangedAfterRecoveryEventArgs(string! tagBefore, string! tagAfter, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.FlowControlEventArgs.FlowControlEventArgs(bool active, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs.QueueNameChangedAfterRecoveryEventArgs(string! nameBefore, string! nameAfter, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.Events.RecoveringConsumerEventArgs.RecoveringConsumerEventArgs(string! consumerTag, System.Collections.Generic.IDictionary? consumerArguments, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicCancelAsync(string! consumerTag, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicCancelOkAsync(string! consumerTag, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicConsumeOkAsync(string! consumerTag, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.IAsyncBasicConsumer.HandleBasicDeliverAsync(string! consumerTag, ulong deliveryTag, bool redelivered, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! properties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.IConnection.ConnectionUnblockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IConnection.RecoverySucceededAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.IRecoverable.RecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, object? cause = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, System.Exception! exception, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +RabbitMQ.Client.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, ushort classId, ushort methodId, object? cause = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +static RabbitMQ.Client.Events.AsyncEventArgs.CreateOrDefault(System.Threading.CancellationToken cancellationToken) -> RabbitMQ.Client.Events.AsyncEventArgs! +static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception! e, string! context, object! consumer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs! +static RabbitMQ.Client.Events.CallbackExceptionEventArgs.Build(System.Exception! e, string! context, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> RabbitMQ.Client.Events.CallbackExceptionEventArgs! +static readonly RabbitMQ.Client.Events.AsyncEventArgs.Empty -> RabbitMQ.Client.Events.AsyncEventArgs! +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancelAsync(string! consumerTag, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancelOkAsync(string! consumerTag, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicConsumeOkAsync(string! consumerTag, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicDeliverAsync(string! consumerTag, ulong deliveryTag, bool redelivered, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! properties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.OnCancelAsync(string![]! consumerTags, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! diff --git a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs index e9bc07b0f..419449539 100644 --- a/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs @@ -57,7 +57,7 @@ public string[] ConsumerTags /// The cancellation token. public virtual Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default) { - return OnCancelAsync(new []{ consumerTag }, cancellationToken); + return OnCancelAsync(new[] { consumerTag }, cancellationToken); } /// @@ -67,7 +67,7 @@ public virtual Task HandleBasicCancelAsync(string consumerTag, CancellationToken /// The cancellation token. public virtual Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default) { - return OnCancelAsync(new []{ consumerTag }, cancellationToken); + return OnCancelAsync(new[] { consumerTag }, cancellationToken); } /// @@ -98,7 +98,7 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag, string exchange, string routingKey, IReadOnlyBasicProperties properties, - ReadOnlyMemory body, + ReadOnlyMemory body, CancellationToken cancellationToken = default) { // Nothing to do here. @@ -118,7 +118,7 @@ public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs /// /// Default implementation - overridable in subclasses. - /// The set of consumer tags that where cancelled + /// The set of consumer tags that were cancelled /// The cancellation token. /// /// This default implementation simply sets the property to false, and takes no further action. @@ -126,6 +126,7 @@ public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs protected virtual Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) { IsRunning = false; + foreach (string consumerTag in consumerTags) { _consumerTags.Remove(consumerTag); diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index 150e919e3..201861752 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -278,9 +278,10 @@ public AckingBasicConsumer(IChannel channel, ushort totalMessageCount, TaskCompl { } - public override Task PostHandleDeliveryAsync(ulong deliveryTag) + public override Task PostHandleDeliveryAsync(ulong deliveryTag, + CancellationToken cancellationToken = default) { - return Channel.BasicAckAsync(deliveryTag, false).AsTask(); + return Channel.BasicAckAsync(deliveryTag, false, cancellationToken).AsTask(); } } @@ -291,9 +292,10 @@ public NackingBasicConsumer(IChannel channel, ushort totalMessageCount, TaskComp { } - public override Task PostHandleDeliveryAsync(ulong deliveryTag) + public override Task PostHandleDeliveryAsync(ulong deliveryTag, + CancellationToken cancellationToken = default) { - return Channel.BasicNackAsync(deliveryTag, false, false).AsTask(); + return Channel.BasicNackAsync(deliveryTag, false, false, cancellationToken).AsTask(); } } @@ -304,9 +306,10 @@ public RejectingBasicConsumer(IChannel channel, ushort totalMessageCount, TaskCo { } - public override Task PostHandleDeliveryAsync(ulong deliveryTag) + public override Task PostHandleDeliveryAsync(ulong deliveryTag, + CancellationToken cancellationToken = default) { - return Channel.BasicRejectAsync(deliveryTag, false).AsTask(); + return Channel.BasicRejectAsync(deliveryTag, false, cancellationToken).AsTask(); } } @@ -329,11 +332,12 @@ public override Task HandleBasicDeliverAsync(string consumerTag, string exchange, string routingKey, IReadOnlyBasicProperties properties, - ReadOnlyMemory body) + ReadOnlyMemory body, + CancellationToken cancellationToken = default) { try { - return PostHandleDeliveryAsync(deliveryTag); + return PostHandleDeliveryAsync(deliveryTag, cancellationToken); } finally { @@ -345,7 +349,8 @@ public override Task HandleBasicDeliverAsync(string consumerTag, } } - public virtual Task PostHandleDeliveryAsync(ulong deliveryTag) + public virtual Task PostHandleDeliveryAsync(ulong deliveryTag, + CancellationToken cancellationToken = default) { return Task.CompletedTask; } diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 25dcf5801..6cc67e84b 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -780,29 +780,35 @@ public DefaultAsyncConsumer(IChannel channel, string logPrefix, ITestOutputHelpe _output = output; } - public override Task HandleBasicCancelAsync(string consumerTag) + public override Task HandleBasicCancelAsync(string consumerTag, + CancellationToken cancellationToken = default) { _output.WriteLine("[ERROR] {0} HandleBasicCancelAsync {1}", _logPrefix, consumerTag); - return base.HandleBasicCancelAsync(consumerTag); + return base.HandleBasicCancelAsync(consumerTag, cancellationToken); } - public override Task HandleBasicCancelOkAsync(string consumerTag) + public override Task HandleBasicCancelOkAsync(string consumerTag, + CancellationToken cancellationToken = default) { _output.WriteLine("[ERROR] {0} HandleBasicCancelOkAsync {1}", _logPrefix, consumerTag); - return base.HandleBasicCancelOkAsync(consumerTag); + return base.HandleBasicCancelOkAsync(consumerTag, cancellationToken); } - public override Task HandleBasicConsumeOkAsync(string consumerTag) + public override Task HandleBasicConsumeOkAsync(string consumerTag, + CancellationToken cancellationToken = default) { _output.WriteLine("[ERROR] {0} HandleBasicConsumeOkAsync {1}", _logPrefix, consumerTag); - return base.HandleBasicConsumeOkAsync(consumerTag); + return base.HandleBasicConsumeOkAsync(consumerTag, cancellationToken); } public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) + string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) { _output.WriteLine("[ERROR] {0} HandleBasicDeliverAsync {1}", _logPrefix, consumerTag); - await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); + await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, + exchange, routingKey, properties, body, + cancellationToken); } public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) @@ -811,10 +817,11 @@ public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArg return base.HandleChannelShutdownAsync(channel, reason); } - public override Task OnCancelAsync(params string[] consumerTags) + protected override Task OnCancelAsync(string[] consumerTags, + CancellationToken cancellationToken = default) { _output.WriteLine("[ERROR] {0} OnCancel {1}", _logPrefix, consumerTags[0]); - return base.OnCancelAsync(consumerTags); + return base.OnCancelAsync(consumerTags, cancellationToken); } } } diff --git a/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs b/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs index c13afaa36..8cea6ec8c 100644 --- a/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs +++ b/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System.Linq; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -114,7 +115,8 @@ public CancelNotificationConsumer(IChannel channel, TestAsyncConsumerCancelNotif } } - public override Task HandleBasicCancelAsync(string consumerTag) + public override Task HandleBasicCancelAsync(string consumerTag, + CancellationToken cancellationToken = default) { if (!_eventMode) { @@ -122,7 +124,7 @@ public override Task HandleBasicCancelAsync(string consumerTag) _testClass._tcs.SetResult(true); } - return base.HandleBasicCancelAsync(consumerTag); + return base.HandleBasicCancelAsync(consumerTag, cancellationToken); } private Task CancelledAsync(object sender, ConsumerEventArgs arg) diff --git a/projects/Test/Integration/TestAsyncConsumerExceptions.cs b/projects/Test/Integration/TestAsyncConsumerExceptions.cs index 71251fbeb..b3d238697 100644 --- a/projects/Test/Integration/TestAsyncConsumerExceptions.cs +++ b/projects/Test/Integration/TestAsyncConsumerExceptions.cs @@ -136,7 +136,8 @@ public override Task HandleBasicDeliverAsync(string consumerTag, string exchange, string routingKey, IReadOnlyBasicProperties properties, - ReadOnlyMemory body) + ReadOnlyMemory body, + CancellationToken cancellationToken = default) { return Task.FromException(TestException); } @@ -148,7 +149,8 @@ public ConsumerFailingOnCancel(IChannel channel) : base(channel) { } - public override Task HandleBasicCancelAsync(string consumerTag) + public override Task HandleBasicCancelAsync(string consumerTag, + CancellationToken cancellationToken = default) { return Task.FromException(TestException); } @@ -172,7 +174,8 @@ public ConsumerFailingOnConsumeOk(IChannel channel) : base(channel) { } - public override Task HandleBasicConsumeOkAsync(string consumerTag) + public override Task HandleBasicConsumeOkAsync(string consumerTag, + CancellationToken cancellationToken = default) { return Task.FromException(TestException); } @@ -184,7 +187,8 @@ public ConsumerFailingOnCancelOk(IChannel channel) : base(channel) { } - public override Task HandleBasicCancelOkAsync(string consumerTag) + public override Task HandleBasicCancelOkAsync(string consumerTag, + CancellationToken cancellationToken = default) { return Task.FromException(TestException); } diff --git a/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs index 86cbb1d46..eb82321f5 100644 --- a/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs @@ -86,7 +86,8 @@ public CollectingConsumer(IChannel channel) public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) + IReadOnlyBasicProperties properties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) { // we test concurrent dispatch from the moment basic.delivery is returned. // delivery tags have guaranteed ordering and we verify that it is preserved @@ -98,7 +99,8 @@ public override Task HandleBasicDeliverAsync(string consumerTag, s_counter.Signal(); } - return Channel.BasicAckAsync(deliveryTag: deliveryTag, multiple: false).AsTask(); + return Channel.BasicAckAsync(deliveryTag: deliveryTag, multiple: false, + cancellationToken: cancellationToken).AsTask(); } } diff --git a/projects/Test/Integration/TestMainLoop.cs b/projects/Test/Integration/TestMainLoop.cs index 24e611265..321f26bac 100644 --- a/projects/Test/Integration/TestMainLoop.cs +++ b/projects/Test/Integration/TestMainLoop.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -62,7 +63,8 @@ public override Task HandleBasicDeliverAsync(string consumerTag, string exchange, string routingKey, IReadOnlyBasicProperties properties, - ReadOnlyMemory body) + ReadOnlyMemory body, + CancellationToken cancellationToken = default) { throw new Exception("I am a bad consumer"); } diff --git a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs index c610da454..d4ffaf607 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlocked.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlocked.cs @@ -66,7 +66,7 @@ public async Task TestConnectionBlockedNotification() await UnblockAsync(); }; - _conn.ConnectionUnblockedAsync += (object sender, EventArgs ea) => + _conn.ConnectionUnblockedAsync += (object sender, AsyncEventArgs ea) => { tcs.SetResult(true); return Task.CompletedTask; diff --git a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs index daeab8ab3..df27c25da 100644 --- a/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs +++ b/projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs @@ -88,7 +88,7 @@ public async Task TestConnectionBlockedChannelLeak_GH1573() return Task.CompletedTask; }; - _conn.ConnectionUnblockedAsync += (object sender, EventArgs ea) => + _conn.ConnectionUnblockedAsync += (object sender, AsyncEventArgs ea) => { connectionUnblockedTcs.SetResult(true); return Task.CompletedTask; From e6e8b5c7ad76a250f0abfb2b0b2285f2fb7515d5 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 19 Sep 2024 06:00:10 -0700 Subject: [PATCH 11/12] Rename several methods in `AsyncEventingBasicConsumer` to use `Async` suffix. --- .../AsyncBasicConsumerFake.cs | 2 - .../RabbitMQ.Client/PublicAPI.Shipped.txt | 8 ++-- .../events/AsyncEventingBasicConsumer.cs | 46 +++++++++---------- .../client/events/BasicDeliverEventArgs.cs | 2 +- .../Test/Applications/MassPublish/Program.cs | 2 +- .../TestConnectionRecovery.cs | 2 +- .../Test/Integration/TestAsyncConsumer.cs | 16 +++---- .../TestAsyncConsumerCancelNotify.cs | 4 +- .../TestAsyncConsumerOperationDispatch.cs | 2 +- .../TestAsyncEventingBasicConsumer.cs | 2 +- projects/Test/Integration/TestBasicPublish.cs | 18 ++++---- .../TestConcurrentAccessWithSharedChannel.cs | 2 +- .../TestConnectionRecoveryWithoutSetup.cs | 8 ++-- .../TestConnectionTopologyRecovery.cs | 8 ++-- .../Test/Integration/TestFloodPublishing.cs | 2 +- projects/Test/OAuth2/TestOAuth2.cs | 2 +- .../TestActivitySource.cs | 12 ++--- .../TestOpenTelemetry.cs | 8 ++-- 18 files changed, 72 insertions(+), 74 deletions(-) diff --git a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs index 31a0e83bf..541b826ba 100644 --- a/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs +++ b/projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs @@ -37,8 +37,6 @@ public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool public Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) => Task.CompletedTask; - public Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) => Task.CompletedTask; - public IChannel Channel { get; } } } diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index be90e34c8..6c26eab60 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -246,10 +246,6 @@ RabbitMQ.Client.EndpointResolverExtensions RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.Events.AsyncEventingBasicConsumer RabbitMQ.Client.Events.AsyncEventingBasicConsumer.AsyncEventingBasicConsumer(RabbitMQ.Client.IChannel channel) -> void -RabbitMQ.Client.Events.AsyncEventingBasicConsumer.Received -> RabbitMQ.Client.Events.AsyncEventHandler -RabbitMQ.Client.Events.AsyncEventingBasicConsumer.Registered -> RabbitMQ.Client.Events.AsyncEventHandler -RabbitMQ.Client.Events.AsyncEventingBasicConsumer.Shutdown -> RabbitMQ.Client.Events.AsyncEventHandler -RabbitMQ.Client.Events.AsyncEventingBasicConsumer.Unregistered -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.Events.BaseExceptionEventArgs RabbitMQ.Client.Events.BasicAckEventArgs RabbitMQ.Client.Events.BasicDeliverEventArgs @@ -900,3 +896,7 @@ virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicCancelOkAsync(strin virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicConsumeOkAsync(string! consumerTag, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.HandleBasicDeliverAsync(string! consumerTag, ulong deliveryTag, bool redelivered, string! exchange, string! routingKey, RabbitMQ.Client.IReadOnlyBasicProperties! properties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! virtual RabbitMQ.Client.AsyncDefaultBasicConsumer.OnCancelAsync(string![]! consumerTags, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.Events.AsyncEventingBasicConsumer.ShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.Events.AsyncEventingBasicConsumer.UnregisteredAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.Events.AsyncEventingBasicConsumer.RegisteredAsync -> RabbitMQ.Client.Events.AsyncEventHandler! +RabbitMQ.Client.Events.AsyncEventingBasicConsumer.ReceivedAsync -> RabbitMQ.Client.Events.AsyncEventHandler! diff --git a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs index 9c714fd9f..fb04690c1 100644 --- a/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs +++ b/projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs @@ -20,45 +20,45 @@ public AsyncEventingBasicConsumer(IChannel channel) : base(channel) /// Accessing the body at a later point is unsafe as its memory can /// be already released. /// - public event AsyncEventHandler Received + public event AsyncEventHandler ReceivedAsync { - add => _receivedWrapper.AddHandler(value); - remove => _receivedWrapper.RemoveHandler(value); + add => _receivedAsyncWrapper.AddHandler(value); + remove => _receivedAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _receivedWrapper; + private AsyncEventingWrapper _receivedAsyncWrapper; ///Fires when the server confirms successful consumer registration. - public event AsyncEventHandler Registered + public event AsyncEventHandler RegisteredAsync { - add => _registeredWrapper.AddHandler(value); - remove => _registeredWrapper.RemoveHandler(value); + add => _registeredAsyncWrapper.AddHandler(value); + remove => _registeredAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _registeredWrapper; + private AsyncEventingWrapper _registeredAsyncWrapper; ///Fires on channel shutdown, both client and server initiated. - public event AsyncEventHandler Shutdown + public event AsyncEventHandler ShutdownAsync { - add => _shutdownWrapper.AddHandler(value); - remove => _shutdownWrapper.RemoveHandler(value); + add => _shutdownAsyncWrapper.AddHandler(value); + remove => _shutdownAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _shutdownWrapper; + private AsyncEventingWrapper _shutdownAsyncWrapper; ///Fires when the server confirms successful consumer cancellation. - public event AsyncEventHandler Unregistered + public event AsyncEventHandler UnregisteredAsync { - add => _unregisteredWrapper.AddHandler(value); - remove => _unregisteredWrapper.RemoveHandler(value); + add => _unregisteredAsyncWrapper.AddHandler(value); + remove => _unregisteredAsyncWrapper.RemoveHandler(value); } - private AsyncEventingWrapper _unregisteredWrapper; + private AsyncEventingWrapper _unregisteredAsyncWrapper; ///Fires when the server confirms successful consumer cancellation. protected override async Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default) { await base.OnCancelAsync(consumerTags, cancellationToken) .ConfigureAwait(false); - if (!_unregisteredWrapper.IsEmpty) + if (!_unregisteredAsyncWrapper.IsEmpty) { - await _unregisteredWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags, cancellationToken)) + await _unregisteredAsyncWrapper.InvokeAsync(this, new ConsumerEventArgs(consumerTags, cancellationToken)) .ConfigureAwait(false); } } @@ -68,9 +68,9 @@ public override async Task HandleBasicConsumeOkAsync(string consumerTag, Cancell { await base.HandleBasicConsumeOkAsync(consumerTag, cancellationToken) .ConfigureAwait(false); - if (!_registeredWrapper.IsEmpty) + if (!_registeredAsyncWrapper.IsEmpty) { - await _registeredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag }, cancellationToken)) + await _registeredAsyncWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consumerTag }, cancellationToken)) .ConfigureAwait(false); } } @@ -82,7 +82,7 @@ public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryT var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body, cancellationToken); // No need to call base, it's empty. - return _receivedWrapper.InvokeAsync(this, deliverEventArgs); + return _receivedAsyncWrapper.InvokeAsync(this, deliverEventArgs); } ///Fires the Shutdown event. @@ -90,9 +90,9 @@ public override async Task HandleChannelShutdownAsync(object channel, ShutdownEv { await base.HandleChannelShutdownAsync(channel, reason) .ConfigureAwait(false); - if (!_shutdownWrapper.IsEmpty) + if (!_shutdownAsyncWrapper.IsEmpty) { - await _shutdownWrapper.InvokeAsync(this, reason) + await _shutdownAsyncWrapper.InvokeAsync(this, reason) .ConfigureAwait(false); } } diff --git a/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs b/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs index d862872c7..5b4f2e7bb 100644 --- a/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs +++ b/projects/RabbitMQ.Client/client/events/BasicDeliverEventArgs.cs @@ -67,7 +67,7 @@ public BasicDeliverEventArgs(string consumerTag, /// /// /// NOTE: Using this memory outside of - /// + /// /// requires that it be copied! /// /// This shows how to copy the data for use: diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index e10bd4f28..8bb649b71 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -81,7 +81,7 @@ await consumeChannel.QueueDeclareAsync(queue: QueueName, passive: false, durable: false, exclusive: false, autoDelete: false, arguments: null); var asyncListener = new AsyncEventingBasicConsumer(consumeChannel); - asyncListener.Received += AsyncListener_Received; + asyncListener.ReceivedAsync += AsyncListener_Received; await consumeChannel.QueueBindAsync(queue: QueueName, exchange: ExchangeName, routingKey: RoutingKey, arguments: null); diff --git a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs index 35b027bad..bb5bc3323 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs @@ -93,7 +93,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey); var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.Received += MessageReceived; + consumer.ReceivedAsync += MessageReceived; await _channel.BasicConsumeAsync(queueName, true, consumer); using (IChannel pubCh = await _conn.CreateChannelAsync()) diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 6cc67e84b..0a0bde8a7 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -105,7 +105,7 @@ public async Task TestBasicRoundtripConcurrent() return Task.CompletedTask; }; - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { if (ByteArraysEqual(a.Body.Span, body1)) { @@ -274,7 +274,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() }; var consumer = new AsyncEventingBasicConsumer(consumeChannel); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { if (ByteArraysEqual(a.Body.ToArray(), body1)) { @@ -370,7 +370,7 @@ public async Task TestBasicRejectAsync() }; var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.Received += async (object sender, BasicDeliverEventArgs args) => + consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) => { var c = sender as AsyncEventingBasicConsumer; Assert.Same(c, consumer); @@ -470,7 +470,7 @@ public async Task TestBasicAckAsync() await _channel.ConfirmSelectAsync(); var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.Received += async (object sender, BasicDeliverEventArgs args) => + consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) => { var c = sender as AsyncEventingBasicConsumer; Assert.NotNull(c); @@ -534,7 +534,7 @@ public async Task TestBasicNackAsync() }; var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.Received += async (object sender, BasicDeliverEventArgs args) => + consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) => { var c = sender as AsyncEventingBasicConsumer; Assert.NotNull(c); @@ -651,7 +651,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650() await _channel.QueueBindAsync(queue2Name, exchangeName, queue2Name); var consumer1 = new AsyncEventingBasicConsumer(_channel); - consumer1.Received += async (sender, args) => + consumer1.ReceivedAsync += async (sender, args) => { using (IChannel innerChannel = await _conn.CreateChannelAsync()) { @@ -666,7 +666,7 @@ await innerChannel.BasicPublishAsync(exchangeName, queue2Name, await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1); var consumer2 = new AsyncEventingBasicConsumer(_channel); - consumer2.Received += async (sender, args) => + consumer2.ReceivedAsync += async (sender, args) => { tcs.TrySetResult(true); await Task.Yield(); @@ -691,7 +691,7 @@ public async Task TestCloseWithinEventHandler_GH1567() string queueName = q.QueueName; var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.Received += async (_, eventArgs) => + consumer.ReceivedAsync += async (_, eventArgs) => { await _channel.BasicCancelAsync(eventArgs.ConsumerTag); #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed diff --git a/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs b/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs index 8cea6ec8c..793f054ce 100644 --- a/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs +++ b/projects/Test/Integration/TestAsyncConsumerCancelNotify.cs @@ -74,7 +74,7 @@ public async Task TestCorrectConsumerTag() string consumerTag2 = await _channel.BasicConsumeAsync(q2, true, consumer); string notifiedConsumerTag = null; - consumer.Unregistered += (sender, args) => + consumer.UnregisteredAsync += (sender, args) => { notifiedConsumerTag = args.ConsumerTags.First(); _tcs.TrySetResult(true); @@ -111,7 +111,7 @@ public CancelNotificationConsumer(IChannel channel, TestAsyncConsumerCancelNotif _eventMode = eventMode; if (eventMode) { - Unregistered += CancelledAsync; + UnregisteredAsync += CancelledAsync; } } diff --git a/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs index eb82321f5..0b3234822 100644 --- a/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs +++ b/projects/Test/Integration/TestAsyncConsumerOperationDispatch.cs @@ -169,7 +169,7 @@ public async Task TestChannelShutdownDoesNotShutDownDispatcher() var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); await ch1.BasicConsumeAsync(q1, true, new AsyncEventingBasicConsumer(ch1)); var c2 = new AsyncEventingBasicConsumer(ch2); - c2.Received += (object sender, BasicDeliverEventArgs e) => + c2.ReceivedAsync += (object sender, BasicDeliverEventArgs e) => { tcs.SetResult(true); return Task.CompletedTask; diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index 7152de944..680936bda 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -101,7 +101,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038() //async subscriber var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.Received += AsyncConsumerOnReceived; + consumer.ReceivedAsync += AsyncConsumerOnReceived; await _channel.BasicConsumeAsync(queueName, false, consumer); //publisher diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 4f582297b..6f70661dc 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -68,7 +68,7 @@ public async Task TestBasicRoundtripArray() var consumer = new AsyncEventingBasicConsumer(_channel); using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedSemaphore.Release(); @@ -98,7 +98,7 @@ public async Task TestBasicRoundtripCachedString() var consumer = new AsyncEventingBasicConsumer(_channel); using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedSemaphore.Release(); @@ -127,7 +127,7 @@ public async Task TestBasicRoundtripReadOnlyMemory() var consumer = new AsyncEventingBasicConsumer(_channel); using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedSemaphore.Release(); @@ -156,7 +156,7 @@ public async Task CanNotModifyPayloadAfterPublish() using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { bool modified = true; - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { if (a.Body.Span.IndexOf((byte)1) < 0) { @@ -229,25 +229,25 @@ public async Task TestMaxInboundMessageBodySize() var consumer = new AsyncEventingBasicConsumer(channel); - consumer.Shutdown += (o, a) => + consumer.ShutdownAsync += (o, a) => { tcs.SetResult(true); return Task.CompletedTask; }; - consumer.Registered += (o, a) => + consumer.RegisteredAsync += (o, a) => { sawConsumerRegistered = true; return Task.CompletedTask; }; - consumer.Unregistered += (o, a) => + consumer.UnregisteredAsync += (o, a) => { sawConsumerUnregistered = true; return Task.CompletedTask; }; - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { Interlocked.Increment(ref count); return Task.CompletedTask; @@ -312,7 +312,7 @@ public async Task TestPropertiesRoundtrip_Headers() using (var consumerReceivedSemaphore = new SemaphoreSlim(0, 1)) { string response = null; - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { response = _encoding.GetString(a.BasicProperties.Headers["Hello"] as byte[]); consumeBody = a.Body.ToArray(); diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs index d793b1654..26f83eb34 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -82,7 +82,7 @@ await TestConcurrentOperationsAsync(async () => var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.Received += async (object sender, BasicDeliverEventArgs ea) => + consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs ea) => { try { diff --git a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs index 56755eac6..aba99c7d5 100644 --- a/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs +++ b/projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs @@ -133,7 +133,7 @@ public async Task TestConsumerWorkServiceRecovery() Assert.True(ch.IsOpen); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.Received += (s, args) => + cons.ReceivedAsync += (s, args) => { tcs.SetResult(true); return Task.CompletedTask; @@ -189,7 +189,7 @@ public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() Assert.False(queueNameChangeAfterRecoveryCalled); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.Received += (s, args) => + cons.ReceivedAsync += (s, args) => { tcs.SetResult(true); return Task.CompletedTask; @@ -320,7 +320,7 @@ public async Task TestTopologyRecoveryConsumerFilter() var consumerRecoveryTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumerToRecover = new AsyncEventingBasicConsumer(ch); - consumerToRecover.Received += (source, ea) => + consumerToRecover.ReceivedAsync += (source, ea) => { consumerRecoveryTcs.SetResult(true); return Task.CompletedTask; @@ -329,7 +329,7 @@ public async Task TestTopologyRecoveryConsumerFilter() var ignoredTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumerToIgnore = new AsyncEventingBasicConsumer(ch); - consumerToIgnore.Received += (source, ea) => + consumerToIgnore.ReceivedAsync += (source, ea) => { ignoredTcs.SetResult(true); return Task.CompletedTask; diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index 77c893b21..bf53ed987 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -71,7 +71,7 @@ public async Task TestRecoverTopologyOnDisposedChannel() await AssertConsumerCountAsync(_channel, q, 1); var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - cons.Received += (s, args) => + cons.ReceivedAsync += (s, args) => { tcs.SetResult(true); return Task.CompletedTask; @@ -285,7 +285,7 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() var consumerReceivedTcs1 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer1 = new AsyncEventingBasicConsumer(ch); - consumer1.Received += (source, ea) => + consumer1.ReceivedAsync += (source, ea) => { consumerReceivedTcs1.SetResult(true); return Task.CompletedTask; @@ -294,7 +294,7 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities() var consumerReceivedTcs2 = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var consumer2 = new AsyncEventingBasicConsumer(ch); - consumer2.Received += (source, ea) => + consumer2.ReceivedAsync += (source, ea) => { consumerReceivedTcs2.SetResult(true); return Task.CompletedTask; @@ -569,7 +569,7 @@ public async Task TestTopologyRecoveryConsumerExceptionHandler() var recoveredConsumerReceivedTcs = new ManualResetEventSlim(false); var consumerToRecover = new AsyncEventingBasicConsumer(ch); - consumerToRecover.Received += (source, ea) => + consumerToRecover.ReceivedAsync += (source, ea) => { recoveredConsumerReceivedTcs.Set(); return Task.CompletedTask; diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index d301ca57f..38c73123b 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -250,7 +250,7 @@ public async Task TestMultithreadFloodPublishing() }; var consumer = new AsyncEventingBasicConsumer(consumeChannel); - consumer.Received += async (o, a) => + consumer.ReceivedAsync += async (o, a) => { string receivedMessage = _encoding.GetString(a.Body.ToArray()); Assert.Equal(message, receivedMessage); diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index 5b061ab29..c4abf5f94 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -265,7 +265,7 @@ private async ValueTask DeclareConsumeChannelAsync() private async Task ConsumeAsync(IChannel consumeChannel) { var asyncListener = new AsyncEventingBasicConsumer(consumeChannel); - asyncListener.Received += AsyncListener_Received; + asyncListener.ReceivedAsync += AsyncListener_Received; string consumerTag = await consumeChannel.BasicConsumeAsync("testqueue", true, "testconsumer", asyncListener); await _doneEvent.WaitAsync(TimeSpan.FromSeconds(5)); _testOutputHelper.WriteLine("Received message"); diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index d1622647a..7266ae86a 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -94,7 +94,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); @@ -133,7 +133,7 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTags(bool use var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); @@ -174,7 +174,7 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTags(boo var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); @@ -215,7 +215,7 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); @@ -255,7 +255,7 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); @@ -297,7 +297,7 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); consumerReceivedTcs.SetResult(true); diff --git a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs index f7e0742ed..c4a185601 100644 --- a/projects/Test/SequentialIntegration/TestOpenTelemetry.cs +++ b/projects/Test/SequentialIntegration/TestOpenTelemetry.cs @@ -107,7 +107,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); string baggageItem = Baggage.GetBaggage("TestItem"); @@ -165,7 +165,7 @@ public async Task TestPublisherAndConsumerActivityTagsAsync(bool useRoutingKeyAs var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); string baggageItem = Baggage.GetBaggage("TestItem"); @@ -223,7 +223,7 @@ public async Task TestPublisherWithPublicationAddressAndConsumerActivityTagsAsyn var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); string baggageItem = Baggage.GetBaggage("TestItem"); @@ -282,7 +282,7 @@ public async Task TestPublisherWithCachedStringsAndConsumerActivityTagsAsync(boo var consumer = new AsyncEventingBasicConsumer(_channel); var consumerReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - consumer.Received += (o, a) => + consumer.ReceivedAsync += (o, a) => { consumeBody = a.Body.ToArray(); string baggageItem = Baggage.GetBaggage("TestItem"); From 66f0e4135b3df3208794c4985d63141dd70e7dac Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 19 Sep 2024 15:34:48 +0200 Subject: [PATCH 12/12] Cleanup session interface and base --- .../client/impl/ChannelBase.cs | 4 ++-- .../RabbitMQ.Client/client/impl/ISession.cs | 22 +------------------ .../client/impl/SessionBase.cs | 9 ++------ 3 files changed, 5 insertions(+), 30 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 6907a93fb..e12d7014e 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -408,7 +408,7 @@ internal async Task FinishCloseAsync(CancellationToken cancellationToken) ShutdownEventArgs? reason = CloseReason; if (reason != null) { - await Session.CloseAsync(reason, cancellationToken) + await Session.CloseAsync(reason) .ConfigureAwait(false); } @@ -664,7 +664,7 @@ protected async Task HandleChannelCloseAsync(IncomingCommand cmd, Cancella channelClose._classId, channelClose._methodId)); - await Session.CloseAsync(_closeReason, false, cancellationToken) + await Session.CloseAsync(_closeReason, notify: false) .ConfigureAwait(false); var method = new ChannelCloseOk(); diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index 74f198edc..9532b217f 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -41,39 +41,19 @@ namespace RabbitMQ.Client.Impl internal interface ISession { - /// - /// Gets the channel number. - /// ushort ChannelNumber { get; } - /// - /// Gets the close reason. - /// ShutdownEventArgs? CloseReason { get; } - /// - /// Single recipient - no need for multiple handlers to be informed of arriving commands. - /// CommandReceivedAction? CommandReceived { get; set; } - /// - /// Gets the connection. - /// Connection Connection { get; } - /// - /// Gets a value indicating whether this session is open. - /// bool IsOpen { get; } - /// - /// Multicast session shutdown event. - /// event AsyncEventHandler SessionShutdownAsync; - Task CloseAsync(ShutdownEventArgs reason, CancellationToken cancellationToken); - - Task CloseAsync(ShutdownEventArgs reason, bool notify, CancellationToken cancellationToken); + Task CloseAsync(ShutdownEventArgs reason, bool notify = true); Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 2b5381bde..f72823f63 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -92,12 +92,7 @@ public override string ToString() return $"{GetType().Name}#{ChannelNumber}:{Connection}"; } - public Task CloseAsync(ShutdownEventArgs reason, CancellationToken cancellationToken) - { - return CloseAsync(reason, true, cancellationToken); - } - - public Task CloseAsync(ShutdownEventArgs reason, bool notify, CancellationToken cancellationToken) + public Task CloseAsync(ShutdownEventArgs reason, bool notify = true) { if (Interlocked.CompareExchange(ref _closeReason, reason, null) is null) { @@ -155,7 +150,7 @@ public ValueTask TransmitAsync(in TMethod cmd, in THeader head private Task OnConnectionShutdownAsync(object? conn, ShutdownEventArgs reason) { - return CloseAsync(reason, CancellationToken.None); + return CloseAsync(reason); } private Task OnSessionShutdownAsync(ShutdownEventArgs reason)