diff --git a/src/HotChocolate/Core/src/Types/Fetching/AsyncAutoResetEvent.cs b/src/HotChocolate/Core/src/Types/Fetching/AsyncAutoResetEvent.cs index e83623d3ed4..2e2d57f8b2e 100644 --- a/src/HotChocolate/Core/src/Types/Fetching/AsyncAutoResetEvent.cs +++ b/src/HotChocolate/Core/src/Types/Fetching/AsyncAutoResetEvent.cs @@ -5,90 +5,69 @@ namespace HotChocolate.Fetching; internal sealed class AsyncAutoResetEvent : INotifyCompletion { - private const int Idle = 0; - private const int Signaled = 1; - private const int Waiting = 2; - - private int _state; +#if NET9_0_OR_GREATER + private readonly Lock _sync = new(); +#else + private readonly object _sync = new(); +#endif private Action? _continuation; + private bool _isSignaled; - public bool IsSignaled => Volatile.Read(ref _state) == Signaled; + public bool IsSignaled => Volatile.Read(ref _isSignaled); - public bool IsCompleted => Volatile.Read(ref _state) == Signaled; + public bool IsCompleted => false; - public void GetResult() - { - // Consume the signal when completing synchronously (via IsCompleted == true). - // CAS failure is benign (e.g. TryResetToIdle cleared it first). - Interlocked.CompareExchange(ref _state, Idle, Signaled); - } + public void GetResult() { } public void OnCompleted(Action continuation) { - Debug.Assert(_continuation is null, "There should only be one awaiter."); - _continuation = continuation; + bool wasSignaled; - while (true) + lock (_sync) { - switch (Volatile.Read(ref _state)) - { - case Idle: - // Register waiter: IDLE -> WAITING - if (Interlocked.CompareExchange(ref _state, Waiting, Idle) == Idle) - { - return; - } - break; // CAS failed, retry - - case Signaled: - // Consume signal immediately: SIGNALED -> IDLE - if (Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled) - { - _continuation = null; - ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true); - return; - } - break; // CAS failed, retry + wasSignaled = _isSignaled; - default: - Debug.Fail("OnCompleted called while already waiting."); - return; + if (wasSignaled) + { + // consume the signal + _isSignaled = false; } + else + { + Debug.Assert(_continuation is null, "There should only be one awaiter."); + _continuation = continuation; + } + } + + if (wasSignaled) + { + ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true); } } public void Set() { - while (true) + Action? continuation = null; + + lock (_sync) { - switch (Volatile.Read(ref _state)) + if (_continuation is not null) { - case Idle: - // Store signal: IDLE -> SIGNALED - if (Interlocked.CompareExchange(ref _state, Signaled, Idle) == Idle) - { - return; - } - break; // CAS failed, retry - - case Waiting: - // Wake waiter: WAITING -> IDLE - if (Interlocked.CompareExchange(ref _state, Idle, Waiting) == Waiting) - { - var c = _continuation!; - _continuation = null; - ThreadPool.QueueUserWorkItem(static c => c(), c, preferLocal: true); - return; - } - break; // CAS failed, retry - - case Signaled: - // Already signaled, nothing to do - return; - - default: - return; + // someone is waiting - release them immediately + // we don't set _isSignaled since we're consuming it immediately + continuation = _continuation; + _continuation = null; } + else + { + // since no one waiting we are storing the signal for the next awaiter + _isSignaled = true; + } + } + + if (continuation is not null) + { + ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true); } } @@ -98,7 +77,15 @@ public void Set() /// public bool TryResetToIdle() { - return Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled; + lock (_sync) + { + if (_continuation is null && _isSignaled) + { + _isSignaled = false; + return true; + } + return false; + } } public AsyncAutoResetEvent GetAwaiter() => this; diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/AsyncAutoResetEvent.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/AsyncAutoResetEvent.cs index 30c34ba8d23..7e8c7e9b07e 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/AsyncAutoResetEvent.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/AsyncAutoResetEvent.cs @@ -5,90 +5,69 @@ namespace HotChocolate.Fusion.Execution; internal sealed class AsyncAutoResetEvent : INotifyCompletion { - private const int Idle = 0; - private const int Signaled = 1; - private const int Waiting = 2; - - private int _state; +#if NET9_0_OR_GREATER + private readonly Lock _sync = new(); +#else + private readonly object _sync = new(); +#endif private Action? _continuation; + private bool _isSignaled; - public bool IsSignaled => Volatile.Read(ref _state) == Signaled; + public bool IsSignaled => Volatile.Read(ref _isSignaled); - public bool IsCompleted => Volatile.Read(ref _state) == Signaled; + public bool IsCompleted => false; - public void GetResult() - { - // Consume the signal when completing synchronously (via IsCompleted == true). - // CAS failure is benign (e.g. TryResetToIdle cleared it first). - Interlocked.CompareExchange(ref _state, Idle, Signaled); - } + public void GetResult() { } public void OnCompleted(Action continuation) { - Debug.Assert(_continuation is null, "There should only be one awaiter."); - _continuation = continuation; + bool wasSignaled; - while (true) + lock (_sync) { - switch (Volatile.Read(ref _state)) - { - case Idle: - // Register waiter: IDLE -> WAITING - if (Interlocked.CompareExchange(ref _state, Waiting, Idle) == Idle) - { - return; - } - break; // CAS failed, retry - - case Signaled: - // Consume signal immediately: SIGNALED -> IDLE - if (Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled) - { - _continuation = null; - ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true); - return; - } - break; // CAS failed, retry + wasSignaled = _isSignaled; - default: - Debug.Fail("OnCompleted called while already waiting."); - return; + if (wasSignaled) + { + // consume the signal + _isSignaled = false; } + else + { + Debug.Assert(_continuation is null, "There should only be one awaiter."); + _continuation = continuation; + } + } + + if (wasSignaled) + { + ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true); } } public void Set() { - while (true) + Action? continuation = null; + + lock (_sync) { - switch (Volatile.Read(ref _state)) + if (_continuation is not null) { - case Idle: - // Store signal: IDLE -> SIGNALED - if (Interlocked.CompareExchange(ref _state, Signaled, Idle) == Idle) - { - return; - } - break; // CAS failed, retry - - case Waiting: - // Wake waiter: WAITING -> IDLE - if (Interlocked.CompareExchange(ref _state, Idle, Waiting) == Waiting) - { - var c = _continuation!; - _continuation = null; - ThreadPool.QueueUserWorkItem(static c => c(), c, preferLocal: true); - return; - } - break; // CAS failed, retry - - case Signaled: - // Already signaled, nothing to do - return; - - default: - return; + // someone is waiting - release them immediately + // we don't set _isSignaled since we're consuming it immediately + continuation = _continuation; + _continuation = null; } + else + { + // since no one waiting we are storing the signal for the next awaiter + _isSignaled = true; + } + } + + if (continuation is not null) + { + ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true); } } @@ -98,7 +77,15 @@ public void Set() /// public bool TryResetToIdle() { - return Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled; + lock (_sync) + { + if (_continuation is null && _isSignaled) + { + _isSignaled = false; + return true; + } + return false; + } } public AsyncAutoResetEvent GetAwaiter() => this;