Skip to content

Commit

Permalink
Replacing usage of ManualResetEvent with ManualResetEventSlim to be u…
Browse files Browse the repository at this point in the history
…sed as a more lightweight synchronization primitive.
  • Loading branch information
stebet committed May 14, 2020
1 parent 0ea9274 commit 141a62f
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 52 deletions.
5 changes: 2 additions & 3 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ internal sealed class Connection : IConnection
///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame();

private readonly ManualResetEvent _appContinuation = new ManualResetEvent(false);
private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false);

private volatile ShutdownEventArgs _closeReason = null;
private volatile bool _closed = false;
Expand Down Expand Up @@ -346,8 +346,7 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
}
}

bool receivedSignal = _appContinuation.WaitOne(timeout);

bool receivedSignal = _appContinuation.Wait(timeout);
if (!receivedSignal)
{
_frameHandler.Close();
Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class ModelBase : IFullModel, IRecoverable
private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);

private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEvent _flowControlBlock = new ManualResetEvent(true);
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);

private readonly object _shutdownLock = new object();
private readonly object _rpcLock = new object();
Expand Down Expand Up @@ -353,7 +353,7 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemor
{
if (method.HasContent)
{
_flowControlBlock.WaitOne();
_flowControlBlock.Wait();
Session.Transmit(new Command(method, header, body));
}
else
Expand Down Expand Up @@ -1410,7 +1410,7 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)

internal void SendCommands(IList<Command> commands)
{
_flowControlBlock.WaitOne();
_flowControlBlock.Wait();
AllocatatePublishSeqNos(commands.Count);
Session.Transmit(commands);
}
Expand Down
8 changes: 4 additions & 4 deletions projects/Unit/Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -698,14 +698,14 @@ internal void StartRabbitMQ()
// Concurrency and Coordination
//

internal void Wait(ManualResetEvent latch)
internal void Wait(ManualResetEventSlim latch)
{
Assert.IsTrue(latch.WaitOne(TimeSpan.FromSeconds(10)), "waiting on a latch timed out");
Assert.IsTrue(latch.Wait(TimeSpan.FromSeconds(10)), "waiting on a latch timed out");
}

internal void Wait(ManualResetEvent latch, TimeSpan timeSpan)
internal void Wait(ManualResetEventSlim latch, TimeSpan timeSpan)
{
Assert.IsTrue(latch.WaitOne(timeSpan), "waiting on a latch timed out");
Assert.IsTrue(latch.Wait(timeSpan), "waiting on a latch timed out");
}

//
Expand Down
58 changes: 29 additions & 29 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void CleanUp()
[Test]
public void TestBasicAckAfterChannelRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var cons = new AckingBasicConsumer(Model, latch, CloseAndWaitForRecovery);

TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
Expand Down Expand Up @@ -115,7 +115,7 @@ public void TestBasicAckAfterBasicGetAndChannelRecovery()
public void TestBasicAckEventHandlerRecovery()
{
Model.ConfirmSelect();
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
((AutorecoveringModel)Model).BasicAcks += (m, args) => latch.Set();
((AutorecoveringModel)Model).BasicNacks += (m, args) => latch.Set();

Expand Down Expand Up @@ -235,7 +235,7 @@ public void TestBasicModelRecoveryOnServerRestart()
[Test]
public void TestBasicNackAfterChannelRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var cons = new NackingBasicConsumer(Model, latch, CloseAndWaitForRecovery);

TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
Expand All @@ -244,7 +244,7 @@ public void TestBasicNackAfterChannelRecovery()
[Test]
public void TestBasicRejectAfterChannelRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var cons = new RejectingBasicConsumer(Model, latch, CloseAndWaitForRecovery);

TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
Expand All @@ -253,7 +253,7 @@ public void TestBasicRejectAfterChannelRecovery()
[Test]
public void TestBlockedListenersRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
Conn.ConnectionBlocked += (c, reason) => latch.Set();
CloseAndWaitForRecovery();
CloseAndWaitForRecovery();
Expand Down Expand Up @@ -314,7 +314,7 @@ public void TestConsumerWorkServiceRecovery()
CloseAndWaitForRecovery(c);

Assert.IsTrue(m.IsOpen);
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
cons.Received += (s, args) => latch.Set();

m.BasicPublish("", q, null, encoding.GetBytes("msg"));
Expand Down Expand Up @@ -354,7 +354,7 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
AssertConsumerCount(m, q1, 1);
Assert.False(queueNameChangeAfterRecoveryCalled);

var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
cons.Received += (s, args) => latch.Set();

m.BasicPublish("", q1, null, encoding.GetBytes("msg"));
Expand All @@ -376,7 +376,7 @@ public void TestConsumerRecoveryWithManyConsumers()
Model.BasicConsume(q, true, cons);
}

var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
((AutorecoveringConnection)Conn).ConsumerTagChangeAfterRecovery += (prev, current) => latch.Set();

CloseAndWaitForRecovery();
Expand Down Expand Up @@ -593,7 +593,7 @@ public void TestServerNamedTransientAutoDeleteQueueAndBindingRecovery()
string q = ch.QueueDeclare(queue: "", durable: false, exclusive: false, autoDelete: true, arguments: null).QueueName;
string nameBefore = q;
string nameAfter = null;
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
((AutorecoveringConnection)Conn).QueueNameChangeAfterRecovery += (source, ea) =>
{
nameBefore = ea.NameBefore;
Expand Down Expand Up @@ -696,7 +696,7 @@ public void TestServerNamedQueueRecovery()
string nameBefore = q;
string nameAfter = null;

var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var connection = (AutorecoveringConnection)Conn;
connection.RecoverySucceeded += (source, ea) => latch.Set();
connection.QueueNameChangeAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; };
Expand Down Expand Up @@ -733,8 +733,8 @@ public void TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServerResta
{
int counter = 0;
Conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter);
ManualResetEvent shutdownLatch = PrepareForShutdown(Conn);
ManualResetEvent recoveryLatch = PrepareForRecovery((AutorecoveringConnection)Conn);
ManualResetEventSlim shutdownLatch = PrepareForShutdown(Conn);
ManualResetEventSlim recoveryLatch = PrepareForRecovery((AutorecoveringConnection)Conn);

Assert.IsTrue(Conn.IsOpen);
StopRabbitMQ();
Expand Down Expand Up @@ -884,7 +884,7 @@ public void TestThatDeletedQueuesDontReappearOnRecovery()
[Test]
public void TestUnblockedListenersRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
Conn.ConnectionUnblocked += (source, ea) => latch.Set();
CloseAndWaitForRecovery();
CloseAndWaitForRecovery();
Expand Down Expand Up @@ -943,7 +943,7 @@ internal void CloseAllAndWaitForRecovery()

internal void CloseAllAndWaitForRecovery(AutorecoveringConnection conn)
{
ManualResetEvent rl = PrepareForRecovery(conn);
ManualResetEventSlim rl = PrepareForRecovery(conn);
CloseAllConnections();
Wait(rl);
}
Expand All @@ -955,31 +955,31 @@ internal void CloseAndWaitForRecovery()

internal void CloseAndWaitForRecovery(AutorecoveringConnection conn)
{
ManualResetEvent sl = PrepareForShutdown(conn);
ManualResetEvent rl = PrepareForRecovery(conn);
ManualResetEventSlim sl = PrepareForShutdown(conn);
ManualResetEventSlim rl = PrepareForRecovery(conn);
CloseConnection(conn);
Wait(sl);
Wait(rl);
}

internal void CloseAndWaitForShutdown(AutorecoveringConnection conn)
{
ManualResetEvent sl = PrepareForShutdown(conn);
ManualResetEventSlim sl = PrepareForShutdown(conn);
CloseConnection(conn);
Wait(sl);
}

internal ManualResetEvent PrepareForRecovery(AutorecoveringConnection conn)
internal ManualResetEventSlim PrepareForRecovery(AutorecoveringConnection conn)
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
conn.RecoverySucceeded += (source, ea) => latch.Set();

return latch;
}

internal ManualResetEvent PrepareForShutdown(IConnection conn)
internal ManualResetEventSlim PrepareForShutdown(IConnection conn)
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
conn.ConnectionShutdown += (c, args) => latch.Set();

return latch;
Expand All @@ -997,14 +997,14 @@ internal void RestartServerAndWaitForRecovery()

internal void RestartServerAndWaitForRecovery(AutorecoveringConnection conn)
{
ManualResetEvent sl = PrepareForShutdown(conn);
ManualResetEvent rl = PrepareForRecovery(conn);
ManualResetEventSlim sl = PrepareForShutdown(conn);
ManualResetEventSlim rl = PrepareForRecovery(conn);
RestartRabbitMQ();
Wait(sl);
Wait(rl);
}

internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEvent latch)
internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEventSlim latch)
{
string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
int n = 30;
Expand Down Expand Up @@ -1047,7 +1047,7 @@ internal void WaitForShutdown(IConnection conn)

public class AckingBasicConsumer : TestBasicConsumer1
{
public AckingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
public AckingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
: base(model, latch, fn)
{
}
Expand All @@ -1060,7 +1060,7 @@ public override void PostHandleDelivery(ulong deliveryTag)

public class NackingBasicConsumer : TestBasicConsumer1
{
public NackingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
public NackingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
: base(model, latch, fn)
{
}
Expand All @@ -1073,7 +1073,7 @@ public override void PostHandleDelivery(ulong deliveryTag)

public class RejectingBasicConsumer : TestBasicConsumer1
{
public RejectingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
public RejectingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
: base(model, latch, fn)
{
}
Expand All @@ -1087,10 +1087,10 @@ public override void PostHandleDelivery(ulong deliveryTag)
public class TestBasicConsumer1 : DefaultBasicConsumer
{
private readonly Action _action;
private readonly ManualResetEvent _latch;
private readonly ManualResetEventSlim _latch;
private ushort _counter = 0;

public TestBasicConsumer1(IModel model, ManualResetEvent latch, Action fn)
public TestBasicConsumer1(IModel model, ManualResetEventSlim latch, Action fn)
: base(model)
{
_latch = latch;
Expand Down
4 changes: 2 additions & 2 deletions projects/Unit/TestConnectionShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class TestConnectionShutdown : IntegrationFixture
[Test]
public void TestShutdownSignalPropagationToChannels()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);

Model.ModelShutdown += (model, args) => {
latch.Set();
Expand All @@ -67,7 +67,7 @@ public void TestShutdownSignalPropagationToChannels()
public void TestConsumerDispatcherShutdown()
{
var m = (AutorecoveringModel)Model;
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);

Model.ModelShutdown += (model, args) =>
{
Expand Down
16 changes: 8 additions & 8 deletions projects/Unit/TestConsumerOperationDispatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void TestChannelShutdownDoesNotShutDownDispatcher()
string q2 = ch2.QueueDeclare().QueueName;
ch2.QueueBind(queue: q2, exchange: _x, routingKey: "");

var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
ch1.BasicConsume(q1, true, new EventingBasicConsumer(ch1));
var c2 = new EventingBasicConsumer(ch2);
c2.Received += (object sender, BasicDeliverEventArgs e) =>
Expand All @@ -180,10 +180,10 @@ public void TestChannelShutdownDoesNotShutDownDispatcher()

private class ShutdownLatchConsumer : DefaultBasicConsumer
{
public ManualResetEvent Latch { get; private set; }
public ManualResetEvent DuplicateLatch { get; private set; }
public ManualResetEventSlim Latch { get; private set; }
public ManualResetEventSlim DuplicateLatch { get; private set; }

public ShutdownLatchConsumer(ManualResetEvent latch, ManualResetEvent duplicateLatch)
public ShutdownLatchConsumer(ManualResetEventSlim latch, ManualResetEventSlim duplicateLatch)
{
Latch = latch;
DuplicateLatch = duplicateLatch;
Expand All @@ -192,7 +192,7 @@ public ShutdownLatchConsumer(ManualResetEvent latch, ManualResetEvent duplicateL
public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
{
// keep track of duplicates
if (Latch.WaitOne(0)){
if (Latch.Wait(0)){
DuplicateLatch.Set();
} else {
Latch.Set();
Expand All @@ -203,15 +203,15 @@ public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
[Test]
public void TestModelShutdownHandler()
{
var latch = new ManualResetEvent(false);
var duplicateLatch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var duplicateLatch = new ManualResetEventSlim(false);
string q = Model.QueueDeclare().QueueName;
var c = new ShutdownLatchConsumer(latch, duplicateLatch);

Model.BasicConsume(queue: q, autoAck: true, consumer: c);
Model.Close();
Wait(latch, TimeSpan.FromSeconds(5));
Assert.IsFalse(duplicateLatch.WaitOne(TimeSpan.FromSeconds(5)),
Assert.IsFalse(duplicateLatch.Wait(TimeSpan.FromSeconds(5)),
"event handler fired more than once");
}
}
Expand Down
4 changes: 2 additions & 2 deletions projects/Unit/TestEventingConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public void TestEventingConsumerRegistrationEvents()
{
string q = Model.QueueDeclare();

var registeredLatch = new ManualResetEvent(false);
var registeredLatch = new ManualResetEventSlim(false);
object registeredSender = null;
var unregisteredLatch = new ManualResetEvent(false);
var unregisteredLatch = new ManualResetEventSlim(false);
object unregisteredSender = null;

EventingBasicConsumer ec = new EventingBasicConsumer(Model);
Expand Down
2 changes: 1 addition & 1 deletion projects/Unit/TestModelShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ internal class TestModelShutdown : IntegrationFixture
public void TestConsumerDispatcherShutdown()
{
var m = (AutorecoveringModel)Model;
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);

Model.ModelShutdown += (model, args) =>
{
Expand Down

0 comments on commit 141a62f

Please sign in to comment.