Skip to content

Commit

Permalink
Fixing untied tasks with correct task dependency. Only remaining prob…
Browse files Browse the repository at this point in the history
…lem is that the HPX direct action support could be affected when untied tasks are enabled, but that shouldn't happen. To make it future-proof, we should make sure that direct actions are correctly supported with untied tasks.
  • Loading branch information
khuck committed Aug 22, 2024
1 parent 7e5218f commit 159d301
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 29 deletions.
93 changes: 66 additions & 27 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ DEFINE_DESTRUCTOR(apex_finalize_static_void)
#include "banner.hpp"
#include "apex_dynamic.hpp"

#if APEX_DEBUG
#ifdef APEX_DEBUG
// Debug trace helper: when apex_options::use_verbose() is true, print the
// value of thread_instance::get_id() together with __APEX_FUNCTION__ and the
// current source line to stderr. Expands to a bare `if { ... }` statement.
// NOTE(review): the message goes to stderr but the flush targets stdout --
// confirm whether fflush(stderr) was intended here.
#define FUNCTION_ENTER if (apex_options::use_verbose()) { \
fprintf(stderr, "enter %lu *** %s:%d!\n", \
thread_instance::get_id(), __APEX_FUNCTION__, __LINE__); fflush(stdout); }
Expand Down Expand Up @@ -650,6 +650,7 @@ inline std::shared_ptr<task_wrapper> _new_task(
} else {
profiler * p = thread_instance::instance().get_current_profiler();
if (p != nullptr) {
//printf("Extracting parent: %s\n", p->tt_ptr->task_id->get_name().c_str());
tt_ptr->parents.push_back(p->tt_ptr);
} else {
tt_ptr->parents.push_back(task_wrapper::get_apex_main_wrapper());
Expand Down Expand Up @@ -1153,8 +1154,12 @@ void stop(profiler* the_profiler, bool cleanup) {
#if defined(APEX_DEBUG)//_disabled)
if (apex_options::use_verbose()) { debug_print("Stop", the_profiler->tt_ptr); }
#endif
thread_instance::instance().clear_current_profiler(the_profiler, false,
null_task_wrapper);
if (apex_options::untied_timers() == true) {
thread_instance::instance().clear_untied_current_profiler();
} else {
thread_instance::instance().clear_current_profiler(the_profiler, false,
null_task_wrapper);
}
apex* instance = apex::instance(); // get the Apex static instance
// protect against calls after finalization
if (!instance || _exited || _measurement_stopped) {
Expand Down Expand Up @@ -1221,13 +1226,18 @@ void stop(std::shared_ptr<task_wrapper> tt_ptr) {
return;
}
// get the thread id that is running this task
if (tt_ptr->prof->thread_id != thread_instance::instance().get_id()) {
if (tt_ptr->prof->thread_id != thread_instance::instance().get_id() &&
!apex_options::untied_timers()) {
printf("Task %s started by %lu stopped by %lu\n", tt_ptr->task_id->get_name().c_str(),
tt_ptr->prof->thread_id, thread_instance::instance().get_id());
APEX_ASSERT(tt_ptr->prof->thread_id == thread_instance::instance().get_id());
}
thread_instance::instance().clear_current_profiler(tt_ptr->prof, false,
null_task_wrapper);
if (apex_options::untied_timers()) {
thread_instance::instance().clear_untied_current_profiler();
} else {
thread_instance::instance().clear_current_profiler(tt_ptr->prof, false,
null_task_wrapper);
}
// protect against calls after finalization
if (!instance || _exited || _measurement_stopped) {
APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE
Expand Down Expand Up @@ -1285,8 +1295,12 @@ void yield(profiler* the_profiler)
APEX_UTIL_REF_COUNT_DOUBLE_YIELD
return;
}
thread_instance::instance().clear_current_profiler(the_profiler, false,
null_task_wrapper);
if (apex_options::untied_timers() == true) {
thread_instance::instance().clear_untied_current_profiler();
} else {
thread_instance::instance().clear_current_profiler(the_profiler, false,
null_task_wrapper);
}
std::shared_ptr<profiler> p{the_profiler};
if (_notify_listeners) {
//read_lock_type l(instance->listener_mutex);
Expand Down Expand Up @@ -1336,8 +1350,12 @@ void yield(std::shared_ptr<task_wrapper> tt_ptr)
APEX_UTIL_REF_COUNT_DOUBLE_YIELD
return;
}
thread_instance::instance().clear_current_profiler(tt_ptr->prof,
true, tt_ptr);
if (apex_options::untied_timers() == true) {
thread_instance::instance().clear_untied_current_profiler();
} else {
thread_instance::instance().clear_current_profiler(tt_ptr->prof, true,
tt_ptr);
}
std::shared_ptr<profiler> p{tt_ptr->prof};
if (_notify_listeners) {
//read_lock_type l(instance->listener_mutex);
Expand Down Expand Up @@ -1815,16 +1833,25 @@ void finalize(void)
}
}
#endif
// FIRST, stop the top level timer, while the infrastructure is still
// functioning.
auto tmp = thread_instance::get_top_level_timer();
if (tmp != nullptr) {
stop(tmp);
thread_instance::clear_top_level_timer();
}
// Second, stop the main timer, while the infrastructure is still
// functioning.
instance->the_profiler_listener->stop_main_timer();
//if (apex_options::untied_timers() == true) {
profiler * top_profiler = thread_instance::instance().get_current_profiler();
while (top_profiler != nullptr) {
stop(top_profiler);
top_profiler = thread_instance::instance().get_current_profiler();
}
/*
} else {
// FIRST, stop the top level timer, while the infrastructure is still
// functioning.
auto tmp = thread_instance::get_top_level_timer();
if (tmp != nullptr) {
stop(tmp);
thread_instance::clear_top_level_timer();
}
// Second, stop the main timer, while the infrastructure is still
// functioning.
instance->the_profiler_listener->stop_main_timer();
} */
// if not done already...
shutdown_throttling(); // stop thread scheduler policies
/* Do this before OTF2 grabs a final timestamp - we might have
Expand Down Expand Up @@ -2017,14 +2044,26 @@ void exit_thread(void)
instance->known_threads.erase(&ti);
}
}
auto tmp = thread_instance::get_top_level_timer();
// tell the timer cleanup that we are exiting
thread_instance::exiting();
//printf("Old thread: %p\n", &(*tmp));
if (tmp != nullptr) {
stop(tmp);
thread_instance::clear_top_level_timer();
//if (apex_options::untied_timers() == true) {
profiler * top_profiler = thread_instance::instance().get_current_profiler();
// tell the timer cleanup that we are exiting
thread_instance::exiting();
while (top_profiler != nullptr) {
stop(top_profiler);
top_profiler = thread_instance::instance().get_current_profiler();
}
/*
} else {
auto tmp = thread_instance::get_top_level_timer();
// tell the timer cleanup that we are exiting
thread_instance::exiting();
//printf("Old thread: %p\n", &(*tmp));
if (tmp != nullptr) {
stop(tmp);
thread_instance::clear_top_level_timer();
}
}
*/
// ok to set this now - we need everything still running
_exited = true;
event_data data;
Expand Down
3 changes: 3 additions & 0 deletions src/apex/profiler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class profiler {
task_identifier * task_id; // for counters, timers
public:
std::shared_ptr<task_wrapper> tt_ptr; // for timers
std::shared_ptr<task_wrapper> untied_parent; // for timer stack handling with untied timers
uint64_t start_ns;
uint64_t end_ns;
#if APEX_HAVE_PAPI
Expand Down Expand Up @@ -75,6 +76,7 @@ class profiler {
reset_type reset = reset_type::NONE) :
task_id(task->get_task_id()),
tt_ptr(task),
untied_parent(nullptr),
start_ns(our_clock::now_ns()),
#if APEX_HAVE_PAPI
papi_start_values{0,0,0,0,0,0,0,0},
Expand All @@ -97,6 +99,7 @@ class profiler {
reset_type reset = reset_type::NONE) :
task_id(id),
tt_ptr(nullptr),
untied_parent(nullptr),
start_ns(our_clock::now_ns()),
#if APEX_HAVE_PAPI
papi_start_values{0,0,0,0,0,0,0,0},
Expand Down
15 changes: 14 additions & 1 deletion src/apex/thread_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ string thread_instance::map_addr_to_name(apex_function_address function_address)
}

/**
 * Record the_profiler as the newest running profiler for the thread_instance
 * returned by instance().
 *
 * The profiler is always appended to current_profilers. When the
 * untied_timers option is on, it is additionally linked into the
 * untied_current_profiler chain: the previous chain head is remembered in
 * the_profiler->untied_parent and the profiler's task wrapper becomes the
 * new head.
 */
void thread_instance::set_current_profiler(profiler * the_profiler) {
    auto& self = instance();
    const bool untied = (apex_options::untied_timers() == true);
    if (untied) {
        // Save the old head first so it can be restored when this profiler
        // is cleared, then promote this profiler's task wrapper to head.
        auto previous_head = self.untied_current_profiler;
        the_profiler->untied_parent = previous_head;
        self.untied_current_profiler = the_profiler->tt_ptr;
    }
    self.current_profilers.push_back(the_profiler);
    //printf("%lu pushing %s\n", get_id(), the_profiler->get_task_id()->get_short_name().c_str());
}
Expand Down Expand Up @@ -275,6 +281,7 @@ profiler * thread_instance::restore_children_profilers(
void thread_instance::clear_all_profilers() {
// nothing to do?
if (current_profilers.empty() || !_is_worker) return;
if (apex_options::untied_timers() == true) { return; }
// copy the stack
auto the_stack(current_profilers);
auto tmp = the_stack.back();
Expand Down Expand Up @@ -369,7 +376,13 @@ void thread_instance::clear_current_profiler(profiler * the_profiler,
}

/**
 * Return the profiler currently considered "running" for the thread_instance
 * returned by instance(), or nullptr when there is none.
 *
 * - With the untied_timers option enabled, the answer comes only from the
 *   untied_current_profiler chain: the `prof` of the task wrapper at its
 *   head, or nullptr when the chain is empty.
 * - Otherwise the answer is the last element of current_profilers, or
 *   nullptr when that vector is empty.
 *
 * The emptiness check on current_profilers is applied only in the second
 * mode. Testing it before the untied branch would make the later check
 * unreachable and would let the state of the vector decide the result in
 * untied mode.
 */
profiler * thread_instance::get_current_profiler(void) {
    auto& self = instance();
    if (apex_options::untied_timers() == true) {
        if (self.untied_current_profiler == nullptr) {
            return nullptr;
        }
        return self.untied_current_profiler->prof;
    }
    if (self.current_profilers.empty()) { return nullptr; }
    return self.current_profilers.back();
}

Expand Down
9 changes: 8 additions & 1 deletion src/apex/thread_instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class thread_instance {
thread_instance (bool is_worker) :
_id(-1), _id_reversed(UINTMAX_MAX), _runtime_id(-1),
_top_level_timer_name(), _is_worker(is_worker), _task_count(0),
_top_level_timer(nullptr), _exiting(false) {
_top_level_timer(nullptr), _exiting(false),
untied_current_profiler(nullptr) {
/* Even do this for non-workers, because for CUPTI processing we need to
* generate GUIDs for the activity events! */
_id = common()._num_threads++;
Expand Down Expand Up @@ -124,6 +125,7 @@ class thread_instance {
// map from function address to name - unique to all threads to avoid locking
std::map<apex_function_address, std::string> _function_map;
std::vector<profiler*> current_profilers;
std::shared_ptr<task_wrapper> untied_current_profiler;
uint64_t _get_guid(void) {
// start at 1, because 0 means nullptr which means "no parent"
_task_count++;
Expand Down Expand Up @@ -161,6 +163,11 @@ class thread_instance {
// Drop the most recently pushed entry from current_profilers of the
// thread_instance returned by instance(). Does nothing when the vector is
// already empty, because std::vector::pop_back() on an empty vector is
// undefined behavior.
static void clear_current_profiler() {
    auto& stack = instance().current_profilers;
    if (stack.empty()) { return; }
    stack.pop_back();
}
// Pop the head of the untied_current_profiler chain: the head is replaced by
// the untied_parent stored in the head's profiler. Does nothing when the
// chain is empty.
static void clear_untied_current_profiler() {
    auto head = instance().untied_current_profiler;
    if (head == nullptr) return;
    // Guard against a head whose profiler pointer is null; dereferencing it
    // to reach untied_parent would crash. The chain is left untouched in
    // that case.
    // NOTE(review): presumably `prof` is cleared once a task's profiler is
    // released -- confirm against task_wrapper.
    if (head->prof == nullptr) return;
    instance().untied_current_profiler = head->prof->untied_parent;
}
static const char * program_path(void);
static bool is_worker() { return instance()._is_worker; }
static uint64_t get_guid() { return instance()._get_guid(); }
Expand Down

0 comments on commit 159d301

Please sign in to comment.