From 8ac87105feedb29f887d049848318e5c0b8a2db8 Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Wed, 22 Feb 2023 17:53:09 -0500 Subject: [PATCH 1/2] Use asynchronous lock in Parallel.ForEachAsync with synchronous enumerable Avoid blocking threads while waiting for access to the enumerator in the case of a slower MoveNext. --- .../Threading/Tasks/Parallel.ForEachAsync.cs | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs b/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs index 619cd1406168d6..ebfd8044a045b0 100644 --- a/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs +++ b/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs @@ -93,17 +93,23 @@ private static Task ForEachAsync(IEnumerable source, int dop, // Continue to loop while there are more elements to be processed. while (!state.Cancellation.IsCancellationRequested) { - // Get the next element from the enumerator. This requires asynchronously locking around MoveNextAsync/Current. + // Get the next element from the enumerator. This requires asynchronously locking around MoveNext/Current. TSource element; - lock (state) + await state.AcquireLock(); + try { - if (!state.Enumerator.MoveNext()) + if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired + !state.Enumerator.MoveNext()) { break; } element = state.Enumerator.Current; } + finally + { + state.ReleaseLock(); + } // If the remaining dop allows it and we've not yet queued the next worker, do so now. We wait // until after we've grabbed an item from the enumerator to a) avoid unnecessary contention on the @@ -249,20 +255,11 @@ private static Task ForEachAsync(IAsyncEnumerable source, int { // Get the next element from the enumerator. This requires asynchronously locking around MoveNextAsync/Current. TSource element; + await state.AcquireLock(); try { - // TODO https://github.com/dotnet/runtime/issues/22144: - // Use a no-throwing await if/when one is available built-in. - await state.Lock.WaitAsync(state.Cancellation.Token); - } - catch (OperationCanceledException) - { - break; - } - - try - { - if (!await state.Enumerator.MoveNextAsync()) + if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired + !await state.Enumerator.MoveNextAsync()) { break; } @@ -271,7 +268,7 @@ private static Task ForEachAsync(IAsyncEnumerable source, int } finally { - state.Lock.Release(); + state.ReleaseLock(); } // If the remaining dop allows it and we've not yet queued the next worker, do so now. We wait @@ -354,6 +351,8 @@ private abstract class ForEachAsyncState : TaskCompletionSource, IThrea private readonly TaskScheduler _scheduler; /// The present at the time of the ForEachAsync invocation. This is only used if on the default scheduler. private readonly ExecutionContext? _executionContext; + /// Semaphore used to provide exclusive access to the enumerator. + private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1); /// The number of outstanding workers. When this hits 0, the operation has completed. private int _completionRefCount; @@ -417,6 +416,21 @@ public void QueueWorkerIfDopAvailable() /// true if this is the last worker to complete iterating; otherwise, false. public bool SignalWorkerCompletedIterating() => Interlocked.Decrement(ref _completionRefCount) == 0; + /// Asynchronously acquires exclusive access to the enumerator. + public Task AcquireLock() => + // We explicitly don't pass this.Cancellation to WaitAsync. Doing so adds overhead, and it isn't actually + // necessary. All of the operations that monitor the lock are part of the same ForEachAsync operation, and the Task + // returned from ForEachAsync can't complete until all of the constituent operations have completed, including whoever + // holds the lock while this worker is waiting on the lock. Thus, the lock will need to be released for the overall + // operation to complete. Passing the token would allow the overall operation to potentially complete a bit faster in + // the face of cancellation, in exchange for making it a bit slower / more overhead in the common case of cancellation + // not being requested. We want to optimize for the latter. This also then avoids an exception throw / catch when + // cancellation is requested. + _lock.WaitAsync(CancellationToken.None); + + /// Relinquishes exclusive access to the enumerator. + public void ReleaseLock() => _lock.Release(); + /// Stores an exception and triggers cancellation in order to alert all workers to stop as soon as possible. /// The exception. public void RecordException(Exception e) @@ -444,6 +458,7 @@ public void Complete() else if (_exceptions is null) { // Everything completed successfully. + Debug.Assert(!Cancellation.IsCancellationRequested); taskSet = TrySetResult(); } else @@ -500,7 +515,6 @@ public void Dispose() /// Specifies the type of data being enumerated. private sealed class AsyncForEachAsyncState : ForEachAsyncState, IAsyncDisposable { - public readonly SemaphoreSlim Lock = new SemaphoreSlim(1, 1); public readonly IAsyncEnumerator Enumerator; public AsyncForEachAsyncState( From a9db0b4b3622b8d0e5d23648bc726cb018dc799e Mon Sep 17 00:00:00 2001 From: Stephen Toub Date: Thu, 23 Feb 2023 11:53:03 -0500 Subject: [PATCH 2/2] Update src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs Co-authored-by: Tanner Gooding --- .../src/System/Threading/Tasks/Parallel.ForEachAsync.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs b/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs index ebfd8044a045b0..b13157ae3dce4c 100644 --- a/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs +++ b/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs @@ -352,7 +352,7 @@ private abstract class ForEachAsyncState : TaskCompletionSource, IThrea /// The present at the time of the ForEachAsync invocation. This is only used if on the default scheduler. private readonly ExecutionContext? _executionContext; /// Semaphore used to provide exclusive access to the enumerator. - private readonly SemaphoreSlim _lock = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim _lock = new SemaphoreSlim(initialCount: 1, maxCount: 1); /// The number of outstanding workers. When this hits 0, the operation has completed. private int _completionRefCount;