Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private static int DetermineThreadPoolThreadTimeoutMs()
}

[ThreadStatic]
private static object? t_completionCountObject;
private static ThreadInt64PersistentCounter.ThreadLocalNode? t_completionCountNode;

#pragma warning disable IDE1006 // Naming Styles
// The singleton must be initialized after the static variables above, as the constructor may be dependent on them.
Expand Down Expand Up @@ -317,22 +317,22 @@ public void GetAvailableThreads(out int workerThreads, out int ioCompletionThrea
public int ThreadCount => _separated.counts.VolatileRead().NumExistingThreads;
public long CompletedWorkItemCount => _completionCounter.Count;

public object GetOrCreateThreadLocalCompletionCountObject() =>
t_completionCountObject ?? CreateThreadLocalCompletionCountObject();
public ThreadInt64PersistentCounter.ThreadLocalNode GetOrCreateThreadLocalCompletionCountNode() =>
t_completionCountNode ?? CreateThreadLocalCompletionCountNode();

[MethodImpl(MethodImplOptions.NoInlining)]
private object CreateThreadLocalCompletionCountObject()
private ThreadInt64PersistentCounter.ThreadLocalNode CreateThreadLocalCompletionCountNode()
{
Debug.Assert(t_completionCountObject == null);
Debug.Assert(t_completionCountNode == null);

object threadLocalCompletionCountObject = _completionCounter.CreateThreadLocalCountObject();
t_completionCountObject = threadLocalCompletionCountObject;
return threadLocalCompletionCountObject;
ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode = _completionCounter.CreateThreadLocalCountObject();
t_completionCountNode = threadLocalCompletionCountNode;
return threadLocalCompletionCountNode;
}

private void NotifyWorkItemProgress(object threadLocalCompletionCountObject, int currentTimeMs)
private void NotifyWorkItemProgress(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs)
{
ThreadInt64PersistentCounter.Increment(threadLocalCompletionCountObject);
threadLocalCompletionCountNode.Increment();
_separated.lastDequeueTime = currentTimeMs;

if (ShouldAdjustMaxWorkersActive(currentTimeMs))
Expand All @@ -342,13 +342,13 @@ private void NotifyWorkItemProgress(object threadLocalCompletionCountObject, int
}

internal void NotifyWorkItemProgress() =>
NotifyWorkItemProgress(GetOrCreateThreadLocalCompletionCountObject(), Environment.TickCount);
NotifyWorkItemProgress(GetOrCreateThreadLocalCompletionCountNode(), Environment.TickCount);

internal bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, int currentTimeMs)
internal bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode? threadLocalCompletionCountNode, int currentTimeMs)
{
Debug.Assert(threadLocalCompletionCountObject != null);
Debug.Assert(threadLocalCompletionCountNode != null);

NotifyWorkItemProgress(threadLocalCompletionCountObject, currentTimeMs);
NotifyWorkItemProgress(threadLocalCompletionCountNode, currentTimeMs);
return !WorkerThread.ShouldStopProcessingWorkNow(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,7 @@ public ThreadInt64PersistentCounter()
_nodes = new ThreadLocalNode(this);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Increment(object threadLocalCountObject)
{
Debug.Assert(threadLocalCountObject is ThreadLocalNode);
Unsafe.As<ThreadLocalNode>(threadLocalCountObject).Increment();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Decrement(object threadLocalCountObject)
{
Debug.Assert(threadLocalCountObject is ThreadLocalNode);
Unsafe.As<ThreadLocalNode>(threadLocalCountObject).Decrement();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Add(object threadLocalCountObject, uint count)
{
Debug.Assert(threadLocalCountObject is ThreadLocalNode);
Unsafe.As<ThreadLocalNode>(threadLocalCountObject).Add(count);
}

public object CreateThreadLocalCountObject()
public ThreadLocalNode CreateThreadLocalCountObject()
{
var node = new ThreadLocalNode(this);

Expand Down Expand Up @@ -105,7 +84,7 @@ public long Count
}
}

private sealed class ThreadLocalNode
internal sealed class ThreadLocalNode
{
private uint _count;
private readonly ThreadInt64PersistentCounter _counter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ internal static void NotifyThreadUnblocked()
{
}

internal static object? GetOrCreateThreadLocalCompletionCountObject() => null;
internal static ThreadInt64PersistentCounter.ThreadLocalNode? GetOrCreateThreadLocalCompletionCountNode() => null;

#pragma warning disable IDE0060
internal static bool NotifyWorkItemComplete(object? threadLocalCompletionCountObject, int currentTimeMs)
internal static bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode? threadLocalCompletionCountNode, int currentTimeMs)
{
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ public static partial class ThreadPool
internal static bool YieldFromDispatchLoop => false;
#endif

internal static object GetOrCreateThreadLocalCompletionCountObject() =>
PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject();
internal static ThreadInt64PersistentCounter.ThreadLocalNode GetOrCreateThreadLocalCompletionCountNode() =>
PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountNode();

public static bool SetMaxThreads(int workerThreads, int completionPortThreads) =>
PortableThreadPool.ThreadPoolInstance.SetMaxThreads(workerThreads, completionPortThreads);
Expand Down Expand Up @@ -61,8 +61,8 @@ internal static void NotifyThreadUnblocked()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static bool NotifyWorkItemComplete(object threadLocalCompletionCountObject, int currentTimeMs) =>
PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTimeMs);
internal static bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs) =>
PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountNode, currentTimeMs);

/// <summary>
/// This method is called to request a new thread pool worker to handle pending work.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ internal static void NotifyThreadUnblocked()
{
}

internal static object? GetOrCreateThreadLocalCompletionCountObject() => null;
internal static ThreadInt64PersistentCounter.ThreadLocalNode? GetOrCreateThreadLocalCompletionCountNode() => null;

internal static bool NotifyWorkItemComplete(object? _1, int _2) => true;
internal static bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode? _1, int _2) => true;

