diff --git a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs
index 365a68c4489ac..d870c60648c6a 100644
--- a/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs
+++ b/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs
@@ -381,6 +381,17 @@ public int Count
             }
         }
 
+#if CORECLR
+        // This config var can be used to enable an experimental mode that may reduce the effects of some priority inversion
+        // issues seen in cases involving a lot of sync-over-async. See EnqueueForPrioritizationExperiment() for more
+        // information. The mode is experimental and may change in the future.
+        internal static readonly bool s_prioritizationExperiment =
+            AppContextConfigHelper.GetBooleanConfig(
+                "System.Threading.ThreadPool.PrioritizationExperiment",
+                "DOTNET_ThreadPool_PrioritizationExperiment",
+                defaultValue: false);
+#endif
+
         private const int ProcessorsPerAssignableWorkItemQueue = 16;
         private static readonly int s_assignableWorkItemQueueCount =
             Environment.ProcessorCount <= 32 ? 0 :
@@ -394,6 +405,11 @@ public int Count
         internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();
         internal readonly ConcurrentQueue<object> highPriorityWorkItems = new ConcurrentQueue<object>();
 
+#if CORECLR
+        internal readonly ConcurrentQueue<object> lowPriorityWorkItems =
+            s_prioritizationExperiment ? new ConcurrentQueue<object>() : null!;
+#endif
+
         // SOS's ThreadPool command depends on the following name. The global queue doesn't scale well beyond a point of
         // concurrency. Some additional queues may be added and assigned to a limited number of worker threads if necessary to
         // help with limiting the concurrency level.
@@ -608,23 +624,68 @@ public void Enqueue(object callback, bool forceGlobal)
             if (_loggingEnabled && FrameworkEventSource.Log.IsEnabled())
                 FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
 
-            ThreadPoolWorkQueueThreadLocals? tl;
-            if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
+#if CORECLR
+            if (s_prioritizationExperiment)
             {
-                tl.workStealingQueue.LocalPush(callback);
+                EnqueueForPrioritizationExperiment(callback, forceGlobal);
             }
             else
+#endif
             {
-                ConcurrentQueue<object> queue =
-                    s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
-                        ? tl.assignedGlobalWorkItemQueue
-                        : workItems;
-                queue.Enqueue(callback);
+                ThreadPoolWorkQueueThreadLocals? tl;
+                if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
+                {
+                    tl.workStealingQueue.LocalPush(callback);
+                }
+                else
+                {
+                    ConcurrentQueue<object> queue =
+                        s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
+                            ? tl.assignedGlobalWorkItemQueue
+                            : workItems;
+                    queue.Enqueue(callback);
+                }
             }
 
             EnsureThreadRequested();
         }
 
+#if CORECLR
+        [MethodImpl(MethodImplOptions.NoInlining)]
+        private void EnqueueForPrioritizationExperiment(object callback, bool forceGlobal)
+        {
+            ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
+            if (!forceGlobal && tl != null)
+            {
+                tl.workStealingQueue.LocalPush(callback);
+                return;
+            }
+
+            ConcurrentQueue<object> queue;
+
+            // This is a rough and experimental attempt at identifying work items that should be lower priority than other
+            // global work items (even ones that haven't been queued yet), and to queue them to a low-priority global queue that
+            // is checked after all other global queues. In some cases, a work item may queue another work item that is part of
+            // the same set of work. For global work items, the second work item would typically get queued behind other global
+            // work items. In some cases involving a lot of sync-over-async, that can significantly delay worker threads from
+            // getting unblocked.
+            if (tl == null && callback is QueueUserWorkItemCallbackBase)
+            {
+                queue = lowPriorityWorkItems;
+            }
+            else if (s_assignableWorkItemQueueCount > 0 && tl != null)
+            {
+                queue = tl.assignedGlobalWorkItemQueue;
+            }
+            else
+            {
+                queue = workItems;
+            }
+
+            queue.Enqueue(callback);
+        }
+#endif
+
         public void EnqueueAtHighPriority(object workItem)
         {
             Debug.Assert((workItem is IThreadPoolWorkItem) ^ (workItem is Task));
@@ -710,6 +771,14 @@ internal static bool LocalFindAndPop(object callback)
                 }
             }
 
