Skip to content

Commit

Permalink
Merge pull request #181 from addupsolutions/issue-180
Browse files Browse the repository at this point in the history
Fixes Issue #180
  • Loading branch information
odalet authored Nov 12, 2023
2 parents 18e1d71 + 6f990d4 commit 172778d
Show file tree
Hide file tree
Showing 8 changed files with 415 additions and 135 deletions.
65 changes: 35 additions & 30 deletions src/AddUp.FakeRabbitMQ.Tests/FakeModelBasicTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void BasicAck_removes_message_from_queue()
var deliveryTag = model.WorkingMessagesForUnitTests.First().Key;
model.BasicAck(deliveryTag, false);

Assert.Empty(server.Queues["my_queue"].Messages);
Assert.False(server.Queues["my_queue"].HasMessages);
}
}
}
Expand Down Expand Up @@ -252,7 +252,7 @@ public void BasicGet_does_not_remove_the_message_from_the_queue_if_not_acked(boo

_ = model.BasicGet("my_queue", autoAck);

Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count);
Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount);
Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count);
}
}
Expand Down Expand Up @@ -283,42 +283,40 @@ public void BasicNack_does_not_reenqueue_a_brand_new_message(bool requeue, int e
var deliveryTag = model.WorkingMessagesForUnitTests.First().Key;
model.BasicNack(deliveryTag, false, requeue);

Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count);
Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount);
Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count);
}
}
}

[Theory]
[InlineData(true, 1)] // If requeue param to BasicNack is true, the message that is nacked should remain in Rabbit
[InlineData(false, 0)] // If requeue param to BasicNack is false, the message that is nacked should be removed from Rabbit
[InlineData(true, 1)] // If requeue param to BasicNack is true, the message that is nacked should remain in Rabbit
public void BasicReject_does_not_reenqueue_a_brand_new_message(bool requeue, int expectedMessageCount)
{
var server = new RabbitServer();
using (var model = new FakeModel(server))
{
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.QueueDeclare("my_queue");
model.ExchangeBind("my_queue", "my_exchange", null);
using var model = new FakeModel(server);

var encodedMessage = Encoding.ASCII.GetBytes("hello world!");
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.QueueDeclare("my_queue");
model.ExchangeBind("my_queue", "my_exchange", null);

var consumer = new EventingBasicConsumer(model);
using (var messageProcessed = new ManualResetEventSlim())
{
consumer.Received += (_, _) => messageProcessed.Set();
model.BasicConsume("my_queue", false, consumer);
Assert.True(consumer.IsRunning);
var encodedMessage = Encoding.ASCII.GetBytes("hello world!");
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);

messageProcessed.Wait();
var deliveryTag = model.WorkingMessagesForUnitTests.First().Key;
model.BasicReject(deliveryTag, requeue);
var consumer = new EventingBasicConsumer(model);
using var messageProcessed = new ManualResetEventSlim();

Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count);
Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count);
}
}
consumer.Received += (_, _) => messageProcessed.Set();
model.BasicConsume("my_queue", false, consumer);
Assert.True(consumer.IsRunning);

messageProcessed.Wait();
var deliveryTag = model.WorkingMessagesForUnitTests.First().Key;
model.BasicReject(deliveryTag, requeue);

Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount);
Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count);
}

[Fact]
Expand All @@ -336,8 +334,11 @@ public void BasicPublish_publishes_message()

model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);

Assert.Single(server.Queues["my_queue"].Messages);
Assert.Equal(encodedMessage, server.Queues["my_queue"].Messages.First().Body);
Assert.Equal(1, server.Queues["my_queue"].MessageCount);
if (!server.Queues["my_queue"].TryPeekForUnitTests(out var peeked))
Assert.Fail("No message in queue");
else
Assert.Equal(encodedMessage, peeked.Body);
}
}

Expand All @@ -354,8 +355,11 @@ public void BasicPublish_to_default_exchange_publishes_message()

model.BasicPublish("", "my_queue", model.CreateBasicProperties(), encodedMessage);

Assert.Single(server.Queues["my_queue"].Messages);
Assert.Equal(encodedMessage, server.Queues["my_queue"].Messages.First().Body);
Assert.Equal(1, server.Queues["my_queue"].MessageCount);
if (!server.Queues["my_queue"].TryPeekForUnitTests(out var peeked))
Assert.Fail("No message in queue");
else
Assert.Equal(encodedMessage, peeked.Body);
}
}

Expand Down Expand Up @@ -505,10 +509,11 @@ public void BasicPublishBatch_publishes_messages()
batch.Add("my_exchange", null, true, model.CreateBasicProperties(), encodedMessages[1]);
batch.Publish();

Assert.Equal(2, server.Queues["my_queue"].Messages.Count);
Assert.Equal(2, server.Queues["my_queue"].MessageCount);

