Skip to content

Commit

Permalink
Add a config flag to experiment with some work item prioritization ch…
Browse files Browse the repository at this point in the history
…anges in cases involving a lot of sync-over-async (dotnet#103983)

* Add a config flag to experiment with some work item prioritization changes in cases involving a lot of sync-over-async

Some services that use a lot of sync-over-async were seen to experience stalls due to some priority inversion issues in work items that get queued to the thread pool. For instance, a work item W1 queues another work item W2 to the global queue and then blocks waiting for a task to complete, where W2 would need to run in order to complete the task, but W2 is queued behind a number of other work items that operate like W1, and this sometimes leads to long-duration stalls. This change adds an experimental config option that, when enabled, enqueues some kinds of work items to a new low-priority global queue that is checked after all other global queues. This was seen to help in some cases.
  • Loading branch information
kouvel committed Jul 9, 2024
1 parent 7f5ce1e commit 4882c61
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,17 @@ public int Count
}
}

#if CORECLR
// This config var can be used to enable an experimental mode that may reduce the effects of some priority inversion
// issues seen in cases involving a lot of sync-over-async. See EnqueueForPrioritizationExperiment() for more
// information. The mode is experimental and may change in the future.
//
// The value is read once into this static readonly field and defaults to false when not configured. The second
// name below is the environment variable form (DOTNET_ThreadPool_PrioritizationExperiment); the first is presumably
// the matching AppContext/runtimeconfig setting name -- confirm against AppContextConfigHelper.GetBooleanConfig.
internal static readonly bool s_prioritizationExperiment =
AppContextConfigHelper.GetBooleanConfig(
"System.Threading.ThreadPool.PrioritizationExperiment",
"DOTNET_ThreadPool_PrioritizationExperiment",
defaultValue: false);
#endif

private const int ProcessorsPerAssignableWorkItemQueue = 16;
private static readonly int s_assignableWorkItemQueueCount =
Environment.ProcessorCount <= 32 ? 0 :
Expand All @@ -394,6 +405,11 @@ public int Count
internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();
internal readonly ConcurrentQueue<object> highPriorityWorkItems = new ConcurrentQueue<object>();

#if CORECLR
// Global queue for work items that the prioritization experiment treats as low priority. The queue is only allocated
// when s_prioritizationExperiment is enabled; otherwise the field is left null (hence the null-forgiving operator),
// so any code touching it must check s_prioritizationExperiment first.
internal readonly ConcurrentQueue<object> lowPriorityWorkItems =
s_prioritizationExperiment ? new ConcurrentQueue<object>() : null!;
#endif

// SOS's ThreadPool command depends on the following name. The global queue doesn't scale well beyond a point of
// concurrency. Some additional queues may be added and assigned to a limited number of worker threads if necessary to
// help with limiting the concurrency level.
Expand Down Expand Up @@ -608,23 +624,68 @@ public void Enqueue(object callback, bool forceGlobal)
if (_loggingEnabled && FrameworkEventSource.Log.IsEnabled())
FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);

ThreadPoolWorkQueueThreadLocals? tl;
if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
#if CORECLR
if (s_prioritizationExperiment)
{
tl.workStealingQueue.LocalPush(callback);
EnqueueForPrioritizationExperiment(callback, forceGlobal);
}
else
#endif
{
ConcurrentQueue<object> queue =
s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
? tl.assignedGlobalWorkItemQueue
: workItems;
queue.Enqueue(callback);
ThreadPoolWorkQueueThreadLocals? tl;
if (!forceGlobal && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null)
{
tl.workStealingQueue.LocalPush(callback);
}
else
{
ConcurrentQueue<object> queue =
s_assignableWorkItemQueueCount > 0 && (tl = ThreadPoolWorkQueueThreadLocals.threadLocals) != null
? tl.assignedGlobalWorkItemQueue
: workItems;
queue.Enqueue(callback);
}
}

EnsureThreadRequested();
}

#if CORECLR
// Enqueue path used when the prioritization experiment is enabled. Selects the destination queue for the work item
// as follows:
//   - No thread-pool thread locals on the calling thread (presumably a non-thread-pool thread):
//       QueueUserWorkItemCallbackBase items go to the low-priority global queue, everything else goes to the
//       regular global queue.
//   - Thread locals present and forceGlobal is false: the item is pushed onto the caller's work-stealing queue.
//   - Thread locals present and forceGlobal is true: the item goes to the thread's assigned global queue when
//       assignable queues are in use, otherwise to the regular global queue.
//
// Kept out of line so that the experimental path does not get inlined into the caller.
[MethodImpl(MethodImplOptions.NoInlining)]
private void EnqueueForPrioritizationExperiment(object callback, bool forceGlobal)
{
    ThreadPoolWorkQueueThreadLocals? threadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals;

    if (threadLocals == null)
    {
        // This is a rough and experimental attempt at picking out work items that should rank below other global
        // work items (including ones that have not been queued yet). Such items are placed in a low-priority global
        // queue that is checked only after all other global queues. A work item may queue a second work item that
        // belongs to the same set of work; as a global work item, the second one would normally land behind other
        // global work items, which in sync-over-async heavy cases can significantly delay worker threads from
        // getting unblocked.
        ConcurrentQueue<object> externalTarget =
            callback is QueueUserWorkItemCallbackBase ? lowPriorityWorkItems : workItems;
        externalTarget.Enqueue(callback);
        return;
    }

    if (!forceGlobal)
    {
        threadLocals.workStealingQueue.LocalPush(callback);
        return;
    }

    ConcurrentQueue<object> globalTarget =
        s_assignableWorkItemQueueCount > 0 ? threadLocals.assignedGlobalWorkItemQueue : workItems;
    globalTarget.Enqueue(callback);
}
#endif

