From a5d8c695470b59cb397ec4f9f09cd67c1e6dc784 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Fri, 4 Oct 2024 09:29:30 -0700 Subject: [PATCH] Extend the use of `_confirmSemaphore` to the duration of when exceptions could be caught. --- projects/RabbitMQ.Client/Impl/ChannelBase.cs | 127 ++++++++----------- 1 file changed, 52 insertions(+), 75 deletions(-) diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs index 07a93b0ef..bc025b9d7 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs @@ -1005,12 +1005,13 @@ public async ValueTask BasicPublishAsync(string exchange, str { TaskCompletionSource? publisherConfirmationTcs = null; ulong publishSequenceNumber = 0; - if (_publisherConfirmationsEnabled) + try { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + if (_publisherConfirmationsEnabled) { + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + publishSequenceNumber = _nextPublishSeqNo; if (_publisherConfirmationTrackingEnabled) @@ -1021,14 +1022,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken) _nextPublishSeqNo++; } - finally - { - _confirmSemaphore.Release(); - } - } - try - { var cmd = new BasicPublish(exchange, routingKey, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners @@ -1055,19 +1049,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken) { if (_publisherConfirmationsEnabled) { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try - { - _nextPublishSeqNo--; - if (_publisherConfirmationTrackingEnabled) - { - _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); - } - } - finally + _nextPublishSeqNo--; + if (_publisherConfirmationTrackingEnabled) { - _confirmSemaphore.Release(); + _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); } } @@ -1080,6 +1065,13 @@ await _confirmSemaphore.WaitAsync(cancellationToken) throw; } } + finally + { + if (_publisherConfirmationsEnabled) + { + _confirmSemaphore.Release(); + } + } if (publisherConfirmationTcs is not null) { @@ -1101,12 +1093,13 @@ public async ValueTask BasicPublishAsync(CachedString exchang { TaskCompletionSource? publisherConfirmationTcs = null; ulong publishSequenceNumber = 0; - if (_publisherConfirmationsEnabled) + try { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + if (_publisherConfirmationsEnabled) { + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + publishSequenceNumber = _nextPublishSeqNo; if (_publisherConfirmationTrackingEnabled) @@ -1117,14 +1110,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken) _nextPublishSeqNo++; } - finally - { - _confirmSemaphore.Release(); - } - } - try - { var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) @@ -1150,19 +1136,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken) { if (_publisherConfirmationsEnabled) { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try - { - _nextPublishSeqNo--; - if (_publisherConfirmationTrackingEnabled) - { - _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); - } - } - finally + _nextPublishSeqNo--; + if (_publisherConfirmationTrackingEnabled) { - _confirmSemaphore.Release(); + _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); } } @@ -1175,6 +1152,13 @@ await _confirmSemaphore.WaitAsync(cancellationToken) throw; } } + finally + { + if (_publisherConfirmationsEnabled) + { + _confirmSemaphore.Release(); + } + } if (publisherConfirmationTcs is not null) { @@ -1793,7 +1777,7 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance { if (multiple) { - foreach (var pair in _confirmsTaskCompletionSources) + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) { if (pair.Key <= deliveryTag) { @@ -1810,6 +1794,7 @@ private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cance } } } + return Task.CompletedTask; } @@ -1819,11 +1804,11 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc { if (multiple) { - foreach (var pair in _confirmsTaskCompletionSources) + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) { if (pair.Key <= deliveryTag) { - pair.Value.SetException(new Exception("TBD")); + pair.Value.SetException(new Exception("TODO")); _confirmsTaskCompletionSources.Remove(pair.Key, out _); } } @@ -1832,7 +1817,7 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc { if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) { - tcs.SetException(new Exception("TBD")); + tcs.SetException(new Exception("TODO")); } } } @@ -1844,39 +1829,31 @@ private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken canc Activity? sendActivity, ulong publishSequenceNumber) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - if (sendActivity is null && false == _publisherConfirmationsEnabled) + /* + * Note: there is nothing to do in this method if *both* of these + * conditions are true: + * + * sendActivity is null - there is no activity to add as a header + * publisher confirmations are NOT enabled + */ + if (sendActivity is null && !_publisherConfirmationsEnabled) { return null; } - var newHeaders = new Dictionary(); - MaybeAddActivityToHeaders(newHeaders, basicProperties.CorrelationId, sendActivity); - MaybeAddPublishSequenceNumberToHeaders(newHeaders); + IDictionary? headers = basicProperties.Headers; + headers ??= new Dictionary(); + MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity); + MaybeAddPublishSequenceNumberToHeaders(headers); switch (basicProperties) { case BasicProperties writableProperties: - MergeHeaders(newHeaders, writableProperties); return null; case EmptyBasicProperty: - return new BasicProperties { Headers = newHeaders }; + return new BasicProperties { Headers = headers }; default: - return new BasicProperties(basicProperties) { Headers = newHeaders }; - } - - static void MergeHeaders(IDictionary newHeaders, BasicProperties props) - { - if (props.Headers is null) - { - props.Headers = newHeaders; - } - else - { - foreach (KeyValuePair val in newHeaders) - { - props.Headers[val.Key] = val.Value; - } - } + return new BasicProperties(basicProperties) { Headers = headers }; } void MaybeAddActivityToHeaders(IDictionary headers, @@ -1902,9 +1879,9 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers { if (_publisherConfirmationsEnabled) { - var publishSequenceNumberBytes = new byte[8]; - NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.AsSpan().GetStart(), publishSequenceNumber); - headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes; + Span publishSequenceNumberBytes = stackalloc byte[8]; + NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber); + headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes.ToArray(); } } }