Skip to content

Commit

Permalink
ResourceLoader: Fix edge cases in the management of user tokens
Browse files Browse the repository at this point in the history
1. Make handling of user tokens atomic:
   Loads started with the external-facing API used to perform a two-step setup of the user token. Between both, the mutex was unlocked without its reference count having been increased. A non-user-initiated load could therefore destroy the load task when it unreferenced the token.
   Those stages now happen atomically so in the one hand, the described race condition can't happen so the load task life insurance doesn't have a gap anymore and, on the other hand, the ugliness that the call to load could return `ERR_BUSY` if happening while other thread was between both steps is gone.
   The code has been refactored so the user token concerns are still outside the inner load start function, which is agnostic to that for a cleaner implementation.
2. Clear ambiguity between load operations running on `WorkerThreadPool`:
   The two cases are: single-loaded thread directly started by a user pool task and a load started by the system as part of a multi-threaded load.
   Since ensuring all the code dealing with this distinction would make it very complex, and error-prone, a different measure is applied instead: just take one of the cases out of the dicotomy. We now ensure every load happening on a pool thread has been initiated by the system.
   The way of achieving that is that a single-threaded user-started load initiated from a pool thread, is run as another task.
  • Loading branch information
RandomShaper committed Aug 21, 2024
1 parent 5c970db commit df23858
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 84 deletions.
178 changes: 95 additions & 83 deletions core/io/resource_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,23 +231,22 @@ void ResourceLoader::LoadToken::clear() {

WorkerThreadPool::TaskID task_to_await = 0;

// User-facing tokens shouldn't be deleted until completely claimed.
DEV_ASSERT(user_rc == 0 && user_path.is_empty());

if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered.
DEV_ASSERT(thread_load_tasks.has(local_path));
ThreadLoadTask &load_task = thread_load_tasks[local_path];
if (!load_task.awaited) {
if (load_task.task_id && !load_task.awaited) {
task_to_await = load_task.task_id;
load_task.awaited = true;
}
// Removing a task which is still in progress would be catastrophic.
// Tokens must be alive until the task thread function is done.
DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
thread_load_tasks.erase(local_path);
local_path.clear();
}

if (!user_path.is_empty()) {
DEV_ASSERT(user_load_tokens.has(user_path));
user_load_tokens.erase(user_path);
user_path.clear();
}

thread_load_mutex.unlock();

// If task is unused, await it here, locally, now the token data is consistent.
Expand Down Expand Up @@ -461,36 +460,44 @@ static String _validate_local_path(const String &p_path) {
}

Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) {
thread_load_mutex.lock();
if (user_load_tokens.has(p_path)) {
print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error.");
user_load_tokens[p_path]->reference(); // Additional request.
thread_load_mutex.unlock();
return OK;
}
user_load_tokens[p_path] = nullptr;
thread_load_mutex.unlock();
Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true);
return token.is_valid() ? OK : FAILED;
}

Ref<ResourceLoader::LoadToken> token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode);
if (token.is_valid()) {
thread_load_mutex.lock();
token->user_path = p_path;
token->reference(); // First request.
user_load_tokens[p_path] = token.ptr();
print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size()));
thread_load_mutex.unlock();
return OK;
ResourceLoader::LoadToken *ResourceLoader::_load_threaded_request_reuse_user_token(const String &p_path) {
HashMap<String, LoadToken *>::Iterator E = user_load_tokens.find(p_path);
if (E) {
print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error.");
LoadToken *token = E->value;
token->user_rc++;
return token;
} else {
return FAILED;
return nullptr;
}
}

void ResourceLoader::_load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path) {
p_token->user_path = p_path;
p_token->reference(); // Extra RC until all user requests have been gotten.
p_token->user_rc = 1;
user_load_tokens[p_path] = p_token;
print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size()));
}

Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) {
if (r_error) {
*r_error = OK;
}

Ref<LoadToken> load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode);
LoadThreadMode thread_mode = LOAD_THREAD_FROM_CURRENT;
if (WorkerThreadPool::get_singleton()->get_caller_task_id() != WorkerThreadPool::INVALID_TASK_ID) {
// If user is initiating a single-threaded load from a WorkerThreadPool task,
// we instead spawn a new task so there's a precondition that a load in a pool task
// is always initiated by the engine. That makes certain aspects simpler, such as
// cyclic load detection and awaiting.
thread_mode = LOAD_THREAD_SPAWN_SINGLE;
}
Ref<LoadToken> load_token = _load_start(p_path, p_type_hint, thread_mode, p_cache_mode);
if (!load_token.is_valid()) {
if (r_error) {
*r_error = FAILED;
Expand All @@ -502,7 +509,7 @@ Ref<Resource> ResourceLoader::load(const String &p_path, const String &p_type_hi
return res;
}

Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) {
Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user) {
String local_path = _validate_local_path(p_path);

bool ignoring_cache = p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE || p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE_DEEP;
Expand All @@ -515,6 +522,13 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
{
MutexLock thread_load_lock(thread_load_mutex);

if (p_for_user) {
LoadToken *existing_token = _load_threaded_request_reuse_user_token(p_path);
if (existing_token) {
return Ref<LoadToken>(existing_token);
}
}

if (!ignoring_cache && thread_load_tasks.has(local_path)) {
load_token = Ref<LoadToken>(thread_load_tasks[local_path].load_token);
if (load_token.is_valid()) {
Expand All @@ -528,6 +542,9 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,

load_token.instantiate();
load_token->local_path = local_path;
if (p_for_user) {
_load_threaded_request_setup_user_token(load_token.ptr(), p_path);
}

//create load task
{
Expand All @@ -545,6 +562,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
load_task.resource = existing;
load_task.status = THREAD_LOAD_LOADED;
load_task.progress = 1.0;
DEV_ASSERT(!thread_load_tasks.has(local_path));
thread_load_tasks[local_path] = load_task;
return load_token;
}
Expand Down Expand Up @@ -576,7 +594,7 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
} else {
load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr);
}
}
} // MutexLock(thread_load_mutex).

if (run_on_current_thread) {
_run_load_task(load_task_ptr);
Expand Down Expand Up @@ -666,13 +684,7 @@ Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_e
}

LoadToken *load_token = user_load_tokens[p_path];
if (!load_token) {
// This happens if requested from one thread and rapidly querying from another.
if (r_error) {
*r_error = ERR_BUSY;
}
return Ref<Resource>();
}
DEV_ASSERT(load_token->user_rc >= 1);

// Support userland requesting on the main thread before the load is reported to be complete.
if (Thread::is_main_thread() && !load_token->local_path.is_empty()) {
Expand All @@ -689,8 +701,15 @@ Ref<Resource> ResourceLoader::load_threaded_get(const String &p_path, Error *r_e
}

res = _load_complete_inner(*load_token, r_error, thread_load_lock);
if (load_token->unreference()) {
memdelete(load_token);

load_token->user_rc--;
if (load_token->user_rc == 0) {
load_token->user_path.clear();
user_load_tokens.erase(p_path);
if (load_token->unreference()) {
memdelete(load_token);
load_token = nullptr;
}
}
}

Expand Down Expand Up @@ -733,55 +752,45 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
}

bool loader_is_wtp = load_task.task_id != 0;
Error wtp_task_err = FAILED;
if (loader_is_wtp) {
// Loading thread is in the worker pool.
load_task.awaited = true;
thread_load_mutex.unlock();

PREPARE_FOR_WTP_WAIT
wtp_task_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id);
RESTORE_AFTER_WTP_WAIT
}

