From eba18ba5f4e23fe13b5357294372ac4b7d7860cc Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Tue, 18 Aug 2020 14:51:26 -0700 Subject: [PATCH] Patch TaskExtensions bug in /Azure.Messaging.EventHubs.Processor --- eng/Packages.Data.props | 2 +- .../Azure.Core/src/Shared/TaskExtensions.cs | 36 +--- .../Azure.Core/tests/TaskExtensionsTest.cs | 181 ------------------ .../CHANGELOG.md | 5 + ...Azure.Messaging.EventHubs.Processor.csproj | 4 +- 5 files changed, 15 insertions(+), 213 deletions(-) diff --git a/eng/Packages.Data.props b/eng/Packages.Data.props index 6e4e3497bccd..e5b77208bf90 100644 --- a/eng/Packages.Data.props +++ b/eng/Packages.Data.props @@ -11,7 +11,7 @@ - + diff --git a/sdk/core/Azure.Core/src/Shared/TaskExtensions.cs b/sdk/core/Azure.Core/src/Shared/TaskExtensions.cs index 61b215054ec0..a22738192f8b 100644 --- a/sdk/core/Azure.Core/src/Shared/TaskExtensions.cs +++ b/sdk/core/Azure.Core/src/Shared/TaskExtensions.cs @@ -25,11 +25,6 @@ public static T EnsureCompleted(this Task task) { #if DEBUG VerifyTaskCompleted(task.IsCompleted); -#else - if (HasSynchronizationContext()) - { - throw new InvalidOperationException("Synchronously waiting on non-completed task isn't allowed."); - } #endif #pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead. return task.GetAwaiter().GetResult(); @@ -40,11 +35,6 @@ public static void EnsureCompleted(this Task task) { #if DEBUG VerifyTaskCompleted(task.IsCompleted); -#else - if (HasSynchronizationContext()) - { - throw new InvalidOperationException("Synchronously waiting on non-completed task isn't allowed."); - } #endif #pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead. task.GetAwaiter().GetResult(); @@ -53,12 +43,9 @@ public static void EnsureCompleted(this Task task) public static T EnsureCompleted(this ValueTask task) { - if (!task.IsCompleted) - { -#pragma warning disable AZC0107 // public asynchronous method shouldn't be called in synchronous scope. Use synchronous version of the method if it is available. - return EnsureCompleted(task.AsTask()); -#pragma warning restore AZC0107 // public asynchronous method shouldn't be called in synchronous scope. Use synchronous version of the method if it is available. - } +#if DEBUG + VerifyTaskCompleted(task.IsCompleted); +#endif #pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead. return task.GetAwaiter().GetResult(); #pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead. @@ -66,18 +53,12 @@ public static T EnsureCompleted(this ValueTask task) public static void EnsureCompleted(this ValueTask task) { - if (!task.IsCompleted) - { -#pragma warning disable AZC0107 // public asynchronous method shouldn't be called in synchronous scope. Use synchronous version of the method if it is available. - EnsureCompleted(task.AsTask()); -#pragma warning restore AZC0107 // public asynchronous method shouldn't be called in synchronous scope. Use synchronous version of the method if it is available. - } - else - { +#if DEBUG + VerifyTaskCompleted(task.IsCompleted); +#endif #pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead. - task.GetAwaiter().GetResult(); + task.GetAwaiter().GetResult(); #pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead. - } } public static Enumerable EnsureSyncEnumerable(this IAsyncEnumerable asyncEnumerable) => new Enumerable(asyncEnumerable); @@ -120,9 +101,6 @@ private static void VerifyTaskCompleted(bool isCompleted) } } - private static bool HasSynchronizationContext() - => SynchronizationContext.Current != null && SynchronizationContext.Current.GetType() != typeof(SynchronizationContext) || TaskScheduler.Current != TaskScheduler.Default; - /// /// Both and are defined as public structs so that foreach can use duck typing /// to call and avoid heap memory allocation. diff --git a/sdk/core/Azure.Core/tests/TaskExtensionsTest.cs b/sdk/core/Azure.Core/tests/TaskExtensionsTest.cs index 9fe2dd399949..4ce2839ef361 100644 --- a/sdk/core/Azure.Core/tests/TaskExtensionsTest.cs +++ b/sdk/core/Azure.Core/tests/TaskExtensionsTest.cs @@ -4,7 +4,6 @@ using Azure.Core.Pipeline; using NUnit.Framework; using System; -using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; @@ -12,136 +11,6 @@ namespace Azure.Core.Tests { public class TaskExtensionsTest { - [Test] - public void TaskExtensions_TaskEnsureCompleted() - { - var task = Task.CompletedTask; - task.EnsureCompleted(); - } - - [Test] - public void TaskExtensions_TaskOfTEnsureCompleted() - { - var task = Task.FromResult(42); - Assert.AreEqual(42, task.EnsureCompleted()); - } - - [Test] - public void TaskExtensions_ValueTaskEnsureCompleted() - { - var task = new ValueTask(); - task.EnsureCompleted(); - } - - [Test] - public void TaskExtensions_ValueTaskOfTEnsureCompleted() - { - var task = new ValueTask(42); - Assert.AreEqual(42, task.EnsureCompleted()); - } - - [Test] - public async Task TaskExtensions_TaskEnsureCompleted_NotCompletedNoSyncContext() - { - var tcs = new TaskCompletionSource(); - Task task = tcs.Task; -#if DEBUG - Assert.Catch(() => task.EnsureCompleted()); - await Task.CompletedTask; -#else - Task runningTask = Task.Run(() => task.EnsureCompleted()); - Assert.IsFalse(runningTask.IsCompleted); - tcs.SetResult(0); - await runningTask; -#endif - } - - [Test] - public async Task TaskExtensions_TaskOfTEnsureCompleted_NotCompletedNoSyncContext() - { - var tcs = new TaskCompletionSource(); -#if DEBUG - Assert.Catch(() => tcs.Task.EnsureCompleted()); - await Task.CompletedTask; -#else - Task runningTask = Task.Run(() => tcs.Task.EnsureCompleted()); - Assert.IsFalse(runningTask.IsCompleted); - tcs.SetResult(42); - Assert.AreEqual(42, await runningTask); -#endif - } - - [Test] - public async Task TaskExtensions_ValueTaskEnsureCompleted_NotCompletedNoSyncContext() - { - var tcs = new TaskCompletionSource(); - ValueTask task = new ValueTask(tcs.Task); -#if DEBUG - Assert.Catch(() => task.EnsureCompleted()); - await Task.CompletedTask; -#else - Task runningTask = Task.Run(() => task.EnsureCompleted()); - Assert.IsFalse(runningTask.IsCompleted); - tcs.SetResult(0); - await runningTask; -#endif - } - - [Test] - public async Task TaskExtensions_ValueTaskOfTEnsureCompleted_NotCompletedNoSyncContext() - { - var tcs = new TaskCompletionSource(); - ValueTask task = new ValueTask(tcs.Task); -#if DEBUG - Assert.Catch(() => task.EnsureCompleted()); - await Task.CompletedTask; -#else - Task runningTask = Task.Run(() => task.EnsureCompleted()); - Assert.IsFalse(runningTask.IsCompleted); - tcs.SetResult(42); - Assert.AreEqual(42, await runningTask); -#endif - } - - [Test] - public void TaskExtensions_TaskEnsureCompleted_NotCompletedInSyncContext() - { - using SingleThreadedSynchronizationContext syncContext = new SingleThreadedSynchronizationContext(); - var tcs = new TaskCompletionSource(); - Task task = tcs.Task; - - syncContext.Post(t => { Assert.Catch(() => task.EnsureCompleted()); }, null); - } - - [Test] - public void TaskExtensions_TaskOfTEnsureCompleted_NotCompletedInSyncContext() - { - using SingleThreadedSynchronizationContext syncContext = new SingleThreadedSynchronizationContext(); - var tcs = new TaskCompletionSource(); - - syncContext.Post(t => { Assert.Catch(() => tcs.Task.EnsureCompleted()); }, null); - } - - [Test] - public void TaskExtensions_ValueTaskEnsureCompleted_NotCompletedInSyncContext() - { - using SingleThreadedSynchronizationContext syncContext = new SingleThreadedSynchronizationContext(); - var tcs = new TaskCompletionSource(); - ValueTask task = new ValueTask(tcs.Task); - - syncContext.Post(t => { Assert.Catch(() => task.EnsureCompleted()); }, null); - } - - [Test] - public void TaskExtensions_ValueTaskOfTEnsureCompleted_NotCompletedInSyncContext() - { - using SingleThreadedSynchronizationContext syncContext = new SingleThreadedSynchronizationContext(); - var tcs = new TaskCompletionSource(); - var task = new ValueTask(tcs.Task); - - syncContext.Post(t => { Assert.Catch(() => task.EnsureCompleted()); }, null); - } - [Test] public void TaskExtensions_TaskWithCancellationDefault() { @@ -323,55 +192,5 @@ public void TaskExtensions_ValueTaskWithCancellationFailedAfterContinuationSched Assert.AreEqual(true, awaiter.IsCompleted); Assert.Catch(() => awaiter.GetResult(), "Error"); } - - private sealed class SingleThreadedSynchronizationContext : SynchronizationContext, IDisposable - { - private readonly Task _task; - private readonly BlockingCollection _queue; - private readonly ConcurrentQueue _exceptions; - - public SingleThreadedSynchronizationContext() - { - _queue = new BlockingCollection(); - _exceptions = new ConcurrentQueue(); - _task = Task.Run(RunLoop); - } - - private void RunLoop() - { - try - { - SetSynchronizationContext(this); - while (!_queue.IsCompleted) - { - Action action = _queue.Take(); - try - { - action(); - } - catch (Exception e) - { - _exceptions.Enqueue(e); - } - } - } - catch (InvalidOperationException) { } - catch (OperationCanceledException) { } - finally - { - SetSynchronizationContext(null); - } - } - - public override void Post(SendOrPostCallback d, object state) => _queue.Add(() => d(state)); - - public void Dispose() - { - _queue.CompleteAdding(); - _task.Wait(); - } - - public AggregateException Exceptions => new AggregateException(_exceptions); - } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md index f8dfcac52d4a..bf5988344691 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md @@ -1,5 +1,10 @@ # Release History +## 5.2.0-preview.3 (2020-08-18) + +### Fixed +- Bug in TaskExtensions.EnsureCompleted method that causes it to unconditionally throw an exception in the environments with synchronization context + ## 5.2.0-preview.2 (2020-08-10) ### Acknowledgments diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj index e02ec4f44043..c79e1749b6bd 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj @@ -1,7 +1,7 @@ Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This library extends its Event Processor with durable storage for checkpoint information using Azure Blob storage. For more information about Event Hubs, see https://azure.microsoft.com/en-us/services/event-hubs/ - 5.2.0-preview.2 + 5.2.0-preview.3 5.1.0 Azure;Event Hubs;EventHubs;.NET;Event Processor;EventProcessor;$(PackageCommonTags) $(RequiredTargetFrameworks) @@ -12,7 +12,7 @@ - +