Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing usage of ManualResetEvent with ManualResetEventSlim #837

Merged
merged 1 commit into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ internal sealed class Connection : IConnection
///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame();

private readonly ManualResetEvent _appContinuation = new ManualResetEvent(false);
private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false);

private volatile ShutdownEventArgs _closeReason = null;
private volatile bool _closed = false;
Expand Down Expand Up @@ -346,8 +346,7 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
}
}

bool receivedSignal = _appContinuation.WaitOne(timeout);

bool receivedSignal = _appContinuation.Wait(timeout);
if (!receivedSignal)
{
_frameHandler.Close();
Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class ModelBase : IFullModel, IRecoverable
private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);

private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEvent _flowControlBlock = new ManualResetEvent(true);
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);

private readonly object _shutdownLock = new object();
private readonly object _rpcLock = new object();
Expand Down Expand Up @@ -353,7 +353,7 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemor
{
if (method.HasContent)
{
_flowControlBlock.WaitOne();
_flowControlBlock.Wait();
Session.Transmit(new Command(method, header, body));
}
else
Expand Down Expand Up @@ -1410,7 +1410,7 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)

internal void SendCommands(IList<Command> commands)
{
_flowControlBlock.WaitOne();
_flowControlBlock.Wait();
AllocatatePublishSeqNos(commands.Count);
Session.Transmit(commands);
}
Expand Down
8 changes: 4 additions & 4 deletions projects/Unit/Fixtures.cs
Original file line number Diff line number Diff line change
Expand Up @@ -698,14 +698,14 @@ internal void StartRabbitMQ()
// Concurrency and Coordination
//

internal void Wait(ManualResetEvent latch)
internal void Wait(ManualResetEventSlim latch)
{
Assert.IsTrue(latch.WaitOne(TimeSpan.FromSeconds(10)), "waiting on a latch timed out");
Assert.IsTrue(latch.Wait(TimeSpan.FromSeconds(10)), "waiting on a latch timed out");
}

internal void Wait(ManualResetEvent latch, TimeSpan timeSpan)
internal void Wait(ManualResetEventSlim latch, TimeSpan timeSpan)
{
Assert.IsTrue(latch.WaitOne(timeSpan), "waiting on a latch timed out");
Assert.IsTrue(latch.Wait(timeSpan), "waiting on a latch timed out");
}

//
Expand Down
58 changes: 29 additions & 29 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void CleanUp()
[Test]
public void TestBasicAckAfterChannelRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var cons = new AckingBasicConsumer(Model, latch, CloseAndWaitForRecovery);

TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
Expand Down Expand Up @@ -115,7 +115,7 @@ public void TestBasicAckAfterBasicGetAndChannelRecovery()
public void TestBasicAckEventHandlerRecovery()
{
Model.ConfirmSelect();
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
((AutorecoveringModel)Model).BasicAcks += (m, args) => latch.Set();
((AutorecoveringModel)Model).BasicNacks += (m, args) => latch.Set();

Expand Down Expand Up @@ -235,7 +235,7 @@ public void TestBasicModelRecoveryOnServerRestart()
[Test]
public void TestBasicNackAfterChannelRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var cons = new NackingBasicConsumer(Model, latch, CloseAndWaitForRecovery);

TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
Expand All @@ -244,7 +244,7 @@ public void TestBasicNackAfterChannelRecovery()
[Test]
public void TestBasicRejectAfterChannelRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var cons = new RejectingBasicConsumer(Model, latch, CloseAndWaitForRecovery);

TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
Expand All @@ -253,7 +253,7 @@ public void TestBasicRejectAfterChannelRecovery()
[Test]
public void TestBlockedListenersRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
Conn.ConnectionBlocked += (c, reason) => latch.Set();
CloseAndWaitForRecovery();
CloseAndWaitForRecovery();
Expand Down Expand Up @@ -314,7 +314,7 @@ public void TestConsumerWorkServiceRecovery()
CloseAndWaitForRecovery(c);

Assert.IsTrue(m.IsOpen);
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
cons.Received += (s, args) => latch.Set();

m.BasicPublish("", q, null, encoding.GetBytes("msg"));
Expand Down Expand Up @@ -354,7 +354,7 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
AssertConsumerCount(m, q1, 1);
Assert.False(queueNameChangeAfterRecoveryCalled);

var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
cons.Received += (s, args) => latch.Set();

m.BasicPublish("", q1, null, encoding.GetBytes("msg"));
Expand All @@ -376,7 +376,7 @@ public void TestConsumerRecoveryWithManyConsumers()
Model.BasicConsume(q, true, cons);
}

