Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using a linked list for fast efficient tracking of [publisher confirm] delivery tags #836

Merged
merged 2 commits into from
May 19, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 81 additions & 82 deletions projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,12 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Util;

Expand All @@ -71,19 +70,16 @@ abstract class ModelBase : IFullModel, IRecoverable
private readonly object _shutdownLock = new object();
private readonly object _rpcLock = new object();
private readonly object _confirmLock = new object();

private ulong _maxDeliveryId;
private ulong _deliveredItems = 0;
private readonly LinkedList<ulong> _pendingDeliveryTags = new LinkedList<ulong>();
private readonly CountdownEvent _deliveryTagsCountdown = new CountdownEvent(0);

private EventHandler<ShutdownEventArgs> _modelShutdown;

private bool _onlyAcksReceived = true;
private ulong _nextPublishSeqNo;

public IConsumerDispatcher ConsumerDispatcher { get; private set; }

public ModelBase(ISession session)
: this(session, session.Connection.ConsumerWorkService)
public ModelBase(ISession session) : this(session, session.Connection.ConsumerWorkService)
{ }

public ModelBase(ISession session, ConsumerWorkService workService)
Expand All @@ -103,7 +99,7 @@ public ModelBase(ISession session, ConsumerWorkService workService)
protected void Initialise(ISession session)
{
CloseReason = null;
_nextPublishSeqNo = 0;
NextPublishSeqNo = 0;
Session = session;
Session.CommandReceived = HandleCommand;
Session.SessionShutdown += OnSessionShutdown;
Expand Down Expand Up @@ -180,7 +176,7 @@ public bool IsOpen
get { return CloseReason == null; }
}

public ulong NextPublishSeqNo { get => _nextPublishSeqNo; }
public ulong NextPublishSeqNo { get; private set; }

public ISession Session { get; private set; }

Expand Down Expand Up @@ -494,11 +490,8 @@ public virtual void OnModelShutdown(ShutdownEventArgs reason)
}
}
}
lock (_confirmLock)
{
Monitor.Pulse(_confirmLock);
}

_deliveryTagsCountdown.Reset(0);
_flowControlBlock.Set();
}

Expand Down Expand Up @@ -1084,14 +1077,25 @@ public abstract void BasicNack(ulong deliveryTag,
bool multiple,
bool requeue);

internal void AllocatatePublishSeqNos(int count)
internal void AllocatePublishSeqNos(int count)
{

lock (_confirmLock)
if (NextPublishSeqNo > 0)
{
if (_nextPublishSeqNo > 0)
lock (_confirmLock)
{
_nextPublishSeqNo = InterlockedEx.Add(ref _nextPublishSeqNo, (ulong)count);
if (_deliveryTagsCountdown.IsSet)
{
_deliveryTagsCountdown.Reset(count);
}
else
{
_deliveryTagsCountdown.AddCount(count);
}

for (int i = 0; i < count; i++)
{
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
}
}
}
}
Expand All @@ -1111,11 +1115,21 @@ public void BasicPublish(string exchange,
{
basicProperties = CreateBasicProperties();
}
if (_nextPublishSeqNo > 0)

if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
_nextPublishSeqNo = InterlockedEx.Increment(ref _nextPublishSeqNo);
if (_deliveryTagsCountdown.IsSet)
{
_deliveryTagsCountdown.Reset(1);
}
else
{
_deliveryTagsCountdown.AddCount();
}

_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
}
}

