Skip to content

Commit

Permalink
Untied tasks working everywhere, even with direct actions. Next step …
Browse files Browse the repository at this point in the history
…is to refactor and remove all references to the per-thread timer stacks.
  • Loading branch information
khuck committed Sep 9, 2024
1 parent f39b24b commit fcdbdfe
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 13 deletions.
28 changes: 23 additions & 5 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ uint64_t init(const char * thread_name, uint64_t comm_rank,
in_apex prevent_deadlocks;
// if APEX is disabled, do nothing.
if (apex_options::disable() == true) { FUNCTION_EXIT; return APEX_ERROR; }
// if we are configured with HPX, disable untied timers so we can handle
// direct actions correctly.
#ifdef APEX_HAVE_HPX
apex_options::untied_timers(false);
#endif
// FIRST! make sure APEX thinks this is a worker thread (the main thread
// is always a worker thread)
thread_instance::instance(true);
Expand Down Expand Up @@ -548,6 +553,7 @@ uint64_t init(const char * thread_name, uint64_t comm_rank,
std::shared_ptr<task_wrapper> twp =
new_task(task_name, UINTMAX_MAX, task_wrapper::get_apex_main_wrapper());
start(twp);
APEX_ASSERT(twp->state == task_wrapper::RUNNING);
thread_instance::set_top_level_timer(twp);
}
/* restore the suspended bit */
Expand Down Expand Up @@ -639,7 +645,7 @@ void debug_print(const char * event, std::shared_ptr<task_wrapper> tt_ptr) {
return;
} else {
ss << thread_instance::get_id() << " " << event << " : " <<
tt_ptr->guid << " : " << tt_ptr->get_task_id()->get_name() << " parents: ";
tt_ptr->guid << " : " << tt_ptr->get_task_id()->get_name() << " - parents: ";
for (auto& p : tt_ptr->parents) {
ss << p->get_task_id()->get_name() << ", ";
}
Expand Down Expand Up @@ -780,9 +786,11 @@ profiler* start(const std::string &timer_name)
APEX_UTIL_REF_COUNT_START
}
#endif
/*
if (apex_options::untied_timers() == true) {
return new_profiler;
}
*/
return thread_instance::instance().restore_children_profilers(tt_ptr);
}

Expand Down Expand Up @@ -838,9 +846,11 @@ profiler* start(const apex_function_address function_address) {
}
}
APEX_UTIL_REF_COUNT_START
/*
if (apex_options::untied_timers() == true) {
return new_profiler;
}
*/
return thread_instance::instance().restore_children_profilers(tt_ptr);
}

