// TaskScheduler.h
// Forked from dougbinks/enkiTS.
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#pragma once
#include <assert.h>
#include <stdint.h>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <thread>
#include <utility>
// ENKITS_TASK_PRIORITIES_NUM can be set from 1 to 5.
// 1 corresponds to effectively no priorities.
#ifndef ENKITS_TASK_PRIORITIES_NUM
#define ENKITS_TASK_PRIORITIES_NUM 3
#endif
#if defined(_WIN32) && defined(ENKITS_BUILD_DLL)
// Building enkiTS as a DLL
#define ENKITS_API __declspec(dllexport)
#elif defined(_WIN32) && defined(ENKITS_DLL)
// Using enkiTS as a DLL
#define ENKITS_API __declspec(dllimport)
#elif defined(__GNUC__) && defined(ENKITS_BUILD_DLL)
// Building enkiTS as a shared library
#define ENKITS_API __attribute__((visibility("default")))
#else
#define ENKITS_API
#endif
// Define ENKI_CUSTOM_ALLOC_FILE_AND_LINE (at project level) to get file and line report in custom allocators,
// this is default in Debug - to turn off define ENKI_CUSTOM_ALLOC_NO_FILE_AND_LINE
#ifndef ENKI_CUSTOM_ALLOC_FILE_AND_LINE
#if defined(_DEBUG ) && !defined(ENKI_CUSTOM_ALLOC_NO_FILE_AND_LINE)
#define ENKI_CUSTOM_ALLOC_FILE_AND_LINE
#endif
#endif
namespace enki
{
// TaskSetPartition - the sub-range of a task set which is passed to a single
// ITaskSet::ExecuteRange call.
// NOTE(review): the range is presumably half-open, i.e. [start, end) - confirm
// against the scheduler implementation.
struct TaskSetPartition
{
// start of the range; the ExecuteRange documentation states start < end
uint32_t start;
// end of the range
uint32_t end;
};
class TaskScheduler;
class TaskPipe;
class PinnedTaskList;
struct ThreadArgs;
struct ThreadDataStore;
struct SubTaskSet;
struct semaphoreid_t;
uint32_t GetNumHardwareThreads();
// TaskPriority - priority values for tasks, starting at TASK_PRIORITY_HIGH ( value 0 ).
// Which enumerators exist depends on ENKITS_TASK_PRIORITIES_NUM:
//   1: HIGH
//   2: HIGH, LOW
//   3: HIGH, MED, LOW
//   4: HIGH, MED_HI, MED, LOW
//   5: HIGH, MED_HI, MED, MED_LO, LOW
// Values outside 1 to 5 are not rejected here: anything below 1 yields the same
// enumerators as 1, anything above 5 the same as 5.
enum TaskPriority
{
TASK_PRIORITY_HIGH = 0,
#if ( ENKITS_TASK_PRIORITIES_NUM > 3 )
TASK_PRIORITY_MED_HI,
#endif
#if ( ENKITS_TASK_PRIORITIES_NUM > 2 )
TASK_PRIORITY_MED,
#endif
#if ( ENKITS_TASK_PRIORITIES_NUM > 4 )
TASK_PRIORITY_MED_LO,
#endif
#if ( ENKITS_TASK_PRIORITIES_NUM > 1 )
TASK_PRIORITY_LOW,
#endif
// Always last, so its value is the number of priorities declared above it.
TASK_PRIORITY_NUM
};
// ICompletable - base class providing the completion check shared by all task types.
// Not intended to be used directly: derive from ITaskSet or IPinnedTask instead.
class ICompletable
{
public:
    // Starts at TASK_PRIORITY_HIGH with both internal counters at zero.
    ICompletable()
        : m_Priority( TASK_PRIORITY_HIGH )
        , m_RunningCount( 0 )
        , m_WaitingForTaskCount( 0 )
    {
    }

    // Returns true when the running count is zero ( read with an acquire load ).
    bool GetIsComplete() const
    {
        const int32_t stillRunning = m_RunningCount.load( std::memory_order_acquire );
        return stillRunning == 0;
    }

