Skip to content

Commit

Permalink
Resolved merge conflicts after model -> channel rename
Browse files Browse the repository at this point in the history
  • Loading branch information
rosca-sabina committed Mar 14, 2023
1 parent 9ba9cf2 commit b71f65d
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,26 +354,26 @@ private void RecoverChannelsAndItsConsumers()
private sealed class RecoveryChannelFactory : IDisposable
{
private readonly IConnection _connection;
private IModel? _recoveryChannel;
private IChannel? _recoveryChannel;

public RecoveryChannelFactory(IConnection connection)
{
_connection = connection;
}

public IModel RecoveryChannel
public IChannel RecoveryChannel
{
get
{
if (_recoveryChannel == null)
{
_recoveryChannel = _connection.CreateModel();
_recoveryChannel = _connection.CreateChannel();
}

if (_recoveryChannel.IsClosed)
{
_recoveryChannel.Dispose();
_recoveryChannel = _connection.CreateModel();
_recoveryChannel = _connection.CreateChannel();
}

return _recoveryChannel;
Expand Down
86 changes: 43 additions & 43 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,15 +1064,15 @@ public void TestTopologyRecoveryQueueFilter()
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();

var queueToRecover = "recovered.queue";
var queueToIgnore = "filtered.queue";
ch.QueueDeclare(queueToRecover, false, false, false, null);
ch.QueueDeclare(queueToIgnore, false, false, false, null);

_model.QueueDelete(queueToRecover);
_model.QueueDelete(queueToIgnore);
_channel.QueueDelete(queueToRecover);
_channel.QueueDelete(queueToIgnore);

try
{
Expand Down Expand Up @@ -1108,15 +1108,15 @@ public void TestTopologyRecoveryExchangeFilter()
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();

var exchangeToRecover = "recovered.exchange";
var exchangeToIgnore = "filtered.exchange";
ch.ExchangeDeclare(exchangeToRecover, "topic", false, true);
ch.ExchangeDeclare(exchangeToIgnore, "direct", false, true);

_model.ExchangeDelete(exchangeToRecover);
_model.ExchangeDelete(exchangeToIgnore);
_channel.ExchangeDelete(exchangeToRecover);
_channel.ExchangeDelete(exchangeToIgnore);

try
{
Expand Down Expand Up @@ -1152,7 +1152,7 @@ public void TestTopologyRecoveryBindingFilter()
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();

var exchange = "topology.recovery.exchange";
var queueWithRecoveredBinding = "topology.recovery.queue.1";
Expand All @@ -1168,8 +1168,8 @@ public void TestTopologyRecoveryBindingFilter()
ch.QueuePurge(queueWithRecoveredBinding);
ch.QueuePurge(queueWithIgnoredBinding);

_model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecover);
_model.QueueUnbind(queueWithIgnoredBinding, exchange, bindingToIgnore);
_channel.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecover);
_channel.QueueUnbind(queueWithIgnoredBinding, exchange, bindingToIgnore);

try
{
Expand All @@ -1196,7 +1196,7 @@ public void TestTopologyRecoveryConsumerFilter()
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();
ch.ConfirmSelect();

var exchange = "topology.recovery.exchange";
Expand Down Expand Up @@ -1260,7 +1260,7 @@ public void TestTopologyRecoveryDefaultFilterRecoversAllEntities()
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryFilter(filter);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();
ch.ConfirmSelect();

var exchange = "topology.recovery.exchange";
Expand All @@ -1287,9 +1287,9 @@ public void TestTopologyRecoveryDefaultFilterRecoversAllEntities()
consumer2.Received += (source, ea) => consumerLatch2.Set();
ch.BasicConsume(queue2, true, "filtered.consumer", consumer2);

_model.ExchangeDelete(exchange);
_model.QueueDelete(queue1);
_model.QueueDelete(queue2);
_channel.ExchangeDelete(exchange);
_channel.QueueDelete(queue1);
_channel.QueueDelete(queue2);

try
{
Expand Down Expand Up @@ -1330,25 +1330,25 @@ public void TestTopologyRecoveryQueueExceptionHandler()
},
QueueRecoveryExceptionHandler = (rq, ex, connection) =>
{
using (var model = connection.CreateModel())
using (var channel = connection.CreateChannel())
{
model.QueueDeclare(rq.Name, false, false, false, changedQueueArguments);
channel.QueueDeclare(rq.Name, false, false, false, changedQueueArguments);
}
}
};
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();

var queueToRecoverWithException = "recovery.exception.queue";
var queueToRecoverSuccessfully = "successfully.recovered.queue";
ch.QueueDeclare(queueToRecoverWithException, false, false, false, null);
ch.QueueDeclare(queueToRecoverSuccessfully, false, false, false, null);

_model.QueueDelete(queueToRecoverSuccessfully);
_model.QueueDelete(queueToRecoverWithException);
_model.QueueDeclare(queueToRecoverWithException, false, false, false, changedQueueArguments);
_channel.QueueDelete(queueToRecoverSuccessfully);
_channel.QueueDelete(queueToRecoverWithException);
_channel.QueueDeclare(queueToRecoverWithException, false, false, false, changedQueueArguments);

try
{
Expand All @@ -1362,7 +1362,7 @@ public void TestTopologyRecoveryQueueExceptionHandler()
finally
{
//Cleanup
_model.QueueDelete(queueToRecoverWithException);
_channel.QueueDelete(queueToRecoverWithException);

conn.Abort();
}
Expand All @@ -1381,25 +1381,25 @@ public void TestTopologyRecoveryExchangeExceptionHandler()
},
ExchangeRecoveryExceptionHandler = (re, ex, connection) =>
{
using (var model = connection.CreateModel())
using (var channel = connection.CreateChannel())
{
model.ExchangeDeclare(re.Name, "topic", false, false);
channel.ExchangeDeclare(re.Name, "topic", false, false);
}
}
};
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();

var exchangeToRecoverWithException = "recovery.exception.exchange";
var exchangeToRecoverSuccessfully = "successfully.recovered.exchange";
ch.ExchangeDeclare(exchangeToRecoverWithException, "direct", false, false);
ch.ExchangeDeclare(exchangeToRecoverSuccessfully, "direct", false, false);

_model.ExchangeDelete(exchangeToRecoverSuccessfully);
_model.ExchangeDelete(exchangeToRecoverWithException);
_model.ExchangeDeclare(exchangeToRecoverWithException, "topic", false, false);
_channel.ExchangeDelete(exchangeToRecoverSuccessfully);
_channel.ExchangeDelete(exchangeToRecoverWithException);
_channel.ExchangeDeclare(exchangeToRecoverWithException, "topic", false, false);

try
{
Expand All @@ -1413,7 +1413,7 @@ public void TestTopologyRecoveryExchangeExceptionHandler()
finally
{
//Cleanup
_model.ExchangeDelete(exchangeToRecoverWithException);
_channel.ExchangeDelete(exchangeToRecoverWithException);

conn.Abort();
}
Expand All @@ -1436,22 +1436,22 @@ public void TestTopologyRecoveryBindingExceptionHandler()
},
BindingRecoveryExceptionHandler = (b, ex, connection) =>
{
using (var model = connection.CreateModel())
using (var channel = connection.CreateChannel())
{
model.QueueDeclare(queueWithExceptionBinding, false, false, false, null);
model.QueueBind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
channel.QueueDeclare(queueWithExceptionBinding, false, false, false, null);
channel.QueueBind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
}
}
};
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();

var queueWithRecoveredBinding = "successfully.recovered.queue";
var bindingToRecoverSuccessfully = "successfully.recovered.binding";

_model.QueueDeclare(queueWithExceptionBinding, false, false, false, null);
_channel.QueueDeclare(queueWithExceptionBinding, false, false, false, null);

ch.ExchangeDeclare(exchange, "direct");
ch.QueueDeclare(queueWithRecoveredBinding, false, false, false, null);
Expand All @@ -1460,9 +1460,9 @@ public void TestTopologyRecoveryBindingExceptionHandler()
ch.QueuePurge(queueWithRecoveredBinding);
ch.QueuePurge(queueWithExceptionBinding);

_model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully);
_model.QueueUnbind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
_model.QueueDelete(queueWithExceptionBinding);
_channel.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully);
_channel.QueueUnbind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
_channel.QueueDelete(queueWithExceptionBinding);

