Skip to content

Commit 45a9d78

Browse files
authored
Merge branch 'master' into linkedListForDeliveryTags
2 parents 99b4732 + e95f22a commit 45a9d78

9 files changed

+52
-53
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ our stellar community members (in no particular order): @stebet, @bording,
1313

1414
A full list of changes can be found in the GitHub milestone: [`6.0.0`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/41?closed=1).
1515

16-
### The Switch to System.Memory (and Significantly Lower Memory Footprint that Comes with It)
16+
### The Switch to System.Memory (and [Significantly Lower Memory Footprint](https://stebet.net/real-world-example-of-reducing-allocations-using-span-t-and-memory-t/) that Comes with It)
1717

1818
The client now uses the [`System.Memory` library](https://www.nuget.org/packages/System.Memory/) for message and command payloads. This significantly
1919
reduces object allocation and GC pressure for heavy workloads but also

projects/RabbitMQ.Client/client/impl/Connection.cs

+2-3
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ internal sealed class Connection : IConnection
6464
///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
6565
private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame();
6666

67-
private readonly ManualResetEvent _appContinuation = new ManualResetEvent(false);
67+
private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false);
6868

6969
private volatile ShutdownEventArgs _closeReason = null;
7070
private volatile bool _closed = false;
@@ -346,8 +346,7 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
346346
}
347347
}
348348

349-
bool receivedSignal = _appContinuation.WaitOne(timeout);
350-
349+
bool receivedSignal = _appContinuation.Wait(timeout);
351350
if (!receivedSignal)
352351
{
353352
_frameHandler.Close();

projects/RabbitMQ.Client/client/impl/ModelBase.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ abstract class ModelBase : IFullModel, IRecoverable
6565
private TimeSpan _continuationTimeout = TimeSpan.FromSeconds(20);
6666

6767
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
68-
private readonly ManualResetEvent _flowControlBlock = new ManualResetEvent(true);
68+
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);
6969

