diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs index d9958fae0..cfbf2c546 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -34,20 +34,20 @@ protected override async Task ProcessChannelAsync() { await work.Consumer.HandleBasicDeliverAsync( work.ConsumerTag!, work.DeliveryTag, work.Redelivered, - work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory) + work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory, work.CancellationToken) .ConfigureAwait(false); } break; case WorkType.Cancel: - await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!) + await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!, work.CancellationToken) .ConfigureAwait(false); break; case WorkType.CancelOk: - await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!) + await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!, work.CancellationToken) .ConfigureAwait(false); break; case WorkType.ConsumeOk: - await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!) + await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!, work.CancellationToken) .ConfigureAwait(false); break; case WorkType.Shutdown: diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 0100fb3f3..be9ebf802 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -46,6 +46,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase, private readonly ushort _concurrency; private long _isQuiescing; private bool _disposed; + private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource(); internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency) { @@ -92,7 +93,7 @@ public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, s try { AddConsumer(consumer, consumerTag); - WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag); + WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag, _shutdownCts); await _writer.WriteAsync(work, cancellationToken) .ConfigureAwait(false); } @@ -113,7 +114,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag); - var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); + var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, _shutdownCts); await _writer.WriteAsync(work, cancellationToken) .ConfigureAwait(false); } @@ -126,7 +127,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); - WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag); + WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag, _shutdownCts); await _writer.WriteAsync(work, cancellationToken) .ConfigureAwait(false); } @@ -139,7 +140,7 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo if (false == _disposed && false == IsQuiescing) { IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag); - WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag); + WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag, _shutdownCts); await _writer.WriteAsync(work, cancellationToken) .ConfigureAwait(false); } @@ -147,10 +148,23 @@ await _writer.WriteAsync(work, cancellationToken) public void Quiesce() { + if (IsQuiescing) + { + return; + } + Interlocked.Exchange(ref _isQuiescing, 1); + try + { + _shutdownCts.Cancel(); + } + catch + { + // ignore + } } - public async Task WaitForShutdownAsync() + public async Task WaitForShutdownAsync(CancellationToken cancellationToken) { if (_disposed) { @@ -169,7 +183,7 @@ public async Task WaitForShutdownAsync() * * await _reader.Completion.ConfigureAwait(false); */ - await _worker + await _worker.WaitAsync(cancellationToken) .ConfigureAwait(false); } catch (AggregateException aex) @@ -203,18 +217,13 @@ protected bool IsQuiescing { get { - if (Interlocked.Read(ref _isQuiescing) == 1) - { - return true; - } - - return false; + return Interlocked.Read(ref _isQuiescing) == 1; } } protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) { - _writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason)); + _writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason, _shutdownCts)); } protected override Task InternalShutdownAsync() @@ -237,25 +246,32 @@ protected override Task InternalShutdownAsync() public readonly RentedMemory Body; public readonly ShutdownEventArgs? Reason; public readonly WorkType WorkType; + public readonly CancellationToken CancellationToken; + private readonly CancellationTokenSource? _cancellationTokenSource; - private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag) + private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken) : this() { WorkType = type; Consumer = consumer; ConsumerTag = consumerTag; + CancellationToken = cancellationToken; + _cancellationTokenSource = null; } - private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) + private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource? cancellationTokenSource) : this() { WorkType = WorkType.Shutdown; Consumer = consumer; Reason = reason; + CancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None; + this._cancellationTokenSource = cancellationTokenSource; } private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body) + string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, + CancellationToken cancellationToken) { WorkType = WorkType.Deliver; Consumer = consumer; @@ -266,37 +282,62 @@ private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliv RoutingKey = routingKey; BasicProperties = basicProperties; Body = body; - Reason = default; + Reason = null; + CancellationToken = cancellationToken; + _cancellationTokenSource = null; } - public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource) { - return new WorkStruct(WorkType.Cancel, consumer, consumerTag); + return new WorkStruct(WorkType.Cancel, consumer, consumerTag, cancellationTokenSource.Token); } - public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource) { - return new WorkStruct(WorkType.CancelOk, consumer, consumerTag); + return new WorkStruct(WorkType.CancelOk, consumer, consumerTag, cancellationTokenSource.Token); } - public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag) + public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource) { - return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag); + return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag, cancellationTokenSource.Token); } - public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason) + public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource cancellationTokenSource) { - return new WorkStruct(consumer, reason); + // Create a linked CTS so the shutdown args token reflects both dispatcher cancellation and any upstream token. + CancellationTokenSource? linked = null; + try + { + if (reason.CancellationToken.CanBeCanceled) + { + linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, reason.CancellationToken); + } + } + catch + { + linked = null; + } + + CancellationToken token = linked?.Token ?? cancellationTokenSource.Token; + ShutdownEventArgs argsWithToken = reason.Exception != null ? + new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.Exception, token) : + new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId, reason.Cause, token); + + return new WorkStruct(consumer, argsWithToken, linked); } public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body) + string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, CancellationTokenSource cancellationTokenSource) { return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered, - exchange, routingKey, basicProperties, body); + exchange, routingKey, basicProperties, body, cancellationTokenSource.Token); } - public void Dispose() => Body.Dispose(); + public void Dispose() + { + Body.Dispose(); + _cancellationTokenSource?.Dispose(); + } } protected enum WorkType : byte @@ -317,6 +358,7 @@ protected virtual void Dispose(bool disposing) if (disposing) { Quiesce(); + _shutdownCts.Dispose(); } } catch diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs index 79676f37c..47ed29cbd 100644 --- a/projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs @@ -64,6 +64,6 @@ ValueTask HandleBasicDeliverAsync(string consumerTag, void Quiesce(); Task ShutdownAsync(ShutdownEventArgs reason); - Task WaitForShutdownAsync(); + Task WaitForShutdownAsync(CancellationToken cancellationToken); } } diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs index b3d4cfb74..15f391720 100644 --- a/projects/RabbitMQ.Client/IChannel.cs +++ b/projects/RabbitMQ.Client/IChannel.cs @@ -262,10 +262,8 @@ Task CloseAsync(ushort replyCode, string replyText, bool abort, /// /// The instance containing the close data. /// Whether or not the close is an abort (ignoring certain exceptions). - /// CancellationToken for this operation. /// - Task CloseAsync(ShutdownEventArgs reason, bool abort, - CancellationToken cancellationToken = default); + Task CloseAsync(ShutdownEventArgs reason, bool abort); /// Asynchronously declare an exchange. /// The name of the exchange. diff --git a/projects/RabbitMQ.Client/IConnectionExtensions.cs b/projects/RabbitMQ.Client/IConnectionExtensions.cs index 33b9bc64c..52e8a717f 100644 --- a/projects/RabbitMQ.Client/IConnectionExtensions.cs +++ b/projects/RabbitMQ.Client/IConnectionExtensions.cs @@ -93,21 +93,21 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st /// /// /// Note that all active channels and sessions will be closed if this method is called. - /// In comparison to normal method, will not throw + /// In comparison to normal method, will not throw /// during closing connection. ///This method waits infinitely for the in-progress close operation to complete. /// - public static Task AbortAsync(this IConnection connection) + public static Task AbortAsync(this IConnection connection, CancellationToken cancellationToken = default) { return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true, - CancellationToken.None); + cancellationToken); } /// /// Asynchronously abort this connection and all its channels. /// /// - /// The method behaves in the same way as , with the only + /// The method behaves in the same way as , with the only /// difference that the connection is closed with the given connection close code and message. /// /// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification) @@ -116,10 +116,10 @@ public static Task AbortAsync(this IConnection connection) /// A message indicating the reason for closing the connection /// /// - public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText) + public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, CancellationToken cancellationToken = default) { return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true, - CancellationToken.None); + cancellationToken); } /// @@ -127,7 +127,7 @@ public static Task AbortAsync(this IConnection connection, ushort reasonCode, st /// timeout for all the in-progress close operations to complete. /// /// - /// This method, behaves in a similar way as method with the + /// This method, behaves in a similar way as method with the /// only difference that it explicitly specifies a timeout given /// for all the in-progress close operations to complete. /// If timeout is reached and the close operations haven't finished, then socket is forced to close. diff --git a/projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs b/projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs index 7599129ac..f3b091b34 100644 --- a/projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs +++ b/projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs @@ -61,6 +61,10 @@ private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T await action(sender, @event) .ConfigureAwait(false); } + catch (OperationCanceledException) + { + // Ignore cancellation exceptions + } catch (Exception exception) { if (_onException != null) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 0ea8a2699..39ecf9b74 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -234,13 +234,12 @@ await _connection.DeleteRecordedChannelAsync(this, } } - public async Task CloseAsync(ShutdownEventArgs args, bool abort, - CancellationToken cancellationToken) + public async Task CloseAsync(ShutdownEventArgs args, bool abort) { ThrowIfDisposed(); try { - await _innerChannel.CloseAsync(args, abort, cancellationToken) + await _innerChannel.CloseAsync(args, abort) .ConfigureAwait(false); } finally diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 53bb47c80..4cbbec2c9 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -201,22 +201,17 @@ protected void TakeOver(Channel other) public Task CloseAsync(ushort replyCode, string replyText, bool abort, CancellationToken cancellationToken) { - var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText); - return CloseAsync(args, abort, cancellationToken); + var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText, cancellationToken: cancellationToken); + return CloseAsync(args, abort); } - public async Task CloseAsync(ShutdownEventArgs args, bool abort, - CancellationToken cancellationToken) + public async Task CloseAsync(ShutdownEventArgs args, bool abort) { - CancellationToken argCancellationToken = cancellationToken; - if (IsOpen) - { - // Note: we really do need to try and close this channel! - cancellationToken = CancellationToken.None; - } + CancellationToken cancellationToken = args.CancellationToken; bool enqueued = false; - var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + // We should really try to clsoe the connection and therefore we don't allow this to be canceled by the user + var k = new ChannelCloseAsyncRpcContinuation(ContinuationTimeout, IsOpen ? CancellationToken.None : cancellationToken); await _rpcSemaphore.WaitAsync(k.CancellationToken) .ConfigureAwait(false); @@ -236,7 +231,7 @@ await ModelSendAsync(in method, k.CancellationToken) AssertResultIsTrue(await k); - await ConsumerDispatcher.WaitForShutdownAsync() + await ConsumerDispatcher.WaitForShutdownAsync(cancellationToken) .ConfigureAwait(false); } catch (AlreadyClosedException) @@ -265,7 +260,6 @@ await ConsumerDispatcher.WaitForShutdownAsync() MaybeDisposeContinuation(enqueued, k); _rpcSemaphore.Release(); ChannelShutdownAsync -= k.OnConnectionShutdownAsync; - argCancellationToken.ThrowIfCancellationRequested(); } } @@ -850,7 +844,7 @@ await Session.Connection.HandleConnectionBlockedAsync(reason, cancellationToken) protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken) { var method = new ConnectionClose(cmd.MethodSpan); - var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); + var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId, cancellationToken: cancellationToken); try { /* @@ -862,7 +856,7 @@ protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, Cance await ModelSendAsync(in replyMethod, cancellationToken) .ConfigureAwait(false); - await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken) + await Session.Connection.ClosedViaPeerAsync(reason) .ConfigureAwait(false); SetCloseReason(Session.Connection.CloseReason!); @@ -895,10 +889,9 @@ protected async Task HandleConnectionStartAsync(IncomingCommand cmd, Cance { if (m_connectionStartCell is null) { - var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start"); + var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start", cancellationToken: cancellationToken); await Session.Connection.CloseAsync(reason, false, - InternalConstants.DefaultConnectionCloseTimeout, - cancellationToken) + InternalConstants.DefaultConnectionCloseTimeout) .ConfigureAwait(false); } else diff --git a/projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs index 3eb94272d..5e82d2b2c 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.Heartbeat.cs @@ -109,7 +109,7 @@ private async void HeartbeatReadTimerCallback(object? state) { var eose = new EndOfStreamException($"Heartbeat missing with heartbeat == {_heartbeat} seconds"); LogCloseError(eose.Message, eose); - await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose)) + await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose, _mainLoopCts.Token)) .ConfigureAwait(false); shouldTerminate = true; } diff --git a/projects/RabbitMQ.Client/Impl/Connection.Receive.cs b/projects/RabbitMQ.Client/Impl/Connection.Receive.cs index ae67544e3..5aa4c0281 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.Receive.cs @@ -62,7 +62,8 @@ await ReceiveLoopAsync(mainLoopToken) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "Thread aborted (AppDomain unloaded?)", - exception: taex); + exception: taex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -73,7 +74,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", - exception: eose); + exception: eose, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -91,7 +93,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, fileLoadException.Message, - exception: fileLoadException); + exception: fileLoadException, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -106,7 +109,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, ocex.Message, - exception: ocex); + exception: ocex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } @@ -116,7 +120,8 @@ await HandleMainLoopExceptionAsync(ea) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, ex.Message, - exception: ex); + exception: ex, + cancellationToken: mainLoopToken); await HandleMainLoopExceptionAsync(ea) .ConfigureAwait(false); } diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs index f661669af..3a1aa6ba2 100644 --- a/projects/RabbitMQ.Client/Impl/Connection.cs +++ b/projects/RabbitMQ.Client/Impl/Connection.cs @@ -254,10 +254,9 @@ await _channel0.ConnectionOpenAsync(_config.VirtualHost, cancellationToken) { try { - var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen"); + var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "FailedOpen", cancellationToken: cancellationToken); await CloseAsync(ea, true, - InternalConstants.DefaultConnectionAbortTimeout, - cancellationToken).ConfigureAwait(false); + InternalConstants.DefaultConnectionAbortTimeout).ConfigureAwait(false); } catch { } @@ -299,8 +298,8 @@ internal void EnsureIsOpen() public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default) { - var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText); - return CloseAsync(reason, abort, timeout, cancellationToken); + var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText, cancellationToken: cancellationToken); + return CloseAsync(reason, abort, timeout); } ///Asychronously try to close connection in a graceful way @@ -318,9 +317,9 @@ public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, b ///to complete. /// /// - internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan timeout, CancellationToken cancellationToken) + internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan timeout) { - CancellationToken argCancellationToken = cancellationToken; + CancellationToken cancellationToken = reason.CancellationToken; if (abort && timeout < InternalConstants.DefaultConnectionAbortTimeout) { @@ -431,12 +430,11 @@ await _frameHandler.CloseAsync(cts.Token) throw; } } - - argCancellationToken.ThrowIfCancellationRequested(); } - internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason, CancellationToken cancellationToken) + internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason) { + CancellationToken cancellationToken = reason.CancellationToken; if (false == SetCloseReason(reason)) { if (_closed) diff --git a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt index 8e5220768..da6257103 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Shipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Shipped.txt @@ -770,7 +770,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandlerAsync.set -> void ~static RabbitMQ.Client.IChannelExtensions.AbortAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IChannelExtensions.CloseAsync(this RabbitMQ.Client.IChannel channel, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText) -> System.Threading.Tasks.Task +~static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task ~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func ~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void @@ -784,7 +784,7 @@ static RabbitMQ.Client.IChannelExtensions.ExchangeDeclareAsync(this RabbitMQ.Cli static RabbitMQ.Client.IChannelExtensions.QueueDeclareAsync(this RabbitMQ.Client.IChannel! channel, string! queue = "", bool durable = false, bool exclusive = true, bool autoDelete = true, System.Collections.Generic.IDictionary? arguments = null, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.QueueDeleteAsync(this RabbitMQ.Client.IChannel! channel, string! queue, bool ifUnused = false, bool ifEmpty = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IChannelExtensions.QueueUnbindAsync(this RabbitMQ.Client.IChannel! channel, string! queue, string! exchange, string! routingKey, System.Collections.Generic.IDictionary? arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! -static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection) -> System.Threading.Tasks.Task! +static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IConnectionExtensions.AbortAsync(this RabbitMQ.Client.IConnection! connection, ushort reasonCode, string! reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task! static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! @@ -876,7 +876,7 @@ RabbitMQ.Client.Exceptions.AlreadyClosedException.AlreadyClosedException(RabbitM RabbitMQ.Client.Exceptions.OperationInterruptedException.ShutdownReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs? RabbitMQ.Client.IAsyncBasicConsumer.HandleChannelShutdownAsync(object! channel, RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> System.Threading.Tasks.Task! RabbitMQ.Client.IChannel.ChannelShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler! -RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.Events.ShutdownEventArgs! reason, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! +RabbitMQ.Client.IChannel.CloseAsync(RabbitMQ.Client.Events.ShutdownEventArgs! reason, bool abort) -> System.Threading.Tasks.Task! RabbitMQ.Client.IChannel.CloseReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs? RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs? RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler! diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index e8e684997..3c6096919 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -514,7 +514,7 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, Assert.True(await publishSyncSource.Task); Assert.Equal(messageCount, messagesReceived); await _channel.QueueDeleteAsync(queue: queueName); - await _channel.CloseAsync(_closeArgs, false, CancellationToken.None); + await _channel.CloseAsync(_closeArgs, false); } [Fact] @@ -591,7 +591,7 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false, Assert.Equal((uint)0, consumerCount); } - await _channel.CloseAsync(_closeArgs, false, CancellationToken.None); + await _channel.CloseAsync(_closeArgs, false); } [Fact] diff --git a/projects/Test/Integration/TestAsyncConsumerCancellation.cs b/projects/Test/Integration/TestAsyncConsumerCancellation.cs new file mode 100644 index 000000000..994b721ab --- /dev/null +++ b/projects/Test/Integration/TestAsyncConsumerCancellation.cs @@ -0,0 +1,72 @@ +using System; +using System.Threading.Tasks; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Xunit; +using Xunit.Abstractions; + +namespace Test.Integration +{ + public class TestAsyncConsumerCancellation : IntegrationFixture + { + public TestAsyncConsumerCancellation(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task TestConsumerCancellation() + { + string exchangeName = GenerateExchangeName(); + string queueName = GenerateQueueName(); + string routingKey = string.Empty; + + await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct); + await _channel.QueueDeclareAsync(queueName, false, false, true, null); + await _channel.QueueBindAsync(queueName, exchangeName, routingKey, null); + + var tcsMessageReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tcsReceivedCancelled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var tcsShutdownCancelled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.ShutdownAsync += async (model, ea) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), ea.CancellationToken); + } + catch (OperationCanceledException) + { + tcsShutdownCancelled.SetResult(true); + } + }; + consumer.ReceivedAsync += async (model, ea) => + { + tcsMessageReceived.SetResult(true); + try + { + await Task.Delay(TimeSpan.FromMinutes(1), ea.CancellationToken); + } + catch (OperationCanceledException) + { + tcsReceivedCancelled.SetResult(true); + } + }; + await _channel.BasicConsumeAsync(queueName, false, consumer); + + //publisher + await using IChannel publisherChannel = await _conn.CreateChannelAsync(_createChannelOptions); + byte[] messageBodyBytes = "Hello, world!"u8.ToArray(); + var props = new BasicProperties(); + await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, + mandatory: false, basicProperties: props, body: messageBodyBytes); + + await WaitAsync(tcsMessageReceived, TimeSpan.FromSeconds(5), "Consumer received message"); + + await _channel.CloseAsync(); + + await WaitAsync(tcsMessageReceived, TimeSpan.FromSeconds(5), "Consumer closed"); + await WaitAsync(tcsShutdownCancelled, TimeSpan.FromSeconds(5), "Consumer closed"); + } + } +} diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs index 3b2e6f2d3..52337841d 100644 --- a/projects/Test/Integration/TestChannelShutdown.cs +++ b/projects/Test/Integration/TestChannelShutdown.cs @@ -64,6 +64,43 @@ public async Task TestConsumerDispatcherShutdown() Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync"); } + [Fact] + public async Task TestChannelClose() + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _channel.ChannelShutdownAsync += (channel, args) => + { + tcs.SetResult(true); + return Task.CompletedTask; + }; + + await _channel.CloseAsync(); + await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); + } + + [Fact] + public async Task TestChannelCloseWithCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _channel.ChannelShutdownAsync += async (channel, args) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken); + } + catch (OperationCanceledException) + { + tcs.SetResult(true); + } + }; + + await _channel.CloseAsync(cts.Token); + await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown"); + } + [Fact] public async Task TestConcurrentDisposeAsync_GH1749() { diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index 0a7a380b8..c609c3e78 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -31,6 +31,7 @@ using System; using System.IO; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; @@ -91,6 +92,33 @@ public async Task TestAbortWithSocketClosedOutOfBand() await WaitAsync(tcs, TimeSpan.FromSeconds(6), "channel shutdown"); } + [Fact] + public async Task TestAbortWithSocketClosedOutOfBandAndCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _channel.ChannelShutdownAsync += async (channel, args) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken); + } + catch (OperationCanceledException) + { + tcs.SetResult(true); + } + }; + + var c = (AutorecoveringConnection)_conn; + await c.CloseFrameHandlerAsync(); + + await _conn.AbortAsync(cts.Token); + + // default Connection.Abort() timeout and then some + await WaitAsync(tcs, TimeSpan.FromSeconds(6), "channel shutdown"); + } + [Fact] public async Task TestDisposedWithSocketClosedOutOfBand() { @@ -134,6 +162,52 @@ public async Task TestShutdownSignalPropagationToChannels() await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown"); } + [Fact] + public async Task TestShutdownCancellation() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _conn.ConnectionShutdownAsync += async (channel, args) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken); + } + catch (OperationCanceledException) + { + tcs.SetResult(true); + } + }; + + await _conn.CloseAsync(cancellationToken: cts.Token); + + await WaitAsync(tcs, TimeSpan.FromSeconds(3), "connection shutdown"); + } + + [Fact] + public async Task TestShutdownSignalPropagationWithCancellationToChannels() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1)); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _channel.ChannelShutdownAsync += async (channel, args) => + { + try + { + await Task.Delay(TimeSpan.FromMinutes(1), args.CancellationToken); + } + catch (OperationCanceledException) + { + tcs.SetResult(true); + } + }; + + await _conn.CloseAsync(cts.Token); + + await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown"); + } + [Fact] public async Task TestShutdownSignalPropagationToChannelsUsingDispose() {