var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
((AutorecoveringConnection)Conn).ConsumerTagChangeAfterRecovery += (prev, current) => latch.Set();

CloseAndWaitForRecovery();
Expand Down Expand Up @@ -593,7 +593,7 @@ public void TestServerNamedTransientAutoDeleteQueueAndBindingRecovery()
string q = ch.QueueDeclare(queue: "", durable: false, exclusive: false, autoDelete: true, arguments: null).QueueName;
string nameBefore = q;
string nameAfter = null;
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
((AutorecoveringConnection)Conn).QueueNameChangeAfterRecovery += (source, ea) =>
{
nameBefore = ea.NameBefore;
Expand Down Expand Up @@ -696,7 +696,7 @@ public void TestServerNamedQueueRecovery()
string nameBefore = q;
string nameAfter = null;

var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var connection = (AutorecoveringConnection)Conn;
connection.RecoverySucceeded += (source, ea) => latch.Set();
connection.QueueNameChangeAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; };
Expand Down Expand Up @@ -733,8 +733,8 @@ public void TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServerResta
{
int counter = 0;
Conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter);
ManualResetEvent shutdownLatch = PrepareForShutdown(Conn);
ManualResetEvent recoveryLatch = PrepareForRecovery((AutorecoveringConnection)Conn);
ManualResetEventSlim shutdownLatch = PrepareForShutdown(Conn);
ManualResetEventSlim recoveryLatch = PrepareForRecovery((AutorecoveringConnection)Conn);

Assert.IsTrue(Conn.IsOpen);
StopRabbitMQ();
Expand Down Expand Up @@ -884,7 +884,7 @@ public void TestThatDeletedQueuesDontReappearOnRecovery()
[Test]
public void TestUnblockedListenersRecovery()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
Conn.ConnectionUnblocked += (source, ea) => latch.Set();
CloseAndWaitForRecovery();
CloseAndWaitForRecovery();
Expand Down Expand Up @@ -943,7 +943,7 @@ internal void CloseAllAndWaitForRecovery()

internal void CloseAllAndWaitForRecovery(AutorecoveringConnection conn)
{
ManualResetEvent rl = PrepareForRecovery(conn);
ManualResetEventSlim rl = PrepareForRecovery(conn);
CloseAllConnections();
Wait(rl);
}
Expand All @@ -955,31 +955,31 @@ internal void CloseAndWaitForRecovery()

internal void CloseAndWaitForRecovery(AutorecoveringConnection conn)
{
ManualResetEvent sl = PrepareForShutdown(conn);
ManualResetEvent rl = PrepareForRecovery(conn);
ManualResetEventSlim sl = PrepareForShutdown(conn);
ManualResetEventSlim rl = PrepareForRecovery(conn);
CloseConnection(conn);
Wait(sl);
Wait(rl);
}

internal void CloseAndWaitForShutdown(AutorecoveringConnection conn)
{
ManualResetEvent sl = PrepareForShutdown(conn);
ManualResetEventSlim sl = PrepareForShutdown(conn);
CloseConnection(conn);
Wait(sl);
}

internal ManualResetEvent PrepareForRecovery(AutorecoveringConnection conn)
internal ManualResetEventSlim PrepareForRecovery(AutorecoveringConnection conn)
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
conn.RecoverySucceeded += (source, ea) => latch.Set();

return latch;
}

internal ManualResetEvent PrepareForShutdown(IConnection conn)
internal ManualResetEventSlim PrepareForShutdown(IConnection conn)
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
conn.ConnectionShutdown += (c, args) => latch.Set();

return latch;
Expand All @@ -997,14 +997,14 @@ internal void RestartServerAndWaitForRecovery()

internal void RestartServerAndWaitForRecovery(AutorecoveringConnection conn)
{
ManualResetEvent sl = PrepareForShutdown(conn);
ManualResetEvent rl = PrepareForRecovery(conn);
ManualResetEventSlim sl = PrepareForShutdown(conn);
ManualResetEventSlim rl = PrepareForRecovery(conn);
RestartRabbitMQ();
Wait(sl);
Wait(rl);
}

internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEvent latch)
internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEventSlim latch)
{
string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
int n = 30;
Expand Down Expand Up @@ -1047,7 +1047,7 @@ internal void WaitForShutdown(IConnection conn)

public class AckingBasicConsumer : TestBasicConsumer1
{
public AckingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
public AckingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
: base(model, latch, fn)
{
}
Expand All @@ -1060,7 +1060,7 @@ public override void PostHandleDelivery(ulong deliveryTag)

public class NackingBasicConsumer : TestBasicConsumer1
{
public NackingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
public NackingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
: base(model, latch, fn)
{
}
Expand All @@ -1073,7 +1073,7 @@ public override void PostHandleDelivery(ulong deliveryTag)

public class RejectingBasicConsumer : TestBasicConsumer1
{
public RejectingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
public RejectingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
: base(model, latch, fn)
{
}
Expand All @@ -1087,10 +1087,10 @@ public override void PostHandleDelivery(ulong deliveryTag)
public class TestBasicConsumer1 : DefaultBasicConsumer
{
private readonly Action _action;
private readonly ManualResetEvent _latch;
private readonly ManualResetEventSlim _latch;
private ushort _counter = 0;

public TestBasicConsumer1(IModel model, ManualResetEvent latch, Action fn)
public TestBasicConsumer1(IModel model, ManualResetEventSlim latch, Action fn)
: base(model)
{
_latch = latch;
Expand Down
4 changes: 2 additions & 2 deletions projects/Unit/TestConnectionShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class TestConnectionShutdown : IntegrationFixture
[Test]
public void TestShutdownSignalPropagationToChannels()
{
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);

Model.ModelShutdown += (model, args) => {
latch.Set();
Expand All @@ -67,7 +67,7 @@ public void TestShutdownSignalPropagationToChannels()
public void TestConsumerDispatcherShutdown()
{
var m = (AutorecoveringModel)Model;
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);

Model.ModelShutdown += (model, args) =>
{
Expand Down
16 changes: 8 additions & 8 deletions projects/Unit/TestConsumerOperationDispatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public void TestChannelShutdownDoesNotShutDownDispatcher()
string q2 = ch2.QueueDeclare().QueueName;
ch2.QueueBind(queue: q2, exchange: _x, routingKey: "");

var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
ch1.BasicConsume(q1, true, new EventingBasicConsumer(ch1));
var c2 = new EventingBasicConsumer(ch2);
c2.Received += (object sender, BasicDeliverEventArgs e) =>
Expand All @@ -180,10 +180,10 @@ public void TestChannelShutdownDoesNotShutDownDispatcher()

private class ShutdownLatchConsumer : DefaultBasicConsumer
{
public ManualResetEvent Latch { get; private set; }
public ManualResetEvent DuplicateLatch { get; private set; }
public ManualResetEventSlim Latch { get; private set; }
public ManualResetEventSlim DuplicateLatch { get; private set; }

public ShutdownLatchConsumer(ManualResetEvent latch, ManualResetEvent duplicateLatch)
public ShutdownLatchConsumer(ManualResetEventSlim latch, ManualResetEventSlim duplicateLatch)
{
Latch = latch;
DuplicateLatch = duplicateLatch;
Expand All @@ -192,7 +192,7 @@ public ShutdownLatchConsumer(ManualResetEvent latch, ManualResetEvent duplicateL
public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
{
// keep track of duplicates
if (Latch.WaitOne(0)){
if (Latch.Wait(0)){
DuplicateLatch.Set();
} else {
Latch.Set();
Expand All @@ -203,15 +203,15 @@ public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
[Test]
public void TestModelShutdownHandler()
{
var latch = new ManualResetEvent(false);
var duplicateLatch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);
var duplicateLatch = new ManualResetEventSlim(false);
string q = Model.QueueDeclare().QueueName;
var c = new ShutdownLatchConsumer(latch, duplicateLatch);

Model.BasicConsume(queue: q, autoAck: true, consumer: c);
Model.Close();
Wait(latch, TimeSpan.FromSeconds(5));
Assert.IsFalse(duplicateLatch.WaitOne(TimeSpan.FromSeconds(5)),
Assert.IsFalse(duplicateLatch.Wait(TimeSpan.FromSeconds(5)),
"event handler fired more than once");
}
}
Expand Down
4 changes: 2 additions & 2 deletions projects/Unit/TestEventingConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public void TestEventingConsumerRegistrationEvents()
{
string q = Model.QueueDeclare();

var registeredLatch = new ManualResetEvent(false);
var registeredLatch = new ManualResetEventSlim(false);
object registeredSender = null;
var unregisteredLatch = new ManualResetEvent(false);
var unregisteredLatch = new ManualResetEventSlim(false);
object unregisteredSender = null;

EventingBasicConsumer ec = new EventingBasicConsumer(Model);
Expand Down
2 changes: 1 addition & 1 deletion projects/Unit/TestModelShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ internal class TestModelShutdown : IntegrationFixture
public void TestConsumerDispatcherShutdown()
{
var m = (AutorecoveringModel)Model;
var latch = new ManualResetEvent(false);
var latch = new ManualResetEventSlim(false);

Model.ModelShutdown += (model, args) =>
{
Expand Down