diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 529bae1329..2eb7e5c530 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -64,7 +64,7 @@ internal sealed class Connection : IConnection ///Heartbeat frame for transmission. Reusable across connections. private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame(); - private readonly ManualResetEvent _appContinuation = new ManualResetEvent(false); + private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false); private volatile ShutdownEventArgs _closeReason = null; private volatile bool _closed = false; @@ -346,8 +346,7 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout) } } - bool receivedSignal = _appContinuation.WaitOne(timeout); - + bool receivedSignal = _appContinuation.Wait(timeout); if (!receivedSignal) { _frameHandler.Close(); diff --git a/projects/RabbitMQ.Client/client/impl/ModelBase.cs b/projects/RabbitMQ.Client/client/impl/ModelBase.cs index 12bd32aa03..85998a41c5 100644 --- a/projects/RabbitMQ.Client/client/impl/ModelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ModelBase.cs @@ -66,7 +66,7 @@ abstract class ModelBase : IFullModel, IRecoverable private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20); private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); - private readonly ManualResetEvent _flowControlBlock = new ManualResetEvent(true); + private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true); private readonly object _shutdownLock = new object(); private readonly object _rpcLock = new object(); @@ -353,7 +353,7 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemor { if (method.HasContent) { - _flowControlBlock.WaitOne(); + _flowControlBlock.Wait(); Session.Transmit(new Command(method, header, body)); } else @@ -1410,7 +1410,7 @@ public void WaitForConfirmsOrDie(TimeSpan timeout) internal void SendCommands(IList commands) { - _flowControlBlock.WaitOne(); + _flowControlBlock.Wait(); AllocatatePublishSeqNos(commands.Count); Session.Transmit(commands); } diff --git a/projects/Unit/Fixtures.cs b/projects/Unit/Fixtures.cs index a07e20bb66..bbb03e10b3 100644 --- a/projects/Unit/Fixtures.cs +++ b/projects/Unit/Fixtures.cs @@ -698,14 +698,14 @@ internal void StartRabbitMQ() // Concurrency and Coordination // - internal void Wait(ManualResetEvent latch) + internal void Wait(ManualResetEventSlim latch) { - Assert.IsTrue(latch.WaitOne(TimeSpan.FromSeconds(10)), "waiting on a latch timed out"); + Assert.IsTrue(latch.Wait(TimeSpan.FromSeconds(10)), "waiting on a latch timed out"); } - internal void Wait(ManualResetEvent latch, TimeSpan timeSpan) + internal void Wait(ManualResetEventSlim latch, TimeSpan timeSpan) { - Assert.IsTrue(latch.WaitOne(timeSpan), "waiting on a latch timed out"); + Assert.IsTrue(latch.Wait(timeSpan), "waiting on a latch timed out"); } // diff --git a/projects/Unit/TestConnectionRecovery.cs b/projects/Unit/TestConnectionRecovery.cs index dd58c79af4..5f29a1c096 100644 --- a/projects/Unit/TestConnectionRecovery.cs +++ b/projects/Unit/TestConnectionRecovery.cs @@ -86,7 +86,7 @@ public void CleanUp() [Test] public void TestBasicAckAfterChannelRecovery() { - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); var cons = new AckingBasicConsumer(Model, latch, CloseAndWaitForRecovery); TestDelayedBasicAckNackAfterChannelRecovery(cons, latch); @@ -115,7 +115,7 @@ public void TestBasicAckAfterBasicGetAndChannelRecovery() public void TestBasicAckEventHandlerRecovery() { Model.ConfirmSelect(); - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); ((AutorecoveringModel)Model).BasicAcks += (m, args) => latch.Set(); ((AutorecoveringModel)Model).BasicNacks += (m, args) => latch.Set(); @@ -235,7 +235,7 @@ public void TestBasicModelRecoveryOnServerRestart() [Test] public void TestBasicNackAfterChannelRecovery() { - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); var cons = new NackingBasicConsumer(Model, latch, CloseAndWaitForRecovery); TestDelayedBasicAckNackAfterChannelRecovery(cons, latch); @@ -244,7 +244,7 @@ public void TestBasicNackAfterChannelRecovery() [Test] public void TestBasicRejectAfterChannelRecovery() { - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); var cons = new RejectingBasicConsumer(Model, latch, CloseAndWaitForRecovery); TestDelayedBasicAckNackAfterChannelRecovery(cons, latch); @@ -253,7 +253,7 @@ public void TestBasicRejectAfterChannelRecovery() [Test] public void TestBlockedListenersRecovery() { - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); Conn.ConnectionBlocked += (c, reason) => latch.Set(); CloseAndWaitForRecovery(); CloseAndWaitForRecovery(); @@ -314,7 +314,7 @@ public void TestConsumerWorkServiceRecovery() CloseAndWaitForRecovery(c); Assert.IsTrue(m.IsOpen); - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); cons.Received += (s, args) => latch.Set(); m.BasicPublish("", q, null, encoding.GetBytes("msg")); @@ -354,7 +354,7 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery() AssertConsumerCount(m, q1, 1); Assert.False(queueNameChangeAfterRecoveryCalled); - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); cons.Received += (s, args) => latch.Set(); m.BasicPublish("", q1, null, encoding.GetBytes("msg")); @@ -376,7 +376,7 @@ public void TestConsumerRecoveryWithManyConsumers() Model.BasicConsume(q, true, cons); } - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); ((AutorecoveringConnection)Conn).ConsumerTagChangeAfterRecovery += (prev, current) => latch.Set(); CloseAndWaitForRecovery(); @@ -593,7 +593,7 @@ public void TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() string q = ch.QueueDeclare(queue: "", durable: false, exclusive: false, autoDelete: true, arguments: null).QueueName; string nameBefore = q; string nameAfter = null; - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); ((AutorecoveringConnection)Conn).QueueNameChangeAfterRecovery += (source, ea) => { nameBefore = ea.NameBefore; @@ -696,7 +696,7 @@ public void TestServerNamedQueueRecovery() string nameBefore = q; string nameAfter = null; - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); var connection = (AutorecoveringConnection)Conn; connection.RecoverySucceeded += (source, ea) => latch.Set(); connection.QueueNameChangeAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; }; @@ -733,8 +733,8 @@ public void TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServerResta { int counter = 0; Conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); - ManualResetEvent shutdownLatch = PrepareForShutdown(Conn); - ManualResetEvent recoveryLatch = PrepareForRecovery((AutorecoveringConnection)Conn); + ManualResetEventSlim shutdownLatch = PrepareForShutdown(Conn); + ManualResetEventSlim recoveryLatch = PrepareForRecovery((AutorecoveringConnection)Conn); Assert.IsTrue(Conn.IsOpen); StopRabbitMQ(); @@ -884,7 +884,7 @@ public void TestThatDeletedQueuesDontReappearOnRecovery() [Test] public void TestUnblockedListenersRecovery() { - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); Conn.ConnectionUnblocked += (source, ea) => latch.Set(); CloseAndWaitForRecovery(); CloseAndWaitForRecovery(); @@ -943,7 +943,7 @@ internal void CloseAllAndWaitForRecovery() internal void CloseAllAndWaitForRecovery(AutorecoveringConnection conn) { - ManualResetEvent rl = PrepareForRecovery(conn); + ManualResetEventSlim rl = PrepareForRecovery(conn); CloseAllConnections(); Wait(rl); } @@ -955,8 +955,8 @@ internal void CloseAndWaitForRecovery() internal void CloseAndWaitForRecovery(AutorecoveringConnection conn) { - ManualResetEvent sl = PrepareForShutdown(conn); - ManualResetEvent rl = PrepareForRecovery(conn); + ManualResetEventSlim sl = PrepareForShutdown(conn); + ManualResetEventSlim rl = PrepareForRecovery(conn); CloseConnection(conn); Wait(sl); Wait(rl); @@ -964,22 +964,22 @@ internal void CloseAndWaitForRecovery(AutorecoveringConnection conn) internal void CloseAndWaitForShutdown(AutorecoveringConnection conn) { - ManualResetEvent sl = PrepareForShutdown(conn); + ManualResetEventSlim sl = PrepareForShutdown(conn); CloseConnection(conn); Wait(sl); } - internal ManualResetEvent PrepareForRecovery(AutorecoveringConnection conn) + internal ManualResetEventSlim PrepareForRecovery(AutorecoveringConnection conn) { - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); conn.RecoverySucceeded += (source, ea) => latch.Set(); return latch; } - internal ManualResetEvent PrepareForShutdown(IConnection conn) + internal ManualResetEventSlim PrepareForShutdown(IConnection conn) { - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); conn.ConnectionShutdown += (c, args) => latch.Set(); return latch; @@ -997,14 +997,14 @@ internal void RestartServerAndWaitForRecovery() internal void RestartServerAndWaitForRecovery(AutorecoveringConnection conn) { - ManualResetEvent sl = PrepareForShutdown(conn); - ManualResetEvent rl = PrepareForRecovery(conn); + ManualResetEventSlim sl = PrepareForShutdown(conn); + ManualResetEventSlim rl = PrepareForRecovery(conn); RestartRabbitMQ(); Wait(sl); Wait(rl); } - internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEvent latch) + internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEventSlim latch) { string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName; int n = 30; @@ -1047,7 +1047,7 @@ internal void WaitForShutdown(IConnection conn) public class AckingBasicConsumer : TestBasicConsumer1 { - public AckingBasicConsumer(IModel model, ManualResetEvent latch, Action fn) + public AckingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn) : base(model, latch, fn) { } @@ -1060,7 +1060,7 @@ public override void PostHandleDelivery(ulong deliveryTag) public class NackingBasicConsumer : TestBasicConsumer1 { - public NackingBasicConsumer(IModel model, ManualResetEvent latch, Action fn) + public NackingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn) : base(model, latch, fn) { } @@ -1073,7 +1073,7 @@ public override void PostHandleDelivery(ulong deliveryTag) public class RejectingBasicConsumer : TestBasicConsumer1 { - public RejectingBasicConsumer(IModel model, ManualResetEvent latch, Action fn) + public RejectingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn) : base(model, latch, fn) { } @@ -1087,10 +1087,10 @@ public override void PostHandleDelivery(ulong deliveryTag) public class TestBasicConsumer1 : DefaultBasicConsumer { private readonly Action _action; - private readonly ManualResetEvent _latch; + private readonly ManualResetEventSlim _latch; private ushort _counter = 0; - public TestBasicConsumer1(IModel model, ManualResetEvent latch, Action fn) + public TestBasicConsumer1(IModel model, ManualResetEventSlim latch, Action fn) : base(model) { _latch = latch; diff --git a/projects/Unit/TestConnectionShutdown.cs b/projects/Unit/TestConnectionShutdown.cs index 7d1b5b950f..98874f29c0 100644 --- a/projects/Unit/TestConnectionShutdown.cs +++ b/projects/Unit/TestConnectionShutdown.cs @@ -53,7 +53,7 @@ public class TestConnectionShutdown : IntegrationFixture [Test] public void TestShutdownSignalPropagationToChannels() { - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); Model.ModelShutdown += (model, args) => { latch.Set(); @@ -67,7 +67,7 @@ public void TestShutdownSignalPropagationToChannels() public void TestConsumerDispatcherShutdown() { var m = (AutorecoveringModel)Model; - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); Model.ModelShutdown += (model, args) => { diff --git a/projects/Unit/TestConsumerOperationDispatch.cs b/projects/Unit/TestConsumerOperationDispatch.cs index b07e82f348..fa08cb5ff7 100644 --- a/projects/Unit/TestConsumerOperationDispatch.cs +++ b/projects/Unit/TestConsumerOperationDispatch.cs @@ -163,7 +163,7 @@ public void TestChannelShutdownDoesNotShutDownDispatcher() string q2 = ch2.QueueDeclare().QueueName; ch2.QueueBind(queue: q2, exchange: _x, routingKey: ""); - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); ch1.BasicConsume(q1, true, new EventingBasicConsumer(ch1)); var c2 = new EventingBasicConsumer(ch2); c2.Received += (object sender, BasicDeliverEventArgs e) => @@ -180,10 +180,10 @@ public void TestChannelShutdownDoesNotShutDownDispatcher() private class ShutdownLatchConsumer : DefaultBasicConsumer { - public ManualResetEvent Latch { get; private set; } - public ManualResetEvent DuplicateLatch { get; private set; } + public ManualResetEventSlim Latch { get; private set; } + public ManualResetEventSlim DuplicateLatch { get; private set; } - public ShutdownLatchConsumer(ManualResetEvent latch, ManualResetEvent duplicateLatch) + public ShutdownLatchConsumer(ManualResetEventSlim latch, ManualResetEventSlim duplicateLatch) { Latch = latch; DuplicateLatch = duplicateLatch; @@ -192,7 +192,7 @@ public ShutdownLatchConsumer(ManualResetEvent latch, ManualResetEvent duplicateL public override void HandleModelShutdown(object model, ShutdownEventArgs reason) { // keep track of duplicates - if (Latch.WaitOne(0)){ + if (Latch.Wait(0)){ DuplicateLatch.Set(); } else { Latch.Set(); @@ -203,15 +203,15 @@ public override void HandleModelShutdown(object model, ShutdownEventArgs reason) [Test] public void TestModelShutdownHandler() { - var latch = new ManualResetEvent(false); - var duplicateLatch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); + var duplicateLatch = new ManualResetEventSlim(false); string q = Model.QueueDeclare().QueueName; var c = new ShutdownLatchConsumer(latch, duplicateLatch); Model.BasicConsume(queue: q, autoAck: true, consumer: c); Model.Close(); Wait(latch, TimeSpan.FromSeconds(5)); - Assert.IsFalse(duplicateLatch.WaitOne(TimeSpan.FromSeconds(5)), + Assert.IsFalse(duplicateLatch.Wait(TimeSpan.FromSeconds(5)), "event handler fired more than once"); } } diff --git a/projects/Unit/TestEventingConsumer.cs b/projects/Unit/TestEventingConsumer.cs index 2bfa84b63c..65c1a78d0d 100644 --- a/projects/Unit/TestEventingConsumer.cs +++ b/projects/Unit/TestEventingConsumer.cs @@ -54,9 +54,9 @@ public void TestEventingConsumerRegistrationEvents() { string q = Model.QueueDeclare(); - var registeredLatch = new ManualResetEvent(false); + var registeredLatch = new ManualResetEventSlim(false); object registeredSender = null; - var unregisteredLatch = new ManualResetEvent(false); + var unregisteredLatch = new ManualResetEventSlim(false); object unregisteredSender = null; EventingBasicConsumer ec = new EventingBasicConsumer(Model); diff --git a/projects/Unit/TestModelShutdown.cs b/projects/Unit/TestModelShutdown.cs index b42b1ceac5..dd842fb0b9 100644 --- a/projects/Unit/TestModelShutdown.cs +++ b/projects/Unit/TestModelShutdown.cs @@ -54,7 +54,7 @@ internal class TestModelShutdown : IntegrationFixture public void TestConsumerDispatcherShutdown() { var m = (AutorecoveringModel)Model; - var latch = new ManualResetEvent(false); + var latch = new ManualResetEventSlim(false); Model.ModelShutdown += (model, args) => {