Skip to content

Commit

Permalink
Extend the use of _confirmSemaphore to the duration of when excepti…
Browse files Browse the repository at this point in the history
…ons could be caught.
  • Loading branch information
lukebakken committed Oct 4, 2024
1 parent 792096d commit 45b7dfa
Showing 1 changed file with 52 additions and 75 deletions.
127 changes: 52 additions & 75 deletions projects/RabbitMQ.Client/Impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,12 +1005,13 @@ public async ValueTask<bool> BasicPublishAsync<TProperties>(string exchange, str
{
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
ulong publishSequenceNumber = 0;
if (_publisherConfirmationsEnabled)
try
{
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
if (_publisherConfirmationsEnabled)
{
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);

publishSequenceNumber = _nextPublishSeqNo;

if (_publisherConfirmationTrackingEnabled)
Expand All @@ -1021,14 +1022,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)

_nextPublishSeqNo++;
}
finally
{
_confirmSemaphore.Release();
}
}

try
{
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);

using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
Expand All @@ -1055,19 +1049,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
{
if (_publisherConfirmationsEnabled)
{
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
_nextPublishSeqNo--;
if (_publisherConfirmationTrackingEnabled)
{
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
}
}
finally
_nextPublishSeqNo--;
if (_publisherConfirmationTrackingEnabled)
{
_confirmSemaphore.Release();
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
}
}

Expand All @@ -1080,6 +1065,13 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
throw;
}
}
finally
{
if (_publisherConfirmationsEnabled)
{
_confirmSemaphore.Release();
}
}

if (publisherConfirmationTcs is not null)
{
Expand All @@ -1101,12 +1093,13 @@ public async ValueTask<bool> BasicPublishAsync<TProperties>(CachedString exchang
{
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
ulong publishSequenceNumber = 0;
if (_publisherConfirmationsEnabled)
try
{
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
if (_publisherConfirmationsEnabled)
{
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);

publishSequenceNumber = _nextPublishSeqNo;

if (_publisherConfirmationTrackingEnabled)
Expand All @@ -1117,14 +1110,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)

_nextPublishSeqNo++;
}
finally
{
_confirmSemaphore.Release();
}
}

try
{
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
Expand All @@ -1150,19 +1136,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
{
if (_publisherConfirmationsEnabled)
{
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
_nextPublishSeqNo--;
if (_publisherConfirmationTrackingEnabled)
{
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
}
}
finally
_nextPublishSeqNo--;
if (_publisherConfirmationTrackingEnabled)
{
_confirmSemaphore.Release();
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
}
}

Expand All @@ -1175,6 +1152,13 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
throw;
}
}
finally
{
if (_publisherConfirmationsEnabled)
{
_confirmSemaphore.Release();
}
}

if (publisherConfirmationTcs is not null)
{
Expand Down Expand Up @@ -1793,7 +1777,7 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance
{
if (multiple)
{
foreach (var pair in _confirmsTaskCompletionSources)
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
{
if (pair.Key <= deliveryTag)
{
Expand All @@ -1810,6 +1794,7 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance
}
}
}

return Task.CompletedTask;
}

Expand All @@ -1819,11 +1804,11 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
{
if (multiple)
{
foreach (var pair in _confirmsTaskCompletionSources)
foreach (KeyValuePair<ulong, TaskCompletionSource<bool>> pair in _confirmsTaskCompletionSources)
{
if (pair.Key <= deliveryTag)
{
pair.Value.SetException(new Exception("TBD"));
pair.Value.SetException(new Exception("TODO"));
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
}
}
Expand All @@ -1832,7 +1817,7 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
{
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
{
tcs.SetException(new Exception("TBD"));
tcs.SetException(new Exception("TODO"));
}
}
}
Expand All @@ -1844,39 +1829,31 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc
Activity? sendActivity, ulong publishSequenceNumber)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (sendActivity is null && false == _publisherConfirmationsEnabled)
/*
* Note: there is nothing to do in this method if *both* of these
* conditions are true:
*
* sendActivity is null - there is no activity to add as a header
* publisher confirmations are NOT enabled
*/
if (sendActivity is null && !_publisherConfirmationsEnabled)
{
return null;
}

var newHeaders = new Dictionary<string, object?>();
MaybeAddActivityToHeaders(newHeaders, basicProperties.CorrelationId, sendActivity);
MaybeAddPublishSequenceNumberToHeaders(newHeaders);
IDictionary<string, object?>? headers = basicProperties.Headers;
headers ??= new Dictionary<string, object?>();
MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity);
MaybeAddPublishSequenceNumberToHeaders(headers);

switch (basicProperties)
{
case BasicProperties writableProperties:
MergeHeaders(newHeaders, writableProperties);
return null;
case EmptyBasicProperty:
return new BasicProperties { Headers = newHeaders };
return new BasicProperties { Headers = headers };
default:
return new BasicProperties(basicProperties) { Headers = newHeaders };
}

static void MergeHeaders(IDictionary<string, object?> newHeaders, BasicProperties props)
{
if (props.Headers is null)
{
props.Headers = newHeaders;
}
else
{
foreach (KeyValuePair<string, object?> val in newHeaders)
{
props.Headers[val.Key] = val.Value;
}
}
return new BasicProperties(basicProperties) { Headers = headers };
}

void MaybeAddActivityToHeaders(IDictionary<string, object?> headers,
Expand All @@ -1902,9 +1879,9 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
{
if (_publisherConfirmationsEnabled)
{
var publishSequenceNumberBytes = new byte[8];
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.AsSpan().GetStart(), publishSequenceNumber);
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
Span<byte> publishSequenceNumberBytes = stackalloc byte[8];
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes.ToArray();
}
}
}
Expand Down

0 comments on commit 45b7dfa

Please sign in to comment.