Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch of fixes for WorkerThreadPool and ResourceLoader #94169

Merged
merged 5 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ else: # GCC, Clang
if cc_version_major >= 11: # Broke on MethodBind templates before GCC 11.
env.Append(CCFLAGS=["-Wlogical-op"])
elif methods.using_clang(env) or methods.using_emcc(env):
env.Append(CCFLAGS=["-Wimplicit-fallthrough"])
env.Append(CCFLAGS=["-Wimplicit-fallthrough", "-Wno-undefined-var-template"])
elif env["warnings"] == "all":
env.Append(CCFLAGS=["-Wall"] + common_warnings)
elif env["warnings"] == "moderate":
Expand Down
261 changes: 158 additions & 103 deletions core/io/resource_loader.cpp

Large diffs are not rendered by default.

16 changes: 11 additions & 5 deletions core/io/resource_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ typedef Error (*ResourceLoaderImport)(const String &p_path);
typedef void (*ResourceLoadedCallback)(Ref<Resource> p_resource, const String &p_path);

class ResourceLoader {
friend class LoadToken;

enum {
MAX_LOADERS = 64
};
Expand All @@ -121,6 +123,7 @@ class ResourceLoader {
struct LoadToken : public RefCounted {
String local_path;
String user_path;
uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero.
Ref<Resource> res_if_unregistered;

void clear();
Expand All @@ -130,10 +133,13 @@ class ResourceLoader {

static const int BINARY_MUTEX_TAG = 1;

static Ref<LoadToken> _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode);
static Ref<LoadToken> _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user = false);
static Ref<Resource> _load_complete(LoadToken &p_load_token, Error *r_error);

private:
static LoadToken *_load_threaded_request_reuse_user_token(const String &p_path);
static void _load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path);

static Ref<Resource> _load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock<SafeBinaryMutex<BINARY_MUTEX_TAG>> &p_thread_load_lock);

static Ref<ResourceFormatLoader> loader[MAX_LOADERS];
Expand Down Expand Up @@ -171,7 +177,6 @@ class ResourceLoader {
bool need_wait = true;
LoadToken *load_token = nullptr;
String local_path;
String remapped_path;
String type_hint;
float progress = 0.0f;
float max_reported_progress = 0.0f;
Expand All @@ -180,18 +185,19 @@ class ResourceLoader {
ResourceFormatLoader::CacheMode cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE;
Error error = OK;
Ref<Resource> resource;
bool xl_remapped = false;
bool use_sub_threads = false;
HashSet<String> sub_tasks;
};

static void _thread_load_function(void *p_userdata);
static void _run_load_task(void *p_userdata);

static thread_local int load_nesting;
static thread_local WorkerThreadPool::TaskID caller_task_id;
static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
static thread_local Vector<String> load_paths_stack;

static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
friend SafeBinaryMutex<BINARY_MUTEX_TAG> &_get_res_loader_mutex();

static HashMap<String, ThreadLoadTask> thread_load_tasks;
static bool cleaning_tasks;

Expand Down
67 changes: 30 additions & 37 deletions core/object/worker_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "core/object/script_language.h"
#include "core/os/os.h"
#include "core/os/safe_binary_mutex.h"
#include "core/os/thread_safe.h"

WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;
Expand All @@ -46,7 +47,7 @@ void WorkerThreadPool::Task::free_template_userdata() {
WorkerThreadPool *WorkerThreadPool::singleton = nullptr;

#ifdef THREADS_ENABLED
thread_local uintptr_t WorkerThreadPool::unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES] = {};
thread_local WorkerThreadPool::UnlockableLocks WorkerThreadPool::unlockable_locks[MAX_UNLOCKABLE_LOCKS];
#endif

void WorkerThreadPool::_process_task(Task *p_task) {
Expand Down Expand Up @@ -428,27 +429,19 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {

void WorkerThreadPool::_lock_unlockable_mutexes() {
#ifdef THREADS_ENABLED
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlockable_mutexes[i]) {
if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) {
((Mutex *)unlockable_mutexes[i])->lock();
} else {
((BinaryMutex *)(unlockable_mutexes[i] & ~1))->lock();
}
for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {
if (unlockable_locks[i].ulock) {
unlockable_locks[i].ulock->lock();
}
}
#endif
}

void WorkerThreadPool::_unlock_unlockable_mutexes() {
#ifdef THREADS_ENABLED
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlockable_mutexes[i]) {
if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) {
((Mutex *)unlockable_mutexes[i])->unlock();
} else {
((BinaryMutex *)(unlockable_mutexes[i] & ~1))->unlock();
}
for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {
if (unlockable_locks[i].ulock) {
unlockable_locks[i].ulock->unlock();
}
}
#endif
Expand Down Expand Up @@ -665,38 +658,38 @@ int WorkerThreadPool::get_thread_index() {
return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
}

#ifdef THREADS_ENABLED
uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) {
return _thread_enter_unlock_allowance_zone(p_mutex, false);
}

uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex) {
return _thread_enter_unlock_allowance_zone(p_mutex, true);
WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() {
int th_index = get_thread_index();
if (th_index != -1 && singleton->threads[th_index].current_task) {
return singleton->threads[th_index].current_task->self;
} else {
return INVALID_TASK_ID;
}
}

uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) {
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlikely((unlockable_mutexes[i] & ~1) == (uintptr_t)p_mutex)) {
#ifdef THREADS_ENABLED
uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock) {
for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {
DEV_ASSERT((bool)unlockable_locks[i].ulock == (bool)unlockable_locks[i].rc);
if (unlockable_locks[i].ulock == &p_ulock) {
// Already registered in the current thread.
return UINT32_MAX;
}
if (!unlockable_mutexes[i]) {
unlockable_mutexes[i] = (uintptr_t)p_mutex;
if (p_is_binary) {
unlockable_mutexes[i] |= 1;
}
unlockable_locks[i].rc++;
return i;
} else if (!unlockable_locks[i].ulock) {
unlockable_locks[i].ulock = &p_ulock;
unlockable_locks[i].rc = 1;
return i;
}
}
ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable mutex slots available. Engine bug.");
ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable lock slots available. Engine bug.");
}