private static RegisteredWaitHandle RegisterWaitForSingleObject(
WaitHandle? waitObject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ internal static void InitializeForThreadPoolThread()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void IncrementCompletedWorkItemCount() => WindowsThreadPool.IncrementCompletedWorkItemCount();

internal static object GetOrCreateThreadLocalCompletionCountObject() =>
internal static ThreadInt64PersistentCounter.ThreadLocalNode GetOrCreateThreadLocalCompletionCountNode() =>
ThreadPool.UseWindowsThreadPool ?
WindowsThreadPool.GetOrCreateThreadLocalCompletionCountObject() :
PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountObject();
WindowsThreadPool.GetOrCreateThreadLocalCompletionCountNode() :
PortableThreadPool.ThreadPoolInstance.GetOrCreateThreadLocalCompletionCountNode();

public static bool SetMaxThreads(int workerThreads, int completionPortThreads) =>
ThreadPool.UseWindowsThreadPool ?
Expand Down Expand Up @@ -132,10 +132,10 @@ internal static void NotifyWorkItemProgress()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static bool NotifyWorkItemComplete(object threadLocalCompletionCountObject, int currentTimeMs) =>
internal static bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int currentTimeMs) =>
ThreadPool.UseWindowsThreadPool ?
WindowsThreadPool.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTimeMs) :
PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountObject, currentTimeMs);
WindowsThreadPool.NotifyWorkItemComplete(threadLocalCompletionCountNode, currentTimeMs) :
PortableThreadPool.ThreadPoolInstance.NotifyWorkItemComplete(threadLocalCompletionCountNode, currentTimeMs);

internal static bool NotifyThreadBlocked() =>
ThreadPool.UseWindowsThreadPool ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ internal static bool Dispatch()
// Has the desire for logging changed since the last time we entered?
workQueue.RefreshLoggingEnabled();

object? threadLocalCompletionCountObject = tl.threadLocalCompletionCountObject;
ThreadInt64PersistentCounter.ThreadLocalNode? threadLocalCompletionCountNode = tl.threadLocalCompletionCountNode;
Thread currentThread = tl.currentThread;

