Skip to content

Commit

Permalink
Start adding assertions for successful BasicPublishAsync calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Oct 4, 2024
1 parent 6accd8f commit 57dbd0c
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 18 deletions.
8 changes: 6 additions & 2 deletions projects/Test/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,16 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer
using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;


bool ack = false;
for (int i = 0; i < ItemsPerBatch; i++)
{
await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey,
ack = await publishChannel.BasicPublishAsync(exchange: ExchangeName, routingKey: RoutingKey,
basicProperties: s_properties, body: s_payload, mandatory: true);
Interlocked.Increment(ref s_messagesSent);
if (false == ack)
{
Console.Error.WriteLine("[ERROR] channel {0} saw nack!", publishChannel.ChannelNumber);
}
}

if (s_debug)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public async Task TestPublishRpcRightAfterReconnect()

try
{
await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName,
mandatory: false, basicProperties: properties, body: _messageBody);
Assert.True(await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: testQueueName,
mandatory: false, basicProperties: properties, body: _messageBody));
}
catch (Exception e)
{
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ public async Task TestCloseWithinEventHandler_GH1567()

var bp = new BasicProperties();

await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
basicProperties: bp, mandatory: true, body: GetRandomBody(64));
Assert.True(await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
basicProperties: bp, mandatory: true, body: GetRandomBody(64)));

Assert.True(await tcs.Task);
}
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Integration/TestAsyncEventingBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ public async Task TestAsyncEventingBasicConsumer_GH1038()
await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
var props = new BasicProperties();
await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: messageBodyBytes);
Assert.True(await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: messageBodyBytes));

await Task.WhenAll(_onReceivedTcs.Task, _onCallbackExceptionTcs.Task);
Assert.True(await _onReceivedTcs.Task);
Expand Down
10 changes: 5 additions & 5 deletions projects/Test/Integration/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public async Task TestBasicRoundtripArray()
};
string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer);

await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody);
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, true, bp, sendBody));
bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5));
await _channel.BasicCancelAsync(tag);

Expand Down Expand Up @@ -131,7 +131,7 @@ public async Task TestBasicRoundtripReadOnlyMemory()
};
string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer);

await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory<byte>(sendBody));
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, new ReadOnlyMemory<byte>(sendBody)));
bool waitRes = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(2));
await _channel.BasicCancelAsync(tag);

Expand Down Expand Up @@ -161,7 +161,7 @@ public async Task CanNotModifyPayloadAfterPublish()
};
string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer);

await _channel.BasicPublishAsync("", q.QueueName, sendBody);
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, sendBody));
sendBody.AsSpan().Fill(1);

Assert.True(await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5)));
Expand Down Expand Up @@ -247,7 +247,7 @@ public async Task TestMaxInboundMessageBodySize()

string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer);

await channel.BasicPublishAsync("", q.QueueName, msg0);
Assert.True(await channel.BasicPublishAsync("", q.QueueName, msg0));
AlreadyClosedException ex = await Assert.ThrowsAsync<AlreadyClosedException>(() =>
channel.BasicPublishAsync("", q.QueueName, msg1).AsTask());
Assert.IsType<MalformedFrameException>(ex.InnerException);
Expand Down Expand Up @@ -315,7 +315,7 @@ public async Task TestPropertiesRoundtrip_Headers()
};

string tag = await _channel.BasicConsumeAsync(q.QueueName, true, consumer);
await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody);
Assert.True(await _channel.BasicPublishAsync("", q.QueueName, false, bp, sendBody));
bool waitResFalse = await consumerReceivedSemaphore.WaitAsync(TimeSpan.FromSeconds(5));
await _channel.BasicCancelAsync(tag);
Assert.True(waitResFalse);
Expand Down
8 changes: 4 additions & 4 deletions projects/Test/Integration/TestConfirmSelect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)

var properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
mandatory: false, basicProperties: properties, body: body);
Assert.True(await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
mandatory: false, basicProperties: properties, body: body));

try
{
Expand All @@ -88,7 +88,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
CorrelationId = new string('o', correlationIdLength)
};
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
Assert.True(await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body));
}
catch
{
Expand All @@ -97,7 +97,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,

properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
Assert.True(await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body));
// _output.WriteLine("I'm done...");
}
}
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/OAuth2/TestOAuth2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private async Task PublishAsync(IChannel publishChannel)
AppId = "oauth2",
};

await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body);
Assert.True(await publishChannel.BasicPublishAsync(exchange: Exchange, routingKey: "hello", false, basicProperties: properties, body: body));
_testOutputHelper.WriteLine("Sent and confirmed message");
}

Expand Down

0 comments on commit 57dbd0c

Please sign in to comment.