@@ -230,7 +230,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo
HillClimbing.ThreadPoolHillClimber.ForceChange(
newNumThreadsGoal,
HillClimbing.StateOrTransition.CooperativeBlocking);
if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0)

if (counts.NumProcessingWork >= numThreadsGoal && _separated._hasOutstandingThreadRequest != 0)
{
addWorker = true;
}
@@ -132,7 +132,7 @@ private static void GateThreadStart()

if (!disableStarvationDetection &&
threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None &&
threadPoolInstance._separated.numRequestedWorkers > 0 &&
threadPoolInstance._separated._hasOutstandingThreadRequest != 0 &&
SufficientDelaySinceLastDequeue(threadPoolInstance))
{
bool addWorker = false;
@@ -187,7 +187,7 @@ private static void GateThreadStart()
}
}

if (threadPoolInstance._separated.numRequestedWorkers <= 0 &&
if (threadPoolInstance._separated._hasOutstandingThreadRequest == 0 &&
threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None &&
Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) <= GetRunningStateForNumRuns(0))
{
@@ -208,7 +208,7 @@ public static void Wake(PortableThreadPool threadPoolInstance)
// in deciding "too long"
private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoolInstance)
{
uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDequeueTime);
uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDispatchTime);
uint minimumDelay;
if (threadPoolInstance._cpuUtilization < CpuUtilizationLow)
{
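Taken together, the gate-thread hunks above swap the per-dequeue timestamp (lastDequeueTime) for a dispatch heartbeat (lastDispatchTime) and gate starvation detection on the new request flag. Below is a minimal, self-contained sketch of that check; the type and member names (StarvationMonitorSketch, DetectStarvation, minimumDelayMs) are illustrative and do not come from the PR.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified model of the gate thread's starvation check.
// "_lastDispatchTime" stands in for _separated.lastDispatchTime; none of
// these names are claimed to match the runtime's internals.
internal sealed class StarvationMonitorSketch
{
    private int _lastDispatchTime = Environment.TickCount; // heartbeat written by workers
    private int _hasOutstandingThreadRequest;               // 0 or 1, as in the PR

    // Called by a worker each time it takes a thread request and starts dispatching.
    public void NotifyDispatchProgress(int currentTickCount) =>
        Volatile.Write(ref _lastDispatchTime, currentTickCount);

    // Periodically called by the gate thread. Returns true when a worker should be
    // force-added because work is pending but no dispatch progress has been observed.
    public bool DetectStarvation(uint minimumDelayMs)
    {
        if (Volatile.Read(ref _hasOutstandingThreadRequest) == 0)
        {
            return false; // nothing is waiting for a worker
        }

        // TickCount wraps, but unsigned subtraction still yields the elapsed delta.
        uint delay = (uint)(Environment.TickCount - Volatile.Read(ref _lastDispatchTime));
        return delay > minimumDelayMs;
    }
}
```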
@@ -44,6 +44,17 @@ public short NumProcessingWork
}
}

/// <summary>
/// Reduces the number of threads processing work items by one.
/// </summary>
public void DecrementProcessingWork()
{
// This should never underflow
Debug.Assert(NumProcessingWork > 0);
Interlocked.Decrement(ref _data);
Debug.Assert(NumProcessingWork >= 0);
}

/// <summary>
/// Number of thread pool threads that currently exist.
/// </summary>
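The new ThreadCounts.DecrementProcessingWork can use a single interlocked decrement instead of the old compare-exchange loop because the counts are packed into one atomic field. The sketch below illustrates the arithmetic under an assumption made purely for illustration (not a claim about the real ThreadCounts layout): if the processing-work count occupies the least significant 16 bits of a 64-bit value and is non-zero, subtracting one cannot borrow into the higher fields.

```csharp
using System.Diagnostics;
using System.Threading;

// Illustrative packed-counts struct; the field layout is assumed, not taken from the PR.
internal struct PackedCountsSketch
{
    private long _data;

    // Processing-work count assumed to live in the low 16 bits of _data.
    public short NumProcessingWork => (short)(Volatile.Read(ref _data) & 0xFFFF);

    public void IncrementProcessingWork() => Interlocked.Increment(ref _data);

    public void DecrementProcessingWork()
    {
        // Underflow of the low field would borrow into the other packed counts,
        // hence the assertion mirrors the one added in the diff above.
        Debug.Assert(NumProcessingWork > 0);
        Interlocked.Decrement(ref _data);
        Debug.Assert(NumProcessingWork >= 0);
    }
}
```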
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.Runtime.CompilerServices;

@@ -118,20 +119,26 @@ private static void WorkerThreadStart()
WorkerDoWork(threadPoolInstance, ref spinWait);
}

// We've timed out waiting on the semaphore. Time to exit.
// In rare cases we may be asked to keep running/waiting.
if (ShouldExitWorker(threadPoolInstance, threadAdjustmentLock))
{
break;
}
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait)
{
bool alreadyRemovedWorkingWorker = false;
while (TakeActiveRequest(threadPoolInstance))

// A pending request can be "stolen" by threads that are done dispatching,
// so it is not guaranteed at this point that a request is still outstanding.
while (threadPoolInstance._separated._hasOutstandingThreadRequest != 0 &&
Interlocked.Exchange(ref threadPoolInstance._separated._hasOutstandingThreadRequest, 0) != 0)
{
threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
// We took the request; now we must dispatch some work items.
threadPoolInstance.NotifyDispatchProgress(Environment.TickCount);
if (!ThreadPoolWorkQueue.Dispatch())
{
// ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
@@ -141,7 +148,7 @@ private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool
break;
}

if (threadPoolInstance._separated.numRequestedWorkers <= 0)
if (threadPoolInstance._separated._hasOutstandingThreadRequest == 0)
{
break;
}
@@ -175,7 +182,6 @@ private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool

// returns true if the worker is shutting down
// returns false if we should do another iteration
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool ShouldExitWorker(PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock)
{
// The thread cannot exit if it has IO pending, otherwise the IO may be canceled
@@ -239,29 +245,17 @@ private static bool ShouldExitWorker(PortableThreadPool threadPoolInstance, LowL
/// </summary>
private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance)
{
// A compare-exchange loop is used instead of Interlocked.Decrement or Interlocked.Add to defensively prevent
// NumProcessingWork from underflowing. See the setter for NumProcessingWork.
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
ThreadCounts newCounts = counts;
newCounts.NumProcessingWork--;
threadPoolInstance._separated.counts.DecrementProcessingWork();

ThreadCounts countsBeforeUpdate =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
break;
}
// It's possible that a thread request was not actionable because we were already at the
// maximum number of active threads. Now that this thread has stopped processing, we may
// be able to add a thread, and if we can, we must. Otherwise, if all workers happen to
// quit at once, there will be no one left running while there is still an outstanding request.

counts = countsBeforeUpdate;
}
// NOTE: The request check must happen after the active count was updated,
// which the interlocked decrement guarantees.

// It's possible that we decided we had thread requests just before a request came in,
// but reduced the worker count *after* the request came in. In this case, we might
// miss the notification of a thread request. So we wake up a thread (maybe this one!)
// if there is work to do.
if (threadPoolInstance._separated.numRequestedWorkers > 0)
if (threadPoolInstance._separated._hasOutstandingThreadRequest != 0)
{
MaybeAddWorkingWorker(threadPoolInstance);
}
@@ -297,18 +291,14 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance
counts = oldCounts;
}

int toCreate = newNumExistingThreads - numExistingThreads;
int toRelease = newNumProcessingWork - numProcessingWork;
Debug.Assert(newNumProcessingWork - numProcessingWork == 1);
s_semaphore.Release(1);

if (toRelease > 0)
{
s_semaphore.Release(toRelease);
}

while (toCreate > 0)
int toCreate = newNumExistingThreads - numExistingThreads;
Debug.Assert(toCreate == 0 || toCreate == 1);
if (toCreate != 0)
{
CreateWorkerThread();
toCreate--;
}
}

@@ -347,21 +337,6 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn
counts = oldCounts;
}
}

private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance)
{
int count = threadPoolInstance._separated.numRequestedWorkers;
while (count > 0)
{
int prevCount = Interlocked.CompareExchange(ref threadPoolInstance._separated.numRequestedWorkers, count - 1, count);
if (prevCount == count)
{
return true;
}
count = prevCount;
}
return false;
}
}
}
}
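The WorkerThread hunks above replace the counted numRequestedWorkers (and TakeActiveRequest's decrement loop) with a single claim-the-flag exchange, and RemoveWorkingWorker now re-checks the flag after the interlocked decrement of the processing count. A condensed, hypothetical model of that handshake is sketched below; DispatchPendingWork, the plain int counter, and the class name are placeholders, and only the flag handling mirrors the diff.

```csharp
using System.Threading;

internal static class WorkerProtocolSketch
{
    private static int s_hasOutstandingThreadRequest;
    private static int s_numProcessingWork;

    public static void WorkerLoopIteration()
    {
        // Claim the single outstanding request, if any. The cheap read avoids an
        // interlocked operation when the flag is already clear.
        while (Volatile.Read(ref s_hasOutstandingThreadRequest) != 0 &&
               Interlocked.Exchange(ref s_hasOutstandingThreadRequest, 0) != 0)
        {
            // The flag is cleared *before* the queues are scanned, so any item
            // enqueued after this point either is seen here or raises the flag again.
            DispatchPendingWork();
        }

        // This worker is going idle. Decrement first, then re-check the flag:
        // a request that raced with the decrement must not be lost, or no worker
        // would be left running while work is pending.
        Interlocked.Decrement(ref s_numProcessingWork);
        if (Volatile.Read(ref s_hasOutstandingThreadRequest) != 0)
        {
            MaybeAddWorkingWorker();
        }
    }

    private static void DispatchPendingWork() { /* placeholder for queue processing */ }
    private static void MaybeAddWorkingWorker() { /* placeholder for releasing/creating a worker */ }
}
```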
@@ -87,8 +87,10 @@ private struct CacheLineSeparated
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)]
public ThreadCounts counts; // SOS's ThreadPool command depends on this name

// Periodically updated heartbeat timestamp to indicate that we are making progress.
// Used in starvation detection.
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)]
public int lastDequeueTime;
public int lastDispatchTime;

[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3)]
public int priorCompletionCount;
@@ -97,8 +99,20 @@ private struct CacheLineSeparated
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int) * 2)]
public int nextCompletedWorkRequestsTime;

// This flag is used for communication between work item enqueuing and the workers that process the items.
// The flag has two states:
// 0: makes no guarantees.
// 1: a worker will check the work queues and ensure that any work items
//    enqueued before the flag was set are picked up.
// Note: The flag must be cleared by the worker thread _before_ it checks the
//       queues. Otherwise there would be a window between finding no work and
//       resetting the flag during which the flag is in the wrong state: a new
//       work item could be enqueued just before the flag is reset, without
//       requesting a worker, while the last worker is quitting.
[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4)]
public volatile int numRequestedWorkers;
public int _hasOutstandingThreadRequest;

[FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4 + sizeof(int))]
public int gateThreadRunningState;
}
@@ -209,7 +223,7 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads)
else if (_separated.counts.NumThreadsGoal < newMinThreads)
{
_separated.counts.InterlockedSetNumThreadsGoal(newMinThreads);
if (_separated.numRequestedWorkers > 0)
if (_separated._hasOutstandingThreadRequest != 0)
{
addWorker = true;
}
@@ -330,26 +344,30 @@ private ThreadInt64PersistentCounter.ThreadLocalNode CreateThreadLocalCompletion
return threadLocalCompletionCountNode;
}

private void NotifyWorkItemProgress(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs)
private static void NotifyWorkItemProgress(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode)
{
threadLocalCompletionCountNode.Increment();
_separated.lastDequeueTime = currentTimeMs;
}

internal void NotifyWorkItemProgress()
{
NotifyWorkItemProgress(GetOrCreateThreadLocalCompletionCountNode());
}

internal bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs)
{
NotifyWorkItemProgress(threadLocalCompletionCountNode);
if (ShouldAdjustMaxWorkersActive(currentTimeMs))
{
AdjustMaxWorkersActive();
}
}

internal void NotifyWorkItemProgress() =>
NotifyWorkItemProgress(GetOrCreateThreadLocalCompletionCountNode(), Environment.TickCount);
return !WorkerThread.ShouldStopProcessingWorkNow(this);
}

internal bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode? threadLocalCompletionCountNode, int currentTimeMs)
internal void NotifyDispatchProgress(int currentTickCount)
{
Debug.Assert(threadLocalCompletionCountNode != null);

NotifyWorkItemProgress(threadLocalCompletionCountNode, currentTimeMs);
return !WorkerThread.ShouldStopProcessingWorkNow(this);
_separated.lastDispatchTime = currentTickCount;
}

//
@@ -459,13 +477,15 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs)
return _pendingBlockingAdjustment == PendingBlockingAdjustment.None;
}

internal void RequestWorker()
internal void EnsureWorkerRequested()
{
// The order of operations here is important. MaybeAddWorkingWorker() and EnsureRunning() use speculative checks to
// do their work and the memory barrier from the interlocked operation is necessary in this case for correctness.
Interlocked.Increment(ref _separated.numRequestedWorkers);
WorkerThread.MaybeAddWorkingWorker(this);
GateThread.EnsureRunning(this);
// Only one worker is requested at a time to mitigate the thundering-herd problem.
if (_separated._hasOutstandingThreadRequest == 0 &&
Interlocked.Exchange(ref _separated._hasOutstandingThreadRequest, 1) == 0)
{
WorkerThread.MaybeAddWorkingWorker(this);
GateThread.EnsureRunning(this);
}
}

private bool OnGen2GCCallback()
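On the producer side, the long comment on _hasOutstandingThreadRequest above pairs with EnsureWorkerRequested: the flag is raised at most once per quiet period, so a burst of enqueues wakes at most one worker. Below is a hedged sketch of that enqueue-side pattern; the names EnqueueSketch, PushToQueue, and WakeOneWorker are illustrative and not part of the PR, and only the check-then-exchange mirrors EnsureWorkerRequested.

```csharp
using System.Threading;

internal static class EnqueueSketch
{
    private static int s_hasOutstandingThreadRequest;

    public static void Enqueue(object workItem)
    {
        // The item must be visible in the queue before the flag can be observed by a worker.
        PushToQueue(workItem);

        // Transition 0 -> 1 at most once; concurrent enqueuers that lose the race do
        // nothing, which keeps a burst of enqueues from waking a herd of threads.
        if (Volatile.Read(ref s_hasOutstandingThreadRequest) == 0 &&
            Interlocked.Exchange(ref s_hasOutstandingThreadRequest, 1) == 0)
        {
            WakeOneWorker();
        }
    }

    private static void PushToQueue(object workItem) { /* placeholder for the work queue */ }
    private static void WakeOneWorker() { /* placeholder for MaybeAddWorkingWorker + gate thread wake-up */ }
}
```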
@@ -8,6 +8,7 @@ public static partial class ThreadPool
// Indicates that the threadpool should yield the thread from the dispatch loop to the
// runtime periodically. We use this to return back to the JS event loop so that the JS
// event queue can be drained
internal static bool YieldFromDispatchLoop => true;
#pragma warning disable IDE0060 // Remove unused parameter
internal static bool YieldFromDispatchLoop(int currentTickCount) => true;
#pragma warning restore IDE0060
}
}
}
@@ -35,7 +35,9 @@ public static partial class ThreadPool
{
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
// the runtime may use the thread for processing other work
internal static bool YieldFromDispatchLoop => true;
#pragma warning disable IDE0060 // Remove unused parameter
internal static bool YieldFromDispatchLoop(int currentTickCount) => true;
#pragma warning restore IDE0060

private const bool IsWorkerTrackingEnabledInConfig = false;

@@ -78,7 +80,7 @@ public static void GetAvailableThreads(out int workerThreads, out int completion
public static long CompletedWorkItemCount => 0;

[DynamicDependency("BackgroundJobHandler")] // https://github.com/dotnet/runtime/issues/101434
internal static unsafe void RequestWorkerThread()
internal static unsafe void EnsureWorkerRequested()
{
if (_callbackQueued)
return;
@@ -19,7 +19,11 @@ public static partial class ThreadPool
#if !(TARGET_BROWSER && FEATURE_WASM_MANAGED_THREADS)
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
// the runtime may use the thread for processing other work.
internal static bool YieldFromDispatchLoop => false;
internal static bool YieldFromDispatchLoop(int currentTickCount)
{
PortableThreadPool.ThreadPoolInstance.NotifyDispatchProgress(currentTickCount);
return false;
}
#endif

internal static ThreadInt64PersistentCounter.ThreadLocalNode GetOrCreateThreadLocalCompletionCountNode() =>
@@ -67,9 +71,9 @@ internal static bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadL
/// <summary>
/// This method is called to request a new thread pool worker to handle pending work.
/// </summary>
internal static unsafe void RequestWorkerThread()
internal static unsafe void EnsureWorkerRequested()
{
PortableThreadPool.ThreadPoolInstance.RequestWorker();
PortableThreadPool.ThreadPoolInstance.EnsureWorkerRequested();
}

internal static void ReportThreadStatus(bool isWorking)
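With YieldFromDispatchLoop now taking the current tick count, the dispatch loop can refresh the starvation heartbeat as a side effect of a periodic check it already performs; the real call site lives in the runtime's dispatch loop (ThreadPoolWorkQueue.Dispatch), which is not part of this diff. The sketch below is a simplified, hypothetical caller; DispatchLoopSketch and its delegates are placeholders.

```csharp
using System;

internal static class DispatchLoopSketch
{
    public static void Dispatch(Func<object?> tryDequeue, Action<object> execute)
    {
        while (true)
        {
            object? workItem = tryDequeue();
            if (workItem is null)
            {
                break; // no more work; the caller handles the thread-request flag
            }

            execute(workItem);

            // One TickCount read serves both purposes: deciding whether to yield the
            // thread back to the runtime and refreshing lastDispatchTime for the
            // gate thread's starvation detection.
            int currentTickCount = Environment.TickCount;
            if (YieldFromDispatchLoopSketch(currentTickCount))
            {
                break;
            }
        }
    }

    // Stands in for ThreadPool.YieldFromDispatchLoop; on the portable thread pool the
    // real method forwards the tick count to NotifyDispatchProgress and returns false.
    private static bool YieldFromDispatchLoopSketch(int currentTickCount) => false;
}
```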
@@ -35,7 +35,9 @@ public static partial class ThreadPool
{
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
// the runtime may use the thread for processing other work
internal static bool YieldFromDispatchLoop => true;
#pragma warning disable IDE0060 // Remove unused parameter
internal static bool YieldFromDispatchLoop(int currentTickCount) => true;
#pragma warning restore IDE0060

private const bool IsWorkerTrackingEnabledInConfig = false;

@@ -75,7 +77,7 @@ public static void GetAvailableThreads(out int workerThreads, out int completion

public static long CompletedWorkItemCount => 0;

internal static unsafe void RequestWorkerThread()
internal static unsafe void EnsureWorkerRequested()
{
}
