diff --git a/sources/core/Stride.Core/Threading/Dispatcher.cs b/sources/core/Stride.Core/Threading/Dispatcher.cs
index fc61b4683c..bdc4a1a2b0 100644
--- a/sources/core/Stride.Core/Threading/Dispatcher.cs
+++ b/sources/core/Stride.Core/Threading/Dispatcher.cs
@@ -16,7 +16,7 @@ namespace Stride.Core.Threading
 {
-    public class Dispatcher
+    public static class Dispatcher
     {
 #if STRIDE_PLATFORM_IOS || STRIDE_PLATFORM_ANDROID
         public static int MaxDegreeOfParallelism = 1;
@@ -25,497 +25,319 @@ public class Dispatcher
 #endif
         private static readonly ProfilingKey DispatcherSortKey = new ProfilingKey("Dispatcher.Sort");
+        private static readonly ProfilingKey DispatcherBatched = new ProfilingKey("Dispatcher.Batched");

         public delegate void ValueAction<T>(ref T obj);

-        public static void For(int fromInclusive, int toExclusive, [Pooled] Action<int> action)
-        {
-            using (Profile(action))
+        /// <summary>
+        /// The call producing the least amount of overhead, other methods are built on top of this one.
+        /// </summary>
+        /// <param name="items">
+        /// The amount of items to process;
+        /// this total will be split into multiple batches,
+        /// each batch runs <see cref="IBatchJob.Process"/> with the range of items for that batch
+        /// </param>
+        /// <param name="batchJob">
+        /// An object shared across all threads running this job; if TJob is a struct, each thread will work off of a unique copy of it
+        /// </param>
+        /// <exception cref="Exception">If any of the threads executing this job threw an exception, it will be re-thrown in the caller's scope</exception>
+        public static unsafe void ForBatched<TJob>(int items, TJob batchJob) where TJob : IBatchJob
+        {
+            using var _ = Profiler.Begin(DispatcherBatched);
+
+            // This scope's JIT performance is VERY fragile, be careful when tweaking it
+
+            if (items == 0)
+                return;
+
+            if (MaxDegreeOfParallelism <= 1 || items == 1)
             {
-                if (fromInclusive > toExclusive)
-                {
-                    var temp = fromInclusive;
-                    fromInclusive = toExclusive + 1;
-                    toExclusive = temp + 1;
-                }
+                batchJob.Process(0, items);
+                return;
+            }

-                var count = toExclusive - fromInclusive;
-                if (count == 0)
-                    return;
+            int batchCount = Math.Min(MaxDegreeOfParallelism, items);
+            uint itemsPerBatch = (uint)((items + (batchCount - 1)) / batchCount);

-                if (MaxDegreeOfParallelism <= 1 || count == 1)
-                {
-                    ExecuteBatch(fromInclusive, toExclusive, action);
-                }
-                else
-                {
-                    var state = BatchState.Acquire();
-                    state.WorkDone = state.StartInclusive = fromInclusive;
+            // Performs 1/8 to 1/4 better in most cases, performs up to 1/8 worse when the ratio between
+            // the duration each individual item takes and the amount of items per batch hits a very narrow sweet spot.
+            // Not entirely sure why yet.
+#if FALSE
+            if (items / MaxDegreeOfParallelism > 8)
+                itemsPerBatch /= 4; // Batches of 2 instead of 8 to allow faster threads to steal more of the work
+            else if (items / MaxDegreeOfParallelism > 4)
+                itemsPerBatch /= 2; // Batches of 2 instead of 4 to allow faster threads to steal more of the work
+#endif

-                    try
-                    {
-                        var batchCount = Math.Min(MaxDegreeOfParallelism, count);
-                        var batchSize = (count + (batchCount - 1)) / batchCount;
+            var batch = BatchState<TJob>.Borrow(itemsPerBatch: itemsPerBatch, endExclusive: (uint)items, references: batchCount, batchJob);
+            try
+            {
+                ThreadPool.Instance.QueueUnsafeWorkItem(batch, &TypeAdapter<TJob>, batchCount - 1);

-                        // Kick off a worker, then perform work synchronously
-                        state.AddReference();
-                        Fork(toExclusive, batchSize, MaxDegreeOfParallelism, action, state);
+                ProcessBatch(batchJob, batch);

-                        // Wait for all workers to finish
-                        state.WaitCompletion(toExclusive);
+                // Might as well steal some work instead of just waiting,
+                // also helps prevent potential deadlocks from badly threaded code
+                while (Volatile.Read(ref batch.ItemsDone) < batch.Total && batch.Finished.WaitOne(0) == false)
+                    ThreadPool.Instance.TryCooperate();

-                        var ex = Interlocked.Exchange(ref state.ExceptionThrown, null);
-                        if (ex != null)
-                            throw ex;
-                    }
-                    finally
-                    {
-                        state.Release();
-                    }
-                }
+                var ex = Interlocked.Exchange(ref batch.ExceptionThrown, null);
+                if (ex != null)
+                    throw ex;
+            }
+            finally
+            {
+                batch.Release();
             }
         }

-        public static void For<TLocal>(int fromInclusive, int toExclusive, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<int, TLocal> action, [Pooled] Action<TLocal> finalizeLocal = null)
+        private static void TypeAdapter<TJob>(object obj) where TJob : IBatchJob
         {
-            using (Profile(action))
+            var batch = obj as BatchState<TJob>; // 'as' and assert instead of direct cast to improve performance
+            Debug.Assert(batch is not null);
+            try
             {
-                if (fromInclusive > toExclusive)
-                {
-                    var temp = fromInclusive;
-                    fromInclusive = toExclusive + 1;
-                    toExclusive = temp + 1;
-                }
-
-                var count = toExclusive - fromInclusive;
-                if (count == 0)
-                    return;
+                ProcessBatch(batch.Job, batch);
+            }
+            finally
+            {
+                batch.Release();
+            }
+        }

-                if (MaxDegreeOfParallelism <= 1 || count == 1)
-                {
-                    ExecuteBatch(fromInclusive, toExclusive, initializeLocal, action, finalizeLocal);
-                }
-                else
+        private static void ProcessBatch<TJob>(TJob job, [NotNull] BatchState<TJob> state) where TJob : IBatchJob
+        {
+            try
+            {
+                for (uint start; (start = Interlocked.Add(ref state.Index, state.ItemsPerBatch) - state.ItemsPerBatch) < state.Total;)
                 {
-                    var state = BatchState.Acquire();
-                    state.WorkDone = state.StartInclusive = fromInclusive;
-
-                    try
-                    {
-                        var batchCount = Math.Min(MaxDegreeOfParallelism, count);
-                        var batchSize = (count + (batchCount - 1)) / batchCount;
-
-                        // Kick off a worker, then perform work synchronously
-                        state.AddReference();
-                        Fork(toExclusive, batchSize, MaxDegreeOfParallelism, initializeLocal, action, finalizeLocal, state);
+                    uint end = Math.Min(start + state.ItemsPerBatch, state.Total);

-                        // Wait for all workers to finish
-                        state.WaitCompletion(toExclusive);
+                    job.Process((int)start, (int)end);

-                        var ex = Interlocked.Exchange(ref state.ExceptionThrown, null);
-                        if (ex != null)
-                            throw ex;
-                    }
-                    finally
+                    if (Interlocked.Add(ref state.ItemsDone, state.ItemsPerBatch) >= state.Total)
                     {
-                        state.Release();
+                        state.Finished.Set();
+                        break;
                     }
                 }
             }
-        }
-
-        public static void ForEach<T, TLocal>([NotNull] IReadOnlyList<T> collection, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<T, TLocal> action, [Pooled] Action<TLocal> finalizeLocal = null)
-        {
-            For(0, collection.Count, initializeLocal, (i, local) => action(collection[i], local), finalizeLocal);
+            catch (Exception e)
+            {
+                Interlocked.Exchange(ref state.ExceptionThrown, e);
+                throw;
+            }
         }

-        public static void ForEach<T>([NotNull] IReadOnlyList<T> collection, [Pooled] Action<T> action)
+        public static unsafe void ForBatched<T>(int items, in T parameter, delegate*<T, int, int, void> executeBatch)
         {
-            For(0, collection.Count, i => action(collection[i]));
+            var batchedDelegate = new BatchedDelegate<T>
+            {
+                Param = parameter,
+                Delegate = executeBatch,
+            };
+            ForBatched(items, batchedDelegate);
         }

-        public static void ForEach<T>([NotNull] List<T> collection, [Pooled] Action<T> action)
+        public static unsafe void ForBatched<T>(int items, ref T parameter, delegate*<ref T, int, int, void> executeBatch)
         {
-            For(0, collection.Count, i => action(collection[i]));
+            var batchedDelegate = new BatchedDelegateRef<T>
+            {
+                Param = parameter,
+                Delegate = executeBatch,
+            };
+            ForBatched(items, batchedDelegate);
         }

-        public static void ForEach<TKey, TValue>([NotNull] Dictionary<TKey, TValue> collection, [Pooled] Action<KeyValuePair<TKey, TValue>> action)
+        public static unsafe void ForBatched(int items, [Pooled] Action<int, int> executeBatch)
         {
-            if (MaxDegreeOfParallelism <= 1 || collection.Count <= 1)
-            {
-                ExecuteBatch(collection, 0, collection.Count, action);
-            }
-            else
+            var batchedDelegate = new BatchedDelegate<Action<int, int>>
             {
-                var state = BatchState.Acquire();
+                Param = executeBatch,
+                Delegate = &ForBatchedAction,
+            };
+            ForBatched(items, batchedDelegate);

-                try
-                {
-                    var batchCount = Math.Min(MaxDegreeOfParallelism, collection.Count);
-                    var batchSize = (collection.Count + (batchCount - 1)) / batchCount;
-
-                    // Kick off a worker, then perform work synchronously
-                    state.AddReference();
-                    Fork(collection, batchSize, MaxDegreeOfParallelism, action, state);
+            static void ForBatchedAction(Action<int, int> parameter, int from, int toExclusive)
+            {
+                parameter(from, toExclusive);
+            }
+        }

-                    // Wait for all workers to finish
-                    state.WaitCompletion(collection.Count);
+        public static unsafe void For(int fromInclusive, int toExclusive, [Pooled] Action<int> action)
+        {
+            var parameters = (action, fromInclusive);
+            ForBatched(toExclusive - fromInclusive, parameters, &ForWrapped);

-                    var ex = Interlocked.Exchange(ref state.ExceptionThrown, null);
-                    if (ex != null)
-                        throw ex;
-                }
-                finally
+            static void ForWrapped((Action<int> action, int start) parameters, int from, int toExclusive)
+            {
+                for (int i = from; i < toExclusive; i++)
                 {
-                    state.Release();
+                    parameters.action(parameters.start + i);
                 }
             }
         }

-        public static void ForEach<TKey, TValue, TLocal>([NotNull] Dictionary<TKey, TValue> collection, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<KeyValuePair<TKey, TValue>, TLocal> action, [Pooled] Action<TLocal> finalizeLocal = null)
+        public static unsafe void For<TLocal>(int fromInclusive, int toExclusive, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<int, TLocal> action, [Pooled] Action<TLocal> finalizeLocal = null)
         {
-            if (MaxDegreeOfParallelism <= 1 || collection.Count <= 1)
-            {
-                ExecuteBatch(collection, 0, collection.Count, initializeLocal, action, finalizeLocal);
-            }
-            else
-            {
-                var state = BatchState.Acquire();
+            var parameters = (initializeLocal, action, finalizeLocal, fromInclusive);
+            ForBatched(toExclusive - fromInclusive, parameters, &ForWrapped);

+            static void ForWrapped((Func<TLocal> initializeLocal, Action<int, TLocal> action, Action<TLocal> finalizeLocal, int start) parameters, int from, int toExclusive)
+            {
+                TLocal local = default(TLocal);
                 try
                 {
-                    var batchCount = Math.Min(MaxDegreeOfParallelism, collection.Count);
-                    var batchSize = (collection.Count + (batchCount - 1)) / batchCount;
-
-                    // Kick off a worker, then perform work synchronously
-                    state.AddReference();
-                    Fork(collection, batchSize, MaxDegreeOfParallelism, initializeLocal, action, finalizeLocal, state);
-
-                    // Wait for all workers to finish
-                    state.WaitCompletion(collection.Count);
+                    if (parameters.initializeLocal != null)
+                    {
+                        local = parameters.initializeLocal.Invoke();
+                    }

-                    var ex = Interlocked.Exchange(ref state.ExceptionThrown, null);
-                    if (ex != null)
-                        throw ex;
+                    for (int i = from; i < toExclusive; i++)
+                    {
+                        parameters.action(parameters.start + i, local);
+                    }
                 }
                 finally
                 {
-                    state.Release();
+                    parameters.finalizeLocal?.Invoke(local);
                 }
             }
         }

-        public static void ForEach<T>([NotNull] FastCollection<T> collection, [Pooled] Action<T> action)
-        {
-            For(0, collection.Count, i => action(collection[i]));
-        }
-
-        public static void ForEach<T>([NotNull] FastList<T> collection, [Pooled] Action<T> action)
+        public static void ForEach<T>([NotNull] T[] collection, [Pooled] Action<T> action)
         {
-            For(0, collection.Count, i => action(collection.Items[i]));
+            ForEach<T, T[]>(collection, action);
         }

         public static void ForEach<T>([NotNull] ConcurrentCollector<T> collection, [Pooled] Action<T> action)
         {
-            For(0, collection.Count, i => action(collection.Items[i]));
+            ForEach<T, ConcurrentCollector<T>>(collection, action);
         }

-        public static void ForEach<T>([NotNull] FastList<T> collection, [Pooled] ValueAction<T> action)
+        public static void ForEach<T>([NotNull] List<T> collection, [Pooled] Action<T> action)
         {
-            For(0, collection.Count, i => action(ref collection.Items[i]));
+            ForEach<T, List<T>>(collection, action);
         }

-        public static void ForEach<T>([NotNull] ConcurrentCollector<T> collection, [Pooled] ValueAction<T> action)
+        public static void ForEach<T, TLocal>([NotNull] ConcurrentCollector<T> collection, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<T, TLocal> action, [Pooled] Action<TLocal> finalizeLocal = null)
         {
-            For(0, collection.Count, i => action(ref collection.Items[i]));
+            ForEach<T, TLocal, ConcurrentCollector<T>>(collection, initializeLocal, action, finalizeLocal);
         }

-        private static void Fork<TKey, TValue>([NotNull] Dictionary<TKey, TValue> collection, int batchSize, int maxDegreeOfParallelism, [Pooled] Action<KeyValuePair<TKey, TValue>> action, [NotNull] BatchState state)
+        public static void ForEach<T>([NotNull] FastCollection<T> collection, [Pooled] Action<T> action)
         {
-            // Other threads already processed all work before this one started.
-            if (state.StartInclusive >= collection.Count)
-            {
-                state.Release();
-                return;
-            }
-
-            // Kick off another worker if there's any work left
-            if (maxDegreeOfParallelism > 1 && state.StartInclusive + batchSize < collection.Count)
-            {
-                int workToSchedule = maxDegreeOfParallelism - 1;
-                for (int i = 0; i < workToSchedule; i++)
-                {
-                    state.AddReference();
-                }
-                ThreadPool.Instance.QueueWorkItem(() => Fork(collection, batchSize, 0, action, state), workToSchedule);
-            }
-
-            try
-            {
-                // Process batches synchronously as long as there are any
-                int newStart;
-                while ((newStart = Interlocked.Add(ref state.StartInclusive, batchSize)) - batchSize < collection.Count)
-                {
-                    try
-                    {
-                        // TODO: Reuse enumerator when processing multiple batches synchronously
-                        var start = newStart - batchSize;
-                        ExecuteBatch(collection, newStart - batchSize, Math.Min(collection.Count, newStart) - start, action);
-                    }
-                    finally
-                    {
-                        if (Interlocked.Add(ref state.WorkDone, batchSize) >= collection.Count)
-                        {
-                            // Don't wait for other threads to wake up and signal the BatchState, release as soon as work is finished
-                            state.Finished.Set();
-                        }
-                    }
-                }
-            }
-            catch (Exception e)
-            {
-                Interlocked.Exchange(ref state.ExceptionThrown, e);
-                throw;
-            }
-            finally
-            {
-                state.Release();
-            }
+            ForEach<T, FastCollection<T>>(collection, action);
         }

-        private static void Fork<TKey, TValue, TLocal>([NotNull] Dictionary<TKey, TValue> collection, int batchSize, int maxDegreeOfParallelism, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<KeyValuePair<TKey, TValue>, TLocal> action, [Pooled] Action<TLocal> finalizeLocal, [NotNull] BatchState state)
+        public static unsafe void ForEach<T>([NotNull] ConcurrentCollector<T> collection, [Pooled] ValueAction<T> action)
         {
-            // Other threads already processed all work before this one started.
-            if (state.StartInclusive >= collection.Count)
-            {
-                state.Release();
-                return;
-            }
+            var parameters = (action, collection);
+            ForBatched(collection.Count, parameters, &ForEachList);

-            // Kick off another worker if there's any work left
-            if (maxDegreeOfParallelism > 1 && state.StartInclusive + batchSize < collection.Count)
+            static void ForEachList((ValueAction<T> action, ConcurrentCollector<T> collection) parameters, int from, int toExclusive)
             {
-                int workToSchedule = maxDegreeOfParallelism - 1;
-                for (int i = 0; i < workToSchedule; i++)
+                for (int i = from; i < toExclusive; i++)
                 {
-                    state.AddReference();
+                    parameters.action(ref parameters.collection.Items[i]);
                 }
-                ThreadPool.Instance.QueueWorkItem(() => Fork(collection, batchSize, 0, initializeLocal, action, finalizeLocal, state), workToSchedule);
-            }
-
-            try
-            {
-                // Process batches synchronously as long as there are any
-                int newStart;
-                while ((newStart = Interlocked.Add(ref state.StartInclusive, batchSize)) - batchSize < collection.Count)
-                {
-                    try
-                    {
-                        // TODO: Reuse enumerator when processing multiple batches synchronously
-                        var start = newStart - batchSize;
-                        ExecuteBatch(collection, newStart - batchSize, Math.Min(collection.Count, newStart) - start, initializeLocal, action, finalizeLocal);
-                    }
-                    finally
-                    {
-                        if (Interlocked.Add(ref state.WorkDone, batchSize) >= collection.Count)
-                        {
-                            // Don't wait for other threads to wake up and signal the BatchState, release as soon as work is finished
-                            state.Finished.Set();
-                        }
-                    }
-                }
-            }
-            catch (Exception e)
-            {
-                Interlocked.Exchange(ref state.ExceptionThrown, e);
-                throw;
-            }
-            finally
-            {
-                state.Release();
             }
         }

-        private static void ExecuteBatch(int fromInclusive, int toExclusive, [Pooled] Action<int> action)
+        public static unsafe void ForEach<T, TList>([NotNull] TList collection, [Pooled] Action<T> action) where TList : IReadOnlyList<T>
         {
-            for (var i = fromInclusive; i < toExclusive; i++)
-            {
-                action(i);
-            }
-        }
+            var parameters = (action, collection);
+            ForBatched(collection.Count, parameters, &ForEachList);

-        private static void ExecuteBatch<TLocal>(int fromInclusive, int toExclusive, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<int, TLocal> action, [Pooled] Action<TLocal> finalizeLocal)
-        {
-            var local = default(TLocal);
-            try
+            static void ForEachList((Action<T> action, TList collection) parameters, int from, int toExclusive)
             {
-                if (initializeLocal != null)
-                {
-                    local = initializeLocal();
-                }
-
-                for (var i = fromInclusive; i < toExclusive; i++)
+                for (int i = from; i < toExclusive; i++)
                 {
-                    action(i, local);
+                    parameters.action(parameters.collection[i]);
                 }
             }
-            finally
-            {
-                finalizeLocal?.Invoke(local);
-            }
         }

-        private static void Fork(int endExclusive, int batchSize, int maxDegreeOfParallelism, [Pooled] Action<int> action, [NotNull] BatchState state)
+        public static unsafe void ForEach<T, TLocal, TList>([NotNull] TList collection, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<T, TLocal> action, [Pooled] Action<TLocal> finalizeLocal = null) where TList : IReadOnlyList<T>
         {
-            // Other threads already processed all work before this one started.
-            if (state.StartInclusive >= endExclusive)
-            {
-                state.Release();
-                return;
-            }
+            var parameters = (initializeLocal, action, finalizeLocal, collection);
+            ForBatched(collection.Count, parameters, &ForEachList);

-            // Kick off another worker if there's any work left
-            if (maxDegreeOfParallelism > 1 && state.StartInclusive + batchSize < endExclusive)
+            static void ForEachList((Func<TLocal> initializeLocal, Action<T, TLocal> action, Action<TLocal> finalizeLocal, TList collection) parameters, int from, int toExclusive)
             {
-                int workToSchedule = maxDegreeOfParallelism - 1;
-                for (int i = 0; i < workToSchedule; i++)
-                {
-                    state.AddReference();
-                }
-                ThreadPool.Instance.QueueWorkItem(() => Fork(endExclusive, batchSize, 0, action, state), workToSchedule);
-            }
-
-            try
-            {
-                // Process batches synchronously as long as there are any
-                int newStart;
-                while ((newStart = Interlocked.Add(ref state.StartInclusive, batchSize)) - batchSize < endExclusive)
+                TLocal local = default(TLocal);
+                try
                 {
-                    try
+                    if (parameters.initializeLocal != null)
                     {
-                        ExecuteBatch(newStart - batchSize, Math.Min(endExclusive, newStart), action);
+                        local = parameters.initializeLocal.Invoke();
                     }
-                    finally
+
+                    for (int i = from; i < toExclusive; i++)
                     {
-                        if (Interlocked.Add(ref state.WorkDone, batchSize) >= endExclusive)
-                        {
-                            // Don't wait for other threads to wake up and signal the BatchState, release as soon as work is finished
-                            state.Finished.Set();
-                        }
+                        parameters.action(parameters.collection[i], local);
                     }
                 }
-            }
-            catch (Exception e)
-            {
-                Interlocked.Exchange(ref state.ExceptionThrown, e);
-                throw;
-            }
-            finally
-            {
-                state.Release();
+                finally
+                {
+                    parameters.finalizeLocal?.Invoke(local);
+                }
             }
         }

-        private static void Fork<TLocal>(int endExclusive, int batchSize, int maxDegreeOfParallelism, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<int, TLocal> action, [Pooled] Action<TLocal> finalizeLocal, [NotNull] BatchState state)
+        public static unsafe void ForEach<TKey, TValue>([NotNull] Dictionary<TKey, TValue> collection, [Pooled] Action<KeyValuePair<TKey, TValue>> action)
         {
-            // Other threads already processed all work before this one started.
-            if (state.StartInclusive >= endExclusive)
-            {
-                state.Release();
-                return;
-            }
+            var parameters = (action, collection);
+            ForBatched(collection.Count, parameters, &ForEachDict);

-            // Kick off another worker if there's any work left
-            if (maxDegreeOfParallelism > 1 && state.StartInclusive + batchSize < endExclusive)
+            static void ForEachDict((Action<KeyValuePair<TKey, TValue>> action, Dictionary<TKey, TValue> collection) parameters, int from, int toExclusive)
             {
-                int workToSchedule = maxDegreeOfParallelism - 1;
-                for (int i = 0; i < workToSchedule; i++)
+                using var enumerator = parameters.collection.GetEnumerator();
+
+                // Skip to offset
+                for (int i = 0; i < from; i++)
                 {
-                    state.AddReference();
+                    enumerator.MoveNext();
                 }
-                ThreadPool.Instance.QueueWorkItem(() => Fork(endExclusive, batchSize, 0, initializeLocal, action, finalizeLocal, state), workToSchedule);
-            }

-            try
-            {
-                // Process batches synchronously as long as there are any
-                int newStart;
-                while ((newStart = Interlocked.Add(ref state.StartInclusive, batchSize)) - batchSize < endExclusive)
+                // Process batch
+                for (int i = from; i < toExclusive && enumerator.MoveNext(); i++)
                 {
-                    try
-                    {
-                        ExecuteBatch(newStart - batchSize, Math.Min(endExclusive, newStart), initializeLocal, action, finalizeLocal);
-                    }
-                    finally
-                    {
-                        if (Interlocked.Add(ref state.WorkDone, batchSize) >= endExclusive)
-                        {
-                            // Don't wait for other threads to wake up and signal the BatchState, release as soon as work is finished
-                            state.Finished.Set();
-                        }
-                    }
+                    parameters.action(enumerator.Current);
                 }
             }
-            catch (Exception e)
-            {
-                Interlocked.Exchange(ref state.ExceptionThrown, e);
-                throw;
-            }
-            finally
-            {
-                state.Release();
-            }
         }

-        private static void ExecuteBatch<TKey, TValue>([NotNull] Dictionary<TKey, TValue> dictionary, int offset, int count, [Pooled] Action<KeyValuePair<TKey, TValue>> action)
+        public static unsafe void ForEach<TKey, TValue, TLocal>([NotNull] Dictionary<TKey, TValue> collection, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<KeyValuePair<TKey, TValue>, TLocal> action, [Pooled] Action<TLocal> finalizeLocal = null)
         {
-            var enumerator = dictionary.GetEnumerator();
-            var index = 0;
+            var parameters = (initializeLocal, action, finalizeLocal, collection);
+            ForBatched(collection.Count, parameters, &ForEachDict);

-            // Skip to offset
-            while (index < offset && enumerator.MoveNext())
+            static void ForEachDict((Func<TLocal> initializeLocal, Action<KeyValuePair<TKey, TValue>, TLocal> action, Action<TLocal> finalizeLocal, Dictionary<TKey, TValue> collection) parameters, int from, int toExclusive)
             {
-                index++;
-            }
+                using var enumerator = parameters.collection.GetEnumerator();

-            // Process batch
-            while (index < offset + count && enumerator.MoveNext())
-            {
-                action(enumerator.Current);
-                index++;
-            }
-        }
-
-        private static void ExecuteBatch<TKey, TValue, TLocal>([NotNull] Dictionary<TKey, TValue> dictionary, int offset, int count, [Pooled] Func<TLocal> initializeLocal, [Pooled] Action<KeyValuePair<TKey, TValue>, TLocal> action, [Pooled] Action<TLocal> finalizeLocal)
-        {
-            var local = default(TLocal);
-            try
-            {
-                if (initializeLocal != null)
+                for (int i = 0; i < from; i++) // Skip to the start of our batch
                 {
-                    local = initializeLocal();
+                    enumerator.MoveNext();
                 }

-                var enumerator = dictionary.GetEnumerator();
-                var index = 0;
-
-                // Skip to offset
-                while (index < offset && enumerator.MoveNext())
+                TLocal local = default;
+                try
                 {
-                    index++;
-                }
+                    if (parameters.initializeLocal != null)
+                        local = parameters.initializeLocal.Invoke();

-                // Process batch
-                while (index < offset + count && enumerator.MoveNext())
+                    for (int i = from; i < toExclusive && enumerator.MoveNext(); i++)
+                    {
+                        parameters.action(enumerator.Current, local);
+                    }
+                }
+                finally
                 {
-                    action(enumerator.Current, local);
-                    index++;
+                    parameters.finalizeLocal?.Invoke(local);
                 }
             }
-            finally
-            {
-                finalizeLocal?.Invoke(local);
-            }
         }

         public static void Sort<T>([NotNull] ConcurrentCollector<T> collection, IComparer<T> comparer)
@@ -665,51 +487,84 @@ private static void Swap<T>([NotNull] T[] collection, int i, int j)
             collection[j] = temp;
         }

-        private class BatchState
+        /// <summary>
+        /// An implementation of a job running in batches.
+        /// Implementing this as a struct improves performance as the JIT would have an easier time inlining the call.
+        /// Implementing this as a class would provide more utility as this object would be shared across all threads,
+        /// allowing for interlocked operations and other communication between threads.
+        /// </summary>
+        public interface IBatchJob
+        {
+            /// <summary>
+            /// Execute this job over a range of items
+            /// </summary>
+            /// <param name="start">the start of the range</param>
+            /// <param name="endExclusive">the end of the range, iterate as long as i &lt; endExclusive</param>
+            void Process(int start, int endExclusive);
+        }
+
+        private sealed class BatchState<TJob> where TJob : IBatchJob
         {
-            private static readonly ConcurrentPool<BatchState> Pool = new ConcurrentPool<BatchState>(() => new BatchState());
+            private static readonly ConcurrentStack<BatchState<TJob>> Pool = new();

             private int referenceCount;

-            public readonly ManualResetEvent Finished = new ManualResetEvent(false);
+            public readonly ManualResetEvent Finished = new(false);

-            public int StartInclusive;
+            public uint Index, Total, ItemsPerBatch, ItemsDone;

-            public int WorkDone;
+            public TJob Job;

             public Exception ExceptionThrown;

             [NotNull]
-            public static BatchState Acquire()
+            public static BatchState<TJob> Borrow(uint itemsPerBatch, uint endExclusive, int references, TJob job)
             {
-                var state = Pool.Acquire();
-                state.referenceCount = 1;
-                state.StartInclusive = 0;
-                state.WorkDone = 0;
+                if (Pool.TryPop(out var state) == false)
+                    state = new();
+
+                state.Index = 0;
+                state.Total = endExclusive;
+                state.ItemsPerBatch = itemsPerBatch;
+                state.ItemsDone = 0;
                 state.ExceptionThrown = null;
-                state.Finished.Reset();
+                state.referenceCount = references;
+                state.Job = job;
                 return state;
             }

-            public void AddReference()
-            {
-                Interlocked.Increment(ref referenceCount);
-            }
-
             public void Release()
             {
-                if (Interlocked.Decrement(ref referenceCount) == 0)
+                var refCount = Interlocked.Decrement(ref referenceCount);
+                if (refCount == 0)
                 {
-                    Pool.Release(this);
+                    Job = default; // Clear any references it may hold onto
+                    Finished.Reset();
+                    Pool.Push(this);
                 }
+                Debug.Assert(refCount >= 0);
             }
-
-            public void WaitCompletion(int end)
+        }
+
+        struct BatchedDelegateRef<T> : IBatchJob
+        {
+            public T Param;
+            public unsafe delegate*<ref T, int, int, void> Delegate;
+
+            public unsafe void Process(int start, int endExclusive)
             {
-                // Might as well steal some work instead of just waiting,
-                // also helps prevent potential deadlocks from badly threaded code
-                while(WorkDone < end && Finished.WaitOne(0) == false)
-                    ThreadPool.Instance.TryCooperate();
+                Delegate(ref Param, start, endExclusive);
+            }
+        }
+
+        struct BatchedDelegate<T> : IBatchJob
+        {
+            public T Param;
+            public unsafe delegate*<T, int, int, void> Delegate;
+
+            public unsafe void Process(int start, int endExclusive)
+            {
+                Delegate(Param, start, endExclusive);
             }
         }
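For context, a minimal sketch of how a consumer drives the new ForBatched entry point above. This is illustrative only and not part of this change: ScaleJob and values are hypothetical names.

    // Hypothetical job: scales a float array in parallel batches
    struct ScaleJob : Dispatcher.IBatchJob
    {
        public float[] Values;
        public float Factor;

        // Called concurrently; each call receives a distinct [start, endExclusive) range
        public void Process(int start, int endExclusive)
        {
            for (int i = start; i < endExclusive; i++)
                Values[i] *= Factor;
        }
    }

    float[] values = new float[10_000];
    // Struct job: lowest overhead, each worker thread operates on its own copy of the job
    Dispatcher.ForBatched(values.Length, new ScaleJob { Values = values, Factor = 2f });

Because the job is a struct constrained to IBatchJob, the generic instantiation lets the JIT devirtualize and inline Process, which is the point of the interface design described above.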
diff --git a/sources/core/Stride.Core/Threading/ThreadPool.SemaphoreW.cs b/sources/core/Stride.Core/Threading/ThreadPool.SemaphoreW.cs
index 2ee9908462..93bf23f472 100644
--- a/sources/core/Stride.Core/Threading/ThreadPool.SemaphoreW.cs
+++ b/sources/core/Stride.Core/Threading/ThreadPool.SemaphoreW.cs
@@ -15,7 +15,7 @@ public sealed partial class ThreadPool
         /// <summary>
         /// Mostly lifted from dotnet's LowLevelLifoSemaphore
        /// </summary>
-        private class SemaphoreW
+        private class SemaphoreW : ISemaphore
         {
             private const int SpinSleep0Threshold = 10;
@@ -60,7 +60,9 @@ public SemaphoreW(int spinCountParam)

             public void Wait(int timeout = -1) => internals.Wait(spinCount, lifoSemaphore, timeout);

-            public void Release(int releaseCount) => internals.Release(releaseCount, lifoSemaphore);
+            public void Release(int count) => internals.Release(count, lifoSemaphore);
+
+            public void Dispose() => lifoSemaphore?.Dispose();

             [StructLayout(LayoutKind.Explicit)]
             private struct Counts
@@ -367,4 +369,4 @@ private struct PaddingFalseSharing
 #endif
     }
 }
-}
\ No newline at end of file
+}
diff --git a/sources/core/Stride.Core/Threading/ThreadPool.cs b/sources/core/Stride.Core/Threading/ThreadPool.cs
index 3951ea39a6..8424455f84 100644
--- a/sources/core/Stride.Core/Threading/ThreadPool.cs
+++ b/sources/core/Stride.Core/Threading/ThreadPool.cs
@@ -5,6 +5,8 @@
 using Stride.Core.Diagnostics;
 using System;
 using System.Collections.Concurrent;
+using System.Reflection;
+using System.Runtime.InteropServices;
 using System.Threading;

 namespace Stride.Core.Threading
@@ -15,6 +17,8 @@ namespace Stride.Core.Threading
     /// </summary>
     public sealed partial class ThreadPool : IDisposable
     {
+        private static readonly Logger Logger = GlobalLogger.GetLogger(nameof(ThreadPool));
+
         /// <summary>
         /// The default instance that the whole process shares, use this one to avoid wasting process memory.
         /// </summary>
@@ -28,9 +32,9 @@ public sealed partial class ThreadPool : IDisposable

         private static readonly ProfilingKey ProcessWorkItemKey = new ProfilingKey($"{nameof(ThreadPool)}.ProcessWorkItem");

-        private readonly ConcurrentQueue<Action> workItems = new ConcurrentQueue<Action>();
-        private readonly SemaphoreW semaphore;
-
+        private readonly ConcurrentQueue<Work> workItems = new ConcurrentQueue<Work>();
+        private readonly ISemaphore semaphore;
+
         private long completionCounter;
         private int workScheduled, threadsBusy;
         private int disposing;
@@ -47,8 +51,30 @@ public sealed partial class ThreadPool : IDisposable

         public ThreadPool(int? threadCount = null)
         {
-            semaphore = new SemaphoreW(spinCountParam:70);
-
+            int spinCount = 70;
+
+            if (RuntimeInformation.ProcessArchitecture is Architecture.Arm or Architecture.Arm64)
+            {
+                // Dotnet:
+                // On systems with ARM processors, more spin-waiting seems to be necessary to avoid perf regressions from incurring
+                // the full wait when work becomes available soon enough. This is more noticeable after reducing the number of
+                // thread requests made to the thread pool because otherwise the extra thread requests cause threads to do more
+                // busy-waiting instead and adding to contention in trying to look for work items, which is less preferable.
+                spinCount *= 4;
+            }
+            try
+            {
+                semaphore = new DotnetLifoSemaphore(spinCount);
+            }
+            catch (Exception e)
+            {
+                // For net6+ this should not happen; logging instead of throwing as this is just a performance regression
+                if (Environment.Version.Major >= 6)
+                    Logger.Warning($"Could not bind to dotnet's Lifo Semaphore, falling back to suboptimal semaphore:\n{e}");
+
+                semaphore = new SemaphoreW(spinCountParam: 70);
+            }
+
             WorkerThreadsCount = threadCount ?? (Environment.ProcessorCount == 1 ? 1 : Environment.ProcessorCount - 1);
             leftToDispose = WorkerThreadsCount;
             for (int i = 0; i < WorkerThreadsCount; i++)
@@ -66,7 +92,7 @@ static ThreadPool()
         /// Queue an action to run on one of the available threads,
         /// it is strongly recommended that the action takes less than a millisecond.
         /// </summary>
-        public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1)
+        public unsafe void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1)
         {
             // Throw right here to help debugging
             if (workItem == null)
@@ -85,10 +111,55 @@ public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1)
             }

             Interlocked.Add(ref workScheduled, amount);
+            var work = new Work { WorkHandler = &ActionHandler, Data = workItem };
             for (int i = 0; i < amount; i++)
             {
                 PooledDelegateHelper.AddReference(workItem);
-                workItems.Enqueue(workItem);
+                workItems.Enqueue(work);
             }
             semaphore.Release(amount);
         }
+
+        static void ActionHandler(object param)
+        {
+            Action action = (Action)param;
+            try
+            {
+                action();
+            }
+            finally
+            {
+                PooledDelegateHelper.Release(action);
+            }
+        }
+
+        /// <summary>
+        /// Queue a work item to run on one of the available threads,
+        /// it is strongly recommended that the action takes less than a millisecond.
+        /// Additionally, the parameter provided must be fixed from this call onward until the action has finished executing
+        /// </summary>
+        public unsafe void QueueUnsafeWorkItem(object parameter, delegate*<object, void> obj, int amount = 1)
+        {
+            if (parameter == null)
+            {
+                throw new NullReferenceException(nameof(parameter));
+            }
+
+            if (amount < 1)
+            {
+                throw new ArgumentOutOfRangeException(nameof(amount));
+            }
+
+            if (disposing > 0)
+            {
+                throw new ObjectDisposedException(ToString());
+            }
+
+            Interlocked.Add(ref workScheduled, amount);
+            var work = new Work { WorkHandler = obj, Data = parameter };
+            for (int i = 0; i < amount; i++)
+            {
+                workItems.Enqueue(work);
+            }
+            semaphore.Release(amount);
+        }
@@ -98,7 +169,7 @@ public void QueueWorkItem([NotNull, Pooled] Action workItem, int amount = 1)
         /// If you absolutely have to block inside one of the threadpool's threads for whatever
         /// reason do a busy loop over this function.
         /// </summary>
-        public bool TryCooperate()
+        public unsafe bool TryCooperate()
        {
             if (workItems.TryDequeue(out var workItem))
             {
@@ -106,12 +177,11 @@ public bool TryCooperate()
                 Interlocked.Decrement(ref workScheduled);
                 try
                 {
-                    using var _ = Profiler.Begin(ProcessWorkItemKey);
-                    workItem.Invoke();
+                    using (Profiler.Begin(ProcessWorkItemKey))
+                        workItem.WorkHandler(workItem.Data);
                 }
                 finally
                 {
-                    PooledDelegateHelper.Release(workItem);
                     Interlocked.Decrement(ref threadsBusy);
                     Interlocked.Increment(ref completionCounter);
                 }
@@ -172,14 +242,11 @@ public void Dispose()
             {
                 return;
             }
-
+            semaphore.Release(WorkerThreadsCount);
+            semaphore.Dispose();
             while (Volatile.Read(ref leftToDispose) != 0)
             {
-                if (semaphore.SignalCount == 0)
-                {
-                    semaphore.Release(1);
-                }
                 Thread.Yield();
             }
@@ -189,5 +256,38 @@ public void Dispose()
             }
         }

+        unsafe struct Work
+        {
+            public object Data;
+            public delegate*<object, void> WorkHandler;
+        }
+
+        private interface ISemaphore : IDisposable
+        {
+            public void Release(int count);
+            public void Wait(int timeout = -1);
+        }
+
+        private sealed class DotnetLifoSemaphore : ISemaphore
+        {
+            private readonly IDisposable semaphore;
+            private readonly Func<int, bool, bool> wait;
+            private readonly Action<int> release;
+
+            public DotnetLifoSemaphore(int spinCount)
+            {
+                // The semaphore Dotnet uses for its own threadpool is more efficient than what's publicly available,
+                // but sadly it is internal - we'll hijack it through reflection
+                Type lifoType = Type.GetType("System.Threading.LowLevelLifoSemaphore");
+                semaphore = Activator.CreateInstance(lifoType, new object[]{ 0, short.MaxValue, spinCount, new Action( () => {} ) }) as IDisposable;
+                wait = lifoType.GetMethod("Wait", BindingFlags.Instance | BindingFlags.Public).CreateDelegate<Func<int, bool, bool>>(semaphore);
+                release = lifoType.GetMethod("Release", BindingFlags.Instance | BindingFlags.Public).CreateDelegate<Action<int>>(semaphore);
+            }
+
+            public void Dispose() => semaphore.Dispose();
+            public void Release(int count) => release(count);
+            public void Wait(int timeout = -1) => wait(timeout, true);
+        }
     }
 }
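A sketch of the function-pointer overload that the engine-side diffs below migrate to. Illustrative only: DoubleAll and DoubleBatch are hypothetical names, not part of this change.

    static unsafe void DoubleAll(float[] values)
    {
        // The collection is passed as the job's parameter; the static local function
        // below is address-taken, so no closure or delegate allocation occurs.
        Dispatcher.ForBatched(values.Length, values, &DoubleBatch);

        static void DoubleBatch(float[] batch, int from, int toExclusive)
        {
            for (int i = from; i < toExclusive; i++)
                batch[i] *= 2f;
        }
    }

Under the hood this wraps the parameter and pointer in a BatchedDelegate<T> struct and forwards to the generic ForBatched, so it keeps the struct-job fast path while looking like an ordinary callback.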
diff --git a/sources/engine/Stride.Engine/Engine/Processors/TransformProcessor.cs b/sources/engine/Stride.Engine/Engine/Processors/TransformProcessor.cs
index fab91cf612..3ce996a3ec 100644
--- a/sources/engine/Stride.Engine/Engine/Processors/TransformProcessor.cs
+++ b/sources/engine/Stride.Engine/Engine/Processors/TransformProcessor.cs
@@ -93,9 +93,9 @@ protected override void OnEntityComponentRemoved(Entity entity, TransformComponent component)
             }
         }

-        internal void UpdateTransformations(FastCollection<TransformComponent> transformationComponents)
+        internal unsafe void UpdateTransformations(FastCollection<TransformComponent> transformationComponents)
         {
-            Dispatcher.ForEach(transformationComponents, UpdateTransformationsRecursive);
+            Dispatcher.ForBatched(transformationComponents.Count, transformationComponents, &UpdateTransformationsRecursive);

             // Re-update model node links to avoid one frame delay compared reference model (ideally entity should be sorted to avoid this in future).
             if (ModelNodeLinkProcessor != null)
@@ -105,17 +105,18 @@ internal void UpdateTransformations(FastCollection<TransformComponent> transformationComponents)
             {
                 modelNodeLinkComponents.Add(modelNodeLink.Entity.Transform);
             }
-            Dispatcher.ForEach(modelNodeLinkComponents, UpdateTransformationsRecursive);
+            Dispatcher.ForBatched(modelNodeLinkComponents.Count, modelNodeLinkComponents, &UpdateTransformationsRecursive);
             }
         }

-        private static void UpdateTransformationsRecursive(TransformComponent transform)
+        private static void UpdateTransformationsRecursive(FastCollection<TransformComponent> transforms, int from, int toExclusive)
         {
-            transform.UpdateLocalMatrix();
-            transform.UpdateWorldMatrixInternal(false);
-            foreach (var child in transform.Children)
+            for (int i = from; i < toExclusive; i++)
             {
-                UpdateTransformationsRecursive(child);
+                var transform = transforms[i];
+                transform.UpdateLocalMatrix();
+                transform.UpdateWorldMatrixInternal(false);
+                UpdateTransformationsRecursive(transform.Children, 0, transform.Children.Count);
             }
         }
diff --git a/sources/engine/Stride.Rendering/Rendering/Materials/MaterialRenderFeature.cs b/sources/engine/Stride.Rendering/Rendering/Materials/MaterialRenderFeature.cs
index 89e4da1de0..533b2c1c39 100644
--- a/sources/engine/Stride.Rendering/Rendering/Materials/MaterialRenderFeature.cs
+++ b/sources/engine/Stride.Rendering/Rendering/Materials/MaterialRenderFeature.cs
@@ -280,31 +280,35 @@ public override void Prepare(RenderDrawContext context)
             // Assign descriptor sets to each render node
             var resourceGroupPool = ((RootEffectRenderFeature)RootRenderFeature).ResourceGroupPool;
-            Dispatcher.For(0, RootRenderFeature.RenderNodes.Count, () => context.RenderContext.GetThreadContext(), (renderNodeIndex, threadContext) =>
+            Dispatcher.ForBatched(RootRenderFeature.RenderNodes.Count, (from, toExclusive) =>
             {
-                var renderNodeReference = new RenderNodeReference(renderNodeIndex);
-                var renderNode = RootRenderFeature.RenderNodes[renderNodeIndex];
-                var renderMesh = (RenderMesh)renderNode.RenderObject;
+                var threadContext = context.RenderContext.GetThreadContext();
+                for (int i = from; i < toExclusive; i++)
+                {
+                    var renderNodeReference = new RenderNodeReference(i);
+                    var renderNode = RootRenderFeature.RenderNodes[i];
+                    var renderMesh = (RenderMesh)renderNode.RenderObject;

-                // Ignore fallback effects
-                if (renderNode.RenderEffect.State != RenderEffectState.Normal)
-                    return;
+                    // Ignore fallback effects
+                    if (renderNode.RenderEffect.State != RenderEffectState.Normal)
+                        continue;

-                // Collect materials and create associated MaterialInfo (includes reflection) first time
-                // TODO: We assume same material will generate same ResourceGroup (i.e. same resources declared in same order)
-                // Need to offer some protection if this invariant is violated (or support it if it can actually happen in real scenario)
-                var material = renderMesh.MaterialPass;
-                var materialInfo = renderMesh.MaterialInfo;
-                var materialParameters = material.Parameters;
+                    // Collect materials and create associated MaterialInfo (includes reflection) first time
+                    // TODO: We assume same material will generate same ResourceGroup (i.e. same resources declared in same order)
+                    // Need to offer some protection if this invariant is violated (or support it if it can actually happen in real scenario)
+                    var material = renderMesh.MaterialPass;
+                    var materialInfo = renderMesh.MaterialInfo;
+                    var materialParameters = material.Parameters;

-                // Register resources usage
-                Context.StreamingManager?.StreamResources(materialParameters);
+                    // Register resources usage
+                    Context.StreamingManager?.StreamResources(materialParameters);

-                if (!UpdateMaterial(RenderSystem, threadContext, materialInfo, perMaterialDescriptorSetSlot.Index, renderNode.RenderEffect, materialParameters))
-                    return;
+                    if (!UpdateMaterial(RenderSystem, threadContext, materialInfo, perMaterialDescriptorSetSlot.Index, renderNode.RenderEffect, materialParameters))
+                        continue;

-                var descriptorSetPoolOffset = ((RootEffectRenderFeature)RootRenderFeature).ComputeResourceGroupOffset(renderNodeReference);
-                resourceGroupPool[descriptorSetPoolOffset + perMaterialDescriptorSetSlot.Index] = materialInfo.Resources;
+                    var descriptorSetPoolOffset = ((RootEffectRenderFeature)RootRenderFeature).ComputeResourceGroupOffset(renderNodeReference);
+                    resourceGroupPool[descriptorSetPoolOffset + perMaterialDescriptorSetSlot.Index] = materialInfo.Resources;
+                }
             });
         }
diff --git a/sources/engine/Stride.Rendering/Rendering/SkinningRenderFeature.cs b/sources/engine/Stride.Rendering/Rendering/SkinningRenderFeature.cs
index e05af8ed36..c453dfb92c 100644
--- a/sources/engine/Stride.Rendering/Rendering/SkinningRenderFeature.cs
+++ b/sources/engine/Stride.Rendering/Rendering/SkinningRenderFeature.cs
@@ -56,7 +56,7 @@ public override void PrepareEffectPermutations(RenderDrawContext context)
             int effectSlotCount = ((RootEffectRenderFeature)RootRenderFeature).EffectPermutationSlotCount;

             //foreach (var objectNodeReference in RootRenderFeature.ObjectNodeReferences)
-            Dispatcher.ForEach(((RootEffectRenderFeature)RootRenderFeature).ObjectNodeReferences, objectNodeReference =>
+            Dispatcher.ForEach(RootRenderFeature.ObjectNodeReferences, objectNodeReference =>
             {
                 var objectNode = RootRenderFeature.GetObjectNode(objectNodeReference);
                 var renderMesh = (RenderMesh)objectNode.RenderObject;
@@ -116,25 +116,29 @@ public override unsafe void Prepare(RenderDrawContext context)
         {
             var renderModelObjectInfoData = RootRenderFeature.RenderData.GetData(renderModelObjectInfoKey);

-            Dispatcher.ForEach(((RootEffectRenderFeature)RootRenderFeature).RenderNodes, (ref RenderNode renderNode) =>
+            Dispatcher.ForBatched(RootRenderFeature.RenderNodes.Count, (from, toExclusive) =>
             {
-                var perDrawLayout = renderNode.RenderEffect.Reflection?.PerDrawLayout;
-                if (perDrawLayout == null)
-                    return;
+                for (int i = from; i < toExclusive; i++)
+                {
+                    var renderNode = RootRenderFeature.RenderNodes[i];
+                    var perDrawLayout = renderNode.RenderEffect.Reflection?.PerDrawLayout;
+                    if (perDrawLayout == null)
+                        continue;

-                var blendMatricesOffset = perDrawLayout.GetConstantBufferOffset(blendMatrices);
-                if (blendMatricesOffset == -1)
-                    return;
+                    var blendMatricesOffset = perDrawLayout.GetConstantBufferOffset(blendMatrices);
+                    if (blendMatricesOffset == -1)
+                        continue;

-                var renderModelObjectInfo = renderModelObjectInfoData[renderNode.RenderObject.ObjectNode];
-                if (renderModelObjectInfo == null)
-                    return;
+                    var renderModelObjectInfo = renderModelObjectInfoData[renderNode.RenderObject.ObjectNode];
+                    if (renderModelObjectInfo == null)
+                        continue;

-                var mappedCB = (byte*)renderNode.Resources.ConstantBuffer.Data + blendMatricesOffset;
+                    var mappedCB = (byte*)renderNode.Resources.ConstantBuffer.Data + blendMatricesOffset;

-                fixed (Matrix* blendMatricesPtr = renderModelObjectInfo)
-                {
-                    Unsafe.CopyBlockUnaligned(mappedCB, blendMatricesPtr, (uint)renderModelObjectInfo.Length * (uint)sizeof(Matrix));
+                    fixed (Matrix* blendMatricesPtr = renderModelObjectInfo)
+                    {
+                        Unsafe.CopyBlockUnaligned(mappedCB, blendMatricesPtr, (uint)renderModelObjectInfo.Length * (uint)sizeof(Matrix));
+                    }
                 }
             });
         }
diff --git a/sources/engine/Stride.Rendering/Rendering/TransformRenderFeature.cs b/sources/engine/Stride.Rendering/Rendering/TransformRenderFeature.cs
index 79d229ef24..766284d84e 100644
--- a/sources/engine/Stride.Rendering/Rendering/TransformRenderFeature.cs
+++ b/sources/engine/Stride.Rendering/Rendering/TransformRenderFeature.cs
@@ -160,51 +160,55 @@ public override unsafe void Prepare(RenderDrawContext context)
             // Update PerDraw (World, WorldViewProj, etc...)
             // Copy Entity.World to PerDraw cbuffer
             // TODO: Have a PerObject cbuffer?
-            Dispatcher.ForEach(((RootEffectRenderFeature)RootRenderFeature).RenderNodes, (ref RenderNode renderNode) =>
+            Dispatcher.ForBatched(RootRenderFeature.RenderNodes.Count, (from, toExclusive) =>
             {
-                var perDrawLayout = renderNode.RenderEffect.Reflection?.PerDrawLayout;
-                if (perDrawLayout == null)
-                    return;
+                for (int i = from; i < toExclusive; i++)
+                {
+                    var renderNode = RootRenderFeature.RenderNodes[i];
+                    var perDrawLayout = renderNode.RenderEffect.Reflection?.PerDrawLayout;
+                    if (perDrawLayout == null)
+                        continue;

-                var worldOffset = perDrawLayout.GetConstantBufferOffset(this.world);
-                var worldInverseOffset = perDrawLayout.GetConstantBufferOffset(this.worldInverse);
+                    var worldOffset = perDrawLayout.GetConstantBufferOffset(this.world);
+                    var worldInverseOffset = perDrawLayout.GetConstantBufferOffset(this.worldInverse);

-                if (worldOffset == -1 && worldInverseOffset == -1)
-                    return;
+                    if (worldOffset == -1 && worldInverseOffset == -1)
+                        continue;

-                ref var renderModelObjectInfo = ref renderModelObjectInfoData[renderNode.RenderObject.ObjectNode];
-                ref var renderModelViewInfo = ref renderModelViewInfoData[renderNode.ViewObjectNode];
+                    ref var renderModelObjectInfo = ref renderModelObjectInfoData[renderNode.RenderObject.ObjectNode];
+                    ref var renderModelViewInfo = ref renderModelViewInfoData[renderNode.ViewObjectNode];

-                var mappedCB = renderNode.Resources.ConstantBuffer.Data;
-                if (worldOffset != -1)
-                {
-                    var world = (Matrix*)((byte*)mappedCB + worldOffset);
-                    *world = renderModelObjectInfo.World;
-                }
-
-                if (worldInverseOffset != -1)
-                {
-                    var perDraw = (PerDrawExtra*)((byte*)mappedCB + worldInverseOffset);
+                    var mappedCB = renderNode.Resources.ConstantBuffer.Data;
+                    if (worldOffset != -1)
+                    {
+                        var world = (Matrix*)((byte*)mappedCB + worldOffset);
+                        *world = renderModelObjectInfo.World;
+                    }

-                    // Fill PerDraw
-                    var perDrawData = new PerDrawExtra
+                    if (worldInverseOffset != -1)
                     {
-                        WorldView = renderModelViewInfo.WorldView,
-                        WorldViewProjection = renderModelViewInfo.WorldViewProjection,
-                    };
+                        var perDraw = (PerDrawExtra*)((byte*)mappedCB + worldInverseOffset);
+
+                        // Fill PerDraw
+                        var perDrawData = new PerDrawExtra
+                        {
+                            WorldView = renderModelViewInfo.WorldView,
+                            WorldViewProjection = renderModelViewInfo.WorldViewProjection,
+                        };

-                    Matrix.Invert(ref renderModelObjectInfo.World, out perDrawData.WorldInverse);
-                    Matrix.Transpose(ref perDrawData.WorldInverse, out perDrawData.WorldInverseTranspose);
-                    Matrix.Invert(ref renderModelViewInfo.WorldView, out perDrawData.WorldViewInverse);
+                        Matrix.Invert(ref renderModelObjectInfo.World, out perDrawData.WorldInverse);
+                        Matrix.Transpose(ref perDrawData.WorldInverse, out perDrawData.WorldInverseTranspose);
+                        Matrix.Invert(ref renderModelViewInfo.WorldView, out perDrawData.WorldViewInverse);

-                    perDrawData.WorldScale = new Vector3(
-                        ((Vector3)renderModelObjectInfo.World.Row1).Length(),
-                        ((Vector3)renderModelObjectInfo.World.Row2).Length(),
-                        ((Vector3)renderModelObjectInfo.World.Row3).Length());
+                        perDrawData.WorldScale = new Vector3(
+                            ((Vector3)renderModelObjectInfo.World.Row1).Length(),
+                            ((Vector3)renderModelObjectInfo.World.Row2).Length(),
+                            ((Vector3)renderModelObjectInfo.World.Row3).Length());

-                    perDrawData.EyeMS = new Vector4(perDrawData.WorldInverse.M41, perDrawData.WorldInverse.M42, perDrawData.WorldInverse.M43, 1.0f);
+                        perDrawData.EyeMS = new Vector4(perDrawData.WorldInverse.M41, perDrawData.WorldInverse.M42, perDrawData.WorldInverse.M43, 1.0f);

-                    *perDraw = perDrawData;
+                        *perDraw = perDrawData;
+                    }
                 }
             });
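Finally, a sketch of the retained thread-local For overload under the new batching scheme. Illustrative only: IsPrime is an assumed helper, and ConcurrentBag comes from System.Collections.Concurrent. Note that initializeLocal and finalizeLocal run once per batch, not once per item.

    var primes = new ConcurrentBag<int>();
    Dispatcher.For(0, 100_000,
        initializeLocal: () => new List<int>(),                    // created once per batch
        action: (i, local) => { if (IsPrime(i)) local.Add(i); },   // runs for every item in the batch
        finalizeLocal: local => { foreach (var p in local) primes.Add(p); }); // merged once per batch

This keeps per-item work allocation-free and pushes synchronization cost to the batch boundary, which matches the batch-granular work stealing that ForBatched performs above.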