From 6e95613ecbea6b5f140bd6fab5b3bce3ad83c239 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 3 Oct 2024 14:12:19 -0700 Subject: [PATCH] Start adding assertions for successful `BasicPublishAsync` calls. --- projects/Test/Applications/MassPublish/Program.cs | 8 ++++++-- .../ConnectionRecovery/TestRpcAfterRecovery.cs | 4 ++-- projects/Test/Integration/TestAsyncConsumer.cs | 4 ++-- .../Test/Integration/TestAsyncEventingBasicConsumer.cs | 4 ++-- projects/Test/Integration/TestBasicPublish.cs | 10 +++++----- projects/Test/Integration/TestConfirmSelect.cs | 8 ++++---- projects/Test/OAuth2/TestOAuth2.cs | 2 +- 7 files changed, 22 insertions(+), 18 deletions(-) diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index 9eff678d9..4cb8caa73 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -140,12 +140,16 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync; - + bool ack = false; for (int i = 0; i < ItemsPerBatch; i++) { - await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey, + ack = await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey, basicProperties: s_properties, body: s_payload, mandatory: true); Interlocked.Increment(ref s_messagesSent); + if (false == ack) + { + Console.Error.WriteLine("[ERROR] channel {0} saw nack!", publishChannel.ChannelNumber); + } } if (s_debug) diff --git a/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs index 874b39f20..64c391842 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs @@ -77,8 +77,8 @@ public async Task TestPublishRpcRightAfterReconnect() try { - await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName, - mandatory: false, basicProperties: properties, body: _messageBody); + Assert.True(await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName, + mandatory: false, basicProperties: properties, body: _messageBody)); } catch (Exception e) { diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 773238ff6..2f472f4bb 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -695,8 +695,8 @@ public async Task TestCloseWithinEventHandler_GH1567() var bp = new BasicProperties(); - await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, - basicProperties: bp, mandatory: true, body: GetRandomBody(64)); + Assert.True(await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, + basicProperties: bp, mandatory: true, body: GetRandomBody(64))); Assert.True(await tcs.Task); } diff --git a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs index cf3c59d31..f5b727389 100644 --- a/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs +++ b/projects/Test/Integration/TestAsyncEventingBasicConsumer.cs @@ -108,8 +108,8 @@ public async Task TestAsyncEventingBasicConsumer_GH1038() await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }); byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!"); var props = new BasicProperties(); - await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, - mandatory: false, basicProperties: props, body: messageBodyBytes); + Assert.True(await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, + mandatory: false, basicProperties: props, body: messageBodyBytes)); await Task.WhenAll(_onReceivedTcs.Task, _onCallbackExceptionTcs.Task); Assert.True(await _onReceivedTcs.Task); diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 107460fd8..a395a6669 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -76,7 +76,7 @@ public async Task TestBasicRoundtripArray() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody); + Assert.True(await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody)); bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); await _channel.BasicCancelAsync(tag); @@ -131,7 +131,7 @@ public async Task TestBasicRoundtripReadOnlyMemory() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory(sendBody)); + Assert.True(await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory(sendBody))); bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2)); await _channel.BasicCancelAsync(tag); @@ -161,7 +161,7 @@ public async Task CanNotModifyPayloadAfterPublish() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, sendBody); + Assert.True(await _channel.BasicPublishAsync("", q.QueueName, sendBody)); sendBody.AsSpan().Fill(1); Assert.True(await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5))); @@ -247,7 +247,7 @@ public async Task TestMaxInboundMessageBodySize() string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer); - await channel.BasicPublishAsync("", q.QueueName, msg0); + Assert.True(await channel.BasicPublishAsync("", q.QueueName, msg0)); AlreadyClosedException ex = await Assert.ThrowsAsync(() => channel.BasicPublishAsync("", q.QueueName, msg1).AsTask()); Assert.IsType(ex.InnerException); @@ -315,7 +315,7 @@ public async Task TestPropertiesRoundtrip_Headers() }; string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer); - await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody); + Assert.True(await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody)); bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)); await _channel.BasicCancelAsync(tag); Assert.True(waitResFalse); diff --git a/projects/Test/Integration/TestConfirmSelect.cs b/projects/Test/Integration/TestConfirmSelect.cs index 21f9e7b18..af79ed7db 100644 --- a/projects/Test/Integration/TestConfirmSelect.cs +++ b/projects/Test/Integration/TestConfirmSelect.cs @@ -78,8 +78,8 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength) var properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); - await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, - mandatory: false, basicProperties: properties, body: body); + Assert.True(await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, + mandatory: false, basicProperties: properties, body: body)); try { @@ -88,7 +88,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, CorrelationId = new string('o', correlationIdLength) }; // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); - await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); + Assert.True(await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body)); } catch { @@ -97,7 +97,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty, properties = new BasicProperties(); // _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync()); - await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body); + Assert.True(await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body)); // _output.WriteLine("I'm done..."); } } diff --git a/projects/Test/OAuth2/TestOAuth2.cs b/projects/Test/OAuth2/TestOAuth2.cs index b92d57a3b..886064132 100644 --- a/projects/Test/OAuth2/TestOAuth2.cs +++ b/projects/Test/OAuth2/TestOAuth2.cs @@ -245,7 +245,7 @@ private async Task PublishAsync(IChannel publishChannel) AppId = "oauth2", }; - await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body); + Assert.True(await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body)); _testOutputHelper.WriteLine("Sent and confirmed message"); }