7070
private readonly object _shutdownLock = new object();
7171
private readonly object _rpcLock = new object();
@@ -349,7 +349,7 @@ public void ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemor
349349
{
350350
if (method.HasContent)
351351
{
352-
_flowControlBlock.WaitOne();
352+
_flowControlBlock.Wait();
353353
Session.Transmit(new Command(method, header, body));
354354
}
355355
else
@@ -1422,7 +1422,7 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
14221422

14231423
internal void SendCommands(IList<Command> commands)
14241424
{
1425-
_flowControlBlock.WaitOne();
1425+
_flowControlBlock.Wait();
14261426
AllocatePublishSeqNos(commands.Count);
14271427
Session.Transmit(commands);
14281428
}

projects/Unit/Fixtures.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -698,14 +698,14 @@ internal void StartRabbitMQ()
698698
// Concurrency and Coordination
699699
//
700700

701-
internal void Wait(ManualResetEvent latch)
701+
internal void Wait(ManualResetEventSlim latch)
702702
{
703-
Assert.IsTrue(latch.WaitOne(TimeSpan.FromSeconds(10)), "waiting on a latch timed out");
703+
Assert.IsTrue(latch.Wait(TimeSpan.FromSeconds(10)), "waiting on a latch timed out");
704704
}
705705

706-
internal void Wait(ManualResetEvent latch, TimeSpan timeSpan)
706+
internal void Wait(ManualResetEventSlim latch, TimeSpan timeSpan)
707707
{
708-
Assert.IsTrue(latch.WaitOne(timeSpan), "waiting on a latch timed out");
708+
Assert.IsTrue(latch.Wait(timeSpan), "waiting on a latch timed out");
709709
}
710710

711711
//

projects/Unit/TestConnectionRecovery.cs

+29-29
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void CleanUp()
8686
[Test]
8787
public void TestBasicAckAfterChannelRecovery()
8888
{
89-
var latch = new ManualResetEvent(false);
89+
var latch = new ManualResetEventSlim(false);
9090
var cons = new AckingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
9191

9292
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
@@ -115,7 +115,7 @@ public void TestBasicAckAfterBasicGetAndChannelRecovery()
115115
public void TestBasicAckEventHandlerRecovery()
116116
{
117117
Model.ConfirmSelect();
118-
var latch = new ManualResetEvent(false);
118+
var latch = new ManualResetEventSlim(false);
119119
((AutorecoveringModel)Model).BasicAcks += (m, args) => latch.Set();
120120
((AutorecoveringModel)Model).BasicNacks += (m, args) => latch.Set();
121121

@@ -235,7 +235,7 @@ public void TestBasicModelRecoveryOnServerRestart()
235235
[Test]
236236
public void TestBasicNackAfterChannelRecovery()
237237
{
238-
var latch = new ManualResetEvent(false);
238+
var latch = new ManualResetEventSlim(false);
239239
var cons = new NackingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
240240

241241
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
@@ -244,7 +244,7 @@ public void TestBasicNackAfterChannelRecovery()
244244
[Test]
245245
public void TestBasicRejectAfterChannelRecovery()
246246
{
247-
var latch = new ManualResetEvent(false);
247+
var latch = new ManualResetEventSlim(false);
248248
var cons = new RejectingBasicConsumer(Model, latch, CloseAndWaitForRecovery);
249249

250250
TestDelayedBasicAckNackAfterChannelRecovery(cons, latch);
@@ -253,7 +253,7 @@ public void TestBasicRejectAfterChannelRecovery()
253253
[Test]
254254
public void TestBlockedListenersRecovery()
255255
{
256-
var latch = new ManualResetEvent(false);
256+
var latch = new ManualResetEventSlim(false);
257257
Conn.ConnectionBlocked += (c, reason) => latch.Set();
258258
CloseAndWaitForRecovery();
259259
CloseAndWaitForRecovery();
@@ -314,7 +314,7 @@ public void TestConsumerWorkServiceRecovery()
314314
CloseAndWaitForRecovery(c);
315315

316316
Assert.IsTrue(m.IsOpen);
317-
var latch = new ManualResetEvent(false);
317+
var latch = new ManualResetEventSlim(false);
318318
cons.Received += (s, args) => latch.Set();
319319

320320
m.BasicPublish("", q, null, encoding.GetBytes("msg"));
@@ -354,7 +354,7 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
354354
AssertConsumerCount(m, q1, 1);
355355
Assert.False(queueNameChangeAfterRecoveryCalled);
356356

357-
var latch = new ManualResetEvent(false);
357+
var latch = new ManualResetEventSlim(false);
358358
cons.Received += (s, args) => latch.Set();
359359

360360
m.BasicPublish("", q1, null, encoding.GetBytes("msg"));
@@ -376,7 +376,7 @@ public void TestConsumerRecoveryWithManyConsumers()
376376
Model.BasicConsume(q, true, cons);
377377
}
378378

379-
var latch = new ManualResetEvent(false);
379+
var latch = new ManualResetEventSlim(false);
380380
((AutorecoveringConnection)Conn).ConsumerTagChangeAfterRecovery += (prev, current) => latch.Set();
381381

382382
CloseAndWaitForRecovery();
@@ -593,7 +593,7 @@ public void TestServerNamedTransientAutoDeleteQueueAndBindingRecovery()
593593
string q = ch.QueueDeclare(queue: "", durable: false, exclusive: false, autoDelete: true, arguments: null).QueueName;
594594
string nameBefore = q;
595595
string nameAfter = null;
596-
var latch = new ManualResetEvent(false);
596+
var latch = new ManualResetEventSlim(false);
597597
((AutorecoveringConnection)Conn).QueueNameChangeAfterRecovery += (source, ea) =>
598598
{
599599
nameBefore = ea.NameBefore;
@@ -696,7 +696,7 @@ public void TestServerNamedQueueRecovery()
696696
string nameBefore = q;
697697
string nameAfter = null;
698698

699-
var latch = new ManualResetEvent(false);
699+
var latch = new ManualResetEventSlim(false);
700700
var connection = (AutorecoveringConnection)Conn;
701701
connection.RecoverySucceeded += (source, ea) => latch.Set();
702702
connection.QueueNameChangeAfterRecovery += (source, ea) => { nameAfter = ea.NameAfter; };
@@ -733,8 +733,8 @@ public void TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServerResta
733733
{
734734
int counter = 0;
735735
Conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter);
736-
ManualResetEvent shutdownLatch = PrepareForShutdown(Conn);
737-
ManualResetEvent recoveryLatch = PrepareForRecovery((AutorecoveringConnection)Conn);
736+
ManualResetEventSlim shutdownLatch = PrepareForShutdown(Conn);
737+
ManualResetEventSlim recoveryLatch = PrepareForRecovery((AutorecoveringConnection)Conn);
738738

739739
Assert.IsTrue(Conn.IsOpen);
740740
StopRabbitMQ();
@@ -884,7 +884,7 @@ public void TestThatDeletedQueuesDontReappearOnRecovery()
884884
[Test]
885885
public void TestUnblockedListenersRecovery()
886886
{
887-
var latch = new ManualResetEvent(false);
887+
var latch = new ManualResetEventSlim(false);
888888
Conn.ConnectionUnblocked += (source, ea) => latch.Set();
889889
CloseAndWaitForRecovery();
890890
CloseAndWaitForRecovery();
@@ -943,7 +943,7 @@ internal void CloseAllAndWaitForRecovery()
943943

944944
internal void CloseAllAndWaitForRecovery(AutorecoveringConnection conn)
945945
{
946-
ManualResetEvent rl = PrepareForRecovery(conn);
946+
ManualResetEventSlim rl = PrepareForRecovery(conn);
947947
CloseAllConnections();
948948
Wait(rl);
949949
}
@@ -955,31 +955,31 @@ internal void CloseAndWaitForRecovery()
955955

956956
internal void CloseAndWaitForRecovery(AutorecoveringConnection conn)
957957
{
958-
ManualResetEvent sl = PrepareForShutdown(conn);
959-
ManualResetEvent rl = PrepareForRecovery(conn);
958+
ManualResetEventSlim sl = PrepareForShutdown(conn);
959+
ManualResetEventSlim rl = PrepareForRecovery(conn);
960960
CloseConnection(conn);
961961
Wait(sl);
962962
Wait(rl);
963963
}
964964

965965
internal void CloseAndWaitForShutdown(AutorecoveringConnection conn)
966966
{
967-
ManualResetEvent sl = PrepareForShutdown(conn);
967+
ManualResetEventSlim sl = PrepareForShutdown(conn);
968968
CloseConnection(conn);
969969
Wait(sl);
970970
}
971971

972-
internal ManualResetEvent PrepareForRecovery(AutorecoveringConnection conn)
972+
internal ManualResetEventSlim PrepareForRecovery(AutorecoveringConnection conn)
973973
{
974-
var latch = new ManualResetEvent(false);
974+
var latch = new ManualResetEventSlim(false);
975975
conn.RecoverySucceeded += (source, ea) => latch.Set();
976976

977977
return latch;
978978
}
979979

980-
internal ManualResetEvent PrepareForShutdown(IConnection conn)
980+
internal ManualResetEventSlim PrepareForShutdown(IConnection conn)
981981
{
982-
var latch = new ManualResetEvent(false);
982+
var latch = new ManualResetEventSlim(false);
983983
conn.ConnectionShutdown += (c, args) => latch.Set();
984984

985985
return latch;
@@ -997,14 +997,14 @@ internal void RestartServerAndWaitForRecovery()
997997

998998
internal void RestartServerAndWaitForRecovery(AutorecoveringConnection conn)
999999
{
1000-
ManualResetEvent sl = PrepareForShutdown(conn);
1001-
ManualResetEvent rl = PrepareForRecovery(conn);
1000+
ManualResetEventSlim sl = PrepareForShutdown(conn);
1001+
ManualResetEventSlim rl = PrepareForRecovery(conn);
10021002
RestartRabbitMQ();
10031003
Wait(sl);
10041004
Wait(rl);
10051005
}
10061006

1007-
internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEvent latch)
1007+
internal void TestDelayedBasicAckNackAfterChannelRecovery(TestBasicConsumer1 cons, ManualResetEventSlim latch)
10081008
{
10091009
string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
10101010
int n = 30;
@@ -1047,7 +1047,7 @@ internal void WaitForShutdown(IConnection conn)
10471047

10481048
public class AckingBasicConsumer : TestBasicConsumer1
10491049
{
1050-
public AckingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
1050+
public AckingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
10511051
: base(model, latch, fn)
10521052
{
10531053
}
@@ -1060,7 +1060,7 @@ public override void PostHandleDelivery(ulong deliveryTag)
10601060

10611061
public class NackingBasicConsumer : TestBasicConsumer1
10621062
{
1063-
public NackingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
1063+
public NackingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
10641064
: base(model, latch, fn)
10651065
{
10661066
}
@@ -1073,7 +1073,7 @@ public override void PostHandleDelivery(ulong deliveryTag)
10731073

10741074
public class RejectingBasicConsumer : TestBasicConsumer1
10751075
{
1076-
public RejectingBasicConsumer(IModel model, ManualResetEvent latch, Action fn)
1076+
public RejectingBasicConsumer(IModel model, ManualResetEventSlim latch, Action fn)
10771077
: base(model, latch, fn)
10781078
{
10791079
}
@@ -1087,10 +1087,10 @@ public override void PostHandleDelivery(ulong deliveryTag)
10871087
public class TestBasicConsumer1 : DefaultBasicConsumer
10881088
{
10891089
private readonly Action _action;
1090-
private readonly ManualResetEvent _latch;
1090+
private readonly ManualResetEventSlim _latch;
10911091
private ushort _counter = 0;
10921092

1093-
public TestBasicConsumer1(IModel model, ManualResetEvent latch, Action fn)
1093+
public TestBasicConsumer1(IModel model, ManualResetEventSlim latch, Action fn)
10941094
: base(model)
10951095
{
10961096
_latch = latch;

projects/Unit/TestConnectionShutdown.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class TestConnectionShutdown : IntegrationFixture
5353
[Test]
5454
public void TestShutdownSignalPropagationToChannels()
5555
{
56-
var latch = new ManualResetEvent(false);
56+
var latch = new ManualResetEventSlim(false);
5757

5858
Model.ModelShutdown += (model, args) => {
5959
latch.Set();
@@ -67,7 +67,7 @@ public void TestShutdownSignalPropagationToChannels()
6767
public void TestConsumerDispatcherShutdown()
6868
{
6969
var m = (AutorecoveringModel)Model;
70-
var latch = new ManualResetEvent(false);
70+
var latch = new ManualResetEventSlim(false);
7171

7272
Model.ModelShutdown += (model, args) =>
7373
{

projects/Unit/TestConsumerOperationDispatch.cs

+8-8
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public void TestChannelShutdownDoesNotShutDownDispatcher()
163163
string q2 = ch2.QueueDeclare().QueueName;
164164
ch2.QueueBind(queue: q2, exchange: _x, routingKey: "");
165165

166-
var latch = new ManualResetEvent(false);
166+
var latch = new ManualResetEventSlim(false);
167167
ch1.BasicConsume(q1, true, new EventingBasicConsumer(ch1));
168168
var c2 = new EventingBasicConsumer(ch2);
169169
c2.Received += (object sender, BasicDeliverEventArgs e) =>
@@ -180,10 +180,10 @@ public void TestChannelShutdownDoesNotShutDownDispatcher()
180180

181181
private class ShutdownLatchConsumer : DefaultBasicConsumer
182182
{
183-
public ManualResetEvent Latch { get; private set; }
184-
public ManualResetEvent DuplicateLatch { get; private set; }
183+
public ManualResetEventSlim Latch { get; private set; }
184+
public ManualResetEventSlim DuplicateLatch { get; private set; }
185185

186-
public ShutdownLatchConsumer(ManualResetEvent latch, ManualResetEvent duplicateLatch)
186+
public ShutdownLatchConsumer(ManualResetEventSlim latch, ManualResetEventSlim duplicateLatch)
187187
{
188188
Latch = latch;
189189
DuplicateLatch = duplicateLatch;
@@ -192,7 +192,7 @@ public ShutdownLatchConsumer(ManualResetEvent latch, ManualResetEvent duplicateL
192192
public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
193193
{
194194
// keep track of duplicates
195-
if (Latch.WaitOne(0)){
195+
if (Latch.Wait(0)){
196196
DuplicateLatch.Set();
197197
} else {
198198
Latch.Set();
@@ -203,15 +203,15 @@ public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
203203
[Test]
204204
public void TestModelShutdownHandler()
205205
{
206-
var latch = new ManualResetEvent(false);
207-
var duplicateLatch = new ManualResetEvent(false);
206+
var latch = new ManualResetEventSlim(false);
207+
var duplicateLatch = new ManualResetEventSlim(false);
208208
string q = Model.QueueDeclare().QueueName;
209209
var c = new ShutdownLatchConsumer(latch, duplicateLatch);
210210

211211
Model.BasicConsume(queue: q, autoAck: true, consumer: c);
212212
Model.Close();
213213
Wait(latch, TimeSpan.FromSeconds(5));
214-
Assert.IsFalse(duplicateLatch.WaitOne(TimeSpan.FromSeconds(5)),
214+
Assert.IsFalse(duplicateLatch.Wait(TimeSpan.FromSeconds(5)),
215215
"event handler fired more than once");
216216
}
217217
}

projects/Unit/TestEventingConsumer.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ public void TestEventingConsumerRegistrationEvents()
5454
{
5555
string q = Model.QueueDeclare();
5656

57-
var registeredLatch = new ManualResetEvent(false);
57+
var registeredLatch = new ManualResetEventSlim(false);
5858
object registeredSender = null;
59-
var unregisteredLatch = new ManualResetEvent(false);
59+
var unregisteredLatch = new ManualResetEventSlim(false);
6060
object unregisteredSender = null;
6161

6262
EventingBasicConsumer ec = new EventingBasicConsumer(Model);

projects/Unit/TestModelShutdown.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ internal class TestModelShutdown : IntegrationFixture
5454
public void TestConsumerDispatcherShutdown()
5555
{
5656
var m = (AutorecoveringModel)Model;
57-
var latch = new ManualResetEvent(false);
57+
var latch = new ManualResetEventSlim(false);
5858

5959
Model.ModelShutdown += (model, args) =>
6060
{

0 commit comments

Comments
 (0)