diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs index 73fe2afe9a4d0e..bd6773321374b9 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.Blocking.cs @@ -230,7 +230,8 @@ private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWo HillClimbing.ThreadPoolHillClimber.ForceChange( newNumThreadsGoal, HillClimbing.StateOrTransition.CooperativeBlocking); - if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0) + + if (counts.NumProcessingWork >= numThreadsGoal && _separated._hasOutstandingThreadRequest != 0) { addWorker = true; } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs index a0434cdfa9abb3..e545b660908538 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.GateThread.cs @@ -132,7 +132,7 @@ private static void GateThreadStart() if (!disableStarvationDetection && threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None && - threadPoolInstance._separated.numRequestedWorkers > 0 && + threadPoolInstance._separated._hasOutstandingThreadRequest != 0 && SufficientDelaySinceLastDequeue(threadPoolInstance)) { bool addWorker = false; @@ -187,7 +187,7 @@ private static void GateThreadStart() } } - if (threadPoolInstance._separated.numRequestedWorkers <= 0 && + if (threadPoolInstance._separated._hasOutstandingThreadRequest == 0 && threadPoolInstance._pendingBlockingAdjustment == PendingBlockingAdjustment.None && Interlocked.Decrement(ref threadPoolInstance._separated.gateThreadRunningState) <= GetRunningStateForNumRuns(0)) { @@ -208,7 +208,7 @@ public static void Wake(PortableThreadPool threadPoolInstance) // in deciding "too long" private static bool SufficientDelaySinceLastDequeue(PortableThreadPool threadPoolInstance) { - uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDequeueTime); + uint delay = (uint)(Environment.TickCount - threadPoolInstance._separated.lastDispatchTime); uint minimumDelay; if (threadPoolInstance._cpuUtilization < CpuUtilizationLow) { diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs index 26b6ab0cf0ac48..2ec801826eb845 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.ThreadCounts.cs @@ -44,6 +44,17 @@ public short NumProcessingWork } } + /// + /// Reduces the number of threads processing work items by one. + /// + public void DecrementProcessingWork() + { + // This should never underflow + Debug.Assert(NumProcessingWork > 0); + Interlocked.Decrement(ref _data); + Debug.Assert(NumProcessingWork >= 0); + } + /// /// Number of thread pool threads that currently exist. /// diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs index a34e0f8ff98c4e..eeef34fb076cab 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.WorkerThread.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Diagnostics; using System.Diagnostics.Tracing; using System.Runtime.CompilerServices; @@ -118,6 +119,8 @@ private static void WorkerThreadStart() WorkerDoWork(threadPoolInstance, ref spinWait); } + // We've timed out waiting on the semaphore. Time to exit. + // In rare cases we may be asked to keep running/waiting. if (ShouldExitWorker(threadPoolInstance, threadAdjustmentLock)) { break; @@ -125,13 +128,17 @@ private static void WorkerThreadStart() } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait) { bool alreadyRemovedWorkingWorker = false; - while (TakeActiveRequest(threadPoolInstance)) + + // We allow the requests to be "stolen" by threads who are done dispatching, + // thus it is not guaranteed at this point that there is a request. + while (threadPoolInstance._separated._hasOutstandingThreadRequest != 0 && + Interlocked.Exchange(ref threadPoolInstance._separated._hasOutstandingThreadRequest, 0) != 0) { - threadPoolInstance._separated.lastDequeueTime = Environment.TickCount; + // We took the request, now we must Dispatch some work items. + threadPoolInstance.NotifyDispatchProgress(Environment.TickCount); if (!ThreadPoolWorkQueue.Dispatch()) { // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have @@ -141,7 +148,7 @@ private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool break; } - if (threadPoolInstance._separated.numRequestedWorkers <= 0) + if (threadPoolInstance._separated._hasOutstandingThreadRequest == 0) { break; } @@ -175,7 +182,6 @@ private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool // returns true if the worker is shutting down // returns false if we should do another iteration - [MethodImpl(MethodImplOptions.AggressiveInlining)] private static bool ShouldExitWorker(PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock) { // The thread cannot exit if it has IO pending, otherwise the IO may be canceled @@ -239,29 +245,17 @@ private static bool ShouldExitWorker(PortableThreadPool threadPoolInstance, LowL /// private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance) { - // A compare-exchange loop is used instead of Interlocked.Decrement or Interlocked.Add to defensively prevent - // NumProcessingWork from underflowing. See the setter for NumProcessingWork. - ThreadCounts counts = threadPoolInstance._separated.counts; - while (true) - { - ThreadCounts newCounts = counts; - newCounts.NumProcessingWork--; + threadPoolInstance._separated.counts.DecrementProcessingWork(); - ThreadCounts countsBeforeUpdate = - threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts); - if (countsBeforeUpdate == counts) - { - break; - } + // It's possible that a thread request was not actionable due to being at max active threads. + // Now we may be able to add a thread and if we can we must add. + // Otherwise, if all workers happen to quit at once, there will be noone left running while + // there is a request. - counts = countsBeforeUpdate; - } + // NOTE: The request check must happen after the active count was updated, + // which the interlocked decrement guarantees. - // It's possible that we decided we had thread requests just before a request came in, - // but reduced the worker count *after* the request came in. In this case, we might - // miss the notification of a thread request. So we wake up a thread (maybe this one!) - // if there is work to do. - if (threadPoolInstance._separated.numRequestedWorkers > 0) + if (threadPoolInstance._separated._hasOutstandingThreadRequest != 0) { MaybeAddWorkingWorker(threadPoolInstance); } @@ -297,18 +291,14 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance counts = oldCounts; } - int toCreate = newNumExistingThreads - numExistingThreads; - int toRelease = newNumProcessingWork - numProcessingWork; + Debug.Assert(newNumProcessingWork - numProcessingWork == 1); + s_semaphore.Release(1); - if (toRelease > 0) - { - s_semaphore.Release(toRelease); - } - - while (toCreate > 0) + int toCreate = newNumExistingThreads - numExistingThreads; + Debug.Assert(toCreate == 0 || toCreate == 1); + if (toCreate != 0) { CreateWorkerThread(); - toCreate--; } } @@ -347,21 +337,6 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn counts = oldCounts; } } - - private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance) - { - int count = threadPoolInstance._separated.numRequestedWorkers; - while (count > 0) - { - int prevCount = Interlocked.CompareExchange(ref threadPoolInstance._separated.numRequestedWorkers, count - 1, count); - if (prevCount == count) - { - return true; - } - count = prevCount; - } - return false; - } } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs index a2184f544cf1ad..d53f03ce2a81f7 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs @@ -87,8 +87,10 @@ private struct CacheLineSeparated [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 1)] public ThreadCounts counts; // SOS's ThreadPool command depends on this name + // Periodically updated heartbeat timestamp to indicate that we are making progress. + // Used in starvation detection. [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 2)] - public int lastDequeueTime; + public int lastDispatchTime; [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3)] public int priorCompletionCount; @@ -97,8 +99,20 @@ private struct CacheLineSeparated [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 3 + sizeof(int) * 2)] public int nextCompletedWorkRequestsTime; + // This flag is used for communication between item enqueuing and workers that process the items. + // There are two states of this flag: + // 0: has no guarantees + // 1: means a worker will check work queues and ensure that + // any work items inserted in work queue before setting the flag + // are picked up. + // Note: The state must be cleared by the worker thread _before_ + // checking. Otherwise there is a window between finding no work + // and resetting the flag, when the flag is in a wrong state. + // A new work item may be added right before the flag is reset + // without asking for a worker, while the last worker is quitting. [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4)] - public volatile int numRequestedWorkers; + public int _hasOutstandingThreadRequest; + [FieldOffset(Internal.PaddingHelpers.CACHE_LINE_SIZE * 4 + sizeof(int))] public int gateThreadRunningState; } @@ -209,7 +223,7 @@ public bool SetMinThreads(int workerThreads, int ioCompletionThreads) else if (_separated.counts.NumThreadsGoal < newMinThreads) { _separated.counts.InterlockedSetNumThreadsGoal(newMinThreads); - if (_separated.numRequestedWorkers > 0) + if (_separated._hasOutstandingThreadRequest != 0) { addWorker = true; } @@ -330,26 +344,30 @@ private ThreadInt64PersistentCounter.ThreadLocalNode CreateThreadLocalCompletion return threadLocalCompletionCountNode; } - private void NotifyWorkItemProgress(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs) + private static void NotifyWorkItemProgress(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode) { threadLocalCompletionCountNode.Increment(); - _separated.lastDequeueTime = currentTimeMs; + } + internal void NotifyWorkItemProgress() + { + NotifyWorkItemProgress(GetOrCreateThreadLocalCompletionCountNode()); + } + + internal bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs) + { + NotifyWorkItemProgress(threadLocalCompletionCountNode); if (ShouldAdjustMaxWorkersActive(currentTimeMs)) { AdjustMaxWorkersActive(); } - } - internal void NotifyWorkItemProgress() => - NotifyWorkItemProgress(GetOrCreateThreadLocalCompletionCountNode(), Environment.TickCount); + return !WorkerThread.ShouldStopProcessingWorkNow(this); + } - internal bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode? threadLocalCompletionCountNode, int currentTimeMs) + internal void NotifyDispatchProgress(int currentTickCount) { - Debug.Assert(threadLocalCompletionCountNode != null); - - NotifyWorkItemProgress(threadLocalCompletionCountNode, currentTimeMs); - return !WorkerThread.ShouldStopProcessingWorkNow(this); + _separated.lastDispatchTime = currentTickCount; } // @@ -459,13 +477,15 @@ private bool ShouldAdjustMaxWorkersActive(int currentTimeMs) return _pendingBlockingAdjustment == PendingBlockingAdjustment.None; } - internal void RequestWorker() + internal void EnsureWorkerRequested() { - // The order of operations here is important. MaybeAddWorkingWorker() and EnsureRunning() use speculative checks to - // do their work and the memory barrier from the interlocked operation is necessary in this case for correctness. - Interlocked.Increment(ref _separated.numRequestedWorkers); - WorkerThread.MaybeAddWorkingWorker(this); - GateThread.EnsureRunning(this); + // Only one worker is requested at a time to mitigate Thundering Herd problem. + if (_separated._hasOutstandingThreadRequest == 0 && + Interlocked.Exchange(ref _separated._hasOutstandingThreadRequest, 1) == 0) + { + WorkerThread.MaybeAddWorkingWorker(this); + GateThread.EnsureRunning(this); + } } private bool OnGen2GCCallback() diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.cs index 7933e49db422b9..d2515952b4d8e1 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Threads.cs @@ -8,6 +8,7 @@ public static partial class ThreadPool // Indicates that the threadpool should yield the thread from the dispatch loop to the // runtime periodically. We use this to return back to the JS event loop so that the JS // event queue can be drained - internal static bool YieldFromDispatchLoop => true; +#pragma warning disable IDE0060 // Remove unused parameter + internal static bool YieldFromDispatchLoop(int currentTickCount) => true; +#pragma warning restore IDE0060 } } -} diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.cs index 5250e1df5f1ab9..3d1428d0c53b62 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.cs @@ -35,7 +35,9 @@ public static partial class ThreadPool { // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work - internal static bool YieldFromDispatchLoop => true; +#pragma warning disable IDE0060 // Remove unused parameter + internal static bool YieldFromDispatchLoop(int currentTickCount) => true; +#pragma warning restore IDE0060 private const bool IsWorkerTrackingEnabledInConfig = false; @@ -78,7 +80,7 @@ public static void GetAvailableThreads(out int workerThreads, out int completion public static long CompletedWorkItemCount => 0; [DynamicDependency("BackgroundJobHandler")] // https://github.com/dotnet/runtime/issues/101434 - internal static unsafe void RequestWorkerThread() + internal static unsafe void EnsureWorkerRequested() { if (_callbackQueued) return; diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Unix.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Unix.cs index b100409793ba20..236aa07bc32aae 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Unix.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Unix.cs @@ -19,7 +19,11 @@ public static partial class ThreadPool #if !(TARGET_BROWSER && FEATURE_WASM_MANAGED_THREADS) // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work. - internal static bool YieldFromDispatchLoop => false; + internal static bool YieldFromDispatchLoop(int currentTickCount) + { + PortableThreadPool.ThreadPoolInstance.NotifyDispatchProgress(currentTickCount); + return false; + } #endif internal static ThreadInt64PersistentCounter.ThreadLocalNode GetOrCreateThreadLocalCompletionCountNode() => @@ -67,9 +71,9 @@ internal static bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadL /// /// This method is called to request a new thread pool worker to handle pending work. /// - internal static unsafe void RequestWorkerThread() + internal static unsafe void EnsureWorkerRequested() { - PortableThreadPool.ThreadPoolInstance.RequestWorker(); + PortableThreadPool.ThreadPoolInstance.EnsureWorkerRequested(); } internal static void ReportThreadStatus(bool isWorking) diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.cs index fa499f0fd857fb..5f983e1d3c4389 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Wasi.cs @@ -35,7 +35,9 @@ public static partial class ThreadPool { // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work - internal static bool YieldFromDispatchLoop => true; +#pragma warning disable IDE0060 // Remove unused parameter + internal static bool YieldFromDispatchLoop(int currentTickCount) => true; +#pragma warning restore IDE0060 private const bool IsWorkerTrackingEnabledInConfig = false; @@ -75,7 +77,7 @@ public static void GetAvailableThreads(out int workerThreads, out int completion public static long CompletedWorkItemCount => 0; - internal static unsafe void RequestWorkerThread() + internal static unsafe void EnsureWorkerRequested() { } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Windows.cs index 0ad70d35a92c32..223cea9c318e4e 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPool.Windows.cs @@ -30,11 +30,19 @@ public static partial class ThreadPool // Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that // the runtime may use the thread for processing other work. - // - // Windows thread pool threads need to yield back to the thread pool periodically, otherwise those threads may be - // considered to be doing long-running work and change thread pool heuristics, such as slowing or halting thread - // injection. - internal static bool YieldFromDispatchLoop => UseWindowsThreadPool; + internal static bool YieldFromDispatchLoop(int currentTickCount) + { + if (UseWindowsThreadPool) + { + // Windows thread pool threads need to yield back to the thread pool periodically, otherwise those threads may be + // considered to be doing long-running work and change thread pool heuristics, such as slowing or halting thread + // injection. + return true; + } + + PortableThreadPool.ThreadPoolInstance.NotifyDispatchProgress(currentTickCount); + return false; + } [CLSCompliant(false)] [SupportedOSPlatform("windows")] @@ -155,17 +163,24 @@ internal static void NotifyThreadUnblocked() } /// - /// This method is called to request a new thread pool worker to handle pending work. + /// This method is called to notify the thread pool about pending work. + /// It will start with an ordinary read to check if a request is already pending as we + /// optimize for a case when queues already have items and this flag is already set. + /// Make sure that the presence of the item that is being added to the queue is visible + /// before calling this. + /// Typically this is not a problem when enqueing uses an interlocked update of the queue + /// index to establish the presence of the new item. More care may be needed when an item + /// is inserted via ordinary or volatile writes. /// - internal static void RequestWorkerThread() + internal static void EnsureWorkerRequested() { if (ThreadPool.UseWindowsThreadPool) { - WindowsThreadPool.RequestWorkerThread(); + WindowsThreadPool.EnsureWorkerRequested(); } else { - PortableThreadPool.ThreadPoolInstance.RequestWorker(); + PortableThreadPool.ThreadPoolInstance.EnsureWorkerRequested(); } } diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs index b3bdc1b80f7077..a6ddb68878cbc0 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs @@ -127,7 +127,11 @@ public void LocalPush(object obj) // When there are at least 2 elements' worth of space, we can take the fast path. if (tail < m_headIndex + m_mask) { - Volatile.Write(ref m_array[tail & m_mask], obj); + m_array[tail & m_mask] = obj; + // The following write makes the slot to "appear" in the queue. + // It must happen after the write of the item, and it does, since m_tailIndex is volatile. + // NOTE: we also must be sure this write is not delayed past our check for a + // pending thread request. m_tailIndex = tail + 1; } else @@ -156,7 +160,11 @@ public void LocalPush(object obj) m_mask = (m_mask << 1) | 1; } - Volatile.Write(ref m_array[tail & m_mask], obj); + m_array[tail & m_mask] = obj; + // The following write makes the slot to "appear" in the queue. + // It must happen after the write of the item, and it does, since m_tailIndex is volatile. + // NOTE: we also must be sure this write is not delayed past our check for a + // pending thread request. m_tailIndex = tail + 1; } finally @@ -165,6 +173,10 @@ public void LocalPush(object obj) m_foreignLock.Exit(useMemoryBarrier: false); } } + + // Our caller will check for a thread request now (with an ordinary read), + // make sure the check happens after the new slot appears in the queue. + Interlocked.MemoryBarrier(); } [MethodImpl(MethodImplOptions.NoInlining)] @@ -410,7 +422,6 @@ public int Count private bool _loggingEnabled; private bool _dispatchNormalPriorityWorkFirst; - private bool _mayHaveHighPriorityWorkItems; // SOS's ThreadPool command depends on the following names internal readonly WorkQueue workItems = new WorkQueue(); @@ -431,29 +442,6 @@ public int Count private readonly int[] _assignedWorkItemQueueThreadCounts = s_assignableWorkItemQueueCount > 0 ? new int[s_assignableWorkItemQueueCount] : Array.Empty(); - [StructLayout(LayoutKind.Sequential)] - private struct CacheLineSeparated - { - private readonly Internal.PaddingFor32 pad1; - - // This flag is used for communication between item enqueuing and workers that process the items. - // There are two states of this flag: - // 0: has no guarantees - // 1: means a worker will check work queues and ensure that - // any work items inserted in work queue before setting the flag - // are picked up. - // Note: The state must be cleared by the worker thread _before_ - // checking. Otherwise there is a window between finding no work - // and resetting the flag, when the flag is in a wrong state. - // A new work item may be added right before the flag is reset - // without asking for a worker, while the last worker is quitting. - public int _hasOutstandingThreadRequest; - - private readonly Internal.PaddingFor32 pad2; - } - - private CacheLineSeparated _separated; - public ThreadPoolWorkQueue() { for (int i = 0; i < s_assignableWorkItemQueueCount; i++) @@ -572,7 +560,7 @@ private void UnassignWorkItemQueue(ThreadPoolWorkQueueThreadLocals tl) if (movedWorkItem) { - EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } } @@ -608,16 +596,6 @@ public void RefreshLoggingEnabledFull() _loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer); } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal void EnsureThreadRequested() - { - // Only one worker is requested at a time to mitigate Thundering Herd problem. - if (Interlocked.Exchange(ref _separated._hasOutstandingThreadRequest, 1) == 0) - { - ThreadPool.RequestWorkerThread(); - } - } - public void Enqueue(object callback, bool forceGlobal) { Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task)); @@ -648,7 +626,7 @@ public void Enqueue(object callback, bool forceGlobal) } } - EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } #if CORECLR @@ -696,10 +674,7 @@ public void EnqueueAtHighPriority(object workItem) highPriorityWorkItems.Enqueue(workItem); - // If the change below is seen by another thread, ensure that the enqueued work item will also be visible - Volatile.Write(ref _mayHaveHighPriorityWorkItems, true); - - EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue() @@ -713,39 +688,32 @@ internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue() // Pop each work item off the local queue and push it onto the global. This is a // bounded loop as no other thread is allowed to push into this thread's queue. ThreadPoolWorkQueue queue = ThreadPool.s_workQueue; - bool addedHighPriorityWorkItem = false; - bool ensureThreadRequest = false; + bool ensureWorkerRequest = false; while (tl.workStealingQueue.LocalPop() is object workItem) { + // A work item had been removed temporarily and other threads may have missed stealing it, so ensure that + // there will be a thread request + ensureWorkerRequest = true; + // If there's an unexpected exception here that happens to get handled, the lost work item, or missing thread // request, etc., may lead to other issues. A fail-fast or try-finally here could reduce the effect of such // uncommon issues to various degrees, but it's also uncommon to check for unexpected exceptions. try { queue.highPriorityWorkItems.Enqueue(workItem); - addedHighPriorityWorkItem = true; } catch (OutOfMemoryException) { // This is not expected to throw under normal circumstances tl.workStealingQueue.LocalPush(workItem); - // A work item had been removed temporarily and other threads may have missed stealing it, so ensure that - // there will be a thread request - ensureThreadRequest = true; break; } } - if (addedHighPriorityWorkItem) + if (ensureWorkerRequest) { - Volatile.Write(ref queue._mayHaveHighPriorityWorkItems, true); - ensureThreadRequest = true; - } - - if (ensureThreadRequest) - { - queue.EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } } @@ -774,9 +742,11 @@ internal static bool LocalFindAndPop(object callback) tl.isProcessingHighPriorityWorkItems = false; } - else if ( - _mayHaveHighPriorityWorkItems && - Interlocked.CompareExchange(ref _mayHaveHighPriorityWorkItems, false, true) && +#if FEATURE_SINGLE_THREADED + else if (highPriorityWorkItems.Count == 0 && +#else + else if (!highPriorityWorkItems.IsEmpty && +#endif TryStartProcessingHighPriorityWorkItemsAndDequeue(tl, out workItem)) { return workItem; @@ -855,7 +825,6 @@ private bool TryStartProcessingHighPriorityWorkItemsAndDequeue( } tl.isProcessingHighPriorityWorkItems = true; - _mayHaveHighPriorityWorkItems = true; return true; } @@ -940,9 +909,6 @@ internal static bool Dispatch() workQueue.AssignWorkItemQueue(tl); } - // Before dequeuing the first work item, acknowledge that the thread request has been satisfied - workQueue._separated._hasOutstandingThreadRequest = 0; - // The state change must happen before sweeping queues for items. Interlocked.MemoryBarrier(); @@ -955,12 +921,11 @@ internal static bool Dispatch() } // Missing a steal means there may be an item that we were unable to get. - // Effectively, we failed to fulfill our promise to check the queues after - // clearing "Scheduled" flag. + // Effectively, we failed to fulfill our promise to check the queues for work. // We need to make sure someone will do another pass. if (missedSteal) { - workQueue.EnsureThreadRequested(); + ThreadPool.EnsureWorkerRequested(); } // Tell the VM we're returning normally, not because Hill Climbing asked us to return. @@ -972,11 +937,8 @@ internal static bool Dispatch() // In a worst case the current workitem will indirectly depend on progress of other // items and that would lead to a deadlock if no one else checks the queue. // We must ensure at least one more worker is coming if the queue is not empty. - workQueue.EnsureThreadRequested(); - - // - // After this point, this method is no longer responsible for ensuring thread requests except for missed steals - // + // After this point, we are no longer responsible for ensuring thread requests. + ThreadPool.EnsureWorkerRequested(); // Has the desire for logging changed since the last time we entered? workQueue.RefreshLoggingEnabled(); @@ -1010,18 +972,6 @@ internal static bool Dispatch() workQueue.UnassignWorkItemQueue(tl); } - // - // No work. - // If we missed a steal, though, there may be more work in the queue. - // Instead of looping around and trying again, we'll just request another thread. Hopefully the thread - // that owns the contended work-stealing queue will pick up its own workitems in the meantime, - // which will be more efficient than this thread doing it anyway. - // - if (missedSteal) - { - workQueue.EnsureThreadRequested(); - } - return true; } } @@ -1089,7 +1039,7 @@ internal static bool Dispatch() // The quantum expired, do any necessary periodic activities - if (ThreadPool.YieldFromDispatchLoop) + if (ThreadPool.YieldFromDispatchLoop(currentTickCount)) { // The runtime-specific thread pool implementation requires the Dispatch loop to return to the VM // periodically to let it perform its own work diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/WindowsThreadPool.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/WindowsThreadPool.cs index 25fa8cda1ad3e6..341dadb705db28 100644 --- a/src/libraries/System.Private.CoreLib/src/System/Threading/WindowsThreadPool.cs +++ b/src/libraries/System.Private.CoreLib/src/System/Threading/WindowsThreadPool.cs @@ -27,6 +27,29 @@ internal static class WindowsThreadPool private static IntPtr s_work; + [StructLayout(LayoutKind.Sequential)] + private struct CacheLineSeparated + { + private readonly Internal.PaddingFor32 pad1; + + // This flag is used for communication between item enqueuing and workers that process the items. + // There are two states of this flag: + // 0: has no guarantees + // 1: means a worker will check work queues and ensure that + // any work items inserted in work queue before setting the flag + // are picked up. + // Note: The state must be cleared by the worker thread _before_ + // checking. Otherwise there is a window between finding no work + // and resetting the flag, when the flag is in a wrong state. + // A new work item may be added right before the flag is reset + // without asking for a worker, while the last worker is quitting. + public int _hasOutstandingThreadRequest; + + private readonly Internal.PaddingFor32 pad2; + } + + private static CacheLineSeparated _separated; + private sealed class ThreadCountHolder { internal ThreadCountHolder() => Interlocked.Increment(ref s_threadCount); @@ -147,6 +170,10 @@ private static void DispatchCallback(IntPtr instance, IntPtr context, IntPtr wor var wrapper = ThreadPoolCallbackWrapper.Enter(); Debug.Assert(s_work == work); + // Before looking for work items, acknowledge that the thread request has been satisfied + _separated._hasOutstandingThreadRequest = 0; + // NOTE: the thread request must be cleared before doing Dispatch. + // the following Interlocked.Increment will guarantee the ordering. Interlocked.Increment(ref s_workingThreadCounter.Count); ThreadPoolWorkQueue.Dispatch(); Interlocked.Decrement(ref s_workingThreadCounter.Count); @@ -155,7 +182,17 @@ private static void DispatchCallback(IntPtr instance, IntPtr context, IntPtr wor wrapper.Exit(resetThread: false); } - internal static unsafe void RequestWorkerThread() + internal static void EnsureWorkerRequested() + { + // Only one worker is requested at a time to mitigate Thundering Herd problem. + if (_separated._hasOutstandingThreadRequest == 0 && + Interlocked.Exchange(ref _separated._hasOutstandingThreadRequest, 1) == 0) + { + RequestWorkerThread(); + } + } + + private static unsafe void RequestWorkerThread() { if (s_work == IntPtr.Zero) {