if (load_task.status == THREAD_LOAD_IN_PROGRESS) { // If early errored, awaiting would deadlock.
if (loader_is_wtp) {
if (wtp_task_err == ERR_BUSY) {
// The WorkerThreadPool has reported that the current task wants to await on an older one.
// That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of
// resource loading that means that the task to wait for can be restarted here to break the
// cycle, with as much recursion into this process as needed.
// When the stack is eventually unrolled, the original load will have been notified to go on.
_run_load_task(&load_task);
Ref<Resource> resource = load_task.resource;
if (r_error) {
*r_error = load_task.error;
}
thread_load_mutex.lock();
return resource;
} else {
DEV_ASSERT(wtp_task_err == OK);
thread_load_mutex.lock();
}
} else if (load_task.need_wait) {
// Loading thread is main or user thread.
if (!load_task.cond_var) {
load_task.cond_var = memnew(ConditionVariable);
}
load_task.awaiters_count++;
do {
load_task.cond_var->wait(p_thread_load_lock);
DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count());
} while (load_task.need_wait);
load_task.awaiters_count--;
if (load_task.awaiters_count == 0) {
memdelete(load_task.cond_var);
load_task.cond_var = nullptr;
}
DEV_ASSERT(!wait_err || wait_err == ERR_BUSY);
if (wait_err == ERR_BUSY) {
// The WorkerThreadPool has reported that the current task wants to await on an older one.
// That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of
// resource loading that means that the task to wait for can be restarted here to break the
// cycle, with as much recursion into this process as needed.
// When the stack is eventually unrolled, the original load will have been notified to go on.
_run_load_task(&load_task);
}
} else {
if (loader_is_wtp) {
thread_load_mutex.lock();

thread_load_mutex.lock();
load_task.awaited = true;

DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
} else if (load_task.need_wait) {
// Loading thread is main or user thread.
if (!load_task.cond_var) {
load_task.cond_var = memnew(ConditionVariable);
}
load_task.awaiters_count++;
do {
load_task.cond_var->wait(p_thread_load_lock);
DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count());
} while (load_task.need_wait);
load_task.awaiters_count--;
if (load_task.awaiters_count == 0) {
memdelete(load_task.cond_var);
load_task.cond_var = nullptr;
}

DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED);
}
}

Expand Down Expand Up @@ -1221,10 +1230,13 @@ void ResourceLoader::clear_thread_load_tasks() {
}

while (user_load_tokens.begin()) {
// User load tokens remove themselves from the map on destruction.
memdelete(user_load_tokens.begin()->value);
LoadToken *user_token = user_load_tokens.begin()->value;
user_load_tokens.remove(user_load_tokens.begin());
DEV_ASSERT(user_token->user_rc > 0 && !user_token->user_path.is_empty());
user_token->user_path.clear();
user_token->user_rc = 0;
user_token->unreference();
}
user_load_tokens.clear();

thread_load_tasks.clear();

Expand Down
6 changes: 5 additions & 1 deletion core/io/resource_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class ResourceLoader {
struct LoadToken : public RefCounted {
String local_path;
String user_path;
uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero.
Ref<Resource> res_if_unregistered;

void clear();
Expand All @@ -132,10 +133,13 @@ class ResourceLoader {

static const int BINARY_MUTEX_TAG = 1;

static Ref<LoadToken> _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode);
static Ref<LoadToken> _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user = false);
static Ref<Resource> _load_complete(LoadToken &p_load_token, Error *r_error);

private:
static LoadToken *_load_threaded_request_reuse_user_token(const String &p_path);
static void _load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path);

static Ref<Resource> _load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock<SafeBinaryMutex<BINARY_MUTEX_TAG>> &p_thread_load_lock);

static Ref<ResourceFormatLoader> loader[MAX_LOADERS];
Expand Down

0 comments on commit df23858

Please sign in to comment.