Skip to content

Commit

Permalink
Fixing bug when threads call APEX API functions without first registe…
Browse files Browse the repository at this point in the history
…ring with APEX. That should now be handled correctly. Also debugging issues in the taskstubs API implementation, and making the printf statements controllable with the APEX verbose option.
  • Loading branch information
khuck committed Oct 1, 2024
1 parent 054c191 commit 0748e50
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 32 deletions.
37 changes: 37 additions & 0 deletions src/apex/apex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,17 @@ inline std::shared_ptr<task_wrapper> _new_task(
return tt_ptr;
}

/* Lazily register the calling thread with APEX.
 *
 * API entry points can be reached from a thread that never registered
 * itself. When the `_registered` flag is not set, the thread is registered
 * here under the generic name "Worker Thread".
 *
 * Always returns true, so the call can serve as the initializer of the
 * `thread_local static bool _helper` declared at each entry point.
 */
bool register_thread_helper(void) {
    // already known to APEX: nothing to do
    if (_registered) {
        return true;
    }
    register_thread("Worker Thread", nullptr);
    return true;
}

profiler* start(const std::string &timer_name)
{
in_apex prevent_deadlocks;
Expand Down Expand Up @@ -745,6 +756,8 @@ profiler* start(const std::string &timer_name)
APEX_UTIL_REF_COUNT_SUSPENDED_START
return profiler::get_disabled_profiler();
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
std::shared_ptr<task_wrapper> tt_ptr(nullptr);
profiler * new_profiler = nullptr;
if (_notify_listeners) {
Expand Down Expand Up @@ -812,6 +825,8 @@ profiler* start(const apex_function_address function_address) {
APEX_UTIL_REF_COUNT_SUSPENDED_START
return profiler::get_disabled_profiler();
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
std::shared_ptr<task_wrapper> tt_ptr(nullptr);
profiler * new_profiler = nullptr;
if (_notify_listeners) {
Expand Down Expand Up @@ -885,6 +900,8 @@ void start(std::shared_ptr<task_wrapper> tt_ptr) {
tt_ptr->prof = profiler::get_disabled_profiler();
return;
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
// get the thread id that is running this task
if (tt_ptr->thread_id != thread_instance::instance().get_id()) {
printf("Task %s created by %lu started by %lu\n", tt_ptr->task_id->get_name().c_str(),
Expand Down Expand Up @@ -943,6 +960,8 @@ void resume(std::shared_ptr<task_wrapper> tt_ptr) {
APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE
return;
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
//APEX_ASSERT(tt_ptr->state == task_wrapper::YIELDED);
tt_ptr->state = task_wrapper::RUNNING;
if (_notify_listeners) {
Expand Down Expand Up @@ -1054,6 +1073,8 @@ profiler* resume(profiler * p) {
APEX_UTIL_REF_COUNT_RESUME_AFTER_FINALIZE
return nullptr;
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
p->restart();
APEX_ASSERT(p->tt_ptr->state == task_wrapper::STOPPED);
p->tt_ptr->state = task_wrapper::RUNNING;
Expand Down Expand Up @@ -1150,6 +1171,8 @@ void apex::stop_internal(profiler* the_profiler) {
APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE
return;
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
std::shared_ptr<profiler> p{the_profiler};
APEX_ASSERT(p->tt_ptr->state == task_wrapper::RUNNING);
p->tt_ptr->state = task_wrapper::STOPPED;
Expand Down Expand Up @@ -1213,6 +1236,8 @@ void stop(profiler* the_profiler, bool cleanup) {
APEX_UTIL_REF_COUNT_STOP_AFTER_FINALIZE
return;
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
std::shared_ptr<profiler> p{the_profiler};
APEX_ASSERT(p->tt_ptr->state == task_wrapper::RUNNING);
p->tt_ptr->state = task_wrapper::STOPPED;
Expand Down Expand Up @@ -1298,6 +1323,8 @@ void stop(std::shared_ptr<task_wrapper> tt_ptr) {
if (tt_ptr->state == task_wrapper::YIELDED) {
resume(tt_ptr);
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
APEX_ASSERT(tt_ptr->state == task_wrapper::RUNNING);
tt_ptr->state = task_wrapper::STOPPED;
if (tt_ptr->prof != nullptr) {
Expand Down Expand Up @@ -1362,6 +1389,8 @@ void yield(profiler* the_profiler) {
thread_instance::instance().clear_current_profiler(the_profiler, false,
null_task_wrapper);
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
std::shared_ptr<profiler> p{the_profiler};
APEX_ASSERT(p->tt_ptr->state == task_wrapper::RUNNING);
p->tt_ptr->state = task_wrapper::YIELDED;
Expand Down Expand Up @@ -1418,6 +1447,8 @@ void yield(std::shared_ptr<task_wrapper> tt_ptr) {
thread_instance::instance().clear_current_profiler(tt_ptr->prof, true,
tt_ptr);
}
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
std::shared_ptr<profiler> p{tt_ptr->prof};
APEX_ASSERT(tt_ptr->state == task_wrapper::RUNNING);
tt_ptr->state = task_wrapper::YIELDED;
Expand Down Expand Up @@ -1454,6 +1485,8 @@ void sample_value(const std::string &name, double value, bool threaded)
if (apex_options::suspend() == true) { return; }
apex* instance = apex::instance(); // get the Apex static instance
if (!instance) return; // protect against calls after finalization
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();
// parse the counter name
// either /threadqueue{locality#0/total}/length
// or /threadqueue{locality#0/worker-thread#0}/length
Expand Down Expand Up @@ -2459,6 +2492,8 @@ void send (uint64_t tag, uint64_t size, uint64_t target) {
apex* instance = apex::instance();
// protect against calls after finalization
if (!instance || _exited) { return ; }
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();

if (_notify_listeners) {
// eventually, we want to use the thread id, but for now, just use 0.
Expand Down Expand Up @@ -2488,6 +2523,8 @@ void recv (uint64_t tag, uint64_t size, uint64_t source_rank, uint64_t
apex* instance = apex::instance();
// protect against calls after finalization
if (!instance || _exited) { return ; }
// make sure APEX knows about this worker thread!
[[maybe_unused]] thread_local static bool _helper = register_thread_helper();

if (_notify_listeners) {
// eventually, we want to use the thread id, but for now, just use 0.
Expand Down
1 change: 1 addition & 0 deletions src/apex/profiler_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1903,6 +1903,7 @@ if (rc != 0) cout << "PAPI error! " << name << ": " << PAPI_strerror(rc) << endl
// get the right task identifier, based on whether there are aliases
profiler * p = new profiler(tt_ptr, is_resume);
p->thread_id = _pls.my_tid;
APEX_ASSERT(p->thread_id == (unsigned int)thread_instance::get_id());
p->guid = tt_ptr->guid;
thread_instance::instance().set_current_profiler(p);
#if APEX_HAVE_PAPI
Expand Down
94 changes: 67 additions & 27 deletions src/apex/taskstubs_implementation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <mutex>
#include <unordered_map>
#include <stack>
#include <stdarg.h>


using maptype = std::unordered_map<tasktimer_guid_t,
std::shared_ptr<apex::task_wrapper>>;
Expand All @@ -32,10 +34,22 @@ maptype& getMyMap(void) {
return theMap;
}

void safePrint(const char * format, tasktimer_guid_t guid) {
/* printf-style output helper.
 *
 * Forwards `format` and the variadic arguments to vprintf().
 *
 * priority: accepted but currently not used.
 * format:   printf-style format string describing the variadic arguments.
 *
 * Returns whatever vprintf() returns: the number of characters written,
 * or a negative value if an output error occurred.
 */
int verbosePrint([[maybe_unused]] int priority, const char *format, ...)
{
    va_list args;
    va_start(args, format);
    // Keep the result: the function is declared to return int, and flowing
    // off the end of a non-void function is undefined behavior.
    const int written = vprintf(format, args);
    va_end(args);
    return written;
}

/* Call printf() with the given arguments only when the APEX verbose option
 * is enabled. The body is wrapped in do { } while (0) so that the expansion
 * is exactly one statement: the call site supplies the trailing semicolon,
 * and the macro can be used safely in an unbraced if/else. */
#define VERBOSE_PRINTF(...) \
    do { \
        if (apex::apex_options::use_verbose()) { printf(__VA_ARGS__); } \
    } while (0)

/* Emit one diagnostic line about a task GUID when verbose output is enabled.
 *
 * Despite its name, `format` is not interpreted as a format string: it is
 * printed as a plain label, followed by the GUID and the `event` text. The
 * line is prefixed with the id reported by apex::thread_instance::get_id().
 *
 * Output is not serialized here; the earlier mutex-based locking is
 * disabled.
 */
void safePrint(const char * format, tasktimer_guid_t guid, const char * event) {
    VERBOSE_PRINTF("%lu TS: %s: GUID %lu %s\n",
        apex::thread_instance::get_id(),
        format,
        guid,
        event);
}

Expand All @@ -56,7 +70,7 @@ void safeInsert(
}

std::shared_ptr<apex::task_wrapper> safeLookup(
tasktimer_guid_t guid) {
tasktimer_guid_t guid, const char * event) {
#if 0
// in the thread local map?
auto task = getMyMap().find(guid);
Expand All @@ -77,7 +91,7 @@ std::shared_ptr<apex::task_wrapper> safeLookup(
std::scoped_lock lock{mtx()};
auto task = getCommonMap().find(guid);
if (task == getCommonMap().end()) {
safePrint("Not found", guid);
safePrint("Not found", guid, event);
//APEX_ASSERT(false);
return nullptr;
}
Expand Down Expand Up @@ -106,7 +120,7 @@ extern "C" {
}
/* Finalize entry point of the tasktimer implementation. Takes no arguments
 * and returns nothing. */
void tasktimer_finalize_impl(void) {
/* Debatable whether we want to do this finalize */
// NOTE(review): an active call and a commented-out copy of the same call
// both appear below - confirm which of the two is intended.
apex::finalize();
//apex::finalize();
}
// measurement function declarations
tasktimer_timer_t tasktimer_create_impl(
Expand All @@ -117,16 +131,18 @@ extern "C" {
const uint64_t parent_count) {
static bool& over = apex::get_program_over();
if (over) return nullptr;
safePrint("Creating", timer_guid, timer_name);

// need to look up the parent shared pointers?
std::vector<std::shared_ptr<apex::task_wrapper>> parent_tasks;
for (uint64_t i = 0 ; i < parent_count ; i++) {
auto tmp = safeLookup(parent_guids[i]);
auto tmp = safeLookup(parent_guids[i], "parent lookup");
if (tmp != nullptr)
parent_tasks.push_back(tmp);
}
// if no name, use address
if (timer_name == nullptr || strlen(timer_name) == 0) {
//printf("Null name for timer: %p\n", function_address);
//VERBOSE_PRINTF("Null name for timer: %p\n", function_address);
if (parent_count > 0) {
auto task = apex::new_task(
(apex_function_address)function_address,
Expand All @@ -153,10 +169,20 @@ extern "C" {
}
return (tasktimer_timer_t)(timer_guid);
}

/* MAP_TASK(_timer, _apex_timer, _event)
 *
 * Shared prologue for the void tasktimer_*_impl entry points. The expansion
 * is a sequence of statements placed directly in the caller's scope:
 *   - bind a function-local static reference `over_` to
 *     apex::get_program_over() and `return;` when it is true;
 *   - cast _timer to uint64_t (stored in the local `_tmp`);
 *   - declare a local named by _apex_timer, initialized with
 *     safeLookup(_tmp, _event).
 *
 * Because it contains a bare `return;` and declares `over_` and `_tmp`, it
 * can be used only in functions returning void, and only once per scope.
 * Callers are expected to test the resulting local against nullptr before
 * using it.
 */
#define MAP_TASK(_timer, _apex_timer, _event) \
static bool& over_{apex::get_program_over()}; \
if (over_) return; \
uint64_t _tmp = (uint64_t)(_timer); \
auto _apex_timer = safeLookup(_tmp, _event);

void tasktimer_schedule_impl(
tasktimer_timer_t timer,
tasktimer_argument_value_p arguments,
uint64_t argument_count) {
MAP_TASK(timer, apex_timer, "schedule");
VERBOSE_PRINTF("%lu TS: Scheduling: %lu %s\n", apex::thread_instance::get_id(), timer,
apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str());
static bool& over = apex::get_program_over();
if (over) return;
// TODO: handle the schedule event, somehow
Expand All @@ -165,18 +191,13 @@ extern "C" {
APEX_UNUSED(argument_count);
}

#define MAP_TASK(_timer, _apex_timer) \
static bool& over_{apex::get_program_over()}; \
if (over_) return; \
uint64_t _tmp = (uint64_t)(_timer); \
auto _apex_timer = safeLookup(_tmp);

void tasktimer_start_impl(
tasktimer_timer_t timer,
tasktimer_execution_space_p) {
// TODO: capture the execution space, somehow...a new task?
MAP_TASK(timer, apex_timer);
//printf("%lu TS: Starting: %lu\n", apex::thread_instance::get_id(), timer);
MAP_TASK(timer, apex_timer, "start");
VERBOSE_PRINTF("%lu TS: Starting: %lu %s\n", apex::thread_instance::get_id(), timer,
apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str());
if (apex_timer != nullptr) {
apex::start(apex_timer);
}
Expand All @@ -186,8 +207,9 @@ extern "C" {
#if 1
static bool& over = apex::get_program_over();
if (over) return;
MAP_TASK(timer, apex_timer);
//printf("%lu TS: Yielding: %lu\n", apex::thread_instance::get_id(), timer);
MAP_TASK(timer, apex_timer, "yield");
VERBOSE_PRINTF("%lu TS: Yielding: %lu %s\n", apex::thread_instance::get_id(), timer,
apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str());
if (apex_timer != nullptr) {
apex::yield(apex_timer);
}
Expand All @@ -198,8 +220,9 @@ extern "C" {
tasktimer_execution_space_p) {
#if 1
// TODO: capture the execution space, somehow...a new task?
MAP_TASK(timer, apex_timer);
//printf("%lu TS: Resuming: %lu\n", apex::thread_instance::get_id(), timer);
MAP_TASK(timer, apex_timer, "resume");
VERBOSE_PRINTF("%lu TS: Resuming: %lu %s\n", apex::thread_instance::get_id(), timer,
apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str());
// TODO: why no resume function for task_wrapper objects?
if (apex_timer != nullptr) {
apex::resume(apex_timer);
Expand All @@ -210,15 +233,28 @@ extern "C" {
}
/* Stop the APEX timer associated with the handle `timer`.
 *
 * The handle is resolved through MAP_TASK (which also returns early once the
 * program is over). If the lookup yields nullptr nothing is stopped.
 * Otherwise the handle is recorded the first time it is stopped, so that a
 * repeated stop of the same handle is reported (verbose mode only) and
 * ignored rather than forwarded to apex::stop() a second time.
 */
void tasktimer_stop_impl(
    tasktimer_timer_t timer) {
    // handles already stopped through this function; guarded by local_mtx
    static std::set<tasktimer_timer_t> stopped;
    static std::mutex local_mtx;
    MAP_TASK(timer, apex_timer, "stop");
    // The handle is not an unsigned long, so convert it explicitly before
    // handing it to a "%lu" conversion.
    const unsigned long timer_id = static_cast<unsigned long>((uint64_t)(timer));
    VERBOSE_PRINTF("%lu TS: Stopping: %lu %s\n", apex::thread_instance::get_id(), timer_id,
        apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str());
    if (apex_timer != nullptr) {
        {
            std::scoped_lock lock{local_mtx};
            // insert() reports whether the handle was new, so a single
            // lookup both detects and records the stop.
            if (!stopped.insert(timer).second) {
                VERBOSE_PRINTF("%lu TS: ERROR! TIMER STOPPED TWICE! : %lu\n", apex::thread_instance::get_id(), timer_id);
                return;
            }
        }
        apex::stop(apex_timer);
    }
}
void tasktimer_destroy_impl(
tasktimer_timer_t timer) {
MAP_TASK(timer, apex_timer);
MAP_TASK(timer, apex_timer, "destroy");
VERBOSE_PRINTF("%lu TS: Destroying: %lu %s\n", apex::thread_instance::get_id(), timer,
apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str());
if (apex_timer != nullptr) {
// TODO: need to handle the destroy event somehow.
// definitely need to remove it from the local map.
Expand All @@ -229,10 +265,12 @@ extern "C" {
tasktimer_timer_t timer,
const tasktimer_guid_t* parents, const uint64_t parent_count) {
// TODO: need to handle the add parents event
MAP_TASK(timer, apex_timer);
MAP_TASK(timer, apex_timer, "add parents");
VERBOSE_PRINTF("%lu TS: Adding parents: %lu %s\n", apex::thread_instance::get_id(), timer,
apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str());
if (apex_timer != nullptr) {
for (uint64_t i = 0 ; i < parent_count ; i++) {
auto tmp = safeLookup(parents[i]);
auto tmp = safeLookup(parents[i], "parent lookup");
if (tmp != nullptr) {
// add the parent to the child
apex_timer->parents.push_back(tmp);
Expand All @@ -249,10 +287,12 @@ extern "C" {
tasktimer_timer_t timer,
const tasktimer_guid_t* children, const uint64_t child_count) {
// TODO: need to handle the add children event
MAP_TASK(timer, apex_timer);
MAP_TASK(timer, apex_timer, "add children");
VERBOSE_PRINTF("%lu TS: Adding children: %lu %s\n", apex::thread_instance::get_id(), timer,
apex_timer == nullptr ? "unknown" : apex_timer->task_id->get_name().c_str());
if (apex_timer != nullptr) {
for (uint64_t i = 0 ; i < child_count ; i++) {
auto tmp = safeLookup(children[i]);
auto tmp = safeLookup(children[i], "child lookup");
if (tmp != nullptr) {
// add the parent to the child
tmp->parents.push_back(apex_timer);
Expand Down Expand Up @@ -288,7 +328,7 @@ extern "C" {
tasktimer_execution_space_p dest_type,
const char* dest_name,
const void* dest_ptr) {
std::shared_ptr<apex::task_wrapper> parent = safeLookup(guid);
std::shared_ptr<apex::task_wrapper> parent = safeLookup(guid, "data transfer");
auto task = apex::new_task("data xfer", 0, parent);
timerStack(task, true);
}
Expand Down
8 changes: 5 additions & 3 deletions src/apex/thread_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,17 @@ void thread_instance::clear_current_profiler_untied(profiler * the_profiler,
// since we aren't yielding, just stop the children.
stop(tmp->prof); // we better be re-entrant safe!
}
// this is a serious problem...
// this is a serious problem...or is it? no!
if (tmp->prof->untied_parent == nullptr) {
/*
// unless...we happen to be exiting. Bets are off.
if (apex_options::suspend() == true) { return; }
// if we've already cleared the stack on this thread, we're fine
if (instance()._exiting) { return; }
std::cerr << "Warning! empty profiler stack!\n";
std::cerr << "Warning! empty profiler stack!" << __LINE__ << "\n";
APEX_ASSERT(false);
//abort();
*/
return;
}
// get the new top of the stack
Expand Down Expand Up @@ -423,7 +425,7 @@ void thread_instance::clear_current_profiler(profiler * the_profiler,
if (apex_options::suspend() == true) { return; }
// if we've already cleared the stack on this thread, we're fine
if (instance()._exiting) { return; }
std::cerr << "Warning! empty profiler stack!\n";
std::cerr << "Warning! empty profiler stack!" << __LINE__ << "\n";
APEX_ASSERT(false);
//abort();
return;
Expand Down
Loading

0 comments on commit 0748e50

Please sign in to comment.