Expand Down Expand Up @@ -1190,7 +1200,9 @@ void stop(profiler* the_profiler, bool cleanup) {
}
LOCAL_DEBUG_PRINT("Stop", the_profiler->tt_ptr);
if (apex_options::untied_timers() == true) {
thread_instance::instance().clear_untied_current_profiler();
//thread_instance::instance().clear_untied_current_profiler();
thread_instance::instance().clear_current_profiler_untied(the_profiler, false,
null_task_wrapper);
} else {
thread_instance::instance().clear_current_profiler(the_profiler, false,
null_task_wrapper);
Expand Down Expand Up @@ -1270,7 +1282,9 @@ void stop(std::shared_ptr<task_wrapper> tt_ptr) {
//APEX_ASSERT(tt_ptr->prof->thread_id == thread_instance::instance().get_id());
}
if (apex_options::untied_timers()) {
thread_instance::instance().clear_untied_current_profiler();
//thread_instance::instance().clear_untied_current_profiler();
thread_instance::instance().clear_current_profiler_untied(tt_ptr->prof, false,
null_task_wrapper);
} else {
thread_instance::instance().clear_current_profiler(tt_ptr->prof, false,
null_task_wrapper);
Expand Down Expand Up @@ -1341,7 +1355,9 @@ void yield(profiler* the_profiler) {
}
LOCAL_DEBUG_PRINT("Yield", the_profiler->tt_ptr);
if (apex_options::untied_timers() == true) {
thread_instance::instance().clear_untied_current_profiler();
//thread_instance::instance().clear_untied_current_profiler();
thread_instance::instance().clear_current_profiler_untied(the_profiler, false,
null_task_wrapper);
} else {
thread_instance::instance().clear_current_profiler(the_profiler, false,
null_task_wrapper);
Expand Down Expand Up @@ -1395,7 +1411,9 @@ void yield(std::shared_ptr<task_wrapper> tt_ptr) {
return;
}
if (apex_options::untied_timers() == true) {
thread_instance::instance().clear_untied_current_profiler();
//thread_instance::instance().clear_untied_current_profiler();
thread_instance::instance().clear_current_profiler_untied(tt_ptr->prof, true,
tt_ptr);
} else {
thread_instance::instance().clear_current_profiler(tt_ptr->prof, true,
tt_ptr);
Expand Down
2 changes: 1 addition & 1 deletion src/apex/apex_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ inline unsigned int sc_nprocessors_onln(void)
macro (APEX_SUSPEND, suspend, bool, false, "Suspend APEX timers and counters during the application execution") \
macro (APEX_PAPI_SUSPEND, papi_suspend, bool, false, "Suspend PAPI counters during the application execution") \
macro (APEX_PROCESS_ASYNC_STATE, process_async_state, bool, true, "Enable/disable asynchronous processing of statistics (useful when only collecting trace data)") \
macro (APEX_UNTIED_TIMERS, untied_timers, bool, false, "Disable callstack state maintenance for specific OS threads. This allows APEX timers to start on one thread and stop on another. This is not compatible with tracing.") \
macro (APEX_UNTIED_TIMERS, untied_timers, bool, true, "Disable callstack state maintenance for specific OS threads. This allows APEX timers to start on one thread and stop on another. This is not compatible with OTF2 tracing.") \
macro (APEX_TAU, use_tau, bool, false, "Enable TAU profiling (if application is executed with tau_exec).") \
macro (APEX_OTF2, use_otf2, bool, false, "Enable OTF2 trace output.") \
macro (APEX_OTF2_COLLECTIVE_SIZE, otf2_collective_size, int, 1, "") \
Expand Down
65 changes: 64 additions & 1 deletion src/apex/thread_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,12 @@ void thread_instance::set_current_profiler(profiler * the_profiler) {
if (apex_options::untied_timers() == true) {
APEX_ASSERT(the_profiler != nullptr && the_profiler->tt_ptr != nullptr);
// make the previous profiler on the "stack" the parent of this profiler
the_profiler->untied_parent = instance().untied_current_profiler;
if (instance().untied_current_profiler == nullptr) {
//the_profiler->untied_parent = task_wrapper::get_apex_main_wrapper();
the_profiler->untied_parent = instance().get_top_level_timer();
} else {
the_profiler->untied_parent = instance().untied_current_profiler;
}
// make this profiler the new top of the "stack"
instance().untied_current_profiler = the_profiler->tt_ptr;
}
Expand Down Expand Up @@ -299,6 +304,64 @@ void thread_instance::clear_all_profilers() {
}
}

void thread_instance::clear_current_profiler_untied(profiler * the_profiler,
bool save_children, std::shared_ptr<task_wrapper> &tt_ptr) {
static APEX_NATIVE_TLS bool fixing_stack = false;
// check for recursion
if (fixing_stack) {return;}
// get the current profiler
auto tmp = instance().untied_current_profiler;
if (tmp == nullptr) {
// nothing to do? This thread has no other running timers.
return;
}
printf("%lu popping %s\n", get_id(), tmp->get_task_id()->get_short_name().c_str());
/* Uh-oh! Someone has caused the dreaded "overlapping timer" problem to
* happen! No problem - stop the child timer.
* Keep the children around, along with a reference to the parent's
* guid so that if/when we see this parent again, we can restart
* the children timers. */
if (tmp->prof != the_profiler) {
fixing_stack = true;
// if the data pointer location isn't available, we can't support this runtime.
// create a vector to store the children
if (save_children == true) {
APEX_ASSERT(tt_ptr != nullptr);
}
while (tmp->prof != the_profiler) {
if (save_children == true) {
// if we are yielding, we need to stop the children
/* Make a copy of the profiler object on the top of the stack. */
profiler * profiler_copy = new profiler(*tmp->prof);
tt_ptr->data_ptr.push_back(tmp->prof);
/* Stop the copy. The original will get reset when the
parent resumes. */
stop(profiler_copy, false); // we better be re-entrant safe!
} else {
// since we aren't yielding, just stop the children.
stop(tmp->prof); // we better be re-entrant safe!
}
// this is a serious problem...
if (tmp->prof->untied_parent == nullptr) {
// unless...we happen to be exiting. Bets are off.
if (apex_options::suspend() == true) { return; }
// if we've already cleared the stack on this thread, we're fine
if (instance()._exiting) { return; }
std::cerr << "Warning! empty profiler stack!\n";
APEX_ASSERT(false);
//abort();
return;
}
// get the new top of the stack
tmp = tmp->prof->untied_parent;
printf("%lu popping? %s\n", get_id(), tmp->get_task_id()->get_short_name().c_str());
}
// done with the stack, allow proper recursion again.
fixing_stack = false;
}
instance().untied_current_profiler = tmp->prof->untied_parent;
}

void thread_instance::clear_current_profiler(profiler * the_profiler,
bool save_children, std::shared_ptr<task_wrapper> &tt_ptr) {
// this is a stack variable that provides safety when using recursion.
Expand Down
2 changes: 2 additions & 0 deletions src/apex/thread_instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ class thread_instance {
static profiler * get_current_profiler(void);
static void clear_current_profiler(profiler * the_profiler,
bool save_children, std::shared_ptr<task_wrapper> &tt_ptr);
static void clear_current_profiler_untied(profiler * the_profiler,
bool save_children, std::shared_ptr<task_wrapper> &tt_ptr);
static void clear_current_profiler() {
instance().current_profilers.pop_back();
}
Expand Down
18 changes: 12 additions & 6 deletions src/unit_tests/C++/apex_hpx_direct_actions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const int num_iterations = 10;
pthread_barrier_t barrier;
#endif

std::atomic<uint64_t> guid{32};

int nsleep(long miliseconds, int tid)
{
struct timespec req, rem;
Expand Down Expand Up @@ -47,7 +49,7 @@ int nsleep(long miliseconds, int tid)
}

void innerLoop(int *tid) {
std::shared_ptr<apex::task_wrapper> tt_ptr = apex::new_task(__func__);
std::shared_ptr<apex::task_wrapper> tt_ptr = apex::new_task(__func__, guid++);
#ifdef __DEBUG_PRINT__
std::stringstream buf;
buf << "APP: " << *tid << ": Starting thread " << tt_ptr->guid << "\n"; std::cout << buf.str();
Expand All @@ -65,7 +67,7 @@ void innerLoop(int *tid) {
#endif

/* Start a timer like an "direct_action" */
std::shared_ptr<apex::task_wrapper> af = apex::new_task("direct_action", UINTMAX_MAX, tt_ptr);
std::shared_ptr<apex::task_wrapper> af = apex::new_task("direct_action", guid++, tt_ptr);
#ifdef __DEBUG_PRINT__
buf.str(""); buf.clear();
buf << "APP: " << *tid << ": Starting direct_action " << af->guid << "\n"; std::cout << buf.str();
Expand Down Expand Up @@ -126,11 +128,12 @@ void* someThread(void* tmp)
#endif
apex::register_thread(name);

apex::profiler* p = apex::start(__func__);
auto task = apex::new_task(__func__, guid++);
apex::start(task);
for (int i = 0 ; i < num_iterations ; i++) {
innerLoop(tid);
}
apex::stop(p);
apex::stop(task);

/* tell APEX that this thread is exiting */
apex::exit_thread();
Expand All @@ -142,8 +145,11 @@ int main (int argc, char** argv) {
apex::init("apex::start unit test", 0, 1);
/* important, to make sure we get correct profiles at the end */
apex::apex_options::use_screen_output(true);
/* disable untied timers! not yet supported with direct actions. */
//apex::apex_options::untied_timers(false);
/* start a timer */
apex::profiler* p = apex::start("main");
auto task = apex::new_task("main", guid);
apex::start(task);
/* Spawn X threads */
if (argc > 1) {
test_numthreads = strtoul(argv[1],NULL,0);
Expand All @@ -169,7 +175,7 @@ int main (int argc, char** argv) {
free(tids);
free(thread);
/* stop our main timer */
apex::stop(p);
apex::stop(task);
/* finalize APEX */
apex::finalize();
apex_profile * profile1 = apex::get_profile("direct_action");
Expand Down
2 changes: 2 additions & 0 deletions src/unit_tests/C++/apex_hpx_task_wrapper_direct_actions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ int main (int argc, char** argv) {
apex::init("apex::start unit test", 0, 1);
/* important, to make sure we get correct profiles at the end */
apex::apex_options::use_screen_output(true);
/* disable untied timers! not yet supported with direct actions. */
//apex::apex_options::untied_timers(false);
/* start a timer */
apex::profiler* p = apex::start("main");
/* Spawn X threads */
Expand Down

0 comments on commit fcdbdfe

Please sign in to comment.