Skip to content

Commit 220437e

Browse files
Use asynchronous lock in Parallel.ForEachAsync with synchronous enumerable (#82501)
* Use asynchronous lock in Parallel.ForEachAsync with synchronous enumerable Avoid blocking threads while waiting for access to the enumerator in the case of a slower MoveNext. * Update src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs Co-authored-by: Tanner Gooding <[email protected]> --------- Co-authored-by: Tanner Gooding <[email protected]>
1 parent 72c9c36 commit 220437e

File tree

1 file changed

+31
-17
lines changed

1 file changed

+31
-17
lines changed

src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,23 @@ private static Task ForEachAsync<TSource>(IEnumerable<TSource> source, int dop,
9393
// Continue to loop while there are more elements to be processed.
9494
while (!state.Cancellation.IsCancellationRequested)
9595
{
96-
// Get the next element from the enumerator. This requires asynchronously locking around MoveNextAsync/Current.
96+
// Get the next element from the enumerator. This requires asynchronously locking around MoveNext/Current.
9797
TSource element;
98-
lock (state)
98+
await state.AcquireLock();
99+
try
99100
{
100-
if (!state.Enumerator.MoveNext())
101+
if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired
102+
!state.Enumerator.MoveNext())
101103
{
102104
break;
103105
}
104106

105107
element = state.Enumerator.Current;
106108
}
109+
finally
110+
{
111+
state.ReleaseLock();
112+
}
107113

108114
// If the remaining dop allows it and we've not yet queued the next worker, do so now. We wait
109115
// until after we've grabbed an item from the enumerator to a) avoid unnecessary contention on the
@@ -249,20 +255,11 @@ private static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, int
249255
{
250256
// Get the next element from the enumerator. This requires asynchronously locking around MoveNextAsync/Current.
251257
TSource element;
258+
await state.AcquireLock();
252259
try
253260
{
254-
// TODO https://github.com/dotnet/runtime/issues/22144:
255-
// Use a no-throwing await if/when one is available built-in.
256-
await state.Lock.WaitAsync(state.Cancellation.Token);
257-
}
258-
catch (OperationCanceledException)
259-
{
260-
break;
261-
}
262-
263-
try
264-
{
265-
if (!await state.Enumerator.MoveNextAsync())
261+
if (state.Cancellation.IsCancellationRequested || // check now that the lock has been acquired
262+
!await state.Enumerator.MoveNextAsync())
266263
{
267264
break;
268265
}
@@ -271,7 +268,7 @@ private static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, int
271268
}
272269
finally
273270
{
274-
state.Lock.Release();
271+
state.ReleaseLock();
275272
}
276273

277274
// If the remaining dop allows it and we've not yet queued the next worker, do so now. We wait
@@ -354,6 +351,8 @@ private abstract class ForEachAsyncState<TSource> : TaskCompletionSource, IThrea
354351
private readonly TaskScheduler _scheduler;
355352
/// <summary>The <see cref="ExecutionContext"/> present at the time of the ForEachAsync invocation. This is only used if on the default scheduler.</summary>
356353
private readonly ExecutionContext? _executionContext;
354+
/// <summary>Semaphore used to provide exclusive access to the enumerator.</summary>
355+
private readonly SemaphoreSlim _lock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
357356

358357
/// <summary>The number of outstanding workers. When this hits 0, the operation has completed.</summary>
359358
private int _completionRefCount;
@@ -417,6 +416,21 @@ public void QueueWorkerIfDopAvailable()
417416
/// <returns>true if this is the last worker to complete iterating; otherwise, false.</returns>
418417
public bool SignalWorkerCompletedIterating() => Interlocked.Decrement(ref _completionRefCount) == 0;
419418

419+
/// <summary>Asynchronously acquires exclusive access to the enumerator.</summary>
420+
public Task AcquireLock() =>
421+
// We explicitly don't pass this.Cancellation to WaitAsync. Doing so adds overhead, and it isn't actually
422+
// necessary. All of the operations that monitor the lock are part of the same ForEachAsync operation, and the Task
423+
// returned from ForEachAsync can't complete until all of the constituent operations have completed, including whoever
424+
// holds the lock while this worker is waiting on the lock. Thus, the lock will need to be released for the overall
425+
// operation to complete. Passing the token would allow the overall operation to potentially complete a bit faster in
426+
// the face of cancellation, in exchange for making it a bit slower / more overhead in the common case of cancellation
427+
// not being requested. We want to optimize for the latter. This also then avoids an exception throw / catch when
428+
// cancellation is requested.
429+
_lock.WaitAsync(CancellationToken.None);
430+
431+
/// <summary>Relinquishes exclusive access to the enumerator.</summary>
432+
public void ReleaseLock() => _lock.Release();
433+
420434
/// <summary>Stores an exception and triggers cancellation in order to alert all workers to stop as soon as possible.</summary>
421435
/// <param name="e">The exception.</param>
422436
public void RecordException(Exception e)
@@ -444,6 +458,7 @@ public void Complete()
444458
else if (_exceptions is null)
445459
{
446460
// Everything completed successfully.
461+
Debug.Assert(!Cancellation.IsCancellationRequested);
447462
taskSet = TrySetResult();
448463
}
449464
else
@@ -500,7 +515,6 @@ public void Dispose()
500515
/// <typeparam name="TSource">Specifies the type of data being enumerated.</typeparam>
501516
private sealed class AsyncForEachAsyncState<TSource> : ForEachAsyncState<TSource>, IAsyncDisposable
502517
{
503-
public readonly SemaphoreSlim Lock = new SemaphoreSlim(1, 1);
504518
public readonly IAsyncEnumerator<TSource> Enumerator;
505519

506520
public AsyncForEachAsyncState(

0 commit comments

Comments
 (0)