    virtual ~ICompletable() {}

    // Priority of this task.
    TaskPriority m_Priority;

private:
    // The counters below are only accessible to the scheduler.
    friend class TaskScheduler;
    std::atomic<int32_t>         m_RunningCount;
    mutable std::atomic<int32_t> m_WaitingForTaskCount;
};
// ITaskSet - derive from this class and implement ExecuteRange to create a task set.
// A task set object can be re-used, but check it has completed first.
class ITaskSet : public ICompletable
{
public:
    // Set size 1, min range 1.
    ITaskSet() : ITaskSet( 1, 1 ) {}

    // Set size setSize_, min range 1.
    ITaskSet( uint32_t setSize_ ) : ITaskSet( setSize_, 1 ) {}

    // Set size setSize_, min range minRange_.
    ITaskSet( uint32_t setSize_, uint32_t minRange_ )
        : m_SetSize( setSize_ )
        , m_MinRange( minRange_ )
        , m_RangeToRun( minRange_ )
    {
    }

    // ExecuteRange must be implemented to process the work. It is called with a range_ where
    // range_.start < range_.end and range_.end <= m_SetSize ( with the default m_SetSize of 1
    // no non-empty range could otherwise exist ).
    // Map the range values so that processing them linearly, in order, is cache friendly,
    // i.e. neighbouring values should be close together in memory.
    // threadnum_ should not be used to change how data is processed; its intended purpose
    // is to allow per-thread data buckets for output.
    virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ ) = 0;

    // Set Size - usually the number of data items to be processed, see ExecuteRange. Defaults to 1.
    uint32_t m_SetSize;

    // Min Range - minimum size of a TaskSetPartition range when a task set is split into partitions.
    // Also known as grain size in literature.
    // Designed to reduce scheduling overhead by preventing the set being divided up too small.
    // Ranges passed to ExecuteRange will *not* necessarily be a multiple of this; the scheduler
    // only attempts to deliver range sizes larger than this most of the time.
    // Set this to a value which results in a computation effort of at least 10k clock cycles
    // to minimize task scheduler overhead.
    // NOTE: The last partition will be smaller than m_MinRange if m_SetSize is not a multiple
    // of m_MinRange.
    uint32_t m_MinRange;

private:
    friend class TaskScheduler;
    // Scheduler-only state, initialised to the min range.
    uint32_t m_RangeToRun;
};
// IPinnedTask - derive from this class and override Execute to create a task which
// is only run on one given thread.
class IPinnedTask : public ICompletable
{
public:
    // Default is to run the task on thread 0, the main thread.
    IPinnedTask() : threadNum( 0 ), pNext( nullptr ) {}

    // Run the task on the thread with number threadNum_.
    IPinnedTask( uint32_t threadNum_ ) : threadNum( threadNum_ ), pNext( nullptr ) {}

    // IPinnedTask needs to be non abstract for intrusive list functionality.
    // Should never be called as it should be overridden; the base version only asserts.
    virtual void Execute() { assert( false ); }

    uint32_t                  threadNum; // thread to run this pinned task on
    std::atomic<IPinnedTask*> pNext;     // Do not use. For intrusive list only.
};
// TaskSet - a utility task set which runs a std::function for each range.
// TaskSetFunction is called with the same arguments as ITaskSet::ExecuteRange.
typedef std::function<void (TaskSetPartition range, uint32_t threadnum )> TaskSetFunction;
class TaskSet : public ITaskSet
{
public:
    // Leaves m_Function empty: assign it before the task set is run, as invoking an
    // empty std::function throws std::bad_function_call.
    TaskSet() = default;

    // func_ is taken by value and moved into m_Function, so passing an rvalue avoids a copy.
    TaskSet( TaskSetFunction func_ ) : m_Function( std::move( func_ ) ) {}

    // As above, with the set size given by setSize_.
    TaskSet( uint32_t setSize_, TaskSetFunction func_ ) : ITaskSet( setSize_ ), m_Function( std::move( func_ ) ) {}

