diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs b/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs index 63cdedf4b466a6..7007b0c80f5f7a 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/Base/DataflowBlock.cs @@ -1434,63 +1434,48 @@ public static Task OutputAvailableAsync(this ISourceBlock OutputAvailableAsync( this ISourceBlock source, CancellationToken cancellationToken) { - if (source is null) - { - throw new ArgumentNullException(nameof(source)); - } - - // Fast path for cancellation - if (cancellationToken.IsCancellationRequested) - return Common.CreateTaskFromCancellation(cancellationToken); - - // In a method like this, normally we would want to check source.Completion.IsCompleted - // and avoid linking completely by simply returning a completed task. However, - // some blocks that are completed still have data available, like WriteOnceBlock, - // which completes as soon as it gets a value and stores that value forever. - // As such, OutputAvailableAsync must link from the source so that the source - // can push data to us if it has it, at which point we can immediately unlink. + return + source is null ? throw new ArgumentNullException(nameof(source)) : + cancellationToken.IsCancellationRequested ? Common.CreateTaskFromCancellation(cancellationToken) : + Impl(source, cancellationToken); - // Create a target task that will complete when it's offered a message (but it won't accept the message) - var target = new OutputAvailableAsyncTarget(); - try + static async Task Impl(ISourceBlock source, CancellationToken cancellationToken) { - // Link from the source. If the source propagates a message during or immediately after linking - // such that our target is already completed, just return its task. - target._unlinker = source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion); + // In a method like this, normally we would want to check source.Completion.IsCompleted + // and avoid linking completely by simply returning a completed task. However, + // some blocks that are completed still have data available, like WriteOnceBlock, + // which completes as soon as it gets a value and stores that value forever. + // As such, OutputAvailableAsync must link from the source so that the source + // can push data to us if it has it, at which point we can immediately unlink. - // If the task is already completed (an exception may have occurred, or the source may have propagated - // a message to the target during LinkTo or soon thereafter), just return the task directly. - if (target.Task.IsCompleted) - { - return target.Task; - } + // Create a target task that will complete when it's offered a message (but it won't accept the message) + var target = new OutputAvailableAsyncTarget(); - // If cancellation could be requested, hook everything up to be notified of cancellation requests. - if (cancellationToken.CanBeCanceled) + // Link from the source. + using (source.LinkTo(target, DataflowLinkOptions.UnlinkAfterOneAndPropagateCompletion)) { - // When cancellation is requested, unlink the target from the source and cancel the target. - target._ctr = cancellationToken.Register( + CancellationTokenRegistration registration = default; + try + { + // Register for cancellation if the target isn't already completed (the source may have propagated + // a message to the target during LinkTo or soon thereafter). + if (!target.Task.IsCompleted) + { + registration = #if NET6_0_OR_GREATER - OutputAvailableAsyncTarget.CancelAndUnlink, + cancellationToken.UnsafeRegister(static (state, cancellationToken) => ((OutputAvailableAsyncTarget)state!).TrySetCanceled(cancellationToken), target); #else - static state => OutputAvailableAsyncTarget.CancelAndUnlink(state, default), + cancellationToken.Register(static state => ((OutputAvailableAsyncTarget)state!).TrySetCanceled(), target); #endif - target); - } - - return target.Task; - } - catch (Exception exc) - { - // Source.LinkTo could throw, as could cancellationToken.Register if cancellation was already requested - // such that it synchronously invokes the source's unlinker IDisposable, which could throw. - target.TrySetException(exc); - - // Undo the link from the source to the target - target.AttemptThreadSafeUnlink(); + } - // Return the now faulted task - return target.Task; + return await target.Task.ConfigureAwait(false); + } + finally + { + registration.Dispose(); + } + } } } @@ -1504,46 +1489,6 @@ public OutputAvailableAsyncTarget() : { } - /// - /// Cached continuation delegate that unregisters from cancellation and - /// marshals the antecedent's result to the return value. - /// - internal static readonly Func, object?, bool> s_handleCompletion = (antecedent, state) => - { - var target = state as OutputAvailableAsyncTarget; - Debug.Assert(target != null, "Expected non-null target"); - target._ctr.Dispose(); - return antecedent.GetAwaiter().GetResult(); - }; - - /// Cancels the target and unlinks the target from the source. - /// An OutputAvailableAsyncTarget. - /// The token that triggered cancellation - internal static void CancelAndUnlink(object? state, CancellationToken cancellationToken) - { - var target = state as OutputAvailableAsyncTarget; - Debug.Assert(target != null, "Expected a non-null target"); - - target.TrySetCanceled(cancellationToken); - target.AttemptThreadSafeUnlink(); - } - - /// Disposes of _unlinker if the target has been linked. - internal void AttemptThreadSafeUnlink() - { - // A race is possible. Therefore use an interlocked operation. - IDisposable? cachedUnlinker = _unlinker; - if (cachedUnlinker != null && Interlocked.CompareExchange(ref _unlinker, null, cachedUnlinker) == cachedUnlinker) - { - cachedUnlinker.Dispose(); - } - } - - /// The IDisposable used to unlink this target from its source. - internal IDisposable? _unlinker; - /// The registration used to unregister this target from the cancellation token. - internal CancellationTokenRegistration _ctr; - /// Completes the task when offered a message (but doesn't consume the message). DataflowMessageStatus ITargetBlock.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock? source, bool consumeToAccept) { @@ -1551,14 +1496,12 @@ DataflowMessageStatus ITargetBlock.OfferMessage(DataflowMessageHeader message if (source == null) throw new ArgumentNullException(nameof(source)); TrySetResult(true); + return DataflowMessageStatus.DecliningPermanently; } /// - void IDataflowBlock.Complete() - { - TrySetResult(false); - } + void IDataflowBlock.Complete() => TrySetResult(false); /// void IDataflowBlock.Fault(Exception exception) @@ -1572,13 +1515,13 @@ void IDataflowBlock.Fault(Exception exception) } /// - Task IDataflowBlock.Completion { get { throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); } } + Task IDataflowBlock.Completion => throw new NotSupportedException(SR.NotSupported_MemberNotNeeded); /// The data to display in the debugger display attribute. private object DebuggerDisplayContent => $"{Common.GetNameForDebugger(this)} IsCompleted = {base.Task.IsCompleted}"; /// Gets the data to display in the debugger display attribute for this instance. - object IDebuggerDisplay.Content { get { return DebuggerDisplayContent; } } + object IDebuggerDisplay.Content => DebuggerDisplayContent; } #endregion diff --git a/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj b/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj index 9835aa30e3b35a..3a4156130e82f9 100644 --- a/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj +++ b/src/libraries/System.Threading.Tasks.Dataflow/src/System.Threading.Tasks.Dataflow.csproj @@ -2,6 +2,8 @@ $(NetCoreAppCurrent);$(NetCoreAppPrevious);$(NetCoreAppMinimum);netstandard2.1;netstandard2.0;$(NetFrameworkMinimum) true + true + 1 TPL Dataflow promotes actor/agent-oriented designs through primitives for in-process message passing, dataflow, and pipelining. TDF builds upon the APIs and scheduling infrastructure provided by the Task Parallel Library (TPL), and integrates with the language support for asynchrony provided by C#, Visual Basic, and F#. Commonly Used Types: