diff --git a/projects/RabbitMQ.Client/IChannel.cs b/projects/RabbitMQ.Client/IChannel.cs index ba2fd6b29..5694a5b5d 100644 --- a/projects/RabbitMQ.Client/IChannel.cs +++ b/projects/RabbitMQ.Client/IChannel.cs @@ -202,11 +202,12 @@ Task BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b /// The message properties. /// The message body. /// CancellationToken for this operation. - /// Returns true if publisher confirmations enabled and the message was ack-ed /// /// Routing key must be shorter than 255 bytes. + /// TODO + /// Throws if a nack or basic.return is returned for the message. /// - ValueTask BasicPublishAsync(string exchange, string routingKey, + ValueTask BasicPublishAsync(string exchange, string routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; @@ -220,11 +221,12 @@ ValueTask BasicPublishAsync(string exchange, string routingKe /// The message properties. /// The message body. /// CancellationToken for this operation. - /// Returns true if publisher confirmations enabled and the message was ack-ed /// /// Routing key must be shorter than 255 bytes. + /// TODO + /// Throws if a nack or basic.return is returned for the message. /// - ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, + ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader; diff --git a/projects/RabbitMQ.Client/IChannelExtensions.cs b/projects/RabbitMQ.Client/IChannelExtensions.cs index 472d1a37a..c7359e0db 100644 --- a/projects/RabbitMQ.Client/IChannelExtensions.cs +++ b/projects/RabbitMQ.Client/IChannelExtensions.cs @@ -78,7 +78,7 @@ public static Task BasicConsumeAsync(this IChannel channel, /// /// The publication occurs with mandatory=false. /// - public static ValueTask BasicPublishAsync(this IChannel channel, + public static ValueTask BasicPublishAsync(this IChannel channel, PublicationAddress addr, T basicProperties, ReadOnlyMemory body, @@ -94,7 +94,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel, /// /// The publication occurs with mandatory=false and empty BasicProperties /// - public static ValueTask BasicPublishAsync(this IChannel channel, + public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory body, @@ -109,7 +109,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel, /// /// The publication occurs with mandatory=false and empty BasicProperties /// - public static ValueTask BasicPublishAsync(this IChannel channel, + public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory body, @@ -124,7 +124,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel, /// /// The publication occurs with empty BasicProperties /// - public static ValueTask BasicPublishAsync(this IChannel channel, + public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, bool mandatory, @@ -140,7 +140,7 @@ public static ValueTask BasicPublishAsync(this IChannel channel, /// /// The publication occurs with empty BasicProperties /// - public static ValueTask BasicPublishAsync(this IChannel channel, + public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, bool mandatory, diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs index 7533e2ab2..2a2efec63 100644 --- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs @@ -311,7 +311,7 @@ await _connection.RecordConsumerAsync(rc, recordedEntitiesSemaphoreHeld: false) public Task BasicGetAsync(string queue, bool autoAck, CancellationToken cancellationToken) => InnerChannel.BasicGetAsync(queue, autoAck, cancellationToken); - public ValueTask BasicPublishAsync(string exchange, string routingKey, + public ValueTask BasicPublishAsync(string exchange, string routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, @@ -319,7 +319,7 @@ public ValueTask BasicPublishAsync(string exchange, string ro where TProperties : IReadOnlyBasicProperties, IAmqpHeader => InnerChannel.BasicPublishAsync(exchange, routingKey, mandatory, basicProperties, body, cancellationToken); - public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, + public ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, diff --git a/projects/RabbitMQ.Client/Impl/ChannelBase.cs b/projects/RabbitMQ.Client/Impl/ChannelBase.cs index 7ccac987e..06cf89391 100644 --- a/projects/RabbitMQ.Client/Impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/Impl/ChannelBase.cs @@ -31,6 +31,7 @@ using System; using System.Buffers.Binary; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; @@ -43,6 +44,7 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing; +using RabbitMQ.Client.Util; namespace RabbitMQ.Client.Impl { @@ -62,10 +64,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable private bool _publisherConfirmationTrackingEnabled = false; private ulong _nextPublishSeqNo = 0; private readonly SemaphoreSlim _confirmSemaphore = new(1, 1); - private readonly LinkedList _pendingDeliveryTags = new(); - private readonly Dictionary> _confirmsTaskCompletionSources = new(); - - private bool _onlyAcksReceived = true; + private readonly ConcurrentDictionary> _confirmsTaskCompletionSources = new(); private ShutdownEventArgs? _closeReason; public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason); @@ -505,7 +504,7 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken) .ConfigureAwait(false); try { - if (_confirmsTaskCompletionSources?.Count > 0) + if (!_confirmsTaskCompletionSources.IsEmpty) { var exception = new AlreadyClosedException(reason); foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values) @@ -615,7 +614,7 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } - await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken) + await HandleAck(ack._deliveryTag, ack._multiple, cancellationToken) .ConfigureAwait(false); return true; @@ -633,7 +632,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args) .ConfigureAwait(false); } - await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken) + await HandleNack(nack._deliveryTag, nack._multiple, cancellationToken) .ConfigureAwait(false); return true; @@ -657,20 +656,14 @@ await _basicReturnAsyncWrapper.InvokeAsync(this, e) { ulong publishSequenceNumber = 0; IReadOnlyBasicProperties props = e.BasicProperties; - if (props.Headers is not null) + object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader]; + if (maybeSeqNum != null) { - object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader]; - if (maybeSeqNum is not null) - { - publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum); - } + publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum); } - if (publishSequenceNumber != 0 && _publisherConfirmationTrackingEnabled) - { - await HandleAckNack(publishSequenceNumber, false, true, cancellationToken) - .ConfigureAwait(false); - } + await HandleNack(publishSequenceNumber, false, cancellationToken) + .ConfigureAwait(false); } return true; @@ -1005,38 +998,31 @@ await ModelSendAsync(in method, k.CancellationToken) } } - public async ValueTask BasicPublishAsync(string exchange, string routingKey, + public async ValueTask BasicPublishAsync(string exchange, string routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { TaskCompletionSource? publisherConfirmationTcs = null; ulong publishSequenceNumber = 0; - if (_publisherConfirmationsEnabled) + try { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + if (_publisherConfirmationsEnabled) { + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + publishSequenceNumber = _nextPublishSeqNo; if (_publisherConfirmationTrackingEnabled) { - _pendingDeliveryTags.AddLast(publishSequenceNumber); publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; } _nextPublishSeqNo++; } - finally - { - _confirmSemaphore.Release(); - } - } - try - { var cmd = new BasicPublish(exchange, routingKey, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners @@ -1063,19 +1049,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken) { if (_publisherConfirmationsEnabled) { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try - { - _nextPublishSeqNo--; - if (_publisherConfirmationTrackingEnabled && _pendingDeliveryTags is not null) - { - _pendingDeliveryTags.RemoveLast(); - } - } - finally + _nextPublishSeqNo--; + if (_publisherConfirmationTrackingEnabled) { - _confirmSemaphore.Release(); + _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); } } @@ -1088,52 +1065,46 @@ await _confirmSemaphore.WaitAsync(cancellationToken) throw; } } + finally + { + if (_publisherConfirmationsEnabled) + { + _confirmSemaphore.Release(); + } + } if (publisherConfirmationTcs is not null) { await publisherConfirmationTcs.Task.WaitAsync(cancellationToken) .ConfigureAwait(false); - return await publisherConfirmationTcs.Task - .ConfigureAwait(false); - } - else - { - return true; } } - public async ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, + public async ValueTask BasicPublishAsync(CachedString exchange, CachedString routingKey, bool mandatory, TProperties basicProperties, ReadOnlyMemory body, CancellationToken cancellationToken = default) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { TaskCompletionSource? publisherConfirmationTcs = null; ulong publishSequenceNumber = 0; - if (_publisherConfirmationsEnabled) + try { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + if (_publisherConfirmationsEnabled) { + await _confirmSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + publishSequenceNumber = _nextPublishSeqNo; if (_publisherConfirmationTrackingEnabled) { - _pendingDeliveryTags.AddLast(publishSequenceNumber); publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); _confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs; } _nextPublishSeqNo++; } - finally - { - _confirmSemaphore.Release(); - } - } - try - { var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default); using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners ? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length) @@ -1159,19 +1130,10 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken) { if (_publisherConfirmationsEnabled) { - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try - { - _nextPublishSeqNo--; - if (_publisherConfirmationTrackingEnabled && _pendingDeliveryTags is not null) - { - _pendingDeliveryTags.RemoveLast(); - } - } - finally + _nextPublishSeqNo--; + if (_publisherConfirmationTrackingEnabled) { - _confirmSemaphore.Release(); + _confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _); } } @@ -1184,17 +1146,18 @@ await _confirmSemaphore.WaitAsync(cancellationToken) throw; } } + finally + { + if (_publisherConfirmationsEnabled) + { + _confirmSemaphore.Release(); + } + } if (publisherConfirmationTcs is not null) { await publisherConfirmationTcs.Task.WaitAsync(cancellationToken) .ConfigureAwait(false); - return await publisherConfirmationTcs.Task - .ConfigureAwait(false); - } - else - { - return true; } } @@ -1770,7 +1733,6 @@ private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd { if (_publisherConfirmationTrackingEnabled) { - _pendingDeliveryTags.Clear(); _confirmsTaskCompletionSources.Clear(); } _nextPublishSeqNo = 1; @@ -1796,123 +1758,98 @@ await ModelSendAsync(in method, k.CancellationToken) } } - // TODO NOTE: this method used to be internal for its use in this test: - // TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse - private async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default) + private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default) { - bool isAck = false == isNack; - - // Only do this if confirms are enabled *and* the library is tracking confirmations - if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled) + if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty) { - // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted - await _confirmSemaphore.WaitAsync(cancellationToken) - .ConfigureAwait(false); - try + if (multiple) { - // No need to do anything if there are no delivery tags in the list - if (_pendingDeliveryTags.Count > 0) + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) { - if (multiple) - { - do - { - if (_pendingDeliveryTags.First is null) - { - break; - } - else - { - ulong pendingDeliveryTag = _pendingDeliveryTags.First.Value; - if (pendingDeliveryTag > deliveryTag) - { - break; - } - else - { - TaskCompletionSource tcs = _confirmsTaskCompletionSources[pendingDeliveryTag]; - tcs.SetResult(isAck); - _confirmsTaskCompletionSources.Remove(pendingDeliveryTag); - _pendingDeliveryTags.RemoveFirst(); - } - } - } - while (true); - } - else + if (pair.Key <= deliveryTag) { - /* - * Note: - * In the case of `basic.return`, the TCS will have been handled and removed by HandleBasicReturn() - * RabbitMQ still sends `basic.ack`, so the TCS will not be in the dict, hence, TryGetValue here - */ - if (_confirmsTaskCompletionSources.TryGetValue(deliveryTag, out TaskCompletionSource? tcs)) - { - tcs.SetResult(isAck); - _confirmsTaskCompletionSources.Remove(deliveryTag); - _pendingDeliveryTags.Remove(deliveryTag); - } + pair.Value.SetResult(true); + _confirmsTaskCompletionSources.Remove(pair.Key, out _); } } + } + else + { + if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource? tcs)) + { + tcs.SetResult(true); + } + } + } - _onlyAcksReceived = _onlyAcksReceived && isAck; + return Task.CompletedTask; + } - if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources.Count > 0) + private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default) + { + if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty) + { + if (multiple) + { + foreach (KeyValuePair> pair in _confirmsTaskCompletionSources) { - // Done, mark tasks - foreach (TaskCompletionSource tcs in _confirmsTaskCompletionSources.Values) + if (pair.Key <= deliveryTag) { - tcs.TrySetResult(_onlyAcksReceived); + pair.Value.SetException(new Exception("TODO")); + _confirmsTaskCompletionSources.Remove(pair.Key, out _); } - - _confirmsTaskCompletionSources.Clear(); - _onlyAcksReceived = true; } } - finally + else { - _confirmSemaphore.Release(); + if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) + { + tcs.SetException(new Exception("TODO")); + } } } + + return Task.CompletedTask; } private BasicProperties? PopulateBasicPropertiesHeaders(TProperties basicProperties, Activity? sendActivity, ulong publishSequenceNumber) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - if (sendActivity is null && false == _publisherConfirmationsEnabled) + /* + * Note: there is nothing to do in this method if *both* of these + * conditions are true: + * + * sendActivity is null - there is no activity to add as a header + * publisher confirmations are NOT enabled + */ + if (sendActivity is null && !_publisherConfirmationsEnabled) { return null; } - var newHeaders = new Dictionary(); - MaybeAddActivityToHeaders(newHeaders, basicProperties.CorrelationId, sendActivity); - MaybeAddPublishSequenceNumberToHeaders(newHeaders); + bool newHeaders = false; + IDictionary? headers = basicProperties.Headers; + if (headers is null) + { + headers = new Dictionary(); + newHeaders = true; + } + MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity); + MaybeAddPublishSequenceNumberToHeaders(headers); switch (basicProperties) { case BasicProperties writableProperties: - MergeHeaders(newHeaders, writableProperties); + if (newHeaders) + { + writableProperties.Headers = headers; + } return null; case EmptyBasicProperty: - return new BasicProperties { Headers = newHeaders }; + return new BasicProperties { Headers = headers }; default: - return new BasicProperties(basicProperties) { Headers = newHeaders }; - } - - static void MergeHeaders(IDictionary newHeaders, BasicProperties props) - { - if (props.Headers is null) - { - props.Headers = newHeaders; - } - else - { - foreach (KeyValuePair val in newHeaders) - { - props.Headers[val.Key] = val.Value; - } - } + return new BasicProperties(basicProperties) { Headers = headers }; } void MaybeAddActivityToHeaders(IDictionary headers, @@ -1938,16 +1875,8 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary headers { if (_publisherConfirmationsEnabled) { - byte[] publishSequenceNumberBytes; - if (BitConverter.IsLittleEndian) - { - publishSequenceNumberBytes = BitConverter.GetBytes(BinaryPrimitives.ReverseEndianness(publishSequenceNumber)); - } - else - { - publishSequenceNumberBytes = BitConverter.GetBytes(publishSequenceNumber); - } - + byte[] publishSequenceNumberBytes = new byte[8]; + NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber); headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes; } } diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 5f0bc4705..f15f696f9 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -11,12 +11,12 @@ RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedExc RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason, string! prefix) -> void RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void -RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +RabbitMQ.Client.IChannel.BasicPublishAsync(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions! -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask \ No newline at end of file +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.PublicationAddress! addr, T basicProperties, System.ReadOnlyMemory body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask \ No newline at end of file diff --git a/projects/Test/Applications/GH-1647/Program.cs b/projects/Test/Applications/GH-1647/Program.cs index 64c363205..18ac9e57d 100644 --- a/projects/Test/Applications/GH-1647/Program.cs +++ b/projects/Test/Applications/GH-1647/Program.cs @@ -55,14 +55,15 @@ { await using var channel = await connection.CreateChannelAsync(channelOptions); // New channel for each message await Task.Delay(1000); - if (await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty, - mandatory: false, basicProperties: props, body: msg)) + try { + await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty, + mandatory: false, basicProperties: props, body: msg); Console.WriteLine($"Sent message {i}"); } - else + catch (Exception ex) { - Console.Error.WriteLine($"[ERROR] message {i} not acked!"); + Console.Error.WriteLine($"[ERROR] message {i} not acked: {ex}"); } } catch (Exception ex) diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index 4cb8caa73..3e074e3ff 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -140,15 +140,18 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; - bool ack = false; for (int i = 0; i < ItemsPerBatch; i++) { - ack = await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey, - basicProperties: s_properties, body: s_payload, mandatory: true); - Interlocked.Increment(ref s_messagesSent); - if (false == ack) + try { - Console.Error.WriteLine("[ERROR] channel {0} saw nack!", publishChannel.ChannelNumber); + await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey, + basicProperties: s_properties, body: s_payload, mandatory: true); + Interlocked.Increment(ref s_messagesSent); + } + catch (Exception ex) + { + Console.Error.WriteLine("[ERROR] channel {0} saw nack, ex: {1}", + publishChannel.ChannelNumber, ex); } } diff --git a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs index 7558f6c0f..d2439f256 100644 --- a/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs +++ b/projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs @@ -67,14 +67,16 @@ static async Task PublishMessagesIndividuallyAsync() var sw = new Stopwatch(); sw.Start(); - bool ack = false; for (int i = 0; i < MESSAGE_COUNT; i++) { byte[] body = Encoding.UTF8.GetBytes(i.ToString()); - ack = await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body); - if (false == ack) + try + { + await channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body); + } + catch (Exception ex) { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack '{ack}'"); + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: {ex}"); } } @@ -100,7 +102,7 @@ static async Task PublishMessagesInBatchAsync() var sw = new Stopwatch(); sw.Start(); - var publishTasks = new List>(); + var publishTasks = new List(); for (int i = 0; i < MESSAGE_COUNT; i++) { byte[] body = Encoding.UTF8.GetBytes(i.ToString()); @@ -109,12 +111,15 @@ static async Task PublishMessagesInBatchAsync() if (outstandingMessageCount == batchSize) { - foreach (ValueTask pt in publishTasks) + foreach (ValueTask pt in publishTasks) { - bool ack = await pt; - if (false == ack) + try + { + await pt; + } + catch (Exception ex) { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack '{ack}'"); + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); } } publishTasks.Clear(); @@ -124,12 +129,15 @@ static async Task PublishMessagesInBatchAsync() if (publishTasks.Count > 0) { - foreach (ValueTask pt in publishTasks) + foreach (ValueTask pt in publishTasks) { - bool ack = await pt; - if (false == ack) + try { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack '{ack}'"); + await pt; + } + catch (Exception ex) + { + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack or return, ex: '{ex}'"); } } publishTasks.Clear(); @@ -238,7 +246,7 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) var sw = new Stopwatch(); sw.Start(); - var publishTasks = new List>(); + var publishTasks = new List>(); for (int i = 0; i < MESSAGE_COUNT; i++) { string msg = i.ToString(); @@ -264,18 +272,22 @@ async Task CleanOutstandingConfirms(ulong deliveryTag, bool multiple) // This will cause a basic.return, for fun rk = Guid.NewGuid().ToString(); } - ValueTask pt = channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true); - publishTasks.Add(pt); + (ulong, ValueTask) data = + (nextPublishSeqNo, channel.BasicPublishAsync(exchange: string.Empty, routingKey: rk, body: body, mandatory: true)); + publishTasks.Add(data); } using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // await Task.WhenAll(publishTasks).WaitAsync(cts.Token); - foreach (ValueTask pt in publishTasks) + foreach ((ulong SeqNo, ValueTask PublishTask) datum in publishTasks) { - bool ack = await pt; - if (false == ack) + try + { + await datum.PublishTask; + } + catch (Exception ex) { - Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack '{ack}'"); + Console.Error.WriteLine($"{DateTime.Now} [ERROR] saw nack, seqNo: '{datum.SeqNo}', ex: '{ex}'"); } } diff --git a/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs index 64c391842..874b39f20 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs @@ -77,8 +77,8 @@ public async Task TestPublishRpcRightAfterReconnect() try { - Assert.True(await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName, - mandatory: false, basicProperties: properties, body: _messageBody)); + await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName, + mandatory: false, basicProperties: properties, body: _messageBody); } catch (Exception e) { diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 2f472f4bb..773238ff6 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -695,8 +695,8 @@ public async Task TestCloseWithinEventHandler_GH1567() var bp = new BasicProperties(); - Assert.True(await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, - basicProperties: bp, mandatory: true, body: GetRandomBody(64))); + await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, + basicProperties: bp, mandatory: true, body: GetRandomBody(64)); Assert.True(await tcs.Task); } diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index f5b727389..cf3c59d31 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -108,8 +108,8 @@ public async Task TestAsyncEventingBasicConsumer_GH1038() await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); var props = new BasicProperties(); - Assert.True(await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, - mandatory: false, basicProperties: props, body: messageBodyBytes)); + await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, + mandatory: false, basicProperties: props, body: messageBodyBytes); await Task.WhenAll(_onReceivedTcs.Task, _onCallbackExceptionTcs.Task); Assert.True(await _onReceivedTcs.Task); diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index a395a6669..107460fd8 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -76,7 +76,7 @@ public async Task TestBasicRoundtripArray() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - Assert.True(await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody)); + await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody); bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); await _channel.BasicCancelAsync(tag); @@ -131,7 +131,7 @@ public async Task TestBasicRoundtripReadOnlyMemory() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - Assert.True(await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory(sendBody))); + await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory(sendBody)); bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); await _channel.BasicCancelAsync(tag); @@ -161,7 +161,7 @@ public async Task CanNotModifyPayloadAfterPublish() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - Assert.True(await _channel.BasicPublishAsync("", q.QueueName, sendBody)); + await _channel.BasicPublishAsync("", q.QueueName, sendBody); sendBody.AsSpan().Fill(1); Assert.True(await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5))); @@ -247,7 +247,7 @@ public async Task TestMaxInboundMessageBodySize() string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer); - Assert.True(await channel.BasicPublishAsync("", q.QueueName, msg0)); + await channel.BasicPublishAsync("", q.QueueName, msg0); AlreadyClosedException ex = await Assert.ThrowsAsync(() => channel.BasicPublishAsync("", q.QueueName, msg1).AsTask()); Assert.IsType(ex.InnerException); @@ -315,7 +315,7 @@ public async Task TestPropertiesRoundtrip_Headers() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - Assert.True(await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody)); + await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody); bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); await _channel.BasicCancelAsync(tag); Assert.True(waitResFalse); diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs index c08be2918..66841c2cc 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -114,7 +114,7 @@ await TestConcurrentOperationsAsync(async () => await _channel.BasicConsumeAsync(queue: q.QueueName, autoAck: false, consumer); - var publishTasks = new List>(); + var publishTasks = new List(); for (ushort i = 0; i < _messageCount; i++) { msgTracker[i] = false; @@ -122,9 +122,9 @@ await TestConcurrentOperationsAsync(async () => publishTasks.Add(_channel.BasicPublishAsync("", q.QueueName, mandatory: true, body: body)); } - foreach (ValueTask pt in publishTasks) + foreach (ValueTask pt in publishTasks) { - Assert.True(await pt); + await pt; } Assert.True(await tcs.Task); diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs index af79ed7db..3db7e37a1 100644 --- a/projects/Test/Integration/TestConfirmSelect.cs +++ b/projects/Test/Integration/TestConfirmSelect.cs @@ -46,23 +46,23 @@ public TestConfirmSelect(ITestOutputHelper output) : base(output) [Fact] public async Task TestConfirmSelectIdempotency() { - ValueTask PublishAsync() + ValueTask PublishAsync() { return _channel.BasicPublishAsync(exchange: "", routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message")); } Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync()); } @@ -78,8 +78,8 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) var properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, - mandatory: false, basicProperties: properties, body: body)); + await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, + mandatory: false, basicProperties: properties, body: body); try { @@ -88,7 +88,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) CorrelationId = new string('o', correlationIdLength) }; // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body)); + await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); } catch { @@ -97,7 +97,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body)); + await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); // _output.WriteLine("I'm done..."); } } diff --git a/projects/Test/Integration/TestConfirmSelectAsync.cs b/projects/Test/Integration/TestConfirmSelectAsync.cs index 3e79965c1..0502185e2 100644 --- a/projects/Test/Integration/TestConfirmSelectAsync.cs +++ b/projects/Test/Integration/TestConfirmSelectAsync.cs @@ -49,20 +49,20 @@ public TestConfirmSelectAsync(ITestOutputHelper output) : base(output) public async Task TestConfirmSelectIdempotency() { Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync()); - Assert.True(await PublishAsync()); + await PublishAsync(); Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync()); } - private ValueTask PublishAsync() + private ValueTask PublishAsync() { return _channel.BasicPublishAsync(exchange: "", routingKey: Guid.NewGuid().ToString(), _message); diff --git a/projects/Test/Integration/TestConnectionTopologyRecovery.cs b/projects/Test/Integration/TestConnectionTopologyRecovery.cs index f2d32258c..3d01632ae 100644 --- a/projects/Test/Integration/TestConnectionTopologyRecovery.cs +++ b/projects/Test/Integration/TestConnectionTopologyRecovery.cs @@ -228,7 +228,8 @@ public async Task TestTopologyRecoveryBindingFilter() Assert.True(ch.IsOpen); Assert.True(await SendAndConsumeMessageAsync(_conn, queueWithRecoveredBinding, exchange, bindingToRecover)); - Assert.False(await SendAndConsumeMessageAsync(_conn, queueWithIgnoredBinding, exchange, bindingToIgnore)); + // TODO use real exception being thrown + await Assert.ThrowsAnyAsync(() => SendAndConsumeMessageAsync(_conn, queueWithIgnoredBinding, exchange, bindingToIgnore)); } finally { diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index 886064132..347ad12ce 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -245,7 +245,8 @@ private async Task PublishAsync(IChannel publishChannel) AppId = "oauth2", }; - Assert.True(await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body)); + await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, + basicProperties: properties, body: body); _testOutputHelper.WriteLine("Sent and confirmed message"); } diff --git a/projects/Test/SequentialIntegration/TestActivitySource.cs b/projects/Test/SequentialIntegration/TestActivitySource.cs index 31b8c38db..7ea58dc22 100644 --- a/projects/Test/SequentialIntegration/TestActivitySource.cs +++ b/projects/Test/SequentialIntegration/TestActivitySource.cs @@ -99,7 +99,7 @@ public async Task TestPublisherAndConsumerActivityTags(bool useRoutingKeyAsOpera }; string consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer); - Assert.True(await _channel.BasicPublishAsync("", q.QueueName, true, sendBody)); + await _channel.BasicPublishAsync("", q.QueueName, true, sendBody); await consumerReceivedTcs.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(await consumerReceivedTcs.Task);