ResourceLoader: Enhance deadlock prevention
Benefits:
- Simpler code. The main load function is renamed so it's apparent that it's no longer just a thread entry point.
- The cache and thread modes of the original task are honored. A beautiful consequence of this is that, unlike before, re-issued loads can use the resource cache, which makes this mechanism much more performant.
- The newly added getter for the caller task id in WorkerThreadPool allows removing the custom tracking of it in ResourceLoader.
- The check for replacing a cached resource and the replacement itself now happen atomically. That fixes deadlock prevention sometimes leading to multiple resource instances of the same on-disk resource. As a side effect, it also makes the regular check for the replace load mode more robust. (A simplified sketch of the atomic pattern follows below.)
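The following is a minimal stand-alone sketch of that atomic check-and-replace pattern. It uses std::shared_ptr, std::mutex and std::unordered_map as stand-ins for Godot's Ref<Resource> and ResourceCache::lock, so every name in it is illustrative only; the real logic lives in ResourceLoader::_run_load_task() in the diff below.

    // Illustrative only: a generic cache where the lookup and the decision of which
    // instance wins happen under the same lock, so two finishing loads of the same
    // path can never both register their own instance.
    #include <memory>
    #include <mutex>
    #include <string>
    #include <unordered_map>

    struct Resource { std::string data; };

    static std::mutex cache_lock;
    static std::unordered_map<std::string, std::weak_ptr<Resource>> cache;

    std::shared_ptr<Resource> finish_load(const std::string &path,
            std::shared_ptr<Resource> just_loaded, bool replacing) {
        std::unique_lock<std::mutex> lock(cache_lock); // Check and registration are atomic.
        if (std::shared_ptr<Resource> cached = cache[path].lock()) {
            lock.unlock(); // Copying data does not need the cache lock.
            if (replacing) {
                cached->data = just_loaded->data; // Replace only the data; existing references stay valid.
            }
            return cached; // Keep the instance that was already registered.
        }
        cache[path] = just_loaded; // First finisher registers itself while still holding the lock.
        return just_loaded;
    }

Because the lookup and the registration share one critical section, two loads of the same path that finish concurrently can no longer both register their own instance; the second one always finds and keeps the first.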
RandomShaper committed Aug 21, 2024
1 parent bd0959e commit 28619e2
Showing 4 changed files with 45 additions and 20 deletions.
52 changes: 34 additions & 18 deletions core/io/resource_loader.cpp
@@ -315,11 +315,11 @@ Ref<Resource> ResourceLoader::_load(const String &p_path, const String &p_origin
ERR_FAIL_V_MSG(Ref<Resource>(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint));
}

void ResourceLoader::_thread_load_function(void *p_userdata) {
// This implementation must allow re-entrancy for a task that started awaiting in a deeper stack frame.
void ResourceLoader::_run_load_task(void *p_userdata) {
ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata;

thread_load_mutex.lock();
caller_task_id = load_task.task_id;
if (cleaning_tasks) {
load_task.status = THREAD_LOAD_FAILED;
thread_load_mutex.unlock();
@@ -376,15 +376,28 @@ void ResourceLoader::_thread_load_function(void *p_userdata) {
unlock_pending = false;

if (!ignoring) {
if (replacing) {
Ref<Resource> old_res = ResourceCache::get_ref(load_task.local_path);
if (old_res.is_valid() && old_res != load_task.resource) {
// If resource is already loaded, only replace its data, to avoid invalidating existing instances.
old_res->copy_from(load_task.resource);
ResourceCache::lock.lock(); // Check and operations must happen atomically.
bool pending_unlock = true;
Ref<Resource> old_res = ResourceCache::get_ref(load_task.local_path);
if (old_res.is_valid()) {
if (old_res != load_task.resource) {
// The resource can already exist at this point for two reasons:
// a) The load uses replace mode.
// b) There was more than one load in flight for the same path because of deadlock prevention.
// In either case, we want to keep the resource that was already there.
ResourceCache::lock.unlock();
pending_unlock = false;
if (replacing) {
old_res->copy_from(load_task.resource);
}
load_task.resource = old_res;
}
} else {
load_task.resource->set_path(load_task.local_path);
}
if (pending_unlock) {
ResourceCache::lock.unlock();
}
load_task.resource->set_path(load_task.local_path, replacing);
} else {
load_task.resource->set_path_cache(load_task.local_path);
}
@@ -552,14 +565,20 @@ Ref<ResourceLoader::LoadToken> ResourceLoader::_load_start(const String &p_path,
run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT;

if (run_on_current_thread) {
load_task_ptr->thread_id = Thread::get_caller_id();
// The current thread may happen to be a thread from the pool.
WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id();
if (tid != WorkerThreadPool::INVALID_TASK_ID) {
load_task_ptr->task_id = tid;
} else {
load_task_ptr->thread_id = Thread::get_caller_id();
}
} else {
load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr);
load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr);
}
}

if (run_on_current_thread) {
_thread_load_function(load_task_ptr);
_run_load_task(load_task_ptr);
if (must_not_register) {
load_token->res_if_unregistered = load_task_ptr->resource;
}
@@ -702,7 +721,7 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
if (load_task.status == THREAD_LOAD_IN_PROGRESS) {
DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0));

if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) ||
if ((load_task.task_id != 0 && load_task.task_id == WorkerThreadPool::get_singleton()->get_caller_task_id()) ||
(load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) {
// Load is in progress, but this thread is precisely the one in charge of it.
// That means this is a cyclic load.
@@ -731,12 +750,10 @@ Ref<Resource> ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro
// resource loading that means that the task to wait for can be restarted here to break the
// cycle, with as much recursion into this process as needed.
// When the stack is eventually unrolled, the original load will have been notified to go on.
// CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's
// an ongoing load for that resource and wait for it again. This value forces a new load.
Ref<ResourceLoader::LoadToken> token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE);
Ref<Resource> resource = _load_complete(*token.ptr(), &wtp_task_err);
_run_load_task(&load_task);
Ref<Resource> resource = load_task.resource;
if (r_error) {
*r_error = wtp_task_err;
*r_error = load_task.error;
}
thread_load_mutex.lock();
return resource;
@@ -1324,7 +1341,6 @@ bool ResourceLoader::abort_on_missing_resource = true;
bool ResourceLoader::timestamp_on_load = false;

thread_local int ResourceLoader::load_nesting = 0;
thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0;
thread_local Vector<String> ResourceLoader::load_paths_stack;
thread_local HashMap<int, HashMap<String, Ref<Resource>>> ResourceLoader::res_ref_overrides;

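For context on the _load_complete_inner() hunks above: when the thread pool cannot safely wait on the in-progress load task (its deadlock prevention refuses the wait), the waiting thread now runs the pending load task directly via the re-entrant _run_load_task(), instead of re-issuing the whole load with CACHE_MODE_IGNORE. The sketch below is a stand-alone illustration of that idea with invented names (PendingLoad, wait_for, would_deadlock), not engine code.

    #include <functional>

    // Invented stand-in for ResourceLoader::ThreadLoadTask; `run` is the load body and
    // is written so it can safely be re-entered from a waiter.
    struct PendingLoad {
        bool done = false;
        std::function<void()> run;
    };

    // `would_deadlock` stands in for the two situations handled in the hunks above:
    // the awaited load is this thread's own task, or the thread pool's deadlock
    // prevention refuses to park this worker on it.
    void wait_for(PendingLoad &task, bool would_deadlock) {
        if (!task.done && would_deadlock) {
            // Instead of blocking (or re-issuing the load with caching disabled, as before),
            // run the pending load body inline; re-entrancy makes this safe, and the
            // original task is notified normally once the stack unwinds.
            task.run();
            return;
        }
        // Otherwise, block until the owning task finishes (omitted here).
    }

Because the inline run executes the same task body, it honors the original cache and thread modes, so the waiter ends up with the cached resource itself rather than a second instance loaded with caching disabled.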
3 changes: 1 addition & 2 deletions core/io/resource_loader.h
@@ -187,10 +187,9 @@ class ResourceLoader {
HashSet<String> sub_tasks;
};

static void _thread_load_function(void *p_userdata);
static void _run_load_task(void *p_userdata);

static thread_local int load_nesting;
static thread_local WorkerThreadPool::TaskID caller_task_id;
static thread_local HashMap<int, HashMap<String, Ref<Resource>>> res_ref_overrides; // Outermost key is nesting level.
static thread_local Vector<String> load_paths_stack;
static SafeBinaryMutex<BINARY_MUTEX_TAG> thread_load_mutex;
9 changes: 9 additions & 0 deletions core/object/worker_thread_pool.cpp
@@ -665,6 +665,15 @@ int WorkerThreadPool::get_thread_index() {
return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
}

WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() {
int th_index = get_thread_index();
if (th_index != -1 && singleton->threads[th_index].current_task) {
return singleton->threads[th_index].current_task->self;
} else {
return INVALID_TASK_ID;
}
}

#ifdef THREADS_ENABLED
uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) {
return _thread_enter_unlock_allowance_zone(p_mutex, false);
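A typical use of the new getter, mirroring how the ResourceLoader hunk above consumes it. WorkOwner and record_owner() are hypothetical names; the WorkerThreadPool and Thread calls are the ones shown in this commit.

    #include "core/object/worker_thread_pool.h"
    #include "core/os/thread.h"

    // Hypothetical bookkeeping struct, mirroring ThreadLoadTask's task_id/thread_id fields.
    struct WorkOwner {
        WorkerThreadPool::TaskID task_id = 0;
        Thread::ID thread_id = 0;
    };

    void record_owner(WorkOwner &r_owner) {
        // Prefer the pool task id when the caller happens to be a WorkerThreadPool worker;
        // fall back to the raw thread id otherwise.
        WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id();
        if (tid != WorkerThreadPool::INVALID_TASK_ID) {
            r_owner.task_id = tid;
        } else {
            r_owner.thread_id = Thread::get_caller_id();
        }
    }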
1 change: 1 addition & 0 deletions core/object/worker_thread_pool.h
@@ -239,6 +239,7 @@ class WorkerThreadPool : public Object {

static WorkerThreadPool *get_singleton() { return singleton; }
static int get_thread_index();
static TaskID get_caller_task_id();

#ifdef THREADS_ENABLED
static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex);
