Skip to content

Commit ab614d9

Browse files
authored
Merge pull request #836 from stebet/linkedListForDeliveryTags
Using a linked list for fast efficient tracking of [publisher confirm] delivery tags
2 parents 98299f8 + d16050d commit ab614d9

File tree

1 file changed

+81
-82
lines changed

1 file changed

+81
-82
lines changed

projects/RabbitMQ.Client/client/impl/ModelBase.cs

+81-82
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,12 @@
4242
using System.Collections.Generic;
4343
using System.Diagnostics;
4444
using System.IO;
45-
using System.Runtime.CompilerServices;
4645
using System.Text;
4746
using System.Threading;
4847
using System.Threading.Tasks;
48+
4949
using RabbitMQ.Client.Events;
5050
using RabbitMQ.Client.Exceptions;
51-
using RabbitMQ.Client.Framing;
5251
using RabbitMQ.Client.Framing.Impl;
5352
using RabbitMQ.Util;
5453

@@ -71,19 +70,16 @@ abstract class ModelBase : IFullModel, IRecoverable
7170
private readonly object _shutdownLock = new object();
7271
private readonly object _rpcLock = new object();
7372
private readonly object _confirmLock = new object();
74-
75-
private ulong _maxDeliveryId;
76-
private ulong _deliveredItems = 0;
73+
private readonly LinkedList<ulong> _pendingDeliveryTags = new LinkedList<ulong>();
74+
private readonly CountdownEvent _deliveryTagsCountdown = new CountdownEvent(0);
7775

7876
private EventHandler<ShutdownEventArgs> _modelShutdown;
7977

8078
private bool _onlyAcksReceived = true;
81-
private ulong _nextPublishSeqNo;
8279

8380
public IConsumerDispatcher ConsumerDispatcher { get; private set; }
8481

85-
public ModelBase(ISession session)
86-
: this(session, session.Connection.ConsumerWorkService)
82+
public ModelBase(ISession session) : this(session, session.Connection.ConsumerWorkService)
8783
{ }
8884

8985
public ModelBase(ISession session, ConsumerWorkService workService)
@@ -103,7 +99,7 @@ public ModelBase(ISession session, ConsumerWorkService workService)
10399
protected void Initialise(ISession session)
104100
{
105101
CloseReason = null;
106-
_nextPublishSeqNo = 0;
102+
NextPublishSeqNo = 0;
107103
Session = session;
108104
Session.CommandReceived = HandleCommand;
109105
Session.SessionShutdown += OnSessionShutdown;
@@ -180,7 +176,7 @@ public bool IsOpen
180176
get { return CloseReason == null; }
181177
}
182178

183-
public ulong NextPublishSeqNo { get => _nextPublishSeqNo; }
179+
public ulong NextPublishSeqNo { get; private set; }
184180

185181
public ISession Session { get; private set; }
186182

@@ -494,11 +490,8 @@ public virtual void OnModelShutdown(ShutdownEventArgs reason)
494490
}
495491
}
496492
}
497-
lock (_confirmLock)
498-
{
499-
Monitor.Pulse(_confirmLock);
500-
}
501493

494+
_deliveryTagsCountdown.Reset(0);
502495
_flowControlBlock.Set();
503496
}
504497

@@ -1084,14 +1077,25 @@ public abstract void BasicNack(ulong deliveryTag,
10841077
bool multiple,
10851078
bool requeue);
10861079

