Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 53 additions & 66 deletions src/HotChocolate/Core/src/Types/Fetching/AsyncAutoResetEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,90 +5,69 @@ namespace HotChocolate.Fetching;

internal sealed class AsyncAutoResetEvent : INotifyCompletion
{
private const int Idle = 0;
private const int Signaled = 1;
private const int Waiting = 2;

private int _state;
#if NET9_0_OR_GREATER
private readonly Lock _sync = new();
#else
private readonly object _sync = new();
#endif
private Action? _continuation;
private bool _isSignaled;

public bool IsSignaled => Volatile.Read(ref _state) == Signaled;
public bool IsSignaled => Volatile.Read(ref _isSignaled);

public bool IsCompleted => Volatile.Read(ref _state) == Signaled;
public bool IsCompleted => false;

public void GetResult()
{
// Consume the signal when completing synchronously (via IsCompleted == true).
// CAS failure is benign (e.g. TryResetToIdle cleared it first).
Interlocked.CompareExchange(ref _state, Idle, Signaled);
}
public void GetResult() { }

public void OnCompleted(Action continuation)
{
Debug.Assert(_continuation is null, "There should only be one awaiter.");
_continuation = continuation;
bool wasSignaled;

while (true)
lock (_sync)
{
switch (Volatile.Read(ref _state))
{
case Idle:
// Register waiter: IDLE -> WAITING
if (Interlocked.CompareExchange(ref _state, Waiting, Idle) == Idle)
{
return;
}
break; // CAS failed, retry

case Signaled:
// Consume signal immediately: SIGNALED -> IDLE
if (Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled)
{
_continuation = null;
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
return;
}
break; // CAS failed, retry
wasSignaled = _isSignaled;

default:
Debug.Fail("OnCompleted called while already waiting.");
return;
if (wasSignaled)
{
// consume the signal
_isSignaled = false;
}
else
{
Debug.Assert(_continuation is null, "There should only be one awaiter.");
_continuation = continuation;
}
}

if (wasSignaled)
{
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
}
}

public void Set()
{
while (true)
Action? continuation = null;

lock (_sync)
{
switch (Volatile.Read(ref _state))
if (_continuation is not null)
{
case Idle:
// Store signal: IDLE -> SIGNALED
if (Interlocked.CompareExchange(ref _state, Signaled, Idle) == Idle)
{
return;
}
break; // CAS failed, retry

case Waiting:
// Wake waiter: WAITING -> IDLE
if (Interlocked.CompareExchange(ref _state, Idle, Waiting) == Waiting)
{
var c = _continuation!;
_continuation = null;
ThreadPool.QueueUserWorkItem(static c => c(), c, preferLocal: true);
return;
}
break; // CAS failed, retry

case Signaled:
// Already signaled, nothing to do
return;

default:
return;
// someone is waiting - release them immediately
// we don't set _isSignaled since we're consuming it immediately
continuation = _continuation;
_continuation = null;
}
else
{
// since no one waiting we are storing the signal for the next awaiter
_isSignaled = true;
}
}

if (continuation is not null)
{
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
}
}

Expand All @@ -98,7 +77,15 @@ public void Set()
/// </summary>
public bool TryResetToIdle()
{
return Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled;
lock (_sync)
{
if (_continuation is null && _isSignaled)
{
_isSignaled = false;
return true;
}
return false;
}
}

public AsyncAutoResetEvent GetAwaiter() => this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,90 +5,69 @@ namespace HotChocolate.Fusion.Execution;

internal sealed class AsyncAutoResetEvent : INotifyCompletion
{
private const int Idle = 0;
private const int Signaled = 1;
private const int Waiting = 2;

private int _state;
#if NET9_0_OR_GREATER
private readonly Lock _sync = new();
#else
private readonly object _sync = new();
#endif
private Action? _continuation;
private bool _isSignaled;

public bool IsSignaled => Volatile.Read(ref _state) == Signaled;
public bool IsSignaled => Volatile.Read(ref _isSignaled);

public bool IsCompleted => Volatile.Read(ref _state) == Signaled;
public bool IsCompleted => false;

public void GetResult()
{
// Consume the signal when completing synchronously (via IsCompleted == true).
// CAS failure is benign (e.g. TryResetToIdle cleared it first).
Interlocked.CompareExchange(ref _state, Idle, Signaled);
}
public void GetResult() { }

public void OnCompleted(Action continuation)
{
Debug.Assert(_continuation is null, "There should only be one awaiter.");
_continuation = continuation;
bool wasSignaled;

while (true)
lock (_sync)
{
switch (Volatile.Read(ref _state))
{
case Idle:
// Register waiter: IDLE -> WAITING
if (Interlocked.CompareExchange(ref _state, Waiting, Idle) == Idle)
{
return;
}
break; // CAS failed, retry

case Signaled:
// Consume signal immediately: SIGNALED -> IDLE
if (Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled)
{
_continuation = null;
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
return;
}
break; // CAS failed, retry
wasSignaled = _isSignaled;

default:
Debug.Fail("OnCompleted called while already waiting.");
return;
if (wasSignaled)
{
// consume the signal
_isSignaled = false;
}
else
{
Debug.Assert(_continuation is null, "There should only be one awaiter.");
_continuation = continuation;
}
}

if (wasSignaled)
{
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
}
}

public void Set()
{
while (true)
Action? continuation = null;

lock (_sync)
{
switch (Volatile.Read(ref _state))
if (_continuation is not null)
{
case Idle:
// Store signal: IDLE -> SIGNALED
if (Interlocked.CompareExchange(ref _state, Signaled, Idle) == Idle)
{
return;
}
break; // CAS failed, retry

case Waiting:
// Wake waiter: WAITING -> IDLE
if (Interlocked.CompareExchange(ref _state, Idle, Waiting) == Waiting)
{
var c = _continuation!;
_continuation = null;
ThreadPool.QueueUserWorkItem(static c => c(), c, preferLocal: true);
return;
}
break; // CAS failed, retry

case Signaled:
// Already signaled, nothing to do
return;

default:
return;
// someone is waiting - release them immediately
// we don't set _isSignaled since we're consuming it immediately
continuation = _continuation;
_continuation = null;
}
else
{
// since no one waiting we are storing the signal for the next awaiter
_isSignaled = true;
}
}

if (continuation is not null)
{
ThreadPool.QueueUserWorkItem(static c => c(), continuation, preferLocal: true);
}
}

Expand All @@ -98,7 +77,15 @@ public void Set()
/// </summary>
public bool TryResetToIdle()
{
return Interlocked.CompareExchange(ref _state, Idle, Signaled) == Signaled;
lock (_sync)
{
if (_continuation is null && _isSignaled)
{
_isSignaled = false;
return true;
}
return false;
}
}

public AsyncAutoResetEvent GetAwaiter() => this;
Expand Down
Loading