Expand Down Expand Up @@ -1176,7 +1190,7 @@ public void ConfirmSelect()
{
if (NextPublishSeqNo == 0UL)
{
_nextPublishSeqNo = 1;
NextPublishSeqNo = 1;
}

_Private_ConfirmSelect(false);
Expand Down Expand Up @@ -1338,34 +1352,32 @@ public bool WaitForConfirms(TimeSpan timeout, out bool timedOut)
}
bool isWaitInfinite = timeout.TotalMilliseconds == Timeout.Infinite;
Stopwatch stopwatch = Stopwatch.StartNew();
lock (_confirmLock)
while (true)
{
while (true)
if (!IsOpen)
{
if (!IsOpen)
{
throw new AlreadyClosedException(CloseReason);
}
throw new AlreadyClosedException(CloseReason);
}

if (_deliveredItems == _nextPublishSeqNo - 1)
{
bool aux = _onlyAcksReceived;
_onlyAcksReceived = true;
timedOut = false;
return aux;
}
if (isWaitInfinite)
{
Monitor.Wait(_confirmLock);
}
else
if (_deliveryTagsCountdown.IsSet)
{
bool aux = _onlyAcksReceived;
_onlyAcksReceived = true;
timedOut = false;
return aux;
}

if (isWaitInfinite)
{
_deliveryTagsCountdown.Wait();
}
else
{
TimeSpan elapsed = stopwatch.Elapsed;
if (elapsed > timeout || !_deliveryTagsCountdown.Wait(timeout - elapsed))
{
TimeSpan elapsed = stopwatch.Elapsed;
if (elapsed > timeout || !Monitor.Wait(_confirmLock, timeout - elapsed))
{
timedOut = true;
return _onlyAcksReceived;
}
timedOut = true;
return _onlyAcksReceived;
}
}
}
Expand Down Expand Up @@ -1411,33 +1423,41 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
internal void SendCommands(IList<Command> commands)
{
_flowControlBlock.Wait();
AllocatatePublishSeqNos(commands.Count);
AllocatePublishSeqNos(commands.Count);
Session.Transmit(commands);
}

protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNack)
{
lock (_confirmLock)
// No need to do this if publisher confirms have never been enabled.
if (NextPublishSeqNo > 0)
{
_deliveredItems = InterlockedEx.Increment(ref _deliveredItems);

if (multiple && _maxDeliveryId < deliveryTag)
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
lock (_confirmLock)
{
_maxDeliveryId = deliveryTag;
}
// No need to do anything if there are no delivery tags in the list
if (_pendingDeliveryTags.Count > 0)
{
if (multiple)
{
while (_pendingDeliveryTags.First.Value < deliveryTag)
{
_pendingDeliveryTags.RemoveFirst();
_deliveryTagsCountdown.Signal();
}
}

_deliveredItems = Math.Max(_maxDeliveryId, _deliveredItems);
_onlyAcksReceived = _onlyAcksReceived && !isNack;
if (_deliveredItems == _nextPublishSeqNo - 1)
{
Monitor.Pulse(_confirmLock);
if (_pendingDeliveryTags.Remove(deliveryTag))
{
_deliveryTagsCountdown.Signal();
}
}

_onlyAcksReceived = _onlyAcksReceived && !isNack;
}
}

}



private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bool exclusive,
bool autoDelete, IDictionary<string, object> arguments)
{
Expand Down Expand Up @@ -1479,26 +1499,5 @@ public class QueueDeclareRpcContinuation : SimpleBlockingRpcContinuation
{
public QueueDeclareOk m_result;
}

public static class InterlockedEx
{
public static ulong Increment(ref ulong location)
{
long incrementedSigned = Interlocked.Increment(ref Unsafe.As<ulong, long>(ref location));
return Unsafe.As<long, ulong>(ref incrementedSigned);
}

public static ulong Decrement(ref ulong location)
{
long decrementedSigned = Interlocked.Decrement(ref Unsafe.As<ulong, long>(ref location));
return Unsafe.As<long, ulong>(ref decrementedSigned);
}

public static ulong Add(ref ulong location, ulong value)
{
long addSigned = Interlocked.Add(ref Unsafe.As<ulong, long>(ref location), Unsafe.As<ulong, long>(ref value));
return Unsafe.As<long, ulong>(ref addSigned);
}
}
}
}