try
{
Expand Down Expand Up @@ -1494,9 +1494,9 @@ public void TestTopologyRecoveryConsumerExceptionHandler()
},
ConsumerRecoveryExceptionHandler = (c, ex, connection) =>
{
using (var model = connection.CreateModel())
using (var channel = connection.CreateChannel())
{
model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null);
channel.QueueDeclare(queueWithExceptionConsumer, false, false, false, null);
}

// So topology recovery runs again. This time he missing queue should exist, making
Expand All @@ -1507,18 +1507,18 @@ public void TestTopologyRecoveryConsumerExceptionHandler()
var latch = new ManualResetEventSlim(false);
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
conn.RecoverySucceeded += (source, ea) => latch.Set();
IModel ch = conn.CreateModel();
IChannel ch = conn.CreateChannel();
ch.ConfirmSelect();

_model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null);
_model.QueuePurge(queueWithExceptionConsumer);
_channel.QueueDeclare(queueWithExceptionConsumer, false, false, false, null);
_channel.QueuePurge(queueWithExceptionConsumer);

var recoverLatch = new ManualResetEventSlim(false);
var consumerToRecover = new EventingBasicConsumer(ch);
consumerToRecover.Received += (source, ea) => recoverLatch.Set();
ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover);

_model.QueueDelete(queueWithExceptionConsumer);
_channel.QueueDelete(queueWithExceptionConsumer);

try
{
Expand Down Expand Up @@ -1549,7 +1549,7 @@ public void TestTopologyRecoveryConsumerExceptionHandler()

internal bool SendAndConsumeMessage(string queue, string exchange, string routingKey)
{
using (var ch = _conn.CreateModel())
using (var ch = _conn.CreateChannel())
{
var latch = new ManualResetEventSlim(false);

Expand Down

0 comments on commit b71f65d

Please sign in to comment.