Skip to content

Commit

Permalink
Spike an exception based approach (misses removing the bool value ret…
Browse files Browse the repository at this point in the history
…urn type)
  • Loading branch information
danielmarbach committed Oct 4, 2024
1 parent a11e32d commit 35314d9
Showing 1 changed file with 54 additions and 88 deletions.
142 changes: 54 additions & 88 deletions projects/RabbitMQ.Client/Impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
Expand All @@ -43,6 +44,7 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
using RabbitMQ.Client.Util;

namespace RabbitMQ.Client.Impl
{
Expand All @@ -62,10 +64,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
private bool _publisherConfirmationTrackingEnabled = false;
private ulong _nextPublishSeqNo = 0;
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
private readonly LinkedList<ulong> _pendingDeliveryTags = new();
private readonly Dictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();

private bool _onlyAcksReceived = true;
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();

private ShutdownEventArgs? _closeReason;
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
Expand Down Expand Up @@ -505,7 +504,7 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
.ConfigureAwait(false);
try
{
if (_confirmsTaskCompletionSources?.Count > 0)
if (!_confirmsTaskCompletionSources.IsEmpty)
{
var exception = new AlreadyClosedException(reason);
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values)
Expand Down Expand Up @@ -615,7 +614,7 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args)
.ConfigureAwait(false);
}

await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken)
await HandleAck(ack._deliveryTag, ack._multiple, cancellationToken)
.ConfigureAwait(false);

return true;
Expand All @@ -633,7 +632,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args)
.ConfigureAwait(false);
}

await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken)
await HandleNack(nack._deliveryTag, nack._multiple, cancellationToken)
.ConfigureAwait(false);

return true;
Expand All @@ -657,20 +656,14 @@ await _basicReturnAsyncWrapper.InvokeAsync(this, e)
{
ulong publishSequenceNumber = 0;
IReadOnlyBasicProperties props = e.BasicProperties;
if (props.Headers is not null)
object? maybeSeqNum = props.Headers?[Constants.PublishSequenceNumberHeader];
if (maybeSeqNum != null)
{
object? maybeSeqNum = props.Headers[Constants.PublishSequenceNumberHeader];
if (maybeSeqNum is not null)
{
publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
}
publishSequenceNumber = BinaryPrimitives.ReadUInt64BigEndian((byte[])maybeSeqNum);
}

if (publishSequenceNumber != 0 && _publisherConfirmationTrackingEnabled)
{
await HandleAckNack(publishSequenceNumber, false, true, cancellationToken)
.ConfigureAwait(false);
}
await HandleNack(publishSequenceNumber, false, cancellationToken)
.ConfigureAwait(false);
}

return true;
Expand Down Expand Up @@ -1022,7 +1015,6 @@ await _confirmSemaphore.WaitAsync(cancellationToken)

if (_publisherConfirmationTrackingEnabled)
{
_pendingDeliveryTags.AddLast(publishSequenceNumber);
publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
}
Expand Down Expand Up @@ -1068,9 +1060,9 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
try
{
_nextPublishSeqNo--;
if (_publisherConfirmationTrackingEnabled && _pendingDeliveryTags is not null)
if (_publisherConfirmationTrackingEnabled)
{
_pendingDeliveryTags.RemoveLast();
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
}
}
finally
Expand Down Expand Up @@ -1119,7 +1111,6 @@ await _confirmSemaphore.WaitAsync(cancellationToken)

if (_publisherConfirmationTrackingEnabled)
{
_pendingDeliveryTags.AddLast(publishSequenceNumber);
publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
}
Expand Down Expand Up @@ -1164,9 +1155,9 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
try
{
_nextPublishSeqNo--;
if (_publisherConfirmationTrackingEnabled && _pendingDeliveryTags is not null)
if (_publisherConfirmationTrackingEnabled)
{
_pendingDeliveryTags.RemoveLast();
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
}
}
finally
Expand All @@ -1189,6 +1180,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
{
await publisherConfirmationTcs.Task.WaitAsync(cancellationToken)
.ConfigureAwait(false);

return await publisherConfirmationTcs.Task
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -1770,7 +1762,6 @@ private async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnablefd
{
if (_publisherConfirmationTrackingEnabled)
{
_pendingDeliveryTags.Clear();
_confirmsTaskCompletionSources.Clear();
}
_nextPublishSeqNo = 1;
Expand All @@ -1796,84 +1787,57 @@ await ModelSendAsync(in method, k.CancellationToken)
}
}