    // Forwards the range and thread number to m_Function.
    virtual void ExecuteRange( TaskSetPartition range_, uint32_t threadnum_ )
    {
        m_Function( range_, threadnum_ );
    }

    TaskSetFunction m_Function;
};
// Profiler hooks. TaskScheduler provides several callbacks intended for profilers;
// each one receives the number of the thread it is called for.
using ProfilerCallbackFunc = void (*)( uint32_t threadnum_ );

// ProfilerCallbacks - the set of hooks, stored in TaskSchedulerConfig::profilerCallbacks.
struct ProfilerCallbacks
{
    ProfilerCallbackFunc threadStart;
    ProfilerCallbackFunc threadStop;
    ProfilerCallbackFunc waitForNewTaskSuspendStart;      // thread suspended waiting for new tasks
    ProfilerCallbackFunc waitForNewTaskSuspendStop;       // thread unsuspended
    ProfilerCallbackFunc waitForTaskCompleteStart;        // thread waiting for task completion
    ProfilerCallbackFunc waitForTaskCompleteStop;         // thread stopped waiting
    ProfilerCallbackFunc waitForTaskCompleteSuspendStart; // thread suspended waiting for task completion
    ProfilerCallbackFunc waitForTaskCompleteSuspendStop;  // thread unsuspended
};
// Custom allocation hooks, set through TaskSchedulerConfig::customAllocator.
// For the file_ and line_ arguments also see ENKI_CUSTOM_ALLOC_FILE_AND_LINE.
using AllocFunc = void* (*)( size_t align_, size_t size_, void* userData_, const char* file_, int line_ );
using FreeFunc  = void (*)( void* ptr_, size_t size_, void* userData_, const char* file_, int line_ );

// The allocation and free functions used when no custom ones are supplied.
ENKITS_API void* DefaultAllocFunc( size_t align_, size_t size_, void* userData_, const char* file_, int line_ );
ENKITS_API void  DefaultFreeFunc( void* ptr_, size_t size_, void* userData_, const char* file_, int line_ );

// CustomAllocator - an alloc / free pair plus a user data pointer.
// A default constructed CustomAllocator uses DefaultAllocFunc and DefaultFreeFunc
// with a null userData.
struct CustomAllocator
{
    AllocFunc alloc    = DefaultAllocFunc;
    FreeFunc  free     = DefaultFreeFunc;
    void*     userData = nullptr;
};
// TaskSchedulerConfig - configuration struct for the advanced
// TaskScheduler::Initialize( TaskSchedulerConfig ) overload.
struct TaskSchedulerConfig
{
// numTaskThreadsToCreate - Number of tasking threads the task scheduler will create. Must be > 0.
// Defaults to GetNumHardwareThreads()-1 threads as thread which calls initialize is thread 0.
// NOTE(review): "Must be > 0" looks inconsistent with Initialize( numThreadsTotal_ ), which
// documents creating numThreadsTotal_-1 threads for numThreadsTotal_ > 0 - confirm whether 0 is valid.
// NOTE(review): the subtraction is unsigned, so if GetNumHardwareThreads() ever returns 0
// this default wraps around to UINT32_MAX.
uint32_t numTaskThreadsToCreate = GetNumHardwareThreads()-1;
// numExternalTaskThreads - Advanced use. Number of external threads which need to use TaskScheduler API.
// See TaskScheduler::RegisterExternalTaskThread() for usage.
// Defaults to 0. The thread used to initialize the TaskScheduler is accounted for separately,
// see TaskScheduler::GetNumTaskThreads().
uint32_t numExternalTaskThreads = 0;
// profilerCallbacks - callbacks intended for profilers. Defaults to all callbacks being null.
ProfilerCallbacks profilerCallbacks = {};
// customAllocator - allocation functions to use. Default constructed, see CustomAllocator.
CustomAllocator customAllocator;
};
// TaskScheduler - creates the tasking threads and runs ITaskSet and IPinnedTask work on them.
// A TaskScheduler cannot be copied.
class TaskScheduler
{
public:
    ENKITS_API TaskScheduler();
    ENKITS_API ~TaskScheduler();