// Start on clean ExecutionContext and SynchronizationContext
Expand Down Expand Up @@ -1170,7 +1170,7 @@ internal static bool Dispatch()
// us to return the thread to the pool or not.
//
int currentTickCount = Environment.TickCount;
if (!ThreadPool.NotifyWorkItemComplete(threadLocalCompletionCountObject!, currentTickCount))
if (!ThreadPool.NotifyWorkItemComplete(threadLocalCompletionCountNode!, currentTickCount))
{
// This thread is being parked and may remain inactive for a while. Transfer any thread-local work items
// to ensure that they would not be heavily delayed. Tell the caller that this thread was requested to stop
Expand Down Expand Up @@ -1276,7 +1276,7 @@ internal sealed class ThreadPoolWorkQueueThreadLocals
public readonly ThreadPoolWorkQueue workQueue;
public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
public readonly Thread currentThread;
public readonly object? threadLocalCompletionCountObject;
public readonly ThreadInt64PersistentCounter.ThreadLocalNode? threadLocalCompletionCountNode;
public readonly Random.XoshiroImpl random = new Random.XoshiroImpl();

public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
Expand All @@ -1286,7 +1286,7 @@ public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue();
ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue);
currentThread = Thread.CurrentThread;
threadLocalCompletionCountObject = ThreadPool.GetOrCreateThreadLocalCompletionCountObject();
threadLocalCompletionCountNode = ThreadPool.GetOrCreateThreadLocalCompletionCountNode();
}

public void TransferLocalWork()
Expand Down Expand Up @@ -1411,8 +1411,7 @@ void IThreadPoolWorkItem.Execute()
if (stageBeforeUpdate == QueueProcessingStage.Determining)
{
// Discount a work item here to avoid counting this queue processing work item
ThreadInt64PersistentCounter.Decrement(
ThreadPoolWorkQueueThreadLocals.threadLocals!.threadLocalCompletionCountObject!);
ThreadPoolWorkQueueThreadLocals.threadLocals!.threadLocalCompletionCountNode!.Decrement();
return;
}
}
Expand Down Expand Up @@ -1455,7 +1454,7 @@ void IThreadPoolWorkItem.Execute()
// Discount a work item here to avoid counting this queue processing work item
if (completedCount > 1)
{
ThreadInt64PersistentCounter.Add(tl.threadLocalCompletionCountObject!, completedCount - 1);
tl.threadLocalCompletionCountNode!.Add(completedCount - 1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,24 @@ private struct WorkingThreadCounter
private static readonly ThreadInt64PersistentCounter s_completedWorkItemCounter = new ThreadInt64PersistentCounter();

[ThreadStatic]
private static object? t_completionCountObject;
private static ThreadInt64PersistentCounter.ThreadLocalNode? t_completionCountNode;

internal static void InitializeForThreadPoolThread() => t_threadCountHolder = new ThreadCountHolder();

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void IncrementCompletedWorkItemCount() => ThreadInt64PersistentCounter.Increment(GetOrCreateThreadLocalCompletionCountObject());
internal static void IncrementCompletedWorkItemCount() => GetOrCreateThreadLocalCompletionCountNode().Increment();

internal static object GetOrCreateThreadLocalCompletionCountObject() =>
t_completionCountObject ?? CreateThreadLocalCompletionCountObject();
internal static ThreadInt64PersistentCounter.ThreadLocalNode GetOrCreateThreadLocalCompletionCountNode() =>
t_completionCountNode ?? CreateThreadLocalCompletionCountNode();

[MethodImpl(MethodImplOptions.NoInlining)]
private static object CreateThreadLocalCompletionCountObject()
private static ThreadInt64PersistentCounter.ThreadLocalNode CreateThreadLocalCompletionCountNode()
{
Debug.Assert(t_completionCountObject == null);
Debug.Assert(t_completionCountNode == null);

object threadLocalCompletionCountObject = s_completedWorkItemCounter.CreateThreadLocalCountObject();
t_completionCountObject = threadLocalCompletionCountObject;
return threadLocalCompletionCountObject;
ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode = s_completedWorkItemCounter.CreateThreadLocalCountObject();
t_completionCountNode = threadLocalCompletionCountNode;
return threadLocalCompletionCountNode;
}

#pragma warning disable IDE0060 // Remove unused parameter
Expand Down Expand Up @@ -132,9 +132,9 @@ public static void GetAvailableThreads(out int workerThreads, out int completion
internal static void NotifyWorkItemProgress() => IncrementCompletedWorkItemCount();

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static bool NotifyWorkItemComplete(object threadLocalCompletionCountObject, int _ /*currentTimeMs*/)
internal static bool NotifyWorkItemComplete(ThreadInt64PersistentCounter.ThreadLocalNode threadLocalCompletionCountNode, int _ /*currentTimeMs*/)
{
ThreadInt64PersistentCounter.Increment(threadLocalCompletionCountObject);
threadLocalCompletionCountNode.Increment();
return true;
}

Expand Down
Loading