From 1c4849b162151df3f36b0e24d6f9cf70ccbce8ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Wed, 19 Jun 2024 07:27:33 +0200 Subject: [PATCH 01/17] ResourceLoader: Revert workaround resource loading crashes due to buggy TLS This reverts commit 41c07856361d7cf2bcbda6d84386b1a0d3969f6a. (cherry picked from commit e9407d48772e9ed1382f6ccd5a73e6d12465ab2f) --- core/io/resource_loader.cpp | 18 +++++++----------- core/io/resource_loader.h | 2 +- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 928bb95de3a8..3a61653c957d 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -245,9 +245,9 @@ ResourceLoader::LoadToken::~LoadToken() { Ref ResourceLoader::_load(const String &p_path, const String &p_original_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error, bool p_use_sub_threads, float *r_progress) { const String &original_path = p_original_path.is_empty() ? p_path : p_original_path; load_nesting++; - if (load_paths_stack->size()) { + if (load_paths_stack.size()) { thread_load_mutex.lock(); - const String &parent_task_path = load_paths_stack->get(load_paths_stack->size() - 1); + const String &parent_task_path = load_paths_stack.get(load_paths_stack.size() - 1); HashMap::Iterator E = thread_load_tasks.find(parent_task_path); // Avoid double-tracking, for progress reporting, resources that boil down to a remapped path containing the real payload (e.g., imported resources). bool is_remapped_load = original_path == parent_task_path; @@ -256,7 +256,7 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin } thread_load_mutex.unlock(); } - load_paths_stack->push_back(original_path); + load_paths_stack.push_back(original_path); // Try all loaders and pick the first match for the type hint bool found = false; @@ -272,7 +272,7 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin } } - load_paths_stack->resize(load_paths_stack->size() - 1); + load_paths_stack.resize(load_paths_stack.size() - 1); res_ref_overrides.erase(load_nesting); load_nesting--; @@ -306,8 +306,7 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { // Thread-safe either if it's the current thread or a brand new one. CallQueue *own_mq_override = nullptr; if (load_nesting == 0) { - load_paths_stack = memnew(Vector); - + DEV_ASSERT(load_paths_stack.is_empty()); if (!Thread::is_main_thread()) { // Let the caller thread use its own, for added flexibility. Provide one otherwise. 
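The essence of this revert, completed by the declaration hunks that follow, is making `load_paths_stack` a plain `thread_local` object again instead of a manually managed heap allocation. A minimal standalone sketch of the two styles (illustrative C++ using standard types, not the engine's code):

    #include <string>
    #include <vector>

    // Style restored by the revert: a plain TLS object whose destructor the
    // runtime runs at thread exit; the load code only asserts it ends up empty.
    thread_local std::vector<std::string> load_paths_stack;

    // Style being reverted: a pointer to a heap-allocated stack, created and
    // destroyed by hand so that no TLS destructor ever runs (a guard against
    // broken TLS implementations that could double-run it).
    thread_local std::vector<std::string> *load_paths_stack_ptr = nullptr;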
if (MessageQueue::get_singleton() == MessageQueue::get_main_singleton()) { @@ -408,10 +407,7 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { MessageQueue::set_thread_singleton_override(nullptr); memdelete(own_mq_override); } - if (load_paths_stack) { - memdelete(load_paths_stack); - load_paths_stack = nullptr; - } + DEV_ASSERT(load_paths_stack.is_empty()); } } @@ -1309,7 +1305,7 @@ bool ResourceLoader::timestamp_on_load = false; thread_local int ResourceLoader::load_nesting = 0; thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0; -thread_local Vector *ResourceLoader::load_paths_stack = nullptr; +thread_local Vector ResourceLoader::load_paths_stack; thread_local HashMap>> ResourceLoader::res_ref_overrides; template <> diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 5f1831f0d988..9d07964105f3 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -188,7 +188,7 @@ class ResourceLoader { static thread_local int load_nesting; static thread_local WorkerThreadPool::TaskID caller_task_id; static thread_local HashMap>> res_ref_overrides; // Outermost key is nesting level. - static thread_local Vector *load_paths_stack; // A pointer to avoid broken TLS implementations from double-running the destructor. + static thread_local Vector load_paths_stack; static SafeBinaryMutex thread_load_mutex; static HashMap thread_load_tasks; static bool cleaning_tasks; From 257dd2f9e51b4f770f230d0766f74880a7c8d383 Mon Sep 17 00:00:00 2001 From: Aleksey Vasenev Date: Mon, 29 Jul 2024 20:20:55 +0300 Subject: [PATCH 02/17] Fix use condition_variable after free (cherry picked from commit 2ff6594928f0d6004ca76359af8e31c504b0bd57) --- core/io/resource_loader.cpp | 20 ++++++++++++-------- core/io/resource_loader.h | 2 ++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 3a61653c957d..3345b921084f 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -336,11 +336,10 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { load_task.status = THREAD_LOAD_LOADED; } - if (load_task.cond_var) { + if (load_task.cond_var && load_task.need_wait) { load_task.cond_var->notify_all(); - memdelete(load_task.cond_var); - load_task.cond_var = nullptr; } + load_task.need_wait = false; bool ignoring = load_task.cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE || load_task.cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE_DEEP; bool replacing = load_task.cache_mode == ResourceFormatLoader::CACHE_MODE_REPLACE || load_task.cache_mode == ResourceFormatLoader::CACHE_MODE_REPLACE_DEEP; @@ -729,15 +728,21 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro DEV_ASSERT(wtp_task_err == OK); thread_load_mutex.lock(); } - } else { + } else if (load_task.need_wait) { // Loading thread is main or user thread. 
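The wait side of this fix continues just below; the essential change is that completion is now signaled through the `need_wait` flag rather than by deleting the condition variable out from under blocked threads, and the last awaiter is the one to free it. A reduced standalone model of the pattern (standard C++, illustrative names):

    #include <condition_variable>
    #include <mutex>

    std::mutex m;
    std::condition_variable_any *cond_var = nullptr;
    bool need_wait = true; // Completion is signaled via this flag, never via deletion.
    uint32_t awaiters_count = 0;

    void await(std::unique_lock<std::mutex> &lock) { // `lock` holds `m`.
        if (!cond_var) {
            cond_var = new std::condition_variable_any;
        }
        awaiters_count++;
        while (need_wait) {
            cond_var->wait(lock);
        }
        if (--awaiters_count == 0) { // Last awaiter out frees the condition variable.
            delete cond_var;
            cond_var = nullptr;
        }
    }

    void finish() { // Called with `m` held.
        if (cond_var && need_wait) {
            cond_var->notify_all();
        }
        need_wait = false;
    }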
if (!load_task.cond_var) { load_task.cond_var = memnew(ConditionVariable); } + load_task.awaiters_count++; do { load_task.cond_var->wait(p_thread_load_lock); DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); - } while (load_task.cond_var); + } while (load_task.need_wait); + load_task.awaiters_count--; + if (load_task.awaiters_count == 0) { + memdelete(load_task.cond_var); + load_task.cond_var = nullptr; + } } } else { if (loader_is_wtp) { @@ -1165,11 +1170,10 @@ void ResourceLoader::clear_thread_load_tasks() { if (thread_load_tasks.size()) { for (KeyValue &E : thread_load_tasks) { if (E.value.status == THREAD_LOAD_IN_PROGRESS) { - if (E.value.cond_var) { + if (E.value.cond_var && E.value.need_wait) { E.value.cond_var->notify_all(); - memdelete(E.value.cond_var); - E.value.cond_var = nullptr; } + E.value.need_wait = false; none_running = false; } } diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 9d07964105f3..ec9997891ea5 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -167,6 +167,8 @@ class ResourceLoader { Thread::ID thread_id = 0; // Used if running on a user thread (e.g., simple non-threaded load). bool awaited = false; // If it's in the pool, this helps not awaiting from more than one dependent thread. ConditionVariable *cond_var = nullptr; // If not in the worker pool or already awaiting, this is used as a secondary awaiting mechanism. + uint32_t awaiters_count = 0; + bool need_wait = true; LoadToken *load_token = nullptr; String local_path; String remapped_path; From ece392538ec4ec2742546babb4f13d77c28390b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Wed, 10 Jul 2024 12:01:22 +0200 Subject: [PATCH 03/17] ResourceLoader: Properly push & pop TLS state on recursive load tasks (cherry picked from commit bd0959ebdd8819321f9b24880d05b43eb2aaa4cc) --- core/io/resource_loader.cpp | 22 ++++++++++++++++++++++ core/io/resource_loader.h | 2 ++ 2 files changed, 24 insertions(+) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 3345b921084f..f75480977467 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -207,6 +207,24 @@ void ResourceFormatLoader::_bind_methods() { /////////////////////////////////// +// These are used before and after a wait for a WorkerThreadPool task +// because that can lead to another load started in the same thread, +// something we must treat as a different stack for the purposes +// of tracking nesting. + +#define PREPARE_FOR_WTP_WAIT \ + int load_nesting_backup = ResourceLoader::load_nesting; \ + Vector load_paths_stack_backup = ResourceLoader::load_paths_stack; \ + ResourceLoader::load_nesting = 0; \ + ResourceLoader::load_paths_stack.clear(); + +#define RESTORE_AFTER_WTP_WAIT \ + DEV_ASSERT(ResourceLoader::load_nesting == 0); \ + DEV_ASSERT(ResourceLoader::load_paths_stack.is_empty()); \ + ResourceLoader::load_nesting = load_nesting_backup; \ + ResourceLoader::load_paths_stack = load_paths_stack_backup; \ + load_paths_stack_backup.clear(); + // This should be robust enough to be called redundantly without issues. void ResourceLoader::LoadToken::clear() { thread_load_mutex.lock(); @@ -234,7 +252,9 @@ void ResourceLoader::LoadToken::clear() { // If task is unused, await it here, locally, now the token data is consistent.
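The two macros above encode a stash-and-restore discipline: waiting on the `WorkerThreadPool` may run another load task inline on this very thread, and that nested load must see a clean nesting counter and paths stack. A simplified standalone sketch of the idea (standard C++; names and the wait callback are illustrative):

    #include <string>
    #include <utility>
    #include <vector>

    thread_local int load_nesting = 0;
    thread_local std::vector<std::string> load_paths_stack;

    template <typename F>
    void wait_allowing_nested_loads(F &&wait) {
        int nesting_backup = load_nesting; // PREPARE: stash this thread's state...
        std::vector<std::string> paths_backup = std::move(load_paths_stack);
        load_nesting = 0; // ...so a nested load sees a fresh stack.
        load_paths_stack.clear();
        wait(); // May run another load task inline on this thread.
        // RESTORE: the nested load, if any, must have fully unwound by now.
        load_nesting = nesting_backup;
        load_paths_stack = std::move(paths_backup);
    }

The hunks below apply exactly this shape around `wait_for_task_completion()`.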
if (task_to_await) { + PREPARE_FOR_WTP_WAIT WorkerThreadPool::get_singleton()->wait_for_task_completion(task_to_await); + RESTORE_AFTER_WTP_WAIT } } @@ -704,7 +724,9 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro // Loading thread is in the worker pool. load_task.awaited = true; thread_load_mutex.unlock(); + PREPARE_FOR_WTP_WAIT wtp_task_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + RESTORE_AFTER_WTP_WAIT } if (load_task.status == THREAD_LOAD_IN_PROGRESS) { // If early errored, awaiting would deadlock. diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index ec9997891ea5..7a931cb1617e 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -100,6 +100,8 @@ typedef Error (*ResourceLoaderImport)(const String &p_path); typedef void (*ResourceLoadedCallback)(Ref p_resource, const String &p_path); class ResourceLoader { + friend class LoadToken; + enum { MAX_LOADERS = 64 }; From ea28ac510de3cef098a7624986072ff44546e87f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Wed, 10 Jul 2024 12:53:14 +0200 Subject: [PATCH 04/17] ResourceLoader: Enhance deadlock prevention Benefits: - Simpler code. The main load function is renamed so it's apparent that it's not just a thread entry point anymore. - Cache and thread modes of the original task are honored. A beautiful consequence of this is that, unlike formerly, re-issued loads can use the resource cache, which makes this mechanism much more performant. - The newly added getter for caller task id in WorkerThreadPool allows removing the custom tracking of that in ResourceLoader. - The check to replace a cached resource and the replacement itself happen atomically. That fixes deadlock prevention leading to multiple instances of the same on-disk resource. As a side effect, it also makes the regular check for replace load mode more robust. (cherry picked from commit 28619e26cf35227c3ddab35878e1045f82895657) --- core/io/resource_loader.cpp | 52 +++++++++++++++++++----------- core/io/resource_loader.h | 3 +- core/object/worker_thread_pool.cpp | 9 ++++++ core/object/worker_thread_pool.h | 1 + 4 files changed, 45 insertions(+), 20 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index f75480977467..f21d1e92dd15 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -311,11 +311,11 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin ERR_FAIL_V_MSG(Ref(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint)); } -void ResourceLoader::_thread_load_function(void *p_userdata) { +// This implementation must allow re-entrancy for a task that started awaiting in a deeper stack frame. +void ResourceLoader::_run_load_task(void *p_userdata) { ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; thread_load_mutex.lock(); - caller_task_id = load_task.task_id; if (cleaning_tasks) { load_task.status = THREAD_LOAD_FAILED; thread_load_mutex.unlock(); @@ -372,15 +372,28 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { unlock_pending = false; if (!ignoring) { - if (replacing) { - Ref old_res = ResourceCache::get_ref(load_task.local_path); - if (old_res.is_valid() && old_res != load_task.resource) { - // If resource is already loaded, only replace its data, to avoid invalidating existing instances.
- old_res->copy_from(load_task.resource); + ResourceCache::lock.lock(); // The check and the operations must happen atomically. + bool pending_unlock = true; + Ref old_res = ResourceCache::get_ref(load_task.local_path); + if (old_res.is_valid()) { + if (old_res != load_task.resource) { + // A resource can already exist at this point for two reasons: + // a) The load uses replace mode. + // b) There was more than one load in flight for the same path because of deadlock prevention. + // In either case, we want to keep the resource that was already there. + ResourceCache::lock.unlock(); + pending_unlock = false; + if (replacing) { + old_res->copy_from(load_task.resource); + } load_task.resource = old_res; } + } else { + load_task.resource->set_path(load_task.local_path); + } + if (pending_unlock) { + ResourceCache::lock.unlock(); } - load_task.resource->set_path(load_task.local_path, replacing); } else { load_task.resource->set_path_cache(load_task.local_path); } @@ -548,14 +561,20 @@ Ref ResourceLoader::_load_start(const String &p_path, run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; if (run_on_current_thread) { - load_task_ptr->thread_id = Thread::get_caller_id(); + // The current thread may happen to be a thread from the pool. + WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id(); + if (tid != WorkerThreadPool::INVALID_TASK_ID) { + load_task_ptr->task_id = tid; + } else { + load_task_ptr->thread_id = Thread::get_caller_id(); + } } else { - load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr); + load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr); } } if (run_on_current_thread) { - _thread_load_function(load_task_ptr); + _run_load_task(load_task_ptr); if (must_not_register) { load_token->res_if_unregistered = load_task_ptr->resource; } @@ -708,7 +727,7 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro if (load_task.status == THREAD_LOAD_IN_PROGRESS) { DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0)); - if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) || + if ((load_task.task_id != 0 && load_task.task_id == WorkerThreadPool::get_singleton()->get_caller_task_id()) || (load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) { // Load is in progress, but it's precisely this thread that's in charge. // That means this is a cyclic load. @@ -737,12 +756,10 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro // resource loading that means that the task to wait for can be restarted here to break the // cycle, with as much recursion into this process as needed. // When the stack is eventually unrolled, the original load will have been notified to go on. - // CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's - // an ongoing load for that resource and wait for it again. This value forces a new load.
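The `ResourceCache::lock` hunk above is the atomicity point the commit message describes: checking for an already-cached instance and acting on the result must happen under one lock acquisition, or two racing loads of the same path can both see an empty cache and both register their own instance. A reduced sketch of the invariant (illustrative; a `std::map` stands in for the cache):

    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    struct Res { std::string path; };
    std::mutex cache_lock;
    std::map<std::string, std::shared_ptr<Res>> cache;

    std::shared_ptr<Res> publish(const std::string &path, std::shared_ptr<Res> fresh) {
        std::lock_guard<std::mutex> guard(cache_lock); // Check and act under one acquisition.
        auto it = cache.find(path);
        if (it != cache.end()) {
            return it->second; // Keep the instance that won the race.
        }
        fresh->path = path;
        cache[path] = fresh; // Only the winner registers its instance.
        return fresh;
    }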
- Ref token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE); - Ref resource = _load_complete(*token.ptr(), &wtp_task_err); + _run_load_task(&load_task); + Ref resource = load_task.resource; if (r_error) { - *r_error = wtp_task_err; + *r_error = load_task.error; } thread_load_mutex.lock(); return resource; @@ -1330,7 +1347,6 @@ bool ResourceLoader::abort_on_missing_resource = true; bool ResourceLoader::timestamp_on_load = false; thread_local int ResourceLoader::load_nesting = 0; -thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0; thread_local Vector ResourceLoader::load_paths_stack; thread_local HashMap>> ResourceLoader::res_ref_overrides; diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 7a931cb1617e..511bea1e1690 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -187,10 +187,9 @@ class ResourceLoader { HashSet sub_tasks; }; - static void _thread_load_function(void *p_userdata); + static void _run_load_task(void *p_userdata); static thread_local int load_nesting; - static thread_local WorkerThreadPool::TaskID caller_task_id; static thread_local HashMap>> res_ref_overrides; // Outermost key is nesting level. static thread_local Vector load_paths_stack; static SafeBinaryMutex thread_load_mutex; diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 56b9fa847567..155d963a2bd8 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -665,6 +665,15 @@ int WorkerThreadPool::get_thread_index() { return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1; } +WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() { + int th_index = get_thread_index(); + if (th_index != -1 && singleton->threads[th_index].current_task) { + return singleton->threads[th_index].current_task->self; + } else { + return INVALID_TASK_ID; + } +} + #ifdef THREADS_ENABLED uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) { return _thread_enter_unlock_allowance_zone(p_mutex, false); diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index 8774143abfe4..57b67b32fad1 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -239,6 +239,7 @@ class WorkerThreadPool : public Object { static WorkerThreadPool *get_singleton() { return singleton; } static int get_thread_index(); + static TaskID get_caller_task_id(); #ifdef THREADS_ENABLED static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex); From 8a78f5c323d5d6e6471c2a0182732744e47ea637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Wed, 10 Jul 2024 13:51:02 +0200 Subject: [PATCH 05/17] ResourceLoader: Optimize remap check by deferring until a non-mutex zone (cherry picked from commit 5c970db2e49af93139d15d3fe090db44b4bd3317) --- core/io/resource_loader.cpp | 9 +++++---- core/io/resource_loader.h | 2 -- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index f21d1e92dd15..9938b0ac5dce 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -338,8 +338,10 @@ void ResourceLoader::_run_load_task(void *p_userdata) { } // -- + bool xl_remapped = false; + const String &remapped_path = _path_remap(load_task.local_path, &xl_remapped); Error load_err = OK; - Ref res = _load(load_task.remapped_path, load_task.remapped_path != 
load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); + Ref res = _load(remapped_path, remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) { MessageQueue::get_singleton()->flush(); } @@ -398,14 +400,14 @@ void ResourceLoader::_run_load_task(void *p_userdata) { load_task.resource->set_path_cache(load_task.local_path); } - if (load_task.xl_remapped) { + if (xl_remapped) { load_task.resource->set_as_translation_remapped(true); } #ifdef TOOLS_ENABLED load_task.resource->set_edited(false); if (timestamp_on_load) { - uint64_t mt = FileAccess::get_modified_time(load_task.remapped_path); + uint64_t mt = FileAccess::get_modified_time(remapped_path); //printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt); load_task.resource->set_last_modified_time(mt); } @@ -527,7 +529,6 @@ Ref ResourceLoader::_load_start(const String &p_path, { ThreadLoadTask load_task; - load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); load_task.load_token = load_token.ptr(); load_task.local_path = local_path; load_task.type_hint = p_type_hint; diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 511bea1e1690..f00a5cf11729 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -173,7 +173,6 @@ class ResourceLoader { bool need_wait = true; LoadToken *load_token = nullptr; String local_path; - String remapped_path; String type_hint; float progress = 0.0f; float max_reported_progress = 0.0f; @@ -182,7 +181,6 @@ class ResourceLoader { ResourceFormatLoader::CacheMode cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE; Error error = OK; Ref resource; - bool xl_remapped = false; bool use_sub_threads = false; HashSet sub_tasks; }; From b3e46a913d10b029b8ebeb58017e1ce260c42988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Mon, 15 Jul 2024 10:30:02 +0200 Subject: [PATCH 06/17] ResourceLoader: Fix edge cases in the management of user tokens 1. Make handling of user tokens atomic: Loads started with the external-facing API used to perform a two-step setup of the user token. Between both, the mutex was unlocked without its reference count having been increased. A non-user-initiated load could therefore destroy the load task when it unreferenced the token. Those stages now happen atomically so, on the one hand, the described race condition can't happen and the load task's life insurance no longer has a gap; on the other hand, the ugliness that the call to load could return `ERR_BUSY` if it happened while another thread was between both steps is gone. The code has been refactored so the user token concerns are still outside the inner load start function, which is agnostic to that for a cleaner implementation. 2. Clear ambiguity between load operations running on `WorkerThreadPool`: The two cases are: a single-threaded load directly started by a user pool task, and a load started by the system as part of a multi-threaded load. Since making all the code deal with this distinction would be very complex and error-prone, a different measure is applied instead: just take one of the cases out of the dichotomy. We now ensure every load happening on a pool thread has been initiated by the system.
The way of achieving that is that a single-threaded user-started load initiated from a pool thread, is run as another task. (cherry picked from commit df23858488098086da20c67d9fc62f7ffb3d528c) --- core/io/resource_loader.cpp | 178 +++++++++++++++++++----------------- core/io/resource_loader.h | 6 +- 2 files changed, 100 insertions(+), 84 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 9938b0ac5dce..c3c31d8823dc 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -231,23 +231,22 @@ void ResourceLoader::LoadToken::clear() { WorkerThreadPool::TaskID task_to_await = 0; + // User-facing tokens shouldn't be deleted until completely claimed. + DEV_ASSERT(user_rc == 0 && user_path.is_empty()); + if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. DEV_ASSERT(thread_load_tasks.has(local_path)); ThreadLoadTask &load_task = thread_load_tasks[local_path]; - if (!load_task.awaited) { + if (load_task.task_id && !load_task.awaited) { task_to_await = load_task.task_id; - load_task.awaited = true; } + // Removing a task which is still in progress would be catastrophic. + // Tokens must be alive until the task thread function is done. + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); thread_load_tasks.erase(local_path); local_path.clear(); } - if (!user_path.is_empty()) { - DEV_ASSERT(user_load_tokens.has(user_path)); - user_load_tokens.erase(user_path); - user_path.clear(); - } - thread_load_mutex.unlock(); // If task is unused, await it here, locally, now the token data is consistent. @@ -457,36 +456,44 @@ static String _validate_local_path(const String &p_path) { } Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) { - thread_load_mutex.lock(); - if (user_load_tokens.has(p_path)) { - print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); - user_load_tokens[p_path]->reference(); // Additional request. - thread_load_mutex.unlock(); - return OK; - } - user_load_tokens[p_path] = nullptr; - thread_load_mutex.unlock(); + Ref token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true); + return token.is_valid() ? OK : FAILED; +} - Ref token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode); - if (token.is_valid()) { - thread_load_mutex.lock(); - token->user_path = p_path; - token->reference(); // First request. - user_load_tokens[p_path] = token.ptr(); - print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); - thread_load_mutex.unlock(); - return OK; +ResourceLoader::LoadToken *ResourceLoader::_load_threaded_request_reuse_user_token(const String &p_path) { + HashMap::Iterator E = user_load_tokens.find(p_path); + if (E) { + print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); + LoadToken *token = E->value; + token->user_rc++; + return token; } else { - return FAILED; + return nullptr; } } +void ResourceLoader::_load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path) { + p_token->user_path = p_path; + p_token->reference(); // Extra RC until all user requests have been gotten. 
+ p_token->user_rc = 1; + user_load_tokens[p_path] = p_token; + print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); +} + Ref ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) { if (r_error) { *r_error = OK; } - Ref load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode); + LoadThreadMode thread_mode = LOAD_THREAD_FROM_CURRENT; + if (WorkerThreadPool::get_singleton()->get_caller_task_id() != WorkerThreadPool::INVALID_TASK_ID) { + // If user is initiating a single-threaded load from a WorkerThreadPool task, + // we instead spawn a new task so there's a precondition that a load in a pool task + // is always initiated by the engine. That makes certain aspects simpler, such as + // cyclic load detection and awaiting. + thread_mode = LOAD_THREAD_SPAWN_SINGLE; + } + Ref load_token = _load_start(p_path, p_type_hint, thread_mode, p_cache_mode); if (!load_token.is_valid()) { if (r_error) { *r_error = FAILED; @@ -498,7 +505,7 @@ Ref ResourceLoader::load(const String &p_path, const String &p_type_hi return res; } -Ref ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) { +Ref ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user) { String local_path = _validate_local_path(p_path); bool ignoring_cache = p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE || p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE_DEEP; @@ -511,6 +518,13 @@ Ref ResourceLoader::_load_start(const String &p_path, { MutexLock thread_load_lock(thread_load_mutex); + if (p_for_user) { + LoadToken *existing_token = _load_threaded_request_reuse_user_token(p_path); + if (existing_token) { + return Ref(existing_token); + } + } + if (!ignoring_cache && thread_load_tasks.has(local_path)) { load_token = Ref(thread_load_tasks[local_path].load_token); if (load_token.is_valid()) { @@ -524,6 +538,9 @@ Ref ResourceLoader::_load_start(const String &p_path, load_token.instantiate(); load_token->local_path = local_path; + if (p_for_user) { + _load_threaded_request_setup_user_token(load_token.ptr(), p_path); + } //create load task { @@ -541,6 +558,7 @@ Ref ResourceLoader::_load_start(const String &p_path, load_task.resource = existing; load_task.status = THREAD_LOAD_LOADED; load_task.progress = 1.0; + DEV_ASSERT(!thread_load_tasks.has(local_path)); thread_load_tasks[local_path] = load_task; return load_token; } @@ -572,7 +590,7 @@ Ref ResourceLoader::_load_start(const String &p_path, } else { load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr); } - } + } // MutexLock(thread_load_mutex). if (run_on_current_thread) { _run_load_task(load_task_ptr); @@ -668,13 +686,7 @@ Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_e } LoadToken *load_token = user_load_tokens[p_path]; - if (!load_token) { - // This happens if requested from one thread and rapidly querying from another. - if (r_error) { - *r_error = ERR_BUSY; - } - return Ref(); - } + DEV_ASSERT(load_token->user_rc >= 1); // Support userland requesting on the main thread before the load is reported to be complete. 
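Taken together, the helpers above give a token two counters: the regular `RefCounted` count keeps it alive, while `user_rc` counts outstanding user claims, backed by one extra strong reference held until the last claim is collected. Usage shape from the caller's side (sketch; the path is illustrative):

    // Two requests for the same path share one token; each get() consumes a claim.
    ResourceLoader::load_threaded_request("res://level.tscn"); // user_rc = 1 (+1 strong ref)
    ResourceLoader::load_threaded_request("res://level.tscn"); // reused token, user_rc = 2
    Ref<Resource> a = ResourceLoader::load_threaded_get("res://level.tscn"); // user_rc = 1
    Ref<Resource> b = ResourceLoader::load_threaded_get("res://level.tscn"); // user_rc = 0, token released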
if (Thread::is_main_thread() && !load_token->local_path.is_empty()) { @@ -691,8 +703,15 @@ Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_e } res = _load_complete_inner(*load_token, r_error, thread_load_lock); - if (load_token->unreference()) { - memdelete(load_token); + + load_token->user_rc--; + if (load_token->user_rc == 0) { + load_token->user_path.clear(); + user_load_tokens.erase(p_path); + if (load_token->unreference()) { + memdelete(load_token); + load_token = nullptr; + } } } @@ -739,55 +758,45 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro } bool loader_is_wtp = load_task.task_id != 0; - Error wtp_task_err = FAILED; if (loader_is_wtp) { // Loading thread is in the worker pool. - load_task.awaited = true; thread_load_mutex.unlock(); + PREPARE_FOR_WTP_WAIT - wtp_task_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); RESTORE_AFTER_WTP_WAIT - } if (load_task.status == THREAD_LOAD_IN_PROGRESS) { // If early errored, awaiting would deadlock. - if (loader_is_wtp) { - if (wtp_task_err == ERR_BUSY) { - // The WorkerThreadPool has reported that the current task wants to await on an older one. - // That's not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of - // resource loading that means that the task to wait for can be restarted here to break the - // cycle, with as much recursion into this process as needed. - // When the stack is eventually unrolled, the original load will have been notified to go on. - _run_load_task(&load_task); - Ref resource = load_task.resource; - if (r_error) { - *r_error = load_task.error; - } - thread_load_mutex.lock(); - return resource; - } else { - DEV_ASSERT(wtp_task_err == OK); - thread_load_mutex.lock(); - } - } else if (load_task.need_wait) { - // Loading thread is main or user thread. - if (!load_task.cond_var) { - load_task.cond_var = memnew(ConditionVariable); - } - load_task.awaiters_count++; - do { - load_task.cond_var->wait(p_thread_load_lock); - DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); - } while (load_task.need_wait); - load_task.awaiters_count--; - if (load_task.awaiters_count == 0) { - memdelete(load_task.cond_var); - load_task.cond_var = nullptr; - } + DEV_ASSERT(!wait_err || wait_err == ERR_BUSY); + if (wait_err == ERR_BUSY) { + // The WorkerThreadPool has reported that the current task wants to await on an older one. + // That's not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of + // resource loading that means that the task to wait for can be restarted here to break the + // cycle, with as much recursion into this process as needed. + // When the stack is eventually unrolled, the original load will have been notified to go on. + _run_load_task(&load_task); } + + thread_load_mutex.lock(); + load_task.awaited = true; + + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); + } else if (load_task.need_wait) { + // Loading thread is main or user thread.
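With those branches rewritten, the worker-pool path of `_load_complete_inner` has one uniform shape; condensed from the hunk above (same calls, simplified control flow):

    thread_load_mutex.unlock();
    PREPARE_FOR_WTP_WAIT
    Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
    RESTORE_AFTER_WTP_WAIT
    if (wait_err == ERR_BUSY) {
        // The pool refused to wait on an older task (a potential deadlock);
        // run the load task inline on this thread to break the cycle.
        _run_load_task(&load_task);
    }
    thread_load_mutex.lock();
    load_task.awaited = true; // By now the task is FAILED or LOADED.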
+ if (!load_task.cond_var) { + load_task.cond_var = memnew(ConditionVariable); } + load_task.awaiters_count++; + do { + load_task.cond_var->wait(p_thread_load_lock); + DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); + } while (load_task.need_wait); + load_task.awaiters_count--; + if (load_task.awaiters_count == 0) { + memdelete(load_task.cond_var); + load_task.cond_var = nullptr; + } + + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); } } @@ -1227,10 +1236,13 @@ void ResourceLoader::clear_thread_load_tasks() { } while (user_load_tokens.begin()) { - // User load tokens remove themselves from the map on destruction. - memdelete(user_load_tokens.begin()->value); + LoadToken *user_token = user_load_tokens.begin()->value; + user_load_tokens.remove(user_load_tokens.begin()); + DEV_ASSERT(user_token->user_rc > 0 && !user_token->user_path.is_empty()); + user_token->user_path.clear(); + user_token->user_rc = 0; + user_token->unreference(); } - user_load_tokens.clear(); thread_load_tasks.clear(); diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index f00a5cf11729..217c14d7a384 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -123,6 +123,7 @@ class ResourceLoader { struct LoadToken : public RefCounted { String local_path; String user_path; + uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero. Ref res_if_unregistered; void clear(); @@ -132,10 +133,13 @@ class ResourceLoader { static const int BINARY_MUTEX_TAG = 1; - static Ref _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode); + static Ref _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user = false); static Ref _load_complete(LoadToken &p_load_token, Error *r_error); private: + static LoadToken *_load_threaded_request_reuse_user_token(const String &p_path); + static void _load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path); + static Ref _load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock> &p_thread_load_lock); static Ref loader[MAX_LOADERS]; From c75c50ecac2967217966762d492c4d9d268e51a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Thu, 18 Jul 2024 14:54:58 +0200 Subject: [PATCH 07/17] WorkerThreadPool (plus friends): Overhaul unlock allowance zones This fixes a rare but possible deadlock, maybe due to undefined behavior. The new implementation is safer, at the cost of some added boilerplate. 
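Concretely, the overhaul below swaps the old registry of raw mutex pointers (with the low pointer bit abused to mark binary mutexes) for pointers to the waiting thread's own `unique_lock`, plus a small per-slot reference count so re-entering a zone on the same lock is legal. The call shape at use sites stays simple; condensed from the command-queue hunk further down:

    MutexLock lock(mutex); // The caller's own lock object.
    uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock);
    cmd->call(); // The pool may temporarily unlock `lock` while it waits in here.
    WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);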
(cherry picked from commit f4d76853b9d921e3645295f9bebc39eb73661e67) --- SConstruct | 2 +- core/io/resource_loader.cpp | 6 ++- core/io/resource_loader.h | 3 ++ core/object/worker_thread_pool.cpp | 60 +++++++++-------------- core/object/worker_thread_pool.h | 19 +++++--- core/os/condition_variable.h | 8 ++- core/os/mutex.h | 9 +++- core/os/safe_binary_mutex.h | 76 ++++++++++++++--------------- core/templates/command_queue_mt.cpp | 8 --- core/templates/command_queue_mt.h | 18 +++---- modules/gdscript/gdscript_cache.cpp | 10 +++- modules/gdscript/gdscript_cache.h | 9 +++- 12 files changed, 118 insertions(+), 110 deletions(-) diff --git a/SConstruct b/SConstruct index 0ae8f1a38729..2ccbd27638d9 100644 --- a/SConstruct +++ b/SConstruct @@ -856,7 +856,7 @@ else: # GCC, Clang if cc_version_major >= 11: # Broke on MethodBind templates before GCC 11. env.Append(CCFLAGS=["-Wlogical-op"]) elif methods.using_clang(env) or methods.using_emcc(env): - env.Append(CCFLAGS=["-Wimplicit-fallthrough"]) + env.Append(CCFLAGS=["-Wimplicit-fallthrough", "-Wno-undefined-var-template"]) elif env["warnings"] == "all": env.Append(CCFLAGS=["-Wall"] + common_warnings) elif env["warnings"] == "moderate": diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index c3c31d8823dc..fd4fbc36f7c4 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -1363,8 +1363,12 @@ thread_local int ResourceLoader::load_nesting = 0; thread_local Vector ResourceLoader::load_paths_stack; thread_local HashMap>> ResourceLoader::res_ref_overrides; +SafeBinaryMutex &_get_res_loader_mutex() { + return ResourceLoader::thread_load_mutex; +} + template <> -thread_local uint32_t SafeBinaryMutex::count = 0; +thread_local SafeBinaryMutex::TLSData SafeBinaryMutex::tls_data(_get_res_loader_mutex()); SafeBinaryMutex ResourceLoader::thread_load_mutex; HashMap ResourceLoader::thread_load_tasks; bool ResourceLoader::cleaning_tasks = false; diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 217c14d7a384..f75bf019fb54 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -194,7 +194,10 @@ class ResourceLoader { static thread_local int load_nesting; static thread_local HashMap>> res_ref_overrides; // Outermost key is nesting level. 
static thread_local Vector load_paths_stack; + static SafeBinaryMutex thread_load_mutex; + friend SafeBinaryMutex &_get_res_loader_mutex(); + static HashMap thread_load_tasks; static bool cleaning_tasks; diff --git a/core/object/worker_thread_pool.cpp b/core/object/worker_thread_pool.cpp index 155d963a2bd8..7fd43c409490 100644 --- a/core/object/worker_thread_pool.cpp +++ b/core/object/worker_thread_pool.cpp @@ -32,6 +32,7 @@ #include "core/object/script_language.h" #include "core/os/os.h" +#include "core/os/safe_binary_mutex.h" #include "core/os/thread_safe.h" WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1; @@ -46,7 +47,7 @@ void WorkerThreadPool::Task::free_template_userdata() { WorkerThreadPool *WorkerThreadPool::singleton = nullptr; #ifdef THREADS_ENABLED -thread_local uintptr_t WorkerThreadPool::unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES] = {}; +thread_local WorkerThreadPool::UnlockableLocks WorkerThreadPool::unlockable_locks[MAX_UNLOCKABLE_LOCKS]; #endif void WorkerThreadPool::_process_task(Task *p_task) { @@ -428,13 +429,9 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) { void WorkerThreadPool::_lock_unlockable_mutexes() { #ifdef THREADS_ENABLED - for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) { - if (unlockable_mutexes[i]) { - if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) { - ((Mutex *)unlockable_mutexes[i])->lock(); - } else { - ((BinaryMutex *)(unlockable_mutexes[i] & ~1))->lock(); - } + for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) { + if (unlockable_locks[i].ulock) { + unlockable_locks[i].ulock->lock(); } } #endif @@ -442,13 +439,9 @@ void WorkerThreadPool::_lock_unlockable_mutexes() { void WorkerThreadPool::_unlock_unlockable_mutexes() { #ifdef THREADS_ENABLED - for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) { - if (unlockable_mutexes[i]) { - if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) { - ((Mutex *)unlockable_mutexes[i])->unlock(); - } else { - ((BinaryMutex *)(unlockable_mutexes[i] & ~1))->unlock(); - } + for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) { + if (unlockable_locks[i].ulock) { + unlockable_locks[i].ulock->unlock(); } } #endif @@ -675,37 +668,28 @@ WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() { } #ifdef THREADS_ENABLED -uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) { - return _thread_enter_unlock_allowance_zone(p_mutex, false); -} - -uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex) { - return _thread_enter_unlock_allowance_zone(p_mutex, true); -} - -uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) { - for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) { - if (unlikely((unlockable_mutexes[i] & ~1) == (uintptr_t)p_mutex)) { +uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock &p_ulock) { + for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) { + DEV_ASSERT((bool)unlockable_locks[i].ulock == (bool)unlockable_locks[i].rc); + if (unlockable_locks[i].ulock == &p_ulock) { // Already registered in the current thread. - return UINT32_MAX; - } - if (!unlockable_mutexes[i]) { - unlockable_mutexes[i] = (uintptr_t)p_mutex; - if (p_is_binary) { - unlockable_mutexes[i] |= 1; - } + unlockable_locks[i].rc++; + return i; + } else if (!unlockable_locks[i].ulock) { + unlockable_locks[i].ulock = &p_ulock; + unlockable_locks[i].rc = 1; return i; } } - ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable mutex slots available. 
Engine bug."); + ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable lock slots available. Engine bug."); } void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) { - if (p_zone_id == UINT32_MAX) { - return; + DEV_ASSERT(unlockable_locks[p_zone_id].ulock && unlockable_locks[p_zone_id].rc); + unlockable_locks[p_zone_id].rc--; + if (unlockable_locks[p_zone_id].rc == 0) { + unlockable_locks[p_zone_id].ulock = nullptr; } - DEV_ASSERT(unlockable_mutexes[p_zone_id]); - unlockable_mutexes[p_zone_id] = 0; } #endif diff --git a/core/object/worker_thread_pool.h b/core/object/worker_thread_pool.h index 57b67b32fad1..5be4f2092708 100644 --- a/core/object/worker_thread_pool.h +++ b/core/object/worker_thread_pool.h @@ -162,8 +162,12 @@ class WorkerThreadPool : public Object { static WorkerThreadPool *singleton; #ifdef THREADS_ENABLED - static const uint32_t MAX_UNLOCKABLE_MUTEXES = 2; - static thread_local uintptr_t unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES]; + static const uint32_t MAX_UNLOCKABLE_LOCKS = 2; + struct UnlockableLocks { + THREADING_NAMESPACE::unique_lock *ulock = nullptr; + uint32_t rc = 0; + }; + static thread_local UnlockableLocks unlockable_locks[MAX_UNLOCKABLE_LOCKS]; #endif TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description); @@ -192,7 +196,7 @@ class WorkerThreadPool : public Object { void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task); #ifdef THREADS_ENABLED - static uint32_t _thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary); + static uint32_t _thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock &p_ulock); #endif void _lock_unlockable_mutexes(); @@ -242,11 +246,14 @@ class WorkerThreadPool : public Object { static TaskID get_caller_task_id(); #ifdef THREADS_ENABLED - static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex); - static uint32_t thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex); + _ALWAYS_INLINE_ static uint32_t thread_enter_unlock_allowance_zone(const MutexLock &p_lock) { return _thread_enter_unlock_allowance_zone(p_lock._get_lock()); } + template + _ALWAYS_INLINE_ static uint32_t thread_enter_unlock_allowance_zone(const SafeBinaryMutex &p_mutex) { return _thread_enter_unlock_allowance_zone(p_mutex._get_lock()); } static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id); #else - static uint32_t thread_enter_unlock_allowance_zone(void *p_mutex) { return UINT32_MAX; } + static uint32_t thread_enter_unlock_allowance_zone(const MutexLock &p_lock) { return UINT32_MAX; } + template + static uint32_t thread_enter_unlock_allowance_zone(const SafeBinaryMutex &p_mutex) { return UINT32_MAX; } static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {} #endif diff --git a/core/os/condition_variable.h b/core/os/condition_variable.h index fa1355e98c70..c819fa6b402b 100644 --- a/core/os/condition_variable.h +++ b/core/os/condition_variable.h @@ -32,6 +32,7 @@ #define CONDITION_VARIABLE_H #include "core/os/mutex.h" +#include "core/os/safe_binary_mutex.h" #ifdef THREADS_ENABLED @@ -56,7 +57,12 @@ class ConditionVariable { public: template _ALWAYS_INLINE_ void wait(const MutexLock &p_lock) const { - condition.wait(const_cast &>(p_lock.lock)); + condition.wait(const_cast &>(p_lock._get_lock())); + } + + template + _ALWAYS_INLINE_ void wait(const MutexLock> &p_lock) const { + condition.wait(const_cast &>(p_lock.mutex._get_lock())); } _ALWAYS_INLINE_ 
void notify_one() const { diff --git a/core/os/mutex.h b/core/os/mutex.h index 3e7aa81bc1a5..773b31828dc6 100644 --- a/core/os/mutex.h +++ b/core/os/mutex.h @@ -72,13 +72,18 @@ class MutexImpl { template class MutexLock { - friend class ConditionVariable; - THREADING_NAMESPACE::unique_lock lock; public: explicit MutexLock(const MutexT &p_mutex) : lock(p_mutex.mutex) {} + + // Clarification: all the funny syntax is needed so this function exists only for binary mutexes. + template + _ALWAYS_INLINE_ THREADING_NAMESPACE::unique_lock &_get_lock( + typename std::enable_if::value> * = nullptr) const { + return const_cast &>(lock); + } }; using Mutex = MutexImpl; // Recursive, for general use diff --git a/core/os/safe_binary_mutex.h b/core/os/safe_binary_mutex.h index 1e98cc074cdc..4ca4b50b02cd 100644 --- a/core/os/safe_binary_mutex.h +++ b/core/os/safe_binary_mutex.h @@ -47,76 +47,76 @@ // Also, don't forget to declare the thread_local variable on each use. template class SafeBinaryMutex { - friend class MutexLock; + friend class MutexLock>; using StdMutexType = THREADING_NAMESPACE::mutex; mutable THREADING_NAMESPACE::mutex mutex; - static thread_local uint32_t count; + + struct TLSData { + mutable THREADING_NAMESPACE::unique_lock lock; + uint32_t count = 0; + + TLSData(SafeBinaryMutex &p_mutex) : + lock(p_mutex.mutex, THREADING_NAMESPACE::defer_lock) {} + }; + static thread_local TLSData tls_data; public: _ALWAYS_INLINE_ void lock() const { - if (++count == 1) { - mutex.lock(); + if (++tls_data.count == 1) { + tls_data.lock.lock(); } } _ALWAYS_INLINE_ void unlock() const { - DEV_ASSERT(count); - if (--count == 0) { - mutex.unlock(); + DEV_ASSERT(tls_data.count); + if (--tls_data.count == 0) { + tls_data.lock.unlock(); } } - _ALWAYS_INLINE_ bool try_lock() const { - if (count) { - count++; - return true; - } else { - if (mutex.try_lock()) { - count++; - return true; - } else { - return false; - } - } + _ALWAYS_INLINE_ THREADING_NAMESPACE::unique_lock &_get_lock() const { + return const_cast &>(tls_data.lock); } - ~SafeBinaryMutex() { - DEV_ASSERT(!count); + _ALWAYS_INLINE_ SafeBinaryMutex() { + } + + _ALWAYS_INLINE_ ~SafeBinaryMutex() { + DEV_ASSERT(!tls_data.count); } }; -// This specialization is needed so manual locking and MutexLock can be used -// at the same time on a SafeBinaryMutex. template class MutexLock> { friend class ConditionVariable; - THREADING_NAMESPACE::unique_lock lock; + const SafeBinaryMutex &mutex; public: - _ALWAYS_INLINE_ explicit MutexLock(const SafeBinaryMutex &p_mutex) : - lock(p_mutex.mutex) { - SafeBinaryMutex::count++; - }; - _ALWAYS_INLINE_ ~MutexLock() { - SafeBinaryMutex::count--; - }; + explicit MutexLock(const SafeBinaryMutex &p_mutex) : + mutex(p_mutex) { + mutex.lock(); + } + + ~MutexLock() { + mutex.unlock(); + } }; #else // No threads. 
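A consequence of moving the lock into `TLSData` is visible at every use site: each tag of `SafeBinaryMutex` must define its `thread_local` `tls_data` next to the mutex it guards, routed through a small accessor function because the static mutex is not yet declared where the TLS member is instantiated. The pattern, as the resource-loader and GDScript-cache hunks in this patch apply it (`MyClass` and `TAG` are placeholders):

    // In my_class.cpp (placeholder names):
    SafeBinaryMutex<MyClass::TAG> &_get_my_class_mutex() {
        return MyClass::mutex;
    }
    template <>
    thread_local SafeBinaryMutex<MyClass::TAG>::TLSData
            SafeBinaryMutex<MyClass::TAG>::tls_data(_get_my_class_mutex());
    SafeBinaryMutex<MyClass::TAG> MyClass::mutex;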
template -class SafeBinaryMutex : public MutexImpl { - static thread_local uint32_t count; -}; +class SafeBinaryMutex { + struct TLSData { + TLSData(SafeBinaryMutex &p_mutex) {} + }; + static thread_local TLSData tls_data; -template -class MutexLock> { public: - MutexLock(const SafeBinaryMutex &p_mutex) {} - ~MutexLock() {} + void lock() const {} + void unlock() const {} }; #endif // THREADS_ENABLED diff --git a/core/templates/command_queue_mt.cpp b/core/templates/command_queue_mt.cpp index ef75a70868e6..5fa767263f95 100644 --- a/core/templates/command_queue_mt.cpp +++ b/core/templates/command_queue_mt.cpp @@ -33,14 +33,6 @@ #include "core/config/project_settings.h" #include "core/os/os.h" -void CommandQueueMT::lock() { - mutex.lock(); -} - -void CommandQueueMT::unlock() { - mutex.unlock(); -} - CommandQueueMT::CommandQueueMT() { command_mem.reserve(DEFAULT_COMMAND_MEM_SIZE_KB * 1024); } diff --git a/core/templates/command_queue_mt.h b/core/templates/command_queue_mt.h index 1e6c6e42a96e..8ef5dd30642d 100644 --- a/core/templates/command_queue_mt.h +++ b/core/templates/command_queue_mt.h @@ -362,23 +362,24 @@ class CommandQueueMT { return; } - lock(); + MutexLock lock(mutex); - uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&mutex); while (flush_read_ptr < command_mem.size()) { uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; flush_read_ptr += 8; CommandBase *cmd = reinterpret_cast(&command_mem[flush_read_ptr]); + uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock); cmd->call(); + WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); // Handle potential realloc due to the command and unlock allowance. cmd = reinterpret_cast(&command_mem[flush_read_ptr]); if (unlikely(cmd->sync)) { sync_head++; - unlock(); // Give an opportunity to awaiters right away. + lock.~MutexLock(); // Give an opportunity to awaiters right away. sync_cond_var.notify_all(); - lock(); + new (&lock) MutexLock(mutex); // Handle potential realloc happened during unlock. 
cmd = reinterpret_cast(&command_mem[flush_read_ptr]); } @@ -387,14 +388,11 @@ class CommandQueueMT { flush_read_ptr += size; } - WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); command_mem.clear(); flush_read_ptr = 0; _prevent_sync_wraparound(); - - unlock(); } _FORCE_INLINE_ void _wait_for_sync(MutexLock &p_lock) { @@ -410,9 +408,6 @@ class CommandQueueMT { void _no_op() {} public: - void lock(); - void unlock(); - /* NORMAL PUSH COMMANDS */ DECL_PUSH(0) SPACE_SEP_LIST(DECL_PUSH, 15) @@ -446,9 +441,8 @@ class CommandQueueMT { } void set_pump_task_id(WorkerThreadPool::TaskID p_task_id) { - lock(); + MutexLock lock(mutex); pump_task_id = p_task_id; - unlock(); } CommandQueueMT(); diff --git a/modules/gdscript/gdscript_cache.cpp b/modules/gdscript/gdscript_cache.cpp index 3b6526ffd91e..b3c0744bdf20 100644 --- a/modules/gdscript/gdscript_cache.cpp +++ b/modules/gdscript/gdscript_cache.cpp @@ -144,6 +144,14 @@ GDScriptParserRef::~GDScriptParserRef() { GDScriptCache *GDScriptCache::singleton = nullptr; +SafeBinaryMutex &_get_gdscript_cache_mutex() { + return GDScriptCache::mutex; +} + +template <> +thread_local SafeBinaryMutex::TLSData SafeBinaryMutex::tls_data(_get_gdscript_cache_mutex()); +SafeBinaryMutex GDScriptCache::mutex; + void GDScriptCache::move_script(const String &p_from, const String &p_to) { if (singleton == nullptr || p_from == p_to) { return; @@ -369,7 +377,7 @@ Ref GDScriptCache::get_full_script(const String &p_path, Error &r_erro // Allowing lifting the lock might cause a script to be reloaded multiple times, // which, as a last resort deadlock prevention strategy, is a good tradeoff. - uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&singleton->mutex); + uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(singleton->mutex); r_error = script->reload(true); WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); if (r_error) { diff --git a/modules/gdscript/gdscript_cache.h b/modules/gdscript/gdscript_cache.h index f7f2cd90e9e9..4903da92b4f3 100644 --- a/modules/gdscript/gdscript_cache.h +++ b/modules/gdscript/gdscript_cache.h @@ -34,7 +34,7 @@ #include "gdscript.h" #include "core/object/ref_counted.h" -#include "core/os/mutex.h" +#include "core/os/safe_binary_mutex.h" #include "core/templates/hash_map.h" #include "core/templates/hash_set.h" @@ -95,7 +95,12 @@ class GDScriptCache { bool cleared = false; - Mutex mutex; +public: + static const int BINARY_MUTEX_TAG = 2; + +private: + static SafeBinaryMutex mutex; + friend SafeBinaryMutex &_get_gdscript_cache_mutex(); public: static void move_script(const String &p_from, const String &p_to); From fe2e862e2eba20f4f2158c40d0ef9c2fdb508b14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Tue, 13 Aug 2024 12:52:08 +0200 Subject: [PATCH 08/17] ResourceLoader: Use better error handling for possible engine bugs (cherry picked from commit 31a9e10ddb37f7b5c8697c24ba02ce7bd7a1305a) --- core/io/resource_loader.cpp | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index fd4fbc36f7c4..5204880d9d5c 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -636,13 +636,7 @@ ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const } String local_path = _validate_local_path(p_path); - if (!thread_load_tasks.has(local_path)) { -#ifdef DEV_ENABLED - CRASH_NOW(); -#endif - // On non-dev, be defensive and at 
least avoid crashing (at this point at least). - return THREAD_LOAD_INVALID_RESOURCE; - } + ERR_FAIL_COND_V_MSG(!thread_load_tasks.has(local_path), THREAD_LOAD_INVALID_RESOURCE, "Bug in ResourceLoader logic, please report."); ThreadLoadTask &load_task = thread_load_tasks[local_path]; status = load_task.status; @@ -732,14 +726,10 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro if (!p_load_token.local_path.is_empty()) { if (!thread_load_tasks.has(p_load_token.local_path)) { -#ifdef DEV_ENABLED - CRASH_NOW(); -#endif - // On non-dev, be defensive and at least avoid crashing (at this point at least). if (r_error) { *r_error = ERR_BUG; } - return Ref(); + ERR_FAIL_V_MSG(Ref(), "Bug in ResourceLoader logic, please report."); } ThreadLoadTask &load_task = thread_load_tasks[p_load_token.local_path]; From 1fd87e8747cf1f76485d9e36a53ec70f6040168f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Mon, 26 Aug 2024 12:33:19 +0200 Subject: [PATCH 09/17] Change warning muting so it affects all levels, but locally (cherry picked from commit 9cbc3f14198c30c14315cabf72b0e3e3438b2f61) --- SConstruct | 2 +- core/os/safe_binary_mutex.h | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/SConstruct b/SConstruct index 2ccbd27638d9..0ae8f1a38729 100644 --- a/SConstruct +++ b/SConstruct @@ -856,7 +856,7 @@ else: # GCC, Clang if cc_version_major >= 11: # Broke on MethodBind templates before GCC 11. env.Append(CCFLAGS=["-Wlogical-op"]) elif methods.using_clang(env) or methods.using_emcc(env): - env.Append(CCFLAGS=["-Wimplicit-fallthrough", "-Wno-undefined-var-template"]) + env.Append(CCFLAGS=["-Wimplicit-fallthrough"]) elif env["warnings"] == "all": env.Append(CCFLAGS=["-Wall"] + common_warnings) elif env["warnings"] == "moderate": diff --git a/core/os/safe_binary_mutex.h b/core/os/safe_binary_mutex.h index 4ca4b50b02cd..1035ee76b4bb 100644 --- a/core/os/safe_binary_mutex.h +++ b/core/os/safe_binary_mutex.h @@ -37,6 +37,11 @@ #ifdef THREADS_ENABLED +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wundefined-var-template" +#endif + // A very special kind of mutex, used in scenarios where these // requirements hold at the same time: // - Must be used with a condition variable (only binary mutexes are suitable). @@ -105,6 +110,10 @@ class MutexLock> { } }; +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + #else // No threads. template From 98e77113a2e8aedaf9ab1c85aa98f07ae7ffc29e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Wed, 28 Aug 2024 13:53:39 +0200 Subject: [PATCH 10/17] ResourceLoader: Handle another case of user tokens (cherry picked from commit 0441c67de67b3c84d57e8ade5f3a0fee70959338) --- core/io/resource_loader.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 5204880d9d5c..149f382bf10b 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -528,6 +528,11 @@ Ref ResourceLoader::_load_start(const String &p_path, if (!ignoring_cache && thread_load_tasks.has(local_path)) { load_token = Ref(thread_load_tasks[local_path].load_token); if (load_token.is_valid()) { + if (p_for_user) { + // Load task exists, with no user tokens at the moment. + // Let's "attach" to it. + _load_threaded_request_setup_user_token(load_token.ptr(), p_path); + } return load_token; } else { // The token is dying (reached 0 on another thread). 
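The scenario this handles (usage sketch; the path and surrounding setup are illustrative): a load for a path is already in flight without any user token, for example because the engine started it internally, and the user then issues a threaded request for the same path. Rather than failing, the request attaches a user token to the existing task:

    // An internal load of "res://icon.png" is already in progress (no user token).
    Error err = ResourceLoader::load_threaded_request("res://icon.png"); // Attaches to the in-flight task.
    Ref<Resource> icon = ResourceLoader::load_threaded_get("res://icon.png"); // Claims it as usual.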
From f2d0f66eca23fcf4b79a71669d90d351575eed7a Mon Sep 17 00:00:00 2001 From: Kongfa Waroros Date: Mon, 29 Jul 2024 21:31:57 +0700 Subject: [PATCH 11/17] Fix ResourceLoader is not verbosely printing a resource path on loading (cherry picked from commit bfb5570c033b633c030cc269d5e98213dfebd5a5) --- core/io/resource_loader.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 149f382bf10b..2980d72783b7 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -339,12 +339,19 @@ void ResourceLoader::_run_load_task(void *p_userdata) { bool xl_remapped = false; const String &remapped_path = _path_remap(load_task.local_path, &xl_remapped); + + print_verbose("Loading resource: " + remapped_path); + Error load_err = OK; Ref res = _load(remapped_path, remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) { MessageQueue::get_singleton()->flush(); } + if (res.is_null()) { + print_verbose("Failed loading resource: " + remapped_path); + } + thread_load_mutex.lock(); load_task.resource = res; From cd327055089eb867fa9064b26c228be1f97cf9d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Thu, 5 Sep 2024 08:35:05 +0200 Subject: [PATCH 12/17] ResourceLoader: Simplify handling of unregistered tasks (cherry picked from commit c450f4d667e9f2462cb506cd3a32ca882c49ba68) --- core/io/resource_loader.cpp | 70 ++++++++++++++++--------------------- core/io/resource_loader.h | 4 ++- 2 files changed, 34 insertions(+), 40 deletions(-) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 2980d72783b7..82084c159879 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -234,17 +234,22 @@ void ResourceLoader::LoadToken::clear() { // User-facing tokens shouldn't be deleted until completely claimed. DEV_ASSERT(user_rc == 0 && user_path.is_empty()); - if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. - DEV_ASSERT(thread_load_tasks.has(local_path)); - ThreadLoadTask &load_task = thread_load_tasks[local_path]; - if (load_task.task_id && !load_task.awaited) { - task_to_await = load_task.task_id; + if (!local_path.is_empty()) { + if (task_if_unregistered) { + memdelete(task_if_unregistered); + task_if_unregistered = nullptr; + } else { + DEV_ASSERT(thread_load_tasks.has(local_path)); + ThreadLoadTask &load_task = thread_load_tasks[local_path]; + if (load_task.task_id && !load_task.awaited) { + task_to_await = load_task.task_id; + } + // Removing a task which is still in progress would be catastrophic. + // Tokens must be alive until the task thread function is done. + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); + thread_load_tasks.erase(local_path); } - // Removing a task which is still in progress would be catastrophic. - // Tokens must be alive until the task thread function is done. - DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); - thread_load_tasks.erase(local_path); - local_path.clear(); + local_path.clear(); // Mark as already cleared. 
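The rewritten `clear()` above now unwinds exactly two ownership situations; its shape, consolidated and simplified from the hunk:

    if (task_if_unregistered) {
        // Cache-ignoring load: the token privately owns its task; just delete it.
        memdelete(task_if_unregistered);
        task_if_unregistered = nullptr;
    } else {
        // Registered load: the task lives in the shared map and must already be
        // finished (FAILED or LOADED) before its entry may be removed.
        thread_load_tasks.erase(local_path);
    }
    local_path.clear();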
} thread_load_mutex.unlock(); @@ -519,9 +524,7 @@ Ref ResourceLoader::_load_start(const String &p_path, Ref load_token; bool must_not_register = false; - ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load. ThreadLoadTask *load_task_ptr = nullptr; - bool run_on_current_thread = false; { MutexLock thread_load_lock(thread_load_mutex); @@ -576,12 +579,11 @@ Ref ResourceLoader::_load_start(const String &p_path, } } - // If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish within scope. + // If we want to ignore cache, but there's another task loading it, we can't add this one to the map. must_not_register = ignoring_cache && thread_load_tasks.has(local_path); if (must_not_register) { - load_token->local_path.clear(); - unregistered_load_task = load_task; - load_task_ptr = &unregistered_load_task; + load_token->task_if_unregistered = memnew(ThreadLoadTask(load_task)); + load_task_ptr = load_token->task_if_unregistered; } else { DEV_ASSERT(!thread_load_tasks.has(local_path)); HashMap::Iterator E = thread_load_tasks.insert(local_path, load_task); @@ -589,9 +591,7 @@ Ref ResourceLoader::_load_start(const String &p_path, } } - run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; - - if (run_on_current_thread) { + if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) { // The current thread may happen to be a thread from the pool. WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id(); if (tid != WorkerThreadPool::INVALID_TASK_ID) { @@ -604,11 +604,8 @@ Ref ResourceLoader::_load_start(const String &p_path, } } // MutexLock(thread_load_mutex). - if (run_on_current_thread) { + if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) { _run_load_task(load_task_ptr); - if (must_not_register) { - load_token->res_if_unregistered = load_task_ptr->resource; - } } return load_token; @@ -736,7 +733,10 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro *r_error = OK; } - if (!p_load_token.local_path.is_empty()) { + ThreadLoadTask *load_task_ptr = nullptr; + if (p_load_token.task_if_unregistered) { + load_task_ptr = p_load_token.task_if_unregistered; + } else { if (!thread_load_tasks.has(p_load_token.local_path)) { if (r_error) { *r_error = ERR_BUG; @@ -807,22 +807,14 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro load_task.error = FAILED; } - Ref resource = load_task.resource; - if (r_error) { - *r_error = load_task.error; - } - return resource; - } else { - // Special case of an unregistered task. - // The resource should have been loaded by now. - Ref resource = p_load_token.res_if_unregistered; - if (!resource.is_valid()) { - if (r_error) { - *r_error = FAILED; - } - } - return resource; + load_task_ptr = &load_task; + } + + Ref resource = load_task_ptr->resource; + if (r_error) { + *r_error = load_task_ptr->error; } + return resource; } bool ResourceLoader::_ensure_load_progress() { diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index f75bf019fb54..34ac1ba3e92f 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -106,6 +106,8 @@ class ResourceLoader { MAX_LOADERS = 64 }; + struct ThreadLoadTask; + public: enum ThreadLoadStatus { THREAD_LOAD_INVALID_RESOURCE, @@ -124,7 +126,7 @@ class ResourceLoader { String local_path; String user_path; uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero. 
- Ref res_if_unregistered; + ThreadLoadTask *task_if_unregistered = nullptr; void clear(); From f806cfb72b0739e127397dee42d98677ac56510c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Thu, 5 Sep 2024 09:48:13 +0200 Subject: [PATCH 13/17] ResourceLoader: Add thread-aware resource changed mechanism (cherry picked from commit 0f3ee922e07fd4d16d9ef6dac150beb9c84ac527) --- core/io/resource.cpp | 22 +++++----- core/io/resource_loader.cpp | 84 +++++++++++++++++++++++++++++++++++++ core/io/resource_loader.h | 12 ++++++ 3 files changed, 107 insertions(+), 11 deletions(-) diff --git a/core/io/resource.cpp b/core/io/resource.cpp index 432adb88da9f..f2071ebe0d27 100644 --- a/core/io/resource.cpp +++ b/core/io/resource.cpp @@ -40,12 +40,12 @@ #include void Resource::emit_changed() { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the connection happen on the call queue, later, since signals are not thread-safe. - call_deferred("emit_signal", CoreStringName(changed)); - } else { - emit_signal(CoreStringName(changed)); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_emit(this); + return; } + + emit_signal(CoreStringName(changed)); } void Resource::_resource_path_changed() { @@ -166,22 +166,22 @@ bool Resource::editor_can_reload_from_file() { } void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the check and connection happen on the call queue, later, since signals are not thread-safe. - callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_connect(this, p_callable, p_flags); return; } + if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) { connect(CoreStringName(changed), p_callable, p_flags); } } void Resource::disconnect_changed(const Callable &p_callable) { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the check and disconnection happen on the call queue, later, since signals are not thread-safe. - callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_disconnect(this, p_callable); return; } + if (is_connected(CoreStringName(changed), p_callable)) { disconnect(CoreStringName(changed), p_callable); } diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 82084c159879..0ee9fb2d9f36 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -31,6 +31,7 @@ #include "resource_loader.h" #include "core/config/project_settings.h" +#include "core/core_bind.h" #include "core/io/file_access.h" #include "core/io/resource_importer.h" #include "core/object/script_language.h" @@ -327,6 +328,9 @@ void ResourceLoader::_run_load_task(void *p_userdata) { } thread_load_mutex.unlock(); + ThreadLoadTask *curr_load_task_backup = curr_load_task; + curr_load_task = &load_task; + // Thread-safe either if it's the current thread or a brand new one. 
 	CallQueue *own_mq_override = nullptr;
 	if (load_nesting == 0) {
@@ -454,6 +458,8 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
 		}
 		DEV_ASSERT(load_paths_stack.is_empty());
 	}
+
+	curr_load_task = curr_load_task_backup;
 }
 
 static String _validate_local_path(const String &p_path) {
@@ -814,6 +820,39 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
 	if (r_error) {
 		*r_error = load_task_ptr->error;
 	}
+
+	if (resource.is_valid()) {
+		if (curr_load_task) {
+			// A task awaiting another => Let the awaiter accumulate the resource changed connections.
+			DEV_ASSERT(curr_load_task != load_task_ptr);
+			for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
+				curr_load_task->resource_changed_connections.push_back(rcc);
+			}
+		} else {
+			// A leaf task being awaited => Propagate the resource changed connections.
+			if (Thread::is_main_thread()) {
+				// On the main thread it's safe to migrate the connections to the standard signal mechanism.
+				for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
+					if (rcc.callable.is_valid()) {
+						rcc.source->connect_changed(rcc.callable, rcc.flags);
+					}
+				}
+			} else {
+				// On non-main threads, we have to queue and call it done when processed.
+				if (!load_task_ptr->resource_changed_connections.is_empty()) {
+					for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) {
+						if (rcc.callable.is_valid()) {
+							MessageQueue::get_main_singleton()->push_callable(callable_mp(rcc.source, &Resource::connect_changed).bind(rcc.callable, rcc.flags));
+						}
+					}
+					core_bind::Semaphore done;
+					MessageQueue::get_main_singleton()->push_callable(callable_mp(&done, &core_bind::Semaphore::post));
+					done.wait();
+				}
+			}
+		}
+	}
+
 	return resource;
 }
 
@@ -828,6 +867,50 @@ bool ResourceLoader::_ensure_load_progress() {
 	return true;
 }
 
+void ResourceLoader::resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags) {
+	print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));
+
+	MutexLock lock(thread_load_mutex);
+
+	for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
+		if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
+			return;
+		}
+	}
+
+	ThreadLoadTask::ResourceChangedConnection rcc;
+	rcc.source = p_source;
+	rcc.callable = p_callable;
+	rcc.flags = p_flags;
+	curr_load_task->resource_changed_connections.push_back(rcc);
+}
+
+void ResourceLoader::resource_changed_disconnect(Resource *p_source, const Callable &p_callable) {
+	print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id()));
+
+	MutexLock lock(thread_load_mutex);
+
+	for (uint32_t i = 0; i < curr_load_task->resource_changed_connections.size(); ++i) {
+		const ThreadLoadTask::ResourceChangedConnection &rcc = curr_load_task->resource_changed_connections[i];
+		if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) {
+			curr_load_task->resource_changed_connections.remove_at_unordered(i);
+			return;
+		}
+	}
+}
+
+void ResourceLoader::resource_changed_emit(Resource *p_source) {
+	print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR, Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class()));
+
+	MutexLock lock(thread_load_mutex);
+
+	for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) {
+		if (unlikely(rcc.source == p_source)) {
+			rcc.callable.call();
+		}
+	}
+}
+
 Ref ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) {
 	ERR_FAIL_COND_V(load_nesting == 0, Ref()); // It makes no sense to use this from nesting level 0.
 	const String &local_path = _validate_local_path(p_path);
@@ -1356,6 +1439,7 @@ bool ResourceLoader::timestamp_on_load = false;
 thread_local int ResourceLoader::load_nesting = 0;
 thread_local Vector ResourceLoader::load_paths_stack;
 thread_local HashMap>> ResourceLoader::res_ref_overrides;
+thread_local ResourceLoader::ThreadLoadTask *ResourceLoader::curr_load_task = nullptr;
 
 SafeBinaryMutex &_get_res_loader_mutex() {
 	return ResourceLoader::thread_load_mutex;
diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h
index 34ac1ba3e92f..caaf9f8f45dd 100644
--- a/core/io/resource_loader.h
+++ b/core/io/resource_loader.h
@@ -189,6 +189,13 @@ class ResourceLoader {
 		Ref resource;
 		bool use_sub_threads = false;
 		HashSet sub_tasks;
+
+		struct ResourceChangedConnection {
+			Resource *source = nullptr;
+			Callable callable;
+			uint32_t flags = 0;
+		};
+		LocalVector resource_changed_connections;
 	};
 
 	static void _run_load_task(void *p_userdata);
@@ -196,6 +203,7 @@ class ResourceLoader {
 	static thread_local int load_nesting;
 	static thread_local HashMap>> res_ref_overrides; // Outermost key is nesting level.
 	static thread_local Vector load_paths_stack;
+	static thread_local ThreadLoadTask *curr_load_task;
 
 	static SafeBinaryMutex thread_load_mutex;
 	friend SafeBinaryMutex &_get_res_loader_mutex();
@@ -216,6 +224,10 @@ class ResourceLoader {
 		return load_nesting > 0;
 	};
 
+	static void resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags);
+	static void resource_changed_disconnect(Resource *p_source, const Callable &p_callable);
+	static void resource_changed_emit(Resource *p_source);
+
 	static Ref load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr);
 
 	static bool exists(const String &p_path, const String &p_type_hint = "");

From ea651a150b27302626ac9174e2209a1029576e21 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?=
Date: Thu, 5 Sep 2024 19:06:38 +0200
Subject: [PATCH 14/17] ResourceLoader: Add last resort life-time insurance for tokens

(cherry picked from commit ccd470d33c49e28d5be3ca258da4f2ce950949db)
---
 core/io/resource_loader.cpp | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp
index 0ee9fb2d9f36..5c14661eacfb 100644
--- a/core/io/resource_loader.cpp
+++ b/core/io/resource_loader.cpp
@@ -317,6 +317,7 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin
 }
 
 // This implementation must allow re-entrancy for a task that started awaiting in a deeper stack frame.
+// The load task token must be manually re-referenced before this is called, which includes threaded runs.
 void ResourceLoader::_run_load_task(void *p_userdata) {
 	ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;
 
@@ -447,6 +448,9 @@ void ResourceLoader::_run_load_task(void *p_userdata) {
 		}
 	}
 
+	// It's safe now to let the task go in case no one else was grabbing the token.
+ load_task.load_token->unreference(); + if (unlock_pending) { thread_load_mutex.unlock(); } @@ -597,6 +601,11 @@ Ref ResourceLoader::_load_start(const String &p_path, } } + // It's important to keep the token alive because until the load completes, + // which includes before the thread start, it may happen that no one is grabbing + // the token anymore so it's released. + load_task_ptr->load_token->reference(); + if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) { // The current thread may happen to be a thread from the pool. WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id(); @@ -781,6 +790,7 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro // resource loading that means that the task to wait for can be restarted here to break the // cycle, with as much recursion into this process as needed. // When the stack is eventually unrolled, the original load will have been notified to go on. + load_task.load_token->reference(); _run_load_task(&load_task); } From 251237d2b4021b9cdd25c644cf13061b98b0d0ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Fri, 6 Sep 2024 17:43:05 +0200 Subject: [PATCH 15/17] ResourceLoader: Fixup resource changed feature This is a complement to: https://github.com/godotengine/godot/pull/96593 (cherry picked from commit 97197ff5e9c73ffbb2e3822d40a63bc3f8c47373) --- core/io/resource_loader.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 5c14661eacfb..5f578dbeecfc 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -826,6 +826,8 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro load_task_ptr = &load_task; } + thread_load_mutex.unlock(); + Ref resource = load_task_ptr->resource; if (r_error) { *r_error = load_task_ptr->error; @@ -863,6 +865,8 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro } } + thread_load_mutex.lock(); + return resource; } From 9ed06bce54002e83244fdf8cabf0a12931a2b17b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Fri, 13 Sep 2024 12:19:06 +0200 Subject: [PATCH 16/17] EditorResourcePreview: Let loads complete after exit requested (cherry picked from commit f31867d2b9e7b916e24683c024764bcc3d326a83) --- editor/editor_resource_preview.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/editor/editor_resource_preview.cpp b/editor/editor_resource_preview.cpp index 71865f7e8cc0..47caa4fef94e 100644 --- a/editor/editor_resource_preview.cpp +++ b/editor/editor_resource_preview.cpp @@ -531,8 +531,10 @@ void EditorResourcePreview::stop() { } while (!exited.is_set()) { + // Sync pending work. 
OS::get_singleton()->delay_usec(10000); - RenderingServer::get_singleton()->sync(); //sync pending stuff, as thread may be blocked on rendering server + RenderingServer::get_singleton()->sync(); + MessageQueue::get_singleton()->flush(); } thread.wait_to_finish(); From 24442046f9e0225166263e6bf9e656fce2b461ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20J=2E=20Est=C3=A9banez?= Date: Mon, 23 Sep 2024 19:00:47 +0200 Subject: [PATCH 17/17] ResourceLoader: Report error if resource type unrecognized Co-authored-by: Summersay415 (cherry picked from commit 214deab6dd3d6746e8af55aa67e50b210d16e841) --- core/io/resource_loader.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 5f578dbeecfc..877f81d3faab 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -308,11 +308,16 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin ERR_FAIL_COND_V_MSG(found, Ref(), vformat("Failed loading resource: %s. Make sure resources have been imported by opening the project in the editor at least once.", p_path)); + DEV_ASSERT(!found); + #ifdef TOOLS_ENABLED Ref file_check = FileAccess::create(FileAccess::ACCESS_RESOURCES); ERR_FAIL_COND_V_MSG(!file_check->file_exists(p_path), Ref(), vformat("Resource file not found: %s (expected type: %s)", p_path, p_type_hint)); #endif + if (r_error) { + *r_error = ERR_FILE_UNRECOGNIZED; + } ERR_FAIL_V_MSG(Ref(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint)); }
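Note on [PATCH 17/17]: with this change, a load that fails because no ResourceFormatLoader recognizes the file reports ERR_FILE_UNRECOGNIZED through r_error, in addition to the error message. A caller-side sketch of branching on that error, not part of the patch (the path and the fallback logic are illustrative; the load() signature is the one declared in resource_loader.h above):

// Sketch only: distinguishing "no loader for this file" from other failures.
Error err = OK;
Ref<Resource> res = ResourceLoader::load("res://data.blob", "", ResourceFormatLoader::CACHE_MODE_REUSE, &err);
if (res.is_null()) {
	if (err == ERR_FILE_UNRECOGNIZED) {
		// No registered loader claims this file: likely a missing import
		// or an unsupported format, rather than a corrupted resource.
	} else {
		// Some other failure (file missing, parse error, ...).
	}
}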