void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
if (p_zone_id == UINT32_MAX) {
return;
DEV_ASSERT(unlockable_locks[p_zone_id].ulock && unlockable_locks[p_zone_id].rc);
unlockable_locks[p_zone_id].rc--;
if (unlockable_locks[p_zone_id].rc == 0) {
unlockable_locks[p_zone_id].ulock = nullptr;
}
DEV_ASSERT(unlockable_mutexes[p_zone_id]);
unlockable_mutexes[p_zone_id] = 0;
}
#endif

Expand Down
20 changes: 14 additions & 6 deletions core/object/worker_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,12 @@ class WorkerThreadPool : public Object {
static WorkerThreadPool *singleton;

#ifdef THREADS_ENABLED
static const uint32_t MAX_UNLOCKABLE_MUTEXES = 2;
static thread_local uintptr_t unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES];
static const uint32_t MAX_UNLOCKABLE_LOCKS = 2;
struct UnlockableLocks {
THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> *ulock = nullptr;
uint32_t rc = 0;
};
static thread_local UnlockableLocks unlockable_locks[MAX_UNLOCKABLE_LOCKS];
#endif

TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
Expand Down Expand Up @@ -192,7 +196,7 @@ class WorkerThreadPool : public Object {
void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);

#ifdef THREADS_ENABLED
static uint32_t _thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary);
static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock);
#endif

void _lock_unlockable_mutexes();
Expand Down Expand Up @@ -239,13 +243,17 @@ class WorkerThreadPool : public Object {

static WorkerThreadPool *get_singleton() { return singleton; }
static int get_thread_index();
static TaskID get_caller_task_id();

#ifdef THREADS_ENABLED
static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex);
static uint32_t thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex);
_ALWAYS_INLINE_ static uint32_t thread_enter_unlock_allowance_zone(const MutexLock<BinaryMutex> &p_lock) { return _thread_enter_unlock_allowance_zone(p_lock._get_lock()); }
template <int Tag>
_ALWAYS_INLINE_ static uint32_t thread_enter_unlock_allowance_zone(const SafeBinaryMutex<Tag> &p_mutex) { return _thread_enter_unlock_allowance_zone(p_mutex._get_lock()); }
static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id);
#else
static uint32_t thread_enter_unlock_allowance_zone(void *p_mutex) { return UINT32_MAX; }
static uint32_t thread_enter_unlock_allowance_zone(const MutexLock<BinaryMutex> &p_lock) { return UINT32_MAX; }
template <int Tag>
static uint32_t thread_enter_unlock_allowance_zone(const SafeBinaryMutex<Tag> &p_mutex) { return UINT32_MAX; }
static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {}
#endif

Expand Down
8 changes: 7 additions & 1 deletion core/os/condition_variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#define CONDITION_VARIABLE_H

#include "core/os/mutex.h"
#include "core/os/safe_binary_mutex.h"

#ifdef THREADS_ENABLED