+#if CORECLR
+            // Check for low-priority work items
+            if (s_prioritizationExperiment && lowPriorityWorkItems.TryDequeue(out workItem))
+            {
+                return workItem;
+            }
+#endif
+
             // Try to steal from other threads' local work items
             {
                 WorkStealingQueue localWsq = tl.workStealingQueue;
@@ -769,6 +838,13 @@ public long GlobalCount
             get
             {
                 long count = (long)highPriorityWorkItems.Count + workItems.Count;
+#if CORECLR
+                if (s_prioritizationExperiment)
+                {
+                    count += lowPriorityWorkItems.Count;
+                }
+#endif
+
                 for (int i = 0; i < s_assignableWorkItemQueueCount; i++)
                 {
                     count += _assignableWorkItemQueues[i].Count;
@@ -1803,6 +1879,17 @@ internal static IEnumerable GetQueuedWorkItems()
                 yield return workItem;
             }
 
+#if CORECLR
+            if (ThreadPoolWorkQueue.s_prioritizationExperiment)
+            {
+                // Enumerate low-priority global queue
+                foreach (object workItem in s_workQueue.lowPriorityWorkItems)
+                {
+                    yield return workItem;
+                }
+            }
+#endif
+
             // Enumerate each local queue
             foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
             {
diff --git a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs
index c96ad22b47b53..cc3b5b082fc89 100644
--- a/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs
+++ 
b/src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs
@@ -1251,6 +1251,89 @@ async Task StartClientAsync()
             }).Dispose();
         }
 
+        [ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
+        public static void PrioritizationExperimentConfigVarTest()
+        {
+            // Avoid contaminating the main process' environment
+            RemoteExecutor.Invoke(() =>
+            {
+                // The actual test process below will inherit the config var
+                Environment.SetEnvironmentVariable("DOTNET_ThreadPool_PrioritizationExperiment", "1");
+
+                RemoteExecutor.Invoke(() =>
+                {
+                    const int WorkItemCountPerKind = 100;
+
+                    int completedWorkItemCount = 0;
+                    var allWorkItemsCompleted = new AutoResetEvent(false);
+                    Action<int> workItem = _ =>
+                    {
+                        if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * 3)
+                        {
+                            allWorkItemsCompleted.Set();
+                        }
+                    };
+
+                    var startTest = new ManualResetEvent(false);
+
+                    var t = new Thread(() =>
+                    {
+                        // Enqueue global work from a non-thread-pool thread
+
+                        startTest.CheckedWait();
+
+                        for (int i = 0; i < WorkItemCountPerKind; i++)
+                        {
+                            ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: false);
+                        }
+                    });
+                    t.IsBackground = true;
+                    t.Start();
+
+                    ThreadPool.UnsafeQueueUserWorkItem(
+                        _ =>
+                        {
+                            // Enqueue global work from a thread pool worker thread
+
+                            startTest.CheckedWait();
+
+                            for (int i = 0; i < WorkItemCountPerKind; i++)
+                            {
+                                ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: false);
+                            }
+                        },
+                        0,
+                        preferLocal: false);
+
+                    t = new Thread(() =>
+                    {
+                        // Enqueue local work from thread pool worker threads
+
+                        Assert.True(WorkItemCountPerKind / 10 * 10 == WorkItemCountPerKind);
+                        Action<int> localWorkItemEnqueuer = _ =>
+                        {
+                            for (int i = 0; i < WorkItemCountPerKind / 10; i++)
+                            {
+                                ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: true);
+                            }
+                        };
+
+                        startTest.CheckedWait();
+
+                        for (int i = 0; i < 10; i++)
+                        {
+                            ThreadPool.UnsafeQueueUserWorkItem(localWorkItemEnqueuer, 0, preferLocal: false);
+                        }
+                    });
+                    t.IsBackground = true;
+                    t.Start();
+
+                    startTest.Set();
+                    allWorkItemsCompleted.CheckedWait();
+                }).Dispose();
+            }).Dispose();
+        }
+
         public static bool IsThreadingAndRemoteExecutorSupported =>
             PlatformDetection.IsThreadingSupported && RemoteExecutor.IsSupported;
 