var index = 0;
foreach (var item in server.Queues["my_queue"].Messages)
var items = server.Queues["my_queue"].GetAllMessagesForUnitTests();
foreach (var item in items)
{
Assert.Equal(encodedMessages[index].ToArray(), item.Body);
index++;
Expand Down
103 changes: 75 additions & 28 deletions src/AddUp.FakeRabbitMQ.Tests/FakeModelMiscTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
Expand Down Expand Up @@ -56,43 +57,89 @@ public void MessageCount_returns_the_number_of_messages_in_the_queue()
public void MessageCount_returns_the_number_of_non_consumed_messages_in_the_queue()
{
var server = new RabbitServer();
using (var model = new FakeModel(server))
using var model = new FakeModel(server);

const string queueName = "myQueue";
model.QueueDeclare(queueName);
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.ExchangeBind(queueName, "my_exchange", null);

for (var i = 0; i < 10; i++)
{
const string queueName = "myQueue";
model.QueueDeclare(queueName);
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.ExchangeBind(queueName, "my_exchange", null);
var message = $"hello world: {i}";
var encodedMessage = Encoding.ASCII.GetBytes(message);
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);
}

for (var i = 0; i < 10; i++)
{
// Consume 4 messages
const string consumerTag = "consumer-tag";
var consumer = new EventingBasicConsumer(model);
var consumptionCount = 0;
using var messagesProcessed = new ManualResetEventSlim();

consumer.Received += (s, e) =>
{
consumptionCount++;
if (consumptionCount > 4) return;
model.BasicAck(e.DeliveryTag, false);
if (consumptionCount == 4)
messagesProcessed.Set();
};

model.BasicConsume(queueName, false, consumerTag, consumer);

messagesProcessed.Wait();
Assert.Equal(6u, model.MessageCount(queueName));
}

[Fact]
public async Task MessageCount_returns_the_number_of_non_consumed_messages_in_the_queue_autoAck_mode()
{
var server = new RabbitServer();
using var model = new FakeModel(server);

const string queueName = "myQueue";
model.QueueDeclare(queueName);
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.ExchangeBind(queueName, "my_exchange", null);

void publishMessages(int startIndex, int count)
{
for (var i = startIndex; i < startIndex + count; i++)
{
var message = $"hello world: {i}";
var encodedMessage = Encoding.ASCII.GetBytes(message);
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);
}
}

// Consume 4 messages
var consumer = new EventingBasicConsumer(model);
var consumptionCount = 0;
using (var messagesProcessed = new ManualResetEventSlim())
{
consumer.Received += (s, e) =>
{
if (consumptionCount >= 4)
{
messagesProcessed.Set();
return;
}
model.BasicAck(e.DeliveryTag, false);
consumptionCount++;
};

model.BasicConsume(queueName, true, consumer);
messagesProcessed.Wait();
Assert.Equal(6u, model.MessageCount(queueName));
}
publishMessages(0, 4);

// Consume 4 messages
const string consumerTag = "consumer-tag";
var consumer = new EventingBasicConsumer(model);
var consumptionCount = 0;
using var messagesProcessed = new ManualResetEventSlim();

void consume(object sender, BasicDeliverEventArgs e)
{
consumptionCount++;
if (consumptionCount >= 4)
messagesProcessed.Set();
}

consumer.Received += consume;

model.BasicConsume(queueName, true, consumerTag, consumer);
messagesProcessed.Wait();
model.BasicCancel(consumerTag);

publishMessages(4, 6); // Publish another 6 messages
await Task.Delay(1000); // They will never be consumed

Assert.Equal(4, consumptionCount);
Assert.Equal(6u, model.MessageCount(queueName));
}

[Fact]
Expand Down
39 changes: 25 additions & 14 deletions src/AddUp.FakeRabbitMQ.Tests/FakeModelQueueTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using Xunit;
Expand All @@ -10,6 +12,8 @@ namespace AddUp.RabbitMQ.Fakes;
[ExcludeFromCodeCoverage]
public class FakeModelQueueTests
{
private long lastDeliveryTag; // USed to simulate generation of the delivery tag by FakeModel

[Fact]
public void QueueBind_binds_an_exchange_to_a_queue()
{
Expand Down Expand Up @@ -202,20 +206,20 @@ public void QueuePurge_removes_all_messages_from_specified_queue()
using (var model = new FakeModel(node))
{
model.QueueDeclare("my_other_queue");
node.Queues["my_other_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_other_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_other_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_other_queue"].Enqueue(MakeRabbitMessage());

model.QueueDeclare("my_queue");
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());

var count = model.QueuePurge("my_queue");
Assert.Equal(4u, count);

Assert.True(node.Queues["my_queue"].Messages.IsEmpty);
Assert.False(node.Queues["my_other_queue"].Messages.IsEmpty);
Assert.False(node.Queues["my_queue"].HasMessages);
Assert.True(node.Queues["my_other_queue"].HasMessages);
}
}

Expand All @@ -226,15 +230,22 @@ public void QueuePurge_returns_0_if_queue_does_not_exist()
using (var model = new FakeModel(node))
{
model.QueueDeclare("my_queue");
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());

var count = model.QueuePurge("my_other_queue");
Assert.Equal(0u, count);

Assert.False(node.Queues["my_queue"].Messages.IsEmpty);
Assert.True(node.Queues["my_queue"].HasMessages);
}
}

private RabbitMessage MakeRabbitMessage()
{
_ = Interlocked.Increment(ref lastDeliveryTag);
var deliveryTag = Convert.ToUInt64(lastDeliveryTag);
return new(deliveryTag);
}
}
Loading

0 comments on commit 172778d

Please sign in to comment.