From fcdbdfe898662f6988e00d8f77979085f4fee4b5 Mon Sep 17 00:00:00 2001 From: Kevin Huck Date: Mon, 9 Sep 2024 15:46:22 -0700 Subject: [PATCH] Untied tasks working everywhere, even with direct actions. Next step is to refactor and remove all references to the per-thread timer stacks. --- src/apex/apex.cpp | 28 ++++++-- src/apex/apex_types.h | 2 +- src/apex/thread_instance.cpp | 65 ++++++++++++++++++- src/apex/thread_instance.hpp | 2 + .../C++/apex_hpx_direct_actions.cpp | 18 +++-- .../apex_hpx_task_wrapper_direct_actions.cpp | 2 + 6 files changed, 104 insertions(+), 13 deletions(-) diff --git a/src/apex/apex.cpp b/src/apex/apex.cpp index 8c309e3b..aa07bfe7 100644 --- a/src/apex/apex.cpp +++ b/src/apex/apex.cpp @@ -463,6 +463,11 @@ uint64_t init(const char * thread_name, uint64_t comm_rank, in_apex prevent_deadlocks; // if APEX is disabled, do nothing. if (apex_options::disable() == true) { FUNCTION_EXIT; return APEX_ERROR; } + // if we are configured with HPX, disable untied timers so we can handle + // direct actions correctly. +#ifdef APEX_HAVE_HPX + apex_options::untied_timers(false); +#endif // FIRST! make sure APEX thinks this is a worker thread (the main thread // is always a worker thread) thread_instance::instance(true); @@ -548,6 +553,7 @@ uint64_t init(const char * thread_name, uint64_t comm_rank, std::shared_ptr twp = new_task(task_name, UINTMAX_MAX, task_wrapper::get_apex_main_wrapper()); start(twp); + APEX_ASSERT(twp->state == task_wrapper::RUNNING); thread_instance::set_top_level_timer(twp); } /* restore the suspended bit */ @@ -639,7 +645,7 @@ void debug_print(const char * event, std::shared_ptr tt_ptr) { return; } else { ss << thread_instance::get_id() << " " << event << " : " << - tt_ptr->guid << " : " << tt_ptr->get_task_id()->get_name() << " parents: "; + tt_ptr->guid << " : " << tt_ptr->get_task_id()->get_name() << " - parents: "; for (auto& p : tt_ptr->parents) { ss << p->get_task_id()->get_name() << ", "; } @@ -780,9 +786,11 @@ profiler* start(const std::string &timer_name) APEX_UTIL_REF_COUNT_START } #endif +/* if (apex_options::untied_timers() == true) { return new_profiler; } + */ return thread_instance::instance().restore_children_profilers(tt_ptr); } @@ -838,9 +846,11 @@ profiler* start(const apex_function_address function_address) { } } APEX_UTIL_REF_COUNT_START + /* if (apex_options::untied_timers() == true) { return new_profiler; } + */ return thread_instance::instance().restore_children_profilers(tt_ptr); } @@ -1190,7 +1200,9 @@ void stop(profiler* the_profiler, bool cleanup) { } LOCAL_DEBUG_PRINT("Stop", the_profiler->tt_ptr); if (apex_options::untied_timers() == true) { - thread_instance::instance().clear_untied_current_profiler(); + //thread_instance::instance().clear_untied_current_profiler(); + thread_instance::instance().clear_current_profiler_untied(the_profiler, false, + null_task_wrapper); } else { thread_instance::instance().clear_current_profiler(the_profiler, false, null_task_wrapper); @@ -1270,7 +1282,9 @@ void stop(std::shared_ptr tt_ptr) { //APEX_ASSERT(tt_ptr->prof->thread_id == thread_instance::instance().get_id()); } if (apex_options::untied_timers()) { - thread_instance::instance().clear_untied_current_profiler(); + //thread_instance::instance().clear_untied_current_profiler(); + thread_instance::instance().clear_current_profiler_untied(tt_ptr->prof, false, + null_task_wrapper); } else { thread_instance::instance().clear_current_profiler(tt_ptr->prof, false, null_task_wrapper); @@ -1341,7 +1355,9 @@ void yield(profiler* the_profiler) { } LOCAL_DEBUG_PRINT("Yield", the_profiler->tt_ptr); if (apex_options::untied_timers() == true) { - thread_instance::instance().clear_untied_current_profiler(); + //thread_instance::instance().clear_untied_current_profiler(); + thread_instance::instance().clear_current_profiler_untied(the_profiler, false, + null_task_wrapper); } else { thread_instance::instance().clear_current_profiler(the_profiler, false, null_task_wrapper); @@ -1395,7 +1411,9 @@ void yield(std::shared_ptr tt_ptr) { return; } if (apex_options::untied_timers() == true) { - thread_instance::instance().clear_untied_current_profiler(); + //thread_instance::instance().clear_untied_current_profiler(); + thread_instance::instance().clear_current_profiler_untied(tt_ptr->prof, true, + tt_ptr); } else { thread_instance::instance().clear_current_profiler(tt_ptr->prof, true, tt_ptr); diff --git a/src/apex/apex_types.h b/src/apex/apex_types.h index d23c463f..f2907f1b 100644 --- a/src/apex/apex_types.h +++ b/src/apex/apex_types.h @@ -270,7 +270,7 @@ inline unsigned int sc_nprocessors_onln(void) macro (APEX_SUSPEND, suspend, bool, false, "Suspend APEX timers and counters during the application execution") \ macro (APEX_PAPI_SUSPEND, papi_suspend, bool, false, "Suspend PAPI counters during the application execution") \ macro (APEX_PROCESS_ASYNC_STATE, process_async_state, bool, true, "Enable/disable asynchronous processing of statistics (useful when only collecting trace data)") \ - macro (APEX_UNTIED_TIMERS, untied_timers, bool, false, "Disable callstack state maintenance for specific OS threads. This allows APEX timers to start on one thread and stop on another. This is not compatible with tracing.") \ + macro (APEX_UNTIED_TIMERS, untied_timers, bool, true, "Disable callstack state maintenance for specific OS threads. This allows APEX timers to start on one thread and stop on another. This is not compatible with OTF2 tracing.") \ macro (APEX_TAU, use_tau, bool, false, "Enable TAU profiling (if application is executed with tau_exec).") \ macro (APEX_OTF2, use_otf2, bool, false, "Enable OTF2 trace output.") \ macro (APEX_OTF2_COLLECTIVE_SIZE, otf2_collective_size, int, 1, "") \ diff --git a/src/apex/thread_instance.cpp b/src/apex/thread_instance.cpp index db08348d..a48905ff 100644 --- a/src/apex/thread_instance.cpp +++ b/src/apex/thread_instance.cpp @@ -249,7 +249,12 @@ void thread_instance::set_current_profiler(profiler * the_profiler) { if (apex_options::untied_timers() == true) { APEX_ASSERT(the_profiler != nullptr && the_profiler->tt_ptr != nullptr); // make the previous profiler on the "stack" the parent of this profiler - the_profiler->untied_parent = instance().untied_current_profiler; + if (instance().untied_current_profiler == nullptr) { + //the_profiler->untied_parent = task_wrapper::get_apex_main_wrapper(); + the_profiler->untied_parent = instance().get_top_level_timer(); + } else { + the_profiler->untied_parent = instance().untied_current_profiler; + } // make this profiler the new top of the "stack" instance().untied_current_profiler = the_profiler->tt_ptr; } @@ -299,6 +304,64 @@ void thread_instance::clear_all_profilers() { } } +void thread_instance::clear_current_profiler_untied(profiler * the_profiler, + bool save_children, std::shared_ptr &tt_ptr) { + static APEX_NATIVE_TLS bool fixing_stack = false; + // check for recursion + if (fixing_stack) {return;} + // get the current profiler + auto tmp = instance().untied_current_profiler; + if (tmp == nullptr) { + // nothing to do? This thread has no other running timers. + return; + } + printf("%lu popping %s\n", get_id(), tmp->get_task_id()->get_short_name().c_str()); + /* Uh-oh! Someone has caused the dreaded "overlapping timer" problem to + * happen! No problem - stop the child timer. + * Keep the children around, along with a reference to the parent's + * guid so that if/when we see this parent again, we can restart + * the children timers. */ + if (tmp->prof != the_profiler) { + fixing_stack = true; + // if the data pointer location isn't available, we can't support this runtime. + // create a vector to store the children + if (save_children == true) { + APEX_ASSERT(tt_ptr != nullptr); + } + while (tmp->prof != the_profiler) { + if (save_children == true) { + // if we are yielding, we need to stop the children + /* Make a copy of the profiler object on the top of the stack. */ + profiler * profiler_copy = new profiler(*tmp->prof); + tt_ptr->data_ptr.push_back(tmp->prof); + /* Stop the copy. The original will get reset when the + parent resumes. */ + stop(profiler_copy, false); // we better be re-entrant safe! + } else { + // since we aren't yielding, just stop the children. + stop(tmp->prof); // we better be re-entrant safe! + } + // this is a serious problem... + if (tmp->prof->untied_parent == nullptr) { + // unless...we happen to be exiting. Bets are off. + if (apex_options::suspend() == true) { return; } + // if we've already cleared the stack on this thread, we're fine + if (instance()._exiting) { return; } + std::cerr << "Warning! empty profiler stack!\n"; + APEX_ASSERT(false); + //abort(); + return; + } + // get the new top of the stack + tmp = tmp->prof->untied_parent; + printf("%lu popping? %s\n", get_id(), tmp->get_task_id()->get_short_name().c_str()); + } + // done with the stack, allow proper recursion again. + fixing_stack = false; + } + instance().untied_current_profiler = tmp->prof->untied_parent; +} + void thread_instance::clear_current_profiler(profiler * the_profiler, bool save_children, std::shared_ptr &tt_ptr) { // this is a stack variable that provides safety when using recursion. diff --git a/src/apex/thread_instance.hpp b/src/apex/thread_instance.hpp index 5cf7f798..89fbba5a 100644 --- a/src/apex/thread_instance.hpp +++ b/src/apex/thread_instance.hpp @@ -163,6 +163,8 @@ class thread_instance { static profiler * get_current_profiler(void); static void clear_current_profiler(profiler * the_profiler, bool save_children, std::shared_ptr &tt_ptr); + static void clear_current_profiler_untied(profiler * the_profiler, + bool save_children, std::shared_ptr &tt_ptr); static void clear_current_profiler() { instance().current_profilers.pop_back(); } diff --git a/src/unit_tests/C++/apex_hpx_direct_actions.cpp b/src/unit_tests/C++/apex_hpx_direct_actions.cpp index 258a777a..bad39a1c 100644 --- a/src/unit_tests/C++/apex_hpx_direct_actions.cpp +++ b/src/unit_tests/C++/apex_hpx_direct_actions.cpp @@ -19,6 +19,8 @@ const int num_iterations = 10; pthread_barrier_t barrier; #endif +std::atomic guid{32}; + int nsleep(long miliseconds, int tid) { struct timespec req, rem; @@ -47,7 +49,7 @@ int nsleep(long miliseconds, int tid) } void innerLoop(int *tid) { - std::shared_ptr tt_ptr = apex::new_task(__func__); + std::shared_ptr tt_ptr = apex::new_task(__func__, guid++); #ifdef __DEBUG_PRINT__ std::stringstream buf; buf << "APP: " << *tid << ": Starting thread " << tt_ptr->guid << "\n"; std::cout << buf.str(); @@ -65,7 +67,7 @@ void innerLoop(int *tid) { #endif /* Start a timer like an "direct_action" */ - std::shared_ptr af = apex::new_task("direct_action", UINTMAX_MAX, tt_ptr); + std::shared_ptr af = apex::new_task("direct_action", guid++, tt_ptr); #ifdef __DEBUG_PRINT__ buf.str(""); buf.clear(); buf << "APP: " << *tid << ": Starting direct_action " << af->guid << "\n"; std::cout << buf.str(); @@ -126,11 +128,12 @@ void* someThread(void* tmp) #endif apex::register_thread(name); - apex::profiler* p = apex::start(__func__); + auto task = apex::new_task(__func__, guid++); + apex::start(task); for (int i = 0 ; i < num_iterations ; i++) { innerLoop(tid); } - apex::stop(p); + apex::stop(task); /* tell APEX that this thread is exiting */ apex::exit_thread(); @@ -142,8 +145,11 @@ int main (int argc, char** argv) { apex::init("apex::start unit test", 0, 1); /* important, to make sure we get correct profiles at the end */ apex::apex_options::use_screen_output(true); + /* disable untied timers! not yet supported with direct actions. */ + //apex::apex_options::untied_timers(false); /* start a timer */ - apex::profiler* p = apex::start("main"); + auto task = apex::new_task("main", guid); + apex::start(task); /* Spawn X threads */ if (argc > 1) { test_numthreads = strtoul(argv[1],NULL,0); @@ -169,7 +175,7 @@ int main (int argc, char** argv) { free(tids); free(thread); /* stop our main timer */ - apex::stop(p); + apex::stop(task); /* finalize APEX */ apex::finalize(); apex_profile * profile1 = apex::get_profile("direct_action"); diff --git a/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp b/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp index b5df8404..3217d89d 100644 --- a/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp +++ b/src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp @@ -147,6 +147,8 @@ int main (int argc, char** argv) { apex::init("apex::start unit test", 0, 1); /* important, to make sure we get correct profiles at the end */ apex::apex_options::use_screen_output(true); + /* disable untied timers! not yet supported with direct actions. */ + //apex::apex_options::untied_timers(false); /* start a timer */ apex::profiler* p = apex::start("main"); /* Spawn X threads */