Skip to content

Commit

Permalink
Add UnackedMessageCountLimit setting for RabbitMQ (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kralizek authored Feb 24, 2019
1 parent 2811ffc commit fcacbc9
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class RabbitMqOptions
public ExchangeOptions CommandExchange { get; set; }

public ExchangeOptions EventExchange { get; set; }

public ushort? UnackedMessageCountLimit { get; set; }
}

public class ConfigurationFactory : IConfigurationFactory
Expand Down Expand Up @@ -69,12 +71,14 @@ public IRabbitMqConfiguration Create(RabbitMqOptions options)

return new RabbitMqConfiguration
{
Options = options,
OutboundEncoding = outboundEncoding,
CommandQueueFactory = commandQueueFactory,
EventQueueFactory = eventQueueFactory,
ConnectionFactory = connectionFactory,
CommandExchangeManager = commandExchangeManager,
EventExchangeManager = eventExchangeManager
EventExchangeManager = eventExchangeManager,
UnackedMessageCountLimit = options.UnackedMessageCountLimit
};

IExchangeManager GetExchangeManager(ExchangeOptions exchangeOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ public interface IRabbitMqConfiguration
Encoding OutboundEncoding { get; set; }

ISerializer Serializer { get; set; }

IExchangeManager CommandExchangeManager { get; set; }

IExchangeManager EventExchangeManager { get; set; }

ushort? UnackedMessageCountLimit { get; set; }
}

public class RabbitMqConfiguration : IRabbitMqConfiguration
{
public RabbitMqOptions Options { get; set; }

public IConnectionFactory ConnectionFactory { get; set; }

public IQueueFactory CommandQueueFactory { get; set; }
Expand All @@ -34,5 +40,7 @@ public class RabbitMqConfiguration : IRabbitMqConfiguration

public IExchangeManager EventExchangeManager { get; set; }

public ushort? UnackedMessageCountLimit { get; set; }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public Task<IObservable<Message>> StartAsync()
_connection = _configuration.ConnectionFactory.CreateConnection();
_channel = _connection.CreateModel();

if (_configuration.UnackedMessageCountLimit.HasValue)
{
_channel.BasicQos(0, _configuration.UnackedMessageCountLimit.Value, true);
}

var hasEvents = _messageDescriptorStore.HasEvents();
var hasCommands = _messageDescriptorStore.HasCommands();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,39 @@ public void ConnectionNode_is_correctly_bound(string userName, string password,
Assert.That(sut.Connection["HostName"], Is.EqualTo(hostName));
Assert.That(sut.Connection["VirtualHost"], Is.EqualTo(vhost));
}

[Test, AutoMoqData]
public void UnackedMessageCountLimit_is_correctly_bound_when_value_is_provided(ushort limit)
{
var settings = new Dictionary<string, string>
{
[$"{nameof(RabbitMqOptions.UnackedMessageCountLimit)}"] = limit.ToString()
};

var configuration = CreateConfiguration(settings);

var sut = new RabbitMqOptions();

configuration.Bind(sut);

Assert.That(sut.UnackedMessageCountLimit, Is.EqualTo(limit));
}

[Test, AutoMoqData]
public void UnackedMessageCountLimit_is_correctly_bound_when_no_value_is_provided()
{
var settings = new Dictionary<string, string>
{
[$"{nameof(RabbitMqOptions.UnackedMessageCountLimit)}"] = null
};

var configuration = CreateConfiguration(settings);

var sut = new RabbitMqOptions();

configuration.Bind(sut);

Assert.That(sut.UnackedMessageCountLimit, Is.Null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading.Tasks;
using AutoFixture.Idioms;
using AutoFixture.NUnit3;
using Microsoft.Extensions.Logging;
using Moq;
using NUnit.Framework;
using Nybus;
Expand Down Expand Up @@ -737,5 +736,29 @@ public async Task NotifyFail_can_handle_closed_connections([Frozen] IRabbitMqCon

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicNack(deliveryTag, It.IsAny<bool>(), It.IsAny<bool>()));
}

[Test, CustomAutoMoqData]
public async Task Global_QoS_is_sent_if_value_is_set([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut, ushort limit)
{
Mock.Get(configuration).SetupGet(p => p.UnackedMessageCountLimit).Returns(limit);

await sut.StartAsync();

await sut.StopAsync();

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicQos(0, limit, true));
}

[Test, CustomAutoMoqData]
public async Task No_QoS_is_sent_if_no_value_is_set([Frozen] IRabbitMqConfiguration configuration, RabbitMqBusEngine sut)
{
Mock.Get(configuration).SetupGet(p => p.UnackedMessageCountLimit).Returns(null as ushort?);

await sut.StartAsync();

await sut.StopAsync();

Mock.Get(configuration.ConnectionFactory.CreateConnection().CreateModel()).Verify(p => p.BasicQos(It.IsAny<uint>(), It.IsAny<ushort>(), It.IsAny<bool>()), Times.Never);
}
}
}

0 comments on commit fcacbc9

Please sign in to comment.