diff --git a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj index 0d9f2ddcf56832..8e537cba3577c5 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj +++ b/src/libraries/System.Threading.RateLimiting/src/System.Threading.RateLimiting.csproj @@ -1,7 +1,9 @@ - + $(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum) true + true + 1 true APIs to help manage rate limiting. diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs index 508340a2e138be..38982e7332ef2f 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/ConcurrencyLimiter.cs @@ -132,6 +132,8 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = default(RequestRegistration.Disposer); + // Perf: Check SemaphoreSlim implementation instead of locking lock (Lock) { @@ -152,15 +154,25 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + if (!oldestRequest.TrySetResult(FailedLease)) { - // Updating queue count is handled by the cancellation code - _queueCount += oldestRequest.Count; + if (!oldestRequest.QueueCountModified) + { + // We already updated the queue count, the Cancel code is about to run or running and waiting on our lock, + // tell Cancel not to do anything + oldestRequest.QueueCountModified = true; + } + else + { + // Updating queue count was handled by the cancellation code, don't double count + _queueCount += oldestRequest.Count; + } } else { Interlocked.Increment(ref _failedLeasesCount); } + disposer.Add(oldestRequest); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -172,22 +184,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr); + var request = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(request); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(request.Tcs.Task); + return new ValueTask(request.Task); } } @@ -223,8 +225,15 @@ private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out Rat return false; } +#if DEBUG + // for unit testing + internal event Action? ReleasePreHook; + internal event Action? ReleasePostHook; +#endif + private void Release(int releaseCount) { + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -235,6 +244,10 @@ private void Release(int releaseCount) _permitCount += releaseCount; Debug.Assert(_permitCount <= _options.PermitLimit); +#if DEBUG + ReleasePreHook?.Invoke(); +#endif + while (_queue.Count > 0) { RequestRegistration nextPendingRequest = @@ -242,6 +255,22 @@ private void Release(int releaseCount) ? _queue.PeekHead() : _queue.PeekTail(); + // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. + // We just need to remove the item and let the next queued item be considered for completion. + if (nextPendingRequest.Task.IsCompleted) + { + nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + disposer.Add(nextPendingRequest); + continue; + } + +#if DEBUG + ReleasePostHook?.Invoke(); +#endif + if (_permitCount >= nextPendingRequest.Count) { nextPendingRequest = @@ -255,18 +284,27 @@ private void Release(int releaseCount) ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count); // Check if request was canceled - if (!nextPendingRequest.Tcs.TrySetResult(lease)) + if (!nextPendingRequest.TrySetResult(lease)) { - // Queued item was canceled so add count back + // Queued item was canceled so add count back, permits weren't acquired _permitCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; + if (!nextPendingRequest.QueueCountModified) + { + // We already updated the queue count, the Cancel code is about to run or running and waiting on our lock, + // tell Cancel not to do anything + nextPendingRequest.QueueCountModified = true; + } + else + { + // Updating queue count was handled by the cancellation code, don't double count + _queueCount += nextPendingRequest.Count; + } } else { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -278,7 +316,6 @@ private void Release(int releaseCount) if (_permitCount == _options.PermitLimit) { Debug.Assert(_idleSince is null); - Debug.Assert(_queueCount == 0); _idleSince = Stopwatch.GetTimestamp(); } } @@ -291,6 +328,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -303,8 +341,8 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.Add(next); + next.TrySetResult(FailedLease); } } } @@ -372,49 +410,78 @@ protected override void Dispose(bool disposing) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int requestedCount, TaskCompletionSource tcs, - CancellationTokenRegistration cancellationTokenRegistration) - { - Count = requestedCount; - // Perf: Use AsyncOperation instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; - } + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; - public int Count { get; } + // Update under the limiter lock and only if the queue count was updated by the calling code + public bool QueueCountModified { get; set; } - public TaskCompletionSource Tcs { get; } + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } + public RequestRegistration(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) + { + Count = permitCount; + _cancellationToken = cancellationToken; - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly ConcurrencyLimiter _limiter; - private readonly CancellationToken _cancellationToken; + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); +#endif + } + + public int Count { get; } - public CancelQueueState(int permitCount, ConcurrencyLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + var limiter = (ConcurrencyLimiter)registration.Task.AsyncState!; + lock (limiter.Lock) + { + // Queuing and replenishing code might modify the _queueCount, since there is no guarantee of when the cancellation + // code runs and we only want to update the _queueCount once, we set a bool (under a lock) so either method + // can update the count and not double count. + if (!registration.QueueCountModified) + { + limiter._queueCount -= registration.Count; + registration.QueueCountModified = true; + } + } + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void Add(RequestRegistration request) { - lock (_limiter.Lock) + request._next = _next; + _next = request; + } + + public void Dispose() + { + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs index 381f7038fc60be..0cc6f9f7b21dc3 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/FixedWindowRateLimiter.cs @@ -151,6 +151,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) @@ -170,14 +171,25 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + if (!oldestRequest.TrySetResult(FailedLease)) { - _queueCount += oldestRequest.Count; + if (!oldestRequest.QueueCountModified) + { + // We already updated the queue count, the Cancel code is about to run or running and waiting on our lock, + // tell Cancel not to do anything + oldestRequest.QueueCountModified = true; + } + else + { + // Updating queue count was handled by the cancellation code, don't double count + _queueCount += oldestRequest.Count; + } } else { Interlocked.Increment(ref _failedLeasesCount); } + disposer.Add(oldestRequest); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -189,22 +201,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr); + var registration = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -279,6 +281,8 @@ private static void Replenish(object? state) // Used in tests that test behavior with specific time intervals private void ReplenishInternal(long nowTicks) { + using var disposer = default(RequestRegistration.Disposer); + // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -312,7 +316,17 @@ private void ReplenishInternal(long nowTicks) ? _queue.PeekHead() : _queue.PeekTail(); - if (_permitCount >= nextPendingRequest.Count) + // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. + // We just need to remove the item and let the next queued item be considered for completion. + if (nextPendingRequest.Task.IsCompleted) + { + nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + disposer.Add(nextPendingRequest); + } + else if (_permitCount >= nextPendingRequest.Count) { // Request can be fulfilled nextPendingRequest = @@ -324,18 +338,27 @@ private void ReplenishInternal(long nowTicks) _permitCount -= nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { - // Queued item was canceled so add count back + // Queued item was canceled so add count back, permits weren't acquired _permitCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; + if (!nextPendingRequest.QueueCountModified) + { + // We already updated the queue count, the Cancel code is about to run or running and waiting on our lock, + // tell Cancel not to do anything + nextPendingRequest.QueueCountModified = true; + } + else + { + // Updating queue count was handled by the cancellation code, don't double count + _queueCount += nextPendingRequest.Count; + } } else { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -348,7 +371,6 @@ private void ReplenishInternal(long nowTicks) if (_permitCount == _options.PermitLimit) { Debug.Assert(_idleSince is null); - Debug.Assert(_queueCount == 0); _idleSince = Stopwatch.GetTimestamp(); } } @@ -361,6 +383,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -374,8 +397,8 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.Add(next); + next.TrySetResult(FailedLease); } } } @@ -424,48 +447,78 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int permitCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; + + // Update under the limiter lock and only if the queue count was updated by the calling code + public bool QueueCountModified { get; set; } + + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; + + public RequestRegistration(int permitCount, FixedWindowRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) { Count = permitCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; + _cancellationToken = cancellationToken; + + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); +#endif } public int Count { get; } - public TaskCompletionSource Tcs { get; } - - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } - - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly FixedWindowRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; - - public CancelQueueState(int permitCount, FixedWindowRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + var limiter = (FixedWindowRateLimiter)registration.Task.AsyncState!; + lock (limiter.Lock) + { + // Queuing and replenishing code might modify the _queueCount, since there is no guarantee of when the cancellation + // code runs and we only want to update the _queueCount once, we set a bool (under a lock) so either method + // can update the count and not double count. + if (!registration.QueueCountModified) + { + limiter._queueCount -= registration.Count; + registration.QueueCountModified = true; + } + } + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void Add(RequestRegistration request) { - lock (_limiter.Lock) + request._next = _next; + _next = request; + } + + public void Dispose() + { + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs index 1db6c8e9cbb39b..0b092711335372 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/SlidingWindowRateLimiter.cs @@ -159,6 +159,7 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C return new ValueTask(SuccessfulLease); } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease)) @@ -178,14 +179,25 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + if (!oldestRequest.TrySetResult(FailedLease)) { - _queueCount += oldestRequest.Count; + if (!oldestRequest.QueueCountModified) + { + // We already updated the queue count, the Cancel code is about to run or running and waiting on our lock, + // tell Cancel not to do anything + oldestRequest.QueueCountModified = true; + } + else + { + // Updating queue count was handled by the cancellation code, don't double count + _queueCount += oldestRequest.Count; + } } else { Interlocked.Increment(ref _failedLeasesCount); } + disposer.Add(oldestRequest); } while (_options.QueueLimit - _queueCount < permitCount); } @@ -197,22 +209,12 @@ protected override ValueTask AcquireAsyncCore(int permitCount, C } } - CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(permitCount, tcs, ctr); + var registration = new RequestRegistration(permitCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += permitCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -281,6 +283,8 @@ private static void Replenish(object? state) // Used in tests that test behavior with specific time intervals private void ReplenishInternal(long nowTicks) { + using var disposer = default(RequestRegistration.Disposer); + // Method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -318,8 +322,18 @@ private void ReplenishInternal(long nowTicks) ? _queue.PeekHead() : _queue.PeekTail(); + // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. + // We just need to remove the item and let the next queued item be considered for completion. + if (nextPendingRequest.Task.IsCompleted) + { + nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? _queue.DequeueHead() + : _queue.DequeueTail(); + disposer.Add(nextPendingRequest); + } // If we have enough permits after replenishing to serve the queued requests - if (_permitCount >= nextPendingRequest.Count) + else if (_permitCount >= nextPendingRequest.Count) { // Request can be fulfilled nextPendingRequest = @@ -332,19 +346,28 @@ private void ReplenishInternal(long nowTicks) _requestsPerSegment[_currentSegmentIndex] += nextPendingRequest.Count; Debug.Assert(_permitCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { - // Queued item was canceled so add count back + // Queued item was canceled so add count back, permits weren't acquired _permitCount += nextPendingRequest.Count; _requestsPerSegment[_currentSegmentIndex] -= nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; + if (!nextPendingRequest.QueueCountModified) + { + // We already updated the queue count, the Cancel code is about to run or running and waiting on our lock, + // tell Cancel not to do anything + nextPendingRequest.QueueCountModified = true; + } + else + { + // Updating queue count was handled by the cancellation code, don't double count + _queueCount += nextPendingRequest.Count; + } } else { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -357,7 +380,6 @@ private void ReplenishInternal(long nowTicks) if (_permitCount == _options.PermitLimit) { Debug.Assert(_idleSince is null); - Debug.Assert(_queueCount == 0); _idleSince = Stopwatch.GetTimestamp(); } } @@ -370,6 +392,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -383,8 +406,8 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.Add(next); + next.TrySetResult(FailedLease); } } } @@ -433,48 +456,78 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int permitCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; + + // Update under the limiter lock and only if the queue count was updated by the calling code + public bool QueueCountModified { get; set; } + + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; + + public RequestRegistration(int permitCount, SlidingWindowRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) { Count = permitCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; + _cancellationToken = cancellationToken; + + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); +#endif } public int Count { get; } - public TaskCompletionSource Tcs { get; } - - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } - - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _permitCount; - private readonly SlidingWindowRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; - - public CancelQueueState(int permitCount, SlidingWindowRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + private static void Cancel(object? state) { - _permitCount = permitCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + var limiter = (SlidingWindowRateLimiter)registration.Task.AsyncState!; + lock (limiter.Lock) + { + // Queuing and replenishing code might modify the _queueCount, since there is no guarantee of when the cancellation + // code runs and we only want to update the _queueCount once, we set a bool (under a lock) so either method + // can update the count and not double count. + if (!registration.QueueCountModified) + { + limiter._queueCount -= registration.Count; + registration.QueueCountModified = true; + } + } + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void Add(RequestRegistration request) { - lock (_limiter.Lock) + request._next = _next; + _next = request; + } + + public void Dispose() + { + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _permitCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs index f1fbcb4433c4d8..84b22a6b7e7045 100644 --- a/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs +++ b/src/libraries/System.Threading.RateLimiting/src/System/Threading/RateLimiting/TokenBucketRateLimiter.cs @@ -152,6 +152,7 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca return new ValueTask(SuccessfulLease); } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (TryLeaseUnsynchronized(tokenCount, out RateLimitLease? lease)) @@ -171,15 +172,25 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca RequestRegistration oldestRequest = _queue.DequeueHead(); _queueCount -= oldestRequest.Count; Debug.Assert(_queueCount >= 0); - if (!oldestRequest.Tcs.TrySetResult(FailedLease)) + if (!oldestRequest.TrySetResult(FailedLease)) { - // Updating queue count is handled by the cancellation code - _queueCount += oldestRequest.Count; + if (!oldestRequest.QueueCountModified) + { + // We already updated the queue count, the Cancel code is about to run or running and waiting on our lock, + // tell Cancel not to do anything + oldestRequest.QueueCountModified = true; + } + else + { + // Updating queue count was handled by the cancellation code, don't double count + _queueCount += oldestRequest.Count; + } } else { Interlocked.Increment(ref _failedLeasesCount); } + disposer.Add(oldestRequest); } while (_options.QueueLimit - _queueCount < tokenCount); } @@ -191,22 +202,12 @@ protected override ValueTask AcquireAsyncCore(int tokenCount, Ca } } - CancelQueueState tcs = new CancelQueueState(tokenCount, this, cancellationToken); - CancellationTokenRegistration ctr = default; - if (cancellationToken.CanBeCanceled) - { - ctr = cancellationToken.Register(static obj => - { - ((CancelQueueState)obj!).TrySetCanceled(); - }, tcs); - } - - RequestRegistration registration = new RequestRegistration(tokenCount, tcs, ctr); + var registration = new RequestRegistration(tokenCount, this, cancellationToken); _queue.EnqueueTail(registration); _queueCount += tokenCount; Debug.Assert(_queueCount <= _options.QueueLimit); - return new ValueTask(registration.Tcs.Task); + return new ValueTask(registration.Task); } } @@ -283,6 +284,8 @@ private static void Replenish(object? state) // Used in tests to avoid dealing with real time private void ReplenishInternal(long nowTicks) { + using var disposer = default(RequestRegistration.Disposer); + // method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes lock (Lock) { @@ -323,7 +326,17 @@ private void ReplenishInternal(long nowTicks) ? queue.PeekHead() : queue.PeekTail(); - if (_tokenCount >= nextPendingRequest.Count) + // Request was handled already, either via cancellation or being kicked from the queue due to a newer request being queued. + // We just need to remove the item and let the next queued item be considered for completion. + if (nextPendingRequest.Task.IsCompleted) + { + nextPendingRequest = + _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst + ? queue.DequeueHead() + : queue.DequeueTail(); + disposer.Add(nextPendingRequest); + } + else if (_tokenCount >= nextPendingRequest.Count) { // Request can be fulfilled nextPendingRequest = @@ -335,18 +348,27 @@ private void ReplenishInternal(long nowTicks) _tokenCount -= nextPendingRequest.Count; Debug.Assert(_tokenCount >= 0); - if (!nextPendingRequest.Tcs.TrySetResult(SuccessfulLease)) + if (!nextPendingRequest.TrySetResult(SuccessfulLease)) { - // Queued item was canceled so add count back + // Queued item was canceled so add count back, permits weren't acquired _tokenCount += nextPendingRequest.Count; - // Updating queue count is handled by the cancellation code - _queueCount += nextPendingRequest.Count; + if (!nextPendingRequest.QueueCountModified) + { + // We already updated the queue count, the Cancel code is about to run or running and waiting on our lock, + // tell Cancel not to do anything + nextPendingRequest.QueueCountModified = true; + } + else + { + // Updating queue count was handled by the cancellation code, don't double count + _queueCount += nextPendingRequest.Count; + } } else { Interlocked.Increment(ref _successfulLeasesCount); } - nextPendingRequest.CancellationTokenRegistration.Dispose(); + disposer.Add(nextPendingRequest); Debug.Assert(_queueCount >= 0); } else @@ -359,7 +381,6 @@ private void ReplenishInternal(long nowTicks) if (_tokenCount == _options.TokenLimit) { Debug.Assert(_idleSince is null); - Debug.Assert(_queueCount == 0); _idleSince = Stopwatch.GetTimestamp(); } } @@ -372,6 +393,7 @@ protected override void Dispose(bool disposing) return; } + using var disposer = default(RequestRegistration.Disposer); lock (Lock) { if (_disposed) @@ -385,8 +407,8 @@ protected override void Dispose(bool disposing) RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst ? _queue.DequeueHead() : _queue.DequeueTail(); - next.CancellationTokenRegistration.Dispose(); - next.Tcs.TrySetResult(FailedLease); + disposer.Add(next); + next.TrySetResult(FailedLease); } } } @@ -435,48 +457,78 @@ public override bool TryGetMetadata(string metadataName, out object? metadata) } } - private readonly struct RequestRegistration + private sealed class RequestRegistration : TaskCompletionSource { - public RequestRegistration(int tokenCount, TaskCompletionSource tcs, CancellationTokenRegistration cancellationTokenRegistration) - { - Count = tokenCount; - // Use VoidAsyncOperationWithData instead - Tcs = tcs; - CancellationTokenRegistration = cancellationTokenRegistration; - } + private readonly CancellationToken _cancellationToken; + private CancellationTokenRegistration _cancellationTokenRegistration; - public int Count { get; } + // Update under the limiter lock and only if the queue count was updated by the calling code + public bool QueueCountModified { get; set; } - public TaskCompletionSource Tcs { get; } + // this field is used only by the disposal mechanics and never shared between threads + private RequestRegistration? _next; - public CancellationTokenRegistration CancellationTokenRegistration { get; } - } + public RequestRegistration(int permitCount, TokenBucketRateLimiter limiter, CancellationToken cancellationToken) + : base(limiter, TaskCreationOptions.RunContinuationsAsynchronously) + { + Count = permitCount; + _cancellationToken = cancellationToken; - private sealed class CancelQueueState : TaskCompletionSource - { - private readonly int _tokenCount; - private readonly TokenBucketRateLimiter _limiter; - private readonly CancellationToken _cancellationToken; + // RequestRegistration objects are created while the limiter lock is held + // if cancellationToken fires before or while the lock is held, UnsafeRegister + // is going to invoke the callback synchronously, but this does not create + // a deadlock because lock are reentrant + if (cancellationToken.CanBeCanceled) +#if NETCOREAPP || NETSTANDARD2_1_OR_GREATER + _cancellationTokenRegistration = cancellationToken.UnsafeRegister(Cancel, this); +#else + _cancellationTokenRegistration = cancellationToken.Register(Cancel, this); +#endif + } - public CancelQueueState(int tokenCount, TokenBucketRateLimiter limiter, CancellationToken cancellationToken) - : base(TaskCreationOptions.RunContinuationsAsynchronously) + public int Count { get; } + + private static void Cancel(object? state) { - _tokenCount = tokenCount; - _limiter = limiter; - _cancellationToken = cancellationToken; + if (state is RequestRegistration registration && registration.TrySetCanceled(registration._cancellationToken)) + { + var limiter = (TokenBucketRateLimiter)registration.Task.AsyncState!; + lock (limiter.Lock) + { + // Queuing and replenishing code might modify the _queueCount, since there is no guarantee of when the cancellation + // code runs and we only want to update the _queueCount once, we set a bool (under a lock) so either method + // can update the count and not double count. + if (!registration.QueueCountModified) + { + limiter._queueCount -= registration.Count; + registration.QueueCountModified = true; + } + } + } } - public new bool TrySetCanceled() + /// + /// Collects registrations to dispose outside the limiter lock to avoid deadlock. + /// + public struct Disposer : IDisposable { - if (TrySetCanceled(_cancellationToken)) + private RequestRegistration? _next; + + public void Add(RequestRegistration request) { - lock (_limiter.Lock) + request._next = _next; + _next = request; + } + + public void Dispose() + { + for (var current = _next; current is not null; current = current._next) { - _limiter._queueCount -= _tokenCount; + current._cancellationTokenRegistration.Dispose(); } - return true; + + _next = null; } - return false; } } } diff --git a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs index 54e2bbecc19752..63914c4d99ec5e 100644 --- a/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs +++ b/src/libraries/System.Threading.RateLimiting/tests/ConcurrencyLimiterTests.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Reflection; using System.Threading.Tasks; using Xunit; @@ -123,6 +124,51 @@ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest() Assert.True(lease.IsAcquired); } +#if DEBUG + [Fact] + public Task DoesNotDeadlockCleaningUpCanceledRequestedLease_Pre() => + DoesNotDeadlockCleaningUpCanceledRequestedLease((limiter, hook) => SetReleasePreHook(limiter, hook)); + + [Fact] + public Task DoesNotDeadlockCleaningUpCanceledRequestedLease_Post() => + DoesNotDeadlockCleaningUpCanceledRequestedLease((limiter, hook) => SetReleasePostHook(limiter, hook)); + + private void SetReleasePreHook(ConcurrencyLimiter limiter, Action hook) + { + typeof(ConcurrencyLimiter).GetEvent("ReleasePreHook", BindingFlags.NonPublic | BindingFlags.Instance).AddMethod.Invoke(limiter, new object[] { hook }); + } + + private void SetReleasePostHook(ConcurrencyLimiter limiter, Action hook) + { + typeof(ConcurrencyLimiter).GetEvent("ReleasePostHook", BindingFlags.NonPublic | BindingFlags.Instance).AddMethod.Invoke(limiter, new object[] { hook }); + } + + private async Task DoesNotDeadlockCleaningUpCanceledRequestedLease(Action attachHook) + { + using var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions + { + PermitLimit = 1, + QueueProcessingOrder = QueueProcessingOrder.OldestFirst, + QueueLimit = 1 + }); + var lease = limiter.AttemptAcquire(1); + Assert.True(lease.IsAcquired); + + var cts = new CancellationTokenSource(); + _ = limiter.AcquireAsync(1, cts.Token); + attachHook(limiter, () => + { + Task.Run(cts.Cancel); + Thread.Sleep(1); + }); + + var task1 = Task.Delay(1000); + var task2 = Task.Run(lease.Dispose); + Assert.Same(task2, await Task.WhenAny(task1, task2)); + await task2; + } +#endif + [Fact] public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst() {