Expand All @@ -56,7 +57,12 @@ class ConditionVariable {
public:
template <typename BinaryMutexT>
_ALWAYS_INLINE_ void wait(const MutexLock<BinaryMutexT> &p_lock) const {
condition.wait(const_cast<THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &>(p_lock.lock));
condition.wait(const_cast<THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &>(p_lock._get_lock()));
}

template <int Tag>
_ALWAYS_INLINE_ void wait(const MutexLock<SafeBinaryMutex<Tag>> &p_lock) const {
condition.wait(const_cast<THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &>(p_lock.mutex._get_lock()));
}

_ALWAYS_INLINE_ void notify_one() const {
Expand Down
9 changes: 7 additions & 2 deletions core/os/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,18 @@ class MutexImpl {

template <typename MutexT>
class MutexLock {
friend class ConditionVariable;

THREADING_NAMESPACE::unique_lock<typename MutexT::StdMutexType> lock;

public:
explicit MutexLock(const MutexT &p_mutex) :
lock(p_mutex.mutex) {}

// Clarification: all the funny syntax is needed so this function exists only for binary mutexes.
template <typename T = MutexT>
_ALWAYS_INLINE_ THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &_get_lock(
typename std::enable_if<std::is_same<T, THREADING_NAMESPACE::mutex>::value> * = nullptr) const {
return const_cast<THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &>(lock);
}
};

using Mutex = MutexImpl<THREADING_NAMESPACE::recursive_mutex>; // Recursive, for general use
Expand Down
76 changes: 38 additions & 38 deletions core/os/safe_binary_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,76 +47,76 @@
// Also, don't forget to declare the thread_local variable on each use.
template <int Tag>
class SafeBinaryMutex {
friend class MutexLock<SafeBinaryMutex>;
friend class MutexLock<SafeBinaryMutex<Tag>>;

using StdMutexType = THREADING_NAMESPACE::mutex;

mutable THREADING_NAMESPACE::mutex mutex;
static thread_local uint32_t count;

struct TLSData {
mutable THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> lock;
uint32_t count = 0;

TLSData(SafeBinaryMutex<Tag> &p_mutex) :
lock(p_mutex.mutex, THREADING_NAMESPACE::defer_lock) {}
};
static thread_local TLSData tls_data;

public:
_ALWAYS_INLINE_ void lock() const {
if (++count == 1) {
mutex.lock();
if (++tls_data.count == 1) {
tls_data.lock.lock();
}
}

_ALWAYS_INLINE_ void unlock() const {
DEV_ASSERT(count);
if (--count == 0) {
mutex.unlock();
DEV_ASSERT(tls_data.count);
if (--tls_data.count == 0) {
tls_data.lock.unlock();
}
}

_ALWAYS_INLINE_ bool try_lock() const {
if (count) {
count++;
return true;
} else {
if (mutex.try_lock()) {
count++;
return true;
} else {
return false;
}
}
_ALWAYS_INLINE_ THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &_get_lock() const {
return const_cast<THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &>(tls_data.lock);
}

~SafeBinaryMutex() {
DEV_ASSERT(!count);
_ALWAYS_INLINE_ SafeBinaryMutex() {
}

_ALWAYS_INLINE_ ~SafeBinaryMutex() {
DEV_ASSERT(!tls_data.count);
}
};

// This specialization is needed so manual locking and MutexLock can be used
// at the same time on a SafeBinaryMutex.
template <int Tag>
class MutexLock<SafeBinaryMutex<Tag>> {
friend class ConditionVariable;

THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> lock;
const SafeBinaryMutex<Tag> &mutex;

public:
_ALWAYS_INLINE_ explicit MutexLock(const SafeBinaryMutex<Tag> &p_mutex) :
lock(p_mutex.mutex) {
SafeBinaryMutex<Tag>::count++;
};
_ALWAYS_INLINE_ ~MutexLock() {
SafeBinaryMutex<Tag>::count--;
};
explicit MutexLock(const SafeBinaryMutex<Tag> &p_mutex) :
mutex(p_mutex) {
mutex.lock();
}

~MutexLock() {
mutex.unlock();
}
};

#else // No threads.

template <int Tag>
class SafeBinaryMutex : public MutexImpl {
static thread_local uint32_t count;
};
class SafeBinaryMutex {
struct TLSData {
TLSData(SafeBinaryMutex<Tag> &p_mutex) {}
};
static thread_local TLSData tls_data;

template <int Tag>
class MutexLock<SafeBinaryMutex<Tag>> {
public:
MutexLock(const SafeBinaryMutex<Tag> &p_mutex) {}
~MutexLock() {}
void lock() const {}
void unlock() const {}
};

#endif // THREADS_ENABLED
Expand Down
Loading
Loading