From e65c6fb16a4868108282e98030e510777e374790 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 26 Sep 2024 21:08:58 -0700 Subject: [PATCH] * Make `ConfirmSelectAsync` `private` and assume that semaphore is held. --- .../Impl/AutorecoveringChannel.cs | 18 --- projects/RabbitMQ.Client/Impl/ChannelBase.cs | 114 +++++++++--------- 2 files changed, 57 insertions(+), 75 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 4f1bba5a4..2a2efec63 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -181,12 +181,6 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken) .ConfigureAwait(false); } - if (_publisherConfirmationsEnabled) - { - await newChannel.ConfirmSelectAsync(_publisherConfirmationTrackingEnabled, cancellationToken) - .ConfigureAwait(false); - } - if (_usesTransactions) { await newChannel.TxSelectAsync(cancellationToken) @@ -350,18 +344,6 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global, return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken); } - public Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnabled = false, CancellationToken cancellationToken = default) - { - /* - * Note: - * No need to pass this on to InnerChannel, as confirms will have already - * been enabled - */ - _publisherConfirmationsEnabled = true; - _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled; - return Task.CompletedTask; - } - public async Task ExchangeBindAsync(string destination, string source, string routingKey, IDictionary? arguments, bool noWait, CancellationToken cancellationToken) diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs index 9ecd5514b..89b4f30d1 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs @@ -386,13 +386,21 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken) try { enqueued = Enqueue(k); + if (enqueued) + { + var method = new ChannelOpen(); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); - var method = new ChannelOpen(); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); + bool result = await k; + Debug.Assert(result); - bool result = await k; - Debug.Assert(result); + if (_publisherConfirmationsEnabled) + { + await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken) + .ConfigureAwait(false); + } + } } finally { @@ -403,13 +411,6 @@ await ModelSendAsync(in method, k.CancellationToken) _rpcSemaphore.Release(); } - if (_publisherConfirmationsEnabled) - { - // TODO bring this back within RPC semaphore - await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken) - .ConfigureAwait(false); - } - return this; } @@ -1244,51 +1245,6 @@ await ModelSendAsync(in method, k.CancellationToken) } } - // TODO internal - // TODO rpc semaphore held - public async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false, - CancellationToken cancellationToken = default) - { - _publisherConfirmationsEnabled = true; - _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd; - - bool enqueued = false; - var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); - - await _rpcSemaphore.WaitAsync(k.CancellationToken) - .ConfigureAwait(false); - try - { - if (_nextPublishSeqNo == 0UL) - { - if (_publisherConfirmationTrackingEnabled) - { - _pendingDeliveryTags.Clear(); - _confirmsTaskCompletionSources.Clear(); - } - _nextPublishSeqNo = 1; - } - - enqueued = Enqueue(k); - - var method = new ConfirmSelect(false); - await ModelSendAsync(in method, k.CancellationToken) - .ConfigureAwait(false); - - bool result = await k; - Debug.Assert(result); - return; - } - finally - { - if (false == enqueued) - { - k.Dispose(); - } - _rpcSemaphore.Release(); - } - } - public async Task ExchangeBindAsync(string destination, string source, string routingKey, IDictionary? arguments, bool noWait, CancellationToken cancellationToken) @@ -1773,6 +1729,50 @@ await ModelSendAsync(in method, k.CancellationToken) } } + // NOTE: _rpcSemaphore is held + private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false, + CancellationToken cancellationToken = default) + { + _publisherConfirmationsEnabled = true; + _publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd; + + bool enqueued = false; + var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken); + + try + { + if (_nextPublishSeqNo == 0UL) + { + if (_publisherConfirmationTrackingEnabled) + { + _pendingDeliveryTags.Clear(); + _confirmsTaskCompletionSources.Clear(); + } + _nextPublishSeqNo = 1; + } + + enqueued = Enqueue(k); + if (enqueued) + { + var method = new ConfirmSelect(false); + await ModelSendAsync(in method, k.CancellationToken) + .ConfigureAwait(false); + + bool result = await k; + Debug.Assert(result); + } + + return; + } + finally + { + if (false == enqueued) + { + k.Dispose(); + } + } + } + // NOTE: this method is internal for its use in this test: // TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse private async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default)