// TODO NOTE: this method used to be internal for its use in this test:
// TestWaitForConfirmsWithTimeoutAsync_MessageNacked_WaitingHasTimedout_ReturnFalse
private async Task HandleAckNack(ulong deliveryTag, bool multiple, bool isNack, CancellationToken cancellationToken = default)
private Task HandleAck(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default)
{
bool isAck = false == isNack;

// Only do this if confirms are enabled *and* the library is tracking confirmations
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled)
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
{
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
await _confirmSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
if (multiple)
{
// No need to do anything if there are no delivery tags in the list
if (_pendingDeliveryTags.Count > 0)
foreach (var pair in _confirmsTaskCompletionSources)
{
if (multiple)
{
do
{
if (_pendingDeliveryTags.First is null)
{
break;
}
else
{
ulong pendingDeliveryTag = _pendingDeliveryTags.First.Value;
if (pendingDeliveryTag > deliveryTag)
{
break;
}
else
{
TaskCompletionSource<bool> tcs = _confirmsTaskCompletionSources[pendingDeliveryTag];
tcs.SetResult(isAck);
_confirmsTaskCompletionSources.Remove(pendingDeliveryTag);
_pendingDeliveryTags.RemoveFirst();
}
}
}
while (true);
}
else
if (pair.Key <= deliveryTag)
{
/*
* Note:
* In the case of `basic.return`, the TCS will have been handled and removed by HandleBasicReturn()
* RabbitMQ still sends `basic.ack`, so the TCS will not be in the dict, hence, TryGetValue here
*/
if (_confirmsTaskCompletionSources.TryGetValue(deliveryTag, out TaskCompletionSource<bool>? tcs))
{
tcs.SetResult(isAck);
_confirmsTaskCompletionSources.Remove(deliveryTag);
_pendingDeliveryTags.Remove(deliveryTag);
}
pair.Value.SetResult(true);
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
}
}
}
else
{
if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource<bool>? tcs))
{
tcs.SetResult(true);
}
}
}
return Task.CompletedTask;
}

_onlyAcksReceived = _onlyAcksReceived && isAck;

if (_pendingDeliveryTags.Count == 0 && _confirmsTaskCompletionSources.Count > 0)
private Task HandleNack(ulong deliveryTag, bool multiple, CancellationToken cancellationToken = default)
{
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
{
if (multiple)
{
foreach (var pair in _confirmsTaskCompletionSources)
{
// Done, mark tasks
foreach (TaskCompletionSource<bool> tcs in _confirmsTaskCompletionSources.Values)
if (pair.Key <= deliveryTag)
{
tcs.TrySetResult(_onlyAcksReceived);
pair.Value.SetException(new Exception("TBD"));
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
}

_confirmsTaskCompletionSources.Clear();
_onlyAcksReceived = true;
}
}
finally
else
{
_confirmSemaphore.Release();
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
{
tcs.SetException(new Exception("TBD"));
}
}
}

return Task.CompletedTask;
}

private BasicProperties? PopulateBasicPropertiesHeaders<TProperties>(TProperties basicProperties,
Expand Down Expand Up @@ -1948,7 +1912,9 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
publishSequenceNumberBytes = BitConverter.GetBytes(publishSequenceNumber);
}

headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
var ulongByte = new byte[8];
NetworkOrderSerializer.WriteUInt64(ref ulongByte.AsSpan().GetStart(), publishSequenceNumber);
headers[Constants.PublishSequenceNumberHeader] = ulongByte;
}
}
}
Expand Down

0 comments on commit 35314d9

Please sign in to comment.