1087-
internal void AllocatatePublishSeqNos(int count)
1080+
internal void AllocatePublishSeqNos(int count)
10881081
{
1089-
1090-
lock (_confirmLock)
1082+
if (NextPublishSeqNo > 0)
10911083
{
1092-
if (_nextPublishSeqNo > 0)
1084+
lock (_confirmLock)
10931085
{
1094-
_nextPublishSeqNo = InterlockedEx.Add(ref _nextPublishSeqNo, (ulong)count);
1086+
if (_deliveryTagsCountdown.IsSet)
1087+
{
1088+
_deliveryTagsCountdown.Reset(count);
1089+
}
1090+
else
1091+
{
1092+
_deliveryTagsCountdown.AddCount(count);
1093+
}
1094+
1095+
for (int i = 0; i < count; i++)
1096+
{
1097+
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
1098+
}
10951099
}
10961100
}
10971101
}
@@ -1111,11 +1115,21 @@ public void BasicPublish(string exchange,
11111115
{
11121116
basicProperties = CreateBasicProperties();
11131117
}
1114-
if (_nextPublishSeqNo > 0)
1118+
1119+
if (NextPublishSeqNo > 0)
11151120
{
11161121
lock (_confirmLock)
11171122
{
1118-
_nextPublishSeqNo = InterlockedEx.Increment(ref _nextPublishSeqNo);
1123+
if (_deliveryTagsCountdown.IsSet)
1124+
{
1125+
_deliveryTagsCountdown.Reset(1);
1126+
}
1127+
else
1128+
{
1129+
_deliveryTagsCountdown.AddCount();
1130+
}
1131+
1132+
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
11191133
}
11201134
}
11211135

@@ -1176,7 +1190,7 @@ public void ConfirmSelect()
11761190
{
11771191
if (NextPublishSeqNo == 0UL)
11781192
{
1179-
_nextPublishSeqNo = 1;
1193+
NextPublishSeqNo = 1;
11801194
}
11811195

11821196
_Private_ConfirmSelect(false);
@@ -1338,34 +1352,32 @@ public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
13381352
}
13391353
bool isWaitInfinite = timeout.TotalMilliseconds == Timeout.Infinite;
13401354
Stopwatch stopwatch = Stopwatch.StartNew();
1341-
lock (_confirmLock)
1355+
while (true)
13421356
{
1343-
while (true)
1357+
if (!IsOpen)
13441358
{
1345-
if (!IsOpen)
1346-
{
1347-
throw new AlreadyClosedException(CloseReason);
1348-
}
1359+
throw new AlreadyClosedException(CloseReason);
1360+
}
13491361

1350-
if (_deliveredItems == _nextPublishSeqNo - 1)
1351-
{
1352-
bool aux = _onlyAcksReceived;
1353-
_onlyAcksReceived = true;
1354-
timedOut = false;
1355-
return aux;
1356-
}
1357-
if (isWaitInfinite)
1358-
{
1359-
Monitor.Wait(_confirmLock);
1360-
}
1361-
else
1362+
if (_deliveryTagsCountdown.IsSet)
1363+
{
1364+
bool aux = _onlyAcksReceived;
1365+
_onlyAcksReceived = true;
1366+
timedOut = false;
1367+
return aux;
1368+
}
1369+
1370+
if (isWaitInfinite)
1371+
{
1372+
_deliveryTagsCountdown.Wait();
1373+
}
1374+
else
1375+
{
1376+
TimeSpan elapsed = stopwatch.Elapsed;
1377+
if (elapsed > timeout || !_deliveryTagsCountdown.Wait(timeout - elapsed))
13621378
{
1363-
TimeSpan elapsed = stopwatch.Elapsed;
1364-
if (elapsed > timeout || !Monitor.Wait(_confirmLock, timeout - elapsed))
1365-
{
1366-
timedOut = true;
1367-
return _onlyAcksReceived;
1368-
}
1379+
timedOut = true;
1380+
return _onlyAcksReceived;
13691381
}
13701382
}
13711383
}
@@ -1411,33 +1423,41 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
14111423
internal void SendCommands(IList<Command> commands)
14121424
{
14131425
_flowControlBlock.Wait();
1414-
AllocatatePublishSeqNos(commands.Count);
1426+
AllocatePublishSeqNos(commands.Count);
14151427
Session.Transmit(commands);
14161428
}
14171429

14181430
protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack)
14191431
{
1420-
lock (_confirmLock)
1432+
// No need to do this if publisher confirms have never been enabled.
1433+
if (NextPublishSeqNo > 0)
14211434
{
1422-
_deliveredItems = InterlockedEx.Increment(ref _deliveredItems);
1423-
1424-
if (multiple && _maxDeliveryId < deliveryTag)
1435+
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
1436+
lock (_confirmLock)
14251437
{
1426-
_maxDeliveryId = deliveryTag;
1427-
}
1438+
// No need to do anything if there are no delivery tags in the list
1439+
if (_pendingDeliveryTags.Count > 0)
1440+
{
1441+
if (multiple)
1442+
{
1443+
while (_pendingDeliveryTags.First.Value < deliveryTag)
1444+
{
1445+
_pendingDeliveryTags.RemoveFirst();
1446+
_deliveryTagsCountdown.Signal();
1447+
}
1448+
}
14281449

1429-
_deliveredItems = Math.Max(_maxDeliveryId, _deliveredItems);
1430-
_onlyAcksReceived = _onlyAcksReceived && !isNack;
1431-
if (_deliveredItems == _nextPublishSeqNo - 1)
1432-
{
1433-
Monitor.Pulse(_confirmLock);
1450+
if (_pendingDeliveryTags.Remove(deliveryTag))
1451+
{
1452+
_deliveryTagsCountdown.Signal();
1453+
}
1454+
}
1455+
1456+
_onlyAcksReceived = _onlyAcksReceived && !isNack;
14341457
}
14351458
}
1436-
14371459
}
14381460

1439-
1440-
14411461
private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive,
14421462
bool autoDelete, IDictionary<string, object> arguments)
14431463
{
@@ -1479,26 +1499,5 @@ public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation
14791499
{
14801500
public QueueDeclareOk m_result;
14811501
}
1482-
1483-
public static class InterlockedEx
1484-
{
1485-
public static ulong Increment(ref ulong location)
1486-
{
1487-
long incrementedSigned = Interlocked.Increment(ref Unsafe.As<ulong, long>(ref location));
1488-
return Unsafe.As<long, ulong>(ref incrementedSigned);
1489-
}
1490-
1491-
public static ulong Decrement(ref ulong location)
1492-
{
1493-
long decrementedSigned = Interlocked.Decrement(ref Unsafe.As<ulong, long>(ref location));
1494-
return Unsafe.As<long, ulong>(ref decrementedSigned);
1495-
}
1496-
1497-
public static ulong Add(ref ulong location, ulong value)
1498-
{
1499-
long addSigned = Interlocked.Add(ref Unsafe.As<ulong, long>(ref location), Unsafe.As<ulong, long>(ref value));
1500-
return Unsafe.As<long, ulong>(ref addSigned);
1501-
}
1502-
}
15031502
}
15041503
}

0 commit comments

Comments
 (0)