Skip to content

Commit

Permalink
* Make ConfirmSelectAsync private and assume that semaphore is held.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Oct 2, 2024
1 parent 88aa14d commit e65c6fb
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 75 deletions.
18 changes: 0 additions & 18 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,6 @@ await newChannel.BasicQosAsync(0, _prefetchCountGlobal, true, cancellationToken)
.ConfigureAwait(false);
}

if (_publisherConfirmationsEnabled)
{
await newChannel.ConfirmSelectAsync(_publisherConfirmationTrackingEnabled, cancellationToken)
.ConfigureAwait(false);
}

if (_usesTransactions)
{
await newChannel.TxSelectAsync(cancellationToken)
Expand Down Expand Up @@ -350,18 +344,6 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken);
}

public Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnabled = false, CancellationToken cancellationToken = default)
{
/*
* Note:
* No need to pass this on to InnerChannel, as confirms will have already
* been enabled
*/
_publisherConfirmationsEnabled = true;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
return Task.CompletedTask;
}

public async Task ExchangeBindAsync(string destination, string source, string routingKey,
IDictionary<string, object?>? arguments, bool noWait,
CancellationToken cancellationToken)
Expand Down
114 changes: 57 additions & 57 deletions projects/RabbitMQ.Client/Impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,21 @@ await _rpcSemaphore.WaitAsync(k.CancellationToken)
try
{
enqueued = Enqueue(k);
if (enqueued)
{
var method = new ChannelOpen();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

var method = new ChannelOpen();
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);
bool result = await k;
Debug.Assert(result);

bool result = await k;
Debug.Assert(result);
if (_publisherConfirmationsEnabled)
{
await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken)
.ConfigureAwait(false);
}
}
}
finally
{
Expand All @@ -403,13 +411,6 @@ await ModelSendAsync(in method, k.CancellationToken)
_rpcSemaphore.Release();
}

if (_publisherConfirmationsEnabled)
{
// TODO bring this back within RPC semaphore
await ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken)
.ConfigureAwait(false);
}

return this;
}

Expand Down Expand Up @@ -1244,51 +1245,6 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

// TODO internal
// TODO rpc semaphore held
public async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false,
CancellationToken cancellationToken = default)
{
_publisherConfirmationsEnabled = true;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd;

bool enqueued = false;
var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);

await _rpcSemaphore.WaitAsync(k.CancellationToken)
.ConfigureAwait(false);
try
{
if (_nextPublishSeqNo == 0UL)
{
if (_publisherConfirmationTrackingEnabled)
{
_pendingDeliveryTags.Clear();
_confirmsTaskCompletionSources.Clear();
}
_nextPublishSeqNo = 1;
}

enqueued = Enqueue(k);

var method = new ConfirmSelect(false);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
return;
}
finally
{
if (false == enqueued)
{
k.Dispose();
}
_rpcSemaphore.Release();
}
}

public async Task ExchangeBindAsync(string destination, string source, string routingKey,
IDictionary<string, object?>? arguments, bool noWait,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -1773,6 +1729,50 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

// NOTE: _rpcSemaphore is held
private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd = false,
CancellationToken cancellationToken = default)
{
_publisherConfirmationsEnabled = true;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnablefd;

bool enqueued = false;
var k = new ConfirmSelectAsyncRpcContinuation(ContinuationTimeout, cancellationToken);

try
{
if (_nextPublishSeqNo == 0UL)
{
if (_publisherConfirmationTrackingEnabled)
{
_pendingDeliveryTags.Clear();
_confirmsTaskCompletionSources.Clear();
}
_nextPublishSeqNo = 1;
}

enqueued = Enqueue(k);
if (enqueued)
{
var method = new ConfirmSelect(false);
await ModelSendAsync(in method, k.CancellationToken)
.ConfigureAwait(false);

bool result = await k;
Debug.Assert(result);
}

return;
}
finally
{
if (false == enqueued)
{
k.Dispose();
}
}
}

// NOTE: this method is internal for its use in this test:
// TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
private async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default)
Expand Down

0 comments on commit e65c6fb

Please sign in to comment.