    // Call an Initialize function before adding tasks.

    // Initialize() will create GetNumHardwareThreads()-1 tasking threads, which is
    // sufficient to fill the system when including the main thread.
    // Initialize can be called multiple times - it will wait for completion
    // before re-initializing.
    ENKITS_API void Initialize();

    // Initialize( numThreadsTotal_ )
    // will create numThreadsTotal_-1 threads, as thread 0 is
    // the thread on which the initialize was called.
    // numThreadsTotal_ must be > 0
    ENKITS_API void Initialize( uint32_t numThreadsTotal_ );

    // Initialize with advanced TaskSchedulerConfig settings. See TaskSchedulerConfig.
    ENKITS_API void Initialize( TaskSchedulerConfig config_ );

    // Get config. Can be called before Initialize to get the defaults.
    ENKITS_API TaskSchedulerConfig GetConfig() const;

    // Adds the TaskSet to pipe and returns if the pipe is not full.
    // If the pipe is full, pTaskSet_ is run.
    // Should only be called from main thread, or within a task.
    ENKITS_API void AddTaskSetToPipe( ITaskSet* pTaskSet_ );

    // Thread 0 is main thread, otherwise use threadNum.
    // Pinned tasks can be added from any thread.
    ENKITS_API void AddPinnedTask( IPinnedTask* pTask_ );

    // This function will run any IPinnedTask* for the current thread, but not other tasks.
    // The main thread should call this or use a wait to ensure its tasks are run.
    ENKITS_API void RunPinnedTasks();

    // Runs the TaskSets in pipe until true == pCompletable_->GetIsComplete();
    // Should only be called from the thread which created the task scheduler, or within a task.
    // If called with 0 it will try to run tasks, and return if none are available.
    // To run only a subset of tasks, set priorityOfLowestToRun_ to a high priority.
    // Default is lowest priority available.
    // Only wait for child tasks of the current task otherwise a deadlock could occur.
    ENKITS_API void WaitforTask( const ICompletable* pCompletable_, enki::TaskPriority priorityOfLowestToRun_ = TaskPriority(TASK_PRIORITY_NUM - 1) );

    // Waits for all task sets to complete - not guaranteed to work unless we know we
    // are in a situation where tasks aren't being continuously added.
    ENKITS_API void WaitforAll();

    // Waits for all task sets to complete and shutdown threads - not guaranteed to work unless we know we
    // are in a situation where tasks aren't being continuously added.
    // This function can be safely called even if TaskScheduler::Initialize() has not been called.
    ENKITS_API void WaitforAllAndShutdown();

    // Returns the number of threads created for running tasks + number of external threads
    // plus 1 to account for the thread used to initialize the task scheduler.
    // Equivalent to config values: numTaskThreadsToCreate + numExternalTaskThreads + 1.
    // It is guaranteed that GetThreadNum() < GetNumTaskThreads()
    ENKITS_API uint32_t GetNumTaskThreads() const;

    // Returns the current task threadNum.
    // Will return 0 for the thread which initialized the task scheduler,
    // and all other non-enkiTS threads which have not been registered ( see RegisterExternalTaskThread() ),
    // and < GetNumTaskThreads() for all threads.
    // It is guaranteed that GetThreadNum() < GetNumTaskThreads()
    ENKITS_API uint32_t GetThreadNum() const;

    // Call on a thread to register the thread to use the TaskScheduling API.
    // This is implicitly done for the thread which initializes the TaskScheduler.
    // Intended for developers who have threads which need to call the TaskScheduler API.
    // Returns true if successful, false if not.
    // Can only have numExternalTaskThreads registered at any one time, which must be set
    // at initialization time.
    ENKITS_API bool RegisterExternalTaskThread();

    // Call on a thread on which RegisterExternalTaskThread has been called to deregister that thread.
    ENKITS_API void DeRegisterExternalTaskThread();

    // Get the number of registered external task threads.
    ENKITS_API uint32_t GetNumRegisteredExternalTaskThreads();