public void EnqueueAtHighPriority(object workItem)
{
Debug.Assert((workItem is IThreadPoolWorkItem) ^ (workItem is Task));
Expand Down Expand Up @@ -710,6 +771,14 @@ internal static bool LocalFindAndPop(object callback)
}
}

#if CORECLR
// Check for low-priority work items
if (s_prioritizationExperiment && lowPriorityWorkItems.TryDequeue(out workItem))
{
return workItem;
}
#endif

// Try to steal from other threads' local work items
{
WorkStealingQueue localWsq = tl.workStealingQueue;
Expand Down Expand Up @@ -769,6 +838,13 @@ public long GlobalCount
get
{
long count = (long)highPriorityWorkItems.Count + workItems.Count;
#if CORECLR
if (s_prioritizationExperiment)
{
count += lowPriorityWorkItems.Count;
}
#endif

for (int i = 0; i < s_assignableWorkItemQueueCount; i++)
{
count += _assignableWorkItemQueues[i].Count;
Expand Down Expand Up @@ -1803,6 +1879,17 @@ internal static IEnumerable<object> GetQueuedWorkItems()
yield return workItem;
}

#if CORECLR
if (ThreadPoolWorkQueue.s_prioritizationExperiment)
{
// Enumerate low-priority global queue
foreach (object workItem in s_workQueue.lowPriorityWorkItems)
{
yield return workItem;
}
}
#endif

// Enumerate each local queue
foreach (ThreadPoolWorkQueue.WorkStealingQueue wsq in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
{
Expand Down
83 changes: 83 additions & 0 deletions src/libraries/System.Threading.ThreadPool/tests/ThreadPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,89 @@ async Task StartClientAsync()
}).Dispose();
}

// Smoke test for the DOTNET_ThreadPool_PrioritizationExperiment config var: with the variable set to "1", queues
// three kinds of work items (global from a non-thread-pool thread, global from a thread pool work item, and local
// from thread pool work items), WorkItemCountPerKind of each, and waits until all of them have run.
[ConditionalFact(nameof(IsThreadingAndRemoteExecutorSupported))]
public static void PrioritizationExperimentConfigVarTest()
{
// Avoid contaminating the main process' environment
RemoteExecutor.Invoke(() =>
{
// The actual test process below will inherit the config var
Environment.SetEnvironmentVariable("DOTNET_ThreadPool_PrioritizationExperiment", "1");
RemoteExecutor.Invoke(() =>
{
const int WorkItemCountPerKind = 100;
int completedWorkItemCount = 0;
var allWorkItemsCompleted = new AutoResetEvent(false);
// Shared work item body: counts completions and signals once all three kinds (3 * WorkItemCountPerKind) have run
Action<int> workItem = _ =>
{
if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * 3)
{
allWorkItemsCompleted.Set();
}
};
// Gate that holds back every enqueuer until all of them have been set up
var startTest = new ManualResetEvent(false);
var t = new Thread(() =>
{
// Enqueue global work from a non-thread-pool thread
startTest.CheckedWait();
for (int i = 0; i < WorkItemCountPerKind; i++)
{
ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: false);
}
});
t.IsBackground = true;
t.Start();
ThreadPool.UnsafeQueueUserWorkItem(
_ =>
{
// Enqueue global work from a thread pool worker thread
startTest.CheckedWait();
for (int i = 0; i < WorkItemCountPerKind; i++)
{
ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: false);
}
},
0,
preferLocal: false);
t = new Thread(() =>
{
// Enqueue local work from thread pool worker threads
// The work is split across 10 enqueuer work items, so the per-kind count must be divisible by 10
Assert.True(WorkItemCountPerKind / 10 * 10 == WorkItemCountPerKind);
Action<int> localWorkItemEnqueuer = _ =>
{
for (int i = 0; i < WorkItemCountPerKind / 10; i++)
{
ThreadPool.UnsafeQueueUserWorkItem(workItem, 0, preferLocal: true);
}
};
startTest.CheckedWait();
// The enqueuers themselves are queued with preferLocal: false; each one then queues its share with
// preferLocal: true when it runs
for (int i = 0; i < 10; i++)
{
ThreadPool.UnsafeQueueUserWorkItem(localWorkItemEnqueuer, 0, preferLocal: false);
}
});
t.IsBackground = true;
t.Start();
// Release all enqueuers, then wait for the last work item to signal completion.
// NOTE(review): CheckedWait is defined elsewhere; presumably it waits with a timeout and fails the test on
// expiry -- confirm.
startTest.Set();
allWorkItemsCompleted.CheckedWait();
}).Dispose();
}).Dispose();
}

// Condition used by [ConditionalFact] tests that need both threading support and the remote executor.
// PlatformDetection.IsThreadingSupported is evaluated first; RemoteExecutor.IsSupported is only consulted when
// threading is supported.
public static bool IsThreadingAndRemoteExecutorSupported
{
    get
    {
        if (!PlatformDetection.IsThreadingSupported)
        {
            return false;
        }

        return RemoteExecutor.IsSupported;
    }
}

Expand Down

0 comments on commit 4882c61

Please sign in to comment.