    // ------------- Start DEPRECATED Functions -------------
    // DEPRECATED - WaitforTaskSet, deprecated interface use WaitforTask
    inline void WaitforTaskSet( const ICompletable* pCompletable_ ) { WaitforTask( pCompletable_ ); }

    // DEPRECATED - GetProfilerCallbacks. Use TaskSchedulerConfig instead
    // Returns the ProfilerCallbacks structure so that it can be modified to
    // set the callbacks. Should be set prior to initialization.
    inline ProfilerCallbacks* GetProfilerCallbacks() { return &m_Config.profilerCallbacks; }
    // ------------- End DEPRECATED Functions -------------

private:
    static void TaskingThreadFunction( const ThreadArgs& args_ );
    bool HaveTasks( uint32_t threadNum_ );
    void WaitForNewTasks( uint32_t threadNum_ );
    void WaitForTaskCompletion( const ICompletable* pCompletable_, uint32_t threadNum_ );
    void RunPinnedTasks( uint32_t threadNum_, uint32_t priority_ );
    bool TryRunTask( uint32_t threadNum_, uint32_t& hintPipeToCheck_io_ );
    bool TryRunTask( uint32_t threadNum_, uint32_t priority_, uint32_t& hintPipeToCheck_io_ );
    void StartThreads();
    void StopThreads( bool bWait_ );
    void SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_, uint32_t rangeToSplit_ );
    void WakeThreadsForNewTasks();
    void WakeThreadsForTaskCompletion();

    // Allocation helpers; file_ and line_ are passed through for allocation reports.
    template< typename T > T* NewArray( size_t num_, const char* file_, int line_ );
    template< typename T > void DeleteArray( T* p_, size_t num_, const char* file_, int line_ );
    template<class T, class... Args> T* New( const char* file_, int line_, Args&&... args_ );
    template< typename T > void Delete( T* p_, const char* file_, int line_ );

    semaphoreid_t* SemaphoreNew();
    void SemaphoreDelete( semaphoreid_t* pSemaphore_ );

    // One entry per priority level.
    TaskPipe* m_pPipesPerThread[ TASK_PRIORITY_NUM ];
    PinnedTaskList* m_pPinnedTaskListPerThread[ TASK_PRIORITY_NUM ];

    uint32_t m_NumThreads;
    ThreadDataStore* m_pThreadDataStore;
    std::thread* m_pThreads;
    std::atomic<int32_t> m_bRunning;
    std::atomic<int32_t> m_NumInternalTaskThreadsRunning;
    std::atomic<int32_t> m_NumThreadsWaitingForNewTasks;
    std::atomic<int32_t> m_NumThreadsWaitingForTaskCompletion;
    uint32_t m_NumPartitions;
    semaphoreid_t* m_pNewTaskSemaphore;
    semaphoreid_t* m_pTaskCompleteSemaphore;
    uint32_t m_NumInitialPartitions;
    bool m_bHaveThreads;
    TaskSchedulerConfig m_Config;
    std::atomic<int32_t> m_NumExternalTaskThreadsRegistered;

    // Not copyable. Deleted so that any attempted copy is reported at compile time
    // rather than as a missing symbol at link time.
    TaskScheduler( const TaskScheduler& nocopy_ ) = delete;
    TaskScheduler& operator=( const TaskScheduler& nocopy_ ) = delete;

protected:
    void SetCustomAllocator( CustomAllocator customAllocator_ ); // for C interface
};
// Returns the number of hardware threads reported by std::thread::hardware_concurrency(),
// but never less than 1.
// hardware_concurrency() is only a hint and returns 0 when the value is not computable.
// Callers subtract 1 from the result using unsigned arithmetic, so a 0 would wrap
// around to UINT32_MAX; clamping here keeps that subtraction well defined.
inline uint32_t GetNumHardwareThreads()
{
    const uint32_t numHardwareThreads = std::thread::hardware_concurrency();
    return numHardwareThreads > 0 ? numHardwareThreads : 1;
}
}